Protocol Recording with Protobuf Tagging
Context
Prism handles complex distributed protocols (Queue, PubSub, Transact) across multiple services. We need to:
- Record protocol interactions for debugging
- Trace multi-step operations
- Reconstruct failure scenarios
- Audit protocol compliance
- Enable replay for testing
Requirements:
- Capture protocol messages without code changes
- Tag messages for categorization and filtering
- Support sampling (don't record everything)
- Queryable storage
- Privacy-aware (PII handling)
Decision
Use Protobuf custom options for protocol recording tags:
- Custom option (prism.protocol): Tag messages for recording
- Recording levels: NONE, METADATA, FULL, SAMPLED
- Sampling policy: Configurable per message type
- Storage backend: Pluggable (file, database, S3)
- Query interface: Filter by tags, time, session, operation
Rationale
Why Custom Protobuf Options
Protobuf options allow declarative metadata on messages:
- No code changes needed
- Centralized configuration
- Type-safe
- Code generation aware
- Version controlled
Protocol Option Definition
// proto/prism/options.proto
// proto2, because proto3 rejects explicit [default = ...] values.
// proto3 files can still set these options.
syntax = "proto2";
package prism;
import "google/protobuf/descriptor.proto";
// Protocol recording options
extend google.protobuf.MessageOptions {
  optional ProtocolOptions protocol = 50100;
}
// Field-level PII annotation, e.g. [(prism.pii) = "email"]
extend google.protobuf.FieldOptions {
  optional string pii = 50101;
}
message ProtocolOptions {
  // Recording level for this message type
  optional RecordingLevel recording = 1;
  // Protocol category
  optional string category = 2; // "queue", "pubsub", "transact", "session"
  // Operation name
  optional string operation = 3; // "publish", "subscribe", "write", "commit"
  // Sampling rate (0.0 - 1.0)
  optional float sample_rate = 4 [default = 1.0];
  // Include in protocol trace
  optional bool trace = 5 [default = true];
  // Tags for filtering
  repeated string tags = 6;
}
enum RecordingLevel {
  RECORDING_LEVEL_UNSPECIFIED = 0;
  RECORDING_LEVEL_NONE = 1;     // Don't record
  RECORDING_LEVEL_METADATA = 2; // Record metadata only (no payload)
  RECORDING_LEVEL_FULL = 3;     // Record complete message
  RECORDING_LEVEL_SAMPLED = 4;  // Sample based on sample_rate
}
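The levels answer two separate questions: whether to record a message at all, and whether to keep its payload. The following is a minimal std-only sketch of that mapping. It assumes that sample_rate applies to every recorded level, which is what the sample_rate: 0.1 on the FULL-level PublishRequest below implies. Capture and capture_for are hypothetical names, not part of the proxy. The outcome of the random draw is passed in as a boolean so that the mapping itself stays deterministic.

```rust
/// Mirrors the RecordingLevel enum from options.proto (prefix dropped, as prost does).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordingLevel {
    Unspecified,
    None,
    Metadata,
    Full,
    Sampled,
}

/// What ends up in storage for one message (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Capture {
    Skip,
    MetadataOnly,
    MetadataAndPayload,
}

/// Maps a level plus the sampling outcome (random draw < sample_rate) to a capture decision.
pub fn capture_for(level: RecordingLevel, sampled: bool) -> Capture {
    match level {
        // Unset options behave like NONE: nothing is recorded.
        RecordingLevel::Unspecified | RecordingLevel::None => Capture::Skip,
        // Every other level is subject to the sampling draw.
        _ if !sampled => Capture::Skip,
        RecordingLevel::Metadata | RecordingLevel::Sampled => Capture::MetadataOnly,
        RecordingLevel::Full => Capture::MetadataAndPayload,
    }
}
```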
Tagged Message Examples
Queue Protocol:
// proto/prism/queue/v1/queue.proto
import "prism/options.proto";
message PublishRequest {
option (prism.protocol) = {
recording: RECORDING_LEVEL_FULL
category: "queue"
operation: "publish"
sample_rate: 0.1 // Record 10% of publish requests
tags: ["write", "producer"]
};
string session_token = 1;
string topic = 2;
bytes payload = 3;
// ...
}
message PublishResponse {
option (prism.protocol) = {
recording: RECORDING_LEVEL_METADATA
category: "queue"
operation: "publish_response"
tags: ["write", "producer"]
};
string message_id = 1;
int64 offset = 2;
// ...
}
message Message {
option (prism.protocol) = {
recording: RECORDING_LEVEL_SAMPLED
category: "queue"
operation: "message_delivery"
sample_rate: 0.05 // Record 5% of messages
tags: ["read", "consumer"]
};
string message_id = 1;
bytes payload = 2;
// ...
}
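Each message type carries its own rate, so the expected recording volume is the sum over types of traffic × sample_rate. The following is a std-only sketch of that arithmetic. It again assumes that sampling applies to every recorded level, and the traffic figures in the usage note are made up purely for illustration.

```rust
/// Per-type traffic: message type, messages per second, configured sample_rate.
pub struct TypeRate {
    pub message_type: &'static str,
    pub per_second: f64,
    pub sample_rate: f64,
}

/// Expected recorded entries per second: sum of per_second * sample_rate (clamped to [0, 1]).
pub fn expected_records_per_second(rates: &[TypeRate]) -> f64 {
    rates
        .iter()
        .map(|r| r.per_second * r.sample_rate.clamp(0.0, 1.0))
        .sum()
}
```

Consider hypothetical traffic of 1,000 PublishRequest/s at 0.1, 1,000 PublishResponse/s at the default 1.0, and 5,000 Message/s at 0.05. That yields 100 + 1,000 + 250 = 1,350 entries/s. The unsampled METADATA-level PublishResponse dominates, even though each of its entries is small.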
Transaction Protocol:
// proto/prism/transact/v1/transact.proto
message WriteRequest {
option (prism.protocol) = {
recording: RECORDING_LEVEL_FULL
category: "transact"
operation: "write"
sample_rate: 1.0 // Record all transactions
trace: true
tags: ["transaction", "write", "critical"]
};
DataWrite data = 1;
MailboxWrite mailbox = 2;
// ...
}
message TransactionStarted {
option (prism.protocol) = {
recording: RECORDING_LEVEL_METADATA
category: "transact"
operation: "begin"
tags: ["transaction", "lifecycle"]
};
string transaction_id = 1;
// ...
}
Recording Infrastructure
Protocol Recorder Interface:
// proxy/src/protocol/recorder.rs
use std::collections::HashMap;

use anyhow::Result; // error type assumed
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::Serialize;

use crate::prism::RecordingLevel; // prost-generated (module path assumed)

pub type Timestamp = DateTime<Utc>;

#[async_trait]
pub trait ProtocolRecorder: Send + Sync {
    async fn record(&self, entry: ProtocolEntry) -> Result<()>;
    async fn query(&self, filter: ProtocolFilter) -> Result<Vec<ProtocolEntry>>;
}

#[derive(Debug, Clone, Serialize)]
pub struct ProtocolEntry {
    pub id: String,
    pub timestamp: Timestamp,
    pub session_id: Option<String>,
    pub category: String,
    pub operation: String,
    pub message_type: String,
    pub recording_level: RecordingLevel,
    pub metadata: HashMap<String, String>,
    pub payload: Option<Vec<u8>>, // Only if FULL recording
    pub tags: Vec<String>,
}

#[derive(Debug, Clone, Default)]
pub struct ProtocolFilter {
    pub start_time: Option<Timestamp>,
    pub end_time: Option<Timestamp>,
    pub session_id: Option<String>,
    pub category: Option<String>,
    pub operation: Option<String>,
    pub tags: Vec<String>,
}
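The filter semantics are not spelled out above. The following is a std-only sketch of one reading, as an in-memory matcher. Every criterion that is set must hold. Tags match on any overlap, as the && operator does in the Postgres backend. The time window is assumed to be start-inclusive and end-exclusive. Timestamps are simplified to Unix milliseconds, and Entry and Filter are hypothetical stand-ins for ProtocolEntry and ProtocolFilter.

```rust
/// Simplified entry; timestamps are Unix milliseconds (an assumption for this sketch).
#[derive(Debug, Clone)]
pub struct Entry {
    pub timestamp_ms: u64,
    pub session_id: Option<String>,
    pub category: String,
    pub operation: String,
    pub tags: Vec<String>,
}

#[derive(Debug, Default)]
pub struct Filter {
    pub start_ms: Option<u64>, // inclusive
    pub end_ms: Option<u64>,   // exclusive
    pub session_id: Option<String>,
    pub category: Option<String>,
    pub operation: Option<String>,
    pub tags: Vec<String>, // matches if the entry has ANY of these
}

impl Filter {
    /// All set criteria must hold (logical AND); unset criteria match everything.
    pub fn matches(&self, e: &Entry) -> bool {
        self.start_ms.map_or(true, |s| e.timestamp_ms >= s)
            && self.end_ms.map_or(true, |end| e.timestamp_ms < end)
            && self.session_id.as_ref().map_or(true, |s| e.session_id.as_ref() == Some(s))
            && self.category.as_ref().map_or(true, |c| &e.category == c)
            && self.operation.as_ref().map_or(true, |o| &e.operation == o)
            && (self.tags.is_empty() || self.tags.iter().any(|t| e.tags.contains(t)))
    }
}
```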
Interceptor for Recording:
// proxy/src/protocol/interceptor.rs
// tonic's Interceptor::call only receives Request<()> (metadata and extensions),
// so it cannot see the message body. Recording is therefore done by a helper
// that receives the decoded request, e.g. from a shared handler wrapper.
use std::sync::Arc;

use chrono::Utc;
use prost::Message;
use tonic::Request;
use uuid::Uuid;

pub struct RecordingInterceptor {
    recorder: Arc<dyn ProtocolRecorder>,
    sampler: Arc<Sampler>,
}

impl RecordingInterceptor {
    pub fn observe<M: Message>(&self, message_type: &str, req: &Request<M>) {
        // Protocol options, looked up from the descriptor set loaded at startup
        let options = get_protocol_options(message_type);
        // Check if should record
        if !should_record(&options, &self.sampler) {
            return;
        }
        // Extract metadata (session ID etc.) from gRPC headers
        let metadata = extract_metadata(req.metadata());
        // Keep the payload only for FULL recording
        let payload = match options.recording() {
            RecordingLevel::Full => Some(req.get_ref().encode_to_vec()),
            _ => None,
        };
        let entry = ProtocolEntry {
            id: Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            session_id: metadata.get("session_id").cloned(),
            category: options.category().to_string(),
            operation: options.operation().to_string(),
            message_type: message_type.to_string(),
            recording_level: options.recording(),
            metadata,
            payload,
            tags: options.tags.clone(),
        };
        // Write off the request path; a recording failure must not fail the request
        let recorder = Arc::clone(&self.recorder);
        tokio::spawn(async move {
            let _ = recorder.record(entry).await;
        });
    }
}
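Calling tokio::spawn once per entry keeps recording off the request path. However, it creates unbounded work when storage slows down. One alternative, which is not part of the original design, is a bounded queue: when the background writer falls behind, new entries are dropped and counted. The following is a std-only sketch of that alternative. RecordQueue and offer are hypothetical names. In the async proxy, tokio::sync::mpsc with try_send would play the same role.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Non-blocking handoff from the request path to a background writer.
pub struct RecordQueue<T> {
    tx: SyncSender<T>,
    dropped: AtomicU64,
}

impl<T> RecordQueue<T> {
    /// Creates a queue buffering at most `capacity` pending entries (use capacity > 0).
    pub fn new(capacity: usize) -> (Self, Receiver<T>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, dropped: AtomicU64::new(0) }, rx)
    }

    /// Never blocks: if the queue is full or the writer is gone, the entry is dropped and counted.
    pub fn offer(&self, entry: T) -> bool {
        match self.tx.try_send(entry) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped.fetch_add(1, Ordering::Relaxed);
                false
            }
        }
    }

    /// Number of entries dropped so far (worth exporting as a metric).
    pub fn dropped(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}
```

Dropping entries under pressure trades completeness for predictable request latency. That is consistent with sampling already being part of the design.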
Sampling Logic:
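pub struct Sampler;

impl Sampler {
    /// Returns true with probability `sample_rate`.
    pub fn should_sample(&self, sample_rate: f32) -> bool {
        if sample_rate >= 1.0 {
            return true;
        }
        if sample_rate <= 0.0 {
            return false;
        }
        // rand::random uses the thread-local RNG, so no &mut self is needed and
        // Sampler stays Send + Sync (a stored ThreadRng is neither)
        rand::random::<f32>() < sample_rate
    }
}

fn should_record(options: &ProtocolOptions, sampler: &Sampler) -> bool {
    match options.recording() {
        RecordingLevel::None | RecordingLevel::Unspecified => false,
        // sample_rate defaults to 1.0, so types without an explicit rate are always
        // recorded; PublishRequest (FULL, 0.1) records 10% of requests with payload
        RecordingLevel::Metadata | RecordingLevel::Full | RecordingLevel::Sampled => {
            sampler.should_sample(options.sample_rate())
        }
    }
}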
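The sampler above draws independently for each message. As a result, a sampled session is recorded piecemeal, which works against reconstructing failure scenarios and replaying by session. A different technique, not part of the original design, is to hash a stable key such as the session ID. Every message in a session then gets the same decision, and a key sampled at 10% is also sampled at any higher rate. The following is a std-only sketch of that technique. sample_by_key is a hypothetical name. DefaultHasher is deterministic within a build, but its output is not guaranteed stable across Rust releases.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Deterministic sampling: the same key always gets the same decision,
/// so a session is recorded either completely or not at all.
pub fn sample_by_key(key: &str, sample_rate: f64) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if !(sample_rate > 0.0) {
        return false; // also rejects NaN
    }
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    // Use the top 53 bits of the hash as a uniform value in [0, 1).
    let unit = (hasher.finish() >> 11) as f64 / (1u64 << 53) as f64;
    unit < sample_rate
}
```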
Storage Backends
File Storage:
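use std::path::PathBuf;

use tokio::io::AsyncWriteExt;

pub struct FileProtocolRecorder {
    path: PathBuf,
}

#[async_trait]
impl ProtocolRecorder for FileProtocolRecorder {
    // Appends one JSON object per line (JSON Lines) using non-blocking I/O
    async fn record(&self, entry: ProtocolEntry) -> Result<()> {
        let mut line = serde_json::to_string(&entry)?;
        line.push('\n');
        let mut file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .await?;
        file.write_all(line.as_bytes()).await?;
        Ok(())
    }

    async fn query(&self, _filter: ProtocolFilter) -> Result<Vec<ProtocolEntry>> {
        // The file backend is write-only; use a database backend for queries
        anyhow::bail!("query is not supported by FileProtocolRecorder")
    }
}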
PostgreSQL Storage:
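use sqlx::{postgres::PgPool, types::Json, Postgres, QueryBuilder};

pub struct PostgresProtocolRecorder {
    pool: PgPool,
}

#[async_trait]
impl ProtocolRecorder for PostgresProtocolRecorder {
    async fn record(&self, entry: ProtocolEntry) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO protocol_recordings
                (id, timestamp, session_id, category, operation, message_type,
                 recording_level, metadata, payload, tags)
            VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            "#,
        )
        .bind(&entry.id)
        .bind(entry.timestamp)
        .bind(&entry.session_id)
        .bind(&entry.category)
        .bind(&entry.operation)
        .bind(&entry.message_type)
        // recording_level is TEXT: store the enum name, e.g. "RECORDING_LEVEL_FULL"
        .bind(entry.recording_level.as_str_name())
        // metadata is JSONB: Json encodes the map as JSON
        .bind(Json(&entry.metadata))
        .bind(&entry.payload)
        .bind(&entry.tags)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn query(&self, filter: ProtocolFilter) -> Result<Vec<ProtocolEntry>> {
        // Assumes ProtocolEntry implements sqlx::FromRow, mapping the uuid, TEXT
        // recording_level and JSONB metadata columns back to its field types
        let mut query = QueryBuilder::<Postgres>::new(
            "SELECT * FROM protocol_recordings WHERE 1=1",
        );
        if let Some(start) = filter.start_time {
            query.push(" AND timestamp >= ").push_bind(start);
        }
        if let Some(end) = filter.end_time {
            query.push(" AND timestamp < ").push_bind(end);
        }
        if let Some(session_id) = filter.session_id {
            query.push(" AND session_id = ").push_bind(session_id);
        }
        if let Some(category) = filter.category {
            query.push(" AND category = ").push_bind(category);
        }
        if let Some(operation) = filter.operation {
            query.push(" AND operation = ").push_bind(operation);
        }
        if !filter.tags.is_empty() {
            // && is array overlap: match entries sharing any requested tag
            query.push(" AND tags && ").push_bind(filter.tags);
        }
        query.push(" ORDER BY timestamp DESC LIMIT 1000");
        let entries = query
            .build_query_as::<ProtocolEntry>()
            .fetch_all(&self.pool)
            .await?;
        Ok(entries)
    }
}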
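sqlx's QueryBuilder emits a Postgres placeholder ($1, $2, …) for each push_bind, numbered in push order. The following std-only sketch reproduces that shape for the text-valued filters, to show the SQL a given filter produces. It is an illustration only. QueryBuilder also handles typed encoding, and the Param, QueryFilter and build_query names are hypothetical.

```rust
/// A bind parameter for this sketch (the real backend binds typed values via sqlx).
#[derive(Debug, Clone, PartialEq)]
pub enum Param {
    Text(String),
    TextArray(Vec<String>),
}

#[derive(Debug, Default)]
pub struct QueryFilter {
    pub session_id: Option<String>,
    pub category: Option<String>,
    pub operation: Option<String>,
    pub tags: Vec<String>,
}

/// Appends " AND <clause> $n", where n is the new parameter's 1-based position.
fn push_cond(sql: &mut String, params: &mut Vec<Param>, clause: &str, p: Param) {
    params.push(p);
    sql.push_str(&format!(" AND {clause} ${}", params.len()));
}

pub fn build_query(f: &QueryFilter, limit: u32) -> (String, Vec<Param>) {
    let mut sql = String::from("SELECT * FROM protocol_recordings WHERE 1=1");
    let mut params = Vec::new();
    if let Some(s) = &f.session_id {
        push_cond(&mut sql, &mut params, "session_id =", Param::Text(s.clone()));
    }
    if let Some(c) = &f.category {
        push_cond(&mut sql, &mut params, "category =", Param::Text(c.clone()));
    }
    if let Some(o) = &f.operation {
        push_cond(&mut sql, &mut params, "operation =", Param::Text(o.clone()));
    }
    if !f.tags.is_empty() {
        // Postgres array overlap: any shared tag matches
        push_cond(&mut sql, &mut params, "tags &&", Param::TextArray(f.tags.clone()));
    }
    // LIMIT is a plain integer from our own code, so it is formatted, not bound.
    sql.push_str(&format!(" ORDER BY timestamp DESC LIMIT {limit}"));
    (sql, params)
}
```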
Query Interface
CLI Tool:
# Query protocol recordings
prism-admin protocol query \
--category queue \
--operation publish \
--session abc123 \
--start "2025-10-07T00:00:00Z" \
--tags write,producer
# Replay protocol sequence
prism-admin protocol replay \
--session abc123 \
--start "2025-10-07T12:00:00Z" \
--end "2025-10-07T12:05:00Z"
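The query backend returns entries newest-first (ORDER BY timestamp DESC). A replay, however, must re-send them in the order they happened. The following std-only sketch shows the selection step for one session and a start-inclusive, end-exclusive window. Recorded and replay_sequence are hypothetical names, and the window semantics are an assumption.

```rust
/// Minimal recorded event for replay ordering (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub struct Recorded {
    pub timestamp_ms: u64,
    pub session_id: String,
    pub operation: String,
}

/// Selects one session's events in [start_ms, end_ms) and returns them oldest-first.
pub fn replay_sequence(entries: &[Recorded], session: &str, start_ms: u64, end_ms: u64) -> Vec<Recorded> {
    let mut seq: Vec<Recorded> = entries
        .iter()
        .filter(|e| e.session_id == session && e.timestamp_ms >= start_ms && e.timestamp_ms < end_ms)
        .cloned()
        .collect();
    // sort_by_key is stable: entries with equal timestamps keep their input order.
    seq.sort_by_key(|e| e.timestamp_ms);
    seq
}
```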
gRPC Admin API:
import "google/protobuf/timestamp.proto";
service AdminService {
// Query protocol recordings
rpc QueryProtocol(QueryProtocolRequest) returns (stream ProtocolEntry);
// Replay protocol sequence
rpc ReplayProtocol(ReplayProtocolRequest) returns (stream ReplayEvent);
}
message QueryProtocolRequest {
optional google.protobuf.Timestamp start_time = 1;
optional google.protobuf.Timestamp end_time = 2;
optional string session_id = 3;
optional string category = 4;
optional string operation = 5;
repeated string tags = 6;
int32 limit = 7;
}
Privacy Considerations
PII in Protocol Messages:
message UserProfile {
option (prism.protocol) = {
recording: RECORDING_LEVEL_METADATA // Don't record full payload
category: "data"
operation: "user_profile"
tags: ["pii", "sensitive"]
};
string user_id = 1;
string email = 2 [(prism.pii) = "email"]; // Flagged as PII
string name = 3 [(prism.pii) = "name"];
}
Automatic PII Scrubbing:
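fn scrub_pii(entry: &mut ProtocolEntry) {
    if entry.tags.iter().any(|tag| tag == "pii") {
        // Scrub payload if it contains PII. Raw protobuf bytes carry no field
        // names, so the helper must decode them with the message descriptor and
        // blank the fields annotated with (prism.pii).
        if let Some(payload) = entry.payload.as_mut() {
            *payload = scrub_pii_from_bytes(&entry.message_type, payload);
        }
        // Scrub metadata
        for (key, value) in entry.metadata.iter_mut() {
            if is_pii_field(key) {
                *value = "[REDACTED]".to_string();
            }
        }
    }
}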
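scrub_pii_from_bytes receives raw protobuf bytes, which carry no field names. Before scrubbing, the payload therefore has to be decoded, and each field checked against its (prism.pii) annotation. The following std-only sketch shows that per-field step. The decoded message is modeled as a list of hypothetical Field values rather than being produced by a real descriptor-driven decoder.

```rust
/// A decoded field plus its (prism.pii) annotation, if any (hypothetical model).
pub struct Field {
    pub name: &'static str,
    pub value: String,
    pub pii: Option<&'static str>, // e.g. Some("email") from [(prism.pii) = "email"]
}

/// Replaces annotated values with a typed placeholder; unannotated fields pass through.
/// Returns the number of fields scrubbed.
pub fn scrub_fields(fields: &mut [Field]) -> usize {
    let mut scrubbed = 0;
    for f in fields.iter_mut() {
        if let Some(kind) = f.pii {
            f.value = format!("[REDACTED:{kind}]");
            scrubbed += 1;
        }
    }
    scrubbed
}
```

Keeping the PII kind in the placeholder preserves some debugging value (which field was present) without storing the value itself.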
Configuration
# proxy/config.yaml
protocol_recording:
  enabled: true
  backend: postgres
  postgres:
    connection_string: postgres://...
    table: protocol_recordings
  # Override recording levels
  overrides:
    - message_type: "prism.queue.v1.Message"
      recording: RECORDING_LEVEL_NONE # Disable for performance
    - category: "transact"
      recording: RECORDING_LEVEL_FULL # Always record transactions
  # Global sampling
  default_sample_rate: 0.1 # 10% by default
  # Retention
  retention_days: 30
  auto_cleanup: true
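The config does not say how overrides combine with the levels declared in the proto options. The following std-only sketch shows one plausible precedence, which is an assumption: a message_type override wins over a category override, and either wins over the declared option. The Level, Override and effective_level names are hypothetical. The package name prism.transact.v1 is assumed from the file path.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Level {
    None,
    Metadata,
    Full,
    Sampled,
}

/// One entry from `overrides` (exactly one of the two selectors is expected to be set).
pub struct Override {
    pub message_type: Option<&'static str>,
    pub category: Option<&'static str>,
    pub level: Level,
}

/// Most specific wins: message_type override, then category override, then the proto option.
pub fn effective_level(message_type: &str, category: &str, declared: Level, overrides: &[Override]) -> Level {
    overrides
        .iter()
        .find(|o| o.message_type == Some(message_type))
        .or_else(|| {
            overrides
                .iter()
                .find(|o| o.message_type.is_none() && o.category == Some(category))
        })
        .map_or(declared, |o| o.level)
}
```

Under this reading, the transact override also upgrades TransactionStarted from METADATA to FULL. That may or may not be intended.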
Alternatives Considered
- Application-level logging
  - Pros: Simple, already exists
  - Cons: Not structured, hard to query, scattered
  - Rejected: Need structured protocol-specific recording
- Network packet capture
  - Pros: Captures everything, no code changes
  - Cons: Binary parsing, performance impact, storage intensive
  - Rejected: Too low-level, hard to query
- OpenTelemetry spans
  - Pros: Standard, integrates with tracing
  - Cons: Not protocol-specific, limited queryability
  - Deferred: Use OpenTelemetry for distributed tracing, and protocol recording for detailed protocol analysis
Consequences
Positive
- Declarative: Protocol recording via protobuf tags
- Type-safe: Option values are checked by protoc when the schema is compiled
- Queryable: Structured storage enables filtering
- Sampling: Control recording overhead
- Privacy-aware: PII handling built-in
- Debuggable: Reconstruct protocol sequences
Negative
- Storage overhead: Recording consumes storage
- Performance impact: Option lookup, sampling, and payload encoding (FULL only) run on the request path; the storage write runs asynchronously in a spawned task
- Complexity: Another system to manage
Neutral
- Retention policy: Must configure cleanup
- Query performance: Depends on storage backend
Implementation Notes
Code Generation
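prost does not carry custom options such as (prism.protocol) into the generated Rust types. The build therefore also writes a file descriptor set, from which the options can be read at startup (for example with a reflection crate such as prost-reflect):
// build.rs
use std::{env, io, path::PathBuf};

fn main() -> io::Result<()> {
    let out_dir = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR is set by Cargo"));
    prost_build::Config::new()
        // ProtocolEntry embeds RecordingLevel and is serialized to JSON
        .type_attribute(".", "#[derive(serde::Serialize)]")
        // Keep descriptors (including custom options) for runtime lookup
        .file_descriptor_set_path(out_dir.join("prism_descriptor.bin"))
        .compile_protos(
            &[
                "proto/prism/options.proto",
                "proto/prism/queue/v1/queue.proto",
                "proto/prism/transact/v1/transact.proto",
            ],
            &["proto/"],
        )
}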
Database Schema
CREATE TABLE protocol_recordings (
    id UUID PRIMARY KEY,
    timestamp TIMESTAMPTZ NOT NULL,
    session_id TEXT,
    category TEXT NOT NULL,
    operation TEXT NOT NULL,
    message_type TEXT NOT NULL,
    recording_level TEXT NOT NULL,
    metadata JSONB,
    payload BYTEA,
    tags TEXT[]
);
-- Indexes for querying (PostgreSQL does not accept inline INDEX clauses)
CREATE INDEX idx_timestamp ON protocol_recordings (timestamp);
CREATE INDEX idx_session ON protocol_recordings (session_id);
CREATE INDEX idx_category ON protocol_recordings (category);
CREATE INDEX idx_tags ON protocol_recordings USING GIN (tags);
-- Retention: an index predicate cannot call NOW() (it is not IMMUTABLE),
-- so expired rows are deleted periodically instead (e.g. by auto_cleanup)
DELETE FROM protocol_recordings
WHERE timestamp < NOW() - INTERVAL '30 days';
References
- Protobuf Options
- ADR-003: Protobuf as Single Source of Truth
- ADR-008: Observability Strategy
Revision History
- 2025-10-07: Initial draft and acceptance