Grafana可视化监控平台

目录

点击展开目录

概述

什么是Grafana

Grafana是一个开源的可视化和可观测性平台,允许用户查询、可视化、告警和理解指标数据,无论数据存储在何处。它已成为监控领域的事实标准,被广泛应用于基础设施监控、应用性能监控、业务指标分析等场景。

核心价值

  • 统一可视化:将来自不同数据源的指标统一展示
  • 实时监控:提供实时数据刷新和告警能力
  • 灵活扩展:支持插件机制,可扩展数据源和可视化组件
  • 开源免费:社区活跃,企业版提供更多高级特性

核心特性

特性类别功能描述应用价值
多数据源支持支持Prometheus、InfluxDB、Elasticsearch等50+数据源统一监控平台,避免工具碎片化
丰富的可视化时序图、柱状图、热力图、地图、表格等多种图表类型满足不同场景的展示需求
告警系统灵活的告警规则、多种通知渠道(邮件、Slack、钉钉等)及时发现和响应问题
Dashboard模板变量、重复面板、链接跳转等高级功能提高Dashboard复用性和交互性
权限管理组织、团队、用户级别的权限控制满足企业级安全需求
插件生态丰富的社区插件和自定义插件开发能力扩展平台能力

应用场景

1. 基础设施监控

  • 服务器CPU、内存、磁盘、网络监控
  • Kubernetes集群监控
  • 云资源监控(AWS、Azure、GCP)

2. 应用性能监控(APM)

  • 应用响应时间、吞吐量、错误率
  • JVM监控(堆内存、GC、线程)
  • 数据库性能监控

3. 大数据平台监控

  • Flink作业监控(本文重点)
  • Spark作业监控
  • Kafka集群监控
  • HDFS、HBase监控

4. 业务指标监控

  • 订单量、交易额、用户活跃度
  • 业务漏斗分析
  • SLA监控

架构设计

核心组件

Grafana采用模块化架构设计,主要由以下核心组件构成:

graph TB subgraph "前端层" A[Web UI
React应用] B[Dashboard编辑器] C[可视化渲染引擎] end subgraph "后端层" D[API Server
Go Web服务] E[认证授权模块] F[查询引擎] G[告警引擎] end subgraph "数据层" H[SQLite/MySQL/PostgreSQL
元数据存储] I[数据源连接器] end subgraph "外部数据源" J[Prometheus] K[InfluxDB] L[Elasticsearch] M[MySQL/PostgreSQL] N[其他数据源] end A --> D B --> D C --> D D --> E D --> F D --> G D --> H F --> I I --> J I --> K I --> L I --> M I --> N style A fill:#e3f2fd style D fill:#fff3e0 style H fill:#f3e5f5 style J fill:#e8f5e9

组件职责说明

组件技术栈主要职责性能考量
Web UIReact + TypeScript用户交互界面、Dashboard渲染前端性能优化、懒加载
API ServerGo语言处理API请求、认证授权、数据查询协调并发处理能力、连接池管理
查询引擎Go语言查询优化、数据聚合、缓存管理查询并发控制、结果缓存
告警引擎Go语言告警规则评估、通知发送告警频率控制、通知去重
数据源连接器插件机制与各类数据源通信、数据格式转换连接池、超时控制
元数据存储SQLite/MySQL/PostgreSQL存储Dashboard、用户、配置等数据库索引优化

数据流转

查询数据流程

sequenceDiagram participant User as 用户浏览器 participant Frontend as Grafana前端 participant Backend as Grafana后端 participant Cache as 查询缓存 participant DS as 数据源连接器 participant Prom as Prometheus User->>Frontend: 1. 打开Dashboard Frontend->>Backend: 2. 请求Dashboard配置 Backend->>Frontend: 3. 返回Dashboard JSON Frontend->>Backend: 4. 发起数据查询请求 Backend->>Cache: 5. 检查缓存 alt 缓存命中 Cache-->>Backend: 6a. 返回缓存数据 else 缓存未命中 Backend->>DS: 6b. 转发查询到数据源 DS->>Prom: 7. 执行PromQL查询 Prom-->>DS: 8. 返回时序数据 DS-->>Backend: 9. 数据格式转换 Backend->>Cache: 10. 更新缓存 end Backend-->>Frontend: 11. 返回查询结果 Frontend->>User: 12. 渲染可视化图表

关键流程说明

  1. 认证鉴权:每个请求都会经过认证中间件验证
  2. 查询优化:后端会对查询进行优化,如时间范围调整、采样率控制
  3. 并发控制:限制同时执行的查询数量,防止数据源过载
  4. 缓存策略:根据查询时间范围和刷新频率决定是否缓存
  5. 数据转换:将不同数据源的格式统一转换为Grafana内部格式

架构演进

Grafana 11 → Grafana 12 架构升级

演进方向Grafana 11Grafana 12改进价值
Dashboard架构传统Dashboard模型Scenes库驱动的Dashboard更稳定、更灵活、支持动态Dashboard
可观测性分散的Explore功能统一的Drilldown体验从指标快速下钻到日志和追踪
代码化管理手动导入导出Git Sync + Dashboard as Code版本控制、CI/CD集成
性能优化传统表格渲染优化的表格和地图组件大数据量下性能提升10倍+
AI能力Grafana Assistant(预览)AI辅助查询和Dashboard创建

数据源集成

支持的数据源

Grafana支持50+种数据源,涵盖时序数据库、关系型数据库、日志系统、云服务等:

数据源类型典型代表主要用途查询语言
时序数据库Prometheus、InfluxDB、Graphite指标监控、性能分析PromQL、InfluxQL
关系型数据库MySQL、PostgreSQL、SQL Server业务数据查询SQL
日志系统Elasticsearch、Loki日志分析、全文搜索Lucene、LogQL
云服务CloudWatch、Azure Monitor云资源监控各云厂商查询语法
APM系统Jaeger、Tempo、Zipkin分布式追踪TraceQL
大数据ClickHouse、Druid大规模数据分析SQL

Prometheus集成

Prometheus是Grafana最常用的数据源,特别适合监控场景。

配置步骤

  1. 添加数据源
# Grafana配置文件方式
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
    jsonData:
      timeInterval: 5s
      queryTimeout: 60s
      httpMethod: POST
  1. 关键配置项
配置项说明推荐值影响
access访问模式(proxy/direct)proxyproxy模式更安全,避免跨域问题
timeInterval最小查询间隔5s-15s影响查询精度和性能
queryTimeout查询超时时间60s防止慢查询阻塞
httpMethodHTTP方法POSTPOST支持更长的查询语句
  1. PromQL查询示例
# CPU使用率
100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)

# 内存使用率
(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100

# 请求QPS
rate(http_requests_total[5m])

# P95延迟
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))

其他常用数据源

InfluxDB

  • 适用场景:IoT数据、高频时序数据
  • 查询语言:InfluxQL或Flux
  • 特点:写入性能强、支持连续查询

Elasticsearch

  • 适用场景:日志分析、全文搜索
  • 查询语言:Lucene查询语法
  • 特点:强大的聚合能力、支持复杂查询

MySQL/PostgreSQL

  • 适用场景:业务指标、报表数据
  • 查询语言:标准SQL
  • 特点:灵活的关联查询、支持复杂计算

查询语法详解

PromQL基础语法

PromQL(Prometheus Query Language)是Prometheus的查询语言,也是Grafana中最常用的查询语言。

1. 基本查询结构

# 即时向量查询(Instant Vector)- 返回最新的时间点数据
metric_name{label1="value1", label2="value2"}

# 范围向量查询(Range Vector)- 返回一段时间范围内的数据
metric_name{label1="value1"}[5m]

# 标量查询(Scalar)- 返回单个数值
scalar(metric_name)

2. 选择器(Selectors)

选择器类型语法示例说明
精确匹配label="value"job="flink"标签值完全匹配
正则匹配label=~"regex"job=~"flink.*"标签值匹配正则表达式
不等于label!="value"job!="test"标签值不等于
正则不匹配label!~"regex"job!~"test.*"标签值不匹配正则
# 组合使用多个选择器
flink_taskmanager_job_task_operator_numRecordsIn{
  job_name="streaming-job",
  task_name=~"Source.*",
  host!="test-host"
}

3. 时间范围选择器

# 时间单位:s(秒), m(分钟), h(小时), d(天), w(周), y(年)
metric_name[5m]   # 最近5分钟
metric_name[1h]   # 最近1小时
metric_name[7d]   # 最近7天

# 时间偏移(Offset)
metric_name offset 5m        # 5分钟前的数据
metric_name[5m] offset 1h    # 1小时前的5分钟数据

PromQL函数

1. 速率计算函数

函数用途适用场景示例
rate()计算每秒平均增长率Counter类型指标rate(http_requests_total[5m])
irate()计算瞬时增长率需要更高灵敏度的场景irate(http_requests_total[5m])
increase()计算时间范围内的增长量统计总增长increase(http_requests_total[1h])
delta()计算Gauge类型的变化量Gauge指标delta(temperature[1h])
deriv()计算导数(变化率)预测趋势deriv(disk_usage[1h])
# rate vs irate 对比
# rate: 计算5分钟内的平均速率,更平滑
rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])

# irate: 只使用最后两个数据点,更敏感
irate(flink_taskmanager_job_task_operator_numRecordsIn[5m])

# increase: 计算1小时内的总增长
increase(flink_taskmanager_job_task_operator_numRecordsIn[1h])

2. 聚合函数

函数说明保留标签示例
sum()求和by子句指定的标签sum(metric) by (label)
avg()平均值by子句指定的标签avg(metric) by (label)
min()最小值by子句指定的标签min(metric) by (label)
max()最大值by子句指定的标签max(metric) by (label)
count()计数by子句指定的标签count(metric) by (label)
stddev()标准差by子句指定的标签stddev(metric) by (label)
stdvar()方差by子句指定的标签stdvar(metric) by (label)
topk()前K个最大值所有标签topk(5, metric)
bottomk()前K个最小值所有标签bottomk(5, metric)
quantile()分位数by子句指定的标签quantile(0.95, metric)
# 按job_name聚合,计算每个作业的总吞吐量
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (job_name)

# 计算所有TaskManager的平均内存使用率
avg(flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max) * 100

# 找出吞吐量最高的5个算子
topk(5, rate(flink_taskmanager_job_task_operator_numRecordsIn[5m]))

# 计算P95延迟
quantile(0.95, flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency)

# without: 排除指定标签,保留其他所有标签
sum(metric) without (instance, host)

3. 数学运算函数

函数说明示例
abs()绝对值abs(delta(metric[5m]))
ceil()向上取整ceil(metric)
floor()向下取整floor(metric)
round()四舍五入round(metric, 0.01)
sqrt()平方根sqrt(metric)
exp()e的指数exp(metric)
ln()自然对数ln(metric)
log2()以2为底的对数log2(metric)
log10()以10为底的对数log10(metric)

4. 时间和日期函数

# 当前时间戳
time()

# 一天中的小时(0-23)
hour()

# 一周中的星期几(0-6,0是周日)
day_of_week()

# 一个月中的第几天(1-31)
day_of_month()

# 一年中的第几天(1-365)
day_of_year()

# 月份(1-12)
month()

# 年份
year()

# 示例:只在工作日(周一到周五)触发告警
hour() >= 9 and hour() <= 18 and day_of_week() >= 1 and day_of_week() <= 5

5. 预测和趋势函数

# predict_linear: 线性预测未来值
# 预测1小时后的磁盘使用量
predict_linear(disk_usage[1h], 3600)

# holt_winters: 霍尔特-温特斯平滑预测(适合有季节性的数据)
holt_winters(metric[1d], 0.5, 0.5)

# 示例:预测4小时后磁盘是否会满
predict_linear(node_filesystem_avail_bytes[1h], 4*3600) < 0

6. 标签操作函数

# label_replace: 替换或添加标签
label_replace(
  metric,
  "new_label",           # 新标签名
  "$1",                  # 新标签值(可使用正则捕获组)
  "source_label",        # 源标签名
  "(.*)"                 # 正则表达式
)

# label_join: 连接多个标签值
label_join(
  metric,
  "new_label",           # 新标签名
  "-",                   # 分隔符
  "label1", "label2"     # 要连接的标签
)

# 示例:从instance标签提取主机名
label_replace(
  up,
  "hostname",
  "$1",
  "instance",
  "([^:]+):.*"
)

7. 排序和限制函数

# sort: 升序排序
sort(metric)

# sort_desc: 降序排序
sort_desc(metric)

# topk: 返回前K个最大值
topk(10, metric)

# bottomk: 返回前K个最小值
bottomk(10, metric)

# 示例:找出CPU使用率最高的10个节点
topk(10, 100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100))

PromQL运算符

1. 算术运算符

# 加法
metric1 + metric2

# 减法
metric1 - metric2

# 乘法
metric1 * metric2

# 除法
metric1 / metric2

# 取模
metric1 % metric2

# 幂运算
metric1 ^ metric2

# 示例:计算内存使用率
(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes * 100

2. 比较运算符

运算符说明返回值
==等于0或1
!=不等于0或1
>大于0或1
<小于0或1
>=大于等于0或1
<=小于等于0或1
# 过滤:只返回CPU使用率大于80%的节点
node_cpu_usage > 80

# bool修饰符:返回0或1而不是过滤
node_cpu_usage > bool 80

# 示例:检查内存使用率是否超过阈值
(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.85

3. 逻辑运算符

# and: 交集(两边都存在的时间序列)
metric1 and metric2

# or: 并集(任意一边存在的时间序列)
metric1 or metric2

# unless: 差集(左边存在但右边不存在的时间序列)
metric1 unless metric2

# 示例:CPU高且内存也高的节点
(node_cpu_usage > 80) and (node_memory_usage > 80)

# 示例:有数据的指标或默认值0
metric or vector(0)

4. 向量匹配

# one-to-one匹配(默认)
metric1 + metric2

# 指定匹配标签
metric1 + on(label1, label2) metric2

# 忽略某些标签
metric1 + ignoring(label1, label2) metric2

# many-to-one匹配
metric1 + on(label1) group_left metric2

# one-to-many匹配
metric1 + on(label1) group_right metric2

# 示例:计算每个节点的网络总流量
sum(rate(node_network_receive_bytes_total[5m])) by (instance) +
sum(rate(node_network_transmit_bytes_total[5m])) by (instance)

高级查询技巧

1. 子查询

# 语法:<query>[<range>:<resolution>]
# 计算过去1小时内,每5分钟的最大CPU使用率
max_over_time(
  rate(node_cpu_seconds_total[5m])[1h:5m]
)

# 示例:检测CPU使用率的波动
stddev_over_time(
  rate(node_cpu_seconds_total[1m])[10m:1m]
) > 0.1

2. 直方图和分位数

# histogram_quantile: 从直方图计算分位数
histogram_quantile(
  0.95,  # P95
  rate(http_request_duration_seconds_bucket[5m])
)

# 按路径分组计算P95延迟
histogram_quantile(
  0.95,
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le, path)
)

# 计算多个分位数
histogram_quantile(0.50, rate(metric_bucket[5m])) or
histogram_quantile(0.90, rate(metric_bucket[5m])) or
histogram_quantile(0.99, rate(metric_bucket[5m]))

3. 缺失数据处理

# 使用默认值填充缺失数据
metric or vector(0)

# 使用上一个有效值填充
metric or metric offset 1m

# 检测数据是否缺失
absent(metric{label="value"})

# 示例:如果指标不存在则告警
absent(up{job="flink"}) == 1

4. 复杂聚合示例

# 计算每个作业的平均处理延迟
avg(
  histogram_quantile(0.95,
    sum(rate(flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_bucket[5m]))
    by (job_name, le)
  )
) by (job_name)

# 检测数据倾斜:同一算子不同subtask的处理量差异
(
  max(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (operator_name) -
  min(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (operator_name)
) / 
avg(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (operator_name) > 0.5

# 计算作业的整体健康分数(0-100)
(
  (flink_jobmanager_job_uptime > 0) * 40 +                                    # 运行状态 40分
  (1 - rate(flink_jobmanager_job_numRestarts[1h])) * 30 +                    # 稳定性 30分
  (rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) /
   (rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) +
    rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]))) * 30          # Checkpoint成功率 30分
)

Grafana变量在查询中的使用

# 使用单选变量
rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name="$job_name"}[5m])

# 使用多选变量(正则匹配)
rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name=~"$job_name"}[5m])

# 使用interval变量(自动调整采样率)
rate(flink_taskmanager_job_task_operator_numRecordsIn[$__interval])

# 使用时间范围变量
rate(flink_taskmanager_job_task_operator_numRecordsIn[$__range])

# 组合使用多个变量
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn{
  job_name=~"$job_name",
  task_name=~"$task_name",
  host=~"$host"
}[$__interval])) by (operator_name)

Dashboard仪表盘

Dashboard基础

Dashboard是Grafana的核心概念,由多个Panel(面板)组成,每个Panel展示一个可视化图表。

Dashboard结构

graph TD A[Dashboard] --> B[Row 1] A --> C[Row 2] A --> D[Row 3] B --> E[Panel 1: 时序图] B --> F[Panel 2: 柱状图] C --> G[Panel 3: 表格] C --> H[Panel 4: 热力图] D --> I[Panel 5: 单值显示] D --> J[Panel 6: 仪表盘] style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#fff3e0 style D fill:#fff3e0

Dashboard JSON结构

{
  "dashboard": {
    "title": "Flink作业监控",
    "uid": "flink-job-monitor",
    "tags": ["flink", "streaming"],
    "timezone": "browser",
    "refresh": "30s",
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "panels": [
      {
        "id": 1,
        "type": "graph",
        "title": "Records In/Out Rate",
        "targets": [
          {
            "expr": "rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])",
            "legendFormat": "{{job_name}} - {{task_name}}"
          }
        ]
      }
    ]
  }
}

可视化组件

Grafana提供丰富的可视化组件,满足不同场景需求:

组件类型适用场景关键配置使用建议
Time Series(时序图)趋势分析、性能监控线条样式、填充、堆叠最常用,适合展示指标变化趋势
Bar Chart(柱状图)对比分析、排名方向、分组、排序适合展示Top N、分类对比
Stat(单值显示)关键指标、KPI阈值、颜色、单位突出显示核心指标
Gauge(仪表盘)百分比、容量最小/最大值、阈值直观展示使用率
Table(表格)详细数据、列表列配置、排序、过滤适合展示多维度数据
Heatmap(热力图)分布分析、延迟颜色方案、桶大小展示数据分布和密度
Geomap(地图)地理位置、区域地图层、标记展示地理分布

时序图配置示例

{
  "type": "timeseries",
  "title": "Flink Checkpoint Duration",
  "fieldConfig": {
    "defaults": {
      "unit": "ms",
      "color": {
        "mode": "palette-classic"
      },
      "custom": {
        "lineWidth": 2,
        "fillOpacity": 10,
        "showPoints": "never"
      },
      "thresholds": {
        "mode": "absolute",
        "steps": [
          {"value": null, "color": "green"},
          {"value": 5000, "color": "yellow"},
          {"value": 10000, "color": "red"}
        ]
      }
    }
  }
}

变量与模板

变量使Dashboard更加灵活和可复用,支持动态切换监控对象。

变量类型

变量类型数据来源使用场景示例
Query数据源查询动态获取实例、作业列表label_values(flink_jobmanager_job_uptime, job_name)
Custom手动定义固定选项列表环境:dev, test, prod
Constant常量全局配置集群名称、区域
Interval时间间隔动态调整采样率1m, 5m, 15m, 1h
Data source数据源列表切换数据源Prometheus实例切换

变量定义示例

{
  "templating": {
    "list": [
      {
        "name": "job_name",
        "type": "query",
        "datasource": "Prometheus",
        "query": "label_values(flink_jobmanager_job_uptime, job_name)",
        "refresh": 1,
        "multi": true,
        "includeAll": true
      },
      {
        "name": "interval",
        "type": "interval",
        "query": "1m,5m,15m,30m,1h",
        "auto": true,
        "auto_count": 30
      }
    ]
  }
}

变量使用

# 在查询中使用变量
rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name=~"$job_name"}[$interval])

# 在Panel标题中使用
title: "Records In Rate - $job_name"

# 在Legend中使用
legendFormat: "{{job_name}} - {{task_name}}"

告警机制

告警规则配置

Grafana告警系统支持多种告警规则类型,可以基于查询结果触发告警。

告警规则结构

# 告警规则示例
apiVersion: 1
groups:
  - name: flink_alerts
    interval: 30s
    rules:
      - alert: FlinkJobDown
        expr: flink_jobmanager_job_uptime == 0
        for: 5m
        labels:
          severity: critical
          component: flink
        annotations:
          summary: "Flink作业 {{ $labels.job_name }} 已停止"
          description: "作业已停止超过5分钟,请立即检查"

告警状态流转

stateDiagram-v2 [*] --> Normal: 指标正常 Normal --> Pending: 触发条件满足 Pending --> Alerting: 持续时间达到for阈值 Pending --> Normal: 条件不再满足 Alerting --> Normal: 问题解决 Alerting --> Alerting: 持续告警

通知渠道

Grafana支持多种告警通知渠道:

通知渠道适用场景配置复杂度特点
Email通用场景支持HTML格式、附件
Slack团队协作实时通知、支持交互
钉钉/企业微信国内团队需要配置Webhook
PagerDuty值班管理支持升级策略、排班
Webhook自定义集成灵活性最高
Telegram个人通知轻量级、跨平台

钉钉通知配置示例

{
  "name": "DingTalk",
  "type": "webhook",
  "settings": {
    "url": "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN",
    "httpMethod": "POST",
    "username": "Grafana Alert"
  }
}

告警最佳实践

1. 告警分级

级别响应时间通知方式典型场景
Critical立即响应(5分钟内)电话+短信+IM服务完全不可用、数据丢失
Warning30分钟内响应IM+邮件性能下降、资源使用率高
Info工作时间处理邮件配置变更、版本升级

2. 告警降噪策略

  • 静默规则:维护窗口期间自动静默告警
  • 分组聚合:相同类型告警合并通知
  • 抑制规则:上游故障时抑制下游告警
  • 去重:相同告警在时间窗口内只发送一次

3. 告警模板优化

// 告警消息模板
{{ define "alert.title" }}
[{{ .Status | toUpper }}] {{ .GroupLabels.alertname }}
{{ end }}

{{ define "alert.message" }}
**告警详情**
- 作业名称{{ .Labels.job_name }}
- 严重程度{{ .Labels.severity }}
- 触发时间{{ .StartsAt.Format "2006-01-02 15:04:05" }}
- 当前值{{ .Values.B }}
- 阈值{{ .Values.C }}

**问题描述**
{{ .Annotations.description }}

**处理建议**
{{ .Annotations.runbook_url }}
{{ end }}

Flink监控实战

Flink与Prometheus集成

Flink原生支持Prometheus指标上报,通过PrometheusReporter实现。

1. Flink配置

# flink-conf.yaml
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9250-9260
metrics.reporters: prom

# 启用详细网络指标
taskmanager.network.detailed-metrics: true

# 指标上报间隔
metrics.reporter.prom.interval: 10 SECONDS

2. Kubernetes部署配置

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-job-example
spec:
  flinkConfiguration:
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9250-9260
    metrics.reporters: prom
    taskmanager.network.detailed-metrics: "true"
  
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2

3. Prometheus抓取配置

# prometheus.yml
scrape_configs:
  - job_name: 'flink'
    kubernetes_sd_configs:
      - role: pod
        namespaces:
          names:
            - flink-namespace
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_type]
        action: keep
        regex: flink-native-kubernetes
      - source_labels: [__meta_kubernetes_pod_ip]
        action: replace
        target_label: __address__
        replacement: '$1:9250'
    scrape_interval: 15s
    scrape_timeout: 10s

4. PodMonitor配置(Prometheus Operator)

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: flink-pod-monitor
  labels:
    release: prometheus
spec:
  namespaceSelector:
    matchNames:
      - flink-namespace
  selector:
    matchLabels:
      type: flink-native-kubernetes
  podMetricsEndpoints:
    - path: /
      port: metrics
      interval: 15s
      relabelings:
        - action: replace
          sourceLabels: [__meta_kubernetes_pod_ip]
          targetLabel: __address__
          replacement: '$1:9250'

Flink核心指标

Flink提供了丰富的监控指标,涵盖作业运行、资源使用、数据处理等各个方面。

1. 作业级别指标

指标名称指标含义PromQL示例告警阈值建议
flink_jobmanager_job_uptime作业运行时间flink_jobmanager_job_uptime== 0(作业停止)
flink_jobmanager_job_numRestarts作业重启次数rate(flink_jobmanager_job_numRestarts[5m])> 0(频繁重启)
flink_jobmanager_job_lastCheckpointDuration最近Checkpoint耗时flink_jobmanager_job_lastCheckpointDuration> 60000ms
flink_jobmanager_job_lastCheckpointSize最近Checkpoint大小flink_jobmanager_job_lastCheckpointSize持续增长
flink_jobmanager_job_numberOfFailedCheckpointsCheckpoint失败次数rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m])> 0

2. 算子级别指标

指标名称指标含义监控重点优化方向
numRecordsIn输入记录数吞吐量、数据倾斜增加并行度、优化分区
numRecordsOut输出记录数数据处理效率优化算子逻辑
numRecordsInPerSecond每秒输入记录数实时吞吐量扩容、优化上游
numBytesIn输入字节数网络带宽数据压缩、序列化优化
currentInputWatermark当前输入水位线事件时间进度调整水位线策略
numLateRecordsDropped丢弃的迟到数据数据完整性增加允许延迟时间

3. TaskManager资源指标

指标名称指标含义PromQL查询
JVM堆内存使用堆内存使用率flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max * 100
JVM非堆内存非堆内存使用flink_taskmanager_Status_JVM_Memory_NonHeap_Used
GC时间GC耗时rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Time[5m])
GC次数GC频率rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Count[5m])
线程数活跃线程数flink_taskmanager_Status_JVM_Threads_Count
CPU使用率CPU负载flink_taskmanager_Status_JVM_CPU_Load

4. 反压指标

# 反压比例(0-1之间,越接近1反压越严重)
flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000

# 繁忙比例
flink_taskmanager_job_task_busyTimeMsPerSecond / 1000

# 空闲比例
flink_taskmanager_job_task_idleTimeMsPerSecond / 1000

反压分析

反压比例状态原因分析处理建议
< 0.1健康处理能力充足无需处理
0.1 - 0.5轻微反压短暂的数据峰值观察趋势
0.5 - 0.8中度反压下游处理能力不足增加并行度、优化算子
> 0.8严重反压系统瓶颈紧急扩容、排查慢算子

5. Kafka消费指标

# Kafka消费延迟(Lag)
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max

# Kafka消费速率
rate(flink_taskmanager_job_task_operator_KafkaConsumer_records_consumed_total[5m])

# Kafka分区数
flink_taskmanager_job_task_operator_KafkaConsumer_assigned_partitions

6. 状态后端指标

Flink状态后端的监控对于理解作业的状态管理性能至关重要。

指标分类指标名称指标含义PromQL示例
RocksDB状态flink_taskmanager_job_task_operator_rocksdb_bytes_readRocksDB读取字节数rate(flink_taskmanager_job_task_operator_rocksdb_bytes_read[5m])
RocksDB状态flink_taskmanager_job_task_operator_rocksdb_bytes_writtenRocksDB写入字节数rate(flink_taskmanager_job_task_operator_rocksdb_bytes_written[5m])
RocksDB状态flink_taskmanager_job_task_operator_rocksdb_compaction_pending待压缩的文件数flink_taskmanager_job_task_operator_rocksdb_compaction_pending
RocksDB状态flink_taskmanager_job_task_operator_rocksdb_mem_table_flush_pending待刷新的MemTable数flink_taskmanager_job_task_operator_rocksdb_mem_table_flush_pending
RocksDB状态flink_taskmanager_job_task_operator_rocksdb_num_running_compactions正在运行的压缩任务数flink_taskmanager_job_task_operator_rocksdb_num_running_compactions
RocksDB状态flink_taskmanager_job_task_operator_rocksdb_num_running_flushes正在运行的刷新任务数flink_taskmanager_job_task_operator_rocksdb_num_running_flushes
状态大小flink_taskmanager_job_task_operator_state_size算子状态大小flink_taskmanager_job_task_operator_state_size

RocksDB性能分析

# RocksDB读写比例
rate(flink_taskmanager_job_task_operator_rocksdb_bytes_read[5m]) /
rate(flink_taskmanager_job_task_operator_rocksdb_bytes_written[5m])

# RocksDB压缩压力
flink_taskmanager_job_task_operator_rocksdb_compaction_pending +
flink_taskmanager_job_task_operator_rocksdb_num_running_compactions

# 状态大小增长率
deriv(flink_taskmanager_job_task_operator_state_size[30m])

7. 网络指标

网络指标反映了TaskManager之间的数据传输情况。

指标名称指标含义监控重点PromQL示例
numBytesInLocal本地接收字节数本地数据传输rate(flink_taskmanager_job_task_numBytesInLocal[5m])
numBytesInRemote远程接收字节数跨节点数据传输rate(flink_taskmanager_job_task_numBytesInRemote[5m])
numBytesOut输出字节数数据发送量rate(flink_taskmanager_job_task_numBytesOut[5m])
numBuffersInLocal本地接收缓冲区数本地缓冲区使用rate(flink_taskmanager_job_task_numBuffersInLocal[5m])
numBuffersInRemote远程接收缓冲区数远程缓冲区使用rate(flink_taskmanager_job_task_numBuffersInRemote[5m])
numBuffersOut输出缓冲区数缓冲区发送量rate(flink_taskmanager_job_task_numBuffersOut[5m])
inputQueueLength输入队列长度输入队列积压flink_taskmanager_job_task_buffers_inputQueueLength
outputQueueLength输出队列长度输出队列积压flink_taskmanager_job_task_buffers_outputQueueLength
inPoolUsage输入缓冲池使用率输入缓冲池压力flink_taskmanager_job_task_buffers_inPoolUsage
outPoolUsage输出缓冲池使用率输出缓冲池压力flink_taskmanager_job_task_buffers_outPoolUsage

网络性能分析

# 网络传输总带宽
sum(rate(flink_taskmanager_job_task_numBytesInLocal[5m])) +
sum(rate(flink_taskmanager_job_task_numBytesInRemote[5m])) +
sum(rate(flink_taskmanager_job_task_numBytesOut[5m]))

# 远程传输比例(高比例可能导致性能下降)
sum(rate(flink_taskmanager_job_task_numBytesInRemote[5m])) /
(sum(rate(flink_taskmanager_job_task_numBytesInLocal[5m])) +
 sum(rate(flink_taskmanager_job_task_numBytesInRemote[5m])))

# 缓冲区积压检测
max(flink_taskmanager_job_task_buffers_inputQueueLength) > 100 or
max(flink_taskmanager_job_task_buffers_outputQueueLength) > 100

8. Checkpoint详细指标

Checkpoint是Flink容错机制的核心,需要重点监控。

指标名称指标含义告警阈值PromQL示例
lastCheckpointDuration最近Checkpoint耗时> 60sflink_jobmanager_job_lastCheckpointDuration
lastCheckpointSize最近Checkpoint大小持续增长flink_jobmanager_job_lastCheckpointSize
lastCheckpointExternalPath外部Checkpoint路径-flink_jobmanager_job_lastCheckpointExternalPath
lastCheckpointRestoreTimestamp最近恢复时间戳-flink_jobmanager_job_lastCheckpointRestoreTimestamp
lastCheckpointAlignmentBufferedCheckpoint对齐缓冲数据量> 100MBflink_jobmanager_job_lastCheckpointAlignmentBuffered
numberOfCompletedCheckpoints完成的Checkpoint数-flink_jobmanager_job_numberOfCompletedCheckpoints
numberOfFailedCheckpoints失败的Checkpoint数> 0flink_jobmanager_job_numberOfFailedCheckpoints
numberOfInProgressCheckpoints进行中的Checkpoint数> 1flink_jobmanager_job_numberOfInProgressCheckpoints
totalNumberOfCheckpointsCheckpoint总数-flink_jobmanager_job_totalNumberOfCheckpoints

Checkpoint健康度评估

# Checkpoint成功率
rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) /
(rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) +
 rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m])) * 100

# Checkpoint耗时趋势(检测是否持续增长)
deriv(flink_jobmanager_job_lastCheckpointDuration[1h]) > 1000

# Checkpoint大小增长率(每小时)
rate(flink_jobmanager_job_lastCheckpointSize[1h])

# Checkpoint对齐时间占比(高比例表示反压严重)
flink_jobmanager_job_lastCheckpointAlignmentBuffered /
flink_jobmanager_job_lastCheckpointSize > 0.3

9. 窗口和水位线指标

对于使用事件时间和窗口的作业,这些指标至关重要。

指标名称指标含义监控重点PromQL示例
currentInputWatermark当前输入水位线水位线进度flink_taskmanager_job_task_operator_currentInputWatermark
currentOutputWatermark当前输出水位线水位线传播flink_taskmanager_job_task_operator_currentOutputWatermark
watermarkLag水位线延迟事件时间延迟time() * 1000 - flink_taskmanager_job_task_operator_currentInputWatermark
numLateRecordsDropped丢弃的迟到记录数数据丢失rate(flink_taskmanager_job_task_operator_numLateRecordsDropped[5m])
latenessHistogram延迟分布直方图延迟分布flink_taskmanager_job_task_operator_latenessHistogram

水位线分析

# 水位线延迟(毫秒)
(time() * 1000) - flink_taskmanager_job_task_operator_currentInputWatermark

# 水位线停滞检测(5分钟内水位线没有前进)
delta(flink_taskmanager_job_task_operator_currentInputWatermark[5m]) == 0

# 不同算子的水位线差异(检测水位线传播问题)
max(flink_taskmanager_job_task_operator_currentOutputWatermark) by (operator_name) -
min(flink_taskmanager_job_task_operator_currentOutputWatermark) by (operator_name)

# 迟到数据比例
rate(flink_taskmanager_job_task_operator_numLateRecordsDropped[5m]) /
rate(flink_taskmanager_job_task_operator_numRecordsIn[5m]) * 100

10. 自定义指标

Flink支持用户自定义指标,常见的自定义指标包括:

指标类型使用场景示例
Counter计数器,累加值处理的订单数、错误次数
Gauge瞬时值当前队列长度、连接数
Histogram分布统计处理延迟分布、数据大小分布
Meter速率统计每秒处理的事件数
// 自定义Counter示例
public class MyMapFunction extends RichMapFunction<String, String> {
    private transient Counter counter;
    
    @Override
    public void open(Configuration parameters) {
        this.counter = getRuntimeContext()
            .getMetricGroup()
            .counter("myCounter");
    }
    
    @Override
    public String map(String value) {
        counter.inc();
        return value.toUpperCase();
    }
}

// 自定义Gauge示例
getRuntimeContext()
    .getMetricGroup()
    .gauge("queueSize", () -> queue.size());

// 自定义Histogram示例
Histogram histogram = getRuntimeContext()
    .getMetricGroup()
    .histogram("myHistogram", new DescriptiveStatisticsHistogram(100));
histogram.update(value);

11. 完整的指标层次结构

Flink指标按照以下层次组织:

<host>.<taskmanager_id>.<job_name>.<operator_name>.<subtask_index>.<metric_name>

指标命名规范

层级说明示例
hostTaskManager主机名flink-tm-001
taskmanager_idTaskManager IDtaskmanager-0
job_name作业名称streaming-job
task_name任务名称Source: Kafka
operator_name算子名称Map
subtask_index子任务索引0, 1, 2
metric_name指标名称numRecordsIn

按层级查询指标

# 查询特定作业的所有指标
{job_name="streaming-job"}

# 查询特定算子的所有指标
{job_name="streaming-job", operator_name="Map"}

# 查询特定TaskManager的所有指标
{host="flink-tm-001"}

# 聚合所有subtask的指标
sum(flink_taskmanager_job_task_operator_numRecordsIn{job_name="streaming-job"}) by (operator_name)

12. 指标采集最佳实践

实践项建议原因
采集间隔10-30秒平衡实时性和系统开销
保留时间原始数据7天,聚合数据30天满足短期分析和长期趋势
标签数量控制在10个以内避免高基数问题
指标命名使用统一前缀和命名规范便于查询和管理
告警阈值根据历史数据动态调整减少误报和漏报
聚合粒度使用Recording Rules预聚合提高查询性能

Grafana Dashboard配置

完整的Flink监控Dashboard示例

{
  "dashboard": {
    "title": "Flink作业监控Dashboard",
    "tags": ["flink", "streaming", "production"],
    "timezone": "browser",
    "refresh": "30s",
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "templating": {
      "list": [
        {
          "name": "job_name",
          "type": "query",
          "datasource": "Prometheus",
          "query": "label_values(flink_jobmanager_job_uptime, job_name)",
          "refresh": 1,
          "multi": false
        },
        {
          "name": "task_name",
          "type": "query",
          "datasource": "Prometheus",
          "query": "label_values(flink_taskmanager_job_task_operator_numRecordsIn{job_name=\"$job_name\"}, task_name)",
          "refresh": 1,
          "multi": true,
          "includeAll": true
        }
      ]
    },
    "panels": [
      {
        "id": 1,
        "type": "stat",
        "title": "作业状态",
        "targets": [
          {
            "expr": "flink_jobmanager_job_uptime{job_name=\"$job_name\"}",
            "legendFormat": "运行时间"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "s",
            "mappings": [
              {
                "type": "special",
                "options": {
                  "match": "null",
                  "result": {"text": "已停止", "color": "red"}
                }
              }
            ],
            "thresholds": {
              "steps": [
                {"value": 0, "color": "red"},
                {"value": 1, "color": "green"}
              ]
            }
          }
        }
      },
      {
        "id": 2,
        "type": "timeseries",
        "title": "Records In/Out Rate",
        "targets": [
          {
            "expr": "sum(rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name=\"$job_name\",task_name=~\"$task_name\"}[1m])) by (task_name)",
            "legendFormat": "{{task_name}} - In"
          },
          {
            "expr": "sum(rate(flink_taskmanager_job_task_operator_numRecordsOut{job_name=\"$job_name\",task_name=~\"$task_name\"}[1m])) by (task_name)",
            "legendFormat": "{{task_name}} - Out"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "rps",
            "custom": {
              "lineWidth": 2,
              "fillOpacity": 10
            }
          }
        }
      },
      {
        "id": 3,
        "type": "timeseries",
        "title": "Checkpoint Duration",
        "targets": [
          {
            "expr": "flink_jobmanager_job_lastCheckpointDuration{job_name=\"$job_name\"}",
            "legendFormat": "Checkpoint耗时"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "ms",
            "thresholds": {
              "steps": [
                {"value": null, "color": "green"},
                {"value": 30000, "color": "yellow"},
                {"value": 60000, "color": "red"}
              ]
            }
          }
        }
      },
      {
        "id": 4,
        "type": "gauge",
        "title": "反压状态",
        "targets": [
          {
            "expr": "avg(flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_name=\"$job_name\"} / 1000)",
            "legendFormat": "反压比例"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percentunit",
            "min": 0,
            "max": 1,
            "thresholds": {
              "steps": [
                {"value": 0, "color": "green"},
                {"value": 0.5, "color": "yellow"},
                {"value": 0.8, "color": "red"}
              ]
            }
          }
        }
      },
      {
        "id": 5,
        "type": "timeseries",
        "title": "JVM堆内存使用",
        "targets": [
          {
            "expr": "flink_taskmanager_Status_JVM_Memory_Heap_Used{job_name=\"$job_name\"} / flink_taskmanager_Status_JVM_Memory_Heap_Max{job_name=\"$job_name\"} * 100",
            "legendFormat": "{{host}} - 堆内存使用率"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percent",
            "max": 100,
            "thresholds": {
              "steps": [
                {"value": null, "color": "green"},
                {"value": 70, "color": "yellow"},
                {"value": 85, "color": "red"}
              ]
            }
          }
        }
      },
      {
        "id": 6,
        "type": "table",
        "title": "算子处理详情",
        "targets": [
          {
            "expr": "sum(rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name=\"$job_name\"}[5m])) by (operator_name)",
            "format": "table",
            "instant": true
          }
        ],
        "transformations": [
          {
            "id": "organize",
            "options": {
              "excludeByName": {},
              "indexByName": {},
              "renameByName": {
                "operator_name": "算子名称",
                "Value": "处理速率(records/s)"
              }
            }
          }
        ]
      }
    ]
  }
}

Dashboard布局建议

graph TD subgraph "第一行:核心状态" A1[作业状态] A2[运行时长] A3[重启次数] A4[Checkpoint成功率] end subgraph "第二行:吞吐量" B1[Records In/Out Rate] B2[Bytes In/Out Rate] end subgraph "第三行:性能指标" C1[Checkpoint Duration] C2[反压状态] C3[延迟分布] end subgraph "第四行:资源使用" D1[CPU使用率] D2[内存使用率] D3[GC时间] end subgraph "第五行:详细信息" E1[算子处理详情表格] E2[TaskManager列表] end style A1 fill:#e8f5e9 style B1 fill:#e3f2fd style C1 fill:#fff3e0 style D1 fill:#fce4ec

常见监控场景

场景1:作业健康度监控

# 作业是否运行
flink_jobmanager_job_uptime > 0

# 作业重启频率(每小时)
increase(flink_jobmanager_job_numRestarts[1h])

# Checkpoint成功率
(
  rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) /
  (rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) + 
   rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]))
) * 100

# 告警规则
alert: FlinkJobUnhealthy
expr: flink_jobmanager_job_uptime == 0 or increase(flink_jobmanager_job_numRestarts[1h]) > 3
for: 5m
labels:
  severity: critical
annotations:
  summary: "Flink作业不健康"

场景2:性能瓶颈分析

# 识别慢算子(处理速率低)
topk(5, 
  rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])
)

# 识别反压算子
topk(5,
  flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000
) > 0.5

# 数据倾斜检测(同一算子不同subtask的处理量差异)
stddev(
  rate(flink_taskmanager_job_task_operator_numRecordsIn{operator_name="MyOperator"}[5m])
) by (operator_name) / 
avg(
  rate(flink_taskmanager_job_task_operator_numRecordsIn{operator_name="MyOperator"}[5m])
) by (operator_name) > 0.3

场景3:资源使用监控

# TaskManager内存使用率
(
  flink_taskmanager_Status_JVM_Memory_Heap_Used +
  flink_taskmanager_Status_JVM_Memory_NonHeap_Used
) / (
  flink_taskmanager_Status_JVM_Memory_Heap_Max +
  flink_taskmanager_Status_JVM_Memory_NonHeap_Max
) * 100

# GC压力(GC时间占比)
rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Time[5m]) / 
rate(flink_taskmanager_Status_JVM_CPU_Time[5m]) * 100

# 网络缓冲区使用率
flink_taskmanager_Status_Network_TotalMemorySegments - 
flink_taskmanager_Status_Network_AvailableMemorySegments

# 告警规则
alert: HighMemoryUsage
expr: (flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max) > 0.85
for: 10m
labels:
  severity: warning
annotations:
  summary: "TaskManager内存使用率过高"

场景4:数据处理监控

# 端到端延迟(从Kafka消费到处理完成)
histogram_quantile(0.99,
  rate(flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_bucket[5m])
)

# Kafka消费延迟
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max

# 数据丢失检测(输入输出不匹配)
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) -
sum(rate(flink_taskmanager_job_task_operator_numRecordsOut[5m]))

# 迟到数据量
rate(flink_taskmanager_job_task_operator_numLateRecordsDropped[5m])

场景5:Checkpoint监控

# Checkpoint耗时趋势
flink_jobmanager_job_lastCheckpointDuration

# Checkpoint大小增长
deriv(flink_jobmanager_job_lastCheckpointSize[30m])

# Checkpoint失败率
rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]) /
(rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) + 
 rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]))

# Checkpoint对齐时间(反映反压)
flink_jobmanager_job_lastCheckpointAlignmentBuffered

# 告警规则
alert: CheckpointTooSlow
expr: flink_jobmanager_job_lastCheckpointDuration > 60000
for: 15m
labels:
  severity: warning
annotations:
  summary: "Checkpoint耗时过长,可能影响故障恢复"

性能调优指导

基于Grafana监控数据,可以进行针对性的性能调优。

调优决策树

graph TD A[监控发现问题] --> B{问题类型?} B -->|吞吐量低| C{反压状态?} C -->|有反压| D[定位反压算子] C -->|无反压| E[检查上游数据源] D --> F{资源使用率?} F -->|CPU高| G[优化算子逻辑
增加并行度] F -->|内存高| H[优化状态大小
调整内存配置] F -->|正常| I[检查外部依赖
数据库/API] B -->|延迟高| J{Checkpoint耗时?} J -->|耗时长| K[优化状态后端
增加Checkpoint间隔] J -->|正常| L[检查水位线策略
窗口配置] B -->|资源不足| M{哪种资源?} M -->|内存| N[增加TaskManager内存
优化状态TTL] M -->|CPU| O[增加并行度
扩容TaskManager] M -->|网络| P[优化序列化
启用压缩] B -->|数据倾斜| Q[重新分区
自定义KeySelector] style A fill:#ffebee style G fill:#e8f5e9 style H fill:#e8f5e9 style I fill:#e8f5e9 style K fill:#e8f5e9 style L fill:#e8f5e9 style N fill:#e8f5e9 style O fill:#e8f5e9 style P fill:#e8f5e9 style Q fill:#e8f5e9

调优参数对照表

监控指标异常可能原因调优参数调整建议
反压比例 > 0.8下游处理慢parallelism增加算子并行度
Checkpoint耗时 > 60s状态过大state.backend.incremental启用增量Checkpoint
Checkpoint耗时 > 60s状态过大execution.checkpointing.interval增加Checkpoint间隔
堆内存使用率 > 85%内存不足taskmanager.memory.managed.size增加托管内存
GC时间占比 > 10%频繁GCtaskmanager.memory.jvm-overhead.fraction增加JVM开销内存
Kafka Lag持续增长消费慢parallelism.default增加全局并行度
numLateRecordsDropped > 0水位线过快watermark.interval增加允许延迟时间
网络缓冲区不足网络瓶颈taskmanager.network.memory.fraction增加网络内存比例

典型调优案例

案例1:反压导致吞吐量下降

# 问题:监控显示某个算子反压比例0.9,吞吐量下降50%
# 分析步骤:
# 1. 查看该算子的CPU和内存使用率
# 2. 检查算子逻辑是否有慢操作(同步IO、复杂计算)
# 3. 查看下游算子的处理能力

# 解决方案:
# 方案1:增加并行度
parallelism: 4 -> 8

# 方案2:异步IO优化
dataStream
  .flatMap(new MyFlatMapFunction())
  .setParallelism(8)
  .keyBy(...)
  .process(new AsyncProcessFunction())  # 使用异步处理

# 方案3:拆分算子
# 将复杂算子拆分为多个简单算子,提高并行度

案例2:Checkpoint耗时过长

# 问题:Checkpoint耗时从10s增长到120s
# 分析:状态大小持续增长,从1GB增长到50GB

# 解决方案:
# 方案1:启用增量Checkpoint
state.backend: rocksdb
state.backend.incremental: true

# 方案2:增加Checkpoint间隔
execution.checkpointing.interval: 60s -> 300s

# 方案3:配置状态TTL
StateTtlConfig ttlConfig = StateTtlConfig
  .newBuilder(Time.hours(24))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build();

valueStateDescriptor.enableTimeToLive(ttlConfig);

高级特性

Grafana 12新特性

Grafana 12(2025年5月发布)带来了重大架构升级和新功能。

1. 可观测性即代码(Observability as Code)

支持将Dashboard、告警规则等配置以代码形式管理:

# dashboard.yaml
apiVersion: grizzly.grafana.com/v1alpha1
kind: Dashboard
metadata:
  name: flink-monitoring
spec:
  title: Flink作业监控
  tags: [flink, production]
  panels:
    - type: timeseries
      title: Records Rate
      queries:
        - expr: rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])
          datasource: Prometheus

优势

  • 版本控制:使用Git管理Dashboard变更历史
  • CI/CD集成:自动化部署和测试
  • 团队协作:通过PR进行Dashboard审查
  • 环境一致性:开发、测试、生产环境配置同步

2. Git Sync功能

# 配置Git同步
apiVersion: 1
git:
  enabled: true
  repository: https://github.com/your-org/grafana-dashboards.git
  branch: main
  path: dashboards/
  sync_interval: 5m
  auth:
    type: token
    token: ${GIT_TOKEN}

3. 动态Dashboard

支持基于条件逻辑的动态面板显示:

{
  "panels": [
    {
      "id": 1,
      "title": "高级指标",
      "condition": {
        "type": "variable",
        "variable": "show_advanced",
        "operator": "==",
        "value": "true"
      }
    }
  ]
}

4. Drilldown体验

从指标快速下钻到日志和追踪:

graph LR A[Metrics
指标异常] --> B[Logs
查看日志] B --> C[Traces
分布式追踪] C --> D[Profiles
性能剖析] style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#e8f5e9 style D fill:#fce4ec

可观测性即代码

完整的GitOps工作流

# 1. 导出现有Dashboard
grafana-cli dashboard export flink-monitoring > flink-dashboard.json

# 2. 转换为YAML格式
grr get Dashboard/flink-monitoring > flink-dashboard.yaml

# 3. 提交到Git仓库
git add flink-dashboard.yaml
git commit -m "Update Flink dashboard"
git push origin main

# 4. 自动同步到Grafana
# Grafana会自动拉取Git仓库的变更并应用

CI/CD集成示例

# .github/workflows/grafana-sync.yml
name: Sync Grafana Dashboards

on:
  push:
    branches: [main]
    paths:
      - 'dashboards/**'

jobs:
  sync:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Install Grizzly
        run: |
          curl -fSL -o grr.tar.gz https://github.com/grafana/grizzly/releases/download/v0.4.0/grr-linux-amd64
          tar -xzf grr.tar.gz
          chmod +x grr
      
      - name: Apply Dashboards
        env:
          GRAFANA_URL: ${{ secrets.GRAFANA_URL }}
          GRAFANA_TOKEN: ${{ secrets.GRAFANA_TOKEN }}
        run: |
          ./grr apply dashboards/

动态仪表盘

使用场景

  • 根据用户角色显示不同的面板
  • 根据环境(开发/生产)调整告警阈值
  • 根据时间范围自动调整采样率

实现示例

{
  "panels": [
    {
      "id": 1,
      "title": "详细指标",
      "condition": {
        "type": "permission",
        "permission": "Admin"
      }
    },
    {
      "id": 2,
      "title": "生产环境告警",
      "condition": {
        "type": "variable",
        "variable": "environment",
        "operator": "==",
        "value": "production"
      },
      "alert": {
        "threshold": 100
      }
    },
    {
      "id": 3,
      "title": "开发环境告警",
      "condition": {
        "type": "variable",
        "variable": "environment",
        "operator": "==",
        "value": "development"
      },
      "alert": {
        "threshold": 500
      }
    }
  ]
}

最佳实践

Dashboard设计原则

1. 信息层次化

第一层(最重要):核心KPI、作业状态
  ↓
第二层:吞吐量、延迟等性能指标
  ↓
第三层:资源使用、详细指标
  ↓
第四层:调试信息、详细日志

2. 颜色使用规范

颜色含义使用场景阈值示例
绿色正常指标在健康范围内CPU < 70%
黄色警告需要关注但未严重CPU 70-85%
红色严重需要立即处理CPU > 85%
蓝色信息中性指标展示数据量统计
灰色未知/禁用数据缺失或功能关闭服务停止

3. 命名规范

Dashboard命名:[系统]-[模块]-[环境]
示例:Flink-StreamingJob-Production

Panel命名:[指标类型] - [具体指标]
示例:Throughput - Records In Rate

变量命名:小写+下划线
示例:job_name, task_name, environment

4. 性能优化建议

优化项问题解决方案效果
查询优化查询过于复杂使用Recording Rules预聚合查询速度提升10倍+
时间范围查询时间跨度过大限制默认时间范围为1-6小时减少数据传输量
刷新频率刷新过于频繁根据场景设置合理刷新间隔降低后端负载
面板数量单个Dashboard面板过多拆分为多个Dashboard加载速度提升
数据点密度数据点过密使用$__interval动态调整前端渲染更流畅

Recording Rules示例

# prometheus-rules.yaml
groups:
  - name: flink_aggregations
    interval: 30s
    rules:
      # 预聚合:作业级别的总吞吐量
      - record: job:flink_records_in_rate:sum
        expr: sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (job_name)
      
      # 预聚合:TaskManager平均内存使用率
      - record: job:flink_memory_usage:avg
        expr: avg(flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max) by (job_name)

性能优化

1. 查询优化技巧

# 不推荐:查询所有标签
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m]))

# 推荐:只保留必要标签
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name="my-job"}[5m])) by (task_name)

# 不推荐:使用正则表达式
{job_name=~".*flink.*"}

# 推荐:使用精确匹配
{job_name="flink-streaming-job"}

# 使用$__interval变量自动调整采样率
rate(flink_taskmanager_job_task_operator_numRecordsIn[$__interval])

2. 缓存策略

# grafana.ini
[caching]
enabled = true

[query_cache]
enabled = true
ttl = 5m
max_cache_size_mb = 1000

3. 数据源连接池

# datasource配置
jsonData:
  maxOpenConns: 100
  maxIdleConns: 10
  connMaxLifetime: 14400

安全配置

1. 认证配置

# grafana.ini
[auth]
disable_login_form = false
disable_signout_menu = false

[auth.ldap]
enabled = true
config_file = /etc/grafana/ldap.toml

[auth.oauth]
enabled = true
name = OAuth
allow_sign_up = true
client_id = YOUR_CLIENT_ID
client_secret = YOUR_CLIENT_SECRET

2. 权限管理

角色权限适用人员
Viewer只读Dashboard普通开发人员、业务人员
Editor创建和编辑Dashboard运维人员、SRE
Admin完全控制权限系统管理员

3. API密钥管理

# 创建API密钥
curl -X POST -H "Content-Type: application/json" \
  -d '{"name":"monitoring-api","role":"Viewer"}' \
  http://admin:admin@localhost:3000/api/auth/keys

# 使用API密钥
curl -H "Authorization: Bearer YOUR_API_KEY" \
  http://localhost:3000/api/dashboards/uid/flink-monitoring

4. 数据源权限隔离

# 为不同团队配置独立的数据源
datasources:
  - name: Prometheus-Team-A
    type: prometheus
    url: http://prometheus-team-a:9090
    access: proxy
    basicAuth: true
    basicAuthUser: team-a
    secureJsonData:
      basicAuthPassword: ${TEAM_A_PASSWORD}

常见问题与排查

数据源连接问题

问题1:数据源连接超时

# 症状
Error: context deadline exceeded

# 排查步骤
# 1. 检查网络连通性
curl -v http://prometheus:9090/api/v1/query?query=up

# 2. 检查Grafana日志
docker logs grafana | grep -i "error\|timeout"

# 3. 增加超时时间
# datasource配置
jsonData:
  timeout: 60
  queryTimeout: 120s

问题2:数据源认证失败

# 症状
Error: 401 Unauthorized

# 解决方案
# 1. 检查认证配置
# 2. 使用Bearer Token认证
jsonData:
  httpHeaderName1: 'Authorization'
secureJsonData:
  httpHeaderValue1: 'Bearer YOUR_TOKEN'

查询性能问题

问题1:查询响应慢

# 问题查询(扫描大量数据)
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[1h])) by (job_name, task_name, operator_name, subtask_index)

# 优化后(减少标签维度)
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (job_name, operator_name)

# 使用Recording Rules预聚合
job:flink_records_in_rate:sum

问题2:Dashboard加载慢

# 排查步骤
# 1. 检查面板数量(建议<20个)
# 2. 检查查询复杂度
# 3. 启用查询缓存
# 4. 减少默认时间范围

# 优化方案
# 1. 拆分Dashboard
# 2. 使用变量过滤数据
# 3. 调整刷新频率
refresh: 30s -> 1m

告警不触发

问题1:告警规则不生效

# 检查告警规则状态
# Grafana UI: Alerting > Alert rules

# 常见原因
# 1. 查询返回空结果
# 2. for持续时间未满足
# 3. 告警通知渠道配置错误

# 调试方法
# 1. 在Dashboard中测试查询
# 2. 检查告警历史记录
# 3. 查看Grafana日志

问题2:告警风暴

# 问题:短时间内收到大量重复告警

# 解决方案
# 1. 配置告警分组
route:
  group_by: ['alertname', 'job_name']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h

# 2. 配置静默规则
silences:
  - matchers:
      - name: alertname
        value: FlinkJobDown
    startsAt: 2025-03-19T10:00:00Z
    endsAt: 2025-03-19T12:00:00Z
    comment: "维护窗口"

总结

Grafana作为可视化监控平台,在Flink等大数据场景下发挥着关键作用。通过合理的Dashboard设计、精准的指标监控和及时的告警响应,可以有效保障Flink作业的稳定运行。结合Grafana 12的新特性,如可观测性即代码和动态Dashboard,能够进一步提升监控系统的可维护性和灵活性。

关键要点

  1. 指标选择:关注核心指标(吞吐量、延迟、反压、Checkpoint),避免指标过载
  2. 告警设计:合理设置告警阈值和分级,避免告警疲劳
  3. 性能优化:使用Recording Rules、查询缓存、合理的刷新频率
  4. 团队协作:通过Git管理Dashboard配置,实现版本控制和CI/CD
  5. 持续改进:根据实际运行情况不断优化Dashboard和告警规则

参考来源: