JobManager启动过程:ResourceManager启动解析
从类的声明可知,ResourceManagerServiceImpl实现了LeaderContender接口,因此它是一个Leader选举候选者。
/**
 * Declaration-only excerpt (the class body is elided here): {@code ResourceManagerServiceImpl}
 * implements both {@code ResourceManagerService} and {@code LeaderContender}.
 */
public class ResourceManagerServiceImpl implements ResourceManagerService, LeaderContender {}
resourceManagerService创建完成后会调用start()方法启动Leader选举过程,选举过程和上篇WebMonitorEndpoint的Leader选举过程一样。
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
// Fragment of the enclosing create(...) method: build the ResourceManager service from the
// factory and the already-created runtime services, then start it.
resourceManagerService =
ResourceManagerServiceImpl.create(
resourceManagerFactory,
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
delegationTokenManager,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
// Start the ResourceManagerService (per the surrounding article, this kicks off leader election).
resourceManagerService.start();
org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl#grantLeadership
/**
 * Leader-election callback: invoked once leadership has been granted to this contender.
 *
 * <p>The work is handed to {@code handleLeaderEventExecutor} and runs while holding {@code lock}.
 * If the service is no longer running the grant is only logged; otherwise a new leader resource
 * manager is started, and any failure while doing so is reported to the fatal error handler.
 *
 * @param newLeaderSessionID session id of the newly granted leadership
 */
@Override
public void grantLeadership(UUID newLeaderSessionID) {
    final Runnable onLeadershipGranted =
            () -> {
                synchronized (lock) {
                    if (running) {
                        LOG.info(
                                "Resource manager service is granted leadership with session id {}.",
                                newLeaderSessionID);
                        try {
                            // Bring up the resource manager for the new leader session.
                            startNewLeaderResourceManager(newLeaderSessionID);
                        } catch (Throwable t) {
                            fatalErrorHandler.onFatalError(
                                    new FlinkException("Cannot start resource manager.", t));
                        }
                    } else {
                        LOG.info(
                                "Resource manager service is not running. Ignore granting leadership with session ID {}.",
                                newLeaderSessionID);
                    }
                }
            };
    handleLeaderEventExecutor.execute(onLeadershipGranted);
}
由此可知,Leader选举成功后会调用ResourceManagerServiceImpl.startNewLeaderResourceManager(...)方法。
其中resourceManagerFactory.createResourceManager(...)方法最终会创建一个ActiveResourceManager实例。
org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl#startNewLeaderResourceManager
/**
 * Starts a new leader resource manager for the given leader session.
 *
 * <p>Calls {@code stopLeaderResourceManager()}, records the new session id, creates a fresh
 * ResourceManager through the factory, and then chains two asynchronous steps onto
 * {@code previousResourceManagerTerminationFuture}: first (on {@code handleLeaderEventExecutor},
 * under {@code lock}) the new resource manager is started if it is still the leader; then (on
 * {@code ioExecutor}) leadership is confirmed if the first step reported {@code true}.
 *
 * @param newLeaderSessionID session id of the newly granted leadership
 * @throws Exception propagated from the synchronous calls made here
 */
@GuardedBy("lock")
private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Exception {
// Stop the current leader resource manager before replacing it.
stopLeaderResourceManager();
this.leaderSessionID = newLeaderSessionID;
// Create the ResourceManager; the factory also builds its SlotManager and JobLeaderIdService.
this.leaderResourceManager =
resourceManagerFactory.createResourceManager(rmProcessContext, newLeaderSessionID);
final ResourceManager<?> newLeaderResourceManager = this.leaderResourceManager;
previousResourceManagerTerminationFuture
.thenComposeAsync(
(ignore) -> {
synchronized (lock) {
// Start the resource manager only if it is still the leader.
return startResourceManagerIfIsLeader(newLeaderResourceManager);
}
},
handleLeaderEventExecutor)
.thenAcceptAsync(
(isStillLeader) -> {
if (isStillLeader) {
// Confirm leadership, publishing the session id and the resource manager's address.
leaderElection.confirmLeadership(
newLeaderSessionID, newLeaderResourceManager.getAddress());
}
},
ioExecutor);
}
org.apache.flink.runtime.resourcemanager.ResourceManagerFactory#createResourceManager(org.apache.flink.runtime.resourcemanager.ResourceManagerProcessContext, java.util.UUID)
/**
 * Creates a ResourceManager for the given process context and leader session.
 *
 * <p>First builds the runtime services (SlotManager and JobLeaderIdService) from the context,
 * then delegates to the multi-argument {@code createResourceManager} overload.
 *
 * @param context process-level context supplying configuration and shared services
 * @param leaderSessionId leader session id the new resource manager is created for
 * @return the newly created resource manager
 * @throws Exception if creating the resource manager fails
 */
public ResourceManager<T> createResourceManager(
        ResourceManagerProcessContext context, UUID leaderSessionId) throws Exception {
    // Step 1: gather the inputs for the runtime services, in the same order as before.
    final ResourceManagerRuntimeServicesConfiguration runtimeServicesConfig =
            context.getRmRuntimeServicesConfig();
    final RpcService runtimeRpcService = context.getRpcService();
    final HighAvailabilityServices haServices = context.getHighAvailabilityServices();
    final SlotManagerMetricGroup slotManagerMetrics =
            SlotManagerMetricGroup.create(context.getMetricRegistry(), context.getHostname());

    // Step 2: create the runtime services, i.e. the SlotManager and the JobLeaderIdService.
    final ResourceManagerRuntimeServices runtimeServices =
            createResourceManagerRuntimeServices(
                    runtimeServicesConfig, runtimeRpcService, haServices, slotManagerMetrics);

    // Step 3: create the resource manager itself.
    return createResourceManager(
            context.getRmConfig(),
            context.getResourceId(),
            context.getRpcService(),
            leaderSessionId,
            context.getHeartbeatServices(),
            context.getDelegationTokenManager(),
            context.getFatalErrorHandler(),
            context.getClusterInformation(),
            context.getWebInterfaceUrl(),
            ResourceManagerMetricGroup.create(
                    context.getMetricRegistry(), context.getHostname()),
            runtimeServices,
            context.getIoExecutor());
}
org.apache.flink.runtime.resourcemanager.ResourceManagerFactory#createResourceManagerRuntimeServices
/**
 * Creates the resource manager runtime services by delegating to
 * {@code ResourceManagerRuntimeServices.fromConfiguration}.
 *
 * @param rmRuntimeServicesConfig configuration of the runtime services
 * @param rpcService RPC service; only its scheduled executor is passed on
 * @param highAvailabilityServices high-availability services passed through unchanged
 * @param slotManagerMetricGroup metric group passed through unchanged
 * @return the runtime services returned by {@code fromConfiguration}
 */
private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
ResourceManagerRuntimeServicesConfiguration rmRuntimeServicesConfig,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
SlotManagerMetricGroup slotManagerMetricGroup) {
return ResourceManagerRuntimeServices.fromConfiguration(
rmRuntimeServicesConfig,
highAvailabilityServices,
rpcService.getScheduledExecutor(),
slotManagerMetricGroup);
}
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices#fromConfiguration
/**
 * Builds the resource manager runtime services from the given configuration.
 *
 * <p>Two components are created, in this order: a {@code SlotManager} (via
 * {@code createSlotManager}) and a {@code DefaultJobLeaderIdService}.
 *
 * @param configuration runtime services configuration; also supplies the job timeout
 * @param highAvailabilityServices high-availability services handed to the job leader id service
 * @param scheduledExecutor executor shared by both created components
 * @param slotManagerMetricGroup metric group handed to the slot manager
 * @return runtime services wrapping the slot manager and the job leader id service
 */
public static ResourceManagerRuntimeServices fromConfiguration(
        ResourceManagerRuntimeServicesConfiguration configuration,
        HighAvailabilityServices highAvailabilityServices,
        ScheduledExecutor scheduledExecutor,
        SlotManagerMetricGroup slotManagerMetricGroup) {
    return new ResourceManagerRuntimeServices(
            // The SlotManager is created first ...
            createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup),
            // ... followed by the JobLeaderIdService.
            new DefaultJobLeaderIdService(
                    highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout()));
}
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerFactory#createResourceManager
/**
 * Creates an {@code ActiveResourceManager}.
 *
 * <p>Reads the start-worker retry interval, the task manager registration timeout and the
 * previous-worker recovery timeout from the configuration, creates the start-worker failure
 * rater, and passes them to the {@code ActiveResourceManager} constructor together with the
 * resource manager driver, the slot manager and the job leader id service.
 *
 * @throws Exception if creating the resource manager fails
 */
@Override
public ResourceManager<WorkerType> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
UUID leaderSessionId,
HeartbeatServices heartbeatServices,
DelegationTokenManager delegationTokenManager,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
ResourceManagerRuntimeServices resourceManagerRuntimeServices,
Executor ioExecutor)
throws Exception {
final ThresholdMeter failureRater = createStartWorkerFailureRater(configuration);
final Duration retryInterval =
configuration.get(ResourceManagerOptions.START_WORKER_RETRY_INTERVAL);
final Duration workerRegistrationTimeout =
configuration.get(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT);
final Duration previousWorkerRecoverTimeout =
configuration.get(
ResourceManagerOptions.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT);
// Assemble the active resource manager.
return new ActiveResourceManager<>(
// Deployment-specific driver; on YARN this is presumably a YarnResourceManagerDriver
// (per the original author's note) -- confirm against the concrete factory.
createResourceManagerDriver(
configuration, webInterfaceUrl, rpcService.getAddress()),
configuration,
rpcService,
leaderSessionId,
resourceId,
heartbeatServices,
delegationTokenManager,
resourceManagerRuntimeServices.getSlotManager(),
ResourceManagerPartitionTrackerImpl::new,
BlocklistUtils.loadBlocklistHandlerFactory(configuration),
resourceManagerRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup,
failureRater,
retryInterval,
workerRegistrationTimeout,
previousWorkerRecoverTimeout,
ioExecutor);
}
startResourceManagerIfIsLeader(...)方法会调用resourceManager.start(),进而回调到ResourceManager.onStart()方法;该方法会启动ResourceManager所需的一系列基础组件,下面会详细讲解。
org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl#startResourceManagerIfIsLeader
/**
 * Starts the given resource manager, provided it is still the leader.
 *
 * @param resourceManager resource manager to start
 * @return a future that completes with {@code true} once the resource manager has started, or
 *     an already completed future holding {@code false} if it is no longer the leader
 */
@GuardedBy("lock")
private CompletableFuture<Boolean> startResourceManagerIfIsLeader(
        ResourceManager<?> resourceManager) {
    if (!isLeader(resourceManager)) {
        return CompletableFuture.completedFuture(false);
    }

    // Per the original author's note, start() leads to a callback into
    // org.apache.flink.runtime.resourcemanager.ResourceManager.onStart.
    resourceManager.start();
    forwardTerminationFuture(resourceManager);
    return resourceManager.getStartedFuture().thenApply(unused -> true);
}
org.apache.flink.runtime.resourcemanager.ResourceManager#onStart
/**
 * Start-up hook of the resource manager.
 *
 * <p>Starts the resource manager services and then completes {@code startedFuture}. Any
 * {@code Throwable} raised on the way is wrapped in a {@code ResourceManagerException}, reported
 * through {@code onFatalError} and rethrown.
 *
 * @throws Exception the wrapping {@code ResourceManagerException} if start-up fails
 */
@Override
public final void onStart() throws Exception {
    try {
        log.info("Starting the resource manager.");
        // Bring up the services this resource manager depends on.
        startResourceManagerServices();
        startedFuture.complete(null);
    } catch (Throwable startupFailure) {
        final String message =
                String.format("Could not start the ResourceManager %s", getAddress());
        final ResourceManagerException wrapped =
                new ResourceManagerException(message, startupFailure);
        onFatalError(wrapped);
        throw wrapped;
    }
}
org.apache.flink.runtime.resourcemanager.ResourceManager#startResourceManagerServices
/**
 * Starts the services of this resource manager in a fixed order: the job leader id service,
 * metrics registration, the heartbeat services, the slot manager, the delegation token manager,
 * and finally {@code initialize()}.
 *
 * <p>Any {@code Exception} raised by these steps is handed to
 * {@code handleStartResourceManagerServicesException}.
 *
 * @throws Exception as declared; what escapes depends on the exception handler
 */
private void startResourceManagerServices() throws Exception {
try {
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
// Register metrics.
registerMetrics();
// Start the heartbeat services: two heartbeat managers, one for TaskManagers and one for
// JobManagers.
startHeartbeatServices();
// Start the slot manager. NOTE(review): the FineGrainedSlotManager implementation also
// schedules a periodic resource check when the allocator is supported -- confirm which
// SlotManager implementation is in use.
slotManager.start(
getFencingToken(),
getMainThreadExecutor(),
resourceAllocator,
new ResourceEventListenerImpl(),
blocklistHandler::isBlockedTaskManager);
delegationTokenManager.start(this);
initialize();
} catch (Exception e) {
handleStartResourceManagerServicesException(e);
}
}
org.apache.flink.runtime.resourcemanager.ResourceManager#startHeartbeatServices
/**
 * Creates the two heartbeat managers of the resource manager: one using a
 * {@code TaskManagerHeartbeatListener} and one using a {@code JobManagerHeartbeatListener}.
 * Both are created as heartbeat senders for {@code resourceId} and use the main thread executor.
 */
private void startHeartbeatServices() {
// Heartbeat manager towards the TaskManagers.
taskManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
// Heartbeat manager towards the JobManagers.
jobManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new JobManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
}
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager#start
/**
 * Starts the slot manager with the given leader id and resource manager actions.
 *
 * <p>All arguments are null-checked and stored, the slot status syncer is initialized, and the
 * manager is marked as started. If the resource allocator is supported, a cluster reconciliation
 * check is scheduled with a fixed delay of {@code taskManagerTimeout}, starting immediately; each
 * run is dispatched to the main thread executor. Finally the slot manager metrics are
 * registered.
 *
 * @param newResourceManagerId to use for communication with the task managers
 * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
 * @param newResourceAllocator to use for resource (de-)allocations
 * @param newResourceEventListener resource event listener stored by this slot manager
 * @param newBlockedTaskManagerChecker to query whether a task manager is blocked
 */
@Override
public void start(
        ResourceManagerId newResourceManagerId,
        Executor newMainThreadExecutor,
        ResourceAllocator newResourceAllocator,
        ResourceEventListener newResourceEventListener,
        BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
    LOG.info("Starting the slot manager.");

    // Store the collaborators; the syncer is initialized right after the fields it reads.
    resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceAllocator = Preconditions.checkNotNull(newResourceAllocator);
    resourceEventListener = Preconditions.checkNotNull(newResourceEventListener);
    slotStatusSyncer.initialize(
            taskManagerTracker, resourceTracker, resourceManagerId, mainThreadExecutor);
    blockedTaskManagerChecker = Preconditions.checkNotNull(newBlockedTaskManagerChecker);

    started = true;

    if (resourceAllocator.isSupported()) {
        // Periodic check, run in the main thread; the original author describes it as checking
        // for and requesting the required resources.
        final Runnable reconciliationTask =
                () -> mainThreadExecutor.execute(this::checkClusterReconciliation);
        clusterReconciliationCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        reconciliationTask,
                        0L,
                        taskManagerTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);
    }

    // Register the slot manager metrics.
    registerSlotManagerMetrics();
}
leaderElection.confirmLeadership(...)方法的作用是确认Leader身份,并将Leader信息(会话ID和地址)写入ZooKeeper路径中(使用ZooKeeper高可用服务时)。
小结
由以上分析可知,ResourceManager的启动过程并不复杂:ResourceManagerServiceImpl作为候选者参与Leader竞选,当选后创建并启动ResourceManager;根据Akka RPC框架的原理,start()会回调onStart()方法,其中主要启动了2个心跳服务和1个定时任务;最后将自己的Leader信息写入ZooKeeper中。
参考资料:
Flink源码解析(十二)——Flink On Yarn JobManager启动过程ResourceManager启动解析