ExecutionGraph调度解析※
本节详细解析ExecutionGraph的调度过程。
由上篇可知,在JobMaster启动时会触发ExecutionGraph的调度过程,调度入口即JobMaster.startScheduling()方法。
org.apache.flink.runtime.jobmaster.JobMaster#startScheduling
/** Starts scheduling by delegating to {@code schedulerNG}. */
private void startScheduling() {
// Start scheduling.
schedulerNG.startScheduling();
}
org.apache.flink.runtime.scheduler.SchedulerBase#startScheduling
/**
 * Asserts that it runs in the main thread, registers the job metrics, starts all operator
 * coordinators and then calls {@code startSchedulingInternal()}.
 */
public final void startScheduling() {
mainThreadExecutor.assertRunningInMainThread();
// Register the job metrics.
registerJobMetrics(
jobManagerJobMetricGroup,
executionGraph,
this::getNumberOfRestarts,
deploymentStateTimeMetrics,
executionGraph::registerJobStatusListener,
executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
jobStatusMetricsSettings);
// Start all OperatorCoordinators.
operatorCoordinatorHandler.startAllOperatorCoordinators();
// Continue with the internal scheduling step.
startSchedulingInternal();
}
org.apache.flink.runtime.scheduler.DefaultScheduler#startSchedulingInternal
/**
 * Logs the class name of the scheduling strategy in use, calls {@code transitionToRunning()}
 * and then starts the scheduling strategy.
 */
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
// Switch to RUNNING.
transitionToRunning();
// Start scheduling via the configured strategy.
schedulingStrategy.startScheduling();
}
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#startScheduling
/**
 * Starts scheduling from the source regions: every pipelined region of the topology that
 * passes {@code isSourceRegion} is collected into a set and handed to
 * {@code maybeScheduleRegions}.
 */
public void startScheduling() {
    final Set<SchedulingPipelinedRegion> initialRegions =
            // All pipelined regions of this topology ...
            IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                    // ... restricted to the source regions.
                    .filter(region -> isSourceRegion(region))
                    .collect(Collectors.toSet());
    // Schedule these regions if they qualify.
    maybeScheduleRegions(initialRegions);
}
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#maybeScheduleRegions
/**
 * Repeatedly calls {@code addSchedulableAndGetNextRegions}, accumulating into a local set,
 * until it returns an empty set; the accumulated regions are then sorted topologically and
 * passed one by one to {@code scheduleRegion}.
 */
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
    Set<SchedulingPipelinedRegion> frontier = regions;
    final Set<SchedulingPipelinedRegion> regionsToSchedule = new HashSet<>();
    while (!frontier.isEmpty()) {
        // Add the schedulable regions and fetch the next batch of candidates.
        frontier = addSchedulableAndGetNextRegions(frontier, regionsToSchedule);
    }
    // schedule regions in topological order.
    SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                    schedulingTopology, regionsToSchedule)
            .forEach(region -> scheduleRegion(region));
}
对每一个符合条件的SchedulingPipelinedRegion开始分配slot并部署,在Flink刚启动阶段还没有可用的slot,需要向资源管理框架Yarn:ResourceManager申请container来分配可用的slot资源。上述即为ExecutionGraph的调度过程,下面继续分析调度过程中slot的分配过程。
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#scheduleRegion
/**
 * Schedules one pipelined region: checks that all of its vertices are in CREATED state,
 * records the region in {@code scheduledRegions} and requests slot allocation and deployment
 * for the region's sorted vertices.
 */
private void scheduleRegion(final SchedulingPipelinedRegion region) {
checkState(
areRegionVerticesAllInCreatedState(region),
"BUG: trying to schedule a region which is not in CREATED state");
scheduledRegions.add(region);
// Allocate slots and deploy.
schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region));
}
org.apache.flink.runtime.scheduler.DefaultScheduler#allocateSlotsAndDeploy
/**
 * Records a vertex modification for each vertex to deploy, resolves the current execution
 * of every vertex and hands both to the execution deployer.
 *
 * @param verticesToDeploy ids of the execution vertices to deploy
 */
public void allocateSlotsAndDeploy(final List<ExecutionVertexID> verticesToDeploy) {
    final Map<ExecutionVertexID, ExecutionVertexVersion> versionsByVertex =
            executionVertexVersioner.recordVertexModifications(verticesToDeploy);
    final List<Execution> currentExecutions =
            verticesToDeploy.stream()
                    .map(vertexId -> getCurrentExecutionOfVertex(vertexId))
                    .collect(Collectors.toList());
    // Allocate slots and deploy.
    executionDeployer.allocateSlotsAndDeploy(currentExecutions, versionsByVertex);
}
allocateSlotsFor(...)方法负责向YARN的ResourceManager申请新的container以获取slot资源;waitForAllSlotsAndDeploy(...)方法负责在获取到slot资源后部署Task,Task的部署过程在下一篇中解析。下面着重分析allocateSlotsFor(...)方法。
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#allocateSlotsAndDeploy
/**
 * Validates the execution states, transitions the executions to SCHEDULED, allocates slots
 * for them, builds the deployment handles and finally waits for all slots and deploys.
 *
 * @param executionsToDeploy executions to allocate slots for and deploy
 * @param requiredVersionByVertex vertex versions passed on to the deployment handles
 */
public void allocateSlotsAndDeploy(
        final List<Execution> executionsToDeploy,
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex) {
    // Validate the execution states, then switch to SCHEDULED.
    validateExecutionStates(executionsToDeploy);
    transitionToScheduled(executionsToDeploy);

    // Allocate the slots.
    final Map<ExecutionAttemptID, ExecutionSlotAssignment> slotAssignments =
            allocateSlotsFor(executionsToDeploy);

    // Create the deployment handles.
    final List<ExecutionDeploymentHandle> handles =
            createDeploymentHandles(executionsToDeploy, requiredVersionByVertex, slotAssignments);

    // Wait for all slots and deploy.
    waitForAllSlotsAndDeploy(handles);
}
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#allocateSlotsFor
/**
 * Maps the given executions to their attempt ids and asks the execution slot allocator to
 * allocate slots for those ids.
 *
 * @param executionsToDeploy executions that need a slot
 * @return the allocator's slot assignments keyed by execution attempt id
 */
private Map<ExecutionAttemptID, ExecutionSlotAssignment> allocateSlotsFor(
        final List<Execution> executionsToDeploy) {
    final List<ExecutionAttemptID> attemptIds =
            executionsToDeploy.stream()
                    .map(execution -> execution.getAttemptId())
                    .collect(Collectors.toList());
    // Allocate the slots.
    return executionSlotAllocator.allocateSlotsFor(attemptIds);
}
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator#allocateSlotsFor
/**
 * Allocates slots for the vertices and converts each vertex assignment into an
 * {@code ExecutionSlotAssignment} keyed by execution attempt id.
 *
 * <p>NOTE(review): {@code vertexIds} and {@code vertexIdToExecutionId} are presumably built
 * from {@code executionAttemptIds} in the part omitted from this excerpt; confirm against the
 * Flink source.
 */
public Map<ExecutionAttemptID, ExecutionSlotAssignment> allocateSlotsFor(
List<ExecutionAttemptID> executionAttemptIds) {
// (other parts omitted)
// Allocate slots for the vertices.
return allocateSlotsForVertices(vertexIds).stream()
.collect(
Collectors.toMap(
vertexAssignment ->
vertexIdToExecutionId.get(
vertexAssignment.getExecutionVertexId()),
vertexAssignment ->
new ExecutionSlotAssignment(
vertexIdToExecutionId.get(
vertexAssignment.getExecutionVertexId()),
vertexAssignment.getLogicalSlotFuture())));
}
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator#allocateSlotsForVertices
/**
 * Allocates slots for the given execution vertices.
 *
 * <p>The vertices are grouped by their execution slot sharing group. Groups are first matched
 * against existing shared slots; for the remaining groups new shared slots are allocated.
 * Logical slots are then allocated from the shared slots, a request bulk is created and a
 * timeout check is scheduled for it.
 *
 * @param executionVertexIds vertices that need a slot
 * @return the assignments, in the same order as {@code executionVertexIds}
 */
private List<SlotExecutionVertexAssignment> allocateSlotsForVertices(
List<ExecutionVertexID> executionVertexIds) {
SharedSlotProfileRetriever sharedSlotProfileRetriever =
sharedSlotProfileRetrieverFactory.createFromBulk(new HashSet<>(executionVertexIds));
Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
executionVertexIds.stream()
.collect(
Collectors.groupingBy(
slotSharingStrategy::getExecutionSlotSharingGroup));
Map<ExecutionSlotSharingGroup, SharedSlot> slots = new HashMap<>(executionsByGroup.size());
Set<ExecutionSlotSharingGroup> groupsToAssign = new HashSet<>(executionsByGroup.keySet());
Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =
// Try to assign existing shared slots; per the article, there are none on first start-up.
tryAssignExistingSharedSlots(groupsToAssign);
slots.putAll(assignedSlots);
groupsToAssign.removeAll(assignedSlots.keySet());
if (!groupsToAssign.isEmpty()) {
Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots =
// Allocate new slots.
allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever);
slots.putAll(allocatedSlots);
groupsToAssign.removeAll(allocatedSlots.keySet());
Preconditions.checkState(groupsToAssign.isEmpty());
}
// Allocate logical slots from the shared slots.
Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments =
allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup);
// we need to pass the slots map to the createBulk method instead of using the allocator's
// 'sharedSlots'
// because if any physical slots have already failed, their shared slots have been removed
// from the allocator's 'sharedSlots' by failed logical slots.
SharingPhysicalSlotRequestBulk bulk = createBulk(slots, executionsByGroup);
bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, allocationTimeout);
return executionVertexIds.stream().map(assignments::get).collect(Collectors.toList());
}
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator#allocateSharedSlots
/**
 * Allocates shared slots for the given slot sharing groups by submitting physical slot
 * requests to the slot provider.
 *
 * <p>NOTE(review): this is an excerpt; {@code slotRequests} and {@code allocatedSlots} are
 * built in the omitted parts.
 */
private Map<ExecutionSlotSharingGroup, SharedSlot> allocateSharedSlots(
Set<ExecutionSlotSharingGroup> executionSlotSharingGroups,
SharedSlotProfileRetriever sharedSlotProfileRetriever) {
// (other parts omitted)
Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocateResult =
// Submit the requests for physical slots.
slotProvider.allocatePhysicalSlots(slotRequests);
// (other parts omitted)
return allocatedSlots;
}
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl#allocatePhysicalSlots
/**
 * Serves the physical slot requests: first tries to allocate from the available slots and,
 * per request, falls back to requesting a new slot.
 *
 * <p>NOTE(review): heavily abridged excerpt. The lambda body is not complete Java as shown
 * (the surrounding expression of the {@code requestNewSlot} call was omitted); consult the
 * Flink source for the full statement.
 */
public Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
Collection<PhysicalSlotRequest> physicalSlotRequests) {
// (other parts omitted)
Map<SlotRequestId, Optional<PhysicalSlot>> availablePhysicalSlots =
// Try to allocate from the slots that are already available.
tryAllocateFromAvailable(physicalSlotRequestsById.values());
return availablePhysicalSlots.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> {
// (other parts omitted)
// Request a new slot.
requestNewSlot(slotRequestId,resourceProfile,slotProfile.getPreferredAllocations(),physicalSlotRequest
.willSlotBeOccupiedIndefinitely()));
// (other parts omitted)
}));
}
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl#requestNewSlot
/**
 * Requests a new slot from the slot pool.
 *
 * @param slotRequestId id of the slot request
 * @param resourceProfile resource profile of the requested slot
 * @param preferredAllocations preferred allocations forwarded to the slot pool
 * @param willSlotBeOccupiedIndefinitely selects the normal request (with a {@code null}
 *     timeout) when {@code true}, the batch request otherwise
 * @return future of the requested physical slot
 */
private CompletableFuture<PhysicalSlot> requestNewSlot(
        SlotRequestId slotRequestId,
        ResourceProfile resourceProfile,
        Collection<AllocationID> preferredAllocations,
        boolean willSlotBeOccupiedIndefinitely) {
    if (!willSlotBeOccupiedIndefinitely) {
        return slotPool.requestNewAllocatedBatchSlot(
                slotRequestId, resourceProfile, preferredAllocations);
    }
    // Request a newly allocated slot.
    return slotPool.requestNewAllocatedSlot(
            slotRequestId, resourceProfile, preferredAllocations, null);
}
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge#requestNewAllocatedSlot
/**
 * Creates a normal pending request from the given arguments and forwards it, together with
 * the timeout, to {@code internalRequestNewSlot}. Asserts that it runs in the main thread.
 */
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile,
@Nonnull Collection<AllocationID> preferredAllocations,
@Nullable Time timeout) {
assertRunningInMainThread();
final PendingRequest pendingRequest =
// Create a normal (non-batch) request.
PendingRequest.createNormalRequest(
slotRequestId, resourceProfile, preferredAllocations);
// Request the new slot internally.
return internalRequestNewSlot(pendingRequest, timeout);
}
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge#internalRequestNewSlot
/**
 * Registers the pending request via {@code internalRequestNewAllocatedSlot}.
 *
 * <p>NOTE(review): excerpt only; the handling of {@code timeout} and the return statement are
 * in the omitted part.
 */
private CompletableFuture<PhysicalSlot> internalRequestNewSlot(
PendingRequest pendingRequest, @Nullable Time timeout) {
// Internally request a newly allocated slot.
internalRequestNewAllocatedSlot(pendingRequest);
// (other parts omitted)
}
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge#internalRequestNewAllocatedSlot
/**
 * Stores the pending request under its slot request id and increases the declarative slot
 * pool's resource requirements by one unit of the request's resource profile.
 */
private void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
getDeclarativeSlotPool()
// Increase the resource requirements.
.increaseResourceRequirementsBy(
ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1));
}
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool#increaseResourceRequirementsBy
/**
 * Adds {@code increment} to the total resource requirements and declares the new
 * requirements. An empty increment is ignored.
 *
 * @param increment resources to add to the total requirements
 */
public void increaseResourceRequirementsBy(ResourceCounter increment) {
    if (!increment.isEmpty()) {
        totalResourceRequirements = totalResourceRequirements.add(increment);
        // Declare the updated resource requirements.
        declareResourceRequirements();
    }
}
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool#declareResourceRequirements
/** Fetches the current resource requirements and passes them to the registered consumer. */
private void declareResourceRequirements() {
// Get the resource requirements.
final Collection<ResourceRequirement> resourceRequirements = getResourceRequirements();
// Per the article: the consumer is wired up in
// org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService, which defines a
// declareResourceRequirements method, so this call ends up in
// DeclarativeSlotPoolService#declareResourceRequirements (the wiring is not shown here).
notifyNewResourceRequirements.accept(resourceRequirements);
}
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService#declareResourceRequirements
/**
 * Wraps the requirements, together with the job id and the JobManager address, in a
 * {@code ResourceRequirements} object and declares it through the connection manager.
 * Asserts that the service has been started.
 */
private void declareResourceRequirements(Collection<ResourceRequirement> resourceRequirements) {
assertHasBeenStarted();
// Declare the resource requirements.
resourceRequirementServiceConnectionManager.declareResourceRequirements(
ResourceRequirements.create(jobId, jobManagerAddress, resourceRequirements));
}
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclareResourceRequirementServiceConnectionManager#declareResourceRequirements
/**
 * Remembers the given requirements as the current ones and triggers their submission, but
 * only while connected. Runs under {@code lock} and checks that the manager is not closed.
 *
 * @param resourceRequirements requirements to declare
 */
public void declareResourceRequirements(ResourceRequirements resourceRequirements) {
    synchronized (lock) {
        checkNotClosed();
        if (!isConnected()) {
            return;
        }
        currentResourceRequirements = resourceRequirements;
        final Duration sleepOnError = Duration.ofMillis(1L);
        final Duration maxSleepOnError = Duration.ofMillis(10000L);
        // Trigger submission of the resource requirements.
        triggerResourceRequirementsSubmission(
                sleepOnError, maxSleepOnError, currentResourceRequirements);
    }
}
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclareResourceRequirementServiceConnectionManager#triggerResourceRequirementsSubmission
/**
 * Sends the resource requirements, retrying with an exponential backoff strategy built from
 * {@code sleepOnError} and {@code maxSleepOnError} and capped at {@code Integer.MAX_VALUE}
 * attempts. The retry predicate rejects {@code CancellationException}.
 */
private void triggerResourceRequirementsSubmission(
Duration sleepOnError,
Duration maxSleepOnError,
ResourceRequirements resourceRequirementsToSend) {
FutureUtils.retryWithDelay(
// Send the resource requirements.
() -> sendResourceRequirements(resourceRequirementsToSend),
new ExponentialBackoffRetryStrategy(
Integer.MAX_VALUE, sleepOnError, maxSleepOnError),
throwable -> !(throwable instanceof CancellationException),
scheduledExecutor);
}
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclareResourceRequirementServiceConnectionManager#sendResourceRequirements
/**
 * Declares the given resource requirements on {@code service} while holding {@code lock}.
 *
 * <p>NOTE(review): excerpt only; the surrounding checks were omitted by the article.
 */
private CompletableFuture<Acknowledge> sendResourceRequirements(
ResourceRequirements resourceRequirementsToSend) {
synchronized (lock) {
// Declare the resource requirements.
return service.declareResourceRequirements(resourceRequirementsToSend);
// (other parts omitted)
}
}
org.apache.flink.runtime.resourcemanager.ResourceManager#declareRequiredResources
/**
 * Hands the declared resource requirements to the slot manager.
 *
 * <p>NOTE(review): excerpt only; validation and the return statement are in the omitted parts.
 */
public CompletableFuture<Acknowledge> declareRequiredResources(
JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) {
// (other parts omitted)
// Process the resource requirements.
slotManager.processResourceRequirements(resourceRequirements);
// (other parts omitted)
}
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#processResourceRequirements
/**
 * Notifies the resource tracker about the job's resource requirements and then triggers a
 * (possibly delayed) resource requirements check. Excerpt; leading parts are omitted.
 */
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
// (other parts omitted)
// Notify the tracker of the resource requirements.
resourceTracker.notifyResourceRequirements(
resourceRequirements.getJobId(), resourceRequirements.getResourceRequirements());
// Check the resource requirements, possibly with a delay.
checkResourceRequirementsWithDelay();
}
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultResourceTracker#notifyResourceRequirements
/**
 * Forwards the resource requirements to the tracker of the given job, obtained via
 * {@code getOrCreateTracker}. Excerpt; surrounding parts are omitted.
 */
public void notifyResourceRequirements(
JobID jobId, Collection<ResourceRequirement> resourceRequirements) {
// (other parts omitted)
// Notify the per-job tracker of the resource requirements.
getOrCreateTracker(jobId).notifyResourceRequirements(resourceRequirements);
// (other parts omitted)
}
org.apache.flink.runtime.resourcemanager.slotmanager.JobScopedResourceTracker#notifyResourceRequirements
/**
 * After the omitted bookkeeping, finds excess slots and tries to assign them.
 * Excerpt; how {@code newResourceRequirements} is stored is not shown.
 */
public void notifyResourceRequirements(
Collection<ResourceRequirement> newResourceRequirements) {
// (other parts omitted)
// Find the excess slots.
findExcessSlots();
// Try to assign the excess slots.
tryAssigningExcessSlots();
}
真正的分配动作在checkResourceRequirementsWithDelay()方法中实现。
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#checkResourceRequirementsWithDelay
// Depending on the ResourceAllocationStrategy implementation, checking the resource
// requirements and possibly re-allocating can be heavy. To cover more changes per check, and
// thereby reduce the frequency of unnecessary re-allocations, the check is slightly delayed.
private void checkResourceRequirementsWithDelay() {
    final long delayMillis = requirementsCheckDelay.toMillis();
    if (delayMillis <= 0) {
        // No delay configured: check right away.
        checkResourceRequirements();
        return;
    }

    final boolean checkStillPending =
            requirementsCheckFuture != null && !requirementsCheckFuture.isDone();
    if (checkStillPending) {
        // A delayed check is already scheduled and has not completed yet.
        return;
    }

    requirementsCheckFuture = new CompletableFuture<>();
    scheduledExecutor.schedule(
            () ->
                    mainThreadExecutor.execute(
                            () -> {
                                // Check the resource requirements.
                                checkResourceRequirements();
                                Preconditions.checkNotNull(requirementsCheckFuture)
                                        .complete(null);
                            }),
            delayMillis,
            TimeUnit.MILLISECONDS);
}
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#checkResourceRequirements
/**
 * Checks the missing resources reported by the resource tracker and tries to fulfil them.
 *
 * <p>Does nothing before the slot manager is started. When nothing is missing, pending
 * allocations are cleared (only if the resource allocator is supported and pending task
 * managers exist) and the method returns. Otherwise the allocation strategy is asked for a
 * result, slots are allocated on registered resources, task managers are allocated if
 * supported, unfulfillable jobs are recorded and optionally notified, and finally the needed
 * resources are declared (possibly with a delay).
 */
private void checkResourceRequirements() {
if (!started) {
return;
}
Map<JobID, Collection<ResourceRequirement>> missingResources =
resourceTracker.getMissingResources();
if (missingResources.isEmpty()) {
if (resourceAllocator.isSupported()
&& !taskManagerTracker.getPendingTaskManagers().isEmpty()) {
taskManagerTracker.replaceAllPendingAllocations(Collections.emptyMap());
// Check whether the resources need to be reconciled.
checkResourcesNeedReconcile();
// Declare the needed resources, possibly with a delay.
declareNeededResourcesWithDelay();
}
return;
}
logMissingAndAvailableResource(missingResources);
// Copy every requirement collection into a fresh ArrayList.
missingResources =
missingResources.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));
final ResourceAllocationResult result =
// Try to fulfil the requirements.
resourceAllocationStrategy.tryFulfillRequirements(
missingResources, taskManagerTracker, this::isBlockedTaskManager);
// Allocate slots according to the result
allocateSlotsAccordingTo(result.getAllocationsOnRegisteredResources());
final Set<PendingTaskManagerId> failAllocations;
if (resourceAllocator.isSupported()) {
// Allocate task managers according to the result
failAllocations =
allocateTaskManagersAccordingTo(result.getPendingTaskManagersToAllocate());
// Record slot allocation of pending task managers
final Map<PendingTaskManagerId, Map<JobID, ResourceCounter>>
pendingResourceAllocationResult =
new HashMap<>(result.getAllocationsOnPendingResources());
pendingResourceAllocationResult.keySet().removeAll(failAllocations);
taskManagerTracker.replaceAllPendingAllocations(pendingResourceAllocationResult);
} else {
// Without allocator support every pending task manager counts as a failed allocation.
failAllocations =
result.getPendingTaskManagersToAllocate().stream()
.map(PendingTaskManager::getPendingTaskManagerId)
.collect(Collectors.toSet());
}
unfulfillableJobs.clear();
unfulfillableJobs.addAll(result.getUnfulfillableJobs());
// Jobs with allocations on a failed pending task manager are unfulfillable as well.
for (PendingTaskManagerId pendingTaskManagerId : failAllocations) {
unfulfillableJobs.addAll(
result.getAllocationsOnPendingResources().get(pendingTaskManagerId).keySet());
}
// Notify jobs that can not be fulfilled
if (sendNotEnoughResourceNotifications) {
for (JobID jobId : unfulfillableJobs) {
LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
resourceEventListener.notEnoughResourceAvailable(
jobId, resourceTracker.getAcquiredResources(jobId));
}
}
if (resourceAllocator.isSupported()) {
checkResourcesNeedReconcile();
// Declare the needed resources, possibly with a delay.
declareNeededResourcesWithDelay();
}
}
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#declareNeededResourcesWithDelay
/**
 * Declares the needed resources, either immediately (non-positive delay) or through a delayed
 * task that runs in the main thread. While a previously scheduled declaration has not
 * completed, no further one is scheduled. Requires a supported resource allocator.
 */
private void declareNeededResourcesWithDelay() {
    Preconditions.checkState(resourceAllocator.isSupported());

    final long delayMillis = declareNeededResourceDelay.toMillis();
    if (delayMillis <= 0) {
        // Declare the needed resources right away.
        declareNeededResources();
        return;
    }

    final boolean declarationStillPending =
            declareNeededResourceFuture != null && !declareNeededResourceFuture.isDone();
    if (declarationStillPending) {
        return;
    }

    declareNeededResourceFuture = new CompletableFuture<>();
    scheduledExecutor.schedule(
            () ->
                    mainThreadExecutor.execute(
                            () -> {
                                // Declare the needed resources.
                                declareNeededResources();
                                Preconditions.checkNotNull(declareNeededResourceFuture)
                                        .complete(null);
                            }),
            delayMillis,
            TimeUnit.MILLISECONDS);
}
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#declareNeededResources
/**
 * Declares the needed resources to the resource allocator.
 *
 * <p>NOTE(review): excerpt only; {@code resourceDeclarations} is built in the omitted part.
 */
private void declareNeededResources() {
// (other parts omitted)
// Declare the needed resources.
resourceAllocator.declareResourceNeeded(resourceDeclarations);
}
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.ResourceAllocatorImpl#declareResourceNeeded
/**
 * Validates that it runs in the main thread and delegates to the enclosing
 * {@code ActiveResourceManager}.
 */
public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
validateRunsInMainThread();
// Declare the needed resources on the enclosing resource manager.
ActiveResourceManager.this.declareResourceNeeded(resourceDeclarations);
}
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager#declareResourceNeeded
/**
 * Stores an unmodifiable view of the given declarations, logs them at debug level and checks
 * the resource declarations.
 */
public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
this.resourceDeclarations = Collections.unmodifiableCollection(resourceDeclarations);
log.debug("Update resource declarations to {}.", resourceDeclarations);
// Check the resource declarations.
checkResourceDeclarations();
}
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager#checkResourceDeclarations
/**
 * Requests {@code requestWorkerNumber} new workers for {@code workerResourceSpec}.
 *
 * <p>NOTE(review): heavily abridged excerpt; both variables are computed in the omitted code,
 * and the braces are unbalanced as shown (the final closing brace presumably belongs to an
 * enclosing block whose opening was omitted); confirm against the Flink source.
 */
private void checkResourceDeclarations() {
// (other parts omitted)
for (int i = 0; i < requestWorkerNumber; i++) {
// Request a new worker.
requestNewWorker(workerResourceSpec);
}
// (other parts omitted)
}
}
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager#requestNewWorker
/**
 * Requests a resource from the resource manager driver.
 *
 * <p>NOTE(review): excerpt only; {@code taskExecutorProcessSpec} is derived in the omitted
 * part and the handling of the returned future is not shown.
 */
public void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
// (other parts omitted)
final CompletableFuture<WorkerType> requestResourceFuture =
// Request the resource.
resourceManagerDriver.requestResource(taskExecutorProcessSpec);
}
org.apache.flink.yarn.YarnResourceManagerDriver#requestResource
/**
 * Adds a container request for the given task executor process spec.
 *
 * <p>NOTE(review): excerpt only; {@code resource}, {@code priority} and the returned future
 * are produced in the omitted parts.
 */
public CompletableFuture<YarnWorkerNode> requestResource(
TaskExecutorProcessSpec taskExecutorProcessSpec) {
// (other parts omitted)
// Add the container request.
addContainerRequest(resource, priority);
}
org.apache.flink.yarn.YarnResourceManagerDriver#addContainerRequest
/**
 * Updates the application block list, builds a container request for the given resource and
 * priority (with {@code taskManagerNodeLabel}) and submits it through the resource manager
 * client.
 */
private void addContainerRequest(Resource resource, Priority priority) {
// update blocklist
tryUpdateApplicationBlockList();
AMRMClient.ContainerRequest containerRequest =
ContainerRequestReflector.INSTANCE.getContainerRequest(
resource, priority, taskManagerNodeLabel);
// Add the container request through the client.
// Per the article: once the container resources have been allocated, the callback
// org.apache.flink.yarn.YarnResourceManagerDriver.YarnContainerEventHandler.onContainersAllocated
// is invoked.
resourceManagerClient.addContainerRequest(containerRequest);
}
org.apache.flink.yarn.YarnResourceManagerDriver.YarnContainerEventHandler#onContainersAllocated
// Callback invoked when containers have been allocated.
// Groups the containers by priority and handles each group; once no requested worker is
// still waiting for a container, the client's heartbeat interval is set to
// yarnHeartbeatIntervalMillis.
@Override
public void onContainersAllocated(List<Container> containers) {
runAsyncWithFatalHandler(
() -> {
checkInitialized();
log.info("Received {} containers.", containers.size());
for (Map.Entry<Priority, List<Container>> entry :
groupContainerByPriority(containers).entrySet()) {
// Handle the containers allocated for this priority.
onContainersOfPriorityAllocated(entry.getKey(), entry.getValue());
}
// if we are waiting for no further containers, we can go to the
// regular heartbeat interval
if (getNumRequestedNotAllocatedWorkers() <= 0) {
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
}
});
}
org.apache.flink.yarn.YarnResourceManagerDriver#onContainersOfPriorityAllocated
/**
 * Starts a task executor in an allocated container and removes the matching pending request;
 * any containers left in {@code containerIterator} are returned as excess.
 *
 * <p>NOTE(review): heavily abridged excerpt. The braces are unbalanced as shown (the first
 * closing brace presumably ends a loop whose header was omitted), and {@code container},
 * {@code taskExecutorProcessSpec}, {@code resourceId}, {@code pendingRequest} and
 * {@code containerIterator} are defined in the omitted code.
 */
private void onContainersOfPriorityAllocated(Priority priority, List<Container> containers) {
// (other parts omitted)
// Asynchronously start the task executor in the container.
startTaskExecutorInContainerAsync(container, taskExecutorProcessSpec, resourceId);
removeContainerRequest(pendingRequest);
}
int numExcess = 0;
while (containerIterator.hasNext()) {
// Return the excess container.
returnExcessContainer(containerIterator.next());
numExcess++;
}
// (other parts omitted)
}
org.apache.flink.yarn.YarnResourceManagerDriver#startTaskExecutorInContainerAsync
/**
 * Builds the task executor launch context on the IO executor and, back on the main thread
 * executor, either starts the container with it or reports the worker as terminated when
 * building the context failed.
 *
 * @param container the allocated container
 * @param taskExecutorProcessSpec process spec used to build the launch context
 * @param resourceId resource id of the worker to start
 */
private void startTaskExecutorInContainerAsync(
        Container container,
        TaskExecutorProcessSpec taskExecutorProcessSpec,
        ResourceID resourceId) {
    final CompletableFuture<ContainerLaunchContext> launchContextFuture =
            FutureUtils.supplyAsync(
                    // Create the launch context of the TaskManager.
                    () ->
                            createTaskExecutorLaunchContext(
                                    resourceId,
                                    container.getNodeId().getHost(),
                                    taskExecutorProcessSpec),
                    getIoExecutor());

    FutureUtils.assertNoException(
            launchContextFuture.handleAsync(
                    (launchContext, failure) -> {
                        if (failure != null) {
                            getResourceEventHandler()
                                    .onWorkerTerminated(resourceId, failure.getMessage());
                        } else {
                            // Start the container asynchronously.
                            nodeManagerClient.startContainerAsync(container, launchContext);
                        }
                        return null;
                    },
                    getMainThreadExecutor()));
}
调用Utils.createTaskExecutorContext(...)方法时,传入TaskManager的实现类信息作为TaskManager的启动入口。即yarn container容器启动时会执行YarnTaskExecutorRunner类的main(...)方法。
org.apache.flink.yarn.YarnResourceManagerDriver#createTaskExecutorLaunchContext
/**
 * Creates the container launch context for a task executor, with
 * {@code YarnTaskExecutorRunner} as the entry class, and puts {@code host} into its
 * environment under {@code ENV_FLINK_NODE_ID}.
 *
 * <p>NOTE(review): excerpt only; {@code configuration}, {@code taskManagerParameters},
 * {@code taskManagerDynamicProperties} and {@code currDir} are prepared in the omitted part.
 */
private ContainerLaunchContext createTaskExecutorLaunchContext(
ResourceID containerId, String host, TaskExecutorProcessSpec taskExecutorProcessSpec)
throws Exception {
// (other parts omitted)
final ContainerLaunchContext taskExecutorLaunchContext =
Utils.createTaskExecutorContext(
flinkConfig,
yarnConfig,
configuration,
taskManagerParameters,
taskManagerDynamicProperties,
currDir,
// Entry class.
YarnTaskExecutorRunner.class,
log);
taskExecutorLaunchContext.getEnvironment().put(ENV_FLINK_NODE_ID, host);
return taskExecutorLaunchContext;
}
在创建完ContainerLaunchContext实例后,随即调用nodeManagerClient.startContainerAsync(container, context);方法,开始TaskManager的启动工作。
以上即为ExecutionGraph的调度过程及初次请求slot资源时,TaskManager的配置启动信息,下面继续分析TaskManager的启动过程。
TaskManager启动过程解析※
上节最后分析了ContainerLaunchContext实例的生成过程,其中将YarnTaskExecutorRunner配置为启动入口类。它是一个普通的Java类,包含main(...)方法,通过java命令启动。本节分析YarnTaskExecutorRunner的启动过程。
通过java命令触发main(...)方法执行,进入runTaskManagerSecurely(...)方法中,加载配置文件生成配置项,执行后续启动操作。
org.apache.flink.yarn.YarnTaskExecutorRunner#main
/**
 * Entry point of the YARN task executor runner: logs the environment information, registers
 * the signal handler, installs the JVM shutdown safeguard and runs the TaskManager.
 */
public static void main(String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// Instance entry point of the YARN task executor.
runTaskManagerSecurely(args);
}
org.apache.flink.yarn.YarnTaskExecutorRunner#runTaskManagerSecurely
/**
 * Loads the configuration from the arguments, adjusts it for the current working directory
 * and environment, and runs the TaskManager process. Any failure during this initialization
 * is logged and terminates the JVM with {@code INIT_ERROR_EXIT_CODE}.
 */
private static void runTaskManagerSecurely(String[] args) {
Configuration configuration = null;
try {
LOG.debug("All environment variables: {}", ENV);
final String currDir = ENV.get(Environment.PWD.key());
LOG.info("Current working Directory: {}", currDir);
configuration = TaskManagerRunner.loadConfiguration(args);
// Set up and modify the configuration.
setupAndModifyConfiguration(configuration, currDir, ENV);
} catch (Throwable t) {
LOG.error("YARN TaskManager initialization failed.", t);
System.exit(INIT_ERROR_EXIT_CODE);
}
// Run the TaskManager process securely.
TaskManagerRunner.runTaskManagerProcessSecurely(Preconditions.checkNotNull(configuration));
}
org.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManagerProcessSecurely(org.apache.flink.configuration.Configuration)
/**
 * Runs the TaskManager inside the installed security context and exits the JVM with the
 * resulting exit code.
 *
 * <p>NOTE(review): excerpt only; {@code exitCode} and {@code pluginManager} are declared in
 * the omitted parts.
 */
public static void runTaskManagerProcessSecurely(Configuration configuration) {
// (other parts omitted; the original line here was garbled in the article and presumably
// read SecurityUtils.install(new SecurityConfiguration(configuration)); confirm against
// the Flink source)
exitCode =
SecurityUtils.getInstalledContext()
// Run the TaskManager.
.runSecured(() -> runTaskManager(configuration, pluginManager));
// (other parts omitted)
System.exit(exitCode);
}
生成TaskManagerRunner实例并启动。
org.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManager
/**
 * Creates a {@code TaskManagerRunner} that uses
 * {@code TaskManagerRunner::createTaskExecutorService} to build its task executor service,
 * and starts it.
 *
 * <p>NOTE(review): excerpt only; the declaration of {@code taskManagerRunner} and the
 * returned exit code are in the omitted parts.
 */
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
throws Exception {
// (other parts omitted)
taskManagerRunner =
new TaskManagerRunner(
configuration,
pluginManager,
// Method reference used to create the task executor service.
TaskManagerRunner::createTaskExecutorService);
// Start the runner.
taskManagerRunner.start();
// (other parts omitted)
}
在生成TaskManagerRunner实例时,构造函数的第三个参数是一个用于创建TaskExecutorService的工厂(此处以方法引用TaskManagerRunner::createTaskExecutorService的形式传入),TaskExecutorService的具体生成过程如下:
org.apache.flink.runtime.taskexecutor.TaskManagerRunner#createTaskExecutorService
/**
 * Creates a {@code TaskExecutor} via {@code startTaskManager}, forwarding all arguments
 * unchanged, and wraps it in a {@code TaskExecutorService} adapter.
 */
public static TaskExecutorService createTaskExecutorService(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
BlobCacheService blobCacheService,
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
WorkingDirectory workingDirectory,
FatalErrorHandler fatalErrorHandler,
DelegationTokenReceiverRepository delegationTokenReceiverRepository)
throws Exception {
final TaskExecutor taskExecutor =
// Create the TaskExecutor (TaskManager).
startTaskManager(
configuration,
resourceID,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
localCommunicationOnly,
externalResourceInfoProvider,
workingDirectory,
fatalErrorHandler,
delegationTokenReceiverRepository);
return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager
/**
 * Builds and returns a new {@code TaskExecutor}.
 *
 * <p>NOTE(review): excerpt only; {@code taskManagerConfiguration},
 * {@code taskManagerServices}, {@code taskManagerMetricGroup} and
 * {@code metricQueryServiceAddress} are created in the omitted part.
 */
public static TaskExecutor startTaskManager(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
TaskExecutorBlobService taskExecutorBlobService,
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
WorkingDirectory workingDirectory,
FatalErrorHandler fatalErrorHandler,
DelegationTokenReceiverRepository delegationTokenReceiverRepository)
throws Exception {
// (other parts omitted)
// Per the article: the TaskExecutor constructor creates the JobManager heartbeat manager and
// the ResourceManager heartbeat manager (the constructor itself is not shown here).
return new TaskExecutor(
rpcService,
taskManagerConfiguration,
highAvailabilityServices,
taskManagerServices,
externalResourceInfoProvider,
heartbeatServices,
taskManagerMetricGroup.f0,
metricQueryServiceAddress,
taskExecutorBlobService,
fatalErrorHandler,
new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
delegationTokenReceiverRepository);
}
TaskManagerRunner实例生成后紧接着调用其start()方法,其中startTaskManagerRunnerServices()方法主要负责构建TaskExecutorService实例,构造完成后调用其start()方法。
org.apache.flink.runtime.taskexecutor.TaskManagerRunner#start
/**
 * Starts the runner's services and then the task executor service, both while holding
 * {@code lock}.
 */
public void start() throws Exception {
synchronized (lock) {
// Start the TaskManagerRunner services.
startTaskManagerRunnerServices();
// Per the article, this ends up calling back into
// org.apache.flink.runtime.taskexecutor.TaskExecutor.onStart.
taskExecutorService.start();
}
}
org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManagerRunnerServices
/**
 * Creates and starts the services the task executor depends on, then creates the task
 * executor service itself, all while holding {@code lock}.
 *
 * <p>In order: RPC system, scheduled executor, high availability services, JMX service, RPC
 * service, resource id, working directory, heartbeat services, metric registry (with its
 * query service), blob cache service, external resource info provider, delegation token
 * receiver repository and finally the task executor service.
 */
private void startTaskManagerRunnerServices() throws Exception {
synchronized (lock) {
rpcSystem = RpcSystem.load(configuration);
this.executor =
Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("taskmanager-future"));
highAvailabilityServices =
// Create the high availability services.
HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
AddressResolution.NO_ADDRESS_RESOLUTION,
rpcSystem,
this);
JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));
// Create the RPC service.
rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);
this.resourceId =
// Get the resource id of the task manager.
getTaskManagerResourceID(
configuration, rpcService.getAddress(), rpcService.getPort());
// Create the working directory of the task manager.
this.workingDirectory =
ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
configuration, resourceId);
LOG.info("Using working directory: {}", workingDirectory);
// Get the heartbeat services from the configuration.
HeartbeatServices heartbeatServices =
HeartbeatServices.fromConfiguration(configuration);
metricRegistry =
new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(
configuration,
rpcSystem.getMaximumMessageSizeInBytes(configuration)),
ReporterSetup.fromConfiguration(configuration, pluginManager),
TraceReporterSetup.fromConfiguration(configuration, pluginManager));
final RpcService metricQueryServiceRpcService =
MetricUtils.startRemoteMetricsRpcService(
configuration,
rpcService.getAddress(),
configuration.get(TaskManagerOptions.BIND_HOST),
rpcSystem);
metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());
blobCacheService =
BlobUtils.createBlobCacheService(
configuration,
Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
highAvailabilityServices.createBlobStore(),
null);
final ExternalResourceInfoProvider externalResourceInfoProvider =
ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
configuration, pluginManager);
final DelegationTokenReceiverRepository delegationTokenReceiverRepository =
new DelegationTokenReceiverRepository(configuration, pluginManager);
taskExecutorService =
// Create the task executor through the factory passed to this runner.
taskExecutorServiceFactory.createTaskExecutor(
this.configuration,
this.resourceId.unwrap(),
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
false,
externalResourceInfoProvider,
workingDirectory.unwrap(),
this,
delegationTokenReceiverRepository);
handleUnexpectedTaskExecutorServiceTermination();
MemoryLogger.startIfConfigured(
LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
}
}
taskExecutorService.start()方法最后执行的是TaskExecutor.start()方法。TaskExecutor继承于RpcEndpoint,所以当调用RpcEndpoint的start()方法时,会回调TaskExecutor的onStart()方法。
org.apache.flink.runtime.taskexecutor.TaskExecutor#onStart
// Callback method.
// Starts the task executor services; a failure is wrapped in a TaskManagerException, reported
// via onFatalError and rethrown. On success the registration timeout is started.
@Override
public void onStart() throws Exception {
try {
startTaskExecutorServices();
} catch (Throwable t) {
final TaskManagerException exception =
new TaskManagerException(
String.format("Could not start the TaskExecutor %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
startRegistrationTimeout();
}
org.apache.flink.runtime.taskexecutor.TaskExecutor#startTaskExecutorServices
/**
 * Starts the services of the task executor: the ResourceManager leader retriever, the task
 * slot table and the job leader service; then creates the file cache and tries to load the
 * local allocation snapshots. Exceptions are passed to
 * {@code handleStartTaskExecutorServicesException}.
 */
private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
// start the job leader service
jobLeaderService.start(
getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
fileCache =
new FileCache(
taskManagerConfiguration.getTmpDirectories(),
taskExecutorBlobService.getPermanentBlobService());
// Try to load the local allocation snapshots.
tryLoadLocalAllocationSnapshots();
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}
以上即为TaskManager启动过程解析。
小结※
本文简单梳理了ExecutionGraph的调度过程和TaskManager的启动过程。
参考资料:
Flink源码解析(十四)——Flink On Yarn ExecutionGraph调度及TaskManager启动过程解析