# SNS S3 Archiver

A Lambda function that archives all messages from the Hummingbird Events SNS
topic to S3. Events are stored as clean JSON with decoded payloads, enabling
easy browsing with `zcat | jq` and replay to handlers.
## Features

- S3 Storage: Events stored with a time-hierarchical key structure for efficient time-range queries
- Clean JSON: Decodes the gzip+base64 `Message` payload for human-readable storage
- Replay Support: Preserves all `MessageAttributes` with an updated `content_encoding` for replay fidelity
- Easy Access: Download via `aws s3 sync`, browse with `zcat | jq`
## Prerequisites
- AWS CLI configured with appropriate credentials (IAM permissions for Lambda, S3, SNS, CloudFormation)
- Podman or Docker (for containerized SAM build/deploy)
- Python 3.11 or later (for development)
## Deployment
Build and deploy using containerized AWS SAM CLI:
```sh
cd sns-s3-archiver
make build     # Build SAM application
make deploy    # First deployment (interactive/guided)
make redeploy  # Subsequent deployments (non-interactive)
```
Deployment outputs:
- `BucketName` - S3 bucket name for archived events
- `BucketArn` - S3 bucket ARN
- `FunctionArn` - Lambda function ARN
## Parameters
| Parameter | Description | Default |
|---|---|---|
| `ResourcePrefix` | Prefix for all resource names | `myapp-prod` |
| `SnsTopicArn` | ARN of the SNS topic to subscribe to | (required) |
| `SentryDsn` | Optional Sentry DSN for error tracking | `""` |
| `RetentionDays` | Days to retain events (0 = indefinite) | `0` |
Resource naming: Bucket and Lambda names are derived from `ResourcePrefix`:

- S3 bucket: `{ResourcePrefix}-s3-events`
- Lambda function: `{ResourcePrefix}-lambda-s3-archiver`
Prerequisites: Requires an existing SNS topic. Deploy
`hummingbird-events-topic` first to create the topic, then use its
ARN for the `SnsTopicArn` parameter.
## S3 Key Structure
Events are stored with a time-hierarchical key structure optimized for time-range queries:
```
sns/YYYY/MM/DD/HH/MM/TIMESTAMP#MSGID::source::kind::name.json.gz
```

Examples:

```
sns/2025/12/13/12/34/2025-12-13T12:34:56.789Z#abc12345::kubernetes::Snapshot::my-snapshot.json.gz
sns/2025/12/13/12/34/2025-12-13T12:34:56.789Z#def67890::gitlab::push::abc123def456.json.gz
```
The `sns/` prefix separates the new format from legacy data, enabling incremental
migration. The `::` delimiter separates metadata fields (avoiding conflicts with K8s
names that contain `--`). The resource name in the key enables efficient "latest state
per resource" queries without downloading all files.
The minute-level subdirectory (/MM/) provides write distribution for high-throughput scenarios.
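
To make the layout concrete, here is a minimal Python sketch of how such a key could be assembled from the SNS record fields. It is illustrative only: the 8-character `MSGID` prefix and the way `name` is derived for each source are assumptions, not necessarily the deployed logic.

```python
from datetime import datetime, timezone


def build_s3_key(message_id: str, timestamp: str, source: str, kind: str, name: str) -> str:
    """Build a time-hierarchical S3 key for an archived SNS event (illustrative sketch)."""
    # SNS timestamps are ISO 8601 UTC, e.g. "2025-12-13T12:34:56.789Z"
    dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
    time_prefix = dt.strftime("sns/%Y/%m/%d/%H/%M")
    short_id = message_id[:8]  # assumed: first 8 characters of the SNS MessageId
    return f"{time_prefix}/{timestamp}#{short_id}::{source}::{kind}::{name}.json.gz"


# build_s3_key("abc12345-1234-5678-9abc-def012345678",
#              "2025-12-13T12:34:56.789Z", "kubernetes", "Snapshot", "my-snapshot")
# -> "sns/2025/12/13/12/34/2025-12-13T12:34:56.789Z#abc12345::kubernetes::Snapshot::my-snapshot.json.gz"
```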
## Storage Format
Each S3 object contains a gzip-compressed JSON record:
```json
{
  "MessageId": "abc12345-1234-5678-9abc-def012345678",
  "Timestamp": "2025-12-13T12:34:56.789Z",
  "Message": {
    "apiVersion": "appstudio.redhat.com/v1alpha1",
    "kind": "Snapshot",
    "metadata": { ... },
    "spec": { ... }
  },
  "MessageAttributes": {
    "source": { "Type": "String", "Value": "kubernetes" },
    "kind": { "Type": "String", "Value": "Snapshot" },
    "content_encoding": { "Type": "String", "Value": "json" }
  }
}
```
Key points:

- `Message` is a decoded JSON object (not the original base64+gzip string from SNS)
- `content_encoding` is set to `"json"` to indicate the decoded format
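
As an illustration of how a record like this could be produced, the sketch below decodes a live SNS record, rewrites `content_encoding`, and writes the gzip-compressed result with a plain boto3 `put_object` call. It is a hedged sketch of the idea, not the deployed handler.

```python
import base64
import gzip
import json

import boto3  # illustrative client usage; the deployed handler may differ

s3 = boto3.client("s3")


def archive_record(sns: dict, bucket: str, key: str) -> None:
    """Decode an SNS record and store it as a clean, gzip-compressed JSON object."""
    # Live SNS delivery carries the payload as gzip+base64; decode it to plain JSON
    payload = json.loads(gzip.decompress(base64.b64decode(sns["Message"])))

    attributes = dict(sns.get("MessageAttributes", {}))
    # Mark the stored copy as already decoded so replay consumers can tell the formats apart
    attributes["content_encoding"] = {"Type": "String", "Value": "json"}

    record = {
        "MessageId": sns["MessageId"],
        "Timestamp": sns["Timestamp"],
        "Message": payload,
        "MessageAttributes": attributes,
    }
    body = gzip.compress(json.dumps(record).encode("utf-8"))
    s3.put_object(Bucket=bucket, Key=key, Body=body,
                  ContentType="application/json", ContentEncoding="gzip")
```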
## Usage
### Download Events Locally
```sh
# Sync entire bucket
aws s3 sync s3://bucket-name/ ./local-events/

# Sync a specific time range
aws s3 sync s3://bucket-name/sns/2025/12/13/ ./local-events/sns/2025/12/13/

# Sync with filtering
aws s3 sync s3://bucket-name/ ./local-events/ --exclude "*" --include "*kubernetes*Snapshot*"
```
### Browse Events
```sh
# List events for a specific hour
aws s3 ls s3://bucket-name/sns/2025/12/13/12/ --recursive

# View a single event
aws s3 cp s3://bucket-name/sns/2025/12/13/12/34/event.json.gz - | zcat | jq

# Local browsing
zcat ./local-events/sns/2025/12/13/12/34/event.json.gz | jq
```
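
Because the resource identity is encoded in the key itself, a locally synced archive can also be reduced to the latest event per resource without opening any files. The sketch below parses filenames according to the key structure described above; `latest_per_resource` is an illustrative helper, not part of this project.

```python
import pathlib


def latest_per_resource(root: str) -> dict[str, pathlib.Path]:
    """Map each source::kind::name to its most recent archived event file."""
    latest: dict[str, pathlib.Path] = {}
    for path in pathlib.Path(root).rglob("*.json.gz"):
        # Filename: TIMESTAMP#MSGID::source::kind::name.json.gz
        stem = path.name[: -len(".json.gz")]
        timestamp_and_id, _, resource = stem.partition("::")
        timestamp = timestamp_and_id.split("#", 1)[0]
        current = latest.get(resource)
        # ISO 8601 timestamps of equal precision sort lexicographically
        if current is None or timestamp > current.name.split("#", 1)[0]:
            latest[resource] = path
    return latest


# for resource, path in latest_per_resource("./local-events").items():
#     print(resource, path)
```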
### Replay Events
Consumer handlers should support both live SNS format and archived format:
```python
import base64
import gzip
import json


def _decode_message(sns_record: dict) -> dict:
    """Decode Message - handles live SNS and archived formats."""
    message = sns_record.get("Message", "")
    encoding = sns_record.get("MessageAttributes", {}).get("content_encoding", {}).get("Value")

    # Archived format: Message is already decoded
    if encoding == "json" or isinstance(message, dict):
        return message if isinstance(message, dict) else json.loads(message)

    # Live SNS format: Message is gzip+base64
    if encoding == "gzip+base64":
        return json.loads(gzip.decompress(base64.b64decode(message)))

    # Fallback: plain JSON string
    return json.loads(message)
```
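
With a decoder like this in place, replaying a local archive amounts to walking the synced files and feeding each record to your consumer. In the sketch below, `handle_event` is a placeholder for whatever entry point your handler exposes.

```python
import gzip
import json
import pathlib


def iter_archived_records(root: str):
    """Yield archived SNS records from a locally synced bucket, oldest first."""
    for path in sorted(pathlib.Path(root).rglob("*.json.gz")):
        with gzip.open(path, "rt", encoding="utf-8") as fh:
            yield json.load(fh)


for record in iter_archived_records("./local-events/sns"):
    payload = _decode_message(record)  # archived format: already-decoded JSON
    handle_event(payload, record["MessageAttributes"])  # hypothetical consumer entry point
```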
## Compression Flow
Forwarder → SNS → Archiver → S3
1. Forwarder: gzip(payload) → base64 → SNS `Message`
2. SNS: Passes through with `content_encoding="gzip+base64"`
3. Archiver: Decodes `Message`, sets `content_encoding="json"`, gzip(record) → S3
4. S3: Stores the clean JSON record, gzip-compressed at the object level
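
For reference, steps 1-2 on the forwarder side could look like the sketch below. This is an assumption about the publishing code, not taken from the forwarder itself; note that `sns.publish` attributes use `DataType`/`StringValue`, while delivered records carry `Type`/`Value`.

```python
import base64
import gzip
import json

import boto3  # illustrative; the real forwarder may publish differently

sns = boto3.client("sns")


def publish_event(topic_arn: str, payload: dict, source: str, kind: str) -> None:
    """Forwarder side of the flow: gzip + base64 the payload and tag its encoding."""
    body = base64.b64encode(gzip.compress(json.dumps(payload).encode("utf-8"))).decode("ascii")
    sns.publish(
        TopicArn=topic_arn,
        Message=body,
        MessageAttributes={
            "source": {"DataType": "String", "StringValue": source},
            "kind": {"DataType": "String", "StringValue": kind},
            "content_encoding": {"DataType": "String", "StringValue": "gzip+base64"},
        },
    )
```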
## Development
See the main README for development workflows.
```sh
make setup     # Install dependencies
make check     # Lint code (ruff)
make fmt       # Format code
make test      # Run unit tests
make coverage  # Run tests with coverage
```
## S3 Lifecycle Policies
The bucket has two lifecycle rules:
- AbortIncompleteUploads: Automatically cleans up incomplete multipart uploads after 1 day (prevents storage cost from failed uploads)
- ExpireOldEvents: When `RetentionDays > 0`, automatically deletes objects older than the specified retention period. Disabled when `RetentionDays = 0` (indefinite retention, the default).
## Security & Limitations
Security:
- S3 bucket blocks all public access by default
- S3 objects encrypted at rest (AES256)
- Lambda follows least privilege principle (PutObject only)
- CloudWatch logs capture all archive operations (7-day retention)
- Sentry integration for error tracking
Limitations:
- Lambda timeout: 30 seconds
- Lambda memory: 256 MB
- S3 object key length: max 1024 bytes
- Single-region deployment (follows SNS topic region)
## License
This project is licensed under the GNU General Public License v3.0 or later - see the LICENSE file for details.