RFC-041: Graceful Subprocess Shutdown Protocol
Status
Implemented (completed 2025-01-22)
Phase 1-3 (Core Protocol): ✅ Complete
- Phase 1: Protocol definition in proto/prism/interfaces/lifecycle.proto (commit 07227173)
- ProcessLifecycleInterface service with Shutdown, GetShutdownStatus, NotifyShutdownComplete, GetReadinessStatus, NotifyReady RPCs
- All message types with Process* prefix to avoid conflicts
- Go code generation verified
- Phase 2: Launcher-side implementation in pkg/launcher/shutdown.go (commit a496e87c)
- ProcessManager with graceful shutdown coordination
- monitorShutdown polls status with state machine tracking
- forceShutdown escalates SIGTERM → SIGKILL on timeout
- ShutdownAll for concurrent process shutdown
- Phase 2: Unit tests (commit 385eb960) - 9 tests, all passing
- State machine, timeout enforcement, force shutdown, concurrent shutdown
- Mock gRPC client for isolated testing
- Phase 3: Client SDK in pkg/launcher/client/shutdown_coordinator.go (commit 5ca93770)
- ShutdownCoordinator implements ProcessLifecycleInterface server
- ShutdownMetrics tracking with thread-safe accessors
- Run method for drain logic execution
- RequestMoreTime for extension requests
- Phase 3: Integration example (commit ced08cc0)
- ExampleShutdownIntegration for keyvalue-runner
- Demonstrates coordinator.Run(ctx, drainFunc) pattern
Phase 5 (Process Groups): ✅ Complete
- ProcessGroupManager in pkg/launcher/process_groups.go (commit 9bce7165)
- ProcessGroupConfig with desired/min_healthy/max_surge settings
- RollingReplace algorithm: start new → wait ready → shutdown old
- Automatic rollback on readiness failure
- ShutdownGroup for concurrent shutdown
Testing Infrastructure: ✅ Complete
- Test runner with CLI in tests/testing/testrunner/main.go (commit 5d3cc04b)
- 5 behavior modes: clean, slow-drain, hang, request-more, crash
- Full RFC-041 lifecycle implementation
- Suitable for integration testing
- Integration test suite in tests/integration/shutdown/ (commit TBD)
- 7 comprehensive tests covering all behaviors
- TestShutdownHarness with procmgr and launcher integration
- Process group and rolling replacement validation
- Make targets: test-integration-shutdown, test-integration-shutdown-race
- Full documentation in tests/integration/shutdown/README.md
Not Started:
- Phase 4: Admin delegation (RPC forwarding to launcher)
- Phase 6: Observability (Prometheus metrics, dashboards)
Context
Currently, the Prism launcher (prismctl) spawns and manages subprocess patterns (keyvalue-runner, pubsub-runner, etc.) but lacks a graceful shutdown mechanism. When processes need to terminate, the launcher resorts to signals (SIGTERM/SIGKILL) or abrupt termination, which can lead to:
- Data loss: In-flight requests or buffered data not flushed
- Resource leaks: Open connections, file handles, or backend sessions not cleaned up
- Inconsistent state: Backends left in partially-updated states
- Poor observability: No visibility into shutdown progress or blocking operations
This RFC proposes a cooperative shutdown handshake protocol that allows processes to:
- Acknowledge shutdown requests
- Report shutdown progress
- Complete in-flight work with bounded time limits
- Signal completion or request additional time
- Be forcefully terminated only after grace periods expire
Goals
- Cooperative shutdown: Processes acknowledge and participate in shutdown
- Bounded termination: All processes shut down within predictable time limits
- Progress visibility: Launcher can monitor shutdown state transitions
- Delegated control: Admin can request shutdown of specific processes or groups
- Process groups: Launcher manages 1+ instances of the same pattern
- Rolling replacement: Zero-downtime upgrades via new process → old process handoff
- Readiness signaling: Processes signal when ready to accept traffic
- Backward compatibility: Non-participating processes still shut down (degraded to SIGTERM/SIGKILL)
- Testability: Shutdown behavior can be tested independently
Non-Goals
- Distributed coordination: This RFC covers single-launcher scope only (not multi-node)
- Checkpoint/restart: We don't preserve and restore process state
- Client-side coordination: Client SDKs are not involved in process shutdown
- Load balancing: Traffic distribution across group members (handled by proxy layer)
Proposal
Process Groups and Lifecycle Management
The launcher manages process groups: one or more instances of the same pattern. Each group has:
type ProcessGroup struct {
Name string // "keyvalue-runner"
Instances map[string]*ManagedProcess // "kv-1", "kv-2", etc.
Desired int // Target instance count
Config ProcessGroupConfig
}
type ProcessGroupConfig struct {
MinHealthyInstances int // Minimum ready instances during rolling restart
MaxSurge int // Max extra instances during rolling restart (default: 1)
HealthCheckTimeout time.Duration // Time to wait for new instance readiness (default: 30s)
}
Example: keyvalue-runner group with 2 instances (see the sketch below)
- Current: kv-1 (READY), kv-2 (READY)
- During rolling restart: kv-3 (STARTING) → kv-1 (DRAINING) → kv-3 (READY) → kv-1 (TERMINATED)
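A minimal sketch of this group, expressed with the ProcessGroup and ProcessGroupConfig types above; the instance IDs and timeout values are illustrative, not prescriptive:
// Sketch only: the two-instance keyvalue-runner group from the example above.
group := &ProcessGroup{
    Name:    "keyvalue-runner",
    Desired: 2,
    Instances: map[string]*ManagedProcess{
        "kv-1": {ID: "kv-1"}, // READY
        "kv-2": {ID: "kv-2"}, // READY
    },
    Config: ProcessGroupConfig{
        MinHealthyInstances: 1,                // keep at least one READY instance during rolling restart
        MaxSurge:            1,                // allow one temporary extra instance (e.g. kv-3)
        HealthCheckTimeout:  30 * time.Second, // how long to wait for a replacement to become READY
    },
}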
Three-Phase Shutdown Protocol
Phase 1: SHUTDOWN_REQUESTED (0-3s)
- Launcher sends ShutdownRequest
- Process acknowledges with ShutdownAck
- Process stops accepting new work
- Process begins draining in-flight work
Phase 2: SHUTDOWN_DRAINING (3-10s configurable)
- Process reports progress via ShutdownStatus
- Process may request extension via ShutdownStatus.need_more_time
- Launcher monitors and logs progress
Phase 3: SHUTDOWN_COMPLETE or SHUTDOWN_FORCED
- Process sends ShutdownComplete when clean
- OR launcher sends SIGTERM → SIGKILL after deadline
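For illustration only, the launcher's view of these phase boundaries can be expressed as a small helper; phaseOf is a hypothetical name and not part of the protocol:
// phaseOf maps elapsed time since the ShutdownRequest onto the phases above.
// Sketch only; the real logic lives in the launcher's monitorShutdown loop.
func phaseOf(start time.Time, gracePeriod, maxShutdown time.Duration) string {
    elapsed := time.Since(start)
    switch {
    case elapsed < gracePeriod:
        return "Phase 1: SHUTDOWN_REQUESTED (ack received, drain started)"
    case elapsed < maxShutdown:
        return "Phase 2: SHUTDOWN_DRAINING (progress reports, optional extension)"
    default:
        return "Phase 3: deadline expired, escalate SIGTERM then SIGKILL (SHUTDOWN_FORCED)"
    }
}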
Protocol Messages (gRPC)
// New service in prism/interfaces/lifecycle.proto
service ProcessLifecycleInterface {
// Request graceful shutdown of a process
rpc Shutdown(ShutdownRequest) returns (ShutdownAck);
// Query shutdown status (polled by launcher)
rpc GetShutdownStatus(ShutdownStatusRequest) returns (ShutdownStatus);
// Process signals shutdown completion (push from process)
rpc NotifyShutdownComplete(ShutdownComplete) returns (ShutdownCompleteAck);
// Query readiness for accepting traffic (polled by launcher during startup)
rpc GetReadinessStatus(ReadinessRequest) returns (ReadinessResponse);
// Process signals readiness (push from process, optional optimization)
rpc NotifyReady(ReadyNotification) returns (ReadyAck);
}
message ShutdownRequest {
// Process ID to shut down
string process_id = 1;
// Reason for shutdown (admin request, launcher termination, etc.)
string reason = 2;
// Initial grace period in seconds (default: 3s)
int32 grace_period_seconds = 3;
// Maximum total time before forced kill (default: 10s)
int32 max_shutdown_seconds = 4;
}
message ShutdownAck {
// Whether process accepted shutdown request
bool acknowledged = 1;
// Estimated time to complete (seconds)
int32 estimated_seconds = 2;
// Any warnings or notes
string message = 3;
}
message ShutdownStatusRequest {
string process_id = 1;
}
message ShutdownStatus {
enum State {
RUNNING = 0; // Normal operation
SHUTDOWN_REQUESTED = 1; // Received request, starting drain
SHUTDOWN_DRAINING = 2; // Actively draining work
SHUTDOWN_BLOCKED = 3; // Blocked on external resource
SHUTDOWN_COMPLETE = 4; // Clean shutdown finished
SHUTDOWN_FORCED = 5; // Killed by launcher
}
State state = 1;
// Human-readable status message
string message = 2;
// Metrics about what's being drained
ShutdownMetrics metrics = 3;
// Whether process needs more time beyond initial grace period
bool need_more_time = 4;
// Additional seconds requested (only if need_more_time = true)
int32 additional_seconds = 5;
}
message ShutdownMetrics {
// In-flight requests still processing
int32 in_flight_requests = 1;
// Backend connections being closed
int32 open_connections = 2;
// Buffered data to flush (bytes)
int64 buffered_bytes = 3;
// Any blocking operations (e.g., "waiting for Kafka flush")
repeated string blocking_operations = 4;
}
message ShutdownComplete {
string process_id = 1;
// Final status message
string message = 2;
// Total shutdown time (milliseconds)
int64 shutdown_duration_ms = 3;
}
message ShutdownCompleteAck {
bool acknowledged = 1;
}
// Readiness check messages (for rolling replacement)
message ReadinessRequest {
string process_id = 1;
}
message ReadinessResponse {
enum ReadinessState {
STARTING = 0; // Process initializing
WARMING = 1; // Backend connections warming up
READY = 2; // Ready to accept traffic
DRAINING = 3; // Shutting down, not accepting new work
UNHEALTHY = 4; // Health check failed
}
ReadinessState state = 1;
// Human-readable status
string message = 2;
// Readiness checks (all must pass for READY state)
ReadinessChecks checks = 3;
}
message ReadinessChecks {
// gRPC server listening
bool grpc_server_ready = 1;
// Backend connection established
bool backend_connected = 2;
// Backend warmup complete (connection pool filled, etc.)
bool backend_warmed = 3;
// Any custom checks (e.g., schema migration complete)
map<string, bool> custom_checks = 4;
}
message ReadyNotification {
string process_id = 1;
string message = 2;
}
message ReadyAck {
bool acknowledged = 1;
}
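To make the launcher-side call pattern concrete, a minimal client of this service using the generated Go stubs might look like the sketch below. The generated identifiers follow the protoc-gen-go-grpc naming already used elsewhere in this RFC; the import path, address, and process ID are assumptions for illustration.
package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    // Assumed import path for the generated package; adjust to the repo's actual proto output.
    lifecycle "example.com/prism/gen/prism/interfaces/lifecycle"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    defer cancel()

    // Dial the subprocess's lifecycle endpoint (address is illustrative).
    conn, err := grpc.Dial("localhost:50000", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("dial: %v", err)
    }
    defer conn.Close()
    client := lifecycle.NewProcessLifecycleInterfaceClient(conn)

    // Phase 1: request graceful shutdown.
    ack, err := client.Shutdown(ctx, &lifecycle.ShutdownRequest{
        ProcessId:          "kv-1",
        Reason:             "example",
        GracePeriodSeconds: 3,
        MaxShutdownSeconds: 10,
    })
    if err != nil {
        log.Fatalf("shutdown RPC failed (launcher would fall back to SIGTERM): %v", err)
    }
    log.Printf("ack: estimated=%ds message=%q", ack.EstimatedSeconds, ack.Message)

    // Phase 2: poll until the process reports completion (or the poll fails because it exited).
    for {
        st, err := client.GetShutdownStatus(ctx, &lifecycle.ShutdownStatusRequest{ProcessId: "kv-1"})
        if err != nil || st.State == lifecycle.ShutdownStatus_SHUTDOWN_COMPLETE {
            return
        }
        time.Sleep(500 * time.Millisecond)
    }
}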
Handshake Flows
Flow 0: Process Startup and Readiness
┌─────────┐ ┌────────────────┐
│Launcher │ │Subprocess (KV) │
│ ProcMgr │ │ │
└────┬────┘ └───────┬────────┘
│ │
│ exec(keyvalue-runner) │
│───────────────────────────────────────>│
│ │ gRPC server
│ GetReadinessStatus() [polling] │ starting...
│───────────────────────────────────────>│
│ │
│ ReadinessResponse(STARTING) │
│<───────────────────────────────────────│
│ │
│ GetReadinessStatus() │
│───────────────────────────────────────>│
│ │ Backend
│ ReadinessResponse(WARMING) │ connecting...
│<───────────────────────────────────────│
│ │
│ GetReadinessStatus() │
│───────────────────────────────────────>│
│ │ All checks
│ ReadinessResponse(READY, checks=OK) │ passed
│<───────────────────────────────────────│
│ │
│ [Launcher adds to routing table] │ Accepting
│ │ traffic
Flow 1: Launcher → Subprocess (Direct Shutdown)
┌─────────┐ ┌────────────────┐
│Launcher │ │Subprocess (KV) │
│ ProcMgr │ │ │
└────┬────┘ └───────┬────────┘
│ │
│ ShutdownRequest(grace=3s, max=10s) │
│───────────────────────────────────────>│
│ │ Stop accepting
│ ShutdownAck(estimated=2s) │ new requests
│<───────────────────────────────────────│
│ │
│ GetShutdownStatus() [polling] │
│───────────────────────────────────────>│
│ │ Drain work
│ ShutdownStatus(DRAINING, 5 inflight) │
│<───────────────────────────────────────│
│ │
│ GetShutdownStatus() │
│───────────────────────────────────────>│
│ │ All work done
│ ShutdownStatus(DRAINING, 0 inflight) │
│<───────────────────────────────────────│
│ │
│ NotifyShutdownComplete(duration=1.8s) │
│<───────────────────────────────────────│
│ │ Exit(0)
│ ShutdownCompleteAck() │
│───────────────────────────────────────>│
│ X
Flow 2: Admin → Launcher → Subprocess (Delegated)
┌──────┐ ┌─────────┐ ┌────────────────┐
│Admin │ │Launcher │ │Subprocess (KV) │
│ │ │ ProcMgr │ │ │
└──┬───┘ └────┬────┘ └───────┬────────┘
│ │ │
│ StopProcess(kv-1)│ │
│──────────────────>│ │
│ │ ShutdownRequest() │
│ │──────────────────────>│
│ │ │
│ │ ShutdownAck() │
│ │<──────────────────────│
│ Ack(stopping) │ │
│<──────────────────│ │
│ │ │
│ GetProcessStatus()│ │
│──────────────────>│ GetShutdownStatus() │
│ │──────────────────────>│
│ │ ShutdownStatus() │
│ │<──────────────────────│
│ ProcessStatus() │ │
│<──────────────────│ │
│ │ │
│ │ NotifyShutdownComplete()
│ │<──────────────────────│
│ ProcessStopped() │ X
│<──────────────────│
Flow 3: Timeout and Forced Kill
┌─────────┐ ┌────────────────┐
│Launcher │ │Subprocess (KV) │
│ ProcMgr │ │ │
└────┬────┘ └───────┬────────┘
│ │
│ ShutdownRequest(grace=3s, max=10s) │
│───────────────────────────────────────>│
│ │
│ ShutdownAck(estimated=2s) │
│<───────────────────────────────────────│
│ │
│ GetShutdownStatus() [polling] │
│───────────────────────────────────────>│
│ │ Still draining
│ ShutdownStatus(DRAINING, need_more_time=true)
│<───────────────────────────────────────│
│ │
│ [3s grace period expires] │
│ GetShutdownStatus() │
│───────────────────────────────────────>│
│ │ Still stuck
│ ShutdownStatus(BLOCKED, "Kafka flush")│
│<───────────────────────────────────────│
│ │
│ [10s max deadline expires] │
│ SIGTERM │
│───────────────────────────────────────>│
│ │ Attempt cleanup
│ [2s SIGTERM timeout] │
│ SIGKILL │
│───────────────────────────────────────>│
│ X Process killed
│ Log: SHUTDOWN_FORCED (12.5s)
Process State Machine (Extended for Rolling Restart)
┌──────────┐
│ SPAWNING │ (launcher starts process)
└────┬─────┘
│
│ gRPC server starts
v
┌──────────┐
│ STARTING │ (process initializing)
└────┬─────┘
│
│ Backend connecting
v
┌──────────┐
│ WARMING │ (connection pools filling)
└────┬─────┘
│
│ All checks pass
v
┌──────────┐
│ READY │ (accepting traffic) ◄──┐
└────┬─────┘ │
│ │
│ ShutdownRequest() │ Still healthy
v │
┌──────────────────┐ │
│SHUTDOWN_REQUESTED│ │
└────┬─────────────┘ │
│ │
│ Stop accepting new work │
v │
┌──────────────┐ need_more_time │
│ DRAINING │◄───────────────────┘
└──┬───────┬───┘
│ │
│ │ Deadline expires
│ v
│ ┌────────┐
│ │ FORCED │ (SIGTERM → SIGKILL)
│ └────────┘
│
│ All work complete
v
┌──────────┐
│ COMPLETE │ (exit 0)
└──────────┘
Unhealthy at any point → UNHEALTHY → FORCED
Implementation Components
1. Launcher Process Manager (pkg/launcher/procmgr)
// ProcessManager manages subprocess lifecycle
type ProcessManager struct {
processes map[string]*ManagedProcess
config ShutdownConfig
}
type ShutdownConfig struct {
DefaultGracePeriod time.Duration // 3s
MaxShutdownDuration time.Duration // 10s
StatusPollInterval time.Duration // 500ms
SIGTERMTimeout time.Duration // 2s
}
type ManagedProcess struct {
ID string
Cmd *exec.Cmd
LifecycleClient lifecycle.ProcessLifecycleInterfaceClient
State ShutdownState
ShutdownStart time.Time
}
// InitiateShutdown starts graceful shutdown handshake
func (pm *ProcessManager) InitiateShutdown(ctx context.Context, processID string) error {
proc := pm.processes[processID]
// Send shutdown request
ack, err := proc.LifecycleClient.Shutdown(ctx, &lifecycle.ShutdownRequest{
ProcessId: processID,
Reason: "launcher shutdown",
GracePeriodSeconds: int32(pm.config.DefaultGracePeriod.Seconds()),
MaxShutdownSeconds: int32(pm.config.MaxShutdownDuration.Seconds()),
})
if err != nil {
// Process doesn't support graceful shutdown, fall back to SIGTERM
return pm.forceShutdown(proc)
}
proc.State = ShutdownStateRequested
proc.ShutdownStart = time.Now()
// Start monitoring goroutine
go pm.monitorShutdown(ctx, proc)
return nil
}
// monitorShutdown polls process until complete or timeout
func (pm *ProcessManager) monitorShutdown(ctx context.Context, proc *ManagedProcess) {
ticker := time.NewTicker(pm.config.StatusPollInterval)
defer ticker.Stop()
gracePeriodDeadline := proc.ShutdownStart.Add(pm.config.DefaultGracePeriod)
maxDeadline := proc.ShutdownStart.Add(pm.config.MaxShutdownDuration)
for {
select {
case <-ticker.C:
status, err := proc.LifecycleClient.GetShutdownStatus(ctx, &lifecycle.ShutdownStatusRequest{
ProcessId: proc.ID,
})
if err != nil {
// Lost connection, assume dead
pm.handleProcessGone(proc)
return
}
switch status.State {
case lifecycle.ShutdownStatus_SHUTDOWN_COMPLETE:
pm.handleShutdownComplete(proc, status)
return
case lifecycle.ShutdownStatus_SHUTDOWN_DRAINING:
pm.logShutdownProgress(proc, status)
// Check if we're past grace period
if time.Now().After(gracePeriodDeadline) {
if status.NeedMoreTime {
// Allow extension up to max deadline
pm.logExtension(proc, status)
}
}
// Check if we're past max deadline
if time.Now().After(maxDeadline) {
pm.forceShutdown(proc)
return
}
case lifecycle.ShutdownStatus_SHUTDOWN_BLOCKED:
pm.logBlocked(proc, status)
if time.Now().After(maxDeadline) {
pm.forceShutdown(proc)
return
}
}
case <-ctx.Done():
// Launcher itself shutting down, force kill
pm.forceShutdown(proc)
return
}
}
}
// forceShutdown escalates to SIGTERM then SIGKILL
func (pm *ProcessManager) forceShutdown(proc *ManagedProcess) error {
log.Warn().
Str("process_id", proc.ID).
Dur("elapsed", time.Since(proc.ShutdownStart)).
Msg("Forcing shutdown with SIGTERM")
proc.Cmd.Process.Signal(syscall.SIGTERM)
// Wait for SIGTERM timeout
done := make(chan error)
go func() {
done <- proc.Cmd.Wait()
}()
select {
case <-time.After(pm.config.SIGTERMTimeout):
log.Error().
Str("process_id", proc.ID).
Msg("SIGTERM timeout, sending SIGKILL")
proc.Cmd.Process.Kill()
case <-done:
// Process exited
}
proc.State = ShutdownStateForced
return nil
}
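The launcher code above references a ShutdownState type and constants (ShutdownStateRequested, ShutdownStateForced) that the RFC does not spell out. A minimal sketch mirroring the protocol's ShutdownStatus.State enum might be:
// ShutdownState is the launcher's local view of a managed process's shutdown
// progress, mirroring ShutdownStatus.State. Sketch only; the actual definition
// lives in pkg/launcher.
type ShutdownState int

const (
    ShutdownStateRunning ShutdownState = iota
    ShutdownStateRequested
    ShutdownStateDraining
    ShutdownStateBlocked
    ShutdownStateComplete
    ShutdownStateForced
)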
2. Launcher Client SDK (pkg/launcher/client)
// ShutdownCoordinator handles graceful shutdown in subprocess
type ShutdownCoordinator struct {
processID string
shutdownChan chan *lifecycle.ShutdownRequest
completeChan chan struct{}
state atomic.Value // holds lifecycle.ShutdownStatus_State
metrics ShutdownMetrics
}
// NewShutdownCoordinator creates coordinator and registers gRPC service
func NewShutdownCoordinator(processID string, grpcServer *grpc.Server) *ShutdownCoordinator {
sc := &ShutdownCoordinator{
processID: processID,
shutdownChan: make(chan *lifecycle.ShutdownRequest, 1),
completeChan: make(chan struct{}),
}
sc.state.Store(lifecycle.ShutdownStatus_RUNNING)
lifecycle.RegisterProcessLifecycleInterfaceServer(grpcServer, sc)
return sc
}
// Shutdown implements ProcessLifecycleInterface
func (sc *ShutdownCoordinator) Shutdown(ctx context.Context, req *lifecycle.ShutdownRequest) (*lifecycle.ShutdownAck, error) {
log.Info().
Str("reason", req.Reason).
Int32("grace_period", req.GracePeriodSeconds).
Msg("Received shutdown request")
sc.shutdownChan <- req
sc.state.Store(lifecycle.ShutdownStatus_SHUTDOWN_REQUESTED)
return &lifecycle.ShutdownAck{
Acknowledged: true,
EstimatedSeconds: 2,
Message: "Beginning graceful shutdown",
}, nil
}
// GetShutdownStatus implements ProcessLifecycleInterface
func (sc *ShutdownCoordinator) GetShutdownStatus(ctx context.Context, req *lifecycle.ShutdownStatusRequest) (*lifecycle.ShutdownStatus, error) {
state := sc.state.Load().(lifecycle.ShutdownStatus_State)
return &lifecycle.ShutdownStatus{
State: state,
Message: sc.getStatusMessage(),
Metrics: &lifecycle.ShutdownMetrics{
InFlightRequests: int32(sc.metrics.InFlightRequests),
OpenConnections: int32(sc.metrics.OpenConnections),
BufferedBytes: sc.metrics.BufferedBytes,
BlockingOperations: sc.metrics.BlockingOperations,
},
NeedMoreTime: sc.needsMoreTime(),
AdditionalSeconds: sc.estimateAdditionalTime(),
}, nil
}
// Run starts shutdown coordination loop (in subprocess main)
func (sc *ShutdownCoordinator) Run(ctx context.Context, drainFunc func(context.Context) error) error {
select {
case req := <-sc.shutdownChan:
log.Info().Msg("Starting shutdown drain")
sc.state.Store(lifecycle.ShutdownStatus_SHUTDOWN_DRAINING)
// Execute application-specific drain logic
drainCtx, cancel := context.WithTimeout(ctx, time.Duration(req.MaxShutdownSeconds)*time.Second)
defer cancel()
if err := drainFunc(drainCtx); err != nil {
log.Error().Err(err).Msg("Drain failed")
sc.state.Store(lifecycle.ShutdownStatus_SHUTDOWN_BLOCKED)
return err
}
log.Info().Msg("Drain complete")
sc.state.Store(lifecycle.ShutdownStatus_SHUTDOWN_COMPLETE)
close(sc.completeChan)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
3. Pattern Implementation Example
// keyvalue-runner main.go
func main() {
// ... setup code ...
grpcServer := grpc.NewServer()
// Register pattern service
keyvalue.RegisterKeyValueBasicInterfaceServer(grpcServer, kvService)
// Register shutdown coordinator
shutdownCoordinator := client.NewShutdownCoordinator("keyvalue-runner-1", grpcServer)
// Start serving
go grpcServer.Serve(lis)
// Wait for shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := shutdownCoordinator.Run(ctx, func(drainCtx context.Context) error {
// Application-specific drain logic
log.Info().Msg("Stopping new request acceptance")
kvService.StopAcceptingRequests()
// Wait for in-flight requests (with timeout)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
inFlight := kvService.GetInFlightCount()
if inFlight == 0 {
break
}
shutdownCoordinator.SetMetric("in_flight_requests", inFlight)
select {
case <-ticker.C:
continue
case <-drainCtx.Done():
return fmt.Errorf("drain timeout with %d in-flight", inFlight)
}
}
// Close backend connections
log.Info().Msg("Closing backend connections")
if err := kvService.CloseBackend(); err != nil {
return fmt.Errorf("backend close failed: %w", err)
}
// Stop gRPC server gracefully
grpcServer.GracefulStop()
return nil
})
if err != nil {
log.Error().Err(err).Msg("Shutdown failed")
os.Exit(1)
}
log.Info().Msg("Clean shutdown complete")
os.Exit(0)
}
4. Admin Delegated Shutdown
// Admin service (prism-admin) delegates to launcher
func (a *AdminService) StopProcess(ctx context.Context, req *admin.StopProcessRequest) (*admin.StopProcessResponse, error) {
// Find launcher managing this process
launcherRef, err := a.registry.GetLauncherForProcess(req.ProcessId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "process not found: %v", err)
}
// Forward shutdown request to launcher
shutdownReq := &launcher.ShutdownProcessRequest{
ProcessId: req.ProcessId,
Reason: "admin request",
GracePeriod: req.GracePeriodSeconds,
}
resp, err := launcherRef.ProcessMgrClient.ShutdownProcess(ctx, shutdownReq)
if err != nil {
return nil, status.Errorf(codes.Internal, "shutdown failed: %v", err)
}
return &admin.StopProcessResponse{
Acknowledged: true,
Message: resp.Message,
}, nil
}
// Admin can also query shutdown status
func (a *AdminService) GetProcessStatus(ctx context.Context, req *admin.GetProcessStatusRequest) (*admin.ProcessStatusResponse, error) {
launcherRef, err := a.registry.GetLauncherForProcess(req.ProcessId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "process not found: %v", err)
}
statusResp, err := launcherRef.ProcessMgrClient.GetProcessStatus(ctx, &launcher.ProcessStatusRequest{
ProcessId: req.ProcessId,
})
// ... convert and return status ...
}
Tradeoffs and Design Decisions
1. Pull vs Push Status Updates
Decision: Hybrid (Pull with Push Completion)
- Launcher polls subprocess for status (GetShutdownStatus)
- Subprocess pushes final completion (NotifyShutdownComplete)
Rationale:
- Pull gives launcher control over monitoring frequency
- Push completion avoids unnecessary polling after success
- Subprocess doesn't need to know launcher endpoint
Tradeoff: More network calls vs simpler error handling
2. Grace Period Values
Decision: 3s initial grace, 10s max deadline (configurable)
Rationale:
- 3s is typically sufficient for request draining (P99 < 1s)
- 10s covers edge cases (slow backends, large buffers)
- Configurable per pattern or deployment
Tradeoff: Fast shutdowns vs data safety
3. Extension Requests vs Fixed Deadline
Decision: Allow extensions via need_more_time flag, but enforce max deadline
Rationale:
- Some operations genuinely need more time (Kafka flush, transaction commit)
- Max deadline prevents infinite hangs
- Process must explicitly request extension (no automatic grants)
Tradeoff: Flexibility vs bounded termination
4. State Machine Complexity
Decision: Simple six-state machine (RUNNING, REQUESTED, DRAINING, BLOCKED, COMPLETE, FORCED)
Rationale:
- Enough states to track progress without over-engineering
- BLOCKED state provides visibility into why shutdown is stuck
- Single-direction transitions (no state reversals)
Tradeoff: Limited expressiveness vs implementation simplicity
5. Backward Compatibility
Decision: Fall back to SIGTERM/SIGKILL if gRPC shutdown not available
Rationale:
- Processes can adopt gradually
- Non-Go processes (if added) can use signals
- Launcher doesn't hard-fail if process doesn't support protocol
Tradeoff: Mixed shutdown behaviors during migration
6. Process Groups
Decision: In scope for this RFC (see Process Groups and Rolling Replacement; implemented in Phase 5)
Rationale:
- Rolling replacement and min_healthy guarantees require group semantics
- Single-process shutdown remains usable on its own
- Group operations build on top of the per-process protocol
Tradeoff: Broader RFC scope vs deferring group semantics to a separate design
Implementation Criteria
Must Have (P0)
- Protocol definition: ProcessLifecycleInterface service with shutdown + readiness RPCs
- Process groups: Launcher manages 1+ instances per pattern with desired/min_healthy counts
- Rolling replacement: Algorithm for zero-downtime instance upgrades
- Readiness checks: Launcher polls GetReadinessStatus until READY
- Launcher integration: ProcMgr implements handshake, monitoring, and replacement logic
- Client SDK: ShutdownCoordinator + ReadinessCoordinator in pkg/launcher/client
- Timeout enforcement: SIGTERM/SIGKILL escalation after max deadline
- Rollback: Kill new instance and keep old if readiness fails
- Observability: Structured logs for all state transitions (startup, ready, draining, complete)
- Tests: Unit tests for state machine + integration test for rolling replacement
Should Have (P1)
- Admin delegation: Admin can request process shutdown or rolling restart via launcher
- Metrics: Prometheus metrics for startup time, shutdown duration, readiness failures, rollback rate
- Configurable timeouts: Per-pattern grace period, max shutdown, and health check timeout
- Status API: Admin can query shutdown progress and group health
- Example implementation: keyvalue-runner demonstrates drain + readiness logic
- Health check customization: Processes can define custom readiness checks
Could Have (P2)
- Pre-shutdown hooks: Configurable actions before shutdown (e.g., deregister from service discovery)
- Shutdown dry-run: Admin can test what would be drained without actually shutting down
- Canary deployments: Staged rollouts (1 instance → wait → rest of group)
- Custom readiness checks: Plugin-defined health checks beyond backend connectivity
- Graceful degradation: Continue with fewer instances if replacement fails repeatedly
Won't Have (This RFC)
- Multi-launcher coordination: Cross-node shutdown orchestration
- Client SDK notification: Telling clients to disconnect before shutdown
- State persistence: Saving process state for resume
- Dynamic scaling: Auto-scaling based on load (separate RFC)
- Blue-green deployments: Full environment swaps (use orchestrator like Kubernetes)
Testing Strategy
Unit Tests
// Test state machine transitions
func TestShutdownStateMachine(t *testing.T) {
// RUNNING → REQUESTED → DRAINING → COMPLETE
}
// Test timeout enforcement
func TestShutdownTimeoutForceKill(t *testing.T) {
// Process doesn't complete, SIGTERM/SIGKILL sent
}
// Test extension requests
func TestShutdownExtensionRequest(t *testing.T) {
// Process requests more time, granted up to max deadline
}
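The unit tests run ProcessManager against a mock gRPC client (see Status above). One possible shape for such a mock, implementing the generated lifecycle.ProcessLifecycleInterfaceClient interface, is sketched below; the mock actually used in the repository may differ. It assumes imports of context, google.golang.org/grpc, and the generated lifecycle package.
// fakeLifecycleClient is a hand-rolled stand-in for the generated
// lifecycle.ProcessLifecycleInterfaceClient interface (sketch only).
type fakeLifecycleClient struct {
    state lifecycle.ShutdownStatus_State
}

func (f *fakeLifecycleClient) Shutdown(ctx context.Context, in *lifecycle.ShutdownRequest, opts ...grpc.CallOption) (*lifecycle.ShutdownAck, error) {
    f.state = lifecycle.ShutdownStatus_SHUTDOWN_DRAINING
    return &lifecycle.ShutdownAck{Acknowledged: true, EstimatedSeconds: 1}, nil
}

func (f *fakeLifecycleClient) GetShutdownStatus(ctx context.Context, in *lifecycle.ShutdownStatusRequest, opts ...grpc.CallOption) (*lifecycle.ShutdownStatus, error) {
    return &lifecycle.ShutdownStatus{State: f.state}, nil
}

func (f *fakeLifecycleClient) NotifyShutdownComplete(ctx context.Context, in *lifecycle.ShutdownComplete, opts ...grpc.CallOption) (*lifecycle.ShutdownCompleteAck, error) {
    f.state = lifecycle.ShutdownStatus_SHUTDOWN_COMPLETE
    return &lifecycle.ShutdownCompleteAck{Acknowledged: true}, nil
}

func (f *fakeLifecycleClient) GetReadinessStatus(ctx context.Context, in *lifecycle.ReadinessRequest, opts ...grpc.CallOption) (*lifecycle.ReadinessResponse, error) {
    return &lifecycle.ReadinessResponse{State: lifecycle.ReadinessResponse_READY}, nil
}

func (f *fakeLifecycleClient) NotifyReady(ctx context.Context, in *lifecycle.ReadyNotification, opts ...grpc.CallOption) (*lifecycle.ReadyAck, error) {
    return &lifecycle.ReadyAck{Acknowledged: true}, nil
}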
Integration Tests
Implementation: tests/integration/shutdown/graceful_shutdown_test.go
The integration test suite validates all shutdown behaviors using a configurable test runner:
# Run all shutdown integration tests
make test-integration-shutdown
# Run with race detector
make test-integration-shutdown-race
Test Coverage
| Test | Behavior | Validates |
|---|---|---|
| TestGracefulShutdownClean | Clean drain within grace period | Fast completion (<3s) |
| TestGracefulShutdownSlowDrain | Slow but successful drain | Respects drain duration |
| TestGracefulShutdownHang | Never completes drain | Force kill after timeout |
| TestGracefulShutdownRequestMore | Requests extension | Extension granted |
| TestGracefulShutdownCrash | Crashes during drain | Crash detection |
| TestProcessGroupShutdown | Multiple instances | Parallel shutdown |
| TestRollingReplacement | Zero-downtime replacement | MinHealthy maintained |
Test Runner
The testrunner binary simulates pattern subprocess behavior:
testrunner \
--process-id "test-1" \
--port 50000 \
--behavior "clean|slow-drain|hang|request-more|crash" \
--drain-duration 2s \
--work-duration 100ms \
--initial-work 5
Behaviors:
- clean: Drains all work and exits cleanly (happy path)
- slow-drain: Takes full drain duration but succeeds
- hang: Never completes, forces timeout and SIGKILL
- request-more: Requests additional shutdown time (5s)
- crash: Crashes (exit code 2) during drain
Test Architecture
TestShutdownHarness
├── procmgr.ProcessManager (OS process lifecycle)
├── launcher.ProcessManager (gRPC shutdown coordination)
└── launcher.ProcessGroupManager (rolling replacement)
└── spawns testrunner subprocesses
Documentation: See tests/integration/shutdown/README.md for detailed usage.
Acceptance Tests
// Test admin delegation
func TestAdminDelegatedShutdown(t *testing.T) {
// 1. Admin sends StopProcess request
// 2. Launcher receives and forwards to subprocess
// 3. Subprocess drains and completes
// 4. Admin queries status and sees COMPLETE
}
Migration Path
Phase 1: Protocol Definition (Week 1)
- Add ProcessLifecycleInterface to proto
- Generate Go code
- Add to pkg/launcher/client SDK
Phase 2: Launcher Integration (Week 2)
- Implement ProcMgr.InitiateShutdown()
- Add monitoring loop and timeout logic
- Add fallback to SIGTERM/SIGKILL
- Unit tests for launcher-side logic
Phase 3: Pattern Integration (Week 3)
- Integrate ShutdownCoordinator into keyvalue-runner
- Implement drain logic for in-flight requests
- Add backend connection cleanup
- Integration test: launcher → keyvalue-runner shutdown
Phase 4: Admin Delegation (Week 4)
- Add StopProcess RPC to admin service
- Forward to launcher ProcessMgr
- Add status query delegation
- E2E test: admin → launcher → subprocess
Phase 5: Process Groups and Rolling Replacement (Week 5-6)
- Implement ProcessGroup management in ProcMgr
- Add RollingReplace() algorithm
- Add readiness polling and rollback logic
- Integration test: rolling restart of keyvalue-runner group
- Configuration: YAML support for desired/min_healthy/max_surge
Phase 6: Observability (Week 7)
- Add Prometheus metrics for startup time, shutdown duration, rollback rate
- Add structured logging for all state transitions
- Dashboard showing process group health and replacement progress
- Runbook for debugging stuck shutdowns and failed readiness
Alternatives Considered
Alternative 1: Signal-Only Shutdown
Approach: Use SIGUSR1 for "graceful shutdown signal", no gRPC handshake
Pros:
- Simpler, no protocol design
- Works with any subprocess (not gRPC-specific)
Cons:
- No visibility into shutdown progress
- No bounded timeouts (or need complex signal chaining)
- No status reporting (why is drain blocked?)
- Hard to test shutdown logic independently
Verdict: Rejected due to lack of observability
Alternative 2: HTTP/REST Shutdown Endpoint
Approach: Each process exposes POST /shutdown endpoint
Pros:
- Simpler than gRPC (just HTTP)
- Easy to test with curl
Cons:
- Mixing HTTP and gRPC is awkward architecturally
- No bidirectional streaming (can't push completion)
- Polling still required for status
- Every process needs HTTP server + gRPC server
Verdict: Rejected, doesn't fit gRPC-centric architecture
Alternative 3: Shared Memory or File-Based Coordination
Approach: Launcher writes to /tmp/shutdown-<pid>, process polls file
Pros:
- No network calls
- Very low overhead
Cons:
- No remote shutdown (admin delegation impossible)
- Platform-specific (file semantics differ)
- Hard to convey rich status (metrics, blocking operations)
- Polling file is clunky
Verdict: Rejected, doesn't scale to admin delegation
Alternative 4: Context Cancellation Only
Approach: Launcher cancels subprocess's root context, waits for exit
Pros:
- Idiomatic Go pattern
- Minimal infrastructure
Cons:
- No visibility into shutdown progress
- No bounded timeouts (or hard to enforce)
- Launcher can't distinguish "slow drain" from "hung process"
- No protocol for admin delegation
Verdict: Rejected, context is internal-only mechanism
Rolling Replacement Flow
Flow 4: Rolling Restart of Process Group
Initial state: keyvalue-runner group with 2 instances (Desired=2, MinHealthy=1)
- kv-1: READY
- kv-2: READY
Step 1: Launch replacement for kv-1
┌─────────┐
│Launcher │ exec(kv-3) ┌────┐
│ ProcMgr │──────────────────────────>│kv-3│ SPAWNING
└────┬────┘ └─┬──┘
│ │
│ GetReadinessStatus() │
│─────────────────────────────────>│ STARTING
│ ReadinessResponse(STARTING) │
│<─────────────────────────────────│
│ │
│ GetReadinessStatus() │
│─────────────────────────────────>│ WARMING
│ ReadinessResponse(WARMING) │
│<─────────────────────────────────│
│ │
│ GetReadinessStatus() │
│─────────────────────────────────>│ READY
│ ReadinessResponse(READY) │
│<─────────────────────────────────│
State: kv-1 (READY), kv-2 (READY), kv-3 (READY) - 3 instances (over desired)
Step 2: Drain and stop kv-1
┌─────────┐
│Launcher │ ShutdownRequest() ┌────┐
│ ProcMgr │──────────────────────────>│kv-1│ SHUTDOWN_REQUESTED
└────┬────┘ └─┬──┘
│ │ Stop accepting
│ ShutdownAck() │ new work
│<─────────────────────────────────│
│ │
│ [Launcher removes kv-1 from │
│ routing table] │
│ │
│ GetShutdownStatus() │
│─────────────────────────────────>│ DRAINING
│ ShutdownStatus(DRAINING) │
│<─────────────────────────────────│
│ │
│ NotifyShutdownComplete() │
│<─────────────────────────────────│ COMPLETE
│ X exit(0)
State: kv-2 (READY), kv-3 (READY) - 2 instances (matches desired)
Step 3: Repeat for kv-2 (launch kv-4, drain kv-2)
Final state: kv-3 (READY), kv-4 (READY) - all instances replaced
Rolling Replacement Algorithm
// RollingReplace replaces all instances in a process group
func (pm *ProcessManager) RollingReplace(ctx context.Context, groupName string, newConfig ProcessConfig) error {
group := pm.groups[groupName]
// Validate constraints
if group.Config.MinHealthyInstances > group.Desired {
return fmt.Errorf("MinHealthyInstances (%d) > Desired (%d)",
group.Config.MinHealthyInstances, group.Desired)
}
maxSurge := group.Config.MaxSurge
if maxSurge == 0 {
maxSurge = 1 // Default: allow 1 extra instance
}
// Replace each instance one-by-one
for _, oldInstance := range group.Instances {
// Step 1: Start new instance
newInstance, err := pm.startProcess(ctx, groupName, newConfig)
if err != nil {
return fmt.Errorf("failed to start new instance: %w", err)
}
// Step 2: Wait for new instance to be READY (with timeout)
readyCtx, cancel := context.WithTimeout(ctx, group.Config.HealthCheckTimeout)
err = pm.waitForReady(readyCtx, newInstance)
cancel()
if err != nil {
// Rollback: kill new instance, keep old one
pm.forceShutdown(newInstance)
return fmt.Errorf("new instance failed to become ready: %w", err)
}
// Step 3: Add new instance to routing (if applicable)
pm.addToRouting(newInstance)
// Step 4: Initiate graceful shutdown of old instance
if err := pm.InitiateShutdown(ctx, oldInstance.ID); err != nil {
log.Warn().Err(err).Msg("Graceful shutdown failed, forcing")
pm.forceShutdown(oldInstance)
}
// Step 5: Wait for old instance to complete (or force after timeout)
// This is handled by monitorShutdown() goroutine
// Continue to next instance
}
return nil
}
// waitForReady polls readiness until READY or timeout
func (pm *ProcessManager) waitForReady(ctx context.Context, proc *ManagedProcess) error {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
resp, err := proc.LifecycleClient.GetReadinessStatus(ctx, &lifecycle.ReadinessRequest{
ProcessId: proc.ID,
})
if err != nil {
return fmt.Errorf("readiness check failed: %w", err)
}
switch resp.State {
case lifecycle.ReadinessResponse_READY:
log.Info().
Str("process_id", proc.ID).
Msg("Process ready")
return nil
case lifecycle.ReadinessResponse_UNHEALTHY:
return fmt.Errorf("process unhealthy: %s", resp.Message)
case lifecycle.ReadinessResponse_STARTING, lifecycle.ReadinessResponse_WARMING:
log.Debug().
Str("process_id", proc.ID).
Str("state", resp.State.String()).
Msg("Waiting for ready")
// Continue polling
}
case <-ctx.Done():
return fmt.Errorf("readiness timeout: %w", ctx.Err())
}
}
}
Process Group Configuration
# launcher-config.yaml
process_groups:
- name: keyvalue-runner
desired_instances: 2
min_healthy_instances: 1 # At least 1 READY during rolling restart
max_surge: 1 # Allow 1 extra instance temporarily
health_check_timeout: 30s # Wait up to 30s for new instance readiness
shutdown:
grace_period: 3s
max_duration: 10s
- name: pubsub-runner
desired_instances: 3
min_healthy_instances: 2
max_surge: 1
health_check_timeout: 45s # Longer for backend connection warmup
shutdown:
grace_period: 5s # More time for message acks
max_duration: 15s
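For reference, this file could be unmarshaled with structs along the following lines; a sketch assuming gopkg.in/yaml.v3, with durations kept as strings (yaml.v3 has no built-in time.Duration decoding) and parsed separately with time.ParseDuration. The type names are assumptions, not the launcher's actual config types.
// Sketch of config types matching launcher-config.yaml above.
type LauncherConfig struct {
    ProcessGroups []ProcessGroupSpec `yaml:"process_groups"`
}

type ProcessGroupSpec struct {
    Name                string       `yaml:"name"`
    DesiredInstances    int          `yaml:"desired_instances"`
    MinHealthyInstances int          `yaml:"min_healthy_instances"`
    MaxSurge            int          `yaml:"max_surge"`
    HealthCheckTimeout  string       `yaml:"health_check_timeout"` // e.g. "30s"
    Shutdown            ShutdownSpec `yaml:"shutdown"`
}

type ShutdownSpec struct {
    GracePeriod string `yaml:"grace_period"` // e.g. "3s"
    MaxDuration string `yaml:"max_duration"` // e.g. "10s"
}

func loadLauncherConfig(data []byte) (*LauncherConfig, error) {
    var cfg LauncherConfig
    if err := yaml.Unmarshal(data, &cfg); err != nil {
        return nil, fmt.Errorf("parse launcher config: %w", err)
    }
    return &cfg, nil
}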
Rollback on Failed Replacement
If a new instance fails to become READY:
- Immediate rollback: Kill new instance, keep old one
- Preserve availability: Old instance continues serving traffic
- Alert operator: Log error and emit metric
- Retry policy: Configurable (immediate retry, exponential backoff, manual)
func (pm *ProcessManager) RollingReplace(ctx context.Context, groupName string, newConfig ProcessConfig) error {
// ... start new instance ...
err = pm.waitForReady(readyCtx, newInstance)
if err != nil {
// ROLLBACK: Kill new instance
log.Error().
Err(err).
Str("old_instance", oldInstance.ID).
Str("new_instance", newInstance.ID).
Msg("Rolling replace failed, rolling back")
pm.forceShutdown(newInstance)
pm.recordMetric("rolling_replace_failures", 1)
// OLD INSTANCE CONTINUES RUNNING
return fmt.Errorf("rollback executed: %w", err)
}
// Success: proceed with draining old instance
}
Success Metrics
- Shutdown success rate: >99% of shutdowns complete cleanly (exit 0) within grace period
- Forced kill rate: <1% of shutdowns require SIGKILL
- Shutdown duration: P50 <2s, P99 <5s, P100 <10s
- Data loss: Zero reported data loss incidents due to abrupt termination
- Observability: 100% of shutdowns have structured logs with state transitions
- Adoption: All pattern runners implement graceful shutdown within 2 sprints
- Rolling restart success: >99% of rolling restarts complete without downtime
- Startup time: P50 <5s, P99 <15s for new instances to reach READY
- Rollback rate: <1% of rolling restarts require rollback due to failed readiness
- Zero-downtime: 100% availability maintained during rolling restarts (min_healthy honored)
Open Questions
- Health check customization: Should processes define custom readiness checks beyond backend connection? (Yes, via ReadinessChecks.custom_checks)
- Pre-shutdown hooks: Do we need plugin hooks for pre-shutdown actions (e.g., deregister from service discovery)? (Future enhancement)
- Client notification: Should we notify client SDKs before shutting down backends? (Out of scope for this RFC)
- Auto-restart vs rolling restart: How do we distinguish between crash recovery (auto-restart) and intentional upgrade (rolling restart)? (Config flag: restart_policy: on-failure vs rolling_replace)
- Canary deployments: Should we support staged rollouts (e.g., 1 instance first, then rest)? (Future RFC)
- Routing coordination: How does launcher notify proxy of instance availability changes? (Requires service discovery integration - future work)
References
- Netflix: Zuul's Graceful Shutdown
- Kubernetes: Pod Termination Lifecycle
- gRPC: Server Graceful Stop
- Cloudflare: Graceful Shutdowns in Go
Related RFCs
- RFC-008: Proxy-Plugin Architecture (patterns run as subprocesses)
- RFC-018: POC Implementation Strategy (keyvalue-runner is first pattern)
- ADR-005: Backend Plugin Architecture (drivers as libraries, not processes)
Changelog
- 2025-01-21: Initial RFC created