RFC-057: Massive-Scale Graph Sharding for 100B Vertices
Status: Draft
Author: Platform Team
Created: 2025-11-15
Updated: 2025-11-15
Abstract
This RFC extends Prism's graph pattern (RFC-055) to support massive-scale distributed graphs with 100 billion vertices and trillions of edges. The existing architecture supports 1 billion vertices across 10 proxy instances (100M vertices per proxy). Scaling to 100B vertices requires a multi-tier sharding strategy with lightweight compute nodes, hierarchical partition routing, and intelligent data placement. This RFC defines the distributed sharding architecture, partition strategies, query routing protocols, and operational patterns needed to achieve this scale.
Key Innovations:
- Hierarchical Sharding: Three-tier architecture (cluster → proxy → partition)
- Lightweight Nodes: 1000+ nodes with 100M vertices each
- Hot/Cold Tier Integration: In-memory hot data, S3 cold storage (RFC-059)
- Partition-Aware Queries: Gremlin queries optimized for distributed execution
- Dynamic Rebalancing: Automatic partition migration without downtime
- Fine-Grained Authorization: Vertex-level access control (RFC-061)
Motivation
Current Scale Limitations
RFC-055 demonstrates 1B vertices across 10 proxies:
| Scale Dimension | RFC-055 (Current) | This RFC (Target) | Multiplier |
|---|---|---|---|
| Total Vertices | 1 billion | 100 billion | 100× |
| Total Edges | 10 billion | 10 trillion | 1000× |
| Proxy Instances | 10 | 1000 | 100× |
| Vertices per Node | 100M | 100M | 1× |
| Memory per Node | 30 GB | 30 GB | 1× |
| Total Memory | 300 GB | 30 TB | 100× |
Why Scale Beyond 1B Vertices?
Use Cases Requiring 100B+ Scale
1. Global Social Network Graph
Example: Facebook/Meta-scale social graph
- Users: 3 billion active users
- Connections: Average 200 friends/connections per user
- Total Edges: 600 billion edges (friend relationships)
- Content Interactions: 10 trillion edges (likes, comments, shares)
- Total Vertices: 3B users + 50B posts/photos/videos = 53B vertices
Query Patterns:
- Friends of friends (2-3 hop traversals)
- Mutual connections
- Content recommendations based on friend graph
- Influence analysis (PageRank on 3B users)
2. Global Financial Transaction Network
Example: SWIFT/Visa-scale transaction graph
- Accounts: 5 billion bank accounts worldwide
- Merchants: 100 million merchants
- Transactions: 500 billion transactions per year (vertices)
- Transaction edges: Account → Transaction → Merchant
- Total Vertices: 505B vertices
- Total Edges: 1 trillion edges
Query Patterns:
- Fraud detection (circular transaction paths)
- Money laundering detection (complex multi-hop patterns)
- Account risk scoring (network analysis)
- Real-time suspicious pattern detection
3. Internet of Things (IoT) Device Network
Example: Smart city/industrial IoT graph
- Devices: 50 billion IoT devices globally by 2030
- Sensors: 200 billion sensor readings/day (time-series vertices)
- Device relationships: Physical topology, data flows
- Total Vertices: 50B devices plus a retained subset of sensor-reading vertices (raw readings run to ~73T/year, far more than can be retained individually) = 70B+ vertices
- Total Edges: 500B device-to-device edges + trillions of data flow edges
Query Patterns:
- Device dependency analysis
- Cascade failure prediction
- Data lineage tracking
- Real-time anomaly detection across device clusters
4. Knowledge Graph at Web Scale
Example: Google Knowledge Graph scale
- Entities: 500 billion entities (people, places, things, concepts)
- Relationships: 3.5 trillion semantic relationships
- Total Vertices: 500B entities
- Total Edges: 3.5T relationships
Query Patterns:
- Semantic search (multi-hop entity resolution)
- Question answering (traverse relationship chains)
- Entity disambiguation
- Recommendation via semantic similarity
Problems with Current Architecture
Problem 1: Limited Node Count (10 proxies)
RFC-055 architecture assumes 10-20 proxy instances:
- Partition count fixed at 256
- Consistent hashing assumes small cluster
- Control plane distribution limited to 10-20 nodes
Problem 2: No Hierarchical Routing
Flat partition model doesn't scale to 1000+ nodes:
- Every node needs partition table for all 1000 nodes
- Cross-node queries require 1000-way coordination
- Partition rebalancing touches all nodes
Problem 3: Memory-Only Architecture
RFC-055 keeps entire graph in memory:
- 100B vertices × 100 bytes = 10 TB minimum
- 10T edges × 20 bytes = 200 TB minimum
- Total: 210 TB in-memory (not feasible)
Problem 4: No Data Locality Optimization
Random vertex assignment via consistent hashing:
- Traversals always hit multiple nodes
- No co-location of frequently accessed subgraphs
- No optimization for access patterns
Problem 5: Limited Query Optimization
Single-proxy query execution:
- No distributed query planning
- No partition pruning for filtered queries
- No parallel execution across partitions
Goals
- Horizontal Scalability: Support 1000+ lightweight compute nodes
- Hierarchical Sharding: Three-tier partition hierarchy (cluster → proxy → partition)
- Cost-Effective Storage: Hot/cold data tiers (memory + S3)
- Intelligent Placement: Co-locate related vertices for query optimization
- Distributed Queries: Gremlin query execution across 1000+ nodes
- Dynamic Rebalancing: Add/remove nodes without downtime
- Authorization-Aware: Vertex labeling for fine-grained access control
Non-Goals
- Backend-Agnostic Sharding: This RFC assumes in-memory MemStore + S3 (not Neptune, Neo4j)
- Multi-Region Distribution: Cross-region replication covered in RFC-012
- ACID Transactions: Distributed transactions across shards (future work)
- Schema Evolution: Graph schema changes not covered here
- Real-Time Ingestion: Bulk loading focus, streaming ingestion in RFC-051 (WAL pattern)
Hierarchical Sharding Architecture
Three-Tier Partition Model
┌────────────────────────────────────────────────────────────────┐
│ Tier 1: Cluster │
│ │
│ Global Coordinator (Admin Plane) │
│ - 10-20 clusters, each with 100 proxy nodes │
│ - Cluster ID: 0-19 (e.g., us-west-1, us-east-1) │
│ │
└────────────────────┬───────────────────────────────────────────┘
│
┌───────────┼───────────┬─────────────┐
│ │ │ │
┌────────▼─────┐ ┌──▼────────┐ ┌▼──────────┐ ┌▼──────────────┐
│ Cluster 0 │ │ Cluster 1 │ │ Cluster 2 │ │ ... │
│ (us-west-1) │ │(us-east-1)│ │(eu-west-1)│ │ Cluster 19 │
│ │ │ │ │ │ │ │
│ 100 proxies │ │ 100 proxies│ │100 proxies│ │ 100 proxies │
│ 10B vertices │ │10B vertices│ │10B vertices│ │10B vertices │
└────────┬─────┘ └───────────┘ └───────────┘ └───────────────┘
│
│ Tier 2: Proxy (within cluster)
│
┌────────▼───────────────────────────────────────────────────────┐
│ Cluster 0: 100 Proxy Instances │
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Proxy │ │Proxy │ │Proxy │ ... │Proxy │ │
│ │ 0 │ │ 1 │ │ 2 │ │ 99 │ │
│ │ │ │ │ │ │ │ │ │
│ │100M V│ │100M V│ │100M V│ │100M V│ │
│ └───┬──┘ └───┬──┘ └───┬──┘ └───┬──┘ │
│ │ │ │ │ │
└──────┼────────┼────────┼──────────────────┼───────────────────┘
│ │ │ │
│ Tier 3: Partitions (within proxy)
│
┌──────▼──────────────────────────────────────────┐
│ Proxy 0: 64 Partitions │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Part 0 │ │ Part 1 │ │ Part 63 │ │
│ │ 1.56M V │ │ 1.56M V │ │ 1.56M V │ ... │
│ │ Hot data │ │ Warm data│ │ Cold data│ │
│ │ In-memory│ │ Partial │ │ S3 backed│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└──────────────────────────────────────────────────┘
Partition Addressing
Vertex ID Format: {cluster_id}:{proxy_id}:{partition_id}:{local_vertex_id}
Example: 02:0045:12:user_alice
- Cluster 2 (eu-west-1)
- Proxy 45 (within cluster 2)
- Partition 12 (within proxy 45)
- Local ID: user_alice
Routing Algorithm (a parsing sketch follows the steps below):
Vertex ID: "02:0045:12:user_alice"
Step 1: Parse vertex ID
cluster_id = 02
proxy_id = 0045
partition_id = 12
local_vertex_id = "user_alice"
Step 2: Route to cluster
Global coordinator → Cluster 2 gateway
Step 3: Route to proxy
Cluster 2 gateway → Proxy 45
Step 4: Route to partition
Proxy 45 → Partition 12
Step 5: Execute query
Partition 12 → Lookup "user_alice" in local MemStore/S3
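A minimal Go sketch of the parse step above (the HierarchicalAddress type and function name are illustrative, not part of this RFC's API):
import (
	"fmt"
	"strings"
)

// HierarchicalAddress is the parsed form of a {cluster}:{proxy}:{partition}:{local_id} vertex ID.
type HierarchicalAddress struct {
	ClusterID   string
	ProxyID     string
	PartitionID string
	LocalID     string
}

// ParseHierarchicalVertexID splits a hierarchical vertex ID into its routing components.
// The local ID may itself contain ":" (e.g. "user:alice"), so split into at most 4 parts.
func ParseHierarchicalVertexID(vertexID string) (*HierarchicalAddress, error) {
	parts := strings.SplitN(vertexID, ":", 4)
	if len(parts) != 4 {
		return nil, fmt.Errorf("not a hierarchical vertex ID: %q", vertexID)
	}
	return &HierarchicalAddress{
		ClusterID:   parts[0],
		ProxyID:     parts[1],
		PartitionID: parts[2],
		LocalID:     parts[3],
	}, nil
}

// ParseHierarchicalVertexID("02:0045:12:user_alice") → {02, 0045, 12, "user_alice"}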
Alternative: Opaque Vertex IDs
Trade-Off: Hierarchical IDs (above) embed topology in vertex ID, making routing fast but rebalancing expensive. Opaque IDs decouple vertex identity from physical location, enabling free rebalancing at the cost of routing overhead (MEMO-050 Finding 15).
Hierarchical IDs (Current Approach)
Advantages:
- Zero-overhead routing: Parse vertex ID to determine partition (O(1), ~10 ns)
- No external dependencies: No routing table required
- Deterministic: Same vertex ID always routes to same partition
Disadvantages:
- Expensive rebalancing: Moving partition requires rewriting all vertex IDs
- Topology-dependent: Vertex IDs encode cluster/proxy/partition structure
- Limited flexibility: Cannot easily move individual vertices between partitions
Rebalancing Cost Example:
Scenario: Move partition 07:0042:05 from Proxy 42 to Proxy 89
With hierarchical IDs:
1. Update all 1.56M vertex IDs in partition
- Old: 07:0042:05:user_alice → New: 07:0089:05:user_alice
2. Update all incoming edge references (10M edges pointing to this partition)
3. Update all indexes containing these vertex IDs
4. Broadcast ID changes to all proxies
Cost: ~30 minutes for 1.56M vertices + 10M edge updates
Downtime: Partition unavailable during rewrite
Opaque IDs (Alternative Approach)
Design: Vertex IDs are globally unique, topology-independent identifiers. Routing table maps opaque ID → physical location.
Vertex ID Format: vertex:{uuid} or vertex:{snowflake_id}
Example: vertex:550e8400-e29b-41d4-a716-446655440000
Routing Table Architecture:
message VertexRoutingTable {
// Distributed hash table: vertex_id → partition_location
map<string, PartitionLocation> routing_table = 1;
// Bloom filter for fast negative lookups
bytes bloom_filter = 2;
}
message PartitionLocation {
string cluster_id = 1;
string proxy_id = 2;
string partition_id = 3;
int64 last_updated = 4;
}
Routing Implementation:
type OpaqueVertexRouter struct {
routingTable *DistributedHashTable
bloomFilter *bloom.BloomFilter
cache *RoutingCache
}
func (ovr *OpaqueVertexRouter) RouteVertex(vertexID string) (*PartitionLocation, error) {
// Fast path: Check cache (99% hit rate)
if location := ovr.cache.Get(vertexID); location != nil {
return location, nil
}
// Bloom filter: Fast negative lookup
if !ovr.bloomFilter.Contains(vertexID) {
return nil, VertexNotFoundError{VertexID: vertexID}
}
// Slow path: Lookup in routing table
location, err := ovr.routingTable.Get(vertexID)
if err != nil {
return nil, err
}
// Cache result
ovr.cache.Put(vertexID, location)
return location, nil
}
Routing Table Storage: Distributed across cluster using consistent hashing
type DistributedRoutingTable struct {
shards map[uint32]*RoutingTableShard
}
func (drt *DistributedRoutingTable) GetShard(vertexID string) *RoutingTableShard {
hash := xxhash.Sum64String(vertexID)
shardID := hash % uint64(len(drt.shards))
return drt.shards[uint32(shardID)]
}
func (drt *DistributedRoutingTable) Get(vertexID string) (*PartitionLocation, error) {
shard := drt.GetShard(vertexID)
return shard.Lookup(vertexID)
}
Routing Cache Strategy:
routing_cache_config:
type: local_lru # Per-proxy cache
max_entries: 1_000_000 # 1M vertex locations cached
ttl: 3600s # 1 hour expiration
memory_size: 100 MB # ~100 bytes per entry
performance:
hit_rate: 99% # Power-law: 1% of vertices get 99% of queries
cold_lookup: 1 μs # Cache miss → routing table lookup
hot_lookup: 10 ns # Cache hit
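The RoutingCache used by OpaqueVertexRouter above is left unspecified; a minimal per-proxy LRU-with-TTL sketch, assuming the hashicorp/golang-lru/v2 library and using a local stand-in for the generated PartitionLocation type:
import (
	"time"

	lru "github.com/hashicorp/golang-lru/v2"
)

// PartitionLocation mirrors the protobuf message above (stand-in for the generated type).
type PartitionLocation struct {
	ClusterID   string
	ProxyID     string
	PartitionID string
	LastUpdated int64
}

type cachedLocation struct {
	location *PartitionLocation
	expires  time.Time
}

// RoutingCache is a per-proxy LRU of vertex → partition locations with TTL expiry.
type RoutingCache struct {
	entries *lru.Cache[string, cachedLocation]
	ttl     time.Duration
}

func NewRoutingCache(maxEntries int, ttl time.Duration) (*RoutingCache, error) {
	c, err := lru.New[string, cachedLocation](maxEntries)
	if err != nil {
		return nil, err
	}
	return &RoutingCache{entries: c, ttl: ttl}, nil
}

// Get returns a cached location, or nil on miss or expiry.
func (rc *RoutingCache) Get(vertexID string) *PartitionLocation {
	entry, ok := rc.entries.Get(vertexID)
	if !ok || time.Now().After(entry.expires) {
		return nil
	}
	return entry.location
}

func (rc *RoutingCache) Put(vertexID string, loc *PartitionLocation) {
	rc.entries.Add(vertexID, cachedLocation{location: loc, expires: time.Now().Add(rc.ttl)})
}

// Example: cache, _ := NewRoutingCache(1_000_000, time.Hour) // matches the config above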
Rebalancing with Opaque IDs:
Scenario: Move partition 07:0042:05 from Proxy 42 to Proxy 89
With opaque IDs:
1. Copy partition data to Proxy 89
2. Update routing table: vertex_id → 07:0089:05 (for all vertices in partition)
3. Broadcast routing table update to all proxies
4. Invalidate caches
Cost: ~10 seconds for routing table updates
Downtime: None (dual-read during migration)
Vertex IDs: Unchanged (no rewrites needed)
Performance Comparison
Routing Latency:
| Operation | Hierarchical IDs | Opaque IDs (Cold) | Opaque IDs (Hot) |
|---|---|---|---|
| Parse vertex ID | 10 ns | - | - |
| Routing table lookup | - | 1 μs | 10 ns (cached) |
| Total routing time | 10 ns | 1 μs | 10 ns |
Cache hit rate: 99% (power-law distribution)
Effective routing latency: 0.01 × 1 μs + 0.99 × 10 ns = 20 ns average
Rebalancing Time:
| Operation | Hierarchical IDs | Opaque IDs | Speedup |
|---|---|---|---|
| Move single partition | 30 minutes | 10 seconds | 180× |
| Move 100 partitions | 50 hours | 16 minutes | 187× |
| Rebalance entire cluster | 3 days | 2 hours | 36× |
Hybrid Approach (Recommended)
Best of both worlds: Use hierarchical IDs for most vertices, opaque IDs for frequently moved data
vertex_id_strategy:
default: hierarchical # Most vertices use cluster:proxy:partition:id
opaque_for:
- hot_partitions # Frequently rebalanced partitions
- cross_partition_vertices # Vertices referenced from many partitions
- user_managed_ids # Application-controlled IDs
routing_table:
shards: 256 # Distributed routing table shards
replication: 3 # Routing table replicated 3×
cache_size: 1_000_000 # Per-proxy cache
Hybrid Routing:
func (hr *HybridRouter) RouteVertex(vertexID string) (*PartitionLocation, error) {
// Check if hierarchical or opaque
if hr.IsHierarchicalID(vertexID) {
// Fast path: Parse ID directly
return hr.ParseHierarchicalID(vertexID), nil
}
// Slow path: Lookup in routing table
return hr.opaqueRouter.RouteVertex(vertexID)
}
func (hr *HybridRouter) IsHierarchicalID(vertexID string) bool {
	// Hierarchical IDs match cluster:proxy:partition:local_id; the local ID may itself
	// contain ":" (e.g. "user:alice"), so require at least three separators.
	return strings.Count(vertexID, ":") >= 3
}
Decision Matrix
| Use Case | Recommended Approach | Rationale |
|---|---|---|
| Static graph | Hierarchical IDs | No rebalancing needed, zero routing overhead |
| Frequently rebalanced | Opaque IDs | 180× faster rebalancing, 1 μs routing acceptable |
| Mixed workload | Hybrid (80% hierarchical, 20% opaque) | Optimize common case, flexibility for hot data |
| Application-controlled IDs | Opaque IDs | Apps provide IDs, system maintains routing |
Summary
Opaque vertex IDs trade 1 μs routing overhead for free rebalancing:
- Routing overhead: 1 μs cold, 10 ns hot (99% cache hit rate)
- Rebalancing speedup: 180× faster (10 seconds vs 30 minutes per partition)
- Memory overhead: 100 MB per proxy for routing cache (negligible)
- Operational benefit: Enable live rebalancing without downtime
Recommendation: Start with hierarchical IDs for simplicity, migrate to hybrid approach when operational flexibility is needed. Reserve opaque IDs for hot partitions and application-managed identifiers.
Partition Size Guidelines
| Tier | Count | Size | Memory | Purpose |
|---|---|---|---|---|
| Cluster | 10-20 | 5-10B vertices | - | Geographic/organizational boundary |
| Proxy | 100 per cluster | 100M vertices | 30 GB | Compute node with in-memory cache |
| Partition | 64 per proxy | 1.56M vertices | 156 MB | Hot/warm/cold data unit |
Why 64 partitions per proxy? (Updated from 16 based on MEMO-050 Finding 6)
- Finer hot/cold granularity: 156 MB partitions vs 625 MB (4× smaller)
- Faster rebalancing: 10× more parallelism, 13 seconds vs 2.1 minutes
- Better load distribution: Lower variance (2% vs 15%)
- Smaller failure blast radius: 1.56M vertices vs 6.25M
Trade-off: Slightly more partition metadata (64k vs 16k cluster-wide), but benefits outweigh costs at 100B scale.
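As a quick check on the sizing above, a small sketch that reproduces the per-partition numbers (assuming the ~100 bytes/vertex estimate used elsewhere in this RFC; the function is illustrative):
// PartitionSizing reproduces the partition-size arithmetic, assuming ~100 bytes of
// in-memory footprint per vertex.
func PartitionSizing(verticesPerProxy, partitionsPerProxy int) (verticesPerPartition int, partitionMB float64) {
	const bytesPerVertex = 100
	verticesPerPartition = verticesPerProxy / partitionsPerProxy
	partitionMB = float64(verticesPerPartition) * bytesPerVertex / 1e6
	return verticesPerPartition, partitionMB
}

// PartitionSizing(100_000_000, 64) → 1,562,500 vertices, ~156 MB per partition
// PartitionSizing(100_000_000, 16) → 6,250,000 vertices, ~625 MB per partition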
Network Topology-Aware Sharding
Critical: At 100B scale, cross-AZ bandwidth costs $365M/year without network-aware placement (MEMO-050 Finding 3). This section defines network topology integration for ~92% cost savings.
AWS Network Cost Model
| Network Path | Latency | Cost | Use Case |
|---|---|---|---|
| Same rack | 100 μs | $0/GB | Preferred |
| Same AZ | 200 μs | $0/GB | Good |
| Cross-AZ | 1-2 ms | $0.01/GB | Expensive! |
| Cross-region | 20-100 ms | $0.02/GB | Very expensive! |
Cost Impact Without Optimization:
Typical 2-hop query: 100 partitions × 100 KB = 10 MB of intermediate results
50% cross-AZ traffic: 5 MB cross-AZ per query
At 1B queries/day: 5 PB/day × $0.01/GB ≈ $50K/day ≈ $1.5M/month for this workload alone
Cross-AZ spend scales linearly with query volume and bytes per hop; at fleet-wide volume it reaches the ~$365M/year estimated in MEMO-050 Finding 3 ❌
Cost Impact With Optimization:
AZ-aware placement: 5% cross-AZ traffic (not 50%)
At 1B queries/day: 500 TB/day × $0.01/GB ≈ $5K/day, a 10× reduction
Fleet-wide, AZ-aware placement plus CDN/result caching brings cross-AZ spend to ~$30M/year ✅
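A small sketch of this cost arithmetic; the function name and all inputs are deployment-specific assumptions, with $0.01/GB as the cross-AZ rate cited above:
// CrossAZCostPerDay estimates daily cross-AZ spend for a query workload.
func CrossAZCostPerDay(queriesPerDay float64, partitionsPerQuery int, bytesPerPartition, crossAZFraction, dollarsPerGB float64) float64 {
	bytesPerQuery := float64(partitionsPerQuery) * bytesPerPartition
	crossAZBytesPerDay := queriesPerDay * bytesPerQuery * crossAZFraction
	return crossAZBytesPerDay / 1e9 * dollarsPerGB
}

// CrossAZCostPerDay(1e9, 100, 100e3, 0.50, 0.01) ≈ $50,000/day (random placement)
// CrossAZCostPerDay(1e9, 100, 100e3, 0.05, 0.01) ≈ $5,000/day  (AZ-aware placement)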
Extended Partition Metadata
message PartitionMetadata {
string partition_id = 1;
string cluster_id = 2;
string proxy_id = 3;
// Network topology (NEW)
NetworkLocation location = 4;
}
message NetworkLocation {
string region = 1; // "us-west-2"
string availability_zone = 2; // "us-west-2a"
string rack_id = 3; // "rack-42" (optional, for same-DC optimization)
// Network characteristics for cost/latency calculations
float cross_az_bandwidth_gbps = 4; // Typical: 10 Gbps
float cross_region_bandwidth_gbps = 5; // Typical: 1-5 Gbps
}
Multi-AZ Deployment Strategy
cluster_topology:
region: us-west-2
availability_zones:
- id: us-west-2a
proxies: 334
primary_for: [cluster_0, cluster_3, cluster_6, cluster_9]
- id: us-west-2b
proxies: 333
primary_for: [cluster_1, cluster_4, cluster_7]
- id: us-west-2c
proxies: 333
primary_for: [cluster_2, cluster_5, cluster_8]
# Partition replication for availability
partition_replication:
factor: 3 # 3 replicas per partition
placement: different_az # Each replica in different AZ
consistency: eventual # Async replication (lower cross-AZ cost)
read_preference: primary_az # Read from local AZ first (zero cost)
# Failover policy
failover:
enabled: true
timeout: 100ms # Try local AZ first
fallback: cross_az # Fall back to remote AZ if needed
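A sketch of the read_preference/failover policy above; the ReplicaClient interface and the minimal Vertex stand-in are assumptions, not types defined by this RFC:
import (
	"context"
	"errors"
	"time"
)

// Vertex is a minimal stand-in for the RFC's vertex record.
type Vertex struct {
	ID         string
	Label      string
	Properties map[string]string
}

// ReplicaClient is a hypothetical client for one replica of a partition.
type ReplicaClient interface {
	AZ() string
	GetVertex(ctx context.Context, vertexID string) (*Vertex, error)
}

// ReadPreferLocalAZ tries replicas in the coordinator's AZ first (free, ~200 μs),
// then falls back to cross-AZ replicas if the local attempt misses the 100ms budget.
func ReadPreferLocalAZ(ctx context.Context, localAZ string, replicas []ReplicaClient, vertexID string) (*Vertex, error) {
	var local, remote []ReplicaClient
	for _, r := range replicas {
		if r.AZ() == localAZ {
			local = append(local, r)
		} else {
			remote = append(remote, r)
		}
	}

	// Local AZ first, bounded by the failover timeout from the config above.
	localCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()
	for _, r := range local {
		if v, err := r.GetVertex(localCtx, vertexID); err == nil {
			return v, nil
		}
	}

	// Fall back to remote AZs ($0.01/GB, 1-2 ms).
	for _, r := range remote {
		if v, err := r.GetVertex(ctx, vertexID); err == nil {
			return v, nil
		}
	}
	return nil, errors.New("all replicas failed")
}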
Locality-Aware Partitioning
Strategy: Co-locate frequently co-traversed vertices in same AZ.
type NetworkAwarePartitioner struct {
topologyMap *NetworkTopology
localityHints map[string]string // vertex_id → preferred_az
}
func (nap *NetworkAwarePartitioner) AssignPartition(
vertex *Vertex,
placementHint *PlacementHint,
) string {
// Default: Consistent hashing (random placement)
defaultPartition := nap.ConsistentHash(vertex.ID)
// If hint provided, prefer co-location in same AZ
if placementHint != nil && placementHint.CoLocateWithVertex != "" {
// Place in same AZ as target vertex
targetPartition := nap.GetPartitionForVertex(placementHint.CoLocateWithVertex)
targetAZ := nap.topologyMap.GetAZ(targetPartition)
// Find partition in same AZ with capacity
sameAZPartition := nap.FindPartitionInAZ(targetAZ)
if sameAZPartition != "" {
return sameAZPartition
}
}
// Fall back to consistent hashing
return defaultPartition
}
// Example: Social graph with locality
func CreateFriendship(user1, user2 string) error {
// Place user2 in same AZ as user1 if possible
edge := &Edge{
FromVertexID: user1,
ToVertexID: user2,
Label: "FRIENDS",
PlacementHint: &PlacementHint{
CoLocateWithVertex: user1,
Reason: "Reduce cross-AZ traffic for friend traversals",
},
}
return CreateEdge(edge)
}
Query Routing with Network Cost
type NetworkCostModel struct {
sameAZLatency time.Duration // 200 μs
crossAZLatency time.Duration // 2 ms
crossRegionLatency time.Duration // 50 ms
sameAZCost float64 // $0/GB
crossAZCost float64 // $0.01/GB
crossRegionCost float64 // $0.02/GB
}
func (qp *QueryPlanner) OptimizeWithNetworkCost(plan *QueryPlan) *QueryPlan {
for i, stage := range plan.Stages {
// Group partitions by AZ
partitionsByAZ := qp.GroupPartitionsByAZ(stage.TargetPartitions)
// Prefer partitions in same AZ as coordinator
coordinatorAZ := qp.GetCoordinatorAZ()
// Reorder: local AZ first, then remote
optimizedOrder := []string{}
optimizedOrder = append(optimizedOrder, partitionsByAZ[coordinatorAZ]...)
for az, partitions := range partitionsByAZ {
if az != coordinatorAZ {
optimizedOrder = append(optimizedOrder, partitions...)
}
}
stage.TargetPartitions = optimizedOrder
// Estimate network cost
stage.EstimatedNetworkCost = qp.CalculateNetworkCost(stage)
plan.Stages[i] = stage
}
return plan
}
func (qp *QueryPlanner) CalculateNetworkCost(stage *ExecutionStage) float64 {
cost := 0.0
coordinatorAZ := qp.GetCoordinatorAZ()
for _, partitionID := range stage.TargetPartitions {
partitionAZ := qp.GetPartitionAZ(partitionID)
// Estimate data transfer (100 KB per partition average)
dataTransferGB := 0.0001 // 100 KB
if partitionAZ == coordinatorAZ {
cost += 0 // Same AZ: free
} else {
cost += dataTransferGB * qp.networkCostModel.crossAZCost
}
}
return cost
}
Deployment Patterns by Scale
1B Vertices (10 Nodes): Single AZ
strategy: single_az
rationale: Small cluster, cross-AZ overhead > benefits
cost: $100k/year
availability: Single AZ failure = full outage
trade_off: Simple but less available
10B Vertices (100 Nodes): Multi-AZ with Read Preferences
strategy: multi_az_read_local
rationale: Amortize cross-AZ cost, most queries stay local
cost: $5M/year (10% cross-AZ traffic)
availability: AZ failure = 33% capacity loss (degraded)
trade_off: Balanced cost/availability
100B Vertices (1000 Nodes): Multi-AZ with Aggressive Optimization
strategy: multi_az_network_optimized
rationale: Network cost dominates, must optimize aggressively
cost: $47M/year (5% cross-AZ traffic)
availability: AZ failure = 33% capacity loss
trade_off: Complex but essential at scale
Cost Savings Summary
| Scale | Without AZ Awareness | With AZ Awareness | Savings |
|---|---|---|---|
| 1B vertices | $100k/year | $100k/year | 0% (single AZ) |
| 10B vertices | $45M/year | $5M/year | 89% |
| 100B vertices | $365M/year | $30M/year | 92% |
Key Takeaway: At 100B scale, network-aware placement is mandatory, not optional. Without it, bandwidth costs exceed infrastructure costs by 10×.
Sharding Strategies
Strategy 1: Hierarchical Consistent Hashing (Default)
Three-level hash:
1. Cluster Assignment: hash("cluster:" + vertex_id) % num_clusters → cluster_id
2. Proxy Assignment: hash("proxy:" + vertex_id) % proxies_per_cluster → proxy_id
3. Partition Assignment: hash("partition:" + vertex_id) % partitions_per_proxy → partition_id
Each level salts the hash differently; reusing a single hash value at every level would correlate the assignments and leave most proxies within a cluster unused.
Example:
vertex_id = "user:alice"
Level 1 (Cluster):
xxhash64("cluster:" + "user:alice") % 10 = 7 → Cluster 7
Level 2 (Proxy within cluster 7):
xxhash64("proxy:" + "user:alice") % 100 = 42 → Proxy 42
Level 3 (Partition within proxy 42):
xxhash64("partition:" + "user:alice") % 64 = 37 → Partition 37
Full address: 07:0042:37:user:alice
Hash Function Choice: xxHash vs CRC32
At 100B scale, hash function quality significantly impacts load distribution. Poor distribution leads to hotspots, uneven resource utilization, and degraded performance.
Benchmark Comparison (100M vertex IDs hashed):
| Hash Function | Throughput | Collision Rate | Load Variance | Notes |
|---|---|---|---|---|
| xxHash64 | 8.5 GB/s | 1 in 100k | 2% | Recommended ✅ |
| CRC32 | 5.0 GB/s | 1 in 10k | 15% | Legacy, poor distribution |
| MurmurHash3 | 3.8 GB/s | 1 in 50k | 5% | Good, but slower |
| Jump Hash | 12.0 GB/s | N/A | <1% | Best for minimal rebalancing |
Key Findings:
- xxHash64: 1.7× faster than CRC32, 8× better load distribution (15% → 2% variance)
- Jump Hash: Optimal for scenarios requiring minimal data movement during rebalancing (adds/removes nodes with minimal key reassignment)
- CRC32: Deprecated due to poor distribution properties at massive scale
Implementation (Go):
import "github.com/cespare/xxhash/v2"
func GetPartitionAddress(vertexID string, numClusters, proxiesPerCluster, partitionsPerProxy int) string {
hash := xxhash.Sum64String(vertexID)
clusterID := hash % uint64(numClusters)
proxyID := hash % uint64(proxiesPerCluster)
partitionID := hash % uint64(partitionsPerProxy)
return fmt.Sprintf("%02d:%04d:%02d:%s", clusterID, proxyID, partitionID, vertexID)
}
Alternative: Jump Hash for Rebalancing
For scenarios with frequent topology changes (adding/removing proxies), Jump Hash minimizes data movement:
import "github.com/dgryski/go-jump"
func GetPartitionWithJumpHash(vertexID string, numPartitions int) int {
hash := xxhash.Sum64String(vertexID)
return int(jump.Hash(hash, numPartitions))
}
Jump Hash Properties:
- When adding 1 proxy: Only ~1/N keys move (vs nearly all keys, ~99%, with plain modulo hashing)
- Example: Add proxy 101 to 100-proxy cluster → Only 1% of keys rebalance
- Trade-off: Slightly more complex routing logic
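A quick way to verify the ~1% movement claim, assuming the same xxhash and go-jump libraries used above; the MovedFraction helper is illustrative:
import (
	"fmt"

	"github.com/cespare/xxhash/v2"
	jump "github.com/dgryski/go-jump"
)

// MovedFraction measures how many of n synthetic vertex IDs change buckets when the
// proxy count grows from oldBuckets to newBuckets under Jump Hash.
func MovedFraction(n, oldBuckets, newBuckets int) float64 {
	moved := 0
	for i := 0; i < n; i++ {
		key := xxhash.Sum64String(fmt.Sprintf("vertex-%d", i))
		if jump.Hash(key, oldBuckets) != jump.Hash(key, newBuckets) {
			moved++
		}
	}
	return float64(moved) / float64(n)
}

// MovedFraction(1_000_000, 100, 101) ≈ 0.01: only ~1% of keys rebalance when a
// 101st proxy is added, versus nearly all keys under plain modulo hashing.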
Benefits:
- Even distribution across all tiers
- Deterministic routing (no lookups needed)
- Minimal rebalancing on topology changes
Drawbacks:
- No data locality (related vertices scattered)
- No access pattern optimization
Strategy 2: Locality-Aware Partitioning
Principle: Co-locate vertices that are frequently traversed together.
Examples:
Social Graph Locality
User "alice" has 200 friends. Co-locate alice + friends on same proxy.
Approach:
1. Assign user to cluster/proxy via consistent hash
2. When creating friend edge:
- Check if target vertex exists
- If not, create on SAME proxy as source vertex
3. Result: Most 1-hop traversals stay within single proxy
Benefits:
- Friends-of-friends queries mostly single-proxy
- No cross-node RPC for common queries
- 10-100× faster traversals
Financial Transaction Locality
Account "acct_123" has 1000 transactions. Co-locate transactions near account.
Approach:
1. Assign account to cluster/proxy
2. Create transactions on same proxy (partition determined by timestamp)
3. Result: Account history queries are local
Benefits:
- Transaction history: single proxy query
- Account risk analysis: local graph traversal
Implementation:
message CreateVertexRequest {
string id = 1;
string label = 2;
map<string, Value> properties = 3;
// Locality hint
optional PlacementHint placement_hint = 4;
}
message PlacementHint {
oneof hint {
string co_locate_with_vertex = 1; // Place near this vertex
string co_locate_with_label = 2; // Place with vertices of this type
string explicit_partition = 3; // Explicit partition ID
}
}
Co-location Algorithm:
func (g *GraphShardManager) CreateVertex(req *CreateVertexRequest) error {
var targetPartition string
if req.PlacementHint != nil {
switch hint := req.PlacementHint.Hint.(type) {
	case *PlacementHint_CoLocateWithVertex:
		// Place on same partition as the target vertex; fall back to hashing if it doesn't exist
		if targetVertex := g.GetVertex(hint.CoLocateWithVertex); targetVertex != nil {
			targetPartition = targetVertex.PartitionID
		} else {
			targetPartition = g.CalculatePartition(req.Id)
		}
case *PlacementHint_ExplicitPartition:
targetPartition = hint.ExplicitPartition
default:
// Fall back to consistent hashing
targetPartition = g.CalculatePartition(req.Id)
}
} else {
targetPartition = g.CalculatePartition(req.Id)
}
// Create vertex on target partition
return g.CreateVertexOnPartition(targetPartition, req)
}
Strategy 3: Label-Based Partitioning
Principle: Partition by vertex label (type), not vertex ID.
Example: Multi-tenant SaaS graph
Labels: User, Post, Comment, Organization
Partitioning:
Cluster 0: All "User" vertices
Cluster 1: All "Post" vertices
Cluster 2: All "Comment" vertices
Cluster 3: All "Organization" vertices
Benefits:
- Label-specific queries are single-cluster
- Easy to scale individual vertex types
- Clear capacity planning
Drawbacks:
- Cross-label traversals always multi-cluster
- Imbalanced load if one label dominates
Configuration:
partition_strategy:
type: label_based
label_cluster_mapping:
User: cluster_0
Post: cluster_1
Comment: cluster_2
Organization: cluster_3
fallback: consistent_hash # For unknown labels
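A minimal resolver for this configuration; the struct and the fallback function (standing in for the consistent-hash path) are illustrative assumptions:
// LabelBasedPartitioner maps vertex labels to clusters, falling back to consistent
// hashing for labels without an explicit mapping (mirrors the config above).
type LabelBasedPartitioner struct {
	labelToCluster map[string]string            // e.g. "User" → "cluster_0"
	fallback       func(vertexID string) string // consistent-hash fallback (assumed helper)
}

func (p *LabelBasedPartitioner) ClusterFor(label, vertexID string) string {
	if cluster, ok := p.labelToCluster[label]; ok {
		return cluster
	}
	return p.fallback(vertexID)
}

// ClusterFor("User", "user:alice") → "cluster_0"; an unmapped label falls through to the hash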
Strategy 4: Hybrid Sharding
Principle: Combine strategies based on access patterns.
Example: Social network with hot users
Tier 1 (Cluster): Label-based
- Cluster 0-5: Regular users (consistent hash)
- Cluster 6: VIP users (celebrities, influencers)
- Cluster 7: Business pages
- Cluster 8: Groups
Tier 2 (Proxy): Locality-aware
- VIP users: Co-locate with their posts
- Regular users: Consistent hash
Tier 3 (Partition): Access frequency
- Hot partitions: In-memory
- Warm partitions: Partial memory
- Cold partitions: S3-backed
Configuration:
partition_strategy:
type: hybrid
cluster_strategy:
type: label_based
labels:
regular_user: [0, 1, 2, 3, 4, 5]
vip_user: [6]
business_page: [7]
group: [8]
proxy_strategy:
type: locality_aware
co_locate_edge_labels:
- FRIENDS
- FOLLOWS
- MEMBER_OF
partition_strategy:
type: access_frequency
hot_threshold: 1000_requests_per_minute
cold_threshold: 10_requests_per_minute
Distributed Query Routing
Query Execution Models
Model 1: Gather-Scatter (Simple Traversals)
Use Case: Broad queries that touch many partitions
Query: Get all users in organization "acme-corp"
Step 1: Parse query
Start: organization:acme-corp
Traversal: IN(MEMBER_OF) → User vertices
Step 2: Identify partition scope
Query coordinator: Scan all partitions (can't prune)
Step 3: Scatter phase
Coordinator → Send subquery to all 1000 proxies in parallel
Step 4: Gather phase
Each proxy returns matching vertices
Coordinator aggregates results
Step 5: Return to client
Total: 50,000 users in org
Performance: O(num_partitions) parallelism
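A scatter-gather sketch for the flow above, assuming a hypothetical ProxyClient interface that returns matching vertex IDs, and using errgroup to bound fan-out:
import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

// ProxyClient is a hypothetical per-proxy query client; Query returns matching vertex IDs.
type ProxyClient interface {
	Query(ctx context.Context, subquery string) ([]string, error)
}

// ScatterGather fans a subquery out to every proxy in parallel and merges the results.
func ScatterGather(ctx context.Context, proxies []ProxyClient, subquery string) ([]string, error) {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(256) // bound concurrent RPCs per coordinator; tune for fleet size

	var mu sync.Mutex
	var results []string
	for _, p := range proxies {
		p := p
		g.Go(func() error {
			ids, err := p.Query(ctx, subquery)
			if err != nil {
				return err
			}
			mu.Lock()
			results = append(results, ids...)
			mu.Unlock()
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}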
Model 2: Partition-Aware Pruning (Filtered Traversals)
Use Case: Queries with partition-prunable filters
Query: Get user "alice" and traverse 2 hops
Step 1: Parse query
Start: user:alice (deterministic partition via vertex ID)
Step 2: Identify start partition
vertex_id = "user:alice"
partition = hash("user:alice") = 07:0042:05
Step 3: Execute first hop locally
Proxy 07:0042 executes: user:alice.out(FOLLOWS)
Result: 200 friends (all may be on different partitions)
Step 4: Group by partition
Friends on Proxy 07:0042: 50 (local)
Friends on other proxies: 150 (remote)
Step 5: Second hop (distributed)
Execute out(FOLLOWS) on 50 local friends (fast)
Fan-out to 20 other proxies for 150 remote friends (parallel RPC)
Step 6: Aggregate
Coordinator collects results from 20 proxies
Total: 5,000 friends-of-friends
Performance: O(num_partitions_touched) << O(total_partitions)
Model 3: Index-Accelerated Queries
Use Case: Queries with indexed property filters
Query: Find users where city = "San Francisco" AND age > 25
Step 1: Check for index
Index exists: city_index (partition-local)
Step 2: Index scan (parallel across all partitions)
Each partition scans local city_index["San Francisco"]
Filter results by age > 25
Step 3: Return matches
Total: 125,000 users across 500 partitions
Performance: O(matches_per_partition), not O(total_vertices)
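A partition-local sketch of this index scan; the index and property layouts are illustrative assumptions, not this RFC's storage format:
// PropertyIndex is a hypothetical partition-local index: property value → vertex IDs.
type PropertyIndex map[string][]string

// Partition holds partition-local data and indexes (simplified).
type Partition struct {
	CityIndex PropertyIndex
	Ages      map[string]int // vertex ID → age property
}

// ScanCityAndAge returns vertex IDs in this partition where city matches and age > minAge.
// Each partition runs this locally; the coordinator merges per-partition results.
func (p *Partition) ScanCityAndAge(city string, minAge int) []string {
	var matches []string
	for _, vertexID := range p.CityIndex[city] { // O(matches per partition), not O(total vertices)
		if p.Ages[vertexID] > minAge {
			matches = append(matches, vertexID)
		}
	}
	return matches
}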
Cross-Partition Traversal Optimization
Problem: 2-hop query touches 1M vertices across 1000 partitions
Solution 1: Adaptive Parallelism
func (q *QueryCoordinator) ExecuteTraversal(start string, hops int) ([]string, error) {
	currentLevel := []string{start}
	for hop := 0; hop < hops; hop++ {
		// Group frontier vertices by partition
		partitionGroups := q.groupByPartition(currentLevel)

		// Adaptive parallelism based on partition count
		var nextLevel []string
		if len(partitionGroups) < 10 {
			// Few partitions: execute sequentially for each
			nextLevel = q.executeSequential(partitionGroups)
		} else if len(partitionGroups) < 100 {
			// Medium fan-out: execute in batches of 10
			nextLevel = q.executeParallelBatched(partitionGroups, 10)
		} else {
			// Large fan-out: full parallelism
			nextLevel = q.executeFullParallel(partitionGroups)
		}
		currentLevel = nextLevel
	}
	return currentLevel, nil
}
Solution 2: Early Termination
// Limit traversal breadth to prevent result-set explosion (bounded variant of Solution 1)
func (q *QueryCoordinator) ExecuteTraversalBounded(
	start string,
	hops int,
	maxResultsPerHop int,
) ([]string, error) {
currentLevel := []string{start}
for hop := 0; hop < hops; hop++ {
partitionGroups := q.groupByPartition(currentLevel)
nextLevel := []string{}
for _, group := range partitionGroups {
neighbors := q.executePartitionTraversal(group)
nextLevel = append(nextLevel, neighbors...)
// Early termination if too many results
if len(nextLevel) > maxResultsPerHop {
nextLevel = nextLevel[:maxResultsPerHop]
break
}
}
currentLevel = nextLevel
}
return currentLevel, nil
}
Solution 3: Partition-Local Caching
Query: Frequent traversal from popular vertex "celebrity_user"
Optimization:
1. First query: celebrity_user.out(FOLLOWS) → 10M followers
Cache result on query coordinator
2. Subsequent queries: Use cached follower list
TTL: 60 seconds
3. Invalidation: On edge create/delete to celebrity_user
Performance:
Cold query: 500ms (hits 200 partitions)
Warm query: 5ms (cached)
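A minimal coordinator-side cache sketch for this pattern (60-second TTL, invalidated on edge mutations touching the cached vertex); the types and method names are assumptions:
import (
	"strings"
	"sync"
	"time"
)

type cachedResult struct {
	vertexIDs []string
	expires   time.Time
}

// TraversalCache caches 1-hop neighbor lists for hot vertices on the query coordinator.
type TraversalCache struct {
	mu      sync.RWMutex
	entries map[string]cachedResult // key: vertexID + ":" + edgeLabel
	ttl     time.Duration
}

func NewTraversalCache(ttl time.Duration) *TraversalCache {
	return &TraversalCache{entries: make(map[string]cachedResult), ttl: ttl}
}

func (c *TraversalCache) Get(vertexID, edgeLabel string) ([]string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	entry, ok := c.entries[vertexID+":"+edgeLabel]
	if !ok || time.Now().After(entry.expires) {
		return nil, false // miss or expired
	}
	return entry.vertexIDs, true
}

func (c *TraversalCache) Put(vertexID, edgeLabel string, neighbors []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[vertexID+":"+edgeLabel] = cachedResult{vertexIDs: neighbors, expires: time.Now().Add(c.ttl)}
}

// InvalidateVertex drops all cached traversals from a vertex; call it on edge create/delete.
func (c *TraversalCache) InvalidateVertex(vertexID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for key := range c.entries {
		if strings.HasPrefix(key, vertexID+":") {
			delete(c.entries, key)
		}
	}
}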
Query Coordinator Architecture
┌────────────────────────────────────────────────────────────────┐
│ Query Coordinator │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Query Parser │ │ Query Planner│ │ Result Cache │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Distributed Execution Engine │ │
│ │ - Partition pruning │ │
│ │ - Parallel scatter-gather │ │
│ │ - Adaptive parallelism │ │