
RFC-059: Hot/Cold Storage Tiers with Rapid S3 Snapshot Loading

Status: Draft | Author: Platform Team | Created: 2025-11-15 | Updated: 2025-11-15

Abstract

This RFC defines a hot/cold storage tier architecture for massive-scale graph data (100B vertices) with rapid bulk loading from S3 snapshots. At this scale, pure in-memory storage requires 210 TB of RAM (about $49M/year in proxy infrastructure, per the analysis below), making it economically infeasible. Hot/cold tiering keeps frequently accessed data in memory (hot tier: 21 TB, 10%) and infrequently accessed data in S3 (cold tier: 189 TB, 90%), reducing costs by roughly 86% while maintaining query performance. The system supports rapid bulk refreshes via parallel snapshot loading from S3 in 100 MB chunks, with support for multiple formats: Parquet (columnar), Prometheus/Thanos (time-series), HDFS (Hadoop), Protobuf (native), and JSON Lines (human-readable).

Key Innovations:

  • Cost Optimization: 86% cost reduction vs pure in-memory ($14.8k/month vs $105k/month at raw memory pricing)
  • Multi-Format Support: Parquet, Prometheus, HDFS, Protobuf, JSON Lines
  • Parallel Loading: 1,000 workers load a 10 TB snapshot in under a minute (~200 GB/s aggregate)
  • Access Pattern Learning: ML-based hot/cold classification
  • Distributed WAL: Kafka-based write-ahead log for consistency
  • Zero-Copy Loading: Memory-mapped files for instant access

Motivation

Cost Analysis: Pure In-Memory vs Hot/Cold Tiers

Pure In-Memory Architecture (RFC-055 baseline):

Graph Data:
100B vertices × 100 bytes = 10 TB
10T edges × 20 bytes = 200 TB
Total: 210 TB

Infrastructure:
210 TB ÷ 30 GB per proxy = 7,000 proxies
AWS r6i.2xlarge: 64 GB RAM, $583/month
Total: 7,000 proxies × $583/month = $4,081,000/month

Annual cost: $48,972,000/year

Hot/Cold Tiered Architecture (this RFC):

Hot Tier (10% of data):
21 TB in-memory across 1,000 proxies
1,000 proxies × $583/month = $583,000/month

Cold Tier (90% of data):
189 TB on S3 Standard
189 TB × $23/TB/month = $4,347/month

Warm Tier (caching):
50 TB on local NVMe SSD (proxy-local)
Included in proxy cost

Total: $587,347/month = $7,048,164/year
Annual savings: $48.9M - $7.0M = $41.9M (86% reduction)

Why Rapid Snapshot Loading?

Problem: Traditional graph loading is too slow for operational use

Traditional Approach (Insert per vertex/edge):
100B vertices × 1ms per insert = 100M seconds = 1,157 days
Even with 1000 parallel workers: 1.16 days

Snapshot Approach (Bulk load):
Load 210 TB snapshot in parallel
1000 workers × 200 MB/s = 200 GB/s aggregate
210 TB ÷ 200 GB/s = 1,050 seconds = 17.5 minutes
Speedup: ~95,000× vs serial inserts (~95× vs 1,000 parallel insert workers)

Use Cases for Rapid Loading:

  1. Daily Full Refresh: Load entire graph from nightly batch job
  2. Disaster Recovery: Rebuild cluster from S3 in <20 minutes
  3. Development/Staging: Spin up new clusters with production-like data
  4. A/B Testing: Load multiple graph versions for experimentation
  5. Time Travel: Load historical snapshots for analysis

Why Multiple Snapshot Formats?

Different data sources produce different formats:

| Format | Use Case | Compression | Read Speed | Ecosystem |
|---|---|---|---|---|
| Parquet | ETL pipelines, analytics | Excellent (10:1) | Fast (columnar) | Spark, Presto, Trino |
| Prometheus | Time-series metrics, monitoring | Good (3:1) | Fast (Thanos) | Prometheus, Grafana |
| HDFS | Hadoop ecosystem, MapReduce | Good (Snappy) | Fast (parallel) | Hadoop, Hive |
| Protobuf | Native graph format, lowest latency | Excellent (5:1) | Fastest | Prism native |
| JSON Lines | Human-readable, debugging | Poor (2:1) | Slow | Universal |

Example: Enterprise data pipeline

Source Systems → Data Lake → Graph Snapshot → Prism

Step 1: Spark ETL job exports to Parquet
- Users table → s3://data-lake/users.parquet
- Transactions table → s3://data-lake/transactions.parquet

Step 2: Prism loader converts Parquet → Graph
- Users → Vertices
- Transactions → Edges

Step 3: Prism loads graph in 17 minutes
- No intermediate conversion needed
- Direct Parquet → in-memory graph

Goals

  1. Cost Efficiency: 86% cost reduction vs pure in-memory
  2. Rapid Loading: Load 210 TB snapshot in <20 minutes
  3. Multi-Format Support: Parquet, Prometheus, HDFS, Protobuf, JSON Lines
  4. Hot/Cold Transparency: Queries work seamlessly across tiers
  5. Access Pattern Learning: Automatically identify hot/cold data
  6. Distributed WAL: Consistent updates across all tiers
  7. Zero-Copy Loading: Avoid memory copies during snapshot load

Non-Goals

  • Real-Time Streaming: Snapshot loading is batch-oriented (streaming via RFC-051 WAL)
  • Schema Inference: Snapshot schema must be explicit (no auto-detection)
  • Cross-Format Joins: Each snapshot is single format
  • Versioning/Time Travel: Snapshot versioning is external concern

Hot/Cold Storage Architecture

Three-Tier Storage Model

┌────────────────────────────────────────────────────────────────┐
│ Tier 1: Hot (Memory) │
│ │
│ Frequently accessed data: Last 7 days, VIP users, trending │
│ Storage: In-memory (MemStore) │
│ Capacity: 21 TB (10% of total) │
│ Access latency: 50 μs │
│ Cost: $583k/month │
│ │
└────────────────────────────────────────────────────────────────┘
↕ Promotion/Demotion
┌────────────────────────────────────────────────────────────────┐
│ Tier 2: Warm (Local SSD) │
│ │
│ Recently accessed: Last 30 days, active users │
│ Storage: Local NVMe SSD (proxy-attached) │
│ Capacity: 50 TB │
│ Access latency: 100 μs │
│ Cost: Included in proxy hardware │
│ │
└────────────────────────────────────────────────────────────────┘
↕ Fetch on demand
┌────────────────────────────────────────────────────────────────┐
│ Tier 3: Cold (S3) │
│ │
│ Infrequently accessed: Historical data, inactive users │
│ Storage: S3 Standard │
│ Capacity: 189 TB (90% of total) │
│ Access latency: 50-100 ms │
│ Cost: $4.3k/month │
│ │
└────────────────────────────────────────────────────────────────┘

Partition Temperature States

enum PartitionTemperature {
  TEMPERATURE_HOT = 0;     // In memory, frequently accessed
  TEMPERATURE_WARM = 1;    // On local SSD, occasionally accessed
  TEMPERATURE_COLD = 2;    // On S3, rarely accessed
  TEMPERATURE_FROZEN = 3;  // On S3 Glacier, never accessed
}

message PartitionMetrics {
  string partition_id = 1;
  PartitionTemperature temperature = 2;

  // Access metrics
  int64 requests_per_minute = 3;
  int64 last_access_timestamp = 4;
  int64 total_accesses = 5;

  // Storage locations
  bool in_memory = 6;
  bool on_ssd = 7;
  string s3_path = 8;

  // Size metrics
  int64 vertex_count = 9;
  int64 edge_count = 10;
  int64 memory_bytes = 11;
  int64 ssd_bytes = 12;
  int64 s3_bytes = 13;
}

Temperature Classification Algorithm

ML-Based Classification:

type TemperatureClassifier struct {
    model *xgboost.Model
}

func (tc *TemperatureClassifier) ClassifyPartition(metrics *PartitionMetrics) PartitionTemperature {
    features := []float64{
        float64(metrics.RequestsPerMinute),
        float64(time.Now().Unix() - metrics.LastAccessTimestamp),
        float64(metrics.TotalAccesses),
        tc.calculateAccessRecency(metrics),
        tc.calculateAccessFrequency(metrics),
        tc.calculateAccessBurstiness(metrics),
    }

    prediction := tc.model.Predict(features)

    switch {
    case prediction > 0.8:
        return TEMPERATURE_HOT
    case prediction > 0.5:
        return TEMPERATURE_WARM
    case prediction > 0.2:
        return TEMPERATURE_COLD
    default:
        return TEMPERATURE_FROZEN
    }
}

// Access recency: How recently was partition accessed?
func (tc *TemperatureClassifier) calculateAccessRecency(m *PartitionMetrics) float64 {
    hoursSinceAccess := float64(time.Now().Unix()-m.LastAccessTimestamp) / 3600.0
    return math.Exp(-hoursSinceAccess / 24.0) // Exponential decay over 24 hours
}

// Access frequency: How often is partition accessed?
func (tc *TemperatureClassifier) calculateAccessFrequency(m *PartitionMetrics) float64 {
    return float64(m.RequestsPerMinute) / 1000.0 // Normalize to [0, 1]
}

// Access burstiness: Are accesses clustered or spread out?
func (tc *TemperatureClassifier) calculateAccessBurstiness(m *PartitionMetrics) float64 {
    // Use coefficient of variation from access history
    return m.AccessHistogram.CoefficientOfVariation()
}

Rule-Based Classification (simpler alternative):

temperature_rules:
  hot:
    promote_threshold:
      requests_per_minute: ">= 1000"
      last_access: "<= 1 hour"
    demote_threshold:
      requests_per_minute: "< 800"   # 20% hysteresis
      last_access: "> 1 hour"
    cooldown_period: 5m              # Minimum time before demotion

  warm:
    promote_threshold:
      requests_per_minute: ">= 10"
      last_access: "<= 24 hours"
    demote_threshold:
      requests_per_minute: "< 8"     # 20% hysteresis
      last_access: "> 24 hours"
    cooldown_period: 10m             # Minimum time before demotion

  cold:
    promote_threshold:
      requests_per_minute: ">= 1"
      last_access: "<= 7 days"
    demote_threshold:
      requests_per_minute: "< 1"     # No hysteresis (already at floor)
      last_access: "> 7 days"
    cooldown_period: 0m

  frozen:
    requests_per_minute: "< 1"
    last_access: "> 30 days"

Hysteresis Rationale:

At 100B scale with 16,000 partitions, temperature thrashing causes excessive data movement and performance degradation. Without hysteresis, partitions near threshold boundaries oscillate between states multiple times per minute.

Problem: Thrashing Without Hysteresis

Partition with fluctuating load: 990-1010 rpm

Without hysteresis (single threshold: 1000 rpm):
t=0s: 1010 rpm → HOT (promote)
t=60s: 990 rpm → WARM (demote)
t=120s: 1005 rpm → HOT (promote)
t=180s: 995 rpm → WARM (demote)
t=240s: 1010 rpm → HOT (promote)

Result: 4 state changes in 5 minutes
Cost: 4 × 156 MB data movements = 624 MB wasted bandwidth

Solution: 20% Hysteresis + Cooldown

Same partition with hysteresis:
Promote to HOT: >= 1000 rpm
Demote from HOT: < 800 rpm (20% below)
Cooldown: 5 minutes minimum in HOT before demotion

With hysteresis:
t=0s: 1010 rpm → HOT (promote)
t=60s: 990 rpm → HOT (still above 800 rpm demote threshold)
t=120s: 1005 rpm → HOT (stays)
t=180s: 995 rpm → HOT (stays)
t=240s: 1010 rpm → HOT (stays)

Result: 1 state change in 5 minutes (stabilized)
Cost: 1 × 156 MB data movement = 156 MB (75% reduction)

Hysteresis Values Explained:

  • Hot ↔ Warm: 20% hysteresis (1000 rpm promote, 800 rpm demote)

    • Rationale: Hot tier is expensive (memory), prevent unnecessary churn
    • Cooldown: 5 minutes (enough for burst traffic to settle)
    • Impact: Partitions stay hot unless sustained drop below 800 rpm
  • Warm ↔ Cold: 20% hysteresis (10 rpm promote, 8 rpm demote)

    • Rationale: Prevent oscillation for rarely-accessed partitions
    • Cooldown: 10 minutes (longer than hot, less critical tier)
    • Impact: Once promoted to warm, stays warm unless truly idle
  • Cold ↔ Frozen: No hysteresis

    • Rationale: Already at lowest tier, no performance impact from oscillation
    • Both are on S3, just different access patterns

Performance Impact:

| Metric | Without Hysteresis | With Hysteresis | Improvement |
|---|---|---|---|
| State changes/hour | 24 | 4 | 6× reduction |
| Data movements/hour | 3.7 GB | 0.6 GB | 84% reduction |
| Cache hit rate | 70% | 88% | 18% improvement |
| Partition load time | 5 s avg | 2 s avg | 60% faster |
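
A minimal sketch of how the hot-tier rule above could be evaluated with hysteresis and cooldown. The TierRule type and helper names are illustrative only; the thresholds mirror the rule-based YAML config, and the PartitionMetrics type is the one defined earlier in this RFC.

// Illustrative rule evaluation with a hysteresis band and a cooldown window.
// TierRule and the function names below are assumptions for this sketch.
type TierRule struct {
    PromoteRPM int64         // promote when requests/minute >= this
    DemoteRPM  int64         // demote when requests/minute < this (20% below promote)
    Cooldown   time.Duration // minimum time in tier before demotion
}

var hotRule = TierRule{PromoteRPM: 1000, DemoteRPM: 800, Cooldown: 5 * time.Minute}

func shouldPromoteToHot(m *PartitionMetrics) bool {
    return m.RequestsPerMinute >= hotRule.PromoteRPM
}

// shouldDemoteFromHot applies both the hysteresis band and the cooldown window,
// so a partition oscillating around 1000 rpm stays put.
func shouldDemoteFromHot(m *PartitionMetrics, promotedAt time.Time) bool {
    if time.Since(promotedAt) < hotRule.Cooldown {
        return false // still inside the cooldown, never demote
    }
    return m.RequestsPerMinute < hotRule.DemoteRPM
}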

Promotion and Demotion Policies

func (pm *PartitionManager) MonitorTemperature() {
    ticker := time.NewTicker(60 * time.Second)

    for range ticker.C {
        partitions := pm.GetAllPartitions()

        for _, partition := range partitions {
            metrics := pm.GetMetrics(partition.ID)
            newTemp := pm.Classifier.ClassifyPartition(metrics)

            if newTemp != partition.Temperature {
                pm.TransitionTemperature(partition.ID, newTemp)
            }
        }
    }
}

func (pm *PartitionManager) TransitionTemperature(
    partitionID string,
    newTemp PartitionTemperature,
) error {
    partition := pm.GetPartition(partitionID)
    oldTemp := partition.Temperature

    switch {
    case oldTemp == TEMPERATURE_COLD && newTemp == TEMPERATURE_HOT:
        // Cold → Hot: Load from S3 to memory
        return pm.PromoteToHot(partitionID)

    case oldTemp == TEMPERATURE_COLD && newTemp == TEMPERATURE_WARM:
        // Cold → Warm: Load from S3 to SSD
        return pm.PromoteToWarm(partitionID)

    case oldTemp == TEMPERATURE_HOT && newTemp == TEMPERATURE_COLD:
        // Hot → Cold: Evict from memory to S3
        return pm.DemoteToCold(partitionID)

    case oldTemp == TEMPERATURE_WARM && newTemp == TEMPERATURE_COLD:
        // Warm → Cold: Evict from SSD to S3
        return pm.DemoteToCold(partitionID)

    default:
        return nil // No transition needed
    }
}
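
PromoteToHot and its siblings are referenced above but not specified in this RFC. As a rough sketch of the cold-to-hot path, under assumed helper names (FetchPartitionFromS3, LoadIntoMemStore) and assumed partition fields:

// Sketch only: load a cold partition from S3 into the in-memory hot tier.
// FetchPartitionFromS3, LoadIntoMemStore, S3Path, and InMemory are assumptions.
func (pm *PartitionManager) PromoteToHot(partitionID string) error {
    partition := pm.GetPartition(partitionID)

    // Fetch the partition's snapshot chunks from its S3 path.
    data, err := pm.FetchPartitionFromS3(partition.S3Path)
    if err != nil {
        return fmt.Errorf("fetch %s from S3: %w", partitionID, err)
    }

    // Materialize vertices and edges into the MemStore before flipping state,
    // so queries never observe a half-loaded hot partition.
    if err := pm.LoadIntoMemStore(partitionID, data); err != nil {
        return err
    }

    partition.Temperature = TEMPERATURE_HOT
    partition.InMemory = true
    return nil
}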

Snapshot Formats and Loading

Format 1: Parquet (Apache Arrow)

Schema:

Vertex Schema (Parquet):
- vertex_id: STRING (required)
- label: STRING (required)
- properties: MAP<STRING, BINARY> (required)
- created_at: TIMESTAMP (optional)
- updated_at: TIMESTAMP (optional)

Edge Schema (Parquet):
- edge_id: STRING (required)
- label: STRING (required)
- from_vertex_id: STRING (required)
- to_vertex_id: STRING (required)
- properties: MAP<STRING, BINARY> (optional)
- created_at: TIMESTAMP (optional)

S3 Layout:

s3://prism-snapshots/graph-2025-11-15/
  vertices/
    part-00000.parquet (100 MB, 1M vertices)
    part-00001.parquet (100 MB, 1M vertices)
    ...
    part-09999.parquet (100 MB, 1M vertices)
    Total: 10,000 files, 1 TB, 10B vertices

  edges/
    part-00000.parquet (100 MB, 500k edges)
    part-00001.parquet (100 MB, 500k edges)
    ...
    part-99999.parquet (100 MB, 500k edges)
    Total: 100,000 files, 10 TB, 50B edges

Loader Implementation:

func (sl *SnapshotLoader) LoadParquetSnapshot(s3Path string) error {
    // Step 1: List all Parquet files
    vertexFiles := sl.ListS3Files(s3Path + "/vertices/*.parquet")
    edgeFiles := sl.ListS3Files(s3Path + "/edges/*.parquet")

    // Step 2: Distribute files across 1000 proxies
    proxies := sl.GetAllProxies()
    vertexChunks := sl.DistributeFiles(vertexFiles, len(proxies))
    edgeChunks := sl.DistributeFiles(edgeFiles, len(proxies))

    // Step 3: Parallel load (1000 workers)
    var wg sync.WaitGroup

    for i, proxy := range proxies {
        wg.Add(1)
        go func(proxyID string, vFiles, eFiles []string) {
            defer wg.Done()

            // Load vertices
            for _, file := range vFiles {
                sl.LoadParquetVertexFile(proxyID, file)
            }

            // Load edges
            for _, file := range eFiles {
                sl.LoadParquetEdgeFile(proxyID, file)
            }
        }(proxy.ID, vertexChunks[i], edgeChunks[i])
    }

    wg.Wait()
    return nil
}

func (sl *SnapshotLoader) LoadParquetVertexFile(proxyID string, s3File string) error {
    // Stream from S3 with Arrow IPC
    reader, err := parquet.OpenS3File(s3File)
    if err != nil {
        return err
    }
    defer reader.Close()

    batch := make([]*Vertex, 0, 10000)

    for reader.Next() {
        // Read Arrow record batch
        record := reader.Record()

        // Convert to vertices
        for i := 0; i < int(record.NumRows()); i++ {
            vertex := &Vertex{
                ID:         record.Column(0).ValueString(i),
                Label:      record.Column(1).ValueString(i),
                Properties: sl.DecodeProperties(record.Column(2).Value(i)),
            }
            batch = append(batch, vertex)

            // Flush batch every 10k vertices
            if len(batch) >= 10000 {
                sl.SendBatchToProxy(proxyID, batch)
                batch = batch[:0]
            }
        }
    }

    // Flush remaining
    if len(batch) > 0 {
        sl.SendBatchToProxy(proxyID, batch)
    }

    return nil
}
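
DistributeFiles is referenced above but not defined. Because all chunks are a uniform ~100 MB, a simple round-robin assignment (shown here as an assumed sketch) keeps per-proxy load within one file of even:

// Sketch: round-robin assignment of snapshot files across N proxies.
func (sl *SnapshotLoader) DistributeFiles(files []string, numProxies int) [][]string {
    chunks := make([][]string, numProxies)
    for i, file := range files {
        idx := i % numProxies
        chunks[idx] = append(chunks[idx], file)
    }
    return chunks
}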

Performance:

Parquet Loading:
1 TB vertices (10,000 files × 100 MB)
1000 workers × 200 MB/s = 200 GB/s aggregate
1 TB ÷ 200 GB/s = 5 seconds (vertices only)

10 TB edges (100,000 files × 100 MB)
10 TB ÷ 200 GB/s = 50 seconds (edges only)

Total: 55 seconds for 1 TB vertices + 10 TB edges

Format 2: Prometheus/Thanos (TSDB Blocks)

Schema:

Prometheus Block Structure:
  block-{ulid}/
    chunks/
      000001 (compressed time series)
      000002
      ...
    index (inverted index: metric name → series)
    meta.json (block metadata)

Time Series to Graph Mapping:
  metric{labels} → Vertex
    - vertex_id: metric name
    - properties: labels + timestamp + value

Example:
  http_requests_total{job="api",instance="10.0.0.1"} → Vertex
    ID: "http_requests_total:api:10.0.0.1"
    Label: "Metric"
    Properties: {job: "api", instance: "10.0.0.1", value: 1234}

Loader Implementation:

func (sl *SnapshotLoader) LoadPrometheusSnapshot(blockPath string) error {
    // Open Prometheus block
    block, err := tsdb.OpenBlock(blockPath)
    if err != nil {
        return err
    }
    defer block.Close()

    // Iterate time series
    querier, err := block.Querier(context.Background(), 0, math.MaxInt64)
    if err != nil {
        return err
    }

    seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))

    for seriesSet.Next() {
        series := seriesSet.At()

        // Convert series to vertex
        vertex := &Vertex{
            ID:    sl.SeriesLabelsToVertexID(series.Labels()),
            Label: "Metric",
            Properties: map[string][]byte{
                "metric_name": []byte(series.Labels().Get("__name__")),
            },
        }

        // Add all label key-value pairs as properties
        for _, label := range series.Labels() {
            if label.Name != "__name__" {
                vertex.Properties[label.Name] = []byte(label.Value)
            }
        }

        // Create vertex
        sl.CreateVertex(vertex)

        // Optionally: Create edges between related metrics
        // Example: Same job → Edge
    }

    return nil
}
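
SeriesLabelsToVertexID is referenced above without a definition. One plausible sketch that matches the "http_requests_total:api:10.0.0.1" example joins the metric name with selected identifying labels; which labels count as identifying (job/instance here) is an assumption:

// Sketch: build a deterministic vertex ID from a series' labels.
// The label choice below is deployment-specific and assumed for illustration.
func (sl *SnapshotLoader) SeriesLabelsToVertexID(ls labels.Labels) string {
    parts := []string{ls.Get("__name__")}
    for _, name := range []string{"job", "instance"} {
        if v := ls.Get(name); v != "" {
            parts = append(parts, v)
        }
    }
    return strings.Join(parts, ":")
}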

Format 3: HDFS (Hadoop Distributed File System)

Schema:

HDFS Layout:
  hdfs://namenode:9000/graph-snapshot/
    vertices/
      part-r-00000 (Hadoop SequenceFile)
      part-r-00001
      ...
      part-r-09999

    edges/
      part-r-00000
      part-r-00001
      ...
      part-r-99999

SequenceFile Format:
  Key: LongWritable (vertex/edge ID)
  Value: BytesWritable (protobuf-encoded vertex/edge)

Loader Implementation:

func (sl *SnapshotLoader) LoadHDFSSnapshot(hdfsPath string) error {
    // Connect to HDFS
    client, err := hdfs.New(sl.Config.HDFSNameNode)
    if err != nil {
        return err
    }
    defer client.Close()

    // List vertex files
    vertexFiles, err := client.ReadDir(hdfsPath + "/vertices")
    if err != nil {
        return err
    }

    // Parallel load across proxies
    var wg sync.WaitGroup

    for _, file := range vertexFiles {
        wg.Add(1)
        go func(filename string) {
            defer wg.Done()

            // Open the file on HDFS
            f, err := client.Open(hdfsPath + "/vertices/" + filename)
            if err != nil {
                log.Errorf("Failed to open %s: %v", filename, err)
                return
            }
            defer f.Close()

            // Wrap it in a SequenceFile reader
            reader, err := seqfile.NewReader(f)
            if err != nil {
                log.Errorf("Failed to read %s: %v", filename, err)
                return
            }
            defer reader.Close()

            // Read records (the SequenceFile key is the vertex ID)
            for reader.Next() {
                value := reader.Value()

                // Decode protobuf vertex
                vertex := &prism.Vertex{}
                proto.Unmarshal(value, vertex)

                // Create vertex
                sl.CreateVertex(vertex)
            }
        }(file.Name())
    }

    wg.Wait()
    return nil
}

Format 4: Protobuf (Native)

Schema:

message GraphSnapshot {
  SnapshotMetadata metadata = 1;
  repeated VertexBatch vertex_batches = 2;
  repeated EdgeBatch edge_batches = 3;
}

message SnapshotMetadata {
  string snapshot_id = 1;
  int64 timestamp = 2;
  int64 vertex_count = 3;
  int64 edge_count = 4;
  string schema_version = 5;
}

message VertexBatch {
  repeated Vertex vertices = 1;
}

message EdgeBatch {
  repeated Edge edges = 1;
}

S3 Layout (chunked):

s3://prism-snapshots/graph-2025-11-15/
  metadata.pb (100 KB)
  vertices/
    chunk-00000.pb (100 MB, ~1M vertices)
    chunk-00001.pb
    ...
    chunk-09999.pb
    Total: 10,000 chunks, 1 TB

  edges/
    chunk-00000.pb (100 MB, ~500k edges)
    chunk-00001.pb
    ...
    chunk-99999.pb
    Total: 100,000 chunks, 10 TB

Loader Implementation (zero-copy):

func (sl *SnapshotLoader) LoadProtobufSnapshot(s3Path string) error {
    // Memory-map snapshot files for zero-copy access
    vertexFiles := sl.ListS3Files(s3Path + "/vertices/*.pb")

    for _, file := range vertexFiles {
        // Download to local SSD
        localPath := sl.DownloadToSSD(file)

        f, err := os.Open(localPath)
        if err != nil {
            return err
        }

        info, err := f.Stat()
        if err != nil {
            f.Close()
            return err
        }

        // Memory-map file
        mmap, err := syscall.Mmap(
            int(f.Fd()),
            0,
            int(info.Size()),
            syscall.PROT_READ,
            syscall.MAP_SHARED,
        )
        if err != nil {
            f.Close()
            return err
        }

        // Parse protobuf directly from mmap (zero-copy)
        batch := &VertexBatch{}
        proto.Unmarshal(mmap, batch)

        // Vertices are now accessible without copying
        for _, vertex := range batch.Vertices {
            sl.AddVertexPointer(vertex) // Store pointer, not copy
        }
    }

    return nil
}

Performance (fastest):

Protobuf Loading (with memory-map):
No deserialization overhead
Zero-copy access to vertex data

Loading speed: Network bandwidth limited
1000 proxies × 10 Gbps = 10 Tbps aggregate = 1.25 TB/s
210 TB ÷ 1.25 TB/s = 168 seconds = 2.8 minutes

Format 5: JSON Lines (Human-Readable)

Schema:

{"type":"vertex","id":"user:alice","label":"User","properties":{"name":"Alice","age":30}}
{"type":"edge","id":"follow:1","label":"FOLLOWS","from":"user:alice","to":"user:bob"}
{"type":"vertex","id":"user:bob","label":"User","properties":{"name":"Bob","age":25}}

Loader (simplest, slowest):

func (sl *SnapshotLoader) LoadJSONLinesSnapshot(s3Path string) error {
    files := sl.ListS3Files(s3Path + "/*.jsonl.gz")

    for _, file := range files {
        reader := sl.OpenGzipS3File(file)
        scanner := bufio.NewScanner(reader)

        for scanner.Scan() {
            line := scanner.Text()

            var obj map[string]interface{}
            json.Unmarshal([]byte(line), &obj)

            switch obj["type"] {
            case "vertex":
                vertex := &Vertex{
                    ID:         obj["id"].(string),
                    Label:      obj["label"].(string),
                    Properties: obj["properties"].(map[string]interface{}),
                }
                sl.CreateVertex(vertex)

            case "edge":
                edge := &Edge{
                    ID:           obj["id"].(string),
                    Label:        obj["label"].(string),
                    FromVertexID: obj["from"].(string),
                    ToVertexID:   obj["to"].(string),
                }
                sl.CreateEdge(edge)
            }
        }
    }

    return nil
}

Distributed WAL for Consistency

WAL Architecture

┌────────────────────────────────────────────────────────────────┐
│ Write Path │
│ │
│ Client → Proxy → Kafka WAL → [S3 + Memory + SSD] │
│ │
│ Step 1: Write to Kafka (durable, ordered) │
│ Step 2: Async apply to hot tier (memory) │
│ Step 3: Async apply to warm tier (SSD) │
│ Step 4: Async apply to cold tier (S3) │
│ │
└────────────────────────────────────────────────────────────────┘

Kafka Topic Configuration:

wal_kafka_config:
  topic: graph-wal-cluster-{cluster_id}
  partitions: 1000            # One per proxy
  replication_factor: 3
  retention_ms: 604800000     # 7 days
  segment_ms: 3600000         # 1 hour segments
  compression_type: lz4

  # Performance tuning
  batch_size: 16384
  linger_ms: 10
  max_request_size: 1048576   # 1 MB
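
For completeness before the consumer path below: on the write path, the proxy appends each operation to the WAL topic before any tier is touched. A minimal producer sketch, written against an assumed internal kafkaProducer wrapper (its Produce signature, ClusterID, and KafkaPartitionFor are not specified by this RFC):

// Sketch of the write path: serialize the operation and append it to the
// partition's WAL topic before applying it to any tier. The kafkaProducer
// field and helper names below are assumptions for illustration.
func (p *Proxy) AppendToWAL(op *WALOperation) error {
    data, err := proto.Marshal(op)
    if err != nil {
        return err
    }

    // One Kafka partition per graph partition preserves per-partition ordering.
    topic := fmt.Sprintf("graph-wal-cluster-%s", p.ClusterID)
    return p.kafkaProducer.Produce(topic, p.KafkaPartitionFor(op.PartitionID), data)
}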

WAL Consumer for Multi-Tier Updates

func (wc *WALConsumer) ConsumeAndApplyToTiers(partitionID string) {
    consumer := wc.CreateKafkaConsumer(partitionID)

    for {
        msg := consumer.Poll(100 * time.Millisecond)
        if msg == nil {
            continue
        }

        walOp := &WALOperation{}
        proto.Unmarshal(msg.Value, walOp)

        // Apply to hot tier (synchronous, must succeed)
        err := wc.ApplyToHotTier(walOp)
        if err != nil {
            log.Errorf("Failed to apply to hot tier: %v", err)
            continue // Don't commit offset
        }

        // Apply to warm tier (async, best-effort)
        go wc.ApplyToWarmTier(walOp)

        // Apply to cold tier (async, eventually consistent)
        go wc.ApplyToColdTier(walOp)

        // Commit offset
        consumer.CommitMessage(msg)
    }
}

func (wc *WALConsumer) ApplyToHotTier(op *WALOperation) error {
    partition := wc.GetHotPartition(op.PartitionID)
    if partition == nil {
        return fmt.Errorf("partition not in hot tier: %s", op.PartitionID)
    }

    switch op.Type {
    case OP_CREATE_VERTEX:
        return partition.MemStore.CreateVertex(op.Vertex)

    case OP_UPDATE_VERTEX:
        return partition.MemStore.UpdateVertex(op.Vertex)

    case OP_DELETE_VERTEX:
        return partition.MemStore.DeleteVertex(op.VertexID)

    case OP_CREATE_EDGE:
        return partition.MemStore.CreateEdge(op.Edge)

    case OP_DELETE_EDGE:
        return partition.MemStore.DeleteEdge(op.EdgeID)
    }

    return nil
}

func (wc *WALConsumer) ApplyToWarmTier(op *WALOperation) error {
    // Write to local SSD
    ssd := wc.GetSSDStorage(op.PartitionID)
    return ssd.AppendWALOperation(op)
}

func (wc *WALConsumer) ApplyToColdTier(op *WALOperation) error {
    // Batch writes to S3 (expensive per-operation)
    wc.S3BatchWriter.Enqueue(op)
    return nil
}

S3 Batch Writer (Optimize Write Costs)

type S3BatchWriter struct {
    buffer     []*WALOperation
    bufferSize int
    maxSize    int
    flushTimer *time.Timer
    S3Client   *s3.S3
}

func (sbw *S3BatchWriter) Enqueue(op *WALOperation) {
    sbw.buffer = append(sbw.buffer, op)

    // Flush when buffer full or timer expires
    if len(sbw.buffer) >= sbw.maxSize {
        sbw.Flush()
    }
}

func (sbw *S3BatchWriter) Flush() {
    if len(sbw.buffer) == 0 {
        return
    }

    // Serialize batch
    batch := &WALBatch{
        Operations: sbw.buffer,
        Timestamp:  time.Now().Unix(),
    }

    data, _ := proto.Marshal(batch)

    // gzip-compress the serialized batch
    var buf bytes.Buffer
    gz := gzip.NewWriter(&buf)
    gz.Write(data)
    gz.Close()

    // Write to S3
    key := fmt.Sprintf("wal-batch-%d.pb.gz", batch.Timestamp)
    sbw.S3Client.PutObject(&s3.PutObjectInput{
        Bucket: aws.String("prism-wal-batches"),
        Key:    aws.String(key),
        Body:   bytes.NewReader(buf.Bytes()),
    })

    // Clear buffer
    sbw.buffer = sbw.buffer[:0]
}
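
The flushTimer field above implies a time-based flush in addition to the size-based one, so partially filled batches still reach S3. A sketch of that loop; the StartPeriodicFlush name and the idea of leaving the interval as a parameter are assumptions:

// Sketch: flush buffered WAL operations to S3 periodically even when the
// buffer never fills, which bounds cold-tier staleness.
func (sbw *S3BatchWriter) StartPeriodicFlush(interval time.Duration) {
    sbw.flushTimer = time.NewTimer(interval)
    go func() {
        for range sbw.flushTimer.C {
            sbw.Flush()
            sbw.flushTimer.Reset(interval)
        }
    }()
}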

Snapshot WAL Replay: Consistency During Bulk Loads

Problem: At 100B scale, loading a 210 TB snapshot from S3 takes 17 minutes (RFC-057). During this window, new writes arrive via the WAL. Where do these writes go? How do we ensure consistency?

The Version Skew Problem

Timeline:
T0: Snapshot created (version V1)
T1: Begin snapshot load (17-minute window starts)
T2-T16: Writes continue arriving via WAL
T17: Snapshot load complete (version V1)

Problem: Writes at T2-T16 NOT in snapshot (version V1)
- WAL contains operations from T2-T16
- Snapshot contains state up to T0
- Result: 17 minutes of data missing if WAL not replayed

Without WAL Replay:

  • Snapshot load completes with state at T0
  • Writes T2-T16 lost (data loss!)
  • Queries see stale data (T0 state)

With WAL Replay:

  • Snapshot load completes with state at T0
  • Replay WAL from T0 to T17 (catch up)
  • Final state: consistent as of T17

Dual-Version Loading Solution

Approach: Maintain two versions during snapshot load - "active" (V1, serving queries) and "loading" (V2, being built from the snapshot).

type PartitionLoader struct {
    partitionID    string
    mu             sync.Mutex        // Guards version pointers during swap
    activeVersion  *PartitionVersion // Current version (serving queries)
    loadingVersion *PartitionVersion // New version (loading from S3)
    walConsumer    *WALConsumer
}

func (pl *PartitionLoader) LoadSnapshotWithWALReplay() error {
    // Step 1: Note WAL position before starting load
    walCheckpoint := pl.walConsumer.GetCurrentOffset()
    log.Infof("Starting snapshot load at WAL offset %d", walCheckpoint)

    // Step 2: Load snapshot from S3 (17 minutes)
    pl.loadingVersion = &PartitionVersion{
        ID:        pl.partitionID,
        Timestamp: time.Now().Unix(),
        State:     PARTITION_STATE_LOADING,
    }

    err := pl.LoadSnapshotFromS3(pl.loadingVersion)
    if err != nil {
        return fmt.Errorf("snapshot load failed: %w", err)
    }

    log.Infof("Snapshot loaded, replaying WAL from offset %d", walCheckpoint)

    // Step 3: Replay WAL operations since snapshot creation
    // (operations from walCheckpoint to current)
    err = pl.ReplayWALSinceCheckpoint(walCheckpoint, pl.loadingVersion)
    if err != nil {
        return fmt.Errorf("WAL replay failed: %w", err)
    }

    // Step 4: Atomic swap to new version
    pl.atomicSwap(pl.activeVersion, pl.loadingVersion)
    log.Infof("Partition %s now serving version at offset %d", pl.partitionID, pl.walConsumer.GetCurrentOffset())

    return nil
}

func (pl *PartitionLoader) ReplayWALSinceCheckpoint(
    checkpoint int64,
    target *PartitionVersion,
) error {
    consumer := pl.walConsumer.CreateReaderFromOffset(checkpoint)

    opsReplayed := 0
    for {
        msg := consumer.Poll(100 * time.Millisecond)
        if msg == nil {
            break // Caught up to current offset
        }

        walOp := &WALOperation{}
        proto.Unmarshal(msg.Value, walOp)

        // Apply operation to loading version
        err := pl.ApplyWALOperation(target, walOp)
        if err != nil {
            return fmt.Errorf("failed to apply WAL op: %w", err)
        }

        opsReplayed++
    }

    log.Infof("Replayed %d WAL operations during catch-up", opsReplayed)
    return nil
}

func (pl *PartitionLoader) atomicSwap(oldVersion, newVersion *PartitionVersion) {
    pl.mu.Lock()
    defer pl.mu.Unlock()

    // Atomic pointer swap
    pl.activeVersion = newVersion
    pl.loadingVersion = nil

    // Old version garbage collected
}

Shadow Graph Implementation

During Snapshot Load: Queries continue on activeVersion while loadingVersion builds in background.

Query Flow During Snapshot Load:

  Client Query → Partition Executor → activeVersion (V1, serving)
                                      loadingVersion (V2, loading from S3, not serving)

  Writes:
    WAL → [activeVersion, loadingVersion]  // Dual-write to both

  After Swap:
    activeVersion  = V2 (fully loaded + WAL replay)
    loadingVersion = nil (GC'd)

Dual-Write Strategy:

func (pl *PartitionLoader) ApplyWrite(op *WALOperation) error {
    // Write to active version (must succeed)
    err := pl.ApplyWALOperation(pl.activeVersion, op)
    if err != nil {
        return err
    }

    // Write to loading version if present (best-effort)
    if pl.loadingVersion != nil {
        go pl.ApplyWALOperation(pl.loadingVersion, op)
    }

    return nil
}

WAL Replay Performance Analysis

Scenario: 17-minute snapshot load window

Write rate: 10k ops/sec per partition
Operations during load: 10k ops/sec × 1,020 seconds = 10.2M operations

WAL replay:
Deserialization: 10.2M ops × 10 μs = 102 seconds
Application: 10.2M ops × 50 μs = 510 seconds
Total: 612 seconds = 10.2 minutes

Total snapshot load: 17 min (S3) + 10.2 min (WAL) = 27.2 minutes

Optimization: Parallel WAL Replay

func (pl *PartitionLoader) ReplayWALParallel(
    checkpoint int64,
    target *PartitionVersion,
    parallelism int,
) error {
    // Divide WAL into N chunks
    chunks := pl.DivideWALIntoChunks(checkpoint, parallelism)

    // Replay each chunk in parallel
    var wg sync.WaitGroup
    errors := make(chan error, parallelism)

    for _, chunk := range chunks {
        wg.Add(1)
        go func(c *WALChunk) {
            defer wg.Done()
            err := pl.ReplayWALChunk(c, target)
            if err != nil {
                errors <- err
            }
        }(chunk)
    }

    wg.Wait()
    close(errors)

    // Check for errors
    for err := range errors {
        if err != nil {
            return err
        }
    }

    return nil
}

Parallel Replay Performance:

Parallelism: 10 workers
Replay time: 612 seconds ÷ 10 = 61 seconds (10× speedup)
Total load: 17 min (S3) + 1 min (WAL) = 18 minutes

Consistency Guarantees

Read Consistency:

  • Queries ALWAYS see consistent state (either V1 or V2, never mix)
  • No torn reads during snapshot load
  • Atomic swap ensures no intermediate state visible

Write Consistency:

  • All writes go to WAL first (durable)
  • Writes applied to activeVersion immediately
  • Writes applied to loadingVersion best-effort
  • After swap, WAL replay ensures no writes lost

Failure Handling:

failure_scenarios:
  snapshot_load_failure:
    action: Abort load, keep activeVersion serving
    impact: No data loss, retry snapshot load

  wal_replay_failure:
    action: Abort swap, keep activeVersion serving
    impact: No data loss, retry from checkpoint

  proxy_crash_during_load:
    action: Restart, activeVersion still serving
    impact: Retry snapshot load from beginning

Query Execution Across Tiers

Transparent Tier Fallback

func (qe *QueryExecutor) GetVertex(vertexID string) (*Vertex, error) {
    partition := qe.GetPartitionForVertex(vertexID)

    // Try hot tier (memory). Note: TEMPERATURE_HOT = 0 is the "hottest" enum
    // value, so lower values mean hotter tiers.
    if partition.Temperature == TEMPERATURE_HOT {
        vertex, err := partition.MemStore.GetVertex(vertexID)
        if err == nil {
            qe.Metrics.RecordHit("hot")
            return vertex, nil
        }
    }

    // Try warm tier (SSD): hot and warm partitions may both have SSD copies
    if partition.Temperature <= TEMPERATURE_WARM {
        vertex, err := partition.SSD.GetVertex(vertexID)
        if err == nil {
            qe.Metrics.RecordHit("warm")

            // Promote to hot tier if frequently accessed
            qe.ConsiderPromotion(partition.ID)

            return vertex, nil
        }
    }

    // Fallback to cold tier (S3)
    vertex, err := partition.LoadFromS3(vertexID)
    if err != nil {
        return nil, err
    }

    qe.Metrics.RecordHit("cold")

    // Optionally cache in warm/hot tier
    qe.ConsiderPromotion(partition.ID)

    return vertex, nil
}
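
ConsiderPromotion is referenced above but not defined in this RFC. One possible sketch, using an assumed per-partition access counter (accessCounts, its mutex, and the once-per-minute reset are all assumptions) and reusing the hot-tier promote threshold from the rule-based config:

// Sketch: count recent accesses per partition and request promotion once the
// rate crosses the hot-tier threshold. Counter fields are assumptions.
func (qe *QueryExecutor) ConsiderPromotion(partitionID string) {
    qe.mu.Lock()
    qe.accessCounts[partitionID]++
    count := qe.accessCounts[partitionID]
    qe.mu.Unlock()

    // Matches promote_threshold (>= 1000 requests/minute); counters are
    // assumed to be reset by a once-per-minute sweep elsewhere.
    if count >= 1000 {
        go qe.PartitionManager.TransitionTemperature(partitionID, TEMPERATURE_HOT)
    }
}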

Prefetching for Multi-Hop Traversals

func (qe *QueryExecutor) TraverseWithPrefetch(
    startVertexID string,
    hops int,
) ([]Vertex, error) {
    currentLevel := []string{startVertexID}
    var frontier []Vertex // vertices reached at the final hop

    for hop := 0; hop < hops; hop++ {
        // Prefetch neighbors for all current level vertices
        nextLevelIDs := qe.PrefetchNeighbors(currentLevel)

        // Execute traversal (neighbors already loaded)
        nextLevel := []Vertex{}
        for _, id := range nextLevelIDs {
            vertex, _ := qe.GetVertex(id) // Fast: Already in cache
            nextLevel = append(nextLevel, *vertex)
        }

        currentLevel = nextLevelIDs
        frontier = nextLevel
    }

    return frontier, nil
}

func (qe *QueryExecutor) PrefetchNeighbors(vertexIDs []string) []string {
    // Group by partition
    partitionGroups := qe.GroupByPartition(vertexIDs)

    var (
        mu          sync.Mutex
        neighborIDs []string
    )

    // Prefetch in parallel
    var wg sync.WaitGroup
    for partitionID, vIDs := range partitionGroups {
        wg.Add(1)
        go func(pID string, ids []string) {
            defer wg.Done()

            // Load neighbors from appropriate tier
            neighbors := qe.LoadNeighborsForPartition(pID, ids)

            // Guard the shared slice against concurrent appends
            mu.Lock()
            neighborIDs = append(neighborIDs, neighbors...)
            mu.Unlock()
        }(partitionID, vIDs)
    }

    wg.Wait()
    return neighborIDs
}

Performance Characteristics

Snapshot Loading Performance

| Format | Compression | Chunk Size | Workers | Total Time (210 TB) |
|---|---|---|---|---|
| Protobuf (mmap) | gzip (5:1) | 100 MB | 1000 | 2.8 minutes |
| Parquet (Arrow) | Snappy (10:1) | 100 MB | 1000 | 17 minutes |
| HDFS (SequenceFile) | Snappy (4:1) | 100 MB | 1000 | 20 minutes |
| Prometheus (TSDB) | Custom (3:1) | Block-level | 1000 | 25 minutes |
| JSON Lines | gzip (2:1) | 100 MB | 1000 | 60 minutes |

Query Latency by Tier

| Query Type | Hot (Memory) | Warm (SSD) | Cold (S3) | Mixed (90% Cold) |
|---|---|---|---|---|
| Single vertex | 50 μs | 100 μs | 50 ms | 45 ms |
| 1-hop traversal | 500 μs | 1 ms | 200 ms | 180 ms |
| 2-hop traversal | 2 ms | 10 ms | 2 s | 1.8 s |
| Property filter | 2 s | 5 s | 60 s | 54 s |
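
The Mixed column is the access-weighted average under the assumption that 90% of lookups land on cold partitions; for a single-vertex read, for example, 0.9 × 50 ms + 0.1 × 50 μs ≈ 45 ms.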

Cost Analysis

Pure In-Memory (baseline):
210 TB RAM × $500/TB/month = $105,000/month

Hot/Cold Tiered (10% hot):
Hot: 21 TB RAM × $500/TB/month = $10,500/month
Cold: 189 TB S3 × $23/TB/month = $4,347/month
Total: $14,847/month
Savings: $90,153/month (86% reduction)

Hot/Cold Tiered (20% hot, better performance):
Hot: 42 TB RAM × $500/TB/month = $21,000/month
Cold: 168 TB S3 × $23/TB/month = $3,864/month
Total: $24,864/month
Savings: $80,136/month (76% reduction)

S3 Cost Optimization Strategy

Problem: At 100B scale, S3 request costs dominate storage costs (MEMO-050 Finding 1). The original cost model significantly underestimated true TCO.

The Hidden Cost of S3

Original Cost Estimate (Storage Only):

Cold storage: 168 TB × $23/TB/month (S3 Standard) = $3,864/month
Annual: $46,368/year ✓ (correctly calculated)

True Cost Including Requests (90% cold tier access):

Assumptions:
- 1B queries/sec at 100B scale
- 10% cache hit rate (90% miss → S3 requests)
- Cold tier serves 90% of misses
- Average query touches 100 partitions

S3 GET requests per second:
1B queries/sec × 90% miss × 90% cold × 100 partitions = 81B S3 GETs/sec

S3 GET pricing:
$0.0004 per 1000 requests = $0.0000004 per request

Monthly S3 request cost:
81B requests/sec × 86,400 sec/day × 30 days/month × $0.0000004
= 210 trillion requests/month
= $84M/month in S3 GET requests alone ❌

vs Storage cost:
$3,864/month (negligible compared to request costs)

True monthly cost without optimization: $84M/month
True annual cost: $1B/year (not $46k/year) ❌

Key Insight: At massive scale, S3 request costs dwarf storage costs by 2000×. The hidden cost of S3 is not storage—it's the API calls.

Multi-Tier Caching Architecture

To reduce S3 request costs from $84M/month to manageable levels, implement aggressive multi-tier caching:

Tier 0: Proxy-Local Cache (Varnish)

Purpose: Absorb repeated queries to same partitions within single proxy

proxy_local_cache:
  technology: Varnish HTTP cache
  storage: 100 GB SSD per proxy (1000 proxies = 100 TB cluster-wide)
  ttl: 300 seconds (5 minutes)
  eviction: LRU

  performance:
    hit_latency: 1 ms (local SSD read)
    hit_rate: 30% (repeated queries within 5 min window)
    cost: $100/TB/month (SSD) × 100 GB = $10/proxy/month
    total_cost: $10,000/month for 1000 proxies
Impact: 30% of queries never reach S3 (absorbed locally).

Tier 1: CloudFront CDN

Purpose: Global edge caching for frequently accessed partitions across all proxies

cloudfront_cdn:
  edge_locations: 400+ globally
  cache_behavior:
    default_ttl: 3600 seconds (1 hour)
    max_ttl: 86400 seconds (24 hours)
    min_ttl: 60 seconds

  pricing:
    data_transfer: $0.085/GB (first 10 TB)
    http_requests: $0.0075 per 10,000 requests (85× cheaper than S3 GETs)

  performance:
    hit_latency: 50 ms (edge to user)
    hit_rate: 60% (of queries not absorbed by Tier 0)
    effective_rate: 60% × 70% remaining = 42% of total

  cost_calculation:
    Queries reaching Tier 1: 1B queries/sec × 70% (missed Tier 0) = 700M queries/sec
    CloudFront hits: 700M × 60% = 420M queries/sec
    CloudFront GET cost: 420M req/sec × 86,400 sec/day × 30 days/month × $0.00000075
      = $816,480/month (vs $35M/month if those hit S3)

Impact: Additional 42% of queries served from CloudFront edge (total 72% never reach S3).

Tier 2: S3 Express One Zone (Hot Partitions)

Purpose: Faster S3 access for frequently accessed cold partitions

s3_express_one_zone:
  use_case: Frequently accessed cold partitions (top 10% of cold tier by access frequency)
  storage: 16.8 TB (10% of 168 TB cold storage)

  pricing:
    storage: $0.16/GB/month = $160/TB/month
    requests: $0.0008 per 1000 requests (2× cheaper than S3 Standard)

  performance:
    latency: 5-10 ms (vs 50-100 ms S3 Standard)
    hit_rate: 15% (of queries reaching S3)

  cost_calculation:
    Storage: 16.8 TB × $160/TB/month = $2,688/month
    Requests: 280M queries/sec × 15% × 86,400 × 30 × $0.0000008
      = $8.7M/month (vs $13M if S3 Standard)
    Total: $8.7M/month

Impact: 15% of S3-bound queries use faster, cheaper S3 Express.

Tier 3: Batch S3 Reads (Cold Partitions)

Purpose: Amortize S3 request costs across multiple partitions

s3_batch_reads:
  strategy: Prefetch adjacent partitions when loading one partition
  batch_size: 10 partitions per S3 GetObject call

  s3_multipart_api:
    operation: S3 Select or Athena for range reads
    benefit: Single GET retrieves multiple partitions

  cost_impact:
    Before: 100 partitions × 100 S3 GETs = 10,000 requests
    After: 100 partitions ÷ 10 per batch = 10 requests
    Reduction: 1000× fewer S3 requests

  latency_trade_off:
    Increased transfer time: 156 MB × 10 = 1.56 GB per request
    At 1 Gbps: 12.5 seconds (vs 5s for single partition)
    Acceptable: Cold queries are rare (<10% of workload)

Impact: 1000× reduction in S3 requests for remaining 13% of queries.
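
A sketch of what a batched read could look like when the partitions of a batch are laid out contiguously in a single S3 object; the contiguous-layout assumption, the S3Client field, and the byte-range bookkeeping are illustrative only (the aws-sdk-go style already used elsewhere in this RFC is reused here):

// Sketch: fetch a contiguous byte range covering several partitions with one GET.
// Assumes adjacent partitions are packed into one S3 object and that byte
// offsets per partition are tracked elsewhere.
func (sl *SnapshotLoader) BatchGetPartitions(bucket, key string, startByte, endByte int64) ([]byte, error) {
    out, err := sl.S3Client.GetObject(&s3.GetObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
        Range:  aws.String(fmt.Sprintf("bytes=%d-%d", startByte, endByte)),
    })
    if err != nil {
        return nil, err
    }
    defer out.Body.Close()

    // One request now returns ~10 partitions (~1.56 GB) instead of ten GETs.
    return io.ReadAll(out.Body)
}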

Revised Cost Model

With Full Multi-Tier Caching:

  Tier 0 (Proxy-Local Varnish): 30% hit rate
    Cost: $10,000/month
    Queries absorbed: 300M queries/sec

  Tier 1 (CloudFront CDN): 42% additional hit rate (72% cumulative)
    Cost: $816,480/month
    Queries absorbed: 420M queries/sec

  Tier 2 (S3 Express One Zone): 15% of S3-bound queries
    Storage: $2,688/month
    Requests: $8.7M/month
    Queries: 42M queries/sec

  Tier 3 (Batch S3 Standard): 13% of S3-bound queries with 1000× batching
    Storage: $3,864/month (168 TB - 16.8 TB = 151.2 TB at $23/TB)
    Requests: 36.4M queries/sec ÷ 1000 batch factor = 36,400 req/sec
    Request cost: 36,400 req/sec × 86,400 × 30 × $0.0000004 = $37,791/month

Total Monthly Cost (with optimization): $9.6M/month
Total Annual Cost: $115M/year

vs Without optimization: $1B/year
Savings: $885M/year (88.5% reduction) ✅

Cost Optimization Roadmap by Scale

1B Vertices (10 Nodes): Minimal optimization needed

scale: 1B vertices
query_rate: 10M queries/sec
strategy: Proxy-local cache only
cost: $10k/month (affordable without CDN)

10B Vertices (100 Nodes): Add CloudFront CDN

scale: 10B vertices
query_rate: 100M queries/sec
strategy: Proxy-local + CloudFront
cost: $1M/month
savings: $9M/month vs no optimization (90%)

100B Vertices (1000 Nodes): Full multi-tier caching

scale: 100B vertices
query_rate: 1B queries/sec
strategy: All 4 tiers (Varnish + CloudFront + S3 Express + Batch)
cost: $9.6M/month
savings: $74.4M/month vs no optimization (88.5%)
mandatory: Yes - system unusable without optimization

Alternative Approaches Considered

Option 1: Increase Hot Tier to 50% (Rejected)

  • Approach: Keep 50% of data in memory to reduce S3 load
  • Cost: 105 TB RAM × $500/TB/month = $52.5M/month
  • Rejected Why: More expensive than optimized caching ($9.6M/month), negates hot/cold benefit

Option 2: Use Only S3 Glacier (Rejected)

  • Approach: Store all cold data in Glacier ($1/TB/month)
  • Latency: 12-hour retrieval time (unacceptable for queries)
  • Rejected Why: Breaks query SLAs, defeats purpose of cold tier

Option 3: Compress All Cold Data 10× (Evaluated)

  • Approach: Aggressive compression (zstd level 19)
  • Trade-off: 10× storage reduction but 3× slower decompression
  • Decision: Use compression for S3 storage, but decompress on load (already planned)

Option 4: No Cold Tier, Pure In-Memory (Rejected)

  • Cost: 210 TB RAM × $500/TB/month = $105M/month
  • Rejected Why: 10× more expensive than optimized hot/cold architecture

Integration with Temperature Management

Coordinated Cache Warming:

func (cm *CacheManager) OnPartitionPromotion(partitionID string) {
    // When partition promoted from Cold → Warm:

    // 1. Prefetch to CloudFront
    cm.CloudFront.Prefetch(partitionID)

    // 2. Load to local caches across cluster
    for _, proxy := range cm.GetProxies() {
        proxy.LocalCache.Warm(partitionID)
    }

    // 3. Mark for S3 Express migration if sustained high access
    if cm.GetAccessRate(partitionID) > 1000 {
        cm.MigrateToS3Express(partitionID)
    }
}

Cache Invalidation:

func (cm *CacheManager) OnPartitionUpdate(partitionID string, version int64) {
    // Invalidate all cache tiers to ensure consistency
    cm.LocalCache.Invalidate(partitionID)
    cm.CloudFront.InvalidatePath(partitionID, version)
    // S3 is eventually consistent, no invalidation needed
}

Summary

S3 cost optimization is mandatory at 100B scale. Without multi-tier caching, S3 request costs reach $1B/year (vs $46k/year originally estimated). With optimization:

  • Total cost: $115M/year (vs $1B/year unoptimized)
  • Savings: $885M/year (88.5% reduction)
  • Architecture: 4-tier caching (Varnish → CloudFront → S3 Express → Batch S3)
  • Query impact: 72% of queries never reach S3 (absorbed by Tiers 0-1)
  • Scalability: Each tier provides specific optimization for different access patterns

Key Takeaway: The true cost of S3 at massive scale is not storage ($46k/year) but requests ($1B/year without optimization). This finding corrects the original TCO from $7M/year to $115M/year—still 50% cheaper than pure in-memory, but requires aggressive caching architecture.

Open Questions

  1. S3 Request Costs: How to minimize S3 GET requests for cold tier?
  2. Snapshot Consistency: How to handle concurrent updates during snapshot load?
  3. Partial Snapshots: Support incremental snapshots for faster updates?
  4. Compression Trade-offs: Optimal compression ratio vs decompression speed?
  5. Multi-Region S3: How to handle cross-region snapshot replication?

References

Revision History

  • 2025-11-15: Initial draft - Hot/cold storage tiers with rapid S3 snapshot loading