58. 实时数仓技术指南

目录

点击展开目录

实时数仓基础概念

什么是实时数仓

实时数仓(Real-time Data Warehouse) 是一种能够实时或准实时地采集、处理、存储和分析数据的数据仓库系统。与传统离线数仓按天或按小时批量处理数据不同,实时数仓能够在秒级甚至毫秒级完成数据的端到端处理,为业务决策提供最新的数据支持。

核心特征:

  1. 低延迟处理

    • 数据从产生到可查询的时间在秒级或分钟级
    • 支持毫秒级的数据写入和查询
    • 端到端延迟可控且稳定
  2. 流式计算

    • 采用流式计算引擎持续处理数据流
    • 数据逐条或微批处理,而非批量处理
    • 支持事件驱动的实时响应
  3. 实时可见

    • 业务指标实时更新,无需等待批处理完成
    • 支持实时查询和分析
    • 数据变化立即反映到报表和大屏
  4. 高吞吐量

    • 能够处理海量高并发的数据流
    • 支持百万级甚至千万级 TPS
    • 水平扩展能力强
  5. 容错性强

    • 支持故障自动恢复
    • 数据不丢失、不重复
    • 状态一致性保证

技术架构图:

graph TB subgraph "数据源层" A1["业务数据库
MySQL/PostgreSQL"] A2["日志数据
应用日志/Nginx"] A3["埋点数据
用户行为"] A4["第三方数据
API/文件"] end subgraph "数据采集层" B1["CDC
Canal/Debezium"] B2["日志采集
Flume/Filebeat"] B3["SDK/API"] B4["DataX/Sqoop"] end subgraph "消息队列层" C["Kafka
统一消息总线"] end subgraph "实时计算层" D1["Flink
流式计算"] D2["Spark Streaming
微批处理"] end subgraph "存储层" E1["ClickHouse
OLAP 分析"] E2["Hudi/Iceberg
数据湖"] E3["Redis/HBase
KV 存储"] E4["Doris
MPP 数据库"] end subgraph "应用层" F1["实时大屏"] F2["实时报表"] F3["实时推荐"] F4["实时风控"] end A1 --> B1 A2 --> B2 A3 --> B3 A4 --> B4 B1 --> C B2 --> C B3 --> C B4 --> C C --> D1 C --> D2 D1 --> E1 D1 --> E2 D1 --> E3 D2 --> E4 E1 --> F1 E2 --> F2 E3 --> F3 E4 --> F4 style C fill:#ffd43b style D1 fill:#4dabf7 style E1 fill:#51cf66 style E2 fill:#51cf66 style F1 fill:#ff6b6b

典型应用场景:

场景延迟要求数据量级典型指标技术选型业务价值
实时大屏秒级百万级/秒GMV、订单量、UV/PVFlink + ClickHouse实时监控业务状态
实时风控毫秒级十万级/秒风险评分、异常检测Flink + Redis防止欺诈和损失
实时推荐百毫秒级千万级/秒用户画像、商品推荐Flink + HBase提升转化率
实时监控秒级百万级/秒系统指标、业务告警Flink + Prometheus快速发现问题
实时报表分钟级百万级/秒业务报表、数据分析Flink + Hudi支持实时决策
实时数据同步秒级十万级/秒数据一致性Flink CDC + Hudi数据实时可用

实时数仓 vs 离线数仓

核心差异对比:

维度离线数仓实时数仓说明
数据延迟T+1(天级)或小时级秒级/分钟级实时数仓延迟降低 1000 倍以上
处理模式批处理(Batch)流处理(Stream)处理范式完全不同
计算引擎Hive、Spark BatchFlink、Spark Streaming流式引擎更复杂
存储引擎HDFS、Hive 表ClickHouse、Hudi、Iceberg实时存储要求更高
数据更新全量/增量覆盖实时追加/更新实时数仓支持实时更新
查询性能分钟级秒级/毫秒级实时数仓查询更快
资源消耗低(定时调度)高(持续运行)实时数仓需要持续占用资源
数据一致性强一致最终一致实时数仓一致性要求更灵活
开发复杂度实时数仓需要处理更多边界情况
运维复杂度实时数仓需要 7x24 监控
适用场景历史分析、离线报表实时监控、实时决策各有侧重
成本实时数仓成本是离线的 2-3 倍

架构对比流程图:

graph LR subgraph "离线数仓架构" direction TB A1["业务数据库"] -->|"T+1 批量导入"| B1["Sqoop/DataX"] B1 --> C1["HDFS/Hive ODS"] C1 -->|"批处理 ETL"| D1["Hive DWD/DWS"] D1 -->|"批量聚合"| E1["Hive ADS"] E1 --> F1["BI 报表
延迟:天级"] end subgraph "实时数仓架构" direction TB A2["业务数据库"] -->|"CDC 实时采集"| B2["Kafka"] B2 -->|"流式处理"| C2["Flink 实时 ETL"] C2 --> D2["ClickHouse/Hudi"] D2 --> E2["实时大屏
延迟:秒级"] end style A1 fill:#a5d8ff style A2 fill:#a5d8ff style D1 fill:#ffd43b style C2 fill:#4dabf7 style F1 fill:#74c0fc style E2 fill:#51cf66

数据处理流程对比:

离线数仓处理流程:

  1. 每天凌晨定时调度任务
  2. 从业务库全量或增量抽取数据
  3. 数据加载到 HDFS/Hive
  4. 批量 ETL 处理(清洗、转换、关联)
  5. 批量聚合计算
  6. 结果写入数据集市
  7. 早上业务人员查看昨天的报表

实时数仓处理流程:

  1. CDC 实时捕获数据库变更
  2. 变更事件实时发送到 Kafka
  3. Flink 实时消费 Kafka 数据
  4. 流式 ETL 处理(清洗、转换、关联)
  5. 实时聚合计算(窗口、状态)
  6. 结果实时写入存储引擎
  7. 业务人员实时查看最新数据

性能对比示例:

假设一个电商场景,统计实时 GMV:

指标离线数仓实时数仓提升倍数
数据延迟24 小时5 秒17280 倍
查询延迟30 秒0.5 秒60 倍
数据新鲜度昨天的数据当前秒的数据-
决策响应次日调整实时调整-

实时数仓的核心价值

1. 业务价值

实时决策能力:

  • 场景:双十一大促,实时监控各品类销售情况
  • 价值:发现某品类销售不佳,立即调整营销策略,增加曝光
  • 效果:相比离线数仓次日调整,实时调整可提升 20-30% 的销售额

用户体验提升:

  • 场景:电商平台实时推荐系统
  • 价值:根据用户最近 5 分钟的浏览行为,实时更新推荐结果
  • 效果:点击率提升 15%,转化率提升 10%

风险控制:

  • 场景:金融交易实时风控
  • 价值:毫秒级识别异常交易,实时拦截欺诈行为
  • 效果:欺诈损失降低 80%,误拦截率降低 50%

运营效率:

  • 场景:物流配送实时监控
  • 价值:实时监控配送进度,及时发现异常,快速调度
  • 效果:配送时效提升 20%,客户满意度提升 15%

2. 技术价值

数据时效性:

graph LR A["数据产生"] -->|"离线:24h"| B1["离线数仓"] A -->|"实时:5s"| B2["实时数仓"] B1 -->|"次日可用"| C1["业务决策"] B2 -->|"实时可用"| C2["业务决策"] style A fill:#a5d8ff style B1 fill:#ffd43b style B2 fill:#51cf66 style C2 fill:#ff6b6b

系统响应速度:

  • 查询响应时间从分钟级降到秒级
  • 支持高并发查询(QPS 10000+)
  • 支持复杂分析查询(多维聚合、关联)

资源利用率:

  • 流式处理避免了批处理的资源峰值
  • 资源占用更平稳,更容易规划
  • 支持弹性伸缩,按需分配资源

架构简化:

  • 统一流批处理,减少重复开发
  • 一套代码同时支持实时和离线
  • 降低维护成本

3. 商业价值

graph TB A["实时数仓"] --> B["提升决策效率"] A --> C["优化用户体验"] A --> D["降低业务风险"] A --> E["增强竞争优势"] B --> F["营收增长
+15%"] C --> F D --> G["成本降低
-20%"] E --> F F --> H["ROI 提升
200%+"] G --> H style A fill:#ff6b6b style F fill:#51cf66 style G fill:#51cf66 style H fill:#ffd43b

ROI 计算示例:

假设一个中型电商平台:

  • 日均 GMV:1000 万
  • 实时数仓建设成本:200 万/年
  • 运维成本:100 万/年
  • 总成本:300 万/年

收益:

  • 实时推荐提升转化率 10%:GMV 增加 100 万/天 = 3.65 亿/年
  • 实时风控降低损失 80%:节省 500 万/年
  • 实时运营提升效率 20%:节省人力成本 200 万/年
  • 总收益:3.7 亿/年

ROI = (收益 - 成本) / 成本 = (3.7 亿 - 300 万) / 300 万 ≈ 123 倍

实时数仓的技术挑战

1. 数据一致性挑战

问题描述:

  • 分布式环境下,数据在多个节点间传输和处理
  • 网络延迟、节点故障可能导致数据丢失或重复
  • 多个数据源的数据需要保持一致性

解决方案:

Exactly-Once 语义:

// Flink Exactly-Once 配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1. 启用 Checkpoint
env.enableCheckpointing(60000);  // 60 秒
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 2. Kafka Source Exactly-Once
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
    "topic",
    new SimpleStringSchema(),
    properties
);
consumer.setStartFromGroupOffsets();  // 从 Group Offset 开始

// 3. Kafka Sink Exactly-Once
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(...)
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)  // 精确一次
    .setTransactionalIdPrefix("flink-kafka-")
    .build();

幂等性设计:

// 使用唯一 ID 保证幂等性
public class IdempotentSinkFunction extends RichSinkFunction<Order> {
    private transient RedisClient redis;
    
    @Override
    public void invoke(Order order, Context context) {
        String key = "order:" + order.getOrderId();
        
        // 检查是否已处理
        if (redis.exists(key)) {
            return;  // 已处理,跳过
        }
        
        // 处理数据
        processOrder(order);
        
        // 标记已处理
        redis.setex(key, 3600, "processed");  // 1 小时过期
    }
}

分布式事务:

  • 使用 Flink 的两阶段提交(2PC)
  • 使用 Hudi/Iceberg 的 ACID 事务
  • 使用 Saga 模式处理长事务

2. 数据延迟挑战

延迟来源分析:

graph LR A["数据产生"] -->|"网络延迟
10-50ms"| B["Kafka"] B -->|"消费延迟
10-100ms"| C["Flink"] C -->|"计算延迟
100-500ms"| D["处理完成"] D -->|"写入延迟
50-200ms"| E["存储"] E -->|"查询延迟
10-100ms"| F["应用"] style A fill:#a5d8ff style C fill:#4dabf7 style E fill:#51cf66 style F fill:#ff6b6b

优化方案:

环节优化措施效果
Kafka增加分区数、优化网络配置延迟降低 50%
Flink调整并行度、优化算子延迟降低 30%
存储批量写入、异步写入延迟降低 40%
查询添加索引、物化视图延迟降低 60%

端到端延迟监控:

public class LatencyMonitor extends ProcessFunction<Event, Event> {
    private transient Histogram latencyHistogram;
    
    @Override
    public void open(Configuration parameters) {
        latencyHistogram = getRuntimeContext()
            .getMetricGroup()
            .histogram("end_to_end_latency", 
                new DescriptiveStatisticsHistogram(1000));
    }
    
    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) {
        long latency = System.currentTimeMillis() - event.getTimestamp();
        latencyHistogram.update(latency);
        
        if (latency > 5000) {  // 超过 5 秒告警
            log.warn("High latency detected: {}ms, event: {}", latency, event);
        }
        
        out.collect(event);
    }
}

3. 数据乱序挑战

乱序产生原因:

  • 网络延迟不一致
  • 多数据源时间不同步
  • 分布式系统的固有特性

Watermark 机制:

graph TB subgraph "数据流" A["t=1"] --> B["t=3"] B --> C["t=2 (乱序)"] C --> D["t=4"] D --> E["t=5"] end subgraph "Watermark" W1["Watermark=0"] --> W2["Watermark=1"] W2 --> W3["Watermark=1 (等待)"] W3 --> W4["Watermark=2"] W4 --> W5["Watermark=3"] end A -.-> W1 B -.-> W2 C -.-> W3 D -.-> W4 E -.-> W5 style C fill:#ff6b6b style W3 fill:#ffd43b

处理乱序数据:

// 设置 Watermark 策略
WatermarkStrategy<Order> watermarkStrategy = WatermarkStrategy
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))  // 容忍 10 秒乱序
    .withTimestampAssigner((order, timestamp) -> order.getCreateTime())
    .withIdleness(Duration.ofMinutes(1));  // 1 分钟无数据则推进 Watermark

DataStream<Order> watermarkedStream = stream
    .assignTimestampsAndWatermarks(watermarkStrategy);

// 窗口允许延迟
DataStream<OrderStats> result = watermarkedStream
    .keyBy(Order::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.minutes(1))  // 允许 1 分钟延迟
    .sideOutputLateData(lateOutputTag)  // 超过延迟的数据输出到侧输出流
    .aggregate(new OrderAggregateFunction());

// 处理迟到数据
DataStream<Order> lateData = result.getSideOutput(lateOutputTag);
lateData.addSink(new LateDataSink());  // 单独处理迟到数据

4. 状态管理挑战

大状态问题:

  • 状态大小可能达到 TB 级别
  • 状态恢复时间长(小时级)
  • 内存压力大,容易 OOM

解决方案:

使用 RocksDB 状态后端:

// RocksDB 状态后端配置
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);  // 启用增量 Checkpoint
backend.setDbStoragePath("/data/flink/rocksdb");

env.setStateBackend(backend);
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");

状态 TTL:

// 设置状态 TTL
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))  // 24 小时过期
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // 创建和写入时更新
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // 不返回过期数据
    .cleanupFullSnapshot()  // 全量快照时清理
    .cleanupIncrementally(10, true)  // 增量清理
    .build();

ValueStateDescriptor<String> descriptor = 
    new ValueStateDescriptor<>("user-state", String.class);
descriptor.enableTimeToLive(ttlConfig);

ValueState<String> state = getRuntimeContext().getState(descriptor);

增量 Checkpoint:

  • 只保存状态的增量变化
  • Checkpoint 时间从小时级降到分钟级
  • 恢复时间大幅缩短

5. 资源成本挑战

成本构成:

资源类型占比优化空间
计算资源40%
存储资源30%
网络资源20%
人力成本10%

成本优化策略:

  1. 计算资源优化

    • 合理设置并行度,避免资源浪费
    • 使用 Slot Sharing 共享资源
    • 使用弹性伸缩,按需分配资源
  2. 存储资源优化

    • 冷热数据分离
    • 数据压缩(LZ4/Snappy)
    • 定期清理过期数据
  3. 网络资源优化

    • 数据压缩传输
    • 减少数据 Shuffle
    • 使用本地聚合

实时数仓的应用场景

1. 实时大屏监控

业务需求:

  • 实时展示核心业务指标(GMV、订单量、UV/PV)
  • 多维度分析(地域、品类、时间)
  • 秒级数据刷新

技术方案:

  • 数据采集:Canal CDC
  • 消息队列:Kafka
  • 实时计算:Flink
  • 存储:ClickHouse(明细)+ Redis(实时指标)
  • 展示:DataV/Grafana

2. 实时风控系统

业务需求:

  • 毫秒级风险识别
  • 多维度风险规则
  • 实时拦截异常交易

技术方案:

  • 数据采集:SDK 埋点
  • 消息队列:Kafka
  • 实时计算:Flink CEP(复杂事件处理)
  • 存储:Redis(规则)+ HBase(历史)
  • 决策:实时评分 + 规则引擎

3. 实时推荐系统

业务需求:

  • 根据用户实时行为推荐
  • 百毫秒级响应
  • 个性化推荐

技术方案:

  • 数据采集:埋点 SDK
  • 消息队列:Kafka
  • 实时计算:Flink(特征计算)
  • 存储:HBase(用户画像)+ Redis(推荐结果)
  • 算法:协同过滤 + 深度学习

4. 实时数据同步

业务需求:

  • 多数据源实时同步
  • 数据一致性保证
  • 支持数据转换

技术方案:

  • 数据采集:Flink CDC
  • 消息队列:Kafka
  • 实时计算:Flink
  • 存储:Hudi/Iceberg(数据湖)
  • 查询:Presto/Trino

5. 实时数据质量监控

业务需求:

  • 实时监控数据质量
  • 异常数据告警
  • 数据血缘追踪

技术方案:

  • 数据采集:全链路埋点
  • 消息队列:Kafka
  • 实时计算:Flink(质量检查)
  • 存储:Elasticsearch(日志)+ MySQL(元数据)
  • 告警:AlertManager

应用场景总结表:

场景延迟数据量复杂度成本适用行业
实时大屏秒级电商、金融、物流
实时风控毫秒级金融、支付、电商
实时推荐百毫秒级极高电商、视频、社交
实时同步秒级所有行业
实时监控秒级所有行业

实时数仓架构演进

Lambda 架构

Lambda 架构是由 Nathan Marz 在 2011 年提出的大数据处理架构,通过批处理和流处理两条链路并行处理数据,最终合并结果,是实时数仓架构的重要里程碑。

架构组成:

graph TB subgraph "数据源" A["业务数据库
日志/埋点"] end subgraph "批处理层 Batch Layer" B["HDFS
Master Dataset"] --> C["Spark Batch
MapReduce"] C --> D["批处理视图
Batch View"] end subgraph "速度层 Speed Layer" E["Kafka
消息队列"] --> F["Flink/Storm
流式计算"] F --> G["实时视图
Real-time View"] end subgraph "服务层 Serving Layer" D --> H["查询合并
Query Merge"] G --> H H --> I["统一查询接口
API"] end A -->|"批量导入
T+1"| B A -->|"实时采集
CDC"| E style B fill:#a5d8ff style E fill:#ffd43b style H fill:#51cf66 style I fill:#ff6b6b

三层架构详解:

1. 批处理层(Batch Layer)

职责:

  • 存储完整的历史数据(Master Dataset)
  • 定期批量计算,生成批处理视图
  • 保证数据的准确性和完整性
  • 支持数据重新计算和修正

技术栈:

  • 存储:HDFS、Hive
  • 计算:Spark Batch、MapReduce
  • 调度:Airflow、Oozie

特点:

  • 高延迟(小时/天级)
  • 高准确性(强一致性)
  • 支持复杂计算
  • 资源消耗集中在批处理时段

2. 速度层(Speed Layer)

职责:

  • 处理增量实时数据
  • 快速生成实时视图
  • 补充批处理层的延迟
  • 提供最新的数据

技术栈:

  • 消息队列:Kafka、Pulsar
  • 计算:Flink、Storm、Spark Streaming
  • 存储:Redis、HBase、Cassandra

特点:

  • 低延迟(秒级)
  • 最终一致性
  • 只处理最近的数据
  • 持续占用资源

3. 服务层(Serving Layer)

职责:

  • 合并批处理视图和实时视图
  • 提供统一的查询接口
  • 处理查询请求
  • 返回完整的结果

技术栈:

  • 查询引擎:Druid、ClickHouse、Presto
  • API 层:Spring Boot、FastAPI
  • 缓存:Redis、Memcached

特点:

  • 查询合并逻辑
  • 统一对外接口
  • 支持高并发查询

Lambda 架构工作流程:

sequenceDiagram participant Source as 数据源 participant Batch as 批处理层 participant Speed as 速度层 participant Serving as 服务层 participant User as 用户查询 Source->>Batch: 批量导入(T+1) Source->>Speed: 实时采集(CDC) Batch->>Batch: 批量计算 Speed->>Speed: 流式计算 Batch->>Serving: 批处理视图 Speed->>Serving: 实时视图 User->>Serving: 查询请求 Serving->>Serving: 合并视图 Serving->>User: 返回结果

Lambda 架构优缺点:

优点缺点
容错性强:批处理保证数据准确性,可重新计算维护成本高:需要维护两套代码(批处理 + 流处理)
可扩展性好:批流分离,各自独立扩展数据一致性难:批处理和流处理结果可能不一致
历史数据可重算:支持数据修正和逻辑变更架构复杂:三层架构,组件多,运维复杂
适合复杂业务:批处理支持复杂的计算逻辑资源消耗大:批流两套系统,资源占用多
数据完整性好:Master Dataset 保存完整数据开发效率低:同一逻辑需要实现两次

Lambda 架构适用场景:

  1. 数据量超大(PB 级以上)
  2. 需要频繁重算历史数据
  3. 对数据准确性要求极高
  4. 业务逻辑复杂,需要批处理支持
  5. 有足够的开发和运维资源

Lambda 架构实战示例:

// 批处理层:Spark Batch 计算每日订单统计
SparkSession spark = SparkSession.builder()
    .appName("Daily Order Stats")
    .getOrCreate();

Dataset<Row> orders = spark.read()
    .format("hive")
    .table("ods.orders")
    .where("dt = '2024-02-28'");

Dataset<Row> stats = orders
    .groupBy("province", "category")
    .agg(
        count("order_id").as("order_count"),
        sum("amount").as("total_amount")
    );

stats.write()
    .mode(SaveMode.Overwrite)
    .format("hive")
    .saveAsTable("ads.daily_order_stats");

// 速度层:Flink 实时计算订单统计
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Order> orders = env
    .addSource(new FlinkKafkaConsumer<>("orders", ...));

DataStream<OrderStats> realtimeStats = orders
    .keyBy(order -> Tuple2.of(order.getProvince(), order.getCategory()))
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new OrderAggregateFunction());

realtimeStats.addSink(new RedisSink<>(...));

// 服务层:合并批处理和实时视图
@GetMapping("/order-stats")
public OrderStats getOrderStats(String province, String category) {
    // 1. 从 Hive 获取批处理结果(昨天及之前)
    OrderStats batchStats = hiveService.getStats(province, category, 
        LocalDate.now().minusDays(1));
    
    // 2. 从 Redis 获取实时结果(今天)
    OrderStats realtimeStats = redisService.getStats(province, category);
    
    // 3. 合并结果
    return OrderStats.builder()
        .orderCount(batchStats.getOrderCount() + realtimeStats.getOrderCount())
        .totalAmount(batchStats.getTotalAmount().add(realtimeStats.getTotalAmount()))
        .build();
}

Kappa 架构

Kappa 架构是由 LinkedIn 的 Jay Kreps 在 2014 年提出,是对 Lambda 架构的简化,核心思想是一切皆流,只保留流处理链路。

核心思想:

  1. 统一处理:用同一套流处理引擎处理实时和历史数据
  2. 批处理即流处理:将批处理视为流处理的特例(有界流)
  3. 简化架构:去掉批处理层,降低维护成本
  4. Kafka 长期存储:利用 Kafka 的持久化能力存储历史数据

架构流程图:

graph TB subgraph "数据源" A["业务数据库
日志/埋点"] end subgraph "消息队列(长期存储)" B["Kafka
保留 7-30 天"] end subgraph "流处理层" C1["Flink Job 1
实时计算"] C2["Flink Job 2
历史回溯"] end subgraph "存储层" D["ClickHouse/Hudi
统一存储"] end subgraph "应用层" E["统一查询接口"] end A -->|"CDC 实时采集"| B B -->|"实时流"| C1 B -->|"历史流
(重放)"| C2 C1 --> D C2 --> D D --> E style B fill:#ffd43b style C1 fill:#4dabf7 style C2 fill:#74c0fc style D fill:#51cf66

Kappa 架构特点:

1. 统一流处理

所有数据处理都通过流处理引擎完成:

  • 实时数据:从 Kafka 实时消费,持续处理
  • 历史数据:从 Kafka 重放历史消息,批量处理
  • 代码复用:同一套代码处理实时和历史数据

2. Kafka 作为数据中心

Kafka 不仅是消息队列,还是数据存储:

  • 长期保留:配置较长的保留时间(7-30 天)
  • 数据回溯:支持从任意 offset 开始消费
  • 数据重放:支持多次消费同一份数据

3. 灵活的数据重算

当需要重新计算数据时:

  • 停止当前的流处理任务
  • 启动新的流处理任务,从历史 offset 开始消费
  • 重新计算并写入存储
  • 完成后切换到实时处理

Kappa 架构优缺点:

优点缺点
架构简单:只需维护一套流处理代码依赖 Kafka:需要 Kafka 长期存储,成本较高
开发效率高:统一流处理,代码复用回溯速度慢:历史数据重算速度不如批处理
数据一致性好:同一套逻辑,结果一致不适合超大数据:PB 级数据回溯困难
运维成本低:组件少,维护简单对流引擎要求高:需要成熟的流处理引擎
实时性好:专注流处理,延迟更低存储成本高:Kafka 长期存储成本高

Kappa 架构适用场景:

  1. 数据量适中(TB 级)
  2. 以实时处理为主
  3. 历史数据重算频率低
  4. 追求架构简洁性
  5. 团队熟悉流处理技术

Kappa 架构实战示例:

// 统一的流处理代码
public class OrderStatsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka 配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "order-stats");
        
        // 根据参数决定是实时处理还是历史回溯
        FlinkKafkaConsumer<Order> consumer = new FlinkKafkaConsumer<>(
            "orders",
            new OrderDeserializationSchema(),
            properties
        );
        
        if (args.length > 0 && "reprocess".equals(args[0])) {
            // 历史回溯:从指定 offset 开始
            Map<TopicPartition, Long> specificStartOffsets = new HashMap<>();
            specificStartOffsets.put(new TopicPartition("orders", 0), 0L);
            specificStartOffsets.put(new TopicPartition("orders", 1), 0L);
            consumer.setStartFromSpecificOffsets(specificStartOffsets);
        } else {
            // 实时处理:从最新 offset 开始
            consumer.setStartFromLatest();
        }
        
        // 统一的处理逻辑
        DataStream<Order> orders = env.addSource(consumer);
        
        DataStream<OrderStats> stats = orders
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((order, ts) -> order.getCreateTime())
            )
            .keyBy(order -> Tuple2.of(order.getProvince(), order.getCategory()))
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .aggregate(new OrderAggregateFunction());
        
        // 写入存储
        stats.addSink(new ClickHouseSink<>(...));
        
        env.execute("Order Stats Job");
    }
}

// Kafka 配置:长期保留
// server.properties
log.retention.hours=720  # 保留 30 天
log.segment.bytes=1073741824  # 1GB
log.retention.check.interval.ms=300000

数据重算流程:

sequenceDiagram participant Dev as 开发者 participant Flink as Flink 任务 participant Kafka as Kafka participant Storage as 存储 Note over Dev: 发现数据错误
需要重新计算 Dev->>Flink: 停止实时任务 Dev->>Flink: 启动回溯任务
(从历史 offset) Flink->>Kafka: 从 offset=0 开始消费 Kafka->>Flink: 返回历史数据 loop 处理历史数据 Flink->>Flink: 流式处理 Flink->>Storage: 写入结果 end Note over Flink: 历史数据处理完成 Dev->>Flink: 停止回溯任务 Dev->>Flink: 启动实时任务
(从最新 offset) Flink->>Kafka: 从最新 offset 消费 Kafka->>Flink: 返回实时数据

现代实时数仓架构

现代实时数仓架构融合了 Lambda 和 Kappa 的优点,结合数据湖技术(Hudi/Iceberg),形成了更加灵活和高效的流批一体、湖仓一体架构。

典型架构:

graph TB subgraph "数据源层" A1["业务数据库
MySQL/PostgreSQL"] A2["日志数据
应用日志/Nginx"] A3["埋点数据
用户行为"] A4["第三方数据
API/文件"] end subgraph "数据采集层" B1["CDC
Canal/Debezium/Flink CDC"] B2["日志采集
Flume/Filebeat"] B3["SDK/API"] B4["批量导入
DataX/Sqoop"] end subgraph "消息队列层" C["Kafka
统一消息总线"] end subgraph "实时计算层" D1["Flink
实时 ETL"] D2["Flink
实时聚合"] D3["Flink
实时关联"] end subgraph "存储层" E1["ClickHouse
OLAP 查询"] E2["Hudi/Iceberg
数据湖"] E3["Redis/HBase
KV 存储"] E4["Doris
MPP 数据库"] end subgraph "批处理层(可选)" F["Spark Batch
离线计算"] end subgraph "应用层" G1["实时大屏"] G2["实时报表"] G3["实时推荐"] G4["实时风控"] end A1 --> B1 A2 --> B2 A3 --> B3 A4 --> B4 B1 --> C B2 --> C B3 --> C B4 --> C C --> D1 C --> D2 C --> D3 D1 --> E1 D1 --> E2 D2 --> E3 D3 --> E4 E2 --> F F --> E2 E1 --> G1 E2 --> G2 E3 --> G3 E4 --> G4 style C fill:#ffd43b style D1 fill:#4dabf7 style D2 fill:#4dabf7 style D3 fill:#4dabf7 style E2 fill:#51cf66

现代架构核心特点:

1. 流批一体

使用 Flink 统一流批处理:

  • 流处理:实时数据流式处理
  • 批处理:历史数据批量处理
  • 统一 API:DataStream API 和 Table API
  • 统一引擎:同一个 Flink 集群
// Flink 流批一体示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 流处理:无界流
DataStream<Order> streamOrders = env
    .addSource(new FlinkKafkaConsumer<>("orders", ...));

// 批处理:有界流
DataStream<Order> batchOrders = env
    .readTextFile("hdfs://path/to/orders")
    .map(new OrderMapper());

// 统一的处理逻辑
DataStream<OrderStats> processOrders(DataStream<Order> orders) {
    return orders
        .keyBy(Order::getUserId)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .aggregate(new OrderAggregateFunction());
}

DataStream<OrderStats> streamStats = processOrders(streamOrders);
DataStream<OrderStats> batchStats = processOrders(batchOrders);

2. 湖仓一体

结合数据湖和数据仓库的优势:

  • 数据湖:灵活的存储,支持多种格式
  • 数据仓库:高性能的查询,支持 SQL
  • ACID 事务:保证数据一致性
  • Schema Evolution:支持 Schema 变更

Hudi/Iceberg 特性对比:

特性HudiIcebergDelta Lake
提出公司UberNetflixDatabricks
开源时间201920182019
ACID 支持
更新删除
时间旅行
Schema Evolution
流式写入✅ 强⚠️ 一般⚠️ 一般
查询性能⚠️ 一般✅ 好✅ 好
Flink 集成✅ 原生支持✅ 支持❌ 不支持
Spark 集成
Presto 集成⚠️ 一般

3. 多引擎融合

根据不同场景选择合适的存储引擎:

存储引擎适用场景查询延迟写入吞吐更新支持
ClickHouseOLAP 分析、实时大屏秒级极高
Hudi/Iceberg数据湖、流批一体秒级
Doris实时报表、多维分析秒级
Redis实时推荐、风控毫秒级极高
HBase用户画像、明细查询毫秒级

4. 分层解耦

数据分层设计,各层独立:

  • ODS 层:原始数据,保持原貌
  • DWD 层:明细数据,清洗转换
  • DWS 层:汇总数据,轻度聚合
  • ADS 层:应用数据,面向业务

5. 元数据管理

统一的元数据管理:

  • 数据血缘:追踪数据来源和去向
  • 数据质量:监控数据质量指标
  • 数据目录:统一的数据发现和查询
  • 权限管理:细粒度的权限控制

架构选型指南

架构演进对比:

架构提出时间核心思想复杂度成本适用场景代表公司
Lambda2011批流分离超大规模、高准确性Twitter, Netflix
Kappa2014一切皆流实时为主、简化架构LinkedIn, Uber
现代架构2018+流批一体、湖仓一体大规模实时数仓字节跳动, 阿里巴巴

架构选型决策树:

graph TD A["开始选型"] --> B{"数据量级?"} B -->|"PB 级以上"| C{"是否需要
频繁重算?"} B -->|"TB 级"| D{"是否需要
数据湖能力?"} B -->|"GB 级"| E["Kappa 架构
Flink + ClickHouse"] C -->|"是"| F["Lambda 架构
批流分离"] C -->|"否"| G["现代架构
Hudi + Flink"] D -->|"是"| H["现代架构
Hudi/Iceberg + Flink"] D -->|"否"| I["Kappa 架构
Flink + ClickHouse"] style F fill:#ffd43b style E fill:#51cf66 style H fill:#4dabf7 style I fill:#51cf66

详细选型建议:

1. 选择 Lambda 架构的场景:

适合:

  • 数据量超大(PB 级以上)
  • 需要频繁重算历史数据
  • 对数据准确性要求极高(金融、医疗)
  • 业务逻辑复杂,需要批处理支持
  • 有充足的开发和运维资源
  • 批处理和实时处理的业务逻辑差异大

不适合:

  • 团队规模小,资源有限
  • 追求快速迭代和敏捷开发
  • 数据量不大(TB 级以下)
  • 实时性要求极高(毫秒级)

2. 选择 Kappa 架构的场景:

适合:

  • 数据量适中(TB 级)
  • 以实时处理为主,历史数据重算频率低
  • 追求架构简洁性和开发效率
  • 团队熟悉流处理技术
  • 业务逻辑相对简单
  • 可以接受 Kafka 长期存储的成本

不适合:

  • 数据量超大(PB 级以上)
  • 需要频繁重算历史数据
  • 对批处理性能要求高
  • Kafka 存储成本不可接受

3. 选择现代架构的场景:

适合:

  • 数据量大(TB-PB 级)
  • 需要流批一体能力
  • 需要数据湖能力(ACID、更新删除)
  • 追求架构灵活性
  • 团队技术能力强
  • 需要支持多种查询引擎

不适合:

  • 数据量很小(GB 级)
  • 只需要简单的实时处理
  • 团队对新技术不熟悉
  • 预算有限

架构选型评分表:

评估维度LambdaKappa现代架构权重
开发复杂度25420%
运维复杂度25420%
数据准确性54515%
实时性35515%
可扩展性54510%
成本24310%
灵活性33510%
总分3.054.354.50100%

架构演进路径:

graph LR A["初期
单体架构"] -->|"数据量增长"| B["Kappa 架构
流式处理"] B -->|"业务复杂化"| C["Lambda 架构
批流分离"] C -->|"技术成熟"| D["现代架构
流批一体"] style A fill:#a5d8ff style B fill:#74c0fc style C fill:#ffd43b style D fill:#51cf66

典型演进案例:

阶段 1:初期(数据量 < 100GB/天)

  • 架构:单体应用 + MySQL
  • 特点:简单直接,开发快速
  • 问题:数据量增长后性能下降

阶段 2:成长期(数据量 100GB-1TB/天)

  • 架构:Kappa 架构(Flink + Kafka + ClickHouse)
  • 特点:实时处理,架构简洁
  • 问题:历史数据重算困难

阶段 3:成熟期(数据量 1TB-10TB/天)

  • 架构:现代架构(Flink + Hudi + ClickHouse)
  • 特点:流批一体,灵活性高
  • 问题:技术复杂度增加

阶段 4:大规模(数据量 > 10TB/天)

  • 架构:Lambda 架构或现代架构
  • 特点:高可靠、高性能
  • 问题:成本高,运维复杂

不同行业的架构选择:

行业数据量实时性要求推荐架构理由
电商现代架构需要流批一体,支持多种场景
金融超大极高Lambda 架构准确性要求高,需要批处理保证
社交超大现代架构数据量大,实时性要求高
物流Kappa 架构实时为主,架构简洁
游戏Kappa 架构实时性要求高,数据量适中
广告现代架构需要实时竞价,数据量大

架构选型检查清单:

业务需求:

  • 数据量级(GB/TB/PB)
  • 延迟要求(毫秒/秒/分钟)
  • 准确性要求(最终一致/强一致)
  • 是否需要历史数据重算
  • 业务逻辑复杂度

技术能力:

  • 团队规模和技术水平
  • 是否熟悉流处理技术
  • 是否有大数据平台经验
  • 运维能力和资源

资源约束:

  • 预算限制
  • 时间限制
  • 人力资源
  • 基础设施

未来规划:

  • 数据量增长预期
  • 业务扩展计划
  • 技术演进方向
  • 团队建设计划

实时数仓技术栈

数据采集层技术

数据采集层是实时数仓的第一道关口,负责从各种数据源实时采集数据并发送到消息队列。

主流采集技术对比:

技术类型数据源延迟吞吐量可靠性学习成本适用场景
CanalMySQL CDCMySQL秒级MySQL 数据同步
Debezium通用 CDC多种数据库秒级多数据库同步
Flink CDCCDC 框架多种数据库秒级极高极高大规模 CDC
MaxwellMySQL CDCMySQL秒级轻量级 CDC
Flume日志采集日志文件秒级日志采集
Filebeat日志采集日志文件秒级轻量级日志采集
Logstash日志采集多种数据源秒级ELK 栈
DataX批量同步多种数据源分钟级极高离线数据同步

CDC 技术详解:

CDC(Change Data Capture) 是实时数仓的核心技术,通过捕获数据库的变更日志实现数据的实时同步。

CDC 工作原理:

graph LR subgraph "MySQL 数据库" A["业务表"] -->|"DML 操作"| B["Binlog"] end subgraph "CDC 组件" B -->|"读取"| C["Canal/Debezium"] C -->|"解析"| D["变更事件"] D -->|"转换"| E["JSON 消息"] end subgraph "消息队列" E --> F["Kafka Topic"] end subgraph "下游消费" F --> G["Flink 消费"] F --> H["其他消费者"] end style B fill:#ffd43b style C fill:#4dabf7 style F fill:#51cf66

Binlog 事件类型:

事件类型说明包含信息处理方式
INSERT插入操作新数据直接插入
UPDATE更新操作更新前后的数据更新或 Upsert
DELETE删除操作删除的数据删除或软删除
DDL表结构变更DDL 语句Schema 变更

Canal 详解:

Canal 是阿里巴巴开源的 MySQL Binlog 增量订阅和消费组件。

Canal 架构:

graph TB subgraph "MySQL Master" A["Binlog"] end subgraph "Canal Server" B["Canal Instance"] C["Parser
解析器"] D["Sink
输出"] end subgraph "下游" E["Kafka"] F["RocketMQ"] G["Canal Client"] end A -->|"伪装成 Slave"| B B --> C C --> D D --> E D --> F D --> G style B fill:#4dabf7 style D fill:#51cf66

Canal 配置示例:

# canal.properties
canal.id = 1
canal.ip = 192.168.1.100
canal.port = 11111
canal.zkServers = localhost:2181

# instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8

# 订阅的数据库和表
canal.instance.filter.regex=test_db\\..*
canal.instance.filter.black.regex=test_db\\.tmp_.*

# Kafka 配置
canal.mq.servers=localhost:9092
canal.mq.topic=canal_topic
canal.mq.partition=0

Debezium 详解:

Debezium 是 RedHat 开源的分布式 CDC 平台,支持多种数据库。

Debezium 支持的数据库:

  • MySQL
  • PostgreSQL
  • MongoDB
  • SQL Server
  • Oracle
  • Db2
  • Cassandra

Debezium 架构:

graph TB subgraph "数据库" A1["MySQL"] A2["PostgreSQL"] A3["MongoDB"] end subgraph "Kafka Connect" B["Debezium Connector"] C["Converter"] end subgraph "Kafka" D["Topic 1"] E["Topic 2"] F["Topic 3"] end A1 -->|"Binlog"| B A2 -->|"WAL"| B A3 -->|"Oplog"| B B --> C C --> D C --> E C --> F style B fill:#4dabf7 style C fill:#74c0fc

Debezium 配置示例:

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "include.schema.changes": "true",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}

Flink CDC 详解:

Flink CDC 是基于 Flink 的 CDC 框架,提供了更好的性能和易用性。

Flink CDC 优势:

  • 无需部署额外组件:直接在 Flink 中使用
  • 性能更好:原生 Flink 集成,减少序列化开销
  • 支持全量 + 增量:自动处理全量和增量数据
  • Exactly-Once 保证:利用 Flink 的 Checkpoint 机制
  • 支持多种数据库:MySQL、PostgreSQL、MongoDB、Oracle 等

Flink CDC 使用示例:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;

public class FlinkCDCExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 配置 MySQL CDC Source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("inventory")  // 监控的数据库
            .tableList("inventory.orders", "inventory.products")  // 监控的表
            .username("root")
            .password("password")
            .startupOptions(StartupOptions.initial())  // 全量 + 增量
            .deserializer(new JsonDebeziumDeserializationSchema())  // JSON 格式
            .build();
        
        // 创建数据流
        DataStreamSource<String> cdcStream = env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC");
        
        // 处理 CDC 数据
        cdcStream
            .map(new ParseCDCFunction())
            .keyBy(Order::getOrderId)
            .process(new OrderProcessFunction())
            .addSink(new KafkaSink<>(...));
        
        env.execute("Flink CDC Job");
    }
}

// 解析 CDC 事件
public class ParseCDCFunction implements MapFunction<String, Order> {
    @Override
    public Order map(String value) throws Exception {
        JSONObject json = JSON.parseObject(value);
        String op = json.getString("op");  // c/u/d/r
        
        switch (op) {
            case "c":  // CREATE (INSERT)
            case "r":  // READ (全量同步)
                return parseOrder(json.getJSONObject("after"));
            case "u":  // UPDATE
                return parseOrder(json.getJSONObject("after"));
            case "d":  // DELETE
                Order order = parseOrder(json.getJSONObject("before"));
                order.setDeleted(true);
                return order;
            default:
                return null;
        }
    }
}

CDC 技术选型建议:

场景推荐技术理由
只有 MySQLCanal轻量级,易部署,性能好
多种数据库Debezium支持多种数据库,生态完善
大规模 CDCFlink CDC性能最好,与 Flink 集成好
轻量级场景Maxwell最轻量,配置简单

消息队列层技术

消息队列是实时数仓的数据总线,负责数据的缓冲、解耦和分发。

主流消息队列对比:

消息队列吞吐量延迟持久化顺序性可靠性生态适用场景
Kafka极高毫秒级极好大数据、日志
Pulsar极高毫秒级极高云原生、多租户
RocketMQ毫秒级电商、金融
RabbitMQ毫秒级⚠️微服务
Redis Streams微秒级⚠️轻量级场景

Kafka 核心特性:

1. 高吞吐量

  • 单机百万级 TPS
  • 顺序写磁盘,性能接近内存
  • 零拷贝技术(Zero-Copy)
  • 批量发送和压缩

2. 低延迟

  • 端到端延迟 < 10ms
  • 页缓存(Page Cache)加速读取
  • 异步发送

3. 持久化

  • 消息持久化到磁盘
  • 支持数据备份(副本机制)
  • 可配置保留时间

4. 可扩展

  • 水平扩展,支持海量数据
  • 分区机制,并行处理
  • 消费者组,负载均衡

5. 容错性

  • 副本机制(Replication)
  • ISR(In-Sync Replicas)
  • 自动故障转移

Kafka 架构:

graph TB subgraph "Producer" A1["Producer 1"] A2["Producer 2"] end subgraph "Kafka Cluster" B1["Broker 1
Leader"] B2["Broker 2
Follower"] B3["Broker 3
Follower"] C1["Topic: orders
Partition 0"] C2["Topic: orders
Partition 1"] C3["Topic: orders
Partition 2"] end subgraph "Consumer Group" D1["Consumer 1"] D2["Consumer 2"] D3["Consumer 3"] end subgraph "ZooKeeper" E["元数据管理"] end A1 --> B1 A2 --> B1 B1 --> C1 B1 --> C2 B1 --> C3 B2 -.->|"副本"| C1 B3 -.->|"副本"| C2 C1 --> D1 C2 --> D2 C3 --> D3 B1 -.-> E B2 -.-> E B3 -.-> E style B1 fill:#4dabf7 style E fill:#ffd43b

Kafka 核心概念:

1. Topic(主题)

  • 消息的逻辑分类
  • 类似数据库的表
  • 可以有多个分区

2. Partition(分区)

  • Topic 的物理分片
  • 保证分区内消息有序
  • 并行处理的基本单位

3. Replica(副本)

  • 数据备份,保证可靠性
  • Leader 副本:处理读写请求
  • Follower 副本:同步数据,备份

4. Producer(生产者)

  • 发送消息到 Topic
  • 可以指定分区策略
  • 支持批量发送和压缩

5. Consumer(消费者)

  • 从 Topic 消费消息
  • 消费者组实现负载均衡
  • 支持手动和自动提交 Offset

6. Consumer Group(消费者组)

  • 多个消费者组成一个组
  • 同一个组内的消费者不会重复消费
  • 不同组之间独立消费

Kafka 在实时数仓中的作用:

graph TB subgraph "数据源" A1["MySQL CDC"] A2["日志采集"] A3["埋点数据"] end subgraph "Kafka 集群" B1["Topic: ods_order
3 分区"] B2["Topic: ods_user
3 分区"] B3["Topic: ods_log
6 分区"] end subgraph "消费者" C1["Flink Job 1
实时清洗"] C2["Flink Job 2
实时聚合"] C3["Flink Job 3
实时关联"] C4["离线同步"] end A1 --> B1 A2 --> B2 A3 --> B3 B1 --> C1 B1 --> C2 B2 --> C3 B3 --> C1 B1 --> C4 style B1 fill:#ffd43b style B2 fill:#ffd43b style B3 fill:#ffd43b style C1 fill:#4dabf7

Kafka 关键配置:

# Broker 配置
# 网络线程数
num.network.threads=8
# IO 线程数
num.io.threads=16
# Socket 缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 日志配置
# 保留时间(7 天)
log.retention.hours=168
# 日志段大小(1GB)
log.segment.bytes=1073741824
# 检查间隔
log.retention.check.interval.ms=300000

# 副本配置
# 默认副本数
default.replication.factor=3
# 最小同步副本数
min.insync.replicas=2
# 副本延迟阈值
replica.lag.time.max.ms=10000

# 性能优化
# 压缩类型
compression.type=lz4
# 批量大小
batch.size=16384
# 等待时间
linger.ms=10
# 缓冲区大小
buffer.memory=33554432

Producer 配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 可靠性配置
props.put("acks", "all");  // 所有副本确认
props.put("retries", 3);  // 重试次数
props.put("max.in.flight.requests.per.connection", 1);  // 保证顺序

// 性能配置
props.put("compression.type", "lz4");  // 压缩
props.put("batch.size", 16384);  // 批量大小
props.put("linger.ms", 10);  // 等待时间
props.put("buffer.memory", 33554432);  // 缓冲区

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 异步发送
producer.send(new ProducerRecord<>("topic", key, value), (metadata, exception) -> {
    if (exception != null) {
        log.error("Send failed", exception);
    } else {
        log.info("Sent to partition {} offset {}", 
            metadata.partition(), metadata.offset());
    }
});

Consumer 配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 消费配置
props.put("enable.auto.commit", false);  // 手动提交
props.put("auto.offset.reset", "earliest");  // 从最早开始
props.put("max.poll.records", 500);  // 单次拉取记录数
props.put("fetch.min.bytes", 1024);  // 最小拉取字节数
props.put("fetch.max.wait.ms", 500);  // 最大等待时间

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息
        processRecord(record);
    }
    // 手动提交
    consumer.commitSync();
}

Kafka 分区策略:

策略说明适用场景
轮询(Round-Robin)依次分配到各分区负载均衡
哈希(Hash)根据 Key 的哈希值分配保证相同 Key 到同一分区
自定义自定义分区逻辑特殊业务需求
// 自定义分区器
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                        Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // 根据业务逻辑自定义分区
        if (key == null) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        
        // 例如:VIP 用户到特定分区
        if (isVIP(key.toString())) {
            return 0;  // VIP 专属分区
        }
        
        // 普通用户哈希分区
        return Math.abs(key.hashCode()) % (numPartitions - 1) + 1;
    }
}

实时计算层技术

实时计算层是实时数仓的核心,负责数据的清洗、转换、聚合和关联。

主流实时计算引擎对比:

引擎延迟吞吐状态管理容错机制窗口支持生态学习成本适用场景
Flink毫秒级极高✅ 强大Checkpoint✅ 丰富极好复杂实时计算
Spark Streaming秒级⚠️ 一般WAL⚠️ 一般极好准实时计算
Storm毫秒级❌ 弱ACK 机制❌ 弱简单实时计算
Kafka Streams毫秒级✅ 强Changelog✅ 好轻量级流处理
Samza毫秒级✅ 强Checkpoint✅ 好LinkedIn 内部

Flink 核心优势:

1. 真正的流处理

Flink 是真正的流处理引擎,而非微批处理:

  • 事件驱动:逐条处理数据,而非批量处理
  • 低延迟:毫秒级延迟
  • 事件时间:支持事件时间和处理时间
  • 乱序处理:Watermark 机制处理乱序数据

2. 强大的状态管理

Flink 提供了完善的状态管理能力:

  • 多种状态类型:ValueState、ListState、MapState、ReducingState、AggregatingState
  • 状态后端:Memory、FsStateBackend、RocksDBStateBackend
  • 状态 TTL:自动清理过期状态
  • 状态可扩展:支持状态的重新分布

3. Exactly-Once 语义

Flink 通过 Checkpoint 机制保证端到端 Exactly-Once:

  • 两阶段提交:保证 Sink 的 Exactly-Once
  • Checkpoint:定期保存状态快照
  • 故障恢复:从最近的 Checkpoint 恢复

4. 丰富的窗口操作

Flink 支持多种窗口类型:

  • 滚动窗口(Tumbling Window):固定大小,不重叠
  • 滑动窗口(Sliding Window):固定大小,可重叠
  • 会话窗口(Session Window):基于活动间隔
  • 全局窗口(Global Window):自定义触发器

5. 灵活的时间语义

Flink 支持三种时间语义:

  • 事件时间(Event Time):数据产生的时间
  • 处理时间(Processing Time):数据被处理的时间
  • 摄入时间(Ingestion Time):数据进入 Flink 的时间

Flink 架构:

graph TB subgraph "Client" A["Flink Job"] end subgraph "JobManager" B["Dispatcher"] C["ResourceManager"] D["JobMaster"] end subgraph "TaskManager 1" E1["Task Slot 1"] E2["Task Slot 2"] end subgraph "TaskManager 2" F1["Task Slot 1"] F2["Task Slot 2"] end subgraph "State Backend" G["RocksDB/HDFS"] end A -->|"提交"| B B --> C B --> D D -->|"调度"| E1 D -->|"调度"| E2 D -->|"调度"| F1 D -->|"调度"| F2 E1 -.->|"状态"| G E2 -.->|"状态"| G F1 -.->|"状态"| G F2 -.->|"状态"| G style D fill:#4dabf7 style G fill:#51cf66

Flink 核心概念:

1. DataStream API

DataStream API 是 Flink 的核心 API,用于流式数据处理:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Source
DataStream<String> text = env.readTextFile("input.txt");

// Transformation
DataStream<Tuple2<String, Integer>> counts = text
    .flatMap(new Tokenizer())
    .keyBy(value -> value.f0)
    .sum(1);

// Sink
counts.print();

env.execute("WordCount");

2. 算子(Operator)

Flink 提供了丰富的算子:

算子类型算子说明示例
SourceaddSource数据源Kafka、Socket、File
Transformationmap一对一转换字段映射
flatMap一对多转换分词
filter过滤条件过滤
keyBy分组按 Key 分组
reduce聚合累加、求和
aggregate聚合自定义聚合
window窗口时间窗口、计数窗口
join关联流与流关联
union合并多流合并
SinkaddSink数据输出Kafka、JDBC、File

3. 窗口(Window)

窗口是流处理中的核心概念,用于将无界流切分成有界流:

graph TB subgraph "滚动窗口 Tumbling" A1["[0-5s]"] --> A2["[5-10s]"] --> A3["[10-15s]"] end subgraph "滑动窗口 Sliding" B1["[0-5s]"] --> B2["[2-7s]"] --> B3["[4-9s]"] --> B4["[6-11s]"] end subgraph "会话窗口 Session" C1["活动1"] -.->|"gap > 5s"| C2["活动2"] -.->|"gap > 5s"| C3["活动3"] end style A1 fill:#a5d8ff style A2 fill:#74c0fc style A3 fill:#4dabf7 style B1 fill:#ffd43b style B2 fill:#ffc078 style B3 fill:#ff922b style B4 fill:#fd7e14

窗口示例:

// 滚动窗口:每 5 分钟统计一次
DataStream<OrderStats> tumblingStats = orders
    .keyBy(Order::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new OrderAggregateFunction());

// 滑动窗口:最近 10 分钟,每 5 分钟更新一次
DataStream<OrderStats> slidingStats = orders
    .keyBy(Order::getUserId)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .aggregate(new OrderAggregateFunction());

// 会话窗口:30 分钟无活动则结束会话
DataStream<SessionStats> sessionStats = behaviors
    .keyBy(Behavior::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregateFunction());

// 计数窗口:每 100 条数据触发一次
DataStream<OrderStats> countStats = orders
    .keyBy(Order::getUserId)
    .countWindow(100)
    .aggregate(new OrderAggregateFunction());

4. Watermark

Watermark 用于处理乱序数据:

// 设置 Watermark 策略
WatermarkStrategy<Order> watermarkStrategy = WatermarkStrategy
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))  // 容忍 10 秒乱序
    .withTimestampAssigner((order, timestamp) -> order.getCreateTime())  // 提取时间戳
    .withIdleness(Duration.ofMinutes(1));  // 1 分钟无数据则推进 Watermark

DataStream<Order> watermarkedStream = stream
    .assignTimestampsAndWatermarks(watermarkStrategy);

Watermark 工作原理:

sequenceDiagram participant Source as 数据源 participant Watermark as Watermark participant Window as 窗口 Source->>Watermark: Event(t=1) Watermark->>Window: Watermark(t=0) Source->>Watermark: Event(t=3) Watermark->>Window: Watermark(t=2) Source->>Watermark: Event(t=2, 乱序) Note over Watermark: t=2 < Watermark(t=2)
但在容忍范围内 Watermark->>Window: 接受数据 Source->>Watermark: Event(t=5) Watermark->>Window: Watermark(t=4) Note over Window: 触发 [0-5s] 窗口 Source->>Watermark: Event(t=1, 迟到) Note over Watermark: t=1 < Watermark(t=4)
超出容忍范围 Watermark->>Window: 丢弃或侧输出

5. 状态(State)

Flink 支持多种状态类型:

// ValueState:单值状态
ValueStateDescriptor<Long> descriptor = 
    new ValueStateDescriptor<>("count", Long.class);
ValueState<Long> countState = getRuntimeContext().getState(descriptor);

Long count = countState.value();
countState.update(count == null ? 1 : count + 1);

// ListState:列表状态
ListStateDescriptor<String> listDescriptor = 
    new ListStateDescriptor<>("list", String.class);
ListState<String> listState = getRuntimeContext().getListState(listDescriptor);

listState.add("item");
for (String item : listState.get()) {
    // 处理列表项
}

// MapState:映射状态
MapStateDescriptor<String, Long> mapDescriptor = 
    new MapStateDescriptor<>("map", String.class, Long.class);
MapState<String, Long> mapState = getRuntimeContext().getMapState(mapDescriptor);

mapState.put("key", 100L);
Long value = mapState.get("key");

// ReducingState:聚合状态
ReducingStateDescriptor<Long> reducingDescriptor = 
    new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class);
ReducingState<Long> reducingState = getRuntimeContext().getReducingState(reducingDescriptor);

reducingState.add(10L);
Long sum = reducingState.get();

6. Checkpoint

Checkpoint 是 Flink 的容错机制:

// Checkpoint 配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 启用 Checkpoint
env.enableCheckpointing(60000);  // 60 秒

// Checkpoint 模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Checkpoint 超时
env.getCheckpointConfig().setCheckpointTimeout(600000);  // 10 分钟

// 最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);  // 30 秒

// 最大并发 Checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 容忍失败次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

// 外部化 Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// Checkpoint 存储
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");

Checkpoint 流程:

sequenceDiagram participant JM as JobManager participant TM1 as TaskManager 1 participant TM2 as TaskManager 2 participant Storage as 存储 JM->>TM1: 触发 Checkpoint JM->>TM2: 触发 Checkpoint TM1->>TM1: 保存状态快照 TM2->>TM2: 保存状态快照 TM1->>Storage: 写入状态 TM2->>Storage: 写入状态 TM1->>JM: Checkpoint 完成 TM2->>JM: Checkpoint 完成 JM->>JM: 标记 Checkpoint 成功

存储层技术

存储层负责数据的持久化存储和快速查询,是实时数仓的数据服务层。

主流存储引擎对比:

存储引擎类型查询延迟写入吞吐更新支持并发查询压缩比适用场景
ClickHouse列式 OLAP秒级极高10:1实时分析、大屏
Hudi数据湖秒级5:1流批一体、CDC
Iceberg数据湖秒级5:1大规模数据湖
DorisMPP OLAP秒级极高8:1实时报表
RedisKV 内存毫秒级极高极高1:1实时推荐、风控
HBaseKV 列式毫秒级3:1用户画像、明细
Elasticsearch搜索引擎秒级3:1日志分析、搜索

ClickHouse 详解:

ClickHouse 是俄罗斯 Yandex 开源的列式 OLAP 数据库,以极致的查询性能著称。

ClickHouse 核心特性:

1. 列式存储

数据按列存储,而非按行:

  • 压缩比高:同一列数据类型相同,压缩效果好
  • 查询快:只读取需要的列,减少 IO
  • 适合分析:OLAP 查询通常只涉及部分列

2. 向量化执行

使用 SIMD 指令批量处理数据:

  • CPU 利用率高:充分利用 CPU 缓存
  • 性能提升:相比逐行处理提升 10 倍以上

3. 分布式架构

支持水平扩展:

  • 分片(Shard):数据分布到多个节点
  • 副本(Replica):数据备份,保证可靠性
  • 并行查询:多节点并行处理

4. 实时写入

支持高并发实时写入:

  • MergeTree 引擎:后台异步合并数据
  • 批量写入:建议 10000+ 行批量写入
  • 分区管理:按时间分区,便于管理

ClickHouse 表引擎:

引擎特点适用场景示例
MergeTree最常用,支持主键、分区、采样通用场景订单明细表
ReplacingMergeTree去重,保留最新数据CDC 同步用户信息表
SummingMergeTree自动聚合预聚合订单统计表
AggregatingMergeTree复杂聚合复杂指标用户行为统计
CollapsingMergeTree支持更新删除状态变更订单状态表
VersionedCollapsingMergeTree带版本的更新删除复杂状态库存变更表

ClickHouse 表设计示例:

-- MergeTree:订单明细表
CREATE TABLE orders (
    order_id String,
    user_id String,
    product_id String,
    amount Decimal(18, 2),
    status String,
    province String,
    city String,
    create_time DateTime,
    update_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)  -- 按月分区
ORDER BY (user_id, order_id, create_time)  -- 排序键
SETTINGS index_granularity = 8192;  -- 索引粒度

-- ReplacingMergeTree:用户信息表(去重)
CREATE TABLE user_profile (
    user_id String,
    name String,
    age UInt8,
    gender String,
    province String,
    city String,
    update_time DateTime
) ENGINE = ReplacingMergeTree(update_time)  -- 按 update_time 去重
ORDER BY user_id;

-- SummingMergeTree:订单统计表(自动聚合)
CREATE TABLE order_stats (
    date Date,
    province String,
    category String,
    order_count UInt64,
    total_amount Decimal(18, 2)
) ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, province, category);

-- AggregatingMergeTree:用户行为统计(复杂聚合)
CREATE TABLE user_behavior_stats (
    date Date,
    user_id String,
    pv SimpleAggregateFunction(sum, UInt64),  -- 简单聚合
    uv AggregateFunction(uniq, String)  -- 复杂聚合(去重计数)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, user_id);

ClickHouse 分区设计:

-- 按月分区(推荐)
PARTITION BY toYYYYMM(create_time)

-- 按天分区(数据量大时)
PARTITION BY toYYYYMMDD(create_time)

-- 按小时分区(实时场景)
PARTITION BY toYYYYMMDDhh(create_time)

-- 多级分区
PARTITION BY (toYYYYMM(create_time), province)

-- 查看分区
SELECT 
    partition,
    name,
    rows,
    bytes_on_disk,
    formatReadableSize(bytes_on_disk) AS size
FROM system.parts
WHERE table = 'orders' AND active
ORDER BY partition DESC;

-- 删除旧分区
ALTER TABLE orders DROP PARTITION '202401';

-- 优化分区(合并小文件)
OPTIMIZE TABLE orders PARTITION '202402';

ClickHouse 索引优化:

-- 主键索引(ORDER BY)
ORDER BY (user_id, create_time)

-- 跳数索引(Skip Index)
-- minmax:数值范围查询
ALTER TABLE orders ADD INDEX idx_amount amount TYPE minmax GRANULARITY 4;

-- set:低基数字段
ALTER TABLE orders ADD INDEX idx_province province TYPE set(100) GRANULARITY 4;

-- bloom_filter:等值查询
ALTER TABLE orders ADD INDEX idx_status status TYPE bloom_filter GRANULARITY 4;

-- ngrambf_v1:模糊查询
ALTER TABLE orders ADD INDEX idx_user_name user_name TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4;

-- 物化列(Materialized Column)
ALTER TABLE orders ADD COLUMN order_date Date MATERIALIZED toDate(create_time);

-- 物化视图(Materialized View)
CREATE MATERIALIZED VIEW order_stats_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, province)
AS SELECT
    toDate(create_time) AS date,
    province,
    count() AS order_count,
    sum(amount) AS total_amount
FROM orders
GROUP BY date, province;

ClickHouse 查询优化:

-- 使用 PREWHERE 过滤(比 WHERE 快)
SELECT *
FROM orders
PREWHERE create_time >= '2024-01-01'  -- 先过滤
WHERE amount > 100;  -- 再过滤

-- 避免 SELECT *
SELECT order_id, amount, create_time
FROM orders;

-- 使用 FINAL 去重(ReplacingMergeTree)
SELECT *
FROM user_profile
FINAL
WHERE user_id = 'user123';

-- 分布式查询优化
SELECT province, sum(amount)
FROM orders
WHERE create_time >= today()
GROUP BY province
SETTINGS distributed_aggregation_memory_efficient = 1;

-- 使用物化视图
SELECT date, province, order_count, total_amount
FROM order_stats_mv
WHERE date >= '2024-01-01';

-- 并行查询
SELECT *
FROM orders
SETTINGS max_threads = 8;

ClickHouse 写入优化:

-- 批量写入(推荐 10000-100000 行)
INSERT INTO orders VALUES
    ('order1', 'user1', 'product1', 100.00, 'PAID', '北京', '北京', '2024-01-01 10:00:00', '2024-01-01 10:00:00'),
    ('order2', 'user2', 'product2', 200.00, 'PAID', '上海', '上海', '2024-01-01 10:01:00', '2024-01-01 10:01:00'),
    -- ... 更多数据
    ;

-- 异步写入
INSERT INTO orders SETTINGS async_insert = 1, wait_for_async_insert = 0
VALUES (...);

-- 并行写入(多个客户端同时写入不同分区)

-- 写入性能监控
SELECT
    table,
    sum(rows) AS total_rows,
    sum(bytes) AS total_bytes,
    formatReadableSize(total_bytes) AS size,
    count() AS parts_count
FROM system.parts
WHERE table = 'orders' AND active
GROUP BY table;

数据湖技术

数据湖技术是现代实时数仓的重要组成部分,提供了流批一体、ACID 事务、Schema Evolution 等能力。

Hudi vs Iceberg vs Delta Lake:

特性HudiIcebergDelta Lake
提出公司UberNetflixDatabricks
开源时间201920182019
ACID 支持
更新删除
时间旅行
Schema Evolution
流式写入✅ 强⚠️ 一般⚠️ 一般
查询性能⚠️ 一般✅ 好✅ 好
Flink 集成✅ 原生支持✅ 支持❌ 不支持
Spark 集成
Presto 集成⚠️ 一般
小文件问题✅ 自动合并⚠️ 需手动⚠️ 需手动
增量查询✅ 强⚠️ 一般⚠️ 一般

Hudi 核心概念:

1. 表类型

Hudi 支持两种表类型:

graph TB subgraph "Copy On Write (COW)" A1["写时合并"] A2["查询快"] A3["写入慢"] end subgraph "Merge On Read (MOR)" B1["读时合并"] B2["写入快"] B3["查询慢"] end A1 --> C["适合读多写少"] B1 --> D["适合写多读少"] style A1 fill:#4dabf7 style B1 fill:#ffd43b style C fill:#51cf66 style D fill:#51cf66
表类型写入方式查询方式适用场景
COW写时合并,生成新的 Parquet 文件直接查询 Parquet 文件读多写少
MOR写入 Delta Log,异步合并合并 Base + Delta写多读少

2. 查询类型

Hudi 支持三种查询类型:

查询类型说明适用场景
Snapshot Query快照查询,查询最新数据实时查询
Incremental Query增量查询,查询变更数据CDC、增量同步
Read Optimized Query只查询 Base 文件,性能最好批处理

Hudi 使用示例:

// Flink 写入 Hudi
import org.apache.hudi.configuration.FlinkOptions

val hoodieOptions = Map(
  FlinkOptions.PATH.key() -> "hdfs://namenode:9000/hudi/orders",
  FlinkOptions.TABLE_NAME.key() -> "orders",
  FlinkOptions.TABLE_TYPE.key() -> "MERGE_ON_READ",  // 表类型
  FlinkOptions.PRECOMBINE_FIELD.key() -> "update_time",  // 去重字段
  FlinkOptions.RECORD_KEY_FIELD.key() -> "order_id",  // 主键
  FlinkOptions.PARTITION_PATH_FIELD.key() -> "dt",  // 分区字段
  FlinkOptions.OPERATION.key() -> "upsert",  // 操作类型
  FlinkOptions.WRITE_TASKS.key() -> "4",  // 写入并行度
  FlinkOptions.COMPACTION_TASKS.key() -> "2"  // 压缩并行度
)

dataStream
  .addSink(
    HoodieFlinkStreamer.builder()
      .withOptions(hoodieOptions)
      .build()
  )

// Spark 读取 Hudi
val df = spark.read
  .format("hudi")
  .load("hdfs://namenode:9000/hudi/orders")

// 增量查询
val incrementalDF = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20240101000000")
  .load("hdfs://namenode:9000/hudi/orders")

// 时间旅行
val historicalDF = spark.read
  .format("hudi")
  .option("as.of.instant", "20240101000000")
  .load("hdfs://namenode:9000/hudi/orders")

Iceberg 核心特性:

1. 隐藏分区

Iceberg 支持隐藏分区,用户无需关心分区字段:

-- 创建表时定义分区
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(18, 2),
    create_time TIMESTAMP
) USING iceberg
PARTITIONED BY (days(create_time));  -- 按天分区

-- 查询时无需指定分区
SELECT * FROM orders WHERE create_time >= '2024-01-01';

2. Schema Evolution

Iceberg 支持灵活的 Schema 变更:

-- 添加列
ALTER TABLE orders ADD COLUMN province STRING;

-- 删除列
ALTER TABLE orders DROP COLUMN province;

-- 重命名列
ALTER TABLE orders RENAME COLUMN amount TO order_amount;

-- 修改列类型
ALTER TABLE orders ALTER COLUMN age TYPE BIGINT;

3. 时间旅行

Iceberg 支持查询历史版本:

-- 查询指定时间点的数据
SELECT * FROM orders FOR SYSTEM_TIME AS OF '2024-01-01 00:00:00';

-- 查询指定快照的数据
SELECT * FROM orders FOR SYSTEM_VERSION AS OF 123456789;

-- 查看快照历史
SELECT * FROM orders.snapshots;

存储选型建议:

graph TD A["存储选型"] --> B{"查询延迟
要求?"} B -->|"毫秒级"| C["Redis/HBase"] B -->|"秒级"| D{"是否需要
更新删除?"} D -->|"是"| E{"数据规模?"} D -->|"否"| F["ClickHouse"] E -->|"TB 级"| G["Hudi/Doris"] E -->|"PB 级"| H["Iceberg"] style C fill:#ff6b6b style F fill:#51cf66 style G fill:#4dabf7 style H fill:#ffd43b
场景推荐存储理由
实时大屏ClickHouse查询快,支持复杂分析
实时报表Doris/ClickHouse支持更新,查询性能好
实时推荐Redis/HBase毫秒级查询,支持高并发
实时风控Redis极低延迟,支持复杂数据结构
CDC 同步Hudi/Iceberg支持更新删除,流批一体
数据湖Iceberg大规模数据,生态完善
日志分析Elasticsearch全文搜索,日志分析

实时数仓分层设计

实时数仓的分层设计借鉴了离线数仓的分层思想,通过分层解耦,提高数据的可维护性和复用性。

实时数仓分层架构:

graph TB subgraph "数据源层" A1["业务数据库"] A2["日志数据"] A3["埋点数据"] end subgraph "ODS 层 (操作数据层)" B1["ods_order
原始订单"] B2["ods_user
原始用户"] B3["ods_log
原始日志"] end subgraph "DWD 层 (明细数据层)" C1["dwd_order
清洗后订单"] C2["dwd_user
清洗后用户"] C3["dwd_behavior
用户行为"] end subgraph "DWS 层 (汇总数据层)" D1["dws_order_1min
订单分钟汇总"] D2["dws_user_1hour
用户小时汇总"] D3["dws_traffic_1day
流量天汇总"] end subgraph "ADS 层 (应用数据层)" E1["ads_realtime_gmv
实时 GMV"] E2["ads_user_portrait
用户画像"] E3["ads_traffic_monitor
流量监控"] end A1 --> B1 A2 --> B2 A3 --> B3 B1 --> C1 B2 --> C2 B3 --> C3 C1 --> D1 C2 --> D2 C3 --> D3 D1 --> E1 D2 --> E2 D3 --> E3 style B1 fill:#a5d8ff style B2 fill:#a5d8ff style B3 fill:#a5d8ff style C1 fill:#74c0fc style C2 fill:#74c0fc style C3 fill:#74c0fc style D1 fill:#4dabf7 style D2 fill:#4dabf7 style D3 fill:#4dabf7 style E1 fill:#51cf66 style E2 fill:#51cf66 style E3 fill:#51cf66

ODS 层设计

ODS(Operational Data Store)层是实时数仓的第一层,存储从数据源采集的原始数据。

核心职责:

  • 数据接入:从各种数据源实时采集数据
  • 数据存储:保持数据原貌,不做任何处理
  • 数据分发:为下游提供统一的数据源
  • 数据备份:作为数据的第一道防线

设计原则:

  1. 保持原貌

    • 不做任何业务逻辑处理
    • 保留所有字段,包括系统字段
    • 记录数据采集时间
    • 保留原始数据类型
  2. 统一格式

    • 统一字段命名规范(驼峰或下划线)
    • 统一数据类型(String/Int/Decimal)
    • 统一时间格式(ISO 8601)
    • 统一编码格式(UTF-8)
  3. 分区设计

    • 按天或按小时分区
    • 便于数据回溯和清理
    • 支持增量处理
  4. 元数据管理

    • 记录数据来源
    • 记录采集时间
    • 记录数据版本

ODS 层表设计示例:

-- Kafka Topic: ods_order_topic
-- ClickHouse 表结构
CREATE TABLE ods_order (
    -- 业务字段
    order_id String,
    user_id String,
    product_id String,
    amount Decimal(18, 2),
    status String,
    province String,
    city String,
    create_time DateTime,
    update_time DateTime,
    
    -- 元数据字段
    source_db String,              -- 来源数据库
    source_table String,           -- 来源表
    binlog_file String,            -- Binlog 文件
    binlog_pos UInt64,             -- Binlog 位置
    event_type String,             -- 事件类型 (INSERT/UPDATE/DELETE)
    event_time DateTime,           -- 事件时间
    
    -- 系统字段
    etl_time DateTime DEFAULT now(),  -- 数据采集时间
    etl_date Date DEFAULT today()     -- 分区字段
) ENGINE = MergeTree()
PARTITION BY etl_date
ORDER BY (order_id, create_time);

Flink 写入 ODS 层:

public class OdsWriterJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Flink CDC 读取 MySQL
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("ecommerce")
            .tableList("ecommerce.orders")
            .username("root")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
        
        DataStream<String> cdcStream = env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC");
        
        // 解析 CDC 事件,添加元数据
        DataStream<OdsOrder> odsStream = cdcStream
            .map(new RichMapFunction<String, OdsOrder>() {
                @Override
                public OdsOrder map(String value) throws Exception {
                    JSONObject json = JSON.parseObject(value);
                    JSONObject source = json.getJSONObject("source");
                    JSONObject after = json.getJSONObject("after");
                    
                    OdsOrder order = new OdsOrder();
                    // 业务字段
                    order.setOrderId(after.getString("order_id"));
                    order.setUserId(after.getString("user_id"));
                    order.setAmount(after.getBigDecimal("amount"));
                    // ... 其他字段
                    
                    // 元数据字段
                    order.setSourceDb(source.getString("db"));
                    order.setSourceTable(source.getString("table"));
                    order.setBinlogFile(source.getString("file"));
                    order.setBinlogPos(source.getLong("pos"));
                    order.setEventType(json.getString("op"));
                    order.setEventTime(source.getLong("ts_ms"));
                    order.setEtlTime(System.currentTimeMillis());
                    
                    return order;
                }
            });
        
        // 写入 Kafka ODS Topic
        odsStream.sinkTo(
            KafkaSink.<OdsOrder>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                    KafkaRecordSerializationSchema.builder()
                        .setTopic("ods_order_topic")
                        .setValueSerializationSchema(new JsonSerializationSchema<>())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build()
        );
        
        env.execute("ODS Writer Job");
    }
}

DWD 层设计

DWD(Data Warehouse Detail)层是实时数仓的明细层,对 ODS 层数据进行清洗、转换和关联。

核心职责:

  • 数据清洗:过滤脏数据、异常数据
  • 数据转换:字段映射、类型转换、格式统一
  • 数据关联:维度关联、事实表关联
  • 数据拆分:宽表拆分、主题域划分
  • 数据标准化:统一业务口径

设计原则:

  1. 业务含义清晰

    • 字段命名符合业务语义
    • 添加必要的业务字段
    • 保留明细粒度
    • 便于业务理解
  2. 数据质量保证

    • 数据去重
    • 数据校验
    • 异常数据处理
    • 数据完整性检查
  3. 维度关联

    • 关联维度表补充信息
    • 使用维度 ID 而非维度值
    • 缓慢变化维度处理
  4. 性能优化

    • 合理设置分区
    • 优化关联方式
    • 控制数据量

DWD 层表设计示例:

-- ClickHouse DWD 层表
CREATE TABLE dwd_order (
    -- 订单信息
    order_id String,
    order_no String,
    order_type String,
    
    -- 用户信息(关联维度)
    user_id String,
    user_name String,
    user_level String,
    user_type String,
    
    -- 商品信息(关联维度)
    product_id String,
    product_name String,
    category_id String,
    category_name String,
    brand_id String,
    brand_name String,
    
    -- 金额信息
    original_amount Decimal(18, 2),
    discount_amount Decimal(18, 2),
    final_amount Decimal(18, 2),
    
    -- 状态信息
    status String,
    pay_type String,
    
    -- 地域信息
    province String,
    city String,
    district String,
    
    -- 时间信息
    create_time DateTime,
    pay_time DateTime,
    update_time DateTime,
    
    -- 系统字段
    etl_time DateTime DEFAULT now(),
    etl_date Date DEFAULT today()
) ENGINE = MergeTree()
PARTITION BY etl_date
ORDER BY (user_id, order_id, create_time);

Flink DWD 层处理逻辑:

public class DwdProcessJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 读取 ODS 层数据
        DataStream<OdsOrder> odsStream = env
            .addSource(new FlinkKafkaConsumer<>("ods_order_topic", ...));
        
        // 读取用户维度表(广播流)
        DataStream<DimUser> userStream = env
            .addSource(new FlinkKafkaConsumer<>("dim_user_topic", ...));
        
        MapStateDescriptor<String, DimUser> userStateDescriptor = 
            new MapStateDescriptor<>("user-broadcast-state", String.class, DimUser.class);
        
        BroadcastStream<DimUser> userBroadcast = userStream.broadcast(userStateDescriptor);
        
        // 关联维度表
        DataStream<DwdOrder> dwdStream = odsStream
            .connect(userBroadcast)
            .process(new BroadcastProcessFunction<OdsOrder, DimUser, DwdOrder>() {
                
                @Override
                public void processElement(OdsOrder order, ReadOnlyContext ctx, 
                        Collector<DwdOrder> out) throws Exception {
                    // 从广播状态中获取用户信息
                    ReadOnlyBroadcastState<String, DimUser> state = 
                        ctx.getBroadcastState(userStateDescriptor);
                    DimUser user = state.get(order.getUserId());
                    
                    if (user != null) {
                        DwdOrder dwdOrder = new DwdOrder();
                        // 订单信息
                        dwdOrder.setOrderId(order.getOrderId());
                        dwdOrder.setAmount(order.getAmount());
                        
                        // 用户信息
                        dwdOrder.setUserId(user.getUserId());
                        dwdOrder.setUserName(user.getUserName());
                        dwdOrder.setUserLevel(user.getUserLevel());
                        
                        out.collect(dwdOrder);
                    }
                }
                
                @Override
                public void processBroadcastElement(DimUser user, Context ctx, 
                        Collector<DwdOrder> out) throws Exception {
                    // 更新广播状态
                    ctx.getBroadcastState(userStateDescriptor).put(user.getUserId(), user);
                }
            })
            // 数据清洗
            .filter(order -> order.getAmount() != null && order.getAmount().compareTo(BigDecimal.ZERO) > 0)
            .filter(order -> order.getUserId() != null)
            // 数据转换
            .map(new OrderTransformMapper());
        
        // 写入 ClickHouse DWD 层
        dwdStream.addSink(new ClickHouseSink<>(...));
        
        env.execute("DWD Process Job");
    }
}

DWS 层设计

DWS(Data Warehouse Summary)层是实时数仓的汇总层,对 DWD 层数据进行聚合计算。

核心职责:

  • 数据聚合:按时间窗口、维度进行聚合
  • 指标计算:计算业务指标
  • 轻度汇总:为 ADS 层提供基础数据
  • 性能优化:减少 ADS 层计算压力

设计原则:

  1. 粒度设计

    • 按时间粒度:分钟、小时、天
    • 按维度粒度:用户、商品、地区
    • 平衡查询性能和存储成本
  2. 指标设计

    • 原子指标:不可再拆分的指标
    • 派生指标:基于原子指标计算
    • 复合指标:多个指标组合
  3. 窗口设计

    • 滚动窗口:固定时间窗口
    • 滑动窗口:重叠时间窗口
    • 会话窗口:基于活动的窗口
  4. 聚合策略

    • 增量聚合:来一条处理一条
    • 全量聚合:窗口结束时统计
    • 混合聚合:先增量再全量

DWS 层表设计示例:

-- 订单分钟级汇总表
CREATE TABLE dws_order_1min (
    window_start DateTime,
    window_end DateTime,
    province String,
    city String,
    category String,
    
    -- 聚合指标
    order_count UInt64,
    user_count UInt64,
    product_count UInt64,
    total_amount Decimal(18, 2),
    avg_amount Decimal(18, 2),
    max_amount Decimal(18, 2),
    min_amount Decimal(18, 2),
    
    etl_time DateTime DEFAULT now()
) ENGINE = SummingMergeTree()
PARTITION BY toYYYYMMDD(window_start)
ORDER BY (window_start, province, city, category);

-- 用户小时级汇总表
CREATE TABLE dws_user_1hour (
    window_start DateTime,
    window_end DateTime,
    user_id String,
    user_level String,
    
    -- 行为指标
    pv UInt64,
    uv UInt64,
    click_count UInt64,
    cart_count UInt64,
    order_count UInt64,
    
    -- 金额指标
    total_amount Decimal(18, 2),
    avg_order_amount Decimal(18, 2),
    
    -- 转化指标
    click_rate Float64,
    cart_rate Float64,
    conversion_rate Float64,
    
    etl_time DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(etl_time)
PARTITION BY toYYYYMMDD(window_start)
ORDER BY (window_start, user_id);

Flink DWS 层聚合逻辑:

public class DwsAggregateJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 读取 DWD 层数据
        DataStream<DwdOrder> dwdStream = env
            .addSource(new FlinkKafkaConsumer<>("dwd_order_topic", ...));
        
        // 设置 Watermark
        DataStream<DwdOrder> watermarkedStream = dwdStream
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<DwdOrder>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((order, ts) -> order.getCreateTime())
            );
        
        // 按维度分组 + 滚动窗口聚合
        DataStream<DwsOrderStats> dwsStream = watermarkedStream
            .keyBy(order -> Tuple3.of(
                order.getProvince(),
                order.getCity(),
                order.getCategory()
            ))
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new AggregateFunction<DwdOrder, OrderAccumulator, DwsOrderStats>() {
                
                @Override
                public OrderAccumulator createAccumulator() {
                    return new OrderAccumulator();
                }
                
                @Override
                public OrderAccumulator add(DwdOrder order, OrderAccumulator acc) {
                    acc.orderCount++;
                    acc.userSet.add(order.getUserId());
                    acc.productSet.add(order.getProductId());
                    acc.totalAmount = acc.totalAmount.add(order.getFinalAmount());
                    acc.maxAmount = acc.maxAmount.max(order.getFinalAmount());
                    acc.minAmount = acc.minAmount.min(order.getFinalAmount());
                    return acc;
                }
                
                @Override
                public DwsOrderStats getResult(OrderAccumulator acc) {
                    return DwsOrderStats.builder()
                        .orderCount(acc.orderCount)
                        .userCount(acc.userSet.size())
                        .productCount(acc.productSet.size())
                        .totalAmount(acc.totalAmount)
                        .avgAmount(acc.totalAmount.divide(
                            BigDecimal.valueOf(acc.orderCount), 2, RoundingMode.HALF_UP))
                        .maxAmount(acc.maxAmount)
                        .minAmount(acc.minAmount)
                        .build();
                }
                
                @Override
                public OrderAccumulator merge(OrderAccumulator a, OrderAccumulator b) {
                    a.orderCount += b.orderCount;
                    a.userSet.addAll(b.userSet);
                    a.productSet.addAll(b.productSet);
                    a.totalAmount = a.totalAmount.add(b.totalAmount);
                    a.maxAmount = a.maxAmount.max(b.maxAmount);
                    a.minAmount = a.minAmount.min(b.minAmount);
                    return a;
                }
            });
        
        // 写入 ClickHouse DWS 层
        dwsStream.addSink(new ClickHouseSink<>(...));
        
        env.execute("DWS Aggregate Job");
    }
}

ADS 层设计

ADS(Application Data Store)层是实时数仓的应用层,面向具体业务场景提供数据服务。

核心职责:

  • 业务主题:按业务主题组织数据
  • 指标计算:计算复杂业务指标
  • 数据服务:为前端应用提供数据接口
  • 性能优化:针对查询场景优化

设计原则:

  1. 面向应用

    • 按业务场景设计表结构
    • 字段命名符合前端需求
    • 查询性能优先
    • 减少关联查询
  2. 指标完整

    • 包含所有业务指标
    • 支持多维度分析
    • 支持实时查询
    • 支持历史对比
  3. 易于使用

    • 表结构简单清晰
    • 减少关联查询
    • 提供标准 API
    • 文档完善
  4. 性能保证

    • 使用物化视图
    • 使用缓存(Redis)
    • 预计算指标
    • 合理分区

ADS 层表设计示例:

-- 实时 GMV 大屏表
CREATE TABLE ads_realtime_gmv (
    stat_time DateTime,
    time_granularity String,  -- 时间粒度 (1min/5min/1hour)
    
    -- 核心指标
    gmv Decimal(18, 2),
    order_count UInt64,
    user_count UInt64,
    product_count UInt64,
    avg_order_amount Decimal(18, 2),
    
    -- 同比环比
    gmv_yoy Decimal(10, 2),  -- GMV 同比增长率
    gmv_mom Decimal(10, 2),  -- GMV 环比增长率
    order_yoy Decimal(10, 2),
    order_mom Decimal(10, 2),
    
    -- TOP 数据
    top_province String,
    top_category String,
    top_product String,
    
    -- 系统字段
    update_time DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(update_time)
ORDER BY (stat_time, time_granularity);

-- 用户画像表
CREATE TABLE ads_user_portrait (
    user_id String,
    
    -- 基本信息
    user_name String,
    age UInt8,
    gender String,
    province String,
    city String,
    
    -- 行为特征
    total_pv UInt64,
    total_uv UInt64,
    avg_session_duration UInt32,
    last_visit_time DateTime,
    
    -- 消费特征
    total_order_count UInt64,
    total_amount Decimal(18, 2),
    avg_order_amount Decimal(18, 2),
    max_order_amount Decimal(18, 2),
    last_order_time DateTime,
    
    -- 偏好特征
    favorite_category Array(String),
    favorite_brand Array(String),
    
    -- 标签
    user_tags Array(String),
    user_level String,
    
    -- 系统字段
    update_time DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(update_time)
ORDER BY user_id;

分层设计最佳实践

1. 分层原则

层级中文名数据粒度主要操作存储引擎数据延迟保留时间
ODS操作数据层明细采集、存储Kafka/ClickHouse秒级7-30 天
DWD明细数据层明细清洗、转换、关联ClickHouse/Hudi秒级30-90 天
DWS汇总数据层轻度汇总聚合、计算ClickHouse分钟级90-180 天
ADS应用数据层高度汇总业务指标ClickHouse/Redis分钟级180-365 天

2. 数据流转

sequenceDiagram participant Source as 数据源 participant ODS as ODS 层 participant DWD as DWD 层 participant DWS as DWS 层 participant ADS as ADS 层 participant App as 应用层 Source->>ODS: CDC 实时采集 Note over ODS: 保持原貌
不做处理 ODS->>DWD: 实时消费 Note over DWD: 清洗转换
维度关联 DWD->>DWS: 实时聚合 Note over DWS: 窗口计算
指标聚合 DWS->>ADS: 指标计算 Note over ADS: 业务指标
多维分析 ADS->>App: API 查询 Note over App: 实时大屏
实时报表

3. 命名规范

层级表名前缀示例说明
ODSods_ods_order原始数据
DWDdwd_dwd_order明细数据
DWSdws_dws_order_1min汇总数据 + 时间粒度
ADSads_ads_realtime_gmv应用数据 + 业务主题
DIMdim_dim_user维度数据

4. 数据质量保证

// 数据质量检查
public class DataQualityChecker extends ProcessFunction<Order, Order> {
    
    private transient Counter validCounter;
    private transient Counter invalidCounter;
    private transient Histogram qualityScore;
    
    @Override
    public void open(Configuration parameters) {
        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        validCounter = metricGroup.counter("valid_records");
        invalidCounter = metricGroup.counter("invalid_records");
        qualityScore = metricGroup.histogram("quality_score", 
            new DescriptiveStatisticsHistogram(1000));
    }
    
    @Override
    public void processElement(Order order, Context ctx, Collector<Order> out) {
        int score = 100;
        List<String> errors = new ArrayList<>();
        
        // 必填字段检查
        if (order.getOrderId() == null || order.getOrderId().isEmpty()) {
            errors.add("订单 ID 为空");
            score -= 20;
        }
        
        // 数值范围检查
        if (order.getAmount() == null || order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            errors.add("订单金额异常");
            score -= 20;
        }
        
        // 枚举值检查
        if (!Arrays.asList("PENDING", "PAID", "CANCELLED").contains(order.getStatus())) {
            errors.add("订单状态非法");
            score -= 15;
        }
        
        // 时间合理性检查
        if (order.getCreateTime() == null || order.getCreateTime() > System.currentTimeMillis()) {
            errors.add("订单时间异常");
            score -= 15;
        }
        
        // 数据完整性检查
        if (order.getUserId() == null) {
            errors.add("用户 ID 为空");
            score -= 15;
        }
        
        qualityScore.update(score);
        
        if (score >= 80) {
            validCounter.inc();
            out.collect(order);
        } else {
            invalidCounter.inc();
            log.error("数据质量不合格: orderId={}, score={}, errors={}", 
                order.getOrderId(), score, errors);
            // 发送到错误队列
            ctx.output(errorOutputTag, order);
        }
    }
}

5. 分层优化建议

优化项ODS 层DWD 层DWS 层ADS 层
分区策略按天按天按天/小时按月
压缩算法LZ4LZ4LZ4ZSTD
索引策略主键索引主键 + 跳数索引主键 + 物化视图全文索引
TTL 策略7-30 天30-90 天90-180 天180-365 天
备份策略不备份增量备份增量备份全量备份

实时数据采集与同步

CDC 技术原理

CDC(Change Data Capture) 是实时数仓的核心技术,通过捕获数据库的变更日志实现数据的实时同步。

CDC 工作原理:

  1. Binlog 解析

    • MySQL 的 Binlog 记录了所有数据变更
    • CDC 工具伪装成 MySQL Slave 读取 Binlog
    • 解析 Binlog 事件,转换为变更记录
  2. 事件类型

    • INSERT:新增数据
    • UPDATE:更新数据
    • DELETE:删除数据
    • DDL:表结构变更
  3. 数据格式

    • Before:变更前的数据
    • After:变更后的数据
    • Metadata:元数据信息

Binlog 格式:

格式说明优点缺点适用场景
Statement记录 SQL 语句日志量小可能不一致不推荐
Row记录行变更数据一致日志量大推荐
Mixed混合模式折中方案复杂特殊场景

配置 MySQL Binlog:

-- 查看 Binlog 配置
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

-- 开启 Binlog(需要重启)
-- my.cnf
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
expire_logs_days=7

-- 创建 CDC 用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

-- 查看 Binlog 文件
SHOW BINARY LOGS;

-- 查看 Binlog 内容
SHOW BINLOG EVENTS IN 'mysql-bin.000001';

Canal 实战

Canal 部署:

# 下载 Canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
tar -zxvf canal.deployer-1.1.6.tar.gz -C /opt/canal

# 配置 Canal
cd /opt/canal/conf/example

# instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.defaultDatabaseName=test

# 订阅规则
canal.instance.filter.regex=test_db\\..*
canal.instance.filter.black.regex=test_db\\.tmp_.*

# 启动 Canal
cd /opt/canal
sh bin/startup.sh

# 查看日志
tail -f logs/canal/canal.log
tail -f logs/example/example.log

Canal Client 示例:

public class CanalClient {
    public static void main(String[] args) {
        // 创建连接
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111),
            "example",
            "",
            ""
        );
        
        try {
            connector.connect();
            connector.subscribe("test_db\\..*");
            connector.rollback();
            
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                
                if (batchId == -1 || size == 0) {
                    Thread.sleep(1000);
                } else {
                    printEntry(message.getEntries());
                }
                
                // 提交确认
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
    
    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || 
                entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            
            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error", e);
            }
            
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                eventType));
            
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + 
                "    update=" + column.getUpdated());
        }
    }
}

Canal 发送到 Kafka:

# canal.properties
canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = lz4
kafka.batch.size = 16384
kafka.linger.ms = 10

# instance.properties
canal.mq.topic = canal_topic
canal.mq.partition = 0
canal.mq.partitionsNum = 3
canal.mq.partitionHash = test_db.orders:id

Debezium 实战

Debezium 部署(Kafka Connect):

# 下载 Debezium
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.0.Final/debezium-connector-mysql-2.1.0.Final-plugin.tar.gz
tar -zxvf debezium-connector-mysql-2.1.0.Final-plugin.tar.gz -C /opt/kafka/plugins/

# 配置 Kafka Connect
# connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
plugin.path=/opt/kafka/plugins

# 启动 Kafka Connect
bin/connect-distributed.sh config/connect-distributed.properties

创建 Debezium Connector:

# 创建 MySQL Connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "database.include.list": "inventory",
    "table.include.list": "inventory.orders,inventory.products",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "include.schema.changes": "true",
    "snapshot.mode": "initial",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}'

# 查看 Connector 状态
curl http://localhost:8083/connectors/mysql-connector/status

# 删除 Connector
curl -X DELETE http://localhost:8083/connectors/mysql-connector

Debezium 数据格式:

{
  "before": null,
  "after": {
    "id": 1001,
    "order_no": "ORD20240228001",
    "user_id": "user123",
    "amount": 199.99,
    "status": "PAID",
    "create_time": 1709107200000
  },
  "source": {
    "version": "2.1.0.Final",
    "connector": "mysql",
    "name": "mysql-server",
    "ts_ms": 1709107200123,
    "snapshot": "false",
    "db": "inventory",
    "table": "orders",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "thread": 7,
    "query": null
  },
  "op": "c",
  "ts_ms": 1709107200456,
  "transaction": null
}

Flink CDC 依赖:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.0</version>
</dependency>

Flink CDC 使用示例:

public class FlinkCDCJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);
        
        // 配置 MySQL CDC Source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("inventory")
            .tableList("inventory.orders", "inventory.products")
            .username("root")
            .password("password")
            .serverTimeZone("Asia/Shanghai")
            .startupOptions(StartupOptions.initial())  // 全量 + 增量
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
        
        // 创建数据流
        DataStreamSource<String> cdcStream = env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC");
        
        // 处理 CDC 数据
        DataStream<Order> orderStream = cdcStream
            .map(new MapFunction<String, Order>() {
                @Override
                public Order map(String value) throws Exception {
                    JSONObject json = JSON.parseObject(value);
                    String op = json.getString("op");
                    
                    Order order = new Order();
                    switch (op) {
                        case "c":  // CREATE (INSERT)
                        case "r":  // READ (全量同步)
                            JSONObject after = json.getJSONObject("after");
                            order.setOrderId(after.getString("id"));
                            order.setOrderNo(after.getString("order_no"));
                            order.setUserId(after.getString("user_id"));
                            order.setAmount(after.getBigDecimal("amount"));
                            order.setStatus(after.getString("status"));
                            order.setCreateTime(after.getLong("create_time"));
                            order.setDeleted(false);
                            break;
                        case "u":  // UPDATE
                            after = json.getJSONObject("after");
                            order.setOrderId(after.getString("id"));
                            order.setStatus(after.getString("status"));
                            order.setDeleted(false);
                            break;
                        case "d":  // DELETE
                            JSONObject before = json.getJSONObject("before");
                            order.setOrderId(before.getString("id"));
                            order.setDeleted(true);
                            break;
                    }
                    return order;
                }
            })
            .filter(order -> order.getOrderId() != null);
        
        // 写入 Kafka
        orderStream.sinkTo(
            KafkaSink.<Order>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                    KafkaRecordSerializationSchema.builder()
                        .setTopic("dwd_order")
                        .setValueSerializationSchema(new JsonSerializationSchema<>())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build()
        );
        
        env.execute("Flink CDC Job");
    }
}

Flink CDC Table API:

public class FlinkCDCTableJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        // 创建 MySQL CDC 表
        tableEnv.executeSql(
            "CREATE TABLE orders (" +
            "  id BIGINT," +
            "  order_no STRING," +
            "  user_id STRING," +
            "  amount DECIMAL(18, 2)," +
            "  status STRING," +
            "  create_time TIMESTAMP(3)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'localhost'," +
            "  'port' = '3306'," +
            "  'username' = 'root'," +
            "  'password' = 'password'," +
            "  'database-name' = 'inventory'," +
            "  'table-name' = 'orders'" +
            ")"
        );
        
        // 创建 Kafka Sink 表
        tableEnv.executeSql(
            "CREATE TABLE kafka_orders (" +
            "  id BIGINT," +
            "  order_no STRING," +
            "  user_id STRING," +
            "  amount DECIMAL(18, 2)," +
            "  status STRING," +
            "  create_time TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'dwd_order'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +
            ")"
        );
        
        // 数据同步
        tableEnv.executeSql(
            "INSERT INTO kafka_orders " +
            "SELECT id, order_no, user_id, amount, status, create_time " +
            "FROM orders"
        );
    }
}

全量与增量同步策略

同步策略对比:

策略说明优点缺点适用场景
全量同步一次性同步所有数据数据完整时间长、资源消耗大初始化、数据修复
增量同步只同步变更数据实时性好、资源消耗小需要 CDC 支持日常同步
全量 + 增量先全量后增量兼顾完整性和实时性实现复杂推荐方案

全量 + 增量同步流程:

sequenceDiagram participant Source as MySQL participant CDC as Flink CDC participant Kafka as Kafka participant Sink as ClickHouse Note over CDC: 阶段 1:全量同步 CDC->>Source: SELECT * FROM orders Source->>CDC: 返回全量数据 CDC->>Kafka: 发送全量数据 Kafka->>Sink: 写入全量数据 Note over CDC: 阶段 2:增量同步 Source->>CDC: Binlog 变更事件 CDC->>Kafka: 发送增量数据 Kafka->>Sink: 写入增量数据 Note over CDC: 持续增量同步

全量 + 增量同步实现:

public class FullIncrementalSyncJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 配置 MySQL CDC Source(全量 + 增量)
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("inventory")
            .tableList("inventory.orders")
            .username("root")
            .password("password")
            .startupOptions(StartupOptions.initial())  // 先全量再增量
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
        
        DataStreamSource<String> cdcStream = env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC");
        
        // 处理数据
        DataStream<Order> orderStream = cdcStream
            .map(new ParseCDCFunction())
            .filter(order -> order != null);
        
        // 根据操作类型分流
        OutputTag<Order> insertTag = new OutputTag<Order>("insert"){};
        OutputTag<Order> updateTag = new OutputTag<Order>("update"){};
        OutputTag<Order> deleteTag = new OutputTag<Order>("delete"){};
        
        SingleOutputStreamOperator<Order> mainStream = orderStream
            .process(new ProcessFunction<Order, Order>() {
                @Override
                public void processElement(Order order, Context ctx, Collector<Order> out) {
                    String op = order.getOp();
                    switch (op) {
                        case "c":
                        case "r":
                            ctx.output(insertTag, order);
                            break;
                        case "u":
                            ctx.output(updateTag, order);
                            break;
                        case "d":
                            ctx.output(deleteTag, order);
                            break;
                    }
                }
            });
        
        // 分别处理不同类型的数据
        DataStream<Order> insertStream = mainStream.getSideOutput(insertTag);
        DataStream<Order> updateStream = mainStream.getSideOutput(updateTag);
        DataStream<Order> deleteStream = mainStream.getSideOutput(deleteTag);
        
        // 写入不同的 Sink
        insertStream.addSink(new ClickHouseInsertSink());
        updateStream.addSink(new ClickHouseUpdateSink());
        deleteStream.addSink(new ClickHouseDeleteSink());
        
        env.execute("Full Incremental Sync Job");
    }
}

断点续传:

// 使用 Checkpoint 实现断点续传
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// 从 Checkpoint 恢复
// flink run -s hdfs://namenode:9000/flink/checkpoints/chk-123 your-job.jar

数据一致性保证:

// Exactly-Once 配置
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Kafka Sink Exactly-Once
KafkaSink<Order> sink = KafkaSink.<Order>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(...)
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-kafka-")
    .build();

// ClickHouse 幂等写入
public class ClickHouseIdempotentSink extends RichSinkFunction<Order> {
    @Override
    public void invoke(Order order, Context context) {
        // 使用 REPLACE INTO 或 INSERT ... ON DUPLICATE KEY UPDATE
        String sql = "INSERT INTO orders (order_id, amount, status) " +
                    "VALUES (?, ?, ?) " +
                    "ON DUPLICATE KEY UPDATE amount=VALUES(amount), status=VALUES(status)";
        
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setString(1, order.getOrderId());
            ps.setBigDecimal(2, order.getAmount());
            ps.setString(3, order.getStatus());
            ps.executeUpdate();
        }
    }
}

实时数据处理

数据清洗

数据清洗是保证数据质量的关键环节,包括去重、过滤、校验等操作。

常见清洗场景:

场景问题解决方案示例
脏数据字段缺失、格式错误过滤或补全默认值NULL 值填充为 0
重复数据同一条数据多次发送去重(基于主键)使用状态去重
异常数据数值超出合理范围过滤或修正金额 < 0 设为 0
空值处理NULL 值影响计算填充默认值NULL 填充为 “未知”
类型转换字符串转数值失败异常捕获和处理Try-Catch 处理
编码问题乱码统一编码格式UTF-8 编码

Flink 数据清洗示例:

public class DataCleanFunction extends RichMapFunction<Order, Order> {
    
    private transient Counter invalidCounter;
    private transient Counter cleanedCounter;
    
    @Override
    public void open(Configuration parameters) {
        invalidCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("invalid_records");
        cleanedCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("cleaned_records");
    }
    
    @Override
    public Order map(Order order) throws Exception {
        // 1. 字段校验
        if (order.getOrderId() == null || order.getOrderId().isEmpty()) {
            invalidCounter.inc();
            return null;  // 过滤无效数据
        }
        
        // 2. 数值范围校验
        if (order.getAmount() == null || order.getAmount().compareTo(BigDecimal.ZERO) < 0) {
            order.setAmount(BigDecimal.ZERO);  // 修正异常值
        }
        
        // 3. 空值处理
        if (order.getProvince() == null || order.getProvince().isEmpty()) {
            order.setProvince("未知");
        }
        
        // 4. 类型转换
        try {
            if (order.getCreateTimeStr() != null) {
                order.setCreateTime(
                    LocalDateTime.parse(order.getCreateTimeStr(), 
                        DateTimeFormatter.ISO_DATE_TIME)
                );
            }
        } catch (Exception e) {
            order.setCreateTime(LocalDateTime.now());
        }
        
        // 5. 数据标准化
        order.setProvince(order.getProvince().trim());
        order.setStatus(order.getStatus().toUpperCase());
        
        // 6. 数据脱敏
        if (order.getUserPhone() != null) {
            order.setUserPhone(maskPhone(order.getUserPhone()));
        }
        
        cleanedCounter.inc();
        return order;
    }
    
    private String maskPhone(String phone) {
        if (phone.length() == 11) {
            return phone.substring(0, 3) + "****" + phone.substring(7);
        }
        return phone;
    }
}

// 使用清洗函数
DataStream<Order> cleanedStream = rawStream
    .map(new DataCleanFunction())
    .filter(Objects::nonNull);  // 过滤 null 值

基于状态的去重:

public class DeduplicationFunction extends KeyedProcessFunction<String, Order, Order> {
    
    // 状态:记录已处理的订单 ID
    private ValueState<Boolean> processedState;
    
    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> descriptor = 
            new ValueStateDescriptor<>("processed", Boolean.class);
        
        // 设置状态 TTL,1 小时后自动清理
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupIncrementally(10, true)
            .build();
        descriptor.enableTimeToLive(ttlConfig);
        
        processedState = getRuntimeContext().getState(descriptor);
    }
    
    @Override
    public void processElement(Order order, Context ctx, Collector<Order> out) 
            throws Exception {
        Boolean processed = processedState.value();
        if (processed == null || !processed) {
            // 首次处理
            processedState.update(true);
            out.collect(order);
        }
        // 重复数据,丢弃
    }
}

// 按订单 ID 去重
DataStream<Order> deduplicatedStream = cleanedStream
    .keyBy(Order::getOrderId)
    .process(new DeduplicationFunction());

数据转换

数据转换是将数据从一种格式转换为另一种格式,包括字段映射、类型转换、格式统一等。

常见转换场景:

  1. 字段映射:源字段名到目标字段名的映射
  2. 类型转换:String 转 Int、Long 转 Timestamp 等
  3. 格式统一:日期格式、金额格式统一
  4. 字段拆分:一个字段拆分为多个字段
  5. 字段合并:多个字段合并为一个字段
  6. 字段计算:基于现有字段计算新字段

Flink 数据转换示例:

public class DataTransformFunction implements MapFunction<OdsOrder, DwdOrder> {
    
    @Override
    public DwdOrder map(OdsOrder ods) throws Exception {
        DwdOrder dwd = new DwdOrder();
        
        // 1. 字段映射
        dwd.setOrderId(ods.getOrderId());
        dwd.setUserId(ods.getUserId());
        
        // 2. 类型转换
        dwd.setAmount(new BigDecimal(ods.getAmountStr()));
        dwd.setCreateTime(Long.parseLong(ods.getCreateTimeStr()));
        
        // 3. 格式统一
        dwd.setOrderNo(formatOrderNo(ods.getOrderNo()));
        dwd.setPhone(formatPhone(ods.getPhone()));
        
        // 4. 字段拆分
        String[] address = ods.getAddress().split("-");
        if (address.length >= 3) {
            dwd.setProvince(address[0]);
            dwd.setCity(address[1]);
            dwd.setDistrict(address[2]);
        }
        
        // 5. 字段合并
        dwd.setFullName(ods.getFirstName() + ods.getLastName());
        
        // 6. 字段计算
        dwd.setFinalAmount(
            ods.getOriginalAmount().subtract(ods.getDiscountAmount())
        );
        dwd.setDiscountRate(
            ods.getDiscountAmount()
                .divide(ods.getOriginalAmount(), 4, RoundingMode.HALF_UP)
                .multiply(new BigDecimal("100"))
        );
        
        return dwd;
    }
    
    private String formatOrderNo(String orderNo) {
        // 统一订单号格式:ORD20240228001
        return "ORD" + orderNo;
    }
    
    private String formatPhone(String phone) {
        // 统一手机号格式:去除空格和横线
        return phone.replaceAll("[\\s-]", "");
    }
}

数据关联

数据关联是将事实表与维度表进行关联,补充业务信息。

关联方式对比:

方式原理优点缺点适用场景
广播关联维度表广播到所有节点实时性好、无外部依赖维度表不能太大小维度表(< 100MB)
异步 IO异步查询外部存储支持大维度表增加外部依赖、延迟高大维度表
Temporal Join基于时间的关联支持变化维度配置复杂缓慢变化维度
Lookup Join查询外部数据源灵活性高性能开销大实时维度查询
Interval Join时间区间关联支持乱序数据需要 Watermark双流关联

广播关联示例:

// 维度表流(用户维度)
DataStream<DimUser> userStream = env
    .addSource(new FlinkKafkaConsumer<>("dim_user", ...))
    .map(new UserDeserializer());

// 定义广播状态描述符
MapStateDescriptor<String, DimUser> userStateDescriptor = 
    new MapStateDescriptor<>(
        "user-broadcast-state",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(DimUser.class)
    );

// 创建广播流
BroadcastStream<DimUser> userBroadcast = userStream
    .broadcast(userStateDescriptor);

// 事实表流(订单流)
DataStream<Order> orderStream = env
    .addSource(new FlinkKafkaConsumer<>("ods_order", ...));

// 关联维度
DataStream<OrderWithUser> enrichedStream = orderStream
    .connect(userBroadcast)
    .process(new BroadcastProcessFunction<Order, DimUser, OrderWithUser>() {
        
        @Override
        public void processElement(Order order, ReadOnlyContext ctx, 
                Collector<OrderWithUser> out) throws Exception {
            // 从广播状态中获取用户信息
            ReadOnlyBroadcastState<String, DimUser> state = 
                ctx.getBroadcastState(userStateDescriptor);
            DimUser user = state.get(order.getUserId());
            
            if (user != null) {
                OrderWithUser enriched = new OrderWithUser();
                enriched.setOrder(order);
                enriched.setUser(user);
                out.collect(enriched);
            } else {
                // 维度数据缺失,记录日志或发送到侧输出流
                log.warn("User not found: {}", order.getUserId());
            }
        }
        
        @Override
        public void processBroadcastElement(DimUser user, Context ctx, 
                Collector<OrderWithUser> out) throws Exception {
            // 更新广播状态
            ctx.getBroadcastState(userStateDescriptor)
                .put(user.getUserId(), user);
        }
    });

异步 IO 关联示例:

// 异步查询 Redis 维度表
public class AsyncRedisLookupFunction 
        extends RichAsyncFunction<Order, OrderWithUser> {
    
    private transient RedisAsyncCommands<String, String> redisClient;
    
    @Override
    public void open(Configuration parameters) {
        RedisClient client = RedisClient.create("redis://localhost:6379");
        StatefulRedisConnection<String, String> connection = client.connect();
        redisClient = connection.async();
    }
    
    @Override
    public void asyncInvoke(Order order, ResultFuture<OrderWithUser> resultFuture) {
        String key = "user:" + order.getUserId();
        
        CompletableFuture<String> future = redisClient.get(key).toCompletableFuture();
        
        future.whenComplete((userJson, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
                return;
            }
            
            if (userJson != null) {
                DimUser user = JSON.parseObject(userJson, DimUser.class);
                OrderWithUser result = new OrderWithUser(order, user);
                resultFuture.complete(Collections.singleton(result));
            } else {
                // 维度数据不存在,返回空用户
                resultFuture.complete(Collections.singleton(
                    new OrderWithUser(order, new DimUser())
                ));
            }
        });
    }
    
    @Override
    public void timeout(Order order, ResultFuture<OrderWithUser> resultFuture) {
        // 超时处理
        log.warn("Async lookup timeout: {}", order.getOrderId());
        resultFuture.complete(Collections.singleton(
            new OrderWithUser(order, new DimUser())
        ));
    }
    
    @Override
    public void close() throws Exception {
        if (redisClient != null) {
            redisClient.getStatefulConnection().close();
        }
    }
}

// 使用异步 IO
DataStream<OrderWithUser> enrichedStream = AsyncDataStream
    .unorderedWait(
        orderStream,
        new AsyncRedisLookupFunction(),
        5000,  // 超时时间 5 秒
        TimeUnit.MILLISECONDS,
        100    // 最大并发请求数
    );

Interval Join 示例:

// 订单流
DataStream<Order> orderStream = env
    .addSource(new FlinkKafkaConsumer<>("orders", ...))
    .assignTimestampsAndWatermarks(...);

// 支付流
DataStream<Payment> paymentStream = env
    .addSource(new FlinkKafkaConsumer<>("payments", ...))
    .assignTimestampsAndWatermarks(...);

// Interval Join:订单和支付在 10 分钟内关联
DataStream<OrderPayment> joinedStream = orderStream
    .keyBy(Order::getOrderId)
    .intervalJoin(paymentStream.keyBy(Payment::getOrderId))
    .between(Time.minutes(-10), Time.minutes(10))  // 时间区间
    .process(new ProcessJoinFunction<Order, Payment, OrderPayment>() {
        @Override
        public void processElement(Order order, Payment payment, Context ctx, 
                Collector<OrderPayment> out) {
            OrderPayment result = new OrderPayment();
            result.setOrder(order);
            result.setPayment(payment);
            out.collect(result);
        }
    });

数据聚合

数据聚合是实时数仓的核心计算逻辑,包括窗口聚合、增量聚合等。

窗口类型对比:

窗口类型说明特点适用场景
滚动窗口固定大小,不重叠简单、高效每分钟统计
滑动窗口固定大小,可重叠平滑、连续最近 5 分钟
会话窗口基于活动间隔动态大小用户会话
全局窗口自定义触发器灵活特殊场景

窗口聚合示例:

// 滚动窗口:每分钟统计订单数和金额
DataStream<OrderStats> tumblingStats = orderStream
    .keyBy(Order::getProvince)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AggregateFunction<Order, OrderAccumulator, OrderStats>() {
        @Override
        public OrderAccumulator createAccumulator() {
            return new OrderAccumulator();
        }
        
        @Override
        public OrderAccumulator add(Order order, OrderAccumulator acc) {
            acc.count++;
            acc.totalAmount = acc.totalAmount.add(order.getAmount());
            acc.userSet.add(order.getUserId());
            return acc;
        }
        
        @Override
        public OrderStats getResult(OrderAccumulator acc) {
            return new OrderStats(
                acc.count,
                acc.userSet.size(),
                acc.totalAmount,
                acc.totalAmount.divide(BigDecimal.valueOf(acc.count), 2, RoundingMode.HALF_UP)
            );
        }
        
        @Override
        public OrderAccumulator merge(OrderAccumulator a, OrderAccumulator b) {
            a.count += b.count;
            a.totalAmount = a.totalAmount.add(b.totalAmount);
            a.userSet.addAll(b.userSet);
            return a;
        }
    });

// 滑动窗口:最近 5 分钟,每分钟更新一次
DataStream<OrderStats> slidingStats = orderStream
    .keyBy(Order::getProvince)
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
    .aggregate(new OrderAggregateFunction());

// 会话窗口:30 分钟无活动则结束会话
DataStream<SessionStats> sessionStats = behaviorStream
    .keyBy(Behavior::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregateFunction());

// 计数窗口:每 100 条数据触发一次
DataStream<OrderStats> countStats = orderStream
    .keyBy(Order::getUserId)
    .countWindow(100)
    .aggregate(new OrderAggregateFunction());

窗口计算

窗口触发器:

// 自定义触发器:每 10 秒或 100 条数据触发一次
public class CustomTrigger extends Trigger<Order, TimeWindow> {
    
    private final long maxCount = 100;
    private final long maxTime = 10000;  // 10 秒
    
    @Override
    public TriggerResult onElement(Order element, long timestamp, 
            TimeWindow window, TriggerContext ctx) throws Exception {
        // 注册定时器
        ctx.registerEventTimeTimer(window.maxTimestamp());
        
        // 获取计数状态
        ValueState<Long> countState = ctx.getPartitionedState(
            new ValueStateDescriptor<>("count", Long.class, 0L)
        );
        long count = countState.value() + 1;
        countState.update(count);
        
        // 达到最大计数,触发计算
        if (count >= maxCount) {
            countState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        
        // 注册处理时间定时器
        long fireTime = ctx.getCurrentProcessingTime() + maxTime;
        ctx.registerProcessingTimeTimer(fireTime);
        
        return TriggerResult.CONTINUE;
    }
    
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, 
            TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }
    
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, 
            TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ? 
            TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }
    
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

// 使用自定义触发器
DataStream<OrderStats> customStats = orderStream
    .keyBy(Order::getProvince)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .trigger(new CustomTrigger())
    .aggregate(new OrderAggregateFunction());

增量聚合 vs 全量聚合:

// 增量聚合:AggregateFunction(推荐)
window.aggregate(new AggregateFunction<Order, OrderAccumulator, OrderStats>() {
    // 来一条处理一条,内存占用小
});

// 全量聚合:ProcessWindowFunction
window.process(new ProcessWindowFunction<Order, OrderStats, String, TimeWindow>() {
    @Override
    public void process(String key, Context context, 
            Iterable<Order> elements, Collector<OrderStats> out) {
        // 窗口结束时,elements 包含窗口内所有数据
        // 内存占用大,但可以访问窗口元数据
        List<Order> orders = StreamSupport
            .stream(elements.spliterator(), false)
            .collect(Collectors.toList());
        
        // 复杂计算
        OrderStats stats = calculateComplexStats(orders);
        stats.setWindowStart(context.window().getStart());
        stats.setWindowEnd(context.window().getEnd());
        out.collect(stats);
    }
});

// 增量 + 全量:先增量聚合,再全量处理(最佳实践)
window.aggregate(
    new OrderAggregateFunction(),  // 增量聚合
    new ProcessWindowFunction<OrderStats, OrderStats, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, 
                Iterable<OrderStats> elements, Collector<OrderStats> out) {
            // 增量聚合的结果
            OrderStats stats = elements.iterator().next();
            // 添加窗口元数据
            stats.setWindowStart(context.window().getStart());
            stats.setWindowEnd(context.window().getEnd());
            out.collect(stats);
        }
    }
);

实时数仓性能优化

Flink 性能调优是实时数仓优化的核心,涉及并行度、内存、状态、Checkpoint 等多个方面。

核心调优参数:

参数类别参数名推荐值说明
并行度parallelism.defaultCPU 核数 * 2默认并行度
内存taskmanager.memory.process.size8-16GBTaskManager 总内存
内存taskmanager.memory.managed.fraction0.4托管内存比例
Checkpointexecution.checkpointing.interval60s-300sCheckpoint 间隔
Checkpointexecution.checkpointing.timeout10minCheckpoint 超时
状态后端state.backendrocksdb状态后端类型
网络taskmanager.network.memory.fraction0.1-0.2网络缓冲区比例

1. 并行度优化

// 全局并行度
env.setParallelism(16);

// 算子级并行度
DataStream<Order> stream = env
    .addSource(new FlinkKafkaConsumer<>(...))
    .setParallelism(32)  // Source 并行度 = Kafka 分区数
    .map(new OrderMapper())
    .setParallelism(16)  // Map 并行度
    .keyBy(Order::getUserId)
    .window(...)
    .aggregate(new OrderAggregateFunction())
    .setParallelism(8);  // 聚合并行度

// Sink 并行度
stream.addSink(new ClickHouseSink<>(...))
    .setParallelism(4);  // 根据下游承载能力设置

并行度设置原则:

  • Source 并行度 = Kafka 分区数
  • 计算密集型算子:并行度 = CPU 核数 * 2
  • IO 密集型算子:并行度可适当增大
  • Sink 并行度:根据下游承载能力设置
  • 避免数据倾斜:使用 rebalance() 或自定义分区器

2. 内存优化

# flink-conf.yaml
taskmanager.memory.process.size: 16g
taskmanager.memory.flink.size: 14g
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.15

# JVM 参数优化
env.java.opts: -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCDetails

内存分配建议:

  • TaskManager 总内存:8-16GB
  • 托管内存(RocksDB):40%
  • 网络缓冲区:10-20%
  • 堆内存:剩余部分
  • JVM 开销:1-2GB

3. Checkpoint 优化

// Checkpoint 配置
env.enableCheckpointing(60000);  // 60 秒
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);  // 最小间隔
env.getCheckpointConfig().setCheckpointTimeout(600000);  // 10 分钟超时
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);  // 最大并发数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);  // 容忍失败次数

// 增量 Checkpoint(RocksDB)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));  // 启用增量

// Checkpoint 存储
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");

// 外部化 Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// 本地恢复
env.getCheckpointConfig().enableLocalRecovery(true);

Checkpoint 调优建议:

  • 间隔时间:根据数据量和状态大小调整,一般 1-5 分钟
  • 增量 Checkpoint:大状态场景必须启用
  • 异步快照:默认启用,提升性能
  • 本地恢复:启用本地状态恢复加速重启
  • 压缩:启用状态压缩减少存储

4. 状态优化

// 状态 TTL 配置
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))  // 24 小时过期
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()  // 全量快照时清理
    .cleanupIncrementally(10, true)  // 增量清理
    .build();

ValueStateDescriptor<String> descriptor = 
    new ValueStateDescriptor<>("state", String.class);
descriptor.enableTimeToLive(ttlConfig);

// RocksDB 调优
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs://...", true);
backend.setNumberOfTransferThreads(4);

// RocksDB 配置
OptionsFactory optionsFactory = new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions
            .setMaxBackgroundJobs(4)
            .setMaxOpenFiles(-1);
    }
    
    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
            .setCompactionStyle(CompactionStyle.LEVEL)
            .setWriteBufferSize(64 * 1024 * 1024)  // 64MB
            .setMaxWriteBufferNumber(3);
    }
};
backend.setRocksDBOptions(optionsFactory);

5. 反压处理

反压监控:

# 查看反压情况
curl http://jobmanager:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure

# Flink Web UI 查看
# Metrics -> Task -> buffers.outPoolUsage
# 接近 1.0 表示有反压

反压优化策略:

原因现象解决方案
下游处理慢Sink 反压增加 Sink 并行度、批量写入
数据倾斜某些 Task 慢重新分区、加盐
GC 频繁内存不足增加内存、优化代码
网络瓶颈网络缓冲区满增加网络缓冲区
外部系统慢异步 IO 慢增加并发度、优化查询
// 解决数据倾斜:加盐
DataStream<Order> rebalancedStream = skewedStream
    .map(order -> {
        // 添加随机后缀
        String newKey = order.getUserId() + "_" + 
            ThreadLocalRandom.current().nextInt(10);
        order.setUserId(newKey);
        return order;
    })
    .keyBy(Order::getUserId);

// 两阶段聚合
// 第一阶段:加盐聚合
DataStream<OrderStats> stage1 = skewedStream
    .map(order -> {
        order.setUserId(order.getUserId() + "_" + random.nextInt(10));
        return order;
    })
    .keyBy(Order::getUserId)
    .window(...)
    .aggregate(...);

// 第二阶段:去盐聚合
DataStream<OrderStats> stage2 = stage1
    .map(stats -> {
        stats.setUserId(stats.getUserId().split("_")[0]);
        return stats;
    })
    .keyBy(OrderStats::getUserId)
    .window(...)
    .aggregate(...);

// 批量写入优化
stream.addSink(new RichSinkFunction<Order>() {
    private List<Order> buffer = new ArrayList<>();
    private static final int BATCH_SIZE = 1000;
    
    @Override
    public void invoke(Order order, Context context) {
        buffer.add(order);
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }
    
    @Override
    public void close() throws Exception {
        flush();
    }
    
    private void flush() {
        if (!buffer.isEmpty()) {
            clickHouseClient.batchInsert(buffer);
            buffer.clear();
        }
    }
});

ClickHouse 性能优化

1. 表引擎选择

-- MergeTree:通用场景
CREATE TABLE orders (
    order_id String,
    user_id String,
    amount Decimal(18, 2),
    create_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY (user_id, order_id)
SETTINGS index_granularity = 8192;

-- ReplacingMergeTree:去重场景
CREATE TABLE user_profile (
    user_id String,
    name String,
    age UInt8,
    update_time DateTime
) ENGINE = ReplacingMergeTree(update_time)
ORDER BY user_id;

-- SummingMergeTree:预聚合场景
CREATE TABLE order_stats (
    date Date,
    province String,
    order_count UInt64,
    total_amount Decimal(18, 2)
) ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, province);

2. 分区设计

-- 按月分区(推荐)
PARTITION BY toYYYYMM(create_time)

-- 按天分区(数据量大时)
PARTITION BY toYYYYMMDD(create_time)

-- 按小时分区(实时场景)
PARTITION BY toYYYYMMDDhh(create_time)

-- 多级分区
PARTITION BY (toYYYYMM(create_time), province)

-- 查看分区
SELECT partition, name, rows, bytes_on_disk
FROM system.parts
WHERE table = 'orders' AND active
ORDER BY partition DESC;

-- 删除旧分区
ALTER TABLE orders DROP PARTITION '202401';

-- 优化分区(合并小文件)
OPTIMIZE TABLE orders PARTITION '202402';

分区设计原则:

  • 分区数量:建议 100-1000 个
  • 分区粒度:根据查询模式和数据量选择
  • 避免过度分区:影响查询性能
  • 定期清理:删除过期分区

3. 索引优化

-- 主键索引(ORDER BY)
ORDER BY (user_id, create_time)

-- 跳数索引(Skip Index)
-- minmax:数值范围查询
ALTER TABLE orders ADD INDEX idx_amount amount TYPE minmax GRANULARITY 4;

-- set:低基数字段
ALTER TABLE orders ADD INDEX idx_province province TYPE set(100) GRANULARITY 4;

-- bloom_filter:等值查询
ALTER TABLE orders ADD INDEX idx_status status TYPE bloom_filter GRANULARITY 4;

-- ngrambf_v1:模糊查询
ALTER TABLE orders ADD INDEX idx_user_name user_name TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4;

-- 物化列(Materialized Column)
ALTER TABLE orders ADD COLUMN order_date Date MATERIALIZED toDate(create_time);

-- 物化视图(Materialized View)
CREATE MATERIALIZED VIEW order_stats_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, province)
AS SELECT
    toDate(create_time) AS date,
    province,
    count() AS order_count,
    sum(amount) AS total_amount
FROM orders
GROUP BY date, province;

4. 查询优化

-- 使用 PREWHERE 过滤(比 WHERE 快)
SELECT *
FROM orders
PREWHERE create_time >= '2024-01-01'
WHERE amount > 100;

-- 避免 SELECT *
SELECT order_id, amount, create_time
FROM orders;

-- 使用 FINAL 去重(ReplacingMergeTree)
SELECT *
FROM user_profile
FINAL
WHERE user_id = 'user123';

-- 分布式查询优化
SELECT province, sum(amount)
FROM orders
WHERE create_time >= today()
GROUP BY province
SETTINGS distributed_aggregation_memory_efficient = 1;

-- 使用物化视图
SELECT date, province, order_count, total_amount
FROM order_stats_mv
WHERE date >= '2024-01-01';

-- 并行查询
SELECT *
FROM orders
SETTINGS max_threads = 8;

5. 写入优化

-- 批量写入(推荐 10000-100000 行)
INSERT INTO orders VALUES
    ('order1', 'user1', 100.00, '2024-01-01 10:00:00'),
    ('order2', 'user2', 200.00, '2024-01-01 10:01:00'),
    -- ... 更多数据
    ;

-- 异步写入
INSERT INTO orders SETTINGS async_insert = 1, wait_for_async_insert = 0
VALUES (...);

-- 并行写入(多个客户端同时写入不同分区)

-- 写入性能监控
SELECT
    table,
    sum(rows) AS total_rows,
    sum(bytes) AS total_bytes,
    formatReadableSize(total_bytes) AS size,
    count() AS parts_count
FROM system.parts
WHERE table = 'orders' AND active
GROUP BY table;

Kafka 性能优化

1. 分区设计

# 创建 Topic(32 个分区,3 个副本)
kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --topic orders \
    --partitions 32 \
    --replication-factor 3

# 增加分区
kafka-topics.sh --alter \
    --bootstrap-server localhost:9092 \
    --topic orders \
    --partitions 64

分区数量计算:

  • 目标吞吐量 / 单分区吞吐量 = 分区数
  • 单分区吞吐量:生产 10-30 MB/s,消费 30-50 MB/s
  • 建议:16-64 个分区

2. 生产者优化

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 性能优化参数
props.put("acks", "1");  // 0: 不等待, 1: leader 确认, all: 所有副本确认
props.put("compression.type", "lz4");  // 压缩算法:lz4/snappy/gzip
props.put("batch.size", 16384);  // 批次大小 16KB
props.put("linger.ms", 10);  // 等待时间 10ms
props.put("buffer.memory", 33554432);  // 缓冲区 32MB
props.put("max.in.flight.requests.per.connection", 5);  // 最大未确认请求

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 异步发送
producer.send(new ProducerRecord<>("orders", key, value), (metadata, exception) -> {
    if (exception != null) {
        // 处理异常
    }
});

3. 消费者优化

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 性能优化参数
props.put("fetch.min.bytes", 1024);  // 最小拉取 1KB
props.put("fetch.max.wait.ms", 500);  // 最大等待 500ms
props.put("max.partition.fetch.bytes", 1048576);  // 单分区最大 1MB
props.put("max.poll.records", 500);  // 单次拉取最大记录数
props.put("enable.auto.commit", false);  // 手动提交
props.put("auto.offset.reset", "earliest");  // 从最早开始消费

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息
    }
    // 手动提交
    consumer.commitSync();
}

端到端性能优化

1. 延迟优化

环节优化措施效果
Kafka增加分区数、优化网络配置延迟降低 50%
Flink调整并行度、优化算子延迟降低 30%
存储批量写入、异步写入延迟降低 40%
查询添加索引、物化视图延迟降低 60%

2. 吞吐优化

环节优化措施效果
Kafka增加分区数、批量发送吞吐提升 2 倍
Flink增加并行度、批量处理吞吐提升 3 倍
存储批量写入、并行写入吞吐提升 5 倍

3. 资源优化

资源类型优化措施效果
CPU合理设置并行度利用率提升 30%
内存优化状态管理内存占用降低 40%
网络数据压缩传输网络流量降低 50%
存储数据压缩、冷热分离存储成本降低 60%

性能优化总结表:

组件优化项关键参数效果
Flink并行度parallelism提升吞吐
FlinkCheckpointinterval, timeout降低延迟
Flink状态后端RocksDB + 增量大状态支持
ClickHouse表引擎MergeTree 系列查询加速
ClickHouse分区按月/天分区数据管理
ClickHouse批量写入10000+ 行写入加速
Kafka分区数32-64并行度
Kafka压缩lz4/snappy降低网络
Kafka批量发送batch.size提升吞吐

实时数仓监控与运维

监控指标体系

实时数仓监控指标分为系统指标、业务指标和数据质量指标三大类。

核心监控指标表:

类别指标阈值告警级别说明
FlinkTask 失败率> 1%P0任务异常
FlinkCheckpoint 失败率> 5%P1状态异常
Flink反压率> 80%P1性能瓶颈
Flink数据延迟> 5minP1实时性下降
Kafka消息堆积> 100万P1消费滞后
Kafka磁盘使用率> 80%P1存储不足
ClickHouse查询延迟> 3sP2查询慢
ClickHouse写入失败率> 1%P0数据丢失
业务数据断流> 5minP0数据中断
业务数据重复率> 0.1%P1数据质量

Flink 监控指标采集:

// 自定义 Metrics
public class OrderProcessFunction extends ProcessFunction<Order, Order> {
    
    private transient Counter processedCounter;
    private transient Meter processingRate;
    private transient Histogram amountHistogram;
    private transient Gauge<Long> delayGauge;
    
    @Override
    public void open(Configuration parameters) {
        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        
        // Counter:计数器
        processedCounter = metricGroup.counter("processed_orders");
        
        // Meter:速率
        processingRate = metricGroup.meter("processing_rate", new MeterView(60));
        
        // Histogram:分布
        amountHistogram = metricGroup.histogram("order_amount", 
            new DescriptiveStatisticsHistogram(1000));
        
        // Gauge:实时值
        delayGauge = metricGroup.gauge("processing_delay", () -> {
            return System.currentTimeMillis() - lastProcessTime;
        });
    }
    
    @Override
    public void processElement(Order order, Context ctx, Collector<Order> out) {
        processedCounter.inc();
        processingRate.markEvent();
        amountHistogram.update(order.getAmount().longValue());
        
        out.collect(order);
    }
}

// Prometheus Reporter 配置
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249

Prometheus + Grafana 监控:

# prometheus.yml
scrape_configs:
  - job_name: 'flink'
    static_configs:
      - targets: ['flink-jobmanager:9249', 'flink-taskmanager:9249']
  
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-broker:9308']
  
  - job_name: 'clickhouse'
    static_configs:
      - targets: ['clickhouse:9363']

Grafana 关键面板:

-- Flink 任务状态
flink_jobmanager_job_uptime{job_name="order-processing"}

-- Flink 数据处理速率
rate(flink_taskmanager_job_task_operator_numRecordsIn[1m])

-- Flink Checkpoint 时长
flink_jobmanager_job_lastCheckpointDuration

-- Kafka 消息堆积
kafka_consumergroup_lag{group="order-consumer"}

-- ClickHouse 查询 QPS
rate(ClickHouseProfileEvents_Query[1m])

告警机制设计

告警规则配置:

# alertmanager.yml
groups:
  - name: flink_alerts
    rules:
      - alert: FlinkTaskFailed
        expr: flink_jobmanager_job_numRestarts > 3
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Flink 任务频繁重启"
          description: "任务 {{ $labels.job_name }} 在 5 分钟内重启超过 3 次"
      
      - alert: FlinkCheckpointFailed
        expr: rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Flink Checkpoint 失败率过高"
          description: "任务 {{ $labels.job_name }} Checkpoint 失败率 > 10%"
      
      - alert: FlinkBackpressure
        expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 800
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Flink 任务反压"
          description: "任务 {{ $labels.job_name }} 反压率 > 80%"
  
  - name: kafka_alerts
    rules:
      - alert: KafkaConsumerLag
        expr: kafka_consumergroup_lag > 1000000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 消息堆积"
          description: "消费者组 {{ $labels.group }} 堆积超过 100 万条"
      
      - alert: KafkaDiskUsage
        expr: kafka_server_BrokerTopicMetrics_BytesInPerSec > 0.8
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka 磁盘使用率过高"
          description: "Broker {{ $labels.broker }} 磁盘使用率 > 80%"
  
  - name: clickhouse_alerts
    rules:
      - alert: ClickHouseQuerySlow
        expr: histogram_quantile(0.95, rate(ClickHouseProfileEvents_Query[5m])) > 3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "ClickHouse 查询慢"
          description: "P95 查询延迟 > 3 秒"

故障排查手册

常见故障及排查方法:

1. Flink 任务频繁重启

# 查看任务日志
kubectl logs -f flink-taskmanager-xxx

# 查看异常堆栈
grep "Exception" flink-taskmanager.log | tail -100

# 查看 Checkpoint 失败原因
curl http://jobmanager:8081/jobs/<job-id>/checkpoints

常见原因:

  • OOM:增加内存或优化代码
  • 网络超时:增加超时时间
  • 外部依赖故障:增加重试机制
  • 数据倾斜:重新分区

2. Kafka 消息堆积

# 查看消费者组延迟
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group order-consumer

# 查看 Topic 详情
kafka-topics.sh --bootstrap-server localhost:9092 \
    --describe --topic orders

解决方案:

  • 增加消费者并行度
  • 优化消费逻辑
  • 增加 Kafka 分区数
  • 临时增加消费者实例

3. ClickHouse 查询慢

-- 查看慢查询
SELECT
    query,
    query_duration_ms,
    read_rows,
    read_bytes,
    memory_usage
FROM system.query_log
WHERE query_duration_ms > 3000
ORDER BY query_duration_ms DESC
LIMIT 10;

-- 查看正在执行的查询
SELECT
    query_id,
    user,
    query,
    elapsed,
    read_rows,
    memory_usage
FROM system.processes;

-- Kill 慢查询
KILL QUERY WHERE query_id = 'xxx';

优化方案:

  • 添加 PREWHERE 过滤
  • 使用物化视图
  • 优化分区裁剪
  • 增加索引

4. 数据延迟过高

排查流程:

  1. 检查 Kafka 堆积情况
  2. 检查 Flink 反压情况
  3. 检查 Sink 写入性能
  4. 检查网络和外部依赖

端到端延迟监控:

// 在数据中添加时间戳
public class TimestampedOrder {
    private Order order;
    private long sourceTimestamp;  // 数据产生时间
    private long kafkaTimestamp;   // Kafka 接收时间
    private long processTimestamp; // Flink 处理时间
    private long sinkTimestamp;    // Sink 写入时间
}

// 计算各阶段延迟
public class LatencyMonitorFunction extends ProcessFunction<Order, Order> {
    private transient Histogram kafkaLatency;
    private transient Histogram processLatency;
    private transient Histogram sinkLatency;
    
    @Override
    public void processElement(Order order, Context ctx, Collector<Order> out) {
        long now = System.currentTimeMillis();
        
        // Kafka 延迟
        kafkaLatency.update(now - order.getKafkaTimestamp());
        
        // 处理延迟
        processLatency.update(now - order.getSourceTimestamp());
        
        order.setProcessTimestamp(now);
        out.collect(order);
    }
}

数据质量保障

数据质量保障是实时数仓的重要环节,包括数据校验、数据对账、数据修复。

1. 数据校验

// 数据完整性校验
public class DataValidationFunction extends ProcessFunction<Order, Order> {
    
    private transient Counter validCounter;
    private transient Counter invalidCounter;
    
    @Override
    public void processElement(Order order, Context ctx, Collector<Order> out) {
        List<String> errors = new ArrayList<>();
        
        // 必填字段校验
        if (order.getOrderId() == null || order.getOrderId().isEmpty()) {
            errors.add("订单 ID 为空");
        }
        
        // 数值范围校验
        if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            errors.add("订单金额必须大于 0");
        }
        
        // 枚举值校验
        if (!Arrays.asList("PENDING", "PAID", "CANCELLED").contains(order.getStatus())) {
            errors.add("订单状态非法");
        }
        
        // 时间合理性校验
        if (order.getCreateTime().isAfter(LocalDateTime.now())) {
            errors.add("订单时间不能是未来时间");
        }
        
        if (errors.isEmpty()) {
            validCounter.inc();
            out.collect(order);
        } else {
            invalidCounter.inc();
            log.error("数据校验失败: orderId={}, errors={}", 
                order.getOrderId(), errors);
            ctx.output(errorOutputTag, order);
        }
    }
}

2. 数据对账

-- 实时数仓 vs 离线数仓对账
-- 实时数仓(ClickHouse)
SELECT
    toDate(create_time) AS date,
    count() AS order_count,
    sum(amount) AS total_amount
FROM dwd_order
WHERE date = today()
GROUP BY date;

-- 离线数仓(Hive)
SELECT
    dt AS date,
    count(*) AS order_count,
    sum(amount) AS total_amount
FROM dwd_order
WHERE dt = '${date}'
GROUP BY dt;

-- 对账差异分析
SELECT
    a.date,
    a.order_count AS realtime_count,
    b.order_count AS batch_count,
    a.order_count - b.order_count AS diff_count,
    (a.order_count - b.order_count) * 100.0 / b.order_count AS diff_rate
FROM realtime_stats a
JOIN batch_stats b ON a.date = b.date
WHERE abs(diff_rate) > 1;  -- 差异超过 1%

3. 数据修复

// 数据回溯修复
public class DataRepairJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 从 Kafka 指定 offset 开始消费
        Map<TopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(new TopicPartition("orders", 0), 1000000L);
        specificStartOffsets.put(new TopicPartition("orders", 1), 1000000L);
        
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            "orders",
            new SimpleStringSchema(),
            properties
        );
        consumer.setStartFromSpecificOffsets(specificStartOffsets);
        
        DataStream<Order> stream = env
            .addSource(consumer)
            .map(new OrderMapper())
            .filter(order -> {
                // 只处理需要修复的数据
                return order.getCreateTime().isAfter(repairStartTime) &&
                       order.getCreateTime().isBefore(repairEndTime);
            });
        
        // 重新处理并写入
        stream.addSink(new ClickHouseSink<>(...));
        
        env.execute("Data Repair Job");
    }
}

运维自动化

运维最佳实践:

实践说明工具
监控告警7x24 小时监控,及时告警Prometheus + Grafana + AlertManager
日志收集集中式日志管理ELK/Loki
链路追踪端到端链路追踪Jaeger/Zipkin
自动化运维自动扩缩容、故障自愈Kubernetes + Operator
灰度发布降低发布风险Argo Rollouts
数据备份定期备份 Checkpoint 和数据HDFS/S3
应急预案故障应急流程Runbook

自动化运维脚本示例:

#!/bin/bash
# Flink 任务健康检查脚本

FLINK_REST_URL="http://flink-jobmanager:8081"
ALERT_WEBHOOK="https://hooks.dingtalk.com/robot/send?access_token=xxx"

# 获取所有运行中的任务
jobs=$(curl -s "${FLINK_REST_URL}/jobs/overview" | jq -r '.jobs[] | select(.state=="RUNNING")')

if [ -z "$jobs" ]; then
    echo "警告:没有运行中的 Flink 任务"
    send_alert "Flink 任务全部停止"
    exit 1
fi

# 检查每个任务的 Checkpoint 状态
echo "$jobs" | jq -r '.jid' | while read job_id; do
    job_name=$(echo "$jobs" | jq -r "select(.jid==\"$job_id\") | .name")
    
    # 获取最近的 Checkpoint 信息
    checkpoint_info=$(curl -s "${FLINK_REST_URL}/jobs/${job_id}/checkpoints")
    
    # 检查最近 Checkpoint 是否成功
    last_status=$(echo "$checkpoint_info" | jq -r '.latest.completed.status // "NONE"')
    
    if [ "$last_status" != "COMPLETED" ]; then
        echo "警告:任务 ${job_name} 最近 Checkpoint 失败"
        send_alert "任务 ${job_name} Checkpoint 异常: ${last_status}"
    fi
    
    # 检查 Checkpoint 时长
    last_duration=$(echo "$checkpoint_info" | jq -r '.latest.completed.duration // 0')
    if [ "$last_duration" -gt 300000 ]; then  # 超过 5 分钟
        echo "警告:任务 ${job_name} Checkpoint 耗时过长: ${last_duration}ms"
        send_alert "任务 ${job_name} Checkpoint 耗时 ${last_duration}ms"
    fi
done

# 检查 TaskManager 存活
tm_count=$(curl -s "${FLINK_REST_URL}/taskmanagers" | jq '.taskmanagers | length')
expected_tm=8

if [ "$tm_count" -lt "$expected_tm" ]; then
    echo "警告:TaskManager 数量不足: ${tm_count}/${expected_tm}"
    send_alert "TaskManager 数量不足: ${tm_count}/${expected_tm}"
fi

echo "健康检查完成: $(date)"

Flink 任务自动恢复:

// Kubernetes Operator 自动恢复配置
@Component
public class FlinkJobRecoveryService {
    
    @Autowired
    private FlinkRestClient flinkClient;
    
    @Autowired
    private KubernetesClient k8sClient;
    
    // 定期检查任务状态
    @Scheduled(fixedRate = 30000)  // 每 30 秒
    public void checkAndRecover() {
        List<FlinkJob> jobs = flinkClient.listJobs();
        
        for (FlinkJob job : jobs) {
            if ("FAILED".equals(job.getState())) {
                log.warn("检测到失败任务: {}", job.getName());
                
                // 获取最近的 Savepoint
                String savepointPath = flinkClient.getLatestSavepoint(job.getJobId());
                
                if (savepointPath != null) {
                    // 从 Savepoint 恢复
                    log.info("从 Savepoint 恢复任务: {}, path: {}", 
                        job.getName(), savepointPath);
                    flinkClient.restoreFromSavepoint(job.getJobId(), savepointPath);
                } else {
                    // 从 Checkpoint 恢复
                    String checkpointPath = flinkClient.getLatestCheckpoint(job.getJobId());
                    if (checkpointPath != null) {
                        log.info("从 Checkpoint 恢复任务: {}", job.getName());
                        flinkClient.restoreFromCheckpoint(job.getJobId(), checkpointPath);
                    } else {
                        log.error("无法恢复任务: {},无可用的 Savepoint/Checkpoint", 
                            job.getName());
                        alertService.sendCriticalAlert("任务无法自动恢复: " + job.getName());
                    }
                }
            }
        }
    }
}

日志收集与分析(Loki + Grafana):

# promtail 配置 - 采集 Flink 日志
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: flink-logs
    static_configs:
      - targets:
          - localhost
        labels:
          job: flink
          __path__: /opt/flink/log/*.log
    pipeline_stages:
      - regex:
          expression: '(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>\w+) (?P<message>.*)'
      - labels:
          level:
      - timestamp:
          source: timestamp
          format: '2006-01-02 15:04:05,000'

实时数仓实战案例

电商实时大屏

**业务场景:**双十一实时大屏,展示实时 GMV、订单量、UV 等核心指标。

技术架构:

  • 数据采集:Canal CDC
  • 消息队列:Kafka
  • 实时计算:Flink
  • 存储:ClickHouse(明细)+ Redis(实时指标)
  • 展示:DataV/Grafana

核心指标计算:

// 实时 GMV 计算
public class RealtimeGMVJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 读取订单流
        DataStream<Order> orderStream = env
            .addSource(new FlinkKafkaConsumer<>("ods_order", ...))
            .filter(order -> "PAID".equals(order.getStatus()));
        
        // 1分钟滚动窗口聚合
        DataStream<GMVStats> gmvStream = orderStream
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((order, ts) -> order.getCreateTime())
            )
            .keyBy(order -> "global")  // 全局聚合
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new AggregateFunction<Order, GMVAccumulator, GMVStats>() {
                @Override
                public GMVAccumulator createAccumulator() {
                    return new GMVAccumulator();
                }
                
                @Override
                public GMVAccumulator add(Order order, GMVAccumulator acc) {
                    acc.orderCount++;
                    acc.totalAmount = acc.totalAmount.add(order.getAmount());
                    acc.userSet.add(order.getUserId());
                    return acc;
                }
                
                @Override
                public GMVStats getResult(GMVAccumulator acc) {
                    return GMVStats.builder()
                        .windowEnd(System.currentTimeMillis())
                        .gmv(acc.totalAmount)
                        .orderCount(acc.orderCount)
                        .userCount(acc.userSet.size())
                        .avgOrderAmount(acc.totalAmount.divide(
                            BigDecimal.valueOf(acc.orderCount), 2, RoundingMode.HALF_UP))
                        .build();
                }
                
                @Override
                public GMVAccumulator merge(GMVAccumulator a, GMVAccumulator b) {
                    a.orderCount += b.orderCount;
                    a.totalAmount = a.totalAmount.add(b.totalAmount);
                    a.userSet.addAll(b.userSet);
                    return a;
                }
            });
        
        // 写入 Redis
        gmvStream.addSink(new RedisSink<>(...));
        
        env.execute("Realtime GMV Job");
    }
}

大屏 API 接口:

@RestController
@RequestMapping("/api/realtime")
public class RealtimeDashboardController {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private ClickHouseJdbcTemplate clickHouse;
    
    // 实时 GMV
    @GetMapping("/gmv")
    public GMVStats getRealtimeGMV() {
        String json = redisTemplate.opsForValue().get("realtime:gmv:latest");
        return JSON.parseObject(json, GMVStats.class);
    }
    
    // 实时订单趋势(最近 1 小时)
    @GetMapping("/order-trend")
    public List<OrderTrend> getOrderTrend() {
        String sql = """
            SELECT
                toStartOfMinute(create_time) AS time,
                count() AS order_count,
                sum(amount) AS total_amount
            FROM dwd_order
            WHERE create_time >= now() - INTERVAL 1 HOUR
            GROUP BY time
            ORDER BY time
            """;
        return clickHouse.query(sql, new OrderTrendRowMapper());
    }
    
    // TOP 省份排行
    @GetMapping("/top-provinces")
    public List<ProvinceStats> getTopProvinces(@RequestParam(defaultValue = "10") int limit) {
        String sql = """
            SELECT
                province,
                count() AS order_count,
                sum(amount) AS total_amount
            FROM dwd_order
            WHERE create_time >= today()
            GROUP BY province
            ORDER BY total_amount DESC
            LIMIT ?
            """;
        return clickHouse.query(sql, new ProvinceStatsRowMapper(), limit);
    }
}

实时风控系统

**业务场景:**实时识别异常交易、欺诈行为,毫秒级响应。

风控规则:

规则描述阈值动作
高频交易1 分钟内交易次数> 10 次拦截
大额交易单笔交易金额> 10000 元人工审核
异地登录短时间内异地登录< 1 小时二次验证
设备异常新设备首次交易-短信验证
黑名单命中黑名单-拒绝

实时风控实现:

public class RealTimeRiskControlJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 读取交易流
        DataStream<Transaction> txnStream = env
            .addSource(new FlinkKafkaConsumer<>("transactions", ...));
        
        // 规则 1:高频交易检测
        DataStream<RiskAlert> highFreqAlerts = txnStream
            .keyBy(Transaction::getUserId)
            .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
            .aggregate(new CountAggregateFunction())
            .filter(count -> count > 10)
            .map(count -> RiskAlert.builder()
                .userId(count.getUserId())
                .riskType("HIGH_FREQUENCY")
                .riskLevel("HIGH")
                .action("BLOCK")
                .build());
        
        // 规则 2:大额交易检测
        DataStream<RiskAlert> largeAmountAlerts = txnStream
            .filter(txn -> txn.getAmount().compareTo(new BigDecimal("10000")) > 0)
            .map(txn -> RiskAlert.builder()
                .userId(txn.getUserId())
                .transactionId(txn.getTxnId())
                .riskType("LARGE_AMOUNT")
                .riskLevel("MEDIUM")
                .action("MANUAL_REVIEW")
                .build());
        
        // 规则 3:异地登录检测(基于状态)
        DataStream<RiskAlert> locationAlerts = txnStream
            .keyBy(Transaction::getUserId)
            .process(new LocationCheckFunction());
        
        // 规则 4:黑名单检测(异步 IO)
        DataStream<RiskAlert> blacklistAlerts = AsyncDataStream
            .unorderedWait(
                txnStream,
                new AsyncBlacklistCheckFunction(),
                1000,
                TimeUnit.MILLISECONDS
            )
            .filter(Objects::nonNull);
        
        // 合并所有告警
        DataStream<RiskAlert> allAlerts = highFreqAlerts
            .union(largeAmountAlerts)
            .union(locationAlerts)
            .union(blacklistAlerts);
        
        // 写入 Redis(实时查询)
        allAlerts.addSink(new RedisSink<>(...));
        
        // 写入 Kafka(下游处理)
        allAlerts.addSink(new FlinkKafkaProducer<>("risk-alerts", ...));
        
        env.execute("Real-Time Risk Control Job");
    }
}

实时推荐系统

**业务场景:**根据用户实时行为,动态更新用户画像,实时推荐商品。

实时特征计算:

public class RealtimeFeatureJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 读取用户行为流
        DataStream<UserBehavior> behaviorStream = env
            .addSource(new FlinkKafkaConsumer<>("user-behavior", ...));
        
        // 实时特征计算
        DataStream<UserFeature> featureStream = behaviorStream
            .keyBy(UserBehavior::getUserId)
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .aggregate(new AggregateFunction<UserBehavior, FeatureAccumulator, UserFeature>() {
                @Override
                public FeatureAccumulator createAccumulator() {
                    return new FeatureAccumulator();
                }
                
                @Override
                public FeatureAccumulator add(UserBehavior behavior, FeatureAccumulator acc) {
                    // 统计各类行为次数
                    switch (behavior.getType()) {
                        case "VIEW":
                            acc.viewCount++;
                            acc.viewedCategories.add(behavior.getCategoryId());
                            break;
                        case "CLICK":
                            acc.clickCount++;
                            break;
                        case "CART":
                            acc.cartCount++;
                            break;
                        case "BUY":
                            acc.buyCount++;
                            acc.totalAmount = acc.totalAmount.add(behavior.getAmount());
                            break;
                    }
                    return acc;
                }
                
                @Override
                public UserFeature getResult(FeatureAccumulator acc) {
                    return UserFeature.builder()
                        .userId(acc.userId)
                        .viewCount(acc.viewCount)
                        .clickCount(acc.clickCount)
                        .cartCount(acc.cartCount)
                        .buyCount(acc.buyCount)
                        .totalAmount(acc.totalAmount)
                        .clickRate(acc.clickCount * 1.0 / acc.viewCount)
                        .conversionRate(acc.buyCount * 1.0 / acc.clickCount)
                        .preferredCategories(new ArrayList<>(acc.viewedCategories))
                        .build();
                }
                
                @Override
                public FeatureAccumulator merge(FeatureAccumulator a, FeatureAccumulator b) {
                    a.viewCount += b.viewCount;
                    a.clickCount += b.clickCount;
                    a.cartCount += b.cartCount;
                    a.buyCount += b.buyCount;
                    a.totalAmount = a.totalAmount.add(b.totalAmount);
                    a.viewedCategories.addAll(b.viewedCategories);
                    return a;
                }
            });
        
        // 写入 HBase(用户画像)
        featureStream.addSink(new HBaseSink<>(...));
        
        // 写入 Redis(实时查询)
        featureStream.addSink(new RedisSink<>(...));
        
        env.execute("Realtime Feature Job");
    }
}

**业务场景:**构建加密货币实时行情数据仓库,支持实时行情展示、技术指标计算、交易信号生成、风险监控等功能。

业务需求:

需求类型具体需求延迟要求数据量
实时行情价格、成交量、涨跌幅实时展示< 1 秒10 万笔/秒
技术指标MA、EMA、MACD、RSI、布林带< 5 秒计算密集
交易信号金叉死叉、突破信号、背离信号< 3 秒实时触发
风险监控异常波动、大额交易、流动性监控< 2 秒实时告警
历史回测策略回测、绩效分析分钟级TB 级历史数据

技术架构:

graph TB subgraph "数据源层" A1["Binance WebSocket
实时行情"] A2["OKX WebSocket
实时行情"] A3["Coinbase WebSocket
实时行情"] A4["历史数据 API
补全数据"] end subgraph "数据采集层" B1["Flink Source
WebSocket 连接器"] B2["数据标准化
统一格式"] end subgraph "消息队列层" C1["Kafka Topic
ods_crypto_tick
原始 Tick 数据"] C2["Kafka Topic
ods_crypto_kline
K 线数据"] C3["Kafka Topic
ods_crypto_trade
成交数据"] end subgraph "实时计算层 Flink" D1["DWD 层
数据清洗
去重、过滤"] D2["DWS 层
K 线聚合
1m/5m/15m/1h"] D3["DWS 层
技术指标
MA/EMA/MACD/RSI"] D4["ADS 层
交易信号
金叉死叉/突破"] D5["ADS 层
风险监控
异常检测"] end subgraph "存储层" E1["Doris
实时 OLAP
多维分析"] E2["Redis
实时行情
最新价格"] E3["ClickHouse
历史数据
冷数据归档"] end subgraph "应用层" F1["实时行情大屏
价格/成交量"] F2["交易策略系统
信号生成"] F3["风控告警系统
异常监控"] F4["数据分析平台
回测/研究"] end A1 --> B1 A2 --> B1 A3 --> B1 A4 --> B2 B1 --> C1 B1 --> C2 B1 --> C3 B2 --> C1 C1 --> D1 C2 --> D1 C3 --> D1 D1 --> D2 D2 --> D3 D3 --> D4 D3 --> D5 D2 --> E1 D3 --> E1 D4 --> E1 D5 --> E2 D2 --> E3 E1 --> F1 E1 --> F2 E2 --> F3 E1 --> F4 E3 --> F4 style C1 fill:#ffd43b style D2 fill:#4dabf7 style D3 fill:#4dabf7 style E1 fill:#51cf66 style E2 fill:#ff6b6b

数据模型设计:

ODS 层(原始数据层):

-- Kafka Topic: ods_crypto_tick(Tick 数据)
{
  "exchange": "binance",           -- 交易所
  "symbol": "BTCUSDT",             -- 交易对
  "timestamp": 1709827200000,      -- 时间戳(毫秒)
  "price": 68500.50,               -- 最新价
  "volume": 1.25,                  -- 成交量
  "bid_price": 68500.00,           -- 买一价
  "ask_price": 68501.00,           -- 卖一价
  "bid_volume": 5.2,               -- 买一量
  "ask_volume": 3.8                -- 卖一量
}

-- Kafka Topic: ods_crypto_kline(K 线数据)
{
  "exchange": "binance",
  "symbol": "BTCUSDT",
  "interval": "1m",                -- K 线周期
  "open_time": 1709827200000,
  "close_time": 1709827259999,
  "open": 68500.00,                -- 开盘价
  "high": 68550.00,                -- 最高价
  "low": 68480.00,                 -- 最低价
  "close": 68520.00,               -- 收盘价
  "volume": 125.5,                 -- 成交量
  "quote_volume": 8598750.00,      -- 成交额
  "trades": 1250                   -- 成交笔数
}

DWD 层(明细数据层):

-- Doris 表:dwd_crypto_tick_realtime
CREATE TABLE dwd_crypto_tick_realtime (
    exchange VARCHAR(20),
    symbol VARCHAR(20),
    tick_time DATETIME,
    price DECIMAL(18, 8),
    volume DECIMAL(18, 8),
    bid_price DECIMAL(18, 8),
    ask_price DECIMAL(18, 8),
    spread DECIMAL(18, 8),           -- 买卖价差
    mid_price DECIMAL(18, 8),        -- 中间价
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
DUPLICATE KEY(exchange, symbol, tick_time)
DISTRIBUTED BY HASH(symbol) BUCKETS 32
PROPERTIES (
    "replication_num" = "3",
    "storage_medium" = "SSD",
    "compression" = "LZ4"
);

DWS 层(汇总数据层):

-- Doris 表:dws_crypto_kline_1m(1 分钟 K 线)
CREATE TABLE dws_crypto_kline_1m (
    exchange VARCHAR(20),
    symbol VARCHAR(20),
    kline_time DATETIME,
    open_price DECIMAL(18, 8),
    high_price DECIMAL(18, 8),
    low_price DECIMAL(18, 8),
    close_price DECIMAL(18, 8),
    volume DECIMAL(18, 8),
    quote_volume DECIMAL(18, 8),
    trades INT,
    -- 技术指标
    ma5 DECIMAL(18, 8),              -- 5 周期均线
    ma10 DECIMAL(18, 8),             -- 10 周期均线
    ma20 DECIMAL(18, 8),             -- 20 周期均线
    ema12 DECIMAL(18, 8),            -- 12 周期指数移动平均
    ema26 DECIMAL(18, 8),            -- 26 周期指数移动平均
    macd DECIMAL(18, 8),             -- MACD 值
    signal DECIMAL(18, 8),           -- 信号线
    histogram DECIMAL(18, 8),        -- 柱状图
    rsi DECIMAL(18, 8),              -- RSI 指标
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
DUPLICATE KEY(exchange, symbol, kline_time)
DISTRIBUTED BY HASH(symbol) BUCKETS 32
PROPERTIES (
    "replication_num" = "3"
);

-- Doris 物化视图:实时涨跌幅
CREATE MATERIALIZED VIEW mv_crypto_change_24h
AS
SELECT 
    symbol,
    MAX(close_price) as high_24h,
    MIN(close_price) as low_24h,
    FIRST_VALUE(close_price) as open_24h,
    LAST_VALUE(close_price) as close_24h,
    (LAST_VALUE(close_price) - FIRST_VALUE(close_price)) / FIRST_VALUE(close_price) * 100 as change_pct_24h
FROM dws_crypto_kline_1m
WHERE kline_time >= NOW() - INTERVAL 24 HOUR
GROUP BY symbol;

ADS 层(应用数据层):

-- Doris 表:ads_crypto_signal(交易信号)
CREATE TABLE ads_crypto_signal (
    signal_id VARCHAR(64),
    exchange VARCHAR(20),
    symbol VARCHAR(20),
    signal_type VARCHAR(20),         -- GOLDEN_CROSS, DEATH_CROSS, BREAKOUT, RSI_OVERSOLD
    signal_time DATETIME,
    price DECIMAL(18, 8),
    strength DECIMAL(5, 2),          -- 信号强度 0-100
    description TEXT,
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP
)
DUPLICATE KEY(signal_id)
DISTRIBUTED BY HASH(symbol) BUCKETS 16;

核心实现代码:

1. WebSocket 数据采集(Flink Source)

// Binance WebSocket 数据源
public class BinanceWebSocketSource extends RichSourceFunction<CryptoTick> {
    
    private volatile boolean running = true;
    private WebSocketClient client;
    private final List<String> symbols;
    
    public BinanceWebSocketSource(List<String> symbols) {
        this.symbols = symbols;
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        // 构建 WebSocket URL
        String streamNames = symbols.stream()
            .map(s -> s.toLowerCase() + "@ticker")
            .collect(Collectors.joining("/"));
        
        String wsUrl = "wss://stream.binance.com:9443/stream?streams=" + streamNames;
        
        // 创建 WebSocket 客户端
        client = new WebSocketClient(new URI(wsUrl)) {
            @Override
            public void onMessage(String message) {
                try {
                    CryptoTick tick = parseMessage(message);
                    sourceContext.collect(tick);
                } catch (Exception e) {
                    log.error("Failed to parse message: {}", message, e);
                }
            }
            
            @Override
            public void onError(Exception ex) {
                log.error("WebSocket error", ex);
                // 重连逻辑
                reconnect();
            }
        };
        
        client.connect();
    }
    
    @Override
    public void run(SourceContext<CryptoTick> ctx) throws Exception {
        while (running) {
            Thread.sleep(1000);  // 保持连接
        }
    }
    
    @Override
    public void cancel() {
        running = false;
        if (client != null) {
            client.close();
        }
    }
    
    private CryptoTick parseMessage(String message) {
        JSONObject json = JSON.parseObject(message);
        JSONObject data = json.getJSONObject("data");
        
        return CryptoTick.builder()
            .exchange("binance")
            .symbol(data.getString("s"))
            .timestamp(data.getLong("E"))
            .price(data.getBigDecimal("c"))
            .volume(data.getBigDecimal("v"))
            .bidPrice(data.getBigDecimal("b"))
            .askPrice(data.getBigDecimal("a"))
            .build();
    }
    
    private void reconnect() {
        try {
            Thread.sleep(5000);  // 等待 5 秒
            client.reconnect();
        } catch (Exception e) {
            log.error("Reconnect failed", e);
        }
    }
}

2. K 线聚合计算(Flink 窗口)

// 1 分钟 K 线聚合
public class KLineAggregateJob {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);
        
        // 读取 Tick 数据
        DataStream<CryptoTick> tickStream = env
            .addSource(new FlinkKafkaConsumer<>("ods_crypto_tick", ...))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<CryptoTick>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((tick, ts) -> tick.getTimestamp())
            );
        
        // 1 分钟滚动窗口聚合
        DataStream<KLine> klineStream = tickStream
            .keyBy(tick -> tick.getExchange() + "_" + tick.getSymbol())
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new KLineAggregateFunction());
        
        // 写入 Kafka
        klineStream.addSink(new FlinkKafkaProducer<>("ods_crypto_kline", ...));
        
        // 写入 Doris
        klineStream.addSink(DorisSink.<KLine>builder()
            .setDorisOptions(DorisOptions.builder()
                .setFenodes("doris-fe:8030")
                .setTableIdentifier("crypto_db.dws_crypto_kline_1m")
                .setUsername("root")
                .setPassword("password")
                .build())
            .setSerializer(new KLineDorisSerializer())
            .build());
        
        env.execute("KLine Aggregate Job");
    }
}

// K 线聚合函数
public class KLineAggregateFunction 
    implements AggregateFunction<CryptoTick, KLineAccumulator, KLine> {
    
    @Override
    public KLineAccumulator createAccumulator() {
        return new KLineAccumulator();
    }
    
    @Override
    public KLineAccumulator add(CryptoTick tick, KLineAccumulator acc) {
        if (acc.open == null) {
            acc.open = tick.getPrice();
            acc.openTime = tick.getTimestamp();
        }
        
        acc.high = acc.high == null ? tick.getPrice() : 
            acc.high.max(tick.getPrice());
        acc.low = acc.low == null ? tick.getPrice() : 
            acc.low.min(tick.getPrice());
        acc.close = tick.getPrice();
        acc.closeTime = tick.getTimestamp();
        acc.volume = acc.volume.add(tick.getVolume());
        acc.trades++;
        
        acc.exchange = tick.getExchange();
        acc.symbol = tick.getSymbol();
        
        return acc;
    }
    
    @Override
    public KLine getResult(KLineAccumulator acc) {
        return KLine.builder()
            .exchange(acc.exchange)
            .symbol(acc.symbol)
            .interval("1m")
            .openTime(acc.openTime)
            .closeTime(acc.closeTime)
            .open(acc.open)
            .high(acc.high)
            .low(acc.low)
            .close(acc.close)
            .volume(acc.volume)
            .trades(acc.trades)
            .build();
    }
    
    @Override
    public KLineAccumulator merge(KLineAccumulator a, KLineAccumulator b) {
        a.high = a.high.max(b.high);
        a.low = a.low.min(b.low);
        a.close = b.close;
        a.closeTime = b.closeTime;
        a.volume = a.volume.add(b.volume);
        a.trades += b.trades;
        return a;
    }
}

3. 技术指标计算(Flink 状态)

// 技术指标计算任务
public class TechnicalIndicatorJob {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 读取 K 线数据
        DataStream<KLine> klineStream = env
            .addSource(new FlinkKafkaConsumer<>("ods_crypto_kline", ...));
        
        // 计算技术指标
        DataStream<KLineWithIndicators> indicatorStream = klineStream
            .keyBy(kline -> kline.getExchange() + "_" + kline.getSymbol())
            .process(new TechnicalIndicatorProcessFunction());
        
        // 写入 Doris
        indicatorStream.addSink(DorisSink.<KLineWithIndicators>builder()
            .setDorisOptions(...)
            .build());
        
        env.execute("Technical Indicator Job");
    }
}

// 技术指标计算函数
public class TechnicalIndicatorProcessFunction 
    extends KeyedProcessFunction<String, KLine, KLineWithIndicators> {
    
    // 状态:存储历史 K 线数据
    private ListState<BigDecimal> priceHistory;
    private ValueState<BigDecimal> ema12State;
    private ValueState<BigDecimal> ema26State;
    
    @Override
    public void open(Configuration parameters) {
        // 初始化状态
        priceHistory = getRuntimeContext().getListState(
            new ListStateDescriptor<>("price-history", BigDecimal.class));
        ema12State = getRuntimeContext().getState(
            new ValueStateDescriptor<>("ema12", BigDecimal.class));
        ema26State = getRuntimeContext().getState(
            new ValueStateDescriptor<>("ema26", BigDecimal.class));
    }
    
    @Override
    public void processElement(KLine kline, Context ctx, 
                               Collector<KLineWithIndicators> out) throws Exception {
        
        // 添加当前价格到历史
        priceHistory.add(kline.getClose());
        
        // 获取历史价格列表
        List<BigDecimal> prices = new ArrayList<>();
        priceHistory.get().forEach(prices::add);
        
        // 只保留最近 100 个价格
        if (prices.size() > 100) {
            prices = prices.subList(prices.size() - 100, prices.size());
            priceHistory.clear();
            prices.forEach(p -> {
                try {
                    priceHistory.add(p);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        
        // 计算均线
        BigDecimal ma5 = calculateMA(prices, 5);
        BigDecimal ma10 = calculateMA(prices, 10);
        BigDecimal ma20 = calculateMA(prices, 20);
        
        // 计算 EMA
        BigDecimal ema12 = calculateEMA(kline.getClose(), ema12State.value(), 12);
        BigDecimal ema26 = calculateEMA(kline.getClose(), ema26State.value(), 26);
        ema12State.update(ema12);
        ema26State.update(ema26);
        
        // 计算 MACD
        BigDecimal macd = ema12.subtract(ema26);
        
        // 计算 RSI
        BigDecimal rsi = calculateRSI(prices, 14);
        
        // 输出结果
        out.collect(KLineWithIndicators.builder()
            .kline(kline)
            .ma5(ma5)
            .ma10(ma10)
            .ma20(ma20)
            .ema12(ema12)
            .ema26(ema26)
            .macd(macd)
            .rsi(rsi)
            .build());
    }
    
    // 计算简单移动平均
    private BigDecimal calculateMA(List<BigDecimal> prices, int period) {
        if (prices.size() < period) {
            return BigDecimal.ZERO;
        }
        
        List<BigDecimal> recentPrices = prices.subList(prices.size() - period, prices.size());
        BigDecimal sum = recentPrices.stream()
            .reduce(BigDecimal.ZERO, BigDecimal::add);
        
        return sum.divide(BigDecimal.valueOf(period), 8, RoundingMode.HALF_UP);
    }
    
    // 计算指数移动平均
    private BigDecimal calculateEMA(BigDecimal currentPrice, BigDecimal prevEMA, int period) {
        if (prevEMA == null) {
            return currentPrice;
        }
        
        BigDecimal multiplier = BigDecimal.valueOf(2.0 / (period + 1));
        return currentPrice.subtract(prevEMA)
            .multiply(multiplier)
            .add(prevEMA);
    }
    
    // 计算 RSI
    private BigDecimal calculateRSI(List<BigDecimal> prices, int period) {
        if (prices.size() < period + 1) {
            return BigDecimal.valueOf(50);
        }
        
        BigDecimal gains = BigDecimal.ZERO;
        BigDecimal losses = BigDecimal.ZERO;
        
        for (int i = prices.size() - period; i < prices.size(); i++) {
            BigDecimal change = prices.get(i).subtract(prices.get(i - 1));
            if (change.compareTo(BigDecimal.ZERO) > 0) {
                gains = gains.add(change);
            } else {
                losses = losses.add(change.abs());
            }
        }
        
        BigDecimal avgGain = gains.divide(BigDecimal.valueOf(period), 8, RoundingMode.HALF_UP);
        BigDecimal avgLoss = losses.divide(BigDecimal.valueOf(period), 8, RoundingMode.HALF_UP);
        
        if (avgLoss.compareTo(BigDecimal.ZERO) == 0) {
            return BigDecimal.valueOf(100);
        }
        
        BigDecimal rs = avgGain.divide(avgLoss, 8, RoundingMode.HALF_UP);
        return BigDecimal.valueOf(100).subtract(
            BigDecimal.valueOf(100).divide(
                BigDecimal.ONE.add(rs), 8, RoundingMode.HALF_UP));
    }
}

4. 交易信号生成(Flink CEP)

// 交易信号检测任务
public class TradingSignalJob {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 读取带指标的 K 线数据
        DataStream<KLineWithIndicators> klineStream = env
            .addSource(new FlinkKafkaConsumer<>("dws_crypto_kline_indicator", ...));
        
        // 金叉信号检测(MA5 上穿 MA10)
        DataStream<TradingSignal> goldenCrossSignals = klineStream
            .keyBy(k -> k.getSymbol())
            .process(new GoldenCrossDetector());
        
        // RSI 超卖信号检测(RSI < 30)
        DataStream<TradingSignal> oversoldSignals = klineStream
            .filter(k -> k.getRsi().compareTo(BigDecimal.valueOf(30)) < 0)
            .map(k -> TradingSignal.builder()
                .signalType("RSI_OVERSOLD")
                .symbol(k.getSymbol())
                .price(k.getClose())
                .strength(BigDecimal.valueOf(100).subtract(k.getRsi()))
                .description("RSI 超卖信号,RSI=" + k.getRsi())
                .build());
        
        // 突破信号检测(价格突破 20 日均线)
        DataStream<TradingSignal> breakoutSignals = klineStream
            .keyBy(k -> k.getSymbol())
            .process(new BreakoutDetector());
        
        // 合并所有信号
        DataStream<TradingSignal> allSignals = goldenCrossSignals
            .union(oversoldSignals)
            .union(breakoutSignals);
        
        // 写入 Doris
        allSignals.addSink(DorisSink.<TradingSignal>builder()
            .setDorisOptions(...)
            .build());
        
        // 写入 Redis(实时查询)
        allSignals.addSink(new RedisSink<>(...));
        
        // 发送告警(Kafka)
        allSignals
            .filter(s -> s.getStrength().compareTo(BigDecimal.valueOf(80)) > 0)
            .addSink(new FlinkKafkaProducer<>("trading-alerts", ...));
        
        env.execute("Trading Signal Job");
    }
}

// 金叉检测器
public class GoldenCrossDetector 
    extends KeyedProcessFunction<String, KLineWithIndicators, TradingSignal> {
    
    private ValueState<Boolean> prevCrossState;
    
    @Override
    public void open(Configuration parameters) {
        prevCrossState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("prev-cross", Boolean.class));
    }
    
    @Override
    public void processElement(KLineWithIndicators kline, Context ctx, 
                               Collector<TradingSignal> out) throws Exception {
        
        BigDecimal ma5 = kline.getMa5();
        BigDecimal ma10 = kline.getMa10();
        
        if (ma5 == null || ma10 == null) {
            return;
        }
        
        // 当前是否金叉(MA5 > MA10)
        boolean currentCross = ma5.compareTo(ma10) > 0;
        Boolean prevCross = prevCrossState.value();
        
        // 检测金叉(从下方穿过到上方)
        if (prevCross != null && !prevCross && currentCross) {
            BigDecimal strength = ma5.subtract(ma10)
                .divide(ma10, 4, RoundingMode.HALF_UP)
                .multiply(BigDecimal.valueOf(100));
            
            out.collect(TradingSignal.builder()
                .signalType("GOLDEN_CROSS")
                .symbol(kline.getSymbol())
                .price(kline.getClose())
                .strength(strength)
                .description("MA5 金叉 MA10")
                .build());
        }
        
        // 检测死叉(从上方穿过到下方)
        if (prevCross != null && prevCross && !currentCross) {
            BigDecimal strength = ma10.subtract(ma5)
                .divide(ma10, 4, RoundingMode.HALF_UP)
                .multiply(BigDecimal.valueOf(100));
            
            out.collect(TradingSignal.builder()
                .signalType("DEATH_CROSS")
                .symbol(kline.getSymbol())
                .price(kline.getClose())
                .strength(strength)
                .description("MA5 死叉 MA10")
                .build());
        }
        
        prevCrossState.update(currentCross);
    }
}

5. 实时查询 API(Spring Boot + Doris)

@RestController
@RequestMapping("/api/crypto")
public class CryptoDataController {
    
    @Autowired
    private DorisTemplate dorisTemplate;
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    // 获取实时价格
    @GetMapping("/price/{symbol}")
    public CryptoPrice getRealtimePrice(@PathVariable String symbol) {
        String key = "crypto:price:" + symbol;
        String json = redisTemplate.opsForValue().get(key);
        
        if (json != null) {
            return JSON.parseObject(json, CryptoPrice.class);
        }
        
        // Redis 未命中,查询 Doris
        String sql = """
            SELECT symbol, close_price as price, kline_time as update_time
            FROM dws_crypto_kline_1m
            WHERE symbol = ?
            ORDER BY kline_time DESC
            LIMIT 1
            """;
        
        return dorisTemplate.queryForObject(sql, 
            new CryptoPriceRowMapper(), symbol);
    }
    
    // 获取 K 线数据
    @GetMapping("/kline/{symbol}")
    public List<KLine> getKLineData(
            @PathVariable String symbol,
            @RequestParam String interval,
            @RequestParam(defaultValue = "100") int limit) {
        
        String sql = """
            SELECT *
            FROM dws_crypto_kline_1m
            WHERE symbol = ?
            ORDER BY kline_time DESC
            LIMIT ?
            """;
        
        return dorisTemplate.query(sql, new KLineRowMapper(), symbol, limit);
    }
    
    // 获取技术指标
    @GetMapping("/indicators/{symbol}")
    public KLineWithIndicators getIndicators(@PathVariable String symbol) {
        String sql = """
            SELECT *
            FROM dws_crypto_kline_1m
            WHERE symbol = ?
            ORDER BY kline_time DESC
            LIMIT 1
            """;
        
        return dorisTemplate.queryForObject(sql, 
            new IndicatorRowMapper(), symbol);
    }
    
    // 获取交易信号
    @GetMapping("/signals")
    public List<TradingSignal> getRecentSignals(
            @RequestParam(required = false) String symbol,
            @RequestParam(defaultValue = "20") int limit) {
        
        StringBuilder sql = new StringBuilder("""
            SELECT *
            FROM ads_crypto_signal
            WHERE 1=1
            """);
        
        List<Object> params = new ArrayList<>();
        
        if (symbol != null) {
            sql.append(" AND symbol = ?");
            params.add(symbol);
        }
        
        sql.append(" ORDER BY signal_time DESC LIMIT ?");
        params.add(limit);
        
        return dorisTemplate.query(sql.toString(), 
            new TradingSignalRowMapper(), params.toArray());
    }
    
    // 获取市场概览
    @GetMapping("/market-overview")
    public MarketOverview getMarketOverview() {
        String sql = """
            SELECT 
                COUNT(DISTINCT symbol) as total_symbols,
                SUM(volume) as total_volume_24h,
                AVG(change_pct_24h) as avg_change_pct,
                COUNT(CASE WHEN change_pct_24h > 0 THEN 1 END) as gainers,
                COUNT(CASE WHEN change_pct_24h < 0 THEN 1 END) as losers
            FROM mv_crypto_change_24h
            """;
        
        return dorisTemplate.queryForObject(sql, new MarketOverviewRowMapper());
    }
    
    // 获取涨跌幅排行
    @GetMapping("/top-movers")
    public TopMovers getTopMovers(@RequestParam(defaultValue = "10") int limit) {
        String sqlGainers = """
            SELECT symbol, close_24h as price, change_pct_24h
            FROM mv_crypto_change_24h
            ORDER BY change_pct_24h DESC
            LIMIT ?
            """;
        
        String sqlLosers = """
            SELECT symbol, close_24h as price, change_pct_24h
            FROM mv_crypto_change_24h
            ORDER BY change_pct_24h ASC
            LIMIT ?
            """;
        
        List<CryptoMover> gainers = dorisTemplate.query(sqlGainers, 
            new CryptoMoverRowMapper(), limit);
        List<CryptoMover> losers = dorisTemplate.query(sqlLosers, 
            new CryptoMoverRowMapper(), limit);
        
        return TopMovers.builder()
            .gainers(gainers)
            .losers(losers)
            .build();
    }
}

性能优化方案:

1. Kafka 优化

# Kafka Producer 配置
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
acks=1                              # 平衡性能和可靠性
compression.type=lz4                # 启用压缩
batch.size=32768                    # 32KB 批次大小
linger.ms=10                        # 10ms 延迟
buffer.memory=67108864              # 64MB 缓冲区

# Kafka Consumer 配置
fetch.min.bytes=1024                # 最小拉取 1KB
fetch.max.wait.ms=500               # 最大等待 500ms
max.poll.records=1000               # 每次拉取 1000 条

2. Flink 优化

// Flink 配置优化
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint 配置
env.enableCheckpointing(60000);     // 60 秒
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 状态后端配置(RocksDB)
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
backend.setDbStoragePath("/data/flink/rocksdb");
env.setStateBackend(backend);

// 并行度配置
env.setParallelism(16);             // 全局并行度
env.setMaxParallelism(128);         // 最大并行度

// 重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,                              // 重启次数
    Time.seconds(10)                // 重启间隔
));

3. Doris 优化

-- 表优化配置
CREATE TABLE dws_crypto_kline_1m (
    ...
)
DUPLICATE KEY(exchange, symbol, kline_time)
DISTRIBUTED BY HASH(symbol) BUCKETS 32
PROPERTIES (
    "replication_num" = "3",
    "storage_medium" = "SSD",           -- 使用 SSD
    "compression" = "LZ4",              -- LZ4 压缩
    "bloom_filter_columns" = "symbol",  -- 布隆过滤器
    "colocate_with" = "crypto_group"    -- 数据本地化
);

-- 创建 Rollup 加速查询
ALTER TABLE dws_crypto_kline_1m 
ADD ROLLUP rollup_symbol_hour (
    symbol, 
    DATE_TRUNC(kline_time, 'hour') as hour,
    SUM(volume) as total_volume,
    AVG(close_price) as avg_price
);

-- 分区策略(按天分区)
CREATE TABLE dws_crypto_kline_1m_partitioned (
    ...
)
PARTITION BY RANGE(kline_time) (
    PARTITION p20240301 VALUES LESS THAN ("2024-03-02"),
    PARTITION p20240302 VALUES LESS THAN ("2024-03-03")
)
DISTRIBUTED BY HASH(symbol) BUCKETS 32;

监控告警方案:

1. 数据质量监控

// 数据质量检查任务
public class DataQualityMonitor extends ProcessFunction<KLine, Alert> {
    
    private transient Counter invalidDataCounter;
    private transient Histogram priceHistogram;
    
    @Override
    public void open(Configuration parameters) {
        invalidDataCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("invalid_data_count");
        
        priceHistogram = getRuntimeContext()
            .getMetricGroup()
            .histogram("price_distribution", 
                new DescriptiveStatisticsHistogram(1000));
    }
    
    @Override
    public void processElement(KLine kline, Context ctx, Collector<Alert> out) {
        // 检查价格异常
        if (kline.getHigh().compareTo(kline.getLow()) < 0) {
            invalidDataCounter.inc();
            out.collect(Alert.builder()
                .type("DATA_QUALITY")
                .level("ERROR")
                .message("价格异常:最高价 < 最低价")
                .symbol(kline.getSymbol())
                .build());
        }
        
        // 检查价格波动
        BigDecimal priceChange = kline.getHigh().subtract(kline.getLow())
            .divide(kline.getLow(), 4, RoundingMode.HALF_UP);
        
        if (priceChange.compareTo(BigDecimal.valueOf(0.1)) > 0) {
            out.collect(Alert.builder()
                .type("PRICE_VOLATILITY")
                .level("WARNING")
                .message("价格波动超过 10%")
                .symbol(kline.getSymbol())
                .build());
        }
        
        priceHistogram.update(kline.getClose().longValue());
    }
}

2. 延迟监控

// 端到端延迟监控
public class LatencyMonitor extends ProcessFunction<CryptoTick, Void> {
    
    private transient Histogram e2eLatency;
    
    @Override
    public void open(Configuration parameters) {
        e2eLatency = getRuntimeContext()
            .getMetricGroup()
            .histogram("e2e_latency_ms", 
                new DescriptiveStatisticsHistogram(1000));
    }
    
    @Override
    public void processElement(CryptoTick tick, Context ctx, Collector<Void> out) {
        long latency = System.currentTimeMillis() - tick.getTimestamp();
        e2eLatency.update(latency);
        
        // 延迟告警
        if (latency > 5000) {
            log.warn("High latency detected: {}ms for symbol {}", 
                latency, tick.getSymbol());
        }
    }
}

3. Prometheus + Grafana 监控

# prometheus.yml
scrape_configs:
  - job_name: 'flink'
    static_configs:
      - targets: ['flink-jobmanager:9249']
    
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-exporter:9308']
    
  - job_name: 'doris'
    static_configs:
      - targets: ['doris-fe:8030']

关键监控指标:

组件监控指标告警阈值说明
Kafka消息积压> 10000Topic Lag 过大
Kafka生产速率< 1000 TPS生产速率下降
FlinkCheckpoint 失败率> 5%Checkpoint 频繁失败
Flink反压> 0.8任务反压严重
Flink端到端延迟> 5 秒数据延迟过高
Doris查询 P99 延迟> 1 秒查询性能下降
Doris磁盘使用率> 80%磁盘空间不足
Redis内存使用率> 80%内存不足
系统CPU 使用率> 80%CPU 负载过高
系统内存使用率> 85%内存不足

部署架构:

1. 集群规划

组件节点数配置说明
Kafka316C 64G 2TB SSD3 个 Broker
Flink1 + 6JobManager: 8C 32G
TaskManager: 16C 64G
1 个 JM + 6 个 TM
Doris FE38C 32G 500G SSD3 个 FE(1 Master + 2 Follower)
Doris BE616C 64G 4TB SSD6 个 BE
Redis38C 32G主从 + 哨兵
ZooKeeper34C 16GKafka 依赖

2. 容量规划

数据量估算:
- 交易对数量:500 个
- Tick 频率:平均 10 次/秒/交易对
- 总 TPS:500 * 10 = 5000 TPS
- 单条数据大小:约 200 字节
- 每日数据量:5000 * 200 * 86400 = 86.4 GB/天

存储规划:
- Kafka 保留 7 天:86.4 * 7 = 604.8 GB
- Doris 保留 90 天:86.4 * 90 = 7.78 TB
- 考虑副本和压缩:7.78 * 3 * 0.3 = 7 TB

网络带宽:
- 峰值 TPS:20000(4 倍平均值)
- 峰值带宽:20000 * 200 字节 = 4 MB/s = 32 Mbps
- 建议带宽:100 Mbps 以上

3. Docker Compose 部署示例

version: '3.8'

services:
  # Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_NUM_PARTITIONS: 32
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
    volumes:
      - kafka-data:/var/lib/kafka/data
    deploy:
      resources:
        limits:
          cpus: '16'
          memory: 64G
  
  # Flink JobManager
  flink-jobmanager:
    image: flink:1.18.0-scala_2.12-java11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
      - FLINK_PROPERTIES=
          jobmanager.memory.process.size: 8192m
          taskmanager.numberOfTaskSlots: 8
    volumes:
      - flink-checkpoints:/opt/flink/checkpoints
  
  # Flink TaskManager
  flink-taskmanager:
    image: flink:1.18.0-scala_2.12-java11
    depends_on:
      - flink-jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
      - FLINK_PROPERTIES=
          taskmanager.memory.process.size: 16384m
          taskmanager.numberOfTaskSlots: 8
    deploy:
      replicas: 6
  
  # Doris FE
  doris-fe:
    image: apache/doris:2.0.0-fe
    ports:
      - "8030:8030"
      - "9030:9030"
    environment:
      - FE_SERVERS=fe1:doris-fe:9010
    volumes:
      - doris-fe-meta:/opt/apache-doris/fe/doris-meta
  
  # Doris BE
  doris-be:
    image: apache/doris:2.0.0-be
    depends_on:
      - doris-fe
    environment:
      - FE_SERVERS=doris-fe:9010
    volumes:
      - doris-be-storage:/opt/apache-doris/be/storage
    deploy:
      replicas: 6
  
  # Redis
  redis:
    image: redis:7.2-alpine
    ports:
      - "6379:6379"
    command: redis-server --maxmemory 16gb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data

volumes:
  kafka-data:
  flink-checkpoints:
  doris-fe-meta:
  doris-be-storage:
  redis-data:

核心挑战与解决方案:

1. 数据乱序问题

挑战:不同交易所的数据到达时间不一致,导致 K 线计算不准确。

解决方案

  • 使用 Watermark 机制容忍 5 秒乱序
  • 设置窗口允许延迟 1 分钟
  • 迟到数据输出到侧输出流单独处理
WatermarkStrategy<CryptoTick> watermarkStrategy = WatermarkStrategy
    .<CryptoTick>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((tick, ts) -> tick.getTimestamp())
    .withIdleness(Duration.ofMinutes(1));

DataStream<KLine> klineStream = tickStream
    .assignTimestampsAndWatermarks(watermarkStrategy)
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateDataTag)
    .aggregate(...);

2. 状态管理问题

挑战:技术指标计算需要维护大量历史数据状态,内存压力大。

解决方案

  • 使用 RocksDB 状态后端
  • 设置状态 TTL 自动清理过期数据
  • 使用增量 Checkpoint 减少 Checkpoint 时间
// 状态 TTL 配置
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .cleanupIncrementally(10, true)
    .build();

ListStateDescriptor<BigDecimal> descriptor = 
    new ListStateDescriptor<>("price-history", BigDecimal.class);
descriptor.enableTimeToLive(ttlConfig);

3. 查询性能问题

挑战:实时大屏需要高并发查询,Doris 查询压力大。

解决方案

  • 使用 Redis 缓存热点数据(最新价格、涨跌幅)
  • Doris 创建物化视图加速聚合查询
  • 使用 Rollup 预聚合数据
  • 查询结果缓存 5 秒
@Cacheable(value = "crypto-price", key = "#symbol", unless = "#result == null")
public CryptoPrice getRealtimePrice(String symbol) {
    // 先查 Redis
    String key = "crypto:price:" + symbol;
    String json = redisTemplate.opsForValue().get(key);
    if (json != null) {
        return JSON.parseObject(json, CryptoPrice.class);
    }
    
    // Redis 未命中,查 Doris
    return dorisTemplate.queryForObject(...);
}

4. 数据一致性问题

挑战:多个交易所的同一交易对价格不一致。

解决方案

  • 计算加权平均价格(按交易量加权)
  • 标记数据来源,支持按交易所查询
  • 异常价格检测和过滤
// 加权平均价格计算
public BigDecimal calculateWeightedAvgPrice(List<CryptoTick> ticks) {
    BigDecimal totalValue = BigDecimal.ZERO;
    BigDecimal totalVolume = BigDecimal.ZERO;
    
    for (CryptoTick tick : ticks) {
        totalValue = totalValue.add(tick.getPrice().multiply(tick.getVolume()));
        totalVolume = totalVolume.add(tick.getVolume());
    }
    
    return totalValue.divide(totalVolume, 8, RoundingMode.HALF_UP);
}

案例总结:

维度指标说明
数据规模5000 TPS,86 GB/天500 个交易对实时行情
延迟P99 < 3 秒从数据产生到可查询
可用性99.9%年停机时间 < 9 小时
查询性能P99 < 500ms实时查询响应时间
成本约 5 万/月云服务器 + 带宽成本

技术亮点:

  1. 流批一体:Flink 统一处理实时和历史数据
  2. 多引擎融合:Kafka + Flink + Doris + Redis 各司其职
  3. 技术指标实时计算:基于 Flink 状态实现复杂指标计算
  4. 交易信号生成:Flink CEP 实现模式匹配
  5. 高性能查询:Doris 物化视图 + Redis 缓存

业务价值:

  • 实时行情展示,延迟 < 1 秒
  • 技术指标实时更新,支持量化交易
  • 交易信号实时生成,捕捉市场机会
  • 历史数据回测,验证交易策略
  • 风险监控告警,及时发现异常

实战案例总结表(更新):

案例延迟要求数据量技术栈核心挑战业务价值
实时大屏秒级百万级/秒Flink + ClickHouse + Redis高并发聚合实时监控业务
实时风控毫秒级十万级/秒Flink + Redis + HBase低延迟、高准确率防止欺诈损失
实时推荐百毫秒级千万级/秒Flink + HBase + Redis特征实时性提升转化率
数据中台秒级亿级/天Flink + 多引擎统一管理数据服务化
加密货币行情秒级5000 TPSFlink + Doris + Redis状态管理、查询性能量化交易支持

实时数据中台

**业务场景:**构建统一的实时数据中台,为多个业务线提供实时数据服务。

架构设计:

graph TB subgraph "数据采集层" direction LR A1["MySQL CDC"] --> B1["Kafka"] A2["日志采集"] --> B1 A3["埋点数据"] --> B1 A4["第三方 API"] --> B1 end subgraph "数据处理层" direction LR B1 --> C1["Flink 实时 ETL"] C1 --> C2["ODS 层"] C2 --> C3["DWD 层"] C3 --> C4["DWS 层"] C4 --> C5["ADS 层"] end subgraph "数据存储层" direction LR C5 --> D1["ClickHouse"] C5 --> D2["Redis"] C5 --> D3["HBase"] C5 --> D4["Hudi 数据湖"] end subgraph "数据服务层" direction LR D1 --> E1["统一查询服务"] D2 --> E1 D3 --> E1 E1 --> F1["实时大屏"] E1 --> F2["业务系统"] E1 --> F3["数据分析"] E1 --> F4["AI/ML"] end subgraph "元数据管理" direction LR G1["数据血缘"] --> G2["数据目录"] G2 --> G3["数据质量"] G3 --> G4["权限管理"] end

统一数据采集平台:

// 统一数据采集配置
public class UnifiedDataCollector {
    
    // 数据源注册
    public void registerSource(DataSourceConfig config) {
        switch (config.getType()) {
            case MYSQL_CDC:
                registerMySQLCDC(config);
                break;
            case LOG_FILE:
                registerLogCollector(config);
                break;
            case HTTP_API:
                registerAPICollector(config);
                break;
            case KAFKA_TOPIC:
                registerKafkaSource(config);
                break;
        }
    }
    
    // MySQL CDC 注册
    private void registerMySQLCDC(DataSourceConfig config) {
        MySqlSource<String> source = MySqlSource.<String>builder()
            .hostname(config.getHost())
            .port(config.getPort())
            .databaseList(config.getDatabases())
            .tableList(config.getTables())
            .username(config.getUsername())
            .password(config.getPassword())
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
        
        // 注册到统一管理平台
        sourceRegistry.register(config.getName(), source);
    }
}

统一数据服务层:

@RestController
@RequestMapping("/api/data-service")
public class UnifiedDataServiceController {
    
    @Autowired
    private QueryRouter queryRouter;
    
    // 统一查询接口
    @PostMapping("/query")
    public QueryResult query(@RequestBody QueryRequest request) {
        // 根据查询类型路由到不同存储引擎
        DataSource dataSource = queryRouter.route(request);
        
        switch (dataSource) {
            case CLICKHOUSE:
                return clickHouseService.query(request);
            case REDIS:
                return redisService.query(request);
            case HBASE:
                return hbaseService.query(request);
            default:
                throw new UnsupportedOperationException("不支持的数据源");
        }
    }
    
    // 查询路由策略
    @Component
    public class QueryRouter {
        public DataSource route(QueryRequest request) {
            // 实时指标查询 → Redis
            if (request.isRealtimeMetric()) {
                return DataSource.REDIS;
            }
            // 明细数据查询 → ClickHouse
            if (request.isDetailQuery()) {
                return DataSource.CLICKHOUSE;
            }
            // KV 查询 → HBase
            if (request.isKVQuery()) {
                return DataSource.HBASE;
            }
            return DataSource.CLICKHOUSE;
        }
    }
}

元数据管理:

功能说明实现方式
数据血缘追踪数据从源到目标的流转路径Flink Lineage + Atlas
数据目录统一管理所有表、字段的元信息Hive Metastore + 自研
数据质量自动化数据质量检测和告警自研规则引擎
权限管理细粒度的数据访问控制Ranger + RBAC
数据标准统一的命名规范和数据标准数据治理平台
SLA 管理数据产出时效性保障自研监控平台

数据血缘追踪实现:

// 数据血缘采集
public class LineageCollector {
    
    // 记录表级血缘
    public void recordTableLineage(String sourceTable, String targetTable, 
                                    String jobName, String transformLogic) {
        LineageRecord record = LineageRecord.builder()
            .sourceTable(sourceTable)
            .targetTable(targetTable)
            .jobName(jobName)
            .transformLogic(transformLogic)
            .createTime(LocalDateTime.now())
            .build();
        
        lineageRepository.save(record);
    }
    
    // 记录字段级血缘
    public void recordColumnLineage(String sourceTable, String sourceColumn,
                                     String targetTable, String targetColumn,
                                     String expression) {
        ColumnLineageRecord record = ColumnLineageRecord.builder()
            .sourceTable(sourceTable)
            .sourceColumn(sourceColumn)
            .targetTable(targetTable)
            .targetColumn(targetColumn)
            .expression(expression)
            .build();
        
        columnLineageRepository.save(record);
    }
    
    // 查询数据血缘(向上追溯)
    public List<LineageRecord> traceUpstream(String tableName, int depth) {
        return lineageRepository.findUpstream(tableName, depth);
    }
    
    // 查询数据影响(向下追溯)
    public List<LineageRecord> traceDownstream(String tableName, int depth) {
        return lineageRepository.findDownstream(tableName, depth);
    }
}

实战案例总结:

案例延迟要求数据量技术栈核心挑战
实时大屏秒级百万级/秒Flink + ClickHouse + Redis高并发聚合
实时风控毫秒级十万级/秒Flink + Redis + HBase低延迟、高准确率
实时推荐百毫秒级千万级/秒Flink + HBase + Redis特征实时性
数据中台秒级亿级/天Flink + 多引擎统一管理、多业务线

实时数仓最佳实践

架构设计原则

1. 分层解耦

  • 数据采集、计算、存储、应用各层独立
  • 每层可独立扩展和优化
  • 降低系统耦合度

2. 流批一体

  • 使用 Flink 统一流批处理
  • 数据湖支持流批写入
  • 简化开发和运维

3. 湖仓一体

  • 结合数据湖的灵活性和数据仓库的性能
  • 支持 ACID 事务
  • 统一元数据管理

4. 多引擎融合

  • 根据不同场景选择合适的存储引擎
  • ClickHouse:OLAP 分析
  • Hudi/Iceberg:数据湖
  • Redis/HBase:KV 存储

5. 元数据驱动

  • 统一的元数据管理
  • 数据血缘追踪
  • 数据质量监控

开发规范

1. 命名规范

类型规范示例
表名{层级}{业务}{粒度}dws_order_1min
字段名小写下划线order_id, create_time
Job 名{业务}_{功能}JobOrderProcessJob
类名大驼峰OrderMapper
方法名小驼峰processOrder

2. 代码规范

// 良好的代码结构
public class OrderProcessJob {
    
    // 常量定义
    private static final String KAFKA_SERVERS = "localhost:9092";
    private static final int CHECKPOINT_INTERVAL = 60000;
    
    public static void main(String[] args) throws Exception {
        // 1. 环境配置
        StreamExecutionEnvironment env = createEnvironment();
        
        // 2. 数据源
        DataStream<Order> orderStream = createOrderSource(env);
        
        // 3. 数据处理
        DataStream<OrderStats> statsStream = processOrders(orderStream);
        
        // 4. 数据输出
        addSinks(statsStream);
        
        // 5. 执行任务
        env.execute("Order Process Job");
    }
    
    private static StreamExecutionEnvironment createEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL);
        env.setParallelism(16);
        return env;
    }
    
    private static DataStream<Order> createOrderSource(StreamExecutionEnvironment env) {
        return env.addSource(new FlinkKafkaConsumer<>(...));
    }
    
    private static DataStream<OrderStats> processOrders(DataStream<Order> orderStream) {
        return orderStream
            .map(new OrderMapper())
            .keyBy(Order::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new OrderAggregateFunction());
    }
    
    private static void addSinks(DataStream<OrderStats> statsStream) {
        statsStream.addSink(new ClickHouseSink<>(...));
    }
}

3. 配置管理

// 使用配置文件
public class Config {
    private static final Properties props = new Properties();
    
    static {
        try {
            props.load(Config.class.getResourceAsStream("/application.properties"));
        } catch (IOException e) {
            throw new RuntimeException("Failed to load config", e);
        }
    }
    
    public static String get(String key) {
        return props.getProperty(key);
    }
    
    public static String get(String key, String defaultValue) {
        return props.getProperty(key, defaultValue);
    }
}

// application.properties
kafka.bootstrap.servers=localhost:9092
kafka.group.id=order-consumer
flink.checkpoint.interval=60000
flink.parallelism=16
clickhouse.url=jdbc:clickhouse://localhost:8123/default

测试策略

1. 单元测试

public class OrderMapperTest {
    
    @Test
    public void testOrderMapping() {
        OrderMapper mapper = new OrderMapper();
        
        OdsOrder ods = new OdsOrder();
        ods.setOrderId("order123");
        ods.setAmount("199.99");
        
        DwdOrder dwd = mapper.map(ods);
        
        assertEquals("order123", dwd.getOrderId());
        assertEquals(new BigDecimal("199.99"), dwd.getAmount());
    }
}

2. 集成测试

public class OrderProcessJobTest {
    
    @Test
    public void testOrderProcessing() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // 创建测试数据源
        DataStream<Order> testStream = env.fromElements(
            new Order("order1", "user1", new BigDecimal("100")),
            new Order("order2", "user1", new BigDecimal("200"))
        );
        
        // 处理数据
        DataStream<OrderStats> statsStream = testStream
            .keyBy(Order::getUserId)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .aggregate(new OrderAggregateFunction());
        
        // 收集结果
        List<OrderStats> results = new ArrayList<>();
        statsStream.addSink(new CollectSink(results));
        
        env.execute();
        
        // 验证结果
        assertEquals(1, results.size());
        assertEquals(2, results.get(0).getOrderCount());
        assertEquals(new BigDecimal("300"), results.get(0).getTotalAmount());
    }
}

3. 性能测试

public class PerformanceTest {
    
    @Test
    public void testThroughput() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 生成大量测试数据
        DataStream<Order> testStream = env
            .addSource(new RateLimitedSourceFunction(10000))  // 10000 TPS
            .setParallelism(16);
        
        // 处理数据
        DataStream<OrderStats> statsStream = testStream
            .keyBy(Order::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new OrderAggregateFunction());
        
        // 测量吞吐量
        statsStream.addSink(new ThroughputMeasureSink());
        
        env.execute();
    }
}

发布流程

1. 灰度发布

# Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-job-v2
spec:
  replicas: 1  # 先发布 1 个实例
  selector:
    matchLabels:
      app: flink-job
      version: v2
  template:
    metadata:
      labels:
        app: flink-job
        version: v2
    spec:
      containers:
      - name: flink-job
        image: flink-job:v2

2. 蓝绿部署

# 部署新版本(绿)
kubectl apply -f flink-job-v2.yaml

# 等待新版本就绪
kubectl wait --for=condition=ready pod -l version=v2

# 切换流量到新版本
kubectl patch service flink-job -p '{"spec":{"selector":{"version":"v2"}}}'

# 验证新版本
# 如果有问题,回滚到旧版本
kubectl patch service flink-job -p '{"spec":{"selector":{"version":"v1"}}}'

# 删除旧版本
kubectl delete deployment flink-job-v1

3. 金丝雀发布

# Argo Rollouts
apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
  name: flink-job
spec:
  replicas: 10
  strategy:
    canary:
      steps:
      - setWeight: 10  # 10% 流量
      - pause: {duration: 5m}
      - setWeight: 30  # 30% 流量
      - pause: {duration: 5m}
      - setWeight: 50  # 50% 流量
      - pause: {duration: 5m}
      - setWeight: 100  # 100% 流量

成本优化

1. 计算资源优化

  • 合理设置并行度,避免资源浪费
  • 使用 Slot Sharing 共享资源
  • 使用弹性伸缩,按需分配资源

2. 存储资源优化

  • 冷热数据分离
  • 数据压缩(LZ4/Snappy)
  • 定期清理过期数据

3. 网络资源优化

  • 数据压缩传输
  • 减少数据 Shuffle
  • 使用本地聚合

成本优化总结:

优化项优化措施效果
计算合理并行度、资源共享成本降低 30%
存储冷热分离、数据压缩成本降低 50%
网络数据压缩、本地聚合成本降低 40%

容量规划与扩展

容量规划是实时数仓建设的关键环节,需要根据业务增长预期合理规划资源。

容量评估维度:

维度评估指标计算方式示例
数据量日增数据量单条大小 × 日消息数1KB × 1亿 = 100GB/天
吞吐量峰值 TPS日均 TPS × 峰值系数1万 × 5 = 5万 TPS
存储总存储量日增量 × 保留天数 × 副本数100GB × 30 × 3 = 9TB
计算CPU 核数并行度 × 每 Slot CPU64 × 2 = 128 核
内存总内存TM 数 × 每 TM 内存16 × 8GB = 128GB

Kafka 容量规划:

# Kafka 存储容量计算
单条消息大小: 1KB
日消息量: 1 亿条
日数据量: 1KB × 100,000,000 = 100GB
副本数: 3
日存储量: 100GB × 3 = 300GB
保留天数: 7 天
总存储量: 300GB × 7 = 2.1TB

# Kafka 分区数计算
目标吞吐量: 50,000 TPS
单分区吞吐量: 5,000 TPS(经验值)
分区数: 50,000 / 5,000 = 10 个分区(建议 16 个,留余量)

# Broker 数量
每 Broker 磁盘: 1TB
Broker 数量: 2.1TB / 1TB ≈ 3 个(建议 5 个,留余量)

ClickHouse 容量规划:

-- 存储容量估算
-- 原始数据: 100GB/天
-- 压缩比: 10:1(列式存储 + LZ4)
-- 压缩后: 10GB/天
-- 保留天数: 90 天
-- 副本数: 2
-- 总存储: 10GB × 90 × 2 = 1.8TB

-- 查询性能估算
-- 单节点扫描速度: 200MB/s(SSD)
-- 90 天数据量: 900GB(单副本)
-- 全表扫描时间: 900GB / 200MB/s ≈ 75 分钟
-- 分区裁剪后(单天): 10GB / 200MB/s ≈ 50 秒
-- PREWHERE 过滤后: 1GB / 200MB/s ≈ 5 秒

扩展策略:

graph TB subgraph "垂直扩展" A1["增加 CPU"] --> A2["增加内存"] A2 --> A3["升级磁盘 SSD"] A3 --> A4["增加网络带宽"] end subgraph "水平扩展" B1["增加 Kafka 分区"] --> B2["增加 Flink 并行度"] B2 --> B3["增加 ClickHouse 分片"] B3 --> B4["增加 Redis 节点"] end subgraph "架构扩展" C1["读写分离"] --> C2["冷热分离"] C2 --> C3["多级缓存"] C3 --> C4["异步处理"] end style A1 fill:#4ecdc4,color:#fff style B1 fill:#45b7d1,color:#fff style C1 fill:#96ceb4,color:#fff

水平扩展操作步骤:

1. Kafka 扩容

# 增加分区数(不可减少)
kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic orders --partitions 32

# 添加新 Broker 后重新分配分区
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --reassignment-json-file reassignment.json --execute

2. Flink 扩容

# 方式 1:停止任务,修改并行度后重启
flink stop <job-id> -p /savepoints/
flink run -s /savepoints/savepoint-xxx -p 64 job.jar

# 方式 2:Reactive Mode(自动扩缩容)
# flink-conf.yaml
scheduler-mode: reactive

3. ClickHouse 扩容

-- 添加新分片后,迁移数据
-- 1. 创建新的分布式表(包含新分片)
CREATE TABLE dwd_order_dist ON CLUSTER new_cluster
AS dwd_order_local
ENGINE = Distributed('new_cluster', 'default', 'dwd_order_local', rand());

-- 2. 历史数据重新分布(可选)
INSERT INTO dwd_order_dist SELECT * FROM dwd_order_local;

容量预警机制:

指标黄色预警红色预警处理方式
磁盘使用率> 70%> 85%扩容/清理数据
CPU 使用率> 70%> 90%扩容/优化代码
内存使用率> 75%> 90%扩容/优化状态
Kafka 堆积> 50万> 200万增加消费者
查询延迟> 3s> 10s优化查询/扩容

实时数仓常见问题与解决方案

**问题现象:**TaskManager 频繁 OOM,任务不断重启。

排查步骤:

# 1. 查看 GC 日志
grep "Full GC" flink-taskmanager-*.log

# 2. 查看内存使用
curl http://taskmanager:8081/taskmanagers/<tm-id>/metrics?get=Status.JVM.Memory.Heap.Used

# 3. 生成堆转储
jmap -dump:format=b,file=heap.hprof <pid>

# 4. 分析堆转储
jhat heap.hprof
# 或使用 MAT (Memory Analyzer Tool)

常见原因及解决方案:

原因现象解决方案
状态过大状态持续增长设置 State TTL,定期清理
数据倾斜部分 Task 内存高加盐打散、两阶段聚合
窗口数据堆积窗口触发前内存暴涨使用增量聚合、减小窗口
对象创建过多GC 频繁复用对象、使用对象池
外部连接泄漏连接数持续增长使用连接池、及时关闭
序列化问题序列化后数据膨胀使用高效序列化器

State TTL 配置示例:

// 设置状态 TTL
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000)  // RocksDB 压缩时清理
    .build();

ValueStateDescriptor<UserProfile> descriptor = 
    new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);

内存配置优化:

# flink-conf.yaml
taskmanager.memory.process.size: 8g
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.jvm-overhead.fraction: 0.1

# RocksDB 状态后端优化
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4

反压识别:

graph LR subgraph "反压传播链路" A["Source"] -->|"正常"| B["Map"] B -->|"正常"| C["KeyBy + Window"] C -->|"反压"| D["Sink"] end style C fill:#ff6b6b,color:#fff style D fill:#ff6b6b,color:#fff

反压排查流程:

  1. Flink Web UI:查看各 Task 的反压状态(OK / LOW / HIGH)
  2. 定位瓶颈算子:从 Sink 向上游逐步排查,找到第一个反压为 HIGH 的算子
  3. 分析瓶颈原因
    • CPU 密集型:优化计算逻辑
    • IO 密集型:使用异步 IO
    • 数据倾斜:重新分区

反压解决方案:

// 方案 1:异步 IO 解决外部查询瓶颈
AsyncDataStream.unorderedWait(
    inputStream,
    new AsyncDatabaseRequest(),
    5000,       // 超时时间
    TimeUnit.MILLISECONDS,
    100         // 最大并发请求数
);

// 方案 2:使用 Cache 减少外部查询
public class CachedLookupFunction extends RichMapFunction<Order, EnrichedOrder> {
    private transient LoadingCache<String, UserInfo> cache;
    
    @Override
    public void open(Configuration parameters) {
        cache = CacheBuilder.newBuilder()
            .maximumSize(100000)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build(new CacheLoader<String, UserInfo>() {
                @Override
                public UserInfo load(String userId) {
                    return queryFromDB(userId);
                }
            });
    }
    
    @Override
    public EnrichedOrder map(Order order) {
        UserInfo userInfo = cache.get(order.getUserId());
        return new EnrichedOrder(order, userInfo);
    }
}

// 方案 3:增加并行度
env.setParallelism(32);  // 全局并行度
sinkOperator.setParallelism(64);  // Sink 单独设置更高并行度

Checkpoint 失败问题

常见 Checkpoint 失败原因:

原因错误信息解决方案
超时Checkpoint expired before completing增加超时时间、优化状态大小
状态过大OutOfMemoryError during checkpoint启用增量 Checkpoint、优化状态
对齐超时Barrier alignment timeout使用 Unaligned Checkpoint
存储故障HDFS/S3 写入失败检查存储集群状态
Task 失败Task failed during checkpoint修复 Task 异常

Unaligned Checkpoint 配置:

// 适用于反压严重的场景
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));

// 增量 Checkpoint(RocksDB 状态后端)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

// Checkpoint 存储配置
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
env.getCheckpointConfig().setCheckpointTimeout(600000);  // 10 分钟
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);  // 最小间隔 30 秒
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

Kafka 常见问题

消息丢失问题

消息丢失的三个环节:

graph LR subgraph "Producer" A["生产者"] -->|"可能丢失"| B["Kafka Broker"] end subgraph "Broker" B -->|"可能丢失"| C["磁盘"] end subgraph "Consumer" B -->|"可能丢失"| D["消费者"] end style A fill:#4ecdc4,color:#fff style B fill:#45b7d1,color:#fff style D fill:#96ceb4,color:#fff

防止消息丢失的配置:

# Producer 端
acks=all                          # 所有副本确认
retries=3                         # 重试次数
max.in.flight.requests.per.connection=1  # 保证顺序
enable.idempotence=true           # 幂等性

# Broker 端
min.insync.replicas=2             # 最少同步副本数
unclean.leader.election.enable=false  # 禁止不完整副本选举

# Consumer 端
enable.auto.commit=false          # 关闭自动提交
auto.offset.reset=earliest        # 从最早开始消费

Flink Kafka Consumer 精确一次配置:

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("orders")
    .setGroupId("order-consumer")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
    .setProperty("isolation.level", "read_committed")  // 读已提交
    .build();

消息重复消费问题

重复消费原因:

  • Consumer 提交 offset 前宕机
  • Rebalance 导致 offset 回退
  • Producer 重试导致消息重复

解决方案:

// 方案 1:幂等写入(推荐)
public class IdempotentSinkFunction extends RichSinkFunction<Order> {
    
    @Override
    public void invoke(Order order, Context context) {
        // 使用 UPSERT 语义写入
        String sql = """
            INSERT INTO dwd_order (order_id, amount, status, update_time)
            VALUES (?, ?, ?, ?)
            ON DUPLICATE KEY UPDATE
                amount = VALUES(amount),
                status = VALUES(status),
                update_time = VALUES(update_time)
            """;
        jdbcTemplate.update(sql, order.getOrderId(), order.getAmount(), 
            order.getStatus(), order.getUpdateTime());
    }
}

// 方案 2:基于 Redis 去重
public class DeduplicationFunction extends RichFilterFunction<Order> {
    private transient Jedis jedis;
    
    @Override
    public boolean filter(Order order) {
        String key = "dedup:" + order.getOrderId();
        // SETNX 原子操作,设置过期时间
        String result = jedis.set(key, "1", SetParams.setParams().nx().ex(86400));
        return "OK".equals(result);  // 返回 true 表示首次出现
    }
}

ClickHouse 常见问题

写入性能问题

写入优化策略:

策略说明效果
批量写入每批 10000+ 行吞吐提升 10 倍
异步写入使用 Buffer 表降低写入延迟
分布式写入写入本地表避免分布式开销
数据预排序按主键排序后写入减少 Merge 开销
合理分区避免分区过多减少文件数

批量写入实现:

// ClickHouse 批量写入 Sink
public class ClickHouseBatchSink extends RichSinkFunction<Order> {
    
    private static final int BATCH_SIZE = 10000;
    private static final long FLUSH_INTERVAL = 10000;  // 10 秒
    
    private transient List<Order> buffer;
    private transient ScheduledExecutorService scheduler;
    private transient ClickHouseConnection connection;
    
    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>(BATCH_SIZE);
        connection = ClickHouseDataSource.getConnection();
        
        // 定时刷新
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::flush, 
            FLUSH_INTERVAL, FLUSH_INTERVAL, TimeUnit.MILLISECONDS);
    }
    
    @Override
    public void invoke(Order order, Context context) {
        synchronized (buffer) {
            buffer.add(order);
            if (buffer.size() >= BATCH_SIZE) {
                flush();
            }
        }
    }
    
    private void flush() {
        List<Order> toFlush;
        synchronized (buffer) {
            if (buffer.isEmpty()) return;
            toFlush = new ArrayList<>(buffer);
            buffer.clear();
        }
        
        try (PreparedStatement ps = connection.prepareStatement(
                "INSERT INTO dwd_order VALUES (?, ?, ?, ?, ?)")) {
            for (Order order : toFlush) {
                ps.setString(1, order.getOrderId());
                ps.setBigDecimal(2, order.getAmount());
                ps.setString(3, order.getStatus());
                ps.setString(4, order.getUserId());
                ps.setTimestamp(5, Timestamp.valueOf(order.getCreateTime()));
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (SQLException e) {
            log.error("ClickHouse 批量写入失败", e);
            // 重试逻辑
        }
    }
    
    @Override
    public void close() {
        flush();
        scheduler.shutdown();
        connection.close();
    }
}

查询优化问题

慢查询优化清单:

-- 1. 使用 PREWHERE 替代 WHERE(减少数据读取量)
-- 差
SELECT * FROM dwd_order WHERE status = 'PAID' AND amount > 100;
-- 好
SELECT * FROM dwd_order PREWHERE status = 'PAID' WHERE amount > 100;

-- 2. 避免 SELECT *(只查需要的列)
-- 差
SELECT * FROM dwd_order WHERE order_date = '2026-01-01';
-- 好
SELECT order_id, amount, status FROM dwd_order WHERE order_date = '2026-01-01';

-- 3. 利用分区裁剪
-- 差(全表扫描)
SELECT count() FROM dwd_order WHERE toDate(create_time) = '2026-01-01';
-- 好(分区裁剪)
SELECT count() FROM dwd_order WHERE order_date = '2026-01-01';

-- 4. 使用物化视图预聚合
CREATE MATERIALIZED VIEW dws_order_daily_mv
ENGINE = SummingMergeTree()
ORDER BY (order_date, province)
AS SELECT
    toDate(create_time) AS order_date,
    province,
    count() AS order_count,
    sum(amount) AS total_amount
FROM dwd_order
GROUP BY order_date, province;

-- 5. 合理使用索引
ALTER TABLE dwd_order ADD INDEX idx_status status TYPE set(100) GRANULARITY 4;
ALTER TABLE dwd_order ADD INDEX idx_amount amount TYPE minmax GRANULARITY 4;

数据一致性问题

实时与离线数据不一致

不一致原因分析:

原因说明影响
数据延迟实时数据未完全到达实时 < 离线
数据重复实时处理重复消费实时 > 离线
计算逻辑差异流批处理逻辑不同结果不一致
时间窗口差异窗口边界处理不同边界数据差异
数据源差异实时和离线读取不同快照数据不一致

解决方案:

1. 统一计算逻辑(流批一体)

-- 使用 Flink SQL 统一流批处理
-- 同一套 SQL 既可以流处理也可以批处理
CREATE TABLE orders (
    order_id STRING,
    amount DECIMAL(10, 2),
    create_time TIMESTAMP(3),
    WATERMARK FOR create_time AS create_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- 统一聚合逻辑
INSERT INTO dws_order_stats
SELECT
    TUMBLE_START(create_time, INTERVAL '1' HOUR) AS window_start,
    count(*) AS order_count,
    sum(amount) AS total_amount
FROM orders
GROUP BY TUMBLE(create_time, INTERVAL '1' HOUR);

2. 定期数据对账与修复

// 自动对账任务
@Scheduled(cron = "0 0 2 * * ?")  // 每天凌晨 2 点
public void reconcileData() {
    LocalDate yesterday = LocalDate.now().minusDays(1);
    
    // 查询实时数仓数据
    RealtimeStats realtimeStats = clickHouseService.queryDailyStats(yesterday);
    
    // 查询离线数仓数据
    BatchStats batchStats = hiveService.queryDailyStats(yesterday);
    
    // 对比差异
    ReconcileResult result = reconcileService.compare(realtimeStats, batchStats);
    
    if (result.getDiffRate() > 0.01) {  // 差异超过 1%
        // 发送告警
        alertService.sendAlert("数据对账异常", result.toString());
        
        // 自动修复(以离线数据为准)
        if (result.getDiffRate() < 0.05) {  // 差异小于 5% 自动修复
            repairService.repairFromBatch(yesterday);
        }
    }
    
    // 记录对账结果
    reconcileLogService.save(result);
}

3. 数据修正机制

graph TB A["发现数据不一致"] --> B{"差异率"} B -->|"< 1%"| C["记录日志,忽略"] B -->|"1% ~ 5%"| D["自动修复"] B -->|"> 5%"| E["人工介入"] D --> F["从离线数仓回补"] E --> G["排查根因"] G --> H["修复代码/配置"] H --> I["重跑数据"] style A fill:#ff6b6b,color:#fff style D fill:#4ecdc4,color:#fff style E fill:#ffa726,color:#fff

数据延迟问题

端到端延迟优化

延迟分解与优化:

阶段典型延迟优化目标优化手段
数据采集1-5s< 1s减少采集间隔、使用 CDC
Kafka 传输10-100ms< 50ms优化 batch.size、linger.ms
Flink 处理100ms-10s< 1s优化算子、减少 Shuffle
Sink 写入100ms-5s< 500ms批量写入、异步写入
查询响应100ms-3s< 500ms预聚合、缓存

Kafka 低延迟配置:

# Producer 低延迟配置
linger.ms=5                       # 减少等待时间(默认 0)
batch.size=16384                  # 适当减小批次
buffer.memory=33554432            # 32MB 缓冲区
compression.type=lz4              # 快速压缩

# Consumer 低延迟配置
fetch.min.bytes=1                 # 有数据就返回
fetch.max.wait.ms=100             # 最大等待 100ms
max.poll.records=500              # 每次拉取 500 条

Flink 低延迟配置:

// 减少缓冲区刷新间隔
env.setBufferTimeout(10);  // 10ms 刷新一次

// 使用处理时间减少 Watermark 延迟
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))

// 减少 Checkpoint 对延迟的影响
env.getCheckpointConfig().enableUnalignedCheckpoints();

// 使用 Mini-Batch 优化(Flink SQL)
tableConfig.set("table.exec.mini-batch.enabled", "true");
tableConfig.set("table.exec.mini-batch.allow-latency", "5s");
tableConfig.set("table.exec.mini-batch.size", "5000");

资源管理问题

资源估算公式:

TaskManager 数量 = 总并行度 / 每个 TM 的 Slot 数
每个 TM 内存 = 状态大小 / TM 数量 * 2 + 网络缓冲 + JVM 开销
每个 TM CPU = Slot 数 * 每个 Slot 的 CPU 需求

资源规划参考:

场景数据量并行度TM 数量TM 内存TM CPU
小型1万 TPS824GB2核
中型10万 TPS3288GB4核
大型100万 TPS1283216GB8核
超大型1000万 TPS51212832GB16核

Kubernetes 资源配置:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: order-processing
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: rocksdb
    state.backend.incremental: "true"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "8192m"
      cpu: 4
    replicas: 8
  job:
    jarURI: local:///opt/flink/jobs/order-processing.jar
    parallelism: 32
    upgradeMode: savepoint

弹性伸缩策略:

// 基于 Kafka 消费延迟的自动伸缩
public class AutoScaler {
    
    private static final long LAG_THRESHOLD_SCALE_UP = 1000000;   // 100 万
    private static final long LAG_THRESHOLD_SCALE_DOWN = 10000;   // 1 万
    
    @Scheduled(fixedRate = 60000)  // 每分钟检查
    public void checkAndScale() {
        long totalLag = kafkaAdminService.getConsumerGroupLag("order-consumer");
        int currentParallelism = flinkService.getJobParallelism("order-processing");
        
        if (totalLag > LAG_THRESHOLD_SCALE_UP) {
            // 扩容:并行度翻倍,最大 256
            int newParallelism = Math.min(currentParallelism * 2, 256);
            flinkService.rescaleJob("order-processing", newParallelism);
            log.info("扩容: {} -> {}, lag: {}", currentParallelism, newParallelism, totalLag);
        } else if (totalLag < LAG_THRESHOLD_SCALE_DOWN && currentParallelism > 8) {
            // 缩容:并行度减半,最小 8
            int newParallelism = Math.max(currentParallelism / 2, 8);
            flinkService.rescaleJob("order-processing", newParallelism);
            log.info("缩容: {} -> {}, lag: {}", currentParallelism, newParallelism, totalLag);
        }
    }
}

跨机房容灾

多活架构设计

双机房架构:

graph TB subgraph "机房 A(主)" A1["Kafka 集群 A"] --> B1["Flink 集群 A"] B1 --> C1["ClickHouse 集群 A"] end subgraph "机房 B(备)" A2["Kafka 集群 B"] --> B2["Flink 集群 B"] B2 --> C2["ClickHouse 集群 B"] end A1 <-->|"MirrorMaker 2"| A2 C1 <-->|"数据同步"| C2 D["负载均衡"] --> A1 D --> A2 style A1 fill:#4ecdc4,color:#fff style A2 fill:#45b7d1,color:#fff

Kafka MirrorMaker 2 配置:

# mm2.properties
clusters = primary, backup
primary.bootstrap.servers = kafka-a:9092
backup.bootstrap.servers = kafka-b:9092

# 主 → 备 同步
primary->backup.enabled = true
primary->backup.topics = orders, payments, users

# 备 → 主 同步(双活场景)
backup->primary.enabled = true
backup->primary.topics = orders, payments, users

# 同步配置
replication.factor = 3
sync.topic.configs.enabled = true
sync.topic.acls.enabled = true
emit.heartbeats.enabled = true
emit.checkpoints.enabled = true

故障切换流程:

步骤操作耗时说明
1检测故障1-3 分钟自动监控告警
2确认切换1-5 分钟人工确认或自动决策
3DNS 切换1-2 分钟流量切换到备机房
4Flink 任务恢复2-5 分钟从 Checkpoint 恢复
5数据校验5-10 分钟验证数据一致性
总计-10-25 分钟RTO 目标 < 30 分钟

高频面试题精选

1. 实时数仓和离线数仓的核心区别是什么?

:核心区别在于数据处理模式和延迟

处理模式:

  • 离线数仓采用批处理模式,数据延迟通常是 T+1(天级),使用 Hive/Spark Batch 进行计算
  • 实时数仓采用流处理模式,数据延迟在秒级或分钟级,使用 Flink/Spark Streaming 进行计算

技术差异:

  • 离线数仓:HDFS + Hive + Spark Batch,定时调度,资源消耗集中
  • 实时数仓:Kafka + Flink + ClickHouse,持续运行,资源占用稳定

适用场景:

  • 离线数仓:历史分析、离线报表、数据挖掘
  • 实时数仓:实时监控、实时决策、实时推荐

关键点:实时数仓需要处理更多边界情况(乱序、重复、状态管理),开发和运维复杂度更高,但能提供更好的业务价值。


2. Lambda 架构和 Kappa 架构的优缺点是什么?如何选择?

Lambda 架构:

  • 优点:容错性强(批处理保证准确性)、可扩展性好、历史数据可重算
  • 缺点:需要维护两套代码(批处理 + 流处理)、数据一致性难保证、架构复杂、运维成本高

Kappa 架构:

  • 优点:架构简单(只需维护一套流处理代码)、开发效率高、数据一致性好
  • 缺点:依赖 Kafka 长期存储(成本高)、历史数据回溯速度慢、不适合超大规模数据

选择建议:

  • 选择 Lambda:数据量超大(PB 级以上)、需要频繁重算历史数据、对准确性要求极高
  • 选择 Kappa:数据量适中(TB 级)、以实时处理为主、追求架构简洁性
  • 选择现代架构:需要流批一体、数据湖能力,推荐 Flink + Hudi/Iceberg

关键点:现代实时数仓更倾向于 Kappa 架构或流批一体的现代架构,因为技术成熟度提升,可以用流处理引擎统一处理实时和历史数据。


3. Flink 如何保证 Exactly-Once 语义?

:Flink 通过两阶段提交(2PC)+ Checkpoint 机制保证端到端 Exactly-Once。

实现机制:

  1. Source 端:Kafka 支持 offset 管理,Checkpoint 时记录 offset
  2. 计算过程:Checkpoint 机制保证状态一致性,失败时从最近的 Checkpoint 恢复
  3. Sink 端:使用两阶段提交协议
    • 第一阶段(Pre-commit):数据写入但不可见,等待 Checkpoint 完成
    • 第二阶段(Commit):Checkpoint 成功后,正式提交,数据可见
    • 失败时(Abort):回滚预提交的数据

关键配置:

env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
sink.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);

关键点:Exactly-Once 需要 Source、Flink、Sink 三方配合,任何一方不支持都无法实现端到端的 Exactly-Once。


4. 实时数仓如何处理数据乱序问题?

:使用 Watermark 机制处理乱序数据。

Watermark 原理:

  • Watermark 表示时间戳小于 Watermark 的数据已经全部到达
  • 设置乱序容忍度:forBoundedOutOfOrderness(Duration.ofSeconds(10)),容忍 10 秒乱序
  • 窗口触发:当 Watermark 超过窗口结束时间时,触发窗口计算

处理策略:

  1. 设置合理的乱序容忍度
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
  1. 允许窗口延迟关闭
window.allowedLateness(Time.minutes(1))
  1. 侧输出流处理迟到数据
window.sideOutputLateData(lateOutputTag)
DataStream<Order> lateData = result.getSideOutput(lateOutputTag);

关键点:乱序容忍度的设置需要平衡实时性和准确性,容忍度越大,延迟越高,但数据越完整。


5. 如何解决实时数仓中的数据倾斜问题?

:数据倾斜的识别和解决方案

识别方法:

  • Flink Web UI 查看各 Task 的处理速度
  • 监控各 Task 的数据量分布
  • 查看反压情况

解决方案:

1. 加盐(Salting):给热点 key 添加随机后缀

.map(order -> {
    String newKey = order.getUserId() + "_" + random.nextInt(10);
    order.setUserId(newKey);
    return order;
})

2. 两阶段聚合:先局部聚合,再全局聚合

// 第一阶段:加盐聚合
.keyBy(order -> order.getUserId() + "_" + random.nextInt(10))
.window(...).aggregate(...)
// 第二阶段:去盐聚合
.keyBy(order -> order.getUserId())
.window(...).aggregate(...)

3. 自定义分区器:根据业务特点自定义分区逻辑

4. 使用 rebalance():强制数据重新分布

关键点:数据倾斜会导致部分 Task 成为瓶颈,影响整体性能,需要根据具体情况选择合适的解决方案。


6. ClickHouse 为什么查询这么快?

:ClickHouse 的核心优化技术

1. 列式存储:只读取需要的列,减少 IO 2. 向量化执行:使用 SIMD 指令批量处理数据,CPU 利用率高 3. 数据压缩:列式存储压缩比高(10:1),减少磁盘 IO 4. 稀疏索引:主键索引 + 跳数索引,快速定位数据 5. 分区裁剪:根据分区键快速过滤数据 6. 并行处理:多核并行查询 7. 预聚合:SummingMergeTree 等引擎自动聚合

适用场景:

  • OLAP 分析、实时报表、大屏展示
  • 不适合高并发点查和频繁更新

关键点:ClickHouse 的性能优势来自于针对 OLAP 场景的深度优化,但不支持事务和更新,需要结合业务场景选择。


7. 实时数仓如何保证数据质量?

数据质量保障体系

1. 数据校验

  • 必填字段校验
  • 数值范围校验
  • 枚举值校验
  • 时间合理性校验

2. 数据去重

  • 基于主键去重
  • 使用状态 + TTL
  • ClickHouse ReplacingMergeTree

3. 数据对账

  • 实时数仓 vs 离线数仓对账
  • 定期核对数据一致性
  • 差异分析和修复

4. 监控告警

  • 数据断流告警
  • 数据重复率告警
  • 数据延迟告警
  • 数据质量率监控

5. 数据修复

  • 支持数据回溯
  • Kafka 消息重放
  • 手动修复工具

关键点:数据质量是实时数仓的生命线,需要建立完善的数据质量保障体系,从数据采集到数据应用全链路监控。


8. Flink 状态过大导致 Checkpoint 超时怎么办?

优化方案

1. 启用增量 Checkpoint

env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

2. 设置状态 TTL

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .cleanupFullSnapshot()
    .build();

3. 增加 Checkpoint 间隔

env.enableCheckpointing(300000);  // 5 分钟

4. 增加 Checkpoint 超时时间

env.getCheckpointConfig().setCheckpointTimeout(600000);  // 10 分钟

5. 优化状态存储

  • 使用 RocksDB 状态后端
  • 调优 RocksDB 参数
  • 增加托管内存

6. 业务优化

  • 减少状态大小
  • 使用聚合状态而非列表状态
  • 定期清理过期状态

关键点:大状态是实时数仓的常见问题,需要从技术和业务两方面优化,增量 Checkpoint 是必选项。


9. 如何监控实时数仓的端到端延迟?

端到端延迟监控方案

1. 在数据中添加时间戳

  • 数据产生时间(Source Timestamp)
  • Kafka 接收时间(Kafka Timestamp)
  • Flink 处理时间(Process Timestamp)
  • Sink 写入时间(Sink Timestamp)

2. 计算各阶段延迟

  • Kafka 延迟 = Kafka Timestamp - Source Timestamp
  • 处理延迟 = Process Timestamp - Kafka Timestamp
  • Sink 延迟 = Sink Timestamp - Process Timestamp
  • 端到端延迟 = Sink Timestamp - Source Timestamp

3. 使用 Metrics 上报

Histogram latencyHistogram = getRuntimeContext()
    .getMetricGroup()
    .histogram("end_to_end_latency", new DescriptiveStatisticsHistogram(1000));

latencyHistogram.update(System.currentTimeMillis() - order.getSourceTimestamp());

4. Grafana 可视化

  • P50/P95/P99 延迟
  • 延迟趋势图
  • 各阶段延迟占比

关键点:端到端延迟监控是实时数仓的核心指标,需要全链路埋点,定位性能瓶颈。


10. 实时数仓和数据湖的关系是什么?

实时数仓和数据湖是互补关系

数据湖(Hudi/Iceberg)的优势:

  • 支持 ACID 事务
  • 支持更新和删除
  • 支持时间旅行
  • 流批一体

结合方式:

1. 存储层:使用数据湖作为统一存储 2. 计算层:Flink 流式写入数据湖 3. 查询层

  • 实时查询:ClickHouse/Doris
  • 批量查询:Spark/Presto 查询数据湖

典型架构:

Kafka → Flink → Hudi/Iceberg (数据湖)
                    ↓
              ClickHouse (实时查询)
                    ↓
              Spark (批量分析)

优势:

  • 统一存储,避免数据孤岛
  • 流批一体,简化架构
  • 支持更新删除,满足 CDC 场景
  • 成本优化,冷热数据分离

关键点:这种架构实现了湖仓一体,既有数据湖的灵活性,又有数据仓库的查询性能,是现代实时数仓的发展方向。


面试技巧总结:

1. 回答要有层次

  • 先说是什么(定义)
  • 再说为什么(原理)
  • 最后说怎么做(实践)

2. 结合实际项目经验

  • 说明在什么场景下使用
  • 遇到什么问题
  • 如何解决

3. 对比不同方案

  • 说明各方案的优缺点
  • 适用场景
  • 选型依据

4. 展示技术深度

  • 不仅知道怎么用
  • 还要知道底层原理
  • 能够优化和排查问题

5. 关注业务价值

  • 技术服务于业务
  • 能说明技术带来的业务价值
  • ROI 分析

常见面试题分类:

类别题目示例考察点
架构设计Lambda vs Kappa、分层设计架构能力
核心技术Flink Exactly-Once、Watermark技术深度
性能优化数据倾斜、Checkpoint 优化优化能力
故障排查任务重启、查询慢问题解决能力
数据质量数据校验、数据对账质量意识
监控运维延迟监控、告警机制运维能力
实战经验项目案例、技术选型实践经验

面试准备建议:

  1. 深入理解核心技术:Flink、Kafka、ClickHouse 的原理和优化
  2. 总结项目经验:准备 2-3 个实战案例,包括背景、方案、效果
  3. 关注技术趋势:流批一体、湖仓一体、实时 OLAP
  4. 练习表达能力:清晰、有条理地表达技术方案
  5. 准备常见问题:架构设计、性能优化、故障排查

11. 如何设计一个高可用的实时数仓系统?

:高可用设计需要从多个层面保障。

数据采集层:

  • Kafka 多副本(replication.factor=3),保证消息不丢失
  • CDC 工具高可用部署(主备切换)
  • 数据源多通道采集,互为备份

计算层:

  • Flink 高可用配置:ZooKeeper/Kubernetes 管理 JobManager 主备
  • Checkpoint 持久化到 HDFS/S3,支持故障恢复
  • 任务自动重启策略:restart-strategy: fixed-delay,最大重试次数 + 重试间隔

存储层:

  • ClickHouse 多副本(ReplicatedMergeTree),跨机架部署
  • Redis Cluster 模式,自动故障转移
  • HBase RegionServer 自动负载均衡

服务层:

  • 多实例部署 + 负载均衡
  • 熔断降级机制(Hystrix/Sentinel)
  • 多级缓存(本地缓存 → Redis → ClickHouse)

关键配置:

// Flink 高可用配置
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/

// 重启策略
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30s

关键点:高可用不是单点保障,而是全链路保障,每个环节都需要有冗余和故障恢复机制。RTO(恢复时间目标)和 RPO(恢复点目标)是衡量高可用的核心指标。


12. Flink SQL 和 DataStream API 如何选择?

:两者各有优势,选择取决于业务复杂度和开发效率

对比:

维度Flink SQLDataStream API
开发效率高,声明式编程低,命令式编程
灵活性受限于 SQL 表达能力完全灵活
性能优化自动优化(Planner)手动优化
状态管理自动管理手动管理
适用场景ETL、聚合、Join复杂业务逻辑
学习成本
调试难度较难(SQL 黑盒)较易(代码可调试)

选择建议:

  • 选 Flink SQL:标准 ETL 处理、多表 Join、窗口聚合、快速开发
  • 选 DataStream API:复杂状态管理、自定义窗口、精细化控制、CEP 模式匹配
  • 混合使用:SQL 处理主流程,DataStream 处理特殊逻辑
// 混合使用示例
TableEnvironment tableEnv = StreamTableEnvironment.create(env);

// SQL 处理主流程
Table orderTable = tableEnv.sqlQuery("""
    SELECT order_id, user_id, amount, 
           TUMBLE_START(create_time, INTERVAL '1' MINUTE) AS window_start
    FROM orders
    GROUP BY TUMBLE(create_time, INTERVAL '1' MINUTE), order_id, user_id, amount
""");

// 转换为 DataStream 处理复杂逻辑
DataStream<Row> orderStream = tableEnv.toDataStream(orderTable);
DataStream<RiskResult> riskStream = orderStream
    .keyBy(row -> row.getFieldAs("user_id"))
    .process(new ComplexRiskDetectionFunction());

关键点:实际项目中推荐以 Flink SQL 为主、DataStream API 为辅的混合模式,兼顾开发效率和灵活性。


13. 实时数仓如何实现维表关联?

:维表关联是实时数仓的核心需求,有多种实现方式。

方案对比:

方案延迟吞吐一致性适用场景
预加载弱(定期刷新)小维表(< 100MB)
异步 IO中等维表
广播流强(实时更新)小维表 + 频繁更新
Temporal JoinFlink SQL 场景
外部缓存中(TTL 控制)大维表

广播流维表关联(推荐):

// 维表数据流(CDC 实时更新)
DataStream<DimUser> dimStream = env
    .addSource(new FlinkKafkaConsumer<>("dim_user_cdc", ...))
    .map(new DimUserMapper());

// 广播状态描述
MapStateDescriptor<String, DimUser> broadcastDesc = 
    new MapStateDescriptor<>("dim_user", String.class, DimUser.class);

BroadcastStream<DimUser> broadcastStream = dimStream.broadcast(broadcastDesc);

// 主流关联维表
DataStream<EnrichedOrder> enrichedStream = orderStream
    .connect(broadcastStream)
    .process(new BroadcastProcessFunction<Order, DimUser, EnrichedOrder>() {
        @Override
        public void processElement(Order order, ReadOnlyContext ctx, 
                                    Collector<EnrichedOrder> out) {
            ReadOnlyBroadcastState<String, DimUser> state = 
                ctx.getBroadcastState(broadcastDesc);
            DimUser user = state.get(order.getUserId());
            
            if (user != null) {
                out.collect(new EnrichedOrder(order, user));
            } else {
                // 维表未命中,输出到侧输出流
                ctx.output(missedTag, order);
            }
        }
        
        @Override
        public void processBroadcastElement(DimUser user, Context ctx, 
                                             Collector<EnrichedOrder> out) {
            BroadcastState<String, DimUser> state = 
                ctx.getBroadcastState(broadcastDesc);
            state.put(user.getUserId(), user);
        }
    });

Flink SQL Temporal Join:

-- 创建维表(支持 Lookup)
CREATE TABLE dim_user (
    user_id STRING,
    user_name STRING,
    province STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/dim',
    'table-name' = 'dim_user',
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '1h'
);

-- Temporal Join
SELECT o.order_id, o.amount, u.user_name, u.province
FROM orders AS o
JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id;

关键点:维表关联方案的选择取决于维表大小、更新频率和一致性要求。小维表用广播流,大维表用异步 IO + 缓存,Flink SQL 场景用 Temporal Join。


14. 如何从零搭建一个实时数仓?

:分五个阶段逐步建设。

第一阶段:基础设施搭建(1-2 周)

  • 部署 Kafka 集群(3 Broker + ZooKeeper)
  • 部署 Flink 集群(Kubernetes 模式)
  • 部署 ClickHouse 集群(2 分片 2 副本)
  • 部署监控系统(Prometheus + Grafana)

第二阶段:数据采集(1-2 周)

  • 配置 CDC 采集(Flink CDC / Canal)
  • 配置日志采集(Filebeat / Flume)
  • 建立 ODS 层 Kafka Topic

第三阶段:数据处理(2-4 周)

  • 开发 DWD 层 ETL 任务(数据清洗、转换)
  • 开发 DWS 层聚合任务(窗口聚合、指标计算)
  • 开发 ADS 层应用任务(业务指标)

第四阶段:数据服务(1-2 周)

  • 开发统一查询 API
  • 配置 Redis 缓存
  • 对接前端大屏/报表

第五阶段:运维保障(持续)

  • 完善监控告警
  • 建立数据质量体系
  • 制定应急预案
  • 持续优化性能

关键点:实时数仓建设是一个渐进式过程,先跑通核心链路,再逐步完善。初期不要追求完美架构,先满足核心业务需求,再迭代优化。


15. 实时数仓的未来发展趋势是什么?

:实时数仓正在向以下方向演进。

1. 流批一体

  • Flink 统一流批处理引擎
  • 一套代码、一套逻辑、两种执行模式
  • 降低开发和运维成本

2. 湖仓一体

  • 数据湖(Hudi/Iceberg/Paimon)+ 数据仓库融合
  • 支持 ACID 事务、时间旅行、增量查询
  • Apache Paimon(原 Flink Table Store)是重要方向

3. 实时 OLAP

  • ClickHouse、Doris、StarRocks 等实时分析引擎
  • 支持高并发、低延迟的即席查询
  • 物化视图自动刷新

4. Serverless 化

  • 按需分配计算资源
  • 自动扩缩容
  • 降低运维成本

5. AI + 实时数仓

  • 实时特征工程
  • 在线学习
  • 智能运维(AIOps)

技术趋势总结:

趋势代表技术核心价值
流批一体Flink、Paimon统一开发、降低成本
湖仓一体Hudi、Iceberg、Paimon统一存储、灵活查询
实时 OLAPDoris、StarRocks高并发、低延迟
Serverless云原生 Flink弹性伸缩、按需付费
AI 融合Feature Store实时特征、在线推理

关键点:实时数仓的终极目标是实时化、智能化、一体化,让数据从产生到价值变现的链路越来越短,成本越来越低。

面试准备建议:

  1. 深入理解核心技术:Flink、Kafka、ClickHouse 的原理和优化
  2. 总结项目经验:准备 2-3 个实战案例,包括背景、方案、效果
  3. 关注技术趋势:流批一体、湖仓一体、实时 OLAP
  4. 练习表达能力:清晰、有条理地表达技术方案
  5. 准备常见问题:架构设计、性能优化、故障排查