RFC-037: Mailbox Pattern - Searchable Event Store

Summary

The Mailbox Pattern provides a searchable, persistent event store by consuming messages from a queue and storing them in a structured database with indexed headers and blob bodies. The pattern consumes messages in the universal PrismEnvelope format (RFC-031), extracts indexed headers from envelope metadata fields, and stores the complete envelope as an opaque blob for later reconstruction. This enables efficient querying by metadata while preserving full message context.

Motivation

Use Cases

  1. Audit Logging: Store all system events with searchable metadata (user, action, resource) while keeping PII encrypted
  2. Email/Message Archives: Store communications with searchable headers (from, to, subject, date) and encrypted bodies
  3. Event Sourcing: Capture all domain events with indexed event types, aggregates, and timestamps
  4. System Observability: Archive traces, logs, and metrics with searchable dimensions
  5. Compliance: Retain records with searchable metadata while protecting sensitive payload data

Problem Statement

Existing patterns lack a unified solution for:

  • Indexed Search: Query events by metadata without scanning all messages
  • Encrypted Bodies: Store sensitive payloads securely while maintaining header searchability
  • Schema Evolution: Handle varying header schemas across different event types
  • Pluggable Storage: Decouple pattern logic from storage backend (SQLite, PostgreSQL, ClickHouse)

Design

Architecture

┌────────────────────────────────────────────────────────────────┐
│                  Mailbox Pattern (Composite)                   │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  ┌─────────────┐       ┌──────────────┐       ┌─────────────┐  │
│  │   Message   │       │    Table     │       │    Table    │  │
│  │  Consumer   │──────▶│    Writer    │       │   Reader    │  │
│  │    Slot     │       │     Slot     │       │    Slot     │  │
│  └─────────────┘       └──────────────┘       └─────────────┘  │
│         │                     │                      ▲         │
│         │                     │                      │         │
│         │                     ▼                      │         │
│         │             ┌────────────────┐             │         │
│         │             │   SQLite DB    │─────────────┘         │
│         │             │  (Headers +    │                       │
│         │             │     Blob)      │                       │
│         │             └────────────────┘                       │
│         │                                                      │
│         ▼                                                      │
│  Extract Headers → Index Columns                               │
│  Store Body → Blob Column                                      │
│  Query Interface → Returns MailboxItem[]                       │
│                                                                │
└────────────────────────────────────────────────────────────────┘

Slot Architecture

The Mailbox Pattern has three backend slots:

Slot 1: Message Source (Queue Consumer)

  • Interface: QueueInterface or PubSubInterface
  • Purpose: Consume events from messaging backend
  • Implementations: NATS, Kafka, Redis Streams, RabbitMQ
  • Configuration: topic, consumer group, batch size

Slot 2: Storage Backend (Table Writer)

  • Interface: TableWriterInterface (new)
  • Purpose: Persist events with indexed headers
  • Implementations: SQLite, PostgreSQL, ClickHouse
  • Configuration: table name, indexed columns, retention policy

Slot 3: Query Interface (Table Reader)

  • Interface: TableReaderInterface (new)
  • Purpose: Retrieve stored messages as array of MailboxItem (header + payload)
  • Implementations: SQLite, PostgreSQL, ClickHouse (same backends as writer)
  • Configuration: shared database connection with writer slot

Message Structure

Messages consumed from the queue use the PrismEnvelope format defined in RFC-031:

// PrismEnvelope wraps all messages (pub/sub, queue, mailbox)
message PrismEnvelope {
  int32 envelope_version = 1;
  PrismMetadata metadata = 2;
  google.protobuf.Any payload = 3;
  SecurityContext security = 4;
  ObservabilityContext observability = 5;
  SchemaContext schema = 6;
  map<string, bytes> extensions = 99;
}

See RFC-031 for complete envelope specification.

Header Extraction and Indexing

The pattern extracts indexed headers from PrismEnvelope fields (not from a flat metadata map):

Standard Indexed Headers (from PrismEnvelope):

| Indexed Column | Envelope Field                | Purpose                         |
|----------------|-------------------------------|---------------------------------|
| message_id     | metadata.message_id           | Unique identifier (unique key)  |
| timestamp      | metadata.published_at         | Time range queries              |
| topic          | metadata.topic                | Topic filter queries            |
| content_type   | metadata.content_type         | Payload format                  |
| schema_url     | schema.schema_url             | Schema registry reference       |
| schema_version | schema.schema_version         | Schema version                  |
| encryption     | security.encryption.algorithm | Encryption algorithm            |
| correlation_id | metadata.correlation_id       | Request tracing                 |
| principal      | security.publisher_id         | Identity-based queries          |
| namespace      | metadata.namespace            | Multi-tenancy isolation         |

Observability Headers (optional indexing):

  • observability.trace_id → Distributed trace reconstruction
  • observability.span_id → Trace span identification
  • observability.labels → Custom metric dimensions

Custom Headers:

  • extensions map → Flexible JSON storage for application-specific metadata
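
The mapping from the extensions map to a JSON text column could look roughly like the sketch below. The function name encodeCustomHeaders and the "base64:" prefix for non-UTF-8 values are assumptions made for illustration; the RFC does not specify how binary extension values are represented.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"unicode/utf8"
)

// encodeCustomHeaders turns an extensions-style map (string -> raw bytes)
// into a JSON object suitable for a TEXT column such as custom_headers.
// Assumption: values that are not valid UTF-8 are stored base64-encoded
// with a "base64:" prefix. This convention is hypothetical.
func encodeCustomHeaders(extensions map[string][]byte) (string, error) {
	headers := make(map[string]string, len(extensions))
	for key, value := range extensions {
		if utf8.Valid(value) {
			headers[key] = string(value)
		} else {
			headers[key] = "base64:" + base64.StdEncoding.EncodeToString(value)
		}
	}
	// json.Marshal emits map keys in sorted order, so output is deterministic.
	out, err := json.Marshal(headers)
	if err != nil {
		return "", err
	}
	return string(out), nil
}
```

Keeping binary values distinguishable avoids silently replacing invalid bytes when the map is serialized, at the cost of a convention that readers of the column must know about.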

Table Schema

Default SQLite table schema:

CREATE TABLE IF NOT EXISTS mailbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    -- Indexed headers (extracted from PrismEnvelope)
    message_id TEXT NOT NULL UNIQUE,
    timestamp INTEGER NOT NULL, -- Unix millis
    topic TEXT NOT NULL,
    content_type TEXT,
    schema_url TEXT,
    schema_version TEXT,
    encryption TEXT,
    correlation_id TEXT,
    principal TEXT,
    namespace TEXT,

    -- Observability (optional indexed columns)
    trace_id TEXT,
    span_id TEXT,

    -- Custom headers (JSON for flexibility)
    custom_headers TEXT, -- JSON map from extensions field

    -- Full envelope as blob (for complete reconstruction)
    envelope_blob BLOB NOT NULL,

    -- Metadata
    created_at INTEGER NOT NULL
);

-- Indexes for common queries.
-- SQLite does not accept INDEX clauses inside CREATE TABLE,
-- so each index is created with its own statement.
CREATE INDEX IF NOT EXISTS idx_timestamp ON mailbox (timestamp);
CREATE INDEX IF NOT EXISTS idx_topic ON mailbox (topic);
CREATE INDEX IF NOT EXISTS idx_principal ON mailbox (principal);
CREATE INDEX IF NOT EXISTS idx_correlation_id ON mailbox (correlation_id);
CREATE INDEX IF NOT EXISTS idx_trace_id ON mailbox (trace_id);

Key Design Decision: Store the complete PrismEnvelope as envelope_blob to enable full message reconstruction. Indexed columns provide fast queries, while the blob preserves all envelope context (security, observability, schema).

Backend Interfaces

Two new backend interfaces for structured storage:

TableWriterInterface

// TableWriterInterface defines operations for writing structured events
type TableWriterInterface interface {
	// WriteEvent stores an event with indexed headers and body
	WriteEvent(ctx context.Context, event *MailboxItem) error

	// DeleteOldEvents removes events older than retention period
	DeleteOldEvents(ctx context.Context, olderThan int64) (int64, error)

	// GetTableStats returns storage statistics
	GetTableStats(ctx context.Context) (*TableStats, error)
}

TableReaderInterface

// TableReaderInterface defines operations for reading structured events
type TableReaderInterface interface {
	// QueryEvents retrieves events matching filter criteria
	// Returns messages as array of MailboxItem (header + payload)
	QueryEvents(ctx context.Context, filter *EventFilter) ([]*MailboxItem, error)

	// GetEvent retrieves a single event by message ID
	GetEvent(ctx context.Context, messageID string) (*MailboxItem, error)

	// GetTableStats returns storage statistics
	GetTableStats(ctx context.Context) (*TableStats, error)
}

Shared Types

// MailboxItem represents a structured event for storage
// Fields are extracted from PrismEnvelope for indexed queries
type MailboxItem struct {
	// Indexed columns (from PrismEnvelope)
	MessageID     string // metadata.message_id
	Timestamp     int64  // metadata.published_at (Unix millis)
	Topic         string // metadata.topic
	ContentType   string // metadata.content_type
	SchemaURL     string // schema.schema_url
	SchemaVersion string // schema.schema_version
	Encryption    string // security.encryption.algorithm
	CorrelationID string // metadata.correlation_id
	Principal     string // security.publisher_id
	Namespace     string // metadata.namespace

	// Observability (optional indexed columns)
	TraceID string // observability.trace_id
	SpanID  string // observability.span_id

	// Custom headers (from extensions map)
	CustomHeaders map[string]string // extensions field

	// Full envelope blob (complete PrismEnvelope protobuf bytes)
	EnvelopeBlob []byte
}

// ExtractMailboxItem converts PrismEnvelope to MailboxItem.
// It uses the generated protobuf getters, which are nil-safe, so optional
// sub-messages (schema, security, observability) may be absent without
// causing a nil pointer dereference.
func ExtractMailboxItem(envelope *PrismEnvelope) (*MailboxItem, error) {
	if envelope == nil {
		return nil, fmt.Errorf("envelope is nil")
	}

	envelopeBytes, err := proto.Marshal(envelope)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal envelope: %w", err)
	}

	customHeaders := make(map[string]string)
	for k, v := range envelope.GetExtensions() {
		customHeaders[k] = string(v)
	}

	metadata := envelope.GetMetadata()
	return &MailboxItem{
		MessageID:     metadata.GetMessageId(),
		Timestamp:     metadata.GetPublishedAt().AsTime().UnixMilli(),
		Topic:         metadata.GetTopic(),
		ContentType:   metadata.GetContentType(),
		SchemaURL:     envelope.GetSchema().GetSchemaUrl(),
		SchemaVersion: envelope.GetSchema().GetSchemaVersion(),
		Encryption:    envelope.GetSecurity().GetEncryption().GetAlgorithm(),
		CorrelationID: metadata.GetCorrelationId(),
		Principal:     envelope.GetSecurity().GetPublisherId(),
		Namespace:     metadata.GetNamespace(),
		TraceID:       envelope.GetObservability().GetTraceId(),
		SpanID:        envelope.GetObservability().GetSpanId(),
		CustomHeaders: customHeaders,
		EnvelopeBlob:  envelopeBytes,
	}, nil
}

// ReconstructEnvelope deserializes the full PrismEnvelope from blob
func (me *MailboxItem) ReconstructEnvelope() (*PrismEnvelope, error) {
	envelope := &PrismEnvelope{}
	if err := proto.Unmarshal(me.EnvelopeBlob, envelope); err != nil {
		return nil, fmt.Errorf("failed to unmarshal envelope: %w", err)
	}
	return envelope, nil
}

// EventFilter defines query criteria
type EventFilter struct {
	StartTime     *time.Time
	EndTime       *time.Time
	Topics        []string
	Principals    []string
	CorrelationID *string
	TraceID       *string // Filter by trace ID for distributed trace reconstruction
	Limit         int
	Offset        int
}

// TableStats provides storage metrics
type TableStats struct {
	TotalEvents    int64
	TotalSizeBytes int64
	OldestEvent    time.Time
	NewestEvent    time.Time
}

Pattern Configuration

YAML configuration for mailbox pattern:

namespaces:
  - name: $admin
    pattern: mailbox
    pattern_version: 0.1.0
    description: Store admin events with searchable headers

    # Schema configuration (RFC-031)
    schemas:
      # Envelope schema (PrismEnvelope)
      envelope:
        url: github.com/prism-project/prism/proto/envelope/v1/envelope.proto
        version: v1
        format: protobuf
        message_name: prism.envelope.v1.PrismEnvelope
        validation: strict # Validate envelope structure

      # Payload schemas (admin events)
      payload:
        url: github.com/prism-project/prism/proto/admin/v1
        version: v1
        format: protobuf
        schemas:
          - topic: admin.users.created
            message_name: UserCreatedEvent
          - topic: admin.users.updated
            message_name: UserUpdatedEvent
          - topic: admin.roles.assigned
            message_name: RoleAssignedEvent
          - topic: admin.policies.changed
            message_name: PolicyChangedEvent
        validation: strict
        compatibility: backward

    slots:
      message_source:
        backend: nats
        interfaces:
          - QueueInterface
        config:
          url: nats://localhost:4222
          subject: admin.events.>
          consumer_group: mailbox-admin
          durable: true

      storage:
        backend: sqlite
        interfaces:
          - TableWriterInterface
        config:
          database_path: /Users/jrepp/.prism/mailbox-admin.db
          table_name: mailbox
          indexed_headers:
            - message_id
            - timestamp
            - topic
            - principal
            - correlation_id
            - trace_id
          retention_days: 90

      query:
        backend: sqlite
        interfaces:
          - TableReaderInterface
        config:
          database_path: /Users/jrepp/.prism/mailbox-admin.db
          table_name: mailbox

    behavior:
      batch_size: 100
      auto_commit: true
      max_retries: 3
      retention_policy:
        max_age_days: 90
        max_size_gb: 10

Schema Validation

The Mailbox Pattern validates both envelope and payload schemas before storage:

Validation Flow:

// 1. Validate envelope structure (PrismEnvelope)
envelope := &PrismEnvelope{}
if err := proto.Unmarshal(envelopeBytes, envelope); err != nil {
	return fmt.Errorf("invalid envelope structure: %w", err)
}

// 2. Validate envelope version (unknown versions are logged, not rejected)
if envelope.EnvelopeVersion != 1 {
	log.Warn("Unsupported envelope version", "version", envelope.EnvelopeVersion)
}

// 3. Validate required envelope fields
if envelope.Metadata == nil || envelope.Metadata.MessageId == "" {
	return fmt.Errorf("missing required envelope metadata")
}

// 4. Validate payload schema (from namespace config)
schemaValidator := getSchemaValidator(envelope.Metadata.Topic)
if err := schemaValidator.Validate(envelope.Payload); err != nil {
	return fmt.Errorf("payload schema validation failed: %w", err)
}

// 5. Extract and store (validation passed)
mailboxItem, err := ExtractMailboxItem(envelope)
if err != nil {
	return fmt.Errorf("failed to extract mailbox item: %w", err)
}
return tableWriter.WriteEvent(ctx, mailboxItem)

Schema Enforcement Modes:

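| Mode     | Behavior                  | Use Case                |
|----------|---------------------------|-------------------------|
| strict   | Reject invalid messages   | Production environments |
| warn     | Log warnings, store anyway| Development/testing     |
| disabled | No validation             | Legacy migrations       |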

Configuration:

schemas:
  envelope:
    validation: strict # Reject malformed envelopes
  payload:
    validation: warn # Log schema violations but store

Pattern Behavior

Message Processing Flow:

  1. Consume: Read PrismEnvelope from queue (message_source slot)
  2. Deserialize: Parse PrismEnvelope protobuf bytes
  3. Extract: Extract indexed headers from envelope fields using ExtractMailboxItem()
  4. Store: Write to table with indexed headers + full envelope blob
  5. Commit: Acknowledge message (if auto_commit enabled)

Code Example:

// Consumer receives PrismEnvelope from queue
envelopeBytes := queueBackend.Consume(ctx)

// Deserialize envelope; parse errors skip the message with a warning
envelope := &PrismEnvelope{}
if err := proto.Unmarshal(envelopeBytes, envelope); err != nil {
	log.Warn("Skipping unparseable envelope", "error", err)
	return nil
}

// Extract indexed headers + blob
mailboxEvent, err := ExtractMailboxItem(envelope)
if err != nil {
	log.Error("Failed to extract mailbox event", "error", err)
	return err
}

// Store in database
err = tableWriter.WriteEvent(ctx, mailboxEvent)
if err != nil {
	log.Error("Failed to write event", "error", err)
	return err
}

// Acknowledge (if auto_commit enabled)
queueBackend.Ack(ctx, envelope.Metadata.MessageId)

Error Handling:

  • Parse errors → Skip message, log warning
  • Storage errors → Retry with exponential backoff
  • Max retries exceeded → Log to dead letter queue (if configured)

Retention Policy:

  • Background job deletes events older than max_age_days
  • Vacuum/compact database when exceeding max_size_gb

Comparison to Alternatives

| Feature               | Mailbox Pattern             | Consumer Pattern | Raw SQL       |
|-----------------------|-----------------------------|------------------|---------------|
| Message Format        | ✅ PrismEnvelope (RFC-031)   | ❌ Custom         | ❌ Any         |
| Indexed Headers       | ✅ Automatic (from envelope) | ❌ Manual         | ✅ Manual      |
| Encrypted Bodies      | ✅ Supported (envelope blob) | ❌ Not handled    | ✅ Manual      |
| Pluggable Storage     | ✅ Slot-based                | ❌ None           | ❌ Fixed       |
| Schema Evolution      | ✅ Envelope versioning       | ❌ Not handled    | ⚠️ Migrations |
| Observability         | ✅ Trace/span indexing       | ❌ None           | ❌ Manual      |
| Query API             | ✅ Built-in                  | ❌ None           | ✅ SQL         |
| Retention Management  | ✅ Automatic                 | ❌ Manual         | ❌ Manual      |
| Full Context Recovery | ✅ envelope_blob             | ❌ None           | ⚠️ Partial    |

Implementation Plan

Phase 1: Core Interfaces (Week 1)

  • Define TableWriterInterface and TableReaderInterface in pkg/plugin/interfaces.go
  • Define MailboxItem, EventFilter, TableStats types
  • Add proto definitions for new interfaces

Phase 2: SQLite Backend (Week 2)

  • Implement SQLite table writer in pkg/drivers/sqlite/
  • Create table schema with indexed columns
  • Implement WriteEvent, QueryEvents, DeleteOldEvents
  • Add connection pooling and WAL mode
  • Write unit tests with testcontainers

Phase 3: Mailbox Pattern (Week 2-3)

  • Create patterns/mailbox/ directory structure
  • Implement mailbox pattern core logic
  • Implement header extraction and mapping
  • Add retention policy background job
  • Implement mailbox-runner command
  • Create manifest.yaml

Phase 4: Integration & Testing (Week 3)

  • Integration tests with NATS + SQLite
  • Test encrypted body handling
  • Test custom header indexing
  • Load test with 100k events/sec
  • Documentation and examples

Phase 5: $admin Namespace Setup (Week 4)

  • Configure mailbox pattern for $admin namespace
  • Set up NATS subscription for admin.* topics
  • Deploy with prism-launcher
  • Verify event capture and search

Testing Strategy

Unit Tests

  • Header extraction from various metadata formats
  • SQLite table writer operations
  • Retention policy logic
  • Error handling (storage failures, parse errors)

Integration Tests

  • End-to-end: NATS → Mailbox → SQLite → Query
  • Encrypted body storage and retrieval
  • Custom header indexing
  • Concurrent writes (10 goroutines)

Load Tests

  • Throughput: 100k events/sec for 10 minutes
  • Query performance: 1000 QPS on indexed headers
  • Storage growth: 1M events = ~500MB database
  • Retention policy: Delete 100k old events <1 second

Security Considerations

Encrypted Bodies

  • Pattern stores encrypted bodies as-is (opaque blobs)
  • No decryption required for indexing headers
  • Encryption is indicated by the encryption column (from security.encryption.algorithm)

Access Control

  • Namespace-level authorization via Prism auth layer
  • SQLite file permissions: 0600 (owner read/write only)
  • No direct database access from applications

PII Handling

  • Headers should NOT contain PII (by convention)
  • PII must be in encrypted body
  • Audit headers: user ID, action, resource (not names/emails)

Open Questions

  1. PostgreSQL Support: Should we implement PostgreSQL table writer in Phase 2 or defer?

    • Decision: Defer to Phase 6, focus on SQLite first
  2. Query Language: Expose SQL directly or create filter DSL?

    • Decision: Start with EventFilter struct, add SQL query API later if needed
  3. Compression: Should we compress bodies before storage?

    • Decision: No automatic compression. Applications can pre-compress and set content-encoding header
  4. Partitioning: How to handle very large mailboxes (>10M events)?

    • Decision: Use SQLite ATTACH for time-based partitions (one DB per month)
  5. Custom Index Columns: Allow dynamic index creation at runtime?

    • Decision: No. Indexes defined at configuration time only
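
For the monthly partitioning decision, routing an event to its partition could look like the sketch below. The file naming scheme (mailbox-YYYY-MM.db), the schema alias format, and both function names are hypothetical; only the ATTACH DATABASE ... AS ... statement form comes from SQLite itself.

```go
package main

import (
	"fmt"
	"time"
)

// partitionFile returns the per-month database file for an event
// timestamp given in Unix milliseconds. Months are computed in UTC.
// The "mailbox-YYYY-MM.db" naming is an illustrative assumption.
func partitionFile(timestampMillis int64) string {
	t := time.UnixMilli(timestampMillis).UTC()
	return fmt.Sprintf("mailbox-%04d-%02d.db", t.Year(), int(t.Month()))
}

// attachStatement builds the ATTACH statement that makes the partition
// queryable under a schema alias such as m2023_10. The file name is
// generated locally, never taken from user input.
func attachStatement(timestampMillis int64) string {
	t := time.UnixMilli(timestampMillis).UTC()
	return fmt.Sprintf("ATTACH DATABASE '%s' AS m%04d_%02d",
		partitionFile(timestampMillis), t.Year(), int(t.Month()))
}
```

SQLite limits the number of simultaneously attached databases (10 by default), so a reader spanning many months would need to attach and detach partitions as it goes.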

Success Criteria

  • ✅ Consume 10k events/sec from NATS with SQLite backend
  • ✅ Query indexed headers with <10ms latency (1M events)
  • ✅ Support encrypted bodies without header degradation
  • ✅ Automatic retention policy deletes old events
  • ✅ Zero data loss during pattern restart (durable consumer)
  • ✅ Integration with prism-launcher and prism-admin

References

  • RFC-031: Universal Message Envelope Protocol (PrismEnvelope format)
  • RFC-014: Layered Data Access Patterns (slot architecture)
  • RFC-017: Multicast Registry Pattern (slot binding examples)
  • RFC-030: Schema Evolution and Validation (schema context)
  • RFC-033: Claim Check Pattern (large payload handling)
  • ADR-005: Backend Plugin Architecture

Appendix A: Example Queries

Query by Time Range:

SELECT message_id, timestamp, topic, principal
FROM mailbox
WHERE timestamp BETWEEN 1697000000000 AND 1697086400000
ORDER BY timestamp DESC
LIMIT 100;

Query by Principal and Topic:

SELECT message_id, timestamp, correlation_id, envelope_blob
FROM mailbox
WHERE principal = 'user-123'
AND topic LIKE 'admin.users.%'
ORDER BY timestamp DESC;

Query by Correlation ID (Distributed Trace):

SELECT message_id, timestamp, topic, principal, envelope_blob
FROM mailbox
WHERE correlation_id = 'trace-abc123'
ORDER BY timestamp ASC;

Appendix B: SQLite Backend Details

Connection Settings:

-- Optimized SQLite settings for write-heavy workload
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
PRAGMA cache_size=10000;
PRAGMA temp_store=MEMORY;
PRAGMA mmap_size=30000000000;

Write Performance:

  • Batch inserts: 100 events per transaction
  • WAL mode: 10x faster writes vs rollback journal
  • Expected: 50k events/sec on SSD (single process)
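
Batching writes into one transaction per group can be sketched with a small buffer that flushes when full. The Batcher type and its methods are hypothetical; the flush callback stands in for a function that opens a transaction, inserts every item, and commits.

```go
package main

// Batcher buffers serialized events and hands them to flush in groups
// of at most size items, for example one database transaction per group.
type Batcher struct {
	size  int
	buf   [][]byte
	flush func(batch [][]byte) error
}

// NewBatcher creates a Batcher that flushes every size items.
func NewBatcher(size int, flush func(batch [][]byte) error) *Batcher {
	return &Batcher{size: size, flush: flush}
}

// Add buffers one item and flushes automatically when the batch is full.
func (b *Batcher) Add(item []byte) error {
	b.buf = append(b.buf, item)
	if len(b.buf) >= b.size {
		return b.Flush()
	}
	return nil
}

// Flush writes any buffered items. On failure the buffer is kept so the
// caller can retry; on success it is reset.
func (b *Batcher) Flush() error {
	if len(b.buf) == 0 {
		return nil
	}
	if err := b.flush(b.buf); err != nil {
		return err
	}
	b.buf = nil
	return nil
}
```

With batch_size: 100, a consumer would call Add for each event and Flush on shutdown or on a timer, acknowledging the queue messages only after the corresponding flush succeeds.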

Query Performance:

  • Indexed queries: <10ms for 1M events
  • Full-text search: Add FTS5 virtual table for body search (if decrypted)
  • Explain query plans with EXPLAIN QUERY PLAN

Appendix C: Future Enhancements

Phase 6: Additional Backends

  • PostgreSQL table writer (horizontal scaling)
  • ClickHouse table writer (OLAP analytics)
  • DynamoDB table writer (serverless)

Phase 7: Advanced Features

  • Full-text search on decrypted bodies (opt-in)
  • Time-series aggregations (events per hour/day)
  • Materialized views for common queries
  • Export to Parquet for data lake integration

Phase 8: Admin UI

  • Web UI for searching mailbox events
  • Query builder for non-SQL users
  • Event detail view with header/body inspection
  • Export results to CSV/JSON