Infrastructure Change Events via Terraform Provider Patterns
Abstract
This RFC explores how Terraform provider API patterns can be adapted to create a push-based event notification system for infrastructure changes in Prism. Instead of pattern runners polling for backend configuration changes, this system would enable real-time notifications when backends are modified, credentials rotate, or infrastructure state changes.
The core insight from Terraform providers is their Read/Refresh cycle combined with state drift detection—mechanisms that continuously compare desired state vs. actual state and generate events when divergence is detected.
Motivation
Current State: Pull-Based Configuration
Per RFC-039 and RFC-042, Prism currently uses a pull-based model:
┌─────────────────────────────────────────────────────────────────┐
│ Current Flow (Pull-Based) │
│ │
│ Pattern Runner Startup │
│ ├─ Call GetBackend("kafka-prod") → Admin Control Plane │
│ ├─ Receive backend config │
│ ├─ Connect to backend │
│ └─ Run until restart │
│ │
│ Problem: Configuration Changes Require Manual Intervention │
│ ├─ Admin updates backend config in registry │
│ ├─ Pattern runners don't know about the change │
│ ├─ Operators must restart pattern runners manually │
│ └─ Downtime or stale connections until restart │
└─────────────────────────────────────────────────────────────────┘
Desired State: Push-Based Event Notifications
Terraform providers solve a similar problem: detecting when infrastructure deviates from desired state. Key patterns we can adapt:
- Refresh Cycle: Providers continuously check actual state vs. stored state
- Drift Detection: Generate events when divergence is detected
- Resource Watches: Monitor infrastructure for changes
- State Reconciliation: Trigger workflows to converge actual → desired
┌─────────────────────────────────────────────────────────────────┐
│ Desired Flow (Event-Based) │
│ │
│ Pattern Runner │
│ ├─ Subscribe to backend change events │
│ ├─ Receive event: "kafka-prod config updated" │
│ ├─ Fetch new config from admin │
│ ├─ Reconnect to backend with new credentials │
│ └─ Continue processing (zero downtime) │
│ │
│ Admin Control Plane │
│ ├─ Monitor backend health │
│ ├─ Detect credential rotation │
│ ├─ Detect backend failure/recovery │
│ └─ Publish events to subscribed pattern runners │
└─────────────────────────────────────────────────────────────────┘
Use Cases
- Credential Rotation: Kubernetes secrets updated → Pattern runners reconnect with new credentials
- Backend Failover: Primary Kafka broker down → Pattern runners switch to replica
- Configuration Updates: Admin changes backend settings → Pattern runners hot-reload config
- Infrastructure Drift: Backend manually modified outside Prism → Admin detects and reconciles
- Health Monitoring: Backend becomes unhealthy → Pattern runners reroute traffic
Goals
- Event-Driven Backend Discovery: Pattern runners receive push notifications on backend changes
- Terraform-Inspired State Management: Adapt provider patterns (Read/Refresh/Drift) to Prism
- Zero-Downtime Updates: Pattern runners hot-reload configs without restart
- Infrastructure Observability: Centralized visibility into backend state changes
- Reconciliation Loops: Automatically converge actual state → desired state
- Multi-Source Events: Aggregate signals from Kubernetes, Terraform, cloud APIs, manual changes
Non-Goals
- Full Terraform Provider Implementation: We're adapting patterns, not building a Terraform provider
- Declarative Infrastructure: This RFC focuses on event notification, not provisioning
- Multi-Cloud Orchestration: Initial focus is Kubernetes; cloud providers are future work
- Real-Time Streaming: Events are near-real-time (seconds), not sub-second latency
Background: Terraform Provider API Patterns
How Terraform Providers Work
Terraform providers implement the CRUD + Refresh lifecycle:
// Terraform Provider Schema (simplified)
type Provider struct {
// Resource lifecycle hooks
Create func(d *ResourceData) error
Read func(d *ResourceData) error // ← Key for drift detection
Update func(d *ResourceData) error
Delete func(d *ResourceData) error
}
// Terraform Refresh Cycle
func (p *Provider) RefreshState(ctx context.Context, resource *Resource) (*Resource, error) {
// 1. Read actual state from infrastructure
actualState, err := p.readInfrastructure(ctx, resource.ID)
if err != nil {
return nil, err
}
// 2. Compare actual vs. stored state
if !reflect.DeepEqual(actualState, resource.StoredState) {
// 3. Generate drift event
p.emitDriftEvent(resource, actualState, resource.StoredState)
// 4. Update stored state
resource.StoredState = actualState
}
return resource, nil
}
Key Patterns to Adapt
1. Read/Refresh Cycle
Terraform: Periodically calls Read() to refresh state from infrastructure
Prism: Admin control plane periodically checks backend health and config
// Prism Backend State Refresh
type BackendMonitor struct {
adminClient admin.ControlPlaneClient
k8sClient kubernetes.Interface
refreshRate time.Duration
}
func (m *BackendMonitor) RefreshBackend(ctx context.Context, backendName string) error {
// 1. Fetch stored backend config from admin
storedBackend, err := m.adminClient.GetBackend(ctx, backendName)
if err != nil {
return err
}
// 2. Read actual state from infrastructure (e.g., Kubernetes)
actualBackend, err := m.readActualState(ctx, storedBackend)
if err != nil {
return err
}
// 3. Compare and detect drift
drift := m.detectDrift(storedBackend, actualBackend)
if len(drift) > 0 {
// 4. Publish drift events
for _, change := range drift {
m.publishEvent(ctx, BackendChangeEvent{
BackendName: backendName,
ChangeType: change.Type,
OldValue: change.OldValue,
NewValue: change.NewValue,
})
}
// 5. Update admin state
m.adminClient.UpdateBackend(ctx, actualBackend)
}
return nil
}
2. State Drift Detection
Terraform: Compares desired_state (in .tf files) vs. actual_state (from provider Read)
Prism: Compares admin_state (backend registry) vs. actual_state (Kubernetes/cloud)
type StateDrift struct {
Field string // e.g., "config.kafka.brokers"
OldValue interface{} // ["kafka-01:9092"]
NewValue interface{} // ["kafka-02:9092", "kafka-03:9092"]
DriftType DriftType // CREDENTIAL_CHANGE, CONFIG_UPDATE, HEALTH_STATUS, etc.
}
func (m *BackendMonitor) detectDrift(stored, actual *Backend) []StateDrift {
var drifts []StateDrift
// Example: Detect broker list changes
if !reflect.DeepEqual(stored.Config.GetKafka().Brokers, actual.Config.GetKafka().Brokers) {
drifts = append(drifts, StateDrift{
Field: "config.kafka.brokers",
OldValue: stored.Config.GetKafka().Brokers,
NewValue: actual.Config.GetKafka().Brokers,
DriftType: DRIFT_TYPE_CONFIG_UPDATE,
})
}
// Example: Detect secret rotation
if stored.Config.GetKafka().Auth.GetSasl().Password != actual.Config.GetKafka().Auth.GetSasl().Password {
drifts = append(drifts, StateDrift{
Field: "config.kafka.auth.sasl.password",
OldValue: "[REDACTED]",
NewValue: "[REDACTED]",
DriftType: DRIFT_TYPE_CREDENTIAL_ROTATION,
})
}
return drifts
}
3. Resource Watchers
Terraform: Providers can watch cloud APIs for resource changes (via webhooks, polling, or event streams)
Prism: Admin watches Kubernetes resources, cloud APIs, and internal state
// Kubernetes Secret Watcher
func (m *BackendMonitor) WatchSecrets(ctx context.Context) error {
watcher, err := m.k8sClient.CoreV1().Secrets("infrastructure").Watch(ctx, metav1.ListOptions{
LabelSelector: "prism.io/backend=true",
})
if err != nil {
return err
}
for event := range watcher.ResultChan() {
secret := event.Object.(*corev1.Secret)
backendName := secret.Labels["prism.io/backend-name"]
switch event.Type {
case watch.Modified:
// Secret updated (credential rotation)
m.publishEvent(ctx, BackendChangeEvent{
BackendName: backendName,
ChangeType: CHANGE_TYPE_CREDENTIAL_ROTATION,
Source: "kubernetes-secret",
})
case watch.Deleted:
// Secret deleted (backend unavailable)
m.publishEvent(ctx, BackendChangeEvent{
BackendName: backendName,
ChangeType: CHANGE_TYPE_BACKEND_DELETED,
Source: "kubernetes-secret",
})
}
}
return nil
}
4. State Reconciliation
Terraform: When drift is detected, Terraform can auto-apply or prompt the user to reconcile
Prism: Admin can auto-reconcile or notify operators
type ReconciliationPolicy string
const (
RECONCILE_AUTO ReconciliationPolicy = "auto" // Auto-update stored state
RECONCILE_MANUAL ReconciliationPolicy = "manual" // Notify operator, wait for approval
RECONCILE_IGNORE ReconciliationPolicy = "ignore" // Log drift but don't reconcile
)
func (m *BackendMonitor) Reconcile(ctx context.Context, backendName string, drift StateDrift, actual *Backend, policy ReconciliationPolicy) error {
    switch policy {
    case RECONCILE_AUTO:
        // Automatically update admin state to match actual state
        return m.adminClient.UpdateBackend(ctx, actual)
    case RECONCILE_MANUAL:
        // Create admin notification for operator approval
        return m.adminClient.CreateNotification(ctx, Notification{
            Title:   fmt.Sprintf("Backend drift detected: %s", backendName),
            Message: fmt.Sprintf("Field %s changed from %v to %v", drift.Field, drift.OldValue, drift.NewValue),
            Actions: []string{"APPROVE", "REJECT", "IGNORE"},
        })
    case RECONCILE_IGNORE:
        // Just log the drift
        log.Warnf("Backend drift detected but ignored: %s", drift.Field)
        return nil
    }
    return nil
}
Design: Prism Backend Event System
Architecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ Prism Event-Driven Backend System │
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Event Sources (Infrastructure Signals) │ │
│ │ │ │
│ │ Kubernetes Terraform Cloud Manual │ │
│ │ ├─ PrismBackend CRDs ├─ Terraform state Admin UI │ │
│ │ ├─ Secrets (auth) ├─ Resource changes CLI │ │
│ │ ├─ Services (endpoints) ├─ Drift detection API │ │
│ │ └─ ConfigMaps └─ Plan/Apply events │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ (watch/poll) │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Backend State Monitor (Prism Admin Component) │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ State Refresh Loop │ │ │
│ │ │ ├─ Kubernetes Watcher (PrismBackend CRDs, Secrets) │ │ │
│ │ │ ├─ Terraform State Watcher (via Terraform Cloud API) │ │ │
│ │ │ ├─ Cloud API Pollers (AWS, GCP, Azure) │ │ │
│ │ │ └─ Health Checkers (backend connectivity tests) │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ Drift Detector │ │ │
│ │ │ ├─ Compare admin_state vs. actual_state │ │ │
│ │ │ ├─ Classify drift type (credential, config, health) │ │ │
│ │ │ └─ Generate BackendChangeEvents │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ (publish events) │
│  ┌────────────────────────────────────────────────────────────────────┐    │
│ │ Event Bus (NATS/Kafka/Internal) │ │
│ │ │ │
│ │ Topics: │ │
│ │ ├─ backend.*.config.updated │ │
│ │ ├─ backend.*.credentials.rotated │ │
│ │ ├─ backend.*.health.changed │ │
│ │ └─ backend.*.deleted │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ (subscribe) │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Event Consumers │ │
│ │ │ │
│ │ Pattern Runners Prism Operator Monitoring │ │
│ │ ├─ Hot-reload configs ├─ Update CRD status ├─ Alert on drift │ │
│ │ ├─ Reconnect backends ├─ Trigger reconcile ├─ Audit log │ │
│ │ └─ Update local cache └─ Update K8s secrets └─ Metrics │ │
│ └────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Event Schema
// Backend change event
message BackendChangeEvent {
string event_id = 1; // Unique event ID
string backend_name = 2; // e.g., "kafka-prod"
BackendChangeType change_type = 3; // Type of change
string source = 4; // "kubernetes", "terraform", "manual"
int64 timestamp = 5; // Event timestamp
// Old and new values (for drift)
Backend old_state = 6;
Backend new_state = 7;
// Affected fields
repeated string changed_fields = 8; // ["config.kafka.brokers", "metadata.region"]
// Reconciliation metadata
ReconciliationPolicy policy = 9;
string reconciliation_status = 10; // "pending", "approved", "rejected"
// Observability
map<string, string> metadata = 11;
repeated string affected_patterns = 12; // Patterns using this backend
}
enum BackendChangeType {
BACKEND_CHANGE_TYPE_UNSPECIFIED = 0;
BACKEND_CHANGE_TYPE_CREATED = 1;
BACKEND_CHANGE_TYPE_UPDATED = 2;
BACKEND_CHANGE_TYPE_DELETED = 3;
BACKEND_CHANGE_TYPE_CREDENTIAL_ROTATED = 4;
BACKEND_CHANGE_TYPE_HEALTH_CHANGED = 5;
BACKEND_CHANGE_TYPE_ENDPOINT_CHANGED = 6;
BACKEND_CHANGE_TYPE_CONFIG_DRIFT_DETECTED = 7;
}
// Event subscription
message BackendEventSubscription {
string subscriber_id = 1; // Pattern runner ID
repeated string backend_names = 2; // Backends to watch
repeated BackendChangeType event_types = 3; // Filter by event type
string callback_endpoint = 4; // gRPC endpoint for notifications
}
Component 1: Backend State Monitor
A new admin component that implements the Terraform-inspired refresh cycle:
// pkg/admin/backend_monitor.go
type BackendStateMonitor struct {
adminClient admin.ControlPlaneClient
k8sClient kubernetes.Interface
terraformClient *tfe.Client // Terraform Cloud API
eventPublisher EventPublisher
refreshInterval time.Duration
// State tracking
lastKnownStates map[string]*Backend
driftDetector *DriftDetector
}
func NewBackendStateMonitor(
adminClient admin.ControlPlaneClient,
k8sClient kubernetes.Interface,
eventPublisher EventPublisher,
) *BackendStateMonitor {
return &BackendStateMonitor{
adminClient: adminClient,
k8sClient: k8sClient,
eventPublisher: eventPublisher,
refreshInterval: 30 * time.Second,
lastKnownStates: make(map[string]*Backend),
driftDetector: NewDriftDetector(),
}
}
func (m *BackendStateMonitor) Start(ctx context.Context) error {
// Start periodic refresh loop
go m.refreshLoop(ctx)
// Start Kubernetes watchers
go m.watchKubernetesSecrets(ctx)
go m.watchKubernetesCRDs(ctx)
// Start Terraform watcher (if configured)
if m.terraformClient != nil {
go m.watchTerraformState(ctx)
}
return nil
}
// Terraform-style refresh loop
func (m *BackendStateMonitor) refreshLoop(ctx context.Context) {
ticker := time.NewTicker(m.refreshInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.refreshAllBackends(ctx)
}
}
}
func (m *BackendStateMonitor) refreshAllBackends(ctx context.Context) {
// 1. List all backends from admin
backends, err := m.adminClient.ListBackends(ctx, &adminpb.ListBackendsRequest{})
if err != nil {
log.Errorf("Failed to list backends: %v", err)
return
}
// 2. Refresh each backend
for _, backend := range backends.Backends {
if err := m.refreshBackend(ctx, backend); err != nil {
log.Errorf("Failed to refresh backend %s: %v", backend.Name, err)
}
}
}
func (m *BackendStateMonitor) refreshBackend(ctx context.Context, storedBackend *adminpb.Backend) error {
// 1. Read actual state from infrastructure
actualBackend, err := m.readActualState(ctx, storedBackend)
if err != nil {
return fmt.Errorf("failed to read actual state: %w", err)
}
// 2. Detect drift
drifts := m.driftDetector.Detect(storedBackend, actualBackend)
if len(drifts) == 0 {
// No changes
return nil
}
// 3. Publish events for each drift
for _, drift := range drifts {
event := &BackendChangeEvent{
EventId: uuid.New().String(),
BackendName: storedBackend.Name,
ChangeType: m.mapDriftToChangeType(drift),
Source: drift.Source,
Timestamp: time.Now().Unix(),
OldState: storedBackend,
NewState: actualBackend,
ChangedFields: []string{drift.Field},
}
if err := m.eventPublisher.Publish(ctx, event); err != nil {
log.Errorf("Failed to publish event: %v", err)
}
}
// 4. Update stored state (auto-reconcile)
m.lastKnownStates[storedBackend.Name] = actualBackend
return nil
}
func (m *BackendStateMonitor) readActualState(ctx context.Context, backend *adminpb.Backend) (*adminpb.Backend, error) {
switch backend.Type {
case adminpb.BackendType_BACKEND_TYPE_KAFKA:
return m.readKafkaState(ctx, backend)
case adminpb.BackendType_BACKEND_TYPE_POSTGRES:
return m.readPostgresState(ctx, backend)
case adminpb.BackendType_BACKEND_TYPE_REDIS:
return m.readRedisState(ctx, backend)
default:
return nil, fmt.Errorf("unsupported backend type: %v", backend.Type)
}
}
// Read actual Kafka state from Kubernetes
func (m *BackendStateMonitor) readKafkaState(ctx context.Context, backend *adminpb.Backend) (*adminpb.Backend, error) {
// 1. Read Kubernetes service (get actual endpoints)
svc, err := m.k8sClient.CoreV1().Services("infrastructure").Get(ctx, "kafka", metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get Kafka service: %w", err)
}
// 2. Read Kubernetes secret (get current credentials)
secret, err := m.k8sClient.CoreV1().Secrets("infrastructure").Get(ctx, "kafka-credentials", metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get Kafka secret: %w", err)
}
// 3. Build actual backend config
actualBackend := proto.Clone(backend).(*adminpb.Backend)
kafkaConfig := actualBackend.Config.GetKafka()
// Update brokers from service endpoints
if svc.Spec.ClusterIP != "" {
kafkaConfig.Brokers = []string{fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, svc.Spec.Ports[0].Port)}
}
// Update password from secret
if password, ok := secret.Data["password"]; ok {
kafkaConfig.Auth = &adminpb.KafkaAuth{
Auth: &adminpb.KafkaAuth_Sasl{
Sasl: &adminpb.KafkaSASL{
Password: string(password),
},
},
}
}
return actualBackend, nil
}
Component 2: Drift Detector
// pkg/admin/drift_detector.go
type DriftDetector struct {
// Configuration for what constitutes "drift"
ignoreFields []string // Fields to ignore (e.g., "updated_at")
}
func NewDriftDetector() *DriftDetector {
return &DriftDetector{
ignoreFields: []string{"updated_at", "created_at"},
}
}
type Drift struct {
Field string
OldValue interface{}
NewValue interface{}
DriftType DriftType
Source string // "kubernetes", "terraform", "health-check"
}
type DriftType int
const (
DriftTypeConfigUpdate DriftType = iota
DriftTypeCredentialRotation
DriftTypeEndpointChange
DriftTypeHealthStatusChange
)
func (d *DriftDetector) Detect(stored, actual *adminpb.Backend) []Drift {
var drifts []Drift
// Compare backend configs
switch stored.Type {
case adminpb.BackendType_BACKEND_TYPE_KAFKA:
drifts = append(drifts, d.detectKafkaDrift(stored, actual)...)
case adminpb.BackendType_BACKEND_TYPE_POSTGRES:
drifts = append(drifts, d.detectPostgresDrift(stored, actual)...)
case adminpb.BackendType_BACKEND_TYPE_REDIS:
drifts = append(drifts, d.detectRedisDrift(stored, actual)...)
}
return drifts
}
func (d *DriftDetector) detectKafkaDrift(stored, actual *adminpb.Backend) []Drift {
var drifts []Drift
storedKafka := stored.Config.GetKafka()
actualKafka := actual.Config.GetKafka()
// Check broker list changes
if !reflect.DeepEqual(storedKafka.Brokers, actualKafka.Brokers) {
drifts = append(drifts, Drift{
Field: "config.kafka.brokers",
OldValue: storedKafka.Brokers,
NewValue: actualKafka.Brokers,
DriftType: DriftTypeEndpointChange,
Source: "kubernetes-service",
})
}
// Check password changes (credential rotation)
storedPassword := storedKafka.GetAuth().GetSasl().GetPassword()
actualPassword := actualKafka.GetAuth().GetSasl().GetPassword()
if storedPassword != actualPassword {
drifts = append(drifts, Drift{
Field: "config.kafka.auth.sasl.password",
OldValue: "[REDACTED]",
NewValue: "[REDACTED]",
DriftType: DriftTypeCredentialRotation,
Source: "kubernetes-secret",
})
}
return drifts
}
Component 3: Event Publisher
// pkg/admin/event_publisher.go
type EventPublisher interface {
Publish(ctx context.Context, event *BackendChangeEvent) error
Subscribe(ctx context.Context, subscription *BackendEventSubscription) (<-chan *BackendChangeEvent, error)
}
// NATS-based event publisher
type NatsEventPublisher struct {
nc *nats.Conn
}
func NewNatsEventPublisher(url string) (*NatsEventPublisher, error) {
nc, err := nats.Connect(url)
if err != nil {
return nil, err
}
return &NatsEventPublisher{nc: nc}, nil
}
func (p *NatsEventPublisher) Publish(ctx context.Context, event *BackendChangeEvent) error {
// Build NATS subject: backend.{backend_name}.{change_type}
subject := fmt.Sprintf("backend.%s.%s", event.BackendName, event.ChangeType)
// Serialize event to protobuf
data, err := proto.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
// Publish to NATS
if err := p.nc.Publish(subject, data); err != nil {
return fmt.Errorf("failed to publish event: %w", err)
}
log.Infof("Published event: %s to subject %s", event.EventId, subject)
return nil
}
func (p *NatsEventPublisher) Subscribe(ctx context.Context, subscription *BackendEventSubscription) (<-chan *BackendChangeEvent, error) {
    eventChan := make(chan *BackendChangeEvent, 100)
    var subs []*nats.Subscription
    // Build NATS subject patterns (">" matches all remaining tokens, e.g. "credentials.rotated")
    for _, backendName := range subscription.BackendNames {
        subject := fmt.Sprintf("backend.%s.>", backendName)
        sub, err := p.nc.Subscribe(subject, func(msg *nats.Msg) {
            var event BackendChangeEvent
            if err := proto.Unmarshal(msg.Data, &event); err != nil {
                log.Errorf("Failed to unmarshal event: %v", err)
                return
            }
            // Filter by event types
            if len(subscription.EventTypes) > 0 {
                found := false
                for _, eventType := range subscription.EventTypes {
                    if event.ChangeType == eventType {
                        found = true
                        break
                    }
                }
                if !found {
                    return
                }
            }
            select {
            case eventChan <- &event:
            default:
                log.Warnf("Event channel full, dropping event %s", event.EventId)
            }
        })
        if err != nil {
            return nil, fmt.Errorf("failed to subscribe to subject %s: %w", subject, err)
        }
        subs = append(subs, sub)
    }
    // Unsubscribe everything and close the channel exactly once on context cancellation
    go func() {
        <-ctx.Done()
        for _, sub := range subs {
            sub.Unsubscribe()
        }
        close(eventChan)
    }()
    return eventChan, nil
}
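The Publish method above builds its NATS subject directly from the ChangeType enum, whose generated name (e.g. BACKEND_CHANGE_TYPE_CREDENTIAL_ROTATED) does not match the dotted suffixes used in the event topic hierarchy later in this RFC. A minimal sketch of the mapping the publisher would need; the subjectSuffix helper is an assumption, not existing code, and suffixes are illustrative where the hierarchy leaves them ambiguous:
// subjectSuffix maps a BackendChangeType to the dotted subject suffix used in
// the event topic hierarchy (hypothetical helper; the name is an assumption).
func subjectSuffix(t BackendChangeType) string {
    switch t {
    case BACKEND_CHANGE_TYPE_CREATED:
        return "created"
    case BACKEND_CHANGE_TYPE_UPDATED:
        return "config.updated"
    case BACKEND_CHANGE_TYPE_DELETED:
        return "deleted"
    case BACKEND_CHANGE_TYPE_CREDENTIAL_ROTATED:
        return "credentials.rotated"
    case BACKEND_CHANGE_TYPE_HEALTH_CHANGED:
        return "health.changed"
    case BACKEND_CHANGE_TYPE_ENDPOINT_CHANGED:
        return "endpoint.changed"
    case BACKEND_CHANGE_TYPE_CONFIG_DRIFT_DETECTED:
        return "config.drift.detected"
    default:
        return "unspecified"
    }
}
With this helper, Publish would build the subject as fmt.Sprintf("backend.%s.%s", event.BackendName, subjectSuffix(event.ChangeType)), keeping published subjects aligned with the hierarchy and with wildcard subscriptions such as backend.*.credentials.rotated.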
Component 4: Pattern Runner Event Consumer
// Pattern runner subscribes to backend change events
type PatternRunner struct {
adminClient admin.ControlPlaneClient
eventPublisher EventPublisher
slotBinder *SlotBinder
// Current slot bindings
slotBindings map[string]*Slot // slot_name → slot implementation
}
func (r *PatternRunner) Start(ctx context.Context, config *PatternConfig) error {
// 1. Initial backend binding (as before)
if err := r.bindSlots(ctx, config.SlotBindings); err != nil {
return err
}
// 2. Subscribe to backend change events
subscription := &BackendEventSubscription{
SubscriberId: r.patternID,
BackendNames: r.getBackendNames(config.SlotBindings),
EventTypes: []BackendChangeType{
BACKEND_CHANGE_TYPE_UPDATED,
BACKEND_CHANGE_TYPE_CREDENTIAL_ROTATED,
BACKEND_CHANGE_TYPE_ENDPOINT_CHANGED,
},
}
eventChan, err := r.eventPublisher.Subscribe(ctx, subscription)
if err != nil {
return fmt.Errorf("failed to subscribe to backend events: %w", err)
}
// 3. Start event handling loop
go r.handleBackendEvents(ctx, eventChan)
// 4. Start processing
return r.startProcessing(ctx)
}
func (r *PatternRunner) handleBackendEvents(ctx context.Context, eventChan <-chan *BackendChangeEvent) {
for {
select {
case <-ctx.Done():
return
case event := <-eventChan:
log.Infof("Received backend change event: %s for backend %s", event.ChangeType, event.BackendName)
// Handle event based on type
switch event.ChangeType {
case BACKEND_CHANGE_TYPE_CREDENTIAL_ROTATED:
// Hot-reload credentials
if err := r.rotateCredentials(ctx, event); err != nil {
log.Errorf("Failed to rotate credentials for backend %s: %v", event.BackendName, err)
}
case BACKEND_CHANGE_TYPE_ENDPOINT_CHANGED:
// Reconnect to new endpoints
if err := r.reconnectBackend(ctx, event); err != nil {
log.Errorf("Failed to reconnect to backend %s: %v", event.BackendName, err)
}
case BACKEND_CHANGE_TYPE_UPDATED:
// General config update
if err := r.reloadBackendConfig(ctx, event); err != nil {
log.Errorf("Failed to reload config for backend %s: %v", event.BackendName, err)
}
}
}
}
}
func (r *PatternRunner) rotateCredentials(ctx context.Context, event *BackendChangeEvent) error {
// 1. Fetch new backend config
newBackend := event.NewState
// 2. Find slot using this backend
var targetSlot *Slot
var slotName string
for name, slot := range r.slotBindings {
if slot.BackendName == event.BackendName {
targetSlot = slot
slotName = name
break
}
}
if targetSlot == nil {
return fmt.Errorf("no slot found for backend %s", event.BackendName)
}
// 3. Update credentials on existing connection
if err := targetSlot.UpdateCredentials(ctx, newBackend.Config); err != nil {
// If hot-reload fails, reconnect
log.Warnf("Hot credential reload failed, reconnecting slot %s", slotName)
return r.reconnectSlot(ctx, slotName, newBackend)
}
log.Infof("Successfully rotated credentials for slot %s", slotName)
return nil
}
func (r *PatternRunner) reconnectBackend(ctx context.Context, event *BackendChangeEvent) error {
// Similar to rotateCredentials, but force reconnect
newBackend := event.NewState
for slotName, slot := range r.slotBindings {
if slot.BackendName == event.BackendName {
log.Infof("Reconnecting slot %s to backend %s", slotName, event.BackendName)
// Disconnect old connection
slot.Disconnect(ctx)
// Create new slot with updated config
newSlot, err := r.slotBinder.BindSlot(ctx, slotName, event.BackendName, slot.SlotType)
if err != nil {
return fmt.Errorf("failed to bind slot %s: %w", slotName, err)
}
// Replace slot
r.slotBindings[slotName] = newSlot
log.Infof("Successfully reconnected slot %s", slotName)
}
}
return nil
}
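Reconnection can fail transiently, for example while new endpoints are still rolling out, and the success criteria below call for retries with exponential backoff. A minimal sketch of that retry loop; it assumes the reconnectSlot helper referenced in rotateCredentials above, and the attempt count and base delay are illustrative:
// reconnectSlotWithBackoff retries a slot rebind with exponential backoff
// between attempts, giving up after maxAttempts failures.
func (r *PatternRunner) reconnectSlotWithBackoff(ctx context.Context, slotName string, backend *Backend) error {
    const maxAttempts = 5
    delay := 1 * time.Second
    var lastErr error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        lastErr = r.reconnectSlot(ctx, slotName, backend)
        if lastErr == nil {
            return nil
        }
        log.Warnf("Reconnect attempt %d/%d for slot %s failed: %v", attempt, maxAttempts, slotName, lastErr)
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(delay):
            delay *= 2 // 1s, 2s, 4s, 8s, ...
        }
    }
    return fmt.Errorf("reconnecting slot %s failed after %d attempts: %w", slotName, maxAttempts, lastErr)
}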
Integration with Kubernetes Operator
Per RFC-042, the Prism Operator already watches PrismBackend CRDs. We extend it to publish events:
// prism-operator backend controller
func (r *PrismBackendReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var backend prismv1alpha1.PrismBackend
if err := r.Get(ctx, req.NamespacedName, &backend); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// 1. Validate service and secret exist (as before)
// ...
// 2. Check if backend config changed
oldStatus := backend.Status.ResolvedEndpoint
newStatus := fmt.Sprintf("%s.%s.svc:%d", svcName, svcNamespace, backend.Spec.Discovery.Service.Port)
if oldStatus != newStatus {
// 3. Publish endpoint change event
r.eventPublisher.Publish(ctx, &BackendChangeEvent{
BackendName: backend.Name,
ChangeType: BACKEND_CHANGE_TYPE_ENDPOINT_CHANGED,
Source: "kubernetes-operator",
Timestamp: time.Now().Unix(),
ChangedFields: []string{"discovery.service.port"},
})
}
// 4. Update status (as before)
return r.updateStatus(ctx, &backend, "Discovered", newStatus)
}
Integration with Terraform
For backends managed via Terraform, we can use the Terraform Cloud API or poll the state file:
// pkg/admin/terraform_watcher.go
type TerraformStateWatcher struct {
client *tfe.Client
organization string
workspace string
eventPublisher EventPublisher
pollInterval time.Duration
}
func (w *TerraformStateWatcher) Watch(ctx context.Context) error {
ticker := time.NewTicker(w.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
if err := w.checkTerraformState(ctx); err != nil {
log.Errorf("Failed to check Terraform state: %v", err)
}
}
}
}
func (w *TerraformStateWatcher) checkTerraformState(ctx context.Context) error {
// 1. Fetch current Terraform state
currentStateVersion, err := w.client.StateVersions.ReadCurrent(ctx, w.workspace)
if err != nil {
return fmt.Errorf("failed to read Terraform state: %w", err)
}
// 2. Download state file
stateFile, err := w.client.StateVersions.Download(ctx, currentStateVersion.DownloadURL)
if err != nil {
return fmt.Errorf("failed to download Terraform state: %w", err)
}
// 3. Parse state and look for backend resources
var state tfstate.State
if err := json.Unmarshal(stateFile, &state); err != nil {
return fmt.Errorf("failed to parse Terraform state: %w", err)
}
// 4. Check for backend resource changes
for _, resource := range state.Resources {
if resource.Type == "kubernetes_secret" || resource.Type == "helm_release" {
// Check if this resource corresponds to a Prism backend
if backendName, ok := resource.Attributes["labels"]["prism.io/backend-name"]; ok {
// Compare with admin state
if changed := w.detectTerraformDrift(ctx, backendName, resource); changed {
w.eventPublisher.Publish(ctx, &BackendChangeEvent{
BackendName: backendName,
ChangeType: BACKEND_CHANGE_TYPE_UPDATED,
Source: "terraform",
Timestamp: time.Now().Unix(),
})
}
}
}
}
return nil
}
Event Topic Hierarchy
backend.* # All backend events
├─ backend.{backend_name}.* # Events for specific backend
│ ├─ backend.{name}.created
│ ├─ backend.{name}.updated
│ ├─ backend.{name}.deleted
│ ├─ backend.{name}.credentials.rotated
│ ├─ backend.{name}.health.changed
│ └─ backend.{name}.config.updated
│
├─ backend.type.{backend_type}.* # Events by backend type
│ ├─ backend.type.kafka.*
│ ├─ backend.type.postgres.*
│ └─ backend.type.redis.*
│
└─ backend.source.{source}.* # Events by source
├─ backend.source.kubernetes.*
├─ backend.source.terraform.*
├─ backend.source.manual.*
└─ backend.source.health-check.*
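NATS subjects are token-based: * matches exactly one token and > matches all remaining tokens, so a consumer can subscribe by event type across every backend without enumerating backend names. A minimal sketch using the nats.go client and the protobuf event defined above; the function name is an assumption:
// watchCredentialRotations subscribes to credential-rotation events for all
// backends via a wildcard subject ("*" matches the backend-name token).
func watchCredentialRotations(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.Subscribe("backend.*.credentials.rotated", func(msg *nats.Msg) {
        var event BackendChangeEvent
        if err := proto.Unmarshal(msg.Data, &event); err != nil {
            log.Errorf("Failed to unmarshal event: %v", err)
            return
        }
        log.Infof("Credential rotation for backend %s (event %s)", event.BackendName, event.EventId)
    })
}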
Pattern runners subscribe to:
// Subscribe to all events for backends I use (">" matches all remaining subject tokens)
"backend.kafka-prod.>"
"backend.redis-cache.>"
// Or subscribe to specific event types across backends ("*" matches the backend-name token)
"backend.*.credentials.rotated"
"backend.*.health.changed"
Example Scenarios
Scenario 1: Kubernetes Secret Rotation
┌─────────────────────────────────────────────────────────────────┐
│ 1. Kubernetes Admin rotates Kafka password │
│ │
│ kubectl create secret generic kafka-credentials \ │
│ --from-literal=password=new-password \ │
│ --dry-run=client -o yaml | kubectl apply -f - │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 2. Backend State Monitor detects secret change │
│ │
│ K8s Secret Watcher: │
│ ├─ Receives watch event: Secret "kafka-credentials" Modified │
│ ├─ Reads new secret value │
│ ├─ Compares with admin state │
│ └─ Detects drift: password changed │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 3. Publish credential rotation event │
│ │
│ NATS Publish: │
│ Subject: backend.kafka-prod.credentials.rotated │
│ Payload: BackendChangeEvent{...} │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 4. Pattern Runners receive event │
│ │
│ order-consumer: │
│ ├─ Receives event on subscription │
│ ├─ Fetches new backend config from admin │
│ ├─ Calls slot.UpdateCredentials(new_password) │
│ └─ Continues processing (no downtime) │
│ │
│ analytics-ingest: │
│ ├─ Receives event on subscription │
│ ├─ Updates Kafka producer config │
│ └─ Reconnects to Kafka with new credentials │
└─────────────────────────────────────────────────────────────────┘
Scenario 2: Terraform-Managed Backend Update
┌─────────────────────────────────────────────────────────────────┐
│ 1. Operator updates Terraform config │
│ │
│ # main.tf │
│ resource "helm_release" "kafka" { │
│ replicas = 5 # scaled up from 3 │
│ } │
│ │
│ terraform apply │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 2. Terraform State Watcher detects change │
│ │
│ Poll Terraform Cloud API: │
│ ├─ Fetch current state version │
│ ├─ Download state file │
│ ├─ Parse resources │
│ ├─ Detect kafka helm_release changed │
│ └─ Extract new broker endpoints │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 3. Publish endpoint change event │
│ │
│ NATS Publish: │
│ Subject: backend.kafka-prod.config.updated │
│ Payload: BackendChangeEvent{ │
│ changed_fields: ["config.kafka.brokers"], │
│ old_value: ["kafka-01:9092", "kafka-02:9092", "kafka-03:9092"], │
│ new_value: ["kafka-01:9092", ..., "kafka-05:9092"] │
│ } │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 4. Admin updates backend registry │
│ 5. Pattern runners receive event and reconnect │
└─────────────────────────────────────────────────────────────────┘
Observability and Debugging
Admin UI Dashboard
Backend Events (Last 24 Hours)
┌────────────────────────────────────────────────────────────────┐
│ Backend Event Type Source Time │
├────────────────────────────────────────────────────────────────┤
│ kafka-prod credentials.rotated kubernetes 2min ago │
│ postgres-db health.changed health-check 15min ago │
│ redis-cache config.updated manual 1hr ago │
│ nats-messaging endpoint.changed terraform 3hr ago │
└────────────────────────────────────────────────────────────────┘
Backend Drift Detection
┌────────────────────────────────────────────────────────────────┐
│ Backend Drift Type Status Action │
├────────────────────────────────────────────────────────────────┤
│ kafka-prod Broker count mismatch Active Reconcile │
│ postgres-db Password changed Resolved Auto-applied │
└────────────────────────────────────────────────────────────────┘
Pattern Runner Subscriptions
┌────────────────────────────────────────────────────────────────┐
│ Pattern Subscribed Backends Status │
├────────────────────────────────────────────────────────────────┤
│ order-consumer kafka-prod, redis-cache Connected │
│ analytics-ingest kafka-prod Connected │
│ user-notifications postgres-db, nats-* Connected │
└────────────────────────────────────────────────────────────────┘
Metrics
# Backend state monitor metrics
prism_backend_refresh_total{backend="kafka-prod"} 1440
prism_backend_drift_detected_total{backend="kafka-prod",type="credential"} 3
prism_backend_event_published_total{backend="kafka-prod",event="credentials.rotated"} 3
prism_backend_state_refresh_duration_seconds{backend="kafka-prod"} 0.150
# Pattern runner metrics
prism_pattern_backend_event_received_total{pattern="order-consumer",backend="kafka-prod"} 3
prism_pattern_credential_rotation_duration_seconds{pattern="order-consumer"} 0.050
prism_pattern_backend_reconnect_total{pattern="order-consumer",backend="kafka-prod"} 0
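A minimal sketch of how the Backend State Monitor could register and record its side of these metrics with prometheus/client_golang; the variable names are assumptions:
import "github.com/prometheus/client_golang/prometheus"

var (
    backendRefreshTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{Name: "prism_backend_refresh_total", Help: "Backend state refreshes."},
        []string{"backend"},
    )
    backendDriftDetectedTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{Name: "prism_backend_drift_detected_total", Help: "Drift occurrences by type."},
        []string{"backend", "type"},
    )
    backendRefreshDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{Name: "prism_backend_state_refresh_duration_seconds", Help: "Refresh duration."},
        []string{"backend"},
    )
)

func init() {
    prometheus.MustRegister(backendRefreshTotal, backendDriftDetectedTotal, backendRefreshDuration)
}

// Inside refreshBackend:
//   timer := prometheus.NewTimer(backendRefreshDuration.WithLabelValues(storedBackend.Name))
//   defer timer.ObserveDuration()
//   backendRefreshTotal.WithLabelValues(storedBackend.Name).Inc()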
Audit Log
{
"timestamp": "2025-10-24T10:15:30Z",
"event_type": "backend.credentials.rotated",
"backend_name": "kafka-prod",
"source": "kubernetes-secret",
"changed_fields": ["config.kafka.auth.sasl.password"],
"affected_patterns": ["order-consumer", "analytics-ingest"],
"reconciliation_status": "auto-applied",
"metadata": {
"kubernetes_secret": "kafka-credentials",
"kubernetes_namespace": "infrastructure",
"secret_version": "v2"
}
}
Implementation Plan
Phase 1: Core Event Infrastructure (Week 1-2)
- Define BackendChangeEvent protobuf schema
- Implement NATS-based event publisher/subscriber
- Add event topic hierarchy
- Create admin API for event subscriptions
- Unit tests for event serialization and routing
Phase 2: Backend State Monitor (Week 2-3)
- Implement BackendStateMonitor with refresh loop
- Add Kubernetes Secret watcher
- Add Kubernetes PrismBackend CRD watcher
- Implement drift detector
- Add reconciliation policies
- Integration tests with kind cluster
Phase 3: Pattern Runner Integration (Week 3-4)
- Add event subscription to pattern runner startup
- Implement hot credential rotation
- Implement backend reconnection
- Add circuit breaker for failed reconnections
- End-to-end tests with credential rotation
Phase 4: Terraform Integration (Week 4-5)
- Implement Terraform Cloud API watcher
- Add Terraform state file polling
- Map Terraform resources to Prism backends
- Integration tests with Terraform Cloud sandbox
Phase 5: Observability (Week 5-6)
- Add Prometheus metrics for state monitor
- Add Prometheus metrics for pattern runners
- Create Grafana dashboards
- Implement audit logging
- Add admin UI for event monitoring
Phase 6: Documentation (Week 6)
- Write operator guide for event system
- Document event subscription patterns
- Create troubleshooting guide
- Write ADR for design decisions
- Update changelog
Success Criteria
- Zero-Downtime Credential Rotation: Pattern runners automatically reload credentials without restart
- Drift Detection Latency: Backend changes detected within 30 seconds
- Event Delivery Reliability: 99.9% of events delivered to subscribers
- Pattern Runner Resilience: Failed reconnections trigger retry with exponential backoff
- Observability: All backend changes visible in admin UI and metrics
- Multi-Source Support: Events from Kubernetes, Terraform, and manual changes
Alternatives Considered
Alternative 1: Poll-Based Config Refresh
Approach: Pattern runners periodically poll admin for backend config changes
Pros:
- Simple to implement
- No event infrastructure needed
Cons:
- Higher latency (poll interval = 30-60s)
- Wasted admin API calls (polling with no changes)
- No real-time notifications
Rejected: Poor latency and efficiency
Alternative 2: Webhook-Based Notifications
Approach: Admin sends HTTP webhooks to pattern runners on changes
Pros:
- Standard HTTP protocol
- Firewall-friendly
Cons:
- Pattern runners need HTTP server (additional port)
- No built-in retry on webhook failure
- Requires service discovery (how does admin find pattern runners?)
Rejected: Adds complexity to pattern runners
Alternative 3: Kubernetes Watch API Only
Approach: Pattern runners directly watch Kubernetes CRDs/Secrets
Pros:
- Native Kubernetes integration
- Real-time notifications
Cons:
- Only works for Kubernetes-hosted backends
- No support for Terraform or manual changes
- Pattern runners need K8s API access (RBAC complexity)
Rejected: Not portable beyond Kubernetes
References
- RFC-039: Backend Configuration Registry
- RFC-042: Kubernetes Backend Discovery and Automated Binding
- ADR-037: Kubernetes Operator with Custom Resource Definitions
- Terraform Provider Framework
- Kubernetes Watch API
- NATS Publish-Subscribe
Open Questions
- Event Ordering: Should we guarantee event ordering per backend?
  - Proposed: Use NATS JetStream with ordered consumers
- Event Retention: How long should events be retained?
  - Proposed: 7 days in NATS, 30 days in audit log
- Event Backpressure: What happens if a pattern runner can't keep up with events?
  - Proposed: Drop events when the channel is full and rely on the periodic refresh as a fallback
- Cross-Cluster Events: How do we propagate events across regions/clusters?
  - Proposed: Phase 2 feature: use NATS leaf nodes or a NATS supercluster
- Event Replay: Should pattern runners be able to replay missed events?
  - Proposed: Yes, use NATS JetStream durable consumers with per-pattern checkpoints (see the sketch below)
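For the ordering and replay questions above, a minimal sketch of what a JetStream durable subscription could look like with the nats.go JetStream API; the stream name, retention, and durable name are assumptions:
// subscribeDurable creates (or reuses) a stream over all backend events and a
// durable consumer keyed by pattern ID, so a restarted pattern runner resumes
// from its last acknowledged event. Sketch only; names are illustrative.
func subscribeDurable(nc *nats.Conn, patternID, subject string) (*nats.Subscription, error) {
    js, err := nc.JetStream()
    if err != nil {
        return nil, err
    }
    // Stream capturing every backend event, retained for 7 days per the proposal above.
    if _, err := js.AddStream(&nats.StreamConfig{
        Name:     "BACKEND_EVENTS",
        Subjects: []string{"backend.>"},
        MaxAge:   7 * 24 * time.Hour,
    }); err != nil {
        return nil, err
    }
    // Durable, manually acknowledged consumer; messages are delivered in stream order.
    return js.Subscribe(subject, func(msg *nats.Msg) {
        var event BackendChangeEvent
        if err := proto.Unmarshal(msg.Data, &event); err != nil {
            log.Errorf("Failed to unmarshal event: %v", err)
            msg.Term() // malformed payload, do not redeliver
            return
        }
        // ... handle event, then acknowledge the checkpoint ...
        msg.Ack()
    }, nats.Durable(patternID), nats.ManualAck())
}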