ADR-053: Claim Check TTL and Garbage Collection
Status
Proposed - Pending review
Context
The claim check pattern (RFC-033) stores large payloads in object storage. Without cleanup, storage costs grow without bound as claims accumulate. We need a strategy for:
- Automatic Expiration: Remove claims after consumer retrieval
- Orphan Cleanup: Delete claims from failed/crashed consumers
- Cost Control: Prevent storage bloat from forgotten claims
- Audit Trail: Track claim lifecycle for debugging
- Configurable TTL: Different namespaces have different retention needs
Problem Statement
Scenario 1: Happy Path
Producer → Upload claim (claim valid) → Consumer retrieves → Claim should be deleted (immediate cleanup)
Scenario 2: Consumer Crash
Producer → Upload claim (claim valid) → Consumer crashes (never retrieved) → Claim orphaned (needs TTL cleanup)
Scenario 3: Slow Consumer
Producer → Upload claim (claim valid) → Long processing (claim still valid) → Consumer retrieves → Claim deleted (delayed cleanup)
Scenario 4: Replay/Redelivery
Producer → Upload claim (claim valid) → Consumer retrieves (claim deleted) → Message redelivered → Claim missing (ERROR!)
Requirements
- No Orphans: All claims must eventually expire
- Safe TTL: TTL must account for max consumer processing time
- Immediate Cleanup Option: Delete claim after successful retrieval
- Idempotent Retrieval: Multiple retrievals should work (for redelivery scenarios)
- Cost Effective: Minimize storage costs without breaking functionality
Decision
Use a two-phase TTL strategy: short consumer-driven cleanup + long safety net.
Strategy Overview
┌─────────────────────────────────────────────────────────────┐
│ Claim Lifecycle │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer Upload │
│ ├─ Set bucket lifecycle policy (safety net: 24h) │
│ └─ Store claim reference in message │
│ │
│ Consumer Retrieval │
│ ├─ Download claim payload │
│ ├─ Verify checksum │
│ ├─ Process message │
│ └─ [Optional] Delete claim immediately │
│ │
│ Background Cleanup (if not deleted by consumer) │
│ └─ Bucket lifecycle policy expires claim after 24h │
│ │
└─────────────────────────────────────────────────────────────┘
Configuration Model
# Namespace-level claim check configuration
namespace: video-processing
claim_check:
  enabled: true
  threshold: 1048576  # 1MB

  # TTL Strategy
  ttl:
    # Safety net: hard expiration via bucket lifecycle
    max_age: 86400  # 24 hours

    # Consumer behavior: delete after successful retrieval
    delete_after_read: true  # Default: true

    # Redelivery protection: keep claim briefly after first read
    retention_after_read: 300  # 5 minutes (for message redelivery)

    # Grace period before lifecycle policy kicks in
    # Allows for slow consumers, retries, debugging
    grace_period: 3600  # 1 hour minimum before eligible for cleanup
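For reference, a minimal Go sketch of how this configuration could be represented. Field names mirror the YAML keys and the ClaimCheckTTL/ClaimCheckConfig types used in the implementation below; Bucket and Compression are assumed from that code rather than shown in the YAML example above.

// Sketch of the claim check configuration as Go structs. Field names mirror
// the YAML keys above; Bucket and Compression are assumed from the
// implementation code below rather than the YAML example.
type ClaimCheckTTL struct {
    MaxAge             int  `yaml:"max_age"`              // Safety-net expiration, in seconds
    DeleteAfterRead    bool `yaml:"delete_after_read"`    // Delete immediately after successful retrieval
    RetentionAfterRead int  `yaml:"retention_after_read"` // Seconds to keep the claim after first read
    GracePeriod        int  `yaml:"grace_period"`         // Minimum age before lifecycle cleanup applies
}

type ClaimCheckConfig struct {
    Enabled     bool          `yaml:"enabled"`
    Threshold   int64         `yaml:"threshold"`   // Payload size (bytes) above which claim check applies
    Bucket      string        `yaml:"bucket"`      // Object store bucket holding claims
    Compression string        `yaml:"compression"` // "none", "gzip", ...
    TTL         ClaimCheckTTL `yaml:"ttl"`
}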
Implementation
1. Producer: Set Bucket Lifecycle at Startup
func (p *Producer) Start(ctx context.Context) error {
    // ... existing startup logic

    if p.claimCheck != nil {
        // Ensure bucket exists with lifecycle policy
        if err := p.ensureBucketLifecycle(ctx); err != nil {
            return fmt.Errorf("failed to configure claim check bucket: %w", err)
        }
    }
    return nil
}

func (p *Producer) ensureBucketLifecycle(ctx context.Context) error {
    bucket := p.claimCheck.Bucket

    // Create bucket if needed
    if err := p.objectStore.CreateBucket(ctx, bucket); err != nil {
        return err
    }

    // Convert the configured max_age (seconds) to whole days; S3 lifecycle
    // expiration is day-granular and requires at least 1 day
    days := p.claimCheck.TTL.MaxAge / 86400
    if days < 1 {
        days = 1
    }

    // Set lifecycle policy (idempotent)
    policy := &LifecyclePolicy{
        Rules: []LifecycleRule{
            {
                ID:     "expire-claims",
                Status: "Enabled",
                Filter: &LifecycleFilter{
                    Prefix: p.namespace + "/", // Only claims from this namespace
                },
                Expiration: &LifecycleExpiration{
                    Days: days,
                },
            },
        },
    }
    return p.objectStore.SetBucketLifecycle(ctx, bucket, policy)
}
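A minimal sketch of the lifecycle types assumed by ensureBucketLifecycle; they are presumed to live alongside the object store interface (ADR-052) and map directly onto the S3/MinIO lifecycle schema shown under "Lifecycle Policy Details" below.

// Sketch of the lifecycle types assumed above; they mirror the S3/MinIO
// lifecycle configuration schema.
type LifecyclePolicy struct {
    Rules []LifecycleRule
}

type LifecycleRule struct {
    ID         string
    Status     string // "Enabled" or "Disabled"
    Filter     *LifecycleFilter
    Expiration *LifecycleExpiration
}

type LifecycleFilter struct {
    Prefix string // Restrict the rule to objects under this key prefix
}

type LifecycleExpiration struct {
    Days int // Age in days after which matching objects expire
}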
2. Producer: Upload Claim with Metadata
func (p *Producer) uploadClaim(ctx context.Context, payload []byte) (*ClaimCheckMessage, error) {
    claimID := generateClaimID()
    objectKey := fmt.Sprintf("%s/%s/%s", p.namespace, p.topic, claimID) // p.topic: destination topic (producer field)

    // Compress if configured
    data := payload
    if p.claimCheck.Compression != "none" {
        data = compress(payload, p.claimCheck.Compression)
    }

    // Checksum is always computed over the original (uncompressed) payload
    checksum := sha256.Sum256(payload)
    now := time.Now()

    // Upload with metadata
    metadata := map[string]string{
        "prism-namespace":     p.namespace,
        "prism-created-at":    now.Format(time.RFC3339),
        "prism-original-size": strconv.FormatInt(int64(len(payload)), 10),
        "prism-compression":   p.claimCheck.Compression,
        "prism-checksum":      hex.EncodeToString(checksum[:]),
    }
    if err := p.objectStore.PutWithMetadata(ctx, p.claimCheck.Bucket, objectKey, data, metadata); err != nil {
        return nil, fmt.Errorf("claim upload failed: %w", err)
    }

    return &ClaimCheckMessage{
        ClaimID:      claimID,
        Bucket:       p.claimCheck.Bucket,
        ObjectKey:    objectKey,
        OriginalSize: int64(len(payload)),
        Compression:  p.claimCheck.Compression,
        Checksum:     checksum,
        CreatedAt:    now.Unix(),
        ExpiresAt:    now.Add(time.Duration(p.claimCheck.TTL.MaxAge) * time.Second).Unix(),
    }, nil
}
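The ClaimCheckMessage returned above is the reference embedded in the published message. A sketch of its fields, as assumed by the producer and consumer code in this ADR:

// Sketch of the claim reference embedded in the published message, with field
// types assumed from the surrounding code.
type ClaimCheckMessage struct {
    ClaimID      string
    Bucket       string
    ObjectKey    string
    OriginalSize int64
    Compression  string
    Checksum     [32]byte // SHA-256 of the original (uncompressed) payload
    CreatedAt    int64    // Unix seconds
    ExpiresAt    int64    // CreatedAt + ttl.max_age, Unix seconds
}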
3. Consumer: Retrieve and Conditionally Delete
// ErrClaimExpired is a sentinel error so callers (and the unit tests below)
// can detect expiry with errors.Is.
var ErrClaimExpired = errors.New("claim expired")

func (c *Consumer) retrieveClaim(ctx context.Context, claim *ClaimCheckMessage) ([]byte, error) {
    // Check if claim has expired
    if time.Now().Unix() > claim.ExpiresAt {
        return nil, fmt.Errorf("%w: %s", ErrClaimExpired, claim.ClaimID)
    }

    // Download claim
    data, err := c.objectStore.Get(ctx, claim.Bucket, claim.ObjectKey)
    if err != nil {
        if errors.Is(err, ErrObjectNotFound) {
            return nil, fmt.Errorf("claim not found (may have expired): %s", claim.ClaimID)
        }
        return nil, fmt.Errorf("claim retrieval failed: %w", err)
    }

    // Decompress if needed (the checksum covers the original, uncompressed payload)
    if claim.Compression != "none" {
        data, err = decompress(data, claim.Compression)
        if err != nil {
            return nil, fmt.Errorf("claim decompression failed: %w", err)
        }
    }

    // Verify checksum against the original payload
    actualChecksum := sha256.Sum256(data)
    if !bytes.Equal(actualChecksum[:], claim.Checksum[:]) {
        return nil, fmt.Errorf("claim checksum mismatch: %s", claim.ClaimID)
    }

    // Delete claim based on configuration. retention_after_read takes
    // precedence so redelivery protection still applies when
    // delete_after_read is enabled (see Example 3 below).
    switch {
    case c.claimCheck.TTL.RetentionAfterRead > 0:
        // Option 1: Delayed deletion (for redelivery protection)
        go c.scheduleClaimDeletion(context.Background(), claim,
            time.Duration(c.claimCheck.TTL.RetentionAfterRead)*time.Second)
    case c.claimCheck.TTL.DeleteAfterRead:
        // Option 2: Immediate deletion (default)
        go c.deleteClaim(context.Background(), claim)
    }
    // Otherwise: let the bucket lifecycle policy handle cleanup

    return data, nil
}
func (c *Consumer) deleteClaim(ctx context.Context, claim *ClaimCheckMessage) {
    if err := c.objectStore.Delete(ctx, claim.Bucket, claim.ObjectKey); err != nil {
        // Log but don't fail - lifecycle policy will clean up eventually
        slog.Warn("failed to delete claim after read",
            "claim_id", claim.ClaimID,
            "error", err)

        // Emit metric for monitoring
        c.metrics.ClaimDeleteFailures.Inc()
    } else {
        slog.Debug("claim deleted after retrieval", "claim_id", claim.ClaimID)
        c.metrics.ClaimsDeleted.Inc()
    }
}

func (c *Consumer) scheduleClaimDeletion(ctx context.Context, claim *ClaimCheckMessage, delay time.Duration) {
    time.Sleep(delay)
    c.deleteClaim(ctx, claim)
}
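scheduleClaimDeletion blocks in time.Sleep and ignores its context. A sketch of a cancellation-aware variant (same assumed Consumer fields as above) that can bail out cleanly during consumer shutdown:

// Sketch: delayed deletion that respects context cancellation. Behaviour is
// otherwise the same as scheduleClaimDeletion above; if cancelled, the bucket
// lifecycle policy still cleans the claim up.
func (c *Consumer) scheduleClaimDeletionCtx(ctx context.Context, claim *ClaimCheckMessage, delay time.Duration) {
    timer := time.NewTimer(delay)
    defer timer.Stop()
    select {
    case <-timer.C:
        c.deleteClaim(ctx, claim)
    case <-ctx.Done():
        // Shutting down: skip deletion; the lifecycle policy is the backstop
    }
}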
4. Proxy Validation: TTL Compatibility Check
func (p *Proxy) validateClaimCheckTTL(producerTTL, consumerTTL ClaimCheckTTL) error {
    // Both must use same max age
    if producerTTL.MaxAge != consumerTTL.MaxAge {
        return fmt.Errorf("producer/consumer max_age mismatch: %d != %d",
            producerTTL.MaxAge, consumerTTL.MaxAge)
    }

    // If consumer keeps claims after read, ensure TTL is longer than retention
    if consumerTTL.RetentionAfterRead > 0 {
        if consumerTTL.RetentionAfterRead > producerTTL.MaxAge {
            return fmt.Errorf("retention_after_read (%d) exceeds max_age (%d)",
                consumerTTL.RetentionAfterRead, producerTTL.MaxAge)
        }
    }

    // Warn if delete_after_read differs (not fatal, but inconsistent)
    if producerTTL.DeleteAfterRead != consumerTTL.DeleteAfterRead {
        slog.Warn("producer/consumer delete_after_read mismatch",
            "producer", producerTTL.DeleteAfterRead,
            "consumer", consumerTTL.DeleteAfterRead)
    }
    return nil
}
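As a usage sketch, this check might run when a consumer attaches to a namespace whose producers have already registered a claim check configuration. registerConsumerClaimCheck and namespaceClaimConfig below are illustrative names, not existing proxy APIs:

// Illustrative call site: validate TTL compatibility at consumer registration.
func (p *Proxy) registerConsumerClaimCheck(ctx context.Context, namespace string, consumerCfg ClaimCheckConfig) error {
    producerCfg, ok := p.namespaceClaimConfig(namespace) // hypothetical lookup of the producer-side config
    if !ok {
        return nil // no claim check producers registered for this namespace yet
    }
    if err := p.validateClaimCheckTTL(producerCfg.TTL, consumerCfg.TTL); err != nil {
        return fmt.Errorf("namespace %s: incompatible claim check TTL: %w", namespace, err)
    }
    return nil
}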
TTL Configuration Examples
Example 1: Aggressive Cleanup (Minimize Storage Cost)
claim_check:
  ttl:
    max_age: 3600             # 1 hour safety net
    delete_after_read: true   # Immediate deletion
    retention_after_read: 0   # No redelivery protection
    grace_period: 300         # 5 min before eligible for lifecycle
Use Case: High-throughput, reliable consumers, no message redelivery.
Example 2: Conservative (Handle Slow Consumers)
claim_check:
  ttl:
    max_age: 86400            # 24 hour safety net
    delete_after_read: false  # Let lifecycle policy handle cleanup
    retention_after_read: 0   # N/A (not deleting after read)
    grace_period: 7200        # 2 hours for slow processing
Use Case: Long-running ML processing, batch jobs, debugging.
Example 3: Redelivery Protection (Handle Message Broker Retries)
claim_check:
  ttl:
    max_age: 7200             # 2 hour safety net
    delete_after_read: true   # Delete to save costs
    retention_after_read: 600 # Keep 10 min for retries
    grace_period: 600         # 10 min before eligible
Use Case: NATS/Kafka with message redelivery on failure.
Consequences
Positive
- No Orphaned Claims: Bucket lifecycle policy ensures eventual cleanup
- Cost Optimization: Immediate deletion reduces storage costs
- Flexible: Configuration adapts to different use cases
- Redelivery Safe: Retention window protects against message redelivery edge cases
- Fail-Safe: If consumer deletion fails, lifecycle policy backstops
- Namespace Isolation: Each namespace controls its own TTL policy
- Debugging Friendly: Long TTLs enable post-mortem investigation
Negative
- Complexity: Multiple cleanup mechanisms (consumer + lifecycle)
- Redelivery Edge Case: If claim deleted and message redelivered, consumer fails
- Storage Cost: Retention windows increase storage usage
- Clock Skew: TTL relies on accurate system clocks
- Lifecycle Granularity: S3 lifecycle runs once per day (not immediate)
Neutral
- Configuration Surface: More TTL knobs = more tuning required
- Monitoring Need: Must track claim creation/deletion rates
- Testing Complexity: TTL tests require time simulation
Lifecycle Policy Details
S3/MinIO Lifecycle Behavior
Lifecycle Rules:
<LifecycleConfiguration>
  <Rule>
    <ID>expire-claims</ID>
    <Status>Enabled</Status>
    <Filter>
      <Prefix>video-processing/</Prefix>
    </Filter>
    <Expiration>
      <Days>1</Days>
    </Expiration>
  </Rule>
</LifecycleConfiguration>
Timing:
- S3 processes lifecycle rules once per day (typically midnight UTC)
- Objects become eligible for deletion after the configured Days have passed
- Deletion is not immediate - may take up to 48 hours
- MinIO processes rules hourly (more responsive)
Limitations:
- Cannot set expiration <1 day on S3 (MinIO supports minutes)
- Lifecycle applies to entire bucket or prefix
- Cannot set per-object expiration (workaround: use object tags)
Workaround for Fine-Grained TTL
If a sub-day TTL is needed, use object tagging for fine-grained expiration:
func (p *Producer) uploadClaimWithTag(ctx context.Context, objectKey string, payload []byte) error {
    // Tag the object with its expiration time instead of relying on the
    // day-granular bucket lifecycle policy
    tags := map[string]string{
        "expires-at": strconv.FormatInt(time.Now().Add(1*time.Hour).Unix(), 10),
    }
    if err := p.objectStore.PutWithTags(ctx, p.claimCheck.Bucket, objectKey, payload, tags); err != nil {
        return err
    }

    // A separate cleanup service reads the tags and deletes expired claims
    // (more complex, but enables hour/minute-level TTL)
    return nil
}
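A sketch of the companion cleanup service's sweep pass mentioned above; ListObjects, GetObjectTags, and Delete are assumed methods on the object store abstraction, not a confirmed interface:

// Sketch of a tag-driven cleanup pass for sub-day TTLs.
func (s *CleanupService) sweep(ctx context.Context, bucket, prefix string) error {
    keys, err := s.objectStore.ListObjects(ctx, bucket, prefix)
    if err != nil {
        return err
    }
    now := time.Now().Unix()
    for _, key := range keys {
        tags, err := s.objectStore.GetObjectTags(ctx, bucket, key)
        if err != nil {
            continue // skip for now; the lifecycle safety net still applies
        }
        expiresAt, err := strconv.ParseInt(tags["expires-at"], 10, 64)
        if err != nil || now <= expiresAt {
            continue // untagged or not yet expired
        }
        if err := s.objectStore.Delete(ctx, bucket, key); err != nil {
            slog.Warn("tag-based claim cleanup failed", "key", key, "error", err)
        }
    }
    return nil
}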
Monitoring and Alerting
Metrics to Track
// Claim lifecycle metrics
type ClaimCheckMetrics struct {
    // Producer metrics
    ClaimsCreated       prometheus.Counter
    ClaimUploadDuration prometheus.Histogram
    ClaimUploadBytes    prometheus.Histogram

    // Consumer metrics
    ClaimsRetrieved        prometheus.Counter
    ClaimsDeleted          prometheus.Counter
    ClaimDeleteFailures    prometheus.Counter
    ClaimRetrievalDuration prometheus.Histogram
    ClaimNotFoundErrors    prometheus.Counter   // Expired or missing
    ClaimLifetime          prometheus.Histogram // claim_check_claim_lifetime_seconds (creation to deletion; used by the dashboard below)

    // Lifecycle metrics
    ClaimsExpired  prometheus.Counter // From lifecycle policy
    OrphanedClaims prometheus.Gauge   // Claims > max_age not deleted
}
Alerts
# Alert if claims are not being deleted (storage leak)
- alert: ClaimCheckStorageLeak
  expr: rate(claim_check_claims_created[5m]) > rate(claim_check_claims_deleted[5m]) * 1.5
  for: 30m
  labels:
    severity: warning
  annotations:
    summary: "Claim check storage leak detected"
    description: "Claims being created faster than deleted for {{ $labels.namespace }}"

# Alert if many claims are not found (TTL too short)
- alert: ClaimCheckTTLTooShort
  expr: rate(claim_check_claim_not_found_errors[5m]) > 0.01
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Claim check TTL may be too short"
    description: "Consumers encountering expired claims in {{ $labels.namespace }}"

# Alert if claim deletions are failing
- alert: ClaimCheckDeleteFailures
  expr: rate(claim_check_claim_delete_failures[5m]) > 0.1
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Claim check delete failures"
    description: "Consumer failing to delete claims in {{ $labels.namespace }}"
Dashboard Panels
# Claim creation rate
rate(claim_check_claims_created[5m])
# Claim deletion rate
rate(claim_check_claims_deleted[5m])
# Outstanding claims (created - deleted)
sum(claim_check_claims_created) - sum(claim_check_claims_deleted)
# Median claim lifetime (creation to deletion)
histogram_quantile(0.50, rate(claim_check_claim_lifetime_seconds_bucket[5m]))
# Storage usage (rough estimate)
sum(claim_check_claim_upload_bytes_sum) * (1 - rate(claim_check_claims_deleted[5m]) / rate(claim_check_claims_created[5m]))
Testing Strategy
Unit Tests (Time Simulation)
func TestClaimExpiration(t *testing.T) {
    // config, consumer, largePayload, ctx: shared test fixtures (setup omitted)
    // Use a mock clock so expiry can be simulated without waiting
    clock := clockwork.NewFakeClock()
    producer := NewProducerWithClock(config, clock)

    claim, err := producer.uploadClaim(ctx, largePayload)
    require.NoError(t, err)

    // Advance clock past expiration
    clock.Advance(25 * time.Hour)

    // Consumer should fail to retrieve
    _, err = consumer.retrieveClaim(ctx, claim)
    assert.ErrorIs(t, err, ErrClaimExpired)
}

func TestDeleteAfterRead(t *testing.T) {
    // claim, objectStore, ctx: shared test fixtures (setup omitted)
    config := Config{
        ClaimCheck: &ClaimCheckConfig{
            TTL: ClaimCheckTTL{
                DeleteAfterRead: true,
            },
        },
    }
    consumer := NewConsumer(config)

    _, err := consumer.retrieveClaim(ctx, claim)
    require.NoError(t, err)

    // Wait for async deletion
    time.Sleep(100 * time.Millisecond)

    // Claim should be gone
    exists, err := objectStore.Exists(ctx, claim.Bucket, claim.ObjectKey)
    require.NoError(t, err)
    assert.False(t, exists)
}
Integration Tests (MinIO Lifecycle)
func TestLifecycleCleanup(t *testing.T) {
    // producer, largePayload, ctx: shared test fixtures (setup omitted)
    // Start MinIO with lifecycle enabled
    driver, cleanup := setupMinIOWithLifecycle(t)
    defer cleanup()

    // Upload claim
    claim, err := producer.uploadClaim(ctx, largePayload)
    require.NoError(t, err)

    // Verify claim exists
    exists, _ := driver.Exists(ctx, claim.Bucket, claim.ObjectKey)
    assert.True(t, exists)

    // Fast-forward lifecycle (MinIO test mode can run lifecycle on-demand)
    driver.TriggerLifecycle(ctx, claim.Bucket)

    // Claim should be deleted after lifecycle runs
    exists, _ = driver.Exists(ctx, claim.Bucket, claim.ObjectKey)
    assert.False(t, exists)
}
Alternatives Considered
1. No Automatic Cleanup
Require manual claim deletion by consumers.
Rejected: Error-prone, orphans accumulate, unbounded storage cost.
2. Separate Cleanup Service
Background service scans object store and deletes expired claims.
Rejected: Adds operational complexity, lifecycle policies are simpler.
3. Database-Tracked TTL
Store claim metadata in database with TTL, delete objects based on DB.
Rejected: Adds database dependency, lifecycle policies are native to object stores.
4. Always Delete Immediately
No retention window, delete claim as soon as consumer retrieves.
Rejected: Breaks message redelivery scenarios (NATS retries, Kafka rebalancing).
5. Never Delete Explicitly
Rely entirely on lifecycle policies for cleanup.
Rejected: Storage costs higher, lifecycle granularity insufficient (daily runs).
Open Questions
- Cross-Namespace Claims: Can a producer in namespace A store a claim for a consumer in namespace B?
  - Answer: No - namespace isolation is enforced by the bucket prefix
- Multipart Cleanup: How are abandoned multipart uploads cleaned up?
  - Answer: A separate lifecycle rule aborts incomplete multipart uploads (see the example rule below)
- Claim Reuse: Should we support updating/extending claim TTL?
  - Answer: No - simplicity over flexibility
- Storage Class: Should old claims move to cheaper storage (Glacier)?
  - Answer: Deferred - claims are typically deleted before archival makes sense
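For reference, the multipart cleanup answer refers to an additional lifecycle rule of the following shape, added alongside the expire-claims rule (prefix and age are illustrative):
<Rule>
  <ID>abort-incomplete-multipart</ID>
  <Status>Enabled</Status>
  <Filter>
    <Prefix>video-processing/</Prefix>
  </Filter>
  <AbortIncompleteMultipartUpload>
    <DaysAfterInitiation>1</DaysAfterInitiation>
  </AbortIncompleteMultipartUpload>
</Rule>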
References
Related Documents
- RFC-033: Claim Check Pattern for Large Payloads
- ADR-051: MinIO for Claim Check Testing
- ADR-052: Object Store Interface Design