MEMO-086: Composable Pattern Architecture with SessionManager
Executive Summary
This memo describes Prism's composable pattern architecture, where a single namespace can compose multiple patterns together with SessionManager as a first-class pattern. SessionManager uses slot-based backend configuration to declare dependencies (session storage, event publishing) and produces session lifecycle events. Other patterns (KeyValue, Producer, Consumer) become session-aware, operating within authenticated session contexts.
Key Innovation: Patterns compose like microservices. SessionManager provides authentication infrastructure through declared slots, while data patterns proxy operations through the session context.
Context
Phase 2 Vault integration is complete with full authentication infrastructure:
- pkg/authz/ package with SessionManager
- JWT validation and Vault token exchange
- Dynamic credential generation and renewal
- Service identity authentication (K8s + AWS)
Architectural Shift: Instead of integrating SessionManager into each pattern separately, we treat patterns as composable components. A single namespace can declare multiple patterns that work together, with SessionManager as a composable pattern that provides authentication infrastructure through slot-based dependencies.
Composable Pattern Concepts
1. Single Namespace, Multiple Patterns
namespaces:
  - name: team-alpha
    description: "Team Alpha's authenticated data workspace"
    # Multiple patterns composed in one namespace
    patterns:
      - type: session-manager # Authentication infrastructure
      - type: keyvalue # Data pattern (session-aware)
      - type: producer # Messaging pattern (session-aware)
      - type: consumer # Messaging pattern (session-aware)
2. Slot-Based Backend Configuration
Patterns declare requires (dependencies) and produces (outputs):
# SessionManager pattern
- type: session-manager
  version: 0.1.0
  # Dependencies: What backends this pattern needs
  requires:
    - slot: session-store # Where to store session metadata
      pattern: keyvalue
      backend: redis
      config:
        host: redis-sessions.prism.internal
        port: 6379
  # Outputs: What this pattern produces
  produces:
    - slot: session-events # Where to publish session lifecycle events
      pattern: producer
      backend: nats
      config:
        servers: ["nats://nats.prism.internal:4222"]
        subject: "prism.sessions.{namespace}.{event}"
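The subject value above is a template with {namespace} and {event} placeholders. As a minimal sketch of how such a template might be expanded before publishing (expandSubject is a hypothetical helper, not part of Prism's API, and the memo does not say where the expansion happens):

```go
package main

import (
	"fmt"
	"strings"
)

// expandSubject fills the {namespace} and {event} placeholders of a subject
// template. Hypothetical helper for illustration only.
func expandSubject(tmpl, namespace, event string) string {
	r := strings.NewReplacer("{namespace}", namespace, "{event}", event)
	return r.Replace(tmpl)
}

func main() {
	subject := expandSubject("prism.sessions.{namespace}.{event}", "team-alpha", "created")
	fmt.Println(subject) // prism.sessions.team-alpha.created
}
```

A consumer could then subscribe to a wildcard such as prism.sessions.team-alpha.* to receive every lifecycle event for one namespace, since NATS treats dots as subject token separators.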
3. Session-Aware Patterns
Data patterns mark themselves as session-aware to operate within session context:
# KeyValue pattern (session-aware)
- type: keyvalue
  version: 0.1.0
  session_aware: true # Operations use session credentials
  config:
    backend: postgres
    backend_config:
      host: postgres.prism.internal
      # No credentials - fetched from session!
Benefits of Composable Patterns
- Separation of Concerns: SessionManager handles auth, data patterns handle operations
- Flexible Composition: Mix and match patterns in any namespace
- Explicit Dependencies: Slots make backend requirements clear
- Reusable Infrastructure: SessionManager works with any data pattern
- Testable Components: Each pattern can be tested independently
- Observable Lifecycle: Session events provide audit trail
Integration Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ Composable Namespace: team-alpha │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ SessionManager Pattern (Infrastructure) │ │
│ │ │ │
│ │ Requires: │ │
│ │ [session-store] ──▶ Redis (session metadata) │ │
│ │ │ │
│ │ Produces: │ │
│ │ [session-events] ──▶ NATS (lifecycle events) │ │
│ │ │ │
│ │ Functions: │ │
│ │ • JWT validation ──▶ TokenValidator + JWKS cache │ │
│ │ • Vault token exchange ──▶ VaultClient │ │
│ │ • Credential generation ──▶ BackendCredentials │ │
│ │ • Session lifecycle ──▶ Create/Renew/Revoke/Expire │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ Provides session context │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Session-Aware Data Patterns │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ KeyValue │ │ Producer │ │ Consumer │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ Postgres │ │ Kafka │ │ Kafka │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ Uses: │ │ Uses: │ │ Uses: │ │ │
│ │ │ • Session │ │ • Session │ │ • Session │ │ │
│ │ │ creds │ │ creds │ │ creds │ │ │
│ │ │ • User ID │ │ • User ID │ │ • User ID │ │ │
│ │ │ • Namespace │ │ • Namespace │ │ • Namespace │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ Request Flow: │
│ 1. Client ──[JWT]──▶ SessionManager ──[Validate]──▶ Create Session│
│ 2. SessionManager ──[Store]──▶ Redis (session-store slot) │
│ 3. SessionManager ──[Publish]──▶ NATS (session-events slot) │
│ 4. SessionManager ──[Provide]──▶ KeyValue/Producer/Consumer │
│ 5. Pattern ──[Execute]──▶ Backend (with session credentials) │
│ │
└─────────────────────────────────────────────────────────────────────┘
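The five-step request flow in the diagram can be sketched with in-memory stand-ins for the two slots. Everything in this sketch is hypothetical (the Session fields, the SessionStore and EventPublisher interfaces, the fake validator, the placeholder credential); it only illustrates the ordering of validate, store, publish, provide, and execute, not the actual pkg/authz implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Session is a hypothetical, trimmed-down session record.
type Session struct {
	ID       string
	UserID   string
	Username string // dynamic backend credential (placeholder value here)
}

// SessionStore stands in for the session-store slot (Redis in the memo).
type SessionStore interface {
	Put(s Session) error
}

// EventPublisher stands in for the session-events slot (NATS in the memo).
type EventPublisher interface {
	Publish(event string, s Session) error
}

type memStore struct{ sessions map[string]Session }

func (m *memStore) Put(s Session) error {
	m.sessions[s.ID] = s
	return nil
}

type memPublisher struct{ events []string }

func (m *memPublisher) Publish(event string, s Session) error {
	m.events = append(m.events, event+":"+s.ID)
	return nil
}

// SessionManager wires the two slots to a token validation function.
type SessionManager struct {
	store    SessionStore
	events   EventPublisher
	validate func(jwt string) (userID string, err error)
}

// CreateSession walks steps 1-4 of the request flow.
func (m *SessionManager) CreateSession(jwt string) (Session, error) {
	userID, err := m.validate(jwt) // step 1: validate the JWT
	if err != nil {
		return Session{}, fmt.Errorf("validate token: %w", err)
	}
	s := Session{ID: "sess-" + userID, UserID: userID, Username: "v-" + userID}
	if err := m.store.Put(s); err != nil { // step 2: session-store slot
		return Session{}, fmt.Errorf("store session: %w", err)
	}
	if err := m.events.Publish("created", s); err != nil { // step 3: session-events slot
		return Session{}, fmt.Errorf("publish event: %w", err)
	}
	return s, nil // step 4: session context handed to the data pattern
}

// keyValueGet represents step 5: a data pattern acting with session credentials.
func keyValueGet(s Session, key string) string {
	return fmt.Sprintf("GET %s as %s", key, s.Username)
}

func main() {
	store := &memStore{sessions: map[string]Session{}}
	pub := &memPublisher{}
	mgr := &SessionManager{store: store, events: pub, validate: func(jwt string) (string, error) {
		if jwt == "" {
			return "", errors.New("empty token")
		}
		return "alice", nil // a real validator would verify signature and claims
	}}
	s, err := mgr.CreateSession("example-jwt")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(keyValueGet(s, "user:1"), pub.events)
}
```

Keeping the slots behind small interfaces is one way to let the same SessionManager logic run against Redis and NATS in production and against fakes in tests.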
Configuration Schema
Complete Composable Namespace Configuration
# namespace-config.yaml
namespaces:
  - name: team-alpha
    description: "Team Alpha's authenticated data workspace"
    # Multiple patterns composed together
    patterns:
      # 1. SessionManager Pattern (Infrastructure)
      - type: session-manager
        version: 0.1.0
        config:
          # Token validation
          token:
            issuer: https://dex.prism.local:5556/dex
            audience: prism-patterns
            cache_ttl: 1h
          # Vault configuration
          vault:
            address: https://vault.prism.internal:8200
            jwt_auth:
              role: prism-patterns
              auth_path: auth/jwt
            credentials:
              secret_path: database/creds/team-alpha-role
              renew_interval: 1800s # 30 minutes
            tls:
              enabled: true
              ca_cert: /etc/prism/vault-ca.pem
          # Session management
          session:
            ttl: 1h
            idle_timeout: 30m
            cleanup_interval: 15m
        # Required backend slots
        requires:
          - slot: session-store
            pattern: keyvalue
            backend: redis
            config:
              host: redis-sessions.prism.internal
              port: 6379
              db: 0
        # Produced event slots
        produces:
          - slot: session-events
            pattern: producer
            backend: nats
            config:
              servers: ["nats://nats.prism.internal:4222"]
              subject: "prism.sessions.{namespace}.{event}"
              # Events: created, authenticated, credential_renewed, expired, revoked
      # 2. KeyValue Pattern (Session-Aware)
      - type: keyvalue
        version: 0.1.0
        session_aware: true # Requires SessionManager in namespace
        config:
          backend: postgres
          backend_config:
            host: postgres.prism.internal
            port: 5432
            database: team_alpha_data
            # NO CREDENTIALS - fetched from session!
      # 3. Producer Pattern (Session-Aware)
      - type: producer
        version: 0.1.0
        session_aware: true
        config:
          backend: kafka
          backend_config:
            brokers: ["kafka.prism.internal:9092"]
            # NO CREDENTIALS - fetched from session!
      # 4. Consumer Pattern (Session-Aware)
      - type: consumer
        version: 0.1.0
        session_aware: true
        config:
          backend: kafka
          backend_config:
            brokers: ["kafka.prism.internal:9092"]
            group_id: "team-alpha-consumers"
            # NO CREDENTIALS - fetched from session!
Configuration Validation Rules
- SessionManager Requirements:
  - MUST have session-store slot (KeyValue backend)
  - SHOULD have session-events slot (Producer backend)
- Session-Aware Patterns:
  - MUST NOT specify credentials in config
  - REQUIRE SessionManager in namespace
  - RECEIVE credentials from session context
- Slot Naming:
  - Must be unique within namespace
  - Follow pattern: {purpose}-{type} (e.g., session-store, audit-events)
- Namespace Composition:
  - Max 1 SessionManager per namespace
  - Unlimited session-aware patterns
  - SessionManager must be declared first
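Most of the validation rules above are mechanical enough to check in a single pass over a namespace's patterns. The sketch below is illustrative only: the Slot and Pattern types and validateNamespace are hypothetical and are not the ComposableNamespace or PatternSlot types in pkg/config, and treating the keys username and password as "credentials" is an assumption. The SHOULD rule and the slot naming pattern are not checked.

```go
package main

import (
	"errors"
	"fmt"
)

// Slot and Pattern are hypothetical, simplified config types.
type Slot struct {
	Name    string
	Pattern string
	Backend string
}

type Pattern struct {
	Type          string
	SessionAware  bool
	Requires      []Slot
	Produces      []Slot
	BackendConfig map[string]interface{}
}

// validateNamespace returns the first rule violation found, or nil.
func validateNamespace(patterns []Pattern) error {
	managers := 0
	seenSlots := map[string]bool{}
	for i, p := range patterns {
		if p.Type == "session-manager" {
			managers++
			if managers > 1 {
				return errors.New("at most one session-manager per namespace")
			}
			if i != 0 {
				return errors.New("session-manager must be declared first")
			}
			hasStore := false
			for _, s := range p.Requires {
				if s.Name == "session-store" {
					hasStore = true
				}
			}
			if !hasStore {
				return errors.New("session-manager must declare a session-store slot")
			}
		}
		// Slot names must be unique across the whole namespace.
		slots := append(append([]Slot{}, p.Requires...), p.Produces...)
		for _, s := range slots {
			if seenSlots[s.Name] {
				return fmt.Errorf("duplicate slot name %q", s.Name)
			}
			seenSlots[s.Name] = true
		}
		if p.SessionAware {
			// The manager must come first, so it has been seen by now if present.
			if managers == 0 {
				return fmt.Errorf("session-aware pattern %q requires a session-manager", p.Type)
			}
			for _, key := range []string{"username", "password"} {
				if _, ok := p.BackendConfig[key]; ok {
					return fmt.Errorf("session-aware pattern %q must not set %q", p.Type, key)
				}
			}
		}
	}
	return nil
}

func main() {
	ns := []Pattern{
		{Type: "session-manager", Requires: []Slot{{Name: "session-store", Pattern: "keyvalue", Backend: "redis"}}},
		{Type: "keyvalue", SessionAware: true, BackendConfig: map[string]interface{}{"host": "postgres.prism.internal"}},
	}
	fmt.Println(validateNamespace(ns)) // <nil>

	ns[1].BackendConfig["password"] = "hunter2"
	fmt.Println(validateNamespace(ns)) // reports the static credential
}
```

Returning the first violation keeps the sketch short; a production validator might collect all violations so an operator can fix a config in one pass.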
Implementation Status
✅ Completed
- Configuration Parser (pkg/config/)
  - ComposableNamespace types with validation
  - PatternSlot for requires/produces declarations
  - Comprehensive validation rules
  - 100% test coverage (all tests passing)
  - Located: pkg/config/composable.go, pkg/config/loader.go
- Integration Test (tests/testing/composable_patterns_test.go)
  - Demonstrates full composable pattern architecture
  - Tests SessionManager with slot-based backends
  - Validates configuration validation logic
  - Located: tests/testing/composable_patterns_test.go
- Example Configuration (examples/configs/composable-namespace.yaml)
  - Complete working example
  - SessionManager with Redis + NATS slots
  - Session-aware KeyValue, Producer, Consumer
  - Documented usage flow
- Backend Test Helpers
  - Updated Redis, NATS, Postgres, Kafka backends
  - Added StartX() and Address() methods
  - Support non-testing variants for integration tests
  - Located: tests/testing/backends/
- Session Context Middleware (pkg/plugin/session_middleware.go)
  - ✅ gRPC unary and stream interceptors for JWT extraction
  - ✅ Session creation and caching (token hash-based)
  - ✅ Credential injection helper functions
  - ✅ Session context propagation through gRPC metadata
  - ✅ Supports both Bearer and x-prism-token headers
  - ✅ Unit tests passing (disabled mode, credential injection)
  - ✅ Mock implementations for testing (pkg/authz/test_mocks.go)
  - ✅ Cleanup of expired sessions from cache
  - Located: pkg/plugin/session_middleware.go, pkg/plugin/session_middleware_test.go
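The middleware is described as accepting a token from either a Bearer header or an x-prism-token header. A minimal sketch of that lookup follows, using a plain map in place of gRPC metadata so it runs without dependencies. The extractToken function is hypothetical, and the precedence shown (Authorization first, then x-prism-token) is an assumption; the memo does not state which header wins when both are present.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// extractToken looks for a JWT in request metadata. Keys are lowercase,
// matching how gRPC normalizes metadata keys. Hypothetical helper.
func extractToken(md map[string][]string) (string, error) {
	const prefix = "Bearer "
	if vals := md["authorization"]; len(vals) > 0 {
		v := vals[0]
		// Accept "Bearer <token>" with a case-insensitive scheme name.
		if len(v) > len(prefix) && strings.EqualFold(v[:len(prefix)], prefix) {
			if tok := strings.TrimSpace(v[len(prefix):]); tok != "" {
				return tok, nil
			}
		}
	}
	if vals := md["x-prism-token"]; len(vals) > 0 && vals[0] != "" {
		return vals[0], nil
	}
	return "", errors.New("no token found in metadata")
}

func main() {
	tok, err := extractToken(map[string][]string{"authorization": {"Bearer abc.def.ghi"}})
	fmt.Println(tok, err)

	tok, err = extractToken(map[string][]string{"x-prism-token": {"abc.def.ghi"}})
	fmt.Println(tok, err)

	_, err = extractToken(map[string][]string{})
	fmt.Println(err)
}
```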
🚧 In Progress
- Session-Aware Pattern Runners
  - Update KeyValue pattern for session operations
  - Update Producer pattern for session operations
  - Update Consumer pattern for session operations
  - Integrate SessionMiddleware into pattern runners
📋 Pending
- SessionManager Pattern Runner
  - Implement pattern runner with slot support
  - Initialize slot backends (session-store, session-events)
  - Integrate with pkg/authz SessionManager
  - Provide session context to data patterns
  - Session lifecycle event publishing
Implementation Steps
Step 1: Update Config Structures (✅ Complete)
File: patterns/keyvalue/cmd/keyvalue-runner/main.go
// Add auth configuration to Namespace struct
type Namespace struct {
Name string `yaml:"name"`
Pattern string `yaml:"pattern"`
PatternVersion string `yaml:"pattern_version"`
Description string `yaml:"description"`
Backend string `yaml:"backend"`
BackendConfig map[string]interface{} `yaml:"backend_config"`
// NEW: Authentication configuration
Auth *AuthConfig `yaml:"auth,omitempty"`
}
// AuthConfig contains authentication settings
type AuthConfig struct {
Enabled bool `yaml:"enabled"`
Token TokenConfig `yaml:"token"`
Vault VaultConfig `yaml:"vault"`
Session SessionConfig `yaml:"session"`
}
// TokenConfig contains JWT validation settings
type TokenConfig struct {
Issuer string `yaml:"issuer"`
Audience string `yaml:"audience"`
CacheTTL string `yaml:"cache_ttl"`
}
// VaultConfig contains Vault connection settings
type VaultConfig struct {
Address string `yaml:"address"`
JWTAuth JWTAuthConfig `yaml:"jwt_auth"`
Credentials CredentialsConfig `yaml:"credentials"`
TLS TLSConfig `yaml:"tls"`
}
// JWTAuthConfig contains JWT auth method settings
type JWTAuthConfig struct {
Role string `yaml:"role"`
AuthPath string `yaml:"auth_path"`
}
// CredentialsConfig contains credential fetching settings
type CredentialsConfig struct {
SecretPath string `yaml:"secret_path"`
RenewInterval string `yaml:"renew_interval"`
}
// TLSConfig contains TLS settings
type TLSConfig struct {
Enabled bool `yaml:"enabled"`
CACert string `yaml:"ca_cert"`
SkipVerify bool `yaml:"skip_verify"`
}
// SessionConfig contains session management settings
type SessionConfig struct {
TTL string `yaml:"ttl"`
IdleTimeout string `yaml:"idle_timeout"`
CleanupInterval string `yaml:"cleanup_interval"`
}
Step 2: Add SessionManager to Runner
import (
"sync" // for the cacheMu field below

"github.com/jrepp/prism-data-layer/pkg/authz"
// ... other imports
)
// KeyValueRunner runs a keyvalue pattern with authentication
type KeyValueRunner struct {
config *Config
kv *keyvalue.KeyValue
backend plugin.Plugin
grpcServer *keyvalue.GRPCServer
// NEW: Authentication components
sessionManager *authz.SessionManager
tokenExtractor *authz.TokenExtractor
sessionCache map[string]*authz.Session // sessionID -> Session
cacheMu sync.RWMutex
}
Step 3: Initialize Authentication
// InitializeAuth initializes authentication components
func (r *KeyValueRunner) InitializeAuth(ctx context.Context, authConfig *AuthConfig) error {
if authConfig == nil || !authConfig.Enabled {
log.Println("[KEYVALUE-RUNNER] Authentication disabled, using static credentials")
return nil
}
log.Println("[KEYVALUE-RUNNER] Initializing authentication...")
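// Parse durations up front so a malformed value fails fast.
// cache_ttl is only validated here; this snippet does not pass it on,
// and an unused variable would not compile.
if _, err := time.ParseDuration(authConfig.Token.CacheTTL); err != nil {
return fmt.Errorf("invalid token cache_ttl: %w", err)
}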
sessionTTL, err := time.ParseDuration(authConfig.Session.TTL)
if err != nil {
return fmt.Errorf("invalid session ttl: %w", err)
}
idleTimeout, err := time.ParseDuration(authConfig.Session.IdleTimeout)
if err != nil {
return fmt.Errorf("invalid session idle_timeout: %w", err)
}
// 1. Create TokenValidator
tokenValidator, err := authz.NewTokenValidator(
authConfig.Token.Issuer,
authConfig.Token.Audience,
)
if err != nil {
return fmt.Errorf("failed to create token validator: %w", err)
}
// 2. Create VaultClient
vaultClient, err := authz.NewVaultClient(authz.VaultConfig{
Address: authConfig.Vault.Address,
Role: authConfig.Vault.JWTAuth.Role,
AuthPath: authConfig.Vault.JWTAuth.AuthPath,
SecretPath: authConfig.Vault.Credentials.SecretPath,
TLSConfig: &authz.TLSConfig{
Enabled: authConfig.Vault.TLS.Enabled,
CACert: authConfig.Vault.TLS.CACert,
SkipVerify: authConfig.Vault.TLS.SkipVerify,
},
})
if err != nil {
return fmt.Errorf("failed to create Vault client: %w", err)
}
// 3. Create SessionManager
sessionManager, err := authz.NewSessionManager(authz.SessionManagerConfig{
TokenValidator: tokenValidator,
VaultClient: vaultClient,
SessionTTL: sessionTTL,
IdleTimeout: idleTimeout,
})
if err != nil {
return fmt.Errorf("failed to create session manager: %w", err)
}
// 4. Create TokenExtractor
tokenExtractor := authz.NewTokenExtractor()
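// 5. Start background cleanup (reject a malformed interval instead of
// silently running with a zero duration)
cleanupInterval, err := time.ParseDuration(authConfig.Session.CleanupInterval)
if err != nil {
return fmt.Errorf("invalid session cleanup_interval: %w", err)
}
go sessionManager.StartBackgroundCleanup(cleanupInterval)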
r.sessionManager = sessionManager
r.tokenExtractor = tokenExtractor
r.sessionCache = make(map[string]*authz.Session)
log.Println("[KEYVALUE-RUNNER] ✅ Authentication initialized")
return nil
}
Step 4: Session-Based Backend Connection
// GetOrCreateSession gets or creates a session for the current request
func (r *KeyValueRunner) GetOrCreateSession(ctx context.Context) (*authz.Session, error) {
if r.sessionManager == nil {
return nil, fmt.Errorf("authentication not enabled")
}
// Extract JWT from gRPC metadata
token, err := r.tokenExtractor.ExtractToken(ctx)
if err != nil {
return nil, fmt.Errorf("failed to extract token: %w", err)
}
// Check session cache first (avoid creating duplicate sessions)
sessionID := hashToken(token) // Use token hash as cache key
r.cacheMu.RLock()
session, exists := r.sessionCache[sessionID]
r.cacheMu.RUnlock()
if exists && !session.IsExpired() {
session.UpdateLastAccessed()
return session, nil
}
// Create new session
session, err = r.sessionManager.CreateSession(ctx, token)
if err != nil {
return nil, fmt.Errorf("failed to create session: %w", err)
}
// Cache session
r.cacheMu.Lock()
r.sessionCache[sessionID] = session
r.cacheMu.Unlock()
log.Printf("[KEYVALUE-RUNNER] Created new session: %s (user: %s)",
session.SessionID, session.UserID)
return session, nil
}
// hashToken creates a cache key from JWT token
func hashToken(token string) string {
hash := sha256.Sum256([]byte(token))
return hex.EncodeToString(hash[:])
}
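One caveat with the read-then-create pattern in GetOrCreateSession: two concurrent requests carrying the same token can both miss the cache and each create a session. A common remedy is to re-check the cache after taking the write lock. The sketch below shows that idea in isolation with hypothetical types (session, sessionCache); it ignores expiry and is not the authz API.

```go
package main

import (
	"fmt"
	"sync"
)

// session and sessionCache are hypothetical stand-ins for illustration.
type session struct{ ID string }

type sessionCache struct {
	mu       sync.RWMutex
	sessions map[string]*session
}

// getOrCreate returns the cached session for key, or calls create exactly
// once per key even when many goroutines miss the cache at the same time.
func (c *sessionCache) getOrCreate(key string, create func() (*session, error)) (*session, error) {
	// Fast path: shared lock only.
	c.mu.RLock()
	s, ok := c.sessions[key]
	c.mu.RUnlock()
	if ok {
		return s, nil
	}

	// Slow path: take the write lock and check again, because another
	// goroutine may have created the session in the meantime.
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.sessions[key]; ok {
		return s, nil
	}
	s, err := create()
	if err != nil {
		return nil, err
	}
	c.sessions[key] = s
	return s, nil
}

func main() {
	cache := &sessionCache{sessions: map[string]*session{}}
	creates := 0 // only modified inside create, which runs under the write lock

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = cache.getOrCreate("token-hash", func() (*session, error) {
				creates++
				return &session{ID: "s1"}, nil
			})
		}()
	}
	wg.Wait()
	fmt.Println("sessions created:", creates) // sessions created: 1
}
```

The trade-off is that the write lock is held while create runs, so session creation for different tokens is serialized. If creation involves a slow Vault round trip, a per-key mechanism may be a better fit.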
Step 5: Update Backend Initialization
// initializeBackendWithAuth initializes backend with per-session credentials
func (r *KeyValueRunner) initializeBackendWithAuth(ctx context.Context, backendName string, backendConfig map[string]interface{}) (plugin.Plugin, error) {
log.Printf("[KEYVALUE-RUNNER] Initializing authenticated backend: %s", backendName)
// Get session and credentials
session, err := r.GetOrCreateSession(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get session: %w", err)
}
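credentials := session.GetCredentials()
// Copy the config before injecting credentials so the caller's map is not
// mutated and one session's credentials cannot leak into another's config.
authedConfig := make(map[string]interface{}, len(backendConfig)+2)
for k, v := range backendConfig {
authedConfig[k] = v
}
authedConfig["username"] = credentials.Username
authedConfig["password"] = credentials.Password
log.Printf("[KEYVALUE-RUNNER] Using dynamic credentials: username=%s, lease=%s",
credentials.Username, credentials.LeaseID)
// Initialize backend (same as before, but with dynamic credentials)
return r.initializeBackend(ctx, backendName, authedConfig)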
}
Testing Strategy
Unit Tests
File: patterns/keyvalue/cmd/keyvalue-runner/auth_test.go
func TestKeyValueRunner_InitializeAuth(t *testing.T) {
// Test auth initialization with valid config
// Test auth initialization with invalid config
// Test auth disabled mode
}
func TestKeyValueRunner_GetOrCreateSession(t *testing.T) {
// Test session creation with valid JWT
// Test session caching
// Test expired session handling
}
Integration Tests
File: patterns/keyvalue/auth_integration_test.go
func TestKeyValueRunner_E2E_WithVault(t *testing.T) {
// 1. Start Vault testcontainer
// 2. Configure JWT auth and database secrets
// 3. Start KeyValue runner with auth enabled
// 4. Make request with JWT token
// 5. Verify dynamic credentials used
// 6. Verify credential renewal
}
Migration Path
Phase 1: Dual Mode Support (Week 1)
Goal: Support both authenticated and unauthenticated modes
- Add auth configuration (optional)
- Implement SessionManager integration
- Keep static credentials as fallback
- Test with auth disabled (no breaking changes)
Phase 2: Auth-Enabled Deployments (Week 2)
Goal: Deploy auth-enabled runners to staging
- Deploy Vault infrastructure
- Configure JWT auth and database secrets
- Deploy pattern runners with auth enabled
- Validate E2E flows
Phase 3: Mandatory Authentication (Week 3)
Goal: Make authentication mandatory
- Remove static credential support
- Require auth configuration
- Update all deployments
- Monitor and optimize
Rollout Strategy
Development Environment
# 1. Start Vault in dev mode
vault server -dev -dev-root-token-id="root-token"
# 2. Start Dex for OIDC
docker-compose up dex
# 3. Start pattern runner with auth
./keyvalue-runner -config config-auth-enabled.yaml
Staging Environment
# 1. Deploy Vault cluster (HA mode)
# 2. Configure JWT auth with production OIDC provider
# 3. Set up database secrets engines
# 4. Deploy pattern runners with auth configuration
# 5. Run smoke tests
Production Rollout
Canary Deployment:
- Deploy auth-enabled runners to 10% of traffic
- Monitor metrics (latency, error rate, credential usage)
- Gradually increase to 50%, then 100%
- Rollback if issues detected
Monitoring
Key Metrics
# Session creation rate
rate(prism_session_created_total[5m])
# Session creation latency
histogram_quantile(0.99, rate(prism_session_creation_duration_bucket[5m]))
# Active sessions
prism_sessions_active{pattern="keyvalue"}
# Credential renewal success rate
rate(prism_credential_renewal_total{result="success"}[5m]) /
rate(prism_credential_renewal_total[5m])
# Backend operation latency (with auth)
histogram_quantile(0.99, rate(prism_backend_operation_duration_bucket{auth="enabled"}[5m]))
Alerts
- alert: HighSessionCreationLatency
expr: histogram_quantile(0.99, rate(prism_session_creation_duration_bucket[5m])) > 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "High session creation latency (p99 > 500ms)"
- alert: CredentialRenewalFailure
expr: rate(prism_credential_renewal_total{result="failure"}[5m]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "High credential renewal failure rate"
Related Documents
- MEMO-083: Phase 2 Vault Integration Implementation Plan
- MEMO-084: Vault Operator Guide
- MEMO-085: Vault Troubleshooting Guide
- RFC-062: Unified Authentication and Session Management
Revision History
- 2025-11-17: Initial pattern runner integration guide