RFC-057: Massive-Scale Graph Sharding for 100B Vertices
Status: Draft
Author: Platform Team
Created: 2025-11-15
Updated: 2025-11-15
Abstract
This RFC extends Prism's graph pattern (RFC-055) to support massive-scale distributed graphs with 100 billion vertices and trillions of edges. The existing architecture supports 1 billion vertices across 10 proxy instances (100M vertices per proxy). Scaling to 100B vertices requires a multi-tier sharding strategy with lightweight compute nodes, hierarchical partition routing, and intelligent data placement. This RFC defines the distributed sharding architecture, partition strategies, query routing protocols, and operational patterns needed to achieve this scale.
Key Innovations:
- Hierarchical Sharding: Three-tier architecture (cluster → proxy → partition)
- Lightweight Nodes: 1000+ nodes with 100M vertices each
- Hot/Cold Tier Integration: In-memory hot data, S3 cold storage (RFC-059)
- Partition-Aware Queries: Gremlin queries optimized for distributed execution
- Dynamic Rebalancing: Automatic partition migration without downtime
- Fine-Grained Authorization: Vertex-level access control (RFC-061)
Motivation
Current Scale Limitations
RFC-055 demonstrates 1B vertices across 10 proxies:
| Scale Dimension | RFC-055 (Current) | This RFC (Target) | Multiplier |
|---|---|---|---|
| Total Vertices | 1 billion | 100 billion | 100× |
| Total Edges | 10 billion | 10 trillion | 1000× |
| Proxy Instances | 10 | 1000 | 100× |
| Vertices per Node | 100M | 100M | 1× |
| Memory per Node | 30 GB | 30 GB | 1× |
| Total Memory | 300 GB | 30 TB | 100× |
Why Scale Beyond 1B Vertices?
Use Cases Requiring 100B+ Scale
1. Global Social Network Graph
Example: Facebook/Meta-scale social graph
- Users: 3 billion active users
- Connections: Average 200 friends/connections per user
- Total Edges: 600 billion edges (friend relationships)
- Content Interactions: 10 trillion edges (likes, comments, shares)
- Total Vertices: 3B users + 50B posts/photos/videos = 53B vertices
Query Patterns:
- Friends of friends (2-3 hop traversals)
- Mutual connections
- Content recommendations based on friend graph
- Influence analysis (PageRank on 3B users)
2. Global Financial Transaction Network
Example: SWIFT/Visa-scale transaction graph
- Accounts: 5 billion bank accounts worldwide
- Merchants: 100 million merchants
- Transactions: 500 billion transactions per year (vertices)
- Transaction edges: Account → Transaction → Merchant
- Total Vertices: 505B vertices
- Total Edges: 1 trillion edges
Query Patterns:
- Fraud detection (circular transaction paths)
- Money laundering detection (complex multi-hop patterns)
- Account risk scoring (network analysis)
- Real-time suspicious pattern detection
3. Internet of Things (IoT) Device Network
Example: Smart city/industrial IoT graph
- Devices: 50 billion IoT devices globally by 2030
- Sensors: 200 billion sensor readings/day (time-series vertices)
- Device relationships: Physical topology, data flows
- Total Vertices: 50B devices + 73 trillion sensor readings/year (200B/day × 365) ≈ 73 trillion vertices per year of retained readings
- Total Edges: 500B device-to-device edges + trillions of data flow edges
Query Patterns:
- Device dependency analysis
- Cascade failure prediction
- Data lineage tracking
- Real-time anomaly detection across device clusters
4. Knowledge Graph at Web Scale
Example: Google Knowledge Graph scale
- Entities: 500 billion entities (people, places, things, concepts)
- Relationships: 3.5 trillion semantic relationships
- Total Vertices: 500B entities
- Total Edges: 3.5T relationships
Query Patterns:
- Semantic search (multi-hop entity resolution)
- Question answering (traverse relationship chains)
- Entity disambiguation
- Recommendation via semantic similarity
Problems with Current Architecture
Problem 1: Limited Node Count (10 proxies)
RFC-055 architecture assumes 10-20 proxy instances:
- Partition count fixed at 256
- Consistent hashing assumes small cluster
- Control plane distribution limited to 10-20 nodes
Problem 2: No Hierarchical Routing
Flat partition model doesn't scale to 1000+ nodes:
- Every node needs partition table for all 1000 nodes
- Cross-node queries require 1000-way coordination
- Partition rebalancing touches all nodes
Problem 3: Memory-Only Architecture
RFC-055 keeps entire graph in memory:
- 100B vertices × 100 bytes = 10 TB minimum
- 10T edges × 20 bytes = 200 TB minimum
- Total: 210 TB in-memory (not feasible)
Problem 4: No Data Locality Optimization
Random vertex assignment via consistent hashing:
- Multi-hop traversals almost always span multiple nodes
- No co-location of frequently accessed subgraphs
- No optimization for access patterns
Problem 5: Limited Query Optimization
Single-proxy query execution:
- No distributed query planning
- No partition pruning for filtered queries
- No parallel execution across partitions
Goals
- Horizontal Scalability: Support 1000+ lightweight compute nodes
- Hierarchical Sharding: Three-tier partition hierarchy (cluster → proxy → partition)
- Cost-Effective Storage: Hot/cold data tiers (memory + S3)
- Intelligent Placement: Co-locate related vertices for query optimization
- Distributed Queries: Gremlin query execution across 1000+ nodes
- Dynamic Rebalancing: Add/remove nodes without downtime
- Authorization-Aware: Vertex labeling for fine-grained access control
Non-Goals
- Backend-Agnostic Sharding: This RFC assumes in-memory MemStore + S3 (not Neptune, Neo4j)
- Multi-Region Distribution: Cross-region replication covered in RFC-012
- ACID Transactions: Distributed transactions across shards (future work)
- Schema Evolution: Graph schema changes not covered here
- Real-Time Ingestion: Bulk loading focus, streaming ingestion in RFC-051 (WAL pattern)
Hierarchical Sharding Architecture
Three-Tier Partition Model
┌────────────────────────────────────────────────────────────────┐
│ Tier 1: Cluster │
│ │
│ Global Coordinator (Admin Plane) │
│ - 10-20 clusters, each with 100 proxy nodes │
│ - Cluster ID: 0-19 (e.g., us-west-1, us-east-1) │
│ │
└────────────────────┬───────────────────────────────────────────┘
│
┌───────────┼───────────┬─────────────┐
│ │ │ │
┌────────▼─────┐ ┌──▼────────┐ ┌▼──────────┐ ┌▼──────────────┐
│ Cluster 0 │ │ Cluster 1 │ │ Cluster 2 │ │ ... │
│ (us-west-1) │ │(us-east-1)│ │(eu-west-1)│ │ Cluster 19 │
│ │ │ │ │ │ │ │
│ 100 proxies │ │ 100 proxies│ │100 proxies│ │ 100 proxies │
│ 10B vertices │ │10B vertices│ │10B vertices│ │10B vertices │
└────────┬─────┘ └───────────┘ └───────────┘ └───────────────┘
│
│ Tier 2: Proxy (within cluster)
│
┌────────▼───────────────────────────────────────────────────────┐
│ Cluster 0: 100 Proxy Instances │
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Proxy │ │Proxy │ │Proxy │ ... │Proxy │ │
│ │ 0 │ │ 1 │ │ 2 │ │ 99 │ │
│ │ │ │ │ │ │ │ │ │
│ │100M V│ │100M V│ │100M V│ │100M V│ │
│ └───┬──┘ └───┬──┘ └───┬──┘ └───┬──┘ │
│ │ │ │ │ │
└──────┼────────┼────────┼──────────────────┼───────────────────┘
│ │ │ │
│ Tier 3: Partitions (within proxy)
│
┌──────▼──────────────────────────────────────────┐
│ Proxy 0: 64 Partitions │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Part 0 │ │ Part 1 │ │ Part 63 │ │
│ │ 1.56M V │ │ 1.56M V │ │ 1.56M V │ ... │
│ │ Hot data │ │ Warm data│ │ Cold data│ │
│ │ In-memory│ │ Partial │ │ S3 backed│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└──────────────────────────────────────────────────┘
Partition Addressing
Vertex ID Format: {cluster_id}:{proxy_id}:{partition_id}:{local_vertex_id}
Example: 02:0045:12:user_alice
- Cluster 2 (eu-west-1)
- Proxy 45 (within cluster 2)
- Partition 12 (within proxy 45)
- Local ID: user_alice
Routing Algorithm:
Vertex ID: "02:0045:12:user_alice"
Step 1: Parse vertex ID
cluster_id = 02
proxy_id = 0045
partition_id = 12
local_vertex_id = "user_alice"
Step 2: Route to cluster
Global coordinator → Cluster 2 gateway
Step 3: Route to proxy
Cluster 2 gateway → Proxy 45
Step 4: Route to partition
Proxy 45 → Partition 12
Step 5: Execute query
Partition 12 → Lookup "user_alice" in local MemStore/S3
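Steps 1 to 4 reduce to parsing the vertex ID. The following is a minimal Go sketch, assuming the `{cluster_id}:{proxy_id}:{partition_id}:{local_vertex_id}` format above; `HierarchicalAddress` and `ParseHierarchicalID` are hypothetical names. `strings.SplitN` with a limit of 4 leaves any colons inside the local ID (such as `user:alice`) untouched.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// HierarchicalAddress is a hypothetical decoded form of a hierarchical vertex ID.
type HierarchicalAddress struct {
	Cluster   int
	Proxy     int
	Partition int
	LocalID   string
}

// ParseHierarchicalID splits "cluster:proxy:partition:local_id" into its parts.
// SplitN with n=4 keeps colons inside local_id intact.
func ParseHierarchicalID(vertexID string) (HierarchicalAddress, error) {
	parts := strings.SplitN(vertexID, ":", 4)
	if len(parts) != 4 || parts[3] == "" {
		return HierarchicalAddress{}, fmt.Errorf("not a hierarchical vertex ID: %q", vertexID)
	}
	nums := make([]int, 3)
	for i, field := range parts[:3] {
		n, err := strconv.Atoi(field)
		if err != nil || n < 0 {
			return HierarchicalAddress{}, fmt.Errorf("invalid routing field %q in %q", field, vertexID)
		}
		nums[i] = n
	}
	return HierarchicalAddress{Cluster: nums[0], Proxy: nums[1], Partition: nums[2], LocalID: parts[3]}, nil
}
```

Parsing never touches the network, which is what makes hierarchical routing effectively free.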
Alternative: Opaque Vertex IDs
Trade-Off: Hierarchical IDs (above) embed topology in vertex ID, making routing fast but rebalancing expensive. Opaque IDs decouple vertex identity from physical location, enabling free rebalancing at the cost of routing overhead (MEMO-050 Finding 15).
Hierarchical IDs (Current Approach)
Advantages:
- Zero-overhead routing: Parse vertex ID to determine partition (O(1), ~10 ns)
- No external dependencies: No routing table required
- Deterministic: Same vertex ID always routes to same partition
Disadvantages:
- Expensive rebalancing: Moving partition requires rewriting all vertex IDs
- Topology-dependent: Vertex IDs encode cluster/proxy/partition structure
- Limited flexibility: Cannot easily move individual vertices between partitions
Rebalancing Cost Example:
Scenario: Move partition 07:0042:05 from Proxy 42 to Proxy 89
With hierarchical IDs:
1. Update all 1.56M vertex IDs in partition
- Old: 07:0042:05:user_alice → New: 07:0089:05:user_alice
2. Update all incoming edge references (10M edges pointing to this partition)
3. Update all indexes containing these vertex IDs
4. Broadcast ID changes to all proxies
Cost: ~30 minutes for 1.56M vertices + 10M edge updates
Downtime: Partition unavailable during rewrite
Opaque IDs (Alternative Approach)
Design: Vertex IDs are globally unique, topology-independent identifiers. Routing table maps opaque ID → physical location.
Vertex ID Format: vertex:{uuid} or vertex:{snowflake_id}
Example: vertex:550e8400-e29b-41d4-a716-446655440000
Routing Table Architecture:
message VertexRoutingTable {
// Distributed hash table: vertex_id → partition_location
map<string, PartitionLocation> routing_table = 1;
// Bloom filter for fast negative lookups
bytes bloom_filter = 2;
}
message PartitionLocation {
string cluster_id = 1;
string proxy_id = 2;
string partition_id = 3;
int64 last_updated = 4;
}
Routing Implementation:
type OpaqueVertexRouter struct {
routingTable *DistributedRoutingTable // sharded table defined below
bloomFilter *bloom.BloomFilter
cache *RoutingCache
}
func (ovr *OpaqueVertexRouter) RouteVertex(vertexID string) (*PartitionLocation, error) {
// Fast path: Check cache (99% hit rate)
if location := ovr.cache.Get(vertexID); location != nil {
return location, nil
}
// Bloom filter: Fast negative lookup
if !ovr.bloomFilter.Contains(vertexID) {
return nil, VertexNotFoundError{VertexID: vertexID}
}
// Slow path: Lookup in routing table
location, err := ovr.routingTable.Get(vertexID)
if err != nil {
return nil, err
}
// Cache result
ovr.cache.Put(vertexID, location)
return location, nil
}
Routing Table Storage: Sharded across the cluster by hashing each vertex ID onto a fixed number of shards (modulo over the shard count)
type DistributedRoutingTable struct {
shards map[uint32]*RoutingTableShard
}
func (drt *DistributedRoutingTable) GetShard(vertexID string) *RoutingTableShard {
hash := xxhash.Sum64String(vertexID)
shardID := hash % uint64(len(drt.shards))
return drt.shards[uint32(shardID)]
}
func (drt *DistributedRoutingTable) Get(vertexID string) (*PartitionLocation, error) {
shard := drt.GetShard(vertexID)
return shard.Lookup(vertexID)
}
Routing Cache Strategy:
routing_cache_config:
type: local_lru # Per-proxy cache
max_entries: 1_000_000 # 1M vertex locations cached
ttl: 3600s # 1 hour expiration
memory_size: 100 MB # ~100 bytes per entry
performance:
hit_rate: 99% # Power-law: 1% of vertices get 99% of queries
cold_lookup: 1 μs # Cache miss → routing table lookup (local shard; a remote shard adds a network round trip)
hot_lookup: 10 ns # Cache hit
Rebalancing with Opaque IDs:
Scenario: Move partition 07:0042:05 from Proxy 42 to Proxy 89
With opaque IDs:
1. Copy partition data to Proxy 89
2. Update routing table: vertex_id → 07:0089:05 (for all vertices in partition)
3. Broadcast routing table update to all proxies
4. Invalidate caches
Cost: ~10 seconds for routing table updates
Downtime: None (dual-read during migration)
Vertex IDs: Unchanged (no rewrites needed)
Performance Comparison
Routing Latency:
| Operation | Hierarchical IDs | Opaque IDs (Cold) | Opaque IDs (Hot) |
|---|---|---|---|
| Parse vertex ID | 10 ns | - | - |
| Routing table lookup | - | 1 μs | 10 ns (cached) |
| Total routing time | 10 ns | 1 μs | 10 ns |
Cache hit rate: 99% (power-law distribution)
Effective routing latency: 0.01 × 1 μs + 0.99 × 10 ns = 20 ns average
Rebalancing Time:
| Operation | Hierarchical IDs | Opaque IDs | Speedup |
|---|---|---|---|
| Move single partition | 30 minutes | 10 seconds | 180× |
| Move 100 partitions | 50 hours | ~17 minutes | 180× |
| Rebalance entire cluster | 3 days | 2 hours | 36× |
Hybrid Approach (Recommended)
Best of both worlds: Use hierarchical IDs for most vertices, opaque IDs for frequently moved data
vertex_id_strategy:
default: hierarchical # Most vertices use cluster:proxy:partition:id
opaque_for:
- hot_partitions # Frequently rebalanced partitions
- cross_partition_vertices # Vertices referenced from many partitions
- user_managed_ids # Application-controlled IDs
routing_table:
shards: 256 # Distributed routing table shards
replication: 3 # Routing table replicated 3×
cache_size: 1_000_000 # Per-proxy cache
Hybrid Routing:
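func (hr *HybridRouter) RouteVertex(vertexID string) (*PartitionLocation, error) {
// Fast path: hierarchical IDs encode their own location, so parse directly
if hr.IsHierarchicalID(vertexID) {
return hr.ParseHierarchicalID(vertexID)
}
// Slow path: opaque IDs need a routing-table lookup
return hr.opaqueRouter.RouteVertex(vertexID)
}
// IsHierarchicalID reports whether vertexID matches cluster:proxy:partition:local_id
// with numeric routing fields. SplitN keeps colons inside local_id, so
// "07:0042:37:user:alice" is accepted while "vertex:<uuid>" is not.
func (hr *HybridRouter) IsHierarchicalID(vertexID string) bool {
parts := strings.SplitN(vertexID, ":", 4)
if len(parts) != 4 || parts[3] == "" {
return false
}
for _, field := range parts[:3] {
if _, err := strconv.Atoi(field); err != nil {
return false
}
}
return true
}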
Decision Matrix
| Use Case | Recommended Approach | Rationale |
|---|---|---|
| Static graph | Hierarchical IDs | No rebalancing needed, zero routing overhead |
| Frequently rebalanced | Opaque IDs | 180× faster rebalancing, 1 μs routing acceptable |
| Mixed workload | Hybrid (80% hierarchical, 20% opaque) | Optimize common case, flexibility for hot data |
| Application-controlled IDs | Opaque IDs | Apps provide IDs, system maintains routing |
Summary
Opaque vertex IDs trade 1 μs routing overhead for free rebalancing:
- Routing overhead: 1 μs cold, 10 ns hot (99% cache hit rate)
- Rebalancing speedup: 180× faster (10 seconds vs 30 minutes per partition)
- Memory overhead: 100 MB per proxy for routing cache (negligible)
- Operational benefit: Enable live rebalancing without downtime
Recommendation: Start with hierarchical IDs for simplicity, migrate to hybrid approach when operational flexibility is needed. Reserve opaque IDs for hot partitions and application-managed identifiers.
Partition Size Guidelines
| Tier | Count | Size | Memory | Purpose |
|---|---|---|---|---|
| Cluster | 10-20 | 5-10B vertices | - | Geographic/organizational boundary |
| Proxy | 100 per cluster | 100M vertices | 30 GB | Compute node with in-memory cache |
| Partition | 64 per proxy | 1.56M vertices | 156 MB | Hot/warm/cold data unit |
Why 64 partitions per proxy? (Updated from 16 based on MEMO-050 Finding 6)
- Finer hot/cold granularity: 156 MB partitions vs 625 MB (4× smaller)
- Faster rebalancing: 4× more partitions that can move in parallel, and each move takes 13 seconds vs 2.1 minutes
- Better load distribution: Lower variance (2% vs 15%)
- Smaller failure blast radius: 1.56M vertices vs 6.25M
Trade-off: More partition metadata (64k vs 16k partitions across all 1000 proxies), but the benefits outweigh the costs at 100B scale.
Network Topology-Aware Sharding
Critical: At 100B scale, cross-AZ bandwidth costs $365M/year without network-aware placement (MEMO-050 Finding 3). This section defines the network topology integration that cuts that cost by roughly 92% (see Cost Savings Summary).
AWS Network Cost Model
| Network Path | Latency | Cost | Use Case |
|---|---|---|---|
| Same rack | 100 μs | $0/GB | Preferred |
| Same AZ | 200 μs | $0/GB | Good |
| Cross-AZ | 1-2 ms | $0.01/GB | Expensive! |
| Cross-region | 20-100 ms | $0.02/GB | Very expensive! |
Cost Impact Without Optimization:
Typical 2-hop query: 100 partitions × 100 KB = 10 MB
50% cross-AZ traffic: 5 MB cross-AZ per query
At 1B queries/day: 5 PB/day = 5,000,000 GB × $0.01/GB = $50k/day ≈ $1.5M/month ❌
Cost Impact With Optimization:
AZ-aware placement: 5% cross-AZ traffic (not 50%) = 0.5 MB cross-AZ per query
At 1B queries/day: 500 TB/day = 500,000 GB × $0.01/GB = $5k/day ≈ $150k/month
Annual (at these inputs): ~$18M/year → ~$1.8M/year, a 90% reduction ✅
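The per-day figures can be recomputed from the stated inputs with a small Go helper. `DailyCrossAZCost` is a hypothetical name; it assumes decimal units (1 GB = 10^9 bytes) and the flat $0.01/GB cross-AZ price from the cost model table.

```go
package main

// DailyCrossAZCost estimates the daily cross-AZ transfer bill in dollars.
// bytesPerQuery is the total data moved per query; crossAZFraction is the
// share of that data that leaves the coordinator's AZ.
func DailyCrossAZCost(queriesPerDay, bytesPerQuery, crossAZFraction, dollarsPerGB float64) float64 {
	const bytesPerGB = 1e9 // decimal gigabytes, as used for billing
	crossAZBytes := queriesPerDay * bytesPerQuery * crossAZFraction
	return crossAZBytes / bytesPerGB * dollarsPerGB
}
```

With 1B queries/day and 10 MB per query, this gives about $50,000/day at 50% cross-AZ traffic and about $5,000/day at 5%.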
Extended Partition Metadata
message PartitionMetadata {
string partition_id = 1;
string cluster_id = 2;
string proxy_id = 3;
// Network topology (NEW)
NetworkLocation location = 4;
}
message NetworkLocation {
string region = 1; // "us-west-2"
string availability_zone = 2; // "us-west-2a"
string rack_id = 3; // "rack-42" (optional, for same-DC optimization)
// Network characteristics for cost/latency calculations
float cross_az_bandwidth_gbps = 4; // Typical: 10 Gbps
float cross_region_bandwidth_gbps = 5; // Typical: 1-5 Gbps
}
Multi-AZ Deployment Strategy
cluster_topology:
region: us-west-2
availability_zones:
- id: us-west-2a
proxies: 334
primary_for: [cluster_0, cluster_3, cluster_6, cluster_9]
- id: us-west-2b
proxies: 333
primary_for: [cluster_1, cluster_4, cluster_7]
- id: us-west-2c
proxies: 333
primary_for: [cluster_2, cluster_5, cluster_8]
# Partition replication for availability
partition_replication:
factor: 3 # 3 replicas per partition
placement: different_az # Each replica in different AZ
consistency: eventual # Async replication (lower cross-AZ cost)
read_preference: primary_az # Read from local AZ first (zero cost)
# Failover policy
failover:
enabled: true
timeout: 100ms # Try local AZ first
fallback: cross_az # Fall back to remote AZ if needed
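A sketch of how a proxy might pick a replica under `read_preference: primary_az` with `fallback: cross_az`. The `Replica` type, its `Healthy` flag, and `ChooseReadReplica` are hypothetical; a real implementation would also enforce the 100ms local-AZ timeout before falling back.

```go
package main

import "errors"

// Replica is a hypothetical description of one copy of a partition.
type Replica struct {
	ProxyID string
	AZ      string
	Healthy bool
}

var ErrNoReplica = errors.New("no healthy replica available")

// ChooseReadReplica prefers a healthy replica in the caller's AZ (no transfer
// charge) and otherwise falls back to the first healthy replica in another AZ.
func ChooseReadReplica(localAZ string, replicas []Replica) (Replica, error) {
	var fallback *Replica
	for i := range replicas {
		r := &replicas[i]
		if !r.Healthy {
			continue
		}
		if r.AZ == localAZ {
			return *r, nil
		}
		if fallback == nil {
			fallback = r
		}
	}
	if fallback != nil {
		return *fallback, nil
	}
	return Replica{}, ErrNoReplica
}
```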
Locality-Aware Partitioning
Strategy: Co-locate frequently co-traversed vertices in same AZ.
type NetworkAwarePartitioner struct {
topologyMap *NetworkTopology
localityHints map[string]string // vertex_id → preferred_az
}
func (nap *NetworkAwarePartitioner) AssignPartition(
vertex *Vertex,
placementHint *PlacementHint,
) string {
// Default: Consistent hashing (random placement)
defaultPartition := nap.ConsistentHash(vertex.ID)
// If hint provided, prefer co-location in same AZ
if placementHint != nil && placementHint.CoLocateWithVertex != "" {
// Place in same AZ as target vertex
targetPartition := nap.GetPartitionForVertex(placementHint.CoLocateWithVertex)
targetAZ := nap.topologyMap.GetAZ(targetPartition)
// Find partition in same AZ with capacity
sameAZPartition := nap.FindPartitionInAZ(targetAZ)
if sameAZPartition != "" {
return sameAZPartition
}
}
// Fall back to consistent hashing
return defaultPartition
}
// Example: Social graph with locality
func CreateFriendship(user1, user2 string) error {
// If user2 does not exist yet, create it in the same AZ as user1 (edge creation does not move an existing vertex)
edge := &Edge{
FromVertexID: user1,
ToVertexID: user2,
Label: "FRIENDS",
PlacementHint: &PlacementHint{
CoLocateWithVertex: user1,
Reason: "Reduce cross-AZ traffic for friend traversals",
},
}
return CreateEdge(edge)
}
Query Routing with Network Cost
type NetworkCostModel struct {
sameAZLatency time.Duration // 200 μs
crossAZLatency time.Duration // 2 ms
crossRegionLatency time.Duration // 50 ms
sameAZCost float64 // $0/GB
crossAZCost float64 // $0.01/GB
crossRegionCost float64 // $0.02/GB
}
func (qp *QueryPlanner) OptimizeWithNetworkCost(plan *QueryPlan) *QueryPlan {
for i, stage := range plan.Stages {
// Group partitions by AZ
partitionsByAZ := qp.GroupPartitionsByAZ(stage.TargetPartitions)
// Prefer partitions in same AZ as coordinator
coordinatorAZ := qp.GetCoordinatorAZ()
// Reorder: local AZ first, then remote
optimizedOrder := []string{}
optimizedOrder = append(optimizedOrder, partitionsByAZ[coordinatorAZ]...)
for az, partitions := range partitionsByAZ {
if az != coordinatorAZ {
optimizedOrder = append(optimizedOrder, partitions...)
}
}
stage.TargetPartitions = optimizedOrder
// Estimate network cost
stage.EstimatedNetworkCost = qp.CalculateNetworkCost(stage)
plan.Stages[i] = stage
}
return plan
}
func (qp *QueryPlanner) CalculateNetworkCost(stage *ExecutionStage) float64 {
cost := 0.0
coordinatorAZ := qp.GetCoordinatorAZ()
for _, partitionID := range stage.TargetPartitions {
partitionAZ := qp.GetPartitionAZ(partitionID)
// Estimate data transfer (100 KB per partition average)
dataTransferGB := 0.0001 // 100 KB
if partitionAZ == coordinatorAZ {
cost += 0 // Same AZ: free
} else {
cost += dataTransferGB * qp.networkCostModel.crossAZCost
}
}
return cost
}
Deployment Patterns by Scale
1B Vertices (10 Nodes): Single AZ
strategy: single_az
rationale: Small cluster, cross-AZ overhead > benefits
cost: $100k/year
availability: Single AZ failure = full outage
trade_off: Simple but less available
10B Vertices (100 Nodes): Multi-AZ with Read Preferences
strategy: multi_az_read_local
rationale: Amortize cross-AZ cost, most queries stay local
cost: $5M/year (10% cross-AZ traffic)
availability: AZ failure = 33% capacity loss (degraded)
trade_off: Balanced cost/availability
100B Vertices (1000 Nodes): Multi-AZ with Aggressive Optimization
strategy: multi_az_network_optimized
rationale: Network cost dominates, must optimize aggressively
cost: $47M/year (5% cross-AZ traffic)
availability: AZ failure = 33% capacity loss
trade_off: Complex but essential at scale
Cost Savings Summary
| Scale | Without AZ Awareness | With AZ Awareness | Savings |
|---|---|---|---|
| 1B vertices | $100k/year | $100k/year | 0% (single AZ) |
| 10B vertices | $45M/year | $5M/year | 89% |
| 100B vertices | $365M/year | $30M/year | 92% |
Key Takeaway: At 100B scale, network-aware placement is mandatory, not optional. Without it, bandwidth costs exceed infrastructure costs by 10×.
Sharding Strategies
Strategy 1: Hierarchical Consistent Hashing (Default)
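Three-level hash, with an independent hash per tier (for example, the vertex ID salted with the tier name):
1. Cluster Assignment: hash_cluster(vertex_id) % num_clusters → cluster_id
2. Proxy Assignment: hash_proxy(vertex_id) % proxies_per_cluster → proxy_id
3. Partition Assignment: hash_partition(vertex_id) % partitions_per_proxy → partition_id
Reusing one hash value at every tier correlates the tiers: with 10 clusters and 100 proxies per cluster, hash % 100 already fixes hash % 10, so each cluster would place vertices on only 10 of its 100 proxies (and 64 shares a factor of 4 with 100, correlating partitions with proxies).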
Example (hash values are illustrative):
vertex_id = "user:alice"
Level 1 (Cluster):
xxhash64("cluster|user:alice") % 10 = 7 → Cluster 7
Level 2 (Proxy within cluster 7):
xxhash64("proxy|user:alice") % 100 = 42 → Proxy 42
Level 3 (Partition within proxy 42):
xxhash64("partition|user:alice") % 64 = 37 → Partition 37
Full address: 07:0042:37:user:alice
Hash Function Choice: xxHash vs CRC32
At 100B scale, hash function quality significantly impacts load distribution. Poor distribution leads to hotspots, uneven resource utilization, and degraded performance.
Benchmark Comparison (100M vertex IDs hashed):
| Hash Function | Throughput | Collision Rate | Load Variance | Notes |
|---|---|---|---|---|
| xxHash64 | 8.5 GB/s | 1 in 100k | 2% | Recommended ✅ |
| CRC32 | 5.0 GB/s | 1 in 10k | 15% | Legacy, poor distribution |
| MurmurHash3 | 3.8 GB/s | 1 in 50k | 5% | Good, but slower |
| Jump Hash | 12.0 GB/s | N/A | <1% | Maps a 64-bit hash (e.g. xxHash64) to a bucket; best for minimal rebalancing |
Key Findings:
- xxHash64: 1.7× faster than CRC32, 8× better load distribution (15% → 2% variance)
- Jump Hash: Optimal for scenarios requiring minimal data movement during rebalancing (adds/removes nodes with minimal key reassignment)
- CRC32: Deprecated due to poor distribution properties at massive scale
Implementation (Go):
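import (
"fmt"
"github.com/cespare/xxhash/v2"
)
// levelHash salts the vertex ID with the tier name so each tier gets an
// independent hash; one shared hash would correlate the tiers.
func levelHash(tier, vertexID string) uint64 {
return xxhash.Sum64String(tier + "|" + vertexID)
}
// GetPartitionAddress returns "cluster:proxy:partition:vertexID".
func GetPartitionAddress(vertexID string, numClusters, proxiesPerCluster, partitionsPerProxy int) string {
clusterID := levelHash("cluster", vertexID) % uint64(numClusters)
proxyID := levelHash("proxy", vertexID) % uint64(proxiesPerCluster)
partitionID := levelHash("partition", vertexID) % uint64(partitionsPerProxy)
return fmt.Sprintf("%02d:%04d:%02d:%s", clusterID, proxyID, partitionID, vertexID)
}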
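To see why each tier needs its own hash, the Go sketch below counts how many distinct proxies receive vertices of one cluster under a shared hash versus per-tier salted hashes. It substitutes the first 8 bytes of SHA-256 (standard library) for xxHash64 so it has no external dependencies; `hash64`, `sharedHashProxies`, and `saltedHashProxies` are hypothetical helpers.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"strconv"
)

// hash64 stands in for xxHash64: the first 8 bytes of SHA-256 (stdlib only).
func hash64(s string) uint64 {
	sum := sha256.Sum256([]byte(s))
	return binary.BigEndian.Uint64(sum[:8])
}

// sharedHashProxies assigns cluster and proxy from ONE hash value (correlated)
// and returns the set of proxies that receive vertices of the given cluster.
func sharedHashProxies(numKeys int, cluster uint64) map[uint64]bool {
	seen := map[uint64]bool{}
	for i := 0; i < numKeys; i++ {
		h := hash64("user:" + strconv.Itoa(i))
		if h%10 == cluster {
			seen[h%100] = true
		}
	}
	return seen
}

// saltedHashProxies hashes the ID separately per tier (independent tiers).
func saltedHashProxies(numKeys int, cluster uint64) map[uint64]bool {
	seen := map[uint64]bool{}
	for i := 0; i < numKeys; i++ {
		id := "user:" + strconv.Itoa(i)
		if hash64("cluster|"+id)%10 == cluster {
			seen[hash64("proxy|"+id)%100] = true
		}
	}
	return seen
}
```

Because h % 100 determines h % 10, the shared scheme can only ever reach proxies 7, 17, ..., 97 for cluster 7, whereas the salted scheme spreads the same keys over all 100 proxies.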
Alternative: Jump Hash for Rebalancing
For scenarios with frequent topology changes (adding/removing proxies), Jump Hash minimizes data movement:
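import (
"github.com/cespare/xxhash/v2"
"github.com/dgryski/go-jump"
)
// GetPartitionWithJumpHash maps a vertex ID to a bucket in [0, numPartitions).
func GetPartitionWithJumpHash(vertexID string, numPartitions int) int {
hash := xxhash.Sum64String(vertexID)
return int(jump.Hash(hash, numPartitions))
}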
Jump Hash Properties:
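- When adding 1 proxy to N: only about 1/(N+1) of keys move (vs about N/(N+1), nearly all keys, with plain modulo hashing)
- Example: Add proxy 101 to a 100-proxy cluster → only ~1% of keys rebalance
- Trade-off: Slightly more complex routing logic, and buckets must be numbered 0..N-1, so Jump Hash handles growing or shrinking at the end of the range rather than removal of an arbitrary proxy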
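The movement property can be checked empirically. The Go sketch below ports the published Jump Consistent Hash algorithm (Lamping and Veach) so it runs without `go-jump`, then measures the share of keys that change buckets when a cluster grows from 100 to 101 proxies. `jumpHash`, `modulo`, `randomKeys`, and `movedFraction` are illustrative names, and random 64-bit keys stand in for hashed vertex IDs.

```go
package main

import "math/rand"

// jumpHash ports Jump Consistent Hash: maps a 64-bit key to [0, numBuckets).
func jumpHash(key uint64, numBuckets int32) int32 {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}

// modulo is plain modulo bucket assignment, for comparison.
func modulo(key uint64, numBuckets int32) int32 {
	return int32(key % uint64(numBuckets))
}

// randomKeys returns deterministic pseudo-random keys.
func randomKeys(n int, seed int64) []uint64 {
	r := rand.New(rand.NewSource(seed))
	keys := make([]uint64, n)
	for i := range keys {
		keys[i] = r.Uint64()
	}
	return keys
}

// movedFraction grows the bucket count from n to n+1 and reports the share of
// keys that change buckets, plus whether every moved key landed in new bucket n.
func movedFraction(keys []uint64, n int32, assign func(uint64, int32) int32) (float64, bool) {
	moved, onlyToNew := 0, true
	for _, k := range keys {
		before, after := assign(k, n), assign(k, n+1)
		if before != after {
			moved++
			if after != n {
				onlyToNew = false
			}
		}
	}
	return float64(moved) / float64(len(keys)), onlyToNew
}
```

With 100,000 keys, roughly 1% move under Jump Hash (all of them into the new bucket), while roughly 99% move under plain modulo assignment.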
Benefits:
- Even distribution across all tiers
- Deterministic routing (no lookups needed)
- Minimal rebalancing on topology changes (when bucket selection uses Jump Hash; plain modulo remaps nearly all keys)
Drawbacks:
- No data locality (related vertices scattered)
- No access pattern optimization
Strategy 2: Locality-Aware Partitioning
Principle: Co-locate vertices that are frequently traversed together.
Examples:
Social Graph Locality
User "alice" has 200 friends. Co-locate alice + friends on same proxy.
Approach:
1. Assign user to cluster/proxy via consistent hash
2. When creating friend edge:
- Check if target vertex exists
- If not, create on SAME proxy as source vertex
3. Result: Most 1-hop traversals stay within single proxy
Benefits:
- Friends-of-friends queries mostly single-proxy
- No cross-node RPC for common queries
- 10-100× faster traversals
Financial Transaction Locality
Account "acct_123" has 1000 transactions. Co-locate transactions near account.
Approach:
1. Assign account to cluster/proxy
2. Create transactions on same proxy (partition determined by timestamp)
3. Result: Account history queries are local
Benefits:
- Transaction history: single proxy query
- Account risk analysis: local graph traversal
Implementation:
message CreateVertexRequest {
string id = 1;
string label = 2;
map<string, Value> properties = 3;
// Locality hint
optional PlacementHint placement_hint = 4;
}
message PlacementHint {
oneof hint {
string co_locate_with_vertex = 1; // Place near this vertex
string co_locate_with_label = 2; // Place with vertices of this type
string explicit_partition = 3; // Explicit partition ID
}
}
Co-location Algorithm:
func (g *GraphShardManager) CreateVertex(req *CreateVertexRequest) error {
var targetPartition string
if req.PlacementHint != nil {
switch hint := req.PlacementHint.Hint.(type) {
case *PlacementHint_CoLocateWithVertex:
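// Place on same partition as target vertex; fall back to hashing if it is unknown
targetVertex := g.GetVertex(hint.CoLocateWithVertex)
if targetVertex != nil {
targetPartition = targetVertex.PartitionID
} else {
targetPartition = g.CalculatePartition(req.Id)
}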
case *PlacementHint_ExplicitPartition:
targetPartition = hint.ExplicitPartition
default:
// Fall back to consistent hashing
targetPartition = g.CalculatePartition(req.Id)
}
} else {
targetPartition = g.CalculatePartition(req.Id)
}
// Create vertex on target partition
return g.CreateVertexOnPartition(targetPartition, req)
}
Strategy 3: Label-Based Partitioning
Principle: Partition by vertex label (type), not vertex ID.
Example: Multi-tenant SaaS graph
Labels: User, Post, Comment, Organization
Partitioning:
Cluster 0: All "User" vertices
Cluster 1: All "Post" vertices
Cluster 2: All "Comment" vertices
Cluster 3: All "Organization" vertices
Benefits:
- Label-specific queries are single-cluster
- Easy to scale individual vertex types
- Clear capacity planning
Drawbacks:
- Cross-label traversals always multi-cluster
- Imbalanced load if one label dominates
Configuration:
partition_strategy:
type: label_based
label_cluster_mapping:
User: cluster_0
Post: cluster_1
Comment: cluster_2
Organization: cluster_3
fallback: consistent_hash # For unknown labels
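The `label_cluster_mapping` with `fallback: consistent_hash` amounts to a map lookup followed by a hash fallback. A minimal Go sketch, assuming a hypothetical `LabelRouter` type and a list of fallback clusters; it hashes with FNV-1a from the standard library only to stay dependency-free, whereas the routing layer elsewhere uses xxHash64.

```go
package main

import "hash/fnv"

// LabelRouter maps vertex labels to clusters, falling back to hashing the
// vertex ID for labels with no explicit mapping.
type LabelRouter struct {
	labelToCluster   map[string]string
	fallbackClusters []string
}

// ClusterFor returns the cluster for a vertex with the given label and ID.
func (lr *LabelRouter) ClusterFor(label, vertexID string) string {
	if cluster, ok := lr.labelToCluster[label]; ok {
		return cluster
	}
	h := fnv.New64a()
	h.Write([]byte(vertexID))
	return lr.fallbackClusters[h.Sum64()%uint64(len(lr.fallbackClusters))]
}
```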
Strategy 4: Hybrid Sharding
Principle: Combine strategies based on access patterns.
Example: Social network with hot users
Tier 1 (Cluster): Label-based
- Cluster 0-5: Regular users (consistent hash)
- Cluster 6: VIP users (celebrities, influencers)
- Cluster 7: Business pages
- Cluster 8: Groups
Tier 2 (Proxy): Locality-aware
- VIP users: Co-locate with their posts
- Regular users: Consistent hash
Tier 3 (Partition): Access frequency
- Hot partitions: In-memory
- Warm partitions: Partial memory
- Cold partitions: S3-backed
Configuration:
partition_strategy:
type: hybrid
cluster_strategy:
type: label_based
labels:
regular_user: [0, 1, 2, 3, 4, 5]
vip_user: [6]
business_page: [7]
group: [8]
proxy_strategy:
type: locality_aware
co_locate_edge_labels:
- FRIENDS
- FOLLOWS
- MEMBER_OF
partition_strategy:
type: access_frequency
hot_threshold: 1000_requests_per_minute
cold_threshold: 10_requests_per_minute
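The `access_frequency` strategy implies a simple classification rule. A minimal sketch, assuming that a partition at or above `hot_threshold` is hot, one at or below `cold_threshold` is cold, and anything in between is warm; the boundary handling and the `Tier` type are assumptions, not part of the configuration above.

```go
package main

// Tier is a hypothetical storage tier for a partition.
type Tier string

const (
	TierHot  Tier = "hot"  // fully in memory
	TierWarm Tier = "warm" // partially in memory
	TierCold Tier = "cold" // S3-backed
)

// ClassifyPartition assigns a tier from observed requests per minute.
func ClassifyPartition(requestsPerMinute, hotThreshold, coldThreshold float64) Tier {
	switch {
	case requestsPerMinute >= hotThreshold:
		return TierHot
	case requestsPerMinute <= coldThreshold:
		return TierCold
	default:
		return TierWarm
	}
}
```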
Distributed Query Routing
Query Execution Models
Model 1: Gather-Scatter (Simple Traversals)
Use Case: Broad queries that touch many partitions
Query: Get all users in organization "acme-corp"
Step 1: Parse query
Start: organization:acme-corp
Traversal: IN(MEMBER_OF) → User vertices
Step 2: Identify partition scope
Query coordinator: Scan all partitions (can't prune)
Step 3: Scatter phase
Coordinator → Send subquery to all 1000 proxies in parallel
Step 4: Gather phase
Each proxy returns matching vertices
Coordinator aggregates results
Step 5: Return to client
Total: 50,000 users in org
Performance: O(num_partitions) parallelism
Model 2: Partition-Aware Pruning (Filtered Traversals)
Use Case: Queries with partition-prunable filters
Query: Get user "alice" and traverse 2 hops
Step 1: Parse query
Start: user:alice (deterministic partition via vertex ID)
Step 2: Identify start partition
vertex_id = "user:alice"
partition = hash("user:alice") → 07:0042:37 (as in the hashing example above)
Step 3: Execute first hop locally
Proxy 07:0042 executes: user:alice.out(FOLLOWS)
Result: 200 friends (all may be on different partitions)
Step 4: Group by partition
Friends on Proxy 07:0042: 50 (local)
Friends on other proxies: 150 (remote)
Step 5: Second hop (distributed)
Execute out(FOLLOWS) on 50 local friends (fast)
Fan-out to 20 other proxies for 150 remote friends (parallel RPC)
Step 6: Aggregate
Coordinator collects results from 20 proxies
Total: 5,000 friends-of-friends
Performance: O(num_partitions_touched) << O(total_partitions)
Model 3: Index-Accelerated Queries
Use Case: Queries with indexed property filters
Query: Find users where city = "San Francisco" AND age > 25
Step 1: Check for index
Index exists: city_index (partition-local)
Step 2: Index scan (parallel across all partitions)
Each partition scans local city_index["San Francisco"]
Filter results by age > 25
Step 3: Return matches
Total: 125,000 users across 500 partitions
Performance: O(matches_per_partition), not O(total_vertices)
Cross-Partition Traversal Optimization
Problem: 2-hop query touches 1M vertices across 1000 partitions
Solution 1: Adaptive Parallelism
// ExecuteTraversal returns the vertex IDs reached after `hops` hops.
func (q *QueryCoordinator) ExecuteTraversal(start string, hops int) ([]string, error) {
currentLevel := []string{start}
for hop := 0; hop < hops; hop++ {
// Group vertex IDs by partition
partitionGroups := q.groupByPartition(currentLevel)
// Adaptive parallelism based on partition count
var nextLevel []string
if len(partitionGroups) < 10 {
// Few partitions: Execute sequentially for each
nextLevel = q.executeSequential(partitionGroups)
} else if len(partitionGroups) < 100 {
// Medium partitions: Execute in batches of 10
nextLevel = q.executeParallelBatched(partitionGroups, 10)
} else {
// Many partitions: Full parallelism
nextLevel = q.executeFullParallel(partitionGroups)
}
currentLevel = nextLevel
}
return currentLevel, nil
}
Solution 2: Early Termination
// Limit traversal breadth to prevent explosion.
// A distinct name is required: Go does not allow overloading ExecuteTraversal.
func (q *QueryCoordinator) ExecuteTraversalWithLimit(
start string,
hops int,
maxResultsPerHop int,
) ([]string, error) {
currentLevel := []string{start}
for hop := 0; hop < hops; hop++ {
partitionGroups := q.groupByPartition(currentLevel)
nextLevel := []string{}
for _, group := range partitionGroups {
neighbors := q.executePartitionTraversal(group)
nextLevel = append(nextLevel, neighbors...)
// Early termination if too many results
if len(nextLevel) > maxResultsPerHop {
nextLevel = nextLevel[:maxResultsPerHop]
break
}
}
currentLevel = nextLevel
}
return currentLevel, nil
}
Solution 3: Partition-Local Caching
Query: Frequent traversal from popular vertex "celebrity_user"
Optimization:
1. First query: celebrity_user.out(FOLLOWS) → 10M followers
Cache result on query coordinator
2. Subsequent queries: Use cached follower list
TTL: 60 seconds
3. Invalidation: On edge create/delete to celebrity_user
Performance:
Cold query: 500ms (hits 200 partitions)
Warm query: 5ms (cached)
Query Coordinator Architecture
┌────────────────────────────────────────────────────────────────┐
│ Query Coordinator │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Query Parser │ │ Query Planner│ │ Result Cache │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Distributed Execution Engine │ │
│ │ - Partition pruning │ │
│ │ - Parallel scatter-gather │ │
│ │ - Adaptive parallelism │ │
│ │ - Early termination │ │
│ └────────────────────────┬───────────────────────────┘ │
│ │ │
└────────────────────────────┼───────────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Partition │ │ Partition │ │ Partition │
│ Executor 1 │ │ Executor 2 │ ... │Executor 1000│
│ │ │ │ │ │
│ Proxy 1 │ │ Proxy 2 │ │ Proxy 1000 │
└─────────────┘ └─────────────┘ └─────────────┘
Partition Management
Partition States
enum PartitionState {
PARTITION_STATE_UNSPECIFIED = 0;
PARTITION_STATE_COLD = 1; // S3-backed, not in memory
PARTITION_STATE_WARMING = 2; // Loading into memory
PARTITION_STATE_WARM = 3; // Partially in memory
PARTITION_STATE_HOT = 4; // Fully in memory
PARTITION_STATE_DRAINING = 5; // Being migrated to another proxy
PARTITION_STATE_OFFLINE = 6; // Not accessible
}
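The enum lists states but not which transitions are legal. A minimal sketch of a transition check, where the allowed-transition table is an assumption inferred from the migration protocol and temperature policy in this RFC (load: COLD → WARMING → WARM/HOT; demote: HOT → WARM → COLD; migrate: WARM/HOT → DRAINING; failure: any state → OFFLINE), not a confirmed design. `allowedTransitions` and `CanTransition` are hypothetical:

```go
package main

import "fmt"

type PartitionState int

// Values mirror the protobuf enum above.
const (
	PartitionStateUnspecified PartitionState = iota
	PartitionStateCold
	PartitionStateWarming
	PartitionStateWarm
	PartitionStateHot
	PartitionStateDraining
	PartitionStateOffline
)

// allowedTransitions is an assumed table, not specified by the RFC.
var allowedTransitions = map[PartitionState][]PartitionState{
	PartitionStateCold:     {PartitionStateWarming},
	PartitionStateWarming:  {PartitionStateWarm, PartitionStateHot, PartitionStateCold},
	PartitionStateWarm:     {PartitionStateHot, PartitionStateCold, PartitionStateDraining},
	PartitionStateHot:      {PartitionStateWarm, PartitionStateDraining},
	PartitionStateDraining: {PartitionStateCold}, // source unloaded after drain
	PartitionStateOffline:  {PartitionStateWarming}, // restore path
}

// CanTransition reports whether from -> to is allowed; any initialized
// state may go OFFLINE on failure.
func CanTransition(from, to PartitionState) bool {
	if to == PartitionStateOffline && from != PartitionStateUnspecified {
		return true
	}
	for _, s := range allowedTransitions[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(PartitionStateCold, PartitionStateWarming)) // true
	fmt.Println(CanTransition(PartitionStateCold, PartitionStateHot))     // false
}
```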
Partition Migration Protocol
Scenario: Move partition 07:0042:05 from Proxy 42 to Proxy 89
Step 1: Prepare Target (Proxy 89)
Admin → Proxy 89: PreparePartition(07:0042:05)
Proxy 89: Allocate memory, download from S3 if needed
Proxy 89 → Admin: Ready
Step 2: Dual-Write Phase
Admin updates routing table: partition 07:0042:05 → [Proxy 42, Proxy 89]
Writes go to BOTH proxies
Reads go to Proxy 42 (primary)
Duration: Until Proxy 89 catches up
Step 3: Synchronize State
Proxy 42 → Proxy 89: Stream recent writes (WAL replay)
Proxy 89 applies writes, reports lag
When lag < 1 second → Ready for cutover
Step 4: Cutover
Admin updates routing: partition 07:0042:05 → Proxy 89 (single writer)
Proxy 89 becomes primary
Proxy 42 continues serving reads temporarily
Step 5: Drain Source
Admin → Proxy 42: DrainPartition(07:0042:05)
Proxy 42: Wait for in-flight queries to complete
Proxy 42: Unload partition from memory
Proxy 42 → Admin: Drained
Step 6: Confirm Complete
Admin removes Proxy 42 from routing table for partition 07:0042:05
Migration complete
Downtime: Zero (dual-write + cutover)
Automatic Partition Temperature Management
Temperature Metrics:
- Hot: >1000 requests/minute
- Warm: 10-1000 requests/minute
- Cold: <10 requests/minute
Automatic Actions:
partition_temperature_policy:
hot_threshold: 1000 # requests/minute
cold_threshold: 10
actions:
# Hot partition: Ensure in memory
- condition: temperature == HOT
action: load_to_memory
priority: high
# Warm partition: Keep in memory if space available
- condition: temperature == WARM
action: keep_in_memory_if_space
priority: medium
# Cold partition: Offload to S3
- condition: temperature == COLD
action: offload_to_s3
delay: 3600s # Keep in memory for 1 hour before offloading
Implementation:
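// MonitorTemperature re-evaluates partition temperature once per minute
// until ctx is cancelled.
func (pm *PartitionManager) MonitorTemperature(ctx context.Context) {
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		for _, partition := range pm.GetAllPartitions() {
			temp := pm.GetTemperature(partition.ID)
			switch {
			case temp.RequestsPerMinute > 1000:
				// Hot: ensure fully in memory
				if partition.State != PartitionStateHot {
					pm.LoadPartitionToMemory(partition.ID)
				}
			case temp.RequestsPerMinute < 10:
				// Cold: offload once the partition has stayed cold for the
				// 1-hour delay. ColdSince (when it first fell below 10 req/min)
				// is used instead of LastAccess, which a lightly used
				// partition keeps refreshing.
				if partition.State != PartitionStateCold && time.Since(temp.ColdSince) > time.Hour {
					pm.OffloadPartitionToS3(partition.ID)
				}
			default:
				// Warm: keep in memory if space allows; no action here
			}
		}
	}
}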
Failure Detection and Recovery
Problem: At 1000 nodes, hardware failures are not exceptional—they are statistically inevitable. With a mean time between failures (MTBF) of 3 years per node, expect ~1 failure per day cluster-wide. Without automated detection and recovery, manual intervention would be required multiple times daily, making the system operationally infeasible.
Failure Probability at Scale
Cluster: 1000 nodes
MTBF per node: 3 years = 1,095 days
Cluster-wide failure rate: 1000 nodes ÷ 1095 days = 0.91 failures/day
Expected failures:
- Per day: ~1 node failure
- Per week: ~6 node failures
- Per month: ~27 node failures
- Per year: ~333 node failures (1/3 of cluster replaced annually)
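The estimate above assumes independent nodes failing at a constant rate, so the cluster-wide rate is the node count divided by the per-node MTBF. A small sketch that reproduces it (`expectedFailures` is a hypothetical helper):

```go
package main

import "fmt"

// expectedFailures returns the expected number of node failures over
// horizonDays, assuming independent nodes with a constant failure rate.
func expectedFailures(nodes int, mtbfDays, horizonDays float64) float64 {
	perDay := float64(nodes) / mtbfDays // cluster-wide failures per day
	return perDay * horizonDays
}

func main() {
	horizons := []struct {
		label string
		days  float64
	}{{"day", 1}, {"week", 7}, {"month", 30}, {"year", 365}}
	for _, h := range horizons {
		fmt.Printf("per %s: %.1f\n", h.label, expectedFailures(1000, 1095, h.days))
	}
}
```

Run as-is, it prints about 0.9, 6.4, 27.4, and 333.3 expected failures per day, week, 30-day month, and year.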
Key Insight: Failure is not an edge case; it is the steady state. Recovery must be fully automated, transparent to queries, and fast (under 60 s MTTR whenever a replica is available).
Heartbeat-Based Failure Detection
Three-Tier Heartbeat System:
heartbeat_configuration:
# Tier 1: Proxy → Cluster Gateway
proxy_to_gateway:
interval: 5s
timeout: 15s
missed_threshold: 3 # 3 missed heartbeats = 15s detection
# Tier 2: Cluster Gateway → Global Coordinator
gateway_to_coordinator:
interval: 10s
timeout: 30s
missed_threshold: 3 # 30s detection
# Tier 3: Partition → Proxy (health check)
partition_health:
check_interval: 1s
check_type: simple_query # SELECT 1 equivalent
timeout: 100ms
Implementation:
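type FailureDetector struct {
	mu              sync.Mutex
	proxies         map[string]*ProxyHealth
	lastHeartbeat   map[string]time.Time // last heartbeat seen per proxy
	failed          map[string]bool      // proxies already reported as failed
	interval        time.Duration        // expected heartbeat interval (5s)
	missedThreshold int                  // missed heartbeats before failure (3)
	failureHandlers []FailureHandler
}

// MonitorHeartbeats checks once per second so a failure is reported close to
// interval × missedThreshold (15s) after the last heartbeat.
func (fd *FailureDetector) MonitorHeartbeats(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			var newlyFailed []string
			fd.mu.Lock()
			for proxyID, lastSeen := range fd.lastHeartbeat {
				// Whole heartbeat intervals elapsed since the last heartbeat
				missed := int(now.Sub(lastSeen) / fd.interval)
				// Threshold exceeded → mark as failed (once per outage)
				if missed >= fd.missedThreshold && !fd.failed[proxyID] {
					fd.failed[proxyID] = true
					newlyFailed = append(newlyFailed, proxyID)
				}
			}
			fd.mu.Unlock()
			// Run recovery outside the lock so heartbeats are not blocked
			for _, proxyID := range newlyFailed {
				fd.HandleProxyFailure(proxyID)
			}
		}
	}
}

// ReceiveHeartbeat records a heartbeat and clears any failed mark.
func (fd *FailureDetector) ReceiveHeartbeat(proxyID string) {
	fd.mu.Lock()
	defer fd.mu.Unlock()
	fd.lastHeartbeat[proxyID] = time.Now()
	delete(fd.failed, proxyID)
}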
Detection Latency: About 15 seconds (3 missed heartbeats × 5 s interval), plus up to one monitoring tick
Recovery Strategy: Fast Failover vs S3 Restore
Option A: Replica Failover (Fast)
When partition has in-cluster replicas (3× replication within cluster):
Failure detected: Proxy 42 unresponsive (15s)
Step 1: Identify affected partitions (500ms)
- Proxy 42 hosts partitions 07:0042:00 through 07:0042:63 (64 partitions)
Step 2: Find replica proxies (100ms)
- Primary: Proxy 42 (FAILED)
- Replica 1: Proxy 18 (HEALTHY)
- Replica 2: Proxy 91 (HEALTHY)
Step 3: Promote Replica 1 to primary (5s)
- Update routing table: 07:0042:* → Proxy 18
- Broadcast routing update to all proxies
- Resume query traffic
Total recovery time: 15s detection + 5s promotion = 20 seconds
Data loss: None (replica up-to-date via WAL)
Implementation:
func (fm *FailureManager) HandleProxyFailure(proxyID string) error {
	var errs []error
	// Get all partitions on failed proxy
	for _, partition := range fm.GetPartitionsOnProxy(proxyID) {
		// Check for in-cluster replicas
		replicas := fm.GetReplicasForPartition(partition.ID)
		if len(replicas) > 0 {
			// Fast path: promote a healthy replica
			if healthyReplica := fm.FindHealthyReplica(replicas); healthyReplica != nil {
				// Assumes PromoteReplica reports an error if promotion fails
				if err := fm.PromoteReplica(partition.ID, healthyReplica.ProxyID); err == nil {
					log.Printf("Promoted replica for partition %s to proxy %s",
						partition.ID, healthyReplica.ProxyID)
					continue
				}
			}
		}
		// Slow path (no healthy replica or promotion failed): restore from S3
		if err := fm.RestorePartitionFromS3(partition.ID); err != nil {
			errs = append(errs, fmt.Errorf("partition %s: %w", partition.ID, err))
		}
	}
	// errors.Join (Go 1.20+) returns nil when every partition recovered
	return errors.Join(errs...)
}
Option B: S3 Restore (Slow)
When no in-cluster replicas available:
Failure detected: Proxy 42 unresponsive (15s)
Step 1: Identify affected partitions (500ms)
- Partitions 07:0042:00 through 07:0042:63 (64 partitions)
Step 2: Assign to replacement proxy (1s)
- Select least-loaded proxy: Proxy 23
Step 3: Restore from S3 (parallel, 64 partitions) (5 min)
- Download partition snapshot: s3://cluster-07/partition-0042-00.pb.gz
- Decompress and load into memory
- Rebuild indexes
- 64 partitions × 156 MB = 10 GB total
- Download: 10 GB ÷ 200 MB/s = 50 seconds
- Decompress + Load: 4 minutes
Total recovery time: 15s detection + 5 min restore = 5 min 15 seconds
Data loss: Writes since the last snapshot (up to 1 hour with hourly snapshots; bounding loss to < 5 minutes requires snapshots at least every 5 minutes)
S3 Restore Implementation:
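import (
	"compress/gzip"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"google.golang.org/protobuf/proto"
)
// RestorePartitionFromS3 rebuilds one partition from its latest S3 snapshot (fm.s3Client.DownloadFile is assumed to be an internal S3 wrapper).
func (fm *FailureManager) RestorePartitionFromS3(partitionID string) error {
	// Download snapshot from S3 to local scratch space
	s3Path := fmt.Sprintf("s3://cluster-%s/partition-%s.pb.gz", fm.clusterID, partitionID)
	localPath := filepath.Join(os.TempDir(), partitionID+".pb.gz")
	if err := fm.s3Client.DownloadFile(s3Path, localPath); err != nil {
		return fmt.Errorf("S3 download failed: %w", err)
	}
	defer os.Remove(localPath) // clean up scratch file
	// Decompress the gzip stream
	f, err := os.Open(localPath)
	if err != nil {
		return fmt.Errorf("open snapshot: %w", err)
	}
	defer f.Close()
	zr, err := gzip.NewReader(f)
	if err != nil {
		return fmt.Errorf("decompression failed: %w", err)
	}
	defer zr.Close()
	decompressed, err := io.ReadAll(zr)
	if err != nil {
		return fmt.Errorf("decompression failed: %w", err)
	}
	// Decode the partition (Partition is a generated protobuf message)
	partition := &Partition{}
	if err := proto.Unmarshal(decompressed, partition); err != nil {
		return fmt.Errorf("unmarshal partition: %w", err)
	}
	// Rebuild indexes before the partition takes traffic
	fm.RebuildPartitionIndexes(partition)
	// Update routing table only after the data is loaded
	fm.UpdateRoutingTable(partitionID, fm.replacementProxyID)
	log.Printf("Restored partition %s from S3", partitionID)
	return nil
}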
Cascading Failure Prevention
Problem: Single node failure can trigger cascading failures if remaining nodes become overloaded with rerouted traffic.
Circuit Breaker Pattern:
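type CircuitState int

const (
	CircuitClosed   CircuitState = iota // normal operation
	CircuitOpen                         // rejecting calls
	CircuitHalfOpen                     // probing with a limited number of calls
)

// CircuitOpenError is returned when a call is rejected without running fn.
type CircuitOpenError struct{ Message string }

func (e CircuitOpenError) Error() string { return e.Message }

type CircuitBreaker struct {
	mu                  sync.Mutex
	State               CircuitState
	FailureThreshold    int           // consecutive failures before opening (10)
	SuccessThreshold    int           // half-open successes before closing (5)
	Timeout             time.Duration // time open before probing (30s)
	HalfOpenMaxRequests int           // concurrent probes allowed when half-open (10)
	FailureCount        int
	SuccessCount        int
	halfOpenInFlight    int
	LastFailureTime     time.Time // when the circuit last tripped
}

// Call runs fn if the breaker admits it and records the outcome.
func (cb *CircuitBreaker) Call(fn func() error) error {
	if err := cb.admit(); err != nil {
		return err
	}
	err := fn()
	cb.record(err)
	return err
}

func (cb *CircuitBreaker) admit() error {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	if cb.State == CircuitOpen {
		if time.Since(cb.LastFailureTime) < cb.Timeout {
			// Reject requests to prevent cascading failure
			return CircuitOpenError{Message: "circuit breaker open, proxy overloaded"}
		}
		// Timeout elapsed: move to half-open and start probing
		cb.State, cb.SuccessCount, cb.halfOpenInFlight = CircuitHalfOpen, 0, 0
	}
	if cb.State == CircuitHalfOpen {
		if cb.halfOpenInFlight >= cb.HalfOpenMaxRequests {
			return CircuitOpenError{Message: "circuit breaker half-open, probe limit reached"}
		}
		cb.halfOpenInFlight++
	}
	return nil
}

func (cb *CircuitBreaker) record(err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	switch cb.State {
	case CircuitClosed:
		if err == nil {
			cb.FailureCount = 0 // only consecutive failures count
			return
		}
		cb.FailureCount++
		if cb.FailureCount >= cb.FailureThreshold {
			cb.trip()
		}
	case CircuitHalfOpen:
		cb.halfOpenInFlight--
		if err != nil {
			cb.trip() // any failed probe reopens the circuit
			return
		}
		cb.SuccessCount++
		if cb.SuccessCount >= cb.SuccessThreshold {
			cb.State, cb.FailureCount = CircuitClosed, 0 // recovered
		}
	}
}

// trip opens the circuit and starts the timeout window.
func (cb *CircuitBreaker) trip() {
	cb.State, cb.FailureCount, cb.LastFailureTime = CircuitOpen, 0, time.Now()
}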
Configuration:
circuit_breaker:
failure_threshold: 10 # Open circuit after 10 consecutive failures
success_threshold: 5 # Close circuit after 5 consecutive successes
timeout: 30s # Time before attempting half-open
half_open_max_requests: 10 # Maximum requests in half-open state
Operational Runbooks
Scenario 1: Single Proxy Failure
Detection: Heartbeat missed for 15 seconds
Symptoms: 64 partitions (1/1,000 of all partitions) unavailable
Automated Response:
1. Mark proxy as FAILED in routing table
2. Attempt replica failover (if available)
3. If no replica: Restore from S3 (5 min)
4. Resume traffic
Human Action Required: None (fully automated)
Expected Duration: 20s (replica) or 5 min (S3)
Scenario 2: Cluster-Wide Network Partition
Detection: All proxies in Cluster 7 unresponsive
Symptoms: 6,400 partitions (100 proxies × 64; 10% of all partitions) unavailable
Automated Response:
1. Mark entire cluster as UNAVAILABLE
2. Reroute queries to remaining 9 clusters
3. Wait for network recovery (no data migration yet)
Human Action Required: Investigate network partition
Expected Duration: Minutes to hours (network-dependent)
Scenario 3: Cascading Overload
Detection: Circuit breakers opening on multiple proxies
Symptoms: Increased latency, error rate >10%
Automated Response:
1. Activate circuit breakers on overloaded proxies
2. Shed low-priority query load
3. Alert on-call engineer
Human Action Required: Scale cluster or reduce load
Expected Duration: Hours (requires capacity planning)
Mean Time to Recovery (MTTR) Targets
| Failure Type | Detection Time | Recovery Time | Total MTTR | Method |
|---|---|---|---|---|
| Single proxy failure | 15s | ~5s | ~20s | Replica failover |
| Single proxy failure (no replica) | 15s | 5 min | 5m 15s | S3 restore |
| Partition corruption | Immediate | 5 min | 5 min | S3 restore |
| Network partition | 30s | Variable | 30s-hours | Wait for recovery |
| Cascading overload | 1 min | Variable | Minutes | Circuit breaker |
Monitoring and Alerting
Prometheus Metrics:
# Failure detection
prism_proxy_heartbeat_missed_total{proxy="42"} 3
prism_proxy_state{proxy="42",state="failed"} 1
# Recovery metrics
prism_partition_recovery_duration_seconds{method="replica"} 20
prism_partition_recovery_duration_seconds{method="s3"} 315
# Circuit breaker state (0=closed, 1=open, 2=half-open)
prism_circuit_breaker_state{proxy="23"} 1
prism_circuit_breaker_failures_total{proxy="23"} 10
Alerting Rules:
alerts:
- name: ProxyDown
condition: prism_proxy_state{state="failed"} == 1
severity: critical
message: "Proxy {{$labels.proxy}} failed, recovery in progress"
- name: HighRecoveryRate
condition: rate(prism_partition_recovery_duration_seconds_count[5m]) > 0.1
severity: warning
message: "High partition recovery rate: {{$value}} recoveries/sec"
- name: CircuitBreakerOpen
condition: prism_circuit_breaker_state == 1
severity: critical
message: "Circuit breaker open on proxy {{$labels.proxy}}, possible overload"
Summary
Failure detection and recovery is mandatory at 1000-node scale. Key design decisions:
- Fast Detection: 15-second heartbeat-based detection
- Two-Tier Recovery:
- Fast path: Replica failover (20s MTTR)
- Slow path: S3 restore (5 min MTTR)
- Cascading Prevention: Circuit breakers prevent overload spread
- Full Automation: No human intervention for common failures
- Monitoring: Comprehensive metrics and alerting for operational visibility
Impact: The system can absorb roughly one node failure per day without human intervention, targeting 99.99% availability (about 52 minutes of downtime per year) and enabling lights-out operation at massive scale.
Performance Characteristics
Query Latency
| Query Type | Partitions Touched | Latency (P50) | Latency (P99) |
|---|---|---|---|
| Single vertex lookup | 1 | 50 μs | 200 μs |
| 1-hop traversal (local) | 1 | 500 μs | 2 ms |
| 1-hop traversal (distributed) | 10 | 5 ms | 20 ms |
| 2-hop traversal (local) | 1 | 2 ms | 10 ms |
| 2-hop traversal (distributed) | 100 | 50 ms | 200 ms |
| 3-hop traversal (distributed) | 500 | 500 ms | 2 s |
| Global scan (all partitions) | 1000 | 2 s | 10 s |
Throughput
| Operation | Throughput | Notes |
|---|---|---|
| Single partition write | 100k writes/sec | Hot partition limit |
| Cluster aggregate write | 100M writes/sec | 1000 proxies × 100k |
| Single partition read | 1M reads/sec | In-memory, no disk |
| Cluster aggregate read | 1B reads/sec | 1000 proxies × 1M |
| Cross-partition query | 10k queries/sec | Limited by coordinator |
| Distributed traversal | 1k traversals/sec | 3-hop average |
Storage
| Tier | Capacity | Cost ($/TB/month) | Access Latency |
|---|---|---|---|
| In-Memory (Hot) | 30 TB | $500 (RAM) | 50 μs |
| S3 Standard (Warm) | 1 PB | $23 | 100 ms |
| S3 Glacier Deep Archive (Cold) | 10 PB | $1 | 12 hours |
Total Storage Cost (100B vertices scenario):
- 100B vertices × 100 bytes = 10 TB (vertices)
- 10T edges × 20 bytes = 200 TB (edges)
- Total: 210 TB raw data
Storage Distribution:
- Hot (10%): 21 TB in-memory across 1000 proxies
- Warm (40%): 84 TB on S3 Standard
- Cold (50%): 105 TB on S3 Glacier
Monthly Cost:
- Hot: $10,500 (RAM: $500/TB × 21 TB)
- Warm: $1,932 (S3: $23/TB × 84 TB)
- Cold: $105 (Glacier: $1/TB × 105 TB)
- Total: ~$12,500/month
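The cost roll-up can be checked mechanically. A small sketch using decimal units (1 TB = 10^12 bytes) and the per-TB prices from the table above; the `tier` type and `monthlyCost` helper are hypothetical:

```go
package main

import "fmt"

type tier struct {
	name     string
	share    float64 // fraction of raw data stored in this tier
	usdPerTB float64 // monthly cost per TB
}

// monthlyCost splits rawTB across tiers and sums the monthly bill.
func monthlyCost(rawTB float64, tiers []tier) (total float64, perTier map[string]float64) {
	perTier = map[string]float64{}
	for _, t := range tiers {
		c := rawTB * t.share * t.usdPerTB
		perTier[t.name] = c
		total += c
	}
	return total, perTier
}

func main() {
	vertexTB := 100e9 * 100 / 1e12 // 100B vertices × 100 bytes = 10 TB
	edgeTB := 10e12 * 20 / 1e12    // 10T edges × 20 bytes = 200 TB
	raw := vertexTB + edgeTB
	total, per := monthlyCost(raw, []tier{
		{"hot", 0.10, 500}, {"warm", 0.40, 23}, {"cold", 0.50, 1},
	})
	fmt.Printf("raw %.0f TB, hot $%.0f, warm $%.0f, cold $%.0f, total $%.0f/month\n",
		raw, per["hot"], per["warm"], per["cold"], total)
}
```

It prints `raw 210 TB, hot $10500, warm $1932, cold $105, total $12537/month`; RAM accounts for over 80% of the bill, so the hot share is the main cost lever.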
Operational Patterns
Capacity Planning
Vertices per Proxy: 100M vertices
Memory per Proxy: 30 GB of graph data (~33 GB including overhead)
- Vertex data: 100M × 100 bytes = 10 GB
- Edge data: 1B × 20 bytes = 20 GB
- Overhead (indexes, metadata): ~10% of graph data (~3 GB)
Cluster Size for 100B Vertices:
- 100B vertices ÷ 100M per proxy = 1000 proxies
- Distributed across 10 clusters = 100 proxies per cluster
Adding Capacity:
- Add a new cluster: up to 10B additional vertices (100 proxies)
- Add proxies to an existing cluster: up to 100M additional vertices per proxy
- Add partitions to existing proxies: up to ~1.56M vertices per partition (100M ÷ 64)
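The sizing rules above reduce to a few divisions. A minimal sketch, assuming decimal GB and the 64-partitions-per-proxy figure from MEMO-050 Finding 6; `planCapacity` and `CapacityPlan` are hypothetical names:

```go
package main

import "fmt"

// ceilDiv returns ceil(a / b) for positive integers.
func ceilDiv(a, b int64) int64 { return (a + b - 1) / b }

type CapacityPlan struct {
	Proxies, ProxiesPerCluster, VerticesPerPartition int64
	RawGB, TotalGB                                   float64
}

// planCapacity derives cluster sizing from per-proxy limits (decimal GB).
func planCapacity(totalVertices, verticesPerProxy, clusters, partitionsPerProxy,
	vertexBytes, edgesPerProxy, edgeBytes int64, overhead float64) CapacityPlan {
	proxies := ceilDiv(totalVertices, verticesPerProxy)
	raw := float64(verticesPerProxy*vertexBytes+edgesPerProxy*edgeBytes) / 1e9
	return CapacityPlan{
		Proxies:              proxies,
		ProxiesPerCluster:    ceilDiv(proxies, clusters),
		VerticesPerPartition: verticesPerProxy / partitionsPerProxy,
		RawGB:                raw,
		TotalGB:              raw * (1 + overhead), // indexes and metadata
	}
}

func main() {
	p := planCapacity(100_000_000_000, 100_000_000, 10, 64, 100, 1_000_000_000, 20, 0.10)
	fmt.Printf("proxies=%d perCluster=%d verticesPerPartition=%d memory=%.0f GB raw, %.0f GB with overhead\n",
		p.Proxies, p.ProxiesPerCluster, p.VerticesPerPartition, p.RawGB, p.TotalGB)
}
```

With the RFC's figures it reports 1000 proxies, 100 per cluster, 1562500 vertices per partition, and 30 GB raw (33 GB with the 10% overhead) per proxy.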
Monitoring
# Prometheus metrics
# Partition distribution
prism_graph_partitions_total{cluster="0",state="hot"} 800
prism_graph_partitions_total{cluster="0",state="warm"} 400
prism_graph_partitions_total{cluster="0",state="cold"} 400
# Query latency by partition count
prism_graph_query_latency_seconds{partitions="1",quantile="0.5"} 0.0005
prism_graph_query_latency_seconds{partitions="100",quantile="0.5"} 0.05
# Cross-partition queries
prism_graph_cross_partition_queries_total{cluster="0"} 5000000
# Partition migrations
prism_graph_partition_migrations_total{from_proxy="42",to_proxy="89"} 3
Disaster Recovery
Scenario: Entire cluster failure (100 proxies lost)
Recovery Strategy:
Option 1: Rebuild from S3
1. Spin up 100 new proxies
2. Each proxy downloads its partitions from S3
3. Load hot partitions into memory
4. Resume serving traffic
Time: 30 minutes (parallel S3 downloads)
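Data loss: None only if S3 holds every acknowledged write (for example, continuous WAL shipping); with periodic snapshots, writes since the last snapshot are lost
Option 2: Reroute to other clusters
1. Admin reassigns failed cluster's vertices to other clusters
2. Update routing tables
3. Other clusters load additional partitions
Time: 5 minutes
Data loss: Same as Option 1 (partitions are reloaded from S3)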
Trade-off: Other clusters temporarily overloaded
Migration Path
Phase 1: Extend RFC-055 to 10B Vertices (100 Proxies)
Goal: Scale current architecture 10× without breaking changes
Changes:
- Increase proxy count: 10 → 100
- Same MemStore backend
- Same consistent hashing
Deliverables:
- Updated partition count: 256 → 6,400 (64 per proxy, based on MEMO-050 Finding 6)
- Admin plane supports 100 proxy registrations
- Cross-proxy RPC pool expanded to 100 connections
Phase 2: Add Hierarchical Clustering (10 Clusters × 100 Proxies)
Goal: Introduce cluster tier for 100B vertices
Changes:
- Add cluster concept (10 clusters)
- Each cluster has 100 proxies
- Update vertex ID format:
{cluster}:{proxy}:{partition}:{id}
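A minimal sketch of parsing and formatting the four-part ID, assuming the zero-padded widths seen in partition IDs elsewhere in this RFC (`07:0042:05`: two-digit cluster, four-digit proxy, two-digit partition) and that the trailing `{id}` may itself contain colons. `VertexID`, `ParseVertexID`, and the padding widths are assumptions:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// VertexID is the hierarchical ID {cluster}:{proxy}:{partition}:{id}.
type VertexID struct {
	Cluster, Proxy, Partition int
	Local                     string // application-level vertex key
}

// String formats the ID with the assumed zero-padded widths.
func (v VertexID) String() string {
	return fmt.Sprintf("%02d:%04d:%02d:%s", v.Cluster, v.Proxy, v.Partition, v.Local)
}

// PartitionKey returns the routing key, e.g. "07:0042:05".
func (v VertexID) PartitionKey() string {
	return fmt.Sprintf("%02d:%04d:%02d", v.Cluster, v.Proxy, v.Partition)
}

// ParseVertexID splits on the first three colons only, so Local may contain ':'.
func ParseVertexID(s string) (VertexID, error) {
	parts := strings.SplitN(s, ":", 4)
	if len(parts) != 4 || parts[3] == "" {
		return VertexID{}, fmt.Errorf("invalid vertex ID %q", s)
	}
	var nums [3]int
	for i := 0; i < 3; i++ {
		n, err := strconv.Atoi(parts[i])
		if err != nil || n < 0 {
			return VertexID{}, fmt.Errorf("invalid vertex ID %q: bad field %d", s, i)
		}
		nums[i] = n
	}
	return VertexID{Cluster: nums[0], Proxy: nums[1], Partition: nums[2], Local: parts[3]}, nil
}

func main() {
	v, err := ParseVertexID("07:0042:05:user:12345")
	if err != nil {
		panic(err)
	}
	fmt.Println(v.PartitionKey(), v.Local) // 07:0042:05 user:12345
	fmt.Println(v)                         // 07:0042:05:user:12345
}
```

Keeping the partition prefix inside the ID lets a router find the owning partition without a lookup; the trade-off is that moving a vertex to another partition changes its ID.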
Deliverables:
- Cluster-aware routing
- Cluster gateway component
- Inter-cluster query forwarding
Phase 3: Partition Temperature Management
Goal: Introduce hot/cold storage tiers
Changes:
- Integrate S3 backend (RFC-059)
- Implement partition temperature monitoring
- Automatic memory/S3 offloading
Deliverables:
- Partition state machine (hot/warm/cold)
- S3 snapshot format for partitions
- Temperature-based eviction policies
Phase 4: Advanced Query Optimization
Goal: Distributed Gremlin query execution
Changes:
- Partition-aware query planning (RFC-060)
- Multi-level indexing (RFC-058)
- Authorization filtering (RFC-061)
Deliverables:
- Query coordinator component
- Distributed query execution engine
- Index-accelerated traversals
Related RFCs
- RFC-055: Graph Pattern - High-Level Graph Database API - Foundation graph pattern
- RFC-058: Multi-Level Graph Indexing Strategy - Indexing for massive-scale queries
- RFC-059: Hot/Cold Storage Tiers with S3 - Storage tier architecture
- RFC-060: Distributed Gremlin Query Execution - Query execution model
- RFC-061: Graph Authorization with Vertex Labels - Fine-grained access control
- RFC-048: Cross-Proxy Partition Strategies - Partition management
- RFC-046: Consolidated Pattern Protocols - Pattern service architecture
Open Questions
- Optimal Partition Count per Proxy: 64 partitions recommended (MEMO-050 Finding 6) for finer granularity. Should this be dynamic based on workload?
- Cross-Region Sharding: How to shard graph across multiple AWS regions?
- Graph Replication: Should hot partitions be replicated for read scalability?
- Partition Split/Merge: How to handle partition growth beyond 1.56M vertices?
- Consistency Model: Strong consistency vs eventual consistency for distributed writes?
References
- Google Pregel: Large-Scale Graph Processing
- Facebook TAO: The Social Graph Database
- Amazon Neptune Best Practices
- Apache Giraph: Scalable Graph Processing
- JanusGraph Distribution
Revision History
- 2025-11-15: Initial draft - Massive-scale graph sharding for 100B vertices