RFC-055: Graph Pattern - High-Level Graph Database API
Status: Draft
Author: Platform Team
Created: 2025-11-14
Updated: 2025-11-14
Abstract
This RFC defines a Graph Pattern that provides a high-level, backend-agnostic API for graph database operations. Applications can model and query highly connected data (social networks, knowledge graphs, recommendation systems, fraud detection) through intuitive graph operations without writing backend-specific query languages.
The pattern abstracts common graph operations (vertex/edge CRUD, traversals, pathfinding, algorithms) and can be backed by multiple graph databases (AWS Neptune, Neo4j, JanusGraph) through Prism's layered architecture.
Key Innovation: Same client API works with property graphs (Gremlin), RDF graphs (SPARQL), or native graph databases (Cypher), allowing backend migration without application changes.
Motivation
Problem Statement
Modern applications need graph capabilities for highly connected data, but implementing graph operations is complex:
Current Challenges:
- Query Language Lock-in: Learning Gremlin, Cypher, or SPARQL specific to each backend
- Backend-Specific Code: Direct Neptune/Neo4j SDK usage couples apps to infrastructure
- No Type Safety: Query strings lack compile-time validation
- Manual Optimization: Applications must implement caching, batching, connection pooling
- Complex Operations: Implementing graph algorithms (PageRank, shortest path) from scratch
- Testing Difficulty: Mocking graph queries requires extensive setup
Example Without Prism:
// Application tightly coupled to Gremlin
query := "g.V('user:alice').out('FOLLOWS').out('FOLLOWS').dedup().limit(10)"
result := gremlinClient.Submit(query)
// Must parse Gremlin-specific result format
vertices := parseGremlinResults(result)
Problems:
- Application must know Gremlin syntax
- Switching to Neo4j (Cypher) requires rewriting queries
- No type safety or validation
- Manual result parsing
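For contrast, a minimal sketch of the same friends-of-friends query through the pattern's typed Go client. The package paths mirror the Go client examples later in this RFC and are illustrative, not a published SDK:
import (
    "context"
    "fmt"

    "github.com/prism/client-go" // illustrative module path, provides prism.NewClient
    "github.com/prism/client-go/graph"
)

func friendsOfFriends(ctx context.Context, client *prism.Client) error {
    // Typed request instead of a Gremlin string; the proxy translates it to
    // Gremlin, Cypher, or SPARQL depending on the configured backend.
    result, err := client.Graph.Traverse(ctx, graph.TraverseRequest{
        StartVertexID: "user:alice",
        Steps: []graph.TraversalStep{
            {Direction: graph.DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
            {Direction: graph.DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
        },
        Limit: 10,
    })
    if err != nil {
        return err
    }
    for _, v := range result.Vertices {
        fmt.Println("friend of friend:", v.ID)
    }
    return nil
}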
Goals
- Backend Abstraction: Same API for Neptune, Neo4j, JanusGraph, ArangoDB
- Type Safety: Protobuf-based operations with compile-time guarantees
- Common Operations: Built-in support for traversals, pathfinding, algorithms
- Performance: Automatic query optimization, caching, batching
- Composability: Layer with other patterns (Cache + Graph, CDC + Graph)
- Testing: Local graph backend (embedded mode) for development
Use Cases
Use Case 1: Social Network Graph
Application Problem: Find friends-of-friends, mutual connections, community detection.
pattern: graph
use_case: social-network
# What you model
entities:
- type: User
properties: [name, email, joined_at]
- type: Post
properties: [content, created_at]
relationships:
- type: FOLLOWS
from: User
to: User
- type: LIKES
from: User
to: Post
Client Code:
# Find friends of friends (2 hops)
fof = client.graph.traverse(
start_vertex="user:alice",
steps=[
TraversalStep(direction=OUT, edge_label="FOLLOWS"),
TraversalStep(direction=OUT, edge_label="FOLLOWS")
],
limit=10
)
# Find shortest connection path
path = client.graph.shortest_path(
start="user:alice",
end="user:bob",
max_hops=6
)
Use Case 2: Knowledge Graph
Application Problem: Model entities and relationships, semantic search, inference.
pattern: graph
use_case: knowledge-graph
entities:
- type: Person
properties: [name, birth_date, nationality]
- type: Organization
properties: [name, founded, industry]
- type: Event
properties: [name, date, location]
relationships:
- type: WORKS_AT
from: Person
to: Organization
properties: [role, start_date]
- type: FOUNDED
from: Person
to: Organization
- type: ATTENDED
from: Person
to: Event
Client Code:
# Find all organizations founded by Alice
orgs = client.graph.traverse(
start_vertex="person:alice",
steps=[TraversalStep(direction=OUT, edge_label="FOUNDED")],
vertex_filter={"type": "Organization"}
)
# Complex query: People who worked at companies Alice founded
people = client.graph.traverse(
start_vertex="person:alice",
steps=[
TraversalStep(direction=OUT, edge_label="FOUNDED"),
TraversalStep(direction=IN, edge_label="WORKS_AT")
],
limit=50
)
Use Case 3: Recommendation System
Application Problem: Product recommendations based on user behavior and item similarity.
pattern: graph
use_case: recommendations
entities:
- type: User
properties: [user_id, preferences]
- type: Product
properties: [product_id, category, price]
relationships:
- type: PURCHASED
from: User
to: Product
properties: [timestamp, rating]
- type: VIEWED
from: User
to: Product
- type: SIMILAR_TO
from: Product
to: Product
properties: [similarity_score]
Client Code:
# Collaborative filtering: What did similar users buy?
recommendations = client.graph.traverse(
start_vertex="user:123",
steps=[
# Users who bought the same products
TraversalStep(direction=OUT, edge_label="PURCHASED"),
TraversalStep(direction=IN, edge_label="PURCHASED"),
# What else they bought
TraversalStep(direction=OUT, edge_label="PURCHASED")
],
filters={"exclude": ["user:123"]}, # Exclude original user
rank_by="purchase_count",
limit=20
)
Use Case 4: Fraud Detection Network
Application Problem: Detect fraud rings by analyzing transaction networks and account linkages.
pattern: graph
use_case: fraud-detection
entities:
- type: Account
properties: [account_id, created_at, status]
- type: Transaction
properties: [amount, timestamp]
- type: Device
properties: [device_id, ip_address]
relationships:
- type: TRANSFERRED_TO
from: Account
to: Account
properties: [amount, timestamp]
- type: LOGGED_IN_FROM
from: Account
to: Device
- type: LINKED_TO
from: Account
to: Account
properties: [link_type]
Client Code:
# Find potential fraud ring: accounts with circular transfers
rings = client.graph.detect_cycles(
start_vertex="account:suspicious-123",
edge_label="TRANSFERRED_TO",
max_depth=5,
min_cycle_length=3
)
# PageRank to identify hub accounts
hubs = client.graph.pagerank(
vertex_filter={"type": "Account"},
iterations=20,
damping_factor=0.85,
limit=10
)
Pattern Definition
Core Operations
The Graph pattern provides these primitive operations:
1. Vertex Operations
syntax = "proto3";
package prism.graph.v1;
// Create vertex
message CreateVertexRequest {
string id = 1; // Unique vertex ID
string label = 2; // Vertex type (e.g., "User", "Product")
map<string, Value> properties = 3; // Vertex properties
}
// Read vertex
message GetVertexRequest {
string id = 1;
bool include_edges = 2; // Include connected edges
}
message GetVertexResponse {
Vertex vertex = 1;
repeated Edge edges = 2; // If include_edges=true
}
// Update vertex
message UpdateVertexRequest {
string id = 1;
map<string, Value> properties = 2; // Properties to update
bool merge = 3; // Merge or replace
}
// Delete vertex
message DeleteVertexRequest {
string id = 1;
bool cascade = 2; // Delete connected edges
}
2. Edge Operations
// Create edge
message CreateEdgeRequest {
string id = 1; // Unique edge ID
string label = 2; // Edge type (e.g., "FOLLOWS", "PURCHASED")
string from_vertex_id = 3; // Source vertex
string to_vertex_id = 4; // Target vertex
map<string, Value> properties = 5; // Edge properties
}
// Get edge
message GetEdgeRequest {
string id = 1;
}
message GetEdgeResponse {
Edge edge = 1;
}
// Delete edge
message DeleteEdgeRequest {
string id = 1;
}
3. Traversal Operations
// Traverse graph
message TraverseRequest {
string start_vertex_id = 1;
repeated TraversalStep steps = 2;
optional int32 max_depth = 3;
optional int32 limit = 4;
optional VertexFilter vertex_filter = 5;
optional EdgeFilter edge_filter = 6;
}
message TraversalStep {
enum Direction {
OUT = 0; // Follow outgoing edges
IN = 1; // Follow incoming edges
BOTH = 2; // Follow both directions
}
Direction direction = 1;
repeated string edge_labels = 2; // Filter by edge type
optional int32 min_hops = 3; // Minimum hops
optional int32 max_hops = 4; // Maximum hops
}
message TraverseResponse {
repeated Vertex vertices = 1;
repeated Edge edges = 2;
repeated Path paths = 3;
}
message Path {
repeated string vertex_ids = 1;
repeated string edge_ids = 2;
double weight = 3; // Path weight (if weighted)
}
4. Pathfinding Operations
// Shortest path
message ShortestPathRequest {
string start_vertex_id = 1;
string end_vertex_id = 2;
optional int32 max_hops = 3;
optional string weight_property = 4; // Edge property for weighted paths
repeated string edge_labels = 5; // Allowed edge types
}
message ShortestPathResponse {
Path path = 1;
double total_weight = 2;
}
// All paths
message AllPathsRequest {
string start_vertex_id = 1;
string end_vertex_id = 2;
int32 max_hops = 3;
int32 limit = 4; // Max paths to return
}
message AllPathsResponse {
repeated Path paths = 1;
}
5. Graph Algorithms
// PageRank
message PageRankRequest {
optional VertexFilter vertex_filter = 1;
int32 iterations = 2; // Number of iterations
double damping_factor = 3; // Damping factor (0.85 typical)
int32 limit = 4; // Top N vertices
}
message PageRankResponse {
repeated VertexScore scores = 1;
}
message VertexScore {
string vertex_id = 1;
double score = 2;
}
// Community detection
message CommunityDetectionRequest {
string algorithm = 1; // "louvain", "label_propagation", "connected_components"
optional VertexFilter vertex_filter = 2;
}
message CommunityDetectionResponse {
repeated Community communities = 1;
}
message Community {
string community_id = 1;
repeated string vertex_ids = 2;
double modularity = 3;
}
// Cycle detection
message DetectCyclesRequest {
string start_vertex_id = 1;
repeated string edge_labels = 2;
int32 max_depth = 3;
int32 min_cycle_length = 4;
}
message DetectCyclesResponse {
repeated Path cycles = 1;
}
6. Batch Operations
// Batch create vertices
message BatchCreateVerticesRequest {
repeated CreateVertexRequest vertices = 1;
bool atomic = 2; // All or nothing
}
message BatchCreateVerticesResponse {
repeated Vertex vertices = 1;
repeated Error errors = 2; // Per-vertex errors
}
// Batch create edges
message BatchCreateEdgesRequest {
repeated CreateEdgeRequest edges = 1;
bool atomic = 2;
}
message BatchCreateEdgesResponse {
repeated Edge edges = 1;
repeated Error errors = 2;
}
Data Model
message Vertex {
string id = 1;
string label = 2;
map<string, Value> properties = 3;
Timestamp created_at = 4;
Timestamp updated_at = 5;
}
message Edge {
string id = 1;
string label = 2;
string from_vertex_id = 3;
string to_vertex_id = 4;
map<string, Value> properties = 5;
Timestamp created_at = 6;
}
message Value {
oneof value {
string string_value = 1;
int64 int_value = 2;
double double_value = 3;
bool bool_value = 4;
bytes bytes_value = 5;
ValueList list_value = 6; // proto3 does not allow repeated fields inside a oneof
ValueMap map_value = 7; // proto3 does not allow map fields inside a oneof
}
}
message ValueList {
repeated Value values = 1;
}
message ValueMap {
map<string, Value> values = 1;
}
message VertexFilter {
repeated string labels = 1; // Filter by vertex types
map<string, ValuePredicate> property_filters = 2;
}
message EdgeFilter {
repeated string labels = 1;
map<string, ValuePredicate> property_filters = 2;
}
message ValuePredicate {
enum Operator {
EQ = 0;
NE = 1;
LT = 2;
LE = 3;
GT = 4;
GE = 5;
IN = 6;
CONTAINS = 7;
STARTS_WITH = 8;
}
Operator operator = 1;
Value value = 2;
}
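As a usage sketch, here is how a Go client might construct a VertexFilter with a property predicate, assuming standard protoc-gen-go code generation for the messages above (the generated package path is hypothetical):
import (
    graphpb "github.com/prism/proto/gen/prism/graph/v1" // hypothetical generated package
)

// organizationFilter selects Organization vertices founded after 2020-01-01.
func organizationFilter() *graphpb.VertexFilter {
    return &graphpb.VertexFilter{
        Labels: []string{"Organization"},
        PropertyFilters: map[string]*graphpb.ValuePredicate{
            "founded": {
                Operator: graphpb.ValuePredicate_GT,
                Value: &graphpb.Value{
                    Value: &graphpb.Value_StringValue{StringValue: "2020-01-01"},
                },
            },
        },
    }
}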
Architecture
Three-Layer Model
┌─────────────────────────────────────────────────────────┐
│ Layer 3: Client API (Graph Pattern) │
│ Traverse | ShortestPath | PageRank | CreateVertex │
│ "I want to traverse 2 hops from Alice" │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Layer 2: Pattern Composition │
│ Cache | Query Optimizer | Result Materialization │
│ "Cache traversal results in Redis" │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Layer 1: Backend Execution │
│ Neptune (Gremlin) | Neo4j (Cypher) | JanusGraph │
│ "Translate to Gremlin and execute on Neptune" │
└─────────────────────────────────────────────────────────┘
Backend Slot Architecture
The Graph pattern can be backed by different graph databases:
| Backend | Query Language | Strengths | Use When |
|---|---|---|---|
| AWS Neptune | Gremlin, SPARQL | Fully managed, AWS integration | Production, AWS deployments |
| Neo4j | Cypher | Rich query language, large community | Self-hosted, advanced features |
| JanusGraph | Gremlin | Open source, multi-backend | Large-scale, custom storage |
| ArangoDB | AQL | Multi-model (graph + document) | Flexible data model |
| SQLite (local) | SQL emulation | Embedded, fast startup | Development, testing |
Configuration Examples
Example 1: High-Performance In-Memory Graph (MemStore)
namespaces:
- name: social-graph
pattern: graph
# Backend: MemStore (in-memory, lock-free)
backend:
type: memstore
max_vertices: 100_000_000 # 100M vertices
max_edges: 1_000_000_000 # 1B edges
# Fast snapshot loading
snapshot:
enabled: true
format: protobuf # Or jsonlines
source: s3://snapshots/social-graph-20251114.pb.gz
compression: gzip
parallel_workers: 100 # 100 parallel ingest workers
# Performance characteristics
performance:
traversal_timeout: 100ms
max_traversal_depth: 6
batch_size: 1000
Characteristics:
- 100M vertices, 1B edges in-memory
- Load 10GB snapshot in ~3 seconds (100 workers)
- 50 μs traversal latency
- Lock-free concurrent writes
- Zero coordination overhead
Use cases: Real-time fraud detection, social network analysis, recommendation engines
Example 2: Distributed Multi-Proxy Graph Cluster
# Configuration for each proxy in 10-node cluster
namespaces:
- name: social-graph-distributed
pattern: graph
backend:
type: memstore
max_vertices: 100_000_000 # 100M per proxy = 1B total
# Partitioning strategy
partitioning:
enabled: true
strategy: consistent_hash
num_partitions: 10
partition_key: vertex_id
# Snapshot loading (each proxy loads its partition)
snapshot:
enabled: true
source: s3://snapshots/social-graph-partition-${PARTITION_ID}.pb.gz
parallel_workers: 50
# Cross-proxy communication
cluster:
enabled: true
proxies:
- host: proxy-0.prism.local:50090
- host: proxy-1.prism.local:50090
- host: proxy-2.prism.local:50090
# ... (10 proxies total)
routing_strategy: consistent_hash
Characteristics:
- 10 proxy instances
- 1 billion vertices total (100M per proxy)
- Consistent hashing for vertex distribution
- Cross-proxy traversal support
- 100k traversals/sec aggregate throughput
Use cases: Large-scale social networks, knowledge graphs, global recommendation systems
Example 3: Snapshot-to-Production Pipeline
# CI/CD pipeline: Build graph snapshot → Load to production
build_snapshot:
# Step 1: Export from source databases
sources:
- type: postgres
query: "SELECT id, properties FROM users"
output: vertices-users.jsonl
- type: postgres
query: "SELECT from_id, to_id FROM friendships"
output: edges-friendships.jsonl
# Step 2: Merge and convert to protobuf
merge:
input: ["vertices-*.jsonl", "edges-*.jsonl"]
output: social-graph-snapshot.pb
compression: gzip
# Step 3: Upload to S3
upload:
destination: s3://snapshots/social-graph-${DATE}.pb.gz
deploy_to_production:
# Step 4: Prism proxy loads snapshot on startup
namespace: social-graph
backend: memstore
snapshot:
source: s3://snapshots/social-graph-${DATE}.pb.gz
parallel_workers: 100
validation:
check_vertex_count: true
check_edge_count: true
expected_vertices: 100000000
expected_edges: 1000000000
Characteristics:
- Automated snapshot generation from source databases
- Fast binary format (protobuf + gzip)
- Validation before serving traffic
- Blue-green deployment (load new snapshot, switch traffic)
Use cases: Daily graph updates, batch ETL pipelines, staging environments
Example 4: Development with Local MemStore
namespaces:
- name: social-graph-dev
pattern: graph
backend:
type: memstore
max_vertices: 10000 # Small dev dataset
# Seed data for testing
seed:
enabled: true
format: jsonlines
source: ./test/fixtures/social-graph-seed.jsonl
# Faster iteration during development
performance:
snapshot_loading_timeout: 5s
parallel_workers: 4
Characteristics:
- Small dataset for fast iteration
- Local file-based seed data
- Same API as production
- <1 second startup
Use cases: Unit tests, integration tests, local development
Example 5: Production-Grade with Distributed WAL + MemStore + Neptune
namespaces:
- name: social-graph-production
pattern: graph
# Layer 1: Distributed WAL (durability + ordering)
wal:
enabled: true
backend: kafka
topic: graph-wal
partitions: 100
replication_factor: 3
retention: 7d
acks: all # Wait for all replicas before ack
# Layer 2: MemStore (hot read path)
backend:
type: memstore
max_vertices: 100_000_000
max_edges: 1_000_000_000
# Layer 3: Neptune (cold storage, source of truth)
persistence:
enabled: true
type: neptune
cluster_endpoint: my-cluster.us-east-1.neptune.amazonaws.com
region: us-east-1
iam_auth: true
# Write path: WAL → MemStore + Neptune
write_strategy:
mode: wal_first # Append to WAL before ack
async_apply: true
memstore_workers: 10 # Workers applying WAL to MemStore
neptune_workers: 5 # Workers applying WAL to Neptune
batch_size: 1000
# Read path: MemStore → Neptune fallback
read_strategy:
mode: memstore_first
fallback_to_neptune: true
fallback_timeout: 100ms
# Recovery strategy
recovery:
source: wal # Replay WAL on startup
checkpoint_interval: 3600 # Checkpoint every hour
max_replay_duration: 600s # 10 min replay limit
Characteristics:
- Write latency: 5ms (Kafka WAL append)
- Read latency: 50 μs (MemStore) or 30ms (Neptune fallback)
- Durability: Guaranteed (WAL with acks=all)
- Consistency: Strong (ordered WAL)
- Recovery: Replay WAL from last checkpoint
- Throughput: 100k writes/sec, 1M reads/sec
Use cases: Production systems requiring durability, disaster recovery, audit trails, compliance
Pattern Composition
Graph + Cache Pattern
Use Case: Frequently accessed subgraphs (user profiles, friend lists)
namespaces:
- name: social-graph
pattern: graph
backend:
type: neptune
# Layer 2: Look-aside cache
patterns:
- type: cache
strategy: look-aside
backend: redis
ttl: 300
operations: [GetVertex, Traverse]
key_pattern: "graph:{operation}:{params_hash}"
Data Flow:
sequenceDiagram
participant App
participant Prism
participant Cache as Redis Cache
participant Neptune as Neptune Backend
App->>Prism: Traverse(user:alice, 2 hops)
Prism->>Cache: Check cache key
alt Cache Hit
Cache-->>Prism: Cached traversal result
Prism-->>App: Return vertices
else Cache Miss
Prism->>Neptune: Execute Gremlin traversal
Neptune-->>Prism: Vertices [Bob, Charlie, Dave]
Prism->>Cache: Store result (TTL=300s)
Prism-->>App: Return vertices
end
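The cache key in the configuration above ("graph:{operation}:{params_hash}") needs a deterministic parameter hash. A sketch of how the proxy could derive it, assuming the request is a protobuf message (the helper name is illustrative):
import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"

    "google.golang.org/protobuf/proto"
)

// cacheKey derives a deterministic look-aside cache key for a graph request,
// matching the "graph:{operation}:{params_hash}" pattern configured above.
func cacheKey(operation string, req proto.Message) (string, error) {
    // Deterministic marshaling keeps map ordering stable, so identical
    // requests always hash to the same key.
    b, err := proto.MarshalOptions{Deterministic: true}.Marshal(req)
    if err != nil {
        return "", err
    }
    sum := sha256.Sum256(b)
    return fmt.Sprintf("graph:%s:%s", operation, hex.EncodeToString(sum[:])), nil
}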
Graph + CDC Pattern
Use Case: Sync graph changes to search index (Elasticsearch)
namespaces:
- name: knowledge-graph
pattern: graph
backend:
type: neptune
# CDC to stream graph changes
cdc:
enabled: true
destination: kafka
topic: graph-changes
operations: [CreateVertex, UpdateVertex, DeleteVertex, CreateEdge, DeleteEdge]
# Consumer: Elasticsearch indexer
consumers:
- name: es-indexer
type: custom
backend: elasticsearch
index: knowledge-graph
mapping:
vertex: document
edge: relationship
Benefits:
- Keep search index synchronized with graph
- Full-text search on graph data
- No manual sync code
Graph + Query Optimizer Pattern
Use Case: Automatically optimize complex traversals
namespaces:
- name: social-graph
pattern: graph
backend:
type: neptune
patterns:
- type: query-optimizer
strategies:
- name: index-hints
enabled: true
- name: traversal-pruning
enabled: true
max_branches: 100
- name: result-streaming
enabled: true
batch_size: 50
Optimizations:
- Add index hints for vertex lookups
- Prune traversals with too many branches
- Stream large result sets instead of materializing
Client API Examples
⚠️ NOTE: Python examples below are for illustration only. Production client libraries will be in Go (primary) and Rust (secondary).
Python Client API (Conceptual)
from prism import Client, TraversalStep, Direction
client = Client(namespace="social-graph")
# Create vertices
alice = client.graph.create_vertex(
id="user:alice",
label="User",
properties={"name": "Alice", "joined": "2024-01-01"}
)
bob = client.graph.create_vertex(
id="user:bob",
label="User",
properties={"name": "Bob", "joined": "2024-02-01"}
)
# Create edge
client.graph.create_edge(
id="follow:1",
label="FOLLOWS",
from_vertex_id="user:alice",
to_vertex_id="user:bob",
properties={"since": "2024-03-01"}
)
# Traverse: Find friends of friends
fof = client.graph.traverse(
start_vertex_id="user:alice",
steps=[
TraversalStep(direction=Direction.OUT, edge_labels=["FOLLOWS"]),
TraversalStep(direction=Direction.OUT, edge_labels=["FOLLOWS"])
],
limit=10
)
print(f"Friends of friends: {[v.id for v in fof.vertices]}")
# Shortest path
path = client.graph.shortest_path(
start_vertex_id="user:alice",
end_vertex_id="user:charlie",
max_hops=6
)
print(f"Path length: {len(path.vertex_ids)}")
# PageRank
influential = client.graph.pagerank(
vertex_filter={"labels": ["User"]},
iterations=20,
damping_factor=0.85,
limit=10
)
for score in influential.scores:
print(f"{score.vertex_id}: {score.score}")
Go Client API
import "github.com/prism/client-go/graph"
func main() {
client := prism.NewClient("social-graph")
// Create vertex
alice, err := client.Graph.CreateVertex(ctx, graph.CreateVertexRequest{
ID: "user:alice",
Label: "User",
Properties: map[string]interface{}{
"name": "Alice",
"joined": "2024-01-01",
},
})
// Traverse
result, err := client.Graph.Traverse(ctx, graph.TraverseRequest{
StartVertexID: "user:alice",
Steps: []graph.TraversalStep{
{Direction: graph.DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
{Direction: graph.DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
},
Limit: 10,
})
for _, vertex := range result.Vertices {
fmt.Printf("Friend of friend: %s\n", vertex.ID)
}
// Shortest path
path, err := client.Graph.ShortestPath(ctx, graph.ShortestPathRequest{
StartVertexID: "user:alice",
EndVertexID: "user:charlie",
MaxHops: 6,
})
fmt.Printf("Path length: %d hops\n", len(path.VertexIDs))
}
Rust Client API
use anyhow::Result;
use maplit::hashmap;
use prism::{Client, graph::{CreateVertexRequest, Direction, TraverseRequest, TraversalStep}};
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::new("social-graph").await?;
// Create vertex
let alice = client.graph.create_vertex(CreateVertexRequest {
id: "user:alice".into(),
label: "User".into(),
properties: hashmap!{
"name" => "Alice".into(),
"joined" => "2024-01-01".into(),
},
}).await?;
// Traverse
let result = client.graph.traverse(TraverseRequest {
start_vertex_id: "user:alice".into(),
steps: vec![
TraversalStep {
direction: Direction::Out,
edge_labels: vec!["FOLLOWS".into()],
..Default::default()
},
TraversalStep {
direction: Direction::Out,
edge_labels: vec!["FOLLOWS".into()],
..Default::default()
},
],
limit: Some(10),
..Default::default()
}).await?;
for vertex in result.vertices {
println!("Friend of friend: {}", vertex.id);
}
Ok(())
}
Distributed In-Memory Graph with MemStore
Architecture Overview
The Graph pattern uses MemStore as its backend for ultra-fast, lock-free, distributed graph construction and traversal. This enables building graphs from multi-gigabyte snapshots in parallel without coordination overhead.
MemStore Graph Encoding
Vertices stored as hashes:
Key: "v:{vertex_id}"
Hash fields:
- "label" → vertex type (e.g., "User")
- "name" → property value
- "email" → property value
- ... (arbitrary properties)
Outgoing edges stored as sets:
Key: "v:{vertex_id}:out:{edge_label}"
Set members: ["{target_id_1}", "{target_id_2}", ...]
Incoming edges stored as sets (for bidirectional traversal):
Key: "v:{vertex_id}:in:{edge_label}"
Set members: ["{source_id_1}", "{source_id_2}", ...]
Edge properties stored as hashes:
Key: "e:{edge_id}"
Hash fields:
- "from" → source vertex ID
- "to" → target vertex ID
- "label" → edge type
- ... (edge properties)
Example Graph Encoding
Social graph:
Alice (user:alice):
v:user:alice → {label: "User", name: "Alice", joined: "2024-01-01"}
v:user:alice:out:FOLLOWS → ["user:bob", "user:charlie"]
v:user:alice:in:FOLLOWS → ["user:dave"]
Bob (user:bob):
v:user:bob → {label: "User", name: "Bob", joined: "2024-02-01"}
v:user:bob:out:FOLLOWS → ["user:charlie"]
v:user:bob:in:FOLLOWS → ["user:alice"]
Edge (follow:1):
e:follow:1 → {from: "user:alice", to: "user:bob", label: "FOLLOWS", since: "2024-03-01"}
Why MemStore for Graphs?
- Lock-Free Concurrency: sync.Map enables thousands of concurrent writes (see the sketch below)
- Zero Coordination: No distributed locks, no leader election
- Instant Reads: In-memory, sub-microsecond access
- Simple Encoding: Hash/Set operations map naturally to graph structure
- Horizontal Scaling: Each Prism proxy instance has an independent MemStore
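A minimal sketch of this encoding inside the proxy, assuming a sync.Map-backed store. The type and method names are illustrative, not the actual MemStore implementation, and HMSet is simplified to replace rather than merge fields:
import "sync"

// memStore sketches the hash/set encoding described above, built on sync.Map
// so concurrent writers never block each other.
type memStore struct {
    hashes sync.Map // "v:{id}" or "e:{id}" -> map[string][]byte (properties)
    sets   sync.Map // "v:{id}:out:{label}" -> *sync.Map used as a set of neighbor IDs
}

// HMSet stores vertex or edge properties under a hash key (replace semantics).
func (m *memStore) HMSet(key string, fields map[string][]byte) {
    m.hashes.Store(key, fields)
}

// SAdd adds neighbor IDs to an adjacency set, creating the set on first use.
func (m *memStore) SAdd(key string, members []string) {
    set, _ := m.sets.LoadOrStore(key, &sync.Map{})
    for _, member := range members {
        set.(*sync.Map).Store(member, struct{}{})
    }
}

// SMembers returns the members of an adjacency set.
func (m *memStore) SMembers(key string) []string {
    var out []string
    if set, ok := m.sets.Load(key); ok {
        set.(*sync.Map).Range(func(k, _ any) bool {
            out = append(out, k.(string))
            return true
        })
    }
    return out
}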
Fast Parallel Graph Construction from Snapshots
Problem: Loading Multi-Gigabyte Snapshots
Scenario: Load 10GB social graph snapshot with 100M vertices and 1B edges in <60 seconds.
Traditional Approach (Sequential):
Parse snapshot → For each vertex → Write to DB → For each edge → Write to DB
Time: 100M vertices × 0.1ms = 10,000 seconds (2.7 hours)
Prism Map-Reduce Approach (Parallel):
1. Partition snapshot by vertex ID ranges
2. Map phase: N workers read partitions in parallel
3. Each worker writes to memstore (lock-free)
4. No reduce phase needed (writes are independent)
Time: 10GB / 100 workers / 100 MB/s = 1 second (read) + 2 seconds (write) = 3 seconds
Partitioning Strategy
Snapshot format (JSON lines):
{"type":"vertex","id":"user:000001","label":"User","properties":{"name":"Alice"}}
{"type":"edge","id":"follow:000001","from":"user:000001","to":"user:000002","label":"FOLLOWS"}
{"type":"vertex","id":"user:000002","label":"User","properties":{"name":"Bob"}}
...
Partition by hash (consistent hashing):
Partition 0: vertices with hash(id) % 100 == 0
Partition 1: vertices with hash(id) % 100 == 1
...
Partition 99: vertices with hash(id) % 100 == 99
Each worker:
- Reads assigned partition from snapshot file (byte ranges)
- Parses vertices/edges
- Writes to memstore (no locks!)
Map Job Implementation
// Parallel snapshot ingestion with map workers
func IngestSnapshot(snapshotPath string, numWorkers int, client *prism.Client) error {
file, err := os.Open(snapshotPath)
if err != nil {
return err
}
defer file.Close()
ctx := context.Background() // request context for the batch RPCs below
fileInfo, _ := file.Stat()
fileSize := fileInfo.Size()
// Partition file into byte ranges
// (a real implementation must align chunk boundaries to line breaks so no
// JSON record is split across two workers)
chunkSize := fileSize / int64(numWorkers)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
startOffset := int64(i) * chunkSize
endOffset := startOffset + chunkSize
if i == numWorkers-1 {
endOffset = fileSize // Last worker gets remainder
}
go func(workerID int, start, end int64) {
defer wg.Done()
// Read partition
reader := bufio.NewReader(io.NewSectionReader(file, start, end-start))
// Batch writes for efficiency
vertexBatch := make([]CreateVertexRequest, 0, 1000)
edgeBatch := make([]CreateEdgeRequest, 0, 1000)
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
var obj map[string]interface{}
json.Unmarshal([]byte(line), &obj)
if obj["type"] == "vertex" {
vertexBatch = append(vertexBatch, CreateVertexRequest{
ID: obj["id"].(string),
Label: obj["label"].(string),
Properties: obj["properties"].(map[string]interface{}),
})
if len(vertexBatch) >= 1000 {
client.Graph.BatchCreateVertices(ctx, vertexBatch)
vertexBatch = vertexBatch[:0]
}
} else if obj["type"] == "edge" {
edgeBatch = append(edgeBatch, CreateEdgeRequest{
ID: obj["id"].(string),
Label: obj["label"].(string),
FromVertexID: obj["from"].(string),
ToVertexID: obj["to"].(string),
Properties: obj["properties"].(map[string]interface{}),
})
if len(edgeBatch) >= 1000 {
client.Graph.BatchCreateEdges(ctx, edgeBatch)
edgeBatch = edgeBatch[:0]
}
}
}
// Flush remaining batches
if len(vertexBatch) > 0 {
client.Graph.BatchCreateVertices(ctx, vertexBatch)
}
if len(edgeBatch) > 0 {
client.Graph.BatchCreateEdges(ctx, edgeBatch)
}
fmt.Printf("Worker %d: Processed %d-%d bytes\n", workerID, start, end)
}(i, startOffset, endOffset)
}
wg.Wait()
return nil
}
Proxy MemStore Implementation
BatchCreateVertices (lock-free):
func (g *GraphPattern) BatchCreateVertices(ctx context.Context, vertices []CreateVertexRequest) error {
// No locks needed - sync.Map handles concurrency
for _, v := range vertices {
// Store vertex properties as hash
properties := make(map[string][]byte)
properties["label"] = []byte(v.Label)
for k, val := range v.Properties {
properties[k] = serializeValue(val)
}
// Write to memstore (lock-free)
memstore.HMSet(ctx, fmt.Sprintf("v:%s", v.ID), properties)
}
return nil
}
BatchCreateEdges (lock-free):
func (g *GraphPattern) BatchCreateEdges(ctx context.Context, edges []CreateEdgeRequest) error {
for _, e := range edges {
// Add to outgoing edge set (lock-free)
outKey := fmt.Sprintf("v:%s:out:%s", e.FromVertexID, e.Label)
memstore.SAdd(ctx, outKey, []string{e.ToVertexID})
// Add to incoming edge set (lock-free)
inKey := fmt.Sprintf("v:%s:in:%s", e.ToVertexID, e.Label)
memstore.SAdd(ctx, inKey, []string{e.FromVertexID})
// Store edge properties if any
if len(e.Properties) > 0 {
properties := make(map[string][]byte)
properties["from"] = []byte(e.FromVertexID)
properties["to"] = []byte(e.ToVertexID)
properties["label"] = []byte(e.Label)
for k, val := range e.Properties {
properties[k] = serializeValue(val)
}
memstore.HMSet(ctx, fmt.Sprintf("e:%s", e.ID), properties)
}
}
return nil
}
Performance Characteristics
Single-threaded baseline:
- Create vertex: 10 μs
- Create edge: 15 μs
- 100M vertices: 1000 seconds
- 1B edges: 15000 seconds
- Total: ~4.4 hours
100-worker parallel (memstore):
- Throughput: 10M vertices/sec (100 workers × 100k vertices/sec)
- 100M vertices: 10 seconds
- 1B edges: 100 seconds
- Total: ~2 minutes (130× speedup)
Key factors:
- Lock-free writes (no contention)
- Partitioned workload (no coordination)
- In-memory writes (no disk I/O)
- Batch operations (1000 per call)
Graph Traversal with MemStore
Traverse operation (2 hops):
func (g *GraphPattern) Traverse(ctx context.Context, req *TraverseRequest) (*TraverseResponse, error) {
currentLevel := []string{req.StartVertexID}
visited := make(map[string]bool)
for _, step := range req.Steps {
nextLevel := []string{}
for _, vertexID := range currentLevel {
if visited[vertexID] {
continue // Skip already visited
}
visited[vertexID] = true
// Get outgoing edges from memstore (set operation)
var edgeKey string
if step.Direction == DirectionOUT {
edgeKey = fmt.Sprintf("v:%s:out:%s", vertexID, step.EdgeLabel)
} else {
edgeKey = fmt.Sprintf("v:%s:in:%s", vertexID, step.EdgeLabel)
}
// SMembers is lock-free read
neighbors, _ := memstore.SMembers(ctx, edgeKey)
nextLevel = append(nextLevel, neighbors...)
}
currentLevel = nextLevel
}
// Fetch vertex details (parallel)
vertices := make([]Vertex, 0, len(currentLevel))
for _, vertexID := range currentLevel {
properties, _ := memstore.HGetAll(ctx, fmt.Sprintf("v:%s", vertexID))
vertices = append(vertices, Vertex{
ID: vertexID,
Label: string(properties["label"]),
Properties: deserializeProperties(properties),
})
if len(vertices) >= int(req.Limit) {
break
}
}
return &TraverseResponse{Vertices: vertices}, nil
}
Latency (in-memory):
- 1-hop traversal: 50 μs
- 2-hop traversal: 200 μs
- 3-hop traversal: 1 ms
Distributed Graph Across Multiple Proxies
Scenario: 1 billion vertices across 10 Prism proxy instances.
Partitioning strategy (consistent hashing):
Proxy 0: Vertices with hash(id) % 10 == 0 (100M vertices)
Proxy 1: Vertices with hash(id) % 10 == 1 (100M vertices)
...
Proxy 9: Vertices with hash(id) % 10 == 9 (100M vertices)
Query routing:
func (c *Client) GetVertex(ctx context.Context, vertexID string) (*Vertex, error) {
// Determine which proxy owns this vertex
proxyID := hash(vertexID) % numProxies
proxy := c.proxies[proxyID]
// Route query to owning proxy
return proxy.Graph.GetVertex(ctx, vertexID)
}
Multi-hop traversal (cross-proxy):
func (c *Client) Traverse(ctx context.Context, req *TraverseRequest) (*TraverseResponse, error) {
currentLevel := []string{req.StartVertexID}
for _, step := range req.Steps {
// Group vertices by owning proxy
verticesByProxy := groupByProxy(currentLevel)
// Fan-out to all proxies in parallel
var wg sync.WaitGroup
results := make(chan []string, len(verticesByProxy))
for proxyID, vertexIDs := range verticesByProxy {
wg.Add(1)
go func(pID int, vIDs []string) {
defer wg.Done()
// Each proxy traverses its local vertices
neighbors := c.proxies[pID].Graph.TraverseLocal(ctx, vIDs, step)
results <- neighbors
}(proxyID, vertexIDs)
}
wg.Wait()
close(results)
// Merge results from all proxies
nextLevel := []string{}
for neighbors := range results {
nextLevel = append(nextLevel, neighbors...)
}
currentLevel = nextLevel
}
return &TraverseResponse{Vertices: fetchVertexDetails(currentLevel)}, nil
}
Performance (10 proxy cluster):
- Single proxy query: 50 μs
- Cross-proxy 2-hop: 300 μs (includes network round-trip)
- Aggregate throughput: 100k traversals/sec (10k per proxy × 10 proxies)
Snapshot Format Specification
Efficient binary format (for fast parsing):
message GraphSnapshot {
repeated VertexRecord vertices = 1;
repeated EdgeRecord edges = 2;
}
message VertexRecord {
string id = 1;
string label = 2;
map<string, bytes> properties = 3;
}
message EdgeRecord {
string id = 1;
string label = 2;
string from_vertex_id = 3;
string to_vertex_id = 4;
map<string, bytes> properties = 5;
}
Streaming format (line-delimited protobuf):
Each line: varint(message_length) + serialized_protobuf_message
Advantages:
- Fast parsing (no JSON overhead)
- Type-safe (protobuf schema)
- Seekable (can jump to byte offsets)
- Compressible (gzip/zstd)
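A sketch of reading this stream in Go using only the standard library and the protobuf runtime; VertexRecord is assumed to be the generated type for the message defined above:
import (
    "bufio"
    "encoding/binary"
    "fmt"
    "io"
    "os"

    "google.golang.org/protobuf/proto"
)

// readSnapshotRecords streams varint-length-delimited VertexRecord messages
// from a snapshot file without loading the whole file into memory.
func readSnapshotRecords(path string, handle func(*VertexRecord) error) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

    r := bufio.NewReaderSize(f, 1<<20)
    for {
        // Each record is prefixed with its length as an unsigned varint.
        msgLen, err := binary.ReadUvarint(r)
        if err == io.EOF {
            return nil // clean end of stream
        }
        if err != nil {
            return fmt.Errorf("read length prefix: %w", err)
        }
        buf := make([]byte, msgLen)
        if _, err := io.ReadFull(r, buf); err != nil {
            return fmt.Errorf("read record body: %w", err)
        }
        rec := &VertexRecord{}
        if err := proto.Unmarshal(buf, rec); err != nil {
            return fmt.Errorf("decode record: %w", err)
        }
        if err := handle(rec); err != nil {
            return err
        }
    }
}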
Distributed WAL Architecture (Production-Grade)
Three-Layer Architecture
┌─────────────────────────────────────────────────────────────┐
│ Client Applications │
└────────────────────┬────────────────────────────────────────┘
│ CreateVertex, CreateEdge, Traverse
▼
┌─────────────────────────────────────────────────────────────┐
│ Prism Proxy Cluster (10 nodes) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Layer 1: Distributed WAL (Kafka) │ │
│ │ - Append writes to partitioned topic │ │
│ │ - Acks=all (replicated to 3 brokers) │ │
│ │ - Guarantees: Durability + Ordering │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ │ Background Workers │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Layer 2: MemStore (In-Memory) │ │
│ │ - Hot read path (50 μs latency) │ │
│ │ - Lock-free concurrent access │ │
│ │ - 100M vertices per proxy │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ │ Async replication │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Layer 3: Neptune (Persistent Storage) │ │
│ │ - Source of truth for graph │ │
│ │ - Fallback for cache misses │ │
│ │ - Snapshot generation │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Write Path (WAL-First)
sequenceDiagram
participant Client
participant Proxy as Prism Proxy
participant WAL as Kafka WAL
participant MemStore
participant Neptune
Note over Client,Neptune: Write Path (CreateVertex)
Client->>Proxy: CreateVertex(id, label, properties)
Note over Proxy: 1. Validate request
Proxy->>Proxy: Check schema, validate properties
Note over Proxy,WAL: 2. Append to WAL (durable)
Proxy->>WAL: Produce(topic="graph-wal", key=vertex_id, value=operation)
WAL->>WAL: Replicate to 3 brokers (acks=all)
WAL-->>Proxy: Offset 12345678
Note over Proxy: 3. Acknowledge to client (write is durable)
Proxy-->>Client: Success (latency: 5ms)
Note over Proxy,MemStore: 4. Background: Apply to MemStore
Proxy->>MemStore: HMSet(v:vertex_id, properties)
MemStore-->>Proxy: OK
Note over Proxy,Neptune: 5. Background: Replicate to Neptune
Proxy->>Neptune: g.addV(label).property(id, vertex_id)...
Neptune-->>Proxy: OK
Note over Proxy: 6. Checkpoint WAL offset
Proxy->>Proxy: Save offset 12345678 to checkpoint
Key guarantees:
- Durability: Write acknowledged only after WAL replication (acks=all)
- No data loss: WAL persisted before ack, survives proxy crashes
- Ordering: Kafka partition ordering ensures sequential application
- Async apply: MemStore/Neptune updates don't block client
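A sketch of the durable append step, assuming the segmentio/kafka-go client (any producer configured with acks=all works; WALOperation is the protobuf message defined below):
import (
    "context"

    "github.com/segmentio/kafka-go"
    "google.golang.org/protobuf/proto"
)

// newWALWriter builds a producer that only acknowledges a write after the
// graph-wal topic has replicated it to all in-sync brokers (acks=all).
func newWALWriter(brokers []string) *kafka.Writer {
    return &kafka.Writer{
        Addr:         kafka.TCP(brokers...),
        Topic:        "graph-wal",
        RequiredAcks: kafka.RequireAll, // durability before the client sees success
        Balancer:     &kafka.Hash{},    // same key -> same partition -> per-vertex ordering
    }
}

// appendToWAL makes a graph mutation durable before the client is acknowledged.
func appendToWAL(ctx context.Context, w *kafka.Writer, vertexID string, op *WALOperation) error {
    payload, err := proto.Marshal(op)
    if err != nil {
        return err
    }
    // Keying by vertex ID keeps all operations for one vertex on one
    // partition, preserving ordering for the WAL consumers.
    return w.WriteMessages(ctx, kafka.Message{Key: []byte(vertexID), Value: payload})
}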
Read Path (MemStore-First with Fallback)
sequenceDiagram
participant Client
participant Proxy as Prism Proxy
participant MemStore
participant Neptune
Note over Client,Neptune: Read Path (GetVertex)
Client->>Proxy: GetVertex(id="user:alice")
Note over Proxy,MemStore: 1. Try MemStore (hot path)
Proxy->>MemStore: HGetAll(v:user:alice)
alt Vertex in MemStore (99.9% case)
MemStore-->>Proxy: {label: "User", name: "Alice", ...}
Proxy-->>Client: Vertex (latency: 50 μs)
else Cache miss (0.1% case)
MemStore-->>Proxy: nil (not found)
Note over Proxy,Neptune: 2. Fallback to Neptune
Proxy->>Neptune: g.V('user:alice').valueMap()
Neptune-->>Proxy: {label: "User", name: "Alice", ...}
Note over Proxy,MemStore: 3. Populate MemStore for next read
Proxy->>MemStore: HMSet(v:user:alice, properties)
Proxy-->>Client: Vertex (latency: 30ms)
end
Read characteristics:
- 99.9% cache hit rate: Most reads served from MemStore (50 μs)
- 0.1% cache miss: Fallback to Neptune, then populate MemStore
- No blocking: Reads never wait for WAL application
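A sketch of this read path inside the proxy. The memstore and Neptune client calls (HGetAll, HMSet, GetVertex) and the property-conversion helpers are illustrative, following the MemStore encoding described earlier:
import (
    "context"
    "time"
)

// getVertex serves reads MemStore-first and falls back to Neptune on a miss,
// repopulating the hot path so subsequent reads stay in memory.
func (p *Proxy) getVertex(ctx context.Context, id string) (*Vertex, error) {
    // 1. Hot path: lock-free in-memory lookup (~50 microseconds).
    props, err := p.memstore.HGetAll(ctx, "v:"+id)
    if err == nil && len(props) > 0 {
        return vertexFromProps(id, props), nil // illustrative helper
    }

    // 2. Miss: bounded fallback to the persistent backend.
    fallbackCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
    defer cancel()
    v, err := p.neptune.GetVertex(fallbackCtx, id)
    if err != nil {
        return nil, err
    }

    // 3. Populate MemStore so the next read is served from memory.
    p.memstore.HMSet(ctx, "v:"+id, propsToFields(v)) // illustrative helper
    return v, nil
}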
WAL Consumer Architecture
Per-proxy consumers (pull model):
// Each proxy runs WAL consumer workers
func (p *Proxy) StartWALConsumers() {
// Consumer group: Each proxy consumes from assigned partitions
consumer := kafka.NewConsumer(kafka.ConsumerConfig{
GroupID: fmt.Sprintf("graph-wal-consumer-proxy-%d", p.ID),
Topics: []string{"graph-wal"},
AutoCommit: false, // Manual commit after apply
})
// Start multiple workers for parallel processing
for i := 0; i < p.Config.WAL.MemStoreWorkers; i++ {
go p.applyWALToMemStore(consumer)
}
for i := 0; i < p.Config.WAL.NeptuneWorkers; i++ {
go p.applyWALToNeptune(consumer)
}
}
func (p *Proxy) applyWALToMemStore(consumer *kafka.Consumer) {
batch := make([]*WALOperation, 0, 1000)
lastFlush := time.Now()
for msg := range consumer.Messages() {
op := &WALOperation{}
proto.Unmarshal(msg.Value, op)
batch = append(batch, op)
// Apply batch every 1000 operations or 100ms
if len(batch) >= 1000 || time.Since(lastFlush) > 100*time.Millisecond {
p.applyBatchToMemStore(batch)
consumer.CommitOffset(msg.Offset)
batch = batch[:0]
lastFlush = time.Now()
}
}
}
func (p *Proxy) applyBatchToMemStore(batch []*WALOperation) {
for _, op := range batch {
switch op.Type {
case OperationType_CREATE_VERTEX:
p.memstore.HMSet(
fmt.Sprintf("v:%s", op.Vertex.ID),
serializeProperties(op.Vertex.Properties),
)
case OperationType_CREATE_EDGE:
p.memstore.SAdd(
fmt.Sprintf("v:%s:out:%s", op.Edge.FromVertexID, op.Edge.Label),
[]string{op.Edge.ToVertexID},
)
p.memstore.SAdd(
fmt.Sprintf("v:%s:in:%s", op.Edge.ToVertexID, op.Edge.Label),
[]string{op.Edge.FromVertexID},
)
}
}
}
WAL operation protobuf:
message WALOperation {
enum OperationType {
CREATE_VERTEX = 0;
UPDATE_VERTEX = 1;
DELETE_VERTEX = 2;
CREATE_EDGE = 3;
DELETE_EDGE = 4;
}
OperationType type = 1;
int64 timestamp = 2;
string transaction_id = 3;
oneof operation {
VertexRecord vertex = 4;
EdgeRecord edge = 5;
}
}
Crash Recovery (Replay from WAL)
On proxy restart:
func (p *Proxy) Recover(ctx context.Context) error {
// 1. Load last checkpoint
checkpoint, err := p.loadCheckpoint()
if err != nil {
return fmt.Errorf("failed to load checkpoint: %w", err)
}
log.Infof("Starting recovery from offset %d", checkpoint.LastOffset)
// 2. Replay WAL from checkpoint to latest
consumer := kafka.NewConsumer(kafka.ConsumerConfig{
GroupID: fmt.Sprintf("graph-recovery-proxy-%d", p.ID),
Topics: []string{"graph-wal"},
})
// Seek to checkpoint offset
consumer.Seek("graph-wal", checkpoint.Partition, checkpoint.LastOffset)
replayCount := 0
startTime := time.Now()
for {
msg, err := consumer.Poll(100 * time.Millisecond)
if err != nil || msg == nil {
break // Reached end of log
}
// Apply operation to MemStore
op := &WALOperation{}
proto.Unmarshal(msg.Value, op)
p.applyOperationToMemStore(op)
replayCount++
// Safety: Stop after 10 minutes
if time.Since(startTime) > 10*time.Minute {
log.Warnf("Replay timeout after %d operations", replayCount)
break
}
}
log.Infof("Recovery complete: replayed %d operations in %v",
replayCount, time.Since(startTime))
// 3. Resume normal operation
p.StartWALConsumers()
return nil
}
Checkpoint format (stored in Neptune or S3):
checkpoint:
proxy_id: 3
partition: 42
last_offset: 12345678
timestamp: 2025-11-14T10:30:00Z
memstore_vertex_count: 10000000
memstore_edge_count: 50000000
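A sketch of the checkpoint record behind loadCheckpoint in the recovery code above, persisted as a small JSON document. Field names mirror the YAML; the local-file storage is a stand-in for S3 or Neptune:
import (
    "encoding/json"
    "os"
    "time"
)

// Checkpoint records how far this proxy has applied the WAL so recovery can
// resume from that offset.
type Checkpoint struct {
    ProxyID             int       `json:"proxy_id"`
    Partition           int       `json:"partition"`
    LastOffset          int64     `json:"last_offset"`
    Timestamp           time.Time `json:"timestamp"`
    MemstoreVertexCount int64     `json:"memstore_vertex_count"`
    MemstoreEdgeCount   int64     `json:"memstore_edge_count"`
}

// saveCheckpoint atomically replaces the on-disk checkpoint file.
func saveCheckpoint(path string, cp *Checkpoint) error {
    data, err := json.Marshal(cp)
    if err != nil {
        return err
    }
    tmp := path + ".tmp"
    if err := os.WriteFile(tmp, data, 0o644); err != nil {
        return err
    }
    return os.Rename(tmp, path) // atomic swap on POSIX filesystems
}

func loadCheckpoint(path string) (*Checkpoint, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    cp := &Checkpoint{}
    return cp, json.Unmarshal(data, cp)
}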
Cross-Cluster Consistency
Scenario: 10 proxies, each with independent MemStore
Challenge: How to ensure consistent reads across proxies?
Solution 1: Consistent Hashing (Recommended)
// Each vertex assigned to specific proxy
func (c *Client) getVertexOwner(vertexID string) int {
hash := fnv.New64a()
hash.Write([]byte(vertexID))
return int(hash.Sum64() % uint64(c.NumProxies))
}
// Always route vertex queries to owning proxy
func (c *Client) GetVertex(ctx context.Context, vertexID string) (*Vertex, error) {
proxyID := c.getVertexOwner(vertexID)
proxy := c.proxies[proxyID]
return proxy.Graph.GetVertex(ctx, vertexID)
}
Benefits:
- Each vertex has single source of truth (one proxy)
- No consistency problems (no replication)
- Partitioned WAL: Each proxy consumes its partition
Solution 2: Eventually Consistent Replication (Alternative)
# All proxies replicate entire graph
wal:
replication_mode: broadcast
# Every proxy consumes entire WAL
consumer_group: graph-wal-consumers # Shared group
Trade-offs:
- ✅ Any proxy can serve any vertex
- ✅ No cross-proxy routing needed
- ❌ 10× memory usage (full graph on each proxy)
- ❌ Eventual consistency (WAL lag)
Performance Characteristics (Three-Layer)
| Operation | Latency | Throughput | Path |
|---|---|---|---|
| Write (CreateVertex) | 5ms P50, 15ms P99 | 100k/sec | Kafka WAL append |
| Read (MemStore hit) | 50 μs P50, 200 μs P99 | 1M/sec | MemStore |
| Read (Neptune fallback) | 30ms P50, 100ms P99 | 10k/sec | Neptune query |
| Traverse (2-hop, MemStore) | 200 μs P50, 1ms P99 | 500k/sec | MemStore sets |
| Traverse (2-hop, cross-proxy) | 500 μs P50, 2ms P99 | 200k/sec | RPC fan-out |
| Recovery (replay WAL) | ~10 min for 10M ops | - | Kafka consume |
Write amplification:
- 1 write → 3 WAL replicas (Kafka)
- 1 write → 1 MemStore update (async)
- 1 write → 1 Neptune write (async)
- Total: 5× amplification (acceptable for durability)
Memory usage (per proxy):
- 100M vertices × 100 bytes = 10 GB
- 1B edges × 20 bytes = 20 GB
- Total: ~30 GB per proxy
Snapshot-Based Updates with Set Difference (Fastest Path)
Problem: Incremental Graph Updates
Scenario: Update 1M vertices out of 100M daily (1% churn)
Traditional approach (WAL replay):
- Replay 1M operations from WAL
- Time: 1M ops × 5ms = 5000 seconds (~1.4 hours)
Snapshot-based approach (set difference):
- Compute diff between snapshots
- Apply only changes
- Time: <10 seconds
Architecture: Snapshot Diff Engine
┌────────────────────────────────────────────────────────────┐
│ Graph Update Pipeline │
├────────────────────────────────────────────────────────────┤
│ │
│ 1. Snapshot T1 (yesterday) │
│ vertices_t1 = {v1, v2, v3, ..., v100M} │
│ edges_t1 = {e1, e2, e3, ..., e1B} │
│ │
│ 2. Snapshot T2 (today) │
│ vertices_t2 = {v1, v2', v3, ..., v100M, v100M+1} │
│ edges_t2 = {e1, e2, e3', ..., e1B, e1B+1} │
│ │
│ 3. Compute Set Difference (distributed) │
│ added_vertices = vertices_t2 - vertices_t1 │
│ removed_vertices = vertices_t1 - vertices_t2 │
│ modified_vertices = vertices_t2 ∩ vertices_t1 (changed)│
│ │
│ 4. Apply Changes to MemStore (parallel) │
│ - SAdd for new vertices/edges │
│ - SRem for removed vertices/edges │
│ - HMSet for modified properties │
│ │
└────────────────────────────────────────────────────────────┘
Custom Snapshot Format
Prism snapshot format (space-efficient):
message GraphSnapshotV2 {
SnapshotMetadata metadata = 1;
VertexSet vertices = 2;
EdgeSet edges = 3;
}
message SnapshotMetadata {
string snapshot_id = 1;
int64 timestamp = 2;
int64 vertex_count = 3;
int64 edge_count = 4;
string previous_snapshot_id = 5; // For diff chaining
}
message VertexSet {
// Bloom filter for fast existence checks
bytes bloom_filter = 1;
// Sorted vertex IDs (enables binary search)
repeated string ids = 2;
// Properties stored separately (columnar)
map<string, VertexProperties> properties_by_id = 3;
}
message EdgeSet {
bytes bloom_filter = 1;
// Adjacency list format (space-efficient)
repeated AdjacencyList adjacency_lists = 2;
}
message AdjacencyList {
string vertex_id = 1;
repeated Edge outgoing_edges = 2;
}
Storage format:
- Compression: zstd (3:1 ratio typical)
- Partitioning: By vertex ID hash (parallel processing)
- Indexing: Bloom filters for fast membership tests
Set Difference Algorithm (Distributed)
// Compute vertex differences between two snapshots
func ComputeVertexDiff(snapshotT1, snapshotT2 *GraphSnapshot) (*VertexDiff, error) {
// 1. Load both snapshots into Sets
set1 := NewVertexSet(snapshotT1)
set2 := NewVertexSet(snapshotT2)
// 2. Compute differences (parallel)
var wg sync.WaitGroup
added := make([]string, 0)
removed := make([]string, 0)
modified := make([]string, 0)
// Added: in T2 but not in T1
wg.Add(1)
go func() {
defer wg.Done()
for id := range set2.Vertices {
if !set1.Contains(id) {
added = append(added, id)
}
}
}()
// Removed: in T1 but not in T2
wg.Add(1)
go func() {
defer wg.Done()
for id := range set1.Vertices {
if !set2.Contains(id) {
removed = append(removed, id)
}
}
}()
// Modified: in both but properties changed
wg.Add(1)
go func() {
defer wg.Done()
for id := range set2.Vertices {
if set1.Contains(id) {
props1 := set1.Properties[id]
props2 := set2.Properties[id]
if !propsEqual(props1, props2) {
modified = append(modified, id)
}
}
}
}()
wg.Wait()
return &VertexDiff{
Added: added,
Removed: removed,
Modified: modified,
}, nil
}
Applying Diff to MemStore (Parallel)
// Apply vertex diff to MemStore using batch operations
func (p *Proxy) ApplyVertexDiff(ctx context.Context, diff *VertexDiff, snapshot *GraphSnapshot) error {
// Parallel apply with worker pool
numWorkers := 100
work := make(chan string, numWorkers*10)
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
batch := make([]string, 0, 1000)
for vertexID := range work {
batch = append(batch, vertexID)
if len(batch) >= 1000 {
p.applyVertexBatch(ctx, batch, snapshot)
batch = batch[:0]
}
}
if len(batch) > 0 {
p.applyVertexBatch(ctx, batch, snapshot)
}
}()
}
// 1. Remove vertices (fastest: just delete keys)
for _, id := range diff.Removed {
p.memstore.Delete(fmt.Sprintf("v:%s", id))
}
// 2. Add new vertices
for _, id := range diff.Added {
work <- id
}
// 3. Update modified vertices
for _, id := range diff.Modified {
work <- id
}
close(work)
wg.Wait()
return nil
}
func (p *Proxy) applyVertexBatch(ctx context.Context, batch []string, snapshot *GraphSnapshot) {
for _, id := range batch {
vertex := snapshot.Vertices[id]
props := make(map[string][]byte)
for k, v := range vertex.Properties {
props[k] = serializeValue(v)
}
// Lock-free write to MemStore
p.memstore.HMSet(ctx, fmt.Sprintf("v:%s", id), props)
}
}
Performance: WAL vs Snapshot Diff
| Approach | Update Size | Time | Throughput | Best For |
|---|---|---|---|---|
| WAL Replay | 1K ops | 5s | 200 ops/s | Real-time, small updates |
| WAL Replay | 1M ops | 5000s (1.4h) | 200 ops/s | ❌ Too slow |
| Snapshot Diff | 1K ops | 10s | 100 ops/s | ❌ Overhead too high |
| Snapshot Diff | 1M ops | 10s | 100k ops/s | ✅ Bulk updates |
| Snapshot Diff | 100M ops | 100s | 1M ops/s | ✅ Full refresh |
Key insight: Snapshot diff applies changes as large parallel batches, so wall-clock time grows very slowly with the size of the change set, whereas WAL replay applies operations sequentially and scales linearly with the number of operations.
Hybrid Strategy (Best of Both Worlds)
namespaces:
- name: social-graph-production
pattern: graph
# Real-time writes: Use WAL
wal:
enabled: true
backend: kafka
topic: graph-wal
# Bulk updates: Use snapshot diff
snapshot_updates:
enabled: true
schedule: "0 2 * * *" # Daily at 2 AM
source: s3://snapshots/social-graph-${DATE}.pb.zst
strategy: set_difference
parallel_workers: 100
# Cutover strategy
update_mode: hybrid
# - Real-time: WAL for minute-to-minute changes
# - Daily: Snapshot diff for bulk reconciliation
wal_retention: 24h # Only keep 1 day of WAL
Timeline:
Day 1:
00:00 - Load snapshot (100M vertices)
00:01 - Start serving traffic (reads from MemStore)
00:01 - Apply WAL for real-time updates
Day 2:
02:00 - Download new snapshot (100M + 1M new)
02:01 - Compute diff (1M added, 500K modified, 200K removed)
02:11 - Apply diff to MemStore (10 seconds)
02:11 - Truncate WAL (no longer needed)
02:11 - Resume real-time WAL application
Set Operations in MemStore
MemStore's set operations are perfect for graph diffs:
// Add outgoing edges (set union)
func (p *Proxy) AddOutgoingEdges(vertexID string, label string, targets []string) {
key := fmt.Sprintf("v:%s:out:%s", vertexID, label)
p.memstore.SAdd(key, targets) // Lock-free set add
}
// Remove edges (set difference)
func (p *Proxy) RemoveOutgoingEdges(vertexID string, label string, targets []string) {
key := fmt.Sprintf("v:%s:out:%s", vertexID, label)
p.memstore.SRem(key, targets) // Lock-free set remove
}
// Get neighbors (set membership)
func (p *Proxy) GetNeighbors(vertexID string, label string) []string {
key := fmt.Sprintf("v:%s:out:%s", vertexID, label)
return p.memstore.SMembers(key) // Lock-free set read
}
// Compute edge diff
func ComputeEdgeDiff(edgesT1, edgesT2 []string) (added, removed []string) {
set1 := make(map[string]bool)
set2 := make(map[string]bool)
for _, e := range edgesT1 { set1[e] = true }
for _, e := range edgesT2 { set2[e] = true }
// Added edges
for e := range set2 {
if !set1[e] {
added = append(added, e)
}
}
// Removed edges
for e := range set1 {
if !set2[e] {
removed = append(removed, e)
}
}
return added, removed
}
Snapshot Diff Optimizations
1. Bloom Filters (membership tests):
// Fast negative check: "Is vertex in snapshot?"
if !snapshot.BloomFilter.Contains(vertexID) {
// Definitely not in snapshot, skip diff computation
return nil
}
2. Sorted IDs (binary search):
// O(log n) lookup instead of O(n)
func (s *VertexSet) Contains(id string) bool {
idx := sort.SearchStrings(s.SortedIDs, id)
return idx < len(s.SortedIDs) && s.SortedIDs[idx] == id
}
3. Columnar Properties (avoid deserializing unchanged properties):
// Only deserialize properties for modified vertices
if vertexModified(id) {
props := deserializeProperties(snapshot.Properties[id])
}
4. Parallel Partitioning (independent diff computation):
// Each worker computes diff for its partition
func ComputeDiffPartitioned(snapshotT1, snapshotT2 *GraphSnapshot, numPartitions int) []*VertexDiff {
diffs := make([]*VertexDiff, numPartitions)
var wg sync.WaitGroup
for i := 0; i < numPartitions; i++ {
wg.Add(1)
go func(partition int) {
defer wg.Done()
diffs[partition] = computeDiffForPartition(snapshotT1, snapshotT2, partition)
}(i)
}
wg.Wait()
return diffs
}
Result: 100× Faster Bulk Updates
Traditional WAL approach:
- 1M operations × 5ms = 5000 seconds (1.4 hours)
Snapshot diff approach:
- Compute diff: 5 seconds (parallel)
- Apply changes: 10 seconds (100 workers)
- Total: 15 seconds (333× faster!)
Best for:
- Daily batch updates from source databases
- Full graph refreshes
- Schema migrations
- Bulk corrections
Performance Characteristics
Operation Latency (Neptune Backend)
| Operation | Latency (P50) | Latency (P99) | Use Case |
|---|---|---|---|
| GetVertex | 5ms | 15ms | Fetch single vertex |
| CreateVertex | 10ms | 25ms | Write vertex |
| Traverse (2 hops) | 30ms | 100ms | Friends of friends |
| ShortestPath | 50ms | 200ms | Connection path |
| PageRank (10k vertices) | 5s | 15s | Influence analysis |
Optimization Strategies
- Index Vertex IDs: Ensure primary key indexes
- Limit Traversal Depth: Set max_depth to prevent combinatorial explosion
- Cache Frequent Queries: Cache GetVertex and short traversals
- Batch Operations: Use BatchCreateVertices for bulk inserts
- Read Replicas: Route read queries to Neptune replicas
- Denormalize: Store frequently accessed properties on edges
Testing Strategy
Local Development with SQLite
# Use SQLite for fast local testing
namespaces:
- name: social-graph-test
pattern: graph
backend:
type: sqlite
database: ":memory:"
seed:
enabled: true
fixtures: ./test/fixtures/social-graph.json
Benefits:
- No external dependencies
- Fast setup (<100ms)
- Same API as production Neptune
- Easy CI/CD integration
Integration Tests with Real Neptune
func TestGraphTraversal(t *testing.T) {
ctx := context.Background()
client := setupTestNeptune(t) // Real Neptune cluster
// Setup test graph
client.Graph.CreateVertex(ctx, CreateVertexRequest{
ID: "A", Label: "User",
})
client.Graph.CreateVertex(ctx, CreateVertexRequest{
ID: "B", Label: "User",
})
client.Graph.CreateEdge(ctx, CreateEdgeRequest{
ID: "A-B", Label: "FOLLOWS",
FromVertexID: "A", ToVertexID: "B",
})
// Test traversal
result, err := client.Graph.Traverse(ctx, TraverseRequest{
StartVertexID: "A",
Steps: []TraversalStep{
{Direction: DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
},
})
require.NoError(t, err)
assert.Len(t, result.Vertices, 1)
assert.Equal(t, "B", result.Vertices[0].ID)
}
Security Considerations
- Vertex ID Authorization: Enforce namespace-level access control
- Traversal Depth Limits: Prevent DoS via unbounded traversals
- Query Complexity: Limit expensive algorithms (PageRank iterations)
- PII in Graphs: Tag sensitive vertex properties in schema
- Audit Logging: Log all graph mutations (create, update, delete)
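For example, traversal depth and result-size limits could be enforced at request admission, before anything reaches the backend. The limit values and the plain Limit field below are illustrative simplifications of the protobuf defined earlier:
import "fmt"

// Illustrative admission limits for the guardrails listed above.
const (
    maxTraversalSteps = 6      // bound traversal depth
    maxResultLimit    = 10_000 // bound result size
    maxPageRankIters  = 50     // bound algorithm cost
)

// validateTraverse rejects unbounded or overly expensive traversals so a
// single request cannot fan out across the whole graph.
func validateTraverse(req *TraverseRequest) error {
    if len(req.Steps) == 0 {
        return fmt.Errorf("traversal must declare at least one step")
    }
    if len(req.Steps) > maxTraversalSteps {
        return fmt.Errorf("traversal depth %d exceeds limit %d", len(req.Steps), maxTraversalSteps)
    }
    if req.Limit <= 0 || req.Limit > maxResultLimit {
        return fmt.Errorf("limit must be between 1 and %d", maxResultLimit)
    }
    return nil
}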
Comparison to Alternatives
vs. Direct Neptune SDK
| Feature | Prism Graph Pattern | Direct Neptune SDK |
|---|---|---|
| Query Language | Protobuf (type-safe) | Gremlin strings |
| Backend Lock-in | No (swap backends) | Yes (Neptune only) |
| Caching | Built-in | Manual |
| Testing | Local SQLite | Requires Neptune |
| Learning Curve | Simple API | Learn Gremlin |
vs. Direct Neo4j Driver
| Feature | Prism Graph Pattern | Neo4j Driver |
|---|---|---|
| Query Language | Protobuf | Cypher strings |
| Type Safety | Compile-time | Runtime |
| Batching | Automatic | Manual |
| Caching | Configurable | Application code |
vs. Graph Libraries (NetworkX, igraph)
| Feature | Prism Graph Pattern | NetworkX |
|---|---|---|
| Scale | Millions of vertices (backend) | In-memory limit |
| Persistence | Durable storage | Not built-in |
| Distribution | Multi-node | Single process |
| Operations | Database queries | Algorithm library |
Migration Path
Phase 1: Core Operations (Week 1-2)
Deliverables:
- Protobuf definitions for Vertex/Edge CRUD
- Neptune backend implementation (Gremlin)
- Basic traversal support
- Go client library
Demo: Social graph with 2-hop traversal
Phase 2: Advanced Traversals (Week 3-4)
Deliverables:
- Shortest path, all paths
- Filter expressions (vertex, edge)
- Pagination for large results
- Query optimizer pattern
Demo: Knowledge graph with complex queries
Phase 3: Graph Algorithms (Week 5-6)
Deliverables:
- PageRank
- Community detection (Louvain)
- Cycle detection
- Batch operations
Demo: Recommendation system with collaborative filtering
Phase 4: Multi-Backend Support (Week 7-8)
Deliverables:
- Neo4j backend (Cypher)
- SQLite backend (local testing)
- Backend migration tool
- Performance benchmarks
Demo: Same application on Neptune vs Neo4j
Related RFCs and ADRs
- RFC-013: Neptune Graph Backend Implementation - Backend implementation details
- RFC-014: Layered Data Access Patterns - Pattern architecture
- RFC-017: Multicast Registry Pattern - Composite pattern example
- ADR-041: Graph Database Backend Support - Backend selection criteria
Open Questions
- Graph Schema Evolution: How to handle schema changes in production graphs?
- Distributed Graphs: Support for graph partitioning across backends?
- Temporal Graphs: Time-traveling queries (graph state at timestamp)?
- Graph Streaming: Real-time graph updates via CDC?
- Custom Algorithms: Plugin system for user-defined graph algorithms?
Revision History
- 2025-11-14: Initial draft for high-level Graph pattern API