In this article, we analyze Flink's HA (high availability) mechanism.
HA and Leader Election
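Flink components such as the ResourceManager and the JobManager can run in HA mode. Starting a Flink cluster involves a great deal of leader election and leader address retrieval, so we first introduce the HA-related concepts.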
Leader addresses are retrieved through two interfaces, LeaderRetrievalListener and LeaderRetrievalService. A LeaderRetrievalService starts watching for the leader's address and notifies its listener once a leader has been elected.
```java
/**
 * Classes which want to be notified about a changing leader by the {@link LeaderRetrievalService}
 * have to implement this interface.
 */
public interface LeaderRetrievalListener {

    /**
     * This method is called by the {@link LeaderRetrievalService} when a new leader is elected.
     *
     * <p>If both arguments are null then it signals that leadership was revoked without a new
     * leader having been elected.
     *
     * @param leaderAddress The address of the new leader
     * @param leaderSessionID The new leader session ID
     */
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);

    /**
     * This method is called by the {@link LeaderRetrievalService} in case of an exception. This
     * assures that the {@link LeaderRetrievalListener} is aware of any problems occurring in the
     * {@link LeaderRetrievalService} thread.
     *
     * @param exception
     */
    void handleError(Exception exception);
}
```
```java
/**
 * This interface has to be implemented by a service which retrieves the current leader and notifies
 * a listener about it.
 *
 * <p>Prior to using this service it has to be started by calling the start method. The start method
 * also takes the {@link LeaderRetrievalListener} as an argument. The service can only be started
 * once.
 *
 * <p>The service should be stopped by calling the stop method.
 */
public interface LeaderRetrievalService {

    /**
     * Starts the leader retrieval service with the given listener to listen for new leaders. This
     * method can only be called once.
     *
     * @param listener The leader retrieval listener which will be notified about new leaders.
     * @throws Exception
     */
    void start(LeaderRetrievalListener listener) throws Exception;

    /**
     * Stops the leader retrieval service.
     *
     * @throws Exception
     */
    void stop() throws Exception;
}
```
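To make this listener/service contract concrete, here is a minimal, self-contained sketch. It uses simplified copies of the two interfaces (no Flink dependency, `@Nullable` dropped), a hypothetical `InMemoryLeaderRetrievalService` whose `publish` method stands in for whatever election mechanism announces the leader, and a hypothetical `FutureLeaderListener` that exposes the latest leader address as a `CompletableFuture`. It illustrates the contract only and is not Flink's implementation.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Simplified copies of the two interfaces above, without Flink or @Nullable.
interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    void handleError(Exception exception);
}

interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener) throws Exception;

    void stop() throws Exception;
}

// Hypothetical in-memory retrieval service: an election mechanism (not shown)
// calls publish(...) and the registered listener is notified.
class InMemoryLeaderRetrievalService implements LeaderRetrievalService {
    private LeaderRetrievalListener listener;
    private boolean started;

    @Override
    public synchronized void start(LeaderRetrievalListener listener) {
        if (started) {
            throw new IllegalStateException("The service can only be started once.");
        }
        started = true;
        this.listener = listener;
    }

    @Override
    public synchronized void stop() {
        listener = null;
    }

    // (null, null) signals that leadership was revoked without a new leader.
    synchronized void publish(String leaderAddress, UUID leaderSessionID) {
        if (listener != null) {
            listener.notifyLeaderAddress(leaderAddress, leaderSessionID);
        }
    }
}

// Hypothetical listener exposing the latest known leader address as a future
// that completes once a leader has been announced.
class FutureLeaderListener implements LeaderRetrievalListener {
    private CompletableFuture<String> leaderAddress = new CompletableFuture<>();

    @Override
    public synchronized void notifyLeaderAddress(String address, UUID sessionId) {
        if (address == null && sessionId == null) {
            // Leadership revoked: subsequent callers wait for the next leader.
            if (leaderAddress.isDone()) {
                leaderAddress = new CompletableFuture<>();
            }
        } else if (!leaderAddress.complete(address)) {
            // A leader was already known: replace it with the new one.
            leaderAddress = CompletableFuture.completedFuture(address);
        }
    }

    @Override
    public synchronized void handleError(Exception exception) {
        if (!leaderAddress.completeExceptionally(exception)) {
            leaderAddress = CompletableFuture.failedFuture(exception);
        }
    }

    synchronized CompletableFuture<String> getLeaderAddress() {
        return leaderAddress;
    }
}
```

Callers that ask for the leader before any election has finished simply get a pending future, which is the kind of asynchronous "wait for the leader" behaviour that the retrievers discussed next build on.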
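The GatewayRetriever interface is used to obtain an RpcGateway. The abstract class LeaderGatewayRetriever extends LeaderRetriever and implements GatewayRetriever, so it can:
1) obtain the leader's address once leader election has completed;
2) obtain the leader's RpcGateway.
RpcGatewayRetriever is the concrete implementation of LeaderGatewayRetriever: given the leader's address, it obtains the corresponding leader's RpcGateway through rpcService.connect.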
```java
/**
 * {@link LeaderGatewayRetriever} implementation using the {@link RpcService}.
 *
 * @param <F> type of the fencing token
 * @param <T> type of the fenced gateway to retrieve
 */
public class RpcGatewayRetriever<F extends Serializable, T extends FencedRpcGateway<F>>
        extends LeaderGatewayRetriever<T> {

    private final RpcService rpcService;
    private final Class<T> gatewayType;
    private final Function<UUID, F> fencingTokenMapper;
    private final RetryStrategy retryStrategy;

    public RpcGatewayRetriever(
            RpcService rpcService,
            Class<T> gatewayType,
            Function<UUID, F> fencingTokenMapper,
            RetryStrategy retryStrategy) {
        this.rpcService = Preconditions.checkNotNull(rpcService);
        this.gatewayType = Preconditions.checkNotNull(gatewayType);
        this.fencingTokenMapper = Preconditions.checkNotNull(fencingTokenMapper);
        this.retryStrategy = Preconditions.checkNotNull(retryStrategy);
    }

    @Override
    protected CompletableFuture<T> createGateway(
            CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
        // Obtain the leader's RpcGateway via RpcService#connect, using the leader's address (f0)
        // and the fencing token derived from its session ID (f1); failed attempts are retried.
        return FutureUtils.retryWithDelay(
                () ->
                        leaderFuture.thenCompose(
                                (Tuple2<String, UUID> addressLeaderTuple) ->
                                        rpcService.connect(
                                                addressLeaderTuple.f0,
                                                fencingTokenMapper.apply(addressLeaderTuple.f1),
                                                gatewayType)),
                retryStrategy,
                rpcService.getScheduledExecutor());
    }
}
```
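createGateway delegates the retry loop to FutureUtils.retryWithDelay, so a leader that is not yet reachable does not immediately fail the gateway future. The JDK-only sketch below shows the idea. The helper `RetrySketch.retryWithDelay` and its parameters (a plain retry count and a fixed delay) are assumptions for illustration; Flink's version takes a RetryStrategy instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class RetrySketch {
    private RetrySketch() {}

    // Hypothetical helper: run `operation`; if its future fails, schedule another
    // attempt after `delayMillis`, allowing at most `retries` additional attempts.
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            long delayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            long delayMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> attemptFuture;
        try {
            attemptFuture = operation.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like a failed future.
            attemptFuture = CompletableFuture.failedFuture(e);
        }
        attemptFuture.whenComplete(
                (value, error) -> {
                    if (error == null) {
                        result.complete(value);
                    } else if (retriesLeft > 0) {
                        scheduler.schedule(
                                () -> attempt(operation, retriesLeft - 1, delayMillis, scheduler, result),
                                delayMillis,
                                TimeUnit.MILLISECONDS);
                    } else {
                        result.completeExceptionally(error);
                    }
                });
    }
}
```

In RpcGatewayRetriever the retried operation is "wait for the leader future, then connect", so each retry re-runs the connect against the leader address once it is known.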
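Leader election is carried out jointly by LeaderElectionService (the election service), LeaderElection (a proxy between the LeaderElectionService and the contender) and LeaderContender (the object taking part in the election). Every successful election yields a unique leaderSessionID, which can serve as the fencing token for RpcGateway communication. When a LeaderContender wins the election, it is notified through LeaderContender#grantLeadership.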
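The role of the leaderSessionID as a fencing token can be illustrated with a small sketch: a hypothetical `FencedEndpoint` serves a call only if the caller presents the session ID the endpoint currently holds, so calls carrying a stale session's token are rejected. The class and method names are made up for illustration; in Flink the check lives in the fenced RPC layer (the FencedRpcGateway type that RpcGatewayRetriever connects to belongs to it), and this code only mimics the idea.

```java
import java.util.UUID;

// Hypothetical endpoint that only serves callers presenting the current fencing token.
class FencedEndpoint {
    private UUID fencingToken; // null while this endpoint is not the leader

    synchronized void grantLeadership(UUID leaderSessionId) {
        fencingToken = leaderSessionId;
    }

    synchronized void revokeLeadership() {
        fencingToken = null;
    }

    synchronized String handle(UUID callerToken, String request) {
        if (fencingToken == null || !fencingToken.equals(callerToken)) {
            throw new IllegalStateException(
                    "Fencing token mismatch: expected " + fencingToken + ", got " + callerToken);
        }
        return "handled " + request;
    }
}
```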
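In the ZooKeeper-based implementation, the basic principle is as follows: each LeaderContender of a given type creates a LeaderLatch (a recipe from Apache Curator) when it starts, and all these latches interact through the same leader path until one of them is elected leader.
Before start() is called on a LeaderLatch, a LeaderLatchListener is attached to it via addListener(...). When the latch is elected leader, the listener's isLeader() callback is invoked; isLeader() contains the follow-up work the LeaderContender performs after winning the election. When the latch loses leadership, notLeader() is invoked and the contender falls back to standby.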
isLeader() ultimately calls back into the contender's grantLeadership method.
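The latch mechanics can be simulated in memory. The sketch below assumes the behaviour of Curator's LeaderLatch recipe: participants on a shared path are ordered by join sequence, the first one holds leadership, `isLeader()` fires when a participant gains leadership and `notLeader()` when it loses it. `LatchGroup`, `Latch` and the simplified `LeaderLatchListener` are hypothetical stand-ins, not Curator's classes, and for simplicity the sketch calls `notLeader()` when the leader closes its latch (Curator does so only with `CloseMode.NOTIFY_LEADER`).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Curator's LeaderLatchListener.
interface LeaderLatchListener {
    void isLeader();

    void notLeader();
}

// Hypothetical in-memory "leader path": the first latch in join order is the leader.
class LatchGroup {
    private final List<Latch> participants = new ArrayList<>();

    synchronized void join(Latch latch) {
        participants.add(latch);
        if (participants.size() == 1) {
            latch.setLeader(true);
        }
    }

    synchronized void leave(Latch latch) {
        boolean wasLeader = !participants.isEmpty() && participants.get(0) == latch;
        participants.remove(latch);
        if (wasLeader) {
            latch.setLeader(false);
            if (!participants.isEmpty()) {
                // Leadership passes to the next participant in join order.
                participants.get(0).setLeader(true);
            }
        }
    }
}

class Latch {
    private final LatchGroup group;
    private final List<LeaderLatchListener> listeners = new ArrayList<>();
    private boolean leader;

    Latch(LatchGroup group) {
        this.group = group;
    }

    // As described above, listeners are attached before start().
    void addListener(LeaderLatchListener listener) {
        listeners.add(listener);
    }

    void start() {
        group.join(this);
    }

    void close() {
        group.leave(this);
    }

    boolean hasLeadership() {
        return leader;
    }

    // Fires callbacks only on an actual state transition.
    void setLeader(boolean newLeader) {
        if (leader == newLeader) {
            return;
        }
        leader = newLeader;
        for (LeaderLatchListener listener : listeners) {
            if (newLeader) {
                listener.isLeader();
            } else {
                listener.notLeader();
            }
        }
    }
}
```

In Flink's case, the body of `isLeader()` is where the contender's `grantLeadership` is eventually reached.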
```java
/**
 * Interface for a service which allows to elect a leader among a group of contenders.
 *
 * <p>Prior to using this service, it has to be started calling the start method. The start method
 * takes the contender as a parameter. If there are multiple contenders, then each contender has to
 * instantiate its own leader election service.
 *
 * <p>Once a contender has been granted leadership he has to confirm the received leader session ID
 * by calling the method {@link LeaderElection#confirmLeadership(UUID, String)}. This will notify
 * the leader election service, that the contender has accepted the leadership specified and that
 * the leader session id as well as the leader address can now be published for leader retrieval
 * services.
 */
public interface LeaderElectionService {

    /**
     * Creates a new {@link LeaderElection} instance that is registered to this {@code
     * LeaderElectionService} instance.
     *
     * @param componentId a unique identifier that refers to the stored leader information that the
     *     newly created {@link LeaderElection} manages.
     */
    LeaderElection createLeaderElection(String componentId);
}
```
```java
/**
 * {@code LeaderElection} serves as a proxy between {@code LeaderElectionService} and {@link
 * LeaderContender}.
 */
public interface LeaderElection extends AutoCloseable {

    /** Registers the passed {@link LeaderContender} with the leader election process. */
    void startLeaderElection(LeaderContender contender) throws Exception;

    /**
     * Confirms that the {@link LeaderContender} has accepted the leadership identified by the given
     * leader session id. It also publishes the leader address under which the leader is reachable.
     *
     * <p>The intention of this method is to establish an order between setting the new leader
     * session ID in the {@link LeaderContender} and publishing the new leader session ID and the
     * related leader address to the leader retrieval services.
     *
     * @param leaderSessionID The new leader session ID
     * @param leaderAddress The address of the new leader
     */
    void confirmLeadership(UUID leaderSessionID, String leaderAddress);

    /**
     * Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
     * given leader session ID acquired.
     *
     * @param leaderSessionId identifying the current leader
     * @return true if the associated {@link LeaderContender} is the leader, otherwise false
     */
    boolean hasLeadership(UUID leaderSessionId);

    /**
     * Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the
     * underlying leader election. {@link LeaderContender#revokeLeadership()} will be called if the
     * service still holds the leadership.
     */
    void close() throws Exception;
}
```
```java
/**
 * Interface which has to be implemented to take part in the leader election process of the {@link
 * LeaderElectionService}.
 */
public interface LeaderContender {

    /**
     * Callback method which is called by the {@link LeaderElectionService} upon selecting this
     * instance as the new leader. The method is called with the new leader session ID.
     *
     * @param leaderSessionID New leader session ID
     */
    void grantLeadership(UUID leaderSessionID);

    /**
     * Callback method which is called by the {@link LeaderElectionService} upon revoking the
     * leadership of a former leader. This might happen in case that multiple contenders have been
     * granted leadership.
     */
    void revokeLeadership();

    /**
     * Callback method which is called by {@link LeaderElectionService} in case of an error in the
     * service thread.
     *
     * @param exception Caught exception
     */
    void handleError(Exception exception);
}
```
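The Javadoc above prescribes an ordering: grant leadership, let the contender confirm it, and only then publish the session ID and address to the retrievers. The in-memory sketch below shows that ordering for several contenders in one JVM, using a hypothetical `InMemoryElection` class and a simplified copy of `LeaderContender`. It is not DefaultLeaderElectionService: callbacks run synchronously under a lock and there is no external store such as ZooKeeper.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

// Simplified copy of LeaderContender (no Flink dependency).
interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();

    void handleError(Exception exception);
}

// Hypothetical single-process election: grant -> confirm -> publish.
class InMemoryElection {
    private final Deque<LeaderContender> waiting = new ArrayDeque<>();
    private final List<BiConsumer<String, UUID>> retrievers = new ArrayList<>();
    private LeaderContender leader;
    private UUID issuedSessionId;

    // Retrievers receive (address, sessionId) once leadership is confirmed.
    synchronized void subscribe(BiConsumer<String, UUID> retriever) {
        retrievers.add(retriever);
    }

    synchronized void register(LeaderContender contender) {
        waiting.addLast(contender);
        electIfVacant();
    }

    synchronized boolean hasLeadership(UUID sessionId) {
        return issuedSessionId != null && issuedSessionId.equals(sessionId);
    }

    // Only a confirmation for the currently granted session is published;
    // a late confirmation from a revoked leader is ignored.
    synchronized void confirmLeadership(UUID sessionId, String address) {
        if (hasLeadership(sessionId)) {
            retrievers.forEach(r -> r.accept(address, sessionId));
        }
    }

    synchronized void deregister(LeaderContender contender) {
        if (contender != leader) {
            waiting.remove(contender);
            return;
        }
        leader = null;
        issuedSessionId = null;
        contender.revokeLeadership();
        // Retrievers learn that there is currently no leader.
        retrievers.forEach(r -> r.accept(null, null));
        electIfVacant();
    }

    private void electIfVacant() {
        if (leader == null && !waiting.isEmpty()) {
            leader = waiting.pollFirst();
            issuedSessionId = UUID.randomUUID(); // a fresh session ID per election
            leader.grantLeadership(issuedSessionId);
        }
    }
}
```

Setting the session ID before calling `grantLeadership` means a contender that confirms from inside the callback is already recognised by `hasLeadership`.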
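LeaderElection has several implementations, such as StandaloneLeaderElection, which involves no actual election, and DefaultLeaderElection; see the corresponding source code for implementation details.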
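For the standalone case, "no election" can be pictured as follows. This is a simplified sketch that assumes the standalone variant grants leadership to its single contender right away under a fixed session ID such as HighAvailabilityServices.DEFAULT_LEADER_ID (`new UUID(0, 0)`). `FixedLeaderElection` and `SimpleContender` are hypothetical names; consult Flink's StandaloneLeaderElection source for the actual behaviour.

```java
import java.util.UUID;

// Hypothetical minimal contender callback.
interface SimpleContender {
    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();
}

// Sketch of an election without an election: the single contender is
// granted leadership immediately under a fixed, pre-configured session ID.
class FixedLeaderElection implements AutoCloseable {
    // Mirrors HighAvailabilityServices.DEFAULT_LEADER_ID = new UUID(0, 0).
    static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    private final UUID sessionId;
    private SimpleContender contender;

    FixedLeaderElection(UUID sessionId) {
        this.sessionId = sessionId;
    }

    synchronized void startLeaderElection(SimpleContender contender) {
        if (this.contender != null) {
            throw new IllegalStateException("A contender is already registered.");
        }
        this.contender = contender;
        contender.grantLeadership(sessionId);
    }

    synchronized boolean hasLeadership(UUID leaderSessionId) {
        return contender != null && sessionId.equals(leaderSessionId);
    }

    @Override
    public synchronized void close() {
        if (contender != null) {
            contender.revokeLeadership();
            contender = null;
        }
    }
}
```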
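The HighAvailabilityServices interface provides methods for obtaining all HA-related services, including:
- ResourceManager leader election and leader retrieval
- JobManager leader election and leader retrieval
- Persistence of checkpoint metadata
- Registration of the latest completed checkpoint(s)
- Persistence of the BLOB store
- A registry that marks a job's status
- Naming of RPC endpoints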
```java
/**
 * The HighAvailabilityServices give access to all services needed for a highly-available setup. In
 * particular, the services provide access to highly available storage and registries, as well as
 * distributed counters and leader election.
 *
 * <ul>
 *   <li>ResourceManager leader election and leader retrieval
 *   <li>JobManager leader election and leader retrieval
 *   <li>Persistence for checkpoint metadata
 *   <li>Registering the latest completed checkpoint(s)
 *   <li>Persistence for the BLOB store
 *   <li>Registry that marks a job's status
 *   <li>Naming of RPC endpoints
 * </ul>
 */
public interface HighAvailabilityServices
        extends ClientHighAvailabilityServices, GloballyCleanableResource {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /**
     * This UUID should be used when no proper leader election happens, but a simple pre-configured
     * leader is used. That is for example the case in non-highly-available standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /**
     * This JobID should be used to identify the old JobManager when using the {@link
     * HighAvailabilityServices}. With the new mode every JobMaster will have a distinct JobID
     * assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    /** Gets the leader retriever for the cluster's resource manager. */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service is not always
     * accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job.
     *
     * @param jobID The identifier of the job.
     *
     * @return Leader retrieval service to retrieve the job manager for the given job
     *
     * @deprecated This method should only be used by the legacy code where the JobManager acts as
     *     the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job.
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by a static leader
     *     retrieval service.
     *
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(
            JobID jobID, String defaultJobManagerAddress);

    /**
     * This retriever should no longer be used on the cluster side. The web monitor retriever is
     * only required on the client-side and we have a dedicated high-availability services for the
     * client, named {@link ClientHighAvailabilityServices}. See also FLINK-13750.
     *
     * @return the leader retriever for web monitor
     *
     * @deprecated just use {@link #getClusterRestEndpointLeaderRetriever()} instead of this method.
     */
    @Deprecated
    default LeaderRetrievalService getWebMonitorLeaderRetriever() {
        throw new UnsupportedOperationException(
                "getWebMonitorLeaderRetriever should no longer be used. Instead use "
                        + "#getClusterRestEndpointLeaderRetriever to instantiate the cluster "
                        + "rest endpoint leader retriever. If you called this method, then "
                        + "make sure that #getClusterRestEndpointLeaderRetriever has been "
                        + "implemented by your HighAvailabilityServices implementation.");
    }

    /** Gets the {@link LeaderElection} for the cluster's resource manager. */
    LeaderElection getResourceManagerLeaderElection();

    /** Gets the {@link LeaderElection} for the cluster's dispatcher. */
    LeaderElection getDispatcherLeaderElection();

    /** Gets the {@link LeaderElection} for the job with the given {@link JobID}. */
    LeaderElection getJobManagerLeaderElection(JobID jobID);

    /**
     * Gets the {@link LeaderElection} for the cluster's rest endpoint.
     *
     * @deprecated Use {@link #getClusterRestEndpointLeaderElection()} instead.
     */
    @Deprecated
    default LeaderElection getWebMonitorLeaderElection() {
        throw new UnsupportedOperationException(
                "getWebMonitorLeaderElectionService should no longer be used. Instead use "
                        + "#getClusterRestEndpointLeaderElectionService to instantiate the cluster "
                        + "rest endpoint's leader election service. If you called this method, then "
                        + "make sure that #getClusterRestEndpointLeaderElectionService has been "
                        + "implemented by your HighAvailabilityServices implementation.");
    }

    /**
     * Gets the checkpoint recovery factory for the job manager.
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception;

    /**
     * Gets the submitted job graph store for the job manager.
     *
     * @return Submitted job graph store
     *
     * @throws Exception if the submitted job graph store could not be created
     */
    JobGraphStore getJobGraphStore() throws Exception;

    /**
     * Gets the store that holds information about the state of finished jobs.
     *
     * @return Store of finished job results
     *
     * @throws Exception if job result store could not be created
     */
    JobResultStore getJobResultStore() throws Exception;

    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     *
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;

    /** Gets the {@link LeaderElection} for the cluster's rest endpoint. */
    default LeaderElection getClusterRestEndpointLeaderElection() {
        // for backwards compatibility we delegate to getWebMonitorLeaderElectionService
        // all implementations of this interface should override
        // getClusterRestEndpointLeaderElectionService, though
        return getWebMonitorLeaderElection();
    }

    @Override
    default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
        // for backwards compatibility we delegate to getWebMonitorLeaderRetriever
        // all implementations of this interface should override
        // getClusterRestEndpointLeaderRetriever, though
        return getWebMonitorLeaderRetriever();
    }

    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------

    /**
     * Closes the high availability services, releasing all resources.
     *
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores (file
     * systems, ZooKeeper, etc). Another instance of the high availability services will be able to
     * recover the job.
     *
     * <p>If an exception occurs during closing services, this method will attempt to continue
     * closing other services and report exceptions only after all services have been attempted to
     * be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;

    /**
     * Deletes all data stored by high availability services in external stores.
     *
     * <p>After this method was called, any job or session that was managed by these high
     * availability services will be unrecoverable.
     *
     * <p>If an exception occurs during cleanup, this method will attempt to continue the cleanup
     * and report exceptions only after all cleanup steps have been attempted.
     *
     * @throws Exception if an error occurred while cleaning up data stored by them.
     */
    void cleanupAllData() throws Exception;

    /**
     * Calls {@link #cleanupAllData()} (if {@code true} is passed as a parameter) before calling
     * {@link #close()} on this instance. Any error that appeared during the cleanup will be
     * propagated after calling {@code close()}.
     */
    default void closeWithOptionalClean(boolean cleanupData) throws Exception {
        Throwable exception = null;
        if (cleanupData) {
            try {
                cleanupAllData();
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }
        try {
            close();
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }
        if (exception != null) {
            ExceptionUtils.rethrowException(exception);
        }
    }

    @Override
    default CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
        return FutureUtils.completedVoidFuture();
    }
}
```
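closeWithOptionalClean always attempts close(), even when cleanupAllData() has failed, and reports the first error with any later one attached as a suppressed exception. The JDK-only sketch below reproduces that control flow. `firstOrSuppressed` here is a local re-implementation written for illustration (keep the first throwable, add later ones via `addSuppressed`), and `CleanableResource` and `CloseSketch` are hypothetical names.

```java
// Hypothetical resource with the two operations used by closeWithOptionalClean.
interface CleanableResource {
    void cleanupAllData() throws Exception;

    void close() throws Exception;
}

final class CloseSketch {
    private CloseSketch() {}

    // Local stand-in: keep the first error and attach later ones as suppressed.
    static Throwable firstOrSuppressed(Throwable newException, Throwable previous) {
        if (previous == null) {
            return newException;
        }
        if (previous != newException) {
            previous.addSuppressed(newException);
        }
        return previous;
    }

    static void closeWithOptionalClean(CleanableResource resource, boolean cleanupData)
            throws Exception {
        Throwable exception = null;
        if (cleanupData) {
            try {
                resource.cleanupAllData();
            } catch (Throwable t) {
                exception = firstOrSuppressed(t, exception);
            }
        }
        // close() runs regardless of whether the cleanup failed.
        try {
            resource.close();
        } catch (Throwable t) {
            exception = firstOrSuppressed(t, exception);
        }
        if (exception instanceof Exception) {
            throw (Exception) exception;
        }
        if (exception instanceof Error) {
            throw (Error) exception;
        }
        if (exception != null) {
            throw new Exception(exception);
        }
    }
}
```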
Summary
This article briefly analyzed Flink HA and the leader election process.