# ClickHouse Integration for Time Series Analytics

## Abstract

This RFC specifies the integration of ClickHouse into Prism as a high-performance OLAP database optimized for time series analytics. ClickHouse provides columnar storage, real-time ingestion, and fast analytical queries, making it ideal for metrics, logs, events, and observability data.
## 1. Introduction

### 1.1 Purpose

ClickHouse integration addresses analytical time series workloads:
- Metrics Storage: Application metrics, system metrics, business KPIs
- Event Logging: Application logs, audit logs, user activity events
- Observability: Traces, spans, and distributed system telemetry
- Analytics: Real-time aggregations, rollups, and dashboards
### 1.2 Goals
- Ingestion Rate: 1M+ events/sec sustained write throughput
- Query Performance: Sub-second aggregations over billions of rows
- Compression: 10-100x compression ratio for time series data
- Retention: Automatic data lifecycle with TTL and tiered storage
- Scalability: Horizontal scaling with ReplicatedMergeTree
### 1.3 Non-Goals
- Not for OLTP: Use Postgres for transactional workloads
- Not for full-text search: Use Elasticsearch or TypeSense
- Not for updates: ClickHouse is append-only (use Postgres for mutable data)
- Not for joins: Optimized for denormalized data, avoid complex joins
## 2. Architecture Overview

### 2.1 Time Series Pipeline

### 2.2 Data Flow
## 3. Time Series Access Pattern

### 3.1 Use Cases

**Metrics**
- Application performance metrics (request rate, latency, errors)
- Infrastructure metrics (CPU, memory, disk, network)
- Business metrics (revenue, conversions, user activity)
**Logs**
- Application logs (errors, warnings, debug)
- Access logs (HTTP requests, API calls)
- Audit logs (security events, compliance)
**Events**
- User activity (clicks, page views, sessions)
- System events (deployments, configuration changes)
- IoT telemetry (sensor data, device events)
**Traces**
- Distributed tracing spans
- Service dependencies and call graphs
- Performance profiling
### 3.2 Interface

```protobuf
syntax = "proto3";

package prism.timeseries.v1;

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

service TimeSeriesService {
  // Append events (batched, async)
  rpc AppendEvents(AppendEventsRequest) returns (AppendEventsResponse);

  // Stream events (for high-throughput ingestion)
  rpc StreamEvents(stream Event) returns (StreamEventsResponse);

  // Query events with time range and filters
  rpc QueryEvents(QueryRequest) returns (QueryResponse);

  // Query with aggregations
  rpc QueryAggregates(AggregateRequest) returns (AggregateResponse);

  // Stream query results (for large datasets)
  rpc StreamQuery(QueryRequest) returns (stream Event);
}

message Event {
  // Timestamp (nanosecond precision)
  google.protobuf.Timestamp timestamp = 1;

  // Event metadata
  string event_type = 2;
  string source = 3;

  // Dimensions (indexed)
  map<string, string> dimensions = 4;

  // Metrics (columnar storage)
  map<string, double> metrics = 5;

  // Payload (compressed)
  bytes payload = 6;
}

message AppendEventsRequest {
  string session_id = 1;
  string namespace = 2;
  repeated Event events = 3;

  // Async mode (immediate response)
  bool async = 4;
}

message QueryRequest {
  string session_id = 1;
  string namespace = 2;

  // Time range (required for partition pruning)
  google.protobuf.Timestamp start_time = 3;
  google.protobuf.Timestamp end_time = 4;

  // Filters (WHERE clause)
  map<string, string> filters = 5;

  // SQL-like query (optional)
  string sql = 6;

  // Pagination
  int32 limit = 7;
  int32 offset = 8;
}

message AggregateRequest {
  string session_id = 1;
  string namespace = 2;
  google.protobuf.Timestamp start_time = 3;
  google.protobuf.Timestamp end_time = 4;

  // Aggregation function
  enum AggregateFunc {
    COUNT = 0;
    SUM = 1;
    AVG = 2;
    MIN = 3;
    MAX = 4;
    QUANTILE_95 = 5;
    QUANTILE_99 = 6;
  }

  // Metrics to aggregate
  repeated string metric_names = 5;
  repeated AggregateFunc functions = 6;

  // Group by dimensions
  repeated string group_by = 7;

  // Time bucket (for time series)
  google.protobuf.Duration bucket_size = 8;
}
```
### 3.3 Schema Design

```sql
-- Main events table (sharded by timestamp)
CREATE TABLE events ON CLUSTER '{cluster}' (
    timestamp DateTime64(9),
    event_type LowCardinality(String),
    source LowCardinality(String),
    namespace LowCardinality(String),
    -- Dimensions (indexed)
    dimensions Map(String, String),
    -- Metrics (columnar)
    metrics Map(String, Float64),
    -- Payload (compressed)
    payload String CODEC(ZSTD(3))
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (namespace, event_type, timestamp)
TTL timestamp + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;

-- Distributed table (query interface)
CREATE TABLE events_distributed ON CLUSTER '{cluster}' AS events
ENGINE = Distributed('{cluster}', default, events, rand());
```
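As a sketch of how `Event` messages map onto this schema, a batched write against the distributed table might look like the following; the dimension and metric keys and row values are illustrative, not part of the RFC:

```sql
-- Illustrative batch insert; keys and values are placeholders
INSERT INTO events_distributed (timestamp, event_type, source, namespace, dimensions, metrics, payload)
VALUES
    (now64(9), 'http_request', 'api-gateway', 'api:requests',
     map('endpoint', '/checkout', 'status', '200'), map('duration_ms', 42.5), ''),
    (now64(9), 'http_request', 'api-gateway', 'api:requests',
     map('endpoint', '/checkout', 'status', '500'), map('duration_ms', 131.0), '');
```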
### 3.4 Materialized Views for Pre-aggregations

```sql
-- 1-minute rollups
CREATE MATERIALIZED VIEW events_1m ON CLUSTER '{cluster}'
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/events_1m', '{replica}')
PARTITION BY toYYYYMMDD(timestamp_1m)
ORDER BY (namespace, event_type, timestamp_1m)
AS SELECT
    toStartOfMinute(timestamp) AS timestamp_1m,
    namespace,
    event_type,
    count() AS event_count,
    sumMap(metrics) AS metrics_sum
    -- Averages are derived at query time as metrics_sum / event_count;
    -- an avgMap column would not merge correctly under SummingMergeTree.
FROM events
GROUP BY timestamp_1m, namespace, event_type;

-- 1-hour rollups (from 1-minute)
CREATE MATERIALIZED VIEW events_1h ON CLUSTER '{cluster}'
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/events_1h', '{replica}')
PARTITION BY toYYYYMM(timestamp_1h)
ORDER BY (namespace, event_type, timestamp_1h)
AS SELECT
    toStartOfHour(timestamp_1m) AS timestamp_1h,
    namespace,
    event_type,
    sum(event_count) AS event_count,
    sumMap(metrics_sum) AS metrics_sum
FROM events_1m
GROUP BY timestamp_1h, namespace, event_type;
```
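Dashboards can then read from the rollup instead of the raw table. A minimal sketch, assuming the `duration_ms` metric key used elsewhere in this RFC; the `GROUP BY` re-aggregates rows that SummingMergeTree has not yet merged in the background:

```sql
-- Per-minute request volume and average duration from the 1-minute rollup
SELECT
    timestamp_1m,
    sum(event_count) AS events,
    sum(metrics_sum['duration_ms']) / sum(event_count) AS avg_duration_ms
FROM events_1m
WHERE namespace = 'api:requests'
  AND timestamp_1m >= now() - INTERVAL 1 HOUR
GROUP BY timestamp_1m
ORDER BY timestamp_1m;
```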
## 4. Query Patterns

### 4.1 Time Range Queries

```sql
-- Recent events (last 1 hour)
SELECT *
FROM events_distributed
WHERE namespace = 'app:production'
AND timestamp >= now() - INTERVAL 1 HOUR
ORDER BY timestamp DESC
LIMIT 100;
```

### 4.2 Aggregations

```sql
-- Request rate per minute (last 24 hours)
SELECT
toStartOfMinute(timestamp) AS minute,
count() AS request_count,
avg(metrics['duration_ms']) AS avg_duration,
quantile(0.95)(metrics['duration_ms']) AS p95_duration
FROM events_distributed
WHERE namespace = 'api:requests'
AND timestamp >= now() - INTERVAL 24 HOUR
GROUP BY minute
ORDER BY minute;
```

### 4.3 Top-N Queries

```sql
-- Top 10 endpoints by error rate
SELECT
dimensions['endpoint'] AS endpoint,
countIf(toUInt16OrZero(dimensions['status']) >= 500) AS error_count,
count() AS total_count,
error_count / total_count AS error_rate
FROM events_distributed
WHERE namespace = 'api:requests'
AND timestamp >= now() - INTERVAL 1 HOUR
GROUP BY endpoint
ORDER BY error_rate DESC
LIMIT 10;
```

### 4.4 Time Series Bucketing

```sql
-- P99 latency per 5-minute bucket
SELECT
toStartOfInterval(timestamp, INTERVAL 5 MINUTE) AS bucket,
quantile(0.99)(metrics['latency_ms']) AS p99_latency
FROM events_distributed
WHERE namespace = 'service:checkout'
AND timestamp >= now() - INTERVAL 6 HOUR
GROUP BY bucket
ORDER BY bucket;
```

## 5. Performance Optimizations

### 5.1 Partitioning Strategy

Partitioning the `events` table by day (`PARTITION BY toYYYYMMDD(timestamp)`, as defined in section 3.3) provides several benefits (a verification sketch follows the list):
- Partition Pruning: Only scan relevant days
- TTL: Drop old partitions efficiently
- Parallel Queries: Query partitions in parallel
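To make pruning concrete, partition layout and the index analysis for a time-bounded query can be inspected directly; this is an illustrative sketch against the local `events` table from section 3.3:

```sql
-- List active partitions and their sizes
SELECT partition, sum(rows) AS rows, formatReadableSize(sum(bytes_on_disk)) AS size
FROM system.parts
WHERE database = currentDatabase() AND table = 'events' AND active
GROUP BY partition
ORDER BY partition;

-- Show which parts and granules a time-bounded query would actually read
EXPLAIN indexes = 1
SELECT count()
FROM events
WHERE timestamp >= now() - INTERVAL 1 DAY;
```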
### 5.2 Compression

```sql
-- Codec comparison
ALTER TABLE events MODIFY COLUMN payload String CODEC(ZSTD(3)); -- 10x compression
ALTER TABLE events MODIFY COLUMN dimensions Map(String, String) CODEC(LZ4); -- 3x, faster
```

**Compression Ratios**:
- ZSTD(3): 10-15x (best compression)
- LZ4: 2-3x (fastest)
- Delta + LZ4: 20x+ for monotonic data such as timestamps and counters (see the sketch below)
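For the monotonic `timestamp` column itself, a delta-style codec can be layered in front of a general-purpose codec. This is an illustrative sketch rather than part of the RFC's schema:

```sql
-- DoubleDelta encodes differences-of-differences, which compresses near-monotonic
-- timestamps well; Delta(8) is a simpler alternative
ALTER TABLE events MODIFY COLUMN timestamp DateTime64(9) CODEC(DoubleDelta, LZ4);
```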
### 5.3 Async Inserts

```sql
-- Enable async inserts (server-side batching of small inserts)
SET async_insert = 1;
SET wait_for_async_insert = 0;
SET async_insert_max_data_size = 10000000; -- 10MB batches
SET async_insert_busy_timeout_ms = 1000; -- 1s max wait
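
-- Illustrative (not part of the RFC): the same settings can also be applied per
-- statement instead of per session; the row values below are placeholders.
INSERT INTO events_distributed (timestamp, event_type, source, namespace, dimensions, metrics, payload)
SETTINGS async_insert = 1, wait_for_async_insert = 0
VALUES (now64(9), 'heartbeat', 'agent-1', 'infra:metrics', map(), map('cpu_pct', 12.5), '');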
```

## 6. Configuration

### 6.1 Client Configuration

```protobuf
message ClickHouseBackendConfig {
  // Connection
  repeated string hosts = 1;
  int32 port = 2;
  string database = 3;
  string username = 4;
  string password = 5;

  // Cluster settings
  bool cluster_mode = 6;
  string cluster_name = 7;
  int32 num_shards = 8;
  int32 num_replicas = 9;

  // Performance
  int32 batch_size = 10;                        // Events per batch
  google.protobuf.Duration batch_timeout = 11;  // Max wait time
  bool async_insert = 12;
  int32 insert_threads = 13;

  // Retention
  int32 ttl_days = 14;
  bool enable_tiered_storage = 15;

  // Table settings
  string partition_by = 16;   // "toYYYYMMDD(timestamp)"
  string order_by = 17;       // "(namespace, event_type, timestamp)"
  int32 index_granularity = 18;
}
```
### 6.2 Server Configuration

```yaml
# config/clickhouse.yaml
clickhouse:
  cluster:
    name: "prism_cluster"
    shards:
      - hosts: ["ch-shard1-1.internal:9000", "ch-shard1-2.internal:9000"]
      - hosts: ["ch-shard2-1.internal:9000", "ch-shard2-2.internal:9000"]

  tables:
    events:
      engine: "ReplicatedMergeTree"
      partition_by: "toYYYYMMDD(timestamp)"
      order_by: "(namespace, event_type, timestamp)"
      ttl_days: 90
      compression: "ZSTD(3)"

  performance:
    batch_size: 10000
    batch_timeout: "1s"
    async_insert: true
    insert_threads: 4
    max_memory_usage: "10GB"

  materialized_views:
    - name: "events_1m"
      enabled: true
    - name: "events_1h"
      enabled: true
    - name: "events_1d"
      enabled: false  # Enable for long-term storage
```
## 7. Operational Considerations

### 7.1 Capacity Planning

**Storage Calculation**:

```text
daily_volume      = events_per_day × avg_event_size_bytes
compressed_size   = daily_volume / compression_ratio
retention_storage = compressed_size × retention_days × (1 + replica_count)
```
**Example**:
- 1B events/day × 200 bytes = 200GB/day uncompressed
- 200GB / 10 (ZSTD) = 20GB/day compressed
- 20GB × 90 days × 2 (replication) = 3.6TB total
**Query Performance**:
- 1M events/sec ingestion requires ~4 shards
- Sub-second queries over 1TB datasets
- 100 concurrent analytical queries supported
### 7.2 Monitoring
```yaml
metrics:
  ingestion:
    - insert_rate_events_per_sec
    - insert_throughput_bytes_per_sec
    - async_insert_queue_size
    - insert_latency_p99

  storage:
    - disk_usage_bytes
    - compression_ratio
    - parts_count
    - partition_count

  queries:
    - query_rate_per_sec
    - query_latency_p50
    - query_latency_p99
    - memory_usage_per_query
    - running_queries_count

  replication:
    - replication_lag_seconds
    - replica_queue_size
```
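Several of these metrics can be scraped directly from ClickHouse system tables; the queries below are an illustrative sketch (column names can vary slightly across ClickHouse versions):

```sql
-- Replication lag and queue depth per replicated table
SELECT database, table, absolute_delay AS replication_lag_seconds, queue_size AS replica_queue_size
FROM system.replicas;

-- Active part and partition counts per table
SELECT table, count() AS parts_count, uniqExact(partition) AS partition_count
FROM system.parts
WHERE active
GROUP BY table;

-- Currently running queries and their memory usage
SELECT query_id, elapsed, formatReadableSize(memory_usage) AS memory
FROM system.processes;
```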
### 7.3 Data Lifecycle
```mermaid
graph LR
    Hot[Hot Storage<br/>SSD<br/>Last 7 days] -->|TTL| Warm[Warm Storage<br/>HDD<br/>8-30 days]
    Warm -->|TTL| Cold[Cold Storage<br/>S3<br/>31-90 days]
    Cold -->|TTL| Delete[Deleted<br/>90+ days]

    style Hot fill:#ff6b6b
    style Warm fill:#feca57
    style Cold fill:#48dbfb
    style Delete fill:#dfe6e9
```
```sql
-- Tiered storage with TTL: new data lands on the default 'hot' volume,
-- then moves down the tiers as it ages (matching the diagram above)
ALTER TABLE events MODIFY TTL
    timestamp + INTERVAL 7 DAY TO VOLUME 'warm',
    timestamp + INTERVAL 30 DAY TO VOLUME 'cold',
    timestamp + INTERVAL 90 DAY DELETE;
```
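Assuming the servers expose `hot`, `warm`, and `cold` volumes through a storage policy, part placement can be verified from `system.parts`; a minimal sketch:

```sql
-- Active parts per disk for the events table
SELECT disk_name, count() AS parts, formatReadableSize(sum(bytes_on_disk)) AS size
FROM system.parts
WHERE table = 'events' AND active
GROUP BY disk_name;
```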
## 8. Migration Path
### 8.1 Phase 1: Single Node (Week 1-2)
- Deploy ClickHouse standalone
- Implement TimeSeriesService gRPC interface
- Create basic events table
- Integration tests with mock data
### 8.2 Phase 2: Cluster Setup (Week 3-4)
- Deploy 2-shard cluster with replication
- Configure Distributed tables
- Implement async batching
- Load testing (1M events/sec)
### 8.3 Phase 3: Materialized Views (Week 5-6)
- Create 1-minute rollups
- Create 1-hour rollups
- Query optimization with pre-aggregations
- Benchmarking (query latency)
### 8.4 Phase 4: Production (Week 7-8)
- Tiered storage configuration
- Alerting and monitoring
- Backup and disaster recovery
- Documentation and runbooks
## 9. Use Case Recommendations
### 9.1 When to Use ClickHouse
✅ **Use When**:
- Append-only time series data
- Analytical queries over large datasets
- Real-time aggregations needed
- High ingestion rate (`>100k` events/sec)
- Data has natural time dimension
❌ **Avoid When**:
- Need updates/deletes (use Postgres)
- Complex joins required (use Postgres)
- Full-text search needed (use Elasticsearch)
- Small dataset (`<1M` rows, use Postgres)
### 9.2 ClickHouse vs Alternatives
| Use Case | ClickHouse | Postgres | Kafka |
|----------|-----------|----------|-------|
| Metrics storage | ✅ Excellent | ❌ Poor | ❌ No analytics |
| Event logging | ✅ Excellent | ⚠️ Limited | ✅ Good |
| Real-time dashboards | ✅ Excellent | ❌ Slow | ❌ No queries |
| Retention (90+ days) | ✅ Efficient | ❌ Expensive | ⚠️ Complex |
| Complex aggregations | ✅ Fast | ⚠️ Moderate | ❌ Not supported |
## 10. References
- [ClickHouse Documentation](https://clickhouse.com/docs/)
- [ReplicatedMergeTree Engine](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication)
- [Materialized Views](https://clickhouse.com/docs/en/guides/developer/cascading-materialized-views)
- [Compression Codecs](https://clickhouse.com/docs/en/sql-reference/statements/create/table#column_compression_codec)
## 11. Revision History
- 2025-10-08: Initial draft