
RFC-040: Multi-Language Client SDK Architecture

Status: Draft
Author: Platform Team
Created: 2025-10-19
Updated: 2025-10-19

Abstract

This RFC defines the architecture for Prism client SDKs in Rust, Python, and Go. These SDKs provide first-class, idiomatic client libraries that expose pattern interfaces (Producer, Consumer, KeyValue) directly to application developers. The SDKs are designed with:

  1. Full integration test coverage using testcontainers and real local backends
  2. Shared directory structure across all three languages for consistency
  3. Synchronous-by-default APIs, with async variants built on native concurrency primitives (tokio, asyncio, goroutines)
  4. Direct gRPC communication with Prism proxy for maximum performance
  5. OAuth2 authentication as the default auth mechanism
  6. Namespace-aware configuration for multi-tenancy support

The SDKs abstract away the complexity of backend implementation while providing simple, type-safe APIs that feel native to each language ecosystem.

Motivation

Problem Statement

Application teams currently face several challenges when integrating with Prism:

  1. No Official Client Libraries: Teams must hand-write gRPC clients or use generic tools
  2. Pattern Discovery: No clear mapping from use case → pattern API
  3. Auth Complexity: OAuth2 token management is repetitive and error-prone
  4. Namespace Configuration: Unclear how to configure pattern-specific settings
  5. Testing Difficulty: No guidance on testing against Prism locally
  6. Language Inconsistency: Each team invents their own client patterns

Goals

  1. Developer Experience: Simple, idiomatic APIs in Rust, Python, and Go
  2. Type Safety: Leverage protobuf code generation for compile-time safety
  3. Pattern-First Design: Expose Producer, Consumer, KeyValue as first-class APIs
  4. OAuth2 Integration: Built-in token acquisition and refresh
  5. Comprehensive Testing: Full integration test suites with testcontainers
  6. Namespace Support: Easy configuration of namespace-specific settings
  7. Observability: Built-in metrics, tracing, and structured logging
  8. Cross-Language Consistency: Same API shape across Rust, Python, Go

Non-Goals

  1. Admin API Client: This RFC focuses on data plane clients (separate admin SDK)
  2. Reactive Streams: Advanced backpressure mechanisms (use native streams)
  3. Custom Serialization: Only protobuf-based serialization supported initially
  4. Multi-Region Failover: Automatic cross-region failover (use DNS-based discovery)
  5. Schema Registry Integration: Schema evolution (deferred to separate RFC)

Design Principles

1. Pattern-Centric APIs

Expose patterns as top-level modules, not low-level gRPC calls:

// ✅ Good: Pattern-centric
use prism_client::Producer;

let producer = Producer::connect("orders-namespace").await?;
producer.publish(b"order-123", order_data).await?;

// ❌ Bad: Low-level gRPC
use prism_proto::pubsub::PubSubBasicInterfaceClient;

let mut client = PubSubBasicInterfaceClient::connect("http://prism:8980").await?;
client.publish(PublishRequest { topic: "...", ... }).await?;

2. Synchronous and Asynchronous APIs

Synchronous APIs are the default for simplicity; asynchronous APIs are available for advanced use cases:

  • Sync: Simple, blocking APIs for CLI tools, scripts, and traditional applications
  • Async: High-performance APIs for concurrent workloads using native runtimes

Each language provides both sync and async interfaces:

  • Rust: Sync wrappers around tokio (using Runtime::block_on)
  • Python: Sync module alongside asyncio (using asyncio.run internally)
  • Go: Naturally synchronous with goroutines (async patterns via channels)

Default recommendation: Use synchronous APIs unless you have specific concurrency requirements.

3. Pluggable Authentication with Credential Providers

SDKs support multiple authentication methods through a Credential Provider abstraction:

# client-config.yaml
# Choose exactly one of the auth blocks below.

# Option 1: API Key (simplest, for development/internal services)
auth:
  type: api_key
  api_key: ${PRISM_API_KEY}

# Option 2: OAuth2 (production, token management)
auth:
  type: oauth2
  token_endpoint: https://auth.example.com/token
  client_id: my-app
  client_secret: ${CLIENT_SECRET}
  scopes: [prism.producer, prism.consumer]

# Option 3: Mutual TLS (certificate-based)
auth:
  type: mtls
  client_cert: /etc/prism/client.pem
  client_key: /etc/prism/client-key.pem

# Option 4: Environment variable (for local dev)
auth:
  type: env
  token_var: PRISM_TOKEN

Credential Provider Trait (Rust):

pub trait CredentialProvider {
    async fn get_credentials(&self) -> Result<Credentials>;
}

pub enum Credentials {
    Bearer(String),                  // OAuth2 token or API key
    ApiKey(String),                  // Simple API key
    Mtls(ClientCert),                // Client certificate
    Custom(HashMap<String, String>), // Custom headers
}

// Built-in providers
impl CredentialProvider for OAuth2Provider { /* ... */ }
impl CredentialProvider for ApiKeyProvider { /* ... */ }
impl CredentialProvider for EnvProvider { /* ... */ }
impl CredentialProvider for MtlsProvider { /* ... */ }
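
A rough Python counterpart of the provider abstraction may make the idea concrete. This is a minimal sketch, not the SDK's actual API: the class names mirror the Rust ones, while `call_metadata`, the `kind` field, and the `x-api-key` header name are assumptions made for illustration.

```python
import asyncio
import os
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class Credentials:
    """Hypothetical credential value: a kind plus the secret it carries."""
    kind: str   # "bearer" or "api_key"
    value: str


class CredentialProvider(Protocol):
    async def get_credentials(self) -> Credentials: ...


class ApiKeyProvider:
    def __init__(self, api_key: str) -> None:
        self._api_key = api_key

    async def get_credentials(self) -> Credentials:
        return Credentials("api_key", self._api_key)


class EnvProvider:
    """Reads a bearer token from an environment variable on every call."""

    def __init__(self, token_var: str = "PRISM_TOKEN") -> None:
        self._token_var = token_var

    async def get_credentials(self) -> Credentials:
        token = os.environ.get(self._token_var)
        if not token:
            raise RuntimeError(f"environment variable {self._token_var} is not set")
        return Credentials("bearer", token)


async def call_metadata(provider: CredentialProvider) -> list[tuple[str, str]]:
    """Turn credentials into gRPC call metadata (lowercase header names)."""
    creds = await provider.get_credentials()
    if creds.kind == "bearer":
        return [("authorization", f"Bearer {creds.value}")]
    if creds.kind == "api_key":
        return [("x-api-key", creds.value)]  # header name is an assumption
    raise ValueError(f"unsupported credential kind: {creds.kind}")
```

Because the pattern clients only depend on `get_credentials`, swapping one provider for another does not touch any Producer, Consumer, or KeyValue code.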

OAuth2-specific handling (when using OAuth2 provider):

  • Token acquisition on first request
  • Token refresh before expiration
  • Retry with new token on 401 responses
  • Token caching with encryption

4. Full Integration Test Coverage

Every SDK must include:

  • Unit tests: Mock gRPC responses for isolated logic
  • Integration tests: Real Prism proxy + backends via testcontainers
  • End-to-end tests: Multi-pattern workflows (produce → consume)
  • Performance tests: Throughput and latency benchmarks

Target coverage:

  • Producer/Consumer: 85%+ line coverage
  • KeyValue: 85%+ line coverage
  • Auth: 90%+ line coverage
  • Config: 90%+ line coverage

5. Shared Directory Structure

All SDKs follow the same directory layout for discoverability:

prism-client-{lang}/
├── src/ # Source code
│ ├── patterns/ # Pattern implementations
│ │ ├── producer.{ext}
│ │ ├── consumer.{ext}
│ │ └── keyvalue.{ext}
│ ├── auth/ # OAuth2 client
│ ├── config/ # Configuration management
│ ├── proto/ # Generated protobuf code
│ └── client.{ext} # Main client entry point
├── tests/ # Test suite
│ ├── unit/ # Unit tests
│ ├── integration/ # Integration tests (testcontainers)
│ └── e2e/ # End-to-end tests
├── examples/ # Usage examples
└── docs/ # SDK-specific documentation

Client SDK Architecture

High-Level Components

┌────────────────────────────────────────────────────────────┐
│ Application Code │
│ │
│ producer.publish("order-123", data) │
│ message = consumer.receive() │
│ kv.set("user:42", user_data) │
└────────────────────────┬───────────────────────────────────┘

│ Pattern APIs

┌────────────────────────▼───────────────────────────────────┐
│ Prism Client SDK │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Producer │ │ Consumer │ │ KeyValue │ │
│ │ │ │ │ │ │ │
│ │ - publish() │ │ - receive() │ │ - get() │ │
│ │ - flush() │ │ - ack() │ │ - set() │ │
│ │ │ │ - nack() │ │ - delete() │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ OAuth2 │ │ Config │ │ Metrics │ │
│ │ Client │ │ Manager │ │ Tracer │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└────────────────────────┬───────────────────────────────────┘

│ gRPC + OAuth2 headers

┌────────────────────────▼───────────────────────────────────┐
│ Prism Proxy │
│ │
│ Pattern Layer → Backend Drivers → Backends │
└────────────────────────────────────────────────────────────┘

Core Abstractions

0. Synchronous API Design

To maximize developer ergonomics, all SDKs provide synchronous wrappers as the primary interface, with async APIs available as an advanced option.

Rust Synchronous API:

// Synchronous (recommended for most use cases)
use prism_client::Client;

fn main() -> Result<()> {
let client = Client::connect("localhost:8980")?;
let producer = client.producer("orders")?;
producer.publish("new-orders", b"order-123")?;
client.close()?;
Ok(())
}

// Async API (for high-performance servers)
use prism_client::r#async::Client as AsyncClient;

#[tokio::main]
async fn main() -> Result<()> {
let client = AsyncClient::connect("localhost:8980").await?;
let producer = client.producer("orders").await?;
producer.publish("new-orders", b"order-123").await?;
Ok(())
}

Python Synchronous API:

# Synchronous (recommended for Django, Flask, scripts)
from prism_client.sync import Client

with Client.from_file("prism.yaml") as client:
    producer = client.producer("orders")
    producer.publish("new-orders", b"order-123")

# Async API (for FastAPI, aiohttp)
from prism_client import Client

async with Client.from_file("prism.yaml") as client:
    producer = await client.producer("orders")
    await producer.publish("new-orders", b"order-123")

Go API (naturally synchronous):

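// Go is synchronous by default
client, err := prism.Connect("localhost:8980")
if err != nil {
	log.Fatalf("connect: %v", err)
}
defer client.Close()

producer := client.Producer("orders")
if err := producer.Publish(ctx, "new-orders", []byte("order-123")); err != nil {
	log.Printf("publish: %v", err)
}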

Implementation Strategy:

The sync wrapper maintains an internal async runtime and blocks on async operations:

// Rust sync wrapper (internal implementation)
pub struct Client {
    async_client: AsyncClient,
    runtime: Arc<Runtime>,
}

impl Client {
    pub fn producer(&self, namespace: &str) -> Result<Producer> {
        // Note: block_on panics if called from inside an async context
        self.runtime.block_on(self.async_client.producer(namespace))
    }
}

# Python sync wrapper (internal implementation)
class Client:
    def producer(self, namespace: str) -> Producer:
        # Simplified: asyncio.run() starts a fresh event loop per call and
        # fails inside a running loop, so a real wrapper would reuse one loop.
        return asyncio.run(self._async_client.producer(namespace))

Benefits:

  1. Lower barrier to entry: No async knowledge required for basic use
  2. Smaller conceptual surface area: Most users can avoid async complexity entirely
  3. Framework compatibility: Works with Django, Flask, CLI tools
  4. No code duplication: Sync wraps async implementation

1. Client Factory

Entry point for creating pattern-specific clients:

// Rust (async client; the sync client lives at prism_client::Client)
use prism_client::r#async::Client;
use prism_client::ClientConfig;

let config = ClientConfig::from_file("prism.yaml")?;
let client = Client::connect(config).await?;

let producer = client.producer("orders-namespace").await?;
let consumer = client.consumer("orders-namespace").await?;
let kv = client.keyvalue("cache-namespace").await?;

# Python
from prism_client import Client, ClientConfig

config = ClientConfig.from_file("prism.yaml")
async with Client(config) as client:
    producer = await client.producer("orders-namespace")
    consumer = await client.consumer("orders-namespace")
    kv = await client.keyvalue("cache-namespace")

// Go
import prism "github.com/prism/client-go"

config, err := prism.LoadConfig("prism.yaml")
if err != nil {
	return err
}
client, err := prism.Connect(config)
if err != nil {
	return err
}
defer client.Close()

producer := client.Producer("orders-namespace")
consumer := client.Consumer("orders-namespace")
kv := client.KeyValue("cache-namespace")

2. Producer Pattern

Publishes messages to topics/queues:

// Rust
use prism_client::Producer;

let producer = client.producer("events").await?;

// Publish single message
producer.publish("user-events", b"user-login").await?;

// Publish with metadata
producer.publish_with_metadata(
"user-events",
b"user-login",
vec![("user_id", "alice"), ("action", "login")]
).await?;

// Batch publish
producer.publish_batch(vec![
("topic-a", b"msg1"),
("topic-a", b"msg2"),
("topic-b", b"msg3"),
]).await?;
# Python
from prism_client import Producer

producer = await client.producer("events")

# Publish single message
await producer.publish("user-events", b"user-login")

# Publish with metadata
await producer.publish(
"user-events",
b"user-login",
metadata={"user_id": "alice", "action": "login"}
)

# Batch publish
await producer.publish_batch([
("topic-a", b"msg1"),
("topic-a", b"msg2"),
("topic-b", b"msg3"),
])
// Go
producer := client.Producer("events")

// Publish single message
err := producer.Publish(ctx, "user-events", []byte("user-login"))

// Publish with metadata
err = producer.PublishWithMetadata(ctx, "user-events", []byte("user-login"),
map[string]string{"user_id": "alice", "action": "login"})

// Batch publish
err = producer.PublishBatch(ctx, []prism.Message{
{Topic: "topic-a", Payload: []byte("msg1")},
{Topic: "topic-a", Payload: []byte("msg2")},
{Topic: "topic-b", Payload: []byte("msg3")},
})

3. Consumer Pattern

Receives messages from topics/queues:

// Rust - Streaming API
use prism_client::Consumer;
use futures::StreamExt;

let consumer = client.consumer("events").await?;
let mut stream = consumer.subscribe("user-events").await?;

while let Some(msg) = stream.next().await {
match msg {
Ok(message) => {
println!("Received: {:?}", message.payload);
message.ack().await?;
}
Err(e) => eprintln!("Error: {}", e),
}
}

# Python - Async iterator
from prism_client import Consumer

consumer = await client.consumer("events")

async for message in consumer.subscribe("user-events"):
    print(f"Received: {message.payload}")
    await message.ack()

// Go - Channel-based
consumer := client.Consumer("events")
messages, err := consumer.Subscribe(ctx, "user-events")

for msg := range messages {
	fmt.Printf("Received: %s\n", msg.Payload)
	msg.Ack(ctx)
}

4. KeyValue Pattern

Simple key-value operations:

// Rust
use prism_client::KeyValue;

let kv = client.keyvalue("cache").await?;

// Set
kv.set("user:42", b"alice").await?;

// Get
if let Some(value) = kv.get("user:42").await? {
println!("User: {:?}", value);
}

// Delete
kv.delete("user:42").await?;

// Exists
if kv.exists("user:42").await? {
println!("Key exists");
}

# Python
kv = await client.keyvalue("cache")

# Set
await kv.set("user:42", b"alice")

# Get (compare against None so an empty value still counts as present)
value = await kv.get("user:42")
if value is not None:
    print(f"User: {value}")

# Delete
await kv.delete("user:42")

# Exists
if await kv.exists("user:42"):
    print("Key exists")

// Go
kv := client.KeyValue("cache")

// Set
err := kv.Set(ctx, "user:42", []byte("alice"))

// Get
value, err := kv.Get(ctx, "user:42")
if value != nil {
fmt.Printf("User: %s\n", value)
}

// Delete
err = kv.Delete(ctx, "user:42")

// Exists
exists, err := kv.Exists(ctx, "user:42")

Configuration Schema

All SDKs support progressive disclosure with three configuration tiers:

Tier 1: Minimal Configuration (Hello World)

For getting started quickly with sensible defaults:

# prism-minimal.yaml
proxy:
  endpoint: localhost:8980

Usage:

let client = Client::from_file("prism-minimal.yaml")?;

Tier 2: Production Configuration

For production deployments with authentication and basic tuning:

# prism-production.yaml
proxy:
  endpoints:
    - prism-proxy-1.example.com:8980
    - prism-proxy-2.example.com:8980

auth:
  type: oauth2
  client_id: my-app
  client_secret: ${CLIENT_SECRET}
  token_endpoint: https://auth.example.com/oauth/token
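
The `${CLIENT_SECRET}` placeholder implies that the config loader substitutes environment variables after parsing. A minimal sketch of that step, assuming the parsed YAML is a tree of dicts, lists, and scalars; `expand_env` is a hypothetical helper name, and failing on an unset variable is a design assumption rather than documented behavior.

```python
import os
import re

# Matches ${NAME} where NAME is a conventional environment variable name.
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value, env=None):
    """Recursively replace ${VAR} in strings inside dicts and lists."""
    env = os.environ if env is None else env
    if isinstance(value, str):
        def repl(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
        return _VAR.sub(repl, value)
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    return value  # numbers, booleans, None pass through unchanged
```

Raising on a missing variable surfaces a misconfigured deployment at startup instead of sending an empty secret to the token endpoint.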

Tier 3: Advanced Configuration (Full Options)

For advanced use cases with fine-grained control:

# prism-advanced.yaml
proxy:
  endpoints:
    - prism-proxy-1.example.com:8980
    - prism-proxy-2.example.com:8980
  discovery: dns # dns | api | static
  tls:
    enabled: true
    ca_cert: /path/to/ca.pem
    client_cert: /path/to/client.pem
    client_key: /path/to/client-key.pem

auth:
  type: oauth2 # api_key | oauth2 | mtls | env
  token_endpoint: https://auth.example.com/oauth/token
  client_id: my-app-id
  client_secret: ${CLIENT_SECRET} # Environment variable
  scopes: [prism.producer, prism.consumer, prism.keyvalue]
  token_cache: ~/.prism/tokens # Cache tokens between runs

namespaces:
  # Per-namespace configuration
  orders:
    pattern: producer
    options:
      max_message_size: 1MB
      batch_size: 100
      flush_interval: 100ms

  events:
    pattern: consumer
    options:
      consumer_group: my-app-group
      auto_offset_reset: earliest
      max_poll_records: 500

  cache:
    pattern: keyvalue
    options:
      default_ttl: 15m

observability:
  metrics:
    enabled: true
    provider: prometheus # prometheus | otlp
    listen_addr: ":9090"

  tracing:
    enabled: true
    provider: otlp # otlp | jaeger
    endpoint: http://localhost:4317
    sample_rate: 0.1 # 10% sampling

  logging:
    level: info # debug | info | warn | error
    format: json # json | text
    output: stdout # stdout | file

retry:
  max_attempts: 3
  initial_backoff: 100ms
  max_backoff: 5s
  backoff_multiplier: 2.0

timeouts:
  connect: 5s
  request: 30s
  idle: 60s
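
Values such as `100ms`, `5s`, and `15m` are strings in YAML, so each SDK needs to parse them into a native duration type. A minimal sketch, assuming only the `ms`, `s`, `m`, and `h` suffixes are supported; `parse_duration` is a hypothetical helper name.

```python
import re
from datetime import timedelta

# Suffix -> timedelta keyword argument. Only these units are assumed.
_UNITS = {"ms": "milliseconds", "s": "seconds", "m": "minutes", "h": "hours"}
_DURATION = re.compile(r"(\d+(?:\.\d+)?)(ms|s|m|h)")


def parse_duration(text: str) -> timedelta:
    """Parse strings like '100ms', '5s', or '15m' into a timedelta."""
    match = _DURATION.fullmatch(text.strip())
    if match is None:
        raise ValueError(f"invalid duration: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: float(amount)})
```

Rejecting unknown suffixes keeps a typo such as `30sec` from silently turning into a default timeout.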

Builder Pattern API (Rust)

For programmatic configuration, Rust SDK provides a builder API:

use prism_client::Client;
use std::time::Duration;

// Minimal builder
let client = Client::builder()
.endpoint("localhost:8980")
.build()?;

// Production builder with OAuth2
let client = Client::builder()
.endpoints(&["prism-1:8980", "prism-2:8980"])
.oauth2(
"my-app-id",
std::env::var("CLIENT_SECRET")?,
"https://auth.example.com/oauth/token"
)
.namespace("orders")
.build()?;

// Advanced builder with all options
let client = Client::builder()
.endpoints(&["prism-1:8980", "prism-2:8980"])
.api_key(std::env::var("PRISM_API_KEY")?)
.tls()
.ca_cert("/etc/prism/ca.pem")
.client_cert("/etc/prism/client.pem")
.client_key("/etc/prism/client-key.pem")
.done()
.timeout(Duration::from_secs(30))
.retry()
.max_attempts(3)
.initial_backoff(Duration::from_millis(100))
.done()
.build()?;

Python equivalent (using method chaining):

import os

from prism_client import ClientBuilder

# Minimal
client = ClientBuilder() \
    .endpoint("localhost:8980") \
    .build()

# Production
client = ClientBuilder() \
    .endpoints(["prism-1:8980", "prism-2:8980"]) \
    .oauth2("my-app-id", os.environ["CLIENT_SECRET"],
            "https://auth.example.com/oauth/token") \
    .build()

OAuth2 Authentication Flow

All SDKs implement the OAuth2 client credentials flow:

┌─────────────┐
│ SDK Client │
└──────┬──────┘

│ 1. Load config (client_id, client_secret, token_endpoint)


┌──────────────────┐
│ OAuth2 Client │
└──────┬───────────┘

│ 2. POST /oauth/token
│ grant_type=client_credentials
│ client_id=...
│ client_secret=...
│ scope=prism.producer prism.consumer


┌──────────────────┐
│ OAuth2 Server │
└──────┬───────────┘

│ 3. Return access_token + expires_in


┌──────────────────┐
│ OAuth2 Client │
│ (Token cached) │
└──────┬───────────┘

│ 4. gRPC request with header:
│ Authorization: Bearer {access_token}


┌──────────────────┐
│ Prism Proxy │
│ (Validates JWT) │
└──────────────────┘

Token Refresh Logic:

// Rust implementation (conceptual)
impl OAuth2Client {
    async fn get_token(&self) -> Result<String> {
        // Check cache
        if let Some(token) = self.token_cache.get().await? {
            if !token.is_expired() {
                return Ok(token.access_token);
            }
        }

        // Acquire new token (all form values must share one type, hence as_str)
        let scope = self.config.scopes.join(" ");
        let response = self.http_client
            .post(&self.config.token_endpoint)
            .form(&[
                ("grant_type", "client_credentials"),
                ("client_id", self.config.client_id.as_str()),
                ("client_secret", self.config.client_secret.as_str()),
                ("scope", scope.as_str()),
            ])
            .send()
            .await?
            .error_for_status()?; // Treat non-2xx token responses as errors

        let token: TokenResponse = response.json().await?;

        // Cache token
        self.token_cache.set(token.clone()).await?;

        Ok(token.access_token)
    }
}

Error Taxonomy and Retry Semantics

All SDKs implement a structured error hierarchy to enable intelligent error handling and automatic retries.

Error Classification

// Rust error taxonomy
pub enum PrismError {
// Transient errors - safe to retry
Transient(TransientError),

// Permanent errors - do not retry
Permanent(PermanentError),

// Client-side errors - fix your code
ClientError(ClientError),
}

pub enum TransientError {
NetworkTimeout { duration: Duration },
ServiceUnavailable { retry_after: Option<Duration> },
RateLimited { retry_after: Duration },
ConnectionReset,
}

pub enum PermanentError {
Unauthorized { namespace: String, reason: String },
NotFound { resource: String },
InvalidData { field: String, reason: String },
QuotaExceeded { namespace: String },
}

pub enum ClientError {
InvalidConfig { field: String, reason: String },
InvalidArgument { param: String, reason: String },
AlreadyClosed,
}

impl PrismError {
/// Returns true if this error can be retried
pub fn is_retryable(&self) -> bool {
matches!(self, PrismError::Transient(_))
}

/// Returns the recommended retry delay, if any
pub fn retry_after(&self) -> Option<Duration> {
match self {
PrismError::Transient(TransientError::RateLimited { retry_after }) => Some(*retry_after),
PrismError::Transient(TransientError::ServiceUnavailable { retry_after }) => *retry_after,
PrismError::Transient(_) => Some(Duration::from_millis(100)), // Default backoff
_ => None,
}
}
}
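
In Python the same taxonomy maps naturally onto an exception hierarchy. The sketch below mirrors `is_retryable` and `retry_after` from the Rust enum; the exact class layout is an assumption, with only `PrismError`, `TransientError`, and `PermanentError` appearing elsewhere in this RFC.

```python
from typing import Optional


class PrismError(Exception):
    """Base class; `retryable` mirrors is_retryable() in the Rust taxonomy."""
    retryable = False

    def retry_after(self) -> Optional[float]:
        return None


class TransientError(PrismError):
    retryable = True
    DEFAULT_BACKOFF = 0.1  # seconds, matching the 100 ms default backoff

    def retry_after(self) -> Optional[float]:
        return self.DEFAULT_BACKOFF


class RateLimited(TransientError):
    def __init__(self, retry_after: float) -> None:
        super().__init__(f"rate limited, retry after {retry_after}s")
        self._retry_after = retry_after

    def retry_after(self) -> Optional[float]:
        return self._retry_after


class ServiceUnavailable(TransientError):
    def __init__(self, retry_after: Optional[float] = None) -> None:
        super().__init__("service unavailable")
        self._retry_after = retry_after

    def retry_after(self) -> Optional[float]:
        return self._retry_after  # may be None, as in the Rust version


class PermanentError(PrismError):
    """Unauthorized, NotFound, InvalidData, QuotaExceeded: never retried."""


class ClientError(PrismError):
    """Invalid configuration or arguments: fix the calling code."""
```

Callers can then catch `TransientError` or `PermanentError` as a group, or inspect `retryable` when handling a generic `PrismError`.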

Retry Configuration

Enhanced retry configuration with circuit breaker and jitter:

retry:
  max_attempts: 3
  initial_backoff: 100ms
  max_backoff: 5s
  multiplier: 2.0
  jitter: 0.1 # Add ±10% randomness to prevent thundering herd

  # Only retry specific errors
  retryable_errors:
    - NetworkTimeout
    - ServiceUnavailable
    - RateLimited
    - ConnectionReset

  # Circuit breaker configuration
  circuit_breaker:
    enabled: true
    failure_threshold: 5 # Open after 5 consecutive failures
    timeout: 30s # Stay open for 30s
    half_open_requests: 3 # Test with 3 requests before closing
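
The three circuit breaker settings describe a small state machine: closed, open, and half-open. The following sketch shows one plausible reading of those settings; the class shape and method names are assumptions, and the injected `now` clock exists only to make the behavior easy to test.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: float = 30.0,
        half_open_requests: int = 3,
        now: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.half_open_requests = half_open_requests
        self._now = now
        self._failures = 0                       # consecutive failures while closed
        self._opened_at: Optional[float] = None  # None means not open
        self._half_open = False
        self._probe_successes = 0

    def state(self) -> str:
        if self._opened_at is not None:
            if self._now() - self._opened_at < self.timeout:
                return "open"
            # Timeout elapsed: start letting probe requests through.
            self._opened_at = None
            self._half_open = True
            self._probe_successes = 0
        return "half_open" if self._half_open else "closed"

    def allow_request(self) -> bool:
        return self.state() != "open"

    def record_success(self) -> None:
        if self.state() == "half_open":
            self._probe_successes += 1
            if self._probe_successes >= self.half_open_requests:
                self._half_open = False  # enough probes succeeded: close
        self._failures = 0

    def record_failure(self) -> None:
        state = self.state()
        if state == "open":
            return
        if state == "half_open":
            self._trip()  # a failed probe reopens the circuit immediately
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self._opened_at = self._now()
        self._half_open = False
        self._failures = 0
```

A breaker like this only helps if one instance is shared by all calls to the same endpoint, so that failures accumulate across requests.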

Usage Example

use prism_client::{PrismError, Producer};
use tracing::error;

async fn publish_with_retry(producer: &Producer, topic: &str, data: &[u8]) -> Result<()> {
    match producer.publish(topic, data).await {
        Ok(_) => Ok(()),
        // Match the variant directly so the match stays exhaustive
        Err(e @ PrismError::Transient(_)) => {
            // SDK automatically retries based on retry config
            // This error means all retry attempts failed
            Err(e)
        }
        Err(PrismError::Permanent(p)) => {
            // Permanent error - log and fail fast
            error!("Permanent error: {:?}", p);
            Err(PrismError::Permanent(p))
        }
        Err(PrismError::ClientError(c)) => {
            // Client error - fix the code
            panic!("Client error - fix your code: {:?}", c);
        }
    }
}

Python Example:

import logging

from prism_client import PermanentError, TransientError

logger = logging.getLogger(__name__)

async def publish_with_retry(producer, topic, data):
    try:
        await producer.publish(topic, data)
    except TransientError as e:
        # SDK already retried - all attempts failed
        logger.error(f"All retry attempts failed: {e}")
        raise
    except PermanentError as e:
        # Don't retry - permanent failure
        logger.error(f"Permanent error: {e}")
        raise

Automatic Retry Behavior

The SDK automatically retries transient errors with:

  1. Exponential backoff: initial_backoff * (multiplier ^ attempt)
  2. Jitter: Random variation to prevent thundering herd
  3. Max backoff cap: Never exceeds max_backoff
  4. Respect Retry-After header: For rate limiting
  5. Circuit breaker: Stops retrying dead services

// SDK internal retry implementation (conceptual)
use std::future::Future;

async fn retry_with_backoff<F, Fut, T>(
    operation: F,
    config: &RetryConfig,
    circuit_breaker: &CircuitBreaker, // Shared across calls so failures accumulate
) -> Result<T>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T>>,
{
    let mut attempt = 0;

    loop {
        // Fail fast while the circuit is open
        if circuit_breaker.is_open() {
            return Err(PrismError::Transient(TransientError::ServiceUnavailable {
                retry_after: None,
            }));
        }

        match operation().await {
            Ok(result) => {
                circuit_breaker.record_success();
                return Ok(result);
            }
            // attempt starts at 0, so this allows max_attempts calls in total
            Err(e) if e.is_retryable() && attempt + 1 < config.max_attempts => {
                circuit_breaker.record_failure();

                let backoff = calculate_backoff(attempt, config);
                let jitter = calculate_jitter(backoff, config.jitter);

                sleep(backoff + jitter).await;
                attempt += 1;
            }
            Err(e) => {
                // Count the final transient failure before giving up
                if e.is_retryable() {
                    circuit_breaker.record_failure();
                }
                return Err(e);
            }
        }
    }
}
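
The delay computation behind exponential backoff with jitter and a cap can be written in a few lines. This sketch assumes jitter is a symmetric fraction of the capped delay (0.1 meaning ±10%); `backoff_delay` and the injectable `rng` are hypothetical names, and all times are in seconds.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    initial: float = 0.1,
    multiplier: float = 2.0,
    max_backoff: float = 5.0,
    jitter: float = 0.1,
    rng: Callable[[], float] = random.random,
) -> float:
    """Delay before retry number `attempt` (0-based), in seconds."""
    # Exponential growth, capped at max_backoff.
    base = min(max_backoff, initial * multiplier ** attempt)
    # rng() is in [0, 1); map it to [-1, 1) and scale by the jitter fraction.
    offset = (2.0 * rng() - 1.0) * base * jitter
    # Keep the jittered delay within [0, max_backoff].
    return min(max_backoff, max(0.0, base + offset))
```

With the defaults, the un-jittered delays are 0.1 s, 0.2 s, 0.4 s, and so on until they reach the 5 s cap; jitter spreads clients that failed at the same moment so they do not retry in lockstep.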

Shared Directory Structure

All three SDKs follow this consistent structure:

prism-client-rust/
├── Cargo.toml # Rust dependencies
├── README.md # SDK overview and quickstart
├── LICENSE # Apache 2.0
├── .gitignore

├── src/
│ ├── lib.rs # Main library entry point
│ ├── client.rs # Client factory
│ ├── config.rs # Configuration management
│ ├── error.rs # Error types
│ │
│ ├── patterns/ # Pattern implementations
│ │ ├── mod.rs
│ │ ├── producer.rs # Producer pattern
│ │ ├── consumer.rs # Consumer pattern
│ │ └── keyvalue.rs # KeyValue pattern
│ │
│ ├── auth/ # Authentication
│ │ ├── mod.rs
│ │ ├── oauth2.rs # OAuth2 client credentials flow
│ │ └── cache.rs # Token caching
│ │
│ ├── proto/ # Generated protobuf code
│ │ ├── mod.rs
│ │ └── gen/ # Auto-generated (git-ignored)
│ │
│ └── observability/ # Metrics and tracing
│ ├── mod.rs
│ ├── metrics.rs # Prometheus metrics
│ └── tracing.rs # OpenTelemetry tracing

├── tests/
│ ├── unit/ # Unit tests
│ │ ├── producer_test.rs
│ │ ├── consumer_test.rs
│ │ └── keyvalue_test.rs
│ │
│ ├── integration/ # Integration tests with testcontainers
│ │ ├── setup.rs # Testcontainer orchestration
│ │ ├── producer_integration_test.rs
│ │ ├── consumer_integration_test.rs
│ │ └── e2e_test.rs # End-to-end workflows
│ │
│ └── fixtures/ # Test data and configs
│ ├── test-config.yaml
│ └── sample-messages.json

├── examples/ # Usage examples
│ ├── simple_producer.rs
│ ├── simple_consumer.rs
│ ├── keyvalue_cache.rs
│ └── oauth2_auth.rs

├── benches/ # Performance benchmarks
│ ├── producer_bench.rs
│ └── consumer_bench.rs

└── docs/
├── quickstart.md # Getting started guide
├── patterns.md # Pattern API reference
├── configuration.md # Config file reference
└── testing.md # Testing guide

Python Structure (prism-client-python/):

  • Replace Cargo.toml with pyproject.toml (Poetry/PDM)
  • Replace .rs with .py
  • Replace benches/ with benchmarks/
  • Add setup.py for pip installation

Go Structure (prism-client-go/):

  • Replace Cargo.toml with go.mod
  • Replace .rs with .go
  • Move tests to *_test.go files alongside source
  • Add Makefile for build/test tasks

Integration Testing Strategy

Testcontainers-Based Integration Tests

All SDKs use testcontainers to spin up real Prism infrastructure:

// Rust integration test example
#[tokio::test]
async fn test_producer_consumer_e2e() {
// Start Prism proxy + Redis backend
let containers = PrismTestContainers::start().await;

// Create client
let config = ClientConfig {
proxy_endpoints: vec![containers.proxy_endpoint()],
auth: AuthConfig::None, // Local test environment
..Default::default()
};
let client = Client::connect(config).await.unwrap();

// Producer publishes message
let producer = client.producer("test-namespace").await.unwrap();
producer.publish("test-topic", b"hello world").await.unwrap();

// Consumer receives message
let consumer = client.consumer("test-namespace").await.unwrap();
let mut stream = consumer.subscribe("test-topic").await.unwrap();

let msg = stream.next().await.unwrap().unwrap();
assert_eq!(msg.payload, b"hello world");
msg.ack().await.unwrap();

// Cleanup
containers.stop().await;
}

# Python integration test example
import pytest
from prism_client import Client, ClientConfig
from prism_testing import PrismTestContainers

@pytest.mark.asyncio
async def test_producer_consumer_e2e():
    # Start Prism proxy + Redis backend
    async with PrismTestContainers() as containers:
        # Create client
        config = ClientConfig(
            proxy_endpoints=[containers.proxy_endpoint],
            auth=None,  # Local test
        )
        async with Client(config) as client:
            # Producer publishes
            producer = await client.producer("test-namespace")
            await producer.publish("test-topic", b"hello world")

            # Consumer receives
            consumer = await client.consumer("test-namespace")
            async for msg in consumer.subscribe("test-topic"):
                assert msg.payload == b"hello world"
                await msg.ack()
                break

// Go integration test example
func TestProducerConsumerE2E(t *testing.T) {
// Start Prism proxy + Redis backend
containers, err := prismtest.StartContainers(t)
require.NoError(t, err)
defer containers.Stop()

// Create client
config := &prism.Config{
ProxyEndpoints: []string{containers.ProxyEndpoint()},
Auth: prism.NoAuth,
}
client, err := prism.Connect(config)
require.NoError(t, err)
defer client.Close()

ctx := context.Background()

// Producer publishes
producer := client.Producer("test-namespace")
err = producer.Publish(ctx, "test-topic", []byte("hello world"))
require.NoError(t, err)

// Consumer receives
consumer := client.Consumer("test-namespace")
messages, err := consumer.Subscribe(ctx, "test-topic")
require.NoError(t, err)

msg := <-messages
assert.Equal(t, []byte("hello world"), msg.Payload)
msg.Ack(ctx)
}

Test Coverage Requirements

Each SDK must achieve:

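| Component      | Minimum Coverage | Test Types         |
|----------------|------------------|--------------------|
| Producer       | 85%              | Unit + Integration |
| Consumer       | 85%              | Unit + Integration |
| KeyValue       | 85%              | Unit + Integration |
| OAuth2 Client  | 90%              | Unit + Mock server |
| Config Parser  | 90%              | Unit               |
| Error Handling | 80%              | Unit               |
| Retry Logic    | 85%              | Unit + Integration |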

Coverage Tools:

  • Rust: cargo-tarpaulin
  • Python: pytest-cov
  • Go: go test -cover

Performance Benchmarks

Each SDK includes throughput and latency benchmarks:

// Rust benchmark (using criterion)
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn producer_throughput_benchmark(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let client = runtime.block_on(setup_client());
let producer = runtime.block_on(client.producer("bench")).unwrap();

c.bench_function("producer_publish_1kb", |b| {
b.to_async(&runtime).iter(|| async {
producer.publish("bench-topic", black_box(&[0u8; 1024])).await
});
});
}

criterion_group!(benches, producer_throughput_benchmark);
criterion_main!(benches);

Target Performance (single client, local proxy):

  • Producer throughput: >10,000 msg/sec (1KB payloads)
  • Consumer throughput: >8,000 msg/sec
  • KeyValue GET latency: <1ms p99
  • KeyValue SET latency: <2ms p99
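
Checking the p99 targets requires a percentile over recorded latencies. A minimal sketch using the nearest-rank method, which is one common definition and not necessarily the one the benchmark tooling uses; `percentile` is a hypothetical helper and the sample values are made up for illustration.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], p: int) -> float:
    """Nearest-rank percentile: smallest value with at least p% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    if not 0 < p <= 100:
        raise ValueError("p must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(p * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


# Illustrative latencies in milliseconds: 0.01, 0.02, ..., 1.00
latencies_ms = [i / 100 for i in range(1, 101)]
p99 = percentile(latencies_ms, 99)
meets_get_target = p99 < 1.0  # KeyValue GET target: <1ms p99
```

Reporting p99 rather than the mean keeps a handful of slow requests from hiding behind a good average.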

Language-Specific Implementation Details

Rust SDK

Dependencies (Cargo.toml):

[dependencies]
# Minimal core dependencies (required)
tokio = { version = "1", features = ["rt-multi-thread", "net", "sync"] }
tonic = "0.11"
prost = "0.12"
serde = { version = "1", features = ["derive"] }

# Optional dependencies (feature-gated)
serde_yaml = { version = "0.9", optional = true }
reqwest = { version = "0.11", features = ["json"], optional = true }
opentelemetry = { version = "0.22", optional = true }
prometheus = { version = "0.13", optional = true }
tracing = { version = "0.1", optional = true }
futures = { version = "0.3", optional = true }

[features]
default = ["config-files", "auth-oauth2"]

# Core features
config-files = ["serde_yaml"]
auth-oauth2 = ["reqwest"]
auth-api-key = []
streaming = ["futures"]

# Observability features (opt-in)
metrics = ["prometheus"]
tracing = ["opentelemetry", "dep:tracing"]
observability = ["metrics", "tracing"]

# Convenience feature for production
full = ["config-files", "auth-oauth2", "streaming", "observability"]

[dev-dependencies]
testcontainers = "0.15"
criterion = { version = "0.5", features = ["async_tokio"] } # to_async() needs the async_tokio feature
tokio-test = "0.4"

[build-dependencies]
tonic-build = "0.11"

Minimal binary size (with default features only):

  • Core SDK: ~2MB
  • Full SDK: ~8MB

Dependency count (transitive):

  • Minimal: 4 crates → ~15 total
  • Full: 10 crates → ~60 total

Usage:

# Minimal (default features)
[dependencies]
prism-client = "0.1"

# With observability
[dependencies]
prism-client = { version = "0.1", features = ["observability"] }

# Full features
[dependencies]
prism-client = { version = "0.1", features = ["full"] }

Code Generation (build.rs):

fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(false) // Client-only
.compile(
&[
"../../proto/prism/interfaces/pubsub/pubsub_basic.proto",
"../../proto/prism/interfaces/keyvalue/keyvalue_basic.proto",
],
&["../../proto"],
)?;
Ok(())
}

Async Streams:

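// Consumer uses Stream trait
use futures::{Stream, StreamExt};

pub struct Consumer {
    client: PubSubBasicInterfaceClient<Channel>,
}

impl Consumer {
    pub async fn subscribe(&self, topic: &str)
        -> Result<impl Stream<Item = Result<Message>>>
    {
        let request = SubscribeRequest {
            topic: topic.to_string(),
            subscriber_id: uuid::Uuid::new_v4().to_string(),
            options: Default::default(),
        };

        // Generated tonic clients take &mut self; cloning is cheap because
        // the underlying Channel is shared.
        let mut client = self.client.clone();
        let response = client.subscribe(request).await?;

        // Convert gRPC items into SDK types (assumes the relevant From impls)
        Ok(response
            .into_inner()
            .map(|item| item.map(Message::from).map_err(Into::into)))
    }
}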

Python SDK

Dependencies (pyproject.toml with Poetry):

[tool.poetry]
name = "prism-client"
version = "0.1.0"
description = "Prism data access client SDK for Python"

[tool.poetry.dependencies]
python = "^3.10"
grpcio = "^1.59"
grpcio-tools = "^1.59"
protobuf = "^4.25"
pyyaml = "^6.0"
httpx = "^0.25" # Async HTTP for OAuth2
pydantic = "^2.5"
opentelemetry-api = "^1.21"
prometheus-client = "^0.19"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4"
pytest-asyncio = "^0.21"
pytest-cov = "^4.1"
testcontainers = "^3.7"
black = "^23.11"
mypy = "^1.7"
ruff = "^0.1"

Code Generation:

# Generate protobuf stubs
python -m grpc_tools.protoc \
-I../../proto \
--python_out=src/prism_client/proto \
--grpc_python_out=src/prism_client/proto \
--pyi_out=src/prism_client/proto \
prism/interfaces/pubsub/pubsub_basic.proto \
prism/interfaces/keyvalue/keyvalue_basic.proto

Async Iterators:

# Consumer exposes subscriptions as an async iterator
import uuid
from typing import AsyncIterator

class Consumer:
    def __init__(self, client: PubSubBasicInterfaceStub):
        self._client = client

    async def subscribe(self, topic: str) -> AsyncIterator[Message]:
        request = SubscribeRequest(
            topic=topic,
            subscriber_id=str(uuid.uuid4()),
        )

        # Assumes a grpc.aio stub, whose streaming calls are async-iterable
        async for message in self._client.Subscribe(request):
            yield message

Go SDK

Dependencies (go.mod):

module github.com/prism/client-go

go 1.21

require (
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v3 v3.0.1
golang.org/x/oauth2 v0.15.0
go.opentelemetry.io/otel v1.21.0
github.com/prometheus/client_golang v1.17.0
github.com/testcontainers/testcontainers-go v0.26.0
)

Code Generation (gen.go):

// go generate does not support line continuations, so the directive stays on one line.
//go:generate protoc -I../../proto --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative prism/interfaces/pubsub/pubsub_basic.proto prism/interfaces/keyvalue/keyvalue_basic.proto

Channel-Based Consumer:

// Consumer returns a channel of messages
type Consumer struct {
    client pb.PubSubBasicInterfaceClient
}

func (c *Consumer) Subscribe(ctx context.Context, topic string) (<-chan Message, error) {
    stream, err := c.client.Subscribe(ctx, &pb.SubscribeRequest{
        Topic:        topic,
        SubscriberId: uuid.New().String(),
    })
    if err != nil {
        return nil, err
    }

    messages := make(chan Message, 100)
    go func() {
        defer close(messages)
        for {
            msg, err := stream.Recv()
            if err != nil {
                return // io.EOF or stream error: closing the channel ends the caller's loop
            }
            select {
            case messages <- Message{Payload: msg.Payload, /* ... */}:
            case <-ctx.Done():
                return // Caller cancelled: do not block forever on a full buffer
            }
        }
    }()

    return messages, nil
}

Example Usage Patterns

Example 1: Simple Producer

// Rust
use prism_client::{Client, ClientConfig};

#[tokio::main]
async fn main() -> Result<()> {
    let config = ClientConfig::from_file("prism.yaml")?;
    let client = Client::connect(config).await?;

    let producer = client.producer("orders").await?;

    for i in 0..1000 {
        let order = format!("order-{}", i);
        producer.publish("new-orders", order.as_bytes()).await?;
    }

    Ok(())
}

# Python
import asyncio
from prism_client import Client, ClientConfig

async def main():
    config = ClientConfig.from_file("prism.yaml")
    async with Client(config) as client:
        producer = await client.producer("orders")

        for i in range(1000):
            order = f"order-{i}"
            await producer.publish("new-orders", order.encode())

asyncio.run(main())

// Go
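package main

import (
    "context"
    "fmt"
    "log"

    prism "github.com/prism/client-go"
)

func main() {
    config, err := prism.LoadConfig("prism.yaml")
    if err != nil {
        log.Fatalf("load config: %v", err)
    }
    client, err := prism.Connect(config)
    if err != nil {
        log.Fatalf("connect: %v", err)
    }
    defer client.Close()

    producer := client.Producer("orders")
    ctx := context.Background()

    for i := 0; i < 1000; i++ {
        order := fmt.Sprintf("order-%d", i)
        // Assumes Publish returns only an error
        if err := producer.Publish(ctx, "new-orders", []byte(order)); err != nil {
            log.Printf("publish %s: %v", order, err)
            return
        }
    }
}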

Example 2: Consumer with Error Handling

// Rust
use prism_client::{Client, ClientConfig};
use futures::StreamExt;
use tracing::error;

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::connect(ClientConfig::from_file("prism.yaml")?).await?;
    let consumer = client.consumer("orders").await?;
    let mut stream = consumer.subscribe("new-orders").await?;

    while let Some(result) = stream.next().await {
        match result {
            Ok(message) => {
                match process_order(&message.payload).await {
                    Ok(_) => message.ack().await?,
                    Err(e) => {
                        error!("Failed to process order: {}", e);
                        message.nack().await?; // Requeue for retry
                    }
                }
            }
            Err(e) => error!("Stream error: {}", e),
        }
    }

    Ok(())
}

async fn process_order(data: &[u8]) -> Result<()> {
    // Business logic
    Ok(())
}

# Python
import asyncio
import logging
from prism_client import Client, ClientConfig

async def process_order(data: bytes):
    # Business logic
    pass

async def main():
    config = ClientConfig.from_file("prism.yaml")
    async with Client(config) as client:
        consumer = await client.consumer("orders")

        async for message in consumer.subscribe("new-orders"):
            try:
                await process_order(message.payload)
                await message.ack()
            except Exception as e:
                logging.error(f"Failed to process: {e}")
                await message.nack()  # Requeue

asyncio.run(main())

Example 3: KeyValue with TTL

// Rust
use prism_client::{Client, ClientConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::connect(ClientConfig::from_file("prism.yaml")?).await?;
    let kv = client.keyvalue("cache").await?;

    // Set with default namespace TTL (from config)
    kv.set("session:alice", b"session-data").await?;

    // Set with custom TTL (if backend supports it via extensions)
    kv.set_with_ttl("temp:data", b"temporary", Duration::from_secs(300)).await?;

    // Get
    if let Some(data) = kv.get("session:alice").await? {
        println!("Session: {:?}", data);
    }

    Ok(())
}

Observability Integration

All SDKs include built-in observability:

Metrics (Prometheus)

Auto-instrumented metrics:

# Producer metrics
prism_client_producer_publish_total{namespace="orders",topic="new-orders"} 12345
prism_client_producer_publish_errors_total{namespace="orders",error="timeout"} 42
prism_client_producer_publish_duration_seconds{namespace="orders",quantile="0.99"} 0.05

# Consumer metrics
prism_client_consumer_messages_received_total{namespace="orders",topic="new-orders"} 12300
prism_client_consumer_messages_acked_total{namespace="orders"} 12250
prism_client_consumer_messages_nacked_total{namespace="orders"} 50
prism_client_consumer_lag{namespace="orders",topic="new-orders"} 45

# KeyValue metrics
prism_client_keyvalue_get_total{namespace="cache"} 98765
prism_client_keyvalue_set_total{namespace="cache"} 54321
prism_client_keyvalue_get_duration_seconds{namespace="cache",quantile="0.99"} 0.001

# Auth metrics
prism_client_oauth2_token_refreshes_total 23
prism_client_oauth2_token_errors_total 1

Tracing (OpenTelemetry)

Automatic span creation for all operations:

Trace: order-processing-workflow
├─ Span: producer.publish (orders/new-orders)
│  ├─ oauth2.get_token (cached)
│  └─ grpc.call (PubSubBasicInterface/Publish)
│
└─ Span: consumer.receive (orders/new-orders)
   ├─ grpc.call (PubSubBasicInterface/Subscribe)
   ├─ business_logic.process_order
   └─ consumer.ack

Structured Logging

JSON-formatted logs with context:

{
  "timestamp": "2025-10-19T10:30:45Z",
  "level": "info",
  "message": "Published message",
  "namespace": "orders",
  "topic": "new-orders",
  "message_id": "msg-abc-123",
  "duration_ms": 12,
  "trace_id": "550e8400e29b41d4a716446655440000"
}
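A minimal sketch of how the Python SDK might emit this log shape with the standard `logging` module. The `JsonFormatter` class and the `CONTEXT_FIELDS` tuple are hypothetical names used for illustration, not part of the SDK API; the field names are taken from the sample above.

```python
import json
import logging
from datetime import datetime, timezone

# Context fields copied from the sample log entry; the tuple itself is illustrative.
CONTEXT_FIELDS = ("namespace", "topic", "message_id", "duration_ms", "trace_id")


class JsonFormatter(logging.Formatter):
    """Render a log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        entry = {
            "timestamp": created.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname.lower(),
            "message": record.getMessage(),
        }
        # Values passed via `extra=` become attributes on the record.
        for field in CONTEXT_FIELDS:
            if hasattr(record, field):
                entry[field] = getattr(record, field)
        return json.dumps(entry)
```

Callers would attach the formatter to a handler and pass context through `extra`, for example `logger.info("Published message", extra={"namespace": "orders", "topic": "new-orders"})`.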

Backpressure and Flow Control

Consumers must handle backpressure when processing messages slower than they arrive.

Overflow Strategies

namespaces:
  events:
    pattern: consumer
    options:
      buffer_size: 1000
      overflow_strategy: block  # block | drop-oldest | drop-newest | error
      block_timeout: 30s        # For 'block' strategy

Strategy Details:

  1. block: Wait for consumer to process messages (default, no data loss)

    • Blocks upstream delivery until the buffer has space
    • Use block_timeout to prevent deadlock
    • Best for critical data
  2. drop-oldest: Discard oldest messages when buffer full

    • Maintains most recent data
    • Use for real-time metrics where fresh data matters more than completeness
  3. drop-newest: Discard incoming messages when buffer full

    • Preserves historical data
    • Use when already-buffered messages must not be displaced, for example ordered backlogs
  4. error: Return error when buffer full

    • Explicit failure for application to handle
    • Use when data loss is unacceptable but blocking is risky

Rust Example:

let consumer = client.consumer("events").await?
    .with_buffer_size(1000)
    .with_overflow_strategy(OverflowStrategy::Block { timeout: Duration::from_secs(30) })
    .build()?;

let mut stream = consumer.subscribe("high-volume-topic").await?;

// Process with backpressure
while let Some(msg) = stream.next().await {
    let msg = msg?; // Stream items are Results; surface stream errors first
    // Slow processing blocks upstream (with timeout)
    process_message(&msg).await?;
    msg.ack().await?;
}
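The four overflow strategies can be sketched in Python on top of `asyncio.Queue`. This is an illustration of the semantics described above, not the SDK's implementation; `offer`, `drain`, and `STRATEGIES` are hypothetical names.

```python
import asyncio

STRATEGIES = {"block", "drop-oldest", "drop-newest", "error"}


async def offer(queue: asyncio.Queue, item, strategy: str = "block",
                block_timeout: float = 30.0) -> bool:
    """Buffer `item` under the given overflow strategy.

    Returns True if the item was buffered, False if it was dropped.
    """
    if strategy not in STRATEGIES:
        raise ValueError(f"unknown overflow strategy: {strategy}")
    if strategy == "block":
        # Waits for space; raises asyncio.TimeoutError once block_timeout elapses.
        await asyncio.wait_for(queue.put(item), timeout=block_timeout)
        return True
    if queue.full():
        if strategy == "drop-newest":
            return False  # The incoming item is discarded
        if strategy == "error":
            raise asyncio.QueueFull("consumer buffer is full")
        queue.get_nowait()  # drop-oldest: evict the head to make room
    queue.put_nowait(item)
    return True


def drain(queue: asyncio.Queue) -> list:
    """Remove and return everything currently buffered (helper for inspection)."""
    items = []
    while not queue.empty():
        items.append(queue.get_nowait())
    return items
```

With a buffer of size 2, offering 0, 1, 2 under `drop-oldest` leaves `[1, 2]` buffered, while `drop-newest` leaves `[0, 1]`.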

Cancellation Semantics

What happens when operations are cancelled:

  1. Consumer stream dropped mid-iteration:

    • All unacknowledged messages are automatically NACKed
    • Messages return to queue for redelivery
    • Connection closed gracefully
  2. Producer publish timeout:

    • Operation fails with TransientError::NetworkTimeout
    • No partial writes (message either fully sent or not sent)
    • Safe to retry
  3. Client closed with pending operations:

    • All in-flight operations are cancelled
    • Unacknowledged messages are NACKed
    • Connections closed with grace period (configurable)

// Explicit timeout handling
use std::time::Duration;
use tokio::time::timeout;

let result = timeout(
    Duration::from_secs(5),
    producer.publish("topic", data),
).await;

match result {
    Ok(Ok(_)) => println!("Published successfully"),
    Ok(Err(e)) => println!("Publish failed: {}", e),
    Err(_) => println!("Publish timed out after 5s"),
}
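The first rule, NACKing unacknowledged messages when a consumer stream is abandoned, could look roughly like this in the Python SDK. `TrackedStream` is a hypothetical stand-in that records NACKs in a list rather than sending them to the proxy.

```python
import asyncio


class TrackedStream:
    """Illustrative stream wrapper that NACKs whatever is still unacknowledged on exit."""

    def __init__(self, message_ids):
        self._source = iter(message_ids)
        self._unacked = []   # Delivered but not yet acknowledged
        self.nacked = []     # Stand-in for NACK requests sent to the proxy

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit, on break/return, and on exceptions or cancellation.
        self.nacked.extend(self._unacked)
        self._unacked.clear()
        return False  # Never swallow the caller's exception

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            msg_id = next(self._source)
        except StopIteration:
            raise StopAsyncIteration from None
        self._unacked.append(msg_id)
        return msg_id

    def ack(self, msg_id):
        self._unacked.remove(msg_id)


async def demo():
    async with TrackedStream(["m1", "m2", "m3"]) as stream:
        async for msg_id in stream:
            if msg_id == "m2":
                break  # Abandon the stream mid-iteration
            stream.ack(msg_id)
    return stream.nacked
```

In `demo`, `m1` is acknowledged, `m2` was delivered but never acknowledged, and `m3` was never delivered, so only `m2` ends up in `nacked`.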

Security Considerations

1. Credential Management

Never hardcode secrets in config files:

# ❌ Bad: Hardcoded secret
auth:
  client_secret: super-secret-password

# ✅ Good: Environment variable
auth:
  client_secret: ${CLIENT_SECRET}

# ✅ Good: Secret management integration
auth:
  client_secret: vault://secret/prism/client-secret
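One way an SDK could expand the `${CLIENT_SECRET}` form at load time is sketched below. `resolve_secret` is a hypothetical helper; it only handles environment placeholders and passes `vault://` references through untouched, since the RFC does not specify how those are resolved.

```python
import os
import re
from typing import Mapping, Optional

# Matches ${NAME} where NAME is a conventional environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_secret(raw: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace ${NAME} placeholders with values from the environment."""
    env = os.environ if env is None else env

    def lookup(match):
        name = match.group(1)
        if name not in env:
            # Name the missing variable, never a secret value.
            raise KeyError(f"environment variable {name} is not set")
        return env[name]

    return _PLACEHOLDER.sub(lookup, raw)
```

Failing loudly on a missing variable avoids silently sending the literal placeholder string as a credential.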

2. Secure Defaults

TLS enabled by default with explicit opt-out required:

// TLS is mandatory unless explicitly disabled
let client = Client::builder()
    .endpoint("localhost:8980")
    .insecure() // Must explicitly call this for non-TLS
    .build()?;

# YAML config - TLS enabled by default
proxy:
  endpoint: prism.example.com:8980
  tls:
    enabled: true # Default
    ca_cert: /etc/prism/ca.pem

    # Optional: mTLS for extra security
    client_cert: /etc/prism/client.pem
    client_key: /etc/prism/client-key.pem

3. Secret Handling Best Practices

SDKs implement secure secret handling:

use std::fmt::{self, Debug, Display, Formatter};
use zeroize::Zeroize;

#[derive(Zeroize)]
#[zeroize(drop)]
struct ClientSecret(String); // Auto-zeroed on drop

impl Debug for ClientSecret {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "ClientSecret(***)") // Never log plaintext
    }
}

// Error messages never include secrets.
// `fmt` belongs to Display (std::error::Error has no `fmt` method).
impl Display for PrismError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            PrismError::Unauthorized { namespace } => {
                // ✅ Good: No token in error
                write!(f, "Unauthorized for namespace: {}", namespace)
            }
            // ❌ Never do this: write!(f, "Invalid token: {}", token)
            // (other variants elided)
        }
    }
}

4. Token Expiration and Caching

SDKs must handle token expiration gracefully with secure caching:

auth:
  type: oauth2
  token_cache:
    path: ~/.prism/tokens
    permissions: "0600" # Enforced by SDK; quoted so YAML parsers do not read it as a number
    encryption: true # Encrypt with system keychain
    max_age: 24h # Evict old tokens

Token refresh logic:

  1. Cache tokens with expiration metadata
  2. Refresh tokens 60s before expiration
  3. Retry failed requests with fresh token
  4. Never log tokens in plaintext
  5. Encrypt token cache at rest
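Steps 1 to 4 can be sketched as a small in-memory cache. This is illustrative only: `TokenCache`, the `fetch` callback, and the injectable `clock` are hypothetical, and on-disk encryption (step 5) is out of scope here.

```python
import time
from typing import Callable, Optional, Tuple

REFRESH_MARGIN_SECONDS = 60.0  # Refresh this long before expiry (step 2)


class TokenCache:
    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch   # Returns (access_token, lifetime_seconds)
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0  # Expiration metadata kept with the token (step 1)
        self.refreshes = 0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at - REFRESH_MARGIN_SECONDS:
            self._token, lifetime = self._fetch()
            self._expires_at = now + lifetime
            self.refreshes += 1
        return self._token

    def invalidate(self) -> None:
        """Force a refresh on the next get(), e.g. after a rejected request (step 3)."""
        self._token = None

    def __repr__(self) -> str:
        # Never expose the token value in logs or debug output (step 4).
        return f"TokenCache(refreshes={self.refreshes}, token=***)"
```

A caller that receives an authentication failure would call `invalidate()` and retry once with the token returned by the next `get()`.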

5. Namespace Isolation and Authorization

Clients are restricted to namespaces they have access to:

# OAuth2 scopes encode namespace permissions
auth:
  scopes:
    - prism.producer:orders # Publish to "orders" namespace
    - prism.consumer:events # Subscribe to "events" namespace
    - prism.keyvalue:cache # Read/write "cache" namespace

Fail-fast on unauthorized namespace:

// SDK validates namespace against token scopes
let producer = client.producer("unauthorized-ns").await;
// Error: Forbidden - namespace 'unauthorized-ns' not in token scopes
// Available namespaces: [orders, events, cache]
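A sketch of the client-side check, assuming scopes follow the `prism.<pattern>:<namespace>` shape shown above. `namespaces_for` and `require_namespace` are hypothetical helpers, and unlike the sample error they list only the namespaces granted for the requested pattern.

```python
from typing import Iterable, List


def namespaces_for(scopes: Iterable[str], pattern: str) -> List[str]:
    """Return the namespaces a token grants for one pattern, e.g. 'producer'."""
    prefix = f"prism.{pattern}:"
    return sorted(s[len(prefix):] for s in scopes if s.startswith(prefix))


def require_namespace(scopes: Iterable[str], pattern: str, namespace: str) -> None:
    """Raise PermissionError before any network call if the namespace is not granted."""
    allowed = namespaces_for(scopes, pattern)
    if namespace not in allowed:
        raise PermissionError(
            f"Forbidden - namespace '{namespace}' not in token scopes; "
            f"available {pattern} namespaces: {allowed}"
        )
```

This check is a convenience for fast feedback; the proxy remains the authority on what a token may access.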

6. Dependency Security

All SDKs include security scanning in CI:

# .github/workflows/security.yml
- name: Audit Rust dependencies
  run: cargo audit

- name: Audit Python dependencies
  run: pip-audit

- name: Scan Go dependencies
  run: govulncheck ./...

Dependency pinning:

  • Lock files committed to repository
  • Automated dependency updates via Dependabot
  • Security advisories trigger immediate updates

Migration Guide

From Direct Backend Clients

Before (direct Kafka):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('orders', {'order_id': 123})

After (Prism SDK):

import asyncio
from prism_client import Client

async def main():
    async with Client.from_file("prism.yaml") as client:
        producer = await client.producer("orders")
        await producer.publish("orders", b'{"order_id": 123}')

asyncio.run(main())

Benefits:

  • No Kafka-specific knowledge required
  • OAuth2 auth handled automatically
  • Platform can migrate from Kafka → NATS without code changes
  • Built-in observability and retry logic

Implementation Roadmap

Phase 0: Simplification (Week 1-2) NEW

Goal: Build the simplest possible working SDK with minimal dependencies.

Deliverables:

  • Minimal Rust SDK with 4 core dependencies (tokio, tonic, prost, serde)
  • Synchronous-first API (sync wrappers around async core)
  • API key authentication only (no OAuth2 yet)
  • Producer pattern only (no Consumer or KeyValue)
  • No observability features
  • Working hello-world example in <10 lines of code

Success Criteria:

  • New user can publish a message in <5 minutes

  • Binary size <2MB

  • No configuration file required (programmatic API)

  • Example code:

    let client = Client::connect("localhost:8980")?;
    let producer = client.producer("orders")?;
    producer.publish("new-orders", b"order-123")?;

Target Metrics:

  • Time to first publish: <5 minutes
  • Lines of code: <10
  • Dependencies: 4 crates
  • Binary size: ~2MB

Phase 1: Foundation (Week 3-4) REVISED

Deliverables:

  • Add OAuth2 authentication as optional feature
  • Add Consumer and KeyValue patterns
  • YAML configuration support (optional, via feature flag)
  • Basic error handling with error taxonomy
  • Unit tests (no integration tests yet)

Success Criteria:

  • All three patterns (Producer, Consumer, KeyValue) working
  • OAuth2 token acquisition and refresh
  • Structured error types with retry semantics
  • 80%+ unit test coverage

Phase 2: Production-Ready (Week 5-6) REVISED

Deliverables:

  • Integration tests with testcontainers
  • Retry logic with circuit breaker and jitter
  • TLS/mTLS support with secure defaults
  • Backpressure handling with overflow strategies
  • Cancellation semantics documented and tested
  • 85%+ coverage across all patterns

Success Criteria:

  • Integration test suite passes
  • All retry strategies tested
  • TLS required by default (explicit opt-out)
  • Backpressure strategies verified under load

Phase 3: Observability (Week 7-8) NEW

Deliverables:

  • Optional metrics feature (Prometheus)
  • Optional tracing feature (OpenTelemetry)
  • Performance benchmarks
  • Middleware pattern for observability

Success Criteria:

  • Metrics work when feature enabled
  • Tracing integrates with OTLP
  • Minimal SDK unaffected by observability features
  • Benchmark baseline established

Phase 4: Documentation & Polish (Week 9-10) REVISED

Deliverables:

  • Complete API reference documentation
  • Quickstart guides (sync and async versions)
  • Migration guides from direct backend clients
  • Security hardening guide
  • Example applications

Success Criteria:

  • New user can publish first message in <5 minutes
  • API documentation complete for all three languages
  • Migration guide validates against real Kafka/Redis code
  • Security guide covers all best practices

Implementation Learnings (Rust SDK)

This section documents practical discoveries from implementing the Rust client SDK POC, including gRPC integration, TTL support, and testing infrastructure.

TTL Pattern Implementation (2025-10-19)

Context: Added Time-To-Live (TTL) support to KeyValue pattern, enabling key expiration for cache and session management use cases.

Key Discoveries:

  1. Two-Phase TTL Architecture

    • Pattern: set_with_ttl(key, value, ttl_seconds) → two gRPC calls
    • Phase 1: Set value using KeyValueBasicInterface.Set()
    • Phase 2: Set expiration using KeyValueTTLInterface.Expire()
    • Rationale: Interface decomposition (MEMO-006) separates basic operations from TTL capabilities
    • Benefit: Backends that don't support TTL can still implement KeyValueBasicInterface
  2. Pattern-Level API Simplification

    • Client code: kv.set_with_ttl("session:42", data, 300) (single method call)
    • SDK handles: namespace prefixing, two gRPC calls, error aggregation
    • Developer sees: simple API, SDK manages complexity
    • Lesson: Pattern-level abstractions hide gRPC interface details effectively
  3. Optional Interface Handling

    • KeyValueTTLInterfaceClient stored as Option<> in AsyncKeyValue struct
    • Returns PermanentError::InvalidData if TTL interface unavailable
    • Allows graceful degradation when backend doesn't support TTL
    • Lesson: Optional capabilities should fail fast with clear error messages
  4. Test Performance Optimization

    • Before: 5s TTL + 7s wait = 12s per test × 3 backends = 36s total
    • After: 2s TTL + 4s wait (2s buffer) = 6s per test × 3 backends = 18s total
    • Improvement: 50% faster test execution (18s vs 36s)
    • Lesson: Minimize TTL values in tests while maintaining reliability (2s buffer proven sufficient)
  5. Integration Test Structure

    • Test lifecycle: Set with TTL → Verify immediate existence → Wait for expiration → Verify absence
    • Uses #[ignore] attribute for tests requiring live proxy
    • Comprehensive validation: set_with_ttl(), exists(), get(), delete()
    • Lesson: Integration tests should validate full operation lifecycle, not just success cases
  6. Namespace-Aware Key Management

    • Keys automatically prefixed: {namespace}/{key} in gRPC calls
    • Client API: kv.set("user:42", data) (no namespace in key)
    • gRPC call: key: "cache/user:42" (namespace prefix added)
    • Lesson: Namespace abstraction simplifies multi-tenancy for application developers
  7. Error Handling Patterns

    • Transient errors: gRPC status codes (Unavailable, DeadlineExceeded) → retry
    • Permanent errors: Invalid data, missing TTL interface → fail immediately
    • Error context: Include namespace, key, operation in error messages
    • Lesson: Error taxonomy (Transient/Permanent/Client) guides retry logic
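The error-handling pattern in item 7 can be sketched as a retry loop that only retries transient gRPC status codes. The names below (`RpcError`, `is_transient`, `call_with_retry`) are hypothetical, and the exponential backoff with jitter is one common choice rather than the SDK's confirmed policy.

```python
import random
import time

# Status codes the RFC lists as transient.
TRANSIENT_CODES = {"UNAVAILABLE", "DEADLINE_EXCEEDED"}


class RpcError(Exception):
    """Stand-in for a gRPC error carrying a status code name."""

    def __init__(self, code):
        super().__init__(code)
        self.code = code


def is_transient(err):
    return isinstance(err, RpcError) and err.code in TRANSIENT_CODES


def call_with_retry(op, max_attempts=3, base_delay=0.1,
                    sleep=time.sleep, rng=random.random):
    """Call op(), retrying transient failures with jittered exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return op()
        except RpcError as err:
            # Permanent errors and the final attempt propagate immediately.
            if not is_transient(err) or attempt == max_attempts:
                raise
            sleep(rng() * base_delay * 2 ** (attempt - 1))
```

Injecting `sleep` and `rng` keeps the loop deterministic under test.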

Code References:

  • clients/rust/prism-client/build.rs - Added keyvalue_ttl.proto compilation
  • clients/rust/prism-client/src/patterns/keyvalue.rs:71-101 - TTL implementation
  • clients/rust/prism-client/tests/integration_test.rs:245-279 - TTL integration test
  • tests/interface-suites/keyvalue_ttl/suite.go - Reusable TTL test suite

Impact: Demonstrates pattern-level API effectiveness. Single method call (set_with_ttl) hides two-phase gRPC complexity. Validates interface decomposition architecture (MEMO-006).
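For readers more familiar with Python, the two-phase flow and namespace prefixing can be sketched as follows. The classes are hypothetical stand-ins for the generated gRPC clients, and checking for the TTL interface before the Set call is an assumption made here so that no value is written without an expiry.

```python
class TTLUnsupported(Exception):
    """Stand-in for the permanent error raised when the TTL interface is missing."""


class KeyValue:
    def __init__(self, namespace, basic, ttl=None):
        self._namespace = namespace
        self._basic = basic  # Stand-in for the KeyValueBasicInterface client
        self._ttl = ttl      # Stand-in for the KeyValueTTLInterface client, or None

    def _full_key(self, key):
        # Keys are prefixed as {namespace}/{key} before reaching the proxy.
        return f"{self._namespace}/{key}"

    def set_with_ttl(self, key, value, ttl_seconds):
        if self._ttl is None:
            # Assumption: fail before Phase 1 rather than after it.
            raise TTLUnsupported(f"TTL not supported for namespace {self._namespace!r}")
        full_key = self._full_key(key)
        self._basic.set(full_key, value)         # Phase 1: Set
        self._ttl.expire(full_key, ttl_seconds)  # Phase 2: Expire


class RecordingStub:
    """Test double that records calls instead of issuing gRPC requests."""

    def __init__(self):
        self.calls = []

    def set(self, key, value):
        self.calls.append(("Set", key, value))

    def expire(self, key, seconds):
        self.calls.append(("Expire", key, seconds))
```

Calling `set_with_ttl("session:42", b"data", 300)` on a `KeyValue("cache", ...)` records a Set followed by an Expire, both against `cache/session:42`.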

gRPC Integration Patterns

Tonic Build Configuration:

// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(false) // Client-only
        .compile(
            &[
                "proto/prism/common/types.proto",
                "proto/prism/interfaces/keyvalue/keyvalue_basic.proto",
                "proto/prism/interfaces/keyvalue/keyvalue_ttl.proto",
                "proto/prism/interfaces/pubsub/pubsub_basic.proto",
            ],
            &["proto"],
        )?;
    Ok(())
}

Key Learnings:

  • Proto files compiled at build time (generates Rust code in target/)
  • Path structure: proto/prism/interfaces/{pattern}/{interface}.proto
  • Generated clients: {Interface}Client<Channel> structs
  • Multiple interfaces → multiple client instances in pattern struct

Pattern Struct Design:

pub struct AsyncKeyValue {
    namespace: String,
    client: KeyValueBasicInterfaceClient<Channel>,
    ttl_client: Option<KeyValueTTLInterfaceClient<Channel>>,
}

Learnings:

  • One gRPC channel shared by multiple interface clients (multiplexing)
  • Optional interfaces modeled as Option<> types
  • Namespace stored in struct, not passed per-call
  • Synchronous wrapper (KeyValue) holds Arc<Runtime> + AsyncKeyValue

Test Infrastructure

Integration Test Discovery:

  • Proxy availability check: TcpStream::connect() with 1s timeout
  • Skip tests gracefully if proxy not available (avoid false failures in CI)
  • Comprehensive test coverage: Basic ops, TTL, batch operations, error cases
  • Docker Compose support: docker-compose.test.yml for local testing
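The same availability probe is easy to express for the Python test suite. `proxy_available` is a hypothetical helper mirroring the `TcpStream::connect()` check with a 1s timeout.

```python
import socket


def proxy_available(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to the proxy succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unreachable hosts, and timeouts.
        return False
```

With pytest, the result can feed a skip marker such as `pytest.mark.skipif(not proxy_available("localhost", 8980), reason="Prism proxy not running")`, so suites skip cleanly instead of failing when no proxy is running.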

Backend Test Optimization:

  • Reduced test durations by 30% (24s → 17s across MemStore + Redis)
  • Standardized timing: 2s TTL + 2s buffer (4s total wait)
  • Applied consistently: Interface suites, acceptance tests, integration tests
  • Lesson: Test performance matters for developer experience (faster iteration)

Open Questions

  1. Schema Registry Integration: Should SDKs support schema validation before publish?

    • Proposal: Phase 2 feature, optional schema validation via protobuf descriptors
  2. Connection Pooling: How many gRPC connections per client instance?

    • Proposal: Single connection per endpoint with multiplexing
    • Evidence: TTL implementation confirmed single channel works for multiple interface clients
  3. Backpressure: How to handle slow consumers?

    • Proposal: Configurable buffer sizes + drop-oldest strategy
  4. Multi-Region: Should SDKs support automatic region failover?

    • Proposal: Phase 3, opt-in via discovery: geo-dns
  5. Batching: Auto-batching for producers?

    • Proposal: Configurable batching with flush interval + size threshold
  6. Optional Interface Discovery: Should clients query available interfaces at connection time?

    • Proposal: Fail-fast on first use with clear error message (current TTL approach)
    • Alternative: Capabilities RPC during client initialization (overhead vs clarity trade-off)
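To make the batching proposal in question 5 concrete, here is a sketch of a batcher that flushes on either a size threshold or a flush interval. `Batcher` and its parameters are hypothetical; a real implementation would drive `tick()` from a timer and handle flush failures.

```python
import time


class Batcher:
    def __init__(self, flush, max_size=100, flush_interval=0.5, clock=time.monotonic):
        self._flush = flush              # Called with a list of messages
        self._max_size = max_size        # Size threshold
        self._interval = flush_interval  # Max age of the oldest pending message, in seconds
        self._clock = clock
        self._pending = []
        self._first_at = None

    def add(self, message):
        if not self._pending:
            self._first_at = self._clock()
        self._pending.append(message)
        self._maybe_flush()

    def tick(self):
        """Call periodically so a partial batch is flushed once it is old enough."""
        self._maybe_flush()

    def _maybe_flush(self):
        if not self._pending:
            return
        too_big = len(self._pending) >= self._max_size
        too_old = self._clock() - self._first_at >= self._interval
        if too_big or too_old:
            batch, self._pending = self._pending, []
            self._flush(batch)
```

The size threshold bounds memory and request size, while the interval bounds how long a message can wait when traffic is light.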

Appendix A: Hello-World Examples

Minimal Hello-World (Phase 0)

Goal: New user publishes first message in <5 minutes with <10 lines of code.

Rust (Synchronous)

// hello.rs
use prism_client::Client;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::connect("localhost:8980")?;
    let producer = client.producer("orders")?;
    producer.publish("new-orders", b"order-123")?;
    println!("Published!");
    Ok(())
}

Run:

cargo add prism-client
cargo run
# Output: Published!

Python (Synchronous)

# hello.py
from prism_client.sync import Client

with Client("localhost:8980", api_key="dev-key") as client:
    producer = client.producer("orders")
    producer.publish("new-orders", b"order-123")
    print("Published!")

Run:

pip install prism-client
python hello.py
# Output: Published!

Go (Naturally Synchronous)

// hello.go
package main

import (
    "context"
    "fmt"

    prism "github.com/prism/client-go"
)

func main() {
    client, _ := prism.Connect("localhost:8980")
    defer client.Close()

    ctx := context.Background()
    producer := client.Producer("orders")
    producer.Publish(ctx, "new-orders", []byte("order-123"))
    fmt.Println("Published!")
}

Run:

go get github.com/prism/client-go
go run hello.go
# Output: Published!

Comparison: Before vs. After

Current RFC (Before Revision):

  • Requires YAML config file (15+ lines)
  • Requires OAuth2 setup (client ID, secret, token endpoint)
  • Requires async boilerplate (asyncio.run, async with)
  • ~25 lines of code to publish one message

Revised RFC (After Phase 0):

  • No config file required (programmatic API)
  • API key authentication (or no auth for local dev)
  • Synchronous by default (no async required)
  • 6 lines of code to publish one message

Production Hello-World (Phase 1+)

With OAuth2 and configuration file:

# hello_production.py
from prism_client.sync import Client

with Client.from_file("prism.yaml") as client:
    producer = client.producer("orders")
    producer.publish("new-orders", b"order-123")
    print("Published!")

# prism.yaml
proxy:
  endpoints: [prism-1:8980, prism-2:8980]
auth:
  type: oauth2
  client_id: my-app
  client_secret: ${CLIENT_SECRET}
  token_endpoint: https://auth.example.com/oauth/token

Still simple: 6 lines of Python + config file (OAuth2 handled automatically)

Revision History

  • 2025-10-19: Initial draft covering Rust, Python, Go SDKs with OAuth2, integration testing, and shared structure
  • 2025-10-19: Major revision implementing MEMO-037 recommendations:
    • Added synchronous API layer as default
    • Added credential provider abstraction (OAuth2, API key, mTLS, env)
    • Added error taxonomy and retry semantics with circuit breaker
    • Added progressive disclosure configuration (3 tiers)
    • Added Rust builder pattern API
    • Made observability optional via feature flags
    • Added security hardening section
    • Added backpressure and flow control strategies
    • Added Phase 0 (Simplification) to implementation roadmap
    • Added hello-world examples appendix
  • 2025-10-19: Added Implementation Learnings section:
    • Documented TTL pattern implementation (two-phase gRPC architecture)
    • Added gRPC integration patterns with tonic-build
    • Documented test infrastructure and performance optimization (30% improvement)
    • Added practical discoveries from Rust SDK POC implementation
    • Updated Open Questions with evidence from TTL implementation
    • Added new question about optional interface discovery
    • Updated success criteria: <5 min to first publish, <10 lines of code