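Overview of the three core JobManager components
JobManager is the logical name for the master node of a Flink cluster. Its implementation class differs by deployment mode; for Application mode with Flink on YARN it is YarnApplicationClusterEntryPoint. JobManager consists of three core components: ResourceManager, Dispatcher, and WebMonitorEndpoint. Their roles are summarized below.
1. WebMonitorEndpoint: the REST service, implemented internally on Netty. All requests sent by clients are received and handled by this component.
2. Dispatcher: receives the JobGraph submitted by the client and starts a JobMaster for it. It holds a JobGraphStore internally, so if the master node fails while the physical execution graph is being built, the JobGraph can be recovered from the JobGraphStore and the job started again.
3. ResourceManager: the resource manager of the Flink cluster, sitting between Flink and the underlying resource management cluster (YARN, K8s, etc.). Its main duties are starting new TaskManagers, requesting slots for jobs, and maintaining heartbeats with the TaskManagers and JobMasters.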
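WebMonitorEndpoint startup walkthrough
The last section of the previous article showed that the startup of YarnApplicationClusterEntryPoint eventually calls ClusterEntrypoint.runCluster(...), but it only followed the path that launches the main(...) method of the Flink application. This article continues with how the three core components are created inside ClusterEntrypoint.runCluster(...).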
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster
// Run the cluster
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
// Initialize the basic services
initializeServices(configuration, pluginManager);
// write host information into configuration
configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
// Create the dispatcher/resource manager component factory; this returns a DefaultDispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory(configuration);
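// Through a chain of calls this produces the dispatcherRunner instance.
// The dispatcherRunner handles the high-availability leader election of the dispatcher component,
// and the dispatcher component triggers the execution of the Flink application's main(...) method.
// This call starts the Dispatcher, the ResourceManager, and the WebMonitorEndpoint.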
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
resourceId.unwrap(),
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
delegationTokenManager,
metricRegistry,
executionGraphInfoStore,
// Retriever for the RPC metric query service
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
failureEnrichers,
this);
clusterComponent
.getShutDownFuture()
.whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more
// specific shutdown was already triggered, this will do nothing
shutDownAsync(
applicationStatus,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
null,
true);
}
});
}
}
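The shutdown handling at the end of runCluster(...) follows a common CompletableFuture pattern: one callback covers both the failure path and the normal path. The following is a minimal sketch of that pattern using only the JDK. The class ShutdownPathSketch and its string statuses are hypothetical stand-ins and are not part of Flink.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the shutdown callback; not Flink code. */
final class ShutdownPathSketch {

    /** Maps the outcome of the shutdown future to the status used for shutting down. */
    static CompletableFuture<String> onShutdown(CompletableFuture<String> shutDownFuture) {
        CompletableFuture<String> result = new CompletableFuture<>();
        shutDownFuture.whenComplete(
                (status, throwable) -> {
                    if (throwable != null) {
                        // Failure path: the real status is unknown, keep the cause as diagnostics.
                        result.complete("UNKNOWN: " + throwable.getMessage());
                    } else {
                        // General path: pass the reported status through.
                        result.complete(status);
                    }
                });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = new CompletableFuture<>();
        CompletableFuture<String> okResult = onShutdown(ok);
        ok.complete("SUCCEEDED");
        System.out.println(okResult.join());

        CompletableFuture<String> failed = new CompletableFuture<>();
        CompletableFuture<String> failedResult = onShutdown(failed);
        failed.completeExceptionally(new IllegalStateException("component crashed"));
        System.out.println(failedResult.join());
    }
}
```

Registering the callback before the future completes, as runCluster(...) does, means the cluster reacts to the component's termination without blocking the thread that started it.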
The ClusterEntrypoint.initializeServices() method
To support the three core components above, this method creates eight basic services:
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#initializeServices
protected void initializeServices(Configuration configuration, PluginManager pluginManager)
throws Exception {
LOG.info("Initializing cluster services.");
synchronized (lock) {
resourceId =
configuration
.getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID)
.map(
value ->
DeterminismEnvelope.deterministicValue(
new ResourceID(value)))
.orElseGet(
() ->
DeterminismEnvelope.nondeterministicValue(
ResourceID.generate()));
LOG.debug(
"Initialize cluster entrypoint {} with resource id {}.",
getClass().getSimpleName(),
resourceId);
workingDirectory =
// Create the JobManager working directory
ClusterEntrypointUtils.createJobManagerWorkingDirectory(
configuration, resourceId);
LOG.info("Using working directory: {}.", workingDirectory);
rpcSystem = RpcSystem.load(configuration);
// Initialize and start the common RpcService (a PekkoRpcService)
commonRpcService =
RpcUtils.createRemoteRpcService(
rpcSystem,
configuration,
configuration.get(JobManagerOptions.ADDRESS),
// Port range for the common RpcService
getRPCPortRange(configuration),
configuration.get(JobManagerOptions.BIND_HOST),
// Local RPC port that the JobManager binds to
configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
// Start a JMXService so that clients can connect to the JobManager JVM for monitoring
JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));
// update the configuration used to create the high availability services
configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());
// Initialize the thread pool responsible for cluster I/O
ioExecutor =
Executors.newFixedThreadPool(
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("cluster-io"));
delegationTokenManager =
DefaultDelegationTokenManagerFactory.create(
configuration,
pluginManager,
commonRpcService.getScheduledExecutor(),
ioExecutor);
// Obtaining delegation tokens and propagating them to the local JVM receivers in a
// one-time fashion is required because BlobServer may connect to external file systems
delegationTokenManager.obtainDelegationTokens();
// Create the high-availability services (ZooKeeperLeaderElectionHaServices when ZooKeeper HA is configured)
haServices = createHaServices(configuration, ioExecutor, rpcSystem);
// Initialize the BlobServer
blobServer =
BlobUtils.createBlobServer(
configuration,
Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
haServices.createBlobStore());
blobServer.start();
configuration.set(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));
// Create the heartbeat services that manage heartbeats between components
heartbeatServices = createHeartbeatServices(configuration);
failureEnrichers = FailureEnricherUtils.getFailureEnrichers(configuration);
// Create the metric registry
metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
// Metrics query service
final RpcService metricQueryServiceRpcService =
// Start the remote metrics RPC service
MetricUtils.startRemoteMetricsRpcService(
configuration,
commonRpcService.getAddress(),
configuration.get(JobManagerOptions.BIND_HOST),
rpcSystem);
// Start the query service
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
// Get the hostname
final String hostname = RpcUtils.getHostname(commonRpcService);
processMetricGroup =
// Instantiate the process metric group
MetricUtils.instantiateProcessMetricGroup(
metricRegistry,
hostname,
ConfigurationUtils.getSystemResourceMetricsProbingInterval(
configuration));
// Initialize the ExecutionGraphInfoStore service (the concrete type, e.g. FileExecutionGraphInfoStore, depends on the entrypoint subclass)
executionGraphInfoStore =
createSerializableExecutionGraphStore(
configuration, commonRpcService.getScheduledExecutor());
}
}
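The first step of initializeServices(...) resolves the resource ID: a configured value is treated as deterministic, a generated one as nondeterministic. The sketch below shows that Optional-based fallback in plain JDK code. The Envelope class is a hypothetical simplification of DeterminismEnvelope, and the use of UUID for the generated ID is an assumption made only for this example.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical simplification of the resource ID resolution; not Flink code. */
final class ResourceIdSketch {

    /** Pairs a value with a flag saying whether it stays the same across restarts. */
    static final class Envelope {
        final String value;
        final boolean deterministic;

        Envelope(String value, boolean deterministic) {
            this.value = value;
            this.deterministic = deterministic;
        }
    }

    /** Uses the configured ID if present, otherwise generates a random one. */
    static Envelope resolve(Optional<String> configuredId) {
        return configuredId
                .map(id -> new Envelope(id, true))
                // Assumption: a random UUID stands in for the generated ID.
                .orElseGet(() -> new Envelope(UUID.randomUUID().toString(), false));
    }

    public static void main(String[] args) {
        Envelope configured = resolve(Optional.of("jm-1"));
        Envelope generated = resolve(Optional.empty());
        System.out.println(configured.value + " deterministic=" + configured.deterministic);
        System.out.println("generated deterministic=" + generated.deterministic);
    }
}
```

Carrying the flag along with the value lets later code decide whether state tied to the ID, such as a working directory, can be found again after a restart.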
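Creating the factory
This step calls createDispatcherResourceManagerComponentFactory on YarnApplicationClusterEntryPoint; the method is defined in its parent class ApplicationClusterEntryPoint.
The DispatcherResourceManagerComponentFactory instance is created as follows. It mainly bundles the factories for the three core components: WebMonitorEndpoint, Dispatcher, and ResourceManager.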
org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint#createDispatcherResourceManagerComponentFactory
@Override
protected DispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory(final Configuration configuration) {
// Create the dispatcher/resource manager component factory
return new DefaultDispatcherResourceManagerComponentFactory(
// Dispatcher factory
new DefaultDispatcherRunnerFactory(
ApplicationDispatcherLeaderProcessFactoryFactory.create(
// Uses StandaloneDispatcher
configuration, SessionDispatcherFactory.INSTANCE, program)),
// ResourceManager factory
resourceManagerFactory,
// WebMonitorEndpoint factory
JobRestEndpointFactory.INSTANCE);
}
Creating and starting WebMonitorEndpoint
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
@Override
public DispatcherResourceManagerComponent create(
Configuration configuration,
ResourceID resourceId,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
DelegationTokenManager delegationTokenManager,
MetricRegistry metricRegistry,
ExecutionGraphInfoStore executionGraphInfoStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
Collection<FailureEnricher> failureEnrichers,
FatalErrorHandler fatalErrorHandler)
throws Exception {
// Other code omitted for now
// Creation of the WebMonitorEndpoint
webMonitorEndpoint =
restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElection(),
fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
// Start the WebMonitorEndpoint
webMonitorEndpoint.start();
// Other code omitted for now
}
org.apache.flink.runtime.rest.RestServerEndpoint#start
public final void start() throws Exception {
synchronized (lock) {
Preconditions.checkState(
state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
log.info("Starting rest endpoint.");
// Create the Router
final Router router = new Router();
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
// Create the handlers for client requests
handlers = initializeHandlers(restAddressFuture);
/* sort the handlers such that they are ordered the following:
* /jobs
* /jobs/overview
* /jobs/:jobid
* /jobs/:jobid/config
* /:*
*/
// Sort the handlers by request URL (see the ordering above) so that each request URL maps to exactly one handler
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
// Verify that all endpoints and handlers are unique
checkAllEndpointsAndHandlersAreUnique(handlers);
// Register all handlers with the Router
handlers.forEach(handler -> registerHandler(router, handler, log));
MultipartRoutes multipartRoutes = createMultipartRoutes(handlers);
log.debug("Using {} for FileUploadHandler", multipartRoutes);
ChannelInitializer<SocketChannel> initializer =
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws ConfigurationException {
RouterHandler handler = new RouterHandler(router, responseHeaders);
// SSL should be the first handler in the pipeline
if (isHttpsEnabled()) {
ch.pipeline()
.addLast(
"ssl",
new RedirectingSslHandler(
restAddress,
restAddressFuture,
sslHandlerFactory));
}
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new FileUploadHandler(uploadDir, multipartRoutes))
.addLast(
new FlinkHttpObjectAggregator(
maxContentLength, responseHeaders));
for (InboundChannelHandlerFactory factory :
inboundChannelHandlerFactories) {
Optional<ChannelHandler> channelHandler =
factory.createHandler(configuration, responseHeaders);
if (channelHandler.isPresent()) {
ch.pipeline().addLast(channelHandler.get());
}
}
ch.pipeline()
.addLast(new ChunkedWriteHandler())
.addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(log, responseHeaders));
}
};
NioEventLoopGroup bossGroup =
new NioEventLoopGroup(
1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup =
new NioEventLoopGroup(
0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
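// Netty server startup. Once the handlers are registered, the server startup begins:
// build the channel initializer, start the Netty server, then iterate over the port range and bind to a port, skipping ports that are already in use.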
bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
Iterator<Integer> portsIterator;
try {
// Iterate over the port range
portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException(
"Invalid port range definition: " + restBindPortRange);
}
int chosenPort = 0;
while (portsIterator.hasNext()) {
try {
chosenPort = portsIterator.next();
final ChannelFuture channel;
if (restBindAddress == null) {
channel = bootstrap.bind(chosenPort);
} else {
channel = bootstrap.bind(restBindAddress, chosenPort);
}
serverChannel = channel.syncUninterruptibly().channel();
break;
} catch (final Exception e) {
// syncUninterruptibly() throws checked exceptions via Unsafe
// continue if the exception is due to the port being in use, fail early
// otherwise
if (!(e instanceof java.net.BindException)) {
throw e;
}
}
}
if (serverChannel == null) {
throw new BindException(
"Could not start rest endpoint on any port in port range "
+ restBindPortRange);
}
log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);
final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
if (bindAddress.getAddress().isAnyLocalAddress()) {
advertisedAddress = this.restAddress;
} else {
advertisedAddress = bindAddress.getAddress().getHostAddress();
}
port = bindAddress.getPort();
log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
restAddressFuture.complete(restBaseUrl);
// After the Netty server has started, set the WebMonitorEndpoint state to RUNNING
state = State.RUNNING;
// Run the leader election and start the remaining basic services
startInternal();
}
}
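The port selection loop in RestServerEndpoint#start tries each port of the configured range and moves on only when the failure is a BindException. The following is a minimal sketch of the same idea with a plain java.net.ServerSocket instead of Netty. The class PortRangeBindSketch and its helper names are hypothetical, and the 20-port range is an arbitrary choice for the demo.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.stream.IntStream;

/** Hypothetical sketch of "bind to the first free port in a range"; not Flink code. */
final class PortRangeBindSketch {

    /** Binds to the first port that is not in use; other I/O errors fail early. */
    static ServerSocket bindFirstFree(Iterator<Integer> ports) throws IOException {
        while (ports.hasNext()) {
            int candidate = ports.next();
            try {
                return new ServerSocket(candidate);
            } catch (BindException e) {
                // The port is taken: continue with the next candidate.
            }
        }
        throw new BindException("Could not bind to any port in the given range");
    }

    /** Occupies one port, then checks that binding over a range skips it. */
    static boolean skipsOccupiedPort() {
        try (ServerSocket occupied = new ServerSocket(0)) {
            int taken = occupied.getLocalPort();
            Iterator<Integer> range =
                    IntStream.rangeClosed(taken, Math.min(taken + 20, 65535)).iterator();
            try (ServerSocket server = bindFirstFree(range)) {
                return server.getLocalPort() != taken;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("skipped the occupied port: " + skipsOccupiedPort());
    }
}
```

Catching only BindException mirrors the original loop: a port conflict is expected and recoverable, while any other failure is rethrown immediately rather than masked by further attempts.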
Leader election and starting the remaining basic services
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal
@Override
public void startInternal() throws Exception {
// Start the leader election
leaderElection.startLeaderElection(this);
startExecutionGraphCacheCleanupTask();
if (hasWebUI) {
log.info("Web frontend listening at {}.", getRestBaseUrl());
}
}
org.apache.flink.runtime.leaderelection.DefaultLeaderElection#startLeaderElection
@Override
public void startLeaderElection(LeaderContender contender) throws Exception {
Preconditions.checkNotNull(contender);
// Register the LeaderContender
parentService.register(componentId, contender);
}
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#register
@Override
protected void register(String componentId, LeaderContender contender) throws Exception {
checkNotNull(componentId, "componentId must not be null.");
checkNotNull(contender, "Contender must not be null.");
synchronized (lock) {
Preconditions.checkState(
running,
"The DefaultLeaderElectionService should have established a connection to the backend before it's started.");
if (leaderElectionDriver == null) {
// Create the LeaderElectionDriver; winning the election through it assigns issuedLeaderSessionID
createLeaderElectionDriver();
}
// Add the componentId -> WebMonitorEndpoint mapping
Preconditions.checkState(
leaderContenderRegistry.put(componentId, contender) == null,
"There shouldn't be any contender registered under the passed component '%s'.",
componentId);
LOG.info(
"LeaderContender has been registered under component '{}' for {}.",
componentId,
leaderElectionDriver);
// issuedLeaderSessionID is non-null here if leadership has already been granted
if (issuedLeaderSessionID != null) {
// notifying the LeaderContender shouldn't happen in the contender's main thread
runInLeaderEventThread(
LEADER_ACQUISITION_EVENT_LOG_NAME,
() ->
notifyLeaderContenderOfLeadership(
componentId, issuedLeaderSessionID));
}
}
}
Creating the LeaderElectionDriver and taking part in the election
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#createLeaderElectionDriver
@GuardedBy("lock")
private void createLeaderElectionDriver() throws Exception {
Preconditions.checkState(
leaderContenderRegistry.isEmpty(),
"No LeaderContender should have been registered, yet.");
Preconditions.checkState(
leaderElectionDriver == null,
"This DefaultLeaderElectionService cannot be reused. Calling startLeaderElectionBackend can only be called once to establish the connection to the HA backend.");
leaderElectionDriver = leaderElectionDriverFactory.create(this);
LOG.info(
"A connection to the HA backend was established through LeaderElectionDriver {}.",
leaderElectionDriver);
}
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory#create
@Override
public ZooKeeperLeaderElectionDriver create(
LeaderElectionDriver.Listener leaderElectionListener) throws Exception {
return new ZooKeeperLeaderElectionDriver(curatorFramework, leaderElectionListener);
}
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver#ZooKeeperLeaderElectionDriver
public ZooKeeperLeaderElectionDriver(
CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)
throws Exception {
this.curatorFramework = Preconditions.checkNotNull(curatorFramework);
this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);
this.leaderLatchPath =
ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());
this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
this.treeCache =
ZooKeeperUtils.createTreeCache(
curatorFramework,
"/",
new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());
treeCache
.getListenable()
.addListener(
(client, event) -> {
switch (event.getType()) {
case NODE_ADDED:
case NODE_UPDATED:
Preconditions.checkNotNull(
event.getData(),
"The ZooKeeper event data must not be null.");
handleChangedLeaderInformation(event.getData());
break;
case NODE_REMOVED:
Preconditions.checkNotNull(
event.getData(),
"The ZooKeeper event data must not be null.");
handleRemovedLeaderInformation(event.getData().getPath());
break;
}
});
leaderLatch.addListener(this);
curatorFramework.getConnectionStateListenable().addListener(listener);
// Start the leaderLatch; when this instance is elected leader, LeaderLatchListener#isLeader() is called back
leaderLatch.start();
treeCache.start();
}
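The leader callback assigns issuedLeaderSessionID
Before its start() method is called, the LeaderLatch gets a LeaderLatchListener registered through addListener(...). When this instance is elected leader, the listener's isLeader() method is called back.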
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver#isLeader
// Called back when this instance is elected leader
@Override
public void isLeader() {
final UUID leaderSessionID = UUID.randomUUID();
LOG.debug("{} obtained the leadership with session ID {}.", this, leaderSessionID);
leaderElectionListener.onGrantLeadership(leaderSessionID);
}
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#onGrantLeadership
@Override
public void onGrantLeadership(UUID leaderSessionID) {
runInLeaderEventThread(
LEADER_ACQUISITION_EVENT_LOG_NAME,
() -> onGrantLeadershipInternal(leaderSessionID));
}
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#onGrantLeadershipInternal
@GuardedBy("lock")
private void onGrantLeadershipInternal(UUID newLeaderSessionId) {
Preconditions.checkNotNull(newLeaderSessionId);
Preconditions.checkState(
issuedLeaderSessionID == null,
"The leadership should have been granted while not having the leadership acquired.");
// Assign issuedLeaderSessionID
issuedLeaderSessionID = newLeaderSessionId;
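// At this point leaderContenderRegistry does not yet contain the componentId -> WebMonitorEndpoint mapping,
// so the loop below notifies nobody and onGrantLeadershipInternal(...) simply returns. The leaderElectionDriver is then fully created and control goes back to register(...) above.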
leaderContenderRegistry
.keySet()
.forEach(
componentId ->
notifyLeaderContenderOfLeadership(
componentId, issuedLeaderSessionID));
}
Confirming leadership and publishing the leader address
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#register eventually calls the notifyLeaderContenderOfLeadership method.
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#notifyLeaderContenderOfLeadership
@GuardedBy("lock")
private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionID) {
if (!leaderContenderRegistry.containsKey(componentId)) {
LOG.debug(
"The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
sessionID,
leaderElectionDriver);
return;
} else if (!sessionID.equals(issuedLeaderSessionID)) {
LOG.debug(
"An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.",
sessionID,
issuedLeaderSessionID);
return;
}
Preconditions.checkState(
!confirmedLeaderInformation.hasLeaderInformation(componentId),
"The leadership should have been granted while not having the leadership acquired.");
LOG.debug(
"Granting leadership to the contender registered under component '{}' with session ID {}.",
componentId,
issuedLeaderSessionID);
// Call back the contender's grantLeadership method
leaderContenderRegistry.get(componentId).grantLeadership(issuedLeaderSessionID);
}
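The interplay between register(...) and onGrantLeadershipInternal(...) means a contender is notified no matter which of the two happens first. The sketch below reproduces that logic in a simplified form. MiniLeaderElectionService, the Consumer-based contender, and the component ID "rest_server" are hypothetical. Unlike Flink, the sketch notifies synchronously instead of dispatching to a separate leader event thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for the register/grant interplay; not Flink code. */
final class MiniLeaderElectionService {

    private final Object lock = new Object();
    // componentId -> callback playing the role of LeaderContender#grantLeadership
    private final Map<String, Consumer<UUID>> contenders = new LinkedHashMap<>();
    // Stays null until the simulated election backend grants leadership.
    private UUID issuedSessionId;

    /** Called by the election backend once this process wins the election. */
    void onGrantLeadership(UUID sessionId) {
        synchronized (lock) {
            issuedSessionId = sessionId;
            // With an empty registry this loop notifies nobody.
            contenders.values().forEach(contender -> contender.accept(sessionId));
        }
    }

    /** Registers a contender and notifies it right away if leadership was granted earlier. */
    void register(String componentId, Consumer<UUID> contender) {
        synchronized (lock) {
            if (contenders.putIfAbsent(componentId, contender) != null) {
                throw new IllegalStateException("Already registered: " + componentId);
            }
            if (issuedSessionId != null) {
                contender.accept(issuedSessionId);
            }
        }
    }

    public static void main(String[] args) {
        UUID session = UUID.randomUUID();

        // Order 1: leadership is granted before the contender registers.
        List<UUID> grantedFirst = new ArrayList<>();
        MiniLeaderElectionService a = new MiniLeaderElectionService();
        a.onGrantLeadership(session);
        a.register("rest_server", grantedFirst::add);

        // Order 2: the contender registers before leadership is granted.
        List<UUID> registeredFirst = new ArrayList<>();
        MiniLeaderElectionService b = new MiniLeaderElectionService();
        b.register("rest_server", registeredFirst::add);
        b.onGrantLeadership(session);

        System.out.println("notifications: " + grantedFirst.size() + " and " + registeredFirst.size());
    }
}
```

Storing the issued session ID and checking it during registration is what makes the late-registration case work, which matches the null check on issuedLeaderSessionID in register(...).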
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#grantLeadership
@Override
public void grantLeadership(final UUID leaderSessionID) {
log.info(
"{} was granted leadership with leaderSessionID={}",
getRestBaseUrl(),
leaderSessionID);
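// Confirm that the LeaderContender has accepted the leadership identified by the given leader session ID. This also publishes the address under which the leader can be reached.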
leaderElection.confirmLeadership(leaderSessionID, getRestBaseUrl());
}
org.apache.flink.runtime.leaderelection.DefaultLeaderElection#confirmLeadership
@Override
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
}
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#confirmLeadership
@Override
protected void confirmLeadership(
String componentId, UUID leaderSessionID, String leaderAddress) {
Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId));
LOG.debug(
"The leader session for component '{}' is confirmed with session ID {} and address {}.",
componentId,
leaderSessionID,
leaderAddress);
checkNotNull(leaderSessionID);
synchronized (lock) {
if (hasLeadership(componentId, leaderSessionID)) {
Preconditions.checkState(
leaderElectionDriver != null,
"The leadership check should only return true if a driver is instantiated.");
Preconditions.checkState(
!confirmedLeaderInformation.hasLeaderInformation(componentId),
"No confirmation should have happened, yet.");
final LeaderInformation newConfirmedLeaderInformation =
LeaderInformation.known(leaderSessionID, leaderAddress);
confirmedLeaderInformation =
LeaderInformationRegister.merge(
confirmedLeaderInformation,
componentId,
newConfirmedLeaderInformation);
// Publish the leader information, i.e. write this component's own information to its ZooKeeper path
leaderElectionDriver.publishLeaderInformation(
componentId, newConfirmedLeaderInformation);
} else {
if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
LOG.debug(
"Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).",
leaderSessionID,
componentId,
issuedLeaderSessionID);
} else {
LOG.warn(
"The leader session ID {} for component '{}' was confirmed even though the corresponding "
+ "service was not elected as the leader or has been stopped already.",
componentId,
leaderSessionID);
}
}
}
}
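The core of confirmLeadership(...) is a guard: the leader address is published only when the confirmed session ID matches the one currently issued, and stale confirmations are ignored. The following is a minimal sketch of that guard. ConfirmLeadershipSketch is hypothetical, an in-memory map stands in for the ZooKeeper path, and the component ID and URLs are made-up values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch of the "confirm only the current session" check; not Flink code. */
final class ConfirmLeadershipSketch {

    // Stands in for the leader information written to ZooKeeper.
    private final Map<String, String> published = new HashMap<>();
    private UUID issuedSessionId;

    /** Records the session ID handed out with the latest leadership grant. */
    void grant(UUID sessionId) {
        issuedSessionId = sessionId;
    }

    /** Publishes the address only if the confirmation belongs to the issued session. */
    boolean confirm(String componentId, UUID sessionId, String address) {
        if (!sessionId.equals(issuedSessionId)) {
            return false; // stale or unexpected confirmation: ignore it
        }
        published.put(componentId, address);
        return true;
    }

    String publishedAddress(String componentId) {
        return published.get(componentId);
    }

    public static void main(String[] args) {
        ConfirmLeadershipSketch service = new ConfirmLeadershipSketch();
        UUID oldSession = UUID.randomUUID();
        UUID currentSession = UUID.randomUUID();
        service.grant(oldSession);
        service.grant(currentSession); // leadership was lost and granted again

        System.out.println(service.confirm("rest_server", oldSession, "http://host-a:8081"));
        System.out.println(service.confirm("rest_server", currentSession, "http://host-b:8081"));
        System.out.println(service.publishedAddress("rest_server"));
    }
}
```

Because grant and confirmation happen on different threads in the real service, a confirmation can arrive after leadership has changed; comparing session IDs keeps an outdated address from being published.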
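Summary
As shown above, starting WebMonitorEndpoint is not very complicated. It mainly consists of three steps: initializing a set of Handlers, registering them and starting the Netty server, and having the WebMonitorEndpoint contender take part in the leader election and write its own information to ZooKeeper.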