Flink V1.20源码阅读笔记(5.6)- Flink Task部署过程解析

-
-
2024-11-29

分配资源和注册生产分区

在上一篇中着重解析了allocateSlotsFor(...)方法触发TaskManager启动的过程,本篇继续解析waitForAllSlotsAndDeploy(...)方法触发Task部署的过程。

 

在waitForAllSlotsAndDeploy方法中会为每一个Execution分配资源和注册生产分区,紧接着进行Task部署的过程。

org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#waitForAllSlotsAndDeploy

    /**
     * Assigns resources and registers produced partitions for all given handles, then chains the
     * deployment of all of them.
     *
     * <p>The resulting pipeline is handed to {@code FutureUtils.assertNoException}.
     *
     * @param deploymentHandles handles of the executions to deploy
     */
    private void waitForAllSlotsAndDeploy(final List<ExecutionDeploymentHandle> deploymentHandles) {
        // Stage 1: assign all resources and register the produced partitions.
        final CompletableFuture<Void> resourcesReady =
                assignAllResourcesAndRegisterProducedPartitions(deploymentHandles);

        // Stage 2: deploy everything; handle() invokes the callback on success and on failure.
        final CompletableFuture<Void> deployment =
                resourcesReady.handle(deployAll(deploymentHandles));

        FutureUtils.assertNoException(deployment);
    }

org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#assignAllResourcesAndRegisterProducedPartitions

    /**
     * Assigns a slot to, and registers the produced partitions of, the execution behind every
     * deployment handle.
     *
     * <p>A failure in either step of one handle is reported through {@code
     * handleTaskDeploymentFailure} for that handle's execution; the per-handle future itself then
     * completes normally with {@code null}.
     *
     * @param deploymentHandles handles of the executions to prepare
     * @return the future returned by {@code FutureUtils.waitForAll} over all per-handle futures
     */
    private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions(
            final List<ExecutionDeploymentHandle> deploymentHandles) {
        final List<CompletableFuture<Void>> perExecutionFutures = new ArrayList<>();

        for (ExecutionDeploymentHandle current : deploymentHandles) {
            // Step 1: once the logical slot future completes, assign the slot to the execution.
            final CompletableFuture<LogicalSlot> slotAssigned =
                    current.getLogicalSlotFuture().handle(assignResource(current));

            // Step 2: register the partitions produced by the execution.
            final CompletableFuture<Void> partitionsRegistered =
                    slotAssigned.thenCompose(registerProducedPartitions(current));

            // Step 3: route any failure of the previous steps to the deployment failure handler.
            final CompletableFuture<Void> failureHandled =
                    partitionsRegistered.handle(
                            (ignored, failure) -> {
                                if (failure != null) {
                                    handleTaskDeploymentFailure(current.getExecution(), failure);
                                }
                                return null;
                            });

            perExecutionFutures.add(failureHandled);
        }

        return FutureUtils.waitForAll(perExecutionFutures);
    }

org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#assignResource

	// Returns the callback that assigns the allocated logical slot to the Execution (excerpt).
    private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(
            final ExecutionDeploymentHandle deploymentHandle) {

			// (other parts omitted in this excerpt)

            // Try to assign the logical slot to the Execution. As the excerpt of
            // Execution#tryAssignResource shows, this sets the assignedResource field and records
            // the slot's TaskManager location and allocation id on the ExecutionVertex.
            // The body of the failure branch is not shown in this excerpt.
            if (!execution.tryAssignResource(logicalSlot)) {

            }

			// (other parts omitted in this excerpt)

    }

org.apache.flink.runtime.executiongraph.Execution#tryAssignResource

    // Tries to assign the given logical slot to this Execution (excerpt; surrounding checks and
    // the return statements are omitted).
    public boolean tryAssignResource(final LogicalSlot logicalSlot) {

	// (other parts omitted in this excerpt)

				// Store the slot in the assignedResource field, marking the Execution as having a slot.
                assignedResource = logicalSlot;

                        getVertex()
                                // Record the TaskManager location and allocation id of this
                                // slot on the ExecutionVertex as its latest prior allocation.
                                .setLatestPriorSlotAllocation(
                                        assignedResource.getTaskManagerLocation(),
                                        logicalSlot.getAllocationId());

	// (other parts omitted in this excerpt)
    }

 

org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#registerProducedPartitions

    // Returns the function that, given the assigned logical slot, registers the partitions
    // produced by the Execution (excerpt).
    private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions(
            final ExecutionDeploymentHandle deploymentHandle) {
            

			// (other parts omitted in this excerpt)
                final CompletableFuture<Void> partitionRegistrationFuture =
                        // Register the produced partitions, using the TaskManager location of
                        // the assigned slot.
                        execution.registerProducedPartitions(logicalSlot.getTaskManagerLocation());

			// (other parts omitted in this excerpt)
        };
    }

org.apache.flink.runtime.executiongraph.Execution#registerProducedPartitions

    /**
     * Registers every result partition produced by {@code vertex} with the shuffle master and
     * collects the resulting deployment descriptors.
     *
     * @param vertex execution vertex whose produced partitions are registered
     * @param location TaskManager location used to build the producer descriptor
     * @param attemptId execution attempt id used to build the producer descriptor
     * @return future completing with a linked hash map from partition id to its deployment
     *     descriptor
     */
    private static CompletableFuture<
                    Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>
            registerProducedPartitions(
                    ExecutionVertex vertex,
                    TaskManagerLocation location,
                    ExecutionAttemptID attemptId) {

        final ProducerDescriptor producer = ProducerDescriptor.create(location, attemptId);

        // All result partitions produced by this ExecutionVertex.
        // NOTE(review): the original note states there is one partition per downstream consumer
        // of the vertex; that is not visible in this method - confirm against the graph code.
        final Collection<IntermediateResultPartition> producedByVertex =
                vertex.getProducedPartitions().values();
        final Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>>
                pendingDescriptors = new ArrayList<>(producedByVertex.size());

        for (IntermediateResultPartition resultPartition : producedByVertex) {
            final PartitionDescriptor descriptor = PartitionDescriptor.from(resultPartition);

            // Asynchronously register the partition and its producer with the shuffle master.
            final CompletableFuture<? extends ShuffleDescriptor> pendingShuffleDescriptor =
                    vertex.getExecutionGraphAccessor()
                            .getShuffleMaster()
                            .registerPartitionWithProducer(
                                    vertex.getJobId(), descriptor, producer);

            // Turn the shuffle descriptor into a ResultPartitionDeploymentDescriptor.
            final CompletableFuture<ResultPartitionDeploymentDescriptor> pendingDescriptor =
                    pendingShuffleDescriptor.thenApply(
                            registered ->
                                    createResultPartitionDeploymentDescriptor(
                                            descriptor, resultPartition, registered));
            pendingDescriptors.add(pendingDescriptor);
        }

        // Once all registrations are done, index the descriptors by partition id.
        return FutureUtils.combineAll(pendingDescriptors)
                .thenApply(
                        descriptors -> {
                            final Map<
                                            IntermediateResultPartitionID,
                                            ResultPartitionDeploymentDescriptor>
                                    byPartitionId =
                                            CollectionUtil.newLinkedHashMapWithExpectedSize(
                                                    producedByVertex.size());
                            for (ResultPartitionDeploymentDescriptor rpdd : descriptors) {
                                byPartitionId.put(rpdd.getPartitionId(), rpdd);
                            }
                            return byPartitionId;
                        });
    }

 

Execution的部署过程

org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#deployAll

    // Returns the callback that triggers deployment for the given handles (excerpt; the
    // surrounding iteration over the handles is omitted).
    private BiFunction<Void, Throwable, Void> deployAll(
            final List<ExecutionDeploymentHandle> deploymentHandles) {
	// (other parts omitted in this excerpt)
                FutureUtils.assertNoException(
                        // Deploy the execution or handle the error, once slotAssigned completes.
                        slotAssigned.handle(deployOrHandleError(deploymentHandle)));
   
    }

org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#deployOrHandleError

    // Returns the callback that deploys the execution of the given handle or handles the error
    // (excerpt; the error-handling branch is omitted).
    private BiFunction<Object, Throwable, Void> deployOrHandleError(
            final ExecutionDeploymentHandle deploymentHandle) {


	// (other parts omitted in this excerpt)
                // Deploy the task; deployTaskSafe routes any Throwable to the failure handler.
                deployTaskSafe(execution);
 
    }

org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#deployTaskSafe

    /**
     * Deploys the given execution and forwards anything thrown during deployment to {@code
     * handleTaskDeploymentFailure} instead of propagating it to the caller.
     *
     * @param execution the execution to deploy
     */
    private void deployTaskSafe(final Execution execution) {
        try {
            executionOperations.deploy(execution);
        } catch (Throwable deploymentFailure) {
            // Catching Throwable on purpose: no deployment error may escape this method.
            handleTaskDeploymentFailure(execution, deploymentFailure);
        }
    }

org.apache.flink.runtime.executiongraph.Execution#deploy

    // Deploys this execution to the previously assigned resource (excerpt).
    public void deploy() throws JobException {
	// (other parts omitted in this excerpt)
	
            final TaskDeploymentDescriptor deployment =
                    vertex.getExecutionGraphAccessor()
                            .getTaskDeploymentDescriptorFactory()
                            // Creates the task deployment descriptor (TDD) for this execution.
                            .createDeploymentDescriptor(
                                    this,
                                    slot.getAllocationId(),
                                    taskRestore,
                                    producedPartitions.values());

            
            // Per the original note: the submission runs on the future executor so that
            // serializing a large TDD does not block the main thread; syncing back to the main
            // thread after submission is part of the continuation omitted from this excerpt.
            CompletableFuture.supplyAsync(
                            // Submit the task to the task manager.
                            () -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)

    }

 

Task部署描述符(TaskDeploymentDescriptor)创建好后,通过TaskManagerGateway(RpcTaskManagerGateway内部委托给TaskExecutorGateway)将Task提交给TaskExecutor,开始Task的初始化部署,过程如下:

org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway#submitTask

    /**
     * Submits the task described by {@code tdd} by delegating to the TaskExecutor gateway, adding
     * this gateway's {@code jobMasterId} to the call.
     *
     * @param tdd deployment descriptor of the task to submit
     * @param timeout timeout passed through to the TaskExecutor gateway
     * @return the future returned by the TaskExecutor gateway
     */
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
        final CompletableFuture<Acknowledge> acknowledgement =
                taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
        return acknowledgement;
    }

根据传过来的Job信息、Task信息等创建Task实例。

org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask

    // Handles a task submission on the TaskExecutor side: reads the job and task information
    // from the deployment descriptor, builds a Task and starts its thread (excerpt).
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {

	// (other parts omitted in this excerpt)
            final JobInformation jobInformation;
            final TaskInformation taskInformation;
            try {
                // Read the job information from the deployment descriptor.
                jobInformation = tdd.getJobInformation();
                // Read the task information from the deployment descriptor.
                taskInformation = tdd.getTaskInformation();
            } catch (IOException | ClassNotFoundException e) {
                // Surface deserialization problems as a TaskSubmissionException, keeping the cause.
                throw new TaskSubmissionException(
                        "Could not deserialize the job or task information.", e);
            }

           	// (other parts omitted in this excerpt)

            // Build the Task from the descriptor contents and the TaskExecutor's services.
            Task task =
                    new Task(
                            jobInformation,
                            taskInformation,
                            tdd.getExecutionAttemptId(),
                            tdd.getAllocationId(),
                            tdd.getProducedPartitions(),
                            // Input gates, i.e. the inputs of the task.
                            tdd.getInputGates(),
                            memoryManager,
                            sharedResources,
                            taskExecutorServices.getIOManager(),
                            taskExecutorServices.getShuffleEnvironment(),
                            taskExecutorServices.getKvStateService(),
                            taskExecutorServices.getBroadcastVariableManager(),
                            taskExecutorServices.getTaskEventDispatcher(),
                            externalResourceInfoProvider,
                            taskStateManager,
                            taskManagerActions,
                            inputSplitProvider,
                            checkpointResponder,
                            taskOperatorEventGateway,
                            aggregateManager,
                            classLoaderHandle,
                            fileCache,
                            taskManagerConfiguration,
                            taskMetricGroup,
                            partitionStateChecker,
                            MdcUtils.scopeToJob(jobId, getRpcService().getScheduledExecutor()),
                            channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId));

           	// (other parts omitted in this excerpt)
                // Start the task's thread.
                task.startTaskThread();

            // (other parts omitted in this excerpt)
    }

Task类实现了Runnable接口,在创建完Task实例后,通过task.startTaskThread()启动Task的执行线程,该线程会运行Task.run()方法,开始Task的运行过程。

以上即为Flink Task部署过程解析。

 

参考资料:

Flink源码解析(十五)——Flink Task部署过程解析


目录