Skip to main content

MEMO-086: Composable Pattern Architecture with SessionManager

Executive Summary

This memo describes Prism's composable pattern architecture, where a single namespace can compose multiple patterns together with SessionManager as a first-class pattern. SessionManager uses slot-based backend configuration to declare dependencies (session storage, event publishing) and produces session lifecycle events. Other patterns (KeyValue, Producer, Consumer) become session-aware, operating within authenticated session contexts.

Key Innovation: Patterns compose like microservices - SessionManager provides authentication infrastructure through declared slots, while data patterns proxy operations through session context.

Context

Phase 2 Vault integration is complete with full authentication infrastructure:

  • pkg/authz/ package with SessionManager
  • JWT validation and Vault token exchange
  • Dynamic credential generation and renewal
  • Service identity authentication (K8s + AWS)

Architectural Shift: Instead of integrating SessionManager into each pattern separately, we treat patterns as composable components. A single namespace can declare multiple patterns that work together, with SessionManager as a composable pattern that provides authentication infrastructure through slot-based dependencies.

Composable Pattern Concepts

1. Single Namespace, Multiple Patterns

namespaces:
- name: team-alpha
description: "Team Alpha's authenticated data workspace"

# Multiple patterns composed in one namespace
patterns:
- type: session-manager # Authentication infrastructure
- type: keyvalue # Data pattern (session-aware)
- type: producer # Messaging pattern (session-aware)
- type: consumer # Messaging pattern (session-aware)

2. Slot-Based Backend Configuration

Patterns declare requires (dependencies) and produces (outputs):

# SessionManager pattern
- type: session-manager
version: 0.1.0

# Dependencies: What backends this pattern needs
requires:
- slot: session-store # Where to store session metadata
pattern: keyvalue
backend: redis
config:
host: redis-sessions.prism.internal
port: 6379

# Outputs: What this pattern produces
produces:
- slot: session-events # Where to publish session lifecycle events
pattern: producer
backend: nats
config:
servers: ["nats://nats.prism.internal:4222"]
subject: "prism.sessions.{namespace}.{event}"

3. Session-Aware Patterns

Data patterns mark themselves as session-aware to operate within session context:

# KeyValue pattern (session-aware)
- type: keyvalue
version: 0.1.0
session_aware: true # Operations use session credentials
config:
backend: postgres
backend_config:
host: postgres.prism.internal
# No credentials - fetched from session!

Benefits of Composable Patterns

  1. Separation of Concerns: SessionManager handles auth, data patterns handle operations
  2. Flexible Composition: Mix and match patterns in any namespace
  3. Explicit Dependencies: Slots make backend requirements clear
  4. Reusable Infrastructure: SessionManager works with any data pattern
  5. Testable Components: Each pattern can be tested independently
  6. Observable Lifecycle: Session events provide audit trail

Integration Architecture

┌─────────────────────────────────────────────────────────────────────┐
│ Composable Namespace: team-alpha │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ SessionManager Pattern (Infrastructure) │ │
│ │ │ │
│ │ Requires: │ │
│ │ [session-store] ──▶ Redis (session metadata) │ │
│ │ │ │
│ │ Produces: │ │
│ │ [session-events] ──▶ NATS (lifecycle events) │ │
│ │ │ │
│ │ Functions: │ │
│ │ • JWT validation ──▶ TokenValidator + JWKS cache │ │
│ │ • Vault token exchange ──▶ VaultClient │ │
│ │ • Credential generation ──▶ BackendCredentials │ │
│ │ • Session lifecycle ──▶ Create/Renew/Revoke/Expire │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ Provides session context │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Session-Aware Data Patterns │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ KeyValue │ │ Producer │ │ Consumer │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ Postgres │ │ Kafka │ │ Kafka │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ Uses: │ │ Uses: │ │ Uses: │ │ │
│ │ │ • Session │ │ • Session │ │ • Session │ │ │
│ │ │ creds │ │ creds │ │ creds │ │ │
│ │ │ • User ID │ │ • User ID │ │ • User ID │ │ │
│ │ │ • Namespace │ │ • Namespace │ │ • Namespace │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ Request Flow: │
│ 1. Client ──[JWT]──▶ SessionManager ──[Validate]──▶ Create Session│
│ 2. SessionManager ──[Store]──▶ Redis (session-store slot) │
│ 3. SessionManager ──[Publish]──▶ NATS (session-events slot) │
│ 4. SessionManager ──[Provide]──▶ KeyValue/Producer/Consumer │
│ 5. Pattern ──[Execute]──▶ Backend (with session credentials) │
│ │
└─────────────────────────────────────────────────────────────────────┘

Configuration Schema

Complete Composable Namespace Configuration

# namespace-config.yaml
namespaces:
- name: team-alpha
description: "Team Alpha's authenticated data workspace"

# Multiple patterns composed together
patterns:
# 1. SessionManager Pattern (Infrastructure)
- type: session-manager
version: 0.1.0
config:
# Token validation
token:
issuer: https://dex.prism.local:5556/dex
audience: prism-patterns
cache_ttl: 1h

# Vault configuration
vault:
address: https://vault.prism.internal:8200
jwt_auth:
role: prism-patterns
auth_path: auth/jwt
credentials:
secret_path: database/creds/team-alpha-role
renew_interval: 1800s # 30 minutes
tls:
enabled: true
ca_cert: /etc/prism/vault-ca.pem

# Session management
session:
ttl: 1h
idle_timeout: 30m
cleanup_interval: 15m

# Required backend slots
requires:
- slot: session-store
pattern: keyvalue
backend: redis
config:
host: redis-sessions.prism.internal
port: 6379
db: 0

# Produced event slots
produces:
- slot: session-events
pattern: producer
backend: nats
config:
servers: ["nats://nats.prism.internal:4222"]
subject: "prism.sessions.{namespace}.{event}"
# Events: created, authenticated, credential_renewed, expired, revoked

# 2. KeyValue Pattern (Session-Aware)
- type: keyvalue
version: 0.1.0
session_aware: true # Requires SessionManager in namespace
config:
backend: postgres
backend_config:
host: postgres.prism.internal
port: 5432
database: team_alpha_data
# NO CREDENTIALS - fetched from session!

# 3. Producer Pattern (Session-Aware)
- type: producer
version: 0.1.0
session_aware: true
config:
backend: kafka
backend_config:
brokers: ["kafka.prism.internal:9092"]
# NO CREDENTIALS - fetched from session!

# 4. Consumer Pattern (Session-Aware)
- type: consumer
version: 0.1.0
session_aware: true
config:
backend: kafka
backend_config:
brokers: ["kafka.prism.internal:9092"]
group_id: "team-alpha-consumers"
# NO CREDENTIALS - fetched from session!

Configuration Validation Rules

  1. SessionManager Requirements:

    • MUST have session-store slot (KeyValue backend)
    • SHOULD have session-events slot (Producer backend)
  2. Session-Aware Patterns:

    • MUST NOT specify credentials in config
    • REQUIRE SessionManager in namespace
    • RECEIVE credentials from session context
  3. Slot Naming:

    • Must be unique within namespace
    • Follow pattern: {purpose}-{type} (e.g., session-store, audit-events)
  4. Namespace Composition:

    • Max 1 SessionManager per namespace
    • Unlimited session-aware patterns
    • SessionManager must be declared first

Implementation Status

✅ Completed

  1. Configuration Parser (pkg/config/)

    • ComposableNamespace types with validation
    • PatternSlot for requires/produces declarations
    • Comprehensive validation rules
    • 100% test coverage (all tests passing)
    • Located: pkg/config/composable.go, pkg/config/loader.go
  2. Integration Test (tests/testing/composable_patterns_test.go)

    • Demonstrates full composable pattern architecture
    • Tests SessionManager with slot-based backends
    • Validates configuration validation logic
    • Located: tests/testing/composable_patterns_test.go
  3. Example Configuration (examples/configs/composable-namespace.yaml)

    • Complete working example
    • SessionManager with Redis + NATS slots
    • Session-aware KeyValue, Producer, Consumer
    • Documented usage flow
  4. Backend Test Helpers

    • Updated Redis, NATS, Postgres, Kafka backends
    • Added StartX() and Address() methods
    • Support non-testing variants for integration tests
    • Located: tests/testing/backends/
  5. Session Context Middleware (pkg/plugin/session_middleware.go)

    • ✅ gRPC unary and stream interceptors for JWT extraction
    • ✅ Session creation and caching (token hash-based)
    • ✅ Credential injection helper functions
    • ✅ Session context propagation through gRPC metadata
    • ✅ Supports both Bearer and x-prism-token headers
    • ✅ Unit tests passing (disabled mode, credential injection)
    • ✅ Mock implementations for testing (pkg/authz/test_mocks.go)
    • ✅ Cleanup of expired sessions from cache
    • Located: pkg/plugin/session_middleware.go, pkg/plugin/session_middleware_test.go

🚧 In Progress

  1. Session-Aware Pattern Runners
    • Update KeyValue pattern for session operations
    • Update Producer pattern for session operations
    • Update Consumer pattern for session operations
    • Integrate SessionMiddleware into pattern runners

📋 Pending

  1. SessionManager Pattern Runner
    • Implement pattern runner with slot support
    • Initialize slot backends (session-store, session-events)
    • Integrate with pkg/authz SessionManager
    • Provide session context to data patterns
    • Session lifecycle event publishing

Implementation Steps

Step 1: Update Config Structures (✅ Complete)

File: patterns/keyvalue/cmd/keyvalue-runner/main.go

// Add auth configuration to Namespace struct
type Namespace struct {
Name string `yaml:"name"`
Pattern string `yaml:"pattern"`
PatternVersion string `yaml:"pattern_version"`
Description string `yaml:"description"`
Backend string `yaml:"backend"`
BackendConfig map[string]interface{} `yaml:"backend_config"`

// NEW: Authentication configuration
Auth *AuthConfig `yaml:"auth,omitempty"`
}

// AuthConfig contains authentication settings
type AuthConfig struct {
Enabled bool `yaml:"enabled"`
Token TokenConfig `yaml:"token"`
Vault VaultConfig `yaml:"vault"`
Session SessionConfig `yaml:"session"`
}

// TokenConfig contains JWT validation settings
type TokenConfig struct {
Issuer string `yaml:"issuer"`
Audience string `yaml:"audience"`
CacheTTL string `yaml:"cache_ttl"`
}

// VaultConfig contains Vault connection settings
type VaultConfig struct {
Address string `yaml:"address"`
JWTAuth JWTAuthConfig `yaml:"jwt_auth"`
Credentials CredentialsConfig `yaml:"credentials"`
TLS TLSConfig `yaml:"tls"`
}

// JWTAuthConfig contains JWT auth method settings
type JWTAuthConfig struct {
Role string `yaml:"role"`
AuthPath string `yaml:"auth_path"`
}

// CredentialsConfig contains credential fetching settings
type CredentialsConfig struct {
SecretPath string `yaml:"secret_path"`
RenewInterval string `yaml:"renew_interval"`
}

// TLSConfig contains TLS settings
type TLSConfig struct {
Enabled bool `yaml:"enabled"`
CACert string `yaml:"ca_cert"`
SkipVerify bool `yaml:"skip_verify"`
}

// SessionConfig contains session management settings
type SessionConfig struct {
TTL string `yaml:"ttl"`
IdleTimeout string `yaml:"idle_timeout"`
CleanupInterval string `yaml:"cleanup_interval"`
}

Step 2: Add SessionManager to Runner

import (
"github.com/jrepp/prism-data-layer/pkg/authz"
// ... other imports
)

// KeyValueRunner runs a keyvalue pattern with authentication
type KeyValueRunner struct {
config *Config
kv *keyvalue.KeyValue
backend plugin.Plugin
grpcServer *keyvalue.GRPCServer

// NEW: Authentication components
sessionManager *authz.SessionManager
tokenExtractor *authz.TokenExtractor
sessionCache map[string]*authz.Session // sessionID -> Session
cacheMu sync.RWMutex
}

Step 3: Initialize Authentication

// InitializeAuth initializes authentication components
func (r *KeyValueRunner) InitializeAuth(ctx context.Context, authConfig *AuthConfig) error {
if authConfig == nil || !authConfig.Enabled {
log.Println("[KEYVALUE-RUNNER] Authentication disabled, using static credentials")
return nil
}

log.Println("[KEYVALUE-RUNNER] Initializing authentication...")

// Parse durations
cacheTTL, err := time.ParseDuration(authConfig.Token.CacheTTL)
if err != nil {
return fmt.Errorf("invalid token cache_ttl: %w", err)
}

sessionTTL, err := time.ParseDuration(authConfig.Session.TTL)
if err != nil {
return fmt.Errorf("invalid session ttl: %w", err)
}

idleTimeout, err := time.ParseDuration(authConfig.Session.IdleTimeout)
if err != nil {
return fmt.Errorf("invalid session idle_timeout: %w", err)
}

// 1. Create TokenValidator
tokenValidator, err := authz.NewTokenValidator(
authConfig.Token.Issuer,
authConfig.Token.Audience,
)
if err != nil {
return fmt.Errorf("failed to create token validator: %w", err)
}

// 2. Create VaultClient
vaultClient, err := authz.NewVaultClient(authz.VaultConfig{
Address: authConfig.Vault.Address,
Role: authConfig.Vault.JWTAuth.Role,
AuthPath: authConfig.Vault.JWTAuth.AuthPath,
SecretPath: authConfig.Vault.Credentials.SecretPath,
TLSConfig: &authz.TLSConfig{
Enabled: authConfig.Vault.TLS.Enabled,
CACert: authConfig.Vault.TLS.CACert,
SkipVerify: authConfig.Vault.TLS.SkipVerify,
},
})
if err != nil {
return fmt.Errorf("failed to create Vault client: %w", err)
}

// 3. Create SessionManager
sessionManager, err := authz.NewSessionManager(authz.SessionManagerConfig{
TokenValidator: tokenValidator,
VaultClient: vaultClient,
SessionTTL: sessionTTL,
IdleTimeout: idleTimeout,
})
if err != nil {
return fmt.Errorf("failed to create session manager: %w", err)
}

// 4. Create TokenExtractor
tokenExtractor := authz.NewTokenExtractor()

// 5. Start background cleanup
cleanupInterval, _ := time.ParseDuration(authConfig.Session.CleanupInterval)
go sessionManager.StartBackgroundCleanup(cleanupInterval)

r.sessionManager = sessionManager
r.tokenExtractor = tokenExtractor
r.sessionCache = make(map[string]*authz.Session)

log.Println("[KEYVALUE-RUNNER] ✅ Authentication initialized")
return nil
}

Step 4: Session-Based Backend Connection

// GetOrCreateSession gets or creates a session for the current request
func (r *KeyValueRunner) GetOrCreateSession(ctx context.Context) (*authz.Session, error) {
if r.sessionManager == nil {
return nil, fmt.Errorf("authentication not enabled")
}

// Extract JWT from gRPC metadata
token, err := r.tokenExtractor.ExtractToken(ctx)
if err != nil {
return nil, fmt.Errorf("failed to extract token: %w", err)
}

// Check session cache first (avoid creating duplicate sessions)
sessionID := hashToken(token) // Use token hash as cache key

r.cacheMu.RLock()
session, exists := r.sessionCache[sessionID]
r.cacheMu.RUnlock()

if exists && !session.IsExpired() {
session.UpdateLastAccessed()
return session, nil
}

// Create new session
session, err = r.sessionManager.CreateSession(ctx, token)
if err != nil {
return nil, fmt.Errorf("failed to create session: %w", err)
}

// Cache session
r.cacheMu.Lock()
r.sessionCache[sessionID] = session
r.cacheMu.Unlock()

log.Printf("[KEYVALUE-RUNNER] Created new session: %s (user: %s)",
session.SessionID, session.UserID)

return session, nil
}

// hashToken creates a cache key from JWT token
func hashToken(token string) string {
hash := sha256.Sum256([]byte(token))
return hex.EncodeToString(hash[:])
}

Step 5: Update Backend Initialization

// initializeBackendWithAuth initializes backend with per-session credentials
func (r *KeyValueRunner) initializeBackendWithAuth(ctx context.Context, backendName string, backendConfig map[string]interface{}) (plugin.Plugin, error) {
log.Printf("[KEYVALUE-RUNNER] Initializing authenticated backend: %s", backendName)

// Get session and credentials
session, err := r.GetOrCreateSession(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get session: %w", err)
}

credentials := session.GetCredentials()

// Inject credentials into backend config
backendConfig["username"] = credentials.Username
backendConfig["password"] = credentials.Password

log.Printf("[KEYVALUE-RUNNER] Using dynamic credentials: username=%s, lease=%s",
credentials.Username, credentials.LeaseID)

// Initialize backend (same as before, but with dynamic credentials)
return r.initializeBackend(ctx, backendName, backendConfig)
}

Testing Strategy

Unit Tests

File: patterns/keyvalue/cmd/keyvalue-runner/auth_test.go

func TestKeyValueRunner_InitializeAuth(t *testing.T) {
// Test auth initialization with valid config
// Test auth initialization with invalid config
// Test auth disabled mode
}

func TestKeyValueRunner_GetOrCreateSession(t *testing.T) {
// Test session creation with valid JWT
// Test session caching
// Test expired session handling
}

Integration Tests

File: patterns/keyvalue/auth_integration_test.go

func TestKeyValueRunner_E2E_WithVault(t *testing.T) {
// 1. Start Vault testcontainer
// 2. Configure JWT auth and database secrets
// 3. Start KeyValue runner with auth enabled
// 4. Make request with JWT token
// 5. Verify dynamic credentials used
// 6. Verify credential renewal
}

Migration Path

Phase 1: Dual Mode Support (Week 1)

Goal: Support both authenticated and unauthenticated modes

  1. Add auth configuration (optional)
  2. Implement SessionManager integration
  3. Keep static credentials as fallback
  4. Test with auth disabled (no breaking changes)

Phase 2: Auth-Enabled Deployments (Week 2)

Goal: Deploy auth-enabled runners to staging

  1. Deploy Vault infrastructure
  2. Configure JWT auth and database secrets
  3. Deploy pattern runners with auth enabled
  4. Validate E2E flows

Phase 3: Mandatory Authentication (Week 3)

Goal: Make authentication mandatory

  1. Remove static credential support
  2. Require auth configuration
  3. Update all deployments
  4. Monitor and optimize

Rollout Strategy

Development Environment

# 1. Start Vault in dev mode
vault server -dev -dev-root-token-id="root-token"

# 2. Start Dex for OIDC
docker-compose up dex

# 3. Start pattern runner with auth
./keyvalue-runner -config config-auth-enabled.yaml

Staging Environment

# 1. Deploy Vault cluster (HA mode)
# 2. Configure JWT auth with production OIDC provider
# 3. Set up database secrets engines
# 4. Deploy pattern runners with auth configuration
# 5. Run smoke tests

Production Rollout

Canary Deployment:

  1. Deploy auth-enabled runners to 10% of traffic
  2. Monitor metrics (latency, error rate, credential usage)
  3. Gradually increase to 50%, then 100%
  4. Rollback if issues detected

Monitoring

Key Metrics

# Session creation rate
rate(prism_session_created_total[5m])

# Session creation latency
histogram_quantile(0.99, rate(prism_session_creation_duration_bucket[5m]))

# Active sessions
prism_sessions_active{pattern="keyvalue"}

# Credential renewal success rate
rate(prism_credential_renewal_total{result="success"}[5m]) /
rate(prism_credential_renewal_total[5m])

# Backend operation latency (with auth)
histogram_quantile(0.99, rate(prism_backend_operation_duration_bucket{auth="enabled"}[5m]))

Alerts

- alert: HighSessionCreationLatency
expr: histogram_quantile(0.99, rate(prism_session_creation_duration_bucket[5m])) > 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "High session creation latency (p99 > 500ms)"

- alert: CredentialRenewalFailure
expr: rate(prism_credential_renewal_total{result="failure"}[5m]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "High credential renewal failure rate"

Revision History

  • 2025-11-17: Initial pattern runner integration guide