Flink V1.20 Source Code Reading Notes (5.3): Flink on YARN JobManager Startup, ResourceManager Startup Analysis

2024-10-31

ResourceManager Startup During JobManager Startup

The class declaration shows that ResourceManagerServiceImpl implements LeaderContender, so it is a candidate in leader election.

public class ResourceManagerServiceImpl implements ResourceManagerService, LeaderContender {}
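To make the contender role concrete, here is a minimal sketch of the callback pattern, not Flink's actual classes. The names Contender, ToyElection, and ToyService are hypothetical stand-ins, and the toy election simply elects whichever contender is registered.

```java
import java.util.UUID;

final class LeaderContenderSketch {

    /** Hypothetical, simplified stand-in for a leader-election callback interface. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);

        void revokeLeadership();
    }

    /** Toy election that always elects the single registered contender. */
    static final class ToyElection {
        private Contender contender;

        void register(Contender newContender) {
            this.contender = newContender;
        }

        /** Generates a fresh session ID and hands leadership to the contender. */
        UUID electRegisteredContender() {
            UUID sessionId = UUID.randomUUID();
            contender.grantLeadership(sessionId);
            return sessionId;
        }

        void revoke() {
            contender.revokeLeadership();
        }
    }

    /** A service that only remembers whether it currently holds leadership. */
    static final class ToyService implements Contender {
        private UUID leaderSessionId; // null while this service is not the leader

        @Override
        public void grantLeadership(UUID newLeaderSessionId) {
            leaderSessionId = newLeaderSessionId;
        }

        @Override
        public void revokeLeadership() {
            leaderSessionId = null;
        }

        boolean isLeader() {
            return leaderSessionId != null;
        }

        UUID leaderSessionId() {
            return leaderSessionId;
        }
    }

    public static void main(String[] args) {
        ToyElection election = new ToyElection();
        ToyService service = new ToyService();
        election.register(service);
        UUID sessionId = election.electRegisteredContender();
        System.out.println("leader=" + service.isLeader() + ", session=" + sessionId);
        election.revoke();
        System.out.println("leader=" + service.isLeader());
    }
}
```

The point of the pattern is that the service never decides on its own that it is the leader; it only reacts to the grant and revoke callbacks.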

 

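Once resourceManagerService has been created, its start() method is called to kick off leader election. The election works the same way as the WebMonitorEndpoint leader election described in the previous post.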

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

            resourceManagerService =
                    ResourceManagerServiceImpl.create(
                            resourceManagerFactory,
                            configuration,
                            resourceId,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            delegationTokenManager,
                            fatalErrorHandler,
                            new ClusterInformation(hostname, blobServer.getPort()),
                            webMonitorEndpoint.getRestBaseUrl(),
                            metricRegistry,
                            hostname,
                            ioExecutor);
            // Start the ResourceManagerService
            resourceManagerService.start();

 

org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl#grantLeadership

    // Called back once this contender wins the election
    @Override
    public void grantLeadership(UUID newLeaderSessionID) {
        handleLeaderEventExecutor.execute(
                () -> {
                    synchronized (lock) {
                        if (!running) {
                            LOG.info(
                                    "Resource manager service is not running. Ignore granting leadership with session ID {}.",
                                    newLeaderSessionID);
                            return;
                        }

                        LOG.info(
                                "Resource manager service is granted leadership with session id {}.",
                                newLeaderSessionID);

                        try {
                            // Start a new leader ResourceManager
                            startNewLeaderResourceManager(newLeaderSessionID);
                        } catch (Throwable t) {
                            fatalErrorHandler.onFatalError(
                                    new FlinkException("Cannot start resource manager.", t));
                        }
                    }
                });
    }

So once the election completes, the callback ends up in ResourceManagerServiceImpl.startNewLeaderResourceManager(...).
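The grantLeadership code above handles the event on a dedicated executor, takes a lock, and ignores the event if the service is no longer running. A minimal sketch of that pattern follows. The class LeaderEventHandlingSketch and its methods stop(), drain(), and currentSessionId() are hypothetical, and the real method starts a ResourceManager rather than just storing the session ID.

```java
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LeaderEventHandlingSketch {
    private final Object lock = new Object();
    // A single thread, so leadership events are processed one at a time, in order.
    private final ExecutorService handleLeaderEventExecutor = Executors.newSingleThreadExecutor();
    private boolean running = true; // guarded by lock
    private UUID leaderSessionId;   // guarded by lock

    void grantLeadership(UUID newLeaderSessionId) {
        handleLeaderEventExecutor.execute(
                () -> {
                    synchronized (lock) {
                        if (!running) {
                            return; // service already stopped: ignore the grant
                        }
                        leaderSessionId = newLeaderSessionId;
                    }
                });
    }

    void stop() {
        synchronized (lock) {
            running = false;
        }
    }

    UUID currentSessionId() {
        synchronized (lock) {
            return leaderSessionId;
        }
    }

    /** Shuts the executor down and waits until all queued events have been handled. */
    boolean drain() {
        handleLeaderEventExecutor.shutdown();
        try {
            return handleLeaderEventExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LeaderEventHandlingSketch service = new LeaderEventHandlingSketch();
        service.grantLeadership(UUID.randomUUID());
        service.drain();
        System.out.println("session id set: " + (service.currentSessionId() != null));
    }
}
```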

 

In the YARN deployment, resourceManagerFactory.createResourceManager(...) ultimately creates an ActiveResourceManager instance.

org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl#startNewLeaderResourceManager

    // Start a new leader ResourceManager
    @GuardedBy("lock")
    private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Exception {
        // Stop the current leader ResourceManager, if there is one
        stopLeaderResourceManager();

        this.leaderSessionID = newLeaderSessionID;
        // Create the ResourceManager; this also creates the SlotManager and the JobLeaderIdService
        this.leaderResourceManager =
                resourceManagerFactory.createResourceManager(rmProcessContext, newLeaderSessionID);

        final ResourceManager<?> newLeaderResourceManager = this.leaderResourceManager;

        previousResourceManagerTerminationFuture
                .thenComposeAsync(
                        (ignore) -> {
                            synchronized (lock) {
                                // Start the ResourceManager if it is still the leader
                                return startResourceManagerIfIsLeader(newLeaderResourceManager);
                            }
                        },
                        handleLeaderEventExecutor)
                .thenAcceptAsync(
                        (isStillLeader) -> {
                            if (isStillLeader) {
                                // Confirm leadership and publish the leader information
                                leaderElection.confirmLeadership(
                                        newLeaderSessionID, newLeaderResourceManager.getAddress());
                            }
                        },
                        ioExecutor);
    }
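The future chain in startNewLeaderResourceManager can be read as: wait for the previous ResourceManager to terminate, start the new one if it is still the leader, then confirm leadership only when the start reported true. The sketch below reproduces that control flow with plain CompletableFuture. The method startAndConfirm and its parameters are hypothetical, and it returns the confirmed address instead of calling a real leader election service.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

final class StartThenConfirmSketch {

    /**
     * Completes with the address to confirm if the start step reports that we are still
     * leader, and with an empty Optional otherwise.
     */
    static CompletableFuture<Optional<String>> startAndConfirm(
            CompletableFuture<Void> previousTermination,
            Supplier<CompletableFuture<Boolean>> startIfIsLeader,
            String address,
            Executor executor) {
        return previousTermination
                // Step 1: runs only after the previous instance has terminated.
                .thenComposeAsync(ignore -> startIfIsLeader.get(), executor)
                // Step 2: "confirm" only if the start step says we are still leader.
                .thenApplyAsync(
                        isStillLeader ->
                                isStillLeader
                                        ? Optional.of(address)
                                        : Optional.<String>empty(),
                        executor);
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs each stage in the calling thread
        CompletableFuture<Void> terminated = CompletableFuture.completedFuture(null);

        Optional<String> confirmed =
                startAndConfirm(
                                terminated,
                                () -> CompletableFuture.completedFuture(true),
                                "rm-address",
                                direct)
                        .join();
        System.out.println(confirmed);
    }
}
```

If leadership is lost between the grant and the start, the second stage sees false and nothing is confirmed, which is the case the isStillLeader check in the original code guards against.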

 

org.apache.flink.runtime.resourcemanager.ResourceManagerFactory#createResourceManager(org.apache.flink.runtime.resourcemanager.ResourceManagerProcessContext, java.util.UUID)

    public ResourceManager<T> createResourceManager(
            ResourceManagerProcessContext context, UUID leaderSessionId) throws Exception {

        final ResourceManagerRuntimeServices resourceManagerRuntimeServices =
                // Create the ResourceManager runtime services: the SlotManager and the JobLeaderIdService
                createResourceManagerRuntimeServices(
                        context.getRmRuntimeServicesConfig(),
                        context.getRpcService(),
                        context.getHighAvailabilityServices(),
                        SlotManagerMetricGroup.create(
                                context.getMetricRegistry(), context.getHostname()));

        // Create the ResourceManager
        return createResourceManager(
                context.getRmConfig(),
                context.getResourceId(),
                context.getRpcService(),
                leaderSessionId,
                context.getHeartbeatServices(),
                context.getDelegationTokenManager(),
                context.getFatalErrorHandler(),
                context.getClusterInformation(),
                context.getWebInterfaceUrl(),
                ResourceManagerMetricGroup.create(
                        context.getMetricRegistry(), context.getHostname()),
                resourceManagerRuntimeServices,
                context.getIoExecutor());
    }

 

org.apache.flink.runtime.resourcemanager.ResourceManagerFactory#createResourceManagerRuntimeServices

    private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
            ResourceManagerRuntimeServicesConfiguration rmRuntimeServicesConfig,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            SlotManagerMetricGroup slotManagerMetricGroup) {

        return ResourceManagerRuntimeServices.fromConfiguration(
                rmRuntimeServicesConfig,
                highAvailabilityServices,
                rpcService.getScheduledExecutor(),
                slotManagerMetricGroup);
    }

 

org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices#fromConfiguration

    public static ResourceManagerRuntimeServices fromConfiguration(
            ResourceManagerRuntimeServicesConfiguration configuration,
            HighAvailabilityServices highAvailabilityServices,
            ScheduledExecutor scheduledExecutor,
            SlotManagerMetricGroup slotManagerMetricGroup) {

        // Create the SlotManager
        final SlotManager slotManager =
                createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup);

        // Create the JobLeaderIdService
        final JobLeaderIdService jobLeaderIdService =
                new DefaultJobLeaderIdService(
                        highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout());

        return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);
    }

 

org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerFactory#createResourceManager

    @Override
    public ResourceManager<WorkerType> createResourceManager(
            Configuration configuration,
            ResourceID resourceId,
            RpcService rpcService,
            UUID leaderSessionId,
            HeartbeatServices heartbeatServices,
            DelegationTokenManager delegationTokenManager,
            FatalErrorHandler fatalErrorHandler,
            ClusterInformation clusterInformation,
            @Nullable String webInterfaceUrl,
            ResourceManagerMetricGroup resourceManagerMetricGroup,
            ResourceManagerRuntimeServices resourceManagerRuntimeServices,
            Executor ioExecutor)
            throws Exception {

        final ThresholdMeter failureRater = createStartWorkerFailureRater(configuration);
        final Duration retryInterval =
                configuration.get(ResourceManagerOptions.START_WORKER_RETRY_INTERVAL);
        final Duration workerRegistrationTimeout =
                configuration.get(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT);
        final Duration previousWorkerRecoverTimeout =
                configuration.get(
                        ResourceManagerOptions.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT);

        // The active ResourceManager
        return new ActiveResourceManager<>(
                // Creates the YarnResourceManagerDriver
                createResourceManagerDriver(
                        configuration, webInterfaceUrl, rpcService.getAddress()),
                configuration,
                rpcService,
                leaderSessionId,
                resourceId,
                heartbeatServices,
                delegationTokenManager,
                resourceManagerRuntimeServices.getSlotManager(),
                ResourceManagerPartitionTrackerImpl::new,
                BlocklistUtils.loadBlocklistHandlerFactory(configuration),
                resourceManagerRuntimeServices.getJobLeaderIdService(),
                clusterInformation,
                fatalErrorHandler,
                resourceManagerMetricGroup,
                failureRater,
                retryInterval,
                workerRegistrationTimeout,
                previousWorkerRecoverTimeout,
                ioExecutor);
    }

 

startResourceManagerIfIsLeader(...) ends up calling resourceManager.start(), which starts the set of basic components the ResourceManager needs. These are covered in detail below.

org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl#startResourceManagerIfIsLeader

    /**
     * Returns a future that completes as {@code true} if the resource manager is still leader and
     * started, and {@code false} if it's no longer leader.
     */
    @GuardedBy("lock")
    private CompletableFuture<Boolean> startResourceManagerIfIsLeader(
            ResourceManager<?> resourceManager) {
        if (isLeader(resourceManager)) {
            // start() eventually calls back into org.apache.flink.runtime.resourcemanager.ResourceManager#onStart
            resourceManager.start();
            forwardTerminationFuture(resourceManager);
            return resourceManager.getStartedFuture().thenApply(ignore -> true);
        } else {
            return CompletableFuture.completedFuture(false);
        }
    }

 

org.apache.flink.runtime.resourcemanager.ResourceManager#onStart

    @Override
    public final void onStart() throws Exception {
        try {
            log.info("Starting the resource manager.");
            // Start the ResourceManager's services
            startResourceManagerServices();
            startedFuture.complete(null);
        } catch (Throwable t) {
            final ResourceManagerException exception =
                    new ResourceManagerException(
                            String.format("Could not start the ResourceManager %s", getAddress()),
                            t);
            onFatalError(exception);
            throw exception;
        }
    }
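The relationship between start(), onStart(), and startedFuture can be sketched as follows. This is a simplified model under stated assumptions: ToyEndpoint and ToyResourceManager are hypothetical, and start() here invokes onStart() directly, whereas the real RPC endpoint dispatches the callback to its main thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class EndpointLifecycleSketch {

    /** Hypothetical endpoint base class: start() triggers the onStart() hook. */
    abstract static class ToyEndpoint {
        final void start() {
            onStart(); // simplification: called inline, not via an RPC main thread
        }

        protected abstract void onStart();
    }

    /** Starts its services in onStart() and then completes startedFuture. */
    static final class ToyResourceManager extends ToyEndpoint {
        private final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
        private final List<String> startedServices = new ArrayList<>();

        @Override
        protected void onStart() {
            startedServices.add("jobLeaderIdService");
            startedServices.add("heartbeatServices");
            startedServices.add("slotManager");
            startedFuture.complete(null);
        }

        CompletableFuture<Void> getStartedFuture() {
            return startedFuture;
        }

        List<String> startedServices() {
            return startedServices;
        }
    }

    /** Mirrors the shape of the leader branch: start, then map "started" to true. */
    static CompletableFuture<Boolean> startAndReport(ToyResourceManager resourceManager) {
        resourceManager.start();
        return resourceManager.getStartedFuture().thenApply(ignore -> true);
    }

    public static void main(String[] args) {
        ToyResourceManager resourceManager = new ToyResourceManager();
        boolean started = startAndReport(resourceManager).join();
        System.out.println(started + " " + resourceManager.startedServices());
    }
}
```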

 

org.apache.flink.runtime.resourcemanager.ResourceManager#startResourceManagerServices

    private void startResourceManagerServices() throws Exception {
        try {
            jobLeaderIdService.start(new JobLeaderIdActionsImpl());

            // Register metrics
            registerMetrics();

            // Start the heartbeat services: two heartbeat managers, one for TaskManagers and one for JobManagers
            startHeartbeatServices();

            // Start the SlotManager, which also schedules a periodic task that checks for and requests the required resources
            slotManager.start(
                    getFencingToken(),
                    getMainThreadExecutor(),
                    resourceAllocator,
                    new ResourceEventListenerImpl(),
                    blocklistHandler::isBlockedTaskManager);

            delegationTokenManager.start(this);

            initialize();
        } catch (Exception e) {
            handleStartResourceManagerServicesException(e);
        }
    }

 

org.apache.flink.runtime.resourcemanager.ResourceManager#startHeartbeatServices

    private void startHeartbeatServices() {
        taskManagerHeartbeatManager =
                heartbeatServices.createHeartbeatManagerSender(
                        resourceId,
                        new TaskManagerHeartbeatListener(),
                        getMainThreadExecutor(),
                        log);

        jobManagerHeartbeatManager =
                heartbeatServices.createHeartbeatManagerSender(
                        resourceId,
                        new JobManagerHeartbeatListener(),
                        getMainThreadExecutor(),
                        log);
    }
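Each heartbeat manager created above tracks a set of targets and reacts when one of them stops answering. The sketch below models only the timeout bookkeeping, with timestamps passed in explicitly so the behavior is deterministic. The class HeartbeatMonitorSketch and its methods are hypothetical and much simpler than Flink's heartbeat implementation, which also sends the periodic requests and notifies a listener.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeatMillis = new HashMap<>();

    HeartbeatMonitorSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Starts tracking a target, treating "now" as its first heartbeat. */
    void monitorTarget(String targetId, long nowMillis) {
        lastHeartbeatMillis.put(targetId, nowMillis);
    }

    /** Records a heartbeat; heartbeats from unknown targets are ignored. */
    void receiveHeartbeat(String targetId, long nowMillis) {
        lastHeartbeatMillis.computeIfPresent(targetId, (id, previous) -> nowMillis);
    }

    /** Returns the sorted IDs of all targets whose last heartbeat is older than the timeout. */
    List<String> timedOutTargets(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatMillis.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                timedOut.add(entry.getKey());
            }
        }
        Collections.sort(timedOut);
        return timedOut;
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(50L);
        monitor.monitorTarget("taskmanager-1", 0L);
        monitor.monitorTarget("jobmanager-1", 0L);
        monitor.receiveHeartbeat("taskmanager-1", 40L);
        System.out.println(monitor.timedOutTargets(60L));
    }
}
```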

 

org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#start

    /**
     * Starts the slot manager with the given leader id and resource manager actions.
     *
     * @param newResourceManagerId to use for communication with the task managers
     * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
     * @param newResourceAllocator to use for resource (de-)allocations
     * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
     */
    @Override
    public void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceAllocator newResourceAllocator,
            ResourceEventListener newResourceEventListener,
            BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
        LOG.info("Starting the slot manager.");

        resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
        resourceAllocator = Preconditions.checkNotNull(newResourceAllocator);
        resourceEventListener = Preconditions.checkNotNull(newResourceEventListener);
        slotStatusSyncer.initialize(
                taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);
        blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);

        started = true;

        if (resourceAllocator.isSupported()) {
            clusterReconciliationCheck =
                    scheduledExecutor.scheduleWithFixedDelay(
                            // Check the cluster and request the required resources
                            () -> mainThreadExecutor.execute(this::checkClusterReconciliation),
                            0L,
                            taskManagerTimeout.toMilliseconds(),
                            TimeUnit.MILLISECONDS);
        }

        // Register the SlotManager metrics
        registerSlotManagerMetrics();
    }
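The scheduling call above uses a common two-executor pattern: a scheduler thread fires at a fixed delay, but the actual check is handed off to the main-thread executor so that it never runs concurrently with other ResourceManager logic. A minimal sketch with JDK executors follows. The class PeriodicCheckSketch and the method runChecks are hypothetical, and the counter stands in for the real reconciliation check.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicCheckSketch {

    /** Runs the periodic check until it has executed expectedRuns times, then stops. */
    static int runChecks(int expectedRuns, long delayMillis) {
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(expectedRuns);
        AtomicInteger runs = new AtomicInteger();

        // The scheduler only triggers; the check itself runs on the "main thread".
        ScheduledFuture<?> periodicCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        () ->
                                mainThreadExecutor.execute(
                                        () -> {
                                            runs.incrementAndGet();
                                            latch.countDown();
                                        }),
                        0L,
                        delayMillis,
                        TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            periodicCheck.cancel(false);
            scheduledExecutor.shutdownNow();
            mainThreadExecutor.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("checks executed: " + runChecks(3, 10L));
    }
}
```

With an initial delay of 0, the first check is triggered immediately, which matches the 0L argument in the original call.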

 

leaderElection.confirmLeadership(...) writes the leader information to the corresponding ZooKeeper path (assuming ZooKeeper-based high availability is in use).

 

Summary

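As the walkthrough shows, starting the ResourceManager is not very complicated. Because ResourceManager is an RPC endpoint (the RPC framework is Pekko-based in Flink 1.20 and was Akka-based in older releases), calling start() leads to onStart(), which mainly starts two heartbeat services and one periodic task. Around that, the ResourceManager contender takes part in leader election and, once elected, writes its own information to ZooKeeper.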

 


 

