RFC-031: Message Envelope Protocol for Pub/Sub Systems
Abstract
This RFC defines a unified message envelope protocol for Prism's pub/sub systems that is:
- Consistent: Same envelope structure across all backends (Kafka, NATS, Redis, PostgreSQL, SQS)
- Flexible: Extensible for future features without breaking existing consumers
- Secure: Built-in support for authentication, encryption, and audit metadata
- Developer-Friendly: Ergonomic APIs for common operations, minimal boilerplate
- Future-Proof: Designed for 10+ year evolution with backward compatibility
The envelope wraps user payloads with metadata (routing, schema, auth, observability) while remaining backend-agnostic to support any message transport.
Motivation
The Problem: Inconsistent Message Envelopes
Current pub/sub implementations across different backends use inconsistent metadata formats:
Kafka (message headers):
schema-id: 123
correlation-id: abc-456
timestamp: 1697200000
NATS (custom headers):
Nats-Msg-Id: xyz-789
X-Schema-URL: github.com/...
X-Trace-ID: trace-123
Redis (no native headers, metadata in payload prefix):
{"meta":{"schema":"v2","ts":1697200000},"payload":<user data>}
PostgreSQL (JSON columns):
INSERT INTO events (topic, payload, metadata) VALUES ('orders', '{...}', '{"schema_version":"v2"}');
Problems This Creates:
- Client Complexity: Developers must handle each backend differently
- Migration Pain: Moving from Redis → Kafka requires rewriting envelope logic
- Missing Features: Some backends lack auth metadata, schema info, or trace context
- No Versioning: Can't evolve envelope without breaking consumers
- Security Gaps: No standard place for encryption keys, PII flags, or auth tokens
Real-World Example: Cross-Backend Migration
# Producer wants to migrate from Redis to Kafka
# Current code (Redis):
redis_client.publish("orders.created", json.dumps({
"meta": {"schema": "v2", "trace_id": trace_id},
"payload": order_dict
}))
# New code (Kafka) - COMPLETELY DIFFERENT API:
kafka_producer.send("orders.created", value=order_dict, headers=[
("schema-version", b"v2"),
("trace-id", trace_id.encode())
])
# Consumer code ALSO breaks:
# Redis consumer expects: json.loads(msg)["payload"]
# Kafka consumer expects: deserialize(msg.value)
# NATS consumer expects: msg.data
This is unacceptable for sustainable development.
Goals
- Single Envelope Format: One protobuf-based envelope for all backends
- Backend Abstraction: Prism SDK hides backend-specific serialization
- Backward Compatibility: v2 consumers read v1 envelopes (missing fields take default values)
- Forward Compatibility: v1 consumers read v2 envelopes, and v2 consumers ignore unknown v3 fields
- Security by Default: Auth tokens, encryption metadata built-in
- Observability: Trace IDs, timestamps, causality chains standard
- Schema Integration: Tight integration with RFC-030 schema registry
- Performance: Minimal overhead (<1% latency increase)
Non-Goals
- Payload Encryption: Envelope defines metadata; encryption is a separate RFC
- Compression: Backend-specific optimization (e.g., Kafka compression)
- Ordered Delivery: Envelope doesn't enforce ordering (backend responsibility)
- Replay Protection: Deduplication is an application-level concern
- Routing Logic: Envelope carries routing metadata; proxy implements routing
Proposed Solution: Prism Message Envelope v1
Core Design: Protobuf Envelope
syntax = "proto3";
package prism.envelope.v1;
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
// PrismEnvelope wraps all pub/sub messages
message PrismEnvelope {
// Envelope version for evolution (REQUIRED)
int32 envelope_version = 1; // Currently: 1
// Message metadata (REQUIRED)
PrismMetadata metadata = 2;
// User payload (REQUIRED)
// Can be protobuf (Any), JSON (bytes), or custom format
google.protobuf.Any payload = 3;
// Security context (OPTIONAL but recommended)
SecurityContext security = 4;
// Observability context (OPTIONAL but recommended)
ObservabilityContext observability = 5;
// Schema metadata (OPTIONAL, required if RFC-030 schema validation enabled)
SchemaContext schema = 6;
// Extension fields for future evolution (OPTIONAL)
map<string, bytes> extensions = 99;
}
// Core message metadata
message PrismMetadata {
// Unique message ID (UUID v7 recommended for time-ordering)
string message_id = 1;
// Topic name (e.g., "orders.created")
string topic = 2;
// Namespace (multi-tenancy isolation)
string namespace = 3;
// Publish timestamp (producer clock)
google.protobuf.Timestamp published_at = 4;
// Content type (e.g., "application/protobuf", "application/json")
string content_type = 5;
// Content encoding (e.g., "gzip", "snappy", "none")
string content_encoding = 6;
// Message priority (0=lowest, 10=highest, default=5)
int32 priority = 7;
// TTL in seconds (0 = no expiration)
int64 ttl_seconds = 8;
// Correlation ID for request/response patterns
string correlation_id = 9;
// Causality: parent message ID (for event chains)
string causality_parent = 10;
}
// Security context
message SecurityContext {
// Publisher identity (from OIDC token or mTLS cert)
string publisher_id = 1;
// Publisher team/organization
string publisher_team = 2;
// Authorization token (validated by the Prism proxy, which strips it before delivery)
// NOTE: Redacted in logs/audit trails
string auth_token = 3;
// Signature for message authenticity (HMAC-SHA256 or Ed25519)
bytes signature = 4;
// Signature algorithm ("hmac-sha256", "ed25519")
string signature_algorithm = 5;
// Encryption metadata (key ID, algorithm, IV)
EncryptionMetadata encryption = 6;
// PII sensitivity flag (from schema governance)
bool contains_pii = 7;
// Data classification ("public", "internal", "confidential", "restricted")
string data_classification = 8;
}
// Encryption metadata (payload encryption details)
// Keys are NEVER stored in envelope - always referenced from configuration/secrets
message EncryptionMetadata {
// Key ID (reference to key in Vault/KMS/configuration secrets)
// For symmetric: shared secret key ID
// For asymmetric: private key ID (for decryption)
// For post-quantum: private key ID (for decryption)
string key_id = 1;
// Encryption type
EncryptionType encryption_type = 2;
// Algorithm identifier (MUST be FIPS 140-3 compliant)
// Symmetric: "aes-256-gcm", "chacha20-poly1305"
// Asymmetric: "rsa-oaep-4096", "x25519-chacha20-poly1305"
// (Ed25519 is a signature algorithm; see SecurityContext.signature_algorithm)
// Post-Quantum: "kyber1024", "kyber768", "ml-kem-1024"
// Hybrid: "x25519-kyber1024", "rsa4096-kyber768"
string algorithm = 3;
// Public key ID (for asymmetric/post-quantum encryption)
// Producer encrypts with this public key
// Consumer decrypts with corresponding private key (from key_id)
// Not used for symmetric encryption
optional string public_key_id = 4;
// Initialization vector / nonce (algorithm-specific)
// Required for: AES-GCM, ChaCha20-Poly1305
// Not used for: RSA-OAEP (uses random padding)
optional bytes iv = 5;
// Additional authenticated data (for AEAD ciphers)
// Used by: AES-GCM, ChaCha20-Poly1305
// Contains metadata authenticated but not encrypted
optional bytes aad = 6;
// Encapsulated key (for hybrid/post-quantum encryption)
// Contains encrypted session key from KEM (Key Encapsulation Mechanism)
// Used by: Kyber, ML-KEM, hybrid schemes
optional bytes encapsulated_key = 7;
// Algorithm parameters (JSON-encoded)
// For extensibility without schema changes
// Example: {"kdf": "hkdf-sha256", "kdf_salt": "base64..."} for X25519
optional string algorithm_params = 8;
}
// Encryption type enumeration
enum EncryptionType {
ENCRYPTION_TYPE_UNSPECIFIED = 0; // Invalid/not encrypted
ENCRYPTION_TYPE_SYMMETRIC = 1; // Shared secret (AES, ChaCha20)
ENCRYPTION_TYPE_ASYMMETRIC = 2; // Public/private key (RSA, X25519)
ENCRYPTION_TYPE_POST_QUANTUM = 3; // PQ algorithms (Kyber, ML-KEM)
ENCRYPTION_TYPE_HYBRID = 4; // Classical + PQ (X25519+Kyber)
}
// Observability context (distributed tracing + metrics)
message ObservabilityContext {
// Trace ID (W3C Trace Context format)
string trace_id = 1;
// Span ID (W3C Trace Context format)
string span_id = 2;
// Parent span ID (for nested traces)
string parent_span_id = 3;
// Trace flags (W3C Trace Context sampled bit, etc.)
int32 trace_flags = 4;
// Baggage (key-value pairs for cross-service context)
map<string, string> baggage = 5;
// Metrics labels (for aggregation in Prometheus/Signoz)
map<string, string> labels = 6;
}
// Schema context (tight integration with RFC-030)
message SchemaContext {
// Schema URL (GitHub, Prism Registry, or HTTPS endpoint)
string schema_url = 1;
// Schema version (e.g., "v2", "1.0.0")
string schema_version = 2;
// Schema format ("protobuf", "json-schema", "avro")
string schema_format = 3;
// Schema hash (SHA-256 for immutability check)
string schema_hash = 4;
// Schema name (protobuf message name, e.g., "OrderCreated")
string schema_name = 5;
// Compatibility mode ("backward", "forward", "full", "none")
string compatibility_mode = 6;
// Deprecated fields accessed (for migration tracking)
repeated string deprecated_fields_used = 7;
}
Key Design Decisions
1. Protobuf for Envelope (Not JSON)
Why Protobuf:
- ✅ Binary Efficiency: 3-10x smaller than JSON for metadata
- ✅ Type Safety: Compile-time validation of envelope structure
- ✅ Evolution: Add fields without breaking consumers (field numbers)
- ✅ Language Support: Generated clients for Go, Python, Rust, JavaScript
Payload Flexibility:
- Payload can be any format (protobuf, JSON, Avro, custom)
- Envelope metadata is always protobuf (consistent)
- google.protobuf.Any allows any protobuf message
- For JSON payloads, use content_type: "application/json" and store the bytes
2. Envelope Version Field
int32 envelope_version = 1; // REQUIRED, always first field
Evolution Strategy:
Version | Changes | Backward Compatible? |
---|---|---|
v1 | Initial design (this RFC) | N/A (baseline) |
v2 | Add routing_hints field | ✅ Yes (v1 consumers ignore it) |
v3 | Change trace_id to structured type | ⚠️ Depends (need migration period) |
Consumer Handling:
// Consumer checks envelope version
envelope := &prism.PrismEnvelope{}
if err := proto.Unmarshal(data, envelope); err != nil {
	return fmt.Errorf("invalid envelope: %w", err)
}
if envelope.EnvelopeVersion > 1 {
	log.Warn("Received newer envelope version, attempting best-effort parse",
		"version", envelope.EnvelopeVersion, "supported", 1)
	// v1 consumer ignores unknown fields, continues processing
}
3. Extension Map for Future-Proofing
map<string, bytes> extensions = 99; // Field 99 reserved for extensions
Use Cases:
// Future: Add custom metadata without envelope version bump
if envelope.Extensions == nil {
	envelope.Extensions = make(map[string][]byte) // generated map fields start out nil
}
envelope.Extensions["x-retry-count"] = []byte("3")
envelope.Extensions["x-dlq-source"] = []byte("orders.failed")
envelope.Extensions["x-custom-routing"] = []byte(`{"region":"us-west-2"}`)
Guidelines:
- Extensions prefixed with x- are non-standard (experimental)
- Extensions without x- are standardized (future RFC)
- Consumers MUST ignore unknown extensions
- Extensions are opaque bytes (serialize as JSON/protobuf as needed)
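A minimal Go sketch of these guidelines, assuming extension values are JSON-encoded: setExtension allocates the map on demand (generated protobuf maps start out nil), and the consumer-side retryCount treats a missing or undecodable value as absent instead of failing, in line with "Consumers MUST ignore unknown extensions". All three helper names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// setExtension JSON-encodes v and stores it under key, allocating the map if
// it is still nil.
func setExtension(ext map[string][]byte, key string, v any) (map[string][]byte, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return ext, err
	}
	if ext == nil {
		ext = make(map[string][]byte)
	}
	ext[key] = raw
	return ext, nil
}

// isExperimental reports whether an extension key is non-standard ("x-" prefix).
func isExperimental(key string) bool {
	return strings.HasPrefix(key, "x-")
}

// retryCount decodes the example "x-retry-count" extension. Missing or
// malformed values are reported as absent, never as an error.
func retryCount(ext map[string][]byte) (int, bool) {
	raw, ok := ext["x-retry-count"]
	if !ok {
		return 0, false
	}
	var n int
	if err := json.Unmarshal(raw, &n); err != nil {
		return 0, false
	}
	return n, true
}

func main() {
	ext, err := setExtension(nil, "x-retry-count", 3)
	if err != nil {
		panic(err)
	}
	ext["x-future-extension"] = []byte{0xff} // unknown to this consumer; never read
	n, ok := retryCount(ext)
	fmt.Println(n, ok, isExperimental("x-retry-count"))
}
```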
Backend-Specific Serialization
Prism SDK hides backend differences:
Kafka: Envelope as Message Value + Headers
Kafka Message {
Key: <partition key>
Value: <PrismEnvelope protobuf bytes>
Headers: {
"prism-envelope-version": "1"
"prism-message-id": "<uuid>"
"prism-topic": "orders.created"
"prism-trace-id": "<trace-id>"
}
}
Why Duplicate Metadata in Headers:
- Kafka tools (console consumer, Connect, etc.) can read headers without deserializing
- Header-based filtering/routing in stream processors (Kafka Streams, ksqlDB) without deserializing the value
- Backward compat with non-Prism consumers (can read headers)
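The header duplication can be sketched as a pure function from envelope fields to a header list. The header and envelopeMeta types are local stand-ins (many Go Kafka clients use a similar key plus []byte pair, but this is not any particular client's type), and omitting prism-trace-id when there is no trace context is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strconv"
)

// header is a local stand-in for a Kafka record header (key plus raw bytes).
type header struct {
	Key   string
	Value []byte
}

// envelopeMeta holds the PrismEnvelope fields this sketch copies into headers.
type envelopeMeta struct {
	EnvelopeVersion int32
	MessageID       string
	Topic           string
	TraceID         string // empty when the producer has no trace context
}

// kafkaHeaders duplicates selected envelope fields into record headers so
// tools can read them without decoding the protobuf value.
func kafkaHeaders(m envelopeMeta) []header {
	hs := []header{
		{Key: "prism-envelope-version", Value: []byte(strconv.Itoa(int(m.EnvelopeVersion)))},
		{Key: "prism-message-id", Value: []byte(m.MessageID)},
		{Key: "prism-topic", Value: []byte(m.Topic)},
	}
	if m.TraceID != "" {
		hs = append(hs, header{Key: "prism-trace-id", Value: []byte(m.TraceID)})
	}
	return hs
}

func main() {
	meta := envelopeMeta{EnvelopeVersion: 1, MessageID: "msg-1", Topic: "orders.created"}
	for _, h := range kafkaHeaders(meta) {
		fmt.Printf("%s: %s\n", h.Key, h.Value)
	}
}
```

The envelope in the record value remains the source of truth; headers are a convenience copy, so a consumer that finds them missing should still be able to fall back to the decoded envelope.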
NATS: Envelope as Message Data + NATS Headers
NATS Message {
Subject: "orders.created"
Data: <PrismEnvelope protobuf bytes>
Headers: {
"Prism-Envelope-Version": "1"
"Prism-Message-ID": "<uuid>"
"Prism-Trace-ID": "<trace-id>"
"Nats-Msg-Id": "<uuid>" // NATS JetStream deduplication
}
}
Redis: Envelope as Pub/Sub Message
PUBLISH orders.created <PrismEnvelope protobuf bytes>
No headers in Redis Pub/Sub, so envelope is self-contained.
PostgreSQL: Envelope as JSONB Column
CREATE TABLE prism_events (
id BIGSERIAL PRIMARY KEY,
topic TEXT NOT NULL,
envelope JSONB NOT NULL, -- PrismEnvelope as JSON
published_at TIMESTAMPTZ DEFAULT NOW(),
consumed BOOLEAN DEFAULT FALSE
);
-- Index for efficient topic queries
CREATE INDEX idx_events_topic_consumed ON prism_events(topic, consumed);
Why JSON for PostgreSQL:
- PostgreSQL JSONB has rich querying (GIN indexes)
- Easier debugging (human-readable)
- Still type-safe via protobuf → JSON conversion
S3/Object Storage: Envelope as Blob Metadata
S3 Object {
Key: "events/2025/10/13/orders.created/<uuid>.bin"
Body: <PrismEnvelope protobuf bytes>
Metadata: {
"x-amz-meta-prism-envelope-version": "1"
"x-amz-meta-prism-message-id": "<uuid>"
"x-amz-meta-prism-topic": "orders.created"
}
}
Developer APIs: Ergonomic Wrappers
Python Producer (High-Level API):
from prism_sdk import PrismClient, PrismPublishOptions
client = PrismClient(namespace="order-events")
# Simple publish (envelope auto-generated)
client.publish(
topic="orders.created",
payload=order, # Can be protobuf or dict
)
# Advanced publish (custom metadata)
client.publish(
topic="orders.created",
payload=order,
options=PrismPublishOptions(
correlation_id="req-12345",
priority=8,
ttl_seconds=3600,
labels={"region": "us-west", "tier": "premium"},
)
)
# Batch publish (one call, one envelope per message)
client.publish_batch([
    ("orders.created", order1),
    ("orders.created", order2),
    ("orders.updated", order3),
])
Go Consumer (Type-Safe API):
import "prism.io/sdk/go"
client := prism.NewClient("order-events")
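// Subscribe with typed messages (Go methods cannot declare type parameters,
// so the typed subscription is a package-level generic function)
stream := prism.SubscribeTyped[OrderCreated](client, "orders.created")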
for envelope := range stream {
// Envelope provides metadata access
log.Info("Received message",
"message_id", envelope.Metadata().MessageID,
"published_at", envelope.Metadata().PublishedAt,
"trace_id", envelope.Observability().TraceID,
)
// Payload is strongly typed
order := envelope.Payload() // *OrderCreated
fmt.Printf("Order: %s, Total: %.2f\n", order.OrderId, order.Total)
// Check for deprecated fields
if len(envelope.Schema().DeprecatedFieldsUsed) > 0 {
log.Warn("Message uses deprecated fields", "fields", envelope.Schema().DeprecatedFieldsUsed)
}
// Acknowledge message
envelope.Ack()
}
Rust Consumer (Zero-Copy Deserialization):
use prism_sdk::{PrismClient, PrismEnvelope};
let client = PrismClient::new("order-events");
let mut stream = client.subscribe("orders.created").await?;
while let Some(envelope) = stream.next().await {
// Access metadata without copying
let metadata = envelope.metadata();
println!("Message ID: {}", metadata.message_id());
println!("Trace ID: {}", envelope.observability().trace_id());
// Deserialize payload (lazy, on-demand)
let order: OrderCreated = envelope.payload().parse()?;
println!("Order: {}, Total: {}", order.order_id, order.total);
// Security context
if envelope.security().contains_pii() {
// Handle PII appropriately
mask_pii_fields(&order);
}
envelope.ack().await?;
}
Backward Compatibility Strategy
v1 Consumers Reading v2 Envelopes:
// v1 envelope (baseline)
message PrismEnvelope {
int32 envelope_version = 1;
PrismMetadata metadata = 2;
google.protobuf.Any payload = 3;
}
// v2 envelope (adds routing hints)
message PrismEnvelope {
int32 envelope_version = 1;
PrismMetadata metadata = 2;
google.protobuf.Any payload = 3;
RoutingHints routing = 7; // NEW field
}
Protobuf Behavior:
- v1 consumer ignores field 7 (unknown field, no error)
- v1 consumer continues processing normally
- No coordination needed between producer/consumer upgrades
v2 Consumers Reading v1 Envelopes:
// v2 consumer checks for routing hints
envelope := &prism.PrismEnvelope{}
if err := proto.Unmarshal(data, envelope); err != nil {
	return err
}
region := "us-west-2" // Default when there are no routing hints (v1 envelope)
if envelope.Routing != nil {
	// Use routing hints (v2 envelope)
	region = envelope.Routing.PreferredRegion
}
Breaking Change Procedure (Last Resort):
If v3 envelope needs a breaking change:
# 1. Dual-publish period (6 months)
# Producer sends BOTH v2 and v3 envelopes (separate topics)
producer.publish("orders.created.v2", envelope_v2) # Existing consumers
producer.publish("orders.created.v3", envelope_v3) # New consumers
# 2. Consumer migration window (3 months)
# Consumers migrate from v2 → v3 topic at their own pace
# 3. Deprecation notice (3 months before cutoff)
# Prism logs warnings for v2 consumers
# 4. Cutoff date (12 months after v3 release)
# Stop publishing to v2 topic
Security Considerations
1. Auth Token Handling
message SecurityContext {
string auth_token = 3; // JWT or opaque token
}
Rules:
- Auth tokens are redacted in logs (never logged in plaintext)
- Auth tokens are validated by Prism proxy (not forwarded to backend)
- Consumers do not see auth tokens (proxy strips before delivery)
2. Message Signing
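// Producer signs the envelope with the signature field cleared, so the MAC
// never covers itself. canonicalBytes is a hypothetical helper that must
// produce identical bytes on producer and consumer.
envelope := createEnvelope(payload)
envelope.Security.SignatureAlgorithm = "hmac-sha256"
envelope.Security.Signature = nil
envelope.Security.Signature = hmacSHA256(canonicalBytes(envelope), secretKey)
// Consumer verifies: take the received signature, clear it, recompute
received := envelope.Security.Signature
envelope.Security.Signature = nil
computedSignature := hmacSHA256(canonicalBytes(envelope), secretKey)
if !hmac.Equal(computedSignature, received) { // constant-time comparison
	return errors.New("signature verification failed")
}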
Use Cases:
- Prevent message tampering in untrusted backends
- Non-repudiation (Ed25519 only: an HMAC key is shared, so any key holder could have produced the MAC)
- Regulatory compliance (HIPAA, SOX)
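A self-contained Go sketch of HMAC-SHA256 signing with the two properties the pattern needs: the signature is never part of its own input, and verification uses the constant-time hmac.Equal. signedFields and canonicalBytes are hypothetical; a real implementation would cover the envelope's fields. Explicit length-prefixing is used here because deterministic protobuf serialization is not guaranteed to be canonical across languages or library versions.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/binary"
	"errors"
	"fmt"
)

// signedFields stands in for the envelope fields covered by the signature.
type signedFields struct {
	MessageID string
	Topic     string
	Payload   []byte
}

// canonicalBytes length-prefixes each field so that ("ab","c") and ("a","bc")
// cannot produce the same signing input.
func canonicalBytes(f signedFields) []byte {
	var out []byte
	for _, part := range [][]byte{[]byte(f.MessageID), []byte(f.Topic), f.Payload} {
		out = binary.BigEndian.AppendUint32(out, uint32(len(part)))
		out = append(out, part...)
	}
	return out
}

// sign returns the HMAC-SHA256 tag over the canonical encoding.
func sign(f signedFields, key []byte) []byte {
	mac := hmac.New(sha256.New, key)
	mac.Write(canonicalBytes(f))
	return mac.Sum(nil)
}

// verify recomputes the tag and compares it in constant time.
func verify(f signedFields, sig, key []byte) error {
	if !hmac.Equal(sign(f, key), sig) {
		return errors.New("signature verification failed")
	}
	return nil
}

func main() {
	key := []byte("example-shared-secret") // illustration only; load real keys from Vault/KMS
	msg := signedFields{MessageID: "msg-1", Topic: "orders.created", Payload: []byte(`{"total":42}`)}
	sig := sign(msg, key)
	fmt.Println(verify(msg, sig, key) == nil)
}
```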
3. PII Awareness
message SecurityContext {
bool contains_pii = 7; // Set by schema governance
}
Automatic Population:
- Prism proxy sets contains_pii=true if schema (RFC-030) has PII fields
- Consumers check flag before logging/storing
- Audit logs track PII access
4. Data Classification
message SecurityContext {
string data_classification = 8; // "public", "internal", "confidential", "restricted"
}
Enforcement:
- High-classification messages require encryption
- Consumers validate their compliance level matches message classification
- Audit logs track access to restricted data
Payload Encryption Patterns
CRITICAL: All encryption implementations MUST use FIPS 140-3 validated cryptographic modules.
Pattern 1: Symmetric Encryption (Shared Secret)
Use Case: High-throughput messaging where producer and consumer share a secret key
Configuration (Producer):
encryption:
enabled: true
type: symmetric
algorithm: aes-256-gcm
key_ref: "vault://secrets/messaging/order-events/encryption-key"
Producer Code (Go):
import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
)
// Load key from Vault (NOT from envelope!)
key := loadKeyFromVault("vault://secrets/messaging/order-events/encryption-key")
if len(key) != 32 {
	return errors.New("aes-256-gcm requires a 32-byte key")
}
// Encrypt payload
block, err := aes.NewCipher(key)
if err != nil {
	return err
}
aesgcm, err := cipher.NewGCM(block)
if err != nil {
	return err
}
nonce := make([]byte, aesgcm.NonceSize()) // 12 bytes, the GCM standard nonce size
if _, err := rand.Read(nonce); err != nil {
	return err
}
ciphertext := aesgcm.Seal(nil, nonce, payloadBytes, nil)
// Populate envelope encryption metadata
envelope.Security.Encryption = &EncryptionMetadata{
	KeyId:          "order-events-key-v2", // Reference only, NOT the key itself
	EncryptionType: EncryptionType_ENCRYPTION_TYPE_SYMMETRIC,
	Algorithm:      "aes-256-gcm",
	Iv:             nonce,
	Aad:            nil, // Optional: can include message_id for binding
}
// Payload is now encrypted ciphertext
envelope.Payload = &Any{Value: ciphertext}
Consumer Code (Go):
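// Load SAME key from Vault using key_id reference
// (assumes key IDs resolve to the same secret the producer loaded by path)
keyId := envelope.Security.Encryption.KeyId
key := loadKeyFromVault("vault://secrets/messaging/" + keyId)
// Decrypt payload
block, err := aes.NewCipher(key)
if err != nil {
	return err
}
aesgcm, _ := cipher.NewGCM(block) // cannot fail for AES's 16-byte block
nonce := envelope.Security.Encryption.Iv
if len(nonce) != aesgcm.NonceSize() {
	return errors.New("invalid nonce length") // Open panics on a wrong-size nonce
}
plaintext, err := aesgcm.Open(nil, nonce, envelope.Payload.Value, envelope.Security.Encryption.Aad)
if err != nil {
	return fmt.Errorf("decryption failed: %w", err)
}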
FIPS Compliance: Use crypto/aes backed by a FIPS 140-3 module, such as Go's native FIPS mode (Go 1.24+, GODEBUG=fips140=on) or a BoringCrypto build (GOEXPERIMENT=boringcrypto).
Pattern 2: Asymmetric Encryption (Public/Private Key)
Use Case: Producer and consumer should not share a secret; only the holder of the consumer's private key can decrypt
Configuration (Producer):
encryption:
enabled: true
type: asymmetric
algorithm: rsa-oaep-4096
public_key_ref: "vault://secrets/consumers/order-processor/public-key"
Configuration (Consumer):
encryption:
enabled: true
type: asymmetric
private_key_ref: "vault://secrets/consumers/order-processor/private-key"
Producer Code (Go):
import (
	"crypto/rand"
	"crypto/rsa"
	"crypto/sha256"
	"fmt"
	"google.golang.org/protobuf/proto"
)
// Load consumer's PUBLIC key (producer can't decrypt)
publicKey := loadPublicKeyFromVault("vault://secrets/consumers/order-processor/public-key")
// Encrypt payload with consumer's public key (at most 446 bytes for RSA-4096 with SHA-256)
ciphertext, err := rsa.EncryptOAEP(
	sha256.New(),
	rand.Reader,
	publicKey,
	payloadBytes,
	nil, // label
)
if err != nil {
	return fmt.Errorf("asymmetric encryption failed: %w", err)
}
envelope.Security.Encryption = &EncryptionMetadata{
	KeyId:          "order-processor-key-v1", // Consumer's key ID
	EncryptionType: EncryptionType_ENCRYPTION_TYPE_ASYMMETRIC,
	Algorithm:      "rsa-oaep-4096",
	PublicKeyId:    proto.String("order-processor-public-v1"), // optional field, so *string
	// No IV needed for RSA-OAEP (uses random padding)
}
envelope.Payload = &Any{Value: ciphertext}
Consumer Code (Go):
// Load consumer's PRIVATE key (only consumer has this)
privateKey := loadPrivateKeyFromVault("vault://secrets/consumers/order-processor/private-key")
// Decrypt payload
plaintext, err := rsa.DecryptOAEP(
sha256.New(),
rand.Reader,
privateKey,
envelope.Payload.Value,
nil, // label
)
if err != nil {
return fmt.Errorf("asymmetric decryption failed: %w", err)
}
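FIPS Compliance: RSA key size MUST be ≥3072 bits (4096 recommended). Use FIPS-validated libraries. Direct RSA-OAEP encryption only fits small payloads (446 bytes for RSA-4096 with SHA-256); see Payload Size Limits.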
Pattern 3: Post-Quantum Encryption (Kyber/ML-KEM)
Use Case: Future-proof encryption resistant to quantum computer attacks
Configuration (Producer):
encryption:
enabled: true
type: post_quantum
algorithm: kyber1024 # NIST ML-KEM Level 5
public_key_ref: "vault://secrets/consumers/pq/order-processor/public-key"
Producer Code (Go):
import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"github.com/cloudflare/circl/kem/kyber/kyber1024"
	"google.golang.org/protobuf/proto"
)
// Load consumer's Kyber public key
publicKey := loadKyberPublicKeyFromVault("vault://secrets/consumers/pq/order-processor/public-key")
// Generate shared secret using KEM (circl exposes Encapsulate through kem.Scheme)
ct, ss, err := kyber1024.Scheme().Encapsulate(publicKey) // ct = encapsulated key, ss = 32-byte shared secret
if err != nil {
	return err
}
// Use shared secret for symmetric encryption of payload
block, _ := aes.NewCipher(ss[:32]) // cannot fail: 32-byte key
aesgcm, _ := cipher.NewGCM(block)
nonce := make([]byte, aesgcm.NonceSize())
if _, err := rand.Read(nonce); err != nil {
	return err
}
ciphertext := aesgcm.Seal(nil, nonce, payloadBytes, nil)
envelope.Security.Encryption = &EncryptionMetadata{
	KeyId:           "order-processor-kyber-v1",
	EncryptionType:  EncryptionType_ENCRYPTION_TYPE_POST_QUANTUM,
	Algorithm:       "kyber1024",
	PublicKeyId:     proto.String("order-processor-kyber-public-v1"),
	EncapsulatedKey: ct, // KEM ciphertext (consumer needs this to derive shared secret)
	Iv:              nonce,
}
envelope.Payload = &Any{Value: ciphertext}
Consumer Code (Go):
// Load consumer's Kyber private key
privateKey := loadKyberPrivateKeyFromVault("vault://secrets/consumers/pq/order-processor/private-key")
// Decapsulate to recover shared secret
ss, err := kyber1024.Scheme().Decapsulate(privateKey, envelope.Security.Encryption.EncapsulatedKey)
if err != nil {
return fmt.Errorf("KEM decapsulation failed: %w", err)
}
// Decrypt payload using shared secret
block, _ := aes.NewCipher(ss[:32])
aesgcm, _ := cipher.NewGCM(block)
plaintext, err := aesgcm.Open(nil, envelope.Security.Encryption.Iv, envelope.Payload.Value, nil)
if err != nil {
return fmt.Errorf("post-quantum decryption failed: %w", err)
}
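FIPS Compliance: ML-KEM is standardized in FIPS 203. It is derived from, but not byte-compatible with, round-3 Kyber (which circl's kyber1024 implements); use a NIST-approved ML-KEM-1024 implementation for FIPS deployments.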
Pattern 4: Hybrid Encryption (Classical + Post-Quantum)
Use Case: Transition period; protect against both current and future threats
Configuration (Producer):
encryption:
enabled: true
type: hybrid
algorithm: x25519-kyber1024
public_key_ref: "vault://secrets/consumers/hybrid/order-processor/public-key"
Producer Code (Go):
// Hybrid approach: X25519 (classical ECDH) + Kyber1024 (post-quantum KEM)
// Shared secret = HKDF-SHA256(x25519_secret || kyber_secret)
// Uses crypto/ecdh, golang.org/x/crypto/hkdf and circl's kyber1024
// 1. X25519 key exchange with a fresh ephemeral key
x25519Public := loadX25519PublicKey("vault://secrets/consumers/hybrid/order-processor/x25519-public") // *ecdh.PublicKey
x25519Ephemeral, err := ecdh.X25519().GenerateKey(rand.Reader)
if err != nil {
	return err
}
x25519Secret, err := x25519Ephemeral.ECDH(x25519Public)
if err != nil {
	return err
}
// 2. Kyber1024 KEM
kyberPublic := loadKyberPublicKey("vault://secrets/consumers/hybrid/order-processor/kyber-public")
kyberCt, kyberSecret, err := kyber1024.Scheme().Encapsulate(kyberPublic)
if err != nil {
	return err
}
// 3. Combine secrets with HKDF (fresh slice, so append cannot alias x25519Secret)
ikm := append(append([]byte{}, x25519Secret...), kyberSecret...)
combinedSecret := make([]byte, 32)
kdf := hkdf.New(sha256.New, ikm, nil, []byte("prism-hybrid-x25519-kyber1024")) // info label is illustrative
if _, err := io.ReadFull(kdf, combinedSecret); err != nil {
	return err
}
// 4. Encrypt payload with combined secret
block, _ := aes.NewCipher(combinedSecret) // cannot fail: 32-byte key
aesgcm, _ := cipher.NewGCM(block)
nonce := make([]byte, aesgcm.NonceSize())
if _, err := rand.Read(nonce); err != nil {
	return err
}
ciphertext := aesgcm.Seal(nil, nonce, payloadBytes, nil)
envelope.Security.Encryption = &EncryptionMetadata{
	KeyId:           "order-processor-hybrid-v1",
	EncryptionType:  EncryptionType_ENCRYPTION_TYPE_HYBRID,
	Algorithm:       "x25519-kyber1024",
	PublicKeyId:     proto.String("order-processor-hybrid-public-v1"),
	EncapsulatedKey: kyberCt,
	Iv:              nonce,
	AlgorithmParams: proto.String(fmt.Sprintf(`{"x25519_ephemeral_public":"%s"}`,
		base64.StdEncoding.EncodeToString(x25519Ephemeral.PublicKey().Bytes()))),
}
envelope.Payload = &Any{Value: ciphertext}
Consumer Code (Go):
// Load consumer's hybrid private keys
x25519Private := loadX25519PrivateKey("vault://secrets/consumers/hybrid/order-processor/x25519-private") // *ecdh.PrivateKey
kyberPrivate := loadKyberPrivateKey("vault://secrets/consumers/hybrid/order-processor/kyber-private")
// 1. Recover X25519 secret from the producer's ephemeral public key
params := parseAlgorithmParams(envelope.Security.Encryption.GetAlgorithmParams())
ephemeralBytes, err := base64.StdEncoding.DecodeString(params["x25519_ephemeral_public"])
if err != nil {
	return err
}
ephemeralPublic, err := ecdh.X25519().NewPublicKey(ephemeralBytes)
if err != nil {
	return err
}
x25519Secret, err := x25519Private.ECDH(ephemeralPublic)
if err != nil {
	return err
}
// 2. Recover Kyber secret
kyberSecret, err := kyber1024.Scheme().Decapsulate(kyberPrivate, envelope.Security.Encryption.EncapsulatedKey)
if err != nil {
	return err
}
// 3. Derive combined secret (same HKDF inputs and info label as the producer)
ikm := append(append([]byte{}, x25519Secret...), kyberSecret...)
combinedSecret := make([]byte, 32)
kdf := hkdf.New(sha256.New, ikm, nil, []byte("prism-hybrid-x25519-kyber1024"))
if _, err := io.ReadFull(kdf, combinedSecret); err != nil {
	return err
}
// 4. Decrypt payload
block, _ := aes.NewCipher(combinedSecret) // cannot fail: 32-byte key
aesgcm, _ := cipher.NewGCM(block)
plaintext, err := aesgcm.Open(nil, envelope.Security.Encryption.Iv, envelope.Payload.Value, nil)
if err != nil {
	return fmt.Errorf("hybrid decryption failed: %w", err)
}
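FIPS Compliance: The X25519 + Kyber1024 hybrid provides both classical and post-quantum security, but X25519 is not a FIPS-approved key-agreement scheme. In FIPS mode, use an approved ML-KEM implementation for the post-quantum component and combine the secrets with an approved KDF (NIST SP 800-56C).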
FIPS 140-3 Compliance Requirements
Approved Algorithms (MUST USE):
Type | Algorithm | Standard | Key Size | Notes |
---|---|---|---|---|
Symmetric | AES-256-GCM | FIPS 197, SP 800-38D | 256-bit | AEAD cipher, preferred |
Symmetric | ChaCha20-Poly1305 | RFC 8439 | 256-bit | AEAD cipher, NOT FIPS approved (non-FIPS deployments only) |
Asymmetric | RSA-OAEP | SP 800-56B | ≥3072-bit | 4096-bit recommended |
Asymmetric | ECDH (X25519) | RFC 7748 | 256-bit | Curve25519, not FIPS approved for key agreement |
Post-Quantum | ML-KEM (Kyber) | FIPS 203 | Level 3/5 | ML-KEM-768/1024 |
Hash | SHA-256 | FIPS 180-4 | 256-bit | For KDF, HMAC |
Hash | SHA-384 | FIPS 180-4 | 384-bit | For RSA signatures |
KDF | HKDF-SHA256 | SP 800-56C | Variable | Key derivation |
Deprecated/Weak Algorithms (MUST NOT USE):
Algorithm | Reason | Replacement |
---|---|---|
AES-128-GCM | Below this RFC's 256-bit minimum (still FIPS approved) | AES-256-GCM |
RSA-2048 | Below this RFC's 3072-bit minimum | RSA-4096 |
MD5 | Cryptographically broken | SHA-256 |
SHA-1 | Collision attacks | SHA-256 |
3DES | Weak, slow | AES-256-GCM |
RC4 | Multiple vulnerabilities | ChaCha20-Poly1305 |
DES | Completely broken | AES-256-GCM |
Validation:
- Prism SDK MUST reject messages using deprecated algorithms
- Prism proxy MUST log warnings for non-FIPS algorithms
- Configuration validation MUST enforce FIPS compliance when fips_mode: true
Go FIPS Libraries:
// Standard-library crypto is FIPS 140-3 compliant only when built in FIPS mode
// (Go Cryptographic Module in Go 1.24+, or BoringCrypto)
import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rsa"
	"crypto/sha256"
	// For Kyber/ML-KEM: circl's kyber1024 is pre-standard round-3 Kyber and not
	// FIPS validated; Go 1.24+ ships crypto/mlkem (FIPS 203 ML-KEM)
	"github.com/cloudflare/circl/kem/kyber/kyber1024"
	// Avoid these (not FIPS approved):
	// "golang.org/x/crypto/chacha20poly1305" - use AES-256-GCM via crypto/cipher instead
)
Environment Setup:
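# Go 1.24+: build against the frozen Go Cryptographic Module snapshot
GOFIPS140=v1.0.0 go build ./...
# Or enable FIPS 140-3 mode at run time
export GODEBUG=fips140=on
# Older toolchains (linux/amd64, linux/arm64): BoringCrypto via cgo
CGO_ENABLED=1 GOEXPERIMENT=boringcrypto go build ./...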
Encryption Security Best Practices
1. Key Rotation:
encryption:
key_rotation:
enabled: true
rotation_period: 90d # Rotate every 90 days
overlap_period: 7d # Support both old and new keys for 7 days
2. Key Separation:
- ✅ DO: Use different keys per namespace/topic
- ❌ DON'T: Share encryption keys across environments (dev/staging/prod)
3. Audit Logging:
- All encryption/decryption operations MUST be audited
- Log: timestamp, key_id, algorithm, success/failure, message_id
4. Payload Size Limits:
- RSA-OAEP: Max payload = (key_size / 8) - 2 × hash_size - 2 bytes, i.e. (key_size / 8) - 66 bytes with SHA-256
- RSA-4096 with SHA-256: Max 446 bytes direct encryption
- For larger payloads, use hybrid encryption (RSA for session key, AES for data)
5. Nonce/IV Reuse Prevention:
- NEVER reuse nonce with same key
- Use cryptographically random nonces (crypto/rand); NIST SP 800-38D caps random 96-bit GCM nonces at 2^32 encryptions per key
- For high-throughput: use counter-based nonces with sequence tracking
6. Timing Attack Prevention:
- Use constant-time comparison for MACs/signatures
- Go's crypto/subtle.ConstantTimeCompare() for validation
Key Management Integration
Vault Integration (Recommended):
encryption:
key_provider: vault
vault:
address: https://vault.example.com
namespace: prism/messaging
auth_method: kubernetes # Or: approle, token, etc.
secret_path: secret/data/encryption-keys
AWS KMS Integration:
encryption:
key_provider: aws_kms
aws_kms:
region: us-west-2
key_id: arn:aws:kms:us-west-2:123456789:key/abc-def-123
encryption_context:
namespace: order-events
environment: production
Kubernetes Secrets (For Development Only):
encryption:
key_provider: kubernetes_secret
kubernetes_secret:
name: prism-encryption-keys
namespace: prism-system
key_field: encryption_key
⚠️ WARNING: Kubernetes secrets are NOT suitable for production (base64-encoded, not encrypted at rest by default). Use Vault or KMS.
Observability Integration
W3C Trace Context Support:
message ObservabilityContext {
string trace_id = 1; // 32-hex-char trace ID
string span_id = 2; // 16-hex-char span ID
string parent_span_id = 3; // Parent span for nested traces
int32 trace_flags = 4; // Sampled bit, etc.
}
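These fields map directly onto the W3C traceparent header (version-trace_id-parent_id-flags). A hedged Go sketch of parsing and formatting version 00 values, rejecting the all-zero IDs the specification treats as invalid; traceContext, parseTraceparent and formatTraceparent are illustrative names, and the SDK may well delegate this to an OpenTelemetry propagator instead.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// traceContext mirrors the trace fields of ObservabilityContext.
type traceContext struct {
	TraceID    string // 32 lowercase hex characters
	SpanID     string // 16 lowercase hex characters
	TraceFlags int32  // bit 0 is the sampled flag
}

// isLowerHex reports whether s is exactly n lowercase hex characters.
func isLowerHex(s string, n int) bool {
	if len(s) != n || strings.ToLower(s) != s {
		return false
	}
	_, err := hex.DecodeString(s)
	return err == nil
}

func allZeros(s string) bool { return strings.Trim(s, "0") == "" }

// parseTraceparent parses a version 00 traceparent header value.
func parseTraceparent(v string) (traceContext, error) {
	parts := strings.Split(v, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return traceContext{}, errors.New("unsupported traceparent version or format")
	}
	if !isLowerHex(parts[1], 32) || allZeros(parts[1]) {
		return traceContext{}, errors.New("invalid trace-id")
	}
	if !isLowerHex(parts[2], 16) || allZeros(parts[2]) {
		return traceContext{}, errors.New("invalid parent-id")
	}
	if !isLowerHex(parts[3], 2) {
		return traceContext{}, errors.New("invalid trace-flags")
	}
	flags, err := strconv.ParseUint(parts[3], 16, 8)
	if err != nil {
		return traceContext{}, err
	}
	return traceContext{TraceID: parts[1], SpanID: parts[2], TraceFlags: int32(flags)}, nil
}

// formatTraceparent renders the fields back into a version 00 header value.
func formatTraceparent(tc traceContext) string {
	return fmt.Sprintf("00-%s-%s-%02x", tc.TraceID, tc.SpanID, tc.TraceFlags&0xff)
}

func main() {
	tc, err := parseTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		panic(err)
	}
	fmt.Println(tc.TraceFlags&1 == 1, formatTraceparent(tc))
}
```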
Automatic Trace Propagation:
# Producer (trace context from HTTP request)
with tracer.start_span("publish_order") as span:
    client.publish(
        topic="orders.created",
        payload=order,
        trace_context=span.context  # SDK auto-populates observability fields
    )
# Consumer (trace context continues)
for envelope in client.subscribe("orders.created"):
    with tracer.start_span("process_order", parent_context=envelope.trace_context()) as span:
        process_order(envelope.payload())
Metrics Labels:
message ObservabilityContext {
map<string, string> labels = 6; // Prometheus/Signoz labels
}
Use Cases:
- Track message volume by customer tier: labels={tier: "premium"}
- SLA monitoring by region: labels={region: "us-west-2"}
- Error rates by version: labels={app_version: "v2.1.0"}
Schema Context Integration (RFC-030)
Automatic Schema Population:
# Namespace config (RFC-030)
namespaces:
- name: order-events
schema:
registry_type: prism
url: https://schema-registry.example.com
version: v2
Prism SDK Auto-Populates Schema Fields:
# Producer publishes with schema metadata
client.publish(
topic="orders.created",
payload=order # OrderCreated protobuf
)
# SDK automatically sets:
# envelope.schema.schema_url = "https://schema-registry.example.com/schemas/orders.created/v2"
# envelope.schema.schema_version = "v2"
# envelope.schema.schema_format = "protobuf"
# envelope.schema.schema_hash = "sha256:abc123..."
# envelope.schema.schema_name = "OrderCreated"
Consumer Validation:
envelope := <-stream
// Check schema compatibility
if envelope.Schema().SchemaVersion != "v2" {
log.Warn("Unexpected schema version", "expected", "v2", "actual", envelope.Schema().SchemaVersion)
}
// Verify schema integrity
expectedHash := "sha256:abc123..."
if envelope.Schema().SchemaHash != expectedHash {
return errors.New("schema hash mismatch, possible tampering")
}
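The expected hash in the check above is elided. Assuming schema_hash is SHA-256 over the exact schema bytes served by the registry (this RFC does not pin down the hash input), a consumer could compute the value to compare like this; schemaHash and matchesSchema are hypothetical helpers.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// schemaHash returns "sha256:<hex digest>" of the raw schema bytes.
func schemaHash(schema []byte) string {
	sum := sha256.Sum256(schema)
	return "sha256:" + hex.EncodeToString(sum[:])
}

// matchesSchema checks the hash carried in SchemaContext.schema_hash against
// schema bytes the consumer fetched itself (for example from the registry).
func matchesSchema(schema []byte, envelopeHash string) bool {
	return schemaHash(schema) == envelopeHash
}

func main() {
	schema := []byte("syntax = \"proto3\";\nmessage OrderCreated { string order_id = 1; }\n")
	h := schemaHash(schema)
	fmt.Println(h, matchesSchema(schema, h))
}
```

Since any whitespace or comment change in the schema file alters the digest, producer SDK and consumer must agree on hashing the registry's stored bytes rather than a reformatted copy.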
Deprecation Tracking:
message SchemaContext {
repeated string deprecated_fields_used = 7; // Track deprecated field access
}
Producer Behavior:
- If a message uses deprecated fields, the SDK populates deprecated_fields_used
- Enables migration tracking: "Which consumers still use old fields?"
Performance Characteristics
Envelope Overhead:
Backend | Baseline (no envelope) | With Prism Envelope | Overhead |
---|---|---|---|
Kafka | 500 bytes/msg | 650 bytes/msg | +150 bytes (+30%) |
NATS | 100 bytes/msg | 250 bytes/msg | +150 bytes (+150%) |
Redis | 200 bytes/msg | 350 bytes/msg | +150 bytes (+75%) |
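The table's roughly fixed 150-byte overhead can be traced to the envelope's metadata fields. The sketch below hand-encodes string metadata fields in the protobuf wire format (tag varint, length varint, UTF-8 bytes) and computes the relative overhead shown in the table. The field numbers and values are hypothetical. The actual fields come from the envelope protobuf definition.

```python
def encode_varint(n: int) -> bytes:
    """Protobuf base-128 varint: 7 bits per byte, high bit set on all but the last."""
    if n < 0:
        raise ValueError("this sketch only encodes non-negative integers")
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)
        else:
            out.append(low7)
            return bytes(out)


def encode_string_field(field_number: int, value: str) -> bytes:
    """Length-delimited field (wire type 2): tag, length, then UTF-8 bytes."""
    data = value.encode("utf-8")
    tag = (field_number << 3) | 2
    return encode_varint(tag) + encode_varint(len(data)) + data


def metadata_size(fields: dict) -> int:
    """Total encoded size of {field_number: string value} metadata."""
    return sum(len(encode_string_field(num, val)) for num, val in fields.items())


def overhead_pct(baseline_bytes: int, overhead_bytes: int) -> float:
    """Relative overhead as a percentage of the un-enveloped message size."""
    return 100 * overhead_bytes / baseline_bytes


# Hypothetical envelope metadata (field numbers are illustrative only)
example = {
    1: "6f1c2d3e-4a5b-4c6d-8e7f-901a2b3c4d5e",                       # message ID
    2: "orders.created",                                              # topic
    3: "order-events",                                                # namespace
    4: "prism-registry.example.com/schemas/orders.created/v2",       # schema URL
    5: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",    # traceparent
}
size = metadata_size(example)
```

The metadata cost is fixed per message, so small messages pay the most in relative terms. That is why NATS at 100 bytes/msg shows +150% while Kafka at 500 bytes/msg shows +30%.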
Latency Impact:
Baseline publish (no envelope): 10ms P99
With envelope serialization: 10.5ms P99 (+5%)
Rationale: Protobuf serialization is <0.5ms even on mobile CPUs
Mitigation:
- Envelope is small (150-300 bytes typically)
- Protobuf is highly optimized (binary format)
- For high-throughput workloads, batching multiple messages into a single envelope would amortize the per-message overhead (still undecided; see Open Questions)
Migration Path from Current Systems
Phase 1: Dual-Write (Transition Period)
import json
# Producer writes both old format and new envelope
# Old format (backward compat)
redis_client.publish("orders", json.dumps({"payload": order_dict}))
# New format (Prism envelope)
prism_client.publish("orders", payload=order)
Phase 2: Dual-Read (Consumers Migrate)
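import json
# Consumer reads both formats during the transition
pubsub = redis_client.pubsub()
pubsub.subscribe("orders")
msg = pubsub.get_message(timeout=1.0)
if msg and msg["type"] == "message":
    data = msg["data"]
    if is_prism_envelope(data):
        envelope = parse_prism_envelope(data)
        payload = envelope.payload()
    else:
        # Legacy format: JSON object with a top-level "payload" key
        payload = json.loads(data)["payload"]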
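The dual-read step relies on is_prism_envelope, which this RFC does not specify. One possible heuristic rests on two assumptions:
- The Redis plugin writes envelopes as binary protobuf.
- Legacy producers always write a JSON object with a top-level "payload" key.

```python
import json
from typing import Union


def is_prism_envelope(data: Union[bytes, str]) -> bool:
    """Heuristic format check for the dual-read transition period.

    Legacy messages: JSON objects with a top-level "payload" key.
    Anything else is assumed to be a binary Prism envelope.
    """
    try:
        decoded = json.loads(data)
    except ValueError:  # covers json.JSONDecodeError and UnicodeDecodeError
        return True
    return not (isinstance(decoded, dict) and "payload" in decoded)
```

A heuristic like this can misclassify edge cases, such as an envelope encoding that happens to be JSON with a "payload" key. If the envelope format allows it, an explicit marker such as a magic prefix byte or a backend header is the sturdier choice.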
Phase 3: Prism-Only (Cutover)
# Producer only writes Prism envelope
prism_client.publish("orders", payload=order)
# Consumer only reads Prism envelope
for envelope in prism_client.subscribe("orders"):
    process_order(envelope.payload())
Implementation Plan
Phase 1: Protobuf Definition (Week 1)
Deliverables:
- ✅ Define prism.envelope.v1 protobuf package
- ✅ Generate Go, Python, Rust client code
- ✅ Unit tests for envelope serialization/deserialization
- ✅ Documentation: Envelope field guide
Success Criteria:
- All fields documented with examples
- Protobuf compiles in all target languages
- Unit tests cover all optional field combinations
Phase 2: SDK Integration (Weeks 2-3)
Deliverables:
- ✅ Python SDK: client.publish() wraps payload in envelope
- ✅ Go SDK: Type-safe envelope wrappers
- ✅ Rust SDK: Zero-copy envelope parsing
- ✅ Envelope builder API for custom metadata
Success Criteria:
- SDK hides envelope complexity from developers
- Publish/subscribe APIs unchanged (envelope is transparent)
- Performance overhead <5% latency
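The envelope builder API is not specified in this RFC. Below is a minimal sketch of one shape it could take, assuming required fields go in the constructor and optional metadata is added through chained calls. EnvelopeBuilder, its methods, and the Envelope field names are hypothetical.

```python
from __future__ import annotations

import time
import uuid
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Envelope:
    topic: str
    payload: bytes
    message_id: str
    published_at_ms: int
    labels: dict[str, str] = field(default_factory=dict)
    contains_pii: bool = False


class EnvelopeBuilder:
    """Required fields in the constructor; optional metadata via chained calls."""

    def __init__(self, topic: str, payload: bytes) -> None:
        if not topic:
            raise ValueError("topic is required")
        self._topic = topic
        self._payload = payload
        self._labels: dict[str, str] = {}
        self._contains_pii = False

    def with_label(self, key: str, value: str) -> EnvelopeBuilder:
        """Add an observability label (e.g. tier, region)."""
        self._labels[key] = value
        return self

    def with_pii(self, contains_pii: bool = True) -> EnvelopeBuilder:
        """Flag the payload as containing PII."""
        self._contains_pii = contains_pii
        return self

    def build(self) -> Envelope:
        # SDK-owned fields are filled here so callers cannot set them inconsistently.
        return Envelope(
            topic=self._topic,
            payload=self._payload,
            message_id=str(uuid.uuid4()),
            published_at_ms=time.time_ns() // 1_000_000,
            labels=dict(self._labels),
            contains_pii=self._contains_pii,
        )
```

Generating message_id and the timestamp inside build() keeps SDK-owned fields out of callers' hands, which fits the goal of hiding envelope complexity behind an unchanged publish API.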
Phase 3: Backend Plugin Support (Weeks 4-5)
Deliverables:
- ✅ Kafka plugin: Envelope as message value + headers
- ✅ NATS plugin: Envelope as message data + NATS headers
- ✅ Redis plugin: Envelope as pub/sub message
- ✅ PostgreSQL plugin: Envelope as JSONB column
Success Criteria:
- All plugins serialize/deserialize envelope correctly
- Backend-specific features preserved (Kafka partition keys, NATS headers)
- Integration tests pass for all backends
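This RFC leaves the exact per-backend layout to the plugins. For Kafka, one plausible mapping works as follows:
- The serialized envelope becomes the record value.
- The producer's partition key stays the record key.
- A few routing fields are mirrored into headers. The "prism-" header prefix is hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

# Hypothetical prefix for envelope fields mirrored into Kafka headers
HEADER_PREFIX = "prism-"


@dataclass(frozen=True)
class KafkaRecord:
    key: Optional[bytes]              # partition key, preserved from the producer
    value: bytes                      # serialized Prism envelope (source of truth)
    headers: List[Tuple[str, bytes]]  # convenience copies for routing/filtering


def to_kafka_record(envelope_bytes: bytes, partition_key: Optional[str],
                    mirrored: Dict[str, str]) -> KafkaRecord:
    """Map an envelope onto a Kafka record without losing the partition key."""
    headers = [(HEADER_PREFIX + name, value.encode("utf-8"))
               for name, value in sorted(mirrored.items())]
    key = partition_key.encode("utf-8") if partition_key is not None else None
    return KafkaRecord(key=key, value=envelope_bytes, headers=headers)


def from_kafka_record(record: KafkaRecord) -> Tuple[bytes, Dict[str, str]]:
    """Return the envelope bytes plus any mirrored header fields."""
    mirrored = {name[len(HEADER_PREFIX):]: value.decode("utf-8")
                for name, value in record.headers
                if name.startswith(HEADER_PREFIX)}
    return record.value, mirrored
```

The record's fields line up with kafka-python's KafkaProducer.send(topic, value=..., key=..., headers=...), whose headers argument is a list of (str, bytes) tuples. Treating headers as a copy and the envelope bytes as the source of truth keeps the two from drifting apart.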
Phase 4: Observability Integration (Week 6)
Deliverables:
- ✅ OpenTelemetry trace context propagation
- ✅ Prometheus metrics with envelope labels
- ✅ SigNoz dashboard for envelope metadata
Success Criteria:
- Traces span producer → proxy → consumer
- Metric breakdowns by topic, namespace, schema version
- Audit logs include envelope metadata
Phase 5: Migration Tools (Week 7)
Deliverables:
- ✅ CLI tool: prism envelope migrate --from redis --to kafka
- ✅ Dual-write proxy for transition period
- ✅ Validation tool: Check envelope compatibility
Success Criteria:
- Zero downtime migration for existing deployments
- Backward compatibility verified with integration tests
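The validation tool could start from protobuf's basic evolution rules:
- An existing field keeps its number and type.
- A removed field's number is reserved rather than reused.

Here is a minimal sketch over a hypothetical name → (number, type) view of two envelope versions. FieldSpec and compatibility_errors are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List


@dataclass(frozen=True)
class FieldSpec:
    number: int
    type_name: str


def compatibility_errors(old: Dict[str, FieldSpec], new: Dict[str, FieldSpec],
                         reserved: FrozenSet[int] = frozenset()) -> List[str]:
    """List changes from old to new that would break existing consumers."""
    errors: List[str] = []
    new_by_number = {spec.number: name for name, spec in new.items()}
    for name, spec in old.items():
        if name in new:
            if new[name].number != spec.number:
                errors.append(f"{name}: field number changed {spec.number} -> {new[name].number}")
            elif new[name].type_name != spec.type_name:
                errors.append(f"{name}: type changed {spec.type_name} -> {new[name].type_name}")
        elif spec.number in new_by_number:
            errors.append(f"{name}: number {spec.number} reused by {new_by_number[spec.number]}")
        elif spec.number not in reserved:
            errors.append(f"{name}: removed without reserving number {spec.number}")
    for name, spec in new.items():
        if spec.number in reserved:
            errors.append(f"{name}: uses reserved number {spec.number}")
    return errors
```

This check is deliberately stricter than the wire format requires. Protobuf treats a rename that keeps the same number as wire-compatible, and some numeric type changes are too. However, renames break the proto3 JSON mapping, which a JSONB-backed plugin such as PostgreSQL would likely depend on.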
Trade-Offs and Alternatives
Alternative 1: JSON Envelope
Pros:
- ✅ Human-readable (debugging easier)
- ✅ Language-agnostic (no code generation)
Cons:
- ❌ 3-10x larger than protobuf
- ❌ No type safety (runtime errors)
- ❌ Slower parsing (JSON vs protobuf)
Verdict: Protobuf's benefits outweigh JSON's readability.
Alternative 2: No Envelope (Backend-Specific Headers)
Pros:
- ✅ Zero overhead (no wrapper)
- ✅ Native to each backend
Cons:
- ❌ Inconsistent across backends
- ❌ Can't evolve metadata without breaking consumers
- ❌ No standard place for auth, schema, trace context
Verdict: Envelope provides consistency and evolution worth the overhead.
Alternative 3: CloudEvents Standard
Pros:
- ✅ Industry standard (CNCF)
- ✅ Rich tooling ecosystem
Cons:
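- ❌ JSON is the dominant event format (larger payloads than protobuf)
- ❌ HTTP-first design (pub/sub transports such as Kafka and NATS are covered by separate protocol bindings)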
- ❌ Missing Prism-specific fields (namespace, schema governance)
Verdict: CloudEvents-inspired but not compatible (different goals).
Success Criteria
- Developer Adoption: 80% of new pub/sub code uses Prism envelope within 6 months
- Performance: <5% latency overhead vs baseline (no envelope)
- Backward Compatibility: v1 → v2 envelope migration with zero downtime
- Cross-Backend Portability: Same producer/consumer code works with Kafka, NATS, Redis
- Security Compliance: 100% of PII messages have the contains_pii=true flag
Open Questions
- Batch Envelope: Should we support multi-message envelopes for high-throughput?
- Compression: Should envelope metadata be compressed (gzip) for large payloads?
- Deduplication: Should envelope include nonce for idempotent processing?
- Replay: Should envelope track message lineage for event sourcing?
References
- RFC-030: Schema Evolution and Validation (schema context integration)
- RFC-014: Layered Data Access Patterns (pub/sub patterns)
- RFC-008: Proxy Plugin Architecture (backend plugins)
- W3C Trace Context
- CloudEvents Spec (inspiration for observability fields)
- Protobuf Best Practices
Revision History
- 2025-10-13 (v1): Initial draft - Unified message envelope protocol for pub/sub systems