Infrastructure Change Events via Terraform Provider Patterns

Abstract

This RFC explores how Terraform provider API patterns can be adapted to create a push-based event notification system for infrastructure changes in Prism. Instead of pattern runners polling for backend configuration changes, this system would enable real-time notifications when backends are modified, credentials rotate, or infrastructure state changes.

The core insight from Terraform providers is their Read/Refresh cycle combined with state drift detection—mechanisms that continuously compare desired state vs. actual state and generate events when divergence is detected.

Motivation

Current State: Pull-Based Configuration

Per RFC-039 and RFC-042, Prism currently uses a pull-based model:

┌─────────────────────────────────────────────────────────────────┐
│ Current Flow (Pull-Based) │
│ │
│ Pattern Runner Startup │
│ ├─ Call GetBackend("kafka-prod") → Admin Control Plane │
│ ├─ Receive backend config │
│ ├─ Connect to backend │
│ └─ Run until restart │
│ │
│ Problem: Configuration Changes Require Manual Intervention │
│ ├─ Admin updates backend config in registry │
│ ├─ Pattern runners don't know about the change │
│ ├─ Operators must restart pattern runners manually │
│ └─ Downtime or stale connections until restart │
└─────────────────────────────────────────────────────────────────┘

Desired State: Push-Based Event Notifications

Terraform providers solve a similar problem: detecting when infrastructure deviates from desired state. Key patterns we can adapt:

  1. Refresh Cycle: Providers continuously check actual state vs. stored state
  2. Drift Detection: Generate events when divergence is detected
  3. Resource Watches: Monitor infrastructure for changes
  4. State Reconciliation: Trigger workflows to converge actual → desired
┌─────────────────────────────────────────────────────────────────┐
│ Desired Flow (Event-Based) │
│ │
│ Pattern Runner │
│ ├─ Subscribe to backend change events │
│ ├─ Receive event: "kafka-prod config updated" │
│ ├─ Fetch new config from admin │
│ ├─ Reconnect to backend with new credentials │
│ └─ Continue processing (zero downtime) │
│ │
│ Admin Control Plane │
│ ├─ Monitor backend health │
│ ├─ Detect credential rotation │
│ ├─ Detect backend failure/recovery │
│ └─ Publish events to subscribed pattern runners │
└─────────────────────────────────────────────────────────────────┘

Use Cases

  1. Credential Rotation: Kubernetes secrets updated → Pattern runners reconnect with new credentials
  2. Backend Failover: Primary Kafka broker down → Pattern runners switch to replica
  3. Configuration Updates: Admin changes backend settings → Pattern runners hot-reload config
  4. Infrastructure Drift: Backend manually modified outside Prism → Admin detects and reconciles
  5. Health Monitoring: Backend becomes unhealthy → Pattern runners reroute traffic

Goals

  1. Event-Driven Backend Discovery: Pattern runners receive push notifications on backend changes
  2. Terraform-Inspired State Management: Adapt provider patterns (Read/Refresh/Drift) to Prism
  3. Zero-Downtime Updates: Pattern runners hot-reload configs without restart
  4. Infrastructure Observability: Centralized visibility into backend state changes
  5. Reconciliation Loops: Automatically converge actual state → desired state
  6. Multi-Source Events: Aggregate signals from Kubernetes, Terraform, cloud APIs, manual changes

Non-Goals

  1. Full Terraform Provider Implementation: We're adapting patterns, not building a Terraform provider
  2. Declarative Infrastructure: This RFC focuses on event notification, not provisioning
  3. Multi-Cloud Orchestration: Initial focus is Kubernetes; cloud providers are future work
  4. Real-Time Streaming: Events are near-real-time (seconds), not sub-second latency

Background: Terraform Provider API Patterns

How Terraform Providers Work

Terraform providers implement the CRUD + Refresh lifecycle:

// Terraform Provider Schema (simplified)
type Provider struct {
    // Resource lifecycle hooks
    Create func(d *ResourceData) error
    Read   func(d *ResourceData) error // ← Key for drift detection
    Update func(d *ResourceData) error
    Delete func(d *ResourceData) error
}

// Terraform Refresh Cycle
func (p *Provider) RefreshState(ctx context.Context, resource *Resource) (*Resource, error) {
    // 1. Read actual state from infrastructure
    actualState, err := p.readInfrastructure(ctx, resource.ID)
    if err != nil {
        return nil, err
    }

    // 2. Compare actual vs. stored state
    if !reflect.DeepEqual(actualState, resource.StoredState) {
        // 3. Generate drift event
        p.emitDriftEvent(resource, actualState, resource.StoredState)

        // 4. Update stored state
        resource.StoredState = actualState
    }

    return resource, nil
}

Key Patterns to Adapt

1. Read/Refresh Cycle

Terraform: Periodically calls Read() to refresh state from infrastructure.
Prism: The admin control plane periodically checks backend health and config.

// Prism Backend State Refresh
type BackendMonitor struct {
    adminClient admin.ControlPlaneClient
    k8sClient   kubernetes.Interface
    refreshRate time.Duration
}

func (m *BackendMonitor) RefreshBackend(ctx context.Context, backendName string) error {
    // 1. Fetch stored backend config from admin
    storedBackend, err := m.adminClient.GetBackend(ctx, backendName)
    if err != nil {
        return err
    }

    // 2. Read actual state from infrastructure (e.g., Kubernetes)
    actualBackend, err := m.readActualState(ctx, storedBackend)
    if err != nil {
        return err
    }

    // 3. Compare and detect drift
    drift := m.detectDrift(storedBackend, actualBackend)
    if len(drift) > 0 {
        // 4. Publish drift events
        for _, change := range drift {
            m.publishEvent(ctx, BackendChangeEvent{
                BackendName: backendName,
                ChangeType:  change.DriftType,
                OldValue:    change.OldValue,
                NewValue:    change.NewValue,
            })
        }

        // 5. Update admin state
        m.adminClient.UpdateBackend(ctx, actualBackend)
    }

    return nil
}

2. State Drift Detection

Terraform: Compares desired_state (in .tf files) vs. actual_state (from provider Read).
Prism: Compares admin_state (backend registry) vs. actual_state (Kubernetes/cloud).

type StateDrift struct {
    Field     string      // e.g., "config.kafka.brokers"
    OldValue  interface{} // ["kafka-01:9092"]
    NewValue  interface{} // ["kafka-02:9092", "kafka-03:9092"]
    DriftType DriftType   // CREDENTIAL_CHANGE, CONFIG_UPDATE, HEALTH_STATUS, etc.
}

func (m *BackendMonitor) detectDrift(stored, actual *Backend) []StateDrift {
    var drifts []StateDrift

    // Example: Detect broker list changes
    if !reflect.DeepEqual(stored.Config.GetKafka().Brokers, actual.Config.GetKafka().Brokers) {
        drifts = append(drifts, StateDrift{
            Field:     "config.kafka.brokers",
            OldValue:  stored.Config.GetKafka().Brokers,
            NewValue:  actual.Config.GetKafka().Brokers,
            DriftType: DRIFT_TYPE_CONFIG_UPDATE,
        })
    }

    // Example: Detect secret rotation
    if stored.Config.GetKafka().Auth.GetSasl().Password != actual.Config.GetKafka().Auth.GetSasl().Password {
        drifts = append(drifts, StateDrift{
            Field:     "config.kafka.auth.sasl.password",
            OldValue:  "[REDACTED]",
            NewValue:  "[REDACTED]",
            DriftType: DRIFT_TYPE_CREDENTIAL_ROTATION,
        })
    }

    return drifts
}

3. Resource Watchers

Terraform: Providers can watch cloud APIs for resource changes (via webhooks, polling, or event streams).
Prism: The admin watches Kubernetes resources, cloud APIs, and internal state.

// Kubernetes Secret Watcher
func (m *BackendMonitor) WatchSecrets(ctx context.Context) error {
    watcher, err := m.k8sClient.CoreV1().Secrets("infrastructure").Watch(ctx, metav1.ListOptions{
        LabelSelector: "prism.io/backend=true",
    })
    if err != nil {
        return err
    }

    for event := range watcher.ResultChan() {
        secret, ok := event.Object.(*corev1.Secret)
        if !ok {
            continue // skip non-Secret objects (e.g., watch error status)
        }
        backendName := secret.Labels["prism.io/backend-name"]

        switch event.Type {
        case watch.Modified:
            // Secret updated (credential rotation)
            m.publishEvent(ctx, BackendChangeEvent{
                BackendName: backendName,
                ChangeType:  CHANGE_TYPE_CREDENTIAL_ROTATION,
                Source:      "kubernetes-secret",
            })

        case watch.Deleted:
            // Secret deleted (backend unavailable)
            m.publishEvent(ctx, BackendChangeEvent{
                BackendName: backendName,
                ChangeType:  CHANGE_TYPE_BACKEND_DELETED,
                Source:      "kubernetes-secret",
            })
        }
    }

    return nil
}

4. State Reconciliation

Terraform: When drift is detected, Terraform can auto-apply or prompt the user to reconcile.
Prism: The admin can auto-reconcile or notify operators.

type ReconciliationPolicy string

const (
    RECONCILE_AUTO   ReconciliationPolicy = "auto"   // Auto-update stored state
    RECONCILE_MANUAL ReconciliationPolicy = "manual" // Notify operator, wait for approval
    RECONCILE_IGNORE ReconciliationPolicy = "ignore" // Log drift but don't reconcile
)

func (m *BackendMonitor) Reconcile(ctx context.Context, backendName string, drift StateDrift, actual *Backend, policy ReconciliationPolicy) error {
    switch policy {
    case RECONCILE_AUTO:
        // Automatically update admin state to match actual state
        return m.adminClient.UpdateBackend(ctx, actual)

    case RECONCILE_MANUAL:
        // Create admin notification for operator approval
        return m.adminClient.CreateNotification(ctx, Notification{
            Title:   fmt.Sprintf("Backend drift detected: %s", backendName),
            Message: fmt.Sprintf("Field %s changed from %v to %v", drift.Field, drift.OldValue, drift.NewValue),
            Actions: []string{"APPROVE", "REJECT", "IGNORE"},
        })

    case RECONCILE_IGNORE:
        // Just log the drift
        log.Warnf("Backend drift detected but ignored: %s", drift.Field)
        return nil
    }

    return nil
}

Design: Prism Backend Event System

Architecture Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│ Prism Event-Driven Backend System │
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Event Sources (Infrastructure Signals) │ │
│ │ │ │
│ │ Kubernetes Terraform Cloud Manual │ │
│ │ ├─ PrismBackend CRDs ├─ Terraform state Admin UI │ │
│ │ ├─ Secrets (auth) ├─ Resource changes CLI │ │
│ │ ├─ Services (endpoints) ├─ Drift detection API │ │
│ │ └─ ConfigMaps └─ Plan/Apply events │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ (watch/poll) │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Backend State Monitor (Prism Admin Component) │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ State Refresh Loop │ │ │
│ │ │ ├─ Kubernetes Watcher (PrismBackend CRDs, Secrets) │ │ │
│ │ │ ├─ Terraform State Watcher (via Terraform Cloud API) │ │ │
│ │ │ ├─ Cloud API Pollers (AWS, GCP, Azure) │ │ │
│ │ │ └─ Health Checkers (backend connectivity tests) │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ Drift Detector │ │ │
│ │ │ ├─ Compare admin_state vs. actual_state │ │ │
│ │ │ ├─ Classify drift type (credential, config, health) │ │ │
│ │ │ └─ Generate BackendChangeEvents │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ (publish events) │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Event Bus (NATS/Kafka/Internal) │ │
│ │ │ │
│ │ Topics: │ │
│ │ ├─ backend.*.config.updated │ │
│ │ ├─ backend.*.credentials.rotated │ │
│ │ ├─ backend.*.health.changed │ │
│ │ └─ backend.*.deleted │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ (subscribe) │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Event Consumers │ │
│ │ │ │
│ │ Pattern Runners Prism Operator Monitoring │ │
│ │ ├─ Hot-reload configs ├─ Update CRD status ├─ Alert on drift │ │
│ │ ├─ Reconnect backends ├─ Trigger reconcile ├─ Audit log │ │
│ │ └─ Update local cache └─ Update K8s secrets └─ Metrics │ │
│ └────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘

Event Schema

// Backend change event
message BackendChangeEvent {
  string event_id = 1;               // Unique event ID
  string backend_name = 2;           // e.g., "kafka-prod"
  BackendChangeType change_type = 3; // Type of change
  string source = 4;                 // "kubernetes", "terraform", "manual"
  int64 timestamp = 5;               // Event timestamp

  // Old and new values (for drift)
  Backend old_state = 6;
  Backend new_state = 7;

  // Affected fields
  repeated string changed_fields = 8; // ["config.kafka.brokers", "metadata.region"]

  // Reconciliation metadata
  ReconciliationPolicy policy = 9;
  string reconciliation_status = 10; // "pending", "approved", "rejected"

  // Observability
  map<string, string> metadata = 11;
  repeated string affected_patterns = 12; // Patterns using this backend
}

enum BackendChangeType {
  BACKEND_CHANGE_TYPE_UNSPECIFIED = 0;
  BACKEND_CHANGE_TYPE_CREATED = 1;
  BACKEND_CHANGE_TYPE_UPDATED = 2;
  BACKEND_CHANGE_TYPE_DELETED = 3;
  BACKEND_CHANGE_TYPE_CREDENTIAL_ROTATED = 4;
  BACKEND_CHANGE_TYPE_HEALTH_CHANGED = 5;
  BACKEND_CHANGE_TYPE_ENDPOINT_CHANGED = 6;
  BACKEND_CHANGE_TYPE_CONFIG_DRIFT_DETECTED = 7;
}

// Event subscription
message BackendEventSubscription {
  string subscriber_id = 1;                   // Pattern runner ID
  repeated string backend_names = 2;          // Backends to watch
  repeated BackendChangeType event_types = 3; // Filter by event type
  string callback_endpoint = 4;               // gRPC endpoint for notifications
}

Component 1: Backend State Monitor

New admin component that implements Terraform-inspired refresh cycle:

// pkg/admin/backend_monitor.go

type BackendStateMonitor struct {
    adminClient     admin.ControlPlaneClient
    k8sClient       kubernetes.Interface
    terraformClient *tfe.Client // Terraform Cloud API
    eventPublisher  EventPublisher
    refreshInterval time.Duration

    // State tracking
    lastKnownStates map[string]*adminpb.Backend
    driftDetector   *DriftDetector
}

func NewBackendStateMonitor(
    adminClient admin.ControlPlaneClient,
    k8sClient kubernetes.Interface,
    eventPublisher EventPublisher,
) *BackendStateMonitor {
    return &BackendStateMonitor{
        adminClient:     adminClient,
        k8sClient:       k8sClient,
        eventPublisher:  eventPublisher,
        refreshInterval: 30 * time.Second,
        lastKnownStates: make(map[string]*adminpb.Backend),
        driftDetector:   NewDriftDetector(),
    }
}

func (m *BackendStateMonitor) Start(ctx context.Context) error {
    // Start periodic refresh loop
    go m.refreshLoop(ctx)

    // Start Kubernetes watchers
    go m.watchKubernetesSecrets(ctx)
    go m.watchKubernetesCRDs(ctx)

    // Start Terraform watcher (if configured)
    if m.terraformClient != nil {
        go m.watchTerraformState(ctx)
    }

    return nil
}

// Terraform-style refresh loop
func (m *BackendStateMonitor) refreshLoop(ctx context.Context) {
    ticker := time.NewTicker(m.refreshInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            m.refreshAllBackends(ctx)
        }
    }
}

func (m *BackendStateMonitor) refreshAllBackends(ctx context.Context) {
    // 1. List all backends from admin
    backends, err := m.adminClient.ListBackends(ctx, &adminpb.ListBackendsRequest{})
    if err != nil {
        log.Errorf("Failed to list backends: %v", err)
        return
    }

    // 2. Refresh each backend
    for _, backend := range backends.Backends {
        if err := m.refreshBackend(ctx, backend); err != nil {
            log.Errorf("Failed to refresh backend %s: %v", backend.Name, err)
        }
    }
}

func (m *BackendStateMonitor) refreshBackend(ctx context.Context, storedBackend *adminpb.Backend) error {
    // 1. Read actual state from infrastructure
    actualBackend, err := m.readActualState(ctx, storedBackend)
    if err != nil {
        return fmt.Errorf("failed to read actual state: %w", err)
    }

    // 2. Detect drift
    drifts := m.driftDetector.Detect(storedBackend, actualBackend)
    if len(drifts) == 0 {
        // No changes
        return nil
    }

    // 3. Publish events for each drift
    for _, drift := range drifts {
        event := &BackendChangeEvent{
            EventId:       uuid.New().String(),
            BackendName:   storedBackend.Name,
            ChangeType:    m.mapDriftToChangeType(drift),
            Source:        drift.Source,
            Timestamp:     time.Now().Unix(),
            OldState:      storedBackend,
            NewState:      actualBackend,
            ChangedFields: []string{drift.Field},
        }

        if err := m.eventPublisher.Publish(ctx, event); err != nil {
            log.Errorf("Failed to publish event: %v", err)
        }
    }

    // 4. Update stored state (auto-reconcile)
    m.lastKnownStates[storedBackend.Name] = actualBackend

    return nil
}

func (m *BackendStateMonitor) readActualState(ctx context.Context, backend *adminpb.Backend) (*adminpb.Backend, error) {
    switch backend.Type {
    case adminpb.BackendType_BACKEND_TYPE_KAFKA:
        return m.readKafkaState(ctx, backend)
    case adminpb.BackendType_BACKEND_TYPE_POSTGRES:
        return m.readPostgresState(ctx, backend)
    case adminpb.BackendType_BACKEND_TYPE_REDIS:
        return m.readRedisState(ctx, backend)
    default:
        return nil, fmt.Errorf("unsupported backend type: %v", backend.Type)
    }
}

// Read actual Kafka state from Kubernetes
func (m *BackendStateMonitor) readKafkaState(ctx context.Context, backend *adminpb.Backend) (*adminpb.Backend, error) {
    // 1. Read Kubernetes service (get actual endpoints)
    svc, err := m.k8sClient.CoreV1().Services("infrastructure").Get(ctx, "kafka", metav1.GetOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to get Kafka service: %w", err)
    }

    // 2. Read Kubernetes secret (get current credentials)
    secret, err := m.k8sClient.CoreV1().Secrets("infrastructure").Get(ctx, "kafka-credentials", metav1.GetOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to get Kafka secret: %w", err)
    }

    // 3. Build actual backend config
    actualBackend := proto.Clone(backend).(*adminpb.Backend)
    kafkaConfig := actualBackend.Config.GetKafka()

    // Update brokers from service endpoints
    if svc.Spec.ClusterIP != "" {
        kafkaConfig.Brokers = []string{fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, svc.Spec.Ports[0].Port)}
    }

    // Update password from secret
    if password, ok := secret.Data["password"]; ok {
        kafkaConfig.Auth = &adminpb.KafkaAuth{
            Auth: &adminpb.KafkaAuth_Sasl{
                Sasl: &adminpb.KafkaSASL{
                    Password: string(password),
                },
            },
        }
    }

    return actualBackend, nil
}
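
Wiring the monitor into the admin process is a small amount of glue code. A minimal sketch, assuming the NewNatsEventPublisher constructor from Component 3 below and already-constructed adminClient and k8sClient values (the NATS URL is illustrative):

// Sketch: starting the backend state monitor inside the admin process.
publisher, err := NewNatsEventPublisher("nats://nats.prism-system:4222")
if err != nil {
    log.Fatalf("Failed to create event publisher: %v", err)
}

monitor := NewBackendStateMonitor(adminClient, k8sClient, publisher)
if err := monitor.Start(ctx); err != nil {
    log.Fatalf("Failed to start backend state monitor: %v", err)
}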

Component 2: Drift Detector

// pkg/admin/drift_detector.go

type DriftDetector struct {
    // Configuration for what constitutes "drift"
    ignoreFields []string // Fields to ignore (e.g., "updated_at")
}

func NewDriftDetector() *DriftDetector {
    return &DriftDetector{
        ignoreFields: []string{"updated_at", "created_at"},
    }
}

type Drift struct {
    Field     string
    OldValue  interface{}
    NewValue  interface{}
    DriftType DriftType
    Source    string // "kubernetes", "terraform", "health-check"
}

type DriftType int

const (
    DriftTypeConfigUpdate DriftType = iota
    DriftTypeCredentialRotation
    DriftTypeEndpointChange
    DriftTypeHealthStatusChange
)

func (d *DriftDetector) Detect(stored, actual *adminpb.Backend) []Drift {
    var drifts []Drift

    // Compare backend configs
    switch stored.Type {
    case adminpb.BackendType_BACKEND_TYPE_KAFKA:
        drifts = append(drifts, d.detectKafkaDrift(stored, actual)...)
    case adminpb.BackendType_BACKEND_TYPE_POSTGRES:
        drifts = append(drifts, d.detectPostgresDrift(stored, actual)...)
    case adminpb.BackendType_BACKEND_TYPE_REDIS:
        drifts = append(drifts, d.detectRedisDrift(stored, actual)...)
    }

    return drifts
}

func (d *DriftDetector) detectKafkaDrift(stored, actual *adminpb.Backend) []Drift {
    var drifts []Drift

    storedKafka := stored.Config.GetKafka()
    actualKafka := actual.Config.GetKafka()

    // Check broker list changes
    if !reflect.DeepEqual(storedKafka.Brokers, actualKafka.Brokers) {
        drifts = append(drifts, Drift{
            Field:     "config.kafka.brokers",
            OldValue:  storedKafka.Brokers,
            NewValue:  actualKafka.Brokers,
            DriftType: DriftTypeEndpointChange,
            Source:    "kubernetes-service",
        })
    }

    // Check password changes (credential rotation)
    storedPassword := storedKafka.GetAuth().GetSasl().GetPassword()
    actualPassword := actualKafka.GetAuth().GetSasl().GetPassword()
    if storedPassword != actualPassword {
        drifts = append(drifts, Drift{
            Field:     "config.kafka.auth.sasl.password",
            OldValue:  "[REDACTED]",
            NewValue:  "[REDACTED]",
            DriftType: DriftTypeCredentialRotation,
            Source:    "kubernetes-secret",
        })
    }

    return drifts
}

Component 3: Event Publisher

// pkg/admin/event_publisher.go

type EventPublisher interface {
    Publish(ctx context.Context, event *BackendChangeEvent) error
    Subscribe(ctx context.Context, subscription *BackendEventSubscription) (<-chan *BackendChangeEvent, error)
}

// NATS-based event publisher
type NatsEventPublisher struct {
    nc *nats.Conn
}

func NewNatsEventPublisher(url string) (*NatsEventPublisher, error) {
    nc, err := nats.Connect(url)
    if err != nil {
        return nil, err
    }
    return &NatsEventPublisher{nc: nc}, nil
}

func (p *NatsEventPublisher) Publish(ctx context.Context, event *BackendChangeEvent) error {
    // Build NATS subject: backend.{backend_name}.{change_type}
    subject := fmt.Sprintf("backend.%s.%s", event.BackendName, event.ChangeType)

    // Serialize event to protobuf
    data, err := proto.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to marshal event: %w", err)
    }

    // Publish to NATS
    if err := p.nc.Publish(subject, data); err != nil {
        return fmt.Errorf("failed to publish event: %w", err)
    }

    log.Infof("Published event: %s to subject %s", event.EventId, subject)
    return nil
}

func (p *NatsEventPublisher) Subscribe(ctx context.Context, subscription *BackendEventSubscription) (<-chan *BackendChangeEvent, error) {
    eventChan := make(chan *BackendChangeEvent, 100)
    var subs []*nats.Subscription

    // Build NATS subject patterns
    for _, backendName := range subscription.BackendNames {
        subject := fmt.Sprintf("backend.%s.*", backendName)

        sub, err := p.nc.Subscribe(subject, func(msg *nats.Msg) {
            var event BackendChangeEvent
            if err := proto.Unmarshal(msg.Data, &event); err != nil {
                log.Errorf("Failed to unmarshal event: %v", err)
                return
            }

            // Filter by event types
            if len(subscription.EventTypes) > 0 {
                found := false
                for _, eventType := range subscription.EventTypes {
                    if event.ChangeType == eventType {
                        found = true
                        break
                    }
                }
                if !found {
                    return
                }
            }

            select {
            case eventChan <- &event:
            default:
                log.Warnf("Event channel full, dropping event %s", event.EventId)
            }
        })
        if err != nil {
            return nil, fmt.Errorf("failed to subscribe to subject %s: %w", subject, err)
        }
        subs = append(subs, sub)
    }

    // Handle context cancellation: unsubscribe everything, then close the channel once
    go func() {
        <-ctx.Done()
        for _, sub := range subs {
            sub.Unsubscribe()
        }
        close(eventChan)
    }()

    return eventChan, nil
}

Component 4: Pattern Runner Event Consumer

// Pattern runner subscribes to backend change events

type PatternRunner struct {
    patternID      string
    adminClient    admin.ControlPlaneClient
    eventPublisher EventPublisher
    slotBinder     *SlotBinder

    // Current slot bindings
    slotBindings map[string]*Slot // slot_name → slot implementation
}

func (r *PatternRunner) Start(ctx context.Context, config *PatternConfig) error {
    // 1. Initial backend binding (as before)
    if err := r.bindSlots(ctx, config.SlotBindings); err != nil {
        return err
    }

    // 2. Subscribe to backend change events
    subscription := &BackendEventSubscription{
        SubscriberId: r.patternID,
        BackendNames: r.getBackendNames(config.SlotBindings),
        EventTypes: []BackendChangeType{
            BACKEND_CHANGE_TYPE_UPDATED,
            BACKEND_CHANGE_TYPE_CREDENTIAL_ROTATED,
            BACKEND_CHANGE_TYPE_ENDPOINT_CHANGED,
        },
    }

    eventChan, err := r.eventPublisher.Subscribe(ctx, subscription)
    if err != nil {
        return fmt.Errorf("failed to subscribe to backend events: %w", err)
    }

    // 3. Start event handling loop
    go r.handleBackendEvents(ctx, eventChan)

    // 4. Start processing
    return r.startProcessing(ctx)
}

func (r *PatternRunner) handleBackendEvents(ctx context.Context, eventChan <-chan *BackendChangeEvent) {
    for {
        select {
        case <-ctx.Done():
            return

        case event := <-eventChan:
            log.Infof("Received backend change event: %s for backend %s", event.ChangeType, event.BackendName)

            // Handle event based on type
            switch event.ChangeType {
            case BACKEND_CHANGE_TYPE_CREDENTIAL_ROTATED:
                // Hot-reload credentials
                if err := r.rotateCredentials(ctx, event); err != nil {
                    log.Errorf("Failed to rotate credentials for backend %s: %v", event.BackendName, err)
                }

            case BACKEND_CHANGE_TYPE_ENDPOINT_CHANGED:
                // Reconnect to new endpoints
                if err := r.reconnectBackend(ctx, event); err != nil {
                    log.Errorf("Failed to reconnect to backend %s: %v", event.BackendName, err)
                }

            case BACKEND_CHANGE_TYPE_UPDATED:
                // General config update
                if err := r.reloadBackendConfig(ctx, event); err != nil {
                    log.Errorf("Failed to reload config for backend %s: %v", event.BackendName, err)
                }
            }
        }
    }
}

func (r *PatternRunner) rotateCredentials(ctx context.Context, event *BackendChangeEvent) error {
    // 1. Fetch new backend config
    newBackend := event.NewState

    // 2. Find slot using this backend
    var targetSlot *Slot
    var slotName string
    for name, slot := range r.slotBindings {
        if slot.BackendName == event.BackendName {
            targetSlot = slot
            slotName = name
            break
        }
    }

    if targetSlot == nil {
        return fmt.Errorf("no slot found for backend %s", event.BackendName)
    }

    // 3. Update credentials on existing connection
    if err := targetSlot.UpdateCredentials(ctx, newBackend.Config); err != nil {
        // If hot-reload fails, reconnect
        log.Warnf("Hot credential reload failed, reconnecting slot %s", slotName)
        return r.reconnectSlot(ctx, slotName, newBackend)
    }

    log.Infof("Successfully rotated credentials for slot %s", slotName)
    return nil
}

func (r *PatternRunner) reconnectBackend(ctx context.Context, event *BackendChangeEvent) error {
    // Similar to rotateCredentials, but force reconnect
    for slotName, slot := range r.slotBindings {
        if slot.BackendName == event.BackendName {
            log.Infof("Reconnecting slot %s to backend %s", slotName, event.BackendName)

            // Disconnect old connection
            slot.Disconnect(ctx)

            // Create new slot with updated config
            newSlot, err := r.slotBinder.BindSlot(ctx, slotName, event.BackendName, slot.SlotType)
            if err != nil {
                return fmt.Errorf("failed to bind slot %s: %w", slotName, err)
            }

            // Replace slot
            r.slotBindings[slotName] = newSlot

            log.Infof("Successfully reconnected slot %s", slotName)
        }
    }

    return nil
}

Integration with Kubernetes Operator

Per RFC-042, the Prism Operator already watches PrismBackend CRDs. We extend it to publish events:

// prism-operator backend controller

func (r *PrismBackendReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    var backend prismv1alpha1.PrismBackend
    if err := r.Get(ctx, req.NamespacedName, &backend); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // 1. Validate service and secret exist (as before)
    // ...

    // 2. Check if backend config changed
    oldStatus := backend.Status.ResolvedEndpoint
    newStatus := fmt.Sprintf("%s.%s.svc:%d", svcName, svcNamespace, backend.Spec.Discovery.Service.Port)

    if oldStatus != newStatus {
        // 3. Publish endpoint change event
        r.eventPublisher.Publish(ctx, &BackendChangeEvent{
            BackendName:   backend.Name,
            ChangeType:    BACKEND_CHANGE_TYPE_ENDPOINT_CHANGED,
            Source:        "kubernetes-operator",
            Timestamp:     time.Now().Unix(),
            ChangedFields: []string{"discovery.service.port"},
        })
    }

    // 4. Update status (as before)
    return r.updateStatus(ctx, &backend, "Discovered", newStatus)
}

Integration with Terraform

For backends managed via Terraform, we can use the Terraform Cloud API or state file polling:

// pkg/admin/terraform_watcher.go

type TerraformStateWatcher struct {
    client         *tfe.Client
    organization   string
    workspace      string
    eventPublisher EventPublisher
    pollInterval   time.Duration
}

func (w *TerraformStateWatcher) Watch(ctx context.Context) error {
    ticker := time.NewTicker(w.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-ticker.C:
            if err := w.checkTerraformState(ctx); err != nil {
                log.Errorf("Failed to check Terraform state: %v", err)
            }
        }
    }
}

func (w *TerraformStateWatcher) checkTerraformState(ctx context.Context) error {
    // 1. Fetch current Terraform state
    currentStateVersion, err := w.client.StateVersions.ReadCurrent(ctx, w.workspace)
    if err != nil {
        return fmt.Errorf("failed to read Terraform state: %w", err)
    }

    // 2. Download state file
    stateFile, err := w.client.StateVersions.Download(ctx, currentStateVersion.DownloadURL)
    if err != nil {
        return fmt.Errorf("failed to download Terraform state: %w", err)
    }

    // 3. Parse state and look for backend resources
    var state tfstate.State
    if err := json.Unmarshal(stateFile, &state); err != nil {
        return fmt.Errorf("failed to parse Terraform state: %w", err)
    }

    // 4. Check for backend resource changes
    for _, resource := range state.Resources {
        if resource.Type != "kubernetes_secret" && resource.Type != "helm_release" {
            continue
        }

        // Check if this resource corresponds to a Prism backend
        // (Attributes is assumed to be a map[string]interface{} from the parsed state)
        labels, _ := resource.Attributes["labels"].(map[string]interface{})
        backendName, ok := labels["prism.io/backend-name"].(string)
        if !ok {
            continue
        }

        // Compare with admin state
        if changed := w.detectTerraformDrift(ctx, backendName, resource); changed {
            w.eventPublisher.Publish(ctx, &BackendChangeEvent{
                BackendName: backendName,
                ChangeType:  BACKEND_CHANGE_TYPE_UPDATED,
                Source:      "terraform",
                Timestamp:   time.Now().Unix(),
            })
        }
    }

    return nil
}

Event Topic Hierarchy

backend.*                            # All backend events
├─ backend.{backend_name}.*          # Events for specific backend
│  ├─ backend.{name}.created
│  ├─ backend.{name}.updated
│  ├─ backend.{name}.deleted
│  ├─ backend.{name}.credentials.rotated
│  ├─ backend.{name}.health.changed
│  └─ backend.{name}.config.updated
├─ backend.type.{backend_type}.*     # Events by backend type
│  ├─ backend.type.kafka.*
│  ├─ backend.type.postgres.*
│  └─ backend.type.redis.*
└─ backend.source.{source}.*         # Events by source
   ├─ backend.source.kubernetes.*
   ├─ backend.source.terraform.*
   ├─ backend.source.manual.*
   └─ backend.source.health-check.*

Pattern runners subscribe to:

// Subscribe to all events for backends I use
"backend.kafka-prod.*"
"backend.redis-cache.*"

// Or subscribe to specific event types
"backend.*.credentials.rotated"
"backend.*.health.changed"

Example Scenarios

Scenario 1: Kubernetes Secret Rotation

┌─────────────────────────────────────────────────────────────────┐
│ 1. Kubernetes Admin rotates Kafka password │
│ │
│ kubectl create secret generic kafka-credentials \ │
│ --from-literal=password=new-password \ │
│ --dry-run=client -o yaml | kubectl apply -f - │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 2. Backend State Monitor detects secret change │
│ │
│ K8s Secret Watcher: │
│ ├─ Receives watch event: Secret "kafka-credentials" Modified │
│ ├─ Reads new secret value │
│ ├─ Compares with admin state │
│ └─ Detects drift: password changed │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 3. Publish credential rotation event │
│ │
│ NATS Publish: │
│ Subject: backend.kafka-prod.credentials.rotated │
│ Payload: BackendChangeEvent{...} │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 4. Pattern Runners receive event │
│ │
│ order-consumer: │
│ ├─ Receives event on subscription │
│ ├─ Fetches new backend config from admin │
│ ├─ Calls slot.UpdateCredentials(new_password) │
│ └─ Continues processing (no downtime) │
│ │
│ analytics-ingest: │
│ ├─ Receives event on subscription │
│ ├─ Updates Kafka producer config │
│ └─ Reconnects to Kafka with new credentials │
└─────────────────────────────────────────────────────────────────┘

Scenario 2: Terraform-Managed Backend Update

┌─────────────────────────────────────────────────────────────────┐
│ 1. Operator updates Terraform config │
│ │
│ # main.tf │
│ resource "helm_release" "kafka" { │
│ replicas = 5 # scaled up from 3 │
│ } │
│ │
│ terraform apply │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 2. Terraform State Watcher detects change │
│ │
│ Poll Terraform Cloud API: │
│ ├─ Fetch current state version │
│ ├─ Download state file │
│ ├─ Parse resources │
│ ├─ Detect kafka helm_release changed │
│ └─ Extract new broker endpoints │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 3. Publish endpoint change event │
│ │
│ NATS Publish: │
│ Subject: backend.kafka-prod.config.updated │
│ Payload: BackendChangeEvent{ │
│ changed_fields: ["config.kafka.brokers"], │
│ old_value: ["kafka-01:9092", "kafka-02:9092", "kafka-03:9092"], │
│ new_value: ["kafka-01:9092", ..., "kafka-05:9092"] │
│ } │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 4. Admin updates backend registry │
│ 5. Pattern runners receive event and reconnect │
└─────────────────────────────────────────────────────────────────┘

Observability and Debugging

Admin UI Dashboard

Backend Events (Last 24 Hours)
┌────────────────────────────────────────────────────────────────┐
│ Backend Event Type Source Time │
├────────────────────────────────────────────────────────────────┤
│ kafka-prod credentials.rotated kubernetes 2min ago │
│ postgres-db health.changed health-check 15min ago │
│ redis-cache config.updated manual 1hr ago │
│ nats-messaging endpoint.changed terraform 3hr ago │
└────────────────────────────────────────────────────────────────┘

Backend Drift Detection
┌────────────────────────────────────────────────────────────────┐
│ Backend Drift Type Status Action │
├────────────────────────────────────────────────────────────────┤
│ kafka-prod Broker count mismatch Active Reconcile │
│ postgres-db Password changed Resolved Auto-applied │
└────────────────────────────────────────────────────────────────┘

Pattern Runner Subscriptions
┌────────────────────────────────────────────────────────────────┐
│ Pattern Subscribed Backends Status │
├────────────────────────────────────────────────────────────────┤
│ order-consumer kafka-prod, redis-cache Connected │
│ analytics-ingest kafka-prod Connected │
│ user-notifications postgres-db, nats-* Connected │
└────────────────────────────────────────────────────────────────┘

Metrics

# Backend state monitor metrics
prism_backend_refresh_total{backend="kafka-prod"} 1440
prism_backend_drift_detected_total{backend="kafka-prod",type="credential"} 3
prism_backend_event_published_total{backend="kafka-prod",event="credentials.rotated"} 3
prism_backend_state_refresh_duration_seconds{backend="kafka-prod"} 0.150

# Pattern runner metrics
prism_pattern_backend_event_received_total{pattern="order-consumer",backend="kafka-prod"} 3
prism_pattern_credential_rotation_duration_seconds{pattern="order-consumer"} 0.050
prism_pattern_backend_reconnect_total{pattern="order-consumer",backend="kafka-prod"} 0
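
A minimal sketch of how the monitor-side counters above could be registered with the standard Prometheus Go client (prometheus/client_golang); the helper functions and label names beyond those shown in the metric samples are assumptions:

// Sketch: registering backend state monitor counters with promauto.
var (
    refreshTotal = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "prism_backend_refresh_total",
        Help: "Number of backend state refresh cycles.",
    }, []string{"backend"})

    driftDetectedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "prism_backend_drift_detected_total",
        Help: "Number of drift events detected, by backend and drift type.",
    }, []string{"backend", "type"})
)

// Called from the refresh loop and drift detector, respectively.
func recordRefresh(backend string) { refreshTotal.WithLabelValues(backend).Inc() }
func recordDrift(backend, driftType string) {
    driftDetectedTotal.WithLabelValues(backend, driftType).Inc()
}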

Audit Log

{
  "timestamp": "2025-10-24T10:15:30Z",
  "event_type": "backend.credentials.rotated",
  "backend_name": "kafka-prod",
  "source": "kubernetes-secret",
  "changed_fields": ["config.kafka.auth.sasl.password"],
  "affected_patterns": ["order-consumer", "analytics-ingest"],
  "reconciliation_status": "auto-applied",
  "metadata": {
    "kubernetes_secret": "kafka-credentials",
    "kubernetes_namespace": "infrastructure",
    "secret_version": "v2"
  }
}

Implementation Plan

Phase 1: Core Event Infrastructure (Week 1-2)

  • Define BackendChangeEvent protobuf schema
  • Implement NATS-based event publisher/subscriber
  • Add event topic hierarchy
  • Create admin API for event subscriptions
  • Unit tests for event serialization and routing

Phase 2: Backend State Monitor (Week 2-3)

  • Implement BackendStateMonitor with refresh loop
  • Add Kubernetes Secret watcher
  • Add Kubernetes PrismBackend CRD watcher
  • Implement drift detector
  • Add reconciliation policies
  • Integration tests with kind cluster

Phase 3: Pattern Runner Integration (Week 3-4)

  • Add event subscription to pattern runner startup
  • Implement hot credential rotation
  • Implement backend reconnection
  • Add circuit breaker for failed reconnections
  • End-to-end tests with credential rotation

Phase 4: Terraform Integration (Week 4-5)

  • Implement Terraform Cloud API watcher
  • Add Terraform state file polling
  • Map Terraform resources to Prism backends
  • Integration tests with Terraform Cloud sandbox

Phase 5: Observability (Week 5-6)

  • Add Prometheus metrics for state monitor
  • Add Prometheus metrics for pattern runners
  • Create Grafana dashboards
  • Implement audit logging
  • Add admin UI for event monitoring

Phase 6: Documentation (Week 6)

  • Write operator guide for event system
  • Document event subscription patterns
  • Create troubleshooting guide
  • Write ADR for design decisions
  • Update changelog

Success Criteria

  1. Zero-Downtime Credential Rotation: Pattern runners automatically reload credentials without restart
  2. Drift Detection Latency: Backend changes detected within 30 seconds
  3. Event Delivery Reliability: 99.9% of events delivered to subscribers
  4. Pattern Runner Resilience: Failed reconnections trigger retry with exponential backoff (see the sketch after this list)
  5. Observability: All backend changes visible in admin UI and metrics
  6. Multi-Source Support: Events from Kubernetes, Terraform, and manual changes
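
For criterion 4, a minimal backoff sketch; the retryWithBackoff helper and its limits are illustrative, not part of the current pattern runner API:

// Sketch: exponential backoff for failed backend reconnections.
// reconnect is a hypothetical closure wrapping r.reconnectBackend.
func retryWithBackoff(ctx context.Context, reconnect func(context.Context) error) error {
    backoff := 500 * time.Millisecond
    const maxBackoff = 30 * time.Second
    const maxAttempts = 10

    for attempt := 1; attempt <= maxAttempts; attempt++ {
        err := reconnect(ctx)
        if err == nil {
            return nil
        }
        log.Warnf("Reconnect attempt %d failed: %v (retrying in %s)", attempt, err, backoff)

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(backoff):
        }

        backoff *= 2
        if backoff > maxBackoff {
            backoff = maxBackoff
        }
    }
    return fmt.Errorf("reconnect failed after %d attempts", maxAttempts)
}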

Alternatives Considered

Alternative 1: Poll-Based Config Refresh

Approach: Pattern runners periodically poll admin for backend config changes

Pros:

  • Simple to implement
  • No event infrastructure needed

Cons:

  • Higher latency (poll interval = 30-60s)
  • Wasted admin API calls (polling with no changes)
  • No real-time notifications

Rejected: Poor latency and efficiency

Alternative 2: Webhook-Based Notifications

Approach: Admin sends HTTP webhooks to pattern runners on changes

Pros:

  • Standard HTTP protocol
  • Firewall-friendly

Cons:

  • Pattern runners need HTTP server (additional port)
  • No built-in retry on webhook failure
  • Requires service discovery (how does admin find pattern runners?)

Rejected: Adds complexity to pattern runners

Alternative 3: Kubernetes Watch API Only

Approach: Pattern runners directly watch Kubernetes CRDs/Secrets

Pros:

  • Native Kubernetes integration
  • Real-time notifications

Cons:

  • Only works for Kubernetes-hosted backends
  • No support for Terraform or manual changes
  • Pattern runners need K8s API access (RBAC complexity)

Rejected: Not portable beyond Kubernetes

Open Questions

  1. Event Ordering: Should we guarantee event ordering per backend?

    • Proposed: Use NATS JetStream with ordered consumers
  2. Event Retention: How long should events be retained?

    • Proposed: 7 days in NATS, 30 days in audit log
  3. Event Backpressure: What happens if pattern runner can't keep up with events?

    • Proposed: Drop events if channel full, rely on periodic refresh as fallback
  4. Cross-Cluster Events: How to propagate events across regions/clusters?

    • Proposed: Phase 2 feature—use NATS leaf nodes or NATS supercluster
  5. Event Replay: Should pattern runners be able to replay missed events?

    • Proposed: Yes, use NATS JetStream durable consumers with per-pattern checkpoints
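
If the JetStream proposals above are adopted, a durable consumer per pattern runner might look roughly like this; the durable name and handleEvent callback are assumptions, and nc is an existing *nats.Conn:

// Sketch: durable JetStream consumer for ordered delivery and replay (questions 1 and 5).
js, err := nc.JetStream()
if err != nil {
    log.Fatalf("Failed to get JetStream context: %v", err)
}

_, err = js.Subscribe(
    "backend.kafka-prod.*",
    func(msg *nats.Msg) {
        var event BackendChangeEvent
        if err := proto.Unmarshal(msg.Data, &event); err != nil {
            log.Errorf("Failed to unmarshal event: %v", err)
            msg.Nak()
            return
        }
        handleEvent(&event) // hypothetical dispatch into the pattern runner
        msg.Ack()           // checkpoint: the durable consumer resumes here after a restart
    },
    nats.Durable("order-consumer"), // per-pattern checkpoint
    nats.ManualAck(),
)
if err != nil {
    log.Fatalf("Failed to create durable consumer: %v", err)
}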