Layered Interface Hierarchy
Context
Prism needs a coherent interface hierarchy that:
- Starts with basic primitives (sessions, auth, auditing)
- Builds up to use-case-specific operations
- Maintains clean separation of concerns
- Supports multiple backend implementations
- Enables progressive disclosure of complexity
Interface Layers:
- Session Layer: Authorization, auditing, connection state
- Queue Layer: Kafka-style message queues
- Pub/Sub Layer: NATS-style publish-subscribe
- Paged Reader Layer: Database pagination and queries
- Transact Write Layer: Two-table transactional writes
Decision
Implement a layered interface hierarchy with clear dependencies:
- Session as foundation: All operations require active session
- Layer independence: Each use-case layer operates independently
- Composable operations: Clients can use multiple layers simultaneously
- Backend polymorphism: Each layer supports multiple backend implementations
- Protobuf definitions: All interfaces defined in protobuf
Rationale
Layer Hierarchy
                    ┌──────────────────────────────────┐
                    │       Client Applications        │
                    └────────────┬─────────────────────┘
                                 │
        ┌────────────────────────┼────────────────────────┐
        │                        │                        │
        │                        │                        │
┌───────▼────────┐    ┌──────────▼──────┐      ┌──────────▼──────┐
│  Queue Layer   │    │  PubSub Layer   │      │  Reader Layer   │
│ (Kafka-style)  │    │  (NATS-style)   │      │ (DB pagination) │
└───────┬────────┘    └──────────┬──────┘      └──────────┬──────┘
        │                        │                        │
        │             ┌──────────▼──────┐                 │
        │             │ Transact Layer  │                 │
        │             │ (2-table write) │                 │
        │             └──────────┬──────┘                 │
        │                        │                        │
        └────────────────────────┼────────────────────────┘
                                 │
                    ┌────────────▼─────────────┐
                    │      Session Layer       │
                    │   (auth, audit, state)   │
                    └────────────┬─────────────┘
                                 │
                    ┌────────────▼─────────────┐
                    │     Prism Proxy Core     │
                    └──────────────────────────┘
Layer 1: Session Service
Purpose: Foundation for all operations - authentication, authorization, auditing, connection state
// proto/prism/session/v1/session_service.proto
syntax = "proto3";

package prism.session.v1;

import "google/protobuf/timestamp.proto";
import "prism/config/v1/client_config.proto";

service SessionService {
  // Create new session with client configuration
  rpc CreateSession(CreateSessionRequest) returns (CreateSessionResponse);

  // Close session cleanly
  rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse);

  // Heartbeat to keep session alive
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);

  // Get session info
  rpc GetSession(GetSessionRequest) returns (GetSessionResponse);
}

message CreateSessionRequest {
  // Authentication credentials
  oneof auth {
    string api_key = 1;
    string jwt_token = 2;
    MutualTLSAuth mtls = 3;
  }

  // Client configuration (named or inline)
  oneof config {
    string config_name = 4;
    prism.config.v1.ClientConfig inline_config = 5;
  }

  // Client metadata
  string client_id = 6;
  string client_version = 7;
}

message CreateSessionResponse {
  // Session token for subsequent requests
  string session_token = 1;

  // Session metadata
  string session_id = 2;
  google.protobuf.Timestamp created_at = 3;
  google.protobuf.Timestamp expires_at = 4;

  // Resolved configuration
  prism.config.v1.ClientConfig config = 5;
}

message CloseSessionRequest {
  string session_token = 1;
  bool force = 2;  // Force close even with pending operations
}

message CloseSessionResponse {
  bool success = 1;
  string message = 2;
}

message HeartbeatRequest {
  string session_token = 1;
}

message HeartbeatResponse {
  google.protobuf.Timestamp server_time = 1;
  int32 ttl_seconds = 2;
}
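
message MutualTLSAuth {
  bytes client_cert = 1;
}

// Assumed minimal shapes: GetSession is declared in the service above, but
// this draft does not define its request and response messages.
message GetSessionRequest {
  string session_token = 1;
}

message GetSessionResponse {
  string session_id = 1;
  google.protobuf.Timestamp created_at = 2;
  google.protobuf.Timestamp expires_at = 3;
}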
Session State:
- Active sessions tracked server-side
- Idle timeout (default: 5 minutes)
- Max session duration (default: 24 hours)
- Heartbeat keeps session alive
- Clean closure releases resources
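The timeout rules above can be sketched as a small state check. This is only an illustration under stated assumptions: `SessionState`, `is_live`, and `heartbeat` are hypothetical names, the two constants simply restate the defaults listed above, and the returned value plays the role of `ttl_seconds` in `HeartbeatResponse`.

```rust
use std::time::{Duration, Instant};

// Defaults restated from the session-state list (5 minutes idle, 24 hours max).
const IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
const MAX_DURATION: Duration = Duration::from_secs(24 * 60 * 60);

/// Hypothetical server-side record for one active session.
#[derive(Debug, Clone)]
pub struct SessionState {
    created_at: Instant,
    last_seen: Instant,
}

impl SessionState {
    pub fn new(now: Instant) -> Self {
        Self { created_at: now, last_seen: now }
    }

    /// A session is live while it is inside both the idle window and the
    /// maximum-duration window.
    pub fn is_live(&self, now: Instant) -> bool {
        now.duration_since(self.last_seen) < IDLE_TIMEOUT
            && now.duration_since(self.created_at) < MAX_DURATION
    }

    /// Heartbeat: refresh the idle timer and return the remaining TTL in
    /// seconds, or `None` if the session has already expired.
    pub fn heartbeat(&mut self, now: Instant) -> Option<u64> {
        if !self.is_live(now) {
            return None;
        }
        self.last_seen = now;
        let until_max = MAX_DURATION.saturating_sub(now.duration_since(self.created_at));
        Some(IDLE_TIMEOUT.min(until_max).as_secs())
    }
}
```

Passing `now` in explicitly, rather than reading the clock inside the methods, keeps the expiry logic deterministic and easy to test.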
Layer 2: Queue Service
Purpose: Kafka-style message queue operations
// proto/prism/queue/v1/queue_service.proto
syntax = "proto3";

package prism.queue.v1;

import "google/protobuf/timestamp.proto";

service QueueService {
  // Publish message to topic
  rpc Publish(PublishRequest) returns (PublishResponse);

  // Subscribe to topic (server streaming)
  rpc Subscribe(SubscribeRequest) returns (stream Message);

  // Acknowledge message processing
  rpc Acknowledge(AcknowledgeRequest) returns (AcknowledgeResponse);

  // Commit offset
  rpc Commit(CommitRequest) returns (CommitResponse);

  // Seek to offset
  rpc Seek(SeekRequest) returns (SeekResponse);
}

message PublishRequest {
  string session_token = 1;
  string topic = 2;
  bytes payload = 3;
  map<string, string> headers = 4;
  optional string partition_key = 5;
}

message PublishResponse {
  string message_id = 1;
  int64 offset = 2;
  int32 partition = 3;
}

message SubscribeRequest {
  string session_token = 1;
  string topic = 2;
  string consumer_group = 3;
  optional int64 start_offset = 4;
}

message Message {
  string message_id = 1;
  bytes payload = 2;
  map<string, string> headers = 3;
  int64 offset = 4;
  int32 partition = 5;
  google.protobuf.Timestamp timestamp = 6;
}

message AcknowledgeRequest {
  string session_token = 1;
  string message_id = 2;
}

message AcknowledgeResponse {
  bool success = 1;
}

message CommitRequest {
  string session_token = 1;
  string topic = 2;
  int32 partition = 3;
  int64 offset = 4;
}
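
message CommitResponse {
  bool success = 1;
}

// Assumed shapes, modeled on CommitRequest/CommitResponse: Seek is declared
// in the service above, but this draft does not define its messages.
message SeekRequest {
  string session_token = 1;
  string topic = 2;
  int32 partition = 3;
  int64 offset = 4;
}

message SeekResponse {
  bool success = 1;
}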
Backend Mapping:
- Kafka: Direct mapping to topics/partitions/offsets
- NATS JetStream: Stream/consumer/sequence
- Postgres: Table-based queue with SKIP LOCKED
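One way to picture backend polymorphism for this layer is a small trait that every backend implements. The sketch below is hypothetical throughout: `QueueBackend`, `MemoryQueue`, and `publish_all` are illustrative names, and the in-memory log only mimics the topic/partition/offset model that a Kafka, JetStream, or Postgres adapter would map onto.

```rust
use std::collections::HashMap;

/// Hypothetical backend abstraction for the queue layer (not a Prism API).
pub trait QueueBackend {
    /// Append a payload and return the (partition, offset) it was stored at.
    fn publish(&mut self, topic: &str, partition_key: Option<&str>, payload: Vec<u8>) -> (i32, i64);
    /// Read up to `max` payloads from a partition, starting at `offset`.
    fn fetch(&self, topic: &str, partition: i32, offset: i64, max: usize) -> Vec<Vec<u8>>;
}

/// In-memory stand-in for a real backend.
pub struct MemoryQueue {
    partitions: usize,
    logs: HashMap<(String, i32), Vec<Vec<u8>>>,
}

impl MemoryQueue {
    pub fn new(partitions: usize) -> Self {
        assert!(partitions > 0, "at least one partition is required");
        Self { partitions, logs: HashMap::new() }
    }
}

impl QueueBackend for MemoryQueue {
    fn publish(&mut self, topic: &str, partition_key: Option<&str>, payload: Vec<u8>) -> (i32, i64) {
        // Byte-sum hash: equal keys always land in the same partition.
        // Messages without a key go to partition 0.
        let partition = match partition_key {
            Some(key) => (key.bytes().map(|b| b as usize).sum::<usize>() % self.partitions) as i32,
            None => 0,
        };
        let log = self.logs.entry((topic.to_string(), partition)).or_default();
        log.push(payload);
        (partition, (log.len() - 1) as i64)
    }

    fn fetch(&self, topic: &str, partition: i32, offset: i64, max: usize) -> Vec<Vec<u8>> {
        let start = offset.max(0) as usize;
        self.logs
            .get(&(topic.to_string(), partition))
            .map(|log| log.iter().skip(start).take(max).cloned().collect())
            .unwrap_or_default()
    }
}

/// Layer code is written against the trait, so swapping the backend does not
/// change callers. Returns the offset assigned to each payload.
pub fn publish_all<B: QueueBackend>(backend: &mut B, topic: &str, payloads: &[&str]) -> Vec<i64> {
    payloads
        .iter()
        .map(|p| backend.publish(topic, None, p.as_bytes().to_vec()).1)
        .collect()
}
```

Because `publish_all` is generic over `QueueBackend`, the same call site would work with any adapter that can report a partition and an offset for each published message.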
Layer 3: PubSub Service
Purpose: NATS-style publish-subscribe with topics and wildcards
// proto/prism/pubsub/v1/pubsub_service.proto
syntax = "proto3";

package prism.pubsub.v1;

import "google/protobuf/timestamp.proto";

service PubSubService {
  // Publish event to topic
  rpc Publish(PublishRequest) returns (PublishResponse);

  // Subscribe to topic pattern (server streaming)
  rpc Subscribe(SubscribeRequest) returns (stream Event);

  // Unsubscribe from topic
  rpc Unsubscribe(UnsubscribeRequest) returns (UnsubscribeResponse);
}

message PublishRequest {
  string session_token = 1;
  string topic = 2;  // e.g., "events.user.created"
  bytes payload = 3;
  map<string, string> metadata = 4;
}

message PublishResponse {
  string event_id = 1;
  google.protobuf.Timestamp published_at = 2;
}

message SubscribeRequest {
  string session_token = 1;
  string topic_pattern = 2;  // e.g., "events.user.*"
  optional string queue_group = 3;  // For load balancing
}

message Event {
  string event_id = 1;
  string topic = 2;
  bytes payload = 3;
  map<string, string> metadata = 4;
  google.protobuf.Timestamp timestamp = 5;
}

message UnsubscribeRequest {
  string session_token = 1;
  string topic_pattern = 2;
}

message UnsubscribeResponse {
  bool success = 1;
}
Backend Mapping:
- NATS: Native subject-based routing with wildcards
- Kafka: Topic prefix matching
- Redis Pub/Sub: Channel pattern subscription
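For backends without native wildcard routing, a `topic_pattern` such as "events.user.*" has to be matched in the proxy. The function below is a minimal sketch of NATS-style matching, where tokens are separated by `.`, `*` matches exactly one token, and a trailing `>` matches one or more remaining tokens. The name `topic_matches` is hypothetical, and the sketch does not validate malformed patterns.

```rust
/// Returns true when `topic` matches `pattern` under NATS-style rules.
pub fn topic_matches(pattern: &str, topic: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut top = topic.split('.');
    loop {
        match (pat.next(), top.next()) {
            // `>` must be the last pattern token and needs at least one topic token.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one topic token.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(t)) if p == t => continue,
            // Both sides exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal tokens.
            _ => return false,
        }
    }
}
```

With this function, "events.user.*" matches "events.user.created" but not "events.user.created.v2", while "events.>" matches both.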
Layer 4: Reader Service
Purpose: Database-style paged reading and queries
// proto/prism/reader/v1/reader_service.proto
syntax = "proto3";

package prism.reader.v1;

import "google/protobuf/struct.proto";

service ReaderService {
  // Read pages of data (server streaming)
  rpc Read(ReadRequest) returns (stream Page);

  // Query with filters (server streaming)
  rpc Query(QueryRequest) returns (stream Row);

  // Count matching records
  rpc Count(CountRequest) returns (CountResponse);
}

message ReadRequest {
  string session_token = 1;
  string collection = 2;
  int32 page_size = 3;
  optional string cursor = 4;  // Continuation token
  repeated string fields = 5;  // Projection
}

message Page {
  repeated Row rows = 1;
  optional string next_cursor = 2;
  bool has_more = 3;
}

message QueryRequest {
  string session_token = 1;
  string collection = 2;
  Filter filter = 3;
  repeated Sort sort = 4;
  int32 page_size = 5;
  optional string cursor = 6;
}

message Filter {
  oneof filter {
    FieldFilter field = 1;
    CompositeFilter composite = 2;
  }
}

message FieldFilter {
  string field = 1;
  Operator op = 2;
  google.protobuf.Value value = 3;

  enum Operator {
    OPERATOR_UNSPECIFIED = 0;
    OPERATOR_EQUALS = 1;
    OPERATOR_NOT_EQUALS = 2;
    OPERATOR_GREATER_THAN = 3;
    OPERATOR_LESS_THAN = 4;
    OPERATOR_IN = 5;
    OPERATOR_CONTAINS = 6;
  }
}

message CompositeFilter {
  LogicalOperator op = 1;
  repeated Filter filters = 2;

  enum LogicalOperator {
    LOGICAL_OPERATOR_UNSPECIFIED = 0;
    LOGICAL_OPERATOR_AND = 1;
    LOGICAL_OPERATOR_OR = 2;
  }
}

message Sort {
  string field = 1;
  Direction direction = 2;

  enum Direction {
    DIRECTION_UNSPECIFIED = 0;
    DIRECTION_ASC = 1;
    DIRECTION_DESC = 2;
  }
}

message Row {
  map<string, google.protobuf.Value> fields = 1;
}

message CountRequest {
  string session_token = 1;
  string collection = 2;
  optional Filter filter = 3;
}

message CountResponse {
  int64 count = 1;
}
Backend Mapping:
- Postgres: SQL queries with LIMIT/OFFSET
- SQLite: Same as Postgres
- DynamoDB: Query with pagination tokens
- Neptune: Gremlin queries with pagination
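The `cursor`, `next_cursor`, and `has_more` fields can be illustrated with a small in-memory reader. This is a sketch under assumptions, not the Prism implementation: `read_page` is a hypothetical helper, rows are plain strings rather than `Row` messages, and the cursor is simply the next row index encoded as a decimal string (comparable to an OFFSET), whereas a real backend would normally hand out an opaque token.

```rust
/// Mirrors the `Page` message: a slice of rows plus a continuation token.
#[derive(Debug, PartialEq)]
pub struct Page {
    pub rows: Vec<String>,
    pub next_cursor: Option<String>,
    pub has_more: bool,
}

/// Reads one page from an in-memory collection.
/// Assumption: the cursor is the index of the next row as a decimal string.
pub fn read_page(rows: &[String], page_size: usize, cursor: Option<&str>) -> Result<Page, String> {
    if page_size == 0 {
        return Err("page_size must be positive".to_string());
    }
    // No cursor means "start from the beginning".
    let start = match cursor {
        Some(c) => c
            .parse::<usize>()
            .map_err(|_| format!("invalid cursor: {}", c))?,
        None => 0,
    };
    // Clamp both ends so a stale cursor past the end yields an empty page.
    let start = start.min(rows.len());
    let end = start.saturating_add(page_size).min(rows.len());
    let has_more = end < rows.len();
    Ok(Page {
        rows: rows[start..end].to_vec(),
        // Only hand out a continuation token when there is something left.
        next_cursor: has_more.then(|| end.to_string()),
        has_more,
    })
}
```

A client would call `read_page` repeatedly, feeding each `next_cursor` back in as `cursor` until `has_more` is false.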
Layer 5: Transact Service
Purpose: Transactional writes across two tables (inbox/outbox pattern)
// proto/prism/transact/v1/transact_service.proto
syntax = "proto3";

package prism.transact.v1;

import "google/protobuf/struct.proto";

service TransactService {
  // Single transactional write
  rpc Write(WriteRequest) returns (WriteResponse);

  // Streaming transaction
  rpc Transaction(stream TransactRequest) returns (stream TransactResponse);
}

message WriteRequest {
  string session_token = 1;

  // Data table write
  DataWrite data = 2;

  // Mailbox table write
  MailboxWrite mailbox = 3;

  // Transaction options
  TransactionOptions options = 4;
}

message DataWrite {
  string table = 1;
  map<string, google.protobuf.Value> record = 2;
  WriteMode mode = 3;

  enum WriteMode {
    WRITE_MODE_UNSPECIFIED = 0;
    WRITE_MODE_INSERT = 1;
    WRITE_MODE_UPDATE = 2;
    WRITE_MODE_UPSERT = 3;
  }
}

message MailboxWrite {
  string mailbox_id = 1;
  bytes message = 2;
  map<string, string> metadata = 3;
}

message TransactionOptions {
  IsolationLevel isolation = 1;
  int32 timeout_ms = 2;

  enum IsolationLevel {
    ISOLATION_LEVEL_UNSPECIFIED = 0;
    ISOLATION_LEVEL_READ_COMMITTED = 1;
    ISOLATION_LEVEL_SERIALIZABLE = 2;
  }
}

message WriteResponse {
  string transaction_id = 1;
  bool committed = 2;
  DataWriteResult data_result = 3;
  MailboxWriteResult mailbox_result = 4;
}

message DataWriteResult {
  int64 rows_affected = 1;
  map<string, google.protobuf.Value> generated_values = 2;
}

message MailboxWriteResult {
  string message_id = 1;
  int64 sequence = 2;
}

// For streaming transactions
message TransactRequest {
  oneof request {
    BeginTransaction begin = 1;
    WriteRequest write = 2;
    CommitTransaction commit = 3;
    RollbackTransaction rollback = 4;
  }
}

message BeginTransaction {
  string session_token = 1;
  TransactionOptions options = 2;
}

message CommitTransaction {}

message RollbackTransaction {}

message TransactResponse {
  oneof response {
    TransactionStarted started = 1;
    WriteResponse write_result = 2;
    TransactionCommitted committed = 3;
    TransactionRolledBack rolled_back = 4;
  }
}

message TransactionStarted {
  string transaction_id = 1;
}

message TransactionCommitted {
  bool success = 1;
}

message TransactionRolledBack {
  string reason = 1;
}
Backend Mapping:
- Postgres: Native transactions with two-table writes
- SQLite: Same as Postgres
- DynamoDB: TransactWriteItems with two items
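The contract behind `Write` is that the data-table write and the mailbox write succeed or fail together. The following in-memory sketch illustrates that contract only. `Store` and `transact_write` are hypothetical names, records are plain strings, and atomicity comes from validating before mutating, whereas a real backend would rely on a database transaction or `TransactWriteItems`.

```rust
use std::collections::HashMap;

/// In-memory stand-in for a data table plus a mailbox (outbox) table.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Store {
    pub data: HashMap<String, String>, // primary key -> record
    pub mailbox: Vec<Vec<u8>>,         // append-only messages
}

/// Mirrors the `WriteMode` enum from the proto definition.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WriteMode {
    Insert,
    Update,
    Upsert,
}

/// Applies the data write and the mailbox write as a unit: both take effect
/// or neither does. Returns the mailbox sequence number on success.
pub fn transact_write(
    store: &mut Store,
    key: &str,
    record: &str,
    mode: WriteMode,
    message: &[u8],
) -> Result<i64, String> {
    // Validate first so nothing is mutated when the write must fail.
    let exists = store.data.contains_key(key);
    match (mode, exists) {
        (WriteMode::Insert, true) => return Err(format!("key already exists: {}", key)),
        (WriteMode::Update, false) => return Err(format!("key not found: {}", key)),
        _ => {}
    }
    // Past this point neither step can fail, so the pair is applied together.
    store.data.insert(key.to_string(), record.to_string());
    store.mailbox.push(message.to_vec());
    Ok(store.mailbox.len() as i64 - 1)
}
```

A rejected insert or update leaves both the data map and the mailbox untouched, which is the property the outbox pattern depends on.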
Cross-Layer Concepts
Session Token Propagation: All layers require a session token, carried either in gRPC metadata or in the request message:
// Server: extract session from request
async fn validate_session(&self, token: &str) -> Result<Session, Status> {
    self.session_store
        .get(token)
        .await
        .ok_or_else(|| Status::unauthenticated("invalid session token"))
}

// All service methods start with validation
async fn publish(
    &self,
    req: Request<PublishRequest>,
) -> Result<Response<PublishResponse>, Status> {
    let req = req.into_inner();
    let session = self.validate_session(&req.session_token).await?;

    // Use session for authorization, auditing, routing
    // ...
}
Auditing: Session layer provides audit hooks:
use chrono::{DateTime, Utc};

struct AuditLog {
    session_id: String,
    operation: String,
    resource: String,
    timestamp: DateTime<Utc>, // matches the type returned by Utc::now()
    success: bool,
}

// Logged for all operations
self.audit_logger.log(AuditLog {
    session_id: session.id.clone(),
    operation: "queue.publish".to_string(),
    resource: format!("topic:{}", req.topic),
    timestamp: Utc::now(),
    success: true,
});
Alternatives Considered
- Monolithic service with all operations
  - Pros: Simple, single service
  - Cons: Tight coupling, hard to evolve independently
  - Rejected: Violates separation of concerns
- Backend-specific services (KafkaService, PostgresService)
  - Pros: Clear backend mapping
  - Cons: Leaks implementation, prevents backend swapping
  - Rejected: Violates abstraction goal
- Single generic DataService
  - Pros: Ultimate flexibility
  - Cons: No type safety, unclear semantics
  - Rejected: Too generic, loses use-case clarity
Consequences
Positive
- Clear separation: Each layer has distinct purpose
- Progressive disclosure: Clients use only what they need
- Independent evolution: Layers evolve independently
- Backend polymorphism: Multiple backends per layer
- Type safety: Protobuf enforces correct usage
- Session foundation: All operations audited and authorized
Negative
- Multiple services: More gRPC services to manage
- Session overhead: All requests must validate session
- Complexity: More interfaces to learn
Neutral
- Service discovery: Clients must know which service to use
- Version management: Each layer versions independently
References
- ADR-022: Dynamic Client Configuration
- ADR-023: gRPC-First Interface Design
- Inbox/Outbox Pattern
- Netflix Data Gateway Architecture
Revision History
- 2025-10-07: Initial draft and acceptance