12.Spark

12.Spark

12. Spark 目录 点击展开目录 12. Spark 目录 Spark 概述与环境 Spark简介 Spark特点与优势 Spark vs Hadoop MapReduce Spark应用场景 Spark生态系统 核心组件 Spark 核心概念 RDD核心概念 RDD特性 RDD操作分类 RDD依赖关系 DataFrame与Dataset DataFrame概念 Dataset概念 三者对比分析 分区机制 分区策略 分区调优 Spark 架构与原理 整体架构设计 系统架构总览 部署架构模式 核心组件原理 SparkContext - 应用程序入口 Driver Program - 驱动程序 Cluster Manager - 集群管理器 Executor - 任务执行器 任务调度机制 调度框架总览 DAG调度器原理 任务调度器实现 本地性调度策略 推测执行与容错 存储与内存管理 BlockManager存储引擎 统一内存管理 缓存与持久化策略 Spark 1.6内存管理演进与参数配置 YARN Container OOM实战排查 Shuffle数据交换 Shuffle机制原理 Shuffle性能优化 数据倾斜处理 容错与可靠性 血缘关系容错 Checkpoint检查点 故障恢复机制 资源管理与通信 调度算法策略 动态资源分配 RPC通信机制 序列化与网络传输 Spark SQL与Catalyst Spark SQL概述 主要特性 Catalyst优化器 Catalyst架构原理 优化流程详解 核心优化规则 深入优化规则实现 成本优化器(CBO) 代码生成引擎 自适应查询执行(AQE) 优化器扩展与定制 SparkSQL 实用函数与语法 数据采样与查看 日期与时间处理 字符串处理 直接文件查询(File-based Query) 数组与集合操作 JSON处理 条件与判断 条件计数与聚合 唯一ID生成方法 窗口函数 聚合函数 UDF/UDAF 注册与使用 性能调优与优化 写出排序优化 查询与作业优化 Join优化 缓存与持久化 代码层面优化 网络与I/O优化 常见性能问题 监控与诊断 常见错误解决方案 内存相关错误 网络相关错误 序列化相关错误 资源相关错误 数据相关错误 调试和诊断工具 预防措施 关键参数与配置模板 JVM相关参数 Spark高频面试题 基础概念题 架构原理题 性能调优题 实战应用题 深度技术原理题 故障排查与运维题 Spark 概述与环境 Spark简介 Apache Spark 是一个快速、通用的大数据处理引擎,专为大规模数据处理而设计。它提供了高级API(Java、Scala、Python、R),并支持用于SQL查询、流处理、机器学习和图形处理的优化引擎。 ...

December 24, 2025 · Ralph Wren · 浏览量: --
12.1 Spark 源码解析

12.1 Spark 源码解析

12.1 Spark源码解析 目录 点击展开目录 一、Spark核心架构与初始化 1.1 SparkContext初始化流程 1.2 运行环境构建 二、RDD设计与实现 2.1 RDD核心抽象 2.2 RDD五大特性 2.3 RDD操作执行 三、任务调度系统 3.1 DAGScheduler调度器 3.2 Stage划分算法 3.3 TaskScheduler任务调度 3.4 Task执行机制 3.5 任务分发与调度流程 3.6 容错与监控机制 3.7 失败重试机制 3.8 RDD血统恢复 四、内存管理系统 4.1 统一内存管理 4.2 算子内存存储 4.3 内存监控与优化 4.4 内存管理系统(高级特性) 4.5 统一内存管理(详细实现) 五、Shuffle机制实现 5.1 Sort Shuffle核心 5.2 UnsafeShuffleWriter 六、存储系统设计 6.1 BlockManager存储 6.2 缓存机制 七、网络通信系统 7.1 网络传输服务 7.2 Block传输机制 八、动态资源分配 8.1 资源分配策略 8.2 动态伸缩算法 九、Spark SQL执行引擎 9.1 Catalyst优化器核心 9.2 代码生成与执行 9.3 列式存储与向量化 9.4 自适应查询执行(AQE) 9.5 窗口函数实现原理(以 Lag 为例) 十、广播变量与累加器 10.1 广播变量实现机制 10.2 累加器源码分析 十一、检查点与容错机制 11.1 检查点机制实现 11.2 失败重试与血统恢复 十二、集群管理器集成 12.1 YARN集成源码 12.2 Kubernetes集成 一、Spark核心架构与初始化 1.1 SparkContext初始化流程 SparkContext初始化流程图 graph TD A[SparkContext构造] --> B[创建SparkConf配置] B --> C[创建SparkEnv运行环境] C --> D[创建StatusTracker状态跟踪器] D --> E[创建TaskScheduler任务调度器] E --> F[创建DAGScheduler DAG调度器] F --> G[启动TaskScheduler] G --> H[设置默认并行度] H --> I[SparkContext初始化完成] C --> C1[创建SerializerManager] C --> C2[创建BlockManager] C --> C3[创建MemoryManager] C --> C4[创建MetricsSystem] E --> E1[根据master创建调度器] E1 --> E2[Standalone模式] E1 --> E3[YARN模式] E1 --> E4[Local模式] style A fill:#e1f5fe style I fill:#e8f5e8 style C fill:#fff3e0 style F fill:#f3e5f5 1.2 运行环境构建 SparkContext初始化源码分析 // SparkContext.scala 核心初始化流程 class SparkContext(config: SparkConf) extends Logging { // 1. 创建SparkEnv - 核心运行环境 private val env: SparkEnv = { SparkEnv.createDriverEnv(conf, isLocal, listenerBus, numCores, mockOutputCommitCoordinator) } // 2. 创建状态跟踪器 private val statusTracker = new SparkStatusTracker(this, sparkUI) // 3. 创建任务调度器 private val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) private val taskScheduler = ts // 4. 创建DAG调度器 private val dagScheduler = new DAGScheduler(this) // 5. 启动任务调度器 taskScheduler.start() // 6. 设置默认并行度 private val defaultParallelism: Int = taskScheduler.defaultParallelism // 核心方法:创建RDD def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } // 核心方法:提交作业 def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { dagScheduler.runJob(rdd, func, partitions, callSite, resultHandler, localProperties.get) } } 二、RDD设计与实现 2.1 RDD核心抽象 RDD五大特性实现流程 graph LR A[RDD实例化] --> B[getPartitions获取分区列表] B --> C[compute定义计算函数] C --> D[getDependencies设置依赖关系] D --> E[partitioner设置分区器] E --> F[getPreferredLocations位置偏好] F --> G[RDD创建完成] style A fill:#e1f5fe style G fill:#e8f5e8 2.2 RDD五大特性 RDD源码核心实现 // RDD.scala 核心抽象 abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { // 五大特性的具体实现 // 1. 分区列表 protected def getPartitions: Array[Partition] // 2. 计算函数 def compute(split: Partition, context: TaskContext): Iterator[T] // 3. 依赖关系 protected def getDependencies: Seq[Dependency[_]] = deps // 4. 分区器(可选) @transient val partitioner: Option[Partitioner] = None // 5. 位置偏好(可选) protected def getPreferredLocations(split: Partition): Seq[String] = Nil // Transformation操作实现 def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) } def filter(f: T => Boolean): RDD[T] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[T, T]( this, (context, pid, iter) => iter.filter(cleanF), preservesPartitioning = true) } def reduceByKey(func: (T, T) => T): RDD[T] = self.withScope { reduceByKey(defaultPartitioner(self), func) } // Action操作实现 def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) } } 2.3 RDD操作执行 RDD操作执行流程图 graph TD A[RDD操作调用] --> B{操作类型} B -->|Transformation| C[创建新RDD] B -->|Action| D[触发作业执行] C --> C1[构建RDD血统] C1 --> C2[设置依赖关系] C2 --> C3[返回新RDD对象] C3 --> E[等待Action触发] D --> D1[调用SparkContext.runJob] D1 --> D2[DAGScheduler.runJob] D2 --> D3[构建DAG图] D3 --> D4[划分Stage] D4 --> D5[提交Task] D5 --> D6[Executor执行] D6 --> D7[返回结果] style C fill:#e8f5e8 style D fill:#ffebee style D3 fill:#fff3e0 style D6 fill:#e1f5fe 三、任务调度系统 3.1 DAGScheduler调度器 DAGScheduler作业提交流程图 graph TD A[用户调用Action] --> B[SparkContext.runJob] B --> C[DAGScheduler.runJob] C --> D[创建ActiveJob] D --> E[submitJob] E --> F[构建DAG图] F --> G[findMissingPartitions] G --> H[getMissingParentStages] H --> I{是否有父Stage} I -->|有| J[递归提交父Stage] I -->|无| K[submitMissingTasks] J --> L[等待父Stage完成] L --> K K --> M[创建TaskSet] M --> N[TaskScheduler.submitTasks] N --> O[分发Task到Executor] O --> P[Task执行完成] P --> Q[Stage完成] Q --> R[检查后续Stage] R --> S[Job完成] style A fill:#e1f5fe style F fill:#fff3e0 style K fill:#e8f5e8 style S fill:#c8e6c9 3.2 Stage划分算法 Stage划分算法流程图 graph TD A[开始Stage划分] --> B[从最终RDD开始] B --> C[遍历RDD依赖] C --> D{依赖类型} D -->|窄依赖| E[加入当前Stage] D -->|宽依赖| F[创建新Stage边界] E --> G[继续遍历父RDD] F --> H[创建ShuffleMapStage] G --> C H --> I[递归处理父RDD] I --> C C --> J{是否还有未处理RDD} J -->|是| C J -->|否| K[Stage划分完成] style A fill:#e1f5fe style F fill:#ffebee style H fill:#fff3e0 style K fill:#e8f5e8 DAGScheduler源码分析 // DAGScheduler.scala 核心调度逻辑 class DAGScheduler( private[scheduler] val sc: SparkContext, private[scheduler] val taskScheduler: TaskScheduler, listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, env: SparkEnv, clock: Clock = new SystemClock()) extends Logging { // 事件处理循环 private val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) // 提交作业的核心方法 def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter, Duration.Inf) waiter.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) throw exception } } // Stage划分核心算法 private def getOrCreateShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = { shuffleIdToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => // 递归创建父Stage getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => if (!shuffleIdToMapStage.contains(dep.shuffleId)) { createShuffleMapStage(dep, firstJobId) } } createShuffleMapStage(shuffleDep, firstJobId) } } // 查找缺失的父依赖 private def getMissingAncestorShuffleDependencies( rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = { val ancestors = new ArrayStack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] val waitingForVisit = new ArrayStack[RDD[_]] waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.pop() if (!visited(toVisit)) { visited += toVisit toVisit.dependencies.foreach { case shuffleDep: ShuffleDependency[_, _, _] => if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) { ancestors.push(shuffleDep) waitingForVisit.push(shuffleDep.rdd) } case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } } } ancestors } // 提交Stage private def submitStage(stage: Stage): Unit = { val jobId = activeJobForStage(stage) if (jobId.isDefined) { if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) if (missing.isEmpty) { submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } } } 3.3 内存监控与优化 内存存储状态监控 // 内存使用监控组件 class MemoryMonitor { // 监控Map的内存使用 def monitorMapMemory(map: SizeTrackingAppendOnlyMap[_, _]): MemoryUsage = { val estimatedSize = map.estimateSize() val currentMemory = map.currentMemory val maxMemory = map.maxMemory MemoryUsage( estimatedSize = estimatedSize, currentMemory = currentMemory, maxMemory = maxMemory, utilization = currentMemory.toDouble / maxMemory ) } // 监控Spill状态 def monitorSpillStatus(externalMap: ExternalAppendOnlyMap[_, _, _]): SpillStatus = { val spillCount = externalMap.spills.size val totalSpillSize = externalMap.spills.map(_.size).sum SpillStatus( spillCount = spillCount, totalSpillSize = totalSpillSize, averageSpillSize = if (spillCount > 0) totalSpillSize / spillCount else 0 ) } } case class MemoryUsage( estimatedSize: Long, currentMemory: Long, maxMemory: Long, utilization: Double) case class SpillStatus( spillCount: Int, totalSpillSize: Long, averageSpillSize: Long) 内存存储监控流程图 graph TD A[输入数据] --> B[PartitionedAppendOnlyMap] B --> C{内存是否足够?} C -->|是| D[内存聚合] C -->|否| E[Spill到磁盘] D --> F[返回结果] E --> G[ExternalAppendOnlyMap] G --> H[合并内存和磁盘数据] H --> F I[MemoryMonitor] --> B I --> G J[SpillMonitor] --> E 内存存储优化策略 // 内存分配优化 class MemoryOptimizer { // 动态调整内存阈值 def adjustMemoryThreshold( currentMemory: Long, maxMemory: Long, spillCount: Int): Long = { val utilization = currentMemory.toDouble / maxMemory if (utilization > 0.8 && spillCount > 0) { // 内存使用率高且有Spill,降低阈值 (maxMemory * 0.6).toLong } else if (utilization < 0.5 && spillCount == 0) { // 内存使用率低且无Spill,提高阈值 (maxMemory * 0.9).toLong } else { // 保持当前阈值 (maxMemory * 0.8).toLong } } // 优化Map初始容量 def optimizeInitialCapacity(dataSize: Long): Int = { val estimatedSize = (dataSize * 1.2).toInt math.max(64, math.min(estimatedSize, 1024 * 1024)) } } 3.4 TaskScheduler任务调度 DAG的生成与依赖分析 任务提交完整流程图: ...

December 24, 2025 · Ralph Wren · 浏览量: --
37.delta-lake

37.delta-lake

Delta Lake 技术指南 目录 点击展开目录 概述 核心概念 架构设计 核心特性 安装与配置 基本操作 高级功能 性能优化 最佳实践 故障排查 面试题 概述 Delta Lake 是由 Databricks 开源的存储层,为 Apache Spark 和大数据工作负载提供 ACID 事务、可扩展的元数据处理 和 统一的流批处理。它在现有数据湖之上构建了一个事务层,解决了传统数据湖的可靠性、性能和治理问题。 什么是 Delta Lake Delta Lake 是一个开源存储框架,它使数据湖能够提供数据仓库的可靠性。主要解决以下问题: 数据可靠性问题:传统数据湖缺乏 ACID 事务保证 数据质量问题:难以处理脏数据和数据不一致 性能问题:小文件过多,查询性能差 数据治理问题:缺乏 schema 演进和数据版本管理 核心价值 特性 传统数据湖 Delta Lake ACID 事务 ❌ 不支持 ✅ 完全支持 Schema 演进 ❌ 困难 ✅ 自动处理 时间旅行 ❌ 不支持 ✅ 支持版本回溯 数据质量 ❌ 难以保证 ✅ 内置校验 流批统一 ❌ 分离处理 ✅ 统一接口 性能优化 ❌ 手动维护 ✅ 自动优化 技术背景 Delta Lake 诞生于 Databricks 在构建大规模数据湖时遇到的实际问题。传统的数据湖虽然提供了灵活的存储能力,但在企业级应用中面临诸多挑战: ...

December 25, 2025 · Ralph Wren · 浏览量: --