# OpenTelemetry Tracing Integration

## Context
Prism's request flow spans multiple components:
- Client (application)
- Proxy (Rust) - gRPC server, authentication, routing
- Plugin (Go/Rust) - Backend-specific protocol implementation
- Backend (PostgreSQL, Kafka, Redis, etc.) - Data storage
Debugging issues requires understanding the full request path. Without distributed tracing:
- No visibility into plugin-level operations
- Can't correlate proxy logs with plugin logs
- Hard to identify bottlenecks (is it proxy routing? plugin processing? backend query?)
- No end-to-end latency breakdown
ADR-008 established OpenTelemetry as our observability strategy. This ADR details the implementation of distributed tracing across Rust proxy and Go plugins, with trace context propagation at every hop.
## Decision

Implement end-to-end OpenTelemetry tracing across all Prism components:

- **Rust Proxy**: Use the `tracing` + `tracing-opentelemetry` crates
- **Go Plugins**: Use `go.opentelemetry.io/otel` in the plugin core library
- **Trace Propagation**: W3C Trace Context via gRPC metadata
- **Plugin Core Integration**: Automatic tracing middleware in the `plugins/core` package
- **Sampling Strategy**: Adaptive sampling (100% errors, 100% slow requests, 1% normal)
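The adaptive sampling policy can be sketched as a pure decision function. This is an illustrative sketch (the function name and parameters are ours, not Prism's); the real samplers appear in the Implementation section. Keying the 1% bucket off the trace ID makes every hop of a given trace reach the same decision.

```go
package main

import "fmt"

// sampleDecision sketches the adaptive policy: errors and slow requests are
// always kept, everything else at roughly 1%, keyed off the low bits of the
// trace ID so that all components agree on the same traces.
func sampleDecision(isError, isSlow bool, traceIDLow uint64) bool {
	if isError || isSlow {
		return true // always sample errors and slow requests
	}
	return traceIDLow%100 == 0 // ~1% of normal traffic
}

func main() {
	fmt.Println(sampleDecision(true, false, 7))  // error → true
	fmt.Println(sampleDecision(false, false, 7)) // normal, 7 % 100 != 0 → false
}
```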
## Rationale

### Why End-to-End Tracing?
**Problem**: A request takes 150ms. Where is the time spent?

With tracing:

```text
prism.handle_request [150ms total]
├─ prism.auth.verify [5ms]
├─ prism.routing.select_plugin [2ms]
├─ plugin.postgres.execute [140ms]  ← Bottleneck found!
│  ├─ plugin.pool.acquire [3ms]
│  └─ postgres.query [137ms]
│     └─ SQL: SELECT * FROM... [135ms]
└─ prism.response.serialize [3ms]
```
### Architecture
```mermaid
sequenceDiagram
    participant Client
    participant Proxy as Rust Proxy
    participant Plugin as Go Plugin
    participant Backend as Backend (PostgreSQL)

    Client->>Proxy: gRPC Request<br/>(traceparent header)
    Note over Proxy: Extract trace context<br/>Create root span<br/>trace_id: abc123
    Proxy->>Proxy: Auth span<br/>(parent: abc123)
    Proxy->>Proxy: Routing span<br/>(parent: abc123)
    Proxy->>Plugin: gRPC Plugin Call<br/>(inject traceparent)
    Note over Plugin: Extract trace context<br/>Create child span<br/>same trace_id: abc123
    Plugin->>Backend: PostgreSQL Query<br/>(with trace context)
    Backend-->>Plugin: Result
    Plugin-->>Proxy: gRPC Response
    Proxy-->>Client: gRPC Response
    Note over Client,Backend: All spans linked by<br/>trace_id: abc123
```
### Trace Context Propagation
Use **W3C Trace Context** standard:
```text
traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
             │  │                                │                │
             │  │                                │                └─ flags (sampled)
             │  │                                └─ span_id (16 hex)
             │  └─ trace_id (32 hex)
             └─ version
```
**Propagation Flow**:
- Client → Proxy: gRPC metadata `traceparent`
- Proxy → Plugin: gRPC metadata `traceparent`
- Plugin → Backend: SQL comments or protocol-specific tags
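For illustration, the header above can be split into its four fields with plain string handling. This helper is hypothetical and deliberately naive; production code should use the `propagation.TraceContext` propagator, which also validates field contents.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTraceparent splits a W3C traceparent header into version, trace_id,
// span_id, and flags. Illustrative only: it checks field lengths but not
// that the fields are valid lowercase hex.
func parseTraceparent(h string) (version, traceID, spanID, flags string, err error) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || len(parts[1]) != 32 || len(parts[2]) != 16 {
		return "", "", "", "", fmt.Errorf("malformed traceparent: %q", h)
	}
	return parts[0], parts[1], parts[2], parts[3], nil
}

func main() {
	_, tid, sid, flags, _ := parseTraceparent(
		"00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
	fmt.Println(tid, sid, flags)
}
```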
## Implementation

### 1. Rust Proxy Integration

#### Proxy Initialization
```rust
// proxy/src/telemetry.rs
use anyhow::Result;
use opentelemetry::sdk::trace::{self, SamplingDecision, SamplingResult, ShouldSample};
use opentelemetry::trace::TraceContextExt;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};

pub fn init_telemetry() -> Result<()> {
    // Configure OpenTelemetry tracer with Jaeger exporter
    let tracer = opentelemetry_jaeger::new_pipeline()
        .with_service_name("prism-proxy")
        .with_agent_endpoint("jaeger:6831")
        .with_trace_config(
            trace::config()
                .with_sampler(AdaptiveSampler::new())
                .with_resource(opentelemetry::sdk::Resource::new(vec![
                    opentelemetry::KeyValue::new("service.name", "prism-proxy"),
                    opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
                ])),
        )
        .install_batch(opentelemetry::runtime::Tokio)?;

    // Create OpenTelemetry tracing layer
    let telemetry_layer = OpenTelemetryLayer::new(tracer);

    // Combine with structured logging
    let subscriber = Registry::default()
        .with(EnvFilter::from_default_env())
        .with(tracing_subscriber::fmt::layer().json())
        .with(telemetry_layer);

    tracing::subscriber::set_global_default(subscriber)?;
    Ok(())
}

/// Adaptive sampler: 100% errors, 100% slow (>100ms), 1% normal.
/// Note: the sampler trait name and exact signature vary by
/// opentelemetry-sdk version; this targets the `ShouldSample` trait.
#[derive(Debug, Clone)]
pub struct AdaptiveSampler;

impl AdaptiveSampler {
    pub fn new() -> Self {
        Self
    }
}

impl ShouldSample for AdaptiveSampler {
    fn should_sample(
        &self,
        parent_context: Option<&opentelemetry::Context>,
        trace_id: opentelemetry::trace::TraceId,
        _name: &str,
        _span_kind: &opentelemetry::trace::SpanKind,
        attributes: &[opentelemetry::KeyValue],
        _links: &[opentelemetry::trace::Link],
    ) -> SamplingResult {
        // Always sample if the parent is sampled
        if let Some(ctx) = parent_context {
            if ctx.span().span_context().is_sampled() {
                return SamplingResult {
                    decision: SamplingDecision::RecordAndSample,
                    attributes: vec![],
                    trace_state: Default::default(),
                };
            }
        }

        // Check for error attribute
        let has_error = attributes
            .iter()
            .any(|kv| kv.key.as_str() == "error" && kv.value.as_str() == "true");

        // Check for high latency (attribute set by instrumentation)
        let is_slow = attributes
            .iter()
            .any(|kv| kv.key.as_str() == "slow" && kv.value.as_str() == "true");

        if has_error || is_slow {
            return SamplingResult {
                decision: SamplingDecision::RecordAndSample,
                attributes: vec![],
                trace_state: Default::default(),
            };
        }

        // Otherwise 1% sample rate, keyed off the trace ID for determinism
        let sample = (trace_id.to_u128() % 100) == 0;
        SamplingResult {
            decision: if sample {
                SamplingDecision::RecordAndSample
            } else {
                SamplingDecision::Drop
            },
            attributes: vec![],
            trace_state: Default::default(),
        }
    }
}
```
#### gRPC Request Handler
```rust
// proxy/src/grpc/handler.rs
use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
use opentelemetry::trace::TraceContextExt;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tonic::{Request, Response, Status};
use tracing::{error, info, instrument};

/// Extract trace context from gRPC metadata
struct MetadataExtractor<'a>(&'a tonic::metadata::MetadataMap);

impl<'a> Extractor for MetadataExtractor<'a> {
    fn get(&self, key: &str) -> Option<&str> {
        self.0.get(key).and_then(|v| v.to_str().ok())
    }

    fn keys(&self) -> Vec<&str> {
        self.0.keys().map(|k| k.as_str()).collect()
    }
}

/// Inject trace context into gRPC metadata
struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);

impl<'a> Injector for MetadataInjector<'a> {
    fn set(&mut self, key: &str, value: String) {
        if let Ok(metadata_value) = tonic::metadata::MetadataValue::try_from(&value) {
            if let Ok(metadata_key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
                self.0.insert(metadata_key, metadata_value);
            }
        }
    }
}

#[instrument(
    skip(request, plugin_client),
    fields(
        request_id = %request.get_ref().request_id,
        namespace = %request.get_ref().namespace,
        operation = %request.get_ref().operation,
        trace_id = tracing::field::Empty,
    )
)]
pub async fn handle_data_request(
    request: Request<DataRequest>,
    plugin_client: &PluginClient,
) -> Result<Response<DataResponse>, Status> {
    // Extract trace context from the incoming request
    let propagator = TraceContextPropagator::new();
    let parent_context = propagator.extract(&MetadataExtractor(request.metadata()));

    // Record the trace ID on the current tracing span
    let span = tracing::Span::current();
    let trace_id = parent_context.span().span_context().trace_id().to_string();
    span.record("trace_id", &tracing::field::display(&trace_id));

    info!("Processing data request");

    // Create child span for the plugin call
    let plugin_response = {
        let _plugin_span = tracing::info_span!(
            "plugin.execute",
            plugin = "postgres",
            backend = "postgresql"
        )
        .entered();

        // Inject trace context into plugin request metadata
        let mut plugin_metadata = tonic::metadata::MetadataMap::new();
        let current_context = opentelemetry::Context::current();
        propagator.inject_context(&current_context, &mut MetadataInjector(&mut plugin_metadata));

        let mut plugin_req = tonic::Request::new(request.into_inner());
        *plugin_req.metadata_mut() = plugin_metadata;

        plugin_client.execute(plugin_req).await.map_err(|e| {
            error!(error = %e, "Plugin execution failed");
            Status::internal("Plugin error")
        })?
    };

    info!("Request completed successfully");
    Ok(plugin_response)
}
```
### 2. Go Plugin Core Integration

#### Plugin Core Library (`plugins/core/tracing`)
```go
// plugins/core/tracing/tracing.go
package tracing

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc/metadata"
)

// InitTracer initializes OpenTelemetry tracing for a plugin.
// The returned function flushes and shuts down the tracer provider.
func InitTracer(pluginName, pluginVersion string) (func(context.Context) error, error) {
	// Create Jaeger exporter (agent endpoint from OTEL_EXPORTER_JAEGER_* env vars)
	exporter, err := jaeger.New(jaeger.WithAgentEndpoint())
	if err != nil {
		return nil, fmt.Errorf("failed to create Jaeger exporter: %w", err)
	}

	// Create trace provider
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String(fmt.Sprintf("prism-plugin-%s", pluginName)),
			semconv.ServiceVersionKey.String(pluginVersion),
			attribute.String("plugin.name", pluginName),
		)),
		sdktrace.WithSampler(newAdaptiveSampler()),
	)

	// Set global trace provider
	otel.SetTracerProvider(tp)

	// Set global propagator (W3C Trace Context + Baggage)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	))

	// Return cleanup function
	return tp.Shutdown, nil
}

// ExtractTraceContext extracts trace context from gRPC incoming metadata
func ExtractTraceContext(ctx context.Context) context.Context {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return ctx
	}
	return otel.GetTextMapPropagator().Extract(ctx, &metadataSupplier{md})
}

// InjectTraceContext injects trace context into gRPC outgoing metadata
func InjectTraceContext(ctx context.Context) context.Context {
	md := metadata.MD{}
	otel.GetTextMapPropagator().Inject(ctx, &metadataSupplier{md})
	return metadata.NewOutgoingContext(ctx, md)
}

// metadataSupplier implements propagation.TextMapCarrier for gRPC metadata
type metadataSupplier struct {
	metadata metadata.MD
}

func (s *metadataSupplier) Get(key string) string {
	values := s.metadata.Get(key)
	if len(values) == 0 {
		return ""
	}
	return values[0]
}

func (s *metadataSupplier) Set(key, value string) {
	s.metadata.Set(key, value)
}

func (s *metadataSupplier) Keys() []string {
	keys := make([]string, 0, len(s.metadata))
	for k := range s.metadata {
		keys = append(keys, k)
	}
	return keys
}

// adaptiveSampler implements the same policy as the Rust proxy sampler
type adaptiveSampler struct {
	fallback sdktrace.Sampler
}

func newAdaptiveSampler() sdktrace.Sampler {
	return &adaptiveSampler{
		fallback: sdktrace.TraceIDRatioBased(0.01), // 1% for normal requests
	}
}

func (s *adaptiveSampler) ShouldSample(p sdktrace.SamplingParameters) sdktrace.SamplingResult {
	psc := trace.SpanContextFromContext(p.ParentContext)

	// Always sample if the parent is sampled
	if psc.IsSampled() {
		return sdktrace.SamplingResult{
			Decision:   sdktrace.RecordAndSample,
			Tracestate: psc.TraceState(),
		}
	}

	// Always sample errors and slow requests (parity with the Rust sampler)
	for _, attr := range p.Attributes {
		if (attr.Key == "error" || attr.Key == "slow") && attr.Value.AsString() == "true" {
			return sdktrace.SamplingResult{
				Decision:   sdktrace.RecordAndSample,
				Tracestate: psc.TraceState(),
			}
		}
	}

	// Fall back to ratio-based sampling (1%)
	return s.fallback.ShouldSample(p)
}

func (s *adaptiveSampler) Description() string {
	return "AdaptiveSampler{errors=100%, slow=100%, normal=1%}"
}
```
#### Plugin gRPC Interceptor
```go
// plugins/core/tracing/interceptor.go
package tracing

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc"
)

// UnaryServerInterceptor creates a gRPC interceptor for automatic tracing
func UnaryServerInterceptor(pluginName string) grpc.UnaryServerInterceptor {
	tracer := otel.Tracer(fmt.Sprintf("prism-plugin-%s", pluginName))

	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		// Extract trace context from incoming metadata
		ctx = ExtractTraceContext(ctx)

		// Start a server span for this RPC
		ctx, span := tracer.Start(ctx, info.FullMethod,
			trace.WithSpanKind(trace.SpanKindServer),
			trace.WithAttributes(
				attribute.String("rpc.system", "grpc"),
				attribute.String("rpc.service", pluginName),
				attribute.String("rpc.method", info.FullMethod),
			),
		)
		defer span.End()

		// Call the wrapped handler
		resp, err := handler(ctx, req)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
		} else {
			span.SetStatus(codes.Ok, "")
		}
		return resp, err
	}
}
```
### 3. Plugin Implementation Example
```go
// plugins/postgres/main.go
package main

import (
	"context"
	"log"

	"github.com/jrepp/prism-data-layer/plugins/core/tracing"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc"
)

func main() {
	// Initialize tracing
	shutdown, err := tracing.InitTracer("postgres", "1.0.0")
	if err != nil {
		log.Fatalf("Failed to initialize tracing: %v", err)
	}
	defer shutdown(context.Background())

	// Create gRPC server with tracing interceptor
	server := grpc.NewServer(
		grpc.UnaryInterceptor(tracing.UnaryServerInterceptor("postgres")),
	)

	// Register plugin service
	RegisterPluginService(server, &PostgresPlugin{})

	// Start server...
}

// PostgresPlugin implements the plugin service with tracing.
// (db, ExecuteRequest, ExecuteResponse are defined elsewhere in the plugin.)
type PostgresPlugin struct {
	tracer trace.Tracer
}

func (p *PostgresPlugin) Execute(ctx context.Context, req *ExecuteRequest) (*ExecuteResponse, error) {
	// Trace context was already extracted by the interceptor;
	// create a child span for the database operation
	tracer := otel.Tracer("prism-plugin-postgres")
	ctx, span := tracer.Start(ctx, "postgres.query",
		trace.WithAttributes(
			attribute.String("db.system", "postgresql"),
			attribute.String("db.operation", req.Operation),
			attribute.String("db.table", req.Table),
		),
	)
	defer span.End()

	// Execute query (trace context propagated via ctx)
	result, err := p.db.QueryContext(ctx, req.Query)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}

	span.SetAttributes(attribute.Int("db.rows", result.RowsAffected))
	return &ExecuteResponse{Data: result}, nil
}
```
### 4. Backend Trace Context Propagation

#### PostgreSQL (SQL Comments)
```go
// Inject trace context as a SQL comment (sqlcommenter-style)
func addTraceContextToQuery(ctx context.Context, query string) string {
	spanCtx := trace.SpanContextFromContext(ctx)
	if !spanCtx.IsValid() {
		return query
	}
	comment := fmt.Sprintf(
		"/* traceparent='%s' */",
		spanCtx.TraceID().String(),
	)
	return comment + " " + query
}

// Usage
query := "SELECT * FROM users WHERE id = $1"
tracedQuery := addTraceContextToQuery(ctx, query)
// Result: /* traceparent='0af765...' */ SELECT * FROM users WHERE id = $1
```
#### Redis (Tags)
```go
// Add trace context to Redis commands
func executeWithTracing(ctx context.Context, cmd string, args ...interface{}) error {
	spanCtx := trace.SpanContextFromContext(ctx)

	// Append trace_id as a trailing key/value pair
	if spanCtx.IsValid() {
		args = append(args, "trace_id", spanCtx.TraceID().String())
	}

	// go-redis's Do takes the command as the first variadic argument
	doArgs := append([]interface{}{cmd}, args...)
	return redisClient.Do(ctx, doArgs...).Err()
}
```
### 5. Testing Tracing
```go
// plugins/core/tracing/tracing_test.go
package tracing_test

import (
	"context"
	"testing"

	"github.com/jrepp/prism-data-layer/plugins/core/tracing"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
	"google.golang.org/grpc/metadata"
)

func TestTraceContextPropagation(t *testing.T) {
	// Create in-memory span recorder
	sr := tracetest.NewSpanRecorder()
	tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
	otel.SetTracerProvider(tp)

	// Create parent span
	ctx := context.Background()
	tracer := tp.Tracer("test")
	ctx, parentSpan := tracer.Start(ctx, "parent")

	// Inject trace context into outgoing metadata
	ctx = tracing.InjectTraceContext(ctx)

	// Simulate gRPC call: extract on the receiver side
	md, _ := metadata.FromOutgoingContext(ctx)
	receiverCtx := metadata.NewIncomingContext(context.Background(), md)
	receiverCtx = tracing.ExtractTraceContext(receiverCtx)

	// Create child span in the receiver
	_, childSpan := tracer.Start(receiverCtx, "child")
	childSpan.End()

	// End the parent before inspecting; a deferred End would run too late
	// and the recorder would only contain one ended span here
	parentSpan.End()

	// Verify trace lineage (spans are recorded in end order: child, then parent)
	spans := sr.Ended()
	if len(spans) != 2 {
		t.Fatalf("Expected 2 spans, got %d", len(spans))
	}
	child, parent := spans[0], spans[1]

	if child.SpanContext().TraceID() != parent.SpanContext().TraceID() {
		t.Errorf("Child span has different trace ID: parent=%s, child=%s",
			parent.SpanContext().TraceID(), child.SpanContext().TraceID())
	}
	if child.Parent().SpanID() != parent.SpanContext().SpanID() {
		t.Error("Child span not linked to parent")
	}
}
```
## Alternatives Considered

### 1. Custom Tracing Implementation
- **Pros**: Full control, minimal dependencies
- **Cons**: Reinventing the wheel, no ecosystem, hard to maintain
- **Rejected**: OpenTelemetry is the industry standard

### 2. Jaeger-Only (No OpenTelemetry)
- **Pros**: Simpler, direct Jaeger integration
- **Cons**: Vendor lock-in, no metrics/logs correlation
- **Rejected**: OpenTelemetry provides vendor neutrality

### 3. No Plugin Tracing
- **Pros**: Simpler plugin implementation
- **Cons**: No visibility into plugin internals, hard to debug
- **Rejected**: Plugin performance is critical to debug

### 4. Separate Trace IDs per Component
- **Pros**: Simpler (no context propagation)
- **Cons**: Can't correlate proxy and plugin spans
- **Rejected**: End-to-end correlation is essential
## Consequences

### Positive
- ✅ **Complete Visibility**: Full request path from client to backend
- ✅ **Bottleneck Identification**: Know exactly where time is spent
- ✅ **Cross-Language Tracing**: Rust proxy + Go plugins seamlessly connected
- ✅ **Production Debugging**: Trace sampling captures errors and slow requests
- ✅ **Plugin Core Simplicity**: Automatic tracing via interceptor, minimal plugin code
### Negative
- ❌ **Resource Overhead**: Tracing adds CPU/memory cost (~2-5%)
  - *Mitigation*: Adaptive sampling (1% normal requests)
- ❌ **Complexity**: Developers must understand trace context propagation
  - *Mitigation*: Plugin core library handles propagation automatically
- ❌ **Storage Cost**: Traces require storage (Jaeger/Tempo backend)
  - *Mitigation*: Retention policies (7 days default)
### Neutral
- ⚪ **Learning Curve**: Team must learn OpenTelemetry concepts
  - Industry-standard skill, valuable beyond Prism
- ⚪ **Backend-Specific Propagation**: Each backend (PostgreSQL, Redis) needs custom trace injection
  - One-time implementation per backend type
## Implementation Notes

### Deployment Configuration
```yaml
# docker-compose.yaml
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"   # Jaeger UI
      - "6831:6831/udp" # Jaeger agent (UDP)
    environment:
      COLLECTOR_ZIPKIN_HOST_PORT: ":9411"

  prism-proxy:
    image: prism/proxy:latest
    environment:
      OTEL_EXPORTER_JAEGER_AGENT_HOST: jaeger
      OTEL_EXPORTER_JAEGER_AGENT_PORT: 6831
      OTEL_SERVICE_NAME: prism-proxy
    depends_on:
      - jaeger

  postgres-plugin:
    image: prism/plugin-postgres:latest
    environment:
      OTEL_EXPORTER_JAEGER_AGENT_HOST: jaeger
      OTEL_EXPORTER_JAEGER_AGENT_PORT: 6831
      OTEL_SERVICE_NAME: prism-plugin-postgres
    depends_on:
      - jaeger
```
### Trace Example (Jaeger UI)

```text
Trace ID: 0af7651916cd43dd8448eb211c80319c
Duration: 142ms
Spans: 7

prism-proxy: handle_data_request [142ms]
├─ prism-proxy: auth.verify [3ms]
├─ prism-proxy: routing.select_plugin [1ms]
└─ prism-proxy: plugin.execute [137ms]
   └─ prism-plugin-postgres: Execute [136ms]
      ├─ prism-plugin-postgres: pool.acquire [2ms]
      └─ prism-plugin-postgres: postgres.query [134ms]
         └─ postgresql: SELECT * FROM users WHERE id = $1 [132ms]
```
## References
- [ADR-008: Observability Strategy](/adr/adr-008) - High-level observability architecture
- [OpenTelemetry Specification](https://opentelemetry.io/docs/specs/otel/)
- [W3C Trace Context](https://www.w3.org/TR/trace-context/)
- [OpenTelemetry Rust](https://github.com/open-telemetry/opentelemetry-rust)
- [OpenTelemetry Go](https://github.com/open-telemetry/opentelemetry-go)
- [Tracing in Rust with Tokio](https://tokio.rs/tokio/topics/tracing)
- RFC-008: Plugin Architecture - Plugin core library design
## Revision History
- 2025-10-09: Initial draft and acceptance