SNS S3 Archiver

A Lambda function that archives all messages from the Hummingbird Events SNS topic to S3. Events are stored as clean JSON with decoded payloads, enabling easy browsing with zcat | jq and replay to handlers.

Features

  • S3 Storage: Events stored with time-hierarchical key structure for efficient time-range queries
  • Clean JSON: Decodes gzip+base64 Message payload for human-readable storage
  • Replay Support: Preserves all MessageAttributes with updated content_encoding for replay fidelity
  • Easy Access: Download via aws s3 sync, browse with zcat | jq

Prerequisites

  • AWS CLI configured with appropriate credentials (IAM permissions for Lambda, S3, SNS, CloudFormation)
  • Podman or Docker (for containerized SAM build/deploy)
  • Python 3.11 or later (for development)

Deployment

Build and deploy using containerized AWS SAM CLI:

cd sns-s3-archiver
make build     # Build SAM application
make deploy    # First deployment (interactive/guided)
make redeploy  # Subsequent deployments (non-interactive)

Deployment outputs:

  • BucketName - S3 bucket name for archived events
  • BucketArn - S3 bucket ARN
  • FunctionArn - Lambda function ARN

Parameters

Parameter       Description                              Default
ResourcePrefix  Prefix for all resource names            myapp-prod
SnsTopicArn     ARN of the SNS topic to subscribe to     (required)
SentryDsn       Optional Sentry DSN for error tracking   ""
RetentionDays   Days to retain events (0 = indefinite)   0

Resource naming: Bucket and Lambda names are derived from ResourcePrefix:

  • S3 bucket: {ResourcePrefix}-s3-events
  • Lambda function: {ResourcePrefix}-lambda-s3-archiver

Note: an existing SNS topic is required. Deploy hummingbird-events-topic first to create the topic, then pass its ARN as the SnsTopicArn parameter.

S3 Key Structure

Events are stored with a time-hierarchical key structure optimized for time-range queries:

sns/YYYY/MM/DD/HH/mm/TIMESTAMP#MSGID::source::kind::name.json.gz

Examples:

sns/2025/12/13/12/34/2025-12-13T12:34:56.789Z#abc12345::kubernetes::Snapshot::my-snapshot.json.gz
sns/2025/12/13/12/34/2025-12-13T12:34:56.789Z#def67890::gitlab::push::abc123def456.json.gz

The sns/ prefix separates the new format from legacy data, enabling incremental migration. The :: delimiter separates the metadata fields while avoiding collisions with Kubernetes names, which can legitimately contain --. Embedding the resource name in the key enables efficient "latest state per resource" queries without downloading any objects (see the sketch under Browse Events).

The minute-level subdirectory (/mm/) distributes writes across prefixes for high-throughput scenarios.
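
As a sketch, a key of this shape can be derived from the SNS record's Timestamp, MessageId, and message attributes. The helper below is illustrative (not the deployed handler's code); it slices the ISO 8601 timestamp directly:

def build_key(timestamp: str, message_id: str, source: str, kind: str, name: str) -> str:
    """Build an archive key from a Timestamp like '2025-12-13T12:34:56.789Z'."""
    y, mo, d = timestamp[0:4], timestamp[5:7], timestamp[8:10]
    h, mi = timestamp[11:13], timestamp[14:16]
    short_id = message_id[:8]  # first 8 characters of the SNS MessageId
    return f"sns/{y}/{mo}/{d}/{h}/{mi}/{timestamp}#{short_id}::{source}::{kind}::{name}.json.gz"

build_key("2025-12-13T12:34:56.789Z", "abc12345-1234-5678-9abc-def012345678",
          "kubernetes", "Snapshot", "my-snapshot")
# -> the first example key shown above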

Storage Format

Each S3 object contains a gzip-compressed JSON record:

{
  "MessageId": "abc12345-1234-5678-9abc-def012345678",
  "Timestamp": "2025-12-13T12:34:56.789Z",
  "Message": {
    "apiVersion": "appstudio.redhat.com/v1alpha1",
    "kind": "Snapshot",
    "metadata": { ... },
    "spec": { ... }
  },
  "MessageAttributes": {
    "source": { "Type": "String", "Value": "kubernetes" },
    "kind": { "Type": "String", "Value": "Snapshot" },
    "content_encoding": { "Type": "String", "Value": "json" }
  }
}

Key points:

  • Message is a decoded JSON object (not the original base64+gzip string from SNS)
  • content_encoding is set to "json" to indicate the decoded format
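
For reference, the transform from a live SNS record to the stored record can be sketched in a few lines. Field names follow the example above; this is illustrative, not the deployed handler:

import base64
import gzip
import json

def to_archive_record(sns_record: dict) -> bytes:
    record = {
        "MessageId": sns_record["MessageId"],
        "Timestamp": sns_record["Timestamp"],
        # Decode the gzip+base64 payload into a plain JSON object
        "Message": json.loads(gzip.decompress(base64.b64decode(sns_record["Message"]))),
        "MessageAttributes": dict(sns_record.get("MessageAttributes", {})),
    }
    # Mark the payload as decoded so replay consumers can skip the gzip step
    record["MessageAttributes"]["content_encoding"] = {"Type": "String", "Value": "json"}
    # The whole record is gzip-compressed at the object level before upload
    return gzip.compress(json.dumps(record).encode("utf-8"))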

Usage

Download Events Locally

# Sync entire bucket
aws s3 sync s3://bucket-name/ ./local-events/

# Sync specific time range
aws s3 sync s3://bucket-name/sns/2025/12/13/ ./local-events/sns/2025/12/13/

# Sync with filtering
aws s3 sync s3://bucket-name/ ./local-events/ --exclude "*" --include "*kubernetes*Snapshot*"

Browse Events

# List events for a specific hour
aws s3 ls s3://bucket-name/sns/2025/12/13/12/ --recursive

# View a single event
aws s3 cp s3://bucket-name/sns/2025/12/13/12/34/event.json.gz - | zcat | jq

# Local browsing
zcat ./local-events/sns/2025/12/13/12/34/event.json.gz | jq
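
The resource name embedded in each key (see S3 Key Structure) supports "latest state per resource" lookups from listings alone. A sketch using boto3; the bucket name and prefix are placeholders:

import boto3

s3 = boto3.client("s3")
latest: dict[str, str] = {}
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="bucket-name", Prefix="sns/2025/12/13/"):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        # Key tail: TIMESTAMP#MSGID::source::kind::name.json.gz
        resource = key.split("::", 1)[1].removesuffix(".json.gz")
        # Keys sort lexicographically in chronological order, so the
        # greatest key per resource is its latest archived state
        if resource not in latest or key > latest[resource]:
            latest[resource] = key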

Replay Events

Consumer handlers should support both the live SNS format and the archived format:

import base64
import gzip
import json

def _decode_message(sns_record: dict) -> dict:
    """Decode Message - handles live SNS and archived formats."""
    message = sns_record.get("Message", "")
    encoding = sns_record.get("MessageAttributes", {}).get("content_encoding", {}).get("Value")

    # Archived format: Message is already a decoded JSON object
    if encoding == "json" or isinstance(message, dict):
        return message if isinstance(message, dict) else json.loads(message)

    # Live SNS format: Message is gzip-compressed, then base64-encoded
    if encoding == "gzip+base64":
        return json.loads(gzip.decompress(base64.b64decode(message)))

    # Fallback: plain JSON string
    return json.loads(message)
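
With that decoder in place, replaying a local sync is a short loop. handle_event is a placeholder for your consumer's entry point:

import gzip
import json
from pathlib import Path

def handle_event(payload: dict) -> None:
    print(payload.get("kind"))  # placeholder consumer

for path in sorted(Path("./local-events").rglob("*.json.gz")):
    with gzip.open(path, "rt") as f:
        record = json.load(f)
    handle_event(_decode_message(record))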

Compression Flow

Forwarder → SNS → Archiver → S3

1. Forwarder: gzip(payload) → base64 → SNS Message
2. SNS: Passes through with content_encoding="gzip+base64"
3. Archiver: Decodes Message, sets content_encoding="json", gzip(record) → S3
4. S3: Stores clean JSON record, gzip-compressed at object level
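
Step 1 in miniature (illustrative; the actual forwarder lives elsewhere):

import base64
import gzip
import json

payload = {"kind": "Snapshot", "metadata": {"name": "my-snapshot"}}

# gzip the JSON payload, then base64-encode it for the SNS Message body
sns_message = base64.b64encode(
    gzip.compress(json.dumps(payload).encode("utf-8"))
).decode("ascii")

# When publishing with boto3, attributes use DataType/StringValue;
# Lambda presents inbound records with Type/Value instead
message_attributes = {
    "content_encoding": {"DataType": "String", "StringValue": "gzip+base64"}
}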

Development

See the main README for development workflows.

make setup     # Install dependencies
make check     # Lint code (ruff)
make fmt       # Format code
make test      # Run unit tests
make coverage  # Run tests with coverage

S3 Lifecycle Policies

The bucket has two lifecycle rules:

  • AbortIncompleteUploads: Automatically cleans up incomplete multipart uploads after 1 day (prevents storage cost from failed uploads)
  • ExpireOldEvents: When RetentionDays > 0, automatically deletes objects older than the specified retention period. Disabled when RetentionDays = 0 (indefinite retention, the default).
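
For reference, the two rules correspond to a lifecycle configuration like the following, expressed here with boto3 for illustration (the stack defines them declaratively in its template; the bucket name is a placeholder):

import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="bucket-name",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "AbortIncompleteUploads",
                "Status": "Enabled",
                "Filter": {"Prefix": ""},
                # Drop failed multipart uploads after one day
                "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 1},
            },
            {
                "ID": "ExpireOldEvents",
                "Status": "Enabled",  # disabled when RetentionDays = 0
                "Filter": {"Prefix": ""},
                "Expiration": {"Days": 30},  # illustrative RetentionDays value
            },
        ]
    },
)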

Security & Limitations

Security:

  • S3 bucket blocks all public access by default
  • S3 objects encrypted at rest (AES256)
  • Lambda execution role follows the principle of least privilege (s3:PutObject only)
  • CloudWatch logs capture all archive operations (7-day retention)
  • Sentry integration for error tracking

Limitations:

  • Lambda timeout: 30 seconds
  • Lambda memory: 256 MB
  • S3 object key length: max 1024 bytes
  • Single-region deployment (follows SNS topic region)

License

This project is licensed under the GNU General Public License v3.0 or later - see the LICENSE file for details.