MEMO-004: Backend Plugin Implementation Guide

Purpose

Strategic guide for implementing backend plugins in priority order, with analysis of Go SDK support, data model complexity, testing difficulty, and recommended demo configurations for the acceptance test framework (RFC-015).

Backend Implementability Matrix

Comprehensive comparison of all backends discussed for Prism, prioritized by internal needs and ranked by ease of implementation.

Comparison Table (Internal Priority Order)

| Rank | Backend | Go SDK Quality | Data Models | Test Difficulty | Protocol Complexity | Implementability Score | Priority |
|------|---------|----------------|-------------|-----------------|---------------------|------------------------|----------|
| 0 | MemStore (In-Memory) | ⭐⭐⭐⭐⭐ Native (sync.Map) | KeyValue | ⭐⭐⭐⭐⭐ Instant (no deps) | ⭐⭐⭐⭐⭐ Trivial (Go map) | 100/100 | 🔥 Internal - Testing |
| 1 | Kafka | ⭐⭐⭐⭐ Good (segmentio/kafka-go) | Event Streaming | ⭐⭐⭐⭐ Moderate (testcontainers) | ⭐⭐⭐ Complex (wire protocol) | 78/100 | 🔥 Internal - Messaging |
| 2 | NATS | ⭐⭐⭐⭐⭐ Excellent (nats.go - official) | PubSub, Queue | ⭐⭐⭐⭐⭐ Easy (lightweight) | ⭐⭐⭐⭐⭐ Simple (text protocol) | 90/100 | 🔥 Internal - PubSub |
| 3 | PostgreSQL | ⭐⭐⭐⭐⭐ Excellent (pgx, pq) | Relational, JSON | ⭐⭐⭐⭐⭐ Easy (testcontainers) | ⭐⭐⭐⭐ Moderate (SQL) | 93/100 | 🔥 Internal - Relational |
| 4 | Neptune | ⭐⭐ Fair (gremlin-go, AWS SDK) | Graph (Gremlin/SPARQL) | ⭐⭐ Hard (AWS-only, no local) | ⭐⭐ Complex (Gremlin) | 50/100 | 🔥 Internal - Graph |
| 5 | Redis | ⭐⭐⭐⭐⭐ Excellent (go-redis) | KeyValue, Cache | ⭐⭐⭐⭐⭐ Easy (testcontainers) | ⭐⭐⭐⭐⭐ Simple (RESP) | 95/100 | External |
| 6 | SQLite | ⭐⭐⭐⭐⭐ Excellent (mattn/go-sqlite3) | Relational | ⭐⭐⭐⭐⭐ Trivial (embedded) | ⭐⭐⭐⭐⭐ Simple (SQL) | 92/100 | External |
| 7 | S3/MinIO | ⭐⭐⭐⭐⭐ Excellent (aws-sdk-go-v2, minio-go) | Object Storage | ⭐⭐⭐⭐ Moderate (MinIO for local) | ⭐⭐⭐⭐ Simple (REST API) | 85/100 | External |
| 8 | ClickHouse | ⭐⭐⭐ Good (clickhouse-go) | Columnar/TimeSeries | ⭐⭐⭐ Moderate (testcontainers) | ⭐⭐⭐ Moderate (custom protocol) | 70/100 | External |

Scoring Criteria

Implementability Score = weighted average of:

  • Go SDK Quality (30%): Maturity, documentation, community support
  • Data Models (15%): Complexity and variety of supported models
  • Test Difficulty (25%): Local testing, testcontainers support, startup time
  • Protocol Complexity (20%): Wire protocol complexity, client implementation difficulty
  • Community/Ecosystem (10%): Available examples, Stack Overflow answers, production usage
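
For example, mapping star ratings to a 0-100 scale (⭐⭐⭐⭐⭐ = 100, ⭐⭐⭐⭐ = 80, and so on) and assuming Redis scores 80 on both Data Models and Community/Ecosystem, the weighted average works out to 0.30×100 + 0.15×80 + 0.25×100 + 0.20×100 + 0.10×80 = 30 + 12 + 25 + 20 + 8 = 95, matching the 95/100 in the table. The two component values are illustrative; the table records only the star ratings.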

Detailed Backend Analysis

0. MemStore (Score: 100/100) - Simplest Possible Plugin 🔥 INTERNAL PRIORITY

Why Implement First:

  • Zero dependencies: Pure Go, uses sync.Map for thread-safe storage
  • Instant startup: No containers, no external processes
  • Perfect for testing: Fastest possible feedback loop
  • Reference implementation: Demonstrates plugin interface patterns

Go Implementation:

// plugins/memstore/store.go
package memstore

import (
	"context"
	"errors"
	"sync"
	"time"
)

// ErrKeyNotFound is returned when a key is absent or has expired.
var ErrKeyNotFound = errors.New("memstore: key not found")

// MemStore implements a thread-safe in-memory key-value store.
type MemStore struct {
	data   sync.Map // key -> []byte value
	expiry sync.Map // key -> time.Time expiration deadline
}

func NewMemStore() *MemStore {
	return &MemStore{}
}

// Set stores a value; a positive ttl schedules lazy expiration on read.
func (m *MemStore) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error {
	m.data.Store(key, value)

	if ttl > 0 {
		m.expiry.Store(key, time.Now().Add(ttl))
	}

	return nil
}

// Get returns the value for key, evicting it first if its TTL has lapsed.
func (m *MemStore) Get(ctx context.Context, key string) ([]byte, error) {
	// Check expiry
	if exp, ok := m.expiry.Load(key); ok {
		if time.Now().After(exp.(time.Time)) {
			m.data.Delete(key)
			m.expiry.Delete(key)
			return nil, ErrKeyNotFound
		}
	}

	value, ok := m.data.Load(key)
	if !ok {
		return nil, ErrKeyNotFound
	}

	return value.([]byte), nil
}

// Delete removes the key and any associated expiry.
func (m *MemStore) Delete(ctx context.Context, key string) error {
	m.data.Delete(key)
	m.expiry.Delete(key)
	return nil
}

Data Models Supported:

  • KeyValue (primary use case)
  • TTL support for expiration
  • PubSub (can add channels with Go channels)
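
The PubSub bullet can be realized with ordinary Go channels. A minimal sketch of a hypothetical extension; none of these names appear in the store code above:

// pubsub is a minimal channel-based fan-out, suitable as a MemStore add-on.
type pubsub struct {
	mu   sync.RWMutex
	subs map[string][]chan []byte // topic -> subscriber channels
}

func newPubsub() *pubsub {
	return &pubsub{subs: make(map[string][]chan []byte)}
}

func (p *pubsub) Subscribe(topic string) <-chan []byte {
	ch := make(chan []byte, 16) // small buffer shields publishers from slow readers
	p.mu.Lock()
	p.subs[topic] = append(p.subs[topic], ch)
	p.mu.Unlock()
	return ch
}

func (p *pubsub) Publish(topic string, msg []byte) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, ch := range p.subs[topic] {
		select {
		case ch <- msg:
		default: // drop when a subscriber's buffer is full (at-most-once)
		}
	}
}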

Testing Strategy:

func TestMemStore(t *testing.T) {
	store := NewMemStore()

	// No setup needed - instant!
	ctx := context.Background()

	// Test basic operations
	store.Set(ctx, "key1", []byte("value1"), 0)
	value, err := store.Get(ctx, "key1")
	assert.NoError(t, err)
	assert.Equal(t, []byte("value1"), value)

	// Test TTL
	store.Set(ctx, "key2", []byte("value2"), 100*time.Millisecond)
	time.Sleep(200 * time.Millisecond)
	_, err = store.Get(ctx, "key2")
	assert.Error(t, err) // Should be expired
}

Demo Plugin Operations:

  • GET, SET, DEL (basic operations)
  • EXPIRE (TTL support)
  • KEYS (list all keys)
  • FLUSH (clear all data)
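
KEYS and FLUSH fall naturally out of sync.Map's Range method. A sketch continuing the store code above (method names are illustrative):

// Keys returns all stored keys; expired entries may still appear
// until their next Get lazily evicts them.
func (m *MemStore) Keys(ctx context.Context) []string {
	var keys []string
	m.data.Range(func(k, _ any) bool {
		keys = append(keys, k.(string))
		return true // continue iteration
	})
	return keys
}

// Flush deletes every key and its expiry metadata.
func (m *MemStore) Flush(ctx context.Context) {
	m.data.Range(func(k, _ any) bool {
		m.data.Delete(k)
		return true
	})
	m.expiry.Range(func(k, _ any) bool {
		m.expiry.Delete(k)
		return true
	})
}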

RFC-015 Test Coverage:

  • Authentication: N/A (in-process)
  • Concurrency: Thread-safe via sync.Map
  • Error handling: Key not found, expired keys
  • Performance: Sub-microsecond latency

Use Cases:

  • Rapid prototyping: Test plugin patterns without external dependencies
  • CI/CD: Fastest possible test execution
  • Learning: Reference implementation for new backend developers
  • Development: Local testing without Docker

Performance Characteristics:

  • Write latency: <1μs (microsecond)
  • Read latency: <1μs
  • Throughput: 1M+ operations/sec (single instance)
  • Memory: ~10MB baseline (scales with data)

1. Redis (Score: 95/100) - Highest Implementability

Why Implement First:

  • Simplest protocol: RESP (REdis Serialization Protocol) is text-based and trivial to implement
  • Fastest to test: Starts in <1 second, minimal memory footprint
  • Perfect for demos: In-memory, no persistence needed for basic examples
  • Excellent Go SDK: go-redis/redis is mature, well-documented, idiomatic Go

Go SDK:

import "github.com/redis/go-redis/v9"

client := redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

Data Models Supported:

  • KeyValue (primary use case)
  • Cache (TTL support)
  • PubSub (lightweight messaging)
  • Lists, Sets, Sorted Sets

Testing Strategy:

// testcontainers integration
func NewRedisInstance(t *testing.T) *RedisInstance {
	req := testcontainers.ContainerRequest{
		Image:        "redis:7-alpine",
		ExposedPorts: []string{"6379/tcp"},
		WaitingFor:   wait.ForLog("Ready to accept connections"),
	}
	// Starts in <1 second
}

Demo Plugin Operations:

  • GET, SET, DEL (basic operations)
  • EXPIRE, TTL (cache semantics)
  • PUBLISH, SUBSCRIBE (pub/sub pattern)
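
PUBLISH/SUBSCRIBE map directly onto go-redis. A minimal sketch reusing the client from the SDK snippet above (the channel name is a placeholder):

ctx := context.Background()

// Subscriber: Channel() delivers messages until the PubSub is closed.
sub := client.Subscribe(ctx, "demo.events")
defer sub.Close()

go func() {
	for msg := range sub.Channel() {
		log.Printf("got %s: %s", msg.Channel, msg.Payload)
	}
}()

// Publisher: fire-and-forget; returns the number of receivers.
client.Publish(ctx, "demo.events", "hello")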

RFC-015 Test Coverage:

  • Authentication: AUTH command with password
  • Connection pooling: Verify multiple connections
  • Error handling: Wrong key types, expired keys
  • Concurrency: 1000s of concurrent ops

2. PostgreSQL (Score: 93/100) - Production Ready

Why Implement Second:

  • Industry standard: Most developers understand SQL
  • Strong Go ecosystem: pgx is the gold standard for Postgres Go clients
  • Rich testing: testcontainers, postgres:alpine images
  • Complex data models: Supports JSON, arrays, full-text search

Go SDK:

import "github.com/jackc/pgx/v5"

conn, _ := pgx.Connect(context.Background(), "postgres://user:pass@localhost:5432/db")

Data Models Supported:

  • Relational (tables, foreign keys, transactions)
  • JSON/JSONB (document-like queries)
  • Full-text search
  • Time-series (with extensions like TimescaleDB)

Testing Strategy:

func NewPostgresInstance(t *testing.T) *PostgresInstance {
	req := testcontainers.ContainerRequest{
		Image:        "postgres:16-alpine",
		ExposedPorts: []string{"5432/tcp"},
		Env: map[string]string{
			"POSTGRES_PASSWORD": "testpass",
		},
		WaitingFor: wait.ForLog("database system is ready to accept connections"),
	}
	// Starts in 3-5 seconds
}

Demo Plugin Operations:

  • SELECT, INSERT, UPDATE, DELETE
  • BEGIN, COMMIT, ROLLBACK (transactions)
  • LISTEN, NOTIFY (pub/sub via Postgres; see the sketch after this list)
  • Prepared statements for performance
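
LISTEN/NOTIFY gives lightweight pub/sub with no extra infrastructure. A sketch with pgx, reusing conn from the SDK snippet above (channel name and payload are placeholders; error handling elided):

// Listener: issue LISTEN, then block until a notification arrives.
conn.Exec(ctx, "LISTEN order_events")
n, err := conn.WaitForNotification(ctx)
if err == nil {
	log.Printf("channel=%s payload=%s", n.Channel, n.Payload)
}

// Notifier: any other session can publish to the channel.
conn.Exec(ctx, "SELECT pg_notify('order_events', 'order-123')")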

RFC-015 Test Coverage:

  • Authentication: Username/password, SSL/TLS
  • Transaction isolation levels
  • Constraint violations (foreign keys, unique)
  • JSON operations and indexing
  • Connection pool exhaustion

3. SQLite (Score: 92/100) - Perfect for Demos

Why Implement Third:

  • Zero configuration: Embedded, no separate process
  • Instant startup: No container needed
  • Perfect for CI/CD: Fast, deterministic tests
  • Same SQL as Postgres: Easy to understand

Go SDK:

import "github.com/mattn/go-sqlite3"

db, _ := sql.Open("sqlite3", ":memory:") // In-memory DB

Data Models Supported:

  • Relational (full SQL support)
  • JSON1 extension for JSON queries
  • Full-text search (FTS5)

Testing Strategy:

func NewSQLiteInstance(t *testing.T) *SQLiteInstance {
	// No container needed!
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		t.Fatal(err)
	}

	// Create schema immediately
	db.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

	return &SQLiteInstance{db: db}
}

Demo Plugin Operations:

  • All standard SQL operations
  • In-memory for speed, file-backed for persistence
  • WAL mode for concurrent reads
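
Enabling WAL is a single pragma at connection time. A sketch for the file-backed case (the file name is a placeholder):

db, err := sql.Open("sqlite3", "demo.db")
if err != nil {
	log.Fatal(err)
}
// WAL allows many concurrent readers alongside a single writer.
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
	log.Fatal(err)
}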

RFC-015 Test Coverage:

  • Authentication: N/A (file-based permissions)
  • Concurrency: Multiple readers, single writer
  • Error handling: Locked database, constraint violations

Use Cases:

  • Local development without Docker
  • CI/CD where container startup overhead matters
  • Embedded demos (single binary with DB)

4. NATS (Score: 90/100) - Cloud-Native Messaging

Why Implement Fourth:

  • Go-native: Written in Go, official Go client
  • Lightweight: <10MB memory, starts instantly
  • Modern patterns: Request-reply, streams, key-value (JetStream)
  • Simple protocol: Text-based, easy to debug

Go SDK:

import "github.com/nats-io/nats.go"

nc, _ := nats.Connect("nats://localhost:4222")

Data Models Supported:

  • PubSub (core NATS)
  • Queue groups (load balancing)
  • JetStream (persistent streams, like Kafka-lite)
  • Key-Value store (JetStream KV)

Testing Strategy:

func NewNATSInstance(t *testing.T) *NATSInstance {
	// Option 1: Embedded NATS server (no container!)
	s, err := server.NewServer(&server.Options{
		Port: -1, // Random port
	})
	s.Start()

	// Option 2: Container for full features
	req := testcontainers.ContainerRequest{
		Image:        "nats:2-alpine",
		ExposedPorts: []string{"4222/tcp"},
	}
	// Starts in <2 seconds
}

Demo Plugin Operations:

  • Publish, Subscribe (pub/sub)
  • Request, Reply (RPC pattern)
  • QueueSubscribe (load balancing)
  • JetStream: AddStream, Publish, Subscribe with ack
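
The JetStream bullets above look roughly like this with nats.go (stream, subject, and durable names are placeholders; process is a stand-in handler):

js, _ := nc.JetStream()

// Persistent stream capturing all ORDERS.* subjects.
js.AddStream(&nats.StreamConfig{
	Name:     "ORDERS",
	Subjects: []string{"ORDERS.*"},
})

// Publish is acknowledged once persisted.
js.Publish("ORDERS.created", []byte(`{"id": 1}`))

// Durable consumer with explicit acks.
js.Subscribe("ORDERS.created", func(m *nats.Msg) {
	process(m.Data)
	m.Ack()
}, nats.Durable("workers"), nats.ManualAck())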

RFC-015 Test Coverage:

  • Authentication: Token, username/password, TLS certs
  • Connection resilience: Automatic reconnect
  • Consumer acknowledgments
  • Exactly-once delivery (JetStream)

5. Kafka (Score: 78/100) - Production Event Streaming

Why Implement Fifth:

  • Industry standard: De facto event streaming platform
  • Complex but mature: Well-understood patterns
  • Good Go SDKs: segmentio/kafka-go (pure Go) or confluent-kafka-go (C bindings)
  • Testable: testcontainers support, but slow startup

Go SDK:

// Option 1: segmentio/kafka-go (pure Go)
import "github.com/segmentio/kafka-go"

writer := &kafka.Writer{
	Addr:  kafka.TCP("localhost:9092"),
	Topic: "events",
}

// Option 2: confluent-kafka-go (faster, C deps)
import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

producer, _ := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
})

Data Models Supported:

  • Event streaming (append-only log)
  • Partitioned queues
  • Change data capture (Kafka Connect)
  • Stream processing (Kafka Streams)

Testing Strategy:

func NewKafkaInstance(t *testing.T) *KafkaInstance {
	req := testcontainers.ContainerRequest{
		Image:        "confluentinc/cp-kafka:7.5.0",
		ExposedPorts: []string{"9092/tcp", "9093/tcp"},
		Env: map[string]string{
			"KAFKA_BROKER_ID":         "1",
			"KAFKA_ZOOKEEPER_CONNECT": "zookeeper:2181",
			// ... complex configuration
		},
		WaitingFor: wait.ForLog("started (kafka.server.KafkaServer)").
			WithStartupTimeout(120 * time.Second), // Slow!
	}
	// Starts in 30-60 seconds (needs Zookeeper or KRaft mode)
}

Demo Plugin Operations:

  • Produce with key and value
  • Consume with consumer group
  • Offset management (commit, reset)
  • Partition assignment
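
With segmentio/kafka-go, consumer groups and offset commits look roughly like this (broker, group, and topic are placeholders; process is a stand-in handler):

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"localhost:9092"},
	GroupID: "group1", // joining a group enables rebalancing and committed offsets
	Topic:   "events",
})
defer r.Close()

for {
	// FetchMessage does not auto-commit; commit explicitly after processing.
	m, err := r.FetchMessage(context.Background())
	if err != nil {
		break
	}
	process(m.Value)
	r.CommitMessages(context.Background(), m)
}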

RFC-015 Test Coverage:

  • Authentication: SASL/SCRAM, mTLS
  • Consumer groups: Rebalancing, partition assignment
  • Exactly-once semantics: Idempotent producer, transactional writes
  • High throughput: 10k+ messages/sec

Challenges:

  • Startup time: 30-60 seconds vs <5 seconds for Redis/Postgres
  • Configuration complexity: Many knobs to tune
  • Testing: Requires Zookeeper (or KRaft mode in newer versions)

6. S3/MinIO (Score: 85/100) - Object Storage

Why Implement Sixth:

  • Standard API: S3-compatible API used everywhere
  • MinIO for local: Production-grade S3 alternative
  • Essential for patterns: Claim Check pattern requires object storage
  • Excellent SDKs: AWS SDK v2 and MinIO Go client

Go SDK:

// AWS S3
import "github.com/aws/aws-sdk-go-v2/service/s3"

client := s3.NewFromConfig(cfg)

// MinIO (S3-compatible)
import "github.com/minio/minio-go/v7"

minioClient, _ := minio.New("localhost:9000", &minio.Options{
	Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
})

Data Models Supported:

  • Object storage (key → blob)
  • Metadata (key-value tags per object)
  • Versioning (multiple versions of same key)
  • Lifecycle policies (auto-archival)

Testing Strategy:

func NewMinIOInstance(t *testing.T) *MinIOInstance {
	req := testcontainers.ContainerRequest{
		Image:        "minio/minio:latest",
		ExposedPorts: []string{"9000/tcp", "9001/tcp"},
		Cmd:          []string{"server", "/data", "--console-address", ":9001"},
		Env: map[string]string{
			"MINIO_ROOT_USER":     "minioadmin",
			"MINIO_ROOT_PASSWORD": "minioadmin",
		},
		WaitingFor: wait.ForHTTP("/minio/health/live").WithPort("9000"),
	}
	// Starts in 3-5 seconds
}

Demo Plugin Operations:

  • PutObject, GetObject, DeleteObject
  • ListObjects with prefix
  • Multipart upload (large files)
  • Presigned URLs (temporary access)
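
Presigned URLs are a single call in minio-go. A sketch granting 15 minutes of read access, reusing minioClient from the SDK snippet above (bucket, object, file, and fileSize are placeholders):

// Upload an object first.
_, err := minioClient.PutObject(ctx, "demo-bucket", "report.pdf",
	file, fileSize, minio.PutObjectOptions{ContentType: "application/pdf"})

// Generate a temporary GET URL; the recipient needs no credentials.
shareURL, err := minioClient.PresignedGetObject(ctx, "demo-bucket", "report.pdf",
	15*time.Minute, nil)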

RFC-015 Test Coverage:

  • Authentication: Access key + secret key
  • Large objects: Multipart upload, streaming
  • Versioning: Multiple versions of same key
  • Lifecycle: Expiration policies

Use Cases:

  • Claim Check pattern (store large payloads)
  • Tiered storage (archive cold data)
  • Backup and recovery

7. ClickHouse (Score: 70/100) - Analytical Queries

Why Implement Seventh:

  • Specialized: Columnar database for analytics
  • Fast for aggregations: OLAP queries
  • Decent Go SDK: clickhouse-go is maintained
  • Testable: testcontainers support

Go SDK:

import "github.com/ClickHouse/clickhouse-go/v2"

conn, _ := clickhouse.Open(&clickhouse.Options{
Addr: []string{"localhost:9000"},
Auth: clickhouse.Auth{
Database: "default",
Username: "default",
Password: "",
},
})

Data Models Supported:

  • TimeSeries (high-cardinality metrics)
  • Columnar (fast aggregations)
  • Event logs (append-only)

Testing Strategy:

func NewClickHouseInstance(t *testing.T) *ClickHouseInstance {
	req := testcontainers.ContainerRequest{
		Image:        "clickhouse/clickhouse-server:latest",
		ExposedPorts: []string{"9000/tcp", "8123/tcp"},
		WaitingFor:   wait.ForLog("Ready for connections"),
	}
	// Starts in 5-10 seconds
}

Demo Plugin Operations:

  • INSERT (batch inserts for performance; see the sketch after this list)
  • SELECT with aggregations (SUM, AVG, percentile)
  • Time-based queries (toStartOfHour, toDate)
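
Batched writes are the idiomatic ClickHouse path. A sketch with clickhouse-go, reusing conn from above (the events table and row fields are assumptions):

// Assumes: CREATE TABLE events (ts DateTime, name String, value Float64)
//          ENGINE = MergeTree ORDER BY ts
batch, err := conn.PrepareBatch(ctx, "INSERT INTO events (ts, name, value)")
if err != nil {
	log.Fatal(err)
}
for _, e := range rows {
	batch.Append(e.TS, e.Name, e.Value) // buffered client-side
}
// One network round trip flushes the whole batch.
if err := batch.Send(); err != nil {
	log.Fatal(err)
}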

RFC-015 Test Coverage:

  • Authentication: Username/password
  • Batch inserts: 10k+ rows/sec
  • Complex queries: Joins, aggregations
  • Compression: Verify data compression

Use Cases:

  • Metrics and observability
  • Log aggregation
  • Business intelligence

8. Neptune (Score: 50/100) - Graph Database (AWS)

Why Implement Last:

  • AWS-only: No local testing without AWS account
  • Complex protocol: Gremlin (graph traversal language)
  • Limited Go support: gremlin-go is less mature
  • Expensive to test: AWS charges, no free tier for Neptune

Go SDK:

import "github.com/apache/tinkerpop/gremlin-go/v3/driver"

remote, _ := gremlingo.NewDriverRemoteConnection("ws://localhost:8182/gremlin")
g := gremlingo.Traversal_().WithRemote(remote)

Data Models Supported:

  • Graph (vertices, edges, properties)
  • Property graph model (Gremlin)
  • RDF triples (SPARQL)

Testing Strategy:

// Problem: No good local testing option
// Option 1: Mock Gremlin responses (not ideal)
// Option 2: Use TinkerPop Gremlin Server (complex setup)
// Option 3: Fake AWS Neptune with localstack (limited support)

func NewNeptuneInstance(t *testing.T) *NeptuneInstance {
	// Best option: Use Gremlin Server (JVM-based)
	req := testcontainers.ContainerRequest{
		Image:        "tinkerpop/gremlin-server:latest",
		ExposedPorts: []string{"8182/tcp"},
		WaitingFor:   wait.ForLog("Channel started at port 8182"),
	}
	// Starts in 10-15 seconds (JVM startup)
}

Demo Plugin Operations:

  • AddVertex, AddEdge (create graph elements)
  • Graph traversals: g.V().has('name', 'Alice').out('knows')
  • Path queries: Shortest path, neighbors
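
The g.V().has('name', 'Alice').out('knows') traversal above translates to gremlin-go roughly as follows, reusing g from the SDK snippet (vertex data is assumed to exist; details may vary between gremlin-go versions):

// Seed a vertex; Iterate() returns a channel that yields the terminal error.
if err := <-g.AddV("person").Property("name", "Alice").Iterate(); err != nil {
	log.Fatal(err)
}

// Who does Alice know?
results, err := g.V().Has("name", "Alice").Out("knows").Values("name").ToList()
if err == nil {
	for _, r := range results {
		log.Println(r.GetString())
	}
}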

RFC-015 Test Coverage:

  • Authentication: IAM-based (AWS SigV4)
  • Graph traversals: Verify Gremlin queries
  • Transactions: Graph mutations

Challenges:

  • No free local testing
  • Gremlin learning curve
  • Limited Go ecosystem
  • Difficult to seed test data

Recommendation: Defer until other backends are stable.


Implementation Roadmap

Phase 0: Baseline Plugin (Week 1) 🔥 INTERNAL

Priority: MemStore

Rationale:

  • Zero external dependencies: Pure Go implementation
  • Fastest feedback loop: No container startup time
  • Reference implementation: Establishes plugin patterns
  • Testing foundation: Validates RFC-015 test framework immediately

Deliverables:

  • Complete in-memory plugin with RFC-015 test suite
  • Plugin interface patterns documented
  • Thread-safe concurrent operations verified
  • Sub-microsecond latency baseline established

Phase 1: Internal Messaging (Weeks 2-6) 🔥 INTERNAL

Priority: Kafka → NATS

Rationale:

  • Kafka: Internal event streaming requirement, production-critical
  • NATS: Internal pub/sub messaging, lightweight alternative
  • Both needed: Different use cases, complementary patterns

Deliverables:

  • Kafka plugin with consumer groups, partitioning, exactly-once
  • NATS plugin with JetStream support
  • PubSub and queue patterns working end-to-end
  • High-throughput verification (10k+ rps)

Phase 2: Internal Data Storage (Weeks 7-10) 🔥 INTERNAL

Priority: PostgreSQL → Neptune

Rationale:

  • PostgreSQL: Internal relational data requirement, ACID transactions
  • Neptune: Internal graph data requirement, specialized use case
  • Production parity: Both backends mirror production environment

Deliverables:

  • PostgreSQL plugin with transaction support, LISTEN/NOTIFY, JSON
  • Neptune plugin with Gremlin traversals (AWS SDK)
  • Outbox pattern implementation (PostgreSQL)
  • Graph query patterns (Neptune)

Phase 3: External/Supporting Backends (Weeks 11-14)

Priority: Redis → SQLite → S3/MinIO

Rationale:

  • Redis: General-purpose cache, widely used
  • SQLite: Embedded testing, CI/CD optimization
  • S3/MinIO: Claim Check pattern support

Deliverables:

  • Redis plugin for caching patterns
  • SQLite plugin for embedded demos
  • S3/MinIO plugin for large payload handling
  • Claim Check pattern implementation

Phase 4: Analytics (Weeks 15-16)

Priority: ClickHouse

Rationale:

  • Observability and metrics use cases
  • TimeSeries data model
  • Optional: Lower priority than internal needs

Deliverables:

  • ClickHouse plugin for analytical queries
  • Metrics aggregation patterns
  • Batch insert optimization

Demo Plugin Configurations

Demo 0: MemStore In-Memory KeyValue (Simplest) 🔥 INTERNAL

Purpose: Demonstrate simplest possible plugin with zero external dependencies

# config/demo-memstore.yaml
namespaces:
  - name: dev-cache
    pattern: keyvalue

    needs:
      latency: <1μs
      ttl: true
      persistence: false

    backend:
      type: memstore
      # No configuration needed - runs in-process

Client Code:

// Demo: Instant GET/SET operations
client.Set("session:abc123", sessionData, 5*time.Minute)
value := client.Get("session:abc123")

// No Docker, no containers, no startup time

Test Focus:

  • Zero-dependency testing
  • Thread-safe concurrency (sync.Map)
  • TTL expiration
  • Performance baseline (<1μs latency)

Use Cases:

  • CI/CD: Fastest test execution (no container overhead)
  • Learning: Reference implementation for new plugin developers
  • Rapid prototyping: Test proxy and client patterns instantly
  • Local dev: No Docker required

Demo 1: Redis KeyValue Store

Purpose: Show the simplest networked-backend plugin (MemStore above covers the in-process case)

# config/demo-redis.yaml
namespaces:
  - name: cache
    pattern: keyvalue

    needs:
      latency: <1ms
      ttl: true

    backend:
      type: redis
      host: localhost
      port: 6379

Client Code:

// Demo: GET/SET operations
client.Set("user:123", userData, 300*time.Second) // 5 min TTL
value := client.Get("user:123")

Test Focus:

  • Authentication (password)
  • TTL expiration
  • Connection pooling

Demo 2: PostgreSQL with Transactions

Purpose: Show transactional reliability

namespaces:
  - name: orders
    pattern: transactional-queue

    needs:
      consistency: strong
      durability: fsync

    backend:
      type: postgres
      database: orders

Client Code:

// Demo: Outbox pattern
tx := client.BeginTx()
tx.Execute("INSERT INTO orders (...) VALUES (...)")
tx.Publish("order-events", orderEvent)
tx.Commit() // Atomic

Test Focus:

  • ACID transactions
  • Outbox pattern verification
  • Rollback behavior
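
A common shape for the outbox table behind this demo is sketched below. The DDL is illustrative, not the plugin's actual schema: tx.Publish becomes an INSERT into this table inside the same transaction as the business write, and a background relay forwards rows where published_at IS NULL to the queue.

// Hypothetical outbox DDL, shown as a Go constant; executed once at
// namespace provisioning time.
const createOutbox = `
CREATE TABLE IF NOT EXISTS outbox (
    id           BIGSERIAL PRIMARY KEY,
    topic        TEXT        NOT NULL,
    payload      JSONB       NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    published_at TIMESTAMPTZ -- NULL until the relay ships it to the queue
)`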

Demo 3: Kafka Event Streaming

Purpose: Show high-throughput messaging

namespaces:
  - name: events
    pattern: event-stream

    needs:
      throughput: 10k rps
      retention: 7days
      ordered: true

    backend:
      type: kafka
      brokers: [localhost:9092]
      partitions: 10

Client Code:

// Demo: Producer
for i := 0; i < 10000; i++ {
	client.Publish("events", event)
}

// Demo: Consumer with consumer group
for event := range client.Subscribe("events", "group1") {
	process(event)
	event.Ack()
}

Test Focus:

  • Consumer groups
  • Partitioning
  • Offset management

Demo 4: S3 Large Payload (Claim Check)

Purpose: Show pattern composition

namespaces:
  - name: videos
    pattern: pubsub

    needs:
      max_message_size: 5GB
      storage_backend: s3

    backend:
      type: kafka # For metadata
      storage:
        type: s3
        bucket: prism-claim-checks

Client Code:

// Demo: Transparent large payload handling
video := loadVideo("movie.mp4") // 2GB
client.Publish("videos", video) // Prism stores in S3 automatically

// Consumer gets full video
event := client.Subscribe("videos")
video := event.Payload // Prism fetches from S3 transparently

Test Focus:

  • Claim Check pattern
  • S3 upload/download
  • Cleanup after consumption

Demo 5: Multi-Backend Composition

Purpose: Show layered architecture power

namespaces:
  - name: ml-models
    pattern: pubsub

    needs:
      consistency: strong   # → Outbox (Postgres)
      max_message_size: 5GB # → Claim Check (S3)
      durability: strong    # → WAL
      retention: 30days     # → Tiered Storage

    backends:
      transactional: postgres
      storage: s3
      queue: kafka

Client Code:

// Demo: All patterns composed automatically
tx := client.BeginTx()
tx.Execute("INSERT INTO model_registry ...")
tx.Publish("model-releases", modelWeights) // 2GB payload
tx.Commit()

// Prism automatically:
// 1. Writes to WAL (durability)
// 2. Stores model in S3 (claim check)
// 3. Inserts to Postgres outbox (transactional)
// 4. Publishes to Kafka (queue)

Test Focus:

  • Pattern composition
  • End-to-end flow
  • Failure recovery

Testing Infrastructure Requirements

Docker Compose for Local Testing

# docker-compose.test.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_PASSWORD: testpass
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD", "pg_isready"]

  nats:
    image: nats:2-alpine
    ports:
      - "4222:4222"
    healthcheck:
      test: ["CMD", "wget", "-q", "-O-", "http://localhost:8222/healthz"]

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    ports:
      - "9092:9092"
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - "9000:9000"
      - "9001:9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]

  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "9002:9000" # host port 9002 avoids clashing with MinIO on 9000
      - "8123:8123"
    healthcheck:
      test: ["CMD", "wget", "-q", "-O-", "http://localhost:8123/ping"]

Usage:

# Start all backends
docker-compose -f docker-compose.test.yml up -d

# Run acceptance tests
go test ./tests/acceptance/... -v

# Stop all backends
docker-compose -f docker-compose.test.yml down

Appendix: Go SDK Comparison

Package Recommendations

| Backend | Primary SDK | Alternative | Notes |
|---------|-------------|-------------|-------|
| Redis | github.com/redis/go-redis/v9 | github.com/gomodule/redigo | go-redis is modern; v9+ uses context |
| PostgreSQL | github.com/jackc/pgx/v5 | github.com/lib/pq | pgx is faster, with better error handling |
| SQLite | github.com/mattn/go-sqlite3 | modernc.org/sqlite (pure Go) | mattn requires CGO but is faster |
| NATS | github.com/nats-io/nats.go (official) | - | Official client, well-maintained |
| Kafka | github.com/segmentio/kafka-go | github.com/confluentinc/confluent-kafka-go/v2 | segmentio is pure Go; confluent has C deps but is faster |
| S3 | github.com/aws/aws-sdk-go-v2 | github.com/minio/minio-go/v7 | AWS SDK for production, MinIO for dev |
| ClickHouse | github.com/ClickHouse/clickhouse-go/v2 | - | Official client |
| Neptune | github.com/apache/tinkerpop/gremlin-go | - | Gremlin traversal language |

Installation

# Redis
go get github.com/redis/go-redis/v9

# PostgreSQL
go get github.com/jackc/pgx/v5

# SQLite
go get github.com/mattn/go-sqlite3

# NATS
go get github.com/nats-io/nats.go

# Kafka
go get github.com/segmentio/kafka-go

# S3
go get github.com/aws/aws-sdk-go-v2/service/s3
go get github.com/minio/minio-go/v7

# ClickHouse
go get github.com/ClickHouse/clickhouse-go/v2

# testcontainers
go get github.com/testcontainers/testcontainers-go

Summary

Implementation Priority (Internal Needs First):

Internal Priority (Must Have):

  0. 🔥 MemStore (Score: 100) - Start here: zero dependencies, instant testing
  1. 🔥 Kafka (Score: 78) - Internal messaging requirement, event streaming
  2. 🔥 NATS (Score: 90) - Internal pub/sub requirement, lightweight
  3. 🔥 PostgreSQL (Score: 93) - Internal relational data, ACID transactions
  4. 🔥 Neptune (Score: 50) - Internal graph data requirement

External/Supporting (Nice to Have):

  5. ⏭️ Redis (Score: 95) - General caching, external clients
  6. ⏭️ SQLite (Score: 92) - Embedded testing, CI/CD optimization
  7. ⏭️ S3/MinIO (Score: 85) - Claim Check pattern support
  8. ⏭️ ClickHouse (Score: 70) - Analytics and observability

Key Takeaways:

  • Start with MemStore: Zero external dependencies, establishes plugin patterns
  • Prioritize internal needs: Kafka, NATS, PostgreSQL, Neptune are production-critical
  • Use testcontainers: For all backends except MemStore (in-process) and SQLite (embedded)
  • Build acceptance tests: Alongside plugin implementation using RFC-015 framework
  • Validate patterns early: MemStore enables immediate pattern validation without infrastructure
  • Phase external backends: Redis, SQLite, S3, ClickHouse after internal needs satisfied


Revision History

  • 2025-10-10: Implementation Update - Documented architecture refactoring (drivers vs patterns), interface metadata system, SDK pattern for drivers, and interface-based acceptance testing
  • 2025-10-09: Re-prioritized backends based on internal needs (Kafka, NATS, Neptune, PostgreSQL first); added MemStore in-memory plugin as simplest reference implementation
  • 2025-10-09: Initial draft with backend comparison matrix, implementability scoring, and demo plugin configurations

Implementation Learnings (2025-10-10)

Architecture Refactoring: Drivers vs Patterns

Problem: Initial implementation conflated backend drivers (Redis, Postgres) with patterns (KeyValue, PubSub).

Solution: Established a clear separation of layers:

Backends (Redis, PostgreSQL, Kafka)
    ↑ accessed by
Backend Drivers (drivers/redis, drivers/postgres)
    ↑ wrapped by
Patterns (patterns/keyvalue, patterns/pubsub)
    ↑ consumed by
Applications (client code)

Directory Structure:

  • drivers/memstore/ - In-memory driver (moved from patterns/memstore)
  • drivers/redis/ - Redis driver (moved from patterns/redis)
  • patterns/keyvalue/ - KeyValue pattern that wraps backend drivers
  • patterns/pubsub/ - PubSub pattern that wraps NATS/Kafka drivers
  • patterns/stream/ - Stream pattern that wraps Kafka driver

Key Insight: Patterns are client-facing abstractions. Drivers are backend-specific implementations. One pattern can use multiple driver backends.

Backend Interface Metadata System

Implemented: patterns/core/interfaces.go - Declarative metadata for pattern slot matching

45 Backend Interface Constants:

// KeyValue interfaces (6)
InterfaceKeyValueBasic // Set, Get, Delete, Exists
InterfaceKeyValueScan // Scan, ScanKeys, Count
InterfaceKeyValueTTL // Expire, GetTTL, Persist
InterfaceKeyValueTransactional // BeginTx, Commit, Rollback
InterfaceKeyValueBatch // BatchSet, BatchGet, BatchDelete
InterfaceKeyValueCAS // CompareAndSwap

// PubSub interfaces (5)
InterfacePubSubBasic // Publish, Subscribe, Unsubscribe
InterfacePubSubWildcards // Pattern subscriptions
InterfacePubSubPersistent // Durable messages with offsets
InterfacePubSubFiltering // Server-side filtering
InterfacePubSubOrdering // Message ordering guarantees

// Stream interfaces (5)
InterfaceStreamBasic // Append, Read, GetLatestOffset
InterfaceStreamConsumerGroups // CreateGroup, Join, Ack
InterfaceStreamReplay // SeekToOffset, SeekToTimestamp
InterfaceStreamRetention // SetRetention, Compact
InterfaceStreamPartitioning // GetPartitions, AppendToPartition

// ...and 29 more across Queue, List, Set, SortedSet, TimeSeries, Graph, Document

Driver Metadata Example (MemStore):

func (m *MemStore) Metadata() *core.BackendInterfaceMetadata {
	return &core.BackendInterfaceMetadata{
		DriverName: "memstore",
		Version:    "0.1.0",
		Interfaces: []core.BackendInterface{
			core.InterfaceKeyValueBasic, // Set, Get, Delete, Exists
			core.InterfaceKeyValueTTL,   // TTL support
		},
		Description:            "In-memory Go map for local testing",
		ConnectionStringFormat: "mem://local",
	}
}

Driver Metadata Example (Redis - 24 interfaces!):

func (r *RedisPattern) Metadata() *core.BackendInterfaceMetadata {
	return &core.BackendInterfaceMetadata{
		DriverName: "redis",
		Version:    "0.1.0",
		Interfaces: []core.BackendInterface{
			// KeyValue (5 of 6)
			core.InterfaceKeyValueBasic, core.InterfaceKeyValueScan,
			core.InterfaceKeyValueTTL, core.InterfaceKeyValueTransactional,
			core.InterfaceKeyValueBatch,

			// PubSub (2 of 5)
			core.InterfacePubSubBasic, core.InterfacePubSubWildcards,

			// Stream (4 of 5)
			core.InterfaceStreamBasic, core.InterfaceStreamConsumerGroups,
			core.InterfaceStreamReplay, core.InterfaceStreamRetention,

			// List (4 of 4)
			core.InterfaceListBasic, core.InterfaceListIndexing,
			core.InterfaceListRange, core.InterfaceListBlocking,

			// Set (4 of 4)
			core.InterfaceSetBasic, core.InterfaceSetOperations,
			core.InterfaceSetCardinality, core.InterfaceSetRandom,

			// SortedSet (5 of 5)
			core.InterfaceSortedSetBasic, core.InterfaceSortedSetRange,
			core.InterfaceSortedSetRank, core.InterfaceSortedSetOperations,
			core.InterfaceSortedSetLex,
		},
		Description:            "In-memory data structure store with persistence",
		ConnectionStringFormat: "redis://host:port/db or rediss://host:port/db (TLS)",
	}
}

Usage: At configuration time, patterns can query driver metadata to match requirements:

// Pattern requires these interfaces for a slot
required := []core.BackendInterface{
core.InterfaceKeyValueBasic,
core.InterfaceKeyValueTTL,
}

// Check if driver implements all required interfaces
driver := memstore.New()
metadata := driver.Metadata()
if metadata.ImplementsAll(required) {
// Driver is suitable for this pattern slot
}
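
ImplementsAll can be a simple set-membership check. A possible sketch, assuming the BackendInterfaceMetadata struct shown above (the actual patterns/core implementation may differ):

// ImplementsAll reports whether every required interface appears in the
// driver's declared interface list.
func (m *BackendInterfaceMetadata) ImplementsAll(required []BackendInterface) bool {
	declared := make(map[BackendInterface]bool, len(m.Interfaces))
	for _, iface := range m.Interfaces {
		declared[iface] = true
	}
	for _, req := range required {
		if !declared[req] {
			return false
		}
	}
	return true
}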

SDK Pattern for Backend Drivers

Implemented: patterns/core/serve.go - ServeBackendDriver() function

Before (65 lines of boilerplate in every driver):

func main() {
	configPath := flag.String("config", "config.yaml", ...)
	grpcPort := flag.Int("grpc-port", 0, ...)
	flag.Parse()

	logger := slog.New(slog.NewJSONHandler(os.Stdout, ...))
	slog.SetDefault(logger)

	config, err := core.LoadConfig(*configPath)
	if err != nil {
		// Create default config...
	}

	if *grpcPort != 0 {
		config.ControlPlane.Port = *grpcPort
	}

	plugin := memstore.New()

	if err := core.BootstrapWithConfig(plugin, config); err != nil {
		log.Fatal(err)
	}
}

After (25 lines with SDK pattern):

func main() {
	core.ServeBackendDriver(func() core.Plugin {
		return memstore.New()
	}, core.ServeOptions{
		DefaultName:    "memstore",
		DefaultVersion: "0.1.0",
		DefaultPort:    0, // Dynamic port allocation
		ConfigPath:     "config.yaml",
	})
}

SDK Handles:

  • Flag parsing (--config, --grpc-port, --debug)
  • Logging setup (structured JSON logging)
  • Config loading with defaults
  • Dynamic port allocation (0 = OS assigns available port)
  • Driver lifecycle (Initialize → Start → Stop)
  • Error handling and logging

Result: Reduced driver main.go from 65 lines to 25 lines (62% reduction). All common logic moved to SDK.

Interface-Based Acceptance Testing

Implemented: tests/acceptance/interfaces/ - Test interfaces across multiple backends

Structure:

// tests/acceptance/interfaces/keyvalue_basic_test.go

// Define interface being tested
type KeyValueBasicDriver interface {
	Set(key string, value []byte, ttlSeconds int64) error
	Get(key string) ([]byte, bool, error)
	Delete(key string) error
	Exists(key string) (bool, error)
}

// Backend driver setup table
backendDrivers := []BackendDriverSetup{
	{
		Name:         "Redis",
		SetupFunc:    setupRedisDriver, // Uses testcontainers
		SupportsTTL:  true,
		SupportsScan: true,
	},
	{
		Name:         "MemStore",
		SetupFunc:    setupMemStoreDriver, // In-process, no container
		SupportsTTL:  true,
		SupportsScan: false, // Intentionally minimal
	},
	// Add PostgreSQL, DynamoDB, etcd here...
}

// Single test runs against ALL backends
for _, backend := range backendDrivers {
	t.Run(backend.Name, func(t *testing.T) {
		driver, cleanup := backend.SetupFunc(t, ctx)
		defer cleanup()

		// Test Set/Get
		err := driver.Set("test-key", []byte("test-value"), 0)
		require.NoError(t, err)

		value, found, err := driver.Get("test-key")
		require.NoError(t, err)
		assert.True(t, found)
		assert.Equal(t, "test-value", string(value))
	})
}

Benefits:

  • Single test suite validates multiple backend drivers
  • Easy to add new backends: 3 lines of code in table
  • Interface compliance verification: Ensures drivers implement contracts correctly
  • Consistent behavior: Same assertions across all backends

Test Files:

  • keyvalue_basic_test.go - Tests KeyValue basic operations (Set, Get, Delete, Exists)
  • keyvalue_ttl_test.go - Tests TTL expiration (only runs on TTL-supporting backends)

Key Insight: Test the interface (KeyValue), not the backend (Redis). Backends are interchangeable implementations.

Updated Terminology (MEMO-006 Alignment)

Documented in: patterns/core/plugin.go

// Plugin represents a backend driver lifecycle.
//
// TERMINOLOGY (from MEMO-006):
// - Backend: The actual storage/messaging system (Redis, PostgreSQL, Kafka, NATS, etc.)
// - Backend Driver: The Go implementation that interfaces with a backend (this interface)
// - Pattern: The data access pattern being implemented (KeyValue, PubSub, Stream, etc.)
// - Interface: Thin proto service definitions (keyvalue_basic, keyvalue_ttl, pubsub_basic, etc.)
//
// A Backend Driver (Plugin):
// - Connects to a specific Backend (e.g., Redis, PostgreSQL)
// - Implements one or more Patterns (e.g., KeyValue, PubSub)
// - Supports multiple Interfaces within those patterns (e.g., keyvalue_basic + keyvalue_ttl)
//
// Example: The Redis backend driver implements:
// - KeyValue pattern (keyvalue_basic, keyvalue_scan, keyvalue_ttl interfaces)
// - PubSub pattern (pubsub_basic, pubsub_wildcards interfaces)
// - Stream pattern (stream_basic, stream_consumer_groups interfaces)

Implementation Status

Completed:

  • ✅ MemStore driver with metadata (drivers/memstore/)
  • ✅ Redis driver with metadata (drivers/redis/)
  • ✅ KeyValue pattern (patterns/keyvalue/)
  • ✅ Backend interface metadata system (patterns/core/interfaces.go)
  • ✅ SDK pattern for drivers (patterns/core/serve.go)
  • ✅ Interface-based acceptance tests (tests/acceptance/interfaces/)
  • ✅ Both drivers compile and build successfully

Next Steps:

  • Create PostgreSQL driver (drivers/postgres/)
  • Create Meilisearch driver for search (drivers/meilisearch/)
  • Create PubSub pattern (patterns/pubsub/) using NATS driver
  • Create Stream pattern (patterns/stream/) using Kafka driver
  • Add more interface-based tests (batch, scan, transactional)
  • Run full acceptance test suite