MEMO-042: Consolidated Pattern Protocols Implementation Strategy

Purpose

Provide tactical implementation guidance for RFC-046 (Consolidated Pattern Protocols). This memo breaks down the architectural changes into concrete, executable tasks with clear success criteria, test strategies, and rollout plans.

Related: RFC-046: Consolidated Pattern Protocols

Executive Summary

What: Consolidate pattern protocols from multiple backend-level services into single semantic pattern services.

Why: Current architecture leaks backend details to clients, has no capability negotiation, and makes patterns hard to evolve.

How: 10-week phased implementation with backward compatibility, comprehensive testing, and gradual client migration.

Risk: Medium - requires coordination across proto, Go patterns, Rust proxy, and client SDKs. Mitigated by maintaining backward compatibility during transition.

Current State Analysis

Architecture Gaps

| Component | Current State | Problem | Target State |
|---|---|---|---|
| KeyValue Pattern | Exposes 4 separate services | Client must handle 4 imports | Single KeyValuePattern service |
| Multicast Registry | No proto definition | Pattern logic not in protocol | Complete proto with 3-slot schema |
| Capability Discovery | Runtime try/catch | Clients don't know features | GetCapabilities() RPC |
| Backend Slots | Implicit Go interfaces | Not validated or documented | Formal slot schema in proto |
| Graceful Degradation | Client-side fallbacks | Inconsistent error handling | Pattern-side emulation/errors |

Code Impact Analysis

Proto Changes:

  • Create: proto/prism/patterns/ directory (6 new files)
  • Modify: None (additive only)
  • Delete: None (deprecate in-place)

Go Pattern Changes (per pattern):

  • New: pattern_service.go (consolidated gRPC service impl)
  • New: capabilities.go (capability detection logic)
  • New: emulation.go (graceful degradation for missing features)
  • Modify: grpc_server.go (register new service alongside old)
  • Modify: *_test.go (add capability tests)

Rust Proxy Changes:

  • Modify: Client code to use new pattern services
  • New: Capability discovery at client initialization
  • Modify: Error handling for unsupported operations

Estimated Lines of Code:

  • Proto: ~2000 lines (5 pattern protos + slot schema, 6 files total)
  • Go: ~5000 lines (5 patterns × ~1000 lines each)
  • Rust: ~1500 lines (client adaptations)
  • Tests: ~3000 lines (integration tests for all patterns)
  • Total: ~11,500 lines

Detailed Implementation Plan

Phase 1: Proto Foundation (Week 1)

Goal: Create all proto definitions and validate they compile.

Tasks

1.1 Create Directory Structure

mkdir -p proto/prism/patterns/{keyvalue,multicast_registry,session_store,producer,consumer,schemas}

1.2 Write Proto Files

Priority order (shared slot schema first, then patterns from most to least complex):

  1. patterns/schemas/slot_schema.proto (foundation for others)
  2. patterns/keyvalue/keyvalue_pattern.proto (40% of LOC from RFC)
  3. patterns/multicast_registry/multicast_registry_pattern.proto (35% of LOC)
  4. patterns/session_store/session_store_pattern.proto (10% of LOC)
  5. patterns/producer/producer_pattern.proto (8% of LOC)
  6. patterns/consumer/consumer_pattern.proto (7% of LOC)

1.3 Generate Code

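# Add to proto/Makefile
# Recipe lines must start with a tab. find is used instead of **/*.proto
# because make's default shell (/bin/sh) does not expand ** recursively.
generate-patterns:
	protoc --go_out=. --go_opt=paths=source_relative \
		--go-grpc_out=. --go-grpc_opt=paths=source_relative \
		$$(find prism/patterns -name '*.proto')

# Run generation (shell command, from the proto/ directory)
make generate-patterns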

1.4 Validation

# Proto compilation
task proto-lint

# Generated code builds
cd pkg/plugin/gen/prism/patterns/keyvalue
go build .

# gRPC services have correct signatures
go doc KeyValuePatternServer

Success Criteria

  • All 6 proto files compile without errors
  • Generated Go code passes go vet and golangci-lint
  • All gRPC service interfaces exported
  • Documentation comments generated correctly

Estimated Effort

  • Proto writing: 3 days (use RFC-046 as template)
  • Code generation setup: 1 day
  • Validation and fixes: 1 day
  • Total: 5 days

Phase 2: KeyValue Pattern Migration (Weeks 2-3)

Goal: Implement consolidated KeyValuePattern service with full capability negotiation.

Tasks

2.1 Implement Pattern Service (patterns/keyvalue/pattern_service.go)

// patterns/keyvalue/pattern_service.go
package keyvalue

import (
	"context"
	"log"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/keyvalue"
)

// PatternService implements the consolidated KeyValuePattern gRPC service
type PatternService struct {
	pb.UnimplementedKeyValuePatternServer
	kv   *KeyValue        // Existing KeyValue implementation
	caps *pb.Capabilities // Cached capabilities
}

func NewPatternService(kv *KeyValue) (*PatternService, error) {
	caps, err := detectCapabilities(kv)
	if err != nil {
		return nil, err
	}

	return &PatternService{
		kv:   kv,
		caps: caps,
	}, nil
}

// GetCapabilities returns pattern capabilities (cached at startup)
func (s *PatternService) GetCapabilities(ctx context.Context, req *pb.GetCapabilitiesRequest) (*pb.Capabilities, error) {
	return s.caps, nil
}

// Store is the semantic key-value set operation
func (s *PatternService) Store(ctx context.Context, req *pb.StoreRequest) (*pb.StoreResponse, error) {
	// Reject unsupported inline expiration before writing, so a failed
	// request never leaves a value behind without its TTL.
	if req.ExpirationSeconds != nil && !s.caps.SupportsExpiration {
		return &pb.StoreResponse{
			Success: false,
			Error:   proto.String("expiration not supported by backend"),
		}, nil
	}

	// Delegate to existing KeyValue.Set() implementation
	if err := s.kv.Set(ctx, req.Key, req.Value); err != nil {
		return &pb.StoreResponse{Success: false, Error: proto.String(err.Error())}, nil
	}

	// Apply inline expiration if requested (not atomic with the Set above)
	if req.ExpirationSeconds != nil {
		if err := s.kv.Expire(ctx, req.Key, *req.ExpirationSeconds); err != nil {
			return &pb.StoreResponse{Success: false, Error: proto.String(err.Error())}, nil
		}
	}

	return &pb.StoreResponse{Success: true}, nil
}

// Retrieve is the semantic key-value get operation
func (s *PatternService) Retrieve(ctx context.Context, req *pb.RetrieveRequest) (*pb.RetrieveResponse, error) {
	value, found, err := s.kv.Get(ctx, req.Key)
	if err != nil {
		return &pb.RetrieveResponse{Found: false, Error: proto.String(err.Error())}, nil
	}
	if !found {
		return &pb.RetrieveResponse{Found: false}, nil
	}
	return &pb.RetrieveResponse{Found: true, Value: value}, nil
}

// Scan with graceful degradation
func (s *PatternService) Scan(req *pb.ScanRequest, stream pb.KeyValuePattern_ScanServer) error {
	if !s.caps.SupportsScan {
		return status.Error(codes.Unimplemented, "scan not supported: backend does not implement KeyValueScanInterface")
	}

	// Check performance characteristics and warn if emulated
	if s.caps.ScanPerformance == pb.ScanPerformance_SCAN_EMULATED {
		// Log warning but proceed
		log.Printf("WARN: scan is emulated, performance may be degraded")
	}

	// Delegate to backend scan implementation
	return s.kv.Scan(req, stream)
}

// ... implement remaining operations

2.2 Implement Capability Detection (patterns/keyvalue/capabilities.go)

// patterns/keyvalue/capabilities.go
package keyvalue

import (
pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/keyvalue"
)

func detectCapabilities(kv *KeyValue) (*pb.Capabilities, error) {
caps := &pb.Capabilities{
Slots: &pb.SlotConfiguration{},
}

// Detect backend type
backendType := kv.GetBackendType()
caps.Slots.PrimaryBackendType = backendType

// Detect interfaces implemented by backend
interfaces := kv.GetImplementedInterfaces()
caps.Slots.PrimaryInterfaces = interfaces

// Feature detection based on interfaces
caps.SupportsBatch = contains(interfaces, "KeyValueBatchInterface")
caps.SupportsScan = contains(interfaces, "KeyValueScanInterface")
caps.SupportsExpiration = contains(interfaces, "KeyValueTTLInterface")
caps.SupportsTransactions = contains(interfaces, "KeyValueTransactionalInterface")

// Performance characteristics based on backend type
caps.ScanPerformance = detectScanPerformance(backendType)
caps.BatchPerformance = detectBatchPerformance(backendType)

// Limitations based on backend
caps.MaxKeySize = getMaxKeySize(backendType)
caps.MaxValueSize = getMaxValueSize(backendType)

return caps, nil
}

func detectScanPerformance(backendType string) pb.ScanPerformance {
switch backendType {
case "redis", "postgres", "memstore":
return pb.ScanPerformance_SCAN_NATIVE
case "dynamodb":
return pb.ScanPerformance_SCAN_EMULATED
case "s3", "minio":
return pb.ScanPerformance_SCAN_UNAVAILABLE
default:
return pb.ScanPerformance_SCAN_UNAVAILABLE
}
}

func detectBatchPerformance(backendType string) pb.BatchPerformance {
switch backendType {
case "redis", "postgres", "dynamodb":
return pb.BatchPerformance_BATCH_NATIVE
case "memstore":
return pb.BatchPerformance_BATCH_PIPELINE
case "s3", "minio":
return pb.BatchPerformance_BATCH_UNAVAILABLE
default:
return pb.BatchPerformance_BATCH_UNAVAILABLE
}
}

func getMaxKeySize(backendType string) *int64 {
limits := map[string]int64{
"redis": 512 * 1024 * 1024, // 512MB
"postgres": 8192, // 8KB
"memstore": 1024 * 1024, // 1MB
"dynamodb": 2048, // 2KB
"s3": 1024, // 1KB
}
if limit, ok := limits[backendType]; ok {
return &limit
}
return nil
}
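
detectCapabilities calls a contains helper that the memo does not show (getMaxValueSize is likewise not shown and presumably mirrors getMaxKeySize). A minimal sketch of contains follows, assuming GetImplementedInterfaces() returns a []string of interface names; that return type is an assumption, not something RFC-046 confirms.

```go
package main

import "fmt"

// contains reports whether target appears in items.
// Assumption: interface names are compared as exact, case-sensitive strings.
func contains(items []string, target string) bool {
	for _, item := range items {
		if item == target {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical interface list for a backend with scan but no TTL support.
	interfaces := []string{"KeyValueBasicInterface", "KeyValueScanInterface"}

	fmt.Println(contains(interfaces, "KeyValueScanInterface")) // true
	fmt.Println(contains(interfaces, "KeyValueTTLInterface"))  // false
}
```

On Go 1.21 or later, slices.Contains from the standard library does the same job and would make the helper unnecessary.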

2.3 Implement Graceful Degradation (patterns/keyvalue/emulation.go)

// patterns/keyvalue/emulation.go
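package keyvalue

import (
	"context"
	"sync"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/keyvalue"
)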

// Emulate scan for backends that support Query but not Scan (e.g., DynamoDB)
func (s *PatternService) emulateScan(req *pb.ScanRequest, stream pb.KeyValuePattern_ScanServer) error {
// DynamoDB example: use Query with pagination
var cursor *string
if req.Cursor != nil && *req.Cursor != "" {
cursor = req.Cursor
}

limit := int32(100)
if req.Limit != nil {
limit = *req.Limit
}

// Query with pagination
keys, values, nextCursor, hasMore, err := s.kv.QueryWithPagination(
stream.Context(),
req.Prefix,
cursor,
limit,
req.IncludeValues != nil && *req.IncludeValues,
)
if err != nil {
return err
}

// Send response
response := &pb.ScanResponse{
Keys: keys,
Values: values,
HasMore: hasMore,
}
if nextCursor != nil {
response.NextCursor = nextCursor
}

return stream.Send(response)
}

// Emulate batch operations by pipelining individual operations
func (s *PatternService) emulateBatchSet(ctx context.Context, req *pb.StoreBatchRequest) (*pb.StoreBatchResponse, error) {
results := make([]*pb.StoreResponse, len(req.Requests))
successCount := int32(0)

// Execute in parallel with bounded concurrency
sem := make(chan struct{}, 10) // Max 10 concurrent operations
var wg sync.WaitGroup
var mu sync.Mutex

for i, storeReq := range req.Requests {
wg.Add(1)
sem <- struct{}{}

go func(idx int, r *pb.StoreRequest) {
defer wg.Done()
defer func() { <-sem }()

resp, err := s.Store(ctx, r)

mu.Lock()
results[idx] = resp
if err == nil && resp.Success {
successCount++
}
mu.Unlock()
}(i, storeReq)
}

wg.Wait()

return &pb.StoreBatchResponse{
Results: results,
AllSuccess: successCount == int32(len(req.Requests)),
SuccessCount: successCount,
}, nil
}
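
emulateScan sends a single page per call, so whoever wants the full key set has to keep following the cursor until HasMore is false. The loop can be sketched as follows. The page type and fetchFunc are hypothetical stand-ins for ScanResponse and QueryWithPagination, and fakeBackend exists only to make the example runnable.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// page is a hypothetical stand-in for one ScanResponse.
type page struct {
	keys       []string
	nextCursor string
	hasMore    bool
}

// fetchFunc is a hypothetical stand-in for QueryWithPagination.
type fetchFunc func(cursor string, limit int) (page, error)

// scanAll follows cursors until the backend reports no more pages.
func scanAll(fetch fetchFunc, limit int) ([]string, error) {
	if limit <= 0 {
		return nil, errors.New("limit must be positive")
	}
	var all []string
	cursor := ""
	for {
		p, err := fetch(cursor, limit)
		if err != nil {
			return nil, err
		}
		all = append(all, p.keys...)
		if !p.hasMore {
			return all, nil
		}
		cursor = p.nextCursor
	}
}

// fakeBackend pages over an in-memory slice. The cursor is the next
// start index encoded as a decimal string (an assumption of this sketch).
func fakeBackend(data []string) fetchFunc {
	return func(cursor string, limit int) (page, error) {
		start := 0
		if cursor != "" {
			n, err := strconv.Atoi(cursor)
			if err != nil || n < 0 || n > len(data) {
				return page{}, errors.New("invalid cursor")
			}
			start = n
		}
		end := start + limit
		if end > len(data) {
			end = len(data)
		}
		return page{
			keys:       data[start:end],
			nextCursor: strconv.Itoa(end),
			hasMore:    end < len(data),
		}, nil
	}
}

func main() {
	keys, err := scanAll(fakeBackend([]string{"a", "b", "c", "d", "e"}), 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(keys) // [a b c d e]
}
```

Whether this loop belongs in the pattern service (streaming every page) or in the client (re-issuing Scan with the returned cursor) is a design choice the memo leaves open.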

2.4 Update gRPC Server Registration

// patterns/keyvalue/grpc_server.go
func NewGRPCServer(kv *KeyValue, port int) (*GRPCServer, error) {
// ... existing setup ...

// BACKWARD COMPATIBILITY: Register old services
basicService := &KeyValueBasicService{kv: kv}
pb_kv.RegisterKeyValueBasicInterfaceServer(grpcServer, basicService)

ttlService := &KeyValueTTLService{kv: kv}
pb_kv.RegisterKeyValueTTLInterfaceServer(grpcServer, ttlService)

if kv.SupportsScan() {
scanService := &KeyValueScanService{kv: kv}
pb_kv.RegisterKeyValueScanInterfaceServer(grpcServer, scanService)
}

// NEW: Register consolidated pattern service
patternService, err := NewPatternService(kv)
if err != nil {
return nil, fmt.Errorf("failed to create pattern service: %w", err)
}
pb_pattern.RegisterKeyValuePatternServer(grpcServer, patternService)

log.Printf("[KeyValue gRPC] ✅ Registered KeyValuePattern service (NEW)")
log.Printf("[KeyValue gRPC] ⚠️ Old services still available for backward compatibility")

return server, nil
}

2.5 Write Tests

// patterns/keyvalue/pattern_service_test.go
func TestGetCapabilities_Redis(t *testing.T) {
kv := setupRedisKeyValue(t)
service, err := NewPatternService(kv)
require.NoError(t, err)

caps, err := service.GetCapabilities(context.Background(), &pb.GetCapabilitiesRequest{})
require.NoError(t, err)

assert.True(t, caps.SupportsBatch)
assert.True(t, caps.SupportsScan)
assert.True(t, caps.SupportsExpiration)
assert.Equal(t, pb.ScanPerformance_SCAN_NATIVE, caps.ScanPerformance)
assert.Equal(t, "redis", caps.Slots.PrimaryBackendType)
}

func TestGetCapabilities_S3(t *testing.T) {
kv := setupS3KeyValue(t)
service, err := NewPatternService(kv)
require.NoError(t, err)

caps, err := service.GetCapabilities(context.Background(), &pb.GetCapabilitiesRequest{})
require.NoError(t, err)

assert.False(t, caps.SupportsBatch)
assert.False(t, caps.SupportsScan)
assert.False(t, caps.SupportsExpiration)
assert.Equal(t, pb.ScanPerformance_SCAN_UNAVAILABLE, caps.ScanPerformance)
assert.Equal(t, "s3", caps.Slots.PrimaryBackendType)
}

func TestScan_UnsupportedBackend(t *testing.T) {
kv := setupS3KeyValue(t) // S3 doesn't support scan
service, err := NewPatternService(kv)
require.NoError(t, err)

stream := &mockScanStream{}
err = service.Scan(&pb.ScanRequest{Prefix: proto.String("test:")}, stream)

assert.Error(t, err)
assert.Contains(t, err.Error(), "scan not supported")
assert.Equal(t, codes.Unimplemented, status.Code(err))
}

func TestStore_WithInlineExpiration_Supported(t *testing.T) {
kv := setupRedisKeyValue(t)
service, err := NewPatternService(kv)
require.NoError(t, err)

resp, err := service.Store(context.Background(), &pb.StoreRequest{
Key: "test:key",
Value: []byte("value"),
ExpirationSeconds: proto.Int64(60),
})

require.NoError(t, err)
assert.True(t, resp.Success)

// Verify expiration was set
ttl, err := kv.GetTTL(context.Background(), "test:key")
require.NoError(t, err)
assert.Greater(t, ttl, int64(50))
assert.LessOrEqual(t, ttl, int64(60))
}

func TestStore_WithInlineExpiration_Unsupported(t *testing.T) {
kv := setupPostgresKeyValue(t) // Postgres doesn't support TTL natively
service, err := NewPatternService(kv)
require.NoError(t, err)

resp, err := service.Store(context.Background(), &pb.StoreRequest{
Key: "test:key",
Value: []byte("value"),
ExpirationSeconds: proto.Int64(60),
})

require.NoError(t, err)
assert.False(t, resp.Success)
assert.Contains(t, *resp.Error, "expiration not supported")
}

Success Criteria

  • KeyValuePattern service fully implemented
  • GetCapabilities() correctly detects Redis, Postgres, MemStore, S3
  • Scan returns proper error for S3
  • Scan emulation works for DynamoDB (if implemented)
  • Batch emulation works for MemStore
  • All 20+ unit tests pass
  • Integration tests pass with 4 different backends
  • Backward compatibility maintained (old services still work)

Estimated Effort

  • Implementation: 5 days
  • Testing: 3 days
  • Integration validation: 2 days
  • Total: 10 days (2 weeks)

Phase 3: Multicast Registry Pattern Migration (Weeks 4-5)

Goal: Create proto definition for Multicast Registry (currently has none) and implement consolidated service.

Tasks

3.1 Implement Pattern Service (patterns/multicast_registry/pattern_service.go)

// patterns/multicast_registry/pattern_service.go
package multicast_registry

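import (
	"context"
	"time"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/multicast_registry"
)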

type PatternService struct {
pb.UnimplementedMulticastRegistryPatternServer
coordinator *Coordinator // Existing implementation
caps *pb.Capabilities
}

func NewPatternService(coordinator *Coordinator) (*PatternService, error) {
caps, err := detectCapabilities(coordinator)
if err != nil {
return nil, err
}

return &PatternService{
coordinator: coordinator,
caps: caps,
}, nil
}

func (s *PatternService) GetCapabilities(ctx context.Context, req *pb.GetCapabilitiesRequest) (*pb.Capabilities, error) {
return s.caps, nil
}

func (s *PatternService) Register(ctx context.Context, req *pb.RegisterRequest) (*pb.RegisterResponse, error) {
// Convert protobuf.Struct to map[string]interface{}
metadata := req.Metadata.AsMap()

// Convert duration to time.Duration
ttl := time.Duration(0)
if req.Ttl != nil {
ttl = req.Ttl.AsDuration()
}

// Delegate to coordinator
err := s.coordinator.Register(ctx, req.Identity, metadata, ttl)
if err != nil {
return &pb.RegisterResponse{Success: false, Error: proto.String(err.Error())}, nil
}

return &pb.RegisterResponse{Success: true, Identity: req.Identity}, nil
}

func (s *PatternService) Enumerate(ctx context.Context, req *pb.EnumerateRequest) (*pb.EnumerateResponse, error) {
// Convert proto filter to internal filter format
filter := convertFilter(req.Filter)

// Delegate to coordinator
identities, err := s.coordinator.Enumerate(ctx, filter)
if err != nil {
return nil, err
}

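	// Convert to proto format
	pbIdentities := make([]*pb.Identity, len(identities))
	for i, id := range identities {
		// structpb.NewStruct converts a map[string]interface{} and fails
		// on value types that protobuf Struct cannot represent.
		metadata, err := structpb.NewStruct(id.Metadata)
		if err != nil {
			return nil, err
		}
		pbIdentities[i] = &pb.Identity{
			Identity:     id.ID,
			Metadata:     metadata,
			Tags:         id.Tags,
			RegisteredAt: id.RegisteredAt.Unix(),
		}
		if id.ExpiresAt != nil {
			pbIdentities[i].ExpiresAt = proto.Int64(id.ExpiresAt.Unix())
		}
	}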

return &pb.EnumerateResponse{
Identities: pbIdentities,
TotalCount: int32(len(identities)),
HasMore: false,
}, nil
}

func (s *PatternService) Multicast(ctx context.Context, req *pb.MulticastRequest) (*pb.MulticastResponse, error) {
filter := convertFilter(req.Filter)

// Set timeout if specified
if req.TimeoutMs != nil {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(*req.TimeoutMs)*time.Millisecond)
defer cancel()
}

// Delegate to coordinator
resp, err := s.coordinator.Multicast(ctx, filter, req.Payload)
if err != nil {
return &pb.MulticastResponse{Success: false, Error: proto.String(err.Error())}, nil
}

// Convert results
pbResults := make([]*pb.DeliveryResult, len(resp.Results))
for i, r := range resp.Results {
pbResults[i] = &pb.DeliveryResult{
Identity: r.Identity,
Status: convertDeliveryStatus(r.Status),
LatencyMs: int32(r.Latency.Milliseconds()),
}
if r.Error != nil {
pbResults[i].Error = proto.String(r.Error.Error())
}
}

return &pb.MulticastResponse{
Success: resp.DeliveredCount == resp.TargetCount,
TargetCount: int32(resp.TargetCount),
DeliveredCount: int32(resp.DeliveredCount),
FailedCount: int32(resp.FailedCount),
Results: pbResults,
}, nil
}

3.2 Implement Capability Detection

// patterns/multicast_registry/capabilities.go
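package multicast_registry

import (
	"google.golang.org/protobuf/proto"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/multicast_registry"
)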

func detectCapabilities(coord *Coordinator) (*pb.Capabilities, error) {
caps := &pb.Capabilities{
Slots: &pb.SlotConfiguration{},
}

// Detect registry backend capabilities
registryType := coord.registry.GetBackendType()
registryInterfaces := coord.registry.GetImplementedInterfaces()

caps.Slots.RegistryBackendType = registryType
caps.Slots.RegistryInterfaces = registryInterfaces

// Detect messaging backend capabilities
messagingType := coord.messaging.GetBackendType()
messagingInterfaces := coord.messaging.GetImplementedInterfaces()

caps.Slots.MessagingBackendType = messagingType
caps.Slots.MessagingInterfaces = messagingInterfaces

// Detect optional durability backend
if coord.durability != nil {
durabilityType := coord.durability.GetBackendType()
caps.Slots.DurabilityBackendType = proto.String(durabilityType)
caps.SupportsDurability = true
} else {
caps.SupportsDurability = false
}

// Detect filtering capabilities
caps.SupportsFiltering = true // Always true (client-side fallback)
caps.FilterPerformance = detectFilterPerformance(registryType)

// Detect streaming support
caps.SupportsStreaming = contains(messagingInterfaces, "PubSubBasicInterface")

// Detect wildcard filter support
caps.SupportsWildcardFilters = detectWildcardSupport(registryType)

// Detect multicast performance characteristics
caps.MulticastPerformance = detectMulticastPerformance(
messagingType,
caps.SupportsDurability,
)

// Limitations
if coord.config.MaxIdentities > 0 {
caps.MaxIdentities = proto.Int32(int32(coord.config.MaxIdentities))
}

return caps, nil
}

func detectFilterPerformance(registryType string) pb.FilterPerformance {
switch registryType {
case "redis":
// Redis can use Lua for server-side filtering
return pb.FilterPerformance_FILTER_NATIVE
case "postgres":
// Postgres can use WHERE clauses
return pb.FilterPerformance_FILTER_NATIVE
case "memstore":
// MemStore filters in Go
return pb.FilterPerformance_FILTER_CLIENT_SIDE
default:
return pb.FilterPerformance_FILTER_CLIENT_SIDE
}
}

func detectMulticastPerformance(messagingType string, hasDurability bool) pb.MulticastPerformance {
if hasDurability {
return pb.MulticastPerformance_MULTICAST_QUEUED
}

switch messagingType {
case "redis-pubsub":
return pb.MulticastPerformance_MULTICAST_NATIVE
case "nats", "kafka":
return pb.MulticastPerformance_MULTICAST_FANOUT
default:
return pb.MulticastPerformance_MULTICAST_FANOUT
}
}

3.3 Write Tests

Focus on:

  • Capability detection for different 3-slot combinations
  • Filter performance (native vs client-side)
  • Multicast with different messaging backends
  • Graceful degradation when durability slot missing

Success Criteria

  • Proto definition complete (currently doesn't exist)
  • Pattern service implements all RPCs
  • Capability detection covers 6+ slot combinations
  • Filter performance correctly identified
  • Integration tests with Redis+NATS
  • Integration tests with Postgres+Kafka
  • Tests verify durability slot optional behavior

Estimated Effort

  • Proto writing: 2 days (large proto, ~600 lines)
  • Implementation: 4 days
  • Testing: 3 days
  • Integration validation: 1 day
  • Total: 10 days (2 weeks)

Phase 4: Client Integration Testing (Week 6)

Goal: Update Rust proxy to use new pattern services and validate end-to-end.

Tasks

4.1 Update Rust Client Code

Before:

// clients/rust/prism-client/src/keyvalue.rs
use prism::interfaces::keyvalue::{
KeyValueBasicInterface, KeyValueScanInterface, KeyValueTTLInterface
};

pub struct KeyValueClient {
basic_client: KeyValueBasicInterfaceClient<Channel>,
scan_client: Option<KeyValueScanInterfaceClient<Channel>>,
ttl_client: KeyValueTTLInterfaceClient<Channel>,
}

After:

// clients/rust/prism-client/src/keyvalue.rs
use prism::patterns::keyvalue::{KeyValuePatternClient, Capabilities};

pub struct KeyValueClient {
client: KeyValuePatternClient<Channel>,
capabilities: Capabilities,
}

impl KeyValueClient {
pub async fn new(addr: String) -> Result<Self, Error> {
let channel = Channel::from_shared(addr)?.connect().await?;
let mut client = KeyValuePatternClient::new(channel);

// Discover capabilities at initialization
let capabilities = client
.get_capabilities(GetCapabilitiesRequest {})
.await?
.into_inner();

info!(
"KeyValue backend: {} (scan: {}, expiration: {})",
capabilities.slots.as_ref().unwrap().primary_backend_type,
capabilities.supports_scan,
capabilities.supports_expiration
);

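        Ok(Self { client, capabilities })
    }

    /// Returns the capabilities discovered at initialization
    /// (used by the integration tests via `client.capabilities()`).
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }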

pub async fn store(&mut self, key: String, value: Vec<u8>) -> Result<(), Error> {
let response = self
.client
.store(StoreRequest {
key,
value,
expiration_seconds: None,
tags: None,
})
.await?
.into_inner();

if !response.success {
return Err(Error::Store(response.error.unwrap_or_default()));
}

Ok(())
}

pub async fn scan(&mut self, prefix: String) -> Result<Vec<String>, Error> {
// Check capability before calling
if !self.capabilities.supports_scan {
return Err(Error::UnsupportedOperation(
"Scan not supported by backend".to_string()
));
}

// Warn if emulated
if self.capabilities.scan_performance == ScanPerformance::ScanEmulated as i32 {
warn!("Scan is emulated, performance may be degraded");
}

let mut stream = self
.client
.scan(ScanRequest {
prefix: Some(prefix),
cursor: None,
limit: Some(100),
include_values: Some(false),
})
.await?
.into_inner();

let mut keys = Vec::new();
while let Some(response) = stream.message().await? {
keys.extend(response.keys);
if !response.has_more {
break;
}
}

Ok(keys)
}
}

4.2 Update Integration Tests

// clients/rust/prism-client/tests/integration_test.rs

#[tokio::test]
async fn test_capability_discovery() {
let client = KeyValueClient::new("http://localhost:50051".to_string())
.await
.unwrap();

let caps = client.capabilities();

// Verify capabilities detected
assert_eq!(caps.slots.as_ref().unwrap().primary_backend_type, "redis");
assert!(caps.supports_batch);
assert!(caps.supports_scan);
assert_eq!(caps.scan_performance, ScanPerformance::ScanNative as i32);
}

#[tokio::test]
async fn test_unsupported_operation_graceful() {
// Start KeyValue pattern with S3 backend (no scan support)
let _pattern = start_keyvalue_with_s3().await;

let mut client = KeyValueClient::new("http://localhost:50052".to_string())
.await
.unwrap();

// Verify capabilities indicate no scan support
assert!(!client.capabilities().supports_scan);

// Attempt scan should return clear error
let result = client.scan("test:".to_string()).await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("not supported by backend"));
}

#[tokio::test]
async fn test_inline_expiration() {
let mut client = KeyValueClient::new("http://localhost:50051".to_string())
.await
.unwrap();

// Verify expiration supported
assert!(client.capabilities().supports_expiration);

// Store with expiration
client
.client
.store(StoreRequest {
key: "expire:key".to_string(),
value: b"value".to_vec(),
expiration_seconds: Some(60),
tags: None,
})
.await
.unwrap();

// Verify key exists
let resp = client.retrieve("expire:key".to_string()).await.unwrap();
assert!(resp.found);

// Verify expiration set
let ttl_resp = client
.client
.get_expiration(GetExpirationRequest {
key: "expire:key".to_string(),
})
.await
.unwrap()
.into_inner();

assert!(ttl_resp.expiration_seconds > 50);
assert!(ttl_resp.expiration_seconds <= 60);
}

4.3 Matrix Testing

Test all backend combinations:

#!/bin/bash
# Test matrix script (the shebang must be the first line of the file)

backends=(
  "redis:6379"
  "postgres:5432"
  "memstore:internal"
)

for backend in "${backends[@]}"; do
  echo "Testing KeyValue with $backend"

  # Start pattern with specific backend
  ./start_keyvalue_pattern.sh "$backend"

  # Run integration tests
  cargo test --test integration_test -- --test-threads=1

  # Verify capabilities (strip the ":port" suffix to get the backend name)
  cargo test "test_capability_discovery_${backend%%:*}"

  # Cleanup
  ./stop_keyvalue_pattern.sh
done

Success Criteria

  • Rust client uses single KeyValuePatternClient
  • Capability discovery integrated into client initialization
  • Integration tests pass with Redis
  • Integration tests pass with Postgres
  • Integration tests pass with MemStore
  • Graceful error handling for unsupported operations
  • Matrix testing covers 3 backends × 5 operations = 15 scenarios

Estimated Effort

  • Rust client updates: 2 days
  • Integration test writing: 2 days
  • Matrix testing setup: 1 day
  • Total: 5 days (1 week)

Phase 5: Producer/Consumer Migration (Week 7)

Goal: Implement pattern services for Producer and Consumer patterns.

Approach

Similar to KeyValue but simpler (fewer operations):

  • Write proto (from RFC-046 template)
  • Implement pattern services
  • Detect capabilities (ordering, batching, exactly-once)
  • Write tests
  • Update clients

Success Criteria

  • Proto definitions complete
  • Pattern services implemented
  • Capability detection for Kafka vs NATS
  • Integration tests pass

Estimated Effort

  • Total: 5 days (1 week); simpler than KeyValue and Multicast Registry

Phase 6: Documentation and Deprecation (Week 8)

Goal: Document new architecture, deprecate old services, provide migration guide.

Tasks

6.1 Update RFC Documents

  • Update RFC-014 (Layered Patterns) to reference RFC-046
  • Update RFC-025 (Pattern SDK) to reference consolidated protocols
  • Mark backend interface RFCs as "implementation detail, not client-facing"

6.2 Write Migration Guide

Create a comprehensive migration guide document with the following sections:

Title: Migration Guide: Backend Interfaces to Pattern Protocols

Overview Section: Explain that Prism patterns now expose consolidated protocol services instead of multiple backend-level interfaces.

KeyValue Pattern Migration Section:

Before (Deprecated) - Show old approach:

  • Import multiple interface stubs (KeyValueBasicInterfaceStub, KeyValueScanInterfaceStub)
  • Create separate clients for each interface
  • Use try/catch for optional operations

After (Recommended) - Show new approach:

  • Import single KeyValuePatternStub
  • Call GetCapabilities() during initialization
  • Use semantic operations (Store instead of Set)
  • Check capabilities before optional operations

Migration Checklist:

  • Replace backend interface imports with pattern imports
  • Add GetCapabilities() call during initialization
  • Replace backend operation names with semantic names
  • Add capability checks before optional operations
  • Test with multiple backends
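
The "After" flow for the migration guide could be illustrated along these lines. This is a sketch only: PatternClient, Capabilities, and fakeS3 are hypothetical stand-ins for the generated KeyValuePattern stub and its messages, trimmed so the example runs without gRPC.

```go
package main

import (
	"errors"
	"fmt"
)

// Capabilities is a trimmed, hypothetical mirror of the proto message.
type Capabilities struct {
	SupportsScan       bool
	SupportsExpiration bool
}

// PatternClient is a hypothetical stand-in for the generated pattern stub.
type PatternClient interface {
	GetCapabilities() (Capabilities, error)
	Store(key string, value []byte) error
	Scan(prefix string) ([]string, error)
}

// ErrUnsupported is returned when the backend lacks an optional feature.
var ErrUnsupported = errors.New("operation not supported by backend")

// KV wraps the single pattern client and caches discovered capabilities.
type KV struct {
	client PatternClient
	caps   Capabilities
}

// NewKV calls GetCapabilities once during initialization.
func NewKV(c PatternClient) (*KV, error) {
	caps, err := c.GetCapabilities()
	if err != nil {
		return nil, fmt.Errorf("capability discovery: %w", err)
	}
	return &KV{client: c, caps: caps}, nil
}

// Scan checks the cached capability before issuing the optional operation,
// replacing the old try/catch approach.
func (k *KV) Scan(prefix string) ([]string, error) {
	if !k.caps.SupportsScan {
		return nil, ErrUnsupported
	}
	return k.client.Scan(prefix)
}

// fakeS3 simulates a backend without scan or expiration support.
type fakeS3 struct{}

func (fakeS3) GetCapabilities() (Capabilities, error) { return Capabilities{}, nil }
func (fakeS3) Store(string, []byte) error             { return nil }
func (fakeS3) Scan(string) ([]string, error) {
	return nil, errors.New("backend should not have been called")
}

func main() {
	kv, err := NewKV(fakeS3{})
	if err != nil {
		panic(err)
	}
	_, err = kv.Scan("test:")
	fmt.Println(errors.Is(err, ErrUnsupported)) // true
}
```

Checking the cached capability locally avoids a network round trip for an operation the backend is already known not to support.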

6.3 Add Deprecation Warnings

// patterns/keyvalue/grpc_server.go

// KeyValueBasicService serves the legacy KeyValueBasicInterface.
//
// Deprecated: Use KeyValuePattern instead. This service will be removed in v2.0.
type KeyValueBasicService struct {
pb_kv.UnimplementedKeyValueBasicInterfaceServer
kv *KeyValue
}

func (s *KeyValueBasicService) Set(ctx context.Context, req *pb_kv.SetRequest) (*pb_kv.SetResponse, error) {
// Emit deprecation warning
log.Warn("KeyValueBasicInterface.Set is deprecated, use KeyValuePattern.Store instead")

// Delegate to existing implementation
return s.kv.handleSet(ctx, req)
}
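
Logging a warning on every Set call may flood logs on hot paths. One option, which is not part of RFC-046, is to emit each deprecation message only once per process. The deprecationNotifier type and its emit callback below are hypothetical names for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// deprecationNotifier emits each distinct deprecation warning at most once.
type deprecationNotifier struct {
	mu   sync.Mutex
	seen map[string]bool
	emit func(msg string) // hypothetical sink; the service would pass its logger
}

func newDeprecationNotifier(emit func(msg string)) *deprecationNotifier {
	return &deprecationNotifier{seen: make(map[string]bool), emit: emit}
}

// Warn emits the warning for method the first time it is seen and
// reports whether a message was emitted.
func (d *deprecationNotifier) Warn(method, replacement string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[method] {
		return false
	}
	d.seen[method] = true
	d.emit(fmt.Sprintf("%s is deprecated, use %s instead", method, replacement))
	return true
}

func main() {
	count := 0
	notifier := newDeprecationNotifier(func(msg string) {
		count++
		fmt.Println(msg)
	})

	// Three calls to the same deprecated method produce one warning.
	for i := 0; i < 3; i++ {
		notifier.Warn("KeyValueBasicInterface.Set", "KeyValuePattern.Store")
	}
	fmt.Println("warnings emitted:", count) // warnings emitted: 1
}
```

If per-call visibility is still needed for migration tracking, a counter metric is usually a better fit than a log line.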

6.4 Update API Documentation

Generate updated docs:

# Generate proto docs
task proto-docs

# Generate Go package docs
go doc ./patterns/keyvalue

# Generate Rust client docs
cargo doc --no-deps

Success Criteria

  • Migration guide complete with code examples
  • RFC documents updated
  • Deprecation warnings in place
  • API documentation generated
  • Changelog updated

Estimated Effort

  • Documentation writing: 3 days
  • Deprecation warnings: 1 day
  • Doc generation: 1 day
  • Total: 5 days (1 week)

Phase 7: Cleanup (Weeks 9-10)

Goal: Remove deprecated code after migration period.

Tasks

7.1 Remove Old Service Registrations

// patterns/keyvalue/grpc_server.go
func NewGRPCServer(kv *KeyValue, port int) (*GRPCServer, error) {
// ... existing setup ...

// OLD (REMOVED):
// basicService := &KeyValueBasicService{kv: kv}
// pb_kv.RegisterKeyValueBasicInterfaceServer(grpcServer, basicService)

// NEW (ONLY):
patternService, err := NewPatternService(kv)
if err != nil {
return nil, fmt.Errorf("failed to create pattern service: %w", err)
}
pb_pattern.RegisterKeyValuePatternServer(grpcServer, patternService)

log.Printf("[KeyValue gRPC] ✅ Registered KeyValuePattern service")

return server, nil
}

7.2 Move Deprecated Protos

mkdir -p proto/deprecated
git mv proto/prism/interfaces/keyvalue/keyvalue_basic.proto proto/deprecated/
# Update imports in any internal code still using them

7.3 Performance Benchmarking

# Benchmark before cleanup (repeat runs so benchstat can estimate variance)
go test -bench=. -benchmem -count=10 ./patterns/keyvalue > before.txt

# Benchmark after cleanup
go test -bench=. -benchmem -count=10 ./patterns/keyvalue > after.txt

# Compare
benchstat before.txt after.txt

Expected result: Performance within 5% of the pre-cleanup baseline. Capabilities are detected once at startup and cached, so detection should add no measurable per-request overhead.
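
The 5% budget can be checked mechanically once the before and after timings are known. The sketch below treats only slowdowns as regressions; the function names are hypothetical and the ns/op figures are made-up illustrations, not measured results.

```go
package main

import "fmt"

// regressionPercent returns how much slower after is than before, in percent.
// Negative values mean the code got faster.
func regressionPercent(beforeNsPerOp, afterNsPerOp float64) float64 {
	return (afterNsPerOp - beforeNsPerOp) / beforeNsPerOp * 100
}

// withinBudget reports whether the slowdown stays at or under budgetPercent.
func withinBudget(beforeNsPerOp, afterNsPerOp, budgetPercent float64) bool {
	return regressionPercent(beforeNsPerOp, afterNsPerOp) <= budgetPercent
}

func main() {
	// Illustrative numbers only.
	fmt.Println(withinBudget(1000, 1040, 5)) // true  (about 4% slower)
	fmt.Println(withinBudget(1000, 1060, 5)) // false (about 6% slower)
	fmt.Println(withinBudget(1000, 900, 5))  // true  (faster than baseline)
}
```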

7.4 Binary Size Analysis

# Before
ls -lh bin/keyvalue-runner

# After
ls -lh bin/keyvalue-runner

# Verify size unchanged or reduced (removed old service code)

Success Criteria

  • All deprecated service registrations removed
  • Old proto files moved to proto/deprecated/
  • All tests pass without deprecated code
  • Performance benchmarks within 5% of baseline
  • Binary size unchanged or reduced

Estimated Effort

  • Code removal: 2 days
  • Testing and validation: 3 days
  • Performance benchmarking: 2 days
  • Documentation cleanup: 1 day
  • Final review: 2 days
  • Total: 10 days (2 weeks)

Risk Management

Risk Matrix

| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Breaking Client Changes | Medium | High | Maintain backward compatibility for 6 months |
| Performance Regression | Low | High | Benchmark every phase, capability detection is O(1) |
| Capability Detection Bugs | Medium | Medium | Extensive integration tests across all backends |
| Proto Breaking Changes | Low | Medium | Only additive changes, version protos carefully |
| Migration Fatigue | High | Low | Clear migration guide, automated tools |
| Incomplete Backend Coverage | Medium | Medium | Start with core patterns, expand gradually |

Rollback Plan

Each phase can be rolled back independently:

Phase 1-3 (Proto + Go patterns):

  • Keep old services running
  • If the new services have bugs, do not promote them to clients
  • Rollback: Remove new service registration, keep old

Phase 4 (Client updates):

  • Feature flag new client code
  • Rollback: Flip feature flag, clients use old services

Phase 6-7 (Deprecation/Cleanup):

  • Don't remove old code until 100% of clients migrated
  • Metrics: Track usage of old vs new services
  • Rollback: Re-enable old service registration
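The "track usage of old vs new services" metric could be collected by counting calls per service, keyed on the gRPC full method name (which has the form `/package.Service/Method`). The sketch below is an assumption about how that might look: `UsageTracker`, `serviceOf`, and the package-qualified service names are hypothetical, and in a real server `Record` would be called from an interceptor.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// serviceOf extracts "package.Service" from a gRPC full method string
// of the form "/package.Service/Method".
func serviceOf(fullMethod string) string {
	trimmed := strings.TrimPrefix(fullMethod, "/")
	if i := strings.LastIndex(trimmed, "/"); i >= 0 {
		return trimmed[:i]
	}
	return trimmed
}

// UsageTracker counts calls per service so that old and new traffic
// can be compared. Hypothetical type for illustration.
type UsageTracker struct {
	mu     sync.Mutex
	counts map[string]int
}

func NewUsageTracker() *UsageTracker {
	return &UsageTracker{counts: make(map[string]int)}
}

// Record increments the counter for the service that owns fullMethod.
func (t *UsageTracker) Record(fullMethod string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.counts[serviceOf(fullMethod)]++
}

// SafeToRemove reports whether none of the deprecated services
// received any calls during the observation window.
func (t *UsageTracker) SafeToRemove(deprecated ...string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, svc := range deprecated {
		if t.counts[svc] > 0 {
			return false
		}
	}
	return true
}

func main() {
	// Service names below are assumed, not taken from the real protos.
	const oldSvc = "prism.interfaces.keyvalue.KeyValueBasicInterface"
	tracker := NewUsageTracker()
	tracker.Record("/prism.patterns.KeyValuePattern/Store")
	fmt.Println(tracker.SafeToRemove(oldSvc)) // no old traffic seen yet
	tracker.Record("/" + oldSvc + "/Set")
	fmt.Println(tracker.SafeToRemove(oldSvc)) // an old client is still active
}
```

Gating Phase 7 on a check like `SafeToRemove` over a sufficiently long window keeps the "100% of clients migrated" condition measurable instead of assumed.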

Testing Strategy

Unit Testing

Each pattern must have:

  • Capability detection tests (per backend)
  • Operation tests (semantic operations)
  • Error handling tests (graceful degradation)
  • Emulation tests (if applicable)

Target: 85% coverage for pattern service code.
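A capability detection test per backend is naturally table-driven. The sketch below assumes a simplified model: `BackendFeatures`, `Capabilities`, and `Detect` are hypothetical names, and the policy (TTL can be emulated pattern-side, Scan cannot) is an illustrative assumption, not the rule set defined in RFC-046. Only the NATIVE / EMULATED / UNAVAILABLE levels come from this memo.

```go
package main

import "fmt"

// Support mirrors the NATIVE / EMULATED / UNAVAILABLE levels.
type Support int

const (
	Unavailable Support = iota
	Emulated
	Native
)

func (s Support) String() string {
	switch s {
	case Native:
		return "NATIVE"
	case Emulated:
		return "EMULATED"
	default:
		return "UNAVAILABLE"
	}
}

// BackendFeatures describes what a hypothetical backend offers natively.
type BackendFeatures struct {
	NativeTTL  bool
	NativeScan bool
}

// Capabilities is a simplified stand-in for a GetCapabilities() result.
type Capabilities struct {
	TTL  Support
	Scan Support
}

// Detect maps backend features to pattern capabilities.
// Assumed policy: TTL is emulated when missing; Scan is unavailable
// when missing because it cannot be emulated without key iteration.
func Detect(f BackendFeatures) Capabilities {
	c := Capabilities{TTL: Emulated, Scan: Unavailable}
	if f.NativeTTL {
		c.TTL = Native
	}
	if f.NativeScan {
		c.Scan = Native
	}
	return c
}

func main() {
	cases := []struct {
		name string
		in   BackendFeatures
		want Capabilities
	}{
		{"full backend", BackendFeatures{true, true}, Capabilities{Native, Native}},
		{"no TTL", BackendFeatures{false, true}, Capabilities{Emulated, Native}},
		{"no scan", BackendFeatures{true, false}, Capabilities{Native, Unavailable}},
	}
	for _, tc := range cases {
		got := Detect(tc.in)
		fmt.Printf("%s: ttl=%s scan=%s ok=%t\n", tc.name, got.TTL, got.Scan, got == tc.want)
	}
}
```

In a real test file the same table would live in a `TestDetect` function using `t.Run` per case, with one table per backend driver.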

Integration Testing

Matrix test every pattern × backend combination:

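| Pattern | Backends | Operations | Total Tests |
|---------|----------|------------|-------------|
| KeyValue | 4 (Redis, Postgres, MemStore, S3) | 10 | 40 |
| Multicast Registry | 6 (Redis+NATS, Postgres+Kafka, etc.) | 6 | 36 |
| Session Store | 3 (Redis, Postgres, DynamoDB) | 8 | 24 |
| Producer | 2 (Kafka, NATS) | 5 | 10 |
| Consumer | 2 (Kafka, NATS) | 6 | 12 |
| Total | | | 122 tests |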
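The total follows from multiplying backends by operations for each pattern and summing. The sketch below reproduces that arithmetic from the table; the `matrixRow` type and `totalTests` function are hypothetical names used only for illustration.

```go
package main

import "fmt"

// matrixRow holds one row of the integration test matrix.
type matrixRow struct {
	Pattern    string
	Backends   int
	Operations int
}

// matrix copies the backend and operation counts from the table above.
var matrix = []matrixRow{
	{"KeyValue", 4, 10},
	{"Multicast Registry", 6, 6},
	{"Session Store", 3, 8},
	{"Producer", 2, 5},
	{"Consumer", 2, 6},
}

// totalTests sums backends x operations across all patterns.
func totalTests(rows []matrixRow) int {
	total := 0
	for _, r := range rows {
		total += r.Backends * r.Operations
	}
	return total
}

func main() {
	for _, r := range matrix {
		fmt.Printf("%-20s %d x %d = %d\n", r.Pattern, r.Backends, r.Operations, r.Backends*r.Operations)
	}
	fmt.Println("total:", totalTests(matrix)) // total: 122
}
```

Keeping the matrix as data makes it straightforward to generate one subtest per pattern, backend, and operation, and to notice when a new backend changes the expected count.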

Performance Testing

Benchmark key operations:

  • Capability detection overhead: <1ms
  • Store operation: No regression
  • Scan operation: No regression
  • Batch operation: No regression

Backward Compatibility Testing

Verify old clients still work:

  • Keep one client using old services
  • Run full integration suite
  • Verify no errors or warnings (until deprecation phase)

Success Metrics

Quantitative

  • Protocol Consolidation: 100% of patterns expose a single gRPC service
  • Capability Discovery Adoption: 100% of clients call GetCapabilities()
  • Error Rate: Zero "method not found" errors in production
  • Performance: <5% overhead from new architecture
  • Test Coverage: 85% for pattern services, 122 integration tests passing
  • Migration Progress: Track % of clients migrated week-over-week

Qualitative

  • Developer Experience: Survey developers, expect "much easier" or "easier" ratings
  • Client Code Simplicity: Measure LOC reduction in client code (~50% expected)
  • Backend Flexibility: Demonstrate same client code works with 3+ backend combinations
  • Documentation Quality: Zero ambiguity in capability detection process

Timeline Summary

| Phase | Duration | Deliverables |
|-------|----------|--------------|
| 1. Proto Foundation | 1 week | 6 proto files, code generation |
| 2. KeyValue Migration | 2 weeks | Pattern service, capability detection, tests |
| 3. Multicast Registry | 2 weeks | Proto + service + tests |
| 4. Client Integration | 1 week | Rust proxy updates, matrix tests |
| 5. Producer/Consumer | 1 week | 2 pattern services + tests |
| 6. Documentation | 1 week | Migration guide, deprecation warnings |
| 7. Cleanup | 2 weeks | Remove old code, benchmarking |
| Total | 10 weeks | Consolidated pattern protocols |

Appendix: Code Review Checklist

Proto Reviews

  • All services have GetCapabilities() RPC
  • Capabilities message includes Slots configuration
  • Performance enums defined (NATIVE, EMULATED, UNAVAILABLE)
  • Operations have clear semantic names (Store not Set)
  • Request/response messages are well-documented
  • Proto follows style guide (no breaking changes)

Go Pattern Service Reviews

  • Pattern service delegates to existing implementation
  • Capability detection is comprehensive
  • Graceful degradation implemented for missing features
  • Error messages are clear and actionable
  • Unit tests cover all capability combinations
  • Integration tests pass with multiple backends
  • No performance regressions

Client Code Reviews

  • Client calls GetCapabilities() during initialization
  • Capability checks before optional operations
  • Clear fallback paths for unsupported features
  • Error handling is graceful
  • Integration tests cover all scenarios
  • Documentation updated
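The first three client checklist items (fetch capabilities at initialization, check before optional operations, provide a fallback) can be illustrated with a small sketch. It is written in Go for consistency with the pattern code, although the proxy itself is Rust. Every name here (`KeyValueClient`, `Session`, `StoreAll`, `fakeClient`, the `Batch` flag) is hypothetical and stands in for the generated client and the real Capabilities message.

```go
package main

import (
	"errors"
	"fmt"
)

// Capabilities is a simplified, assumed view of a GetCapabilities() response.
type Capabilities struct {
	Batch bool
}

// KeyValueClient is a hypothetical client interface.
type KeyValueClient interface {
	GetCapabilities() (Capabilities, error)
	Store(key, value string) error
	StoreBatch(items map[string]string) error
}

// Session caches capabilities fetched once during initialization.
type Session struct {
	client KeyValueClient
	caps   Capabilities
}

func NewSession(c KeyValueClient) (*Session, error) {
	caps, err := c.GetCapabilities()
	if err != nil {
		return nil, fmt.Errorf("get capabilities: %w", err)
	}
	return &Session{client: c, caps: caps}, nil
}

// StoreAll uses the batch operation when advertised and otherwise
// falls back to individual Store calls.
func (s *Session) StoreAll(items map[string]string) error {
	if s.caps.Batch {
		return s.client.StoreBatch(items)
	}
	for k, v := range items {
		if err := s.client.Store(k, v); err != nil {
			return fmt.Errorf("store %q: %w", k, err)
		}
	}
	return nil
}

// fakeClient is an in-memory stand-in for a real pattern client.
type fakeClient struct {
	caps       Capabilities
	data       map[string]string
	batchCalls int
}

func (f *fakeClient) GetCapabilities() (Capabilities, error) { return f.caps, nil }
func (f *fakeClient) Store(k, v string) error               { f.data[k] = v; return nil }
func (f *fakeClient) StoreBatch(items map[string]string) error {
	if !f.caps.Batch {
		return errors.New("batch not supported")
	}
	f.batchCalls++
	for k, v := range items {
		f.data[k] = v
	}
	return nil
}

func main() {
	c := &fakeClient{data: map[string]string{}} // backend without batch support
	s, err := NewSession(c)
	if err != nil {
		panic(err)
	}
	if err := s.StoreAll(map[string]string{"a": "1", "b": "2"}); err != nil {
		panic(err)
	}
	fmt.Println(len(c.data), c.batchCalls) // 2 0
}
```

Because the capability check happens before the call, the client never has to interpret a "method not found" error at runtime, which is the behavior the Error Rate metric is meant to verify.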

Deprecation Reviews

  • Deprecation warnings in place
  • Migration guide complete with examples
  • Timeline communicated to all stakeholders
  • Metrics tracking old vs new service usage
  • Rollback plan documented