RFC-033: Claim Check Pattern for Large Payloads
Status
Proposed - Design phase, awaiting review
Context
Messaging systems typically have message size limits (NATS: 1MB default, Kafka: 1MB default, Redis: 512MB max). Sending large payloads (videos, images, ML models, datasets) through message queues creates several problems:
- Performance Degradation: Large messages slow down message brokers and increase network congestion
- Memory Pressure: Brokers must buffer large messages, causing memory issues
- Increased Latency: Large message serialization/deserialization adds latency
- Size Limits: Hard limits prevent sending certain payloads
- Cost: Cloud message brokers charge per GB transferred
The Claim Check Pattern solves this by:
- Storing large payloads in object storage (S3, MinIO)
- Sending only a reference (claim check) through the message queue
- Consumer retrieves payload from object storage using the claim check
This is a standard Enterprise Integration Pattern (EIP), with documented implementations for Azure Service Bus, AWS EventBridge, and Google Pub/Sub.
Proposal
Add an optional Claim Check slot to the Producer and Consumer patterns, coordinated through namespace-level requirements.
Architecture
┌─────────────┐
│  Producer   │
│             │
│ 1. Check    │──────┐
│    payload  │      │
│    size     │      │
│             │      ▼
│ 2. Upload   │   ┌──────────────┐
│    if > X   │──▶│ Object Store │
│             │   │  (MinIO/S3)  │
│ 3. Send     │   └──────────────┘
│    claim    │          │
│    check    │          │
└─────────────┘          │
       │                 │
       │  Message        │
       │  (small)        │
       ▼                 │
┌─────────────┐          │
│  Message    │          │
│  Broker     │          │
│  (NATS/     │          │
│  Kafka)     │          │
└─────────────┘          │
       │                 │
       │                 │
       ▼                 │
┌─────────────┐          │
│  Consumer   │          │
│             │          │
│ 1. Receive  │          │
│    message  │          │
│             │          │
│ 2. Check    │          │
│    claim    │          │
│             │          │
│ 3. Download │◀─────────┘
│    if claim │
│    exists   │
│             │
│ 4. Process  │
│    payload  │
└─────────────┘
Namespace-Level Coordination
Producers and consumers in the same namespace share claim check requirements:
namespace: video-processing
claim_check:
  enabled: true
  threshold: 1048576                    # 1MB - messages larger trigger claim check
  backend: minio
  bucket: prism-claims-video-processing
  ttl: 3600                             # Claim expires after 1 hour
  compression: gzip                     # Optional compression before upload
The proxy validates that:
- Producer and consumer claim check configurations match
- Both have access to the same object store backend
- Bucket exists and is accessible
- TTL policies are compatible
Producer Behavior
// Producer configuration with claim check slot
type Config struct {
    Name       string
    Behavior   BehaviorConfig
    Slots      SlotConfig
    ClaimCheck *ClaimCheckConfig // NEW: Optional claim check
}

type ClaimCheckConfig struct {
    Enabled         bool
    Threshold       int64  // Bytes - payloads > threshold use claim check
    Backend         string // "minio", "s3", "gcs", etc.
    Bucket          string
    TTL             int    // Seconds - how long claim is valid
    Compression     string // "none", "gzip", "zstd"
    DeleteAfterRead bool   // Consumer-side: delete claim after successful retrieval
}

// Producer slots with optional object store
type SlotConfig struct {
    MessageSink string // Required: NATS, Kafka, Redis
    StateStore  string // Optional: for deduplication
    ObjectStore string // Optional: for claim check
}
Publish Flow:
func (p *Producer) Publish(ctx context.Context, topic string, payload []byte, metadata map[string]string) error {
    // Check if payload exceeds threshold
    if p.claimCheck != nil && int64(len(payload)) > p.claimCheck.Threshold {
        // 1. Compress if configured
        data := payload
        if p.claimCheck.Compression != "none" {
            data = compress(payload, p.claimCheck.Compression)
        }

        // 2. Upload to object store
        claimID := generateClaimID()
        objectKey := fmt.Sprintf("%s/%s/%s", p.namespace, topic, claimID)
        if err := p.objectStore.Put(ctx, p.claimCheck.Bucket, objectKey, data); err != nil {
            return fmt.Errorf("claim check upload failed: %w", err)
        }

        // 3. Set TTL for automatic cleanup
        if p.claimCheck.TTL > 0 {
            if err := p.objectStore.SetTTL(ctx, p.claimCheck.Bucket, objectKey, p.claimCheck.TTL); err != nil {
                // Non-fatal: log warning and continue
                slog.Warn("failed to set claim check TTL", "error", err)
            }
        }

        // 4. Send small message with claim reference
        checksum := sha256.Sum256(payload) // SHA-256 of the uncompressed payload
        claimPayload := ClaimCheckMessage{
            ClaimID:      claimID,
            Bucket:       p.claimCheck.Bucket,
            ObjectKey:    objectKey,
            OriginalSize: int64(len(payload)),
            Compression:  p.claimCheck.Compression,
            ContentType:  metadata["content-type"],
            Checksum:     checksum[:],
        }
        smallPayload, err := json.Marshal(claimPayload)
        if err != nil {
            return fmt.Errorf("claim check marshal failed: %w", err)
        }
        if metadata == nil {
            metadata = map[string]string{}
        }
        metadata["prism-claim-check"] = "true"
        return p.messageSink.Publish(ctx, topic, smallPayload, metadata)
    }

    // Normal path: small payload sent directly
    return p.messageSink.Publish(ctx, topic, payload, metadata)
}
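The flow above assumes a generateClaimID helper that is not specified elsewhere in this RFC. A minimal sketch using random identifiers is shown below; a content-addressed variant (hex of the payload's SHA-256) is the alternative discussed under Open Questions.

// generateClaimID returns a random, URL-safe claim identifier.
// Sketch only: uses crypto/rand and encoding/hex from the standard library.
func generateClaimID() string {
    buf := make([]byte, 16)
    if _, err := rand.Read(buf); err != nil {
        panic(err) // crypto/rand failure is not recoverable
    }
    return hex.EncodeToString(buf)
}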
Consumer Behavior
// Consumer configuration with claim check slot
type Config struct {
    Name       string
    Behavior   BehaviorConfig
    Slots      SlotConfig
    ClaimCheck *ClaimCheckConfig // NEW: Optional claim check
}

// Consumer slots with optional object store
type SlotConfig struct {
    MessageSource string // Required: NATS, Kafka, Redis
    StateStore    string // Optional: for offset tracking
    ObjectStore   string // Optional: for claim check
}
Consumption Flow:
func (c *Consumer) processMessage(ctx context.Context, msg *plugin.PubSubMessage) error {
    // Check if message is a claim check
    if msg.Metadata["prism-claim-check"] == "true" {
        // 1. Deserialize claim check
        var claim ClaimCheckMessage
        if err := json.Unmarshal(msg.Payload, &claim); err != nil {
            return fmt.Errorf("invalid claim check message: %w", err)
        }

        // 2. Download from object store
        data, err := c.objectStore.Get(ctx, claim.Bucket, claim.ObjectKey)
        if err != nil {
            return fmt.Errorf("claim check download failed: %w", err)
        }

        // 3. Decompress if needed
        if claim.Compression != "none" {
            data, err = decompress(data, claim.Compression)
            if err != nil {
                return fmt.Errorf("claim check decompression failed: %w", err)
            }
        }

        // 4. Verify checksum (SHA-256 of the uncompressed payload, so check after decompression)
        actualChecksum := sha256.Sum256(data)
        if !bytes.Equal(actualChecksum[:], claim.Checksum) {
            return fmt.Errorf("claim check checksum mismatch")
        }

        // 5. Replace payload with actual data
        msg.Payload = data
        msg.Metadata["content-type"] = claim.ContentType
        delete(msg.Metadata, "prism-claim-check")

        // 6. Optional: Delete claim after successful retrieval
        if c.claimCheck != nil && c.claimCheck.DeleteAfterRead {
            go func() {
                // Detached context: deletion should not be cancelled with the message context
                if err := c.objectStore.Delete(context.Background(), claim.Bucket, claim.ObjectKey); err != nil {
                    slog.Warn("failed to delete claim after read", "error", err)
                }
            }()
        }
    }

    // Process message (with original or retrieved payload)
    return c.processor(ctx, msg)
}
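Both flows assume compress and decompress helpers. A minimal gzip-only sketch using the standard library is shown below; zstd support would require a third-party codec and is omitted here. The helper signatures match how the producer and consumer flows call them.

// compress returns the gzip-compressed payload; other codec names fall through unchanged.
func compress(data []byte, codec string) []byte {
    if codec != "gzip" {
        return data
    }
    var buf bytes.Buffer
    w := gzip.NewWriter(&buf)
    w.Write(data) // writes to a bytes.Buffer cannot fail
    w.Close()
    return buf.Bytes()
}

// decompress reverses compress for the gzip codec.
func decompress(data []byte, codec string) ([]byte, error) {
    if codec != "gzip" {
        return data, nil
    }
    r, err := gzip.NewReader(bytes.NewReader(data))
    if err != nil {
        return nil, fmt.Errorf("gzip reader: %w", err)
    }
    defer r.Close()
    return io.ReadAll(r)
}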
Message Format
Claim Check Message:
message ClaimCheckMessage {
  // Unique claim identifier
  string claim_id = 1;

  // Object store location
  string bucket = 2;
  string object_key = 3;

  // Metadata about original payload
  int64 original_size = 4;
  string content_type = 5;
  bytes checksum = 6; // SHA-256 of uncompressed payload

  // Compression info
  string compression = 7; // "none", "gzip", "zstd"

  // Expiration
  int64 expires_at = 8; // Unix timestamp

  // Optional: multipart for very large files
  optional MultipartInfo multipart = 9;
}

message MultipartInfo {
  int32 part_count = 1;
  repeated string part_keys = 2;
}
Proxy Validation
The proxy validates claim check coordination at pattern registration:
// When producer registers
func (p *Proxy) RegisterProducer(ctx context.Context, req *RegisterRequest) error {
    // Load namespace configuration
    ns := p.getNamespace(req.Namespace)

    // If namespace requires claim check, validate producer config
    if ns.ClaimCheck.Required {
        if req.Config.ClaimCheck == nil {
            return status.Error(codes.FailedPrecondition,
                "namespace requires claim check but producer does not support it")
        }

        // Validate configuration matches namespace requirements
        if err := validateClaimCheckConfig(req.Config.ClaimCheck, ns.ClaimCheck); err != nil {
            return status.Errorf(codes.InvalidArgument,
                "claim check config mismatch: %v", err)
        }

        // Verify object store backend is accessible
        if err := p.verifyObjectStoreAccess(ctx, req.Config.ClaimCheck); err != nil {
            return status.Errorf(codes.FailedPrecondition,
                "object store not accessible: %v", err)
        }
    }

    return nil
}
// Similar validation for consumer registration
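A minimal sketch of validateClaimCheckConfig, implementing the namespace-level checks listed earlier. The NamespaceClaimCheck type is a hypothetical view of the namespace settings from the YAML example; the exact type is not specified in this RFC.

// NamespaceClaimCheck is a hypothetical representation of namespace-level requirements.
type NamespaceClaimCheck struct {
    Required  bool
    Threshold int64
    Backend   string
    Bucket    string
    MaxTTL    int // longest claim TTL the namespace allows
}

// validateClaimCheckConfig checks that a pattern's claim check configuration
// is compatible with the namespace requirements.
func validateClaimCheckConfig(cfg *ClaimCheckConfig, ns NamespaceClaimCheck) error {
    if !cfg.Enabled {
        return fmt.Errorf("claim check must be enabled")
    }
    if cfg.Backend != ns.Backend {
        return fmt.Errorf("backend %q does not match namespace backend %q", cfg.Backend, ns.Backend)
    }
    if cfg.Bucket != ns.Bucket {
        return fmt.Errorf("bucket %q does not match namespace bucket %q", cfg.Bucket, ns.Bucket)
    }
    if cfg.Threshold != ns.Threshold {
        return fmt.Errorf("threshold %d does not match namespace threshold %d", cfg.Threshold, ns.Threshold)
    }
    if ns.MaxTTL > 0 && cfg.TTL > ns.MaxTTL {
        return fmt.Errorf("ttl %ds exceeds namespace maximum %ds", cfg.TTL, ns.MaxTTL)
    }
    return nil
}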
Object Store Interface
// ObjectStoreInterface defines operations needed for claim check
type ObjectStoreInterface interface {
    // Put stores an object
    Put(ctx context.Context, bucket, key string, data []byte) error

    // Get retrieves an object
    Get(ctx context.Context, bucket, key string) ([]byte, error)

    // Delete removes an object
    Delete(ctx context.Context, bucket, key string) error

    // SetTTL sets object expiration
    SetTTL(ctx context.Context, bucket, key string, ttlSeconds int) error

    // Exists checks if object exists
    Exists(ctx context.Context, bucket, key string) (bool, error)

    // GetMetadata retrieves object metadata without downloading
    GetMetadata(ctx context.Context, bucket, key string) (*ObjectMetadata, error)
}

type ObjectMetadata struct {
    Size         int64
    ContentType  string
    LastModified time.Time
    ETag         string
}
MinIO Driver for Testing
For acceptance testing, we'll use MinIO (S3-compatible) via testcontainers:
// pkg/drivers/minio/driver.go
type MinioDriver struct {
    client *minio.Client
    config MinioConfig
}

func (d *MinioDriver) Put(ctx context.Context, bucket, key string, data []byte) error {
    _, err := d.client.PutObject(ctx, bucket, key,
        bytes.NewReader(data), int64(len(data)),
        minio.PutObjectOptions{})
    return err
}

func (d *MinioDriver) Get(ctx context.Context, bucket, key string) ([]byte, error) {
    obj, err := d.client.GetObject(ctx, bucket, key, minio.GetObjectOptions{})
    if err != nil {
        return nil, err
    }
    defer obj.Close()
    return io.ReadAll(obj)
}
// ... other methods
Acceptance Test Setup:
// tests/acceptance/backends/minio.go
func init() {
    framework.MustRegisterBackend(framework.Backend{
        Name:      "MinIO",
        SetupFunc: setupMinIO,
        SupportedPatterns: []framework.Pattern{
            framework.PatternObjectStore, // New pattern
        },
        Capabilities: framework.Capabilities{
            SupportsObjectStore: true,
            MaxObjectSize:       5 * 1024 * 1024 * 1024, // 5GB
        },
    })
}

func setupMinIO(t *testing.T, ctx context.Context) (interface{}, func()) {
    // Start MinIO testcontainer
    minioContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: testcontainers.ContainerRequest{
            Image:        "minio/minio:latest",
            ExposedPorts: []string{"9000/tcp"},
            Env: map[string]string{
                "MINIO_ROOT_USER":     "minioadmin",
                "MINIO_ROOT_PASSWORD": "minioadmin",
            },
            Cmd:        []string{"server", "/data"},
            WaitingFor: wait.ForHTTP("/minio/health/live").WithPort("9000/tcp"),
        },
        Started: true,
    })
    require.NoError(t, err)

    // Get connection details
    endpoint, err := minioContainer.Endpoint(ctx, "")
    require.NoError(t, err)

    // Create MinIO driver
    driver := minio.New()
    config := &plugin.Config{
        Plugin: plugin.PluginConfig{
            Name:    "minio-test",
            Version: "0.1.0",
        },
        Backend: map[string]any{
            "endpoint":   endpoint,
            "access_key": "minioadmin",
            "secret_key": "minioadmin",
            "use_ssl":    false,
        },
    }
    err = driver.Initialize(ctx, config)
    require.NoError(t, err)
    err = driver.Start(ctx)
    require.NoError(t, err)

    cleanup := func() {
        driver.Stop(ctx)
        minioContainer.Terminate(ctx)
    }
    return driver, cleanup
}
Acceptance Test Scenarios
// tests/acceptance/patterns/claimcheck/claimcheck_test.go
func TestClaimCheckPattern(t *testing.T) {
    tests := []framework.MultiPatternTest{
        {
            Name: "LargePayloadClaimCheck",
            RequiredPatterns: map[string]framework.Pattern{
                "producer":    framework.PatternProducer,
                "consumer":    framework.PatternConsumer,
                "objectstore": framework.PatternObjectStore,
            },
            Func:    testLargePayloadClaimCheck,
            Timeout: 60 * time.Second,
            Tags:    []string{"claim-check", "large-payload"},
        },
        {
            Name: "ThresholdBoundary",
            RequiredPatterns: map[string]framework.Pattern{
                "producer":    framework.PatternProducer,
                "consumer":    framework.PatternConsumer,
                "objectstore": framework.PatternObjectStore,
            },
            Func:    testThresholdBoundary,
            Timeout: 30 * time.Second,
            Tags:    []string{"claim-check", "boundary"},
        },
        {
            Name: "Compression",
            RequiredPatterns: map[string]framework.Pattern{
                "producer":    framework.PatternProducer,
                "consumer":    framework.PatternConsumer,
                "objectstore": framework.PatternObjectStore,
            },
            Func:    testCompression,
            Timeout: 30 * time.Second,
            Tags:    []string{"claim-check", "compression"},
        },
        {
            Name: "TTLExpiration",
            RequiredPatterns: map[string]framework.Pattern{
                "producer":    framework.PatternProducer,
                "consumer":    framework.PatternConsumer,
                "objectstore": framework.PatternObjectStore,
            },
            Func:    testTTLExpiration,
            Timeout: 45 * time.Second,
            Tags:    []string{"claim-check", "ttl"},
        },
        {
            Name: "ChecksumValidation",
            RequiredPatterns: map[string]framework.Pattern{
                "producer":    framework.PatternProducer,
                "consumer":    framework.PatternConsumer,
                "objectstore": framework.PatternObjectStore,
            },
            Func:    testChecksumValidation,
            Timeout: 30 * time.Second,
            Tags:    []string{"claim-check", "security"},
        },
    }
    framework.RunMultiPatternTests(t, tests)
}
func testLargePayloadClaimCheck(t *testing.T, drivers map[string]interface{}, caps framework.Capabilities) {
    ctx := context.Background()

    // Setup producer and consumer with claim check
    prod := setupProducerWithClaimCheck(t, ctx, drivers["producer"], drivers["objectstore"])
    cons := setupConsumerWithClaimCheck(t, ctx, drivers["consumer"], drivers["objectstore"])

    // Generate 5MB payload (exceeds 1MB threshold)
    largePayload := make([]byte, 5*1024*1024)
    rand.Read(largePayload)

    // Publish
    err := prod.Publish(ctx, "test-topic", largePayload, map[string]string{
        "content-type": "application/octet-stream",
    })
    require.NoError(t, err)

    // Consumer should receive full payload via claim check
    received := <-cons.Messages()
    assert.Equal(t, len(largePayload), len(received.Payload))
    assert.Equal(t, largePayload, received.Payload)
}
func testThresholdBoundary(t *testing.T, drivers map[string]interface{}, caps framework.Capabilities) {
    ctx := context.Background()
    prod := setupProducerWithClaimCheck(t, ctx, drivers["producer"], drivers["objectstore"])
    cons := setupConsumerWithClaimCheck(t, ctx, drivers["consumer"], drivers["objectstore"])

    // Payload at the threshold (should NOT use claim check; only larger payloads trigger it)
    smallPayload := make([]byte, 1048576) // exactly 1MB
    err := prod.Publish(ctx, "test-topic", smallPayload, nil)
    require.NoError(t, err)

    msg1 := <-cons.Messages()
    assert.Empty(t, msg1.Metadata["prism-claim-check"])

    // Payload just over the threshold (should use claim check)
    largePayload := make([]byte, 1048577) // 1MB + 1 byte
    err = prod.Publish(ctx, "test-topic", largePayload, nil)
    require.NoError(t, err)

    msg2 := <-cons.Messages()
    assert.Equal(t, largePayload, msg2.Payload)
}
Benefits
- Handles Large Payloads: No message broker size limits
- Reduces Broker Load: Small messages flow through queue
- Better Performance: Less serialization/deserialization overhead
- Cost Optimization: Object storage cheaper than message transfer
- Automatic Cleanup: TTL-based claim expiration prevents storage bloat
- Transparent: Application code unchanged, pattern handles complexity
- Namespace Coordination: Proxy validates producer/consumer compatibility
Trade-offs
Advantages
- Decouples message flow from payload storage
- Scales to multi-GB payloads
- Reduces network congestion
- Automatic garbage collection via TTL
Disadvantages
- Increased latency (additional network hop to object store)
- More infrastructure dependencies (object store required)
- Potential consistency issues if claim expires before consumption
- Additional operational complexity
Migration Path
- Phase 1: Implement object store interface and MinIO driver
- Phase 2: Add claim check support to producer pattern
- Phase 3: Add claim check support to consumer pattern
- Phase 4: Add namespace validation in proxy
- Phase 5: Create acceptance tests with MinIO
- Phase 6: Document pattern and create examples
Alternatives Considered
1. Inline Chunking
Break large messages into multiple small messages.
Rejected: Adds complexity to consumer (reassembly), doesn't reduce broker load proportionally.
2. Separate Large Message Queue
Use different queue for large messages.
Rejected: Requires dual-queue management, ordering issues, more complex routing.
3. Always Use Object Store
Store all payloads in object store, regardless of size.
Rejected: Unnecessary overhead for small messages, increased latency.
Open Questions
- Multipart Upload: Should we support multipart uploads for very large payloads (>5GB)?
- Encryption: Should claims be encrypted in object store? (Separate RFC)
- Bandwidth Throttling: Should we rate-limit object store operations?
- Cross-Region: How do claims work in multi-region deployments?
- Claim ID Collision: Use UUID v4 or content-addressed (hash-based)?
References
- Enterprise Integration Patterns: Claim Check
- Azure Service Bus: Send large messages using claim check pattern
- AWS EventBridge: Claim check pattern
- MinIO Documentation
- S3 API Reference
Related Documents
- ADR-051: MinIO for Claim Check Testing (to be created)
- ADR-052: Object Store Interface Design (to be created)
- ADR-053: Claim Check TTL and Garbage Collection (to be created)
- RFC-031: Message Envelope Protocol (encryption)
- RFC-008: Proxy-Plugin Architecture