Flink 1.20 Source Code Reading Notes (5.2) - Cluster Startup: WebMonitorEndpoint Startup Analysis

2024-10-11

Overview of the Three Core JobManager Components

JobManager is the logical name for the master node of a Flink cluster. Each deployment mode has its own implementation class; for Application mode on YARN the implementation class is YarnApplicationClusterEntryPoint. The JobManager consists of three core components: the ResourceManager, the Dispatcher, and the WebMonitorEndpoint. Their responsibilities are summarized below.

1. WebMonitorEndpoint: the REST service, implemented internally with Netty. Every request issued by a client is received and handled by this component (a small request sketch follows this list).

2. Dispatcher: receives the JobGraph submitted by the client and starts a JobMaster for it. It holds a JobGraphStore internally, so if the master node fails while the physical execution graph is being built, the JobGraph can be recovered from the JobGraphStore and the job re-launched.

3. ResourceManager: the resource manager of the Flink cluster, sitting between Flink and the underlying resource management cluster (YARN, Kubernetes, etc.). Its main duties include starting new TaskManagers, requesting slots for jobs, and maintaining heartbeats with the TaskManagers and JobMasters.
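
To make the WebMonitorEndpoint's role concrete, the sketch below queries the REST service from the outside. It calls the /jobs/overview endpoint (one of the URLs registered by the WebMonitorEndpoint, as shown later in this post); the host and port (localhost:8081) and the class name are assumptions for illustration only.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Illustrative client of the Netty-based REST service served by the WebMonitorEndpoint.
    // localhost:8081 is an assumed address; use your cluster's rest.address / rest.port.
    public final class RestOverviewClient {

        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request =
                    HttpRequest.newBuilder(URI.create("http://localhost:8081/jobs/overview"))
                            .GET()
                            .build();
            // The REST server answers with a JSON overview of all jobs in the cluster.
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode());
            System.out.println(response.body());
        }
    }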

 

WebMonitorEndpoint Startup Analysis

The previous post ended with the YarnApplicationClusterEntryPoint startup reaching ClusterEntrypoint.runCluster(...), but only the part that launches the Flink application's main(...) method was analyzed there. This post continues with ClusterEntrypoint.runCluster() and the creation of the three core components.

org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster

    // Run the cluster
    private void runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {
            // Initialize the base services
            initializeServices(configuration, pluginManager);

            // write host information into configuration
            configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());

            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                    // Create the dispatcher/resource-manager component factory; a DefaultDispatcherResourceManagerComponentFactory is returned
                            createDispatcherResourceManagerComponentFactory(configuration);

            // A chain of calls below produces the dispatcherRunner instance;
            // the dispatcherRunner is responsible for HA leader election of the dispatcher component,
            // and the dispatcher component triggers execution of the Flink application's main(...) method.
            // This call starts the Dispatcher, the ResourceManager and the WebMonitorEndpoint.
            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            resourceId.unwrap(),
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            delegationTokenManager,
                            metricRegistry,
                            executionGraphInfoStore,
                            // RPC metric query service retriever
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            failureEnrichers,
                            this);

            clusterComponent
                    .getShutDownFuture()
                    .whenComplete(
                            (ApplicationStatus applicationStatus, Throwable throwable) -> {
                                if (throwable != null) {
                                    shutDownAsync(
                                            ApplicationStatus.UNKNOWN,
                                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                            ExceptionUtils.stringifyException(throwable),
                                            false);
                                } else {
                                    // This is the general shutdown path. If a separate more
                                    // specific shutdown was
                                    // already triggered, this will do nothing
                                    shutDownAsync(
                                            applicationStatus,
                                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                            null,
                                            true);
                                }
                            });
        }
    }
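
The whenComplete(...) wiring above can be read in isolation: the cluster component exposes a future that completes when the application finishes (or fails), and the entrypoint reacts to it. The short sketch below reproduces that pattern with illustrative names only; it is not Flink code.

    import java.util.concurrent.CompletableFuture;

    // Standalone sketch of the shut-down wiring in runCluster(...): react to the completion
    // of a shut-down future, either exceptionally or with an application status.
    public final class ShutdownFutureSketch {

        enum AppStatus { SUCCEEDED, FAILED, UNKNOWN }

        public static void main(String[] args) {
            CompletableFuture<AppStatus> shutDownFuture = new CompletableFuture<>();

            shutDownFuture.whenComplete(
                    (status, throwable) -> {
                        if (throwable != null) {
                            // exceptional completion: shut down with UNKNOWN status
                            System.out.println("Shutting down after failure: " + throwable);
                        } else {
                            // regular path: shut down with the reported application status
                            System.out.println("Shutting down with status " + status);
                        }
                    });

            // Elsewhere, the component completes the future once the application has finished.
            shutDownFuture.complete(AppStatus.SUCCEEDED);
        }
    }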

 

The ClusterEntrypoint.initializeServices() Method

To support the three core components above, this method initializes the shared foundational services they depend on (the common RPC service, the cluster I/O thread pool, the HA services, the BlobServer, the heartbeat services, the metric registry, the ExecutionGraphInfoStore, and others):

org.apache.flink.runtime.entrypoint.ClusterEntrypoint#initializeServices

    protected void initializeServices(Configuration configuration, PluginManager pluginManager)
            throws Exception {

        LOG.info("Initializing cluster services.");

        synchronized (lock) {
            resourceId =
                    configuration
                            .getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID)
                            .map(
                                    value ->
                                            DeterminismEnvelope.deterministicValue(
                                                    new ResourceID(value)))
                            .orElseGet(
                                    () ->
                                            DeterminismEnvelope.nondeterministicValue(
                                                    ResourceID.generate()));

            LOG.debug(
                    "Initialize cluster entrypoint {} with resource id {}.",
                    getClass().getSimpleName(),
                    resourceId);

            workingDirectory =
                    // Create the JobManager working directory
                    ClusterEntrypointUtils.createJobManagerWorkingDirectory(
                            configuration, resourceId);

            LOG.info("Using working directory: {}.", workingDirectory);

            rpcSystem = RpcSystem.load(configuration);

            // Create and start the PekkoRpcService
            commonRpcService =
                    RpcUtils.createRemoteRpcService(
                            rpcSystem,
                            configuration,
                            configuration.get(JobManagerOptions.ADDRESS),
                            // Port range for the common RpcService
                            getRPCPortRange(configuration),
                            configuration.get(JobManagerOptions.BIND_HOST),
                            // Local RPC port the JobManager binds to
                            configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

            // Start a JMXService so that clients can connect to the JobManager JVM for monitoring
            JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));

            // update the configuration used to create the high availability services
            configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());

            // Create a fixed-size thread pool for cluster I/O
            ioExecutor =
                    Executors.newFixedThreadPool(
                            ClusterEntrypointUtils.getPoolSize(configuration),
                            new ExecutorThreadFactory("cluster-io"));
            delegationTokenManager =
                    DefaultDelegationTokenManagerFactory.create(
                            configuration,
                            pluginManager,
                            commonRpcService.getScheduledExecutor(),
                            ioExecutor);
            // Obtaining delegation tokens and propagating them to the local JVM receivers in a
            // one-time fashion is required because BlobServer may connect to external file systems
            delegationTokenManager.obtainDelegationTokens();
            // Create the high-availability services (here a ZooKeeperLeaderElectionHaServices instance)
            haServices = createHaServices(configuration, ioExecutor, rpcSystem);

            // Create the BlobServer
            blobServer =
                    BlobUtils.createBlobServer(
                            configuration,
                            Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                            haServices.createBlobStore());
            blobServer.start();
            configuration.set(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));
            // Create the heartbeat services that manage heartbeats between components
            heartbeatServices = createHeartbeatServices(configuration);
            failureEnrichers = FailureEnricherUtils.getFailureEnrichers(configuration);
            // Create the metric registry
            metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);

            // Metrics-related services
            final RpcService metricQueryServiceRpcService =
                    // Start the remote metrics RPC service
                    MetricUtils.startRemoteMetricsRpcService(
                            configuration,
                            commonRpcService.getAddress(),
                            configuration.get(JobManagerOptions.BIND_HOST),
                            rpcSystem);
            // Start the metric query service
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            // Resolve the hostname
            final String hostname = RpcUtils.getHostname(commonRpcService);

            processMetricGroup =
                    // Instantiate the process metric group
                    MetricUtils.instantiateProcessMetricGroup(
                            metricRegistry,
                            hostname,
                            ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                    configuration));

            // Create the ExecutionGraphInfoStore (here a FileExecutionGraphInfoStore)
            executionGraphInfoStore =
                    createSerializableExecutionGraphStore(
                            configuration, commonRpcService.getScheduledExecutor());
        }
    }

 

Creating the Component Factory

Startup next calls createDispatcherResourceManagerComponentFactory on YarnApplicationClusterEntryPoint; the implementation is provided by its parent class ApplicationClusterEntryPoint.

The DispatcherResourceManagerComponentFactory instance is created as follows; it bundles the factories for the three core components: WebMonitorEndpoint, Dispatcher, and ResourceManager.

org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint#createDispatcherResourceManagerComponentFactory

    @Override
    protected DispatcherResourceManagerComponentFactory
            createDispatcherResourceManagerComponentFactory(final Configuration configuration) {
        // Create the dispatcher/resource-manager component factory
        return new DefaultDispatcherResourceManagerComponentFactory(
                // Dispatcher (runner) factory
                new DefaultDispatcherRunnerFactory(
                        ApplicationDispatcherLeaderProcessFactoryFactory.create(
                                // SessionDispatcherFactory produces a StandaloneDispatcher
                                configuration, SessionDispatcherFactory.INSTANCE, program)),
                // ResourceManager factory
                resourceManagerFactory,
                // WebMonitorEndpoint factory
                JobRestEndpointFactory.INSTANCE);
    }

 

Creating and Starting the WebMonitorEndpoint

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

    @Override
    public DispatcherResourceManagerComponent create(
            Configuration configuration,
            ResourceID resourceId,
            Executor ioExecutor,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            DelegationTokenManager delegationTokenManager,
            MetricRegistry metricRegistry,
            ExecutionGraphInfoStore executionGraphInfoStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            Collection<FailureEnricher> failureEnrichers,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {

            // ... creation of the other components omitted here ...


            // Create the WebMonitorEndpoint
            webMonitorEndpoint =
                    restEndpointFactory.createRestEndpoint(
                            configuration,
                            dispatcherGatewayRetriever,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            executor,
                            metricFetcher,
                            highAvailabilityServices.getClusterRestEndpointLeaderElection(),
                            fatalErrorHandler);

            log.debug("Starting Dispatcher REST endpoint.");
            // Start the WebMonitorEndpoint
            webMonitorEndpoint.start();

            // ... startup of the other components omitted here ...
    }

 

org.apache.flink.runtime.rest.RestServerEndpoint#start

    public final void start() throws Exception {
        synchronized (lock) {
            Preconditions.checkState(
                    state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

            log.info("Starting rest endpoint.");

            // Create the Router
            final Router router = new Router();
            final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

            // Create the handlers that serve client requests
            handlers = initializeHandlers(restAddressFuture);

            /* sort the handlers such that they are ordered the following:
             * /jobs
             * /jobs/overview
             * /jobs/:jobid
             * /jobs/:jobid/config
             * /:*
             */
            // Sort the handlers by their URLs so that each request URL maps to exactly one handler
            Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

            // Verify that every endpoint/handler combination is unique
            checkAllEndpointsAndHandlersAreUnique(handlers);
            // Register all handlers with the Router
            handlers.forEach(handler -> registerHandler(router, handler, log));

            MultipartRoutes multipartRoutes = createMultipartRoutes(handlers);
            log.debug("Using {} for FileUploadHandler", multipartRoutes);

            ChannelInitializer<SocketChannel> initializer =
                    new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws ConfigurationException {
                            RouterHandler handler = new RouterHandler(router, responseHeaders);

                            // SSL should be the first handler in the pipeline
                            if (isHttpsEnabled()) {
                                ch.pipeline()
                                        .addLast(
                                                "ssl",
                                                new RedirectingSslHandler(
                                                        restAddress,
                                                        restAddressFuture,
                                                        sslHandlerFactory));
                            }

                            ch.pipeline()
                                    .addLast(new HttpServerCodec())
                                    .addLast(new FileUploadHandler(uploadDir, multipartRoutes))
                                    .addLast(
                                            new FlinkHttpObjectAggregator(
                                                    maxContentLength, responseHeaders));

                            for (InboundChannelHandlerFactory factory :
                                    inboundChannelHandlerFactories) {
                                Optional<ChannelHandler> channelHandler =
                                        factory.createHandler(configuration, responseHeaders);
                                if (channelHandler.isPresent()) {
                                    ch.pipeline().addLast(channelHandler.get());
                                }
                            }

                            ch.pipeline()
                                    .addLast(new ChunkedWriteHandler())
                                    .addLast(handler.getName(), handler)
                                    .addLast(new PipelineErrorHandler(log, responseHeaders));
                        }
                    };

            NioEventLoopGroup bossGroup =
                    new NioEventLoopGroup(
                            1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup workerGroup =
                    new NioEventLoopGroup(
                            0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

            // Start the Netty server: once the handlers are registered, build the channel
            // initializer, bootstrap the server, and walk the configured port range so that
            // a port already in use does not prevent binding.
            bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(initializer);

            Iterator<Integer> portsIterator;
            try {
                // Iterate over the configured port range
                portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
            } catch (IllegalConfigurationException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalArgumentException(
                        "Invalid port range definition: " + restBindPortRange);
            }

            int chosenPort = 0;
            while (portsIterator.hasNext()) {
                try {
                    chosenPort = portsIterator.next();
                    final ChannelFuture channel;
                    if (restBindAddress == null) {
                        channel = bootstrap.bind(chosenPort);
                    } else {
                        channel = bootstrap.bind(restBindAddress, chosenPort);
                    }
                    serverChannel = channel.syncUninterruptibly().channel();
                    break;
                } catch (final Exception e) {
                    // syncUninterruptibly() throws checked exceptions via Unsafe
                    // continue if the exception is due to the port being in use, fail early
                    // otherwise
                    if (!(e instanceof java.net.BindException)) {
                        throw e;
                    }
                }
            }

            if (serverChannel == null) {
                throw new BindException(
                        "Could not start rest endpoint on any port in port range "
                                + restBindPortRange);
            }

            log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

            final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
            final String advertisedAddress;
            if (bindAddress.getAddress().isAnyLocalAddress()) {
                advertisedAddress = this.restAddress;
            } else {
                advertisedAddress = bindAddress.getAddress().getHostAddress();
            }

            port = bindAddress.getPort();

            log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

            restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();

            restAddressFuture.complete(restBaseUrl);

            // With the Netty server up, move the WebMonitorEndpoint to the RUNNING state
            state = State.RUNNING;

            // Start leader election and the remaining internal services
            startInternal();
        }
    }
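
To see the Netty bootstrap pattern in isolation, here is a minimal, self-contained HTTP server with the same pipeline shape used above: an HTTP codec, an object aggregator, and then a routing-style handler. It uses plain Netty 4.x instead of Flink's shaded Netty, and the port and handler logic are assumptions for illustration, not Flink's implementation.

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpHeaderNames;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.HttpVersion;

    // Minimal sketch of a Netty HTTP server with the same pipeline shape as RestServerEndpoint:
    // codec -> aggregator -> request handler. Port 8081 and the handler body are assumptions.
    public final class MiniRestServerSketch {

        public static void main(String[] args) throws Exception {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap =
                        new ServerBootstrap()
                                .group(bossGroup, workerGroup)
                                .channel(NioServerSocketChannel.class)
                                .childHandler(
                                        new ChannelInitializer<SocketChannel>() {
                                            @Override
                                            protected void initChannel(SocketChannel ch) {
                                                ch.pipeline()
                                                        // decode/encode HTTP messages
                                                        .addLast(new HttpServerCodec())
                                                        // aggregate chunks into a FullHttpRequest
                                                        .addLast(new HttpObjectAggregator(1024 * 1024))
                                                        // a real REST server would route on the URI here
                                                        .addLast(
                                                                new SimpleChannelInboundHandler<FullHttpRequest>() {
                                                                    @Override
                                                                    protected void channelRead0(
                                                                            ChannelHandlerContext ctx,
                                                                            FullHttpRequest request) {
                                                                        FullHttpResponse response =
                                                                                new DefaultFullHttpResponse(
                                                                                        HttpVersion.HTTP_1_1,
                                                                                        HttpResponseStatus.OK);
                                                                        response.headers()
                                                                                .set(HttpHeaderNames.CONTENT_LENGTH, 0);
                                                                        ctx.writeAndFlush(response);
                                                                    }
                                                                });
                                            }
                                        });
                // Bind a single port; RestServerEndpoint additionally walks a port range on BindException.
                Channel serverChannel = bootstrap.bind(8081).syncUninterruptibly().channel();
                serverChannel.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

Flink's RestServerEndpoint follows the same structure but additionally inserts SSL, file-upload, and error-handling handlers, and retries binding across the configured port range.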

 

Leader Election and Starting the Remaining Internal Services

org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal

    @Override
    public void startInternal() throws Exception {
        // Start the leader election process
        leaderElection.startLeaderElection(this);

        startExecutionGraphCacheCleanupTask();

        if (hasWebUI) {
            log.info("Web frontend listening at {}.", getRestBaseUrl());
        }
    }

 

org.apache.flink.runtime.leaderelection.DefaultLeaderElection#startLeaderElection

    @Override
    public void startLeaderElection(LeaderContender contender) throws Exception {
        Preconditions.checkNotNull(contender);
        // Register the LeaderContender
        parentService.register(componentId, contender);
    }

 

org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#register

    @Override
    protected void register(String componentId, LeaderContender contender) throws Exception {
        checkNotNull(componentId, "componentId must not be null.");
        checkNotNull(contender, "Contender must not be null.");

        synchronized (lock) {
            Preconditions.checkState(
                    running,
                    "The DefaultLeaderElectionService should have established a connection to the backend before it's started.");

            if (leaderElectionDriver == null) {
                // Create the LeaderElectionDriver; during its creation leadership may be granted and issuedLeaderSessionID assigned
                createLeaderElectionDriver();
            }

            // Add the componentId -> WebMonitorEndpoint mapping
            Preconditions.checkState(
                    leaderContenderRegistry.put(componentId, contender) == null,
                    "There shouldn't be any contender registered under the passed component '%s'.",
                    componentId);

            LOG.info(
                    "LeaderContender has been registered under component '{}' for {}.",
                    componentId,
                    leaderElectionDriver);

            // issuedLeaderSessionID was assigned above when leadership was granted
            if (issuedLeaderSessionID != null) {
                // notifying the LeaderContender shouldn't happen in the contender's main thread
                runInLeaderEventThread(
                        LEADER_ACQUISITION_EVENT_LOG_NAME,
                        () ->
                                notifyLeaderContenderOfLeadership(
                                        componentId, issuedLeaderSessionID));
            }
        }
    }

 

Creating the LeaderElectionDriver and Joining the Election

org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#createLeaderElectionDriver

    @GuardedBy("lock")
    private void createLeaderElectionDriver() throws Exception {
        Preconditions.checkState(
                leaderContenderRegistry.isEmpty(),
                "No LeaderContender should have been registered, yet.");
        Preconditions.checkState(
                leaderElectionDriver == null,
                "This DefaultLeaderElectionService cannot be reused. Calling startLeaderElectionBackend can only be called once to establish the connection to the HA backend.");

        leaderElectionDriver = leaderElectionDriverFactory.create(this);

        LOG.info(
                "A connection to the HA backend was established through LeaderElectionDriver {}.",
                leaderElectionDriver);
    }

 

org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory#create

    @Override
    public ZooKeeperLeaderElectionDriver create(
            LeaderElectionDriver.Listener leaderElectionListener) throws Exception {
        return new ZooKeeperLeaderElectionDriver(curatorFramework, leaderElectionListener);
    }

 

org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver#ZooKeeperLeaderElectionDriver

    public ZooKeeperLeaderElectionDriver(
            CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)
            throws Exception {
        this.curatorFramework = Preconditions.checkNotNull(curatorFramework);
        this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);

        this.leaderLatchPath =
                ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());
        this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
        this.treeCache =
                ZooKeeperUtils.createTreeCache(
                        curatorFramework,
                        "/",
                        new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());

        treeCache
                .getListenable()
                .addListener(
                        (client, event) -> {
                            switch (event.getType()) {
                                case NODE_ADDED:
                                case NODE_UPDATED:
                                    Preconditions.checkNotNull(
                                            event.getData(),
                                            "The ZooKeeper event data must not be null.");
                                    handleChangedLeaderInformation(event.getData());
                                    break;
                                case NODE_REMOVED:
                                    Preconditions.checkNotNull(
                                            event.getData(),
                                            "The ZooKeeper event data must not be null.");
                                    handleRemovedLeaderInformation(event.getData().getPath());
                                    break;
                            }
                        });

        leaderLatch.addListener(this);
        curatorFramework.getConnectionStateListenable().addListener(listener);
        // Start the LeaderLatch; when this node is elected leader, the LeaderLatchListener's isLeader() callback is invoked
        leaderLatch.start();
        treeCache.start();
    }

 

The Leadership Callback Assigns issuedLeaderSessionID

Before LeaderLatch.start() is executed, addListener(...) registers a LeaderLatchListener; when this node is elected leader, the listener's isLeader() method is called back.
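
For readers unfamiliar with Curator's LeaderLatch recipe, the standalone sketch below shows the same listener pattern outside of Flink; the ZooKeeper connection string and the latch path are assumptions for illustration. Flink's own isLeader() callback follows.

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.LeaderLatch;
    import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    // Standalone Curator LeaderLatch sketch: register a listener, start the latch, and react
    // to leadership changes. Connection string and latch path are illustrative assumptions.
    public final class LeaderLatchSketch {

        public static void main(String[] args) throws Exception {
            CuratorFramework client =
                    CuratorFrameworkFactory.newClient(
                            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            LeaderLatch leaderLatch = new LeaderLatch(client, "/demo/leader-latch");
            leaderLatch.addListener(
                    new LeaderLatchListener() {
                        @Override
                        public void isLeader() {
                            // Flink's ZooKeeperLeaderElectionDriver generates a fresh leader
                            // session ID here and forwards it via onGrantLeadership(...).
                            System.out.println("Acquired leadership");
                        }

                        @Override
                        public void notLeader() {
                            System.out.println("Lost leadership");
                        }
                    });

            // Join the election; the callbacks above fire when leadership is acquired or lost.
            leaderLatch.start();

            Thread.sleep(10_000);
            leaderLatch.close();
            client.close();
        }
    }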

org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver#isLeader

    // Invoked when this node is elected leader
    @Override
    public void isLeader() {
        final UUID leaderSessionID = UUID.randomUUID();
        LOG.debug("{} obtained the leadership with session ID {}.", this, leaderSessionID);
        leaderElectionListener.onGrantLeadership(leaderSessionID);
    }

 

org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#onGrantLeadership

    @Override
    public void onGrantLeadership(UUID leaderSessionID) {
        runInLeaderEventThread(
                LEADER_ACQUISITION_EVENT_LOG_NAME,
                () -> onGrantLeadershipInternal(leaderSessionID));
    }

 

org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#onGrantLeadershipInternal

    @GuardedBy("lock")
    private void onGrantLeadershipInternal(UUID newLeaderSessionId) {
        Preconditions.checkNotNull(newLeaderSessionId);

        Preconditions.checkState(
                issuedLeaderSessionID == null,
                "The leadership should have been granted while not having the leadership acquired.");

        // Assign issuedLeaderSessionID
        issuedLeaderSessionID = newLeaderSessionId;

        // At this point leaderContenderRegistry does not yet contain the componentId -> WebMonitorEndpoint
        // mapping, so this loop does nothing and onGrantLeadershipInternal(...) simply returns; driver
        // creation completes and control returns to the register(...) method above.
        leaderContenderRegistry
                .keySet()
                .forEach(
                        componentId ->
                                notifyLeaderContenderOfLeadership(
                                        componentId, issuedLeaderSessionID));
    }

 

Confirming Leadership and Publishing the Leader Address

The DefaultLeaderElectionService#register method shown above eventually calls notifyLeaderContenderOfLeadership:

 

org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#notifyLeaderContenderOfLeadership

    @GuardedBy("lock")
    private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionID) {
        if (!leaderContenderRegistry.containsKey(componentId)) {
            LOG.debug(
                    "The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
                    sessionID,
                    leaderElectionDriver);
            return;
        } else if (!sessionID.equals(issuedLeaderSessionID)) {
            LOG.debug(
                    "An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.",
                    sessionID,
                    issuedLeaderSessionID);
            return;
        }

        Preconditions.checkState(
                !confirmedLeaderInformation.hasLeaderInformation(componentId),
                "The leadership should have been granted while not having the leadership acquired.");

        LOG.debug(
                "Granting leadership to the contender registered under component '{}' with session ID {}.",
                componentId,
                issuedLeaderSessionID);

        // Call back the contender's grantLeadership method
        leaderContenderRegistry.get(componentId).grantLeadership(issuedLeaderSessionID);
    }

 

org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#grantLeadership

    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        log.info(
                "{} was granted leadership with leaderSessionID={}",
                getRestBaseUrl(),
                leaderSessionID);
        // Confirm that the LeaderContender has accepted the leadership identified by the given
        // leader session ID, and publish the leader address under which the leader can be reached.
        leaderElection.confirmLeadership(leaderSessionID, getRestBaseUrl());
    }

 

org.apache.flink.runtime.leaderelection.DefaultLeaderElection#confirmLeadership

    @Override
    public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
        parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
    }

 

org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#confirmLeadership

    @Override
    protected void confirmLeadership(
            String componentId, UUID leaderSessionID, String leaderAddress) {
        Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId));
        LOG.debug(
                "The leader session for component '{}' is confirmed with session ID {} and address {}.",
                componentId,
                leaderSessionID,
                leaderAddress);

        checkNotNull(leaderSessionID);

        synchronized (lock) {
            if (hasLeadership(componentId, leaderSessionID)) {
                Preconditions.checkState(
                        leaderElectionDriver != null,
                        "The leadership check should only return true if a driver is instantiated.");
                Preconditions.checkState(
                        !confirmedLeaderInformation.hasLeaderInformation(componentId),
                        "No confirmation should have happened, yet.");

                final LeaderInformation newConfirmedLeaderInformation =
                        LeaderInformation.known(leaderSessionID, leaderAddress);
                confirmedLeaderInformation =
                        LeaderInformationRegister.merge(
                                confirmedLeaderInformation,
                                componentId,
                                newConfirmedLeaderInformation);

                // Publish the leader information, i.e. write this node's address and session ID to the ZooKeeper path
                leaderElectionDriver.publishLeaderInformation(
                        componentId, newConfirmedLeaderInformation);
            } else {
                if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
                    LOG.debug(
                            "Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).",
                            leaderSessionID,
                            componentId,
                            issuedLeaderSessionID);
                } else {
                    LOG.warn(
                            "The leader session ID {} for component '{}' was confirmed even though the corresponding "
                                    + "service was not elected as the leader or has been stopped already.",
                            componentId,
                            leaderSessionID);
                }
            }
        }
    }

 

 

Summary

As the walkthrough shows, the WebMonitorEndpoint startup process is not very complicated: it mainly initializes a set of handlers, starts the Netty server and registers those handlers with its Router, and then has the WebMonitorEndpoint contend for leadership and write its address into ZooKeeper.

 


 

 

