12. Spark
目录
Spark 概述与环境
Spark简介
Apache Spark 是一个快速、通用的大数据处理引擎,专为大规模数据处理而设计。它提供了高级API(Java、Scala、Python、R),并支持用于SQL查询、流处理、机器学习和图形处理的优化引擎。
Spark特点与优势
核心特点:
- 速度快:内存计算比Hadoop MapReduce快100倍,磁盘计算快10倍
- 易用性:提供多种语言API,支持80多种高级算子
- 通用性:支持SQL查询、流处理、机器学习、图计算
- 兼容性:可运行在Hadoop、Mesos、Kubernetes、standalone等集群上
技术优势:
| 特性 | Spark | Hadoop MapReduce |
|---|---|---|
| 计算模式 | 内存计算 + 磁盘存储 | 磁盘计算 |
| 数据共享 | RDD内存共享 | 磁盘文件系统 |
| 迭代计算 | 支持高效迭代 | 效率低 |
| 实时处理 | 支持流处理 | 仅批处理 |
| 容错机制 | RDD血统恢复 | 数据副本 |
| 开发效率 | 代码简洁 | 代码复杂 |
Spark vs Hadoop MapReduce
graph TD
A[数据输入] --> B{计算引擎}
B -->|MapReduce| C[Map阶段]
C --> D[Shuffle写磁盘]
D --> E[Reduce阶段]
E --> F[结果写HDFS]
B -->|Spark| G[RDD转换]
G --> H[内存计算]
H --> I[结果输出]
style C fill:#ffcccb
style D fill:#ffcccb
style E fill:#ffcccb
style F fill:#ffcccb
style G fill:#90EE90
style H fill:#90EE90
style I fill:#90EE90
性能对比:
- 内存计算:Spark在内存中缓存数据,避免重复I/O
- DAG执行:Spark将作业构建为DAG,优化执行计划
- Pipelining:Spark支持算子流水线,减少中间数据存储
- 代码生成:Catalyst优化器生成高效的Java代码
Spark应用场景
典型应用领域:
| 场景 | 描述 | 优势 |
|---|---|---|
| 数据ETL | 大规模数据清洗、转换、加载 | 处理速度快,支持多种数据源 |
| 实时流处理 | 实时数据分析、监控告警 | 低延迟,高吞吐量 |
| 机器学习 | 大规模机器学习模型训练 | MLlib生态,迭代计算优势 |
| 交互式查询 | 即席查询、数据探索 | SQL支持,响应速度快 |
| 图计算 | 社交网络分析、推荐系统 | GraphX图处理能力 |
Spark生态系统
核心组件
Spark核心组件详解表:
| 组件 | 主要功能 | 核心API | 关键特性 | 适用场景 | 性能特点 |
|---|---|---|---|---|---|
| Spark Core | 基础运行时引擎 | RDD | 内存计算、懒加载、容错 | 数据处理基础 | 高速内存计算 |
| Spark SQL | 结构化数据处理 | DataFrame/Dataset | SQL支持、Catalyst优化 | 数据分析、ETL | 查询优化 |
| Spark Streaming | 流数据处理 | DStream | 微批处理、准实时 | 实时数据处理 | 秒级延迟 |
| Structured Streaming | 统一流批处理 | DataFrame | 端到端一致性、状态管理 | 实时分析 | 毫秒级延迟 |
| MLlib | 机器学习库 | ML Pipeline | 分布式算法、管道API | 大数据机器学习 | 可扩展性强 |
| GraphX | 图计算框架 | Graph/VertexRDD | 图算法、并行计算 | 社交网络、推荐 | 内存图计算 |
Spark 核心概念
RDD核心概念
RDD (Resilient Distributed Dataset) 是Spark的核心抽象,代表一个不可变、可分区、可并行计算的数据集合。
RDD特性
graph TD
A[RDD核心特性] --> B[不可变性<br/>Immutable]
A --> C[分布式<br/>Distributed]
A --> D[弹性容错<br/>Resilient]
A --> E[惰性求值<br/>Lazy Evaluation]
A --> F[分区计算<br/>Partitioned]
B --> B1[数据一旦创建不可修改]
C --> C1[数据分布在集群多个节点]
D --> D1[通过血统信息自动容错]
E --> E1[Transform操作延迟执行]
F --> F1[支持并行计算]
style A fill:#e1f5fe
style B fill:#e8f5e8
style C fill:#fff3e0
style D fill:#ffebee
style E fill:#f3e5f5
style F fill:#fce4ec
RDD的五大特性:
| 特性 | 描述 | 意义 |
|---|---|---|
| 分区列表 | RDD由多个分区组成 | 支持并行计算 |
| 计算函数 | 每个分区都有计算函数 | 定义数据处理逻辑 |
| 依赖关系 | RDD之间的依赖关系 | 支持容错恢复 |
| 分区器 | Key-Value RDD的分区策略 | 优化数据分布 |
| 位置偏好 | 计算分区的最佳位置 | 数据本地性优化 |
RDD操作分类
Transformation vs Action:
graph LR
A[RDD操作] --> B[Transformation<br/>转换操作]
A --> C[Action<br/>行动操作]
B --> D[惰性执行<br/>不立即计算]
B --> E[返回新RDD]
B --> F[构建计算图]
C --> G[立即执行<br/>触发计算]
C --> H[返回结果值]
C --> I[提交作业]
style B fill:#e8f5e8
style C fill:#ffebee
常用Transformation操作:
// 创建RDD
val rdd = sc.parallelize(1 to 100, 4)
// map:一对一转换
val mapRDD = rdd.map(x => x * 2)
// filter:过滤数据
val filterRDD = rdd.filter(x => x % 2 == 0)
// flatMap:一对多转换
val flatMapRDD = rdd.flatMap(x => 1 to x)
// groupByKey:按键分组
val kvRDD = rdd.map(x => (x % 10, x))
val groupedRDD = kvRDD.groupByKey()
// reduceByKey:按键聚合
val reducedRDD = kvRDD.reduceByKey(_ + _)
// join:连接操作
val rdd2 = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val joinedRDD = kvRDD.join(rdd2)
常用Action操作:
// collect:收集所有元素到Driver
val result = rdd.collect()
// count:计算元素数量
val cnt = rdd.count()
// first:获取第一个元素
val firstElement = rdd.first()
// take:获取前n个元素
val firstN = rdd.take(10)
// reduce:聚合所有元素
val sum = rdd.reduce(_ + _)
// foreach:遍历每个元素
rdd.foreach(println)
// saveAsTextFile:保存到文件
rdd.saveAsTextFile("hdfs://output/path")
RDD依赖关系
依赖类型:
graph TD
A[RDD依赖关系] --> B[窄依赖<br/>Narrow Dependency]
A --> C[宽依赖<br/>Wide Dependency]
B --> D[一对一映射<br/>1:1 Mapping]
B --> E[同一Stage内<br/>Pipeline执行]
B --> F[局部失败恢复]
C --> G[一对多映射<br/>1:N Mapping]
C --> H[需要Shuffle<br/>跨Stage执行]
C --> I[全量重新计算]
style B fill:#e8f5e8
style C fill:#ffebee
窄依赖示例:
// map, filter, union等操作产生窄依赖
val rdd1 = sc.parallelize(1 to 10, 2)
val rdd2 = rdd1.map(_ * 2) // 窄依赖
val rdd3 = rdd2.filter(_ > 10) // 窄依赖
宽依赖示例:
// groupByKey, reduceByKey, join等操作产生宽依赖
val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")), 2)
val rdd2 = rdd1.groupByKey() // 宽依赖,需要Shuffle
val rdd3 = rdd1.reduceByKey(_ + _) // 宽依赖,需要Shuffle
DataFrame与Dataset
DataFrame概念
DataFrame 是Spark SQL的核心抽象,是一个以命名列方式组织的分布式数据集,类似于关系数据库中的表。
DataFrame特点:
- 结构化数据:具有明确的Schema定义
- 优化执行:使用Catalyst优化器
- 多语言支持:Scala、Java、Python、R
- 丰富API:SQL风格和函数式API
DataFrame创建:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
val spark = SparkSession.builder()
.appName("DataFrameExample")
.getOrCreate()
import spark.implicits._
// 方式1:从RDD创建
val rdd = sc.parallelize(Seq(("Alice", 25), ("Bob", 30), ("Charlie", 35)))
val df1 = rdd.toDF("name", "age")
// 方式2:从序列创建
val df2 = Seq(("Alice", 25), ("Bob", 30)).toDF("name", "age")
// 方式3:从外部数据源创建
val df3 = spark.read.json("path/to/file.json")
val df4 = spark.read.parquet("path/to/file.parquet")
// 方式4:通过Schema创建
val schema = StructType(Seq(
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
))
val df5 = spark.createDataFrame(rdd, schema)
Dataset概念
Dataset 是DataFrame的扩展,提供了类型安全的面向对象编程接口。
Dataset特点:
- 类型安全:编译时类型检查
- 优化执行:享受Catalyst优化器
- 函数式API:支持lambda表达式
- 编码器支持:自动序列化/反序列化
Dataset创建:
// 定义样例类
case class Person(name: String, age: Int, city: String)
// 方式1:从序列创建
val ds1 = Seq(
Person("Alice", 25, "Beijing"),
Person("Bob", 30, "Shanghai")
).toDS()
// 方式2:从DataFrame转换
val ds2 = df.as[Person]
// 方式3:从外部数据源创建
val ds3 = spark.read.json("path/to/file.json").as[Person]
三者对比分析
RDD vs DataFrame vs Dataset 全面对比:
| 特性 | RDD | DataFrame | Dataset |
|---|---|---|---|
| 数据抽象 | 分布式对象集合 | 结构化数据表 | 类型安全的数据表 |
| 编译时检查 | ❌ 运行时错误 | ❌ 运行时错误 | ✅ 编译时错误 |
| 执行优化 | ❌ 无优化 | ✅ Catalyst优化 | ✅ Catalyst优化 |
| 代码生成 | ❌ 无 | ✅ 有 | ✅ 有 |
| 序列化 | Java/Kryo序列化 | Tungsten二进制格式 | Tungsten二进制格式 |
| API风格 | 函数式 | SQL + 函数式 | 类型安全函数式 |
| 性能 | 低 | 高 | 高 |
| 易用性 | 复杂 | 简单 | 中等 |
| 适用场景 | 低级操作、非结构化数据 | SQL查询、结构化数据 | 类型安全要求高 |
性能对比:
// 性能测试示例
import org.apache.spark.sql.functions._
// RDD方式 - 性能较低
val rddResult = rdd.filter(_.age > 25)
.map(p => (p.city, 1))
.reduceByKey(_ + _)
.collect()
// DataFrame方式 - 性能优化
val dfResult = df.filter($"age" > 25)
.groupBy("city")
.count()
.collect()
// Dataset方式 - 类型安全 + 性能优化
val dsResult = ds.filter(_.age > 25)
.groupByKey(_.city)
.count()
.collect()
选择建议:
graph TD
A[选择数据抽象] --> B{数据类型}
B -->|非结构化| C[使用RDD]
B -->|结构化| D{类型安全要求}
D -->|不需要| E[使用DataFrame]
D -->|需要| F[使用Dataset]
C --> G[复杂数据处理<br/>底层控制]
E --> H[SQL查询<br/>高性能要求]
F --> I[类型安全<br/>编译时检查]
style C fill:#ffebee
style E fill:#e8f5e8
style F fill:#e1f5fe
分区机制
分区策略
分区的重要性:
- 并行度控制:分区数决定任务并行度
- 数据本地性:减少网络传输
- 负载均衡:避免数据倾斜
- 资源利用:充分利用集群资源
分区器类型:
| 分区器 | 适用数据类型 | 分区策略 | 使用场景 |
|---|---|---|---|
| HashPartitioner | Key-Value RDD | Hash(key) % numPartitions | 均匀分布的键 |
| RangePartitioner | 可排序的Key-Value RDD | 按键值范围分区 | 有序数据查询 |
| 自定义分区器 | 任意类型 | 用户定义逻辑 | 特殊业务需求 |
分区操作示例:
// 创建带分区的RDD
val rdd = sc.parallelize(1 to 100, 4) // 4个分区
// 查看分区信息
println(s"分区数: ${rdd.getNumPartitions}")
println(s"分区内容: ${rdd.glom().collect().map(_.toList).toList}")
// 重新分区
val repartitionedRDD = rdd.repartition(8) // 增加分区数
val coalescedRDD = rdd.coalesce(2) // 减少分区数
// Key-Value RDD分区
val kvRDD = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")), 2)
// 使用HashPartitioner
val hashPartitioned = kvRDD.partitionBy(new HashPartitioner(3))
// 使用RangePartitioner
val rangePartitioned = kvRDD.partitionBy(new RangePartitioner(3, kvRDD))
自定义分区器:
import org.apache.spark.Partitioner
// 自定义分区器:按用户ID的地区分区
class RegionPartitioner(regions: Array[String]) extends Partitioner {
override def numPartitions: Int = regions.length
override def getPartition(key: Any): Int = {
val userId = key.asInstanceOf[String]
val region = getUserRegion(userId)
math.abs(regions.indexOf(region)) % numPartitions
}
private def getUserRegion(userId: String): String = {
// 根据用户ID确定地区的业务逻辑
userId.substring(0, 2) match {
case "01" | "02" => "North"
case "03" | "04" => "South"
case "05" | "06" => "East"
case _ => "West"
}
}
}
// 使用自定义分区器
val regions = Array("North", "South", "East", "West")
val customPartitioner = new RegionPartitioner(regions)
val customPartitioned = kvRDD.partitionBy(customPartitioner)
分区调优
分区数优化:
// 分区数设置原则
val totalCores = 16 // 集群总核心数
val optimalPartitions = totalCores * 2 // 推荐分区数为核心数的2-3倍
// 动态调整分区数
def getOptimalPartitions(dataSize: Long): Int = {
val targetPartitionSize = 128 * 1024 * 1024 // 128MB per partition
math.max(1, (dataSize / targetPartitionSize).toInt)
}
// 分区倾斜检测
def detectPartitionSkew(rdd: RDD[_]): Unit = {
val partitionSizes = rdd.mapPartitionsWithIndex { (index, iter) =>
Iterator((index, iter.size))
}.collect()
val avgSize = partitionSizes.map(_._2).sum / partitionSizes.length
val maxSize = partitionSizes.map(_._2).max
val skewRatio = maxSize.toDouble / avgSize
if (skewRatio > 2.0) {
println(s"警告:检测到分区倾斜,倾斜比例: $skewRatio")
partitionSizes.foreach { case (index, size) =>
println(s"分区 $index: $size 条记录")
}
}
}
分区优化策略:
- 预分区策略:
// 根据数据特征预分区
val userRDD = sc.textFile("hdfs://users/*")
.map(parseUser)
.partitionBy(new HashPartitioner(numPartitions))
.cache() // 缓存预分区的数据
- Coalesce vs Repartition:
// Coalesce:减少分区,避免全量Shuffle
val reducedRDD = largeRDD.coalesce(10)
// Repartition:重新分区,会进行全量Shuffle
val reshuffledRDD = largeRDD.repartition(20)
// 条件分区调整
def smartRepartition[T](rdd: RDD[T], targetPartitions: Int): RDD[T] = {
val currentPartitions = rdd.getNumPartitions
if (targetPartitions < currentPartitions) {
rdd.coalesce(targetPartitions)
} else {
rdd.repartition(targetPartitions)
}
}
- 分区保持策略:
// 使用mapPartitions保持分区结构
val optimizedRDD = rdd.mapPartitions { iter =>
// 分区内处理逻辑
iter.map(processRecord)
}
// 避免破坏分区的操作
val goodRDD = partitionedRDD.mapValues(_ * 2) // 保持分区
val badRDD = partitionedRDD.map(x => (x._1, x._2 * 2)) // 可能破坏分区
Spark 架构与原理
整体架构设计
系统架构总览
Spark分布式系统架构是基于**主从模式(Master-Slave)的分布式计算框架,采用驱动器-执行器(Driver-Executor)**的执行模型。
Spark系统架构层次图:
graph TB
subgraph "应用层 Application Layer"
A1[Spark应用程序]
A2[用户代码 User Code]
end
subgraph "接口层 API Layer"
B1[Spark Core RDD API]
B2[Spark SQL DataFrame/Dataset]
B3[Spark Streaming DStream]
B4[MLlib Pipeline]
B5[GraphX Graph]
end
subgraph "引擎层 Engine Layer"
C1[Catalyst优化器]
C2[DAG调度器]
C3[任务调度器]
C4[内存管理]
C5[Shuffle管理]
end
subgraph "运行时层 Runtime Layer"
D1[Driver进程]
D2[Executor进程]
D3[BlockManager]
D4[RPC通信]
end
subgraph "资源层 Resource Layer"
E1[YARN集群]
E2[Kubernetes]
E3[Mesos集群]
E4[Standalone集群]
end
subgraph "存储层 Storage Layer"
F1[HDFS]
F2[HBase]
F3[S3/OSS]
F4[本地存储]
end
A1 --> B1
A1 --> B2
A1 --> B3
A1 --> B4
A1 --> B5
B2 --> C1
B1 --> C2
B1 --> C3
B1 --> C4
B1 --> C5
C2 --> D1
C3 --> D2
C4 --> D3
C5 --> D4
D1 --> E1
D1 --> E2
D1 --> E3
D1 --> E4
D2 --> F1
D2 --> F2
D2 --> F3
D2 --> F4
style A1 fill:#e1f5fe
style C1 fill:#fff3e0
style D1 fill:#e8f5e8
style E1 fill:#fce4ec
架构分层说明:
| 层次 | 功能职责 | 核心组件 | 技术特点 | 对外接口 |
|---|---|---|---|---|
| 应用层 | 业务逻辑实现 | 用户程序 | 高级抽象API | 编程接口 |
| 接口层 | 统一编程模型 | RDD/DataFrame/DStream | 声明式编程 | 多语言API |
| 引擎层 | 查询优化执行 | Catalyst/DAGScheduler | 自动优化 | 内部接口 |
| 运行时层 | 分布式执行 | Driver/Executor | 容错机制 | RPC通信 |
| 资源层 | 资源管理调度 | 集群管理器 | 弹性伸缩 | 资源API |
| 存储层 | 数据持久化 | 分布式存储 | 高可靠性 | 存储协议 |
Spark完整架构交互图 - 已在之前的章节中定义,展示了各组件间的详细交互关系。
核心组件协作流程:
sequenceDiagram
participant User as 用户应用
participant SC as SparkContext
participant CM as 集群管理器
participant Driver as Driver
participant Executor as Executor
Note over User, Executor: 应用程序生命周期
User->>SC: 1. 创建SparkContext
SC->>CM: 2. 申请集群资源
CM->>Executor: 3. 启动Executor进程
Executor->>Driver: 4. 注册到Driver
Note over User, Executor: 作业执行过程
User->>SC: 5. 提交Action操作
SC->>Driver: 6. 构建DAG图
Driver->>Driver: 7. 划分Stage
Driver->>Executor: 8. 分发Task
Executor->>Executor: 9. 执行计算
Executor->>Driver: 10. 返回结果
Driver->>User: 11. 汇总结果
Note over User, Executor: 资源清理
User->>SC: 12. 停止应用
SC->>Executor: 13. 停止Executor
SC->>CM: 14. 释放资源
部署架构模式
Spark支持多种部署模式,每种模式适用于不同的场景和需求:
部署模式对比分析:
| 部署模式 | 架构特点 | Driver位置 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|---|---|
| 本地模式 | 单机运行 | 本地进程 | 开发测试 | 简单易用 | 无分布式能力 |
| Standalone Client | Spark原生 | 客户端 | 交互式开发 | 实时反馈 | 网络依赖强 |
| Standalone Cluster | Spark原生 | 集群节点 | 生产批处理 | 高可用 | 资源隔离弱 |
| YARN Client | Hadoop集成 | 客户端 | 数据探索 | Hadoop兼容 | 客户端负载重 |
| YARN Cluster | Hadoop集成 | YARN容器 | 企业生产 | 资源隔离好 | 调试困难 |
| Kubernetes | 容器化 | Pod | 云原生 | 弹性伸缩 | 复杂度高 |
| Mesos | 通用调度 | Mesos框架 | 多框架共享 | 细粒度资源 | 配置复杂 |
典型部署架构示例:
graph TB
subgraph "YARN Cluster部署"
Y1[ResourceManager]
Y2[NodeManager 1]
Y3[NodeManager 2]
Y4[ApplicationMaster<br/>Driver]
Y5[Container<br/>Executor]
Y6[Container<br/>Executor]
Y1 --> Y2
Y1 --> Y3
Y2 --> Y4
Y2 --> Y5
Y3 --> Y6
Y4 -.->|调度| Y5
Y4 -.->|调度| Y6
end
subgraph "Kubernetes部署"
K1[Master Node]
K2[Worker Node 1]
K3[Worker Node 2]
K4[Driver Pod]
K5[Executor Pod]
K6[Executor Pod]
K1 --> K2
K1 --> K3
K2 --> K4
K2 --> K5
K3 --> K6
K4 -.->|调度| K5
K4 -.->|调度| K6
end
subgraph "Standalone部署"
S1[Spark Master]
S2[Spark Worker 1]
S3[Spark Worker 2]
S4[Driver]
S5[Executor]
S6[Executor]
S1 --> S2
S1 --> S3
S2 --> S4
S2 --> S5
S3 --> S6
S4 -.->|调度| S5
S4 -.->|调度| S6
end
style Y1 fill:#e1f5fe
style K1 fill:#fff3e0
style S1 fill:#e8f5e8
核心组件原理
SparkContext - 应用程序入口
SparkContext 是Spark应用程序的入口点,负责与集群建立连接。
// SparkContext核心功能
class SparkContext(config: SparkConf) extends Logging {
// 1. 初始化核心组件
private val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus, numCores, mockOutputCommitCoordinator)
private val statusTracker = new SparkStatusTracker(this, sparkUI)
private val taskScheduler = createTaskScheduler(this, master, deployMode)
private val dagScheduler = new DAGScheduler(this)
// 2. 创建RDD
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
.map(pair => pair._2.toString)
}
// 3. 提交作业
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
dagScheduler.runJob(rdd, func, partitions, callSite, resultHandler, localProperties.get)
}
// 4. 资源管理
def stop(): Unit = {
dagScheduler.stop()
taskScheduler.stop()
env.stop()
}
}
Driver Program - 驱动程序
Driver Program 是Spark应用程序的控制中心,运行应用程序的main函数,负责:
- 创建SparkContext:初始化Spark应用程序
- 构建逻辑计划:将用户程序转换为DAG
- 任务调度:将DAG分解为Stage和Task
- 结果收集:收集Executor返回的结果
// Driver程序示例
object WordCount {
def main(args: Array[String]): Unit = {
// 1. 创建SparkContext
val conf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(conf)
try {
// 2. 创建RDD并定义转换操作
val lines = sc.textFile(args(0))
val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
// 3. 触发Action,提交作业
wordCounts.saveAsTextFile(args(1))
} finally {
// 4. 停止SparkContext
sc.stop()
}
}
}
Cluster Manager - 集群管理器
集群管理器是Spark与底层资源管理系统的接口层,负责资源的申请、分配和监控。
集群管理器详细对比:
| 集群管理器 | 部署复杂度 | 资源隔离 | 高可用性 | 动态分配 | 生态集成 | 适用场景 |
|---|---|---|---|---|---|---|
| Standalone | 简单 | 应用级 | 支持 | 支持 | Spark专用 | 开发测试、专用集群 |
| YARN | 中等 | 队列级 | 高 | 支持 | Hadoop生态 | 企业级大数据平台 |
| Mesos | 复杂 | 应用级 | 高 | 支持 | 多框架 | 多租户共享环境 |
| Kubernetes | 中等 | 命名空间 | 高 | 支持 | 云原生 | 容器化微服务环境 |
部署模式对比:
| 部署模式 | Driver位置 | 网络要求 | 交互性 | 故障恢复 | 资源使用 | 使用场景 |
|---|---|---|---|---|---|---|
| Client模式 | 客户端 | 稳定网络连接 | 好 | Driver故障导致应用失败 | 客户端资源 | 交互式开发、调试 |
| Cluster模式 | 集群节点 | 无特殊要求 | 差 | 集群自动恢复 | 集群资源 | 生产环境、批处理 |
YARN模式详解:
// YARN Client模式
spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 10 \
--executor-memory 4g \
--executor-cores 2 \
--class com.example.MyApp \
myapp.jar
// YARN Cluster模式
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \
--executor-memory 4g \
--executor-cores 2 \
--class com.example.MyApp \
myapp.jar
Executor - 任务执行器
Executor 是运行在Worker节点上的JVM进程,是Spark任务的实际执行单元,负责:
- 任务执行:运行Driver分发的Task
- 数据存储:管理缓存和持久化数据
- 结果返回:将计算结果回传给Driver
- 资源管理:管理分配的CPU核心和内存
// Executor核心组件
class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false)
extends Logging {
// 1. 线程池管理
private val threadPool = ThreadUtils.newDaemonCachedThreadPool(
"Executor task launch worker", sparkConf.get(EXECUTOR_CORES), 60)
// 2. 内存管理
private val memoryManager = env.memoryManager
// 3. 存储管理
private val blockManager = env.blockManager
// 4. 任务执行
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
}
// 5. 任务运行器
class TaskRunner(
execBackend: ExecutorBackend,
private val taskDescription: TaskDescription)
extends Runnable {
override def run(): Unit = {
try {
// 反序列化任务
val task = ser.deserialize[Task[Any]](taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
// 执行任务
val value = task.run(
taskAttemptId = taskDescription.taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
// 返回结果
execBackend.statusUpdate(taskDescription.taskId, TaskState.FINISHED, ser.serialize(value))
} catch {
case e: Exception =>
execBackend.statusUpdate(taskDescription.taskId, TaskState.FAILED, ser.serialize(TaskFailedReason))
}
}
}
}
任务调度机制
调度框架总览
Spark完整架构交互图:
graph TB
subgraph "客户端"
A1[用户应用程序]
A2[spark-submit]
end
subgraph "Driver节点"
B1[SparkContext]
B2[DAGScheduler]
B3[TaskScheduler]
B4[SchedulerBackend]
B5[BlockManagerMaster]
B6[SparkUI]
end
subgraph "集群管理器"
C1[YARN ResourceManager]
C2[Application Master]
C3[NodeManager]
end
subgraph "Worker节点1"
D1[Executor 1]
D2[Task执行]
D3[BlockManager]
D4[MemoryStore]
D5[DiskStore]
D6[ShuffleManager]
end
subgraph "Worker节点2"
E1[Executor 2]
E2[Task执行]
E3[BlockManager]
E4[MemoryStore]
E5[DiskStore]
E6[ShuffleManager]
end
%% 交互关系
A1 --> A2
A2 --> B1
B1 --> B2
B2 --> B3
B3 --> B4
B1 --> B5
B1 --> B6
B4 --> C1
C1 --> C2
C2 --> C3
C3 --> D1
C3 --> E1
D1 --> D2
D1 --> D3
D3 --> D4
D3 --> D5
D1 --> D6
E1 --> E2
E1 --> E3
E3 --> E4
E3 --> E5
E1 --> E6
%% 跨节点通信
B5 -.->|元数据管理| D3
B5 -.->|元数据管理| E3
D3 -.->|数据传输| E3
D6 -.->|Shuffle数据| E6
%% 样式设置
style A1 fill:#e1f5fe
style B1 fill:#fff3e0
style C1 fill:#ffebee
style D1 fill:#e8f5e8
style E1 fill:#e8f5e8
组件职责与交互关系:
| 组件层级 | 核心组件 | 主要职责 | 交互对象 | 通信方式 |
|---|---|---|---|---|
| 客户端 | SparkSubmit | 应用提交与配置 | YARN RM | HTTP/RPC |
| Driver | SparkContext | 应用程序入口 | 所有组件 | RPC调用 |
| Driver | DAGScheduler | Stage划分与调度 | TaskScheduler | 同步调用 |
| Driver | TaskScheduler | Task分发与管理 | SchedulerBackend | 事件驱动 |
| Driver | SchedulerBackend | 集群通信接口 | 集群管理器 | RPC/HTTP |
| Driver | BlockManagerMaster | 存储元数据管理 | BlockManager | RPC通信 |
| 集群 | ResourceManager | 资源分配管理 | ApplicationMaster | RPC通信 |
| Worker | Executor | 任务执行引擎 | Driver | 异步RPC |
| Worker | BlockManager | 数据存储管理 | BlockManagerMaster | RPC通信 |
| Worker | ShuffleManager | Shuffle数据管理 | 其他Executor | 网络传输 |
数据流转路径分析:
sequenceDiagram
participant User as 用户程序
participant Driver as Driver
participant CM as 集群管理器
participant Executor as Executor
Note over User, Executor: 1. 应用程序启动阶段
User->>Driver: 提交Spark应用
Driver->>CM: 申请资源
CM->>Executor: 启动Executor进程
Executor->>Driver: 注册到Driver
Note over User, Executor: 2. 作业执行阶段
User->>Driver: 调用Action操作
Driver->>Driver: 构建DAG,划分Stage
Driver->>Executor: 分发Task
Executor->>Executor: 执行Task
Executor->>Driver: 返回结果
Note over User, Executor: 3. Shuffle数据交换
Executor->>Executor: Shuffle Write
Executor->>Executor: Shuffle Read
Note over User, Executor: 4. 应用程序结束
Driver->>Executor: 停止Executor
Driver->>CM: 释放资源
Driver->>User: 返回最终结果
DAG调度器原理
DAGScheduler 是Spark作业调度的核心组件,负责将用户提交的RDD DAG分解为多个Stage,并按照依赖关系顺序提交给TaskScheduler执行。它是连接高层RDD操作和底层任务执行的关键桥梁。
主要功能:
- DAG构建与分析:将RDD的转换操作构建成有向无环图
- Stage划分:根据宽依赖边界将DAG划分为多个Stage
- 任务调度:按照Stage依赖关系进行拓扑排序和调度
- 容错处理:处理任务失败、Stage重试等容错逻辑
- 资源管理:与TaskScheduler协调进行资源分配
graph TD
A[用户调用Action] --> B[SparkContext.runJob]
B --> C[DAGScheduler.runJob]
C --> D[创建ActiveJob]
D --> E[提交JobSubmitted事件]
E --> F[handleJobSubmitted]
F --> G[创建ResultStage]
G --> H[getOrCreateParentStages]
H --> I[递归分析RDD依赖]
I --> J{依赖类型判断}
J -->|窄依赖| K[继续向上遍历]
J -->|宽依赖| L[创建ShuffleMapStage]
K --> I
L --> M[submitStage]
M --> N[getMissingParentStages]
N --> O{父Stage是否完成}
O -->|未完成| P[递归提交父Stage]
O -->|已完成| Q[submitMissingTasks]
P --> M
Q --> R[创建TaskSet]
R --> S[TaskScheduler.submitTasks]
S --> T[任务分发与执行]
T --> U[Stage完成]
U --> V[检查后续Stage]
V --> W[Job完成]
style A fill:#e1f5fe
style G fill:#fff3e0
style L fill:#ffebee
style W fill:#c8e6c9
DAGScheduler架构组件:
| 组件 | 类名 | 主要职责 | 关键方法 |
|---|---|---|---|
| DAGScheduler | DAGScheduler | 作业调度和Stage划分 | submitJob, submitStage |
| EventLoop | DAGSchedulerEventProcessLoop | 事件处理循环 | post, onReceive |
| Stage | Stage, ResultStage, ShuffleMapStage | Stage抽象 | findMissingPartitions |
| Job | ActiveJob | 作业抽象 | numFinished, numPartitions |
事件处理流程图:
graph TD
A[DAGScheduler事件] --> B[EventProcessLoop]
B --> C{事件类型}
C -->|JobSubmitted| D[handleJobSubmitted]
C -->|StageCompleted| E[handleStageCompleted]
C -->|TaskCompleted| F[handleTaskCompleted]
C -->|TaskFailed| G[handleTaskFailed]
C -->|ExecutorLost| H[handleExecutorLost]
D --> I[创建Stage]
E --> J[检查后续Stage]
F --> K[更新Stage状态]
G --> L[重试或失败处理]
H --> M[重新提交受影响Stage]
I --> N[提交Stage]
J --> N
K --> O[Stage完成检查]
L --> P[容错处理]
M --> N
style B fill:#e1f5fe
style N fill:#e8f5e8
style P fill:#ffebee
// DAGScheduler事件类型定义
sealed trait DAGSchedulerEvent
case class JobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) extends DAGSchedulerEvent
case class StageCompleted(stage: Stage) extends DAGSchedulerEvent
case class TaskCompleted(task: Task[_], reason: TaskEndReason) extends DAGSchedulerEvent
case class TaskFailed(taskId: Long, reason: TaskFailedReason) extends DAGSchedulerEvent
case class ExecutorLost(execId: String, reason: ExecutorLossReason) extends DAGSchedulerEvent
// 事件处理循环实现
class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") {
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = dagScheduler.metricsSource.messageProcessingTimer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case StageCompleted(stage) =>
dagScheduler.handleStageCompleted(stage)
case TaskCompleted(task, reason) =>
dagScheduler.handleTaskCompleted(task, reason)
case TaskFailed(taskId, reason) =>
dagScheduler.handleTaskFailed(taskId, reason)
case ExecutorLost(execId, reason) =>
dagScheduler.handleExecutorLost(execId, reason)
}
}
DAGScheduler主要源码:
// DAGScheduler.scala - 核心调度逻辑
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
// 事件处理循环
private val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
// Stage和Job管理
private val stageIdToStage = new HashMap[Int, Stage]
private val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
private val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private val activeJobs = new HashSet[ActiveJob]
// 等待和运行中的Stage
private val waitingStages = new HashSet[Stage]
private val runningStages = new HashSet[Stage]
private val failedStages = new HashSet[Stage]
// 1. 提交作业的核心方法
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter, Duration.Inf)
waiter.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
throw exception
}
}
// 2. 提交作业
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// 检查分区有效性
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// 空分区直接返回
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
// 提交JobSubmitted事件
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter, properties))
waiter
}
// 3. 处理作业提交
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
var finalStage: ResultStage = null
try {
// 创建ResultStage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// 创建ActiveJob
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId)
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// 提交Stage
submitStage(finalStage)
}
// 4. Stage划分核心算法
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
val parents = new ArrayBuffer[Stage]()
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]): Unit = {
if (!visited(r)) {
visited += r
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
// 宽依赖,创建新的ShuffleMapStage
parents += getOrCreateShuffleMapStage(shufDep, firstJobId)
case _ =>
// 窄依赖,递归访问父RDD
visit(dep.rdd)
}
}
}
}
visit(rdd)
parents.toList
}
// 5. 创建或获取ShuffleMapStage
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) =>
stage
case None =>
// 递归创建父Stage
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
createShuffleMapStage(shuffleDep, firstJobId)
}
}
// 6. 提交Stage
private def submitStage(stage: Stage): Unit = {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
// 7. 提交缺失的任务
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
// 获取缺失的分区
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// 添加到运行中的Stage
runningStages += stage
// 创建任务
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, serializedTaskMetrics)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, serializedTaskMetrics)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
if (tasks.size > 0) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${partitionsToCompute.take(15).mkString(", ")})")
// 提交TaskSet给TaskScheduler
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
} else {
// 没有任务需要运行,标记Stage完成
markStageAsFinished(stage, None)
val debugString = stage match {
case stage: ShuffleMapStage =>
s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})"
case stage : ResultStage =>
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
submitWaitingChildStages(stage)
}
}
// 8. 处理任务完成
def handleTaskCompletion(event: CompletionEvent): Unit = {
val task = event.task
val taskId = event.taskInfo.taskId
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
// 更新累加器
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = AccumulatorContext.get(id)
if (acc != null) {
acc.asInstanceOf[AccumulatorV2[Any, Any]].merge(partialValue.asInstanceOf[Any])
}
}
// 处理不同的任务结果
event.reason match {
case Success =>
task match {
case rt: ResultTask[_, _] =>
// ResultTask完成
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
job.finished(rt.outputId) = true
job.numFinished += 1
// 调用结果处理器
job.listener.taskSucceeded(rt.outputId, event.result)
// 检查Job是否完成
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
}
case smt: ShuffleMapTask =>
// ShuffleMapTask完成
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
shuffleStage.addOutputLoc(smt.partitionId, event.result.asInstanceOf[MapStatus])
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)
// 提交等待中的子Stage
submitWaitingChildStages(shuffleStage)
}
}
case _: TaskFailedException =>
// 任务失败处理
handleTaskFailure(task, event.reason.asInstanceOf[TaskFailedException])
}
}
}
Stage划分算法详解
Stage划分原理流程图:
graph TD
A[开始Stage划分] --> B[从最终RDD开始]
B --> C[遍历RDD依赖]
C --> D{依赖类型}
D -->|窄依赖| E[加入当前Stage]
D -->|宽依赖| F[创建新Stage边界]
E --> G[继续遍历父RDD]
F --> H[创建ShuffleMapStage]
G --> C
H --> I[递归处理父RDD]
I --> C
C --> J{是否还有未处理RDD}
J -->|是| C
J -->|否| K[Stage划分完成]
style A fill:#e1f5fe
style F fill:#ffebee
style H fill:#fff3e0
style K fill:#e8f5e8
Stage划分与依赖管理源码:
// Stage划分核心逻辑
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
val parents = new ArrayBuffer[Stage]()
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]): Unit = {
if (!visited(r)) {
visited += r
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
// 宽依赖,创建新的ShuffleMapStage
parents += getOrCreateShuffleMapStage(shufDep, firstJobId)
case _ =>
// 窄依赖,递归访问父RDD
visit(dep.rdd)
}
}
}
}
visit(rdd)
parents.toList
}
// 查找缺失的父依赖
private def getMissingAncestorShuffleDependencies(
rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
ancestors.push(shuffleDep)
waitingForVisit.push(shuffleDep.rdd)
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
ancestors
}
// 创建ResultStage
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
// 创建ShuffleMapStage
private def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ") as input to " +
"shuffle " + shuffleDep.shuffleId)
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage
}
容错处理流程图:
graph TD
A[Task执行失败] --> B{失败原因分析}
B -->|FetchFailed| C[Shuffle数据获取失败]
B -->|TaskKilled| D[任务被杀死]
B -->|ExceptionFailure| E[任务执行异常]
B -->|ExecutorLostFailure| F[Executor丢失]
C --> G[标记父Stage失败]
G --> H[重新提交父Stage]
H --> I[重新计算Shuffle数据]
D --> J[检查重试次数]
E --> J
J --> K{是否超过最大重试}
K -->|否| L[重新调度Task]
K -->|是| M[标记Stage失败]
F --> N[移除丢失的Executor]
N --> O[重新分配资源]
O --> P[重新提交所有Task]
L --> Q[Task重新执行]
I --> Q
P --> Q
M --> R[Job失败]
Q --> S[执行成功]
style A fill:#ffebee
style C fill:#fff3e0
style R fill:#ffcdd2
style S fill:#e8f5e8
graph TD
A[RDD DAG] --> B[DAGScheduler]
B --> C[Stage划分]
C --> D[Stage 0<br/>ShuffleMapStage]
C --> E[Stage 1<br/>ShuffleMapStage]
C --> F[Stage 2<br/>ResultStage]
D --> G[Task 0-1]
D --> H[Task 0-2]
E --> I[Task 1-1]
E --> J[Task 1-2]
F --> K[Task 2-1]
F --> L[Task 2-2]
style B fill:#e1f5fe
style D fill:#fff3e0
style E fill:#fff3e0
style F fill:#e8f5e8
DAGScheduler架构组件:
| 组件 | 类名 | 主要职责 | 关键方法 |
|---|---|---|---|
| DAGScheduler | DAGScheduler | 作业调度和Stage划分 | submitJob, submitStage |
| EventLoop | DAGSchedulerEventProcessLoop | 事件处理循环 | post, onReceive |
| Stage | Stage, ResultStage, ShuffleMapStage | Stage抽象 | findMissingPartitions |
| Job | ActiveJob | 作业抽象 | numFinished, numPartitions |
DAGScheduler事件处理:
// DAGScheduler事件类型
sealed trait DAGSchedulerEvent
case class JobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) extends DAGSchedulerEvent
case class StageCompleted(stage: Stage) extends DAGSchedulerEvent
case class TaskCompleted(task: Task[_], reason: TaskEndReason) extends DAGSchedulerEvent
// 事件处理循环
class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") {
override def onReceive(event: DAGSchedulerEvent): Unit = {
event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case StageCompleted(stage) =>
dagScheduler.handleStageCompletion(stage)
case TaskCompleted(task, reason) =>
dagScheduler.handleTaskCompletion(task, reason)
}
}
}
Stage划分与依赖管理:
// Stage划分核心逻辑
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
val parents = new ArrayBuffer[Stage]()
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]): Unit = {
if (!visited(r)) {
visited += r
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
// 宽依赖,创建新的ShuffleMapStage
parents += getOrCreateShuffleMapStage(shufDep, firstJobId)
case _ =>
// 窄依赖,递归访问父RDD
visit(dep.rdd)
}
}
}
}
visit(rdd)
parents.toList
}
任务调度器实现
TaskScheduler 负责将Task分发到Executor执行,支持多种调度策略。
graph TD
A[DAGScheduler提交TaskSet] --> B[TaskSchedulerImpl.submitTasks]
B --> C[创建TaskSetManager]
C --> D[添加到调度池Pool]
D --> E[SchedulerBackend.reviveOffers]
E --> F[收集Executor资源信息]
F --> G[TaskSchedulerImpl.resourceOffers]
G --> H[按本地性级别分配Task]
H --> I[创建TaskDescription]
I --> J[SchedulerBackend.launchTasks]
J --> K[发送Task到Executor]
K --> L[Executor执行Task]
L --> M[返回执行结果]
M --> N[TaskScheduler.statusUpdate]
N --> O[更新Task状态]
O --> P{TaskSet是否完成}
P -->|否| Q[继续调度剩余Task]
P -->|是| R[通知DAGScheduler]
Q --> G
R --> S[Stage完成]
style A fill:#e1f5fe
style G fill:#fff3e0
style L fill:#e8f5e8
style S fill:#c8e6c9
TaskScheduler架构组件
| 组件 | 类名 | 主要职责 | 关键特性 |
|---|---|---|---|
| TaskScheduler | TaskSchedulerImpl | 任务调度和分发 | 支持多种调度策略 |
| SchedulerBackend | CoarseGrainedSchedulerBackend | 与集群管理器通信 | 资源分配和Executor管理 |
| TaskSetManager | TaskSetManager | 管理TaskSet执行 | 任务重试、推测执行 |
| Pool | Pool | 调度池管理 | 公平调度、FIFO调度 |
Task分发和执行完整流程图
graph TD
A[DAGScheduler提交TaskSet] --> B[TaskScheduler.submitTasks]
B --> C[创建TaskSetManager]
C --> D[添加到调度池]
D --> E[SchedulerBackend.reviveOffers]
E --> F[收集Executor资源信息]
F --> G[创建WorkerOffer列表]
G --> H[TaskScheduler.resourceOffers]
H --> I[遍历TaskSetManager队列]
I --> J[按本地性级别分配]
J --> K{PROCESS_LOCAL可用?}
K -->|是| L[分配到指定Executor]
K -->|否| M{NODE_LOCAL可用?}
M -->|是| N[分配到指定Host]
M -->|否| O{RACK_LOCAL可用?}
O -->|是| P[分配到指定Rack]
O -->|否| Q[ANY级别分配]
L --> R[创建TaskDescription]
N --> R
P --> R
Q --> R
R --> S[序列化Task]
S --> T[SchedulerBackend.launchTasks]
T --> U[发送LaunchTask消息]
U --> V[Executor接收任务]
V --> W[创建TaskRunner]
W --> X[提交到线程池执行]
X --> Y[Task.run执行]
Y --> Z[返回执行结果]
Z --> AA[StatusUpdate消息]
AA --> BB[TaskScheduler.statusUpdate]
BB --> CC[更新TaskSetManager状态]
CC --> DD{TaskSet完成?}
DD -->|否| E
DD -->|是| EE[通知DAGScheduler]
style A fill:#e1f5fe
style H fill:#fff3e0
style Y fill:#e8f5e8
style EE fill:#c8e6c9
Stage划分机制
Stage划分原则:
- 宽依赖边界:遇到宽依赖(Shuffle)划分新Stage
- 窄依赖合并:窄依赖的RDD在同一Stage内Pipeline执行
- Stage类型:ShuffleMapStage和ResultStage
graph TD
A[textFile] --> B[flatMap]
B --> C[map]
C --> D[reduceByKey]
D --> E[map]
E --> F[collect]
subgraph "Stage 0 (ShuffleMapStage)"
A
B
C
end
subgraph "Stage 1 (ResultStage)"
E
F
end
C -.->|Shuffle| E
style A fill:#e8f5e8
style B fill:#e8f5e8
style C fill:#e8f5e8
style E fill:#e1f5fe
style F fill:#e1f5fe
本地性调度策略
Spark任务调度的核心是数据本地性,通过就近调度减少网络传输开销。
本地性级别优先级:
| 本地性级别 | 描述 | 数据位置 | 性能影响 | 优先级 |
|---|---|---|---|---|
| PROCESS_LOCAL | 进程本地 | 同一JVM进程的内存中 | 最快 | 最高 |
| NODE_LOCAL | 节点本地 | 同一节点的不同进程 | 快 | 高 |
| NO_PREF | 无偏好 | 数据分布均匀 | 一般 | 中等 |
| RACK_LOCAL | 机架本地 | 同一机架的不同节点 | 慢 | 低 |
| ANY | 任意位置 | 跨机架或远程 | 最慢 | 最低 |
本地性调度实现:
// TaskSetManager中的本地性调度实现
class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
val blacklistTracker: Option[BlacklistTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
// 本地性级别定义
private val myLocalityLevels = computeValidLocalityLevels()
private val localityWaits = myLocalityLevels.map(getLocalityWait)
// 计算有效的本地性级别
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality._
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty) {
levels += PROCESS_LOCAL
}
if (!pendingTasksForHost.isEmpty) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty) {
levels += RACK_LOCAL
}
levels += ANY
logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
levels.toArray
}
// 获取本地性等待时间
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
val defaultWait = sched.conf.get("spark.locality.wait", "3s")
val localityWaitKey = level match {
case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
case _ => null
}
if (localityWaitKey != null) {
sched.conf.getTimeAsMs(localityWaitKey, defaultWait)
} else {
0L
}
}
// 为特定Executor和本地性级别查找任务
private def findTask(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality): Option[(Int, TaskLocality.TaskLocality)] = {
for (locality <- TaskLocality.values) {
if (isAllowedLocality(locality, maxLocality)) {
val task = locality match {
case TaskLocality.PROCESS_LOCAL =>
findTaskFromList(pendingTasksForExecutor.getOrElse(execId, ArrayBuffer()))
case TaskLocality.NODE_LOCAL =>
findTaskFromList(pendingTasksForHost.getOrElse(host, ArrayBuffer()))
case TaskLocality.NO_PREF =>
findTaskFromList(pendingTasksWithNoPrefs)
case TaskLocality.RACK_LOCAL =>
val rack = sched.getRackForHost(host)
if (rack != null) {
findTaskFromList(pendingTasksForRack.getOrElse(rack, ArrayBuffer()))
} else {
None
}
case TaskLocality.ANY =>
findTaskFromList(allPendingTasks)
}
if (task.isDefined) {
return Some((task.get, locality))
}
}
}
None
}
// 检查是否允许特定本地性级别
private def isAllowedLocality(
taskLocality: TaskLocality.TaskLocality,
maxLocality: TaskLocality.TaskLocality): Boolean = {
TaskLocality.isAllowed(taskLocality, maxLocality)
}
}
本地性调度流程图:
graph TD
A[收到资源Offer] --> B[遍历TaskSet]
B --> C[检查PROCESS_LOCAL]
C --> D{有进程本地任务?}
D -->|是| E[分配PROCESS_LOCAL任务]
D -->|否| F[检查NODE_LOCAL]
F --> G{有节点本地任务?}
G -->|是| H[检查等待时间]
H --> I{等待时间未到?}
I -->|是| J[跳过,等待更好本地性]
I -->|否| K[分配NODE_LOCAL任务]
G -->|否| L[检查NO_PREF]
L --> M{有无偏好任务?}
M -->|是| N[分配NO_PREF任务]
M -->|否| O[检查RACK_LOCAL]
O --> P{有机架本地任务?}
P -->|是| Q[分配RACK_LOCAL任务]
P -->|否| R[分配ANY任务]
E --> S[创建TaskDescription]
K --> S
N --> S
Q --> S
R --> S
J --> T[等待下次调度]
style A fill:#e1f5fe
style E fill:#e8f5e8
style K fill:#e8f5e8
style S fill:#c8e6c9
推测执行与容错
推测执行 是Spark处理慢任务(Straggler)的重要机制,通过运行重复任务来提高整体性能。
推测执行触发条件:
- 任务运行时间超过中位数的1.5倍
- 任务进度明显落后于其他任务
- 节点或网络出现性能问题
// 推测执行实现
class TaskSetManager {
// 推测执行配置
private val speculationEnabled = sched.conf.getBoolean("spark.speculation", false)
private val speculationQuantile = sched.conf.getDouble("spark.speculation.quantile", 0.75)
private val speculationMultiplier = sched.conf.getDouble("spark.speculation.multiplier", 1.5)
// 检查是否需要推测执行
def checkSpeculatableTasks(minFinishedForSpeculation: Int): Boolean = {
if (!speculationEnabled || taskSet.stageId < 0 || taskSet.stageAttemptId < 0) {
return false
}
var foundTasks = false
val finishedTasks = runningTasksSet.size - pendingPartitions.size
if (finishedTasks >= minFinishedForSpeculation && finishedTasks > 0) {
val time = clock.getTimeMillis()
val medianDuration = medianTaskDuration
if (medianDuration > 0) {
val threshold = speculationMultiplier * medianDuration
// 检查每个正在运行的任务
for ((taskId, info) <- runningTasks) {
val runtime = time - info.launchTime
if (runtime > threshold && !speculatableTasks.contains(taskId)) {
logInfo(s"Marking task $taskId in stage ${taskSet.stageId} (on ${info.host}) " +
s"as speculatable because it ran more than ${threshold.toDouble / 1000.0} seconds")
speculatableTasks += taskId
foundTasks = true
}
}
}
}
foundTasks
}
// 获取可推测的任务
def dequeueSpeculativeTask(
execId: String,
host: String,
locality: TaskLocality.TaskLocality): Option[(Int, TaskLocality.TaskLocality)] = {
speculatableTasks.retain(index => runningTasks.contains(index))
if (speculatableTasks.nonEmpty) {
// 检查是否有适合该Executor的推测任务
for (index <- speculatableTasks) {
val locations = tasks(index).preferredLocations
val allowedLocality = getAllowedLocalityLevel(clock.getTimeMillis())
if (TaskLocality.isAllowed(locality, allowedLocality)) {
if (isTaskLocalToExecutor(index, execId, host, locality)) {
speculatableTasks -= index
return Some((index, locality))
}
}
}
}
None
}
// 处理推测任务完成
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
// 如果这是推测任务的成功,取消其他副本
if (speculatableTasks.contains(index)) {
for ((taskId, taskInfo) <- runningTasks) {
if (taskInfo.index == index && taskId != tid) {
logInfo(s"Killing speculative task $taskId")
sched.backend.killTask(taskId, taskInfo.executorId, interruptThread = true,
reason = "another copy of the task succeeded")
}
}
speculatableTasks -= index
}
// 标记任务成功
successful(index) = true
removeRunningTask(tid)
// 如果所有任务都完成,通知DAGScheduler
if (tasksSuccessful == numTasks) {
isZombie = true
}
}
}
容错处理机制详解:
Spark在任务级别提供了完善的容错机制,包括任务重试、Stage重新计算、黑名单管理等。
故障类型与处理策略:
| 故障类型 | 触发条件 | 处理策略 | 影响范围 |
|---|---|---|---|
| 任务失败 | Task执行异常 | 重新调度任务 | 单个Task |
| Executor丢失 | Executor进程崩溃 | 重新调度所有运行中任务 | Executor上所有Task |
| Worker节点失败 | 节点不可达 | 黑名单机制,重新分配 | 节点上所有Executor |
| 网络分区 | 网络不稳定 | 推测执行 + 重试 | 部分Task |
// 故障处理实现
class TaskSetManager {
// 处理任务失败
def handleFailedTask(
tid: Long,
state: TaskState,
reason: TaskFailedReason): Unit = {
val info = taskInfos(tid)
val index = info.index
removeRunningTask(tid)
reason match {
case fetchFailed: FetchFailed =>
// Shuffle数据获取失败,需要重新计算上游Stage
logWarning(s"Lost task ${info.id} in stage ${taskSet.stageId} (TID $tid, ${info.host}, " +
s"executor ${info.executorId}): ${reason}")
if (!successful(index)) {
successful(index) = true
tasksSuccessful += 1
}
// 标记需要重新计算的Stage
abort(s"Task $tid failed with FetchFailedException", Some(fetchFailed))
case exceptionFailure: ExceptionFailure =>
// 任务执行异常
val key = (info.host, info.executorId)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
// 任务失败次数超过阈值,整个TaskSet失败
val errorMessage = s"Task $index failed $maxTaskFailures times, most recent failure: " +
exceptionFailure.description
abort(errorMessage, exceptionFailure.exception)
return
} else {
// 重新加入待调度队列
addPendingTask(index)
}
case taskKilled: TaskKilled =>
// 任务被主动杀死
logWarning(s"Task $tid killed: ${taskKilled.reason}")
case executorLostFailure: ExecutorLostFailure =>
// Executor丢失导致的任务失败
logInfo(s"Task $tid failed because executor ${executorLostFailure.execId} was lost")
addPendingTask(index)
case _ =>
// 其他类型失败
logWarning(s"Lost task $tid in stage ${taskSet.stageId}: $reason")
addPendingTask(index)
}
// 更新失败统计
if (!isZombie) {
taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
info.host, info.executorId, index, reason))
}
}
// 处理Executor丢失
def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
// 重新调度该Executor上运行的所有任务
for ((tid, info) <- runningTasks) {
if (info.executorId == execId) {
val reason = ExecutorLostFailure(execId, exitCausedByApp = true,
Some(s"Executor lost: $reason"))
handleFailedTask(tid, TaskState.FAILED, reason)
}
}
// 从本地性映射中移除该Executor
for {
(host, execIds) <- pendingTasksForHost
if execIds.contains(execId)
} {
execIds -= execId
if (execIds.isEmpty) {
pendingTasksForHost.remove(host)
}
}
// 重新计算有效的本地性级别
recomputeLocality()
}
}
存储与内存管理
BlockManager存储引擎
BlockManager 是Spark中负责数据存储和管理的核心组件,统一管理内存和磁盘上的数据块。
操作时序图
sequenceDiagram
participant Driver
participant BlockManagerMaster
participant Executor
participant BlockManager
participant MemoryStore
participant DiskStore
Driver->>BlockManagerMaster: 1. 注册BlockManager
Executor->>BlockManager: 2. 初始化BlockManager
BlockManager->>BlockManagerMaster: 3. 注册到Master
BlockManager->>MemoryStore: 4. 初始化内存存储
BlockManager->>DiskStore: 5. 初始化磁盘存储
Note over BlockManager,DiskStore: 数据存储流程
BlockManager->>MemoryStore: 6. 尝试内存存储
alt 内存足够
MemoryStore-->>BlockManager: 7a. 存储成功
else 内存不足
BlockManager->>DiskStore: 7b. 磁盘存储
DiskStore-->>BlockManager: 7c. 存储成功
end
BlockManager->>BlockManagerMaster: 8. 报告存储状态
BlockManager核心组件详解
1. BlockManager架构组件
| 组件 | 类名 | 主要职责 | 存储介质 |
|---|---|---|---|
| BlockManager | BlockManager | 数据块管理总控制器 | 内存+磁盘 |
| MemoryStore | MemoryStore | 内存数据存储 | JVM堆内存 |
| DiskStore | DiskStore | 磁盘数据存储 | 本地磁盘 |
| BlockManagerMaster | BlockManagerMaster | 元数据管理 | Driver内存 |
| BlockInfoManager | BlockInfoManager | Block信息管理 | 内存索引 |
2. BlockManager创建与初始化
class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
master: BlockManagerMaster,
serializerManager: SerializerManager,
conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker)
extends BlockDataManager with BlockEvictionHandler with Logging {
// 核心组件初始化
private[spark] val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
private[spark] val blockInfoManager = new BlockInfoManager
// 初始化存储组件
private[spark] val memoryStore = new MemoryStore(conf, blockInfoManager)
private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
// 注册到Master
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
}
3. 数据块存储流程
// 数据块存储的核心方法
def putBlockData(
blockId: BlockId,
data: BlockData,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
// 1. 检查存储级别
if (level.useMemory) {
// 2. 尝试存储到内存
val putSucceeded = memoryStore.putBytes(blockId, data, level)
if (putSucceeded) {
// 3. 通知Master
if (tellMaster) {
reportBlockStatus(blockId, BlockStatus(level, 0, 0))
}
return true
}
}
// 4. 内存不足,存储到磁盘
if (level.useDisk) {
val putSucceeded = diskStore.putBytes(blockId, data)
if (putSucceeded) {
if (tellMaster) {
reportBlockStatus(blockId, BlockStatus(level, 0, data.size))
}
return true
}
}
false
}
4. 数据块获取流程
// 数据块获取的核心方法
def get[T](blockId: BlockId): Option[BlockResult[T]] = {
// 1. 检查本地内存
memoryStore.get(blockId) match {
case Some(blockResult) => return Some(blockResult)
case None => // 继续查找
}
// 2. 检查本地磁盘
diskStore.get(blockId) match {
case Some(blockResult) => return Some(blockResult)
case None => // 继续查找
}
// 3. 从远程获取
getRemote(blockId)
}
def getRemote[T](blockId: BlockId): Option[BlockResult[T]] = {
// 1. 从Master获取block位置
val locations = master.getLocations(blockId)
// 2. 从远程节点获取
for (location <- locations) {
val blockResult = blockTransferService.fetchBlockSync(
location.host, location.port, location.executorId, blockId.toString)
if (blockResult.isDefined) {
return blockResult
}
}
None
}
统一内存管理
Spark内存分区架构:
graph TD
A[JVM内存] --> B[堆内内存<br/>On-Heap]
A --> C[堆外内存<br/>Off-Heap]
B --> D[存储内存<br/>Storage Memory]
B --> E[执行内存<br/>Execution Memory]
B --> F[其他内存<br/>Other Memory]
D --> G[RDD缓存<br/>广播变量]
E --> H[Shuffle<br/>Join聚合]
F --> I[用户数据结构<br/>Spark内部对象]
C --> J[堆外存储<br/>Off-Heap Storage]
C --> K[堆外执行<br/>Off-Heap Execution]
style B fill:#e8f5e8
style C fill:#e1f5fe
style D fill:#fff3e0
style E fill:#ffebee
1. 内存管理架构组件
| 组件 | 类名 | 主要职责 | 管理范围 |
|---|---|---|---|
| MemoryManager | UnifiedMemoryManager | 统一内存管理器 | 堆内+堆外内存 |
| StorageMemoryPool | StorageMemoryPool | 存储内存池 | 缓存数据内存 |
| ExecutionMemoryPool | ExecutionMemoryPool | 执行内存池 | 任务执行内存 |
| MemoryStore | MemoryStore | 内存存储管理 | 缓存数据存储 |
| TaskMemoryManager | TaskMemoryManager | 任务内存管理 | 单个任务内存 |
2. MemoryStore缓存管理
// MemoryStore核心实现
class MemoryStore(
conf: SparkConf,
blockInfoManager: BlockInfoManager)
extends BlockStore(BlockStore.MEMORY) with BlockEvictionHandler with Logging {
// 内存映射表
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
// 当前内存使用量
private var _currentMemory = 0L
def putBytes[T](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
// 1. 检查内存是否足够
if (!memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
return false
}
// 2. 分配内存并存储数据
val bytes = _bytes()
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
_currentMemory += size
}
true
}
def get[T](blockId: BlockId): Option[BlockResult[T]] = {
entries.synchronized {
entries.get(blockId) match {
case entry: SerializedMemoryEntry[T] =>
Some(BlockResult(entry.value.asInstanceOf[T], DataReadMethod.Memory, entry.size))
case entry: DeserializedMemoryEntry[T] =>
Some(BlockResult(entry.value.asInstanceOf[T], DataReadMethod.Memory, entry.size))
case _ => None
}
}
}
}
3. TaskMemoryManager任务内存管理
// TaskMemoryManager核心实现
class TaskMemoryManager(
memoryManager: MemoryManager,
taskAttemptId: Long)
extends MemoryManager with Logging {
// 任务内存映射表
private val memoryForTask = new mutable.HashMap[Long, Long]()
// 内存分配方法
def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = {
// 1. 尝试从执行内存池分配
val acquired = memoryManager.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
// 2. 记录分配的内存
if (acquired > 0) {
memoryForTask.synchronized {
memoryForTask(taskAttemptId) = memoryForTask.getOrElse(taskAttemptId, 0L) + acquired
}
}
acquired
}
// 释放内存
def releaseExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Unit = {
memoryManager.releaseExecutionMemory(numBytes, taskAttemptId, memoryMode)
memoryForTask.synchronized {
val current = memoryForTask.getOrElse(taskAttemptId, 0L)
val newTotal = math.max(0L, current - numBytes)
if (newTotal == 0) {
memoryForTask.remove(taskAttemptId)
} else {
memoryForTask(taskAttemptId) = newTotal
}
}
}
}
缓存与持久化策略
统一内存管理:
class UnifiedMemoryManager(
conf: SparkConf,
val maxHeapMemory: Long,
onHeapStorageRegionSize: Long,
numCores: Int)
extends MemoryManager(conf, numCores, onHeapStorageRegionSize, maxHeapMemory) {
// 内存池配置
private val maxPoolSize = maxHeapMemory - reservedMemory
private val poolSize = maxPoolSize * memoryFraction
// 动态内存分配
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
val (executionPool, storagePool, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapStorageMemory)
}
if (numBytes > maxMemory) {
return false
}
if (numBytes > storagePool.memoryFree) {
// 尝试从执行内存池借用
val memoryBorrowedFromExecution = math.min(
executionPool.memoryFree,
numBytes - storagePool.memoryFree)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
storagePool.acquireMemory(blockId, numBytes)
}
}
Shuffle数据交换
Shuffle 是Spark中数据重新分布的过程,发生在需要跨分区进行数据交换的操作中。
Shuffle类型对比
| Shuffle类型 | 特点 | 优缺点 |
|---|---|---|
| Hash Shuffle | 每个Map Task为每个Reduce Task创建文件 | 文件数过多,影响性能 |
| Sort Shuffle | 每个Map Task创建一个文件,按分区排序 | 减少文件数,提高性能 |
| Tungsten Sort | 使用堆外内存,优化排序性能 | 内存使用更高效 |
Hash Shuffle
graph TD
subgraph "Map阶段"
A[Map Task 1] --> D[File 1-1]
A --> E[File 1-2]
A --> F[File 1-3]
B[Map Task 2] --> G[File 2-1]
B --> H[File 2-2]
B --> I[File 2-3]
C[Map Task 3] --> J[File 3-1]
C --> K[File 3-2]
C --> L[File 3-3]
end
subgraph "Reduce阶段"
D --> M[Reduce Task 1]
G --> M
J --> M
E --> N[Reduce Task 2]
H --> N
K --> N
F --> O[Reduce Task 3]
I --> O
L --> O
end
style A fill:#ffebee
style B fill:#ffebee
style C fill:#ffebee
Hash Shuffle问题:
- 文件数爆炸:M个Map Task × N个Reduce Task = M×N个文件
- 随机I/O:大量小文件导致随机I/O
- 内存压力:需要为每个文件维护缓冲区
Sort Shuffle
graph TD
subgraph "Map阶段"
A[Map Task 1] --> D[排序缓冲区]
D --> E[单个输出文件]
E --> F[索引文件]
B[Map Task 2] --> G[排序缓冲区]
G --> H[单个输出文件]
H --> I[索引文件]
end
subgraph "Reduce阶段"
F --> J[Reduce Task 1]
I --> J
F --> K[Reduce Task 2]
I --> K
end
style D fill:#fff3e0
style G fill:#fff3e0
style E fill:#e8f5e8
style H fill:#e8f5e8
Sort Shuffle优势:
- 文件数减少:每个Map Task只产生一个数据文件和一个索引文件
- 顺序I/O:数据按分区ID排序写入,提高I/O效率
- 内存优化:使用外部排序,支持spill到磁盘
Tungsten优化:
- 堆外内存管理:减少GC压力
- 缓存友好的数据结构:提高CPU缓存命中率
- 代码生成:运行时生成优化的字节码
// Tungsten Sort实现
class UnsafeShuffleWriter[K, V](
blockManager: BlockManager,
shuffleBlockResolver: IndexShuffleBlockResolver,
taskMemoryManager: TaskMemoryManager,
handle: SerializedShuffleHandle[K, V],
mapId: Int,
context: TaskContext)
extends ShuffleWriter[K, V] with Logging {
private val partitioner = handle.dependency.partitioner
private val numPartitions = partitioner.numPartitions
private var sorter: UnsafeShuffleExternalSorter = _
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 使用Tungsten内存管理
val taskContext = context.asInstanceOf[TaskContextImpl]
sorter = UnsafeShuffleExternalSorter.create(
taskContext.taskMemoryManager(),
blockManager,
context,
numPartitions,
shouldCompress = true)
// 序列化并插入记录
while (records.hasNext) {
insertRecordIntoSorter(records.next())
}
// 写出排序结果
val outputFile = shuffleBlockResolver.getDataFile(handle.shuffleId, mapId)
val partitionLengths = sorter.closeAndGetSpills.map(_.file)
.foldLeft(Array.fill[Long](numPartitions)(0)) { (lengths, file) =>
// 合并spill文件
mergeSpillsWithTransferTo(file, outputFile, lengths)
}
shuffleBlockResolver.writeIndexFileAndCommit(handle.shuffleId, mapId, partitionLengths, outputFile)
}
}
Hash Shuffle 时序图:
sequenceDiagram
participant MapTask
participant HashWriter
participant FileSystem
participant ReduceTask
participant HashReader
MapTask->>HashWriter: 写入记录
HashWriter->>FileSystem: 为每个分区创建文件
Note over FileSystem: M×N个文件创建
HashWriter->>FileSystem: 写入数据到对应文件
ReduceTask->>HashReader: 开始读取
HashReader->>FileSystem: 读取相关分区文件
FileSystem-->>HashReader: 返回数据
HashReader-->>ReduceTask: 聚合后数据
Sort Shuffle 时序图:
sequenceDiagram
participant MapTask
participant SortWriter
participant ExternalSorter
participant FileSystem
participant ReduceTask
participant SortReader
MapTask->>SortWriter: 写入记录
SortWriter->>ExternalSorter: 缓存并排序
ExternalSorter->>ExternalSorter: 内存排序/Spill
ExternalSorter->>FileSystem: 写入单个数据文件
SortWriter->>FileSystem: 写入索引文件
ReduceTask->>SortReader: 开始读取
SortReader->>FileSystem: 根据索引读取数据
FileSystem-->>SortReader: 返回分区数据
SortReader-->>ReduceTask: 聚合后数据
Shuffle机制原理
Spark Shuffle拉取流程层次图
graph TD
subgraph "Map端 (ShuffleMapTask)"
A1["Map Task执行"] --> A2["数据写入内存缓冲区<br/>(PartitionedAppendOnlyMap)"]
A2 --> A3{"内存是否充足?"}
A3 -->|否| A4["溢写到磁盘<br/>(Spill Files)"]
A3 -->|是| A5["继续缓存到内存"]
A4 --> A6["ExternalSorter排序"]
A5 --> A6
A6 --> A7["写入本地磁盘<br/>(shuffle_shuffleId_mapId_0.data)"]
A7 --> A8["生成索引文件<br/>(shuffle_shuffleId_mapId_0.index)"]
A8 --> A9["向Driver注册<br/>MapOutputTracker"]
end
subgraph "Driver端 (协调层)"
B1["MapOutputTracker"] --> B2["维护Shuffle数据位置信息<br/>(shuffleId -> Array[MapStatus])"]
B2 --> B3["响应Reduce Task<br/>的位置查询请求"]
end
subgraph "Reduce端 (Reduce Task)"
C1["Reduce Task启动"] --> C2["向MapOutputTracker<br/>请求数据位置"]
C2 --> C3["获取目标数据块位置<br/>(BlockManagerId + BlockId)"]
C3 --> C4["ShuffleBlockFetcherIterator<br/>创建拉取器"]
subgraph "数据拉取层"
C4 --> D1["本地数据拉取<br/>(LocalDirIterator)"]
C4 --> D2["远程数据拉取<br/>(RemoteBlockIterator)"]
D1 --> D3["本地磁盘读取"]
D2 --> D4["网络传输<br/>(NettyBlockTransferService)"]
D3 --> D5["数据反序列化"]
D4 --> D5
end
subgraph "BlockManager处理层"
D5 --> E1["BlockManager接收数据"]
E1 --> E2{"数据大小检查"}
E2 -->|小数据| E3["直接内存缓存<br/>(MemoryStore)"]
E2 -->|大数据| E4["磁盘临时存储<br/>(DiskStore)"]
E3 --> E5["数据解压缩"]
E4 --> E5
end
subgraph "数据处理层"
E5 --> F1["ShuffleBlockFetcherIterator<br/>迭代处理"]
F1 --> F2{"是否需要聚合?"}
F2 -->|是| F3["Aggregator聚合<br/>(combineByKey)"]
F2 -->|否| F4["直接传递数据"]
F3 --> F5{"是否需要排序?"}
F4 --> F5
F5 -->|是| F6["ExternalSorter排序"]
F5 -->|否| F7["返回最终迭代器"]
F6 --> F7
end
end
subgraph "网络传输层"
G1["NettyBlockTransferService"] --> G2["建立网络连接"]
G2 --> G3["分批次拉取数据<br/>(maxBytesInFlight)"]
G3 --> G4["流控制管理<br/>(maxReqsInFlight)"]
G4 --> G5["数据传输完成"]
end
%% 连接关系
A9 -.->|注册| B1
B3 -.->|位置信息| C3
D2 -.->|网络请求| G1
G5 -.->|数据返回| D4
%% 样式设置
style A7 fill:#e1f5fe
style A8 fill:#e1f5fe
style C4 fill:#f3e5f5
style F1 fill:#f3e5f5
style E1 fill:#fff3e0
style G3 fill:#e8f5e8
classDef mapStage fill:#e3f2fd
classDef reduceStage fill:#f1f8e9
classDef networkStage fill:#fff8e1
classDef driverStage fill:#fce4ec
class A1,A2,A3,A4,A5,A6,A7,A8,A9 mapStage
class C1,C2,C3,C4,D1,D2,D3,D4,D5,E1,E2,E3,E4,E5,F1,F2,F3,F4,F5,F6,F7 reduceStage
class G1,G2,G3,G4,G5 networkStage
class B1,B2,B3 driverStage
Shuffle拉取流程关键组件详解
Shuffle拉取流程阶段说明:
| 阶段 | 组件 | 主要功能 | 关键类 | 性能要点 |
|---|---|---|---|---|
| Map端写入 | ShuffleMapTask | 数据分区、排序、写入本地磁盘 | SortShuffleWriterExternalSorter | 内存管理、溢写控制 |
| Driver协调 | MapOutputTracker | 维护Shuffle数据位置信息 | MapOutputTrackerMasterMapStatus | 元数据管理、负载均衡 |
| Reduce端拉取 | ShuffleBlockFetcherIterator | 数据位置查询、批量拉取 | BlockStoreShuffleReaderShuffleBlockFetcherIterator | 网络优化、本地性优先 |
| 网络传输 | NettyBlockTransferService | 建立连接、数据传输、流控制 | NettyBlockTransferServiceTransportClient | 并发控制、批量传输 |
| BlockManager | 存储管理 | 数据缓存、磁盘存储、内存管理 | BlockManagerMemoryStoreDiskStore | 内存/磁盘平衡 |
| 数据处理 | Aggregator/Sorter | 数据聚合、排序、输出 | AggregatorExternalSorter | 算法优化、内存复用 |
Shuffle拉取流程详细步骤:
阶段1:Map端数据写入 (Mapper → 本地磁盘)
- 数据分区:根据Partitioner将数据分配到不同分区
- 内存缓存:优先使用
PartitionedAppendOnlyMap缓存数据 - 溢写控制:内存不足时通过
ExternalSorter溢写到磁盘 - 文件合并:将多个溢写文件和内存数据合并成单个文件
- 索引生成:创建
.index文件记录每个分区的数据位置
阶段2:Driver端元数据管理 (MapOutputTracker)
- 位置注册:Map Task完成后向Driver注册输出位置
- 元数据维护:维护
shuffleId → Array[MapStatus]映射 - 位置查询:响应Reduce Task的数据位置查询请求
- 负载均衡:根据数据大小优化Reduce Task的拉取策略
阶段3:Reduce端数据拉取 (ShuffleBlockFetcherIterator)
- 位置获取:向MapOutputTracker查询目标数据位置
- 拉取策略:优先本地拉取,再进行远程网络拉取
- 批量处理:通过
maxBytesInFlight控制批量大小 - 并发控制:通过
maxReqsInFlight限制并发请求数 - 失败重试:网络异常时自动重试和故障转移
阶段4:BlockManager数据管理
- 存储决策:根据数据大小选择内存或磁盘存储
- 压缩处理:自动处理数据压缩和解压缩
- 缓存管理:LRU策略管理内存中的数据块
- 磁盘管理:临时文件的创建、读取和清理
阶段5:数据处理与输出
- 反序列化:将字节流转换为具体的数据对象
- 聚合处理:如果需要,使用Aggregator进行数据聚合
- 排序处理:如果指定了排序规则,使用ExternalSorter排序
- 迭代器输出:返回最终的数据迭代器供下游使用
关键性能优化配置:
// Shuffle拉取相关配置
spark.reducer.maxSizeInFlight = 48m // 同时拉取的最大数据量
spark.reducer.maxReqsInFlight = 2147483647 // 同时发出的最大请求数
spark.reducer.maxBlocksInFlightPerAddress = 2147483647 // 每个地址的最大块数
spark.shuffle.compress = true // 启用Shuffle数据压缩
spark.shuffle.spill.compress = true // 启用溢写数据压缩
spark.shuffle.io.maxRetries = 3 // 拉取失败最大重试次数
spark.shuffle.io.retryWait = 5s // 重试等待时间
1. ShuffleManager架构组件
| 组件 | 类名 | 主要职责 | 适用场景 |
|---|---|---|---|
| SortShuffleManager | SortShuffleManager | Sort Shuffle管理器 | 默认Shuffle方式 |
| HashShuffleManager | HashShuffleManager | Hash Shuffle管理器 | 已废弃 |
| ShuffleWriter | SortShuffleWriter, UnsafeShuffleWriter | Shuffle写入器 | Map端数据写入 |
| ShuffleReader | BlockStoreShuffleReader | Shuffle读取器 | Reduce端数据读取 |
2. ShuffleWriter核心实现
// Sort Shuffle实现核心
class SortShuffleWriter[K, V, C](
shuffleBlockResolver: IndexShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, C],
mapId: Int,
context: TaskContext)
extends ShuffleWriter[K, V] with Logging {
private val dep = handle.dependency
private val blockManager = SparkEnv.get.blockManager
private val sorter: ExternalSorter[K, V, _] = {
if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
}
}
// 写入数据
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter.insertAll(records)
// 获取输出文件
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// 写入排序后的数据
val partitionLengths = sorter.writePartitionedFile(blockId, outputFile)
// 写入索引文件
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, outputFile)
}
}
3. ExternalSorter内存管理
// ExternalSorter核心实现
class ExternalSorter[K, V, C](
context: TaskContext,
aggregator: Option[Aggregator[K, V, C]] = None,
partitioner: Option[Partitioner] = None,
ordering: Option[Ordering[K]] = None,
serializer: Serializer = SparkEnv.get.serializer)
extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())
with Logging {
// 内存中的数据结构
private var map = new PartitionedAppendOnlyMap[K, C]
private val buffer = new PartitionedPairBuffer[K, C]
// 插入数据
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
val shouldCombine = aggregator.isDefined
if (shouldCombine) {
// 需要聚合的情况
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
addElementsRead()
kv = records.next()
map.changeValue((getPartition(kv._1), kv._1), update)
maybeSpillCollection(usingMap = true)
}
} else {
// 不需要聚合的情况
while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}
}
// Spill到磁盘
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): SpilledFile = {
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
collection.clear()
spillFile
}
}
4. ShuffleReader数据读取
// ShuffleReader核心实现
class BlockStoreShuffleReader[K, C](
handle: BaseShuffleHandle[K, _, C],
startPartition: Int,
endPartition: Int,
context: TaskContext,
serializerManager: SerializerManager = SparkEnv.get.serializerManager,
blockManager: BlockManager = SparkEnv.get.blockManager,
mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
extends ShuffleReader[K, C] with Logging {
private val dep = handle.dependency
override def read(): Iterator[Product2[K, C]] = {
// 1. 获取Shuffle数据块位置
val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startPartition, endPartition)
// 2. 读取数据块
val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
blockManager.blockTransferService,
blockManager,
blocksByAddress,
serializerManager.wrapStream(blockId, _),
// 注意:我们使用serializerManager来获取压缩和加密包装器
maxBytesInFlight = SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
maxReqsInFlight = SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
maxBlocksInFlightPerAddress = SparkEnv.get.conf.getInt(
"spark.reducer.maxBlocksInFlightPerAddress", Int.MaxValue),
maxReqSizeShuffleToMem = SparkEnv.get.conf.getSizeAsBytes(
"spark.reducer.maxReqSizeShuffleToMem", Long.MaxValue),
detectCorrupt = SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
// 3. 反序列化并聚合
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
// Map端已经聚合,Reduce端继续聚合
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
} else {
// Map端未聚合,Reduce端进行聚合
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, V)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
} else {
interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}
// 4. 排序(如果需要)
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// 创建ExternalSorter进行排序
val sorter = new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =>
aggregatedIter
}
}
}
5. ShuffleBlockResolver文件管理
// ShuffleBlockResolver核心实现
class IndexShuffleBlockResolver(conf: SparkConf, _blockManager: BlockManager = null)
extends ShuffleBlockResolver with Logging {
// 获取数据文件
def getDataFile(shuffleId: Int, mapId: Long): File = {
new File(getShuffleDataDir(shuffleId), s"shuffle_${shuffleId}_${mapId}_0.data")
}
// 获取索引文件
def getIndexFile(shuffleId: Int, mapId: Long): File = {
new File(getShuffleDataDir(shuffleId), s"shuffle_${shuffleId}_${mapId}_0.index")
}
// 写入索引文件并提交
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Long,
lengths: Array[Long],
dataTmp: File): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = new File(indexFile.getAbsolutePath + ".tmp")
try {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// 写入偏移量
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
} {
out.close()
}
// 原子性重命名
val dataFile = getDataFile(shuffleId, mapId)
if (dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("Failed to rename data file")
}
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("Failed to rename index file")
}
} catch {
case e: Exception =>
indexTmp.delete()
throw e
}
}
}
6. 数据流组件交互
sequenceDiagram
participant MapTask
participant ExternalSorter
participant ShuffleWriter
participant ShuffleBlockResolver
participant Disk
participant ReduceTask
participant ShuffleReader
participant BlockManager
MapTask->>ExternalSorter: insertAll(records)
ExternalSorter->>ExternalSorter: 内存排序/Spill
ExternalSorter->>ShuffleWriter: writePartitionedFile()
ShuffleWriter->>ShuffleBlockResolver: writeIndexFileAndCommit()
ShuffleBlockResolver->>Disk: 写入data/index文件
ReduceTask->>ShuffleReader: read()
ShuffleReader->>BlockManager: 获取block位置
BlockManager->>Disk: 读取Shuffle文件
Disk-->>ShuffleReader: 返回数据
ShuffleReader->>ReduceTask: 聚合/排序结果
7. Shuffle性能监控组件
// Shuffle性能指标收集
class ShuffleWriteMetrics extends TaskMetrics {
// 写入字节数
private var _bytesWritten: Long = 0L
// 写入记录数
private var _recordsWritten: Long = 0L
// 写入时间
private var _writeTime: Long = 0L
def bytesWritten: Long = _bytesWritten
def recordsWritten: Long = _recordsWritten
def writeTime: Long = _writeTime
}
class ShuffleReadMetrics extends TaskMetrics {
// 读取字节数
private var _bytesRead: Long = 0L
// 读取记录数
private var _recordsRead: Long = 0L
// 读取时间
private var _readTime: Long = 0L
// 远程读取字节数
private var _remoteBytesRead: Long = 0L
def bytesRead: Long = _bytesRead
def recordsRead: Long = _recordsRead
def readTime: Long = _readTime
def remoteBytesRead: Long = _remoteBytesRead
}
Shuffle性能优化
主要优化策略:
- 压缩:
spark.shuffle.compress,减少网络传输量 - 合理设置分区数:
spark.sql.shuffle.partitions,避免分区过多或过少 - 使用本地化Shuffle:减少网络I/O
- 启用spill机制:内存不足时溢写磁盘,防止OOM
- 聚合缓冲区:Map端本地聚合,减少传输数据量
1. 分区优化策略
# 推荐设置(根据数据量调整)
spark.sql.shuffle.partitions=200
spark.default.parallelism=200
# 动态调整分区数
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
2. 动态资源分配
# 启用动态分配
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.initialExecutors=2
# 资源分配策略
spark.dynamicAllocation.executorIdleTimeout=60s
spark.dynamicAllocation.cachedExecutorIdleTimeout=120s
3. 压缩与序列化优化
| 配置项 | 推荐值 | 说明 |
|---|---|---|
spark.shuffle.compress | true | 启用Shuffle压缩 |
spark.shuffle.compress.codec | snappy | 压缩算法选择 |
spark.serializer | KryoSerializer | 序列化器选择 |
spark.kryo.registrationRequired | false | 是否要求注册类 |
4. 本地化Shuffle优化
# 本地化配置
spark.locality.wait=3s
spark.locality.wait.process=3s
spark.locality.wait.node=3s
spark.locality.wait.rack=3s
5. 高级优化技巧
Map端聚合:
// 使用reduceByKey替代groupByKey
val result = rdd.reduceByKey(_ + _) // 推荐
// val result = rdd.groupByKey().mapValues(_.sum) // 不推荐
广播变量优化:
// 小表广播,避免Shuffle
val smallTable = spark.table("small_table").collect()
val broadcastVar = spark.sparkContext.broadcast(smallTable)
数据倾斜处理
1. 数据倾斜问题
现象:某些分区数据量远大于其他分区,导致Task执行时间差异很大
解决方案:
// 方案1:加盐处理
val skewedRDD = rdd.map(x => {
val key = x._1
val value = x._2
if (isSkewedKey(key)) {
(key + "_" + Random.nextInt(10), value)
} else {
(key, value)
}
})
// 方案2:自定义分区器
class SkewPartitioner(numPartitions: Int) extends Partitioner {
override def numPartitions: Int = numPartitions
override def getPartition(key: Any): Int = {
// 自定义分区逻辑,避免数据倾斜
val rawKey = key.toString.split("_")(0)
math.abs(rawKey.hashCode) % numPartitions
}
}
2. Shuffle文件过多问题
现象:Shuffle过程中产生大量小文件,影响性能
解决方案:
# 合并小文件
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.coalescePartitions.initialPartitionNum=200
3. 内存溢出问题
现象:Shuffle过程中出现OOM
解决方案:
# 启用Spill机制
spark.shuffle.spill=true
spark.shuffle.spill.compress=true
# 调整内存配置
spark.executor.memory=4g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.3
容错与可靠性
血缘关系容错
Spark容错机制 基于RDD血缘关系实现,是Spark高可用性的核心保障。
血缘关系原理:
graph TD
A["textFile('input')"] --> B["flatMap(_.split(' '))"]
B --> C["map((_, 1))"]
C --> D["reduceByKey(_ + _)"]
D --> E["saveAsTextFile('output')"]
F["分区0丢失"] --> G["根据血缘关系"]
G --> H["重新计算分区0"]
H --> I["恢复数据"]
style A fill:#e1f5fe
style F fill:#ffebee
style I fill:#e8f5e8
容错机制对比:
| 容错方式 | 实现原理 | 恢复速度 | 存储开销 | 适用场景 |
|---|---|---|---|---|
| 血缘关系 | 重新计算丢失分区 | 中等 | 无 | 计算链条较短 |
| Checkpoint | 持久化中间结果 | 快 | 高 | 计算链条很长 |
| 缓存 | 内存/磁盘存储 | 最快 | 中等 | 多次使用的RDD |
容错实现机制:
// RDD血缘关系实现
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable {
// 1. 计算分区数据
def compute(split: Partition, context: TaskContext): Iterator[T]
// 2. 获取依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
// 3. 分区器
@transient val partitioner: Option[Partitioner] = None
// 4. 首选位置
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// 5. 容错恢复
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
}
// 依赖关系类型
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
// 窄依赖 - 每个父RDD分区最多被一个子分区使用
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
override def rdd: RDD[T] = _rdd
def getParents(partitionId: Int): Seq[Int]
}
// 宽依赖 - 每个父RDD分区可能被多个子分区使用
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)
}
容错恢复流程:
sequenceDiagram
participant Task as 失败任务
participant Executor as Executor
participant Driver as Driver
participant DAGScheduler as DAGScheduler
Task->>Executor: 任务执行失败
Executor->>Driver: 报告任务失败
Driver->>DAGScheduler: 处理任务失败事件
DAGScheduler->>DAGScheduler: 分析失败原因
alt 数据分区丢失
DAGScheduler->>DAGScheduler: 查找血缘关系
DAGScheduler->>DAGScheduler: 重新计算丢失分区
DAGScheduler->>Executor: 重新提交任务
else Executor失败
DAGScheduler->>DAGScheduler: 标记Executor不可用
DAGScheduler->>DAGScheduler: 重新调度所有受影响任务
end
Executor->>Driver: 任务重新执行成功
Driver->>DAGScheduler: 更新任务状态
Checkpoint检查点
Checkpoint 是Spark提供的一种容错优化机制,通过将RDD持久化到可靠存储来缩短血缘关系链条。
Checkpoint实现原理:
// RDD Checkpoint实现
abstract class RDD[T: ClassTag] {
// Checkpoint状态
private var checkpointData: Option[RDDCheckpointData[T]] = None
// 设置Checkpoint目录
def checkpoint(): Unit = {
RDDCheckpointData.synchronized {
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
}
// 执行Checkpoint
private[spark] def doCheckpoint(): Unit = {
RDDCheckpointData.synchronized {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
if (checkpointAllMarkedAncestors) {
dependencies.foreach(_.rdd.doCheckpoint())
}
checkpointData.get.checkpoint()
}
}
}
}
// 检查是否已Checkpoint
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
// 获取Checkpoint文件
def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointDir)
}
// Checkpoint数据管理
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
// Checkpoint文件路径
private val checkpointPath = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
// 执行Checkpoint操作
protected override def doCheckpoint(): CheckpointRDD[T] = {
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, checkpointPath)
// 清理血缘关系
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
rdd.context.cleaner.foreach(_.registerRDDCheckpointDataForCleanup(newRDD, rdd.id))
}
logInfo(s"Done checkpointing RDD ${rdd.id} to $checkpointPath, new parent is RDD ${newRDD.id}")
newRDD
}
}
Checkpoint使用场景:
| 场景 | 描述 | 使用建议 |
|---|---|---|
| 长血缘链条 | RDD经过多次转换操作 | 在中间节点设置Checkpoint |
| 宽依赖较多 | 包含大量shuffle操作 | 在shuffle后设置Checkpoint |
| 迭代计算 | 机器学习等迭代算法 | 在每次迭代后Checkpoint |
| 共享RDD | 被多个Action使用的RDD | 第一次使用前Checkpoint |
故障恢复机制
Spark故障恢复策略总结:
| 故障级别 | 恢复机制 | 恢复时间 | 数据丢失 | 适用场景 |
|---|---|---|---|---|
| Task失败 | 重新调度Task | 秒级 | 无 | 偶发故障 |
| Executor失败 | 重启Executor,重算丢失分区 | 分钟级 | 无(如有缓存则丢失) | 进程崩溃 |
| Driver失败 | 应用重启 | 小时级 | 应用状态丢失 | 驱动进程异常 |
| 数据分区丢失 | 血缘关系重算 | 分钟级 | 无 | 存储故障 |
| 网络分区 | 推测执行+重试 | 秒到分钟级 | 无 | 网络不稳定 |
资源管理与通信
调度算法策略
Spark支持多种任务调度算法,根据应用场景选择合适的调度策略。
调度算法对比:
| 调度算法 | 调度原理 | 适用场景 | 公平性 | 响应时间 |
|---|---|---|---|---|
| FIFO | 先进先出 | 单用户批处理 | 差 | 差 |
| Fair | 公平共享资源 | 多用户交互式 | 好 | 好 |
| Capacity | 队列容量分配 | 企业级多租户 | 中等 | 中等 |
Fair Scheduler实现原理:
graph TD
A[应用程序提交] --> B[调度池分配]
B --> C{调度算法}
C -->|FIFO| D[队列内FIFO调度]
C -->|Fair| E[公平共享调度]
E --> F[计算资源份额]
F --> G[按缺失资源排序]
G --> H[分配资源]
D --> I[按提交顺序执行]
I --> H
H --> J[资源监控]
J --> K[动态调整]
K --> F
style B fill:#e1f5fe
style F fill:#e8f5e8
style K fill:#fff3e0
// Fair Scheduler核心实现
class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) extends SchedulableBuilder {
override def buildPools(): Unit = {
val fairSchedulerConfigFile = conf.getOption("spark.scheduler.allocation.file")
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val xmlPath = fairSchedulerConfigFile match {
case Some(file) => file
case None => DEFAULT_SCHEDULER_FILE
}
val rootName = rootPool.name
var is: InputStream = null
try {
is = Utils.getSparkClassLoader.getResourceAsStream(xmlPath)
if (is != null) {
buildPoolsFromXML(is)
} else {
buildDefaultPool()
}
} finally {
if (is != null) is.close()
}
// 构建默认调度池
buildDefaultPool()
}
private def buildDefaultPool(): Unit = {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, 0, 1)
rootPool.addSchedulable(pool)
logInfo("Created default pool: " + DEFAULT_POOL_NAME)
}
}
private def buildPoolsFromXML(is: InputStream): Unit = {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
val schedulingMode = getSchedulingModeValue(poolNode, SCHEDULING_MODE_PROPERTY,
DEFAULT_SCHEDULING_MODE, poolName)
val minShare = getIntValue(poolNode, MINIMUM_SHARES_PROPERTY, 0, poolName)
val weight = getIntValue(poolNode, WEIGHT_PROPERTY, 1, poolName)
rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
logInfo("Created pool: " + poolName)
}
}
}
// 公平调度池实现
class Pool(
val poolName: String,
val schedulingMode: SchedulingMode,
initMinShare: Int,
initWeight: Int)
extends Schedulable with Logging {
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
var weight = initWeight
var minShare = initMinShare
var runningTasks = 0
var priority = 0
// Fair调度算法实现
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue = schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
}
sortedTaskSetQueue
}
// Fair算法比较器
def taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR => new FairSchedulingAlgorithm()
case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm()
case _ => new FairSchedulingAlgorithm()
}
}
}
// 公平调度算法
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare = 0
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare < 0) {
true
} else if (compare > 0) {
false
} else {
s1.name < s2.name
}
}
}
动态资源分配
动态资源分配 允许Spark应用程序根据工作负载动态添加和移除Executor。
动态分配原理图:
graph TD
A[应用程序启动] --> B[初始Executor数量]
B --> C[监控任务队列]
C --> D{资源需求分析}
D -->|任务积压| E[请求新Executor]
D -->|Executor空闲| F[移除空闲Executor]
D -->|负载适中| G[保持当前状态]
E --> H[集群管理器分配资源]
H --> I[启动新Executor]
I --> C
F --> J[优雅关闭Executor]
J --> K[清理缓存数据]
K --> C
G --> C
style E fill:#e8f5e8
style F fill:#ffebee
style C fill:#e1f5fe
动态分配配置:
// 动态资源分配配置
spark.dynamicAllocation.enabled=true
// 初始Executor数量
spark.dynamicAllocation.initialExecutors=2
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=20
// 扩容策略
spark.dynamicAllocation.schedulerBacklogTimeout=1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5s
spark.dynamicAllocation.executorAllocationRatio=1
// 缩容策略
spark.dynamicAllocation.executorIdleTimeout=60s
spark.dynamicAllocation.cachedExecutorIdleTimeout=infinity
// 开启External Shuffle Service
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
RPC通信机制
Spark采用基于Netty的RPC框架进行组件间通信,支持异步消息传递和背压控制。
RPC通信架构:
graph TD
subgraph "Driver节点"
A[SparkContext]
B[RpcEnv]
C[NettyRpcEnv]
D[TransportContext]
end
subgraph "Executor节点"
E[CoarseGrainedExecutorBackend]
F[RpcEnv]
G[NettyRpcEnv]
H[TransportContext]
end
A --> B
B --> C
C --> D
D -.->|网络通信| H
H --> G
G --> F
F --> E
style A fill:#e1f5fe
style E fill:#e8f5e8
style D fill:#fff3e0
style H fill:#fff3e0
RPC消息类型:
| 消息类型 | 方向 | 用途 | 示例 |
|---|---|---|---|
| LaunchTask | Driver→Executor | 任务启动 | 分发计算任务 |
| StatusUpdate | Executor→Driver | 状态更新 | 任务完成/失败 |
| KillTask | Driver→Executor | 任务终止 | 取消运行中任务 |
| RegisterExecutor | Executor→Driver | 注册 | Executor启动注册 |
| RemoveExecutor | Driver→Executor | 移除 | 动态资源缩容 |
// RPC消息定义
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
// Driver发送给Executor的消息
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean, reason: String)
extends CoarseGrainedClusterMessage
case object StopExecutor extends CoarseGrainedClusterMessage
// Executor发送给Driver的消息
case class RegisterExecutor(
executorId: String,
executorRef: RpcEndpointRef,
hostname: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
case class StatusUpdate(
executorId: String,
taskId: Long,
state: TaskState,
data: SerializableBuffer)
extends CoarseGrainedClusterMessage
// RPC通信实现
class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv,
resourcesFileOpt: Option[String],
resourceProfile: ResourceProfile)
extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
private var driver: Option[RpcEndpointRef] = None
private var executor: Executor = null
override def onStart(): Unit = {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// 向Driver注册Executor
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
case Success(msg) =>
// 注册成功,创建Executor
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
}(ThreadUtils.sameThread)
}
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc)
}
case KillTask(taskId, _, interruptThread, reason) =>
if (executor == null) {
logError("Received KillTask command but executor was null")
} else {
executor.killTask(taskId, interruptThread, reason)
}
}
override def statusUpdate(taskId: Long, state: TaskState, data: SerializableBuffer): Unit = {
val resources = executor.runningTasks.get(taskId).map(_.taskDescription.resources)
val msg = StatusUpdate(executorId, taskId, state, data)
driver match {
case Some(driverRef) => driverRef.send(msg)
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}
}
序列化与网络传输
Spark支持多种序列化方式,序列化性能直接影响网络传输和存储效率。
序列化器对比:
| 序列化器 | 速度 | 压缩率 | CPU占用 | 适用场景 |
|---|---|---|---|---|
| Java | 慢 | 差 | 高 | 默认配置 |
| Kryo | 快 | 好 | 低 | 生产环境推荐 |
| Avro | 中等 | 很好 | 中等 | 跨语言兼容 |
// 序列化配置
// 使用Kryo序列化器
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=MyKryoRegistrator
spark.kryo.unsafe=true
// 自定义Kryo注册器
class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[MyClass])
kryo.register(classOf[AnotherClass])
}
}
// 序列化实现
class KryoSerializer(conf: SparkConf) extends Serializer with Logging with Serializable {
private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
private val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m")
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val userRegistrators = conf.get("spark.kryo.registrator", "")
.split(',').filter(_.nonEmpty)
def newKryo(): Kryo = {
val instantiator = new DefaultInstantiatorStrategy(new StdInstantiatorStrategy())
val kryo = new Kryo()
kryo.setInstantiatorStrategy(instantiator)
kryo.setRegistrationRequired(registrationRequired)
kryo.setReferences(referenceTracking)
// 注册常用类
kryo.register(classOf[Array[Byte]])
kryo.register(None.getClass)
kryo.register(Nil.getClass)
// 用户自定义注册器
for (registrator <- userRegistrators) {
val reg = Utils.classForName(registrator).getConstructor().newInstance()
.asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
kryo
}
override def newInstance(): SerializerInstance = {
new KryoSerializerInstance(this, bufferSizeKb, maxBufferSizeMb)
}
}
Spark SQL与Catalyst
Spark SQL概述
Spark SQL 是Spark用于处理结构化数据的模块,提供了DataFrame和Dataset API。
主要特性
- 统一数据访问:支持多种数据源
- Hive兼容性:完全兼容Hive SQL
- 优化执行:Catalyst优化器
- 代码生成:运行时代码生成
Catalyst优化器
Catalyst 是Spark SQL的查询优化引擎,基于Scala的函数式编程和模式匹配构建,是Spark SQL高性能的核心保障。
Catalyst架构原理
Catalyst核心组件架构:
| 组件 | 功能 | 实现技术 | 关键特性 | 输入 | 输出 |
|---|---|---|---|---|---|
| Parser | SQL解析 | ANTLR4语法解析器 | 语法检查、AST生成 | SQL字符串 | 未解析逻辑计划 |
| Analyzer | 语义分析 | 规则引擎 | 类型检查、字段解析 | 未解析逻辑计划 | 已解析逻辑计划 |
| Optimizer | 逻辑优化 | 规则+成本优化 | 谓词下推、Join重排序 | 已解析逻辑计划 | 优化逻辑计划 |
| Planner | 物理计划 | 策略模式 | 算法选择、计划生成 | 优化逻辑计划 | 物理计划 |
| CodeGen | 代码生成 | Scala代码生成 | 全阶段代码生成 | 物理计划 | 可执行代码 |
优化流程详解
完整优化流程图:
graph TD
A[SQL/DataFrame API] --> B[SQL Parser]
B --> C[未解析逻辑计划<br/>Unresolved Logical Plan]
C --> D[Analyzer分析器]
D --> E[已解析逻辑计划<br/>Resolved Logical Plan]
E --> F[Catalyst优化器]
F --> G[优化逻辑计划<br/>Optimized Logical Plan]
G --> H[SparkPlanner规划器]
H --> I[物理计划候选<br/>Physical Plan Candidates]
I --> J[成本评估<br/>Cost-based Optimizer]
J --> K[最优物理计划<br/>Selected Physical Plan]
K --> L[代码生成<br/>Code Generation]
L --> M[可执行代码<br/>Generated Code]
M --> N[RDD执行]
%% 详细子流程
F --> F1[规则优化器<br/>RuleExecutor]
F1 --> F2[谓词下推<br/>Predicate Pushdown]
F1 --> F3[投影剪枝<br/>Column Pruning]
F1 --> F4[常量折叠<br/>Constant Folding]
F1 --> F5[Join重排序<br/>Join Reordering]
H --> H1[Join策略选择]
H --> H2[聚合策略选择]
H --> H3[排序策略选择]
style A fill:#f9f9f9
style F fill:#e1f5fe
style J fill:#fff3e0
style L fill:#e8f5e8
style N fill:#c8e6c9
各阶段详细说明:
解析阶段(Parsing)
// SQL解析示例 val sqlText = "SELECT name, age FROM users WHERE age > 25" val parser = new SparkSqlParser() val logicalPlan = parser.parsePlan(sqlText)分析阶段(Analysis)
// 语义分析过程 class Analyzer(catalog: SessionCatalog) extends RuleExecutor[LogicalPlan] { val batches = Batch("Resolution", fixedPoint, ResolveReferences :: ResolveRelations :: ResolveFunctions :: ResolveAliases :: Nil) :: Nil }优化阶段(Optimization)
- 规则优化:基于预定义规则进行转换
- 成本优化:基于统计信息选择最优策略
规划阶段(Planning)
- 将逻辑计划转换为可执行的物理计划
- 选择具体的执行算法和策略
核心优化规则
Catalyst主要优化规则分类表:
| 优化类别 | 规则名称 | 优化原理 | 性能提升 | 应用场景 | 示例 |
|---|---|---|---|---|---|
| 谓词优化 | Predicate Pushdown | 将过滤条件下推至数据源 | 减少数据传输 | 有WHERE条件的查询 | 下推到Parquet文件 |
| 谓词优化 | Predicate Pullup | 将过滤条件上提合并 | 减少重复计算 | 复杂子查询 | Union后的过滤合并 |
| 投影优化 | Column Pruning | 剪枝不需要的列 | 减少IO和内存 | 宽表查询 | 只读取需要的列 |
| 投影优化 | Projection Pushdown | 投影操作下推 | 减少数据量 | 嵌套查询 | 向子查询推送列选择 |
| 常量优化 | Constant Folding | 编译时计算常量表达式 | 减少运行时计算 | 包含常量运算 | 1+2 → 3 |
| 常量优化 | Constant Propagation | 传播常量值 | 简化表达式 | 有等值条件 | col=5 AND col>3 → col=5 |
| Join优化 | Join Reordering | 重排序Join顺序 | 减少中间数据 | 多表Join | 基于表大小和选择性 |
| Join优化 | Broadcast Join | 广播小表 | 避免Shuffle | 大小表Join | 自动广播小于10MB的表 |
| 子查询优化 | Subquery Elimination | 消除不必要子查询 | 简化执行计划 | 相关子查询 | 将子查询转换为Join |
| 空值优化 | Null Propagation | 传播NULL值 | 提前短路 | 包含NULL的表达式 | NULL AND expr → NULL |
深入优化规则实现
1. 谓词下推实现原理:
// 谓词下推规则实现
object PushDownPredicates extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
pushDownPredicate(condition, child)
}
private def pushDownPredicate(condition: Expression, plan: LogicalPlan): LogicalPlan = {
plan match {
case Project(projectList, grandChild) =>
// 将过滤条件下推到投影操作之下
val newCondition = replaceAlias(condition, aliasMap(projectList))
Project(projectList, Filter(newCondition, grandChild))
case Join(left, right, joinType, joinCondition) =>
// 将过滤条件分解并分别推送到Join的左右子树
val (leftFilters, rightFilters, remainingFilters) =
splitConjunctivePredicates(condition, left, right)
val newLeft = if (leftFilters.nonEmpty) {
Filter(leftFilters.reduce(And), left)
} else left
val newRight = if (rightFilters.nonEmpty) {
Filter(rightFilters.reduce(And), right)
} else right
val newJoin = Join(newLeft, newRight, joinType, joinCondition)
if (remainingFilters.nonEmpty) {
Filter(remainingFilters.reduce(And), newJoin)
} else newJoin
case _ => Filter(condition, plan)
}
}
}
2. Join优化策略:
// Join策略选择
object JoinStrategySelection extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
// 1. 检查是否可以使用广播Join
if (canBroadcast(left)) {
BroadcastHashJoinExec(leftKeys, rightKeys, joinType,
BuildLeft, condition, planLater(left), planLater(right)) :: Nil
} else if (canBroadcast(right)) {
BroadcastHashJoinExec(leftKeys, rightKeys, joinType,
BuildRight, condition, planLater(left), planLater(right)) :: Nil
}
// 2. 使用SortMergeJoin
else {
SortMergeJoinExec(leftKeys, rightKeys, joinType,
condition, planLater(left), planLater(right)) :: Nil
}
case _ => Nil
}
private def canBroadcast(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}
}
成本优化器(CBO)
成本优化器工作原理:
| 组件 | 功能 | 成本因子 | 算法 | 应用场景 |
|---|---|---|---|---|
| 统计收集 | 收集表和列统计信息 | 行数、数据大小、基数 | 采样统计 | 所有表 |
| 基数估算 | 估算中间结果大小 | 选择性、相关性 | 直方图分析 | 过滤和Join |
| 成本计算 | 计算执行计划成本 | CPU、IO、网络 | 成本模型 | 计划选择 |
| 计划选择 | 选择最优执行计划 | 总成本最小 | 动态规划 | 多表Join |
CBO配置与使用:
// 启用成本优化器
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.cbo.planStats.enabled", "true")
// 收集表统计信息
sql("ANALYZE TABLE users COMPUTE STATISTICS")
sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS user_id, order_date, amount")
// 查看统计信息
sql("DESCRIBE EXTENDED users").show()
代码生成引擎
Whole-Stage Code Generation原理:
graph TD
A[物理计划树] --> B[代码生成遍历]
B --> C[表达式代码生成]
B --> D[算子代码生成]
C --> E[组合生成单一函数]
D --> E
E --> F[Java代码编译]
F --> G[字节码加载]
G --> H[高效执行]
style A fill:#f9f9f9
style E fill:#e1f5fe
style F fill:#fff3e0
style H fill:#e8f5e8
代码生成示例:
// 原始查询
df.select($"id", $"name", $"age" + 1).filter($"age" > 25)
// 生成的代码(简化版)
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends UnsafeProjection {
/* 006 */ private UnsafeRow result;
/* 007 */ private BufferHolder holder;
/* 008 */ private UnsafeRowWriter writer;
/* 009 */
/* 010 */ public SpecificUnsafeProjection(Object[] references) {
/* 011 */ result = new UnsafeRow(3);
/* 012 */ holder = new BufferHolder(result, 32);
/* 013 */ writer = new UnsafeRowWriter(holder, 3);
/* 014 */ }
/* 015 */
/* 016 */ public UnsafeRow apply(InternalRow i) {
/* 017 */ boolean isNull_0 = i.isNullAt(2);
/* 018 */ int value_0 = isNull_0 ? -1 : (i.getInt(2));
/* 019 */ if (!(!(isNull_0) && value_0 > 25)) return null; // filter condition
/* 020 */
/* 021 */ holder.reset();
/* 022 */ writer.zeroOutNullBytes();
/* 023 */
/* 024 */ // id
/* 025 */ if (i.isNullAt(0)) {
/* 026 */ writer.setNullAt(0);
/* 027 */ } else {
/* 028 */ writer.write(0, i.getLong(0));
/* 029 */ }
/* 030 */
/* 031 */ // name
/* 032 */ if (i.isNullAt(1)) {
/* 033 */ writer.setNullAt(1);
/* 034 */ } else {
/* 035 */ writer.write(1, i.getUTF8String(1));
/* 036 */ }
/* 037 */
/* 038 */ // age + 1
/* 039 */ if (isNull_0) {
/* 040 */ writer.setNullAt(2);
/* 041 */ } else {
/* 042 */ writer.write(2, value_0 + 1);
/* 043 */ }
/* 044 */
/* 045 */ result.pointTo(holder.buffer, holder.totalSize());
/* 046 */ return result;
/* 047 */ }
/* 048 */ }
代码生成优势对比:
| 执行方式 | 虚拟函数调用 | 条件分支 | 数据访问 | 内存管理 | 性能提升 |
|---|---|---|---|---|---|
| 解释执行 | 多层虚拟调用 | 大量if-else | 通过接口访问 | 大量临时对象 | 基线 |
| 代码生成 | 直接调用 | 编译优化 | 直接内存访问 | 复用对象 | 2-10倍 |
自适应查询执行(AQE)
AQE关键特性:
| 特性 | 优化目标 | 工作原理 | 触发条件 | 性能收益 |
|---|---|---|---|---|
| 动态分区合并 | 减少小文件 | 运行时合并小分区 | 分区大小 < 阈值 | 提高并发度 |
| 动态Join策略 | 选择最优Join | 运行时切换Join类型 | 表大小 < 广播阈值 | 避免Shuffle |
| 动态倾斜处理 | 解决数据倾斜 | 拆分大分区 | 分区大小差异大 | 提升整体速度 |
// AQE配置
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
优化器扩展与定制
自定义优化规则:
// 自定义优化规则示例
case class RemoveRedundantFilters() extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Filter(condition1, Filter(condition2, child)) =>
// 合并两个相邻的Filter
Filter(And(condition1, condition2), child)
case Filter(Literal(true, BooleanType), child) =>
// 移除恒为true的过滤条件
child
case Filter(Literal(false, BooleanType), child) =>
// 将恒为false的过滤转换为空结果
LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming)
}
}
// 注册自定义规则
spark.experimental.extraOptimizations = Seq(RemoveRedundantFilters())
Catalyst优化器的核心优势:
- 可扩展性:基于规则的设计易于扩展
- 类型安全:Scala类型系统保证正确性
- 高性能:代码生成技术带来显著性能提升
- 智能化:成本优化器自动选择最优策略
- 统一性:支持SQL、DataFrame、Dataset的统一优化
通过深入理解Catalyst优化器的工作原理,可以编写更高效的Spark SQL应用,并在必要时通过自定义规则进一步提升性能。
SparkSQL 实用函数与语法
日期与时间处理
-- 获取当前时间戳
SELECT current_timestamp() as current_time,
unix_timestamp() as unix_timestamp;
-- 时间戳转换
SELECT from_unixtime(1640995200) as datetime,
unix_timestamp('2022-01-01 00:00:00') as timestamp;
-- date_format(to_date(create_time), 'yyyyMMdd') // 默认标准格式的时间戳
-- 日期加减操作
SELECT date_add('2022-01-01', 7) as next_week,
date_sub('2022-01-01', 7) as last_week,
add_months('2022-01-01', 1) as next_month;
-- 日期差计算
SELECT datediff('2022-01-15', '2022-01-01') as days_diff,
months_between('2022-03-01', '2022-01-01') as months_diff;
-- 日期格式化
SELECT date_format('2022-01-01', 'yyyy-MM-dd') as formatted_date,
to_date('2022-01-01') as date_type;
-- 注意传入的是字符串,而不是int类型,会报错
SELECT date_format(to_date('20250506', 'yyyyMMdd'), 'yyyy-MM-dd') AS fmt
字符串处理
-- 字符串连接
SELECT concat('Hello', ' ', 'World') as greeting,
concat_ws('-', '2022', '01', '01') as date_str;
concat的内容有null的话返回的是null,concat_ws会跳过null值
scala> spark.sql("select concat('dd','fd',12,null),concat_ws('-','ddd',12,null,44)").show
+------------------------+-------------------------------+
|concat(dd, fd, 12, NULL)|concat_ws(-, ddd, 12, NULL, 44)|
+------------------------+-------------------------------+
| null| ddd-12-44|
+------------------------+-------------------------------+
-- 字符串截取
注意第一位索引是1,而不是0
SELECT substring('Hello World', 1, 5) as substring,
substr('Hello World', 7) as substr_from_7;
-- 字符串替换
SELECT replace('Hello World', 'World', 'Spark') as replaced,
regexp_replace('Hello123World', '\\d+', '') as no_digits;
-- 正则表达式提取
SELECT regexp_extract('Hello123World456', '(\\d+)', 1) as first_number,
regexp_extract('[email protected]', '([^@]+)@([^@]+)', 1) as username;
-- 字符串分割
SELECT split('a,b,c,d', ',') as split_array,
size(split('a,b,c,d', ',')) as array_size;
scala> spark.sql("select split('1.32.252.0','\\\\.')[0]").show
+----------------------------+
|split(1.32.252.0, \., -1)[0]|
+----------------------------+
| 1|
+----------------------------+
16进制和十进制互转
mac加一位,减一位
scala> spark.sql("select lower(conv(cast(conv('1c1a1b64e2f9', 16, 10) as long)+1,10,16)) mac1,lower(conv(cast(conv('1c1a1b64e2f9', 16, 10) as long)-1,10,16)) mac2 ").show
+------------+------------+
| mac1| mac2|
+------------+------------+
|1c1a1b64e2fa|1c1a1b64e2f8|
+------------+------------+
数组与集合操作
-- 数组创建,索引从0开始,substr函数索引从1开始 array[0] ,方括号
SELECT array(1, 2, 3) as simple_array,
array_contains(array(1, 2, 3), 2) as contains_2;
-- spark 字符串生成数组,分割字符串有转义符号的要多转义一次,有4个反斜杠,在ck中不需要
woi.filter("ys_duplicate_flag=1").withColumn("geohash7",expr("split(detail,'\\\\x01')")).select("detail","geohash7").show(false)
-- 数组展开(行转列)
SELECT explode(array(1, 2, 3)) as exploded_value;
-- 数组聚合(列转行)
SELECT collect_list(column_name) as list_agg,
collect_set(column_name) as set_agg;
-- 数组交集 返回1或者0
SELECT arrays_overlap(array(1, 2, 3), array(2, 3, 4)) as has_overlap;
-- 复杂数组操作
SELECT
id,
explode(split(tags, ',')) as tag
FROM user_tags;
-- collect字段转数组
val ip_arr=ip_all.select("ip_range").as[String].collect
-- 数组创建,打平,去重 flatten入参是二维数组
scala> spark.sql("select array_distinct(flatten(array(array(1,2,4),array(2,3,4))))").show
+--------------------------------------------------------------+
|array_distinct(flatten(array(array(1, 2, 4), array(2, 3, 4))))|
+--------------------------------------------------------------+
| [1, 2, 4, 3]|
+--------------------------------------------------------------+
--- 数组合并 array_union 会去重
scala> spark.sql("select array_union(array(1,2,4),array(2,3,4)) as arr").show
+------------+
| arr|
+------------+
|[1, 2, 4, 3]|
+------------+
--- 数组交集 array_intersect
scala> spark.sql("select array_intersect(array(1, 2, 3), array(2, 3, 4))").show
+-----------------------------------------------+
|array_intersect(array(1, 2, 3), array(2, 3, 4))|
+-----------------------------------------------+
| [2, 3]|
+-----------------------------------------------+
select array_distinct(filter(array_union(array(dim_geohash),transform(split(geo_list,','),x->split(x,'#')[1])),x->length(x)=12))
-- 数组去重、过滤、转换
select filter(array_distinct(transform(ssids,x->substr(split(x,'#')[1],1,6))),x->length(x)>0)
-- 数组排序、反转、切分
scala> spark.sql("Select slice(reverse(array_sort(array(4,3,7,10,6,2,1,8))),1,5) data").show(false)
+----------------+
|data |
+----------------+
|[10, 8, 7, 6, 4]|
+----------------+
-- 数组包含
-- 如果 array 里有 NULL,而你查的元素不是 NULL,则 NULL 不会影响结果。
-- 如果 element 是 NULL,那么结果就是:只要 array 里有 NULL,就返回 true;否则返回 false。
SELECT array_contains(array('a', 'b', 'c'), 'd');
-- false
SELECT array_contains(array(1, NULL, 3), NULL);
-- true
JSON处理
-- JSON解析
SELECT get_json_object('{"name": "John", "age": 30}', '$.name') as name,
get_json_object('{"name": "John", "age": 30}', '$.age') as age;
-- JSON数组处理
SELECT json_array_length('[1, 2, 3, 4]') as array_length;
-- 转换为JSON
SELECT to_json(struct('John' as name, 30 as age)) as json_string;
-- 复杂JSON操作
SELECT
user_id,
get_json_object(profile, '$.email') as email,
get_json_object(profile, '$.address.city') as city
FROM user_profiles;
条件与判断
-- CASE WHEN语句
SELECT
name,
age,
CASE
WHEN age < 18 THEN '未成年'
WHEN age < 30 THEN '青年'
WHEN age < 50 THEN '中年'
ELSE '老年'
END as age_group
FROM users;
-- IF函数
SELECT
name,
IF(age >= 18, '成年', '未成年') as adult_status,
IFNULL(email, '无邮箱') as email_info
FROM users;
-- COALESCE函数
SELECT
user_id,
COALESCE(nickname, real_name, 'Unknown') as display_name
FROM user_info;
唯一ID生成方法
在Spark SQL中,生成唯一ID是常见需求,特别是在数据去重、主键生成、数据标识等场景。以下介绍几种常用的唯一ID生成方法及其特点对比。
-- 1. 单调递增ID (monotonically_increasing_id)
SELECT
monotonically_increasing_id() as row_id,
name,
email
FROM users;
-- 2. UUID生成
SELECT
uuid() as unique_id,
name,
email
FROM users;
-- 3. Hash函数生成
SELECT
hash(name, email) as hash_id,
xxhash64(name, email) as xxhash_id,
name,
email
FROM users;
-- 4. MD5哈希
SELECT
md5(concat(name, email, cast(unix_timestamp() as string))) as md5_id,
name,
email
FROM users;
-- 5. SHA系列哈希
SELECT
sha1(concat(name, email)) as sha1_id,
sha2(concat(name, email), 256) as sha256_id,
name,
email
FROM users;
-- 6. 基于时间戳的ID生成
SELECT
concat(cast(unix_timestamp() as string), '_',
cast(monotonically_increasing_id() as string)) as timestamp_id,
name,
email
FROM users;
-- 7. 自定义格式ID生成
SELECT
concat('USER_', lpad(cast(row_number() OVER (ORDER BY name) as string), 8, '0')) as custom_id,
name,
email
FROM users;
唯一ID生成方法对比
| 方法 | 存储空间 | 性能 | 唯一性保证 | 重复概率 | 预期重复数据量 | 可读性 | 排序性 | 适用场景 |
|---|---|---|---|---|---|---|---|---|
| monotonically_increasing_id() | 8字节(Long) | 极高 | 全局唯一 | 0% | 永不重复 | 差 | 是 | 行标识、排序需求 |
| uuid() | 36字节(String) | 高 | 极高 | ~10^-37 | 10^28亿 | 中 | 否 | 分布式环境、外部系统对接 |
| hash() | 8字节(Long) | 极高 | 数据相同则相同 | 1/2^64 ≈ 5.4×10^-20 | 1844亿 | 差 | 否 | 数据去重、快速查找 |
| xxhash64() | 8字节(Long) | 极高 | 数据相同则相同 | 1/2^64 ≈ 5.4×10^-20 | 1844亿 | 差 | 否 | 高性能哈希、分桶 |
| md5() | 32字节(String) | 中 | 极高 | 1/2^128 ≈ 2.9×10^-39 | 3.4×10^29亿 | 差 | 否 | 数据完整性校验、去重 |
| sha1() | 40字节(String) | 中 | 极高 | 1/2^160 ≈ 6.8×10^-49 | 1.5×10^39亿 | 差 | 否 | 安全性要求较高场景 |
| sha2(256) | 64字节(String) | 低 | 极高 | 1/2^256 ≈ 8.6×10^-78 | 1.2×10^68亿 | 差 | 否 | 高安全性要求 |
| timestamp_id | 可变 | 高 | 高 | 取决于时间精度 | 毫秒级:1000万条/秒 | 中 | 是 | 时间相关业务、调试 |
| custom_id | 可变 | 中 | 依赖设计 | 取决于设计方案 | 取决于设计方案 | 高 | 是 | 业务友好、人工可读 |
使用建议:
- 性能优先场景:使用
monotonically_increasing_id()或hash()/xxhash64() - 分布式环境:推荐使用
uuid(),保证跨系统唯一性 - 数据去重:使用
hash()或md5(),基于数据内容生成 - 安全敏感:使用
sha2(256)或更高级别的哈希算法 - 业务可读性:使用自定义格式ID,包含业务前缀和序号
- 存储敏感:优先选择数值类型ID(8字节),避免长字符串
注意事项:
-- monotonically_increasing_id() 注意点
-- 1. 只保证单调递增,不保证连续
-- 2. 在不同分区间可能有大的跳跃
-- 3. 重新运行查询会生成不同的ID
-- hash函数的碰撞处理
SELECT
name, email,
hash(name, email) as hash_id,
-- 组合多字段降低碰撞概率
hash(name, email, cast(created_time as string)) as enhanced_hash
FROM users;
-- UUID的格式化处理
SELECT
uuid() as full_uuid,
replace(uuid(), '-', '') as compact_uuid,
substring(uuid(), 1, 8) as short_uuid
FROM users;
窗口函数
-- ROW_NUMBER() 行号
SELECT
name,
salary,
ROW_NUMBER() OVER (ORDER BY salary DESC) as rank
FROM employees;
-- RANK() 排名(相同值相同排名,跳过)
SELECT
name,
salary,
RANK() OVER (ORDER BY salary DESC) as rank
FROM employees;
-- DENSE_RANK() 密集排名(相同值相同排名,不跳过)
SELECT
name,
salary,
DENSE_RANK() OVER (ORDER BY salary DESC) as rank
FROM employees;
-- LAG/LEAD 前后值
SELECT
date,
sales,
LAG(sales, 1) OVER (ORDER BY date) as prev_sales,
LEAD(sales, 1) OVER (ORDER BY date) as next_sales
FROM daily_sales;
-- 分区窗口函数
SELECT
department,
name,
salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank
FROM employees;
聚合函数
-- 基本聚合
SELECT
department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
SUM(salary) as total_salary,
MIN(salary) as min_salary,
MAX(salary) as max_salary
FROM employees
GROUP BY department;
-- 条件聚合
SELECT
department,
COUNT(CASE WHEN gender = 'M' THEN 1 END) as male_count,
COUNT(CASE WHEN gender = 'F' THEN 1 END) as female_count,
AVG(CASE WHEN age > 30 THEN salary END) as senior_avg_salary
FROM employees
GROUP BY department;
-- 分析函数
type27Raw.groupBy("priority","uuid").count.groupBy("priority").agg(
avg("count").as("avg_num"),
-- 求分位数
percentile_approx(col("count"), array(lit(0.25), lit(0.5), lit(0.75)), lit(1000)).as("percentage"),
-- 偏度 0完全对称; >0 右偏; <0 左偏 ;> 1 或 < -1 明显偏态
skewness("count").as("skewness"),
-- 峰度 0 正态分布 >0 尾部更重、峰更尖(极端值更多) <0 尾更轻、峰更平(更平均)
-- kurtosis 越大,不代表极端值“数量”多,而是“出现极端值的概率分布更重”。
kurtosis("count").as("kurtosis")
).sort("priority").show(false)
-- 去重聚合
SELECT
department,
COUNT(DISTINCT employee_id) as unique_employees
FROM employees
GROUP BY department;
UDF/UDAF 注册与使用
-- 解析put2String
import com.glab.function.utils.Put2StringUtil;
import scala.collection.JavaConverters._
val put2string_read = udf((arg1: String,arg2: String) => Put2StringUtil.read(arg1, arg2).asScala.toSeq)
spark.udf.register("put2string_read",put2string_read);
-- 有row、qualifier、value、cf
lbs.withColumn("row",expr("put2string_read(value,'row')[0]")).withColumn("qualifier",expr("put2string_read(value,'qualifier')[0]")).withColumn("value",expr("put2string_read(value,'value')[0]")).show(false)
# ipv4转long
spark.sql("create function ipv4ToLong as 'com.glab.function.ip.IPV42Long' using jar 'hdfs://gt-ga-xs/tmp/wuxl/udf/ys-bi-udf-hive-function-0.0.0.jar'")
spark.sql("DROP FUNCTION IF EXISTS default.longToIpv4")
spark.sql("create function longToIpv4 as 'com.glab.function.ip.Long2Ipv4' using jar 'hdfs://gt-ga-xs/tmp/wuxl/udf/ys-bi-udf-hive-function-0.0.0.jar'")
--ipv6 补全
def expandIpv6(ip: String): String = {
import java.net.InetAddress
if (ip == null) return null
// 去掉 zone id(例如 "%eth0")
val noZone = ip.split("%")(0).trim
if (noZone.isEmpty) return null
try {
val addr = InetAddress.getByName(noZone)
val bytes = addr.getAddress
// 只有 IPv6 才处理(长度应为16)
if (bytes == null || bytes.length != 16) return null
// 两字节一组,格式化为4位16进制(小写)
val sb = new StringBuilder(39) // 8*4 + 7 ':' = 39 chars
for (i <- 0 until 8) {
if (i > 0) sb.append(':')
val hi = bytes(2 * i) & 0xff
val lo = bytes(2 * i + 1) & 0xff
val v = (hi << 8) | lo
sb.append(f"$v%04x") // 小写,宽度4,不足补0
}
sb.toString
} catch {
case _: Throwable => null
}
}
spark.udf.register("expandIpv6", expandIpv6 _)
-- ip段打平
def ip_range(minip: String, maxip: String) = {
import scala.collection.immutable
val minIpArray = minip.split("\\.")
val maxIpArray = maxip.split("\\.")
if (minip == maxip) {
immutable.Seq(minip)
} else {
val arr: immutable.Seq[String] = for {
i <- minIpArray(2).toInt to maxIpArray(2).toInt
} yield minIpArray(0) + "." + minIpArray(1) + "." + i
arr
}
}
spark.udf.register("ip_range_arr", ip_range _)
scala> spark.sql("select ip_range_arr('5.28.32','5.28.33')").show
+------------------------------+
|ip_range_arr(5.28.32, 5.28.33)|
+------------------------------+
| [5.28.32, 5.28.33]|
+------------------------------+
// 校验日期是否合法
val isValidDateTime = udf((s: String) => {
import org.apache.spark.sql.api.java.UDF1;
import java.text.SimpleDateFormat;
if (s == null ) false
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
format.setLenient(false) // 严格模式,避免 2025-13-40 这种通过
try {
format.parse(s)
true
} catch {
case _: Exception => false
}
})
// 注册UDF
spark.udf.register("isValidDateTime", isValidDateTime)
<!-- mac前缀变换 -->
val expandMacUdf = udf((mac: String) => {
if (mac == null || mac.length != 12) Set.empty[String]
// MAC 转大写
val baseMac = mac.toUpperCase
// 遍历 00 ~ FF 的十六进制组合
val hexList = (0 until 256).map(i => f"$i%02X")
val prefixVariants = hexList.map(p => p + baseMac.drop(2))
val suffixVariants = hexList.map(s => baseMac.dropRight(2) + s)
// 去重合并
(prefixVariants ++ suffixVariants).map(_.toLowerCase).toSet
})
spark.udf.register("expandMacUdf",expandMacUdf)
注册与调用要点
- SQL 注册:
spark.udf.register("fn", func)后即可在 SQL 中select fn(col) ...。 - DataFrame API:
F.udf(func)后用select(F.callUDF("fn", col))或expr("fn(col)")。 - UDAF(Scala):实现
UserDefinedAggregateFunction或使用Aggregator(typed)。 - PySpark 性能优先:尽量使用内置函数或
pandas_udf代替普通 UDF。
性能与注意事项
- 尽量优先使用内置函数(Catalyst 可优化);UDF 一旦进入黑盒,优化受限。
- PySpark 普通 UDF 有 Python-JVM 边界开销;
pandas_udf借助 Arrow 可以显著提升性能。 - 指定 UDF 返回类型,避免推断导致的性能/兼容问题。
- 复杂逻辑可考虑 SQL
CASE WHEN、transform/filter/aggregate高阶函数替代。
性能调优与优化
写出排序优化
写出ck的时候要对数据按照不同节点进行分区,然后分区内由按partition分part,同part内按主键排序,可以这样实现 分区:shard+分区字段;排序:分区内按主键字段的顺序排序;写出:按照shard分目录,对应ck节点
val data=dataframe.repartition("shard","part")
.sortWithinPartitions("id_md5","wifimac")
data.write.partitionBy("shard").parquet(oupath)
好处
- 这样不需要全局shuffle,只要分区内有序就好
- repartition的时候同一个part内的数据已经提前合到一起了
- partitionBy只使用shard字段,part内的数据不会乱序,不需要重新排
查询与作业优化
存储格式优化
推荐存储格式对比:
| 格式 | 压缩比 | 查询速度 | 写入速度 | 适用场景 |
|---|---|---|---|---|
| Parquet | 高 | 快 | 中等 | 分析查询,列式存储 |
| ORC | 很高 | 很快 | 快 | Hive集成,高压缩比 |
| Avro | 中等 | 中等 | 快 | 行式存储,Schema演进 |
| JSON | 低 | 慢 | 快 | 开发调试,灵活性高 |
Parquet格式优化:
// 推荐使用Parquet格式
df.write.mode("overwrite").parquet("data.parquet")
val optimizedDF = spark.read.parquet("data.parquet")
// 配置压缩
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
// 设置列式读取批次大小
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "10000")
// 启用向量化读取
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
ORC格式优化:
// 使用ORC格式
df.write.format("orc").mode("overwrite").save("data.orc")
// ORC优化配置
spark.conf.set("spark.sql.orc.compression.codec", "snappy")
spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
分区策略优化
时间分区策略:
// 按时间分区(推荐)
df.write
.partitionBy("year", "month", "day")
.parquet("time_partitioned_data")
// 避免过度分区
val dailyData = df.withColumn("date", to_date($"timestamp"))
dailyData.write
.partitionBy("date")
.parquet("daily_partitioned_data")
// 动态分区裁剪
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.useStats", "true")
业务分区策略:
// 按业务字段分区
df.write
.partitionBy("region", "category")
.parquet("business_partitioned_data")
// 分区数控制
val numPartitions = spark.conf.get("spark.sql.shuffle.partitions", "200").toInt
val optimalPartitions = Math.max(100, Math.min(numPartitions, 1000))
df.repartition(optimalPartitions, $"partition_key")
.write
.partitionBy("partition_key")
.parquet("optimized_data")
分区裁剪优化:
// 启用分区裁剪
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// 查询时使用分区过滤
val result = spark.read.parquet("partitioned_data")
.filter($"year" === 2023 && $"month" >= 6) // 有效分区裁剪
.select("id", "name", "value")
谓词下推优化
启用谓词下推:
// 启用各种数据源的谓词下推
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.conf.set("spark.sql.json.filterPushdown", "true")
// JDBC谓词下推
spark.conf.set("spark.sql.pushDownPredicate", "true")
优化示例:
// 原始查询(未优化)
val df = spark.read.parquet("large_dataset.parquet")
val result = df.select("*").filter($"age" > 18 && $"city" === "Beijing")
// 优化后查询(谓词下推)
val result = spark.read.parquet("large_dataset.parquet")
.filter($"age" > 18) // 谓词下推到数据源
.filter($"city" === "Beijing") // 谓词下推到数据源
.select("id", "name", "age") // 列裁剪
复杂谓词优化:
// 组合条件优化
val complexFilter = ($"age".between(18, 65)) &&
($"salary" > 50000) &&
($"department".isin("IT", "Finance"))
val result = spark.read.parquet("employee_data.parquet")
.filter(complexFilter) // 复杂谓词会被自动优化和下推
.select("id", "name", "salary")
Join优化
Join策略选择
Join类型对比:
| Join类型 | 适用场景 | 优势 | 劣势 | 触发条件 |
|---|---|---|---|---|
| Broadcast Join | 小表Join大表 | 无Shuffle,性能最好 | 小表必须能放入内存 | 小表 < 10MB |
| Sort Merge Join | 大表Join大表 | 内存友好,稳定 | 需要Shuffle | 默认Join |
| Shuffle Hash Join | 中等表Join | 内存使用适中 | 需要Shuffle | 中等数据量 |
| Cartesian Join | 笛卡尔积 | 简单 | 性能极差 | 无Join键 |
广播Join优化
自动广播配置:
// 设置自动广播阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")
// 启用自适应查询执行
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
手动广播优化:
// 手动广播小表
val smallTable = spark.table("small_table")
val broadcastDF = broadcast(smallTable)
val result = largeTable.join(broadcastDF, "id")
// 强制广播(即使超过阈值)
val result = largeTable.join(broadcast(mediumTable), "id")
// 不等式广播join
com_ipv6.join(broadcast(ipv6),com_ipv6("public_ip")>=ipv6("minip") and com_ipv6("public_ip")<ipv6("maxip")).show(false)
// 广播变量优化
val lookupMap = smallTable.collect()
.map(row => row.getAs[String]("key") -> row.getAs[String]("value"))
.toMap
val broadcastMap = spark.sparkContext.broadcast(lookupMap)
// 广播大变量,然后udf方法使用该变量,进行本地不等式join
val ipv6_arr=ipv6.map(_.getAs[String]("minip")).collect.sorted
val ipv6_arr_broadcast = spark.sparkContext.broadcast(ipv6_arr)
// 注册 UDF
spark.udf.register("get_ipv6_range", (code: String) => {
findIpv6Index(ipv6_arr_broadcast.value,code)
})
大变量用完之后销毁,否则一直占用内存
broadcastMap.unpersist(true)
val enrichedData = largeTable.map { row =>
val key = row.getAs[String]("key")
val enrichValue = broadcastMap.value.getOrElse(key, "unknown")
(row, enrichValue)
}
Sort Merge Join优化
预排序优化:
如果join key有默认值不需要参与join,就设置为null,设置成其他值还是会被分发然后参与join的,null不参与
// 预先排序减少Shuffle成本
val sortedTable1 = table1.sort("join_key")
val sortedTable2 = table2.sort("join_key")
val result = sortedTable1.join(sortedTable2, "join_key")
// 使用分桶表
table1.write
.bucketBy(10, "join_key")
.sortBy("join_key")
.saveAsTable("bucketed_table1")
table2.write
.bucketBy(10, "join_key")
.sortBy("join_key")
.saveAsTable("bucketed_table2")
// 分桶表Join(无Shuffle)
val result = spark.table("bucketed_table1")
.join(spark.table("bucketed_table2"), "join_key")
数据倾斜处理
倾斜检测:
// 检测Join键分布
val keyDistribution = largeTable
.groupBy("join_key")
.count()
.orderBy($"count".desc)
keyDistribution.show(20) // 查看top 20的键分布
// 统计分析
val stats = keyDistribution.agg(
avg("count").as("avg_count"),
max("count").as("max_count"),
min("count").as("min_count"),
stddev("count").as("stddev_count")
)
stats.show()
//join前先对数据按照相同key repartition
spark.read.parquet("/tmp/logs/data/huangg/mac_uuid/20251015").withColumn("geo6_equal",expr("substr(type27_geohash,1,6)=substr(woi_geohash,1,6)")).withColumn("public_ip",expr("case when public_ip like '%:%' then expandIpv6(public_ip) else public_ip end")).withColumn("ipv4_minip",expr("case when public_ip like '%.%' then get_ipv4_range(ipv4ToLong(public_ip)) else -1 end")).withColumn("ipv6_minip",expr("case when public_ip like '%:%' then get_ipv6_range(public_ip) else '' end")).na.fill("").repartition(6000,col("ipv4_minip")).join(ipv4.repartition(6000,col("ipv4_minip")),Seq("ipv4_minip"),"left").repartition(6000,col("ipv6_minip")).join(ipv6.repartition(6000,col("ipv6_minip")),Seq("ipv6_minip"),"left").write.mode("overwrite").parquet("/tmp/logs/data/huangg/essid/ip/20251017")
倾斜解决方案:
方案1:加盐处理:
// 加盐Join
import scala.util.Random
// 给小表加盐
val saltedSmallTable = smallTable.flatMap { row =>
(0 until 10).map { salt =>
Row.fromSeq(row.toSeq :+ salt)
}
}
// 给大表的倾斜键加随机盐
val saltedLargeTable = largeTable.map { row =>
val key = row.getAs[String]("join_key")
val salt = if (isSkewedKey(key)) Random.nextInt(10) else 0
Row.fromSeq(row.toSeq :+ salt)
}
// 执行Join
val result = saltedLargeTable.join(saltedSmallTable,
Seq("join_key", "salt_column"))
方案2:倾斜键单独处理:
// 识别倾斜键
val skewedKeys = Set("skewed_key_1", "skewed_key_2")
// 分离倾斜数据和正常数据
val normalData = largeTable.filter(!$"join_key".isin(skewedKeys.toSeq:_*))
val skewedData = largeTable.filter($"join_key".isin(skewedKeys.toSeq:_*))
// 正常数据正常Join
val normalResult = normalData.join(smallTable, "join_key")
// 倾斜数据使用广播Join
val skewedResult = skewedData.join(broadcast(smallTable), "join_key")
// 合并结果
val finalResult = normalResult.union(skewedResult)
方案3:两阶段聚合:
// 预聚合阶段
val preAggregated = largeTable
.withColumn("salt", (rand() * 10).cast("int"))
.withColumn("salted_key", concat($"join_key", lit("_"), $"salt"))
.groupBy("salted_key")
.agg(sum("value").as("partial_sum"))
// 最终聚合阶段
val finalAggregated = preAggregated
.withColumn("original_key", split($"salted_key", "_").getItem(0))
.groupBy("original_key")
.agg(sum("partial_sum").as("final_sum"))
缓存与持久化
存储级别选择
存储级别对比:
| 存储级别 | 内存使用 | 磁盘使用 | 序列化 | CPU开销 | 适用场景 |
|---|---|---|---|---|---|
| MEMORY_ONLY | 高 | 无 | 无 | 低 | 小数据集,频繁访问 |
| MEMORY_AND_DISK | 中等 | 有 | 无 | 低 | 大数据集,内存不足时 |
| MEMORY_ONLY_SER | 低 | 无 | 有 | 高 | 内存紧张,CPU充足 |
| DISK_ONLY | 无 | 高 | 有 | 中等 | 大数据集,内存紧张 |
| OFF_HEAP | 堆外 | 无 | 有 | 中等 | 减少GC压力 |
缓存策略优化
智能缓存策略:
// 根据数据大小选择存储级别
def selectStorageLevel(dataSize: Long, memoryAvailable: Long): StorageLevel = {
val ratio = dataSize.toDouble / memoryAvailable
if (ratio < 0.3) {
StorageLevel.MEMORY_ONLY // 内存充足
} else if (ratio < 0.8) {
StorageLevel.MEMORY_ONLY_SER // 内存紧张,序列化节省空间
} else {
StorageLevel.MEMORY_AND_DISK_SER // 内存不足,溢出到磁盘
}
}
// 应用缓存策略
val dataSize = rdd.map(_.toString.length).sum()
val storageLevel = selectStorageLevel(dataSize, availableMemory)
rdd.persist(storageLevel)
缓存生命周期管理:
// 缓存管理器
class CacheManager {
private val cachedRDDs = mutable.Map[String, RDD[_]]()
def cache[T](name: String, rdd: RDD[T], level: StorageLevel = StorageLevel.MEMORY_AND_DISK): RDD[T] = {
// 检查是否已缓存
if (!cachedRDDs.contains(name)) {
rdd.persist(level)
cachedRDDs(name) = rdd
println(s"Cached RDD: $name")
}
rdd
}
def uncache(name: String): Unit = {
cachedRDDs.get(name).foreach { rdd =>
rdd.unpersist()
cachedRDDs.remove(name)
println(s"Uncached RDD: $name")
}
}
def clearAll(): Unit = {
cachedRDDs.values.foreach(_.unpersist())
cachedRDDs.clear()
println("Cleared all cached RDDs")
}
}
Checkpoint优化
Checkpoint策略:
// 设置checkpoint目录
spark.sparkContext.setCheckpointDir("hdfs://namenode:8020/checkpoint")
// 智能checkpoint
def smartCheckpoint[T](rdd: RDD[T], lineageDepth: Int = 10): RDD[T] = {
// 计算血缘深度
def getLineageDepth(rdd: RDD[_]): Int = {
if (rdd.dependencies.isEmpty) {
1
} else {
1 + rdd.dependencies.map(_.rdd).map(getLineageDepth).max
}
}
val currentDepth = getLineageDepth(rdd)
if (currentDepth > lineageDepth) {
println(s"Checkpointing RDD with lineage depth: $currentDepth")
rdd.checkpoint()
}
rdd
}
// 使用示例
val deepRDD = rdd1.map(transform1)
.filter(filter1)
.join(rdd2)
.map(transform2)
.filter(filter2)
val checkpointedRDD = smartCheckpoint(deepRDD)
代码层面优化
算子选择优化
高效算子使用:
// 使用reduceByKey替代groupByKey
val wordCounts = words.map(word => (word, 1))
.reduceByKey(_ + _) // 推荐:Map端预聚合
// 而不是
val wordCounts = words.map(word => (word, 1))
.groupByKey() // 不推荐:所有数据都要Shuffle
.mapValues(_.sum)
// 使用mapPartitions减少函数调用开销
val result = rdd.mapPartitions { partition =>
// 在分区级别初始化资源
val connection = createConnection()
val buffer = new ArrayBuffer[ProcessedRecord]()
try {
partition.foreach { record =>
buffer += processWithConnection(record, connection)
}
buffer.iterator
} finally {
connection.close()
}
}
// 使用aggregateByKey进行复杂聚合
val result = rdd.aggregateByKey((0, 0))(
// 分区内聚合
(acc, value) => (acc._1 + value, acc._2 + 1),
// 分区间聚合
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).mapValues { case (sum, count) => sum.toDouble / count }
数据结构优化
选择合适的数据结构:
// 使用原始类型数组替代集合
class OptimizedProcessor {
// 推荐:原始类型数组
private val intArray = new Array[Int](1000000)
// 不推荐:装箱类型集合
private val intList = new ArrayBuffer[Integer]()
// 使用专用的数据结构
def processLargeDataset(data: RDD[String]): RDD[String] = {
data.mapPartitions { partition =>
val bloom = new BloomFilter[String](1000000, 0.01)
val deduped = new mutable.HashSet[String]()
partition.filter { item =>
if (bloom.mightContain(item)) {
if (deduped.contains(item)) {
false // 重复项
} else {
deduped += item
true
}
} else {
bloom.put(item)
deduped += item
true
}
}
}
}
}
内存使用优化
对象复用:
// 对象池模式
class ObjectPool[T](createFunc: () => T, resetFunc: T => Unit) {
private val pool = new mutable.Queue[T]()
def borrow(): T = {
pool.synchronized {
if (pool.nonEmpty) {
pool.dequeue()
} else {
createFunc()
}
}
}
def return(obj: T): Unit = {
resetFunc(obj)
pool.synchronized {
pool.enqueue(obj)
}
}
}
// 使用对象池
val stringBuilderPool = new ObjectPool[StringBuilder](
() => new StringBuilder(),
_.clear()
)
val result = rdd.mapPartitions { partition =>
partition.map { item =>
val sb = stringBuilderPool.borrow()
try {
sb.append(item).append("_processed").toString
} finally {
stringBuilderPool.return(sb)
}
}
}
广播变量和累加器优化
广播变量最佳实践:
// 大查找表广播
val lookupTableMap = smallTable.collect()
.map(row => row.getString(0) -> row.getString(1))
.toMap
val broadcastLookup = spark.sparkContext.broadcast(lookupTableMap)
val enrichedData = largeRDD.map { record =>
val enrichValue = broadcastLookup.value.getOrElse(record.key, "unknown")
EnrichedRecord(record, enrichValue)
}
// 记住及时释放
broadcastLookup.destroy()
累加器优化:
// 自定义累加器
class HistogramAccumulator extends AccumulatorV2[Double, mutable.Map[String, Long]] {
private val histogram = mutable.Map[String, Long]()
override def isZero: Boolean = histogram.isEmpty
override def copy(): HistogramAccumulator = {
val newAcc = new HistogramAccumulator
newAcc.histogram ++= this.histogram
newAcc
}
override def reset(): Unit = histogram.clear()
override def add(value: Double): Unit = {
val bucket = getBucket(value)
histogram(bucket) = histogram.getOrElse(bucket, 0L) + 1
}
override def merge(other: AccumulatorV2[Double, mutable.Map[String, Long]]): Unit = {
other match {
case o: HistogramAccumulator =>
o.histogram.foreach { case (bucket, count) =>
histogram(bucket) = histogram.getOrElse(bucket, 0L) + count
}
}
}
override def value: mutable.Map[String, Long] = histogram.toMap
private def getBucket(value: Double): String = {
if (value < 0) "negative"
else if (value < 10) "0-10"
else if (value < 100) "10-100"
else "100+"
}
}
// 使用自定义累加器
val histogramAcc = new HistogramAccumulator
spark.sparkContext.register(histogramAcc, "histogram")
rdd.foreach(value => histogramAcc.add(value))
println(s"Histogram: ${histogramAcc.value}")
网络与I/O优化
序列化优化
Kryo序列化配置:
// Kryo配置
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrationRequired", "false")
spark.conf.set("spark.kryo.unsafe", "true")
// 注册常用类
val conf = new SparkConf()
conf.registerKryoClasses(Array(
classOf[MyClass],
classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
classOf[org.apache.spark.sql.types.StructType],
classOf[Array[org.apache.spark.sql.types.StructField]]
))
压缩优化
压缩算法选择:
// 根据场景选择压缩算法
spark.conf.set("spark.io.compression.codec", "snappy") // 平衡压缩比和速度
// spark.conf.set("spark.io.compression.codec", "lz4") // 更快的压缩/解压
// spark.conf.set("spark.io.compression.codec", "gzip") // 更高的压缩比
// 启用各种压缩
spark.conf.set("spark.broadcast.compress", "true")
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")
spark.conf.set("spark.rdd.compress", "true")
网络调优
网络参数优化:
# 网络超时设置
spark.conf.set("spark.network.timeout", "800s")
spark.conf.set("spark.rpc.askTimeout", "800s")
spark.conf.set("spark.rpc.lookupTimeout", "800s")
# 网络连接优化
spark.conf.set("spark.rpc.netty.dispatcher.numThreads", "8")
spark.conf.set("spark.shuffle.io.numConnectionsPerPeer", "3")
spark.conf.set("spark.shuffle.io.preferDirectBufs", "true")
# 传输优化
spark.conf.set("spark.reducer.maxSizeInFlight", "96m")
spark.conf.set("spark.reducer.maxReqsInFlight", "Int.MaxValue")
常见性能问题
内存溢出问题
OOM问题诊断:
// 1. Driver OOM
// 原因:collect()操作数据量过大
val result = largeRDD.collect() // 危险操作
// 解决方案:使用take()或分批处理
val sample = largeRDD.take(1000)
largeRDD.foreachPartition { partition =>
// 分区内处理,避免将所有数据拉到Driver
}
// 2. Executor OOM
// 原因:单个分区数据过大
// 解决方案:增加分区数
val repartitionedRDD = rdd.repartition(numPartitions * 2)
数据倾斜解决方案
倾斜检测和解决:
// 检测倾斜
def detectSkew[T](rdd: RDD[T]): Unit = {
val partitionSizes = rdd.mapPartitionsWithIndex { (index, iter) =>
Iterator((index, iter.size))
}.collect()
val maxSize = partitionSizes.maxBy(_._2)
val avgSize = partitionSizes.map(_._2).sum / partitionSizes.length
if (maxSize._2 > avgSize * 3) {
println(s"数据倾斜警告:分区${maxSize._1}有${maxSize._2}条记录,平均${avgSize}条")
}
}
监控与诊断
Spark UI监控
关键监控指标:
- Jobs页面:作业执行时间、Stage信息
- Stages页面:Stage执行详情、任务分布
- Storage页面:RDD缓存使用情况
- Executors页面:Executor资源使用情况
- SQL页面:SQL查询执行计划
性能指标
核心性能指标:
# 查看应用程序指标
curl http://driver-host:4040/api/v1/applications
curl http://driver-host:4040/api/v1/applications/[app-id]/jobs
curl http://driver-host:4040/api/v1/applications/[app-id]/stages
curl http://driver-host:4040/api/v1/applications/[app-id]/executors
Spark Web UI - SQL 页面解读
graph LR
A["SQL 页面"] --> B["SQL 查询列表"]
A --> C["图表区域 Graph"]
A --> D["详情区域 Details"]
B --> B1["Query ID / Duration / Submission Time"]
B --> B2["Job/Stage 关联"]
C --> C1["DAG 图视图"]
C --> C2["节点颜色/尺寸/边箭头"]
D --> D1["SQL 文本"]
D --> D2["Parsed/Analyzed/Optimized"]
D --> D3["Physical Plan"]
- SQL 查询列表:每一条记录对应一次 SQL 执行(包括 DataFrame/Dataset API 被翻译的查询)。
- Query ID:查询唯一标识;点击进入详情。
- Duration:总时长;过长需排查倾斜、Shuffle 或 Scan 代价。
- Jobs/Stages:该查询触发的 Job/Stage 数量与链接。
- 图表区域 Graph(DAG):展示物理执行 DAG。
- 节点:代表物理算子(如 BroadcastHashJoin、ShuffledHashJoin、SortMergeJoin、WholeStageCodegen、Project、Filter、Scan)。
- 颜色/大小:通常暗示阶段边界、Shuffle、广播等;边箭头表示依赖方向。
- 提示:节点过多或多次 Shuffle,通常意味着可优化(如 Broadcast Join、过滤下推、列裁剪)。
- 详情区域 Details:分层展示查询从 SQL 文本到物理计划的演进:
- Parsed:语法解析树。
- Analyzed:解析并完成解析属性、类型推导后的逻辑计划。
- Optimized:Catalyst 优化(谓词/投影下推、常量折叠、合并等)后的逻辑计划。
- Physical Plan:可执行的物理算子树(见下节)。
Details 中 Physical Plan 详解
graph TD
A["Physical Plan"] --> B["Scan(扫描)"]
A --> C["Project / Filter(列裁剪/筛选)"]
A --> D["Aggregate(聚合)"]
A --> E["Join(连接)"]
A --> F["Sort / Exchange(排序/Shuffle)"]
A --> G["WholeStageCodegen(代码生成)"]
常见算子说明(按链路自下而上)
- Scan:数据源读取。关键信息:
PushedFilters(谓词下推)、Batch/Vectorized(矢量化读取)、Input Size/Rows(读取量)。- 优化:开启列裁剪、谓词下推;使用列式存储(Parquet/ORC);尽量减少
Input Size。
- 优化:开启列裁剪、谓词下推;使用列式存储(Parquet/ORC);尽量减少
- Project/Filter:列裁剪与条件过滤,若能在 Scan 阶段下推更优。
- Aggregate:分为 HashAggregate/SortAggregate;前者内存友好且常见,后者需要排序。
- 优化:
partial -> shuffle -> final两阶段聚合是否生效;必要时调大spark.sql.shuffle.partitions。
- 优化:
- Join:BroadcastHashJoin / SortMergeJoin / ShuffledHashJoin / BroadcastNestedLoopJoin 等。
- 选择逻辑:
- 小表 + 大表:优先 BroadcastHashJoin(受
spark.sql.autoBroadcastJoinThreshold约束)。 - 大表 + 大表:常见 SortMergeJoin(需要两侧都可排序的等值连接)。
- 非等值/复杂条件:可能退化为 NestedLoopJoin。
- 小表 + 大表:优先 BroadcastHashJoin(受
- 优化:广播小表、重分区、提前过滤、选择合适的 join key 分布、开启 AQE(自适应执行)。
- 选择逻辑:
- Sort / Exchange:排序与 Shuffle 边界(
Exchange)。- 代价高的环节,观察
shuffle read/write size、skew(倾斜告警)。
- 代价高的环节,观察
- WholeStageCodegen:算子融合,减少虚函数开销;节点越大代表融合范围越广。
- 观察
codegenStageId;若关闭可设置spark.sql.codegen.wholeStage=false(仅用于诊断)。
- 观察
如何解读 Physical Plan 关键信息
- 行为统计:
(x rows, y KB)结合 Stage Metrics 判断热点与瓶颈。 - 下推/裁剪:检查
PushedFilters、Input/Output Columns是否最小化。 - Shuffle 边界:
Exchange hashpartitioning(...)处是重分区;关注分区数与倾斜。 - Join 选择:核对
BroadcastHashJoin是否命中,或SortMergeJoin是否多余的 sort。 - 代码生成:确认存在
WholeStageCodegen,并留意是否被PythonUDF打断(可替换为内置函数或 pandas_udf)。
常见定位与优化建议
- SQL 页面 Duration 过长:先看 Graph 的 Shuffle/Join 节点;再看 Details 的 Physical Plan 并结合 Executors 页的 Task 时间分布。
- 扫描过大:启用分区裁剪、列裁剪、谓词下推;检查表统计信息(ANALYZE TABLE)。
- 倾斜:尝试
salting、skew join(AQE 自动倾斜优化)、过滤热点 key。 - Join 不合适:提升广播阈值或强制
/*+ BROADCAST(t) */;减少无用的Sort。
常见错误解决方案
内存相关错误
OutOfMemoryError: Java heap space
错误现象:
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
at java.lang.StringBuilder.append(StringBuilder.java:136)
原因分析:
- 堆内存不足:Executor或Driver的堆内存设置过小
- 数据倾斜:某些分区数据量过大,导致单个Task内存溢出
- 缓存过多:RDD缓存占用过多内存
- 对象创建过多:频繁创建大对象
解决方案:
1. 调整内存配置:
# 增加Executor内存
spark-submit \
--conf spark.executor.memory=8g \
--conf spark.executor.memoryOverhead=2g \
--conf spark.driver.memory=4g \
--conf spark.driver.memoryOverhead=1g \
your-app.jar
# 调整内存比例
spark-submit \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.3 \
your-app.jar
2. 处理数据倾斜:
// 方法1:增加分区数
val skewedRDD = originalRDD.repartition(200)
// 方法2:自定义分区策略
val customPartitioner = new Partitioner {
override def numPartitions: Int = 200
override def getPartition(key: Any): Int = {
// 自定义分区逻辑,避免数据倾斜
val hash = key.hashCode()
Math.abs(hash) % numPartitions
}
}
val skewedRDD = originalRDD.partitionBy(customPartitioner)
// 方法3:两阶段聚合
val stage1RDD = originalRDD
.map(x => (x._1 + "_" + Random.nextInt(10), x._2)) // 添加随机前缀
.reduceByKey(_ + _)
.map(x => (x._1.split("_")(0), x._2)) // 去掉随机前缀
.reduceByKey(_ + _)
3. 优化缓存策略:
// 使用MEMORY_AND_DISK_SER减少内存占用
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
// 及时释放不需要的缓存
rdd.unpersist()
// 使用checkpoint减少内存压力
rdd.checkpoint()
4. 代码优化:
// 使用mapPartitions减少对象创建
val optimizedRDD = rdd.mapPartitions(iter => {
val result = new ArrayBuffer[String]()
while (iter.hasNext) {
val item = iter.next()
// 处理逻辑
result += processedItem
}
result.iterator
})
// 复用对象
case class User(id: Long, name: String)
val userRDD = rdd.mapPartitions(iter => {
val user = User(0L, "") // 复用对象
iter.map { case (id, name) =>
user.id = id
user.name = name
user.copy() // 返回副本
}
})
OutOfMemoryError: Direct buffer memory
错误现象:
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:694)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
原因分析:
- 堆外内存不足:DirectBuffer内存设置过小
- 网络传输过多:大量数据通过网络传输
- 序列化问题:序列化过程中占用过多堆外内存
解决方案:
1. 增加堆外内存:
spark-submit \
--conf spark.executor.memoryOverhead=4g \
--conf spark.driver.memoryOverhead=2g \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=4g \
your-app.jar
2. 优化网络传输:
// 启用压缩
spark.conf.set("spark.io.compression.codec", "snappy")
spark.conf.set("spark.broadcast.compress", "true")
spark.conf.set("spark.shuffle.compress", "true")
// 调整网络缓冲区
spark.conf.set("spark.network.timeout", "800s")
spark.conf.set("spark.executor.heartbeatInterval", "60s")
3. 优化序列化:
// 使用Kryo序列化
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrationRequired", "false")
// 注册自定义类
val conf = new SparkConf()
conf.registerKryoClasses(Array(classOf[MyClass]))
- OutOfMemoryError: Metaspace
错误现象:
java.lang.OutOfMemoryError: Metaspace
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
原因分析:
- 类加载过多:动态生成大量类
- Metaspace设置过小:JVM Metaspace空间不足
- UDF使用过多:大量UDF导致类加载
解决方案:
1. 调整JVM参数:
spark-submit \
--conf spark.executor.extraJavaOptions="-XX:MaxMetaspaceSize=512m" \
--conf spark.driver.extraJavaOptions="-XX:MaxMetaspaceSize=512m" \
your-app.jar
2. 优化UDF使用:
// 避免在UDF中创建过多类
val optimizedUDF = udf((value: String) => {
// 使用简单逻辑,避免动态类生成
value.toUpperCase
})
// 复用UDF实例
val myUDF = udf((x: Int) => x * 2)
df.select(myUDF(col("value")))
网络相关错误
FetchFailedException
错误现象:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to hostname:7337
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:646)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:646)
原因分析:
- 网络超时:网络连接超时
- Executor丢失:Executor进程异常退出
- 内存不足:Executor内存不足导致进程退出
- 网络不稳定:网络连接不稳定
解决方案:
1. 调整网络超时参数:
spark-submit \
--conf spark.network.timeout=800s \
--conf spark.executor.heartbeatInterval=60s \
--conf spark.rpc.askTimeout=800s \
--conf spark.rpc.lookupTimeout=800s \
your-app.jar
2. 增加重试机制:
spark-submit \
--conf spark.task.maxFailures=8 \
--conf spark.stage.maxAttempts=4 \
your-app.jar
3. 优化Shuffle配置:
spark-submit \
--conf spark.shuffle.io.maxRetries=3 \
--conf spark.shuffle.io.retryWait=60s \
--conf spark.shuffle.file.buffer=32k \
your-app.jar
4. 监控Executor状态:
// 添加监控代码
spark.sparkContext.addSparkListener(new SparkListener {
override def onExecutorLost(executorLost: SparkListenerExecutorLost): Unit = {
println(s"Executor lost: ${executorLost.executorId}")
// 记录日志或发送告警
}
})
ConnectionTimeoutException
错误现象:
java.net.ConnectTimeoutException: Connection timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
原因分析:
- 网络延迟:网络延迟过高
- 防火墙限制:防火墙阻止连接
- 端口冲突:端口被占用
- DNS解析问题:DNS解析失败
解决方案:
1. 调整连接超时:
spark-submit \
--conf spark.network.timeout=1200s \
--conf spark.rpc.askTimeout=1200s \
your-app.jar
2. 检查网络配置:
# 检查网络连通性
ping hostname
telnet hostname port
# 检查防火墙
iptables -L
3. 使用本地化策略:
// 启用数据本地化
spark.conf.set("spark.locality.wait", "30s")
spark.conf.set("spark.locality.wait.process", "30s")
spark.conf.set("spark.locality.wait.node", "30s")
spark.conf.set("spark.locality.wait.rack", "30s")
序列化相关错误
NotSerializableException
错误现象:
java.io.NotSerializableException: com.example.MyClass
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
原因分析:
- 类未实现Serializable:自定义类未实现Serializable接口
- 闭包问题:在闭包中引用了不可序列化的对象
- 静态变量:引用了静态变量或单例对象
解决方案:
1. 实现Serializable接口:
// 方法1:实现Serializable
case class MyClass(id: Int, name: String) extends Serializable
// 方法2:使用@transient注解
class MyClass(val id: Int, val name: String) extends Serializable {
@transient
private val nonSerializableField = new NonSerializableClass()
}
2. 避免闭包问题:
// 错误示例
val nonSerializableObject = new NonSerializableClass()
val rdd = spark.sparkContext.parallelize(1 to 10)
rdd.map(x => nonSerializableObject.process(x)) // 会报错
// 正确示例
val rdd = spark.sparkContext.parallelize(1 to 10)
rdd.mapPartitions(iter => {
val localObject = new NonSerializableClass() // 在分区内创建
iter.map(x => localObject.process(x))
})
3. 使用广播变量:
// 对于只读的大对象,使用广播变量
val largeData = spark.sparkContext.parallelize(1 to 1000000).collect()
val broadcastData = spark.sparkContext.broadcast(largeData)
val rdd = spark.sparkContext.parallelize(1 to 10)
rdd.map(x => {
val data = broadcastData.value // 使用广播变量
processWithData(x, data)
})
KryoSerializationException
错误现象:
com.esotericsoftware.kryo.KryoException: java.lang.ClassNotFoundException: com.example.MyClass
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
原因分析:
- 类未注册:使用Kryo序列化时类未注册
- 类路径问题:类不在classpath中
- 版本不兼容:序列化和反序列化版本不匹配
解决方案:
1. 注册自定义类:
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(
classOf[MyClass],
classOf[MyOtherClass]
))
val spark = SparkSession.builder()
.config(conf)
.getOrCreate()
2. 使用Kryo注册器:
class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[MyClass])
kryo.register(classOf[MyOtherClass])
}
}
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
3. 禁用严格模式:
spark.conf.set("spark.kryo.registrationRequired", "false")
资源相关错误
ExecutorLostFailure
错误现象:
org.apache.spark.scheduler.ExecutorLostFailure: Executor 1 lost
at org.apache.spark.scheduler.TaskSetManager$$anonfun$abortIfCompletelyBlacklisted$1.apply(TaskSetManager.scala:1023)
原因分析:
- 内存不足:Executor内存不足被杀死
- CPU过载:CPU使用率过高导致进程异常
- 磁盘空间不足:磁盘空间不足导致写入失败
- 网络问题:网络连接问题导致心跳超时
解决方案:
1. 增加资源配额:
spark-submit \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--conf spark.executor.memoryOverhead=2g \
your-app.jar
2. 监控资源使用:
// 添加资源监控
spark.sparkContext.addSparkListener(new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val metrics = taskEnd.taskMetrics
println(s"Task ${taskEnd.taskInfo.taskId} completed:")
println(s" Duration: ${taskEnd.taskInfo.duration}ms")
println(s" Memory: ${metrics.peakExecutionMemory} bytes")
println(s" Disk: ${metrics.diskBytesSpilled} bytes spilled")
}
})
3. 优化资源分配:
spark-submit \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=2 \
--conf spark.dynamicAllocation.maxExecutors=20 \
--conf spark.dynamicAllocation.initialExecutors=5 \
your-app.jar
NoClassDefFoundError
错误现象:
java.lang.NoClassDefFoundError: com.example.MyClass
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
原因分析:
- 依赖缺失:缺少必要的jar包
- 版本冲突:依赖版本冲突
- 类路径问题:类不在classpath中
- 打包问题:jar包打包不完整
解决方案:
1. 添加依赖jar包:
spark-submit \
--jars /path/to/dependency1.jar,/path/to/dependency2.jar \
--conf spark.executor.extraClassPath=/path/to/dependencies/* \
your-app.jar
2. 使用fat jar:
<!-- Maven配置 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
3. 检查依赖冲突:
# 查看依赖树
mvn dependency:tree
# 排除冲突依赖
<dependency>
<groupId>com.example</groupId>
<artifactId>library</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>conflicting.group</groupId>
<artifactId>conflicting-artifact</artifactId>
</exclusion>
</exclusions>
</dependency>
数据相关错误
FileNotFoundException
错误现象:
java.io.FileNotFoundException: File does not exist: hdfs://namenode:8020/path/to/file
at org.apache.hadoop.hdfs.DFSClient.checkPath(DFSClient.java:1274)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1527)
原因分析:
- 文件不存在:输入文件路径错误或文件被删除
- 权限问题:没有读取文件的权限
- 路径错误:文件路径格式错误
- HDFS问题:HDFS服务异常
解决方案:
1. 检查文件路径:
// 检查文件是否存在
val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
val path = new org.apache.hadoop.fs.Path("/path/to/file")
if (fs.exists(path)) {
println("File exists")
} else {
println("File does not exist")
}
2. 设置文件系统配置:
// 设置HDFS配置
spark.conf.set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")
spark.conf.set("spark.hadoop.dfs.namenode.rpc-address", "namenode:8020")
3. 处理权限问题:
# 检查文件权限
hdfs dfs -ls /path/to/file
# 修改文件权限
hdfs dfs -chmod 644 /path/to/file
# 修改文件所有者
hdfs dfs -chown username:group /path/to/file
DataSourceException
错误现象:
org.apache.spark.sql.AnalysisException: Table or view not found: table_name
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
原因分析:
- 表不存在:数据库表不存在
- 权限问题:没有访问表的权限
- 数据库连接问题:数据库连接失败
- 表名错误:表名拼写错误
解决方案:
1. 检查表是否存在:
// 检查表是否存在
val tables = spark.catalog.listTables()
tables.filter(_.name == "table_name").show()
// 或者使用SQL
spark.sql("SHOW TABLES").show()
2. 设置数据库连接:
// 设置数据库连接
spark.conf.set("spark.sql.warehouse.dir", "/user/hive/warehouse")
spark.conf.set("hive.metastore.uris", "thrift://metastore:9083")
// 使用Hive
spark.sql("USE database_name")
spark.sql("SHOW TABLES").show()
3. 处理权限问题:
-- 授予权限
GRANT SELECT ON TABLE table_name TO USER username;
-- 检查权限
SHOW GRANT USER username ON TABLE table_name;
调试和诊断工具
- Spark Web UI
访问方式:
http://driver-host:4040 # 应用运行时
http://driver-host:18080 # 历史服务器
关键指标:
- Stages页面:查看Stage执行情况和失败原因
- Executors页面:查看Executor资源使用情况
- Storage页面:查看RDD缓存情况
- Environment页面:查看配置参数
- 日志分析
查看日志:
# 查看Driver日志
tail -f /path/to/spark/logs/spark-*-driver-*.log
# 查看Executor日志
tail -f /path/to/spark/logs/spark-*-executor-*.log
# 查看YARN日志
yarn logs -applicationId application_1234567890_0001
关键日志模式:
# 查找错误信息
grep -i "error\|exception\|failed" /path/to/logs/*.log
# 查找内存相关错误
grep -i "outofmemory\|oom" /path/to/logs/*.log
# 查找网络相关错误
grep -i "timeout\|connection" /path/to/logs/*.log
- 性能分析工具
JVM分析:
# 查看JVM堆内存使用
jstat -gc <pid> 1000
# 查看线程状态
jstack <pid>
# 查看内存dump
jmap -dump:format=b,file=heap.hprof <pid>
系统资源监控:
# 查看系统资源使用
top -p <pid>
iostat -x 1
netstat -i
- 调试代码
添加调试信息:
// 添加日志
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark").setLevel(Level.DEBUG)
// 添加监控
spark.sparkContext.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
println(s"Task started: ${taskStart.taskInfo.taskId}")
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
println(s"Task ended: ${taskEnd.taskInfo.taskId}, " +
s"status: ${taskEnd.taskInfo.status}")
}
})
使用Spark Shell调试:
// 启动Spark Shell
spark-shell --master local[*]
// 逐步调试
val rdd = sc.textFile("/path/to/file")
rdd.take(10).foreach(println) // 查看数据
rdd.count() // 检查数据量
预防措施
- 配置优化
基础配置:
# 内存配置
spark.executor.memory=8g
spark.executor.memoryOverhead=2g
spark.driver.memory=4g
spark.driver.memoryOverhead=1g
# 网络配置
spark.network.timeout=800s
spark.executor.heartbeatInterval=60s
# 序列化配置
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired=false
性能配置:
# Shuffle配置
spark.shuffle.file.buffer=32k
spark.shuffle.io.maxRetries=3
spark.shuffle.io.retryWait=60s
# 动态分配
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=20
- 代码最佳实践
内存优化:
// 使用广播变量
val broadcastVar = sc.broadcast(largeData)
// 及时释放缓存
rdd.unpersist()
// 使用checkpoint
rdd.checkpoint()
性能优化:
// 合理设置分区数
val optimalPartitions = Math.max(rdd.partitions.length, 200)
val repartitionedRDD = rdd.repartition(optimalPartitions)
// 使用mapPartitions
val optimizedRDD = rdd.mapPartitions(iter => {
// 批量处理逻辑
iter.map(processItem)
})
- 监控告警
设置监控:
// 添加监控指标
val metrics = spark.sparkContext.getStatusTracker
val stageInfo = metrics.getStageInfo(stageId)
println(s"Stage $stageId: ${stageInfo.numTasks} tasks, " +
s"${stageInfo.numCompletedTasks} completed")
告警配置:
# 设置告警阈值
spark.executor.failures.max=3
spark.task.maxFailures=8
spark.stage.maxAttempts=4
通过以上详细的错误分析和解决方案,可以有效处理Spark任务中的常见问题,提高系统的稳定性和性能。
关键参数与配置模板
JVM相关参数
垃圾回收配置:
# G1GC配置(推荐)
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:MaxGCPauseMillis=200
-XX:+G1PrintRegionRememberedSetInfo
-XX:+UseCompressedOops
-XX:+UseCompressedClassPointers
-XX:+PrintHeapAtGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-Xloggc:/var/log/spark/gc-executor.log"
--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+PrintHeapAtGC
-Xloggc:/var/log/spark/gc-driver.log"
并发GC配置:
# CMS GC配置
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
-XX:+CMSParallelRemarkEnabled
-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSInitiatingOccupancyFraction=70
-XX:+PrintGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps"
内存调试参数:
# 内存调试配置
--conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/spark/heapdump
-XX:+TraceClassLoading
-XX:+PrintStringDeduplication"
生产环境配置:
#!/bin/bash
# 生产环境Spark配置模板
/opt/spark32/bin/spark-shell --master yarn \
--deploy-mode client \
--driver-memory ${driver_memory} \
--executor-memory ${executor_memory} \
--num-executors ${num_executors} \
--conf spark.default.parallelism=${default_parallelism} \
--conf spark.sql.shuffle.partitions=${shuffle_partitions} \
--conf spark.driver.maxResultSize=4g \
--name ${job_name}_${log_date} \
--conf spark.executor.memoryOverhead=2g \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+UseStringDeduplication -XX:InitiatingHeapOccupancyPercent=35 -XX:MaxGCPauseMillis=200" \
--conf spark.executor.heartbeatInterval=60s \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.skewJoin.enabled=true \
--conf spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=512m \
--conf spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 \
--conf spark.sql.parquet.binaryAsString=true \
--conf spark.sql.parquet.compression.codec=snappy \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=128m \
--conf spark.kryoserializer.buffer=128k \
--conf spark.shuffle.consolidateFiles=true \
--conf spark.shuffle.io.preferDirectBufs=true \
--conf spark.shuffle.io.maxRetries=10 \
--conf spark.shuffle.io.retryWait=20s \
--conf spark.shuffle.compress=true \
--conf spark.shuffle.spill.compress=true \
--conf spark.shuffle.file.buffer=4m \
--conf spark.network.timeout=1200s \
--conf spark.reducer.maxSizeInFlight=36m \
--jars ${jar_path} \
--queue default
Spark高频面试题
基础概念题
Q1: 请详细解释RDD、DataFrame和Dataset的区别及各自的应用场景。
标准答案:
Spark提供了三种核心数据抽象:RDD、DataFrame和Dataset,它们各自适用于不同的场景,具有不同的特性和优势。
1. 基本概念对比
- RDD (Resilient Distributed Dataset):Spark最初的数据抽象,是一个不可变的、分布式的对象集合,支持函数式编程操作。
- DataFrame:在RDD基础上引入了Schema概念,类似关系型数据库中的表结构,支持SQL查询。
- Dataset:DataFrame的扩展,提供类型安全的、面向对象的编程接口,结合了RDD的类型安全和DataFrame的优化性能。
2. 核心特性对比
| 特性 | RDD | DataFrame | Dataset |
|---|---|---|---|
| 类型安全 | 编译时类型检查 | 运行时类型检查 | 编译时类型检查 |
| 性能优化 | 无内置优化 | Catalyst优化器 | Catalyst优化器 |
| 内存管理 | Java对象/Kryo序列化 | Tungsten二进制格式 | Tungsten二进制格式 |
| API风格 | 函数式API | SQL + 函数式API | 类型安全的函数式API |
| Schema感知 | 无Schema | 有Schema | 有Schema |
| 使用难度 | 较复杂 | 简单 | 中等 |
| 适用场景 | 非结构化数据处理 | 结构化数据分析 | 结构化数据复杂处理 |
3. 代码示例对比
// RDD示例
val rdd = sc.textFile("data.txt")
.map(line => line.split(","))
.map(fields => Person(fields(0), fields(1).toInt))
.filter(person => person.age > 30)
// DataFrame示例
val df = spark.read.json("people.json")
df.filter($"age" > 30)
.select($"name", $"age")
.groupBy($"age")
.count()
// Dataset示例
case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(p => p.age > 30)
.map(p => (p.name, p.age))
.groupByKey(_._2)
.count()
4. 性能与优化对比
- RDD:依赖于JVM的垃圾回收和Java序列化,性能受限
- DataFrame:
- 使用Catalyst优化器进行逻辑和物理执行计划优化
- 使用Tungsten高效内存管理,减少GC开销
- 支持列式存储和压缩
- Dataset:
- 结合了DataFrame的所有优化
- 增加了编码器(Encoder),在对象和内部Tungsten表示之间高效转换
5. 选择建议
- 选择RDD:当需要细粒度控制或处理非结构化数据时
- 选择DataFrame:处理结构化数据,需要高性能优化,或需要使用SQL查询
- 选择Dataset:需要类型安全和函数式编程,同时又需要Catalyst优化器的性能提升
随着Spark的发展,DataFrame和Dataset API已经成为推荐的数据处理方式,特别是在Spark 2.0之后,DataFrame实际上是Dataset[Row]的类型别名。
Q2: 请详细描述Spark任务的执行流程,从提交应用到任务完成的全过程。
标准答案:
Spark任务执行是一个复杂的分布式计算过程,涉及多个组件协同工作。理解这个流程对于优化Spark应用和排查问题至关重要。
1. 整体执行架构
Spark应用程序的执行涉及以下核心组件:
- Driver Program:包含应用程序的main函数,负责创建SparkContext
- Cluster Manager:负责资源分配(如YARN、Kubernetes、Mesos或Standalone)
- Worker Node:执行计算任务的节点
- Executor:在Worker节点上运行,负责执行具体的计算任务
2. 详细执行流程
graph TD
A[用户程序] --> B[SparkContext]
B --> C[DAGScheduler]
C --> D[TaskScheduler]
D --> E[Worker节点]
E --> F[Executor]
F --> G[Task执行]
G --> H[结果收集]
H --> I[作业完成]
style A fill:#f9f9f9,stroke:#333
style B fill:#d4f1f9,stroke:#05a4d1
style C fill:#ffe6cc,stroke:#d79b00
style D fill:#d5e8d4,stroke:#82b366
style E fill:#e1d5e7,stroke:#9673a6
style F fill:#f8cecc,stroke:#b85450
style G fill:#f8cecc,stroke:#b85450
style H fill:#d4f1f9,stroke:#05a4d1
style I fill:#f9f9f9,stroke:#333
3. 执行步骤详解
应用程序启动:
- 用户提交应用程序
- 创建SparkContext(Spark的入口点)
- SparkContext连接到集群管理器
资源申请:
- SparkContext通过集群管理器申请资源
- 集群管理器在Worker节点上启动Executor进程
DAG构建:
- 用户代码通过RDD转换操作构建DAG(有向无环图)
- 延迟计算:转换操作只构建DAG,不执行计算
作业提交:
- 当遇到Action操作时,触发作业提交
- SparkContext将作业提交给DAGScheduler
Stage划分:
- DAGScheduler将DAG划分为多个Stage
- 划分依据:Shuffle操作(如reduceByKey、join等)
- 每个Stage包含可以流水线执行的一组Task
Task生成与调度:
- 为每个Stage生成TaskSet
- TaskScheduler将TaskSet提交给TaskSetManager
- TaskSetManager负责具体的任务调度和失败重试
Task执行:
- Executor接收并执行Task
- 执行计算并将结果保存在内存或磁盘
- 对于Shuffle操作,将中间结果写入本地磁盘
结果收集:
- 对于需要返回结果的Action操作,Driver收集结果
- 对于写入外部存储的操作,直接写入目标位置
作业完成:
- 所有Task执行完成后,作业结束
- 释放资源或继续执行下一个作业
4. 关键概念解析
- Job:由Action操作触发的一组计算任务
- Stage:Job的子集,由一组可以流水线执行的Task组成
- Task:在单个Executor上执行的最小工作单元,处理一个分区的数据
- Shuffle:数据重分布过程,是Stage划分的边界
5. 示例说明
// 这个简单的Spark程序演示了执行流程
val sc = new SparkContext(conf) // 创建SparkContext
val lines = sc.textFile("data.txt") // 构建RDD,但不执行
val words = lines.flatMap(_.split(" ")) // 继续构建DAG
val wordCounts = words.map((_, 1)).reduceByKey(_ + _) // reduceByKey会导致Shuffle,划分Stage
wordCounts.collect() // Action操作,触发实际计算
在这个例子中:
- 会创建一个包含两个Stage的Job
- 第一个Stage执行textFile、flatMap和map操作
- 第二个Stage执行reduceByKey操作
- collect()触发整个Job的执行
理解Spark任务执行流程有助于编写高效的Spark应用程序,并能更好地进行性能调优和故障排查。
Q3: 请解释Spark的内存管理机制,包括内存分配策略和优化方法。
标准答案:
Spark内存管理是影响Spark应用性能的关键因素之一。Spark通过精细的内存管理机制,在有限的内存资源下实现高效的分布式计算。
1. Spark内存架构
Spark的JVM堆内存主要分为以下几个部分:
graph TD
A[JVM堆内存] --> B[Reserved Memory<br>300MB]
A --> C[User Memory<br>用户代码使用]
A --> D[Spark Memory<br>执行和存储]
D --> E[Storage Memory<br>缓存数据]
D --> F[Execution Memory<br>计算过程]
style A fill:#f9f9f9,stroke:#333
style B fill:#f8cecc,stroke:#b85450
style C fill:#d5e8d4,stroke:#82b366
style D fill:#d4f1f9,stroke:#05a4d1
style E fill:#ffe6cc,stroke:#d79b00
style F fill:#e1d5e7,stroke:#9673a6
2. 内存管理模式
Spark提供两种内存管理模式:
静态内存管理(Static Memory Management):
- Spark 1.6之前的默认模式
- 为存储和执行内存分配固定比例,不能动态调整
- 配置参数:
spark.storage.memoryFraction和spark.shuffle.memoryFraction
统一内存管理(Unified Memory Management):
- Spark 1.6及之后的默认模式
- 存储内存和执行内存共享一个区域,可以动态调整
- 配置参数:
spark.memory.fraction和spark.memory.storageFraction
3. 统一内存管理详解
在统一内存管理模式下:
- Spark Memory:占JVM堆内存的比例由
spark.memory.fraction控制,默认为0.6 - Storage Memory:初始大小由
spark.memory.storageFraction控制,默认为0.5 - Execution Memory:初始大小为Spark Memory减去Storage Memory
内存动态调整机制:
存储内存不足时:
- 如果执行内存有空闲,可以借用执行内存
- 如果执行内存没有空闲,则按LRU策略淘汰已缓存的RDD分区
执行内存不足时:
- 如果存储内存有空闲,可以借用存储内存
- 如果存储内存没有空闲,但存储内存中有部分是被执行内存借用的,则可以抢占这部分内存
- 执行内存不会淘汰存储内存中的数据
4. 内存管理相关配置参数
// 统一内存管理模式关键参数
spark.memory.fraction = 0.6 // Spark Memory占JVM堆内存的比例
spark.memory.storageFraction = 0.5 // Storage Memory初始占比
// 其他重要内存参数
spark.executor.memory = "4g" // Executor的JVM堆内存大小
spark.memory.offHeap.enabled = false // 是否启用堆外内存
spark.memory.offHeap.size = "2g" // 堆外内存大小
5. 堆外内存(Off-Heap Memory)
从Spark 2.0开始,Spark支持使用堆外内存:
- 通过
spark.memory.offHeap.enabled开启 - 使用
spark.memory.offHeap.size设置大小 - 优势:减少GC开销,提高性能
- 缺点:需要手动管理内存,配置复杂
6. 内存管理最佳实践
- 合理设置Executor内存:根据集群节点内存和并发任务数
- 监控内存使用情况:通过Spark UI查看内存使用情况
- 调整内存分配比例:根据应用特点调整存储和执行内存比例
- 使用堆外内存:对于大数据量处理,考虑启用堆外内存
- 避免内存泄漏:注意释放不再使用的RDD,使用
unpersist()方法
7. 内存不足问题排查
当遇到OutOfMemoryError或性能下降时:
- 检查是否有不必要的数据缓存
- 考虑增加分区数,减少每个任务的内存使用
- 调整GC策略,如使用G1GC
- 使用Kryo序列化减少内存占用
- 考虑增加Executor内存或减少每个Executor的核心数
深入理解Spark内存管理机制,对于优化Spark应用性能和解决内存相关问题至关重要。
架构原理题
Q4: 请详细介绍Spark的架构组件及其职责,各组件之间如何协同工作?
标准答案:
Spark是一个分布式计算框架,其架构由多个组件协同工作,共同支撑分布式数据处理能力。深入理解Spark架构组件及其交互方式,对于有效使用Spark和排查问题至关重要。
1. Spark架构总览
graph TB
A[用户应用] --> B[SparkContext]
B --> C[集群管理器<br>YARN/Kubernetes/Mesos/Standalone]
C --> D[Worker节点]
D --> E[Executor]
B --> F[DAGScheduler]
F --> G[TaskScheduler]
G --> E
style A fill:#f9f9f9,stroke:#333
style B fill:#d4f1f9,stroke:#05a4d1
style C fill:#ffe6cc,stroke:#d79b00
style D fill:#d5e8d4,stroke:#82b366
style E fill:#e1d5e7,stroke:#9673a6
style F fill:#f8cecc,stroke:#b85450
style G fill:#f8cecc,stroke:#b85450
2. 核心组件详解
| 组件 | 位置 | 主要职责 | 关键特性 |
|---|---|---|---|
| Driver Program | 客户端或集群中 | 运行应用程序的main函数 创建SparkContext 提交作业 收集结果 | 应用程序的控制中心 包含DAGScheduler和TaskScheduler |
| SparkContext | Driver中 | Spark程序的入口点 连接集群管理器 获取Executor 构建RDD | 每个应用只有一个 负责作业提交和资源申请 |
| SparkSession | Driver中 | Spark 2.0后的入口点 整合SQL、DataFrame、Dataset API | 提供统一的数据访问接口 包含SparkContext |
| Cluster Manager | 独立进程 | 资源分配 启动Executor | 支持多种实现: YARN、Kubernetes、Mesos、Standalone |
| Worker Node | 集群节点 | 提供计算资源 启动Executor进程 | 物理节点或虚拟机 可以运行多个Executor |
| Executor | Worker节点上 | 执行Task 缓存RDD 返回结果 | 每个应用有多个 生命周期与应用相同 |
| DAGScheduler | Driver中 | 构建DAG 划分Stage 生成TaskSet | 基于Shuffle依赖划分Stage 优化执行计划 |
| TaskScheduler | Driver中 | 将Task分发给Executor 监控Task执行 重试失败的Task | 负责具体的任务调度 处理任务失败和重试 |
| BlockManager | Driver和Executor中 | 管理内存和磁盘存储 处理数据块传输 | 负责RDD缓存 管理Shuffle数据 |
3. 组件交互流程
- 应用程序初始化:
val spark = SparkSession.builder().appName("MyApp").getOrCreate()
val sc = spark.sparkContext
资源申请与Executor启动:
- SparkContext连接集群管理器
- 集群管理器在Worker节点上启动Executor进程
- Executor向Driver注册
作业提交与执行:
- DAGScheduler将RDD DAG划分为Stage
- TaskScheduler将TaskSet提交给Executor
- Executor执行Task并返回结果
4. 不同部署模式对组件的影响
| 部署模式 | Driver位置 | 特点 | 适用场景 |
|---|---|---|---|
| Client模式 | 客户端机器 | Driver与客户端在同一进程 便于调试和查看输出 | 开发测试 交互式应用 |
| Cluster模式 | 集群中的Worker节点 | Driver在集群中运行 客户端可以断开连接 | 生产环境 长时间运行的作业 |
5. 各组件的高可用性考虑
- Driver:在YARN或Kubernetes上可以启用AM (ApplicationMaster) 重启
- Worker:节点失败时,其上的Executor会在其他节点重启
- Executor:失败时会重启,正在执行的任务会重试
- Task:失败后会自动重试,最多重试次数可配置
6. 实际应用中的架构选择
- 小规模应用:Standalone模式简单易用
- 企业生产环境:YARN或Kubernetes提供更好的资源隔离和管理
- 混合负载环境:Kubernetes适合与其他工作负载共存
- 传统大数据环境:YARN与Hadoop生态系统集成更好
深入理解Spark架构组件及其交互方式,有助于优化应用性能、排查问题,以及设计适合特定场景的Spark应用架构。
Q5: 请详细解释Spark的Shuffle机制原理及其演进历史,如何优化Shuffle操作?
标准答案:
Shuffle是Spark中最关键也最复杂的机制之一,它涉及到数据的重新分区和跨节点传输,对Spark应用的性能有着重大影响。深入理解Shuffle机制对于优化Spark应用至关重要。
1. Shuffle的基本概念
Shuffle是指将分布在各个分区的数据按照某种规则重新组织,使得具有相同特征(如相同的key)的数据聚集在一起进行计算的过程。在Spark中,Shuffle操作是Stage划分的边界。
触发Shuffle的操作包括:
- 重分区操作:
repartition、coalesce - ByKey类操作:
groupByKey、reduceByKey、aggregateByKey - Join类操作:
join、cogroup - 排序操作:
sortBy、sortByKey
2. Shuffle的演进历史
Spark Shuffle机制经历了多次重大改进:
| Shuffle版本 | Spark版本 | 特点 | 主要问题 |
|---|---|---|---|
| Hash Shuffle V1 | 0.8及之前 | 每个map task输出M×R个文件 (M=map任务数,R=reduce任务数) | 文件数过多,占用文件句柄 |
| Hash Shuffle V2 | 0.8.1 - 1.1 | 每个executor输出C×R个文件 (C=core数,R=reduce任务数) | 文件数仍然较多 |
| Sort Shuffle V1 | 1.1 - 1.5 | 每个map task输出1个文件,按key排序 | 所有数据都排序,开销大 |
| Sort Shuffle V2 (Tungsten) | 1.5 - 2.0 | 二进制序列化,直接操作内存 | 特定场景优化 |
| Sort Shuffle V3 | 2.0+ | 统一的Sort-based Shuffle 小数据量可绕过排序 | 当前默认实现 |
3. Sort-based Shuffle详细工作流程
graph TD
A[Map任务] --> B[内存中按Partitioner分区]
B --> C{是否需要排序?}
C -->|是| D[对每个分区内数据排序]
C -->|否| E[跳过排序]
D --> F[溢写到磁盘]
E --> F
F --> G[合并溢写文件]
G --> H[生成数据文件和索引文件]
H --> I[Reduce任务]
I --> J[通过网络拉取数据]
J --> K[合并数据]
K --> L[进行Reduce计算]
style A fill:#d4f1f9,stroke:#05a4d1
style I fill:#e1d5e7,stroke:#9673a6
style F fill:#ffe6cc,stroke:#d79b00
style G fill:#ffe6cc,stroke:#d79b00
style H fill:#ffe6cc,stroke:#d79b00
style J fill:#d5e8d4,stroke:#82b366
style K fill:#d5e8d4,stroke:#82b366
style L fill:#d5e8d4,stroke:#82b366
4. Map端详解
- 分区计算:根据Partitioner确定每条数据的目标分区
- 内存缓冲:数据先写入内存缓冲区
- 排序与聚合:根据配置决定是否进行排序和聚合
- 溢写机制:
- 当缓冲区达到阈值(
spark.shuffle.spill.numElementsForceSpillThreshold)时触发溢写 - 溢写过程中可能进行排序和聚合
- 当缓冲区达到阈值(
- 文件合并:多个溢写文件最终合并为一个数据文件和一个索引文件
5. Reduce端详解
- 任务初始化:Reduce任务启动时,向DAGScheduler获取上游Shuffle数据的位置信息
- 数据拉取:通过BlockManager从各个Map任务所在节点拉取数据
- 拉取策略:
- 按批次拉取,避免一次性拉取过多数据
- 支持重试机制,处理临时网络故障
- 数据聚合:将拉取的数据进行合并和聚合处理
- 结果计算:对聚合后的数据执行Reduce操作
6. 关键配置参数
// Shuffle行为控制
spark.shuffle.manager = "sort" // Shuffle实现方式,默认sort
spark.shuffle.sort.bypassMergeThreshold = 200 // 小分区数量绕过排序的阈值
// 内存使用控制
spark.shuffle.file.buffer = "32k" // 每个输出流的缓冲大小
spark.shuffle.spill.compress = true // 是否压缩溢写文件
// 网络传输控制
spark.reducer.maxSizeInFlight = "48m" // 每个reduce任务同时拉取的最大数据量
spark.shuffle.io.retryWait = "5s" // 重试等待时间
spark.shuffle.io.maxRetries = 3 // 最大重试次数
7. Shuffle优化策略
减少Shuffle操作:
- 使用
mapPartitions替代map后接reduceByKey - 使用广播变量替代
join
- 使用
调整分区数量:
- 过少:数据倾斜,任务并行度低
- 过多:小文件过多,调度开销大
- 建议:每个分区大小在128MB左右
启用聚合:
- 使用
reduceByKey替代groupByKey - 使用
aggregateByKey进行本地预聚合
- 使用
内存调优:
- 增加Shuffle缓冲区大小减少磁盘I/O
- 调整执行内存比例适应Shuffle需求
序列化优化:
- 使用Kryo序列化减少数据大小
- 注册自定义类提高序列化性能
深入理解Shuffle机制,可以帮助开发者编写更高效的Spark应用,避免常见的性能陷阱,特别是在处理大规模数据时。
性能调优题
Q6: 请详述Spark应用的性能调优策略,从哪些方面可以提升Spark作业的执行效率?
标准答案:
Spark性能调优是一个系统性工作,需要从多个维度进行综合优化。一个高效的Spark应用需要合理的资源配置、优化的代码结构、适当的数据处理策略以及精细的参数调整。
1. 性能调优的整体方法论
性能调优应遵循以下方法论:
- 自上而下:从应用架构到具体参数
- 数据驱动:基于监控指标和性能测试
- 渐进式:从最大瓶颈开始,逐步优化
- 权衡取舍:在资源消耗、执行速度、稳定性之间寻找平衡
2. 资源配置优化
graph TD
A[资源配置优化] --> B[Executor配置]
A --> C[Driver配置]
A --> D[集群资源]
B --> B1[内存大小]
B --> B2[核心数量]
B --> B3[实例数量]
C --> C1[内存大小]
C --> C2[并行度]
D --> D1[节点规格]
D --> D2[资源隔离]
style A fill:#f9f9f9,stroke:#333
style B fill:#d4f1f9,stroke:#05a4d1
style C fill:#ffe6cc,stroke:#d79b00
style D fill:#d5e8d4,stroke:#82b366
Executor配置最佳实践:
- 内存大小:每个Executor 4-8GB内存(过大导致GC延迟)
- 核心数量:每个Executor 4-5个核心(过多导致线程竞争)
- 实例数量:
(集群总核心数 / 每个Executor核心数),预留10%资源
配置示例:
// 10节点集群,每节点16核64GB内存
spark.executor.instances = 30 // (10 * 16) / 5 = 32,预留部分
spark.executor.cores = 5 // 每个Executor 5个核心
spark.executor.memory = "20g" // 每个Executor 20GB内存
spark.driver.memory = "10g" // Driver 10GB内存
3. 并行度优化
并行度是指任务划分的分区数,影响任务的并行执行效率。
并行度设置原则:
- 基准值:集群总核心数的2-3倍
- 数据量:每个分区数据量在128MB左右
- 动态调整:
spark.sql.adaptive.enabled=true
并行度相关配置:
// 静态配置
spark.default.parallelism = 600 // 默认并行度
spark.sql.shuffle.partitions = 600 // SQL操作的并行度
// 动态配置
spark.sql.adaptive.enabled = true // 启用自适应查询执行
spark.sql.adaptive.coalescePartitions.enabled = true // 合并小分区
4. 数据倾斜优化
数据倾斜是指某些分区的数据量远大于其他分区,导致任务执行时间不均衡。
识别数据倾斜:
- Spark UI中观察Stage页面的任务执行时间分布
- 查看Shuffle读写数据量的分布情况
解决方案:
| 倾斜类型 | 解决方案 | 实现方式 |
|---|---|---|
| Join倾斜 | 广播Join | broadcast(smallDF).join(largeDF) |
| Join倾斜 | 拆分热点键 | 对热点键添加随机前缀,扩大Join |
| 聚合倾斜 | 两阶段聚合 | 局部聚合+全局聚合 |
| 聚合倾斜 | 自定义分区 | 实现自定义Partitioner |
| 数据源倾斜 | 预处理 | ETL阶段重新分区 |
代码示例:
// 两阶段聚合示例
val result = rdd
.map(x => (x._1 + "_" + Random.nextInt(10), x._2)) // 加盐
.reduceByKey(_ + _) // 局部聚合
.map(x => (x._1.split("_")(0), x._2)) // 去盐
.reduceByKey(_ + _) // 全局聚合
5. 缓存策略优化
合理的缓存策略可以避免重复计算,提高执行效率。
缓存级别选择:
| 存储级别 | 内存使用 | CPU开销 | 适用场景 |
|---|---|---|---|
| MEMORY_ONLY | 高 | 低 | 默认选择,内存充足 |
| MEMORY_AND_DISK | 中 | 低 | 数据量大于内存 |
| MEMORY_ONLY_SER | 低 | 高 | 内存受限,可接受序列化开销 |
| OFF_HEAP | 低 | 中 | 需要跨应用共享数据 |
缓存使用原则:
- 只缓存重复使用的RDD/DataFrame
- 在Shuffle操作之后、Action操作之前缓存
- 及时使用
unpersist()释放不再使用的缓存
6. Shuffle优化
Shuffle是Spark中最昂贵的操作,优化Shuffle可以显著提升性能。
Shuffle优化策略:
- 减少Shuffle:使用
mapPartitions替代map+reduceByKey - 本地聚合:使用
reduceByKey替代groupByKey - 广播变量:小表广播避免Shuffle
- 参数调整:调整缓冲区大小、压缩算法等
关键参数:
spark.shuffle.file.buffer = "64k" // 增加缓冲区减少磁盘I/O
spark.shuffle.compress = true // 启用压缩
spark.shuffle.io.maxRetries = 6 // 增加重试次数
7. SQL优化
对于Spark SQL应用,可以应用以下优化技术:
查询优化:
- 谓词下推:尽早过滤数据
- 列裁剪:只读取需要的列
- 分区裁剪:只读取需要的分区
- 自动优化:启用AQE、动态分区裁剪等
配置示例:
// 启用自适应查询执行
spark.sql.adaptive.enabled = true
// 启用动态分区裁剪
spark.sql.optimizer.dynamicPartitionPruning.enabled = true
// 启用Join重排序
spark.sql.adaptive.optimizeSkewedJoin = true
8. 序列化优化
序列化影响数据传输和存储效率。
序列化选择:
- Kryo序列化:比Java序列化更高效
- 列式格式:Parquet、ORC等格式更高效
配置示例:
// 启用Kryo序列化
spark.serializer = "org.apache.spark.serializer.KryoSerializer"
// 注册自定义类
spark.kryo.registrator = "com.example.MyRegistrator"
9. 综合性能调优案例
大规模数据Join优化:
// 优化前
val result = largeDF.join(smallDF, Seq("key"))
// 优化后
// 1. 广播小表
val broadcastDF = broadcast(smallDF)
val result = largeDF.join(broadcastDF, Seq("key"))
// 2. 启用AQE和Join优化
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
数据倾斜处理:
// 优化前
val result = rdd.reduceByKey(_ + _)
// 优化后
// 1. 两阶段聚合
val result = rdd
.map(x => ((x._1, Random.nextInt(10)), x._2)) // 加盐
.reduceByKey(_ + _) // 局部聚合
.map(x => (x._1._1, x._2)) // 去盐
.reduceByKey(_ + _) // 全局聚合
性能调优是一个持续的过程,需要结合具体应用场景、数据特点和资源情况,采用适当的优化策略。通过系统性的调优,可以显著提升Spark应用的性能和资源利用率。
Q7: 如何识别和解决Spark中的数据倾斜问题?请给出具体的解决方案和代码示例。
标准答案:
数据倾斜是Spark应用中常见的性能瓶颈,表现为某些分区的数据量远大于其他分区,导致任务执行时间不均衡,整体作业延迟。有效解决数据倾斜问题是Spark性能优化的关键环节。
1. 数据倾斜的识别
在解决数据倾斜前,首先需要准确识别问题:
识别方法:
- Spark UI:观察Stage页面中任务执行时间分布,如有明显"长尾"现象则可能存在倾斜
- Shuffle统计:检查Shuffle读写数据量分布是否均衡
- 数据采样:对可能的倾斜键进行采样分析,确定热点数据
倾斜特征:
Task执行时间分布:
[████████] 12s
[████████] 13s
[████████] 11s
[██████████████████████████████████████] 120s <- 明显的倾斜任务
[████████] 14s
2. 数据倾斜的根本原因
数据倾斜通常由以下原因导致:
graph TD
A[数据倾斜根因] --> B[数据本身分布不均]
A --> C[业务逻辑导致]
A --> D[技术实现问题]
B --> B1[热点键/值]
B --> B2[异常数据]
C --> C1[时间维度聚合]
C --> C2[地域维度聚合]
D --> D1[默认分区器问题]
D --> D2[并行度设置不当]
style A fill:#f9f9f9,stroke:#333
style B fill:#d4f1f9,stroke:#05a4d1
style C fill:#ffe6cc,stroke:#d79b00
style D fill:#d5e8d4,stroke:#82b366
3. 解决方案分类
根据倾斜场景和原因,可以采用不同的解决方案:
| 倾斜场景 | 解决方案 | 适用条件 | 优缺点 |
|---|---|---|---|
| Join倾斜 | 广播Join | 一侧数据集较小(<10GB) | 简单高效,但受内存限制 |
| Join倾斜 | 拆分热点键 | 能识别出热点键 | 针对性强,但实现复杂 |
| Join倾斜 | 随机前缀+扩容 | 热点键较多 | 通用性好,但增加计算量 |
| 聚合倾斜 | 两阶段聚合 | 聚合操作(如reduceByKey) | 效果好,适用面广 |
| 聚合倾斜 | 自定义分区 | 数据分布已知 | 精确控制,但需定制开发 |
| 数据源倾斜 | 预处理重分区 | ETL阶段可控 | 治本方法,但增加前置处理 |
4. 详细解决方案
4.1 Join操作倾斜解决方案
方案一:广播Join
// 优化前
val result = largeDF.join(smallDF, "key")
// 优化后
import org.apache.spark.sql.functions.broadcast
val result = largeDF.join(broadcast(smallDF), "key")
方案二:拆分热点键
// 假设发现"000"是热点键
// 1. 将大表中热点键对应的数据拆分出来
val largeDF_normal = largeDF.filter($"key" =!= "000")
val largeDF_skew = largeDF.filter($"key" === "000")
.withColumn("key_random", concat($"key", lit("_"), rand()*10))
// 2. 将小表对应热点键数据扩容
val smallDF_normal = smallDF.filter($"key" =!= "000")
val smallDF_skew = smallDF.filter($"key" === "000")
.withColumn("key_random",
explode(array((0 until 10).map(i => concat($"key", lit("_"), lit(i))): _*)))
// 3. 分别Join后合并结果
val join1 = largeDF_normal.join(smallDF_normal, "key")
val join2 = largeDF_skew.join(smallDF_skew,
largeDF_skew("key_random") === smallDF_skew("key_random"))
.drop("key_random")
val result = join1.union(join2)
方案三:随机前缀+扩容Join
// 1. 大表添加随机前缀
val largeDF_rand = largeDF.withColumn("prefix", (rand()*10).cast("int"))
.withColumn("key_prefixed", concat(col("prefix").cast("string"), lit("_"), col("key")))
// 2. 小表扩容10倍
val smallDF_expanded = smallDF.withColumn("prefix",
explode(array((0 until 10).map(lit(_)): _*)))
.withColumn("key_prefixed", concat(col("prefix").cast("string"), lit("_"), col("key")))
// 3. 在prefixed key上Join
val joinResult = largeDF_rand.join(smallDF_expanded, "key_prefixed")
.drop("prefix", "key_prefixed")
4.2 聚合操作倾斜解决方案
方案一:两阶段聚合
// 优化前
val result = rdd.reduceByKey(_ + _)
// 优化后
val result = rdd
// 第一阶段:局部聚合,加随机前缀
.map(x => ((x._1, Random.nextInt(100)), x._2))
.reduceByKey(_ + _)
// 第二阶段:全局聚合,去除随机前缀
.map(x => (x._1._1, x._2))
.reduceByKey(_ + _)
方案二:自定义分区器
// 定义自定义分区器
class BalancedPartitioner(partitions: Int) extends Partitioner {
def numPartitions: Int = partitions
def getPartition(key: Any): Int = {
val k = key.toString
// 对热点键特殊处理
if (k == "hot_key_1") {
Math.abs(Random.nextInt() % partitions)
} else {
Math.abs(k.hashCode % partitions)
}
}
}
// 使用自定义分区器
val result = rdd
.partitionBy(new BalancedPartitioner(100))
.reduceByKey(_ + _)
4.3 数据源倾斜解决方案
方案一:预处理过滤异常数据
// 过滤掉可能导致倾斜的异常值
val cleanedDF = rawDF.filter($"key".isNotNull && $"key" =!= "")
方案二:预聚合处理
// 在ETL阶段进行预聚合
val preAggregatedDF = rawDF
.repartition(200, $"date", $"region") // 先按非倾斜维度重分区
.groupBy($"date", $"region", $"user_id") // 低粒度预聚合
.agg(sum($"value").as("value"))
5. 实际案例分析
案例:用户行为数据分析中的数据倾斜
问题描述: 在电商用户行为分析中,需要统计每个商品的点击次数,但某些热门商品的点击量远高于其他商品。
解决方案:
// 原始代码
val clickCounts = userClicks
.groupBy("product_id")
.count()
// 优化后代码
// 1. 数据采样,识别热点商品
val sampleDF = userClicks.sample(0.1)
val hotProducts = sampleDF
.groupBy("product_id")
.count()
.orderBy($"count".desc)
.limit(10)
.collect()
.map(_.getAs[String]("product_id"))
.toSet
val bcHotProducts = spark.sparkContext.broadcast(hotProducts)
// 2. 对热点商品特殊处理
val processedClicks = userClicks.mapPartitions(iter => {
val hotProds = bcHotProducts.value
iter.map(row => {
val productId = row.getAs[String]("product_id")
if (hotProds.contains(productId)) {
// 为热点商品添加随机后缀
Row.fromSeq(row.toSeq :+ (productId + "_" + Random.nextInt(100)))
} else {
// 非热点商品保持不变
Row.fromSeq(row.toSeq :+ productId)
}
})
}, true)
// 3. 使用处理后的键进行聚合
val schema = userClicks.schema.add("balanced_key", StringType)
val balancedDF = spark.createDataFrame(processedClicks, schema)
val result = balancedDF
.groupBy("balanced_key")
.count()
// 去除随机后缀,恢复原始商品ID
.withColumn("product_id",
when($"balanced_key".contains("_"),
split($"balanced_key", "_").getItem(0))
.otherwise($"balanced_key"))
.groupBy("product_id")
.sum("count")
.drop("balanced_key")
6. 预防数据倾斜的最佳实践
- 合理设计键:避免使用可能产生热点的键(如时间戳精确到秒)
- 提前预估:在开发前评估数据分布情况
- 监控机制:建立任务监控,及时发现倾斜问题
- 数据质量:在数据接入阶段处理异常值和空值
- 并行度:设置合理的并行度,避免分区过少
有效解决数据倾斜问题需要结合具体业务场景和数据特点,灵活运用各种技术手段,从根本上优化数据分布,提高Spark作业的执行效率。
实战应用题
Q8: 请介绍Spark SQL的优化技术,如何提高SQL查询性能?
标准答案:
Spark SQL是Spark生态系统中的重要组件,它提供了结构化数据处理能力和SQL查询接口。通过一系列优化技术,可以显著提升SQL查询性能。
1. Spark SQL优化技术概述
Spark SQL优化主要分为以下几个方面:
- Catalyst优化器:基于规则和成本的查询优化
- Tungsten执行引擎:内存管理和代码生成优化
- 参数配置:针对特定场景的参数调整
2. 关键优化配置
// 开启自适应查询执行(AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
// 启用小分区合并
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// 设置合并后的目标分区大小
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
// 广播Join优化
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")
// 启用AQE优化的Join策略
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
// 动态分区裁剪
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
// 启用Join重排序
spark.conf.set("spark.sql.optimizer.joinReorder.enabled", "true")
3. 查询优化实例
// 优化前
val result = spark.sql("""
SELECT c.customer_name, sum(o.order_amount) as total_amount
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date > '2023-01-01'
GROUP BY c.customer_name
""")
// 优化后
// 1. 使用广播Join
val customers = spark.table("customers")
val orders = spark.table("orders").filter($"order_date" > "2023-01-01")
import org.apache.spark.sql.functions.broadcast
val result = orders.join(broadcast(customers), "customer_id")
.groupBy($"customer_name")
.agg(sum($"order_amount").as("total_amount"))
// 2. 启用AQE和其他优化
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
val result = spark.sql("""
SELECT /*+ BROADCAST(c) */
c.customer_name, sum(o.order_amount) as total_amount
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date > '2023-01-01'
GROUP BY c.customer_name
""")
4. 性能优化最佳实践
- 数据格式选择:使用列式存储格式(Parquet、ORC)
- 分区策略:根据查询模式选择合适的分区键
- 预聚合:对常用查询创建物化视图
- 缓存管理:缓存频繁使用的表或查询结果
- SQL Hint:使用查询提示指导优化器
通过综合应用这些优化技术,可以显著提升Spark SQL查询性能,特别是在处理大规模数据集时效果更为明显。
Q9: 当Spark应用出现故障或性能问题时,如何进行排查和解决?
标准答案:
Spark应用故障排查是一项系统性工作,需要从多个维度收集信息,分析根因,并采取相应的解决措施。
1. 故障排查方法论
有效的故障排查需要遵循以下方法论:
- 系统性分析:从应用、集群到资源全面考虑
- 数据驱动:基于日志和监控数据进行分析
- 逐层排除:从外到内或从内到外逐层排查
- 复现验证:尝试复现问题并验证解决方案
2. 排查步骤详解
查看Spark UI
- 分析失败的Stage和Task
- 检查Job执行时间和资源使用情况
- 识别异常的执行模式(如数据倾斜)
检查日志信息
- Driver日志:应用级别错误和异常
- Executor日志:任务执行错误
- Worker/Master日志:集群级别问题
- 系统日志:资源和环境问题
资源监控分析
- CPU使用率:是否存在计算瓶颈
- 内存使用:是否存在OOM或GC问题
- 磁盘I/O:是否存在存储瓶颈
- 网络传输:是否存在网络瓶颈
常见问题诊断
| 问题类型 | 症状 | 诊断方法 | 可能解决方案 |
|---|---|---|---|
| OOM错误 | java.lang.OutOfMemoryError | 检查GC日志,内存使用趋势 | 增加内存,调整分区,优化代码 |
| 数据倾斜 | 少数任务执行时间远长于其他任务 | 查看Stage详情,分析数据分布 | 加盐处理,预聚合,调整分区 |
| 序列化错误 | java.io.NotSerializableException | 检查类的序列化实现 | 实现Serializable接口,使用@transient注解 |
| Shuffle失败 | FetchFailedException | 检查Shuffle写入和读取日志 | 增加内存,调整Shuffle参数 |
| 资源不足 | 任务排队,执行缓慢 | 查看集群资源使用情况 | 增加资源,优化资源分配 |
3. 性能问题排查工具
# 查看Spark应用日志
yarn logs -applicationId application_1234567890_0001
# 使用jstack查看JVM线程状态
jstack <pid> > thread_dump.txt
# 使用jmap查看内存使用
jmap -heap <pid>
# 使用Spark History Server查看历史应用
http://history-server:18080
4. 常见问题解决方案
// 解决OOM问题
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.memory.fraction", "0.8")
spark.conf.set("spark.memory.storageFraction", "0.3")
// 解决Shuffle问题
spark.conf.set("spark.shuffle.file.buffer", "64k")
spark.conf.set("spark.reducer.maxSizeInFlight", "96m")
spark.conf.set("spark.shuffle.io.maxRetries", "10")
// 解决数据倾斜
// 参见数据倾斜解决方案
5. 预防措施
- 监控系统:建立应用和集群监控
- 性能测试:在生产环境前进行压力测试
- 渐进式部署:先小规模测试,再扩大规模
- 容量规划:根据数据增长预估资源需求
通过系统性的故障排查和性能优化,可以提高Spark应用的稳定性和效率,减少生产环境中的问题发生。
深度技术原理题
Q10: 请详细解释Spark的Catalyst优化器的工作原理及其优化规则。
标准答案:
Catalyst优化器是Spark SQL的核心优化引擎,它基于Scala的模式匹配和函数式编程特性构建,为Spark SQL提供了强大的查询优化能力。理解Catalyst的工作原理对于编写高效的Spark SQL应用至关重要。
1. Catalyst优化器架构
Catalyst优化器的核心架构包括以下组件:
- 树节点转换框架:基于Scala模式匹配的树转换机制
- 规则执行引擎:应用优化规则的执行器
- 成本模型:评估不同执行计划性能的模型
- 代码生成引擎:将物理计划转换为高效执行代码
2. 优化流程详解
graph TD
A[SQL/DataFrame API] --> B["抽象语法树(AST)"]
B --> C[未解析逻辑计划]
C --> D[解析逻辑计划]
D --> E[优化逻辑计划]
E --> F[物理计划生成]
F --> G[物理计划优化]
G --> H[选择最优计划]
H --> I[代码生成]
I --> J[执行]
style A fill:#f9f9f9,stroke:#333
style B fill:#d4f1f9,stroke:#05a4d1
style C fill:#d4f1f9,stroke:#05a4d1
style D fill:#ffe6cc,stroke:#d79b00
style E fill:#ffe6cc,stroke:#d79b00
style F fill:#d5e8d4,stroke:#82b366
style G fill:#d5e8d4,stroke:#82b366
style H fill:#d5e8d4,stroke:#82b366
style I fill:#e1d5e7,stroke:#9673a6
style J fill:#f8cecc,stroke:#b85450
3. 优化阶段详细说明
语法分析
- 将SQL语句解析为抽象语法树(AST)
- 使用ANTLR语法解析器处理SQL语法
- 转换DataFrame/Dataset API调用为内部表示
逻辑计划生成与解析
- 将AST转换为未解析逻辑计划
- 通过Catalog解析表名、列名和函数名
- 进行类型推断和类型检查
逻辑计划优化
- 应用基于规则的优化策略
- 优化转换是声明式的,基于模式匹配
- 多轮应用规则直至计划稳定
物理计划生成
- 将逻辑算子转换为物理算子
- 为同一逻辑操作生成多种物理实现
- 如Sort可实现为SortExec或ExternalSortExec
物理计划优化与选择
- 使用基于成本的优化器评估计划
- 考虑数据大小、操作复杂度等因素
- 选择成本最低的执行计划
代码生成
- 使用Janino编译器生成Java字节码
- 将多个操作融合为单个函数
- 减少虚函数调用和解释开销
4. 核心优化规则详解
| 优化规则 | 描述 | 示例 |
|---|---|---|
| 谓词下推 | 将过滤条件尽早应用,减少数据量 | SELECT * FROM t1 JOIN t2 WHERE t1.id > 100 → 先过滤t1.id > 100再Join |
| 列裁剪 | 只读取和处理查询所需的列 | SELECT name FROM (SELECT id, name, age FROM users) → 只读取name列 |
| 常量折叠 | 编译时计算常量表达式 | SELECT id + 5 * 10 FROM t → SELECT id + 50 FROM t |
| Join重排序 | 优化多表Join的顺序 | 小表先Join,减少中间结果 |
| Join选择 | 根据表大小选择Join策略 | 小表使用BroadcastHashJoin,大表使用SortMergeJoin |
| 分区裁剪 | 只读取包含所需数据的分区 | WHERE date='2023-01-01' → 只读取该日期的分区 |
| 聚合优化 | 部分聚合+最终聚合 | 先在每个分区内聚合,再全局聚合 |
5. 代码示例:Catalyst转换过程
// 示例查询
val query = spark.sql("""
SELECT c.name, sum(o.amount) as total
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.date > '2023-01-01'
GROUP BY c.name
HAVING sum(o.amount) > 1000
""")
// 查看逻辑计划
println("Logical Plan:")
query.queryExecution.logical.explain(true)
// 查看优化后的逻辑计划
println("Optimized Logical Plan:")
query.queryExecution.optimizedPlan.explain(true)
// 查看物理计划
println("Physical Plan:")
query.queryExecution.sparkPlan.explain(true)
// 查看执行计划
println("Executed Plan:")
query.queryExecution.executedPlan.explain(true)
6. 自定义优化规则
Catalyst允许开发者扩展优化规则,实现自定义优化:
// 自定义优化规则示例
object MyOptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Filter(condition, child) if canOptimize(condition) =>
// 自定义优化逻辑
OptimizedFilter(optimizeCondition(condition), child)
}
}
// 注册自定义规则
spark.experimental.extraOptimizations = Seq(MyOptimizationRule)
Catalyst优化器是Spark SQL性能优越的关键因素,通过理解其工作原理和优化规则,可以编写更高效的Spark SQL应用,并在必要时通过自定义规则进一步提升性能。
Q11: 请详细介绍Spark内存管理的演进历史,旧版本和新版本的区别是什么?
标准答案:
Spark内存管理机制经历了重要的演进过程,从早期的静态内存管理到现代的统一内存管理,显著提升了内存利用率和应用性能。
1. 内存管理演进的背景
Spark作为内存计算框架,其性能很大程度上取决于内存管理效率。早期版本的内存管理机制存在诸多问题,如内存划分固定、配置复杂、资源利用率低等,这促使Spark团队不断优化内存管理机制。
2. 静态内存管理(Static Memory Management)
静态内存管理是Spark 1.6之前的默认模式,其特点是:
- 固定内存划分:预先为不同用途划分固定比例的内存
- 严格边界:各内存区域之间不能动态调整
- 手动配置:需要用户手动调整多个参数
静态内存划分:
+---------------------------+---------------------------+---------------+
| Storage Memory | Execution Memory | Other Memory |
| spark.storage.memoryFraction | spark.shuffle.memoryFraction | Remainder |
| (默认0.6) | (默认0.2) | (0.2) |
+---------------------------+---------------------------+---------------+
关键参数:
// 静态内存管理关键参数
spark.storage.memoryFraction = 0.6 // 缓存RDD数据的内存比例
spark.storage.unrollFraction = 0.2 // 用于展开RDD的内存比例
spark.shuffle.memoryFraction = 0.2 // Shuffle操作的内存比例
3. 统一内存管理(Unified Memory Management)
统一内存管理是Spark 1.6及之后的默认模式,其特点是:
- 动态内存共享:Storage和Execution内存可以相互借用
- 简化配置:减少配置参数,更易使用
- 自适应调整:根据运行时需求动态分配内存
统一内存划分:
+--------------------+----------------------+---------------+
| Reserved | User Memory | Spark Memory |
| (300MB) | (1-spark.memory.fraction) | spark.memory.fraction |
+--------------------+----------------------+---------------+
|
+----------------------+---------------+
| Storage Memory | Execution Memory |
| spark.memory.storageFraction | Remainder |
+----------------------+---------------+
关键参数:
// 统一内存管理关键参数
spark.memory.fraction = 0.75 // Spark Memory占JVM堆内存的比例
spark.memory.storageFraction = 0.5 // Storage Memory初始占比
4. 两种模式的核心区别
| 特性 | 静态内存管理 | 统一内存管理 |
|---|---|---|
| 内存划分 | 固定比例,不可调整 | 动态共享,可相互借用 |
| 配置复杂度 | 多参数,调优复杂 | 少量参数,简化配置 |
| 内存利用率 | 较低,常有浪费 | 较高,按需分配 |
| 适用场景 | 负载稳定,可预测 | 多样化负载,资源竞争 |
| 溢出处理 | 直接溢出到磁盘 | 先尝试借用,再溢出 |
| 版本支持 | 1.6之前默认 | 1.6及之后默认 |
5. 内存借用机制详解
在统一内存管理模式下:
Storage Memory不足时:
- 如果Execution Memory有空闲,可以借用
- 如果没有空闲,则溢出到磁盘
Execution Memory不足时:
- 如果Storage Memory有空闲,可以借用
- 如果Storage Memory中有部分是被Execution Memory借用的,可以强制收回
- Execution Memory不会淘汰Storage Memory中的数据
6. 代码示例:配置对比
// 静态内存管理配置示例
spark.storage.memoryFraction = 0.6
spark.storage.unrollFraction = 0.2
spark.shuffle.memoryFraction = 0.2
// 统一内存管理配置示例
spark.memory.fraction = 0.75
spark.memory.storageFraction = 0.5
7. 实际应用建议
- 内存密集型计算:增加
spark.memory.fraction,分配更多内存给Spark - 缓存优先场景:增加
spark.memory.storageFraction,提高缓存容量 - Shuffle密集场景:降低
spark.memory.storageFraction,提供更多执行内存 - 监控内存使用:通过Spark UI监控内存使用情况,及时调整参数
Spark内存管理的演进体现了分布式计算框架对资源利用效率的不断追求。统一内存管理机制显著提升了Spark的内存利用率,减少了OOM错误,简化了配置,是Spark性能优化的重要里程碑。
Q12: 请详细对比Spark Streaming和Structured Streaming的区别,并说明各自的适用场景。
标准答案:
Spark提供了两种流处理引擎:Spark Streaming和Structured Streaming,它们在设计理念、API、处理模型等方面存在显著差异。深入理解这些差异对于选择合适的流处理技术至关重要。
1. 基本概念对比
Spark Streaming: Spark 0.7版本引入的第一代流处理引擎,基于RDD模型,采用微批处理架构,将流数据分割成小批次进行处理。
Structured Streaming: Spark 2.0版本引入的第二代流处理引擎,基于Spark SQL引擎,将流数据视为无界表,提供更高级的API和优化。
2. 核心特性对比
| 特性 | Spark Streaming | Structured Streaming | 差异影响 |
|---|---|---|---|
| 处理模型 | 微批处理(DStream) | 连续处理(无界表) | Structured Streaming可实现更低延迟 |
| API抽象 | DStream API | DataFrame/Dataset API | Structured Streaming使用更统一的API |
| 编程模型 | 函数式转换 | 声明式查询 | Structured Streaming代码更简洁 |
| 容错机制 | WAL + Checkpoint | 状态存储 + Checkpoint | 两者都支持容错,但实现方式不同 |
| 延迟 | 秒级 | 毫秒级(连续处理模式) | Structured Streaming可实现更低延迟 |
| 状态管理 | updateStateByKey/mapWithState | 内置状态管理 | Structured Streaming状态管理更强大 |
| 事件时间 | 有限支持 | 原生支持 | Structured Streaming更适合事件时间处理 |
| 水印机制 | 不支持 | 支持 | Structured Streaming能更好处理乱序数据 |
| 输出模式 | 固定模式 | Complete/Append/Update | Structured Streaming提供更灵活的输出选项 |
| 优化器 | 无 | Catalyst优化器 | Structured Streaming查询性能更高 |
| 端到端一致性 | 至少一次 | 精确一次 | Structured Streaming提供更强的一致性保证 |
3. 代码示例对比
Spark Streaming示例:
// 创建StreamingContext
val ssc = new StreamingContext(sparkContext, Seconds(1))
// 从Kafka读取数据
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// 处理数据
val wordCounts = kafkaStream
.map(record => record.value)
.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
// 输出结果
wordCounts.print()
// 启动流处理
ssc.start()
ssc.awaitTermination()
Structured Streaming示例:
// 从Kafka读取数据
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "topic")
.load()
// 处理数据
val wordCounts = kafkaStream
.selectExpr("CAST(value AS STRING)")
.as[String]
.flatMap(_.split(" "))
.groupBy("value")
.count()
// 输出结果
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
4. 高级特性对比
事件时间处理:
- Spark Streaming:需要手动实现,较为复杂
- Structured Streaming:内置支持
// 使用事件时间窗口 val windowedCounts = kafkaStream .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") .as[(String, Timestamp)] .flatMap(record => record._1.split(" ").map((_, record._2))) .groupBy( window($"_2", "10 minutes", "5 minutes"), $"_1" ) .count()
水印机制:
- Spark Streaming:不支持
- Structured Streaming:内置支持
// 添加水印
val windowedCounts = kafkaStream
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"value"
)
.count()
5. 性能与扩展性对比
| 方面 | Spark Streaming | Structured Streaming |
|---|---|---|
| 查询优化 | 无内置优化器 | 使用Catalyst优化器 |
| 内存使用 | 基于RDD | 基于Tungsten |
| 吞吐量 | 高 | 更高(得益于优化) |
| 延迟 | 取决于批次间隔 | 可配置(微批或连续) |
| 扩展性 | 水平扩展 | 水平扩展 |
6. 适用场景建议
选择Spark Streaming的场景:
- 与现有RDD代码集成
- 简单的流处理需求
- 对API稳定性要求高(API较为稳定)
- 需要自定义复杂的转换操作
- 与遗留系统集成
选择Structured Streaming的场景:
- 新项目开发
- 需要低延迟处理
- 需要事件时间处理和水印支持
- 需要强一致性保证
- 复杂的流处理需求(如窗口聚合、会话分析)
- 与Spark SQL生态系统集成
7. 未来发展趋势
Structured Streaming是Spark流处理的未来发展方向,Spark团队将主要精力投入到Structured Streaming的改进和优化中。对于新项目,建议优先考虑Structured Streaming,除非有特殊需求只能使用Spark Streaming。
总体而言,Structured Streaming相比Spark Streaming提供了更高级的API、更强大的功能和更好的性能,是Spark流处理技术的重要进步。
Q13: 请详细解释Spark如何实现Exactly-Once语义,包括原理和实现方式。
标准答案:
在分布式流处理系统中,数据处理语义是一个核心问题,特别是在面对节点故障和网络分区等情况时。Exactly-Once语义是指每条数据被精确处理一次,不多不少,这是最强的一致性保证。Spark通过多层机制实现了Exactly-Once语义。
1. 数据处理语义级别
首先理解三种主要的数据处理语义:
| 语义级别 | 描述 | 实现难度 | 性能影响 |
|---|---|---|---|
| At-most-once | 数据最多处理一次,可能丢失 | 低 | 几乎无影响 |
| At-least-once | 数据至少处理一次,可能重复 | 中 | 轻微影响 |
| Exactly-once | 数据精确处理一次,不丢失不重复 | 高 | 可能显著影响 |
2. Spark实现Exactly-Once的核心机制
Spark通过以下多层机制共同保证Exactly-Once语义:
2.1 数据源端保证
- 可重放的数据源:使用支持偏移量跟踪的数据源(如Kafka)
- 偏移量管理:精确记录和管理已处理数据的偏移量
- 断点续传:从上次处理的位置继续处理
2.2 处理过程保证
- 确定性操作:确保相同输入产生相同输出
- Checkpoint机制:定期保存计算状态和进度
- WAL(预写日志):记录所有状态变更操作
- 幂等性转换:确保重复执行不会产生副作用
2.3 数据输出保证
- 幂等性写入:确保重复写入不会导致数据重复
- 事务性写入:使用支持事务的存储系统
- 两阶段提交:确保数据处理和结果写入的原子性
- 输出提交协议:将输出操作与偏移量提交绑定
3. Structured Streaming中的Exactly-Once实现
Structured Streaming通过以下方式实现Exactly-Once:
graph TD
A[数据源] --> B[偏移量跟踪]
B --> C[微批处理]
C --> D[状态管理]
D --> E[输出模式]
E --> F[事务性Sink]
B -.-> G[WAL/Checkpoint]
D -.-> G
F -.-> G
style A fill:#d4f1f9,stroke:#05a4d1
style B fill:#ffe6cc,stroke:#d79b00
style C fill:#d5e8d4,stroke:#82b366
style D fill:#e1d5e7,stroke:#9673a6
style E fill:#f8cecc,stroke:#b85450
style F fill:#f8cecc,stroke:#b85450
style G fill:#fff2cc,stroke:#d6b656
3.1 详细实现机制
偏移量跟踪:
- 记录每个数据源的读取位置
- 将偏移量与处理结果关联
- 在Checkpoint中保存偏移量信息
状态管理:
- 使用状态存储保存中间状态
- 定期Checkpoint状态到可靠存储
- 故障恢复时重建状态
输出提交协议:
- 将结果写入与偏移量提交绑定
- 确保原子性:要么都成功,要么都失败
- 使用事务性Sink或幂等性写入
4. 代码示例:实现Exactly-Once语义
4.1 使用事务性输出:
// 使用foreachBatch实现事务性输出
val query = inputStream
.writeStream
.foreachBatch { (batchDF, batchId) =>
// 开启事务
val connection = getConnection() // 获取数据库连接
connection.beginTransaction()
try {
// 使用batchId确保幂等性
val outputPath = s"output/batch_$batchId"
// 删除可能存在的旧数据(幂等性保证)
connection.executeUpdate(s"DELETE FROM results WHERE batch_id = $batchId")
// 写入新数据
batchDF.write
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", jdbcUrl)
.option("dbtable", "results")
.option("user", user)
.option("password", password)
.mode("append")
.save()
// 提交事务
connection.commit()
} catch {
case e: Exception =>
// 回滚事务
connection.rollback()
throw e
} finally {
connection.close()
}
}
.option("checkpointLocation", "/checkpoint")
.start()
4.2 使用幂等性写入:
// 使用幂等性写入
val query = inputStream
.writeStream
.foreachBatch { (batchDF, batchId) =>
// 使用批次ID作为唯一标识,确保幂等性
val outputPath = s"output/batch_$batchId"
// 覆盖写入,确保幂等性
batchDF.write
.mode("overwrite") // 覆盖模式确保幂等性
.parquet(outputPath)
// 可选:更新元数据表记录已处理的批次
spark.sql(s"""
MERGE INTO batch_metadata
USING (SELECT $batchId as id) AS source
ON batch_metadata.id = source.id
WHEN MATCHED THEN UPDATE SET processed_time = current_timestamp()
WHEN NOT MATCHED THEN INSERT (id, processed_time) VALUES ($batchId, current_timestamp())
""")
}
.option("checkpointLocation", "/checkpoint")
.start()
5. 不同场景下的Exactly-Once实现
| 场景 | 实现方式 | 关键点 |
|---|---|---|
| Kafka → HDFS | WAL + 幂等性写入 | 使用批次ID作为文件名或路径 |
| Kafka → 数据库 | 两阶段提交 | 将偏移量与数据库事务绑定 |
| Kafka → Kafka | 事务性API | 使用Kafka事务API |
| 有状态操作 | Checkpoint + 状态存储 | 定期Checkpoint状态 |
6. 实现Exactly-Once的最佳实践
- 选择合适的数据源:使用支持重放和偏移量管理的数据源
- 启用Checkpoint:配置可靠的Checkpoint存储
- 使用幂等性操作:确保重复执行不会产生副作用
- 事务性输出:使用支持事务的输出系统
- 错误处理:妥善处理异常,避免部分提交
- 监控与验证:建立监控机制验证Exactly-Once语义
7. 限制与挑战
- 性能开销:实现Exactly-Once通常会带来一定性能开销
- 外部系统限制:依赖外部系统对事务的支持
- 复杂性增加:实现和维护更为复杂
- 调试难度:问题排查更具挑战性
Spark的Exactly-Once语义实现是一个多层次的系统,通过结合多种机制共同保证数据处理的一致性。理解这些机制有助于构建可靠的流处理应用,并在面对故障时保持数据一致性。
故障排查与运维题
Q1: 如何诊断和解决Spark应用中的性能瓶颈和数据倾斜问题?
标准答案:
Spark应用性能问题主要分为资源瓶颈、数据倾斜、代码优化等几个方面,需要系统性地进行诊断和解决。
1. 性能问题诊断方法
Spark UI分析:
- 作业视图:检查整体执行时间、失败任务、重试次数
- Stage视图:分析每个Stage的执行时间,找出耗时最长的Stage
- Task视图:查看任务执行时间分布,识别异常值(数据倾斜的标志)
- Storage视图:检查缓存使用情况和内存压力
- Executors视图:监控资源使用情况、GC活动
日志分析:
// 在代码中添加性能计时器
val startTime = System.currentTimeMillis()
val result = dataFrame.groupBy("key").count()
val endTime = System.currentTimeMillis()
println(s"Operation took ${endTime - startTime} ms")
// 启用SQL执行统计
spark.conf.set("spark.sql.execution.planChangeLog.level", "WARN")
2. 数据倾斜问题解决方案
识别数据倾斜:
// 检查键分布
val keyDistribution = rdd
.map(x => (x._1, 1))
.reduceByKey(_ + _)
.sortBy(_._2, ascending = false)
.take(10)
println("Top 10 keys by frequency:")
keyDistribution.foreach(println)
解决方案:
(1) 预聚合 + 二次聚合:
// 原始代码(可能倾斜)
val result = rdd.reduceByKey(_ + _)
// 优化后(两阶段聚合)
val localAggregated = rdd.mapPartitions(iter => {
iter.toList.groupBy(_._1).mapValues(_.map(_._2).sum).iterator
})
val result = localAggregated.reduceByKey(_ + _)
(2) 加盐处理:
// 对倾斜的Key加盐
val skewedKeys = Set("hot_key1", "hot_key2") // 预先识别的热点键
val saltedRDD = rdd.map { case (key, value) =>
if (skewedKeys.contains(key)) {
val salt = Random.nextInt(10)
((key, salt), value)
} else {
((key, 0), value)
}
}
// 第一次聚合
val firstAgg = saltedRDD.reduceByKey(_ + _)
// 去盐,第二次聚合
val result = firstAgg
.map { case ((key, salt), value) => (key, value) }
.reduceByKey(_ + _)
(3) 广播变量优化Join:
// 原始Join(可能倾斜)
val joinResult = largeRDD.join(smallRDD)
// 优化:广播小RDD
val smallMap = smallRDD.collectAsMap()
val broadcastMap = spark.sparkContext.broadcast(smallMap)
val joinResult = largeRDD.mapPartitions { iter =>
iter.flatMap { case (k, v1) =>
broadcastMap.value.get(k) match {
case Some(v2) => Iterator((k, (v1, v2)))
case None => Iterator.empty
}
}
}
3. 资源配置优化
内存配置:
# 调整内存配置
spark-submit \
--executor-memory 8g \
--executor-cores 4 \
--driver-memory 4g \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.3 \
--conf spark.executor.memoryOverhead=2g \
--conf spark.dynamicAllocation.enabled=true
并行度调整:
// 设置适当的并行度
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.default.parallelism", 200)
// 或者根据数据量动态计算
val optimalPartitions = Math.max(200, totalSizeInGB * 2)
dataFrame.repartition(optimalPartitions)
4. 监控与预防措施
监控指标:
- CPU利用率:保持在70-80%为佳
- 内存使用率:避免频繁GC和OOM
- Shuffle数据量:减少大量数据移动
- 任务执行时间分布:标准差小于平均值的20%为佳
性能调优最佳实践:
- 定期进行性能基准测试
- 建立关键指标监控仪表板
- 实施自动告警机制
- 对关键任务进行性能分析和优化
- 维护性能优化知识库和最佳实践
Q2: 如何构建高可用的Spark生产环境,并实施有效的监控和故障恢复策略?
标准答案:
构建高可用的Spark生产环境需要从架构设计、监控系统、故障恢复和运维流程等多方面入手。
1. 高可用架构设计
Spark HA部署架构:
graph TD
A[Spark应用] --> B[Spark HA]
B --> C1[主Master]
B --> C2[备用Master]
C1 --> D[ZooKeeper集群]
C2 --> D
C1 --> E1[Worker1]
C1 --> E2[Worker2]
C1 --> E3[Worker3]
E1 --> F1[Executor]
E2 --> F2[Executor]
E3 --> F3[Executor]
style A fill:#d4f1f9
style B fill:#d5e8d4
style C1 fill:#ffe6cc
style C2 fill:#ffe6cc
style D fill:#f8cecc
YARN集成配置:
<!-- yarn-site.xml -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
# Spark on YARN提交配置
spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.yarn.maxAppAttempts=4 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=10 \
--conf spark.yarn.executor.failuresValidityInterval=1h
2. 全面监控系统
多层次监控架构:
- 基础设施监控:服务器CPU、内存、磁盘、网络
- Spark应用监控:作业执行、资源使用、GC活动
- 业务指标监控:数据处理量、延迟、错误率
监控工具集成:
# Prometheus配置示例
scrape_configs:
- job_name: 'spark_metrics'
static_configs:
- targets: ['spark-master:7077', 'spark-worker-1:8081', 'spark-worker-2:8081']
metrics_path: /metrics
scheme: http
- job_name: 'spark_applications'
static_configs:
- targets: ['spark-app-exporter:9091']
自定义监控指标:
// 注册自定义Spark监听器
val listener = new SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
val jobDuration = jobEnd.time - jobStart.getOrElse(jobEnd.jobId, jobEnd.time)
metrics.gauge("spark.job.duration").update(jobDuration)
if (jobEnd.jobResult == JobSucceeded) {
metrics.counter("spark.job.success").inc()
} else {
metrics.counter("spark.job.failure").inc()
}
}
}
spark.sparkContext.addSparkListener(listener)
告警策略:
# Alertmanager配置
rules:
- alert: SparkJobFailure
expr: rate(spark_job_failure[5m]) > 0
for: 2m
labels:
severity: critical
annotations:
summary: "Spark作业失败"
description: "{{ $labels.job }} 在过去5分钟内有作业失败"
- alert: SparkExecutorHighMemoryUsage
expr: spark_executor_memory_used_bytes / spark_executor_memory_max_bytes > 0.9
for: 5m
labels:
severity: warning
annotations:
summary: "Executor内存使用率高"
description: "Executor {{ $labels.executor_id }} 内存使用率超过90%"
3. 故障恢复机制
应用级恢复:
// 设置Checkpoint目录
sc.setCheckpointDir("hdfs://namenode:8020/spark/checkpoints")
// 对关键RDD进行Checkpoint
val result = rdd.map(complexTransformation)
.filter(filterCondition)
.checkpoint()
流处理恢复:
// Structured Streaming故障恢复配置
val query = df.writeStream
.format("kafka")
.option("checkpointLocation", "hdfs://namenode:8020/checkpoints/kafka-stream")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.start()
自动重启策略:
# 使用supervisord管理Spark应用
[program:spark-streaming-app]
command=/opt/spark/bin/spark-submit --master yarn --deploy-mode cluster /path/to/app.jar
autostart=true
autorestart=true
startretries=10
startsecs=10
redirect_stderr=true
stdout_logfile=/var/log/spark-app.log
4. 运维最佳实践
部署流程自动化:
# CI/CD流水线示例 (GitLab CI)
stages:
- build
- test
- deploy
build_jar:
stage: build
script:
- sbt clean assembly
artifacts:
paths:
- target/scala-2.12/app.jar
run_tests:
stage: test
script:
- sbt test
deploy_to_prod:
stage: deploy
script:
- ansible-playbook -i inventory/prod deploy-spark-app.yml
environment:
name: production
when: manual
滚动升级策略:
- 使用蓝绿部署或金丝雀发布
- 先升级部分节点验证稳定性
- 设置超时和回滚机制
灾备与数据恢复:
- 跨区域备份关键数据
- 定期演练故障恢复流程
- 维护详细的运维手册和故障处理流程
5. 性能与可靠性平衡
资源隔离:
# YARN资源队列配置
spark-submit \
--queue production_critical \
--conf spark.yarn.am.nodeLabelExpression="production" \
--conf spark.yarn.executor.nodeLabelExpression="production"
优雅降级策略:
// 实现优雅降级逻辑
def processData(data: DataFrame): DataFrame = {
try {
// 完整处理逻辑
data.groupBy("key")
.agg(
sum("value").as("total_value"),
countDistinct("user_id").as("unique_users")
)
} catch {
case e: OutOfMemoryError =>
logger.warn("内存不足,切换到简化处理逻辑", e)
// 简化处理逻辑
data.groupBy("key")
.agg(sum("value").as("total_value"))
}
}
通过以上全面的高可用架构设计、监控系统、故障恢复机制和运维最佳实践,可以构建一个稳定可靠的Spark生产环境,确保关键业务应用的连续性和可靠性。