Amazon Marketing Stream: Real-Time Ad Data Without Batch Processing
For three years, our platform's reporting pipeline ran on nightly batch jobs. Every night at 2am, a fleet of Go workers called the Amazon Advertising API to pull the previous day's campaign performance data for 2,000+ advertisers. It worked, until it stopped scaling.
The problems accumulated over time: API rate limits tightened, some advertisers grew to hundreds of campaigns, which pushed their nightly sync to 20 minutes, and any API downtime meant the entire previous day's data was missing. Most importantly, our bidding algorithms were always working with data that was at least 24 hours stale. Amazon Marketing Stream changed all of this.
What Is Amazon Marketing Stream?
Amazon Marketing Stream (AMS) is Amazon's event-driven reporting system that delivers advertising performance data as near-real-time events via SNS → SQS. Instead of you pulling data from Amazon, Amazon pushes data to you as events — impressions, clicks, spend, conversions — typically 3–5 hours after the event occurs.
This is a significant architectural shift. Instead of a scheduled job that calls the API at a fixed time, you subscribe to a stream and process events continuously. Your data is always as fresh as the latest event Amazon has sent, and you never hammer the API with bulk historical pulls.
Architecture: SNS → SQS → Go Consumer → RDS
The flow is straightforward. Amazon publishes events to an SNS topic they control. You subscribe an SQS queue (in your AWS account) to that topic. Your consumer processes messages from the queue and writes to your database:
// The architecture in code:
//
//   Amazon SNS ─────► Your SQS Queue ─────► Go Consumer ─────► RDS (upsert)
//                                               └────► DynamoDB (dedupe)

// AMSEvent is the payload carried inside the SNS message.
type AMSEvent struct {
	DatasetType string           `json:"datasetType"`
	ProfileID   string           `json:"profileId"`
	Date        string           `json:"date"`
	Records     []CampaignRecord `json:"records"`
}

// CampaignRecord holds one campaign's metrics for the event's date.
type CampaignRecord struct {
	CampaignID      string  `json:"campaignId"`
	Impressions     int64   `json:"impressions"`
	Clicks          int64   `json:"clicks"`
	Cost            float64 `json:"cost"`
	AttributedSales float64 `json:"attributedSales"`
}
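Because the queue is subscribed to an SNS topic, each SQS message body is an SNS envelope with the event serialized as a JSON string in its Message field, so decoding takes two passes. The following is a minimal, stdlib-only sketch of that step. It reuses the simplified structs above; the sample payload, its values, and the helper names decodeEvent and sampleBody are illustrative assumptions, not Amazon's actual schema.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// snsEnvelope is the outer document SQS receives from SNS when raw
// message delivery is not enabled on the subscription.
type snsEnvelope struct {
	Type    string `json:"Type"`
	Message string `json:"Message"`
}

type CampaignRecord struct {
	CampaignID      string  `json:"campaignId"`
	Impressions     int64   `json:"impressions"`
	Clicks          int64   `json:"clicks"`
	Cost            float64 `json:"cost"`
	AttributedSales float64 `json:"attributedSales"`
}

type AMSEvent struct {
	DatasetType string           `json:"datasetType"`
	ProfileID   string           `json:"profileId"`
	Date        string           `json:"date"`
	Records     []CampaignRecord `json:"records"`
}

// decodeEvent unwraps the SNS envelope, then parses the inner payload.
func decodeEvent(body string) (AMSEvent, error) {
	var env snsEnvelope
	if err := json.Unmarshal([]byte(body), &env); err != nil {
		return AMSEvent{}, fmt.Errorf("decode SNS envelope: %w", err)
	}
	if env.Message == "" {
		return AMSEvent{}, errors.New("SNS envelope has no Message field")
	}
	var ev AMSEvent
	if err := json.Unmarshal([]byte(env.Message), &ev); err != nil {
		return AMSEvent{}, fmt.Errorf("decode AMS payload: %w", err)
	}
	return ev, nil
}

// sampleBody builds a made-up message body for illustration only.
func sampleBody() string {
	inner := `{"datasetType":"sp-traffic","profileId":"123","date":"2024-01-15",` +
		`"records":[{"campaignId":"c1","impressions":100,"clicks":7,"cost":3.5,"attributedSales":20}]}`
	env, _ := json.Marshal(snsEnvelope{Type: "Notification", Message: inner})
	return string(env)
}

func main() {
	ev, err := decodeEvent(sampleBody())
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s %s: %d record(s)\n", ev.DatasetType, ev.Date, len(ev.Records))
}
```

Returning an error for a malformed body, rather than silently producing an empty event, keeps a bad message in the queue so it can eventually reach a dead-letter queue.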
Setting Up the SQS Subscription
Amazon provides the SNS topic ARN for your region. You create an SQS queue, subscribe it to the Amazon SNS topic, and configure the queue policy to allow Amazon to send messages:
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"Service": "sns.amazonaws.com"},
    "Action": "sqs:SendMessage",
    "Resource": "arn:aws:sqs:us-east-1:YOUR_ACCOUNT:ams-events",
    "Condition": {
      "ArnLike": {
        "aws:SourceArn": "arn:aws:sns:us-east-1:AMAZON_ACCOUNT:advertising-marketing-stream-*"
      }
    }
  }]
}
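If you provision queues from code rather than by hand, the same policy can be built as a Go value and marshalled into the string that the queue's Policy attribute expects. This is a rough sketch under that assumption; the type and function names are ours, and the ARNs are the same placeholders used above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// queuePolicy mirrors the IAM policy document shown above.
type queuePolicy struct {
	Version   string            `json:"Version"`
	Statement []policyStatement `json:"Statement"`
}

type policyStatement struct {
	Effect    string                       `json:"Effect"`
	Principal map[string]string            `json:"Principal"`
	Action    string                       `json:"Action"`
	Resource  string                       `json:"Resource"`
	Condition map[string]map[string]string `json:"Condition"`
}

// buildQueuePolicy allows SNS to send to queueARN, but only for topics
// whose ARN matches topicARNPattern.
func buildQueuePolicy(queueARN, topicARNPattern string) queuePolicy {
	return queuePolicy{
		Version: "2012-10-17",
		Statement: []policyStatement{{
			Effect:    "Allow",
			Principal: map[string]string{"Service": "sns.amazonaws.com"},
			Action:    "sqs:SendMessage",
			Resource:  queueARN,
			Condition: map[string]map[string]string{
				"ArnLike": {"aws:SourceArn": topicARNPattern},
			},
		}},
	}
}

func main() {
	p := buildQueuePolicy(
		"arn:aws:sqs:us-east-1:YOUR_ACCOUNT:ams-events",
		"arn:aws:sns:us-east-1:AMAZON_ACCOUNT:advertising-marketing-stream-*",
	)
	doc, err := json.MarshalIndent(p, "", "  ")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(doc))
}
```

Keeping the source-ARN condition in a typed struct makes it harder to ship a queue that accepts messages from any SNS topic.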
The Consumer: Core Processing Loop
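// AMSConsumer long-polls the SQS queue and writes events to the database.
type AMSConsumer struct {
	sqs    *sqs.Client
	dynamo *dynamodb.Client
	db     *sql.DB
	qURL   string
}

// Run polls until ctx is cancelled. Each batch of up to 10 messages is
// processed concurrently, one goroutine per message.
func (c *AMSConsumer) Run(ctx context.Context) error {
	slog.Info("AMS consumer starting")
	for {
		select {
		case <-ctx.Done():
			slog.Info("AMS consumer stopping")
			return nil
		default:
		}

		output, err := c.sqs.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:            &c.qURL,
			MaxNumberOfMessages: 10,
			WaitTimeSeconds:     20, // long polling
		})
		if err != nil {
			if ctx.Err() != nil {
				return nil
			}
			slog.Error("receive error", "err", err)
			// Back off, but wake immediately on shutdown.
			select {
			case <-ctx.Done():
				return nil
			case <-time.After(5 * time.Second):
			}
			continue
		}

		var wg sync.WaitGroup
		for _, msg := range output.Messages {
			wg.Add(1)
			go func(m sqstypes.Message) {
				defer wg.Done()
				if err := c.process(ctx, m); err != nil {
					slog.Error("process error", "msgID", aws.ToString(m.MessageId), "err", err)
					return // leave in queue for retry
				}
				// A failed delete only means the message is redelivered later;
				// the dedup check absorbs the repeat.
				if _, err := c.sqs.DeleteMessage(ctx, &sqs.DeleteMessageInput{
					QueueUrl:      &c.qURL,
					ReceiptHandle: m.ReceiptHandle,
				}); err != nil {
					slog.Error("delete error", "msgID", aws.ToString(m.MessageId), "err", err)
				}
			}(msg)
		}
		wg.Wait()
	}
}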
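The loop above waits a fixed five seconds after a failed receive. If receive errors come in bursts, one option is to grow that wait with each consecutive failure and to make the wait itself cancellable. The sketch below shows the idea with only the standard library; backoffDelay, sleepCtx, and the 1s/30s bounds are illustrative choices, not values we measured.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// backoffDelay doubles the wait for each consecutive failure
// (1s, 2s, 4s, ...) and caps it at 30s. Zero failures means no wait.
func backoffDelay(failures int) time.Duration {
	const (
		base  = time.Second
		limit = 30 * time.Second
	)
	if failures < 1 {
		return 0
	}
	d := base
	for i := 1; i < failures; i++ {
		d *= 2
		if d >= limit {
			return limit
		}
	}
	return d
}

// sleepCtx waits for d or until ctx is cancelled, whichever comes first.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}

func main() {
	for failures := 1; failures <= 6; failures++ {
		fmt.Println(failures, backoffDelay(failures))
	}

	// Simulate a shutdown signal arriving during a backoff.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println("sleep result:", sleepCtx(ctx, backoffDelay(6)))
}
```

In the consumer, the failure counter would reset to zero after any successful ReceiveMessage call.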
Idempotency: The Most Important Part
AMS delivers at-least-once. The same event — with identical campaign performance data for a given date — can arrive multiple times. Without idempotency, you will double-count impressions, clicks, and spend. Every write to your database must be an upsert keyed on something that identifies the specific dataset:
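func (c *AMSConsumer) process(ctx context.Context, msg sqstypes.Message) error {
	// SQS delivers the SNS envelope; the AMS payload is the JSON string in Message.
	var snsNotification struct {
		Message string `json:"Message"`
	}
	if err := json.Unmarshal([]byte(aws.ToString(msg.Body)), &snsNotification); err != nil {
		return fmt.Errorf("decode SNS envelope: %w", err)
	}
	var event AMSEvent
	if err := json.Unmarshal([]byte(snsNotification.Message), &event); err != nil {
		return fmt.Errorf("decode AMS event: %w", err)
	}

	// Deduplication using DynamoDB conditional write
	// Key: profileID + datasetType + date + a hash of the payload
	// (needs crypto/sha256 and encoding/hex). Without the hash, a revised
	// payload for the same date would be skipped as a duplicate.
	sum := sha256.Sum256([]byte(snsNotification.Message))
	dedupeKey := fmt.Sprintf("%s#%s#%s#%s",
		event.ProfileID, event.DatasetType, event.Date, hex.EncodeToString(sum[:]))
	_, err := c.dynamo.PutItem(ctx, &dynamodb.PutItemInput{
		TableName: aws.String("ams-dedup"),
		Item: map[string]dynamotypes.AttributeValue{
			"pk": &dynamotypes.AttributeValueMemberS{Value: dedupeKey},
			"ttl": &dynamotypes.AttributeValueMemberN{Value: strconv.FormatInt(
				time.Now().Add(48*time.Hour).Unix(), 10,
			)},
		},
		ConditionExpression: aws.String("attribute_not_exists(pk)"),
	})
	var condErr *dynamotypes.ConditionalCheckFailedException
	if errors.As(err, &condErr) {
		// Duplicate event: skip processing; the caller deletes it from the queue
		return nil
	}
	if err != nil {
		return fmt.Errorf("dedup check: %w", err)
	}

	// Process each record as an upsert
	if err := c.upsertRecords(ctx, event); err != nil {
		// Release the dedup marker so the SQS retry is not skipped as a duplicate.
		if _, delErr := c.dynamo.DeleteItem(ctx, &dynamodb.DeleteItemInput{
			TableName: aws.String("ams-dedup"),
			Key: map[string]dynamotypes.AttributeValue{
				"pk": &dynamotypes.AttributeValueMemberS{Value: dedupeKey},
			},
		}); delErr != nil {
			slog.Warn("dedup marker not released", "key", dedupeKey, "err", delErr)
		}
		return err
	}
	return nil
}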
func (c *AMSConsumer) upsertRecords(ctx context.Context, event AMSEvent) error {
	const query = `
		INSERT INTO campaign_performance (profile_id, campaign_id, date, impressions, clicks, cost, sales)
		VALUES ($1, $2, $3, $4, $5, $6, $7)
		ON CONFLICT (profile_id, campaign_id, date)
		DO UPDATE SET
			impressions = EXCLUDED.impressions,
			clicks = EXCLUDED.clicks,
			cost = EXCLUDED.cost,
			sales = EXCLUDED.sales,
			updated_at = NOW()`

	tx, err := c.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return err
	}
	defer stmt.Close()

	for _, record := range event.Records {
		_, err = stmt.ExecContext(ctx,
			event.ProfileID, record.CampaignID, event.Date,
			record.Impressions, record.Clicks, record.Cost, record.AttributedSales)
		if err != nil {
			return fmt.Errorf("upsert campaign %s: %w", record.CampaignID, err)
		}
	}
	return tx.Commit()
}
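To see why the two layers work together, it helps to model them without AWS or a database. The sketch below is a stdlib-only stand-in: a map plays the dedup table, another map plays campaign_performance, and apply overwrites rather than adds. The store type, its methods, and the sample payloads are hypothetical and exist only for this illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// dedupeKey identifies one specific payload for a profile, dataset, and date.
func dedupeKey(profileID, datasetType, date string, payload []byte) string {
	sum := sha256.Sum256(payload)
	return fmt.Sprintf("%s#%s#%s#%s", profileID, datasetType, date, hex.EncodeToString(sum[:]))
}

type rowKey struct{ ProfileID, CampaignID, Date string }

type metrics struct{ Impressions, Clicks int64 }

// store is an in-memory stand-in for the dedup table and the metrics table.
type store struct {
	seen map[string]bool
	rows map[rowKey]metrics
}

func newStore() *store {
	return &store{seen: map[string]bool{}, rows: map[rowKey]metrics{}}
}

// apply returns false when this exact payload was already processed.
// New payloads overwrite the row, mirroring ON CONFLICT ... DO UPDATE.
func (s *store) apply(dedupe string, k rowKey, m metrics) bool {
	if s.seen[dedupe] {
		return false
	}
	s.seen[dedupe] = true
	s.rows[k] = m
	return true
}

func main() {
	s := newStore()
	k := rowKey{ProfileID: "123", CampaignID: "c1", Date: "2024-01-15"}
	first := []byte(`{"clicks":7}`)
	revised := []byte(`{"clicks":9}`)

	k1 := dedupeKey("123", "sp-traffic", "2024-01-15", first)
	k2 := dedupeKey("123", "sp-traffic", "2024-01-15", revised)

	fmt.Println("first delivery applied:", s.apply(k1, k, metrics{100, 7}))
	fmt.Println("redelivery applied:", s.apply(k1, k, metrics{100, 7}))
	fmt.Println("revised payload applied:", s.apply(k2, k, metrics{120, 9}))
	fmt.Println("stored clicks:", s.rows[k].Clicks)
}
```

A redelivered payload is dropped by the key check, while a revised payload for the same date gets a different hash and replaces the stored row instead of adding to it.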
Handling Backpressure
During Amazon's peak reporting windows (typically evening UTC), AMS can deliver thousands of events per minute. Key configuration choices that kept us stable:
- SQS visibility timeout: 5 minutes — generous for batches with many records
- Dead-letter queue after 5 failures with CloudWatch alarm on depth
- Consumer concurrency: 10 goroutines — enough parallelism without overwhelming the database
- ECS autoscaling on queue depth — scale out consumers when queue grows
- DynamoDB TTL on dedup table: 48 hours — auto-cleanup without management
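The autoscaling rule itself lives in AWS configuration, but the arithmetic behind scaling on queue depth is simple enough to sketch. The function below estimates how many consumer tasks are needed to drain a backlog within a target window. The function name, the throughput figure, and the bounds are assumptions for illustration, not our production settings.

```go
package main

import "fmt"

// desiredConsumers returns the number of tasks needed to drain queueDepth
// messages within drainSeconds, given each task's throughput, clamped to
// [minTasks, maxTasks].
func desiredConsumers(queueDepth, msgsPerSecPerTask, drainSeconds, minTasks, maxTasks int) int {
	capacity := msgsPerSecPerTask * drainSeconds // messages one task clears in the window
	if capacity <= 0 {
		return maxTasks // cannot estimate, so err on the side of capacity
	}
	n := (queueDepth + capacity - 1) / capacity // ceiling division
	if n < minTasks {
		n = minTasks
	}
	if n > maxTasks {
		n = maxTasks
	}
	return n
}

func main() {
	// Assumed figures: 5 msgs/s per task, drain within 60s, 1 to 20 tasks.
	for _, depth := range []int{0, 301, 3000, 100000} {
		fmt.Printf("depth=%d tasks=%d\n", depth, desiredConsumers(depth, 5, 60, 1, 20))
	}
}
```

The upper clamp matters as much as the scale-out: every extra consumer adds database connections, so the maximum should be set by what the database can absorb.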
Monitoring
The metrics we track for the AMS pipeline:
- Queue depth (scale trigger) and oldest message age
- Processing duration per event type
- Duplicate rate (high duplicates indicate Amazon is resending aggressively)
- Upsert rate and error rate
- DLQ depth (processing bugs)
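Queue and DLQ depth come from SQS itself, but duplicate rate and error rate have to be counted inside the consumer. A minimal sketch of such counters, safe to update from the per-message goroutines, might look like the following. The pipelineStats type and its method names are hypothetical, and publishing the values to a metrics backend is left out.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pipelineStats holds counters that concurrent workers can update safely.
type pipelineStats struct {
	processed  atomic.Int64 // events upserted successfully
	duplicates atomic.Int64 // events skipped by the dedup check
	failures   atomic.Int64 // events that returned an error
}

func (s *pipelineStats) recordProcessed() { s.processed.Add(1) }
func (s *pipelineStats) recordDuplicate() { s.duplicates.Add(1) }
func (s *pipelineStats) recordFailure()   { s.failures.Add(1) }

// duplicateRate is duplicates divided by all successfully handled events.
func (s *pipelineStats) duplicateRate() float64 {
	dup := s.duplicates.Load()
	total := dup + s.processed.Load()
	if total == 0 {
		return 0
	}
	return float64(dup) / float64(total)
}

func main() {
	s := &pipelineStats{}
	for i := 0; i < 3; i++ {
		s.recordProcessed()
	}
	s.recordDuplicate()
	s.recordFailure()
	fmt.Printf("duplicate rate: %.2f, failures: %d\n", s.duplicateRate(), s.failures.Load())
}
```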
Results
After migrating from batch to AMS, our Ads API call volume dropped by ~40%. Report data is now available within hours rather than 24 hours. Our bidding algorithms can respond to same-day performance signals. And the 2am cron job that regularly triggered on-call alerts is gone. The migration took about two weeks end-to-end; the operational improvement has been permanent.