RFC-058: Multi-Level Graph Indexing for Massive-Scale Queries
Status: Draft | Author: Platform Team | Created: 2025-11-15 | Updated: 2025-11-15
Abstract
This RFC defines a multi-level indexing strategy for massive-scale graph queries (100B vertices, 10T edges). At this scale, unindexed graph traversals become impractical—a property filter scan across 100B vertices takes hours even with perfect parallelization. Multi-level indexing enables sub-second queries by providing: (1) Partition-level indexes for local property lookups, (2) Cluster-level indexes for global property scans, (3) Edge indexes for fast traversal initiation, and (4) Label indexes for type-based filtering. Critically, all indexes must be built and maintained online while serving queries using incremental index construction and distributed Write-Ahead Log (WAL) synchronization.
Key Innovations:
- Four-Tier Index Hierarchy: Partition → Proxy → Cluster → Global
- Online Index Building: Construct indexes without blocking queries
- Incremental Updates: Update indexes via distributed WAL without full rebuilds
- Bloom Filter Cascade: Fast negative lookups across 1000+ partitions
- Inverted Edge Indexes: Accelerate backward traversals and fan-in queries
Motivation
The Index-Free Adjacency Myth
Graph databases often claim "index-free adjacency" as a selling point—edges are pointers, so traversals are O(1). While true for individual edge traversals, this breaks down at massive scale for traversal initiation and property-filtered traversals.
Problem 1: Traversal Initiation Without Indexes
Query: "Find all users where city = 'San Francisco'"
Without Index:
Scan all 100B vertices
Check city property for each
Time: 100B vertices ÷ 1M scan rate = 100,000 seconds (27 hours)
With Index:
Lookup city_index["San Francisco"] → 5M vertex IDs
Time: 5M vertices ÷ 1M scan rate = 5 seconds
Speedup: 20,000×
Problem 2: Property-Filtered Traversals
Query: "Friends of Alice who live in San Francisco"
Without Index:
1. Traverse alice.out(FRIENDS) → 1000 friends
2. For each friend, check if city == "San Francisco"
3. Load properties for 1000 friends (may require S3 fetches)
Time: 1000 property lookups × 50ms avg = 50 seconds
With Index:
1. Traverse alice.out(FRIENDS) → 1000 friend IDs
2. Intersect with city_index["San Francisco"]
3. Return intersection
Time: 1000 IDs × 10μs = 10ms
Speedup: 5,000×
Problem 3: Backward Traversals (Fan-In)
Query: "Who follows celebrity user 'taylor_swift'?"
Without Edge Index:
Scan all FOLLOWS edges (10T edges)
Check if target == taylor_swift
Time: Hours (even with parallelization)
With Inverted Edge Index:
Lookup in_edges[taylor_swift][FOLLOWS] → 100M followers
Time: 100ms (bitmap scan)
Speedup: 36,000×
Why Multi-Level Indexing?
At 100B scale with 1000+ nodes, single-level indexes are insufficient:
Problem: Global index lookup hits all nodes
Query: city = "San Francisco"
Single-Level Approach:
Query coordinator → Broadcast query to 1000 proxies
Each proxy scans local index
Return results to coordinator
Time: 1000 RPCs (even if each is 5ms) = 5+ seconds network overhead
Multi-Level Approach:
Query coordinator → Check cluster-level index
Cluster index says: "San Francisco users on clusters 2, 5, 7"
Query only 3 clusters (300 proxies) instead of all 1000
Time: 300 RPCs = 1.5 seconds
Speedup: 3.3×
Insight: Multi-level indexes enable partition pruning to avoid querying irrelevant nodes.
Goals
- Online Index Construction: Build indexes without blocking query traffic
- Incremental Updates: Update indexes via WAL without full rebuilds
- Multi-Level Hierarchy: Partition → Proxy → Cluster → Global indexes
- Fast Lookups: Sub-millisecond index lookups for common queries
- Space Efficiency: Indexes <20% of graph data size
- Partition Pruning: Skip irrelevant partitions using cluster-level indexes
- Consistency: WAL-synchronized index updates across 1000+ nodes
Non-Goals
- Full-Text Search: Use Elasticsearch/Solr for text search, not graph indexes
- Geospatial Indexes: Use PostGIS/S2 for spatial queries
- Machine Learning Embeddings: Use vector databases (Pinecone, Weaviate)
- Real-Time Streaming: Index updates are near-real-time (seconds), not microseconds
Four-Tier Index Hierarchy
Tier 1: Partition-Level Indexes (Local)
Location: On each partition (16 partitions per proxy, 1000 proxies)
Purpose: Enable fast property lookups within single partition
Index Types:
- Hash Index: Exact property match (e.g., user_id = "alice")
- Range Index: Numeric/timestamp range queries (e.g., age > 25)
- Inverted Index: Set membership (e.g., tags contains "vip")
- Edge Index: Outgoing/incoming edge lists per vertex
Example:
Partition 07:0042:05 Indexes:
1. Hash Index (user_id):
"alice" → vertex_offset_12345
"bob" → vertex_offset_67890
2. Range Index (age):
[18-24] → bitmap of vertex offsets
[25-34] → bitmap of vertex offsets
[35-44] → bitmap of vertex offsets
3. Inverted Index (city):
"San Francisco" → [vertex_offset_123, vertex_offset_456, ...]
"New York" → [vertex_offset_789, ...]
4. Edge Index (FOLLOWS):
Outgoing: vertex_12345 → [target_1, target_2, ...]
Incoming: vertex_12345 → [source_1, source_2, ...]
Storage Format:
message PartitionIndex {
string partition_id = 1; // "07:0042:05"
int64 vertex_count = 2;
int64 edge_count = 3;
int64 last_updated = 4; // Unix timestamp
// Schema version for index format evolution
// Version history:
// v1: Initial release (hash + range indexes)
// v2: Added inverted indexes (2025-01)
// v3: Added edge indexes (2025-03)
// v4: Roaring bitmap compression (2025-06)
// v5: Bloom filter optimization (2025-09)
int32 schema_version = 10; // Current: 5
// Hash indexes (property → vertex ID list)
map<string, PropertyHashIndex> hash_indexes = 5;
// Range indexes (property → range → bitmap)
map<string, PropertyRangeIndex> range_indexes = 6;
// Inverted indexes (property value → vertex ID list)
// Added in v2
map<string, InvertedIndex> inverted_indexes = 7;
// Edge indexes (label → vertex → targets/sources)
// Added in v3
map<string, EdgeIndex> edge_indexes = 8;
// Bloom filter for fast negative lookups
// Optimized in v5 (reduced false positive rate 1% → 0.1%)
bytes bloom_filter = 9;
}
message PropertyHashIndex {
// property_value → list of vertex offsets
map<string, VertexOffsetList> entries = 1;
}
message PropertyRangeIndex {
repeated RangeBucket buckets = 1;
}
message RangeBucket {
int64 min_value = 1;
int64 max_value = 2;
bytes vertex_bitmap = 3; // Roaring bitmap of vertex offsets (compressed bitmap format: 80:1 compression ratio)
}
message InvertedIndex {
// property_value → list of vertex IDs (full IDs, not offsets)
map<string, VertexIDList> postings = 1;
}
message EdgeIndex {
// vertex_id → list of edge targets
map<string, EdgeTargetList> outgoing = 1;
// vertex_id → list of edge sources
map<string, EdgeSourceList> incoming = 2;
}
Note on Roaring Bitmaps:
Throughout this RFC, bitmaps use Roaring Bitmap compression format for efficient storage and fast operations:
- Compression: 80:1 ratio for clustered IDs (e.g., 800 MB → 10 MB)
- Operations: Intersection, union in O(n) time where n = number of bitmap containers
- Use cases: Range index buckets, inverted edge indexes, partition bloom filters
See Inverted Edge Index section for detailed compression example.
Size Estimate:
Partition: 6.25M vertices, 62.5M edges
Hash Index (user_id):
6.25M entries × 50 bytes = 312 MB
Range Index (age):
100 buckets × 1 MB bitmap = 100 MB
Inverted Index (city):
10,000 cities × 625 vertices avg × 20 bytes = 125 MB
Edge Index:
6.25M vertices × 10 edges avg × 8 bytes = 500 MB
Bloom Filter: 10 MB
Total per partition: ~1 GB
Total per proxy (16 partitions): ~16 GB
Total cluster (1000 proxies): ~16 TB
Index Schema Versioning and Migration
Problem: At 100B scale with 16,000 partitions, upgrading index format without downtime requires schema versioning and graceful migration.
Schema Evolution Strategy:
type IndexMigrator struct {
currentVersion int32
}
func (im *IndexMigrator) LoadPartitionIndex(
partitionID string,
data []byte,
) (*PartitionIndex, error) {
// Deserialize index
index := &PartitionIndex{}
if err := proto.Unmarshal(data, index); err != nil {
return nil, fmt.Errorf("unmarshal partition index %s: %w", partitionID, err)
}
// Check schema version
if index.SchemaVersion == 0 {
// Legacy index (pre-versioning)
index = im.MigrateFromV0(index)
}
// Upgrade if needed
for index.SchemaVersion < im.currentVersion {
switch index.SchemaVersion {
case 1:
index = im.MigrateV1ToV2(index)
case 2:
index = im.MigrateV2ToV3(index)
case 3:
index = im.MigrateV3ToV4(index)
case 4:
index = im.MigrateV4ToV5(index)
default:
return nil, fmt.Errorf("unknown version: %d", index.SchemaVersion)
}
}
return index, nil
}
func (im *IndexMigrator) MigrateV2ToV3(index *PartitionIndex) *PartitionIndex {
// v2 → v3: Add edge indexes
log.Infof("Migrating partition %s from v2 to v3 (adding edge indexes)", index.PartitionId)
// Edge indexes built on first query (lazy initialization)
index.EdgeIndexes = make(map[string]*EdgeIndex)
index.SchemaVersion = 3
return index
}
func (im *IndexMigrator) MigrateV3ToV4(index *PartitionIndex) *PartitionIndex {
// v3 → v4: Compress range index bitmaps with Roaring
log.Infof("Migrating partition %s from v3 to v4 (Roaring compression)", index.PartitionId)
for propName, rangeIndex := range index.RangeIndexes {
for i, bucket := range rangeIndex.Buckets {
// Convert bitmap to Roaring format
roaringBitmap := im.ConvertToRoaring(bucket.VertexBitmap)
index.RangeIndexes[propName].Buckets[i].VertexBitmap = roaringBitmap
}
}
index.SchemaVersion = 4
return index
}
Upgrade Path:
migration_strategy:
# Phase 1: Deploy new proxy version (supports v1-v5 read)
phase_1:
duration: 1 week
action: Rolling deployment of proxies with multi-version reader
risk: None (read-only, backward compatible)
# Phase 2: Background migration (v1-v4 → v5)
phase_2:
duration: 2 weeks
action: Migrate partitions during low-traffic periods
rate_limit: 10 partitions/minute (avoid overload)
progress: Track via metrics (16,000 partitions total)
# Phase 3: Deprecate old formats
phase_3:
duration: 1 month
action: Remove v1-v4 read support from codebase
requirement: 100% of partitions migrated to v5
Migration Metrics:
# Prometheus metrics for migration tracking
prism_index_version{partition_id="07:0042:05"} 5
prism_index_migrations_total{from_version="3",to_version="4"} 1200
prism_index_migration_duration_seconds{from_version="3",to_version="4"} 2.5
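These metrics can be exported from the migrator with the standard Prometheus Go client. A minimal sketch (the metric names mirror the examples above; the RecordMigration helper and registration location are illustrative assumptions):
import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	// Current schema version per partition index.
	indexVersion = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "prism_index_version", Help: "Index schema version per partition"},
		[]string{"partition_id"},
	)
	// Completed migrations by version pair.
	migrationsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "prism_index_migrations_total", Help: "Completed index migrations"},
		[]string{"from_version", "to_version"},
	)
	// Migration duration by version pair.
	migrationDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{Name: "prism_index_migration_duration_seconds", Help: "Index migration duration"},
		[]string{"from_version", "to_version"},
	)
)

func init() {
	prometheus.MustRegister(indexVersion, migrationsTotal, migrationDuration)
}

// RecordMigration is a hypothetical helper the migrator would call after each upgrade step.
func RecordMigration(partitionID string, from, to int32, seconds float64) {
	labels := prometheus.Labels{"from_version": fmt.Sprint(from), "to_version": fmt.Sprint(to)}
	migrationsTotal.With(labels).Inc()
	migrationDuration.With(labels).Observe(seconds)
	indexVersion.WithLabelValues(partitionID).Set(float64(to))
}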
Backward Compatibility Guarantee:
- Proxies ALWAYS support reading at least the previous schema version (e.g., a v5 proxy reads v4 and v5); during a migration window they retain readers for all outstanding versions (v1-v5 in Phase 1)
- Migration happens asynchronously, never blocking queries
- Indexes written in old format continue to work until migrated
- Zero downtime guarantee during schema upgrades
Tier 2: Proxy-Level Indexes (Aggregated)
Location: On each proxy (1000 proxies)
Purpose: Enable cross-partition queries within single proxy
Index Types:
- Partition Directory: Which partitions contain which property values
- Min/Max Statistics: Range bounds per partition
- Cardinality Estimates: How many matches per partition
Example:
Proxy 0042 Index (16 partitions):
Partition Directory (city):
"San Francisco":
- Partition 07:0042:01 (1,250 vertices)
- Partition 07:0042:05 (3,000 vertices)
- Partition 07:0042:12 (800 vertices)
"New York":
- Partition 07:0042:03 (2,500 vertices)
- Partition 07:0042:08 (1,800 vertices)
Min/Max Statistics (age):
Partition 07:0042:01: min=18, max=65
Partition 07:0042:02: min=22, max=58
...
Cardinality Estimates:
city="San Francisco": 5,050 vertices (across 3 partitions)
age>30: 42,000 vertices (across 12 partitions)
Query Optimization:
Query: city = "San Francisco" AND age > 50
Without Proxy Index:
Scan all 16 partitions
16 partition index lookups
With Proxy Index:
1. Check partition directory: Only partitions 1, 5, 12 have San Francisco
2. Check min/max: All 3 partitions have age > 50
3. Scan only 3 partitions
Result: 3 partition lookups instead of 16 (5.3× faster)
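A minimal sketch of this pruning step, assuming a proxy-level index that holds the partition directory and min/max statistics described above (ProxyIndex, PartitionStats, and PrunePartitions are illustrative names, not a defined API):
// PartitionStats holds proxy-level metadata for one partition (illustrative).
type PartitionStats struct {
	PartitionID string
	MinAge      int64
	MaxAge      int64
}

// ProxyIndex aggregates per-partition metadata for the 16 local partitions.
type ProxyIndex struct {
	// property value → partitions known to contain it (partition directory)
	CityDirectory map[string][]string
	// partition → min/max statistics
	Stats map[string]PartitionStats
}

// PrunePartitions returns only the partitions that can possibly satisfy
// city = <city> AND age > <minAge>.
func (pi *ProxyIndex) PrunePartitions(city string, minAge int64) []string {
	candidates := pi.CityDirectory[city] // partition directory lookup
	pruned := make([]string, 0, len(candidates))
	for _, pid := range candidates {
		stats, ok := pi.Stats[pid]
		if !ok || stats.MaxAge > minAge {
			// Keep the partition if we have no stats (conservative) or its
			// age range overlaps the predicate.
			pruned = append(pruned, pid)
		}
	}
	return pruned
}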
Tier 3: Cluster-Level Indexes (Regional)
Location: On each cluster gateway (10 clusters)
Purpose: Enable partition pruning across proxies in cluster
Index Types:
- Proxy Directory: Which proxies contain which property values
- Bloom Filter Cascade: Fast negative lookups across 100 proxies
- Hot Property Cache: Most frequently queried property values
Example:
Cluster 07 Index (100 proxies):
Proxy Directory (city):
"San Francisco":
- Proxies: 5, 12, 18, 23, 34, 42, 51, 67, 78, 89 (10 proxies)
- Estimated vertices: 125,000
"New York":
- Proxies: 3, 8, 15, 22, 29, 37, 44, 56, 63, 71 (10 proxies)
- Estimated vertices: 200,000
Bloom Filter Cascade:
For each proxy: Bloom filter of all property values
Query: "Does proxy 42 have city='San Francisco'?"
Answer: Check bloom filter (10 μs) → "definitely not" or "possibly yes"
Hot Property Cache (top 100 most queried):
city="San Francisco" → [list of proxy IDs]
user_id="taylor_swift" → proxy_45
organization_id="acme-corp" → proxy_23
Query Optimization:
Query: city = "San Francisco"
Without Cluster Index:
Query all 100 proxies in cluster
With Cluster Index:
1. Check proxy directory
2. Identify 10 proxies with San Francisco data
3. Query only those 10 proxies
Result: 10 proxy queries instead of 100 (10× faster)
Tier 4: Global Index (Cross-Cluster)
Location: Global query coordinator
Purpose: Route queries to relevant clusters
Index Types:
- Cluster Directory: Which clusters contain which data
- Data Distribution Stats: Vertex/edge counts per cluster
- Query Routing Hints: Pre-computed routing for common queries
Example:
Global Index (10 clusters):
Cluster Directory (geographic data):
US users: Clusters 0, 1, 2, 3
Europe users: Clusters 4, 5, 6
Asia users: Clusters 7, 8, 9
Cluster Directory (vertex labels):
User vertices: All clusters (uniform distribution)
Post vertices: Clusters 2, 5, 7 (80% of posts)
Organization vertices: Cluster 1 (centralized)
Query Routing Hints:
Query pattern: "FOLLOWS graph in US"
→ Route to clusters 0, 1, 2, 3 only
Query pattern: "Organization dependencies"
→ Route to cluster 1 first, then expand if needed
Query Optimization:
Query: User vertices where country = "Germany"
Without Global Index:
Query all 10 clusters
With Global Index:
1. Check cluster directory
2. German users are in clusters 4, 5
3. Query only clusters 4, 5
Result: 2 cluster queries instead of 10 (5× faster)
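The same pruning idea applies at the top tier; a short sketch assuming a GlobalIndex with the cluster directory described above (type and field names are illustrative):
// GlobalIndex routes queries to the clusters that can contain matching data (illustrative).
type GlobalIndex struct {
	// property value (e.g., country) → clusters holding that value
	CountryDirectory map[string][]string
	// pre-computed routing for common query patterns
	RoutingHints map[string][]string
}

// RouteByCountry returns the clusters to query for a country filter,
// falling back to all clusters when the directory has no entry.
func (gi *GlobalIndex) RouteByCountry(country string, allClusters []string) []string {
	if clusters, ok := gi.CountryDirectory[country]; ok {
		return clusters // e.g., "Germany" → clusters 4, 5
	}
	return allClusters // conservative fallback: fan out everywhere
}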
Online Index Building
Challenge: Build Indexes Without Downtime
At 100B scale, building indexes from scratch serially takes more than a day, even counting only the index-write bandwidth:
Estimate:
100B vertices × 100 bytes per index entry = 10 TB index data
Write speed: 100 MB/s (single-threaded)
Time: 10 TB ÷ 100 MB/s = 100,000 seconds = 27 hours
Solution: Incremental index construction using multi-phase approach.
Phase 1: Partition-Parallel Index Construction
Approach: Build indexes for each partition independently in parallel
Architecture:
1000 proxies × 16 partitions = 16,000 index build tasks
Run 16,000 tasks in parallel
Each task builds index for ~6.25M vertices
Time:
Per-partition hash index build: 6.25M vertices × 10 μs = 62.5 seconds
Full per-partition build (all index types): ~686 seconds (see Index Build Times)
Parallel across 16,000 partitions: wall-clock time ≈ one partition's build time
Result: Complete partition indexes in ~11 minutes
Implementation:
func (pm *PartitionManager) BuildPartitionIndex(partitionID string) error {
partition := pm.GetPartition(partitionID)
// Create index builders
builders := []IndexBuilder{
NewHashIndexBuilder("user_id"),
NewRangeIndexBuilder("age"),
NewInvertedIndexBuilder("city"),
NewEdgeIndexBuilder("FOLLOWS"),
}
// Scan all vertices in partition
scanner := partition.NewVertexScanner()
for scanner.Next() {
vertex := scanner.Vertex()
// Update all index builders
for _, builder := range builders {
builder.Add(vertex)
}
}
// Finalize indexes
index := &PartitionIndex{
PartitionID: partitionID,
VertexCount: scanner.Count(),
LastUpdated: time.Now().Unix(),
}
for _, builder := range builders {
builder.Finalize(index)
}
// Write index to storage
return pm.WritePartitionIndex(partitionID, index)
}
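BuildPartitionIndex above assumes a small builder contract; a sketch of what it could look like (the IndexBuilder interface shape, the Vertex fields, and the HashIndexBuilder internals are assumptions for illustration):
// Vertex is a simplified view of a scanned vertex (illustrative).
type Vertex struct {
	Offset     int64
	Properties map[string]string
}

// IndexBuilder is the contract assumed by BuildPartitionIndex: builders
// accumulate entries during the vertex scan, then write their finished
// structures into the PartitionIndex.
type IndexBuilder interface {
	// Add incorporates one vertex into the in-progress index.
	Add(v *Vertex)
	// Finalize writes the completed index into the partition index message.
	Finalize(idx *PartitionIndex)
}

// HashIndexBuilder is one illustrative implementation for exact-match lookups.
type HashIndexBuilder struct {
	property string
	entries  map[string][]int64 // property value → vertex offsets
}

func NewHashIndexBuilder(property string) *HashIndexBuilder {
	return &HashIndexBuilder{property: property, entries: make(map[string][]int64)}
}

func (b *HashIndexBuilder) Add(v *Vertex) {
	if val, ok := v.Properties[b.property]; ok {
		b.entries[val] = append(b.entries[val], v.Offset)
	}
}

func (b *HashIndexBuilder) Finalize(idx *PartitionIndex) {
	// Convert the accumulated map into the PropertyHashIndex protobuf message
	// and attach it to idx.HashIndexes (conversion elided in this sketch).
}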
Phase 2: Query-Serving with Incomplete Indexes
Problem: Some partitions have indexes, others don't
Solution: Graceful degradation with index availability flags
func (qe *QueryExecutor) ExecuteQuery(query *Query) (*QueryResult, error) {
partitions := qe.IdentifyPartitions(query)
results := []*PartialResult{}
for _, partition := range partitions {
// Check if index exists
if partition.HasIndex(query.IndexedProperty) {
// Fast path: Use index
result := partition.QueryWithIndex(query)
results = append(results, result)
} else {
// Slow path: Full scan
log.Warnf("Partition %s lacks index for %s, using full scan",
partition.ID, query.IndexedProperty)
result := partition.QueryWithFullScan(query)
results = append(results, result)
}
}
return qe.MergeResults(results), nil
}
User Experience:
Query: city = "San Francisco"
Time T0 (no indexes):
Query latency: 50 seconds (parallel full scan of all 100B vertices across partitions)
Time T1 (30% of partitions have indexes):
Query latency: max(indexed path ≈ 15 s, scan path ≈ 35 s) ≈ 35 seconds
Time T2 (70% of partitions have indexes):
Query latency: max(indexed path ≈ 5 s, scan path ≈ 15 s) ≈ 15 seconds
Time T3 (100% of partitions have indexes):
Query latency: 2 seconds (fully indexed)
Phase 3: Incremental Index Updates via WAL
Approach: Update indexes in near-real-time as writes occur
Write Path:
1. Client writes vertex/edge
2. Write appended to distributed WAL (Kafka)
3. WAL consumer on each partition applies write
4. Index updater processes WAL entry
WAL Entry Format:
- operation: CREATE_VERTEX, UPDATE_VERTEX, DELETE_VERTEX, CREATE_EDGE, DELETE_EDGE
- partition_id: Target partition
- vertex_id: Affected vertex
- properties: Changed properties
- timestamp: Write timestamp
Index Update Algorithm:
func (iu *IndexUpdater) ProcessWALEntry(entry *WALEntry) error {
partition := iu.GetPartition(entry.PartitionID)
switch entry.Operation {
case CREATE_VERTEX:
// Add to all applicable indexes
partition.HashIndex["user_id"].Add(entry.Vertex.ID, entry.Vertex.Properties["user_id"])
partition.RangeIndex["age"].Add(entry.Vertex.ID, entry.Vertex.Properties["age"])
partition.InvertedIndex["city"].Add(entry.Vertex.Properties["city"], entry.Vertex.ID)
case UPDATE_VERTEX:
// Remove old values, add new values
oldProps := partition.GetVertexProperties(entry.Vertex.ID)
partition.InvertedIndex["city"].Remove(oldProps["city"], entry.Vertex.ID)
partition.InvertedIndex["city"].Add(entry.Vertex.Properties["city"], entry.Vertex.ID)
case DELETE_VERTEX:
// Remove from all indexes
partition.HashIndex["user_id"].Remove(entry.Vertex.ID)
partition.RangeIndex["age"].Remove(entry.Vertex.ID)
partition.InvertedIndex["city"].RemoveVertex(entry.Vertex.ID)
case CREATE_EDGE:
// Update edge indexes
partition.EdgeIndex["FOLLOWS"].AddOutgoing(entry.Edge.FromVertexID, entry.Edge.ToVertexID)
partition.EdgeIndex["FOLLOWS"].AddIncoming(entry.Edge.ToVertexID, entry.Edge.FromVertexID)
case DELETE_EDGE:
partition.EdgeIndex["FOLLOWS"].RemoveOutgoing(entry.Edge.FromVertexID, entry.Edge.ToVertexID)
partition.EdgeIndex["FOLLOWS"].RemoveIncoming(entry.Edge.ToVertexID, entry.Edge.FromVertexID)
}
return nil
}
Consistency Guarantees:
Consistency Model: Eventual Consistency
Timeline:
T0: Write committed to WAL (durable)
T0 + 50ms: Index update applied on partition
T0 + 100ms: Proxy-level index updated
T0 + 500ms: Cluster-level index updated
T0 + 1s: Global index updated
Query Behavior:
- Queries see index updates within 1 second
- Stale index entries possible for 1 second window
- Applications can specify "read_after_write" consistency for critical queries
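One way to implement read_after_write under this model is to have writes return their per-partition WAL sequence number and have the query path wait until the partition index has applied at least that sequence. A sketch, where GetPartition on the executor, AppliedSequence, and the 10 ms polling interval are assumptions:
import (
	"context"
	"fmt"
	"time"
)

// WaitForIndexSequence blocks until the partition's index has applied the WAL
// entry with the given sequence number, or the context expires. It polls the
// per-partition applied-sequence watermark maintained by the index updater.
func (qe *QueryExecutor) WaitForIndexSequence(ctx context.Context, partitionID string, seq int64) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		// AppliedSequence is the highest WAL sequence number reflected in the index.
		if qe.GetPartition(partitionID).AppliedSequence() >= seq {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("read_after_write timeout: partition %s has not applied seq %d", partitionID, seq)
		case <-ticker.C:
		}
	}
}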
Phase 4: Background Index Optimization
Problem: Incremental updates fragment indexes
Solution: Periodic background compaction
func (io *IndexOptimizer) CompactPartitionIndexes() {
ticker := time.NewTicker(1 * time.Hour)
for range ticker.C {
partitions := io.GetAllPartitions()
for _, partition := range partitions {
// Check if compaction needed
if partition.IndexFragmentation() > 0.2 {
// Rebuild index in background
go io.RebuildPartitionIndex(partition.ID)
}
}
}
}
func (io *IndexOptimizer) RebuildPartitionIndex(partitionID string) {
	partition := io.GetPartition(partitionID)
	oldFragmentation := partition.IndexFragmentation()
	// Build new index (doesn't block queries)
	newIndex := partition.BuildFreshIndex()
	// Atomic swap
	partition.SwapIndex(newIndex)
	log.Infof("Compacted index for partition %s, fragmentation: %.1f%% → %.1f%%",
		partitionID, oldFragmentation*100, partition.IndexFragmentation()*100)
}
Distributed WAL for Index Synchronization
Why Distributed WAL?
At 1000+ nodes, centralized WAL (single Kafka cluster) becomes bottleneck:
Centralized WAL Limits:
- Kafka partition limit: ~1000 partitions
- Write throughput: 100k writes/sec per partition
- Total throughput: 100M writes/sec (hard limit)
Distributed WAL Solution:
- 10 Kafka clusters (one per graph cluster)
- Each Kafka cluster handles 100 proxies
- Total throughput: 1B writes/sec (10× higher)
Distributed WAL Architecture
┌────────────────────────────────────────────────────────────────┐
│ Graph Cluster 0 │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ WAL Cluster 0 (Kafka) │ │
│ │ - 100 partitions (one per proxy) │ │
│ │ - Replication factor: 3 │ │
│ │ - Retention: 7 days │ │
│ │ - Throughput: 100M writes/sec │ │
│ └────────────────┬─────────────────────────────────────────┘ │
│ │ │
│ ┌───────────┼───────────┬─────────────┐ │
│ │ │ │ │ │
│ ┌────▼────┐ ┌───▼──────┐ ┌──▼───────┐ ┌──▼──────────┐ │
│ │ Proxy 0 │ │ Proxy 1 │ │ Proxy 2 │ │ Proxy 99 │ │
│ │ │ │ │ │ │ │ │ │
│ │ WAL │ │ WAL │ │ WAL │ │ WAL │ │
│ │Consumer │ │Consumer │ │Consumer │ │Consumer │ │
│ └─────────┘ └──────────┘ └──────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
WAL Message Format
message WALMessage {
// Message metadata
string message_id = 1; // UUID
int64 timestamp = 2; // Unix timestamp (nanoseconds)
string source_proxy_id = 3; // Originating proxy
string cluster_id = 4; // Graph cluster
// Partition routing
string partition_id = 5; // Target partition
// Operation
WALOperation operation = 6;
// Ordering
int64 sequence_number = 7; // Per-partition sequence
string causality_token = 8; // For causal consistency
}
message WALOperation {
oneof op {
CreateVertexOp create_vertex = 1;
UpdateVertexOp update_vertex = 2;
DeleteVertexOp delete_vertex = 3;
CreateEdgeOp create_edge = 4;
DeleteEdgeOp delete_edge = 5;
}
}
message CreateVertexOp {
string vertex_id = 1;
string label = 2;
map<string, bytes> properties = 3;
repeated string tags = 4; // For authorization (RFC-061)
}
message UpdateVertexOp {
string vertex_id = 1;
map<string, bytes> updated_properties = 2; // Only changed properties
repeated string removed_properties = 3;
}
message DeleteVertexOp {
string vertex_id = 1;
bool cascade_edges = 2; // Delete connected edges
}
message CreateEdgeOp {
string edge_id = 1;
string label = 2;
string from_vertex_id = 3;
string to_vertex_id = 4;
map<string, bytes> properties = 5;
}
message DeleteEdgeOp {
string edge_id = 1;
}
WAL Consumer with Index Updates
func (wc *WALConsumer) ConsumeWAL(partitionID string) {
consumer := wc.KafkaConsumer.Subscribe([]string{wc.Topic}, partitionID)
for {
msg := consumer.Poll(100 * time.Millisecond)
if msg == nil {
continue
}
walMsg := &WALMessage{}
if err := proto.Unmarshal(msg.Value, walMsg); err != nil {
log.Errorf("Failed to unmarshal WAL message: %v", err)
continue
}
// Apply to graph storage
err := wc.ApplyToGraph(walMsg)
if err != nil {
log.Errorf("Failed to apply WAL message: %v", err)
continue
}
// Update indexes
err = wc.UpdateIndexes(walMsg)
if err != nil {
log.Errorf("Failed to update indexes: %v", err)
// Don't fail - index updates are best-effort
}
// Commit offset
consumer.CommitMessage(msg)
}
}
func (wc *WALConsumer) UpdateIndexes(msg *WALMessage) error {
partition := wc.GetPartition(msg.PartitionID)
switch op := msg.Operation.Op.(type) {
case *WALOperation_CreateVertex:
// Update partition-level indexes
partition.UpdateIndexesForVertexCreate(op.CreateVertex)
// Update proxy-level indexes
wc.ProxyIndexManager.UpdateForVertexCreate(msg.PartitionID, op.CreateVertex)
// Async: Update cluster-level indexes
wc.ClusterIndexUpdater.EnqueueUpdate(msg)
case *WALOperation_UpdateVertex:
partition.UpdateIndexesForVertexUpdate(op.UpdateVertex)
wc.ProxyIndexManager.UpdateForVertexUpdate(msg.PartitionID, op.UpdateVertex)
wc.ClusterIndexUpdater.EnqueueUpdate(msg)
// ... other operations
}
return nil
}
Bloom Filter Cascade for Fast Negative Lookups
Problem: False Positives Waste Query Time
Query: user_id = "nonexistent_user_123"
Without Bloom Filters:
1. Query all 1000 proxies
2. Each proxy checks its index
3. All return "not found"
Time: 1000 RPCs × 5ms = 5 seconds (wasted)
With Bloom Filters:
1. Check bloom filter for each proxy (fast)
2. Bloom filter says "definitely not here" for 999 proxies
3. Query only 1 proxy (false positive)
4. Return "not found"
Time: 1000 bloom checks × 10μs + 1 RPC × 5ms = 15ms
Speedup: 333×
Bloom Filter Hierarchy
Level 1: Partition Bloom Filters (16 per proxy)
Size: 1 MB per partition
False positive rate: 0.1%
Build time: 62 seconds per partition
Level 2: Proxy Bloom Filters (1000 total)
Size: 16 MB per proxy (union of 16 partition filters)
False positive rate: 0.1%
Build time: 10 seconds per proxy
Level 3: Cluster Bloom Filters (10 total)
Size: 160 MB per cluster (union of 100 proxy filters)
False positive rate: 0.2%
Build time: 100 seconds per cluster
Level 4: Global Bloom Filter (1 total)
Size: 1.6 GB (union of 10 cluster filters)
False positive rate: 0.5%
Build time: 1000 seconds (16 minutes)
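When all filters at a level are created with identical parameters, the next level up can be built by merging them (otherwise it must be rebuilt from the raw keys). A sketch using a Bloom filter library such as bits-and-blooms/bloom:
import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

// buildProxyFilter unions a proxy's 16 partition filters into one proxy-level
// filter. Merge requires that every filter was created with the same bit count
// and hash count, so the proxy filter copies those parameters.
func buildProxyFilter(partitionFilters []*bloom.BloomFilter) (*bloom.BloomFilter, error) {
	if len(partitionFilters) == 0 {
		return nil, fmt.Errorf("no partition filters to merge")
	}
	proxy := bloom.New(partitionFilters[0].Cap(), partitionFilters[0].K())
	for _, pf := range partitionFilters {
		if err := proxy.Merge(pf); err != nil {
			return nil, err // parameter mismatch: rebuild from raw keys instead
		}
	}
	return proxy, nil
}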
Bloom Filter Implementation
type BloomFilterCascade struct {
partitionFilters map[string]*bloom.BloomFilter // 16,000 filters
proxyFilters map[string]*bloom.BloomFilter // 1,000 filters
clusterFilters map[string]*bloom.BloomFilter // 10 filters
globalFilter *bloom.BloomFilter // 1 filter
}
func (bfc *BloomFilterCascade) MayContain(vertexID string) *QueryHints {
hints := &QueryHints{}
// Step 1: Check the global filter (coarsest level)
if !bfc.globalFilter.Contains(vertexID) {
hints.DefinitelyNotExists = true
return hints
}
// Step 2: Check cluster filters
candidateClusters := []string{}
for clusterID, filter := range bfc.clusterFilters {
if filter.Contains(vertexID) {
candidateClusters = append(candidateClusters, clusterID)
}
}
if len(candidateClusters) == 0 {
hints.DefinitelyNotExists = true
return hints
}
// Step 3: Check proxy filters (within candidate clusters)
candidateProxies := []string{}
for _, clusterID := range candidateClusters {
for proxyID, filter := range bfc.GetProxyFiltersForCluster(clusterID) {
if filter.Contains(vertexID) {
candidateProxies = append(candidateProxies, proxyID)
}
}
}
hints.CandidateClusters = candidateClusters
hints.CandidateProxies = candidateProxies
return hints
}
Inverted Edge Indexes for Backward Traversals
Problem: Backward Traversals Are Expensive
Example: "Who follows @taylor_swift?"
Forward Edge (default storage):
user:alice → FOLLOWS → user:taylor_swift
Backward Traversal (without index):
Scan ALL vertices, check if they have FOLLOWS edge to taylor_swift
Time: 100B vertices scan = 27 hours
Backward Traversal (with inverted edge index):
Lookup incoming_edges[user:taylor_swift][FOLLOWS] → bitmap of followers
Time: Bitmap scan = 100ms
Speedup: 972,000×
Inverted Edge Index Structure
Forward Edge Index (default):
source_vertex → edge_label → list of target vertices
Example:
user:alice → FOLLOWS → [user:bob, user:charlie, user:taylor_swift]
Inverted Edge Index (for backward traversals):
target_vertex → edge_label → list of source vertices
Example:
user:taylor_swift → FOLLOWS → [user:alice, user:bob, user:charlie, ...]
(100M followers)
Storage Format:
message InvertedEdgeIndex {
string partition_id = 1;
// target_vertex → edge_label → bitmap of source vertices
map<string, EdgeLabelBitmaps> inverted_edges = 2;
}
message EdgeLabelBitmaps {
// edge_label → Roaring bitmap of source vertex offsets (resolved to vertex IDs at query time)
map<string, bytes> label_bitmaps = 1;
}
Compression: Use Roaring Bitmaps
Uncompressed:
100M follower IDs × 8 bytes = 800 MB
Roaring Bitmap Compressed:
100M follower IDs (clustered) → ~10 MB
Compression ratio: 80:1
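A small illustration of why clustered offsets compress so well, using the RoaringBitmap Go library; the perfectly contiguous range here is the best case, and real follower sets land somewhere between this and the uncompressed 800 MB:
import (
	"github.com/RoaringBitmap/roaring"
)

// clusteredBitmapSize shows the serialized size of 100M contiguous offsets.
// Contiguous runs collapse into run-length containers after RunOptimize.
func clusteredBitmapSize() uint64 {
	bm := roaring.New()
	bm.AddRange(0, 100_000_000) // 100M clustered vertex offsets
	bm.RunOptimize()            // convert dense containers to run containers
	return bm.GetSerializedSizeInBytes()
}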
Fan-In Query Optimization
Query: "Users who follow BOTH @taylor_swift AND @beyonce"
Without Inverted Index:
1. Find all users (100B vertices)
2. For each user, check if they follow both
Time: 100B × 2 edge checks = Days
With Inverted Index:
1. Get followers of taylor_swift: bitmap_1 (100M bits)
2. Get followers of beyonce: bitmap_2 (50M bits)
3. Compute intersection: bitmap_1 AND bitmap_2
4. Result: 5M mutual followers
Time: Bitmap AND operation = 50ms
Speedup: >1,000,000×
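The intersection itself is a single bitmap operation; a sketch using the RoaringBitmap Go library (bitmap contents are vertex offsets, as in the inverted edge index):
import (
	"github.com/RoaringBitmap/roaring"
)

// mutualFollowers intersects the inverted-edge bitmaps of two target vertices,
// yielding the set of source-vertex offsets that follow both.
func mutualFollowers(followersA, followersB *roaring.Bitmap) *roaring.Bitmap {
	return roaring.And(followersA, followersB) // allocates a new bitmap
}

// mutualFollowerCount returns only the cardinality of the intersection.
func mutualFollowerCount(followersA, followersB *roaring.Bitmap) uint64 {
	return roaring.And(followersA, followersB).GetCardinality()
}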
Implementation
func (pm *PartitionManager) GetIncomingEdges(vertexID string, edgeLabel string) ([]string, error) {
partition := pm.GetPartitionForVertex(vertexID)
// Check if inverted index exists
invertedIndex := partition.GetInvertedEdgeIndex()
if invertedIndex == nil {
return nil, errors.New("inverted edge index not available")
}
// Lookup bitmap of source vertex offsets (absent if the vertex has no recorded incoming edges)
entry, ok := invertedIndex.InvertedEdges[vertexID]
if !ok {
return nil, nil
}
bitmapData := entry.LabelBitmaps[edgeLabel]
bitmap := roaring.New()
if err := bitmap.UnmarshalBinary(bitmapData); err != nil {
return nil, err
}
// Convert bitmap to vertex ID list
sources := []string{}
for _, vertexOffset := range bitmap.ToArray() {
vertexID := partition.OffsetToVertexID(vertexOffset)
sources = append(sources, vertexID)
}
return sources, nil
}
Index Storage and Persistence
Index Storage Locations
| Index Tier | Storage Location | Durability | Access Latency |
|---|---|---|---|
| Partition Index | In-memory (proxy RAM) + S3 backup | Durable via S3 | 1 μs (memory), 100 ms (S3) |
| Proxy Index | In-memory (proxy RAM) | Ephemeral (rebuilt on restart) | 1 μs |
| Cluster Index | In-memory (cluster gateway) | Ephemeral | 1 μs |
| Global Index | In-memory (query coordinator) | Ephemeral | 1 μs |
Index Persistence Strategy
Partition Indexes (durable):
Location: S3 bucket per cluster
Path: s3://prism-indexes-cluster-07/partition-0042-05/index-{version}.pb.gz
Update frequency: Every 5 minutes
Format: Protobuf + gzip compression
Proxy Indexes (ephemeral):
Rebuilt from partition indexes on proxy restart
Build time: 10 seconds per proxy
Cluster Indexes (ephemeral):
Rebuilt from proxy indexes on cluster gateway restart
Build time: 100 seconds per cluster
Global Index (ephemeral):
Rebuilt from cluster indexes on coordinator restart
Build time: 1000 seconds (16 minutes)
Index Snapshot Format
S3 Path:
s3://prism-indexes-cluster-{cluster_id}/
partition-{proxy_id}-{partition_id}/
index-{version}.pb.gz
bloom-filter-{version}.bin
inverted-edges-{version}.roaring
Example:
s3://prism-indexes-cluster-07/
partition-0042-05/
index-v20251115.pb.gz (1 GB compressed)
bloom-filter-v20251115.bin (10 MB)
inverted-edges-v20251115.roaring (500 MB compressed)
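A sketch of writing a snapshot in the protobuf + gzip format and path layout shown above; the object-store client is abstracted behind a minimal uploader interface (an assumption) rather than a specific SDK:
import (
	"bytes"
	"compress/gzip"
	"fmt"

	"google.golang.org/protobuf/proto"
)

// snapshotUploader abstracts the object store (e.g., an S3 client); assumed interface.
type snapshotUploader interface {
	Put(key string, body []byte) error
}

// WriteIndexSnapshot serializes a partition index, gzips it, and uploads it to
// prism-indexes-cluster-{cluster}/partition-{proxy}-{partition}/index-{version}.pb.gz.
func WriteIndexSnapshot(up snapshotUploader, clusterID, proxyID, partitionID, version string, idx *PartitionIndex) error {
	raw, err := proto.Marshal(idx)
	if err != nil {
		return fmt.Errorf("marshal index: %w", err)
	}
	var buf bytes.Buffer
	gz := gzip.NewWriter(&buf)
	if _, err := gz.Write(raw); err != nil {
		return fmt.Errorf("gzip index: %w", err)
	}
	if err := gz.Close(); err != nil {
		return fmt.Errorf("finish gzip: %w", err)
	}
	key := fmt.Sprintf("prism-indexes-cluster-%s/partition-%s-%s/index-%s.pb.gz",
		clusterID, proxyID, partitionID, version)
	return up.Put(key, buf.Bytes())
}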
Performance Characteristics
Index Build Times
| Index Type | Vertices | Build Time (Single-threaded) | Build Time (Parallel) |
|---|---|---|---|
| Partition Hash Index | 6.25M | 62 seconds | 62 seconds |
| Partition Range Index | 6.25M | 125 seconds | 125 seconds |
| Partition Inverted Index | 6.25M | 187 seconds | 187 seconds |
| Partition Edge Index | 62.5M edges | 312 seconds | 312 seconds |
| Full Partition Index | 6.25M vertices | 686 seconds (11 min) | 686 seconds |
| All Partitions (16,000) | 100B vertices | 127 days (serial) | 11 minutes (parallel) |
Query Speedups with Indexes
| Query Pattern | Without Index | With Index | Speedup |
|---|---|---|---|
| Exact vertex lookup | 27 hours (full scan) | 50 μs (hash index) | 1,944,000,000× |
| Property filter (city="SF") | 27 hours | 2 seconds | 48,600× |
| Range query (age>30) | 27 hours | 5 seconds | 19,440× |
| Backward traversal (followers) | 27 hours | 100 ms | 972,000× |
| Fan-in intersection | Weeks | 50 ms | >10,000,000× |
Index Space Overhead
Graph Data:
100B vertices × 100 bytes = 10 TB
10T edges × 20 bytes = 200 TB
Total: 210 TB
Index Data:
Partition indexes: 16 TB
Inverted edge indexes: 8 TB
Bloom filters: 1.6 GB
Total: 24 TB
Overhead: 24 TB ÷ 210 TB = 11.4%
Index Tiering Strategy
Problem: At 100B scale, index memory requirements exceed available capacity (MEMO-050 Finding 5).
Memory Capacity Reconciliation
Original Memory Budget (RFC-057):
Total memory: 30 TB (1000 proxies × 30 GB each)
Hot data allocation: 21 TB (10% of 210 TB graph)
Index allocation: 16 TB (partition indexes)
Total needed: 21 TB + 16 TB = 37 TB ❌
Available: 30 TB
Shortfall: 7 TB (23% over budget)
Root Cause: All indexes assumed to be in memory simultaneously. At massive scale, not all indexes are equally valuable—most queries touch a small fraction of partitions.
Index Temperature Classification
Similar to data tiers (RFC-059), indexes should be classified by access frequency:
| Temperature | Access Pattern | Storage | Size | Example |
|---|---|---|---|---|
| Hot | >1000 req/min | Memory (RAM) | 4.8 TB | Active user indexes, trending topics |
| Warm | 10-1000 req/min | Memory (evictable) | 6.4 TB | Recent activity indexes |
| Cold | <10 req/min | S3 (on-demand load) | 12.8 TB | Archive indexes, dormant users |
Power-Law Distribution: At scale, index access follows Zipf distribution:
- Top 20% of indexes handle 80% of queries (hot)
- Next 30% handle 15% of queries (warm)
- Bottom 50% handle 5% of queries (cold)
Revised Memory Budget
With Index Tiering:
Hot Data (RFC-057):
Graph data: 21 TB (10% of 210 TB)
Hot Indexes (30% of total indexes in memory):
Partition indexes: 4.8 TB (30% of 16 TB)
Inverted edge indexes: 2.4 TB (30% of 8 TB)
Bloom filters: 1.6 GB (all in memory - cheap)
Total hot indexes: 7.2 TB
Total memory used: 21 TB + 7.2 TB = 28.2 TB ✅
Available: 30 TB
Headroom: 1.8 TB (6% buffer for spikes)
Cost Savings: Storing 70% of indexes on S3 instead of RAM:
- Cold indexes: 16.8 TB × $23/TB/month (S3) = $386/month
- vs in-memory: 16.8 TB × $500/TB/month (RAM) = $8,400/month
- Savings: $8,014/month = $96k/year per 16.8 TB
Index Promotion/Demotion Logic
Promotion Criteria (Cold → Warm → Hot):
type IndexTemperature struct {
PartitionID string
RequestsPerMinute float64
LastAccessTime time.Time
LoadTimeP99 time.Duration
CurrentState TemperatureState
}
func (im *IndexManager) EvaluateTemperature(index *IndexTemperature) TemperatureState {
rpm := index.RequestsPerMinute
// Promotion thresholds
if rpm > 1000 && index.CurrentState != Hot {
return Hot // Immediate promotion to hot
}
if rpm > 10 && index.CurrentState == Cold {
return Warm // Cold → Warm
}
// Demotion thresholds (with hysteresis to prevent thrashing)
if rpm < 8 && index.CurrentState == Warm { // 20% hysteresis below 10
if time.Since(index.LastAccessTime) > 10*time.Minute {
return Cold
}
}
if rpm < 800 && index.CurrentState == Hot { // 20% hysteresis below 1000
if time.Since(index.LastAccessTime) > 5*time.Minute {
return Warm
}
}
return index.CurrentState // No state change
}
Background Process:
func (im *IndexManager) TemperatureMonitor() {
ticker := time.NewTicker(60 * time.Second)
for range ticker.C {
for _, index := range im.GetAllIndexes() {
currentTemp := index.CurrentState
evaluatedTemp := im.EvaluateTemperature(index)
if evaluatedTemp != currentTemp {
switch evaluatedTemp {
case Hot:
im.LoadIndexToMemory(index.PartitionID, PriorityHigh)
case Warm:
im.LoadIndexToMemory(index.PartitionID, PriorityMedium)
case Cold:
im.OffloadIndexToS3(index.PartitionID)
}
// Record the new state so hysteresis applies on the next evaluation
index.CurrentState = evaluatedTemp
}
}
}
}
Performance Trade-offs
| Operation | Hot Index | Warm Index | Cold Index |
|---|---|---|---|
| Index Lookup | 50 μs | 50 μs (if cached) | 100 ms (S3 load) |
| First Query After Load | 50 μs | 2 ms (partial load) | 5 s (full S3 download) |
| Memory Footprint | 7.2 TB | +6.4 TB (evictable) | 0 TB (S3) |
| Cost per TB/month | $500 (RAM) | $500 (RAM) | $23 (S3) |
Query Impact:
- Hot partition query: Zero impact (index already in memory)
- Warm partition query: ~2 ms first query, then cached
- Cold partition query: ~5 s first query (rare: <5% of queries)
Acceptable Trade-off: 95% of queries hit hot/warm indexes (<2 ms overhead), 5% pay S3 load cost (~5s). At 1B queries/sec, only 50M queries/sec hit cold indexes.
Integration with RFC-059 Data Tiers
Coordinated Temperature Management:
partition_temperature_policy:
# Data and indexes move together
hot_partition:
data: memory (RFC-059)
indexes: memory (this RFC)
warm_partition:
data: partial_memory (RFC-059)
indexes: memory_evictable (this RFC)
cold_partition:
data: s3 (RFC-059)
indexes: s3 (this RFC)
# Co-location optimization
co_locate_index_with_data: true # Load index when data loads
# Preemptive loading
prefetch_on_access_pattern: true # Load index if query pattern suggests next access
Example Workflow:
- Cold partition receives query → Load data from S3 (~5s) + Load index from S3 (~5s in parallel) = ~5s total
- Partition promoted to Warm → Keep data + index in memory
- Continued high access → Promoted to Hot → Locked in memory (won't be evicted)
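A sketch of the co-located load for a cold partition, fetching data (RFC-059) and index (this RFC) from S3 in parallel so the first query pays roughly one S3 round trip; the loader fields and method names are assumptions:
import (
	"context"

	"golang.org/x/sync/errgroup"
)

// LoadColdPartition fetches the partition's graph data and its index from S3
// concurrently, so total latency ≈ max(data load, index load) ≈ 5 s.
func (im *IndexManager) LoadColdPartition(ctx context.Context, partitionID string) error {
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		// Data tier load (RFC-059); hypothetical loader.
		return im.DataLoader.LoadPartitionFromS3(ctx, partitionID)
	})
	g.Go(func() error {
		// Index load (this RFC); hypothetical loader.
		return im.IndexLoader.LoadIndexFromS3(ctx, partitionID)
	})
	return g.Wait()
}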
Alternative Approaches Considered
Option 1: Partial Indexes (Rejected)
- Approach: Keep only top-K entries per index in memory
- Rejected Why: Incomplete indexes break query correctness; can't differentiate "not found" vs "not indexed"
Option 2: Index Compression (Evaluated)
- Approach: Compress cold indexes 10× using zstd
- Trade-off: 10× compression → ~10× slower decompression (30 ms vs 3 ms per index load)
- Decision: Use compression for S3 storage, but decompress fully on load to memory
Option 3: No Indexes for Cold Data (Rejected)
- Approach: Cold partition queries use full scan instead of indexes
- Rejected Why: a cold-partition scan (1.56M vertices × 100 bytes = 156 MB at 1 GB/s ≈ 156 ms) is paid on every query and cannot use partition pruning, whereas the one-time ~5 s S3 index load is amortized across subsequent sub-millisecond lookups
Summary
Index tiering solves the 37 TB → 30 TB memory capacity problem by applying temperature-based management to indexes, mirroring data tier strategy (RFC-059). Key results:
- Memory reconciliation: 28.2 TB used (vs 37 TB needed without tiering)
- Headroom: 1.8 TB buffer (6%) for traffic spikes
- Cost savings: $96k/year for every 16.8 TB moved to S3
- Query impact: 95% of queries unaffected (<2 ms overhead)
- Scalability: Enables 100B scale within 30 TB memory budget
Related RFCs
- RFC-057: Massive-Scale Graph Sharding - Distributed sharding architecture
- RFC-059: Hot/Cold Storage Tiers with S3 - Storage tier management
- RFC-060: Distributed Gremlin Query Execution - Query optimization
- RFC-061: Graph Authorization with Vertex Labels - Authorization model
- RFC-055: Graph Pattern - Base graph pattern
- RFC-051: Write-Ahead Log Pattern - WAL architecture
Open Questions
- Index Consistency: Strong vs eventual consistency for index updates?
- Partial Index Queries: How to handle queries when indexes incomplete?
- Index Versioning: How to upgrade index format without rebuild?
- Cross-Partition Joins: How to optimize joins across multiple partitions?
- Index Compression Trade-offs: More compression vs faster access?
References
- Roaring Bitmaps - Compressed bitmap library
- Bloom Filters in Practice
- Facebook TAO: Distributed Data Store for Social Graph
- Google Caffeine: Incremental Index Updates
- Apache Lucene Inverted Index
Revision History
- 2025-11-15: Initial draft - Multi-level indexing with online building and distributed WAL