spark listener
阅读原文时间:2023年07月10日阅读:2

最近在做一个需求,当spark程序在读数据或写数据时,将所读的条数或或所写的条数实时的展现出来,这里用到了SparkListener,sparklisten 可以获取spark 各个运行阶段的状态。

首先我们先通过代码来分析下各个方法的功能,再来说思路

package org.apache.spark

import org.apache.spark.scheduler._

import org.apache.spark.sql.{SaveMode, SparkSession}

object ListenerTest {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("test").setMaster("local[*]")

      .set("spark.extraListeners", classOf[BasicJobCounter].getName)

      //.set("spark.extraListeners", classOf[BasicJobCounter].getName)

      .set("spark.executor.heartbeatInterval", "1000ms")

    val spark = SparkSession.builder()

      .config(conf)

      .getOrCreate()

    val fpath = args(0)

    val csvdf = spark.read.format("csv")

      .option("sep", ",")

      .option("inferSchema", "true")

      .option("header", "true")

      .load(fpath)

    csvdf.select("movieids").show()

    val schema = csvdf.schema

    val df = spark.createDataFrame(csvdf.rdd.repartition(5).setName("xxxxxxxxxxxxxxxxxxxxxxxxx"), schema)

    //val df = spark.createDataFrame(csvdf.rdd.setName("xxxxxxxxxxxxxxxxxxxxxxxxx"), schema)

    df.write.mode(SaveMode.Overwrite)

      .format("csv")

      .option("sep", ",")

      .option("header", "true")

      .save(args(1))

    spark.stop()

  }

}

private class OnlyExeUpdata extends SparkListener {

  val map = scala.collection.mutable.Map.empty[Long, Long]

  def getAccOut(list: List[AccumulableInfo], name: String): Option[AccumulableInfo] = {

    list match {

      case Nil => None

      case head :: tail => if (head.name.isDefined && head.name.get == name) {

        Some(head)

      } else {

        getAccOut(tail, name)

      }

    }

  }

  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {

    printx("onExecutorMetricsUpdate")

    val execId = executorMetricsUpdate.execId

    val accu = executorMetricsUpdate.accumUpdates

    println(s"execId  ${execId}")

    for ((taskId, stageId, stageAttemptId, accumUpdates) <- accu) {

      //println(s"""${stageId}\t${accumUpdates.mkString("<==>")}""")

      /*for (acc <- accumUpdates if (acc.name.isDefined && "number of output rows" == acc.name.get)) {

        println(s"""${taskId}\t${stageId}\t${acc}""")

        if (3L == stageId) {

          sum += acc.update.get.asInstanceOf[Long]

        }

      }*/

      println(s"""==${taskId}  ${accumUpdates.mkString("<==>")}==""")

      val acc = getAccOut(accumUpdates.toList, "number of output rows")

      if (3L == stageId && acc.isDefined) {

        println(s"${taskId} ${acc.get.update.get.asInstanceOf[Long]}")

        map += taskId -> acc.get.update.get.asInstanceOf[Long]

      }

    }

    if (map.size > 0) {

      val sum = map.values.reduce((x, y) => x + y)

      println(s"sum $sum")

    }

    printx("onExecutorMetricsUpdate")

  }

  def printx(label: String): Unit = {

    println(s"=" * 20 + label + s"=" * 20)

  }

}

private class BasicJobCounter extends SparkListener {

  var lacc = 0L

  // app job stage executor task

  //======================applicatioin=========================

  /**

    *app 开始时跳去的方法

    * 该方法可以获取 appId,appName ,app开始的时间以及 执行程序的用户

    * @param applicationStart

    */

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {

    printx("onApplicationStart")

    println(s"applicationStart.appAttemptId = ${applicationStart.appAttemptId}")

    println(s"applicationStart.appId = ${applicationStart.appId}")

    println(s"applicationStart.appName = ${applicationStart.appName}")

    println(s"applicationStart.driverLogs = ${applicationStart.driverLogs}")

    println(s"applicationStart.sparkUser = ${applicationStart.sparkUser}")

    println(s"applicationStart.time = ${applicationStart.time}")

    printx("onApplicationStart")

  }

  /**

    * app结束时调用的方法

    * 可以获取app结束的时间点

    * @param applicationEnd

    */

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {

    printx("onApplicationEnd")

    println(s"applicationEnd.time  =  ${applicationEnd.time}")

    printx("onApplicationEnd")

  }

  //======================applicatioin=========================

  //======================job===============================

  /**

    * job开始时调用的方法

    * 可以获取jobId,以及该job所包含的stage的信息

    * stage 包括如下信息:stageID,stage 中rdd name,stage 的name 和 task的数量

    * @param jobStart

    */

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {

    printx("onJobStart")

    println(s"jobStart.jobId = ${jobStart.jobId}")

    jobStart.stageIds.foreach(a => println(s"stageId  $a"))

    val stageInfos = jobStart.stageInfos

    stageInfos.foreach {

      si =>

        val rddInfos = si.rddInfos

        println(Seq(

          s"stageId  ${si.stageId}",

          si.rddInfos.map(a => a.name).mkString("rddname ", ",", " rddname"),

          s"si.details ${si.details}",

          s"si.name ${si.name}",

          //si.taskMetrics.accumulators().mkString(","),

          si.accumulables.mkString("accu", ",", "accu"),

          s"si.numTasks ${si.numTasks}").mkString("<<<", "__fgf__", ">>>"))

    }

    printx("onJobStart")

  }

  /**

    * jobs 结束时候调用

    * 可以获取jobID, jobResult(job 是否成功)

    * @param jobEnd

    */

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {

    printx("onJobEnd")

    println(s"jobEnd.jobId  ${jobEnd.jobId}")

    println(s"jobEnd.jobResult  ${jobEnd.jobResult}")

    printx("onJobEnd")

  }

  //======================job===============================

  //======================stage========================

  /**

    * stage 提交时调用的方法

    * 可以获取stage的一些信息

    * @param stageSubmitted

    */

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {

    printx("onStageSubmitted")

    val si = stageSubmitted.stageInfo

    val rddInfos = si.rddInfos

    println(Seq(

      s"stageId  ${si.stageId}",

      si.rddInfos.map(a => a.name).mkString("rddname ", ",", " rddname"),

      s"si.details ${si.details}",

      s"si.name ${si.name}",

      //si.taskMetrics.accumulators().mkString(","),

      si.accumulables.mkString("accu", ",", "accu"),

      s"si.numTasks ${si.numTasks}").mkString("<<<", "__fgf__", ">>>"))

    printx("onStageSubmitted")

  }

  /**

    * stage 结束时调用的方法

    * 可以获取stage 的一些信息,比较重要的accumulables,这里包含了一系列的统计信息,

    * 其中就包含了所read or write 的条数,当然了时该stage 的总条数

    * @param stageCompleted

    */

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {

    printx("onStageCompleted")

    val si = stageCompleted.stageInfo

    println(s"stageId ${stageCompleted.stageInfo.stageId}")

    val rddInfos = si.rddInfos

    println(Seq(

      s"stageId  ${si.stageId}",

      si.rddInfos.map(a => a.name).mkString("rddname ", ",", " rddname"),

      s"si.details ${si.details}",

      s"si.name ${si.name}",

      si.taskMetrics.accumulators().mkString("taskmetric", ",", "taskmetric"),

      si.accumulables.mkString("accu", ",", "accu"),

      s"si.numTasks ${si.numTasks}").mkString("<<<", "__fgf__", ">>>"))

    printx("onStageCompleted")

  }

  //======================stage========================

  //===========================executor========================

  /**

    * 当executor的统计指标更新时调用的方法,

    * 可以获取 executorID,以及executor中的累加器(重点就在此)

    * 累加器中包含了taskID,stageID,以及统计信息(其中包含了读写的条数)

    * @param executorMetricsUpdate

    */

  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {

    printx("onExecutorMetricsUpdate")

    val execId = executorMetricsUpdate.execId

    val accu = executorMetricsUpdate.accumUpdates

    println(s"execId   ${execId}")

    for ((taskId, stageId, stageAttemptId, accumUpdates) <- accu) {

      //println(s"""${stageId}\t${accumUpdates.mkString("<==>")}""")

      for (acc <- accumUpdates if (acc.name.isDefined && "number of output rows" == acc.name.get)) {

        println(s"""${stageId}\t${taskId}\t${acc}""")

      }

    }

    printx("onExecutorMetricsUpdate")

  }

  /**

    * 添加executor时调用的方法

    * @param executorAdded

    */

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {

    printx("onExecutorAdded")

    val exeId = executorAdded.executorId

    println(s"exeId  ${exeId}")

    val exeInfo = executorAdded.executorInfo

    println(s"exeInfo.executorHost ${exeInfo.executorHost}")

    println(s"exeInfo.logUrlMap ${exeInfo.logUrlMap}")

    println(s"exeInfo.totalCores ${exeInfo.totalCores}")

    printx("onExecutorAdded")

  }

  //===========================executor========================

  //======================task===========================

  /**

    * task开始时调用的方法

    * @param taskStart

    */

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {

    printx("onTaskStart")

    val stageAttempId = taskStart.stageAttemptId

    val stageId = taskStart.stageId

    val taskInfo = taskStart.taskInfo

    println(s"stageAttempId ${stageAttempId}")

    println(s"stageId ${stageId}")

    println(s"""taskInfo ${taskInfo.accumulables.mkString("<==>")}""")

    printx("onTaskStart")

  }

  /**

    * task 结束时调用的方法

    * @param taskEnd

    */

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {

    printx("onTaskEnd")

    val reason = taskEnd.reason

    val sid = taskEnd.stageId

    val taskInfo = taskEnd.taskInfo

    val taskMetrics = taskEnd.taskMetrics

    println(s"taskend reason ${reason}")

    println(s"stageId sid ${sid}")

    println(s"taskInfo ${taskInfo.accumulables.mkString("<==>")}")

    println(s"""taskMetrics ${taskMetrics.accumulators().mkString("<==>")}""")

    printx("onTaskEnd")

  }

  //======================task===========================

  def printx(label: String): Unit = {

    println(s"=" * 20 + label + s"=" * 20)

  }

}

思路是这样的:

1.df 底层所存储的rdd是可以命名的在获取df时,比如 val df = spark.read.format("csv")…load(..) 时,对df 内rdd重命名 ,df.rdd.setName("你想要取的名字"), 这样在onJobStart 时我们就可以找到对应的df 所在的stage,而onExecutorMetricsUpdate可以实时获取某个stage 所读/写的条数,只需将这个条数记录下来然后进行显示即可,注意:这里要对onExecutorMetricsUpdate 中所有task的累加器数据进行累加,文字不容易描述,具体见OnlyExeUpdata,否则统计的中间结果会大于实际值,onExecutorMetricsUpdate 会在心跳反馈时调用一次,因此可以通过设置心跳时间来控制该方法调用的频率,最后的准确值需要在onStageComplete 方法中获取,

要启动 sparkListener ,可以 在配置中添加对应类set("spark.extraListeners", classOf[BasicJobCounter].getName),文字不是很详细,具体可以运行代码结合输出结果来理解,

注: onExecutorMetricsUpdate  所对应的 accumUpdates 里可能包含了两条 (num of output rows) 的统计,有时候两条值一样,有时候一个为0,一个不为0,当对df repartition 后就会出现两个(num of output rows),具体原因还不清楚,使用时需注意,我只取了有值的(num of output rows),都有值的,随便选一个。

附上运行结果:结果可能有些被编辑器截断了

====================onExecutorAdded====================

exeId  driver

exeInfo.executorHost localhost

exeInfo.logUrlMap Map()

exeInfo.totalCores 4

====================onExecutorAdded====================

====================onApplicationStart====================

applicationStart.appAttemptId = None

applicationStart.appId = Some(local-1543335649097)

applicationStart.appName = test

applicationStart.driverLogs = None

applicationStart.sparkUser = MI

applicationStart.time = 1543335646997

====================onApplicationStart====================

====================onExecutorMetricsUpdate====================

execId   driver

====================onExecutorMetricsUpdate====================

====================onExecutorMetricsUpdate====================

execId   driver

====================onExecutorMetricsUpdate====================

====================onExecutorMetricsUpdate====================

execId   driver

====================onExecutorMetricsUpdate====================

====================onJobStart====================

jobStart.jobId = 0

stageId  0

<<:0__fgf__accuaccu__fgf__si.numTasks 1>>>

====================onJobStart====================

====================onStageSubmitted====================

<<:0__fgf__accuaccu__fgf__si.numTasks 1>>>

====================onStageSubmitted====================

====================onTaskStart====================

stageAttempId 0

stageId 0

taskInfo

====================onTaskStart====================

====================onExecutorMetricsUpdate====================

execId   driver

0    0    AccumulableInfo(2,Some(number of output rows),Some(2),None,true,true,Some(sql))

0    0    AccumulableInfo(1,Some(number of output rows),Some(2),None,true,true,Some(sql))

====================onExecutorMetricsUpdate====================

====================onTaskEnd====================

taskend reason Success

stageId sid 0

taskInfo AccumulableInfo(1,Some(number of output rows),Some(2),Some(2),true,true,Some(sql))<==>AccumulableInfo(2,Some(number of output rows),Some(2),Some(2),true,true,Some(sql))<==>AccumulableInfo(27,Some(internal.metrics.input.recordsRead),Some(2),Some(2),true,true,None)<==>AccumulableInfo(26,Some(internal.metrics.input.bytesRead),Some(6512496),Some(6512496),true,true,None)<==>AccumulableInfo(10,Some(internal.metrics.resultSize),Some(1299),Some(1299),true,true,None)<==>AccumulableInfo(9,Some(internal.metrics.executorCpuTime),Some(78125000),Some(78125000),true,true,None)<==>AccumulableInfo(8,Some(internal.metrics.executorRunTime),Some(91),Some(91),true,true,None)<==>AccumulableInfo(7,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(15625000),true,true,None)<==>AccumulableInfo(6,Some(internal.metrics.executorDeserializeTime),Some(23),Some(23),true,true,None)

taskMetrics LongAccumulator(id: 6, name: Some(internal.metrics.executorDeserializeTime), value: 23)<==>LongAccumulator(id: 7, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 8, name: Some(internal.metrics.executorRunTime), value: 91)<==>LongAccumulator(id: 9, name: Some(internal.metrics.executorCpuTime), value: 78125000)<==>LongAccumulator(id: 10, name: Some(internal.metrics.resultSize), value: 1299)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 26, name: Some(internal.metrics.input.bytesRead), value: 6512496)<==>LongAccumulator(id: 27, name: Some(internal.metrics.input.recordsRead), value: 2)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 0, name: Some(duration total (min, med, max)), value: -1)<==>SQLMetric(id: 2, name: Some(number of output rows), value: 2)<==>SQLMetric(id: 1, name: Some(number of output rows), value: 2)

====================onTaskEnd====================

====================onStageCompleted====================

stageId 0

<<:0__fgf__taskmetricLongAccumulator(id: 6, name: Some(internal.metrics.executorDeserializeTime), value: 23),LongAccumulator(id: 7, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000),LongAccumulator(id: 8, name: Some(internal.metrics.executorRunTime), value: 91),LongAccumulator(id: 9, name: Some(internal.metrics.executorCpuTime), value: 78125000),LongAccumulator(id: 10, name: Some(internal.metrics.resultSize), value: 1299),LongAccumulator(id: 11, name: Some(internal.metrics.jvmGCTime), value: 0),LongAccumulator(id: 12, name: Some(internal.metrics.resultSerializationTime), value: 0),LongAccumulator(id: 13, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 14, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 15, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 16, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 17, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 18, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 19, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 20, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 21, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 22, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 23, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 24, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 25, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 26, name: Some(internal.metrics.input.bytesRead), value: 6512496),LongAccumulator(id: 27, name: Some(internal.metrics.input.recordsRead), value: 2),LongAccumulator(id: 28, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 29, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu26 -> AccumulableInfo(26,Some(internal.metrics.input.bytesRead),None,Some(6512496),true,true,None),8 -> AccumulableInfo(8,Some(internal.metrics.executorRunTime),None,Some(91),true,true,None),2 -> AccumulableInfo(2,Some(number of output rows),None,Some(2),true,true,Some(sql)),7 -> AccumulableInfo(7,Some(internal.metrics.executorDeserializeCpuTime),None,Some(15625000),true,true,None),1 -> AccumulableInfo(1,Some(number of output rows),None,Some(2),true,true,Some(sql)),10 -> AccumulableInfo(10,Some(internal.metrics.resultSize),None,Some(1299),true,true,None),27 -> AccumulableInfo(27,Some(internal.metrics.input.recordsRead),None,Some(2),true,true,None),9 -> AccumulableInfo(9,Some(internal.metrics.executorCpuTime),None,Some(78125000),true,true,None),6 -> AccumulableInfo(6,Some(internal.metrics.executorDeserializeTime),None,Some(23),true,true,None)accu__fgf__si.numTasks 1>>>

====================onStageCompleted====================

====================onJobEnd====================

jobEnd.jobId  0

jobEnd.jobResult  JobSucceeded

====================onJobEnd====================

====================onJobStart====================

jobStart.jobId = 1

stageId  1

<<:0__fgf__accuaccu__fgf__si.numTasks 4>>>

====================onJobStart====================

====================onStageSubmitted====================

<<:0__fgf__accuaccu__fgf__si.numTasks 4>>>

====================onStageSubmitted====================

====================onTaskStart====================

stageAttempId 0

stageId 1

taskInfo

====================onTaskStart====================

====================onTaskStart====================

stageAttempId 0

stageId 1

taskInfo

====================onTaskStart====================

====================onTaskStart====================

stageAttempId 0

stageId 1

taskInfo

====================onTaskStart====================

====================onTaskStart====================

stageAttempId 0

stageId 1

taskInfo

====================onTaskStart====================

[Stage 1:>                                                          (0 + 4) / 4]====================onTaskEnd====================

taskend reason Success

stageId sid 1

taskInfo AccumulableInfo(54,Some(number of output rows),Some(103612),Some(103612),true,true,Some(sql))<==>AccumulableInfo(58,Some(duration total (min, med, max)),Some(644),Some(643),true,true,Some(sql))<==>AccumulableInfo(80,Some(internal.metrics.input.recordsRead),Some(103612),Some(103612),true,true,None)<==>AccumulableInfo(79,Some(internal.metrics.input.bytesRead),Some(2252658),Some(2252658),true,true,None)<==>AccumulableInfo(65,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None)<==>AccumulableInfo(64,Some(internal.metrics.jvmGCTime),Some(13),Some(13),true,true,None)<==>AccumulableInfo(63,Some(internal.metrics.resultSize),Some(1527),Some(1527),true,true,None)<==>AccumulableInfo(62,Some(internal.metrics.executorCpuTime),Some(234375000),Some(234375000),true,true,None)<==>AccumulableInfo(61,Some(internal.metrics.executorRunTime),Some(654),Some(654),true,true,None)<==>AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),Some(18),Some(18),true,true,None)

taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 18)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 654)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 234375000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1527)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 13)<==>LongAccumulator(id: 65, name: Some(internal.metrics.resultSerializationTime), value: 1)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 2252658)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 103612)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 644)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 103612)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)

====================onTaskEnd====================

====================onExecutorMetricsUpdate====================

execId   driver

1    1    AccumulableInfo(54,Some(number of output rows),Some(159340),None,true,true,Some(sql))

1    2    AccumulableInfo(54,Some(number of output rows),Some(166588),None,true,true,Some(sql))

1    3    AccumulableInfo(54,Some(number of output rows),Some(163819),None,true,true,Some(sql))

====================onExecutorMetricsUpdate====================

[Stage 1:==============>                                            (1 + 3) / 4]====================onTaskEnd====================

taskend reason Success

stageId sid 1

taskInfo AccumulableInfo(54,Some(number of output rows),Some(296289),Some(399901),true,true,Some(sql))<==>AccumulableInfo(58,Some(duration total (min, med, max)),Some(1153),Some(1796),true,true,Some(sql))<==>AccumulableInfo(80,Some(internal.metrics.input.recordsRead),Some(296289),Some(399901),true,true,None)<==>AccumulableInfo(79,Some(internal.metrics.input.bytesRead),Some(6553600),Some(8806258),true,true,None)<==>AccumulableInfo(64,Some(internal.metrics.jvmGCTime),Some(23),Some(36),true,true,None)<==>AccumulableInfo(63,Some(internal.metrics.resultSize),Some(1484),Some(3011),true,true,None)<==>AccumulableInfo(62,Some(internal.metrics.executorCpuTime),Some(609375000),Some(843750000),true,true,None)<==>AccumulableInfo(61,Some(internal.metrics.executorRunTime),Some(1164),Some(1818),true,true,None)<==>AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),Some(10),Some(28),true,true,None)

taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 10)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 1164)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 609375000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1484)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 23)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 296289)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 1153)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 296289)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)

====================onTaskEnd====================

====================onTaskEnd====================

taskend reason Success

stageId sid 1

taskInfo

taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 13)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 1164)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 625000000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1484)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 23)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 296402)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 1153)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 296402)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)

====================onTaskEnd====================

====================onTaskEnd====================

taskend reason Success

stageId sid 1

taskInfo AccumulableInfo(54,Some(number of output rows),Some(303907),Some(1000210),true,true,Some(sql))<==>AccumulableInfo(58,Some(duration total (min, med, max)),Some(1172),Some(4121),true,true,Some(sql))<==>AccumulableInfo(80,Some(internal.metrics.input.recordsRead),Some(303907),Some(1000210),true,true,None)<==>AccumulableInfo(79,Some(internal.metrics.input.bytesRead),Some(6553600),Some(21913458),true,true,None)<==>AccumulableInfo(64,Some(internal.metrics.jvmGCTime),Some(23),Some(82),true,true,None)<==>AccumulableInfo(63,Some(internal.metrics.resultSize),Some(1527),Some(6022),true,true,None)<==>AccumulableInfo(62,Some(internal.metrics.executorCpuTime),Some(765625000),Some(2234375000),true,true,None)<==>AccumulableInfo(61,Some(internal.metrics.executorRunTime),Some(1188),Some(4170),true,true,None)<==>AccumulableInfo(60,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(15625000),true,true,None)<==>AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),Some(9),Some(50),true,true,None)

taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 9)<==>LongAccumulator(id: 60, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 1188)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 765625000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1527)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 23)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 303907)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 1172)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 303907)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)

====================onTaskEnd====================

====================onStageCompleted====================

stageId 1

<<:0__fgf__taskmetricLongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 50),LongAccumulator(id: 60, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000),LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 4170),LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 2234375000),LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 6022),LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 82),LongAccumulator(id: 65, name: Some(internal.metrics.resultSerializationTime), value: 1),LongAccumulator(id: 66, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 67, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 68, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 69, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 70, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 71, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 72, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 73, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 74, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 75, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 76, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 77, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 78, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 21913458),LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 1000210),LongAccumulator(id: 81, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 82, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu59 -> AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),None,Some(50),true,true,None),80 -> AccumulableInfo(80,Some(internal.metrics.input.recordsRead),None,Some(1000210),true,true,None),62 -> AccumulableInfo(62,Some(internal.metrics.executorCpuTime),None,Some(2234375000),true,true,None),65 -> AccumulableInfo(65,Some(internal.metrics.resultSerializationTime),None,Some(1),true,true,None),64 -> AccumulableInfo(64,Some(internal.metrics.jvmGCTime),None,Some(82),true,true,None),58 -> AccumulableInfo(58,Some(duration total (min, med, max)),None,Some(4121),true,true,Some(sql)),79 -> AccumulableInfo(79,Some(internal.metrics.input.bytesRead),None,Some(21913458),true,true,None),61 -> AccumulableInfo(61,Some(internal.metrics.executorRunTime),None,Some(4170),true,true,None),60 -> AccumulableInfo(60,Some(internal.metrics.executorDeserializeCpuTime),None,Some(15625000),true,true,None),54 -> AccumulableInfo(54,Some(number of output rows),None,Some(1000210),true,true,Some(sql)),63 -> AccumulableInfo(63,Some(internal.metrics.resultSize),None,Some(6022),true,true,None)accu__fgf__si.numTasks 4>>>

====================onStageCompleted====================

====================onJobEnd====================

jobEnd.jobId  1

jobEnd.jobResult  JobSucceeded

====================onJobEnd====================

====================onJobStart====================

jobStart.jobId = 2

stageId  2

<<:0__fgf__accuaccu__fgf__si.numTasks 1>>>

====================onJobStart====================

====================onStageSubmitted====================

<<:0__fgf__accuaccu__fgf__si.numTasks 1>>>

====================onStageSubmitted====================

====================onTaskStart====================

stageAttempId 0

stageId 2

taskInfo

====================onTaskStart====================

====================onTaskEnd====================

taskend reason Success

stageId sid 2

taskInfo AccumulableInfo(84,Some(number of output rows),Some(22),Some(22),true,true,Some(sql))<==>AccumulableInfo(109,Some(internal.metrics.input.recordsRead),Some(22),Some(22),true,true,None)<==>AccumulableInfo(108,Some(internal.metrics.input.bytesRead),Some(6512496),Some(6512496),true,true,None)<==>AccumulableInfo(92,Some(internal.metrics.resultSize),Some(1310),Some(1310),true,true,None)<==>AccumulableInfo(91,Some(internal.metrics.executorCpuTime),Some(31250000),Some(31250000),true,true,None)<==>AccumulableInfo(90,Some(internal.metrics.executorRunTime),Some(25),Some(25),true,true,None)<==>AccumulableInfo(88,Some(internal.metrics.executorDeserializeTime),Some(4),Some(4),true,true,None)

taskMetrics LongAccumulator(id: 88, name: Some(internal.metrics.executorDeserializeTime), value: 4)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 90, name: Some(internal.metrics.executorRunTime), value: 25)<==>LongAccumulator(id: 91, name: Some(internal.metrics.executorCpuTime), value: 31250000)<==>LongAccumulator(id: 92, name: Some(internal.metrics.resultSize), value: 1310)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 108, name: Some(internal.metrics.input.bytesRead), value: 6512496)<==>LongAccumulator(id: 109, name: Some(internal.metrics.input.recordsRead), value: 22)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 83, name: Some(duration total (min, med, max)), value: -1)<==>SQLMetric(id: 84, name: Some(number of output rows), value: 22)

====================onTaskEnd====================

====================onStageCompleted====================

stageId 2

<<:0__fgf__taskmetricLongAccumulator(id: 88, name: Some(internal.metrics.executorDeserializeTime), value: 4),LongAccumulator(id: 89, name: Some(internal.metrics.executorDeserializeCpuTime), value: 0),LongAccumulator(id: 90, name: Some(internal.metrics.executorRunTime), value: 25),LongAccumulator(id: 91, name: Some(internal.metrics.executorCpuTime), value: 31250000),LongAccumulator(id: 92, name: Some(internal.metrics.resultSize), value: 1310),LongAccumulator(id: 93, name: Some(internal.metrics.jvmGCTime), value: 0),LongAccumulator(id: 94, name: Some(internal.metrics.resultSerializationTime), value: 0),LongAccumulator(id: 95, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 96, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 97, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 98, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 99, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 100, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 101, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 102, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 103, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 104, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 105, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 106, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 107, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 108, name: Some(internal.metrics.input.bytesRead), value: 6512496),LongAccumulator(id: 109, name: Some(internal.metrics.input.recordsRead), value: 22),LongAccumulator(id: 110, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 111, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu92 -> AccumulableInfo(92,Some(internal.metrics.resultSize),None,Some(1310),true,true,None),109 -> AccumulableInfo(109,Some(internal.metrics.input.recordsRead),None,Some(22),true,true,None),91 -> AccumulableInfo(91,Some(internal.metrics.executorCpuTime),None,Some(31250000),true,true,None),88 -> AccumulableInfo(88,Some(internal.metrics.executorDeserializeTime),None,Some(4),true,true,None),108 -> AccumulableInfo(108,Some(internal.metrics.input.bytesRead),None,Some(6512496),true,true,None),90 -> AccumulableInfo(90,Some(internal.metrics.executorRunTime),None,Some(25),true,true,None),84 -> AccumulableInfo(84,Some(number of output rows),None,Some(22),true,true,Some(sql))accu__fgf__si.numTasks 1>>>

====================onStageCompleted====================

====================onJobEnd====================

jobEnd.jobId  2

jobEnd.jobResult  JobSucceeded

====================onJobEnd====================

+--------+

|movieids|

+--------+

|    1193|

|     661|

|     914|

|    3408|

|    2355|

|    1197|

|    1287|

|    2804|

|     594|

|     919|

|     595|

|     938|

|    2398|

|    2918|

|    1035|

|    2791|

|    2687|

|    2018|

|    3105|

|    2797|

+--------+

only showing top 20 rows

====================onJobStart====================

jobStart.jobId = 3

stageId  3

stageId  4

<<:0__fgf__accuaccu__fgf__si.numTasks 4>>>

<<:0__fgf__accuaccu__fgf__si.numTasks 5>>>

====================onJobStart====================

====================onStageSubmitted====================

<<:0__fgf__accuaccu__fgf__si.numTasks 4>>>

====================onStageSubmitted====================

====================onTaskStart====================

stageAttempId 0

stageId 3

taskInfo

====================onTaskStart====================

====================onTaskStart====================

stageAttempId 0

stageId 3

taskInfo

====================onTaskStart====================

====================onTaskStart====================

stageAttempId 0

stageId 3

taskInfo

====================onTaskStart====================

====================onTaskStart====================

stageAttempId 0

stageId 3

taskInfo

====================onTaskStart====================

====================onExecutorMetricsUpdate====================

execId   driver

3    6    AccumulableInfo(112,Some(number of output rows),Some(10394),None,true,true,Some(sql))

3    7    AccumulableInfo(112,Some(number of output rows),Some(7983),None,true,true,Some(sql))

3    8    AccumulableInfo(112,Some(number of output rows),Some(9630),None,true,true,Some(sql))

3    9    AccumulableInfo(112,Some(number of output rows),Some(10079),None,true,true,Some(sql))

====================onExecutorMetricsUpdate====================

[Stage 3:>                                                          (0 + 4) / 4]====================onExecutorMetricsUpdate====================

execId   driver

3    6    AccumulableInfo(112,Some(number of output rows),Some(47124),None,true,true,Some(sql))

3    7    AccumulableInfo(112,Some(number of output rows),Some(43589),None,true,true,Some(sql))

3    8    AccumulableInfo(112,Some(number of output rows),Some(48100),None,true,true,Some(sql))

3    9    AccumulableInfo(112,Some(number of output rows),Some(47160),None,true,true,Some(sql))

====================onExecutorMetricsUpdate====================

====================onTaskEnd====================

taskend reason Success

stageId sid 3

taskInfo AccumulableInfo(112,Some(number of output rows),Some(103612),Some(103612),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(1617),Some(1616),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(103612),Some(103612),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(2252658),Some(2252658),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(98373696),Some(98373696),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(103612),Some(103612),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(2298021),Some(2298021),true,true,None)<==>AccumulableInfo(124,Some(internal.metrics.resultSerializationTime),Some(2),Some(2),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(19),Some(19),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1628),Some(1628),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(843750000),Some(843750000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(1718),Some(1718),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(6),Some(6),true,true,None)

taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 6)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 1718)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 843750000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1628)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 19)<==>LongAccumulator(id: 124, name: Some(internal.metrics.resultSerializationTime), value: 2)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 2298021)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 103612)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 98373696)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 2252658)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 103612)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 1617)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 103612)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)

====================onTaskEnd====================

[Stage 3:==============>                                            (1 + 3) / 4]====================onExecutorMetricsUpdate====================

execId   driver

3    6    AccumulableInfo(112,Some(number of output rows),Some(221438),None,true,true,Some(sql))

3    7    AccumulableInfo(112,Some(number of output rows),Some(219572),None,true,true,Some(sql))

3    8    AccumulableInfo(112,Some(number of output rows),Some(225414),None,true,true,Some(sql))

====================onExecutorMetricsUpdate====================

====================onTaskEnd====================

taskend reason Success

stageId sid 3

taskInfo AccumulableInfo(112,Some(number of output rows),Some(296402),Some(400014),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(2541),Some(4157),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(296402),Some(400014),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(6553600),Some(8806258),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(82364891),Some(180738587),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(296402),Some(400014),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(6554778),Some(8852799),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(24),Some(43),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1585),Some(3213),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(1718750000),Some(2562500000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(2610),Some(4328),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(3),Some(9),true,true,None)

taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 3)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 2610)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 1718750000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1585)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 24)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 6554778)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 296402)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 82364891)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 296402)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 2541)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 296402)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)

====================onTaskEnd====================

[Stage 3:=============================>                             (2 + 2) / 4]====================onTaskEnd====================

taskend reason Success

stageId sid 3

taskInfo AccumulableInfo(112,Some(number of output rows),Some(296289),Some(696303),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(2578),Some(6735),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(296289),Some(696303),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(6553600),Some(15359858),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(91972656),Some(272711243),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(296289),Some(696303),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(6576036),Some(15428835),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(24),Some(67),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1585),Some(4798),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(1750000000),Some(4312500000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(2661),Some(6989),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(7),Some(16),true,true,None)

taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 7)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 2661)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 1750000000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1585)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 24)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 6576036)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 296289)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 91972656)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 296289)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 2578)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 296289)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)

====================onTaskEnd====================

====================onTaskEnd====================

taskend reason Success

stageId sid 3

taskInfo AccumulableInfo(112,Some(number of output rows),Some(303906),Some(1000209),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(2600),Some(9335),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(303906),Some(1000209),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(6553600),Some(21913458),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(80225232),Some(352936475),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(303906),Some(1000209),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(6745230),Some(22174065),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(24),Some(91),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1585),Some(6383),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(1765625000),Some(6078125000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(2674),Some(9663),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(5),Some(21),true,true,None)

taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 5)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 2674)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 1765625000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1585)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 24)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 6745230)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 303906)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 80225232)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 303906)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 2600)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 303906)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)

====================onTaskEnd====================

====================onStageCompleted====================

stageId 3

<<:0__fgf__taskmetricLongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 21),LongAccumulator(id: 119, name: Some(internal.metrics.executorDeserializeCpuTime), value: 0),LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 9663),LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 6078125000),LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 6383),LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 91),LongAccumulator(id: 124, name: Some(internal.metrics.resultSerializationTime), value: 2),LongAccumulator(id: 125, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 126, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 127, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 128, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 129, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 130, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 131, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 132, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 133, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 134, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 22174065),LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 1000209),LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 352936475),LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 21913458),LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 1000209),LongAccumulator(id: 140, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 141, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu137 -> AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),None,Some(352936475),true,true,None),122 -> AccumulableInfo(122,Some(internal.metrics.resultSize),None,Some(6383),true,true,None),116 -> AccumulableInfo(116,Some(duration total (min, med, max)),None,Some(9335),true,true,Some(sql)),124 -> AccumulableInfo(124,Some(internal.metrics.resultSerializationTime),None,Some(2),true,true,None),118 -> AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),None,Some(21),true,true,None),136 -> AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),None,Some(1000209),true,true,None),139 -> AccumulableInfo(139,Some(internal.metrics.input.recordsRead),None,Some(1000209),true,true,None),121 -> AccumulableInfo(121,Some(internal.metrics.executorCpuTime),None,Some(6078125000),true,true,None),112 -> AccumulableInfo(112,Some(number of output rows),None,Some(1000209),true,true,Some(sql)),120 -> AccumulableInfo(120,Some(internal.metrics.executorRunTime),None,Some(9663),true,true,None),138 -> AccumulableInfo(138,Some(internal.metrics.input.bytesRead),None,Some(21913458),true,true,None),123 -> AccumulableInfo(123,Some(internal.metrics.jvmGCTime),None,Some(91),true,true,None),135 -> AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),None,Some(22174065),true,true,None)accu__fgf__si.numTasks 4>>>

====================onStageCompleted====================

====================onStageSubmitted====================

<<:0__fgf__accuaccu__fgf__si.numTasks 5>>>

====================onStageSubmitted====================

====================onTaskStart====================

stageAttempId 0

stageId 4

taskInfo

====================onTaskStart====================

====================onTaskStart====================

stageAttempId 0

stageId 4

taskInfo

====================onTaskStart====================

====================onTaskStart====================

stageAttempId 0

stageId 4

taskInfo

====================onTaskStart====================

====================onTaskStart====================

stageAttempId 0

stageId 4

taskInfo

====================onTaskStart====================

[Stage 4:>                                                          (0 + 4) / 5]====================onExecutorMetricsUpdate====================

execId   driver

4    10    AccumulableInfo(117,Some(number of output rows),Some(8707),None,true,true,Some(sql))

4    11    AccumulableInfo(117,Some(number of output rows),Some(12415),None,true,true,Some(sql))

4    12    AccumulableInfo(117,Some(number of output rows),Some(11606),None,true,true,Some(sql))

4    13    AccumulableInfo(117,Some(number of output rows),Some(9539),None,true,true,Some(sql))

====================onExecutorMetricsUpdate====================

====================onExecutorMetricsUpdate====================

execId   driver

4    10    AccumulableInfo(117,Some(number of output rows),Some(50731),None,true,true,Some(sql))

4    11    AccumulableInfo(117,Some(number of output rows),Some(50721),None,true,true,Some(sql))

4    12    AccumulableInfo(117,Some(number of output rows),Some(51519),None,true,true,Some(sql))

4    13    AccumulableInfo(117,Some(number of output rows),Some(55871),None,true,true,Some(sql))

====================onExecutorMetricsUpdate====================

====================onTaskStart====================

stageAttempId 0

stageId 4

taskInfo

====================onTaskStart====================

====================onTaskEnd====================

taskend reason Success

stageId sid 4

taskInfo AccumulableInfo(117,Some(number of output rows),Some(200041),Some(200041),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200041),Some(200041),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4436289),Some(4436289),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(4),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(148,Some(internal.metrics.resultSerializationTime),Some(2),Some(2),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(46),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1686),Some(1686),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1203125000),Some(1203125000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2309),Some(2309),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(52),Some(52),true,true,None)

taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 52)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2309)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1203125000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1686)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>LongAccumulator(id: 148, name: Some(internal.metrics.resultSerializationTime), value: 2)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4436289)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200041)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200041)

====================onTaskEnd====================

====================onTaskEnd====================

taskend reason Success

stageId sid 4

taskInfo AccumulableInfo(117,Some(number of output rows),Some(200041),Some(400082),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200041),Some(400082),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4434760),Some(8871049),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(8),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(92),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1643),Some(3329),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1421875000),Some(2625000000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2335),Some(4644),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(43),Some(95),true,true,None)

taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 43)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2335)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1421875000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1643)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4434760)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200041)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200041)

====================onTaskEnd====================

[Stage 4:=======================>                                   (2 + 3) / 5]====================onTaskEnd====================

taskend reason Success

stageId sid 4

taskInfo AccumulableInfo(117,Some(number of output rows),Some(200043),Some(600125),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200043),Some(600125),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4434141),Some(13305190),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(12),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(138),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1686),Some(5015),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1328125000),Some(3953125000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2362),Some(7006),true,true,None)<==>AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(15625000),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(46),Some(141),true,true,None)

taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 46)<==>LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2362)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1328125000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1686)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4434141)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200043)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200043)

====================onTaskEnd====================

====================onTaskEnd====================

taskend reason Success

stageId sid 4

taskInfo AccumulableInfo(117,Some(number of output rows),Some(200042),Some(800167),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200042),Some(800167),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4435220),Some(17740410),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(16),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(184),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1686),Some(6701),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1343750000),Some(5296875000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2385),Some(9391),true,true,None)<==>AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(31250000),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(41),Some(182),true,true,None)

taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 41)<==>LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2385)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1343750000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1686)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4435220)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200042)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200042)

====================onTaskEnd====================

====================onExecutorMetricsUpdate====================

execId   driver

4    14    AccumulableInfo(117,Some(number of output rows),Some(20678),None,true,true,Some(sql))

====================onExecutorMetricsUpdate====================

[Stage 4:===============================================>           (4 + 1) / 5]====================onTaskEnd====================

                                                                                taskend reason Success

stageId sid 4

taskInfo AccumulableInfo(117,Some(number of output rows),Some(200042),Some(1000209),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200042),Some(1000209),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4433655),Some(22174065),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(20),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1643),Some(8344),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(687500000),Some(5984375000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(715),Some(10106),true,true,None)<==>AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(46875000),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(12),Some(194),true,true,None)

taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 12)<==>LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 715)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 687500000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1643)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4433655)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200042)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200042)

====================onTaskEnd====================

====================onStageCompleted====================

stageId 4

<<:0__fgf__taskmetricLongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 194),LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 46875000),LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 10106),LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 5984375000),LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 8344),LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 184),LongAccumulator(id: 148, name: Some(internal.metrics.resultSerializationTime), value: 2),LongAccumulator(id: 149, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 150, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 151, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 152, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 20),LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 22174065),LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 1000209),LongAccumulator(id: 159, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 160, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 161, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 162, name: Some(internal.metrics.input.bytesRead), value: 0),LongAccumulator(id: 163, name: Some(internal.metrics.input.recordsRead), value: 0),LongAccumulator(id: 164, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 165, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu146 -> AccumulableInfo(146,Some(internal.metrics.resultSize),None,Some(8344),true,true,None),155 -> AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),None,Some(0),true,true,None),158 -> AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),None,Some(1000209),true,true,None),142 -> AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),None,Some(194),true,true,None),145 -> AccumulableInfo(145,Some(internal.metrics.executorCpuTime),None,Some(5984375000),true,true,None),154 -> AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),None,Some(20),true,true,None),148 -> AccumulableInfo(148,Some(internal.metrics.resultSerializationTime),None,Some(2),true,true,None),157 -> AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),None,Some(0),true,true,None),147 -> AccumulableInfo(147,Some(internal.metrics.jvmGCTime),None,Some(184),true,true,None),156 -> AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),None,Some(22174065),true,true,None),144 -> AccumulableInfo(144,Some(internal.metrics.executorRunTime),None,Some(10106),true,true,None),153 -> AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),None,Some(0),true,true,None),117 -> AccumulableInfo(117,Some(number of output rows),None,Some(1000209),true,true,Some(sql)),143 -> AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),None,Some(46875000),true,true,None)accu__fgf__si.numTasks 5>>>

====================onStageCompleted====================

====================onJobEnd====================

jobEnd.jobId  3

jobEnd.jobResult  JobSucceeded

====================onJobEnd====================

====================onApplicationEnd====================

applicationEnd.time  =  1543335661199

====================onApplicationEnd====================

Process finished with exit code 0

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章