Skip to main content

Backend Configuration Registry

Note: This RFC is consolidated into RFC-056: Unified Configuration Model as Layer 4 (Backend Binding) and Layer 5 (Frontend Exposure). This document remains as a detailed technical reference for the backend and frontend registry implementation.

Abstract

This RFC defines how Prism manages backend configurations and frontend interface bindings as flat, globally-shared registries administered by the admin control plane and distributed to proxies and pattern runners at startup.

Backend configurations are uniquely named resources that encapsulate connection details, capabilities, and operational metadata, enabling pattern runners to bind their slot implementations to specific backends without embedding connection details in pattern code.

Frontend interface bindings are uniquely named resources that define how external clients interact with patterns via REST, GraphQL, or other protocols, in addition to the default gRPC pattern interfaces. Frontends enable protocol translation (e.g., HTTP → gRPC) and API compatibility layers (e.g., Confluent Schema Registry REST API → Registry pattern).

Motivation

Current Problem: Backend Slot Binding

Pattern implementations need to connect to backends (Kafka, NATS, PostgreSQL, etc.) to fulfill their slot requirements, but there's no standardized way to:

  1. Centrally manage backend configurations: Connection strings, credentials, and settings are scattered across pattern definitions
  2. Share backends across patterns: Multiple patterns that need "Kafka" have to independently configure the same backend
  3. Map slots to backends at runtime: Pattern runners don't have a clean way to discover "which Kafka instance should I use for the messaging slot?"
  4. Administer backends globally: Operators need to add/remove/update backend configurations for the entire cluster
  5. Separate concerns: Pattern authors shouldn't need to know connection details—they should only declare "I need a messaging backend"

Current Problem: Frontend Interface Exposure

Pattern implementations expose gRPC interfaces by default, but external clients often need REST, GraphQL, or compatibility with existing APIs (e.g., Confluent Schema Registry). There's no standardized way to:

  1. Map REST routes to pattern operations: No declarative way to expose pattern RPCs as REST endpoints
  2. Support multiple API protocols: Clients need REST, SSE, WebSocket, GraphQL—not just gRPC
  3. Provide API compatibility layers: Can't easily expose registry pattern as Confluent Schema Registry API
  4. Manage frontend configurations globally: REST route mappings, OpenAPI specs, and protocol configs are scattered
  5. Disable default gRPC interface: Some patterns may only want REST exposure, not gRPC

Example Scenario

Consider three patterns in different namespaces:

  • order-processing: Uses multicast-registry pattern (needs: registry slot → PostgreSQL, messaging slot → Kafka)
  • user-notifications: Uses durable-queue pattern (needs: queue slot → Kafka, durability slot → PostgreSQL)
  • analytics-ingest: Uses event-stream pattern (needs: streaming slot → Kafka)

Without this RFC:

  • Each pattern embeds Kafka connection string kafka-prod.example.com:9092 in its config
  • Changing Kafka requires updating all 3 pattern configurations
  • No visibility into which patterns use which backends
  • Pattern runners must parse arbitrary config formats

With this RFC:

  • Admin defines global backend: kafka-prod with connection details
  • Patterns reference backend by name: messaging_slot: kafka-prod
  • Pattern runners fetch kafka-prod config at startup and bind slots
  • Changing Kafka means updating one backend config—all patterns automatically use new settings

Goals

Backend Registry Goals

  1. Flat backend registry: All backends have globally unique names (e.g., kafka-prod, postgres-primary, nats-dev)
  2. Admin-managed: Backends are created/updated/deleted via admin control plane APIs
  3. Distributed to runners: Pattern runners fetch backend configs at startup
  4. Typed configurations: Each backend type (Kafka, PostgreSQL, Redis) has a structured config schema
  5. Slot binding: Pattern runners map slot names → backend names → connection configs
  6. Capability metadata: Backends declare what operations they support for slot selection
  7. Slot schema integration: Pattern definitions declare required/optional backend interfaces per slot (see MEMO-006)

Frontend Registry Goals

  1. Flat frontend registry: All frontends have globally unique names (e.g., confluent-schema-registry-rest, graphql-api)
  2. Admin-managed: Frontends are created/updated/deleted via admin control plane APIs
  3. Distributed to proxies: Proxies fetch frontend configs at startup
  4. Typed configurations: Each frontend type (REST, GraphQL, gRPC) has a structured config schema
  5. Route mapping: Proxies map REST routes → pattern RPC operations using OpenAPI-style route definitions
  6. Default gRPC: Pattern gRPC interfaces are enabled by default but can be disabled per-namespace
  7. Protocol translation: Proxies handle HTTP → gRPC, SSE → streaming gRPC, etc.

Non-Goals

Backend Non-Goals

  1. Dynamic backend switching: Patterns bind backends at startup; runtime changes require restart
  2. Per-namespace backends: All backends are global; namespaces reference shared backends
  3. Automatic slot matching: Patterns explicitly configure which backend fills which slot
  4. Backend health monitoring: This RFC focuses on config distribution, not runtime health checks
  5. Multi-region backends: Initial implementation assumes single-region; multi-region is future work

Frontend Non-Goals

  1. Full OpenAPI code generation: We use OpenAPI semantics for route mapping, not full codegen
  2. GraphQL schema stitching: Initial implementation focuses on REST; GraphQL is future work
  3. WebSocket protocol translation: SSE for streaming; full WebSocket support is future
  4. Per-namespace frontend overrides: All frontends are global; namespaces opt-in to frontends
  5. Frontend load balancing: Proxy handles routing; external load balancer handles HA

Design

Backend Registry Model

Backend Resource

// Backend represents a globally-unique backend configuration
message Backend {
string name = 1; // Globally unique (e.g., "kafka-prod")
BackendType type = 2; // Enum: KAFKA, NATS, POSTGRES, REDIS, etc.
BackendConfig config = 3; // Type-specific configuration
repeated string capabilities = 4; // Slot operations supported (e.g., ["publish", "subscribe"])
map<string, string> metadata = 5; // Operator annotations (region, env, owner)
int64 created_at = 6;
int64 updated_at = 7;
string created_by = 8;
}

enum BackendType {
BACKEND_TYPE_UNSPECIFIED = 0;
BACKEND_TYPE_KAFKA = 1;
BACKEND_TYPE_NATS = 2;
BACKEND_TYPE_POSTGRES = 3;
BACKEND_TYPE_REDIS = 4;
BACKEND_TYPE_SQLITE = 5;
BACKEND_TYPE_S3 = 6;
BACKEND_TYPE_MEMSTORE = 7;
// Add more as needed
}

// BackendConfig is a oneof containing type-specific configs
message BackendConfig {
oneof config {
KafkaConfig kafka = 1;
NatsConfig nats = 2;
PostgresConfig postgres = 3;
RedisConfig redis = 4;
SqliteConfig sqlite = 5;
S3Config s3 = 6;
MemStoreConfig memstore = 7;
}
}

Type-Specific Configs

message KafkaConfig {
repeated string brokers = 1; // ["kafka-01:9092", "kafka-02:9092"]
string security_protocol = 2; // "PLAINTEXT", "SASL_SSL"
KafkaAuth auth = 3;
int32 connection_timeout_ms = 4;
int32 session_timeout_ms = 5;
}

message KafkaAuth {
oneof auth {
KafkaSASL sasl = 1;
KafkaMTLS mtls = 2;
}
}

message NatsConfig {
repeated string servers = 1; // ["nats://nats-01:4222"]
string credentials_file = 2; // Path to NATS creds file
bool tls_enabled = 3;
int32 connection_timeout_ms = 4;
}

message PostgresConfig {
string host = 1; // "postgres.example.com"
int32 port = 2; // 5432
string database = 3;
string user = 4;
PostgresAuth auth = 5;
int32 max_connections = 6;
bool ssl_mode = 7;
}

message RedisConfig {
string host = 1;
int32 port = 2;
int32 database = 3; // Redis DB number (0-15)
RedisAuth auth = 4;
bool tls_enabled = 5;
int32 pool_size = 6;
}

message SqliteConfig {
string path = 1; // File path or ":memory:"
bool wal_enabled = 2;
int32 cache_size_kb = 3;
}

message S3Config {
string endpoint = 1; // "s3.amazonaws.com" or MinIO endpoint
string region = 2;
string bucket = 3;
S3Auth auth = 4;
bool path_style = 5; // For MinIO compatibility
}

message MemStoreConfig {
int64 max_size_bytes = 1; // Memory limit
int32 eviction_policy = 2; // LRU, LFU, etc.
}

Admin Control Plane Integration

New Admin State

Add backends to the admin Raft FSM state:

message AdminState {
int32 version = 1;

map<string, NamespaceEntry> namespaces = 2;
map<string, ProxyEntry> proxies = 3;
map<string, LauncherEntry> launchers = 4;
map<string, PatternEntry> patterns = 5;

// NEW: Backend registry
map<string, BackendEntry> backends = 6; // key = backend name

int64 last_applied_index = 10;
int64 last_applied_term = 11;
int64 state_updated_at = 12;
}

message BackendEntry {
Backend backend = 1; // Full backend config
string status = 2; // "active", "deprecated", "disabled"
int64 registered_at = 3;
int64 updated_at = 4;
}

New Admin Commands

enum CommandType {
// ... existing commands ...
COMMAND_TYPE_REGISTER_BACKEND = 7;
COMMAND_TYPE_UPDATE_BACKEND = 8;
COMMAND_TYPE_DELETE_BACKEND = 9;
}

message RegisterBackendCommand {
Backend backend = 1;
}

message UpdateBackendCommand {
string name = 1; // Backend to update
BackendConfig config = 2; // New config
map<string, string> metadata = 3; // New metadata
}

message DeleteBackendCommand {
string name = 1;
}

Admin API Extensions

service ControlPlane {
// ... existing RPCs ...

// Backend management
rpc RegisterBackend(RegisterBackendRequest) returns (RegisterBackendResponse);
rpc UpdateBackend(UpdateBackendRequest) returns (UpdateBackendResponse);
rpc DeleteBackend(DeleteBackendRequest) returns (DeleteBackendResponse);
rpc ListBackends(ListBackendsRequest) returns (ListBackendsResponse);
rpc GetBackend(GetBackendRequest) returns (GetBackendResponse);
}

message RegisterBackendRequest {
Backend backend = 1;
}

message RegisterBackendResponse {
bool success = 1;
string error = 2;
}

message ListBackendsRequest {
optional BackendType type = 1; // Filter by type
repeated string names = 2; // Filter by specific names
}

message ListBackendsResponse {
repeated Backend backends = 1;
}

Frontend Registry Model

Frontend Resource

// Frontend represents a globally-unique frontend interface binding
message Frontend {
string name = 1; // Globally unique (e.g., "confluent-schema-registry-rest")
FrontendType type = 2; // Enum: REST, GRAPHQL, GRPC_WEB, etc.
FrontendConfig config = 3; // Type-specific configuration
repeated RouteMapping routes = 4; // REST route → pattern operation mappings
map<string, string> metadata = 5; // Operator annotations (api_version, compatibility)
int64 created_at = 6;
int64 updated_at = 7;
string created_by = 8;
}

enum FrontendType {
FRONTEND_TYPE_UNSPECIFIED = 0;
FRONTEND_TYPE_REST = 1; // HTTP/REST with JSON
FRONTEND_TYPE_GRAPHQL = 2; // GraphQL API
FRONTEND_TYPE_GRPC_WEB = 3; // gRPC-Web for browsers
FRONTEND_TYPE_SSE = 4; // Server-Sent Events
FRONTEND_TYPE_WEBSOCKET = 5; // WebSocket bidirectional
}

// FrontendConfig is a oneof containing type-specific configs
message FrontendConfig {
oneof config {
RestConfig rest = 1;
GraphQLConfig graphql = 2;
GrpcWebConfig grpc_web = 3;
SseConfig sse = 4;
WebSocketConfig websocket = 5;
}
}

Type-Specific Frontend Configs

message RestConfig {
string base_path = 1; // Base path prefix (e.g., "/api/v1")
string openapi_spec_url = 2; // Optional OpenAPI 3.0 spec URL
bool enable_cors = 3; // Enable CORS headers
repeated string allowed_origins = 4; // CORS allowed origins
RestAuthentication auth = 5; // REST auth config
map<string, string> headers = 6; // Default response headers
}

message RestAuthentication {
oneof auth {
BearerTokenAuth bearer = 1;
BasicAuth basic = 2;
ApiKeyAuth api_key = 3;
OAuthAuth oauth = 4;
}
}

message GraphQLConfig {
string endpoint = 1; // GraphQL endpoint path (e.g., "/graphql")
bool enable_playground = 2; // Enable GraphQL Playground UI
string schema_file = 3; // Path to GraphQL schema file
}

message GrpcWebConfig {
string endpoint = 1; // gRPC-Web endpoint (e.g., "/grpc")
bool text_format = 2; // Use text format vs binary
}

message SseConfig {
string endpoint = 1; // SSE endpoint (e.g., "/events")
int32 heartbeat_interval_ms = 2; // Heartbeat interval (default 30s)
}

message WebSocketConfig {
string endpoint = 1; // WebSocket endpoint (e.g., "/ws")
int32 ping_interval_ms = 2; // Ping interval (default 30s)
}

Route Mapping (OpenAPI-style)

// RouteMapping defines REST route → pattern RPC mapping
message RouteMapping {
string http_method = 1; // GET, POST, PUT, DELETE, PATCH
string path_pattern = 2; // /subjects/{subject}/versions/{version}
string pattern_rpc = 3; // Fully-qualified RPC (e.g., "prism.patterns.Registry.GetSchema")
repeated ParamMapping param_mappings = 4; // How to map HTTP params to RPC fields
ResponseMapping response = 5; // How to map RPC response to HTTP response
int32 timeout_ms = 6; // Per-route timeout override
}

// ParamMapping defines how HTTP params map to protobuf fields
message ParamMapping {
string source = 1; // "path", "query", "header", "body"
string source_name = 2; // HTTP param name (e.g., "subject")
string target_field = 3; // Protobuf field path (e.g., "request.subject")
string default_value = 4; // Optional default value
bool required = 5; // Whether param is required
}

// ResponseMapping defines how protobuf response maps to HTTP response
message ResponseMapping {
int32 status_code = 1; // HTTP status code (default 200)
string body_field = 2; // Which protobuf field to use as body (default: whole message)
map<string, string> header_mappings = 3; // Protobuf field → HTTP header mappings
}

Admin State Integration for Frontends

message AdminState {
int32 version = 1;

map<string, NamespaceEntry> namespaces = 2;
map<string, ProxyEntry> proxies = 3;
map<string, LauncherEntry> launchers = 4;
map<string, PatternEntry> patterns = 5;
map<string, BackendEntry> backends = 6;

// NEW: Frontend registry
map<string, FrontendEntry> frontends = 7; // key = frontend name

int64 last_applied_index = 10;
int64 last_applied_term = 11;
int64 state_updated_at = 12;
}

message FrontendEntry {
Frontend frontend = 1; // Full frontend config
string status = 2; // "active", "deprecated", "disabled"
int64 registered_at = 3;
int64 updated_at = 4;
}

Admin Commands for Frontends

enum CommandType {
// ... existing commands ...
COMMAND_TYPE_REGISTER_BACKEND = 7;
COMMAND_TYPE_UPDATE_BACKEND = 8;
COMMAND_TYPE_DELETE_BACKEND = 9;

// NEW: Frontend commands
COMMAND_TYPE_REGISTER_FRONTEND = 10;
COMMAND_TYPE_UPDATE_FRONTEND = 11;
COMMAND_TYPE_DELETE_FRONTEND = 12;
}

message RegisterFrontendCommand {
Frontend frontend = 1;
}

message UpdateFrontendCommand {
string name = 1; // Frontend to update
FrontendConfig config = 2; // New config
repeated RouteMapping routes = 3; // New route mappings
map<string, string> metadata = 4; // New metadata
}

message DeleteFrontendCommand {
string name = 1;
}

Admin API Extensions for Frontends

service ControlPlane {
// ... existing RPCs ...

// Backend management
rpc RegisterBackend(RegisterBackendRequest) returns (RegisterBackendResponse);
rpc UpdateBackend(UpdateBackendRequest) returns (UpdateBackendResponse);
rpc DeleteBackend(DeleteBackendRequest) returns (DeleteBackendResponse);
rpc ListBackends(ListBackendsRequest) returns (ListBackendsResponse);
rpc GetBackend(GetBackendRequest) returns (GetBackendResponse);

// NEW: Frontend management
rpc RegisterFrontend(RegisterFrontendRequest) returns (RegisterFrontendResponse);
rpc UpdateFrontend(UpdateFrontendRequest) returns (UpdateFrontendResponse);
rpc DeleteFrontend(DeleteFrontendRequest) returns (DeleteFrontendResponse);
rpc ListFrontends(ListFrontendsRequest) returns (ListFrontendsResponse);
rpc GetFrontend(GetFrontendRequest) returns (GetFrontendResponse);
}

message RegisterFrontendRequest {
Frontend frontend = 1;
}

message RegisterFrontendResponse {
bool success = 1;
string error = 2;
}

message ListFrontendsRequest {
optional FrontendType type = 1; // Filter by type
repeated string names = 2; // Filter by specific names
}

message ListFrontendsResponse {
repeated Frontend frontends = 1;
}

Pattern Configuration with Backend References

Pattern Definition with Slot Bindings

# Pattern configuration (in namespace config)
namespace: order-processing
pattern: multicast-registry

# Slot bindings: Map slot names → backend names
slot_bindings:
registry: postgres-primary # Registry slot uses PostgreSQL backend
messaging: kafka-prod # Messaging slot uses Kafka backend
durability: kafka-prod # Durability slot reuses same Kafka

# Pattern-specific config (non-backend settings)
pattern_config:
ttl_seconds: 300
max_identities: 10000

Equivalent Protobuf

message PatternEntry {
string pattern_id = 1;
string namespace = 2;
string pattern_type = 3; // "multicast-registry"
string launcher_id = 4;

// NEW: Slot bindings
map<string, string> slot_bindings = 5; // {"registry": "postgres-primary", "messaging": "kafka-prod"}

map<string, string> config = 6; // Pattern-specific config
string status = 7;
int64 assigned_at = 8;
int64 updated_at = 9;
}

Pattern Runner Startup Flow

sequenceDiagram
participant Launcher as Launcher
participant Pattern as Pattern Runner
participant Admin as Admin Control Plane
participant Backend as Backend (Kafka)

Note over Launcher,Backend: 1. Pattern Assignment
Admin->>Launcher: AssignPattern(pattern_id, namespace, slot_bindings)
Launcher->>Pattern: Start(pattern_config)

Note over Pattern,Admin: 2. Fetch Backend Configs
Pattern->>Admin: GetBackend("kafka-prod")
Admin-->>Pattern: Backend{type: KAFKA, config: KafkaConfig{...}}

Pattern->>Admin: GetBackend("postgres-primary")
Admin-->>Pattern: Backend{type: POSTGRES, config: PostgresConfig{...}}

Note over Pattern,Backend: 3. Bind Slots to Backends
Pattern->>Pattern: messaging_slot.bind(kafka_config)
Pattern->>Pattern: registry_slot.bind(postgres_config)

Note over Pattern,Backend: 4. Connect to Backends
Pattern->>Backend: Connect(brokers=["kafka:9092"])
Backend-->>Pattern: Connection established

Pattern->>Pattern: Start processing

Backend Binding in Pattern Code

Go Pattern Runner Implementation

// Pattern runner startup
func (r *MulticastRegistryRunner) Start(ctx context.Context, config *PatternConfig) error {
// 1. Extract slot bindings from pattern config
slotBindings := config.SlotBindings // {"registry": "postgres-primary", "messaging": "kafka-prod"}

// 2. Fetch backend configs from admin
backends := make(map[string]*Backend)
for slotName, backendName := range slotBindings {
backend, err := r.adminClient.GetBackend(ctx, backendName)
if err != nil {
return fmt.Errorf("failed to fetch backend %s for slot %s: %w", backendName, slotName, err)
}
backends[slotName] = backend
}

// 3. Bind slots to backends using type-specific drivers
registryBackend := backends["registry"]
switch registryBackend.Type {
case prism.BACKEND_TYPE_POSTGRES:
pgConfig := registryBackend.Config.GetPostgres()
r.registrySlot = NewPostgresRegistrySlot(pgConfig)
case prism.BACKEND_TYPE_REDIS:
redisConfig := registryBackend.Config.GetRedis()
r.registrySlot = NewRedisRegistrySlot(redisConfig)
default:
return fmt.Errorf("unsupported backend type for registry slot: %v", registryBackend.Type)
}

messagingBackend := backends["messaging"]
switch messagingBackend.Type {
case prism.BACKEND_TYPE_KAFKA:
kafkaConfig := messagingBackend.Config.GetKafka()
r.messagingSlot = NewKafkaMessagingSlot(kafkaConfig)
case prism.BACKEND_TYPE_NATS:
natsConfig := messagingBackend.Config.GetNats()
r.messagingSlot = NewNatsMessagingSlot(natsConfig)
default:
return fmt.Errorf("unsupported backend type for messaging slot: %v", messagingBackend.Type)
}

// 4. Connect all slots
if err := r.registrySlot.Connect(ctx); err != nil {
return fmt.Errorf("registry slot connect failed: %w", err)
}
if err := r.messagingSlot.Connect(ctx); err != nil {
return fmt.Errorf("messaging slot connect failed: %w", err)
}

return nil
}

Shared Backend Binding Logic

To reduce duplication across pattern runners, we can provide a generic slot binder:

// pkg/pattern/slot_binder.go

type SlotBinder struct {
adminClient admin.ControlPlaneClient
}

func NewSlotBinder(adminClient admin.ControlPlaneClient) *SlotBinder {
return &SlotBinder{adminClient: adminClient}
}

// BindSlot fetches backend config and creates slot implementation
func (b *SlotBinder) BindSlot(ctx context.Context, slotName, backendName string, slotType SlotType) (Slot, error) {
// 1. Fetch backend config
backend, err := b.adminClient.GetBackend(ctx, backendName)
if err != nil {
return nil, fmt.Errorf("failed to fetch backend %s: %w", backendName, err)
}

// 2. Create slot implementation based on backend type and slot type
switch slotType {
case SlotTypeRegistry:
return b.createRegistrySlot(backend)
case SlotTypeMessaging:
return b.createMessagingSlot(backend)
case SlotTypeDurability:
return b.createDurabilitySlot(backend)
default:
return nil, fmt.Errorf("unknown slot type: %v", slotType)
}
}

func (b *SlotBinder) createMessagingSlot(backend *Backend) (Slot, error) {
switch backend.Type {
case prism.BACKEND_TYPE_KAFKA:
return NewKafkaMessagingSlot(backend.Config.GetKafka()), nil
case prism.BACKEND_TYPE_NATS:
return NewNatsMessagingSlot(backend.Config.GetNats()), nil
default:
return nil, fmt.Errorf("backend type %v does not support messaging slot", backend.Type)
}
}

Admin CLI for Backend Management

# Register a new backend
prism-admin backend register kafka-prod \
--type kafka \
--brokers kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--security-protocol SASL_SSL \
--metadata region=us-west-2,env=production

# List all backends
prism-admin backend list
# Output:
# NAME TYPE CAPABILITIES STATUS CREATED
# kafka-prod kafka [publish,subscribe] active 2025-10-18
# postgres-primary postgres [get,set,scan,delete] active 2025-10-18
# nats-dev nats [publish,subscribe] active 2025-10-18
# memstore-local memstore [get,set,delete] active 2025-10-18

# Get backend details
prism-admin backend get kafka-prod
# Output:
# Name: kafka-prod
# Type: kafka
# Brokers: kafka-01:9092, kafka-02:9092, kafka-03:9092
# Security: SASL_SSL
# Capabilities: publish, subscribe
# Metadata:
# region: us-west-2
# env: production
# Status: active
# Created: 2025-10-18 10:30:00

# Update backend config
prism-admin backend update kafka-prod --brokers kafka-01:9092,kafka-04:9092

# Delete backend
prism-admin backend delete kafka-dev
# Warning: Backend 'kafka-dev' is used by 3 patterns:
# - order-processing (messaging slot)
# - user-notifications (queue slot)
# - analytics-ingest (streaming slot)
# Are you sure? [y/N]:

Concrete Example: Registry Pattern as Confluent Schema Registry API

This section demonstrates how to expose Prism's registry pattern as a Confluent-compatible Schema Registry REST API using frontend bindings.

Pattern: Registry (gRPC)

The registry pattern exposes these gRPC RPCs:

service RegistryService {
rpc RegisterSchema(RegisterSchemaRequest) returns (RegisterSchemaResponse);
rpc GetSchema(GetSchemaRequest) returns (GetSchemaResponse);
rpc ListSchemas(ListSchemasRequest) returns (ListSchemasResponse);
rpc DeleteSchema(DeleteSchemaRequest) returns (DeleteSchemaResponse);
rpc CheckCompatibility(CheckCompatibilityRequest) returns (CheckCompatibilityResponse);
}

message RegisterSchemaRequest {
string subject = 1;
string schema = 2;
string schema_type = 3; // PROTOBUF, JSON, AVRO
}

message RegisterSchemaResponse {
int32 id = 1;
int32 version = 2;
}

message GetSchemaRequest {
string subject = 1;
int32 version = 2;
}

message GetSchemaResponse {
int32 id = 1;
int32 version = 2;
string schema = 3;
string schema_type = 4;
}

Frontend: Confluent Schema Registry REST API

Register a frontend that maps Confluent's REST API to the registry pattern:

name: confluent-schema-registry-rest
type: REST
config:
rest:
base_path: /
enable_cors: true
allowed_origins: ["*"]
auth:
bearer:
issuer: "https://auth.example.com"
audience: "schema-registry"

# Route mappings: Confluent REST → Registry gRPC
routes:
# POST /subjects/{subject}/versions → RegisterSchema
- http_method: POST
path_pattern: /subjects/{subject}/versions
pattern_rpc: prism.patterns.RegistryService.RegisterSchema
param_mappings:
- source: path
source_name: subject
target_field: subject
required: true
- source: body
source_name: schema
target_field: schema
required: true
- source: body
source_name: schemaType
target_field: schema_type
default_value: PROTOBUF
response:
status_code: 200
# Body is whole RegisterSchemaResponse: {"id": 1, "version": 1}

# GET /subjects/{subject}/versions/{version} → GetSchema
- http_method: GET
path_pattern: /subjects/{subject}/versions/{version}
pattern_rpc: prism.patterns.RegistryService.GetSchema
param_mappings:
- source: path
source_name: subject
target_field: subject
required: true
- source: path
source_name: version
target_field: version
required: true
response:
status_code: 200

# GET /subjects/{subject}/versions → ListSchemas (filtered by subject)
- http_method: GET
path_pattern: /subjects/{subject}/versions
pattern_rpc: prism.patterns.RegistryService.ListSchemas
param_mappings:
- source: path
source_name: subject
target_field: subject_filter
required: true
response:
status_code: 200
body_field: versions # Extract versions array from ListSchemasResponse

# DELETE /subjects/{subject}/versions/{version} → DeleteSchema
- http_method: DELETE
path_pattern: /subjects/{subject}/versions/{version}
pattern_rpc: prism.patterns.RegistryService.DeleteSchema
param_mappings:
- source: path
source_name: subject
target_field: subject
required: true
- source: path
source_name: version
target_field: version
required: true
response:
status_code: 200
body_field: deleted_version # Return just the version number

# POST /compatibility/subjects/{subject}/versions/{version} → CheckCompatibility
- http_method: POST
path_pattern: /compatibility/subjects/{subject}/versions/{version}
pattern_rpc: prism.patterns.RegistryService.CheckCompatibility
param_mappings:
- source: path
source_name: subject
target_field: subject
required: true
- source: path
source_name: version
target_field: base_version
required: true
- source: body
source_name: schema
target_field: new_schema
required: true
- source: body
source_name: schemaType
target_field: schema_type
default_value: PROTOBUF
response:
status_code: 200
# Body: {"is_compatible": true}

metadata:
api_version: "v7"
compatibility_mode: "confluent"
documentation_url: "https://docs.confluent.io/platform/current/schema-registry/develop/api.html"

Namespace Configuration with Frontend Binding

namespace: schema-registry
pattern: registry

# Backend slots
slot_bindings:
registry: postgres-primary # Store schemas in PostgreSQL
cache: redis-prod # Cache recently accessed schemas

# Frontend bindings
frontend_bindings:
- name: confluent-schema-registry-rest
enabled: true
- name: default-grpc
enabled: true # Keep gRPC interface as well

# Pattern-specific config
pattern_config:
max_schemas_per_subject: 100
compatibility_check: backward # Default compatibility mode

Proxy Request Handling Flow

sequenceDiagram
participant Client as Confluent Client
participant Proxy as Prism Proxy (REST Adapter)
participant Pattern as Registry Pattern Runner
participant Backend as PostgreSQL

Note over Client,Backend: 1. REST Request (Confluent API)
Client->>Proxy: POST /subjects/orders.created/versions
Client->>Proxy: {"schema": "...", "schemaType": "PROTOBUF"}

Note over Proxy: 2. Route Matching & Param Extraction
Proxy->>Proxy: Match route: POST /subjects/{subject}/versions
Proxy->>Proxy: Extract params: subject="orders.created", schema="...", schemaType="PROTOBUF"

Note over Proxy: 3. Build gRPC Request
Proxy->>Proxy: Map to RegisterSchemaRequest{subject, schema, schema_type}

Note over Proxy,Pattern: 4. Call Pattern Runner via gRPC
Proxy->>Pattern: RegisterSchema(RegisterSchemaRequest)

Note over Pattern,Backend: 5. Pattern Executes Against Backend
Pattern->>Backend: INSERT INTO schemas (subject, schema, type, version) VALUES (...)
Backend-->>Pattern: id=1, version=1

Pattern-->>Proxy: RegisterSchemaResponse{id: 1, version: 1}

Note over Proxy: 6. Map gRPC Response to REST
Proxy->>Proxy: Convert to JSON: {"id": 1, "version": 1}

Note over Proxy,Client: 7. REST Response (Confluent Format)
Proxy-->>Client: 200 OK
Proxy-->>Client: {"id": 1, "version": 1}

Equivalent Protobuf Representation

message NamespaceEntry {
string name = 1; // "schema-registry"
string pattern_type = 2; // "registry"

// Backend slot bindings
map<string, string> slot_bindings = 3; // {"registry": "postgres-primary", "cache": "redis-prod"}

// NEW: Frontend bindings
repeated FrontendBinding frontend_bindings = 4;

map<string, string> pattern_config = 5;
}

message FrontendBinding {
string name = 1; // "confluent-schema-registry-rest"
bool enabled = 2; // true
map<string, string> overrides = 3; // Optional namespace-specific overrides
}

Client Usage (Confluent SDK)

# Python client using Confluent Schema Registry SDK
from confluent_kafka.schema_registry import SchemaRegistryClient

# Point to Prism proxy's frontend binding
schema_registry = SchemaRegistryClient({
'url': 'http://prism-proxy.example.com', # Prism proxy with frontend binding
'basic.auth.user.info': 'token:secret'
})

# Register schema (goes through Prism's REST → gRPC translation)
schema_str = """
syntax = "proto3";
message Order {
string order_id = 1;
int64 amount = 2;
}
"""

schema_id = schema_registry.register_schema(
'orders.created',
Schema(schema_str, 'PROTOBUF')
)
# → POST /subjects/orders.created/versions
# → Prism translates to RegistryService.RegisterSchema gRPC
# → Returns: {"id": 1, "version": 1}

# Get schema by version
schema = schema_registry.get_version('orders.created', 1)
# → GET /subjects/orders.created/versions/1
# → Prism translates to RegistryService.GetSchema gRPC

Benefits of This Approach

  1. API Compatibility: Existing Confluent clients work without modification
  2. Backend Flexibility: Registry pattern can use PostgreSQL, Redis, or any backend with KV + scan interfaces
  3. Protocol Translation: Proxy handles HTTP → gRPC conversion transparently
  4. Centralized Management: Frontend config is admin-managed, not embedded in application code
  5. Multi-Protocol Support: Same pattern accessible via both REST (Confluent API) and gRPC (native)
  6. Observability: Proxy can log/trace REST requests and correlate with gRPC calls

Pattern Slot Schema Integration (MEMO-006)

Building on MEMO-006's backend interface decomposition, patterns declare required/optional backend interfaces per slot:

Pattern Slot Definition (from MEMO-006)

pattern: multicast-registry
version: v1

slots:
registry:
description: "Stores identity → metadata mappings"
required_interfaces:
- keyvalue_basic # MUST implement basic KV operations
- keyvalue_scan # MUST support enumeration
optional_interfaces:
- keyvalue_ttl # Nice to have: auto-expire offline identities
recommended_backends:
- redis # Has all 3 interfaces
- postgres # Has basic + scan (no TTL)
- dynamodb # Has all 3 interfaces

messaging:
description: "Delivers multicast messages to identities"
required_interfaces:
- pubsub_basic # MUST implement basic pub/sub
optional_interfaces:
- pubsub_persistent # Nice to have: durable delivery
recommended_backends:
- nats
- redis
- kafka

Validation at Runtime

When pattern runner binds backend to slot, validate backend implements required interfaces:

func (b *SlotBinder) BindSlot(ctx context.Context, slotName, backendName string, slotSchema SlotSchema) (Slot, error) {
// 1. Fetch backend config
backend, err := b.adminClient.GetBackend(ctx, backendName)
if err != nil {
return nil, fmt.Errorf("backend %s not found: %w", backendName, err)
}

// 2. Validate backend implements required interfaces
for _, requiredInterface := range slotSchema.RequiredInterfaces {
if !backend.ImplementsInterface(requiredInterface) {
return nil, fmt.Errorf(
"backend %s (type %v) does not implement required interface %s for slot %s",
backendName, backend.Type, requiredInterface, slotName,
)
}
}

// 3. Warn if optional interfaces missing
for _, optionalInterface := range slotSchema.OptionalInterfaces {
if !backend.ImplementsInterface(optionalInterface) {
log.Warnf("Backend %s missing optional interface %s for slot %s (degraded functionality)",
backendName, optionalInterface, slotName)
}
}

// 4. Create slot implementation
return b.createSlot(backend, slotSchema)
}

Backend Capability Metadata

Backends declare which interfaces they implement (per MEMO-006):

backend: redis
type: REDIS
capabilities:
- keyvalue_basic
- keyvalue_scan
- keyvalue_ttl
- keyvalue_transactional
- keyvalue_batch
- pubsub_basic
- pubsub_wildcards
- stream_basic
- stream_consumer_groups
# ... 16 total interfaces

Stored in protobuf:

message Backend {
string name = 1;
BackendType type = 2;
BackendConfig config = 3;
repeated string capabilities = 4; // ["keyvalue_basic", "keyvalue_scan", ...]
map<string, string> metadata = 5;
}

Compatibility with Existing Patterns

Migration Path

Existing patterns that embed backend connection details in their config can migrate gradually:

Before (embedded config):

namespace: order-processing
pattern: multicast-registry
config:
registry_backend:
type: postgres
host: postgres.example.com
port: 5432
database: prism
messaging_backend:
type: kafka
brokers: ["kafka:9092"]

After (backend references):

namespace: order-processing
pattern: multicast-registry
slot_bindings:
registry: postgres-primary
messaging: kafka-prod

Migration steps:

  1. Admin registers backends: prism-admin backend register postgres-primary ...
  2. Update pattern config to use slot_bindings
  3. Pattern runner detects slot_bindings and uses backend registry
  4. Old config.registry_backend is ignored

Backward Compatibility

Pattern runners can support both styles:

func (r *Runner) Start(ctx context.Context, config *PatternConfig) error {
if len(config.SlotBindings) > 0 {
// NEW: Use backend registry
return r.startWithBackendRegistry(ctx, config)
} else {
// OLD: Use embedded config
return r.startWithEmbeddedConfig(ctx, config)
}
}

Implementation Plan

Phase 1: Protobuf Definitions (Week 1)

Backend Registry:

  • Add Backend, BackendConfig, BackendType messages to proto
  • Add BackendEntry to AdminState
  • Add backend commands to CommandType enum
  • Add backend management RPCs to ControlPlane service

Frontend Registry:

  • Add Frontend, FrontendConfig, FrontendType messages to proto
  • Add RouteMapping, ParamMapping, ResponseMapping for route definitions
  • Add FrontendEntry to AdminState
  • Add frontend commands to CommandType enum
  • Add frontend management RPCs to ControlPlane service

Generate Code:

  • Generate Go code for all new proto definitions

Phase 2: Admin FSM Integration (Week 1-2)

Backend FSM:

  • Implement RegisterBackend, UpdateBackend, DeleteBackend commands in FSM
  • Add backend storage sync (persist to SQLite/PostgreSQL)
  • Update admin startup to load backends from storage
  • Add Raft tests for backend commands

Frontend FSM:

  • Implement RegisterFrontend, UpdateFrontend, DeleteFrontend commands in FSM
  • Add frontend storage sync (persist to SQLite/PostgreSQL)
  • Update admin startup to load frontends from storage
  • Add Raft tests for frontend commands

Phase 3: Admin API Implementation (Week 2)

Backend APIs:

  • Implement RegisterBackend, UpdateBackend, DeleteBackend RPCs
  • Implement ListBackends, GetBackend RPCs
  • Add validation (unique names, valid configs)
  • Add admin CLI commands for backend management

Frontend APIs:

  • Implement RegisterFrontend, UpdateFrontend, DeleteFrontend RPCs
  • Implement ListFrontends, GetFrontend RPCs
  • Add validation (unique names, valid route mappings)
  • Add admin CLI commands for frontend management

Phase 4: Pattern Runner Integration (Week 3)

Backend Slot Binding:

  • Create SlotBinder utility for fetching and binding backends
  • Implement type-specific slot factories (Kafka, NATS, PostgreSQL, etc.)
  • Add slot schema validation (required/optional interfaces from MEMO-006)
  • Update pattern runners to support slot_bindings config
  • Add backward compatibility for embedded configs

Pattern Slot Schemas:

  • Define slot schemas for all existing patterns (multicast-registry, etc.)
  • Add interface requirements per slot (keyvalue_basic, pubsub_basic, etc.)
  • Validate backend capabilities against slot requirements at runtime

Phase 5: Proxy Frontend Integration (Week 3-4)

REST Adapter:

  • Create REST adapter middleware in Rust proxy
  • Implement route matching (path patterns with variables)
  • Implement parameter mapping (path/query/header/body → protobuf fields)
  • Implement HTTP → gRPC protocol translation
  • Implement gRPC → HTTP response mapping
  • Add CORS support
  • Add REST authentication (Bearer, Basic, API Key)

Frontend Configuration Loading:

  • Proxy fetches frontend configs from admin at startup
  • Proxy builds route table from frontend route mappings
  • Proxy registers HTTP handlers for frontend routes
  • Add hot-reload support for frontend config changes (future)

Phase 6: Concrete Example Implementation (Week 4)

Confluent Schema Registry Frontend:

  • Implement registry pattern with schema storage slot
  • Define Confluent-compatible REST route mappings
  • Register confluent-schema-registry-rest frontend in admin
  • Test with Confluent Python SDK client
  • Validate compatibility with Confluent REST API spec

Phase 7: Testing (Week 4-5)

Backend Tests:

  • Unit tests for backend validation
  • Integration tests with prism-admin backend CRUD
  • End-to-end test: Register backend → Assign pattern → Pattern runner binds slots
  • Test multiple patterns sharing same backend
  • Test slot schema validation (missing required interfaces)

Frontend Tests:

  • Unit tests for route matching and parameter mapping
  • Integration tests with prism-admin frontend CRUD
  • End-to-end test: Register frontend → Proxy loads routes → REST request → gRPC call
  • Test Confluent Schema Registry compatibility
  • Test multiple frontends on same namespace

Phase 8: Documentation (Week 5)

Backend Documentation:

  • Update pattern documentation with slot binding examples
  • Write operator guide for backend management
  • Create backend configuration templates for common setups
  • Document slot schema validation and interface requirements

Frontend Documentation:

  • Write operator guide for frontend management
  • Document route mapping syntax (OpenAPI-style)
  • Create frontend configuration templates (REST, GraphQL, etc.)
  • Document Confluent Schema Registry example
  • Update quickstart to include both backend and frontend registry

Alternatives Considered

Alternative 1: Per-Namespace Backend Configs

Approach: Each namespace defines its own backend configs (not globally shared).

Pros:

  • Namespace isolation
  • No global naming conflicts

Cons:

  • Massive duplication (every namespace configures "Kafka")
  • No shared backend updates
  • Harder to track which namespaces use which backends

Rejected: Violates DRY principle and makes operations harder.

Alternative 2: Backend Auto-Discovery

Approach: Pattern runners discover backends via service discovery (Consul, etc.).

Pros:

  • Dynamic backend discovery
  • No admin config needed

Cons:

  • Requires external service discovery infrastructure
  • Pattern runners need to know backend names without explicit config
  • No centralized governance

Rejected: Adds complexity and loses centralized control.

Alternative 3: Backend Config in Pattern Code

Approach: Hard-code backend connection details in pattern runner binaries.

Pros:

  • Simple deployment (no config needed)

Cons:

  • Requires recompilation to change backends
  • No multi-environment support (dev vs prod)
  • Security risk (credentials in binaries)

Rejected: Fundamentally wrong for production systems.

Success Criteria

  1. Backend CRUD: Admin can register, update, delete, list backends via CLI/API
  2. Slot Binding: Pattern runners successfully bind slots to backends using backend registry
  3. Shared Backends: Multiple patterns use the same backend (e.g., 3 patterns share kafka-prod)
  4. Zero Downtime Updates: Updating backend config doesn't require pattern restart (future enhancement)
  5. Migration: Existing patterns can migrate from embedded configs to backend registry
  6. Observability: Admin dashboard shows which patterns use which backends

External References

Open Questions

  1. Secret management: How do we securely store backend credentials (passwords, API keys)?

    • Proposed: Integrate with Kubernetes Secrets or HashiCorp Vault
    • Short-term: Store encrypted in admin database with key from env var
  2. Backend versioning: What happens when we update a backend config while patterns are running?

    • Proposed: Pattern runners cache config at startup; updates require restart
    • Future: Support hot-reload with connection pool refresh
  3. Multi-region backends: How do we model backends that span regions?

    • Proposed: Create separate backend entries per region (kafka-prod-usw2, kafka-prod-euw1)
    • Future: Add region-awareness to backend config
  4. Backend health: Should admin monitor backend health and mark unhealthy backends?

    • Proposed: Phase 2 feature—add health checks and status updates
    • Initial: Backends are always assumed available
  5. Namespace-specific overrides: Should namespaces override global backend configs?

    • Proposed: No—keep it simple; create separate backends if needed
    • Example: Instead of overriding kafka-prod, create kafka-prod-critical for high-priority namespaces

Revision History

  • 2025-11-14: Added cross-references to RFC-056 unified configuration model
  • 2025-10-18: Initial draft