
RFC-060: Distributed Gremlin Query Execution for Massive-Scale Graphs

Status: Draft | Author: Platform Team | Created: 2025-11-15 | Updated: 2025-11-15

Abstract

This RFC defines a distributed Gremlin query execution engine for massive-scale graphs (100B vertices across 1000+ nodes). Standard Gremlin implementations execute queries on single machines, which is infeasible at this scale.

This RFC presents a distributed query planner with four key capabilities:

  • Decomposes Gremlin traversals into partition-local and cross-partition operations
  • Optimizes execution plans using indexes (RFC-058)
  • Routes queries to appropriate storage tiers (RFC-059)
  • Applies vertex-level authorization filters (RFC-061)

The query executor supports the full Apache TinkerPop Gremlin specification while achieving sub-second latency for common traversals through intelligent partition pruning, parallel execution, and result streaming.

Key Innovations:

  • Query Decomposition: Split Gremlin traversals into a distributed execution plan
  • Partition Pruning: Use indexes to skip irrelevant partitions (10-100× speedup)
  • Adaptive Parallelism: Dynamically adjust parallelism based on intermediate result sizes
  • Authorization Push-Down: Apply vertex label filters at partition level
  • Result Streaming: Stream results to client without materializing full result set
  • Cost-Based Optimizer: Choose execution strategy based on data statistics

Motivation

The Distributed Query Problem

Example Query: "Find all users in San Francisco who follow someone in New York"

g.V().hasLabel('User').has('city', 'San Francisco')
.out('FOLLOWS')
.has('city', 'New York')

Single-Machine Execution (impractical at 100B scale):

Step 1: Scan all vertices (100B vertices)
Step 2: Filter by label='User' and city='San Francisco'
Result: 5M vertices
Step 3: Traverse out('FOLLOWS') for each
Result: 50M edges to follow
Step 4: Filter targets by city='New York'
Result: 2M vertices

Total time: Hours (full scan of 100B vertices)

Distributed Execution (this RFC):

Step 1: Query optimizer analyzes query
- Identifies filter: city='San Francisco'
- Checks cluster-level index
- Finds: Only clusters 0, 2 have San Francisco users

Step 2: Partition pruning
- Broadcast query to clusters 0, 2 (200 proxies)
- Skip clusters 1, 3-9 (800 proxies not queried)

Step 3: Parallel execution on 200 proxies
- Each proxy: Use city_index['San Francisco'] → 25k local vertices
- Total: 5M vertices (200 proxies × 25k)
- Time: 2 seconds (parallel)

Step 4: Distributed traversal
- Fan-out to target partitions (follow edges)
- Filter by city='New York'
- Result: 2M vertices
- Time: 5 seconds (parallel RPC)

Total time: 7 seconds (vs hours)
Speedup: ~2,500×

Challenges at Massive Scale

Challenge 1: Query Explosion

Query: g.V('user:alice').out('FOLLOWS').out('FOLLOWS').out('FOLLOWS')

Hop 1: alice.out('FOLLOWS') → 200 friends
Hop 2: 200 friends × 200 friends = 40,000 friends-of-friends
Hop 3: 40,000 × 200 = 8,000,000 vertices (3rd degree)

Problem: Result set explodes exponentially with hops
Solution: Early termination, limit per hop, result streaming
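A minimal sketch of per-hop limiting (illustrative names, not part of the engine's defined API): the executor caps the frontier after each hop, so later hops never receive an unbounded input.

// Sketch only: caps intermediate results at every hop of a traversal.
const maxFrontierPerHop = 100000 // per-hop cap on intermediate results

func BoundedTraversal(start []string, hops int, traverse func(string) []string) []string {
    frontier := start
    for h := 0; h < hops; h++ {
        next := []string{}
        for _, id := range frontier {
            next = append(next, traverse(id)...)
            if len(next) >= maxFrontierPerHop {
                next = next[:maxFrontierPerHop] // early termination for this hop
                break
            }
        }
        frontier = next
    }
    return frontier
}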

Challenge 2: Cross-Partition Queries

Query: g.V().has('age', gt(30))

Without indexes:
- Broadcast query to all 1000 proxies
- Each scans local partitions
- Aggregate results
- Time: 60 seconds (network overhead dominates)

With indexes + partition pruning:
- Check cluster-level index: age > 30 in all clusters
- Cannot prune partitions (age distributed uniformly)
- Execute on all 1000 proxies BUT use age_index
- Time: 5 seconds (indexed scan vs full scan)

Challenge 3: Complex Predicate Push-Down

Query: g.V().hasLabel('User').has('age', gt(30)).has('city', 'SF').out('FOLLOWS')

Optimizer decisions:
1. Which filter to apply first? (selectivity)
2. Can we use indexes for each filter?
3. Should we materialize intermediate results?
4. Execute sequentially or in parallel?

Best plan:
1. Use city_index['SF'] → 5M vertices (high selectivity)
2. Filter age > 30 in-memory (low overhead)
3. Stream results, no materialization
4. Parallel execution on filtered vertices

Goals

  1. Full Gremlin Support: Implement Apache TinkerPop Gremlin specification
  2. Partition Pruning: Skip irrelevant partitions using indexes
  3. Parallel Execution: Execute across 1000+ nodes simultaneously
  4. Sub-Second Latency: Common queries complete in <1 second
  5. Authorization Integration: Apply vertex label filters (RFC-061)
  6. Result Streaming: Stream results without full materialization
  7. Cost-Based Optimization: Choose execution strategy based on statistics

Non-Goals

  • ACID Transactions: Distributed transactions not covered (future work)
  • Custom Gremlin Steps: Only standard Gremlin 3.x steps supported
  • Graph Algorithms: Complex algorithms (PageRank, etc.) in RFC-055
  • Schema Enforcement: Schema validation not in query engine

Gremlin Query Decomposition

Query Analysis Phase

type QueryAnalyzer struct {
indexManager *IndexManager
statsManager *StatisticsManager
}

func (qa *QueryAnalyzer) Analyze(gremlinQuery string) (*QueryPlan, error) {
// Parse Gremlin to AST
ast, err := qa.ParseGremlin(gremlinQuery)
if err != nil {
return nil, err
}

// Identify filterable steps
filters := qa.ExtractFilters(ast)

// Check index availability
indexableFilters := []Filter{}
for _, filter := range filters {
if qa.indexManager.HasIndex(filter.Property) {
indexableFilters = append(indexableFilters, filter)
}
}

// Estimate selectivity
selectivity := qa.EstimateSelectivity(filters)

// Generate execution plan
plan := qa.GenerateExecutionPlan(ast, indexableFilters, selectivity)

return plan, nil
}

Example: Parse query into steps

// Original Gremlin
g.V().hasLabel('User').has('city', 'SF').out('FOLLOWS').limit(100)

// Parsed steps
Step 1: V() - Start from all vertices
Step 2: hasLabel('User') - Filter by label
Step 3: has('city', 'SF') - Filter by property
Step 4: out('FOLLOWS') - Traverse outgoing edges
Step 5: limit(100) - Limit results

Query Plan:

message QueryPlan {
string query_id = 1;
repeated ExecutionStage stages = 2;
PartitionPruningStrategy pruning_strategy = 3;
ParallelismConfig parallelism = 4;
ResultStreamingConfig streaming = 5;
}

message ExecutionStage {
StageType type = 1;
repeated string target_partitions = 2; // Partitions to execute on
GremlinStep gremlin_step = 3;
bool use_index = 4;
string index_name = 5;
int64 estimated_cardinality = 6;
}

enum StageType {
STAGE_TYPE_UNSPECIFIED = 0;
STAGE_TYPE_INDEX_SCAN = 1; // Use index
STAGE_TYPE_FULL_SCAN = 2; // Scan all partitions
STAGE_TYPE_TRAVERSAL = 3; // Follow edges
STAGE_TYPE_FILTER = 4; // Apply predicate
STAGE_TYPE_LIMIT = 5; // Limit results
STAGE_TYPE_AGGREGATE = 6; // Group/count/etc
}

Partition Pruning Strategy

Strategy 1: Index-Based Pruning

func (qp *QueryPlanner) PrunePartitionsWithIndex(filter *Filter) []string {
// Check cluster-level index
clusterIndex := qp.GetClusterIndex(filter.Property)

// Find clusters with matching values
candidateClusters := clusterIndex.FindClusters(filter.Value)

// Find proxies within candidate clusters
candidateProxies := []string{}
for _, clusterID := range candidateClusters {
proxyIndex := qp.GetProxyIndex(clusterID, filter.Property)
proxies := proxyIndex.FindProxies(filter.Value)
candidateProxies = append(candidateProxies, proxies...)
}

// Find partitions within candidate proxies
candidatePartitions := []string{}
for _, proxyID := range candidateProxies {
partitionIndex := qp.GetPartitionIndex(proxyID, filter.Property)
partitions := partitionIndex.FindPartitions(filter.Value)
candidatePartitions = append(candidatePartitions, partitions...)
}

return candidatePartitions
}

Example:

Query: g.V().has('city', 'San Francisco')

Without pruning:
- Query all 64,000 partitions (1000 proxies × 64 partitions)

With pruning:
- Cluster index: SF in clusters 0, 2 (2 of 10)
- Proxy index: SF in 50 proxies (of 200 in clusters 0, 2)
- Partition index: SF in 150 partitions (of 3,200 in 50 proxies)
- Result: Query 150 partitions instead of 64,000
- Speedup: 106×

Strategy 2: Label-Based Pruning

func (qp *QueryPlanner) PrunePartitionsByLabel(label string) []string {
// Check if using label-based partitioning (RFC-057)
if qp.PartitionStrategy == STRATEGY_LABEL_BASED {
// All vertices with label are on specific cluster
clusterID := qp.LabelToClusterMapping[label]
return qp.GetAllPartitionsInCluster(clusterID)
}

// Otherwise, cannot prune (label distributed across all partitions)
return qp.GetAllPartitions()
}

Query Execution Stages

Stage 1: Initial Vertex Selection

Gremlin: g.V().hasLabel('User').has('city', 'SF')

Execution:
1. Use partition pruning to identify 150 partitions
2. Parallel execution on 150 partitions
3. Each partition:
a. Check if city_index exists
b. If yes: city_index['SF'] → vertex IDs
c. If no: Full scan with filter
4. Aggregate results
5. Stream to next stage (no materialization)

Stage 2: Edge Traversal

Gremlin: .out('FOLLOWS')

Execution:
1. Receive vertex IDs from stage 1 (streaming)
2. For each vertex batch (1000 vertices):
a. Group by target partition
b. Parallel fan-out to target partitions
c. Load edge index for each vertex
d. Return target vertex IDs
3. Stream results to next stage

Stage 3: Property Filtering

Gremlin: .has('age', gt(30))

Execution:
1. Receive vertex IDs from stage 2 (streaming)
2. For each vertex batch:
a. Load vertex properties from appropriate tier (hot/warm/cold)
b. Apply filter: age > 30
c. Return matching vertices
3. Stream results to next stage

Stage 4: Result Limiting

Gremlin: .limit(100)

Execution:
1. Receive vertices from stage 3 (streaming)
2. Keep first 100 vertices
3. Cancel upstream stages (early termination)
4. Return results to client
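A minimal sketch of this limit stage under the channel-based pipeline used later in this RFC; the shared cancel function (an assumption here) is how upstream stages are told to stop producing.

// Sketch: limit stage with early termination. `cancel` is assumed to be the
// CancelFunc shared by all upstream stages of this query.
func LimitStage(in <-chan *Vertex, n int, cancel context.CancelFunc) <-chan *Vertex {
    out := make(chan *Vertex, n)
    go func() {
        defer close(out)
        count := 0
        for v := range in {
            out <- v
            count++
            if count >= n {
                cancel() // signal upstream stages to stop producing
                return
            }
        }
    }()
    return out
}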

Distributed Execution Engine

Query Coordinator

type QueryCoordinator struct {
queryAnalyzer *QueryAnalyzer
partitionManager *PartitionManager
indexManager *IndexManager
authzManager *AuthorizationManager // RFC-061
}

func (qc *QueryCoordinator) ExecuteGremlin(
ctx context.Context,
gremlinQuery string,
principal *Principal,
) (*ResultStream, error) {
// Step 1: Parse and analyze query
plan, err := qc.queryAnalyzer.Analyze(gremlinQuery)
if err != nil {
return nil, err
}

// Step 2: Apply authorization filters (RFC-061)
plan = qc.authzManager.ApplyAuthzFilters(plan, principal)

// Step 3: Create result stream
resultStream := NewResultStream()

// Step 4: Execute stages in pipeline
go func() {
defer resultStream.Close()

var currentResults chan *Vertex
for _, stage := range plan.Stages {
currentResults = qc.ExecuteStage(ctx, stage, currentResults)
}

// Drain final results to stream
for vertex := range currentResults {
resultStream.Send(vertex)
}
}()

return resultStream, nil
}

Partition Executor

type PartitionExecutor struct {
partition *Partition
}

func (pe *PartitionExecutor) ExecuteStep(
ctx context.Context,
step *GremlinStep,
inputVertices []*Vertex,
) ([]*Vertex, error) {
switch step.Type {
case STEP_HAS_LABEL:
return pe.FilterByLabel(inputVertices, step.Label)

case STEP_HAS_PROPERTY:
return pe.FilterByProperty(inputVertices, step.Property, step.Value)

case STEP_OUT:
return pe.TraverseOut(inputVertices, step.EdgeLabel)

case STEP_IN:
return pe.TraverseIn(inputVertices, step.EdgeLabel)

case STEP_BOTH:
return pe.TraverseBoth(inputVertices, step.EdgeLabel)

default:
return nil, fmt.Errorf("unsupported step: %v", step.Type)
}
}

func (pe *PartitionExecutor) FilterByProperty(
vertices []*Vertex,
property string,
value interface{},
) ([]*Vertex, error) {
// Check if index available
index := pe.partition.GetIndex(property)
if index != nil {
// Fast path: Use index
vertexIDs := index.Lookup(value)
return pe.LoadVertices(vertexIDs), nil
}

// Slow path: Filter in-memory
filtered := []*Vertex{}
for _, v := range vertices {
if v.Properties[property] == value {
filtered = append(filtered, v)
}
}

return filtered, nil
}

func (pe *PartitionExecutor) TraverseOut(
vertices []*Vertex,
edgeLabel string,
) ([]*Vertex, error) {
targets := []*Vertex{}

for _, v := range vertices {
// Get outgoing edges from index
edgeIndex := pe.partition.GetEdgeIndex(edgeLabel)
targetIDs := edgeIndex.GetOutgoing(v.ID)

// Load target vertices
targetVertices := pe.LoadVertices(targetIDs)
targets = append(targets, targetVertices...)
}

return targets, nil
}

Cross-Partition Coordinator

type CrossPartitionCoordinator struct {
proxies map[string]*ProxyClient
}

func (cpc *CrossPartitionCoordinator) ExecuteDistributed(
ctx context.Context,
stage *ExecutionStage,
inputVertices chan *Vertex,
) (chan *Vertex, error) {
outputVertices := make(chan *Vertex, 10000)

// Group input vertices by partition
verticesByPartition := make(map[string][]*Vertex)

go func() {
for vertex := range inputVertices {
partitionID := cpc.GetPartitionForVertex(vertex.ID)
verticesByPartition[partitionID] = append(verticesByPartition[partitionID], vertex)

// Flush when batch full
if len(verticesByPartition[partitionID]) >= 1000 {
cpc.ExecuteOnPartition(partitionID, stage, verticesByPartition[partitionID], outputVertices)
verticesByPartition[partitionID] = []*Vertex{}
}
}

// Flush remaining
for partitionID, vertices := range verticesByPartition {
if len(vertices) > 0 {
cpc.ExecuteOnPartition(partitionID, stage, vertices, outputVertices)
}
}

close(outputVertices)
}()

return outputVertices, nil
}

func (cpc *CrossPartitionCoordinator) ExecuteOnPartition(
partitionID string,
stage *ExecutionStage,
vertices []*Vertex,
output chan *Vertex,
) {
// Get proxy client for partition
proxyID := cpc.GetProxyForPartition(partitionID)
client := cpc.proxies[proxyID]

// Execute stage on remote partition
results, err := client.ExecuteStep(context.Background(), &ExecuteStepRequest{
PartitionID: partitionID,
Step: stage.GremlinStep,
InputVertices: vertices,
})

if err != nil {
log.Errorf("Failed to execute on partition %s: %v", partitionID, err)
return
}

// Stream results to output channel
for _, vertex := range results.Vertices {
output <- vertex
}
}

Query Optimization

Cost-Based Optimizer

type CostModel struct {
stats *StatisticsManager
}

func (cm *CostModel) EstimateQueryCost(plan *QueryPlan) float64 {
totalCost := 0.0

for _, stage := range plan.Stages {
stageCost := cm.EstimateStageCost(stage)
totalCost += stageCost
}

return totalCost
}

func (cm *CostModel) EstimateStageCost(stage *ExecutionStage) float64 {
switch stage.Type {
case STAGE_TYPE_INDEX_SCAN:
// Index scan cost: O(log N) lookup + O(M) results
lookupCost := math.Log(float64(cm.stats.TotalVertices))
scanCost := float64(stage.EstimatedCardinality)
return lookupCost + scanCost

case STAGE_TYPE_FULL_SCAN:
// Full scan cost: O(N) where N = vertices in partitions
return float64(len(stage.TargetPartitions)) * cm.stats.AvgVerticesPerPartition

case STAGE_TYPE_TRAVERSAL:
// Traversal cost: O(M) edge lookups + O(P) cross-partition RPCs
edgeLookupCost := float64(stage.EstimatedCardinality)
rpcCost := float64(len(stage.TargetPartitions)) * 0.005 // 5ms per RPC
return edgeLookupCost + rpcCost

default:
return float64(stage.EstimatedCardinality)
}
}

Selectivity Estimation

func (cm *CostModel) EstimateSelectivity(filter *Filter) float64 {
stats := cm.stats.GetPropertyStats(filter.Property)

switch filter.Operator {
case OP_EQUALS:
// For equals, use histogram
return stats.Histogram.EstimateEqual(filter.Value)

case OP_GREATER_THAN:
// For range queries, use min/max
return stats.Histogram.EstimateGreaterThan(filter.Value)

case OP_IN:
// For IN queries, sum selectivities
selectivity := 0.0
for _, value := range filter.Values {
selectivity += stats.Histogram.EstimateEqual(value)
}
return selectivity

default:
// Default: Assume 10% selectivity
return 0.1
}
}

Example: Choose between index scan and full scan

Query: g.V().has('city', 'rare_city')

Statistics:
- Total vertices: 100B
- city='rare_city': 100 vertices (selectivity = 0.000001%)

Cost estimation:
Index scan cost:
- Lookup: log(100B) = 36 operations
- Results: 100 vertices
- Total: 136 operations

Full scan cost:
- Scan: 100B vertices
- Total: 100B operations

Decision: Use index scan (136 vs 100B = 735,000,000× faster)
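As a sketch, the planner can make this decision mechanically by comparing the two cost estimates from the cost model above; `ChooseScanStrategy` is an illustrative name, and it assumes an index exists on the filtered property.

// Sketch: pick index scan vs full scan by comparing estimated costs.
func (cm *CostModel) ChooseScanStrategy(filter *Filter, partitions []string) StageType {
    selectivity := cm.EstimateSelectivity(filter)
    matching := float64(cm.stats.TotalVertices) * selectivity

    // Index scan: O(log N) lookup plus the matching rows.
    indexCost := math.Log(float64(cm.stats.TotalVertices)) + matching
    // Full scan: every vertex in every candidate partition.
    fullScanCost := float64(len(partitions)) * cm.stats.AvgVerticesPerPartition

    if indexCost < fullScanCost {
        return STAGE_TYPE_INDEX_SCAN
    }
    return STAGE_TYPE_FULL_SCAN
}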

Filter Ordering Optimization

func (qo *QueryOptimizer) ReorderFilters(filters []*Filter) []*Filter {
// Calculate selectivity for each filter
selectivities := make(map[*Filter]float64)
for _, filter := range filters {
selectivities[filter] = qo.EstimateSelectivity(filter)
}

// Sort by selectivity (most selective first)
sort.Slice(filters, func(i, j int) bool {
return selectivities[filters[i]] < selectivities[filters[j]]
})

// Prefer indexed filters
sort.SliceStable(filters, func(i, j int) bool {
indexI := qo.indexManager.HasIndex(filters[i].Property)
indexJ := qo.indexManager.HasIndex(filters[j].Property)
return indexI && !indexJ
})

return filters
}

Example:

// Original query
g.V().has('country', 'USA').has('city', 'SF').has('age', gt(30))

// Statistics
country='USA': 30B vertices (30% selectivity)
city='SF': 5M vertices (0.005% selectivity)
age>30: 50B vertices (50% selectivity)

// Optimized order (most selective first)
g.V().has('city', 'SF').has('country', 'USA').has('age', gt(30))
//    has('city', 'SF'): most selective (0.005%)
//    has('country', 'USA'): next (30%)
//    has('age', gt(30)): least selective (50%)

Result: Filter out 99.995% of vertices in the first step, avoiding the need to process 30B vertices

Adaptive Parallelism

Dynamic Parallelism Adjustment

type AdaptiveExecutor struct {
maxParallelism int
}

func (ae *AdaptiveExecutor) ExecuteWithAdaptiveParallelism(
stage *ExecutionStage,
inputVertices chan *Vertex,
) (chan *Vertex, error) {
// Estimate input size
estimatedInputSize := stage.EstimatedCardinality

// Adjust parallelism based on input size
parallelism := ae.CalculateOptimalParallelism(estimatedInputSize)

// Execute with calculated parallelism
return ae.ExecuteParallel(stage, inputVertices, parallelism)
}

func (ae *AdaptiveExecutor) CalculateOptimalParallelism(inputSize int64) int {
switch {
case inputSize < 1000:
// Small input: Sequential execution
return 1

case inputSize < 100000:
// Medium input: Moderate parallelism
return 10

case inputSize < 10000000:
// Large input: High parallelism
return 100

default:
// Very large input: Maximum parallelism
return ae.maxParallelism // 1000
}
}

Example:

Query: g.V('user:alice').out('FOLLOWS').out('FOLLOWS')

Stage 1: V('user:alice')
Input: 1 vertex
Parallelism: 1 (sequential)
Output: 1 vertex

Stage 2: out('FOLLOWS')
Input: 1 vertex
Output: 200 friends
Parallelism: 1 (small output)

Stage 3: out('FOLLOWS')
Input: 200 vertices
Output: 40,000 friends-of-friends
Parallelism: 100 (large output, parallel execution)
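`ExecuteParallel` is referenced above but not defined in this RFC; a minimal worker-pool sketch with the same signature might look like the following (the per-vertex `ExecuteOne` helper is an assumption).

// Sketch: fan out stage execution across `parallelism` workers.
func (ae *AdaptiveExecutor) ExecuteParallel(
    stage *ExecutionStage,
    in chan *Vertex,
    parallelism int,
) (chan *Vertex, error) {
    out := make(chan *Vertex, 10000)
    var wg sync.WaitGroup

    for i := 0; i < parallelism; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range in {
                // ExecuteOne is assumed: applies the stage's Gremlin step to one vertex.
                for _, result := range ae.ExecuteOne(stage, v) {
                    out <- result
                }
            }
        }()
    }

    // Close the output once all workers have drained the input.
    go func() {
        wg.Wait()
        close(out)
    }()

    return out, nil
}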

Result Streaming

Streaming Protocol

message StreamingQueryRequest {
string query_id = 1;
string gremlin_query = 2;
StreamingConfig config = 3;
}

message StreamingConfig {
int32 batch_size = 1; // Vertices per stream message
int32 max_results = 2; // Early termination
bool include_properties = 3; // Include vertex properties or just IDs
int32 timeout_seconds = 4; // Query timeout
}

message StreamingQueryResponse {
string query_id = 1;
int32 batch_number = 2;
repeated Vertex vertices = 3;
bool has_more = 4;
QueryStats stats = 5;
}

message QueryStats {
int64 vertices_scanned = 1;
int64 partitions_queried = 2;
int64 total_duration_ms = 3;
}
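For reference, a server-side handler for this streaming protocol could look roughly like the sketch below. The `QueryService` wrapper, its `coordinator` field, the `Vertices()` accessor on `ResultStream`, and the generated `GraphService_ExecuteGremlinStreamServer` interface name are assumptions, not part of this RFC's defined API.

// Sketch: stream query results to the client in batches.
func (s *QueryService) ExecuteGremlinStream(
    req *StreamingQueryRequest,
    stream GraphService_ExecuteGremlinStreamServer, // assumed generated interface
) error {
    principal := &Principal{} // principal extraction from auth metadata elided
    results, err := s.coordinator.ExecuteGremlin(stream.Context(), req.GremlinQuery, principal)
    if err != nil {
        return err
    }

    batch := make([]*Vertex, 0, req.Config.BatchSize)
    batchNum := int32(0)
    sent := int32(0)

    flush := func(hasMore bool) error {
        batchNum++
        return stream.Send(&StreamingQueryResponse{
            QueryId:     req.QueryId,
            BatchNumber: batchNum,
            Vertices:    batch,
            HasMore:     hasMore,
        })
    }

    // Assumed accessor: ResultStream exposes its vertices as a channel.
    for v := range results.Vertices() {
        batch = append(batch, v)
        sent++
        if int32(len(batch)) >= req.Config.BatchSize {
            if err := flush(true); err != nil {
                return err
            }
            batch = batch[:0]
        }
        if req.Config.MaxResults > 0 && sent >= req.Config.MaxResults {
            break // early termination (upstream cancellation elided)
        }
    }
    return flush(false) // final, possibly partial batch
}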

Client-Side Streaming

// Client code
client := NewGraphClient("localhost:50051")

stream, err := client.ExecuteGremlinStream(ctx, &StreamingQueryRequest{
GremlinQuery: "g.V().hasLabel('User').has('city', 'SF')",
Config: &StreamingConfig{
BatchSize: 1000,
MaxResults: 10000,
IncludeProperties: true,
},
})

for {
response, err := stream.Recv()
if err == io.EOF {
break // End of stream
}
if err != nil {
log.Fatalf("stream error: %v", err)
}

// Process batch of vertices
for _, vertex := range response.Vertices {
fmt.Printf("Vertex: %s\n", vertex.ID)
}

if !response.HasMore {
break
}
}

Authorization Integration (RFC-061)

Query-Time Authorization

func (qc *QueryCoordinator) ApplyAuthorizationFilters(
plan *QueryPlan,
principal *Principal,
) *QueryPlan {
// Get authorized vertex labels for principal
authorizedLabels := qc.authzManager.GetAuthorizedLabels(principal)

// Add label filter to first stage
labelFilter := &Filter{
Property: "label",
Operator: OP_IN,
Values: authorizedLabels,
}

// Inject filter at start of query plan
plan.Stages[0].AdditionalFilters = append(
plan.Stages[0].AdditionalFilters,
labelFilter,
)

return plan
}

Example:

Principal: user@example.com
Authorized labels: ['User', 'Post'] (cannot see 'AdminLog')

Query: g.V().has('created_at', gt('2025-01-01'))

Authorization filter injected:
g.V().hasLabel(within('User', 'Post')).has('created_at', gt('2025-01-01'))
// hasLabel(within('User', 'Post')) injected by the authz layer

Result: User only sees User and Post vertices, not AdminLog vertices

Query Resource Limits and Runaway Prevention

Problem: At 100B scale with 1000 nodes, a single runaway query can exhaust cluster resources and impact all users (MEMO-050 Finding 4).

The Runaway Query Problem

Real-World Scenario: Celebrity with 100M followers

// Innocent-looking query
g.V().has('username', '@taylorswift').out('FOLLOWS')

// What actually happens at execution:
Step 1: Find vertex '@taylorswift' → 1 vertex (fast: 50 μs)
Step 2: Traverse out('FOLLOWS') edges → 100M vertices ❌

Memory impact:
100M vertices × 100 bytes = 10 GB per query
Parallel queries: 1000 concurrent = 10 TB memory consumed
Result: Out-of-memory crash across cluster

Without Limits: A single unbounded traversal can:

  • Consume 10+ TB of memory across 1000 nodes
  • Trigger cascading OOM failures
  • Impact all concurrent queries
  • Require cluster restart (10-30 min recovery)

Multi-Layer Resource Limits

Layer 1: Query Configuration Limits

Pre-execution validation before query starts:

query_configuration_limits:
# Timeout limits
max_query_timeout: 300s # 5 minutes maximum
default_query_timeout: 30s
min_query_timeout: 1s

# Memory limits (per query)
max_memory_per_query: 16 GB # Hard limit
memory_warning_threshold: 8 GB # Soft limit (log warning)
memory_oom_kill_threshold: 15 GB # Kill before hitting 16 GB hard limit

# Fan-out limits (breadth)
max_vertices_per_hop: 10M # Maximum vertices in intermediate result
max_total_vertices: 100M # Total vertices touched across all hops
max_edges_per_vertex: 1M # Maximum out-degree for single vertex

# Depth limits (traversal hops)
max_traversal_depth: 10 # Maximum hops in graph traversal
max_loop_iterations: 1000 # Maximum iterations in repeat() step

# Result limits
max_result_size: 1M # Maximum vertices/edges in final result
max_result_memory: 1 GB # Maximum memory for result set

# Concurrent query limits (per user)
max_concurrent_queries_per_user: 10
max_concurrent_queries_cluster: 10000 # Total across all users

Enforcement:

type QueryLimits struct {
Timeout time.Duration
MaxMemory int64
MaxVerticesPerHop int64
MaxDepth int
}

func (qp *QueryPlanner) ValidateQueryLimits(query *GremlinQuery) error {
// Estimate query complexity before execution
estimate := qp.EstimateComplexity(query)

if estimate.EstimatedVertices > qp.limits.MaxVerticesPerHop {
return QueryTooComplexError{
Message: "Query exceeds max vertices per hop",
Estimated: estimate.EstimatedVertices,
Limit: qp.limits.MaxVerticesPerHop,
Suggestion: "Add filters or use .limit() step",
}
}

if estimate.EstimatedDepth > qp.limits.MaxDepth {
return QueryTooDeepError{
Message: "Query exceeds max traversal depth",
Estimated: estimate.EstimatedDepth,
Limit: qp.limits.MaxDepth,
}
}

return nil
}

Layer 2: Pre-Execution Complexity Analysis

Estimate query cost before running:

type QueryComplexityEstimate struct {
EstimatedVertices int64
EstimatedEdges int64
EstimatedDepth int
EstimatedMemory int64
EstimatedLatency time.Duration
PartitionsPruned int
PartitionsRequired int
}

func (qp *QueryPlanner) EstimateComplexity(query *GremlinQuery) *QueryComplexityEstimate {
estimate := &QueryComplexityEstimate{}

// Parse query steps
for _, step := range query.Steps {
switch step.Type {
case "V":
// Starting vertex estimation
if step.HasFilter {
estimate.EstimatedVertices = qp.EstimateFilteredVertices(step.Filter)
} else {
estimate.EstimatedVertices = qp.TotalVertexCount // 100B (expensive!)
}

case "out", "in", "both":
// Edge traversal: Multiply by average out-degree
avgOutDegree := qp.GetAverageOutDegree(step.EdgeLabel)
estimate.EstimatedVertices *= avgOutDegree
estimate.EstimatedDepth++

case "limit":
// Apply limit to intermediate results
estimate.EstimatedVertices = min(estimate.EstimatedVertices, step.LimitValue)

case "has":
// Apply selectivity reduction
selectivity := qp.GetIndexSelectivity(step.Property, step.Value)
estimate.EstimatedVertices = int64(float64(estimate.EstimatedVertices) * selectivity)
}
}

// Estimate memory: vertices × 100 bytes per vertex
estimate.EstimatedMemory = estimate.EstimatedVertices * 100

// Estimate partitions required
estimate.PartitionsRequired = qp.EstimatePartitions(estimate.EstimatedVertices)

return estimate
}

Example Complexity Analysis:

g.V().has('username', '@taylorswift').out('FOLLOWS')
Complexity Analysis:
Step 1: V().has('username', '@taylorswift')
Estimated vertices: 1 (indexed property, O(1) lookup)
Partitions required: 1

Step 2: out('FOLLOWS')
Estimated vertices: 100M (out-degree of @taylorswift)
Estimated memory: 10 GB
Partitions required: 800 (100M vertices across cluster)

Total Complexity:
Vertices: 100M
Memory: 10 GB
Partitions: 800
Latency estimate: 45 seconds

❌ REJECTED: Exceeds max_vertices_per_hop limit (10M)

Suggested rewrite:
g.V().has('username', '@taylorswift').out('FOLLOWS').limit(10000)

Layer 3: Runtime Enforcement with Memory Tracking

Monitor query execution in real-time:

type QueryExecutionMonitor struct {
QueryID string
StartTime time.Time
MemoryUsed int64
VerticesProcessed int64
Depth int
Status QueryStatus
}

func (qe *QueryExecutor) ExecuteWithMonitoring(query *GremlinQuery) (*QueryResult, error) {
monitor := &QueryExecutionMonitor{
QueryID: uuid.New().String(),
StartTime: time.Now(),
}

// Launch goroutine to monitor query progress
ctx, cancel := context.WithTimeout(context.Background(), query.Timeout)
defer cancel()

go qe.MonitorQuery(ctx, monitor)

// Execute query with context
result, err := qe.ExecuteQuery(ctx, query, monitor)

return result, err
}

func (qe *QueryExecutor) MonitorQuery(ctx context.Context, monitor *QueryExecutionMonitor) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Check memory usage
if monitor.MemoryUsed > qe.limits.MaxMemory {
qe.KillQuery(monitor.QueryID, "Memory limit exceeded")
return
}

// Check timeout
if time.Since(monitor.StartTime) > qe.limits.Timeout {
qe.KillQuery(monitor.QueryID, "Timeout exceeded")
return
}

// Check vertex count
if monitor.VerticesProcessed > qe.limits.MaxTotalVertices {
qe.KillQuery(monitor.QueryID, "Vertex limit exceeded")
return
}
}
}
}
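The monitor above reads `MemoryUsed` and `VerticesProcessed`, but this RFC does not show how they are maintained. One lightweight option, sketched here, is for partition executors to update the monitor with atomic counters as each batch is loaded (the 100-bytes-per-vertex figure matches the estimate used elsewhere in this RFC; uses sync/atomic).

// Sketch: atomic accounting updated by partition executors per batch.
const bytesPerVertexEstimate = 100 // matches the per-vertex estimate used in this RFC

func (m *QueryExecutionMonitor) RecordBatch(vertices []*Vertex) {
    atomic.AddInt64(&m.VerticesProcessed, int64(len(vertices)))
    atomic.AddInt64(&m.MemoryUsed, int64(len(vertices))*bytesPerVertexEstimate)
}

// The monitor goroutine reads the same fields atomically.
func (m *QueryExecutionMonitor) Snapshot() (memory, vertices int64) {
    return atomic.LoadInt64(&m.MemoryUsed), atomic.LoadInt64(&m.VerticesProcessed)
}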

Layer 4: Circuit Breaker for Cascading Failures

Prevent cluster-wide impact:

type CircuitBreaker struct {
MaxFailures int
TimeWindow time.Duration
OpenDuration time.Duration
State CircuitState
FailureCount int
LastFailureTime time.Time
}

func (cb *CircuitBreaker) ExecuteQuery(query *GremlinQuery) (*QueryResult, error) {
// Check circuit state
if cb.State == CircuitOpen {
if time.Since(cb.LastFailureTime) < cb.OpenDuration {
return nil, CircuitOpenError{
Message: "Circuit breaker open, query rejected",
OpenUntil: cb.LastFailureTime.Add(cb.OpenDuration),
}
}
// Try half-open state
cb.State = CircuitHalfOpen
}

// Execute query
result, err := cb.executeQuery(query)

// Update circuit state based on result
if err != nil {
cb.RecordFailure()
if cb.FailureCount >= cb.MaxFailures {
cb.Open()
}
return nil, err
}

cb.RecordSuccess()
return result, nil
}

func (cb *CircuitBreaker) Open() {
cb.State = CircuitOpen
cb.LastFailureTime = time.Now()
// Alert ops team
log.Error("Circuit breaker opened due to high failure rate")
}

Circuit Breaker Configuration:

circuit_breaker:
max_failures: 10 # Open circuit after 10 consecutive failures
time_window: 60s # Within 60-second window
open_duration: 30s # Stay open for 30 seconds before trying half-open
half_open_requests: 5 # Test with 5 requests in half-open state

Layer 5: Query Admission Control

Priority-based queuing to prevent resource exhaustion:

type QueryAdmissionController struct {
MaxConcurrentQueries int
PriorityQueues map[QueryPriority]*QueryQueue
RunningQueries map[string]*QueryExecution
}

type QueryPriority int

const (
PriorityLow QueryPriority = iota
PriorityMedium
PriorityHigh
PriorityCritical // Admin queries, health checks
)

func (qac *QueryAdmissionController) AdmitQuery(query *GremlinQuery) error {
// Check if cluster at capacity
if len(qac.RunningQueries) >= qac.MaxConcurrentQueries {
// Queue low/medium priority queries
if query.Priority <= PriorityMedium {
return qac.EnqueueQuery(query)
}

// High priority: Preempt low priority query
if query.Priority == PriorityHigh {
if lowPriorityQuery := qac.FindLowestPriorityQuery(); lowPriorityQuery != nil {
qac.PreemptQuery(lowPriorityQuery.ID)
qac.RunQuery(query)
return nil
}
}

// Critical: Always admit (ops/health checks)
if query.Priority == PriorityCritical {
qac.RunQuery(query)
return nil
}

return QueryRejectedError{Message: "Cluster at capacity"}
}

// Capacity available: Admit query
qac.RunQuery(query)
return nil
}

Operational Metrics and Alerts

Prometheus metrics for query resource usage:

# Query execution metrics
prism_query_duration_seconds{priority="high",status="success"} 2.5
prism_query_duration_seconds{priority="low",status="killed"} 300.0

# Resource usage
prism_query_memory_bytes{query_id="abc123"} 8589934592 # 8 GB
prism_query_vertices_processed{query_id="abc123"} 5000000

# Limit violations
prism_query_limit_violations_total{type="memory"} 15
prism_query_limit_violations_total{type="timeout"} 42
prism_query_limit_violations_total{type="vertex_count"} 8

# Circuit breaker state
prism_circuit_breaker_state{cluster="0"} 1 # 0=closed, 1=open, 2=half-open
prism_circuit_breaker_failures_total{cluster="0"} 10

# Admission control
prism_queries_queued_total{priority="low"} 150
prism_queries_running_total{priority="high"} 25
prism_queries_rejected_total{reason="capacity"} 5

Alerting Rules:

alerts:
- name: HighQueryFailureRate
condition: rate(prism_query_limit_violations_total[5m]) > 10
severity: warning
message: "High query failure rate due to limit violations"

- name: CircuitBreakerOpen
condition: prism_circuit_breaker_state == 1
severity: critical
message: "Circuit breaker open - cluster under stress"

- name: QueryQueueBacklog
condition: prism_queries_queued_total{priority="high"} > 100
severity: warning
message: "High-priority queries backing up in queue"

Graceful Degradation Strategies

When cluster reaches capacity:

  1. Reject Non-Essential Queries:

    • Analytics queries (priority: low)
    • Batch exports (priority: low)
    • Dashboard refreshes (priority: medium)
  2. Rate-Limit Per User:

    user_rate_limits:
    free_tier: 10 queries/minute
    paid_tier: 1000 queries/minute
    enterprise: unlimited
  3. Sample Large Results:

    // Original query
    g.V().has('type', 'User').out('FRIENDS')

    // Degraded mode: Sample 10% of results
    g.V().has('type', 'User').sample(0.1).out('FRIENDS')
  4. Return Partial Results:

    Query result: 50,000 vertices (partial)
    Warning: Result truncated due to cluster capacity
    Suggestion: Refine query with additional filters

Example: Runaway Query Scenario

Without Limits:

g.V().out().out().out()  // 3-hop traversal without filters
Step 1: V() → 100B vertices (all vertices)
Step 2: out() → 1T vertices (100B × 10 avg out-degree)
Step 3: out() → 10T vertices (1T × 10)
Step 4: out() → 100T vertices (10T × 10)

Memory: 100T vertices × 100 bytes = 10 PB ❌
Result: Cluster crash, 30-min recovery

With Limits:

Pre-execution analysis:
Estimated vertices: 10T
Estimated memory: 1 PB
❌ REJECTED: Exceeds max_total_vertices limit (100M)

Error returned to client:
QueryTooComplexError: Query exceeds vertex limit
Estimated: 10T vertices
Limit: 100M vertices
Suggestion: Add filters or use .limit() at each hop

Example rewrite:
g.V().limit(1000).out().limit(10000).out().limit(100000)

Summary

Query resource limits are mandatory at 100B scale to prevent:

  • Runaway queries: Single query consuming 10+ GB memory
  • Cascading failures: OOM crashes spreading across cluster
  • Noisy neighbor: One user impacting all concurrent queries

Multi-layer protection:

  1. Configuration limits: Pre-execution validation (timeout, memory, fan-out, depth)
  2. Complexity analysis: Estimate cost before running (vertices, memory, latency)
  3. Runtime enforcement: Monitor and kill queries exceeding limits
  4. Circuit breaker: Prevent cluster-wide cascading failures
  5. Admission control: Priority-based queuing and preemption

Key metrics:

  • Max memory per query: 16 GB (hard limit)
  • Max vertices per hop: 10M (intermediate results)
  • Max traversal depth: 10 hops
  • Max result size: 1M vertices
  • Circuit breaker: Open after 10 failures in 60s window

Impact: Protects 1000-node cluster from single query consuming all resources, enables safe multi-tenant operation at massive scale.

Super-Node Handling and Sampling Strategies

Problem: At 100B scale, a small fraction of vertices have extremely high degree (100M+ neighbors), causing queries to return massive result sets that exceed memory limits (MEMO-050 Finding 2).

The Celebrity Problem

Real-World Scenario: Social graph with celebrity vertices

// Innocent-looking query
g.V().has('username', '@taylorswift').out('FOLLOWS')

// What actually happens:
Step 1: Find vertex '@taylorswift' → 1 vertex (fast: 50 μs)
Step 2: Traverse out('FOLLOWS') edges → 100M vertices ❌

Memory impact:
100M vertices × 64 bytes (ID + metadata) = 6.4 GB per query
10 concurrent queries: 64 GB memory consumed
Result: Out-of-memory crash

Without Sampling: Single celebrity query consumes entire proxy memory, impacts all concurrent queries, triggers cascading failures.

Vertex Degree Classification

Power-Law Distribution: At 100B scale, vertex degrees follow power-law distribution (scale-free network):

Degree Distribution:
Normal vertices (<1k neighbors): 99.9% of vertices
Hub vertices (1k-100k neighbors): 0.09% of vertices
Super-nodes (100k-10M neighbors): 0.009% of vertices
Mega-nodes (>10M neighbors): 0.001% of vertices

Example counts at 100B scale:
Normal: 99.9B vertices
Hubs: 90M vertices
Super: 9M vertices
Mega: 100k vertices

Classification Thresholds:

vertex_degree_classification:
normal:
max_degree: 1000
strategy: full_traversal # Return all neighbors
memory_per_query: <100 KB

hub:
min_degree: 1001
max_degree: 100000
strategy: paginated_traversal # Return in batches
memory_per_query: <10 MB

super:
min_degree: 100001
max_degree: 10000000
strategy: sampling_required # Must use sampling
memory_per_query: <100 MB (sampled)

mega:
min_degree: 10000001
strategy: sampling_mandatory # No full traversal allowed
memory_per_query: <1 GB (sampled)
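A sketch of how an executor might map a vertex degree onto these classes before choosing a traversal strategy (type and constant names are illustrative).

// Sketch: classify a vertex by degree using the thresholds above.
type DegreeClass int

const (
    ClassNormal DegreeClass = iota // <= 1k neighbors: full traversal
    ClassHub                       // <= 100k neighbors: paginated traversal
    ClassSuper                     // <= 10M neighbors: sampling required
    ClassMega                      // > 10M neighbors: sampling mandatory
)

func ClassifyDegree(degree int64) DegreeClass {
    switch {
    case degree <= 1000:
        return ClassNormal
    case degree <= 100000:
        return ClassHub
    case degree <= 10000000:
        return ClassSuper
    default:
        return ClassMega
    }
}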

Sampling Strategies

Strategy 1: Random Sampling

Approach: Randomly select N neighbors from total degree

type RandomSampler struct {
SampleSize int
}

func (rs *RandomSampler) SampleNeighbors(vertexID string, degree int64) ([]string, error) {
if degree <= rs.SampleSize {
// Vertex not a super-node, return all
return rs.GetAllNeighbors(vertexID), nil
}

// Random sampling
samples := make([]string, 0, rs.SampleSize)
indices := rs.GenerateRandomIndices(degree, rs.SampleSize)

for _, idx := range indices {
neighborID := rs.GetNeighborAtIndex(vertexID, idx)
samples = append(samples, neighborID)
}

return samples, nil
}

func (rs *RandomSampler) GenerateRandomIndices(total int64, count int) []int64 {
// Reservoir sampling
indices := make([]int64, count)
for i := 0; i < count; i++ {
indices[i] = int64(i)
}

for i := count; i < int(total); i++ {
j := rand.Int63n(int64(i + 1))
if j < int64(count) {
indices[j] = int64(i)
}
}

return indices
}

Performance:

  • Time complexity: O(N) for N samples
  • Memory: O(N) for sample storage
  • Bias: Uniform random (no bias)

Strategy 2: Top-K by Property

Approach: Return top K neighbors by property value (e.g., most recent followers, highest engagement)

type TopKSampler struct {
SampleSize int
OrderBy string // Property to sort by
Descending bool
}

func (tks *TopKSampler) SampleNeighbors(vertexID string) ([]string, error) {
// Use property index for efficient top-K
index := tks.GetPropertyIndex(vertexID, tks.OrderBy)

if tks.Descending {
return index.GetTopK(tks.SampleSize), nil
} else {
return index.GetBottomK(tks.SampleSize), nil
}
}

Use Cases:

  • Most recent followers: .out('FOLLOWS').sample(1000).orderBy('joined_at', desc)
  • Highest engagement: .out('FRIENDS').sample(1000).orderBy('messages_count', desc)

Performance:

  • Time complexity: O(log N) with indexed property
  • Memory: O(K) for top-K storage
  • Bias: Intentional (returns "most important" neighbors)

Strategy 3: HyperLogLog Cardinality Estimation

Approach: Estimate neighbor count without materializing full set

type HyperLogLogSampler struct {
Precision int // Typical: 14 (error rate: 0.8%)
}

func (hll *HyperLogLogSampler) EstimateDegree(vertexID string) (int64, error) {
// Initialize HyperLogLog counter
counter := hyperloglog.New(hll.Precision)

// Stream neighbors and add to counter
neighborStream := hll.StreamNeighbors(vertexID)
for neighborID := range neighborStream {
counter.Add([]byte(neighborID))
}

// Estimate cardinality
estimate := counter.Count()
return int64(estimate), nil
}

Use Cases:

  • Degree estimation without full scan: g.V('@taylorswift').out('FOLLOWS').count().approximate()
  • Cardinality for query planning: "How many neighbors does this vertex have?"

Performance:

  • Space complexity: O(1) - fixed memory (16 KB for precision 14)
  • Time complexity: O(N) - single pass through neighbors
  • Error rate: 0.8% for precision 14

Gremlin Extensions for Sampling

Extension 1: .sample(N)

Syntax: Sample N vertices from current traversal

// Sample 1000 followers
g.V().has('username', '@taylorswift').out('FOLLOWS').sample(1000)

// Sample 10% of followers
g.V().has('username', '@taylorswift').out('FOLLOWS').sample(0.1)

Implementation:

func (qe *QueryExecutor) ExecuteSampleStep(
inputVertices []*Vertex,
sampleSize interface{}, // int or float64
) ([]*Vertex, error) {
switch s := sampleSize.(type) {
case int:
// Absolute count
return qe.SampleFixed(inputVertices, s), nil

case float64:
// Percentage
count := int(float64(len(inputVertices)) * s)
return qe.SampleFixed(inputVertices, count), nil

default:
return nil, fmt.Errorf("invalid sample size: %v", sampleSize)
}
}

func (qe *QueryExecutor) SampleFixed(vertices []*Vertex, n int) []*Vertex {
if len(vertices) <= n {
return vertices // Already smaller than sample
}

// Reservoir sampling
samples := make([]*Vertex, n)
copy(samples, vertices[:n])

for i := n; i < len(vertices); i++ {
j := rand.Intn(i + 1)
if j < n {
samples[j] = vertices[i]
}
}

return samples
}

Extension 2: .approximate()

Syntax: Use approximate algorithms for aggregations

// Approximate count (HyperLogLog)
g.V().has('username', '@taylorswift').out('FOLLOWS').count().approximate()

// Approximate distinct (HyperLogLog)
g.V().has('type', 'User').values('city').dedup().count().approximate()

Implementation:

func (qe *QueryExecutor) ExecuteApproximateCount(
inputVertices []*Vertex,
) (int64, error) {
counter := hyperloglog.New14() // Precision 14

for _, vertex := range inputVertices {
counter.Add([]byte(vertex.ID))
}

return int64(counter.Count()), nil
}

Circuit Breaker for Super-Node Queries

Automatic Detection: Detect super-node queries during execution and apply sampling automatically

type SuperNodeCircuitBreaker struct {
DegreeThreshold int64 // 100k neighbors = super-node
AutoSample bool // Automatically apply sampling
DefaultSampleSize int // 10k samples
}

func (scb *SuperNodeCircuitBreaker) ExecuteTraversal(
vertex *Vertex,
edgeLabel string,
) ([]*Vertex, error) {
// Check degree before traversal
degree := scb.GetDegree(vertex.ID, edgeLabel)

if degree > scb.DegreeThreshold {
// Super-node detected
if !scb.AutoSample {
return nil, SuperNodeError{
VertexID: vertex.ID,
Degree: degree,
Message: "Vertex exceeds degree threshold, use .sample() or .approximate()",
}
}

// Automatic sampling
log.Warnf("Super-node detected: %s (degree: %d), applying automatic sampling to %d",
vertex.ID, degree, scb.DefaultSampleSize)

return scb.SampleNeighbors(vertex.ID, scb.DefaultSampleSize), nil
}

// Normal vertex, full traversal
return scb.GetAllNeighbors(vertex.ID), nil
}

Configuration:

super_node_circuit_breaker:
enabled: true
degree_threshold: 100000 # Vertices with >100k neighbors
auto_sample: true # Automatically apply sampling
default_sample_size: 10000 # Sample 10k neighbors
rejection_mode: false # If true, reject queries instead of sampling

Performance Trade-Offs

Comparison of Sampling Strategies:

| Strategy | Memory | Time | Bias | Use Case |
|---|---|---|---|---|
| Full Traversal | O(N) | O(N) | None | Normal vertices (N < 1k) |
| Random Sampling | O(K) | O(K) | None | Unbiased sample, general purpose |
| Top-K by Property | O(K) | O(log N)* | Intentional | "Most important" neighbors |
| HyperLogLog | O(1) | O(N) | Count only | Cardinality estimation |
| Reservoir Sampling | O(K) | O(N) | None | Stream processing |

*Assumes indexed property

Example Performance: Celebrity Query

Query: g.V('@taylorswift').out('FOLLOWS')

Without sampling:
Degree: 100M followers
Memory: 6.4 GB
Time: 45 seconds
Result: OOM crash

With random sampling (10k):
Degree: 100M followers (sampled to 10k)
Memory: 640 KB
Time: 100 ms
Result: 10k sampled followers

With HyperLogLog (count only):
Degree: 100M followers
Memory: 16 KB
Time: 2 seconds
Result: Estimate 100M ± 0.8%

Trade-Off Decision Matrix:

| Query Goal | Strategy | Justification |
|---|---|---|
| Exact count | Full traversal | Must count all neighbors |
| Approximate count | HyperLogLog | 0.8% error acceptable, 99.97% memory savings |
| Representative sample | Random sampling | Unbiased, general purpose |
| Most active users | Top-K by activity | Intentional bias for engagement analysis |
| Recent interactions | Top-K by timestamp | Intentional bias for recency |

Client-Side API

Python Client:

from prism_client import GraphClient

client = GraphClient('localhost:50051')

# Sample 1000 followers
followers = client.gremlin(
"g.V().has('username', '@taylorswift').out('FOLLOWS').sample(1000)"
)

# Approximate count
count = client.gremlin(
"g.V().has('username', '@taylorswift').out('FOLLOWS').count().approximate()"
)

# Top 100 by engagement
top_followers = client.gremlin(
"""g.V().has('username', '@taylorswift')
.out('FOLLOWS')
.sample(100)
.orderBy('engagement_score', desc)"""
)

Go Client:

client := prism.NewGraphClient("localhost:50051")

// Sample with options
result, err := client.Gremlin(ctx, &prism.GremlinRequest{
Query: "g.V().has('username', '@taylorswift').out('FOLLOWS')",
SamplingOptions: &prism.SamplingOptions{
Enabled: true,
SampleSize: 1000,
Strategy: prism.SAMPLING_STRATEGY_RANDOM,
},
})

Summary

Super-node handling is mandatory at 100B scale to prevent:

  • Memory exhaustion: 100M neighbors × 64 bytes = 6.4 GB per query
  • Query timeouts: Full traversal of 100M neighbors takes 45+ seconds
  • Cascading failures: Single celebrity query impacts all concurrent queries

Multi-strategy approach:

  1. Vertex classification: Identify super-nodes before query execution (normal/hub/super/mega)
  2. Sampling: Random, Top-K, HyperLogLog for different use cases
  3. Gremlin extensions: .sample(N), .approximate() for explicit sampling
  4. Circuit breaker: Automatic detection and sampling for super-node queries
  5. Performance trade-offs: Balance accuracy vs memory vs latency

Key metrics:

  • Degree threshold: 100k neighbors (super-node classification)
  • Default sample size: 10k neighbors (99% memory reduction)
  • HyperLogLog precision: 14 (0.8% error rate, 16 KB memory)
  • Circuit breaker: Auto-sample or reject queries to super-nodes

Impact: Enables safe querying of social graphs with celebrities/influencers, prevents OOM crashes, maintains sub-second query latency even for high-degree vertices.

Performance Characteristics

Query Latency by Complexity

| Query Type | Partitions | Vertices | Latency (P50) | Latency (P99) |
|---|---|---|---|---|
| Single vertex lookup | 1 | 1 | 50 μs | 200 μs |
| Property filter (indexed) | 150 | 5M | 2 s | 5 s |
| Property filter (unindexed) | 64,000 | 100B | 60 s | 180 s |
| 1-hop traversal (local) | 1 | 200 | 500 μs | 2 ms |
| 1-hop traversal (distributed) | 50 | 10k | 10 ms | 50 ms |
| 2-hop traversal | 500 | 100k | 100 ms | 500 ms |
| 3-hop traversal | 5,000 | 1M | 1 s | 5 s |

Partition Pruning Effectiveness

| Query Pattern | Partitions Queried | Pruning Ratio | Speedup |
|---|---|---|---|
| Exact property match (indexed) | 150 / 64,000 | 99.1% | 106× |
| Range query (indexed) | 2,000 / 64,000 | 87.5% | 8× |
| Label filter | 4,000 / 64,000 | 75% | 4× |
| No prunable filters | 64,000 / 64,000 | 0% | 1× |

Parallel Execution Speedup

| Input Size | Sequential Time | Parallel Time (1000 workers) | Speedup |
|---|---|---|---|
| 1k vertices | 0.1 s | 0.1 s | 1× |
| 10k vertices | 1 s | 0.5 s | 2× |
| 100k vertices | 10 s | 0.8 s | 12.5× |
| 1M vertices | 100 s | 2 s | 50× |
| 10M vertices | 1000 s | 10 s | 100× |

Query Observability and Debugging

Problem: At 100B scale with 1000-node distributed execution, debugging slow queries is challenging without deep observability. "Why did this query take 45 seconds instead of 5 seconds?" requires visibility into partition-level execution, network latency, and resource utilization.

EXPLAIN Plan (SQL-Style)

Purpose: Show query execution plan before running query, estimate costs, identify optimization opportunities.

type QueryExplainer struct {
planner *QueryPlanner
costModel *CostModel
statsManager *StatisticsManager
}

func (qe *QueryExplainer) Explain(gremlinQuery string) (*ExplainPlan, error) {
// Parse query
plan, err := qe.planner.Analyze(gremlinQuery)
if err != nil {
return nil, err
}

// Estimate costs
explainPlan := &ExplainPlan{
Query: gremlinQuery,
Stages: make([]*ExplainStage, len(plan.Stages)),
}

for i, stage := range plan.Stages {
explainPlan.Stages[i] = &ExplainStage{
StageID: i,
Type: stage.Type,
EstimatedRows: stage.EstimatedCardinality,
EstimatedCost: qe.costModel.EstimateStageCost(stage),
PartitionsQueried: len(stage.TargetPartitions),
IndexUsed: stage.UseIndex,
IndexName: stage.IndexName,
}
}

explainPlan.TotalCost = qe.costModel.EstimateQueryCost(plan)
explainPlan.TotalLatency = qe.EstimateLatency(plan)

return explainPlan, nil
}

Example: EXPLAIN Output

EXPLAIN: g.V().hasLabel('User').has('city', 'SF').out('FOLLOWS').limit(100)

Query Plan:
┌──────────────────────────────────────────────────────────────┐
│ Stage 0: Vertex Scan (hasLabel + has) │
│ Type: INDEX_SCAN │
│ Index: city_index['SF'] │
│ Estimated Rows: 5M vertices │
│ Estimated Cost: 2.5 (index lookup + scan) │
│ Partitions Queried: 150 of 64,000 (pruned 99%) │
│ Estimated Latency: 2 seconds │
└──────────────────────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────┐
│ Stage 1: Edge Traversal (out FOLLOWS) │
│ Type: TRAVERSAL │
│ Index: edge_index['FOLLOWS'] │
│ Estimated Rows: 50M edges (10 per vertex avg) │
│ Estimated Cost: 50.0 (edge lookup + vertex load) │
│ Partitions Queried: 500 (distributed fan-out) │
│ Estimated Latency: 5 seconds │
└──────────────────────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────┐
│ Stage 2: Limit (100) │
│ Type: LIMIT │
│ Estimated Rows: 100 vertices │
│ Estimated Cost: 0.1 (early termination) │
│ Partitions Queried: N/A │
│ Estimated Latency: <1 ms │
└──────────────────────────────────────────────────────────────┘

Total Estimated Cost: 52.6
Total Estimated Latency: 7 seconds
Total Partitions Touched: 650 of 64,000
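Callers would use the explainer roughly as follows. This is a sketch that assumes pre-built `planner`, `costModel`, and `stats` instances; the rendering of the plan is the caller's choice.

// Sketch: run EXPLAIN and print a per-stage summary.
explainer := &QueryExplainer{planner: planner, costModel: costModel, statsManager: stats}

plan, err := explainer.Explain("g.V().hasLabel('User').has('city', 'SF').out('FOLLOWS').limit(100)")
if err != nil {
    log.Fatalf("explain failed: %v", err)
}

for _, stage := range plan.Stages {
    fmt.Printf("Stage %d: type=%v rows=%d cost=%.1f partitions=%d index=%s\n",
        stage.StageID, stage.Type, stage.EstimatedRows, stage.EstimatedCost,
        stage.PartitionsQueried, stage.IndexName)
}
fmt.Printf("Total cost=%.1f, estimated latency=%v\n", plan.TotalCost, plan.TotalLatency)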

Query Timeline Visualization

Purpose: Show actual execution timeline with partition-level detail, identify bottlenecks.

type QueryTimeline struct {
QueryID string
StartTime time.Time
EndTime time.Time
TotalTime time.Duration
Stages []*StageTimeline
}

type StageTimeline struct {
StageID int
StartTime time.Time
EndTime time.Time
Duration time.Duration
PartitionExec []*PartitionExecution
}

type PartitionExecution struct {
PartitionID string
StartTime time.Time
EndTime time.Time
Duration time.Duration
RowsReturned int64
}

func (qe *QueryExecutor) ExecuteWithTimeline(
ctx context.Context,
plan *QueryPlan,
) (*QueryResult, *QueryTimeline, error) {
timeline := &QueryTimeline{
QueryID: uuid.New().String(),
StartTime: time.Now(),
}
result := &QueryResult{} // result accumulation elided for brevity

// Execute query stages
for i, stage := range plan.Stages {
stageTimeline := &StageTimeline{
StageID: i,
StartTime: time.Now(),
}

// Execute on partitions
for _, partitionID := range stage.TargetPartitions {
partExec := &PartitionExecution{
PartitionID: partitionID,
StartTime: time.Now(),
}

rows := qe.ExecuteOnPartition(partitionID, stage)

partExec.EndTime = time.Now()
partExec.Duration = partExec.EndTime.Sub(partExec.StartTime)
partExec.RowsReturned = int64(len(rows))

stageTimeline.PartitionExec = append(stageTimeline.PartitionExec, partExec)
}

stageTimeline.EndTime = time.Now()
stageTimeline.Duration = stageTimeline.EndTime.Sub(stageTimeline.StartTime)
timeline.Stages = append(timeline.Stages, stageTimeline)
}

timeline.EndTime = time.Now()
timeline.TotalTime = timeline.EndTime.Sub(timeline.StartTime)

return result, timeline, nil
}

Example: Timeline Visualization

Query Timeline: g.V().has('city', 'SF').out('FOLLOWS')
Total Time: 8.2 seconds

Stage 0: Vertex Scan (2.1 seconds)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.1s
Partitions: 150
Fastest: partition 07:0005:12 (10 ms)
Slowest: partition 09:0089:45 (450 ms) ← BOTTLENECK
Average: 14 ms

Stage 1: Edge Traversal (6.0 seconds)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 6.0s
Partitions: 500
Fastest: partition 01:0012:03 (8 ms)
Slowest: partition 07:0042:18 (3.2s) ← BOTTLENECK
Reason: Cold partition (loaded from S3)
Average: 12 ms

Stage 2: Limit (0.1 seconds)
━ 0.1s

Distributed Tracing (OpenTelemetry)

Purpose: Trace query execution across 1000 nodes, capture spans for each operation.

import (
"context"
"fmt"

"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func (qc *QueryCoordinator) ExecuteWithTracing(
ctx context.Context,
gremlinQuery string,
) (*ResultStream, error) {
// Start root span
tracer := otel.Tracer("prism-query-executor")
ctx, span := tracer.Start(ctx, "gremlin.query",
trace.WithAttributes(
attribute.String("gremlin.query", gremlinQuery),
attribute.String("query.id", uuid.New().String()),
),
)
defer span.End()

// Parse query
_, parseSpan := tracer.Start(ctx, "gremlin.parse")
plan, err := qc.queryAnalyzer.Analyze(gremlinQuery)
parseSpan.End()

if err != nil {
span.RecordError(err)
return nil, err
}

// Execute stages, streaming results as they arrive
resultStream := NewResultStream()
for i, stage := range plan.Stages {
stageCtx, stageSpan := tracer.Start(ctx, fmt.Sprintf("stage.%d", i),
trace.WithAttributes(
attribute.String("stage.type", stage.Type.String()),
attribute.Int("stage.partitions", len(stage.TargetPartitions)),
),
)

// Execute on partitions
for _, partitionID := range stage.TargetPartitions {
partCtx, partSpan := tracer.Start(stageCtx, "partition.execute",
trace.WithAttributes(
attribute.String("partition.id", partitionID),
),
)

results := qc.ExecuteOnPartition(partCtx, partitionID, stage)
for _, vertex := range results {
resultStream.Send(vertex)
}

partSpan.SetAttributes(
attribute.Int("partition.rows_returned", len(results)),
)
partSpan.End()
}

stageSpan.End()
}

return resultStream, nil
}

Trace Example in SignOz/Jaeger:

gremlin.query (8.2s)

├─ gremlin.parse (10ms)

├─ stage.0: Vertex Scan (2.1s)
│ ├─ partition.execute [07:0005:12] (10ms)
│ ├─ partition.execute [07:0012:34] (12ms)
│ └─ partition.execute [09:0089:45] (450ms) ← SLOW

└─ stage.1: Edge Traversal (6.0s)
├─ partition.execute [01:0012:03] (8ms)
├─ partition.execute [07:0042:18] (3.2s) ← VERY SLOW
│ ├─ s3.fetch (2.8s) ← ROOT CAUSE: Cold partition
│ └─ index.lookup (400ms)
└─ partition.execute [05:0067:89] (15ms)

Slow Query Log Configuration

slow_query_log:
enabled: true
threshold: 5000ms # Log queries taking >5 seconds
log_level: warn
output:
- file: /var/log/prism/slow_queries.log
- kafka: slow-queries-topic
- signoz: true

# Include in slow query log
include:
- query_text: true
- execution_plan: true
- timeline: true
- partition_breakdown: true
- resource_usage: true # CPU, memory, network

# Sampling (avoid overwhelming logs)
sampling:
rate: 100% # Log 100% of slow queries

Slow Query Log Example:

{
"timestamp": "2025-11-15T10:23:45Z",
"query_id": "550e8400-e29b-41d4-a716-446655440000",
"duration_ms": 8200,
"threshold_ms": 5000,
"query": "g.V().has('city', 'SF').out('FOLLOWS')",
"principal": "user@example.com",
"stages": [
{
"stage_id": 0,
"type": "INDEX_SCAN",
"duration_ms": 2100,
"partitions_queried": 150,
"rows_returned": 5000000
},
{
"stage_id": 1,
"type": "TRAVERSAL",
"duration_ms": 6000,
"partitions_queried": 500,
"rows_returned": 50000000,
"bottleneck": {
"partition_id": "07:0042:18",
"duration_ms": 3200,
"reason": "Cold partition (S3 fetch: 2.8s)"
}
}
],
"resource_usage": {
"cpu_seconds": 120,
"memory_mb": 4500,
"network_bytes_sent": 524288000
},
"recommendation": "Consider promoting partition 07:0042:18 to hot tier (accessed 15 times in last hour)"
}
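A sketch of the enforcement side of this configuration: a small helper that compares the measured duration against the threshold and emits a structured record shaped like the example above (sink plumbing to file/Kafka/SigNoz is omitted; field set trimmed for brevity).

// Sketch: emit a structured slow-query record when the threshold is exceeded.
type SlowQueryLogger struct {
    Threshold time.Duration
}

func (l *SlowQueryLogger) MaybeLog(queryID, query, principal string, timeline *QueryTimeline) {
    if timeline.TotalTime < l.Threshold {
        return
    }
    record := map[string]interface{}{
        "timestamp":    time.Now().UTC().Format(time.RFC3339),
        "query_id":     queryID,
        "duration_ms":  timeline.TotalTime.Milliseconds(),
        "threshold_ms": l.Threshold.Milliseconds(),
        "query":        query,
        "principal":    principal,
    }
    payload, _ := json.Marshal(record)
    log.Warnf("slow query: %s", payload)
}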

Prometheus Metrics and Alerts

Metrics:

# Query latency histogram
prism_gremlin_query_duration_seconds{stage="0",partition="07:0042:18"} 3.2

# Query throughput
prism_gremlin_queries_total{status="success"} 1250000
prism_gremlin_queries_total{status="error"} 350

# Partition execution time
prism_partition_execution_duration_seconds{partition="07:0042:18"} 3.2

# Slow query count
prism_slow_queries_total{threshold="5s"} 125
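Instrumentation for these metrics, sketched with the standard prometheus/client_golang API; the metric names match the list above, while the bucket layout and the helper function are illustrative.

import (
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// Sketch: register and record query-latency and slow-query metrics.
var (
    queryDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "prism_gremlin_query_duration_seconds",
            Help:    "Gremlin query latency by stage and partition.",
            Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), // 1 ms .. ~65 s
        },
        []string{"stage", "partition"},
    )
    slowQueries = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "prism_slow_queries_total",
            Help: "Queries exceeding the slow-query threshold.",
        },
        []string{"threshold"},
    )
)

func init() {
    prometheus.MustRegister(queryDuration, slowQueries)
}

func recordPartitionExecution(stage int, partitionID string, d time.Duration) {
    queryDuration.WithLabelValues(fmt.Sprintf("%d", stage), partitionID).Observe(d.Seconds())
    if d > 5*time.Second {
        slowQueries.WithLabelValues("5s").Inc()
    }
}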

Alerting Rules:

alerts:
- name: HighSlowQueryRate
condition: rate(prism_slow_queries_total[5m]) > 10
severity: warning
message: "High slow query rate: {{$value}} slow queries/min (threshold: 10)"

- name: PartitionConsistentlySlow
condition: avg_over_time(prism_partition_execution_duration_seconds{partition="07:0042:18"}[10m]) > 1.0
severity: warning
message: "Partition {{$labels.partition}} consistently slow (avg: {{$value}}s)"

- name: QueryErrorRate
condition: rate(prism_gremlin_queries_total{status="error"}[5m]) > 100
severity: critical
message: "High query error rate: {{$value}} errors/min"

Gremlin Step Support Matrix

| Gremlin Step | Supported | Optimization | Notes |
|---|---|---|---|
| V() | ✅ | Index scan | Start traversal from all vertices |
| V(id) | ✅ | Direct lookup | Start from specific vertex |
| E() | ✅ | Edge index scan | Start from all edges |
| hasLabel() | ✅ | Label index | Filter by vertex label |
| has(key, value) | ✅ | Property index | Filter by property |
| has(key, predicate) | ✅ | Range index | Filter by predicate (gt, lt, etc) |
| out() | ✅ | Edge index | Traverse outgoing edges |
| in() | ✅ | Inverted edge index | Traverse incoming edges |
| both() | ✅ | Both indexes | Traverse both directions |
| outE() | ✅ | Edge index | Get outgoing edges |
| inE() | ✅ | Inverted edge index | Get incoming edges |
| outV() | ✅ | Vertex lookup | Get source vertex from edge |
| inV() | ✅ | Vertex lookup | Get target vertex from edge |
| values() | ✅ | Property access | Get property values |
| limit() | ✅ | Early termination | Limit result set |
| range() | ✅ | Pagination | Range-based pagination |
| count() | ✅ | Cardinality estimate | Count vertices/edges |
| dedup() | ✅ | Bloom filter | Remove duplicates |
| order() | ⚠️ Partial | - | Sort results (expensive) |
| group() | ⚠️ Partial | - | Group by property |
| path() | ⚠️ Limited | - | Path tracking (memory intensive) |
| match() | ❌ | - | Complex pattern matching (future) |
| repeat() | ⚠️ Limited | - | Recursive traversal (bounded) |

Open Questions

  1. Query Caching: Should we cache query plans or query results?
  2. Distributed Transactions: How to support multi-vertex updates atomically?
  3. Query Timeouts: How to handle long-running queries gracefully?
  4. Result Pagination: Best pagination strategy for large result sets?
  5. Custom Functions: Support for user-defined Gremlin functions?

References

Revision History

  • 2025-11-15: Initial draft - Distributed Gremlin query execution engine