Flink V1.20源码阅读笔记(5.5)-Flink On Yarn ExecutionGraph调度解析及TaskManager启动过程解析

-
-
2024-11-22

ExecutionGraph调度解析

本节详细解析ExecutionGraph的调度过程。

由上篇可知,在JobMaster启动时会触发ExecutionGraph的调度过程,调度入口即JobMaster.startScheduling()方法。

org.apache.flink.runtime.jobmaster.JobMaster#startScheduling

    private void startScheduling() {
        //开始调度
        schedulerNG.startScheduling();
    }

org.apache.flink.runtime.scheduler.SchedulerBase#startScheduling

    public final void startScheduling() {
        mainThreadExecutor.assertRunningInMainThread();
        //注册作业指标
        registerJobMetrics(
                jobManagerJobMetricGroup,
                executionGraph,
                this::getNumberOfRestarts,
                deploymentStateTimeMetrics,
                executionGraph::registerJobStatusListener,
                executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
                jobStatusMetricsSettings);
        //启动所有OperatorCoordinators
        operatorCoordinatorHandler.startAllOperatorCoordinators();

        //开始调度内部
        startSchedulingInternal();
    }

org.apache.flink.runtime.scheduler.DefaultScheduler#startSchedulingInternal

    protected void startSchedulingInternal() {
        log.info(
                "Starting scheduling with scheduling strategy [{}]",
                schedulingStrategy.getClass().getName());
        //切换至Running
        transitionToRunning();
        //开始调度
        schedulingStrategy.startScheduling();
    }

org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#startScheduling

    public void startScheduling() {
        final Set<SchedulingPipelinedRegion> sourceRegions =
                //返回此拓扑中的所有管道区域
                IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                        //是源区域
                        .filter(this::isSourceRegion)
                        .collect(Collectors.toSet());
        //根据条件调度区域
        maybeScheduleRegions(sourceRegions);
    }

org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#maybeScheduleRegions

    private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
        final Set<SchedulingPipelinedRegion> regionsToSchedule = new HashSet<>();
        Set<SchedulingPipelinedRegion> nextRegions = regions;
        while (!nextRegions.isEmpty()) {
            //添加可调度区域并获取下一个区域
            nextRegions = addSchedulableAndGetNextRegions(nextRegions, regionsToSchedule);
        }
        // schedule regions in topological order.
        //按拓扑顺序调度区域。
        SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                        schedulingTopology, regionsToSchedule)
                //调度Region
                .forEach(this::scheduleRegion);
    }

 

对每一个符合条件的SchedulingPipelinedRegion开始分配slot并部署,在Flink刚启动阶段还没有可用的slot,需要向资源管理框架Yarn:ResourceManager申请container来分配可用的slot资源。上述即为ExecutionGraph的调度过程,下面继续分析调度过程中slot的分配过程。

org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#scheduleRegion

    private void scheduleRegion(final SchedulingPipelinedRegion region) {
        checkState(
                areRegionVerticesAllInCreatedState(region),
                "BUG: trying to schedule a region which is not in CREATED state");
        scheduledRegions.add(region);
        //分配Slot并部署
        schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region));
    }

org.apache.flink.runtime.scheduler.DefaultScheduler#allocateSlotsAndDeploy

    public void allocateSlotsAndDeploy(final List<ExecutionVertexID> verticesToDeploy) {
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
                executionVertexVersioner.recordVertexModifications(verticesToDeploy);

        final List<Execution> executionsToDeploy =
                verticesToDeploy.stream()
                        .map(this::getCurrentExecutionOfVertex)
                        .collect(Collectors.toList());

        //分配Slot并部署
        executionDeployer.allocateSlotsAndDeploy(executionsToDeploy, requiredVersionByVertex);
    }

 

allocateSlotsFor(...)方法负责向Yarn:ResourceManager申请新的container获取slot资源。waitForAllSlotsAndDeploy(...)负责在获取slot资源后部署Task的过程,Task部署过程在下一篇中解析。下面着重分析allocateSlotsFor(...)方法。

org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#allocateSlotsAndDeploy

    public void allocateSlotsAndDeploy(
            final List<Execution> executionsToDeploy,
            final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex) {
        //验证执行状态
        validateExecutionStates(executionsToDeploy);

        //切换至Scheduled
        transitionToScheduled(executionsToDeploy);

        final Map<ExecutionAttemptID, ExecutionSlotAssignment> executionSlotAssignmentMap =
                //分配Slot
                allocateSlotsFor(executionsToDeploy);

        final List<ExecutionDeploymentHandle> deploymentHandles =
                //创建部署句柄
                createDeploymentHandles(
                        executionsToDeploy, requiredVersionByVertex, executionSlotAssignmentMap);

        //等待所有插槽并部署
        waitForAllSlotsAndDeploy(deploymentHandles);
    }

org.apache.flink.runtime.scheduler.DefaultExecutionDeployer#allocateSlotsFor

    private Map<ExecutionAttemptID, ExecutionSlotAssignment> allocateSlotsFor(
            final List<Execution> executionsToDeploy) {
        final List<ExecutionAttemptID> executionAttemptIds =
                executionsToDeploy.stream()
                        .map(Execution::getAttemptId)
                        .collect(Collectors.toList());
        //分配slot
        return executionSlotAllocator.allocateSlotsFor(executionAttemptIds);
    }

org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator#allocateSlotsFor

    public Map<ExecutionAttemptID, ExecutionSlotAssignment> allocateSlotsFor(
            List<ExecutionAttemptID> executionAttemptIds) {

		//忽略其他部分
        //为顶点分配slot
        return allocateSlotsForVertices(vertexIds).stream()
                .collect(
                        Collectors.toMap(
                                vertexAssignment ->
                                        vertexIdToExecutionId.get(
                                                vertexAssignment.getExecutionVertexId()),
                                vertexAssignment ->
                                        new ExecutionSlotAssignment(
                                                vertexIdToExecutionId.get(
                                                        vertexAssignment.getExecutionVertexId()),
                                                vertexAssignment.getLogicalSlotFuture())));
    }

org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator#allocateSlotsForVertices

    private List<SlotExecutionVertexAssignment> allocateSlotsForVertices(
            List<ExecutionVertexID> executionVertexIds) {

        SharedSlotProfileRetriever sharedSlotProfileRetriever =
                sharedSlotProfileRetrieverFactory.createFromBulk(new HashSet<>(executionVertexIds));
        Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
                executionVertexIds.stream()
                        .collect(
                                Collectors.groupingBy(
                                        slotSharingStrategy::getExecutionSlotSharingGroup));

        Map<ExecutionSlotSharingGroup, SharedSlot> slots = new HashMap<>(executionsByGroup.size());
        Set<ExecutionSlotSharingGroup> groupsToAssign = new HashSet<>(executionsByGroup.keySet());

        Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =
                //尝试分配现有的共享插槽  第一次启动时是没有的
                tryAssignExistingSharedSlots(groupsToAssign);
        slots.putAll(assignedSlots);
        groupsToAssign.removeAll(assignedSlots.keySet());

        if (!groupsToAssign.isEmpty()) {
            Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots =
                    //分配新的Slot
                    allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever);
            slots.putAll(allocatedSlots);
            groupsToAssign.removeAll(allocatedSlots.keySet());
            Preconditions.checkState(groupsToAssign.isEmpty());
        }

        //从共享槽中分配逻辑槽
        Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments =
                allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup);

        // we need to pass the slots map to the createBulk method instead of using the allocator's
        // 'sharedSlots'
        // because if any physical slots have already failed, their shared slots have been removed
        // from the allocator's 'sharedSlots' by failed logical slots.
        //我们需要将插槽映射传递给 createBulk 方法,而不是使用分配器的“sharedSlots”,
        //因为如果任何物理插槽已经失败,则它们的共享插槽已被失败的逻辑插槽从分配器的“sharedSlots”中删除。
        SharingPhysicalSlotRequestBulk bulk = createBulk(slots, executionsByGroup);
        bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, allocationTimeout);

        return executionVertexIds.stream().map(assignments::get).collect(Collectors.toList());
    }

org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator#allocateSharedSlots

    private Map<ExecutionSlotSharingGroup, SharedSlot> allocateSharedSlots(
            Set<ExecutionSlotSharingGroup> executionSlotSharingGroups,
            SharedSlotProfileRetriever sharedSlotProfileRetriever) {

		//忽略其他部分
        Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocateResult =
                //提交分配物理槽位的请求
                slotProvider.allocatePhysicalSlots(slotRequests);

		//忽略其他部分
        return allocatedSlots;
    }

org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl#allocatePhysicalSlots

    public Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
            Collection<PhysicalSlotRequest> physicalSlotRequests) {

		//忽略其他部分
        Map<SlotRequestId, Optional<PhysicalSlot>> availablePhysicalSlots =
                //尝试从可用的分配
                tryAllocateFromAvailable(physicalSlotRequestsById.values());

        return availablePhysicalSlots.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> {
		//忽略其他部分
                                                                    //请求新Slot
       requestNewSlot(slotRequestId,resourceProfile,slotProfile.getPreferredAllocations(),physicalSlotRequest
.willSlotBeOccupiedIndefinitely()));

		//忽略其他部分
}));
    }

org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl#requestNewSlot

    private CompletableFuture<PhysicalSlot> requestNewSlot(
            SlotRequestId slotRequestId,
            ResourceProfile resourceProfile,
            Collection<AllocationID> preferredAllocations,
            boolean willSlotBeOccupiedIndefinitely) {
        if (willSlotBeOccupiedIndefinitely) {
            //请求新分配的Slot
            return slotPool.requestNewAllocatedSlot(
                    slotRequestId, resourceProfile, preferredAllocations, null);
        } else {
            return slotPool.requestNewAllocatedBatchSlot(
                    slotRequestId, resourceProfile, preferredAllocations);
        }
    }

org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge#requestNewAllocatedSlot

    public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
            @Nonnull SlotRequestId slotRequestId,
            @Nonnull ResourceProfile resourceProfile,
            @Nonnull Collection<AllocationID> preferredAllocations,
            @Nullable Time timeout) {
        assertRunningInMainThread();
        final PendingRequest pendingRequest =
                //创建正常请求
                PendingRequest.createNormalRequest(
                        slotRequestId, resourceProfile, preferredAllocations);

        //内部请求新Slot
        return internalRequestNewSlot(pendingRequest, timeout);
    }

org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge#internalRequestNewSlot

    private CompletableFuture<PhysicalSlot> internalRequestNewSlot(
            PendingRequest pendingRequest, @Nullable Time timeout) {
        //内部请求新分配的Slot
        internalRequestNewAllocatedSlot(pendingRequest);

		//忽略其他部分
    }

org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge#internalRequestNewAllocatedSlot

    private void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
        pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);

        getDeclarativeSlotPool()
                //增加资源需求
                .increaseResourceRequirementsBy(
                        ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1));
    }

org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool#increaseResourceRequirementsBy

    public void increaseResourceRequirementsBy(ResourceCounter increment) {
        if (increment.isEmpty()) {
            return;
        }
        totalResourceRequirements = totalResourceRequirements.add(increment);

        //声明资源需求
        declareResourceRequirements();
    }

org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool#declareResourceRequirements

    private void declareResourceRequirements() {
        //获取资源需求
        final Collection<ResourceRequirement> resourceRequirements = getResourceRequirements();


        //org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.DeclarativeSlotPoolService
        //中定义了 declareResourceRequirements方法
        //所以会调用到DeclarativeSlotPoolService#declareResourceRequirements
        notifyNewResourceRequirements.accept(resourceRequirements);
    }

org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService#declareResourceRequirements

    private void declareResourceRequirements(Collection<ResourceRequirement> resourceRequirements) {
        assertHasBeenStarted();

        //声明资源需求
        resourceRequirementServiceConnectionManager.declareResourceRequirements(
                ResourceRequirements.create(jobId, jobManagerAddress, resourceRequirements));
    }

org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclareResourceRequirementServiceConnectionManager#declareResourceRequirements

    public void declareResourceRequirements(ResourceRequirements resourceRequirements) {
        synchronized (lock) {
            checkNotClosed();
            if (isConnected()) {
                currentResourceRequirements = resourceRequirements;

                //触发资源需求提交
                triggerResourceRequirementsSubmission(
                        Duration.ofMillis(1L),
                        Duration.ofMillis(10000L),
                        currentResourceRequirements);
            }
        }
    }

org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclareResourceRequirementServiceConnectionManager#triggerResourceRequirementsSubmission

    private void triggerResourceRequirementsSubmission(
            Duration sleepOnError,
            Duration maxSleepOnError,
            ResourceRequirements resourceRequirementsToSend) {

        FutureUtils.retryWithDelay(
                //发送资源需求
                () -> sendResourceRequirements(resourceRequirementsToSend),
                new ExponentialBackoffRetryStrategy(
                        Integer.MAX_VALUE, sleepOnError, maxSleepOnError),
                throwable -> !(throwable instanceof CancellationException),
                scheduledExecutor);
    }

org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclareResourceRequirementServiceConnectionManager#sendResourceRequirements

    private CompletableFuture<Acknowledge> sendResourceRequirements(
            ResourceRequirements resourceRequirementsToSend) {
        synchronized (lock) {
                    //声明资源需求
                    return service.declareResourceRequirements(resourceRequirementsToSend);
		//忽略其他部分
        }
    }

org.apache.flink.runtime.resourcemanager.ResourceManager#declareRequiredResources

    public CompletableFuture<Acknowledge> declareRequiredResources(
            JobMasterId jobMasterId, ResourceRequirements resourceRequirements, Time timeout) {
		//忽略其他部分
        //流程资源需求
        slotManager.processResourceRequirements(resourceRequirements);
		//忽略其他部分
    }

org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#processResourceRequirements

    public void processResourceRequirements(ResourceRequirements resourceRequirements) {
		//忽略其他部分

        //通知资源需求
        resourceTracker.notifyResourceRequirements(
                resourceRequirements.getJobId(), resourceRequirements.getResourceRequirements());
        //延迟检查资源需求
        checkResourceRequirementsWithDelay();
    }

org.apache.flink.runtime.resourcemanager.slotmanager.DefaultResourceTracker#notifyResourceRequirements

    public void notifyResourceRequirements(
            JobID jobId, Collection<ResourceRequirement> resourceRequirements) {
		//忽略其他部分
        //通知资源需求
        getOrCreateTracker(jobId).notifyResourceRequirements(resourceRequirements);

		//忽略其他部分
    }

org.apache.flink.runtime.resourcemanager.slotmanager.JobScopedResourceTracker#notifyResourceRequirements

    public void notifyResourceRequirements(
            Collection<ResourceRequirement> newResourceRequirements) {
		//忽略其他部分
        //找到多余的槽位
        findExcessSlots();
        //尝试分配多余的插槽
        tryAssigningExcessSlots();
    }

 

真正的分配动作在checkResourceRequirementsWithDelay();实现

org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#checkResourceRequirementsWithDelay

    //根据ResourceAllocationStrategy的实现,检查资源需求并可能进行重新分配可能会很繁重。
    // 为了每次检查覆盖更多的变化,从而减少不必要的重新分配的频率,检查会稍微延迟执行。
    private void checkResourceRequirementsWithDelay() {
        if (requirementsCheckDelay.toMillis() <= 0) {
            checkResourceRequirements();
        } else {
            if (requirementsCheckFuture == null || requirementsCheckFuture.isDone()) {
                requirementsCheckFuture = new CompletableFuture<>();
                scheduledExecutor.schedule(
                        () ->
                                mainThreadExecutor.execute(
                                        () -> {
                                            //检查资源需求
                                            checkResourceRequirements();
                                            Preconditions.checkNotNull(requirementsCheckFuture)
                                                    .complete(null);
                                        }),
                        requirementsCheckDelay.toMillis(),
                        TimeUnit.MILLISECONDS);
            }
        }
    }

org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#checkResourceRequirements

    private void checkResourceRequirements() {
        if (!started) {
            return;
        }
        Map<JobID, Collection<ResourceRequirement>> missingResources =
                resourceTracker.getMissingResources();
        if (missingResources.isEmpty()) {
            if (resourceAllocator.isSupported()
                    && !taskManagerTracker.getPendingTaskManagers().isEmpty()) {
                taskManagerTracker.replaceAllPendingAllocations(Collections.emptyMap());
                //检查资源是否需要协调
                checkResourcesNeedReconcile();
                //延迟声明所需资源
                declareNeededResourcesWithDelay();
            }
            return;
        }

        logMissingAndAvailableResource(missingResources);

        missingResources =
                missingResources.entrySet().stream()
                        .collect(
                                Collectors.toMap(
                                        Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));

        final ResourceAllocationResult result =
                //尝试满足要求
                resourceAllocationStrategy.tryFulfillRequirements(
                        missingResources, taskManagerTracker, this::isBlockedTaskManager);

        // Allocate slots according to the result
        //根据结果分配slots
        allocateSlotsAccordingTo(result.getAllocationsOnRegisteredResources());

        final Set<PendingTaskManagerId> failAllocations;
        if (resourceAllocator.isSupported()) {
            // Allocate task managers according to the result
            failAllocations =
                    allocateTaskManagersAccordingTo(result.getPendingTaskManagersToAllocate());

            // Record slot allocation of pending task managers
            final Map<PendingTaskManagerId, Map<JobID, ResourceCounter>>
                    pendingResourceAllocationResult =
                            new HashMap<>(result.getAllocationsOnPendingResources());
            pendingResourceAllocationResult.keySet().removeAll(failAllocations);
            taskManagerTracker.replaceAllPendingAllocations(pendingResourceAllocationResult);
        } else {
            failAllocations =
                    result.getPendingTaskManagersToAllocate().stream()
                            .map(PendingTaskManager::getPendingTaskManagerId)
                            .collect(Collectors.toSet());
        }

        unfulfillableJobs.clear();
        unfulfillableJobs.addAll(result.getUnfulfillableJobs());
        for (PendingTaskManagerId pendingTaskManagerId : failAllocations) {
            unfulfillableJobs.addAll(
                    result.getAllocationsOnPendingResources().get(pendingTaskManagerId).keySet());
        }
        // Notify jobs that can not be fulfilled
        if (sendNotEnoughResourceNotifications) {
            for (JobID jobId : unfulfillableJobs) {
                LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
                resourceEventListener.notEnoughResourceAvailable(
                        jobId, resourceTracker.getAcquiredResources(jobId));
            }
        }

        if (resourceAllocator.isSupported()) {
            checkResourcesNeedReconcile();
            //延迟声明所需资源
            declareNeededResourcesWithDelay();
        }
    }

org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#declareNeededResourcesWithDelay

    private void declareNeededResourcesWithDelay() {
        Preconditions.checkState(resourceAllocator.isSupported());

        if (declareNeededResourceDelay.toMillis() <= 0) {
            //声明所需资源
            declareNeededResources();
        } else {
            if (declareNeededResourceFuture == null || declareNeededResourceFuture.isDone()) {
                declareNeededResourceFuture = new CompletableFuture<>();
                scheduledExecutor.schedule(
                        () ->
                                mainThreadExecutor.execute(
                                        () -> {
                                            //声明所需资源
                                            declareNeededResources();
                                            Preconditions.checkNotNull(declareNeededResourceFuture)
                                                    .complete(null);
                                        }),
                        declareNeededResourceDelay.toMillis(),
                        TimeUnit.MILLISECONDS);
            }
        }
    }

org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#declareNeededResources

    private void declareNeededResources() {
		//忽略其他部分

        //声明所需资源
        resourceAllocator.declareResourceNeeded(resourceDeclarations);
    }

org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.ResourceAllocatorImpl#declareResourceNeeded

        public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
            validateRunsInMainThread();
            //声明所需资源
            ActiveResourceManager.this.declareResourceNeeded(resourceDeclarations);
        }

org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager#declareResourceNeeded

    public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
        this.resourceDeclarations = Collections.unmodifiableCollection(resourceDeclarations);
        log.debug("Update resource declarations to {}.", resourceDeclarations);

        //检查资源声明
        checkResourceDeclarations();
    }

org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager#checkResourceDeclarations

    private void checkResourceDeclarations() {
		//忽略其他部分
       
                    for (int i = 0; i < requestWorkerNumber; i++) {
                        //请求新的 workers
                        requestNewWorker(workerResourceSpec);
                    }

		//忽略其他部分
        }
    }

org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager#requestNewWorker

    public void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
		//忽略其他部分
        final CompletableFuture<WorkerType> requestResourceFuture =
                //请求资源
                resourceManagerDriver.requestResource(taskExecutorProcessSpec);
    }

org.apache.flink.yarn.YarnResourceManagerDriver#requestResource

    public CompletableFuture<YarnWorkerNode> requestResource(
            TaskExecutorProcessSpec taskExecutorProcessSpec) {
		//忽略其他部分
		
            //添加容器请求
            addContainerRequest(resource, priority);    
    }

org.apache.flink.yarn.YarnResourceManagerDriver#addContainerRequest

    private void addContainerRequest(Resource resource, Priority priority) {
        // update blocklist
        //更新黑名单
        tryUpdateApplicationBlockList();

        AMRMClient.ContainerRequest containerRequest =
                ContainerRequestReflector.INSTANCE.getContainerRequest(
                        resource, priority, taskManagerNodeLabel);
        
        //通过客户端添加容器请求
        //分配好container资源后,会回调
        //org.apache.flink.yarn.YarnResourceManagerDriver.YarnContainerEventHandler.onContainersAllocated
        resourceManagerClient.addContainerRequest(containerRequest);
    }

 

org.apache.flink.yarn.YarnResourceManagerDriver.YarnContainerEventHandler#onContainersAllocated

        //分配container的回调函数
        @Override
        public void onContainersAllocated(List<Container> containers) {
            runAsyncWithFatalHandler(
                    () -> {
                        checkInitialized();
                        log.info("Received {} containers.", containers.size());

                        for (Map.Entry<Priority, List<Container>> entry :
                                groupContainerByPriority(containers).entrySet()) {
                            //在优先分配的容器上
                            onContainersOfPriorityAllocated(entry.getKey(), entry.getValue());
                        }

                        // if we are waiting for no further containers, we can go to the
                        // regular heartbeat interval
                        if (getNumRequestedNotAllocatedWorkers() <= 0) {
                            resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
                        }
                    });
        }

org.apache.flink.yarn.YarnResourceManagerDriver#onContainersOfPriorityAllocated

    private void onContainersOfPriorityAllocated(Priority priority, List<Container> containers) {
		//忽略其他部分
            //在容器中异步启动任务执行器
            startTaskExecutorInContainerAsync(container, taskExecutorProcessSpec, resourceId);
            removeContainerRequest(pendingRequest);


        }

        int numExcess = 0;
        while (containerIterator.hasNext()) {
            //返回多余的容器
            returnExcessContainer(containerIterator.next());
            numExcess++;
        }

		//忽略其他部分
    }

org.apache.flink.yarn.YarnResourceManagerDriver#startTaskExecutorInContainerAsync

    private void startTaskExecutorInContainerAsync(
            Container container,
            TaskExecutorProcessSpec taskExecutorProcessSpec,
            ResourceID resourceId) {
        final CompletableFuture<ContainerLaunchContext> containerLaunchContextFuture =
                FutureUtils.supplyAsync(
                        () ->
                                //创建 TaskManager 的启动上下文
                                createTaskExecutorLaunchContext(
                                        resourceId,
                                        container.getNodeId().getHost(),
                                        taskExecutorProcessSpec),
                        getIoExecutor());

        FutureUtils.assertNoException(
                containerLaunchContextFuture.handleAsync(
                        (context, exception) -> {
                            if (exception == null) {
                                //异步启动容器
                                nodeManagerClient.startContainerAsync(container, context);
                            } else {
                                getResourceEventHandler()
                                        .onWorkerTerminated(resourceId, exception.getMessage());
                            }
                            return null;
                        },
                        getMainThreadExecutor()));
    }

 

调用Utils.createTaskExecutorContext(...)方法时,传入TaskManager的实现类信息作为TaskManager的启动入口。即yarn container容器启动时会执行YarnTaskExecutorRunner类的main(...)方法。

org.apache.flink.yarn.YarnResourceManagerDriver#createTaskExecutorLaunchContext

    private ContainerLaunchContext createTaskExecutorLaunchContext(
            ResourceID containerId, String host, TaskExecutorProcessSpec taskExecutorProcessSpec)
            throws Exception {
		//忽略其他部分
                    final ContainerLaunchContext taskExecutorLaunchContext =
                Utils.createTaskExecutorContext(
                        flinkConfig,
                        yarnConfig,
                        configuration,
                        taskManagerParameters,
                        taskManagerDynamicProperties,
                        currDir,
                        ////入口类
                        YarnTaskExecutorRunner.class,
                        log);

        taskExecutorLaunchContext.getEnvironment().put(ENV_FLINK_NODE_ID, host);
        return taskExecutorLaunchContext;
    }

在创建完ContainerLaunchContext实例后,随即调用nodeManagerClient.startContainerAsync(container, context);方法,开始TaskManager的启动工作。

以上即为ExecutionGraph的调度过程及初次请求slot资源时,TaskManager的配置启动信息,下面继续分析TaskManager的启动过程。

 

TaskManager启动过程解析

上节最后分析到ContainerLaunchContext实例生成过程,同时也看到配置好了YarnTaskExecutorRunner启动过程,它就是一个普通的java类,包含main(...)方法,通过java命令启动。本节分析下YarnTaskExecutorRunner的启动过程。

通过java命令触发main(...)方法执行,进入runTaskManagerSecurely(...)方法中,加载配置文件生成配置项,执行后续启动操作。

org.apache.flink.yarn.YarnTaskExecutorRunner#main

    public static void main(String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        //YARN 任务执行器的实例入口点
        runTaskManagerSecurely(args);
    }

org.apache.flink.yarn.YarnTaskExecutorRunner#runTaskManagerSecurely

    private static void runTaskManagerSecurely(String[] args) {
        Configuration configuration = null;

        try {
            LOG.debug("All environment variables: {}", ENV);

            final String currDir = ENV.get(Environment.PWD.key());
            LOG.info("Current working Directory: {}", currDir);

            configuration = TaskManagerRunner.loadConfiguration(args);
            //设置和修改配置
            setupAndModifyConfiguration(configuration, currDir, ENV);
        } catch (Throwable t) {
            LOG.error("YARN TaskManager initialization failed.", t);
            System.exit(INIT_ERROR_EXIT_CODE);
        }

        //安全地运行TaskManager进程
        TaskManagerRunner.runTaskManagerProcessSecurely(Preconditions.checkNotNull(configuration));
    }

org.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManagerProcessSecurely(org.apache.flink.configuration.Configuration)

    public static void runTaskManagerProcessSecurely(Configuration configuration) {
		//忽略其他部分nstall(new SecurityConfiguration(configuration));

            exitCode =
                    SecurityUtils.getInstalledContext()
                            //运行TaskManager
                            .runSecured(() -> runTaskManager(configuration, pluginManager));
		//忽略其他部分

        System.exit(exitCode);
    }

 

生成TaskManagerRunner实例并启动。

org.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManager

    public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
            throws Exception {
		//忽略其他部分
            taskManagerRunner =
                    new TaskManagerRunner(
                            configuration,
                            pluginManager,
                            //创建任务执行器服务
                            TaskManagerRunner::createTaskExecutorService);
            //启动
            taskManagerRunner.start();
		//忽略其他部分
    }

 

在生成TaskManagerRunner实例时,构造函数第三个参数是一个TaskExecutorService类型的实例,具体生成过程如下:

org.apache.flink.runtime.taskexecutor.TaskManagerRunner#createTaskExecutorService

    public static TaskExecutorService createTaskExecutorService(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            WorkingDirectory workingDirectory,
            FatalErrorHandler fatalErrorHandler,
            DelegationTokenReceiverRepository delegationTokenReceiverRepository)
            throws Exception {

        final TaskExecutor taskExecutor =
                //启动TaskManager
                startTaskManager(
                        configuration,
                        resourceID,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        localCommunicationOnly,
                        externalResourceInfoProvider,
                        workingDirectory,
                        fatalErrorHandler,
                        delegationTokenReceiverRepository);

        return TaskExecutorToServiceAdapter.createFor(taskExecutor);
    }

org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager

    public static TaskExecutor startTaskManager(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            TaskExecutorBlobService taskExecutorBlobService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            WorkingDirectory workingDirectory,
            FatalErrorHandler fatalErrorHandler,
            DelegationTokenReceiverRepository delegationTokenReceiverRepository)
            throws Exception {

		//忽略其他部分


        //构建函数中创建了JobManager心跳管理器和ResourceManager心跳管理器
        return new TaskExecutor(
                rpcService,
                taskManagerConfiguration,
                highAvailabilityServices,
                taskManagerServices,
                externalResourceInfoProvider,
                heartbeatServices,
                taskManagerMetricGroup.f0,
                metricQueryServiceAddress,
                taskExecutorBlobService,
                fatalErrorHandler,
                new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
                delegationTokenReceiverRepository);
    }

 

TaskManagerRunner实例生成后紧接着调动其start()方法,其中startTaskManagerRunnerServices()方法主要负责构建TaskExecutorService实例,构造完成后调用其start()方法。

org.apache.flink.runtime.taskexecutor.TaskManagerRunner#start

    public void start() throws Exception {
        synchronized (lock) {
            //启动TaskManagerRunnerServices
            startTaskManagerRunnerServices();

            //会回调到org.apache.flink.runtime.taskexecutor.TaskExecutor.onStart
            taskExecutorService.start();
        }
    }

org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManagerRunnerServices

    private void startTaskManagerRunnerServices() throws Exception {
        synchronized (lock) {
            rpcSystem = RpcSystem.load(configuration);

            this.executor =
                    Executors.newScheduledThreadPool(
                            Hardware.getNumberCPUCores(),
                            new ExecutorThreadFactory("taskmanager-future"));

            highAvailabilityServices =
                    //创建高可用性服务
                    HighAvailabilityServicesUtils.createHighAvailabilityServices(
                            configuration,
                            executor,
                            AddressResolution.NO_ADDRESS_RESOLUTION,
                            rpcSystem,
                            this);

            JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));

            //创建rpc服务
            rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);

            this.resourceId =
                    //获取任务管理器资源ID
                    getTaskManagerResourceID(
                            configuration, rpcService.getAddress(), rpcService.getPort());

            //创建任务管理器工作目录
            this.workingDirectory =
                    ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
                            configuration, resourceId);

            LOG.info("Using working directory: {}", workingDirectory);

            //从配置中获取心跳服务
            HeartbeatServices heartbeatServices =
                    HeartbeatServices.fromConfiguration(configuration);

            metricRegistry =
                    new MetricRegistryImpl(
                            MetricRegistryConfiguration.fromConfiguration(
                                    configuration,
                                    rpcSystem.getMaximumMessageSizeInBytes(configuration)),
                            ReporterSetup.fromConfiguration(configuration, pluginManager),
                            TraceReporterSetup.fromConfiguration(configuration, pluginManager));

            final RpcService metricQueryServiceRpcService =
                    MetricUtils.startRemoteMetricsRpcService(
                            configuration,
                            rpcService.getAddress(),
                            configuration.get(TaskManagerOptions.BIND_HOST),
                            rpcSystem);
            metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());

            blobCacheService =
                    BlobUtils.createBlobCacheService(
                            configuration,
                            Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                            highAvailabilityServices.createBlobStore(),
                            null);

            final ExternalResourceInfoProvider externalResourceInfoProvider =
                    ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
                            configuration, pluginManager);

            final DelegationTokenReceiverRepository delegationTokenReceiverRepository =
                    new DelegationTokenReceiverRepository(configuration, pluginManager);

            taskExecutorService =
                    //创建任务执行器
                    taskExecutorServiceFactory.createTaskExecutor(
                            this.configuration,
                            this.resourceId.unwrap(),
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            metricRegistry,
                            blobCacheService,
                            false,
                            externalResourceInfoProvider,
                            workingDirectory.unwrap(),
                            this,
                            delegationTokenReceiverRepository);

            handleUnexpectedTaskExecutorServiceTermination();

            MemoryLogger.startIfConfigured(
                    LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
        }
    }

 

taskExecutorService.start()方法最后执行的是TaskExecutor.start()方法。TaskExecutor继承于RpcEndpoint,所以当调用RpcEndpoint的start()方法时,会回调TaskExecutor的onStart()方法。

org.apache.flink.runtime.taskexecutor.TaskExecutor#onStart

     // 回调方法
    @Override
    public void onStart() throws Exception {
        try {
            startTaskExecutorServices();
        } catch (Throwable t) {
            final TaskManagerException exception =
                    new TaskManagerException(
                            String.format("Could not start the TaskExecutor %s", getAddress()), t);
            onFatalError(exception);
            throw exception;
        }

        startRegistrationTimeout();
    }

org.apache.flink.runtime.taskexecutor.TaskExecutor#startTaskExecutorServices

    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            //首先连接到 ResourceManager
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

            // tell the task slot table who's responsible for the task slot actions
            //告诉slot表谁负责slot操作
            taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

            // start the job leader service
            //启动作业领导者服务
            jobLeaderService.start(
                    getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache =
                    new FileCache(
                            taskManagerConfiguration.getTmpDirectories(),
                            taskExecutorBlobService.getPermanentBlobService());

            //尝试加载本地分配快照
            tryLoadLocalAllocationSnapshots();
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }

以上即为TaskManager启动过程解析。

 

小结

本文简单梳理了ExecutionGraph调度解析和TaskManager启动过程解析

 

参考资料:

Flink源码解析(十四)——Flink On Yarn ExecutionGraph调度及TaskManager启动过程解析


目录