Grafana可视化监控平台
目录
概述
什么是Grafana
Grafana是一个开源的可视化和可观测性平台,允许用户查询、可视化、告警和理解指标数据,无论数据存储在何处。它已成为监控领域的事实标准,被广泛应用于基础设施监控、应用性能监控、业务指标分析等场景。
核心价值:
- 统一可视化:将来自不同数据源的指标统一展示
- 实时监控:提供实时数据刷新和告警能力
- 灵活扩展:支持插件机制,可扩展数据源和可视化组件
- 开源免费:社区活跃,企业版提供更多高级特性
核心特性
| 特性类别 | 功能描述 | 应用价值 |
|---|---|---|
| 多数据源支持 | 支持Prometheus、InfluxDB、Elasticsearch等50+数据源 | 统一监控平台,避免工具碎片化 |
| 丰富的可视化 | 时序图、柱状图、热力图、地图、表格等多种图表类型 | 满足不同场景的展示需求 |
| 告警系统 | 灵活的告警规则、多种通知渠道(邮件、Slack、钉钉等) | 及时发现和响应问题 |
| Dashboard模板 | 变量、重复面板、链接跳转等高级功能 | 提高Dashboard复用性和交互性 |
| 权限管理 | 组织、团队、用户级别的权限控制 | 满足企业级安全需求 |
| 插件生态 | 丰富的社区插件和自定义插件开发能力 | 扩展平台能力 |
应用场景
1. 基础设施监控
- 服务器CPU、内存、磁盘、网络监控
- Kubernetes集群监控
- 云资源监控(AWS、Azure、GCP)
2. 应用性能监控(APM)
- 应用响应时间、吞吐量、错误率
- JVM监控(堆内存、GC、线程)
- 数据库性能监控
3. 大数据平台监控
- Flink作业监控(本文重点)
- Spark作业监控
- Kafka集群监控
- HDFS、HBase监控
4. 业务指标监控
- 订单量、交易额、用户活跃度
- 业务漏斗分析
- SLA监控
架构设计
核心组件
Grafana采用模块化架构设计,主要由以下核心组件构成:
React应用] B[Dashboard编辑器] C[可视化渲染引擎] end subgraph "后端层" D[API Server
Go Web服务] E[认证授权模块] F[查询引擎] G[告警引擎] end subgraph "数据层" H[SQLite/MySQL/PostgreSQL
元数据存储] I[数据源连接器] end subgraph "外部数据源" J[Prometheus] K[InfluxDB] L[Elasticsearch] M[MySQL/PostgreSQL] N[其他数据源] end A --> D B --> D C --> D D --> E D --> F D --> G D --> H F --> I I --> J I --> K I --> L I --> M I --> N style A fill:#e3f2fd style D fill:#fff3e0 style H fill:#f3e5f5 style J fill:#e8f5e9
组件职责说明:
| 组件 | 技术栈 | 主要职责 | 性能考量 |
|---|---|---|---|
| Web UI | React + TypeScript | 用户交互界面、Dashboard渲染 | 前端性能优化、懒加载 |
| API Server | Go语言 | 处理API请求、认证授权、数据查询协调 | 并发处理能力、连接池管理 |
| 查询引擎 | Go语言 | 查询优化、数据聚合、缓存管理 | 查询并发控制、结果缓存 |
| 告警引擎 | Go语言 | 告警规则评估、通知发送 | 告警频率控制、通知去重 |
| 数据源连接器 | 插件机制 | 与各类数据源通信、数据格式转换 | 连接池、超时控制 |
| 元数据存储 | SQLite/MySQL/PostgreSQL | 存储Dashboard、用户、配置等 | 数据库索引优化 |
数据流转
查询数据流程:
关键流程说明:
- 认证鉴权:每个请求都会经过认证中间件验证
- 查询优化:后端会对查询进行优化,如时间范围调整、采样率控制
- 并发控制:限制同时执行的查询数量,防止数据源过载
- 缓存策略:根据查询时间范围和刷新频率决定是否缓存
- 数据转换:将不同数据源的格式统一转换为Grafana内部格式
架构演进
Grafana 11 → Grafana 12 架构升级:
| 演进方向 | Grafana 11 | Grafana 12 | 改进价值 |
|---|---|---|---|
| Dashboard架构 | 传统Dashboard模型 | Scenes库驱动的Dashboard | 更稳定、更灵活、支持动态Dashboard |
| 可观测性 | 分散的Explore功能 | 统一的Drilldown体验 | 从指标快速下钻到日志和追踪 |
| 代码化管理 | 手动导入导出 | Git Sync + Dashboard as Code | 版本控制、CI/CD集成 |
| 性能优化 | 传统表格渲染 | 优化的表格和地图组件 | 大数据量下性能提升10倍+ |
| AI能力 | 无 | Grafana Assistant(预览) | AI辅助查询和Dashboard创建 |
数据源集成
支持的数据源
Grafana支持50+种数据源,涵盖时序数据库、关系型数据库、日志系统、云服务等:
| 数据源类型 | 典型代表 | 主要用途 | 查询语言 |
|---|---|---|---|
| 时序数据库 | Prometheus、InfluxDB、Graphite | 指标监控、性能分析 | PromQL、InfluxQL |
| 关系型数据库 | MySQL、PostgreSQL、SQL Server | 业务数据查询 | SQL |
| 日志系统 | Elasticsearch、Loki | 日志分析、全文搜索 | Lucene、LogQL |
| 云服务 | CloudWatch、Azure Monitor | 云资源监控 | 各云厂商查询语法 |
| APM系统 | Jaeger、Tempo、Zipkin | 分布式追踪 | TraceQL |
| 大数据 | ClickHouse、Druid | 大规模数据分析 | SQL |
Prometheus集成
Prometheus是Grafana最常用的数据源,特别适合监控场景。
配置步骤:
- 添加数据源:
# Grafana配置文件方式
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
jsonData:
timeInterval: 5s
queryTimeout: 60s
httpMethod: POST
- 关键配置项:
| 配置项 | 说明 | 推荐值 | 影响 |
|---|---|---|---|
| access | 访问模式(proxy/direct) | proxy | proxy模式更安全,避免跨域问题 |
| timeInterval | 最小查询间隔 | 5s-15s | 影响查询精度和性能 |
| queryTimeout | 查询超时时间 | 60s | 防止慢查询阻塞 |
| httpMethod | HTTP方法 | POST | POST支持更长的查询语句 |
- PromQL查询示例:
# CPU使用率
100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)
# 内存使用率
(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100
# 请求QPS
rate(http_requests_total[5m])
# P95延迟
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))
其他常用数据源
InfluxDB:
- 适用场景:IoT数据、高频时序数据
- 查询语言:InfluxQL或Flux
- 特点:写入性能强、支持连续查询
Elasticsearch:
- 适用场景:日志分析、全文搜索
- 查询语言:Lucene查询语法
- 特点:强大的聚合能力、支持复杂查询
MySQL/PostgreSQL:
- 适用场景:业务指标、报表数据
- 查询语言:标准SQL
- 特点:灵活的关联查询、支持复杂计算
查询语法详解
PromQL基础语法
PromQL(Prometheus Query Language)是Prometheus的查询语言,也是Grafana中最常用的查询语言。
1. 基本查询结构
# 即时向量查询(Instant Vector)- 返回最新的时间点数据
metric_name{label1="value1", label2="value2"}
# 范围向量查询(Range Vector)- 返回一段时间范围内的数据
metric_name{label1="value1"}[5m]
# 标量查询(Scalar)- 返回单个数值
scalar(metric_name)
2. 选择器(Selectors)
| 选择器类型 | 语法 | 示例 | 说明 |
|---|---|---|---|
| 精确匹配 | label="value" | job="flink" | 标签值完全匹配 |
| 正则匹配 | label=~"regex" | job=~"flink.*" | 标签值匹配正则表达式 |
| 不等于 | label!="value" | job!="test" | 标签值不等于 |
| 正则不匹配 | label!~"regex" | job!~"test.*" | 标签值不匹配正则 |
# 组合使用多个选择器
flink_taskmanager_job_task_operator_numRecordsIn{
job_name="streaming-job",
task_name=~"Source.*",
host!="test-host"
}
3. 时间范围选择器
# 时间单位:s(秒), m(分钟), h(小时), d(天), w(周), y(年)
metric_name[5m] # 最近5分钟
metric_name[1h] # 最近1小时
metric_name[7d] # 最近7天
# 时间偏移(Offset)
metric_name offset 5m # 5分钟前的数据
metric_name[5m] offset 1h # 1小时前的5分钟数据
PromQL函数
1. 速率计算函数
| 函数 | 用途 | 适用场景 | 示例 |
|---|---|---|---|
| rate() | 计算每秒平均增长率 | Counter类型指标 | rate(http_requests_total[5m]) |
| irate() | 计算瞬时增长率 | 需要更高灵敏度的场景 | irate(http_requests_total[5m]) |
| increase() | 计算时间范围内的增长量 | 统计总增长 | increase(http_requests_total[1h]) |
| delta() | 计算Gauge类型的变化量 | Gauge指标 | delta(temperature[1h]) |
| deriv() | 计算导数(变化率) | 预测趋势 | deriv(disk_usage[1h]) |
# rate vs irate 对比
# rate: 计算5分钟内的平均速率,更平滑
rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])
# irate: 只使用最后两个数据点,更敏感
irate(flink_taskmanager_job_task_operator_numRecordsIn[5m])
# increase: 计算1小时内的总增长
increase(flink_taskmanager_job_task_operator_numRecordsIn[1h])
2. 聚合函数
| 函数 | 说明 | 保留标签 | 示例 |
|---|---|---|---|
| sum() | 求和 | by子句指定的标签 | sum(metric) by (label) |
| avg() | 平均值 | by子句指定的标签 | avg(metric) by (label) |
| min() | 最小值 | by子句指定的标签 | min(metric) by (label) |
| max() | 最大值 | by子句指定的标签 | max(metric) by (label) |
| count() | 计数 | by子句指定的标签 | count(metric) by (label) |
| stddev() | 标准差 | by子句指定的标签 | stddev(metric) by (label) |
| stdvar() | 方差 | by子句指定的标签 | stdvar(metric) by (label) |
| topk() | 前K个最大值 | 所有标签 | topk(5, metric) |
| bottomk() | 前K个最小值 | 所有标签 | bottomk(5, metric) |
| quantile() | 分位数 | by子句指定的标签 | quantile(0.95, metric) |
# 按job_name聚合,计算每个作业的总吞吐量
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (job_name)
# 计算所有TaskManager的平均内存使用率
avg(flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max) * 100
# 找出吞吐量最高的5个算子
topk(5, rate(flink_taskmanager_job_task_operator_numRecordsIn[5m]))
# 计算P95延迟
quantile(0.95, flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency)
# without: 排除指定标签,保留其他所有标签
sum(metric) without (instance, host)
3. 数学运算函数
| 函数 | 说明 | 示例 |
|---|---|---|
| abs() | 绝对值 | abs(delta(metric[5m])) |
| ceil() | 向上取整 | ceil(metric) |
| floor() | 向下取整 | floor(metric) |
| round() | 四舍五入 | round(metric, 0.01) |
| sqrt() | 平方根 | sqrt(metric) |
| exp() | e的指数 | exp(metric) |
| ln() | 自然对数 | ln(metric) |
| log2() | 以2为底的对数 | log2(metric) |
| log10() | 以10为底的对数 | log10(metric) |
4. 时间和日期函数
# 当前时间戳
time()
# 一天中的小时(0-23)
hour()
# 一周中的星期几(0-6,0是周日)
day_of_week()
# 一个月中的第几天(1-31)
day_of_month()
# 一年中的第几天(1-365)
day_of_year()
# 月份(1-12)
month()
# 年份
year()
# 示例:只在工作日(周一到周五)触发告警
hour() >= 9 and hour() <= 18 and day_of_week() >= 1 and day_of_week() <= 5
5. 预测和趋势函数
# predict_linear: 线性预测未来值
# 预测1小时后的磁盘使用量
predict_linear(disk_usage[1h], 3600)
# holt_winters: 霍尔特-温特斯平滑预测(适合有季节性的数据)
holt_winters(metric[1d], 0.5, 0.5)
# 示例:预测4小时后磁盘是否会满
predict_linear(node_filesystem_avail_bytes[1h], 4*3600) < 0
6. 标签操作函数
# label_replace: 替换或添加标签
label_replace(
metric,
"new_label", # 新标签名
"$1", # 新标签值(可使用正则捕获组)
"source_label", # 源标签名
"(.*)" # 正则表达式
)
# label_join: 连接多个标签值
label_join(
metric,
"new_label", # 新标签名
"-", # 分隔符
"label1", "label2" # 要连接的标签
)
# 示例:从instance标签提取主机名
label_replace(
up,
"hostname",
"$1",
"instance",
"([^:]+):.*"
)
7. 排序和限制函数
# sort: 升序排序
sort(metric)
# sort_desc: 降序排序
sort_desc(metric)
# topk: 返回前K个最大值
topk(10, metric)
# bottomk: 返回前K个最小值
bottomk(10, metric)
# 示例:找出CPU使用率最高的10个节点
topk(10, 100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100))
PromQL运算符
1. 算术运算符
# 加法
metric1 + metric2
# 减法
metric1 - metric2
# 乘法
metric1 * metric2
# 除法
metric1 / metric2
# 取模
metric1 % metric2
# 幂运算
metric1 ^ metric2
# 示例:计算内存使用率
(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes * 100
2. 比较运算符
| 运算符 | 说明 | 返回值 |
|---|---|---|
| == | 等于 | 0或1 |
| != | 不等于 | 0或1 |
| > | 大于 | 0或1 |
| < | 小于 | 0或1 |
| >= | 大于等于 | 0或1 |
| <= | 小于等于 | 0或1 |
# 过滤:只返回CPU使用率大于80%的节点
node_cpu_usage > 80
# bool修饰符:返回0或1而不是过滤
node_cpu_usage > bool 80
# 示例:检查内存使用率是否超过阈值
(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.85
3. 逻辑运算符
# and: 交集(两边都存在的时间序列)
metric1 and metric2
# or: 并集(任意一边存在的时间序列)
metric1 or metric2
# unless: 差集(左边存在但右边不存在的时间序列)
metric1 unless metric2
# 示例:CPU高且内存也高的节点
(node_cpu_usage > 80) and (node_memory_usage > 80)
# 示例:有数据的指标或默认值0
metric or vector(0)
4. 向量匹配
# one-to-one匹配(默认)
metric1 + metric2
# 指定匹配标签
metric1 + on(label1, label2) metric2
# 忽略某些标签
metric1 + ignoring(label1, label2) metric2
# many-to-one匹配
metric1 + on(label1) group_left metric2
# one-to-many匹配
metric1 + on(label1) group_right metric2
# 示例:计算每个节点的网络总流量
sum(rate(node_network_receive_bytes_total[5m])) by (instance) +
sum(rate(node_network_transmit_bytes_total[5m])) by (instance)
高级查询技巧
1. 子查询
# 语法:<query>[<range>:<resolution>]
# 计算过去1小时内,每5分钟的最大CPU使用率
max_over_time(
rate(node_cpu_seconds_total[5m])[1h:5m]
)
# 示例:检测CPU使用率的波动
stddev_over_time(
rate(node_cpu_seconds_total[1m])[10m:1m]
) > 0.1
2. 直方图和分位数
# histogram_quantile: 从直方图计算分位数
histogram_quantile(
0.95, # P95
rate(http_request_duration_seconds_bucket[5m])
)
# 按路径分组计算P95延迟
histogram_quantile(
0.95,
sum(rate(http_request_duration_seconds_bucket[5m])) by (le, path)
)
# 计算多个分位数
histogram_quantile(0.50, rate(metric_bucket[5m])) or
histogram_quantile(0.90, rate(metric_bucket[5m])) or
histogram_quantile(0.99, rate(metric_bucket[5m]))
3. 缺失数据处理
# 使用默认值填充缺失数据
metric or vector(0)
# 使用上一个有效值填充
metric or metric offset 1m
# 检测数据是否缺失
absent(metric{label="value"})
# 示例:如果指标不存在则告警
absent(up{job="flink"}) == 1
4. 复杂聚合示例
# 计算每个作业的平均处理延迟
avg(
histogram_quantile(0.95,
sum(rate(flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_bucket[5m]))
by (job_name, le)
)
) by (job_name)
# 检测数据倾斜:同一算子不同subtask的处理量差异
(
max(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (operator_name) -
min(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (operator_name)
) /
avg(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (operator_name) > 0.5
# 计算作业的整体健康分数(0-100)
(
(flink_jobmanager_job_uptime > 0) * 40 + # 运行状态 40分
(1 - rate(flink_jobmanager_job_numRestarts[1h])) * 30 + # 稳定性 30分
(rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) /
(rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) +
rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]))) * 30 # Checkpoint成功率 30分
)
Grafana变量在查询中的使用
# 使用单选变量
rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name="$job_name"}[5m])
# 使用多选变量(正则匹配)
rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name=~"$job_name"}[5m])
# 使用interval变量(自动调整采样率)
rate(flink_taskmanager_job_task_operator_numRecordsIn[$__interval])
# 使用时间范围变量
rate(flink_taskmanager_job_task_operator_numRecordsIn[$__range])
# 组合使用多个变量
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn{
job_name=~"$job_name",
task_name=~"$task_name",
host=~"$host"
}[$__interval])) by (operator_name)
Dashboard仪表盘
Dashboard基础
Dashboard是Grafana的核心概念,由多个Panel(面板)组成,每个Panel展示一个可视化图表。
Dashboard结构:
Dashboard JSON结构:
{
"dashboard": {
"title": "Flink作业监控",
"uid": "flink-job-monitor",
"tags": ["flink", "streaming"],
"timezone": "browser",
"refresh": "30s",
"time": {
"from": "now-1h",
"to": "now"
},
"panels": [
{
"id": 1,
"type": "graph",
"title": "Records In/Out Rate",
"targets": [
{
"expr": "rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])",
"legendFormat": "{{job_name}} - {{task_name}}"
}
]
}
]
}
}
可视化组件
Grafana提供丰富的可视化组件,满足不同场景需求:
| 组件类型 | 适用场景 | 关键配置 | 使用建议 |
|---|---|---|---|
| Time Series(时序图) | 趋势分析、性能监控 | 线条样式、填充、堆叠 | 最常用,适合展示指标变化趋势 |
| Bar Chart(柱状图) | 对比分析、排名 | 方向、分组、排序 | 适合展示Top N、分类对比 |
| Stat(单值显示) | 关键指标、KPI | 阈值、颜色、单位 | 突出显示核心指标 |
| Gauge(仪表盘) | 百分比、容量 | 最小/最大值、阈值 | 直观展示使用率 |
| Table(表格) | 详细数据、列表 | 列配置、排序、过滤 | 适合展示多维度数据 |
| Heatmap(热力图) | 分布分析、延迟 | 颜色方案、桶大小 | 展示数据分布和密度 |
| Geomap(地图) | 地理位置、区域 | 地图层、标记 | 展示地理分布 |
时序图配置示例:
{
"type": "timeseries",
"title": "Flink Checkpoint Duration",
"fieldConfig": {
"defaults": {
"unit": "ms",
"color": {
"mode": "palette-classic"
},
"custom": {
"lineWidth": 2,
"fillOpacity": 10,
"showPoints": "never"
},
"thresholds": {
"mode": "absolute",
"steps": [
{"value": null, "color": "green"},
{"value": 5000, "color": "yellow"},
{"value": 10000, "color": "red"}
]
}
}
}
}
变量与模板
变量使Dashboard更加灵活和可复用,支持动态切换监控对象。
变量类型:
| 变量类型 | 数据来源 | 使用场景 | 示例 |
|---|---|---|---|
| Query | 数据源查询 | 动态获取实例、作业列表 | label_values(flink_jobmanager_job_uptime, job_name) |
| Custom | 手动定义 | 固定选项列表 | 环境:dev, test, prod |
| Constant | 常量 | 全局配置 | 集群名称、区域 |
| Interval | 时间间隔 | 动态调整采样率 | 1m, 5m, 15m, 1h |
| Data source | 数据源列表 | 切换数据源 | Prometheus实例切换 |
变量定义示例:
{
"templating": {
"list": [
{
"name": "job_name",
"type": "query",
"datasource": "Prometheus",
"query": "label_values(flink_jobmanager_job_uptime, job_name)",
"refresh": 1,
"multi": true,
"includeAll": true
},
{
"name": "interval",
"type": "interval",
"query": "1m,5m,15m,30m,1h",
"auto": true,
"auto_count": 30
}
]
}
}
变量使用:
# 在查询中使用变量
rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name=~"$job_name"}[$interval])
# 在Panel标题中使用
title: "Records In Rate - $job_name"
# 在Legend中使用
legendFormat: "{{job_name}} - {{task_name}}"
告警机制
告警规则配置
Grafana告警系统支持多种告警规则类型,可以基于查询结果触发告警。
告警规则结构:
# 告警规则示例
apiVersion: 1
groups:
- name: flink_alerts
interval: 30s
rules:
- alert: FlinkJobDown
expr: flink_jobmanager_job_uptime == 0
for: 5m
labels:
severity: critical
component: flink
annotations:
summary: "Flink作业 {{ $labels.job_name }} 已停止"
description: "作业已停止超过5分钟,请立即检查"
告警状态流转:
通知渠道
Grafana支持多种告警通知渠道:
| 通知渠道 | 适用场景 | 配置复杂度 | 特点 |
|---|---|---|---|
| 通用场景 | 低 | 支持HTML格式、附件 | |
| Slack | 团队协作 | 低 | 实时通知、支持交互 |
| 钉钉/企业微信 | 国内团队 | 中 | 需要配置Webhook |
| PagerDuty | 值班管理 | 中 | 支持升级策略、排班 |
| Webhook | 自定义集成 | 高 | 灵活性最高 |
| Telegram | 个人通知 | 低 | 轻量级、跨平台 |
钉钉通知配置示例:
{
"name": "DingTalk",
"type": "webhook",
"settings": {
"url": "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN",
"httpMethod": "POST",
"username": "Grafana Alert"
}
}
告警最佳实践
1. 告警分级:
| 级别 | 响应时间 | 通知方式 | 典型场景 |
|---|---|---|---|
| Critical | 立即响应(5分钟内) | 电话+短信+IM | 服务完全不可用、数据丢失 |
| Warning | 30分钟内响应 | IM+邮件 | 性能下降、资源使用率高 |
| Info | 工作时间处理 | 邮件 | 配置变更、版本升级 |
2. 告警降噪策略:
- 静默规则:维护窗口期间自动静默告警
- 分组聚合:相同类型告警合并通知
- 抑制规则:上游故障时抑制下游告警
- 去重:相同告警在时间窗口内只发送一次
3. 告警模板优化:
// 告警消息模板
{{ define "alert.title" }}
[{{ .Status | toUpper }}] {{ .GroupLabels.alertname }}
{{ end }}
{{ define "alert.message" }}
**告警详情**:
- 作业名称:{{ .Labels.job_name }}
- 严重程度:{{ .Labels.severity }}
- 触发时间:{{ .StartsAt.Format "2006-01-02 15:04:05" }}
- 当前值:{{ .Values.B }}
- 阈值:{{ .Values.C }}
**问题描述**:
{{ .Annotations.description }}
**处理建议**:
{{ .Annotations.runbook_url }}
{{ end }}
Flink监控实战
Flink与Prometheus集成
Flink原生支持Prometheus指标上报,通过PrometheusReporter实现。
1. Flink配置:
# flink-conf.yaml
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9250-9260
metrics.reporters: prom
# 启用详细网络指标
taskmanager.network.detailed-metrics: true
# 指标上报间隔
metrics.reporter.prom.interval: 10 SECONDS
2. Kubernetes部署配置:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: flink-job-example
spec:
flinkConfiguration:
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9250-9260
metrics.reporters: prom
taskmanager.network.detailed-metrics: "true"
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "4096m"
cpu: 2
3. Prometheus抓取配置:
# prometheus.yml
scrape_configs:
- job_name: 'flink'
kubernetes_sd_configs:
- role: pod
namespaces:
names:
- flink-namespace
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_type]
action: keep
regex: flink-native-kubernetes
- source_labels: [__meta_kubernetes_pod_ip]
action: replace
target_label: __address__
replacement: '$1:9250'
scrape_interval: 15s
scrape_timeout: 10s
4. PodMonitor配置(Prometheus Operator):
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
name: flink-pod-monitor
labels:
release: prometheus
spec:
namespaceSelector:
matchNames:
- flink-namespace
selector:
matchLabels:
type: flink-native-kubernetes
podMetricsEndpoints:
- path: /
port: metrics
interval: 15s
relabelings:
- action: replace
sourceLabels: [__meta_kubernetes_pod_ip]
targetLabel: __address__
replacement: '$1:9250'
Flink核心指标
Flink提供了丰富的监控指标,涵盖作业运行、资源使用、数据处理等各个方面。
1. 作业级别指标:
| 指标名称 | 指标含义 | PromQL示例 | 告警阈值建议 |
|---|---|---|---|
| flink_jobmanager_job_uptime | 作业运行时间 | flink_jobmanager_job_uptime | == 0(作业停止) |
| flink_jobmanager_job_numRestarts | 作业重启次数 | rate(flink_jobmanager_job_numRestarts[5m]) | > 0(频繁重启) |
| flink_jobmanager_job_lastCheckpointDuration | 最近Checkpoint耗时 | flink_jobmanager_job_lastCheckpointDuration | > 60000ms |
| flink_jobmanager_job_lastCheckpointSize | 最近Checkpoint大小 | flink_jobmanager_job_lastCheckpointSize | 持续增长 |
| flink_jobmanager_job_numberOfFailedCheckpoints | Checkpoint失败次数 | rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]) | > 0 |
2. 算子级别指标:
| 指标名称 | 指标含义 | 监控重点 | 优化方向 |
|---|---|---|---|
| numRecordsIn | 输入记录数 | 吞吐量、数据倾斜 | 增加并行度、优化分区 |
| numRecordsOut | 输出记录数 | 数据处理效率 | 优化算子逻辑 |
| numRecordsInPerSecond | 每秒输入记录数 | 实时吞吐量 | 扩容、优化上游 |
| numBytesIn | 输入字节数 | 网络带宽 | 数据压缩、序列化优化 |
| currentInputWatermark | 当前输入水位线 | 事件时间进度 | 调整水位线策略 |
| numLateRecordsDropped | 丢弃的迟到数据 | 数据完整性 | 增加允许延迟时间 |
3. TaskManager资源指标:
| 指标名称 | 指标含义 | PromQL查询 |
|---|---|---|
| JVM堆内存使用 | 堆内存使用率 | flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max * 100 |
| JVM非堆内存 | 非堆内存使用 | flink_taskmanager_Status_JVM_Memory_NonHeap_Used |
| GC时间 | GC耗时 | rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Time[5m]) |
| GC次数 | GC频率 | rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Count[5m]) |
| 线程数 | 活跃线程数 | flink_taskmanager_Status_JVM_Threads_Count |
| CPU使用率 | CPU负载 | flink_taskmanager_Status_JVM_CPU_Load |
4. 反压指标:
# 反压比例(0-1之间,越接近1反压越严重)
flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000
# 繁忙比例
flink_taskmanager_job_task_busyTimeMsPerSecond / 1000
# 空闲比例
flink_taskmanager_job_task_idleTimeMsPerSecond / 1000
反压分析:
| 反压比例 | 状态 | 原因分析 | 处理建议 |
|---|---|---|---|
| < 0.1 | 健康 | 处理能力充足 | 无需处理 |
| 0.1 - 0.5 | 轻微反压 | 短暂的数据峰值 | 观察趋势 |
| 0.5 - 0.8 | 中度反压 | 下游处理能力不足 | 增加并行度、优化算子 |
| > 0.8 | 严重反压 | 系统瓶颈 | 紧急扩容、排查慢算子 |
5. Kafka消费指标:
# Kafka消费延迟(Lag)
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max
# Kafka消费速率
rate(flink_taskmanager_job_task_operator_KafkaConsumer_records_consumed_total[5m])
# Kafka分区数
flink_taskmanager_job_task_operator_KafkaConsumer_assigned_partitions
6. 状态后端指标
Flink状态后端的监控对于理解作业的状态管理性能至关重要。
| 指标分类 | 指标名称 | 指标含义 | PromQL示例 |
|---|---|---|---|
| RocksDB状态 | flink_taskmanager_job_task_operator_rocksdb_bytes_read | RocksDB读取字节数 | rate(flink_taskmanager_job_task_operator_rocksdb_bytes_read[5m]) |
| RocksDB状态 | flink_taskmanager_job_task_operator_rocksdb_bytes_written | RocksDB写入字节数 | rate(flink_taskmanager_job_task_operator_rocksdb_bytes_written[5m]) |
| RocksDB状态 | flink_taskmanager_job_task_operator_rocksdb_compaction_pending | 待压缩的文件数 | flink_taskmanager_job_task_operator_rocksdb_compaction_pending |
| RocksDB状态 | flink_taskmanager_job_task_operator_rocksdb_mem_table_flush_pending | 待刷新的MemTable数 | flink_taskmanager_job_task_operator_rocksdb_mem_table_flush_pending |
| RocksDB状态 | flink_taskmanager_job_task_operator_rocksdb_num_running_compactions | 正在运行的压缩任务数 | flink_taskmanager_job_task_operator_rocksdb_num_running_compactions |
| RocksDB状态 | flink_taskmanager_job_task_operator_rocksdb_num_running_flushes | 正在运行的刷新任务数 | flink_taskmanager_job_task_operator_rocksdb_num_running_flushes |
| 状态大小 | flink_taskmanager_job_task_operator_state_size | 算子状态大小 | flink_taskmanager_job_task_operator_state_size |
RocksDB性能分析:
# RocksDB读写比例
rate(flink_taskmanager_job_task_operator_rocksdb_bytes_read[5m]) /
rate(flink_taskmanager_job_task_operator_rocksdb_bytes_written[5m])
# RocksDB压缩压力
flink_taskmanager_job_task_operator_rocksdb_compaction_pending +
flink_taskmanager_job_task_operator_rocksdb_num_running_compactions
# 状态大小增长率
deriv(flink_taskmanager_job_task_operator_state_size[30m])
7. 网络指标
网络指标反映了TaskManager之间的数据传输情况。
| 指标名称 | 指标含义 | 监控重点 | PromQL示例 |
|---|---|---|---|
| numBytesInLocal | 本地接收字节数 | 本地数据传输 | rate(flink_taskmanager_job_task_numBytesInLocal[5m]) |
| numBytesInRemote | 远程接收字节数 | 跨节点数据传输 | rate(flink_taskmanager_job_task_numBytesInRemote[5m]) |
| numBytesOut | 输出字节数 | 数据发送量 | rate(flink_taskmanager_job_task_numBytesOut[5m]) |
| numBuffersInLocal | 本地接收缓冲区数 | 本地缓冲区使用 | rate(flink_taskmanager_job_task_numBuffersInLocal[5m]) |
| numBuffersInRemote | 远程接收缓冲区数 | 远程缓冲区使用 | rate(flink_taskmanager_job_task_numBuffersInRemote[5m]) |
| numBuffersOut | 输出缓冲区数 | 缓冲区发送量 | rate(flink_taskmanager_job_task_numBuffersOut[5m]) |
| inputQueueLength | 输入队列长度 | 输入队列积压 | flink_taskmanager_job_task_buffers_inputQueueLength |
| outputQueueLength | 输出队列长度 | 输出队列积压 | flink_taskmanager_job_task_buffers_outputQueueLength |
| inPoolUsage | 输入缓冲池使用率 | 输入缓冲池压力 | flink_taskmanager_job_task_buffers_inPoolUsage |
| outPoolUsage | 输出缓冲池使用率 | 输出缓冲池压力 | flink_taskmanager_job_task_buffers_outPoolUsage |
网络性能分析:
# 网络传输总带宽
sum(rate(flink_taskmanager_job_task_numBytesInLocal[5m])) +
sum(rate(flink_taskmanager_job_task_numBytesInRemote[5m])) +
sum(rate(flink_taskmanager_job_task_numBytesOut[5m]))
# 远程传输比例(高比例可能导致性能下降)
sum(rate(flink_taskmanager_job_task_numBytesInRemote[5m])) /
(sum(rate(flink_taskmanager_job_task_numBytesInLocal[5m])) +
sum(rate(flink_taskmanager_job_task_numBytesInRemote[5m])))
# 缓冲区积压检测
max(flink_taskmanager_job_task_buffers_inputQueueLength) > 100 or
max(flink_taskmanager_job_task_buffers_outputQueueLength) > 100
8. Checkpoint详细指标
Checkpoint是Flink容错机制的核心,需要重点监控。
| 指标名称 | 指标含义 | 告警阈值 | PromQL示例 |
|---|---|---|---|
| lastCheckpointDuration | 最近Checkpoint耗时 | > 60s | flink_jobmanager_job_lastCheckpointDuration |
| lastCheckpointSize | 最近Checkpoint大小 | 持续增长 | flink_jobmanager_job_lastCheckpointSize |
| lastCheckpointExternalPath | 外部Checkpoint路径 | - | flink_jobmanager_job_lastCheckpointExternalPath |
| lastCheckpointRestoreTimestamp | 最近恢复时间戳 | - | flink_jobmanager_job_lastCheckpointRestoreTimestamp |
| lastCheckpointAlignmentBuffered | Checkpoint对齐缓冲数据量 | > 100MB | flink_jobmanager_job_lastCheckpointAlignmentBuffered |
| numberOfCompletedCheckpoints | 完成的Checkpoint数 | - | flink_jobmanager_job_numberOfCompletedCheckpoints |
| numberOfFailedCheckpoints | 失败的Checkpoint数 | > 0 | flink_jobmanager_job_numberOfFailedCheckpoints |
| numberOfInProgressCheckpoints | 进行中的Checkpoint数 | > 1 | flink_jobmanager_job_numberOfInProgressCheckpoints |
| totalNumberOfCheckpoints | Checkpoint总数 | - | flink_jobmanager_job_totalNumberOfCheckpoints |
Checkpoint健康度评估:
# Checkpoint成功率
rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) /
(rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) +
rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m])) * 100
# Checkpoint耗时趋势(检测是否持续增长)
deriv(flink_jobmanager_job_lastCheckpointDuration[1h]) > 1000
# Checkpoint大小增长率(每小时)
rate(flink_jobmanager_job_lastCheckpointSize[1h])
# Checkpoint对齐时间占比(高比例表示反压严重)
flink_jobmanager_job_lastCheckpointAlignmentBuffered /
flink_jobmanager_job_lastCheckpointSize > 0.3
9. 窗口和水位线指标
对于使用事件时间和窗口的作业,这些指标至关重要。
| 指标名称 | 指标含义 | 监控重点 | PromQL示例 |
|---|---|---|---|
| currentInputWatermark | 当前输入水位线 | 水位线进度 | flink_taskmanager_job_task_operator_currentInputWatermark |
| currentOutputWatermark | 当前输出水位线 | 水位线传播 | flink_taskmanager_job_task_operator_currentOutputWatermark |
| watermarkLag | 水位线延迟 | 事件时间延迟 | time() * 1000 - flink_taskmanager_job_task_operator_currentInputWatermark |
| numLateRecordsDropped | 丢弃的迟到记录数 | 数据丢失 | rate(flink_taskmanager_job_task_operator_numLateRecordsDropped[5m]) |
| latenessHistogram | 延迟分布直方图 | 延迟分布 | flink_taskmanager_job_task_operator_latenessHistogram |
水位线分析:
# 水位线延迟(毫秒)
(time() * 1000) - flink_taskmanager_job_task_operator_currentInputWatermark
# 水位线停滞检测(5分钟内水位线没有前进)
delta(flink_taskmanager_job_task_operator_currentInputWatermark[5m]) == 0
# 不同算子的水位线差异(检测水位线传播问题)
max(flink_taskmanager_job_task_operator_currentOutputWatermark) by (operator_name) -
min(flink_taskmanager_job_task_operator_currentOutputWatermark) by (operator_name)
# 迟到数据比例
rate(flink_taskmanager_job_task_operator_numLateRecordsDropped[5m]) /
rate(flink_taskmanager_job_task_operator_numRecordsIn[5m]) * 100
10. 自定义指标
Flink支持用户自定义指标,常见的自定义指标包括:
| 指标类型 | 使用场景 | 示例 |
|---|---|---|
| Counter | 计数器,累加值 | 处理的订单数、错误次数 |
| Gauge | 瞬时值 | 当前队列长度、连接数 |
| Histogram | 分布统计 | 处理延迟分布、数据大小分布 |
| Meter | 速率统计 | 每秒处理的事件数 |
// 自定义Counter示例
public class MyMapFunction extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration parameters) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) {
counter.inc();
return value.toUpperCase();
}
}
// 自定义Gauge示例
getRuntimeContext()
.getMetricGroup()
.gauge("queueSize", () -> queue.size());
// 自定义Histogram示例
Histogram histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DescriptiveStatisticsHistogram(100));
histogram.update(value);
11. 完整的指标层次结构
Flink指标按照以下层次组织:
<host>.<taskmanager_id>.<job_name>.<operator_name>.<subtask_index>.<metric_name>
指标命名规范:
| 层级 | 说明 | 示例 |
|---|---|---|
| host | TaskManager主机名 | flink-tm-001 |
| taskmanager_id | TaskManager ID | taskmanager-0 |
| job_name | 作业名称 | streaming-job |
| task_name | 任务名称 | Source: Kafka |
| operator_name | 算子名称 | Map |
| subtask_index | 子任务索引 | 0, 1, 2 |
| metric_name | 指标名称 | numRecordsIn |
按层级查询指标:
# 查询特定作业的所有指标
{job_name="streaming-job"}
# 查询特定算子的所有指标
{job_name="streaming-job", operator_name="Map"}
# 查询特定TaskManager的所有指标
{host="flink-tm-001"}
# 聚合所有subtask的指标
sum(flink_taskmanager_job_task_operator_numRecordsIn{job_name="streaming-job"}) by (operator_name)
12. 指标采集最佳实践
| 实践项 | 建议 | 原因 |
|---|---|---|
| 采集间隔 | 10-30秒 | 平衡实时性和系统开销 |
| 保留时间 | 原始数据7天,聚合数据30天 | 满足短期分析和长期趋势 |
| 标签数量 | 控制在10个以内 | 避免高基数问题 |
| 指标命名 | 使用统一前缀和命名规范 | 便于查询和管理 |
| 告警阈值 | 根据历史数据动态调整 | 减少误报和漏报 |
| 聚合粒度 | 使用Recording Rules预聚合 | 提高查询性能 |
Grafana Dashboard配置
完整的Flink监控Dashboard示例:
{
"dashboard": {
"title": "Flink作业监控Dashboard",
"tags": ["flink", "streaming", "production"],
"timezone": "browser",
"refresh": "30s",
"time": {
"from": "now-1h",
"to": "now"
},
"templating": {
"list": [
{
"name": "job_name",
"type": "query",
"datasource": "Prometheus",
"query": "label_values(flink_jobmanager_job_uptime, job_name)",
"refresh": 1,
"multi": false
},
{
"name": "task_name",
"type": "query",
"datasource": "Prometheus",
"query": "label_values(flink_taskmanager_job_task_operator_numRecordsIn{job_name=\"$job_name\"}, task_name)",
"refresh": 1,
"multi": true,
"includeAll": true
}
]
},
"panels": [
{
"id": 1,
"type": "stat",
"title": "作业状态",
"targets": [
{
"expr": "flink_jobmanager_job_uptime{job_name=\"$job_name\"}",
"legendFormat": "运行时间"
}
],
"fieldConfig": {
"defaults": {
"unit": "s",
"mappings": [
{
"type": "special",
"options": {
"match": "null",
"result": {"text": "已停止", "color": "red"}
}
}
],
"thresholds": {
"steps": [
{"value": 0, "color": "red"},
{"value": 1, "color": "green"}
]
}
}
}
},
{
"id": 2,
"type": "timeseries",
"title": "Records In/Out Rate",
"targets": [
{
"expr": "sum(rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name=\"$job_name\",task_name=~\"$task_name\"}[1m])) by (task_name)",
"legendFormat": "{{task_name}} - In"
},
{
"expr": "sum(rate(flink_taskmanager_job_task_operator_numRecordsOut{job_name=\"$job_name\",task_name=~\"$task_name\"}[1m])) by (task_name)",
"legendFormat": "{{task_name}} - Out"
}
],
"fieldConfig": {
"defaults": {
"unit": "rps",
"custom": {
"lineWidth": 2,
"fillOpacity": 10
}
}
}
},
{
"id": 3,
"type": "timeseries",
"title": "Checkpoint Duration",
"targets": [
{
"expr": "flink_jobmanager_job_lastCheckpointDuration{job_name=\"$job_name\"}",
"legendFormat": "Checkpoint耗时"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"thresholds": {
"steps": [
{"value": null, "color": "green"},
{"value": 30000, "color": "yellow"},
{"value": 60000, "color": "red"}
]
}
}
}
},
{
"id": 4,
"type": "gauge",
"title": "反压状态",
"targets": [
{
"expr": "avg(flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_name=\"$job_name\"} / 1000)",
"legendFormat": "反压比例"
}
],
"fieldConfig": {
"defaults": {
"unit": "percentunit",
"min": 0,
"max": 1,
"thresholds": {
"steps": [
{"value": 0, "color": "green"},
{"value": 0.5, "color": "yellow"},
{"value": 0.8, "color": "red"}
]
}
}
}
},
{
"id": 5,
"type": "timeseries",
"title": "JVM堆内存使用",
"targets": [
{
"expr": "flink_taskmanager_Status_JVM_Memory_Heap_Used{job_name=\"$job_name\"} / flink_taskmanager_Status_JVM_Memory_Heap_Max{job_name=\"$job_name\"} * 100",
"legendFormat": "{{host}} - 堆内存使用率"
}
],
"fieldConfig": {
"defaults": {
"unit": "percent",
"max": 100,
"thresholds": {
"steps": [
{"value": null, "color": "green"},
{"value": 70, "color": "yellow"},
{"value": 85, "color": "red"}
]
}
}
}
},
{
"id": 6,
"type": "table",
"title": "算子处理详情",
"targets": [
{
"expr": "sum(rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name=\"$job_name\"}[5m])) by (operator_name)",
"format": "table",
"instant": true
}
],
"transformations": [
{
"id": "organize",
"options": {
"excludeByName": {},
"indexByName": {},
"renameByName": {
"operator_name": "算子名称",
"Value": "处理速率(records/s)"
}
}
}
]
}
]
}
}
Dashboard布局建议:
常见监控场景
场景1:作业健康度监控
# 作业是否运行
flink_jobmanager_job_uptime > 0
# 作业重启频率(每小时)
increase(flink_jobmanager_job_numRestarts[1h])
# Checkpoint成功率
(
rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) /
(rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) +
rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]))
) * 100
# 告警规则
alert: FlinkJobUnhealthy
expr: flink_jobmanager_job_uptime == 0 or increase(flink_jobmanager_job_numRestarts[1h]) > 3
for: 5m
labels:
severity: critical
annotations:
summary: "Flink作业不健康"
场景2:性能瓶颈分析
# 识别慢算子(处理速率低)
topk(5,
rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])
)
# 识别反压算子
topk(5,
flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000
) > 0.5
# 数据倾斜检测(同一算子不同subtask的处理量差异)
stddev(
rate(flink_taskmanager_job_task_operator_numRecordsIn{operator_name="MyOperator"}[5m])
) by (operator_name) /
avg(
rate(flink_taskmanager_job_task_operator_numRecordsIn{operator_name="MyOperator"}[5m])
) by (operator_name) > 0.3
场景3:资源使用监控
# TaskManager内存使用率
(
flink_taskmanager_Status_JVM_Memory_Heap_Used +
flink_taskmanager_Status_JVM_Memory_NonHeap_Used
) / (
flink_taskmanager_Status_JVM_Memory_Heap_Max +
flink_taskmanager_Status_JVM_Memory_NonHeap_Max
) * 100
# GC压力(GC时间占比)
rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Time[5m]) /
rate(flink_taskmanager_Status_JVM_CPU_Time[5m]) * 100
# 网络缓冲区使用率
flink_taskmanager_Status_Network_TotalMemorySegments -
flink_taskmanager_Status_Network_AvailableMemorySegments
# 告警规则
alert: HighMemoryUsage
expr: (flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max) > 0.85
for: 10m
labels:
severity: warning
annotations:
summary: "TaskManager内存使用率过高"
场景4:数据处理监控
# 端到端延迟(从Kafka消费到处理完成)
histogram_quantile(0.99,
rate(flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency_bucket[5m])
)
# Kafka消费延迟
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max
# 数据丢失检测(输入输出不匹配)
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) -
sum(rate(flink_taskmanager_job_task_operator_numRecordsOut[5m]))
# 迟到数据量
rate(flink_taskmanager_job_task_operator_numLateRecordsDropped[5m])
场景5:Checkpoint监控
# Checkpoint耗时趋势
flink_jobmanager_job_lastCheckpointDuration
# Checkpoint大小增长
deriv(flink_jobmanager_job_lastCheckpointSize[30m])
# Checkpoint失败率
rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]) /
(rate(flink_jobmanager_job_numberOfCompletedCheckpoints[5m]) +
rate(flink_jobmanager_job_numberOfFailedCheckpoints[5m]))
# Checkpoint对齐时间(反映反压)
flink_jobmanager_job_lastCheckpointAlignmentBuffered
# 告警规则
alert: CheckpointTooSlow
expr: flink_jobmanager_job_lastCheckpointDuration > 60000
for: 15m
labels:
severity: warning
annotations:
summary: "Checkpoint耗时过长,可能影响故障恢复"
性能调优指导
基于Grafana监控数据,可以进行针对性的性能调优。
调优决策树:
增加并行度] F -->|内存高| H[优化状态大小
调整内存配置] F -->|正常| I[检查外部依赖
数据库/API] B -->|延迟高| J{Checkpoint耗时?} J -->|耗时长| K[优化状态后端
增加Checkpoint间隔] J -->|正常| L[检查水位线策略
窗口配置] B -->|资源不足| M{哪种资源?} M -->|内存| N[增加TaskManager内存
优化状态TTL] M -->|CPU| O[增加并行度
扩容TaskManager] M -->|网络| P[优化序列化
启用压缩] B -->|数据倾斜| Q[重新分区
自定义KeySelector] style A fill:#ffebee style G fill:#e8f5e9 style H fill:#e8f5e9 style I fill:#e8f5e9 style K fill:#e8f5e9 style L fill:#e8f5e9 style N fill:#e8f5e9 style O fill:#e8f5e9 style P fill:#e8f5e9 style Q fill:#e8f5e9
调优参数对照表:
| 监控指标异常 | 可能原因 | 调优参数 | 调整建议 |
|---|---|---|---|
| 反压比例 > 0.8 | 下游处理慢 | parallelism | 增加算子并行度 |
| Checkpoint耗时 > 60s | 状态过大 | state.backend.incremental | 启用增量Checkpoint |
| Checkpoint耗时 > 60s | 状态过大 | execution.checkpointing.interval | 增加Checkpoint间隔 |
| 堆内存使用率 > 85% | 内存不足 | taskmanager.memory.managed.size | 增加托管内存 |
| GC时间占比 > 10% | 频繁GC | taskmanager.memory.jvm-overhead.fraction | 增加JVM开销内存 |
| Kafka Lag持续增长 | 消费慢 | parallelism.default | 增加全局并行度 |
| numLateRecordsDropped > 0 | 水位线过快 | watermark.interval | 增加允许延迟时间 |
| 网络缓冲区不足 | 网络瓶颈 | taskmanager.network.memory.fraction | 增加网络内存比例 |
典型调优案例:
案例1:反压导致吞吐量下降
# 问题:监控显示某个算子反压比例0.9,吞吐量下降50%
# 分析步骤:
# 1. 查看该算子的CPU和内存使用率
# 2. 检查算子逻辑是否有慢操作(同步IO、复杂计算)
# 3. 查看下游算子的处理能力
# 解决方案:
# 方案1:增加并行度
parallelism: 4 -> 8
# 方案2:异步IO优化
dataStream
.flatMap(new MyFlatMapFunction())
.setParallelism(8)
.keyBy(...)
.process(new AsyncProcessFunction()) # 使用异步处理
# 方案3:拆分算子
# 将复杂算子拆分为多个简单算子,提高并行度
案例2:Checkpoint耗时过长
# 问题:Checkpoint耗时从10s增长到120s
# 分析:状态大小持续增长,从1GB增长到50GB
# 解决方案:
# 方案1:启用增量Checkpoint
state.backend: rocksdb
state.backend.incremental: true
# 方案2:增加Checkpoint间隔
execution.checkpointing.interval: 60s -> 300s
# 方案3:配置状态TTL
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
valueStateDescriptor.enableTimeToLive(ttlConfig);
高级特性
Grafana 12新特性
Grafana 12(2025年5月发布)带来了重大架构升级和新功能。
1. 可观测性即代码(Observability as Code)
支持将Dashboard、告警规则等配置以代码形式管理:
# dashboard.yaml
apiVersion: grizzly.grafana.com/v1alpha1
kind: Dashboard
metadata:
name: flink-monitoring
spec:
title: Flink作业监控
tags: [flink, production]
panels:
- type: timeseries
title: Records Rate
queries:
- expr: rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])
datasource: Prometheus
优势:
- 版本控制:使用Git管理Dashboard变更历史
- CI/CD集成:自动化部署和测试
- 团队协作:通过PR进行Dashboard审查
- 环境一致性:开发、测试、生产环境配置同步
2. Git Sync功能
# 配置Git同步
apiVersion: 1
git:
enabled: true
repository: https://github.com/your-org/grafana-dashboards.git
branch: main
path: dashboards/
sync_interval: 5m
auth:
type: token
token: ${GIT_TOKEN}
3. 动态Dashboard
支持基于条件逻辑的动态面板显示:
{
"panels": [
{
"id": 1,
"title": "高级指标",
"condition": {
"type": "variable",
"variable": "show_advanced",
"operator": "==",
"value": "true"
}
}
]
}
4. Drilldown体验
从指标快速下钻到日志和追踪:
指标异常] --> B[Logs
查看日志] B --> C[Traces
分布式追踪] C --> D[Profiles
性能剖析] style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#e8f5e9 style D fill:#fce4ec
可观测性即代码
完整的GitOps工作流:
# 1. 导出现有Dashboard
grafana-cli dashboard export flink-monitoring > flink-dashboard.json
# 2. 转换为YAML格式
grr get Dashboard/flink-monitoring > flink-dashboard.yaml
# 3. 提交到Git仓库
git add flink-dashboard.yaml
git commit -m "Update Flink dashboard"
git push origin main
# 4. 自动同步到Grafana
# Grafana会自动拉取Git仓库的变更并应用
CI/CD集成示例:
# .github/workflows/grafana-sync.yml
name: Sync Grafana Dashboards
on:
push:
branches: [main]
paths:
- 'dashboards/**'
jobs:
sync:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Grizzly
run: |
curl -fSL -o grr.tar.gz https://github.com/grafana/grizzly/releases/download/v0.4.0/grr-linux-amd64
tar -xzf grr.tar.gz
chmod +x grr
- name: Apply Dashboards
env:
GRAFANA_URL: ${{ secrets.GRAFANA_URL }}
GRAFANA_TOKEN: ${{ secrets.GRAFANA_TOKEN }}
run: |
./grr apply dashboards/
动态仪表盘
使用场景:
- 根据用户角色显示不同的面板
- 根据环境(开发/生产)调整告警阈值
- 根据时间范围自动调整采样率
实现示例:
{
"panels": [
{
"id": 1,
"title": "详细指标",
"condition": {
"type": "permission",
"permission": "Admin"
}
},
{
"id": 2,
"title": "生产环境告警",
"condition": {
"type": "variable",
"variable": "environment",
"operator": "==",
"value": "production"
},
"alert": {
"threshold": 100
}
},
{
"id": 3,
"title": "开发环境告警",
"condition": {
"type": "variable",
"variable": "environment",
"operator": "==",
"value": "development"
},
"alert": {
"threshold": 500
}
}
]
}
最佳实践
Dashboard设计原则
1. 信息层次化
第一层(最重要):核心KPI、作业状态
↓
第二层:吞吐量、延迟等性能指标
↓
第三层:资源使用、详细指标
↓
第四层:调试信息、详细日志
2. 颜色使用规范
| 颜色 | 含义 | 使用场景 | 阈值示例 |
|---|---|---|---|
| 绿色 | 正常 | 指标在健康范围内 | CPU < 70% |
| 黄色 | 警告 | 需要关注但未严重 | CPU 70-85% |
| 红色 | 严重 | 需要立即处理 | CPU > 85% |
| 蓝色 | 信息 | 中性指标展示 | 数据量统计 |
| 灰色 | 未知/禁用 | 数据缺失或功能关闭 | 服务停止 |
3. 命名规范
Dashboard命名:[系统]-[模块]-[环境]
示例:Flink-StreamingJob-Production
Panel命名:[指标类型] - [具体指标]
示例:Throughput - Records In Rate
变量命名:小写+下划线
示例:job_name, task_name, environment
4. 性能优化建议
| 优化项 | 问题 | 解决方案 | 效果 |
|---|---|---|---|
| 查询优化 | 查询过于复杂 | 使用Recording Rules预聚合 | 查询速度提升10倍+ |
| 时间范围 | 查询时间跨度过大 | 限制默认时间范围为1-6小时 | 减少数据传输量 |
| 刷新频率 | 刷新过于频繁 | 根据场景设置合理刷新间隔 | 降低后端负载 |
| 面板数量 | 单个Dashboard面板过多 | 拆分为多个Dashboard | 加载速度提升 |
| 数据点密度 | 数据点过密 | 使用$__interval动态调整 | 前端渲染更流畅 |
Recording Rules示例:
# prometheus-rules.yaml
groups:
- name: flink_aggregations
interval: 30s
rules:
# 预聚合:作业级别的总吞吐量
- record: job:flink_records_in_rate:sum
expr: sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (job_name)
# 预聚合:TaskManager平均内存使用率
- record: job:flink_memory_usage:avg
expr: avg(flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max) by (job_name)
性能优化
1. 查询优化技巧
# 不推荐:查询所有标签
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m]))
# 推荐:只保留必要标签
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn{job_name="my-job"}[5m])) by (task_name)
# 不推荐:使用正则表达式
{job_name=~".*flink.*"}
# 推荐:使用精确匹配
{job_name="flink-streaming-job"}
# 使用$__interval变量自动调整采样率
rate(flink_taskmanager_job_task_operator_numRecordsIn[$__interval])
2. 缓存策略
# grafana.ini
[caching]
enabled = true
[query_cache]
enabled = true
ttl = 5m
max_cache_size_mb = 1000
3. 数据源连接池
# datasource配置
jsonData:
maxOpenConns: 100
maxIdleConns: 10
connMaxLifetime: 14400
安全配置
1. 认证配置
# grafana.ini
[auth]
disable_login_form = false
disable_signout_menu = false
[auth.ldap]
enabled = true
config_file = /etc/grafana/ldap.toml
[auth.oauth]
enabled = true
name = OAuth
allow_sign_up = true
client_id = YOUR_CLIENT_ID
client_secret = YOUR_CLIENT_SECRET
2. 权限管理
| 角色 | 权限 | 适用人员 |
|---|---|---|
| Viewer | 只读Dashboard | 普通开发人员、业务人员 |
| Editor | 创建和编辑Dashboard | 运维人员、SRE |
| Admin | 完全控制权限 | 系统管理员 |
3. API密钥管理
# 创建API密钥
curl -X POST -H "Content-Type: application/json" \
-d '{"name":"monitoring-api","role":"Viewer"}' \
http://admin:admin@localhost:3000/api/auth/keys
# 使用API密钥
curl -H "Authorization: Bearer YOUR_API_KEY" \
http://localhost:3000/api/dashboards/uid/flink-monitoring
4. 数据源权限隔离
# 为不同团队配置独立的数据源
datasources:
- name: Prometheus-Team-A
type: prometheus
url: http://prometheus-team-a:9090
access: proxy
basicAuth: true
basicAuthUser: team-a
secureJsonData:
basicAuthPassword: ${TEAM_A_PASSWORD}
常见问题与排查
数据源连接问题
问题1:数据源连接超时
# 症状
Error: context deadline exceeded
# 排查步骤
# 1. 检查网络连通性
curl -v http://prometheus:9090/api/v1/query?query=up
# 2. 检查Grafana日志
docker logs grafana | grep -i "error\|timeout"
# 3. 增加超时时间
# datasource配置
jsonData:
timeout: 60
queryTimeout: 120s
问题2:数据源认证失败
# 症状
Error: 401 Unauthorized
# 解决方案
# 1. 检查认证配置
# 2. 使用Bearer Token认证
jsonData:
httpHeaderName1: 'Authorization'
secureJsonData:
httpHeaderValue1: 'Bearer YOUR_TOKEN'
查询性能问题
问题1:查询响应慢
# 问题查询(扫描大量数据)
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[1h])) by (job_name, task_name, operator_name, subtask_index)
# 优化后(减少标签维度)
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (job_name, operator_name)
# 使用Recording Rules预聚合
job:flink_records_in_rate:sum
问题2:Dashboard加载慢
# 排查步骤
# 1. 检查面板数量(建议<20个)
# 2. 检查查询复杂度
# 3. 启用查询缓存
# 4. 减少默认时间范围
# 优化方案
# 1. 拆分Dashboard
# 2. 使用变量过滤数据
# 3. 调整刷新频率
refresh: 30s -> 1m
告警不触发
问题1:告警规则不生效
# 检查告警规则状态
# Grafana UI: Alerting > Alert rules
# 常见原因
# 1. 查询返回空结果
# 2. for持续时间未满足
# 3. 告警通知渠道配置错误
# 调试方法
# 1. 在Dashboard中测试查询
# 2. 检查告警历史记录
# 3. 查看Grafana日志
问题2:告警风暴
# 问题:短时间内收到大量重复告警
# 解决方案
# 1. 配置告警分组
route:
group_by: ['alertname', 'job_name']
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
# 2. 配置静默规则
silences:
- matchers:
- name: alertname
value: FlinkJobDown
startsAt: 2025-03-19T10:00:00Z
endsAt: 2025-03-19T12:00:00Z
comment: "维护窗口"
总结:
Grafana作为可视化监控平台,在Flink等大数据场景下发挥着关键作用。通过合理的Dashboard设计、精准的指标监控和及时的告警响应,可以有效保障Flink作业的稳定运行。结合Grafana 12的新特性,如可观测性即代码和动态Dashboard,能够进一步提升监控系统的可维护性和灵活性。
关键要点:
- 指标选择:关注核心指标(吞吐量、延迟、反压、Checkpoint),避免指标过载
- 告警设计:合理设置告警阈值和分级,避免告警疲劳
- 性能优化:使用Recording Rules、查询缓存、合理的刷新频率
- 团队协作:通过Git管理Dashboard配置,实现版本控制和CI/CD
- 持续改进:根据实际运行情况不断优化Dashboard和告警规则
参考来源: