分配资源和注册生产分区
在上一篇中着重解析了allocateSlotsFor(...)方法触发TaskManager启动的过程,本篇继续解析waitForAllSlotsAndDeploy(...)方法触发Task部署的过程。
在waitForAllSlotsAndDeploy方法中会为每一个Execution分配资源和注册生产分区,紧接着进行Task部署的过程。
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#waitForAllSlotsAndDeploy
/**
 * Assigns a slot to every execution and registers the partitions each one produces, then
 * deploys all of them.
 *
 * <p>deployAll is attached with handle(...), so it runs whether the preparation future
 * completes normally or exceptionally.
 *
 * @param deploymentHandles the executions to prepare and deploy
 */
private void waitForAllSlotsAndDeploy(final List<ExecutionDeploymentHandle> deploymentHandles) {
FutureUtils.assertNoException(
// Assign all slots and register all produced partitions.
assignAllResourcesAndRegisterProducedPartitions(deploymentHandles)
// Then deploy all executions.
.handle(deployAll(deploymentHandles)));
}
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#assignAllResourcesAndRegisterProducedPartitions
/**
 * For every deployment handle, assigns the allocated slot to its Execution and registers the
 * partitions that Execution produces.
 *
 * <p>Each per-execution chain ends in a handle(...) that reports any failure through
 * handleTaskDeploymentFailure and then returns null. One failing execution therefore does not
 * make the combined future fail.
 *
 * @param deploymentHandles the executions to prepare for deployment
 * @return a future combining all per-execution futures via FutureUtils.waitForAll
 */
private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions(
final List<ExecutionDeploymentHandle> deploymentHandles) {
final List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
for (ExecutionDeploymentHandle deploymentHandle : deploymentHandles) {
final CompletableFuture<Void> resultFuture =
deploymentHandle
.getLogicalSlotFuture()
// Assign the allocated slot to the Execution.
.handle(assignResource(deploymentHandle))
// Register the partitions this Execution produces; the slot's TaskManager location
// is used as the producer location.
.thenCompose(registerProducedPartitions(deploymentHandle))
.handle(
(ignore, throwable) -> {
if (throwable != null) {
// A failure in slot assignment or partition registration is reported
// for this execution only.
handleTaskDeploymentFailure(
deploymentHandle.getExecution(), throwable);
}
// Swallow the error here so the combined future is not failed by it.
return null;
});
resultFutures.add(resultFuture);
}
return FutureUtils.waitForAll(resultFutures);
}
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#assignResource
/**
 * Returns the callback that assigns the completed logical slot to the handle's Execution.
 *
 * <p>Excerpt: most of the body is omitted in this article. This includes the definitions of
 * {@code execution} and {@code logicalSlot} and the handling of a failed assignment.
 */
private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(
final ExecutionDeploymentHandle deploymentHandle) {
// ... other parts omitted ...
// tryAssignResource stores the slot in the Execution's assignedResource field, which marks the
// Execution as having a slot. It also records the slot's TaskManager location and allocation
// id on the ExecutionVertex as the latest prior slot allocation.
if (!execution.tryAssignResource(logicalSlot)) {
// NOTE(review): the handling of a failed assignment is omitted in this excerpt.
}
// ... other parts omitted ...
}
org.apache.flink.runtime.executiongraph.Execution#tryAssignResource
/**
 * Attempts to assign the given logical slot to this Execution.
 *
 * <p>Excerpt: the conditions that decide whether assignment succeeds, and the return
 * statements, are omitted in this article.
 *
 * @param logicalSlot the slot to assign
 */
public boolean tryAssignResource(final LogicalSlot logicalSlot) {
// ... other parts omitted ...
// Store the slot; from here on this Execution has a slot assigned.
assignedResource = logicalSlot;
getVertex()
// Record on the ExecutionVertex where the latest execution attempt was placed.
.setLatestPriorSlotAllocation(
assignedResource.getTaskManagerLocation(),
logicalSlot.getAllocationId());
// ... other parts omitted ...
}
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#registerProducedPartitions
/**
 * Returns a function that takes the assigned logical slot and registers all partitions produced
 * by the handle's Execution. The slot's TaskManager location is used as the producer location.
 *
 * <p>Excerpt: the definitions of {@code execution} and {@code logicalSlot}, and the opening of
 * the returned lambda, are omitted in this article.
 */
private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions(
final ExecutionDeploymentHandle deploymentHandle) {
// ... other parts omitted ...
final CompletableFuture<Void> partitionRegistrationFuture =
// Register the partitions this Execution produces.
execution.registerProducedPartitions(logicalSlot.getTaskManagerLocation());
// ... other parts omitted ...
// The "};" below closes the returned lambda, whose opening is omitted above.
};
}
org.apache.flink.runtime.executiongraph.Execution#registerProducedPartitions
/**
 * Registers every partition produced by the given ExecutionVertex with the shuffle master.
 * It then builds one ResultPartitionDeploymentDescriptor per partition.
 *
 * @param vertex the vertex whose produced partitions are registered
 * @param location the TaskManager location of the producer, used to build the ProducerDescriptor
 * @param attemptId the execution attempt that produces the partitions
 * @return a future that completes with the descriptors keyed by partition id, stored in a
 *     LinkedHashMap (insertion order)
 */
private static CompletableFuture<
Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>
registerProducedPartitions(
ExecutionVertex vertex,
TaskManagerLocation location,
ExecutionAttemptID attemptId) {
ProducerDescriptor producerDescriptor = ProducerDescriptor.create(location, attemptId);
// All partitions produced by this ExecutionVertex.
// Illustrative example: if DataStream1 has parallelism 2 and is consumed by N downstream
// nodes, each parallel instance of DataStream1 is represented by N
// IntermediateResultPartitions. N partition descriptors are then needed when the Task is
// created.
// NOTE(review): the example assumes one produced result per downstream consumer edge.
Collection<IntermediateResultPartition> partitions =
vertex.getProducedPartitions().values();
Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> partitionRegistrations =
new ArrayList<>(partitions.size());
for (IntermediateResultPartition partition : partitions) {
PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition);
CompletableFuture<? extends ShuffleDescriptor> shuffleDescriptorFuture =
vertex.getExecutionGraphAccessor()
.getShuffleMaster()
// Asynchronously register the partition and its producer with the shuffle service.
.registerPartitionWithProducer(
vertex.getJobId(), partitionDescriptor, producerDescriptor);
CompletableFuture<ResultPartitionDeploymentDescriptor> partitionRegistration =
shuffleDescriptorFuture.thenApply(
shuffleDescriptor ->
// Build the ResultPartitionDeploymentDescriptor once the shuffle
// descriptor is available.
createResultPartitionDeploymentDescriptor(
partitionDescriptor, partition, shuffleDescriptor));
partitionRegistrations.add(partitionRegistration);
}
return FutureUtils.combineAll(partitionRegistrations)
.thenApply(
rpdds -> {
Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>
producedPartitions =
CollectionUtil.newLinkedHashMapWithExpectedSize(
partitions.size());
// After all registrations complete, index the descriptors by partition id.
rpdds.forEach(
rpdd -> producedPartitions.put(rpdd.getPartitionId(), rpdd));
return producedPartitions;
});
}
Execution的部署过程
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#deployAll
/**
 * Returns the callback that runs after slot assignment and partition registration. It deploys
 * the executions.
 *
 * <p>Excerpt: the iteration over {@code deploymentHandles}, the definitions of
 * {@code slotAssigned} and {@code deploymentHandle}, and the return statement are omitted in
 * this article.
 */
private BiFunction<Void, Throwable, Void> deployAll(
final List<ExecutionDeploymentHandle> deploymentHandles) {
// ... other parts omitted ...
FutureUtils.assertNoException(
// When slotAssigned completes, either deploy or handle the error (see
// deployOrHandleError).
slotAssigned.handle(deployOrHandleError(deploymentHandle)));
}
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#deployOrHandleError
/**
 * Returns the callback that deploys the handle's Execution.
 *
 * <p>Excerpt: the error-handling branch, the definition of {@code execution}, and the return
 * statement are omitted in this article.
 */
private BiFunction<Object, Throwable, Void> deployOrHandleError(
final ExecutionDeploymentHandle deploymentHandle) {
// ... other parts omitted ...
// Deploy the task; any Throwable raised by the deployment is routed to
// handleTaskDeploymentFailure.
deployTaskSafe(execution);
}
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#deployTaskSafe
/**
 * Deploys the given execution via executionOperations. Any Throwable is turned into a task
 * deployment failure for that execution instead of being propagated.
 *
 * @param execution the execution to deploy
 */
private void deployTaskSafe(final Execution execution) {
try {
executionOperations.deploy(execution);
} catch (Throwable e) {
// Catch everything so that a deployment error is reported against this execution.
handleTaskDeploymentFailure(execution, e);
}
}
org.apache.flink.runtime.executiongraph.Execution#deploy
/**
 * Deploys this execution to the slot that was previously assigned to it.
 *
 * <p>Excerpt: the preconditions and the handling of the submission result are omitted in this
 * article. That omission is also why the final statement below is not terminated.
 *
 * @throws JobException as declared by the method signature
 */
public void deploy() throws JobException {
// ... other parts omitted ...
final TaskDeploymentDescriptor deployment =
vertex.getExecutionGraphAccessor()
.getTaskDeploymentDescriptorFactory()
// Create the TaskDeploymentDescriptor (TDD) for this execution from the slot's
// allocation id, the restore state, and the produced partition descriptors.
.createDeploymentDescriptor(
this,
slot.getAllocationId(),
taskRestore,
producedPartitions.values());
// The submission runs on the given executor so that serializing a large TDD does not block
// the main thread. Per the original source comment, the result is then synchronized back to
// the main thread once submission completes; that part is omitted here.
CompletableFuture.supplyAsync(
// Submit the task to the TaskManager through its gateway.
() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
}
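在Task部署描述符(TaskDeploymentDescriptor)创建好后,通过TaskManagerGateway(其RPC实现为RpcTaskManagerGateway,内部委托给TaskExecutorGateway)将Task提交到TaskExecutor,开始Task的初始化部署过程,提交过程如下: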
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway#submitTask
/**
 * Submits the task deployment descriptor to the TaskExecutor by delegating to its RPC gateway.
 * The call passes the jobMasterId along with the descriptor and timeout.
 *
 * @param tdd the descriptor of the task to deploy
 * @param timeout the RPC timeout
 * @return the acknowledgement future returned by the TaskExecutor gateway
 */
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
}
TaskExecutor收到提交请求后,从TaskDeploymentDescriptor中反序列化出Job信息和Task信息,据此创建Task实例并启动任务线程。
org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask
/**
 * Handles a task submission on the TaskExecutor. It extracts the job and task information from
 * the TaskDeploymentDescriptor, creates the Task instance, and starts its thread.
 *
 * <p>Excerpt: large parts of the body are omitted in this article. This includes the
 * definitions of the services passed to the Task constructor, {@code jobId}, and the returned
 * future.
 *
 * @param tdd the descriptor of the task to deploy
 * @param jobMasterId id of the JobMaster that submitted the task
 * @param timeout the RPC timeout
 */
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
// ... other parts omitted ...
final JobInformation jobInformation;
final TaskInformation taskInformation;
try {
// Job information carried by the TDD.
jobInformation = tdd.getJobInformation();
// Task information carried by the TDD.
taskInformation = tdd.getTaskInformation();
} catch (IOException | ClassNotFoundException e) {
// Failure to deserialize either one is reported as a TaskSubmissionException.
throw new TaskSubmissionException(
"Could not deserialize the job or task information.", e);
}
// ... other parts omitted ...
Task task =
new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getProducedPartitions(),
// The Task's inputs.
tdd.getInputGates(),
memoryManager,
sharedResources,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
taskExecutorServices.getKvStateService(),
taskExecutorServices.getBroadcastVariableManager(),
taskExecutorServices.getTaskEventDispatcher(),
externalResourceInfoProvider,
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
taskOperatorEventGateway,
aggregateManager,
classLoaderHandle,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
partitionStateChecker,
MdcUtils.scopeToJob(jobId, getRpcService().getScheduledExecutor()),
channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId));
// ... other parts omitted ...
// Start the task's execution thread.
task.startTaskThread();
// ... other parts omitted ...
}
Task类实现了Runnable接口。创建完Task实例后,task.startTaskThread()会启动Task的执行线程,由该线程执行Task.run()方法,从而开始Task的运行过程。
以上即为Flink Task部署过程解析。
参考资料: