ContextCleaner是一个Spark服务,负责在应用程序范围内清除 shuffles, RDDs, broadcasts, accumulators和checkpointed RDDs,目的是减少长时间运行的数据密集型Spark应用程序的内存需求。
ContextCleaner在驱动程序上运行。 它会在SparkContext启动时被创建并立即启动(并且默认情况下spark.cleaner.referenceTracking Spark属性已启用),生命周期:当SparkContext停止时,它停止。
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-service-contextcleaner.html
ContextCleaner
实例ContextCleaner
需要一个SparkContext。
ContextCleaner
初始化内部注册表和计数器。
ContextCleaner
— start
方法start(): Unit
start
开始清洁线程,并执行一个操作,以System.gc()
每个spark.cleaner.periodicGC.interval间隔请求JVM垃圾收集器(使用)。
注意
请求JVM GC的操作是在periodicGCService
执行程序服务上安排的。
periodicGCService
单线程执行器服务periodicGCService
是一个内部单线程执行程序服务,名称为context-cleaner-periodic-gc以请求JVM垃圾收集器。
注意
在每个spark.cleaner.periodicGC.interval间隔内调度对JVM GC的请求。
定期运行在启动时ContextCleaner
开始,在停止时ContextCleaner
停止。
ShuffleDependency
清理— registerShuffleForCleanup
方法registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit
registerShuffleForCleanup
注册ShuffleDependency进行清理。
在内部,registerShuffleForCleanup
只需为input 执行registerForCleanupShuffleDependency
。
注意
registerShuffleForCleanup
在ShuffleDependency
创建时专门使用。
registerForCleanup
内部方法registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit
在内部,registerForCleanup
将输入添加objectForCleanup
到referenceBuffer
内部队列。
注意
尽管AnyRef
输入objectForCleanup
的类型最广泛,但该类型实际上CleanupTaskWeakReference
是自定义Java的java.lang.ref.WeakReference。
MapOutputTrackerMaster
和中删除随机播放块BlockManagerMaster
— doCleanupShuffle
方法doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit
doCleanupShuffle
执行洗牌清除,这是从当前MapOutputTrackerMaster和BlockManagerMaster中删除洗牌。doCleanupShuffle
还通知CleanerListeners。
在内部,当执行时,您应该在日志中看到以下DEBUG消息:
DEBUG Cleaning shuffle [id]
doCleanupShuffle
注销shuffleId
来自的输入MapOutputTrackerMaster
。
注意
doCleanupShuffle
用于SparkEnv
访问当前MapOutputTracker
。
doCleanupShuffle
shuffleId
从中删除输入的混洗块BlockManagerMaster
。
注意
doCleanupShuffle
用于SparkEnv
访问当前BlockManagerMaster
。
doCleanupShuffle
通知所有注册的CleanerListener
侦听器(来自listeners
内部队列)输入shuffleId
已清除。
最后,您应该在日志中看到以下调试消息:
DEBUG Cleaned shuffle [id]
如果发生任何异常,您应该在日志和异常本身中看到以下错误消息。
ERROR Error cleaning shuffle [id]
注意
doCleanupShuffle
在ContextCleaner
清理随机引用和(而有趣的)同时拟合ALSModel
(在Spark MLlib中)时执行。
表2. Spark属性
Spark Property
默认值
描述
spark.cleaner.periodicGC.interval
30min
控制触发垃圾回收的频率。
spark.cleaner.referenceTracking
true
控制是否在初始化 a 时ContextCleaner
创建 a 。SparkContext
spark.cleaner.referenceTracking.blocking
true
控制清洗线程是否应阻止执行清除任务(除了shuffle以外,后者由spark.cleaner.referenceTracking.blocking.shuffle Spark属性控制)。
这是SPARK-3015true
的解决方法,连续快速删除广播会导致Akka超时。
spark.cleaner.referenceTracking.blocking.shuffle
false
控制清洗线程是否应在随机清洗任务上阻塞。
清理混洗时,这是ContextCleaner的SPARK-3139 Akka超时的false
一种解决方法。
spark.cleaner.referenceTracking.cleanCheckpoints
false
控制如果引用超出范围,是否清理检查点文件。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章