Backend Configuration Registry
Note: This RFC is consolidated into RFC-056: Unified Configuration Model as Layer 4 (Backend Binding) and Layer 5 (Frontend Exposure). This document remains as a detailed technical reference for the backend and frontend registry implementation.
Abstract
This RFC defines how Prism manages backend configurations and frontend interface bindings as flat, globally-shared registries administered by the admin control plane and distributed to proxies and pattern runners at startup.
Backend configurations are uniquely named resources that encapsulate connection details, capabilities, and operational metadata, enabling pattern runners to bind their slot implementations to specific backends without embedding connection details in pattern code.
Frontend interface bindings are uniquely named resources that define how external clients interact with patterns via REST, GraphQL, or other protocols, in addition to the default gRPC pattern interfaces. Frontends enable protocol translation (e.g., HTTP → gRPC) and API compatibility layers (e.g., Confluent Schema Registry REST API → Registry pattern).
Motivation
Current Problem: Backend Slot Binding
Pattern implementations need to connect to backends (Kafka, NATS, PostgreSQL, etc.) to fulfill their slot requirements, but there's no standardized way to:
- Centrally manage backend configurations: Connection strings, credentials, and settings are scattered across pattern definitions
- Share backends across patterns: Multiple patterns that need "Kafka" have to independently configure the same backend
- Map slots to backends at runtime: Pattern runners don't have a clean way to discover "which Kafka instance should I use for the messaging slot?"
- Administer backends globally: Operators need to add/remove/update backend configurations for the entire cluster
- Separate concerns: Pattern authors shouldn't need to know connection details—they should only declare "I need a messaging backend"
Current Problem: Frontend Interface Exposure
Pattern implementations expose gRPC interfaces by default, but external clients often need REST, GraphQL, or compatibility with existing APIs (e.g., Confluent Schema Registry). There's no standardized way to:
- Map REST routes to pattern operations: No declarative way to expose pattern RPCs as REST endpoints
- Support multiple API protocols: Clients need REST, SSE, WebSocket, GraphQL—not just gRPC
- Provide API compatibility layers: Can't easily expose registry pattern as Confluent Schema Registry API
- Manage frontend configurations globally: REST route mappings, OpenAPI specs, and protocol configs are scattered
- Disable default gRPC interface: Some patterns may only want REST exposure, not gRPC
Example Scenario
Consider three patterns in different namespaces:
- order-processing: Uses
multicast-registrypattern (needs: registry slot → PostgreSQL, messaging slot → Kafka) - user-notifications: Uses
durable-queuepattern (needs: queue slot → Kafka, durability slot → PostgreSQL) - analytics-ingest: Uses
event-streampattern (needs: streaming slot → Kafka)
Without this RFC:
- Each pattern embeds Kafka connection string
kafka-prod.example.com:9092in its config - Changing Kafka requires updating all 3 pattern configurations
- No visibility into which patterns use which backends
- Pattern runners must parse arbitrary config formats
With this RFC:
- Admin defines global backend:
kafka-prodwith connection details - Patterns reference backend by name:
messaging_slot: kafka-prod - Pattern runners fetch
kafka-prodconfig at startup and bind slots - Changing Kafka means updating one backend config—all patterns automatically use new settings
Goals
Backend Registry Goals
- Flat backend registry: All backends have globally unique names (e.g.,
kafka-prod,postgres-primary,nats-dev) - Admin-managed: Backends are created/updated/deleted via admin control plane APIs
- Distributed to runners: Pattern runners fetch backend configs at startup
- Typed configurations: Each backend type (Kafka, PostgreSQL, Redis) has a structured config schema
- Slot binding: Pattern runners map slot names → backend names → connection configs
- Capability metadata: Backends declare what operations they support for slot selection
- Slot schema integration: Pattern definitions declare required/optional backend interfaces per slot (see MEMO-006)
Frontend Registry Goals
- Flat frontend registry: All frontends have globally unique names (e.g.,
confluent-schema-registry-rest,graphql-api) - Admin-managed: Frontends are created/updated/deleted via admin control plane APIs
- Distributed to proxies: Proxies fetch frontend configs at startup
- Typed configurations: Each frontend type (REST, GraphQL, gRPC) has a structured config schema
- Route mapping: Proxies map REST routes → pattern RPC operations using OpenAPI-style route definitions
- Default gRPC: Pattern gRPC interfaces are enabled by default but can be disabled per-namespace
- Protocol translation: Proxies handle HTTP → gRPC, SSE → streaming gRPC, etc.
Non-Goals
Backend Non-Goals
- Dynamic backend switching: Patterns bind backends at startup; runtime changes require restart
- Per-namespace backends: All backends are global; namespaces reference shared backends
- Automatic slot matching: Patterns explicitly configure which backend fills which slot
- Backend health monitoring: This RFC focuses on config distribution, not runtime health checks
- Multi-region backends: Initial implementation assumes single-region; multi-region is future work
Frontend Non-Goals
- Full OpenAPI code generation: We use OpenAPI semantics for route mapping, not full codegen
- GraphQL schema stitching: Initial implementation focuses on REST; GraphQL is future work
- WebSocket protocol translation: SSE for streaming; full WebSocket support is future
- Per-namespace frontend overrides: All frontends are global; namespaces opt-in to frontends
- Frontend load balancing: Proxy handles routing; external load balancer handles HA
Design
Backend Registry Model
Backend Resource
// Backend represents a globally-unique backend configuration
message Backend {
string name = 1; // Globally unique (e.g., "kafka-prod")
BackendType type = 2; // Enum: KAFKA, NATS, POSTGRES, REDIS, etc.
BackendConfig config = 3; // Type-specific configuration
repeated string capabilities = 4; // Slot operations supported (e.g., ["publish", "subscribe"])
map<string, string> metadata = 5; // Operator annotations (region, env, owner)
int64 created_at = 6;
int64 updated_at = 7;
string created_by = 8;
}
enum BackendType {
BACKEND_TYPE_UNSPECIFIED = 0;
BACKEND_TYPE_KAFKA = 1;
BACKEND_TYPE_NATS = 2;
BACKEND_TYPE_POSTGRES = 3;
BACKEND_TYPE_REDIS = 4;
BACKEND_TYPE_SQLITE = 5;
BACKEND_TYPE_S3 = 6;
BACKEND_TYPE_MEMSTORE = 7;
// Add more as needed
}
// BackendConfig is a oneof containing type-specific configs
message BackendConfig {
oneof config {
KafkaConfig kafka = 1;
NatsConfig nats = 2;
PostgresConfig postgres = 3;
RedisConfig redis = 4;
SqliteConfig sqlite = 5;
S3Config s3 = 6;
MemStoreConfig memstore = 7;
}
}
Type-Specific Configs
message KafkaConfig {
repeated string brokers = 1; // ["kafka-01:9092", "kafka-02:9092"]
string security_protocol = 2; // "PLAINTEXT", "SASL_SSL"
KafkaAuth auth = 3;
int32 connection_timeout_ms = 4;
int32 session_timeout_ms = 5;
}
message KafkaAuth {
oneof auth {
KafkaSASL sasl = 1;
KafkaMTLS mtls = 2;
}
}
message NatsConfig {
repeated string servers = 1; // ["nats://nats-01:4222"]
string credentials_file = 2; // Path to NATS creds file
bool tls_enabled = 3;
int32 connection_timeout_ms = 4;
}
message PostgresConfig {
string host = 1; // "postgres.example.com"
int32 port = 2; // 5432
string database = 3;
string user = 4;
PostgresAuth auth = 5;
int32 max_connections = 6;
bool ssl_mode = 7;
}
message RedisConfig {
string host = 1;
int32 port = 2;
int32 database = 3; // Redis DB number (0-15)
RedisAuth auth = 4;
bool tls_enabled = 5;
int32 pool_size = 6;
}
message SqliteConfig {
string path = 1; // File path or ":memory:"
bool wal_enabled = 2;
int32 cache_size_kb = 3;
}
message S3Config {
string endpoint = 1; // "s3.amazonaws.com" or MinIO endpoint
string region = 2;
string bucket = 3;
S3Auth auth = 4;
bool path_style = 5; // For MinIO compatibility
}
message MemStoreConfig {
int64 max_size_bytes = 1; // Memory limit
int32 eviction_policy = 2; // LRU, LFU, etc.
}
Admin Control Plane Integration
New Admin State
Add backends to the admin Raft FSM state:
message AdminState {
int32 version = 1;
map<string, NamespaceEntry> namespaces = 2;
map<string, ProxyEntry> proxies = 3;
map<string, LauncherEntry> launchers = 4;
map<string, PatternEntry> patterns = 5;
// NEW: Backend registry
map<string, BackendEntry> backends = 6; // key = backend name
int64 last_applied_index = 10;
int64 last_applied_term = 11;
int64 state_updated_at = 12;
}
message BackendEntry {
Backend backend = 1; // Full backend config
string status = 2; // "active", "deprecated", "disabled"
int64 registered_at = 3;
int64 updated_at = 4;
}
New Admin Commands
enum CommandType {
// ... existing commands ...
COMMAND_TYPE_REGISTER_BACKEND = 7;
COMMAND_TYPE_UPDATE_BACKEND = 8;
COMMAND_TYPE_DELETE_BACKEND = 9;
}
message RegisterBackendCommand {
Backend backend = 1;
}
message UpdateBackendCommand {
string name = 1; // Backend to update
BackendConfig config = 2; // New config
map<string, string> metadata = 3; // New metadata
}
message DeleteBackendCommand {
string name = 1;
}
Admin API Extensions
service ControlPlane {
// ... existing RPCs ...
// Backend management
rpc RegisterBackend(RegisterBackendRequest) returns (RegisterBackendResponse);
rpc UpdateBackend(UpdateBackendRequest) returns (UpdateBackendResponse);
rpc DeleteBackend(DeleteBackendRequest) returns (DeleteBackendResponse);
rpc ListBackends(ListBackendsRequest) returns (ListBackendsResponse);
rpc GetBackend(GetBackendRequest) returns (GetBackendResponse);
}
message RegisterBackendRequest {
Backend backend = 1;
}
message RegisterBackendResponse {
bool success = 1;
string error = 2;
}
message ListBackendsRequest {
optional BackendType type = 1; // Filter by type
repeated string names = 2; // Filter by specific names
}
message ListBackendsResponse {
repeated Backend backends = 1;
}
Frontend Registry Model
Frontend Resource
// Frontend represents a globally-unique frontend interface binding
message Frontend {
string name = 1; // Globally unique (e.g., "confluent-schema-registry-rest")
FrontendType type = 2; // Enum: REST, GRAPHQL, GRPC_WEB, etc.
FrontendConfig config = 3; // Type-specific configuration
repeated RouteMapping routes = 4; // REST route → pattern operation mappings
map<string, string> metadata = 5; // Operator annotations (api_version, compatibility)
int64 created_at = 6;
int64 updated_at = 7;
string created_by = 8;
}
enum FrontendType {
FRONTEND_TYPE_UNSPECIFIED = 0;
FRONTEND_TYPE_REST = 1; // HTTP/REST with JSON
FRONTEND_TYPE_GRAPHQL = 2; // GraphQL API
FRONTEND_TYPE_GRPC_WEB = 3; // gRPC-Web for browsers
FRONTEND_TYPE_SSE = 4; // Server-Sent Events
FRONTEND_TYPE_WEBSOCKET = 5; // WebSocket bidirectional
}
// FrontendConfig is a oneof containing type-specific configs
message FrontendConfig {
oneof config {
RestConfig rest = 1;
GraphQLConfig graphql = 2;
GrpcWebConfig grpc_web = 3;
SseConfig sse = 4;
WebSocketConfig websocket = 5;
}
}
Type-Specific Frontend Configs
message RestConfig {
string base_path = 1; // Base path prefix (e.g., "/api/v1")
string openapi_spec_url = 2; // Optional OpenAPI 3.0 spec URL
bool enable_cors = 3; // Enable CORS headers
repeated string allowed_origins = 4; // CORS allowed origins
RestAuthentication auth = 5; // REST auth config
map<string, string> headers = 6; // Default response headers
}
message RestAuthentication {
oneof auth {
BearerTokenAuth bearer = 1;
BasicAuth basic = 2;
ApiKeyAuth api_key = 3;
OAuthAuth oauth = 4;
}
}
message GraphQLConfig {
string endpoint = 1; // GraphQL endpoint path (e.g., "/graphql")
bool enable_playground = 2; // Enable GraphQL Playground UI
string schema_file = 3; // Path to GraphQL schema file
}
message GrpcWebConfig {
string endpoint = 1; // gRPC-Web endpoint (e.g., "/grpc")
bool text_format = 2; // Use text format vs binary
}
message SseConfig {
string endpoint = 1; // SSE endpoint (e.g., "/events")
int32 heartbeat_interval_ms = 2; // Heartbeat interval (default 30s)
}
message WebSocketConfig {
string endpoint = 1; // WebSocket endpoint (e.g., "/ws")
int32 ping_interval_ms = 2; // Ping interval (default 30s)
}
Route Mapping (OpenAPI-style)
// RouteMapping defines REST route → pattern RPC mapping
message RouteMapping {
string http_method = 1; // GET, POST, PUT, DELETE, PATCH
string path_pattern = 2; // /subjects/{subject}/versions/{version}
string pattern_rpc = 3; // Fully-qualified RPC (e.g., "prism.patterns.Registry.GetSchema")
repeated ParamMapping param_mappings = 4; // How to map HTTP params to RPC fields
ResponseMapping response = 5; // How to map RPC response to HTTP response
int32 timeout_ms = 6; // Per-route timeout override
}
// ParamMapping defines how HTTP params map to protobuf fields
message ParamMapping {
string source = 1; // "path", "query", "header", "body"
string source_name = 2; // HTTP param name (e.g., "subject")
string target_field = 3; // Protobuf field path (e.g., "request.subject")
string default_value = 4; // Optional default value
bool required = 5; // Whether param is required
}
// ResponseMapping defines how protobuf response maps to HTTP response
message ResponseMapping {
int32 status_code = 1; // HTTP status code (default 200)
string body_field = 2; // Which protobuf field to use as body (default: whole message)
map<string, string> header_mappings = 3; // Protobuf field → HTTP header mappings
}
Admin State Integration for Frontends
message AdminState {
int32 version = 1;
map<string, NamespaceEntry> namespaces = 2;
map<string, ProxyEntry> proxies = 3;
map<string, LauncherEntry> launchers = 4;
map<string, PatternEntry> patterns = 5;
map<string, BackendEntry> backends = 6;
// NEW: Frontend registry
map<string, FrontendEntry> frontends = 7; // key = frontend name
int64 last_applied_index = 10;
int64 last_applied_term = 11;
int64 state_updated_at = 12;
}
message FrontendEntry {
Frontend frontend = 1; // Full frontend config
string status = 2; // "active", "deprecated", "disabled"
int64 registered_at = 3;
int64 updated_at = 4;
}
Admin Commands for Frontends
enum CommandType {
// ... existing commands ...
COMMAND_TYPE_REGISTER_BACKEND = 7;
COMMAND_TYPE_UPDATE_BACKEND = 8;
COMMAND_TYPE_DELETE_BACKEND = 9;
// NEW: Frontend commands
COMMAND_TYPE_REGISTER_FRONTEND = 10;
COMMAND_TYPE_UPDATE_FRONTEND = 11;
COMMAND_TYPE_DELETE_FRONTEND = 12;
}
message RegisterFrontendCommand {
Frontend frontend = 1;
}
message UpdateFrontendCommand {
string name = 1; // Frontend to update
FrontendConfig config = 2; // New config
repeated RouteMapping routes = 3; // New route mappings
map<string, string> metadata = 4; // New metadata
}
message DeleteFrontendCommand {
string name = 1;
}
Admin API Extensions for Frontends
service ControlPlane {
// ... existing RPCs ...
// Backend management
rpc RegisterBackend(RegisterBackendRequest) returns (RegisterBackendResponse);
rpc UpdateBackend(UpdateBackendRequest) returns (UpdateBackendResponse);
rpc DeleteBackend(DeleteBackendRequest) returns (DeleteBackendResponse);
rpc ListBackends(ListBackendsRequest) returns (ListBackendsResponse);
rpc GetBackend(GetBackendRequest) returns (GetBackendResponse);
// NEW: Frontend management
rpc RegisterFrontend(RegisterFrontendRequest) returns (RegisterFrontendResponse);
rpc UpdateFrontend(UpdateFrontendRequest) returns (UpdateFrontendResponse);
rpc DeleteFrontend(DeleteFrontendRequest) returns (DeleteFrontendResponse);
rpc ListFrontends(ListFrontendsRequest) returns (ListFrontendsResponse);
rpc GetFrontend(GetFrontendRequest) returns (GetFrontendResponse);
}
message RegisterFrontendRequest {
Frontend frontend = 1;
}
message RegisterFrontendResponse {
bool success = 1;
string error = 2;
}
message ListFrontendsRequest {
optional FrontendType type = 1; // Filter by type
repeated string names = 2; // Filter by specific names
}
message ListFrontendsResponse {
repeated Frontend frontends = 1;
}
Pattern Configuration with Backend References
Pattern Definition with Slot Bindings
# Pattern configuration (in namespace config)
namespace: order-processing
pattern: multicast-registry
# Slot bindings: Map slot names → backend names
slot_bindings:
registry: postgres-primary # Registry slot uses PostgreSQL backend
messaging: kafka-prod # Messaging slot uses Kafka backend
durability: kafka-prod # Durability slot reuses same Kafka
# Pattern-specific config (non-backend settings)
pattern_config:
ttl_seconds: 300
max_identities: 10000
Equivalent Protobuf
message PatternEntry {
string pattern_id = 1;
string namespace = 2;
string pattern_type = 3; // "multicast-registry"
string launcher_id = 4;
// NEW: Slot bindings
map<string, string> slot_bindings = 5; // {"registry": "postgres-primary", "messaging": "kafka-prod"}
map<string, string> config = 6; // Pattern-specific config
string status = 7;
int64 assigned_at = 8;
int64 updated_at = 9;
}
Pattern Runner Startup Flow
sequenceDiagram
participant Launcher as Launcher
participant Pattern as Pattern Runner
participant Admin as Admin Control Plane
participant Backend as Backend (Kafka)
Note over Launcher,Backend: 1. Pattern Assignment
Admin->>Launcher: AssignPattern(pattern_id, namespace, slot_bindings)
Launcher->>Pattern: Start(pattern_config)
Note over Pattern,Admin: 2. Fetch Backend Configs
Pattern->>Admin: GetBackend("kafka-prod")
Admin-->>Pattern: Backend{type: KAFKA, config: KafkaConfig{...}}
Pattern->>Admin: GetBackend("postgres-primary")
Admin-->>Pattern: Backend{type: POSTGRES, config: PostgresConfig{...}}
Note over Pattern,Backend: 3. Bind Slots to Backends
Pattern->>Pattern: messaging_slot.bind(kafka_config)
Pattern->>Pattern: registry_slot.bind(postgres_config)
Note over Pattern,Backend: 4. Connect to Backends
Pattern->>Backend: Connect(brokers=["kafka:9092"])
Backend-->>Pattern: Connection established
Pattern->>Pattern: Start processing
Backend Binding in Pattern Code
Go Pattern Runner Implementation
// Pattern runner startup
func (r *MulticastRegistryRunner) Start(ctx context.Context, config *PatternConfig) error {
// 1. Extract slot bindings from pattern config
slotBindings := config.SlotBindings // {"registry": "postgres-primary", "messaging": "kafka-prod"}
// 2. Fetch backend configs from admin
backends := make(map[string]*Backend)
for slotName, backendName := range slotBindings {
backend, err := r.adminClient.GetBackend(ctx, backendName)
if err != nil {
return fmt.Errorf("failed to fetch backend %s for slot %s: %w", backendName, slotName, err)
}
backends[slotName] = backend
}
// 3. Bind slots to backends using type-specific drivers
registryBackend := backends["registry"]
switch registryBackend.Type {
case prism.BACKEND_TYPE_POSTGRES:
pgConfig := registryBackend.Config.GetPostgres()
r.registrySlot = NewPostgresRegistrySlot(pgConfig)
case prism.BACKEND_TYPE_REDIS:
redisConfig := registryBackend.Config.GetRedis()
r.registrySlot = NewRedisRegistrySlot(redisConfig)
default:
return fmt.Errorf("unsupported backend type for registry slot: %v", registryBackend.Type)
}
messagingBackend := backends["messaging"]
switch messagingBackend.Type {
case prism.BACKEND_TYPE_KAFKA:
kafkaConfig := messagingBackend.Config.GetKafka()
r.messagingSlot = NewKafkaMessagingSlot(kafkaConfig)
case prism.BACKEND_TYPE_NATS:
natsConfig := messagingBackend.Config.GetNats()
r.messagingSlot = NewNatsMessagingSlot(natsConfig)
default:
return fmt.Errorf("unsupported backend type for messaging slot: %v", messagingBackend.Type)
}
// 4. Connect all slots
if err := r.registrySlot.Connect(ctx); err != nil {
return fmt.Errorf("registry slot connect failed: %w", err)
}
if err := r.messagingSlot.Connect(ctx); err != nil {
return fmt.Errorf("messaging slot connect failed: %w", err)
}
return nil
}
Shared Backend Binding Logic
To reduce duplication across pattern runners, we can provide a generic slot binder:
// pkg/pattern/slot_binder.go
type SlotBinder struct {
adminClient admin.ControlPlaneClient
}
func NewSlotBinder(adminClient admin.ControlPlaneClient) *SlotBinder {
return &SlotBinder{adminClient: adminClient}
}
// BindSlot fetches backend config and creates slot implementation
func (b *SlotBinder) BindSlot(ctx context.Context, slotName, backendName string, slotType SlotType) (Slot, error) {
// 1. Fetch backend config
backend, err := b.adminClient.GetBackend(ctx, backendName)
if err != nil {
return nil, fmt.Errorf("failed to fetch backend %s: %w", backendName, err)
}
// 2. Create slot implementation based on backend type and slot type
switch slotType {
case SlotTypeRegistry:
return b.createRegistrySlot(backend)
case SlotTypeMessaging:
return b.createMessagingSlot(backend)
case SlotTypeDurability:
return b.createDurabilitySlot(backend)
default:
return nil, fmt.Errorf("unknown slot type: %v", slotType)
}
}
func (b *SlotBinder) createMessagingSlot(backend *Backend) (Slot, error) {
switch backend.Type {
case prism.BACKEND_TYPE_KAFKA:
return NewKafkaMessagingSlot(backend.Config.GetKafka()), nil
case prism.BACKEND_TYPE_NATS:
return NewNatsMessagingSlot(backend.Config.GetNats()), nil
default:
return nil, fmt.Errorf("backend type %v does not support messaging slot", backend.Type)
}
}
Admin CLI for Backend Management
# Register a new backend
prism-admin backend register kafka-prod \
--type kafka \
--brokers kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--security-protocol SASL_SSL \
--metadata region=us-west-2,env=production
# List all backends
prism-admin backend list
# Output:
# NAME TYPE CAPABILITIES STATUS CREATED
# kafka-prod kafka [publish,subscribe] active 2025-10-18
# postgres-primary postgres [get,set,scan,delete] active 2025-10-18
# nats-dev nats [publish,subscribe] active 2025-10-18
# memstore-local memstore [get,set,delete] active 2025-10-18
# Get backend details
prism-admin backend get kafka-prod
# Output:
# Name: kafka-prod
# Type: kafka
# Brokers: kafka-01:9092, kafka-02:9092, kafka-03:9092
# Security: SASL_SSL
# Capabilities: publish, subscribe
# Metadata:
# region: us-west-2
# env: production
# Status: active
# Created: 2025-10-18 10:30:00
# Update backend config
prism-admin backend update kafka-prod --brokers kafka-01:9092,kafka-04:9092
# Delete backend
prism-admin backend delete kafka-dev
# Warning: Backend 'kafka-dev' is used by 3 patterns:
# - order-processing (messaging slot)
# - user-notifications (queue slot)
# - analytics-ingest (streaming slot)
# Are you sure? [y/N]:
Concrete Example: Registry Pattern as Confluent Schema Registry API
This section demonstrates how to expose Prism's registry pattern as a Confluent-compatible Schema Registry REST API using frontend bindings.
Pattern: Registry (gRPC)
The registry pattern exposes these gRPC RPCs:
service RegistryService {
rpc RegisterSchema(RegisterSchemaRequest) returns (RegisterSchemaResponse);
rpc GetSchema(GetSchemaRequest) returns (GetSchemaResponse);
rpc ListSchemas(ListSchemasRequest) returns (ListSchemasResponse);
rpc DeleteSchema(DeleteSchemaRequest) returns (DeleteSchemaResponse);
rpc CheckCompatibility(CheckCompatibilityRequest) returns (CheckCompatibilityResponse);
}
message RegisterSchemaRequest {
string subject = 1;
string schema = 2;
string schema_type = 3; // PROTOBUF, JSON, AVRO
}
message RegisterSchemaResponse {
int32 id = 1;
int32 version = 2;
}
message GetSchemaRequest {
string subject = 1;
int32 version = 2;
}
message GetSchemaResponse {
int32 id = 1;
int32 version = 2;
string schema = 3;
string schema_type = 4;
}
Frontend: Confluent Schema Registry REST API
Register a frontend that maps Confluent's REST API to the registry pattern:
name: confluent-schema-registry-rest
type: REST
config:
rest:
base_path: /
enable_cors: true
allowed_origins: ["*"]
auth:
bearer:
issuer: "https://auth.example.com"
audience: "schema-registry"
# Route mappings: Confluent REST → Registry gRPC
routes:
# POST /subjects/{subject}/versions → RegisterSchema
- http_method: POST
path_pattern: /subjects/{subject}/versions
pattern_rpc: prism.patterns.RegistryService.RegisterSchema
param_mappings:
- source: path
source_name: subject
target_field: subject
required: true
- source: body
source_name: schema
target_field: schema
required: true
- source: body
source_name: schemaType
target_field: schema_type
default_value: PROTOBUF
response:
status_code: 200
# Body is whole RegisterSchemaResponse: {"id": 1, "version": 1}
# GET /subjects/{subject}/versions/{version} → GetSchema
- http_method: GET
path_pattern: /subjects/{subject}/versions/{version}
pattern_rpc: prism.patterns.RegistryService.GetSchema
param_mappings:
- source: path
source_name: subject
target_field: subject
required: true
- source: path
source_name: version
target_field: version
required: true
response:
status_code: 200
# GET /subjects/{subject}/versions → ListSchemas (filtered by subject)
- http_method: GET
path_pattern: /subjects/{subject}/versions
pattern_rpc: prism.patterns.RegistryService.ListSchemas
param_mappings:
- source: path
source_name: subject
target_field: subject_filter
required: true
response:
status_code: 200
body_field: versions # Extract versions array from ListSchemasResponse
# DELETE /subjects/{subject}/versions/{version} → DeleteSchema
- http_method: DELETE
path_pattern: /subjects/{subject}/versions/{version}
pattern_rpc: prism.patterns.RegistryService.DeleteSchema
param_mappings:
- source: path
source_name: subject
target_field: subject
required: true
- source: path
source_name: version
target_field: version
required: true
response:
status_code: 200
body_field: deleted_version # Return just the version number
# POST /compatibility/subjects/{subject}/versions/{version} → CheckCompatibility
- http_method: POST
path_pattern: /compatibility/subjects/{subject}/versions/{version}
pattern_rpc: prism.patterns.RegistryService.CheckCompatibility
param_mappings:
- source: path
source_name: subject
target_field: subject
required: true
- source: path
source_name: version
target_field: base_version
required: true
- source: body
source_name: schema
target_field: new_schema
required: true
- source: body
source_name: schemaType
target_field: schema_type
default_value: PROTOBUF
response:
status_code: 200
# Body: {"is_compatible": true}
metadata:
api_version: "v7"
compatibility_mode: "confluent"
documentation_url: "https://docs.confluent.io/platform/current/schema-registry/develop/api.html"
Namespace Configuration with Frontend Binding
namespace: schema-registry
pattern: registry
# Backend slots
slot_bindings:
registry: postgres-primary # Store schemas in PostgreSQL
cache: redis-prod # Cache recently accessed schemas
# Frontend bindings
frontend_bindings:
- name: confluent-schema-registry-rest
enabled: true
- name: default-grpc
enabled: true # Keep gRPC interface as well
# Pattern-specific config
pattern_config:
max_schemas_per_subject: 100
compatibility_check: backward # Default compatibility mode
Proxy Request Handling Flow
sequenceDiagram
participant Client as Confluent Client
participant Proxy as Prism Proxy (REST Adapter)
participant Pattern as Registry Pattern Runner
participant Backend as PostgreSQL
Note over Client,Backend: 1. REST Request (Confluent API)
Client->>Proxy: POST /subjects/orders.created/versions
Client->>Proxy: {"schema": "...", "schemaType": "PROTOBUF"}
Note over Proxy: 2. Route Matching & Param Extraction
Proxy->>Proxy: Match route: POST /subjects/{subject}/versions
Proxy->>Proxy: Extract params: subject="orders.created", schema="...", schemaType="PROTOBUF"
Note over Proxy: 3. Build gRPC Request
Proxy->>Proxy: Map to RegisterSchemaRequest{subject, schema, schema_type}
Note over Proxy,Pattern: 4. Call Pattern Runner via gRPC
Proxy->>Pattern: RegisterSchema(RegisterSchemaRequest)
Note over Pattern,Backend: 5. Pattern Executes Against Backend
Pattern->>Backend: INSERT INTO schemas (subject, schema, type, version) VALUES (...)
Backend-->>Pattern: id=1, version=1
Pattern-->>Proxy: RegisterSchemaResponse{id: 1, version: 1}
Note over Proxy: 6. Map gRPC Response to REST
Proxy->>Proxy: Convert to JSON: {"id": 1, "version": 1}
Note over Proxy,Client: 7. REST Response (Confluent Format)
Proxy-->>Client: 200 OK
Proxy-->>Client: {"id": 1, "version": 1}
Equivalent Protobuf Representation
message NamespaceEntry {
string name = 1; // "schema-registry"
string pattern_type = 2; // "registry"
// Backend slot bindings
map<string, string> slot_bindings = 3; // {"registry": "postgres-primary", "cache": "redis-prod"}
// NEW: Frontend bindings
repeated FrontendBinding frontend_bindings = 4;
map<string, string> pattern_config = 5;
}
message FrontendBinding {
string name = 1; // "confluent-schema-registry-rest"
bool enabled = 2; // true
map<string, string> overrides = 3; // Optional namespace-specific overrides
}
Client Usage (Confluent SDK)
# Python client using Confluent Schema Registry SDK
from confluent_kafka.schema_registry import SchemaRegistryClient
# Point to Prism proxy's frontend binding
schema_registry = SchemaRegistryClient({
'url': 'http://prism-proxy.example.com', # Prism proxy with frontend binding
'basic.auth.user.info': 'token:secret'
})
# Register schema (goes through Prism's REST → gRPC translation)
schema_str = """
syntax = "proto3";
message Order {
string order_id = 1;
int64 amount = 2;
}
"""
schema_id = schema_registry.register_schema(
'orders.created',
Schema(schema_str, 'PROTOBUF')
)
# → POST /subjects/orders.created/versions
# → Prism translates to RegistryService.RegisterSchema gRPC
# → Returns: {"id": 1, "version": 1}
# Get schema by version
schema = schema_registry.get_version('orders.created', 1)
# → GET /subjects/orders.created/versions/1
# → Prism translates to RegistryService.GetSchema gRPC
Benefits of This Approach
- API Compatibility: Existing Confluent clients work without modification
- Backend Flexibility: Registry pattern can use PostgreSQL, Redis, or any backend with KV + scan interfaces
- Protocol Translation: Proxy handles HTTP → gRPC conversion transparently
- Centralized Management: Frontend config is admin-managed, not embedded in application code
- Multi-Protocol Support: Same pattern accessible via both REST (Confluent API) and gRPC (native)
- Observability: Proxy can log/trace REST requests and correlate with gRPC calls
Pattern Slot Schema Integration (MEMO-006)
Building on MEMO-006's backend interface decomposition, patterns declare required/optional backend interfaces per slot:
Pattern Slot Definition (from MEMO-006)
pattern: multicast-registry
version: v1
slots:
registry:
description: "Stores identity → metadata mappings"
required_interfaces:
- keyvalue_basic # MUST implement basic KV operations
- keyvalue_scan # MUST support enumeration
optional_interfaces:
- keyvalue_ttl # Nice to have: auto-expire offline identities
recommended_backends:
- redis # Has all 3 interfaces
- postgres # Has basic + scan (no TTL)
- dynamodb # Has all 3 interfaces
messaging:
description: "Delivers multicast messages to identities"
required_interfaces:
- pubsub_basic # MUST implement basic pub/sub
optional_interfaces:
- pubsub_persistent # Nice to have: durable delivery
recommended_backends:
- nats
- redis
- kafka
Validation at Runtime
When pattern runner binds backend to slot, validate backend implements required interfaces:
func (b *SlotBinder) BindSlot(ctx context.Context, slotName, backendName string, slotSchema SlotSchema) (Slot, error) {
// 1. Fetch backend config
backend, err := b.adminClient.GetBackend(ctx, backendName)
if err != nil {
return nil, fmt.Errorf("backend %s not found: %w", backendName, err)
}
// 2. Validate backend implements required interfaces
for _, requiredInterface := range slotSchema.RequiredInterfaces {
if !backend.ImplementsInterface(requiredInterface) {
return nil, fmt.Errorf(
"backend %s (type %v) does not implement required interface %s for slot %s",
backendName, backend.Type, requiredInterface, slotName,
)
}
}
// 3. Warn if optional interfaces missing
for _, optionalInterface := range slotSchema.OptionalInterfaces {
if !backend.ImplementsInterface(optionalInterface) {
log.Warnf("Backend %s missing optional interface %s for slot %s (degraded functionality)",
backendName, optionalInterface, slotName)
}
}
// 4. Create slot implementation
return b.createSlot(backend, slotSchema)
}
Backend Capability Metadata
Backends declare which interfaces they implement (per MEMO-006):
backend: redis
type: REDIS
capabilities:
- keyvalue_basic
- keyvalue_scan
- keyvalue_ttl
- keyvalue_transactional
- keyvalue_batch
- pubsub_basic
- pubsub_wildcards
- stream_basic
- stream_consumer_groups
# ... 16 total interfaces
Stored in protobuf:
message Backend {
string name = 1;
BackendType type = 2;
BackendConfig config = 3;
repeated string capabilities = 4; // ["keyvalue_basic", "keyvalue_scan", ...]
map<string, string> metadata = 5;
}
Compatibility with Existing Patterns
Migration Path
Existing patterns that embed backend connection details in their config can migrate gradually:
Before (embedded config):
namespace: order-processing
pattern: multicast-registry
config:
registry_backend:
type: postgres
host: postgres.example.com
port: 5432
database: prism
messaging_backend:
type: kafka
brokers: ["kafka:9092"]
After (backend references):
namespace: order-processing
pattern: multicast-registry
slot_bindings:
registry: postgres-primary
messaging: kafka-prod
Migration steps:
- Admin registers backends:
prism-admin backend register postgres-primary ... - Update pattern config to use
slot_bindings - Pattern runner detects
slot_bindingsand uses backend registry - Old
config.registry_backendis ignored
Backward Compatibility
Pattern runners can support both styles:
func (r *Runner) Start(ctx context.Context, config *PatternConfig) error {
if len(config.SlotBindings) > 0 {
// NEW: Use backend registry
return r.startWithBackendRegistry(ctx, config)
} else {
// OLD: Use embedded config
return r.startWithEmbeddedConfig(ctx, config)
}
}
Implementation Plan
Phase 1: Protobuf Definitions (Week 1)
Backend Registry:
- Add
Backend,BackendConfig,BackendTypemessages to proto - Add
BackendEntrytoAdminState - Add backend commands to
CommandTypeenum - Add backend management RPCs to
ControlPlaneservice
Frontend Registry:
- Add
Frontend,FrontendConfig,FrontendTypemessages to proto - Add
RouteMapping,ParamMapping,ResponseMappingfor route definitions - Add
FrontendEntrytoAdminState - Add frontend commands to
CommandTypeenum - Add frontend management RPCs to
ControlPlaneservice
Generate Code:
- Generate Go code for all new proto definitions
Phase 2: Admin FSM Integration (Week 1-2)
Backend FSM:
- Implement
RegisterBackend,UpdateBackend,DeleteBackendcommands in FSM - Add backend storage sync (persist to SQLite/PostgreSQL)
- Update admin startup to load backends from storage
- Add Raft tests for backend commands
Frontend FSM:
- Implement
RegisterFrontend,UpdateFrontend,DeleteFrontendcommands in FSM - Add frontend storage sync (persist to SQLite/PostgreSQL)
- Update admin startup to load frontends from storage
- Add Raft tests for frontend commands
Phase 3: Admin API Implementation (Week 2)
Backend APIs:
- Implement
RegisterBackend,UpdateBackend,DeleteBackendRPCs - Implement
ListBackends,GetBackendRPCs - Add validation (unique names, valid configs)
- Add admin CLI commands for backend management
Frontend APIs:
- Implement
RegisterFrontend,UpdateFrontend,DeleteFrontendRPCs - Implement
ListFrontends,GetFrontendRPCs - Add validation (unique names, valid route mappings)
- Add admin CLI commands for frontend management
Phase 4: Pattern Runner Integration (Week 3)
Backend Slot Binding:
- Create
SlotBinderutility for fetching and binding backends - Implement type-specific slot factories (Kafka, NATS, PostgreSQL, etc.)
- Add slot schema validation (required/optional interfaces from MEMO-006)
- Update pattern runners to support
slot_bindingsconfig - Add backward compatibility for embedded configs
Pattern Slot Schemas:
- Define slot schemas for all existing patterns (multicast-registry, etc.)
- Add interface requirements per slot (keyvalue_basic, pubsub_basic, etc.)
- Validate backend capabilities against slot requirements at runtime
Phase 5: Proxy Frontend Integration (Week 3-4)
REST Adapter:
- Create REST adapter middleware in Rust proxy
- Implement route matching (path patterns with variables)
- Implement parameter mapping (path/query/header/body → protobuf fields)
- Implement HTTP → gRPC protocol translation
- Implement gRPC → HTTP response mapping
- Add CORS support
- Add REST authentication (Bearer, Basic, API Key)
Frontend Configuration Loading:
- Proxy fetches frontend configs from admin at startup
- Proxy builds route table from frontend route mappings
- Proxy registers HTTP handlers for frontend routes
- Add hot-reload support for frontend config changes (future)
Phase 6: Concrete Example Implementation (Week 4)
Confluent Schema Registry Frontend:
- Implement registry pattern with schema storage slot
- Define Confluent-compatible REST route mappings
- Register
confluent-schema-registry-restfrontend in admin - Test with Confluent Python SDK client
- Validate compatibility with Confluent REST API spec
Phase 7: Testing (Week 4-5)
Backend Tests:
- Unit tests for backend validation
- Integration tests with prism-admin backend CRUD
- End-to-end test: Register backend → Assign pattern → Pattern runner binds slots
- Test multiple patterns sharing same backend
- Test slot schema validation (missing required interfaces)
Frontend Tests:
- Unit tests for route matching and parameter mapping
- Integration tests with prism-admin frontend CRUD
- End-to-end test: Register frontend → Proxy loads routes → REST request → gRPC call
- Test Confluent Schema Registry compatibility
- Test multiple frontends on same namespace
Phase 8: Documentation (Week 5)
Backend Documentation:
- Update pattern documentation with slot binding examples
- Write operator guide for backend management
- Create backend configuration templates for common setups
- Document slot schema validation and interface requirements
Frontend Documentation:
- Write operator guide for frontend management
- Document route mapping syntax (OpenAPI-style)
- Create frontend configuration templates (REST, GraphQL, etc.)
- Document Confluent Schema Registry example
- Update quickstart to include both backend and frontend registry
Alternatives Considered
Alternative 1: Per-Namespace Backend Configs
Approach: Each namespace defines its own backend configs (not globally shared).
Pros:
- Namespace isolation
- No global naming conflicts
Cons:
- Massive duplication (every namespace configures "Kafka")
- No shared backend updates
- Harder to track which namespaces use which backends
Rejected: Violates DRY principle and makes operations harder.
Alternative 2: Backend Auto-Discovery
Approach: Pattern runners discover backends via service discovery (Consul, etc.).
Pros:
- Dynamic backend discovery
- No admin config needed
Cons:
- Requires external service discovery infrastructure
- Pattern runners need to know backend names without explicit config
- No centralized governance
Rejected: Adds complexity and loses centralized control.
Alternative 3: Backend Config in Pattern Code
Approach: Hard-code backend connection details in pattern runner binaries.
Pros:
- Simple deployment (no config needed)
Cons:
- Requires recompilation to change backends
- No multi-environment support (dev vs prod)
- Security risk (credentials in binaries)
Rejected: Fundamentally wrong for production systems.
Success Criteria
- Backend CRUD: Admin can register, update, delete, list backends via CLI/API
- Slot Binding: Pattern runners successfully bind slots to backends using backend registry
- Shared Backends: Multiple patterns use the same backend (e.g., 3 patterns share
kafka-prod) - Zero Downtime Updates: Updating backend config doesn't require pattern restart (future enhancement)
- Migration: Existing patterns can migrate from embedded configs to backend registry
- Observability: Admin dashboard shows which patterns use which backends
Related Documents
- RFC-056: Unified Configuration Model - Complete unified configuration story (consolidates this RFC as Layer 4 & 5)
- RFC-027: Namespace Configuration - User-facing namespace configuration
- ADR-002: Client-Originated Configuration - Core configuration principle
- RFC-014: Layered Data Access Patterns - Slot architecture
- RFC-017: Multicast Registry Pattern - 3-slot example
- RFC-020: Streaming HTTP Listener API Adapter - HTTP adapter pattern
- RFC-032: Minimal Schema Registry for Local Testing - Confluent API compatibility
- RFC-035: Pattern Process Launcher - Pattern runner lifecycle
- RFC-038: Admin Leader Election with Raft - Admin state management
- MEMO-004: Backend Plugin Implementation Guide - Backend types and priorities
- MEMO-006: Backend Interface Decomposition - Interface-based capabilities, slot schemas
External References
- Confluent Schema Registry API: https://docs.confluent.io/platform/current/schema-registry/develop/api.html
Open Questions
-
Secret management: How do we securely store backend credentials (passwords, API keys)?
- Proposed: Integrate with Kubernetes Secrets or HashiCorp Vault
- Short-term: Store encrypted in admin database with key from env var
-
Backend versioning: What happens when we update a backend config while patterns are running?
- Proposed: Pattern runners cache config at startup; updates require restart
- Future: Support hot-reload with connection pool refresh
-
Multi-region backends: How do we model backends that span regions?
- Proposed: Create separate backend entries per region (
kafka-prod-usw2,kafka-prod-euw1) - Future: Add region-awareness to backend config
- Proposed: Create separate backend entries per region (
-
Backend health: Should admin monitor backend health and mark unhealthy backends?
- Proposed: Phase 2 feature—add health checks and status updates
- Initial: Backends are always assumed available
-
Namespace-specific overrides: Should namespaces override global backend configs?
- Proposed: No—keep it simple; create separate backends if needed
- Example: Instead of overriding
kafka-prod, createkafka-prod-criticalfor high-priority namespaces
Revision History
- 2025-11-14: Added cross-references to RFC-056 unified configuration model
- 2025-10-18: Initial draft