MEMO-042: Consolidated Pattern Protocols Implementation Strategy
Purpose
Provide tactical implementation guidance for RFC-046 (Consolidated Pattern Protocols). This memo breaks down the architectural changes into concrete, executable tasks with clear success criteria, test strategies, and rollout plans.
Related: RFC-046: Consolidated Pattern Protocols
Executive Summary
What: Consolidate pattern protocols from multiple backend-level services into single semantic pattern services.
Why: Current architecture leaks backend details to clients, has no capability negotiation, and makes patterns hard to evolve.
How: 10-week phased implementation with backward compatibility, comprehensive testing, and gradual client migration.
Risk: Medium - requires coordination across proto, Go patterns, Rust proxy, and client SDKs. Mitigated by maintaining backward compatibility during transition.
Current State Analysis
Architecture Gaps
| Component | Current State | Problem | Target State |
|---|---|---|---|
| KeyValue Pattern | Exposes 4 separate services | Client must handle 4 imports | Single KeyValuePattern service |
| Multicast Registry | No proto definition | Pattern logic not in protocol | Complete proto with 3-slot schema |
| Capability Discovery | Runtime try/catch | Clients don't know features | GetCapabilities() RPC |
| Backend Slots | Implicit Go interfaces | Not validated or documented | Formal slot schema in proto |
| Graceful Degradation | Client-side fallbacks | Inconsistent error handling | Pattern-side emulation/errors |
Code Impact Analysis
Proto Changes:
- Create: `proto/prism/patterns/` directory (6 new files)
- Modify: None (additive only)
- Delete: None (deprecate in-place)
Go Pattern Changes (per pattern):
- New: `pattern_service.go` (consolidated gRPC service impl)
- New: `capabilities.go` (capability detection logic)
- New: `emulation.go` (graceful degradation for missing features)
- Modify: `grpc_server.go` (register new service alongside old)
- Modify: `*_test.go` (add capability tests)
Rust Proxy Changes:
- Modify: Client code to use new pattern services
- New: Capability discovery at client initialization
- Modify: Error handling for unsupported operations
Estimated Lines of Code:
- Proto: ~2000 lines (6 pattern protos + slot schema)
- Go: ~5000 lines (5 patterns × ~1000 lines each)
- Rust: ~1500 lines (client adaptations)
- Tests: ~3000 lines (integration tests for all patterns)
- Total: ~11,500 lines
Detailed Implementation Plan
Phase 1: Proto Foundation (Week 1)
Goal: Create all proto definitions and validate they compile.
Tasks
1.1 Create Directory Structure
mkdir -p proto/prism/patterns/{keyvalue,multicast_registry,session_store,producer,consumer,schemas}
1.2 Write Proto Files
Priority order (most complex first):
1. `patterns/schemas/slot_schema.proto` (foundation for the others)
2. `patterns/keyvalue/keyvalue_pattern.proto` (40% of LOC from RFC)
3. `patterns/multicast_registry/multicast_registry_pattern.proto` (35% of LOC)
4. `patterns/session_store/session_store_pattern.proto` (10% of LOC)
5. `patterns/producer/producer_pattern.proto` (8% of LOC)
6. `patterns/consumer/consumer_pattern.proto` (7% of LOC)
1.3 Generate Code
# Add to proto/Makefile
generate-patterns:
	protoc --go_out=. --go_opt=paths=source_relative \
		--go-grpc_out=. --go-grpc_opt=paths=source_relative \
		prism/patterns/**/*.proto

# Run generation
make generate-patterns
1.4 Validation
# Proto compilation
task proto-lint
# Generated code builds
cd pkg/plugin/gen/prism/patterns/keyvalue
go build .
# gRPC services have correct signatures
go doc KeyValuePatternServer
Success Criteria
- All 6 proto files compile without errors
- Generated Go code passes `go vet` and `golangci-lint`
- All gRPC service interfaces exported
- Documentation comments generated correctly
Estimated Effort
- Proto writing: 3 days (use RFC-046 as template)
- Code generation setup: 1 day
- Validation and fixes: 1 day
- Total: 5 days
Phase 2: KeyValue Pattern Migration (Weeks 2-3)
Goal: Implement consolidated KeyValuePattern service with full capability negotiation.
Tasks
2.1 Implement Pattern Service (patterns/keyvalue/pattern_service.go)
// patterns/keyvalue/pattern_service.go
package keyvalue
import (
	"context"
	"log"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/keyvalue"
)
// PatternService implements the consolidated KeyValuePattern gRPC service
type PatternService struct {
pb.UnimplementedKeyValuePatternServer
kv *KeyValue // Existing KeyValue implementation
caps *pb.Capabilities // Cached capabilities
}
func NewPatternService(kv *KeyValue) (*PatternService, error) {
caps, err := detectCapabilities(kv)
if err != nil {
return nil, err
}
return &PatternService{
kv: kv,
caps: caps,
}, nil
}
// GetCapabilities returns pattern capabilities (cached at startup)
func (s *PatternService) GetCapabilities(ctx context.Context, req *pb.GetCapabilitiesRequest) (*pb.Capabilities, error) {
return s.caps, nil
}
// Store is the semantic key-value set operation
func (s *PatternService) Store(ctx context.Context, req *pb.StoreRequest) (*pb.StoreResponse, error) {
// Delegate to existing KeyValue.Set() implementation
err := s.kv.Set(ctx, req.Key, req.Value)
if err != nil {
return &pb.StoreResponse{Success: false, Error: proto.String(err.Error())}, nil
}
// Handle inline expiration if requested
if req.ExpirationSeconds != nil {
if s.caps.SupportsExpiration {
err = s.kv.Expire(ctx, req.Key, *req.ExpirationSeconds)
if err != nil {
return &pb.StoreResponse{Success: false, Error: proto.String(err.Error())}, nil
}
} else {
return &pb.StoreResponse{
Success: false,
Error: proto.String("expiration not supported by backend"),
}, nil
}
}
return &pb.StoreResponse{Success: true}, nil
}
// Retrieve is the semantic key-value get operation
func (s *PatternService) Retrieve(ctx context.Context, req *pb.RetrieveRequest) (*pb.RetrieveResponse, error) {
value, found, err := s.kv.Get(ctx, req.Key)
if err != nil {
return &pb.RetrieveResponse{Found: false, Error: proto.String(err.Error())}, nil
}
if !found {
return &pb.RetrieveResponse{Found: false}, nil
}
return &pb.RetrieveResponse{Found: true, Value: value}, nil
}
// Scan with graceful degradation
func (s *PatternService) Scan(req *pb.ScanRequest, stream pb.KeyValuePattern_ScanServer) error {
if !s.caps.SupportsScan {
return status.Error(codes.Unimplemented, "scan not supported: backend does not implement KeyValueScanInterface")
}
// Check performance characteristics and warn if emulated
if s.caps.ScanPerformance == pb.ScanPerformance_SCAN_EMULATED {
// Log warning but proceed
log.Warn("Scan is emulated, performance may be degraded")
}
// Delegate to backend scan implementation
return s.kv.Scan(req, stream)
}
// ... implement remaining operations
2.2 Implement Capability Detection (patterns/keyvalue/capabilities.go)
// patterns/keyvalue/capabilities.go
package keyvalue
import (
pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/keyvalue"
)
func detectCapabilities(kv *KeyValue) (*pb.Capabilities, error) {
caps := &pb.Capabilities{
Slots: &pb.SlotConfiguration{},
}
// Detect backend type
backendType := kv.GetBackendType()
caps.Slots.PrimaryBackendType = backendType
// Detect interfaces implemented by backend
interfaces := kv.GetImplementedInterfaces()
caps.Slots.PrimaryInterfaces = interfaces
// Feature detection based on interfaces
caps.SupportsBatch = contains(interfaces, "KeyValueBatchInterface")
caps.SupportsScan = contains(interfaces, "KeyValueScanInterface")
caps.SupportsExpiration = contains(interfaces, "KeyValueTTLInterface")
caps.SupportsTransactions = contains(interfaces, "KeyValueTransactionalInterface")
// Performance characteristics based on backend type
caps.ScanPerformance = detectScanPerformance(backendType)
caps.BatchPerformance = detectBatchPerformance(backendType)
// Limitations based on backend
caps.MaxKeySize = getMaxKeySize(backendType)
caps.MaxValueSize = getMaxValueSize(backendType)
return caps, nil
}
func detectScanPerformance(backendType string) pb.ScanPerformance {
switch backendType {
case "redis", "postgres", "memstore":
return pb.ScanPerformance_SCAN_NATIVE
case "dynamodb":
return pb.ScanPerformance_SCAN_EMULATED
case "s3", "minio":
return pb.ScanPerformance_SCAN_UNAVAILABLE
default:
return pb.ScanPerformance_SCAN_UNAVAILABLE
}
}
func detectBatchPerformance(backendType string) pb.BatchPerformance {
switch backendType {
case "redis", "postgres", "dynamodb":
return pb.BatchPerformance_BATCH_NATIVE
case "memstore":
return pb.BatchPerformance_BATCH_PIPELINE
case "s3", "minio":
return pb.BatchPerformance_BATCH_UNAVAILABLE
default:
return pb.BatchPerformance_BATCH_UNAVAILABLE
}
}
func getMaxKeySize(backendType string) *int64 {
limits := map[string]int64{
"redis": 512 * 1024 * 1024, // 512MB
"postgres": 8192, // 8KB
"memstore": 1024 * 1024, // 1MB
"dynamodb": 2048, // 2KB
"s3": 1024, // 1KB
}
if limit, ok := limits[backendType]; ok {
return &limit
}
return nil
}
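The capability code above calls two helpers that are not shown. A minimal sketch of both follows; `contains` is a plain slice lookup, and `getMaxValueSize` is assumed to mirror `getMaxKeySize` (the value limits below are illustrative and should be replaced with verified backend limits):

```go
// contains reports whether the backend advertises the named interface.
func contains(interfaces []string, name string) bool {
	for _, iface := range interfaces {
		if iface == name {
			return true
		}
	}
	return false
}

// getMaxValueSize mirrors getMaxKeySize. Limits are illustrative placeholders.
func getMaxValueSize(backendType string) *int64 {
	limits := map[string]int64{
		"redis":    512 * 1024 * 1024,         // 512MB string values
		"postgres": 1024 * 1024 * 1024,        // ~1GB TOASTed field
		"memstore": 64 * 1024 * 1024,          // assumed in-process limit
		"dynamodb": 400 * 1024,                // 400KB item size limit
		"s3":       5 * 1024 * 1024 * 1024 * 1024, // 5TB object limit
	}
	if limit, ok := limits[backendType]; ok {
		return &limit
	}
	return nil
}
```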
2.3 Implement Graceful Degradation (patterns/keyvalue/emulation.go)
// patterns/keyvalue/emulation.go
package keyvalue

import (
	"context"
	"sync"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/keyvalue"
)
// Emulate scan for backends that support Query but not Scan (e.g., DynamoDB)
func (s *PatternService) emulateScan(req *pb.ScanRequest, stream pb.KeyValuePattern_ScanServer) error {
// DynamoDB example: use Query with pagination
var cursor *string
if req.Cursor != nil && *req.Cursor != "" {
cursor = req.Cursor
}
limit := int32(100)
if req.Limit != nil {
limit = *req.Limit
}
// Query with pagination
keys, values, nextCursor, hasMore, err := s.kv.QueryWithPagination(
stream.Context(),
req.Prefix,
cursor,
limit,
req.IncludeValues != nil && *req.IncludeValues,
)
if err != nil {
return err
}
// Send response
response := &pb.ScanResponse{
Keys: keys,
Values: values,
HasMore: hasMore,
}
if nextCursor != nil {
response.NextCursor = nextCursor
}
return stream.Send(response)
}
// Emulate batch operations by pipelining individual operations
func (s *PatternService) emulateBatchSet(ctx context.Context, req *pb.StoreBatchRequest) (*pb.StoreBatchResponse, error) {
results := make([]*pb.StoreResponse, len(req.Requests))
successCount := int32(0)
// Execute in parallel with bounded concurrency
sem := make(chan struct{}, 10) // Max 10 concurrent operations
var wg sync.WaitGroup
var mu sync.Mutex
for i, storeReq := range req.Requests {
wg.Add(1)
sem <- struct{}{}
go func(idx int, r *pb.StoreRequest) {
defer wg.Done()
defer func() { <-sem }()
resp, err := s.Store(ctx, r)
mu.Lock()
results[idx] = resp
if err == nil && resp.Success {
successCount++
}
mu.Unlock()
}(i, storeReq)
}
wg.Wait()
return &pb.StoreBatchResponse{
Results: results,
AllSuccess: successCount == int32(len(req.Requests)),
SuccessCount: successCount,
}, nil
}
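The batch RPC can then route between the native and emulated paths based on the detected capabilities. A minimal sketch, assuming `s.kv.StoreBatch` exists as a delegate on the existing `KeyValue` implementation (not confirmed by the RFC):

```go
// StoreBatch prefers the backend's native batch support and falls back to
// the pipelined emulation above when it is absent.
func (s *PatternService) StoreBatch(ctx context.Context, req *pb.StoreBatchRequest) (*pb.StoreBatchResponse, error) {
	if s.caps.SupportsBatch {
		// Native path: backend implements KeyValueBatchInterface.
		return s.kv.StoreBatch(ctx, req) // assumed delegate on *KeyValue
	}
	// Emulated path: pipeline individual Store calls with bounded concurrency.
	return s.emulateBatchSet(ctx, req)
}
```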
2.4 Update gRPC Server Registration
// patterns/keyvalue/grpc_server.go
func NewGRPCServer(kv *KeyValue, port int) (*GRPCServer, error) {
// ... existing setup ...
// BACKWARD COMPATIBILITY: Register old services
basicService := &KeyValueBasicService{kv: kv}
pb_kv.RegisterKeyValueBasicInterfaceServer(grpcServer, basicService)
ttlService := &KeyValueTTLService{kv: kv}
pb_kv.RegisterKeyValueTTLInterfaceServer(grpcServer, ttlService)
if kv.SupportsScan() {
scanService := &KeyValueScanService{kv: kv}
pb_kv.RegisterKeyValueScanInterfaceServer(grpcServer, scanService)
}
// NEW: Register consolidated pattern service
patternService, err := NewPatternService(kv)
if err != nil {
return nil, fmt.Errorf("failed to create pattern service: %w", err)
}
pb_pattern.RegisterKeyValuePatternServer(grpcServer, patternService)
log.Printf("[KeyValue gRPC] ✅ Registered KeyValuePattern service (NEW)")
log.Printf("[KeyValue gRPC] ⚠️ Old services still available for backward compatibility")
return server, nil
}
2.5 Write Tests
// patterns/keyvalue/pattern_service_test.go
package keyvalue

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/keyvalue"
)
func TestGetCapabilities_Redis(t *testing.T) {
kv := setupRedisKeyValue(t)
service, err := NewPatternService(kv)
require.NoError(t, err)
caps, err := service.GetCapabilities(context.Background(), &pb.GetCapabilitiesRequest{})
require.NoError(t, err)
assert.True(t, caps.SupportsBatch)
assert.True(t, caps.SupportsScan)
assert.True(t, caps.SupportsExpiration)
assert.Equal(t, pb.ScanPerformance_SCAN_NATIVE, caps.ScanPerformance)
assert.Equal(t, "redis", caps.Slots.PrimaryBackendType)
}
func TestGetCapabilities_S3(t *testing.T) {
kv := setupS3KeyValue(t)
service, err := NewPatternService(kv)
require.NoError(t, err)
caps, err := service.GetCapabilities(context.Background(), &pb.GetCapabilitiesRequest{})
require.NoError(t, err)
assert.False(t, caps.SupportsBatch)
assert.False(t, caps.SupportsScan)
assert.False(t, caps.SupportsExpiration)
assert.Equal(t, pb.ScanPerformance_SCAN_UNAVAILABLE, caps.ScanPerformance)
assert.Equal(t, "s3", caps.Slots.PrimaryBackendType)
}
func TestScan_UnsupportedBackend(t *testing.T) {
kv := setupS3KeyValue(t) // S3 doesn't support scan
service, err := NewPatternService(kv)
require.NoError(t, err)
stream := &mockScanStream{}
err = service.Scan(&pb.ScanRequest{Prefix: proto.String("test:")}, stream)
assert.Error(t, err)
assert.Contains(t, err.Error(), "scan not supported")
assert.Equal(t, codes.Unimplemented, status.Code(err))
}
func TestStore_WithInlineExpiration_Supported(t *testing.T) {
kv := setupRedisKeyValue(t)
service, err := NewPatternService(kv)
require.NoError(t, err)
resp, err := service.Store(context.Background(), &pb.StoreRequest{
Key: "test:key",
Value: []byte("value"),
ExpirationSeconds: proto.Int64(60),
})
require.NoError(t, err)
assert.True(t, resp.Success)
// Verify expiration was set
ttl, err := kv.GetTTL(context.Background(), "test:key")
require.NoError(t, err)
assert.Greater(t, ttl, int64(50))
assert.LessOrEqual(t, ttl, int64(60))
}
func TestStore_WithInlineExpiration_Unsupported(t *testing.T) {
kv := setupPostgresKeyValue(t) // Postgres doesn't support TTL natively
service, err := NewPatternService(kv)
require.NoError(t, err)
resp, err := service.Store(context.Background(), &pb.StoreRequest{
Key: "test:key",
Value: []byte("value"),
ExpirationSeconds: proto.Int64(60),
})
require.NoError(t, err)
assert.False(t, resp.Success)
assert.Contains(t, *resp.Error, "expiration not supported")
}
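The scan test above uses a `mockScanStream` that is not shown. A minimal sketch, assuming the generated `KeyValuePattern_ScanServer` interface follows the standard protoc-gen-go-grpc shape (a `Send` method plus the embedded `grpc.ServerStream` methods):

```go
// patterns/keyvalue/mock_scan_stream_test.go (sketch)
package keyvalue

import (
	"context"

	"google.golang.org/grpc"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/keyvalue"
)

// mockScanStream is a test double for the server-streaming Scan RPC.
// Embedding grpc.ServerStream satisfies the generated interface; only the
// methods the pattern service actually calls are implemented.
type mockScanStream struct {
	grpc.ServerStream
	responses []*pb.ScanResponse
}

func (m *mockScanStream) Send(resp *pb.ScanResponse) error {
	m.responses = append(m.responses, resp)
	return nil
}

func (m *mockScanStream) Context() context.Context {
	return context.Background()
}
```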
Success Criteria
- `KeyValuePattern` service fully implemented
- `GetCapabilities()` correctly detects Redis, Postgres, MemStore, S3
- Scan returns proper error for S3
- Scan emulation works for DynamoDB (if implemented)
- Batch emulation works for MemStore
- All 20+ unit tests pass
- Integration tests pass with 4 different backends
- Backward compatibility maintained (old services still work)
Estimated Effort
- Implementation: 5 days
- Testing: 3 days
- Integration validation: 2 days
- Total: 10 days (2 weeks)
Phase 3: Multicast Registry Pattern Migration (Weeks 4-5)
Goal: Create proto definition for Multicast Registry (currently has none) and implement consolidated service.
Tasks
3.1 Implement Pattern Service (patterns/multicast_registry/pattern_service.go)
// patterns/multicast_registry/pattern_service.go
package multicast_registry
import (
	"context"
	"time"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/multicast_registry"
)
type PatternService struct {
pb.UnimplementedMulticastRegistryPatternServer
coordinator *Coordinator // Existing implementation
caps *pb.Capabilities
}
func NewPatternService(coordinator *Coordinator) (*PatternService, error) {
caps, err := detectCapabilities(coordinator)
if err != nil {
return nil, err
}
return &PatternService{
coordinator: coordinator,
caps: caps,
}, nil
}
func (s *PatternService) GetCapabilities(ctx context.Context, req *pb.GetCapabilitiesRequest) (*pb.Capabilities, error) {
return s.caps, nil
}
func (s *PatternService) Register(ctx context.Context, req *pb.RegisterRequest) (*pb.RegisterResponse, error) {
// Convert protobuf.Struct to map[string]interface{}
metadata := req.Metadata.AsMap()
// Convert duration to time.Duration
ttl := time.Duration(0)
if req.Ttl != nil {
ttl = req.Ttl.AsDuration()
}
// Delegate to coordinator
err := s.coordinator.Register(ctx, req.Identity, metadata, ttl)
if err != nil {
return &pb.RegisterResponse{Success: false, Error: proto.String(err.Error())}, nil
}
return &pb.RegisterResponse{Success: true, Identity: req.Identity}, nil
}
func (s *PatternService) Enumerate(ctx context.Context, req *pb.EnumerateRequest) (*pb.EnumerateResponse, error) {
// Convert proto filter to internal filter format
filter := convertFilter(req.Filter)
// Delegate to coordinator
identities, err := s.coordinator.Enumerate(ctx, filter)
if err != nil {
return nil, err
}
// Convert to proto format
pbIdentities := make([]*pb.Identity, len(identities))
for i, id := range identities {
	// Convert the internal metadata map back into a protobuf Struct
	metadata, err := structpb.NewStruct(id.Metadata)
	if err != nil {
		return nil, err
	}
	pbIdentities[i] = &pb.Identity{
		Identity:     id.ID,
		Metadata:     metadata,
		Tags:         id.Tags,
		RegisteredAt: id.RegisteredAt.Unix(),
	}
if id.ExpiresAt != nil {
pbIdentities[i].ExpiresAt = proto.Int64(id.ExpiresAt.Unix())
}
}
return &pb.EnumerateResponse{
Identities: pbIdentities,
TotalCount: int32(len(identities)),
HasMore: false,
}, nil
}
func (s *PatternService) Multicast(ctx context.Context, req *pb.MulticastRequest) (*pb.MulticastResponse, error) {
filter := convertFilter(req.Filter)
// Set timeout if specified
if req.TimeoutMs != nil {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(*req.TimeoutMs)*time.Millisecond)
defer cancel()
}
// Delegate to coordinator
resp, err := s.coordinator.Multicast(ctx, filter, req.Payload)
if err != nil {
return &pb.MulticastResponse{Success: false, Error: proto.String(err.Error())}, nil
}
// Convert results
pbResults := make([]*pb.DeliveryResult, len(resp.Results))
for i, r := range resp.Results {
pbResults[i] = &pb.DeliveryResult{
Identity: r.Identity,
Status: convertDeliveryStatus(r.Status),
LatencyMs: int32(r.Latency.Milliseconds()),
}
if r.Error != nil {
pbResults[i].Error = proto.String(r.Error.Error())
}
}
return &pb.MulticastResponse{
Success: resp.DeliveredCount == resp.TargetCount,
TargetCount: int32(resp.TargetCount),
DeliveredCount: int32(resp.DeliveredCount),
FailedCount: int32(resp.FailedCount),
Results: pbResults,
}, nil
}
3.2 Implement Capability Detection
// patterns/multicast_registry/capabilities.go
package multicast_registry

import (
	"google.golang.org/protobuf/proto"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/multicast_registry"
)
func detectCapabilities(coord *Coordinator) (*pb.Capabilities, error) {
caps := &pb.Capabilities{
Slots: &pb.SlotConfiguration{},
}
// Detect registry backend capabilities
registryType := coord.registry.GetBackendType()
registryInterfaces := coord.registry.GetImplementedInterfaces()
caps.Slots.RegistryBackendType = registryType
caps.Slots.RegistryInterfaces = registryInterfaces
// Detect messaging backend capabilities
messagingType := coord.messaging.GetBackendType()
messagingInterfaces := coord.messaging.GetImplementedInterfaces()
caps.Slots.MessagingBackendType = messagingType
caps.Slots.MessagingInterfaces = messagingInterfaces
// Detect optional durability backend
if coord.durability != nil {
durabilityType := coord.durability.GetBackendType()
caps.Slots.DurabilityBackendType = proto.String(durabilityType)
caps.SupportsDurability = true
} else {
caps.SupportsDurability = false
}
// Detect filtering capabilities
caps.SupportsFiltering = true // Always true (client-side fallback)
caps.FilterPerformance = detectFilterPerformance(registryType)
// Detect streaming support
caps.SupportsStreaming = contains(messagingInterfaces, "PubSubBasicInterface")
// Detect wildcard filter support
caps.SupportsWildcardFilters = detectWildcardSupport(registryType)
// Detect multicast performance characteristics
caps.MulticastPerformance = detectMulticastPerformance(
messagingType,
caps.SupportsDurability,
)
// Limitations
if coord.config.MaxIdentities > 0 {
caps.MaxIdentities = proto.Int32(int32(coord.config.MaxIdentities))
}
return caps, nil
}
func detectFilterPerformance(registryType string) pb.FilterPerformance {
switch registryType {
case "redis":
// Redis can use Lua for server-side filtering
return pb.FilterPerformance_FILTER_NATIVE
case "postgres":
// Postgres can use WHERE clauses
return pb.FilterPerformance_FILTER_NATIVE
case "memstore":
// MemStore filters in Go
return pb.FilterPerformance_FILTER_CLIENT_SIDE
default:
return pb.FilterPerformance_FILTER_CLIENT_SIDE
}
}
func detectMulticastPerformance(messagingType string, hasDurability bool) pb.MulticastPerformance {
if hasDurability {
return pb.MulticastPerformance_MULTICAST_QUEUED
}
switch messagingType {
case "redis-pubsub":
return pb.MulticastPerformance_MULTICAST_NATIVE
case "nats", "kafka":
return pb.MulticastPerformance_MULTICAST_FANOUT
default:
return pb.MulticastPerformance_MULTICAST_FANOUT
}
}
3.3 Write Tests
Focus on:
- Capability detection for different 3-slot combinations
- Filter performance (native vs client-side)
- Multicast with different messaging backends
- Graceful degradation when durability slot missing
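For example, a capability test for the missing-durability case might look like the sketch below; `setupCoordinator` and its arguments are hypothetical helpers, mirroring the KeyValue setup functions:

```go
func TestGetCapabilities_NoDurabilitySlot(t *testing.T) {
	// Hypothetical helper: registry=memstore, messaging=nats, no durability slot
	coord := setupCoordinator(t, "memstore", "nats", nil)
	service, err := NewPatternService(coord)
	require.NoError(t, err)

	caps, err := service.GetCapabilities(context.Background(), &pb.GetCapabilitiesRequest{})
	require.NoError(t, err)

	// Matches the detection logic above: no durability slot, client-side
	// filtering for memstore, fan-out multicast for NATS.
	assert.False(t, caps.SupportsDurability)
	assert.Equal(t, pb.FilterPerformance_FILTER_CLIENT_SIDE, caps.FilterPerformance)
	assert.Equal(t, pb.MulticastPerformance_MULTICAST_FANOUT, caps.MulticastPerformance)
}
```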
Success Criteria
- Proto definition complete (currently doesn't exist)
- Pattern service implements all RPCs
- Capability detection covers 6+ slot combinations
- Filter performance correctly identified
- Integration tests with Redis+NATS
- Integration tests with Postgres+Kafka
- Tests verify that the durability slot is optional
Estimated Effort
- Proto writing: 2 days (large proto, ~600 lines)
- Implementation: 4 days
- Testing: 3 days
- Integration validation: 1 day
- Total: 10 days (2 weeks)
Phase 4: Client Integration Testing (Week 6)
Goal: Update Rust proxy to use new pattern services and validate end-to-end.
Tasks
4.1 Update Rust Client Code
Before:
// clients/rust/prism-client/src/keyvalue.rs
use prism::interfaces::keyvalue::{
KeyValueBasicInterface, KeyValueScanInterface, KeyValueTTLInterface
};
pub struct KeyValueClient {
basic_client: KeyValueBasicInterfaceClient<Channel>,
scan_client: Option<KeyValueScanInterfaceClient<Channel>>,
ttl_client: KeyValueTTLInterfaceClient<Channel>,
}
After:
// clients/rust/prism-client/src/keyvalue.rs
use prism::patterns::keyvalue::{KeyValuePatternClient, Capabilities};
pub struct KeyValueClient {
client: KeyValuePatternClient<Channel>,
capabilities: Capabilities,
}
impl KeyValueClient {
pub async fn new(addr: String) -> Result<Self, Error> {
let channel = Channel::from_shared(addr)?.connect().await?;
let mut client = KeyValuePatternClient::new(channel);
// Discover capabilities at initialization
let capabilities = client
.get_capabilities(GetCapabilitiesRequest {})
.await?
.into_inner();
info!(
"KeyValue backend: {} (scan: {}, expiration: {})",
capabilities.slots.as_ref().unwrap().primary_backend_type,
capabilities.supports_scan,
capabilities.supports_expiration
);
Ok(Self { client, capabilities })
    }

    /// Expose discovered capabilities so callers (and tests) can inspect them
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
pub async fn store(&mut self, key: String, value: Vec<u8>) -> Result<(), Error> {
let response = self
.client
.store(StoreRequest {
key,
value,
expiration_seconds: None,
tags: None,
})
.await?
.into_inner();
if !response.success {
return Err(Error::Store(response.error.unwrap_or_default()));
}
Ok(())
}
pub async fn scan(&mut self, prefix: String) -> Result<Vec<String>, Error> {
// Check capability before calling
if !self.capabilities.supports_scan {
return Err(Error::UnsupportedOperation(
"Scan not supported by backend".to_string()
));
}
// Warn if emulated
if self.capabilities.scan_performance == ScanPerformance::ScanEmulated as i32 {
warn!("Scan is emulated, performance may be degraded");
}
let mut stream = self
.client
.scan(ScanRequest {
prefix: Some(prefix),
cursor: None,
limit: Some(100),
include_values: Some(false),
})
.await?
.into_inner();
let mut keys = Vec::new();
while let Some(response) = stream.message().await? {
keys.extend(response.keys);
if !response.has_more {
break;
}
}
Ok(keys)
}
}
4.2 Update Integration Tests
// clients/rust/prism-client/tests/integration_test.rs
#[tokio::test]
async fn test_capability_discovery() {
let client = KeyValueClient::new("http://localhost:50051".to_string())
.await
.unwrap();
let caps = client.capabilities();
// Verify capabilities detected
assert_eq!(caps.slots.as_ref().unwrap().primary_backend_type, "redis");
assert!(caps.supports_batch);
assert!(caps.supports_scan);
assert_eq!(caps.scan_performance, ScanPerformance::ScanNative as i32);
}
#[tokio::test]
async fn test_unsupported_operation_graceful() {
// Start KeyValue pattern with S3 backend (no scan support)
let _pattern = start_keyvalue_with_s3().await;
let mut client = KeyValueClient::new("http://localhost:50052".to_string())
.await
.unwrap();
// Verify capabilities indicate no scan support
assert!(!client.capabilities().supports_scan);
// Attempt scan should return clear error
let result = client.scan("test:".to_string()).await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("not supported by backend"));
}
#[tokio::test]
async fn test_inline_expiration() {
let mut client = KeyValueClient::new("http://localhost:50051".to_string())
.await
.unwrap();
// Verify expiration supported
assert!(client.capabilities().supports_expiration);
// Store with expiration
client
.client
.store(StoreRequest {
key: "expire:key".to_string(),
value: b"value".to_vec(),
expiration_seconds: Some(60),
tags: None,
})
.await
.unwrap();
// Verify key exists
let resp = client.retrieve("expire:key".to_string()).await.unwrap();
assert!(resp.found);
// Verify expiration set
let ttl_resp = client
.client
.get_expiration(GetExpirationRequest {
key: "expire:key".to_string(),
})
.await
.unwrap()
.into_inner();
assert!(ttl_resp.expiration_seconds > 50);
assert!(ttl_resp.expiration_seconds <= 60);
}
4.3 Matrix Testing
Test all backend combinations:
#!/bin/bash
# Test matrix script
backends=(
"redis:6379"
"postgres:5432"
"memstore:internal"
)
for backend in "${backends[@]}"; do
echo "Testing KeyValue with $backend"
# Start pattern with specific backend
./start_keyvalue_pattern.sh "$backend"
# Run integration tests
cargo test --test integration_test -- --test-threads=1
# Verify capabilities
cargo test test_capability_discovery_${backend%%:*}
# Cleanup
./stop_keyvalue_pattern.sh
done
Success Criteria
- Rust client uses single `KeyValuePatternClient`
- Capability discovery integrated into client initialization
- Integration tests pass with Redis
- Integration tests pass with Postgres
- Integration tests pass with MemStore
- Graceful error handling for unsupported operations
- Matrix testing covers 3 backends × 5 operations = 15 scenarios
Estimated Effort
- Rust client updates: 2 days
- Integration test writing: 2 days
- Matrix testing setup: 1 day
- Total: 5 days (1 week)
Phase 5: Producer/Consumer Migration (Week 7)
Goal: Implement pattern services for Producer and Consumer patterns.
Approach
Similar to KeyValue but simpler (fewer operations):
- Write proto (from RFC-046 template)
- Implement pattern services
- Detect capabilities (ordering, batching, exactly-once); see the sketch after this list
- Write tests
- Update clients
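A rough sketch of producer capability detection follows. The capability field names (`SupportsOrdering`, `SupportsBatching`, `SupportsExactlyOnce`) and the `GetBackendType` accessor are assumptions, since the Producer proto is not yet written:

```go
// patterns/producer/capabilities.go (sketch; field names are assumed)
func detectCapabilities(p *Producer) (*pb.Capabilities, error) {
	caps := &pb.Capabilities{Slots: &pb.SlotConfiguration{}}
	backendType := p.GetBackendType()
	caps.Slots.PrimaryBackendType = backendType

	switch backendType {
	case "kafka":
		caps.SupportsOrdering = true    // per-partition ordering
		caps.SupportsBatching = true    // producer-side batching
		caps.SupportsExactlyOnce = true // idempotent producer + transactions
	case "nats":
		caps.SupportsOrdering = true     // per-subject ordering
		caps.SupportsBatching = false
		caps.SupportsExactlyOnce = false // at-least-once delivery
	default:
		// Unknown backends advertise the minimal feature set.
	}
	return caps, nil
}
```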
Success Criteria
- Proto definitions complete
- Pattern services implemented
- Capability detection for Kafka vs NATS
- Integration tests pass
Estimated Effort
- Total: 5 days (1 week) (simpler than KeyValue/Multicast)
Phase 6: Documentation and Deprecation (Week 8)
Goal: Document new architecture, deprecate old services, provide migration guide.
Tasks
6.1 Update RFC Documents
- Update RFC-014 (Layered Patterns) to reference RFC-046
- Update RFC-025 (Pattern SDK) to reference consolidated protocols
- Mark backend interface RFCs as "implementation detail, not client-facing"
6.2 Write Migration Guide
Create a comprehensive migration guide document with the following sections:
Title: Migration Guide: Backend Interfaces to Pattern Protocols
Overview Section: Explain that Prism patterns now expose consolidated protocol services instead of multiple backend-level interfaces.
KeyValue Pattern Migration Section:
Before (Deprecated) - Show old approach:
- Import multiple interface stubs (KeyValueBasicInterfaceStub, KeyValueScanInterfaceStub)
- Create separate clients for each interface
- Use try/catch for optional operations
After (Recommended) - Show new approach:
- Import single KeyValuePatternStub
- Call GetCapabilities() during initialization
- Use semantic operations (Store instead of Set)
- Check capabilities before optional operations
Migration Checklist:
- Replace backend interface imports with pattern imports
- Add GetCapabilities() call during initialization
- Replace backend operation names with semantic names
- Add capability checks before optional operations
- Test with multiple backends
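As a concrete illustration of the "After (Recommended)" flow above, a minimal Go client might look like the sketch below. The `NewKeyValuePatternClient` constructor name follows the standard protoc-gen-go-grpc convention (the Go equivalent of the Python "Stub"); the address, key, and value are placeholders:

```go
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/proto"

	pb "github.com/jrepp/prism-data-layer/pkg/plugin/gen/prism/patterns/keyvalue"
)

func main() {
	ctx := context.Background()

	// One connection and one client, instead of three interface clients.
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()
	client := pb.NewKeyValuePatternClient(conn)

	// 1. Discover capabilities during initialization.
	caps, err := client.GetCapabilities(ctx, &pb.GetCapabilitiesRequest{})
	if err != nil {
		log.Fatalf("get capabilities: %v", err)
	}

	// 2. Semantic operation names (Store, not Set).
	if _, err := client.Store(ctx, &pb.StoreRequest{
		Key:   "user:42",
		Value: []byte("payload"),
	}); err != nil {
		log.Fatalf("store: %v", err)
	}

	// 3. Check capabilities before optional operations.
	if caps.SupportsScan {
		stream, err := client.Scan(ctx, &pb.ScanRequest{Prefix: proto.String("user:")})
		if err != nil {
			log.Fatalf("scan: %v", err)
		}
		for {
			resp, err := stream.Recv()
			if err != nil {
				break // io.EOF when the stream completes
			}
			log.Printf("scanned %d keys", len(resp.Keys))
		}
	}
}
```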
6.3 Add Deprecation Warnings
// patterns/keyvalue/grpc_server.go
// DEPRECATED: KeyValueBasicService is deprecated. Use KeyValuePattern instead.
// This service will be removed in v2.0.
type KeyValueBasicService struct {
pb_kv.UnimplementedKeyValueBasicInterfaceServer
kv *KeyValue
}
func (s *KeyValueBasicService) Set(ctx context.Context, req *pb_kv.SetRequest) (*pb_kv.SetResponse, error) {
// Emit deprecation warning
log.Warn("KeyValueBasicInterface.Set is deprecated, use KeyValuePattern.Store instead")
// Delegate to existing implementation
return s.kv.handleSet(ctx, req)
}
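To support the "track usage of old vs new services" metric called out in the rollback plan, the deprecated handlers can also increment a counter. A minimal sketch, assuming the Prometheus Go client (`github.com/prometheus/client_golang/prometheus`) is or can be wired into the pattern runners; the metric name is hypothetical:

```go
var deprecatedCalls = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "prism_deprecated_service_calls_total",
		Help: "Calls to deprecated backend-interface services, by service and method.",
	},
	[]string{"service", "method"},
)

func init() {
	prometheus.MustRegister(deprecatedCalls)
}

// Inside each deprecated handler, e.g. KeyValueBasicService.Set:
//   deprecatedCalls.WithLabelValues("KeyValueBasicInterface", "Set").Inc()
```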
6.4 Update API Documentation
Generate updated docs:
# Generate proto docs
task proto-docs
# Generate Go package docs
go doc ./patterns/keyvalue
# Generate Rust client docs
cargo doc --no-deps
Success Criteria
- Migration guide complete with code examples
- RFC documents updated
- Deprecation warnings in place
- API documentation generated
- Changelog updated
Estimated Effort
- Documentation writing: 3 days
- Deprecation warnings: 1 day
- Doc generation: 1 day
- Total: 5 days (1 week)
Phase 7: Cleanup (Weeks 9-10)
Goal: Remove deprecated code after migration period.
Tasks
7.1 Remove Old Service Registrations
// patterns/keyvalue/grpc_server.go
func NewGRPCServer(kv *KeyValue, port int) (*GRPCServer, error) {
// ... existing setup ...
// OLD (REMOVED):
// basicService := &KeyValueBasicService{kv: kv}
// pb_kv.RegisterKeyValueBasicInterfaceServer(grpcServer, basicService)
// NEW (ONLY):
patternService, err := NewPatternService(kv)
if err != nil {
return nil, fmt.Errorf("failed to create pattern service: %w", err)
}
pb_pattern.RegisterKeyValuePatternServer(grpcServer, patternService)
log.Printf("[KeyValue gRPC] ✅ Registered KeyValuePattern service")
return server, nil
}
7.2 Move Deprecated Protos
mkdir proto/deprecated
git mv proto/prism/interfaces/keyvalue/keyvalue_basic.proto proto/deprecated/
# Update imports in any internal code still using them
7.3 Performance Benchmarking
# Benchmark before cleanup
go test -bench=. -benchmem ./patterns/keyvalue > before.txt
# Benchmark after cleanup
go test -bench=. -benchmem ./patterns/keyvalue > after.txt
# Compare
benchstat before.txt after.txt
Expected result: Performance within 5% (overhead from capability detection is negligible).
7.4 Binary Size Analysis
# Before
ls -lh bin/keyvalue-runner
# After
ls -lh bin/keyvalue-runner
# Verify size unchanged or reduced (removed old service code)
Success Criteria
- All deprecated service registrations removed
- Old proto files moved to `proto/deprecated/`
- All tests pass without deprecated code
- Performance benchmarks within 5% of baseline
- Binary size unchanged or reduced
Estimated Effort
- Code removal: 2 days
- Testing and validation: 3 days
- Performance benchmarking: 2 days
- Documentation cleanup: 1 day
- Final review: 2 days
- Total: 10 days (2 weeks)
Risk Management
Risk Matrix
| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Breaking Client Changes | Medium | High | Maintain backward compatibility for 6 months |
| Performance Regression | Low | High | Benchmark every phase, capability detection is O(1) |
| Capability Detection Bugs | Medium | Medium | Extensive integration tests across all backends |
| Proto Breaking Changes | Low | Medium | Only additive changes, version protos carefully |
| Migration Fatigue | High | Low | Clear migration guide, automated tools |
| Incomplete Backend Coverage | Medium | Medium | Start with core patterns, expand gradually |
Rollback Plan
Each phase is independently rollbackable:
Phase 1-3 (Proto + Go patterns):
- Keep old services running
- If new services have bugs, just don't promote them to clients
- Rollback: Remove new service registration, keep old
Phase 4 (Client updates):
- Feature flag new client code
- Rollback: Flip feature flag, clients use old services
Phase 6-7 (Deprecation/Cleanup):
- Don't remove old code until 100% of clients migrated
- Metrics: Track usage of old vs new services
- Rollback: Re-enable old service registration
Testing Strategy
Unit Testing
Each pattern must have:
- Capability detection tests (per backend)
- Operation tests (semantic operations)
- Error handling tests (graceful degradation)
- Emulation tests (if applicable)
Target: 85% coverage for pattern service code.
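A table-driven layout keeps the per-backend capability tests manageable. This sketch reuses the setup helpers from the KeyValue tests above, assuming they have the signature `func(*testing.T) *KeyValue`:

```go
func TestDetectCapabilities_PerBackend(t *testing.T) {
	cases := []struct {
		name     string
		setup    func(t *testing.T) *KeyValue
		wantScan bool
		wantPerf pb.ScanPerformance
	}{
		{"redis", setupRedisKeyValue, true, pb.ScanPerformance_SCAN_NATIVE},
		{"s3", setupS3KeyValue, false, pb.ScanPerformance_SCAN_UNAVAILABLE},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			caps, err := detectCapabilities(tc.setup(t))
			require.NoError(t, err)
			assert.Equal(t, tc.wantScan, caps.SupportsScan)
			assert.Equal(t, tc.wantPerf, caps.ScanPerformance)
		})
	}
}
```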
Integration Testing
Matrix test every pattern × backend combination:
| Pattern | Backends | Operations | Total Tests |
|---|---|---|---|
| KeyValue | 4 (Redis, Postgres, MemStore, S3) | 10 | 40 |
| Multicast Registry | 6 (Redis+NATS, Postgres+Kafka, etc.) | 6 | 36 |
| Session Store | 3 (Redis, Postgres, DynamoDB) | 8 | 24 |
| Producer | 2 (Kafka, NATS) | 5 | 10 |
| Consumer | 2 (Kafka, NATS) | 6 | 12 |
| Total | | | 122 |
Performance Testing
Benchmark key operations:
- Capability detection overhead: <1ms
- Store operation: No regression
- Scan operation: No regression
- Batch operation: No regression
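A benchmark for the capability-detection budget can be as simple as the sketch below; `setupMemStoreKeyValue` is an assumed helper (accepting `testing.TB`) alongside the setup functions used earlier:

```go
func BenchmarkGetCapabilities(b *testing.B) {
	kv := setupMemStoreKeyValue(b) // assumed helper; any in-process backend works
	service, err := NewPatternService(kv)
	if err != nil {
		b.Fatal(err)
	}
	req := &pb.GetCapabilitiesRequest{}
	ctx := context.Background()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Returns the struct cached at startup, so this should sit well
		// under the 1ms budget.
		if _, err := service.GetCapabilities(ctx, req); err != nil {
			b.Fatal(err)
		}
	}
}
```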
Backward Compatibility Testing
Verify old clients still work:
- Keep one client using old services
- Run full integration suite
- Verify no errors or warnings (until deprecation phase)
Success Metrics
Quantitative
- Protocol Consolidation: 100% of patterns have single gRPC service
- Capability Discovery Adoption: 100% of clients call GetCapabilities()
- Error Rate: Zero "method not found" errors in production
- Performance: <5% overhead from new architecture
- Test Coverage: 85% for pattern services, 122 integration tests passing
- Migration Progress: Track % of clients migrated week-over-week
Qualitative
- Developer Experience: Survey developers, expect "much easier" or "easier" ratings
- Client Code Simplicity: Measure LOC reduction in client code (~50% expected)
- Backend Flexibility: Demonstrate same client code works with 3+ backend combinations
- Documentation Quality: Zero ambiguity in capability detection process
Timeline Summary
| Phase | Duration | Deliverables |
|---|---|---|
| 1. Proto Foundation | 1 week | 6 proto files, code generation |
| 2. KeyValue Migration | 2 weeks | Pattern service, capability detection, tests |
| 3. Multicast Registry | 2 weeks | Proto + service + tests |
| 4. Client Integration | 1 week | Rust proxy updates, matrix tests |
| 5. Producer/Consumer | 1 week | 2 pattern services + tests |
| 6. Documentation | 1 week | Migration guide, deprecation warnings |
| 7. Cleanup | 2 weeks | Remove old code, benchmarking |
| Total | 10 weeks | Consolidated pattern protocols |
Appendix: Code Review Checklist
Proto Reviews
- All services have GetCapabilities() RPC
- Capabilities message includes Slots configuration
- Performance enums defined (NATIVE, EMULATED, UNAVAILABLE)
- Operations have clear semantic names (Store not Set)
- Request/response messages are well-documented
- Proto follows style guide (no breaking changes)
Go Pattern Service Reviews
- Pattern service delegates to existing implementation
- Capability detection is comprehensive
- Graceful degradation implemented for missing features
- Error messages are clear and actionable
- Unit tests cover all capability combinations
- Integration tests pass with multiple backends
- No performance regressions
Client Code Reviews
- Client calls GetCapabilities() during initialization
- Capability checks before optional operations
- Clear fallback paths for unsupported features
- Error handling is graceful
- Integration tests cover all scenarios
- Documentation updated
Deprecation Reviews
- Deprecation warnings in place
- Migration guide complete with examples
- Timeline communicated to all stakeholders
- Metrics tracking old vs new service usage
- Rollback plan documented