12. Spark

目录


Spark 概述与环境

Spark简介

Apache Spark 是一个快速、通用的大数据处理引擎,专为大规模数据处理而设计。它提供了高级API(Java、Scala、Python、R),并支持用于SQL查询、流处理、机器学习和图形处理的优化引擎。

Spark特点与优势

核心特点

  • 速度快:内存计算比Hadoop MapReduce快100倍,磁盘计算快10倍
  • 易用性:提供多种语言API,支持80多种高级算子
  • 通用性:支持SQL查询、流处理、机器学习、图计算
  • 兼容性:可运行在Hadoop、Mesos、Kubernetes、standalone等集群上

技术优势

特性SparkHadoop MapReduce
计算模式内存计算 + 磁盘存储磁盘计算
数据共享RDD内存共享磁盘文件系统
迭代计算支持高效迭代效率低
实时处理支持流处理仅批处理
容错机制RDD血统恢复数据副本
开发效率代码简洁代码复杂

Spark vs Hadoop MapReduce

graph TD
    A[数据输入] --> B{计算引擎}
    
    B -->|MapReduce| C[Map阶段]
    C --> D[Shuffle写磁盘]
    D --> E[Reduce阶段]
    E --> F[结果写HDFS]
    
    B -->|Spark| G[RDD转换]
    G --> H[内存计算]
    H --> I[结果输出]
    
    style C fill:#ffcccb
    style D fill:#ffcccb
    style E fill:#ffcccb
    style F fill:#ffcccb
    style G fill:#90EE90
    style H fill:#90EE90
    style I fill:#90EE90

性能对比

  • 内存计算:Spark在内存中缓存数据,避免重复I/O
  • DAG执行:Spark将作业构建为DAG,优化执行计划
  • Pipelining:Spark支持算子流水线,减少中间数据存储
  • 代码生成:Catalyst优化器生成高效的Java代码

Spark应用场景

典型应用领域

场景描述优势
数据ETL大规模数据清洗、转换、加载处理速度快,支持多种数据源
实时流处理实时数据分析、监控告警低延迟,高吞吐量
机器学习大规模机器学习模型训练MLlib生态,迭代计算优势
交互式查询即席查询、数据探索SQL支持,响应速度快
图计算社交网络分析、推荐系统GraphX图处理能力

Spark生态系统

核心组件

Spark核心组件详解表

组件主要功能核心API关键特性适用场景性能特点
Spark Core基础运行时引擎RDD内存计算、懒加载、容错数据处理基础高速内存计算
Spark SQL结构化数据处理DataFrame/DatasetSQL支持、Catalyst优化数据分析、ETL查询优化
Spark Streaming流数据处理DStream微批处理、准实时实时数据处理秒级延迟
Structured Streaming统一流批处理DataFrame端到端一致性、状态管理实时分析毫秒级延迟
MLlib机器学习库ML Pipeline分布式算法、管道API大数据机器学习可扩展性强
GraphX图计算框架Graph/VertexRDD图算法、并行计算社交网络、推荐内存图计算

Spark 核心概念

RDD核心概念

RDD (Resilient Distributed Dataset) 是Spark的核心抽象,代表一个不可变、可分区、可并行计算的数据集合。

RDD特性

graph TD
    A[RDD核心特性] --> B[不可变性<br/>Immutable]
    A --> C[分布式<br/>Distributed]
    A --> D[弹性容错<br/>Resilient]
    A --> E[惰性求值<br/>Lazy Evaluation]
    A --> F[分区计算<br/>Partitioned]
    
    B --> B1[数据一旦创建不可修改]
    C --> C1[数据分布在集群多个节点]
    D --> D1[通过血统信息自动容错]
    E --> E1[Transform操作延迟执行]
    F --> F1[支持并行计算]
    
    style A fill:#e1f5fe
    style B fill:#e8f5e8
    style C fill:#fff3e0
    style D fill:#ffebee
    style E fill:#f3e5f5
    style F fill:#fce4ec

RDD的五大特性

特性描述意义
分区列表RDD由多个分区组成支持并行计算
计算函数每个分区都有计算函数定义数据处理逻辑
依赖关系RDD之间的依赖关系支持容错恢复
分区器Key-Value RDD的分区策略优化数据分布
位置偏好计算分区的最佳位置数据本地性优化

RDD操作分类

Transformation vs Action

graph LR
    A[RDD操作] --> B[Transformation<br/>转换操作]
    A --> C[Action<br/>行动操作]
    
    B --> D[惰性执行<br/>不立即计算]
    B --> E[返回新RDD]
    B --> F[构建计算图]
    
    C --> G[立即执行<br/>触发计算]
    C --> H[返回结果值]
    C --> I[提交作业]
    
    style B fill:#e8f5e8
    style C fill:#ffebee

常用Transformation操作

// 创建RDD
val rdd = sc.parallelize(1 to 100, 4)

// map:一对一转换
val mapRDD = rdd.map(x => x * 2)

// filter:过滤数据
val filterRDD = rdd.filter(x => x % 2 == 0)

// flatMap:一对多转换
val flatMapRDD = rdd.flatMap(x => 1 to x)

// groupByKey:按键分组
val kvRDD = rdd.map(x => (x % 10, x))
val groupedRDD = kvRDD.groupByKey()

// reduceByKey:按键聚合
val reducedRDD = kvRDD.reduceByKey(_ + _)

// join:连接操作
val rdd2 = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val joinedRDD = kvRDD.join(rdd2)

常用Action操作

// collect:收集所有元素到Driver
val result = rdd.collect()

// count:计算元素数量
val cnt = rdd.count()

// first:获取第一个元素
val firstElement = rdd.first()

// take:获取前n个元素
val firstN = rdd.take(10)

// reduce:聚合所有元素
val sum = rdd.reduce(_ + _)

// foreach:遍历每个元素
rdd.foreach(println)

// saveAsTextFile:保存到文件
rdd.saveAsTextFile("hdfs://output/path")

RDD依赖关系

依赖类型

graph TD
    A[RDD依赖关系] --> B[窄依赖<br/>Narrow Dependency]
    A --> C[宽依赖<br/>Wide Dependency]
    
    B --> D[一对一映射<br/>1:1 Mapping]
    B --> E[同一Stage内<br/>Pipeline执行]
    B --> F[局部失败恢复]
    
    C --> G[一对多映射<br/>1:N Mapping]
    C --> H[需要Shuffle<br/>跨Stage执行]
    C --> I[全量重新计算]
    
    style B fill:#e8f5e8
    style C fill:#ffebee

窄依赖示例

// map, filter, union等操作产生窄依赖
val rdd1 = sc.parallelize(1 to 10, 2)
val rdd2 = rdd1.map(_ * 2)        // 窄依赖
val rdd3 = rdd2.filter(_ > 10)    // 窄依赖

宽依赖示例

// groupByKey, reduceByKey, join等操作产生宽依赖
val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")), 2)
val rdd2 = rdd1.groupByKey()      // 宽依赖,需要Shuffle
val rdd3 = rdd1.reduceByKey(_ + _) // 宽依赖,需要Shuffle

DataFrame与Dataset

DataFrame概念

DataFrame 是Spark SQL的核心抽象,是一个以命名列方式组织的分布式数据集,类似于关系数据库中的表。

DataFrame特点

  • 结构化数据:具有明确的Schema定义
  • 优化执行:使用Catalyst优化器
  • 多语言支持:Scala、Java、Python、R
  • 丰富API:SQL风格和函数式API

DataFrame创建

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("DataFrameExample")
  .getOrCreate()

import spark.implicits._

// 方式1:从RDD创建
val rdd = sc.parallelize(Seq(("Alice", 25), ("Bob", 30), ("Charlie", 35)))
val df1 = rdd.toDF("name", "age")

// 方式2:从序列创建
val df2 = Seq(("Alice", 25), ("Bob", 30)).toDF("name", "age")

// 方式3:从外部数据源创建
val df3 = spark.read.json("path/to/file.json")
val df4 = spark.read.parquet("path/to/file.parquet")

// 方式4:通过Schema创建
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val df5 = spark.createDataFrame(rdd, schema)

Dataset概念

Dataset 是DataFrame的扩展,提供了类型安全的面向对象编程接口。

Dataset特点

  • 类型安全:编译时类型检查
  • 优化执行:享受Catalyst优化器
  • 函数式API:支持lambda表达式
  • 编码器支持:自动序列化/反序列化

Dataset创建

// 定义样例类
case class Person(name: String, age: Int, city: String)

// 方式1:从序列创建
val ds1 = Seq(
  Person("Alice", 25, "Beijing"),
  Person("Bob", 30, "Shanghai")
).toDS()

// 方式2:从DataFrame转换
val ds2 = df.as[Person]

// 方式3:从外部数据源创建
val ds3 = spark.read.json("path/to/file.json").as[Person]

三者对比分析

RDD vs DataFrame vs Dataset 全面对比

特性RDDDataFrameDataset
数据抽象分布式对象集合结构化数据表类型安全的数据表
编译时检查❌ 运行时错误❌ 运行时错误✅ 编译时错误
执行优化❌ 无优化✅ Catalyst优化✅ Catalyst优化
代码生成❌ 无✅ 有✅ 有
序列化Java/Kryo序列化Tungsten二进制格式Tungsten二进制格式
API风格函数式SQL + 函数式类型安全函数式
性能
易用性复杂简单中等
适用场景低级操作、非结构化数据SQL查询、结构化数据类型安全要求高

性能对比

// 性能测试示例
import org.apache.spark.sql.functions._

// RDD方式 - 性能较低
val rddResult = rdd.filter(_.age > 25)
  .map(p => (p.city, 1))
  .reduceByKey(_ + _)
  .collect()

// DataFrame方式 - 性能优化
val dfResult = df.filter($"age" > 25)
  .groupBy("city")
  .count()
  .collect()

// Dataset方式 - 类型安全 + 性能优化
val dsResult = ds.filter(_.age > 25)
  .groupByKey(_.city)
  .count()
  .collect()

选择建议

graph TD
    A[选择数据抽象] --> B{数据类型}
    B -->|非结构化| C[使用RDD]
    B -->|结构化| D{类型安全要求}
    D -->|不需要| E[使用DataFrame]
    D -->|需要| F[使用Dataset]
    
    C --> G[复杂数据处理<br/>底层控制]
    E --> H[SQL查询<br/>高性能要求]
    F --> I[类型安全<br/>编译时检查]
    
    style C fill:#ffebee
    style E fill:#e8f5e8
    style F fill:#e1f5fe

分区机制

分区策略

分区的重要性

  • 并行度控制:分区数决定任务并行度
  • 数据本地性:减少网络传输
  • 负载均衡:避免数据倾斜
  • 资源利用:充分利用集群资源

分区器类型

分区器适用数据类型分区策略使用场景
HashPartitionerKey-Value RDDHash(key) % numPartitions均匀分布的键
RangePartitioner可排序的Key-Value RDD按键值范围分区有序数据查询
自定义分区器任意类型用户定义逻辑特殊业务需求

分区操作示例

// 创建带分区的RDD
val rdd = sc.parallelize(1 to 100, 4)  // 4个分区

// 查看分区信息
println(s"分区数: ${rdd.getNumPartitions}")
println(s"分区内容: ${rdd.glom().collect().map(_.toList).toList}")

// 重新分区
val repartitionedRDD = rdd.repartition(8)  // 增加分区数
val coalescedRDD = rdd.coalesce(2)         // 减少分区数

// Key-Value RDD分区
val kvRDD = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")), 2)

// 使用HashPartitioner
val hashPartitioned = kvRDD.partitionBy(new HashPartitioner(3))

// 使用RangePartitioner
val rangePartitioned = kvRDD.partitionBy(new RangePartitioner(3, kvRDD))

自定义分区器

import org.apache.spark.Partitioner

// 自定义分区器:按用户ID的地区分区
class RegionPartitioner(regions: Array[String]) extends Partitioner {
  
  override def numPartitions: Int = regions.length
  
  override def getPartition(key: Any): Int = {
    val userId = key.asInstanceOf[String]
    val region = getUserRegion(userId)
    math.abs(regions.indexOf(region)) % numPartitions
  }
  
  private def getUserRegion(userId: String): String = {
    // 根据用户ID确定地区的业务逻辑
    userId.substring(0, 2) match {
      case "01" | "02" => "North"
      case "03" | "04" => "South"
      case "05" | "06" => "East"
      case _ => "West"
    }
  }
}

// 使用自定义分区器
val regions = Array("North", "South", "East", "West")
val customPartitioner = new RegionPartitioner(regions)
val customPartitioned = kvRDD.partitionBy(customPartitioner)

分区调优

分区数优化

// 分区数设置原则
val totalCores = 16  // 集群总核心数
val optimalPartitions = totalCores * 2  // 推荐分区数为核心数的2-3倍

// 动态调整分区数
def getOptimalPartitions(dataSize: Long): Int = {
  val targetPartitionSize = 128 * 1024 * 1024  // 128MB per partition
  math.max(1, (dataSize / targetPartitionSize).toInt)
}

// 分区倾斜检测
def detectPartitionSkew(rdd: RDD[_]): Unit = {
  val partitionSizes = rdd.mapPartitionsWithIndex { (index, iter) =>
    Iterator((index, iter.size))
  }.collect()
  
  val avgSize = partitionSizes.map(_._2).sum / partitionSizes.length
  val maxSize = partitionSizes.map(_._2).max
  val skewRatio = maxSize.toDouble / avgSize
  
  if (skewRatio > 2.0) {
    println(s"警告:检测到分区倾斜,倾斜比例: $skewRatio")
    partitionSizes.foreach { case (index, size) =>
      println(s"分区 $index: $size 条记录")
    }
  }
}

分区优化策略

  1. 预分区策略
// 根据数据特征预分区
val userRDD = sc.textFile("hdfs://users/*")
  .map(parseUser)
  .partitionBy(new HashPartitioner(numPartitions))
  .cache()  // 缓存预分区的数据
  1. Coalesce vs Repartition
// Coalesce:减少分区,避免全量Shuffle
val reducedRDD = largeRDD.coalesce(10)

// Repartition:重新分区,会进行全量Shuffle
val reshuffledRDD = largeRDD.repartition(20)

// 条件分区调整
def smartRepartition[T](rdd: RDD[T], targetPartitions: Int): RDD[T] = {
  val currentPartitions = rdd.getNumPartitions
  if (targetPartitions < currentPartitions) {
    rdd.coalesce(targetPartitions)
  } else {
    rdd.repartition(targetPartitions)
  }
}
  1. 分区保持策略
// 使用mapPartitions保持分区结构
val optimizedRDD = rdd.mapPartitions { iter =>
  // 分区内处理逻辑
  iter.map(processRecord)
}

// 避免破坏分区的操作
val goodRDD = partitionedRDD.mapValues(_ * 2)  // 保持分区
val badRDD = partitionedRDD.map(x => (x._1, x._2 * 2))  // 可能破坏分区

Spark 架构与原理

整体架构设计

系统架构总览

Spark分布式系统架构是基于**主从模式(Master-Slave)的分布式计算框架,采用驱动器-执行器(Driver-Executor)**的执行模型。

Spark系统架构层次图

graph TB
    subgraph "应用层 Application Layer"
        A1[Spark应用程序]
        A2[用户代码 User Code]
    end
    
    subgraph "接口层 API Layer"
        B1[Spark Core RDD API]
        B2[Spark SQL DataFrame/Dataset]
        B3[Spark Streaming DStream]
        B4[MLlib Pipeline]
        B5[GraphX Graph]
    end
    
    subgraph "引擎层 Engine Layer"
        C1[Catalyst优化器]
        C2[DAG调度器]
        C3[任务调度器]
        C4[内存管理]
        C5[Shuffle管理]
    end
    
    subgraph "运行时层 Runtime Layer"
        D1[Driver进程]
        D2[Executor进程]
        D3[BlockManager]
        D4[RPC通信]
    end
    
    subgraph "资源层 Resource Layer"
        E1[YARN集群]
        E2[Kubernetes]
        E3[Mesos集群]
        E4[Standalone集群]
    end
    
    subgraph "存储层 Storage Layer"
        F1[HDFS]
        F2[HBase]
        F3[S3/OSS]
        F4[本地存储]
    end
    
    A1 --> B1
    A1 --> B2
    A1 --> B3
    A1 --> B4
    A1 --> B5
    
    B2 --> C1
    B1 --> C2
    B1 --> C3
    B1 --> C4
    B1 --> C5
    
    C2 --> D1
    C3 --> D2
    C4 --> D3
    C5 --> D4
    
    D1 --> E1
    D1 --> E2
    D1 --> E3
    D1 --> E4
    
    D2 --> F1
    D2 --> F2
    D2 --> F3
    D2 --> F4
    
    style A1 fill:#e1f5fe
    style C1 fill:#fff3e0
    style D1 fill:#e8f5e8
    style E1 fill:#fce4ec

架构分层说明

层次功能职责核心组件技术特点对外接口
应用层业务逻辑实现用户程序高级抽象API编程接口
接口层统一编程模型RDD/DataFrame/DStream声明式编程多语言API
引擎层查询优化执行Catalyst/DAGScheduler自动优化内部接口
运行时层分布式执行Driver/Executor容错机制RPC通信
资源层资源管理调度集群管理器弹性伸缩资源API
存储层数据持久化分布式存储高可靠性存储协议

Spark完整架构交互图 - 已在之前的章节中定义,展示了各组件间的详细交互关系。

核心组件协作流程

sequenceDiagram
    participant User as 用户应用
    participant SC as SparkContext
    participant CM as 集群管理器
    participant Driver as Driver
    participant Executor as Executor
    
    Note over User, Executor: 应用程序生命周期
    User->>SC: 1. 创建SparkContext
    SC->>CM: 2. 申请集群资源
    CM->>Executor: 3. 启动Executor进程
    Executor->>Driver: 4. 注册到Driver
    
    Note over User, Executor: 作业执行过程
    User->>SC: 5. 提交Action操作
    SC->>Driver: 6. 构建DAG图
    Driver->>Driver: 7. 划分Stage
    Driver->>Executor: 8. 分发Task
    Executor->>Executor: 9. 执行计算
    Executor->>Driver: 10. 返回结果
    Driver->>User: 11. 汇总结果
    
    Note over User, Executor: 资源清理
    User->>SC: 12. 停止应用
    SC->>Executor: 13. 停止Executor
    SC->>CM: 14. 释放资源

部署架构模式

Spark支持多种部署模式,每种模式适用于不同的场景和需求:

部署模式对比分析

部署模式架构特点Driver位置适用场景优势劣势
本地模式单机运行本地进程开发测试简单易用无分布式能力
Standalone ClientSpark原生客户端交互式开发实时反馈网络依赖强
Standalone ClusterSpark原生集群节点生产批处理高可用资源隔离弱
YARN ClientHadoop集成客户端数据探索Hadoop兼容客户端负载重
YARN ClusterHadoop集成YARN容器企业生产资源隔离好调试困难
Kubernetes容器化Pod云原生弹性伸缩复杂度高
Mesos通用调度Mesos框架多框架共享细粒度资源配置复杂

典型部署架构示例

graph TB
    subgraph "YARN Cluster部署"
        Y1[ResourceManager]
        Y2[NodeManager 1]
        Y3[NodeManager 2]
        Y4[ApplicationMaster<br/>Driver]
        Y5[Container<br/>Executor]
        Y6[Container<br/>Executor]
        
        Y1 --> Y2
        Y1 --> Y3
        Y2 --> Y4
        Y2 --> Y5
        Y3 --> Y6
        Y4 -.->|调度| Y5
        Y4 -.->|调度| Y6
    end
    
    subgraph "Kubernetes部署"
        K1[Master Node]
        K2[Worker Node 1]
        K3[Worker Node 2]
        K4[Driver Pod]
        K5[Executor Pod]
        K6[Executor Pod]
        
        K1 --> K2
        
        K1 --> K3
        K2 --> K4
        K2 --> K5
        K3 --> K6
        K4 -.->|调度| K5
        K4 -.->|调度| K6
    end
    
    subgraph "Standalone部署"
        S1[Spark Master]
        S2[Spark Worker 1]
        S3[Spark Worker 2]
        S4[Driver]
        S5[Executor]
        S6[Executor]
        
        S1 --> S2
        S1 --> S3
        S2 --> S4
        S2 --> S5
        S3 --> S6
        S4 -.->|调度| S5
        S4 -.->|调度| S6
    end
    
    style Y1 fill:#e1f5fe
    style K1 fill:#fff3e0
    style S1 fill:#e8f5e8

核心组件原理

SparkContext - 应用程序入口

SparkContext 是Spark应用程序的入口点,负责与集群建立连接。

// SparkContext核心功能
class SparkContext(config: SparkConf) extends Logging {
  
  // 1. 初始化核心组件
  private val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus, numCores, mockOutputCommitCoordinator)
  private val statusTracker = new SparkStatusTracker(this, sparkUI)
  private val taskScheduler = createTaskScheduler(this, master, deployMode)
  private val dagScheduler = new DAGScheduler(this)
  
  // 2. 创建RDD
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
  
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
      .map(pair => pair._2.toString)
  }
  
  // 3. 提交作业
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    dagScheduler.runJob(rdd, func, partitions, callSite, resultHandler, localProperties.get)
  }
  
  // 4. 资源管理
  def stop(): Unit = {
    dagScheduler.stop()
    taskScheduler.stop()
    env.stop()
  }
}

Driver Program - 驱动程序

Driver Program 是Spark应用程序的控制中心,运行应用程序的main函数,负责:

  • 创建SparkContext:初始化Spark应用程序
  • 构建逻辑计划:将用户程序转换为DAG
  • 任务调度:将DAG分解为Stage和Task
  • 结果收集:收集Executor返回的结果
// Driver程序示例
object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. 创建SparkContext
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    
    try {
      // 2. 创建RDD并定义转换操作
      val lines = sc.textFile(args(0))
      val words = lines.flatMap(_.split("\\s+"))
      val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
      
      // 3. 触发Action,提交作业
      wordCounts.saveAsTextFile(args(1))
      
    } finally {
      // 4. 停止SparkContext
      sc.stop()
    }
  }
}

Cluster Manager - 集群管理器

集群管理器是Spark与底层资源管理系统的接口层,负责资源的申请、分配和监控。

集群管理器详细对比

集群管理器部署复杂度资源隔离高可用性动态分配生态集成适用场景
Standalone简单应用级支持支持Spark专用开发测试、专用集群
YARN中等队列级支持Hadoop生态企业级大数据平台
Mesos复杂应用级支持多框架多租户共享环境
Kubernetes中等命名空间支持云原生容器化微服务环境

部署模式对比

部署模式Driver位置网络要求交互性故障恢复资源使用使用场景
Client模式客户端稳定网络连接Driver故障导致应用失败客户端资源交互式开发、调试
Cluster模式集群节点无特殊要求集群自动恢复集群资源生产环境、批处理

YARN模式详解

// YARN Client模式
spark-submit \
  --master yarn \
  --deploy-mode client \
  --num-executors 10 \
  --executor-memory 4g \
  --executor-cores 2 \
  --class com.example.MyApp \
  myapp.jar

// YARN Cluster模式  
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-memory 4g \
  --executor-cores 2 \
  --class com.example.MyApp \
  myapp.jar

Executor - 任务执行器

Executor 是运行在Worker节点上的JVM进程,是Spark任务的实际执行单元,负责:

  • 任务执行:运行Driver分发的Task
  • 数据存储:管理缓存和持久化数据
  • 结果返回:将计算结果回传给Driver
  • 资源管理:管理分配的CPU核心和内存
// Executor核心组件
class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {

  // 1. 线程池管理
  private val threadPool = ThreadUtils.newDaemonCachedThreadPool(
    "Executor task launch worker", sparkConf.get(EXECUTOR_CORES), 60)
  
  // 2. 内存管理
  private val memoryManager = env.memoryManager
  
  // 3. 存储管理
  private val blockManager = env.blockManager
  
  // 4. 任务执行
  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val tr = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, tr)
    threadPool.execute(tr)
  }
  
  // 5. 任务运行器
  class TaskRunner(
      execBackend: ExecutorBackend,
      private val taskDescription: TaskDescription)
    extends Runnable {
    
    override def run(): Unit = {
      try {
        // 反序列化任务
        val task = ser.deserialize[Task[Any]](taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
        
        // 执行任务
        val value = task.run(
          taskAttemptId = taskDescription.taskId,
          attemptNumber = taskDescription.attemptNumber,
          metricsSystem = env.metricsSystem)
        
        // 返回结果
        execBackend.statusUpdate(taskDescription.taskId, TaskState.FINISHED, ser.serialize(value))
        
      } catch {
        case e: Exception =>
          execBackend.statusUpdate(taskDescription.taskId, TaskState.FAILED, ser.serialize(TaskFailedReason))
      }
    }
  }
}

任务调度机制

调度框架总览

Spark完整架构交互图

graph TB
    subgraph "客户端"
        A1[用户应用程序]
        A2[spark-submit]
    end
    
    subgraph "Driver节点"
        B1[SparkContext]
        B2[DAGScheduler] 
        B3[TaskScheduler]
        B4[SchedulerBackend]
        B5[BlockManagerMaster]
        B6[SparkUI]
    end
    
    subgraph "集群管理器"
        C1[YARN ResourceManager]
        C2[Application Master]
        C3[NodeManager]
    end
    
    subgraph "Worker节点1"
        D1[Executor 1]
        D2[Task执行]
        D3[BlockManager]
        D4[MemoryStore]
        D5[DiskStore]
        D6[ShuffleManager]
    end
    
    subgraph "Worker节点2"
        E1[Executor 2] 
        E2[Task执行]
        E3[BlockManager]
        E4[MemoryStore]
        E5[DiskStore]
        E6[ShuffleManager]
    end
    
    %% 交互关系
    A1 --> A2
    A2 --> B1
    B1 --> B2
    B2 --> B3
    B3 --> B4
    B1 --> B5
    B1 --> B6
    
    B4 --> C1
    C1 --> C2
    C2 --> C3
    C3 --> D1
    C3 --> E1
    
    D1 --> D2
    D1 --> D3
    D3 --> D4
    D3 --> D5
    D1 --> D6
    
    E1 --> E2
    E1 --> E3
    E3 --> E4
    E3 --> E5
    E1 --> E6
    
    %% 跨节点通信
    B5 -.->|元数据管理| D3
    B5 -.->|元数据管理| E3
    D3 -.->|数据传输| E3
    D6 -.->|Shuffle数据| E6
    
    %% 样式设置
    style A1 fill:#e1f5fe
    style B1 fill:#fff3e0
    style C1 fill:#ffebee
    style D1 fill:#e8f5e8
    style E1 fill:#e8f5e8

组件职责与交互关系

组件层级核心组件主要职责交互对象通信方式
客户端SparkSubmit应用提交与配置YARN RMHTTP/RPC
DriverSparkContext应用程序入口所有组件RPC调用
DriverDAGSchedulerStage划分与调度TaskScheduler同步调用
DriverTaskSchedulerTask分发与管理SchedulerBackend事件驱动
DriverSchedulerBackend集群通信接口集群管理器RPC/HTTP
DriverBlockManagerMaster存储元数据管理BlockManagerRPC通信
集群ResourceManager资源分配管理ApplicationMasterRPC通信
WorkerExecutor任务执行引擎Driver异步RPC
WorkerBlockManager数据存储管理BlockManagerMasterRPC通信
WorkerShuffleManagerShuffle数据管理其他Executor网络传输

数据流转路径分析

sequenceDiagram
    participant User as 用户程序
    participant Driver as Driver
    participant CM as 集群管理器
    participant Executor as Executor
    
    Note over User, Executor: 1. 应用程序启动阶段
    User->>Driver: 提交Spark应用
    Driver->>CM: 申请资源
    CM->>Executor: 启动Executor进程
    Executor->>Driver: 注册到Driver
    
    Note over User, Executor: 2. 作业执行阶段  
    User->>Driver: 调用Action操作
    Driver->>Driver: 构建DAG,划分Stage
    Driver->>Executor: 分发Task
    Executor->>Executor: 执行Task
    Executor->>Driver: 返回结果
    
    Note over User, Executor: 3. Shuffle数据交换
    Executor->>Executor: Shuffle Write
    Executor->>Executor: Shuffle Read
    
    Note over User, Executor: 4. 应用程序结束
    Driver->>Executor: 停止Executor
    Driver->>CM: 释放资源
    Driver->>User: 返回最终结果

DAG调度器原理

DAGScheduler 是Spark作业调度的核心组件,负责将用户提交的RDD DAG分解为多个Stage,并按照依赖关系顺序提交给TaskScheduler执行。它是连接高层RDD操作和底层任务执行的关键桥梁。

主要功能

  • DAG构建与分析:将RDD的转换操作构建成有向无环图
  • Stage划分:根据宽依赖边界将DAG划分为多个Stage
  • 任务调度:按照Stage依赖关系进行拓扑排序和调度
  • 容错处理:处理任务失败、Stage重试等容错逻辑
  • 资源管理:与TaskScheduler协调进行资源分配
graph TD
    A[用户调用Action] --> B[SparkContext.runJob]
    B --> C[DAGScheduler.runJob]
    C --> D[创建ActiveJob]
    D --> E[提交JobSubmitted事件]
    E --> F[handleJobSubmitted]
    F --> G[创建ResultStage]
    G --> H[getOrCreateParentStages]
    H --> I[递归分析RDD依赖]
    I --> J{依赖类型判断}
    J -->|窄依赖| K[继续向上遍历]
    J -->|宽依赖| L[创建ShuffleMapStage]
    K --> I
    L --> M[submitStage]
    M --> N[getMissingParentStages]
    N --> O{父Stage是否完成}
    O -->|未完成| P[递归提交父Stage]
    O -->|已完成| Q[submitMissingTasks]
    P --> M
    Q --> R[创建TaskSet]
    R --> S[TaskScheduler.submitTasks]
    S --> T[任务分发与执行]
    T --> U[Stage完成]
    U --> V[检查后续Stage]
    V --> W[Job完成]
    
    style A fill:#e1f5fe
    style G fill:#fff3e0
    style L fill:#ffebee
    style W fill:#c8e6c9

DAGScheduler架构组件

组件类名主要职责关键方法
DAGSchedulerDAGScheduler作业调度和Stage划分submitJob, submitStage
EventLoopDAGSchedulerEventProcessLoop事件处理循环post, onReceive
StageStage, ResultStage, ShuffleMapStageStage抽象findMissingPartitions
JobActiveJob作业抽象numFinished, numPartitions

事件处理流程图

graph TD
    A[DAGScheduler事件] --> B[EventProcessLoop]
    B --> C{事件类型}
    C -->|JobSubmitted| D[handleJobSubmitted]
    C -->|StageCompleted| E[handleStageCompleted]
    C -->|TaskCompleted| F[handleTaskCompleted]
    C -->|TaskFailed| G[handleTaskFailed]
    C -->|ExecutorLost| H[handleExecutorLost]
    
    D --> I[创建Stage]
    E --> J[检查后续Stage]
    F --> K[更新Stage状态]
    G --> L[重试或失败处理]
    H --> M[重新提交受影响Stage]
    
    I --> N[提交Stage]
    J --> N
    K --> O[Stage完成检查]
    L --> P[容错处理]
    M --> N
    
    style B fill:#e1f5fe
    style N fill:#e8f5e8
    style P fill:#ffebee
// DAGScheduler事件类型定义
sealed trait DAGSchedulerEvent

case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) extends DAGSchedulerEvent

case class StageCompleted(stage: Stage) extends DAGSchedulerEvent
case class TaskCompleted(task: Task[_], reason: TaskEndReason) extends DAGSchedulerEvent
case class TaskFailed(taskId: Long, reason: TaskFailedReason) extends DAGSchedulerEvent
case class ExecutorLost(execId: String, reason: ExecutorLossReason) extends DAGSchedulerEvent

// 事件处理循环实现
class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") {
  
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = dagScheduler.metricsSource.messageProcessingTimer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }
  
  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
      
    case StageCompleted(stage) =>
      dagScheduler.handleStageCompleted(stage)
      
    case TaskCompleted(task, reason) =>
      dagScheduler.handleTaskCompleted(task, reason)
      
    case TaskFailed(taskId, reason) =>
      dagScheduler.handleTaskFailed(taskId, reason)
      
    case ExecutorLost(execId, reason) =>
      dagScheduler.handleExecutorLost(execId, reason)
  }
}

DAGScheduler主要源码

// DAGScheduler.scala - 核心调度逻辑
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  // 事件处理循环
  private val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
  
  // Stage和Job管理
  private val stageIdToStage = new HashMap[Int, Stage]
  private val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
  private val jobIdToActiveJob = new HashMap[Int, ActiveJob]
  private val activeJobs = new HashSet[ActiveJob]
  
  // 等待和运行中的Stage
  private val waitingStages = new HashSet[Stage]
  private val runningStages = new HashSet[Stage]
  private val failedStages = new HashSet[Stage]
  
  // 1. 提交作业的核心方法
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter, Duration.Inf)
    
    waiter.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }
  
  // 2. 提交作业
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    
    // 检查分区有效性
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }
    
    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // 空分区直接返回
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }
    
    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    
    // 提交JobSubmitted事件
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter, properties))
    waiter
  }
  
  // 3. 处理作业提交
  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties): Unit = {
    
    var finalStage: ResultStage = null
    try {
      // 创建ResultStage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    
    // 创建ActiveJob
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    
    val stageIds = jobIdToStageIds(jobId)
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    
    // 提交Stage
    submitStage(finalStage)
  }
  
  // 4. Stage划分核心算法
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    val parents = new ArrayBuffer[Stage]()
    val visited = new HashSet[RDD[_]]
    
    def visit(r: RDD[_]): Unit = {
      if (!visited(r)) {
        visited += r
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              // 宽依赖,创建新的ShuffleMapStage
              parents += getOrCreateShuffleMapStage(shufDep, firstJobId)
            case _ =>
              // 窄依赖,递归访问父RDD
              visit(dep.rdd)
          }
        }
      }
    }
    
    visit(rdd)
    parents.toList
}
  
  // 5. 创建或获取ShuffleMapStage
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage
        
      case None =>
        // 递归创建父Stage
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }
  
  // 6. 提交Stage
  private def submitStage(stage: Stage): Unit = {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }
  
  // 7. 提交缺失的任务
  private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
    logDebug("submitMissingTasks(" + stage + ")")
    
    // 获取缺失的分区
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    
    // 添加到运行中的Stage
    runningStages += stage
    
    // 创建任务
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, serializedTaskMetrics)
          }
          
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, serializedTaskMetrics)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
    
    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${partitionsToCompute.take(15).mkString(", ")})")
      
      // 提交TaskSet给TaskScheduler
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    } else {
      // 没有任务需要运行,标记Stage完成
      markStageAsFinished(stage, None)
      
      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)
      
      submitWaitingChildStages(stage)
    }
  }
  
  // 8. 处理任务完成
  def handleTaskCompletion(event: CompletionEvent): Unit = {
    val task = event.task
    val taskId = event.taskInfo.taskId
    val stageId = task.stageId
    val taskType = Utils.getFormattedClassName(task)
    
    // 更新累加器
    event.accumUpdates.foreach { case (id, partialValue) =>
      val acc = AccumulatorContext.get(id)
      if (acc != null) {
        acc.asInstanceOf[AccumulatorV2[Any, Any]].merge(partialValue.asInstanceOf[Any])
      }
    }
    
    // 处理不同的任务结果
    event.reason match {
      case Success =>
        task match {
          case rt: ResultTask[_, _] =>
            // ResultTask完成
            val resultStage = stage.asInstanceOf[ResultStage]
            resultStage.activeJob match {
              case Some(job) =>
                if (!job.finished(rt.outputId)) {
                  job.finished(rt.outputId) = true
                  job.numFinished += 1
                  
                  // 调用结果处理器
                  job.listener.taskSucceeded(rt.outputId, event.result)
                  
                  // 检查Job是否完成
                  if (job.numFinished == job.numPartitions) {
                    markStageAsFinished(resultStage)
                    cleanupStateForJobAndIndependentStages(job)
                    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                  }
                }
              case None =>
                logInfo("Ignoring result from " + rt + " because its job has finished")
            }
            
          case smt: ShuffleMapTask =>
            // ShuffleMapTask完成
            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
            shuffleStage.addOutputLoc(smt.partitionId, event.result.asInstanceOf[MapStatus])
            
            if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
              markStageAsFinished(shuffleStage)
              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)
              
              // 提交等待中的子Stage
              submitWaitingChildStages(shuffleStage)
            }
        }
        
      case _: TaskFailedException =>
        // 任务失败处理
        handleTaskFailure(task, event.reason.asInstanceOf[TaskFailedException])
    }
  }
}

Stage划分算法详解

Stage划分原理流程图

graph TD
    A[开始Stage划分] --> B[从最终RDD开始]
    B --> C[遍历RDD依赖]
    C --> D{依赖类型}
    D -->|窄依赖| E[加入当前Stage]
    D -->|宽依赖| F[创建新Stage边界]
    E --> G[继续遍历父RDD]
    F --> H[创建ShuffleMapStage]
    G --> C
    H --> I[递归处理父RDD]
    I --> C
    C --> J{是否还有未处理RDD}
    J -->|是| C
    J -->|否| K[Stage划分完成]
    
    style A fill:#e1f5fe
    style F fill:#ffebee
    style H fill:#fff3e0
    style K fill:#e8f5e8

Stage划分与依赖管理源码

// Stage划分核心逻辑
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new ArrayBuffer[Stage]()
  val visited = new HashSet[RDD[_]]
  
  def visit(r: RDD[_]): Unit = {
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            // 宽依赖,创建新的ShuffleMapStage
            parents += getOrCreateShuffleMapStage(shufDep, firstJobId)
          case _ =>
            // 窄依赖,递归访问父RDD
            visit(dep.rdd)
        }
      }
    }
  }
  
  visit(rdd)
  parents.toList
}

// 查找缺失的父依赖
private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
  val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ArrayStack[RDD[_]]
  
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
            ancestors.push(shuffleDep)
            waitingForVisit.push(shuffleDep.rdd)
          }
        case narrowDep: NarrowDependency[_] =>
          waitingForVisit.push(narrowDep.rdd)
      }
    }
  }
  ancestors
}

// 创建ResultStage
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

// 创建ShuffleMapStage
private def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
  
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)
  
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ") as input to " +
      "shuffle " + shuffleDep.shuffleId)
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

容错处理流程图

graph TD
    A[Task执行失败] --> B{失败原因分析}
    B -->|FetchFailed| C[Shuffle数据获取失败]
    B -->|TaskKilled| D[任务被杀死]
    B -->|ExceptionFailure| E[任务执行异常]
    B -->|ExecutorLostFailure| F[Executor丢失]
    
    C --> G[标记父Stage失败]
    G --> H[重新提交父Stage]
    H --> I[重新计算Shuffle数据]
    
    D --> J[检查重试次数]
    E --> J
    J --> K{是否超过最大重试}
    K -->|否| L[重新调度Task]
    K -->|是| M[标记Stage失败]
    
    F --> N[移除丢失的Executor]
    N --> O[重新分配资源]
    O --> P[重新提交所有Task]
    
    L --> Q[Task重新执行]
    I --> Q
    P --> Q
    M --> R[Job失败]
    Q --> S[执行成功]
    
    style A fill:#ffebee
    style C fill:#fff3e0
    style R fill:#ffcdd2
    style S fill:#e8f5e8
graph TD
    A[RDD DAG] --> B[DAGScheduler]
    B --> C[Stage划分]
    C --> D[Stage 0<br/>ShuffleMapStage]
    C --> E[Stage 1<br/>ShuffleMapStage]  
    C --> F[Stage 2<br/>ResultStage]
    
    D --> G[Task 0-1]
    D --> H[Task 0-2]
    E --> I[Task 1-1]
    E --> J[Task 1-2]
    F --> K[Task 2-1]
    F --> L[Task 2-2]
    
    style B fill:#e1f5fe
    style D fill:#fff3e0
    style E fill:#fff3e0
    style F fill:#e8f5e8

DAGScheduler架构组件

组件类名主要职责关键方法
DAGSchedulerDAGScheduler作业调度和Stage划分submitJob, submitStage
EventLoopDAGSchedulerEventProcessLoop事件处理循环post, onReceive
StageStage, ResultStage, ShuffleMapStageStage抽象findMissingPartitions
JobActiveJob作业抽象numFinished, numPartitions

DAGScheduler事件处理

// DAGScheduler事件类型
sealed trait DAGSchedulerEvent

case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) extends DAGSchedulerEvent

case class StageCompleted(stage: Stage) extends DAGSchedulerEvent
case class TaskCompleted(task: Task[_], reason: TaskEndReason) extends DAGSchedulerEvent

// 事件处理循环
class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") {
  
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    event match {
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
      case StageCompleted(stage) =>
        dagScheduler.handleStageCompletion(stage)
      case TaskCompleted(task, reason) =>
        dagScheduler.handleTaskCompletion(task, reason)
    }
  }
}

Stage划分与依赖管理

// Stage划分核心逻辑
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new ArrayBuffer[Stage]()
  val visited = new HashSet[RDD[_]]
  
  def visit(r: RDD[_]): Unit = {
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            // 宽依赖,创建新的ShuffleMapStage
            parents += getOrCreateShuffleMapStage(shufDep, firstJobId)
          case _ =>
            // 窄依赖,递归访问父RDD
            visit(dep.rdd)
        }
      }
    }
  }
  
  visit(rdd)
  parents.toList
}

任务调度器实现

TaskScheduler 负责将Task分发到Executor执行,支持多种调度策略。

graph TD
    A[DAGScheduler提交TaskSet] --> B[TaskSchedulerImpl.submitTasks]
    B --> C[创建TaskSetManager]
    C --> D[添加到调度池Pool]
    D --> E[SchedulerBackend.reviveOffers]
    E --> F[收集Executor资源信息]
    F --> G[TaskSchedulerImpl.resourceOffers]
    G --> H[按本地性级别分配Task]
    H --> I[创建TaskDescription]
    I --> J[SchedulerBackend.launchTasks]
    J --> K[发送Task到Executor]
    K --> L[Executor执行Task]
    L --> M[返回执行结果]
    M --> N[TaskScheduler.statusUpdate]
    N --> O[更新Task状态]
    O --> P{TaskSet是否完成}
    P -->|否| Q[继续调度剩余Task]
    P -->|是| R[通知DAGScheduler]
    Q --> G
    R --> S[Stage完成]
    
    style A fill:#e1f5fe
    style G fill:#fff3e0
    style L fill:#e8f5e8
    style S fill:#c8e6c9

TaskScheduler架构组件

组件类名主要职责关键特性
TaskSchedulerTaskSchedulerImpl任务调度和分发支持多种调度策略
SchedulerBackendCoarseGrainedSchedulerBackend与集群管理器通信资源分配和Executor管理
TaskSetManagerTaskSetManager管理TaskSet执行任务重试、推测执行
PoolPool调度池管理公平调度、FIFO调度

Task分发和执行完整流程图

graph TD
    A[DAGScheduler提交TaskSet] --> B[TaskScheduler.submitTasks]
    B --> C[创建TaskSetManager]
    C --> D[添加到调度池]
    D --> E[SchedulerBackend.reviveOffers]
    E --> F[收集Executor资源信息]
    F --> G[创建WorkerOffer列表]
    G --> H[TaskScheduler.resourceOffers]
    H --> I[遍历TaskSetManager队列]
    I --> J[按本地性级别分配]
    J --> K{PROCESS_LOCAL可用?}
    K -->|是| L[分配到指定Executor]
    K -->|否| M{NODE_LOCAL可用?}
    M -->|是| N[分配到指定Host]
    M -->|否| O{RACK_LOCAL可用?}
    O -->|是| P[分配到指定Rack]
    O -->|否| Q[ANY级别分配]
    L --> R[创建TaskDescription]
    N --> R
    P --> R
    Q --> R
    R --> S[序列化Task]
    S --> T[SchedulerBackend.launchTasks]
    T --> U[发送LaunchTask消息]
    U --> V[Executor接收任务]
    V --> W[创建TaskRunner]
    W --> X[提交到线程池执行]
    X --> Y[Task.run执行]
    Y --> Z[返回执行结果]
    Z --> AA[StatusUpdate消息]
    AA --> BB[TaskScheduler.statusUpdate]
    BB --> CC[更新TaskSetManager状态]
    CC --> DD{TaskSet完成?}
    DD -->|否| E
    DD -->|是| EE[通知DAGScheduler]
    
    style A fill:#e1f5fe
    style H fill:#fff3e0
    style Y fill:#e8f5e8
    style EE fill:#c8e6c9

Stage划分机制

Stage划分原则

  • 宽依赖边界:遇到宽依赖(Shuffle)划分新Stage
  • 窄依赖合并:窄依赖的RDD在同一Stage内Pipeline执行
  • Stage类型:ShuffleMapStage和ResultStage
graph TD
    A[textFile] --> B[flatMap]
    B --> C[map]
    C --> D[reduceByKey]
    D --> E[map]
    E --> F[collect]
    
    subgraph "Stage 0 (ShuffleMapStage)"
        A
        B
        C
    end
    
    subgraph "Stage 1 (ResultStage)"
        E
        F
    end
    
    C -.->|Shuffle| E
    
    style A fill:#e8f5e8
    style B fill:#e8f5e8
    style C fill:#e8f5e8
    style E fill:#e1f5fe
    style F fill:#e1f5fe

本地性调度策略

Spark任务调度的核心是数据本地性,通过就近调度减少网络传输开销。

本地性级别优先级

本地性级别描述数据位置性能影响优先级
PROCESS_LOCAL进程本地同一JVM进程的内存中最快最高
NODE_LOCAL节点本地同一节点的不同进程
NO_PREF无偏好数据分布均匀一般中等
RACK_LOCAL机架本地同一机架的不同节点
ANY任意位置跨机架或远程最慢最低

本地性调度实现

// TaskSetManager中的本地性调度实现
class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    val blacklistTracker: Option[BlacklistTracker] = None,
    clock: Clock = new SystemClock()) extends Schedulable with Logging {

  // 本地性级别定义
  private val myLocalityLevels = computeValidLocalityLevels()
  private val localityWaits = myLocalityLevels.map(getLocalityWait)

  // 计算有效的本地性级别
  private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    import TaskLocality._
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    
    if (!pendingTasksForExecutor.isEmpty) {
      levels += PROCESS_LOCAL
    }
    if (!pendingTasksForHost.isEmpty) {
      levels += NODE_LOCAL
    }
    if (!pendingTasksWithNoPrefs.isEmpty) {
      levels += NO_PREF
    }
    if (!pendingTasksForRack.isEmpty) {
      levels += RACK_LOCAL
    }
    levels += ANY
    
    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
    levels.toArray
  }

  // 获取本地性等待时间
  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
    val defaultWait = sched.conf.get("spark.locality.wait", "3s")
    val localityWaitKey = level match {
      case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
      case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
      case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
      case _ => null
    }
    
    if (localityWaitKey != null) {
      sched.conf.getTimeAsMs(localityWaitKey, defaultWait)
    } else {
      0L
    }
  }

  // 为特定Executor和本地性级别查找任务
  private def findTask(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality): Option[(Int, TaskLocality.TaskLocality)] = {
    
    for (locality <- TaskLocality.values) {
      if (isAllowedLocality(locality, maxLocality)) {
        val task = locality match {
          case TaskLocality.PROCESS_LOCAL =>
            findTaskFromList(pendingTasksForExecutor.getOrElse(execId, ArrayBuffer()))
            
          case TaskLocality.NODE_LOCAL =>
            findTaskFromList(pendingTasksForHost.getOrElse(host, ArrayBuffer()))
            
          case TaskLocality.NO_PREF =>
            findTaskFromList(pendingTasksWithNoPrefs)
            
          case TaskLocality.RACK_LOCAL =>
            val rack = sched.getRackForHost(host)
            if (rack != null) {
              findTaskFromList(pendingTasksForRack.getOrElse(rack, ArrayBuffer()))
            } else {
              None
            }
            
          case TaskLocality.ANY =>
            findTaskFromList(allPendingTasks)
        }
        
        if (task.isDefined) {
          return Some((task.get, locality))
        }
      }
    }
    None
  }
  
  // 检查是否允许特定本地性级别
  private def isAllowedLocality(
      taskLocality: TaskLocality.TaskLocality,
      maxLocality: TaskLocality.TaskLocality): Boolean = {
    TaskLocality.isAllowed(taskLocality, maxLocality)
  }
}

本地性调度流程图

graph TD
    A[收到资源Offer] --> B[遍历TaskSet]
    B --> C[检查PROCESS_LOCAL]
    C --> D{有进程本地任务?}
    D -->|是| E[分配PROCESS_LOCAL任务]
    D -->|否| F[检查NODE_LOCAL]
    
    F --> G{有节点本地任务?}
    G -->|是| H[检查等待时间]
    H --> I{等待时间未到?}
    I -->|是| J[跳过,等待更好本地性]
    I -->|否| K[分配NODE_LOCAL任务]
    
    G -->|否| L[检查NO_PREF]
    L --> M{有无偏好任务?}
    M -->|是| N[分配NO_PREF任务]
    M -->|否| O[检查RACK_LOCAL]
    
    O --> P{有机架本地任务?}
    P -->|是| Q[分配RACK_LOCAL任务]
    P -->|否| R[分配ANY任务]
    
    E --> S[创建TaskDescription]
    K --> S
    N --> S
    Q --> S
    R --> S
    J --> T[等待下次调度]
    
    style A fill:#e1f5fe
    style E fill:#e8f5e8
    style K fill:#e8f5e8
    style S fill:#c8e6c9

推测执行与容错

推测执行 是Spark处理慢任务(Straggler)的重要机制,通过运行重复任务来提高整体性能。

推测执行触发条件

  1. 任务运行时间超过中位数的1.5倍
  2. 任务进度明显落后于其他任务
  3. 节点或网络出现性能问题
// 推测执行实现
class TaskSetManager {
  
  // 推测执行配置
  private val speculationEnabled = sched.conf.getBoolean("spark.speculation", false)
  private val speculationQuantile = sched.conf.getDouble("spark.speculation.quantile", 0.75)
  private val speculationMultiplier = sched.conf.getDouble("spark.speculation.multiplier", 1.5)
  
  // 检查是否需要推测执行
  def checkSpeculatableTasks(minFinishedForSpeculation: Int): Boolean = {
    if (!speculationEnabled || taskSet.stageId < 0 || taskSet.stageAttemptId < 0) {
      return false
    }
    
    var foundTasks = false
    val finishedTasks = runningTasksSet.size - pendingPartitions.size
    
    if (finishedTasks >= minFinishedForSpeculation && finishedTasks > 0) {
      val time = clock.getTimeMillis()
      val medianDuration = medianTaskDuration
      
      if (medianDuration > 0) {
        val threshold = speculationMultiplier * medianDuration
        
        // 检查每个正在运行的任务
        for ((taskId, info) <- runningTasks) {
          val runtime = time - info.launchTime
          if (runtime > threshold && !speculatableTasks.contains(taskId)) {
            logInfo(s"Marking task $taskId in stage ${taskSet.stageId} (on ${info.host}) " +
              s"as speculatable because it ran more than ${threshold.toDouble / 1000.0} seconds")
            speculatableTasks += taskId
            foundTasks = true
          }
        }
      }
    }
    foundTasks
  }
  
  // 获取可推测的任务
  def dequeueSpeculativeTask(
      execId: String,
      host: String,
      locality: TaskLocality.TaskLocality): Option[(Int, TaskLocality.TaskLocality)] = {
    
    speculatableTasks.retain(index => runningTasks.contains(index))
    
    if (speculatableTasks.nonEmpty) {
      // 检查是否有适合该Executor的推测任务
      for (index <- speculatableTasks) {
        val locations = tasks(index).preferredLocations
        val allowedLocality = getAllowedLocalityLevel(clock.getTimeMillis())
        
        if (TaskLocality.isAllowed(locality, allowedLocality)) {
          if (isTaskLocalToExecutor(index, execId, host, locality)) {
            speculatableTasks -= index
            return Some((index, locality))
          }
        }
      }
    }
    None
  }
  
  // 处理推测任务完成
  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
    val info = taskInfos(tid)
    val index = info.index
    
    // 如果这是推测任务的成功,取消其他副本
    if (speculatableTasks.contains(index)) {
      for ((taskId, taskInfo) <- runningTasks) {
        if (taskInfo.index == index && taskId != tid) {
          logInfo(s"Killing speculative task $taskId")
          sched.backend.killTask(taskId, taskInfo.executorId, interruptThread = true, 
            reason = "another copy of the task succeeded")
        }
      }
      speculatableTasks -= index
    }
    
    // 标记任务成功
    successful(index) = true
    removeRunningTask(tid)
    
    // 如果所有任务都完成,通知DAGScheduler
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  }
}

容错处理机制详解

Spark在任务级别提供了完善的容错机制,包括任务重试、Stage重新计算、黑名单管理等。

故障类型与处理策略

故障类型触发条件处理策略影响范围
任务失败Task执行异常重新调度任务单个Task
Executor丢失Executor进程崩溃重新调度所有运行中任务Executor上所有Task
Worker节点失败节点不可达黑名单机制,重新分配节点上所有Executor
网络分区网络不稳定推测执行 + 重试部分Task
// 故障处理实现
class TaskSetManager {
  
  // 处理任务失败
  def handleFailedTask(
      tid: Long,
      state: TaskState,
      reason: TaskFailedReason): Unit = {
    
    val info = taskInfos(tid)
    val index = info.index
    
    removeRunningTask(tid)
    
    reason match {
      case fetchFailed: FetchFailed =>
        // Shuffle数据获取失败,需要重新计算上游Stage
        logWarning(s"Lost task ${info.id} in stage ${taskSet.stageId} (TID $tid, ${info.host}, " +
          s"executor ${info.executorId}): ${reason}")
        
        if (!successful(index)) {
          successful(index) = true
          tasksSuccessful += 1
        }
        
        // 标记需要重新计算的Stage
        abort(s"Task $tid failed with FetchFailedException", Some(fetchFailed))
        
      case exceptionFailure: ExceptionFailure =>
        // 任务执行异常
        val key = (info.host, info.executorId)
        numFailures(index) += 1
        
        if (numFailures(index) >= maxTaskFailures) {
          // 任务失败次数超过阈值,整个TaskSet失败
          val errorMessage = s"Task $index failed $maxTaskFailures times, most recent failure: " +
            exceptionFailure.description
          abort(errorMessage, exceptionFailure.exception)
          return
        } else {
          // 重新加入待调度队列
          addPendingTask(index)
        }
        
      case taskKilled: TaskKilled =>
        // 任务被主动杀死
        logWarning(s"Task $tid killed: ${taskKilled.reason}")
        
      case executorLostFailure: ExecutorLostFailure =>
        // Executor丢失导致的任务失败
        logInfo(s"Task $tid failed because executor ${executorLostFailure.execId} was lost")
        addPendingTask(index)
        
      case _ =>
        // 其他类型失败
        logWarning(s"Lost task $tid in stage ${taskSet.stageId}: $reason")
        addPendingTask(index)
    }
    
    // 更新失败统计
    if (!isZombie) {
      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
        info.host, info.executorId, index, reason))
    }
  }
  
  // 处理Executor丢失
  def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
    // 重新调度该Executor上运行的所有任务
    for ((tid, info) <- runningTasks) {
      if (info.executorId == execId) {
        val reason = ExecutorLostFailure(execId, exitCausedByApp = true, 
          Some(s"Executor lost: $reason"))
        handleFailedTask(tid, TaskState.FAILED, reason)
      }
    }
    
    // 从本地性映射中移除该Executor
    for {
      (host, execIds) <- pendingTasksForHost
      if execIds.contains(execId)
    } {
      execIds -= execId
      if (execIds.isEmpty) {
        pendingTasksForHost.remove(host)
      }
    }
    
    // 重新计算有效的本地性级别
    recomputeLocality()
  }
}

存储与内存管理

BlockManager存储引擎

BlockManager 是Spark中负责数据存储和管理的核心组件,统一管理内存和磁盘上的数据块。

操作时序图

sequenceDiagram
    participant Driver
    participant BlockManagerMaster
    participant Executor
    participant BlockManager
    participant MemoryStore
    participant DiskStore
    
    Driver->>BlockManagerMaster: 1. 注册BlockManager
    Executor->>BlockManager: 2. 初始化BlockManager
    BlockManager->>BlockManagerMaster: 3. 注册到Master
    BlockManager->>MemoryStore: 4. 初始化内存存储
    BlockManager->>DiskStore: 5. 初始化磁盘存储
    
    Note over BlockManager,DiskStore: 数据存储流程
    BlockManager->>MemoryStore: 6. 尝试内存存储
    alt 内存足够
        MemoryStore-->>BlockManager: 7a. 存储成功
    else 内存不足
        BlockManager->>DiskStore: 7b. 磁盘存储
        DiskStore-->>BlockManager: 7c. 存储成功
    end
    
    BlockManager->>BlockManagerMaster: 8. 报告存储状态

BlockManager核心组件详解

1. BlockManager架构组件

组件类名主要职责存储介质
BlockManagerBlockManager数据块管理总控制器内存+磁盘
MemoryStoreMemoryStore内存数据存储JVM堆内存
DiskStoreDiskStore磁盘数据存储本地磁盘
BlockManagerMasterBlockManagerMaster元数据管理Driver内存
BlockInfoManagerBlockInfoManagerBlock信息管理内存索引

2. BlockManager创建与初始化

class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    master: BlockManagerMaster,
    serializerManager: SerializerManager,
    conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker)
  extends BlockDataManager with BlockEvictionHandler with Logging {

  // 核心组件初始化
  private[spark] val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
  private[spark] val blockInfoManager = new BlockInfoManager
  
  // 初始化存储组件
  private[spark] val memoryStore = new MemoryStore(conf, blockInfoManager)
  private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
  
  // 注册到Master
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
}

3. 数据块存储流程

// 数据块存储的核心方法
def putBlockData(
    blockId: BlockId,
    data: BlockData,
    level: StorageLevel,
    tellMaster: Boolean = true): Boolean = {
  
  // 1. 检查存储级别
  if (level.useMemory) {
    // 2. 尝试存储到内存
    val putSucceeded = memoryStore.putBytes(blockId, data, level)
    if (putSucceeded) {
      // 3. 通知Master
      if (tellMaster) {
        reportBlockStatus(blockId, BlockStatus(level, 0, 0))
      }
      return true
    }
  }
  
  // 4. 内存不足,存储到磁盘
  if (level.useDisk) {
    val putSucceeded = diskStore.putBytes(blockId, data)
    if (putSucceeded) {
      if (tellMaster) {
        reportBlockStatus(blockId, BlockStatus(level, 0, data.size))
      }
      return true
    }
  }
  
  false
}

4. 数据块获取流程

// 数据块获取的核心方法
def get[T](blockId: BlockId): Option[BlockResult[T]] = {
  // 1. 检查本地内存
  memoryStore.get(blockId) match {
    case Some(blockResult) => return Some(blockResult)
    case None => // 继续查找
  }
  
  // 2. 检查本地磁盘
  diskStore.get(blockId) match {
    case Some(blockResult) => return Some(blockResult)
    case None => // 继续查找
  }
  
  // 3. 从远程获取
  getRemote(blockId)
}

def getRemote[T](blockId: BlockId): Option[BlockResult[T]] = {
  // 1. 从Master获取block位置
  val locations = master.getLocations(blockId)
  
  // 2. 从远程节点获取
  for (location <- locations) {
    val blockResult = blockTransferService.fetchBlockSync(
      location.host, location.port, location.executorId, blockId.toString)
    if (blockResult.isDefined) {
      return blockResult
    }
  }
  
  None
}

统一内存管理

Spark内存分区架构

graph TD
    A[JVM内存] --> B[堆内内存<br/>On-Heap]
    A --> C[堆外内存<br/>Off-Heap]
    
    B --> D[存储内存<br/>Storage Memory]
    B --> E[执行内存<br/>Execution Memory]
    B --> F[其他内存<br/>Other Memory]
    
    D --> G[RDD缓存<br/>广播变量]
    E --> H[Shuffle<br/>Join聚合]
    F --> I[用户数据结构<br/>Spark内部对象]
    
    C --> J[堆外存储<br/>Off-Heap Storage]
    C --> K[堆外执行<br/>Off-Heap Execution]
    
    style B fill:#e8f5e8
    style C fill:#e1f5fe
    style D fill:#fff3e0
    style E fill:#ffebee

1. 内存管理架构组件

组件类名主要职责管理范围
MemoryManagerUnifiedMemoryManager统一内存管理器堆内+堆外内存
StorageMemoryPoolStorageMemoryPool存储内存池缓存数据内存
ExecutionMemoryPoolExecutionMemoryPool执行内存池任务执行内存
MemoryStoreMemoryStore内存存储管理缓存数据存储
TaskMemoryManagerTaskMemoryManager任务内存管理单个任务内存

2. MemoryStore缓存管理

// MemoryStore核心实现
class MemoryStore(
    conf: SparkConf,
    blockInfoManager: BlockInfoManager)
  extends BlockStore(BlockStore.MEMORY) with BlockEvictionHandler with Logging {

  // 内存映射表
  private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
  
  // 当前内存使用量
  private var _currentMemory = 0L
  
  def putBytes[T](
      blockId: BlockId,
      size: Long,
      memoryMode: MemoryMode,
      _bytes: () => ChunkedByteBuffer): Boolean = {
    
    // 1. 检查内存是否足够
    if (!memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
      return false
    }
    
    // 2. 分配内存并存储数据
    val bytes = _bytes()
    val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
    entries.synchronized {
      entries.put(blockId, entry)
      _currentMemory += size
    }
    
    true
  }
  
  def get[T](blockId: BlockId): Option[BlockResult[T]] = {
    entries.synchronized {
      entries.get(blockId) match {
        case entry: SerializedMemoryEntry[T] =>
          Some(BlockResult(entry.value.asInstanceOf[T], DataReadMethod.Memory, entry.size))
        case entry: DeserializedMemoryEntry[T] =>
          Some(BlockResult(entry.value.asInstanceOf[T], DataReadMethod.Memory, entry.size))
        case _ => None
      }
    }
  }
}

3. TaskMemoryManager任务内存管理

// TaskMemoryManager核心实现
class TaskMemoryManager(
    memoryManager: MemoryManager,
    taskAttemptId: Long)
  extends MemoryManager with Logging {

  // 任务内存映射表
  private val memoryForTask = new mutable.HashMap[Long, Long]()
  
  // 内存分配方法
  def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = {
    
    // 1. 尝试从执行内存池分配
    val acquired = memoryManager.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
    
    // 2. 记录分配的内存
    if (acquired > 0) {
      memoryForTask.synchronized {
        memoryForTask(taskAttemptId) = memoryForTask.getOrElse(taskAttemptId, 0L) + acquired
      }
    }
    
    acquired
  }
  
  // 释放内存
  def releaseExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Unit = {
    
    memoryManager.releaseExecutionMemory(numBytes, taskAttemptId, memoryMode)
    
    memoryForTask.synchronized {
      val current = memoryForTask.getOrElse(taskAttemptId, 0L)
      val newTotal = math.max(0L, current - numBytes)
      if (newTotal == 0) {
        memoryForTask.remove(taskAttemptId)
      } else {
        memoryForTask(taskAttemptId) = newTotal
      }
    }
  }
}

缓存与持久化策略

统一内存管理

class UnifiedMemoryManager(
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(conf, numCores, onHeapStorageRegionSize, maxHeapMemory) {

  // 内存池配置
  private val maxPoolSize = maxHeapMemory - reservedMemory
  private val poolSize = maxPoolSize * memoryFraction
  
  // 动态内存分配
  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    
    val (executionPool, storagePool, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        maxOnHeapStorageMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        maxOffHeapStorageMemory)
    }
    
    if (numBytes > maxMemory) {
      return false
    }
    
    if (numBytes > storagePool.memoryFree) {
      // 尝试从执行内存池借用
      val memoryBorrowedFromExecution = math.min(
        executionPool.memoryFree, 
        numBytes - storagePool.memoryFree)
      
      executionPool.decrementPoolSize(memoryBorrowedFromExecution)
      storagePool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    
    storagePool.acquireMemory(blockId, numBytes)
  }
}

Shuffle数据交换

Shuffle 是Spark中数据重新分布的过程,发生在需要跨分区进行数据交换的操作中。

Shuffle类型对比

Shuffle类型特点优缺点
Hash Shuffle每个Map Task为每个Reduce Task创建文件文件数过多,影响性能
Sort Shuffle每个Map Task创建一个文件,按分区排序减少文件数,提高性能
Tungsten Sort使用堆外内存,优化排序性能内存使用更高效

Hash Shuffle

graph TD
    subgraph "Map阶段"
        A[Map Task 1] --> D[File 1-1]
        A --> E[File 1-2]
        A --> F[File 1-3]
        
        B[Map Task 2] --> G[File 2-1]
        B --> H[File 2-2] 
        B --> I[File 2-3]
        
        C[Map Task 3] --> J[File 3-1]
        C --> K[File 3-2]
        C --> L[File 3-3]
    end
    
    subgraph "Reduce阶段"
        D --> M[Reduce Task 1]
        G --> M
        J --> M
        
        E --> N[Reduce Task 2]
        H --> N
        K --> N
        
        F --> O[Reduce Task 3]
        I --> O
        L --> O
    end
    
    style A fill:#ffebee
    style B fill:#ffebee
    style C fill:#ffebee

Hash Shuffle问题

  • 文件数爆炸:M个Map Task × N个Reduce Task = M×N个文件
  • 随机I/O:大量小文件导致随机I/O
  • 内存压力:需要为每个文件维护缓冲区

Sort Shuffle

graph TD
    subgraph "Map阶段"
        A[Map Task 1] --> D[排序缓冲区]
        D --> E[单个输出文件]
        E --> F[索引文件]
        
        B[Map Task 2] --> G[排序缓冲区]
        G --> H[单个输出文件]
        H --> I[索引文件]
    end
    
    subgraph "Reduce阶段"
        F --> J[Reduce Task 1]
        I --> J
        
        F --> K[Reduce Task 2]
        I --> K
    end
    
    style D fill:#fff3e0
    style G fill:#fff3e0
    style E fill:#e8f5e8
    style H fill:#e8f5e8

Sort Shuffle优势

  • 文件数减少:每个Map Task只产生一个数据文件和一个索引文件
  • 顺序I/O:数据按分区ID排序写入,提高I/O效率
  • 内存优化:使用外部排序,支持spill到磁盘

Tungsten优化

  • 堆外内存管理:减少GC压力
  • 缓存友好的数据结构:提高CPU缓存命中率
  • 代码生成:运行时生成优化的字节码
// Tungsten Sort实现
class UnsafeShuffleWriter[K, V](
    blockManager: BlockManager,
    shuffleBlockResolver: IndexShuffleBlockResolver,
    taskMemoryManager: TaskMemoryManager,
    handle: SerializedShuffleHandle[K, V],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {

  private val partitioner = handle.dependency.partitioner
  private val numPartitions = partitioner.numPartitions
  private var sorter: UnsafeShuffleExternalSorter = _
  
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    // 使用Tungsten内存管理
    val taskContext = context.asInstanceOf[TaskContextImpl]
    sorter = UnsafeShuffleExternalSorter.create(
      taskContext.taskMemoryManager(),
      blockManager,
      context,
      numPartitions,
      shouldCompress = true)

    // 序列化并插入记录
    while (records.hasNext) {
      insertRecordIntoSorter(records.next())
    }
    
    // 写出排序结果
    val outputFile = shuffleBlockResolver.getDataFile(handle.shuffleId, mapId)
    val partitionLengths = sorter.closeAndGetSpills.map(_.file)
      .foldLeft(Array.fill[Long](numPartitions)(0)) { (lengths, file) =>
        // 合并spill文件
        mergeSpillsWithTransferTo(file, outputFile, lengths)
      }
    
    shuffleBlockResolver.writeIndexFileAndCommit(handle.shuffleId, mapId, partitionLengths, outputFile)
  }
}

Hash Shuffle 时序图

sequenceDiagram
    participant MapTask
    participant HashWriter
    participant FileSystem
    participant ReduceTask
    participant HashReader
    
    MapTask->>HashWriter: 写入记录
    HashWriter->>FileSystem: 为每个分区创建文件
    Note over FileSystem: M×N个文件创建
    HashWriter->>FileSystem: 写入数据到对应文件
    
    ReduceTask->>HashReader: 开始读取
    HashReader->>FileSystem: 读取相关分区文件
    FileSystem-->>HashReader: 返回数据
    HashReader-->>ReduceTask: 聚合后数据

Sort Shuffle 时序图

sequenceDiagram
    participant MapTask
    participant SortWriter
    participant ExternalSorter
    participant FileSystem
    participant ReduceTask
    participant SortReader
    
    MapTask->>SortWriter: 写入记录
    SortWriter->>ExternalSorter: 缓存并排序
    ExternalSorter->>ExternalSorter: 内存排序/Spill
    ExternalSorter->>FileSystem: 写入单个数据文件
    SortWriter->>FileSystem: 写入索引文件
    
    ReduceTask->>SortReader: 开始读取
    SortReader->>FileSystem: 根据索引读取数据
    FileSystem-->>SortReader: 返回分区数据
    SortReader-->>ReduceTask: 聚合后数据

Shuffle机制原理

Spark Shuffle拉取流程层次图

graph TD
    subgraph "Map端 (ShuffleMapTask)"
        A1["Map Task执行"] --> A2["数据写入内存缓冲区<br/>(PartitionedAppendOnlyMap)"]
        A2 --> A3{"内存是否充足?"}
        A3 -->|否| A4["溢写到磁盘<br/>(Spill Files)"]
        A3 -->|是| A5["继续缓存到内存"]
        A4 --> A6["ExternalSorter排序"]
        A5 --> A6
        A6 --> A7["写入本地磁盘<br/>(shuffle_shuffleId_mapId_0.data)"]
        A7 --> A8["生成索引文件<br/>(shuffle_shuffleId_mapId_0.index)"]
        A8 --> A9["向Driver注册<br/>MapOutputTracker"]
    end

    subgraph "Driver端 (协调层)"
        B1["MapOutputTracker"] --> B2["维护Shuffle数据位置信息<br/>(shuffleId -> Array[MapStatus])"]
        B2 --> B3["响应Reduce Task<br/>的位置查询请求"]
    end

    subgraph "Reduce端 (Reduce Task)"
        C1["Reduce Task启动"] --> C2["向MapOutputTracker<br/>请求数据位置"]
        C2 --> C3["获取目标数据块位置<br/>(BlockManagerId + BlockId)"]
        C3 --> C4["ShuffleBlockFetcherIterator<br/>创建拉取器"]
        
        subgraph "数据拉取层"
            C4 --> D1["本地数据拉取<br/>(LocalDirIterator)"]
            C4 --> D2["远程数据拉取<br/>(RemoteBlockIterator)"]
            D1 --> D3["本地磁盘读取"]
            D2 --> D4["网络传输<br/>(NettyBlockTransferService)"]
            D3 --> D5["数据反序列化"]
            D4 --> D5
        end
        
        subgraph "BlockManager处理层"
            D5 --> E1["BlockManager接收数据"]
            E1 --> E2{"数据大小检查"}
            E2 -->|小数据| E3["直接内存缓存<br/>(MemoryStore)"]
            E2 -->|大数据| E4["磁盘临时存储<br/>(DiskStore)"]
            E3 --> E5["数据解压缩"]
            E4 --> E5
        end
        
        subgraph "数据处理层"
            E5 --> F1["ShuffleBlockFetcherIterator<br/>迭代处理"]
            F1 --> F2{"是否需要聚合?"}
            F2 -->|是| F3["Aggregator聚合<br/>(combineByKey)"]
            F2 -->|否| F4["直接传递数据"]
            F3 --> F5{"是否需要排序?"}
            F4 --> F5
            F5 -->|是| F6["ExternalSorter排序"]
            F5 -->|否| F7["返回最终迭代器"]
            F6 --> F7
        end
    end

    subgraph "网络传输层"
        G1["NettyBlockTransferService"] --> G2["建立网络连接"]
        G2 --> G3["分批次拉取数据<br/>(maxBytesInFlight)"]
        G3 --> G4["流控制管理<br/>(maxReqsInFlight)"]
        G4 --> G5["数据传输完成"]
    end

    %% 连接关系
    A9 -.->|注册| B1
    B3 -.->|位置信息| C3
    D2 -.->|网络请求| G1
    G5 -.->|数据返回| D4
    
    %% 样式设置
    style A7 fill:#e1f5fe
    style A8 fill:#e1f5fe
    style C4 fill:#f3e5f5
    style F1 fill:#f3e5f5
    style E1 fill:#fff3e0
    style G3 fill:#e8f5e8
    
    classDef mapStage fill:#e3f2fd
    classDef reduceStage fill:#f1f8e9
    classDef networkStage fill:#fff8e1
    classDef driverStage fill:#fce4ec
    
    class A1,A2,A3,A4,A5,A6,A7,A8,A9 mapStage
    class C1,C2,C3,C4,D1,D2,D3,D4,D5,E1,E2,E3,E4,E5,F1,F2,F3,F4,F5,F6,F7 reduceStage
    class G1,G2,G3,G4,G5 networkStage
    class B1,B2,B3 driverStage

Shuffle拉取流程关键组件详解

Shuffle拉取流程阶段说明

阶段组件主要功能关键类性能要点
Map端写入ShuffleMapTask数据分区、排序、写入本地磁盘SortShuffleWriter
ExternalSorter
内存管理、溢写控制
Driver协调MapOutputTracker维护Shuffle数据位置信息MapOutputTrackerMaster
MapStatus
元数据管理、负载均衡
Reduce端拉取ShuffleBlockFetcherIterator数据位置查询、批量拉取BlockStoreShuffleReader
ShuffleBlockFetcherIterator
网络优化、本地性优先
网络传输NettyBlockTransferService建立连接、数据传输、流控制NettyBlockTransferService
TransportClient
并发控制、批量传输
BlockManager存储管理数据缓存、磁盘存储、内存管理BlockManager
MemoryStore
DiskStore
内存/磁盘平衡
数据处理Aggregator/Sorter数据聚合、排序、输出Aggregator
ExternalSorter
算法优化、内存复用

Shuffle拉取流程详细步骤

阶段1:Map端数据写入 (Mapper → 本地磁盘)

  1. 数据分区:根据Partitioner将数据分配到不同分区
  2. 内存缓存:优先使用PartitionedAppendOnlyMap缓存数据
  3. 溢写控制:内存不足时通过ExternalSorter溢写到磁盘
  4. 文件合并:将多个溢写文件和内存数据合并成单个文件
  5. 索引生成:创建.index文件记录每个分区的数据位置

阶段2:Driver端元数据管理 (MapOutputTracker)

  1. 位置注册:Map Task完成后向Driver注册输出位置
  2. 元数据维护:维护shuffleId → Array[MapStatus]映射
  3. 位置查询:响应Reduce Task的数据位置查询请求
  4. 负载均衡:根据数据大小优化Reduce Task的拉取策略

阶段3:Reduce端数据拉取 (ShuffleBlockFetcherIterator)

  1. 位置获取:向MapOutputTracker查询目标数据位置
  2. 拉取策略:优先本地拉取,再进行远程网络拉取
  3. 批量处理:通过maxBytesInFlight控制批量大小
  4. 并发控制:通过maxReqsInFlight限制并发请求数
  5. 失败重试:网络异常时自动重试和故障转移

阶段4:BlockManager数据管理

  1. 存储决策:根据数据大小选择内存或磁盘存储
  2. 压缩处理:自动处理数据压缩和解压缩
  3. 缓存管理:LRU策略管理内存中的数据块
  4. 磁盘管理:临时文件的创建、读取和清理

阶段5:数据处理与输出

  1. 反序列化:将字节流转换为具体的数据对象
  2. 聚合处理:如果需要,使用Aggregator进行数据聚合
  3. 排序处理:如果指定了排序规则,使用ExternalSorter排序
  4. 迭代器输出:返回最终的数据迭代器供下游使用

关键性能优化配置

// Shuffle拉取相关配置
spark.reducer.maxSizeInFlight = 48m          // 同时拉取的最大数据量
spark.reducer.maxReqsInFlight = 2147483647   // 同时发出的最大请求数
spark.reducer.maxBlocksInFlightPerAddress = 2147483647  // 每个地址的最大块数
spark.shuffle.compress = true                // 启用Shuffle数据压缩
spark.shuffle.spill.compress = true         // 启用溢写数据压缩
spark.shuffle.io.maxRetries = 3             // 拉取失败最大重试次数
spark.shuffle.io.retryWait = 5s             // 重试等待时间

1. ShuffleManager架构组件

组件类名主要职责适用场景
SortShuffleManagerSortShuffleManagerSort Shuffle管理器默认Shuffle方式
HashShuffleManagerHashShuffleManagerHash Shuffle管理器已废弃
ShuffleWriterSortShuffleWriter, UnsafeShuffleWriterShuffle写入器Map端数据写入
ShuffleReaderBlockStoreShuffleReaderShuffle读取器Reduce端数据读取

2. ShuffleWriter核心实现

// Sort Shuffle实现核心
class SortShuffleWriter[K, V, C](
    shuffleBlockResolver: IndexShuffleBlockResolver,
    handle: BaseShuffleHandle[K, V, C],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {

  private val dep = handle.dependency
  private val blockManager = SparkEnv.get.blockManager
  private val sorter: ExternalSorter[K, V, _] = {
    if (dep.mapSideCombine) {
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    }
  }

  // 写入数据
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter.insertAll(records)
    
    // 获取输出文件
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    
    // 写入排序后的数据
    val partitionLengths = sorter.writePartitionedFile(blockId, outputFile)
    
    // 写入索引文件
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, outputFile)
  }
}

3. ExternalSorter内存管理

// ExternalSorter核心实现
class ExternalSorter[K, V, C](
    context: TaskContext,
    aggregator: Option[Aggregator[K, V, C]] = None,
    partitioner: Option[Partitioner] = None,
    ordering: Option[Ordering[K]] = None,
    serializer: Serializer = SparkEnv.get.serializer)
  extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())
  with Logging {

  // 内存中的数据结构
  private var map = new PartitionedAppendOnlyMap[K, C]
  private val buffer = new PartitionedPairBuffer[K, C]

  // 插入数据
  def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    val shouldCombine = aggregator.isDefined
    
    if (shouldCombine) {
      // 需要聚合的情况
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // 不需要聚合的情况
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }

  // Spill到磁盘
  override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): SpilledFile = {
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    collection.clear()
    spillFile
  }
}

4. ShuffleReader数据读取

// ShuffleReader核心实现
class BlockStoreShuffleReader[K, C](
    handle: BaseShuffleHandle[K, _, C],
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    serializerManager: SerializerManager = SparkEnv.get.serializerManager,
    blockManager: BlockManager = SparkEnv.get.blockManager,
    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
  extends ShuffleReader[K, C] with Logging {

  private val dep = handle.dependency

  override def read(): Iterator[Product2[K, C]] = {
    // 1. 获取Shuffle数据块位置
    val blocksByAddress = mapOutputTracker.getMapSizesByExecutorId(
      handle.shuffleId, startPartition, endPartition)
    
    // 2. 读取数据块
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      blockManager.blockTransferService,
      blockManager,
      blocksByAddress,
      serializerManager.wrapStream(blockId, _),
      // 注意:我们使用serializerManager来获取压缩和加密包装器
      maxBytesInFlight = SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      maxReqsInFlight = SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
      maxBlocksInFlightPerAddress = SparkEnv.get.conf.getInt(
        "spark.reducer.maxBlocksInFlightPerAddress", Int.MaxValue),
      maxReqSizeShuffleToMem = SparkEnv.get.conf.getSizeAsBytes(
        "spark.reducer.maxReqSizeShuffleToMem", Long.MaxValue),
      detectCorrupt = SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

    // 3. 反序列化并聚合
    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // Map端已经聚合,Reduce端继续聚合
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else {
        // Map端未聚合,Reduce端进行聚合
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, V)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
    } else {
      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
    }

    // 4. 排序(如果需要)
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // 创建ExternalSorter进行排序
        val sorter = new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
        sorter.insertAll(aggregatedIter)
        context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
  }
}

5. ShuffleBlockResolver文件管理

// ShuffleBlockResolver核心实现
class IndexShuffleBlockResolver(conf: SparkConf, _blockManager: BlockManager = null)
  extends ShuffleBlockResolver with Logging {

  // 获取数据文件
  def getDataFile(shuffleId: Int, mapId: Long): File = {
    new File(getShuffleDataDir(shuffleId), s"shuffle_${shuffleId}_${mapId}_0.data")
  }
  
  // 获取索引文件
  def getIndexFile(shuffleId: Int, mapId: Long): File = {
    new File(getShuffleDataDir(shuffleId), s"shuffle_${shuffleId}_${mapId}_0.index")
  }
  
  // 写入索引文件并提交
  def writeIndexFileAndCommit(
      shuffleId: Int,
      mapId: Long,
      lengths: Array[Long],
      dataTmp: File): Unit = {
    
    val indexFile = getIndexFile(shuffleId, mapId)
    val indexTmp = new File(indexFile.getAbsolutePath + ".tmp")
    
    try {
      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
      Utils.tryWithSafeFinally {
        // 写入偏移量
        var offset = 0L
        out.writeLong(offset)
        for (length <- lengths) {
          offset += length
          out.writeLong(offset)
        }
      } {
        out.close()
      }
      
      // 原子性重命名
      val dataFile = getDataFile(shuffleId, mapId)
      if (dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
        throw new IOException("Failed to rename data file")
      }
      if (!indexTmp.renameTo(indexFile)) {
        throw new IOException("Failed to rename index file")
      }
    } catch {
      case e: Exception =>
        indexTmp.delete()
        throw e
    }
  }
}

6. 数据流组件交互

sequenceDiagram
  participant MapTask
  participant ExternalSorter
  participant ShuffleWriter
  participant ShuffleBlockResolver
  participant Disk
  participant ReduceTask
  participant ShuffleReader
  participant BlockManager

  MapTask->>ExternalSorter: insertAll(records)
  ExternalSorter->>ExternalSorter: 内存排序/Spill
  ExternalSorter->>ShuffleWriter: writePartitionedFile()
  ShuffleWriter->>ShuffleBlockResolver: writeIndexFileAndCommit()
  ShuffleBlockResolver->>Disk: 写入data/index文件
  
  ReduceTask->>ShuffleReader: read()
  ShuffleReader->>BlockManager: 获取block位置
  BlockManager->>Disk: 读取Shuffle文件
  Disk-->>ShuffleReader: 返回数据
  ShuffleReader->>ReduceTask: 聚合/排序结果

7. Shuffle性能监控组件

// Shuffle性能指标收集
class ShuffleWriteMetrics extends TaskMetrics {
  // 写入字节数
  private var _bytesWritten: Long = 0L
  // 写入记录数
  private var _recordsWritten: Long = 0L
  // 写入时间
  private var _writeTime: Long = 0L
  
  def bytesWritten: Long = _bytesWritten
  def recordsWritten: Long = _recordsWritten
  def writeTime: Long = _writeTime
}

class ShuffleReadMetrics extends TaskMetrics {
  // 读取字节数
  private var _bytesRead: Long = 0L
  // 读取记录数
  private var _recordsRead: Long = 0L
  // 读取时间
  private var _readTime: Long = 0L
  // 远程读取字节数
  private var _remoteBytesRead: Long = 0L
  
  def bytesRead: Long = _bytesRead
  def recordsRead: Long = _recordsRead
  def readTime: Long = _readTime
  def remoteBytesRead: Long = _remoteBytesRead
}

Shuffle性能优化

主要优化策略

  • 压缩spark.shuffle.compress,减少网络传输量
  • 合理设置分区数spark.sql.shuffle.partitions,避免分区过多或过少
  • 使用本地化Shuffle:减少网络I/O
  • 启用spill机制:内存不足时溢写磁盘,防止OOM
  • 聚合缓冲区:Map端本地聚合,减少传输数据量

1. 分区优化策略

# 推荐设置(根据数据量调整)
spark.sql.shuffle.partitions=200
spark.default.parallelism=200

# 动态调整分区数
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true

2. 动态资源分配

# 启用动态分配
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.initialExecutors=2

# 资源分配策略
spark.dynamicAllocation.executorIdleTimeout=60s
spark.dynamicAllocation.cachedExecutorIdleTimeout=120s

3. 压缩与序列化优化

配置项推荐值说明
spark.shuffle.compresstrue启用Shuffle压缩
spark.shuffle.compress.codecsnappy压缩算法选择
spark.serializerKryoSerializer序列化器选择
spark.kryo.registrationRequiredfalse是否要求注册类

4. 本地化Shuffle优化

# 本地化配置
spark.locality.wait=3s
spark.locality.wait.process=3s
spark.locality.wait.node=3s
spark.locality.wait.rack=3s

5. 高级优化技巧

Map端聚合

// 使用reduceByKey替代groupByKey
val result = rdd.reduceByKey(_ + _)  // 推荐
// val result = rdd.groupByKey().mapValues(_.sum)  // 不推荐

广播变量优化

// 小表广播,避免Shuffle
val smallTable = spark.table("small_table").collect()
val broadcastVar = spark.sparkContext.broadcast(smallTable)

数据倾斜处理

1. 数据倾斜问题

现象:某些分区数据量远大于其他分区,导致Task执行时间差异很大

解决方案

// 方案1:加盐处理
val skewedRDD = rdd.map(x => {
  val key = x._1
  val value = x._2
  if (isSkewedKey(key)) {
    (key + "_" + Random.nextInt(10), value)
  } else {
    (key, value)
  }
})

// 方案2:自定义分区器
class SkewPartitioner(numPartitions: Int) extends Partitioner {
  override def numPartitions: Int = numPartitions
  override def getPartition(key: Any): Int = {
    // 自定义分区逻辑,避免数据倾斜
    val rawKey = key.toString.split("_")(0)
    math.abs(rawKey.hashCode) % numPartitions
  }
}

2. Shuffle文件过多问题

现象:Shuffle过程中产生大量小文件,影响性能

解决方案

# 合并小文件
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.coalescePartitions.initialPartitionNum=200

3. 内存溢出问题

现象:Shuffle过程中出现OOM

解决方案

# 启用Spill机制
spark.shuffle.spill=true
spark.shuffle.spill.compress=true

# 调整内存配置
spark.executor.memory=4g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.3

容错与可靠性

血缘关系容错

Spark容错机制 基于RDD血缘关系实现,是Spark高可用性的核心保障。

血缘关系原理

graph TD
    A["textFile('input')"] --> B["flatMap(_.split(' '))"]
    B --> C["map((_, 1))"]
    C --> D["reduceByKey(_ + _)"]
    D --> E["saveAsTextFile('output')"]
    
    F["分区0丢失"] --> G["根据血缘关系"]
    G --> H["重新计算分区0"]
    H --> I["恢复数据"]
    
    style A fill:#e1f5fe
    style F fill:#ffebee
    style I fill:#e8f5e8

容错机制对比

容错方式实现原理恢复速度存储开销适用场景
血缘关系重新计算丢失分区中等计算链条较短
Checkpoint持久化中间结果计算链条很长
缓存内存/磁盘存储最快中等多次使用的RDD

容错实现机制

// RDD血缘关系实现
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
) extends Serializable {

  // 1. 计算分区数据
  def compute(split: Partition, context: TaskContext): Iterator[T]
  
  // 2. 获取依赖关系
  protected def getDependencies: Seq[Dependency[_]] = deps
  
  // 3. 分区器
  @transient val partitioner: Option[Partitioner] = None
  
  // 4. 首选位置
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  
  // 5. 容错恢复
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }
}

// 依赖关系类型
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

// 窄依赖 - 每个父RDD分区最多被一个子分区使用
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  override def rdd: RDD[T] = _rdd
  def getParents(partitionId: Int): Seq[Int]
}

// 宽依赖 - 每个父RDD分区可能被多个子分区使用  
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {
  
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
  
  val shuffleId: Int = _rdd.context.newShuffleId()
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)
}

容错恢复流程

sequenceDiagram
    participant Task as 失败任务
    participant Executor as Executor
    participant Driver as Driver
    participant DAGScheduler as DAGScheduler
    
    Task->>Executor: 任务执行失败
    Executor->>Driver: 报告任务失败
    Driver->>DAGScheduler: 处理任务失败事件
    DAGScheduler->>DAGScheduler: 分析失败原因
    
    alt 数据分区丢失
        DAGScheduler->>DAGScheduler: 查找血缘关系
        DAGScheduler->>DAGScheduler: 重新计算丢失分区
        DAGScheduler->>Executor: 重新提交任务
    else Executor失败
        DAGScheduler->>DAGScheduler: 标记Executor不可用
        DAGScheduler->>DAGScheduler: 重新调度所有受影响任务
    end
    
    Executor->>Driver: 任务重新执行成功
    Driver->>DAGScheduler: 更新任务状态

Checkpoint检查点

Checkpoint 是Spark提供的一种容错优化机制,通过将RDD持久化到可靠存储来缩短血缘关系链条。

Checkpoint实现原理

// RDD Checkpoint实现
abstract class RDD[T: ClassTag] {
  
  // Checkpoint状态
  private var checkpointData: Option[RDDCheckpointData[T]] = None
  
  // 设置Checkpoint目录
  def checkpoint(): Unit = {
    RDDCheckpointData.synchronized {
      if (context.checkpointDir.isEmpty) {
        throw new SparkException("Checkpoint directory has not been set in the SparkContext")
      } else if (checkpointData.isEmpty) {
        checkpointData = Some(new ReliableRDDCheckpointData(this))
      }
    }
  }
  
  // 执行Checkpoint
  private[spark] def doCheckpoint(): Unit = {
    RDDCheckpointData.synchronized {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          if (checkpointAllMarkedAncestors) {
            dependencies.foreach(_.rdd.doCheckpoint())
          }
          checkpointData.get.checkpoint()
        }
      }
    }
  }
  
  // 检查是否已Checkpoint
  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
  
  // 获取Checkpoint文件
  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointDir)
}

// Checkpoint数据管理
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {

  // Checkpoint文件路径
  private val checkpointPath = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
  
  // 执行Checkpoint操作
  protected override def doCheckpoint(): CheckpointRDD[T] = {
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, checkpointPath)
    
    // 清理血缘关系
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach(_.registerRDDCheckpointDataForCleanup(newRDD, rdd.id))
    }
    
    logInfo(s"Done checkpointing RDD ${rdd.id} to $checkpointPath, new parent is RDD ${newRDD.id}")
    newRDD
  }
}

Checkpoint使用场景

场景描述使用建议
长血缘链条RDD经过多次转换操作在中间节点设置Checkpoint
宽依赖较多包含大量shuffle操作在shuffle后设置Checkpoint
迭代计算机器学习等迭代算法在每次迭代后Checkpoint
共享RDD被多个Action使用的RDD第一次使用前Checkpoint

故障恢复机制

Spark故障恢复策略总结

故障级别恢复机制恢复时间数据丢失适用场景
Task失败重新调度Task秒级偶发故障
Executor失败重启Executor,重算丢失分区分钟级无(如有缓存则丢失)进程崩溃
Driver失败应用重启小时级应用状态丢失驱动进程异常
数据分区丢失血缘关系重算分钟级存储故障
网络分区推测执行+重试秒到分钟级网络不稳定

资源管理与通信

调度算法策略

Spark支持多种任务调度算法,根据应用场景选择合适的调度策略。

调度算法对比

调度算法调度原理适用场景公平性响应时间
FIFO先进先出单用户批处理
Fair公平共享资源多用户交互式
Capacity队列容量分配企业级多租户中等中等

Fair Scheduler实现原理

graph TD
    A[应用程序提交] --> B[调度池分配]
    B --> C{调度算法}
    C -->|FIFO| D[队列内FIFO调度]
    C -->|Fair| E[公平共享调度]
    
    E --> F[计算资源份额]
    F --> G[按缺失资源排序]
    G --> H[分配资源]
    
    D --> I[按提交顺序执行]
    I --> H
    
    H --> J[资源监控]
    J --> K[动态调整]
    K --> F
    
    style B fill:#e1f5fe
    style F fill:#e8f5e8
    style K fill:#fff3e0
// Fair Scheduler核心实现
class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) extends SchedulableBuilder {

  override def buildPools(): Unit = {
    val fairSchedulerConfigFile = conf.getOption("spark.scheduler.allocation.file")
    val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
    
    val xmlPath = fairSchedulerConfigFile match {
      case Some(file) => file
      case None => DEFAULT_SCHEDULER_FILE
    }
    
    val rootName = rootPool.name
    var is: InputStream = null
    
    try {
      is = Utils.getSparkClassLoader.getResourceAsStream(xmlPath)
      if (is != null) {
        buildPoolsFromXML(is)
      } else {
        buildDefaultPool()
      }
    } finally {
      if (is != null) is.close()
    }
    
    // 构建默认调度池
    buildDefaultPool()
  }
  
  private def buildDefaultPool(): Unit = {
    if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, 0, 1)
      rootPool.addSchedulable(pool)
      logInfo("Created default pool: " + DEFAULT_POOL_NAME)
    }
  }
  
  private def buildPoolsFromXML(is: InputStream): Unit = {
    val xml = XML.load(is)
    
    for (poolNode <- (xml \\ POOLS_PROPERTY)) {
      val poolName = (poolNode \ POOL_NAME_PROPERTY).text
      
      val schedulingMode = getSchedulingModeValue(poolNode, SCHEDULING_MODE_PROPERTY,
        DEFAULT_SCHEDULING_MODE, poolName)
      val minShare = getIntValue(poolNode, MINIMUM_SHARES_PROPERTY, 0, poolName)
      val weight = getIntValue(poolNode, WEIGHT_PROPERTY, 1, poolName)
      
      rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
      logInfo("Created pool: " + poolName)
    }
  }
}

// 公平调度池实现
class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable with Logging {

  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
  
  var weight = initWeight
  var minShare = initMinShare
  var runningTasks = 0
  var priority = 0
  
  // Fair调度算法实现
  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue = schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }
  
  // Fair算法比较器
  def taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR => new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm()
      case _ => new FairSchedulingAlgorithm()
    }
  }
}

// 公平调度算法
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    
    var compare = 0
    
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}

动态资源分配

动态资源分配 允许Spark应用程序根据工作负载动态添加和移除Executor。

动态分配原理图

graph TD
    A[应用程序启动] --> B[初始Executor数量]
    B --> C[监控任务队列]
    C --> D{资源需求分析}
    
    D -->|任务积压| E[请求新Executor]
    D -->|Executor空闲| F[移除空闲Executor]
    D -->|负载适中| G[保持当前状态]
    
    E --> H[集群管理器分配资源]
    H --> I[启动新Executor]
    I --> C
    
    F --> J[优雅关闭Executor]
    J --> K[清理缓存数据]
    K --> C
    
    G --> C
    
    style E fill:#e8f5e8
    style F fill:#ffebee
    style C fill:#e1f5fe

动态分配配置

// 动态资源分配配置
spark.dynamicAllocation.enabled=true

// 初始Executor数量
spark.dynamicAllocation.initialExecutors=2
spark.dynamicAllocation.minExecutors=1  
spark.dynamicAllocation.maxExecutors=20

// 扩容策略
spark.dynamicAllocation.schedulerBacklogTimeout=1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5s
spark.dynamicAllocation.executorAllocationRatio=1

// 缩容策略
spark.dynamicAllocation.executorIdleTimeout=60s
spark.dynamicAllocation.cachedExecutorIdleTimeout=infinity

// 开启External Shuffle Service
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337

RPC通信机制

Spark采用基于Netty的RPC框架进行组件间通信,支持异步消息传递和背压控制。

RPC通信架构

graph TD
    subgraph "Driver节点"
        A[SparkContext]
        B[RpcEnv]
        C[NettyRpcEnv]
        D[TransportContext]
    end

    
    
    subgraph "Executor节点"
        E[CoarseGrainedExecutorBackend]
        F[RpcEnv]
        G[NettyRpcEnv]
        H[TransportContext]
    end
    
    A --> B
    B --> C
    C --> D
    D -.->|网络通信| H
    H --> G
    G --> F
    F --> E
    
    style A fill:#e1f5fe
    style E fill:#e8f5e8
    style D fill:#fff3e0
    style H fill:#fff3e0

RPC消息类型

消息类型方向用途示例
LaunchTaskDriver→Executor任务启动分发计算任务
StatusUpdateExecutor→Driver状态更新任务完成/失败
KillTaskDriver→Executor任务终止取消运行中任务
RegisterExecutorExecutor→Driver注册Executor启动注册
RemoveExecutorDriver→Executor移除动态资源缩容
// RPC消息定义
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable

// Driver发送给Executor的消息
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean, reason: String)
  extends CoarseGrainedClusterMessage
case object StopExecutor extends CoarseGrainedClusterMessage

// Executor发送给Driver的消息  
case class RegisterExecutor(
    executorId: String,
    executorRef: RpcEndpointRef,
    hostname: String,
    cores: Int,
    logUrls: Map[String, String])
  extends CoarseGrainedClusterMessage

case class StatusUpdate(
    executorId: String, 
    taskId: Long, 
    state: TaskState, 
    data: SerializableBuffer)
  extends CoarseGrainedClusterMessage

// RPC通信实现
class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv,
    resourcesFileOpt: Option[String],
    resourceProfile: ResourceProfile)
  extends IsolatedRpcEndpoint with ExecutorBackend with Logging {

  private var driver: Option[RpcEndpointRef] = None
  private var executor: Executor = null
  
  override def onStart(): Unit = {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // 向Driver注册Executor
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      case Success(msg) =>
        // 注册成功,创建Executor
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
    }(ThreadUtils.sameThread)
  }
  
  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc)
      }
      
    case KillTask(taskId, _, interruptThread, reason) =>
      if (executor == null) {
        logError("Received KillTask command but executor was null")
      } else {
        executor.killTask(taskId, interruptThread, reason)
      }
  }
  
  override def statusUpdate(taskId: Long, state: TaskState, data: SerializableBuffer): Unit = {
    val resources = executor.runningTasks.get(taskId).map(_.taskDescription.resources)
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }
}

序列化与网络传输

Spark支持多种序列化方式,序列化性能直接影响网络传输和存储效率。

序列化器对比

序列化器速度压缩率CPU占用适用场景
Java默认配置
Kryo生产环境推荐
Avro中等很好中等跨语言兼容
// 序列化配置
// 使用Kryo序列化器
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=MyKryoRegistrator
spark.kryo.unsafe=true

// 自定义Kryo注册器
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyClass])
    kryo.register(classOf[AnotherClass])
  }
}

// 序列化实现  
class KryoSerializer(conf: SparkConf) extends Serializer with Logging with Serializable {
  
  private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
  private val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m") 
  private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
  private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
  private val userRegistrators = conf.get("spark.kryo.registrator", "")
    .split(',').filter(_.nonEmpty)
  
  def newKryo(): Kryo = {
    val instantiator = new DefaultInstantiatorStrategy(new StdInstantiatorStrategy())
    val kryo = new Kryo()
    kryo.setInstantiatorStrategy(instantiator)
    kryo.setRegistrationRequired(registrationRequired)
    kryo.setReferences(referenceTracking)
    
    // 注册常用类
    kryo.register(classOf[Array[Byte]])
    kryo.register(None.getClass)
    kryo.register(Nil.getClass)
    
    // 用户自定义注册器
    for (registrator <- userRegistrators) {
      val reg = Utils.classForName(registrator).getConstructor().newInstance()
        .asInstanceOf[KryoRegistrator]
      reg.registerClasses(kryo)
    }
    
    kryo
  }
  
  override def newInstance(): SerializerInstance = {
    new KryoSerializerInstance(this, bufferSizeKb, maxBufferSizeMb)
  }
}

Spark SQL与Catalyst

Spark SQL概述

Spark SQL 是Spark用于处理结构化数据的模块,提供了DataFrame和Dataset API。

主要特性

  • 统一数据访问:支持多种数据源
  • Hive兼容性:完全兼容Hive SQL
  • 优化执行:Catalyst优化器
  • 代码生成:运行时代码生成

Catalyst优化器

Catalyst 是Spark SQL的查询优化引擎,基于Scala的函数式编程和模式匹配构建,是Spark SQL高性能的核心保障。

Catalyst架构原理

Catalyst核心组件架构

组件功能实现技术关键特性输入输出
ParserSQL解析ANTLR4语法解析器语法检查、AST生成SQL字符串未解析逻辑计划
Analyzer语义分析规则引擎类型检查、字段解析未解析逻辑计划已解析逻辑计划
Optimizer逻辑优化规则+成本优化谓词下推、Join重排序已解析逻辑计划优化逻辑计划
Planner物理计划策略模式算法选择、计划生成优化逻辑计划物理计划
CodeGen代码生成Scala代码生成全阶段代码生成物理计划可执行代码

优化流程详解

完整优化流程图

graph TD
    A[SQL/DataFrame API] --> B[SQL Parser]
    B --> C[未解析逻辑计划<br/>Unresolved Logical Plan]
    C --> D[Analyzer分析器]
    D --> E[已解析逻辑计划<br/>Resolved Logical Plan]
    E --> F[Catalyst优化器]
    F --> G[优化逻辑计划<br/>Optimized Logical Plan]
    G --> H[SparkPlanner规划器]
    H --> I[物理计划候选<br/>Physical Plan Candidates]
    I --> J[成本评估<br/>Cost-based Optimizer]
    J --> K[最优物理计划<br/>Selected Physical Plan]
    K --> L[代码生成<br/>Code Generation]
    L --> M[可执行代码<br/>Generated Code]
    M --> N[RDD执行]
    
    %% 详细子流程
    F --> F1[规则优化器<br/>RuleExecutor]
    F1 --> F2[谓词下推<br/>Predicate Pushdown]
    F1 --> F3[投影剪枝<br/>Column Pruning]
    F1 --> F4[常量折叠<br/>Constant Folding]
    F1 --> F5[Join重排序<br/>Join Reordering]
    
    H --> H1[Join策略选择]
    H --> H2[聚合策略选择]
    H --> H3[排序策略选择]
    
    style A fill:#f9f9f9
    style F fill:#e1f5fe
    style J fill:#fff3e0
    style L fill:#e8f5e8
    style N fill:#c8e6c9

各阶段详细说明

  1. 解析阶段(Parsing)

    // SQL解析示例
    val sqlText = "SELECT name, age FROM users WHERE age > 25"
    val parser = new SparkSqlParser()
    val logicalPlan = parser.parsePlan(sqlText)
    
  2. 分析阶段(Analysis)

    // 语义分析过程
    class Analyzer(catalog: SessionCatalog) extends RuleExecutor[LogicalPlan] {
      val batches = Batch("Resolution", fixedPoint,
        ResolveReferences ::
        ResolveRelations ::
        ResolveFunctions ::
        ResolveAliases ::
        Nil) :: Nil
    }
    
  3. 优化阶段(Optimization)

    • 规则优化:基于预定义规则进行转换
    • 成本优化:基于统计信息选择最优策略
  4. 规划阶段(Planning)

    • 将逻辑计划转换为可执行的物理计划
    • 选择具体的执行算法和策略

核心优化规则

Catalyst主要优化规则分类表

优化类别规则名称优化原理性能提升应用场景示例
谓词优化Predicate Pushdown将过滤条件下推至数据源减少数据传输有WHERE条件的查询下推到Parquet文件
谓词优化Predicate Pullup将过滤条件上提合并减少重复计算复杂子查询Union后的过滤合并
投影优化Column Pruning剪枝不需要的列减少IO和内存宽表查询只读取需要的列
投影优化Projection Pushdown投影操作下推减少数据量嵌套查询向子查询推送列选择
常量优化Constant Folding编译时计算常量表达式减少运行时计算包含常量运算1+23
常量优化Constant Propagation传播常量值简化表达式有等值条件col=5 AND col>3col=5
Join优化Join Reordering重排序Join顺序减少中间数据多表Join基于表大小和选择性
Join优化Broadcast Join广播小表避免Shuffle大小表Join自动广播小于10MB的表
子查询优化Subquery Elimination消除不必要子查询简化执行计划相关子查询将子查询转换为Join
空值优化Null Propagation传播NULL值提前短路包含NULL的表达式NULL AND exprNULL

深入优化规则实现

1. 谓词下推实现原理

// 谓词下推规则实现
object PushDownPredicates extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case filter @ Filter(condition, child) =>
      pushDownPredicate(condition, child)
  }
  
  private def pushDownPredicate(condition: Expression, plan: LogicalPlan): LogicalPlan = {
    plan match {
      case Project(projectList, grandChild) =>
        // 将过滤条件下推到投影操作之下
        val newCondition = replaceAlias(condition, aliasMap(projectList))
        Project(projectList, Filter(newCondition, grandChild))
        
      case Join(left, right, joinType, joinCondition) =>
        // 将过滤条件分解并分别推送到Join的左右子树
        val (leftFilters, rightFilters, remainingFilters) = 
          splitConjunctivePredicates(condition, left, right)
        
        val newLeft = if (leftFilters.nonEmpty) {
          Filter(leftFilters.reduce(And), left)
        } else left
        
        val newRight = if (rightFilters.nonEmpty) {
          Filter(rightFilters.reduce(And), right)
        } else right
        
        val newJoin = Join(newLeft, newRight, joinType, joinCondition)
        
        if (remainingFilters.nonEmpty) {
          Filter(remainingFilters.reduce(And), newJoin)
        } else newJoin
        
      case _ => Filter(condition, plan)
    }
  }
}

2. Join优化策略

// Join策略选择
object JoinStrategySelection extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
      // 1. 检查是否可以使用广播Join
      if (canBroadcast(left)) {
        BroadcastHashJoinExec(leftKeys, rightKeys, joinType, 
          BuildLeft, condition, planLater(left), planLater(right)) :: Nil
      } else if (canBroadcast(right)) {
        BroadcastHashJoinExec(leftKeys, rightKeys, joinType,
          BuildRight, condition, planLater(left), planLater(right)) :: Nil
      }
      // 2. 使用SortMergeJoin
      else {
        SortMergeJoinExec(leftKeys, rightKeys, joinType,
          condition, planLater(left), planLater(right)) :: Nil
      }
    case _ => Nil
  }
  
  private def canBroadcast(plan: LogicalPlan): Boolean = {
    plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
  }
}

成本优化器(CBO)

成本优化器工作原理

组件功能成本因子算法应用场景
统计收集收集表和列统计信息行数、数据大小、基数采样统计所有表
基数估算估算中间结果大小选择性、相关性直方图分析过滤和Join
成本计算计算执行计划成本CPU、IO、网络成本模型计划选择
计划选择选择最优执行计划总成本最小动态规划多表Join

CBO配置与使用

// 启用成本优化器
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.cbo.planStats.enabled", "true")

// 收集表统计信息
sql("ANALYZE TABLE users COMPUTE STATISTICS")
sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS user_id, order_date, amount")

// 查看统计信息
sql("DESCRIBE EXTENDED users").show()

代码生成引擎

Whole-Stage Code Generation原理

graph TD
    A[物理计划树] --> B[代码生成遍历]
    B --> C[表达式代码生成]
    B --> D[算子代码生成]
    C --> E[组合生成单一函数]
    D --> E
    E --> F[Java代码编译]
    F --> G[字节码加载]
    G --> H[高效执行]
    
    style A fill:#f9f9f9
    style E fill:#e1f5fe
    style F fill:#fff3e0
    style H fill:#e8f5e8

代码生成示例

// 原始查询
df.select($"id", $"name", $"age" + 1).filter($"age" > 25)

// 生成的代码(简化版)
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends UnsafeProjection {
/* 006 */   private UnsafeRow result;
/* 007 */   private BufferHolder holder;
/* 008 */   private UnsafeRowWriter writer;
/* 009 */
/* 010 */   public SpecificUnsafeProjection(Object[] references) {
/* 011 */     result = new UnsafeRow(3);
/* 012 */     holder = new BufferHolder(result, 32);
/* 013 */     writer = new UnsafeRowWriter(holder, 3);
/* 014 */   }
/* 015 */
/* 016 */   public UnsafeRow apply(InternalRow i) {
/* 017 */     boolean isNull_0 = i.isNullAt(2);
/* 018 */     int value_0 = isNull_0 ? -1 : (i.getInt(2));
/* 019 */     if (!(!(isNull_0) && value_0 > 25)) return null; // filter condition
/* 020 */
/* 021 */     holder.reset();
/* 022 */     writer.zeroOutNullBytes();
/* 023 */     
/* 024 */     // id
/* 025 */     if (i.isNullAt(0)) {
/* 026 */       writer.setNullAt(0);
/* 027 */     } else {
/* 028 */       writer.write(0, i.getLong(0));
/* 029 */     }
/* 030 */     
/* 031 */     // name  
/* 032 */     if (i.isNullAt(1)) {
/* 033 */       writer.setNullAt(1);
/* 034 */     } else {
/* 035 */       writer.write(1, i.getUTF8String(1));
/* 036 */     }
/* 037 */     
/* 038 */     // age + 1
/* 039 */     if (isNull_0) {
/* 040 */       writer.setNullAt(2);
/* 041 */     } else {
/* 042 */       writer.write(2, value_0 + 1);
/* 043 */     }
/* 044 */     
/* 045 */     result.pointTo(holder.buffer, holder.totalSize());
/* 046 */     return result;
/* 047 */   }
/* 048 */ }

代码生成优势对比

执行方式虚拟函数调用条件分支数据访问内存管理性能提升
解释执行多层虚拟调用大量if-else通过接口访问大量临时对象基线
代码生成直接调用编译优化直接内存访问复用对象2-10倍

自适应查询执行(AQE)

AQE关键特性

特性优化目标工作原理触发条件性能收益
动态分区合并减少小文件运行时合并小分区分区大小 < 阈值提高并发度
动态Join策略选择最优Join运行时切换Join类型表大小 < 广播阈值避免Shuffle
动态倾斜处理解决数据倾斜拆分大分区分区大小差异大提升整体速度
// AQE配置
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

优化器扩展与定制

自定义优化规则

// 自定义优化规则示例
case class RemoveRedundantFilters() extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(condition1, Filter(condition2, child)) =>
      // 合并两个相邻的Filter
      Filter(And(condition1, condition2), child)
      
    case Filter(Literal(true, BooleanType), child) =>
      // 移除恒为true的过滤条件
      child
      
    case Filter(Literal(false, BooleanType), child) =>
      // 将恒为false的过滤转换为空结果
      LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming)
  }
}

// 注册自定义规则
spark.experimental.extraOptimizations = Seq(RemoveRedundantFilters())

Catalyst优化器的核心优势

  1. 可扩展性:基于规则的设计易于扩展
  2. 类型安全:Scala类型系统保证正确性
  3. 高性能:代码生成技术带来显著性能提升
  4. 智能化:成本优化器自动选择最优策略
  5. 统一性:支持SQL、DataFrame、Dataset的统一优化

通过深入理解Catalyst优化器的工作原理,可以编写更高效的Spark SQL应用,并在必要时通过自定义规则进一步提升性能。

SparkSQL 实用函数与语法

日期与时间处理

-- 获取当前时间戳
SELECT current_timestamp() as current_time,
       unix_timestamp() as unix_timestamp;

-- 时间戳转换
SELECT from_unixtime(1640995200) as datetime,
       unix_timestamp('2022-01-01 00:00:00') as timestamp;
      --  date_format(to_date(create_time), 'yyyyMMdd')  // 默认标准格式的时间戳

-- 日期加减操作
SELECT date_add('2022-01-01', 7) as next_week,
       date_sub('2022-01-01', 7) as last_week,
       add_months('2022-01-01', 1) as next_month;

-- 日期差计算
SELECT datediff('2022-01-15', '2022-01-01') as days_diff,
       months_between('2022-03-01', '2022-01-01') as months_diff;

-- 日期格式化
SELECT date_format('2022-01-01', 'yyyy-MM-dd') as formatted_date,
       to_date('2022-01-01') as date_type;

-- 注意传入的是字符串,而不是int类型,会报错
SELECT date_format(to_date('20250506', 'yyyyMMdd'), 'yyyy-MM-dd') AS fmt

字符串处理

-- 字符串连接
SELECT concat('Hello', ' ', 'World') as greeting,
       concat_ws('-', '2022', '01', '01') as date_str;

concat的内容有null的话返回的是nullconcat_ws会跳过null值
scala> spark.sql("select concat('dd','fd',12,null),concat_ws('-','ddd',12,null,44)").show
+------------------------+-------------------------------+
|concat(dd, fd, 12, NULL)|concat_ws(-, ddd, 12, NULL, 44)|
+------------------------+-------------------------------+
|                    null|                      ddd-12-44|
+------------------------+-------------------------------+

-- 字符串截取
注意第一位索引是1,而不是0   
SELECT substring('Hello World', 1, 5) as substring,
       substr('Hello World', 7) as substr_from_7;

-- 字符串替换
SELECT replace('Hello World', 'World', 'Spark') as replaced,
       regexp_replace('Hello123World', '\\d+', '') as no_digits;

-- 正则表达式提取
SELECT regexp_extract('Hello123World456', '(\\d+)', 1) as first_number,
       regexp_extract('[email protected]', '([^@]+)@([^@]+)', 1) as username;

-- 字符串分割
SELECT split('a,b,c,d', ',') as split_array,
       size(split('a,b,c,d', ',')) as array_size;

scala> spark.sql("select split('1.32.252.0','\\\\.')[0]").show
+----------------------------+
|split(1.32.252.0, \., -1)[0]|
+----------------------------+
|                           1|
+----------------------------+


16进制和十进制互转
mac加一位,减一位

scala> spark.sql("select lower(conv(cast(conv('1c1a1b64e2f9', 16, 10) as long)+1,10,16)) mac1,lower(conv(cast(conv('1c1a1b64e2f9', 16, 10) as long)-1,10,16)) mac2 ").show
+------------+------------+
|        mac1|        mac2|
+------------+------------+
|1c1a1b64e2fa|1c1a1b64e2f8|
+------------+------------+

数组与集合操作

-- 数组创建,索引从0开始,substr函数索引从1开始 array[0] ,方括号
SELECT array(1, 2, 3) as simple_array,
       array_contains(array(1, 2, 3), 2) as contains_2;

-- spark 字符串生成数组,分割字符串有转义符号的要多转义一次,有4个反斜杠,在ck中不需要
woi.filter("ys_duplicate_flag=1").withColumn("geohash7",expr("split(detail,'\\\\x01')")).select("detail","geohash7").show(false)

-- 数组展开(行转列)
SELECT explode(array(1, 2, 3)) as exploded_value;


-- 数组聚合(列转行)
SELECT collect_list(column_name) as list_agg,
       collect_set(column_name) as set_agg;

-- 数组交集  返回1或者0
SELECT arrays_overlap(array(1, 2, 3), array(2, 3, 4)) as has_overlap;

-- 复杂数组操作
SELECT 
    id,
    explode(split(tags, ',')) as tag
FROM user_tags;

-- collect字段转数组
val ip_arr=ip_all.select("ip_range").as[String].collect

-- 数组创建,打平,去重  flatten入参是二维数组
scala> spark.sql("select array_distinct(flatten(array(array(1,2,4),array(2,3,4))))").show
+--------------------------------------------------------------+
|array_distinct(flatten(array(array(1, 2, 4), array(2, 3, 4))))|
+--------------------------------------------------------------+
|                                                  [1, 2, 4, 3]|
+--------------------------------------------------------------+

--- 数组合并  array_union  会去重
scala> spark.sql("select array_union(array(1,2,4),array(2,3,4)) as arr").show
+------------+
|         arr|
+------------+
|[1, 2, 4, 3]|
+------------+

--- 数组交集 array_intersect
scala> spark.sql("select array_intersect(array(1, 2, 3), array(2, 3, 4))").show
+-----------------------------------------------+
|array_intersect(array(1, 2, 3), array(2, 3, 4))|
+-----------------------------------------------+
|                                         [2, 3]|
+-----------------------------------------------+

select array_distinct(filter(array_union(array(dim_geohash),transform(split(geo_list,','),x->split(x,'#')[1])),x->length(x)=12))

-- 数组去重、过滤、转换
select filter(array_distinct(transform(ssids,x->substr(split(x,'#')[1],1,6))),x->length(x)>0)

-- 数组排序、反转、切分
scala> spark.sql("Select slice(reverse(array_sort(array(4,3,7,10,6,2,1,8))),1,5) data").show(false)
+----------------+
|data            |
+----------------+
|[10, 8, 7, 6, 4]|
+----------------+


-- 数组包含
-- 如果 array 里有 NULL,而你查的元素不是 NULL,则 NULL 不会影响结果。
-- 如果 element 是 NULL,那么结果就是:只要 array 里有 NULL,就返回 true;否则返回 false。
SELECT array_contains(array('a', 'b', 'c'), 'd');
-- false
SELECT array_contains(array(1, NULL, 3), NULL);
-- true

JSON处理

-- JSON解析
SELECT get_json_object('{"name": "John", "age": 30}', '$.name') as name,
       get_json_object('{"name": "John", "age": 30}', '$.age') as age;

-- JSON数组处理
SELECT json_array_length('[1, 2, 3, 4]') as array_length;

-- 转换为JSON
SELECT to_json(struct('John' as name, 30 as age)) as json_string;

-- 复杂JSON操作
SELECT 
    user_id,
    get_json_object(profile, '$.email') as email,
    get_json_object(profile, '$.address.city') as city
FROM user_profiles;

条件与判断

-- CASE WHEN语句
SELECT 
    name,
    age,
    CASE 
        WHEN age < 18 THEN '未成年'
        WHEN age < 30 THEN '青年'
        WHEN age < 50 THEN '中年'
        ELSE '老年'
    END as age_group
FROM users;

-- IF函数
SELECT 
    name,
    IF(age >= 18, '成年', '未成年') as adult_status,
    IFNULL(email, '无邮箱') as email_info
FROM users;

-- COALESCE函数
SELECT 
    user_id,
    COALESCE(nickname, real_name, 'Unknown') as display_name
FROM user_info;

唯一ID生成方法

在Spark SQL中,生成唯一ID是常见需求,特别是在数据去重、主键生成、数据标识等场景。以下介绍几种常用的唯一ID生成方法及其特点对比。

-- 1. 单调递增ID (monotonically_increasing_id)
SELECT 
    monotonically_increasing_id() as row_id,
    name,
    email
FROM users;

-- 2. UUID生成
SELECT 
    uuid() as unique_id,
    name,
    email
FROM users;

-- 3. Hash函数生成
SELECT 
    hash(name, email) as hash_id,
    xxhash64(name, email) as xxhash_id,
    name,
    email
FROM users;

-- 4. MD5哈希
SELECT 
    md5(concat(name, email, cast(unix_timestamp() as string))) as md5_id,
    name,
    email
FROM users;

-- 5. SHA系列哈希
SELECT 
    sha1(concat(name, email)) as sha1_id,
    sha2(concat(name, email), 256) as sha256_id,
    name,
    email
FROM users;

-- 6. 基于时间戳的ID生成
SELECT 
    concat(cast(unix_timestamp() as string), '_', 
           cast(monotonically_increasing_id() as string)) as timestamp_id,
    name,
    email
FROM users;

-- 7. 自定义格式ID生成
SELECT 
    concat('USER_', lpad(cast(row_number() OVER (ORDER BY name) as string), 8, '0')) as custom_id,
    name,
    email
FROM users;

唯一ID生成方法对比

方法存储空间性能唯一性保证重复概率预期重复数据量可读性排序性适用场景
monotonically_increasing_id()8字节(Long)极高全局唯一0%永不重复行标识、排序需求
uuid()36字节(String)极高~10^-3710^28亿分布式环境、外部系统对接
hash()8字节(Long)极高数据相同则相同1/2^64 ≈ 5.4×10^-201844亿数据去重、快速查找
xxhash64()8字节(Long)极高数据相同则相同1/2^64 ≈ 5.4×10^-201844亿高性能哈希、分桶
md5()32字节(String)极高1/2^128 ≈ 2.9×10^-393.4×10^29亿数据完整性校验、去重
sha1()40字节(String)极高1/2^160 ≈ 6.8×10^-491.5×10^39亿安全性要求较高场景
sha2(256)64字节(String)极高1/2^256 ≈ 8.6×10^-781.2×10^68亿高安全性要求
timestamp_id可变取决于时间精度毫秒级:1000万条/秒时间相关业务、调试
custom_id可变依赖设计取决于设计方案取决于设计方案业务友好、人工可读

使用建议

  1. 性能优先场景:使用 monotonically_increasing_id()hash()/xxhash64()
  2. 分布式环境:推荐使用 uuid(),保证跨系统唯一性
  3. 数据去重:使用 hash()md5(),基于数据内容生成
  4. 安全敏感:使用 sha2(256) 或更高级别的哈希算法
  5. 业务可读性:使用自定义格式ID,包含业务前缀和序号
  6. 存储敏感:优先选择数值类型ID(8字节),避免长字符串

注意事项

-- monotonically_increasing_id() 注意点
-- 1. 只保证单调递增,不保证连续
-- 2. 在不同分区间可能有大的跳跃
-- 3. 重新运行查询会生成不同的ID

-- hash函数的碰撞处理
SELECT 
    name, email,
    hash(name, email) as hash_id,
    -- 组合多字段降低碰撞概率
    hash(name, email, cast(created_time as string)) as enhanced_hash
FROM users;

-- UUID的格式化处理
SELECT 
    uuid() as full_uuid,
    replace(uuid(), '-', '') as compact_uuid,
    substring(uuid(), 1, 8) as short_uuid
FROM users;

窗口函数

-- ROW_NUMBER() 行号
SELECT 
    name,
    salary,
    ROW_NUMBER() OVER (ORDER BY salary DESC) as rank
FROM employees;

-- RANK() 排名(相同值相同排名,跳过)
SELECT 
    name,
    salary,
    RANK() OVER (ORDER BY salary DESC) as rank
FROM employees;

-- DENSE_RANK() 密集排名(相同值相同排名,不跳过)
SELECT 
    name,
    salary,
    DENSE_RANK() OVER (ORDER BY salary DESC) as rank
FROM employees;

-- LAG/LEAD 前后值
SELECT 
    date,
    sales,
    LAG(sales, 1) OVER (ORDER BY date) as prev_sales,
    LEAD(sales, 1) OVER (ORDER BY date) as next_sales
FROM daily_sales;

-- 分区窗口函数
SELECT 
    department,
    name,
    salary,
    ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank
FROM employees;

聚合函数

-- 基本聚合
SELECT 
    department,
    COUNT(*) as employee_count,
    AVG(salary) as avg_salary,
    SUM(salary) as total_salary,
    MIN(salary) as min_salary,
    MAX(salary) as max_salary
FROM employees
GROUP BY department;

-- 条件聚合
SELECT 
    department,
    COUNT(CASE WHEN gender = 'M' THEN 1 END) as male_count,
    COUNT(CASE WHEN gender = 'F' THEN 1 END) as female_count,
    AVG(CASE WHEN age > 30 THEN salary END) as senior_avg_salary
FROM employees
GROUP BY department;

-- 分析函数

type27Raw.groupBy("priority","uuid").count.groupBy("priority").agg(
  avg("count").as("avg_num"),
  -- 求分位数
  percentile_approx(col("count"), array(lit(0.25), lit(0.5), lit(0.75)), lit(1000)).as("percentage"),
  -- 偏度 0完全对称; >0 右偏; <0 左偏 ;> 1 或 < -1 明显偏态
  skewness("count").as("skewness"),
  -- 峰度 0 正态分布 >0 尾部更重、峰更尖(极端值更多) <0 尾更轻、峰更平(更平均)
  -- kurtosis 越大,不代表极端值“数量”多,而是“出现极端值的概率分布更重”。
  kurtosis("count").as("kurtosis")
  
  ).sort("priority").show(false)


-- 去重聚合
SELECT 
    department,
    COUNT(DISTINCT employee_id) as unique_employees
FROM employees
GROUP BY department;

UDF/UDAF 注册与使用

-- 解析put2String
import com.glab.function.utils.Put2StringUtil;
import scala.collection.JavaConverters._
val put2string_read = udf((arg1: String,arg2: String) => Put2StringUtil.read(arg1, arg2).asScala.toSeq)
spark.udf.register("put2string_read",put2string_read);

-- 有rowqualifiervaluecf
lbs.withColumn("row",expr("put2string_read(value,'row')[0]")).withColumn("qualifier",expr("put2string_read(value,'qualifier')[0]")).withColumn("value",expr("put2string_read(value,'value')[0]")).show(false)

# ipv4转long
spark.sql("create function ipv4ToLong   as 'com.glab.function.ip.IPV42Long' using jar 'hdfs://gt-ga-xs/tmp/wuxl/udf/ys-bi-udf-hive-function-0.0.0.jar'")

spark.sql("DROP FUNCTION IF EXISTS default.longToIpv4")
spark.sql("create function longToIpv4   as 'com.glab.function.ip.Long2Ipv4' using jar 'hdfs://gt-ga-xs/tmp/wuxl/udf/ys-bi-udf-hive-function-0.0.0.jar'")

--ipv6 补全
def expandIpv6(ip: String): String = {
    import java.net.InetAddress
    if (ip == null) return null
    // 去掉 zone id(例如 "%eth0")
    val noZone = ip.split("%")(0).trim
    if (noZone.isEmpty) return null
    
    try {
      val addr = InetAddress.getByName(noZone)
      val bytes = addr.getAddress
      // 只有 IPv6 才处理(长度应为16)
      if (bytes == null || bytes.length != 16) return null
      
      // 两字节一组,格式化为4位16进制(小写)
      val sb = new StringBuilder(39) // 8*4 + 7 ':' = 39 chars
      for (i <- 0 until 8) {
        if (i > 0) sb.append(':')
        val hi = bytes(2 * i) & 0xff
        val lo = bytes(2 * i + 1) & 0xff
        val v = (hi << 8) | lo
        sb.append(f"$v%04x") // 小写,宽度4,不足补0
      }
      sb.toString
    } catch {
      case _: Throwable => null
    }
}
spark.udf.register("expandIpv6", expandIpv6 _)

-- ip段打平
def ip_range(minip: String, maxip: String) = {
  import scala.collection.immutable
  val minIpArray = minip.split("\\.")
  val maxIpArray = maxip.split("\\.")
  if (minip == maxip) {
    immutable.Seq(minip)
  } else {
    val arr: immutable.Seq[String] = for {
      i <- minIpArray(2).toInt to maxIpArray(2).toInt
    } yield minIpArray(0) + "." + minIpArray(1) + "." + i
    arr
  }
}
spark.udf.register("ip_range_arr", ip_range _)

scala> spark.sql("select ip_range_arr('5.28.32','5.28.33')").show
+------------------------------+
|ip_range_arr(5.28.32, 5.28.33)|
+------------------------------+
|            [5.28.32, 5.28.33]|
+------------------------------+

// 校验日期是否合法
val isValidDateTime = udf((s: String) => {
import org.apache.spark.sql.api.java.UDF1;
import java.text.SimpleDateFormat;
  if (s == null )   false
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  format.setLenient(false) // 严格模式,避免 2025-13-40 这种通过
  try {
    format.parse(s)
    true
  } catch {
    case _: Exception => false
  }
})

// 注册UDF
spark.udf.register("isValidDateTime", isValidDateTime)

<!-- mac前缀变换 -->
val expandMacUdf = udf((mac: String) => {
  if (mac == null || mac.length != 12)  Set.empty[String]
  // MAC 转大写
  val baseMac = mac.toUpperCase
  // 遍历 00 ~ FF 的十六进制组合
  val hexList = (0 until 256).map(i => f"$i%02X")
  val prefixVariants = hexList.map(p => p + baseMac.drop(2))
  val suffixVariants = hexList.map(s => baseMac.dropRight(2) + s)
  // 去重合并
  (prefixVariants ++ suffixVariants).map(_.toLowerCase).toSet
})
spark.udf.register("expandMacUdf",expandMacUdf)

注册与调用要点

  • SQL 注册:spark.udf.register("fn", func) 后即可在 SQL 中 select fn(col) ...
  • DataFrame API:F.udf(func) 后用 select(F.callUDF("fn", col))expr("fn(col)")
  • UDAF(Scala):实现 UserDefinedAggregateFunction 或使用 Aggregator(typed)。
  • PySpark 性能优先:尽量使用内置函数或 pandas_udf 代替普通 UDF。

性能与注意事项

  • 尽量优先使用内置函数(Catalyst 可优化);UDF 一旦进入黑盒,优化受限。
  • PySpark 普通 UDF 有 Python-JVM 边界开销;pandas_udf 借助 Arrow 可以显著提升性能。
  • 指定 UDF 返回类型,避免推断导致的性能/兼容问题。
  • 复杂逻辑可考虑 SQL CASE WHENtransform/filter/aggregate 高阶函数替代。

性能调优与优化

写出排序优化

写出ck的时候要对数据按照不同节点进行分区,然后分区内由按partition分part,同part内按主键排序,可以这样实现 分区:shard+分区字段;排序:分区内按主键字段的顺序排序;写出:按照shard分目录,对应ck节点

val data=dataframe.repartition("shard","part")
.sortWithinPartitions("id_md5","wifimac")

data.write.partitionBy("shard").parquet(oupath)

好处

  1. 这样不需要全局shuffle,只要分区内有序就好
  2. repartition的时候同一个part内的数据已经提前合到一起了
  3. partitionBy只使用shard字段,part内的数据不会乱序,不需要重新排

查询与作业优化

存储格式优化

推荐存储格式对比

格式压缩比查询速度写入速度适用场景
Parquet中等分析查询,列式存储
ORC很高很快Hive集成,高压缩比
Avro中等中等行式存储,Schema演进
JSON开发调试,灵活性高

Parquet格式优化

// 推荐使用Parquet格式
df.write.mode("overwrite").parquet("data.parquet")
val optimizedDF = spark.read.parquet("data.parquet")

// 配置压缩
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

// 设置列式读取批次大小
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "10000")

// 启用向量化读取
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

ORC格式优化

// 使用ORC格式
df.write.format("orc").mode("overwrite").save("data.orc")

// ORC优化配置
spark.conf.set("spark.sql.orc.compression.codec", "snappy")
spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

分区策略优化

时间分区策略

// 按时间分区(推荐)
df.write
  .partitionBy("year", "month", "day")
  .parquet("time_partitioned_data")

// 避免过度分区
val dailyData = df.withColumn("date", to_date($"timestamp"))
dailyData.write
  .partitionBy("date")
  .parquet("daily_partitioned_data")

// 动态分区裁剪
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.useStats", "true")

业务分区策略

// 按业务字段分区
df.write
  .partitionBy("region", "category")
  .parquet("business_partitioned_data")

// 分区数控制
val numPartitions = spark.conf.get("spark.sql.shuffle.partitions", "200").toInt
val optimalPartitions = Math.max(100, Math.min(numPartitions, 1000))

df.repartition(optimalPartitions, $"partition_key")
  .write
  .partitionBy("partition_key")
  .parquet("optimized_data")

分区裁剪优化

// 启用分区裁剪
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// 查询时使用分区过滤
val result = spark.read.parquet("partitioned_data")
  .filter($"year" === 2023 && $"month" >= 6)  // 有效分区裁剪
  .select("id", "name", "value")

谓词下推优化

启用谓词下推

// 启用各种数据源的谓词下推
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.conf.set("spark.sql.json.filterPushdown", "true")

// JDBC谓词下推
spark.conf.set("spark.sql.pushDownPredicate", "true")

优化示例

// 原始查询(未优化)
val df = spark.read.parquet("large_dataset.parquet")
val result = df.select("*").filter($"age" > 18 && $"city" === "Beijing")

// 优化后查询(谓词下推)
val result = spark.read.parquet("large_dataset.parquet")
  .filter($"age" > 18)  // 谓词下推到数据源
  .filter($"city" === "Beijing")  // 谓词下推到数据源
  .select("id", "name", "age")  // 列裁剪

复杂谓词优化

// 组合条件优化
val complexFilter = ($"age".between(18, 65)) && 
                   ($"salary" > 50000) && 
                   ($"department".isin("IT", "Finance"))

val result = spark.read.parquet("employee_data.parquet")
  .filter(complexFilter)  // 复杂谓词会被自动优化和下推
  .select("id", "name", "salary")

Join优化

Join策略选择

Join类型对比

Join类型适用场景优势劣势触发条件
Broadcast Join小表Join大表无Shuffle,性能最好小表必须能放入内存小表 < 10MB
Sort Merge Join大表Join大表内存友好,稳定需要Shuffle默认Join
Shuffle Hash Join中等表Join内存使用适中需要Shuffle中等数据量
Cartesian Join笛卡尔积简单性能极差无Join键

广播Join优化

自动广播配置

// 设置自动广播阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")

// 启用自适应查询执行
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

手动广播优化

// 手动广播小表
val smallTable = spark.table("small_table")
val broadcastDF = broadcast(smallTable)
val result = largeTable.join(broadcastDF, "id")

// 强制广播(即使超过阈值)
val result = largeTable.join(broadcast(mediumTable), "id")

// 不等式广播join
com_ipv6.join(broadcast(ipv6),com_ipv6("public_ip")>=ipv6("minip") and com_ipv6("public_ip")<ipv6("maxip")).show(false)

// 广播变量优化
val lookupMap = smallTable.collect()
  .map(row => row.getAs[String]("key") -> row.getAs[String]("value"))
  .toMap
val broadcastMap = spark.sparkContext.broadcast(lookupMap)

// 广播大变量,然后udf方法使用该变量,进行本地不等式join
val ipv6_arr=ipv6.map(_.getAs[String]("minip")).collect.sorted
val ipv6_arr_broadcast = spark.sparkContext.broadcast(ipv6_arr)
// 注册 UDF
spark.udf.register("get_ipv6_range", (code: String) => {
  findIpv6Index(ipv6_arr_broadcast.value,code)
})

大变量用完之后销毁否则一直占用内存
broadcastMap.unpersist(true)

val enrichedData = largeTable.map { row =>
  val key = row.getAs[String]("key")
  val enrichValue = broadcastMap.value.getOrElse(key, "unknown")
  (row, enrichValue)
}

Sort Merge Join优化

预排序优化

如果join key有默认值不需要参与join,就设置为null,设置成其他值还是会被分发然后参与join的,null不参与

// 预先排序减少Shuffle成本
val sortedTable1 = table1.sort("join_key")
val sortedTable2 = table2.sort("join_key")
val result = sortedTable1.join(sortedTable2, "join_key")

// 使用分桶表
table1.write
  .bucketBy(10, "join_key")
  .sortBy("join_key")
  .saveAsTable("bucketed_table1")

table2.write
  .bucketBy(10, "join_key")
  .sortBy("join_key")
  .saveAsTable("bucketed_table2")

// 分桶表Join(无Shuffle)
val result = spark.table("bucketed_table1")
  .join(spark.table("bucketed_table2"), "join_key")

数据倾斜处理

倾斜检测

// 检测Join键分布
val keyDistribution = largeTable
  .groupBy("join_key")
  .count()
  .orderBy($"count".desc)

keyDistribution.show(20)  // 查看top 20的键分布

// 统计分析
val stats = keyDistribution.agg(
  avg("count").as("avg_count"),
  max("count").as("max_count"),
  min("count").as("min_count"),
  stddev("count").as("stddev_count")
)
stats.show()

//join前先对数据按照相同key repartition 
 spark.read.parquet("/tmp/logs/data/huangg/mac_uuid/20251015").withColumn("geo6_equal",expr("substr(type27_geohash,1,6)=substr(woi_geohash,1,6)")).withColumn("public_ip",expr("case when public_ip like '%:%' then expandIpv6(public_ip) else public_ip end")).withColumn("ipv4_minip",expr("case when public_ip like '%.%' then get_ipv4_range(ipv4ToLong(public_ip)) else -1 end")).withColumn("ipv6_minip",expr("case when public_ip like '%:%' then get_ipv6_range(public_ip) else '' end")).na.fill("").repartition(6000,col("ipv4_minip")).join(ipv4.repartition(6000,col("ipv4_minip")),Seq("ipv4_minip"),"left").repartition(6000,col("ipv6_minip")).join(ipv6.repartition(6000,col("ipv6_minip")),Seq("ipv6_minip"),"left").write.mode("overwrite").parquet("/tmp/logs/data/huangg/essid/ip/20251017")

倾斜解决方案

方案1:加盐处理

// 加盐Join
import scala.util.Random

// 给小表加盐
val saltedSmallTable = smallTable.flatMap { row =>
  (0 until 10).map { salt =>
    Row.fromSeq(row.toSeq :+ salt)
  }
}

// 给大表的倾斜键加随机盐
val saltedLargeTable = largeTable.map { row =>
  val key = row.getAs[String]("join_key")
  val salt = if (isSkewedKey(key)) Random.nextInt(10) else 0
  Row.fromSeq(row.toSeq :+ salt)
}

// 执行Join
val result = saltedLargeTable.join(saltedSmallTable, 
  Seq("join_key", "salt_column"))

方案2:倾斜键单独处理

// 识别倾斜键
val skewedKeys = Set("skewed_key_1", "skewed_key_2")

// 分离倾斜数据和正常数据
val normalData = largeTable.filter(!$"join_key".isin(skewedKeys.toSeq:_*))
val skewedData = largeTable.filter($"join_key".isin(skewedKeys.toSeq:_*))

// 正常数据正常Join
val normalResult = normalData.join(smallTable, "join_key")

// 倾斜数据使用广播Join
val skewedResult = skewedData.join(broadcast(smallTable), "join_key")

// 合并结果
val finalResult = normalResult.union(skewedResult)

方案3:两阶段聚合

// 预聚合阶段
val preAggregated = largeTable
  .withColumn("salt", (rand() * 10).cast("int"))
  .withColumn("salted_key", concat($"join_key", lit("_"), $"salt"))
  .groupBy("salted_key")
  .agg(sum("value").as("partial_sum"))

// 最终聚合阶段
val finalAggregated = preAggregated
  .withColumn("original_key", split($"salted_key", "_").getItem(0))
  .groupBy("original_key")
  .agg(sum("partial_sum").as("final_sum"))

缓存与持久化

存储级别选择

存储级别对比

存储级别内存使用磁盘使用序列化CPU开销适用场景
MEMORY_ONLY小数据集,频繁访问
MEMORY_AND_DISK中等大数据集,内存不足时
MEMORY_ONLY_SER内存紧张,CPU充足
DISK_ONLY中等大数据集,内存紧张
OFF_HEAP堆外中等减少GC压力

缓存策略优化

智能缓存策略

// 根据数据大小选择存储级别
def selectStorageLevel(dataSize: Long, memoryAvailable: Long): StorageLevel = {
  val ratio = dataSize.toDouble / memoryAvailable
  
  if (ratio < 0.3) {
    StorageLevel.MEMORY_ONLY  // 内存充足
  } else if (ratio < 0.8) {
    StorageLevel.MEMORY_ONLY_SER  // 内存紧张,序列化节省空间
  } else {
    StorageLevel.MEMORY_AND_DISK_SER  // 内存不足,溢出到磁盘
  }
}

// 应用缓存策略
val dataSize = rdd.map(_.toString.length).sum()
val storageLevel = selectStorageLevel(dataSize, availableMemory)
rdd.persist(storageLevel)

缓存生命周期管理

// 缓存管理器
class CacheManager {
  private val cachedRDDs = mutable.Map[String, RDD[_]]()
  
  def cache[T](name: String, rdd: RDD[T], level: StorageLevel = StorageLevel.MEMORY_AND_DISK): RDD[T] = {
    // 检查是否已缓存
    if (!cachedRDDs.contains(name)) {
      rdd.persist(level)
      cachedRDDs(name) = rdd
      println(s"Cached RDD: $name")
    }
    rdd
  }
  
  def uncache(name: String): Unit = {
    cachedRDDs.get(name).foreach { rdd =>
      rdd.unpersist()
      cachedRDDs.remove(name)
      println(s"Uncached RDD: $name")
    }
  }
  
  def clearAll(): Unit = {
    cachedRDDs.values.foreach(_.unpersist())
    cachedRDDs.clear()
    println("Cleared all cached RDDs")
  }
}

Checkpoint优化

Checkpoint策略

// 设置checkpoint目录
spark.sparkContext.setCheckpointDir("hdfs://namenode:8020/checkpoint")

// 智能checkpoint
def smartCheckpoint[T](rdd: RDD[T], lineageDepth: Int = 10): RDD[T] = {
  // 计算血缘深度
  def getLineageDepth(rdd: RDD[_]): Int = {
    if (rdd.dependencies.isEmpty) {
      1
    } else {
      1 + rdd.dependencies.map(_.rdd).map(getLineageDepth).max
    }
  }
  
  val currentDepth = getLineageDepth(rdd)
  if (currentDepth > lineageDepth) {
    println(s"Checkpointing RDD with lineage depth: $currentDepth")
    rdd.checkpoint()
  }
  rdd
}

// 使用示例
val deepRDD = rdd1.map(transform1)
  .filter(filter1)
  .join(rdd2)
  .map(transform2)
  .filter(filter2)

val checkpointedRDD = smartCheckpoint(deepRDD)

代码层面优化

算子选择优化

高效算子使用

// 使用reduceByKey替代groupByKey
val wordCounts = words.map(word => (word, 1))
  .reduceByKey(_ + _)  // 推荐:Map端预聚合

// 而不是
val wordCounts = words.map(word => (word, 1))
  .groupByKey()  // 不推荐:所有数据都要Shuffle
  .mapValues(_.sum)

// 使用mapPartitions减少函数调用开销
val result = rdd.mapPartitions { partition =>
  // 在分区级别初始化资源
  val connection = createConnection()
  val buffer = new ArrayBuffer[ProcessedRecord]()
  
  try {
    partition.foreach { record =>
      buffer += processWithConnection(record, connection)
    }
    buffer.iterator
  } finally {
    connection.close()
  }
}

// 使用aggregateByKey进行复杂聚合
val result = rdd.aggregateByKey((0, 0))(
  // 分区内聚合
  (acc, value) => (acc._1 + value, acc._2 + 1),
  // 分区间聚合
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).mapValues { case (sum, count) => sum.toDouble / count }

数据结构优化

选择合适的数据结构

// 使用原始类型数组替代集合
class OptimizedProcessor {
  // 推荐:原始类型数组
  private val intArray = new Array[Int](1000000)
  
  // 不推荐:装箱类型集合
  private val intList = new ArrayBuffer[Integer]()
  
  // 使用专用的数据结构
  def processLargeDataset(data: RDD[String]): RDD[String] = {
    data.mapPartitions { partition =>
      val bloom = new BloomFilter[String](1000000, 0.01)
      val deduped = new mutable.HashSet[String]()
    
      partition.filter { item =>
        if (bloom.mightContain(item)) {
          if (deduped.contains(item)) {
            false  // 重复项
          } else {
            deduped += item
            true
          }
        } else {
          bloom.put(item)
          deduped += item
          true
        }
      }
    }
  }
}

内存使用优化

对象复用

// 对象池模式
class ObjectPool[T](createFunc: () => T, resetFunc: T => Unit) {
  private val pool = new mutable.Queue[T]()
  
  def borrow(): T = {
    pool.synchronized {
      if (pool.nonEmpty) {
        pool.dequeue()
      } else {
        createFunc()
      }
    }
  }
  
  def return(obj: T): Unit = {
    resetFunc(obj)
    pool.synchronized {
      pool.enqueue(obj)
    }
  }
}

// 使用对象池
val stringBuilderPool = new ObjectPool[StringBuilder](
  () => new StringBuilder(),
  _.clear()
)

val result = rdd.mapPartitions { partition =>
  partition.map { item =>
    val sb = stringBuilderPool.borrow()
    try {
      sb.append(item).append("_processed").toString
    } finally {
      stringBuilderPool.return(sb)
    }
  }
}

广播变量和累加器优化

广播变量最佳实践

// 大查找表广播
val lookupTableMap = smallTable.collect()
  .map(row => row.getString(0) -> row.getString(1))
  .toMap

val broadcastLookup = spark.sparkContext.broadcast(lookupTableMap)

val enrichedData = largeRDD.map { record =>
  val enrichValue = broadcastLookup.value.getOrElse(record.key, "unknown")
  EnrichedRecord(record, enrichValue)
}

// 记住及时释放
broadcastLookup.destroy()

累加器优化

// 自定义累加器
class HistogramAccumulator extends AccumulatorV2[Double, mutable.Map[String, Long]] {
  private val histogram = mutable.Map[String, Long]()
  
  override def isZero: Boolean = histogram.isEmpty
  
  override def copy(): HistogramAccumulator = {
    val newAcc = new HistogramAccumulator
    newAcc.histogram ++= this.histogram
    newAcc
  }
  
  override def reset(): Unit = histogram.clear()
  
  override def add(value: Double): Unit = {
    val bucket = getBucket(value)
    histogram(bucket) = histogram.getOrElse(bucket, 0L) + 1
  }
  
  override def merge(other: AccumulatorV2[Double, mutable.Map[String, Long]]): Unit = {
    other match {
      case o: HistogramAccumulator =>
        o.histogram.foreach { case (bucket, count) =>
          histogram(bucket) = histogram.getOrElse(bucket, 0L) + count
        }
    }
  }
  
  override def value: mutable.Map[String, Long] = histogram.toMap
  
  private def getBucket(value: Double): String = {
    if (value < 0) "negative"
    else if (value < 10) "0-10"
    else if (value < 100) "10-100"
    else "100+"
  }
}

// 使用自定义累加器
val histogramAcc = new HistogramAccumulator
spark.sparkContext.register(histogramAcc, "histogram")

rdd.foreach(value => histogramAcc.add(value))
println(s"Histogram: ${histogramAcc.value}")

网络与I/O优化

序列化优化

Kryo序列化配置

// Kryo配置
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrationRequired", "false")
spark.conf.set("spark.kryo.unsafe", "true")

// 注册常用类
val conf = new SparkConf()
conf.registerKryoClasses(Array(
  classOf[MyClass],
  classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
  classOf[org.apache.spark.sql.types.StructType],
  classOf[Array[org.apache.spark.sql.types.StructField]]
))

压缩优化

压缩算法选择

// 根据场景选择压缩算法
spark.conf.set("spark.io.compression.codec", "snappy")  // 平衡压缩比和速度
// spark.conf.set("spark.io.compression.codec", "lz4")     // 更快的压缩/解压
// spark.conf.set("spark.io.compression.codec", "gzip")    // 更高的压缩比

// 启用各种压缩
spark.conf.set("spark.broadcast.compress", "true")
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")
spark.conf.set("spark.rdd.compress", "true")

网络调优

网络参数优化

# 网络超时设置
spark.conf.set("spark.network.timeout", "800s")
spark.conf.set("spark.rpc.askTimeout", "800s")
spark.conf.set("spark.rpc.lookupTimeout", "800s")

# 网络连接优化
spark.conf.set("spark.rpc.netty.dispatcher.numThreads", "8")
spark.conf.set("spark.shuffle.io.numConnectionsPerPeer", "3")
spark.conf.set("spark.shuffle.io.preferDirectBufs", "true")

# 传输优化
spark.conf.set("spark.reducer.maxSizeInFlight", "96m")
spark.conf.set("spark.reducer.maxReqsInFlight", "Int.MaxValue")

常见性能问题

内存溢出问题

OOM问题诊断

// 1. Driver OOM
// 原因:collect()操作数据量过大
val result = largeRDD.collect()  // 危险操作

// 解决方案:使用take()或分批处理
val sample = largeRDD.take(1000)
largeRDD.foreachPartition { partition =>
  // 分区内处理,避免将所有数据拉到Driver
}

// 2. Executor OOM  
// 原因:单个分区数据过大
// 解决方案:增加分区数
val repartitionedRDD = rdd.repartition(numPartitions * 2)

数据倾斜解决方案

倾斜检测和解决

// 检测倾斜
def detectSkew[T](rdd: RDD[T]): Unit = {
  val partitionSizes = rdd.mapPartitionsWithIndex { (index, iter) =>
    Iterator((index, iter.size))
  }.collect()
  
  val maxSize = partitionSizes.maxBy(_._2)
  val avgSize = partitionSizes.map(_._2).sum / partitionSizes.length
  
  if (maxSize._2 > avgSize * 3) {
    println(s"数据倾斜警告:分区${maxSize._1}${maxSize._2}条记录,平均${avgSize}条")
  }
}

监控与诊断

Spark UI监控

关键监控指标

  • Jobs页面:作业执行时间、Stage信息
  • Stages页面:Stage执行详情、任务分布
  • Storage页面:RDD缓存使用情况
  • Executors页面:Executor资源使用情况
  • SQL页面:SQL查询执行计划

性能指标

核心性能指标

# 查看应用程序指标
curl http://driver-host:4040/api/v1/applications
curl http://driver-host:4040/api/v1/applications/[app-id]/jobs
curl http://driver-host:4040/api/v1/applications/[app-id]/stages
curl http://driver-host:4040/api/v1/applications/[app-id]/executors

Spark Web UI - SQL 页面解读

graph LR
  A["SQL 页面"] --> B["SQL 查询列表"]
  A --> C["图表区域 Graph"]
  A --> D["详情区域 Details"]
  
  B --> B1["Query ID / Duration / Submission Time"]
  B --> B2["Job/Stage 关联"]
  
  C --> C1["DAG 图视图"]
  C --> C2["节点颜色/尺寸/边箭头"]
  
  D --> D1["SQL 文本"]
  D --> D2["Parsed/Analyzed/Optimized"]
  D --> D3["Physical Plan"]
  • SQL 查询列表:每一条记录对应一次 SQL 执行(包括 DataFrame/Dataset API 被翻译的查询)。
    • Query ID:查询唯一标识;点击进入详情。
    • Duration:总时长;过长需排查倾斜、Shuffle 或 Scan 代价。
    • Jobs/Stages:该查询触发的 Job/Stage 数量与链接。
  • 图表区域 Graph(DAG):展示物理执行 DAG。
    • 节点:代表物理算子(如 BroadcastHashJoin、ShuffledHashJoin、SortMergeJoin、WholeStageCodegen、Project、Filter、Scan)。
    • 颜色/大小:通常暗示阶段边界、Shuffle、广播等;边箭头表示依赖方向。
    • 提示:节点过多或多次 Shuffle,通常意味着可优化(如 Broadcast Join、过滤下推、列裁剪)。
  • 详情区域 Details:分层展示查询从 SQL 文本到物理计划的演进:
    • Parsed:语法解析树。
    • Analyzed:解析并完成解析属性、类型推导后的逻辑计划。
    • Optimized:Catalyst 优化(谓词/投影下推、常量折叠、合并等)后的逻辑计划。
    • Physical Plan:可执行的物理算子树(见下节)。

Details 中 Physical Plan 详解

graph TD
  A["Physical Plan"] --> B["Scan(扫描)"]
  A --> C["Project / Filter(列裁剪/筛选)"]
  A --> D["Aggregate(聚合)"]
  A --> E["Join(连接)"]
  A --> F["Sort / Exchange(排序/Shuffle)"]
  A --> G["WholeStageCodegen(代码生成)"]

常见算子说明(按链路自下而上)

  • Scan:数据源读取。关键信息:PushedFilters(谓词下推)、Batch/Vectorized(矢量化读取)、Input Size/Rows(读取量)。
    • 优化:开启列裁剪、谓词下推;使用列式存储(Parquet/ORC);尽量减少 Input Size
  • Project/Filter:列裁剪与条件过滤,若能在 Scan 阶段下推更优。
  • Aggregate:分为 HashAggregate/SortAggregate;前者内存友好且常见,后者需要排序。
    • 优化:partial -> shuffle -> final 两阶段聚合是否生效;必要时调大 spark.sql.shuffle.partitions
  • Join:BroadcastHashJoin / SortMergeJoin / ShuffledHashJoin / BroadcastNestedLoopJoin 等。
    • 选择逻辑:
      • 小表 + 大表:优先 BroadcastHashJoin(受 spark.sql.autoBroadcastJoinThreshold 约束)。
      • 大表 + 大表:常见 SortMergeJoin(需要两侧都可排序的等值连接)。
      • 非等值/复杂条件:可能退化为 NestedLoopJoin。
    • 优化:广播小表、重分区、提前过滤、选择合适的 join key 分布、开启 AQE(自适应执行)。
  • Sort / Exchange:排序与 Shuffle 边界(Exchange)。
    • 代价高的环节,观察 shuffle read/write sizeskew(倾斜告警)。
  • WholeStageCodegen:算子融合,减少虚函数开销;节点越大代表融合范围越广。
    • 观察 codegenStageId;若关闭可设置 spark.sql.codegen.wholeStage=false(仅用于诊断)。

如何解读 Physical Plan 关键信息

  • 行为统计:(x rows, y KB) 结合 Stage Metrics 判断热点与瓶颈。
  • 下推/裁剪:检查 PushedFiltersInput/Output Columns 是否最小化。
  • Shuffle 边界:Exchange hashpartitioning(...) 处是重分区;关注分区数与倾斜。
  • Join 选择:核对 BroadcastHashJoin 是否命中,或 SortMergeJoin 是否多余的 sort。
  • 代码生成:确认存在 WholeStageCodegen,并留意是否被 PythonUDF 打断(可替换为内置函数或 pandas_udf)。

常见定位与优化建议

  • SQL 页面 Duration 过长:先看 Graph 的 Shuffle/Join 节点;再看 Details 的 Physical Plan 并结合 Executors 页的 Task 时间分布。
  • 扫描过大:启用分区裁剪、列裁剪、谓词下推;检查表统计信息(ANALYZE TABLE)。
  • 倾斜:尝试 saltingskew join(AQE 自动倾斜优化)、过滤热点 key。
  • Join 不合适:提升广播阈值或强制 /*+ BROADCAST(t) */;减少无用的 Sort

常见错误解决方案

内存相关错误

OutOfMemoryError: Java heap space

错误现象

java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
    at java.lang.StringBuilder.append(StringBuilder.java:136)

原因分析

  • 堆内存不足:Executor或Driver的堆内存设置过小
  • 数据倾斜:某些分区数据量过大,导致单个Task内存溢出
  • 缓存过多:RDD缓存占用过多内存
  • 对象创建过多:频繁创建大对象

解决方案

1. 调整内存配置

# 增加Executor内存
spark-submit \
  --conf spark.executor.memory=8g \
  --conf spark.executor.memoryOverhead=2g \
  --conf spark.driver.memory=4g \
  --conf spark.driver.memoryOverhead=1g \
  your-app.jar

# 调整内存比例
spark-submit \
  --conf spark.memory.fraction=0.8 \
  --conf spark.memory.storageFraction=0.3 \
  your-app.jar

2. 处理数据倾斜

// 方法1:增加分区数
val skewedRDD = originalRDD.repartition(200)

// 方法2:自定义分区策略
val customPartitioner = new Partitioner {
  override def numPartitions: Int = 200
  override def getPartition(key: Any): Int = {
    // 自定义分区逻辑,避免数据倾斜
    val hash = key.hashCode()
    Math.abs(hash) % numPartitions
  }
}
val skewedRDD = originalRDD.partitionBy(customPartitioner)

// 方法3:两阶段聚合
val stage1RDD = originalRDD
  .map(x => (x._1 + "_" + Random.nextInt(10), x._2))  // 添加随机前缀
  .reduceByKey(_ + _)
  .map(x => (x._1.split("_")(0), x._2))  // 去掉随机前缀
  .reduceByKey(_ + _)

3. 优化缓存策略

// 使用MEMORY_AND_DISK_SER减少内存占用
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

// 及时释放不需要的缓存
rdd.unpersist()

// 使用checkpoint减少内存压力
rdd.checkpoint()

4. 代码优化

// 使用mapPartitions减少对象创建
val optimizedRDD = rdd.mapPartitions(iter => {
  val result = new ArrayBuffer[String]()
  while (iter.hasNext) {
    val item = iter.next()
    // 处理逻辑
    result += processedItem
  }
  result.iterator
})

// 复用对象
case class User(id: Long, name: String)
val userRDD = rdd.mapPartitions(iter => {
  val user = User(0L, "")  // 复用对象
  iter.map { case (id, name) =>
    user.id = id
    user.name = name
    user.copy()  // 返回副本
  }
})

OutOfMemoryError: Direct buffer memory

错误现象

java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:694)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)

原因分析

  • 堆外内存不足:DirectBuffer内存设置过小
  • 网络传输过多:大量数据通过网络传输
  • 序列化问题:序列化过程中占用过多堆外内存

解决方案

1. 增加堆外内存

spark-submit \
  --conf spark.executor.memoryOverhead=4g \
  --conf spark.driver.memoryOverhead=2g \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=4g \
  your-app.jar

2. 优化网络传输

// 启用压缩
spark.conf.set("spark.io.compression.codec", "snappy")
spark.conf.set("spark.broadcast.compress", "true")
spark.conf.set("spark.shuffle.compress", "true")

// 调整网络缓冲区
spark.conf.set("spark.network.timeout", "800s")
spark.conf.set("spark.executor.heartbeatInterval", "60s")

3. 优化序列化

// 使用Kryo序列化
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrationRequired", "false")

// 注册自定义类
val conf = new SparkConf()
conf.registerKryoClasses(Array(classOf[MyClass]))
  1. OutOfMemoryError: Metaspace

错误现象

java.lang.OutOfMemoryError: Metaspace
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)

原因分析

  • 类加载过多:动态生成大量类
  • Metaspace设置过小:JVM Metaspace空间不足
  • UDF使用过多:大量UDF导致类加载

解决方案

1. 调整JVM参数

spark-submit \
  --conf spark.executor.extraJavaOptions="-XX:MaxMetaspaceSize=512m" \
  --conf spark.driver.extraJavaOptions="-XX:MaxMetaspaceSize=512m" \
  your-app.jar

2. 优化UDF使用

// 避免在UDF中创建过多类
val optimizedUDF = udf((value: String) => {
  // 使用简单逻辑,避免动态类生成
  value.toUpperCase
})

// 复用UDF实例
val myUDF = udf((x: Int) => x * 2)
df.select(myUDF(col("value")))

网络相关错误

FetchFailedException

错误现象

org.apache.spark.shuffle.FetchFailedException: Failed to connect to hostname:7337
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:646)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:646)

原因分析

  • 网络超时:网络连接超时
  • Executor丢失:Executor进程异常退出
  • 内存不足:Executor内存不足导致进程退出
  • 网络不稳定:网络连接不稳定

解决方案

1. 调整网络超时参数

spark-submit \
  --conf spark.network.timeout=800s \
  --conf spark.executor.heartbeatInterval=60s \
  --conf spark.rpc.askTimeout=800s \
  --conf spark.rpc.lookupTimeout=800s \
  your-app.jar

2. 增加重试机制

spark-submit \
  --conf spark.task.maxFailures=8 \
  --conf spark.stage.maxAttempts=4 \
  your-app.jar

3. 优化Shuffle配置

spark-submit \
  --conf spark.shuffle.io.maxRetries=3 \
  --conf spark.shuffle.io.retryWait=60s \
  --conf spark.shuffle.file.buffer=32k \
  your-app.jar

4. 监控Executor状态

// 添加监控代码
spark.sparkContext.addSparkListener(new SparkListener {
  override def onExecutorLost(executorLost: SparkListenerExecutorLost): Unit = {
    println(s"Executor lost: ${executorLost.executorId}")
    // 记录日志或发送告警
  }
})

ConnectionTimeoutException

错误现象

java.net.ConnectTimeoutException: Connection timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

原因分析

  • 网络延迟:网络延迟过高
  • 防火墙限制:防火墙阻止连接
  • 端口冲突:端口被占用
  • DNS解析问题:DNS解析失败

解决方案

1. 调整连接超时

spark-submit \
  --conf spark.network.timeout=1200s \
  --conf spark.rpc.askTimeout=1200s \
  your-app.jar

2. 检查网络配置

# 检查网络连通性
ping hostname
telnet hostname port

# 检查防火墙
iptables -L

3. 使用本地化策略

// 启用数据本地化
spark.conf.set("spark.locality.wait", "30s")
spark.conf.set("spark.locality.wait.process", "30s")
spark.conf.set("spark.locality.wait.node", "30s")
spark.conf.set("spark.locality.wait.rack", "30s")

序列化相关错误

NotSerializableException

错误现象

java.io.NotSerializableException: com.example.MyClass
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)

原因分析

  • 类未实现Serializable:自定义类未实现Serializable接口
  • 闭包问题:在闭包中引用了不可序列化的对象
  • 静态变量:引用了静态变量或单例对象

解决方案

1. 实现Serializable接口

// 方法1:实现Serializable
case class MyClass(id: Int, name: String) extends Serializable

// 方法2:使用@transient注解
class MyClass(val id: Int, val name: String) extends Serializable {
  @transient
  private val nonSerializableField = new NonSerializableClass()
}

2. 避免闭包问题

// 错误示例
val nonSerializableObject = new NonSerializableClass()
val rdd = spark.sparkContext.parallelize(1 to 10)
rdd.map(x => nonSerializableObject.process(x))  // 会报错

// 正确示例
val rdd = spark.sparkContext.parallelize(1 to 10)
rdd.mapPartitions(iter => {
  val localObject = new NonSerializableClass()  // 在分区内创建
  iter.map(x => localObject.process(x))
})

3. 使用广播变量

// 对于只读的大对象,使用广播变量
val largeData = spark.sparkContext.parallelize(1 to 1000000).collect()
val broadcastData = spark.sparkContext.broadcast(largeData)

val rdd = spark.sparkContext.parallelize(1 to 10)
rdd.map(x => {
  val data = broadcastData.value  // 使用广播变量
  processWithData(x, data)
})

KryoSerializationException

错误现象

com.esotericsoftware.kryo.KryoException: java.lang.ClassNotFoundException: com.example.MyClass
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)

原因分析

  • 类未注册:使用Kryo序列化时类未注册
  • 类路径问题:类不在classpath中
  • 版本不兼容:序列化和反序列化版本不匹配

解决方案

1. 注册自定义类

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(
  classOf[MyClass],
  classOf[MyOtherClass]
))

val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()

2. 使用Kryo注册器

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyClass])
    kryo.register(classOf[MyOtherClass])
  }
}

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "com.example.MyKryoRegistrator")

3. 禁用严格模式

spark.conf.set("spark.kryo.registrationRequired", "false")

资源相关错误

ExecutorLostFailure

错误现象

org.apache.spark.scheduler.ExecutorLostFailure: Executor 1 lost
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$abortIfCompletelyBlacklisted$1.apply(TaskSetManager.scala:1023)

原因分析

  • 内存不足:Executor内存不足被杀死
  • CPU过载:CPU使用率过高导致进程异常
  • 磁盘空间不足:磁盘空间不足导致写入失败
  • 网络问题:网络连接问题导致心跳超时

解决方案

1. 增加资源配额

spark-submit \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.executor.memoryOverhead=2g \
  your-app.jar

2. 监控资源使用

// 添加资源监控
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    println(s"Task ${taskEnd.taskInfo.taskId} completed:")
    println(s"  Duration: ${taskEnd.taskInfo.duration}ms")
    println(s"  Memory: ${metrics.peakExecutionMemory} bytes")
    println(s"  Disk: ${metrics.diskBytesSpilled} bytes spilled")
  }
})

3. 优化资源分配

spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  --conf spark.dynamicAllocation.initialExecutors=5 \
  your-app.jar

NoClassDefFoundError

错误现象

java.lang.NoClassDefFoundError: com.example.MyClass
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)

原因分析

  • 依赖缺失:缺少必要的jar包
  • 版本冲突:依赖版本冲突
  • 类路径问题:类不在classpath中
  • 打包问题:jar包打包不完整

解决方案

1. 添加依赖jar包

spark-submit \
  --jars /path/to/dependency1.jar,/path/to/dependency2.jar \
  --conf spark.executor.extraClassPath=/path/to/dependencies/* \
  your-app.jar

2. 使用fat jar

<!-- Maven配置 -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>

3. 检查依赖冲突

# 查看依赖树
mvn dependency:tree

# 排除冲突依赖
<dependency>
  <groupId>com.example</groupId>
  <artifactId>library</artifactId>
  <version>1.0.0</version>
  <exclusions>
    <exclusion>
      <groupId>conflicting.group</groupId>
      <artifactId>conflicting-artifact</artifactId>
    </exclusion>
  </exclusions>
</dependency>

数据相关错误

FileNotFoundException

错误现象

java.io.FileNotFoundException: File does not exist: hdfs://namenode:8020/path/to/file
    at org.apache.hadoop.hdfs.DFSClient.checkPath(DFSClient.java:1274)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1527)

原因分析

  • 文件不存在:输入文件路径错误或文件被删除
  • 权限问题:没有读取文件的权限
  • 路径错误:文件路径格式错误
  • HDFS问题:HDFS服务异常

解决方案

1. 检查文件路径

// 检查文件是否存在
val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
val path = new org.apache.hadoop.fs.Path("/path/to/file")

if (fs.exists(path)) {
  println("File exists")
} else {
  println("File does not exist")
}

2. 设置文件系统配置

// 设置HDFS配置
spark.conf.set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")
spark.conf.set("spark.hadoop.dfs.namenode.rpc-address", "namenode:8020")

3. 处理权限问题

# 检查文件权限
hdfs dfs -ls /path/to/file

# 修改文件权限
hdfs dfs -chmod 644 /path/to/file

# 修改文件所有者
hdfs dfs -chown username:group /path/to/file

DataSourceException

错误现象

org.apache.spark.sql.AnalysisException: Table or view not found: table_name
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

原因分析

  • 表不存在:数据库表不存在
  • 权限问题:没有访问表的权限
  • 数据库连接问题:数据库连接失败
  • 表名错误:表名拼写错误

解决方案

1. 检查表是否存在

// 检查表是否存在
val tables = spark.catalog.listTables()
tables.filter(_.name == "table_name").show()

// 或者使用SQL
spark.sql("SHOW TABLES").show()

2. 设置数据库连接

// 设置数据库连接
spark.conf.set("spark.sql.warehouse.dir", "/user/hive/warehouse")
spark.conf.set("hive.metastore.uris", "thrift://metastore:9083")

// 使用Hive
spark.sql("USE database_name")
spark.sql("SHOW TABLES").show()

3. 处理权限问题

-- 授予权限
GRANT SELECT ON TABLE table_name TO USER username;

-- 检查权限
SHOW GRANT USER username ON TABLE table_name;

调试和诊断工具

  1. Spark Web UI

访问方式

http://driver-host:4040  # 应用运行时
http://driver-host:18080 # 历史服务器

关键指标

  • Stages页面:查看Stage执行情况和失败原因
  • Executors页面:查看Executor资源使用情况
  • Storage页面:查看RDD缓存情况
  • Environment页面:查看配置参数
  1. 日志分析

查看日志

# 查看Driver日志
tail -f /path/to/spark/logs/spark-*-driver-*.log

# 查看Executor日志
tail -f /path/to/spark/logs/spark-*-executor-*.log

# 查看YARN日志
yarn logs -applicationId application_1234567890_0001

关键日志模式

# 查找错误信息
grep -i "error\|exception\|failed" /path/to/logs/*.log

# 查找内存相关错误
grep -i "outofmemory\|oom" /path/to/logs/*.log

# 查找网络相关错误
grep -i "timeout\|connection" /path/to/logs/*.log
  1. 性能分析工具

JVM分析

# 查看JVM堆内存使用
jstat -gc <pid> 1000

# 查看线程状态
jstack <pid>

# 查看内存dump
jmap -dump:format=b,file=heap.hprof <pid>

系统资源监控

# 查看系统资源使用
top -p <pid>
iostat -x 1
netstat -i
  1. 调试代码

添加调试信息

// 添加日志
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark").setLevel(Level.DEBUG)

// 添加监控
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    println(s"Task started: ${taskStart.taskInfo.taskId}")
  }
  
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println(s"Task ended: ${taskEnd.taskInfo.taskId}, " +
            s"status: ${taskEnd.taskInfo.status}")
  }
})

使用Spark Shell调试

// 启动Spark Shell
spark-shell --master local[*]

// 逐步调试
val rdd = sc.textFile("/path/to/file")
rdd.take(10).foreach(println)  // 查看数据
rdd.count()  // 检查数据量

预防措施

  1. 配置优化

基础配置

# 内存配置
spark.executor.memory=8g
spark.executor.memoryOverhead=2g
spark.driver.memory=4g
spark.driver.memoryOverhead=1g

# 网络配置
spark.network.timeout=800s
spark.executor.heartbeatInterval=60s

# 序列化配置
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired=false

性能配置

# Shuffle配置
spark.shuffle.file.buffer=32k
spark.shuffle.io.maxRetries=3
spark.shuffle.io.retryWait=60s

# 动态分配
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=20
  1. 代码最佳实践

内存优化

// 使用广播变量
val broadcastVar = sc.broadcast(largeData)

// 及时释放缓存
rdd.unpersist()

// 使用checkpoint
rdd.checkpoint()

性能优化

// 合理设置分区数
val optimalPartitions = Math.max(rdd.partitions.length, 200)
val repartitionedRDD = rdd.repartition(optimalPartitions)

// 使用mapPartitions
val optimizedRDD = rdd.mapPartitions(iter => {
  // 批量处理逻辑
  iter.map(processItem)
})
  1. 监控告警

设置监控

// 添加监控指标
val metrics = spark.sparkContext.getStatusTracker
val stageInfo = metrics.getStageInfo(stageId)
println(s"Stage $stageId: ${stageInfo.numTasks} tasks, " +
        s"${stageInfo.numCompletedTasks} completed")

告警配置

# 设置告警阈值
spark.executor.failures.max=3
spark.task.maxFailures=8
spark.stage.maxAttempts=4

通过以上详细的错误分析和解决方案,可以有效处理Spark任务中的常见问题,提高系统的稳定性和性能。


关键参数与配置模板

JVM相关参数

垃圾回收配置

# G1GC配置(推荐)
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC 
-XX:G1HeapRegionSize=16m 
-XX:MaxGCPauseMillis=200 
-XX:+G1PrintRegionRememberedSetInfo 
-XX:+UseCompressedOops 
-XX:+UseCompressedClassPointers
-XX:+PrintHeapAtGC 
-XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps
-Xloggc:/var/log/spark/gc-executor.log"

--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC 
-XX:MaxGCPauseMillis=200 
-XX:+PrintHeapAtGC
-Xloggc:/var/log/spark/gc-driver.log"

并发GC配置

# CMS GC配置
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC 
-XX:+CMSParallelRemarkEnabled 
-XX:+UseCMSInitiatingOccupancyOnly 
-XX:CMSInitiatingOccupancyFraction=70
-XX:+PrintGC 
-XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps"

内存调试参数

# 内存调试配置
--conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/spark/heapdump
-XX:+TraceClassLoading
-XX:+PrintStringDeduplication"

生产环境配置

#!/bin/bash
# 生产环境Spark配置模板
/opt/spark32/bin/spark-shell --master yarn  \
--deploy-mode client \
--driver-memory ${driver_memory} \
--executor-memory ${executor_memory} \
--num-executors ${num_executors} \
--conf spark.default.parallelism=${default_parallelism} \
--conf spark.sql.shuffle.partitions=${shuffle_partitions} \
--conf spark.driver.maxResultSize=4g \
--name ${job_name}_${log_date} \
--conf spark.executor.memoryOverhead=2g \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+UseStringDeduplication -XX:InitiatingHeapOccupancyPercent=35 -XX:MaxGCPauseMillis=200" \
--conf spark.executor.heartbeatInterval=60s \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.skewJoin.enabled=true \
--conf spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=512m \
--conf spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 \
--conf spark.sql.parquet.binaryAsString=true \
--conf spark.sql.parquet.compression.codec=snappy \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=128m \
--conf spark.kryoserializer.buffer=128k \
--conf spark.shuffle.consolidateFiles=true \
--conf spark.shuffle.io.preferDirectBufs=true \
--conf spark.shuffle.io.maxRetries=10 \
--conf spark.shuffle.io.retryWait=20s \
--conf spark.shuffle.compress=true \
--conf spark.shuffle.spill.compress=true \
--conf spark.shuffle.file.buffer=4m \
--conf spark.network.timeout=1200s \
--conf spark.reducer.maxSizeInFlight=36m \
--jars ${jar_path} \
--queue default

Spark高频面试题

基础概念题

Q1: 请详细解释RDD、DataFrame和Dataset的区别及各自的应用场景。

标准答案:

Spark提供了三种核心数据抽象:RDD、DataFrame和Dataset,它们各自适用于不同的场景,具有不同的特性和优势。

1. 基本概念对比

  • RDD (Resilient Distributed Dataset):Spark最初的数据抽象,是一个不可变的、分布式的对象集合,支持函数式编程操作。
  • DataFrame:在RDD基础上引入了Schema概念,类似关系型数据库中的表结构,支持SQL查询。
  • Dataset:DataFrame的扩展,提供类型安全的、面向对象的编程接口,结合了RDD的类型安全和DataFrame的优化性能。

2. 核心特性对比

特性RDDDataFrameDataset
类型安全编译时类型检查运行时类型检查编译时类型检查
性能优化无内置优化Catalyst优化器Catalyst优化器
内存管理Java对象/Kryo序列化Tungsten二进制格式Tungsten二进制格式
API风格函数式APISQL + 函数式API类型安全的函数式API
Schema感知无Schema有Schema有Schema
使用难度较复杂简单中等
适用场景非结构化数据处理结构化数据分析结构化数据复杂处理

3. 代码示例对比

// RDD示例
val rdd = sc.textFile("data.txt")
  .map(line => line.split(","))
  .map(fields => Person(fields(0), fields(1).toInt))
  .filter(person => person.age > 30)

// DataFrame示例
val df = spark.read.json("people.json")
df.filter($"age" > 30)
  .select($"name", $"age")
  .groupBy($"age")
  .count()

// Dataset示例
case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(p => p.age > 30)
  .map(p => (p.name, p.age))
  .groupByKey(_._2)
  .count()

4. 性能与优化对比

  • RDD:依赖于JVM的垃圾回收和Java序列化,性能受限
  • DataFrame
    • 使用Catalyst优化器进行逻辑和物理执行计划优化
    • 使用Tungsten高效内存管理,减少GC开销
    • 支持列式存储和压缩
  • Dataset
    • 结合了DataFrame的所有优化
    • 增加了编码器(Encoder),在对象和内部Tungsten表示之间高效转换

5. 选择建议

  • 选择RDD:当需要细粒度控制或处理非结构化数据时
  • 选择DataFrame:处理结构化数据,需要高性能优化,或需要使用SQL查询
  • 选择Dataset:需要类型安全和函数式编程,同时又需要Catalyst优化器的性能提升

随着Spark的发展,DataFrame和Dataset API已经成为推荐的数据处理方式,特别是在Spark 2.0之后,DataFrame实际上是Dataset[Row]的类型别名。

Q2: 请详细描述Spark任务的执行流程,从提交应用到任务完成的全过程。

标准答案:

Spark任务执行是一个复杂的分布式计算过程,涉及多个组件协同工作。理解这个流程对于优化Spark应用和排查问题至关重要。

1. 整体执行架构

Spark应用程序的执行涉及以下核心组件:

  • Driver Program:包含应用程序的main函数,负责创建SparkContext
  • Cluster Manager:负责资源分配(如YARN、Kubernetes、Mesos或Standalone)
  • Worker Node:执行计算任务的节点
  • Executor:在Worker节点上运行,负责执行具体的计算任务

2. 详细执行流程

graph TD
    A[用户程序] --> B[SparkContext]
    B --> C[DAGScheduler]
    C --> D[TaskScheduler]
    D --> E[Worker节点]
    E --> F[Executor]
    F --> G[Task执行]
    G --> H[结果收集]
    H --> I[作业完成]
    
    style A fill:#f9f9f9,stroke:#333
    style B fill:#d4f1f9,stroke:#05a4d1
    style C fill:#ffe6cc,stroke:#d79b00
    style D fill:#d5e8d4,stroke:#82b366
    style E fill:#e1d5e7,stroke:#9673a6
    style F fill:#f8cecc,stroke:#b85450
    style G fill:#f8cecc,stroke:#b85450
    style H fill:#d4f1f9,stroke:#05a4d1
    style I fill:#f9f9f9,stroke:#333

3. 执行步骤详解

  1. 应用程序启动

    • 用户提交应用程序
    • 创建SparkContext(Spark的入口点)
    • SparkContext连接到集群管理器
  2. 资源申请

    • SparkContext通过集群管理器申请资源
    • 集群管理器在Worker节点上启动Executor进程
  3. DAG构建

    • 用户代码通过RDD转换操作构建DAG(有向无环图)
    • 延迟计算:转换操作只构建DAG,不执行计算
  4. 作业提交

    • 当遇到Action操作时,触发作业提交
    • SparkContext将作业提交给DAGScheduler
  5. Stage划分

    • DAGScheduler将DAG划分为多个Stage
    • 划分依据:Shuffle操作(如reduceByKey、join等)
    • 每个Stage包含可以流水线执行的一组Task
  6. Task生成与调度

    • 为每个Stage生成TaskSet
    • TaskScheduler将TaskSet提交给TaskSetManager
    • TaskSetManager负责具体的任务调度和失败重试
  7. Task执行

    • Executor接收并执行Task
    • 执行计算并将结果保存在内存或磁盘
    • 对于Shuffle操作,将中间结果写入本地磁盘
  8. 结果收集

    • 对于需要返回结果的Action操作,Driver收集结果
    • 对于写入外部存储的操作,直接写入目标位置
  9. 作业完成

    • 所有Task执行完成后,作业结束
    • 释放资源或继续执行下一个作业

4. 关键概念解析

  • Job:由Action操作触发的一组计算任务
  • Stage:Job的子集,由一组可以流水线执行的Task组成
  • Task:在单个Executor上执行的最小工作单元,处理一个分区的数据
  • Shuffle:数据重分布过程,是Stage划分的边界

5. 示例说明

// 这个简单的Spark程序演示了执行流程
val sc = new SparkContext(conf)  // 创建SparkContext
val lines = sc.textFile("data.txt")  // 构建RDD,但不执行
val words = lines.flatMap(_.split(" "))  // 继续构建DAG
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)  // reduceByKey会导致Shuffle,划分Stage
wordCounts.collect()  // Action操作,触发实际计算

在这个例子中:

  • 会创建一个包含两个Stage的Job
  • 第一个Stage执行textFile、flatMap和map操作
  • 第二个Stage执行reduceByKey操作
  • collect()触发整个Job的执行

理解Spark任务执行流程有助于编写高效的Spark应用程序,并能更好地进行性能调优和故障排查。

Q3: 请解释Spark的内存管理机制,包括内存分配策略和优化方法。

标准答案:

Spark内存管理是影响Spark应用性能的关键因素之一。Spark通过精细的内存管理机制,在有限的内存资源下实现高效的分布式计算。

1. Spark内存架构

Spark的JVM堆内存主要分为以下几个部分:

graph TD
    A[JVM堆内存] --> B[Reserved Memory<br>300MB]
    A --> C[User Memory<br>用户代码使用]
    A --> D[Spark Memory<br>执行和存储]
    
    D --> E[Storage Memory<br>缓存数据]
    D --> F[Execution Memory<br>计算过程]
    
    style A fill:#f9f9f9,stroke:#333
    style B fill:#f8cecc,stroke:#b85450
    style C fill:#d5e8d4,stroke:#82b366
    style D fill:#d4f1f9,stroke:#05a4d1
    style E fill:#ffe6cc,stroke:#d79b00
    style F fill:#e1d5e7,stroke:#9673a6

2. 内存管理模式

Spark提供两种内存管理模式:

  • 静态内存管理(Static Memory Management)

    • Spark 1.6之前的默认模式
    • 为存储和执行内存分配固定比例,不能动态调整
    • 配置参数:spark.storage.memoryFractionspark.shuffle.memoryFraction
  • 统一内存管理(Unified Memory Management)

    • Spark 1.6及之后的默认模式
    • 存储内存和执行内存共享一个区域,可以动态调整
    • 配置参数:spark.memory.fractionspark.memory.storageFraction

3. 统一内存管理详解

在统一内存管理模式下:

  • Spark Memory:占JVM堆内存的比例由spark.memory.fraction控制,默认为0.6
  • Storage Memory:初始大小由spark.memory.storageFraction控制,默认为0.5
  • Execution Memory:初始大小为Spark Memory减去Storage Memory

内存动态调整机制

  1. 存储内存不足时

    • 如果执行内存有空闲,可以借用执行内存
    • 如果执行内存没有空闲,则按LRU策略淘汰已缓存的RDD分区
  2. 执行内存不足时

    • 如果存储内存有空闲,可以借用存储内存
    • 如果存储内存没有空闲,但存储内存中有部分是被执行内存借用的,则可以抢占这部分内存
    • 执行内存不会淘汰存储内存中的数据

4. 内存管理相关配置参数

// 统一内存管理模式关键参数
spark.memory.fraction = 0.6  // Spark Memory占JVM堆内存的比例
spark.memory.storageFraction = 0.5  // Storage Memory初始占比

// 其他重要内存参数
spark.executor.memory = "4g"  // Executor的JVM堆内存大小
spark.memory.offHeap.enabled = false  // 是否启用堆外内存
spark.memory.offHeap.size = "2g"  // 堆外内存大小

5. 堆外内存(Off-Heap Memory)

从Spark 2.0开始,Spark支持使用堆外内存:

  • 通过spark.memory.offHeap.enabled开启
  • 使用spark.memory.offHeap.size设置大小
  • 优势:减少GC开销,提高性能
  • 缺点:需要手动管理内存,配置复杂

6. 内存管理最佳实践

  • 合理设置Executor内存:根据集群节点内存和并发任务数
  • 监控内存使用情况:通过Spark UI查看内存使用情况
  • 调整内存分配比例:根据应用特点调整存储和执行内存比例
  • 使用堆外内存:对于大数据量处理,考虑启用堆外内存
  • 避免内存泄漏:注意释放不再使用的RDD,使用unpersist()方法

7. 内存不足问题排查

当遇到OutOfMemoryError或性能下降时:

  • 检查是否有不必要的数据缓存
  • 考虑增加分区数,减少每个任务的内存使用
  • 调整GC策略,如使用G1GC
  • 使用Kryo序列化减少内存占用
  • 考虑增加Executor内存或减少每个Executor的核心数

深入理解Spark内存管理机制,对于优化Spark应用性能和解决内存相关问题至关重要。

架构原理题

Q4: 请详细介绍Spark的架构组件及其职责,各组件之间如何协同工作?

标准答案:

Spark是一个分布式计算框架,其架构由多个组件协同工作,共同支撑分布式数据处理能力。深入理解Spark架构组件及其交互方式,对于有效使用Spark和排查问题至关重要。

1. Spark架构总览

graph TB
    A[用户应用] --> B[SparkContext]
    B --> C[集群管理器<br>YARN/Kubernetes/Mesos/Standalone]
    C --> D[Worker节点]
    D --> E[Executor]
    B --> F[DAGScheduler]
    F --> G[TaskScheduler]
    G --> E
    
    style A fill:#f9f9f9,stroke:#333
    style B fill:#d4f1f9,stroke:#05a4d1
    style C fill:#ffe6cc,stroke:#d79b00
    style D fill:#d5e8d4,stroke:#82b366
    style E fill:#e1d5e7,stroke:#9673a6
    style F fill:#f8cecc,stroke:#b85450
    style G fill:#f8cecc,stroke:#b85450

2. 核心组件详解

组件位置主要职责关键特性
Driver Program客户端或集群中运行应用程序的main函数
创建SparkContext
提交作业
收集结果
应用程序的控制中心
包含DAGScheduler和TaskScheduler
SparkContextDriver中Spark程序的入口点
连接集群管理器
获取Executor
构建RDD
每个应用只有一个
负责作业提交和资源申请
SparkSessionDriver中Spark 2.0后的入口点
整合SQL、DataFrame、Dataset API
提供统一的数据访问接口
包含SparkContext
Cluster Manager独立进程资源分配
启动Executor
支持多种实现:
YARN、Kubernetes、Mesos、Standalone
Worker Node集群节点提供计算资源
启动Executor进程
物理节点或虚拟机
可以运行多个Executor
ExecutorWorker节点上执行Task
缓存RDD
返回结果
每个应用有多个
生命周期与应用相同
DAGSchedulerDriver中构建DAG
划分Stage
生成TaskSet
基于Shuffle依赖划分Stage
优化执行计划
TaskSchedulerDriver中将Task分发给Executor
监控Task执行
重试失败的Task
负责具体的任务调度
处理任务失败和重试
BlockManagerDriver和Executor中管理内存和磁盘存储
处理数据块传输
负责RDD缓存
管理Shuffle数据

3. 组件交互流程

  1. 应用程序初始化
val spark = SparkSession.builder().appName("MyApp").getOrCreate()
val sc = spark.sparkContext
  1. 资源申请与Executor启动

    • SparkContext连接集群管理器
    • 集群管理器在Worker节点上启动Executor进程
    • Executor向Driver注册
  2. 作业提交与执行

    • DAGScheduler将RDD DAG划分为Stage
    • TaskScheduler将TaskSet提交给Executor
    • Executor执行Task并返回结果

4. 不同部署模式对组件的影响

部署模式Driver位置特点适用场景
Client模式客户端机器Driver与客户端在同一进程
便于调试和查看输出
开发测试
交互式应用
Cluster模式集群中的Worker节点Driver在集群中运行
客户端可以断开连接
生产环境
长时间运行的作业

5. 各组件的高可用性考虑

  • Driver:在YARN或Kubernetes上可以启用AM (ApplicationMaster) 重启
  • Worker:节点失败时,其上的Executor会在其他节点重启
  • Executor:失败时会重启,正在执行的任务会重试
  • Task:失败后会自动重试,最多重试次数可配置

6. 实际应用中的架构选择

  • 小规模应用:Standalone模式简单易用
  • 企业生产环境:YARN或Kubernetes提供更好的资源隔离和管理
  • 混合负载环境:Kubernetes适合与其他工作负载共存
  • 传统大数据环境:YARN与Hadoop生态系统集成更好

深入理解Spark架构组件及其交互方式,有助于优化应用性能、排查问题,以及设计适合特定场景的Spark应用架构。

Q5: 请详细解释Spark的Shuffle机制原理及其演进历史,如何优化Shuffle操作?

标准答案:

Shuffle是Spark中最关键也最复杂的机制之一,它涉及到数据的重新分区和跨节点传输,对Spark应用的性能有着重大影响。深入理解Shuffle机制对于优化Spark应用至关重要。

1. Shuffle的基本概念

Shuffle是指将分布在各个分区的数据按照某种规则重新组织,使得具有相同特征(如相同的key)的数据聚集在一起进行计算的过程。在Spark中,Shuffle操作是Stage划分的边界。

触发Shuffle的操作包括

  • 重分区操作repartitioncoalesce
  • ByKey类操作groupByKeyreduceByKeyaggregateByKey
  • Join类操作joincogroup
  • 排序操作sortBysortByKey

2. Shuffle的演进历史

Spark Shuffle机制经历了多次重大改进:

Shuffle版本Spark版本特点主要问题
Hash Shuffle V10.8及之前每个map task输出M×R个文件
(M=map任务数,R=reduce任务数)
文件数过多,占用文件句柄
Hash Shuffle V20.8.1 - 1.1每个executor输出C×R个文件
(C=core数,R=reduce任务数)
文件数仍然较多
Sort Shuffle V11.1 - 1.5每个map task输出1个文件,按key排序所有数据都排序,开销大
Sort Shuffle V2
(Tungsten)
1.5 - 2.0二进制序列化,直接操作内存特定场景优化
Sort Shuffle V32.0+统一的Sort-based Shuffle
小数据量可绕过排序
当前默认实现

3. Sort-based Shuffle详细工作流程

graph TD
    A[Map任务] --> B[内存中按Partitioner分区]
    B --> C{是否需要排序?}
    C -->|是| D[对每个分区内数据排序]
    C -->|否| E[跳过排序]
    D --> F[溢写到磁盘]
    E --> F
    F --> G[合并溢写文件]
    G --> H[生成数据文件和索引文件]
    H --> I[Reduce任务]
    I --> J[通过网络拉取数据]
    J --> K[合并数据]
    K --> L[进行Reduce计算]
    
    style A fill:#d4f1f9,stroke:#05a4d1
    style I fill:#e1d5e7,stroke:#9673a6
    style F fill:#ffe6cc,stroke:#d79b00
    style G fill:#ffe6cc,stroke:#d79b00
    style H fill:#ffe6cc,stroke:#d79b00
    style J fill:#d5e8d4,stroke:#82b366
    style K fill:#d5e8d4,stroke:#82b366
    style L fill:#d5e8d4,stroke:#82b366

4. Map端详解

  1. 分区计算:根据Partitioner确定每条数据的目标分区
  2. 内存缓冲:数据先写入内存缓冲区
  3. 排序与聚合:根据配置决定是否进行排序和聚合
  4. 溢写机制
    • 当缓冲区达到阈值(spark.shuffle.spill.numElementsForceSpillThreshold)时触发溢写
    • 溢写过程中可能进行排序和聚合
  5. 文件合并:多个溢写文件最终合并为一个数据文件和一个索引文件

5. Reduce端详解

  1. 任务初始化:Reduce任务启动时,向DAGScheduler获取上游Shuffle数据的位置信息
  2. 数据拉取:通过BlockManager从各个Map任务所在节点拉取数据
  3. 拉取策略
    • 按批次拉取,避免一次性拉取过多数据
    • 支持重试机制,处理临时网络故障
  4. 数据聚合:将拉取的数据进行合并和聚合处理
  5. 结果计算:对聚合后的数据执行Reduce操作

6. 关键配置参数

// Shuffle行为控制
spark.shuffle.manager = "sort"  // Shuffle实现方式,默认sort
spark.shuffle.sort.bypassMergeThreshold = 200  // 小分区数量绕过排序的阈值

// 内存使用控制
spark.shuffle.file.buffer = "32k"  // 每个输出流的缓冲大小
spark.shuffle.spill.compress = true  // 是否压缩溢写文件

// 网络传输控制
spark.reducer.maxSizeInFlight = "48m"  // 每个reduce任务同时拉取的最大数据量
spark.shuffle.io.retryWait = "5s"  // 重试等待时间
spark.shuffle.io.maxRetries = 3  // 最大重试次数

7. Shuffle优化策略

  1. 减少Shuffle操作

    • 使用mapPartitions替代map后接reduceByKey
    • 使用广播变量替代join
  2. 调整分区数量

    • 过少:数据倾斜,任务并行度低
    • 过多:小文件过多,调度开销大
    • 建议:每个分区大小在128MB左右
  3. 启用聚合

    • 使用reduceByKey替代groupByKey
    • 使用aggregateByKey进行本地预聚合
  4. 内存调优

    • 增加Shuffle缓冲区大小减少磁盘I/O
    • 调整执行内存比例适应Shuffle需求
  5. 序列化优化

    • 使用Kryo序列化减少数据大小
    • 注册自定义类提高序列化性能

深入理解Shuffle机制,可以帮助开发者编写更高效的Spark应用,避免常见的性能陷阱,特别是在处理大规模数据时。

性能调优题

Q6: 请详述Spark应用的性能调优策略,从哪些方面可以提升Spark作业的执行效率?

标准答案:

Spark性能调优是一个系统性工作,需要从多个维度进行综合优化。一个高效的Spark应用需要合理的资源配置、优化的代码结构、适当的数据处理策略以及精细的参数调整。

1. 性能调优的整体方法论

性能调优应遵循以下方法论:

  • 自上而下:从应用架构到具体参数
  • 数据驱动:基于监控指标和性能测试
  • 渐进式:从最大瓶颈开始,逐步优化
  • 权衡取舍:在资源消耗、执行速度、稳定性之间寻找平衡

2. 资源配置优化

graph TD
    A[资源配置优化] --> B[Executor配置]
    A --> C[Driver配置]
    A --> D[集群资源]
    
    B --> B1[内存大小]
    B --> B2[核心数量]
    B --> B3[实例数量]
    
    C --> C1[内存大小]
    C --> C2[并行度]
    
    D --> D1[节点规格]
    D --> D2[资源隔离]
    
    style A fill:#f9f9f9,stroke:#333
    style B fill:#d4f1f9,stroke:#05a4d1
    style C fill:#ffe6cc,stroke:#d79b00
    style D fill:#d5e8d4,stroke:#82b366

Executor配置最佳实践

  • 内存大小:每个Executor 4-8GB内存(过大导致GC延迟)
  • 核心数量:每个Executor 4-5个核心(过多导致线程竞争)
  • 实例数量(集群总核心数 / 每个Executor核心数),预留10%资源

配置示例

// 10节点集群,每节点16核64GB内存
spark.executor.instances = 30       // (10 * 16) / 5 = 32,预留部分
spark.executor.cores = 5            // 每个Executor 5个核心
spark.executor.memory = "20g"       // 每个Executor 20GB内存
spark.driver.memory = "10g"         // Driver 10GB内存

3. 并行度优化

并行度是指任务划分的分区数,影响任务的并行执行效率。

并行度设置原则

  • 基准值:集群总核心数的2-3倍
  • 数据量:每个分区数据量在128MB左右
  • 动态调整spark.sql.adaptive.enabled=true

并行度相关配置

// 静态配置
spark.default.parallelism = 600     // 默认并行度
spark.sql.shuffle.partitions = 600  // SQL操作的并行度

// 动态配置
spark.sql.adaptive.enabled = true   // 启用自适应查询执行
spark.sql.adaptive.coalescePartitions.enabled = true  // 合并小分区

4. 数据倾斜优化

数据倾斜是指某些分区的数据量远大于其他分区,导致任务执行时间不均衡。

识别数据倾斜

  • Spark UI中观察Stage页面的任务执行时间分布
  • 查看Shuffle读写数据量的分布情况

解决方案

倾斜类型解决方案实现方式
Join倾斜广播Joinbroadcast(smallDF).join(largeDF)
Join倾斜拆分热点键对热点键添加随机前缀,扩大Join
聚合倾斜两阶段聚合局部聚合+全局聚合
聚合倾斜自定义分区实现自定义Partitioner
数据源倾斜预处理ETL阶段重新分区

代码示例

// 两阶段聚合示例
val result = rdd
  .map(x => (x._1 + "_" + Random.nextInt(10), x._2))  // 加盐
  .reduceByKey(_ + _)  // 局部聚合
  .map(x => (x._1.split("_")(0), x._2))  // 去盐
  .reduceByKey(_ + _)  // 全局聚合

5. 缓存策略优化

合理的缓存策略可以避免重复计算,提高执行效率。

缓存级别选择

存储级别内存使用CPU开销适用场景
MEMORY_ONLY默认选择,内存充足
MEMORY_AND_DISK数据量大于内存
MEMORY_ONLY_SER内存受限,可接受序列化开销
OFF_HEAP需要跨应用共享数据

缓存使用原则

  • 只缓存重复使用的RDD/DataFrame
  • 在Shuffle操作之后、Action操作之前缓存
  • 及时使用unpersist()释放不再使用的缓存

6. Shuffle优化

Shuffle是Spark中最昂贵的操作,优化Shuffle可以显著提升性能。

Shuffle优化策略

  • 减少Shuffle:使用mapPartitions替代map+reduceByKey
  • 本地聚合:使用reduceByKey替代groupByKey
  • 广播变量:小表广播避免Shuffle
  • 参数调整:调整缓冲区大小、压缩算法等

关键参数

spark.shuffle.file.buffer = "64k"  // 增加缓冲区减少磁盘I/O
spark.shuffle.compress = true      // 启用压缩
spark.shuffle.io.maxRetries = 6    // 增加重试次数

7. SQL优化

对于Spark SQL应用,可以应用以下优化技术:

查询优化

  • 谓词下推:尽早过滤数据
  • 列裁剪:只读取需要的列
  • 分区裁剪:只读取需要的分区
  • 自动优化:启用AQE、动态分区裁剪等

配置示例

// 启用自适应查询执行
spark.sql.adaptive.enabled = true
// 启用动态分区裁剪
spark.sql.optimizer.dynamicPartitionPruning.enabled = true
// 启用Join重排序
spark.sql.adaptive.optimizeSkewedJoin = true

8. 序列化优化

序列化影响数据传输和存储效率。

序列化选择

  • Kryo序列化:比Java序列化更高效
  • 列式格式:Parquet、ORC等格式更高效

配置示例

// 启用Kryo序列化
spark.serializer = "org.apache.spark.serializer.KryoSerializer"
// 注册自定义类
spark.kryo.registrator = "com.example.MyRegistrator"

9. 综合性能调优案例

大规模数据Join优化

// 优化前
val result = largeDF.join(smallDF, Seq("key"))

// 优化后
// 1. 广播小表
val broadcastDF = broadcast(smallDF)
val result = largeDF.join(broadcastDF, Seq("key"))

// 2. 启用AQE和Join优化
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

数据倾斜处理

// 优化前
val result = rdd.reduceByKey(_ + _)

// 优化后
// 1. 两阶段聚合
val result = rdd
  .map(x => ((x._1, Random.nextInt(10)), x._2))  // 加盐
  .reduceByKey(_ + _)  // 局部聚合
  .map(x => (x._1._1, x._2))  // 去盐
  .reduceByKey(_ + _)  // 全局聚合

性能调优是一个持续的过程,需要结合具体应用场景、数据特点和资源情况,采用适当的优化策略。通过系统性的调优,可以显著提升Spark应用的性能和资源利用率。

Q7: 如何识别和解决Spark中的数据倾斜问题?请给出具体的解决方案和代码示例。

标准答案:

数据倾斜是Spark应用中常见的性能瓶颈,表现为某些分区的数据量远大于其他分区,导致任务执行时间不均衡,整体作业延迟。有效解决数据倾斜问题是Spark性能优化的关键环节。

1. 数据倾斜的识别

在解决数据倾斜前,首先需要准确识别问题:

识别方法

  • Spark UI:观察Stage页面中任务执行时间分布,如有明显"长尾"现象则可能存在倾斜
  • Shuffle统计:检查Shuffle读写数据量分布是否均衡
  • 数据采样:对可能的倾斜键进行采样分析,确定热点数据

倾斜特征

Task执行时间分布:
[████████] 12s
[████████] 13s
[████████] 11s
[██████████████████████████████████████] 120s  <- 明显的倾斜任务
[████████] 14s

2. 数据倾斜的根本原因

数据倾斜通常由以下原因导致:

graph TD
    A[数据倾斜根因] --> B[数据本身分布不均]
    A --> C[业务逻辑导致]
    A --> D[技术实现问题]
    
    B --> B1[热点键/值]
    B --> B2[异常数据]
    
    C --> C1[时间维度聚合]
    C --> C2[地域维度聚合]
    
    D --> D1[默认分区器问题]
    D --> D2[并行度设置不当]
    
    style A fill:#f9f9f9,stroke:#333
    style B fill:#d4f1f9,stroke:#05a4d1
    style C fill:#ffe6cc,stroke:#d79b00
    style D fill:#d5e8d4,stroke:#82b366

3. 解决方案分类

根据倾斜场景和原因,可以采用不同的解决方案:

倾斜场景解决方案适用条件优缺点
Join倾斜广播Join一侧数据集较小(<10GB)简单高效,但受内存限制
Join倾斜拆分热点键能识别出热点键针对性强,但实现复杂
Join倾斜随机前缀+扩容热点键较多通用性好,但增加计算量
聚合倾斜两阶段聚合聚合操作(如reduceByKey)效果好,适用面广
聚合倾斜自定义分区数据分布已知精确控制,但需定制开发
数据源倾斜预处理重分区ETL阶段可控治本方法,但增加前置处理

4. 详细解决方案

4.1 Join操作倾斜解决方案

方案一:广播Join

// 优化前
val result = largeDF.join(smallDF, "key")

// 优化后
import org.apache.spark.sql.functions.broadcast
val result = largeDF.join(broadcast(smallDF), "key")

方案二:拆分热点键

// 假设发现"000"是热点键
// 1. 将大表中热点键对应的数据拆分出来
val largeDF_normal = largeDF.filter($"key" =!= "000")
val largeDF_skew = largeDF.filter($"key" === "000")
  .withColumn("key_random", concat($"key", lit("_"), rand()*10))

// 2. 将小表对应热点键数据扩容
val smallDF_normal = smallDF.filter($"key" =!= "000")
val smallDF_skew = smallDF.filter($"key" === "000")
  .withColumn("key_random", 
    explode(array((0 until 10).map(i => concat($"key", lit("_"), lit(i))): _*)))

// 3. 分别Join后合并结果
val join1 = largeDF_normal.join(smallDF_normal, "key")
val join2 = largeDF_skew.join(smallDF_skew, 
  largeDF_skew("key_random") === smallDF_skew("key_random"))
  .drop("key_random")

val result = join1.union(join2)

方案三:随机前缀+扩容Join

// 1. 大表添加随机前缀
val largeDF_rand = largeDF.withColumn("prefix", (rand()*10).cast("int"))
  .withColumn("key_prefixed", concat(col("prefix").cast("string"), lit("_"), col("key")))

// 2. 小表扩容10倍
val smallDF_expanded = smallDF.withColumn("prefix", 
  explode(array((0 until 10).map(lit(_)): _*)))
  .withColumn("key_prefixed", concat(col("prefix").cast("string"), lit("_"), col("key")))

// 3. 在prefixed key上Join
val joinResult = largeDF_rand.join(smallDF_expanded, "key_prefixed")
  .drop("prefix", "key_prefixed")

4.2 聚合操作倾斜解决方案

方案一:两阶段聚合

// 优化前
val result = rdd.reduceByKey(_ + _)

// 优化后
val result = rdd
  // 第一阶段:局部聚合,加随机前缀
  .map(x => ((x._1, Random.nextInt(100)), x._2))
  .reduceByKey(_ + _)
  // 第二阶段:全局聚合,去除随机前缀
  .map(x => (x._1._1, x._2))
  .reduceByKey(_ + _)

方案二:自定义分区器

// 定义自定义分区器
class BalancedPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  
  def getPartition(key: Any): Int = {
    val k = key.toString
    // 对热点键特殊处理
    if (k == "hot_key_1") {
      Math.abs(Random.nextInt() % partitions)
    } else {
      Math.abs(k.hashCode % partitions)
    }
  }
}

// 使用自定义分区器
val result = rdd
  .partitionBy(new BalancedPartitioner(100))
  .reduceByKey(_ + _)

4.3 数据源倾斜解决方案

方案一:预处理过滤异常数据

// 过滤掉可能导致倾斜的异常值
val cleanedDF = rawDF.filter($"key".isNotNull && $"key" =!= "")

方案二:预聚合处理

// 在ETL阶段进行预聚合
val preAggregatedDF = rawDF
  .repartition(200, $"date", $"region")  // 先按非倾斜维度重分区
  .groupBy($"date", $"region", $"user_id")  // 低粒度预聚合
  .agg(sum($"value").as("value"))

5. 实际案例分析

案例:用户行为数据分析中的数据倾斜

问题描述: 在电商用户行为分析中,需要统计每个商品的点击次数,但某些热门商品的点击量远高于其他商品。

解决方案

// 原始代码
val clickCounts = userClicks
  .groupBy("product_id")
  .count()

// 优化后代码
// 1. 数据采样,识别热点商品
val sampleDF = userClicks.sample(0.1)
val hotProducts = sampleDF
  .groupBy("product_id")
  .count()
  .orderBy($"count".desc)
  .limit(10)
  .collect()
  .map(_.getAs[String]("product_id"))
  .toSet
val bcHotProducts = spark.sparkContext.broadcast(hotProducts)

// 2. 对热点商品特殊处理
val processedClicks = userClicks.mapPartitions(iter => {
  val hotProds = bcHotProducts.value
  iter.map(row => {
    val productId = row.getAs[String]("product_id")
    if (hotProds.contains(productId)) {
      // 为热点商品添加随机后缀
      Row.fromSeq(row.toSeq :+ (productId + "_" + Random.nextInt(100)))
    } else {
      // 非热点商品保持不变
      Row.fromSeq(row.toSeq :+ productId)
    }
  })
}, true)

// 3. 使用处理后的键进行聚合
val schema = userClicks.schema.add("balanced_key", StringType)
val balancedDF = spark.createDataFrame(processedClicks, schema)

val result = balancedDF
  .groupBy("balanced_key")
  .count()
  // 去除随机后缀,恢复原始商品ID
  .withColumn("product_id", 
    when($"balanced_key".contains("_"), 
      split($"balanced_key", "_").getItem(0))
    .otherwise($"balanced_key"))
  .groupBy("product_id")
  .sum("count")
  .drop("balanced_key")

6. 预防数据倾斜的最佳实践

  1. 合理设计键:避免使用可能产生热点的键(如时间戳精确到秒)
  2. 提前预估:在开发前评估数据分布情况
  3. 监控机制:建立任务监控,及时发现倾斜问题
  4. 数据质量:在数据接入阶段处理异常值和空值
  5. 并行度:设置合理的并行度,避免分区过少

有效解决数据倾斜问题需要结合具体业务场景和数据特点,灵活运用各种技术手段,从根本上优化数据分布,提高Spark作业的执行效率。

实战应用题

Q8: 请介绍Spark SQL的优化技术,如何提高SQL查询性能?

标准答案:

Spark SQL是Spark生态系统中的重要组件,它提供了结构化数据处理能力和SQL查询接口。通过一系列优化技术,可以显著提升SQL查询性能。

1. Spark SQL优化技术概述

Spark SQL优化主要分为以下几个方面:

  • Catalyst优化器:基于规则和成本的查询优化
  • Tungsten执行引擎:内存管理和代码生成优化
  • 参数配置:针对特定场景的参数调整

2. 关键优化配置

// 开启自适应查询执行(AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
// 启用小分区合并
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// 设置合并后的目标分区大小
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")

// 广播Join优化
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")
// 启用AQE优化的Join策略
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

// 动态分区裁剪
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
// 启用Join重排序
spark.conf.set("spark.sql.optimizer.joinReorder.enabled", "true")

3. 查询优化实例

// 优化前
val result = spark.sql("""
  SELECT c.customer_name, sum(o.order_amount) as total_amount
  FROM orders o
  JOIN customers c ON o.customer_id = c.customer_id
  WHERE o.order_date > '2023-01-01'
  GROUP BY c.customer_name
""")

// 优化后
// 1. 使用广播Join
val customers = spark.table("customers")
val orders = spark.table("orders").filter($"order_date" > "2023-01-01")
import org.apache.spark.sql.functions.broadcast
val result = orders.join(broadcast(customers), "customer_id")
  .groupBy($"customer_name")
  .agg(sum($"order_amount").as("total_amount"))

// 2. 启用AQE和其他优化
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
val result = spark.sql("""
  SELECT /*+ BROADCAST(c) */ 
    c.customer_name, sum(o.order_amount) as total_amount
  FROM orders o
  JOIN customers c ON o.customer_id = c.customer_id
  WHERE o.order_date > '2023-01-01'
  GROUP BY c.customer_name
""")

4. 性能优化最佳实践

  • 数据格式选择:使用列式存储格式(Parquet、ORC)
  • 分区策略:根据查询模式选择合适的分区键
  • 预聚合:对常用查询创建物化视图
  • 缓存管理:缓存频繁使用的表或查询结果
  • SQL Hint:使用查询提示指导优化器

通过综合应用这些优化技术,可以显著提升Spark SQL查询性能,特别是在处理大规模数据集时效果更为明显。

Q9: 当Spark应用出现故障或性能问题时,如何进行排查和解决?

标准答案:

Spark应用故障排查是一项系统性工作,需要从多个维度收集信息,分析根因,并采取相应的解决措施。

1. 故障排查方法论

有效的故障排查需要遵循以下方法论:

  • 系统性分析:从应用、集群到资源全面考虑
  • 数据驱动:基于日志和监控数据进行分析
  • 逐层排除:从外到内或从内到外逐层排查
  • 复现验证:尝试复现问题并验证解决方案

2. 排查步骤详解

  1. 查看Spark UI

    • 分析失败的Stage和Task
    • 检查Job执行时间和资源使用情况
    • 识别异常的执行模式(如数据倾斜)
  2. 检查日志信息

    • Driver日志:应用级别错误和异常
    • Executor日志:任务执行错误
    • Worker/Master日志:集群级别问题
    • 系统日志:资源和环境问题
  3. 资源监控分析

    • CPU使用率:是否存在计算瓶颈
    • 内存使用:是否存在OOM或GC问题
    • 磁盘I/O:是否存在存储瓶颈
    • 网络传输:是否存在网络瓶颈
  4. 常见问题诊断

问题类型症状诊断方法可能解决方案
OOM错误java.lang.OutOfMemoryError检查GC日志,内存使用趋势增加内存,调整分区,优化代码
数据倾斜少数任务执行时间远长于其他任务查看Stage详情,分析数据分布加盐处理,预聚合,调整分区
序列化错误java.io.NotSerializableException检查类的序列化实现实现Serializable接口,使用@transient注解
Shuffle失败FetchFailedException检查Shuffle写入和读取日志增加内存,调整Shuffle参数
资源不足任务排队,执行缓慢查看集群资源使用情况增加资源,优化资源分配

3. 性能问题排查工具

# 查看Spark应用日志
yarn logs -applicationId application_1234567890_0001

# 使用jstack查看JVM线程状态
jstack <pid> > thread_dump.txt

# 使用jmap查看内存使用
jmap -heap <pid>

# 使用Spark History Server查看历史应用
http://history-server:18080

4. 常见问题解决方案

// 解决OOM问题
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.memory.fraction", "0.8")
spark.conf.set("spark.memory.storageFraction", "0.3")

// 解决Shuffle问题
spark.conf.set("spark.shuffle.file.buffer", "64k")
spark.conf.set("spark.reducer.maxSizeInFlight", "96m")
spark.conf.set("spark.shuffle.io.maxRetries", "10")

// 解决数据倾斜
// 参见数据倾斜解决方案

5. 预防措施

  • 监控系统:建立应用和集群监控
  • 性能测试:在生产环境前进行压力测试
  • 渐进式部署:先小规模测试,再扩大规模
  • 容量规划:根据数据增长预估资源需求

通过系统性的故障排查和性能优化,可以提高Spark应用的稳定性和效率,减少生产环境中的问题发生。

深度技术原理题

Q10: 请详细解释Spark的Catalyst优化器的工作原理及其优化规则。

标准答案:

Catalyst优化器是Spark SQL的核心优化引擎,它基于Scala的模式匹配和函数式编程特性构建,为Spark SQL提供了强大的查询优化能力。理解Catalyst的工作原理对于编写高效的Spark SQL应用至关重要。

1. Catalyst优化器架构

Catalyst优化器的核心架构包括以下组件:

  • 树节点转换框架:基于Scala模式匹配的树转换机制
  • 规则执行引擎:应用优化规则的执行器
  • 成本模型:评估不同执行计划性能的模型
  • 代码生成引擎:将物理计划转换为高效执行代码

2. 优化流程详解

graph TD
    A[SQL/DataFrame API] --> B["抽象语法树(AST)"]
    B --> C[未解析逻辑计划]
    C --> D[解析逻辑计划]
    D --> E[优化逻辑计划]
    E --> F[物理计划生成]
    F --> G[物理计划优化]
    G --> H[选择最优计划]
    H --> I[代码生成]
    I --> J[执行]
    
    style A fill:#f9f9f9,stroke:#333
    style B fill:#d4f1f9,stroke:#05a4d1
    style C fill:#d4f1f9,stroke:#05a4d1
    style D fill:#ffe6cc,stroke:#d79b00
    style E fill:#ffe6cc,stroke:#d79b00
    style F fill:#d5e8d4,stroke:#82b366
    style G fill:#d5e8d4,stroke:#82b366
    style H fill:#d5e8d4,stroke:#82b366
    style I fill:#e1d5e7,stroke:#9673a6
    style J fill:#f8cecc,stroke:#b85450

3. 优化阶段详细说明

  1. 语法分析

    • 将SQL语句解析为抽象语法树(AST)
    • 使用ANTLR语法解析器处理SQL语法
    • 转换DataFrame/Dataset API调用为内部表示
  2. 逻辑计划生成与解析

    • 将AST转换为未解析逻辑计划
    • 通过Catalog解析表名、列名和函数名
    • 进行类型推断和类型检查
  3. 逻辑计划优化

    • 应用基于规则的优化策略
    • 优化转换是声明式的,基于模式匹配
    • 多轮应用规则直至计划稳定
  4. 物理计划生成

    • 将逻辑算子转换为物理算子
    • 为同一逻辑操作生成多种物理实现
    • 如Sort可实现为SortExec或ExternalSortExec
  5. 物理计划优化与选择

    • 使用基于成本的优化器评估计划
    • 考虑数据大小、操作复杂度等因素
    • 选择成本最低的执行计划
  6. 代码生成

    • 使用Janino编译器生成Java字节码
    • 将多个操作融合为单个函数
    • 减少虚函数调用和解释开销

4. 核心优化规则详解

优化规则描述示例
谓词下推将过滤条件尽早应用,减少数据量SELECT * FROM t1 JOIN t2 WHERE t1.id > 100 → 先过滤t1.id > 100再Join
列裁剪只读取和处理查询所需的列SELECT name FROM (SELECT id, name, age FROM users) → 只读取name列
常量折叠编译时计算常量表达式SELECT id + 5 * 10 FROM tSELECT id + 50 FROM t
Join重排序优化多表Join的顺序小表先Join,减少中间结果
Join选择根据表大小选择Join策略小表使用BroadcastHashJoin,大表使用SortMergeJoin
分区裁剪只读取包含所需数据的分区WHERE date='2023-01-01' → 只读取该日期的分区
聚合优化部分聚合+最终聚合先在每个分区内聚合,再全局聚合

5. 代码示例:Catalyst转换过程

// 示例查询
val query = spark.sql("""
  SELECT c.name, sum(o.amount) as total
  FROM orders o
  JOIN customers c ON o.customer_id = c.id
  WHERE o.date > '2023-01-01'
  GROUP BY c.name
  HAVING sum(o.amount) > 1000
""")

// 查看逻辑计划
println("Logical Plan:")
query.queryExecution.logical.explain(true)

// 查看优化后的逻辑计划
println("Optimized Logical Plan:")
query.queryExecution.optimizedPlan.explain(true)

// 查看物理计划
println("Physical Plan:")
query.queryExecution.sparkPlan.explain(true)

// 查看执行计划
println("Executed Plan:")
query.queryExecution.executedPlan.explain(true)

6. 自定义优化规则

Catalyst允许开发者扩展优化规则,实现自定义优化:

// 自定义优化规则示例
object MyOptimizationRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(condition, child) if canOptimize(condition) =>
      // 自定义优化逻辑
      OptimizedFilter(optimizeCondition(condition), child)
  }
}

// 注册自定义规则
spark.experimental.extraOptimizations = Seq(MyOptimizationRule)

Catalyst优化器是Spark SQL性能优越的关键因素,通过理解其工作原理和优化规则,可以编写更高效的Spark SQL应用,并在必要时通过自定义规则进一步提升性能。

Q11: 请详细介绍Spark内存管理的演进历史,旧版本和新版本的区别是什么?

标准答案:

Spark内存管理机制经历了重要的演进过程,从早期的静态内存管理到现代的统一内存管理,显著提升了内存利用率和应用性能。

1. 内存管理演进的背景

Spark作为内存计算框架,其性能很大程度上取决于内存管理效率。早期版本的内存管理机制存在诸多问题,如内存划分固定、配置复杂、资源利用率低等,这促使Spark团队不断优化内存管理机制。

2. 静态内存管理(Static Memory Management)

静态内存管理是Spark 1.6之前的默认模式,其特点是:

  • 固定内存划分:预先为不同用途划分固定比例的内存
  • 严格边界:各内存区域之间不能动态调整
  • 手动配置:需要用户手动调整多个参数

静态内存划分

+---------------------------+---------------------------+---------------+
|     Storage Memory        |     Execution Memory      |  Other Memory |
| spark.storage.memoryFraction  | spark.shuffle.memoryFraction |  Remainder    |
|        (默认0.6)          |        (默认0.2)          |    (0.2)      |
+---------------------------+---------------------------+---------------+

关键参数

// 静态内存管理关键参数
spark.storage.memoryFraction = 0.6  // 缓存RDD数据的内存比例
spark.storage.unrollFraction = 0.2  // 用于展开RDD的内存比例
spark.shuffle.memoryFraction = 0.2  // Shuffle操作的内存比例

3. 统一内存管理(Unified Memory Management)

统一内存管理是Spark 1.6及之后的默认模式,其特点是:

  • 动态内存共享:Storage和Execution内存可以相互借用
  • 简化配置:减少配置参数,更易使用
  • 自适应调整:根据运行时需求动态分配内存

统一内存划分

+--------------------+----------------------+---------------+
|      Reserved      |      User Memory     |  Spark Memory |
|      (300MB)       | (1-spark.memory.fraction) | spark.memory.fraction |
+--------------------+----------------------+---------------+
                                            |
                     +----------------------+---------------+
                     |    Storage Memory    | Execution Memory |
                     | spark.memory.storageFraction | Remainder |
                     +----------------------+---------------+

关键参数

// 统一内存管理关键参数
spark.memory.fraction = 0.75  // Spark Memory占JVM堆内存的比例
spark.memory.storageFraction = 0.5  // Storage Memory初始占比

4. 两种模式的核心区别

特性静态内存管理统一内存管理
内存划分固定比例,不可调整动态共享,可相互借用
配置复杂度多参数,调优复杂少量参数,简化配置
内存利用率较低,常有浪费较高,按需分配
适用场景负载稳定,可预测多样化负载,资源竞争
溢出处理直接溢出到磁盘先尝试借用,再溢出
版本支持1.6之前默认1.6及之后默认

5. 内存借用机制详解

在统一内存管理模式下:

  1. Storage Memory不足时

    • 如果Execution Memory有空闲,可以借用
    • 如果没有空闲,则溢出到磁盘
  2. Execution Memory不足时

    • 如果Storage Memory有空闲,可以借用
    • 如果Storage Memory中有部分是被Execution Memory借用的,可以强制收回
    • Execution Memory不会淘汰Storage Memory中的数据

6. 代码示例:配置对比

// 静态内存管理配置示例
spark.storage.memoryFraction = 0.6
spark.storage.unrollFraction = 0.2
spark.shuffle.memoryFraction = 0.2

// 统一内存管理配置示例
spark.memory.fraction = 0.75
spark.memory.storageFraction = 0.5

7. 实际应用建议

  • 内存密集型计算:增加spark.memory.fraction,分配更多内存给Spark
  • 缓存优先场景:增加spark.memory.storageFraction,提高缓存容量
  • Shuffle密集场景:降低spark.memory.storageFraction,提供更多执行内存
  • 监控内存使用:通过Spark UI监控内存使用情况,及时调整参数

Spark内存管理的演进体现了分布式计算框架对资源利用效率的不断追求。统一内存管理机制显著提升了Spark的内存利用率,减少了OOM错误,简化了配置,是Spark性能优化的重要里程碑。

Q12: 请详细对比Spark Streaming和Structured Streaming的区别,并说明各自的适用场景。

标准答案:

Spark提供了两种流处理引擎:Spark Streaming和Structured Streaming,它们在设计理念、API、处理模型等方面存在显著差异。深入理解这些差异对于选择合适的流处理技术至关重要。

1. 基本概念对比

  • Spark Streaming: Spark 0.7版本引入的第一代流处理引擎,基于RDD模型,采用微批处理架构,将流数据分割成小批次进行处理。

  • Structured Streaming: Spark 2.0版本引入的第二代流处理引擎,基于Spark SQL引擎,将流数据视为无界表,提供更高级的API和优化。

2. 核心特性对比

特性Spark StreamingStructured Streaming差异影响
处理模型微批处理(DStream)连续处理(无界表)Structured Streaming可实现更低延迟
API抽象DStream APIDataFrame/Dataset APIStructured Streaming使用更统一的API
编程模型函数式转换声明式查询Structured Streaming代码更简洁
容错机制WAL + Checkpoint状态存储 + Checkpoint两者都支持容错,但实现方式不同
延迟秒级毫秒级(连续处理模式)Structured Streaming可实现更低延迟
状态管理updateStateByKey/mapWithState内置状态管理Structured Streaming状态管理更强大
事件时间有限支持原生支持Structured Streaming更适合事件时间处理
水印机制不支持支持Structured Streaming能更好处理乱序数据
输出模式固定模式Complete/Append/UpdateStructured Streaming提供更灵活的输出选项
优化器Catalyst优化器Structured Streaming查询性能更高
端到端一致性至少一次精确一次Structured Streaming提供更强的一致性保证

3. 代码示例对比

Spark Streaming示例

// 创建StreamingContext
val ssc = new StreamingContext(sparkContext, Seconds(1))

// 从Kafka读取数据
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// 处理数据
val wordCounts = kafkaStream
  .map(record => record.value)
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// 输出结果
wordCounts.print()

// 启动流处理
ssc.start()
ssc.awaitTermination()

Structured Streaming示例

// 从Kafka读取数据
val kafkaStream = spark.readStream
          .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic")
          .load()

// 处理数据
val wordCounts = kafkaStream
  .selectExpr("CAST(value AS STRING)")
  .as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

// 输出结果
val query = wordCounts.writeStream
  .outputMode("complete")
          .format("console")
          .start()
      
query.awaitTermination()

4. 高级特性对比

事件时间处理

  • Spark Streaming:需要手动实现,较为复杂
  • Structured Streaming:内置支持
    // 使用事件时间窗口
    val windowedCounts = kafkaStream
      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
      .as[(String, Timestamp)]
      .flatMap(record => record._1.split(" ").map((_, record._2)))
      .groupBy(
        window($"_2", "10 minutes", "5 minutes"),
        $"_1"
      )
      .count()
    

水印机制

  • Spark Streaming:不支持
  • Structured Streaming:内置支持
  // 添加水印
  val windowedCounts = kafkaStream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
      window($"timestamp", "10 minutes", "5 minutes"),
      $"value"
    )
    .count()

5. 性能与扩展性对比

方面Spark StreamingStructured Streaming
查询优化无内置优化器使用Catalyst优化器
内存使用基于RDD基于Tungsten
吞吐量更高(得益于优化)
延迟取决于批次间隔可配置(微批或连续)
扩展性水平扩展水平扩展

6. 适用场景建议

选择Spark Streaming的场景

  • 与现有RDD代码集成
  • 简单的流处理需求
  • 对API稳定性要求高(API较为稳定)
  • 需要自定义复杂的转换操作
  • 与遗留系统集成

选择Structured Streaming的场景

  • 新项目开发
  • 需要低延迟处理
  • 需要事件时间处理和水印支持
  • 需要强一致性保证
  • 复杂的流处理需求(如窗口聚合、会话分析)
  • 与Spark SQL生态系统集成

7. 未来发展趋势

Structured Streaming是Spark流处理的未来发展方向,Spark团队将主要精力投入到Structured Streaming的改进和优化中。对于新项目,建议优先考虑Structured Streaming,除非有特殊需求只能使用Spark Streaming。

总体而言,Structured Streaming相比Spark Streaming提供了更高级的API、更强大的功能和更好的性能,是Spark流处理技术的重要进步。

Q13: 请详细解释Spark如何实现Exactly-Once语义,包括原理和实现方式。

标准答案:

在分布式流处理系统中,数据处理语义是一个核心问题,特别是在面对节点故障和网络分区等情况时。Exactly-Once语义是指每条数据被精确处理一次,不多不少,这是最强的一致性保证。Spark通过多层机制实现了Exactly-Once语义。

1. 数据处理语义级别

首先理解三种主要的数据处理语义:

语义级别描述实现难度性能影响
At-most-once数据最多处理一次,可能丢失几乎无影响
At-least-once数据至少处理一次,可能重复轻微影响
Exactly-once数据精确处理一次,不丢失不重复可能显著影响

2. Spark实现Exactly-Once的核心机制

Spark通过以下多层机制共同保证Exactly-Once语义:

2.1 数据源端保证

  • 可重放的数据源:使用支持偏移量跟踪的数据源(如Kafka)
  • 偏移量管理:精确记录和管理已处理数据的偏移量
  • 断点续传:从上次处理的位置继续处理

2.2 处理过程保证

  • 确定性操作:确保相同输入产生相同输出
  • Checkpoint机制:定期保存计算状态和进度
  • WAL(预写日志):记录所有状态变更操作
  • 幂等性转换:确保重复执行不会产生副作用

2.3 数据输出保证

  • 幂等性写入:确保重复写入不会导致数据重复
  • 事务性写入:使用支持事务的存储系统
  • 两阶段提交:确保数据处理和结果写入的原子性
  • 输出提交协议:将输出操作与偏移量提交绑定

3. Structured Streaming中的Exactly-Once实现

Structured Streaming通过以下方式实现Exactly-Once:

graph TD
    A[数据源] --> B[偏移量跟踪]
    B --> C[微批处理]
    C --> D[状态管理]
    D --> E[输出模式]
    E --> F[事务性Sink]
    
    B -.-> G[WAL/Checkpoint]
    D -.-> G
    F -.-> G
    
    style A fill:#d4f1f9,stroke:#05a4d1
    style B fill:#ffe6cc,stroke:#d79b00
    style C fill:#d5e8d4,stroke:#82b366
    style D fill:#e1d5e7,stroke:#9673a6
    style E fill:#f8cecc,stroke:#b85450
    style F fill:#f8cecc,stroke:#b85450
    style G fill:#fff2cc,stroke:#d6b656

3.1 详细实现机制

  1. 偏移量跟踪

    • 记录每个数据源的读取位置
    • 将偏移量与处理结果关联
    • 在Checkpoint中保存偏移量信息
  2. 状态管理

    • 使用状态存储保存中间状态
    • 定期Checkpoint状态到可靠存储
    • 故障恢复时重建状态
  3. 输出提交协议

    • 将结果写入与偏移量提交绑定
    • 确保原子性:要么都成功,要么都失败
    • 使用事务性Sink或幂等性写入

4. 代码示例:实现Exactly-Once语义

4.1 使用事务性输出

// 使用foreachBatch实现事务性输出
val query = inputStream
  .writeStream
  .foreachBatch { (batchDF, batchId) =>
    // 开启事务
    val connection = getConnection()  // 获取数据库连接
    connection.beginTransaction()
    try {
      // 使用batchId确保幂等性
      val outputPath = s"output/batch_$batchId"
      
      // 删除可能存在的旧数据(幂等性保证)
      connection.executeUpdate(s"DELETE FROM results WHERE batch_id = $batchId")
      
      // 写入新数据
      batchDF.write
        .format("jdbc")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("url", jdbcUrl)
        .option("dbtable", "results")
        .option("user", user)
        .option("password", password)
        .mode("append")
        .save()
        
      // 提交事务
      connection.commit()
    } catch {
      case e: Exception =>
        // 回滚事务
        connection.rollback()
        throw e
    } finally {
      connection.close()
    }
  }
  .option("checkpointLocation", "/checkpoint")
  .start()

4.2 使用幂等性写入

// 使用幂等性写入
val query = inputStream
  .writeStream
  .foreachBatch { (batchDF, batchId) =>
    // 使用批次ID作为唯一标识,确保幂等性
    val outputPath = s"output/batch_$batchId"
    
    // 覆盖写入,确保幂等性
    batchDF.write
      .mode("overwrite")  // 覆盖模式确保幂等性
      .parquet(outputPath)
      
    // 可选:更新元数据表记录已处理的批次
    spark.sql(s"""
      MERGE INTO batch_metadata
      USING (SELECT $batchId as id) AS source
      ON batch_metadata.id = source.id
      WHEN MATCHED THEN UPDATE SET processed_time = current_timestamp()
      WHEN NOT MATCHED THEN INSERT (id, processed_time) VALUES ($batchId, current_timestamp())
    """)
  }
  .option("checkpointLocation", "/checkpoint")
  .start()

5. 不同场景下的Exactly-Once实现

场景实现方式关键点
Kafka → HDFSWAL + 幂等性写入使用批次ID作为文件名或路径
Kafka → 数据库两阶段提交将偏移量与数据库事务绑定
Kafka → Kafka事务性API使用Kafka事务API
有状态操作Checkpoint + 状态存储定期Checkpoint状态

6. 实现Exactly-Once的最佳实践

  1. 选择合适的数据源:使用支持重放和偏移量管理的数据源
  2. 启用Checkpoint:配置可靠的Checkpoint存储
  3. 使用幂等性操作:确保重复执行不会产生副作用
  4. 事务性输出:使用支持事务的输出系统
  5. 错误处理:妥善处理异常,避免部分提交
  6. 监控与验证:建立监控机制验证Exactly-Once语义

7. 限制与挑战

  • 性能开销:实现Exactly-Once通常会带来一定性能开销
  • 外部系统限制:依赖外部系统对事务的支持
  • 复杂性增加:实现和维护更为复杂
  • 调试难度:问题排查更具挑战性

Spark的Exactly-Once语义实现是一个多层次的系统,通过结合多种机制共同保证数据处理的一致性。理解这些机制有助于构建可靠的流处理应用,并在面对故障时保持数据一致性。

故障排查与运维题

Q1: 如何诊断和解决Spark应用中的性能瓶颈和数据倾斜问题?

标准答案:

Spark应用性能问题主要分为资源瓶颈、数据倾斜、代码优化等几个方面,需要系统性地进行诊断和解决。

1. 性能问题诊断方法

Spark UI分析

  • 作业视图:检查整体执行时间、失败任务、重试次数
  • Stage视图:分析每个Stage的执行时间,找出耗时最长的Stage
  • Task视图:查看任务执行时间分布,识别异常值(数据倾斜的标志)
  • Storage视图:检查缓存使用情况和内存压力
  • Executors视图:监控资源使用情况、GC活动

日志分析

// 在代码中添加性能计时器
val startTime = System.currentTimeMillis()
val result = dataFrame.groupBy("key").count()
val endTime = System.currentTimeMillis()
println(s"Operation took ${endTime - startTime} ms")

// 启用SQL执行统计
spark.conf.set("spark.sql.execution.planChangeLog.level", "WARN")

2. 数据倾斜问题解决方案

识别数据倾斜

// 检查键分布
val keyDistribution = rdd
  .map(x => (x._1, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(10)

println("Top 10 keys by frequency:")
keyDistribution.foreach(println)

解决方案

(1) 预聚合 + 二次聚合

// 原始代码(可能倾斜)
val result = rdd.reduceByKey(_ + _)

// 优化后(两阶段聚合)
val localAggregated = rdd.mapPartitions(iter => {
  iter.toList.groupBy(_._1).mapValues(_.map(_._2).sum).iterator
})
val result = localAggregated.reduceByKey(_ + _)

(2) 加盐处理

// 对倾斜的Key加盐
val skewedKeys = Set("hot_key1", "hot_key2") // 预先识别的热点键
val saltedRDD = rdd.map { case (key, value) =>
  if (skewedKeys.contains(key)) {
    val salt = Random.nextInt(10)
    ((key, salt), value)
  } else {
    ((key, 0), value)
  }
}

// 第一次聚合
val firstAgg = saltedRDD.reduceByKey(_ + _)

// 去盐,第二次聚合
val result = firstAgg
  .map { case ((key, salt), value) => (key, value) }
  .reduceByKey(_ + _)

(3) 广播变量优化Join

// 原始Join(可能倾斜)
val joinResult = largeRDD.join(smallRDD)

// 优化:广播小RDD
val smallMap = smallRDD.collectAsMap()
val broadcastMap = spark.sparkContext.broadcast(smallMap)

val joinResult = largeRDD.mapPartitions { iter =>
  iter.flatMap { case (k, v1) =>
    broadcastMap.value.get(k) match {
      case Some(v2) => Iterator((k, (v1, v2)))
      case None => Iterator.empty
    }
  }
}

3. 资源配置优化

内存配置

# 调整内存配置
spark-submit \
  --executor-memory 8g \
  --executor-cores 4 \
  --driver-memory 4g \
  --conf spark.memory.fraction=0.8 \
  --conf spark.memory.storageFraction=0.3 \
  --conf spark.executor.memoryOverhead=2g \
  --conf spark.dynamicAllocation.enabled=true

并行度调整

// 设置适当的并行度
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.default.parallelism", 200)

// 或者根据数据量动态计算
val optimalPartitions = Math.max(200, totalSizeInGB * 2)
dataFrame.repartition(optimalPartitions)

4. 监控与预防措施

监控指标

  • CPU利用率:保持在70-80%为佳
  • 内存使用率:避免频繁GC和OOM
  • Shuffle数据量:减少大量数据移动
  • 任务执行时间分布:标准差小于平均值的20%为佳

性能调优最佳实践

  • 定期进行性能基准测试
  • 建立关键指标监控仪表板
  • 实施自动告警机制
  • 对关键任务进行性能分析和优化
  • 维护性能优化知识库和最佳实践

Q2: 如何构建高可用的Spark生产环境,并实施有效的监控和故障恢复策略?

标准答案:

构建高可用的Spark生产环境需要从架构设计、监控系统、故障恢复和运维流程等多方面入手。

1. 高可用架构设计

Spark HA部署架构

graph TD
    A[Spark应用] --> B[Spark HA]
    B --> C1[主Master]
    B --> C2[备用Master]
    C1 --> D[ZooKeeper集群]
    C2 --> D
    C1 --> E1[Worker1]
    C1 --> E2[Worker2]
    C1 --> E3[Worker3]
    E1 --> F1[Executor]
    E2 --> F2[Executor]
    E3 --> F3[Executor]
    
    style A fill:#d4f1f9
    style B fill:#d5e8d4
    style C1 fill:#ffe6cc
    style C2 fill:#ffe6cc
    style D fill:#f8cecc

YARN集成配置

<!-- yarn-site.xml -->
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
# Spark on YARN提交配置
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.maxAppAttempts=4 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --conf spark.yarn.max.executor.failures=10 \
  --conf spark.yarn.executor.failuresValidityInterval=1h

2. 全面监控系统

多层次监控架构

  • 基础设施监控:服务器CPU、内存、磁盘、网络
  • Spark应用监控:作业执行、资源使用、GC活动
  • 业务指标监控:数据处理量、延迟、错误率

监控工具集成

# Prometheus配置示例
scrape_configs:
  - job_name: 'spark_metrics'
    static_configs:
      - targets: ['spark-master:7077', 'spark-worker-1:8081', 'spark-worker-2:8081']
    metrics_path: /metrics
    scheme: http

  - job_name: 'spark_applications'
    static_configs:
      - targets: ['spark-app-exporter:9091']

自定义监控指标

// 注册自定义Spark监听器
val listener = new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    val jobDuration = jobEnd.time - jobStart.getOrElse(jobEnd.jobId, jobEnd.time)
    metrics.gauge("spark.job.duration").update(jobDuration)
    
    if (jobEnd.jobResult == JobSucceeded) {
      metrics.counter("spark.job.success").inc()
    } else {
      metrics.counter("spark.job.failure").inc()
    }
  }
}

spark.sparkContext.addSparkListener(listener)

告警策略

# Alertmanager配置
rules:
  - alert: SparkJobFailure
    expr: rate(spark_job_failure[5m]) > 0
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "Spark作业失败"
      description: "{{ $labels.job }} 在过去5分钟内有作业失败"

  - alert: SparkExecutorHighMemoryUsage
    expr: spark_executor_memory_used_bytes / spark_executor_memory_max_bytes > 0.9
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Executor内存使用率高"
      description: "Executor {{ $labels.executor_id }} 内存使用率超过90%"

3. 故障恢复机制

应用级恢复

// 设置Checkpoint目录
sc.setCheckpointDir("hdfs://namenode:8020/spark/checkpoints")

// 对关键RDD进行Checkpoint
val result = rdd.map(complexTransformation)
                .filter(filterCondition)
                .checkpoint()

流处理恢复

// Structured Streaming故障恢复配置
val query = df.writeStream
  .format("kafka")
  .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/kafka-stream")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .start()

自动重启策略

# 使用supervisord管理Spark应用
[program:spark-streaming-app]
command=/opt/spark/bin/spark-submit --master yarn --deploy-mode cluster /path/to/app.jar
autostart=true
autorestart=true
startretries=10
startsecs=10
redirect_stderr=true
stdout_logfile=/var/log/spark-app.log

4. 运维最佳实践

部署流程自动化

# CI/CD流水线示例 (GitLab CI)
stages:
  - build
  - test
  - deploy

build_jar:
  stage: build
  script:
    - sbt clean assembly
  artifacts:
    paths:
      - target/scala-2.12/app.jar

run_tests:
  stage: test
  script:
    - sbt test

deploy_to_prod:
  stage: deploy
  script:
    - ansible-playbook -i inventory/prod deploy-spark-app.yml
  environment:
    name: production
  when: manual

滚动升级策略

  • 使用蓝绿部署或金丝雀发布
  • 先升级部分节点验证稳定性
  • 设置超时和回滚机制

灾备与数据恢复

  • 跨区域备份关键数据
  • 定期演练故障恢复流程
  • 维护详细的运维手册和故障处理流程

5. 性能与可靠性平衡

资源隔离

# YARN资源队列配置
spark-submit \
  --queue production_critical \
  --conf spark.yarn.am.nodeLabelExpression="production" \
  --conf spark.yarn.executor.nodeLabelExpression="production"

优雅降级策略

// 实现优雅降级逻辑
def processData(data: DataFrame): DataFrame = {
  try {
    // 完整处理逻辑
    data.groupBy("key")
      .agg(
        sum("value").as("total_value"),
        countDistinct("user_id").as("unique_users")
      )
  } catch {
    case e: OutOfMemoryError =>
      logger.warn("内存不足,切换到简化处理逻辑", e)
      // 简化处理逻辑
      data.groupBy("key")
        .agg(sum("value").as("total_value"))
  }
}

通过以上全面的高可用架构设计、监控系统、故障恢复机制和运维最佳实践,可以构建一个稳定可靠的Spark生产环境,确保关键业务应用的连续性和可靠性。