Three Client Job Submission Modes for Flink on YARN※
Mode 1: Per-Job mode. This mode is deprecated and no longer recommended.
Job submission command line:
./bin/flink run -t yarn-per-job -d -ynm FlinkAppName -Dyarn.application.name=FlinkRetention -c com.dake.FlinkAppName ${JarFileDir}/FlinkStudy.jar xxx
Mode 2: Session mode
(1) ./bin/yarn-session.sh -jm <jm-memory> -tm <tm-memory> -s <slots-per-taskmanager> -z <zk-namespace> -nm <app-name> -d
(2) ./bin/flink run -c com.dake.FlinkAppName -yid application_1602374521458_0001 ${JarFileDir}/FlinkStudy.jar xxx
Mode 3: Application mode
./bin/flink run-application -t yarn-application -Dparallelism.default=3 -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=4096m -Dyarn.application.name=FlinkAppName -Dtaskmanager.numberOfTaskSlots=3 -c com.dake.FlinkAppName ${JarFileDir}/FlinkStudy.jar xxx
Comparison of the Three Submission Modes
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
As the bin/flink launcher script shows, client submission is uniformly triggered by org.apache.flink.client.cli.CliFrontend#main. In both Per-Job and Session mode, the Flink application's main method runs on the client: the client parses the program, generates the JobGraph, serializes it together with its dependencies, and ships the binary data to the cluster. When many jobs are submitted from the same client machine, these two modes put heavy bandwidth pressure on that machine. To address this, Application mode defers invoking the application's main method until the JobManager is brought up, spreading the bandwidth pressure across the cluster nodes.
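In summary (the cluster-lifecycle details below are standard Flink behavior, stated here for completeness):
- Per-Job: a dedicated cluster per job; main() runs on the client; deprecated.
- Session: jobs share a long-running cluster started by yarn-session.sh; main() runs on the client.
- Application: a dedicated cluster per application; main() runs on the JobManager, moving JobGraph generation and dependency upload off the client.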
Analysis of the Application Mode Submission Process※
YarnClient Submission Process※
This subsection uses the Application submission mode as an example to walk through how the client submits a Flink job. As shown above, CliFrontend is the entry class of the client process; its main(...) arguments include run-application, -t yarn-application, -c com.dake.FlinkAppName, and so on.
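For the run-application command shown earlier, the argument vector that reaches CliFrontend#main would look roughly as follows (a sketch only; the jar path and the trailing "xxx" are the placeholders from the example command):
class ArgvSketch {
    // Hypothetical argv for the Application-mode example command above.
    static final String[] ARGS = {
        "run-application",
        "-t", "yarn-application",
        "-Dparallelism.default=3",
        "-Djobmanager.memory.process.size=2048m",
        "-Dtaskmanager.memory.process.size=4096m",
        "-Dyarn.application.name=FlinkAppName",
        "-Dtaskmanager.numberOfTaskSlots=3",
        "-c", "com.dake.FlinkAppName",
        "${JarFileDir}/FlinkStudy.jar", // placeholder path from the example
        "xxx"                           // user program argument
    };
}
parseAndRun later strips the leading "run-application" token and dispatches on it.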
org.apache.flink.client.cli.CliFrontend#main
// Submits the job according to the given arguments.
public static void main(final String[] args) {
int retCode = INITIAL_RET_CODE;
try {
retCode = mainInternal(args);
} finally {
System.exit(retCode);
}
}
org.apache.flink.client.cli.CliFrontend#mainInternal
This parses the flink-conf.yaml file to produce the configuration needed at job execution time and creates a new CliFrontend instance. The constructor creates a DefaultClusterClientServiceLoader and assigns it to the clusterClientServiceLoader field, which is responsible for producing the YARN client information.
@VisibleForTesting
static int mainInternal(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
int retCode = INITIAL_RET_CODE;
try {
// Build the CliFrontend instance; its constructor creates the DefaultClusterClientServiceLoader
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
CommandLine commandLine =
cli.getCommandLine(
new Options(),
Arrays.copyOfRange(args, min(args.length, 1), args.length),
true);
Configuration securityConfig = new Configuration(cli.configuration);
// Encode dynamic properties
DynamicPropertiesUtil.encodeDynamicProperties(commandLine, securityConfig);
SecurityUtils.install(new SecurityConfiguration(securityConfig));
// Parse the command-line arguments and start the requested action.
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
}
return retCode;
}
org.apache.flink.client.cli.CliFrontend#CliFrontend
// Responsible for producing the YARN client information.
private final ClusterClientServiceLoader clusterClientServiceLoader;
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
}
public CliFrontend(
Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader,
List<CustomCommandLine> customCommandLines) {
this.configuration = checkNotNull(configuration);
this.customCommandLines = checkNotNull(customCommandLines);
this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
this.customCommandLineOptions = new Options();
for (CustomCommandLine customCommandLine : customCommandLines) {
customCommandLine.addGeneralOptions(customCommandLineOptions);
customCommandLine.addRunOptions(customCommandLineOptions);
}
}
org.apache.flink.client.cli.CliFrontend#parseAndRun
/**
* Parses the command line arguments and starts the requested action.
*
* @param args command line arguments of the client.
* @return The return code of the program
*/
public int parseAndRun(String[] args) {
// check for action
if (args.length < 1) {
CliFrontendParser.printHelp(customCommandLines);
System.out.println("Please specify an action.");
return 1;
}
// get action
String action = args[0];
// remove action from parameters
final String[] params = Arrays.copyOfRange(args, 1, args.length);
try {
// do action
switch (action) {
case ACTION_RUN:
run(params);
return 0;
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
case ACTION_INFO:
info(params);
return 0;
case ACTION_CANCEL:
cancel(params);
return 0;
case ACTION_STOP:
stop(params);
return 0;
case ACTION_SAVEPOINT:
savepoint(params);
return 0;
case ACTION_CHECKPOINT:
checkpoint(params);
return 0;
case "-h":
case "--help":
CliFrontendParser.printHelp(customCommandLines);
return 0;
case "-v":
case "--version":
String version = EnvironmentInformation.getVersion();
String commitID = EnvironmentInformation.getRevisionInformation().commitId;
System.out.print("Version: " + version);
System.out.println(
commitID.equals(EnvironmentInformation.UNKNOWN)
? ""
: ", Commit ID: " + commitID);
return 0;
default:
System.out.printf("\"%s\" is not a valid action.\n", action);
System.out.println();
System.out.println(
"Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
System.out.println();
System.out.println(
"Specify the version option (-v or --version) to print Flink version.");
System.out.println();
System.out.println(
"Specify the help option (-h or --help) to get help on the command.");
return 1;
}
} catch (CliArgsException ce) {
return handleArgException(ce);
} catch (ProgramParametrizationException ppe) {
return handleParametrizationException(ppe);
} catch (ProgramMissingJobException pmje) {
return handleMissingJobException();
} catch (Exception e) {
return handleError(e);
}
}
org.apache.flink.client.cli.CliFrontend#runApplication
This creates a ProgramOptions instance. ProgramOptions.entryPointClass holds the Flink application entry class specified by the -c option on the flink command line; its main() method is later invoked via reflection. The entryPointClass value is assigned to the ApplicationConfiguration.applicationClassName field.
protected void runApplication(String[] args) throws Exception {
LOG.info("Running 'run-application' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
// Get the command line
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRunApplication(customCommandLines);
return;
}
// Validate the arguments and get the active custom command line.
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
final ProgramOptions programOptions;
final Configuration effectiveConfiguration;
// No need to set a jarFile path for Pyflink job.
if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.emptyList());
} else {
programOptions = new ProgramOptions(commandLine);
programOptions.validate();
// Resolve the jar file path into a URI
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
// Compute the effective configuration
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.singletonList(uri.toString()));
}
// Build the application configuration
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(),
// The Flink application entry class given by the -c option; its main() is later invoked via reflection
programOptions.getEntryPointClassName());
// Submit the user program for execution and run the user's main method on the cluster
deployer.run(effectiveConfiguration, applicationConfiguration);
}
org.apache.flink.client.cli.ApplicationDeployer#run
Responsible for loading the client-side information for YARN Application mode.
As DefaultClusterClientServiceLoader.getClusterClientFactory(...) shows, the ServiceLoader keeps every ClusterClientFactory instance whose isCompatibleWith() returns true for the given configuration. In YARN Application mode the only candidate in the ClusterClientFactory hierarchy whose isCompatibleWith() returns true is YarnClusterClientFactory, so getClusterClientFactory(...) returns a YarnClusterClientFactory instance, which produces the YARN client information and submits the Flink job.
The createClusterDescriptor(...) method creates a YarnClient instance, which is responsible for submitting the Flink application from the client.
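The discovery relies on the standard Java SPI mechanism. A minimal sketch of how such a factory is registered and found (the resource path below is my reading of the SPI convention; flink-yarn ships the actual registration file); the actual run(...) implementation follows after the sketch:
// src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
// contains one fully qualified implementation class per line, e.g.:
// org.apache.flink.yarn.YarnClusterClientFactory
import java.util.ServiceLoader;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.configuration.Configuration;

class SpiDiscoverySketch {
    static ClusterClientFactory<?> discover(Configuration configuration) {
        // ServiceLoader reads every META-INF/services registration on the
        // classpath and instantiates the listed factories.
        for (ClusterClientFactory factory : ServiceLoader.load(ClusterClientFactory.class)) {
            // Only YarnClusterClientFactory answers true for yarn-application.
            if (factory.isCompatibleWith(configuration)) {
                return factory;
            }
        }
        throw new IllegalStateException("No compatible ClusterClientFactory found.");
    }
}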
// Submits the user program for execution and runs the user's main method on the cluster.
// Parameters:
// configuration – the configuration containing all the information needed to submit the user program.
// applicationConfiguration – the ApplicationConfiguration specific to the application to be executed.
public <ClusterID> void run(
final Configuration configuration,
final ApplicationConfiguration applicationConfiguration)
throws Exception {
checkNotNull(configuration);
checkNotNull(applicationConfiguration);
LOG.info("Submitting application in 'Application Mode'.");
// For YARN this resolves to YarnClusterClientFactory
final ClusterClientFactory<ClusterID> clientFactory =
clientServiceLoader.getClusterClientFactory(configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
// Creates the YarnClient instance, which submits the Flink application from the client
clientFactory.createClusterDescriptor(configuration)) {
final ClusterSpecification clusterSpecification =
clientFactory.getClusterSpecification(configuration);
clusterDescriptor.deployApplicationCluster(
clusterSpecification, applicationConfiguration);
}
}
org.apache.flink.client.deployment.DefaultClusterClientServiceLoader#getClusterClientFactory
// Discovers the appropriate ClusterClientFactory based on the provided configuration.
// Parameters:
// configuration – the configuration on which the factory selection is based.
// Returns:
// the appropriate ClusterClientFactory.
@Override
public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(
final Configuration configuration) {
checkNotNull(configuration);
// For YARN this resolves to YarnClusterClientFactory
final ServiceLoader<ClusterClientFactory> loader =
ServiceLoader.load(ClusterClientFactory.class);
final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
final Iterator<ClusterClientFactory> factories = loader.iterator();
while (factories.hasNext()) {
try {
final ClusterClientFactory factory = factories.next();
// isCompatibleWith returns true if the factory is compatible with the provided configuration, false otherwise.
if (factory != null && factory.isCompatibleWith(configuration)) {
compatibleFactories.add(factory);
}
} catch (Throwable e) {
if (e.getCause() instanceof NoClassDefFoundError) {
LOG.info("Could not load factory due to missing dependencies.");
} else {
throw e;
}
}
}
if (compatibleFactories.size() > 1) {
final List<String> configStr =
configuration.toMap().entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.toList());
throw new IllegalStateException(
"Multiple compatible client factories found for:\n"
+ String.join("\n", configStr)
+ ".");
}
if (compatibleFactories.isEmpty()) {
throw new IllegalStateException(
"No ClusterClientFactory found. If you were targeting a Yarn cluster, "
+ "please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your "
+ "classpath. For more information refer to the \"Deployment\" section of the official "
+ "Apache Flink documentation.");
}
return (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
}
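For reference, the compatibility check in YarnClusterClientFactory boils down to inspecting the configured execution target; a sketch of that logic (paraphrased from memory, not a verbatim copy of the Flink source):
@Override
public boolean isCompatibleWith(Configuration configuration) {
    // true for the YARN targets (yarn-application / yarn-per-job / yarn-session),
    // i.e. whatever -t (execution.target) names a YARN deployment.
    final String deploymentTarget = configuration.get(DeploymentOptions.TARGET);
    return YarnDeploymentTarget.isValidYarnTarget(deploymentTarget);
}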
org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster
Performs some configuration and validation checks.
// Triggers the deployment of an application cluster, i.e. a cluster dedicated to executing a predefined application.
// The cluster is created on application submission and torn down when the application terminates.
// In addition, the user code's main() is executed on the cluster rather than on the client.
// Parameters:
// clusterSpecification – the cluster specification defining the cluster to deploy
// applicationConfiguration – application-specific configuration parameters
// Returns:
// a client for the cluster
@Override
public ClusterClientProvider<ApplicationId> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
checkNotNull(clusterSpecification);
checkNotNull(applicationConfiguration);
final YarnDeploymentTarget deploymentTarget =
YarnDeploymentTarget.fromConfig(flinkConfiguration);
if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
throw new ClusterDeploymentException(
"Couldn't deploy Yarn Application Cluster."
+ " Expected deployment.target="
+ YarnDeploymentTarget.APPLICATION.getName()
+ " but actual one was \""
+ deploymentTarget.getName()
+ "\"");
}
applicationConfiguration.applyToConfiguration(flinkConfiguration);
// No need to do pipelineJars validation if it is a PyFlink job.
if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
|| PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) {
final List<String> pipelineJars =
flinkConfiguration
.getOptional(PipelineOptions.JARS)
.orElse(Collections.emptyList());
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
}
try {
return deployInternal(
clusterSpecification,
"Flink Application Cluster",
// Entry point class
YarnApplicationClusterEntryPoint.class.getName(),
null,
false);
} catch (Exception e) {
throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
}
}
org.apache.flink.yarn.YarnClusterDescriptor#deployInternal
This method performs user authentication, configuration checks, YARN queue checks, and similar operations; it then creates a YarnClientApplication via the YarnClient instance and validates the YARN resource parameters.
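Stripped of all the Flink-specific validation, deployInternal follows the plain Hadoop YarnClient handshake. A minimal sketch of that raw API flow (the AM ContainerLaunchContext setup that startAppMaster performs is omitted, so this sketch alone would not produce a working ApplicationMaster); the actual Flink implementation follows below:
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

class YarnSubmitSketch {
    static void submit() throws Exception {
        // Create and start a YarnClient against the cluster from yarn-site.xml.
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        // Ask the ResourceManager for a new application id.
        YarnClientApplication app = yarnClient.createApplication();
        GetNewApplicationResponse resp = app.getNewApplicationResponse();
        // resp.getMaximumResourceCapability() bounds what the AM container may
        // request; deployInternal validates the ClusterSpecification against it.

        // Fill in the AM container spec, resources, queue and name, then submit.
        ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
        ctx.setApplicationName("sketch-app"); // hypothetical name
        yarnClient.submitApplication(ctx);
        yarnClient.close();
    }
}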
/**
* This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
*
* @param clusterSpecification Initial cluster specification for the Flink cluster to be
* deployed
* @param applicationName name of the Yarn application to start
* @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
* @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
* @param detached True if the cluster should be started in detached mode
*/
private ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached)
throws Exception {
final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
// Whether Kerberos security is enabled
if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
boolean useTicketCache =
flinkConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
throw new RuntimeException(
"Hadoop security with Kerberos is enabled but the login user "
+ "does not have Kerberos credentials or delegation tokens!");
}
final boolean fetchToken =
flinkConfiguration.get(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
final boolean yarnAccessFSEnabled =
!CollectionUtil.isNullOrEmpty(
flinkConfiguration.get(
SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS));
if (!fetchToken && yarnAccessFSEnabled) {
throw new IllegalConfigurationException(
String.format(
"When %s is disabled, %s must be disabled as well.",
SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(),
SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS.key()));
}
}
// Check the deployment parameters
isReadyForDeployment(clusterSpecification);
// ------------------ Check if the specified queue exists --------------------
checkYarnQueues(yarnClient);
// ------------------ Check if the YARN ClusterClient has the requested resources
// --------------
// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
// Get the new-application response
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
// Get the maximum resource capability
Resource maxRes = appResponse.getMaximumResourceCapability();
final ClusterResourceDescription freeClusterMem;
try {
// Get the currently free cluster resources
freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException(
"Could not retrieve information about free cluster resources.", e);
}
// YARN minimum allocation in MB
final int yarnMinAllocationMB =
yarnConfiguration.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
if (yarnMinAllocationMB <= 0) {
throw new YarnDeploymentException(
"The minimum allocation memory "
+ "("
+ yarnMinAllocationMB
+ " MB) configured via '"
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "' should be greater than 0.");
}
final ClusterSpecification validClusterSpecification;
try {
// Validate the cluster specification
validClusterSpecification =
validateClusterResources(
clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
} catch (YarnDeploymentException yde) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw yde;
}
LOG.info("Cluster specification: {}", validClusterSpecification);
// Execution mode
final ClusterEntrypoint.ExecutionMode executionMode =
detached
? ClusterEntrypoint.ExecutionMode.DETACHED
: ClusterEntrypoint.ExecutionMode.NORMAL;
flinkConfiguration.set(
ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
ApplicationReport report =
// Start the ApplicationMaster
startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
// print the application id for user to cancel themselves.
if (detached) {
final ApplicationId yarnApplicationId = report.getApplicationId();
logDetachedClusterInformation(yarnApplicationId, LOG);
}
// Write the cluster entrypoint information into the config
setClusterEntrypointInfoToConfig(report);
return () -> {
try {
return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
} catch (Exception e) {
throw new RuntimeException("Error while creating RestClusterClient.", e);
}
};
}
org.apache.flink.yarn.YarnClusterDescriptor#startAppMaster
Overall, this submits the newly created application via the YarnClient, communicating with the YARN ResourceManager to launch the ApplicationMaster process described above.
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification)
throws Exception {
// ------------------ Initialize the file systems -------------------------
org.apache.flink.core.fs.FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
final FileSystem fs = FileSystem.get(yarnConfiguration);
// hard coded check for the GoogleHDFS client because its not overriding the getScheme()
// method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
&& fs.getScheme().startsWith("file")) {
LOG.warn(
"The file system scheme is '"
+ fs.getScheme()
+ "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}
// Get the application submission context
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
// Get the qualified remote provided lib directories
final List<Path> providedLibDirs =
Utils.getQualifiedRemoteProvidedLibDirs(configuration, yarnConfiguration);
// Get the qualified remote provided usrlib directory
final Optional<Path> providedUsrLibDir =
Utils.getQualifiedRemoteProvidedUsrLib(configuration, yarnConfiguration);
// Get the staging directory
Path stagingDirPath = getStagingDir(fs);
FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
final YarnApplicationFileUploader fileUploader =
YarnApplicationFileUploader.from(
stagingDirFs,
stagingDirPath,
providedLibDirs,
appContext.getApplicationId(),
getFileReplication());
// The files need to be shipped and added to classpath.
Set<Path> systemShipFiles = new HashSet<>(shipFiles);
final String logConfigFilePath =
configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
if (logConfigFilePath != null) {
systemShipFiles.add(getPathFromLocalFilePathStr(logConfigFilePath));
}
// Set-up ApplicationSubmissionContext for the application
final ApplicationId appId = appContext.getApplicationId();
// ------------------ Add Zookeeper namespace to local flinkConfiguration ------
setHAClusterIdIfNotSet(configuration, appId);
// Whether high-availability mode is activated
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
// Activate high-availability support
activateHighAvailabilitySupport(appContext);
} else {
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
}
final Set<Path> userJarFiles = new HashSet<>();
if (jobGraph != null) {
userJarFiles.addAll(
jobGraph.getUserJars().stream()
.map(f -> f.toUri())
.map(Path::new)
.collect(Collectors.toSet()));
}
final List<URI> jarUrls =
ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
if (jarUrls != null
&& YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
}
// only for per job mode
if (jobGraph != null) {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
jobGraph.getUserArtifacts().entrySet()) {
// only upload local files
if (!Utils.isRemotePath(entry.getValue().filePath)) {
Path localPath = new Path(entry.getValue().filePath);
Tuple2<Path, Long> remoteFileInfo =
fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
jobGraph.setUserArtifactRemotePath(
entry.getKey(), remoteFileInfo.f0.toString());
}
}
jobGraph.writeUserArtifactEntriesToConfiguration();
}
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
addLibFoldersToShipFiles(systemShipFiles);
}
// Register all files in provided lib dirs as local resources with public visibility
// and upload the remaining dependencies as local resources with APPLICATION visibility.
final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
// Register multiple local resources
final List<String> uploadedDependencies =
fileUploader.registerMultipleLocalResources(
systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE);
systemClassPaths.addAll(uploadedDependencies);
// upload and register ship-only files
// Plugin files only need to be shipped and should not be added to classpath.
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
Set<Path> shipOnlyFiles = new HashSet<>();
// Add the plugins folders to the ship files
addPluginsFoldersToShipFiles(shipOnlyFiles);
fileUploader.registerMultipleLocalResources(
shipOnlyFiles, Path.CUR_DIR, LocalResourceType.FILE);
}
if (!shipArchives.isEmpty()) {
fileUploader.registerMultipleLocalResources(
shipArchives, Path.CUR_DIR, LocalResourceType.ARCHIVE);
}
// only for application mode
// Python jar file only needs to be shipped and should not be added to classpath.
if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)
&& PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
fileUploader.registerMultipleLocalResources(
Collections.singletonList(
new Path(PackagedProgramUtils.getPythonJar().toURI())),
ConfigConstants.DEFAULT_FLINK_OPT_DIR,
LocalResourceType.FILE);
}
// Upload and register user jars
final List<String> userClassPaths =
fileUploader.registerMultipleLocalResources(
userJarFiles,
userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
: Path.CUR_DIR,
LocalResourceType.FILE);
// usrlib in remote will be used first.
if (providedUsrLibDir.isPresent()) {
final List<String> usrLibClassPaths =
fileUploader.registerMultipleLocalResources(
Collections.singletonList(providedUsrLibDir.get()),
Path.CUR_DIR,
LocalResourceType.FILE);
userClassPaths.addAll(usrLibClassPaths);
} else if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
// local usrlib will be automatically shipped if it exists and there is no remote
// usrlib.
final Set<File> usrLibShipFiles = new HashSet<>();
addUsrLibFolderToShipFiles(usrLibShipFiles);
final List<String> usrLibClassPaths =
fileUploader.registerMultipleLocalResources(
usrLibShipFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
userClassPaths.addAll(usrLibClassPaths);
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
// Setup jar for ApplicationMaster
final YarnLocalResourceDescriptor localResourceDescFlinkJar =
fileUploader.uploadFlinkDist(flinkJarPath);
classPathBuilder
.append(localResourceDescFlinkJar.getResourceKey())
.append(File.pathSeparator);
// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
File tmpJobGraphFile = null;
try {
tmpJobGraphFile = File.createTempFile(appId.toString(), null);
try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
obOutput.writeObject(jobGraph);
}
final String jobGraphFilename = "job.graph";
configuration.set(JOB_GRAPH_FILE_PATH, jobGraphFilename);
// Register a single local resource
fileUploader.registerSingleLocalResource(
jobGraphFilename,
new Path(tmpJobGraphFile.toURI()),
"",
LocalResourceType.FILE,
true,
false);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail.");
throw e;
} finally {
if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
}
}
}
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = null;
try {
String flinkConfigFileName = GlobalConfiguration.getFlinkConfFilename();
tmpConfigurationFile = File.createTempFile(appId + "-" + flinkConfigFileName, null);
// remove localhost bind hosts as they render production clusters unusable
removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
// this setting is unconditionally overridden anyway, so we remove it for clarity
configuration.removeConfig(TaskManagerOptions.HOST);
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
// Register a single local resource
fileUploader.registerSingleLocalResource(
flinkConfigFileName,
new Path(tmpConfigurationFile.getAbsolutePath()),
"",
LocalResourceType.FILE,
true,
true);
classPathBuilder.append(flinkConfigFileName).append(File.pathSeparator);
} finally {
if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
}
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
// To support Yarn Secure Integration Test Scenario
// In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
// the Yarn site XML
// and KRB5 configuration files. We are adding these files as container local resources for
// the container
// applications (JM/TMs) to have proper secure cluster setup
Path remoteYarnSiteXmlPath = null;
if (System.getenv("IN_TESTS") != null) {
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info(
"Adding Yarn configuration {} to the AM container local resource bucket",
f.getAbsolutePath());
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath =
fileUploader
.registerSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
yarnSitePath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
if (System.getProperty("java.security.krb5.conf") != null) {
configuration.set(
SecurityOptions.KERBEROS_KRB5_PATH,
System.getProperty("java.security.krb5.conf"));
}
}
Path remoteKrb5Path = null;
boolean hasKrb5 = false;
String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
final File krb5 = new File(krb5Config);
LOG.info(
"Adding KRB5 configuration {} to the AM container local resource bucket",
krb5.getAbsolutePath());
final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path =
fileUploader
.registerSingleLocalResource(
Utils.KRB5_FILE_NAME,
krb5ConfPath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
hasKrb5 = true;
}
Path remotePathKeytab = null;
String localizedKeytabPath = null;
String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
boolean localizeKeytab = flinkConfiguration.get(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
localizedKeytabPath = flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
if (localizeKeytab) {
// Localize the keytab to YARN containers via local resource.
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab =
fileUploader
.registerSingleLocalResource(
localizedKeytabPath,
new Path(keytab),
"",
LocalResourceType.FILE,
false,
false)
.getPath();
} else {
// Assume Keytab is pre-installed in the container.
localizedKeytabPath =
flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
}
}
final JobManagerProcessSpec processSpec =
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
final ContainerLaunchContext amContainer =
// Set up the ApplicationMaster container
setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
boolean fetchToken = configuration.get(SecurityOptions.DELEGATION_TOKENS_ENABLED);
KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
if (kerberosLoginProvider.isLoginPossible(true)) {
setTokensFor(amContainer, fetchToken);
} else {
LOG.info(
"Cannot use kerberos delegation token manager, no valid kerberos credentials provided.");
}
// Set local resources
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
Utils.setAclsFor(amContainer, flinkConfiguration);
// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv =
generateApplicationMasterEnv(
fileUploader,
classPathBuilder.toString(),
localResourceDescFlinkJar.toString(),
appId.toString());
if (localizedKeytabPath != null) {
appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
}
}
// To support Yarn Secure Integration Test Scenario
if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(
YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
// Set the environment
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemorySize(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(flinkConfiguration.get(YarnConfigOptions.APP_MASTER_VCORES));
final String customApplicationName = customName != null ? customName : applicationName;
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
// Set priority for application
int priorityNum = flinkConfiguration.get(YarnConfigOptions.APPLICATION_PRIORITY);
if (priorityNum >= 0) {
Priority priority = Priority.newInstance(priorityNum);
appContext.setPriority(priority);
}
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
// Set the application node label
setApplicationNodeLabel(appContext);
// Set the application tags
setApplicationTags(appContext);
// add a hook to clean up in case deployment fails
Thread deploymentFailureHook =
new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
// Submit the application
yarnClient.submitApplication(appContext);
LOG.info("Waiting for the cluster to be allocated");
final long startTime = System.currentTimeMillis();
long lastLogTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
loop:
while (true) {
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster.", e);
}
YarnApplicationState appState = report.getYarnApplicationState();
LOG.debug("Application State: {}", appState);
switch (appState) {
case FAILED:
case KILLED:
throw new YarnDeploymentException(
"The YARN application unexpectedly switched to state "
+ appState
+ " during deployment. \n"
+ "Diagnostics from YARN: "
+ report.getDiagnostics()
+ "\n"
+ "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
+ "yarn logs -applicationId "
+ appId);
// break ..
case RUNNING:
LOG.info("YARN application has been deployed successfully.");
break loop;
case FINISHED:
LOG.info("YARN application has been finished successfully.");
break loop;
default:
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if (System.currentTimeMillis() - lastLogTime > 60000) {
lastLogTime = System.currentTimeMillis();
LOG.info(
"Deployment took more than {} seconds. Please check if the requested resources are available in the YARN cluster",
(lastLogTime - startTime) / 1000);
}
}
lastAppState = appState;
Thread.sleep(250);
}
// since deployment was successful, remove the hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
return report;
}
Startup of the Flink Application's main Method※
The previous subsection covered the YarnClient submission process. Once the YARN ResourceManager receives the ApplicationMaster information, it instructs a NodeManager to allocate a container and launch the ApplicationMaster program, i.e. the YarnApplicationClusterEntryPoint mentioned above.
This subsection focuses on how YarnApplicationClusterEntryPoint triggers the Flink application's main(...) method; the overall startup of YarnApplicationClusterEntryPoint is covered in detail later.
Triggering the Leader Election Process※
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint#main
public static void main(final String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(
LOG, YarnApplicationClusterEntryPoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
Map<String, String> env = System.getenv();
// Working directory
final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
Preconditions.checkArgument(
workingDirectory != null,
"Working directory variable (%s) not set",
ApplicationConstants.Environment.PWD.key());
try {
// Log YARN environment information
YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
} catch (IOException e) {
LOG.warn("Could not log YARN environment information.", e);
}
final Configuration dynamicParameters =
// Parse the parameters or exit
ClusterEntrypointUtils.parseParametersOrExit(
args,
new DynamicParametersConfigurationParserFactory(),
YarnApplicationClusterEntryPoint.class);
final Configuration configuration =
YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);
PackagedProgram program = null;
try {
// Builds the description of the Flink application's entry class; its main(...) is later invoked via reflection.
program = getPackagedProgram(configuration);
} catch (Exception e) {
LOG.error("Could not create application program.", e);
System.exit(1);
}
try {
// Configure the execution
configureExecution(configuration, program);
} catch (Exception e) {
LOG.error("Could not apply application configuration.", e);
System.exit(1);
}
YarnApplicationClusterEntryPoint yarnApplicationClusterEntrypoint =
// The YARN application cluster entry point
new YarnApplicationClusterEntryPoint(configuration, program);
// Run the cluster entrypoint
ClusterEntrypoint.runClusterEntrypoint(yarnApplicationClusterEntrypoint);
}
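getPackagedProgram(configuration) rebuilds a description of the user program from the configuration shipped by the client. A hedged sketch of assembling such a PackagedProgram by hand via its builder (the jar path and arguments below are hypothetical):
import java.io.File;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;

class PackagedProgramSketch {
    static PackagedProgram build(Configuration configuration) throws Exception {
        return PackagedProgram.newBuilder()
                .setJarFile(new File("/opt/usrlib/FlinkStudy.jar")) // hypothetical path
                .setEntryPointClassName("com.dake.FlinkAppName")    // value of the -c option
                .setConfiguration(configuration)
                .setArguments("xxx")                                // user program arguments
                .build();
    }
}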
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
// Start the cluster
clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {
LOG.error(
String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
e);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
int returnCode;
Throwable throwable = null;
try {
returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
} catch (Throwable e) {
throwable = ExceptionUtils.stripExecutionException(e);
returnCode = RUNTIME_FAILURE_RETURN_CODE;
}
LOG.info(
"Terminating cluster entrypoint process {} with exit code {}.",
clusterEntrypointName,
returnCode,
throwable);
System.exit(returnCode);
}
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#startCluster
public void startCluster() throws ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
// Set up the Flink security manager from the configuration
FlinkSecurityManager.setFromConfiguration(configuration);
PluginManager pluginManager =
// Create the plugin manager from the root folder
PluginUtils.createPluginManagerFromRootFolder(configuration);
configureFileSystems(configuration, pluginManager);
// Install the security context
SecurityContext securityContext = installSecurityContext(configuration);
ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
securityContext.runSecured(
(Callable<Void>)
() -> {
// Run the cluster
runCluster(configuration, pluginManager);
return null;
});
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
try {
// clean up any partial state
shutDownAsync(
ApplicationStatus.FAILED,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
ExceptionUtils.stringifyException(strippedThrowable),
false)
.get(
INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
strippedThrowable.addSuppressed(e);
}
throw new ClusterEntrypointException(
String.format(
"Failed to initialize the cluster entrypoint %s.",
getClass().getSimpleName()),
strippedThrowable);
}
}
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster
// Run the cluster
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
// Initialize services
initializeServices(configuration, pluginManager);
// write host information into configuration
configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
// Create the dispatcher/resource-manager component factory; returns a DefaultDispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory(configuration);
// A chain of calls produces the dispatcherRunner instance,
// which drives the HA leader election for the dispatcher component;
// the dispatcher component in turn triggers execution of the Flink application's main(...) method.
// Used to start the Dispatcher, ResourceManager, and WebMonitorEndpoint
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
resourceId.unwrap(),
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
delegationTokenManager,
metricRegistry,
executionGraphInfoStore,
// RPC metric query service retriever
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
failureEnrichers,
this);
clusterComponent
.getShutDownFuture()
.whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more
// specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
null,
true);
}
});
}
}
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#createDispatcherResourceManagerComponentFactory
Returns a DefaultDispatcherResourceManagerComponentFactory. Note that the code below is the Per-Job variant from YarnJobClusterEntrypoint; in Application mode the override actually lives in ApplicationClusterEntryPoint (the superclass of YarnApplicationClusterEntryPoint), which wires the PackagedProgram into the factory, as sketched after this code block.
@Override
protected DefaultDispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory(Configuration configuration)
throws IOException {
return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory(
YarnResourceManagerFactory.getInstance(),
FileJobGraphRetriever.createFrom(
configuration,
YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null)));
}
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createJobComponentFactory
public static DefaultDispatcherResourceManagerComponentFactory createJobComponentFactory(
ResourceManagerFactory<?> resourceManagerFactory, JobGraphRetriever jobGraphRetriever) {
return new DefaultDispatcherResourceManagerComponentFactory(
DefaultDispatcherRunnerFactory.createJobRunner(jobGraphRetriever),
resourceManagerFactory,
JobRestEndpointFactory.INSTANCE);
}
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
A chain of calls produces the dispatcherRunner instance, which drives the HA leader election for the dispatcher component; the dispatcher component triggers execution of the Flink application's main(...) method.
// Used to start the Dispatcher, ResourceManager, and WebMonitorEndpoint
// The dispatcherRunner instance drives the HA leader election for the dispatcher component,
// and the dispatcher component triggers execution of the Flink application's main(...) method
@Override
public DispatcherResourceManagerComponent create(
Configuration configuration,
ResourceID resourceId,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
DelegationTokenManager delegationTokenManager,
MetricRegistry metricRegistry,
ExecutionGraphInfoStore executionGraphInfoStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
Collection<FailureEnricher> failureEnrichers,
FatalErrorHandler fatalErrorHandler)
throws Exception {
// Other parts omitted for brevity ...
DispatcherRunner dispatcherRunner = null;
try {
log.debug("Starting Dispatcher.");
// Create and trigger the dispatcher component's HA leader election.
dispatcherRunner =
dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElection(),
fatalErrorHandler,
new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
// Other parts omitted for brevity ...
} catch (Exception exception) {
}
}
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory#createDispatcherRunner
@Override
public DispatcherRunner createDispatcherRunner(
LeaderElection leaderElection,
FatalErrorHandler fatalErrorHandler,
JobPersistenceComponentFactory jobPersistenceComponentFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices)
throws Exception {
final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
dispatcherLeaderProcessFactoryFactory.createFactory(
jobPersistenceComponentFactory,
ioExecutor,
rpcService,
partialDispatcherServices,
fatalErrorHandler);
// Create and trigger the dispatcher component's HA leader election.
return DefaultDispatcherRunner.create(
leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);
}
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#create
public static DispatcherRunner create(
LeaderElection leaderElection,
FatalErrorHandler fatalErrorHandler,
DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory)
throws Exception {
final DefaultDispatcherRunner dispatcherRunner =
new DefaultDispatcherRunner(
leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);
// Trigger the dispatcher component's HA leader election.
dispatcherRunner.start();
return dispatcherRunner;
}
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#start
void start() throws Exception {
// Registers the given LeaderContender with the leader election process.
leaderElection.startLeaderElection(this);
}
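startLeaderElection registers the DefaultDispatcherRunner itself as a LeaderContender. For orientation, the contender contract has roughly the following shape (a sketch of Flink's LeaderContender interface, paraphrased from memory):
import java.util.UUID;

public interface LeaderContender {
    // Called by the leader election service once this contender wins leadership;
    // DefaultDispatcherRunner reacts by starting a new DispatcherLeaderProcess
    // (see grantLeadership below).
    void grantLeadership(UUID leaderSessionID);

    // Called when leadership is revoked, e.g. on ZooKeeper session loss.
    void revokeLeadership();

    // Called when the election service hits an unrecoverable error.
    void handleError(Exception exception);
}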
Leader Callback: Triggering the Flink main Method※
The DefaultDispatcherRunner instance elected as leader has its grantLeadership(...) method invoked as part of the election callback.
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#grantLeadership
@Override
public void grantLeadership(UUID leaderSessionID) {
runActionIfRunning(
() -> {
LOG.info(
"{} was granted leadership with leader id {}. Creating new {}.",
getClass().getSimpleName(),
leaderSessionID,
DispatcherLeaderProcess.class.getSimpleName());
// Create and start the dispatcherLeaderProcess
startNewDispatcherLeaderProcess(leaderSessionID);
});
}
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner#startNewDispatcherLeaderProcess
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
stopDispatcherLeaderProcess();
// The actual returned type is SessionDispatcherLeaderProcess
dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
FutureUtils.assertNoException(
previousDispatcherLeaderProcessTerminationFuture.thenRun(
// Start the dispatcherLeaderProcess instance
newDispatcherLeaderProcess::start));
}
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess#start
@Override
public final void start() {
runIfStateIs(State.CREATED, this::startInternal);
}
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess#startInternal
private void startInternal() {
log.info("Start {}.", getClass().getSimpleName());
state = State.RUNNING;
onStart();
}
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#onStart
@Override
protected void onStart() {
// Start the jobGraphStore
startServices();
// A chain of calls eventually triggers execution of the Flink application's main(...) method
onGoingRecoveryOperation =
createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults();
}
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults
private CompletableFuture<Void>
createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults() {
final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, ioExecutor);
return dirtyJobsFuture
.thenApplyAsync(
dirtyJobs ->
// Recover the jobs if still running
this.recoverJobsIfRunning(
dirtyJobs.stream()
.map(JobResult::getJobId)
.collect(Collectors.toSet())),
ioExecutor)
// Create the dispatcher if still running
.thenAcceptBoth(dirtyJobsFuture, this::createDispatcherIfRunning)
.handle(this::onErrorIfRunning);
}
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcherIfRunning
private void createDispatcherIfRunning(
Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
// Create the dispatcher
runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, recoveredDirtyJobResults));
}
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcher
private void createDispatcher(
Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
final DispatcherGatewayService dispatcherService =
// Creates an ApplicationDispatcherBootstrap instance, whose construction triggers the Flink application's main
dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()),
jobGraphs,
recoveredDirtyJobResults,
jobGraphStore,
jobResultStore);
completeDispatcherSetup(dispatcherService);
}
org.apache.flink.client.deployment.application.ApplicationDispatcherGatewayServiceFactory#create
@Override
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
JobGraphWriter jobGraphWriter,
JobResultStore jobResultStore) {
final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);
final Dispatcher dispatcher;
try {
dispatcher =
dispatcherFactory.createDispatcher(
rpcService,
fencingToken,
recoveredJobs,
recoveredDirtyJobResults,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
// Create the ApplicationDispatcherBootstrap instance
new ApplicationDispatcherBootstrap(
application,
recoveredJobIds,
configuration,
dispatcherGateway,
scheduledExecutor,
errorHandler),
PartialDispatcherServicesWithJobPersistenceComponents.from(
partialDispatcherServices, jobGraphWriter, jobResultStore));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
// Start the Dispatcher
dispatcher.start();
return DefaultDispatcherGatewayService.from(dispatcher);
}
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#ApplicationDispatcherBootstrap
public ApplicationDispatcherBootstrap(
final PackagedProgram application,
final Collection<JobID> recoveredJobIds,
final Configuration configuration,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor,
final FatalErrorHandler errorHandler) {
this.configuration = checkNotNull(configuration);
this.recoveredJobIds = checkNotNull(recoveredJobIds);
this.application = checkNotNull(application);
this.errorHandler = checkNotNull(errorHandler);
this.applicationCompletionFuture =
// Fix the JobID and run the application asynchronously
fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);
this.bootstrapCompletionFuture = finishBootstrapTasks(dispatcherGateway);
}
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#fixJobIdAndRunApplicationAsync
private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
final DispatcherGateway dispatcherGateway, final ScheduledExecutor scheduledExecutor) {
final Optional<String> configuredJobId =
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
final boolean submitFailedJobOnApplicationError =
configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR);
// Non-HA mode
if (!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)
&& !configuredJobId.isPresent()) {
// Run the application asynchronously
return runApplicationAsync(
dispatcherGateway, scheduledExecutor, false, submitFailedJobOnApplicationError);
}
if (!configuredJobId.isPresent()) {
// In HA mode, we only support single-execute jobs at the moment. Here, we manually
// generate the job id, if not configured, from the cluster id to keep it consistent
// across failover.
configuration.set(
PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
new JobID(
Preconditions.checkNotNull(
configuration.get(
HighAvailabilityOptions.HA_CLUSTER_ID))
.hashCode(),
0)
.toHexString());
}
// Run the application asynchronously
return runApplicationAsync(
dispatcherGateway, scheduledExecutor, true, submitFailedJobOnApplicationError);
}
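In the HA branch above, the fixed JobID is derived deterministically from the HA cluster id, so the same cluster always resubmits under the same JobID after a failover. A tiny demonstration of that derivation (the cluster id string is hypothetical):
import org.apache.flink.api.common.JobID;

class FixedJobIdDemo {
    public static void main(String[] args) {
        String haClusterId = "application_1602374521458_0001"; // hypothetical HA cluster id
        // Mirrors the derivation above: low bits from the cluster id's hashCode,
        // high bits zero, so the result is stable across failovers.
        JobID fixed = new JobID(haClusterId.hashCode(), 0);
        System.out.println(fixed.toHexString());
    }
}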
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationAsync
/**
* Runs the user program entrypoint by scheduling a task on the given {@code scheduledExecutor}.
* The returned {@link CompletableFuture} completes when all jobs of the user application
* succeeded. if any of them fails, or if job submission fails.
*/
private CompletableFuture<Void> runApplicationAsync(
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor,
final boolean enforceSingleJobExecution,
final boolean submitFailedJobOnApplicationError) {
final CompletableFuture<List<JobID>> applicationExecutionFuture = new CompletableFuture<>();
final Set<JobID> tolerateMissingResult = Collections.synchronizedSet(new HashSet<>());
// we need to hand in a future as return value because we need to get those JobIs out
// from the scheduled task that executes the user program
applicationExecutionTask =
scheduledExecutor.schedule(
() ->
// Run the application entry point
runApplicationEntryPoint(
applicationExecutionFuture,
tolerateMissingResult,
dispatcherGateway,
scheduledExecutor,
enforceSingleJobExecution,
submitFailedJobOnApplicationError),
0L,
TimeUnit.MILLISECONDS);
return applicationExecutionFuture.thenCompose(
jobIds ->
getApplicationResult(
dispatcherGateway,
jobIds,
tolerateMissingResult,
scheduledExecutor));
}
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationEntryPoint
/**
* Runs the user program entrypoint and completes the given {@code jobIdsFuture} with the {@link
* JobID JobIDs} of the submitted jobs.
*
* <p>This should be executed in a separate thread (or task).
*/
private void runApplicationEntryPoint(
final CompletableFuture<List<JobID>> jobIdsFuture,
final Set<JobID> tolerateMissingResult,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor,
final boolean enforceSingleJobExecution,
final boolean submitFailedJobOnApplicationError) {
if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) {
jobIdsFuture.completeExceptionally(
new ApplicationExecutionException(
String.format(
"Submission of failed job in case of an application error ('%s') is not supported in non-HA setups.",
DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR
.key())));
return;
}
final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);
try {
final PipelineExecutorServiceLoader executorServiceLoader =
new EmbeddedExecutorServiceLoader(
applicationJobIds, dispatcherGateway, scheduledExecutor);
// Execute the program
ClientUtils.executeProgram(
executorServiceLoader,
configuration,
application,
enforceSingleJobExecution,
true /* suppress sysout */);
if (applicationJobIds.isEmpty()) {
jobIdsFuture.completeExceptionally(
new ApplicationExecutionException(
"The application contains no execute() calls."));
} else {
jobIdsFuture.complete(applicationJobIds);
}
} catch (Throwable t) {
// If we're running in a single job execution mode, it's safe to consider re-submission
// of an already finished a success.
final Optional<DuplicateJobSubmissionException> maybeDuplicate =
ExceptionUtils.findThrowable(t, DuplicateJobSubmissionException.class);
if (enforceSingleJobExecution
&& maybeDuplicate.isPresent()
&& maybeDuplicate.get().isGloballyTerminated()) {
final JobID jobId = maybeDuplicate.get().getJobID();
tolerateMissingResult.add(jobId);
jobIdsFuture.complete(Collections.singletonList(jobId));
} else if (submitFailedJobOnApplicationError && applicationJobIds.isEmpty()) {
final JobID failedJobId =
JobID.fromHexString(
configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
dispatcherGateway
.submitFailedJob(failedJobId, FAILED_JOB_NAME, t)
.thenAccept(
ignored ->
jobIdsFuture.complete(
Collections.singletonList(failedJobId)));
} else {
jobIdsFuture.completeExceptionally(
new ApplicationExecutionException("Could not execute application.", t));
}
}
}
org.apache.flink.client.ClientUtils#executeProgram
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info(
"Starting program (detached: {})",
!configuration.get(DeploymentOptions.ATTACHED));
// Set up the context environments
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
// For DataStream v2.
ExecutionContextEnvironment.setAsContext(
executorServiceLoader, configuration, userCodeClassLoader);
try {
// Invoke interactive-mode execution
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
// For DataStream v2.
ExecutionContextEnvironment.unsetAsContext();
}
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
org.apache.flink.client.program.PackagedProgram#invokeInteractiveModeForExecution
/**
* This method assumes that the context environment is prepared, or the execution will be a
* local execution by default.
*/
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
try {
// Call the main method
callMainMethod(mainClass, args);
} finally {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}
}
org.apache.flink.client.program.PackagedProgram#callMainMethod
private static void callMainMethod(Class<?> entryClass, String[] args)
throws ProgramInvocationException {
Method mainMethod;
if (!Modifier.isPublic(entryClass.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " must be public.");
}
try {
mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " has no main(String[]) method.");
} catch (Throwable t) {
throw new ProgramInvocationException(
"Could not look up the main(String[]) method from the class "
+ entryClass.getName()
+ ": "
+ t.getMessage(),
t);
}
if (!Modifier.isStatic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-static main method.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-public main method.");
}
try {
// Invoke the main method via reflection
mainMethod.invoke(null, (Object) args);
} catch (IllegalArgumentException e) {
throw new ProgramInvocationException(
"Could not invoke the main method, arguments are not matching.", e);
} catch (IllegalAccessException e) {
throw new ProgramInvocationException(
"Access to the main method was denied: " + e.getMessage(), e);
} catch (InvocationTargetException e) {
Throwable exceptionInMethod = e.getTargetException();
if (exceptionInMethod instanceof Error) {
throw (Error) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramParametrizationException) {
throw (ProgramParametrizationException) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramInvocationException) {
throw (ProgramInvocationException) exceptionInMethod;
} else {
throw new ProgramInvocationException(
"The main method caused an error: " + exceptionInMethod.getMessage(),
exceptionInMethod);
}
} catch (Throwable t) {
throw new ProgramInvocationException(
"An error occurred while invoking the program's main method: " + t.getMessage(),
t);
}
}
Summary※
This article has briefly analyzed the submission process of Flink on YARN in Application mode: the CliFrontend entry on the client, the YarnClient handshake with the ResourceManager, and the dispatcher-side leader election that ultimately invokes the user's main method via reflection.