RFC-055: Graph Pattern - High-Level Graph Database API

Status: Draft Author: Platform Team Created: 2025-11-14 Updated: 2025-11-14

Abstract

This RFC defines a Graph Pattern that provides a high-level, backend-agnostic API for graph database operations. Applications can model and query highly connected data (social networks, knowledge graphs, recommendation systems, fraud detection) through intuitive graph operations without writing backend-specific query languages.

The pattern abstracts common graph operations (vertex/edge CRUD, traversals, pathfinding, algorithms) and can be backed by multiple graph databases (AWS Neptune, Neo4j, JanusGraph) through Prism's layered architecture.

Key Innovation: Same client API works with property graphs (Gremlin), RDF graphs (SPARQL), or native graph databases (Cypher), allowing backend migration without application changes.

Motivation

Problem Statement

Modern applications need graph capabilities for highly connected data, but implementing graph operations is complex:

Current Challenges:

  1. Query Language Lock-in: Learning Gremlin, Cypher, or SPARQL specific to each backend
  2. Backend-Specific Code: Direct Neptune/Neo4j SDK usage couples apps to infrastructure
  3. No Type Safety: Query strings lack compile-time validation
  4. Manual Optimization: Applications must implement caching, batching, connection pooling
  5. Complex Operations: Implementing graph algorithms (PageRank, shortest path) from scratch
  6. Testing Difficulty: Mocking graph queries requires extensive setup

Example Without Prism:

// Application tightly coupled to Gremlin
query := "g.V('user:alice').out('FOLLOWS').out('FOLLOWS').dedup().limit(10)"
result := gremlinClient.Submit(query)

// Must parse Gremlin-specific result format
vertices := parseGremlinResults(result)

Problems:

  • Application must know Gremlin syntax
  • Switching to Neo4j (Cypher) requires rewriting queries
  • No type safety or validation
  • Manual result parsing
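
For contrast, here is the same friends-of-friends query sketched against the Graph pattern's Go client (the API shapes are taken from the Client API Examples section later in this RFC; the fragment is illustrative, not a finalized API):

// Sketch: two-hop traversal through the Graph pattern instead of raw Gremlin.
result, err := client.Graph.Traverse(ctx, graph.TraverseRequest{
    StartVertexID: "user:alice",
    Steps: []graph.TraversalStep{
        {Direction: graph.DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
        {Direction: graph.DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
    },
    Limit: 10,
})
if err != nil {
    log.Fatalf("traverse failed: %v", err)
}
for _, v := range result.Vertices {
    fmt.Println(v.ID) // Typed results; no Gremlin string parsing
}

The query intent is expressed once against the pattern API; the proxy translates it to Gremlin, Cypher, or SPARQL depending on the configured backend.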

Goals

  1. Backend Abstraction: Same API for Neptune, Neo4j, JanusGraph, ArangoDB
  2. Type Safety: Protobuf-based operations with compile-time guarantees
  3. Common Operations: Built-in support for traversals, pathfinding, algorithms
  4. Performance: Automatic query optimization, caching, batching
  5. Composability: Layer with other patterns (Cache + Graph, CDC + Graph)
  6. Testing: Local graph backend (embedded mode) for development

Use Cases

Use Case 1: Social Network Graph

Application Problem: Find friends-of-friends, mutual connections, community detection.

pattern: graph
use_case: social-network

# What you model
entities:
  - type: User
    properties: [name, email, joined_at]
  - type: Post
    properties: [content, created_at]

relationships:
  - type: FOLLOWS
    from: User
    to: User
  - type: LIKES
    from: User
    to: Post

Client Code:

# Find friends of friends (2 hops)
fof = client.graph.traverse(
    start_vertex="user:alice",
    steps=[
        TraversalStep(direction=OUT, edge_label="FOLLOWS"),
        TraversalStep(direction=OUT, edge_label="FOLLOWS")
    ],
    limit=10
)

# Find shortest connection path
path = client.graph.shortest_path(
    start="user:alice",
    end="user:bob",
    max_hops=6
)

Use Case 2: Knowledge Graph

Application Problem: Model entities and relationships, semantic search, inference.

pattern: graph
use_case: knowledge-graph

entities:
  - type: Person
    properties: [name, birth_date, nationality]
  - type: Organization
    properties: [name, founded, industry]
  - type: Event
    properties: [name, date, location]

relationships:
  - type: WORKS_AT
    from: Person
    to: Organization
    properties: [role, start_date]
  - type: FOUNDED
    from: Person
    to: Organization
  - type: ATTENDED
    from: Person
    to: Event

Client Code:

# Find all organizations founded by Alice
orgs = client.graph.traverse(
    start_vertex="person:alice",
    steps=[TraversalStep(direction=OUT, edge_label="FOUNDED")],
    vertex_filter={"type": "Organization"}
)

# Complex query: People who worked at companies Alice founded
people = client.graph.traverse(
    start_vertex="person:alice",
    steps=[
        TraversalStep(direction=OUT, edge_label="FOUNDED"),
        TraversalStep(direction=IN, edge_label="WORKS_AT")
    ],
    limit=50
)

Use Case 3: Recommendation System

Application Problem: Product recommendations based on user behavior and item similarity.

pattern: graph
use_case: recommendations

entities:
  - type: User
    properties: [user_id, preferences]
  - type: Product
    properties: [product_id, category, price]

relationships:
  - type: PURCHASED
    from: User
    to: Product
    properties: [timestamp, rating]
  - type: VIEWED
    from: User
    to: Product
  - type: SIMILAR_TO
    from: Product
    to: Product
    properties: [similarity_score]

Client Code:

# Collaborative filtering: What did similar users buy?
recommendations = client.graph.traverse(
    start_vertex="user:123",
    steps=[
        # Users who bought the same products
        TraversalStep(direction=OUT, edge_label="PURCHASED"),
        TraversalStep(direction=IN, edge_label="PURCHASED"),
        # What else they bought
        TraversalStep(direction=OUT, edge_label="PURCHASED")
    ],
    filters={"exclude": ["user:123"]},  # Exclude original user
    rank_by="purchase_count",
    limit=20
)

Use Case 4: Fraud Detection Network

Application Problem: Detect fraud rings by analyzing transaction networks and account linkages.

pattern: graph
use_case: fraud-detection

entities:
  - type: Account
    properties: [account_id, created_at, status]
  - type: Transaction
    properties: [amount, timestamp]
  - type: Device
    properties: [device_id, ip_address]

relationships:
  - type: TRANSFERRED_TO
    from: Account
    to: Account
    properties: [amount, timestamp]
  - type: LOGGED_IN_FROM
    from: Account
    to: Device
  - type: LINKED_TO
    from: Account
    to: Account
    properties: [link_type]

Client Code:

# Find potential fraud ring: accounts with circular transfers
rings = client.graph.detect_cycles(
    start_vertex="account:suspicious-123",
    edge_label="TRANSFERRED_TO",
    max_depth=5,
    min_cycle_length=3
)

# PageRank to identify hub accounts
hubs = client.graph.pagerank(
    vertex_filter={"type": "Account"},
    iterations=20,
    damping_factor=0.85,
    limit=10
)

Pattern Definition

Core Operations

The Graph pattern provides these primitive operations:

1. Vertex Operations

syntax = "proto3";
package prism.graph.v1;

// Create vertex
message CreateVertexRequest {
  string id = 1;                      // Unique vertex ID
  string label = 2;                   // Vertex type (e.g., "User", "Product")
  map<string, Value> properties = 3;  // Vertex properties
}

// Read vertex
message GetVertexRequest {
  string id = 1;
  bool include_edges = 2;  // Include connected edges
}

message GetVertexResponse {
  Vertex vertex = 1;
  repeated Edge edges = 2;  // If include_edges=true
}

// Update vertex
message UpdateVertexRequest {
  string id = 1;
  map<string, Value> properties = 2;  // Properties to update
  bool merge = 3;                     // Merge or replace
}

// Delete vertex
message DeleteVertexRequest {
  string id = 1;
  bool cascade = 2;  // Delete connected edges
}

2. Edge Operations

// Create edge
message CreateEdgeRequest {
  string id = 1;                      // Unique edge ID
  string label = 2;                   // Edge type (e.g., "FOLLOWS", "PURCHASED")
  string from_vertex_id = 3;          // Source vertex
  string to_vertex_id = 4;            // Target vertex
  map<string, Value> properties = 5;  // Edge properties
}

// Get edge
message GetEdgeRequest {
  string id = 1;
}

message GetEdgeResponse {
  Edge edge = 1;
}

// Delete edge
message DeleteEdgeRequest {
  string id = 1;
}

3. Traversal Operations

// Traverse graph
message TraverseRequest {
  string start_vertex_id = 1;
  repeated TraversalStep steps = 2;
  optional int32 max_depth = 3;
  optional int32 limit = 4;
  optional VertexFilter vertex_filter = 5;
  optional EdgeFilter edge_filter = 6;
}

message TraversalStep {
  enum Direction {
    OUT = 0;   // Follow outgoing edges
    IN = 1;    // Follow incoming edges
    BOTH = 2;  // Follow both directions
  }

  Direction direction = 1;
  repeated string edge_labels = 2;  // Filter by edge type
  optional int32 min_hops = 3;      // Minimum hops
  optional int32 max_hops = 4;      // Maximum hops
}

message TraverseResponse {
  repeated Vertex vertices = 1;
  repeated Edge edges = 2;
  repeated Path paths = 3;
}

message Path {
  repeated string vertex_ids = 1;
  repeated string edge_ids = 2;
  double weight = 3;  // Path weight (if weighted)
}

4. Pathfinding Operations

// Shortest path
message ShortestPathRequest {
  string start_vertex_id = 1;
  string end_vertex_id = 2;
  optional int32 max_hops = 3;
  optional string weight_property = 4;  // Edge property for weighted paths
  repeated string edge_labels = 5;      // Allowed edge types
}

message ShortestPathResponse {
  Path path = 1;
  double total_weight = 2;
}

// All paths
message AllPathsRequest {
  string start_vertex_id = 1;
  string end_vertex_id = 2;
  int32 max_hops = 3;
  int32 limit = 4;  // Max paths to return
}

message AllPathsResponse {
  repeated Path paths = 1;
}

5. Graph Algorithms

// PageRank
message PageRankRequest {
  optional VertexFilter vertex_filter = 1;
  int32 iterations = 2;       // Number of iterations
  double damping_factor = 3;  // Damping factor (0.85 typical)
  int32 limit = 4;            // Top N vertices
}

message PageRankResponse {
  repeated VertexScore scores = 1;
}

message VertexScore {
  string vertex_id = 1;
  double score = 2;
}

// Community detection
message CommunityDetectionRequest {
  string algorithm = 1;  // "louvain", "label_propagation", "connected_components"
  optional VertexFilter vertex_filter = 2;
}

message CommunityDetectionResponse {
  repeated Community communities = 1;
}

message Community {
  string community_id = 1;
  repeated string vertex_ids = 2;
  double modularity = 3;
}

// Cycle detection
message DetectCyclesRequest {
  string start_vertex_id = 1;
  repeated string edge_labels = 2;
  int32 max_depth = 3;
  int32 min_cycle_length = 4;
}

message DetectCyclesResponse {
  repeated Path cycles = 1;
}
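
The client examples later in this RFC cover traversal, shortest path, cycle detection, and PageRank; for community detection, a Go usage sketch might look like the following (the CommunityDetection method name and generated types are assumptions that mirror the messages above):

// Hypothetical usage sketch for community detection.
resp, err := client.Graph.CommunityDetection(ctx, graph.CommunityDetectionRequest{
    Algorithm:    "louvain",
    VertexFilter: &graph.VertexFilter{Labels: []string{"User"}},
})
if err != nil {
    log.Fatalf("community detection failed: %v", err)
}
for _, c := range resp.Communities {
    fmt.Printf("community %s: %d members (modularity %.3f)\n",
        c.CommunityID, len(c.VertexIDs), c.Modularity)
}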

6. Batch Operations

// Batch create vertices
message BatchCreateVerticesRequest {
  repeated CreateVertexRequest vertices = 1;
  bool atomic = 2;  // All or nothing
}

message BatchCreateVerticesResponse {
  repeated Vertex vertices = 1;
  repeated Error errors = 2;  // Per-vertex errors
}

// Batch create edges
message BatchCreateEdgesRequest {
  repeated CreateEdgeRequest edges = 1;
  bool atomic = 2;
}

message BatchCreateEdgesResponse {
  repeated Edge edges = 1;
  repeated Error errors = 2;
}

Data Model

message Vertex {
  string id = 1;
  string label = 2;
  map<string, Value> properties = 3;
  Timestamp created_at = 4;
  Timestamp updated_at = 5;
}

message Edge {
  string id = 1;
  string label = 2;
  string from_vertex_id = 3;
  string to_vertex_id = 4;
  map<string, Value> properties = 5;
  Timestamp created_at = 6;
}

message Value {
  oneof value {
    string string_value = 1;
    int64 int_value = 2;
    double double_value = 3;
    bool bool_value = 4;
    bytes bytes_value = 5;
    // proto3 does not allow repeated or map fields directly inside a oneof,
    // so lists and maps are carried in wrapper messages.
    ValueList list_value = 6;
    ValueMap map_value = 7;
  }
}

message ValueList {
  repeated Value values = 1;
}

message ValueMap {
  map<string, Value> values = 1;
}

message VertexFilter {
  repeated string labels = 1;  // Filter by vertex types
  map<string, ValuePredicate> property_filters = 2;
}

message EdgeFilter {
  repeated string labels = 1;
  map<string, ValuePredicate> property_filters = 2;
}

message ValuePredicate {
  enum Operator {
    EQ = 0;
    NE = 1;
    LT = 2;
    LE = 3;
    GT = 4;
    GE = 5;
    IN = 6;
    CONTAINS = 7;
    STARTS_WITH = 8;
  }

  Operator operator = 1;
  Value value = 2;
}
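
As an illustration of how the filter messages compose, the following sketch builds "User vertices joined on or after 2024-01-01". The Go struct and enum names are assumed to follow standard protobuf code generation; only the field names come from the messages above:

// Sketch: VertexFilter with a GE predicate on the "joined" property.
filter := &graph.VertexFilter{
    Labels: []string{"User"},
    PropertyFilters: map[string]*graph.ValuePredicate{
        "joined": {
            Operator: graph.ValuePredicate_GE,
            Value:    &graph.Value{Value: &graph.Value_StringValue{StringValue: "2024-01-01"}},
        },
    },
}
// The filter can then be attached to TraverseRequest.vertex_filter or PageRankRequest.vertex_filter.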

Architecture

Three-Layer Model

┌─────────────────────────────────────────────────────────┐
│ Layer 3: Client API (Graph Pattern) │
│ Traverse | ShortestPath | PageRank | CreateVertex │
│ "I want to traverse 2 hops from Alice" │
└─────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────┐
│ Layer 2: Pattern Composition │
│ Cache | Query Optimizer | Result Materialization │
│ "Cache traversal results in Redis" │
└─────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────┐
│ Layer 1: Backend Execution │
│ Neptune (Gremlin) | Neo4j (Cypher) | JanusGraph │
│ "Translate to Gremlin and execute on Neptune" │
└─────────────────────────────────────────────────────────┘

Backend Slot Architecture

The Graph pattern can be backed by different graph databases:

| Backend | Query Language | Strengths | Use When |
|---|---|---|---|
| AWS Neptune | Gremlin, SPARQL | Fully managed, AWS integration | Production, AWS deployments |
| Neo4j | Cypher | Rich query language, large community | Self-hosted, advanced features |
| JanusGraph | Gremlin | Open source, multi-backend | Large-scale, custom storage |
| ArangoDB | AQL | Multi-model (graph + document) | Flexible data model |
| SQLite (local) | SQL emulation | Embedded, fast startup | Development, testing |

Configuration Examples

Example 1: High-Performance In-Memory Graph (MemStore)

namespaces:
  - name: social-graph
    pattern: graph

    # Backend: MemStore (in-memory, lock-free)
    backend:
      type: memstore
      max_vertices: 100_000_000  # 100M vertices
      max_edges: 1_000_000_000   # 1B edges

    # Fast snapshot loading
    snapshot:
      enabled: true
      format: protobuf  # Or jsonlines
      source: s3://snapshots/social-graph-20251114.pb.gz
      compression: gzip
      parallel_workers: 100  # 100 parallel ingest workers

    # Performance characteristics
    performance:
      traversal_timeout: 100ms
      max_traversal_depth: 6
      batch_size: 1000

Characteristics:

  • 100M vertices, 1B edges in-memory
  • Load 10GB snapshot in ~3 seconds (100 workers)
  • 50 μs traversal latency
  • Lock-free concurrent writes
  • Zero coordination overhead

Use cases: Real-time fraud detection, social network analysis, recommendation engines

Example 2: Distributed Multi-Proxy Graph Cluster

# Configuration for each proxy in 10-node cluster
namespaces:
  - name: social-graph-distributed
    pattern: graph

    backend:
      type: memstore
      max_vertices: 100_000_000  # 100M per proxy = 1B total

    # Partitioning strategy
    partitioning:
      enabled: true
      strategy: consistent_hash
      num_partitions: 10
      partition_key: vertex_id

    # Snapshot loading (each proxy loads its partition)
    snapshot:
      enabled: true
      source: s3://snapshots/social-graph-partition-${PARTITION_ID}.pb.gz
      parallel_workers: 50

    # Cross-proxy communication
    cluster:
      enabled: true
      proxies:
        - host: proxy-0.prism.local:50090
        - host: proxy-1.prism.local:50090
        - host: proxy-2.prism.local:50090
        # ... (10 proxies total)
      routing_strategy: consistent_hash

Characteristics:

  • 10 proxy instances
  • 1 billion vertices total (100M per proxy)
  • Consistent hashing for vertex distribution
  • Cross-proxy traversal support
  • 100k traversals/sec aggregate throughput

Use cases: Large-scale social networks, knowledge graphs, global recommendation systems

Example 3: Snapshot-to-Production Pipeline

# CI/CD pipeline: Build graph snapshot → Load to production
build_snapshot:
  # Step 1: Export from source databases
  sources:
    - type: postgres
      query: "SELECT id, properties FROM users"
      output: vertices-users.jsonl

    - type: postgres
      query: "SELECT from_id, to_id FROM friendships"
      output: edges-friendships.jsonl

  # Step 2: Merge and convert to protobuf
  merge:
    input: ["vertices-*.jsonl", "edges-*.jsonl"]
    output: social-graph-snapshot.pb
    compression: gzip

  # Step 3: Upload to S3
  upload:
    destination: s3://snapshots/social-graph-${DATE}.pb.gz

deploy_to_production:
  # Step 4: Prism proxy loads snapshot on startup
  namespace: social-graph
  backend: memstore
  snapshot:
    source: s3://snapshots/social-graph-${DATE}.pb.gz
    parallel_workers: 100
    validation:
      check_vertex_count: true
      check_edge_count: true
      expected_vertices: 100000000
      expected_edges: 1000000000

Characteristics:

  • Automated snapshot generation from source databases
  • Fast binary format (protobuf + gzip)
  • Validation before serving traffic
  • Blue-green deployment (load new snapshot, switch traffic)

Use cases: Daily graph updates, batch ETL pipelines, staging environments

Example 4: Development with Local MemStore

namespaces:
  - name: social-graph-dev
    pattern: graph

    backend:
      type: memstore
      max_vertices: 10000  # Small dev dataset

    # Seed data for testing
    seed:
      enabled: true
      format: jsonlines
      source: ./test/fixtures/social-graph-seed.jsonl

    # Faster iteration during development
    performance:
      snapshot_loading_timeout: 5s
      parallel_workers: 4

Characteristics:

  • Small dataset for fast iteration
  • Local file-based seed data
  • Same API as production
  • <1 second startup

Use cases: Unit tests, integration tests, local development

Example 5: Production-Grade with Distributed WAL + MemStore + Neptune

namespaces:
  - name: social-graph-production
    pattern: graph

    # Layer 1: Distributed WAL (durability + ordering)
    wal:
      enabled: true
      backend: kafka
      topic: graph-wal
      partitions: 100
      replication_factor: 3
      retention: 7d
      acks: all  # Wait for all replicas before ack

    # Layer 2: MemStore (hot read path)
    backend:
      type: memstore
      max_vertices: 100_000_000
      max_edges: 1_000_000_000

    # Layer 3: Neptune (cold storage, source of truth)
    persistence:
      enabled: true
      type: neptune
      cluster_endpoint: my-cluster.us-east-1.neptune.amazonaws.com
      region: us-east-1
      iam_auth: true

    # Write path: WAL → MemStore + Neptune
    write_strategy:
      mode: wal_first  # Append to WAL before ack
      async_apply: true
      memstore_workers: 10  # Workers applying WAL to MemStore
      neptune_workers: 5    # Workers applying WAL to Neptune
      batch_size: 1000

    # Read path: MemStore → Neptune fallback
    read_strategy:
      mode: memstore_first
      fallback_to_neptune: true
      fallback_timeout: 100ms

    # Recovery strategy
    recovery:
      source: wal                # Replay WAL on startup
      checkpoint_interval: 3600  # Checkpoint every hour
      max_replay_duration: 600s  # 10 min replay limit

Characteristics:

  • Write latency: 5ms (Kafka WAL append)
  • Read latency: 50 μs (MemStore) or 30ms (Neptune fallback)
  • Durability: Guaranteed (WAL with acks=all)
  • Consistency: Strong (ordered WAL)
  • Recovery: Replay WAL from last checkpoint
  • Throughput: 100k writes/sec, 1M reads/sec

Use cases: Production systems requiring durability, disaster recovery, audit trails, compliance

Pattern Composition

Graph + Cache Pattern

Use Case: Frequently accessed subgraphs (user profiles, friend lists)

namespaces:
  - name: social-graph
    pattern: graph

    backend:
      type: neptune

    # Layer 2: Look-aside cache
    patterns:
      - type: cache
        strategy: look-aside
        backend: redis
        ttl: 300
        operations: [GetVertex, Traverse]
        key_pattern: "graph:{operation}:{params_hash}"
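
A minimal sketch of how the key_pattern above could be realized, assuming the params hash is a SHA-256 over the serialized request (the exact hashing scheme is not specified by this RFC):

package cachekey

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"

    "google.golang.org/protobuf/proto"
)

// CacheKey builds "graph:{operation}:{params_hash}" for a look-aside cache entry.
// NOTE: proto.Marshal output is not guaranteed deterministic across library
// versions; a production implementation would use a canonical serialization.
func CacheKey(operation string, req proto.Message) (string, error) {
    raw, err := proto.Marshal(req)
    if err != nil {
        return "", err
    }
    sum := sha256.Sum256(raw)
    return fmt.Sprintf("graph:%s:%s", operation, hex.EncodeToString(sum[:])), nil
}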

Data Flow:

sequenceDiagram
participant App
participant Prism
participant Cache as Redis Cache
participant Neptune as Neptune Backend

App->>Prism: Traverse(user:alice, 2 hops)
Prism->>Cache: Check cache key

alt Cache Hit
Cache-->>Prism: Cached traversal result
Prism-->>App: Return vertices
else Cache Miss
Prism->>Neptune: Execute Gremlin traversal
Neptune-->>Prism: Vertices [Bob, Charlie, Dave]
Prism->>Cache: Store result (TTL=300s)
Prism-->>App: Return vertices
end

Graph + CDC Pattern

Use Case: Sync graph changes to search index (Elasticsearch)

namespaces:
  - name: knowledge-graph
    pattern: graph

    backend:
      type: neptune

    # CDC to stream graph changes
    cdc:
      enabled: true
      destination: kafka
      topic: graph-changes
      operations: [CreateVertex, UpdateVertex, DeleteVertex, CreateEdge, DeleteEdge]

    # Consumer: Elasticsearch indexer
    consumers:
      - name: es-indexer
        type: custom
        backend: elasticsearch
        index: knowledge-graph
        mapping:
          vertex: document
          edge: relationship

Benefits:

  • Keep search index synchronized with graph
  • Full-text search on graph data
  • No manual sync code

Graph + Query Optimizer Pattern

Use Case: Automatically optimize complex traversals

namespaces:
  - name: social-graph
    pattern: graph

    backend:
      type: neptune

    patterns:
      - type: query-optimizer
        strategies:
          - name: index-hints
            enabled: true
          - name: traversal-pruning
            enabled: true
            max_branches: 100
          - name: result-streaming
            enabled: true
            batch_size: 50

Optimizations:

  • Add index hints for vertex lookups
  • Prune traversals with too many branches (see the sketch after this list)
  • Stream large result sets instead of materializing
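
A sketch of the traversal-pruning idea (the max_branches cap above), independent of backend: when a BFS frontier exceeds the configured branch budget, it is truncated before the next hop. This is illustrative, not the optimizer's actual implementation:

// pruneFrontier caps a BFS frontier at maxBranches vertices before the next hop.
func pruneFrontier(frontier []string, maxBranches int) []string {
    if maxBranches <= 0 || len(frontier) <= maxBranches {
        return frontier
    }
    return frontier[:maxBranches]
}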

Client API Examples

⚠️ NOTE: Python examples below are for illustration only. Production client libraries will be in Go (primary) and Rust (secondary).

Python Client API (Conceptual)

from prism import Client, TraversalStep, Direction

client = Client(namespace="social-graph")

# Create vertices
alice = client.graph.create_vertex(
    id="user:alice",
    label="User",
    properties={"name": "Alice", "joined": "2024-01-01"}
)

bob = client.graph.create_vertex(
    id="user:bob",
    label="User",
    properties={"name": "Bob", "joined": "2024-02-01"}
)

# Create edge
client.graph.create_edge(
    id="follow:1",
    label="FOLLOWS",
    from_vertex_id="user:alice",
    to_vertex_id="user:bob",
    properties={"since": "2024-03-01"}
)

# Traverse: Find friends of friends
fof = client.graph.traverse(
    start_vertex_id="user:alice",
    steps=[
        TraversalStep(direction=Direction.OUT, edge_labels=["FOLLOWS"]),
        TraversalStep(direction=Direction.OUT, edge_labels=["FOLLOWS"])
    ],
    limit=10
)

print(f"Friends of friends: {[v.id for v in fof.vertices]}")

# Shortest path
path = client.graph.shortest_path(
    start_vertex_id="user:alice",
    end_vertex_id="user:charlie",
    max_hops=6
)

print(f"Path length: {len(path.vertex_ids)}")

# PageRank
influential = client.graph.pagerank(
    vertex_filter={"labels": ["User"]},
    iterations=20,
    damping_factor=0.85,
    limit=10
)

for score in influential.scores:
    print(f"{score.vertex_id}: {score.score}")

Go Client API

import (
    "context"
    "fmt"
    "log"

    prism "github.com/prism/client-go" // module path illustrative
    "github.com/prism/client-go/graph"
)

func main() {
    ctx := context.Background()
    client := prism.NewClient("social-graph")

    // Create vertex
    alice, err := client.Graph.CreateVertex(ctx, graph.CreateVertexRequest{
        ID:    "user:alice",
        Label: "User",
        Properties: map[string]interface{}{
            "name":   "Alice",
            "joined": "2024-01-01",
        },
    })
    if err != nil {
        log.Fatalf("create vertex: %v", err)
    }
    fmt.Printf("Created vertex: %s\n", alice.ID)

    // Traverse
    result, err := client.Graph.Traverse(ctx, graph.TraverseRequest{
        StartVertexID: "user:alice",
        Steps: []graph.TraversalStep{
            {Direction: graph.DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
            {Direction: graph.DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
        },
        Limit: 10,
    })
    if err != nil {
        log.Fatalf("traverse: %v", err)
    }

    for _, vertex := range result.Vertices {
        fmt.Printf("Friend of friend: %s\n", vertex.ID)
    }

    // Shortest path
    path, err := client.Graph.ShortestPath(ctx, graph.ShortestPathRequest{
        StartVertexID: "user:alice",
        EndVertexID:   "user:charlie",
        MaxHops:       6,
    })
    if err != nil {
        log.Fatalf("shortest path: %v", err)
    }

    fmt.Printf("Path length: %d hops\n", len(path.VertexIDs))
}

Rust Client API

use prism::{Client, graph::{CreateVertexRequest, TraverseRequest, TraversalStep, Direction}};

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::new("social-graph").await?;

    // Create vertex (hashmap! assumed from e.g. the maplit crate)
    let alice = client.graph.create_vertex(CreateVertexRequest {
        id: "user:alice".into(),
        label: "User".into(),
        properties: hashmap! {
            "name" => "Alice".into(),
            "joined" => "2024-01-01".into(),
        },
    }).await?;

    // Traverse
    let result = client.graph.traverse(TraverseRequest {
        start_vertex_id: "user:alice".into(),
        steps: vec![
            TraversalStep {
                direction: Direction::Out,
                edge_labels: vec!["FOLLOWS".into()],
                ..Default::default()
            },
            TraversalStep {
                direction: Direction::Out,
                edge_labels: vec!["FOLLOWS".into()],
                ..Default::default()
            },
        ],
        limit: Some(10),
        ..Default::default()
    }).await?;

    for vertex in result.vertices {
        println!("Friend of friend: {}", vertex.id);
    }

    Ok(())
}

Distributed In-Memory Graph with MemStore

Architecture Overview

The Graph pattern uses MemStore as its backend for ultra-fast, lock-free, distributed graph construction and traversal. This enables building graphs from multi-megabyte snapshots in parallel without coordination overhead.

MemStore Graph Encoding

Vertices stored as hashes:

Key: "v:{vertex_id}"
Hash fields:
- "label" → vertex type (e.g., "User")
- "name" → property value
- "email" → property value
- ... (arbitrary properties)

Outgoing edges stored as sets:

Key: "v:{vertex_id}:out:{edge_label}"
Set members: ["{target_id_1}", "{target_id_2}", ...]

Incoming edges stored as sets (for bidirectional traversal):

Key: "v:{vertex_id}:in:{edge_label}"
Set members: ["{source_id_1}", "{source_id_2}", ...]

Edge properties stored as hashes:

Key: "e:{edge_id}"
Hash fields:
- "from" → source vertex ID
- "to" → target vertex ID
- "label" → edge type
- ... (edge properties)

Example Graph Encoding

Social graph:

Alice (user:alice):
v:user:alice → {label: "User", name: "Alice", joined: "2024-01-01"}
v:user:alice:out:FOLLOWS → ["user:bob", "user:charlie"]
v:user:alice:in:FOLLOWS → ["user:dave"]

Bob (user:bob):
v:user:bob → {label: "User", name: "Bob", joined: "2024-02-01"}
v:user:bob:out:FOLLOWS → ["user:charlie"]
v:user:bob:in:FOLLOWS → ["user:alice"]

Edge (follow:1):
e:follow:1 → {from: "user:alice", to: "user:bob", label: "FOLLOWS", since: "2024-03-01"}

Why MemStore for Graphs?

  1. Lock-Free Concurrency: sync.Map enables thousands of concurrent writes (see the sketch after this list)
  2. Zero Coordination: No distributed locks, no leader election
  3. Instant Reads: In-memory, sub-microsecond access
  4. Simple Encoding: Hash/Set operations map naturally to graph structure
  5. Horizontal Scaling: Each Prism proxy instance has independent memstore
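
A minimal sketch of the lock-free set primitive this relies on, assuming a sync.Map-backed store (the real MemStore plugin is more elaborate):

package memstoresketch

import "sync"

// edgeSet is a lock-free set of neighbor IDs backed by sync.Map, illustrating
// how SAdd/SMembers can run concurrently without explicit locking.
type edgeSet struct {
    members sync.Map // neighbor vertex ID -> struct{}
}

func (s *edgeSet) SAdd(ids ...string) {
    for _, id := range ids {
        s.members.Store(id, struct{}{})
    }
}

func (s *edgeSet) SMembers() []string {
    var out []string
    s.members.Range(func(k, _ any) bool {
        out = append(out, k.(string))
        return true
    })
    return out
}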

Fast Parallel Graph Construction from Snapshots

Problem: Loading Multi-Megabyte Snapshots

Scenario: Load 10GB social graph snapshot with 100M vertices and 1B edges in <60 seconds.

Traditional Approach (Sequential):

Parse snapshot → For each vertex → Write to DB → For each edge → Write to DB
Time: 100M vertices × 0.1ms = 10,000 seconds (2.7 hours)

Prism Map-Reduce Approach (Parallel):

1. Partition snapshot by vertex ID ranges
2. Map phase: N workers read partitions in parallel
3. Each worker writes to memstore (lock-free)
4. No reduce phase needed (writes are independent)

Time: 10GB / 100 workers / 100 MB/s = 1 second (read) + 2 seconds (write) = 3 seconds

Partitioning Strategy

Snapshot format (JSON lines):

{"type":"vertex","id":"user:000001","label":"User","properties":{"name":"Alice"}}
{"type":"edge","id":"follow:000001","from":"user:000001","to":"user:000002","label":"FOLLOWS"}
{"type":"vertex","id":"user:000002","label":"User","properties":{"name":"Bob"}}
...

Partition by hash (consistent hashing):

Partition 0: vertices with hash(id) % 100 == 0
Partition 1: vertices with hash(id) % 100 == 1
...
Partition 99: vertices with hash(id) % 100 == 99

Each worker:

  1. Reads assigned partition from snapshot file (byte ranges)
  2. Parses vertices/edges
  3. Writes to memstore (no locks!)

Map Job Implementation

// Parallel snapshot ingestion with map workers
func IngestSnapshot(snapshotPath string, numWorkers int, client *prism.Client) error {
    ctx := context.Background()

    file, err := os.Open(snapshotPath)
    if err != nil {
        return err
    }
    defer file.Close()

    fileInfo, err := file.Stat()
    if err != nil {
        return err
    }
    fileSize := fileInfo.Size()

    // Partition file into byte ranges.
    // NOTE: a production implementation must align chunk boundaries to newline
    // boundaries so no worker starts or ends mid-record; omitted here for brevity.
    chunkSize := fileSize / int64(numWorkers)
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        startOffset := int64(i) * chunkSize
        endOffset := startOffset + chunkSize
        if i == numWorkers-1 {
            endOffset = fileSize // Last worker gets remainder
        }

        go func(workerID int, start, end int64) {
            defer wg.Done()

            // Read partition
            reader := bufio.NewReader(io.NewSectionReader(file, start, end-start))

            // Batch writes for efficiency
            vertexBatch := make([]CreateVertexRequest, 0, 1000)
            edgeBatch := make([]CreateEdgeRequest, 0, 1000)

            scanner := bufio.NewScanner(reader)
            for scanner.Scan() {
                line := scanner.Text()
                var obj map[string]interface{}
                if err := json.Unmarshal([]byte(line), &obj); err != nil {
                    continue // Skip malformed or truncated lines
                }

                if obj["type"] == "vertex" {
                    props, _ := obj["properties"].(map[string]interface{})
                    vertexBatch = append(vertexBatch, CreateVertexRequest{
                        ID:         obj["id"].(string),
                        Label:      obj["label"].(string),
                        Properties: props,
                    })

                    if len(vertexBatch) >= 1000 {
                        client.Graph.BatchCreateVertices(ctx, vertexBatch)
                        vertexBatch = vertexBatch[:0]
                    }
                } else if obj["type"] == "edge" {
                    props, _ := obj["properties"].(map[string]interface{})
                    edgeBatch = append(edgeBatch, CreateEdgeRequest{
                        ID:           obj["id"].(string),
                        Label:        obj["label"].(string),
                        FromVertexID: obj["from"].(string),
                        ToVertexID:   obj["to"].(string),
                        Properties:   props,
                    })

                    if len(edgeBatch) >= 1000 {
                        client.Graph.BatchCreateEdges(ctx, edgeBatch)
                        edgeBatch = edgeBatch[:0]
                    }
                }
            }

            // Flush remaining batches
            if len(vertexBatch) > 0 {
                client.Graph.BatchCreateVertices(ctx, vertexBatch)
            }
            if len(edgeBatch) > 0 {
                client.Graph.BatchCreateEdges(ctx, edgeBatch)
            }

            fmt.Printf("Worker %d: processed bytes %d-%d\n", workerID, start, end)
        }(i, startOffset, endOffset)
    }

    wg.Wait()
    return nil
}

Proxy MemStore Implementation

BatchCreateVertices (lock-free):

func (g *GraphPattern) BatchCreateVertices(ctx context.Context, vertices []CreateVertexRequest) error {
    // No locks needed - sync.Map handles concurrency
    for _, v := range vertices {
        // Store vertex properties as hash
        properties := make(map[string][]byte)
        properties["label"] = []byte(v.Label)
        for k, val := range v.Properties {
            properties[k] = serializeValue(val)
        }

        // Write to memstore (lock-free)
        memstore.HMSet(ctx, fmt.Sprintf("v:%s", v.ID), properties)
    }

    return nil
}

BatchCreateEdges (lock-free):

func (g *GraphPattern) BatchCreateEdges(ctx context.Context, edges []CreateEdgeRequest) error {
    for _, e := range edges {
        // Add to outgoing edge set (lock-free)
        outKey := fmt.Sprintf("v:%s:out:%s", e.FromVertexID, e.Label)
        memstore.SAdd(ctx, outKey, []string{e.ToVertexID})

        // Add to incoming edge set (lock-free)
        inKey := fmt.Sprintf("v:%s:in:%s", e.ToVertexID, e.Label)
        memstore.SAdd(ctx, inKey, []string{e.FromVertexID})

        // Store edge properties if any
        if len(e.Properties) > 0 {
            properties := make(map[string][]byte)
            properties["from"] = []byte(e.FromVertexID)
            properties["to"] = []byte(e.ToVertexID)
            properties["label"] = []byte(e.Label)
            for k, val := range e.Properties {
                properties[k] = serializeValue(val)
            }
            memstore.HMSet(ctx, fmt.Sprintf("e:%s", e.ID), properties)
        }
    }

    return nil
}

Performance Characteristics

Single-threaded baseline:

  • Create vertex: 10 μs
  • Create edge: 15 μs
  • 100M vertices: 1000 seconds
  • 1B edges: 15000 seconds
  • Total: ~4.4 hours

100-worker parallel (memstore):

  • Throughput: 10M vertices/sec (100 workers × 100k vertices/sec)
  • 100M vertices: 10 seconds
  • 1B edges: 100 seconds
  • Total: ~2 minutes (130× speedup)

Key factors:

  1. Lock-free writes (no contention)
  2. Partitioned workload (no coordination)
  3. In-memory writes (no disk I/O)
  4. Batch operations (1000 per call)

Graph Traversal with MemStore

Traverse operation (2 hops):

func (g *GraphPattern) Traverse(ctx context.Context, req *TraverseRequest) (*TraverseResponse, error) {
    currentLevel := []string{req.StartVertexID}
    visited := make(map[string]bool)

    for _, step := range req.Steps {
        nextLevel := []string{}

        for _, vertexID := range currentLevel {
            if visited[vertexID] {
                continue // Skip already visited
            }
            visited[vertexID] = true

            // Get outgoing edges from memstore (set operation)
            var edgeKey string
            if step.Direction == DirectionOUT {
                edgeKey = fmt.Sprintf("v:%s:out:%s", vertexID, step.EdgeLabel)
            } else {
                edgeKey = fmt.Sprintf("v:%s:in:%s", vertexID, step.EdgeLabel)
            }

            // SMembers is lock-free read
            neighbors, _ := memstore.SMembers(ctx, edgeKey)
            nextLevel = append(nextLevel, neighbors...)
        }

        currentLevel = nextLevel
    }

    // Fetch vertex details (parallel)
    vertices := make([]Vertex, 0, len(currentLevel))
    for _, vertexID := range currentLevel {
        properties, _ := memstore.HGetAll(ctx, fmt.Sprintf("v:%s", vertexID))
        vertices = append(vertices, Vertex{
            ID:         vertexID,
            Label:      string(properties["label"]),
            Properties: deserializeProperties(properties),
        })

        if len(vertices) >= int(req.Limit) {
            break
        }
    }

    return &TraverseResponse{Vertices: vertices}, nil
}

Latency (in-memory):

  • 1-hop traversal: 50 μs
  • 2-hop traversal: 200 μs
  • 3-hop traversal: 1 ms

Distributed Graph Across Multiple Proxies

Scenario: 1 billion vertices across 10 Prism proxy instances.

Partitioning strategy (consistent hashing):

Proxy 0: Vertices with hash(id) % 10 == 0 (100M vertices)
Proxy 1: Vertices with hash(id) % 10 == 1 (100M vertices)
...
Proxy 9: Vertices with hash(id) % 10 == 9 (100M vertices)

Query routing:

func (c *Client) GetVertex(ctx context.Context, vertexID string) (*Vertex, error) {
    // Determine which proxy owns this vertex
    proxyID := hash(vertexID) % numProxies
    proxy := c.proxies[proxyID]

    // Route query to owning proxy
    return proxy.Graph.GetVertex(ctx, vertexID)
}

Multi-hop traversal (cross-proxy):

func (c *Client) Traverse(ctx context.Context, req *TraverseRequest) (*TraverseResponse, error) {
    currentLevel := []string{req.StartVertexID}

    for _, step := range req.Steps {
        // Group vertices by owning proxy
        verticesByProxy := groupByProxy(currentLevel)

        // Fan-out to all proxies in parallel
        var wg sync.WaitGroup
        results := make(chan []string, len(verticesByProxy))

        for proxyID, vertexIDs := range verticesByProxy {
            wg.Add(1)
            go func(pID int, vIDs []string) {
                defer wg.Done()
                // Each proxy traverses its local vertices
                neighbors := c.proxies[pID].Graph.TraverseLocal(ctx, vIDs, step)
                results <- neighbors
            }(proxyID, vertexIDs)
        }

        wg.Wait()
        close(results)

        // Merge results from all proxies
        nextLevel := []string{}
        for neighbors := range results {
            nextLevel = append(nextLevel, neighbors...)
        }

        currentLevel = nextLevel
    }

    return &TraverseResponse{Vertices: fetchVertexDetails(currentLevel)}, nil
}

Performance (10 proxy cluster):

  • Single proxy query: 50 μs
  • Cross-proxy 2-hop: 300 μs (includes network round-trip)
  • Aggregate throughput: 100k traversals/sec (10k per proxy × 10 proxies)

Snapshot Format Specification

Efficient binary format (for fast parsing):

message GraphSnapshot {
  repeated VertexRecord vertices = 1;
  repeated EdgeRecord edges = 2;
}

message VertexRecord {
  string id = 1;
  string label = 2;
  map<string, bytes> properties = 3;
}

message EdgeRecord {
  string id = 1;
  string label = 2;
  string from_vertex_id = 3;
  string to_vertex_id = 4;
  map<string, bytes> properties = 5;
}

Streaming format (line-delimited protobuf):

Each line: varint(message_length) + serialized_protobuf_message
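
A sketch of reading and writing this framing with the standard library (varint length prefix followed by the message bytes); the record types are the generated GraphSnapshot messages, and google.golang.org/protobuf handles encoding:

package snapshotstream

import (
    "bufio"
    "encoding/binary"
    "fmt"
    "io"

    "google.golang.org/protobuf/proto"
)

// ReadRecord reads one varint-length-prefixed protobuf message from r into msg.
// Returns io.EOF when the stream ends at a record boundary.
func ReadRecord(r *bufio.Reader, msg proto.Message) error {
    length, err := binary.ReadUvarint(r)
    if err != nil {
        return err
    }
    buf := make([]byte, length)
    if _, err := io.ReadFull(r, buf); err != nil {
        return fmt.Errorf("truncated record: %w", err)
    }
    return proto.Unmarshal(buf, msg)
}

// WriteRecord writes msg with a varint length prefix.
func WriteRecord(w io.Writer, msg proto.Message) error {
    raw, err := proto.Marshal(msg)
    if err != nil {
        return err
    }
    var prefix [binary.MaxVarintLen64]byte
    n := binary.PutUvarint(prefix[:], uint64(len(raw)))
    if _, err := w.Write(prefix[:n]); err != nil {
        return err
    }
    _, err = w.Write(raw)
    return err
}

A snapshot writer emits one VertexRecord or EdgeRecord per WriteRecord call; a loader calls ReadRecord in a loop until io.EOF.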

Advantages:

  1. Fast parsing (no JSON overhead)
  2. Type-safe (protobuf schema)
  3. Seekable (can jump to byte offsets)
  4. Compressible (gzip/zstd)

Distributed WAL Architecture (Production-Grade)

Three-Layer Architecture

┌─────────────────────────────────────────────────────────────┐
│ Client Applications │
└────────────────────┬────────────────────────────────────────┘
│ CreateVertex, CreateEdge, Traverse

┌─────────────────────────────────────────────────────────────┐
│ Prism Proxy Cluster (10 nodes) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Layer 1: Distributed WAL (Kafka) │ │
│ │ - Append writes to partitioned topic │ │
│ │ - Acks=all (replicated to 3 brokers) │ │
│ │ - Guarantees: Durability + Ordering │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ │ Background Workers │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Layer 2: MemStore (In-Memory) │ │
│ │ - Hot read path (50 μs latency) │ │
│ │ - Lock-free concurrent access │ │
│ │ - 100M vertices per proxy │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ │ Async replication │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Layer 3: Neptune (Persistent Storage) │ │
│ │ - Source of truth for graph │ │
│ │ - Fallback for cache misses │ │
│ │ - Snapshot generation │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

Write Path (WAL-First)

sequenceDiagram
participant Client
participant Proxy as Prism Proxy
participant WAL as Kafka WAL
participant MemStore
participant Neptune

Note over Client,Neptune: Write Path (CreateVertex)

Client->>Proxy: CreateVertex(id, label, properties)

Note over Proxy: 1. Validate request
Proxy->>Proxy: Check schema, validate properties

Note over Proxy,WAL: 2. Append to WAL (durable)
Proxy->>WAL: Produce(topic="graph-wal", key=vertex_id, value=operation)
WAL->>WAL: Replicate to 3 brokers (acks=all)
WAL-->>Proxy: Offset 12345678

Note over Proxy: 3. Acknowledge to client (write is durable)
Proxy-->>Client: Success (latency: 5ms)

Note over Proxy,MemStore: 4. Background: Apply to MemStore
Proxy->>MemStore: HMSet(v:vertex_id, properties)
MemStore-->>Proxy: OK

Note over Proxy,Neptune: 5. Background: Replicate to Neptune
Proxy->>Neptune: g.addV(label).property(id, vertex_id)...
Neptune-->>Proxy: OK

Note over Proxy: 6. Checkpoint WAL offset
Proxy->>Proxy: Save offset 12345678 to checkpoint

Key guarantees:

  1. Durability: Write acknowledged only after WAL replication (acks=all)
  2. No data loss: WAL persisted before ack, survives proxy crashes
  3. Ordering: Kafka partition ordering ensures sequential application
  4. Async apply: MemStore/Neptune updates don't block client
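
A sketch of the WAL-first handler on the proxy side, assuming a Kafka producer configured with acks=all that blocks until the append is replicated (the producer interface is hypothetical, and the WALOperation field layout follows the simplified struct style used in the other sketches in this RFC):

// walProducer abstracts a Kafka producer with acks=all; any client library
// offering a synchronous produce call fits this shape (interface is illustrative).
type walProducer interface {
    Produce(topic string, key, value []byte) (offset int64, err error)
}

// CreateVertex appends the operation to the WAL, then acks the client.
// MemStore and Neptune application happens in the background consumers below.
func (p *Proxy) CreateVertex(ctx context.Context, req *CreateVertexRequest) error {
    // 1. Validate request (schema, properties)
    if err := p.validate(req); err != nil {
        return err
    }

    // 2. Durable append: blocks until replicated to all in-sync replicas
    op := &WALOperation{
        Type:      OperationType_CREATE_VERTEX,
        Timestamp: time.Now().UnixMilli(),
        Vertex:    &VertexRecord{ID: req.ID, Label: req.Label}, // properties omitted for brevity
    }
    raw, err := proto.Marshal(op)
    if err != nil {
        return err
    }
    if _, err := p.wal.Produce("graph-wal", []byte(req.ID), raw); err != nil {
        return fmt.Errorf("wal append failed: %w", err)
    }

    // 3. Ack to client here; the write is durable even if this proxy crashes now.
    return nil
}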

Read Path (MemStore-First with Fallback)

sequenceDiagram
participant Client
participant Proxy as Prism Proxy
participant MemStore
participant Neptune

Note over Client,Neptune: Read Path (GetVertex)

Client->>Proxy: GetVertex(id="user:alice")

Note over Proxy,MemStore: 1. Try MemStore (hot path)
Proxy->>MemStore: HGetAll(v:user:alice)

alt Vertex in MemStore (99.9% case)
MemStore-->>Proxy: {label: "User", name: "Alice", ...}
Proxy-->>Client: Vertex (latency: 50 μs)
else Cache miss (0.1% case)
MemStore-->>Proxy: nil (not found)

Note over Proxy,Neptune: 2. Fallback to Neptune
Proxy->>Neptune: g.V('user:alice').valueMap()
Neptune-->>Proxy: {label: "User", name: "Alice", ...}

Note over Proxy,MemStore: 3. Populate MemStore for next read
Proxy->>MemStore: HMSet(v:user:alice, properties)

Proxy-->>Client: Vertex (latency: 30ms)
end

Read characteristics:

  • 99.9% cache hit rate: Most reads served from MemStore (50 μs)
  • 0.1% cache miss: Fallback to Neptune, then populate MemStore
  • No blocking: Reads never wait for WAL application
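
The corresponding read path as a sketch, using the memstore accessors from the other sketches in this RFC (the neptune accessor and vertexFromProps helper are hypothetical):

// GetVertex serves reads MemStore-first and falls back to Neptune on a miss,
// repopulating MemStore so the next read stays on the hot path.
func (p *Proxy) GetVertex(ctx context.Context, id string) (*Vertex, error) {
    key := fmt.Sprintf("v:%s", id)

    // 1. Hot path: in-memory hash lookup (~50 μs)
    if props, err := p.memstore.HGetAll(ctx, key); err == nil && len(props) > 0 {
        return vertexFromProps(id, props), nil // hypothetical helper
    }

    // 2. Fallback: query Neptune, the source of truth (~30 ms)
    vertex, err := p.neptune.GetVertex(ctx, id)
    if err != nil {
        return nil, err
    }

    // 3. Populate MemStore for subsequent reads
    p.memstore.HMSet(ctx, key, serializeProperties(vertex.Properties))
    return vertex, nil
}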

WAL Consumer Architecture

Per-proxy consumers (pull model):

// Each proxy runs WAL consumer workers
func (p *Proxy) StartWALConsumers() {
    // Consumer group: Each proxy consumes from assigned partitions
    consumer := kafka.NewConsumer(kafka.ConsumerConfig{
        GroupID:    fmt.Sprintf("graph-wal-consumer-proxy-%d", p.ID),
        Topics:     []string{"graph-wal"},
        AutoCommit: false, // Manual commit after apply
    })

    // Start multiple workers for parallel processing
    for i := 0; i < p.Config.WAL.MemStoreWorkers; i++ {
        go p.applyWALToMemStore(consumer)
    }

    for i := 0; i < p.Config.WAL.NeptuneWorkers; i++ {
        go p.applyWALToNeptune(consumer)
    }
}

func (p *Proxy) applyWALToMemStore(consumer *kafka.Consumer) {
    batch := make([]*WALOperation, 0, 1000)
    lastFlush := time.Now()

    for msg := range consumer.Messages() {
        op := &WALOperation{}
        proto.Unmarshal(msg.Value, op)

        batch = append(batch, op)

        // Apply batch every 1000 operations or 100ms
        if len(batch) >= 1000 || time.Since(lastFlush) > 100*time.Millisecond {
            p.applyBatchToMemStore(batch)
            consumer.CommitOffset(msg.Offset)
            batch = batch[:0]
            lastFlush = time.Now()
        }
    }
}

func (p *Proxy) applyBatchToMemStore(batch []*WALOperation) {
    for _, op := range batch {
        switch op.Type {
        case OperationType_CREATE_VERTEX:
            p.memstore.HMSet(
                fmt.Sprintf("v:%s", op.Vertex.ID),
                serializeProperties(op.Vertex.Properties),
            )
        case OperationType_CREATE_EDGE:
            p.memstore.SAdd(
                fmt.Sprintf("v:%s:out:%s", op.Edge.FromVertexID, op.Edge.Label),
                []string{op.Edge.ToVertexID},
            )
            p.memstore.SAdd(
                fmt.Sprintf("v:%s:in:%s", op.Edge.ToVertexID, op.Edge.Label),
                []string{op.Edge.FromVertexID},
            )
        }
    }
}

WAL operation protobuf:

message WALOperation {
  enum OperationType {
    CREATE_VERTEX = 0;
    UPDATE_VERTEX = 1;
    DELETE_VERTEX = 2;
    CREATE_EDGE = 3;
    DELETE_EDGE = 4;
  }

  OperationType type = 1;
  int64 timestamp = 2;
  string transaction_id = 3;

  oneof operation {
    VertexRecord vertex = 4;
    EdgeRecord edge = 5;
  }
}

Crash Recovery (Replay from WAL)

On proxy restart:

func (p *Proxy) Recover(ctx context.Context) error {
    // 1. Load last checkpoint
    checkpoint, err := p.loadCheckpoint()
    if err != nil {
        return fmt.Errorf("failed to load checkpoint: %w", err)
    }

    log.Infof("Starting recovery from offset %d", checkpoint.LastOffset)

    // 2. Replay WAL from checkpoint to latest
    consumer := kafka.NewConsumer(kafka.ConsumerConfig{
        GroupID: fmt.Sprintf("graph-recovery-proxy-%d", p.ID),
        Topics:  []string{"graph-wal"},
    })

    // Seek to checkpoint offset
    consumer.Seek("graph-wal", checkpoint.Partition, checkpoint.LastOffset)

    replayCount := 0
    startTime := time.Now()

    for {
        msg, err := consumer.Poll(100 * time.Millisecond)
        if err != nil || msg == nil {
            break // Reached end of log
        }

        // Apply operation to MemStore
        op := &WALOperation{}
        proto.Unmarshal(msg.Value, op)
        p.applyOperationToMemStore(op)
        replayCount++

        // Safety: Stop after 10 minutes
        if time.Since(startTime) > 10*time.Minute {
            log.Warnf("Replay timeout after %d operations", replayCount)
            break
        }
    }

    log.Infof("Recovery complete: replayed %d operations in %v",
        replayCount, time.Since(startTime))

    // 3. Resume normal operation
    p.StartWALConsumers()
    return nil
}

Checkpoint format (stored in Neptune or S3):

checkpoint:
  proxy_id: 3
  partition: 42
  last_offset: 12345678
  timestamp: 2025-11-14T10:30:00Z
  memstore_vertex_count: 10000000
  memstore_edge_count: 50000000

Cross-Cluster Consistency

Scenario: 10 proxies, each with independent MemStore

Challenge: How to ensure consistent reads across proxies?

Solution 1: Consistent Hashing (Recommended)

// Each vertex assigned to specific proxy
func (c *Client) getVertexOwner(vertexID string) int {
    hash := fnv.New64a()
    hash.Write([]byte(vertexID))
    return int(hash.Sum64() % uint64(c.NumProxies))
}

// Always route vertex queries to owning proxy
func (c *Client) GetVertex(ctx context.Context, vertexID string) (*Vertex, error) {
    proxyID := c.getVertexOwner(vertexID)
    proxy := c.proxies[proxyID]
    return proxy.Graph.GetVertex(ctx, vertexID)
}

Benefits:

  • Each vertex has single source of truth (one proxy)
  • No consistency problems (no replication)
  • Partitioned WAL: Each proxy consumes its partition

Solution 2: Eventually Consistent Replication (Alternative)

# All proxies replicate entire graph
wal:
  replication_mode: broadcast
  # Every proxy consumes entire WAL
  consumer_group: graph-wal-consumers  # Shared group

Trade-offs:

  • ✅ Any proxy can serve any vertex
  • ✅ No cross-proxy routing needed
  • ❌ 10× memory usage (full graph on each proxy)
  • ❌ Eventual consistency (WAL lag)

Performance Characteristics (Three-Layer)

| Operation | Latency | Throughput | Path |
|---|---|---|---|
| Write (CreateVertex) | 5ms P50, 15ms P99 | 100k/sec | Kafka WAL append |
| Read (MemStore hit) | 50 μs P50, 200 μs P99 | 1M/sec | MemStore |
| Read (Neptune fallback) | 30ms P50, 100ms P99 | 10k/sec | Neptune query |
| Traverse (2-hop, MemStore) | 200 μs P50, 1ms P99 | 500k/sec | MemStore sets |
| Traverse (2-hop, cross-proxy) | 500 μs P50, 2ms P99 | 200k/sec | RPC fan-out |
| Recovery (replay WAL) | ~10 min for 10M ops | - | Kafka consume |

Write amplification:

  • 1 write → 3 WAL replicas (Kafka)
  • 1 write → 1 MemStore update (async)
  • 1 write → 1 Neptune write (async)
  • Total: 5× amplification (acceptable for durability)

Memory usage (per proxy):

  • 100M vertices × 100 bytes = 10 GB
  • 1B edges × 20 bytes = 20 GB
  • Total: ~30 GB per proxy

Snapshot-Based Updates with Set Difference (Fastest Path)

Problem: Incremental Graph Updates

Scenario: Update 1M vertices out of 100M daily (1% churn)

Traditional approach (WAL replay):

  • Replay 1M operations from WAL
  • Time: 1M ops × 5ms = 5000 seconds (~1.4 hours)

Snapshot-based approach (set difference):

  • Compute diff between snapshots
  • Apply only changes
  • Time: <10 seconds

Architecture: Snapshot Diff Engine

┌────────────────────────────────────────────────────────────┐
│ Graph Update Pipeline │
├────────────────────────────────────────────────────────────┤
│ │
│ 1. Snapshot T1 (yesterday) │
│ vertices_t1 = {v1, v2, v3, ..., v100M} │
│ edges_t1 = {e1, e2, e3, ..., e1B} │
│ │
│ 2. Snapshot T2 (today) │
│ vertices_t2 = {v1, v2', v3, ..., v100M, v100M+1} │
│ edges_t2 = {e1, e2, e3', ..., e1B, e1B+1} │
│ │
│ 3. Compute Set Difference (distributed) │
│ added_vertices = vertices_t2 - vertices_t1 │
│ removed_vertices = vertices_t1 - vertices_t2 │
│ modified_vertices = vertices_t2 ∩ vertices_t1 (changed)│
│ │
│ 4. Apply Changes to MemStore (parallel) │
│ - SAdd for new vertices/edges │
│ - SRem for removed vertices/edges │
│ - HMSet for modified properties │
│ │
└────────────────────────────────────────────────────────────┘

Custom Snapshot Format

Prism snapshot format (space-efficient):

message GraphSnapshotV2 {
  SnapshotMetadata metadata = 1;
  VertexSet vertices = 2;
  EdgeSet edges = 3;
}

message SnapshotMetadata {
  string snapshot_id = 1;
  int64 timestamp = 2;
  int64 vertex_count = 3;
  int64 edge_count = 4;
  string previous_snapshot_id = 5;  // For diff chaining
}

message VertexSet {
  // Bloom filter for fast existence checks
  bytes bloom_filter = 1;

  // Sorted vertex IDs (enables binary search)
  repeated string ids = 2;

  // Properties stored separately (columnar)
  map<string, VertexProperties> properties_by_id = 3;
}

message EdgeSet {
  bytes bloom_filter = 1;

  // Adjacency list format (space-efficient)
  repeated AdjacencyList adjacency_lists = 2;
}

message AdjacencyList {
  string vertex_id = 1;
  repeated Edge outgoing_edges = 2;
}

Storage format:

  • Compression: zstd (3:1 ratio typical)
  • Partitioning: By vertex ID hash (parallel processing)
  • Indexing: Bloom filters for fast membership tests

Set Difference Algorithm (Distributed)

// Compute vertex differences between two snapshots
func ComputeVertexDiff(snapshotT1, snapshotT2 *GraphSnapshot) (*VertexDiff, error) {
    // 1. Load both snapshots into sets
    set1 := NewVertexSet(snapshotT1)
    set2 := NewVertexSet(snapshotT2)

    // 2. Compute differences (parallel)
    var wg sync.WaitGroup
    added := make([]string, 0)
    removed := make([]string, 0)
    modified := make([]string, 0)

    // Added: in T2 but not in T1
    wg.Add(1)
    go func() {
        defer wg.Done()
        for id := range set2.Vertices {
            if !set1.Contains(id) {
                added = append(added, id)
            }
        }
    }()

    // Removed: in T1 but not in T2
    wg.Add(1)
    go func() {
        defer wg.Done()
        for id := range set1.Vertices {
            if !set2.Contains(id) {
                removed = append(removed, id)
            }
        }
    }()

    // Modified: in both but properties changed
    wg.Add(1)
    go func() {
        defer wg.Done()
        for id := range set2.Vertices {
            if set1.Contains(id) {
                props1 := set1.Properties[id]
                props2 := set2.Properties[id]
                if !propsEqual(props1, props2) {
                    modified = append(modified, id)
                }
            }
        }
    }()

    wg.Wait()

    return &VertexDiff{
        Added:    added,
        Removed:  removed,
        Modified: modified,
    }, nil
}

Applying Diff to MemStore (Parallel)

// Apply vertex diff to MemStore using batch operations
func (p *Proxy) ApplyVertexDiff(ctx context.Context, diff *VertexDiff, snapshot *GraphSnapshot) error {
    // Parallel apply with worker pool
    numWorkers := 100
    work := make(chan string, numWorkers*10)
    var wg sync.WaitGroup

    // Start workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            batch := make([]string, 0, 1000)

            for vertexID := range work {
                batch = append(batch, vertexID)

                if len(batch) >= 1000 {
                    p.applyVertexBatch(ctx, batch, snapshot)
                    batch = batch[:0]
                }
            }

            if len(batch) > 0 {
                p.applyVertexBatch(ctx, batch, snapshot)
            }
        }()
    }

    // 1. Remove vertices (fastest: just delete keys)
    for _, id := range diff.Removed {
        p.memstore.Delete(fmt.Sprintf("v:%s", id))
    }

    // 2. Add new vertices
    for _, id := range diff.Added {
        work <- id
    }

    // 3. Update modified vertices
    for _, id := range diff.Modified {
        work <- id
    }

    close(work)
    wg.Wait()

    return nil
}

func (p *Proxy) applyVertexBatch(ctx context.Context, batch []string, snapshot *GraphSnapshot) {
    for _, id := range batch {
        vertex := snapshot.Vertices[id]
        props := make(map[string][]byte)
        for k, v := range vertex.Properties {
            props[k] = serializeValue(v)
        }
        // Lock-free write to MemStore
        p.memstore.HMSet(ctx, fmt.Sprintf("v:%s", id), props)
    }
}

Performance: WAL vs Snapshot Diff

| Approach | Update Size | Time | Throughput | Best For |
|---|---|---|---|---|
| WAL Replay | 1K ops | 5s | 200 ops/s | Real-time, small updates |
| WAL Replay | 1M ops | 5000s (1.4h) | 200 ops/s | ❌ Too slow |
| Snapshot Diff | 1K ops | 10s | 100 ops/s | ❌ Overhead too high |
| Snapshot Diff | 1M ops | 10s | 100k ops/s | ✅ Bulk updates |
| Snapshot Diff | 100M ops | 100s | 1M ops/s | ✅ Full refresh |

Key insight: Snapshot diff applies changes as large, parallel, lock-free batches, so wall-clock time stays nearly flat as the change set grows; WAL replay applies operations sequentially, so its runtime grows linearly with the number of operations.

Hybrid Strategy (Best of Both Worlds)

namespaces:
  - name: social-graph-production
    pattern: graph

    # Real-time writes: Use WAL
    wal:
      enabled: true
      backend: kafka
      topic: graph-wal

    # Bulk updates: Use snapshot diff
    snapshot_updates:
      enabled: true
      schedule: "0 2 * * *"  # Daily at 2 AM
      source: s3://snapshots/social-graph-${DATE}.pb.zst
      strategy: set_difference
      parallel_workers: 100

    # Cutover strategy
    update_mode: hybrid
    # - Real-time: WAL for minute-to-minute changes
    # - Daily: Snapshot diff for bulk reconciliation
    wal_retention: 24h  # Only keep 1 day of WAL

Timeline:

Day 1:
00:00 - Load snapshot (100M vertices)
00:01 - Start serving traffic (reads from MemStore)
00:01 - Apply WAL for real-time updates

Day 2:
02:00 - Download new snapshot (100M + 1M new)
02:01 - Compute diff (1M added, 500K modified, 200K removed)
02:11 - Apply diff to MemStore (10 seconds)
02:11 - Truncate WAL (no longer needed)
02:11 - Resume real-time WAL application

Set Operations in MemStore

MemStore's set operations are perfect for graph diffs:

// Add outgoing edges (set union)
func (p *Proxy) AddOutgoingEdges(vertexID string, label string, targets []string) {
    key := fmt.Sprintf("v:%s:out:%s", vertexID, label)
    p.memstore.SAdd(key, targets) // Lock-free set add
}

// Remove edges (set difference)
func (p *Proxy) RemoveOutgoingEdges(vertexID string, label string, targets []string) {
    key := fmt.Sprintf("v:%s:out:%s", vertexID, label)
    p.memstore.SRem(key, targets) // Lock-free set remove
}

// Get neighbors (set membership)
func (p *Proxy) GetNeighbors(vertexID string, label string) []string {
    key := fmt.Sprintf("v:%s:out:%s", vertexID, label)
    return p.memstore.SMembers(key) // Lock-free set read
}

// Compute edge diff
func ComputeEdgeDiff(edgesT1, edgesT2 []string) (added, removed []string) {
    set1 := make(map[string]bool)
    set2 := make(map[string]bool)

    for _, e := range edgesT1 {
        set1[e] = true
    }
    for _, e := range edgesT2 {
        set2[e] = true
    }

    // Added edges
    for e := range set2 {
        if !set1[e] {
            added = append(added, e)
        }
    }

    // Removed edges
    for e := range set1 {
        if !set2[e] {
            removed = append(removed, e)
        }
    }

    return added, removed
}

Snapshot Diff Optimizations

1. Bloom Filters (membership tests):

// Fast negative check: "Is vertex in snapshot?"
if !snapshot.BloomFilter.Contains(vertexID) {
    // Definitely not in snapshot, skip diff computation
    return nil
}

2. Sorted IDs (binary search):

// O(log n) lookup instead of O(n)
func (s *VertexSet) Contains(id string) bool {
    idx := sort.SearchStrings(s.SortedIDs, id)
    return idx < len(s.SortedIDs) && s.SortedIDs[idx] == id
}

3. Columnar Properties (avoid deserializing unchanged properties):

// Only deserialize properties for modified vertices
if vertexModified(id) {
    props := deserializeProperties(snapshot.Properties[id])
}

4. Parallel Partitioning (independent diff computation):

// Each worker computes diff for its partition
func ComputeDiffPartitioned(snapshotT1, snapshotT2 *GraphSnapshot, numPartitions int) []*VertexDiff {
    diffs := make([]*VertexDiff, numPartitions)
    var wg sync.WaitGroup

    for i := 0; i < numPartitions; i++ {
        wg.Add(1)
        go func(partition int) {
            defer wg.Done()
            diffs[partition] = computeDiffForPartition(snapshotT1, snapshotT2, partition)
        }(i)
    }

    wg.Wait()
    return diffs
}

Result: ~300× Faster Bulk Updates

Traditional WAL approach:

  • 1M operations × 5ms = 5000 seconds (1.4 hours)

Snapshot diff approach:

  • Compute diff: 5 seconds (parallel)
  • Apply changes: 10 seconds (100 workers)
  • Total: 15 seconds (333× faster!)

Best for:

  • Daily batch updates from source databases
  • Full graph refreshes
  • Schema migrations
  • Bulk corrections

Performance Characteristics

Operation Latency (Neptune Backend)

| Operation | Latency (P50) | Latency (P99) | Use Case |
|---|---|---|---|
| GetVertex | 5ms | 15ms | Fetch single vertex |
| CreateVertex | 10ms | 25ms | Write vertex |
| Traverse (2 hops) | 30ms | 100ms | Friends of friends |
| ShortestPath | 50ms | 200ms | Connection path |
| PageRank (10k vertices) | 5s | 15s | Influence analysis |

Optimization Strategies

  1. Index Vertex IDs: Ensure primary key indexes
  2. Limit Traversal Depth: Set max_depth to prevent explosion
  3. Cache Frequent Queries: Cache GetVertex, short traversals
  4. Batch Operations: Use BatchCreateVertices for bulk inserts
  5. Read Replicas: Route read queries to Neptune replicas
  6. Denormalize: Store frequently accessed properties on edges

Testing Strategy

Local Development with SQLite

# Use SQLite for fast local testing
namespaces:
  - name: social-graph-test
    pattern: graph
    backend:
      type: sqlite
      database: ":memory:"
    seed:
      enabled: true
      fixtures: ./test/fixtures/social-graph.json

Benefits:

  • No external dependencies
  • Fast setup (<100ms)
  • Same API as production Neptune
  • Easy CI/CD integration

Integration Tests with Real Neptune

func TestGraphTraversal(t *testing.T) {
    client := setupTestNeptune(t) // Real Neptune cluster

    // Setup test graph
    client.Graph.CreateVertex(ctx, CreateVertexRequest{
        ID: "A", Label: "User",
    })
    client.Graph.CreateVertex(ctx, CreateVertexRequest{
        ID: "B", Label: "User",
    })
    client.Graph.CreateEdge(ctx, CreateEdgeRequest{
        ID: "A-B", Label: "FOLLOWS",
        FromVertexID: "A", ToVertexID: "B",
    })

    // Test traversal
    result, err := client.Graph.Traverse(ctx, TraverseRequest{
        StartVertexID: "A",
        Steps: []TraversalStep{
            {Direction: DirectionOUT, EdgeLabels: []string{"FOLLOWS"}},
        },
    })

    require.NoError(t, err)
    assert.Len(t, result.Vertices, 1)
    assert.Equal(t, "B", result.Vertices[0].ID)
}

Security Considerations

  1. Vertex ID Authorization: Enforce namespace-level access control
  2. Traversal Depth Limits: Prevent DoS via unbounded traversals
  3. Query Complexity: Limit expensive algorithms (PageRank iterations)
  4. PII in Graphs: Tag sensitive vertex properties in schema
  5. Audit Logging: Log all graph mutations (create, update, delete)

Comparison to Alternatives

vs. Direct Neptune SDK

| Feature | Prism Graph Pattern | Direct Neptune SDK |
|---|---|---|
| Query Language | Protobuf (type-safe) | Gremlin strings |
| Backend Lock-in | No (swap backends) | Yes (Neptune only) |
| Caching | Built-in | Manual |
| Testing | Local SQLite | Requires Neptune |
| Learning Curve | Simple API | Learn Gremlin |

vs. Direct Neo4j Driver

| Feature | Prism Graph Pattern | Neo4j Driver |
|---|---|---|
| Query Language | Protobuf | Cypher strings |
| Type Safety | Compile-time | Runtime |
| Batching | Automatic | Manual |
| Caching | Configurable | Application code |

vs. Graph Libraries (NetworkX, igraph)

| Feature | Prism Graph Pattern | NetworkX |
|---|---|---|
| Scale | Millions of vertices (backend) | In-memory limit |
| Persistence | Durable storage | Not built-in |
| Distribution | Multi-node | Single process |
| Operations | Database queries | Algorithm library |

Migration Path

Phase 1: Core Operations (Week 1-2)

Deliverables:

  • Protobuf definitions for Vertex/Edge CRUD
  • Neptune backend implementation (Gremlin)
  • Basic traversal support
  • Go client library

Demo: Social graph with 2-hop traversal

Phase 2: Advanced Traversals (Week 3-4)

Deliverables:

  • Shortest path, all paths
  • Filter expressions (vertex, edge)
  • Pagination for large results
  • Query optimizer pattern

Demo: Knowledge graph with complex queries

Phase 3: Graph Algorithms (Week 5-6)

Deliverables:

  • PageRank
  • Community detection (Louvain)
  • Cycle detection
  • Batch operations

Demo: Recommendation system with collaborative filtering

Phase 4: Multi-Backend Support (Week 7-8)

Deliverables:

  • Neo4j backend (Cypher)
  • SQLite backend (local testing)
  • Backend migration tool
  • Performance benchmarks

Demo: Same application on Neptune vs Neo4j

Open Questions

  1. Graph Schema Evolution: How to handle schema changes in production graphs?
  2. Distributed Graphs: Support for graph partitioning across backends?
  3. Temporal Graphs: Time-traveling queries (graph state at timestamp)?
  4. Graph Streaming: Real-time graph updates via CDC?
  5. Custom Algorithms: Plugin system for user-defined graph algorithms?

References

Graph Databases

Query Languages

Graph Theory

Revision History

  • 2025-11-14: Initial draft for high-level Graph pattern API