RFC-032: Minimal Prism Schema Registry for Local Testing
Abstract
This RFC defines a minimal Prism Schema Registry as a local stand-in for testing and acceptance tests. It provides a lightweight implementation of the schema registry interface (RFC-030) that:
- Runs locally without external dependencies (no Confluent, no Apicurio)
- Implements core schema registry operations (register, get, list, validate)
- Serves as baseline for acceptance tests across all backend plugins
- Provides interface compatibility with Confluent and AWS Glue schema registries
- Enables fast developer iteration (<100ms startup, in-memory storage)
This is not a production schema registry - it's a testing tool for local development and CI/CD.
Motivation
The Problem: External Dependencies in Tests
Current Testing Challenges:
# Test requires running Confluent Schema Registry (JVM, 1GB+ memory, 30s startup)
docker-compose up schema-registry kafka zookeeper # 3 services for one test!
# Integration test:
pytest test_schema_validation.py --schema-registry http://localhost:8081
# ❌ Flaky: Schema registry not ready yet
# ❌ Slow: 30s startup + 5s per test
# ❌ Heavy: 1GB+ memory for registry alone
Problems:
- External Dependency: Tests can't run without Confluent/Apicurio
- Slow Startup: 30+ seconds before tests can run
- Resource Heavy: 1GB+ memory for JVM-based registry
- Flaky Tests: Race conditions during startup
- CI/CD Cost: Every test run spawns heavy containers
What We Need: Minimal Local Registry
# Ideal test experience:
prism-schema-registry --port 8081 & # <100ms startup, <10MB memory
pytest test_schema_validation.py # Tests run immediately
Requirements:
- ✅ In-memory storage (no persistence needed for tests)
- ✅ Rust-based (fast, small footprint)
- ✅ REST + gRPC APIs (compatible with Confluent clients)
- ✅ Schema validation (protobuf, JSON Schema)
- ✅ Compatibility checking (backward, forward, full)
- ❌ NOT for production (no HA, no persistence, no auth)
Goals
- Fast Local Testing: <100ms startup, in-memory storage
- Acceptance Test Baseline: All plugin tests use same registry
- Interface Compatibility: Drop-in replacement for Confluent Schema Registry REST API
- Schema Format Support: Protobuf, JSON Schema, Avro
- Validation Coverage: Backward/forward/full compatibility checks
- Developer Experience: Single binary, no external dependencies
Non-Goals
- Production Deployment: Use Confluent/Apicurio for production
- Persistence: In-memory only (tests recreate schemas)
- High Availability: Single instance, no clustering
- Authentication: No auth/authz (local testing only)
- Multi-Tenancy: Single global namespace
Proposed Solution: Minimal Prism Schema Registry
Core Architecture
┌────────────────────────────────────────────────────────────┐
│ prism-schema-registry (Rust binary, <10MB) │
├────────────────────────────────────────────────────────────┤
│ │
│ REST API (Confluent-compatible) │
│ ├─ POST /subjects/:subject/versions │
│ ├─ GET /subjects/:subject/versions/:version │
│ ├─ GET /subjects/:subject/versions │
│ ├─ POST /compatibility/subjects/:subject/versions/:ver │
│ └─ DELETE /subjects/:subject/versions/:version │
│ │
│ gRPC API (Prism-native) │
│ ├─ RegisterSchema() │
│ ├─ GetSchema() │
│ ├─ ListSchemas() │
│ └─ CheckCompatibility() │
│ │
│ In-Memory Storage │
│ └─ HashMap<SubjectVersion, Schema> │
│ │
│ Schema Validators │
│ ├─ Protobuf (via prost) │
│ ├─ JSON Schema (via jsonschema crate) │
│ └─ Avro (via apache-avro) │
│ │
│ Compatibility Checker │
│ └─ Backward/Forward/Full validation logic │
│ │
└────────────────────────────────────────────────────────────┘
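The in-memory storage layer from the diagram can be sketched as a plain `HashMap` keyed by `(subject, version)`. The names here (`Registry`, `register`, `versions`) are illustrative, not the actual implementation; the point is the per-subject monotonic versioning and global schema ID the REST API exposes.

```rust
use std::collections::HashMap;

// Simplified stand-in for the HashMap<SubjectVersion, Schema> in the diagram.
#[derive(Clone, Debug, PartialEq)]
struct Schema {
    id: u32,           // global schema id
    version: u32,      // per-subject version
    schema_type: String,
    content: String,
}

#[derive(Default)]
struct Registry {
    store: HashMap<(String, u32), Schema>,
    next_id: u32,
}

impl Registry {
    // Register a new version under `subject`; versions are monotonic per subject,
    // ids are monotonic across the whole registry.
    fn register(&mut self, subject: &str, schema_type: &str, content: &str) -> (u32, u32) {
        let version = self
            .store
            .keys()
            .filter(|(s, _)| s == subject)
            .map(|(_, v)| *v)
            .max()
            .unwrap_or(0)
            + 1;
        self.next_id += 1;
        let schema = Schema {
            id: self.next_id,
            version,
            schema_type: schema_type.to_string(),
            content: content.to_string(),
        };
        self.store.insert((subject.to_string(), version), schema);
        (self.next_id, version)
    }

    fn get(&self, subject: &str, version: u32) -> Option<&Schema> {
        self.store.get(&(subject.to_string(), version))
    }

    // Sorted version list, as returned by GET /subjects/:subject/versions.
    fn versions(&self, subject: &str) -> Vec<u32> {
        let mut v: Vec<u32> = self
            .store
            .keys()
            .filter(|(s, _)| s == subject)
            .map(|(_, ver)| *ver)
            .collect();
        v.sort();
        v
    }
}
```

Because nothing is persisted, a restart wipes the map — which is exactly the behavior tests want.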
Confluent Schema Registry REST API Compatibility
Why the Confluent API: it is the most widely adopted registry API, with a rich client library ecosystem
Core Endpoints (Subset):
# Register new schema version
POST /subjects/{subject}/versions
{
"schema": "{...protobuf IDL...}",
"schemaType": "PROTOBUF"
}
→ 200 OK
{
"id": 1,
"version": 1
}
# Get schema by version
GET /subjects/{subject}/versions/{version}
→ 200 OK
{
"id": 1,
"version": 1,
"schema": "{...protobuf IDL...}",
"schemaType": "PROTOBUF"
}
# List all versions for subject
GET /subjects/{subject}/versions
→ 200 OK
[1, 2, 3]
# Check compatibility
POST /compatibility/subjects/{subject}/versions/{version}
{
"schema": "{...new schema...}",
"schemaType": "PROTOBUF"
}
→ 200 OK
{
"is_compatible": true
}
# Delete schema version
DELETE /subjects/{subject}/versions/{version}
→ 200 OK
1
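The endpoint shapes above can be sketched as a small route parser. This is illustrative only (`Route` and `parse_route` are hypothetical names; the real server would use an HTTP framework's router), but the path structure matches the Confluent subset.

```rust
// Hypothetical routing for the Confluent-compatible subset of endpoints.
#[derive(Debug, PartialEq)]
enum Route {
    RegisterVersion { subject: String },          // POST /subjects/:subject/versions
    GetVersion { subject: String, version: u32 }, // GET  /subjects/:subject/versions/:version
    ListVersions { subject: String },             // GET  /subjects/:subject/versions
}

fn parse_route(method: &str, path: &str) -> Option<Route> {
    let parts: Vec<&str> = path.trim_matches('/').split('/').collect();
    match (method, parts.as_slice()) {
        ("POST", ["subjects", subject, "versions"]) => Some(Route::RegisterVersion {
            subject: subject.to_string(),
        }),
        ("GET", ["subjects", subject, "versions"]) => Some(Route::ListVersions {
            subject: subject.to_string(),
        }),
        ("GET", ["subjects", subject, "versions", version]) => {
            // Non-numeric versions are rejected rather than panicking.
            version.parse().ok().map(|version| Route::GetVersion {
                subject: subject.to_string(),
                version,
            })
        }
        _ => None,
    }
}
```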
Not Implemented (Out of Scope for Minimal Registry):
- /config endpoints (global/subject compatibility settings)
- /mode endpoints (READONLY, READWRITE modes)
- /schemas/ids/:id (lookup by global schema ID)
- Advanced compatibility modes (TRANSITIVE, NONE_TRANSITIVE)
Schema Format Support
Protobuf (Primary):
use prost_reflect::{Cardinality, DescriptorPool};

fn validate_protobuf(schema: &[u8]) -> Result<(), ValidationError> {
    // `schema` is a serialized FileDescriptorSet (compiled with protoc);
    // prost-reflect does not parse textual .proto IDL.
    let pool = DescriptorPool::decode(schema)?;

    for msg in pool.all_messages() {
        for field in msg.fields() {
            // proto2 `required` fields break backward compatibility once removed,
            // so reject them up front (proto3 has no required fields).
            if field.cardinality() == Cardinality::Required {
                return Err(ValidationError::RequiredField(field.name().to_string()));
            }
        }
    }
    Ok(())
}
JSON Schema (Secondary):
use jsonschema::JSONSchema;

fn validate_json_schema(schema: &str) -> Result<(), ValidationError> {
    let schema_json: serde_json::Value = serde_json::from_str(schema)?;
    // Compilation is the validation step: it fails on malformed schemas,
    // and the compiled schema itself is not needed here.
    let _compiled = JSONSchema::compile(&schema_json)?;
    Ok(())
}
Avro (Tertiary - Basic Support):
use apache_avro::Schema as AvroSchema;

fn validate_avro(schema: &str) -> Result<(), ValidationError> {
    // parse_str rejects malformed Avro schema JSON; the parsed schema is unused here.
    let _avro_schema = AvroSchema::parse_str(schema)?;
    Ok(())
}
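The registry selects a validator from the `schemaType` field in the REST payloads. A minimal sketch of that dispatch, with the uppercase format names Confluent clients send; the `SchemaType` and `ValidationError` names are illustrative, and the per-format validators are elided since they are shown above.

```rust
// Hypothetical schemaType parsing; the real validators (prost-reflect,
// jsonschema, apache-avro) are dispatched based on this enum.
#[derive(Debug, PartialEq)]
enum SchemaType {
    Protobuf,
    JsonSchema,
    Avro,
}

#[derive(Debug, PartialEq)]
enum ValidationError {
    UnknownType(String),
}

fn parse_schema_type(s: &str) -> Result<SchemaType, ValidationError> {
    // Confluent uses the uppercase names "PROTOBUF", "JSON", and "AVRO".
    match s {
        "PROTOBUF" => Ok(SchemaType::Protobuf),
        "JSON" => Ok(SchemaType::JsonSchema),
        "AVRO" => Ok(SchemaType::Avro),
        other => Err(ValidationError::UnknownType(other.to_string())),
    }
}
```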
Compatibility Checking
Backward Compatibility (Most Common):
fn check_backward_compatible(old_schema: &Schema, new_schema: &Schema) -> CompatibilityResult {
    match (old_schema.schema_type, new_schema.schema_type) {
        (SchemaType::Protobuf, SchemaType::Protobuf) => {
            check_protobuf_backward(old_schema, new_schema)
        }
        _ => CompatibilityResult::Incompatible("Schema type mismatch".to_string()),
    }
}
fn check_protobuf_backward(old: &Schema, new: &Schema) -> CompatibilityResult {
    // Parse failures are reported as incompatibility rather than propagated
    // with `?`, since this function does not return a Result.
    let (old_desc, new_desc) = match (parse_protobuf(&old.content), parse_protobuf(&new.content)) {
        (Ok(o), Ok(n)) => (o, n),
        _ => return CompatibilityResult::Incompatible("Unparseable schema".to_string()),
    };

    // Backward compatibility: the new schema must be able to read old data.
    // 1. A field present in both versions must keep its type.
    // 2. A required field may not be removed.
    // 3. Adding optional fields is allowed.
    for old_field in old_desc.fields() {
        if let Some(new_field) = new_desc.get_field(old_field.number()) {
            // Field exists in both versions - check type compatibility.
            if old_field.type_name() != new_field.type_name() {
                return CompatibilityResult::Incompatible(format!(
                    "Field {} changed type",
                    old_field.name()
                ));
            }
        } else if old_field.is_required() {
            // Field removed - only allowed if it was optional.
            return CompatibilityResult::Incompatible(format!(
                "Required field {} removed",
                old_field.name()
            ));
        }
    }
    CompatibilityResult::Compatible
}
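The field-by-field rules above can be demonstrated with a self-contained sketch that uses simplified field descriptors in place of real protobuf descriptors; the `Field` and `Compat` types here are illustrative only.

```rust
// Simplified stand-in for a protobuf field descriptor.
#[derive(Clone)]
struct Field {
    number: u32,
    name: String,
    type_name: String,
    required: bool,
}

#[derive(Debug, PartialEq)]
enum Compat {
    Compatible,
    Incompatible(String),
}

// Backward compatibility over flat field lists: the new schema must read old data.
fn check_backward(old: &[Field], new: &[Field]) -> Compat {
    for old_field in old {
        match new.iter().find(|f| f.number == old_field.number) {
            Some(new_field) => {
                // A field present in both versions must keep its type.
                if old_field.type_name != new_field.type_name {
                    return Compat::Incompatible(format!(
                        "Field {} changed type",
                        old_field.name
                    ));
                }
            }
            None => {
                // A required field may not be removed; optional removals pass.
                if old_field.required {
                    return Compat::Incompatible(format!(
                        "Required field {} removed",
                        old_field.name
                    ));
                }
            }
        }
    }
    // Fields appearing only in `new` are additions, which are backward compatible.
    Compat::Compatible
}
```

Forward compatibility is the mirror image (the old schema must read new data), and full compatibility requires both directions to pass.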