ADR-058: Proxy Drain-on-Shutdown
Status
Accepted - 2025-10-16
Context
The prism-proxy needs graceful shutdown behavior when it is told to stop by prism-admin or receives a termination signal. The current implementation immediately stops accepting connections and kills pattern processes, which can result in:
- Lost in-flight requests from clients
- Aborted backend operations mid-transaction
- Incomplete data writes
- Poor user experience during rolling updates
Requirements
- Frontend Drain Phase: Stop accepting NEW frontend connections while completing existing requests
- Backend Work Completion: Wait for all backend operations attached to frontend work to complete
- Pattern Runner Coordination: Signal pattern runners to drain (finish current work, reject new work)
- Clean Exit: Only exit when all frontend connections closed AND all pattern processes exited
- Timeout Safety: Force shutdown after timeout to prevent indefinite hangs
Current Architecture
┌─────────────────┐
│ prism-admin │ (sends stop command)
└────────┬────────┘
│ gRPC
▼
┌─────────────────┐
│ prism-proxy │◄──── Frontend gRPC connections (KeyValue, PubSub, etc.)
└────────┬────────┘
│ Lifecycle gRPC
▼
┌─────────────────┐
│ Pattern Runners │ (keyvalue-runner, consumer-runner, etc.)
│ (Go processes) │
└────────┬────────┘
│
▼
Backends (Redis, NATS, Postgres, etc.)
Decision
State Machine
Proxy states during shutdown:
Running → Draining → Stopping → Stopped
- Running: Normal operation, accepting all connections
- Draining:
- Reject NEW frontend connections (return UNAVAILABLE)
- Complete existing frontend requests
- Signal pattern runners to drain
- Track active request count
- Stopping:
- All frontend connections closed
- Send Stop to pattern runners
- Wait for pattern processes to exit
- Stopped: Clean exit
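Transitions only move forward; once the proxy leaves Running it never returns to it. A minimal sketch of that rule, assuming an illustrative ShutdownPhase enum (the actual DrainState type used by the proxy appears below and omits Stopped, since Stopped simply means the process has exited):

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownPhase {
    Running,
    Draining,
    Stopping,
    Stopped,
}

impl ShutdownPhase {
    /// Transitions are forward-only; there is no path back to Running.
    pub fn advance(self) -> ShutdownPhase {
        match self {
            ShutdownPhase::Running => ShutdownPhase::Draining,
            ShutdownPhase::Draining => ShutdownPhase::Stopping,
            ShutdownPhase::Stopping | ShutdownPhase::Stopped => ShutdownPhase::Stopped,
        }
    }
}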
Implementation Components
1. Lifecycle.proto Extension
Add DrainRequest message to lifecycle.proto:
// Drain request tells pattern to enter drain mode
message DrainRequest {
// Graceful drain timeout in seconds
int32 timeout_seconds = 1;
// Reason for drain (for logging/debugging)
string reason = 2;
}
Add to ProxyCommand in proxy_control_plane.proto:
message ProxyCommand {
string correlation_id = 1;
oneof command {
// ... existing commands ...
DrainRequest drain = 8; // NEW
}
}
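When the drain command arrives over the control-plane stream, the proxy dispatches it to the shutdown path described below. A sketch of that dispatch, assuming prost/tonic code generation (the oneof becomes a proxy_command::Command enum; handle_command and the error type are illustrative):

use std::time::Duration;

// Illustrative handler inside the proxy's control-plane command loop.
async fn handle_command(server: &mut ProxyServer, cmd: ProxyCommand) -> anyhow::Result<()> {
    match cmd.command {
        Some(proxy_command::Command::Drain(drain)) => {
            tracing::info!(reason = %drain.reason, "received drain command");
            let timeout = Duration::from_secs(drain.timeout_seconds.max(1) as u64);
            server.drain_and_shutdown(timeout).await
        }
        // ... existing command handling ...
        _ => Ok(()),
    }
}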
2. ProxyServer Drain State
Add connection tracking and drain state to ProxyServer:
pub struct ProxyServer {
router: Arc<Router>,
listen_address: String,
shutdown_tx: Option<oneshot::Sender<()>>,
// NEW: Drain state
drain_state: Arc<RwLock<DrainState>>,
active_connections: Arc<AtomicUsize>,
}
enum DrainState {
Running,
Draining { started_at: Instant },
Stopping,
}
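The active_connections counter is only trustworthy if every request increments it on entry and decrements it on exit, including when a handler errors or is cancelled. One way to guarantee that is an RAII guard held for the lifetime of each request (a sketch; ConnectionGuard is not an existing type):

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Increments the counter on creation and decrements it on drop, so the
/// active-connection count stays accurate even if a handler panics or is cancelled.
pub struct ConnectionGuard {
    counter: Arc<AtomicUsize>,
}

impl ConnectionGuard {
    pub fn new(counter: Arc<AtomicUsize>) -> Self {
        counter.fetch_add(1, Ordering::SeqCst);
        ConnectionGuard { counter }
    }
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::SeqCst);
    }
}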
3. Frontend Connection Interception
Use a tonic interceptor to reject new requests with UNAVAILABLE during drain. Interceptors are synchronous, so the drain state is checked with try_read() rather than awaited; contention is negligible because the state only changes a couple of times during shutdown:
fn connection_interceptor(
    drain_state: Arc<RwLock<DrainState>>,
) -> impl Fn(Request<()>) -> Result<Request<()>, Status> + Clone {
    move |req: Request<()>| {
        match drain_state.try_read().as_deref() {
            // Draining or stopping: turn away new work before it reaches a handler.
            Ok(DrainState::Draining { .. }) | Ok(DrainState::Stopping) => Err(
                Status::unavailable("Server is draining, not accepting new connections"),
            ),
            // Running (or a momentary lock conflict): let the request through.
            _ => Ok(req),
        }
    }
}
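The interceptor is attached when the frontend services are registered, for example via the generated with_interceptor constructor (a sketch; KeyValueServer, keyvalue_handler, addr, and the shutdown_rx wiring are placeholders for the proxy's actual service registration):

// Illustrative wiring: wrap each frontend service with the drain interceptor.
let keyvalue_svc = KeyValueServer::with_interceptor(
    keyvalue_handler,
    connection_interceptor(drain_state.clone()),
);

tonic::transport::Server::builder()
    .add_service(keyvalue_svc)
    .serve_with_shutdown(addr, async {
        // Completed by shutdown_tx in Phase 5 of drain_and_shutdown below.
        let _ = shutdown_rx.await;
    })
    .await?;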
4. Pattern Runner Drain Logic
Pattern runners receive DrainRequest via control plane and:
- Stop accepting new work (return UNAVAILABLE on new RPCs)
- Complete pending backend operations
- Send completion signal back to proxy
- Wait for Stop command
Example in keyvalue-runner:
func (a *KeyValuePluginAdapter) Drain(ctx context.Context, timeoutSeconds int32) error {
log.Printf("[DRAIN] Entering drain mode (timeout: %ds)", timeoutSeconds)
// Set drain flag
a.draining.Store(true)
// Wait for pending operations (with timeout)
deadline := time.Now().Add(time.Duration(timeoutSeconds) * time.Second)
for a.pendingOps.Load() > 0 {
if time.Now().After(deadline) {
log.Printf("[DRAIN] Timeout waiting for %d pending ops", a.pendingOps.Load())
break
}
time.Sleep(50 * time.Millisecond)
}
log.Printf("[DRAIN] Drain complete, ready for stop")
return nil
}
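The proxy side of this handshake, drain_all_patterns (called from the orchestration code in the next section), fans the DrainRequest out to every runner concurrently and waits for each acknowledgement or the timeout. A sketch, assuming PatternManager tracks its runners and exposes a hypothetical send_drain helper over the lifecycle control plane:

use std::time::Duration;
use futures::future::join_all;

impl PatternManager {
    /// Ask every pattern runner to drain; log (but don't fail on) stragglers.
    pub async fn drain_all_patterns(&self, timeout: Duration) -> anyhow::Result<()> {
        let drains = self.runners.iter().map(|runner| async move {
            let req = DrainRequest {
                timeout_seconds: timeout.as_secs() as i32,
                reason: "proxy shutdown".to_string(),
            };
            // send_drain is an assumed helper that forwards the request
            // to the runner over the lifecycle control plane.
            match tokio::time::timeout(timeout, runner.send_drain(req)).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => tracing::warn!(pattern = %runner.name, error = %e, "drain failed"),
                Err(_) => tracing::warn!(pattern = %runner.name, "drain timed out"),
            }
        });
        join_all(drains).await;
        Ok(())
    }
}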
5. Shutdown Orchestration
New ProxyServer::drain_and_shutdown() method:
pub async fn drain_and_shutdown(&mut self, timeout: Duration) -> Result<()> {
// Phase 1: Enter drain mode
{
let mut state = self.drain_state.write().await;
*state = DrainState::Draining { started_at: Instant::now() };
}
info!("🔸 Entering DRAIN mode");
// Phase 2: Signal pattern runners to drain
self.router.pattern_manager.drain_all_patterns(timeout).await?;
// Phase 3: Wait for frontend connections to complete
let poll_interval = Duration::from_millis(100);
let deadline = Instant::now() + timeout;
while self.active_connections.load(Ordering::Relaxed) > 0 {
if Instant::now() > deadline {
warn!("⏱️ Drain timeout, {} connections still active",
self.active_connections.load(Ordering::Relaxed));
break;
}
sleep(poll_interval).await;
}
info!("✅ All frontend connections drained");
// Phase 4: Stop pattern runners
{
let mut state = self.drain_state.write().await;
*state = DrainState::Stopping;
}
info!("🔹 Entering STOPPING mode");
self.router.pattern_manager.stop_all_patterns().await?;
// Phase 5: Shutdown gRPC server
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
info!("✅ Proxy shutdown complete");
Ok(())
}
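The same method also serves the termination-signal path mentioned in the Context section, so a Kubernetes SIGTERM triggers the identical drain sequence. A sketch using tokio's Unix signal support (run_until_signal is illustrative; the 30-second default matches the timeout section below):

use std::time::Duration;
use tokio::signal::unix::{signal, SignalKind};

// Illustrative: run until SIGTERM/SIGINT, then drain before exiting.
async fn run_until_signal(mut server: ProxyServer) {
    let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
    let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");

    tokio::select! {
        _ = sigterm.recv() => tracing::info!("received SIGTERM"),
        _ = sigint.recv() => tracing::info!("received SIGINT"),
    }

    if let Err(e) = server.drain_and_shutdown(Duration::from_secs(30)).await {
        tracing::error!(error = %e, "drain failed, forcing exit");
    }
}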
Admin Control Plane Integration
Admin sends drain command via new RPC:
service AdminControlPlane {
// ... existing RPCs ...
// Initiate graceful drain and shutdown
rpc DrainProxy(DrainProxyRequest) returns (DrainProxyResponse);
}
message DrainProxyRequest {
int32 timeout_seconds = 1;
string reason = 2;
}
message DrainProxyResponse {
bool success = 1;
string message = 2;
}
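A caller such as the prism CLI or a Kubernetes preStop hook would then trigger a drain roughly as follows (a sketch assuming tonic-generated client code; the admin_control_plane_client module path and the address handling are placeholders):

use tonic::Request;

// Illustrative operator-side call that asks the admin plane to drain a proxy.
async fn request_drain(admin_addr: String) -> Result<(), Box<dyn std::error::Error>> {
    let mut client =
        admin_control_plane_client::AdminControlPlaneClient::connect(admin_addr).await?;

    let response = client
        .drain_proxy(Request::new(DrainProxyRequest {
            timeout_seconds: 30,
            reason: "rolling update".to_string(),
        }))
        .await?
        .into_inner();

    println!("drain accepted: {} ({})", response.success, response.message);
    Ok(())
}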
Timeout Handling
- Default drain timeout: 30 seconds
- Configurable via: Admin request or the PRISM_DRAIN_TIMEOUT_SECONDS environment variable (see the sketch after this list)
- Behavior on timeout: Force shutdown with warning logs
- Per-component timeouts:
- Frontend connections: 30s
- Pattern runners: 30s
- Backend operations: Determined by pattern runner logic
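Resolving the effective timeout is straightforward: prefer an explicit value from the admin request, then the environment variable, then the 30-second default. A minimal sketch (the helper name is illustrative):

use std::time::Duration;

/// Resolve the drain timeout: explicit request value, then
/// PRISM_DRAIN_TIMEOUT_SECONDS, then the 30-second default.
fn resolve_drain_timeout(requested_seconds: Option<i32>) -> Duration {
    let from_env = std::env::var("PRISM_DRAIN_TIMEOUT_SECONDS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok());

    let seconds = requested_seconds
        .filter(|s| *s > 0)
        .map(|s| s as u64)
        .or(from_env)
        .unwrap_or(30);

    Duration::from_secs(seconds)
}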
Consequences
Positive
- ✅ Zero data loss: All in-flight operations complete before shutdown
- ✅ Graceful rolling updates: Kubernetes can drain pods safely
- ✅ Better observability: Clear state transitions logged
- ✅ Configurable timeouts: Operators control drain duration
- ✅ Backwards compatible: Existing Stop behavior preserved as fallback
Negative
- ⚠️ Increased shutdown time: From instant to 30+ seconds
- ⚠️ Complexity: More state tracking and coordination logic
- ⚠️ Potential timeout issues: Slow backends can cause forced shutdowns
Risks
- Stuck drains: If backend operations hang, the timeout must force shutdown
- Mitigation: Configurable timeouts; forced kill after 2x the drain timeout
- Connection leaks: If connections aren't tracked accurately, the active count never reaches zero and every drain runs to the timeout
- Mitigation: Comprehensive integration tests with connection counting
Implementation Plan
- Phase 1: Protobuf changes (lifecycle.proto, proxy_control_plane.proto)
- Phase 2: ProxyServer drain state and connection tracking
- Phase 3: Pattern runner drain logic (plugin SDK changes)
- Phase 4: Admin control plane drain RPC
- Phase 5: Integration tests with real backend operations
- Phase 6: Documentation and runbooks
Testing Strategy
Unit Tests
- State transitions (Running → Draining → Stopping → Stopped)
- Connection counting accuracy
- Timeout enforcement
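For the first item, a minimal sketch of a transition test against the illustrative ShutdownPhase helper from the Decision section (names are placeholders, not existing test code):

#[test]
fn shutdown_phases_advance_forward_only() {
    let mut phase = ShutdownPhase::Running;

    phase = phase.advance();
    assert_eq!(phase, ShutdownPhase::Draining);

    phase = phase.advance();
    assert_eq!(phase, ShutdownPhase::Stopping);

    phase = phase.advance();
    assert_eq!(phase, ShutdownPhase::Stopped);

    // Terminal state: advancing again stays Stopped, never returns to Running.
    assert_eq!(phase.advance(), ShutdownPhase::Stopped);
}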
Integration Tests
- Happy path: Start proxy, send requests, drain, verify completion
- Timeout path: Long-running operations, verify forced shutdown
- Connection rejection: New connections during drain return UNAVAILABLE
- Pattern coordination: Multiple pattern runners drain in parallel
Load Testing
- 1000 concurrent connections
- Trigger drain mid-load
- Measure: completion rate, drain duration, error rate
References
- RFC-016: Local Development Infrastructure (shutdown patterns)
- ADR-048: Local Signoz Observability (shutdown tracing)
- ADR-055: Control Plane Connectivity (admin → proxy communication)
- Kubernetes Pod Lifecycle: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-termination
Related Work
Similar patterns in industry:
- Envoy: drain_listeners + drain_connections_on_host_removal
- gRPC Go: GracefulStop() drains connections before shutdown
- Kubernetes: preStop hooks + terminationGracePeriodSeconds