Flink V1.20 Source Code Reading Notes (5.1): Cluster Startup, the Client-Side Job Submission Process

2024-10-08

Three Client-Side Job Submission Modes for Flink on YARN

Mode 1: Per-Job mode (deprecated; not recommended going forward).

Submission command:

./bin/flink run -t yarn-per-job -d -ynm FlinkAppName -Dyarn.application.name=FlinkRetention -c com.dake.FlinkAppName ${JarFileDir}/FlinkStudy.jar xxx

Mode 2: Session mode

(1) ./bin/yarn-session.sh -jm <jm-memory> -tm <tm-memory> -s <slots-per-taskmanager> -z <zk-namespace> -nm <app-name> -d

(2) ./bin/flink run -c com.dake.FlinkAppName -yid application_1602374521458_0001 ${JarFileDir}/FlinkStudy.jar xxx

Mode 3: Application mode

./bin/flink run-application -t yarn-application -Dparallelism.default=3 -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=4096m -Dyarn.application.name=FlinkAppName -Dtaskmanager.numberOfTaskSlots=3 -c com.dake.FlinkAppName ${JarFileDir}/FlinkStudy.jar xxx

 

Comparison of the Three Submission Modes

exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

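As the exec line above from the bin/flink script shows, client-side submission always starts at org.apache.flink.client.cli.CliFrontend#main. In Per-Job and Session modes, the main method of the Flink application runs on the client. The client builds the JobGraph and then ships the dependencies together with the serialized JobGraph to the cluster. When many jobs are submitted from one client machine, these two modes put heavy bandwidth pressure on that machine. To address this, Application mode defers the invocation of the application's main method to the JobManager startup, which spreads the bandwidth load across the cluster nodes.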

 

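Walkthrough of the Application Mode Submission Process

YarnClient Submission Process

This subsection uses Application mode as the example to walk through how the client submits a Flink job. As noted above, CliFrontend is the entry class of the client process; the arguments passed to its main(...) mainly include run-application, -t yarn-application, -c com.dake.FlinkAppName, and so on.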

 

org.apache.flink.client.cli.CliFrontend#main

    // Submits the job according to the arguments.
    public static void main(final String[] args) {
        int retCode = INITIAL_RET_CODE;
        try {
            retCode = mainInternal(args);
        } finally {
            System.exit(retCode);
        }
    }

 

org.apache.flink.client.cli.CliFrontend#mainInternal

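mainInternal loads the flink-conf.yaml file, builds the configuration needed to run the job, and creates a CliFrontend instance. The constructor creates a DefaultClusterClientServiceLoader and assigns it to the clusterClientServiceLoader field, which is later used to produce the YARN client information.

    @VisibleForTesting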
    static int mainInternal(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        int retCode = INITIAL_RET_CODE;
        try {
            // Build the CliFrontend instance; its constructor creates the DefaultClusterClientServiceLoader.
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
            CommandLine commandLine =
                    cli.getCommandLine(
                            new Options(),
                            Arrays.copyOfRange(args, min(args.length, 1), args.length),
                            true);
            Configuration securityConfig = new Configuration(cli.configuration);
            // Encode the dynamic properties.
            DynamicPropertiesUtil.encodeDynamicProperties(commandLine, securityConfig);
            SecurityUtils.install(new SecurityConfiguration(securityConfig));
            // Parse the command line arguments and start the requested action.
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
        }
        return retCode;
    }

 

org.apache.flink.client.cli.CliFrontend#CliFrontend

    // Responsible for producing the YARN client information.
    private final ClusterClientServiceLoader clusterClientServiceLoader;

    public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
        this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
    }

    public CliFrontend(
            Configuration configuration,
            ClusterClientServiceLoader clusterClientServiceLoader,
            List<CustomCommandLine> customCommandLines) {
        this.configuration = checkNotNull(configuration);
        this.customCommandLines = checkNotNull(customCommandLines);
        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

        FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        this.customCommandLineOptions = new Options();

        for (CustomCommandLine customCommandLine : customCommandLines) {
            customCommandLine.addGeneralOptions(customCommandLineOptions);
            customCommandLine.addRunOptions(customCommandLineOptions);
        }
    }

 

org.apache.flink.client.cli.CliFrontend#parseAndRun

    /**
     * Parses the command line arguments and starts the requested action.
     *
     * @param args command line arguments of the client.
     * @return The return code of the program
     */
    public int parseAndRun(String[] args) {

        // check for action
        if (args.length < 1) {
            CliFrontendParser.printHelp(customCommandLines);
            System.out.println("Please specify an action.");
            return 1;
        }

        // get action
        String action = args[0];

        // remove action from parameters
        final String[] params = Arrays.copyOfRange(args, 1, args.length);

        try {
            // do action
            switch (action) {
                case ACTION_RUN:
                    run(params);
                    return 0;
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                case ACTION_INFO:
                    info(params);
                    return 0;
                case ACTION_CANCEL:
                    cancel(params);
                    return 0;
                case ACTION_STOP:
                    stop(params);
                    return 0;
                case ACTION_SAVEPOINT:
                    savepoint(params);
                    return 0;
                case ACTION_CHECKPOINT:
                    checkpoint(params);
                    return 0;
                case "-h":
                case "--help":
                    CliFrontendParser.printHelp(customCommandLines);
                    return 0;
                case "-v":
                case "--version":
                    String version = EnvironmentInformation.getVersion();
                    String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                    System.out.print("Version: " + version);
                    System.out.println(
                            commitID.equals(EnvironmentInformation.UNKNOWN)
                                    ? ""
                                    : ", Commit ID: " + commitID);
                    return 0;
                default:
                    System.out.printf("\"%s\" is not a valid action.\n", action);
                    System.out.println();
                    System.out.println(
                            "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                    System.out.println();
                    System.out.println(
                            "Specify the version option (-v or --version) to print Flink version.");
                    System.out.println();
                    System.out.println(
                            "Specify the help option (-h or --help) to get help on the command.");
                    return 1;
            }
        } catch (CliArgsException ce) {
            return handleArgException(ce);
        } catch (ProgramParametrizationException ppe) {
            return handleParametrizationException(ppe);
        } catch (ProgramMissingJobException pmje) {
            return handleMissingJobException();
        } catch (Exception e) {
            return handleError(e);
        }
    }

 

org.apache.flink.client.cli.CliFrontend#runApplication

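runApplication creates a ProgramOptions instance. Its entryPointClass field holds the Flink application entry class given by the -c option on the flink command line; the main() method of that class is later invoked via reflection. The entryPointClass value is then assigned to the ApplicationConfiguration.applicationClassName field.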

    protected void runApplication(String[] args) throws Exception {
        LOG.info("Running 'run-application' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        // Parse the command line.
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRunApplication(customCommandLines);
            return;
        }

        // Validate the arguments and get the active custom command line.
        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        final ProgramOptions programOptions;
        final Configuration effectiveConfiguration;

        // No need to set a jarFile path for Pyflink job.
        if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
            programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.emptyList());
        } else {
            programOptions = new ProgramOptions(commandLine);
            programOptions.validate();
            // Resolve the jar file path into a URI.
            final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
            // Build the effective configuration.
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.singletonList(uri.toString()));
        }

        // Build the application configuration.
        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        programOptions.getProgramArgs(),
                        // Entry class given by the -c option; its main() is later invoked via reflection.
                        programOptions.getEntryPointClassName());
        // Submit the user program; its main method runs on the cluster.
        deployer.run(effectiveConfiguration, applicationConfiguration);
    }
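
The reflective invocation of the entry class mentioned above can be illustrated with a minimal sketch. It is not Flink's actual code path: EntryPointSketch, DemoApp, and invokeMain are hypothetical names introduced here, and DemoApp merely stands in for a user class such as com.dake.FlinkAppName.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class EntryPointSketch {

    /** Hypothetical stand-in for the user application class passed via -c. */
    public static final class DemoApp {
        static String lastArg = null;

        public static void main(String[] args) {
            // A real Flink application would build and execute its pipeline here.
            lastArg = args.length > 0 ? args[0] : "";
        }
    }

    /** Loads the class by name and invokes its static main(String[]) method. */
    static void invokeMain(String className, String[] programArgs) throws Exception {
        Class<?> entryClass = Class.forName(className);
        Method mainMethod = entryClass.getMethod("main", String[].class);
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalArgumentException("The main method must be static.");
        }
        // Cast to Object so the array is passed as a single argument, not as varargs.
        mainMethod.invoke(null, (Object) programArgs);
    }

    public static void main(String[] args) throws Exception {
        invokeMain(DemoApp.class.getName(), new String[] {"xxx"});
        System.out.println("main received: " + DemoApp.lastArg);
    }
}
```

In Application mode only the class name and program arguments travel from the client to the cluster (through ApplicationConfiguration), so a lookup of this kind happens on the cluster side rather than on the client.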

 

org.apache.flink.client.cli.ApplicationDeployer#run

This method is responsible for loading the client-side components for YARN Application mode.

As DefaultClusterClientServiceLoader.getClusterClientFactory(...) shows, the method iterates over all ClusterClientFactory implementations discovered by ServiceLoader and keeps those whose isCompatibleWith() returns true. In YARN Application mode, the only ClusterClientFactory whose isCompatibleWith() returns true is YarnClusterClientFactory, so getClusterClientFactory(...) returns a YarnClusterClientFactory instance. That instance produces the YARN client information and then submits the Flink job.

 

createClusterDescriptor(...) creates a YarnClient instance, which submits the Flink application from the client side.

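    // Submits the user program for execution and runs the user's main method on the cluster.
    // Parameters:
    //   configuration: the configuration holding everything needed to submit the user program.
    //   applicationConfiguration: the ApplicationConfiguration specific to the application to execute.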
    public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        checkNotNull(configuration);
        checkNotNull(applicationConfiguration);

        LOG.info("Submitting application in 'Application Mode'.");

        // On YARN, this resolves to YarnClusterClientFactory.
        final ClusterClientFactory<ClusterID> clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                // For YARN, creating the descriptor also builds the YarnClient instance,
                // which submits the Flink application from the client side.
                clientFactory.createClusterDescriptor(configuration)) {
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);

            clusterDescriptor.deployApplicationCluster(
                    clusterSpecification, applicationConfiguration);
        }
    }

 

org.apache.flink.client.deployment.DefaultClusterClientServiceLoader#getClusterClientFactory

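    // Discovers the appropriate ClusterClientFactory based on the provided configuration.
    // Parameters:
    //   configuration: the configuration used to select the appropriate factory.
    // Returns:
    //   the appropriate ClusterClientFactory.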
    @Override
    public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(
            final Configuration configuration) {
        checkNotNull(configuration);

        // On YARN, YarnClusterClientFactory is the one selected.
        final ServiceLoader<ClusterClientFactory> loader =
                ServiceLoader.load(ClusterClientFactory.class);

        final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
        final Iterator<ClusterClientFactory> factories = loader.iterator();
        while (factories.hasNext()) {
            try {
                final ClusterClientFactory factory = factories.next();
                // isCompatibleWith returns true if the factory is compatible with the provided configuration.
                if (factory != null && factory.isCompatibleWith(configuration)) {
                    compatibleFactories.add(factory);
                }
            } catch (Throwable e) {
                if (e.getCause() instanceof NoClassDefFoundError) {
                    LOG.info("Could not load factory due to missing dependencies.");
                } else {
                    throw e;
                }
            }
        }

        if (compatibleFactories.size() > 1) {
            final List<String> configStr =
                    configuration.toMap().entrySet().stream()
                            .map(e -> e.getKey() + "=" + e.getValue())
                            .collect(Collectors.toList());

            throw new IllegalStateException(
                    "Multiple compatible client factories found for:\n"
                            + String.join("\n", configStr)
                            + ".");
        }

        if (compatibleFactories.isEmpty()) {
            throw new IllegalStateException(
                    "No ClusterClientFactory found. If you were targeting a Yarn cluster, "
                            + "please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your "
                            + "classpath. For more information refer to the \"Deployment\" section of the official "
                            + "Apache Flink documentation.");
        }

        return (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
    }
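
The selection rule above (exactly one compatible factory, otherwise fail) can be sketched independently of Flink. In this sketch the ClientFactory interface, forTargets, and select are hypothetical simplifications; the candidate list is built by hand instead of through ServiceLoader, and matching on the "execution.target" key is an assumption about how a factory might decide compatibility.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FactorySelectionSketch {

    /** Hypothetical, simplified stand-in for ClusterClientFactory. */
    interface ClientFactory {
        String name();

        boolean isCompatibleWith(Map<String, String> configuration);
    }

    /** Creates a factory that is compatible with the given deployment targets. */
    static ClientFactory forTargets(String name, String... targets) {
        final List<String> supported = Arrays.asList(targets);
        return new ClientFactory() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                return supported.contains(configuration.get("execution.target"));
            }
        };
    }

    /** Returns the single compatible factory; fails on zero or multiple matches. */
    static ClientFactory select(List<ClientFactory> candidates, Map<String, String> configuration) {
        final List<ClientFactory> compatible = new ArrayList<>();
        for (ClientFactory factory : candidates) {
            if (factory.isCompatibleWith(configuration)) {
                compatible.add(factory);
            }
        }
        if (compatible.size() > 1) {
            throw new IllegalStateException("Multiple compatible client factories found.");
        }
        if (compatible.isEmpty()) {
            throw new IllegalStateException("No compatible client factory found.");
        }
        return compatible.get(0);
    }

    public static void main(String[] args) {
        List<ClientFactory> candidates =
                Arrays.asList(
                        forTargets("yarn", "yarn-per-job", "yarn-session", "yarn-application"),
                        forTargets("standalone", "remote"));
        Map<String, String> configuration = new HashMap<>();
        configuration.put("execution.target", "yarn-application");
        System.out.println(select(candidates, configuration).name());
    }
}
```

Failing on more than one match, rather than picking the first, keeps the choice of deployment backend unambiguous for a given configuration.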

 

org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster

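Performs some configuration and validation.

    // Triggers the deployment of an application cluster, i.e. a cluster dedicated to
    // executing a predefined application. The cluster is created when the application is
    // submitted and torn down when the application terminates. In addition, the main() of
    // the application's user code is executed on the cluster rather than on the client.
    // Parameters:
    //   clusterSpecification: the cluster specification defining the cluster to deploy
    //   applicationConfiguration: the application-specific configuration parameters
    // Returns:
    //   the client for the cluster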
    @Override
    public ClusterClientProvider<ApplicationId> deployApplicationCluster(
            final ClusterSpecification clusterSpecification,
            final ApplicationConfiguration applicationConfiguration)
            throws ClusterDeploymentException {
        checkNotNull(clusterSpecification);
        checkNotNull(applicationConfiguration);

        final YarnDeploymentTarget deploymentTarget =
                YarnDeploymentTarget.fromConfig(flinkConfiguration);
        if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
            throw new ClusterDeploymentException(
                    "Couldn't deploy Yarn Application Cluster."
                            + " Expected deployment.target="
                            + YarnDeploymentTarget.APPLICATION.getName()
                            + " but actual one was \""
                            + deploymentTarget.getName()
                            + "\"");
        }

        applicationConfiguration.applyToConfiguration(flinkConfiguration);

        // No need to do pipelineJars validation if it is a PyFlink job.
        if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
                || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) {
            final List<String> pipelineJars =
                    flinkConfiguration
                            .getOptional(PipelineOptions.JARS)
                            .orElse(Collections.emptyList());
            Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
        }

        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink Application Cluster",
                    // Entry point class of the cluster.
                    YarnApplicationClusterEntryPoint.class.getName(),
                    null,
                    false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
        }
    }

 

org.apache.flink.yarn.YarnClusterDescriptor#deployInternal

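This method performs user authentication checks, configuration validation, and YARN queue checks, creates a YarnClientApplication through the YarnClient instance, and validates the YARN resource parameters.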

    /**
     * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
     *
     * @param clusterSpecification Initial cluster specification for the Flink cluster to be
     *     deployed
     * @param applicationName name of the Yarn application to start
     * @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
     * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
     * @param detached True if the cluster should be started in detached mode
     */
    private ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached)
            throws Exception {

        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        // Check whether Kerberos security is enabled.
        if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
            boolean useTicketCache =
                    flinkConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

            if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
                throw new RuntimeException(
                        "Hadoop security with Kerberos is enabled but the login user "
                                + "does not have Kerberos credentials or delegation tokens!");
            }

            final boolean fetchToken =
                    flinkConfiguration.get(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
            final boolean yarnAccessFSEnabled =
                    !CollectionUtil.isNullOrEmpty(
                            flinkConfiguration.get(
                                    SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS));
            if (!fetchToken && yarnAccessFSEnabled) {
                throw new IllegalConfigurationException(
                        String.format(
                                "When %s is disabled, %s must be disabled as well.",
                                SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
                                SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS.key()));
            }
        }

        // Check that the cluster specification is ready for deployment.
        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------
        checkYarnQueues(yarnClient);

        // ------------------ Check if the YARN ClusterClient has the requested resources
        // --------------

        // Create application via yarnClient
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
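        // Get the response for the new application.
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

        // Get the maximum resource capability reported by the ResourceManager.
        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            // Query the resources currently free in the cluster.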
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException(
                    "Could not retrieve information about free cluster resources.", e);
        }

        // YARN minimum allocation in MB.
        final int yarnMinAllocationMB =
                yarnConfiguration.getInt(
                        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
        if (yarnMinAllocationMB <= 0) {
            throw new YarnDeploymentException(
                    "The minimum allocation memory "
                            + "("
                            + yarnMinAllocationMB
                            + " MB) configured via '"
                            + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
                            + "' should be greater than 0.");
        }

        final ClusterSpecification validClusterSpecification;
        try {
            // Validate the requested resources and obtain a valid cluster specification.
            validClusterSpecification =
                    validateClusterResources(
                            clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }

        LOG.info("Cluster specification: {}", validClusterSpecification);

        // Execution mode.
        final ClusterEntrypoint.ExecutionMode executionMode =
                detached
                        ? ClusterEntrypoint.ExecutionMode.DETACHED
                        : ClusterEntrypoint.ExecutionMode.NORMAL;

        flinkConfiguration.set(
                ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());

        ApplicationReport report =
                // Start the ApplicationMaster.
                startAppMaster(
                        flinkConfiguration,
                        applicationName,
                        yarnClusterEntrypoint,
                        jobGraph,
                        yarnClient,
                        yarnApplication,
                        validClusterSpecification);

        // print the application id for user to cancel themselves.
        if (detached) {
            final ApplicationId yarnApplicationId = report.getApplicationId();
            logDetachedClusterInformation(yarnApplicationId, LOG);
        }

        // Write the cluster entry point information into the configuration.
        setClusterEntrypointInfoToConfig(report);

        return () -> {
            try {
                return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
            } catch (Exception e) {
                throw new RuntimeException("Error while creating RestClusterClient.", e);
            }
        };
    }
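
To make the resource check more concrete, here is a simplified sketch of the kind of validation done before the ApplicationMaster is started. It is not a copy of validateClusterResources: the class, the method name normalizeMemoryMb, and the exact rules (raise a request below the minimum allocation, reject a request above the maximum) are assumptions chosen for illustration.

```java
final class ResourceCheckSketch {

    /**
     * Hypothetical helper: returns the memory (in MB) that would actually be requested
     * for a component, given YARN's minimum and maximum allocation sizes.
     */
    static int normalizeMemoryMb(
            String component, int requestedMb, int yarnMinAllocationMb, int yarnMaxAllocationMb) {
        if (yarnMinAllocationMb <= 0) {
            throw new IllegalArgumentException(
                    "The minimum allocation memory should be greater than 0.");
        }
        // Assumption: a request below the minimum allocation is raised to the minimum.
        int effectiveMb = Math.max(requestedMb, yarnMinAllocationMb);
        // Assumption: a request above the maximum capability cannot be satisfied at all.
        if (effectiveMb > yarnMaxAllocationMb) {
            throw new IllegalArgumentException(
                    component
                            + " requests "
                            + effectiveMb
                            + " MB, but the cluster maximum is "
                            + yarnMaxAllocationMb
                            + " MB.");
        }
        return effectiveMb;
    }

    public static void main(String[] args) {
        // 512 MB is below the assumed 1024 MB minimum, so it is raised to 1024.
        System.out.println(normalizeMemoryMb("JobManager", 512, 1024, 8192));
        // 4096 MB lies within [1024, 8192] and is kept unchanged.
        System.out.println(normalizeMemoryMb("TaskManager", 4096, 1024, 8192));
    }
}
```

Checking on the client before submission means an impossible request fails fast, instead of leaving an application waiting in the YARN queue for containers that can never be granted.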

 

org.apache.flink.yarn.YarnClusterDescriptor#startAppMaster

In short, this method submits the newly created application through YarnClient, which communicates with the YARN ResourceManager to start the ApplicationMaster described above.

    private ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)
            throws Exception {

        // ------------------ Initialize the file systems -------------------------

        // Initialize the file systems.
        org.apache.flink.core.fs.FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        final FileSystem fs = FileSystem.get(yarnConfiguration);

        // hard coded check for the GoogleHDFS client because its not overriding the getScheme()
        // method.
        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
                && fs.getScheme().startsWith("file")) {
            LOG.warn(
                    "The file system scheme is '"
                            + fs.getScheme()
                            + "'. This indicates that the "
                            + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
                            + "The Flink YARN client needs to store its files in a distributed file system");
        }

        // Get the application submission context.
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

        // Get the qualified remote provided lib directories.
        final List<Path> providedLibDirs =
                Utils.getQualifiedRemoteProvidedLibDirs(configuration, yarnConfiguration);

        // Get the qualified remote provided usrlib directory, if any.
        final Optional<Path> providedUsrLibDir =
                Utils.getQualifiedRemoteProvidedUsrLib(configuration, yarnConfiguration);

        // Get the staging directory.
        Path stagingDirPath = getStagingDir(fs);
        FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
        final YarnApplicationFileUploader fileUploader =
                YarnApplicationFileUploader.from(
                        stagingDirFs,
                        stagingDirPath,
                        providedLibDirs,
                        appContext.getApplicationId(),
                        getFileReplication());

        // The files need to be shipped and added to classpath.
        Set<Path> systemShipFiles = new HashSet<>(shipFiles);

        final String logConfigFilePath =
                configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(getPathFromLocalFilePathStr(logConfigFilePath));
        }

        // Set-up ApplicationSubmissionContext for the application

        final ApplicationId appId = appContext.getApplicationId();

        // ------------------ Add Zookeeper namespace to local flinkConfiguration ------
        setHAClusterIdIfNotSet(configuration, appId);

        // Check whether high availability mode is activated.
        if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
            // activate re-execution of failed applications
            appContext.setMaxAppAttempts(
                    configuration.getInteger(
                            YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

            // Activate high availability support.
            activateHighAvailabilitySupport(appContext);
        } else {
            // set number of application retries to 1 in the default case
            appContext.setMaxAppAttempts(
                    configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }

        final Set<Path> userJarFiles = new HashSet<>();
        if (jobGraph != null) {
            userJarFiles.addAll(
                    jobGraph.getUserJars().stream()
                            .map(f -> f.toUri())
                            .map(Path::new)
                            .collect(Collectors.toSet()));
        }

        final List<URI> jarUrls =
                ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
        if (jarUrls != null
                && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
        }

        // only for per job mode
        if (jobGraph != null) {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    jobGraph.getUserArtifacts().entrySet()) {
                // only upload local files
                if (!Utils.isRemotePath(entry.getValue().filePath)) {
                    Path localPath = new Path(entry.getValue().filePath);
                    Tuple2<Path, Long> remoteFileInfo =
                            fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
                    jobGraph.setUserArtifactRemotePath(
                            entry.getKey(), remoteFileInfo.f0.toString());
                }
            }

            jobGraph.writeUserArtifactEntriesToConfiguration();
        }

        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            addLibFoldersToShipFiles(systemShipFiles);
        }

        // Register all files in provided lib dirs as local resources with public visibility
        // and upload the remaining dependencies as local resources with APPLICATION visibility.
        final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
        // Register the system ship files as local resources.
        final List<String> uploadedDependencies =
                fileUploader.registerMultipleLocalResources(
                        systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE);
        systemClassPaths.addAll(uploadedDependencies);

        // upload and register ship-only files
        // Plugin files only need to be shipped and should not be added to classpath.
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            Set<Path> shipOnlyFiles = new HashSet<>();
            // Add the plugins folders to the ship-only files.
            addPluginsFoldersToShipFiles(shipOnlyFiles);
            fileUploader.registerMultipleLocalResources(
                    shipOnlyFiles, Path.CUR_DIR, LocalResourceType.FILE);
        }

        if (!shipArchives.isEmpty()) {
            fileUploader.registerMultipleLocalResources(
                    shipArchives, Path.CUR_DIR, LocalResourceType.ARCHIVE);
        }

        // only for application mode
        // Python jar file only needs to be shipped and should not be added to classpath.
        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)
                && PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
            fileUploader.registerMultipleLocalResources(
                    Collections.singletonList(
                            new Path(PackagedProgramUtils.getPythonJar().toURI())),
                    ConfigConstants.DEFAULT_FLINK_OPT_DIR,
                    LocalResourceType.FILE);
        }

        // Upload and register user jars
        final List<String> userClassPaths =
                fileUploader.registerMultipleLocalResources(
                        userJarFiles,
                        userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                                ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                                : Path.CUR_DIR,
                        LocalResourceType.FILE);

        // usrlib in remote will be used first.
        if (providedUsrLibDir.isPresent()) {
            final List<String> usrLibClassPaths =
                    fileUploader.registerMultipleLocalResources(
                            Collections.singletonList(providedUsrLibDir.get()),
                            Path.CUR_DIR,
                            LocalResourceType.FILE);
            userClassPaths.addAll(usrLibClassPaths);
        } else if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
            // local usrlib will be automatically shipped if it exists and there is no remote
            // usrlib.
            final Set<File> usrLibShipFiles = new HashSet<>();
            addUsrLibFolderToShipFiles(usrLibShipFiles);
            final List<String> usrLibClassPaths =
                    fileUploader.registerMultipleLocalResources(
                            usrLibShipFiles.stream()
                                    .map(e -> new Path(e.toURI()))
                                    .collect(Collectors.toSet()),
                            Path.CUR_DIR,
                            LocalResourceType.FILE);
            userClassPaths.addAll(usrLibClassPaths);
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }

        // normalize classpath by sorting
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);

        // classpath assembler
        StringBuilder classPathBuilder = new StringBuilder();
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }

        // Setup jar for ApplicationMaster
        final YarnLocalResourceDescriptor localResourceDescFlinkJar =
                fileUploader.uploadFlinkDist(flinkJarPath);
        classPathBuilder
                .append(localResourceDescFlinkJar.getResourceKey())
                .append(File.pathSeparator);

        // write job graph to tmp file and add it to local resource
        // TODO: server use user main method to generate job graph
        if (jobGraph != null) {
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                    obOutput.writeObject(jobGraph);
                }

                final String jobGraphFilename = "job.graph";
                configuration.set(JOB_GRAPH_FILE_PATH, jobGraphFilename);

                // register a single local resource
                fileUploader.registerSingleLocalResource(
                        jobGraphFilename,
                        new Path(tmpJobGraphFile.toURI()),
                        "",
                        LocalResourceType.FILE,
                        true,
                        false);
                classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
            } catch (Exception e) {
                LOG.warn("Add job graph to local resource fail.");
                throw e;
            } finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
                }
            }
        }

        // Upload the flink configuration
        // write out configuration file
        File tmpConfigurationFile = null;
        try {
            String flinkConfigFileName = GlobalConfiguration.getFlinkConfFilename();
            tmpConfigurationFile = File.createTempFile(appId + "-" + flinkConfigFileName, null);

            // remove localhost bind hosts as they render production clusters unusable
            removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
            removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
            // this setting is unconditionally overridden anyway, so we remove it for clarity
            configuration.removeConfig(TaskManagerOptions.HOST);

            BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

            // register a single local resource
            fileUploader.registerSingleLocalResource(
                    flinkConfigFileName,
                    new Path(tmpConfigurationFile.getAbsolutePath()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    true);
            classPathBuilder.append(flinkConfigFileName).append(File.pathSeparator);
        } finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
            }
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }

        // To support Yarn Secure Integration Test Scenario
        // In Integration test setup, the Yarn containers created by YarnMiniCluster do not have
        // the Yarn site XML and KRB5 configuration files. We are adding these files as container
        // local resources for the container applications (JM/TMs) to have proper secure cluster
        // setup
        Path remoteYarnSiteXmlPath = null;
        if (System.getenv("IN_TESTS") != null) {
            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
            LOG.info(
                    "Adding Yarn configuration {} to the AM container local resource bucket",
                    f.getAbsolutePath());
            Path yarnSitePath = new Path(f.getAbsolutePath());
            remoteYarnSiteXmlPath =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.YARN_SITE_FILE_NAME,
                                    yarnSitePath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            if (System.getProperty("java.security.krb5.conf") != null) {
                configuration.set(
                        SecurityOptions.KERBEROS_KRB5_PATH,
                        System.getProperty("java.security.krb5.conf"));
            }
        }

        Path remoteKrb5Path = null;
        boolean hasKrb5 = false;
        String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
        if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
            final File krb5 = new File(krb5Config);
            LOG.info(
                    "Adding KRB5 configuration {} to the AM container local resource bucket",
                    krb5.getAbsolutePath());
            final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
            remoteKrb5Path =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.KRB5_FILE_NAME,
                                    krb5ConfPath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            hasKrb5 = true;
        }

        Path remotePathKeytab = null;
        String localizedKeytabPath = null;
        String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            boolean localizeKeytab = flinkConfiguration.get(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
            localizedKeytabPath = flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            if (localizeKeytab) {
                // Localize the keytab to YARN containers via local resource.
                LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
                remotePathKeytab =
                        fileUploader
                                .registerSingleLocalResource(
                                        localizedKeytabPath,
                                        new Path(keytab),
                                        "",
                                        LocalResourceType.FILE,
                                        false,
                                        false)
                                .getPath();
            } else {
                // Assume Keytab is pre-installed in the container.
                localizedKeytabPath =
                        flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            }
        }

        final JobManagerProcessSpec processSpec =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
        final ContainerLaunchContext amContainer =
                // set up the ApplicationMaster container
                setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

        boolean fetchToken = configuration.get(SecurityOptions.DELEGATION_TOKENS_ENABLED);
        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
        if (kerberosLoginProvider.isLoginPossible(true)) {
            setTokensFor(amContainer, fetchToken);
        } else {
            LOG.info(
                    "Cannot use kerberos delegation token manager, no valid kerberos credentials provided.");
        }

        // set the local resources
        amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
        fileUploader.close();

        Utils.setAclsFor(amContainer, flinkConfiguration);

        // Setup CLASSPATH and environment variables for ApplicationMaster
        final Map<String, String> appMasterEnv =
                generateApplicationMasterEnv(
                        fileUploader,
                        classPathBuilder.toString(),
                        localResourceDescFlinkJar.toString(),
                        appId.toString());

        if (localizedKeytabPath != null) {
            appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
            String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
            }
        }

        // To support Yarn Secure Integration Test Scenario
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put(
                    YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
        }

        // set the environment
        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemorySize(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(flinkConfiguration.get(YarnConfigOptions.APP_MASTER_VCORES));

        final String customApplicationName = customName != null ? customName : applicationName;

        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        // Set priority for application
        int priorityNum = flinkConfiguration.get(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance(priorityNum);
            appContext.setPriority(priority);
        }

        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }

        // set the application node label
        setApplicationNodeLabel(appContext);

        // set the application tags
        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook =
                new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        // submit the application to YARN
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        long lastLogTime = System.currentTimeMillis();
        ApplicationReport report;
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        loop:
        while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            } catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", appState);
            switch (appState) {
                case FAILED:
                case KILLED:
                    throw new YarnDeploymentException(
                            "The YARN application unexpectedly switched to state "
                                    + appState
                                    + " during deployment. \n"
                                    + "Diagnostics from YARN: "
                                    + report.getDiagnostics()
                                    + "\n"
                                    + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                    + "yarn logs -applicationId "
                                    + appId);
                    // break ..
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                case FINISHED:
                    LOG.info("YARN application has been finished successfully.");
                    break loop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - lastLogTime > 60000) {
                        lastLogTime = System.currentTimeMillis();
                        LOG.info(
                                "Deployment took more than {} seconds. Please check if the requested resources are available in the YARN cluster",
                                (lastLogTime - startTime) / 1000);
                    }
            }
            lastAppState = appState;
            Thread.sleep(250);
        }

        // since deployment was successful, remove the hook
        ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
        return report;
    }
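
The classpath ordering that startAppMaster builds is easy to lose among the upload calls. The following is a minimal sketch of that ordering only, under some assumptions: the class ClasspathOrderSketch, its assemble method and the jar names are hypothetical, the enum merely mirrors the constant names of YarnConfigOptions.UserJarInclusion, and ':' is used in place of File.pathSeparator. The real method also appends job.graph and the configuration file, and leaves a trailing separator after every entry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified model of the classpath assembly in startAppMaster. */
class ClasspathOrderSketch {

    /** Mirrors the constant names of YarnConfigOptions.UserJarInclusion. */
    enum UserJarInclusion { FIRST, LAST, ORDER, DISABLED }

    static String assemble(
            List<String> systemPaths,
            List<String> userPaths,
            String flinkDistJar,
            UserJarInclusion inclusion) {
        List<String> system = new ArrayList<>(systemPaths);
        List<String> user = new ArrayList<>(userPaths);

        // ORDER: user jars are merged into the system list and sorted together with it.
        if (inclusion == UserJarInclusion.ORDER) {
            system.addAll(user);
        }
        Collections.sort(system);
        Collections.sort(user);

        List<String> result = new ArrayList<>();
        // FIRST: user jars precede everything else.
        if (inclusion == UserJarInclusion.FIRST) {
            result.addAll(user);
        }
        result.addAll(system);
        result.add(flinkDistJar);
        // The real method appends job.graph and the Flink configuration file at this point.
        // LAST: user jars come after everything else.
        if (inclusion == UserJarInclusion.LAST) {
            result.addAll(user);
        }
        // DISABLED: user jars never reach this classpath string.
        // ':' stands in for File.pathSeparator to keep the output platform independent.
        return String.join(":", result);
    }

    public static void main(String[] args) {
        List<String> system = List.of("log4j.jar", "flink-table.jar");
        List<String> user = List.of("job.jar");
        for (UserJarInclusion inclusion : UserJarInclusion.values()) {
            System.out.println(
                    inclusion + " -> " + assemble(system, user, "flink-dist.jar", inclusion));
        }
    }
}
```

Running main prints one line per inclusion mode, which makes it easy to see where the user jar lands relative to the system jars and the Flink dist jar.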

 

How the Flink application's main method is started

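The previous subsection covered the YarnClient submission process. Once the YARN ResourceManager receives the ApplicationMaster information, it instructs a NodeManager to allocate a Container and start the ApplicationMaster program, which is the YarnApplicationClusterEntryPoint mentioned above.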
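
While YARN allocates the container, the client is still waiting in the polling loop at the end of startAppMaster. Below is a minimal sketch of that loop without any YARN dependency. The class DeploymentPollSketch, the AppState enum (a subset of the state names seen in the listing) and the Supplier-based report source are assumptions made for illustration; the real code calls yarnClient.getApplicationReport(appId) and throws YarnDeploymentException.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for the "wait until the cluster is allocated" loop. */
class DeploymentPollSketch {

    /** Subset of YARN application states, enough for this sketch. */
    enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    static AppState waitForDeployment(Supplier<AppState> reportSource, long pollIntervalMillis) {
        AppState last = AppState.NEW;
        while (true) {
            AppState current = reportSource.get();
            switch (current) {
                case FAILED:
                case KILLED:
                    // Terminal failure states abort the deployment.
                    throw new IllegalStateException(
                            "Application switched to state " + current + " during deployment");
                case RUNNING:
                case FINISHED:
                    // Both states end the wait successfully.
                    return current;
                default:
                    // Only log when the state actually changed.
                    if (current != last) {
                        System.out.println("Deploying cluster, current state " + current);
                    }
            }
            last = current;
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for deployment", e);
            }
        }
    }

    public static void main(String[] args) {
        Iterator<AppState> states =
                List.of(AppState.SUBMITTED, AppState.ACCEPTED, AppState.ACCEPTED, AppState.RUNNING)
                        .iterator();
        System.out.println("Terminal state: " + waitForDeployment(states::next, 10));
    }
}
```

Passing the report source as a Supplier keeps the loop testable with a canned sequence of states, which is the only reason it is modelled that way here.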

This subsection focuses on how YarnApplicationClusterEntryPoint starts the Flink application's main(...) method. The overall startup of YarnApplicationClusterEntryPoint is covered in detail later.

Triggering the leader election

 

org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint#main

public static void main(final String[] args) {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(
                LOG, YarnApplicationClusterEntryPoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        Map<String, String> env = System.getenv();

        // working directory
        final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
        Preconditions.checkArgument(
                workingDirectory != null,
                "Working directory variable (%s) not set",
                ApplicationConstants.Environment.PWD.key());

        try {
            // log the YARN environment information
            YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
        } catch (IOException e) {
            LOG.warn("Could not log YARN environment information.", e);
        }

        final Configuration dynamicParameters =
                // parse the dynamic parameters, or exit if parsing fails
                ClusterEntrypointUtils.parseParametersOrExit(
                        args,
                        new DynamicParametersConfigurationParserFactory(),
                        YarnApplicationClusterEntryPoint.class);
        final Configuration configuration =
                YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);

        PackagedProgram program = null;
        try {
            // Build the PackagedProgram describing the Flink application's entry class and its
            // main(...) method; main(...) is invoked later via reflection.
            program = getPackagedProgram(configuration);
        } catch (Exception e) {
            LOG.error("Could not create application program.", e);
            System.exit(1);
        }

        try {
            // configure the execution
            configureExecution(configuration, program);
        } catch (Exception e) {
            LOG.error("Could not apply application configuration.", e);
            System.exit(1);
        }

        YarnApplicationClusterEntryPoint yarnApplicationClusterEntrypoint =
                // the YarnApplicationCluster entrypoint instance
                new YarnApplicationClusterEntryPoint(configuration, program);

        // run the cluster
        ClusterEntrypoint.runClusterEntrypoint(yarnApplicationClusterEntrypoint);
    }
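
The comment on getPackagedProgram mentions that the application's main(...) is later invoked via reflection. The sketch below shows what such a reflective call looks like in plain Java. ReflectiveMainSketch, DemoApp and callMain are hypothetical names invented for this example; it is an illustration of the technique, not a copy of Flink's PackagedProgram.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: invoking a user entry class's main(...) via reflection. */
class ReflectiveMainSketch {

    /** Stand-in for a user's Flink application entry class. */
    public static class DemoApp {
        static final List<String> RECEIVED = new ArrayList<>();

        public static void main(String[] args) {
            // A real application would build and execute a job here.
            RECEIVED.addAll(Arrays.asList(args));
        }
    }

    static void callMain(String className, String... args) {
        try {
            Class<?> entryClass = Class.forName(className);
            Method main = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(main.getModifiers())) {
                throw new IllegalArgumentException(className + "#main must be static");
            }
            // The cast passes the whole array as the single String[] parameter.
            main.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            // The user's main(...) itself threw; surface its cause.
            throw new IllegalStateException("User main(...) failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke main(...) of " + className, e);
        }
    }

    public static void main(String[] args) {
        callMain(DemoApp.class.getName(), "--input", "hdfs:///tmp/in");
        System.out.println("DemoApp received: " + DemoApp.RECEIVED);
    }
}
```

Because the entry class is known only by name (here it would come from the -c option), reflection is the natural way to reach its main(...) without a compile-time dependency on user code.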

 

org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint

    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

        final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
        try {
            // start the cluster
            clusterEntrypoint.startCluster();
        } catch (ClusterEntrypointException e) {
            LOG.error(
                    String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
                    e);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }

        int returnCode;
        Throwable throwable = null;

        try {
            returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
        } catch (Throwable e) {
            throwable = ExceptionUtils.stripExecutionException(e);
            returnCode = RUNTIME_FAILURE_RETURN_CODE;
        }

        LOG.info(
                "Terminating cluster entrypoint process {} with exit code {}.",
                clusterEntrypointName,
                returnCode,
                throwable);
        System.exit(returnCode);
    }
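
runClusterEntrypoint blocks on the termination future and turns its outcome into a process exit code. A minimal sketch of that mapping follows. ExitCodeSketch, its Status enum and every numeric code in it are illustrative assumptions rather than Flink's actual values, and the sketch returns the code instead of calling System.exit so that it can be exercised safely.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of mapping a termination future to a process exit code. */
class ExitCodeSketch {

    // Illustrative value only; not taken from the Flink sources.
    static final int RUNTIME_FAILURE_RETURN_CODE = 2;

    /** Simplified stand-in for an application status that knows its exit code. */
    enum Status {
        SUCCEEDED(0),
        FAILED(1);

        private final int exitCode;

        Status(int exitCode) {
            this.exitCode = exitCode;
        }

        int processExitCode() {
            return exitCode;
        }
    }

    static int awaitExitCode(CompletableFuture<Status> terminationFuture) {
        try {
            // Blocks until the cluster reports its final status.
            return terminationFuture.get().processExitCode();
        } catch (Throwable t) {
            // Any failure while waiting is reported as a runtime failure.
            return RUNTIME_FAILURE_RETURN_CODE;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Status> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("cluster crashed"));
        System.out.println(awaitExitCode(CompletableFuture.completedFuture(Status.SUCCEEDED)));
        System.out.println(awaitExitCode(failed));
    }
}
```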

 

org.apache.flink.runtime.entrypoint.ClusterEntrypoint#startCluster

    public void startCluster() throws ClusterEntrypointException {
        LOG.info("Starting {}.", getClass().getSimpleName());

        try {
            // install the Flink security manager from the configuration
            FlinkSecurityManager.setFromConfiguration(configuration);
            PluginManager pluginManager =
                    // create the plugin manager from the plugins root folder
                    PluginUtils.createPluginManagerFromRootFolder(configuration);
            configureFileSystems(configuration, pluginManager);

            // install the security context
            SecurityContext securityContext = installSecurityContext(configuration);

            ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
            securityContext.runSecured(
                    (Callable<Void>)
                            () -> {
                                // run the cluster
                                runCluster(configuration, pluginManager);

                                return null;
                            });
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

            try {
                // clean up any partial state
                shutDownAsync(
                                ApplicationStatus.FAILED,
                                ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                ExceptionUtils.stringifyException(strippedThrowable),
                                false)
                        .get(
                                INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
                                TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                strippedThrowable.addSuppressed(e);
            }

            throw new ClusterEntrypointException(
                    String.format(
                            "Failed to initialize the cluster entrypoint %s.",
                            getClass().getSimpleName()),
                    strippedThrowable);
        }
    }

 

org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster

    // Runs the cluster.
    private void runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {
            // initialize the services
            initializeServices(configuration, pluginManager);

            // write host information into configuration
            configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());

            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                    // create the factory; the returned instance is a DefaultDispatcherResourceManagerComponentFactory
                            createDispatcherResourceManagerComponentFactory(configuration);

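            // Through a chain of calls this creates the dispatcherRunner instance.
            // The dispatcherRunner handles high-availability leader election for the dispatcher,
            // and the dispatcher in turn triggers the Flink application's main(...) method.
            // This call starts the Dispatcher, ResourceManager and WebMonitorEndpoint.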
            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            resourceId.unwrap(),
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            delegationTokenManager,
                            metricRegistry,
                            executionGraphInfoStore,
                            // retriever for the RPC metric query service
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            failureEnrichers,
                            this);

            clusterComponent
                    .getShutDownFuture()
                    .whenComplete(
                            (ApplicationStatus applicationStatus, Throwable throwable) -> {
                                if (throwable != null) {
                                    shutDownAsync(
                                            ApplicationStatus.UNKNOWN,
                                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                            ExceptionUtils.stringifyException(throwable),
                                            false);
                                } else {
                                    // This is the general shutdown path. If a separate more
                                    // specific shutdown was already triggered, this will do
                                    // nothing
                                    shutDownAsync(
                                            applicationStatus,
                                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                            null,
                                            true);
                                }
                            });
        }
    }

 

org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#createDispatcherResourceManagerComponentFactory

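This returns a DefaultDispatcherResourceManagerComponentFactory. Note that the snippet below is the Per-Job override in YarnJobClusterEntrypoint. In Application mode, YarnApplicationClusterEntryPoint appears to inherit its override from ApplicationClusterEntryPoint instead of going through createJobComponentFactory; the returned type is the same, and the later steps in this walkthrough (SessionDispatcherLeaderProcess) follow the Application-mode path.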

    @Override
    protected DefaultDispatcherResourceManagerComponentFactory
            createDispatcherResourceManagerComponentFactory(Configuration configuration)
                    throws IOException {
        return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory(
                YarnResourceManagerFactory.getInstance(),
                FileJobGraphRetriever.createFrom(
                        configuration,
                        YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null)));
    }

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createJobComponentFactory

    public static DefaultDispatcherResourceManagerComponentFactory createJobComponentFactory(
            ResourceManagerFactory<?> resourceManagerFactory, JobGraphRetriever jobGraphRetriever) {
        return new DefaultDispatcherResourceManagerComponentFactory(
                DefaultDispatcherRunnerFactory.createJobRunner(jobGraphRetriever),
                resourceManagerFactory,
                JobRestEndpointFactory.INSTANCE);
    }

 

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

Through a chain of calls this method creates the dispatcherRunner instance. The dispatcherRunner handles high-availability leader election for the dispatcher component, and the dispatcher in turn triggers the Flink application's main(...) method.

    // Starts the Dispatcher, ResourceManager and WebMonitorEndpoint.
    @Override
    public DispatcherResourceManagerComponent create(
            Configuration configuration,
            ResourceID resourceId,
            Executor ioExecutor,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            DelegationTokenManager delegationTokenManager,
            MetricRegistry metricRegistry,
            ExecutionGraphInfoStore executionGraphInfoStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            Collection<FailureEnricher> failureEnrichers,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {

        // ... other parts omitted for now ...
        DispatcherRunner dispatcherRunner = null;
        try {
            log.debug("Starting Dispatcher.");
            // create the DispatcherRunner, which also kicks off the dispatcher's HA leader election
            dispatcherRunner =
                    dispatcherRunnerFactory.createDispatcherRunner(
                            highAvailabilityServices.getDispatcherLeaderElection(),
                            fatalErrorHandler,
                            new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                            ioExecutor,
                            rpcService,
                            partialDispatcherServices);

            // ... other parts omitted for now ...
        } catch (Exception exception) {
            // ... error handling omitted ...
        }
    }

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory#createDispatcherRunner

    @Override
    public DispatcherRunner createDispatcherRunner(
            LeaderElection leaderElection,
            FatalErrorHandler fatalErrorHandler,
            JobPersistenceComponentFactory jobPersistenceComponentFactory,
            Executor ioExecutor,
            RpcService rpcService,
            PartialDispatcherServices partialDispatcherServices)
            throws Exception {

        final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
                dispatcherLeaderProcessFactoryFactory.createFactory(
                        jobPersistenceComponentFactory,
                        ioExecutor,
                        rpcService,
                        partialDispatcherServices,
                        fatalErrorHandler);

        // create the DispatcherRunner, which also kicks off the dispatcher's HA leader election
        return DefaultDispatcherRunner.create(
                leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);
    }

 

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#create

    public static DispatcherRunner create(
            LeaderElection leaderElection,
            FatalErrorHandler fatalErrorHandler,
            DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
            throws Exception {
        final DefaultDispatcherRunner dispatcherRunner =
                new DefaultDispatcherRunner(
                        leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);
        // start the dispatcher's HA leader election
        dispatcherRunner.start();
        return dispatcherRunner;
    }

 

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#start

    void start() throws Exception {
        // register this runner as the LeaderContender with the leader election
        leaderElection.startLeaderElection(this);
    }

 

Leader callback: triggering the Flink application's main method

The DefaultDispatcherRunner candidate that wins the election has its grantLeadership(...) method invoked as part of the leader callback.

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#grantLeadership

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    // create and start the dispatcherLeaderProcess
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }
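
The register-then-callback relationship between start() and grantLeadership(...) can be sketched without any Flink classes. Everything in the example below is hypothetical (LeaderCallbackSketch, Contender, ImmediateElection, Runner). The election shown grants leadership immediately to the single registered contender, which is only an assumption standing in for a real HA election backed by ZooKeeper or Kubernetes.

```java
import java.util.UUID;

/** Hypothetical sketch of the contender callback pattern used for leader election. */
class LeaderCallbackSketch {

    /** Minimal contender contract; the name and shape are assumptions for this sketch. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);

        void revokeLeadership();
    }

    /** Election that immediately elects the single contender that registers. */
    static class ImmediateElection {
        void startLeaderElection(Contender contender) {
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    /** Plays the role of the dispatcher runner: registers itself, then reacts to callbacks. */
    static class Runner implements Contender {
        private boolean running = true;
        UUID currentSession;
        int processesStarted;

        void start(ImmediateElection election) {
            election.startLeaderElection(this);
        }

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            // Ignore callbacks that arrive after shutdown.
            if (!running) {
                return;
            }
            currentSession = leaderSessionId;
            // A real runner would create and start a new leader process here.
            processesStarted++;
        }

        @Override
        public void revokeLeadership() {
            currentSession = null;
        }

        void close() {
            running = false;
        }
    }

    public static void main(String[] args) {
        Runner runner = new Runner();
        runner.start(new ImmediateElection());
        System.out.println("leader session: " + runner.currentSession);
        System.out.println("processes started: " + runner.processesStarted);
    }
}
```

The point of the pattern is that the runner never decides on its own that it is the leader; it only reacts when the election service calls it back.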

 

org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#startNewDispatcherLeaderProcess

    private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
        stopDispatcherLeaderProcess();

        // The concrete type returned here is SessionDispatcherLeaderProcess.
        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
        FutureUtils.assertNoException(
                previousDispatcherLeaderProcessTerminationFuture.thenRun(
                        // Start the new DispatcherLeaderProcess instance.
                        newDispatcherLeaderProcess::start));
    }

 

org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess#start

    @Override
    public final void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

 

org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess#startInternal

    private void startInternal() {
        log.info("Start {}.", getClass().getSimpleName());
        state = State.RUNNING;
        onStart();
    }
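
The `runIfStateIs(...)` guard used by `start()` is a common way to make lifecycle methods safe to call more than once. Below is a minimal sketch of that idea. `GuardedProcess` is a hypothetical class, and guarding the state with a private lock object is an assumption of this sketch, not a statement about how Flink implements it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a state-guarded lifecycle; the names are not Flink's.
class GuardedProcess {
    enum State { CREATED, RUNNING, STOPPED }

    private final Object lock = new Object();
    private State state = State.CREATED;
    final List<String> log = new ArrayList<>();

    void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

    void stop() {
        runIfStateIs(State.RUNNING, this::stopInternal);
    }

    private void startInternal() {
        state = State.RUNNING;
        log.add("started");
    }

    private void stopInternal() {
        state = State.STOPPED;
        log.add("stopped");
    }

    // Runs the action only when the current state matches; otherwise the call is skipped.
    private void runIfStateIs(State expected, Runnable action) {
        synchronized (lock) {
            if (state == expected) {
                action.run();
            }
        }
    }
}
```

With this guard, a second `start()` or a `stop()` on a process that never started is a no-op rather than an error.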

 

org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#onStart

    @Override
    protected void onStart() {
        // Start the supporting services (the JobGraphStore).
        startServices();

        // After a chain of calls, this ends up running the Flink application's main(...) method.
        onGoingRecoveryOperation =
                createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults();
    }

 

org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults

    private CompletableFuture<Void>
            createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults() {
        final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
                CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, ioExecutor);

        return dirtyJobsFuture
                .thenApplyAsync(
                        dirtyJobs ->
                                // Recover the persisted jobs, provided this process is still running.
                                this.recoverJobsIfRunning(
                                        dirtyJobs.stream()
                                                .map(JobResult::getJobId)
                                                .collect(Collectors.toSet())),
                        ioExecutor)
                // Create the Dispatcher, provided this process is still running.
                .thenAcceptBoth(dirtyJobsFuture, this::createDispatcherIfRunning)
                .handle(this::onErrorIfRunning);
    }
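
The future chain above has four stages: load the dirty job results, recover jobs using the dirty job IDs, combine both results to create the dispatcher, and route any failure to an error handler. The sketch below reproduces that shape with plain strings instead of Flink types. `RecoveryChainSketch` and the job names are hypothetical, and filtering out jobs that already have a dirty result is my reading of why the dirty IDs are passed to the recovery step.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class RecoveryChainSketch {
    // Returns a description of what the "dispatcher" would be created with.
    static String run() {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        try {
            StringBuilder created = new StringBuilder();

            // Stage 1: load the dirty job results asynchronously on the I/O executor.
            CompletableFuture<Collection<String>> dirtyJobsFuture =
                    CompletableFuture.supplyAsync(() -> List.of("job-b"), ioExecutor);

            dirtyJobsFuture
                    // Stage 2: recover stored jobs, skipping those that have a dirty result.
                    .thenApplyAsync(
                            dirty -> {
                                Set<String> skip = Set.copyOf(dirty);
                                return List.of("job-a", "job-b", "job-c").stream()
                                        .filter(job -> !skip.contains(job))
                                        .collect(Collectors.toList());
                            },
                            ioExecutor)
                    // Stage 3: combine recovered jobs with the dirty results.
                    .thenAcceptBoth(
                            dirtyJobsFuture,
                            (recovered, dirty) ->
                                    created.append("recovered=")
                                            .append(recovered)
                                            .append(", dirty=")
                                            .append(dirty))
                    // Stage 4: any failure from the earlier stages ends up here.
                    .handle(
                            (ignored, error) -> {
                                if (error != null) {
                                    created.append("error=").append(error);
                                }
                                return null;
                            })
                    .join();
            return created.toString();
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

Reusing `dirtyJobsFuture` in `thenAcceptBoth` lets the final stage see both the recovered jobs and the original dirty results without loading the dirty results twice.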

 

org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcherIfRunning

    private void createDispatcherIfRunning(
            Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
        // Create the Dispatcher.
        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, recoveredDirtyJobResults));
    }

 

org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcher

    private void createDispatcher(
            Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {

        final DispatcherGatewayService dispatcherService =
                // Creates an ApplicationDispatcherBootstrap, whose constructor triggers the Flink application's main(...).
                dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(getLeaderSessionId()),
                        jobGraphs,
                        recoveredDirtyJobResults,
                        jobGraphStore,
                        jobResultStore);

        completeDispatcherSetup(dispatcherService);
    }

 

org.apache.flink.client.deployment.application.ApplicationDispatcherGatewayServiceFactory#create

    @Override
    public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            Collection<JobResult> recoveredDirtyJobResults,
            JobGraphWriter jobGraphWriter,
            JobResultStore jobResultStore) {

        final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

        final Dispatcher dispatcher;
        try {
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            recoveredDirtyJobResults,
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    // Create the ApplicationDispatcherBootstrap instance.
                                    new ApplicationDispatcherBootstrap(
                                            application,
                                            recoveredJobIds,
                                            configuration,
                                            dispatcherGateway,
                                            scheduledExecutor,
                                            errorHandler),
                            PartialDispatcherServicesWithJobPersistenceComponents.from(
                                    partialDispatcherServices, jobGraphWriter, jobResultStore));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }

        // Start the Dispatcher.
        dispatcher.start();

        return DefaultDispatcherGatewayService.from(dispatcher);
    }

 

org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#ApplicationDispatcherBootstrap

    public ApplicationDispatcherBootstrap(
            final PackagedProgram application,
            final Collection<JobID> recoveredJobIds,
            final Configuration configuration,
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final FatalErrorHandler errorHandler) {
        this.configuration = checkNotNull(configuration);
        this.recoveredJobIds = checkNotNull(recoveredJobIds);
        this.application = checkNotNull(application);
        this.errorHandler = checkNotNull(errorHandler);

        this.applicationCompletionFuture =
                // Fix the JobID if required, then run the application asynchronously.
                fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);

        this.bootstrapCompletionFuture = finishBootstrapTasks(dispatcherGateway);
    }

 

org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#fixJobIdAndRunApplicationAsync

    private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
            final DispatcherGateway dispatcherGateway, final ScheduledExecutor scheduledExecutor) {
        final Optional<String> configuredJobId =
                configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
        final boolean submitFailedJobOnApplicationError =
                configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR);
        // Non-HA mode with no fixed JobID configured.
        if (!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)
                && !configuredJobId.isPresent()) {
            // Run the application asynchronously.
            return runApplicationAsync(
                    dispatcherGateway, scheduledExecutor, false, submitFailedJobOnApplicationError);
        }
        if (!configuredJobId.isPresent()) {
            // In HA mode, we only support single-execute jobs at the moment. Here, we manually
            // generate the job id, if not configured, from the cluster id to keep it consistent
            // across failover.
            configuration.set(
                    PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
                    new JobID(
                                    Preconditions.checkNotNull(
                                                    configuration.get(
                                                            HighAvailabilityOptions.HA_CLUSTER_ID))
                                            .hashCode(),
                                    0)
                            .toHexString());
        }
        // Run the application asynchronously, enforcing single-job execution.
        return runApplicationAsync(
                dispatcherGateway, scheduledExecutor, true, submitFailedJobOnApplicationError);
    }
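
The HA branch above derives the job ID from the cluster ID's `hashCode()`, so every JobManager attempt of the same cluster computes the same ID. The sketch below shows why that is deterministic, without depending on Flink. `FixedJobIdSketch` is a hypothetical helper, and rendering the ID as two 64-bit parts in hexadecimal (lower part first, 16 digits each) is an assumption about the format made for illustration.

```java
class FixedJobIdSketch {
    // Assumption: the ID is two longs rendered as 32 hex digits, lower part first.
    static String fixedJobIdHex(String clusterId) {
        // String.hashCode() is specified by the Java API, so it is stable across JVM restarts.
        long lowerPart = clusterId.hashCode(); // int widened (sign-extended) to long
        long upperPart = 0L;
        return String.format("%016x%016x", lowerPart, upperPart);
    }
}
```

Because the input is the configured cluster ID and nothing random, a JobManager restarted after a failover arrives at the same ID and can match it against the job already recorded in the HA store.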

 

org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationAsync

    /**
     * Runs the user program entrypoint by scheduling a task on the given {@code scheduledExecutor}.
     * The returned {@link CompletableFuture} completes when all jobs of the user application
     * succeeded. if any of them fails, or if job submission fails.
     */
    private CompletableFuture<Void> runApplicationAsync(
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final boolean enforceSingleJobExecution,
            final boolean submitFailedJobOnApplicationError) {
        final CompletableFuture<List<JobID>> applicationExecutionFuture = new CompletableFuture<>();
        final Set<JobID> tolerateMissingResult = Collections.synchronizedSet(new HashSet<>());

        // we need to hand in a future as return value because we need to get those JobIs out
        // from the scheduled task that executes the user program
        applicationExecutionTask =
                scheduledExecutor.schedule(
                        () ->
                                // Run the application entry point.
                                runApplicationEntryPoint(
                                        applicationExecutionFuture,
                                        tolerateMissingResult,
                                        dispatcherGateway,
                                        scheduledExecutor,
                                        enforceSingleJobExecution,
                                        submitFailedJobOnApplicationError),
                        0L,
                        TimeUnit.MILLISECONDS);

        return applicationExecutionFuture.thenCompose(
                jobIds ->
                        getApplicationResult(
                                dispatcherGateway,
                                jobIds,
                                tolerateMissingResult,
                                scheduledExecutor));
    }
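
The method above hands a `CompletableFuture` to a zero-delay scheduled task so the job IDs can travel back from the thread that runs the user program, then composes a follow-up stage on that future. A minimal sketch of the same hand-off with the JDK's own scheduler follows. `ScheduledEntryPointSketch` and the job names are hypothetical, and the follow-up stage only counts the jobs instead of waiting for real job results.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduledEntryPointSketch {
    static String run() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<List<String>> jobIdsFuture = new CompletableFuture<>();

            // Zero-delay schedule: the "user program" runs on the executor thread,
            // not on the thread that constructs the bootstrap.
            executor.schedule(
                    () -> {
                        try {
                            jobIdsFuture.complete(List.of("job-1", "job-2"));
                        } catch (Throwable t) {
                            jobIdsFuture.completeExceptionally(t);
                        }
                    },
                    0L,
                    TimeUnit.MILLISECONDS);

            // Once the IDs are known, compose the stage that waits for the application result.
            return jobIdsFuture
                    .thenCompose(
                            ids ->
                                    CompletableFuture.supplyAsync(
                                            () -> "finished " + ids.size() + " jobs", executor))
                    .join();
        } finally {
            executor.shutdown();
        }
    }
}
```

The caller gets a future back immediately, which is what allows the constructor that triggers this flow to return without blocking on the user program.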

 

org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationEntryPoint

    /**
     * Runs the user program entrypoint and completes the given {@code jobIdsFuture} with the {@link
     * JobID JobIDs} of the submitted jobs.
     *
     * <p>This should be executed in a separate thread (or task).
     */
    private void runApplicationEntryPoint(
            final CompletableFuture<List<JobID>> jobIdsFuture,
            final Set<JobID> tolerateMissingResult,
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final boolean enforceSingleJobExecution,
            final boolean submitFailedJobOnApplicationError) {
        if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) {
            jobIdsFuture.completeExceptionally(
                    new ApplicationExecutionException(
                            String.format(
                                    "Submission of failed job in case of an application error ('%s') is not supported in non-HA setups.",
                                    DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR
                                            .key())));
            return;
        }
        final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);
        try {
            final PipelineExecutorServiceLoader executorServiceLoader =
                    new EmbeddedExecutorServiceLoader(
                            applicationJobIds, dispatcherGateway, scheduledExecutor);

            // Execute the user program.
            ClientUtils.executeProgram(
                    executorServiceLoader,
                    configuration,
                    application,
                    enforceSingleJobExecution,
                    true /* suppress sysout */);

            if (applicationJobIds.isEmpty()) {
                jobIdsFuture.completeExceptionally(
                        new ApplicationExecutionException(
                                "The application contains no execute() calls."));
            } else {
                jobIdsFuture.complete(applicationJobIds);
            }
        } catch (Throwable t) {
            // If we're running in a single job execution mode, it's safe to consider re-submission
            // of an already finished a success.
            final Optional<DuplicateJobSubmissionException> maybeDuplicate =
                    ExceptionUtils.findThrowable(t, DuplicateJobSubmissionException.class);
            if (enforceSingleJobExecution
                    && maybeDuplicate.isPresent()
                    && maybeDuplicate.get().isGloballyTerminated()) {
                final JobID jobId = maybeDuplicate.get().getJobID();
                tolerateMissingResult.add(jobId);
                jobIdsFuture.complete(Collections.singletonList(jobId));
            } else if (submitFailedJobOnApplicationError && applicationJobIds.isEmpty()) {
                final JobID failedJobId =
                        JobID.fromHexString(
                                configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
                dispatcherGateway
                        .submitFailedJob(failedJobId, FAILED_JOB_NAME, t)
                        .thenAccept(
                                ignored ->
                                        jobIdsFuture.complete(
                                                Collections.singletonList(failedJobId)));
            } else {
                jobIdsFuture.completeExceptionally(
                        new ApplicationExecutionException("Could not execute application.", t));
            }
        }
    }

 

org.apache.flink.client.ClientUtils#executeProgram

    public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout)
            throws ProgramInvocationException {
        checkNotNull(executorServiceLoader);
        final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);

            LOG.info(
                    "Starting program (detached: {})",
                    !configuration.get(DeploymentOptions.ATTACHED));

            // Install the context environments that the user program will pick up.
            ContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            StreamContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            // For DataStream v2.
            ExecutionContextEnvironment.setAsContext(
                    executorServiceLoader, configuration, userCodeClassLoader);

            try {
                // Invoke the user program's main method in interactive mode.
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetAsContext();
                StreamContextEnvironment.unsetAsContext();
                // For DataStream v2.
                ExecutionContextEnvironment.unsetAsContext();
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }
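
`executeProgram` swaps the thread's context class loader to the user-code class loader and restores the previous one in a `finally` block. The pattern can be isolated as a small helper, sketched below. `ContextClassLoaderSketch` and `runWithContextClassLoader` are hypothetical names and not part of Flink.

```java
import java.util.function.Supplier;

class ContextClassLoaderSketch {
    // Runs the action with the given class loader as the thread's context class loader,
    // and always restores the previous one, even when the action throws.
    static <T> T runWithContextClassLoader(ClassLoader userLoader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        try {
            current.setContextClassLoader(userLoader);
            return action.get();
        } finally {
            current.setContextClassLoader(original);
        }
    }
}
```

Restoring in `finally` matters because the thread belongs to a shared executor: leaving the user-code class loader installed would leak it into whatever task runs on that thread next.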

 

org.apache.flink.client.program.PackagedProgram#invokeInteractiveModeForExecution

    /**
     * This method assumes that the context environment is prepared, or the execution will be a
     * local execution by default.
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            // Call the main method.
            callMainMethod(mainClass, args);
        } finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }
    }

 

org.apache.flink.client.program.PackagedProgram#callMainMethod

    private static void callMainMethod(Class<?> entryClass, String[] args)
            throws ProgramInvocationException {
        Method mainMethod;
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " must be public.");
        }

        try {
            mainMethod = entryClass.getMethod("main", String[].class);
        } catch (NoSuchMethodException e) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " has no main(String[]) method.");
        } catch (Throwable t) {
            throw new ProgramInvocationException(
                    "Could not look up the main(String[]) method from the class "
                            + entryClass.getName()
                            + ": "
                            + t.getMessage(),
                    t);
        }

        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " declares a non-static main method.");
        }
        if (!Modifier.isPublic(mainMethod.getModifiers())) {
            throw new ProgramInvocationException(
                    "The class " + entryClass.getName() + " declares a non-public main method.");
        }

        try {
            // Invoke main(String[]) via reflection.
            mainMethod.invoke(null, (Object) args);
        } catch (IllegalArgumentException e) {
            throw new ProgramInvocationException(
                    "Could not invoke the main method, arguments are not matching.", e);
        } catch (IllegalAccessException e) {
            throw new ProgramInvocationException(
                    "Access to the main method was denied: " + e.getMessage(), e);
        } catch (InvocationTargetException e) {
            Throwable exceptionInMethod = e.getTargetException();
            if (exceptionInMethod instanceof Error) {
                throw (Error) exceptionInMethod;
            } else if (exceptionInMethod instanceof ProgramParametrizationException) {
                throw (ProgramParametrizationException) exceptionInMethod;
            } else if (exceptionInMethod instanceof ProgramInvocationException) {
                throw (ProgramInvocationException) exceptionInMethod;
            } else {
                throw new ProgramInvocationException(
                        "The main method caused an error: " + exceptionInMethod.getMessage(),
                        exceptionInMethod);
            }
        } catch (Throwable t) {
            throw new ProgramInvocationException(
                    "An error occurred while invoking the program's main method: " + t.getMessage(),
                    t);
        }
    }
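
The essence of `callMainMethod` is three steps: look up `main(String[])`, check its modifiers, and invoke it reflectively while unwrapping `InvocationTargetException`. The sketch below is a reduced version that can be run without Flink. `ReflectiveMainSketch` and `DemoJob` are hypothetical, and plain runtime exceptions stand in for `ProgramInvocationException`.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class ReflectiveMainSketch {
    // Hypothetical user entry class standing in for the Flink application.
    public static class DemoJob {
        public static String lastArg = null;

        public static void main(String[] args) {
            if (args.length == 0) {
                throw new IllegalStateException("no args");
            }
            lastArg = args[0];
        }
    }

    static void callMain(Class<?> entryClass, String[] args) {
        final Method mainMethod;
        try {
            mainMethod = entryClass.getMethod("main", String[].class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    entryClass.getName() + " has no main(String[]) method", e);
        }
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalArgumentException("main must be static");
        }
        try {
            // The cast passes the array as ONE argument instead of spreading it as varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            // Surface the user's own exception rather than the reflection wrapper.
            Throwable cause = e.getTargetException();
            throw new RuntimeException("main failed: " + cause.getMessage(), cause);
        } catch (IllegalAccessException e) {
            throw new RuntimeException("main is not accessible", e);
        }
    }
}
```

For example, `ReflectiveMainSketch.callMain(ReflectiveMainSketch.DemoJob.class, new String[] {"hello"})` runs `DemoJob.main` and leaves `DemoJob.lastArg` set to `"hello"`. Without the `(Object)` cast, `invoke` would treat each array element as a separate argument and fail with an argument mismatch.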

 

Summary

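This post briefly walked through job submission in Flink on YARN Application mode, from the CliFrontend entry point on the client to the JobManager side, where ApplicationDispatcherBootstrap invokes the application's main method by reflection once the dispatcher has been granted leadership.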

 

References:

Flink Source Code Analysis (10): Flink on YARN Client Submission Process

