作为一个分布式系统,Flink 内部不同组件之间通信依赖于 RPC 机制。这篇文章将对 Flink 的 RPC 框架加以分析。
主要抽象※
RpcEndpoint 是对 RPC 框架中提供具体服务的实体的抽象,所有提供远程调用方法的组件都需要继承该抽象类。另外,对于同一个 RpcEndpoint 的所有 RPC 调用都会在同一个线程(RpcEndpoint 的“主线程”)中执行,因此无需担心并发执行的线程安全问题。
RpcGateway 接口是用于远程调用的代理接口。 RpcGateway 提供了获取其所代理的 RpcEndpoint 的地址的方法。在实现一个提供 RPC 调用的组件时,通常需要先定一个接口,该接口继承 RpcGateway 并约定好提供的远程调用的方法。
RpcService 是 RpcEndpoint 的运行时环境, RpcService 提供了启动 RpcEndpoint, 连接到远端 RpcEndpoint 并返回远端 RpcEndpoint 的代理对象等方法。此外, RpcService 还提供了某些异步任务或者周期性调度任务的方法。
RpcServer 相当于 RpcEndpoint 自身的的代理对象(self gateway)。RpcServer 是 RpcService 在启动了 RpcEndpoint 之后返回的对象,每一个 RpcEndpoint 对象内部都有一个 RpcServer 的成员变量,通过 getSelfGateway 方法就可以获得自身的代理,然后调用该Endpoint 提供的服务。
FencedRpcEndpoint 和 FencedRpcGateway要求在调用 RPC 方法时携带 token 信息,只有当调用方提供了 token 和 endpoint 的 token 一致时才允许调用。
源码分析※
RpcGateway :※
//Rpc网关接口,必须由Rpc网关实现。
//定义通信行为;用于远程调用RpcEndpoint的某些方法,可以理解为对方的客户端代理
public interface RpcGateway {
/**
* Returns the fully qualified address under which the associated rpc endpoint is reachable.
*
* @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
*/
String getAddress();
/**
* Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
*
* @return Fully qualified hostname under which the associated rpc endpoint is reachable
*/
String getHostname();
}
RpcEndpoint ※
protected RpcEndpoint(
RpcService rpcService, String endpointId, Map<String, String> loggingContext) {
// 保存rpcService和endpointId
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
// 通过 RpcService 启动RpcServer
/**
* 构造的时候调用 rpcService.startServer()启动RpcServer,进入可以接受处理请求的状态,最后将RpcServer绑定到主线程上
* 真正执行起来
* 在RpcEndpoint中还定义了一些放入如 runAsync(Runnable)、callAsync(Callable,Time)方法来执行Rpc调用,值得注意的是在Flink
* 的设计中,对于同一个Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动RpcEndpoint进行Rpc调用时,其会委托RpcServer进行处理
*/
this.rpcServer = rpcService.startServer(this, loggingContext);
this.resourceRegistry = new CloseableRegistry();
// 主线程执行器,所有调用在主线程中串行执行
this.mainThreadExecutor =
new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId);
//将mainThreadExecutor 资源注册到 CloseableRegistry上
registerResource(this.mainThreadExecutor);
}
PekkoRpcService:※
@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
C rpcEndpoint, Map<String, String> loggingContext) {
checkNotNull(rpcEndpoint, "rpc endpoint");
/**
* 根据RpcEndpoint的类型来创建对应的Actor,目前支持两种Actor的创建
* 1、PekkoRpcActor
* 2、FencedPekkoRpcActor,对PekkoRpcActor进行扩展,能够过滤到与指定token无关的消息
*/
final SupervisorActor.ActorRegistration actorRegistration =
registerRpcActor(rpcEndpoint, loggingContext);
// 这里拿到的是PekkoRpcActor的引用
final ActorRef actorRef = actorRegistration.getActorRef();
final CompletableFuture<Void> actorTerminationFuture =
actorRegistration.getTerminationFuture();
LOG.info(
"Starting RPC endpoint for {} at {} .",
rpcEndpoint.getClass().getName(),
actorRef.path());
final String address = PekkoUtils.getRpcURL(actorSystem, actorRef);
final String hostname;
Option<String> host = actorRef.path().address().host();
if (host.isEmpty()) {
hostname = "localhost";
} else {
hostname = host.get();
}
// 提取集成RpcEndpoint的所有子类
Set<Class<?>> implementedRpcGateways =
new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
implementedRpcGateways.add(RpcServer.class);
implementedRpcGateways.add(PekkoBasedEndpoint.class);
// 对上述指定的类集合进行代理
final InvocationHandler invocationHandler;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
// a FencedRpcEndpoint needs a FencedPekkoInvocationHandler
invocationHandler =
new FencedPekkoInvocationHandler<>(
address,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
configuration.isForceRpcInvocationSerialization(),
actorTerminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
captureAskCallstacks,
flinkClassLoader);
} else {
invocationHandler =
new PekkoInvocationHandler(
address,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
configuration.isForceRpcInvocationSerialization(),
actorTerminationFuture,
captureAskCallstacks,
flinkClassLoader);
}
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
//我们不是直接使用系统类加载器,而是从此类派生类加载器。
//在 Flink 嵌入运行并且所有 Flink 代码通过自定义 ClassLoader 动态加载(例如从 OSGI 包)的情况下,效果更好
ClassLoader classLoader = getClass().getClassLoader();
// 针对RpcServer生成一个动态代理
@SuppressWarnings("unchecked")
RpcServer server =
(RpcServer)
Proxy.newProxyInstance(
classLoader,
implementedRpcGateways.toArray(
new Class<?>[implementedRpcGateways.size()]),
invocationHandler);
return server;
}
@Override
public <C extends RpcGateway> CompletableFuture<C> connect(
final String address, final Class<C> clazz) {
// 连接远程Rpc Server,返回的是代理对象
return connectInternal(
address,
clazz,
(ActorRef actorRef) -> {
Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);
return new PekkoInvocationHandler(
addressHostname.f0,
addressHostname.f1,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
configuration.isForceRpcInvocationSerialization(),
null,
captureAskCallstacks,
flinkClassLoader);
});
}
private <C extends RpcGateway> CompletableFuture<C> connectInternal(
final String address,
final Class<C> clazz,
Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
checkState(!stopped, "RpcService is stopped");
LOG.debug(
"Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
address,
clazz.getName());
// 根据Pekko Actor地址获取ActorRef
final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
// 发送一个握手成功的消息给远程Actor
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
actorRefFuture.thenCompose(
(ActorRef actorRef) ->
ScalaFutureUtils.toJava(
Patterns.ask(
actorRef,
new RemoteHandshakeMessage(
clazz, getVersion()),
configuration.getTimeout().toMillis())
.<HandshakeSuccessMessage>mapTo(
ClassTag$.MODULE$
.<HandshakeSuccessMessage>apply(
HandshakeSuccessMessage
.class))));
// 创建动态代理,并返回
final CompletableFuture<C> gatewayFuture =
actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
// PekkoInvocationHandler,针对客户端会调用 invokeRpc
InvocationHandler invocationHandler =
invocationHandlerFactory.apply(actorRef);
// Rather than using the System ClassLoader directly, we derive the
// ClassLoader from this class.
// That works better in cases where Flink runs embedded and
// all Flink code is loaded dynamically
// (for example from an OSGI bundle) through a custom ClassLoader
//我们没有直接使用系统类加载器,而是从此类派生类加载器。
// 在 Flink 嵌入运行并且所有 Flink 代码通过自定义 ClassLoader 动态加载(例如从 OSGI 包)的情况下,效果更好
ClassLoader classLoader = getClass().getClassLoader();
// 创建动态代理
@SuppressWarnings("unchecked")
C proxy =
(C)
Proxy.newProxyInstance(
classLoader,
new Class<?>[] {clazz},
invocationHandler);
return proxy;
},
actorSystem.dispatcher());
return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}
PekkoRpcActor.java:※
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
//如果是 RemoteHandshakeMessage 信息,执行 handleHandshakeMessage,处理握手消息
.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
//如果是 ControlMessages,执行 handleControlMessage,如启动、停止、中断
.match(ControlMessages.class, this::handleControlMessage)
//其他情况,handleMessage
.matchAny(this::handleMessage)
.build();
}
PekkoInvocationHandler: ※
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> declaringClass = method.getDeclaringClass();
Object result;
// 判断方法的类是否为指定的类,符合如下的类,执行本地调用,否则实行远程调用
if (declaringClass.equals(PekkoBasedEndpoint.class)
|| declaringClass.equals(Object.class)
|| declaringClass.equals(RpcGateway.class)
|| declaringClass.equals(StartStoppable.class)
|| declaringClass.equals(MainThreadExecutable.class)
|| declaringClass.equals(RpcServer.class)) {
result = method.invoke(this, args);
} else if (declaringClass.equals(FencedRpcGateway.class)) {
throw new UnsupportedOperationException(
"InvocationHandler does not support the call FencedRpcGateway#"
+ method.getName()
+ ". This indicates that you retrieved a FencedRpcGateway without specifying a "
+ "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to "
+ "retrieve a properly FencedRpcGateway.");
} else {
// 客户端会进入这里,进行远程的调用
result = invokeRpc(method, args);
}
return result;
}
小结※
这篇文章简单地分析了 Flink 内部的 RPC 框架。首先,通过 RpcService, RpcEndpoint, RpcGateway, RpcServer 等接口和抽象类,确定了 RPC 服务的基本框架;在这套框架的基础上, Flink 借助 Pekko 和动态代理等技术提供了 RPC 调用的具体实现。
参考资料: