Flink V1.20 Source Code Reading Notes (5.4): Flink on YARN, Dispatcher Startup During JobManager Startup

2024-11-07

Dispatcher Startup During JobManager Startup

This part walks through the startup of the Dispatcher, one of the core components.

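dispatcherRunnerFactory is a DefaultDispatcherRunnerFactory. The call to createDispatcherRunner(...) receives, among other arguments, a leader election service dedicated to the Dispatcher and a highly available job persistence component.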

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

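            // Create the dispatcher runner and trigger the Dispatcher's HA leader election. The object actually created is a DefaultDispatcherRunner.
            // After the election, the callback lands in org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner.grantLeadership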
            dispatcherRunner =
                    dispatcherRunnerFactory.createDispatcherRunner(
                            highAvailabilityServices.getDispatcherLeaderElection(),
                            fatalErrorHandler,
                            new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                            ioExecutor,
                            rpcService,
                            partialDispatcherServices);

 

dispatcherLeaderProcessFactoryFactory is an ApplicationDispatcherLeaderProcessFactoryFactory. Its createFactory(...) method produces a DispatcherLeaderProcessFactory instance, which is then used to construct the DefaultDispatcherRunner.

 

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory#createDispatcherRunner

    @Override
    public DispatcherRunner createDispatcherRunner(
            LeaderElection leaderElection,
            FatalErrorHandler fatalErrorHandler,
            JobPersistenceComponentFactory jobPersistenceComponentFactory,
            Executor ioExecutor,
            RpcService rpcService,
            PartialDispatcherServices partialDispatcherServices)
            throws Exception {
        
        final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
                //ApplicationDispatcherLeaderProcessFactoryFactory
                dispatcherLeaderProcessFactoryFactory.createFactory(
                        jobPersistenceComponentFactory,
                        ioExecutor,
                        rpcService,
                        partialDispatcherServices,
                        fatalErrorHandler);

        // Create the runner and trigger the Dispatcher's HA leader election.
        // After the election, org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner.grantLeadership is called back
        return DefaultDispatcherRunner.create(
                leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);
    }

 

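DefaultDispatcherRunner.create(...) builds a new DefaultDispatcherRunner instance. Its class hierarchy shows that it is also a leader contender, so the instance takes part in leader election. Right after the instance is created, start() is called to begin the election, which works the same way as the WebMonitorEndpoint and ResourceManager elections covered in the previous two parts.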

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#create

    public static DispatcherRunner create(
            LeaderElection leaderElection,
            FatalErrorHandler fatalErrorHandler,
            DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
            throws Exception {
        final DefaultDispatcherRunner dispatcherRunner =
                new DefaultDispatcherRunner(
                        leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);
        // Trigger the Dispatcher's HA leader election. Calls back org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner.grantLeadership
        dispatcherRunner.start();
        return dispatcherRunner;
    }

 

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#start

    void start() throws Exception {
        // Register the given LeaderContender (this runner) with the leader election.
        leaderElection.startLeaderElection(this);
    }

 

After the election succeeds, the grantLeadership method is called back.

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#grantLeadership

    // Called back here after the election
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    // Create and start the dispatcherLeaderProcess
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }
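
The register-then-callback flow above can be illustrated with a minimal sketch. Everything in it is hypothetical (the `Contender` interface, `SingleCandidateElection` and `Runner` are stand-ins, not Flink classes), and it assumes a single candidate that is elected immediately, which a real HA election service does not guarantee.

```java
import java.util.UUID;

class LeaderElectionSketch {
    /** Hypothetical stand-in for Flink's LeaderContender. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Hypothetical election service with a single candidate: whoever registers wins. */
    static class SingleCandidateElection {
        void startLeaderElection(Contender contender) {
            // With one candidate there is nothing to decide, so leadership is granted at once.
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    /** Plays the role of DefaultDispatcherRunner: the runner is itself the contender. */
    static class Runner implements Contender {
        private final SingleCandidateElection election;
        UUID leaderSessionId; // stays null until leadership is granted

        private Runner(SingleCandidateElection election) {
            this.election = election;
        }

        static Runner create(SingleCandidateElection election) {
            Runner runner = new Runner(election);
            runner.start(); // registering is what eventually leads to grantLeadership
            return runner;
        }

        void start() {
            election.startLeaderElection(this);
        }

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            // The real runner would create and start a DispatcherLeaderProcess here.
            this.leaderSessionId = leaderSessionId;
        }
    }

    public static void main(String[] args) {
        Runner runner = Runner.create(new SingleCandidateElection());
        System.out.println("granted leadership: " + (runner.leaderSessionId != null));
    }
}
```

The point of the sketch is only the direction of control: the runner never elects itself, it registers and waits to be called back.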

 

Finally, a new SessionDispatcherLeaderProcess instance is created and its start() method is called.

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#startNewDispatcherLeaderProcess

    private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
        stopDispatcherLeaderProcess();

        // The actual runtime type returned is SessionDispatcherLeaderProcess
        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
        FutureUtils.assertNoException(
                previousDispatcherLeaderProcessTerminationFuture.thenRun(
                        // Start the dispatcherLeaderProcess instance
                        newDispatcherLeaderProcess::start));
    }

 

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#createNewDispatcherLeaderProcess

    private DispatcherLeaderProcess createNewDispatcherLeaderProcess(UUID leaderSessionID) {
        // The actual runtime type returned is SessionDispatcherLeaderProcess
        final DispatcherLeaderProcess newDispatcherLeaderProcess =
                dispatcherLeaderProcessFactory.create(leaderSessionID);

        // Other parts omitted
    }

 

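When start() runs on the SessionDispatcherLeaderProcess instance, the start() method inherited from AbstractDispatcherLeaderProcess calls startInternal(), which sets the state to RUNNING and then invokes the onStart() method of the SessionDispatcherLeaderProcess instance.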

 

org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess#start

    @Override
    public final void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess#startInternal

    private void startInternal() {
        log.info("Start {}.", getClass().getSimpleName());
        state = State.RUNNING;
        // Actually invokes SessionDispatcherLeaderProcess#onStart
        onStart();
    }
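
The start() / startInternal() / onStart() chain is a template method guarded by a state check. Here is a minimal sketch of that idea; the class names and the `lock` field are assumptions made for illustration rather than a copy of AbstractDispatcherLeaderProcess.

```java
class LeaderProcessLifecycleSketch {
    enum State { CREATED, RUNNING, STOPPED }

    /** Hypothetical base class mirroring the start() -> startInternal() -> onStart() chain. */
    abstract static class AbstractProcess {
        private final Object lock = new Object();
        private State state = State.CREATED;

        public final void start() {
            // Only a process that is still CREATED may be started.
            runIfStateIs(State.CREATED, this::startInternal);
        }

        private void startInternal() {
            state = State.RUNNING;
            onStart(); // subclass hook
        }

        /** Runs the action only if the current state matches; otherwise the call is ignored. */
        final void runIfStateIs(State expected, Runnable action) {
            synchronized (lock) {
                if (state == expected) {
                    action.run();
                }
            }
        }

        State getState() {
            synchronized (lock) {
                return state;
            }
        }

        protected abstract void onStart();
    }

    /** Subclass that only counts how often its hook was invoked. */
    static class CountingProcess extends AbstractProcess {
        int onStartCalls = 0;

        @Override
        protected void onStart() {
            onStartCalls++;
        }
    }

    public static void main(String[] args) {
        CountingProcess process = new CountingProcess();
        process.start();
        process.start(); // ignored: the state is already RUNNING
        System.out.println(process.getState() + " " + process.onStartCalls); // prints "RUNNING 1"
    }
}
```

Making start() final and routing it through the state guard means a subclass can only customize onStart(), and a second start() call cannot run the hook twice.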

 

In onStart(), the first step is startServices(), which registers the SessionDispatcherLeaderProcess instance as the JobGraphListener while starting the DefaultJobGraphStore. The second step is createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults(). Because the Flink application is newly created, getDirtyJobResultsIfRunning() and recoverJobsIfRunning() both return empty collections, so execution moves on to createDispatcherIfRunning() and finally into createDispatcher(...).

org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#onStart

    @Override
    protected void onStart() {
        // Start the jobGraphStore
        startServices();

        // After a chain of calls, this triggers execution of the Flink application's main(...) method
        onGoingRecoveryOperation =
                createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults();
    }

org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults

    private CompletableFuture<Void>
            createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults() {
        final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
                // Fetch the persisted JobResult instances marked as dirty. This is useful for recovering finalization steps.
                CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, ioExecutor);

        return dirtyJobsFuture
                .thenApplyAsync(
                        dirtyJobs ->
                                // Recover the jobs if the process is still running
                                this.recoverJobsIfRunning(
                                        dirtyJobs.stream()
                                                .map(JobResult::getJobId)
                                                .collect(Collectors.toSet())),
                        ioExecutor)
                // Create the dispatcher if the process is still running
                .thenAcceptBoth(dirtyJobsFuture, this::createDispatcherIfRunning)
                .handle(this::onErrorIfRunning);
    }

org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcherIfRunning

    private void createDispatcherIfRunning(
            Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
        // Create the dispatcher
        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, recoveredDirtyJobResults));
    }
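
The future chain used for recovery (supplyAsync, thenApplyAsync, thenAcceptBoth, handle) can be tried out in isolation. The sketch below uses plain strings in place of JobGraph and JobResult, and it assumes that jobs which already have a dirty result are skipped during recovery; both are simplifications made for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class RecoveryChainSketch {
    /** What the hypothetical "dispatcher" ends up being created with. */
    static final class Created {
        final List<String> recoveredJobs;
        final List<String> dirtyJobs;

        Created(List<String> recoveredJobs, List<String> dirtyJobs) {
            this.recoveredJobs = recoveredJobs;
            this.dirtyJobs = dirtyJobs;
        }
    }

    static Created run(List<String> storedJobs, List<String> dirtyJobIds) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Created> created = new CompletableFuture<>();

            // Step 1: load the dirty results on the I/O executor.
            CompletableFuture<List<String>> dirtyJobsFuture =
                    CompletableFuture.supplyAsync(() -> dirtyJobIds, ioExecutor);

            dirtyJobsFuture
                    // Step 2: recover stored jobs, skipping those with a dirty result (assumption).
                    .thenApplyAsync(
                            dirty -> {
                                Set<String> skip = new HashSet<>(dirty);
                                return storedJobs.stream()
                                        .filter(job -> !skip.contains(job))
                                        .collect(Collectors.toList());
                            },
                            ioExecutor)
                    // Step 3: combine both results to "create the dispatcher".
                    .thenAcceptBoth(
                            dirtyJobsFuture,
                            (jobs, dirty) -> created.complete(new Created(jobs, dirty)))
                    // Step 4: route a failure from any earlier stage to one place.
                    .handle(
                            (ignored, error) -> {
                                if (error != null) {
                                    created.completeExceptionally(error);
                                }
                                return null;
                            });
            return created.join();
        } finally {
            ioExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        Created c = run(Arrays.asList("job-a", "job-b", "job-c"), Arrays.asList("job-b"));
        System.out.println(c.recoveredJobs + " / " + c.dirtyJobs); // prints "[job-a, job-c] / [job-b]"
    }
}
```

Note that thenAcceptBoth reuses the first future, which is how the final stage receives both the recovered jobs and the dirty results without recomputing either.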

 

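dispatcherGatewayServiceFactory.create(...) proceeds as follows; dispatcherGatewayServiceFactory is an ApplicationDispatcherGatewayServiceFactory. create(...) first builds a Dispatcher instance. The fifth argument passed when creating it is a DispatcherBootstrapFactory, and that DispatcherBootstrapFactory instance is what eventually triggers the Flink application's main() method. Finally, the Dispatcher service is started.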

org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcher

    private void createDispatcher(
            Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {

        final DispatcherGatewayService dispatcherService =
                // Leads to a new ApplicationDispatcherBootstrap instance, whose construction triggers the Flink application's main()
                dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(getLeaderSessionId()),
                        jobGraphs,
                        recoveredDirtyJobResults,
                        jobGraphStore,
                        jobResultStore);

        completeDispatcherSetup(dispatcherService);
    }

 

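Now for dispatcherFactory.createDispatcher(...). From the earlier class initialization, dispatcherFactory is SessionDispatcherFactory.INSTANCE, and its createDispatcher(...) method produces a StandaloneDispatcher instance. By inheritance, Dispatcher extends RpcEndpoint, so calling start() actually invokes RpcEndpoint's start(). In Flink's RPC framework (built on Akka, replaced by Pekko in recent versions), starting an RpcEndpoint calls back its onStart() method.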

org.apache.flink.client.deployment.application.ApplicationDispatcherGatewayServiceFactory#create

    @Override
    public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            Collection<JobResult> recoveredDirtyJobResults,
            JobGraphWriter jobGraphWriter,
            JobResultStore jobResultStore) {

        // Collect the IDs of the recovered jobs
        final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

        final Dispatcher dispatcher;
        try {
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            recoveredDirtyJobResults,
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    // Create a new ApplicationDispatcherBootstrap instance
                                    new ApplicationDispatcherBootstrap(
                                            application,
                                            recoveredJobIds,
                                            configuration,
                                            dispatcherGateway,
                                            scheduledExecutor,
                                            errorHandler),
                            PartialDispatcherServicesWithJobPersistenceComponents.from(
                                    partialDispatcherServices, jobGraphWriter, jobResultStore));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }

        // Starting the Dispatcher calls back org.apache.flink.runtime.dispatcher.Dispatcher.onStart
        // That callback launches the Flink application's main() method.
        dispatcher.start();

        return DefaultDispatcherGatewayService.from(dispatcher);
    }

org.apache.flink.runtime.dispatcher.SessionDispatcherFactory#createDispatcher

    @Override
    public StandaloneDispatcher createDispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            Collection<JobResult> recoveredDirtyJobResults,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            PartialDispatcherServicesWithJobPersistenceComponents
                    partialDispatcherServicesWithJobPersistenceComponents)
            throws Exception {
        // create the default dispatcher
        return new StandaloneDispatcher(
                rpcService,
                fencingToken,
                recoveredJobs,
                recoveredDirtyJobResults,
                dispatcherBootstrapFactory,
                DispatcherServices.from(
                        partialDispatcherServicesWithJobPersistenceComponents,
                        JobMasterServiceLeadershipRunnerFactory.INSTANCE,
                        CheckpointResourcesCleanupRunnerFactory.INSTANCE));
    }

 

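In onStart(), startDispatcherServices() registers the metrics. Because the Flink application is starting for the first time, startCleanupRetries() and startRecoveredJobs() are both effectively no-ops. dispatcherBootstrapFactory.create(...) is what launches the Flink application's main() method.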

org.apache.flink.runtime.dispatcher.Dispatcher#onStart

    @Override
    public void onStart() throws Exception {
        try {
            // Registers the metrics
            startDispatcherServices();
        } catch (Throwable t) {
            final DispatcherException exception =
                    new DispatcherException(
                            String.format("Could not start the Dispatcher %s", getAddress()), t);
            onFatalError(exception);
            throw exception;
        }

        // Start the cleanup retries
        startCleanupRetries();
        
        // Start the recovered jobs
        startRecoveredJobs();

        this.dispatcherBootstrap =
                // Responsible for launching the Flink application's main() method.
                this.dispatcherBootstrapFactory.create(
                        getSelfGateway(DispatcherGateway.class),
                        this.getRpcService().getScheduledExecutor(),
                        this::onFatalError);
    }
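
The reason main() starts only inside onStart() is that the Dispatcher is handed a factory, not a finished bootstrap object. The following sketch shows that deferral with hypothetical types (`BootstrapFactory`, `Bootstrap` and `DispatcherSketch` are illustrative names, and the gateway is reduced to a string).

```java
import java.util.ArrayList;
import java.util.List;

class DeferredBootstrapSketch {
    /** Hypothetical counterpart of DispatcherBootstrapFactory: builds the bootstrap on demand. */
    interface BootstrapFactory {
        Bootstrap create(String dispatcherGateway);
    }

    /** Hypothetical bootstrap whose constructor does the work, like ApplicationDispatcherBootstrap. */
    static class Bootstrap {
        Bootstrap(String dispatcherGateway, List<String> events) {
            events.add("user main() started via " + dispatcherGateway);
        }
    }

    static class DispatcherSketch {
        private final BootstrapFactory factory;
        private final List<String> events;
        Bootstrap bootstrap; // null until onStart() has run

        DispatcherSketch(BootstrapFactory factory, List<String> events) {
            this.factory = factory;
            this.events = events;
            events.add("dispatcher constructed"); // the factory has NOT been invoked yet
        }

        void onStart() {
            events.add("dispatcher services started");
            // Only now is the bootstrap built, so user code runs after the services are up.
            bootstrap = factory.create("dispatcher-gateway");
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        DispatcherSketch dispatcher =
                new DispatcherSketch(gateway -> new Bootstrap(gateway, events), events);
        dispatcher.onStart();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Passing a lambda instead of an instance lets the caller decide what to build while the Dispatcher decides when, which here is after its own services have started.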

 

 

JobGraph Submission, the Call into the ExecutionGraph Creation Entry Point, and JobMaster Creation

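The first section showed the Dispatcher starting up, but it has not yet covered JobGraph submission, the call into the ExecutionGraph creation entry point, or the creation of the JobMaster. This section fills in those major missing steps by following the dispatcherBootstrapFactory.create(...) call, the triggering of the Flink application's main() method, and the execute() method of StreamExecutionEnvironment.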

 

org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#ApplicationDispatcherBootstrap

    public ApplicationDispatcherBootstrap(
            final PackagedProgram application,
            final Collection<JobID> recoveredJobIds,
            final Configuration configuration,
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final FatalErrorHandler errorHandler) {
        this.configuration = checkNotNull(configuration);
        this.recoveredJobIds = checkNotNull(recoveredJobIds);
        this.application = checkNotNull(application);
        this.errorHandler = checkNotNull(errorHandler);

        this.applicationCompletionFuture =
                // Fix (pin) the JobID and run the application asynchronously
                fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);

        this.bootstrapCompletionFuture = finishBootstrapTasks(dispatcherGateway);
    }

org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#fixJobIdAndRunApplicationAsync

    private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
            final DispatcherGateway dispatcherGateway, final ScheduledExecutor scheduledExecutor) {
        final Optional<String> configuredJobId =
                configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
        final boolean submitFailedJobOnApplicationError =
                configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR);
        // Non-HA mode with no job ID configured
        if (!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)
                && !configuredJobId.isPresent()) {
            // Run the application asynchronously
            return runApplicationAsync(
                    dispatcherGateway, scheduledExecutor, false, submitFailedJobOnApplicationError);
        }
        if (!configuredJobId.isPresent()) {
            // In HA mode, we only support single-execute jobs at the moment. Here, we manually
            // generate the job id, if not configured, from the cluster id to keep it consistent
            // across failover.
            configuration.set(
                    PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
                    new JobID(
                                    Preconditions.checkNotNull(
                                                    configuration.get(
                                                            HighAvailabilityOptions.HA_CLUSTER_ID))
                                            .hashCode(),
                                    0)
                            .toHexString());
        }
        // Run the application asynchronously
        return runApplicationAsync(
                dispatcherGateway, scheduledExecutor, true, submitFailedJobOnApplicationError);
    }
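
In HA mode the job ID is derived from the cluster ID so that it stays the same across failover. The sketch below illustrates only that determinism. The hex layout (two longs rendered as 32 hex digits) is an assumption standing in for `new JobID(lower, upper).toHexString()`, and the cluster ID used in `main` is a made-up value.

```java
class FixedJobIdSketch {
    /** Renders two longs as 32 hex digits; assumed to resemble JobID#toHexString. */
    static String toHexString(long lowerPart, long upperPart) {
        return String.format("%016x%016x", lowerPart, upperPart);
    }

    /** Derives an ID from the cluster ID alone, so every restart computes the same value. */
    static String fixedJobIdFor(String clusterId) {
        // String#hashCode is specified by the JDK, so it is stable across JVMs and restarts.
        return toHexString(clusterId.hashCode(), 0L);
    }

    public static void main(String[] args) {
        String clusterId = "application_1730000000000_0001"; // hypothetical cluster ID
        System.out.println(fixedJobIdFor(clusterId));
        // A "restarted" JobManager derives the same ID again.
        System.out.println(fixedJobIdFor(clusterId).equals(fixedJobIdFor(clusterId)));
    }
}
```

Because the ID depends only on the cluster ID, a new leader after failover looks up the same job in the HA stores instead of submitting a second one. This also hints at why HA mode supports only a single execute() call: a second job would get the same ID.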

org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationAsync

    /**
     * Runs the user program entrypoint by scheduling a task on the given {@code scheduledExecutor}.
     * The returned {@link CompletableFuture} completes when all jobs of the user application
     * succeeded. if any of them fails, or if job submission fails.
     */
    private CompletableFuture<Void> runApplicationAsync(
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final boolean enforceSingleJobExecution,
            final boolean submitFailedJobOnApplicationError) {
        final CompletableFuture<List<JobID>> applicationExecutionFuture = new CompletableFuture<>();
        final Set<JobID> tolerateMissingResult = Collections.synchronizedSet(new HashSet<>());

        // we need to hand in a future as return value because we need to get those JobIs out
        // from the scheduled task that executes the user program
        applicationExecutionTask =
                scheduledExecutor.schedule(
                        () ->
                                // Run the application entry point
                                runApplicationEntryPoint(
                                        applicationExecutionFuture,
                                        tolerateMissingResult,
                                        dispatcherGateway,
                                        scheduledExecutor,
                                        enforceSingleJobExecution,
                                        submitFailedJobOnApplicationError),
                        0L,
                        TimeUnit.MILLISECONDS);

        return applicationExecutionFuture.thenCompose(
                jobIds ->
                        // Fetch the application result
                        getApplicationResult(
                                dispatcherGateway,
                                jobIds,
                                tolerateMissingResult,
                                scheduledExecutor));
    }
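
runApplicationAsync hands a future into a scheduled task and chains the result lookup onto it. A minimal sketch of that hand-off follows, with job IDs reduced to strings and a hypothetical `getApplicationResult` that completes immediately.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class RunApplicationAsyncSketch {
    static CompletableFuture<String> runApplicationAsync(ScheduledExecutorService executor) {
        // The scheduled task cannot return the job IDs directly, so it completes this future.
        CompletableFuture<List<String>> jobIdsFuture = new CompletableFuture<>();

        // Delay 0 means "as soon as possible, but on the executor's thread, not the caller's".
        executor.schedule(() -> runEntryPoint(jobIdsFuture), 0L, TimeUnit.MILLISECONDS);

        // Once the job IDs are known, continue with the (asynchronous) result lookup.
        return jobIdsFuture.thenCompose(RunApplicationAsyncSketch::getApplicationResult);
    }

    /** Stand-in for running the user's main(): reports the submitted job IDs or the failure. */
    static void runEntryPoint(CompletableFuture<List<String>> jobIdsFuture) {
        try {
            jobIdsFuture.complete(Arrays.asList("job-1"));
        } catch (Throwable t) {
            jobIdsFuture.completeExceptionally(t);
        }
    }

    /** Hypothetical result lookup that finishes immediately. */
    static CompletableFuture<String> getApplicationResult(List<String> jobIds) {
        return CompletableFuture.completedFuture("FINISHED " + jobIds);
    }

    static String runOnce() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            return runApplicationAsync(executor).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints "FINISHED [job-1]"
    }
}
```

The caller gets a future back right away, so constructing the bootstrap does not block on the user program.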

org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationEntryPoint

    /**
     * Runs the user program entrypoint and completes the given {@code jobIdsFuture} with the {@link
     * JobID JobIDs} of the submitted jobs.
     *
     * <p>This should be executed in a separate thread (or task).
     */
    private void runApplicationEntryPoint(
            final CompletableFuture<List<JobID>> jobIdsFuture,
            final Set<JobID> tolerateMissingResult,
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final boolean enforceSingleJobExecution,
            final boolean submitFailedJobOnApplicationError) {
            // Other parts omitted

            // Create the EmbeddedExecutorServiceLoader
            final PipelineExecutorServiceLoader executorServiceLoader =
                    new EmbeddedExecutorServiceLoader(
                            applicationJobIds, dispatcherGateway, scheduledExecutor);

            // Execute the program
            ClientUtils.executeProgram(
                    executorServiceLoader,
                    configuration,
                    application,
                    enforceSingleJobExecution,
                    true /* suppress sysout */);

            // Other parts omitted
    }

 

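Before the Flink application's main() method is triggered, that is, before ClientUtils.executeProgram(...) calls program.invokeInteractiveModeForExecution(), the factory for the stream execution environment is installed.

As a result, once program.invokeInteractiveModeForExecution() triggers the application's main(...) method, the usual first step of user code, StreamExecutionEnvironment.getExecutionEnvironment(), obtains the stream execution environment through that factory. The environment is a StreamContextEnvironment, and the executorServiceLoader it holds is actually an EmbeddedExecutorServiceLoader.

org.apache.flink.client.ClientUtils#executeProgram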

    public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout)
            throws ProgramInvocationException {

            // Other parts omitted

            // Set the context; executorServiceLoader is actually an EmbeddedExecutorServiceLoader
            ContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            StreamContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            // For DataStream v2.
            ExecutionContextEnvironment.setAsContext(
                    executorServiceLoader, configuration, userCodeClassLoader);


                // Invoke the program in interactive mode
                program.invokeInteractiveModeForExecution();
           
            // Other parts omitted
    }

org.apache.flink.client.program.PackagedProgram#invokeInteractiveModeForExecution

    /**
     * This method assumes that the context environment is prepared, or the execution will be a
     * local execution by default.
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            // Call the main method
            callMainMethod(mainClass, args);
        } finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }
    }

org.apache.flink.client.program.PackagedProgram#callMainMethod

    private static void callMainMethod(Class<?> entryClass, String[] args)
            throws ProgramInvocationException {
            // Other parts omitted
            // Invoke the main method via reflection
            mainMethod.invoke(null, (Object) args);
            
            // Other parts omitted
    }
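
The two ideas in this step, installing a context before user code runs and then calling main reflectively, can be sketched without Flink. The static `contextExecutor` field is a hypothetical stand-in for the context environment factory, and restoring the previous value in `finally` is a choice made for this sketch.

```java
import java.lang.reflect.Method;

class ContextAndReflectionSketch {
    /** Hypothetical stand-in for the context installed by setAsContext(...); "local" is the default. */
    static String contextExecutor = "local";

    /** Records what the user program observed, so the effect is visible from outside. */
    static String observedByUserProgram;

    /** Plays the role of the user's Flink application. */
    static class UserProgram {
        public static void main(String[] args) {
            // Analogous to StreamExecutionEnvironment.getExecutionEnvironment():
            // the program just picks up whatever context was installed beforehand.
            observedByUserProgram = contextExecutor + ":" + String.join(",", args);
        }
    }

    static void executeProgram(Class<?> entryClass, String[] args) {
        String previous = contextExecutor;
        contextExecutor = "embedded"; // install the context before user code runs
        try {
            Method mainMethod = entryClass.getMethod("main", String[].class);
            // The cast passes the whole array as the single String[] parameter
            // instead of spreading it into varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke main", e);
        } finally {
            contextExecutor = previous; // restoring is a choice made for this sketch
        }
    }

    public static void main(String[] args) {
        executeProgram(UserProgram.class, new String[] {"--input", "demo"});
        System.out.println(observedByUserProgram); // prints "embedded:--input,demo"
    }
}
```

The user program contains no reference to the embedded executor; it sees it only because the context was set before main was invoked, which is why the same application jar can run locally or inside the JobManager.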

 

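The last step of a Flink application is almost always StreamExecutionEnvironment.execute(), which starts the application running. After a series of calls through parent and child classes, execution reaches EmbeddedExecutor.execute.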

org.apache.flink.client.deployment.application.executors.EmbeddedExecutor#execute

    @Override
    public CompletableFuture<JobClient> execute(
            final Pipeline pipeline,
            final Configuration configuration,
            ClassLoader userCodeClassloader)
            throws MalformedURLException {
        // Other parts omitted

        // Submit
        return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
    }

org.apache.flink.client.deployment.application.executors.EmbeddedExecutor#submitAndGetJobClientFuture

    private CompletableFuture<JobClient> submitAndGetJobClientFuture(
            final Pipeline pipeline,
            final Configuration configuration,
            final ClassLoader userCodeClassloader)
            throws MalformedURLException {


        // Other parts omitted

        final CompletableFuture<JobID> jobSubmissionFuture =
                // Start submitting the JobGraph.
                submitJob(configuration, dispatcherGateway, jobGraph, timeout);

        // Other parts omitted
    }

 

Creating the JobGraph

FlinkPipelineTranslationUtil.getJobGraph triggers the translation of the Pipeline into a JobGraph.

org.apache.flink.client.deployment.executors.PipelineExecutorUtils#getJobGraph

    /**
     * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
     *
     * @param pipeline the pipeline whose job graph we are computing.
     * @param configuration the configuration with the necessary information such as jars and
     *     classpaths to be included, the parallelism of the job and potential savepoint settings
     *     used to bootstrap its state.
     * @param userClassloader the classloader which can load user classes.
     * @return the corresponding {@link JobGraph}.
     */
    public static JobGraph getJobGraph(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull ClassLoader userClassloader)
            throws MalformedURLException {
        // Other parts omitted
         
        final JobGraph jobGraph =
                // Trigger the translation into a JobGraph
                FlinkPipelineTranslationUtil.getJobGraph(
                        userClassloader,
                        pipeline,
                        configuration,
                        executionConfigAccessor.getParallelism());

        // Other parts omitted
    }

org.apache.flink.client.deployment.application.executors.EmbeddedExecutor#submitJob

    private static CompletableFuture<JobID> submitJob(
            final Configuration configuration,
            final DispatcherGateway dispatcherGateway,
            final JobGraph jobGraph,
            final Time rpcTimeout) {
        // Other parts omitted

        // Extract and upload the files referenced by the JobGraph
        ClientUtils.extractAndUploadJobGraphFiles(
                jobGraph, () -> new BlobClient(blobServerAddress, configuration));

        // Submit the job to the dispatcher (simplified)
        return dispatcherGateway.submitJob(jobGraph, rpcTimeout);
    }

org.apache.flink.runtime.dispatcher.Dispatcher#submitJob

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        // Other parts omitted

        // Submit the job internally
        return internalSubmitJob(jobGraph);
    }

org.apache.flink.runtime.dispatcher.Dispatcher#internalSubmitJob

    private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        // Apply the parallelism overrides
        applyParallelismOverrides(jobGraph);
        log.info("Submitting job '{}' ({}).", jobGraph.getName(), jobGraph.getJobID());

        // track as an outstanding job
        submittedAndWaitingTerminationJobIDs.add(jobGraph.getJobID());

        // Run the job
        return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
                .handle((ignored, throwable) -> handleTermination(jobGraph.getJobID(), throwable))
                .thenCompose(Function.identity())
                .whenComplete(
                        (ignored, throwable) ->
                                // job is done processing, whether failed or finished
                                submittedAndWaitingTerminationJobIDs.remove(jobGraph.getJobID()));
    }

org.apache.flink.runtime.dispatcher.Dispatcher#persistAndRunJob

    private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        jobGraphWriter.putJobGraph(jobGraph);
        // Initialize the job client expiration time
        initJobClientExpiredTime(jobGraph);
        // Run the job
        runJob(
                // Create the JobMasterRunner
                createJobMasterRunner(jobGraph), ExecutionType.SUBMISSION);
    }

 

Inside createJobMasterRunner(...), jobManagerRunnerFactory.createJobManagerRunner(...) proceeds as shown below:

org.apache.flink.runtime.dispatcher.Dispatcher#createJobMasterRunner

    private JobManagerRunner createJobMasterRunner(JobGraph jobGraph) throws Exception {
        Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID()));
        return jobManagerRunnerFactory.createJobManagerRunner(
                jobGraph,
                configuration,
                getRpcService(),
                highAvailabilityServices,
                heartbeatServices,
                jobManagerSharedServices,
                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                fatalErrorHandler,
                failureEnrichers,
                System.currentTimeMillis());
    }

org.apache.flink.runtime.dispatcher.JobMasterServiceLeadershipRunnerFactory#createJobManagerRunner

    @Override
    public JobManagerRunner createJobManagerRunner(
            JobGraph jobGraph,
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            JobManagerSharedServices jobManagerServices,
            JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
            FatalErrorHandler fatalErrorHandler,
            Collection<FailureEnricher> failureEnrichers,
            long initializationTimestamp)
            throws Exception {

        // Other parts omitted

        // Obtain the jobManagerLeaderElection
        final LeaderElection jobManagerLeaderElection =
                highAvailabilityServices.getJobManagerLeaderElection(jobGraph.getJobID());

        // Create the DefaultJobMasterServiceFactory
        final DefaultJobMasterServiceFactory jobMasterServiceFactory =
                new DefaultJobMasterServiceFactory(
                        MdcUtils.scopeToJob(
                                jobGraph.getJobID(), jobManagerServices.getIoExecutor()),
                        rpcService,
                        jobMasterConfiguration,
                        jobGraph,
                        highAvailabilityServices,
                        slotPoolServiceSchedulerFactory,
                        jobManagerServices,
                        heartbeatServices,
                        jobManagerJobMetricGroupFactory,
                        fatalErrorHandler,
                        userCodeClassLoader,
                        failureEnrichers,
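                        initializationTimestamp);

        // Other parts omitted (jobMasterServiceProcessFactory is built here from jobMasterServiceFactory)

        // Create the JobMasterServiceLeadershipRunner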
        return new JobMasterServiceLeadershipRunner(
                jobMasterServiceProcessFactory,
                jobManagerLeaderElection,
                jobResultStore,
                classLoaderLease,
                fatalErrorHandler);
    }

 

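By inheritance, JobMasterServiceLeadershipRunner is a leader contender class. Once the JobMasterServiceLeadershipRunner instance has been created, its start() method is executed to enter the leader election, and after a successful election the grantLeadership method is called back.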

org.apache.flink.runtime.dispatcher.Dispatcher#runJob

    private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType)
            throws Exception {
        // start() leads to the callback org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.grantLeadership
        jobManagerRunner.start();
        jobManagerRunnerRegistry.register(jobManagerRunner);

        // Other parts omitted
    }

 

org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner#grantLeadership

    // Called back here
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runIfStateRunning(
                // Start the JobMasterServiceProcess asynchronously
                () -> startJobMasterServiceProcessAsync(leaderSessionID),
                "starting a new JobMasterServiceProcess");
    }

org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner#startJobMasterServiceProcessAsync

    @GuardedBy("lock")
    private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
        // Other parts omitted

        // Create a new JobMasterServiceProcess if this is still the valid leader
        createNewJobMasterServiceProcessIfValidLeader(leaderSessionId);

        // Other parts omitted
    }

org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner#createNewJobMasterServiceProcessIfValidLeader

    private void createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId) {
        runIfValidLeader(
                leaderSessionId,
                () ->
                        ThrowingRunnable.unchecked(
                                        // Create a new JobMasterServiceProcess
                                        () -> createNewJobMasterServiceProcess(leaderSessionId))
                                .run(),
                "create new job master service process");
    }
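
The idea behind runIfValidLeader can be sketched as a guard that runs an action only while the given session ID is still the current one. This is a simplified assumption about the pattern, not the Flink implementation: ValidLeaderGuardSketch and its fields are hypothetical, and the real method also checks the runner's state and asks the election service about leadership.

```java
import java.util.UUID;

// Hypothetical sketch of "run this action only if we are still leader for this session".
class ValidLeaderGuardSketch {
    private final Object lock = new Object();
    // Guarded by lock; null means this contender currently holds no leadership.
    private UUID currentLeaderSessionId;

    void grantLeadership(UUID leaderSessionId) {
        synchronized (lock) {
            currentLeaderSessionId = leaderSessionId;
        }
    }

    void revokeLeadership() {
        synchronized (lock) {
            currentLeaderSessionId = null;
        }
    }

    // Returns true if the action ran, false if the session ID was stale.
    boolean runIfValidLeader(UUID expectedLeaderSessionId, Runnable action) {
        synchronized (lock) {
            if (expectedLeaderSessionId.equals(currentLeaderSessionId)) {
                action.run();
                return true;
            }
            return false;
        }
    }
}
```

The guard matters because leadership callbacks are asynchronous: by the time the action is about to run, leadership may already have been revoked or granted again under a different session ID.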

 

JobMasterServiceLeadershipRunner.createNewJobMasterServiceProcess(...) is responsible for creating the JobMasterServiceProcess instance.

org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner#createNewJobMasterServiceProcess

    @GuardedBy("lock")
    private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws FlinkException {
        // remaining code omitted

        // Creates a DefaultJobMasterServiceProcess, which in turn creates the JobMasterService
        jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);

        // remaining code omitted
    }

org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProcessFactory#create

    @Override
    public JobMasterServiceProcess create(UUID leaderSessionId) {
        // The constructor creates the JobMasterService
        return new DefaultJobMasterServiceProcess(
                jobId,
                leaderSessionId,
                jobMasterServiceFactory,
                // Builds an ArchivedExecutionGraph in FAILED state from the failure cause
                cause -> createArchivedExecutionGraph(JobStatus.FAILED, cause));
    }

 

The DefaultJobMasterServiceProcess constructor triggers the creation of a JobMaster instance. The steps are as follows:

org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess#DefaultJobMasterServiceProcess

    public DefaultJobMasterServiceProcess(
            JobID jobId,
            UUID leaderSessionId,
            JobMasterServiceFactory jobMasterServiceFactory,
            Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
        this.jobId = jobId;
        this.leaderSessionId = leaderSessionId;
        this.jobMasterServiceFuture =
                // Create the JobMasterService asynchronously; the result is held as a future
                jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);

        // remaining code omitted
    }

org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory#createJobMasterService

    @Override
    public CompletableFuture<JobMasterService> createJobMasterService(
            UUID leaderSessionId, OnCompletionActions onCompletionActions) {

        return CompletableFuture.supplyAsync(
                FunctionUtils.uncheckedSupplier(
                        // Create the JobMasterService on the given executor
                        () -> internalCreateJobMasterService(leaderSessionId, onCompletionActions)),
                executor);
    }
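
The combination of CompletableFuture.supplyAsync with an "unchecked" supplier wrapper can be sketched with the JDK alone. The names below (AsyncCreationSketch, ThrowingSupplier, unchecked, createServiceAsync) are hypothetical and only imitate the shape of the code above; Flink's own FunctionUtils may differ in how it rethrows.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

class AsyncCreationSketch {
    // Hypothetical stand-in for a supplier that may throw a checked exception.
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    // Adapts a throwing supplier to java.util.function.Supplier by wrapping
    // checked exceptions in an unchecked CompletionException.
    static <T> Supplier<T> unchecked(ThrowingSupplier<T> supplier) {
        return () -> {
            try {
                return supplier.get();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
    }

    // Runs the (possibly slow, possibly failing) factory on the given executor
    // and exposes the outcome as a future instead of blocking the caller.
    static CompletableFuture<String> createServiceAsync(
            ThrowingSupplier<String> factory, Executor executor) {
        return CompletableFuture.supplyAsync(unchecked(factory), executor);
    }
}
```

With this shape the caller never blocks on construction, and a failure inside the factory surfaces as an exceptionally completed future rather than as an exception on the calling thread.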

org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory#internalCreateJobMasterService

    private JobMasterService internalCreateJobMasterService(
            UUID leaderSessionId, OnCompletionActions onCompletionActions) throws Exception {

        // Create the JobMaster
        final JobMaster jobMaster =
                new JobMaster(
                        rpcService,
                        JobMasterId.fromUuidOrNull(leaderSessionId),
                        jobMasterConfiguration,
                        ResourceID.generate(),
                        jobGraph,
                        haServices,
                        slotPoolServiceSchedulerFactory,
                        jobManagerSharedServices,
                        heartbeatServices,
                        jobManagerJobMetricGroupFactory,
                        onCompletionActions,
                        fatalErrorHandler,
                        userCodeClassloader,
                        shuffleMaster,
                        lookup ->
                                new JobMasterPartitionTrackerImpl(
                                        jobGraph.getJobID(), shuffleMaster, lookup),
                        new DefaultExecutionDeploymentTracker(),
                        DefaultExecutionDeploymentReconciler::new,
                        BlocklistUtils.loadBlocklistHandlerFactory(
                                jobMasterConfiguration.getConfiguration()),
                        failureEnrichers,
                        initializationTimestamp);

        // Start the RPC endpoint; this calls back into
        // org.apache.flink.runtime.jobmaster.JobMaster.onStart
        jobMaster.start();

        return jobMaster;
    }

 

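The JobMaster constructor initializes a series of member fields. Among them, schedulerNG is the scheduler for the job's tasks. The scheduler manages everything related to job execution, including converting the JobGraph into an ExecutionGraph; deploying, cancelling, and stopping the job; deploying, cancelling, and stopping its tasks; requesting and releasing resources; and failover of the job and its tasks.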

org.apache.flink.runtime.jobmaster.JobMaster#JobMaster

    public JobMaster(
            RpcService rpcService,
            JobMasterId jobMasterId,
            JobMasterConfiguration jobMasterConfiguration,
            ResourceID resourceId,
            JobGraph jobGraph,
            HighAvailabilityServices highAvailabilityService,
            SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
            JobManagerSharedServices jobManagerSharedServices,
            HeartbeatServices heartbeatServices,
            JobManagerJobMetricGroupFactory jobMetricGroupFactory,
            OnCompletionActions jobCompletionActions,
            FatalErrorHandler fatalErrorHandler,
            ClassLoader userCodeLoader,
            ShuffleMaster<?> shuffleMaster,
            PartitionTrackerFactory partitionTrackerFactory,
            ExecutionDeploymentTracker executionDeploymentTracker,
            ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
            BlocklistHandler.Factory blocklistHandlerFactory,
            Collection<FailureEnricher> failureEnrichers,
            long initializationTimestamp)
            throws Exception {

        // remaining code omitted

        this.slotPoolService =
                checkNotNull(slotPoolServiceSchedulerFactory)
                        // Create the SlotPoolService
                        .createSlotPoolService(
                                jid,
                                createDeclarativeSlotPoolFactory(
                                        jobMasterConfiguration.getConfiguration()));

    
        this.schedulerNG =
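                // Create the scheduler for the job's tasks. It manages everything related to job
                // execution: converting the JobGraph into an ExecutionGraph; deploying, cancelling,
                // and stopping the job and its tasks; requesting and releasing resources; and
                // failover of the job and its tasks.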
                createScheduler(
                        slotPoolServiceSchedulerFactory,
                        executionDeploymentTracker,
                        jobManagerJobMetricGroup,
                        jobStatusListener);


    }

org.apache.flink.runtime.jobmaster.JobMaster#createScheduler

    private SchedulerNG createScheduler(
            SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
            ExecutionDeploymentTracker executionDeploymentTracker,
            JobManagerJobMetricGroup jobManagerJobMetricGroup,
            JobStatusListener jobStatusListener)
            throws Exception {
        final SchedulerNG scheduler =
                slotPoolServiceSchedulerFactory.createScheduler(
                        log,
                        jobGraph,
                        ioExecutor,
                        jobMasterConfiguration.getConfiguration(),
                        slotPoolService,
                        futureExecutor,
                        userCodeLoader,
                        highAvailabilityServices.getCheckpointRecoveryFactory(),
                        rpcTimeout,
                        blobWriter,
                        jobManagerJobMetricGroup,
                        jobMasterConfiguration.getSlotRequestTimeout(),
                        shuffleMaster,
                        partitionTracker,
                        executionDeploymentTracker,
                        initializationTimestamp,
                        getMainThreadExecutor(),
                        fatalErrorHandler,
                        jobStatusListener,
                        failureEnrichers,
                        blocklistHandler::addNewBlockedNodes);

        return scheduler;
    }

 

Creating the ExecutionGraph

After a chain of calls, createScheduler(...) ultimately triggers the creation of the ExecutionGraph in the DefaultScheduler constructor, as shown below.

org.apache.flink.runtime.scheduler.DefaultScheduler#DefaultScheduler

    protected DefaultScheduler(
            final Logger log,
            final JobGraph jobGraph,
            final Executor ioExecutor,
            final Configuration jobMasterConfiguration,
            final Consumer<ComponentMainThreadExecutor> startUpAction,
            final ScheduledExecutor delayExecutor,
            final ClassLoader userCodeLoader,
            final CheckpointsCleaner checkpointsCleaner,
            final CheckpointRecoveryFactory checkpointRecoveryFactory,
            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
            final SchedulingStrategyFactory schedulingStrategyFactory,
            final FailoverStrategy.Factory failoverStrategyFactory,
            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
            final ExecutionOperations executionOperations,
            final ExecutionVertexVersioner executionVertexVersioner,
            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
            long initializationTimestamp,
            final ComponentMainThreadExecutor mainThreadExecutor,
            final JobStatusListener jobStatusListener,
            final Collection<FailureEnricher> failureEnrichers,
            final ExecutionGraphFactory executionGraphFactory,
            final ShuffleMaster<?> shuffleMaster,
            final Time rpcTimeout,
            final VertexParallelismStore vertexParallelismStore,
            final ExecutionDeployer.Factory executionDeployerFactory)
            throws Exception {

        // The superclass constructor triggers the creation of the ExecutionGraph instance
        super(
                log,
                jobGraph,
                ioExecutor,
                jobMasterConfiguration,
                checkpointsCleaner,
                checkpointRecoveryFactory,
                jobManagerJobMetricGroup,
                executionVertexVersioner,
                initializationTimestamp,
                mainThreadExecutor,
                jobStatusListener,
                executionGraphFactory,
                vertexParallelismStore);
                

        // remaining code omitted
  
    }

org.apache.flink.runtime.scheduler.SchedulerBase#SchedulerBase

    public SchedulerBase(
            final Logger log,
            final JobGraph jobGraph,
            final Executor ioExecutor,
            final Configuration jobMasterConfiguration,
            final CheckpointsCleaner checkpointsCleaner,
            final CheckpointRecoveryFactory checkpointRecoveryFactory,
            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
            final ExecutionVertexVersioner executionVertexVersioner,
            long initializationTimestamp,
            final ComponentMainThreadExecutor mainThreadExecutor,
            final JobStatusListener jobStatusListener,
            final ExecutionGraphFactory executionGraphFactory,
            final VertexParallelismStore vertexParallelismStore)
            throws Exception {

        // remaining code omitted

        this.executionGraph =
                // Create and restore the ExecutionGraph
                createAndRestoreExecutionGraph(
                        completedCheckpointStore,
                        checkpointsCleaner,
                        checkpointIdCounter,
                        initializationTimestamp,
                        mainThreadExecutor,
                        jobStatusListener,
                        vertexParallelismStore);

        // remaining code omitted
    }

org.apache.flink.runtime.scheduler.SchedulerBase#createAndRestoreExecutionGraph

    private ExecutionGraph createAndRestoreExecutionGraph(
            CompletedCheckpointStore completedCheckpointStore,
            CheckpointsCleaner checkpointsCleaner,
            CheckpointIDCounter checkpointIdCounter,
            long initializationTimestamp,
            ComponentMainThreadExecutor mainThreadExecutor,
            JobStatusListener jobStatusListener,
            VertexParallelismStore vertexParallelismStore)
            throws Exception {

        // Create the ExecutionGraph
        final ExecutionGraph newExecutionGraph =
                executionGraphFactory.createAndRestoreExecutionGraph(
                        jobGraph,
                        completedCheckpointStore,
                        checkpointsCleaner,
                        checkpointIdCounter,
                        TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
                                jobGraph.getJobType()),
                        initializationTimestamp,
                        new DefaultVertexAttemptNumberStore(),
                        vertexParallelismStore,
                        deploymentStateTimeMetrics,
                        getMarkPartitionFinishedStrategy(),
                        log);

        newExecutionGraph.setInternalTaskFailuresListener(
                new UpdateSchedulerNgOnInternalFailuresListener(this));
        newExecutionGraph.registerJobStatusListener(jobStatusListener);
        newExecutionGraph.start(mainThreadExecutor);

        return newExecutionGraph;
    }
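
The chain above relies on a plain Java rule: the superclass constructor runs to completion before the subclass constructor body does, so the graph already exists by the time DefaultScheduler continues its own setup. A minimal sketch, with hypothetical classes (SketchSchedulerBase, SketchDefaultScheduler) and a String standing in for the real graph types:

```java
// Hypothetical base class: builds the "execution graph" inside its constructor.
class SketchSchedulerBase {
    protected final String executionGraph;

    SketchSchedulerBase(String jobGraph) {
        this.executionGraph = createAndRestoreExecutionGraph(jobGraph);
    }

    // Stand-in for the real conversion; here it only wraps the job graph name.
    private String createAndRestoreExecutionGraph(String jobGraph) {
        return "ExecutionGraph(" + jobGraph + ")";
    }
}

// Hypothetical subclass: by the time its constructor body runs, the graph exists.
class SketchDefaultScheduler extends SketchSchedulerBase {
    final boolean graphReadyInSubclassConstructor;

    SketchDefaultScheduler(String jobGraph) {
        super(jobGraph); // the graph is created here, inside the superclass constructor
        this.graphReadyInSubclassConstructor = executionGraph != null;
    }
}
```

This is why simply constructing the scheduler inside the JobMaster constructor is enough to have an ExecutionGraph ready before jobMaster.start() is called.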

 

Analysis of JobMaster.start()

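Once the JobMaster instance has been created, jobMaster.start() is called immediately to begin scheduling the ExecutionGraph. This subsection only gives a first look at the parts related to component services; the ExecutionGraph scheduling process itself is covered in detail later.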

As the Flink RPC framework dictates, JobMaster.start() triggers rpcServer.start(), which eventually calls back into the RpcEndpoint's onStart() method.
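
The start()-to-onStart() callback can be approximated with a small sketch in which start() hands onStart() to a single "main thread" executor. This is only an illustration under that assumption: SketchEndpoint and SketchJobMaster are hypothetical, and the real RpcEndpoint goes through its RPC server rather than a plain executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint: start() schedules the onStart() hook on its own main thread.
abstract class SketchEndpoint {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    final CompletableFuture<Void> start() {
        return CompletableFuture.runAsync(
                () -> {
                    try {
                        onStart();
                    } catch (Exception e) {
                        throw new IllegalStateException("Could not start the endpoint.", e);
                    }
                },
                mainThread);
    }

    // Lifecycle hook that subclasses override, analogous to RpcEndpoint#onStart.
    protected abstract void onStart() throws Exception;

    final void stop() {
        mainThread.shutdown();
    }
}

// Hypothetical subclass recording the two steps performed on start.
class SketchJobMaster extends SketchEndpoint {
    final List<String> steps = new CopyOnWriteArrayList<>();

    @Override
    protected void onStart() {
        steps.add("startJobMasterServices");
        steps.add("startScheduling");
    }
}
```

Running the hook on one dedicated thread is what lets the real onStart() logic assume it executes in the endpoint's main thread.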

 

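JobMaster.onStart() is implemented as follows and consists of two main steps. The first creates two heartbeat services: the JobMaster -> TaskManager heartbeat service and the ResourceManager -> JobMaster heartbeat service. The second schedules the ExecutionGraph, which is analyzed in detail later.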

org.apache.flink.runtime.jobmaster.JobMaster#onStart

    // Called back here when the RPC endpoint starts
    @Override
    protected void onStart() throws JobMasterException {
        try {
            // Start executing the job
            startJobExecution();
        } catch (Exception e) {
            final JobMasterException jobMasterException =
                    new JobMasterException("Could not start the JobMaster.", e);
            handleJobMasterError(jobMasterException);
            throw jobMasterException;
        }
    }

org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution

    private void startJobExecution() throws Exception {
        validateRunsInMainThread();

        JobShuffleContext context = new JobShuffleContextImpl(jobGraph.getJobID(), this);
        shuffleMaster.registerJob(context);

        // Start the JobMaster services
        startJobMasterServices();

        log.info(
                "Starting execution of job '{}' ({}) under job master id {}.",
                jobGraph.getName(),
                jobGraph.getJobID(),
                getFencingToken());

        // Start scheduling
        startScheduling();
    }

 

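startJobMasterServices() creates the JobMaster -> TaskManager and ResourceManager -> JobMaster heartbeat services, starts the slotPool service, and starts leader retrieval for the ResourceManager; once the leader is known, the connection between the JobMaster and the ResourceManager is established.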

org.apache.flink.runtime.jobmaster.JobMaster#startJobMasterServices

    private void startJobMasterServices() throws Exception {
        try {
            // Create the heartbeat manager for the TaskManagers
            this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
            // Create the heartbeat manager for the ResourceManager
            this.resourceManagerHeartbeatManager =
                    createResourceManagerHeartbeatManager(heartbeatServices);

            // start the slot pool and make sure it now accepts messages for this leader
            slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());

            // job is ready to go, try to establish connection with resource manager
            //   - activate leader retrieval for the resource manager
            //   - on notification of the leader, the connection will be established and
            //     the slot pool will start requesting slots
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
        } catch (Exception e) {
            handleStartJobMasterServicesError(e);
        }
    }

 

org.apache.flink.runtime.jobmaster.JobMaster#startScheduling

    private void startScheduling() {
        // Start scheduling
        schedulerNG.startScheduling();
    }

 

Summary

This article briefly walked through the Dispatcher startup flow and examined when the JobGraph and the ExecutionGraph are created.

 

References:

Flink Source Code Analysis (13): Dispatcher Startup in the Flink On Yarn JobManager Startup Process

