RFC-057: Massive-Scale Graph Sharding for 100B Vertices

Status: Draft Author: Platform Team Created: 2025-11-15 Updated: 2025-11-15

Abstract

This RFC extends Prism's graph pattern (RFC-055) to support massive-scale distributed graphs with 100 billion vertices and trillions of edges. The existing architecture supports 1 billion vertices across 10 proxy instances (100M vertices per proxy). Scaling to 100B vertices requires a multi-tier sharding strategy with lightweight compute nodes, hierarchical partition routing, and intelligent data placement. This RFC defines the distributed sharding architecture, partition strategies, query routing protocols, and operational patterns needed to achieve this scale.

Key Innovations:

  • Hierarchical Sharding: Three-tier architecture (cluster → proxy → partition)
  • Lightweight Nodes: 1000+ nodes with 100M vertices each
  • Hot/Cold Tier Integration: In-memory hot data, S3 cold storage (RFC-059)
  • Partition-Aware Queries: Gremlin queries optimized for distributed execution
  • Dynamic Rebalancing: Automatic partition migration without downtime
  • Fine-Grained Authorization: Vertex-level access control (RFC-061)

Motivation

Current Scale Limitations

RFC-055 demonstrates 1B vertices across 10 proxies:

| Scale Dimension | RFC-055 (Current) | This RFC (Target) | Multiplier |
|---|---|---|---|
| Total Vertices | 1 billion | 100 billion | 100× |
| Total Edges | 10 billion | 10 trillion | 1000× |
| Proxy Instances | 10 | 1000 | 100× |
| Vertices per Node | 100M | 100M | 1× |
| Memory per Node | 30 GB | 30 GB | 1× |
| Total Memory | 300 GB | 30 TB | 100× |

Why Scale Beyond 1B Vertices?

Use Cases Requiring 100B+ Scale

1. Global Social Network Graph

Example: Facebook/Meta-scale social graph

  • Users: 3 billion active users
  • Connections: Average 200 friends/connections per user
  • Total Edges: 600 billion edges (friend relationships)
  • Content Interactions: 10 trillion edges (likes, comments, shares)
  • Total Vertices: 3B users + 50B posts/photos/videos = 53B vertices

Query Patterns:

  • Friends of friends (2-3 hop traversals)
  • Mutual connections
  • Content recommendations based on friend graph
  • Influence analysis (PageRank on 3B users)

2. Global Financial Transaction Network

Example: SWIFT/Visa-scale transaction graph

  • Accounts: 5 billion bank accounts worldwide
  • Merchants: 100 million merchants
  • Transactions: 500 billion transactions per year (vertices)
  • Transaction edges: Account → Transaction → Merchant
  • Total Vertices: 505B vertices
  • Total Edges: 1 trillion edges

Query Patterns:

  • Fraud detection (circular transaction paths)
  • Money laundering detection (complex multi-hop patterns)
  • Account risk scoring (network analysis)
  • Real-time suspicious pattern detection

3. Internet of Things (IoT) Device Network

Example: Smart city/industrial IoT graph

  • Devices: 50 billion IoT devices globally by 2030
  • Sensors: 200 billion sensor readings/day (time-series vertices)
  • Device relationships: Physical topology, data flows
  • Total Vertices: 50B devices, plus sensor readings modeled as time-series vertices (73 trillion/year at full retention), putting the graph well past 100B vertices even with short retention windows
  • Total Edges: 500B device-to-device edges + trillions of data flow edges

Query Patterns:

  • Device dependency analysis
  • Cascade failure prediction
  • Data lineage tracking
  • Real-time anomaly detection across device clusters

4. Knowledge Graph at Web Scale

Example: Google Knowledge Graph scale

  • Entities: 500 billion entities (people, places, things, concepts)
  • Relationships: 3.5 trillion semantic relationships
  • Total Vertices: 500B entities
  • Total Edges: 3.5T relationships

Query Patterns:

  • Semantic search (multi-hop entity resolution)
  • Question answering (traverse relationship chains)
  • Entity disambiguation
  • Recommendation via semantic similarity

Problems with Current Architecture

Problem 1: Limited Node Count (10 proxies)

RFC-055 architecture assumes 10-20 proxy instances:

  • Partition count fixed at 256
  • Consistent hashing assumes small cluster
  • Control plane distribution limited to 10-20 nodes

Problem 2: No Hierarchical Routing

Flat partition model doesn't scale to 1000+ nodes:

  • Every node needs partition table for all 1000 nodes
  • Cross-node queries require 1000-way coordination
  • Partition rebalancing touches all nodes

Problem 3: Memory-Only Architecture

RFC-055 keeps entire graph in memory:

  • 100B vertices × 100 bytes = 10 TB minimum
  • 10T edges × 20 bytes = 200 TB minimum
  • Total: 210 TB in-memory (not feasible)

Problem 4: No Data Locality Optimization

Random vertex assignment via consistent hashing:

  • Traversals always hit multiple nodes
  • No co-location of frequently accessed subgraphs
  • No optimization for access patterns

Problem 5: Limited Query Optimization

Single-proxy query execution:

  • No distributed query planning
  • No partition pruning for filtered queries
  • No parallel execution across partitions

Goals

  1. Horizontal Scalability: Support 1000+ lightweight compute nodes
  2. Hierarchical Sharding: Three-tier partition hierarchy (cluster → proxy → partition)
  3. Cost-Effective Storage: Hot/cold data tiers (memory + S3)
  4. Intelligent Placement: Co-locate related vertices for query optimization
  5. Distributed Queries: Gremlin query execution across 1000+ nodes
  6. Dynamic Rebalancing: Add/remove nodes without downtime
  7. Authorization-Aware: Vertex labeling for fine-grained access control

Non-Goals

  • Backend-Agnostic Sharding: This RFC assumes in-memory MemStore + S3 (not Neptune, Neo4j)
  • Multi-Region Distribution: Cross-region replication covered in RFC-012
  • ACID Transactions: Distributed transactions across shards (future work)
  • Schema Evolution: Graph schema changes not covered here
  • Real-Time Ingestion: Bulk loading focus, streaming ingestion in RFC-051 (WAL pattern)

Hierarchical Sharding Architecture

Three-Tier Partition Model

┌────────────────────────────────────────────────────────────────┐
│ Tier 1: Cluster │
│ │
│ Global Coordinator (Admin Plane) │
│ - 10-20 clusters, each with 100 proxy nodes │
│ - Cluster ID: 0-19 (e.g., us-west-1, us-east-1) │
│ │
└────────────────────┬───────────────────────────────────────────┘

┌───────────┼───────────┬─────────────┐
│ │ │ │
┌────────▼─────┐ ┌──▼────────┐ ┌▼──────────┐ ┌▼──────────────┐
│ Cluster 0 │ │ Cluster 1 │ │ Cluster 2 │ │ ... │
│ (us-west-1) │ │(us-east-1)│ │(eu-west-1)│ │ Cluster 19 │
│ │ │ │ │ │ │ │
│ 100 proxies │ │ 100 proxies│ │100 proxies│ │ 100 proxies │
│ 10B vertices │ │10B vertices│ │10B vertices│ │10B vertices │
└────────┬─────┘ └───────────┘ └───────────┘ └───────────────┘

│ Tier 2: Proxy (within cluster)

┌────────▼───────────────────────────────────────────────────────┐
│ Cluster 0: 100 Proxy Instances │
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Proxy │ │Proxy │ │Proxy │ ... │Proxy │ │
│ │ 0 │ │ 1 │ │ 2 │ │ 99 │ │
│ │ │ │ │ │ │ │ │ │
│ │100M V│ │100M V│ │100M V│ │100M V│ │
│ └───┬──┘ └───┬──┘ └───┬──┘ └───┬──┘ │
│ │ │ │ │ │
└──────┼────────┼────────┼──────────────────┼───────────────────┘
│ │ │ │
│ Tier 3: Partitions (within proxy)

┌──────▼──────────────────────────────────────────┐
│ Proxy 0: 64 Partitions │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Part 0 │ │ Part 1 │ │ Part 63 │ │
│ │ 1.56M V │ │ 1.56M V │ │ 1.56M V │ ... │
│ │ Hot data │ │ Warm data│ │ Cold data│ │
│ │ In-memory│ │ Partial │ │ S3 backed│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└──────────────────────────────────────────────────┘

Partition Addressing

Vertex ID Format: {cluster_id}:{proxy_id}:{partition_id}:{local_vertex_id}

Example: 02:0045:12:user_alice

  • Cluster 2 (eu-west-1)
  • Proxy 45 (within cluster 2)
  • Partition 12 (within proxy 45)
  • Local ID: user_alice

Routing Algorithm:

Vertex ID: "02:0045:12:user_alice"

Step 1: Parse vertex ID
cluster_id = 02
proxy_id = 0045
partition_id = 12
local_vertex_id = "user_alice"

Step 2: Route to cluster
Global coordinator → Cluster 2 gateway

Step 3: Route to proxy
Cluster 2 gateway → Proxy 45

Step 4: Route to partition
Proxy 45 → Partition 12

Step 5: Execute query
Partition 12 → Lookup "user_alice" in local MemStore/S3
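
A minimal parsing sketch for Step 1 (illustrative Go, not an existing API; note that strings.SplitN with a limit of 4 keeps any colons inside the local ID intact, which matters for IDs like 07:0042:37:user:alice):

import (
    "fmt"
    "strings"
)

// PartitionAddress is the parsed form of a hierarchical vertex ID.
type PartitionAddress struct {
    ClusterID   string
    ProxyID     string
    PartitionID string
    LocalID     string
}

// ParseVertexID splits "{cluster_id}:{proxy_id}:{partition_id}:{local_vertex_id}".
// The SplitN limit of 4 preserves colons inside the local ID
// (e.g. "02:0045:12:user:alice" → LocalID "user:alice").
func ParseVertexID(vertexID string) (*PartitionAddress, error) {
    parts := strings.SplitN(vertexID, ":", 4)
    if len(parts) != 4 {
        return nil, fmt.Errorf("malformed vertex ID %q", vertexID)
    }
    return &PartitionAddress{
        ClusterID:   parts[0],
        ProxyID:     parts[1],
        PartitionID: parts[2],
        LocalID:     parts[3],
    }, nil
}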

Alternative: Opaque Vertex IDs

Trade-Off: Hierarchical IDs (above) embed topology in vertex ID, making routing fast but rebalancing expensive. Opaque IDs decouple vertex identity from physical location, enabling free rebalancing at the cost of routing overhead (MEMO-050 Finding 15).

Hierarchical IDs (Current Approach)

Advantages:

  • Zero-overhead routing: Parse vertex ID to determine partition (O(1), ~10 ns)
  • No external dependencies: No routing table required
  • Deterministic: Same vertex ID always routes to same partition

Disadvantages:

  • Expensive rebalancing: Moving partition requires rewriting all vertex IDs
  • Topology-dependent: Vertex IDs encode cluster/proxy/partition structure
  • Limited flexibility: Cannot easily move individual vertices between partitions

Rebalancing Cost Example:

Scenario: Move partition 07:0042:05 from Proxy 42 to Proxy 89

With hierarchical IDs:
1. Update all 1.56M vertex IDs in partition
- Old: 07:0042:05:user_alice → New: 07:0089:05:user_alice
2. Update all incoming edge references (10M edges pointing to this partition)
3. Update all indexes containing these vertex IDs
4. Broadcast ID changes to all proxies

Cost: ~30 minutes for 1.56M vertices + 10M edge updates
Downtime: Partition unavailable during rewrite

Opaque IDs (Alternative Approach)

Design: Vertex IDs are globally unique, topology-independent identifiers. Routing table maps opaque ID → physical location.

Vertex ID Format: vertex:{uuid} or vertex:{snowflake_id}

Example: vertex:550e8400-e29b-41d4-a716-446655440000

Routing Table Architecture:

message VertexRoutingTable {
  // Distributed hash table: vertex_id → partition_location
  map<string, PartitionLocation> routing_table = 1;

  // Bloom filter for fast negative lookups
  bytes bloom_filter = 2;
}

message PartitionLocation {
  string cluster_id = 1;
  string proxy_id = 2;
  string partition_id = 3;
  int64 last_updated = 4;
}

Routing Implementation:

type OpaqueVertexRouter struct {
    routingTable *DistributedHashTable
    bloomFilter  *bloom.BloomFilter
    cache        *RoutingCache
}

func (ovr *OpaqueVertexRouter) RouteVertex(vertexID string) (*PartitionLocation, error) {
    // Fast path: check cache (99% hit rate under power-law access)
    if location := ovr.cache.Get(vertexID); location != nil {
        return location, nil
    }

    // Bloom filter: fast negative lookup
    if !ovr.bloomFilter.Contains(vertexID) {
        return nil, VertexNotFoundError{VertexID: vertexID}
    }

    // Slow path: lookup in the distributed routing table
    location, err := ovr.routingTable.Get(vertexID)
    if err != nil {
        return nil, err
    }

    // Cache result for subsequent lookups
    ovr.cache.Put(vertexID, location)

    return location, nil
}

Routing Table Storage: Distributed across cluster using consistent hashing

type DistributedRoutingTable struct {
    shards map[uint32]*RoutingTableShard
}

func (drt *DistributedRoutingTable) GetShard(vertexID string) *RoutingTableShard {
    hash := xxhash.Sum64String(vertexID)
    shardID := hash % uint64(len(drt.shards))
    return drt.shards[uint32(shardID)]
}

func (drt *DistributedRoutingTable) Get(vertexID string) (*PartitionLocation, error) {
    shard := drt.GetShard(vertexID)
    return shard.Lookup(vertexID)
}

Routing Cache Strategy:

routing_cache_config:
  type: local_lru          # Per-proxy cache
  max_entries: 1_000_000   # 1M vertex locations cached
  ttl: 3600s               # 1 hour expiration
  memory_size: 100MB       # ~100 bytes per entry

performance:
  hit_rate: 99%       # Power-law: 1% of vertices get 99% of queries
  cold_lookup: 1 μs   # Cache miss → routing table lookup
  hot_lookup: 10 ns   # Cache hit

Rebalancing with Opaque IDs:

Scenario: Move partition 07:0042:05 from Proxy 42 to Proxy 89

With opaque IDs:
1. Copy partition data to Proxy 89
2. Update routing table: vertex_id → 07:0089:05 (for all vertices in partition)
3. Broadcast routing table update to all proxies
4. Invalidate caches

Cost: ~10 seconds for routing table updates
Downtime: None (dual-read during migration)
Vertex IDs: Unchanged (no rewrites needed)

Performance Comparison

Routing Latency:

| Operation | Hierarchical IDs | Opaque IDs (Cold) | Opaque IDs (Hot) |
|---|---|---|---|
| Parse vertex ID | 10 ns | - | - |
| Routing table lookup | - | 1 μs | 10 ns (cached) |
| Total routing time | 10 ns | 1 μs | 10 ns |

Cache hit rate: 99% (power-law distribution)

Effective routing latency: 0.01 × 1 μs + 0.99 × 10 ns = 20 ns average

Rebalancing Time:

| Operation | Hierarchical IDs | Opaque IDs | Speedup |
|---|---|---|---|
| Move single partition | 30 minutes | 10 seconds | 180× |
| Move 100 partitions | 50 hours | 16 minutes | 187× |
| Rebalance entire cluster | 3 days | 2 hours | 36× |

Best of both worlds: Use hierarchical IDs for most vertices, opaque IDs for frequently moved data

vertex_id_strategy:
  default: hierarchical          # Most vertices use cluster:proxy:partition:id
  opaque_for:
    - hot_partitions             # Frequently rebalanced partitions
    - cross_partition_vertices   # Vertices referenced from many partitions
    - user_managed_ids           # Application-controlled IDs

routing_table:
  shards: 256             # Distributed routing table shards
  replication: 3          # Routing table replicated 3×
  cache_size: 1_000_000   # Per-proxy cache

Hybrid Routing:

func (hr *HybridRouter) RouteVertex(vertexID string) (*PartitionLocation, error) {
    // Check whether the ID is hierarchical or opaque
    if hr.IsHierarchicalID(vertexID) {
        // Fast path: parse the ID directly
        return hr.ParseHierarchicalID(vertexID), nil
    }

    // Slow path: lookup in the distributed routing table
    return hr.opaqueRouter.RouteVertex(vertexID)
}

func (hr *HybridRouter) IsHierarchicalID(vertexID string) bool {
    // Hierarchical IDs match {cluster}:{proxy}:{partition}:{local_id}.
    // SplitN tolerates colons inside the local ID (e.g. "07:0042:37:user:alice"),
    // which a raw colon count would misclassify. Opaque IDs use the
    // "vertex:" prefix and produce fewer than four segments.
    parts := strings.SplitN(vertexID, ":", 4)
    return len(parts) == 4
}

Decision Matrix

| Use Case | Recommended Approach | Rationale |
|---|---|---|
| Static graph | Hierarchical IDs | No rebalancing needed, zero routing overhead |
| Frequently rebalanced | Opaque IDs | 180× faster rebalancing, 1 μs routing acceptable |
| Mixed workload | Hybrid (80% hierarchical, 20% opaque) | Optimize common case, flexibility for hot data |
| Application-controlled IDs | Opaque IDs | Apps provide IDs, system maintains routing |

Summary

Opaque vertex IDs trade 1 μs routing overhead for free rebalancing:

  • Routing overhead: 1 μs cold, 10 ns hot (99% cache hit rate)
  • Rebalancing speedup: 180× faster (10 seconds vs 30 minutes per partition)
  • Memory overhead: 100 MB per proxy for routing cache (negligible)
  • Operational benefit: Enable live rebalancing without downtime

Recommendation: Start with hierarchical IDs for simplicity, migrate to hybrid approach when operational flexibility is needed. Reserve opaque IDs for hot partitions and application-managed identifiers.

Partition Size Guidelines

| Tier | Count | Size | Memory | Purpose |
|---|---|---|---|---|
| Cluster | 10-20 | 5-10B vertices | - | Geographic/organizational boundary |
| Proxy | 100 per cluster | 100M vertices | 30 GB | Compute node with in-memory cache |
| Partition | 64 per proxy | 1.56M vertices | 156 MB | Hot/warm/cold data unit |

Why 64 partitions per proxy? (Updated from 16 based on MEMO-050 Finding 6)

  • Finer hot/cold granularity: 156 MB partitions vs 625 MB (4× smaller)
  • Faster rebalancing: 10× more parallelism, 13 seconds vs 2.1 minutes
  • Better load distribution: Lower variance (2% vs 15%)
  • Smaller failure blast radius: 1.56M vertices vs 6.25M

Trade-off: Slightly more partition metadata (64k vs 16k cluster-wide), but benefits outweigh costs at 100B scale.

Network Topology-Aware Sharding

Critical: At 100B scale, cross-AZ bandwidth costs $365M/year without network-aware placement (MEMO-050 Finding 3). This section defines network topology integration for 95% cost savings.

AWS Network Cost Model

| Network Path | Latency | Cost | Use Case |
|---|---|---|---|
| Same rack | 100 μs | $0/GB | Preferred |
| Same AZ | 200 μs | $0/GB | Good |
| Cross-AZ | 1-2 ms | $0.01/GB | Expensive! |
| Cross-region | 20-100 ms | $0.02/GB | Very expensive! |

Cost Impact Without Optimization:

Typical 2-hop query: 100 partitions × 100 KB = 10 MB
50% cross-AZ traffic: 5 MB cross-AZ per query
At 1B queries/day: 5 PB/day × $0.01/GB ≈ $50k/day ≈ $18M/year for query traffic alone ❌
(Cross-AZ replication, rebalancing, and ingest traffic multiply this further at 100B scale.)

Cost Impact With Optimization:

AZ-aware placement: 5% cross-AZ traffic (not 50%), i.e. 500 KB cross-AZ per query
At 1B queries/day: 500 TB/day × $0.01/GB ≈ $5k/day ≈ $1.8M/year for query traffic ✅
Result caching reduces this further. The ~90% reduction is consistent with the savings table below.

Extended Partition Metadata

message PartitionMetadata {
  string partition_id = 1;
  string cluster_id = 2;
  string proxy_id = 3;

  // Network topology (NEW)
  NetworkLocation location = 4;
}

message NetworkLocation {
  string region = 1;             // "us-west-2"
  string availability_zone = 2;  // "us-west-2a"
  string rack_id = 3;            // "rack-42" (optional, for same-DC optimization)

  // Network characteristics for cost/latency calculations
  float cross_az_bandwidth_gbps = 4;      // Typical: 10 Gbps
  float cross_region_bandwidth_gbps = 5;  // Typical: 1-5 Gbps
}

Multi-AZ Deployment Strategy

cluster_topology:
  region: us-west-2

  availability_zones:
    - id: us-west-2a
      proxies: 334
      primary_for: [cluster_0, cluster_3, cluster_6, cluster_9]

    - id: us-west-2b
      proxies: 333
      primary_for: [cluster_1, cluster_4, cluster_7]

    - id: us-west-2c
      proxies: 333
      primary_for: [cluster_2, cluster_5, cluster_8]

# Partition replication for availability
partition_replication:
  factor: 3                    # 3 replicas per partition
  placement: different_az      # Each replica in different AZ
  consistency: eventual        # Async replication (lower cross-AZ cost)
  read_preference: primary_az  # Read from local AZ first (zero cost)

# Failover policy
failover:
  enabled: true
  timeout: 100ms       # Try local AZ first
  fallback: cross_az   # Fall back to remote AZ if needed
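
A sketch of how a partition client might implement the read_preference and failover policy above (the types and helpers are illustrative, not an existing API; Vertex is the type from the surrounding examples):

import (
    "context"
    "errors"
    "time"
)

type Replica struct{ AZ, Addr string }

type PartitionClient struct {
    localAZ  string
    readFrom func(ctx context.Context, r *Replica, id string) (*Vertex, error)
}

// ReadVertex tries a local-AZ replica within a 100 ms budget (matching the
// failover timeout above), then falls back to a cross-AZ replica.
func (c *PartitionClient) ReadVertex(ctx context.Context, replicas []Replica, id string) (*Vertex, error) {
    var local, remote *Replica
    for i := range replicas {
        if replicas[i].AZ == c.localAZ && local == nil {
            local = &replicas[i]
        } else if remote == nil {
            remote = &replicas[i]
        }
    }

    if local != nil {
        localCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
        v, err := c.readFrom(localCtx, local, id)
        cancel()
        if err == nil {
            return v, nil // Same-AZ read: ~200 μs, $0/GB
        }
        // Timed out or failed: fall through to a remote replica
    }

    if remote == nil {
        return nil, errors.New("no replica available")
    }
    // Cross-AZ fallback: 1-2 ms, billed at $0.01/GB
    return c.readFrom(ctx, remote, id)
}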

Locality-Aware Partitioning

Strategy: Co-locate frequently co-traversed vertices in same AZ.

type NetworkAwarePartitioner struct {
    topologyMap   *NetworkTopology
    localityHints map[string]string // vertex_id → preferred_az
}

func (nap *NetworkAwarePartitioner) AssignPartition(
    vertex *Vertex,
    placementHint *PlacementHint,
) string {
    // Default: consistent hashing (random placement)
    defaultPartition := nap.ConsistentHash(vertex.ID)

    // If a hint is provided, prefer co-location in the same AZ
    if placementHint != nil && placementHint.CoLocateWithVertex != "" {
        // Place in the same AZ as the target vertex
        targetPartition := nap.GetPartitionForVertex(placementHint.CoLocateWithVertex)
        targetAZ := nap.topologyMap.GetAZ(targetPartition)

        // Find a partition in that AZ with spare capacity
        sameAZPartition := nap.FindPartitionInAZ(targetAZ)
        if sameAZPartition != "" {
            return sameAZPartition
        }
    }

    // Fall back to consistent hashing
    return defaultPartition
}

// Example: social graph with locality
func CreateFriendship(user1, user2 string) error {
    // Place user2 in the same AZ as user1 if possible
    edge := &Edge{
        FromVertexID: user1,
        ToVertexID:   user2,
        Label:        "FRIENDS",
        PlacementHint: &PlacementHint{
            CoLocateWithVertex: user1,
            Reason:             "Reduce cross-AZ traffic for friend traversals",
        },
    }

    return CreateEdge(edge)
}

Query Routing with Network Cost

type NetworkCostModel struct {
    sameAZLatency      time.Duration // 200 μs
    crossAZLatency     time.Duration // 2 ms
    crossRegionLatency time.Duration // 50 ms

    sameAZCost      float64 // $0/GB
    crossAZCost     float64 // $0.01/GB
    crossRegionCost float64 // $0.02/GB
}

func (qp *QueryPlanner) OptimizeWithNetworkCost(plan *QueryPlan) *QueryPlan {
    for i, stage := range plan.Stages {
        // Group partitions by AZ
        partitionsByAZ := qp.GroupPartitionsByAZ(stage.TargetPartitions)

        // Prefer partitions in the same AZ as the coordinator
        coordinatorAZ := qp.GetCoordinatorAZ()

        // Reorder: local AZ first, then remote
        optimizedOrder := []string{}
        optimizedOrder = append(optimizedOrder, partitionsByAZ[coordinatorAZ]...)

        for az, partitions := range partitionsByAZ {
            if az != coordinatorAZ {
                optimizedOrder = append(optimizedOrder, partitions...)
            }
        }

        stage.TargetPartitions = optimizedOrder

        // Estimate network cost
        stage.EstimatedNetworkCost = qp.CalculateNetworkCost(stage)

        plan.Stages[i] = stage
    }

    return plan
}

func (qp *QueryPlanner) CalculateNetworkCost(stage *ExecutionStage) float64 {
    cost := 0.0
    coordinatorAZ := qp.GetCoordinatorAZ()

    for _, partitionID := range stage.TargetPartitions {
        partitionAZ := qp.GetPartitionAZ(partitionID)

        // Estimate data transfer (100 KB per partition average)
        dataTransferGB := 0.0001 // 100 KB

        if partitionAZ != coordinatorAZ {
            // Same-AZ transfers are free; only cross-AZ traffic is billed
            cost += dataTransferGB * qp.networkCostModel.crossAZCost
        }
    }

    return cost
}

Deployment Patterns by Scale

1B Vertices (10 Nodes): Single AZ

strategy: single_az
rationale: Small cluster, cross-AZ overhead > benefits
cost: $100k/year
availability: Single AZ failure = full outage
trade_off: Simple but less available

10B Vertices (100 Nodes): Multi-AZ with Read Preferences

strategy: multi_az_read_local
rationale: Amortize cross-AZ cost, most queries stay local
cost: $5M/year (10% cross-AZ traffic)
availability: AZ failure = 33% capacity loss (degraded)
trade_off: Balanced cost/availability

100B Vertices (1000 Nodes): Multi-AZ with Aggressive Optimization

strategy: multi_az_network_optimized
rationale: Network cost dominates, must optimize aggressively
cost: $47M/year (5% cross-AZ traffic)
availability: AZ failure = 33% capacity loss
trade_off: Complex but essential at scale

Cost Savings Summary

| Scale | Without AZ Awareness | With AZ Awareness | Savings |
|---|---|---|---|
| 1B vertices | $100k/year | $100k/year | 0% (single AZ) |
| 10B vertices | $45M/year | $5M/year | 89% |
| 100B vertices | $365M/year | $30M/year | 92% |

Key Takeaway: At 100B scale, network-aware placement is mandatory, not optional. Without it, bandwidth costs exceed infrastructure costs by 10×.

Sharding Strategies

Strategy 1: Hierarchical Consistent Hashing (Default)

Three-level hash:

1. Cluster Assignment: h % num_clusters → cluster_id
2. Proxy Assignment: (h / num_clusters) % proxies_per_cluster → proxy_id
3. Partition Assignment: (h / (num_clusters × proxies_per_cluster)) % partitions_per_proxy → partition_id

where h = xxhash64(vertex_id). Each level consumes an independent "digit" of the hash. Applying different moduli directly to the same raw hash (h % 10, h % 100, ...) would correlate the tiers: within cluster c, only proxies whose index ≡ c (mod 10) would ever receive vertices, leaving 90% of proxies empty. The mixed-radix decomposition above avoids this.

Example:
vertex_id = "user:alice"
h = xxhash64("user:alice")

Level 1 (Cluster):
h % 10 = 7 → Cluster 7

Level 2 (Proxy within cluster 7):
(h / 10) % 100 = 42 → Proxy 42

Level 3 (Partition within proxy 42):
(h / 1000) % 64 = 37 → Partition 37

Full address: 07:0042:37:user:alice

Hash Function Choice: xxHash vs CRC32

At 100B scale, hash function quality significantly impacts load distribution. Poor distribution leads to hotspots, uneven resource utilization, and degraded performance.

Benchmark Comparison (100M vertex IDs hashed):

Hash FunctionThroughputCollision RateLoad VarianceNotes
xxHash648.5 GB/s1 in 100k2%Recommended ✅
CRC325.0 GB/s1 in 10k15%Legacy, poor distribution
MurmurHash33.8 GB/s1 in 50k5%Good, but slower
Jump Hash12.0 GB/sN/A<1%Best for minimal rebalancing

Key Findings:

  • xxHash64: 1.7× faster than CRC32, 8× better load distribution (15% → 2% variance)
  • Jump Hash: Optimal for scenarios requiring minimal data movement during rebalancing (adds/removes nodes with minimal key reassignment)
  • CRC32: Deprecated due to poor distribution properties at massive scale

Implementation (Go):

import "github.com/cespare/xxhash/v2"

func GetPartitionAddress(vertexID string, numClusters, proxiesPerCluster, partitionsPerProxy int) string {
hash := xxhash.Sum64String(vertexID)

clusterID := hash % uint64(numClusters)
proxyID := hash % uint64(proxiesPerCluster)
partitionID := hash % uint64(partitionsPerProxy)

return fmt.Sprintf("%02d:%04d:%02d:%s", clusterID, proxyID, partitionID, vertexID)
}

Alternative: Jump Hash for Rebalancing

For scenarios with frequent topology changes (adding/removing proxies), Jump Hash minimizes data movement:

import "github.com/dgryski/go-jump"

func GetPartitionWithJumpHash(vertexID string, numPartitions int) int {
hash := xxhash.Sum64String(vertexID)
return int(jump.Hash(hash, numPartitions))
}

Jump Hash Properties:

  • When adding 1 proxy: Only 1/N keys move (plain modulo hashing remaps nearly every key when N changes)
  • Example: Add proxy 101 to 100-proxy cluster → Only 1% of keys rebalance
  • Trade-off: Slightly more complex routing logic

Benefits:

  • Even distribution across all tiers
  • Deterministic routing (no lookups needed)
  • Minimal rebalancing on topology changes

Drawbacks:

  • No data locality (related vertices scattered)
  • No access pattern optimization

Strategy 2: Locality-Aware Partitioning

Principle: Co-locate vertices that are frequently traversed together.

Examples:

Social Graph Locality

User "alice" has 200 friends. Co-locate alice + friends on same proxy.

Approach:
1. Assign user to cluster/proxy via consistent hash
2. When creating friend edge:
- Check if target vertex exists
- If not, create on SAME proxy as source vertex
3. Result: Most 1-hop traversals stay within single proxy

Benefits:
- Friends-of-friends queries mostly single-proxy
- No cross-node RPC for common queries
- 10-100× faster traversals

Financial Transaction Locality

Account "acct_123" has 1000 transactions. Co-locate transactions near account.

Approach:
1. Assign account to cluster/proxy
2. Create transactions on same proxy (partition determined by timestamp)
3. Result: Account history queries are local

Benefits:
- Transaction history: single proxy query
- Account risk analysis: local graph traversal

Implementation:

message CreateVertexRequest {
  string id = 1;
  string label = 2;
  map<string, Value> properties = 3;

  // Locality hint
  optional PlacementHint placement_hint = 4;
}

message PlacementHint {
  oneof hint {
    string co_locate_with_vertex = 1;  // Place near this vertex
    string co_locate_with_label = 2;   // Place with vertices of this type
    string explicit_partition = 3;     // Explicit partition ID
  }
}

Co-location Algorithm:

func (g *GraphShardManager) CreateVertex(req *CreateVertexRequest) error {
    var targetPartition string

    if req.PlacementHint != nil {
        switch hint := req.PlacementHint.Hint.(type) {
        case *PlacementHint_CoLocateWithVertex:
            // Place on the same partition as the target vertex
            targetVertex := g.GetVertex(hint.CoLocateWithVertex)
            targetPartition = targetVertex.PartitionID

        case *PlacementHint_ExplicitPartition:
            targetPartition = hint.ExplicitPartition

        default:
            // Fall back to consistent hashing
            targetPartition = g.CalculatePartition(req.Id)
        }
    } else {
        targetPartition = g.CalculatePartition(req.Id)
    }

    // Create vertex on the target partition
    return g.CreateVertexOnPartition(targetPartition, req)
}

Strategy 3: Label-Based Partitioning

Principle: Partition by vertex label (type), not vertex ID.

Example: Multi-tenant SaaS graph

Labels: User, Post, Comment, Organization

Partitioning:
Cluster 0: All "User" vertices
Cluster 1: All "Post" vertices
Cluster 2: All "Comment" vertices
Cluster 3: All "Organization" vertices

Benefits:
- Label-specific queries are single-cluster
- Easy to scale individual vertex types
- Clear capacity planning

Drawbacks:
- Cross-label traversals always multi-cluster
- Imbalanced load if one label dominates

Configuration:

partition_strategy:
  type: label_based
  label_cluster_mapping:
    User: cluster_0
    Post: cluster_1
    Comment: cluster_2
    Organization: cluster_3
  fallback: consistent_hash  # For unknown labels
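
A routing sketch for this strategy, including the consistent-hash fallback for unknown labels (names are illustrative; the mapping mirrors the config above):

import (
    "fmt"

    "github.com/cespare/xxhash/v2"
)

type LabelRouter struct {
    labelToCluster map[string]string // e.g. "User" → "cluster_0"
    numClusters    uint64            // Fallback hash space
}

func (lr *LabelRouter) ClusterFor(label, vertexID string) string {
    if cluster, ok := lr.labelToCluster[label]; ok {
        return cluster
    }
    // Unknown label: fall back to consistent hashing on the vertex ID
    return fmt.Sprintf("cluster_%d", xxhash.Sum64String(vertexID)%lr.numClusters)
}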

Strategy 4: Hybrid Sharding

Principle: Combine strategies based on access patterns.

Example: Social network with hot users

Tier 1 (Cluster): Label-based
- Cluster 0-5: Regular users (consistent hash)
- Cluster 6: VIP users (celebrities, influencers)
- Cluster 7: Business pages
- Cluster 8: Groups

Tier 2 (Proxy): Locality-aware
- VIP users: Co-locate with their posts
- Regular users: Consistent hash

Tier 3 (Partition): Access frequency
- Hot partitions: In-memory
- Warm partitions: Partial memory
- Cold partitions: S3-backed

Configuration:

partition_strategy:
  type: hybrid

  cluster_strategy:
    type: label_based
    labels:
      regular_user: [0, 1, 2, 3, 4, 5]
      vip_user: [6]
      business_page: [7]
      group: [8]

  proxy_strategy:
    type: locality_aware
    co_locate_edge_labels:
      - FRIENDS
      - FOLLOWS
      - MEMBER_OF

  partition_strategy:
    type: access_frequency
    hot_threshold: 1000_requests_per_minute
    cold_threshold: 10_requests_per_minute

Distributed Query Routing

Query Execution Models

Model 1: Gather-Scatter (Simple Traversals)

Use Case: Broad queries that touch many partitions

Query: Get all users in organization "acme-corp"

Step 1: Parse query
Start: organization:acme-corp
Traversal: IN(MEMBER_OF) → User vertices

Step 2: Identify partition scope
Query coordinator: Scan all partitions (can't prune)

Step 3: Scatter phase
Coordinator → Send subquery to all 1000 proxies in parallel

Step 4: Gather phase
Each proxy returns matching vertices
Coordinator aggregates results

Step 5: Return to client
Total: 50,000 users in org

Performance: O(num_partitions) parallelism
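
A minimal scatter-gather sketch for this model (the coordinator, proxy client, and RPC names are illustrative; a production version would bound goroutine fan-out across 1000 proxies):

// ScatterGather fans the subquery out to every proxy in parallel and
// merges the partial results on the coordinator.
func (qc *QueryCoordinator) ScatterGather(ctx context.Context, subquery string) ([]Vertex, error) {
    results := make(chan []Vertex, len(qc.proxies))
    errs := make(chan error, len(qc.proxies))

    // Scatter: one goroutine per proxy
    for _, proxy := range qc.proxies {
        go func(p *ProxyClient) {
            vs, err := p.ExecuteSubquery(ctx, subquery)
            if err != nil {
                errs <- err
                return
            }
            results <- vs
        }(proxy)
    }

    // Gather: merge partial results, failing fast on the first error
    var merged []Vertex
    for i := 0; i < len(qc.proxies); i++ {
        select {
        case vs := <-results:
            merged = append(merged, vs...)
        case err := <-errs:
            return nil, err
        case <-ctx.Done():
            return nil, ctx.Err()
        }
    }
    return merged, nil
}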

Model 2: Partition-Aware Pruning (Filtered Traversals)

Use Case: Queries with partition-prunable filters

Query: Get user "alice" and traverse 2 hops

Step 1: Parse query
Start: user:alice (deterministic partition via vertex ID)

Step 2: Identify start partition
vertex_id = "user:alice"
partition = hash("user:alice") = 07:0042:05

Step 3: Execute first hop locally
Proxy 07:0042 executes: user:alice.out(FOLLOWS)
Result: 200 friends (all may be on different partitions)

Step 4: Group by partition
Friends on Proxy 07:0042: 50 (local)
Friends on other proxies: 150 (remote)

Step 5: Second hop (distributed)
Execute out(FOLLOWS) on 50 local friends (fast)
Fan-out to 20 other proxies for 150 remote friends (parallel RPC)

Step 6: Aggregate
Coordinator collects results from 20 proxies
Total: 5,000 friends-of-friends

Performance: O(num_partitions_touched) << O(total_partitions)
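
The key step is grouping the frontier by partition so each proxy receives one batched RPC. For hierarchical IDs this is pure string parsing; a minimal sketch (opaque IDs would be resolved through the routing table instead):

import "strings"

// groupByPartition buckets vertex IDs by owning partition so each bucket
// can be dispatched to its proxy in a single RPC. Used by the traversal
// code later in this section.
func (q *QueryCoordinator) groupByPartition(vertexIDs []string) map[string][]string {
    groups := make(map[string][]string)
    for _, id := range vertexIDs {
        parts := strings.SplitN(id, ":", 4)
        if len(parts) != 4 {
            continue // Opaque ID: resolve via the routing table instead
        }
        key := strings.Join(parts[:3], ":") // "cluster:proxy:partition"
        groups[key] = append(groups[key], id)
    }
    return groups
}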

Model 3: Index-Accelerated Queries

Use Case: Queries with indexed property filters

Query: Find users where city = "San Francisco" AND age > 25

Step 1: Check for index
Index exists: city_index (partition-local)

Step 2: Index scan (parallel across all partitions)
Each partition scans local city_index["San Francisco"]
Filter results by age > 25

Step 3: Return matches
Total: 125,000 users across 500 partitions

Performance: O(matches_per_partition), not O(total_vertices)
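
A sketch of the per-partition index scan with a residual filter (PropertyIndex and the partition field layout are assumptions for illustration):

// PropertyIndex is a hypothetical partition-local inverted index:
// property value → vertex IDs.
func (p *Partition) IndexScan(index, value string, post func(*Vertex) bool) []*Vertex {
    ids := p.indexes[index].Lookup(value) // e.g. city_index["San Francisco"]
    matches := make([]*Vertex, 0, len(ids))
    for _, id := range ids {
        v := p.vertices[id]
        if v != nil && post(v) { // Residual filter, e.g. age > 25
            matches = append(matches, v)
        }
    }
    return matches
}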

Cross-Partition Traversal Optimization

Problem: 2-hop query touches 1M vertices across 1000 partitions

Solution 1: Adaptive Parallelism

func (q *QueryCoordinator) ExecuteTraversal(start string, hops int) ([]string, error) {
    currentLevel := []string{start}

    for hop := 0; hop < hops; hop++ {
        // Group frontier vertices by partition
        partitionGroups := q.groupByPartition(currentLevel)

        // Adaptive parallelism based on partition count
        var nextLevel []string
        if len(partitionGroups) < 10 {
            // Few partitions: execute sequentially
            nextLevel = q.executeSequential(partitionGroups)
        } else if len(partitionGroups) < 100 {
            // Medium fan-out: execute in batches of 10
            nextLevel = q.executeParallelBatched(partitionGroups, 10)
        } else {
            // Large fan-out: full parallelism
            nextLevel = q.executeFullParallel(partitionGroups)
        }

        currentLevel = nextLevel
    }

    return currentLevel, nil
}

Solution 2: Early Termination

// Limit traversal breadth to prevent fan-out explosion
func (q *QueryCoordinator) ExecuteTraversal(
    start string,
    hops int,
    maxResultsPerHop int,
) ([]string, error) {
    currentLevel := []string{start}

    for hop := 0; hop < hops; hop++ {
        partitionGroups := q.groupByPartition(currentLevel)

        nextLevel := []string{}
        for _, group := range partitionGroups {
            neighbors := q.executePartitionTraversal(group)
            nextLevel = append(nextLevel, neighbors...)

            // Early termination once the hop's result budget is exhausted
            if len(nextLevel) > maxResultsPerHop {
                nextLevel = nextLevel[:maxResultsPerHop]
                break
            }
        }

        currentLevel = nextLevel
    }

    return currentLevel, nil
}

Solution 3: Partition-Local Caching

Query: Frequent traversal from popular vertex "celebrity_user"

Optimization:
1. First query: celebrity_user.out(FOLLOWS) → 10M followers
Cache result on query coordinator

2. Subsequent queries: Use cached follower list
TTL: 60 seconds

3. Invalidation: On edge create/delete to celebrity_user

Performance:
Cold query: 500ms (hits 200 partitions)
Warm query: 5ms (cached)
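
A coordinator-side cache sketch implementing the 60-second TTL and mutation-driven invalidation described above (illustrative; keyed on vertex + edge label):

import (
    "sync"
    "time"
)

type cacheEntry struct {
    neighbors []string
    expires   time.Time
}

type TraversalCache struct {
    mu      sync.RWMutex
    entries map[string]cacheEntry // key: "vertexID|edgeLabel"
}

func (tc *TraversalCache) Get(vertexID, label string) ([]string, bool) {
    tc.mu.RLock()
    defer tc.mu.RUnlock()
    e, ok := tc.entries[vertexID+"|"+label]
    if !ok || time.Now().After(e.expires) {
        return nil, false // Missing or expired
    }
    return e.neighbors, true
}

func (tc *TraversalCache) Put(vertexID, label string, neighbors []string) {
    tc.mu.Lock()
    defer tc.mu.Unlock()
    tc.entries[vertexID+"|"+label] = cacheEntry{neighbors, time.Now().Add(60 * time.Second)}
}

// Invalidate is called on edge create/delete involving vertexID.
func (tc *TraversalCache) Invalidate(vertexID, label string) {
    tc.mu.Lock()
    defer tc.mu.Unlock()
    delete(tc.entries, vertexID+"|"+label)
}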

Query Coordinator Architecture

┌────────────────────────────────────────────────────────────────┐
│ Query Coordinator │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Query Parser │ │ Query Planner│ │ Result Cache │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Distributed Execution Engine │ │
│ │ - Partition pruning │ │
│ │ - Parallel scatter-gather │ │
│ │ - Adaptive parallelism │ │
│ │ - Early termination │ │
│ └────────────────────────┬───────────────────────────┘ │
│ │ │
└────────────────────────────┼───────────────────────────────────┘

┌─────────────────────┼─────────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Partition │ │ Partition │ │ Partition │
│ Executor 1 │ │ Executor 2 │ ... │Executor 1000│
│ │ │ │ │ │
│ Proxy 1 │ │ Proxy 2 │ │ Proxy 1000 │
└─────────────┘ └─────────────┘ └─────────────┘

Partition Management

Partition States

enum PartitionState {
  PARTITION_STATE_UNSPECIFIED = 0;
  PARTITION_STATE_COLD = 1;     // S3-backed, not in memory
  PARTITION_STATE_WARMING = 2;  // Loading into memory
  PARTITION_STATE_WARM = 3;     // Partially in memory
  PARTITION_STATE_HOT = 4;      // Fully in memory
  PARTITION_STATE_DRAINING = 5; // Being migrated to another proxy
  PARTITION_STATE_OFFLINE = 6;  // Not accessible
}
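
One way to guard state changes is an explicit transition table. The legal transitions below are an assumption consistent with the lifecycle above (e.g. a DRAINING partition should not be offloaded mid-migration), not a spec'd part of this RFC:

// Assumed legal transitions between partition states (Go-side constants
// mirroring the proto enum above).
var validTransitions = map[PartitionState][]PartitionState{
    PartitionStateCold:     {PartitionStateWarming, PartitionStateOffline},
    PartitionStateWarming:  {PartitionStateWarm, PartitionStateHot, PartitionStateOffline},
    PartitionStateWarm:     {PartitionStateHot, PartitionStateCold, PartitionStateDraining},
    PartitionStateHot:      {PartitionStateWarm, PartitionStateDraining},
    PartitionStateDraining: {PartitionStateOffline, PartitionStateCold},
    PartitionStateOffline:  {PartitionStateWarming},
}

func CanTransition(from, to PartitionState) bool {
    for _, next := range validTransitions[from] {
        if next == to {
            return true
        }
    }
    return false
}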

Partition Migration Protocol

Scenario: Move partition 07:0042:05 from Proxy 42 to Proxy 89

Step 1: Prepare Target (Proxy 89)
Admin → Proxy 89: PreparePartition(07:0042:05)
Proxy 89: Allocate memory, download from S3 if needed
Proxy 89 → Admin: Ready

Step 2: Dual-Write Phase
Admin updates routing table: partition 07:0042:05 → [Proxy 42, Proxy 89]
Writes go to BOTH proxies
Reads go to Proxy 42 (primary)
Duration: Until Proxy 89 catches up

Step 3: Synchronize State
Proxy 42 → Proxy 89: Stream recent writes (WAL replay)
Proxy 89 applies writes, reports lag
When lag < 1 second → Ready for cutover

Step 4: Cutover
Admin updates routing: partition 07:0042:05 → Proxy 89 (single writer)
Proxy 89 becomes primary
Proxy 42 continues serving reads temporarily

Step 5: Drain Source
Admin → Proxy 42: DrainPartition(07:0042:05)
Proxy 42: Wait for in-flight queries to complete
Proxy 42: Unload partition from memory
Proxy 42 → Admin: Drained

Step 6: Confirm Complete
Admin removes Proxy 42 from routing table for partition 07:0042:05
Migration complete

Downtime: Zero (dual-write + cutover)
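
An orchestration sketch of the six steps (admin-plane method names are illustrative; each block maps to a numbered phase above):

// MigratePartition walks the zero-downtime protocol: prepare, dual-write,
// WAL catch-up, cutover, drain, and routing-table cleanup.
func (a *AdminPlane) MigratePartition(ctx context.Context, partID, srcProxy, dstProxy string) error {
    // Step 1: Prepare target (allocate memory, hydrate from S3 if needed)
    if err := a.rpc(dstProxy).PreparePartition(ctx, partID); err != nil {
        return err
    }

    // Step 2: Dual-write; reads stay on the source (primary)
    a.routing.SetWriters(partID, []string{srcProxy, dstProxy})

    // Step 3: Stream WAL until replication lag drops below 1 second
    for a.replicationLag(partID, dstProxy) > time.Second {
        time.Sleep(100 * time.Millisecond)
    }

    // Step 4: Cutover; target becomes sole writer and primary reader
    a.routing.SetWriters(partID, []string{dstProxy})
    a.routing.SetPrimary(partID, dstProxy)

    // Step 5: Drain source (waits for in-flight queries, unloads memory)
    if err := a.rpc(srcProxy).DrainPartition(ctx, partID); err != nil {
        return err
    }

    // Step 6: Remove source from the routing table
    a.routing.RemoveReplica(partID, srcProxy)
    return nil
}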

Automatic Partition Temperature Management

Temperature Metrics:

  • Hot: >1000 requests/minute
  • Warm: 10-1000 requests/minute
  • Cold: <10 requests/minute

Automatic Actions:

partition_temperature_policy:
  hot_threshold: 1000  # requests/minute
  cold_threshold: 10

  actions:
    # Hot partition: ensure in memory
    - condition: temperature == HOT
      action: load_to_memory
      priority: high

    # Warm partition: keep in memory if space available
    - condition: temperature == WARM
      action: keep_in_memory_if_space
      priority: medium

    # Cold partition: offload to S3
    - condition: temperature == COLD
      action: offload_to_s3
      delay: 3600s  # Keep in memory for 1 hour before offloading

Implementation:

func (pm *PartitionManager) MonitorTemperature() {
    ticker := time.NewTicker(60 * time.Second)

    for range ticker.C {
        for _, partition := range pm.GetAllPartitions() {
            temp := pm.GetTemperature(partition.ID)

            switch {
            case temp.RequestsPerMinute > 1000:
                // Hot: ensure fully in memory
                if partition.State != PartitionStateHot {
                    pm.LoadPartitionToMemory(partition.ID)
                }

            case temp.RequestsPerMinute < 10:
                // Cold: offload to S3 after the 1-hour delay
                if time.Since(temp.LastAccess) > time.Hour {
                    pm.OffloadPartitionToS3(partition.ID)
                }

            default:
                // Warm: no action
            }
        }
    }
}

Failure Detection and Recovery

Problem: At 1000 nodes, hardware failures are not exceptional—they are statistically inevitable. With a mean time between failures (MTBF) of 3 years per node, expect ~1 failure per day cluster-wide. Without automated detection and recovery, manual intervention would be required multiple times daily, making the system operationally infeasible.

Failure Probability at Scale

Cluster: 1000 nodes
MTBF per node: 3 years = 1,095 days
Cluster-wide failure rate: 1000 nodes ÷ 1095 days = 0.91 failures/day

Expected failures:
- Per day: ~1 node failure
- Per week: ~7 node failures
- Per month: ~30 node failures
- Per year: ~333 node failures (1/3 of cluster replaced annually)

Key Insight: Failure is not an edge case—it's the steady state. Recovery must be fully automated, <60s MTTR, and transparent to queries.

Heartbeat-Based Failure Detection

Three-Tier Heartbeat System:

heartbeat_configuration:
  # Tier 1: Proxy → Cluster Gateway
  proxy_to_gateway:
    interval: 5s
    timeout: 15s
    missed_threshold: 3  # 3 missed heartbeats = 15s detection

  # Tier 2: Cluster Gateway → Global Coordinator
  gateway_to_coordinator:
    interval: 10s
    timeout: 30s
    missed_threshold: 3  # 30s detection

  # Tier 3: Partition → Proxy (health check)
  partition_health:
    check_interval: 1s
    check_type: simple_query  # SELECT 1 equivalent
    timeout: 100ms

Implementation:

type FailureDetector struct {
    proxies          map[string]*ProxyHealth
    lastHeartbeat    map[string]time.Time
    missedHeartbeats map[string]int
    failureHandlers  []FailureHandler
}

func (fd *FailureDetector) MonitorHeartbeats() {
    ticker := time.NewTicker(5 * time.Second)

    for range ticker.C {
        now := time.Now()

        for proxyID, lastSeen := range fd.lastHeartbeat {
            timeSinceLastHeartbeat := now.Sub(lastSeen)

            // Check if a heartbeat was missed this interval
            if timeSinceLastHeartbeat > 5*time.Second {
                fd.missedHeartbeats[proxyID]++

                // Threshold exceeded → mark as failed
                if fd.missedHeartbeats[proxyID] >= 3 {
                    fd.HandleProxyFailure(proxyID)
                }
            } else {
                // Heartbeat received → reset counter
                fd.missedHeartbeats[proxyID] = 0
            }
        }
    }
}

func (fd *FailureDetector) ReceiveHeartbeat(proxyID string) {
    fd.lastHeartbeat[proxyID] = time.Now()
    fd.missedHeartbeats[proxyID] = 0
}

Detection Latency: 15 seconds maximum (3 missed × 5s intervals)

Recovery Strategy: Fast Failover vs S3 Restore

Option A: Replica Failover (Fast)

When partition has in-cluster replicas (3× replication within cluster):

Failure detected: Proxy 42 unresponsive (15s)

Step 1: Identify affected partitions (500ms)
- Proxy 42 hosts partitions 07:0042:00 through 07:0042:63 (64 partitions)

Step 2: Find replica proxies (100ms)
- Primary: Proxy 42 (FAILED)
- Replica 1: Proxy 18 (HEALTHY)
- Replica 2: Proxy 91 (HEALTHY)

Step 3: Promote Replica 1 to primary (5s)
- Update routing table: 07:0042:* → Proxy 18
- Broadcast routing update to all proxies
- Resume query traffic

Total recovery time: 15s detection + 5s promotion = 20 seconds
Data loss: None (replica up-to-date via WAL)

Implementation:

func (fm *FailureManager) HandleProxyFailure(proxyID string) error {
    // Get all partitions on the failed proxy
    partitions := fm.GetPartitionsOnProxy(proxyID)

    for _, partition := range partitions {
        // Check for in-cluster replicas
        replicas := fm.GetReplicasForPartition(partition.ID)

        if len(replicas) > 0 {
            // Fast path: promote a healthy replica (~20s total recovery)
            healthyReplica := fm.FindHealthyReplica(replicas)
            if healthyReplica != nil {
                fm.PromoteReplica(partition.ID, healthyReplica.ProxyID)
                log.Infof("Promoted replica for partition %s to proxy %s",
                    partition.ID, healthyReplica.ProxyID)
                continue
            }
        }

        // Slow path: restore from S3 (~5 min recovery)
        fm.RestorePartitionFromS3(partition.ID)
    }

    return nil
}

Option B: S3 Restore (Slow)

When no in-cluster replicas available:

Failure detected: Proxy 42 unresponsive (15s)

Step 1: Identify affected partitions (500ms)
- Partitions 07:0042:00 through 07:0042:63 (64 partitions)

Step 2: Assign to replacement proxy (1s)
- Select least-loaded proxy: Proxy 23

Step 3: Restore from S3 (parallel, 64 partitions) (5 min)
- Download partition snapshot: s3://cluster-07/partition-0042-00.pb.gz
- Decompress and load into memory
- Rebuild indexes
- 64 partitions × 156 MB = 10 GB total
- Download: 10 GB ÷ 200 MB/s = 50 seconds
- Decompress + Load: 4 minutes

Total recovery time: 15s detection + 5 min restore = 5 min 15 seconds
Data loss: Writes since last snapshot (< 5 minutes if hourly snapshots)

S3 Restore Implementation:

func (fm *FailureManager) RestorePartitionFromS3(partitionID string) error {
    // Download snapshot from S3
    s3Path := fmt.Sprintf("s3://cluster-%s/partition-%s.pb.gz", fm.clusterID, partitionID)
    localPath := fmt.Sprintf("/tmp/%s.pb.gz", partitionID)

    if err := fm.s3Client.DownloadFile(s3Path, localPath); err != nil {
        return fmt.Errorf("S3 download failed: %w", err)
    }

    // Decompress the gzipped snapshot (compress/gzip from the stdlib)
    f, err := os.Open(localPath)
    if err != nil {
        return fmt.Errorf("open snapshot: %w", err)
    }
    defer f.Close()

    gz, err := gzip.NewReader(f)
    if err != nil {
        return fmt.Errorf("decompression failed: %w", err)
    }
    decompressed, err := io.ReadAll(gz)
    if err != nil {
        return fmt.Errorf("decompression failed: %w", err)
    }

    // Load partition
    partition := &Partition{}
    if err := proto.Unmarshal(decompressed, partition); err != nil {
        return fmt.Errorf("snapshot decode failed: %w", err)
    }

    // Rebuild indexes
    fm.RebuildPartitionIndexes(partition)

    // Update routing table
    fm.UpdateRoutingTable(partitionID, fm.replacementProxyID)

    log.Infof("Restored partition %s from S3", partitionID)
    return nil
}

Cascading Failure Prevention

Problem: Single node failure can trigger cascading failures if remaining nodes become overloaded with rerouted traffic.

Circuit Breaker Pattern:

type CircuitBreaker struct {
    State               CircuitState
    FailureThreshold    int
    FailureCount        int
    SuccessThreshold    int
    SuccessCount        int
    LastFailureTime     time.Time
    HalfOpenMaxRequests int
}

func (cb *CircuitBreaker) Call(fn func() error) error {
    switch cb.State {
    case CircuitClosed:
        // Normal operation
        err := fn()
        if err != nil {
            cb.RecordFailure()
            if cb.FailureCount >= cb.FailureThreshold {
                cb.Trip()
                return CircuitOpenError{}
            }
        } else {
            cb.RecordSuccess()
        }
        return err

    case CircuitOpen:
        // Reject requests to prevent cascading failure
        if time.Since(cb.LastFailureTime) > 30*time.Second {
            cb.State = CircuitHalfOpen
            return cb.Call(fn)
        }
        return CircuitOpenError{Message: "circuit breaker open, proxy overloaded"}

    case CircuitHalfOpen:
        // Probe with limited requests
        err := fn()
        if err != nil {
            cb.Trip()
            return err
        }
        cb.RecordSuccess()
        if cb.SuccessCount >= cb.SuccessThreshold {
            cb.Reset()
        }
        return nil
    }

    return nil
}

Configuration:

circuit_breaker:
  failure_threshold: 10       # Open circuit after 10 consecutive failures
  success_threshold: 5        # Close circuit after 5 consecutive successes
  timeout: 30s                # Time before attempting half-open
  half_open_max_requests: 10  # Maximum requests in half-open state
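
Usage sketch: wrap each cross-proxy RPC in that proxy's breaker so an overloaded proxy sheds load instead of dragging down its callers (client and helper names are illustrative; assumes CircuitOpenError implements error):

// One breaker per downstream proxy.
breaker := proxyBreakers[targetProxyID]
err := breaker.Call(func() error {
    return client.ExecuteSubquery(ctx, subquery)
})

var open CircuitOpenError
if errors.As(err, &open) {
    // Proxy is shedding load: route to a replica or return partial results
}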

Operational Runbooks

Scenario 1: Single Proxy Failure

Detection: Heartbeat missed for 15 seconds
Symptoms: 64 partitions (0.1% of the cluster's 64,000) unavailable

Automated Response:
1. Mark proxy as FAILED in routing table
2. Attempt replica failover (if available)
3. If no replica: Restore from S3 (5 min)
4. Resume traffic

Human Action Required: None (fully automated)
Expected Duration: 20s (replica) or 5 min (S3)

Scenario 2: Cluster-Wide Network Partition

Detection: All proxies in Cluster 7 unresponsive
Symptoms: 6,400 partitions (10% of the cluster's 64,000) unavailable

Automated Response:
1. Mark entire cluster as UNAVAILABLE
2. Reroute queries to remaining 9 clusters
3. Wait for network recovery (no data migration yet)

Human Action Required: Investigate network partition
Expected Duration: Minutes to hours (network-dependent)

Scenario 3: Cascading Overload

Detection: Circuit breakers opening on multiple proxies
Symptoms: Increased latency, error rate >10%

Automated Response:
1. Activate circuit breakers on overloaded proxies
2. Shed low-priority query load
3. Alert on-call engineer

Human Action Required: Scale cluster or reduce load
Expected Duration: Hours (requires capacity planning)

Mean Time to Recovery (MTTR) Targets

| Failure Type | Detection Time | Recovery Time | Total MTTR | Method |
|---|---|---|---|---|
| Single proxy failure | 15s | 20s | 35s | Replica failover |
| Single proxy failure (no replica) | 15s | 5 min | 5m 15s | S3 restore |
| Partition corruption | Immediate | 5 min | 5 min | S3 restore |
| Network partition | 30s | Variable | 30s-hours | Wait for recovery |
| Cascading overload | 1 min | Variable | Minutes | Circuit breaker |

Monitoring and Alerting

Prometheus Metrics:

# Failure detection
prism_proxy_heartbeat_missed_total{proxy="42"} 3
prism_proxy_state{proxy="42",state="failed"} 1

# Recovery metrics
prism_partition_recovery_duration_seconds{method="replica"} 20
prism_partition_recovery_duration_seconds{method="s3"} 315

# Circuit breaker state
prism_circuit_breaker_state{proxy="23"} 1 # 0=closed, 1=open, 2=half-open
prism_circuit_breaker_failures_total{proxy="23"} 10

Alerting Rules:

alerts:
  - name: ProxyDown
    condition: prism_proxy_state{state="failed"} == 1
    severity: critical
    message: "Proxy {{$labels.proxy}} failed, recovery in progress"

  - name: HighRecoveryRate
    condition: rate(prism_partition_recovery_duration_seconds_count[5m]) > 0.1
    severity: warning
    message: "High partition recovery rate: {{$value}} recoveries/min"

  - name: CircuitBreakerOpen
    condition: prism_circuit_breaker_state{state="open"} == 1
    severity: critical
    message: "Circuit breaker open on proxy {{$labels.proxy}}, possible overload"

Summary

Failure detection and recovery is mandatory at 1000-node scale. Key design decisions:

  1. Fast Detection: 15-second heartbeat-based detection
  2. Two-Tier Recovery:
    • Fast path: Replica failover (20s MTTR)
    • Slow path: S3 restore (5 min MTTR)
  3. Cascading Prevention: Circuit breakers prevent overload spread
  4. Full Automation: No human intervention for common failures
  5. Monitoring: Comprehensive metrics and alerting for operational visibility

Impact: System can sustain 1 failure/day without human intervention, maintains 99.99% availability (52 minutes downtime/year), enables true lights-out operation at massive scale.

Performance Characteristics

Query Latency

| Query Type | Partitions Touched | Latency (P50) | Latency (P99) |
|---|---|---|---|
| Single vertex lookup | 1 | 50 μs | 200 μs |
| 1-hop traversal (local) | 1 | 500 μs | 2 ms |
| 1-hop traversal (distributed) | 10 | 5 ms | 20 ms |
| 2-hop traversal (local) | 1 | 2 ms | 10 ms |
| 2-hop traversal (distributed) | 100 | 50 ms | 200 ms |
| 3-hop traversal (distributed) | 500 | 500 ms | 2 s |
| Global scan (all partitions) | 1000 | 2 s | 10 s |

Throughput

| Operation | Throughput | Notes |
|---|---|---|
| Single partition write | 100k writes/sec | Hot partition limit |
| Cluster aggregate write | 100M writes/sec | 1000 proxies × 100k |
| Single partition read | 1M reads/sec | In-memory, no disk |
| Cluster aggregate read | 1B reads/sec | 1000 proxies × 1M |
| Cross-partition query | 10k queries/sec | Limited by coordinator |
| Distributed traversal | 1k traversals/sec | 3-hop average |

Storage

| Tier | Capacity | Cost ($/TB/month) | Access Latency |
|---|---|---|---|
| In-Memory (Hot) | 30 TB | $500 (RAM) | 50 μs |
| S3 Standard (Warm) | 1 PB | $23 | 100 ms |
| S3 Glacier (Cold) | 10 PB | $1 | 12 hours |

Total Storage Cost (100B vertices scenario):

  • 100B vertices × 100 bytes = 10 TB (vertices)
  • 10T edges × 20 bytes = 200 TB (edges)
  • Total: 210 TB raw data

Storage Distribution:

  • Hot (10%): 21 TB in-memory across 1000 proxies
  • Warm (40%): 84 TB on S3 Standard
  • Cold (50%): 105 TB on S3 Glacier

Monthly Cost:

  • Hot: $10,500 (RAM: $500/TB × 21 TB)
  • Warm: $1,932 (S3: $23/TB × 84 TB)
  • Cold: $105 (Glacier: $1/TB × 105 TB)
  • Total: ~$12,500/month

Operational Patterns

Capacity Planning

Vertices per Proxy: 100M vertices

Memory per Proxy: 30 GB (hot working set)

  • Vertex data: 100M × 100 bytes = 10 GB
  • Edge data (in-memory hot subset): 1B edges × 20 bytes = 20 GB; the remaining edges live in the S3 warm/cold tiers
  • Overhead (indexes, metadata): ~10% on top

Cluster Size for 100B Vertices:

  • 100B vertices ÷ 100M per proxy = 1000 proxies
  • Distributed across 10 clusters = 100 proxies per cluster

Adding Capacity:

  1. Add a new cluster (+10B vertex capacity)
  2. Add proxies to existing clusters (+100M vertices each)
  3. Add partitions to existing proxies (+1.56M vertices each)

Monitoring

# Prometheus metrics

# Partition distribution
prism_graph_partitions_total{cluster="0",state="hot"} 800
prism_graph_partitions_total{cluster="0",state="warm"} 400
prism_graph_partitions_total{cluster="0",state="cold"} 400

# Query latency by partition count
prism_graph_query_latency_seconds{partitions="1",quantile="0.5"} 0.0005
prism_graph_query_latency_seconds{partitions="100",quantile="0.5"} 0.05

# Cross-partition queries
prism_graph_cross_partition_queries_total{cluster="0"} 5000000

# Partition migrations
prism_graph_partition_migrations_total{from_proxy="42",to_proxy="89"} 3

Disaster Recovery

Scenario: Entire cluster failure (100 proxies lost)

Recovery Strategy:

Option 1: Rebuild from S3
1. Spin up 100 new proxies
2. Each proxy downloads its partitions from S3
3. Load hot partitions into memory
4. Resume serving traffic

Time: 30 minutes (parallel S3 downloads)
Data loss: None (S3 is source of truth)

Option 2: Reroute to other clusters
1. Admin reassigns failed cluster's vertices to other clusters
2. Update routing tables
3. Other clusters load additional partitions

Time: 5 minutes
Data loss: None
Trade-off: Other clusters temporarily overloaded

Migration Path

Phase 1: Extend RFC-055 to 10B Vertices (100 Proxies)

Goal: Scale current architecture 10× without breaking changes

Changes:

  • Increase proxy count: 10 → 100
  • Same MemStore backend
  • Same consistent hashing

Deliverables:

  • Updated partition count: 256 → 6,400 (64 per proxy, based on MEMO-050 Finding 6)
  • Admin plane supports 100 proxy registrations
  • Cross-proxy RPC pool expanded to 100 connections

Phase 2: Add Hierarchical Clustering (10 Clusters × 100 Proxies)

Goal: Introduce cluster tier for 100B vertices

Changes:

  • Add cluster concept (10 clusters)
  • Each cluster has 100 proxies
  • Update vertex ID format: {cluster}:{proxy}:{partition}:{id}

Deliverables:

  • Cluster-aware routing
  • Cluster gateway component
  • Inter-cluster query forwarding

Phase 3: Partition Temperature Management

Goal: Introduce hot/cold storage tiers

Changes:

  • Integrate S3 backend (RFC-059)
  • Implement partition temperature monitoring
  • Automatic memory/S3 offloading

Deliverables:

  • Partition state machine (hot/warm/cold)
  • S3 snapshot format for partitions
  • Temperature-based eviction policies

Phase 4: Advanced Query Optimization

Goal: Distributed Gremlin query execution

Changes:

  • Partition-aware query planning (RFC-060)
  • Multi-level indexing (RFC-058)
  • Authorization filtering (RFC-061)

Deliverables:

  • Query coordinator component
  • Distributed query execution engine
  • Index-accelerated traversals

Open Questions

  1. Optimal Partition Count per Proxy: 64 partitions recommended (MEMO-050 Finding 6) for finer granularity. Should this be dynamic based on workload?
  2. Cross-Region Sharding: How to shard graph across multiple AWS regions?
  3. Graph Replication: Should hot partitions be replicated for read scalability?
  4. Partition Split/Merge: How to handle partition growth beyond 1.56M vertices?
  5. Consistency Model: Strong consistency vs eventual consistency for distributed writes?

Revision History

  • 2025-11-15: Initial draft - Massive-scale graph sharding for 100B vertices