RFC-060: Distributed Gremlin Query Execution for Massive-Scale Graphs
Status: Draft
Author: Platform Team
Created: 2025-11-15
Updated: 2025-11-15
Abstract
This RFC defines a distributed Gremlin query execution engine for massive-scale graphs (100B vertices across 1000+ nodes). Standard Gremlin implementations execute queries on single machines, which is infeasible at this scale.
This RFC presents a distributed query planner with four key capabilities:
- Decomposes Gremlin traversals into partition-local and cross-partition operations
- Optimizes execution plans using indexes (RFC-058)
- Routes queries to appropriate storage tiers (RFC-059)
- Applies vertex-level authorization filters (RFC-061)
The query executor targets the Apache TinkerPop Gremlin specification (see the Gremlin Step Support Matrix for current step coverage) while achieving sub-second latency for common traversals through intelligent partition pruning, parallel execution, and result streaming.
Key Innovations:
- Query Decomposition: Split Gremlin traversals into distributed execution plan
- Partition Pruning: Use indexes to skip irrelevant partitions (10-100× speedup)
- Adaptive Parallelism: Dynamically adjust parallelism based on intermediate result sizes
- Authorization Push-Down: Apply vertex label filters at partition level
- Result Streaming: Stream results to client without materializing full result set
- Cost-Based Optimizer: Choose execution strategy based on data statistics
Motivation
The Distributed Query Problem
Example Query: "Find all users in San Francisco who follow someone in New York"
g.V().hasLabel('User').has('city', 'San Francisco')
.out('FOLLOWS')
.has('city', 'New York')
Single-Machine Execution (impractical at 100B scale):
Step 1: Scan all vertices (100B vertices)
Step 2: Filter by label='User' and city='San Francisco'
Result: 5M vertices
Step 3: Traverse out('FOLLOWS') for each
Result: 50M edges to follow
Step 4: Filter targets by city='New York'
Result: 2M vertices
Total time: Hours (full scan of 100B vertices)
Distributed Execution (this RFC):
Step 1: Query optimizer analyzes query
- Identifies filter: city='San Francisco'
- Checks cluster-level index
- Finds: Only clusters 0, 2 have San Francisco users
Step 2: Partition pruning
- Broadcast query to clusters 0, 2 (200 proxies)
- Skip clusters 1, 3-9 (800 proxies not queried)
Step 3: Parallel execution on 200 proxies
- Each proxy: Use city_index['San Francisco'] → 25k local vertices
- Total: 5M vertices (200 proxies × 25k)
- Time: 2 seconds (parallel)
Step 4: Distributed traversal
- Fan-out to target partitions (follow edges)
- Filter by city='New York'
- Result: 2M vertices
- Time: 5 seconds (parallel RPC)
Total time: 7 seconds (vs hours)
Speedup: ~2,500×
Challenges at Massive Scale
Challenge 1: Query Explosion
Query: g.V('user:alice').out('FOLLOWS').out('FOLLOWS').out('FOLLOWS')
Hop 1: alice.out('FOLLOWS') → 200 friends
Hop 2: 200 friends × 200 friends = 40,000 friends-of-friends
Hop 3: 40,000 × 200 = 8,000,000 vertices (3rd degree)
Problem: Result set explodes exponentially with hops
Solution: Early termination, limit per hop, result streaming
Challenge 2: Cross-Partition Queries
Query: g.V().has('age', gt(30))
Without indexes:
- Broadcast query to all 1000 proxies
- Each scans local partitions
- Aggregate results
- Time: 60 seconds (network overhead dominates)
With indexes + partition pruning:
- Check cluster-level index: age > 30 in all clusters
- Cannot prune partitions (age distributed uniformly)
- Execute on all 1000 proxies BUT use age_index
- Time: 5 seconds (indexed scan vs full scan)
Challenge 3: Complex Predicate Push-Down
Query: g.V().hasLabel('User').has('age', gt(30)).has('city', 'SF').out('FOLLOWS')
Optimizer decisions:
1. Which filter to apply first? (selectivity)
2. Can we use indexes for each filter?
3. Should we materialize intermediate results?
4. Execute sequentially or in parallel?
Best plan:
1. Use city_index['SF'] → 5M vertices (high selectivity)
2. Filter age > 30 in-memory (low overhead)
3. Stream results, no materialization
4. Parallel execution on filtered vertices
Goals
- Full Gremlin Support: Implement Apache TinkerPop Gremlin specification
- Partition Pruning: Skip irrelevant partitions using indexes
- Parallel Execution: Execute across 1000+ nodes simultaneously
- Sub-Second Latency: Common queries complete in <1 second
- Authorization Integration: Apply vertex label filters (RFC-061)
- Result Streaming: Stream results without full materialization
- Cost-Based Optimization: Choose execution strategy based on statistics
Non-Goals
- ACID Transactions: Distributed transactions not covered (future work)
- Custom Gremlin Steps: Only standard Gremlin 3.x steps are supported, plus the sampling extensions (.sample(), .approximate()) defined in this RFC; arbitrary user-defined steps are out of scope
- Graph Algorithms: Complex algorithms (PageRank, etc.) in RFC-055
- Schema Enforcement: Schema validation not in query engine
Gremlin Query Decomposition
Query Analysis Phase
type QueryAnalyzer struct {
indexManager *IndexManager
statsManager *StatisticsManager
}
func (qa *QueryAnalyzer) Analyze(gremlinQuery string) (*QueryPlan, error) {
// Parse Gremlin to AST
ast, err := qa.ParseGremlin(gremlinQuery)
if err != nil {
return nil, err
}
// Identify filterable steps
filters := qa.ExtractFilters(ast)
// Check index availability
indexableFilters := []Filter{}
for _, filter := range filters {
if qa.indexManager.HasIndex(filter.Property) {
indexableFilters = append(indexableFilters, filter)
}
}
// Estimate selectivity
selectivity := qa.EstimateSelectivity(filters)
// Generate execution plan
plan := qa.GenerateExecutionPlan(ast, indexableFilters, selectivity)
return plan, nil
}
Example: Parse query into steps
// Original Gremlin
g.V().hasLabel('User').has('city', 'SF').out('FOLLOWS').limit(100)
// Parsed steps
Step 1: V() - Start from all vertices
Step 2: hasLabel('User') - Filter by label
Step 3: has('city', 'SF') - Filter by property
Step 4: out('FOLLOWS') - Traverse outgoing edges
Step 5: limit(100) - Limit results
Query Plan:
message QueryPlan {
string query_id = 1;
repeated ExecutionStage stages = 2;
PartitionPruningStrategy pruning_strategy = 3;
ParallelismConfig parallelism = 4;
ResultStreamingConfig streaming = 5;
}
message ExecutionStage {
StageType type = 1;
repeated string target_partitions = 2; // Partitions to execute on
GremlinStep gremlin_step = 3;
bool use_index = 4;
string index_name = 5;
int64 estimated_cardinality = 6;
}
enum StageType {
STAGE_TYPE_UNSPECIFIED = 0;
STAGE_TYPE_INDEX_SCAN = 1; // Use index
STAGE_TYPE_FULL_SCAN = 2; // Scan all partitions
STAGE_TYPE_TRAVERSAL = 3; // Follow edges
STAGE_TYPE_FILTER = 4; // Apply predicate
STAGE_TYPE_LIMIT = 5; // Limit results
STAGE_TYPE_AGGREGATE = 6; // Group/count/etc
}
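For concreteness, the plan for the example query above could be represented roughly as the following Go literal, assuming protoc-generated bindings for the messages above. Partition IDs and cardinalities are illustrative, not computed; real plans come from QueryAnalyzer.Analyze.
// Illustrative QueryPlan for:
//   g.V().hasLabel('User').has('city', 'SF').out('FOLLOWS').limit(100)
examplePlan := &QueryPlan{
    QueryId: "q-example-001",
    Stages: []*ExecutionStage{
        {
            Type:                 StageType_STAGE_TYPE_INDEX_SCAN,
            TargetPartitions:     []string{"00:0001", "00:0002"}, // ...148 more after pruning
            UseIndex:             true,
            IndexName:            "city_index",
            EstimatedCardinality: 5_000_000,
        },
        {
            Type:                 StageType_STAGE_TYPE_TRAVERSAL,
            EstimatedCardinality: 50_000_000, // target partitions resolved at runtime
        },
        {
            Type:                 StageType_STAGE_TYPE_LIMIT,
            EstimatedCardinality: 100,
        },
    },
}
The pruning, parallelism, and streaming configuration fields are filled in by the planner stages described next.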
Partition Pruning Strategy
Strategy 1: Index-Based Pruning
func (qp *QueryPlanner) PrunePartitionsWithIndex(filter *Filter) []string {
// Check cluster-level index
clusterIndex := qp.GetClusterIndex(filter.Property)
// Find clusters with matching values
candidateClusters := clusterIndex.FindClusters(filter.Value)
// Find proxies within candidate clusters
candidateProxies := []string{}
for _, clusterID := range candidateClusters {
proxyIndex := qp.GetProxyIndex(clusterID, filter.Property)
proxies := proxyIndex.FindProxies(filter.Value)
candidateProxies = append(candidateProxies, proxies...)
}
// Find partitions within candidate proxies
candidatePartitions := []string{}
for _, proxyID := range candidateProxies {
partitionIndex := qp.GetPartitionIndex(proxyID, filter.Property)
partitions := partitionIndex.FindPartitions(filter.Value)
candidatePartitions = append(candidatePartitions, partitions...)
}
return candidatePartitions
}
Example:
Query: g.V().has('city', 'San Francisco')
Without pruning:
- Query all 64,000 partitions (1000 proxies × 64 partitions)
With pruning:
- Cluster index: SF in clusters 0, 2 (2 of 10)
- Proxy index: SF in 50 proxies (of 200 in clusters 0, 2)
- Partition index: SF in 150 partitions (of 3,200 in those 50 proxies)
- Result: Query 150 partitions instead of 64,000
- Speedup: 106×
Strategy 2: Label-Based Pruning
func (qp *QueryPlanner) PrunePartitionsByLabel(label string) []string {
// Check if using label-based partitioning (RFC-057)
if qp.PartitionStrategy == STRATEGY_LABEL_BASED {
// All vertices with label are on specific cluster
clusterID := qp.LabelToClusterMapping[label]
return qp.GetAllPartitionsInCluster(clusterID)
}
// Otherwise, cannot prune (label distributed across all partitions)
return qp.GetAllPartitions()
}
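How the planner chooses between these strategies is not spelled out above; a minimal sketch, assuming the planner holds an indexManager like the QueryAnalyzer does, is to prefer an indexed property filter and fall back to label-based pruning.
// Sketch: pick the most selective pruning strategy available.
// Assumes qp.indexManager mirrors the QueryAnalyzer's index manager.
func (qp *QueryPlanner) PrunePartitions(label string, filters []*Filter) []string {
    // Indexed property filter: smallest candidate partition set.
    for _, f := range filters {
        if qp.indexManager.HasIndex(f.Property) {
            return qp.PrunePartitionsWithIndex(f)
        }
    }
    // Label-based partitioning confines a label to one cluster (RFC-057).
    if label != "" {
        return qp.PrunePartitionsByLabel(label)
    }
    // No prunable predicate: every partition must be queried.
    return qp.GetAllPartitions()
}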
Query Execution Stages
Stage 1: Initial Vertex Selection
Gremlin: g.V().hasLabel('User').has('city', 'SF')
Execution:
1. Use partition pruning to identify 150 partitions
2. Parallel execution on 150 partitions
3. Each partition:
a. Check if city_index exists
b. If yes: city_index['SF'] → vertex IDs
c. If no: Full scan with filter
4. Aggregate results
5. Stream to next stage (no materialization)
Stage 2: Edge Traversal
Gremlin: .out('FOLLOWS')
Execution:
1. Receive vertex IDs from stage 1 (streaming)
2. For each vertex batch (1000 vertices):
a. Group by target partition
b. Parallel fan-out to target partitions
c. Load edge index for each vertex
d. Return target vertex IDs
3. Stream results to next stage
Stage 3: Property Filtering
Gremlin: .has('age', gt(30))
Execution:
1. Receive vertex IDs from stage 2 (streaming)
2. For each vertex batch:
a. Load vertex properties from appropriate tier (hot/warm/cold)
b. Apply filter: age > 30
c. Return matching vertices
3. Stream results to next stage
Stage 4: Result Limiting
Gremlin: .limit(100)
Execution:
1. Receive vertices from stage 3 (streaming)
2. Keep first 100 vertices
3. Cancel upstream stages (early termination)
4. Return results to client
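The early-termination step above relies on cancelling upstream work once the limit is satisfied. A minimal sketch, assuming stages are wired together with channels and share a cancellable context (standard library context package), could look like this:
// Sketch: limit stage that cancels upstream stages once N results are taken.
func LimitStage(in <-chan *Vertex, limit int, cancel context.CancelFunc) <-chan *Vertex {
    out := make(chan *Vertex, limit)
    go func() {
        defer close(out)
        taken := 0
        for v := range in {
            out <- v
            taken++
            if taken >= limit {
                cancel() // upstream partition executors observe ctx.Done() and stop
                break
            }
        }
        // Drain in-flight vertices so upstream senders are not blocked forever.
        for range in {
        }
    }()
    return out
}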
Distributed Execution Engine
Query Coordinator
type QueryCoordinator struct {
queryAnalyzer *QueryAnalyzer
partitionManager *PartitionManager
indexManager *IndexManager
authzManager *AuthorizationManager // RFC-061
}
func (qc *QueryCoordinator) ExecuteGremlin(
ctx context.Context,
gremlinQuery string,
principal *Principal,
) (*ResultStream, error) {
// Step 1: Parse and analyze query
plan, err := qc.queryAnalyzer.Analyze(gremlinQuery)
if err != nil {
return nil, err
}
// Step 2: Apply authorization filters (RFC-061)
plan = qc.authzManager.ApplyAuthzFilters(plan, principal)
// Step 3: Create result stream
resultStream := NewResultStream()
// Step 4: Execute stages in pipeline
go func() {
defer resultStream.Close()
var currentResults chan *Vertex
for _, stage := range plan.Stages {
currentResults = qc.ExecuteStage(ctx, stage, currentResults)
}
// Drain final results to stream
for vertex := range currentResults {
resultStream.Send(vertex)
}
}()
return resultStream, nil
}
Partition Executor
type PartitionExecutor struct {
partition *Partition
}
func (pe *PartitionExecutor) ExecuteStep(
ctx context.Context,
step *GremlinStep,
inputVertices []*Vertex,
) ([]*Vertex, error) {
switch step.Type {
case STEP_HAS_LABEL:
return pe.FilterByLabel(inputVertices, step.Label)
case STEP_HAS_PROPERTY:
return pe.FilterByProperty(inputVertices, step.Property, step.Value)
case STEP_OUT:
return pe.TraverseOut(inputVertices, step.EdgeLabel)
case STEP_IN:
return pe.TraverseIn(inputVertices, step.EdgeLabel)
case STEP_BOTH:
return pe.TraverseBoth(inputVertices, step.EdgeLabel)
default:
return nil, fmt.Errorf("unsupported step: %v", step.Type)
}
}
func (pe *PartitionExecutor) FilterByProperty(
vertices []*Vertex,
property string,
value interface{},
) ([]*Vertex, error) {
// Check if index available
index := pe.partition.GetIndex(property)
if index != nil {
// Fast path: Use index
vertexIDs := index.Lookup(value)
return pe.LoadVertices(vertexIDs), nil
}
// Slow path: Filter in-memory
filtered := []*Vertex{}
for _, v := range vertices {
if v.Properties[property] == value {
filtered = append(filtered, v)
}
}
return filtered, nil
}
func (pe *PartitionExecutor) TraverseOut(
vertices []*Vertex,
edgeLabel string,
) ([]*Vertex, error) {
targets := []*Vertex{}
for _, v := range vertices {
// Get outgoing edges from index
edgeIndex := pe.partition.GetEdgeIndex(edgeLabel)
targetIDs := edgeIndex.GetOutgoing(v.ID)
// Load target vertices
targetVertices := pe.LoadVertices(targetIDs)
targets = append(targets, targetVertices...)
}
return targets, nil
}
Cross-Partition Coordinator
type CrossPartitionCoordinator struct {
proxies map[string]*ProxyClient
}
func (cpc *CrossPartitionCoordinator) ExecuteDistributed(
ctx context.Context,
stage *ExecutionStage,
inputVertices chan *Vertex,
) (chan *Vertex, error) {
outputVertices := make(chan *Vertex, 10000)
// Group input vertices by partition
verticesByPartition := make(map[string][]*Vertex)
go func() {
    for vertex := range inputVertices {
        partitionID := cpc.GetPartitionForVertex(vertex.ID)
        verticesByPartition[partitionID] = append(verticesByPartition[partitionID], vertex)
        // Flush when batch full
        if len(verticesByPartition[partitionID]) >= 1000 {
            cpc.ExecuteOnPartition(partitionID, stage, verticesByPartition[partitionID], outputVertices)
            verticesByPartition[partitionID] = []*Vertex{}
        }
    }
    // Flush remaining
    for partitionID, vertices := range verticesByPartition {
if len(vertices) > 0 {
cpc.ExecuteOnPartition(partitionID, stage, vertices, outputVertices)
}
}
close(outputVertices)
}()
return outputVertices, nil
}
func (cpc *CrossPartitionCoordinator) ExecuteOnPartition(
partitionID string,
stage *ExecutionStage,
vertices []*Vertex,
output chan *Vertex,
) {
// Get proxy client for partition
proxyID := cpc.GetProxyForPartition(partitionID)
client := cpc.proxies[proxyID]
// Execute stage on remote partition
results, err := client.ExecuteStep(context.Background(), &ExecuteStepRequest{
PartitionID: partitionID,
Step: stage.GremlinStep,
InputVertices: vertices,
})
if err != nil {
log.Errorf("Failed to execute on partition %s: %v", partitionID, err)
return
}
// Stream results to output channel
for _, vertex := range results.Vertices {
output <- vertex
}
}
Query Optimization
Cost-Based Optimizer
type CostModel struct {
stats *StatisticsManager
}
func (cm *CostModel) EstimateQueryCost(plan *QueryPlan) float64 {
totalCost := 0.0
for _, stage := range plan.Stages {
stageCost := cm.EstimateStageCost(stage)
totalCost += stageCost
}
return totalCost
}
func (cm *CostModel) EstimateStageCost(stage *ExecutionStage) float64 {
switch stage.Type {
case STAGE_TYPE_INDEX_SCAN:
// Index scan cost: O(log N) lookup + O(M) results
lookupCost := math.Log2(float64(cm.stats.TotalVertices)) // binary-search depth; matches the log2(100B) ≈ 36 example below
scanCost := float64(stage.EstimatedCardinality)
return lookupCost + scanCost
case STAGE_TYPE_FULL_SCAN:
// Full scan cost: O(N) where N = vertices in partitions
return float64(len(stage.TargetPartitions)) * cm.stats.AvgVerticesPerPartition
case STAGE_TYPE_TRAVERSAL:
// Traversal cost: O(M) edge lookups + O(P) cross-partition RPCs
edgeLookupCost := float64(stage.EstimatedCardinality)
rpcCost := float64(len(stage.TargetPartitions)) * 0.005 // 5ms per RPC
return edgeLookupCost + rpcCost
default:
return float64(stage.EstimatedCardinality)
}
}
Selectivity Estimation
func (cm *CostModel) EstimateSelectivity(filter *Filter) float64 {
stats := cm.stats.GetPropertyStats(filter.Property)
switch filter.Operator {
case OP_EQUALS:
// For equals, use histogram
return stats.Histogram.EstimateEqual(filter.Value)
case OP_GREATER_THAN:
// For range queries, use min/max
return stats.Histogram.EstimateGreaterThan(filter.Value)
case OP_IN:
// For IN queries, sum selectivities
selectivity := 0.0
for _, value := range filter.Values {
selectivity += stats.Histogram.EstimateEqual(value)
}
return selectivity
default:
// Default: Assume 10% selectivity
return 0.1
}
}
Example: Choose between index scan and full scan
Query: g.V().has('city', 'rare_city')
Statistics:
- Total vertices: 100B
- city='rare_city': 100 vertices (selectivity = 0.000001%)
Cost estimation:
Index scan cost:
- Lookup: log(100B) = 36 operations
- Results: 100 vertices
- Total: 136 operations
Full scan cost:
- Scan: 100B vertices
- Total: 100B operations
Decision: Use index scan (136 vs 100B = 735,000,000× faster)
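This decision is just a comparison of the two estimated stage costs; a small sketch using the CostModel above (the ChooseAccessPath helper name is illustrative):
// Sketch: choose the cheaper physical strategy for the same logical step.
func (cm *CostModel) ChooseAccessPath(indexScan, fullScan *ExecutionStage) *ExecutionStage {
    indexCost := cm.EstimateStageCost(indexScan) // ~136 for city='rare_city'
    fullCost := cm.EstimateStageCost(fullScan)   // ~100B for an unindexed scan
    if indexCost <= fullCost {
        return indexScan
    }
    return fullScan
}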
Filter Ordering Optimization
func (qo *QueryOptimizer) ReorderFilters(filters []*Filter) []*Filter {
// Calculate selectivity for each filter
selectivities := make(map[*Filter]float64)
for _, filter := range filters {
selectivities[filter] = qo.EstimateSelectivity(filter)
}
// Sort by selectivity (most selective first)
sort.Slice(filters, func(i, j int) bool {
return selectivities[filters[i]] < selectivities[filters[j]]
})
// Prefer indexed filters
sort.SliceStable(filters, func(i, j int) bool {
indexI := qo.indexManager.HasIndex(filters[i].Property)
indexJ := qo.indexManager.HasIndex(filters[j].Property)
return indexI && !indexJ
})
return filters
}
Example:
// Original query
g.V().has('country', 'USA').has('city', 'SF').has('age', gt(30))
// Statistics
country='USA': 30B vertices (30% selectivity)
city='SF': 5M vertices (0.005% selectivity)
age>30: 50B vertices (50% selectivity)
// Optimized order (most selective first)
g.V().has('city', 'SF')        // most selective (0.005%)
     .has('country', 'USA')    // medium (30%)
     .has('age', gt(30))       // least selective (50%)
Result: The city filter removes 99.995% of vertices up front, so the 30B 'country' matches and 50B 'age' matches are never scanned
Adaptive Parallelism
Dynamic Parallelism Adjustment
type AdaptiveExecutor struct {
maxParallelism int
}
func (ae *AdaptiveExecutor) ExecuteWithAdaptiveParallelism(
stage *ExecutionStage,
inputVertices chan *Vertex,
) (chan *Vertex, error) {
// Estimate input size
estimatedInputSize := stage.EstimatedCardinality
// Adjust parallelism based on input size
parallelism := ae.CalculateOptimalParallelism(estimatedInputSize)
// Execute with calculated parallelism
return ae.ExecuteParallel(stage, inputVertices, parallelism)
}
func (ae *AdaptiveExecutor) CalculateOptimalParallelism(inputSize int64) int {
switch {
case inputSize < 1000:
// Small input: Sequential execution
return 1
case inputSize < 100000:
// Medium input: Moderate parallelism
return 10
case inputSize < 10000000:
// Large input: High parallelism
return 100
default:
// Very large input: Maximum parallelism
return ae.maxParallelism // 1000
}
}
Example:
Query: g.V('user:alice').out('FOLLOWS').out('FOLLOWS')
Stage 1: V('user:alice')
Input: 1 vertex
Parallelism: 1 (sequential)
Output: 1 vertex
Stage 2: out('FOLLOWS')
Input: 1 vertex
Output: 200 friends
Parallelism: 1 (small output)
Stage 3: out('FOLLOWS')
Input: 200 vertices
Output: 40,000 friends-of-friends
Parallelism: 100 (large output, parallel execution)
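ExecuteParallel is referenced above but not defined; a minimal worker-pool sketch (the per-vertex helper ExecuteStepOnVertex and the buffer size are assumptions) could look like this:
// Sketch: run a stage with a fixed number of workers reading from the
// shared input channel; output closes when every worker has drained its input.
func (ae *AdaptiveExecutor) ExecuteParallel(
    stage *ExecutionStage,
    in <-chan *Vertex,
    parallelism int,
) (chan *Vertex, error) {
    out := make(chan *Vertex, 1024)
    var wg sync.WaitGroup
    for w := 0; w < parallelism; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range in {
                // Hypothetical helper: apply the stage's Gremlin step to one vertex.
                for _, result := range ae.ExecuteStepOnVertex(stage, v) {
                    out <- result
                }
            }
        }()
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out, nil
}
Because all workers share one input channel, skew is absorbed automatically: a worker that hits a high-degree vertex simply takes longer while the others keep pulling work.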
Result Streaming
Streaming Protocol
message StreamingQueryRequest {
string query_id = 1;
string gremlin_query = 2;
StreamingConfig config = 3;
}
message StreamingConfig {
int32 batch_size = 1; // Vertices per stream message
int32 max_results = 2; // Early termination
bool include_properties = 3; // Include vertex properties or just IDs
int32 timeout_seconds = 4; // Query timeout
}
message StreamingQueryResponse {
string query_id = 1;
int32 batch_number = 2;
repeated Vertex vertices = 3;
bool has_more = 4;
QueryStats stats = 5;
}
message QueryStats {
int64 vertices_scanned = 1;
int64 partitions_queried = 2;
int64 total_duration_ms = 3;
}
Client-Side Streaming
// Client code
client := NewGraphClient("localhost:50051")
stream, err := client.ExecuteGremlinStream(ctx, &StreamingQueryRequest{
GremlinQuery: "g.V().hasLabel('User').has('city', 'SF')",
Config: &StreamingConfig{
BatchSize: 1000,
MaxResults: 10000,
IncludeProperties: true,
},
})
for {
response, err := stream.Recv()
if err == io.EOF {
break // End of stream
}
// Process batch of vertices
for _, vertex := range response.Vertices {
fmt.Printf("Vertex: %s\n", vertex.ID)
}
if !response.HasMore {
break
}
}
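On the server side, a corresponding handler batches vertices from the coordinator's ResultStream into StreamingQueryResponse messages. The sketch below assumes a gRPC service generated from the messages above; the stream interface name, the Vertices() channel accessor on ResultStream, and principalFrom are assumptions, not a generated API.
// Sketch: server-side streaming handler.
func (s *QueryServer) ExecuteGremlinStream(
    req *StreamingQueryRequest,
    stream GraphService_ExecuteGremlinStreamServer, // hypothetical generated interface
) error {
    results, err := s.coordinator.ExecuteGremlin(stream.Context(), req.GremlinQuery, s.principalFrom(stream.Context()))
    if err != nil {
        return err
    }
    batch := make([]*Vertex, 0, req.Config.BatchSize)
    batchNum := int32(0)
    for v := range results.Vertices() { // assumed channel accessor on ResultStream
        batch = append(batch, v)
        if int32(len(batch)) >= req.Config.BatchSize {
            if err := stream.Send(&StreamingQueryResponse{
                QueryId: req.QueryId, BatchNumber: batchNum, Vertices: batch, HasMore: true,
            }); err != nil {
                return err
            }
            batchNum++
            batch = make([]*Vertex, 0, req.Config.BatchSize)
        }
    }
    // Final (possibly short) batch signals end of stream.
    return stream.Send(&StreamingQueryResponse{
        QueryId: req.QueryId, BatchNumber: batchNum, Vertices: batch, HasMore: false,
    })
}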
Authorization Integration (RFC-061)
Query-Time Authorization
func (qc *QueryCoordinator) ApplyAuthorizationFilters(
plan *QueryPlan,
principal *Principal,
) *QueryPlan {
// Get authorized vertex labels for principal
authorizedLabels := qc.authzManager.GetAuthorizedLabels(principal)
// Add label filter to first stage
labelFilter := &Filter{
Property: "label",
Operator: OP_IN,
Values: authorizedLabels,
}
// Inject filter at start of query plan
plan.Stages[0].AdditionalFilters = append(
plan.Stages[0].AdditionalFilters,
labelFilter,
)
return plan
}
Example:
Principal: user@example.com
Authorized labels: ['User', 'Post'] (cannot see 'AdminLog')
Query: g.V().has('created_at', gt('2025-01-01'))
Authorization filter injected:
g.V().hasLabel(within('User', 'Post')).has('created_at', gt('2025-01-01'))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Injected by authz layer
Result: User only sees User and Post vertices, not AdminLog vertices
Query Resource Limits and Runaway Prevention
Problem: At 100B scale with 1000 nodes, a single runaway query can exhaust cluster resources and impact all users (MEMO-050 Finding 4).
The Runaway Query Problem
Real-World Scenario: Celebrity with 100M followers
// Innocent-looking query
g.V().has('username', '@taylorswift').out('FOLLOWS')
// What actually happens at execution:
Step 1: Find vertex '@taylorswift' → 1 vertex (fast: 50 μs)
Step 2: Traverse out('FOLLOWS') edges → 100M vertices ❌
Memory impact:
100M vertices × 100 bytes = 10 GB per query
Parallel queries: 1000 concurrent = 10 TB memory consumed
Result: Out-of-memory crash across cluster
Without Limits: A single unbounded traversal can:
- Consume 10+ TB of memory across 1000 nodes
- Trigger cascading OOM failures
- Impact all concurrent queries
- Require cluster restart (10-30 min recovery)
Multi-Layer Resource Limits
Layer 1: Query Configuration Limits
Pre-execution validation before query starts:
query_configuration_limits:
# Timeout limits
max_query_timeout: 300s # 5 minutes maximum
default_query_timeout: 30s
min_query_timeout: 1s
# Memory limits (per query)
max_memory_per_query: 16 GB # Hard limit
memory_warning_threshold: 8 GB # Soft limit (log warning)
memory_oom_kill_threshold: 15 GB # Kill before hitting 16 GB hard limit
# Fan-out limits (breadth)
max_vertices_per_hop: 10M # Maximum vertices in intermediate result
max_total_vertices: 100M # Total vertices touched across all hops
max_edges_per_vertex: 1M # Maximum out-degree for single vertex
# Depth limits (traversal hops)
max_traversal_depth: 10 # Maximum hops in graph traversal
max_loop_iterations: 1000 # Maximum iterations in repeat() step
# Result limits
max_result_size: 1M # Maximum vertices/edges in final result
max_result_memory: 1 GB # Maximum memory for result set
# Concurrent query limits (per user)
max_concurrent_queries_per_user: 10
max_concurrent_queries_cluster: 10000 # Total across all users
Enforcement:
type QueryLimits struct {
Timeout time.Duration
MaxMemory int64
MaxVerticesPerHop int64
MaxDepth int
}
func (qp *QueryPlanner) ValidateQueryLimits(query *GremlinQuery) error {
// Estimate query complexity before execution
estimate := qp.EstimateComplexity(query)
if estimate.EstimatedVertices > qp.limits.MaxVerticesPerHop {
return QueryTooComplexError{
Message: "Query exceeds max vertices per hop",
Estimated: estimate.EstimatedVertices,
Limit: qp.limits.MaxVerticesPerHop,
Suggestion: "Add filters or use .limit() step",
}
}
if estimate.EstimatedDepth > qp.limits.MaxDepth {
return QueryTooDeepError{
Message: "Query exceeds max traversal depth",
Estimated: estimate.EstimatedDepth,
Limit: qp.limits.MaxDepth,
}
}
return nil
}
Layer 2: Pre-Execution Complexity Analysis
Estimate query cost before running:
type QueryComplexityEstimate struct {
EstimatedVertices int64
EstimatedEdges int64
EstimatedDepth int
EstimatedMemory int64
EstimatedLatency time.Duration
PartitionsPruned int
PartitionsRequired int
}
func (qp *QueryPlanner) EstimateComplexity(query *GremlinQuery) *QueryComplexityEstimate {
estimate := &QueryComplexityEstimate{}
// Parse query steps
for _, step := range query.Steps {
switch step.Type {
case "V":
// Starting vertex estimation
if step.HasFilter {
estimate.EstimatedVertices = qp.EstimateFilteredVertices(step.Filter)
} else {
estimate.EstimatedVertices = qp.TotalVertexCount // 100B (expensive!)
}
case "out", "in", "both":
// Edge traversal: Multiply by average out-degree
avgOutDegree := qp.GetAverageOutDegree(step.EdgeLabel)
estimate.EstimatedVertices *= avgOutDegree
estimate.EstimatedDepth++
case "limit":
// Apply limit to intermediate results
estimate.EstimatedVertices = min(estimate.EstimatedVertices, step.LimitValue)
case "has":
// Apply selectivity reduction
selectivity := qp.GetIndexSelectivity(step.Property, step.Value)
estimate.EstimatedVertices = int64(float64(estimate.EstimatedVertices) * selectivity)
}
}
// Estimate memory: vertices × 100 bytes per vertex
estimate.EstimatedMemory = estimate.EstimatedVertices * 100
// Estimate partitions required
estimate.PartitionsRequired = qp.EstimatePartitions(estimate.EstimatedVertices)
return estimate
}
Example Complexity Analysis:
g.V().has('username', '@taylorswift').out('FOLLOWS')
Complexity Analysis:
Step 1: V().has('username', '@taylorswift')
Estimated vertices: 1 (indexed property, O(1) lookup)
Partitions required: 1
Step 2: out('FOLLOWS')
Estimated vertices: 100M (out-degree of @taylorswift)
Estimated memory: 10 GB
Partitions required: 800 (100M vertices across cluster)
Total Complexity:
Vertices: 100M
Memory: 10 GB
Partitions: 800
Latency estimate: 45 seconds
❌ REJECTED: Exceeds max_vertices_per_hop limit (10M)
Suggested rewrite:
g.V().has('username', '@taylorswift').out('FOLLOWS').limit(10000)
Layer 3: Runtime Enforcement with Memory Tracking
Monitor query execution in real-time:
type QueryExecutionMonitor struct {
QueryID string
StartTime time.Time
MemoryUsed int64
VerticesProcessed int64
Depth int
Status QueryStatus
}
func (qe *QueryExecutor) ExecuteWithMonitoring(query *GremlinQuery) (*QueryResult, error) {
monitor := &QueryExecutionMonitor{
QueryID: uuid.New().String(),
StartTime: time.Now(),
}
// Launch goroutine to monitor query progress
ctx, cancel := context.WithTimeout(context.Background(), query.Timeout)
defer cancel()
go qe.MonitorQuery(ctx, monitor)
// Execute query with context
result, err := qe.ExecuteQuery(ctx, query, monitor)
return result, err
}
func (qe *QueryExecutor) MonitorQuery(ctx context.Context, monitor *QueryExecutionMonitor) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Check memory usage
if monitor.MemoryUsed > qe.limits.MaxMemory {
qe.KillQuery(monitor.QueryID, "Memory limit exceeded")
return
}
// Check timeout
if time.Since(monitor.StartTime) > qe.limits.Timeout {
qe.KillQuery(monitor.QueryID, "Timeout exceeded")
return
}
// Check vertex count
if monitor.VerticesProcessed > qe.limits.MaxTotalVertices {
qe.KillQuery(monitor.QueryID, "Vertex limit exceeded")
return
}
}
}
}
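The watchdog only works if the counters it reads are kept current. A minimal sketch of per-batch accounting, assuming roughly 100 bytes of working memory per in-flight vertex (the same figure the complexity analyzer uses); updates go through sync/atomic because the monitor is read from a separate goroutine, and MonitorQuery should read with atomic.LoadInt64 for the same reason.
// Sketch: per-batch bookkeeping so MonitorQuery sees live numbers.
const approxBytesPerVertex = 100 // assumption, matches the complexity analyzer

func (qe *QueryExecutor) recordBatch(m *QueryExecutionMonitor, batch []*Vertex) {
    atomic.AddInt64(&m.VerticesProcessed, int64(len(batch)))
    atomic.AddInt64(&m.MemoryUsed, int64(len(batch)*approxBytesPerVertex))
}

// Called once a batch has been streamed to the client and released.
func (qe *QueryExecutor) releaseBatch(m *QueryExecutionMonitor, batch []*Vertex) {
    atomic.AddInt64(&m.MemoryUsed, -int64(len(batch)*approxBytesPerVertex))
}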
Layer 4: Circuit Breaker for Cascading Failures
Prevent cluster-wide impact:
type CircuitBreaker struct {
MaxFailures int
TimeWindow time.Duration
OpenDuration time.Duration
State CircuitState
FailureCount int
LastFailureTime time.Time
}
func (cb *CircuitBreaker) ExecuteQuery(query *GremlinQuery) (*QueryResult, error) {
// Check circuit state
if cb.State == CircuitOpen {
if time.Since(cb.LastFailureTime) < cb.OpenDuration {
return nil, CircuitOpenError{
Message: "Circuit breaker open, query rejected",
OpenUntil: cb.LastFailureTime.Add(cb.OpenDuration),
}
}
// Try half-open state
cb.State = CircuitHalfOpen
}
// Execute query
result, err := cb.executeQuery(query)
// Update circuit state based on result
if err != nil {
cb.RecordFailure()
if cb.FailureCount >= cb.MaxFailures {
cb.Open()
}
return nil, err
}
cb.RecordSuccess()
return result, nil
}
func (cb *CircuitBreaker) Open() {
cb.State = CircuitOpen
cb.LastFailureTime = time.Now()
// Alert ops team
log.Error("Circuit breaker opened due to high failure rate")
}
Circuit Breaker Configuration:
circuit_breaker:
max_failures: 10 # Open circuit after 10 consecutive failures
time_window: 60s # Within 60-second window
open_duration: 30s # Stay open for 30 seconds before trying half-open
half_open_requests: 5 # Test with 5 requests in half-open state
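RecordFailure and RecordSuccess are referenced in ExecuteQuery above but not shown; a minimal sketch of the state transitions (half-open probing simplified to a single request rather than the 5 configured above):
// Sketch: circuit breaker state transitions.
func (cb *CircuitBreaker) RecordFailure() {
    cb.FailureCount++
    cb.LastFailureTime = time.Now()
    if cb.State == CircuitHalfOpen {
        cb.Open() // probe failed: re-open immediately
    }
}

func (cb *CircuitBreaker) RecordSuccess() {
    if cb.State == CircuitHalfOpen {
        cb.State = CircuitClosed // probe succeeded: resume normal admission
    }
    cb.FailureCount = 0
}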
Layer 5: Query Admission Control
Priority-based queuing to prevent resource exhaustion:
type QueryAdmissionController struct {
MaxConcurrentQueries int
PriorityQueues map[QueryPriority]*QueryQueue
RunningQueries map[string]*QueryExecution
}
type QueryPriority int
const (
PriorityLow QueryPriority = iota
PriorityMedium
PriorityHigh
PriorityCritical // Admin queries, health checks
)
func (qac *QueryAdmissionController) AdmitQuery(query *GremlinQuery) error {
// Check if cluster at capacity
if len(qac.RunningQueries) >= qac.MaxConcurrentQueries {
// Queue low/medium priority queries
if query.Priority <= PriorityMedium {
return qac.EnqueueQuery(query)
}
// High priority: Preempt low priority query
if query.Priority == PriorityHigh {
if lowPriorityQuery := qac.FindLowestPriorityQuery(); lowPriorityQuery != nil {
qac.PreemptQuery(lowPriorityQuery.ID)
qac.RunQuery(query)
return nil
}
}
// Critical: Always admit (ops/health checks)
if query.Priority == PriorityCritical {
qac.RunQuery(query)
return nil
}
return QueryRejectedError{Message: "Cluster at capacity"}
}
// Capacity available: Admit query
qac.RunQuery(query)
return nil
}
Operational Metrics and Alerts
Prometheus metrics for query resource usage:
# Query execution metrics
prism_query_duration_seconds{priority="high",status="success"} 2.5
prism_query_duration_seconds{priority="low",status="killed"} 300.0
# Resource usage
prism_query_memory_bytes{query_id="abc123"} 8589934592 # 8 GB
prism_query_vertices_processed{query_id="abc123"} 5000000
# Limit violations
prism_query_limit_violations_total{type="memory"} 15
prism_query_limit_violations_total{type="timeout"} 42
prism_query_limit_violations_total{type="vertex_count"} 8
# Circuit breaker state
prism_circuit_breaker_state{cluster="0"} 1 # 0=closed, 1=open, 2=half-open
prism_circuit_breaker_failures_total{cluster="0"} 10
# Admission control
prism_queries_queued_total{priority="low"} 150
prism_queries_running_total{priority="high"} 25
prism_queries_rejected_total{reason="capacity"} 5
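A sketch of how these metrics could be recorded with the Prometheus Go client (github.com/prometheus/client_golang); metric names match the list above, bucket boundaries are an assumption:
// Sketch: register and record query metrics.
var (
    queryDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "prism_query_duration_seconds",
            Help:    "End-to-end Gremlin query latency.",
            Buckets: prometheus.ExponentialBuckets(0.001, 2, 18), // 1 ms .. ~2 min
        },
        []string{"priority", "status"},
    )
    limitViolations = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "prism_query_limit_violations_total",
            Help: "Queries killed or rejected for exceeding a resource limit.",
        },
        []string{"type"},
    )
)

func init() {
    prometheus.MustRegister(queryDuration, limitViolations)
}

// Called by the coordinator when a query finishes (or is killed).
func observeQuery(priority, status string, d time.Duration) {
    queryDuration.WithLabelValues(priority, status).Observe(d.Seconds())
}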
Alerting Rules:
alerts:
- name: HighQueryFailureRate
condition: rate(prism_query_limit_violations_total[5m]) > 10
severity: warning
message: "High query failure rate due to limit violations"
- name: CircuitBreakerOpen
condition: prism_circuit_breaker_state == 1
severity: critical
message: "Circuit breaker open - cluster under stress"
- name: QueryQueueBacklog
condition: prism_queries_queued_total{priority="high"} > 100
severity: warning
message: "High-priority queries backing up in queue"
Graceful Degradation Strategies
When cluster reaches capacity:
- Reject Non-Essential Queries:
  - Analytics queries (priority: low)
  - Batch exports (priority: low)
  - Dashboard refreshes (priority: medium)
- Rate-Limit Per User (see the sketch after this list):
  user_rate_limits:
    free_tier: 10 queries/minute
    paid_tier: 1000 queries/minute
    enterprise: unlimited
- Sample Large Results:
  // Original query
  g.V().has('type', 'User').out('FRIENDS')
  // Degraded mode: Sample 10% of results
  g.V().has('type', 'User').sample(0.1).out('FRIENDS')
- Return Partial Results:
  Query result: 50,000 vertices (partial)
  Warning: Result truncated due to cluster capacity
  Suggestion: Refine query with additional filters
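A minimal sketch of the per-user rate limiting referenced in the list above, using golang.org/x/time/rate; tier numbers mirror the config, and the principal/tier lookup is assumed to come from the authorization layer:
// Sketch: per-user token buckets keyed by principal.
type UserRateLimiter struct {
    mu       sync.Mutex
    limiters map[string]*rate.Limiter
    perMin   map[string]float64 // tier -> queries per minute
}

func NewUserRateLimiter() *UserRateLimiter {
    return &UserRateLimiter{
        limiters: make(map[string]*rate.Limiter),
        perMin:   map[string]float64{"free_tier": 10, "paid_tier": 1000},
    }
}

// Allow reports whether this user may run another query right now.
func (u *UserRateLimiter) Allow(user, tier string) bool {
    if tier == "enterprise" {
        return true // unlimited, per the degraded-mode config
    }
    u.mu.Lock()
    lim, ok := u.limiters[user]
    if !ok {
        qpm := u.perMin[tier]
        lim = rate.NewLimiter(rate.Limit(qpm/60.0), int(qpm)) // burst = one minute's budget
        u.limiters[user] = lim
    }
    u.mu.Unlock()
    return lim.Allow()
}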
Example: Runaway Query Scenario
Without Limits:
g.V().out().out().out() // 3-hop traversal without filters
Step 1: V() → 100B vertices (all vertices)
Step 2: out() → 1T vertices (100B × 10 avg out-degree)
Step 3: out() → 10T vertices (1T × 10)
Step 4: out() → 100T vertices (10T × 10)
Memory: 100T vertices × 100 bytes = 10 PB ❌
Result: Cluster crash, 30-min recovery
With Limits:
Pre-execution analysis:
Estimated vertices: 100T
Estimated memory: 10 PB
❌ REJECTED: Exceeds max_total_vertices limit (100M)
Error returned to client:
QueryTooComplexError: Query exceeds vertex limit
Estimated: 100T vertices
Limit: 100M vertices
Suggestion: Add filters or use .limit() at each hop
Example rewrite:
g.V().limit(1000).out().limit(10000).out().limit(100000)
Summary
Query resource limits are mandatory at 100B scale to prevent:
- Runaway queries: Single query consuming 10+ GB memory
- Cascading failures: OOM crashes spreading across cluster
- Noisy neighbor: One user impacting all concurrent queries
Multi-layer protection:
- Configuration limits: Pre-execution validation (timeout, memory, fan-out, depth)
- Complexity analysis: Estimate cost before running (vertices, memory, latency)
- Runtime enforcement: Monitor and kill queries exceeding limits
- Circuit breaker: Prevent cluster-wide cascading failures
- Admission control: Priority-based queuing and preemption
Key metrics:
- Max memory per query: 16 GB (hard limit)
- Max vertices per hop: 10M (intermediate results)
- Max traversal depth: 10 hops
- Max result size: 1M vertices
- Circuit breaker: Open after 10 failures in 60s window
Impact: Protects 1000-node cluster from single query consuming all resources, enables safe multi-tenant operation at massive scale.
Super-Node Handling and Sampling Strategies
Problem: At 100B scale, a small fraction of vertices have extremely high degree (100M+ neighbors), causing queries to return massive result sets that exceed memory limits (MEMO-050 Finding 2).
The Celebrity Problem
Real-World Scenario: Social graph with celebrity vertices
// Innocent-looking query
g.V().has('username', '@taylorswift').out('FOLLOWS')
// What actually happens:
Step 1: Find vertex '@taylorswift' → 1 vertex (fast: 50 μs)
Step 2: Traverse out('FOLLOWS') edges → 100M vertices ❌
Memory impact:
100M vertices × 64 bytes (ID + metadata) = 6.4 GB per query
10 concurrent queries: 64 GB memory consumed
Result: Out-of-memory crash
Without Sampling: Single celebrity query consumes entire proxy memory, impacts all concurrent queries, triggers cascading failures.
Vertex Degree Classification
Power-Law Distribution: At 100B scale, vertex degrees follow power-law distribution (scale-free network):
Degree Distribution:
Normal vertices (<1k neighbors): 99.9% of vertices
Hub vertices (1k-100k neighbors): 0.09% of vertices
Super-nodes (100k-10M neighbors): 0.009% of vertices
Mega-nodes (>10M neighbors): 0.001% of vertices
Example counts at 100B scale:
Normal: 99.9B vertices
Hubs: 90M vertices
Super: 9M vertices
Mega: 1M vertices
Classification Thresholds:
vertex_degree_classification:
normal:
max_degree: 1000
strategy: full_traversal # Return all neighbors
memory_per_query: <100 KB
hub:
min_degree: 1001
max_degree: 100000
strategy: paginated_traversal # Return in batches
memory_per_query: <10 MB
super:
min_degree: 100001
max_degree: 10000000
strategy: sampling_required # Must use sampling
memory_per_query: <100 MB (sampled)
mega:
min_degree: 10000001
strategy: sampling_mandatory # No full traversal allowed
memory_per_query: <1 GB (sampled)
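These thresholds map directly onto a classification helper that the traversal layer consults before expanding a vertex; a minimal sketch (type and constant names are illustrative):
// Sketch: classify a vertex by degree using the thresholds above.
type DegreeClass int

const (
    DegreeNormal DegreeClass = iota // full traversal
    DegreeHub                       // paginated traversal
    DegreeSuper                     // sampling required
    DegreeMega                      // sampling mandatory
)

func ClassifyDegree(degree int64) DegreeClass {
    switch {
    case degree <= 1_000:
        return DegreeNormal
    case degree <= 100_000:
        return DegreeHub
    case degree <= 10_000_000:
        return DegreeSuper
    default:
        return DegreeMega
    }
}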
Sampling Strategies
Strategy 1: Random Sampling
Approach: Randomly select N neighbors from total degree
type RandomSampler struct {
SampleSize int
}
func (rs *RandomSampler) SampleNeighbors(vertexID string, degree int64) ([]string, error) {
if degree <= rs.SampleSize {
// Vertex not a super-node, return all
return rs.GetAllNeighbors(vertexID), nil
}
// Random sampling
samples := make([]string, 0, rs.SampleSize)
indices := rs.GenerateRandomIndices(degree, rs.SampleSize)
for _, idx := range indices {
neighborID := rs.GetNeighborAtIndex(vertexID, idx)
samples = append(samples, neighborID)
}
return samples, nil
}
func (rs *RandomSampler) GenerateRandomIndices(total int64, count int) []int64 {
// Reservoir sampling
indices := make([]int64, count)
for i := 0; i < count; i++ {
indices[i] = int64(i)
}
for i := count; i < int(total); i++ {
j := rand.Int63n(int64(i + 1))
if j < int64(count) {
indices[j] = int64(i)
}
}
return indices
}
Performance:
- Time complexity: O(N) for N samples
- Memory: O(N) for sample storage
- Bias: Uniform random (no bias)
Strategy 2: Top-K by Property
Approach: Return top K neighbors by property value (e.g., most recent followers, highest engagement)
type TopKSampler struct {
SampleSize int
OrderBy string // Property to sort by
Descending bool
}
func (tks *TopKSampler) SampleNeighbors(vertexID string) ([]string, error) {
// Use property index for efficient top-K
index := tks.GetPropertyIndex(vertexID, tks.OrderBy)
if tks.Descending {
return index.GetTopK(tks.SampleSize), nil
} else {
return index.GetBottomK(tks.SampleSize), nil
}
}
Use Cases:
- Most recent followers: .out('FOLLOWS').sample(1000).orderBy('joined_at', desc)
- Highest engagement: .out('FRIENDS').sample(1000).orderBy('messages_count', desc)
Performance:
- Time complexity: O(log N) with indexed property
- Memory: O(K) for top-K storage
- Bias: Intentional (returns "most important" neighbors)
Strategy 3: HyperLogLog Cardinality Estimation
Approach: Estimate neighbor count without materializing full set
type HyperLogLogSampler struct {
Precision int // Typical: 14 (error rate: 0.8%)
}
func (hll *HyperLogLogSampler) EstimateDegree(vertexID string) (int64, error) {
// Initialize HyperLogLog counter
counter := hyperloglog.New(hll.Precision)
// Stream neighbors and add to counter
neighborStream := hll.StreamNeighbors(vertexID)
for neighborID := range neighborStream {
counter.Add([]byte(neighborID))
}
// Estimate cardinality
estimate := counter.Count()
return int64(estimate), nil
}
Use Cases:
- Degree estimation without full scan: g.V('@taylorswift').out('FOLLOWS').count().approximate()
- Cardinality for query planning: "How many neighbors does this vertex have?"
Performance:
- Space complexity: O(1) - fixed memory (16 KB for precision 14)
- Time complexity: O(N) - single pass through neighbors
- Error rate: 0.8% for precision 14
Gremlin Extensions for Sampling
Extension 1: .sample(N)
Syntax: Sample N vertices from current traversal
// Sample 1000 followers
g.V().has('username', '@taylorswift').out('FOLLOWS').sample(1000)
// Sample 10% of followers
g.V().has('username', '@taylorswift').out('FOLLOWS').sample(0.1)
Implementation:
func (qe *QueryExecutor) ExecuteSampleStep(
inputVertices []*Vertex,
sampleSize interface{}, // int or float64
) ([]*Vertex, error) {
switch s := sampleSize.(type) {
case int:
// Absolute count
return qe.SampleFixed(inputVertices, s), nil
case float64:
// Percentage
count := int(float64(len(inputVertices)) * s)
return qe.SampleFixed(inputVertices, count), nil
default:
return nil, fmt.Errorf("invalid sample size: %v", sampleSize)
}
}
func (qe *QueryExecutor) SampleFixed(vertices []*Vertex, n int) []*Vertex {
if len(vertices) <= n {
return vertices // Already smaller than sample
}
// Reservoir sampling
samples := make([]*Vertex, n)
copy(samples, vertices[:n])
for i := n; i < len(vertices); i++ {
j := rand.Intn(i + 1)
if j < n {
samples[j] = vertices[i]
}
}
return samples
}
Extension 2: .approximate()
Syntax: Use approximate algorithms for aggregations
// Approximate count (HyperLogLog)
g.V().has('username', '@taylorswift').out('FOLLOWS').count().approximate()
// Approximate distinct (HyperLogLog)
g.V().has('type', 'User').values('city').dedup().count().approximate()
Implementation:
func (qe *QueryExecutor) ExecuteApproximateCount(
inputVertices []*Vertex,
) (int64, error) {
counter := hyperloglog.New14() // Precision 14
for _, vertex := range inputVertices {
counter.Add([]byte(vertex.ID))
}
return int64(counter.Count()), nil
}
Circuit Breaker for Super-Node Queries
Automatic Detection: Detect super-node queries during execution and apply sampling automatically
type SuperNodeCircuitBreaker struct {
DegreeThreshold int64 // 100k neighbors = super-node
AutoSample bool // Automatically apply sampling
DefaultSampleSize int // 10k samples
}
func (scb *SuperNodeCircuitBreaker) ExecuteTraversal(
vertex *Vertex,
edgeLabel string,
) ([]*Vertex, error) {
// Check degree before traversal
degree := scb.GetDegree(vertex.ID, edgeLabel)
if degree > scb.DegreeThreshold {
// Super-node detected
if !scb.AutoSample {
return nil, SuperNodeError{
VertexID: vertex.ID,
Degree: degree,
Message: "Vertex exceeds degree threshold, use .sample() or .approximate()",
}
}
// Automatic sampling
log.Warnf("Super-node detected: %s (degree: %d), applying automatic sampling to %d",
vertex.ID, degree, scb.DefaultSampleSize)
return scb.SampleNeighbors(vertex.ID, scb.DefaultSampleSize), nil
}
// Normal vertex, full traversal
return scb.GetAllNeighbors(vertex.ID), nil
}
Configuration:
super_node_circuit_breaker:
enabled: true
degree_threshold: 100000 # Vertices with >100k neighbors
auto_sample: true # Automatically apply sampling
default_sample_size: 10000 # Sample 10k neighbors
rejection_mode: false # If true, reject queries instead of sampling
Performance Trade-Offs
Comparison of Sampling Strategies:
| Strategy | Memory | Time | Bias | Use Case |
|---|---|---|---|---|
| Full Traversal | O(N) | O(N) | None | Normal vertices (N < 1k) |
| Random Sampling | O(K) | O(K) | None | Unbiased sample, general purpose |
| Top-K by Property | O(K) | O(log N)* | Intentional | "Most important" neighbors |
| HyperLogLog | O(1) | O(N) | Count only | Cardinality estimation |
| Reservoir Sampling | O(K) | O(N) | None | Stream processing |
*Assumes indexed property
Example Performance: Celebrity Query
Query: g.V('@taylorswift').out('FOLLOWS')
Without sampling:
Degree: 100M followers
Memory: 6.4 GB
Time: 45 seconds
Result: OOM crash
With random sampling (10k):
Degree: 100M followers (sampled to 10k)
Memory: 640 KB
Time: 100 ms
Result: 10k sampled followers
With HyperLogLog (count only):
Degree: 100M followers
Memory: 16 KB
Time: 2 seconds
Result: Estimate 100M ± 0.8%
Trade-Off Decision Matrix:
| Query Goal | Strategy | Justification |
|---|---|---|
| Exact count | Full traversal | Must count all neighbors |
| Approximate count | HyperLogLog | 0.8% error acceptable, 99.97% memory savings |
| Representative sample | Random sampling | Unbiased, general purpose |
| Most active users | Top-K by activity | Intentional bias for engagement analysis |
| Recent interactions | Top-K by timestamp | Intentional bias for recency |
Client-Side API
Python Client:
from prism_client import GraphClient
client = GraphClient('localhost:50051')
# Sample 1000 followers
followers = client.gremlin(
"g.V().has('username', '@taylorswift').out('FOLLOWS').sample(1000)"
)
# Approximate count
count = client.gremlin(
"g.V().has('username', '@taylorswift').out('FOLLOWS').count().approximate()"
)
# Top 100 by engagement
top_followers = client.gremlin(
"""g.V().has('username', '@taylorswift')
.out('FOLLOWS')
.sample(100)
.orderBy('engagement_score', desc)"""
)
Go Client:
client := prism.NewGraphClient("localhost:50051")
// Sample with options
result, err := client.Gremlin(ctx, &prism.GremlinRequest{
Query: "g.V().has('username', '@taylorswift').out('FOLLOWS')",
SamplingOptions: &prism.SamplingOptions{
Enabled: true,
SampleSize: 1000,
Strategy: prism.SAMPLING_STRATEGY_RANDOM,
},
})
Summary
Super-node handling is mandatory at 100B scale to prevent:
- Memory exhaustion: 100M neighbors × 64 bytes = 6.4 GB per query
- Query timeouts: Full traversal of 100M neighbors takes 45+ seconds
- Cascading failures: Single celebrity query impacts all concurrent queries
Multi-strategy approach:
- Vertex classification: Identify super-nodes before query execution (normal/hub/super/mega)
- Sampling: Random, Top-K, HyperLogLog for different use cases
- Gremlin extensions: .sample(N) and .approximate() for explicit sampling
- Circuit breaker: Automatic detection and sampling for super-node queries
- Performance trade-offs: Balance accuracy vs memory vs latency
Key metrics:
- Degree threshold: 100k neighbors (super-node classification)
- Default sample size: 10k neighbors (99% memory reduction)
- HyperLogLog precision: 14 (0.8% error rate, 16 KB memory)
- Circuit breaker: Auto-sample or reject queries to super-nodes
Impact: Enables safe querying of social graphs with celebrities/influencers, prevents OOM crashes, maintains sub-second query latency even for high-degree vertices.
Performance Characteristics
Query Latency by Complexity
| Query Type | Partitions | Vertices | Latency (P50) | Latency (P99) |
|---|---|---|---|---|
| Single vertex lookup | 1 | 1 | 50 μs | 200 μs |
| Property filter (indexed) | 150 | 5M | 2 s | 5 s |
| Property filter (unindexed) | 64,000 | 100B | 60 s | 180 s |
| 1-hop traversal (local) | 1 | 200 | 500 μs | 2 ms |
| 1-hop traversal (distributed) | 50 | 10k | 10 ms | 50 ms |
| 2-hop traversal | 500 | 100k | 100 ms | 500 ms |
| 3-hop traversal | 5,000 | 1M | 1 s | 5 s |
Partition Pruning Effectiveness
| Query Pattern | Partitions Queried | Pruning Ratio | Speedup |
|---|---|---|---|
| Exact property match (indexed) | 150 / 64,000 | 99.1% | 106× |
| Range query (indexed) | 2,000 / 64,000 | 87.5% | 8× |
| Label filter | 4,000 / 64,000 | 75% | 4× |
| No prunable filters | 64,000 / 64,000 | 0% | 1× |
Parallel Execution Speedup
| Input Size | Sequential Time | Parallel Time (1000 workers) | Speedup |
|---|---|---|---|
| 1k vertices | 0.1 s | 0.1 s | 1× |
| 10k vertices | 1 s | 0.5 s | 2× |
| 100k vertices | 10 s | 0.8 s | 12.5× |
| 1M vertices | 100 s | 2 s | 50× |
| 10M vertices | 1000 s | 10 s | 100× |
Query Observability and Debugging
Problem: At 100B scale with 1000-node distributed execution, debugging slow queries is challenging without deep observability. "Why did this query take 45 seconds instead of 5 seconds?" requires visibility into partition-level execution, network latency, and resource utilization.
EXPLAIN Plan (SQL-Style)
Purpose: Show query execution plan before running query, estimate costs, identify optimization opportunities.
type QueryExplainer struct {
planner *QueryPlanner
costModel *CostModel
statsManager *StatisticsManager
}
func (qe *QueryExplainer) Explain(gremlinQuery string) (*ExplainPlan, error) {
// Parse query
plan, err := qe.planner.Analyze(gremlinQuery)
if err != nil {
return nil, err
}
// Estimate costs
explainPlan := &ExplainPlan{
Query: gremlinQuery,
Stages: make([]*ExplainStage, len(plan.Stages)),
}
for i, stage := range plan.Stages {
explainPlan.Stages[i] = &ExplainStage{
StageID: i,
Type: stage.Type,
EstimatedRows: stage.EstimatedCardinality,
EstimatedCost: qe.costModel.EstimateStageCost(stage),
PartitionsQueried: len(stage.TargetPartitions),
IndexUsed: stage.UseIndex,
IndexName: stage.IndexName,
}
}
explainPlan.TotalCost = qe.costModel.EstimateQueryCost(plan)
explainPlan.TotalLatency = qe.EstimateLatency(plan)
return explainPlan, nil
}
Example: EXPLAIN Output
EXPLAIN: g.V().hasLabel('User').has('city', 'SF').out('FOLLOWS').limit(100)
Query Plan:
┌──────────────────────────────────────────────────────────────┐
│ Stage 0: Vertex Scan (hasLabel + has) │
│ Type: INDEX_SCAN │
│ Index: city_index['SF'] │
│ Estimated Rows: 5M vertices │
│ Estimated Cost: 2.5 (index lookup + scan) │
│ Partitions Queried: 150 of 64,000 (pruned 99%) │
│ Estimated Latency: 2 seconds │
└──────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Stage 1: Edge Traversal (out FOLLOWS) │
│ Type: TRAVERSAL │
│ Index: edge_index['FOLLOWS'] │
│ Estimated Rows: 50M edges (10 per vertex avg) │
│ Estimated Cost: 50.0 (edge lookup + vertex load) │
│ Partitions Queried: 500 (distributed fan-out) │
│ Estimated Latency: 5 seconds │
└──────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Stage 2: Limit (100) │
│ Type: LIMIT │
│ Estimated Rows: 100 vertices │
│ Estimated Cost: 0.1 (early termination) │
│ Partitions Queried: N/A │
│ Estimated Latency: <1 ms │
└──────────────────────────────────────────────────────────────┘
Total Estimated Cost: 52.6
Total Estimated Latency: 7 seconds
Total Partitions Touched: 650 of 64,000
Query Timeline Visualization
Purpose: Show actual execution timeline with partition-level detail, identify bottlenecks.
type QueryTimeline struct {
QueryID string
StartTime time.Time
EndTime time.Time
TotalTime time.Duration
Stages []*StageTimeline
}
type StageTimeline struct {
StageID int
StartTime time.Time
EndTime time.Time
Duration time.Duration
PartitionExec []*PartitionExecution
}
type PartitionExecution struct {
PartitionID string
StartTime time.Time
EndTime time.Time
Duration time.Duration
RowsReturned int64
}
func (qe *QueryExecutor) ExecuteWithTimeline(
    ctx context.Context,
    plan *QueryPlan,
) (*QueryResult, *QueryTimeline, error) {
    timeline := &QueryTimeline{
        QueryID:   uuid.New().String(),
        StartTime: time.Now(),
    }
    result := &QueryResult{}
    // Execute query stages
    for i, stage := range plan.Stages {
stageTimeline := &StageTimeline{
StageID: i,
StartTime: time.Now(),
}
// Execute on partitions
for _, partitionID := range stage.TargetPartitions {
partExec := &PartitionExecution{
PartitionID: partitionID,
StartTime: time.Now(),
}
rows := qe.ExecuteOnPartition(partitionID, stage)
result.Vertices = append(result.Vertices, rows...) // Vertices field on QueryResult assumed for this sketch
partExec.EndTime = time.Now()
partExec.Duration = partExec.EndTime.Sub(partExec.StartTime)
partExec.RowsReturned = int64(len(rows))
stageTimeline.PartitionExec = append(stageTimeline.PartitionExec, partExec)
}
stageTimeline.EndTime = time.Now()
stageTimeline.Duration = stageTimeline.EndTime.Sub(stageTimeline.StartTime)
timeline.Stages = append(timeline.Stages, stageTimeline)
}
timeline.EndTime = time.Now()
timeline.TotalTime = timeline.EndTime.Sub(timeline.StartTime)
return result, timeline, nil
}
Example: Timeline Visualization
Query Timeline: g.V().has('city', 'SF').out('FOLLOWS')
Total Time: 8.2 seconds
Stage 0: Vertex Scan (2.1 seconds)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.1s
Partitions: 150
Fastest: partition 07:0005:12 (10 ms)
Slowest: partition 09:0089:45 (450 ms) ← BOTTLENECK
Average: 14 ms
Stage 1: Edge Traversal (6.0 seconds)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 6.0s
Partitions: 500
Fastest: partition 01:0012:03 (8 ms)
Slowest: partition 07:0042:18 (3.2s) ← BOTTLENECK
Reason: Cold partition (loaded from S3)
Average: 12 ms
Stage 2: Limit (0.1 seconds)
━ 0.1s
Distributed Tracing (OpenTelemetry)
Purpose: Trace query execution across 1000 nodes, capture spans for each operation.
import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)
func (qc *QueryCoordinator) ExecuteWithTracing(
ctx context.Context,
gremlinQuery string,
) (*ResultStream, error) {
// Start root span
tracer := otel.Tracer("prism-query-executor")
ctx, span := tracer.Start(ctx, "gremlin.query",
trace.WithAttributes(
attribute.String("gremlin.query", gremlinQuery),
attribute.String("query.id", uuid.New().String()),
),
)
defer span.End()
// Parse query
_, parseSpan := tracer.Start(ctx, "gremlin.parse")
plan, err := qc.queryAnalyzer.Analyze(gremlinQuery)
parseSpan.End()
if err != nil {
span.RecordError(err)
return nil, err
}
// Execute stages, pushing partition results into the result stream
resultStream := NewResultStream()
for i, stage := range plan.Stages {
    ctx, stageSpan := tracer.Start(ctx, fmt.Sprintf("stage.%d", i),
        trace.WithAttributes(
            attribute.String("stage.type", stage.Type.String()),
            attribute.Int("stage.partitions", len(stage.TargetPartitions)),
        ),
    )
    // Execute on partitions
    for _, partitionID := range stage.TargetPartitions {
        ctx, partSpan := tracer.Start(ctx, "partition.execute",
            trace.WithAttributes(
                attribute.String("partition.id", partitionID),
            ),
        )
        results := qc.ExecuteOnPartition(ctx, partitionID, stage)
        partSpan.SetAttributes(
            attribute.Int("partition.rows_returned", len(results)),
        )
        partSpan.End()
        for _, vertex := range results {
            resultStream.Send(vertex)
        }
    }
    stageSpan.End()
}
return resultStream, nil
}
Trace Example in SignOz/Jaeger:
gremlin.query (8.2s)
│
├─ gremlin.parse (10ms)
│
├─ stage.0: Vertex Scan (2.1s)
│ ├─ partition.execute [07:0005:12] (10ms)
│ ├─ partition.execute [07:0012:34] (12ms)
│ └─ partition.execute [09:0089:45] (450ms) ← SLOW
│
└─ stage.1: Edge Traversal (6.0s)
├─ partition.execute [01:0012:03] (8ms)
├─ partition.execute [07:0042:18] (3.2s) ← VERY SLOW
│ ├─ s3.fetch (2.8s) ← ROOT CAUSE: Cold partition
│ └─ index.lookup (400ms)
└─ partition.execute [05:0067:89] (15ms)
Slow Query Log Configuration
slow_query_log:
enabled: true
threshold: 5000ms # Log queries taking >5 seconds
log_level: warn
output:
- file: /var/log/prism/slow_queries.log
- kafka: slow-queries-topic
- signoz: true
# Include in slow query log
include:
- query_text: true
- execution_plan: true
- timeline: true
- partition_breakdown: true
- resource_usage: true # CPU, memory, network
# Sampling (avoid overwhelming logs)
sampling:
rate: 100% # Log 100% of slow queries
Slow Query Log Example:
{
"timestamp": "2025-11-15T10:23:45Z",
"query_id": "550e8400-e29b-41d4-a716-446655440000",
"duration_ms": 8200,
"threshold_ms": 5000,
"query": "g.V().has('city', 'SF').out('FOLLOWS')",
"principal": "user@example.com",
"stages": [
{
"stage_id": 0,
"type": "INDEX_SCAN",
"duration_ms": 2100,
"partitions_queried": 150,
"rows_returned": 5000000
},
{
"stage_id": 1,
"type": "TRAVERSAL",
"duration_ms": 6000,
"partitions_queried": 500,
"rows_returned": 50000000,
"bottleneck": {
"partition_id": "07:0042:18",
"duration_ms": 3200,
"reason": "Cold partition (S3 fetch: 2.8s)"
}
}
],
"resource_usage": {
"cpu_seconds": 120,
"memory_mb": 4500,
"network_bytes_sent": 524288000
},
"recommendation": "Consider promoting partition 07:0042:18 to hot tier (accessed 15 times in last hour)"
}
Prometheus Metrics and Alerts
Metrics:
# Query latency histogram
prism_gremlin_query_duration_seconds{stage="0",partition="07:0042:18"} 3.2
# Query throughput
prism_gremlin_queries_total{status="success"} 1250000
prism_gremlin_queries_total{status="error"} 350
# Partition execution time
prism_partition_execution_duration_seconds{partition="07:0042:18"} 3.2
# Slow query count
prism_slow_queries_total{threshold="5s"} 125
Alerting Rules:
alerts:
- name: HighSlowQueryRate
condition: rate(prism_slow_queries_total[5m]) > 10
severity: warning
message: "High slow query rate: {{$value}} slow queries/min (threshold: 10)"
- name: PartitionConsistentlySlow
condition: avg_over_time(prism_partition_execution_duration_seconds{partition="07:0042:18"}[10m]) > 1.0
severity: warning
message: "Partition {{$labels.partition}} consistently slow (avg: {{$value}}s)"
- name: QueryErrorRate
condition: rate(prism_gremlin_queries_total{status="error"}[5m]) > 100
severity: critical
message: "High query error rate: {{$value}} errors/min"
Gremlin Step Support Matrix
| Gremlin Step | Supported | Optimization | Notes |
|---|---|---|---|
| V() | ✅ | Index scan | Start traversal from all vertices |
| V(id) | ✅ | Direct lookup | Start from specific vertex |
| E() | ✅ | Edge index scan | Start from all edges |
| hasLabel() | ✅ | Label index | Filter by vertex label |
| has(key, value) | ✅ | Property index | Filter by property |
| has(key, predicate) | ✅ | Range index | Filter by predicate (gt, lt, etc) |
| out() | ✅ | Edge index | Traverse outgoing edges |
| in() | ✅ | Inverted edge index | Traverse incoming edges |
| both() | ✅ | Both indexes | Traverse both directions |
| outE() | ✅ | Edge index | Get outgoing edges |
| inE() | ✅ | Inverted edge index | Get incoming edges |
| outV() | ✅ | Vertex lookup | Get source vertex from edge |
| inV() | ✅ | Vertex lookup | Get target vertex from edge |
| values() | ✅ | Property access | Get property values |
| limit() | ✅ | Early termination | Limit result set |
| range() | ✅ | Pagination | Range-based pagination |
| count() | ✅ | Cardinality estimate | Count vertices/edges |
| dedup() | ✅ | Bloom filter | Remove duplicates |
| order() | ⚠️ | Partial | Sort results (expensive) |
| group() | ⚠️ | Partial | Group by property |
| path() | ⚠️ | Limited | Path tracking (memory intensive) |
| match() | ❌ | - | Complex pattern matching (future) |
| repeat() | ⚠️ | Limited | Recursive traversal (bounded) |
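Steps marked unsupported or partial can be caught at parse time rather than failing mid-execution; a minimal sketch, with a hand-maintained support map and an assumed Name field on the parsed step:
// Sketch: validate parsed steps against the support matrix before planning.
var unsupportedSteps = map[string]bool{
    "match": true, // complex pattern matching: future work
}

var partialSteps = map[string]bool{
    "order": true, "group": true, "path": true, "repeat": true,
}

func ValidateSteps(steps []*GremlinStep) error {
    for _, step := range steps {
        name := step.Name // assumed field on the parsed step
        if unsupportedSteps[name] {
            return fmt.Errorf("gremlin step %q is not supported by the distributed executor", name)
        }
        if partialSteps[name] {
            log.Warnf("step %q has partial support; resource and depth limits apply", name)
        }
    }
    return nil
}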
Related RFCs
- RFC-057: Massive-Scale Graph Sharding - Distributed architecture
- RFC-058: Multi-Level Graph Indexing - Index-based query optimization
- RFC-059: Hot/Cold Storage Tiers - Storage tier aware query execution
- RFC-061: Graph Authorization - Vertex-level access control
- RFC-055: Graph Pattern - Base graph operations
Open Questions
- Query Caching: Should we cache query plans or query results?
- Distributed Transactions: How to support multi-vertex updates atomically?
- Query Timeouts: How to handle long-running queries gracefully?
- Result Pagination: Best pagination strategy for large result sets?
- Custom Functions: Support for user-defined Gremlin functions?
References
- Apache TinkerPop Gremlin
- JanusGraph Query Execution
- Neo4j Cypher Query Planning
- Facebook TAO: Query Optimization
- Google Pregel: Graph Processing
Revision History
- 2025-11-15: Initial draft - Distributed Gremlin query execution engine