
RFC-041: Graceful Subprocess Shutdown Protocol

Status

Implemented - Implementation completed 2025-01-22

Phase 1-3 (Core Protocol): ✅ Complete

  • Phase 1: Protocol definition in proto/prism/interfaces/lifecycle.proto (commit 07227173)
    • ProcessLifecycleInterface service with Shutdown, GetShutdownStatus, NotifyShutdownComplete, GetReadinessStatus, NotifyReady RPCs
    • All message types with Process* prefix to avoid conflicts
    • Go code generation verified
  • Phase 2: Launcher-side implementation in pkg/launcher/shutdown.go (commit a496e87c)
    • ProcessManager with graceful shutdown coordination
    • monitorShutdown polls status with state machine tracking
    • forceShutdown escalates SIGTERM → SIGKILL on timeout
    • ShutdownAll for concurrent process shutdown
  • Phase 2: Unit tests (commit 385eb960) - 9 tests, all passing
    • State machine, timeout enforcement, force shutdown, concurrent shutdown
    • Mock gRPC client for isolated testing
  • Phase 3: Client SDK in pkg/launcher/client/shutdown_coordinator.go (commit 5ca93770)
    • ShutdownCoordinator implements ProcessLifecycleInterface server
    • ShutdownMetrics tracking with thread-safe accessors
    • Run method for drain logic execution
    • RequestMoreTime for extension requests
  • Phase 3: Integration example (commit ced08cc0)
    • ExampleShutdownIntegration for keyvalue-runner
    • Demonstrates coordinator.Run(ctx, drainFunc) pattern

Phase 5 (Process Groups): ✅ Complete

  • ProcessGroupManager in pkg/launcher/process_groups.go (commit 9bce7165)
    • ProcessGroupConfig with desired/min_healthy/max_surge settings
    • RollingReplace algorithm: start new → wait ready → shutdown old
    • Automatic rollback on readiness failure
    • ShutdownGroup for concurrent shutdown

Testing Infrastructure: ✅ Complete

  • Test runner with CLI in tests/testing/testrunner/main.go (commit 5d3cc04b)
    • 5 behavior modes: clean, slow-drain, hang, request-more, crash
    • Full RFC-041 lifecycle implementation
    • Suitable for integration testing
  • Integration test suite in tests/integration/shutdown/ (commit TBD)
    • 7 comprehensive tests covering all behaviors
    • TestShutdownHarness with procmgr and launcher integration
    • Process group and rolling replacement validation
    • Make targets: test-integration-shutdown, test-integration-shutdown-race
    • Full documentation in tests/integration/shutdown/README.md

Not Started:

  • Phase 4: Admin delegation (RPC forwarding to launcher)
  • Phase 6: Observability (Prometheus metrics, dashboards)

Context

Currently, the Prism launcher (prismctl) spawns and manages subprocess patterns (keyvalue-runner, pubsub-runner, etc.) but lacks a graceful shutdown mechanism. When processes need to terminate, the launcher falls back to signals (SIGTERM/SIGKILL) or abrupt termination, which can lead to:

  1. Data loss: In-flight requests or buffered data not flushed
  2. Resource leaks: Open connections, file handles, or backend sessions not cleaned up
  3. Inconsistent state: Backends left in partially-updated states
  4. Poor observability: No visibility into shutdown progress or blocking operations

This RFC proposes a cooperative shutdown handshake protocol that allows processes to:

  • Acknowledge shutdown requests
  • Report shutdown progress
  • Complete in-flight work with bounded time limits
  • Signal completion or request additional time
  • Be forcefully terminated only after grace periods expire

Goals

  1. Cooperative shutdown: Processes acknowledge and participate in shutdown
  2. Bounded termination: All processes shut down within predictable time limits
  3. Progress visibility: Launcher can monitor shutdown state transitions
  4. Delegated control: Admin can request shutdown of specific processes or groups
  5. Process groups: Launcher manages 1+ instances of the same pattern
  6. Rolling replacement: Zero-downtime upgrades via new process → old process handoff
  7. Readiness signaling: Processes signal when ready to accept traffic
  8. Backward compatibility: Non-participating processes still shut down (degraded to SIGTERM/SIGKILL)
  9. Testability: Shutdown behavior can be tested independently

Non-Goals

  1. Distributed coordination: This RFC covers single-launcher scope only (not multi-node)
  2. Checkpoint/restart: We don't preserve and restore process state
  3. Client-side coordination: Client SDKs are not involved in process shutdown
  4. Load balancing: Traffic distribution across group members (handled by proxy layer)

Proposal

Process Groups and Lifecycle Management

The launcher manages process groups: one or more instances of the same pattern. Each group has:

type ProcessGroup struct {
    Name      string                      // "keyvalue-runner"
    Instances map[string]*ManagedProcess  // "kv-1", "kv-2", etc.
    Desired   int                         // Target instance count
    Config    ProcessGroupConfig
}

type ProcessGroupConfig struct {
    MinHealthyInstances int           // Minimum ready instances during rolling restart
    MaxSurge            int           // Max extra instances during rolling restart (default: 1)
    HealthCheckTimeout  time.Duration // Time to wait for new instance readiness (default: 30s)
}

Example: keyvalue-runner group with 2 instances

  • Current: kv-1 (READY), kv-2 (READY)
  • During rolling restart: kv-3 (STARTING) → kv-1 (DRAINING) → kv-3 (READY) → kv-1 (TERMINATED)

Three-Phase Shutdown Protocol

Phase 1: SHUTDOWN_REQUESTED (0-3s)
- Launcher sends ShutdownRequest
- Process acknowledges with ShutdownAck
- Process stops accepting new work
- Process begins draining in-flight work

Phase 2: SHUTDOWN_DRAINING (3-10s configurable)
- Process reports progress via ShutdownStatus
- Process may request extension via ShutdownStatus.need_more_time
- Launcher monitors and logs progress

Phase 3: SHUTDOWN_COMPLETE or SHUTDOWN_FORCED
- Process sends ShutdownComplete when clean
- OR launcher sends SIGTERM → SIGKILL after deadline
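
The two timing fields in ShutdownRequest bound these phases. A minimal illustration of the arithmetic assumed throughout this RFC (not launcher code, just how the fields map to deadlines):

// Illustrative only: mapping ShutdownRequest fields to phase deadlines.
start := time.Now()
graceDeadline := start.Add(time.Duration(req.GracePeriodSeconds) * time.Second) // expected end of draining (Phases 1-2)
maxDeadline := start.Add(time.Duration(req.MaxShutdownSeconds) * time.Second)   // hard cutoff; beyond this the launcher escalates SIGTERM → SIGKILL (Phase 3)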

Protocol Messages (gRPC)

// New service in prism/interfaces/lifecycle.proto
service ProcessLifecycleInterface {
  // Request graceful shutdown of a process
  rpc Shutdown(ShutdownRequest) returns (ShutdownAck);

  // Query shutdown status (polled by launcher)
  rpc GetShutdownStatus(ShutdownStatusRequest) returns (ShutdownStatus);

  // Process signals shutdown completion (push from process)
  rpc NotifyShutdownComplete(ShutdownComplete) returns (ShutdownCompleteAck);

  // Query readiness for accepting traffic (polled by launcher during startup)
  rpc GetReadinessStatus(ReadinessRequest) returns (ReadinessResponse);

  // Process signals readiness (push from process, optional optimization)
  rpc NotifyReady(ReadyNotification) returns (ReadyAck);
}

message ShutdownRequest {
  // Process ID to shut down
  string process_id = 1;

  // Reason for shutdown (admin request, launcher termination, etc.)
  string reason = 2;

  // Initial grace period in seconds (default: 3s)
  int32 grace_period_seconds = 3;

  // Maximum total time before forced kill (default: 10s)
  int32 max_shutdown_seconds = 4;
}

message ShutdownAck {
  // Whether process accepted shutdown request
  bool acknowledged = 1;

  // Estimated time to complete (seconds)
  int32 estimated_seconds = 2;

  // Any warnings or notes
  string message = 3;
}

message ShutdownStatusRequest {
  string process_id = 1;
}

message ShutdownStatus {
  enum State {
    RUNNING = 0;             // Normal operation
    SHUTDOWN_REQUESTED = 1;  // Received request, starting drain
    SHUTDOWN_DRAINING = 2;   // Actively draining work
    SHUTDOWN_BLOCKED = 3;    // Blocked on external resource
    SHUTDOWN_COMPLETE = 4;   // Clean shutdown finished
    SHUTDOWN_FORCED = 5;     // Killed by launcher
  }

  State state = 1;

  // Human-readable status message
  string message = 2;

  // Metrics about what's being drained
  ShutdownMetrics metrics = 3;

  // Whether process needs more time beyond initial grace period
  bool need_more_time = 4;

  // Additional seconds requested (only if need_more_time = true)
  int32 additional_seconds = 5;
}

message ShutdownMetrics {
  // In-flight requests still processing
  int32 in_flight_requests = 1;

  // Backend connections being closed
  int32 open_connections = 2;

  // Buffered data to flush (bytes)
  int64 buffered_bytes = 3;

  // Any blocking operations (e.g., "waiting for Kafka flush")
  repeated string blocking_operations = 4;
}

message ShutdownComplete {
  string process_id = 1;

  // Final status message
  string message = 2;

  // Total shutdown time (milliseconds)
  int64 shutdown_duration_ms = 3;
}

message ShutdownCompleteAck {
  bool acknowledged = 1;
}

// Readiness check messages (for rolling replacement)
message ReadinessRequest {
  string process_id = 1;
}

message ReadinessResponse {
  enum ReadinessState {
    STARTING = 0;   // Process initializing
    WARMING = 1;    // Backend connections warming up
    READY = 2;      // Ready to accept traffic
    DRAINING = 3;   // Shutting down, not accepting new work
    UNHEALTHY = 4;  // Health check failed
  }

  ReadinessState state = 1;

  // Human-readable status
  string message = 2;

  // Readiness checks (all must pass for READY state)
  ReadinessChecks checks = 3;
}

message ReadinessChecks {
  // gRPC server listening
  bool grpc_server_ready = 1;

  // Backend connection established
  bool backend_connected = 2;

  // Backend warmup complete (connection pool filled, etc.)
  bool backend_warmed = 3;

  // Any custom checks (e.g., schema migration complete)
  map<string, bool> custom_checks = 4;
}

message ReadyNotification {
  string process_id = 1;
  string message = 2;
}

message ReadyAck {
  bool acknowledged = 1;
}

Handshake Flows

Flow 0: Process Startup and Readiness

┌─────────┐                           ┌────────────────┐
│Launcher │                           │Subprocess (KV) │
│ ProcMgr │                           │                │
└────┬────┘                           └───────┬────────┘
     │                                        │
     │ exec(keyvalue-runner)                  │
     │───────────────────────────────────────>│
     │                                        │ gRPC server
     │ GetReadinessStatus() [polling]         │ starting...
     │───────────────────────────────────────>│
     │                                        │
     │ ReadinessResponse(STARTING)            │
     │<───────────────────────────────────────│
     │                                        │
     │ GetReadinessStatus()                   │
     │───────────────────────────────────────>│
     │                                        │ Backend
     │ ReadinessResponse(WARMING)             │ connecting...
     │<───────────────────────────────────────│
     │                                        │
     │ GetReadinessStatus()                   │
     │───────────────────────────────────────>│
     │                                        │ All checks
     │ ReadinessResponse(READY, checks=OK)    │ passed
     │<───────────────────────────────────────│
     │                                        │
     │ [Launcher adds to routing table]       │ Accepting
     │                                        │ traffic

Flow 1: Launcher → Subprocess (Direct Shutdown)

┌─────────┐                           ┌────────────────┐
│Launcher │                           │Subprocess (KV) │
│ ProcMgr │                           │                │
└────┬────┘                           └───────┬────────┘
     │                                        │
     │ ShutdownRequest(grace=3s, max=10s)     │
     │───────────────────────────────────────>│
     │                                        │ Stop accepting
     │ ShutdownAck(estimated=2s)              │ new requests
     │<───────────────────────────────────────│
     │                                        │
     │ GetShutdownStatus() [polling]          │
     │───────────────────────────────────────>│
     │                                        │ Drain work
     │ ShutdownStatus(DRAINING, 5 inflight)   │
     │<───────────────────────────────────────│
     │                                        │
     │ GetShutdownStatus()                    │
     │───────────────────────────────────────>│
     │                                        │ All work done
     │ ShutdownStatus(DRAINING, 0 inflight)   │
     │<───────────────────────────────────────│
     │                                        │
     │ NotifyShutdownComplete(duration=1.8s)  │
     │<───────────────────────────────────────│
     │                                        │ Exit(0)
     │ ShutdownCompleteAck()                  │
     │───────────────────────────────────────>│
     │                                        X

Flow 2: Admin → Launcher → Subprocess (Delegated)

┌──────┐          ┌─────────┐          ┌────────────────┐
│Admin │          │Launcher │          │Subprocess (KV) │
│      │          │ ProcMgr │          │                │
└──┬───┘          └────┬────┘          └───────┬────────┘
   │                   │                       │
   │ StopProcess(kv-1) │                       │
   │──────────────────>│                       │
   │                   │ ShutdownRequest()     │
   │                   │──────────────────────>│
   │                   │                       │
   │                   │ ShutdownAck()         │
   │                   │<──────────────────────│
   │ Ack(stopping)     │                       │
   │<──────────────────│                       │
   │                   │                       │
   │ GetProcessStatus()│                       │
   │──────────────────>│ GetShutdownStatus()   │
   │                   │──────────────────────>│
   │                   │ ShutdownStatus()      │
   │                   │<──────────────────────│
   │ ProcessStatus()   │                       │
   │<──────────────────│                       │
   │                   │                       │
   │                   │ NotifyShutdownComplete()
   │                   │<──────────────────────│
   │ ProcessStopped()  │                       X
   │<──────────────────│

Flow 3: Timeout and Forced Kill

┌─────────┐                           ┌────────────────┐
│Launcher │                           │Subprocess (KV) │
│ ProcMgr │                           │                │
└────┬────┘                           └───────┬────────┘
     │                                        │
     │ ShutdownRequest(grace=3s, max=10s)     │
     │───────────────────────────────────────>│
     │                                        │
     │ ShutdownAck(estimated=2s)              │
     │<───────────────────────────────────────│
     │                                        │
     │ GetShutdownStatus() [polling]          │
     │───────────────────────────────────────>│
     │                                        │ Still draining
     │ ShutdownStatus(DRAINING, need_more_time=true)
     │<───────────────────────────────────────│
     │                                        │
     │ [3s grace period expires]              │
     │ GetShutdownStatus()                    │
     │───────────────────────────────────────>│
     │                                        │ Still stuck
     │ ShutdownStatus(BLOCKED, "Kafka flush") │
     │<───────────────────────────────────────│
     │                                        │
     │ [10s max deadline expires]             │
     │ SIGTERM                                │
     │───────────────────────────────────────>│
     │                                        │ Attempt cleanup
     │ [2s SIGTERM timeout]                   │
     │ SIGKILL                                │
     │───────────────────────────────────────>│
     │                                        X Process killed
     │ Log: SHUTDOWN_FORCED (12.5s)

Process State Machine (Extended for Rolling Restart)

┌──────────┐
│ SPAWNING │  (launcher starts process)
└────┬─────┘
     │ gRPC server starts
     v
┌──────────┐
│ STARTING │  (process initializing)
└────┬─────┘
     │ Backend connecting
     v
┌──────────┐
│ WARMING  │  (connection pools filling)
└────┬─────┘
     │ All checks pass
     v
┌──────────┐
│  READY   │  (accepting traffic) ◄────────────┐
└────┬─────┘                                   │
     │ ShutdownRequest()                       │ Still healthy
     v                                         │
┌──────────────────┐                           │
│SHUTDOWN_REQUESTED│                           │
└────┬─────────────┘                           │
     │ Stop accepting new work                 │
     v                                         │
┌──────────────┐        need_more_time         │
│   DRAINING   │◄──────────────────────────────┘
└──┬───────┬───┘
   │       │ Deadline expires
   │       v
   │   ┌────────┐
   │   │ FORCED │  (SIGTERM → SIGKILL)
   │   └────────┘
   │ All work complete
   v
┌──────────┐
│ COMPLETE │  (exit 0)
└──────────┘

Unhealthy at any point → UNHEALTHY → FORCED

Implementation Components

1. Launcher Process Manager (pkg/launcher/procmgr)

// ProcessManager manages subprocess lifecycle
type ProcessManager struct {
    processes map[string]*ManagedProcess
    config    ShutdownConfig
}

type ShutdownConfig struct {
    DefaultGracePeriod  time.Duration // 3s
    MaxShutdownDuration time.Duration // 10s
    StatusPollInterval  time.Duration // 500ms
    SIGTERMTimeout      time.Duration // 2s
}

type ManagedProcess struct {
    ID              string
    Cmd             *exec.Cmd
    LifecycleClient lifecycle.ProcessLifecycleInterfaceClient
    State           ShutdownState
    ShutdownStart   time.Time
}

// InitiateShutdown starts graceful shutdown handshake
func (pm *ProcessManager) InitiateShutdown(ctx context.Context, processID string) error {
    proc := pm.processes[processID]

    // Send shutdown request
    ack, err := proc.LifecycleClient.Shutdown(ctx, &lifecycle.ShutdownRequest{
        ProcessId:          processID,
        Reason:             "launcher shutdown",
        GracePeriodSeconds: int32(pm.config.DefaultGracePeriod.Seconds()),
        MaxShutdownSeconds: int32(pm.config.MaxShutdownDuration.Seconds()),
    })

    if err != nil {
        // Process doesn't support graceful shutdown, fall back to SIGTERM
        return pm.forceShutdown(proc)
    }

    log.Info().
        Str("process_id", processID).
        Int32("estimated_seconds", ack.EstimatedSeconds).
        Msg("Shutdown acknowledged")

    proc.State = ShutdownStateRequested
    proc.ShutdownStart = time.Now()

    // Start monitoring goroutine
    go pm.monitorShutdown(ctx, proc)

    return nil
}

// monitorShutdown polls process until complete or timeout
func (pm *ProcessManager) monitorShutdown(ctx context.Context, proc *ManagedProcess) {
    ticker := time.NewTicker(pm.config.StatusPollInterval)
    defer ticker.Stop()

    gracePeriodDeadline := proc.ShutdownStart.Add(pm.config.DefaultGracePeriod)
    maxDeadline := proc.ShutdownStart.Add(pm.config.MaxShutdownDuration)

    for {
        select {
        case <-ticker.C:
            status, err := proc.LifecycleClient.GetShutdownStatus(ctx, &lifecycle.ShutdownStatusRequest{
                ProcessId: proc.ID,
            })

            if err != nil {
                // Lost connection, assume dead
                pm.handleProcessGone(proc)
                return
            }

            switch status.State {
            case lifecycle.ShutdownStatus_SHUTDOWN_COMPLETE:
                pm.handleShutdownComplete(proc, status)
                return

            case lifecycle.ShutdownStatus_SHUTDOWN_DRAINING:
                pm.logShutdownProgress(proc, status)

                // Check if we're past grace period
                if time.Now().After(gracePeriodDeadline) {
                    if status.NeedMoreTime {
                        // Allow extension up to max deadline
                        pm.logExtension(proc, status)
                    }
                }

                // Check if we're past max deadline
                if time.Now().After(maxDeadline) {
                    pm.forceShutdown(proc)
                    return
                }

            case lifecycle.ShutdownStatus_SHUTDOWN_BLOCKED:
                pm.logBlocked(proc, status)

                if time.Now().After(maxDeadline) {
                    pm.forceShutdown(proc)
                    return
                }
            }

        case <-ctx.Done():
            // Launcher itself shutting down, force kill
            pm.forceShutdown(proc)
            return
        }
    }
}

// forceShutdown escalates to SIGTERM then SIGKILL
func (pm *ProcessManager) forceShutdown(proc *ManagedProcess) error {
    log.Warn().
        Str("process_id", proc.ID).
        Dur("elapsed", time.Since(proc.ShutdownStart)).
        Msg("Forcing shutdown with SIGTERM")

    proc.Cmd.Process.Signal(syscall.SIGTERM)

    // Wait for SIGTERM timeout
    done := make(chan error)
    go func() {
        done <- proc.Cmd.Wait()
    }()

    select {
    case <-time.After(pm.config.SIGTERMTimeout):
        log.Error().
            Str("process_id", proc.ID).
            Msg("SIGTERM timeout, sending SIGKILL")
        proc.Cmd.Process.Kill()
    case <-done:
        // Process exited
    }

    proc.State = ShutdownStateForced
    return nil
}

2. Launcher Client SDK (pkg/launcher/client)

// ShutdownCoordinator handles graceful shutdown in subprocess
type ShutdownCoordinator struct {
    processID    string
    shutdownChan chan *lifecycle.ShutdownRequest
    completeChan chan struct{}
    state        atomic.Value // lifecycle.ShutdownStatus_State
    metrics      ShutdownMetrics
}

// NewShutdownCoordinator creates coordinator and registers gRPC service
func NewShutdownCoordinator(processID string, grpcServer *grpc.Server) *ShutdownCoordinator {
    sc := &ShutdownCoordinator{
        processID:    processID,
        shutdownChan: make(chan *lifecycle.ShutdownRequest, 1),
        completeChan: make(chan struct{}),
    }
    sc.state.Store(lifecycle.ShutdownStatus_RUNNING)

    lifecycle.RegisterProcessLifecycleInterfaceServer(grpcServer, sc)

    return sc
}

// Shutdown implements ProcessLifecycleInterface
func (sc *ShutdownCoordinator) Shutdown(ctx context.Context, req *lifecycle.ShutdownRequest) (*lifecycle.ShutdownAck, error) {
    log.Info().
        Str("reason", req.Reason).
        Int32("grace_period", req.GracePeriodSeconds).
        Msg("Received shutdown request")

    sc.shutdownChan <- req
    sc.state.Store(lifecycle.ShutdownStatus_SHUTDOWN_REQUESTED)

    return &lifecycle.ShutdownAck{
        Acknowledged:     true,
        EstimatedSeconds: 2,
        Message:          "Beginning graceful shutdown",
    }, nil
}

// GetShutdownStatus implements ProcessLifecycleInterface
func (sc *ShutdownCoordinator) GetShutdownStatus(ctx context.Context, req *lifecycle.ShutdownStatusRequest) (*lifecycle.ShutdownStatus, error) {
    state := sc.state.Load().(lifecycle.ShutdownStatus_State)

    return &lifecycle.ShutdownStatus{
        State:   state,
        Message: sc.getStatusMessage(),
        Metrics: &lifecycle.ShutdownMetrics{
            InFlightRequests:   int32(sc.metrics.InFlightRequests),
            OpenConnections:    int32(sc.metrics.OpenConnections),
            BufferedBytes:      sc.metrics.BufferedBytes,
            BlockingOperations: sc.metrics.BlockingOperations,
        },
        NeedMoreTime:      sc.needsMoreTime(),
        AdditionalSeconds: sc.estimateAdditionalTime(),
    }, nil
}

// Run starts shutdown coordination loop (in subprocess main)
func (sc *ShutdownCoordinator) Run(ctx context.Context, drainFunc func(context.Context) error) error {
    select {
    case req := <-sc.shutdownChan:
        log.Info().Msg("Starting shutdown drain")
        sc.state.Store(lifecycle.ShutdownStatus_SHUTDOWN_DRAINING)

        // Execute application-specific drain logic
        drainCtx, cancel := context.WithTimeout(ctx, time.Duration(req.MaxShutdownSeconds)*time.Second)
        defer cancel()

        if err := drainFunc(drainCtx); err != nil {
            log.Error().Err(err).Msg("Drain failed")
            sc.state.Store(lifecycle.ShutdownStatus_SHUTDOWN_BLOCKED)
            return err
        }

        log.Info().Msg("Drain complete")
        sc.state.Store(lifecycle.ShutdownStatus_SHUTDOWN_COMPLETE)
        close(sc.completeChan)

        return nil

    case <-ctx.Done():
        return ctx.Err()
    }
}
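
The readiness side of the SDK is not shown above. Below is a minimal sketch of what a ReadinessCoordinator (the companion type listed under P0; the API here is an assumption) could look like, deriving the READY state from a set of boolean checks. In practice it would be combined with ShutdownCoordinator, since all five RPCs of ProcessLifecycleInterface are served by a single registration.

// Sketch only: readiness reporting on the subprocess side. Everything except
// the generated lifecycle types is assumed for illustration.
type ReadinessCoordinator struct {
    processID string
    mu        sync.RWMutex
    checks    map[string]bool // e.g. "grpc_server_ready", "backend_connected"
}

// SetCheck records the result of a named readiness check.
func (rc *ReadinessCoordinator) SetCheck(name string, ok bool) {
    rc.mu.Lock()
    defer rc.mu.Unlock()
    rc.checks[name] = ok
}

// GetReadinessStatus reports READY only when every registered check passes.
func (rc *ReadinessCoordinator) GetReadinessStatus(ctx context.Context, req *lifecycle.ReadinessRequest) (*lifecycle.ReadinessResponse, error) {
    rc.mu.RLock()
    defer rc.mu.RUnlock()

    state := lifecycle.ReadinessResponse_READY
    custom := make(map[string]bool, len(rc.checks))
    for name, ok := range rc.checks {
        custom[name] = ok
        if !ok {
            state = lifecycle.ReadinessResponse_WARMING // at least one check still pending
        }
    }

    return &lifecycle.ReadinessResponse{
        State:   state,
        Message: "readiness derived from registered checks",
        Checks:  &lifecycle.ReadinessChecks{CustomChecks: custom},
    }, nil
}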

3. Pattern Implementation Example

// keyvalue-runner main.go
func main() {
    // ... setup code ...

    grpcServer := grpc.NewServer()

    // Register pattern service
    keyvalue.RegisterKeyValueBasicInterfaceServer(grpcServer, kvService)

    // Register shutdown coordinator
    shutdownCoordinator := client.NewShutdownCoordinator("keyvalue-runner-1", grpcServer)

    // Start serving
    go grpcServer.Serve(lis)

    // Wait for shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    err := shutdownCoordinator.Run(ctx, func(drainCtx context.Context) error {
        // Application-specific drain logic
        log.Info().Msg("Stopping new request acceptance")
        kvService.StopAcceptingRequests()

        // Wait for in-flight requests (with timeout)
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()

        for {
            inFlight := kvService.GetInFlightCount()
            if inFlight == 0 {
                break
            }

            shutdownCoordinator.SetMetric("in_flight_requests", inFlight)

            select {
            case <-ticker.C:
                continue
            case <-drainCtx.Done():
                return fmt.Errorf("drain timeout with %d in-flight", inFlight)
            }
        }

        // Close backend connections
        log.Info().Msg("Closing backend connections")
        if err := kvService.CloseBackend(); err != nil {
            return fmt.Errorf("backend close failed: %w", err)
        }

        // Stop gRPC server gracefully
        grpcServer.GracefulStop()

        return nil
    })

    if err != nil {
        log.Error().Err(err).Msg("Shutdown failed")
        os.Exit(1)
    }

    log.Info().Msg("Clean shutdown complete")
    os.Exit(0)
}

4. Admin Delegated Shutdown

// Admin service (prism-admin) delegates to launcher
func (a *AdminService) StopProcess(ctx context.Context, req *admin.StopProcessRequest) (*admin.StopProcessResponse, error) {
    // Find launcher managing this process
    launcherConn, err := a.registry.GetLauncherForProcess(req.ProcessId)
    if err != nil {
        return nil, status.Errorf(codes.NotFound, "process not found: %v", err)
    }

    // Forward shutdown request to launcher
    shutdownReq := &launcher.ShutdownProcessRequest{
        ProcessId:   req.ProcessId,
        Reason:      "admin request",
        GracePeriod: req.GracePeriodSeconds,
    }

    resp, err := launcherConn.ProcessMgrClient.ShutdownProcess(ctx, shutdownReq)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "shutdown failed: %v", err)
    }

    return &admin.StopProcessResponse{
        Acknowledged: true,
        Message:      resp.Message,
    }, nil
}

// Admin can also query shutdown status
func (a *AdminService) GetProcessStatus(ctx context.Context, req *admin.GetProcessStatusRequest) (*admin.ProcessStatusResponse, error) {
    launcherConn, err := a.registry.GetLauncherForProcess(req.ProcessId)
    if err != nil {
        return nil, status.Errorf(codes.NotFound, "process not found: %v", err)
    }

    statusResp, err := launcherConn.ProcessMgrClient.GetProcessStatus(ctx, &launcher.ProcessStatusRequest{
        ProcessId: req.ProcessId,
    })

    // ... convert and return status ...
}

Tradeoffs and Design Decisions

1. Pull vs Push Status Updates

Decision: Hybrid (Pull with Push Completion)

  • Launcher polls subprocess for status (GetShutdownStatus)
  • Subprocess pushes final completion (NotifyShutdownComplete)

Rationale:

  • Pull gives launcher control over monitoring frequency
  • Push completion avoids unnecessary polling after success
  • Subprocess doesn't need to know launcher endpoint

Tradeoff: More network calls vs simpler error handling

2. Grace Period Values

Decision: 3s initial grace, 10s max deadline (configurable)

Rationale:

  • 3s is typically sufficient for request draining (P99 < 1s)
  • 10s covers edge cases (slow backends, large buffers)
  • Configurable per pattern or deployment

Tradeoff: Fast shutdowns vs data safety

3. Extension Requests vs Fixed Deadline

Decision: Allow extensions via need_more_time flag, but enforce max deadline

Rationale:

  • Some operations genuinely need more time (Kafka flush, transaction commit)
  • Max deadline prevents infinite hangs
  • Process must explicitly request extension (no automatic grants)

Tradeoff: Flexibility vs bounded termination
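
A sketch of how the launcher could grant an extension while still honoring the hard deadline (helper name assumed; the monitorShutdown shown earlier simply logs the request and keeps the original maxDeadline):

// grantExtension extends the soft grace deadline by the requested amount,
// but never past the absolute max deadline.
func grantExtension(graceDeadline, maxDeadline time.Time, additionalSeconds int32) time.Time {
    extended := graceDeadline.Add(time.Duration(additionalSeconds) * time.Second)
    if extended.After(maxDeadline) {
        return maxDeadline // extension capped: bounded termination wins
    }
    return extended
}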

4. State Machine Complexity

Decision: Simple six-state machine (RUNNING, REQUESTED, DRAINING, BLOCKED, COMPLETE, FORCED)

Rationale:

  • Enough states to track progress without over-engineering
  • BLOCKED state provides visibility into why shutdown is stuck
  • Single-direction transitions (no state reversals)

Tradeoff: Limited expressiveness vs implementation simplicity

5. Backward Compatibility

Decision: Fall back to SIGTERM/SIGKILL if gRPC shutdown not available

Rationale:

  • Processes can adopt gradually
  • Non-Go processes (if added) can use signals
  • Launcher doesn't hard-fail if process doesn't support protocol

Tradeoff: Mixed shutdown behaviors during migration
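
One way the launcher could detect a non-participating process and degrade, assuming the standard google.golang.org/grpc/status and codes packages (the InitiateShutdown shown earlier treats any Shutdown RPC error as a signal to fall back):

// Sketch: distinguish "process doesn't speak the protocol" from other errors.
// Helper name is assumed, not existing launcher code.
func (pm *ProcessManager) shutdownOrFallback(ctx context.Context, proc *ManagedProcess, req *lifecycle.ShutdownRequest) error {
    _, err := proc.LifecycleClient.Shutdown(ctx, req)
    if err == nil {
        return nil // graceful handshake underway; monitorShutdown takes over
    }
    switch status.Code(err) {
    case codes.Unimplemented, codes.Unavailable:
        // Lifecycle service not registered (or no gRPC endpoint): degrade to signals.
        return pm.forceShutdown(proc) // SIGTERM → SIGKILL path
    default:
        return err
    }
}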

6. Process Groups

Decision: Basic process groups (desired/min_healthy/max_surge) and rolling replacement are in scope; richer group orchestration is deferred

Rationale:

  • Rolling replacement requires the launcher to track at least minimal group state
  • Advanced group semantics (parallel vs sequential shutdown policies, autoscaling) need separate design
  • Future extensions can build on top of this protocol

Tradeoff: Minimal group model now vs a fuller orchestration layer later; until admin delegation lands (Phase 4), group operations are launcher-initiated only

Implementation Criteria

Must Have (P0)

  1. Protocol definition: ProcessLifecycleInterface service with shutdown + readiness RPCs
  2. Process groups: Launcher manages 1+ instances per pattern with desired/min_healthy counts
  3. Rolling replacement: Algorithm for zero-downtime instance upgrades
  4. Readiness checks: Launcher polls GetReadinessStatus until READY
  5. Launcher integration: ProcMgr implements handshake, monitoring, and replacement logic
  6. Client SDK: ShutdownCoordinator + ReadinessCoordinator in pkg/launcher/client
  7. Timeout enforcement: SIGTERM/SIGKILL escalation after max deadline
  8. Rollback: Kill new instance and keep old if readiness fails
  9. Observability: Structured logs for all state transitions (startup, ready, draining, complete)
  10. Tests: Unit tests for state machine + integration test for rolling replacement

Should Have (P1)

  1. Admin delegation: Admin can request process shutdown or rolling restart via launcher
  2. Metrics: Prometheus metrics for startup time, shutdown duration, readiness failures, rollback rate
  3. Configurable timeouts: Per-pattern grace period, max shutdown, and health check timeout
  4. Status API: Admin can query shutdown progress and group health
  5. Example implementation: keyvalue-runner demonstrates drain + readiness logic
  6. Health check customization: Processes can define custom readiness checks

Could Have (P2)

  1. Pre-shutdown hooks: Configurable actions before shutdown (e.g., deregister from service discovery)
  2. Shutdown dry-run: Admin can test what would be drained without actually shutting down
  3. Canary deployments: Staged rollouts (1 instance → wait → rest of group)
  4. Custom readiness checks: Plugin-defined health checks beyond backend connectivity
  5. Graceful degradation: Continue with fewer instances if replacement fails repeatedly

Won't Have (This RFC)

  1. Multi-launcher coordination: Cross-node shutdown orchestration
  2. Client SDK notification: Telling clients to disconnect before shutdown
  3. State persistence: Saving process state for resume
  4. Dynamic scaling: Auto-scaling based on load (separate RFC)
  5. Blue-green deployments: Full environment swaps (use orchestrator like Kubernetes)

Testing Strategy

Unit Tests

// Test state machine transitions
func TestShutdownStateMachine(t *testing.T) {
    // RUNNING → REQUESTED → DRAINING → COMPLETE
}

// Test timeout enforcement
func TestShutdownTimeoutForceKill(t *testing.T) {
    // Process doesn't complete, SIGTERM/SIGKILL sent
}

// Test extension requests
func TestShutdownExtensionRequest(t *testing.T) {
    // Process requests more time, granted up to max deadline
}
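
The state machine test could be table-driven over allowed transitions, for example (the transition list and the isValidTransition helper are assumptions for illustration, not the shipped tests):

func TestShutdownStateTransitions(t *testing.T) {
    allowed := []struct {
        from, to lifecycle.ShutdownStatus_State
    }{
        {lifecycle.ShutdownStatus_RUNNING, lifecycle.ShutdownStatus_SHUTDOWN_REQUESTED},
        {lifecycle.ShutdownStatus_SHUTDOWN_REQUESTED, lifecycle.ShutdownStatus_SHUTDOWN_DRAINING},
        {lifecycle.ShutdownStatus_SHUTDOWN_DRAINING, lifecycle.ShutdownStatus_SHUTDOWN_COMPLETE},
        {lifecycle.ShutdownStatus_SHUTDOWN_DRAINING, lifecycle.ShutdownStatus_SHUTDOWN_FORCED},
    }

    for _, tc := range allowed {
        // isValidTransition: hypothetical helper under test
        if !isValidTransition(tc.from, tc.to) {
            t.Errorf("expected transition %v -> %v to be allowed", tc.from, tc.to)
        }
    }
}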

Integration Tests

Implementation: tests/integration/shutdown/graceful_shutdown_test.go

The integration test suite validates all shutdown behaviors using a configurable test runner:

# Run all shutdown integration tests
make test-integration-shutdown

# Run with race detector
make test-integration-shutdown-race

Test Coverage

Test                             Behavior                          Validates
TestGracefulShutdownClean        Clean drain within grace period   Fast completion (<3s)
TestGracefulShutdownSlowDrain    Slow but successful drain         Respects drain duration
TestGracefulShutdownHang         Never completes drain             Force kill after timeout
TestGracefulShutdownRequestMore  Requests extension                Extension granted
TestGracefulShutdownCrash        Crashes during drain              Crash detection
TestProcessGroupShutdown         Multiple instances                Parallel shutdown
TestRollingReplacement           Zero-downtime replacement         MinHealthy maintained

Test Runner

The testrunner binary simulates pattern subprocess behavior:

testrunner \
  --process-id "test-1" \
  --port 50000 \
  --behavior "clean|slow-drain|hang|request-more|crash" \
  --drain-duration 2s \
  --work-duration 100ms \
  --initial-work 5

Behaviors:

  • clean: Drains all work and exits cleanly (happy path)
  • slow-drain: Takes full drain duration but succeeds
  • hang: Never completes, forces timeout and SIGKILL
  • request-more: Requests additional shutdown time (5s)
  • crash: Crashes (exit code 2) during drain
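
Conceptually, each behavior maps to a different drain function handed to the coordinator. A simplified sketch of how the test runner could wire this up (flag handling and exact function bodies are assumptions; see tests/testing/testrunner/main.go for the real implementation):

// Sketch: pick a drain strategy from the --behavior flag.
func drainFuncFor(behavior string, drainDuration time.Duration) func(context.Context) error {
    switch behavior {
    case "clean":
        return func(ctx context.Context) error { return nil } // finish immediately
    case "slow-drain", "request-more":
        // request-more would additionally ask the coordinator for more time before draining.
        return func(ctx context.Context) error {
            select {
            case <-time.After(drainDuration): // take the full budget, then succeed
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    case "hang":
        return func(ctx context.Context) error {
            <-ctx.Done() // never finishes on its own; launcher must force-kill
            return ctx.Err()
        }
    case "crash":
        return func(ctx context.Context) error {
            os.Exit(2) // simulate a crash mid-drain
            return nil
        }
    default:
        return func(ctx context.Context) error { return nil }
    }
}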

Test Architecture

TestShutdownHarness
├── procmgr.ProcessManager (OS process lifecycle)
├── launcher.ProcessManager (gRPC shutdown coordination)
└── launcher.ProcessGroupManager (rolling replacement)
    └── spawns testrunner subprocesses

Documentation: See tests/integration/shutdown/README.md for detailed usage.

Acceptance Tests

// Test admin delegation
func TestAdminDelegatedShutdown(t *testing.T) {
    // 1. Admin sends StopProcess request
    // 2. Launcher receives and forwards to subprocess
    // 3. Subprocess drains and completes
    // 4. Admin queries status and sees COMPLETE
}

Migration Path

Phase 1: Protocol Definition (Week 1)

  • Add ProcessLifecycleInterface to proto
  • Generate Go code
  • Add to pkg/launcher/client SDK

Phase 2: Launcher Integration (Week 2)

  • Implement ProcMgr.InitiateShutdown()
  • Add monitoring loop and timeout logic
  • Add fallback to SIGTERM/SIGKILL
  • Unit tests for launcher-side logic

Phase 3: Pattern Integration (Week 3)

  • Integrate ShutdownCoordinator into keyvalue-runner
  • Implement drain logic for in-flight requests
  • Add backend connection cleanup
  • Integration test: launcher → keyvalue-runner shutdown

Phase 4: Admin Delegation (Week 4)

  • Add StopProcess RPC to admin service
  • Forward to launcher ProcessMgr
  • Add status query delegation
  • E2E test: admin → launcher → subprocess

Phase 5: Process Groups and Rolling Replacement (Week 5-6)

  • Implement ProcessGroup management in ProcMgr
  • Add RollingReplace() algorithm
  • Add readiness polling and rollback logic
  • Integration test: rolling restart of keyvalue-runner group
  • Configuration: YAML support for desired/min_healthy/max_surge

Phase 6: Observability (Week 7)

  • Add Prometheus metrics for startup time, shutdown duration, rollback rate
  • Add structured logging for all state transitions
  • Dashboard showing process group health and replacement progress
  • Runbook for debugging stuck shutdowns and failed readiness

Alternatives Considered

Alternative 1: Signal-Only Shutdown

Approach: Use SIGUSR1 for "graceful shutdown signal", no gRPC handshake

Pros:

  • Simpler, no protocol design
  • Works with any subprocess (not gRPC-specific)

Cons:

  • No visibility into shutdown progress
  • No bounded timeouts (or need complex signal chaining)
  • No status reporting (why is drain blocked?)
  • Hard to test shutdown logic independently

Verdict: Rejected due to lack of observability

Alternative 2: HTTP/REST Shutdown Endpoint

Approach: Each process exposes POST /shutdown endpoint

Pros:

  • Simpler than gRPC (just HTTP)
  • Easy to test with curl

Cons:

  • Mixing HTTP and gRPC is awkward architecturally
  • No bidirectional streaming (can't push completion)
  • Polling still required for status
  • Every process needs HTTP server + gRPC server

Verdict: Rejected, doesn't fit gRPC-centric architecture

Alternative 3: Shared Memory or File-Based Coordination

Approach: Launcher writes to /tmp/shutdown-<pid>, process polls file

Pros:

  • No network calls
  • Very low overhead

Cons:

  • No remote shutdown (admin delegation impossible)
  • Platform-specific (file semantics differ)
  • Hard to convey rich status (metrics, blocking operations)
  • Polling file is clunky

Verdict: Rejected, doesn't scale to admin delegation

Alternative 4: Context Cancellation Only

Approach: Launcher cancels subprocess's root context, waits for exit

Pros:

  • Idiomatic Go pattern
  • Minimal infrastructure

Cons:

  • No visibility into shutdown progress
  • No bounded timeouts (or hard to enforce)
  • Launcher can't distinguish "slow drain" from "hung process"
  • No protocol for admin delegation

Verdict: Rejected, context is internal-only mechanism

Rolling Replacement Flow

Flow 4: Rolling Restart of Process Group

Initial state: keyvalue-runner group with 2 instances (Desired=2, MinHealthy=1)
- kv-1: READY
- kv-2: READY

Step 1: Launch replacement for kv-1
┌─────────┐
│Launcher │        exec(kv-3)         ┌────┐
│ ProcMgr │──────────────────────────>│kv-3│ SPAWNING
└────┬────┘                           └─┬──┘
     │                                  │
     │ GetReadinessStatus()             │
     │─────────────────────────────────>│ STARTING
     │ ReadinessResponse(STARTING)      │
     │<─────────────────────────────────│
     │                                  │
     │ GetReadinessStatus()             │
     │─────────────────────────────────>│ WARMING
     │ ReadinessResponse(WARMING)       │
     │<─────────────────────────────────│
     │                                  │
     │ GetReadinessStatus()             │
     │─────────────────────────────────>│ READY
     │ ReadinessResponse(READY)         │
     │<─────────────────────────────────│

State: kv-1 (READY), kv-2 (READY), kv-3 (READY) - 3 instances (over desired)

Step 2: Drain and stop kv-1
┌─────────┐
│Launcher │     ShutdownRequest()     ┌────┐
│ ProcMgr │──────────────────────────>│kv-1│ SHUTDOWN_REQUESTED
└────┬────┘                           └─┬──┘
     │                                  │ Stop accepting
     │ ShutdownAck()                    │ new work
     │<─────────────────────────────────│
     │                                  │
     │ [Launcher removes kv-1 from      │
     │  routing table]                  │
     │                                  │
     │ GetShutdownStatus()              │
     │─────────────────────────────────>│ DRAINING
     │ ShutdownStatus(DRAINING)         │
     │<─────────────────────────────────│
     │                                  │
     │ NotifyShutdownComplete()         │
     │<─────────────────────────────────│ COMPLETE
     │                                  X exit(0)

State: kv-2 (READY), kv-3 (READY) - 2 instances (matches desired)

Step 3: Repeat for kv-2 (launch kv-4, drain kv-2)

Final state: kv-3 (READY), kv-4 (READY) - all instances replaced

Rolling Replacement Algorithm

// RollingReplace replaces all instances in a process group
func (pm *ProcessManager) RollingReplace(ctx context.Context, groupName string, newConfig ProcessConfig) error {
    group := pm.groups[groupName]

    // Validate constraints
    if group.Config.MinHealthyInstances > group.Desired {
        return fmt.Errorf("MinHealthyInstances (%d) > Desired (%d)",
            group.Config.MinHealthyInstances, group.Desired)
    }

    maxSurge := group.Config.MaxSurge
    if maxSurge == 0 {
        maxSurge = 1 // Default: allow 1 extra instance
    }

    // Replace each instance one-by-one
    for _, oldInstance := range group.Instances {
        // Step 1: Start new instance
        newInstance, err := pm.startProcess(ctx, groupName, newConfig)
        if err != nil {
            return fmt.Errorf("failed to start new instance: %w", err)
        }

        // Step 2: Wait for new instance to be READY (with timeout)
        readyCtx, cancel := context.WithTimeout(ctx, group.Config.HealthCheckTimeout)
        err = pm.waitForReady(readyCtx, newInstance)
        cancel()

        if err != nil {
            // Rollback: kill new instance, keep old one
            pm.forceShutdown(newInstance)
            return fmt.Errorf("new instance failed to become ready: %w", err)
        }

        // Step 3: Add new instance to routing (if applicable)
        pm.addToRouting(newInstance)

        // Step 4: Initiate graceful shutdown of old instance
        if err := pm.InitiateShutdown(ctx, oldInstance.ID); err != nil {
            log.Warn().Err(err).Msg("Graceful shutdown failed, forcing")
            pm.forceShutdown(oldInstance)
        }

        // Step 5: Wait for old instance to complete (or force after timeout)
        // This is handled by monitorShutdown() goroutine

        // Continue to next instance
    }

    return nil
}

// waitForReady polls readiness until READY or timeout
func (pm *ProcessManager) waitForReady(ctx context.Context, proc *ManagedProcess) error {
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            resp, err := proc.LifecycleClient.GetReadinessStatus(ctx, &lifecycle.ReadinessRequest{
                ProcessId: proc.ID,
            })

            if err != nil {
                return fmt.Errorf("readiness check failed: %w", err)
            }

            switch resp.State {
            case lifecycle.ReadinessResponse_READY:
                log.Info().
                    Str("process_id", proc.ID).
                    Msg("Process ready")
                return nil

            case lifecycle.ReadinessResponse_UNHEALTHY:
                return fmt.Errorf("process unhealthy: %s", resp.Message)

            case lifecycle.ReadinessResponse_STARTING, lifecycle.ReadinessResponse_WARMING:
                log.Debug().
                    Str("process_id", proc.ID).
                    Str("state", resp.State.String()).
                    Msg("Waiting for ready")
                // Continue polling
            }

        case <-ctx.Done():
            return fmt.Errorf("readiness timeout: %w", ctx.Err())
        }
    }
}

Process Group Configuration

# launcher-config.yaml
process_groups:
  - name: keyvalue-runner
    desired_instances: 2
    min_healthy_instances: 1    # At least 1 READY during rolling restart
    max_surge: 1                # Allow 1 extra instance temporarily
    health_check_timeout: 30s   # Wait up to 30s for new instance readiness
    shutdown:
      grace_period: 3s
      max_duration: 10s

  - name: pubsub-runner
    desired_instances: 3
    min_healthy_instances: 2
    max_surge: 1
    health_check_timeout: 45s   # Longer for backend connection warmup
    shutdown:
      grace_period: 5s          # More time for message acks
      max_duration: 15s
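
A sketch of loading this file into the values consumed by ProcessGroupConfig and ShutdownConfig, assuming gopkg.in/yaml.v3; the struct and function names are chosen here for illustration, and durations are kept as strings to be parsed with time.ParseDuration:

// Sketch: YAML binding for launcher-config.yaml (names and tags assumed).
type shutdownTimeouts struct {
    GracePeriod string `yaml:"grace_period"` // e.g. "3s", parsed with time.ParseDuration
    MaxDuration string `yaml:"max_duration"`
}

type processGroupSpec struct {
    Name                string           `yaml:"name"`
    DesiredInstances    int              `yaml:"desired_instances"`
    MinHealthyInstances int              `yaml:"min_healthy_instances"`
    MaxSurge            int              `yaml:"max_surge"`
    HealthCheckTimeout  string           `yaml:"health_check_timeout"` // e.g. "30s"
    Shutdown            shutdownTimeouts `yaml:"shutdown"`
}

type launcherConfig struct {
    ProcessGroups []processGroupSpec `yaml:"process_groups"`
}

func loadLauncherConfig(path string) (*launcherConfig, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    var cfg launcherConfig
    if err := yaml.Unmarshal(data, &cfg); err != nil {
        return nil, err
    }
    return &cfg, nil
}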

Rollback on Failed Replacement

If a new instance fails to become READY:

  1. Immediate rollback: Kill new instance, keep old one
  2. Preserve availability: Old instance continues serving traffic
  3. Alert operator: Log error and emit metric
  4. Retry policy: Configurable (immediate retry, exponential backoff, manual); a possible backoff wrapper is sketched after the code below

func (pm *ProcessManager) RollingReplace(ctx context.Context, groupName string, newConfig ProcessConfig) error {
    // ... start new instance ...

    err = pm.waitForReady(readyCtx, newInstance)
    if err != nil {
        // ROLLBACK: Kill new instance
        log.Error().
            Err(err).
            Str("old_instance", oldInstance.ID).
            Str("new_instance", newInstance.ID).
            Msg("Rolling replace failed, rolling back")

        pm.forceShutdown(newInstance)
        pm.recordMetric("rolling_replace_failures", 1)

        // OLD INSTANCE CONTINUES RUNNING
        return fmt.Errorf("rollback executed: %w", err)
    }

    // Success: proceed with draining old instance
}

Success Metrics

  1. Shutdown success rate: >99% of shutdowns complete cleanly (exit 0) within grace period
  2. Forced kill rate: <1% of shutdowns require SIGKILL
  3. Shutdown duration: P50 <2s, P99 <5s, P100 <10s
  4. Data loss: Zero reported data loss incidents due to abrupt termination
  5. Observability: 100% of shutdowns have structured logs with state transitions
  6. Adoption: All pattern runners implement graceful shutdown within 2 sprints
  7. Rolling restart success: >99% of rolling restarts complete without downtime
  8. Startup time: P50 <5s, P99 <15s for new instances to reach READY
  9. Rollback rate: <1% of rolling restarts require rollback due to failed readiness
  10. Zero-downtime: 100% availability maintained during rolling restarts (min_healthy honored)

Open Questions

  1. Health check customization: Should processes define custom readiness checks beyond backend connection? (Yes, via ReadinessChecks.custom_checks)
  2. Pre-shutdown hooks: Do we need plugin hooks for pre-shutdown actions (e.g., deregister from service discovery)? (Future enhancement)
  3. Client notification: Should we notify client SDKs before shutting down backends? (Out of scope for this RFC)
  4. Auto-restart vs rolling restart: How do we distinguish between crash recovery (auto-restart) and intentional upgrade (rolling restart)? (Config flag: restart_policy: on-failure vs rolling_replace)
  5. Canary deployments: Should we support staged rollouts (e.g., 1 instance first, then rest)? (Future RFC)
  6. Routing coordination: How does launcher notify proxy of instance availability changes? (Requires service discovery integration - future work)

References

  • RFC-008: Proxy-Plugin Architecture (patterns run as subprocesses)
  • RFC-018: POC Implementation Strategy (keyvalue-runner is first pattern)
  • ADR-005: Backend Plugin Architecture (drivers as libraries, not processes)

Changelog

  • 2025-01-21: Initial RFC created