
RFC-058: Multi-Level Graph Indexing for Massive-Scale Queries

Status: Draft Author: Platform Team Created: 2025-11-15 Updated: 2025-11-15

Abstract

This RFC defines a multi-level indexing strategy for massive-scale graph queries (100B vertices, 10T edges). At this scale, unindexed graph traversals become impractical: a property-filter scan across 100B vertices takes hours at realistic aggregate scan rates. Multi-level indexing enables sub-second queries by providing: (1) Partition-level indexes for local property lookups, (2) Cluster-level indexes for global property scans, (3) Edge indexes for fast traversal initiation, and (4) Label indexes for type-based filtering. Critically, all indexes must be built and maintained online while serving queries, using incremental index construction and distributed Write-Ahead Log (WAL) synchronization.

Key Innovations:

  • Four-Tier Index Hierarchy: Partition → Proxy → Cluster → Global
  • Online Index Building: Construct indexes without blocking queries
  • Incremental Updates: Update indexes via distributed WAL without full rebuilds
  • Bloom Filter Cascade: Fast negative lookups across 1000+ partitions
  • Inverted Edge Indexes: Accelerate backward traversals and fan-in queries

Motivation

The Index-Free Adjacency Myth

Graph databases often claim "index-free adjacency" as a selling point—edges are pointers, so traversals are O(1). While true for individual edge traversals, this breaks down at massive scale for traversal initiation and property-filtered traversals.

Problem 1: Traversal Initiation Without Indexes

Query: "Find all users where city = 'San Francisco'"

Without Index:
Scan all 100B vertices
Check city property for each
Time: 100B vertices ÷ 1M vertices/sec scan rate = 100,000 seconds (27 hours)

With Index:
Lookup city_index["San Francisco"] → 5M vertex IDs
Time: 5M vertices ÷ 1M vertices/sec scan rate = 5 seconds
Speedup: 20,000×

Problem 2: Property-Filtered Traversals

Query: "Friends of Alice who live in San Francisco"

Without Index:
1. Traverse alice.out(FRIENDS) → 1000 friends
2. For each friend, check if city == "San Francisco"
3. Load properties for 1000 friends (may require S3 fetches)
Time: 1000 property lookups × 50ms avg = 50 seconds

With Index:
1. Traverse alice.out(FRIENDS) → 1000 friend IDs
2. Intersect with city_index["San Francisco"]
3. Return intersection
Time: 1000 IDs × 10μs = 10ms
Speedup: 5,000×

Problem 3: Backward Traversals (Fan-In)

Query: "Who follows celebrity user 'taylor_swift'?"

Without Edge Index:
Scan all FOLLOWS edges (10T edges)
Check if target == taylor_swift
Time: Hours (even with parallelization)

With Inverted Edge Index:
Lookup in_edges[taylor_swift][FOLLOWS] → 100M followers
Time: 100ms (bitmap scan)
Speedup: 36,000×

Why Multi-Level Indexing?

At 100B scale with 1000+ nodes, single-level indexes are insufficient:

Problem: Global index lookup hits all nodes

Query: city = "San Francisco"

Single-Level Approach:
Query coordinator → Broadcast query to 1000 proxies
Each proxy scans local index
Return results to coordinator
Time: 1000 RPCs × ~5 ms coordinator-side overhead each (fan-out, serialization, result merge) = 5+ seconds

Multi-Level Approach:
Query coordinator → Check cluster-level index
Cluster index says: "San Francisco users on clusters 2, 5, 7"
Query only 3 clusters (300 proxies) instead of all 1000
Time: 300 RPCs × ~5 ms = 1.5 seconds
Speedup: 3.3×

Insight: Multi-level indexes enable partition pruning to avoid querying irrelevant nodes.

Goals

  1. Online Index Construction: Build indexes without blocking query traffic
  2. Incremental Updates: Update indexes via WAL without full rebuilds
  3. Multi-Level Hierarchy: Partition → Proxy → Cluster → Global indexes
  4. Fast Lookups: Sub-millisecond index lookups for common queries
  5. Space Efficiency: Indexes <20% of graph data size
  6. Partition Pruning: Skip irrelevant partitions using cluster-level indexes
  7. Consistency: WAL-synchronized index updates across 1000+ nodes

Non-Goals

  • Full-Text Search: Use Elasticsearch/Solr for text search, not graph indexes
  • Geospatial Indexes: Use PostGIS/S2 for spatial queries
  • Machine Learning Embeddings: Use vector databases (Pinecone, Weaviate)
  • Real-Time Streaming: Index updates are near-real-time (seconds), not microseconds

Four-Tier Index Hierarchy

Tier 1: Partition-Level Indexes (Local)

Location: On each partition (16 partitions per proxy, 1000 proxies)

Purpose: Enable fast property lookups within single partition

Index Types:

  1. Hash Index: Exact property match (e.g., user_id = "alice")
  2. Range Index: Numeric/timestamp range queries (e.g., age > 25)
  3. Inverted Index: Set membership (e.g., tags contains "vip")
  4. Edge Index: Outgoing/incoming edge lists per vertex

Example:

Partition 07:0042:05 Indexes:

1. Hash Index (user_id):
"alice" → vertex_offset_12345
"bob" → vertex_offset_67890

2. Range Index (age):
[18-24] → bitmap of vertex offsets
[25-34] → bitmap of vertex offsets
[35-44] → bitmap of vertex offsets

3. Inverted Index (city):
"San Francisco" → [vertex_offset_123, vertex_offset_456, ...]
"New York" → [vertex_offset_789, ...]

4. Edge Index (FOLLOWS):
Outgoing: vertex_12345 → [target_1, target_2, ...]
Incoming: vertex_12345 → [source_1, source_2, ...]

Storage Format:

message PartitionIndex {
  string partition_id = 1;   // "07:0042:05"
  int64 vertex_count = 2;
  int64 edge_count = 3;
  int64 last_updated = 4;    // Unix timestamp

  // Schema version for index format evolution
  // Version history:
  //   v1: Initial release (hash + range indexes)
  //   v2: Added inverted indexes (2025-01)
  //   v3: Added edge indexes (2025-03)
  //   v4: Roaring bitmap compression (2025-06)
  //   v5: Bloom filter optimization (2025-09)
  int32 schema_version = 10;  // Current: 5

  // Hash indexes (property → vertex ID list)
  map<string, PropertyHashIndex> hash_indexes = 5;

  // Range indexes (property → range → bitmap)
  map<string, PropertyRangeIndex> range_indexes = 6;

  // Inverted indexes (property value → vertex ID list)
  // Added in v2
  map<string, InvertedIndex> inverted_indexes = 7;

  // Edge indexes (label → vertex → targets/sources)
  // Added in v3
  map<string, EdgeIndex> edge_indexes = 8;

  // Bloom filter for fast negative lookups
  // Optimized in v5 (reduced false positive rate 1% → 0.1%)
  bytes bloom_filter = 9;
}

message PropertyHashIndex {
  // property_value → list of vertex offsets
  map<string, VertexOffsetList> entries = 1;
}

message PropertyRangeIndex {
  repeated RangeBucket buckets = 1;
}

message RangeBucket {
  int64 min_value = 1;
  int64 max_value = 2;
  bytes vertex_bitmap = 3;  // Roaring bitmap of vertex offsets (~80:1 compression for clustered offsets)
}

message InvertedIndex {
  // property_value → list of vertex IDs (full IDs, not offsets)
  map<string, VertexIDList> postings = 1;
}

message EdgeIndex {
  // vertex_id → list of edge targets
  map<string, EdgeTargetList> outgoing = 1;
  // vertex_id → list of edge sources
  map<string, EdgeSourceList> incoming = 2;
}

Note on Roaring Bitmaps:

Throughout this RFC, bitmaps use Roaring Bitmap compression format for efficient storage and fast operations:

  • Compression: 80:1 ratio for clustered IDs (e.g., 800 MB → 10 MB)
  • Operations: Intersection, union in O(n) time where n = number of bitmap containers
  • Use cases: Range index buckets, inverted edge indexes, partition bloom filters

See Inverted Edge Index section for detailed compression example.

Size Estimate:

Partition: 6.25M vertices, 62.5M edges

Hash Index (user_id):
6.25M entries × 50 bytes = 312 MB

Range Index (age):
100 buckets × 1 MB bitmap = 100 MB

Inverted Index (city):
10,000 cities × 625 vertices avg × 20 bytes = 125 MB

Edge Index:
6.25M vertices × 10 edges avg × 8 bytes = 500 MB

Bloom Filter: 10 MB

Total per partition: ~1 GB
Total per proxy (16 partitions): ~16 GB
Total cluster (1000 proxies): ~16 TB

Index Schema Versioning and Migration

Problem: At 100B scale with 16,000 partitions, upgrading index format without downtime requires schema versioning and graceful migration.

Schema Evolution Strategy:

type IndexMigrator struct {
    currentVersion int32
}

func (im *IndexMigrator) LoadPartitionIndex(
    partitionID string,
    data []byte,
) (*PartitionIndex, error) {
    // Deserialize index
    index := &PartitionIndex{}
    if err := proto.Unmarshal(data, index); err != nil {
        return nil, fmt.Errorf("unmarshal index for partition %s: %w", partitionID, err)
    }

    // Check schema version
    if index.SchemaVersion == 0 {
        // Legacy index (pre-versioning)
        index = im.MigrateFromV0(index)
    }

    // Upgrade if needed
    for index.SchemaVersion < im.currentVersion {
        switch index.SchemaVersion {
        case 1:
            index = im.MigrateV1ToV2(index)
        case 2:
            index = im.MigrateV2ToV3(index)
        case 3:
            index = im.MigrateV3ToV4(index)
        case 4:
            index = im.MigrateV4ToV5(index)
        default:
            return nil, fmt.Errorf("unknown version: %d", index.SchemaVersion)
        }
    }

    return index, nil
}

func (im *IndexMigrator) MigrateV2ToV3(index *PartitionIndex) *PartitionIndex {
    // v2 → v3: Add edge indexes
    log.Infof("Migrating partition %s from v2 to v3 (adding edge indexes)", index.PartitionId)

    // Edge indexes built on first query (lazy initialization)
    index.EdgeIndexes = make(map[string]*EdgeIndex)
    index.SchemaVersion = 3

    return index
}

func (im *IndexMigrator) MigrateV3ToV4(index *PartitionIndex) *PartitionIndex {
    // v3 → v4: Compress range index bitmaps with Roaring
    log.Infof("Migrating partition %s from v3 to v4 (Roaring compression)", index.PartitionId)

    for propName, rangeIndex := range index.RangeIndexes {
        for i, bucket := range rangeIndex.Buckets {
            // Convert bitmap to Roaring format
            roaringBitmap := im.ConvertToRoaring(bucket.VertexBitmap)
            index.RangeIndexes[propName].Buckets[i].VertexBitmap = roaringBitmap
        }
    }

    index.SchemaVersion = 4

    return index
}

Upgrade Path:

migration_strategy:
  # Phase 1: Deploy new proxy version (supports v1-v5 read)
  phase_1:
    duration: 1 week
    action: Rolling deployment of proxies with multi-version reader
    risk: None (read-only, backward compatible)

  # Phase 2: Background migration (v1-v4 → v5)
  phase_2:
    duration: 2 weeks
    action: Migrate partitions during low-traffic periods
    rate_limit: 10 partitions/minute (avoid overload)
    progress: Track via metrics (16,000 partitions total)

  # Phase 3: Deprecate old formats
  phase_3:
    duration: 1 month
    action: Remove v1-v4 read support from codebase
    requirement: 100% of partitions migrated to v5

Migration Metrics:

# Prometheus metrics for migration tracking
prism_index_version{partition_id="07:0042:05"} 5
prism_index_migrations_total{from_version="3",to_version="4"} 1200
prism_index_migration_duration_seconds{from_version="3",to_version="4"} 2.5

Backward Compatibility Guarantee:

  • Proxies ALWAYS support reading the current and previous schema versions (e.g., a v5 proxy reads v4 and v5), and retain readers for all older versions until migration to the current format completes
  • Migration happens asynchronously, never blocking queries
  • Indexes written in old format continue to work until migrated
  • Zero downtime guarantee during schema upgrades

Tier 2: Proxy-Level Indexes (Aggregated)

Location: On each proxy (1000 proxies)

Purpose: Enable cross-partition queries within single proxy

Index Types:

  1. Partition Directory: Which partitions contain which property values
  2. Min/Max Statistics: Range bounds per partition
  3. Cardinality Estimates: How many matches per partition

Example:

Proxy 0042 Index (16 partitions):

Partition Directory (city):
"San Francisco":
- Partition 07:0042:01 (1,250 vertices)
- Partition 07:0042:05 (3,000 vertices)
- Partition 07:0042:12 (800 vertices)
"New York":
- Partition 07:0042:03 (2,500 vertices)
- Partition 07:0042:08 (1,800 vertices)

Min/Max Statistics (age):
Partition 07:0042:01: min=18, max=65
Partition 07:0042:02: min=22, max=58
...

Cardinality Estimates:
city="San Francisco": 5,050 vertices (across 3 partitions)
age>30: 42,000 vertices (across 12 partitions)

Query Optimization:

Query: city = "San Francisco" AND age > 50

Without Proxy Index:
Scan all 16 partitions
16 partition index lookups

With Proxy Index:
1. Check partition directory: Only partitions 1, 5, 12 have San Francisco
2. Check min/max: All 3 partitions have age > 50
3. Scan only 3 partitions
Result: 3 partition lookups instead of 16 (5.3× faster)
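
A minimal sketch of this pruning step, assuming hypothetical ProxyIndex and PartitionStats types that hold the partition directory and per-partition min/max statistics described above (names and fields are illustrative, not part of the wire format):

// PartitionStats holds proxy-level metadata kept per partition.
type PartitionStats struct {
    MinAge, MaxAge int64
}

// ProxyIndex aggregates the 16 partition-level indexes on one proxy.
type ProxyIndex struct {
    cityDirectory map[string][]string       // city value → partition IDs containing it
    stats         map[string]PartitionStats // partition ID → min/max statistics
}

// PrunePartitions returns only the partitions that can possibly match
// city == <city> AND age > <minAge>, so the executor skips the rest.
func (pi *ProxyIndex) PrunePartitions(city string, minAge int64) []string {
    candidates := pi.cityDirectory[city] // partition directory lookup
    pruned := make([]string, 0, len(candidates))
    for _, partitionID := range candidates {
        st, ok := pi.stats[partitionID]
        if !ok || st.MaxAge > minAge { // min/max check: keep only overlapping ranges
            pruned = append(pruned, partitionID)
        }
    }
    return pruned
}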

Tier 3: Cluster-Level Indexes (Regional)

Location: On each cluster gateway (10 clusters)

Purpose: Enable partition pruning across proxies in cluster

Index Types:

  1. Proxy Directory: Which proxies contain which property values
  2. Bloom Filter Cascade: Fast negative lookups across 100 proxies
  3. Hot Property Cache: Most frequently queried property values

Example:

Cluster 07 Index (100 proxies):

Proxy Directory (city):
"San Francisco":
- Proxies: 5, 12, 18, 23, 34, 42, 51, 67, 78, 89 (10 proxies)
- Estimated vertices: 125,000
"New York":
- Proxies: 3, 8, 15, 22, 29, 37, 44, 56, 63, 71 (10 proxies)
- Estimated vertices: 200,000

Bloom Filter Cascade:
For each proxy: Bloom filter of all property values
Query: "Does proxy 42 have city='San Francisco'?"
Answer: Check bloom filter (10 μs) → Yes/No

Hot Property Cache (top 100 most queried):
city="San Francisco" → [list of proxy IDs]
user_id="taylor_swift" → proxy_45
organization_id="acme-corp" → proxy_23

Query Optimization:

Query: city = "San Francisco"

Without Cluster Index:
Query all 100 proxies in cluster

With Cluster Index:
1. Check proxy directory
2. Identify 10 proxies with San Francisco data
3. Query only those 10 proxies
Result: 10 proxy queries instead of 100 (10× faster)
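
A similar sketch for the cluster gateway, assuming a hypothetical ClusterIndex struct that wraps the hot property cache, proxy directory, and per-proxy Bloom filters (the bloom.BloomFilter usage matches the cascade implementation later in this RFC; field names are illustrative):

type ClusterIndex struct {
    hotPropertyCache  map[string][]string           // "city=San Francisco" → proxy IDs (top 100 queried values)
    proxyDirectory    map[string][]string           // property=value → proxy IDs at last refresh
    proxyBloomFilters map[string]*bloom.BloomFilter // proxy ID → filter over property values
}

// CandidateProxies returns the proxies within this cluster that may hold
// vertices with the given property value, cheapest check first.
func (ci *ClusterIndex) CandidateProxies(property, value string) []string {
    key := property + "=" + value

    // 1. Hot property cache: exact answer for the most queried values.
    if proxies, ok := ci.hotPropertyCache[key]; ok {
        return proxies
    }

    // 2. Proxy directory: proxies that reported this value.
    if proxies, ok := ci.proxyDirectory[key]; ok {
        return proxies
    }

    // 3. Bloom filter cascade: "definitely not here" answers prune most proxies.
    candidates := []string{}
    for proxyID, filter := range ci.proxyBloomFilters {
        if filter.Contains(key) {
            candidates = append(candidates, proxyID)
        }
    }
    return candidates
}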

Tier 4: Global Index (Cross-Cluster)

Location: Global query coordinator

Purpose: Route queries to relevant clusters

Index Types:

  1. Cluster Directory: Which clusters contain which data
  2. Data Distribution Stats: Vertex/edge counts per cluster
  3. Query Routing Hints: Pre-computed routing for common queries

Example:

Global Index (10 clusters):

Cluster Directory (geographic data):
US users: Clusters 0, 1, 2, 3
Europe users: Clusters 4, 5, 6
Asia users: Clusters 7, 8, 9

Cluster Directory (vertex labels):
User vertices: All clusters (uniform distribution)
Post vertices: Clusters 2, 5, 7 (80% of posts)
Organization vertices: Cluster 1 (centralized)

Query Routing Hints:
Query pattern: "FOLLOWS graph in US"
→ Route to clusters 0, 1, 2, 3 only

Query pattern: "Organization dependencies"
→ Route to cluster 1 first, then expand if needed

Query Optimization:

Query: User vertices where country = "Germany"

Without Global Index:
Query all 10 clusters

With Global Index:
1. Check cluster directory
2. German users are in clusters 4, 5
3. Query only clusters 4, 5
Result: 2 cluster queries instead of 10 (5× faster)
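
A sketch of the coordinator-side routing decision, again with illustrative field names; the key formats for routing hints and the cluster directory are assumptions:

type GlobalIndex struct {
    routingHints     map[string][]string // "User/country=Germany" → cluster IDs (assumed key format)
    clusterDirectory map[string][]string // coarse attribute → cluster IDs
    allClusters      []string            // all 10 cluster IDs
}

// RouteQuery returns the clusters a query must touch, falling back to a
// full broadcast when the global index has no routing information.
func (gi *GlobalIndex) RouteQuery(label, property, value string) []string {
    // 1. Pre-computed routing hints for common query patterns.
    if clusters, ok := gi.routingHints[label+"/"+property+"="+value]; ok {
        return clusters
    }
    // 2. Cluster directory keyed by coarse attributes (labels, geography).
    if clusters, ok := gi.clusterDirectory[property+"="+value]; ok {
        return clusters
    }
    // 3. No pruning information: query all 10 clusters.
    return gi.allClusters
}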

Online Index Building

Challenge: Build Indexes Without Downtime

At 100B scale, building indexes from scratch takes days:

Estimate:
100B vertices × 100 bytes per index entry = 10 TB index data
Write speed: 100 MB/s (single-threaded)
Time: 10 TB ÷ 100 MB/s = 100,000 seconds = 27 hours

Solution: Incremental index construction using multi-phase approach.

Phase 1: Partition-Parallel Index Construction

Approach: Build indexes for each partition independently in parallel

Architecture:
1000 proxies × 16 partitions = 16,000 index build tasks
Run 16,000 tasks in parallel
Each task builds index for ~6.25M vertices

Time:
Per-partition build (single index type): 6.25M vertices × 10 μs = 62.5 seconds
Parallel across 16,000 partitions: 62.5 seconds wall-clock
Result: Each index type completes in ~1 minute; building all four index types takes ~11 minutes (see Performance Characteristics)

Implementation:

func (pm *PartitionManager) BuildPartitionIndex(partitionID string) error {
    partition := pm.GetPartition(partitionID)

    // Create index builders
    builders := []IndexBuilder{
        NewHashIndexBuilder("user_id"),
        NewRangeIndexBuilder("age"),
        NewInvertedIndexBuilder("city"),
        NewEdgeIndexBuilder("FOLLOWS"),
    }

    // Scan all vertices in partition
    scanner := partition.NewVertexScanner()
    for scanner.Next() {
        vertex := scanner.Vertex()

        // Update all index builders
        for _, builder := range builders {
            builder.Add(vertex)
        }
    }

    // Finalize indexes
    index := &PartitionIndex{
        PartitionId: partitionID,
        VertexCount: scanner.Count(),
        LastUpdated: time.Now().Unix(),
    }

    for _, builder := range builders {
        builder.Finalize(index)
    }

    // Write index to storage
    return pm.WritePartitionIndex(partitionID, index)
}

Phase 2: Query-Serving with Incomplete Indexes

Problem: Some partitions have indexes, others don't

Solution: Graceful degradation with index availability flags

func (qe *QueryExecutor) ExecuteQuery(query *Query) (*QueryResult, error) {
    partitions := qe.IdentifyPartitions(query)

    results := []*PartialResult{}

    for _, partition := range partitions {
        // Check if index exists
        if partition.HasIndex(query.IndexedProperty) {
            // Fast path: Use index
            result := partition.QueryWithIndex(query)
            results = append(results, result)
        } else {
            // Slow path: Full scan
            log.Warnf("Partition %s lacks index for %s, using full scan",
                partition.ID, query.IndexedProperty)
            result := partition.QueryWithFullScan(query)
            results = append(results, result)
        }
    }

    return qe.MergeResults(results), nil
}

User Experience:

Query: city = "San Francisco"

Time T0 (no indexes):
Query latency: 50 seconds (full scan of 100B vertices)

Time T1 (30% of partitions have indexes):
Query latency: ~35 seconds (indexed partitions finish in seconds; the 70% still scanning dominate)

Time T2 (70% of partitions have indexes):
Query latency: ~15 seconds (dominated by the remaining 30% of partitions still scanning)

Time T3 (100% of partitions have indexes):
Query latency: 2 seconds (fully indexed)

Phase 3: Incremental Index Updates via WAL

Approach: Update indexes in near-real-time as writes occur

Write Path:
1. Client writes vertex/edge
2. Write appended to distributed WAL (Kafka)
3. WAL consumer on each partition applies write
4. Index updater processes WAL entry

WAL Entry Format:
- operation: CREATE_VERTEX, UPDATE_VERTEX, DELETE_VERTEX, CREATE_EDGE, DELETE_EDGE
- partition_id: Target partition
- vertex_id: Affected vertex
- properties: Changed properties
- timestamp: Write timestamp

Index Update Algorithm:

func (iu *IndexUpdater) ProcessWALEntry(entry *WALEntry) error {
    partition := iu.GetPartition(entry.PartitionID)

    switch entry.Operation {
    case CREATE_VERTEX:
        // Add to all applicable indexes
        partition.HashIndex["user_id"].Add(entry.Vertex.ID, entry.Vertex.Properties["user_id"])
        partition.RangeIndex["age"].Add(entry.Vertex.ID, entry.Vertex.Properties["age"])
        partition.InvertedIndex["city"].Add(entry.Vertex.Properties["city"], entry.Vertex.ID)

    case UPDATE_VERTEX:
        // Remove old values, add new values
        oldProps := partition.GetVertexProperties(entry.Vertex.ID)
        partition.InvertedIndex["city"].Remove(oldProps["city"], entry.Vertex.ID)
        partition.InvertedIndex["city"].Add(entry.Vertex.Properties["city"], entry.Vertex.ID)

    case DELETE_VERTEX:
        // Remove from all indexes
        partition.HashIndex["user_id"].Remove(entry.Vertex.ID)
        partition.RangeIndex["age"].Remove(entry.Vertex.ID)
        partition.InvertedIndex["city"].RemoveVertex(entry.Vertex.ID)

    case CREATE_EDGE:
        // Update edge indexes
        partition.EdgeIndex["FOLLOWS"].AddOutgoing(entry.Edge.FromVertexID, entry.Edge.ToVertexID)
        partition.EdgeIndex["FOLLOWS"].AddIncoming(entry.Edge.ToVertexID, entry.Edge.FromVertexID)

    case DELETE_EDGE:
        partition.EdgeIndex["FOLLOWS"].RemoveOutgoing(entry.Edge.FromVertexID, entry.Edge.ToVertexID)
        partition.EdgeIndex["FOLLOWS"].RemoveIncoming(entry.Edge.ToVertexID, entry.Edge.FromVertexID)
    }

    return nil
}

Consistency Guarantees:

Consistency Model: Eventual Consistency

Timeline:
T0: Write committed to WAL (durable)
T0 + 50ms: Index update applied on partition
T0 + 100ms: Proxy-level index updated
T0 + 500ms: Cluster-level index updated
T0 + 1s: Global index updated

Query Behavior:
- Queries see index updates within 1 second
- Stale index entries possible for 1 second window
- Applications can specify "read_after_write" consistency for critical queries
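
A minimal sketch of how read_after_write could be honored, assuming a hypothetical AppliedIndexSeq API that reports the last WAL sequence number whose index updates a partition has applied (this API is not defined elsewhere in this RFC):

// WaitForIndexFreshness blocks until the partition's index updater has applied
// at least the WAL sequence number returned by the client's write, or the
// caller-supplied context deadline expires.
func (qe *QueryExecutor) WaitForIndexFreshness(
    ctx context.Context,
    partitionID string,
    writeSeq int64, // sequence_number assigned when the write was committed to the WAL
) error {
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    for {
        // AppliedIndexSeq (assumed helper): last applied per-partition sequence number.
        if qe.AppliedIndexSeq(partitionID) >= writeSeq {
            return nil // index reflects the write; safe to query
        }
        select {
        case <-ctx.Done():
            return ctx.Err() // e.g., a 2 s deadline for critical queries
        case <-ticker.C:
        }
    }
}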

Phase 4: Background Index Optimization

Problem: Incremental updates fragment indexes

Solution: Periodic background compaction

func (io *IndexOptimizer) CompactPartitionIndexes() {
    ticker := time.NewTicker(1 * time.Hour)

    for range ticker.C {
        partitions := io.GetAllPartitions()

        for _, partition := range partitions {
            // Check if compaction needed
            if partition.IndexFragmentation() > 0.2 {
                // Rebuild index in background
                go io.RebuildPartitionIndex(partition.ID)
            }
        }
    }
}

func (io *IndexOptimizer) RebuildPartitionIndex(partitionID string) {
    partition := io.GetPartition(partitionID)
    oldFragmentation := partition.IndexFragmentation()

    // Build new index (doesn't block queries)
    newIndex := partition.BuildFreshIndex()

    // Atomic swap
    partition.SwapIndex(newIndex)

    log.Infof("Compacted index for partition %s, fragmentation: %.1f%% → %.1f%%",
        partitionID, oldFragmentation*100, partition.IndexFragmentation()*100)
}

Distributed WAL for Index Synchronization

Why Distributed WAL?

At 1000+ nodes, centralized WAL (single Kafka cluster) becomes bottleneck:

Centralized WAL Limits:
- Kafka partition limit: ~1000 partitions
- Write throughput: 100k writes/sec per partition
- Total throughput: 100M writes/sec (hard limit)

Distributed WAL Solution:
- 10 Kafka clusters (one per graph cluster)
- Each Kafka cluster handles 100 proxies
- Total throughput: 1B writes/sec (10× higher)
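
As a sketch of the write-side routing, the producer below publishes each WAL message to the Kafka cluster that owns the target graph cluster, keyed by partition ID so per-partition ordering is preserved. The confluent-kafka-go client and the graph-wal-{cluster_id} topic naming are illustrative assumptions:

// WALRouter maps a partition's graph cluster to that cluster's Kafka WAL.
type WALRouter struct {
    producers map[string]*kafka.Producer // cluster ID → producer for that WAL cluster
}

// Publish appends a WAL message to the owning WAL cluster.
func (wr *WALRouter) Publish(msg *WALMessage) error {
    producer, ok := wr.producers[msg.ClusterId]
    if !ok {
        return fmt.Errorf("no WAL producer for cluster %s", msg.ClusterId)
    }

    payload, err := proto.Marshal(msg)
    if err != nil {
        return err
    }

    topic := "graph-wal-" + msg.ClusterId // assumed topic naming, e.g., "graph-wal-07"
    return producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Key:            []byte(msg.PartitionId), // same partition ID → same Kafka partition → ordered
        Value:          payload,
    }, nil)
}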

Distributed WAL Architecture

┌────────────────────────────────────────────────────────────────┐
│ Graph Cluster 0 │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ WAL Cluster 0 (Kafka) │ │
│ │ - 100 partitions (one per proxy) │ │
│ │ - Replication factor: 3 │ │
│ │ - Retention: 7 days │ │
│ │ - Throughput: 100M writes/sec │ │
│ └────────────────┬─────────────────────────────────────────┘ │
│ │ │
│ ┌───────────┼───────────┬─────────────┐ │
│ │ │ │ │ │
│ ┌────▼────┐ ┌───▼──────┐ ┌──▼───────┐ ┌──▼──────────┐ │
│ │ Proxy 0 │ │ Proxy 1 │ │ Proxy 2 │ │ Proxy 99 │ │
│ │ │ │ │ │ │ │ │ │
│ │ WAL │ │ WAL │ │ WAL │ │ WAL │ │
│ │Consumer │ │Consumer │ │Consumer │ │Consumer │ │
│ └─────────┘ └──────────┘ └──────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘

WAL Message Format

message WALMessage {
  // Message metadata
  string message_id = 1;       // UUID
  int64 timestamp = 2;         // Unix timestamp (nanoseconds)
  string source_proxy_id = 3;  // Originating proxy
  string cluster_id = 4;       // Graph cluster

  // Partition routing
  string partition_id = 5;     // Target partition

  // Operation
  WALOperation operation = 6;

  // Ordering
  int64 sequence_number = 7;   // Per-partition sequence
  string causality_token = 8;  // For causal consistency
}

message WALOperation {
  oneof op {
    CreateVertexOp create_vertex = 1;
    UpdateVertexOp update_vertex = 2;
    DeleteVertexOp delete_vertex = 3;
    CreateEdgeOp create_edge = 4;
    DeleteEdgeOp delete_edge = 5;
  }
}

message CreateVertexOp {
  string vertex_id = 1;
  string label = 2;
  map<string, bytes> properties = 3;
  repeated string tags = 4;  // For authorization (RFC-061)
}

message UpdateVertexOp {
  string vertex_id = 1;
  map<string, bytes> updated_properties = 2;  // Only changed properties
  repeated string removed_properties = 3;
}

message DeleteVertexOp {
  string vertex_id = 1;
  bool cascade_edges = 2;  // Delete connected edges
}

message CreateEdgeOp {
  string edge_id = 1;
  string label = 2;
  string from_vertex_id = 3;
  string to_vertex_id = 4;
  map<string, bytes> properties = 5;
}

message DeleteEdgeOp {
  string edge_id = 1;
}

WAL Consumer with Index Updates

func (wc *WALConsumer) ConsumeWAL(partitionID string) {
    consumer := wc.KafkaConsumer.Subscribe([]string{wc.Topic}, partitionID)

    for {
        msg := consumer.Poll(100 * time.Millisecond)
        if msg == nil {
            continue
        }

        walMsg := &WALMessage{}
        if err := proto.Unmarshal(msg.Value, walMsg); err != nil {
            log.Errorf("Failed to decode WAL message: %v", err)
            continue
        }

        // Apply to graph storage
        if err := wc.ApplyToGraph(walMsg); err != nil {
            log.Errorf("Failed to apply WAL message: %v", err)
            continue
        }

        // Update indexes
        if err := wc.UpdateIndexes(walMsg); err != nil {
            log.Errorf("Failed to update indexes: %v", err)
            // Don't fail - index updates are best-effort
        }

        // Commit offset
        consumer.CommitMessage(msg)
    }
}

func (wc *WALConsumer) UpdateIndexes(msg *WALMessage) error {
    partition := wc.GetPartition(msg.PartitionId)

    switch op := msg.Operation.Op.(type) {
    case *WALOperation_CreateVertex:
        // Update partition-level indexes
        partition.UpdateIndexesForVertexCreate(op.CreateVertex)

        // Update proxy-level indexes
        wc.ProxyIndexManager.UpdateForVertexCreate(msg.PartitionId, op.CreateVertex)

        // Async: Update cluster-level indexes
        wc.ClusterIndexUpdater.EnqueueUpdate(msg)

    case *WALOperation_UpdateVertex:
        partition.UpdateIndexesForVertexUpdate(op.UpdateVertex)
        wc.ProxyIndexManager.UpdateForVertexUpdate(msg.PartitionId, op.UpdateVertex)
        wc.ClusterIndexUpdater.EnqueueUpdate(msg)

        // ... other operations
    }

    return nil
}

Bloom Filter Cascade for Fast Negative Lookups

Problem: False Positives Waste Query Time

Query: user_id = "nonexistent_user_123"

Without Bloom Filters:
1. Query all 1000 proxies
2. Each proxy checks its index
3. All return "not found"
Time: 1000 RPCs × 5ms = 5 seconds (wasted)

With Bloom Filters:
1. Check bloom filter for each proxy (fast)
2. Bloom filter says "definitely not here" for 999 proxies
3. Query only 1 proxy (false positive)
4. Return "not found"
Time: 1000 bloom checks × 10μs + 1 RPC × 5ms = 15ms
Speedup: 333×

Bloom Filter Hierarchy

Level 1: Partition Bloom Filters (16 per proxy)
Size: 1 MB per partition
False positive rate: 0.1%
Build time: 62 seconds per partition

Level 2: Proxy Bloom Filters (1000 total)
Size: 16 MB per proxy (union of 16 partition filters)
False positive rate: 0.1%
Build time: 10 seconds per proxy

Level 3: Cluster Bloom Filters (10 total)
Size: 160 MB per cluster (union of 100 proxy filters)
False positive rate: 0.2%
Build time: 100 seconds per cluster

Level 4: Global Bloom Filter (1 total)
Size: 1.6 GB (union of 10 cluster filters)
False positive rate: 0.5%
Build time: 1000 seconds (16 minutes)

Bloom Filter Implementation

type BloomFilterCascade struct {
    partitionFilters map[string]*bloom.BloomFilter // 16,000 filters
    proxyFilters     map[string]*bloom.BloomFilter // 1,000 filters
    clusterFilters   map[string]*bloom.BloomFilter // 10 filters
    globalFilter     *bloom.BloomFilter            // 1 filter
}

func (bfc *BloomFilterCascade) MayContain(vertexID string) *QueryHints {
    hints := &QueryHints{}

    // Step 1: Check global filter
    if !bfc.globalFilter.Contains(vertexID) {
        hints.DefinitelyNotExists = true
        return hints
    }

    // Step 2: Check cluster filters
    candidateClusters := []string{}
    for clusterID, filter := range bfc.clusterFilters {
        if filter.Contains(vertexID) {
            candidateClusters = append(candidateClusters, clusterID)
        }
    }

    if len(candidateClusters) == 0 {
        hints.DefinitelyNotExists = true
        return hints
    }

    // Step 3: Check proxy filters (within candidate clusters)
    candidateProxies := []string{}
    for _, clusterID := range candidateClusters {
        for proxyID, filter := range bfc.GetProxyFiltersForCluster(clusterID) {
            if filter.Contains(vertexID) {
                candidateProxies = append(candidateProxies, proxyID)
            }
        }
    }

    hints.CandidateClusters = candidateClusters
    hints.CandidateProxies = candidateProxies
    return hints
}
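
For context, a hedged sketch of how a query coordinator might consume these hints; the QueryHints fields match the code above, while queryProxy, qc.bloomCascade, and ErrVertexNotFound are illustrative names:

func (qc *QueryCoordinator) LookupVertex(ctx context.Context, vertexID string) (*Vertex, error) {
    hints := qc.bloomCascade.MayContain(vertexID)

    // Definite negative: answer without any RPCs (~10 μs of filter checks).
    if hints.DefinitelyNotExists {
        return nil, ErrVertexNotFound
    }

    // Possible positive: fan out only to candidate proxies; false positives
    // simply return "not found" from those few proxies.
    for _, proxyID := range hints.CandidateProxies {
        vertex, err := qc.queryProxy(ctx, proxyID, vertexID)
        if err == nil && vertex != nil {
            return vertex, nil
        }
    }
    return nil, ErrVertexNotFound
}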

Inverted Edge Indexes for Backward Traversals

Problem: Backward Traversals Are Expensive

Example: "Who follows @taylor_swift?"

Forward Edge (default storage):
user:alice → FOLLOWS → user:taylor_swift

Backward Traversal (without index):
Scan ALL vertices, check if they have FOLLOWS edge to taylor_swift
Time: 100B vertices scan = 27 hours

Backward Traversal (with inverted edge index):
Lookup incoming_edges[user:taylor_swift][FOLLOWS] → bitmap of followers
Time: Bitmap scan = 100ms
Speedup: 972,000×

Inverted Edge Index Structure

Forward Edge Index (default):
source_vertex → edge_label → list of target vertices

Example:
user:alice → FOLLOWS → [user:bob, user:charlie, user:taylor_swift]

Inverted Edge Index (for backward traversals):
target_vertex → edge_label → list of source vertices

Example:
user:taylor_swift → FOLLOWS → [user:alice, user:bob, user:charlie, ...]
(100M followers)

Storage Format:

message InvertedEdgeIndex {
  string partition_id = 1;

  // target_vertex → edge_label → bitmap of source vertices
  map<string, EdgeLabelBitmaps> inverted_edges = 2;
}

message EdgeLabelBitmaps {
  // edge_label → compressed bitmap of source vertex IDs
  map<string, bytes> label_bitmaps = 1;
}

Compression: Use Roaring Bitmaps

Uncompressed:
100M follower IDs × 8 bytes = 800 MB

Roaring Bitmap Compressed:
100M follower IDs (clustered) → ~10 MB
Compression ratio: 80:1

Fan-In Query Optimization

Query: "Users who follow BOTH @taylor_swift AND @beyonce"

Without Inverted Index:
1. Find all users (100B vertices)
2. For each user, check if they follow both
Time: 100B × 2 edge checks = Days

With Inverted Index:
1. Get followers of taylor_swift: bitmap_1 (100M bits)
2. Get followers of beyonce: bitmap_2 (50M bits)
3. Compute intersection: bitmap_1 AND bitmap_2
4. Result: 5M mutual followers
Time: Bitmap AND operation = 50ms
Speedup: >1,000,000×

Implementation

func (pm *PartitionManager) GetIncomingEdges(vertexID string, edgeLabel string) ([]string, error) {
    partition := pm.GetPartitionForVertex(vertexID)

    // Check if inverted index exists
    invertedIndex := partition.GetInvertedEdgeIndex()
    if invertedIndex == nil {
        return nil, errors.New("inverted edge index not available")
    }

    // Lookup bitmap of source vertices
    labelBitmaps, ok := invertedIndex.InvertedEdges[vertexID]
    if !ok {
        return nil, nil // no incoming edges recorded for this vertex
    }
    bitmapData, ok := labelBitmaps.LabelBitmaps[edgeLabel]
    if !ok {
        return nil, nil // no incoming edges with this label
    }

    bitmap := roaring.New()
    if err := bitmap.UnmarshalBinary(bitmapData); err != nil {
        return nil, err
    }

    // Convert bitmap to vertex ID list
    sources := []string{}
    for _, vertexOffset := range bitmap.ToArray() {
        sourceID := partition.OffsetToVertexID(vertexOffset)
        sources = append(sources, sourceID)
    }

    return sources, nil
}
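
The fan-in intersection described earlier maps directly onto Roaring bitmap operations. A minimal sketch, assuming the per-label follower bitmaps have already been deserialized as in GetIncomingEdges:

// MutualFollowers returns vertex offsets of users that follow both targets,
// using a Roaring bitmap intersection instead of per-user edge checks.
func MutualFollowers(followersA, followersB *roaring.Bitmap) *roaring.Bitmap {
    // roaring.And allocates a new bitmap without mutating the inputs.
    return roaring.And(followersA, followersB)
}

// Example (offsets are illustrative):
//   taylorFollowers := roaring.BitmapOf(1, 2, 3, 5, 8)
//   beyonceFollowers := roaring.BitmapOf(2, 3, 13)
//   mutual := MutualFollowers(taylorFollowers, beyonceFollowers) // {2, 3}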

Index Storage and Persistence

Index Storage Locations

| Index Tier | Storage Location | Durability | Access Latency |
|---|---|---|---|
| Partition Index | In-memory (proxy RAM) + S3 backup | Durable via S3 | 1 μs (memory), 100 ms (S3) |
| Proxy Index | In-memory (proxy RAM) | Ephemeral (rebuilt on restart) | 1 μs |
| Cluster Index | In-memory (cluster gateway) | Ephemeral | 1 μs |
| Global Index | In-memory (query coordinator) | Ephemeral | 1 μs |

Index Persistence Strategy

Partition Indexes (durable):
Location: S3 bucket per cluster
Path: s3://prism-indexes-cluster-07/partition-0042-05/index-v1.pb
Update frequency: Every 5 minutes
Format: Protobuf + gzip compression

Proxy Indexes (ephemeral):
Rebuilt from partition indexes on proxy restart
Build time: 10 seconds per proxy

Cluster Indexes (ephemeral):
Rebuilt from proxy indexes on cluster gateway restart
Build time: 100 seconds per cluster

Global Index (ephemeral):
Rebuilt from cluster indexes on coordinator restart
Build time: 1000 seconds (16 minutes)

Index Snapshot Format

S3 Path:
s3://prism-indexes-cluster-{cluster_id}/
  partition-{proxy_id}-{partition_id}/
    index-{version}.pb.gz
    bloom-filter-{version}.bin
    inverted-edges-{version}.roaring

Example:
s3://prism-indexes-cluster-07/
  partition-0042-05/
    index-v20251115.pb.gz (1 GB compressed)
    bloom-filter-v20251115.bin (10 MB)
    inverted-edges-v20251115.roaring (500 MB compressed)
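
A sketch of the snapshot upload path (protobuf + gzip, then S3), following the bucket and key layout above; the aws-sdk-go-v2 client and the pm.s3Client field are assumptions:

// PersistPartitionIndex serializes a partition index with protobuf + gzip and
// uploads it to the per-cluster index bucket.
func (pm *PartitionManager) PersistPartitionIndex(
    ctx context.Context,
    clusterID, proxyID, partitionID, version string,
    index *PartitionIndex,
) error {
    raw, err := proto.Marshal(index)
    if err != nil {
        return err
    }

    // gzip-compress the snapshot
    var buf bytes.Buffer
    gz := gzip.NewWriter(&buf)
    if _, err := gz.Write(raw); err != nil {
        return err
    }
    if err := gz.Close(); err != nil {
        return err
    }

    bucket := fmt.Sprintf("prism-indexes-cluster-%s", clusterID)
    key := fmt.Sprintf("partition-%s-%s/index-%s.pb.gz", proxyID, partitionID, version)

    // pm.s3Client is assumed to be an *s3.Client from aws-sdk-go-v2.
    _, err = pm.s3Client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
        Body:   bytes.NewReader(buf.Bytes()),
    })
    return err
}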

Performance Characteristics

Index Build Times

| Index Type | Vertices | Build Time (Single-threaded) | Build Time (Parallel) |
|---|---|---|---|
| Partition Hash Index | 6.25M | 62 seconds | 62 seconds |
| Partition Range Index | 6.25M | 125 seconds | 125 seconds |
| Partition Inverted Index | 6.25M | 187 seconds | 187 seconds |
| Partition Edge Index | 62.5M edges | 312 seconds | 312 seconds |
| Full Partition Index | 6.25M vertices | 686 seconds (11 min) | 686 seconds |
| All Partitions (16,000) | 100B vertices | 127 days (serial) | 11 minutes (parallel) |

Query Speedups with Indexes

| Query Pattern | Without Index | With Index | Speedup |
|---|---|---|---|
| Exact vertex lookup | 27 hours (full scan) | 50 μs (hash index) | 1,944,000,000× |
| Property filter (city="SF") | 27 hours | 2 seconds | 48,600× |
| Range query (age>30) | 27 hours | 5 seconds | 19,440× |
| Backward traversal (followers) | 27 hours | 100 ms | 972,000× |
| Fan-in intersection | Weeks | 50 ms | >10,000,000× |

Index Space Overhead

Graph Data:
100B vertices × 100 bytes = 10 TB
10T edges × 20 bytes = 200 TB
Total: 210 TB

Index Data:
Partition indexes: 16 TB
Inverted edge indexes: 8 TB
Bloom filters: 1.6 GB
Total: 24 TB

Overhead: 24 TB ÷ 210 TB = 11.4%

Index Tiering Strategy

Problem: At 100B scale, index memory requirements exceed available capacity (MEMO-050 Finding 5).

Memory Capacity Reconciliation

Original Memory Budget (RFC-057):

Total memory: 30 TB (1000 proxies × 30 GB each)
Hot data allocation: 21 TB (10% of 210 TB graph)
Index allocation: 16 TB (partition indexes)
Total needed: 21 TB + 16 TB = 37 TB ❌
Available: 30 TB
Shortfall: 7 TB (23% over budget)

Root Cause: All indexes assumed to be in memory simultaneously. At massive scale, not all indexes are equally valuable—most queries touch a small fraction of partitions.

Index Temperature Classification

Similar to data tiers (RFC-059), indexes should be classified by access frequency:

| Temperature | Access Pattern | Storage | Size | Example |
|---|---|---|---|---|
| Hot | >1000 req/min | Memory (RAM) | 4.8 TB | Active user indexes, trending topics |
| Warm | 10-1000 req/min | Memory (evictable) | 6.4 TB | Recent activity indexes |
| Cold | <10 req/min | S3 (on-demand load) | 12.8 TB | Archive indexes, dormant users |

Power-Law Distribution: At scale, index access follows Zipf distribution:

  • Top 20% of indexes handle 80% of queries (hot)
  • Next 30% handle 15% of queries (warm)
  • Bottom 50% handle 5% of queries (cold)

Revised Memory Budget

With Index Tiering:

Hot Data (RFC-057):
Graph data: 21 TB (10% of 210 TB)

Hot Indexes (30% of total indexes in memory):
Partition indexes: 4.8 TB (30% of 16 TB)
Inverted edge indexes: 2.4 TB (30% of 8 TB)
Bloom filters: 1.6 GB (all in memory - cheap)
Total hot indexes: 7.2 TB

Total memory used: 21 TB + 7.2 TB = 28.2 TB ✅
Available: 30 TB
Headroom: 1.8 TB (6% buffer for spikes)

Cost Savings: Storing 70% of indexes on S3 instead of RAM:

  • Cold indexes: 16.8 TB × $23/TB/month (S3) = $386/month
  • vs in-memory: 16.8 TB × $500/TB/month (RAM) = $8,400/month
  • Savings: $8,014/month = $96k/year per 16.8 TB

Index Promotion/Demotion Logic

Promotion Criteria (Cold → Warm → Hot):

type IndexTemperature struct {
    PartitionID       string
    RequestsPerMinute float64
    LastAccessTime    time.Time
    LoadTimeP99       time.Duration
    CurrentState      TemperatureState
}

func (im *IndexManager) EvaluateTemperature(index *IndexTemperature) TemperatureState {
    rpm := index.RequestsPerMinute

    // Promotion thresholds
    if rpm > 1000 && index.CurrentState != Hot {
        return Hot // Immediate promotion to hot
    }

    if rpm > 10 && index.CurrentState == Cold {
        return Warm // Cold → Warm
    }

    // Demotion thresholds (with hysteresis to prevent thrashing)
    if rpm < 8 && index.CurrentState == Warm { // 20% hysteresis below 10
        if time.Since(index.LastAccessTime) > 10*time.Minute {
            return Cold
        }
    }

    if rpm < 800 && index.CurrentState == Hot { // 20% hysteresis below 1000
        if time.Since(index.LastAccessTime) > 5*time.Minute {
            return Warm
        }
    }

    return index.CurrentState // No state change
}

Background Process:

func (im *IndexManager) TemperatureMonitor() {
    ticker := time.NewTicker(60 * time.Second)

    for range ticker.C {
        for _, index := range im.GetAllIndexes() {
            currentTemp := index.CurrentState
            evaluatedTemp := im.EvaluateTemperature(index)

            if evaluatedTemp != currentTemp {
                switch evaluatedTemp {
                case Hot:
                    im.LoadIndexToMemory(index.PartitionID, PriorityHigh)
                case Warm:
                    im.LoadIndexToMemory(index.PartitionID, PriorityMedium)
                case Cold:
                    im.OffloadIndexToS3(index.PartitionID)
                }
            }
        }
    }
}

Performance Trade-offs

| Operation | Hot Index | Warm Index | Cold Index |
|---|---|---|---|
| Index Lookup | 50 μs | 50 μs (if cached) | 100 ms (S3 load) |
| First Query After Load | 50 μs | 2 ms (partial load) | 5 s (full S3 download) |
| Memory Footprint | 7.2 TB | +6.4 TB (evictable) | 0 TB (S3) |
| Cost per TB/month | $500 (RAM) | $500 (RAM) | $23 (S3) |

Query Impact:

  • Hot partition query: Zero impact (index already in memory)
  • Warm partition query: ~2 ms first query, then cached
  • Cold partition query: ~5 s first query (rare: <5% of queries)

Acceptable Trade-off: 95% of queries hit hot/warm indexes (<2 ms overhead), 5% pay S3 load cost (~5s). At 1B queries/sec, only 50M queries/sec hit cold indexes.

Integration with RFC-059 Data Tiers

Coordinated Temperature Management:

partition_temperature_policy:
  # Data and indexes move together
  hot_partition:
    data: memory (RFC-059)
    indexes: memory (this RFC)

  warm_partition:
    data: partial_memory (RFC-059)
    indexes: memory_evictable (this RFC)

  cold_partition:
    data: s3 (RFC-059)
    indexes: s3 (this RFC)

  # Co-location optimization
  co_locate_index_with_data: true  # Load index when data loads

  # Preemptive loading
  prefetch_on_access_pattern: true  # Load index if query pattern suggests next access

Example Workflow:

  1. Cold partition receives query → Load data from S3 (~5s) + Load index from S3 (~5s in parallel) = ~5s total
  2. Partition promoted to Warm → Keep data + index in memory
  3. Continued high access → Promoted to Hot → Locked in memory (won't be evicted)
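
A small sketch of step 1's co-located load, assuming hypothetical LoadPartitionDataFromS3 / LoadPartitionIndexFromS3 helpers and golang.org/x/sync/errgroup for the parallel fetch:

// PromoteColdPartition loads a cold partition's data and its index from S3
// in parallel (co_locate_index_with_data), so the first query pays ~5 s once.
func (pm *PartitionManager) PromoteColdPartition(ctx context.Context, partitionID string) error {
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        return pm.LoadPartitionDataFromS3(ctx, partitionID) // RFC-059 data tier
    })
    g.Go(func() error {
        return pm.LoadPartitionIndexFromS3(ctx, partitionID) // this RFC
    })

    if err := g.Wait(); err != nil {
        return err
    }
    return pm.MarkPartitionWarm(partitionID)
}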

Alternative Approaches Considered

Option 1: Partial Indexes (Rejected)

  • Approach: Keep only top-K entries per index in memory
  • Why rejected: Incomplete indexes break query correctness; the executor cannot distinguish "not found" from "not indexed"

Option 2: Index Compression (Evaluated)

  • Approach: Compress cold indexes 10× using zstd
  • Trade-off: 10× compression at the cost of slower decompression (30 ms vs 3 ms)
  • Decision: Use compression for S3 storage, but decompress fully on load to memory

Option 3: No Indexes for Cold Data (Rejected)

  • Approach: Cold partition queries use full scan instead of indexes
  • Why rejected: Cold data must be fetched from S3 either way (~5 s); without an index, every subsequent query still pays a full scan (1.56M resident vertices × 100 bytes = 156 MB @ 1 GB/s ≈ 156 ms per query), whereas a one-time ~5 s index load keeps repeat queries at sub-millisecond latency

Summary

Index tiering solves the 37 TB → 30 TB memory capacity problem by applying temperature-based management to indexes, mirroring data tier strategy (RFC-059). Key results:

  • Memory reconciliation: 28.2 TB used (vs 37 TB needed without tiering)
  • Headroom: 1.8 TB buffer (6%) for traffic spikes
  • Cost savings: $96k/year for every 16.8 TB moved to S3
  • Query impact: 95% of queries unaffected (<2 ms overhead)
  • Scalability: Enables 100B scale within 30 TB memory budget

Open Questions

  1. Index Consistency: Strong vs eventual consistency for index updates?
  2. Partial Index Queries: How to handle queries when indexes incomplete?
  3. Index Versioning: How to upgrade index format without rebuild?
  4. Cross-Partition Joins: How to optimize joins across multiple partitions?
  5. Index Compression Trade-offs: More compression vs faster access?

References

Revision History

  • 2025-11-15: Initial draft - Multi-level indexing with online building and distributed WAL