How a Spark Task Executes on the Executor

1. CoarseGrainedExecutorBackend#receive() receives the LaunchTask message and handles it
override def receive: PartialFunction[Any, Unit] = {
......
  // a LaunchTask message has arrived
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value) // decode the TaskDescription
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc) // launch the task
    }
......
}
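
receive() is just a PartialFunction matched against incoming RPC messages. Below is a minimal, self-contained sketch of that dispatch style; the message types (FakeLaunchTask, FakeStopExecutor) and ReceiveSketch are made-up stand-ins, not Spark's actual classes.

import java.nio.ByteBuffer

// made-up stand-ins for Spark's RPC messages, only to illustrate the dispatch pattern
sealed trait ExecutorMessage
case class FakeLaunchTask(data: ByteBuffer) extends ExecutorMessage
case object FakeStopExecutor extends ExecutorMessage

object ReceiveSketch {
  // like receive(): a PartialFunction where each case clause handles one message type
  val receive: PartialFunction[ExecutorMessage, Unit] = {
    case FakeLaunchTask(data) =>
      println(s"would decode a TaskDescription from ${data.remaining()} bytes and launch it")
    case FakeStopExecutor =>
      println("would shut the executor down")
  }

  def main(args: Array[String]): Unit = {
    receive(FakeLaunchTask(ByteBuffer.allocate(16)))
    receive(FakeStopExecutor)
  }
}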

2. org.apache.spark.executor.Executor#launchTask creates a TaskRunner and submits it to the thread pool

// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

// Start worker thread pool
private val threadPool = {
  val threadFactory = new ThreadFactoryBuilder()
    ......
    .build()
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription) // TaskRunner implements Runnable
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr) // submit the TaskRunner to the thread pool for execution
}
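
The launch path is simply Runnable + cached thread pool + a ConcurrentHashMap for bookkeeping. Here is a simplified, self-contained sketch of that pattern; LaunchTaskSketch and its members are hypothetical names, and the custom ThreadFactory used by the real Executor is omitted.

import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}

object LaunchTaskSketch {
  // running work keyed by task id, mirroring Executor's runningTasks map
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]

  // a cached pool, like Executor's "task launch worker" pool (thread factory omitted)
  private val threadPool =
    Executors.newCachedThreadPool().asInstanceOf[ThreadPoolExecutor]

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val runner = new Runnable {
      override def run(): Unit =
        try body()
        finally runningTasks.remove(taskId) // clean up once the work finishes
    }
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)
  }

  def main(args: Array[String]): Unit = {
    launchTask(1L, () => println("task 1 running on " + Thread.currentThread().getName))
    threadPool.shutdown()
  }
}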
3. org.apache.spark.executor.Executor.TaskRunner#run deserializes the Task and invokes its run method

/**
 * The task to run. This will be set in run() by deserializing the task binary coming
 * from the driver. Once it is set, it will never be changed.
 */
@volatile var task: Task[Any] = _

override def run(): Unit = {
......
  // deserialize the real Task here; the key input is taskDescription.serializedTask
  task = ser.deserialize[Task[Any]](
    taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
......
  // finally invoke the Task's run method
  val value = try {
    val res = task.run(
      taskAttemptId = taskId,
      attemptNumber = taskDescription.attemptNumber,
      metricsSystem = env.metricsSystem)
    threwException = false
    res
  }
......
}
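
The essential idea is that serializedTask is just bytes that round-trip back into a live Task object on the executor. Below is a minimal sketch of that round trip using plain Java serialization instead of Spark's SerializerInstance; ToyTask and the helper names are hypothetical.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// hypothetical stand-in for a Task; real Spark tasks go through a SerializerInstance
class ToyTask(val partitionId: Int) extends Serializable {
  def runTask(): String = s"ran partition $partitionId"
}

object SerializedTaskSketch {
  // "driver side": turn the task object into bytes, like serializedTask on the wire
  def serialize(task: ToyTask): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(task)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  // "executor side": the analogue of ser.deserialize[Task[Any]](...) in TaskRunner#run
  def deserialize(buf: ByteBuffer): ToyTask = {
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.array()))
    try in.readObject().asInstanceOf[ToyTask]
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val wireBytes = serialize(new ToyTask(0))
    val task = deserialize(wireBytes)
    println(task.runTask()) // "ran partition 0"
  }
}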
4. org.apache.spark.scheduler.Task#run() calls org.apache.spark.scheduler.Task#runTask()
    We finally arrive at the last call in the chain:
def runTask(context: TaskContext): T // an abstract method implemented by subclasses, e.g. ShuffleMapTask and ResultTask
(Figure: the subclasses of Task)
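
The relationship between run() and runTask() is a classic template method: the parent class does the bookkeeping and delegates the real work to the subclass. Below is a simplified sketch with toy classes standing in for Task, ShuffleMapTask, and ResultTask; it is not the real Spark source.

object TaskHierarchySketch {
  case class ToyTaskContext(partitionId: Int)

  // corresponds to Task: run() sets up the context, runTask() is the subclass hook
  abstract class ToyTask[T](val partitionId: Int) {
    final def run(): T = {
      val context = ToyTaskContext(partitionId)
      runTask(context)
    }
    def runTask(context: ToyTaskContext): T
  }

  // stand-ins for ShuffleMapTask (writes shuffle output) and ResultTask (computes a result)
  class ToyShuffleMapTask(p: Int) extends ToyTask[String](p) {
    def runTask(context: ToyTaskContext): String = s"shuffle output for partition ${context.partitionId}"
  }
  class ToyResultTask(p: Int) extends ToyTask[Int](p) {
    def runTask(context: ToyTaskContext): Int = context.partitionId * 10
  }

  def main(args: Array[String]): Unit = {
    println(new ToyShuffleMapTask(0).run())
    println(new ToyResultTask(1).run())
  }
}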

The main flow of task execution is quite simple: CoarseGrainedExecutorBackend receives the message, hands the transmitted data (a TaskDescription) to a TaskRunner, and submits it to the Executor's internal thread pool.
During execution, the key step is deserializing the real Task object from the ByteBuffer held in org.apache.spark.scheduler.TaskDescription#serializedTask.
Finally, the Task's own run method calls the subclass-implemented runTask(), and that is where the actual computation lives.

And that's a wrap!
