Flink V1.20源码阅读笔记(5)- 集群启动流程之HA

-
-
2024-10-05

在这篇文章中,我们将对 Flink HA加以分析。

 

HA 及 Leader 选举

Flink 内部的组件如 ResourceManager, JobManager 等都可以配置 HA 模式,Flink 集群启动的的时候会大量涉及到 Leader 选举,Leader 地址获取等相关的操作,因而先对 HA 相关的概念进行介绍。

Leader 地址的获取通过 LeaderRetrievalListener 和 LeaderRetrievalService 这两个接口来完成。 LeaderRetrievalService 可以启动一个对 Leader 地址的监听,在 Leader 选举完成后得到通知。

/**
 * Classes which want to be notified about a changing leader by the {@link LeaderRetrievalService}
 * have to implement this interface.
 */
//希望由LeaderRetrievalService通知有关更改领导者的类必须实现此接口。
public interface LeaderRetrievalListener {

    /**
     * This method is called by the {@link LeaderRetrievalService} when a new leader is elected.
     *
     * <p>If both arguments are null then it signals that leadership was revoked without a new
     * leader having been elected.
     *
     * @param leaderAddress The address of the new leader
     * @param leaderSessionID The new leader session ID
     */
    //当选新的领导者时, LeaderRetrievalService将调用此方法。
    //如果两个参数都为空,则表示在没有选举新领导人的情况下领导层被撤销。
    //参数:
    //leaderAddress -新领导者 的地址
    //leaderSessionID -新领导者会话ID
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);

    /**
     * This method is called by the {@link LeaderRetrievalService} in case of an exception. This
     * assures that the {@link LeaderRetrievalListener} is aware of any problems occurring in the
     * {@link LeaderRetrievalService} thread.
     *
     * @param exception
     */
    //此方法由LeaderRetrievalService在发生异常时调用。
    // 这确保LeaderRetrievalListener知道在LeaderRetrievalService线程中发生的任何问题
    void handleError(Exception exception);
}
/**
 * This interface has to be implemented by a service which retrieves the current leader and notifies
 * a listener about it.
 *
 * <p>Prior to using this service it has to be started by calling the start method. The start method
 * also takes the {@link LeaderRetrievalListener} as an argument. The service can only be started
 * once.
 *
 * <p>The service should be stopped by calling the stop method.
 */
//此接口必须由检索当前leader并通知侦听器的服务来实现。
//在使用此服务之前,必须通过调用start方法来启动它。start方法还将LeaderRetrievalListener作为参数。该服务只能启动一次。
//应通过调用stop方法停止服务。
public interface LeaderRetrievalService {

    /**
     * Starts the leader retrieval service with the given listener to listen for new leaders. This
     * method can only be called once.
     *
     * @param listener The leader retrieval listener which will be notified about new leaders.
     * @throws Exception
     */
    //使用给定的侦听器启动leader检索服务,以侦听新的leader。此方法只能调用一次。
    //参数:
    //监听器-领导者检索侦听器,将收到有关新领导者的通知。
    void start(LeaderRetrievalListener listener) throws Exception;

    /**
     * Stops the leader retrieval service.
     *
     * @throws Exception
     */
    //停止leader检索服务。
    void stop() throws Exception;
}

 

GatewayRetriever 接口用于获取 RpcGateway,抽象类 LeaderGatewayRetriever 则同时继承了 LeaderRetriever 和 GatewayRetriever,因而

1)可以在Leader选举完成后得到 Leader地址 

2)可以获取到 Leader的 RpcGateway。

 

RpcGatewayRetriever是 LeaderGatewayRetriever 的具体实现,根据 Leader 的地址通过 rpcService.connect 方法获得对应 Leader 的 RpcGateway。

/**
 * {@link LeaderGatewayRetriever} implementation using the {@link RpcService}.
 *
 * @param <F> type of the fencing token
 * @param <T> type of the fenced gateway to retrieve
 */
//LeaderGatewayRetriever使用RpcService实现。
public class RpcGatewayRetriever<F extends Serializable, T extends FencedRpcGateway<F>>
        extends LeaderGatewayRetriever<T> {

    private final RpcService rpcService;
    private final Class<T> gatewayType;
    private final Function<UUID, F> fencingTokenMapper;
    private final RetryStrategy retryStrategy;

    public RpcGatewayRetriever(
            RpcService rpcService,
            Class<T> gatewayType,
            Function<UUID, F> fencingTokenMapper,
            RetryStrategy retryStrategy) {
        this.rpcService = Preconditions.checkNotNull(rpcService);
        this.gatewayType = Preconditions.checkNotNull(gatewayType);
        this.fencingTokenMapper = Preconditions.checkNotNull(fencingTokenMapper);
        this.retryStrategy = Preconditions.checkNotNull(retryStrategy);
    }

    @Override
    protected CompletableFuture<T> createGateway(
            CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
        //根据 Leader 的地址通过 RpcService.connect() 方法获得对应 Leader 的 RpcGateway
        return FutureUtils.retryWithDelay(
                () ->
                        leaderFuture.thenCompose(
                                (Tuple2<String, UUID> addressLeaderTuple) ->
                                        rpcService.connect(
                                                addressLeaderTuple.f0,
                                                fencingTokenMapper.apply(addressLeaderTuple.f1),
                                                gatewayType)),
                retryStrategy,
                rpcService.getScheduledExecutor());
    }
}

 

Leader 选举是通过 LeaderElectionService(选举服务)和 LeaderElection(LeaderElectionService的代理) 和 LeaderContender(参与竞选的对象)共同来完成的,每一次选举成功后都会有唯一的 leaderSessionID,可以用来作为 RpcGateway 通信的 fence token。当一个 LeaderContender竞选成功了,会通过 LeaderContender#grantLeadership得到通知。

其Leader选举基本原理是同一类的LeaderContender,每个LeaderContender在启动时都会创建一个LeaderLatch,使用相同的leader path和其他LeaderLatch交互,其中一个最终会被选举为leader。

 

LeaderLatch在启动执行start()方法之前会先调用addListener(...)方法添加一个LeaderLatchListener,在被选举为leader时回调LeaderLatchListener的isLeader()方法,isLeader()方法即包含选举leader成功后LeaderContender的后续操作。在未被选举为leader时,会回调LeaderLatchListener的notLeader()方法,保持备用状态。

isLeader() 最终会回调到grantLeadership方法

/**
 * Interface for a service which allows to elect a leader among a group of contenders.
 *
 * <p>Prior to using this service, it has to be started calling the start method. The start method
 * takes the contender as a parameter. If there are multiple contenders, then each contender has to
 * instantiate its own leader election service.
 *
 * <p>Once a contender has been granted leadership he has to confirm the received leader session ID
 * by calling the method {@link LeaderElection#confirmLeadership(UUID, String)}. This will notify
 * the leader election service, that the contender has accepted the leadership specified and that
 * the leader session id as well as the leader address can now be published for leader retrieval
 * services.
 */
//服务的接口,允许在一组竞争者中选出一个领导者。
//在使用此服务之前,必须调用start方法启动它。start方法将竞争者作为参数。
// 如果存在多个竞争者,则每个竞争者必须实例化其自己的领导者选举服务。
//一旦竞争者被授予领导地位,他必须通过调用LeaderElection. Confirmleadiablishing (UUID,String) 方法来确认收到的领导会话ID。
//这将通知领导者选举服务,竞争者已接受指定的领导,并且现在可以为领导者检索服务发布领导者会话id以及领导者地址
public interface LeaderElectionService {

    /**
     * Creates a new {@link LeaderElection} instance that is registered to this {@code
     * LeaderElectionService} instance.
     *
     * @param componentId a unique identifier that refers to the stored leader information that the
     *     newly created {@link LeaderElection} manages.
     */
    //创建一个新的领导选举注册到此的实例LeaderElectionService实例。
    //参数:
    //组件id-引用新创建的存储的领导者信息的唯一标识符领导选举管理。
    LeaderElection createLeaderElection(String componentId);
}
/**
 * {@code LeaderElection} serves as a proxy between {@code LeaderElectionService} and {@link
 * LeaderContender}.
 */
//LeaderElectionService和LeaderContender之间充当LeaderElectionService的代理
public interface LeaderElection extends AutoCloseable {

    /** Registers the passed {@link LeaderContender} with the leader election process. */
    //将通过的LeaderContender注册到领导者选举流程。
    void startLeaderElection(LeaderContender contender) throws Exception;

    /**
     * Confirms that the {@link LeaderContender} has accepted the leadership identified by the given
     * leader session id. It also publishes the leader address under which the leader is reachable.
     *
     * <p>The intention of this method is to establish an order between setting the new leader
     * session ID in the {@link LeaderContender} and publishing the new leader session ID and the
     * related leader address to the leader retrieval services.
     *
     * @param leaderSessionID The new leader session ID
     * @param leaderAddress The address of the new leader
     */
    //确认LeaderContender已接受由给定领导会话id标识的领导。它还会发布领导者地址,在该地址下可以访问领导者。
    //此方法的目的是在LeaderContender中设置新的leader会话ID与
    //将新的leader会话ID和相关的leader地址发布到leader检索服务之间建立顺序。
    //参数:
    //leaderAddress -新的领导者会话ID 
    //leaderAddress -新的领导者
    void confirmLeadership(UUID leaderSessionID, String leaderAddress);

    /**
     * Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
     * given leader session ID acquired.
     *
     * @param leaderSessionId identifying the current leader
     * @return true if the associated {@link LeaderContender} is the leader, otherwise false
     */
    //如果服务的LeaderContender在获取的给定领导者会话ID下具有领导,则返回true。
    //参数:
    //leaderSessionId -识别当前领导者
    //return:
    //如果关联的LeaderContender是领导者,则为true,否则为false
    boolean hasLeadership(UUID leaderSessionId);

    /**
     * Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the
     * underlying leader election. {@link LeaderContender#revokeLeadership()} will be called if the
     * service still holds the leadership.
     */
    //通过从基础leader选举中取消注册LeaderContender来关闭LeaderElection。
    //LeaderContender. revokeLeadership() 如果服务仍然拥有领导地位,将被调用。
    void close() throws Exception;
}
/**
 * Interface which has to be implemented to take part in the leader election process of the {@link
 * LeaderElectionService}.
 */
//必须实现以参与LeaderElectionService的领导者选举过程的接口。
public interface LeaderContender {

    /**
     * Callback method which is called by the {@link LeaderElectionService} upon selecting this
     * instance as the new leader. The method is called with the new leader session ID.
     *
     * @param leaderSessionID New leader session ID
     */
//回调方法,该方法由LeaderElectionService在选择此实例作为新领导者时调用。使用新的leader会话ID调用该方法。
//参数:
//leaderSessionID -新的领导者会话ID
    void grantLeadership(UUID leaderSessionID);

    /**
     * Callback method which is called by the {@link LeaderElectionService} upon revoking the
     * leadership of a former leader. This might happen in case that multiple contenders have been
     * granted leadership.
     */
//    回调方法,该方法由LeaderElectionService在撤销前leader的领导时调用。这可能发生在多个竞争者被授予领导权的情况下。
    void revokeLeadership();

    /**
     * Callback method which is called by {@link LeaderElectionService} in case of an error in the
     * service thread.
     *
     * @param exception Caught exception
     */
//    LeaderElectionService在服务线程中出现错误时调用的回调方法。
    void handleError(Exception exception);
}

 

LeaderElection有多种实现,如无需进行选举过程的 StandaloneLeaderElection,以及DefaultLeaderElection,具体的实现细节可参考对应的源码。

 

 

HighAvailabilityServices接口则提供了获取 HA 相关所有服务的方法,包括:

  • ResourceManager 领导者选举和领导者检索
  • JobManager 领导者选举和领导者检索
  • 检查点元数据的持久性
  • 注册最新完成的检查点
  • BLOB 存储的持久性
  • 标记作业状态的注册表
  • RPC端点的命名
/**
 * The HighAvailabilityServices give access to all services needed for a highly-available setup. In
 * particular, the services provide access to highly available storage and registries, as well as
 * distributed counters and leader election.
 *
 * <ul>
 *   <li>ResourceManager leader election and leader retrieval
 *   <li>JobManager leader election and leader retrieval
 *   <li>Persistence for checkpoint metadata
 *   <li>Registering the latest completed checkpoint(s)
 *   <li>Persistence for the BLOB store
 *   <li>Registry that marks a job's status
 *   <li>Naming of RPC endpoints
 * </ul>
 */
//HighAvailabilityServices 允许访问高可用性设置所需的所有服务。
// 特别是,这些服务提供对高可用存储和注册表的访问,以及分布式计数器和领导者选举。
//ResourceManager 领导者选举和领导者检索
//JobManager 领导者选举和领导者检索
//检查点元数据的持久性
//注册最新完成的检查点
//BLOB 存储的持久性
//标记作业状态的注册表
//RPC端点的命名
public interface HighAvailabilityServices
        extends ClientHighAvailabilityServices, GloballyCleanableResource {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /**
     * This UUID should be used when no proper leader election happens, but a simple pre-configured
     * leader is used. That is for example the case in non-highly-available standalone setups.
     */
//    当没有发生正确的领导者选举但使用简单的预配置领导者时,应该使用此 UUID。例如,在非高可用性独立设置中就是这种情况。
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /**
     * This JobID should be used to identify the old JobManager when using the {@link
     * HighAvailabilityServices}. With the new mode every JobMaster will have a distinct JobID
     * assigned.
     */
//    使用HighAvailabilityServices时,应使用此 JobID 来识别旧的 JobManager。
//    在新模式下,每个 JobMaster 都会分配一个不同的 JobID
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    /** Gets the leader retriever for the cluster's resource manager. */
//    获取集群资源管理器的领导者检索器
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service is not always
     * accessible.
     */
//    获取调度程序的领导者检索器。此领导者检索服务并不总是可以访问。
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job.
     *
     * @param jobID The identifier of the job.
     *
     * @return Leader retrieval service to retrieve the job manager for the given job
     *
     * @deprecated This method should only be used by the legacy code where the JobManager acts as
     *         the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job.
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by a static leader
     *         retrieval service.
     *
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
//    获取负责给定作业的作业 JobMaster 的领导者检索器。
//参数:
//jobID – 作业的标识符。 
// defaultJobManagerAddress – 将由静态领导者检索服务返回的 JobManager 地址。
//返回:
//领导者检索服务,用于检索给定作业的作业管理器
    LeaderRetrievalService getJobManagerLeaderRetriever(
            JobID jobID, String defaultJobManagerAddress);

    /**
     * This retriever should no longer be used on the cluster side. The web monitor retriever is
     * only required on the client-side and we have a dedicated high-availability services for the
     * client, named {@link ClientHighAvailabilityServices}. See also FLINK-13750.
     *
     * @return the leader retriever for web monitor
     *
     * @deprecated just use {@link #getClusterRestEndpointLeaderRetriever()} instead of this method.
     */
    @Deprecated
    default LeaderRetrievalService getWebMonitorLeaderRetriever() {
        throw new UnsupportedOperationException(
                "getWebMonitorLeaderRetriever should no longer be used. Instead use "
                        + "#getClusterRestEndpointLeaderRetriever to instantiate the cluster "
                        + "rest endpoint leader retriever. If you called this method, then "
                        + "make sure that #getClusterRestEndpointLeaderRetriever has been "
                        + "implemented by your HighAvailabilityServices implementation.");
    }

    /** Gets the {@link LeaderElection} for the cluster's resource manager. */
//    获取集群资源管理器的LeaderElection 。
    LeaderElection getResourceManagerLeaderElection();

    /** Gets the {@link LeaderElection} for the cluster's dispatcher. */
//    获取集群调度程序的LeaderElection 。
    LeaderElection getDispatcherLeaderElection();

    /** Gets the {@link LeaderElection} for the job with the given {@link JobID}. */
//    获取具有给定JobID的作业的LeaderElection 。
    LeaderElection getJobManagerLeaderElection(JobID jobID);

    /**
     * Gets the {@link LeaderElection} for the cluster's rest endpoint.
     *
     * @deprecated Use {@link #getClusterRestEndpointLeaderElection()} instead.
     */
    @Deprecated
    default LeaderElection getWebMonitorLeaderElection() {
        throw new UnsupportedOperationException(
                "getWebMonitorLeaderElectionService should no longer be used. Instead use "
                        + "#getClusterRestEndpointLeaderElectionService to instantiate the cluster "
                        + "rest endpoint's leader election service. If you called this method, then "
                        + "make sure that #getClusterRestEndpointLeaderElectionService has been "
                        + "implemented by your HighAvailabilityServices implementation.");
    }

    /**
     * Gets the checkpoint recovery factory for the job manager.
     *
     * @return Checkpoint recovery factory
     */
//    获取作业管理器的检查点恢复工厂。
//返回:
//检查点恢复工厂
    CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception;

    /**
     * Gets the submitted job graph store for the job manager.
     *
     * @return Submitted job graph store
     *
     * @throws Exception if the submitted job graph store could not be created
     */
//    获取作业管理器提交的作业图存储。
//返回:
//提交的作业图存储
    JobGraphStore getJobGraphStore() throws Exception;

    /**
     * Gets the store that holds information about the state of finished jobs.
     *
     * @return Store of finished job results
     *
     * @throws Exception if job result store could not be created
     */
//    获取保存有关已完成作业状态信息的存储。
//返回:
//存储已完成的作业结果
    JobResultStore getJobResultStore() throws Exception;

    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     *
     * @throws IOException if the blob store could not be created
     */
//    创建 BLOB 存储,其中以高可用性方式存储 BLOB。
//返回:
//斑点商店
    BlobStore createBlobStore() throws IOException;

    /** Gets the {@link LeaderElection} for the cluster's rest endpoint. */
    default LeaderElection getClusterRestEndpointLeaderElection() {
        // for backwards compatibility we delegate to getWebMonitorLeaderElectionService
        // all implementations of this interface should override
        // getClusterRestEndpointLeaderElectionService, though
        return getWebMonitorLeaderElection();
    }

    @Override
    default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
        // for backwards compatibility we delegate to getWebMonitorLeaderRetriever
        // all implementations of this interface should override
        // getClusterRestEndpointLeaderRetriever, though
        return getWebMonitorLeaderRetriever();
    }

    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------

    /**
     * Closes the high availability services, releasing all resources.
     *
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores (file
     * systems, ZooKeeper, etc). Another instance of the high availability services will be able to
     * recover the job.
     *
     * <p>If an exception occurs during closing services, this method will attempt to continue
     * closing other services and report exceptions only after all services have been attempted to
     * be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;

    /**
     * Deletes all data stored by high availability services in external stores.
     *
     * <p>After this method was called, any job or session that was managed by these high
     * availability services will be unrecoverable.
     *
     * <p>If an exception occurs during cleanup, this method will attempt to continue the cleanup
     * and report exceptions only after all cleanup steps have been attempted.
     *
     * @throws Exception if an error occurred while cleaning up data stored by them.
     */
//    删除外部存储中高可用性服务存储的所有数据。
//调用此方法后,由这些高可用性服务管理的任何作业或会话都将不可恢复。
//如果清理期间发生异常,此方法将尝试继续清理并仅在尝试所有清理步骤后报告异常。
    void cleanupAllData() throws Exception;

    /**
     * Calls {@link #cleanupAllData()} (if {@code true} is passed as a parameter) before calling
     * {@link #close()} on this instance. Any error that appeared during the cleanup will be
     * propagated after calling {@code close()}.
     */
    default void closeWithOptionalClean(boolean cleanupData) throws Exception {
        Throwable exception = null;
        if (cleanupData) {
            try {
                cleanupAllData();
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
        }
        try {
            close();
        } catch (Throwable t) {
            exception = ExceptionUtils.firstOrSuppressed(t, exception);
        }

        if (exception != null) {
            ExceptionUtils.rethrowException(exception);
        }
    }

    @Override
    default CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
        return FutureUtils.completedVoidFuture();
    }
}

 

小结

本文简单分析了 Flink HA 和Leader选举流程

 

参考资料:


目录