Flink V1.20源码阅读笔记(4)- RPC通信

-
-
2024-09-21

作为一个分布式系统,Flink 内部不同组件之间通信依赖于 RPC 机制。这篇文章将对 Flink 的 RPC 框架加以分析。

 

主要抽象

RpcEndpoint 是对 RPC 框架中提供具体服务的实体的抽象,所有提供远程调用方法的组件都需要继承该抽象类。另外,对于同一个 RpcEndpoint 的所有 RPC 调用都会在同一个线程(RpcEndpoint 的“主线程”)中执行,因此无需担心并发执行的线程安全问题。

RpcGateway 接口是用于远程调用的代理接口。 RpcGateway 提供了获取其所代理的 RpcEndpoint 的地址的方法。在实现一个提供 RPC 调用的组件时,通常需要先定一个接口,该接口继承 RpcGateway 并约定好提供的远程调用的方法。

RpcService 是 RpcEndpoint 的运行时环境, RpcService 提供了启动 RpcEndpoint, 连接到远端 RpcEndpoint 并返回远端 RpcEndpoint 的代理对象等方法。此外, RpcService 还提供了某些异步任务或者周期性调度任务的方法。

RpcServer 相当于 RpcEndpoint 自身的的代理对象(self gateway)。RpcServer 是 RpcService 在启动了 RpcEndpoint 之后返回的对象,每一个 RpcEndpoint 对象内部都有一个 RpcServer 的成员变量,通过 getSelfGateway 方法就可以获得自身的代理,然后调用该Endpoint 提供的服务。

FencedRpcEndpoint 和 FencedRpcGateway要求在调用 RPC 方法时携带 token 信息,只有当调用方提供了 token 和 endpoint 的 token 一致时才允许调用。

 

源码分析

RpcGateway :

//Rpc网关接口,必须由Rpc网关实现。
//定义通信行为;用于远程调用RpcEndpoint的某些方法,可以理解为对方的客户端代理
public interface RpcGateway {

    /**
     * Returns the fully qualified address under which the associated rpc endpoint is reachable.
     *
     * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
     */
    String getAddress();

    /**
     * Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
     *
     * @return Fully qualified hostname under which the associated rpc endpoint is reachable
     */
    String getHostname();
}

 

RpcEndpoint 

     protected RpcEndpoint(
            RpcService rpcService, String endpointId, Map<String, String> loggingContext) {
        // 保存rpcService和endpointId
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");

        // 通过 RpcService 启动RpcServer
        /**
         * 构造的时候调用 rpcService.startServer()启动RpcServer,进入可以接受处理请求的状态,最后将RpcServer绑定到主线程上
         * 真正执行起来
         * 在RpcEndpoint中还定义了一些放入如 runAsync(Runnable)、callAsync(Callable,Time)方法来执行Rpc调用,值得注意的是在Flink
         * 的设计中,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint进行Rpc调用时,其会委托RpcServer进行处理
         */
        this.rpcServer = rpcService.startServer(this, loggingContext);
        this.resourceRegistry = new CloseableRegistry();

        // 主线程执行器,所有调用在主线程中串行执行
        this.mainThreadExecutor =
                new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId);
        //将mainThreadExecutor 资源注册到 CloseableRegistry上
        registerResource(this.mainThreadExecutor);
    }

 

PekkoRpcService:

    @Override
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
            C rpcEndpoint, Map<String, String> loggingContext) {
        checkNotNull(rpcEndpoint, "rpc endpoint");

        /**
         * 根据RpcEndpoint的类型来创建对应的Actor,目前支持两种Actor的创建
         * 1、PekkoRpcActor
         * 2、FencedPekkoRpcActor,对PekkoRpcActor进行扩展,能够过滤到与指定token无关的消息
         */
        final SupervisorActor.ActorRegistration actorRegistration =
                registerRpcActor(rpcEndpoint, loggingContext);

        // 这里拿到的是PekkoRpcActor的引用
        final ActorRef actorRef = actorRegistration.getActorRef();
        final CompletableFuture<Void> actorTerminationFuture =
                actorRegistration.getTerminationFuture();

        LOG.info(
                "Starting RPC endpoint for {} at {} .",
                rpcEndpoint.getClass().getName(),
                actorRef.path());

        final String address = PekkoUtils.getRpcURL(actorSystem, actorRef);
        final String hostname;
        Option<String> host = actorRef.path().address().host();
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }

        // 提取集成RpcEndpoint的所有子类
        Set<Class<?>> implementedRpcGateways =
                new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(PekkoBasedEndpoint.class);

        // 对上述指定的类集合进行代理
        final InvocationHandler invocationHandler;

        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            // a FencedRpcEndpoint needs a FencedPekkoInvocationHandler
            invocationHandler =
                    new FencedPekkoInvocationHandler<>(
                            address,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
                            captureAskCallstacks,
                            flinkClassLoader);
        } else {
            invocationHandler =
                    new PekkoInvocationHandler(
                            address,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            captureAskCallstacks,
                            flinkClassLoader);
        }

        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class . That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        //我们不是直接使用系统类加载器,而是从此类派生类加载器。
        //在 Flink 嵌入运行并且所有 Flink 代码通过自定义 ClassLoader 动态加载(例如从 OSGI 包)的情况下,效果更好
        ClassLoader classLoader = getClass().getClassLoader();

        // 针对RpcServer生成一个动态代理
        @SuppressWarnings("unchecked")
        RpcServer server =
                (RpcServer)
                        Proxy.newProxyInstance(
                                classLoader,
                                implementedRpcGateways.toArray(
                                        new Class<?>[implementedRpcGateways.size()]),
                                invocationHandler);

        return server;
    }
    
    @Override
    public <C extends RpcGateway> CompletableFuture<C> connect(
            final String address, final Class<C> clazz) {

        // 连接远程Rpc Server,返回的是代理对象
        return connectInternal(
                address,
                clazz,
                (ActorRef actorRef) -> {
                    Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);

                    return new PekkoInvocationHandler(
                            addressHostname.f0,
                            addressHostname.f1,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            null,
                            captureAskCallstacks,
                            flinkClassLoader);
                });
    }
    
    private <C extends RpcGateway> CompletableFuture<C> connectInternal(
            final String address,
            final Class<C> clazz,
            Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
        checkState(!stopped, "RpcService is stopped");

        LOG.debug(
                "Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
                address,
                clazz.getName());

        // 根据Pekko Actor地址获取ActorRef
        final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);

        // 发送一个握手成功的消息给远程Actor
        final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
                actorRefFuture.thenCompose(
                        (ActorRef actorRef) ->
                                ScalaFutureUtils.toJava(
                                        Patterns.ask(
                                                        actorRef,
                                                        new RemoteHandshakeMessage(
                                                                clazz, getVersion()),
                                                        configuration.getTimeout().toMillis())
                                                .<HandshakeSuccessMessage>mapTo(
                                                        ClassTag$.MODULE$
                                                                .<HandshakeSuccessMessage>apply(
                                                                        HandshakeSuccessMessage
                                                                                .class))));

        // 创建动态代理,并返回
        final CompletableFuture<C> gatewayFuture =
                actorRefFuture.thenCombineAsync(
                        handshakeFuture,
                        (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                            // PekkoInvocationHandler,针对客户端会调用 invokeRpc
                            InvocationHandler invocationHandler =
                                    invocationHandlerFactory.apply(actorRef);

                            // Rather than using the System ClassLoader directly, we derive the
                            // ClassLoader from this class.
                            // That works better in cases where Flink runs embedded and
                            // all Flink code is loaded dynamically
                            // (for example from an OSGI bundle) through a custom ClassLoader
                            //我们没有直接使用系统类加载器,而是从此类派生类加载器。
                            // 在 Flink 嵌入运行并且所有 Flink 代码通过自定义 ClassLoader 动态加载(例如从 OSGI 包)的情况下,效果更好
                            ClassLoader classLoader = getClass().getClassLoader();

                            // 创建动态代理
                            @SuppressWarnings("unchecked")
                            C proxy =
                                    (C)
                                            Proxy.newProxyInstance(
                                                    classLoader,
                                                    new Class<?>[] {clazz},
                                                    invocationHandler);

                            return proxy;
                        },
                        actorSystem.dispatcher());

        return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
    }

 

PekkoRpcActor.java:

     @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                //如果是 RemoteHandshakeMessage 信息,执行 handleHandshakeMessage,处理握手消息
                .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
                //如果是 ControlMessages,执行 handleControlMessage,如启动、停止、中断
                .match(ControlMessages.class, this::handleControlMessage)
                //其他情况,handleMessage
                .matchAny(this::handleMessage)
                .build();
    }

 

PekkoInvocationHandler: 

     @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        Object result;

        // 判断方法的类是否为指定的类,符合如下的类,执行本地调用,否则实行远程调用
        if (declaringClass.equals(PekkoBasedEndpoint.class)
                || declaringClass.equals(Object.class)
                || declaringClass.equals(RpcGateway.class)
                || declaringClass.equals(StartStoppable.class)
                || declaringClass.equals(MainThreadExecutable.class)
                || declaringClass.equals(RpcServer.class)) {
            result = method.invoke(this, args);
        } else if (declaringClass.equals(FencedRpcGateway.class)) {
            throw new UnsupportedOperationException(
                    "InvocationHandler does not support the call FencedRpcGateway#"
                            + method.getName()
                            + ". This indicates that you retrieved a FencedRpcGateway without specifying a "
                            + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to "
                            + "retrieve a properly FencedRpcGateway.");
        } else {
            //  客户端会进入这里,进行远程的调用
            result = invokeRpc(method, args);
        }

        return result;
    }

 

小结

这篇文章简单地分析了 Flink 内部的 RPC 框架。首先,通过 RpcService, RpcEndpoint, RpcGateway, RpcServer 等接口和抽象类,确定了 RPC 服务的基本框架;在这套框架的基础上, Flink 借助 Pekko 和动态代理等技术提供了 RPC 调用的具体实现。

 

参考资料:


目录