RFC-059: Hot/Cold Storage Tiers with Rapid S3 Snapshot Loading
Status: Draft
Author: Platform Team
Created: 2025-11-15
Updated: 2025-11-15
Abstract
This RFC defines a hot/cold storage tier architecture for massive-scale graph data (100B vertices) with rapid bulk loading from S3 snapshots. At this scale, pure in-memory storage requires 210 TB of RAM, roughly $4.1M/month in instance costs (about $49M/year; see Motivation), making it economically infeasible. Hot/cold tiering keeps frequently accessed data in memory (hot tier: 21 TB, 10%) and infrequently accessed data in S3 (cold tier: 189 TB, 90%), reducing baseline infrastructure costs by 86% while maintaining query performance. The system supports rapid bulk refreshes via parallel snapshot loading from S3 in 100 MB chunks, with support for multiple formats: Parquet (columnar), Prometheus/Thanos (time-series), HDFS (Hadoop), Protobuf (native), and JSON Lines (human-readable).
Key Innovations:
- Cost Optimization: 86% cost reduction vs pure in-memory ($587k/month vs $4.1M/month; see Motivation)
- Multi-Format Support: Parquet, Prometheus, HDFS, Protobuf, JSON Lines
- Parallel Loading: 1,000 workers load a 10 TB snapshot in about 50 seconds (200 GB/s aggregate)
- Access Pattern Learning: ML-based hot/cold classification
- Distributed WAL: Kafka-based write-ahead log for consistency
- Zero-Copy Loading: Memory-mapped files for instant access
Motivation
Cost Analysis: Pure In-Memory vs Hot/Cold Tiers
Pure In-Memory Architecture (RFC-055 baseline):
Graph Data:
100B vertices × 100 bytes = 10 TB
10T edges × 20 bytes = 200 TB
Total: 210 TB
Infrastructure:
210 TB ÷ 30 GB per proxy = 7,000 proxies
AWS r6i.2xlarge: 64 GB RAM, $583/month
Total: 7,000 proxies × $583/month = $4,081,000/month
Annual cost: $48,972,000/year
Hot/Cold Tiered Architecture (this RFC):
Hot Tier (10% of data):
21 TB in-memory across 1,000 proxies
1,000 proxies × $583/month = $583,000/month
Cold Tier (90% of data):
189 TB on S3 Standard
189 TB × $23/TB/month = $4,347/month
Warm Tier (caching):
50 TB on local NVMe SSD (proxy-local)
Included in proxy cost
Total: $587,347/month = $7,048,164/year
Annual savings: $48.9M - $7.0M = $41.9M (86% reduction)
Why Rapid Snapshot Loading?
Problem: Traditional graph loading is too slow for operational use
Traditional Approach (Insert per vertex/edge):
100B vertices × 1ms per insert = 100M seconds = 1,157 days
Even with 1000 parallel workers: 1.16 days
Snapshot Approach (Bulk load):
Load 210 TB snapshot in parallel
1000 workers × 200 MB/s = 200 GB/s aggregate
210 TB ÷ 200 GB/s = 1,050 seconds = 17.5 minutes
Speedup: ~95,000× vs serial inserts (~95× vs 1,000 parallel inserts)
Use Cases for Rapid Loading:
- Daily Full Refresh: Load entire graph from nightly batch job
- Disaster Recovery: Rebuild cluster from S3 in <20 minutes
- Development/Staging: Spin up new clusters with production-like data
- A/B Testing: Load multiple graph versions for experimentation
- Time Travel: Load historical snapshots for analysis
Why Multiple Snapshot Formats?
Different data sources produce different formats:
| Format | Use Case | Compression | Read Speed | Ecosystem |
|---|---|---|---|---|
| Parquet | ETL pipelines, analytics | Excellent (10:1) | Fast (columnar) | Spark, Presto, Trino |
| Prometheus | Time-series metrics, monitoring | Good (3:1) | Fast (Thanos) | Prometheus, Grafana |
| HDFS | Hadoop ecosystem, MapReduce | Good (Snappy) | Fast (parallel) | Hadoop, Hive |
| Protobuf | Native graph format, lowest latency | Excellent (5:1) | Fastest | Prism native |
| JSON Lines | Human-readable, debugging | Poor (2:1) | Slow | Universal |
Example: Enterprise data pipeline
Source Systems → Data Lake → Graph Snapshot → Prism
Step 1: Spark ETL job exports to Parquet
- Users table → s3://data-lake/users.parquet
- Transactions table → s3://data-lake/transactions.parquet
Step 2: Prism loader converts Parquet → Graph
- Users → Vertices
- Transactions → Edges
Step 3: Prism loads graph in 17 minutes
- No intermediate conversion needed
- Direct Parquet → in-memory graph
Goals
- Cost Efficiency: 86% cost reduction vs pure in-memory
- Rapid Loading: Load 210 TB snapshot in <20 minutes
- Multi-Format Support: Parquet, Prometheus, HDFS, Protobuf, JSON Lines
- Hot/Cold Transparency: Queries work seamlessly across tiers
- Access Pattern Learning: Automatically identify hot/cold data
- Distributed WAL: Consistent updates across all tiers
- Zero-Copy Loading: Avoid memory copies during snapshot load
Non-Goals
- Real-Time Streaming: Snapshot loading is batch-oriented (streaming via RFC-051 WAL)
- Schema Inference: Snapshot schema must be explicit (no auto-detection)
- Cross-Format Joins: Each snapshot is single format
- Versioning/Time Travel: Snapshot versioning is external concern
Hot/Cold Storage Architecture
Three-Tier Storage Model
┌────────────────────────────────────────────────────────────────┐
│ Tier 1: Hot (Memory) │
│ │
│ Frequently accessed data: Last 7 days, VIP users, trending │
│ Storage: In-memory (MemStore) │
│ Capacity: 21 TB (10% of total) │
│ Access latency: 50 μs │
│ Cost: $583k/month │
│ │
└────────────────────────────────────────────────────────────────┘
↕ Promotion/Demotion
┌────────────────────────────────────────────────────────────────┐
│ Tier 2: Warm (Local SSD) │
│ │
│ Recently accessed: Last 30 days, active users │
│ Storage: Local NVMe SSD (proxy-attached) │
│ Capacity: 50 TB │
│ Access latency: 100 μs │
│ Cost: Included in proxy hardware │
│ │
└────────────────────────────────────────────────────────────────┘
↕ Fetch on demand
┌────────────────────────────────────────────────────────────────┐
│ Tier 3: Cold (S3) │
│ │
│ Infrequently accessed: Historical data, inactive users │
│ Storage: S3 Standard │
│ Capacity: 189 TB (90% of total) │
│ Access latency: 50-100 ms │
│ Cost: $4.3k/month │
│ │
└────────────────────────────────────────────────────────────────┘
Partition Temperature States
enum PartitionTemperature {
TEMPERATURE_HOT = 0; // In memory, frequently accessed
TEMPERATURE_WARM = 1; // On local SSD, occasionally accessed
TEMPERATURE_COLD = 2; // On S3, rarely accessed
TEMPERATURE_FROZEN = 3; // On S3 Glacier, never accessed
}
message PartitionMetrics {
string partition_id = 1;
PartitionTemperature temperature = 2;
// Access metrics
int64 requests_per_minute = 3;
int64 last_access_timestamp = 4;
int64 total_accesses = 5;
// Storage locations
bool in_memory = 6;
bool on_ssd = 7;
string s3_path = 8;
// Size metrics
int64 vertex_count = 9;
int64 edge_count = 10;
int64 memory_bytes = 11;
int64 ssd_bytes = 12;
int64 s3_bytes = 13;
}
Temperature Classification Algorithm
ML-Based Classification:
type TemperatureClassifier struct {
model *xgboost.Model
}
func (tc *TemperatureClassifier) ClassifyPartition(metrics *PartitionMetrics) PartitionTemperature {
features := []float64{
float64(metrics.RequestsPerMinute),
float64(time.Now().Unix() - metrics.LastAccessTimestamp),
float64(metrics.TotalAccesses),
tc.calculateAccessRecency(metrics),
tc.calculateAccessFrequency(metrics),
tc.calculateAccessBurstiness(metrics),
}
prediction := tc.model.Predict(features)
switch {
case prediction > 0.8:
return TEMPERATURE_HOT
case prediction > 0.5:
return TEMPERATURE_WARM
case prediction > 0.2:
return TEMPERATURE_COLD
default:
return TEMPERATURE_FROZEN
}
}
// Access recency: How recently was partition accessed?
func (tc *TemperatureClassifier) calculateAccessRecency(m *PartitionMetrics) float64 {
hoursSinceAccess := float64(time.Now().Unix()-m.LastAccessTimestamp) / 3600.0
return math.Exp(-hoursSinceAccess / 24.0) // Exponential decay over 24 hours
}
// Access frequency: How often is partition accessed?
func (tc *TemperatureClassifier) calculateAccessFrequency(m *PartitionMetrics) float64 {
return float64(m.RequestsPerMinute) / 1000.0 // Normalize to [0, 1]
}
// Access burstiness: Are accesses clustered or spread out?
func (tc *TemperatureClassifier) calculateAccessBurstiness(m *PartitionMetrics) float64 {
// Use coefficient of variation from access history
return m.AccessHistogram.CoefficientOfVariation()
}
Rule-Based Classification (simpler alternative):
temperature_rules:
hot:
promote_threshold:
requests_per_minute: ">= 1000"
last_access: "<= 1 hour"
demote_threshold:
requests_per_minute: "< 800" # 20% hysteresis
last_access: "> 1 hour"
cooldown_period: 5m # Minimum time before demotion
warm:
promote_threshold:
requests_per_minute: ">= 10"
last_access: "<= 24 hours"
demote_threshold:
requests_per_minute: "< 8" # 20% hysteresis
last_access: "> 24 hours"
cooldown_period: 10m # Minimum time before demotion
cold:
promote_threshold:
requests_per_minute: ">= 1"
last_access: "<= 7 days"
demote_threshold:
requests_per_minute: "< 1" # No hysteresis (already at floor)
last_access: "> 7 days"
cooldown_period: 0m
frozen:
requests_per_minute: "< 1"
last_access: "> 30 days"
Hysteresis Rationale:
At 100B scale with 16,000 partitions, temperature thrashing causes excessive data movement and performance degradation. Without hysteresis, partitions near threshold boundaries oscillate between states multiple times per minute.
Problem: Thrashing Without Hysteresis
Partition with fluctuating load: 990-1010 rpm
Without hysteresis (single threshold: 1000 rpm):
t=0s: 1010 rpm → HOT (promote)
t=60s: 990 rpm → WARM (demote)
t=120s: 1005 rpm → HOT (promote)
t=180s: 995 rpm → WARM (demote)
t=240s: 1010 rpm → HOT (promote)
Result: 4 state changes in 5 minutes
Cost: 4 × 156 MB data movements = 624 MB wasted bandwidth
Solution: 20% Hysteresis + Cooldown
Same partition with hysteresis:
Promote to HOT: >= 1000 rpm
Demote from HOT: < 800 rpm (20% below)
Cooldown: 5 minutes minimum in HOT before demotion
With hysteresis:
t=0s: 1010 rpm → HOT (promote)
t=60s: 990 rpm → HOT (still above 800 rpm demote threshold)
t=120s: 1005 rpm → HOT (stays)
t=180s: 995 rpm → HOT (stays)
t=240s: 1010 rpm → HOT (stays)
Result: 1 state change in 5 minutes (stabilized)
Cost: 1 × 156 MB data movement = 156 MB (75% reduction)
Hysteresis Values Explained:
- Hot ↔ Warm: 20% hysteresis (1000 rpm promote, 800 rpm demote)
  - Rationale: Hot tier is expensive (memory), prevent unnecessary churn
  - Cooldown: 5 minutes (enough for burst traffic to settle)
  - Impact: Partitions stay hot unless there is a sustained drop below 800 rpm
- Warm ↔ Cold: 20% hysteresis (10 rpm promote, 8 rpm demote)
  - Rationale: Prevent oscillation for rarely-accessed partitions
  - Cooldown: 10 minutes (longer than hot, less critical tier)
  - Impact: Once promoted to warm, stays warm unless truly idle
- Cold ↔ Frozen: No hysteresis
  - Rationale: Already at lowest tier, no performance impact from oscillation
  - Both are on S3, just different access patterns
Performance Impact:
| Metric | Without Hysteresis | With Hysteresis | Improvement |
|---|---|---|---|
| State changes/hour | 24 | 4 | 6× reduction |
| Data movements/hour | 3.7 GB | 0.6 GB | 84% reduction |
| Cache hit rate | 70% | 88% | 18% improvement |
| Partition load time | 5s avg | 2s avg | 60% faster |
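The rule-based path needs no ML dependency. Below is a minimal sketch of evaluating the hot-tier rule with hysteresis and cooldown, using the thresholds from the YAML above; the TierRule struct and EvaluateHotTier function are illustrative names, not an existing API.
// Hypothetical rule-based classifier; TierRule and EvaluateHotTier are illustrative names.
type TierRule struct {
	PromoteRPM int64         // Promote when requests/minute >= this value
	DemoteRPM  int64         // Demote only when requests/minute < this value (20% hysteresis)
	Cooldown   time.Duration // Minimum dwell time in the tier before demotion
}

var hotRule = TierRule{PromoteRPM: 1000, DemoteRPM: 800, Cooldown: 5 * time.Minute}

func EvaluateHotTier(
	current PartitionTemperature,
	rpm int64,
	timeInTier time.Duration,
) PartitionTemperature {
	if current == TEMPERATURE_HOT {
		// Demote only when the rate falls below the lower threshold
		// AND the cooldown has elapsed; this is what prevents thrashing.
		if rpm < hotRule.DemoteRPM && timeInTier >= hotRule.Cooldown {
			return TEMPERATURE_WARM
		}
		return TEMPERATURE_HOT
	}
	// Promote as soon as the rate crosses the upper threshold.
	if rpm >= hotRule.PromoteRPM {
		return TEMPERATURE_HOT
	}
	return current
}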
Promotion and Demotion Policies
func (pm *PartitionManager) MonitorTemperature() {
ticker := time.NewTicker(60 * time.Second)
for range ticker.C {
partitions := pm.GetAllPartitions()
for _, partition := range partitions {
metrics := pm.GetMetrics(partition.ID)
newTemp := pm.Classifier.ClassifyPartition(metrics)
if newTemp != partition.Temperature {
pm.TransitionTemperature(partition.ID, newTemp)
}
}
}
}
func (pm *PartitionManager) TransitionTemperature(
partitionID string,
newTemp PartitionTemperature,
) error {
partition := pm.GetPartition(partitionID)
oldTemp := partition.Temperature
switch {
case oldTemp == TEMPERATURE_COLD && newTemp == TEMPERATURE_HOT:
// Cold → Hot: Load from S3 to memory
return pm.PromoteToHot(partitionID)
case oldTemp == TEMPERATURE_COLD && newTemp == TEMPERATURE_WARM:
// Cold → Warm: Load from S3 to SSD
return pm.PromoteToWarm(partitionID)
case oldTemp == TEMPERATURE_HOT && newTemp == TEMPERATURE_COLD:
// Hot → Cold: Evict from memory to S3
return pm.DemoteToCold(partitionID)
case oldTemp == TEMPERATURE_WARM && newTemp == TEMPERATURE_COLD:
// Warm → Cold: Evict from SSD to S3
return pm.DemoteToCold(partitionID)
default:
return nil // Other transitions (e.g., WARM→HOT, COLD→FROZEN) follow the same pattern; omitted for brevity
}
}
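PromoteToHot and DemoteToCold are referenced above but not shown. The sketch below illustrates the cold-to-hot path under assumptions about the partition object layout; the helper names and fields (GetObjectBytes, BulkInsert, S3Path, InMemory) are hypothetical, not part of an existing API.
// Illustrative promotion path (Cold → Hot); helper and field names are hypothetical.
func (pm *PartitionManager) PromoteToHot(partitionID string) error {
	partition := pm.GetPartition(partitionID)
	// Step 1: Fetch the partition's serialized data from S3
	data, err := pm.S3Client.GetObjectBytes(partition.S3Path)
	if err != nil {
		return fmt.Errorf("fetch partition %s from S3: %w", partitionID, err)
	}
	// Step 2: Decode and bulk-insert into the in-memory store
	batch := &VertexBatch{}
	if err := proto.Unmarshal(data, batch); err != nil {
		return fmt.Errorf("decode partition %s: %w", partitionID, err)
	}
	if err := partition.MemStore.BulkInsert(batch.Vertices); err != nil {
		return err
	}
	// Step 3: Update temperature metadata so queries route to memory
	partition.Temperature = TEMPERATURE_HOT
	partition.InMemory = true
	pm.UpdateMetadata(partition)
	return nil
}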
Snapshot Formats and Loading
Format 1: Parquet (Apache Arrow)
Schema:
Vertex Schema (Parquet):
- vertex_id: STRING (required)
- label: STRING (required)
- properties: MAP<STRING, BINARY> (required)
- created_at: TIMESTAMP (optional)
- updated_at: TIMESTAMP (optional)
Edge Schema (Parquet):
- edge_id: STRING (required)
- label: STRING (required)
- from_vertex_id: STRING (required)
- to_vertex_id: STRING (required)
- properties: MAP<STRING, BINARY> (optional)
- created_at: TIMESTAMP (optional)
S3 Layout:
s3://prism-snapshots/graph-2025-11-15/
vertices/
part-00000.parquet (100 MB, 1M vertices)
part-00001.parquet (100 MB, 1M vertices)
...
part-09999.parquet (100 MB, 1M vertices)
Total: 10,000 files, 1 TB, 10B vertices
edges/
part-00000.parquet (100 MB, 500k edges)
part-00001.parquet (100 MB, 500k edges)
...
part-99999.parquet (100 MB, 500k edges)
Total: 100,000 files, 10 TB, 50B edges
Loader Implementation:
func (sl *SnapshotLoader) LoadParquetSnapshot(s3Path string) error {
// Step 1: List all Parquet files
vertexFiles := sl.ListS3Files(s3Path + "/vertices/*.parquet")
edgeFiles := sl.ListS3Files(s3Path + "/edges/*.parquet")
// Step 2: Distribute files across 1000 proxies
proxies := sl.GetAllProxies()
vertexChunks := sl.DistributeFiles(vertexFiles, len(proxies))
edgeChunks := sl.DistributeFiles(edgeFiles, len(proxies))
// Step 3: Parallel load (1000 workers)
var wg sync.WaitGroup
for i, proxy := range proxies {
wg.Add(1)
go func(proxyID string, vFiles, eFiles []string) {
defer wg.Done()
// Load vertices
for _, file := range vFiles {
sl.LoadParquetVertexFile(proxyID, file)
}
// Load edges
for _, file := range eFiles {
sl.LoadParquetEdgeFile(proxyID, file)
}
}(proxy.ID, vertexChunks[i], edgeChunks[i])
}
wg.Wait()
return nil
}
func (sl *SnapshotLoader) LoadParquetVertexFile(proxyID string, s3File string) error {
// Stream from S3 with Arrow IPC
reader, err := parquet.OpenS3File(s3File)
if err != nil {
return err
}
defer reader.Close()
batch := make([]*Vertex, 0, 10000)
for reader.Next() {
// Read Arrow record batch
record := reader.Record()
// Convert to vertices
for i := 0; i < int(record.NumRows()); i++ {
vertex := &Vertex{
ID: record.Column(0).ValueString(i),
Label: record.Column(1).ValueString(i),
Properties: sl.DecodeProperties(record.Column(2).Value(i)),
}
batch = append(batch, vertex)
// Flush batch every 10k vertices
if len(batch) >= 10000 {
sl.SendBatchToProxy(proxyID, batch)
batch = batch[:0]
}
}
}
// Flush remaining
if len(batch) > 0 {
sl.SendBatchToProxy(proxyID, batch)
}
return nil
}
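DistributeFiles is used above but not defined. A minimal round-robin sketch is shown below; any deterministic assignment that balances bytes per proxy would work, and since snapshot chunks are a uniform 100 MB, balancing file counts approximates balancing bytes.
// Round-robin assignment of snapshot files to proxies (illustrative).
// Assumes roughly uniform 100 MB chunks, so balancing counts ≈ balancing bytes.
func (sl *SnapshotLoader) DistributeFiles(files []string, numProxies int) [][]string {
	chunks := make([][]string, numProxies)
	for i, file := range files {
		idx := i % numProxies
		chunks[idx] = append(chunks[idx], file)
	}
	return chunks
}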
Performance:
Parquet Loading:
1 TB vertices (10,000 files × 100 MB)
1000 workers × 200 MB/s = 200 GB/s aggregate
1 TB ÷ 200 GB/s = 5 seconds (vertices only)
10 TB edges (100,000 files × 100 MB)
10 TB ÷ 200 GB/s = 50 seconds (edges only)
Total: 55 seconds for 1 TB vertices + 10 TB edges
Format 2: Prometheus/Thanos (TSDB Blocks)
Schema:
Prometheus Block Structure:
block-{ulid}/
chunks/
000001 (compressed time series)
000002
...
index (inverted index: metric name → series)
meta.json (block metadata)
Time Series to Graph Mapping:
metric{labels} → Vertex
- vertex_id: metric name
- properties: labels + timestamp + value
Example:
http_requests_total{job="api",instance="10.0.0.1"} → Vertex
ID: "http_requests_total:api:10.0.0.1"
Label: "Metric"
Properties: {job: "api", instance: "10.0.0.1", value: 1234}
Loader Implementation:
func (sl *SnapshotLoader) LoadPrometheusSnapshot(blockPath string) error {
// Open Prometheus block
block, err := tsdb.OpenBlock(blockPath)
if err != nil {
return err
}
defer block.Close()
// Iterate time series
querier, err := block.Querier(context.Background(), 0, math.MaxInt64)
if err != nil {
return err
}
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
for seriesSet.Next() {
series := seriesSet.At()
// Convert series to vertex
vertex := &Vertex{
ID: sl.SeriesLabelsToVertexID(series.Labels()),
Label: "Metric",
Properties: map[string][]byte{
"metric_name": []byte(series.Labels().Get("__name__")),
},
}
// Add all label key-value pairs as properties
for _, label := range series.Labels() {
if label.Name != "__name__" {
vertex.Properties[label.Name] = []byte(label.Value)
}
}
// Create vertex
sl.CreateVertex(vertex)
// Optionally: Create edges between related metrics
// Example: Same job → Edge
}
return nil
}
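SeriesLabelsToVertexID is referenced above but not shown. A minimal sketch follows, producing the metric name followed by the remaining label values in label-name order (labels.Labels is sorted by name); the exact ordering may differ from the hand-written example earlier, and any stable encoding of the label set works.
// Builds a deterministic vertex ID from a metric's label set (illustrative).
func (sl *SnapshotLoader) SeriesLabelsToVertexID(lbls labels.Labels) string {
	parts := []string{lbls.Get("__name__")}
	for _, l := range lbls {
		if l.Name != "__name__" {
			parts = append(parts, l.Value)
		}
	}
	return strings.Join(parts, ":")
}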
Format 3: HDFS (Hadoop Distributed File System)
Schema:
HDFS Layout:
hdfs://namenode:9000/graph-snapshot/
vertices/
part-r-00000 (Hadoop SequenceFile)
part-r-00001
...
part-r-09999
edges/
part-r-00000
part-r-00001
...
part-r-99999
SequenceFile Format:
Key: LongWritable (vertex/edge ID)
Value: BytesWritable (protobuf-encoded vertex/edge)
Loader Implementation:
func (sl *SnapshotLoader) LoadHDFSSnapshot(hdfsPath string) error {
	// Connect to HDFS
	client, err := hdfs.New(sl.Config.HDFSNameNode)
	if err != nil {
		return err
	}
	defer client.Close()
	// List vertex files
	vertexFiles, err := client.ReadDir(hdfsPath + "/vertices")
	if err != nil {
		return err
	}
	// Parallel load across proxies
	var wg sync.WaitGroup
	for _, file := range vertexFiles {
		wg.Add(1)
		go func(filename string) {
			defer wg.Done()
			// Open SequenceFile
			f, err := client.Open(filename)
			if err != nil {
				log.Errorf("Failed to open %s: %v", filename, err)
				return
			}
			defer f.Close()
			reader, err := seqfile.NewReader(f)
			if err != nil {
				log.Errorf("Failed to read %s: %v", filename, err)
				return
			}
			defer reader.Close()
			// Read records (key: vertex ID, value: protobuf-encoded vertex)
			for reader.Next() {
				value := reader.Value()
				// Decode protobuf vertex
				vertex := &prism.Vertex{}
				if err := proto.Unmarshal(value, vertex); err != nil {
					log.Errorf("Failed to decode vertex in %s: %v", filename, err)
					continue
				}
				// Create vertex
				sl.CreateVertex(vertex)
			}
		}(hdfsPath + "/vertices/" + file.Name())
	}
	wg.Wait()
	return nil
}
Format 4: Protobuf (Native)
Schema:
message GraphSnapshot {
SnapshotMetadata metadata = 1;
repeated VertexBatch vertex_batches = 2;
repeated EdgeBatch edge_batches = 3;
}
message SnapshotMetadata {
string snapshot_id = 1;
int64 timestamp = 2;
int64 vertex_count = 3;
int64 edge_count = 4;
string schema_version = 5;
}
message VertexBatch {
repeated Vertex vertices = 1;
}
message EdgeBatch {
repeated Edge edges = 1;
}
S3 Layout (chunked):
s3://prism-snapshots/graph-2025-11-15/
metadata.pb (100 KB)
vertices/
chunk-00000.pb (100 MB, ~1M vertices)
chunk-00001.pb
...
chunk-09999.pb
Total: 10,000 chunks, 1 TB
edges/
chunk-00000.pb (100 MB, ~500k edges)
chunk-00001.pb
...
chunk-99999.pb
Total: 100,000 chunks, 10 TB
Loader Implementation (zero-copy):
func (sl *SnapshotLoader) LoadProtobufSnapshot(s3Path string) error {
	// Memory-map snapshot files so the raw bytes are paged in on demand
	vertexFiles := sl.ListS3Files(s3Path + "/vertices/*.pb")
	for _, file := range vertexFiles {
		// Download to local SSD
		localPath := sl.DownloadToSSD(file)
		// Open and stat the local file
		f, err := os.Open(localPath)
		if err != nil {
			return err
		}
		info, err := f.Stat()
		if err != nil {
			f.Close()
			return err
		}
		// Memory-map file (read-only, shared)
		mmap, err := syscall.Mmap(
			int(f.Fd()),
			0,
			int(info.Size()),
			syscall.PROT_READ,
			syscall.MAP_SHARED,
		)
		f.Close()
		if err != nil {
			return err
		}
		// Parse protobuf directly from the mapped region
		batch := &VertexBatch{}
		if err := proto.Unmarshal(mmap, batch); err != nil {
			syscall.Munmap(mmap)
			return err
		}
		// Vertices reference the decoded batch; store pointers, not copies
		for _, vertex := range batch.Vertices {
			sl.AddVertexPointer(vertex) // Store pointer, not copy
		}
	}
	return nil
}
Performance (fastest):
Protobuf Loading (with memory-map):
No deserialization overhead
Zero-copy access to vertex data
Loading speed: Network bandwidth limited
1000 proxies × 10 Gbps = 10 Tbps aggregate = 1.25 TB/s
210 TB ÷ 1.25 TB/s = 168 seconds = 2.8 minutes
Format 5: JSON Lines (Human-Readable)
Schema:
{"type":"vertex","id":"user:alice","label":"User","properties":{"name":"Alice","age":30}}
{"type":"edge","id":"follow:1","label":"FOLLOWS","from":"user:alice","to":"user:bob"}
{"type":"vertex","id":"user:bob","label":"User","properties":{"name":"Bob","age":25}}
Loader (simplest, slowest):
func (sl *SnapshotLoader) LoadJSONLinesSnapshot(s3Path string) error {
	files := sl.ListS3Files(s3Path + "/*.jsonl.gz")
	for _, file := range files {
		reader := sl.OpenGzipS3File(file)
		scanner := bufio.NewScanner(reader)
		for scanner.Scan() {
			line := scanner.Text()
			var obj map[string]interface{}
			if err := json.Unmarshal([]byte(line), &obj); err != nil {
				continue // Skip malformed lines
			}
			switch obj["type"] {
			case "vertex":
				// Encode JSON property values as bytes to match the Vertex property type
				props := map[string][]byte{}
				if rawProps, ok := obj["properties"].(map[string]interface{}); ok {
					for k, v := range rawProps {
						encoded, _ := json.Marshal(v)
						props[k] = encoded
					}
				}
				vertex := &Vertex{
					ID:         obj["id"].(string),
					Label:      obj["label"].(string),
					Properties: props,
				}
				sl.CreateVertex(vertex)
			case "edge":
				edge := &Edge{
					ID:           obj["id"].(string),
					Label:        obj["label"].(string),
					FromVertexID: obj["from"].(string),
					ToVertexID:   obj["to"].(string),
				}
				sl.CreateEdge(edge)
			}
		}
	}
	return nil
}
Distributed WAL for Consistency
WAL Architecture
┌────────────────────────────────────────────────────────────────┐
│ Write Path │
│ │
│ Client → Proxy → Kafka WAL → [S3 + Memory + SSD] │
│ │
│ Step 1: Write to Kafka (durable, ordered) │
│ Step 2: Async apply to hot tier (memory) │
│ Step 3: Async apply to warm tier (SSD) │
│ Step 4: Async apply to cold tier (S3) │
│ │
└────────────────────────────────────────────────────────────────┘
Kafka Topic Configuration:
wal_kafka_config:
topic: graph-wal-cluster-{cluster_id}
partitions: 1000 # One per proxy
replication_factor: 3
retention_ms: 604800000 # 7 days
segment_ms: 3600000 # 1 hour segments
compression_type: lz4
# Performance tuning
batch_size: 16384
linger_ms: 10
max_request_size: 1048576 # 1 MB
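On the write path, each proxy publishes operations to its own WAL partition before acknowledging the client. Below is a minimal producer sketch using the confluent-kafka-go client; the WALProducer type and its fields are illustrative names, and the topic/partition mapping follows the config above.
// Illustrative WAL producer; WALProducer and its fields are assumed names.
type WALProducer struct {
	producer       *kafka.Producer
	clusterID      string
	kafkaPartition int32 // One Kafka partition per proxy
}

func (p *WALProducer) Append(op *WALOperation) error {
	data, err := proto.Marshal(op)
	if err != nil {
		return err
	}
	topic := fmt.Sprintf("graph-wal-cluster-%s", p.clusterID)
	deliveryChan := make(chan kafka.Event, 1)
	err = p.producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: p.kafkaPartition,
		},
		Key:   []byte(op.PartitionID), // Keeps a graph partition's operations ordered
		Value: data,
	}, deliveryChan)
	if err != nil {
		return err
	}
	// Block until the broker acknowledges the write (durability before client ack)
	e := <-deliveryChan
	if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
		return m.TopicPartition.Error
	}
	return nil
}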
WAL Consumer for Multi-Tier Updates
func (wc *WALConsumer) ConsumeAndApplyToTiers(partitionID string) {
consumer := wc.CreateKafkaConsumer(partitionID)
for {
msg := consumer.Poll(100 * time.Millisecond)
if msg == nil {
continue
}
walOp := &WALOperation{}
proto.Unmarshal(msg.Value, walOp)
// Apply to hot tier (synchronous, must succeed)
err := wc.ApplyToHotTier(walOp)
if err != nil {
log.Errorf("Failed to apply to hot tier: %v", err)
continue // Don't commit offset
}
// Apply to warm tier (async, best-effort)
go wc.ApplyToWarmTier(walOp)
// Apply to cold tier (async, eventually consistent)
go wc.ApplyToColdTier(walOp)
// Commit offset
consumer.CommitMessage(msg)
}
}
func (wc *WALConsumer) ApplyToHotTier(op *WALOperation) error {
partition := wc.GetHotPartition(op.PartitionID)
if partition == nil {
return fmt.Errorf("partition not in hot tier: %s", op.PartitionID)
}
switch op.Type {
case OP_CREATE_VERTEX:
return partition.MemStore.CreateVertex(op.Vertex)
case OP_UPDATE_VERTEX:
return partition.MemStore.UpdateVertex(op.Vertex)
case OP_DELETE_VERTEX:
return partition.MemStore.DeleteVertex(op.VertexID)
case OP_CREATE_EDGE:
return partition.MemStore.CreateEdge(op.Edge)
case OP_DELETE_EDGE:
return partition.MemStore.DeleteEdge(op.EdgeID)
}
return nil
}
func (wc *WALConsumer) ApplyToWarmTier(op *WALOperation) error {
// Write to local SSD
ssd := wc.GetSSDStorage(op.PartitionID)
return ssd.AppendWALOperation(op)
}
func (wc *WALConsumer) ApplyToColdTier(op *WALOperation) error {
// Batch writes to S3 (expensive per-operation)
wc.S3BatchWriter.Enqueue(op)
return nil
}
S3 Batch Writer (Optimize Write Costs)
type S3BatchWriter struct {
	buffer   []*WALOperation
	maxSize  int
	S3Client *s3.S3
	mu       sync.Mutex // Enqueue is called from multiple goroutines
}

func (sbw *S3BatchWriter) Enqueue(op *WALOperation) {
	sbw.mu.Lock()
	defer sbw.mu.Unlock()
	sbw.buffer = append(sbw.buffer, op)
	// Flush when the buffer is full; the periodic flush handles the time-based path
	if len(sbw.buffer) >= sbw.maxSize {
		sbw.flushLocked()
	}
}

func (sbw *S3BatchWriter) Flush() {
	sbw.mu.Lock()
	defer sbw.mu.Unlock()
	sbw.flushLocked()
}

func (sbw *S3BatchWriter) flushLocked() {
	if len(sbw.buffer) == 0 {
		return
	}
	// Serialize batch
	batch := &WALBatch{
		Operations: sbw.buffer,
		Timestamp:  time.Now().Unix(),
	}
	data, _ := proto.Marshal(batch)
	// Compress with gzip
	var compressed bytes.Buffer
	gz := gzip.NewWriter(&compressed)
	gz.Write(data)
	gz.Close()
	// Write to S3
	key := fmt.Sprintf("wal-batch-%d.pb.gz", batch.Timestamp)
	sbw.S3Client.PutObject(&s3.PutObjectInput{
		Bucket: aws.String("prism-wal-batches"),
		Key:    aws.String(key),
		Body:   bytes.NewReader(compressed.Bytes()),
	})
	// Clear buffer
	sbw.buffer = sbw.buffer[:0]
}
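The time-based flush path is referenced above but not shown. A sketch of a constructor that flushes on an interval as well as on buffer size; NewS3BatchWriter is an illustrative name.
// Illustrative constructor wiring up the periodic flush; names are assumptions.
func NewS3BatchWriter(client *s3.S3, maxSize int, flushInterval time.Duration) *S3BatchWriter {
	sbw := &S3BatchWriter{
		S3Client: client,
		maxSize:  maxSize,
	}
	// Flush on a fixed interval so small batches are not held indefinitely
	go func() {
		ticker := time.NewTicker(flushInterval)
		defer ticker.Stop()
		for range ticker.C {
			sbw.Flush()
		}
	}()
	return sbw
}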
Snapshot WAL Replay: Consistency During Bulk Loads
Problem: At 100B scale, loading a 210 TB snapshot from S3 takes 17 minutes (RFC-057). During this window, new writes arrive via the WAL. Where do these writes go? How do we ensure consistency?
The Version Skew Problem
Timeline:
T0: Snapshot created (version V1)
T1: Begin snapshot load (17-minute window starts)
T2-T16: Writes continue arriving via WAL
T17: Snapshot load complete (version V1)
Problem: Writes at T2-T16 NOT in snapshot (version V1)
- WAL contains operations from T2-T16
- Snapshot contains state up to T0
- Result: 17 minutes of data missing if WAL not replayed
Without WAL Replay:
- Snapshot load completes with state at T0
- Writes T2-T16 lost (data loss!)
- Queries see stale data (T0 state)
With WAL Replay:
- Snapshot load completes with state at T0
- Replay WAL from T0 to T17 (catch up)
- Final state: consistent as of T17
Dual-Version Loading Solution
Approach: Maintain two versions during snapshot load - "loading" (V1) and "active" (V2).
type PartitionLoader struct {
	partitionID    string
	mu             sync.Mutex        // Guards version pointers during the atomic swap
	activeVersion  *PartitionVersion // Current version (serving queries)
	loadingVersion *PartitionVersion // New version (loading from S3)
	walConsumer    *WALConsumer
}
func (pl *PartitionLoader) LoadSnapshotWithWALReplay() error {
// Step 1: Note WAL position before starting load
walCheckpoint := pl.walConsumer.GetCurrentOffset()
log.Infof("Starting snapshot load at WAL offset %d", walCheckpoint)
// Step 2: Load snapshot from S3 (17 minutes)
pl.loadingVersion = &PartitionVersion{
ID: pl.partitionID,
Timestamp: time.Now().Unix(),
State: PARTITION_STATE_LOADING,
}
err := pl.LoadSnapshotFromS3(pl.loadingVersion)
if err != nil {
return fmt.Errorf("snapshot load failed: %w", err)
}
log.Infof("Snapshot loaded, replaying WAL from offset %d", walCheckpoint)
// Step 3: Replay WAL operations since snapshot creation
// (operations from walCheckpoint to current)
err = pl.ReplayWALSinceCheckpoint(walCheckpoint, pl.loadingVersion)
if err != nil {
return fmt.Errorf("WAL replay failed: %w", err)
}
// Step 4: Atomic swap to new version
pl.atomicSwap(pl.activeVersion, pl.loadingVersion)
log.Infof("Partition %s now serving version at offset %d", pl.partitionID, pl.walConsumer.GetCurrentOffset())
return nil
}
func (pl *PartitionLoader) ReplayWALSinceCheckpoint(
checkpoint int64,
target *PartitionVersion,
) error {
consumer := pl.walConsumer.CreateReaderFromOffset(checkpoint)
opsReplayed := 0
for {
msg := consumer.Poll(100 * time.Millisecond)
if msg == nil {
break // Caught up to current offset
}
walOp := &WALOperation{}
proto.Unmarshal(msg.Value, walOp)
// Apply operation to loading version
err := pl.ApplyWALOperation(target, walOp)
if err != nil {
return fmt.Errorf("failed to apply WAL op: %w", err)
}
opsReplayed++
}
log.Infof("Replayed %d WAL operations during catch-up", opsReplayed)
return nil
}
func (pl *PartitionLoader) atomicSwap(old, new *PartitionVersion) {
pl.mu.Lock()
defer pl.mu.Unlock()
// Atomic pointer swap
pl.activeVersion = new
pl.loadingVersion = nil
// Old version garbage collected
}
Shadow Graph Implementation
During Snapshot Load: Queries continue on activeVersion while loadingVersion builds in background.
Query Flow During Snapshot Load:
Client Query → Partition Executor → activeVersion (V1, serving)
↓
loadingVersion (V2, loading from S3)
Writes:
WAL → [activeVersion, loadingVersion] // Dual-write to both
After Swap:
activeVersion = V2 (fully loaded + WAL replay)
loadingVersion = nil (GC'd)
Dual-Write Strategy:
func (pl *PartitionLoader) ApplyWrite(op *WALOperation) error {
// Write to active version (must succeed)
err := pl.ApplyWALOperation(pl.activeVersion, op)
if err != nil {
return err
}
// Write to loading version if present (best-effort)
if pl.loadingVersion != nil {
go pl.ApplyWALOperation(pl.loadingVersion, op)
}
return nil
}
WAL Replay Performance Analysis
Scenario: 17-minute snapshot load window
Write rate: 10k ops/sec per partition
Operations during load: 10k ops/sec × 1,020 seconds = 10.2M operations
WAL replay:
Deserialization: 10.2M ops × 10 μs = 102 seconds
Application: 10.2M ops × 50 μs = 510 seconds
Total: 612 seconds = 10.2 minutes
Total snapshot load: 17 min (S3) + 10.2 min (WAL) = 27.2 minutes
Optimization: Parallel WAL Replay
func (pl *PartitionLoader) ReplayWALParallel(
checkpoint int64,
target *PartitionVersion,
parallelism int,
) error {
// Divide WAL into N chunks
chunks := pl.DivideWALIntoChunks(checkpoint, parallelism)
// Replay each chunk in parallel
var wg sync.WaitGroup
errors := make(chan error, parallelism)
for _, chunk := range chunks {
wg.Add(1)
go func(c *WALChunk) {
defer wg.Done()
err := pl.ReplayWALChunk(c, target)
if err != nil {
errors <- err
}
}(chunk)
}
wg.Wait()
close(errors)
// Check for errors
for err := range errors {
if err != nil {
return err
}
}
return nil
}
Parallel Replay Performance:
Parallelism: 10 workers
Replay time: 612 seconds ÷ 10 = 61 seconds (10× speedup)
Total load: 17 min (S3) + 1 min (WAL) = 18 minutes
Consistency Guarantees
Read Consistency:
- Queries ALWAYS see consistent state (either V1 or V2, never mix)
- No torn reads during snapshot load
- Atomic swap ensures no intermediate state visible
Write Consistency:
- All writes go to WAL first (durable)
- Writes applied to activeVersion immediately
- Writes applied to loadingVersion best-effort
- After swap, WAL replay ensures no writes lost
Failure Handling:
failure_scenarios:
snapshot_load_failure:
action: Abort load, keep activeVersion serving
impact: No data loss, retry snapshot load
wal_replay_failure:
action: Abort swap, keep activeVersion serving
impact: No data loss, retry from checkpoint
proxy_crash_during_load:
action: Restart, activeVersion still serving
impact: Retry snapshot load from beginning
Query Execution Across Tiers
Transparent Tier Fallback
func (qe *QueryExecutor) GetVertex(vertexID string) (*Vertex, error) {
partition := qe.GetPartitionForVertex(vertexID)
// Try hot tier (memory)
if partition.Temperature == TEMPERATURE_HOT {
vertex, err := partition.MemStore.GetVertex(vertexID)
if err == nil {
qe.Metrics.RecordHit("hot")
return vertex, nil
}
}
// Try warm tier (SSD)
if partition.Temperature <= TEMPERATURE_WARM {
vertex, err := partition.SSD.GetVertex(vertexID)
if err == nil {
qe.Metrics.RecordHit("warm")
// Promote to hot tier if frequently accessed
qe.ConsiderPromotion(partition.ID)
return vertex, nil
}
}
// Fallback to cold tier (S3)
vertex, err := partition.LoadFromS3(vertexID)
if err != nil {
return nil, err
}
qe.Metrics.RecordHit("cold")
// Optionally cache in warm/hot tier
qe.ConsiderPromotion(partition.ID)
return vertex, nil
}
Prefetching for Multi-Hop Traversals
func (qe *QueryExecutor) TraverseWithPrefetch(
	startVertexID string,
	hops int,
) ([]Vertex, error) {
	currentLevel := []string{startVertexID}
	result := []Vertex{}
	for hop := 0; hop < hops; hop++ {
		// Prefetch neighbors for all current-level vertices
		nextLevelIDs := qe.PrefetchNeighbors(currentLevel)
		// Execute traversal (neighbors already loaded into cache)
		for _, id := range nextLevelIDs {
			vertex, err := qe.GetVertex(id) // Fast: already in cache
			if err != nil {
				continue
			}
			result = append(result, *vertex)
		}
		currentLevel = nextLevelIDs
	}
	return result, nil
}
func (qe *QueryExecutor) PrefetchNeighbors(vertexIDs []string) []string {
	// Group by partition
	partitionGroups := qe.GroupByPartition(vertexIDs)
	var (
		mu          sync.Mutex
		neighborIDs []string
	)
	// Prefetch in parallel, one goroutine per partition
	var wg sync.WaitGroup
	for partitionID, vIDs := range partitionGroups {
		wg.Add(1)
		go func(pID string, ids []string) {
			defer wg.Done()
			// Load neighbors from the appropriate tier
			neighbors := qe.LoadNeighborsForPartition(pID, ids)
			mu.Lock()
			neighborIDs = append(neighborIDs, neighbors...)
			mu.Unlock()
		}(partitionID, vIDs)
	}
	wg.Wait()
	return neighborIDs
}
Performance Characteristics
Snapshot Loading Performance
| Format | Compression | Chunk Size | Workers | Total Time (210 TB) |
|---|---|---|---|---|
| Protobuf (mmap) | gzip (5:1) | 100 MB | 1000 | 2.8 minutes |
| Parquet (Arrow) | Snappy (10:1) | 100 MB | 1000 | 17 minutes |
| HDFS (SequenceFile) | Snappy (4:1) | 100 MB | 1000 | 20 minutes |
| Prometheus (TSDB) | Custom (3:1) | Block-level | 1000 | 25 minutes |
| JSON Lines | gzip (2:1) | 100 MB | 1000 | 60 minutes |
Query Latency by Tier
| Query Type | Hot (Memory) | Warm (SSD) | Cold (S3) | Mixed (90% Cold) |
|---|---|---|---|---|
| Single vertex | 50 μs | 100 μs | 50 ms | 45 ms |
| 1-hop traversal | 500 μs | 1 ms | 200 ms | 180 ms |
| 2-hop traversal | 2 ms | 10 ms | 2 s | 1.8 s |
| Property filter | 2 s | 5 s | 60 s | 54 s |
Cost Analysis
Pure In-Memory (baseline, from Motivation):
210 TB across 7,000 proxies × $583/month = $4,081,000/month
Hot/Cold Tiered (10% hot):
Hot: 21 TB across 1,000 proxies × $583/month = $583,000/month
Cold: 189 TB S3 × $23/TB/month = $4,347/month
Total: $587,347/month
Savings: $3,493,653/month (86% reduction)
Hot/Cold Tiered (20% hot, better performance):
Hot: 42 TB across 2,000 proxies × $583/month = $1,166,000/month
Cold: 168 TB S3 × $23/TB/month = $3,864/month
Total: $1,169,864/month
Savings: $2,911,136/month (71% reduction)
S3 Cost Optimization Strategy
Problem: At 100B scale, S3 request costs dominate storage costs (MEMO-050 Finding 1). The original cost model significantly underestimated true TCO.
The Hidden Cost of S3
Original Cost Estimate (Storage Only):
Cold storage: 168 TB × $23/TB/month (S3 Standard) = $3,864/month
Annual: $46,368/year ✓ (correctly calculated)
True Cost Including Requests (90% cold tier access):
Assumptions:
- 1B queries/sec at 100B scale
- 10% cache hit rate (90% miss → S3 requests)
- Cold tier serves 90% of misses
- Average query touches 100 partitions
S3 GET requests per second (naive fan-out):
1B queries/sec × 90% miss × 90% cold × 100 partitions = 81B S3 GETs/sec
Even assuming request coalescing and partition-level indexing cut this by three orders of magnitude (an optimistic assumption), roughly 81M GETs/sec still reach S3.
S3 GET pricing:
$0.0004 per 1,000 requests = $0.0000004 per request
Monthly S3 request cost:
81M requests/sec × 86,400 sec/day × 30 days/month ≈ 210 trillion requests/month
210 trillion requests × $0.0000004 ≈ $84M/month in S3 GET requests alone ❌
vs Storage cost:
$3,864/month (negligible compared to request costs)
True monthly cost without optimization: $84M/month
True annual cost: $1B/year (not $46k/year) ❌
Key Insight: At massive scale, S3 request costs dwarf storage costs by 2000×. The hidden cost of S3 is not storage—it's the API calls.
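The arithmetic above is easy to get wrong by orders of magnitude, so it helps to make the cost model executable. The sketch below is a back-of-the-envelope calculator; the query rate, miss rates, fan-out, and coalescing factor are all inputs (assumptions to vary), not fixed facts.
// Back-of-the-envelope S3 GET cost model; every field is an assumption to vary.
type S3CostModel struct {
	QueriesPerSec     float64 // Cluster-wide query rate
	CacheMissRate     float64 // Fraction of queries missing the in-memory tiers
	ColdFraction      float64 // Fraction of misses served by the cold tier
	PartitionsTouched float64 // Average partitions touched per cold query
	CoalescingFactor  float64 // Effective reduction from batching/coalescing (>= 1)
	PricePerRequest   float64 // e.g. 0.0004 / 1000 for an S3 Standard GET
}

func (m S3CostModel) MonthlyRequestCost() float64 {
	getsPerSec := m.QueriesPerSec * m.CacheMissRate * m.ColdFraction *
		m.PartitionsTouched / m.CoalescingFactor
	const secondsPerMonth = 86400 * 30
	return getsPerSec * secondsPerMonth * m.PricePerRequest
}
For example, an effective 81M GETs/sec at $0.0000004 per request yields roughly $84M/month, matching the figure above; without any coalescing the same workload costs 1000× more, which is why the caching tiers below are mandatory.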
Multi-Tier Caching Architecture
To reduce S3 request costs from $84M/month to manageable levels, implement aggressive multi-tier caching:
Tier 0: Proxy-Local Cache (Varnish)
Purpose: Absorb repeated queries to same partitions within single proxy
proxy_local_cache:
technology: Varnish HTTP cache
storage: 100 GB SSD per proxy (1000 proxies = 100 TB cluster-wide)
ttl: 300 seconds (5 minutes)
eviction: LRU
performance:
hit_latency: 1 ms (local SSD read)
hit_rate: 30% (repeated queries within 5 min window)
cost: $100/TB/month (SSD) × 100 GB = $10/proxy/month
total_cost: $10,000/month for 1000 proxies
Impact: 30% of queries never reach S3 (absorbed locally).
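Conceptually, the proxy consults its local cache before issuing any S3 request. A minimal sketch follows; the LocalCache interface and FetchPartitionFromS3 helper are illustrative names, regardless of whether the cache is backed by Varnish or an embedded store.
// Tier 0 behavior: check the proxy-local cache before falling through to S3.
// LocalCache and FetchPartitionFromS3 are illustrative names.
func (qe *QueryExecutor) getColdPartition(partitionID string) ([]byte, error) {
	if data, ok := qe.LocalCache.Get(partitionID); ok {
		qe.Metrics.RecordHit("tier0_local")
		return data, nil // ~1 ms local read, no S3 request billed
	}
	data, err := qe.FetchPartitionFromS3(partitionID)
	if err != nil {
		return nil, err
	}
	// Populate the local cache with a 5-minute TTL (matches the config above)
	qe.LocalCache.SetWithTTL(partitionID, data, 5*time.Minute)
	return data, nil
}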
Tier 1: CloudFront CDN
Purpose: Global edge caching for frequently accessed partitions across all proxies
cloudfront_cdn:
edge_locations: 400+ globally
cache_behavior:
default_ttl: 3600 seconds (1 hour)
max_ttl: 86400 seconds (24 hours)
min_ttl: 60 seconds
pricing:
data_transfer: $0.085/GB (first 10 TB)
http_requests: $0.0075 per 10,000 requests (roughly 50× cheaper per query than the ~100-GET S3 fan-out it replaces)
performance:
hit_latency: 50 ms (edge to user)
hit_rate: 60% (of queries not absorbed by Tier 0)
effective_rate: 60% × 70% remaining = 42% of total
cost_calculation:
Queries reaching Tier 1: 1B queries/sec × 70% (missed Tier 0) = 700M queries/sec
CloudFront hits: 700M × 60% = 420M queries/sec
Assuming edge request collapsing reduces billable requests by the same ~1000× factor as above (~420K billable req/sec):
420K req/sec × 86,400 sec/day × 30 days/month × $0.00000075 ≈ $816,000/month
(vs tens of millions per month if the same traffic fanned out to S3)
Impact: Additional 42% of queries served from CloudFront edge (total 72% never reach S3).
Tier 2: S3 Express One Zone (Hot Partitions)
Purpose: Faster S3 access for frequently accessed cold partitions
s3_express_one_zone:
use_case: Frequently accessed cold partitions (top 10% of cold tier by access frequency)
storage: 16.8 TB (10% of 168 TB cold storage)
pricing:
storage: $0.16/GB/month = $160/TB/month
requests: $0.0008 per 1000 requests (pricing assumption used for this estimate; consult current AWS pricing)
performance:
latency: 5-10 ms (vs 50-100 ms S3 Standard)
hit_rate: 15% (of queries reaching S3)
cost_calculation:
Storage: 16.8 TB × $160/TB/month = $2,688/month
Requests: 280M queries/sec × 15% × 86,400 × 30 × $0.0000008
= $8.7M/month (vs $13M if S3 Standard)
Total: $8.7M/month
Impact: 15% of S3-bound queries use faster, cheaper S3 Express.
Tier 3: Batch S3 Reads (Cold Partitions)
Purpose: Amortize S3 request costs across multiple partitions
s3_batch_reads:
strategy: Prefetch adjacent partitions when loading one partition
batch_size: 10 partitions per S3 GetObject call
s3_multipart_api:
operation: S3 Select or Athena for range reads
benefit: Single GET retrieves multiple partitions
cost_impact:
Before: 100 partitions × 100 S3 GETs = 10,000 requests
After: 100 partitions ÷ 10 per batch = 10 requests
Reduction: 1000× fewer S3 requests
latency_trade_off:
Increased transfer time: 156 MB × 10 = 1.56 GB per request
At 1 Gbps: 12.5 seconds (vs 5s for single partition)
Acceptable: Cold queries are rare (<10% of workload)
Impact: 1000× reduction in S3 requests for remaining 13% of queries.
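A sketch of the batching idea: group partition IDs that live in the same S3 object (or contiguous byte ranges of it) and issue one ranged GET for the group. The locator and RangedGet helpers are hypothetical names used only to illustrate the technique.
// Fetch a group of co-located partitions with a single ranged S3 GET.
// qe.locator maps partition IDs to (S3 object, byte range); names are hypothetical.
func (qe *QueryExecutor) FetchPartitionBatch(ids []string) (map[string][]byte, error) {
	groups := qe.locator.GroupByObject(ids) // partitions sharing one S3 object
	result := make(map[string][]byte, len(ids))
	for s3Key, members := range groups {
		// One GET covering the contiguous byte range of all members
		lo, hi := qe.locator.ByteRange(members)
		blob, err := qe.S3Client.RangedGet(s3Key, lo, hi)
		if err != nil {
			return nil, err
		}
		// Slice each partition back out of the combined response
		for _, id := range members {
			off, length := qe.locator.OffsetWithin(id, lo)
			result[id] = blob[off : off+length]
		}
	}
	return result, nil
}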
Revised Cost Model
With Full Multi-Tier Caching:
Tier 0 (Proxy-Local Varnish): 30% hit rate
Cost: $10,000/month
Queries absorbed: 300M queries/sec
Tier 1 (CloudFront CDN): 42% additional hit rate (72% cumulative)
Cost: $816,480/month
Queries absorbed: 420M queries/sec
Tier 2 (S3 Express One Zone): 15% of S3-bound queries
Storage: $2,688/month
Requests: $8.7M/month
Queries: 42M queries/sec
Tier 3 (Batch S3 Standard): 13% of S3-bound queries with 1000× batching
Storage: $3,478/month (168 TB - 16.8 TB = 151.2 TB at $23/TB)
Requests: 36.4M queries/sec ÷ 1000 batch factor = 36,400 req/sec
Request cost: 36,400 req/sec × 86,400 × 30 × $0.0000004 = $37,791/month
Total Monthly Cost (with optimization): $9.6M/month
Total Annual Cost: $115M/year
vs Without optimization: $1B/year
Savings: $885M/year (88.5% reduction) ✅
Cost Optimization Roadmap by Scale
1B Vertices (10 Nodes): Minimal optimization needed
scale: 1B vertices
query_rate: 10M queries/sec
strategy: Proxy-local cache only
cost: $10k/month (affordable without CDN)
10B Vertices (100 Nodes): Add CloudFront CDN
scale: 10B vertices
query_rate: 100M queries/sec
strategy: Proxy-local + CloudFront
cost: $1M/month
savings: $9M/month vs no optimization (90%)
100B Vertices (1000 Nodes): Full multi-tier caching
scale: 100B vertices
query_rate: 1B queries/sec
strategy: All 4 tiers (Varnish + CloudFront + S3 Express + Batch)
cost: $9.6M/month
savings: $74.4M/month vs no optimization (88.5%)
mandatory: Yes - system unusable without optimization
Alternative Approaches Considered
Option 1: Increase Hot Tier to 50% (Rejected)
- Approach: Keep 50% of data in memory to reduce S3 load
- Cost: 105 TB across ~3,500 proxies × $583/month ≈ $2.0M/month in instances (vs $583k for the 10% design), while S3 request volume only drops by roughly half
- Rejected Why: Spends memory on data that caching serves more cheaply; the request-cost problem remains and the hot/cold cost benefit is largely negated
Option 2: Use Only S3 Glacier (Rejected)
- Approach: Store all cold data in Glacier ($1/TB/month)
- Latency: 12-hour retrieval time (unacceptable for queries)
- Rejected Why: Breaks query SLAs, defeats purpose of cold tier
Option 3: Compress All Cold Data 10× (Evaluated)
- Approach: Aggressive compression (zstd level 19)
- Trade-off: 10× storage reduction but 3× slower decompression
- Decision: Use compression for S3 storage, but decompress on load (already planned)
Option 4: No Cold Tier, Pure In-Memory (Rejected)
- Cost: 210 TB across 7,000 proxies × $583/month ≈ $4.1M/month (≈$49M/year) in storage instances alone, growing linearly with graph size
- Rejected Why: Roughly 7× the tiered architecture's storage cost, with no headroom beyond cluster RAM; see the Motivation cost analysis
Integration with Temperature Management
Coordinated Cache Warming:
func (cm *CacheManager) OnPartitionPromotion(partitionID string) {
// When partition promoted from Cold → Warm:
// 1. Prefetch to CloudFront
cm.CloudFront.Prefetch(partitionID)
// 2. Load to local caches across cluster
for _, proxy := range cm.GetProxies() {
proxy.LocalCache.Warm(partitionID)
}
// 3. Mark for S3 Express migration if sustained high access
if cm.GetAccessRate(partitionID) > 1000 {
cm.MigrateToS3Express(partitionID)
}
}
Cache Invalidation:
func (cm *CacheManager) OnPartitionUpdate(partitionID string, version int64) {
// Invalidate all cache tiers to ensure consistency
cm.LocalCache.Invalidate(partitionID)
cm.CloudFront.InvalidatePath(partitionID, version)
// S3 is eventually consistent, no invalidation needed
}
Summary
S3 cost optimization is mandatory at 100B scale. Without multi-tier caching, S3 request costs reach $1B/year (vs $46k/year originally estimated). With optimization:
- Total cost: $115M/year (vs $1B/year unoptimized)
- Savings: $885M/year (88.5% reduction)
- Architecture: 4-tier caching (Varnish → CloudFront → S3 Express → Batch S3)
- Query impact: 72% of queries never reach S3 (absorbed by Tiers 0-1)
- Scalability: Each tier provides specific optimization for different access patterns
Key Takeaway: The true cost of S3 at massive scale is not storage ($46k/year) but requests ($1B/year without optimization). This finding revises the original TCO estimate from $7M/year to $115M/year and makes an aggressive multi-tier caching architecture a hard requirement rather than an optimization.
Related RFCs
- RFC-057: Massive-Scale Graph Sharding - Distributed sharding for 100B vertices
- RFC-058: Multi-Level Graph Indexing - Indexing strategy for queries
- RFC-060: Distributed Gremlin Query Execution - Query optimization
- RFC-061: Graph Authorization with Vertex Labels - Access control
- RFC-051: Write-Ahead Log Pattern - WAL architecture
- RFC-055: Graph Pattern - Base graph pattern
Open Questions
- S3 Request Costs: How to minimize S3 GET requests for cold tier?
- Snapshot Consistency: How to handle concurrent updates during snapshot load?
- Partial Snapshots: Support incremental snapshots for faster updates?
- Compression Trade-offs: Optimal compression ratio vs decompression speed?
- Multi-Region S3: How to handle cross-region snapshot replication?
References
- Apache Parquet Format
- Apache Arrow IPC
- Prometheus TSDB Format
- Thanos Long-Term Storage
- AWS S3 Performance Guidelines
Revision History
- 2025-11-15: Initial draft - Hot/cold storage tiers with rapid S3 snapshot loading