58. 实时数仓技术指南
目录
点击展开目录
实时数仓基础概念
什么是实时数仓
实时数仓(Real-time Data Warehouse) 是一种能够实时或准实时地采集、处理、存储和分析数据的数据仓库系统。与传统离线数仓按天或按小时批量处理数据不同,实时数仓能够在秒级甚至毫秒级完成数据的端到端处理,为业务决策提供最新的数据支持。
核心特征:
低延迟处理
- 数据从产生到可查询的时间在秒级或分钟级
- 支持毫秒级的数据写入和查询
- 端到端延迟可控且稳定
流式计算
- 采用流式计算引擎持续处理数据流
- 数据逐条或微批处理,而非批量处理
- 支持事件驱动的实时响应
实时可见
- 业务指标实时更新,无需等待批处理完成
- 支持实时查询和分析
- 数据变化立即反映到报表和大屏
高吞吐量
- 能够处理海量高并发的数据流
- 支持百万级甚至千万级 TPS
- 水平扩展能力强
容错性强
- 支持故障自动恢复
- 数据不丢失、不重复
- 状态一致性保证
技术架构图:
MySQL/PostgreSQL"] A2["日志数据
应用日志/Nginx"] A3["埋点数据
用户行为"] A4["第三方数据
API/文件"] end subgraph "数据采集层" B1["CDC
Canal/Debezium"] B2["日志采集
Flume/Filebeat"] B3["SDK/API"] B4["DataX/Sqoop"] end subgraph "消息队列层" C["Kafka
统一消息总线"] end subgraph "实时计算层" D1["Flink
流式计算"] D2["Spark Streaming
微批处理"] end subgraph "存储层" E1["ClickHouse
OLAP 分析"] E2["Hudi/Iceberg
数据湖"] E3["Redis/HBase
KV 存储"] E4["Doris
MPP 数据库"] end subgraph "应用层" F1["实时大屏"] F2["实时报表"] F3["实时推荐"] F4["实时风控"] end A1 --> B1 A2 --> B2 A3 --> B3 A4 --> B4 B1 --> C B2 --> C B3 --> C B4 --> C C --> D1 C --> D2 D1 --> E1 D1 --> E2 D1 --> E3 D2 --> E4 E1 --> F1 E2 --> F2 E3 --> F3 E4 --> F4 style C fill:#ffd43b style D1 fill:#4dabf7 style E1 fill:#51cf66 style E2 fill:#51cf66 style F1 fill:#ff6b6b
典型应用场景:
| 场景 | 延迟要求 | 数据量级 | 典型指标 | 技术选型 | 业务价值 |
|---|---|---|---|---|---|
| 实时大屏 | 秒级 | 百万级/秒 | GMV、订单量、UV/PV | Flink + ClickHouse | 实时监控业务状态 |
| 实时风控 | 毫秒级 | 十万级/秒 | 风险评分、异常检测 | Flink + Redis | 防止欺诈和损失 |
| 实时推荐 | 百毫秒级 | 千万级/秒 | 用户画像、商品推荐 | Flink + HBase | 提升转化率 |
| 实时监控 | 秒级 | 百万级/秒 | 系统指标、业务告警 | Flink + Prometheus | 快速发现问题 |
| 实时报表 | 分钟级 | 百万级/秒 | 业务报表、数据分析 | Flink + Hudi | 支持实时决策 |
| 实时数据同步 | 秒级 | 十万级/秒 | 数据一致性 | Flink CDC + Hudi | 数据实时可用 |
实时数仓 vs 离线数仓
核心差异对比:
| 维度 | 离线数仓 | 实时数仓 | 说明 |
|---|---|---|---|
| 数据延迟 | T+1(天级)或小时级 | 秒级/分钟级 | 实时数仓延迟降低 1000 倍以上 |
| 处理模式 | 批处理(Batch) | 流处理(Stream) | 处理范式完全不同 |
| 计算引擎 | Hive、Spark Batch | Flink、Spark Streaming | 流式引擎更复杂 |
| 存储引擎 | HDFS、Hive 表 | ClickHouse、Hudi、Iceberg | 实时存储要求更高 |
| 数据更新 | 全量/增量覆盖 | 实时追加/更新 | 实时数仓支持实时更新 |
| 查询性能 | 分钟级 | 秒级/毫秒级 | 实时数仓查询更快 |
| 资源消耗 | 低(定时调度) | 高(持续运行) | 实时数仓需要持续占用资源 |
| 数据一致性 | 强一致 | 最终一致 | 实时数仓一致性要求更灵活 |
| 开发复杂度 | 低 | 高 | 实时数仓需要处理更多边界情况 |
| 运维复杂度 | 低 | 高 | 实时数仓需要 7x24 监控 |
| 适用场景 | 历史分析、离线报表 | 实时监控、实时决策 | 各有侧重 |
| 成本 | 低 | 高 | 实时数仓成本是离线的 2-3 倍 |
架构对比流程图:
延迟:天级"] end subgraph "实时数仓架构" direction TB A2["业务数据库"] -->|"CDC 实时采集"| B2["Kafka"] B2 -->|"流式处理"| C2["Flink 实时 ETL"] C2 --> D2["ClickHouse/Hudi"] D2 --> E2["实时大屏
延迟:秒级"] end style A1 fill:#a5d8ff style A2 fill:#a5d8ff style D1 fill:#ffd43b style C2 fill:#4dabf7 style F1 fill:#74c0fc style E2 fill:#51cf66
数据处理流程对比:
离线数仓处理流程:
- 每天凌晨定时调度任务
- 从业务库全量或增量抽取数据
- 数据加载到 HDFS/Hive
- 批量 ETL 处理(清洗、转换、关联)
- 批量聚合计算
- 结果写入数据集市
- 早上业务人员查看昨天的报表
实时数仓处理流程:
- CDC 实时捕获数据库变更
- 变更事件实时发送到 Kafka
- Flink 实时消费 Kafka 数据
- 流式 ETL 处理(清洗、转换、关联)
- 实时聚合计算(窗口、状态)
- 结果实时写入存储引擎
- 业务人员实时查看最新数据
性能对比示例:
假设一个电商场景,统计实时 GMV:
| 指标 | 离线数仓 | 实时数仓 | 提升倍数 |
|---|---|---|---|
| 数据延迟 | 24 小时 | 5 秒 | 17280 倍 |
| 查询延迟 | 30 秒 | 0.5 秒 | 60 倍 |
| 数据新鲜度 | 昨天的数据 | 当前秒的数据 | - |
| 决策响应 | 次日调整 | 实时调整 | - |
实时数仓的核心价值
1. 业务价值
实时决策能力:
- 场景:双十一大促,实时监控各品类销售情况
- 价值:发现某品类销售不佳,立即调整营销策略,增加曝光
- 效果:相比离线数仓次日调整,实时调整可提升 20-30% 的销售额
用户体验提升:
- 场景:电商平台实时推荐系统
- 价值:根据用户最近 5 分钟的浏览行为,实时更新推荐结果
- 效果:点击率提升 15%,转化率提升 10%
风险控制:
- 场景:金融交易实时风控
- 价值:毫秒级识别异常交易,实时拦截欺诈行为
- 效果:欺诈损失降低 80%,误拦截率降低 50%
运营效率:
- 场景:物流配送实时监控
- 价值:实时监控配送进度,及时发现异常,快速调度
- 效果:配送时效提升 20%,客户满意度提升 15%
2. 技术价值
数据时效性:
系统响应速度:
- 查询响应时间从分钟级降到秒级
- 支持高并发查询(QPS 10000+)
- 支持复杂分析查询(多维聚合、关联)
资源利用率:
- 流式处理避免了批处理的资源峰值
- 资源占用更平稳,更容易规划
- 支持弹性伸缩,按需分配资源
架构简化:
- 统一流批处理,减少重复开发
- 一套代码同时支持实时和离线
- 降低维护成本
3. 商业价值
+15%"] C --> F D --> G["成本降低
-20%"] E --> F F --> H["ROI 提升
200%+"] G --> H style A fill:#ff6b6b style F fill:#51cf66 style G fill:#51cf66 style H fill:#ffd43b
ROI 计算示例:
假设一个中型电商平台:
- 日均 GMV:1000 万
- 实时数仓建设成本:200 万/年
- 运维成本:100 万/年
- 总成本:300 万/年
收益:
- 实时推荐提升转化率 10%:GMV 增加 100 万/天 = 3.65 亿/年
- 实时风控降低损失 80%:节省 500 万/年
- 实时运营提升效率 20%:节省人力成本 200 万/年
- 总收益:3.7 亿/年
ROI = (收益 - 成本) / 成本 = (3.7 亿 - 300 万) / 300 万 ≈ 123 倍
实时数仓的技术挑战
1. 数据一致性挑战
问题描述:
- 分布式环境下,数据在多个节点间传输和处理
- 网络延迟、节点故障可能导致数据丢失或重复
- 多个数据源的数据需要保持一致性
解决方案:
Exactly-Once 语义:
// Flink Exactly-Once 配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. 启用 Checkpoint
env.enableCheckpointing(60000); // 60 秒
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 2. Kafka Source Exactly-Once
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"topic",
new SimpleStringSchema(),
properties
);
consumer.setStartFromGroupOffsets(); // 从 Group Offset 开始
// 3. Kafka Sink Exactly-Once
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(...)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 精确一次
.setTransactionalIdPrefix("flink-kafka-")
.build();
幂等性设计:
// 使用唯一 ID 保证幂等性
public class IdempotentSinkFunction extends RichSinkFunction<Order> {
private transient RedisClient redis;
@Override
public void invoke(Order order, Context context) {
String key = "order:" + order.getOrderId();
// 检查是否已处理
if (redis.exists(key)) {
return; // 已处理,跳过
}
// 处理数据
processOrder(order);
// 标记已处理
redis.setex(key, 3600, "processed"); // 1 小时过期
}
}
分布式事务:
- 使用 Flink 的两阶段提交(2PC)
- 使用 Hudi/Iceberg 的 ACID 事务
- 使用 Saga 模式处理长事务
2. 数据延迟挑战
延迟来源分析:
10-50ms"| B["Kafka"] B -->|"消费延迟
10-100ms"| C["Flink"] C -->|"计算延迟
100-500ms"| D["处理完成"] D -->|"写入延迟
50-200ms"| E["存储"] E -->|"查询延迟
10-100ms"| F["应用"] style A fill:#a5d8ff style C fill:#4dabf7 style E fill:#51cf66 style F fill:#ff6b6b
优化方案:
| 环节 | 优化措施 | 效果 |
|---|---|---|
| Kafka | 增加分区数、优化网络配置 | 延迟降低 50% |
| Flink | 调整并行度、优化算子 | 延迟降低 30% |
| 存储 | 批量写入、异步写入 | 延迟降低 40% |
| 查询 | 添加索引、物化视图 | 延迟降低 60% |
端到端延迟监控:
public class LatencyMonitor extends ProcessFunction<Event, Event> {
private transient Histogram latencyHistogram;
@Override
public void open(Configuration parameters) {
latencyHistogram = getRuntimeContext()
.getMetricGroup()
.histogram("end_to_end_latency",
new DescriptiveStatisticsHistogram(1000));
}
@Override
public void processElement(Event event, Context ctx, Collector<Event> out) {
long latency = System.currentTimeMillis() - event.getTimestamp();
latencyHistogram.update(latency);
if (latency > 5000) { // 超过 5 秒告警
log.warn("High latency detected: {}ms, event: {}", latency, event);
}
out.collect(event);
}
}
3. 数据乱序挑战
乱序产生原因:
- 网络延迟不一致
- 多数据源时间不同步
- 分布式系统的固有特性
Watermark 机制:
处理乱序数据:
// 设置 Watermark 策略
WatermarkStrategy<Order> watermarkStrategy = WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10)) // 容忍 10 秒乱序
.withTimestampAssigner((order, timestamp) -> order.getCreateTime())
.withIdleness(Duration.ofMinutes(1)); // 1 分钟无数据则推进 Watermark
DataStream<Order> watermarkedStream = stream
.assignTimestampsAndWatermarks(watermarkStrategy);
// 窗口允许延迟
DataStream<OrderStats> result = watermarkedStream
.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.minutes(1)) // 允许 1 分钟延迟
.sideOutputLateData(lateOutputTag) // 超过延迟的数据输出到侧输出流
.aggregate(new OrderAggregateFunction());
// 处理迟到数据
DataStream<Order> lateData = result.getSideOutput(lateOutputTag);
lateData.addSink(new LateDataSink()); // 单独处理迟到数据
4. 状态管理挑战
大状态问题:
- 状态大小可能达到 TB 级别
- 状态恢复时间长(小时级)
- 内存压力大,容易 OOM
解决方案:
使用 RocksDB 状态后端:
// RocksDB 状态后端配置
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true); // 启用增量 Checkpoint
backend.setDbStoragePath("/data/flink/rocksdb");
env.setStateBackend(backend);
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");
状态 TTL:
// 设置状态 TTL
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24)) // 24 小时过期
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 创建和写入时更新
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期数据
.cleanupFullSnapshot() // 全量快照时清理
.cleanupIncrementally(10, true) // 增量清理
.build();
ValueStateDescriptor<String> descriptor =
new ValueStateDescriptor<>("user-state", String.class);
descriptor.enableTimeToLive(ttlConfig);
ValueState<String> state = getRuntimeContext().getState(descriptor);
增量 Checkpoint:
- 只保存状态的增量变化
- Checkpoint 时间从小时级降到分钟级
- 恢复时间大幅缩短
5. 资源成本挑战
成本构成:
| 资源类型 | 占比 | 优化空间 |
|---|---|---|
| 计算资源 | 40% | 高 |
| 存储资源 | 30% | 中 |
| 网络资源 | 20% | 低 |
| 人力成本 | 10% | 中 |
成本优化策略:
计算资源优化
- 合理设置并行度,避免资源浪费
- 使用 Slot Sharing 共享资源
- 使用弹性伸缩,按需分配资源
存储资源优化
- 冷热数据分离
- 数据压缩(LZ4/Snappy)
- 定期清理过期数据
网络资源优化
- 数据压缩传输
- 减少数据 Shuffle
- 使用本地聚合
实时数仓的应用场景
1. 实时大屏监控
业务需求:
- 实时展示核心业务指标(GMV、订单量、UV/PV)
- 多维度分析(地域、品类、时间)
- 秒级数据刷新
技术方案:
- 数据采集:Canal CDC
- 消息队列:Kafka
- 实时计算:Flink
- 存储:ClickHouse(明细)+ Redis(实时指标)
- 展示:DataV/Grafana
2. 实时风控系统
业务需求:
- 毫秒级风险识别
- 多维度风险规则
- 实时拦截异常交易
技术方案:
- 数据采集:SDK 埋点
- 消息队列:Kafka
- 实时计算:Flink CEP(复杂事件处理)
- 存储:Redis(规则)+ HBase(历史)
- 决策:实时评分 + 规则引擎
3. 实时推荐系统
业务需求:
- 根据用户实时行为推荐
- 百毫秒级响应
- 个性化推荐
技术方案:
- 数据采集:埋点 SDK
- 消息队列:Kafka
- 实时计算:Flink(特征计算)
- 存储:HBase(用户画像)+ Redis(推荐结果)
- 算法:协同过滤 + 深度学习
4. 实时数据同步
业务需求:
- 多数据源实时同步
- 数据一致性保证
- 支持数据转换
技术方案:
- 数据采集:Flink CDC
- 消息队列:Kafka
- 实时计算:Flink
- 存储:Hudi/Iceberg(数据湖)
- 查询:Presto/Trino
5. 实时数据质量监控
业务需求:
- 实时监控数据质量
- 异常数据告警
- 数据血缘追踪
技术方案:
- 数据采集:全链路埋点
- 消息队列:Kafka
- 实时计算:Flink(质量检查)
- 存储:Elasticsearch(日志)+ MySQL(元数据)
- 告警:AlertManager
应用场景总结表:
| 场景 | 延迟 | 数据量 | 复杂度 | 成本 | 适用行业 |
|---|---|---|---|---|---|
| 实时大屏 | 秒级 | 高 | 中 | 中 | 电商、金融、物流 |
| 实时风控 | 毫秒级 | 中 | 高 | 高 | 金融、支付、电商 |
| 实时推荐 | 百毫秒级 | 极高 | 高 | 高 | 电商、视频、社交 |
| 实时同步 | 秒级 | 高 | 中 | 中 | 所有行业 |
| 实时监控 | 秒级 | 中 | 低 | 低 | 所有行业 |
实时数仓架构演进
Lambda 架构
Lambda 架构是由 Nathan Marz 在 2011 年提出的大数据处理架构,通过批处理和流处理两条链路并行处理数据,最终合并结果,是实时数仓架构的重要里程碑。
架构组成:
日志/埋点"] end subgraph "批处理层 Batch Layer" B["HDFS
Master Dataset"] --> C["Spark Batch
MapReduce"] C --> D["批处理视图
Batch View"] end subgraph "速度层 Speed Layer" E["Kafka
消息队列"] --> F["Flink/Storm
流式计算"] F --> G["实时视图
Real-time View"] end subgraph "服务层 Serving Layer" D --> H["查询合并
Query Merge"] G --> H H --> I["统一查询接口
API"] end A -->|"批量导入
T+1"| B A -->|"实时采集
CDC"| E style B fill:#a5d8ff style E fill:#ffd43b style H fill:#51cf66 style I fill:#ff6b6b
三层架构详解:
1. 批处理层(Batch Layer)
职责:
- 存储完整的历史数据(Master Dataset)
- 定期批量计算,生成批处理视图
- 保证数据的准确性和完整性
- 支持数据重新计算和修正
技术栈:
- 存储:HDFS、Hive
- 计算:Spark Batch、MapReduce
- 调度:Airflow、Oozie
特点:
- 高延迟(小时/天级)
- 高准确性(强一致性)
- 支持复杂计算
- 资源消耗集中在批处理时段
2. 速度层(Speed Layer)
职责:
- 处理增量实时数据
- 快速生成实时视图
- 补充批处理层的延迟
- 提供最新的数据
技术栈:
- 消息队列:Kafka、Pulsar
- 计算:Flink、Storm、Spark Streaming
- 存储:Redis、HBase、Cassandra
特点:
- 低延迟(秒级)
- 最终一致性
- 只处理最近的数据
- 持续占用资源
3. 服务层(Serving Layer)
职责:
- 合并批处理视图和实时视图
- 提供统一的查询接口
- 处理查询请求
- 返回完整的结果
技术栈:
- 查询引擎:Druid、ClickHouse、Presto
- API 层:Spring Boot、FastAPI
- 缓存:Redis、Memcached
特点:
- 查询合并逻辑
- 统一对外接口
- 支持高并发查询
Lambda 架构工作流程:
Lambda 架构优缺点:
| 优点 | 缺点 |
|---|---|
| ✅ 容错性强:批处理保证数据准确性,可重新计算 | ❌ 维护成本高:需要维护两套代码(批处理 + 流处理) |
| ✅ 可扩展性好:批流分离,各自独立扩展 | ❌ 数据一致性难:批处理和流处理结果可能不一致 |
| ✅ 历史数据可重算:支持数据修正和逻辑变更 | ❌ 架构复杂:三层架构,组件多,运维复杂 |
| ✅ 适合复杂业务:批处理支持复杂的计算逻辑 | ❌ 资源消耗大:批流两套系统,资源占用多 |
| ✅ 数据完整性好:Master Dataset 保存完整数据 | ❌ 开发效率低:同一逻辑需要实现两次 |
Lambda 架构适用场景:
- 数据量超大(PB 级以上)
- 需要频繁重算历史数据
- 对数据准确性要求极高
- 业务逻辑复杂,需要批处理支持
- 有足够的开发和运维资源
Lambda 架构实战示例:
// 批处理层:Spark Batch 计算每日订单统计
SparkSession spark = SparkSession.builder()
.appName("Daily Order Stats")
.getOrCreate();
Dataset<Row> orders = spark.read()
.format("hive")
.table("ods.orders")
.where("dt = '2024-02-28'");
Dataset<Row> stats = orders
.groupBy("province", "category")
.agg(
count("order_id").as("order_count"),
sum("amount").as("total_amount")
);
stats.write()
.mode(SaveMode.Overwrite)
.format("hive")
.saveAsTable("ads.daily_order_stats");
// 速度层:Flink 实时计算订单统计
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Order> orders = env
.addSource(new FlinkKafkaConsumer<>("orders", ...));
DataStream<OrderStats> realtimeStats = orders
.keyBy(order -> Tuple2.of(order.getProvince(), order.getCategory()))
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new OrderAggregateFunction());
realtimeStats.addSink(new RedisSink<>(...));
// 服务层:合并批处理和实时视图
@GetMapping("/order-stats")
public OrderStats getOrderStats(String province, String category) {
// 1. 从 Hive 获取批处理结果(昨天及之前)
OrderStats batchStats = hiveService.getStats(province, category,
LocalDate.now().minusDays(1));
// 2. 从 Redis 获取实时结果(今天)
OrderStats realtimeStats = redisService.getStats(province, category);
// 3. 合并结果
return OrderStats.builder()
.orderCount(batchStats.getOrderCount() + realtimeStats.getOrderCount())
.totalAmount(batchStats.getTotalAmount().add(realtimeStats.getTotalAmount()))
.build();
}
Kappa 架构
Kappa 架构是由 LinkedIn 的 Jay Kreps 在 2014 年提出,是对 Lambda 架构的简化,核心思想是一切皆流,只保留流处理链路。
核心思想:
- 统一处理:用同一套流处理引擎处理实时和历史数据
- 批处理即流处理:将批处理视为流处理的特例(有界流)
- 简化架构:去掉批处理层,降低维护成本
- Kafka 长期存储:利用 Kafka 的持久化能力存储历史数据
架构流程图:
日志/埋点"] end subgraph "消息队列(长期存储)" B["Kafka
保留 7-30 天"] end subgraph "流处理层" C1["Flink Job 1
实时计算"] C2["Flink Job 2
历史回溯"] end subgraph "存储层" D["ClickHouse/Hudi
统一存储"] end subgraph "应用层" E["统一查询接口"] end A -->|"CDC 实时采集"| B B -->|"实时流"| C1 B -->|"历史流
(重放)"| C2 C1 --> D C2 --> D D --> E style B fill:#ffd43b style C1 fill:#4dabf7 style C2 fill:#74c0fc style D fill:#51cf66
Kappa 架构特点:
1. 统一流处理
所有数据处理都通过流处理引擎完成:
- 实时数据:从 Kafka 实时消费,持续处理
- 历史数据:从 Kafka 重放历史消息,批量处理
- 代码复用:同一套代码处理实时和历史数据
2. Kafka 作为数据中心
Kafka 不仅是消息队列,还是数据存储:
- 长期保留:配置较长的保留时间(7-30 天)
- 数据回溯:支持从任意 offset 开始消费
- 数据重放:支持多次消费同一份数据
3. 灵活的数据重算
当需要重新计算数据时:
- 停止当前的流处理任务
- 启动新的流处理任务,从历史 offset 开始消费
- 重新计算并写入存储
- 完成后切换到实时处理
Kappa 架构优缺点:
| 优点 | 缺点 |
|---|---|
| ✅ 架构简单:只需维护一套流处理代码 | ❌ 依赖 Kafka:需要 Kafka 长期存储,成本较高 |
| ✅ 开发效率高:统一流处理,代码复用 | ❌ 回溯速度慢:历史数据重算速度不如批处理 |
| ✅ 数据一致性好:同一套逻辑,结果一致 | ❌ 不适合超大数据:PB 级数据回溯困难 |
| ✅ 运维成本低:组件少,维护简单 | ❌ 对流引擎要求高:需要成熟的流处理引擎 |
| ✅ 实时性好:专注流处理,延迟更低 | ❌ 存储成本高:Kafka 长期存储成本高 |
Kappa 架构适用场景:
- 数据量适中(TB 级)
- 以实时处理为主
- 历史数据重算频率低
- 追求架构简洁性
- 团队熟悉流处理技术
Kappa 架构实战示例:
// 统一的流处理代码
public class OrderStatsJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka 配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "order-stats");
// 根据参数决定是实时处理还是历史回溯
FlinkKafkaConsumer<Order> consumer = new FlinkKafkaConsumer<>(
"orders",
new OrderDeserializationSchema(),
properties
);
if (args.length > 0 && "reprocess".equals(args[0])) {
// 历史回溯:从指定 offset 开始
Map<TopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new TopicPartition("orders", 0), 0L);
specificStartOffsets.put(new TopicPartition("orders", 1), 0L);
consumer.setStartFromSpecificOffsets(specificStartOffsets);
} else {
// 实时处理:从最新 offset 开始
consumer.setStartFromLatest();
}
// 统一的处理逻辑
DataStream<Order> orders = env.addSource(consumer);
DataStream<OrderStats> stats = orders
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, ts) -> order.getCreateTime())
)
.keyBy(order -> Tuple2.of(order.getProvince(), order.getCategory()))
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new OrderAggregateFunction());
// 写入存储
stats.addSink(new ClickHouseSink<>(...));
env.execute("Order Stats Job");
}
}
// Kafka 配置:长期保留
// server.properties
log.retention.hours=720 # 保留 30 天
log.segment.bytes=1073741824 # 1GB
log.retention.check.interval.ms=300000
数据重算流程:
需要重新计算 Dev->>Flink: 停止实时任务 Dev->>Flink: 启动回溯任务
(从历史 offset) Flink->>Kafka: 从 offset=0 开始消费 Kafka->>Flink: 返回历史数据 loop 处理历史数据 Flink->>Flink: 流式处理 Flink->>Storage: 写入结果 end Note over Flink: 历史数据处理完成 Dev->>Flink: 停止回溯任务 Dev->>Flink: 启动实时任务
(从最新 offset) Flink->>Kafka: 从最新 offset 消费 Kafka->>Flink: 返回实时数据
现代实时数仓架构
现代实时数仓架构融合了 Lambda 和 Kappa 的优点,结合数据湖技术(Hudi/Iceberg),形成了更加灵活和高效的流批一体、湖仓一体架构。
典型架构:
MySQL/PostgreSQL"] A2["日志数据
应用日志/Nginx"] A3["埋点数据
用户行为"] A4["第三方数据
API/文件"] end subgraph "数据采集层" B1["CDC
Canal/Debezium/Flink CDC"] B2["日志采集
Flume/Filebeat"] B3["SDK/API"] B4["批量导入
DataX/Sqoop"] end subgraph "消息队列层" C["Kafka
统一消息总线"] end subgraph "实时计算层" D1["Flink
实时 ETL"] D2["Flink
实时聚合"] D3["Flink
实时关联"] end subgraph "存储层" E1["ClickHouse
OLAP 查询"] E2["Hudi/Iceberg
数据湖"] E3["Redis/HBase
KV 存储"] E4["Doris
MPP 数据库"] end subgraph "批处理层(可选)" F["Spark Batch
离线计算"] end subgraph "应用层" G1["实时大屏"] G2["实时报表"] G3["实时推荐"] G4["实时风控"] end A1 --> B1 A2 --> B2 A3 --> B3 A4 --> B4 B1 --> C B2 --> C B3 --> C B4 --> C C --> D1 C --> D2 C --> D3 D1 --> E1 D1 --> E2 D2 --> E3 D3 --> E4 E2 --> F F --> E2 E1 --> G1 E2 --> G2 E3 --> G3 E4 --> G4 style C fill:#ffd43b style D1 fill:#4dabf7 style D2 fill:#4dabf7 style D3 fill:#4dabf7 style E2 fill:#51cf66
现代架构核心特点:
1. 流批一体
使用 Flink 统一流批处理:
- 流处理:实时数据流式处理
- 批处理:历史数据批量处理
- 统一 API:DataStream API 和 Table API
- 统一引擎:同一个 Flink 集群
// Flink 流批一体示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 流处理:无界流
DataStream<Order> streamOrders = env
.addSource(new FlinkKafkaConsumer<>("orders", ...));
// 批处理:有界流
DataStream<Order> batchOrders = env
.readTextFile("hdfs://path/to/orders")
.map(new OrderMapper());
// 统一的处理逻辑
DataStream<OrderStats> processOrders(DataStream<Order> orders) {
return orders
.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new OrderAggregateFunction());
}
DataStream<OrderStats> streamStats = processOrders(streamOrders);
DataStream<OrderStats> batchStats = processOrders(batchOrders);
2. 湖仓一体
结合数据湖和数据仓库的优势:
- 数据湖:灵活的存储,支持多种格式
- 数据仓库:高性能的查询,支持 SQL
- ACID 事务:保证数据一致性
- Schema Evolution:支持 Schema 变更
Hudi/Iceberg 特性对比:
| 特性 | Hudi | Iceberg | Delta Lake |
|---|---|---|---|
| 提出公司 | Uber | Netflix | Databricks |
| 开源时间 | 2019 | 2018 | 2019 |
| ACID 支持 | ✅ | ✅ | ✅ |
| 更新删除 | ✅ | ✅ | ✅ |
| 时间旅行 | ✅ | ✅ | ✅ |
| Schema Evolution | ✅ | ✅ | ✅ |
| 流式写入 | ✅ 强 | ⚠️ 一般 | ⚠️ 一般 |
| 查询性能 | ⚠️ 一般 | ✅ 好 | ✅ 好 |
| Flink 集成 | ✅ 原生支持 | ✅ 支持 | ❌ 不支持 |
| Spark 集成 | ✅ | ✅ | ✅ |
| Presto 集成 | ✅ | ✅ | ⚠️ 一般 |
3. 多引擎融合
根据不同场景选择合适的存储引擎:
| 存储引擎 | 适用场景 | 查询延迟 | 写入吞吐 | 更新支持 |
|---|---|---|---|---|
| ClickHouse | OLAP 分析、实时大屏 | 秒级 | 极高 | ❌ |
| Hudi/Iceberg | 数据湖、流批一体 | 秒级 | 高 | ✅ |
| Doris | 实时报表、多维分析 | 秒级 | 高 | ✅ |
| Redis | 实时推荐、风控 | 毫秒级 | 极高 | ✅ |
| HBase | 用户画像、明细查询 | 毫秒级 | 高 | ✅ |
4. 分层解耦
数据分层设计,各层独立:
- ODS 层:原始数据,保持原貌
- DWD 层:明细数据,清洗转换
- DWS 层:汇总数据,轻度聚合
- ADS 层:应用数据,面向业务
5. 元数据管理
统一的元数据管理:
- 数据血缘:追踪数据来源和去向
- 数据质量:监控数据质量指标
- 数据目录:统一的数据发现和查询
- 权限管理:细粒度的权限控制
架构选型指南
架构演进对比:
| 架构 | 提出时间 | 核心思想 | 复杂度 | 成本 | 适用场景 | 代表公司 |
|---|---|---|---|---|---|---|
| Lambda | 2011 | 批流分离 | 高 | 高 | 超大规模、高准确性 | Twitter, Netflix |
| Kappa | 2014 | 一切皆流 | 中 | 中 | 实时为主、简化架构 | LinkedIn, Uber |
| 现代架构 | 2018+ | 流批一体、湖仓一体 | 中 | 中 | 大规模实时数仓 | 字节跳动, 阿里巴巴 |
架构选型决策树:
频繁重算?"} B -->|"TB 级"| D{"是否需要
数据湖能力?"} B -->|"GB 级"| E["Kappa 架构
Flink + ClickHouse"] C -->|"是"| F["Lambda 架构
批流分离"] C -->|"否"| G["现代架构
Hudi + Flink"] D -->|"是"| H["现代架构
Hudi/Iceberg + Flink"] D -->|"否"| I["Kappa 架构
Flink + ClickHouse"] style F fill:#ffd43b style E fill:#51cf66 style H fill:#4dabf7 style I fill:#51cf66
详细选型建议:
1. 选择 Lambda 架构的场景:
✅ 适合:
- 数据量超大(PB 级以上)
- 需要频繁重算历史数据
- 对数据准确性要求极高(金融、医疗)
- 业务逻辑复杂,需要批处理支持
- 有充足的开发和运维资源
- 批处理和实时处理的业务逻辑差异大
❌ 不适合:
- 团队规模小,资源有限
- 追求快速迭代和敏捷开发
- 数据量不大(TB 级以下)
- 实时性要求极高(毫秒级)
2. 选择 Kappa 架构的场景:
✅ 适合:
- 数据量适中(TB 级)
- 以实时处理为主,历史数据重算频率低
- 追求架构简洁性和开发效率
- 团队熟悉流处理技术
- 业务逻辑相对简单
- 可以接受 Kafka 长期存储的成本
❌ 不适合:
- 数据量超大(PB 级以上)
- 需要频繁重算历史数据
- 对批处理性能要求高
- Kafka 存储成本不可接受
3. 选择现代架构的场景:
✅ 适合:
- 数据量大(TB-PB 级)
- 需要流批一体能力
- 需要数据湖能力(ACID、更新删除)
- 追求架构灵活性
- 团队技术能力强
- 需要支持多种查询引擎
❌ 不适合:
- 数据量很小(GB 级)
- 只需要简单的实时处理
- 团队对新技术不熟悉
- 预算有限
架构选型评分表:
| 评估维度 | Lambda | Kappa | 现代架构 | 权重 |
|---|---|---|---|---|
| 开发复杂度 | 2 | 5 | 4 | 20% |
| 运维复杂度 | 2 | 5 | 4 | 20% |
| 数据准确性 | 5 | 4 | 5 | 15% |
| 实时性 | 3 | 5 | 5 | 15% |
| 可扩展性 | 5 | 4 | 5 | 10% |
| 成本 | 2 | 4 | 3 | 10% |
| 灵活性 | 3 | 3 | 5 | 10% |
| 总分 | 3.05 | 4.35 | 4.50 | 100% |
架构演进路径:
单体架构"] -->|"数据量增长"| B["Kappa 架构
流式处理"] B -->|"业务复杂化"| C["Lambda 架构
批流分离"] C -->|"技术成熟"| D["现代架构
流批一体"] style A fill:#a5d8ff style B fill:#74c0fc style C fill:#ffd43b style D fill:#51cf66
典型演进案例:
阶段 1:初期(数据量 < 100GB/天)
- 架构:单体应用 + MySQL
- 特点:简单直接,开发快速
- 问题:数据量增长后性能下降
阶段 2:成长期(数据量 100GB-1TB/天)
- 架构:Kappa 架构(Flink + Kafka + ClickHouse)
- 特点:实时处理,架构简洁
- 问题:历史数据重算困难
阶段 3:成熟期(数据量 1TB-10TB/天)
- 架构:现代架构(Flink + Hudi + ClickHouse)
- 特点:流批一体,灵活性高
- 问题:技术复杂度增加
阶段 4:大规模(数据量 > 10TB/天)
- 架构:Lambda 架构或现代架构
- 特点:高可靠、高性能
- 问题:成本高,运维复杂
不同行业的架构选择:
| 行业 | 数据量 | 实时性要求 | 推荐架构 | 理由 |
|---|---|---|---|---|
| 电商 | 大 | 高 | 现代架构 | 需要流批一体,支持多种场景 |
| 金融 | 超大 | 极高 | Lambda 架构 | 准确性要求高,需要批处理保证 |
| 社交 | 超大 | 高 | 现代架构 | 数据量大,实时性要求高 |
| 物流 | 中 | 中 | Kappa 架构 | 实时为主,架构简洁 |
| 游戏 | 中 | 高 | Kappa 架构 | 实时性要求高,数据量适中 |
| 广告 | 大 | 高 | 现代架构 | 需要实时竞价,数据量大 |
架构选型检查清单:
业务需求:
- 数据量级(GB/TB/PB)
- 延迟要求(毫秒/秒/分钟)
- 准确性要求(最终一致/强一致)
- 是否需要历史数据重算
- 业务逻辑复杂度
技术能力:
- 团队规模和技术水平
- 是否熟悉流处理技术
- 是否有大数据平台经验
- 运维能力和资源
资源约束:
- 预算限制
- 时间限制
- 人力资源
- 基础设施
未来规划:
- 数据量增长预期
- 业务扩展计划
- 技术演进方向
- 团队建设计划
实时数仓技术栈
数据采集层技术
数据采集层是实时数仓的第一道关口,负责从各种数据源实时采集数据并发送到消息队列。
主流采集技术对比:
| 技术 | 类型 | 数据源 | 延迟 | 吞吐量 | 可靠性 | 学习成本 | 适用场景 |
|---|---|---|---|---|---|---|---|
| Canal | MySQL CDC | MySQL | 秒级 | 高 | 高 | 低 | MySQL 数据同步 |
| Debezium | 通用 CDC | 多种数据库 | 秒级 | 高 | 高 | 中 | 多数据库同步 |
| Flink CDC | CDC 框架 | 多种数据库 | 秒级 | 极高 | 极高 | 中 | 大规模 CDC |
| Maxwell | MySQL CDC | MySQL | 秒级 | 中 | 中 | 低 | 轻量级 CDC |
| Flume | 日志采集 | 日志文件 | 秒级 | 高 | 高 | 中 | 日志采集 |
| Filebeat | 日志采集 | 日志文件 | 秒级 | 中 | 高 | 低 | 轻量级日志采集 |
| Logstash | 日志采集 | 多种数据源 | 秒级 | 中 | 高 | 中 | ELK 栈 |
| DataX | 批量同步 | 多种数据源 | 分钟级 | 极高 | 高 | 低 | 离线数据同步 |
CDC 技术详解:
CDC(Change Data Capture) 是实时数仓的核心技术,通过捕获数据库的变更日志实现数据的实时同步。
CDC 工作原理:
Binlog 事件类型:
| 事件类型 | 说明 | 包含信息 | 处理方式 |
|---|---|---|---|
| INSERT | 插入操作 | 新数据 | 直接插入 |
| UPDATE | 更新操作 | 更新前后的数据 | 更新或 Upsert |
| DELETE | 删除操作 | 删除的数据 | 删除或软删除 |
| DDL | 表结构变更 | DDL 语句 | Schema 变更 |
Canal 详解:
Canal 是阿里巴巴开源的 MySQL Binlog 增量订阅和消费组件。
Canal 架构:
解析器"] D["Sink
输出"] end subgraph "下游" E["Kafka"] F["RocketMQ"] G["Canal Client"] end A -->|"伪装成 Slave"| B B --> C C --> D D --> E D --> F D --> G style B fill:#4dabf7 style D fill:#51cf66
Canal 配置示例:
# canal.properties
canal.id = 1
canal.ip = 192.168.1.100
canal.port = 11111
canal.zkServers = localhost:2181
# instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# 订阅的数据库和表
canal.instance.filter.regex=test_db\\..*
canal.instance.filter.black.regex=test_db\\.tmp_.*
# Kafka 配置
canal.mq.servers=localhost:9092
canal.mq.topic=canal_topic
canal.mq.partition=0
Debezium 详解:
Debezium 是 RedHat 开源的分布式 CDC 平台,支持多种数据库。
Debezium 支持的数据库:
- MySQL
- PostgreSQL
- MongoDB
- SQL Server
- Oracle
- Db2
- Cassandra
Debezium 架构:
Debezium 配置示例:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"include.schema.changes": "true",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
Flink CDC 详解:
Flink CDC 是基于 Flink 的 CDC 框架,提供了更好的性能和易用性。
Flink CDC 优势:
- 无需部署额外组件:直接在 Flink 中使用
- 性能更好:原生 Flink 集成,减少序列化开销
- 支持全量 + 增量:自动处理全量和增量数据
- Exactly-Once 保证:利用 Flink 的 Checkpoint 机制
- 支持多种数据库:MySQL、PostgreSQL、MongoDB、Oracle 等
Flink CDC 使用示例:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
public class FlinkCDCExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置 MySQL CDC Source
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("inventory") // 监控的数据库
.tableList("inventory.orders", "inventory.products") // 监控的表
.username("root")
.password("password")
.startupOptions(StartupOptions.initial()) // 全量 + 增量
.deserializer(new JsonDebeziumDeserializationSchema()) // JSON 格式
.build();
// 创建数据流
DataStreamSource<String> cdcStream = env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC");
// 处理 CDC 数据
cdcStream
.map(new ParseCDCFunction())
.keyBy(Order::getOrderId)
.process(new OrderProcessFunction())
.addSink(new KafkaSink<>(...));
env.execute("Flink CDC Job");
}
}
// 解析 CDC 事件
public class ParseCDCFunction implements MapFunction<String, Order> {
@Override
public Order map(String value) throws Exception {
JSONObject json = JSON.parseObject(value);
String op = json.getString("op"); // c/u/d/r
switch (op) {
case "c": // CREATE (INSERT)
case "r": // READ (全量同步)
return parseOrder(json.getJSONObject("after"));
case "u": // UPDATE
return parseOrder(json.getJSONObject("after"));
case "d": // DELETE
Order order = parseOrder(json.getJSONObject("before"));
order.setDeleted(true);
return order;
default:
return null;
}
}
}
CDC 技术选型建议:
| 场景 | 推荐技术 | 理由 |
|---|---|---|
| 只有 MySQL | Canal | 轻量级,易部署,性能好 |
| 多种数据库 | Debezium | 支持多种数据库,生态完善 |
| 大规模 CDC | Flink CDC | 性能最好,与 Flink 集成好 |
| 轻量级场景 | Maxwell | 最轻量,配置简单 |
消息队列层技术
消息队列是实时数仓的数据总线,负责数据的缓冲、解耦和分发。
主流消息队列对比:
| 消息队列 | 吞吐量 | 延迟 | 持久化 | 顺序性 | 可靠性 | 生态 | 适用场景 |
|---|---|---|---|---|---|---|---|
| Kafka | 极高 | 毫秒级 | ✅ | ✅ | 高 | 极好 | 大数据、日志 |
| Pulsar | 极高 | 毫秒级 | ✅ | ✅ | 极高 | 好 | 云原生、多租户 |
| RocketMQ | 高 | 毫秒级 | ✅ | ✅ | 高 | 中 | 电商、金融 |
| RabbitMQ | 中 | 毫秒级 | ✅ | ⚠️ | 高 | 好 | 微服务 |
| Redis Streams | 高 | 微秒级 | ⚠️ | ✅ | 中 | 好 | 轻量级场景 |
Kafka 核心特性:
1. 高吞吐量
- 单机百万级 TPS
- 顺序写磁盘,性能接近内存
- 零拷贝技术(Zero-Copy)
- 批量发送和压缩
2. 低延迟
- 端到端延迟 < 10ms
- 页缓存(Page Cache)加速读取
- 异步发送
3. 持久化
- 消息持久化到磁盘
- 支持数据备份(副本机制)
- 可配置保留时间
4. 可扩展
- 水平扩展,支持海量数据
- 分区机制,并行处理
- 消费者组,负载均衡
5. 容错性
- 副本机制(Replication)
- ISR(In-Sync Replicas)
- 自动故障转移
Kafka 架构:
Leader"] B2["Broker 2
Follower"] B3["Broker 3
Follower"] C1["Topic: orders
Partition 0"] C2["Topic: orders
Partition 1"] C3["Topic: orders
Partition 2"] end subgraph "Consumer Group" D1["Consumer 1"] D2["Consumer 2"] D3["Consumer 3"] end subgraph "ZooKeeper" E["元数据管理"] end A1 --> B1 A2 --> B1 B1 --> C1 B1 --> C2 B1 --> C3 B2 -.->|"副本"| C1 B3 -.->|"副本"| C2 C1 --> D1 C2 --> D2 C3 --> D3 B1 -.-> E B2 -.-> E B3 -.-> E style B1 fill:#4dabf7 style E fill:#ffd43b
Kafka 核心概念:
1. Topic(主题)
- 消息的逻辑分类
- 类似数据库的表
- 可以有多个分区
2. Partition(分区)
- Topic 的物理分片
- 保证分区内消息有序
- 并行处理的基本单位
3. Replica(副本)
- 数据备份,保证可靠性
- Leader 副本:处理读写请求
- Follower 副本:同步数据,备份
4. Producer(生产者)
- 发送消息到 Topic
- 可以指定分区策略
- 支持批量发送和压缩
5. Consumer(消费者)
- 从 Topic 消费消息
- 消费者组实现负载均衡
- 支持手动和自动提交 Offset
6. Consumer Group(消费者组)
- 多个消费者组成一个组
- 同一个组内的消费者不会重复消费
- 不同组之间独立消费
Kafka 在实时数仓中的作用:
3 分区"] B2["Topic: ods_user
3 分区"] B3["Topic: ods_log
6 分区"] end subgraph "消费者" C1["Flink Job 1
实时清洗"] C2["Flink Job 2
实时聚合"] C3["Flink Job 3
实时关联"] C4["离线同步"] end A1 --> B1 A2 --> B2 A3 --> B3 B1 --> C1 B1 --> C2 B2 --> C3 B3 --> C1 B1 --> C4 style B1 fill:#ffd43b style B2 fill:#ffd43b style B3 fill:#ffd43b style C1 fill:#4dabf7
Kafka 关键配置:
# Broker 配置
# 网络线程数
num.network.threads=8
# IO 线程数
num.io.threads=16
# Socket 缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 日志配置
# 保留时间(7 天)
log.retention.hours=168
# 日志段大小(1GB)
log.segment.bytes=1073741824
# 检查间隔
log.retention.check.interval.ms=300000
# 副本配置
# 默认副本数
default.replication.factor=3
# 最小同步副本数
min.insync.replicas=2
# 副本延迟阈值
replica.lag.time.max.ms=10000
# 性能优化
# 压缩类型
compression.type=lz4
# 批量大小
batch.size=16384
# 等待时间
linger.ms=10
# 缓冲区大小
buffer.memory=33554432
Producer 配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 可靠性配置
props.put("acks", "all"); // 所有副本确认
props.put("retries", 3); // 重试次数
props.put("max.in.flight.requests.per.connection", 1); // 保证顺序
// 性能配置
props.put("compression.type", "lz4"); // 压缩
props.put("batch.size", 16384); // 批量大小
props.put("linger.ms", 10); // 等待时间
props.put("buffer.memory", 33554432); // 缓冲区
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 异步发送
producer.send(new ProducerRecord<>("topic", key, value), (metadata, exception) -> {
if (exception != null) {
log.error("Send failed", exception);
} else {
log.info("Sent to partition {} offset {}",
metadata.partition(), metadata.offset());
}
});
Consumer 配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 消费配置
props.put("enable.auto.commit", false); // 手动提交
props.put("auto.offset.reset", "earliest"); // 从最早开始
props.put("max.poll.records", 500); // 单次拉取记录数
props.put("fetch.min.bytes", 1024); // 最小拉取字节数
props.put("fetch.max.wait.ms", 500); // 最大等待时间
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
processRecord(record);
}
// 手动提交
consumer.commitSync();
}
Kafka 分区策略:
| 策略 | 说明 | 适用场景 |
|---|---|---|
| 轮询(Round-Robin) | 依次分配到各分区 | 负载均衡 |
| 哈希(Hash) | 根据 Key 的哈希值分配 | 保证相同 Key 到同一分区 |
| 自定义 | 自定义分区逻辑 | 特殊业务需求 |
// 自定义分区器
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 根据业务逻辑自定义分区
if (key == null) {
return ThreadLocalRandom.current().nextInt(numPartitions);
}
// 例如:VIP 用户到特定分区
if (isVIP(key.toString())) {
return 0; // VIP 专属分区
}
// 普通用户哈希分区
return Math.abs(key.hashCode()) % (numPartitions - 1) + 1;
}
}
实时计算层技术
实时计算层是实时数仓的核心,负责数据的清洗、转换、聚合和关联。
主流实时计算引擎对比:
| 引擎 | 延迟 | 吞吐 | 状态管理 | 容错机制 | 窗口支持 | 生态 | 学习成本 | 适用场景 |
|---|---|---|---|---|---|---|---|---|
| Flink | 毫秒级 | 极高 | ✅ 强大 | Checkpoint | ✅ 丰富 | 极好 | 中 | 复杂实时计算 |
| Spark Streaming | 秒级 | 高 | ⚠️ 一般 | WAL | ⚠️ 一般 | 极好 | 低 | 准实时计算 |
| Storm | 毫秒级 | 中 | ❌ 弱 | ACK 机制 | ❌ 弱 | 中 | 中 | 简单实时计算 |
| Kafka Streams | 毫秒级 | 高 | ✅ 强 | Changelog | ✅ 好 | 好 | 低 | 轻量级流处理 |
| Samza | 毫秒级 | 高 | ✅ 强 | Checkpoint | ✅ 好 | 中 | 中 | LinkedIn 内部 |
Flink 核心优势:
1. 真正的流处理
Flink 是真正的流处理引擎,而非微批处理:
- 事件驱动:逐条处理数据,而非批量处理
- 低延迟:毫秒级延迟
- 事件时间:支持事件时间和处理时间
- 乱序处理:Watermark 机制处理乱序数据
2. 强大的状态管理
Flink 提供了完善的状态管理能力:
- 多种状态类型:ValueState、ListState、MapState、ReducingState、AggregatingState
- 状态后端:Memory、FsStateBackend、RocksDBStateBackend
- 状态 TTL:自动清理过期状态
- 状态可扩展:支持状态的重新分布
3. Exactly-Once 语义
Flink 通过 Checkpoint 机制保证端到端 Exactly-Once:
- 两阶段提交:保证 Sink 的 Exactly-Once
- Checkpoint:定期保存状态快照
- 故障恢复:从最近的 Checkpoint 恢复
4. 丰富的窗口操作
Flink 支持多种窗口类型:
- 滚动窗口(Tumbling Window):固定大小,不重叠
- 滑动窗口(Sliding Window):固定大小,可重叠
- 会话窗口(Session Window):基于活动间隔
- 全局窗口(Global Window):自定义触发器
5. 灵活的时间语义
Flink 支持三种时间语义:
- 事件时间(Event Time):数据产生的时间
- 处理时间(Processing Time):数据被处理的时间
- 摄入时间(Ingestion Time):数据进入 Flink 的时间
Flink 架构:
Flink 核心概念:
1. DataStream API
DataStream API 是 Flink 的核心 API,用于流式数据处理:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Source
DataStream<String> text = env.readTextFile("input.txt");
// Transformation
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum(1);
// Sink
counts.print();
env.execute("WordCount");
2. 算子(Operator)
Flink 提供了丰富的算子:
| 算子类型 | 算子 | 说明 | 示例 |
|---|---|---|---|
| Source | addSource | 数据源 | Kafka、Socket、File |
| Transformation | map | 一对一转换 | 字段映射 |
| flatMap | 一对多转换 | 分词 | |
| filter | 过滤 | 条件过滤 | |
| keyBy | 分组 | 按 Key 分组 | |
| reduce | 聚合 | 累加、求和 | |
| aggregate | 聚合 | 自定义聚合 | |
| window | 窗口 | 时间窗口、计数窗口 | |
| join | 关联 | 流与流关联 | |
| union | 合并 | 多流合并 | |
| Sink | addSink | 数据输出 | Kafka、JDBC、File |
3. 窗口(Window)
窗口是流处理中的核心概念,用于将无界流切分成有界流:
窗口示例:
// 滚动窗口:每 5 分钟统计一次
DataStream<OrderStats> tumblingStats = orders
.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new OrderAggregateFunction());
// 滑动窗口:最近 10 分钟,每 5 分钟更新一次
DataStream<OrderStats> slidingStats = orders
.keyBy(Order::getUserId)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
.aggregate(new OrderAggregateFunction());
// 会话窗口:30 分钟无活动则结束会话
DataStream<SessionStats> sessionStats = behaviors
.keyBy(Behavior::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionAggregateFunction());
// 计数窗口:每 100 条数据触发一次
DataStream<OrderStats> countStats = orders
.keyBy(Order::getUserId)
.countWindow(100)
.aggregate(new OrderAggregateFunction());
4. Watermark
Watermark 用于处理乱序数据:
// 设置 Watermark 策略
WatermarkStrategy<Order> watermarkStrategy = WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10)) // 容忍 10 秒乱序
.withTimestampAssigner((order, timestamp) -> order.getCreateTime()) // 提取时间戳
.withIdleness(Duration.ofMinutes(1)); // 1 分钟无数据则推进 Watermark
DataStream<Order> watermarkedStream = stream
.assignTimestampsAndWatermarks(watermarkStrategy);
Watermark 工作原理:
但在容忍范围内 Watermark->>Window: 接受数据 Source->>Watermark: Event(t=5) Watermark->>Window: Watermark(t=4) Note over Window: 触发 [0-5s] 窗口 Source->>Watermark: Event(t=1, 迟到) Note over Watermark: t=1 < Watermark(t=4)
超出容忍范围 Watermark->>Window: 丢弃或侧输出
5. 状态(State)
Flink 支持多种状态类型:
// ValueState:单值状态
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("count", Long.class);
ValueState<Long> countState = getRuntimeContext().getState(descriptor);
Long count = countState.value();
countState.update(count == null ? 1 : count + 1);
// ListState:列表状态
ListStateDescriptor<String> listDescriptor =
new ListStateDescriptor<>("list", String.class);
ListState<String> listState = getRuntimeContext().getListState(listDescriptor);
listState.add("item");
for (String item : listState.get()) {
// 处理列表项
}
// MapState:映射状态
MapStateDescriptor<String, Long> mapDescriptor =
new MapStateDescriptor<>("map", String.class, Long.class);
MapState<String, Long> mapState = getRuntimeContext().getMapState(mapDescriptor);
mapState.put("key", 100L);
Long value = mapState.get("key");
// ReducingState:聚合状态
ReducingStateDescriptor<Long> reducingDescriptor =
new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class);
ReducingState<Long> reducingState = getRuntimeContext().getReducingState(reducingDescriptor);
reducingState.add(10L);
Long sum = reducingState.get();
6. Checkpoint
Checkpoint 是 Flink 的容错机制:
// Checkpoint 配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint
env.enableCheckpointing(60000); // 60 秒
// Checkpoint 模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Checkpoint 超时
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 分钟
// 最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30 秒
// 最大并发 Checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 容忍失败次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// 外部化 Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// Checkpoint 存储
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");
Checkpoint 流程:
存储层技术
存储层负责数据的持久化存储和快速查询,是实时数仓的数据服务层。
主流存储引擎对比:
| 存储引擎 | 类型 | 查询延迟 | 写入吞吐 | 更新支持 | 并发查询 | 压缩比 | 适用场景 |
|---|---|---|---|---|---|---|---|
| ClickHouse | 列式 OLAP | 秒级 | 极高 | ❌ | 高 | 10:1 | 实时分析、大屏 |
| Hudi | 数据湖 | 秒级 | 高 | ✅ | 中 | 5:1 | 流批一体、CDC |
| Iceberg | 数据湖 | 秒级 | 高 | ✅ | 高 | 5:1 | 大规模数据湖 |
| Doris | MPP OLAP | 秒级 | 高 | ✅ | 极高 | 8:1 | 实时报表 |
| Redis | KV 内存 | 毫秒级 | 极高 | ✅ | 极高 | 1:1 | 实时推荐、风控 |
| HBase | KV 列式 | 毫秒级 | 高 | ✅ | 高 | 3:1 | 用户画像、明细 |
| Elasticsearch | 搜索引擎 | 秒级 | 高 | ✅ | 高 | 3:1 | 日志分析、搜索 |
ClickHouse 详解:
ClickHouse 是俄罗斯 Yandex 开源的列式 OLAP 数据库,以极致的查询性能著称。
ClickHouse 核心特性:
1. 列式存储
数据按列存储,而非按行:
- 压缩比高:同一列数据类型相同,压缩效果好
- 查询快:只读取需要的列,减少 IO
- 适合分析:OLAP 查询通常只涉及部分列
2. 向量化执行
使用 SIMD 指令批量处理数据:
- CPU 利用率高:充分利用 CPU 缓存
- 性能提升:相比逐行处理提升 10 倍以上
3. 分布式架构
支持水平扩展:
- 分片(Shard):数据分布到多个节点
- 副本(Replica):数据备份,保证可靠性
- 并行查询:多节点并行处理
4. 实时写入
支持高并发实时写入:
- MergeTree 引擎:后台异步合并数据
- 批量写入:建议 10000+ 行批量写入
- 分区管理:按时间分区,便于管理
ClickHouse 表引擎:
| 引擎 | 特点 | 适用场景 | 示例 |
|---|---|---|---|
| MergeTree | 最常用,支持主键、分区、采样 | 通用场景 | 订单明细表 |
| ReplacingMergeTree | 去重,保留最新数据 | CDC 同步 | 用户信息表 |
| SummingMergeTree | 自动聚合 | 预聚合 | 订单统计表 |
| AggregatingMergeTree | 复杂聚合 | 复杂指标 | 用户行为统计 |
| CollapsingMergeTree | 支持更新删除 | 状态变更 | 订单状态表 |
| VersionedCollapsingMergeTree | 带版本的更新删除 | 复杂状态 | 库存变更表 |
ClickHouse 表设计示例:
-- MergeTree:订单明细表
CREATE TABLE orders (
order_id String,
user_id String,
product_id String,
amount Decimal(18, 2),
status String,
province String,
city String,
create_time DateTime,
update_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time) -- 按月分区
ORDER BY (user_id, order_id, create_time) -- 排序键
SETTINGS index_granularity = 8192; -- 索引粒度
-- ReplacingMergeTree:用户信息表(去重)
CREATE TABLE user_profile (
user_id String,
name String,
age UInt8,
gender String,
province String,
city String,
update_time DateTime
) ENGINE = ReplacingMergeTree(update_time) -- 按 update_time 去重
ORDER BY user_id;
-- SummingMergeTree:订单统计表(自动聚合)
CREATE TABLE order_stats (
date Date,
province String,
category String,
order_count UInt64,
total_amount Decimal(18, 2)
) ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, province, category);
-- AggregatingMergeTree:用户行为统计(复杂聚合)
CREATE TABLE user_behavior_stats (
date Date,
user_id String,
pv SimpleAggregateFunction(sum, UInt64), -- 简单聚合
uv AggregateFunction(uniq, String) -- 复杂聚合(去重计数)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, user_id);
ClickHouse 分区设计:
-- 按月分区(推荐)
PARTITION BY toYYYYMM(create_time)
-- 按天分区(数据量大时)
PARTITION BY toYYYYMMDD(create_time)
-- 按小时分区(实时场景)
PARTITION BY toYYYYMMDDhh(create_time)
-- 多级分区
PARTITION BY (toYYYYMM(create_time), province)
-- 查看分区
SELECT
partition,
name,
rows,
bytes_on_disk,
formatReadableSize(bytes_on_disk) AS size
FROM system.parts
WHERE table = 'orders' AND active
ORDER BY partition DESC;
-- 删除旧分区
ALTER TABLE orders DROP PARTITION '202401';
-- 优化分区(合并小文件)
OPTIMIZE TABLE orders PARTITION '202402';
ClickHouse 索引优化:
-- 主键索引(ORDER BY)
ORDER BY (user_id, create_time)
-- 跳数索引(Skip Index)
-- minmax:数值范围查询
ALTER TABLE orders ADD INDEX idx_amount amount TYPE minmax GRANULARITY 4;
-- set:低基数字段
ALTER TABLE orders ADD INDEX idx_province province TYPE set(100) GRANULARITY 4;
-- bloom_filter:等值查询
ALTER TABLE orders ADD INDEX idx_status status TYPE bloom_filter GRANULARITY 4;
-- ngrambf_v1:模糊查询
ALTER TABLE orders ADD INDEX idx_user_name user_name TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4;
-- 物化列(Materialized Column)
ALTER TABLE orders ADD COLUMN order_date Date MATERIALIZED toDate(create_time);
-- 物化视图(Materialized View)
CREATE MATERIALIZED VIEW order_stats_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, province)
AS SELECT
toDate(create_time) AS date,
province,
count() AS order_count,
sum(amount) AS total_amount
FROM orders
GROUP BY date, province;
ClickHouse 查询优化:
-- 使用 PREWHERE 过滤(比 WHERE 快)
SELECT *
FROM orders
PREWHERE create_time >= '2024-01-01' -- 先过滤
WHERE amount > 100; -- 再过滤
-- 避免 SELECT *
SELECT order_id, amount, create_time
FROM orders;
-- 使用 FINAL 去重(ReplacingMergeTree)
SELECT *
FROM user_profile
FINAL
WHERE user_id = 'user123';
-- 分布式查询优化
SELECT province, sum(amount)
FROM orders
WHERE create_time >= today()
GROUP BY province
SETTINGS distributed_aggregation_memory_efficient = 1;
-- 使用物化视图
SELECT date, province, order_count, total_amount
FROM order_stats_mv
WHERE date >= '2024-01-01';
-- 并行查询
SELECT *
FROM orders
SETTINGS max_threads = 8;
ClickHouse 写入优化:
-- 批量写入(推荐 10000-100000 行)
INSERT INTO orders VALUES
('order1', 'user1', 'product1', 100.00, 'PAID', '北京', '北京', '2024-01-01 10:00:00', '2024-01-01 10:00:00'),
('order2', 'user2', 'product2', 200.00, 'PAID', '上海', '上海', '2024-01-01 10:01:00', '2024-01-01 10:01:00'),
-- ... 更多数据
;
-- 异步写入
INSERT INTO orders SETTINGS async_insert = 1, wait_for_async_insert = 0
VALUES (...);
-- 并行写入(多个客户端同时写入不同分区)
-- 写入性能监控
SELECT
table,
sum(rows) AS total_rows,
sum(bytes) AS total_bytes,
formatReadableSize(total_bytes) AS size,
count() AS parts_count
FROM system.parts
WHERE table = 'orders' AND active
GROUP BY table;
数据湖技术
数据湖技术是现代实时数仓的重要组成部分,提供了流批一体、ACID 事务、Schema Evolution 等能力。
Hudi vs Iceberg vs Delta Lake:
| 特性 | Hudi | Iceberg | Delta Lake |
|---|---|---|---|
| 提出公司 | Uber | Netflix | Databricks |
| 开源时间 | 2019 | 2018 | 2019 |
| ACID 支持 | ✅ | ✅ | ✅ |
| 更新删除 | ✅ | ✅ | ✅ |
| 时间旅行 | ✅ | ✅ | ✅ |
| Schema Evolution | ✅ | ✅ | ✅ |
| 流式写入 | ✅ 强 | ⚠️ 一般 | ⚠️ 一般 |
| 查询性能 | ⚠️ 一般 | ✅ 好 | ✅ 好 |
| Flink 集成 | ✅ 原生支持 | ✅ 支持 | ❌ 不支持 |
| Spark 集成 | ✅ | ✅ | ✅ |
| Presto 集成 | ✅ | ✅ | ⚠️ 一般 |
| 小文件问题 | ✅ 自动合并 | ⚠️ 需手动 | ⚠️ 需手动 |
| 增量查询 | ✅ 强 | ⚠️ 一般 | ⚠️ 一般 |
Hudi 核心概念:
1. 表类型
Hudi 支持两种表类型:
| 表类型 | 写入方式 | 查询方式 | 适用场景 |
|---|---|---|---|
| COW | 写时合并,生成新的 Parquet 文件 | 直接查询 Parquet 文件 | 读多写少 |
| MOR | 写入 Delta Log,异步合并 | 合并 Base + Delta | 写多读少 |
2. 查询类型
Hudi 支持三种查询类型:
| 查询类型 | 说明 | 适用场景 |
|---|---|---|
| Snapshot Query | 快照查询,查询最新数据 | 实时查询 |
| Incremental Query | 增量查询,查询变更数据 | CDC、增量同步 |
| Read Optimized Query | 只查询 Base 文件,性能最好 | 批处理 |
Hudi 使用示例:
// Flink 写入 Hudi
import org.apache.hudi.configuration.FlinkOptions
val hoodieOptions = Map(
FlinkOptions.PATH.key() -> "hdfs://namenode:9000/hudi/orders",
FlinkOptions.TABLE_NAME.key() -> "orders",
FlinkOptions.TABLE_TYPE.key() -> "MERGE_ON_READ", // 表类型
FlinkOptions.PRECOMBINE_FIELD.key() -> "update_time", // 去重字段
FlinkOptions.RECORD_KEY_FIELD.key() -> "order_id", // 主键
FlinkOptions.PARTITION_PATH_FIELD.key() -> "dt", // 分区字段
FlinkOptions.OPERATION.key() -> "upsert", // 操作类型
FlinkOptions.WRITE_TASKS.key() -> "4", // 写入并行度
FlinkOptions.COMPACTION_TASKS.key() -> "2" // 压缩并行度
)
dataStream
.addSink(
HoodieFlinkStreamer.builder()
.withOptions(hoodieOptions)
.build()
)
// Spark 读取 Hudi
val df = spark.read
.format("hudi")
.load("hdfs://namenode:9000/hudi/orders")
// 增量查询
val incrementalDF = spark.read
.format("hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", "20240101000000")
.load("hdfs://namenode:9000/hudi/orders")
// 时间旅行
val historicalDF = spark.read
.format("hudi")
.option("as.of.instant", "20240101000000")
.load("hdfs://namenode:9000/hudi/orders")
Iceberg 核心特性:
1. 隐藏分区
Iceberg 支持隐藏分区,用户无需关心分区字段:
-- 创建表时定义分区
CREATE TABLE orders (
order_id STRING,
user_id STRING,
amount DECIMAL(18, 2),
create_time TIMESTAMP
) USING iceberg
PARTITIONED BY (days(create_time)); -- 按天分区
-- 查询时无需指定分区
SELECT * FROM orders WHERE create_time >= '2024-01-01';
2. Schema Evolution
Iceberg 支持灵活的 Schema 变更:
-- 添加列
ALTER TABLE orders ADD COLUMN province STRING;
-- 删除列
ALTER TABLE orders DROP COLUMN province;
-- 重命名列
ALTER TABLE orders RENAME COLUMN amount TO order_amount;
-- 修改列类型
ALTER TABLE orders ALTER COLUMN age TYPE BIGINT;
3. 时间旅行
Iceberg 支持查询历史版本:
-- 查询指定时间点的数据
SELECT * FROM orders FOR SYSTEM_TIME AS OF '2024-01-01 00:00:00';
-- 查询指定快照的数据
SELECT * FROM orders FOR SYSTEM_VERSION AS OF 123456789;
-- 查看快照历史
SELECT * FROM orders.snapshots;
存储选型建议:
要求?"} B -->|"毫秒级"| C["Redis/HBase"] B -->|"秒级"| D{"是否需要
更新删除?"} D -->|"是"| E{"数据规模?"} D -->|"否"| F["ClickHouse"] E -->|"TB 级"| G["Hudi/Doris"] E -->|"PB 级"| H["Iceberg"] style C fill:#ff6b6b style F fill:#51cf66 style G fill:#4dabf7 style H fill:#ffd43b
| 场景 | 推荐存储 | 理由 |
|---|---|---|
| 实时大屏 | ClickHouse | 查询快,支持复杂分析 |
| 实时报表 | Doris/ClickHouse | 支持更新,查询性能好 |
| 实时推荐 | Redis/HBase | 毫秒级查询,支持高并发 |
| 实时风控 | Redis | 极低延迟,支持复杂数据结构 |
| CDC 同步 | Hudi/Iceberg | 支持更新删除,流批一体 |
| 数据湖 | Iceberg | 大规模数据,生态完善 |
| 日志分析 | Elasticsearch | 全文搜索,日志分析 |
实时数仓分层设计
实时数仓的分层设计借鉴了离线数仓的分层思想,通过分层解耦,提高数据的可维护性和复用性。
实时数仓分层架构:
原始订单"] B2["ods_user
原始用户"] B3["ods_log
原始日志"] end subgraph "DWD 层 (明细数据层)" C1["dwd_order
清洗后订单"] C2["dwd_user
清洗后用户"] C3["dwd_behavior
用户行为"] end subgraph "DWS 层 (汇总数据层)" D1["dws_order_1min
订单分钟汇总"] D2["dws_user_1hour
用户小时汇总"] D3["dws_traffic_1day
流量天汇总"] end subgraph "ADS 层 (应用数据层)" E1["ads_realtime_gmv
实时 GMV"] E2["ads_user_portrait
用户画像"] E3["ads_traffic_monitor
流量监控"] end A1 --> B1 A2 --> B2 A3 --> B3 B1 --> C1 B2 --> C2 B3 --> C3 C1 --> D1 C2 --> D2 C3 --> D3 D1 --> E1 D2 --> E2 D3 --> E3 style B1 fill:#a5d8ff style B2 fill:#a5d8ff style B3 fill:#a5d8ff style C1 fill:#74c0fc style C2 fill:#74c0fc style C3 fill:#74c0fc style D1 fill:#4dabf7 style D2 fill:#4dabf7 style D3 fill:#4dabf7 style E1 fill:#51cf66 style E2 fill:#51cf66 style E3 fill:#51cf66
ODS 层设计
ODS(Operational Data Store)层是实时数仓的第一层,存储从数据源采集的原始数据。
核心职责:
- 数据接入:从各种数据源实时采集数据
- 数据存储:保持数据原貌,不做任何处理
- 数据分发:为下游提供统一的数据源
- 数据备份:作为数据的第一道防线
设计原则:
保持原貌
- 不做任何业务逻辑处理
- 保留所有字段,包括系统字段
- 记录数据采集时间
- 保留原始数据类型
统一格式
- 统一字段命名规范(驼峰或下划线)
- 统一数据类型(String/Int/Decimal)
- 统一时间格式(ISO 8601)
- 统一编码格式(UTF-8)
分区设计
- 按天或按小时分区
- 便于数据回溯和清理
- 支持增量处理
元数据管理
- 记录数据来源
- 记录采集时间
- 记录数据版本
ODS 层表设计示例:
-- Kafka Topic: ods_order_topic
-- ClickHouse 表结构
CREATE TABLE ods_order (
-- 业务字段
order_id String,
user_id String,
product_id String,
amount Decimal(18, 2),
status String,
province String,
city String,
create_time DateTime,
update_time DateTime,
-- 元数据字段
source_db String, -- 来源数据库
source_table String, -- 来源表
binlog_file String, -- Binlog 文件
binlog_pos UInt64, -- Binlog 位置
event_type String, -- 事件类型 (INSERT/UPDATE/DELETE)
event_time DateTime, -- 事件时间
-- 系统字段
etl_time DateTime DEFAULT now(), -- 数据采集时间
etl_date Date DEFAULT today() -- 分区字段
) ENGINE = MergeTree()
PARTITION BY etl_date
ORDER BY (order_id, create_time);
Flink 写入 ODS 层:
public class OdsWriterJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Flink CDC 读取 MySQL
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("ecommerce")
.tableList("ecommerce.orders")
.username("root")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
DataStream<String> cdcStream = env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC");
// 解析 CDC 事件,添加元数据
DataStream<OdsOrder> odsStream = cdcStream
.map(new RichMapFunction<String, OdsOrder>() {
@Override
public OdsOrder map(String value) throws Exception {
JSONObject json = JSON.parseObject(value);
JSONObject source = json.getJSONObject("source");
JSONObject after = json.getJSONObject("after");
OdsOrder order = new OdsOrder();
// 业务字段
order.setOrderId(after.getString("order_id"));
order.setUserId(after.getString("user_id"));
order.setAmount(after.getBigDecimal("amount"));
// ... 其他字段
// 元数据字段
order.setSourceDb(source.getString("db"));
order.setSourceTable(source.getString("table"));
order.setBinlogFile(source.getString("file"));
order.setBinlogPos(source.getLong("pos"));
order.setEventType(json.getString("op"));
order.setEventTime(source.getLong("ts_ms"));
order.setEtlTime(System.currentTimeMillis());
return order;
}
});
// 写入 Kafka ODS Topic
odsStream.sinkTo(
KafkaSink.<OdsOrder>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("ods_order_topic")
.setValueSerializationSchema(new JsonSerializationSchema<>())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build()
);
env.execute("ODS Writer Job");
}
}
DWD 层设计
DWD(Data Warehouse Detail)层是实时数仓的明细层,对 ODS 层数据进行清洗、转换和关联。
核心职责:
- 数据清洗:过滤脏数据、异常数据
- 数据转换:字段映射、类型转换、格式统一
- 数据关联:维度关联、事实表关联
- 数据拆分:宽表拆分、主题域划分
- 数据标准化:统一业务口径
设计原则:
业务含义清晰
- 字段命名符合业务语义
- 添加必要的业务字段
- 保留明细粒度
- 便于业务理解
数据质量保证
- 数据去重
- 数据校验
- 异常数据处理
- 数据完整性检查
维度关联
- 关联维度表补充信息
- 使用维度 ID 而非维度值
- 缓慢变化维度处理
性能优化
- 合理设置分区
- 优化关联方式
- 控制数据量
DWD 层表设计示例:
-- ClickHouse DWD 层表
CREATE TABLE dwd_order (
-- 订单信息
order_id String,
order_no String,
order_type String,
-- 用户信息(关联维度)
user_id String,
user_name String,
user_level String,
user_type String,
-- 商品信息(关联维度)
product_id String,
product_name String,
category_id String,
category_name String,
brand_id String,
brand_name String,
-- 金额信息
original_amount Decimal(18, 2),
discount_amount Decimal(18, 2),
final_amount Decimal(18, 2),
-- 状态信息
status String,
pay_type String,
-- 地域信息
province String,
city String,
district String,
-- 时间信息
create_time DateTime,
pay_time DateTime,
update_time DateTime,
-- 系统字段
etl_time DateTime DEFAULT now(),
etl_date Date DEFAULT today()
) ENGINE = MergeTree()
PARTITION BY etl_date
ORDER BY (user_id, order_id, create_time);
Flink DWD 层处理逻辑:
public class DwdProcessJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取 ODS 层数据
DataStream<OdsOrder> odsStream = env
.addSource(new FlinkKafkaConsumer<>("ods_order_topic", ...));
// 读取用户维度表(广播流)
DataStream<DimUser> userStream = env
.addSource(new FlinkKafkaConsumer<>("dim_user_topic", ...));
MapStateDescriptor<String, DimUser> userStateDescriptor =
new MapStateDescriptor<>("user-broadcast-state", String.class, DimUser.class);
BroadcastStream<DimUser> userBroadcast = userStream.broadcast(userStateDescriptor);
// 关联维度表
DataStream<DwdOrder> dwdStream = odsStream
.connect(userBroadcast)
.process(new BroadcastProcessFunction<OdsOrder, DimUser, DwdOrder>() {
@Override
public void processElement(OdsOrder order, ReadOnlyContext ctx,
Collector<DwdOrder> out) throws Exception {
// 从广播状态中获取用户信息
ReadOnlyBroadcastState<String, DimUser> state =
ctx.getBroadcastState(userStateDescriptor);
DimUser user = state.get(order.getUserId());
if (user != null) {
DwdOrder dwdOrder = new DwdOrder();
// 订单信息
dwdOrder.setOrderId(order.getOrderId());
dwdOrder.setAmount(order.getAmount());
// 用户信息
dwdOrder.setUserId(user.getUserId());
dwdOrder.setUserName(user.getUserName());
dwdOrder.setUserLevel(user.getUserLevel());
out.collect(dwdOrder);
}
}
@Override
public void processBroadcastElement(DimUser user, Context ctx,
Collector<DwdOrder> out) throws Exception {
// 更新广播状态
ctx.getBroadcastState(userStateDescriptor).put(user.getUserId(), user);
}
})
// 数据清洗
.filter(order -> order.getAmount() != null && order.getAmount().compareTo(BigDecimal.ZERO) > 0)
.filter(order -> order.getUserId() != null)
// 数据转换
.map(new OrderTransformMapper());
// 写入 ClickHouse DWD 层
dwdStream.addSink(new ClickHouseSink<>(...));
env.execute("DWD Process Job");
}
}
DWS 层设计
DWS(Data Warehouse Summary)层是实时数仓的汇总层,对 DWD 层数据进行聚合计算。
核心职责:
- 数据聚合:按时间窗口、维度进行聚合
- 指标计算:计算业务指标
- 轻度汇总:为 ADS 层提供基础数据
- 性能优化:减少 ADS 层计算压力
设计原则:
粒度设计
- 按时间粒度:分钟、小时、天
- 按维度粒度:用户、商品、地区
- 平衡查询性能和存储成本
指标设计
- 原子指标:不可再拆分的指标
- 派生指标:基于原子指标计算
- 复合指标:多个指标组合
窗口设计
- 滚动窗口:固定时间窗口
- 滑动窗口:重叠时间窗口
- 会话窗口:基于活动的窗口
聚合策略
- 增量聚合:来一条处理一条
- 全量聚合:窗口结束时统计
- 混合聚合:先增量再全量
DWS 层表设计示例:
-- 订单分钟级汇总表
CREATE TABLE dws_order_1min (
window_start DateTime,
window_end DateTime,
province String,
city String,
category String,
-- 聚合指标
order_count UInt64,
user_count UInt64,
product_count UInt64,
total_amount Decimal(18, 2),
avg_amount Decimal(18, 2),
max_amount Decimal(18, 2),
min_amount Decimal(18, 2),
etl_time DateTime DEFAULT now()
) ENGINE = SummingMergeTree()
PARTITION BY toYYYYMMDD(window_start)
ORDER BY (window_start, province, city, category);
-- 用户小时级汇总表
CREATE TABLE dws_user_1hour (
window_start DateTime,
window_end DateTime,
user_id String,
user_level String,
-- 行为指标
pv UInt64,
uv UInt64,
click_count UInt64,
cart_count UInt64,
order_count UInt64,
-- 金额指标
total_amount Decimal(18, 2),
avg_order_amount Decimal(18, 2),
-- 转化指标
click_rate Float64,
cart_rate Float64,
conversion_rate Float64,
etl_time DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(etl_time)
PARTITION BY toYYYYMMDD(window_start)
ORDER BY (window_start, user_id);
Flink DWS 层聚合逻辑:
public class DwsAggregateJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取 DWD 层数据
DataStream<DwdOrder> dwdStream = env
.addSource(new FlinkKafkaConsumer<>("dwd_order_topic", ...));
// 设置 Watermark
DataStream<DwdOrder> watermarkedStream = dwdStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<DwdOrder>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, ts) -> order.getCreateTime())
);
// 按维度分组 + 滚动窗口聚合
DataStream<DwsOrderStats> dwsStream = watermarkedStream
.keyBy(order -> Tuple3.of(
order.getProvince(),
order.getCity(),
order.getCategory()
))
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AggregateFunction<DwdOrder, OrderAccumulator, DwsOrderStats>() {
@Override
public OrderAccumulator createAccumulator() {
return new OrderAccumulator();
}
@Override
public OrderAccumulator add(DwdOrder order, OrderAccumulator acc) {
acc.orderCount++;
acc.userSet.add(order.getUserId());
acc.productSet.add(order.getProductId());
acc.totalAmount = acc.totalAmount.add(order.getFinalAmount());
acc.maxAmount = acc.maxAmount.max(order.getFinalAmount());
acc.minAmount = acc.minAmount.min(order.getFinalAmount());
return acc;
}
@Override
public DwsOrderStats getResult(OrderAccumulator acc) {
return DwsOrderStats.builder()
.orderCount(acc.orderCount)
.userCount(acc.userSet.size())
.productCount(acc.productSet.size())
.totalAmount(acc.totalAmount)
.avgAmount(acc.totalAmount.divide(
BigDecimal.valueOf(acc.orderCount), 2, RoundingMode.HALF_UP))
.maxAmount(acc.maxAmount)
.minAmount(acc.minAmount)
.build();
}
@Override
public OrderAccumulator merge(OrderAccumulator a, OrderAccumulator b) {
a.orderCount += b.orderCount;
a.userSet.addAll(b.userSet);
a.productSet.addAll(b.productSet);
a.totalAmount = a.totalAmount.add(b.totalAmount);
a.maxAmount = a.maxAmount.max(b.maxAmount);
a.minAmount = a.minAmount.min(b.minAmount);
return a;
}
});
// 写入 ClickHouse DWS 层
dwsStream.addSink(new ClickHouseSink<>(...));
env.execute("DWS Aggregate Job");
}
}
ADS 层设计
ADS(Application Data Store)层是实时数仓的应用层,面向具体业务场景提供数据服务。
核心职责:
- 业务主题:按业务主题组织数据
- 指标计算:计算复杂业务指标
- 数据服务:为前端应用提供数据接口
- 性能优化:针对查询场景优化
设计原则:
面向应用
- 按业务场景设计表结构
- 字段命名符合前端需求
- 查询性能优先
- 减少关联查询
指标完整
- 包含所有业务指标
- 支持多维度分析
- 支持实时查询
- 支持历史对比
易于使用
- 表结构简单清晰
- 减少关联查询
- 提供标准 API
- 文档完善
性能保证
- 使用物化视图
- 使用缓存(Redis)
- 预计算指标
- 合理分区
ADS 层表设计示例:
-- 实时 GMV 大屏表
CREATE TABLE ads_realtime_gmv (
stat_time DateTime,
time_granularity String, -- 时间粒度 (1min/5min/1hour)
-- 核心指标
gmv Decimal(18, 2),
order_count UInt64,
user_count UInt64,
product_count UInt64,
avg_order_amount Decimal(18, 2),
-- 同比环比
gmv_yoy Decimal(10, 2), -- GMV 同比增长率
gmv_mom Decimal(10, 2), -- GMV 环比增长率
order_yoy Decimal(10, 2),
order_mom Decimal(10, 2),
-- TOP 数据
top_province String,
top_category String,
top_product String,
-- 系统字段
update_time DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(update_time)
ORDER BY (stat_time, time_granularity);
-- 用户画像表
CREATE TABLE ads_user_portrait (
user_id String,
-- 基本信息
user_name String,
age UInt8,
gender String,
province String,
city String,
-- 行为特征
total_pv UInt64,
total_uv UInt64,
avg_session_duration UInt32,
last_visit_time DateTime,
-- 消费特征
total_order_count UInt64,
total_amount Decimal(18, 2),
avg_order_amount Decimal(18, 2),
max_order_amount Decimal(18, 2),
last_order_time DateTime,
-- 偏好特征
favorite_category Array(String),
favorite_brand Array(String),
-- 标签
user_tags Array(String),
user_level String,
-- 系统字段
update_time DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(update_time)
ORDER BY user_id;
分层设计最佳实践
1. 分层原则
| 层级 | 中文名 | 数据粒度 | 主要操作 | 存储引擎 | 数据延迟 | 保留时间 |
|---|---|---|---|---|---|---|
| ODS | 操作数据层 | 明细 | 采集、存储 | Kafka/ClickHouse | 秒级 | 7-30 天 |
| DWD | 明细数据层 | 明细 | 清洗、转换、关联 | ClickHouse/Hudi | 秒级 | 30-90 天 |
| DWS | 汇总数据层 | 轻度汇总 | 聚合、计算 | ClickHouse | 分钟级 | 90-180 天 |
| ADS | 应用数据层 | 高度汇总 | 业务指标 | ClickHouse/Redis | 分钟级 | 180-365 天 |
2. 数据流转
不做处理 ODS->>DWD: 实时消费 Note over DWD: 清洗转换
维度关联 DWD->>DWS: 实时聚合 Note over DWS: 窗口计算
指标聚合 DWS->>ADS: 指标计算 Note over ADS: 业务指标
多维分析 ADS->>App: API 查询 Note over App: 实时大屏
实时报表
3. 命名规范
| 层级 | 表名前缀 | 示例 | 说明 |
|---|---|---|---|
| ODS | ods_ | ods_order | 原始数据 |
| DWD | dwd_ | dwd_order | 明细数据 |
| DWS | dws_ | dws_order_1min | 汇总数据 + 时间粒度 |
| ADS | ads_ | ads_realtime_gmv | 应用数据 + 业务主题 |
| DIM | dim_ | dim_user | 维度数据 |
4. 数据质量保证
// 数据质量检查
public class DataQualityChecker extends ProcessFunction<Order, Order> {
private transient Counter validCounter;
private transient Counter invalidCounter;
private transient Histogram qualityScore;
@Override
public void open(Configuration parameters) {
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
validCounter = metricGroup.counter("valid_records");
invalidCounter = metricGroup.counter("invalid_records");
qualityScore = metricGroup.histogram("quality_score",
new DescriptiveStatisticsHistogram(1000));
}
@Override
public void processElement(Order order, Context ctx, Collector<Order> out) {
int score = 100;
List<String> errors = new ArrayList<>();
// 必填字段检查
if (order.getOrderId() == null || order.getOrderId().isEmpty()) {
errors.add("订单 ID 为空");
score -= 20;
}
// 数值范围检查
if (order.getAmount() == null || order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
errors.add("订单金额异常");
score -= 20;
}
// 枚举值检查
if (!Arrays.asList("PENDING", "PAID", "CANCELLED").contains(order.getStatus())) {
errors.add("订单状态非法");
score -= 15;
}
// 时间合理性检查
if (order.getCreateTime() == null || order.getCreateTime() > System.currentTimeMillis()) {
errors.add("订单时间异常");
score -= 15;
}
// 数据完整性检查
if (order.getUserId() == null) {
errors.add("用户 ID 为空");
score -= 15;
}
qualityScore.update(score);
if (score >= 80) {
validCounter.inc();
out.collect(order);
} else {
invalidCounter.inc();
log.error("数据质量不合格: orderId={}, score={}, errors={}",
order.getOrderId(), score, errors);
// 发送到错误队列
ctx.output(errorOutputTag, order);
}
}
}
5. 分层优化建议
| 优化项 | ODS 层 | DWD 层 | DWS 层 | ADS 层 |
|---|---|---|---|---|
| 分区策略 | 按天 | 按天 | 按天/小时 | 按月 |
| 压缩算法 | LZ4 | LZ4 | LZ4 | ZSTD |
| 索引策略 | 主键索引 | 主键 + 跳数索引 | 主键 + 物化视图 | 全文索引 |
| TTL 策略 | 7-30 天 | 30-90 天 | 90-180 天 | 180-365 天 |
| 备份策略 | 不备份 | 增量备份 | 增量备份 | 全量备份 |
实时数据采集与同步
CDC 技术原理
CDC(Change Data Capture) 是实时数仓的核心技术,通过捕获数据库的变更日志实现数据的实时同步。
CDC 工作原理:
Binlog 解析
- MySQL 的 Binlog 记录了所有数据变更
- CDC 工具伪装成 MySQL Slave 读取 Binlog
- 解析 Binlog 事件,转换为变更记录
事件类型
- INSERT:新增数据
- UPDATE:更新数据
- DELETE:删除数据
- DDL:表结构变更
数据格式
- Before:变更前的数据
- After:变更后的数据
- Metadata:元数据信息
Binlog 格式:
| 格式 | 说明 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| Statement | 记录 SQL 语句 | 日志量小 | 可能不一致 | 不推荐 |
| Row | 记录行变更 | 数据一致 | 日志量大 | 推荐 |
| Mixed | 混合模式 | 折中方案 | 复杂 | 特殊场景 |
配置 MySQL Binlog:
-- 查看 Binlog 配置
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- 开启 Binlog(需要重启)
-- my.cnf
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
expire_logs_days=7
-- 创建 CDC 用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
-- 查看 Binlog 文件
SHOW BINARY LOGS;
-- 查看 Binlog 内容
SHOW BINLOG EVENTS IN 'mysql-bin.000001';
Canal 实战
Canal 部署:
# 下载 Canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
tar -zxvf canal.deployer-1.1.6.tar.gz -C /opt/canal
# 配置 Canal
cd /opt/canal/conf/example
# instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.defaultDatabaseName=test
# 订阅规则
canal.instance.filter.regex=test_db\\..*
canal.instance.filter.black.regex=test_db\\.tmp_.*
# 启动 Canal
cd /opt/canal
sh bin/startup.sh
# 查看日志
tail -f logs/canal/canal.log
tail -f logs/example/example.log
Canal Client 示例:
public class CanalClient {
public static void main(String[] args) {
// 创建连接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"",
""
);
try {
connector.connect();
connector.subscribe("test_db\\..*");
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
printEntry(message.getEntries());
}
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error", e);
}
EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() +
" update=" + column.getUpdated());
}
}
}
Canal 发送到 Kafka:
# canal.properties
canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = lz4
kafka.batch.size = 16384
kafka.linger.ms = 10
# instance.properties
canal.mq.topic = canal_topic
canal.mq.partition = 0
canal.mq.partitionsNum = 3
canal.mq.partitionHash = test_db.orders:id
Debezium 实战
Debezium 部署(Kafka Connect):
# 下载 Debezium
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.0.Final/debezium-connector-mysql-2.1.0.Final-plugin.tar.gz
tar -zxvf debezium-connector-mysql-2.1.0.Final-plugin.tar.gz -C /opt/kafka/plugins/
# 配置 Kafka Connect
# connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
plugin.path=/opt/kafka/plugins
# 启动 Kafka Connect
bin/connect-distributed.sh config/connect-distributed.properties
创建 Debezium Connector:
# 创建 MySQL Connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"database.include.list": "inventory",
"table.include.list": "inventory.orders,inventory.products",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"include.schema.changes": "true",
"snapshot.mode": "initial",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}'
# 查看 Connector 状态
curl http://localhost:8083/connectors/mysql-connector/status
# 删除 Connector
curl -X DELETE http://localhost:8083/connectors/mysql-connector
Debezium 数据格式:
{
"before": null,
"after": {
"id": 1001,
"order_no": "ORD20240228001",
"user_id": "user123",
"amount": 199.99,
"status": "PAID",
"create_time": 1709107200000
},
"source": {
"version": "2.1.0.Final",
"connector": "mysql",
"name": "mysql-server",
"ts_ms": 1709107200123,
"snapshot": "false",
"db": "inventory",
"table": "orders",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"thread": 7,
"query": null
},
"op": "c",
"ts_ms": 1709107200456,
"transaction": null
}
Flink CDC 实战
Flink CDC 依赖:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.0</version>
</dependency>
Flink CDC 使用示例:
public class FlinkCDCJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
// 配置 MySQL CDC Source
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("inventory")
.tableList("inventory.orders", "inventory.products")
.username("root")
.password("password")
.serverTimeZone("Asia/Shanghai")
.startupOptions(StartupOptions.initial()) // 全量 + 增量
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
// 创建数据流
DataStreamSource<String> cdcStream = env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC");
// 处理 CDC 数据
DataStream<Order> orderStream = cdcStream
.map(new MapFunction<String, Order>() {
@Override
public Order map(String value) throws Exception {
JSONObject json = JSON.parseObject(value);
String op = json.getString("op");
Order order = new Order();
switch (op) {
case "c": // CREATE (INSERT)
case "r": // READ (全量同步)
JSONObject after = json.getJSONObject("after");
order.setOrderId(after.getString("id"));
order.setOrderNo(after.getString("order_no"));
order.setUserId(after.getString("user_id"));
order.setAmount(after.getBigDecimal("amount"));
order.setStatus(after.getString("status"));
order.setCreateTime(after.getLong("create_time"));
order.setDeleted(false);
break;
case "u": // UPDATE
after = json.getJSONObject("after");
order.setOrderId(after.getString("id"));
order.setStatus(after.getString("status"));
order.setDeleted(false);
break;
case "d": // DELETE
JSONObject before = json.getJSONObject("before");
order.setOrderId(before.getString("id"));
order.setDeleted(true);
break;
}
return order;
}
})
.filter(order -> order.getOrderId() != null);
// 写入 Kafka
orderStream.sinkTo(
KafkaSink.<Order>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("dwd_order")
.setValueSerializationSchema(new JsonSerializationSchema<>())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build()
);
env.execute("Flink CDC Job");
}
}
Flink CDC Table API:
public class FlinkCDCTableJob {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建 MySQL CDC 表
tableEnv.executeSql(
"CREATE TABLE orders (" +
" id BIGINT," +
" order_no STRING," +
" user_id STRING," +
" amount DECIMAL(18, 2)," +
" status STRING," +
" create_time TIMESTAMP(3)," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = 'localhost'," +
" 'port' = '3306'," +
" 'username' = 'root'," +
" 'password' = 'password'," +
" 'database-name' = 'inventory'," +
" 'table-name' = 'orders'" +
")"
);
// 创建 Kafka Sink 表
tableEnv.executeSql(
"CREATE TABLE kafka_orders (" +
" id BIGINT," +
" order_no STRING," +
" user_id STRING," +
" amount DECIMAL(18, 2)," +
" status STRING," +
" create_time TIMESTAMP(3)" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'dwd_order'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
// 数据同步
tableEnv.executeSql(
"INSERT INTO kafka_orders " +
"SELECT id, order_no, user_id, amount, status, create_time " +
"FROM orders"
);
}
}
全量与增量同步策略
同步策略对比:
| 策略 | 说明 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 全量同步 | 一次性同步所有数据 | 数据完整 | 时间长、资源消耗大 | 初始化、数据修复 |
| 增量同步 | 只同步变更数据 | 实时性好、资源消耗小 | 需要 CDC 支持 | 日常同步 |
| 全量 + 增量 | 先全量后增量 | 兼顾完整性和实时性 | 实现复杂 | 推荐方案 |
全量 + 增量同步流程:
全量 + 增量同步实现:
public class FullIncrementalSyncJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置 MySQL CDC Source(全量 + 增量)
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("inventory")
.tableList("inventory.orders")
.username("root")
.password("password")
.startupOptions(StartupOptions.initial()) // 先全量再增量
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
DataStreamSource<String> cdcStream = env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC");
// 处理数据
DataStream<Order> orderStream = cdcStream
.map(new ParseCDCFunction())
.filter(order -> order != null);
// 根据操作类型分流
OutputTag<Order> insertTag = new OutputTag<Order>("insert"){};
OutputTag<Order> updateTag = new OutputTag<Order>("update"){};
OutputTag<Order> deleteTag = new OutputTag<Order>("delete"){};
SingleOutputStreamOperator<Order> mainStream = orderStream
.process(new ProcessFunction<Order, Order>() {
@Override
public void processElement(Order order, Context ctx, Collector<Order> out) {
String op = order.getOp();
switch (op) {
case "c":
case "r":
ctx.output(insertTag, order);
break;
case "u":
ctx.output(updateTag, order);
break;
case "d":
ctx.output(deleteTag, order);
break;
}
}
});
// 分别处理不同类型的数据
DataStream<Order> insertStream = mainStream.getSideOutput(insertTag);
DataStream<Order> updateStream = mainStream.getSideOutput(updateTag);
DataStream<Order> deleteStream = mainStream.getSideOutput(deleteTag);
// 写入不同的 Sink
insertStream.addSink(new ClickHouseInsertSink());
updateStream.addSink(new ClickHouseUpdateSink());
deleteStream.addSink(new ClickHouseDeleteSink());
env.execute("Full Incremental Sync Job");
}
}
断点续传:
// 使用 Checkpoint 实现断点续传
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// 从 Checkpoint 恢复
// flink run -s hdfs://namenode:9000/flink/checkpoints/chk-123 your-job.jar
数据一致性保证:
// Exactly-Once 配置
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Kafka Sink Exactly-Once
KafkaSink<Order> sink = KafkaSink.<Order>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(...)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-kafka-")
.build();
// ClickHouse 幂等写入
public class ClickHouseIdempotentSink extends RichSinkFunction<Order> {
@Override
public void invoke(Order order, Context context) {
// 使用 REPLACE INTO 或 INSERT ... ON DUPLICATE KEY UPDATE
String sql = "INSERT INTO orders (order_id, amount, status) " +
"VALUES (?, ?, ?) " +
"ON DUPLICATE KEY UPDATE amount=VALUES(amount), status=VALUES(status)";
try (PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, order.getOrderId());
ps.setBigDecimal(2, order.getAmount());
ps.setString(3, order.getStatus());
ps.executeUpdate();
}
}
}
实时数据处理
数据清洗
数据清洗是保证数据质量的关键环节,包括去重、过滤、校验等操作。
常见清洗场景:
| 场景 | 问题 | 解决方案 | 示例 |
|---|---|---|---|
| 脏数据 | 字段缺失、格式错误 | 过滤或补全默认值 | NULL 值填充为 0 |
| 重复数据 | 同一条数据多次发送 | 去重(基于主键) | 使用状态去重 |
| 异常数据 | 数值超出合理范围 | 过滤或修正 | 金额 < 0 设为 0 |
| 空值处理 | NULL 值影响计算 | 填充默认值 | NULL 填充为 “未知” |
| 类型转换 | 字符串转数值失败 | 异常捕获和处理 | Try-Catch 处理 |
| 编码问题 | 乱码 | 统一编码格式 | UTF-8 编码 |
Flink 数据清洗示例:
public class DataCleanFunction extends RichMapFunction<Order, Order> {
private transient Counter invalidCounter;
private transient Counter cleanedCounter;
@Override
public void open(Configuration parameters) {
invalidCounter = getRuntimeContext()
.getMetricGroup()
.counter("invalid_records");
cleanedCounter = getRuntimeContext()
.getMetricGroup()
.counter("cleaned_records");
}
@Override
public Order map(Order order) throws Exception {
// 1. 字段校验
if (order.getOrderId() == null || order.getOrderId().isEmpty()) {
invalidCounter.inc();
return null; // 过滤无效数据
}
// 2. 数值范围校验
if (order.getAmount() == null || order.getAmount().compareTo(BigDecimal.ZERO) < 0) {
order.setAmount(BigDecimal.ZERO); // 修正异常值
}
// 3. 空值处理
if (order.getProvince() == null || order.getProvince().isEmpty()) {
order.setProvince("未知");
}
// 4. 类型转换
try {
if (order.getCreateTimeStr() != null) {
order.setCreateTime(
LocalDateTime.parse(order.getCreateTimeStr(),
DateTimeFormatter.ISO_DATE_TIME)
);
}
} catch (Exception e) {
order.setCreateTime(LocalDateTime.now());
}
// 5. 数据标准化
order.setProvince(order.getProvince().trim());
order.setStatus(order.getStatus().toUpperCase());
// 6. 数据脱敏
if (order.getUserPhone() != null) {
order.setUserPhone(maskPhone(order.getUserPhone()));
}
cleanedCounter.inc();
return order;
}
private String maskPhone(String phone) {
if (phone.length() == 11) {
return phone.substring(0, 3) + "****" + phone.substring(7);
}
return phone;
}
}
// 使用清洗函数
DataStream<Order> cleanedStream = rawStream
.map(new DataCleanFunction())
.filter(Objects::nonNull); // 过滤 null 值
基于状态的去重:
public class DeduplicationFunction extends KeyedProcessFunction<String, Order, Order> {
// 状态:记录已处理的订单 ID
private ValueState<Boolean> processedState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> descriptor =
new ValueStateDescriptor<>("processed", Boolean.class);
// 设置状态 TTL,1 小时后自动清理
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupIncrementally(10, true)
.build();
descriptor.enableTimeToLive(ttlConfig);
processedState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Order order, Context ctx, Collector<Order> out)
throws Exception {
Boolean processed = processedState.value();
if (processed == null || !processed) {
// 首次处理
processedState.update(true);
out.collect(order);
}
// 重复数据,丢弃
}
}
// 按订单 ID 去重
DataStream<Order> deduplicatedStream = cleanedStream
.keyBy(Order::getOrderId)
.process(new DeduplicationFunction());
数据转换
数据转换是将数据从一种格式转换为另一种格式,包括字段映射、类型转换、格式统一等。
常见转换场景:
- 字段映射:源字段名到目标字段名的映射
- 类型转换:String 转 Int、Long 转 Timestamp 等
- 格式统一:日期格式、金额格式统一
- 字段拆分:一个字段拆分为多个字段
- 字段合并:多个字段合并为一个字段
- 字段计算:基于现有字段计算新字段
Flink 数据转换示例:
public class DataTransformFunction implements MapFunction<OdsOrder, DwdOrder> {
@Override
public DwdOrder map(OdsOrder ods) throws Exception {
DwdOrder dwd = new DwdOrder();
// 1. 字段映射
dwd.setOrderId(ods.getOrderId());
dwd.setUserId(ods.getUserId());
// 2. 类型转换
dwd.setAmount(new BigDecimal(ods.getAmountStr()));
dwd.setCreateTime(Long.parseLong(ods.getCreateTimeStr()));
// 3. 格式统一
dwd.setOrderNo(formatOrderNo(ods.getOrderNo()));
dwd.setPhone(formatPhone(ods.getPhone()));
// 4. 字段拆分
String[] address = ods.getAddress().split("-");
if (address.length >= 3) {
dwd.setProvince(address[0]);
dwd.setCity(address[1]);
dwd.setDistrict(address[2]);
}
// 5. 字段合并
dwd.setFullName(ods.getFirstName() + ods.getLastName());
// 6. 字段计算
dwd.setFinalAmount(
ods.getOriginalAmount().subtract(ods.getDiscountAmount())
);
dwd.setDiscountRate(
ods.getDiscountAmount()
.divide(ods.getOriginalAmount(), 4, RoundingMode.HALF_UP)
.multiply(new BigDecimal("100"))
);
return dwd;
}
private String formatOrderNo(String orderNo) {
// 统一订单号格式:ORD20240228001
return "ORD" + orderNo;
}
private String formatPhone(String phone) {
// 统一手机号格式:去除空格和横线
return phone.replaceAll("[\\s-]", "");
}
}
数据关联
数据关联是将事实表与维度表进行关联,补充业务信息。
关联方式对比:
| 方式 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 广播关联 | 维度表广播到所有节点 | 实时性好、无外部依赖 | 维度表不能太大 | 小维度表(< 100MB) |
| 异步 IO | 异步查询外部存储 | 支持大维度表 | 增加外部依赖、延迟高 | 大维度表 |
| Temporal Join | 基于时间的关联 | 支持变化维度 | 配置复杂 | 缓慢变化维度 |
| Lookup Join | 查询外部数据源 | 灵活性高 | 性能开销大 | 实时维度查询 |
| Interval Join | 时间区间关联 | 支持乱序数据 | 需要 Watermark | 双流关联 |
广播关联示例:
// 维度表流(用户维度)
DataStream<DimUser> userStream = env
.addSource(new FlinkKafkaConsumer<>("dim_user", ...))
.map(new UserDeserializer());
// 定义广播状态描述符
MapStateDescriptor<String, DimUser> userStateDescriptor =
new MapStateDescriptor<>(
"user-broadcast-state",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(DimUser.class)
);
// 创建广播流
BroadcastStream<DimUser> userBroadcast = userStream
.broadcast(userStateDescriptor);
// 事实表流(订单流)
DataStream<Order> orderStream = env
.addSource(new FlinkKafkaConsumer<>("ods_order", ...));
// 关联维度
DataStream<OrderWithUser> enrichedStream = orderStream
.connect(userBroadcast)
.process(new BroadcastProcessFunction<Order, DimUser, OrderWithUser>() {
@Override
public void processElement(Order order, ReadOnlyContext ctx,
Collector<OrderWithUser> out) throws Exception {
// 从广播状态中获取用户信息
ReadOnlyBroadcastState<String, DimUser> state =
ctx.getBroadcastState(userStateDescriptor);
DimUser user = state.get(order.getUserId());
if (user != null) {
OrderWithUser enriched = new OrderWithUser();
enriched.setOrder(order);
enriched.setUser(user);
out.collect(enriched);
} else {
// 维度数据缺失,记录日志或发送到侧输出流
log.warn("User not found: {}", order.getUserId());
}
}
@Override
public void processBroadcastElement(DimUser user, Context ctx,
Collector<OrderWithUser> out) throws Exception {
// 更新广播状态
ctx.getBroadcastState(userStateDescriptor)
.put(user.getUserId(), user);
}
});
异步 IO 关联示例:
// 异步查询 Redis 维度表
public class AsyncRedisLookupFunction
extends RichAsyncFunction<Order, OrderWithUser> {
private transient RedisAsyncCommands<String, String> redisClient;
@Override
public void open(Configuration parameters) {
RedisClient client = RedisClient.create("redis://localhost:6379");
StatefulRedisConnection<String, String> connection = client.connect();
redisClient = connection.async();
}
@Override
public void asyncInvoke(Order order, ResultFuture<OrderWithUser> resultFuture) {
String key = "user:" + order.getUserId();
CompletableFuture<String> future = redisClient.get(key).toCompletableFuture();
future.whenComplete((userJson, throwable) -> {
if (throwable != null) {
resultFuture.completeExceptionally(throwable);
return;
}
if (userJson != null) {
DimUser user = JSON.parseObject(userJson, DimUser.class);
OrderWithUser result = new OrderWithUser(order, user);
resultFuture.complete(Collections.singleton(result));
} else {
// 维度数据不存在,返回空用户
resultFuture.complete(Collections.singleton(
new OrderWithUser(order, new DimUser())
));
}
});
}
@Override
public void timeout(Order order, ResultFuture<OrderWithUser> resultFuture) {
// 超时处理
log.warn("Async lookup timeout: {}", order.getOrderId());
resultFuture.complete(Collections.singleton(
new OrderWithUser(order, new DimUser())
));
}
@Override
public void close() throws Exception {
if (redisClient != null) {
redisClient.getStatefulConnection().close();
}
}
}
// 使用异步 IO
DataStream<OrderWithUser> enrichedStream = AsyncDataStream
.unorderedWait(
orderStream,
new AsyncRedisLookupFunction(),
5000, // 超时时间 5 秒
TimeUnit.MILLISECONDS,
100 // 最大并发请求数
);
Interval Join 示例:
// 订单流
DataStream<Order> orderStream = env
.addSource(new FlinkKafkaConsumer<>("orders", ...))
.assignTimestampsAndWatermarks(...);
// 支付流
DataStream<Payment> paymentStream = env
.addSource(new FlinkKafkaConsumer<>("payments", ...))
.assignTimestampsAndWatermarks(...);
// Interval Join:订单和支付在 10 分钟内关联
DataStream<OrderPayment> joinedStream = orderStream
.keyBy(Order::getOrderId)
.intervalJoin(paymentStream.keyBy(Payment::getOrderId))
.between(Time.minutes(-10), Time.minutes(10)) // 时间区间
.process(new ProcessJoinFunction<Order, Payment, OrderPayment>() {
@Override
public void processElement(Order order, Payment payment, Context ctx,
Collector<OrderPayment> out) {
OrderPayment result = new OrderPayment();
result.setOrder(order);
result.setPayment(payment);
out.collect(result);
}
});
数据聚合
数据聚合是实时数仓的核心计算逻辑,包括窗口聚合、增量聚合等。
窗口类型对比:
| 窗口类型 | 说明 | 特点 | 适用场景 |
|---|---|---|---|
| 滚动窗口 | 固定大小,不重叠 | 简单、高效 | 每分钟统计 |
| 滑动窗口 | 固定大小,可重叠 | 平滑、连续 | 最近 5 分钟 |
| 会话窗口 | 基于活动间隔 | 动态大小 | 用户会话 |
| 全局窗口 | 自定义触发器 | 灵活 | 特殊场景 |
窗口聚合示例:
// 滚动窗口:每分钟统计订单数和金额
DataStream<OrderStats> tumblingStats = orderStream
.keyBy(Order::getProvince)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AggregateFunction<Order, OrderAccumulator, OrderStats>() {
@Override
public OrderAccumulator createAccumulator() {
return new OrderAccumulator();
}
@Override
public OrderAccumulator add(Order order, OrderAccumulator acc) {
acc.count++;
acc.totalAmount = acc.totalAmount.add(order.getAmount());
acc.userSet.add(order.getUserId());
return acc;
}
@Override
public OrderStats getResult(OrderAccumulator acc) {
return new OrderStats(
acc.count,
acc.userSet.size(),
acc.totalAmount,
acc.totalAmount.divide(BigDecimal.valueOf(acc.count), 2, RoundingMode.HALF_UP)
);
}
@Override
public OrderAccumulator merge(OrderAccumulator a, OrderAccumulator b) {
a.count += b.count;
a.totalAmount = a.totalAmount.add(b.totalAmount);
a.userSet.addAll(b.userSet);
return a;
}
});
// 滑动窗口:最近 5 分钟,每分钟更新一次
DataStream<OrderStats> slidingStats = orderStream
.keyBy(Order::getProvince)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.aggregate(new OrderAggregateFunction());
// 会话窗口:30 分钟无活动则结束会话
DataStream<SessionStats> sessionStats = behaviorStream
.keyBy(Behavior::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionAggregateFunction());
// 计数窗口:每 100 条数据触发一次
DataStream<OrderStats> countStats = orderStream
.keyBy(Order::getUserId)
.countWindow(100)
.aggregate(new OrderAggregateFunction());
窗口计算
窗口触发器:
// 自定义触发器:每 10 秒或 100 条数据触发一次
public class CustomTrigger extends Trigger<Order, TimeWindow> {
private final long maxCount = 100;
private final long maxTime = 10000; // 10 秒
@Override
public TriggerResult onElement(Order element, long timestamp,
TimeWindow window, TriggerContext ctx) throws Exception {
// 注册定时器
ctx.registerEventTimeTimer(window.maxTimestamp());
// 获取计数状态
ValueState<Long> countState = ctx.getPartitionedState(
new ValueStateDescriptor<>("count", Long.class, 0L)
);
long count = countState.value() + 1;
countState.update(count);
// 达到最大计数,触发计算
if (count >= maxCount) {
countState.clear();
return TriggerResult.FIRE_AND_PURGE;
}
// 注册处理时间定时器
long fireTime = ctx.getCurrentProcessingTime() + maxTime;
ctx.registerProcessingTimeTimer(fireTime);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window,
TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window,
TriggerContext ctx) throws Exception {
return time == window.maxTimestamp() ?
TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
}
// 使用自定义触发器
DataStream<OrderStats> customStats = orderStream
.keyBy(Order::getProvince)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.trigger(new CustomTrigger())
.aggregate(new OrderAggregateFunction());
增量聚合 vs 全量聚合:
// 增量聚合:AggregateFunction(推荐)
window.aggregate(new AggregateFunction<Order, OrderAccumulator, OrderStats>() {
// 来一条处理一条,内存占用小
});
// 全量聚合:ProcessWindowFunction
window.process(new ProcessWindowFunction<Order, OrderStats, String, TimeWindow>() {
@Override
public void process(String key, Context context,
Iterable<Order> elements, Collector<OrderStats> out) {
// 窗口结束时,elements 包含窗口内所有数据
// 内存占用大,但可以访问窗口元数据
List<Order> orders = StreamSupport
.stream(elements.spliterator(), false)
.collect(Collectors.toList());
// 复杂计算
OrderStats stats = calculateComplexStats(orders);
stats.setWindowStart(context.window().getStart());
stats.setWindowEnd(context.window().getEnd());
out.collect(stats);
}
});
// 增量 + 全量:先增量聚合,再全量处理(最佳实践)
window.aggregate(
new OrderAggregateFunction(), // 增量聚合
new ProcessWindowFunction<OrderStats, OrderStats, String, TimeWindow>() {
@Override
public void process(String key, Context context,
Iterable<OrderStats> elements, Collector<OrderStats> out) {
// 增量聚合的结果
OrderStats stats = elements.iterator().next();
// 添加窗口元数据
stats.setWindowStart(context.window().getStart());
stats.setWindowEnd(context.window().getEnd());
out.collect(stats);
}
}
);
实时数仓性能优化
Flink 性能调优
Flink 性能调优是实时数仓优化的核心,涉及并行度、内存、状态、Checkpoint 等多个方面。
核心调优参数:
| 参数类别 | 参数名 | 推荐值 | 说明 |
|---|---|---|---|
| 并行度 | parallelism.default | CPU 核数 * 2 | 默认并行度 |
| 内存 | taskmanager.memory.process.size | 8-16GB | TaskManager 总内存 |
| 内存 | taskmanager.memory.managed.fraction | 0.4 | 托管内存比例 |
| Checkpoint | execution.checkpointing.interval | 60s-300s | Checkpoint 间隔 |
| Checkpoint | execution.checkpointing.timeout | 10min | Checkpoint 超时 |
| 状态后端 | state.backend | rocksdb | 状态后端类型 |
| 网络 | taskmanager.network.memory.fraction | 0.1-0.2 | 网络缓冲区比例 |
1. 并行度优化
// 全局并行度
env.setParallelism(16);
// 算子级并行度
DataStream<Order> stream = env
.addSource(new FlinkKafkaConsumer<>(...))
.setParallelism(32) // Source 并行度 = Kafka 分区数
.map(new OrderMapper())
.setParallelism(16) // Map 并行度
.keyBy(Order::getUserId)
.window(...)
.aggregate(new OrderAggregateFunction())
.setParallelism(8); // 聚合并行度
// Sink 并行度
stream.addSink(new ClickHouseSink<>(...))
.setParallelism(4); // 根据下游承载能力设置
并行度设置原则:
- Source 并行度 = Kafka 分区数
- 计算密集型算子:并行度 = CPU 核数 * 2
- IO 密集型算子:并行度可适当增大
- Sink 并行度:根据下游承载能力设置
- 避免数据倾斜:使用 rebalance() 或自定义分区器
2. 内存优化
# flink-conf.yaml
taskmanager.memory.process.size: 16g
taskmanager.memory.flink.size: 14g
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.15
# JVM 参数优化
env.java.opts: -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCDetails
内存分配建议:
- TaskManager 总内存:8-16GB
- 托管内存(RocksDB):40%
- 网络缓冲区:10-20%
- 堆内存:剩余部分
- JVM 开销:1-2GB
3. Checkpoint 优化
// Checkpoint 配置
env.enableCheckpointing(60000); // 60 秒
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 最小间隔
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 分钟超时
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最大并发数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 容忍失败次数
// 增量 Checkpoint(RocksDB)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // 启用增量
// Checkpoint 存储
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");
// 外部化 Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// 本地恢复
env.getCheckpointConfig().enableLocalRecovery(true);
Checkpoint 调优建议:
- 间隔时间:根据数据量和状态大小调整,一般 1-5 分钟
- 增量 Checkpoint:大状态场景必须启用
- 异步快照:默认启用,提升性能
- 本地恢复:启用本地状态恢复加速重启
- 压缩:启用状态压缩减少存储
4. 状态优化
// 状态 TTL 配置
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24)) // 24 小时过期
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot() // 全量快照时清理
.cleanupIncrementally(10, true) // 增量清理
.build();
ValueStateDescriptor<String> descriptor =
new ValueStateDescriptor<>("state", String.class);
descriptor.enableTimeToLive(ttlConfig);
// RocksDB 调优
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs://...", true);
backend.setNumberOfTransferThreads(4);
// RocksDB 配置
OptionsFactory optionsFactory = new OptionsFactory() {
@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
return currentOptions
.setMaxBackgroundJobs(4)
.setMaxOpenFiles(-1);
}
@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
return currentOptions
.setCompactionStyle(CompactionStyle.LEVEL)
.setWriteBufferSize(64 * 1024 * 1024) // 64MB
.setMaxWriteBufferNumber(3);
}
};
backend.setRocksDBOptions(optionsFactory);
5. 反压处理
反压监控:
# 查看反压情况
curl http://jobmanager:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure
# Flink Web UI 查看
# Metrics -> Task -> buffers.outPoolUsage
# 接近 1.0 表示有反压
反压优化策略:
| 原因 | 现象 | 解决方案 |
|---|---|---|
| 下游处理慢 | Sink 反压 | 增加 Sink 并行度、批量写入 |
| 数据倾斜 | 某些 Task 慢 | 重新分区、加盐 |
| GC 频繁 | 内存不足 | 增加内存、优化代码 |
| 网络瓶颈 | 网络缓冲区满 | 增加网络缓冲区 |
| 外部系统慢 | 异步 IO 慢 | 增加并发度、优化查询 |
// 解决数据倾斜:加盐
DataStream<Order> rebalancedStream = skewedStream
.map(order -> {
// 添加随机后缀
String newKey = order.getUserId() + "_" +
ThreadLocalRandom.current().nextInt(10);
order.setUserId(newKey);
return order;
})
.keyBy(Order::getUserId);
// 两阶段聚合
// 第一阶段:加盐聚合
DataStream<OrderStats> stage1 = skewedStream
.map(order -> {
order.setUserId(order.getUserId() + "_" + random.nextInt(10));
return order;
})
.keyBy(Order::getUserId)
.window(...)
.aggregate(...);
// 第二阶段:去盐聚合
DataStream<OrderStats> stage2 = stage1
.map(stats -> {
stats.setUserId(stats.getUserId().split("_")[0]);
return stats;
})
.keyBy(OrderStats::getUserId)
.window(...)
.aggregate(...);
// 批量写入优化
stream.addSink(new RichSinkFunction<Order>() {
private List<Order> buffer = new ArrayList<>();
private static final int BATCH_SIZE = 1000;
@Override
public void invoke(Order order, Context context) {
buffer.add(order);
if (buffer.size() >= BATCH_SIZE) {
flush();
}
}
@Override
public void close() throws Exception {
flush();
}
private void flush() {
if (!buffer.isEmpty()) {
clickHouseClient.batchInsert(buffer);
buffer.clear();
}
}
});
ClickHouse 性能优化
1. 表引擎选择
-- MergeTree:通用场景
CREATE TABLE orders (
order_id String,
user_id String,
amount Decimal(18, 2),
create_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY (user_id, order_id)
SETTINGS index_granularity = 8192;
-- ReplacingMergeTree:去重场景
CREATE TABLE user_profile (
user_id String,
name String,
age UInt8,
update_time DateTime
) ENGINE = ReplacingMergeTree(update_time)
ORDER BY user_id;
-- SummingMergeTree:预聚合场景
CREATE TABLE order_stats (
date Date,
province String,
order_count UInt64,
total_amount Decimal(18, 2)
) ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, province);
2. 分区设计
-- 按月分区(推荐)
PARTITION BY toYYYYMM(create_time)
-- 按天分区(数据量大时)
PARTITION BY toYYYYMMDD(create_time)
-- 按小时分区(实时场景)
PARTITION BY toYYYYMMDDhh(create_time)
-- 多级分区
PARTITION BY (toYYYYMM(create_time), province)
-- 查看分区
SELECT partition, name, rows, bytes_on_disk
FROM system.parts
WHERE table = 'orders' AND active
ORDER BY partition DESC;
-- 删除旧分区
ALTER TABLE orders DROP PARTITION '202401';
-- 优化分区(合并小文件)
OPTIMIZE TABLE orders PARTITION '202402';
分区设计原则:
- 分区数量:建议 100-1000 个
- 分区粒度:根据查询模式和数据量选择
- 避免过度分区:影响查询性能
- 定期清理:删除过期分区
3. 索引优化
-- 主键索引(ORDER BY)
ORDER BY (user_id, create_time)
-- 跳数索引(Skip Index)
-- minmax:数值范围查询
ALTER TABLE orders ADD INDEX idx_amount amount TYPE minmax GRANULARITY 4;
-- set:低基数字段
ALTER TABLE orders ADD INDEX idx_province province TYPE set(100) GRANULARITY 4;
-- bloom_filter:等值查询
ALTER TABLE orders ADD INDEX idx_status status TYPE bloom_filter GRANULARITY 4;
-- ngrambf_v1:模糊查询
ALTER TABLE orders ADD INDEX idx_user_name user_name TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4;
-- 物化列(Materialized Column)
ALTER TABLE orders ADD COLUMN order_date Date MATERIALIZED toDate(create_time);
-- 物化视图(Materialized View)
CREATE MATERIALIZED VIEW order_stats_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, province)
AS SELECT
toDate(create_time) AS date,
province,
count() AS order_count,
sum(amount) AS total_amount
FROM orders
GROUP BY date, province;
4. 查询优化
-- 使用 PREWHERE 过滤(比 WHERE 快)
SELECT *
FROM orders
PREWHERE create_time >= '2024-01-01'
WHERE amount > 100;
-- 避免 SELECT *
SELECT order_id, amount, create_time
FROM orders;
-- 使用 FINAL 去重(ReplacingMergeTree)
SELECT *
FROM user_profile
FINAL
WHERE user_id = 'user123';
-- 分布式查询优化
SELECT province, sum(amount)
FROM orders
WHERE create_time >= today()
GROUP BY province
SETTINGS distributed_aggregation_memory_efficient = 1;
-- 使用物化视图
SELECT date, province, order_count, total_amount
FROM order_stats_mv
WHERE date >= '2024-01-01';
-- 并行查询
SELECT *
FROM orders
SETTINGS max_threads = 8;
5. 写入优化
-- 批量写入(推荐 10000-100000 行)
INSERT INTO orders VALUES
('order1', 'user1', 100.00, '2024-01-01 10:00:00'),
('order2', 'user2', 200.00, '2024-01-01 10:01:00'),
-- ... 更多数据
;
-- 异步写入
INSERT INTO orders SETTINGS async_insert = 1, wait_for_async_insert = 0
VALUES (...);
-- 并行写入(多个客户端同时写入不同分区)
-- 写入性能监控
SELECT
table,
sum(rows) AS total_rows,
sum(bytes) AS total_bytes,
formatReadableSize(total_bytes) AS size,
count() AS parts_count
FROM system.parts
WHERE table = 'orders' AND active
GROUP BY table;
Kafka 性能优化
1. 分区设计
# 创建 Topic(32 个分区,3 个副本)
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic orders \
--partitions 32 \
--replication-factor 3
# 增加分区
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic orders \
--partitions 64
分区数量计算:
- 目标吞吐量 / 单分区吞吐量 = 分区数
- 单分区吞吐量:生产 10-30 MB/s,消费 30-50 MB/s
- 建议:16-64 个分区
2. 生产者优化
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 性能优化参数
props.put("acks", "1"); // 0: 不等待, 1: leader 确认, all: 所有副本确认
props.put("compression.type", "lz4"); // 压缩算法:lz4/snappy/gzip
props.put("batch.size", 16384); // 批次大小 16KB
props.put("linger.ms", 10); // 等待时间 10ms
props.put("buffer.memory", 33554432); // 缓冲区 32MB
props.put("max.in.flight.requests.per.connection", 5); // 最大未确认请求
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 异步发送
producer.send(new ProducerRecord<>("orders", key, value), (metadata, exception) -> {
if (exception != null) {
// 处理异常
}
});
3. 消费者优化
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 性能优化参数
props.put("fetch.min.bytes", 1024); // 最小拉取 1KB
props.put("fetch.max.wait.ms", 500); // 最大等待 500ms
props.put("max.partition.fetch.bytes", 1048576); // 单分区最大 1MB
props.put("max.poll.records", 500); // 单次拉取最大记录数
props.put("enable.auto.commit", false); // 手动提交
props.put("auto.offset.reset", "earliest"); // 从最早开始消费
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
}
// 手动提交
consumer.commitSync();
}
端到端性能优化
1. 延迟优化
| 环节 | 优化措施 | 效果 |
|---|---|---|
| Kafka | 增加分区数、优化网络配置 | 延迟降低 50% |
| Flink | 调整并行度、优化算子 | 延迟降低 30% |
| 存储 | 批量写入、异步写入 | 延迟降低 40% |
| 查询 | 添加索引、物化视图 | 延迟降低 60% |
2. 吞吐优化
| 环节 | 优化措施 | 效果 |
|---|---|---|
| Kafka | 增加分区数、批量发送 | 吞吐提升 2 倍 |
| Flink | 增加并行度、批量处理 | 吞吐提升 3 倍 |
| 存储 | 批量写入、并行写入 | 吞吐提升 5 倍 |
3. 资源优化
| 资源类型 | 优化措施 | 效果 |
|---|---|---|
| CPU | 合理设置并行度 | 利用率提升 30% |
| 内存 | 优化状态管理 | 内存占用降低 40% |
| 网络 | 数据压缩传输 | 网络流量降低 50% |
| 存储 | 数据压缩、冷热分离 | 存储成本降低 60% |
性能优化总结表:
| 组件 | 优化项 | 关键参数 | 效果 |
|---|---|---|---|
| Flink | 并行度 | parallelism | 提升吞吐 |
| Flink | Checkpoint | interval, timeout | 降低延迟 |
| Flink | 状态后端 | RocksDB + 增量 | 大状态支持 |
| ClickHouse | 表引擎 | MergeTree 系列 | 查询加速 |
| ClickHouse | 分区 | 按月/天分区 | 数据管理 |
| ClickHouse | 批量写入 | 10000+ 行 | 写入加速 |
| Kafka | 分区数 | 32-64 | 并行度 |
| Kafka | 压缩 | lz4/snappy | 降低网络 |
| Kafka | 批量发送 | batch.size | 提升吞吐 |
实时数仓监控与运维
监控指标体系
实时数仓监控指标分为系统指标、业务指标和数据质量指标三大类。
核心监控指标表:
| 类别 | 指标 | 阈值 | 告警级别 | 说明 |
|---|---|---|---|---|
| Flink | Task 失败率 | > 1% | P0 | 任务异常 |
| Flink | Checkpoint 失败率 | > 5% | P1 | 状态异常 |
| Flink | 反压率 | > 80% | P1 | 性能瓶颈 |
| Flink | 数据延迟 | > 5min | P1 | 实时性下降 |
| Kafka | 消息堆积 | > 100万 | P1 | 消费滞后 |
| Kafka | 磁盘使用率 | > 80% | P1 | 存储不足 |
| ClickHouse | 查询延迟 | > 3s | P2 | 查询慢 |
| ClickHouse | 写入失败率 | > 1% | P0 | 数据丢失 |
| 业务 | 数据断流 | > 5min | P0 | 数据中断 |
| 业务 | 数据重复率 | > 0.1% | P1 | 数据质量 |
Flink 监控指标采集:
// 自定义 Metrics
public class OrderProcessFunction extends ProcessFunction<Order, Order> {
private transient Counter processedCounter;
private transient Meter processingRate;
private transient Histogram amountHistogram;
private transient Gauge<Long> delayGauge;
@Override
public void open(Configuration parameters) {
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
// Counter:计数器
processedCounter = metricGroup.counter("processed_orders");
// Meter:速率
processingRate = metricGroup.meter("processing_rate", new MeterView(60));
// Histogram:分布
amountHistogram = metricGroup.histogram("order_amount",
new DescriptiveStatisticsHistogram(1000));
// Gauge:实时值
delayGauge = metricGroup.gauge("processing_delay", () -> {
return System.currentTimeMillis() - lastProcessTime;
});
}
@Override
public void processElement(Order order, Context ctx, Collector<Order> out) {
processedCounter.inc();
processingRate.markEvent();
amountHistogram.update(order.getAmount().longValue());
out.collect(order);
}
}
// Prometheus Reporter 配置
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
Prometheus + Grafana 监控:
# prometheus.yml
scrape_configs:
- job_name: 'flink'
static_configs:
- targets: ['flink-jobmanager:9249', 'flink-taskmanager:9249']
- job_name: 'kafka'
static_configs:
- targets: ['kafka-broker:9308']
- job_name: 'clickhouse'
static_configs:
- targets: ['clickhouse:9363']
Grafana 关键面板:
-- Flink 任务状态
flink_jobmanager_job_uptime{job_name="order-processing"}
-- Flink 数据处理速率
rate(flink_taskmanager_job_task_operator_numRecordsIn[1m])
-- Flink Checkpoint 时长
flink_jobmanager_job_lastCheckpointDuration
-- Kafka 消息堆积
kafka_consumergroup_lag{group="order-consumer"}
-- ClickHouse 查询 QPS
rate(ClickHouseProfileEvents_Query[1m])
告警机制设计
告警规则配置:
# alertmanager.yml
groups:
- name: flink_alerts
rules:
- alert: FlinkTaskFailed
expr: flink_jobmanager_job_numRestarts > 3
for: 5m
labels:
severity: critical
annotations:
summary: "Flink 任务频繁重启"
description: "任务 {{ $labels.job_name }} 在 5 分钟内重启超过 3 次"
- alert: FlinkCheckpointFailed
expr: rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "Flink Checkpoint 失败率过高"
description: "任务 {{ $labels.job_name }} Checkpoint 失败率 > 10%"
- alert: FlinkBackpressure
expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 800
for: 10m
labels:
severity: warning
annotations:
summary: "Flink 任务反压"
description: "任务 {{ $labels.job_name }} 反压率 > 80%"
- name: kafka_alerts
rules:
- alert: KafkaConsumerLag
expr: kafka_consumergroup_lag > 1000000
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 消息堆积"
description: "消费者组 {{ $labels.group }} 堆积超过 100 万条"
- alert: KafkaDiskUsage
expr: kafka_server_BrokerTopicMetrics_BytesInPerSec > 0.8
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka 磁盘使用率过高"
description: "Broker {{ $labels.broker }} 磁盘使用率 > 80%"
- name: clickhouse_alerts
rules:
- alert: ClickHouseQuerySlow
expr: histogram_quantile(0.95, rate(ClickHouseProfileEvents_Query[5m])) > 3
for: 5m
labels:
severity: warning
annotations:
summary: "ClickHouse 查询慢"
description: "P95 查询延迟 > 3 秒"
故障排查手册
常见故障及排查方法:
1. Flink 任务频繁重启
# 查看任务日志
kubectl logs -f flink-taskmanager-xxx
# 查看异常堆栈
grep "Exception" flink-taskmanager.log | tail -100
# 查看 Checkpoint 失败原因
curl http://jobmanager:8081/jobs/<job-id>/checkpoints
常见原因:
- OOM:增加内存或优化代码
- 网络超时:增加超时时间
- 外部依赖故障:增加重试机制
- 数据倾斜:重新分区
2. Kafka 消息堆积
# 查看消费者组延迟
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-consumer
# 查看 Topic 详情
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders
解决方案:
- 增加消费者并行度
- 优化消费逻辑
- 增加 Kafka 分区数
- 临时增加消费者实例
3. ClickHouse 查询慢
-- 查看慢查询
SELECT
query,
query_duration_ms,
read_rows,
read_bytes,
memory_usage
FROM system.query_log
WHERE query_duration_ms > 3000
ORDER BY query_duration_ms DESC
LIMIT 10;
-- 查看正在执行的查询
SELECT
query_id,
user,
query,
elapsed,
read_rows,
memory_usage
FROM system.processes;
-- Kill 慢查询
KILL QUERY WHERE query_id = 'xxx';
优化方案:
- 添加 PREWHERE 过滤
- 使用物化视图
- 优化分区裁剪
- 增加索引
4. 数据延迟过高
排查流程:
- 检查 Kafka 堆积情况
- 检查 Flink 反压情况
- 检查 Sink 写入性能
- 检查网络和外部依赖
端到端延迟监控:
// 在数据中添加时间戳
public class TimestampedOrder {
private Order order;
private long sourceTimestamp; // 数据产生时间
private long kafkaTimestamp; // Kafka 接收时间
private long processTimestamp; // Flink 处理时间
private long sinkTimestamp; // Sink 写入时间
}
// 计算各阶段延迟
public class LatencyMonitorFunction extends ProcessFunction<Order, Order> {
private transient Histogram kafkaLatency;
private transient Histogram processLatency;
private transient Histogram sinkLatency;
@Override
public void processElement(Order order, Context ctx, Collector<Order> out) {
long now = System.currentTimeMillis();
// Kafka 延迟
kafkaLatency.update(now - order.getKafkaTimestamp());
// 处理延迟
processLatency.update(now - order.getSourceTimestamp());
order.setProcessTimestamp(now);
out.collect(order);
}
}
数据质量保障
数据质量保障是实时数仓的重要环节,包括数据校验、数据对账、数据修复。
1. 数据校验
// 数据完整性校验
public class DataValidationFunction extends ProcessFunction<Order, Order> {
private transient Counter validCounter;
private transient Counter invalidCounter;
@Override
public void processElement(Order order, Context ctx, Collector<Order> out) {
List<String> errors = new ArrayList<>();
// 必填字段校验
if (order.getOrderId() == null || order.getOrderId().isEmpty()) {
errors.add("订单 ID 为空");
}
// 数值范围校验
if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
errors.add("订单金额必须大于 0");
}
// 枚举值校验
if (!Arrays.asList("PENDING", "PAID", "CANCELLED").contains(order.getStatus())) {
errors.add("订单状态非法");
}
// 时间合理性校验
if (order.getCreateTime().isAfter(LocalDateTime.now())) {
errors.add("订单时间不能是未来时间");
}
if (errors.isEmpty()) {
validCounter.inc();
out.collect(order);
} else {
invalidCounter.inc();
log.error("数据校验失败: orderId={}, errors={}",
order.getOrderId(), errors);
ctx.output(errorOutputTag, order);
}
}
}
2. 数据对账
-- 实时数仓 vs 离线数仓对账
-- 实时数仓(ClickHouse)
SELECT
toDate(create_time) AS date,
count() AS order_count,
sum(amount) AS total_amount
FROM dwd_order
WHERE date = today()
GROUP BY date;
-- 离线数仓(Hive)
SELECT
dt AS date,
count(*) AS order_count,
sum(amount) AS total_amount
FROM dwd_order
WHERE dt = '${date}'
GROUP BY dt;
-- 对账差异分析
SELECT
a.date,
a.order_count AS realtime_count,
b.order_count AS batch_count,
a.order_count - b.order_count AS diff_count,
(a.order_count - b.order_count) * 100.0 / b.order_count AS diff_rate
FROM realtime_stats a
JOIN batch_stats b ON a.date = b.date
WHERE abs(diff_rate) > 1; -- 差异超过 1%
3. 数据修复
// 数据回溯修复
public class DataRepairJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从 Kafka 指定 offset 开始消费
Map<TopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new TopicPartition("orders", 0), 1000000L);
specificStartOffsets.put(new TopicPartition("orders", 1), 1000000L);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"orders",
new SimpleStringSchema(),
properties
);
consumer.setStartFromSpecificOffsets(specificStartOffsets);
DataStream<Order> stream = env
.addSource(consumer)
.map(new OrderMapper())
.filter(order -> {
// 只处理需要修复的数据
return order.getCreateTime().isAfter(repairStartTime) &&
order.getCreateTime().isBefore(repairEndTime);
});
// 重新处理并写入
stream.addSink(new ClickHouseSink<>(...));
env.execute("Data Repair Job");
}
}
运维自动化
运维最佳实践:
| 实践 | 说明 | 工具 |
|---|---|---|
| 监控告警 | 7x24 小时监控,及时告警 | Prometheus + Grafana + AlertManager |
| 日志收集 | 集中式日志管理 | ELK/Loki |
| 链路追踪 | 端到端链路追踪 | Jaeger/Zipkin |
| 自动化运维 | 自动扩缩容、故障自愈 | Kubernetes + Operator |
| 灰度发布 | 降低发布风险 | Argo Rollouts |
| 数据备份 | 定期备份 Checkpoint 和数据 | HDFS/S3 |
| 应急预案 | 故障应急流程 | Runbook |
自动化运维脚本示例:
#!/bin/bash
# Flink 任务健康检查脚本
FLINK_REST_URL="http://flink-jobmanager:8081"
ALERT_WEBHOOK="https://hooks.dingtalk.com/robot/send?access_token=xxx"
# 获取所有运行中的任务
jobs=$(curl -s "${FLINK_REST_URL}/jobs/overview" | jq -r '.jobs[] | select(.state=="RUNNING")')
if [ -z "$jobs" ]; then
echo "警告:没有运行中的 Flink 任务"
send_alert "Flink 任务全部停止"
exit 1
fi
# 检查每个任务的 Checkpoint 状态
echo "$jobs" | jq -r '.jid' | while read job_id; do
job_name=$(echo "$jobs" | jq -r "select(.jid==\"$job_id\") | .name")
# 获取最近的 Checkpoint 信息
checkpoint_info=$(curl -s "${FLINK_REST_URL}/jobs/${job_id}/checkpoints")
# 检查最近 Checkpoint 是否成功
last_status=$(echo "$checkpoint_info" | jq -r '.latest.completed.status // "NONE"')
if [ "$last_status" != "COMPLETED" ]; then
echo "警告:任务 ${job_name} 最近 Checkpoint 失败"
send_alert "任务 ${job_name} Checkpoint 异常: ${last_status}"
fi
# 检查 Checkpoint 时长
last_duration=$(echo "$checkpoint_info" | jq -r '.latest.completed.duration // 0')
if [ "$last_duration" -gt 300000 ]; then # 超过 5 分钟
echo "警告:任务 ${job_name} Checkpoint 耗时过长: ${last_duration}ms"
send_alert "任务 ${job_name} Checkpoint 耗时 ${last_duration}ms"
fi
done
# 检查 TaskManager 存活
tm_count=$(curl -s "${FLINK_REST_URL}/taskmanagers" | jq '.taskmanagers | length')
expected_tm=8
if [ "$tm_count" -lt "$expected_tm" ]; then
echo "警告:TaskManager 数量不足: ${tm_count}/${expected_tm}"
send_alert "TaskManager 数量不足: ${tm_count}/${expected_tm}"
fi
echo "健康检查完成: $(date)"
Flink 任务自动恢复:
// Kubernetes Operator 自动恢复配置
@Component
public class FlinkJobRecoveryService {
@Autowired
private FlinkRestClient flinkClient;
@Autowired
private KubernetesClient k8sClient;
// 定期检查任务状态
@Scheduled(fixedRate = 30000) // 每 30 秒
public void checkAndRecover() {
List<FlinkJob> jobs = flinkClient.listJobs();
for (FlinkJob job : jobs) {
if ("FAILED".equals(job.getState())) {
log.warn("检测到失败任务: {}", job.getName());
// 获取最近的 Savepoint
String savepointPath = flinkClient.getLatestSavepoint(job.getJobId());
if (savepointPath != null) {
// 从 Savepoint 恢复
log.info("从 Savepoint 恢复任务: {}, path: {}",
job.getName(), savepointPath);
flinkClient.restoreFromSavepoint(job.getJobId(), savepointPath);
} else {
// 从 Checkpoint 恢复
String checkpointPath = flinkClient.getLatestCheckpoint(job.getJobId());
if (checkpointPath != null) {
log.info("从 Checkpoint 恢复任务: {}", job.getName());
flinkClient.restoreFromCheckpoint(job.getJobId(), checkpointPath);
} else {
log.error("无法恢复任务: {},无可用的 Savepoint/Checkpoint",
job.getName());
alertService.sendCriticalAlert("任务无法自动恢复: " + job.getName());
}
}
}
}
}
}
日志收集与分析(Loki + Grafana):
# promtail 配置 - 采集 Flink 日志
server:
http_listen_port: 9080
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: flink-logs
static_configs:
- targets:
- localhost
labels:
job: flink
__path__: /opt/flink/log/*.log
pipeline_stages:
- regex:
expression: '(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>\w+) (?P<message>.*)'
- labels:
level:
- timestamp:
source: timestamp
format: '2006-01-02 15:04:05,000'
实时数仓实战案例
电商实时大屏
**业务场景:**双十一实时大屏,展示实时 GMV、订单量、UV 等核心指标。
技术架构:
- 数据采集:Canal CDC
- 消息队列:Kafka
- 实时计算:Flink
- 存储:ClickHouse(明细)+ Redis(实时指标)
- 展示:DataV/Grafana
核心指标计算:
// 实时 GMV 计算
public class RealtimeGMVJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取订单流
DataStream<Order> orderStream = env
.addSource(new FlinkKafkaConsumer<>("ods_order", ...))
.filter(order -> "PAID".equals(order.getStatus()));
// 1分钟滚动窗口聚合
DataStream<GMVStats> gmvStream = orderStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, ts) -> order.getCreateTime())
)
.keyBy(order -> "global") // 全局聚合
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AggregateFunction<Order, GMVAccumulator, GMVStats>() {
@Override
public GMVAccumulator createAccumulator() {
return new GMVAccumulator();
}
@Override
public GMVAccumulator add(Order order, GMVAccumulator acc) {
acc.orderCount++;
acc.totalAmount = acc.totalAmount.add(order.getAmount());
acc.userSet.add(order.getUserId());
return acc;
}
@Override
public GMVStats getResult(GMVAccumulator acc) {
return GMVStats.builder()
.windowEnd(System.currentTimeMillis())
.gmv(acc.totalAmount)
.orderCount(acc.orderCount)
.userCount(acc.userSet.size())
.avgOrderAmount(acc.totalAmount.divide(
BigDecimal.valueOf(acc.orderCount), 2, RoundingMode.HALF_UP))
.build();
}
@Override
public GMVAccumulator merge(GMVAccumulator a, GMVAccumulator b) {
a.orderCount += b.orderCount;
a.totalAmount = a.totalAmount.add(b.totalAmount);
a.userSet.addAll(b.userSet);
return a;
}
});
// 写入 Redis
gmvStream.addSink(new RedisSink<>(...));
env.execute("Realtime GMV Job");
}
}
大屏 API 接口:
@RestController
@RequestMapping("/api/realtime")
public class RealtimeDashboardController {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private ClickHouseJdbcTemplate clickHouse;
// 实时 GMV
@GetMapping("/gmv")
public GMVStats getRealtimeGMV() {
String json = redisTemplate.opsForValue().get("realtime:gmv:latest");
return JSON.parseObject(json, GMVStats.class);
}
// 实时订单趋势(最近 1 小时)
@GetMapping("/order-trend")
public List<OrderTrend> getOrderTrend() {
String sql = """
SELECT
toStartOfMinute(create_time) AS time,
count() AS order_count,
sum(amount) AS total_amount
FROM dwd_order
WHERE create_time >= now() - INTERVAL 1 HOUR
GROUP BY time
ORDER BY time
""";
return clickHouse.query(sql, new OrderTrendRowMapper());
}
// TOP 省份排行
@GetMapping("/top-provinces")
public List<ProvinceStats> getTopProvinces(@RequestParam(defaultValue = "10") int limit) {
String sql = """
SELECT
province,
count() AS order_count,
sum(amount) AS total_amount
FROM dwd_order
WHERE create_time >= today()
GROUP BY province
ORDER BY total_amount DESC
LIMIT ?
""";
return clickHouse.query(sql, new ProvinceStatsRowMapper(), limit);
}
}
实时风控系统
**业务场景:**实时识别异常交易、欺诈行为,毫秒级响应。
风控规则:
| 规则 | 描述 | 阈值 | 动作 |
|---|---|---|---|
| 高频交易 | 1 分钟内交易次数 | > 10 次 | 拦截 |
| 大额交易 | 单笔交易金额 | > 10000 元 | 人工审核 |
| 异地登录 | 短时间内异地登录 | < 1 小时 | 二次验证 |
| 设备异常 | 新设备首次交易 | - | 短信验证 |
| 黑名单 | 命中黑名单 | - | 拒绝 |
实时风控实现:
public class RealTimeRiskControlJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取交易流
DataStream<Transaction> txnStream = env
.addSource(new FlinkKafkaConsumer<>("transactions", ...));
// 规则 1:高频交易检测
DataStream<RiskAlert> highFreqAlerts = txnStream
.keyBy(Transaction::getUserId)
.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
.aggregate(new CountAggregateFunction())
.filter(count -> count > 10)
.map(count -> RiskAlert.builder()
.userId(count.getUserId())
.riskType("HIGH_FREQUENCY")
.riskLevel("HIGH")
.action("BLOCK")
.build());
// 规则 2:大额交易检测
DataStream<RiskAlert> largeAmountAlerts = txnStream
.filter(txn -> txn.getAmount().compareTo(new BigDecimal("10000")) > 0)
.map(txn -> RiskAlert.builder()
.userId(txn.getUserId())
.transactionId(txn.getTxnId())
.riskType("LARGE_AMOUNT")
.riskLevel("MEDIUM")
.action("MANUAL_REVIEW")
.build());
// 规则 3:异地登录检测(基于状态)
DataStream<RiskAlert> locationAlerts = txnStream
.keyBy(Transaction::getUserId)
.process(new LocationCheckFunction());
// 规则 4:黑名单检测(异步 IO)
DataStream<RiskAlert> blacklistAlerts = AsyncDataStream
.unorderedWait(
txnStream,
new AsyncBlacklistCheckFunction(),
1000,
TimeUnit.MILLISECONDS
)
.filter(Objects::nonNull);
// 合并所有告警
DataStream<RiskAlert> allAlerts = highFreqAlerts
.union(largeAmountAlerts)
.union(locationAlerts)
.union(blacklistAlerts);
// 写入 Redis(实时查询)
allAlerts.addSink(new RedisSink<>(...));
// 写入 Kafka(下游处理)
allAlerts.addSink(new FlinkKafkaProducer<>("risk-alerts", ...));
env.execute("Real-Time Risk Control Job");
}
}
实时推荐系统
**业务场景:**根据用户实时行为,动态更新用户画像,实时推荐商品。
实时特征计算:
public class RealtimeFeatureJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取用户行为流
DataStream<UserBehavior> behaviorStream = env
.addSource(new FlinkKafkaConsumer<>("user-behavior", ...));
// 实时特征计算
DataStream<UserFeature> featureStream = behaviorStream
.keyBy(UserBehavior::getUserId)
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.aggregate(new AggregateFunction<UserBehavior, FeatureAccumulator, UserFeature>() {
@Override
public FeatureAccumulator createAccumulator() {
return new FeatureAccumulator();
}
@Override
public FeatureAccumulator add(UserBehavior behavior, FeatureAccumulator acc) {
// 统计各类行为次数
switch (behavior.getType()) {
case "VIEW":
acc.viewCount++;
acc.viewedCategories.add(behavior.getCategoryId());
break;
case "CLICK":
acc.clickCount++;
break;
case "CART":
acc.cartCount++;
break;
case "BUY":
acc.buyCount++;
acc.totalAmount = acc.totalAmount.add(behavior.getAmount());
break;
}
return acc;
}
@Override
public UserFeature getResult(FeatureAccumulator acc) {
return UserFeature.builder()
.userId(acc.userId)
.viewCount(acc.viewCount)
.clickCount(acc.clickCount)
.cartCount(acc.cartCount)
.buyCount(acc.buyCount)
.totalAmount(acc.totalAmount)
.clickRate(acc.clickCount * 1.0 / acc.viewCount)
.conversionRate(acc.buyCount * 1.0 / acc.clickCount)
.preferredCategories(new ArrayList<>(acc.viewedCategories))
.build();
}
@Override
public FeatureAccumulator merge(FeatureAccumulator a, FeatureAccumulator b) {
a.viewCount += b.viewCount;
a.clickCount += b.clickCount;
a.cartCount += b.cartCount;
a.buyCount += b.buyCount;
a.totalAmount = a.totalAmount.add(b.totalAmount);
a.viewedCategories.addAll(b.viewedCategories);
return a;
}
});
// 写入 HBase(用户画像)
featureStream.addSink(new HBaseSink<>(...));
// 写入 Redis(实时查询)
featureStream.addSink(new RedisSink<>(...));
env.execute("Realtime Feature Job");
}
}
加密货币实时行情数仓(Kafka + Flink + Doris)
**业务场景:**构建加密货币实时行情数据仓库,支持实时行情展示、技术指标计算、交易信号生成、风险监控等功能。
业务需求:
| 需求类型 | 具体需求 | 延迟要求 | 数据量 |
|---|---|---|---|
| 实时行情 | 价格、成交量、涨跌幅实时展示 | < 1 秒 | 10 万笔/秒 |
| 技术指标 | MA、EMA、MACD、RSI、布林带 | < 5 秒 | 计算密集 |
| 交易信号 | 金叉死叉、突破信号、背离信号 | < 3 秒 | 实时触发 |
| 风险监控 | 异常波动、大额交易、流动性监控 | < 2 秒 | 实时告警 |
| 历史回测 | 策略回测、绩效分析 | 分钟级 | TB 级历史数据 |
技术架构:
实时行情"] A2["OKX WebSocket
实时行情"] A3["Coinbase WebSocket
实时行情"] A4["历史数据 API
补全数据"] end subgraph "数据采集层" B1["Flink Source
WebSocket 连接器"] B2["数据标准化
统一格式"] end subgraph "消息队列层" C1["Kafka Topic
ods_crypto_tick
原始 Tick 数据"] C2["Kafka Topic
ods_crypto_kline
K 线数据"] C3["Kafka Topic
ods_crypto_trade
成交数据"] end subgraph "实时计算层 Flink" D1["DWD 层
数据清洗
去重、过滤"] D2["DWS 层
K 线聚合
1m/5m/15m/1h"] D3["DWS 层
技术指标
MA/EMA/MACD/RSI"] D4["ADS 层
交易信号
金叉死叉/突破"] D5["ADS 层
风险监控
异常检测"] end subgraph "存储层" E1["Doris
实时 OLAP
多维分析"] E2["Redis
实时行情
最新价格"] E3["ClickHouse
历史数据
冷数据归档"] end subgraph "应用层" F1["实时行情大屏
价格/成交量"] F2["交易策略系统
信号生成"] F3["风控告警系统
异常监控"] F4["数据分析平台
回测/研究"] end A1 --> B1 A2 --> B1 A3 --> B1 A4 --> B2 B1 --> C1 B1 --> C2 B1 --> C3 B2 --> C1 C1 --> D1 C2 --> D1 C3 --> D1 D1 --> D2 D2 --> D3 D3 --> D4 D3 --> D5 D2 --> E1 D3 --> E1 D4 --> E1 D5 --> E2 D2 --> E3 E1 --> F1 E1 --> F2 E2 --> F3 E1 --> F4 E3 --> F4 style C1 fill:#ffd43b style D2 fill:#4dabf7 style D3 fill:#4dabf7 style E1 fill:#51cf66 style E2 fill:#ff6b6b
数据模型设计:
ODS 层(原始数据层):
-- Kafka Topic: ods_crypto_tick(Tick 数据)
{
"exchange": "binance", -- 交易所
"symbol": "BTCUSDT", -- 交易对
"timestamp": 1709827200000, -- 时间戳(毫秒)
"price": 68500.50, -- 最新价
"volume": 1.25, -- 成交量
"bid_price": 68500.00, -- 买一价
"ask_price": 68501.00, -- 卖一价
"bid_volume": 5.2, -- 买一量
"ask_volume": 3.8 -- 卖一量
}
-- Kafka Topic: ods_crypto_kline(K 线数据)
{
"exchange": "binance",
"symbol": "BTCUSDT",
"interval": "1m", -- K 线周期
"open_time": 1709827200000,
"close_time": 1709827259999,
"open": 68500.00, -- 开盘价
"high": 68550.00, -- 最高价
"low": 68480.00, -- 最低价
"close": 68520.00, -- 收盘价
"volume": 125.5, -- 成交量
"quote_volume": 8598750.00, -- 成交额
"trades": 1250 -- 成交笔数
}
DWD 层(明细数据层):
-- Doris 表:dwd_crypto_tick_realtime
CREATE TABLE dwd_crypto_tick_realtime (
exchange VARCHAR(20),
symbol VARCHAR(20),
tick_time DATETIME,
price DECIMAL(18, 8),
volume DECIMAL(18, 8),
bid_price DECIMAL(18, 8),
ask_price DECIMAL(18, 8),
spread DECIMAL(18, 8), -- 买卖价差
mid_price DECIMAL(18, 8), -- 中间价
create_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
DUPLICATE KEY(exchange, symbol, tick_time)
DISTRIBUTED BY HASH(symbol) BUCKETS 32
PROPERTIES (
"replication_num" = "3",
"storage_medium" = "SSD",
"compression" = "LZ4"
);
DWS 层(汇总数据层):
-- Doris 表:dws_crypto_kline_1m(1 分钟 K 线)
CREATE TABLE dws_crypto_kline_1m (
exchange VARCHAR(20),
symbol VARCHAR(20),
kline_time DATETIME,
open_price DECIMAL(18, 8),
high_price DECIMAL(18, 8),
low_price DECIMAL(18, 8),
close_price DECIMAL(18, 8),
volume DECIMAL(18, 8),
quote_volume DECIMAL(18, 8),
trades INT,
-- 技术指标
ma5 DECIMAL(18, 8), -- 5 周期均线
ma10 DECIMAL(18, 8), -- 10 周期均线
ma20 DECIMAL(18, 8), -- 20 周期均线
ema12 DECIMAL(18, 8), -- 12 周期指数移动平均
ema26 DECIMAL(18, 8), -- 26 周期指数移动平均
macd DECIMAL(18, 8), -- MACD 值
signal DECIMAL(18, 8), -- 信号线
histogram DECIMAL(18, 8), -- 柱状图
rsi DECIMAL(18, 8), -- RSI 指标
create_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
DUPLICATE KEY(exchange, symbol, kline_time)
DISTRIBUTED BY HASH(symbol) BUCKETS 32
PROPERTIES (
"replication_num" = "3"
);
-- Doris 物化视图:实时涨跌幅
CREATE MATERIALIZED VIEW mv_crypto_change_24h
AS
SELECT
symbol,
MAX(close_price) as high_24h,
MIN(close_price) as low_24h,
FIRST_VALUE(close_price) as open_24h,
LAST_VALUE(close_price) as close_24h,
(LAST_VALUE(close_price) - FIRST_VALUE(close_price)) / FIRST_VALUE(close_price) * 100 as change_pct_24h
FROM dws_crypto_kline_1m
WHERE kline_time >= NOW() - INTERVAL 24 HOUR
GROUP BY symbol;
ADS 层(应用数据层):
-- Doris 表:ads_crypto_signal(交易信号)
CREATE TABLE ads_crypto_signal (
signal_id VARCHAR(64),
exchange VARCHAR(20),
symbol VARCHAR(20),
signal_type VARCHAR(20), -- GOLDEN_CROSS, DEATH_CROSS, BREAKOUT, RSI_OVERSOLD
signal_time DATETIME,
price DECIMAL(18, 8),
strength DECIMAL(5, 2), -- 信号强度 0-100
description TEXT,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
DUPLICATE KEY(signal_id)
DISTRIBUTED BY HASH(symbol) BUCKETS 16;
核心实现代码:
1. WebSocket 数据采集(Flink Source)
// Binance WebSocket 数据源
public class BinanceWebSocketSource extends RichSourceFunction<CryptoTick> {
private volatile boolean running = true;
private WebSocketClient client;
private final List<String> symbols;
public BinanceWebSocketSource(List<String> symbols) {
this.symbols = symbols;
}
@Override
public void open(Configuration parameters) throws Exception {
// 构建 WebSocket URL
String streamNames = symbols.stream()
.map(s -> s.toLowerCase() + "@ticker")
.collect(Collectors.joining("/"));
String wsUrl = "wss://stream.binance.com:9443/stream?streams=" + streamNames;
// 创建 WebSocket 客户端
client = new WebSocketClient(new URI(wsUrl)) {
@Override
public void onMessage(String message) {
try {
CryptoTick tick = parseMessage(message);
sourceContext.collect(tick);
} catch (Exception e) {
log.error("Failed to parse message: {}", message, e);
}
}
@Override
public void onError(Exception ex) {
log.error("WebSocket error", ex);
// 重连逻辑
reconnect();
}
};
client.connect();
}
@Override
public void run(SourceContext<CryptoTick> ctx) throws Exception {
while (running) {
Thread.sleep(1000); // 保持连接
}
}
@Override
public void cancel() {
running = false;
if (client != null) {
client.close();
}
}
private CryptoTick parseMessage(String message) {
JSONObject json = JSON.parseObject(message);
JSONObject data = json.getJSONObject("data");
return CryptoTick.builder()
.exchange("binance")
.symbol(data.getString("s"))
.timestamp(data.getLong("E"))
.price(data.getBigDecimal("c"))
.volume(data.getBigDecimal("v"))
.bidPrice(data.getBigDecimal("b"))
.askPrice(data.getBigDecimal("a"))
.build();
}
private void reconnect() {
try {
Thread.sleep(5000); // 等待 5 秒
client.reconnect();
} catch (Exception e) {
log.error("Reconnect failed", e);
}
}
}
2. K 线聚合计算(Flink 窗口)
// 1 分钟 K 线聚合
public class KLineAggregateJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
// 读取 Tick 数据
DataStream<CryptoTick> tickStream = env
.addSource(new FlinkKafkaConsumer<>("ods_crypto_tick", ...))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<CryptoTick>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((tick, ts) -> tick.getTimestamp())
);
// 1 分钟滚动窗口聚合
DataStream<KLine> klineStream = tickStream
.keyBy(tick -> tick.getExchange() + "_" + tick.getSymbol())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new KLineAggregateFunction());
// 写入 Kafka
klineStream.addSink(new FlinkKafkaProducer<>("ods_crypto_kline", ...));
// 写入 Doris
klineStream.addSink(DorisSink.<KLine>builder()
.setDorisOptions(DorisOptions.builder()
.setFenodes("doris-fe:8030")
.setTableIdentifier("crypto_db.dws_crypto_kline_1m")
.setUsername("root")
.setPassword("password")
.build())
.setSerializer(new KLineDorisSerializer())
.build());
env.execute("KLine Aggregate Job");
}
}
// K 线聚合函数
public class KLineAggregateFunction
implements AggregateFunction<CryptoTick, KLineAccumulator, KLine> {
@Override
public KLineAccumulator createAccumulator() {
return new KLineAccumulator();
}
@Override
public KLineAccumulator add(CryptoTick tick, KLineAccumulator acc) {
if (acc.open == null) {
acc.open = tick.getPrice();
acc.openTime = tick.getTimestamp();
}
acc.high = acc.high == null ? tick.getPrice() :
acc.high.max(tick.getPrice());
acc.low = acc.low == null ? tick.getPrice() :
acc.low.min(tick.getPrice());
acc.close = tick.getPrice();
acc.closeTime = tick.getTimestamp();
acc.volume = acc.volume.add(tick.getVolume());
acc.trades++;
acc.exchange = tick.getExchange();
acc.symbol = tick.getSymbol();
return acc;
}
@Override
public KLine getResult(KLineAccumulator acc) {
return KLine.builder()
.exchange(acc.exchange)
.symbol(acc.symbol)
.interval("1m")
.openTime(acc.openTime)
.closeTime(acc.closeTime)
.open(acc.open)
.high(acc.high)
.low(acc.low)
.close(acc.close)
.volume(acc.volume)
.trades(acc.trades)
.build();
}
@Override
public KLineAccumulator merge(KLineAccumulator a, KLineAccumulator b) {
a.high = a.high.max(b.high);
a.low = a.low.min(b.low);
a.close = b.close;
a.closeTime = b.closeTime;
a.volume = a.volume.add(b.volume);
a.trades += b.trades;
return a;
}
}
3. 技术指标计算(Flink 状态)
// 技术指标计算任务
public class TechnicalIndicatorJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取 K 线数据
DataStream<KLine> klineStream = env
.addSource(new FlinkKafkaConsumer<>("ods_crypto_kline", ...));
// 计算技术指标
DataStream<KLineWithIndicators> indicatorStream = klineStream
.keyBy(kline -> kline.getExchange() + "_" + kline.getSymbol())
.process(new TechnicalIndicatorProcessFunction());
// 写入 Doris
indicatorStream.addSink(DorisSink.<KLineWithIndicators>builder()
.setDorisOptions(...)
.build());
env.execute("Technical Indicator Job");
}
}
// 技术指标计算函数
public class TechnicalIndicatorProcessFunction
extends KeyedProcessFunction<String, KLine, KLineWithIndicators> {
// 状态:存储历史 K 线数据
private ListState<BigDecimal> priceHistory;
private ValueState<BigDecimal> ema12State;
private ValueState<BigDecimal> ema26State;
@Override
public void open(Configuration parameters) {
// 初始化状态
priceHistory = getRuntimeContext().getListState(
new ListStateDescriptor<>("price-history", BigDecimal.class));
ema12State = getRuntimeContext().getState(
new ValueStateDescriptor<>("ema12", BigDecimal.class));
ema26State = getRuntimeContext().getState(
new ValueStateDescriptor<>("ema26", BigDecimal.class));
}
@Override
public void processElement(KLine kline, Context ctx,
Collector<KLineWithIndicators> out) throws Exception {
// 添加当前价格到历史
priceHistory.add(kline.getClose());
// 获取历史价格列表
List<BigDecimal> prices = new ArrayList<>();
priceHistory.get().forEach(prices::add);
// 只保留最近 100 个价格
if (prices.size() > 100) {
prices = prices.subList(prices.size() - 100, prices.size());
priceHistory.clear();
prices.forEach(p -> {
try {
priceHistory.add(p);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
// 计算均线
BigDecimal ma5 = calculateMA(prices, 5);
BigDecimal ma10 = calculateMA(prices, 10);
BigDecimal ma20 = calculateMA(prices, 20);
// 计算 EMA
BigDecimal ema12 = calculateEMA(kline.getClose(), ema12State.value(), 12);
BigDecimal ema26 = calculateEMA(kline.getClose(), ema26State.value(), 26);
ema12State.update(ema12);
ema26State.update(ema26);
// 计算 MACD
BigDecimal macd = ema12.subtract(ema26);
// 计算 RSI
BigDecimal rsi = calculateRSI(prices, 14);
// 输出结果
out.collect(KLineWithIndicators.builder()
.kline(kline)
.ma5(ma5)
.ma10(ma10)
.ma20(ma20)
.ema12(ema12)
.ema26(ema26)
.macd(macd)
.rsi(rsi)
.build());
}
// 计算简单移动平均
private BigDecimal calculateMA(List<BigDecimal> prices, int period) {
if (prices.size() < period) {
return BigDecimal.ZERO;
}
List<BigDecimal> recentPrices = prices.subList(prices.size() - period, prices.size());
BigDecimal sum = recentPrices.stream()
.reduce(BigDecimal.ZERO, BigDecimal::add);
return sum.divide(BigDecimal.valueOf(period), 8, RoundingMode.HALF_UP);
}
// 计算指数移动平均
private BigDecimal calculateEMA(BigDecimal currentPrice, BigDecimal prevEMA, int period) {
if (prevEMA == null) {
return currentPrice;
}
BigDecimal multiplier = BigDecimal.valueOf(2.0 / (period + 1));
return currentPrice.subtract(prevEMA)
.multiply(multiplier)
.add(prevEMA);
}
// 计算 RSI
private BigDecimal calculateRSI(List<BigDecimal> prices, int period) {
if (prices.size() < period + 1) {
return BigDecimal.valueOf(50);
}
BigDecimal gains = BigDecimal.ZERO;
BigDecimal losses = BigDecimal.ZERO;
for (int i = prices.size() - period; i < prices.size(); i++) {
BigDecimal change = prices.get(i).subtract(prices.get(i - 1));
if (change.compareTo(BigDecimal.ZERO) > 0) {
gains = gains.add(change);
} else {
losses = losses.add(change.abs());
}
}
BigDecimal avgGain = gains.divide(BigDecimal.valueOf(period), 8, RoundingMode.HALF_UP);
BigDecimal avgLoss = losses.divide(BigDecimal.valueOf(period), 8, RoundingMode.HALF_UP);
if (avgLoss.compareTo(BigDecimal.ZERO) == 0) {
return BigDecimal.valueOf(100);
}
BigDecimal rs = avgGain.divide(avgLoss, 8, RoundingMode.HALF_UP);
return BigDecimal.valueOf(100).subtract(
BigDecimal.valueOf(100).divide(
BigDecimal.ONE.add(rs), 8, RoundingMode.HALF_UP));
}
}
4. 交易信号生成(Flink CEP)
// 交易信号检测任务
public class TradingSignalJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取带指标的 K 线数据
DataStream<KLineWithIndicators> klineStream = env
.addSource(new FlinkKafkaConsumer<>("dws_crypto_kline_indicator", ...));
// 金叉信号检测(MA5 上穿 MA10)
DataStream<TradingSignal> goldenCrossSignals = klineStream
.keyBy(k -> k.getSymbol())
.process(new GoldenCrossDetector());
// RSI 超卖信号检测(RSI < 30)
DataStream<TradingSignal> oversoldSignals = klineStream
.filter(k -> k.getRsi().compareTo(BigDecimal.valueOf(30)) < 0)
.map(k -> TradingSignal.builder()
.signalType("RSI_OVERSOLD")
.symbol(k.getSymbol())
.price(k.getClose())
.strength(BigDecimal.valueOf(100).subtract(k.getRsi()))
.description("RSI 超卖信号,RSI=" + k.getRsi())
.build());
// 突破信号检测(价格突破 20 日均线)
DataStream<TradingSignal> breakoutSignals = klineStream
.keyBy(k -> k.getSymbol())
.process(new BreakoutDetector());
// 合并所有信号
DataStream<TradingSignal> allSignals = goldenCrossSignals
.union(oversoldSignals)
.union(breakoutSignals);
// 写入 Doris
allSignals.addSink(DorisSink.<TradingSignal>builder()
.setDorisOptions(...)
.build());
// 写入 Redis(实时查询)
allSignals.addSink(new RedisSink<>(...));
// 发送告警(Kafka)
allSignals
.filter(s -> s.getStrength().compareTo(BigDecimal.valueOf(80)) > 0)
.addSink(new FlinkKafkaProducer<>("trading-alerts", ...));
env.execute("Trading Signal Job");
}
}
// 金叉检测器
public class GoldenCrossDetector
extends KeyedProcessFunction<String, KLineWithIndicators, TradingSignal> {
private ValueState<Boolean> prevCrossState;
@Override
public void open(Configuration parameters) {
prevCrossState = getRuntimeContext().getState(
new ValueStateDescriptor<>("prev-cross", Boolean.class));
}
@Override
public void processElement(KLineWithIndicators kline, Context ctx,
Collector<TradingSignal> out) throws Exception {
BigDecimal ma5 = kline.getMa5();
BigDecimal ma10 = kline.getMa10();
if (ma5 == null || ma10 == null) {
return;
}
// 当前是否金叉(MA5 > MA10)
boolean currentCross = ma5.compareTo(ma10) > 0;
Boolean prevCross = prevCrossState.value();
// 检测金叉(从下方穿过到上方)
if (prevCross != null && !prevCross && currentCross) {
BigDecimal strength = ma5.subtract(ma10)
.divide(ma10, 4, RoundingMode.HALF_UP)
.multiply(BigDecimal.valueOf(100));
out.collect(TradingSignal.builder()
.signalType("GOLDEN_CROSS")
.symbol(kline.getSymbol())
.price(kline.getClose())
.strength(strength)
.description("MA5 金叉 MA10")
.build());
}
// 检测死叉(从上方穿过到下方)
if (prevCross != null && prevCross && !currentCross) {
BigDecimal strength = ma10.subtract(ma5)
.divide(ma10, 4, RoundingMode.HALF_UP)
.multiply(BigDecimal.valueOf(100));
out.collect(TradingSignal.builder()
.signalType("DEATH_CROSS")
.symbol(kline.getSymbol())
.price(kline.getClose())
.strength(strength)
.description("MA5 死叉 MA10")
.build());
}
prevCrossState.update(currentCross);
}
}
5. 实时查询 API(Spring Boot + Doris)
@RestController
@RequestMapping("/api/crypto")
public class CryptoDataController {
@Autowired
private DorisTemplate dorisTemplate;
@Autowired
private RedisTemplate<String, String> redisTemplate;
// 获取实时价格
@GetMapping("/price/{symbol}")
public CryptoPrice getRealtimePrice(@PathVariable String symbol) {
String key = "crypto:price:" + symbol;
String json = redisTemplate.opsForValue().get(key);
if (json != null) {
return JSON.parseObject(json, CryptoPrice.class);
}
// Redis 未命中,查询 Doris
String sql = """
SELECT symbol, close_price as price, kline_time as update_time
FROM dws_crypto_kline_1m
WHERE symbol = ?
ORDER BY kline_time DESC
LIMIT 1
""";
return dorisTemplate.queryForObject(sql,
new CryptoPriceRowMapper(), symbol);
}
// 获取 K 线数据
@GetMapping("/kline/{symbol}")
public List<KLine> getKLineData(
@PathVariable String symbol,
@RequestParam String interval,
@RequestParam(defaultValue = "100") int limit) {
String sql = """
SELECT *
FROM dws_crypto_kline_1m
WHERE symbol = ?
ORDER BY kline_time DESC
LIMIT ?
""";
return dorisTemplate.query(sql, new KLineRowMapper(), symbol, limit);
}
// 获取技术指标
@GetMapping("/indicators/{symbol}")
public KLineWithIndicators getIndicators(@PathVariable String symbol) {
String sql = """
SELECT *
FROM dws_crypto_kline_1m
WHERE symbol = ?
ORDER BY kline_time DESC
LIMIT 1
""";
return dorisTemplate.queryForObject(sql,
new IndicatorRowMapper(), symbol);
}
// 获取交易信号
@GetMapping("/signals")
public List<TradingSignal> getRecentSignals(
@RequestParam(required = false) String symbol,
@RequestParam(defaultValue = "20") int limit) {
StringBuilder sql = new StringBuilder("""
SELECT *
FROM ads_crypto_signal
WHERE 1=1
""");
List<Object> params = new ArrayList<>();
if (symbol != null) {
sql.append(" AND symbol = ?");
params.add(symbol);
}
sql.append(" ORDER BY signal_time DESC LIMIT ?");
params.add(limit);
return dorisTemplate.query(sql.toString(),
new TradingSignalRowMapper(), params.toArray());
}
// 获取市场概览
@GetMapping("/market-overview")
public MarketOverview getMarketOverview() {
String sql = """
SELECT
COUNT(DISTINCT symbol) as total_symbols,
SUM(volume) as total_volume_24h,
AVG(change_pct_24h) as avg_change_pct,
COUNT(CASE WHEN change_pct_24h > 0 THEN 1 END) as gainers,
COUNT(CASE WHEN change_pct_24h < 0 THEN 1 END) as losers
FROM mv_crypto_change_24h
""";
return dorisTemplate.queryForObject(sql, new MarketOverviewRowMapper());
}
// 获取涨跌幅排行
@GetMapping("/top-movers")
public TopMovers getTopMovers(@RequestParam(defaultValue = "10") int limit) {
String sqlGainers = """
SELECT symbol, close_24h as price, change_pct_24h
FROM mv_crypto_change_24h
ORDER BY change_pct_24h DESC
LIMIT ?
""";
String sqlLosers = """
SELECT symbol, close_24h as price, change_pct_24h
FROM mv_crypto_change_24h
ORDER BY change_pct_24h ASC
LIMIT ?
""";
List<CryptoMover> gainers = dorisTemplate.query(sqlGainers,
new CryptoMoverRowMapper(), limit);
List<CryptoMover> losers = dorisTemplate.query(sqlLosers,
new CryptoMoverRowMapper(), limit);
return TopMovers.builder()
.gainers(gainers)
.losers(losers)
.build();
}
}
性能优化方案:
1. Kafka 优化
# Kafka Producer 配置
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
acks=1 # 平衡性能和可靠性
compression.type=lz4 # 启用压缩
batch.size=32768 # 32KB 批次大小
linger.ms=10 # 10ms 延迟
buffer.memory=67108864 # 64MB 缓冲区
# Kafka Consumer 配置
fetch.min.bytes=1024 # 最小拉取 1KB
fetch.max.wait.ms=500 # 最大等待 500ms
max.poll.records=1000 # 每次拉取 1000 条
2. Flink 优化
// Flink 配置优化
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint 配置
env.enableCheckpointing(60000); // 60 秒
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 状态后端配置(RocksDB)
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
backend.setDbStoragePath("/data/flink/rocksdb");
env.setStateBackend(backend);
// 并行度配置
env.setParallelism(16); // 全局并行度
env.setMaxParallelism(128); // 最大并行度
// 重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.seconds(10) // 重启间隔
));
3. Doris 优化
-- 表优化配置
CREATE TABLE dws_crypto_kline_1m (
...
)
DUPLICATE KEY(exchange, symbol, kline_time)
DISTRIBUTED BY HASH(symbol) BUCKETS 32
PROPERTIES (
"replication_num" = "3",
"storage_medium" = "SSD", -- 使用 SSD
"compression" = "LZ4", -- LZ4 压缩
"bloom_filter_columns" = "symbol", -- 布隆过滤器
"colocate_with" = "crypto_group" -- 数据本地化
);
-- 创建 Rollup 加速查询
ALTER TABLE dws_crypto_kline_1m
ADD ROLLUP rollup_symbol_hour (
symbol,
DATE_TRUNC(kline_time, 'hour') as hour,
SUM(volume) as total_volume,
AVG(close_price) as avg_price
);
-- 分区策略(按天分区)
CREATE TABLE dws_crypto_kline_1m_partitioned (
...
)
PARTITION BY RANGE(kline_time) (
PARTITION p20240301 VALUES LESS THAN ("2024-03-02"),
PARTITION p20240302 VALUES LESS THAN ("2024-03-03")
)
DISTRIBUTED BY HASH(symbol) BUCKETS 32;
监控告警方案:
1. 数据质量监控
// 数据质量检查任务
public class DataQualityMonitor extends ProcessFunction<KLine, Alert> {
private transient Counter invalidDataCounter;
private transient Histogram priceHistogram;
@Override
public void open(Configuration parameters) {
invalidDataCounter = getRuntimeContext()
.getMetricGroup()
.counter("invalid_data_count");
priceHistogram = getRuntimeContext()
.getMetricGroup()
.histogram("price_distribution",
new DescriptiveStatisticsHistogram(1000));
}
@Override
public void processElement(KLine kline, Context ctx, Collector<Alert> out) {
// 检查价格异常
if (kline.getHigh().compareTo(kline.getLow()) < 0) {
invalidDataCounter.inc();
out.collect(Alert.builder()
.type("DATA_QUALITY")
.level("ERROR")
.message("价格异常:最高价 < 最低价")
.symbol(kline.getSymbol())
.build());
}
// 检查价格波动
BigDecimal priceChange = kline.getHigh().subtract(kline.getLow())
.divide(kline.getLow(), 4, RoundingMode.HALF_UP);
if (priceChange.compareTo(BigDecimal.valueOf(0.1)) > 0) {
out.collect(Alert.builder()
.type("PRICE_VOLATILITY")
.level("WARNING")
.message("价格波动超过 10%")
.symbol(kline.getSymbol())
.build());
}
priceHistogram.update(kline.getClose().longValue());
}
}
2. 延迟监控
// 端到端延迟监控
public class LatencyMonitor extends ProcessFunction<CryptoTick, Void> {
private transient Histogram e2eLatency;
@Override
public void open(Configuration parameters) {
e2eLatency = getRuntimeContext()
.getMetricGroup()
.histogram("e2e_latency_ms",
new DescriptiveStatisticsHistogram(1000));
}
@Override
public void processElement(CryptoTick tick, Context ctx, Collector<Void> out) {
long latency = System.currentTimeMillis() - tick.getTimestamp();
e2eLatency.update(latency);
// 延迟告警
if (latency > 5000) {
log.warn("High latency detected: {}ms for symbol {}",
latency, tick.getSymbol());
}
}
}
3. Prometheus + Grafana 监控
# prometheus.yml
scrape_configs:
- job_name: 'flink'
static_configs:
- targets: ['flink-jobmanager:9249']
- job_name: 'kafka'
static_configs:
- targets: ['kafka-exporter:9308']
- job_name: 'doris'
static_configs:
- targets: ['doris-fe:8030']
关键监控指标:
| 组件 | 监控指标 | 告警阈值 | 说明 |
|---|---|---|---|
| Kafka | 消息积压 | > 10000 | Topic Lag 过大 |
| Kafka | 生产速率 | < 1000 TPS | 生产速率下降 |
| Flink | Checkpoint 失败率 | > 5% | Checkpoint 频繁失败 |
| Flink | 反压 | > 0.8 | 任务反压严重 |
| Flink | 端到端延迟 | > 5 秒 | 数据延迟过高 |
| Doris | 查询 P99 延迟 | > 1 秒 | 查询性能下降 |
| Doris | 磁盘使用率 | > 80% | 磁盘空间不足 |
| Redis | 内存使用率 | > 80% | 内存不足 |
| 系统 | CPU 使用率 | > 80% | CPU 负载过高 |
| 系统 | 内存使用率 | > 85% | 内存不足 |
部署架构:
1. 集群规划
| 组件 | 节点数 | 配置 | 说明 |
|---|---|---|---|
| Kafka | 3 | 16C 64G 2TB SSD | 3 个 Broker |
| Flink | 1 + 6 | JobManager: 8C 32G TaskManager: 16C 64G | 1 个 JM + 6 个 TM |
| Doris FE | 3 | 8C 32G 500G SSD | 3 个 FE(1 Master + 2 Follower) |
| Doris BE | 6 | 16C 64G 4TB SSD | 6 个 BE |
| Redis | 3 | 8C 32G | 主从 + 哨兵 |
| ZooKeeper | 3 | 4C 16G | Kafka 依赖 |
2. 容量规划
数据量估算:
- 交易对数量:500 个
- Tick 频率:平均 10 次/秒/交易对
- 总 TPS:500 * 10 = 5000 TPS
- 单条数据大小:约 200 字节
- 每日数据量:5000 * 200 * 86400 = 86.4 GB/天
存储规划:
- Kafka 保留 7 天:86.4 * 7 = 604.8 GB
- Doris 保留 90 天:86.4 * 90 = 7.78 TB
- 考虑副本和压缩:7.78 * 3 * 0.3 = 7 TB
网络带宽:
- 峰值 TPS:20000(4 倍平均值)
- 峰值带宽:20000 * 200 字节 = 4 MB/s = 32 Mbps
- 建议带宽:100 Mbps 以上
3. Docker Compose 部署示例
version: '3.8'
services:
# Kafka
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_NUM_PARTITIONS: 32
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
volumes:
- kafka-data:/var/lib/kafka/data
deploy:
resources:
limits:
cpus: '16'
memory: 64G
# Flink JobManager
flink-jobmanager:
image: flink:1.18.0-scala_2.12-java11
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
- FLINK_PROPERTIES=
jobmanager.memory.process.size: 8192m
taskmanager.numberOfTaskSlots: 8
volumes:
- flink-checkpoints:/opt/flink/checkpoints
# Flink TaskManager
flink-taskmanager:
image: flink:1.18.0-scala_2.12-java11
depends_on:
- flink-jobmanager
command: taskmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
- FLINK_PROPERTIES=
taskmanager.memory.process.size: 16384m
taskmanager.numberOfTaskSlots: 8
deploy:
replicas: 6
# Doris FE
doris-fe:
image: apache/doris:2.0.0-fe
ports:
- "8030:8030"
- "9030:9030"
environment:
- FE_SERVERS=fe1:doris-fe:9010
volumes:
- doris-fe-meta:/opt/apache-doris/fe/doris-meta
# Doris BE
doris-be:
image: apache/doris:2.0.0-be
depends_on:
- doris-fe
environment:
- FE_SERVERS=doris-fe:9010
volumes:
- doris-be-storage:/opt/apache-doris/be/storage
deploy:
replicas: 6
# Redis
redis:
image: redis:7.2-alpine
ports:
- "6379:6379"
command: redis-server --maxmemory 16gb --maxmemory-policy allkeys-lru
volumes:
- redis-data:/data
volumes:
kafka-data:
flink-checkpoints:
doris-fe-meta:
doris-be-storage:
redis-data:
核心挑战与解决方案:
1. 数据乱序问题
挑战:不同交易所的数据到达时间不一致,导致 K 线计算不准确。
解决方案:
- 使用 Watermark 机制容忍 5 秒乱序
- 设置窗口允许延迟 1 分钟
- 迟到数据输出到侧输出流单独处理
WatermarkStrategy<CryptoTick> watermarkStrategy = WatermarkStrategy
.<CryptoTick>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((tick, ts) -> tick.getTimestamp())
.withIdleness(Duration.ofMinutes(1));
DataStream<KLine> klineStream = tickStream
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateDataTag)
.aggregate(...);
2. 状态管理问题
挑战:技术指标计算需要维护大量历史数据状态,内存压力大。
解决方案:
- 使用 RocksDB 状态后端
- 设置状态 TTL 自动清理过期数据
- 使用增量 Checkpoint 减少 Checkpoint 时间
// 状态 TTL 配置
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupIncrementally(10, true)
.build();
ListStateDescriptor<BigDecimal> descriptor =
new ListStateDescriptor<>("price-history", BigDecimal.class);
descriptor.enableTimeToLive(ttlConfig);
3. 查询性能问题
挑战:实时大屏需要高并发查询,Doris 查询压力大。
解决方案:
- 使用 Redis 缓存热点数据(最新价格、涨跌幅)
- Doris 创建物化视图加速聚合查询
- 使用 Rollup 预聚合数据
- 查询结果缓存 5 秒
@Cacheable(value = "crypto-price", key = "#symbol", unless = "#result == null")
public CryptoPrice getRealtimePrice(String symbol) {
// 先查 Redis
String key = "crypto:price:" + symbol;
String json = redisTemplate.opsForValue().get(key);
if (json != null) {
return JSON.parseObject(json, CryptoPrice.class);
}
// Redis 未命中,查 Doris
return dorisTemplate.queryForObject(...);
}
4. 数据一致性问题
挑战:多个交易所的同一交易对价格不一致。
解决方案:
- 计算加权平均价格(按交易量加权)
- 标记数据来源,支持按交易所查询
- 异常价格检测和过滤
// 加权平均价格计算
public BigDecimal calculateWeightedAvgPrice(List<CryptoTick> ticks) {
BigDecimal totalValue = BigDecimal.ZERO;
BigDecimal totalVolume = BigDecimal.ZERO;
for (CryptoTick tick : ticks) {
totalValue = totalValue.add(tick.getPrice().multiply(tick.getVolume()));
totalVolume = totalVolume.add(tick.getVolume());
}
return totalValue.divide(totalVolume, 8, RoundingMode.HALF_UP);
}
案例总结:
| 维度 | 指标 | 说明 |
|---|---|---|
| 数据规模 | 5000 TPS,86 GB/天 | 500 个交易对实时行情 |
| 延迟 | P99 < 3 秒 | 从数据产生到可查询 |
| 可用性 | 99.9% | 年停机时间 < 9 小时 |
| 查询性能 | P99 < 500ms | 实时查询响应时间 |
| 成本 | 约 5 万/月 | 云服务器 + 带宽成本 |
技术亮点:
- 流批一体:Flink 统一处理实时和历史数据
- 多引擎融合:Kafka + Flink + Doris + Redis 各司其职
- 技术指标实时计算:基于 Flink 状态实现复杂指标计算
- 交易信号生成:Flink CEP 实现模式匹配
- 高性能查询:Doris 物化视图 + Redis 缓存
业务价值:
- 实时行情展示,延迟 < 1 秒
- 技术指标实时更新,支持量化交易
- 交易信号实时生成,捕捉市场机会
- 历史数据回测,验证交易策略
- 风险监控告警,及时发现异常
实战案例总结表(更新):
| 案例 | 延迟要求 | 数据量 | 技术栈 | 核心挑战 | 业务价值 |
|---|---|---|---|---|---|
| 实时大屏 | 秒级 | 百万级/秒 | Flink + ClickHouse + Redis | 高并发聚合 | 实时监控业务 |
| 实时风控 | 毫秒级 | 十万级/秒 | Flink + Redis + HBase | 低延迟、高准确率 | 防止欺诈损失 |
| 实时推荐 | 百毫秒级 | 千万级/秒 | Flink + HBase + Redis | 特征实时性 | 提升转化率 |
| 数据中台 | 秒级 | 亿级/天 | Flink + 多引擎 | 统一管理 | 数据服务化 |
| 加密货币行情 | 秒级 | 5000 TPS | Flink + Doris + Redis | 状态管理、查询性能 | 量化交易支持 |
实时数据中台
**业务场景:**构建统一的实时数据中台,为多个业务线提供实时数据服务。
架构设计:
统一数据采集平台:
// 统一数据采集配置
public class UnifiedDataCollector {
// 数据源注册
public void registerSource(DataSourceConfig config) {
switch (config.getType()) {
case MYSQL_CDC:
registerMySQLCDC(config);
break;
case LOG_FILE:
registerLogCollector(config);
break;
case HTTP_API:
registerAPICollector(config);
break;
case KAFKA_TOPIC:
registerKafkaSource(config);
break;
}
}
// MySQL CDC 注册
private void registerMySQLCDC(DataSourceConfig config) {
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname(config.getHost())
.port(config.getPort())
.databaseList(config.getDatabases())
.tableList(config.getTables())
.username(config.getUsername())
.password(config.getPassword())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
// 注册到统一管理平台
sourceRegistry.register(config.getName(), source);
}
}
统一数据服务层:
@RestController
@RequestMapping("/api/data-service")
public class UnifiedDataServiceController {
@Autowired
private QueryRouter queryRouter;
// 统一查询接口
@PostMapping("/query")
public QueryResult query(@RequestBody QueryRequest request) {
// 根据查询类型路由到不同存储引擎
DataSource dataSource = queryRouter.route(request);
switch (dataSource) {
case CLICKHOUSE:
return clickHouseService.query(request);
case REDIS:
return redisService.query(request);
case HBASE:
return hbaseService.query(request);
default:
throw new UnsupportedOperationException("不支持的数据源");
}
}
// 查询路由策略
@Component
public class QueryRouter {
public DataSource route(QueryRequest request) {
// 实时指标查询 → Redis
if (request.isRealtimeMetric()) {
return DataSource.REDIS;
}
// 明细数据查询 → ClickHouse
if (request.isDetailQuery()) {
return DataSource.CLICKHOUSE;
}
// KV 查询 → HBase
if (request.isKVQuery()) {
return DataSource.HBASE;
}
return DataSource.CLICKHOUSE;
}
}
}
元数据管理:
| 功能 | 说明 | 实现方式 |
|---|---|---|
| 数据血缘 | 追踪数据从源到目标的流转路径 | Flink Lineage + Atlas |
| 数据目录 | 统一管理所有表、字段的元信息 | Hive Metastore + 自研 |
| 数据质量 | 自动化数据质量检测和告警 | 自研规则引擎 |
| 权限管理 | 细粒度的数据访问控制 | Ranger + RBAC |
| 数据标准 | 统一的命名规范和数据标准 | 数据治理平台 |
| SLA 管理 | 数据产出时效性保障 | 自研监控平台 |
数据血缘追踪实现:
// 数据血缘采集
public class LineageCollector {
// 记录表级血缘
public void recordTableLineage(String sourceTable, String targetTable,
String jobName, String transformLogic) {
LineageRecord record = LineageRecord.builder()
.sourceTable(sourceTable)
.targetTable(targetTable)
.jobName(jobName)
.transformLogic(transformLogic)
.createTime(LocalDateTime.now())
.build();
lineageRepository.save(record);
}
// 记录字段级血缘
public void recordColumnLineage(String sourceTable, String sourceColumn,
String targetTable, String targetColumn,
String expression) {
ColumnLineageRecord record = ColumnLineageRecord.builder()
.sourceTable(sourceTable)
.sourceColumn(sourceColumn)
.targetTable(targetTable)
.targetColumn(targetColumn)
.expression(expression)
.build();
columnLineageRepository.save(record);
}
// 查询数据血缘(向上追溯)
public List<LineageRecord> traceUpstream(String tableName, int depth) {
return lineageRepository.findUpstream(tableName, depth);
}
// 查询数据影响(向下追溯)
public List<LineageRecord> traceDownstream(String tableName, int depth) {
return lineageRepository.findDownstream(tableName, depth);
}
}
实战案例总结:
| 案例 | 延迟要求 | 数据量 | 技术栈 | 核心挑战 |
|---|---|---|---|---|
| 实时大屏 | 秒级 | 百万级/秒 | Flink + ClickHouse + Redis | 高并发聚合 |
| 实时风控 | 毫秒级 | 十万级/秒 | Flink + Redis + HBase | 低延迟、高准确率 |
| 实时推荐 | 百毫秒级 | 千万级/秒 | Flink + HBase + Redis | 特征实时性 |
| 数据中台 | 秒级 | 亿级/天 | Flink + 多引擎 | 统一管理、多业务线 |
实时数仓最佳实践
架构设计原则
1. 分层解耦
- 数据采集、计算、存储、应用各层独立
- 每层可独立扩展和优化
- 降低系统耦合度
2. 流批一体
- 使用 Flink 统一流批处理
- 数据湖支持流批写入
- 简化开发和运维
3. 湖仓一体
- 结合数据湖的灵活性和数据仓库的性能
- 支持 ACID 事务
- 统一元数据管理
4. 多引擎融合
- 根据不同场景选择合适的存储引擎
- ClickHouse:OLAP 分析
- Hudi/Iceberg:数据湖
- Redis/HBase:KV 存储
5. 元数据驱动
- 统一的元数据管理
- 数据血缘追踪
- 数据质量监控
开发规范
1. 命名规范
| 类型 | 规范 | 示例 |
|---|---|---|
| 表名 | {层级}{业务}{粒度} | dws_order_1min |
| 字段名 | 小写下划线 | order_id, create_time |
| Job 名 | {业务}_{功能}Job | OrderProcessJob |
| 类名 | 大驼峰 | OrderMapper |
| 方法名 | 小驼峰 | processOrder |
2. 代码规范
// 良好的代码结构
public class OrderProcessJob {
// 常量定义
private static final String KAFKA_SERVERS = "localhost:9092";
private static final int CHECKPOINT_INTERVAL = 60000;
public static void main(String[] args) throws Exception {
// 1. 环境配置
StreamExecutionEnvironment env = createEnvironment();
// 2. 数据源
DataStream<Order> orderStream = createOrderSource(env);
// 3. 数据处理
DataStream<OrderStats> statsStream = processOrders(orderStream);
// 4. 数据输出
addSinks(statsStream);
// 5. 执行任务
env.execute("Order Process Job");
}
private static StreamExecutionEnvironment createEnvironment() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(CHECKPOINT_INTERVAL);
env.setParallelism(16);
return env;
}
private static DataStream<Order> createOrderSource(StreamExecutionEnvironment env) {
return env.addSource(new FlinkKafkaConsumer<>(...));
}
private static DataStream<OrderStats> processOrders(DataStream<Order> orderStream) {
return orderStream
.map(new OrderMapper())
.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new OrderAggregateFunction());
}
private static void addSinks(DataStream<OrderStats> statsStream) {
statsStream.addSink(new ClickHouseSink<>(...));
}
}
3. 配置管理
// 使用配置文件
public class Config {
private static final Properties props = new Properties();
static {
try {
props.load(Config.class.getResourceAsStream("/application.properties"));
} catch (IOException e) {
throw new RuntimeException("Failed to load config", e);
}
}
public static String get(String key) {
return props.getProperty(key);
}
public static String get(String key, String defaultValue) {
return props.getProperty(key, defaultValue);
}
}
// application.properties
kafka.bootstrap.servers=localhost:9092
kafka.group.id=order-consumer
flink.checkpoint.interval=60000
flink.parallelism=16
clickhouse.url=jdbc:clickhouse://localhost:8123/default
测试策略
1. 单元测试
public class OrderMapperTest {
@Test
public void testOrderMapping() {
OrderMapper mapper = new OrderMapper();
OdsOrder ods = new OdsOrder();
ods.setOrderId("order123");
ods.setAmount("199.99");
DwdOrder dwd = mapper.map(ods);
assertEquals("order123", dwd.getOrderId());
assertEquals(new BigDecimal("199.99"), dwd.getAmount());
}
}
2. 集成测试
public class OrderProcessJobTest {
@Test
public void testOrderProcessing() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 创建测试数据源
DataStream<Order> testStream = env.fromElements(
new Order("order1", "user1", new BigDecimal("100")),
new Order("order2", "user1", new BigDecimal("200"))
);
// 处理数据
DataStream<OrderStats> statsStream = testStream
.keyBy(Order::getUserId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.aggregate(new OrderAggregateFunction());
// 收集结果
List<OrderStats> results = new ArrayList<>();
statsStream.addSink(new CollectSink(results));
env.execute();
// 验证结果
assertEquals(1, results.size());
assertEquals(2, results.get(0).getOrderCount());
assertEquals(new BigDecimal("300"), results.get(0).getTotalAmount());
}
}
3. 性能测试
public class PerformanceTest {
@Test
public void testThroughput() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 生成大量测试数据
DataStream<Order> testStream = env
.addSource(new RateLimitedSourceFunction(10000)) // 10000 TPS
.setParallelism(16);
// 处理数据
DataStream<OrderStats> statsStream = testStream
.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new OrderAggregateFunction());
// 测量吞吐量
statsStream.addSink(new ThroughputMeasureSink());
env.execute();
}
}
发布流程
1. 灰度发布
# Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-job-v2
spec:
replicas: 1 # 先发布 1 个实例
selector:
matchLabels:
app: flink-job
version: v2
template:
metadata:
labels:
app: flink-job
version: v2
spec:
containers:
- name: flink-job
image: flink-job:v2
2. 蓝绿部署
# 部署新版本(绿)
kubectl apply -f flink-job-v2.yaml
# 等待新版本就绪
kubectl wait --for=condition=ready pod -l version=v2
# 切换流量到新版本
kubectl patch service flink-job -p '{"spec":{"selector":{"version":"v2"}}}'
# 验证新版本
# 如果有问题,回滚到旧版本
kubectl patch service flink-job -p '{"spec":{"selector":{"version":"v1"}}}'
# 删除旧版本
kubectl delete deployment flink-job-v1
3. 金丝雀发布
# Argo Rollouts
apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
name: flink-job
spec:
replicas: 10
strategy:
canary:
steps:
- setWeight: 10 # 10% 流量
- pause: {duration: 5m}
- setWeight: 30 # 30% 流量
- pause: {duration: 5m}
- setWeight: 50 # 50% 流量
- pause: {duration: 5m}
- setWeight: 100 # 100% 流量
成本优化
1. 计算资源优化
- 合理设置并行度,避免资源浪费
- 使用 Slot Sharing 共享资源
- 使用弹性伸缩,按需分配资源
2. 存储资源优化
- 冷热数据分离
- 数据压缩(LZ4/Snappy)
- 定期清理过期数据
3. 网络资源优化
- 数据压缩传输
- 减少数据 Shuffle
- 使用本地聚合
成本优化总结:
| 优化项 | 优化措施 | 效果 |
|---|---|---|
| 计算 | 合理并行度、资源共享 | 成本降低 30% |
| 存储 | 冷热分离、数据压缩 | 成本降低 50% |
| 网络 | 数据压缩、本地聚合 | 成本降低 40% |
容量规划与扩展
容量规划是实时数仓建设的关键环节,需要根据业务增长预期合理规划资源。
容量评估维度:
| 维度 | 评估指标 | 计算方式 | 示例 |
|---|---|---|---|
| 数据量 | 日增数据量 | 单条大小 × 日消息数 | 1KB × 1亿 = 100GB/天 |
| 吞吐量 | 峰值 TPS | 日均 TPS × 峰值系数 | 1万 × 5 = 5万 TPS |
| 存储 | 总存储量 | 日增量 × 保留天数 × 副本数 | 100GB × 30 × 3 = 9TB |
| 计算 | CPU 核数 | 并行度 × 每 Slot CPU | 64 × 2 = 128 核 |
| 内存 | 总内存 | TM 数 × 每 TM 内存 | 16 × 8GB = 128GB |
Kafka 容量规划:
# Kafka 存储容量计算
单条消息大小: 1KB
日消息量: 1 亿条
日数据量: 1KB × 100,000,000 = 100GB
副本数: 3
日存储量: 100GB × 3 = 300GB
保留天数: 7 天
总存储量: 300GB × 7 = 2.1TB
# Kafka 分区数计算
目标吞吐量: 50,000 TPS
单分区吞吐量: 5,000 TPS(经验值)
分区数: 50,000 / 5,000 = 10 个分区(建议 16 个,留余量)
# Broker 数量
每 Broker 磁盘: 1TB
Broker 数量: 2.1TB / 1TB ≈ 3 个(建议 5 个,留余量)
ClickHouse 容量规划:
-- 存储容量估算
-- 原始数据: 100GB/天
-- 压缩比: 10:1(列式存储 + LZ4)
-- 压缩后: 10GB/天
-- 保留天数: 90 天
-- 副本数: 2
-- 总存储: 10GB × 90 × 2 = 1.8TB
-- 查询性能估算
-- 单节点扫描速度: 200MB/s(SSD)
-- 90 天数据量: 900GB(单副本)
-- 全表扫描时间: 900GB / 200MB/s ≈ 75 分钟
-- 分区裁剪后(单天): 10GB / 200MB/s ≈ 50 秒
-- PREWHERE 过滤后: 1GB / 200MB/s ≈ 5 秒
扩展策略:
水平扩展操作步骤:
1. Kafka 扩容
# 增加分区数(不可减少)
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic orders --partitions 32
# 添加新 Broker 后重新分配分区
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassignment.json --execute
2. Flink 扩容
# 方式 1:停止任务,修改并行度后重启
flink stop <job-id> -p /savepoints/
flink run -s /savepoints/savepoint-xxx -p 64 job.jar
# 方式 2:Reactive Mode(自动扩缩容)
# flink-conf.yaml
scheduler-mode: reactive
3. ClickHouse 扩容
-- 添加新分片后,迁移数据
-- 1. 创建新的分布式表(包含新分片)
CREATE TABLE dwd_order_dist ON CLUSTER new_cluster
AS dwd_order_local
ENGINE = Distributed('new_cluster', 'default', 'dwd_order_local', rand());
-- 2. 历史数据重新分布(可选)
INSERT INTO dwd_order_dist SELECT * FROM dwd_order_local;
容量预警机制:
| 指标 | 黄色预警 | 红色预警 | 处理方式 |
|---|---|---|---|
| 磁盘使用率 | > 70% | > 85% | 扩容/清理数据 |
| CPU 使用率 | > 70% | > 90% | 扩容/优化代码 |
| 内存使用率 | > 75% | > 90% | 扩容/优化状态 |
| Kafka 堆积 | > 50万 | > 200万 | 增加消费者 |
| 查询延迟 | > 3s | > 10s | 优化查询/扩容 |
实时数仓常见问题与解决方案
Flink 任务常见问题
Flink OOM 问题排查
**问题现象:**TaskManager 频繁 OOM,任务不断重启。
排查步骤:
# 1. 查看 GC 日志
grep "Full GC" flink-taskmanager-*.log
# 2. 查看内存使用
curl http://taskmanager:8081/taskmanagers/<tm-id>/metrics?get=Status.JVM.Memory.Heap.Used
# 3. 生成堆转储
jmap -dump:format=b,file=heap.hprof <pid>
# 4. 分析堆转储
jhat heap.hprof
# 或使用 MAT (Memory Analyzer Tool)
常见原因及解决方案:
| 原因 | 现象 | 解决方案 |
|---|---|---|
| 状态过大 | 状态持续增长 | 设置 State TTL,定期清理 |
| 数据倾斜 | 部分 Task 内存高 | 加盐打散、两阶段聚合 |
| 窗口数据堆积 | 窗口触发前内存暴涨 | 使用增量聚合、减小窗口 |
| 对象创建过多 | GC 频繁 | 复用对象、使用对象池 |
| 外部连接泄漏 | 连接数持续增长 | 使用连接池、及时关闭 |
| 序列化问题 | 序列化后数据膨胀 | 使用高效序列化器 |
State TTL 配置示例:
// 设置状态 TTL
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000) // RocksDB 压缩时清理
.build();
ValueStateDescriptor<UserProfile> descriptor =
new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);
内存配置优化:
# flink-conf.yaml
taskmanager.memory.process.size: 8g
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.jvm-overhead.fraction: 0.1
# RocksDB 状态后端优化
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4
Flink 反压问题处理
反压识别:
反压排查流程:
- Flink Web UI:查看各 Task 的反压状态(OK / LOW / HIGH)
- 定位瓶颈算子:从 Sink 向上游逐步排查,找到第一个反压为 HIGH 的算子
- 分析瓶颈原因:
- CPU 密集型:优化计算逻辑
- IO 密集型:使用异步 IO
- 数据倾斜:重新分区
反压解决方案:
// 方案 1:异步 IO 解决外部查询瓶颈
AsyncDataStream.unorderedWait(
inputStream,
new AsyncDatabaseRequest(),
5000, // 超时时间
TimeUnit.MILLISECONDS,
100 // 最大并发请求数
);
// 方案 2:使用 Cache 减少外部查询
public class CachedLookupFunction extends RichMapFunction<Order, EnrichedOrder> {
private transient LoadingCache<String, UserInfo> cache;
@Override
public void open(Configuration parameters) {
cache = CacheBuilder.newBuilder()
.maximumSize(100000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build(new CacheLoader<String, UserInfo>() {
@Override
public UserInfo load(String userId) {
return queryFromDB(userId);
}
});
}
@Override
public EnrichedOrder map(Order order) {
UserInfo userInfo = cache.get(order.getUserId());
return new EnrichedOrder(order, userInfo);
}
}
// 方案 3:增加并行度
env.setParallelism(32); // 全局并行度
sinkOperator.setParallelism(64); // Sink 单独设置更高并行度
Checkpoint 失败问题
常见 Checkpoint 失败原因:
| 原因 | 错误信息 | 解决方案 |
|---|---|---|
| 超时 | Checkpoint expired before completing | 增加超时时间、优化状态大小 |
| 状态过大 | OutOfMemoryError during checkpoint | 启用增量 Checkpoint、优化状态 |
| 对齐超时 | Barrier alignment timeout | 使用 Unaligned Checkpoint |
| 存储故障 | HDFS/S3 写入失败 | 检查存储集群状态 |
| Task 失败 | Task failed during checkpoint | 修复 Task 异常 |
Unaligned Checkpoint 配置:
// 适用于反压严重的场景
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));
// 增量 Checkpoint(RocksDB 状态后端)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// Checkpoint 存储配置
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 分钟
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 最小间隔 30 秒
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
Kafka 常见问题
消息丢失问题
消息丢失的三个环节:
防止消息丢失的配置:
# Producer 端
acks=all # 所有副本确认
retries=3 # 重试次数
max.in.flight.requests.per.connection=1 # 保证顺序
enable.idempotence=true # 幂等性
# Broker 端
min.insync.replicas=2 # 最少同步副本数
unclean.leader.election.enable=false # 禁止不完整副本选举
# Consumer 端
enable.auto.commit=false # 关闭自动提交
auto.offset.reset=earliest # 从最早开始消费
Flink Kafka Consumer 精确一次配置:
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("orders")
.setGroupId("order-consumer")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setProperty("isolation.level", "read_committed") // 读已提交
.build();
消息重复消费问题
重复消费原因:
- Consumer 提交 offset 前宕机
- Rebalance 导致 offset 回退
- Producer 重试导致消息重复
解决方案:
// 方案 1:幂等写入(推荐)
public class IdempotentSinkFunction extends RichSinkFunction<Order> {
@Override
public void invoke(Order order, Context context) {
// 使用 UPSERT 语义写入
String sql = """
INSERT INTO dwd_order (order_id, amount, status, update_time)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
amount = VALUES(amount),
status = VALUES(status),
update_time = VALUES(update_time)
""";
jdbcTemplate.update(sql, order.getOrderId(), order.getAmount(),
order.getStatus(), order.getUpdateTime());
}
}
// 方案 2:基于 Redis 去重
public class DeduplicationFunction extends RichFilterFunction<Order> {
private transient Jedis jedis;
@Override
public boolean filter(Order order) {
String key = "dedup:" + order.getOrderId();
// SETNX 原子操作,设置过期时间
String result = jedis.set(key, "1", SetParams.setParams().nx().ex(86400));
return "OK".equals(result); // 返回 true 表示首次出现
}
}
ClickHouse 常见问题
写入性能问题
写入优化策略:
| 策略 | 说明 | 效果 |
|---|---|---|
| 批量写入 | 每批 10000+ 行 | 吞吐提升 10 倍 |
| 异步写入 | 使用 Buffer 表 | 降低写入延迟 |
| 分布式写入 | 写入本地表 | 避免分布式开销 |
| 数据预排序 | 按主键排序后写入 | 减少 Merge 开销 |
| 合理分区 | 避免分区过多 | 减少文件数 |
批量写入实现:
// ClickHouse 批量写入 Sink
public class ClickHouseBatchSink extends RichSinkFunction<Order> {
private static final int BATCH_SIZE = 10000;
private static final long FLUSH_INTERVAL = 10000; // 10 秒
private transient List<Order> buffer;
private transient ScheduledExecutorService scheduler;
private transient ClickHouseConnection connection;
@Override
public void open(Configuration parameters) {
buffer = new ArrayList<>(BATCH_SIZE);
connection = ClickHouseDataSource.getConnection();
// 定时刷新
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::flush,
FLUSH_INTERVAL, FLUSH_INTERVAL, TimeUnit.MILLISECONDS);
}
@Override
public void invoke(Order order, Context context) {
synchronized (buffer) {
buffer.add(order);
if (buffer.size() >= BATCH_SIZE) {
flush();
}
}
}
private void flush() {
List<Order> toFlush;
synchronized (buffer) {
if (buffer.isEmpty()) return;
toFlush = new ArrayList<>(buffer);
buffer.clear();
}
try (PreparedStatement ps = connection.prepareStatement(
"INSERT INTO dwd_order VALUES (?, ?, ?, ?, ?)")) {
for (Order order : toFlush) {
ps.setString(1, order.getOrderId());
ps.setBigDecimal(2, order.getAmount());
ps.setString(3, order.getStatus());
ps.setString(4, order.getUserId());
ps.setTimestamp(5, Timestamp.valueOf(order.getCreateTime()));
ps.addBatch();
}
ps.executeBatch();
} catch (SQLException e) {
log.error("ClickHouse 批量写入失败", e);
// 重试逻辑
}
}
@Override
public void close() {
flush();
scheduler.shutdown();
connection.close();
}
}
查询优化问题
慢查询优化清单:
-- 1. 使用 PREWHERE 替代 WHERE(减少数据读取量)
-- 差
SELECT * FROM dwd_order WHERE status = 'PAID' AND amount > 100;
-- 好
SELECT * FROM dwd_order PREWHERE status = 'PAID' WHERE amount > 100;
-- 2. 避免 SELECT *(只查需要的列)
-- 差
SELECT * FROM dwd_order WHERE order_date = '2026-01-01';
-- 好
SELECT order_id, amount, status FROM dwd_order WHERE order_date = '2026-01-01';
-- 3. 利用分区裁剪
-- 差(全表扫描)
SELECT count() FROM dwd_order WHERE toDate(create_time) = '2026-01-01';
-- 好(分区裁剪)
SELECT count() FROM dwd_order WHERE order_date = '2026-01-01';
-- 4. 使用物化视图预聚合
CREATE MATERIALIZED VIEW dws_order_daily_mv
ENGINE = SummingMergeTree()
ORDER BY (order_date, province)
AS SELECT
toDate(create_time) AS order_date,
province,
count() AS order_count,
sum(amount) AS total_amount
FROM dwd_order
GROUP BY order_date, province;
-- 5. 合理使用索引
ALTER TABLE dwd_order ADD INDEX idx_status status TYPE set(100) GRANULARITY 4;
ALTER TABLE dwd_order ADD INDEX idx_amount amount TYPE minmax GRANULARITY 4;
数据一致性问题
实时与离线数据不一致
不一致原因分析:
| 原因 | 说明 | 影响 |
|---|---|---|
| 数据延迟 | 实时数据未完全到达 | 实时 < 离线 |
| 数据重复 | 实时处理重复消费 | 实时 > 离线 |
| 计算逻辑差异 | 流批处理逻辑不同 | 结果不一致 |
| 时间窗口差异 | 窗口边界处理不同 | 边界数据差异 |
| 数据源差异 | 实时和离线读取不同快照 | 数据不一致 |
解决方案:
1. 统一计算逻辑(流批一体)
-- 使用 Flink SQL 统一流批处理
-- 同一套 SQL 既可以流处理也可以批处理
CREATE TABLE orders (
order_id STRING,
amount DECIMAL(10, 2),
create_time TIMESTAMP(3),
WATERMARK FOR create_time AS create_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- 统一聚合逻辑
INSERT INTO dws_order_stats
SELECT
TUMBLE_START(create_time, INTERVAL '1' HOUR) AS window_start,
count(*) AS order_count,
sum(amount) AS total_amount
FROM orders
GROUP BY TUMBLE(create_time, INTERVAL '1' HOUR);
2. 定期数据对账与修复
// 自动对账任务
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨 2 点
public void reconcileData() {
LocalDate yesterday = LocalDate.now().minusDays(1);
// 查询实时数仓数据
RealtimeStats realtimeStats = clickHouseService.queryDailyStats(yesterday);
// 查询离线数仓数据
BatchStats batchStats = hiveService.queryDailyStats(yesterday);
// 对比差异
ReconcileResult result = reconcileService.compare(realtimeStats, batchStats);
if (result.getDiffRate() > 0.01) { // 差异超过 1%
// 发送告警
alertService.sendAlert("数据对账异常", result.toString());
// 自动修复(以离线数据为准)
if (result.getDiffRate() < 0.05) { // 差异小于 5% 自动修复
repairService.repairFromBatch(yesterday);
}
}
// 记录对账结果
reconcileLogService.save(result);
}
3. 数据修正机制
数据延迟问题
端到端延迟优化
延迟分解与优化:
| 阶段 | 典型延迟 | 优化目标 | 优化手段 |
|---|---|---|---|
| 数据采集 | 1-5s | < 1s | 减少采集间隔、使用 CDC |
| Kafka 传输 | 10-100ms | < 50ms | 优化 batch.size、linger.ms |
| Flink 处理 | 100ms-10s | < 1s | 优化算子、减少 Shuffle |
| Sink 写入 | 100ms-5s | < 500ms | 批量写入、异步写入 |
| 查询响应 | 100ms-3s | < 500ms | 预聚合、缓存 |
Kafka 低延迟配置:
# Producer 低延迟配置
linger.ms=5 # 减少等待时间(默认 0)
batch.size=16384 # 适当减小批次
buffer.memory=33554432 # 32MB 缓冲区
compression.type=lz4 # 快速压缩
# Consumer 低延迟配置
fetch.min.bytes=1 # 有数据就返回
fetch.max.wait.ms=100 # 最大等待 100ms
max.poll.records=500 # 每次拉取 500 条
Flink 低延迟配置:
// 减少缓冲区刷新间隔
env.setBufferTimeout(10); // 10ms 刷新一次
// 使用处理时间减少 Watermark 延迟
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
// 减少 Checkpoint 对延迟的影响
env.getCheckpointConfig().enableUnalignedCheckpoints();
// 使用 Mini-Batch 优化(Flink SQL)
tableConfig.set("table.exec.mini-batch.enabled", "true");
tableConfig.set("table.exec.mini-batch.allow-latency", "5s");
tableConfig.set("table.exec.mini-batch.size", "5000");
资源管理问题
Flink 资源规划
资源估算公式:
TaskManager 数量 = 总并行度 / 每个 TM 的 Slot 数
每个 TM 内存 = 状态大小 / TM 数量 * 2 + 网络缓冲 + JVM 开销
每个 TM CPU = Slot 数 * 每个 Slot 的 CPU 需求
资源规划参考:
| 场景 | 数据量 | 并行度 | TM 数量 | TM 内存 | TM CPU |
|---|---|---|---|---|---|
| 小型 | 1万 TPS | 8 | 2 | 4GB | 2核 |
| 中型 | 10万 TPS | 32 | 8 | 8GB | 4核 |
| 大型 | 100万 TPS | 128 | 32 | 16GB | 8核 |
| 超大型 | 1000万 TPS | 512 | 128 | 32GB | 16核 |
Kubernetes 资源配置:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: order-processing
spec:
image: flink:1.17
flinkVersion: v1_17
flinkConfiguration:
taskmanager.numberOfTaskSlots: "4"
state.backend: rocksdb
state.backend.incremental: "true"
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "8192m"
cpu: 4
replicas: 8
job:
jarURI: local:///opt/flink/jobs/order-processing.jar
parallelism: 32
upgradeMode: savepoint
弹性伸缩策略:
// 基于 Kafka 消费延迟的自动伸缩
public class AutoScaler {
private static final long LAG_THRESHOLD_SCALE_UP = 1000000; // 100 万
private static final long LAG_THRESHOLD_SCALE_DOWN = 10000; // 1 万
@Scheduled(fixedRate = 60000) // 每分钟检查
public void checkAndScale() {
long totalLag = kafkaAdminService.getConsumerGroupLag("order-consumer");
int currentParallelism = flinkService.getJobParallelism("order-processing");
if (totalLag > LAG_THRESHOLD_SCALE_UP) {
// 扩容:并行度翻倍,最大 256
int newParallelism = Math.min(currentParallelism * 2, 256);
flinkService.rescaleJob("order-processing", newParallelism);
log.info("扩容: {} -> {}, lag: {}", currentParallelism, newParallelism, totalLag);
} else if (totalLag < LAG_THRESHOLD_SCALE_DOWN && currentParallelism > 8) {
// 缩容:并行度减半,最小 8
int newParallelism = Math.max(currentParallelism / 2, 8);
flinkService.rescaleJob("order-processing", newParallelism);
log.info("缩容: {} -> {}, lag: {}", currentParallelism, newParallelism, totalLag);
}
}
}
跨机房容灾
多活架构设计
双机房架构:
Kafka MirrorMaker 2 配置:
# mm2.properties
clusters = primary, backup
primary.bootstrap.servers = kafka-a:9092
backup.bootstrap.servers = kafka-b:9092
# 主 → 备 同步
primary->backup.enabled = true
primary->backup.topics = orders, payments, users
# 备 → 主 同步(双活场景)
backup->primary.enabled = true
backup->primary.topics = orders, payments, users
# 同步配置
replication.factor = 3
sync.topic.configs.enabled = true
sync.topic.acls.enabled = true
emit.heartbeats.enabled = true
emit.checkpoints.enabled = true
故障切换流程:
| 步骤 | 操作 | 耗时 | 说明 |
|---|---|---|---|
| 1 | 检测故障 | 1-3 分钟 | 自动监控告警 |
| 2 | 确认切换 | 1-5 分钟 | 人工确认或自动决策 |
| 3 | DNS 切换 | 1-2 分钟 | 流量切换到备机房 |
| 4 | Flink 任务恢复 | 2-5 分钟 | 从 Checkpoint 恢复 |
| 5 | 数据校验 | 5-10 分钟 | 验证数据一致性 |
| 总计 | - | 10-25 分钟 | RTO 目标 < 30 分钟 |
高频面试题精选
1. 实时数仓和离线数仓的核心区别是什么?
答:核心区别在于数据处理模式和延迟。
处理模式:
- 离线数仓采用批处理模式,数据延迟通常是 T+1(天级),使用 Hive/Spark Batch 进行计算
- 实时数仓采用流处理模式,数据延迟在秒级或分钟级,使用 Flink/Spark Streaming 进行计算
技术差异:
- 离线数仓:HDFS + Hive + Spark Batch,定时调度,资源消耗集中
- 实时数仓:Kafka + Flink + ClickHouse,持续运行,资源占用稳定
适用场景:
- 离线数仓:历史分析、离线报表、数据挖掘
- 实时数仓:实时监控、实时决策、实时推荐
关键点:实时数仓需要处理更多边界情况(乱序、重复、状态管理),开发和运维复杂度更高,但能提供更好的业务价值。
2. Lambda 架构和 Kappa 架构的优缺点是什么?如何选择?
答:
Lambda 架构:
- 优点:容错性强(批处理保证准确性)、可扩展性好、历史数据可重算
- 缺点:需要维护两套代码(批处理 + 流处理)、数据一致性难保证、架构复杂、运维成本高
Kappa 架构:
- 优点:架构简单(只需维护一套流处理代码)、开发效率高、数据一致性好
- 缺点:依赖 Kafka 长期存储(成本高)、历史数据回溯速度慢、不适合超大规模数据
选择建议:
- 选择 Lambda:数据量超大(PB 级以上)、需要频繁重算历史数据、对准确性要求极高
- 选择 Kappa:数据量适中(TB 级)、以实时处理为主、追求架构简洁性
- 选择现代架构:需要流批一体、数据湖能力,推荐 Flink + Hudi/Iceberg
关键点:现代实时数仓更倾向于 Kappa 架构或流批一体的现代架构,因为技术成熟度提升,可以用流处理引擎统一处理实时和历史数据。
3. Flink 如何保证 Exactly-Once 语义?
答:Flink 通过两阶段提交(2PC)+ Checkpoint 机制保证端到端 Exactly-Once。
实现机制:
- Source 端:Kafka 支持 offset 管理,Checkpoint 时记录 offset
- 计算过程:Checkpoint 机制保证状态一致性,失败时从最近的 Checkpoint 恢复
- Sink 端:使用两阶段提交协议
- 第一阶段(Pre-commit):数据写入但不可见,等待 Checkpoint 完成
- 第二阶段(Commit):Checkpoint 成功后,正式提交,数据可见
- 失败时(Abort):回滚预提交的数据
关键配置:
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
sink.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
关键点:Exactly-Once 需要 Source、Flink、Sink 三方配合,任何一方不支持都无法实现端到端的 Exactly-Once。
4. 实时数仓如何处理数据乱序问题?
答:使用 Watermark 机制处理乱序数据。
Watermark 原理:
- Watermark 表示时间戳小于 Watermark 的数据已经全部到达
- 设置乱序容忍度:
forBoundedOutOfOrderness(Duration.ofSeconds(10)),容忍 10 秒乱序 - 窗口触发:当 Watermark 超过窗口结束时间时,触发窗口计算
处理策略:
- 设置合理的乱序容忍度
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
- 允许窗口延迟关闭
window.allowedLateness(Time.minutes(1))
- 侧输出流处理迟到数据
window.sideOutputLateData(lateOutputTag)
DataStream<Order> lateData = result.getSideOutput(lateOutputTag);
关键点:乱序容忍度的设置需要平衡实时性和准确性,容忍度越大,延迟越高,但数据越完整。
5. 如何解决实时数仓中的数据倾斜问题?
答:数据倾斜的识别和解决方案。
识别方法:
- Flink Web UI 查看各 Task 的处理速度
- 监控各 Task 的数据量分布
- 查看反压情况
解决方案:
1. 加盐(Salting):给热点 key 添加随机后缀
.map(order -> {
String newKey = order.getUserId() + "_" + random.nextInt(10);
order.setUserId(newKey);
return order;
})
2. 两阶段聚合:先局部聚合,再全局聚合
// 第一阶段:加盐聚合
.keyBy(order -> order.getUserId() + "_" + random.nextInt(10))
.window(...).aggregate(...)
// 第二阶段:去盐聚合
.keyBy(order -> order.getUserId())
.window(...).aggregate(...)
3. 自定义分区器:根据业务特点自定义分区逻辑
4. 使用 rebalance():强制数据重新分布
关键点:数据倾斜会导致部分 Task 成为瓶颈,影响整体性能,需要根据具体情况选择合适的解决方案。
6. ClickHouse 为什么查询这么快?
答:ClickHouse 的核心优化技术。
1. 列式存储:只读取需要的列,减少 IO 2. 向量化执行:使用 SIMD 指令批量处理数据,CPU 利用率高 3. 数据压缩:列式存储压缩比高(10:1),减少磁盘 IO 4. 稀疏索引:主键索引 + 跳数索引,快速定位数据 5. 分区裁剪:根据分区键快速过滤数据 6. 并行处理:多核并行查询 7. 预聚合:SummingMergeTree 等引擎自动聚合
适用场景:
- OLAP 分析、实时报表、大屏展示
- 不适合高并发点查和频繁更新
关键点:ClickHouse 的性能优势来自于针对 OLAP 场景的深度优化,但不支持事务和更新,需要结合业务场景选择。
7. 实时数仓如何保证数据质量?
答:数据质量保障体系。
1. 数据校验:
- 必填字段校验
- 数值范围校验
- 枚举值校验
- 时间合理性校验
2. 数据去重:
- 基于主键去重
- 使用状态 + TTL
- ClickHouse ReplacingMergeTree
3. 数据对账:
- 实时数仓 vs 离线数仓对账
- 定期核对数据一致性
- 差异分析和修复
4. 监控告警:
- 数据断流告警
- 数据重复率告警
- 数据延迟告警
- 数据质量率监控
5. 数据修复:
- 支持数据回溯
- Kafka 消息重放
- 手动修复工具
关键点:数据质量是实时数仓的生命线,需要建立完善的数据质量保障体系,从数据采集到数据应用全链路监控。
8. Flink 状态过大导致 Checkpoint 超时怎么办?
答:优化方案。
1. 启用增量 Checkpoint:
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
2. 设置状态 TTL:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24))
.cleanupFullSnapshot()
.build();
3. 增加 Checkpoint 间隔:
env.enableCheckpointing(300000); // 5 分钟
4. 增加 Checkpoint 超时时间:
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 分钟
5. 优化状态存储:
- 使用 RocksDB 状态后端
- 调优 RocksDB 参数
- 增加托管内存
6. 业务优化:
- 减少状态大小
- 使用聚合状态而非列表状态
- 定期清理过期状态
关键点:大状态是实时数仓的常见问题,需要从技术和业务两方面优化,增量 Checkpoint 是必选项。
9. 如何监控实时数仓的端到端延迟?
答:端到端延迟监控方案。
1. 在数据中添加时间戳:
- 数据产生时间(Source Timestamp)
- Kafka 接收时间(Kafka Timestamp)
- Flink 处理时间(Process Timestamp)
- Sink 写入时间(Sink Timestamp)
2. 计算各阶段延迟:
- Kafka 延迟 = Kafka Timestamp - Source Timestamp
- 处理延迟 = Process Timestamp - Kafka Timestamp
- Sink 延迟 = Sink Timestamp - Process Timestamp
- 端到端延迟 = Sink Timestamp - Source Timestamp
3. 使用 Metrics 上报:
Histogram latencyHistogram = getRuntimeContext()
.getMetricGroup()
.histogram("end_to_end_latency", new DescriptiveStatisticsHistogram(1000));
latencyHistogram.update(System.currentTimeMillis() - order.getSourceTimestamp());
4. Grafana 可视化:
- P50/P95/P99 延迟
- 延迟趋势图
- 各阶段延迟占比
关键点:端到端延迟监控是实时数仓的核心指标,需要全链路埋点,定位性能瓶颈。
10. 实时数仓和数据湖的关系是什么?
答:实时数仓和数据湖是互补关系。
数据湖(Hudi/Iceberg)的优势:
- 支持 ACID 事务
- 支持更新和删除
- 支持时间旅行
- 流批一体
结合方式:
1. 存储层:使用数据湖作为统一存储 2. 计算层:Flink 流式写入数据湖 3. 查询层:
- 实时查询:ClickHouse/Doris
- 批量查询:Spark/Presto 查询数据湖
典型架构:
Kafka → Flink → Hudi/Iceberg (数据湖)
↓
ClickHouse (实时查询)
↓
Spark (批量分析)
优势:
- 统一存储,避免数据孤岛
- 流批一体,简化架构
- 支持更新删除,满足 CDC 场景
- 成本优化,冷热数据分离
关键点:这种架构实现了湖仓一体,既有数据湖的灵活性,又有数据仓库的查询性能,是现代实时数仓的发展方向。
面试技巧总结:
1. 回答要有层次:
- 先说是什么(定义)
- 再说为什么(原理)
- 最后说怎么做(实践)
2. 结合实际项目经验:
- 说明在什么场景下使用
- 遇到什么问题
- 如何解决
3. 对比不同方案:
- 说明各方案的优缺点
- 适用场景
- 选型依据
4. 展示技术深度:
- 不仅知道怎么用
- 还要知道底层原理
- 能够优化和排查问题
5. 关注业务价值:
- 技术服务于业务
- 能说明技术带来的业务价值
- ROI 分析
常见面试题分类:
| 类别 | 题目示例 | 考察点 |
|---|---|---|
| 架构设计 | Lambda vs Kappa、分层设计 | 架构能力 |
| 核心技术 | Flink Exactly-Once、Watermark | 技术深度 |
| 性能优化 | 数据倾斜、Checkpoint 优化 | 优化能力 |
| 故障排查 | 任务重启、查询慢 | 问题解决能力 |
| 数据质量 | 数据校验、数据对账 | 质量意识 |
| 监控运维 | 延迟监控、告警机制 | 运维能力 |
| 实战经验 | 项目案例、技术选型 | 实践经验 |
面试准备建议:
- 深入理解核心技术:Flink、Kafka、ClickHouse 的原理和优化
- 总结项目经验:准备 2-3 个实战案例,包括背景、方案、效果
- 关注技术趋势:流批一体、湖仓一体、实时 OLAP
- 练习表达能力:清晰、有条理地表达技术方案
- 准备常见问题:架构设计、性能优化、故障排查
11. 如何设计一个高可用的实时数仓系统?
答:高可用设计需要从多个层面保障。
数据采集层:
- Kafka 多副本(replication.factor=3),保证消息不丢失
- CDC 工具高可用部署(主备切换)
- 数据源多通道采集,互为备份
计算层:
- Flink 高可用配置:ZooKeeper/Kubernetes 管理 JobManager 主备
- Checkpoint 持久化到 HDFS/S3,支持故障恢复
- 任务自动重启策略:
restart-strategy: fixed-delay,最大重试次数 + 重试间隔
存储层:
- ClickHouse 多副本(ReplicatedMergeTree),跨机架部署
- Redis Cluster 模式,自动故障转移
- HBase RegionServer 自动负载均衡
服务层:
- 多实例部署 + 负载均衡
- 熔断降级机制(Hystrix/Sentinel)
- 多级缓存(本地缓存 → Redis → ClickHouse)
关键配置:
// Flink 高可用配置
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
// 重启策略
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30s
关键点:高可用不是单点保障,而是全链路保障,每个环节都需要有冗余和故障恢复机制。RTO(恢复时间目标)和 RPO(恢复点目标)是衡量高可用的核心指标。
12. Flink SQL 和 DataStream API 如何选择?
答:两者各有优势,选择取决于业务复杂度和开发效率。
对比:
| 维度 | Flink SQL | DataStream API |
|---|---|---|
| 开发效率 | 高,声明式编程 | 低,命令式编程 |
| 灵活性 | 受限于 SQL 表达能力 | 完全灵活 |
| 性能优化 | 自动优化(Planner) | 手动优化 |
| 状态管理 | 自动管理 | 手动管理 |
| 适用场景 | ETL、聚合、Join | 复杂业务逻辑 |
| 学习成本 | 低 | 高 |
| 调试难度 | 较难(SQL 黑盒) | 较易(代码可调试) |
选择建议:
- 选 Flink SQL:标准 ETL 处理、多表 Join、窗口聚合、快速开发
- 选 DataStream API:复杂状态管理、自定义窗口、精细化控制、CEP 模式匹配
- 混合使用:SQL 处理主流程,DataStream 处理特殊逻辑
// 混合使用示例
TableEnvironment tableEnv = StreamTableEnvironment.create(env);
// SQL 处理主流程
Table orderTable = tableEnv.sqlQuery("""
SELECT order_id, user_id, amount,
TUMBLE_START(create_time, INTERVAL '1' MINUTE) AS window_start
FROM orders
GROUP BY TUMBLE(create_time, INTERVAL '1' MINUTE), order_id, user_id, amount
""");
// 转换为 DataStream 处理复杂逻辑
DataStream<Row> orderStream = tableEnv.toDataStream(orderTable);
DataStream<RiskResult> riskStream = orderStream
.keyBy(row -> row.getFieldAs("user_id"))
.process(new ComplexRiskDetectionFunction());
关键点:实际项目中推荐以 Flink SQL 为主、DataStream API 为辅的混合模式,兼顾开发效率和灵活性。
13. 实时数仓如何实现维表关联?
答:维表关联是实时数仓的核心需求,有多种实现方式。
方案对比:
| 方案 | 延迟 | 吞吐 | 一致性 | 适用场景 |
|---|---|---|---|---|
| 预加载 | 低 | 高 | 弱(定期刷新) | 小维表(< 100MB) |
| 异步 IO | 中 | 高 | 强 | 中等维表 |
| 广播流 | 低 | 高 | 强(实时更新) | 小维表 + 频繁更新 |
| Temporal Join | 中 | 中 | 强 | Flink SQL 场景 |
| 外部缓存 | 中 | 高 | 中(TTL 控制) | 大维表 |
广播流维表关联(推荐):
// 维表数据流(CDC 实时更新)
DataStream<DimUser> dimStream = env
.addSource(new FlinkKafkaConsumer<>("dim_user_cdc", ...))
.map(new DimUserMapper());
// 广播状态描述
MapStateDescriptor<String, DimUser> broadcastDesc =
new MapStateDescriptor<>("dim_user", String.class, DimUser.class);
BroadcastStream<DimUser> broadcastStream = dimStream.broadcast(broadcastDesc);
// 主流关联维表
DataStream<EnrichedOrder> enrichedStream = orderStream
.connect(broadcastStream)
.process(new BroadcastProcessFunction<Order, DimUser, EnrichedOrder>() {
@Override
public void processElement(Order order, ReadOnlyContext ctx,
Collector<EnrichedOrder> out) {
ReadOnlyBroadcastState<String, DimUser> state =
ctx.getBroadcastState(broadcastDesc);
DimUser user = state.get(order.getUserId());
if (user != null) {
out.collect(new EnrichedOrder(order, user));
} else {
// 维表未命中,输出到侧输出流
ctx.output(missedTag, order);
}
}
@Override
public void processBroadcastElement(DimUser user, Context ctx,
Collector<EnrichedOrder> out) {
BroadcastState<String, DimUser> state =
ctx.getBroadcastState(broadcastDesc);
state.put(user.getUserId(), user);
}
});
Flink SQL Temporal Join:
-- 创建维表(支持 Lookup)
CREATE TABLE dim_user (
user_id STRING,
user_name STRING,
province STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/dim',
'table-name' = 'dim_user',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '1h'
);
-- Temporal Join
SELECT o.order_id, o.amount, u.user_name, u.province
FROM orders AS o
JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id;
关键点:维表关联方案的选择取决于维表大小、更新频率和一致性要求。小维表用广播流,大维表用异步 IO + 缓存,Flink SQL 场景用 Temporal Join。
14. 如何从零搭建一个实时数仓?
答:分五个阶段逐步建设。
第一阶段:基础设施搭建(1-2 周)
- 部署 Kafka 集群(3 Broker + ZooKeeper)
- 部署 Flink 集群(Kubernetes 模式)
- 部署 ClickHouse 集群(2 分片 2 副本)
- 部署监控系统(Prometheus + Grafana)
第二阶段:数据采集(1-2 周)
- 配置 CDC 采集(Flink CDC / Canal)
- 配置日志采集(Filebeat / Flume)
- 建立 ODS 层 Kafka Topic
第三阶段:数据处理(2-4 周)
- 开发 DWD 层 ETL 任务(数据清洗、转换)
- 开发 DWS 层聚合任务(窗口聚合、指标计算)
- 开发 ADS 层应用任务(业务指标)
第四阶段:数据服务(1-2 周)
- 开发统一查询 API
- 配置 Redis 缓存
- 对接前端大屏/报表
第五阶段:运维保障(持续)
- 完善监控告警
- 建立数据质量体系
- 制定应急预案
- 持续优化性能
关键点:实时数仓建设是一个渐进式过程,先跑通核心链路,再逐步完善。初期不要追求完美架构,先满足核心业务需求,再迭代优化。
15. 实时数仓的未来发展趋势是什么?
答:实时数仓正在向以下方向演进。
1. 流批一体
- Flink 统一流批处理引擎
- 一套代码、一套逻辑、两种执行模式
- 降低开发和运维成本
2. 湖仓一体
- 数据湖(Hudi/Iceberg/Paimon)+ 数据仓库融合
- 支持 ACID 事务、时间旅行、增量查询
- Apache Paimon(原 Flink Table Store)是重要方向
3. 实时 OLAP
- ClickHouse、Doris、StarRocks 等实时分析引擎
- 支持高并发、低延迟的即席查询
- 物化视图自动刷新
4. Serverless 化
- 按需分配计算资源
- 自动扩缩容
- 降低运维成本
5. AI + 实时数仓
- 实时特征工程
- 在线学习
- 智能运维(AIOps)
技术趋势总结:
| 趋势 | 代表技术 | 核心价值 |
|---|---|---|
| 流批一体 | Flink、Paimon | 统一开发、降低成本 |
| 湖仓一体 | Hudi、Iceberg、Paimon | 统一存储、灵活查询 |
| 实时 OLAP | Doris、StarRocks | 高并发、低延迟 |
| Serverless | 云原生 Flink | 弹性伸缩、按需付费 |
| AI 融合 | Feature Store | 实时特征、在线推理 |
关键点:实时数仓的终极目标是实时化、智能化、一体化,让数据从产生到价值变现的链路越来越短,成本越来越低。
面试准备建议:
- 深入理解核心技术:Flink、Kafka、ClickHouse 的原理和优化
- 总结项目经验:准备 2-3 个实战案例,包括背景、方案、效果
- 关注技术趋势:流批一体、湖仓一体、实时 OLAP
- 练习表达能力:清晰、有条理地表达技术方案
- 准备常见问题:架构设计、性能优化、故障排查