
Amazon Marketing Stream: Real-Time Ad Data Without Batch Processing

For three years, our platform's reporting pipeline ran on nightly batch jobs. Every night at 2am, a fleet of Go workers would call the Amazon Advertising API to pull the previous day's campaign performance data for 2,000+ advertisers. It worked, until it stopped scaling.

The problems accumulated over time: API rate limits tightened; some advertisers grew to hundreds of campaigns, stretching their nightly sync to 20 minutes; and any API downtime meant the entire previous day's data went missing. Most importantly, our bidding algorithms always worked with data that was at least 24 hours stale. Amazon Marketing Stream changed all of this.

What Is Amazon Marketing Stream?

Amazon Marketing Stream (AMS) is Amazon's event-driven reporting system that delivers advertising performance data as near-real-time events via SNS → SQS. Instead of you pulling data from Amazon, Amazon pushes data to you as events — impressions, clicks, spend, conversions — typically 3–5 hours after the event occurs.

This is a significant architectural shift. Instead of a scheduled job that calls the API at a fixed time, you subscribe to a stream and process events continuously. Your data is always as fresh as the latest event Amazon has sent, and you never hammer the API with bulk historical pulls.

Architecture: SNS → SQS → Go Consumer → RDS

The flow is straightforward. Amazon publishes events to an SNS topic it controls. You subscribe an SQS queue (in your AWS account) to that topic. Your consumer processes messages from the queue and writes to your database:

// The architecture in code:
// Amazon SNS ─────► Your SQS Queue ─────► Go Consumer ─────► RDS (upsert)
//                                                      └────► DynamoDB (dedupe)

type AMSEvent struct {
    DatasetType string           `json:"datasetType"`
    ProfileID   string           `json:"profileId"`
    Date        string           `json:"date"`
    Records     []CampaignRecord `json:"records"`
}

type CampaignRecord struct {
    CampaignID      string  `json:"campaignId"`
    Impressions     int64   `json:"impressions"`
    Clicks          int64   `json:"clicks"`
    Cost            float64 `json:"cost"`
    AttributedSales float64 `json:"attributedSales"`
}
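
Because SNS wraps each message in its own JSON envelope before it reaches SQS, the consumer has to decode twice: first the envelope, then the AMS payload stored as a string in its `Message` field. Below is a minimal, standard-library-only sketch of that step. It assumes the simplified `AMSEvent` shape above (repeated so the example compiles on its own) and that raw message delivery is disabled on the subscription, so the envelope is present. `decodeAMSBody`, `wrapInSNSEnvelope`, and the sample IDs are hypothetical.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Simplified event shapes, mirroring the structs above.
type AMSEvent struct {
    DatasetType string           `json:"datasetType"`
    ProfileID   string           `json:"profileId"`
    Date        string           `json:"date"`
    Records     []CampaignRecord `json:"records"`
}

type CampaignRecord struct {
    CampaignID      string  `json:"campaignId"`
    Impressions     int64   `json:"impressions"`
    Clicks          int64   `json:"clicks"`
    Cost            float64 `json:"cost"`
    AttributedSales float64 `json:"attributedSales"`
}

// decodeAMSBody (hypothetical helper) unwraps an SQS body that contains an
// SNS notification envelope and decodes the AMS payload from its Message field.
func decodeAMSBody(body string) (AMSEvent, error) {
    var envelope struct {
        Type    string `json:"Type"`
        Message string `json:"Message"`
    }
    if err := json.Unmarshal([]byte(body), &envelope); err != nil {
        return AMSEvent{}, fmt.Errorf("decode SNS envelope: %w", err)
    }
    var event AMSEvent
    if err := json.Unmarshal([]byte(envelope.Message), &event); err != nil {
        return AMSEvent{}, fmt.Errorf("decode AMS payload: %w", err)
    }
    return event, nil
}

// wrapInSNSEnvelope builds a test body the way SNS does: the payload becomes
// a JSON *string* inside the envelope (escaped), not a nested object.
func wrapInSNSEnvelope(payload string) string {
    b, _ := json.Marshal(map[string]string{"Type": "Notification", "Message": payload})
    return string(b)
}

func main() {
    payload := `{"datasetType":"sp-traffic","profileId":"111","date":"2024-05-01",
        "records":[{"campaignId":"c-1","impressions":1200,"clicks":35,"cost":18.5,"attributedSales":96.0}]}`
    event, err := decodeAMSBody(wrapInSNSEnvelope(payload))
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println(event.ProfileID, event.Records[0].CampaignID, event.Records[0].Clicks)
}
```

Returning an error for undecodable bodies, rather than ignoring it, lets a malformed message fail repeatedly and end up in the dead-letter queue instead of being silently written as an empty event.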

Setting Up the SQS Subscription

Amazon provides the SNS topic ARN for your region. You create an SQS queue, subscribe it to the Amazon SNS topic, and configure the queue policy to allow Amazon to send messages:

// Queue policy allowing Amazon's SNS to send messages
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"Service": "sns.amazonaws.com"},
    "Action": "sqs:SendMessage",
    "Resource": "arn:aws:sqs:us-east-1:YOUR_ACCOUNT:ams-events",
    "Condition": {
      "ArnLike": {
        "aws:SourceArn": "arn:aws:sns:us-east-1:AMAZON_ACCOUNT:advertising-marketing-stream-*"
      }
    }
  }]
}

The Consumer: Core Processing Loop

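// Uses the AWS SDK for Go v2; sqstypes and dynamotypes alias the sqs/types
// and dynamodb/types packages.
type AMSConsumer struct {
    sqs    *sqs.Client      // receives and deletes queue messages
    dynamo *dynamodb.Client // dedup table writes
    db     *sql.DB          // PostgreSQL on RDS (campaign_performance)
    qURL   string           // URL of the ams-events queue
}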

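func (c *AMSConsumer) Run(ctx context.Context) error {
    slog.Info("AMS consumer starting")
    for {
        select {
        case <-ctx.Done():
            slog.Info("AMS consumer stopping")
            return nil
        default:
        }

        // Long-poll for up to 10 messages (the SQS maximum per call).
        output, err := c.sqs.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
            QueueUrl:            &c.qURL,
            MaxNumberOfMessages: 10,
            WaitTimeSeconds:     20,
        })
        if err != nil {
            if ctx.Err() != nil {
                return nil
            }
            slog.Error("receive error", "err", err)
            // Back off before retrying, but wake up immediately on shutdown.
            select {
            case <-ctx.Done():
                return nil
            case <-time.After(5 * time.Second):
            }
            continue
        }

        // Process the batch concurrently (at most 10 goroutines) and finish
        // all of it before polling again.
        var wg sync.WaitGroup
        for _, msg := range output.Messages {
            wg.Add(1)
            go func(m sqstypes.Message) {
                defer wg.Done()
                if err := c.process(ctx, m); err != nil {
                    slog.Error("process error", "msgID", aws.ToString(m.MessageId), "err", err)
                    return // leave in queue; redelivered after the visibility timeout
                }
                // Delete only after successful processing.
                if _, err := c.sqs.DeleteMessage(ctx, &sqs.DeleteMessageInput{
                    QueueUrl:      &c.qURL,
                    ReceiptHandle: m.ReceiptHandle,
                }); err != nil {
                    slog.Error("delete error", "msgID", aws.ToString(m.MessageId), "err", err)
                }
            }(msg)
        }
        wg.Wait()
    }
}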

Idempotency: The Most Important Part

AMS delivers at-least-once. The same event — with identical campaign performance data for a given date — can arrive multiple times. Without idempotency, you will double-count impressions, clicks, and spend. Every write to your database must be an upsert keyed on something that identifies the specific dataset:

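func (c *AMSConsumer) process(ctx context.Context, msg sqstypes.Message) error {
    // The SQS body is the SNS envelope; the AMS payload is the JSON string in "Message".
    var snsNotification struct {
        Message string `json:"Message"`
    }
    if err := json.Unmarshal([]byte(aws.ToString(msg.Body)), &snsNotification); err != nil {
        return fmt.Errorf("decode SNS envelope: %w", err)
    }

    var event AMSEvent
    if err := json.Unmarshal([]byte(snsNotification.Message), &event); err != nil {
        return fmt.Errorf("decode AMS event: %w", err)
    }

    // Deduplication using DynamoDB conditional write
    // Key: profileID + datasetType + date + a hash of the payload, so an exact
    // redelivery is skipped but a restated payload for the same date is applied
    sum := sha256.Sum256([]byte(snsNotification.Message))
    dedupeKey := fmt.Sprintf("%s#%s#%s#%x", event.ProfileID, event.DatasetType, event.Date, sum[:])
    _, err := c.dynamo.PutItem(ctx, &dynamodb.PutItemInput{
        TableName: aws.String("ams-dedup"),
        Item: map[string]dynamotypes.AttributeValue{
            "pk":  &dynamotypes.AttributeValueMemberS{Value: dedupeKey},
            "ttl": &dynamotypes.AttributeValueMemberN{Value: strconv.FormatInt(
                time.Now().Add(48*time.Hour).Unix(), 10,
            )},
        },
        ConditionExpression: aws.String("attribute_not_exists(pk)"),
    })

    var condErr *dynamotypes.ConditionalCheckFailedException
    if errors.As(err, &condErr) {
        // Duplicate event: skip processing; Run still deletes it from the queue
        return nil
    }
    if err != nil {
        return fmt.Errorf("dedup check: %w", err)
    }

    // Process each record as an upsert. If that fails, remove the dedup marker;
    // otherwise the redelivered message would be skipped as a "duplicate" and lost.
    if err := c.upsertRecords(ctx, event); err != nil {
        if _, delErr := c.dynamo.DeleteItem(ctx, &dynamodb.DeleteItemInput{
            TableName: aws.String("ams-dedup"),
            Key: map[string]dynamotypes.AttributeValue{
                "pk": &dynamotypes.AttributeValueMemberS{Value: dedupeKey},
            },
        }); delErr != nil {
            slog.Error("release dedup marker", "key", dedupeKey, "err", delErr)
        }
        return err
    }
    return nil
}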

func (c *AMSConsumer) upsertRecords(ctx context.Context, event AMSEvent) error {
    const query = `
        INSERT INTO campaign_performance (profile_id, campaign_id, date, impressions, clicks, cost, sales)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        ON CONFLICT (profile_id, campaign_id, date)
        DO UPDATE SET
            impressions = EXCLUDED.impressions,
            clicks      = EXCLUDED.clicks,
            cost        = EXCLUDED.cost,
            sales       = EXCLUDED.sales,
            updated_at  = NOW()`

    tx, err := c.db.BeginTx(ctx, nil)
    if err != nil { return err }
    defer tx.Rollback()

    stmt, err := tx.PrepareContext(ctx, query)
    if err != nil { return err }
    defer stmt.Close()

    for _, record := range event.Records {
        _, err = stmt.ExecContext(ctx,
            event.ProfileID, record.CampaignID, event.Date,
            record.Impressions, record.Clicks, record.Cost, record.AttributedSales)
        if err != nil { return fmt.Errorf("upsert campaign %s: %w", record.CampaignID, err) }
    }

    return tx.Commit()
}
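
The upsert overwrites with the `EXCLUDED` values instead of adding to them, so replaying the same record lands on the same row with the same numbers. The difference is easy to see with a map standing in for `campaign_performance`. `perfKey`, `metrics`, `upsert`, and `increment` are hypothetical names used only for this sketch.

```go
package main

import "fmt"

// perfKey mirrors the table's conflict target (profile_id, campaign_id, date).
type perfKey struct{ profileID, campaignID, date string }

type metrics struct {
    Impressions, Clicks int64
    Cost                float64
}

// upsert mirrors ON CONFLICT ... DO UPDATE SET col = EXCLUDED.col.
func upsert(table map[perfKey]metrics, k perfKey, m metrics) { table[k] = m }

// increment is the non-idempotent alternative: col = col + EXCLUDED.col.
func increment(table map[perfKey]metrics, k perfKey, m metrics) {
    cur := table[k]
    cur.Impressions += m.Impressions
    cur.Clicks += m.Clicks
    cur.Cost += m.Cost
    table[k] = cur
}

func main() {
    k := perfKey{"111", "c-1", "2024-05-01"}
    rec := metrics{Impressions: 1200, Clicks: 35, Cost: 18.5}

    safe, unsafe := map[perfKey]metrics{}, map[perfKey]metrics{}
    for i := 0; i < 2; i++ { // the same event delivered twice
        upsert(safe, k, rec)
        increment(unsafe, k, rec)
    }
    fmt.Println("upsert clicks:", safe[k].Clicks, "increment clicks:", unsafe[k].Clicks)
}
```

This is why the DynamoDB check is a cost optimization rather than the only line of defense: even if a duplicate slips past it, the overwrite keeps the totals correct.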

Handling Backpressure

During Amazon's peak reporting windows (typically evening UTC), AMS can deliver thousands of events per minute. Key configuration choices that kept us stable:

  • SQS visibility timeout: 5 minutes — generous for batches with many records
  • Dead-letter queue after 5 failures with CloudWatch alarm on depth
  • Consumer concurrency: 10 goroutines — enough parallelism without overwhelming the database
  • ECS autoscaling on queue depth — scale out consumers when queue grows
  • DynamoDB TTL on dedup table: 48 hours — auto-cleanup without management

Monitoring

The metrics we track for the AMS pipeline:

  • Queue depth (scale trigger) and oldest message age
  • Processing duration per event type
  • Duplicate rate (high duplicates indicate Amazon is resending aggressively)
  • Upsert rate and error rate
  • DLQ depth (processing bugs)
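
Queue depth and oldest-message age are available from the SQS CloudWatch metrics `ApproximateNumberOfMessagesVisible` and `ApproximateAgeOfOldestMessage`, but the duplicate rate has to be computed by the consumer itself. Below is a minimal sketch of in-process counters. It assumes the consumer increments them from `process` and exports them to whatever metrics backend you use. `pipelineStats` and its methods are hypothetical, and `atomic.Int64` requires Go 1.19 or later.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// pipelineStats holds counters that are safe to update from many goroutines.
type pipelineStats struct {
    received   atomic.Int64
    duplicates atomic.Int64
    upserted   atomic.Int64
    failed     atomic.Int64
}

// DuplicateRate is the share of received events skipped by the dedup check.
func (s *pipelineStats) DuplicateRate() float64 {
    r := s.received.Load()
    if r == 0 {
        return 0
    }
    return float64(s.duplicates.Load()) / float64(r)
}

// ErrorRate is the share of received events whose processing failed.
func (s *pipelineStats) ErrorRate() float64 {
    r := s.received.Load()
    if r == 0 {
        return 0
    }
    return float64(s.failed.Load()) / float64(r)
}

func main() {
    var stats pipelineStats
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            stats.received.Add(1)
            switch {
            case i%10 == 0: // simulate every tenth event being a redelivery
                stats.duplicates.Add(1)
            case i%25 == 1: // simulate an occasional processing failure
                stats.failed.Add(1)
            default:
                stats.upserted.Add(1)
            }
        }(i)
    }
    wg.Wait()
    fmt.Printf("dup rate %.2f, error rate %.2f, upserts %d\n",
        stats.DuplicateRate(), stats.ErrorRate(), stats.upserted.Load())
}
```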

Results

After migrating from batch to AMS, our Ads API call volume dropped by ~40%. Report data is now available within hours rather than 24 hours. Our bidding algorithms can respond to same-day performance signals. And the 2am cron job that regularly triggered on-call alerts is gone. The migration took about two weeks end-to-end; the operational improvement has been permanent.
