我是靠谱客的博主 鲜艳小白菜,最近开发中收集的这篇文章主要介绍Spark容错分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

      • 1. RDD自身容错性
      • 2. checkpoint机制
        • **2.1 问题**
        • **2.2 描述**
        • **2.3 总结**
        • **2.4 Cache(persist)原理 源码分析**
          • 2.4.1 Spark示例
          • 2.4.2 分析
            • 2.4.2.1 persist()分析
        • 2.5 读取缓存/checkpoint原理分析
          • 详细分析
      • 附录

1. RDD自身容错性

RDD的Lineage记录的是:粗颗粒度的特定数据Transformation操作(如filter、map、join等)行为。

在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算。在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。

缺点:由于这样的粗颗粒的数据模型,限制了Spark的运用场合,所以Spark并不适用于全部高性能要求的场景。

2. checkpoint机制

2.1 问题

Spark 在生产环境下经常会面临 Transformation 的 RDD 非常多(例如一个Job 中包含1万个RDD) 或者是具体的 Transformation 产生的 RDD 本身计算特别复杂和耗时(例如计算时常超过1个小时) , 可能业务比较复杂,此时我们必需考虑对计算结果的持久化。如果采用 persists 把数据在内存中的话,虽然最快速但是也是最不可靠的;如果放在磁盘上也不是完全可靠的,例如磁盘会损坏,系统管理员可能会清空磁盘。

2.2 描述

在容错机制中,如果集群中一个节点死机了,而且运算窄依赖,则只需要把丢失的父RDD分区重算即可,不依赖于其他节点。但对宽依赖,则需要父RDD的所有分区都重算,这个代价就很昂贵了。因此,Spark 提供设置检查点的方式来保存Shuffle前的祖先RDD数据,将依赖关系删除。当数据丢失时,直接从检查点中恢复数据。为了确保检查点不会因为节点死机而丢失,检查点数据保存在磁盘中,通常是hdfs文件。

2.3 总结

检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助。lineage过长会造成容错成本过高。这样就不如在中间阶段做检查点容错,假设之后有节点出现故障而丢失分区。从做检查点的RDD开始重做Lineage,就会降低开销。

建议:做检查点的RDD最好是已缓存在内存中,否则保存检查点的过程还需要重新计算,产生I/O开销。

2.4 Cache(persist)原理 源码分析

2.4.1 Spark示例
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class demo9 {
private static String appName = "spark.demo";
private static String master = "local[*]";
public static void main(String[] args) {
JavaSparkContext sc = null;
try {
//初始化 JavaSparkContext
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
sc = new JavaSparkContext(conf);
//设置检查点存放目录,window为例
sc.setCheckpointDir("hdfs://10.47.85.213/check");
//从test.txt 构建rdd
JavaRDD<String> rdd = sc.textFile("test.txt");
JavaPairRDD<String, Integer> pairRDD = rdd.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
String[] arr = s.split("\s");
for (String ele : arr) {
list.add(new Tuple2<String, Integer>(ele, 1));
}
return list.iterator();
}
}).cache();
//为pairRDD设置检查点
pairRDD.checkpoint();
System.out.println("isCheckpointed:" + pairRDD.isCheckpointed());
System.out.println("checkpoint:" + pairRDD.getCheckpointFile());
pairRDD.collect();
System.out.println("isCheckpointed:" + pairRDD.isCheckpointed());
System.out.println("checkpoint:" + pairRDD.getCheckpointFile());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (sc != null) {
sc.close();
}
}
}
}
输出:
isCheckpointed:false
checkpoint:Optional.empty
isCheckpointed:true
checkpoint:Optional[file:/E:/check/6c933408-176a-4117-bfb1-6172b510e7be/rdd-2]
2.4.2 分析
2.4.2.1 persist()分析

我们从RDD.scala中的persist()【cache()方法内部调用的是persist(),使用默认的存储级别】方法入手,如下:


private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// 如果该RDD已经有了storage level,但是还和指定的storage level不相等,那么抛出异常,不支持在一个RDD分配了storage level之后再分配一个storage level
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
//标记这个RDD为persisting
sc.persistRDD(this)
}
//设置RDD的storage level
storageLevel = newLevel
this
}

上面代码主要是标记的作用,运行的时候具体看SparkContext的runJob方法,如下:

def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
// 如果是停止状态就抛出异常
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:n" + rdd.toDebugString)
}
//调用了dagScheduler的runJob方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
// ConsoleProgressBar 控制台输出的job进度条
progressBar.foreach(_.finishAll())
// 最终递归调用doCheckpoint来检查每个父RDD是否需要checkpoint
// checkpoint一般是存储数据到HDFS上,并切掉之前的RDD的lineage
// 以后的RDD若要重用的话都会先检查是否有checkpoint过
rdd.doCheckpoint()
}

这里在job完成之后做checkpoint,这就是上面说的在checkpoint之前做persist的原因,下面是docheckPoint方法详情,如下:

private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
// 该rdd是否已经调用doCheckpoint,如果还没有,则开始处理
if (!doCheckpointCalled) {
// 判断RDDCheckpointData是否已经定义了,如果已经定义了
doCheckpointCalled = true
if (checkpointData.isDefined) {
// 查看是否需要把该rdd的所有依赖即血缘全部checkpoint
if (checkpointAllMarkedAncestors) {
// Linestage上的每一个rdd递归调用该方法
dependencies.foreach(_.rdd.doCheckpoint())
}
// 调用RDDCheckpointData的checkpoint方法
checkpointData.get.checkpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}

接着查看RDDCheckpointData的checkpoint方法,如下:

final def checkpoint(): Unit = {
// 将checkpoint的状态从Initialized置为CheckpointingInProgress
RDDCheckpointData.synchronized {
if (cpState == Initialized) {
cpState = CheckpointingInProgress
} else {
return
}
}
// 调用子类的doCheckpoint,我们以ReliableCheckpointRDD为例,创建一个新的CheckpointRDD
val newRDD = doCheckpoint()
// 将checkpoint状态置为Checkpointed状态,并且改变rdd之前的依赖,设置父rdd为新创建的CheckpointRDD
RDDCheckpointData.synchronized {
cpRDD = Some(newRDD)
cpState = Checkpointed
rdd.markCheckpointed()
}
}

上面的方法中调用了doCheckpoint方法,因为RDDCheckpointData是一个抽象类,doCheckpoint方法是一个抽象方法,看RDDCheckpointData的子类如何实现。

RDDCheckpointData有两个子类:ReliableRDDCheckpointData和LocalRDDCheckpointData。

我们查看ReliableRDDCheckpointData中的doCheckpoint方法,如下:

protected override def doCheckpoint(): CheckpointRDD[T] = {
// 将rdd的数据写入HDFS中checkpoint目录,并且创建CheckpointRDD
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
// Optionally clean our checkpoint files if the reference is out of scope
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
rdd.context.cleaner.foreach { cleaner =>
cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
}
}
logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
newRDD
}

上面这段代码做了如下的事情:将rdd的数据写入HDFS中checkpoint目录,并且创建CheckpointRDD。

具体怎么写看writeRDDToCheckpointDirectory方法,如下:

def writeRDDToCheckpointDirectory[T: ClassTag](
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val checkpointStartTimeNs = System.nanoTime()
val sc = originalRDD.sparkContext
// Create the output path for the checkpoint
// 创建checkpoint输出目录
val checkpointDirPath = new Path(checkpointDir)
// 获取HDFS文件系统API接口
val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
// 创建目录
if (!fs.mkdirs(checkpointDirPath)) {
throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
}
// Save to file, and reload it as an RDD
// 将配置文件信息广播到所有节点
val broadcastedConf = sc.broadcast(
new SerializableConfiguration(sc.hadoopConfiguration))
// TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
// 重新启动一个job,将rdd的分区数据写入HDFS
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
// 如果rdd的partitioner不为空,则将partitioner写入checkpoint目录
if (originalRDD.partitioner.nonEmpty) {
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}
val checkpointDurationMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
logInfo(s"Checkpointing took $checkpointDurationMs ms.")
// 创建一个CheckpointRDD,该分区数目应该和原始的rdd的分区数是一样的
val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
if (newRDD.partitions.length != originalRDD.partitions.length) {
throw new SparkException(
s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
}
newRDD
}

2.5 读取缓存/checkpoint原理分析

详细分析

Spark RDD主要由Dependency、Partition、Partitioner组成,Partition是其中之一。一份待处理的原始数据会被按照相应的逻辑(例如jdbc和hdfs的split逻辑)切分成n份,每份数据对应到RDD中的一个Partition,Partition的数量决定了task的数量,影响着程序的并行度。

**1. ** 我们从Partition入手,Partition源码如下:

/**
* An identifier for a partition in an RDD.
*/
trait Partition extends Serializable {
/** Get the partition's index within its parent RDD*/
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}

Partition的定义很简单。Partition和RDD是伴生的,所以每一种RDD都有其对应的Partition实现,所以,分析Partition主要是分析其子类。

2. 在RDD.scala中,定义了很多方法,如:

//输入一个partition,对其代表的数据进行计算
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
//数据如何被split的逻辑
protected def getPartitions: Array[Partition]
//这个RDD的依赖——它的父RDD
protected def getDependencies: Seq[Dependency[_]] = deps
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
@transient val partitioner: Option[Partitioner] = None

其中的第二个方法,getPartitions()是数据源如何被切分的逻辑,返回值正是Partition,第一个方法compute()是消费切割后的Partition的方法,我们从getPartitions和compute方法入手。

3. RDD 是通过 iterator 来进行计算:每当 Task 运行的时候会调用 RDD 的 Compute 方法进行计算,而 Compute 方法会调用 iterator 方法。iterator()方法在RDD.scala中的源码如下:


final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
// 如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘,
// 如果是磁盘获取的,需要把block缓存在内存中
getOrCompute(split, context)
} else {
// 进行rdd partition的计算或者根据checkpoint读取数据
computeOrReadCheckpoint(split, context)
}
}

4. 这个方法是 final 级别【不能覆写但可以被子类去使用】,先看持久化的逻辑,我们可以看getOrCompute方法,这个方法从内存或者磁盘获取,如果从磁盘获取需要将block缓存到内存:


private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
val blockId = RDDBlockId(id, partition.index)
// 根据rdd id创建RDDBlockId
var readCachedBlock = true
// 是否从缓存的block读取
SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
// 如果执行到了这,说明没有获取到block,readCachedBlock设置成false,表示不能从cache中读取。
readCachedBlock = false
// 需要调用该函数重新计算或者从checkpoint读取
computeOrReadCheckpoint(partition, context)
}) match {
// 获取到了结果直接返回
case Left(blockResult) =>
// 如果从cache读取block
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
}

5. 其中getOrElseUpdate方法做了什么:如果指定的block存在,则直接获取,否则调用makeIterator方法去计算block,然后持久化最后返回值,代码如下:

 def getOrElseUpdate[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
// get方法 尝试从本地获取数据,如果获取不到则从远端获取
get[T](blockId)(classTag) match {
case Some(block) =>
return Left(block)
case _ =>
// Need to compute the block.
}
// 如果本地化和远端都没有获取到数据,则调用makeIterator计算,最后将结果写入block
doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
case None =>
// 表示写入成功
val blockResult = getLocalValues(blockId).getOrElse {
// 从本地获取数据块
releaseLock(blockId)
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
releaseLock(blockId)
Left(blockResult)
case Some(iter) =>
// 如果写入失败
// 如果put操作失败,表示可能是因为数据太大,无法写入内存,又无法被磁盘drop,因此我们需要返回这个iterator给调用者以至于他们能够做出决定这个值是什么,怎么办
Right(iter)
}
}

6. 通过get方法获取数据,先从本地获取数据,如果没有则从远端获取,代码如下:


def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
// 从本地获取block
val local = getLocalValues(blockId)
// 如果本地获取到了则返回
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
return local
}
// 如果本地没有获取到则从远端获取
val remote = getRemoteValues[T](blockId)
// 如果远端获取到了则返回,没有返回None
if (remote.isDefined) {
logInfo(s"Found block $blockId remotely")
return remote
}
None
}

7. 如何从本地获取block的逻辑在getLocalValues方法中,这个方法会从本地获取block,如果存在返回BlockResult,不存在返回None;如果storage level是磁盘,则还需将得到的block缓存到内存存储,方便下次读取,具体如下:

def getLocalValues(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
// 调用block info manager,锁定该block,然后读取block,返回该block 元数据block info
blockInfoManager.lockForReading(blockId) match {
// 没有读取到则返回None
case None =>
logDebug(s"Block $blockId was not found")
None
// 读取到block元数据
case Some(info) =>
val level = info.level
// 获取存储级别storage level
logDebug(s"Level for block $blockId is $level")
val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
if (level.useMemory && memoryStore.contains(blockId)) {
// 如果使用内存,且内存memory store包含这个block id
// 判断是不是storage level是不是反序列化的,如果是反序列化的,则调用MemoryStore的getValues方法
// 否则调用MemoryStore的getBytes然后反序列输入流返回数据作为迭代器
val iter: Iterator[Any] = if (level.deserialized) {
memoryStore.getValues(blockId).get
} else {
serializerManager.dataDeserializeStream(
blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
}
val ci = CompletionIterator[Any, Iterator[Any]](iter, {
releaseLock(blockId, taskAttemptId)
})
// 构建一个BlockResult对象返回,这个对象包括数据,读取方式以及字节大小
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) {
// 如果使用磁盘存储,且disk store包含这个block则从磁盘获取,并且把结果放入内存
val diskData = diskStore.getBytes(blockId)
// 调用DiskStore的getBytes方法,如果需要反序列化,则进行反序列
val iterToReturn: Iterator[Any] = {
if (level.deserialized) {
val diskValues = serializerManager.dataDeserializeStream(
blockId,
diskData.toInputStream())(info.classTag)
// 尝试将从磁盘读的溢写的值加载到内存,方便后续快速读取
maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
} else {
// 如果不需要反序列化,首先将读取到的流加载到内存,方便后续快速读取
val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
.map { _.toInputStream(dispose = false) }
.getOrElse { diskData.toInputStream() }
// 然后再返回反序列化之后的数据
serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
}
}
// 构建BlockResult返回
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
releaseLockAndDispose(blockId, diskData, taskAttemptId)
})
Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} else {
// 处理本地读取block失败,报告driver这是一个无效的block,将会删除这个block
handleLocalReadFailure(blockId)
}
}
}

8. 远端读取,即从block所存放的其他block manager(其他节点)获取block,逻辑如下:

private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
val ct = implicitly[ClassTag[T]]
getRemoteBytes(blockId).map {
// 将远程fetch的结果进行反序列化,然后构建BlockResult返回
data => val values =
serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
new BlockResult(values, DataReadMethod.Network, data.size)
}
}

其中获取获取数据的方法getRemoteBytes,逻辑如下:

def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
var totalFailureCount = 0
// 首先根据blockId获取当前block存在在哪些block manager上
val locations = getLocations(blockId)
// 最大允许的获取block的失败次数为该block对应的block manager数量
val maxFetchFailures = locations.size
var locationIterator = locations.iterator
while (locationIterator.hasNext) { // 开始遍历block manager
val loc = locationIterator.next()
logDebug(s"Getting remote block $blockId from $loc")
val data = try {
// 通过调用BlockTransferSerivce的fetchBlockSync方法从远端获取block
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
} catch {
case NonFatal(e) =>
runningFailureCount += 1
totalFailureCount += 1
// 如果总的失败数量大于了阀值则返回None
if (totalFailureCount >= maxFetchFailures) {
logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
s"Most recent failure cause:", e)
return None
}
logWarning(s"Failed to fetch remote block $blockId " +
s"from $loc (failed attempt $runningFailureCount)", e)
if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
locationIterator = getLocations(blockId).iterator
logDebug(s"Refreshed locations from the driver " +
s"after ${runningFailureCount} fetch failures.")
runningFailureCount = 0
}
null
}
// 成功的话,返回ChunkedByteBuffer
if (data != null) {
return Some(new ChunkedByteBuffer(data))
}
logDebug(s"The value of block $blockId is null")
}
logDebug(s"Block $blockId not found")
None
}

9. 另一个分支checkpiont

根据上面的iterator()的另一个分支:如果block没有被持久化,即storage level为None,我们就需要进行计算或者从Checkpoint读取数据;如果已经checkpoint了,则调用ietrator去读取block数据,否则调用Parent的RDD的compute方法。

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
// 当前rdd是否已经checkpoint和物理化了,如果已经checkpoint,则调用第一个parent rdd的iterator方法获取
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
//否则调用rdd的compute方法开始计算,返回一个Iterator对象
compute(split, context)
}
}

附录

RDD根据useDisk、useMemory、deserialized、off_heap、replication。五个参数的组合提供了11种存储级别

StorageLevel类型类型描述
MEMORY_ONLY(默认级别)将RDD以JAVA对象的形式保存到JVM内存如果分片太大,内存缓存不下,就不缓存
MEMORY_ONLY_SER将RDD以序列化的JAVA对象形式保存到内存
DISK_ONLY将RDD持久化到硬盘
MEMORY_AND_DISK将RDD数据集以JAVA对象的形式保存到JVM内存中,如果有些分片太大不能保存到内存中,则保存到磁盘上,并在下次用时重新从磁盘读取。
MEMORY_AND_DISK_SER与MEMORY_ONLY_SER类似,但当分片太大不能保存到内存中,会将其保存到磁盘中
XXX_2上述5中level后缀添加2代表两副本
OFF_HEAPRDD实际被保存到Tachyon

最后

以上就是鲜艳小白菜为你收集整理的Spark容错分析的全部内容,希望文章能够帮你解决Spark容错分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部