This article continues the analysis of what happens inside the Task constructor, and then walks through the StreamTask startup process.
Core objects
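ResultPartitionWriter: ResultPartitionWriter works with Buffers and sits at the lowest level of the data-transfer stack. Its subclass implementations contain a BufferPool component that supplies Buffer resources, as well as an array of subpartitions, ResultSubpartition[] subpartitions, which receives the data dispatched by the RecordWriter one level up.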
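RecordWriter: RecordWriter emits the data processed by a Task and mainly works with StreamRecord. It sits one level above ResultPartitionWriter: every RecordWriter instance holds one ResultPartitionWriter subclass instance. RecordWriter has two main subclasses, ChannelSelectorRecordWriter and BroadcastRecordWriter. ChannelSelectorRecordWriter carries the partitioner that decides, for each StreamRecord, the index of the element in the lower-level ResultPartitionWriter's subpartitions array that the record is written to; BroadcastRecordWriter writes every record to all subpartitions. RecordWriterDelegate is a delegate for RecordWriter and can wrap zero, one, or several RecordWriter objects.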
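The layering above can be illustrated with a toy model. It is not Flink's API: `ToyChannelSelector`, `ToyRecordWriter` and `ToySelectors` are hypothetical names. A list of lists stands in for ResultSubpartition[], a selector picks one subpartition index per record, and a null selector means broadcast.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a channel selector: maps a record to a subpartition index.
interface ToyChannelSelector<T> {
    int selectChannel(T record, int numberOfChannels);
}

// Hypothetical writer: "subpartitions" plays the role of ResultSubpartition[] one level down.
class ToyRecordWriter<T> {
    final List<List<T>> subpartitions = new ArrayList<>();
    private final ToyChannelSelector<T> selector; // null means broadcast

    ToyRecordWriter(int numberOfSubpartitions, ToyChannelSelector<T> selector) {
        for (int i = 0; i < numberOfSubpartitions; i++) {
            subpartitions.add(new ArrayList<>());
        }
        this.selector = selector;
    }

    void emit(T record) {
        if (selector == null) {
            // broadcast: every subpartition receives the record
            for (List<T> subpartition : subpartitions) {
                subpartition.add(record);
            }
        } else {
            // selector-based: exactly one subpartition receives the record
            int index = selector.selectChannel(record, subpartitions.size());
            subpartitions.get(index).add(record);
        }
    }
}

class ToySelectors {
    // Hash-based selection, a simplified stand-in for a keyed partitioner.
    static ToyChannelSelector<String> byHash() {
        return (record, numberOfChannels) -> Math.floorMod(record.hashCode(), numberOfChannels);
    }
}
```

With a hash-based selector, equal keys always land in the same subpartition; the broadcast writer copies each record to all of them.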
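Output: the Output interface extends the Collector interface, which is the type of the Collector parameter in API methods such as flatMap. Output is the abstraction an operator uses to pass data downstream, emitting stream elements such as StreamRecord, Watermark and LatencyMarker. Its main implementations:
- RecordWriterOutput handles output that goes over the network stack.
- ChainingOutput passes data between operators inside an operator chain.
- BroadcastingOutputCollector holds a group of Outputs and forwards every element to all of them, i.e. to all downstream consumers.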
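The three Output flavors differ mainly in where a record goes next. The sketch below is a hypothetical, heavily simplified model: `ToyOutput`, `ToyChainingOutput`, `ToyBroadcastingOutput` and `ToyNetworkOutput` are not Flink classes. Chaining is a plain method call into the next operator, broadcasting is a loop over several outputs, and the network output only records what would be handed to a RecordWriter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplified Output: an operator pushes its results into it.
interface ToyOutput<T> {
    void collect(T record);
}

// Inside a chain: call the next operator directly, on the same thread,
// with no serialization or buffering.
class ToyChainingOutput<T, R> implements ToyOutput<T> {
    private final Function<T, R> nextOperator;
    private final ToyOutput<R> nextOutput;

    ToyChainingOutput(Function<T, R> nextOperator, ToyOutput<R> nextOutput) {
        this.nextOperator = nextOperator;
        this.nextOutput = nextOutput;
    }

    @Override
    public void collect(T record) {
        nextOutput.collect(nextOperator.apply(record));
    }
}

// Fans each record out to every wrapped output.
class ToyBroadcastingOutput<T> implements ToyOutput<T> {
    private final List<ToyOutput<T>> outputs;

    ToyBroadcastingOutput(List<ToyOutput<T>> outputs) {
        this.outputs = outputs;
    }

    @Override
    public void collect(T record) {
        for (ToyOutput<T> output : outputs) {
            output.collect(record);
        }
    }
}

// Stand-in for RecordWriterOutput: only remembers what would go to a RecordWriter.
class ToyNetworkOutput<T> implements ToyOutput<T> {
    final List<T> sent = new ArrayList<>();

    @Override
    public void collect(T record) {
        sent.add(record);
    }
}
```

When a chaining output feeds a broadcasting collector, one collect() call on the first operator reaches every network output on the same thread.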
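Data transfer: Flink's data transfer falls into three categories:
- Between operators inside an operator chain. All operators of the chain run in the same local thread and call each other's processElement() method in a chained fashion.
- Between local threads. When Tasks with an upstream/downstream relationship are deployed to the same TaskManager, LocalInputChannel acts as the intermediary, and Java's wait()/notifyAll() mechanism is used to wait for upstream data and to wake up the consumer once that data is available.
- Across the network. RemoteInputChannel, together with Netty's request/response mechanism, transfers data between machines.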
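The second category relies on the classic Java monitor pattern. The sketch below assumes one producer and one consumer in a single JVM. `ToyLocalChannel` is hypothetical and only illustrates wait()/notifyAll(); it does not reproduce how LocalInputChannel actually handles buffers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory channel between two threads, coordinated with wait()/notifyAll().
class ToyLocalChannel {
    private final ArrayDeque<Integer> buffers = new ArrayDeque<>();
    private boolean finished = false;

    synchronized void add(int value) {
        buffers.add(value);
        notifyAll(); // wake up a consumer blocked in poll()
    }

    synchronized void finish() {
        finished = true;
        notifyAll();
    }

    // Blocks until data is available; returns null once finished and drained.
    synchronized Integer poll() throws InterruptedException {
        while (buffers.isEmpty() && !finished) {
            wait(); // releases the monitor while waiting
        }
        return buffers.poll();
    }

    static List<Integer> runDemo(int n) {
        ToyLocalChannel channel = new ToyLocalChannel();
        List<Integer> received = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                Integer value;
                while ((value = channel.poll()) != null) {
                    received.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < n; i++) {
            channel.add(i);
        }
        channel.finish();
        try {
            consumer.join(); // join() also makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

The consumer waits inside a while loop rather than an if because that is the standard Object.wait() idiom, which guards against spurious wakeups.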
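StreamInputProcessor (input processor): StreamInputProcessor is the abstraction StreamTask uses to read data. It reads the data, has it processed, and passes the results downstream. Its subclass implementations usually contain two important components, StreamTaskInput and DataOutput, which together carry out reading, processing and emitting:
- StreamTaskInput (Task input): the abstraction of a StreamTask's data input. It contains an InputGate, which reads data from upstream Tasks.
- DataOutput (Task output): its implementation is StreamTaskNetworkOutput, which only hands data to the operator for processing. Writing out the operator's results is done by the Output described above.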
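Here is a sketch of how the three roles fit together. It is a hypothetical model: `ToyTaskInput`, `ToyDataOutput`, `ToyInputProcessor` and `ToyInputStatus` are made up. The input pulls one element and pushes it into the DataOutput, and processInput() reports whether more input is available.

```java
import java.util.Iterator;

// Hypothetical status returned after each processing step.
enum ToyInputStatus { MORE_AVAILABLE, END_OF_INPUT }

// Plays the role of DataOutput: only hands the element over to the operator.
interface ToyDataOutput<T> {
    void emitRecord(T record);
}

// Plays the role of StreamTaskInput: reads one element (from an InputGate in Flink)
// and pushes it into the DataOutput.
class ToyTaskInput<T> {
    private final Iterator<T> gate;

    ToyTaskInput(Iterator<T> gate) {
        this.gate = gate;
    }

    ToyInputStatus emitNext(ToyDataOutput<T> output) {
        if (!gate.hasNext()) {
            return ToyInputStatus.END_OF_INPUT;
        }
        output.emitRecord(gate.next());
        return gate.hasNext() ? ToyInputStatus.MORE_AVAILABLE : ToyInputStatus.END_OF_INPUT;
    }
}

// Plays the role of a one-input StreamInputProcessor: wires the input to the output.
class ToyInputProcessor<T> {
    private final ToyTaskInput<T> input;
    private final ToyDataOutput<T> output;

    ToyInputProcessor(ToyTaskInput<T> input, ToyDataOutput<T> output) {
        this.input = input;
        this.output = output;
    }

    ToyInputStatus processInput() {
        return input.emitNext(output);
    }
}
```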
Task construction analysis
Creating the ResultPartitionWriter[] partitionWriters member
org.apache.flink.runtime.taskmanager.Task#Task
public Task(
JobInformation jobInformation,
TaskInformation taskInformation,
ExecutionAttemptID executionAttemptID,
AllocationID slotAllocationId,
List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
MemoryManager memManager,
SharedResources sharedResources,
IOManager ioManager,
ShuffleEnvironment<?, ?> shuffleEnvironment,
KvStateService kvStateService,
BroadcastVariableManager bcVarManager,
TaskEventDispatcher taskEventDispatcher,
ExternalResourceInfoProvider externalResourceInfoProvider,
TaskStateManager taskStateManager,
TaskManagerActions taskManagerActions,
InputSplitProvider inputSplitProvider,
CheckpointResponder checkpointResponder,
TaskOperatorEventGateway operatorCoordinatorEventGateway,
GlobalAggregateManager aggregateManager,
LibraryCacheManager.ClassLoaderHandle classLoaderHandle,
FileCache fileCache,
TaskManagerRuntimeInfo taskManagerConfig,
@Nonnull TaskMetricGroup metricGroup,
PartitionProducerStateChecker partitionProducerStateChecker,
Executor executor,
ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory) {
// other parts omitted
// produced intermediate result partitions
// the Task's outputs
final ResultPartitionWriter[] resultPartitionWriters =
shuffleEnvironment
// creates the partitionWriters member variable;
// its arguments include the ResultPartitionDeploymentDescriptor instances created during Task submission, i.e. the Task's output information.
.createResultPartitionWriters(
taskShuffleContext, resultPartitionDeploymentDescriptors)
.toArray(new ResultPartitionWriter[] {});
this.partitionWriters = resultPartitionWriters;
// other parts omitted
}
org.apache.flink.runtime.io.network.NettyShuffleEnvironment#createResultPartitionWriters
public List<ResultPartition> createResultPartitionWriters(
ShuffleIOOwnerContext ownerContext,
List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors) {
synchronized (lock) {
Preconditions.checkState(
!isClosed, "The NettyShuffleEnvironment has already been shut down.");
ResultPartition[] resultPartitions =
new ResultPartition[resultPartitionDeploymentDescriptors.size()];
for (int partitionIndex = 0;
partitionIndex < resultPartitions.length;
partitionIndex++) {
resultPartitions[partitionIndex] =
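// the length of the ResultPartition list is the number of downstream operators consuming this Task's data (note: distinct operators, not different parallel instances of the same operator).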
resultPartitionFactory.create(
ownerContext.getOwnerName(),
partitionIndex,
resultPartitionDeploymentDescriptors.get(partitionIndex));
}
// register metrics
registerOutputMetrics(
config.isNetworkDetailedMetrics(),
ownerContext.getOutputGroup(),
resultPartitions);
return Arrays.asList(resultPartitions);
}
}
org.apache.flink.runtime.io.network.partition.ResultPartitionFactory#create
public ResultPartition create(
String taskNameWithSubtaskAndId,
int partitionIndex,
ResultPartitionDeploymentDescriptor desc) {
return create(
taskNameWithSubtaskAndId,
partitionIndex,
desc.getShuffleDescriptor().getResultPartitionID(),
desc.getPartitionType(),
desc.getTotalNumberOfPartitions(),
// number of ExecutionVertex instances in a downstream ExecutionJobVertex that consume this Task instance's data
desc.getNumberOfSubpartitions(),
desc.getMaxParallelism(),
// whether the intermediate result is a broadcast result
desc.isBroadcast(),
desc.getShuffleDescriptor(),
// create the BufferPool factory
createBufferPoolFactory(
// how many parallel instances of the same downstream operator consume this Task's data
desc.getNumberOfSubpartitions(), desc.getPartitionType()),
desc.isNumberOfPartitionConsumerUndefined());
}
org.apache.flink.runtime.io.network.partition.ResultPartitionFactory#create
public ResultPartition create(
String taskNameWithSubtaskAndId,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
int numberOfPartitions,
int numberOfSubpartitions,
int maxParallelism,
boolean isBroadcast,
ShuffleDescriptor shuffleDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
boolean isNumberOfPartitionConsumerUndefined) {
BufferCompressor bufferCompressor = null;
if (type.supportCompression() && batchShuffleCompressionEnabled) {
bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
}
if (tieredStorage.isPresent() && type == ResultPartitionType.BLOCKING) {
LOG.warn(
"When enabling tiered storage, the BLOCKING result partition will be replaced as HYBRID_FULL.");
type = ResultPartitionType.HYBRID_FULL;
}
ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
final ResultPartition partition;
if (type == ResultPartitionType.PIPELINED
|| type == ResultPartitionType.PIPELINED_BOUNDED
|| type == ResultPartitionType.PIPELINED_APPROXIMATE) {
// first create the ResultPartition instance that belongs to this Task
final PipelinedResultPartition pipelinedPartition =
new PipelinedResultPartition(
taskNameWithSubtaskAndId,
partitionIndex,
id,
type,
subpartitions,
maxParallelism,
partitionManager,
bufferCompressor,
bufferPoolFactory);
for (int i = 0; i < subpartitions.length; i++) {
if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
subpartitions[i] =
new PipelinedApproximateSubpartition(
i, configuredNetworkBuffersPerChannel, pipelinedPartition);
} else {
// fill the ResultSubpartition array according to numberOfSubpartitions;
// this subpartitions array is a member variable of the ResultPartition instance.
subpartitions[i] =
new PipelinedSubpartition(
i, configuredNetworkBuffersPerChannel, pipelinedPartition);
}
}
partition = pipelinedPartition;
}
// other parts omitted
partition.isNumberOfPartitionConsumerUndefined(isNumberOfPartitionConsumerUndefined);
LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
return partition;
}
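The resulting producer-side shape can be summarized with a hypothetical model (`ToyResultPartition` is not a Flink class): one partition per ResultPartitionDeploymentDescriptor, each holding numberOfSubpartitions subpartitions. For instance, assuming all-to-all connections to two downstream vertices with parallelism 4 and 2, the Task would end up with two partitions holding 4 and 2 subpartitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one partition per deployment descriptor,
// each with numberOfSubpartitions subpartitions.
class ToyResultPartition {
    final int partitionIndex;
    final List<List<byte[]>> subpartitions = new ArrayList<>();

    ToyResultPartition(int partitionIndex, int numberOfSubpartitions) {
        this.partitionIndex = partitionIndex;
        for (int i = 0; i < numberOfSubpartitions; i++) {
            subpartitions.add(new ArrayList<>()); // stand-in for PipelinedSubpartition i
        }
    }

    // Entry k mirrors desc.getNumberOfSubpartitions() of the k-th descriptor.
    static List<ToyResultPartition> createAll(List<Integer> numberOfSubpartitionsPerDescriptor) {
        List<ToyResultPartition> partitions = new ArrayList<>();
        for (int index = 0; index < numberOfSubpartitionsPerDescriptor.size(); index++) {
            partitions.add(new ToyResultPartition(index, numberOfSubpartitionsPerDescriptor.get(index)));
        }
        return partitions;
    }
}
```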
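That completes the creation of the Task instance's partitionWriters member. Next comes the inputGates member.
Creating the IndexedInputGate[] inputGates member
The createInputGates(...) method is the entry point for creating inputGates.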
org.apache.flink.runtime.taskmanager.Task#Task
public Task(
JobInformation jobInformation,
TaskInformation taskInformation,
ExecutionAttemptID executionAttemptID,
AllocationID slotAllocationId,
List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
MemoryManager memManager,
SharedResources sharedResources,
IOManager ioManager,
ShuffleEnvironment<?, ?> shuffleEnvironment,
KvStateService kvStateService,
BroadcastVariableManager bcVarManager,
TaskEventDispatcher taskEventDispatcher,
ExternalResourceInfoProvider externalResourceInfoProvider,
TaskStateManager taskStateManager,
TaskManagerActions taskManagerActions,
InputSplitProvider inputSplitProvider,
CheckpointResponder checkpointResponder,
TaskOperatorEventGateway operatorCoordinatorEventGateway,
GlobalAggregateManager aggregateManager,
LibraryCacheManager.ClassLoaderHandle classLoaderHandle,
FileCache fileCache,
TaskManagerRuntimeInfo taskManagerConfig,
@Nonnull TaskMetricGroup metricGroup,
PartitionProducerStateChecker partitionProducerStateChecker,
Executor executor,
ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory) {
// other parts omitted
// consumed intermediate result partitions
final IndexedInputGate[] gates =
shuffleEnvironment
// entry point for creating inputGates; its arguments include the InputGateDeploymentDescriptor instances created during Task submission, i.e. the Task's input information
.createInputGates(taskShuffleContext, this, inputGateDeploymentDescriptors)
.toArray(new IndexedInputGate[0]);
// the Task's inputs
this.inputGates = new IndexedInputGate[gates.length];
int counter = 0;
for (IndexedInputGate gate : gates) {
inputGates[counter++] =
new InputGateWithMetrics(
gate, metrics.getIOMetricGroup().getNumBytesInCounter());
}
}
org.apache.flink.runtime.io.network.NettyShuffleEnvironment#createInputGates
public List<SingleInputGate> createInputGates(
ShuffleIOOwnerContext ownerContext,
PartitionProducerStateProvider partitionProducerStateProvider,
// the number of InputGateDeploymentDescriptors is the number of upstream operators whose data this Task consumes
List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
synchronized (lock) {
Preconditions.checkState(
!isClosed, "The NettyShuffleEnvironment has already been shut down.");
MetricGroup networkInputGroup = ownerContext.getInputGroup();
InputChannelMetrics inputChannelMetrics =
new InputChannelMetrics(networkInputGroup, ownerContext.getParentGroup());
SingleInputGate[] inputGates =
new SingleInputGate[inputGateDeploymentDescriptors.size()];
for (int gateIndex = 0; gateIndex < inputGates.length; gateIndex++) {
final InputGateDeploymentDescriptor igdd =
inputGateDeploymentDescriptors.get(gateIndex);
// a SingleInputGate has an important member, bufferPoolFactory, which creates the Buffer space for the InputGate's data
SingleInputGate inputGate =
// one InputGate instance is created per InputGateDeploymentDescriptor
singleInputGateFactory.create(
ownerContext,
gateIndex,
igdd,
partitionProducerStateProvider,
inputChannelMetrics);
InputGateID id =
new InputGateID(
igdd.getConsumedResultId(), ownerContext.getExecutionAttemptID());
Set<SingleInputGate> inputGateSet =
inputGatesById.computeIfAbsent(
id, ignored -> ConcurrentHashMap.newKeySet());
inputGateSet.add(inputGate);
inputGatesById.put(id, inputGateSet);
inputGate
.getCloseFuture()
.thenRun(
() ->
inputGatesById.computeIfPresent(
id,
(key, value) -> {
value.remove(inputGate);
if (value.isEmpty()) {
return null;
}
return value;
}));
inputGates[gateIndex] = inputGate;
}
if (config.getDebloatConfiguration().isEnabled()) {
registerDebloatingTaskMetrics(inputGates, ownerContext.getParentGroup());
}
registerInputMetrics(config.isNetworkDetailedMetrics(), networkInputGroup, inputGates);
return Arrays.asList(inputGates);
}
}
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory#create
// Creates an input gate and all of its input channels.
public SingleInputGate create(
@Nonnull ShuffleIOOwnerContext owner,
int gateIndex,
@Nonnull InputGateDeploymentDescriptor igdd,
@Nonnull PartitionProducerStateProvider partitionProducerStateProvider,
@Nonnull InputChannelMetrics metrics) {
// This variable describes whether it is supported to have one input channel consume
// multiple subpartitions, if multiple subpartitions from the same partition are
// assigned to this input gate.
//
// For now this function is only supported in the new mode of Hybrid Shuffle.
boolean isSharedInputChannelSupported =
igdd.getConsumedPartitionType().isHybridResultPartition()
&& tieredStorageConfiguration != null;
// buffer specification of the InputGate
GateBuffersSpec gateBuffersSpec =
createGateBuffersSpec(
maxRequiredBuffersPerGate,
configuredNetworkBuffersPerChannel,
floatingNetworkBuffersPerGate,
igdd.getConsumedPartitionType(),
// compute the number of channels
calculateNumChannels(
igdd.getShuffleDescriptors().length,
// for streaming jobs the consumed subpartition index range usually has size 1
igdd.getConsumedSubpartitionIndexRange().size(),
isSharedInputChannelSupported),
tieredStorageConfiguration != null);
SupplierWithException<BufferPool, IOException> bufferPoolFactory =
// tries to create a buffer pool that is guaranteed to provide at least the required number of buffers
createBufferPoolFactory(
networkBufferPool,
gateBuffersSpec.getRequiredFloatingBuffers(),
gateBuffersSpec.getTotalFloatingBuffers());
BufferDecompressor bufferDecompressor = null;
if (igdd.getConsumedPartitionType().supportCompression()
&& batchShuffleCompressionEnabled) {
// create a decompressor for compressed Buffers
bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec);
}
final String owningTaskName = owner.getOwnerName();
final MetricGroup networkInputGroup = owner.getInputGroup();
ResultSubpartitionIndexSet subpartitionIndexSet =
new ResultSubpartitionIndexSet(igdd.getConsumedSubpartitionIndexRange());
// create the SingleInputGate instance
SingleInputGate inputGate =
new SingleInputGate(
owningTaskName,
gateIndex,
igdd.getConsumedResultId(),
igdd.getConsumedPartitionType(),
calculateNumChannels(
igdd.getShuffleDescriptors().length,
subpartitionIndexSet.size(),
isSharedInputChannelSupported),
partitionProducerStateProvider,
bufferPoolFactory,
bufferDecompressor,
networkBufferPool,
networkBufferSize,
new ThroughputCalculator(SystemClock.getInstance()),
maybeCreateBufferDebloater(
owningTaskName, gateIndex, networkInputGroup.addGroup(gateIndex)));
// create the InputChannel array of this SingleInputGate, and the tiered storage service
createInputChannelsAndTieredStorageService(
owningTaskName,
igdd,
inputGate,
subpartitionIndexSet,
gateBuffersSpec,
metrics,
isSharedInputChannelSupported);
return inputGate;
}
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory#createInputChannelsAndTieredStorageService
private void createInputChannelsAndTieredStorageService(
String owningTaskName,
InputGateDeploymentDescriptor inputGateDeploymentDescriptor,
SingleInputGate inputGate,
ResultSubpartitionIndexSet subpartitionIndexSet,
GateBuffersSpec gateBuffersSpec,
InputChannelMetrics metrics,
boolean isSharedInputChannelSupported) {
ShuffleDescriptor[] shuffleDescriptors =
inputGateDeploymentDescriptor.getShuffleDescriptors();
// Create the input channels. There is one input channel for each consumed subpartition.
InputChannel[] inputChannels =
new InputChannel
[calculateNumChannels(
shuffleDescriptors.length,
subpartitionIndexSet.size(),
isSharedInputChannelSupported)];
ChannelStatistics channelStatistics = new ChannelStatistics();
int channelIdx = 0;
final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs = new ArrayList<>();
List<List<TierShuffleDescriptor>> tierShuffleDescriptors = new ArrayList<>();
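// an operator usually has several parallel instances, and the Task creates an InputChannel for each instance of the upstream operator.
// A Task consumes several instances of the upstream operator and is linked to each of them through an InputChannel.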
for (ShuffleDescriptor descriptor : shuffleDescriptors) {
TieredStoragePartitionId partitionId =
TieredStorageIdMappingUtils.convertId(descriptor.getResultPartitionID());
if (isSharedInputChannelSupported) {
inputChannels[channelIdx] =
createInputChannel(
inputGate,
channelIdx,
gateBuffersSpec.getEffectiveExclusiveBuffersPerChannel(),
descriptor,
subpartitionIndexSet,
channelStatistics,
metrics);
if (tieredStorageConfiguration != null) {
addTierShuffleDescriptors(tierShuffleDescriptors, descriptor);
tieredStorageConsumerSpecs.add(
new TieredStorageConsumerSpec(
inputGate.getInputGateIndex(),
partitionId,
new TieredStorageInputChannelId(channelIdx),
subpartitionIndexSet));
}
channelIdx++;
} else {
for (int subpartitionIndex : subpartitionIndexSet.values()) {
inputChannels[channelIdx] =
createInputChannel(
inputGate,
channelIdx,
gateBuffersSpec.getEffectiveExclusiveBuffersPerChannel(),
descriptor,
new ResultSubpartitionIndexSet(subpartitionIndex),
channelStatistics,
metrics);
if (tieredStorageConfiguration != null) {
addTierShuffleDescriptors(tierShuffleDescriptors, descriptor);
tieredStorageConsumerSpecs.add(
new TieredStorageConsumerSpec(
inputGate.getInputGateIndex(),
partitionId,
new TieredStorageInputChannelId(channelIdx),
new ResultSubpartitionIndexSet(subpartitionIndex)));
}
channelIdx++;
}
}
}
inputGate.setInputChannels(inputChannels);
if (tieredStorageConfiguration != null) {
TieredStorageConsumerClient tieredStorageConsumerClient =
new TieredStorageConsumerClient(
tieredStorageConfiguration.getTierFactories(),
tieredStorageConsumerSpecs,
tierShuffleDescriptors,
tieredStorageNettyService);
inputGate.setTieredStorageService(
tieredStorageConsumerSpecs,
tieredStorageConsumerClient,
tieredStorageNettyService);
}
LOG.debug(
"{}: Created {} input channels ({}).",
owningTaskName,
inputChannels.length,
channelStatistics);
}
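The channel count produced by this loop can be restated in a few lines. This is a hypothetical sketch mirroring the two branches (`ToyChannelLayout` is not Flink code): without shared channels there is one channel per (upstream partition, subpartition) pair, and with shared channels there is one per upstream partition. A streaming job usually consumes a subpartition range of size 1, so a gate reading from an upstream operator with parallelism 3 typically gets 3 channels.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical restatement of the channel layout; each entry reads "descriptorIndex:subpartitions".
class ToyChannelLayout {
    static List<String> layout(int numShuffleDescriptors, int[] subpartitionIndexes, boolean shared) {
        List<String> channels = new ArrayList<>();
        for (int d = 0; d < numShuffleDescriptors; d++) {
            if (shared) {
                // one channel consumes all assigned subpartitions of this upstream partition
                channels.add(d + ":" + Arrays.toString(subpartitionIndexes));
            } else {
                // one channel per consumed subpartition
                for (int subpartition : subpartitionIndexes) {
                    channels.add(d + ":[" + subpartition + "]");
                }
            }
        }
        return channels;
    }
}
```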
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory#createInputChannel
private InputChannel createInputChannel(
SingleInputGate inputGate,
int index,
int buffersPerChannel,
ShuffleDescriptor shuffleDescriptor,
ResultSubpartitionIndexSet subpartitionIndexSet,
ChannelStatistics channelStatistics,
InputChannelMetrics metrics) {
// applies different functions to known and unknown ShuffleDescriptors,
// and casts the known ShuffleDescriptor
return applyWithShuffleTypeCheck(
NettyShuffleDescriptor.class,
shuffleDescriptor,
unknownShuffleDescriptor -> {
channelStatistics.numUnknownChannels++;
// create an unknown input channel
return new UnknownInputChannel(
inputGate,
index,
unknownShuffleDescriptor.getResultPartitionID(),
subpartitionIndexSet,
partitionManager,
taskEventPublisher,
connectionManager,
partitionRequestInitialBackoff,
partitionRequestMaxBackoff,
partitionRequestListenerTimeout,
buffersPerChannel,
metrics);
},
nettyShuffleDescriptor ->
// create a known input channel
createKnownInputChannel(
inputGate,
index,
buffersPerChannel,
nettyShuffleDescriptor,
subpartitionIndexSet,
channelStatistics,
metrics));
}
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory#createKnownInputChannel
@VisibleForTesting
protected InputChannel createKnownInputChannel(
SingleInputGate inputGate,
int index,
int buffersPerChannel,
NettyShuffleDescriptor inputChannelDescriptor,
ResultSubpartitionIndexSet subpartitionIndexSet,
ChannelStatistics channelStatistics,
InputChannelMetrics metrics) {
ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID();
// depending on whether the upstream and downstream Task instances run on the same TaskManager, create a LocalRecoveredInputChannel or a RemoteRecoveredInputChannel
if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
// Consuming task is deployed to the same TaskManager as the partition => local
channelStatistics.numLocalChannels++;
return new LocalRecoveredInputChannel(
inputGate,
index,
partitionId,
subpartitionIndexSet,
partitionManager,
taskEventPublisher,
partitionRequestInitialBackoff,
partitionRequestMaxBackoff,
buffersPerChannel,
metrics);
} else {
// Different instances => remote
channelStatistics.numRemoteChannels++;
return new RemoteRecoveredInputChannel(
inputGate,
index,
partitionId,
subpartitionIndexSet,
inputChannelDescriptor.getConnectionId(),
connectionManager,
partitionRequestInitialBackoff,
partitionRequestMaxBackoff,
partitionRequestListenerTimeout,
buffersPerChannel,
metrics);
}
}
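The branching in createInputChannel and createKnownInputChannel reduces to a three-way decision. In this hypothetical sketch (`ToyChannelChooser` and `ToyChannelKind` are made up), string IDs stand in for ResourceID, and a null producer location stands in for an UnknownShuffleDescriptor. The sketch also ignores that the Recovered* channel types are used while channel state is restored and are converted to regular local or remote channels afterwards.

```java
// Hypothetical kinds of input channel.
enum ToyChannelKind { UNKNOWN, LOCAL, REMOTE }

class ToyChannelChooser {
    static ToyChannelKind choose(String producerLocation, String consumerLocation) {
        if (producerLocation == null) {
            return ToyChannelKind.UNKNOWN; // producer location not known yet, resolved later
        }
        return producerLocation.equals(consumerLocation)
                ? ToyChannelKind.LOCAL   // same TaskManager: read the partition in memory
                : ToyChannelKind.REMOTE; // different TaskManager: request it over the network
    }

    // Counts channel kinds, similar in spirit to ChannelStatistics.
    static int[] stats(String[] producerLocations, String consumerLocation) {
        int[] counts = new int[ToyChannelKind.values().length];
        for (String producer : producerLocations) {
            counts[choose(producer, consumerLocation).ordinal()]++;
        }
        return counts;
    }
}
```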
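That completes the creation of the inputs and outputs during Task instance initialization. Next is the construction of StreamTask.
StreamTask construction analysis
The Task class implements the Runnable interface. After the Task instance has been created, TaskExecutor.submitTask(...) starts the Task's thread, which runs Task.run() and begins the Task's execution.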
org.apache.flink.runtime.taskmanager.Task#run
@Override
public void run() {
try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
doRun();
} finally {
terminationFuture.complete(executionState);
}
}
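Task implements Runnable and runs on its own thread, so callers learn that it has ended through terminationFuture. The hypothetical skeleton below shows why completing the future in a finally block matters: anyone waiting on it is released whether doRun() succeeds or fails. `ToyTask` is not Flink's Task, and the string states stand in for ExecutionState.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical skeleton echoing the shape of Task#run.
class ToyTask implements Runnable {
    final CompletableFuture<String> terminationFuture = new CompletableFuture<>();
    private final boolean fail;
    private volatile String executionState = "CREATED";

    ToyTask(boolean fail) {
        this.fail = fail;
    }

    @Override
    public void run() {
        try {
            doRun();
        } finally {
            // always complete, so waiters are released on success and on failure
            terminationFuture.complete(executionState);
        }
    }

    private void doRun() {
        try {
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            executionState = "FINISHED";
        } catch (RuntimeException e) {
            executionState = "FAILED"; // the real Task also records the failure cause
        }
    }
}
```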
org.apache.flink.runtime.taskmanager.Task#doRun
private void doRun() {
// other parts omitted
// mainly sets up the BufferPools of the ResultPartitions and InputGates
setupPartitionsAndGates(partitionWriters, inputGates);
for (ResultPartitionWriter partitionWriter : partitionWriters) {
taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
}
// When constructing invokable, separate threads can be constructed and thus should be
// monitored for system exit (in addition to invoking thread itself monitored below).
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
try {
// now load and instantiate the task's invokable code
invokable =
// the invokable instantiated here via reflection is the StreamTask instance
loadAndInstantiateInvokable(
userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
} finally {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// restore, then invoke
restoreAndInvoke(invokable);
// 忽略其他
}
org.apache.flink.runtime.taskmanager.Task#setupPartitionsAndGates
public static void setupPartitionsAndGates(
ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {
for (ResultPartitionWriter partition : producedPartitions) {
// creates the LocalBufferPool
partition.setup();
}
// InputGates must be initialized after the partitions, since during InputGate#setup
// we are requesting partitions
for (InputGate gate : inputGates) {
gate.setup();
}
}
org.apache.flink.runtime.io.network.partition.ResultPartition#setup
public void setup() throws IOException {
checkState(
this.bufferPool == null,
"Bug in result partition setup logic: Already registered buffer pool.");
// create the bufferPool
// via the factory created by org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.createBufferPoolFactory
this.bufferPool = checkNotNull(bufferPoolFactory.get());
// subclass-specific setup
setupInternal();
// register the result partition
partitionManager.registerResultPartition(this);
}
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#setup
public void setup() throws IOException {
checkState(
this.bufferPool == null,
"Bug in input gate setup logic: Already registered buffer pool.");
// calls org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(int, int)
BufferPool bufferPool = bufferPoolFactory.get();
setBufferPool(bufferPool);
if (tieredStorageConsumerClient != null) {
tieredStorageConsumerClient.setup(bufferPool);
}
// assigns exclusive buffers directly to all remote input channels for credit-based mode
setupChannels();
}
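Both setup() methods follow the same pattern. The constructor receives only a buffer-pool factory, and the pool itself is created later, exactly once, in setup(), which is called from Task#doRun on the task thread. The hypothetical sketch below shows that pattern. `ToyGate` and `ToyBufferPool` are made up, and java.util.function.Supplier replaces Flink's SupplierWithException.

```java
import java.util.function.Supplier;

// Hypothetical pool: only records its size bounds.
class ToyBufferPool {
    final int minBuffers;
    final int maxBuffers;

    ToyBufferPool(int minBuffers, int maxBuffers) {
        this.minBuffers = minBuffers;
        this.maxBuffers = maxBuffers;
    }
}

// Hypothetical gate: stores the factory at construction, creates the pool in setup().
class ToyGate {
    private final Supplier<ToyBufferPool> bufferPoolFactory;
    private ToyBufferPool bufferPool;

    ToyGate(Supplier<ToyBufferPool> bufferPoolFactory) {
        this.bufferPoolFactory = bufferPoolFactory; // nothing allocated yet
    }

    void setup() {
        if (bufferPool != null) {
            throw new IllegalStateException("Already registered buffer pool.");
        }
        bufferPool = bufferPoolFactory.get(); // allocation happens here, once
    }

    ToyBufferPool getBufferPool() {
        return bufferPool;
    }
}
```

Under this model, constructing the object does not yet take any buffers. They are requested only when setup() runs.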
org.apache.flink.runtime.taskmanager.Task#loadAndInstantiateInvokable
private static TaskInvokable loadAndInstantiateInvokable(
ClassLoader classLoader, String className, Environment environment) throws Throwable {
final Class<? extends TaskInvokable> invokableClass;
try {
invokableClass =
Class.forName(className, true, classLoader).asSubclass(TaskInvokable.class);
} catch (Throwable t) {
throw new Exception("Could not load the task's invokable class.", t);
}
Constructor<? extends TaskInvokable> statelessCtor;
try {
// get the StreamTask constructor whose only parameter is an Environment
statelessCtor = invokableClass.getConstructor(Environment.class);
} catch (NoSuchMethodException ee) {
throw new FlinkException("Task misses proper constructor", ee);
}
// instantiate the class
try {
//noinspection ConstantConditions --> cannot happen
// create the StreamTask instance
return statelessCtor.newInstance(environment);
} catch (InvocationTargetException e) {
// directly forward exceptions from the eager initialization
throw e.getTargetException();
} catch (Exception e) {
throw new FlinkException("Could not instantiate the task's invokable class.", e);
}
}
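The reflective instantiation above uses only standard java.lang.reflect calls. Below is a hypothetical, self-contained version: `ToyEnvironment`, `ToyInvokable`, `ToyStreamTask` and `ToyLoader` stand in for Environment, TaskInvokable, StreamTask and Task#loadAndInstantiateInvokable.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

// Hypothetical stand-in for Environment.
class ToyEnvironment {
    final String taskName;

    ToyEnvironment(String taskName) {
        this.taskName = taskName;
    }
}

// Hypothetical stand-in for TaskInvokable.
interface ToyInvokable {
    String describe();
}

class ToyStreamTask implements ToyInvokable {
    private final ToyEnvironment env;

    public ToyStreamTask(ToyEnvironment env) { // the single-argument constructor looked up below
        this.env = env;
    }

    @Override
    public String describe() {
        return "ToyStreamTask for " + env.taskName;
    }
}

class ToyLoader {
    // Same steps: load by name, find the constructor, unwrap constructor exceptions.
    static ToyInvokable load(ClassLoader cl, String className, ToyEnvironment env) throws Throwable {
        Class<? extends ToyInvokable> clazz =
                Class.forName(className, true, cl).asSubclass(ToyInvokable.class);
        Constructor<? extends ToyInvokable> ctor = clazz.getConstructor(ToyEnvironment.class);
        try {
            return ctor.newInstance(env);
        } catch (InvocationTargetException e) {
            throw e.getTargetException(); // rethrow what the constructor itself threw
        }
    }
}
```

Unwrapping InvocationTargetException matters because the constructor does real work (for StreamTask, creating the mailbox processor, the RecordWriters and the state backend), so a failure there should surface as the original exception.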
org.apache.flink.streaming.runtime.tasks.StreamTask#StreamTask(org.apache.flink.runtime.execution.Environment)
protected StreamTask(Environment env) throws Exception {
this(env, null);
}
org.apache.flink.streaming.runtime.tasks.StreamTask#StreamTask(org.apache.flink.runtime.execution.Environment, org.apache.flink.streaming.runtime.tasks.TimerService, java.lang.Thread.UncaughtExceptionHandler, org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor, org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox)
protected StreamTask(
Environment environment,
@Nullable TimerService timerService,
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
StreamTaskActionExecutor actionExecutor,
TaskMailbox mailbox)
throws Exception {
// The registration of all closeable resources. The order of registration is important.
resourceCloser = new AutoCloseableRegistry();
try {
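// other parts omitted
// While a Flink application processes data, executes checkpoints and fires timers, these activities may modify state at the same time.
// Flink introduces the Mailbox thread model to prevent such inconsistent state operations.
// MailboxProcessor takes Mails out of the Mailbox and processes them, i.e. actions such as checkpoint execution and timer firing,
// while its member MailboxDefaultAction mailboxDefaultAction,
// the default action, handles ordinary DataStream messages: records/events, barriers, Watermarks, etc.
// TaskMailboxImpl is the Mailbox implementation and stores actions such as checkpoint execution and timer firing;
// MailboxExecutorImpl submits those actions.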
this.mailboxProcessor =
new MailboxProcessor(
// processInput() becomes the value of mailboxDefaultAction and handles regular data processing
this::processInput,
mailbox, actionExecutor, mailboxMetricsControl);
// Should be closed last.
resourceCloser.registerCloseable(mailboxProcessor);
// entry point for creating the RecordWriters
this.recordWriter = createRecordWriterDelegate(configuration, environment);
// create the StateBackend instance
this.stateBackend = createStateBackend();
// other parts omitted
}
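The Mailbox model described in the comment can be reduced to a single-threaded loop. In the hypothetical sketch below, pending mails are drained before each unit of default work, so a checkpoint-like action never runs concurrently with record processing. `ToyMailboxProcessor` is not Flink's MailboxProcessor, and it uses a queue that is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.function.BooleanSupplier;

// Hypothetical single-threaded mailbox loop.
class ToyMailboxProcessor {
    private final ArrayDeque<Runnable> mailbox = new ArrayDeque<>();
    private final BooleanSupplier defaultAction; // returns false when input is exhausted

    ToyMailboxProcessor(BooleanSupplier defaultAction) {
        this.defaultAction = defaultAction;
    }

    // In Flink, mails may arrive from other threads; this toy is single-threaded only.
    void submit(Runnable mail) {
        mailbox.add(mail);
    }

    void runMailboxLoop() {
        boolean more = true;
        while (more) {
            // drain pending mails first (checkpoints, timers, ...)
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // then run one unit of the default action (like processInput)
            more = defaultAction.getAsBoolean();
        }
    }
}
```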
org.apache.flink.streaming.runtime.tasks.StreamTask#createRecordWriterDelegate
public static <OUT>
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>>
createRecordWriterDelegate(
StreamConfig configuration, Environment environment) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites =
createRecordWriters(configuration, environment);
if (recordWrites.size() == 1) {
return new SingleRecordWriter<>(recordWrites.get(0));
} else if (recordWrites.size() == 0) {
return new NonRecordWriter<>();
} else {
return new MultipleRecordWriters<>(recordWrites);
}
}
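The delegate selection is a simple size-based dispatch. Below is a hypothetical mirror of it: `ToyWriterDelegates` and its nested types only echo the Flink names and carry no Flink behavior.

```java
import java.util.List;

// Hypothetical mirror of createRecordWriterDelegate's choice.
class ToyWriterDelegates {
    interface Delegate {
        int size();
    }

    static final class NonWriter implements Delegate {
        public int size() { return 0; }
    }

    static final class SingleWriter implements Delegate {
        final String writer;
        SingleWriter(String writer) { this.writer = writer; }
        public int size() { return 1; }
    }

    static final class MultipleWriters implements Delegate {
        final List<String> writers;
        MultipleWriters(List<String> writers) { this.writers = writers; }
        public int size() { return writers.size(); }
    }

    static Delegate create(List<String> writers) {
        if (writers.size() == 1) {
            return new SingleWriter(writers.get(0));
        } else if (writers.isEmpty()) {
            return new NonWriter(); // e.g. a vertex with no non-chained outputs
        } else {
            return new MultipleWriters(writers);
        }
    }
}
```

A vertex that ends in a sink has no non-chained outputs and gets the empty variant. The dedicated single-writer case presumably keeps the common one-output path free of list iteration.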
org.apache.flink.streaming.runtime.tasks.StreamTask#createRecordWriters
private static <OUT>
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
StreamConfig configuration, Environment environment) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
new ArrayList<>();
// first get this vertex's out-edges to downstream operators that cannot be chained,
List<NonChainedOutput> outputsInOrder =
configuration.getVertexNonChainedOutputs(
environment.getUserCodeClassLoader().asClassLoader());
int index = 0;
// iterate over them and create one RecordWriter per non-chained out-edge
for (NonChainedOutput streamOutput : outputsInOrder) {
replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
environment, streamOutput, index);
recordWriters.add(
// create
createRecordWriter(
streamOutput,
index++,
environment,
environment.getTaskInfo().getTaskNameWithSubtasks(),
streamOutput.getBufferTimeout()));
}
return recordWriters;
}
org.apache.flink.streaming.runtime.tasks.StreamTask#createRecordWriter
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
NonChainedOutput streamOutput,
int outputIndex,
Environment environment,
String taskNameWithSubtask,
long bufferTimeout) {
StreamPartitioner<OUT> outputPartitioner = null;
// Clones the partition to avoid multiple stream edges sharing the same stream partitioner,
// like the case of https://issues.apache.org/jira/browse/FLINK-14087.
try {
outputPartitioner =
InstantiationUtil.clone(
(StreamPartitioner<OUT>) streamOutput.getPartitioner(),
environment.getUserCodeClassLoader().asClassLoader());
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
LOG.debug(
"Using partitioner {} for output {} of task {}",
outputPartitioner,
outputIndex,
taskNameWithSubtask);
// taken from the Task's ResultPartitionWriter[]
ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
// we initialize the partitioner here with the number of key groups (aka max. parallelism)
if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
if (0 < numKeyGroups) {
((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
}
}
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
.setChannelSelector(outputPartitioner)
.setTimeout(bufferTimeout)
.setTaskName(taskNameWithSubtask)
// build the RecordWriter
.build(bufferWriter);
output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
return output;
}
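The key-group configuration above feeds key-group based partitioning. The hypothetical sketch below (`ToyKeyGroups` is made up) illustrates the idea: a key maps to a key group in [0, maxParallelism), and contiguous ranges of key groups map to channels via keyGroupId * parallelism / maxParallelism. The key-group step is simplified: Math.floorMod(hashCode, maxParallelism) stands in for Flink's murmur-hashed assignment.

```java
// Hypothetical sketch of key-group based channel selection.
class ToyKeyGroups {
    // Simplified: Flink additionally murmur-hashes the key's hashCode first.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Contiguous ranges of key groups are assigned to each parallel subtask.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    static int selectChannel(Object key, int maxParallelism, int numberOfChannels) {
        return operatorIndexForKeyGroup(
                maxParallelism, numberOfChannels, keyGroupFor(key, maxParallelism));
    }
}
```

For example, with maxParallelism 128 and 4 channels, each channel owns 32 consecutive key groups.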
org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
private StateBackend createStateBackend() throws Exception {
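// the StateBackend set in the application code, if any; when it is absent, the loader falls back to the state.backend.type option in the configuration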
final StateBackend fromApplication =
configuration.getStateBackend(getUserCodeClassLoader());
// state backend loader
return StateBackendLoader.fromApplicationOrConfigOrDefault(
fromApplication,
getJobConfiguration(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getUserCodeClassLoader(),
LOG);
}
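The loader's precedence can be sketched as a three-step fallback. In this hypothetical sketch (`ToyStateBackendLoader` is made up), strings stand in for StateBackend instances and the job and TaskManager configurations are collapsed into one map. Using "hashmap" as the final default is an assumption of the sketch.

```java
import java.util.Map;

// Hypothetical precedence: application > configuration > default.
class ToyStateBackendLoader {
    static final String CONFIG_KEY = "state.backend.type";

    static String resolve(String fromApplication, Map<String, String> config) {
        if (fromApplication != null) {
            return fromApplication; // set in the application code
        }
        String configured = config.get(CONFIG_KEY);
        return configured != null ? configured : "hashmap"; // assumed default
    }
}
```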
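That covers the key parts of StreamTask construction.
StreamTask startup analysis
Right after the StreamTask instance is created, its restore() method is triggered to prepare for StreamTask startup.
This method has three important steps:
1. create the OperatorChain instance;
2. create the StreamInputProcessor (stream input processor);
3. initialize the operators.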
org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke
private void restoreAndInvoke(TaskInvokable finalInvokable) throws Exception {
try {
// switch to the INITIALIZING state, if that fails, we have been canceled/failed in the
// meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.INITIALIZING)) {
throw new CancelTaskException();
}
taskManagerActions.updateTaskExecutionState(
new TaskExecutionState(executionId, ExecutionState.INITIALIZING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
// call restore
runWithSystemExitMonitoring(finalInvokable::restore);
if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
taskManagerActions.updateTaskExecutionState(
new TaskExecutionState(executionId, ExecutionState.RUNNING));
// call invoke
runWithSystemExitMonitoring(finalInvokable::invoke);
} catch (Throwable throwable) {
try {
runWithSystemExitMonitoring(() -> finalInvokable.cleanUp(throwable));
} catch (Throwable cleanUpThrowable) {
throwable.addSuppressed(cleanUpThrowable);
}
throw throwable;
}
runWithSystemExitMonitoring(() -> finalInvokable.cleanUp(null));
}
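The two transitionState(...) calls act as compare-and-set guards. If a concurrent cancel has already moved the task to another state, the transition fails and CancelTaskException is thrown. The hypothetical sketch below uses AtomicReference; `ToyTaskStates` and `ToyExecutionState` are made up, and the sketch returns strings instead of throwing.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical subset of execution states.
enum ToyExecutionState { DEPLOYING, INITIALIZING, RUNNING, CANCELING }

class ToyTaskStates {
    final AtomicReference<ToyExecutionState> state =
            new AtomicReference<>(ToyExecutionState.DEPLOYING);

    boolean transitionState(ToyExecutionState from, ToyExecutionState to) {
        return state.compareAndSet(from, to); // fails if someone else changed the state
    }

    // betweenRestoreAndRun simulates work (or a concurrent cancel) during restore().
    String restoreAndInvoke(Runnable betweenRestoreAndRun) {
        if (!transitionState(ToyExecutionState.DEPLOYING, ToyExecutionState.INITIALIZING)) {
            return "canceled before restore";
        }
        betweenRestoreAndRun.run(); // restore() would run here
        if (!transitionState(ToyExecutionState.INITIALIZING, ToyExecutionState.RUNNING)) {
            return "canceled after restore";
        }
        return "invoked"; // invoke() would run here
    }
}
```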
org.apache.flink.streaming.runtime.tasks.StreamTask#restore
public final void restore() throws Exception {
restoreInternal();
}
org.apache.flink.streaming.runtime.tasks.StreamTask#restoreInternal
void restoreInternal() throws Exception {
// other parts omitted
operatorChain =
getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
? new FinishedOperatorChain<>(this, recordWriter)
// create the OperatorChain instance; it is normally a RegularOperatorChain,
// so execution continues in the RegularOperatorChain constructor
: new RegularOperatorChain<>(this, recordWriter);
mainOperator = operatorChain.getMainOperator();
// task specific initialization
// implemented mainly by subclasses; common StreamTask subclasses include OneInputStreamTask and SourceStreamTask
init();
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
CompletableFuture<Void> allGatesRecoveredFuture =
// one important operation in this method is initializing each operator in the operator chain
actionExecutor.call(() -> restoreStateAndGates(initializationMetrics));
// Run mailbox until all gates will be recovered.
// mainly drives execution of the default action mailboxDefaultAction, i.e. StreamTask.processInput(...)
mailboxProcessor.runMailboxLoop();
// other parts omitted
}
Creating the OperatorChain instance
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#RegularOperatorChain
public RegularOperatorChain(
StreamTask<OUT, OP> containingTask,
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
super(containingTask, recordWriterDelegate);
}
org.apache.flink.streaming.runtime.tasks.OperatorChain#OperatorChain
public OperatorChain(
StreamTask<OUT, OP> containingTask,
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
this.operatorEventDispatcher =
new OperatorEventDispatcherImpl(
containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
containingTask.getEnvironment().getOperatorCoordinatorEventGateway());
final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
final StreamConfig configuration = containingTask.getConfiguration();
//operator factory instance
StreamOperatorFactory<OUT> operatorFactory =
configuration.getStreamOperatorFactory(userCodeClassloader);
// we read the chained configs, and the order of record writer registrations by output name
//configs of all operators in this chain, including this one, keyed by operator id
Map<Integer, StreamConfig> chainedConfigs =
configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
// create the final output stream writers
// we iterate through all the out edges from this job vertex and create a stream output
List<NonChainedOutput> outputsInOrder =
configuration.getVertexNonChainedOutputs(userCodeClassloader);
Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs =
CollectionUtil.newHashMapWithExpectedSize(outputsInOrder.size());
this.streamOutputs = new RecordWriterOutput<?>[outputsInOrder.size()];
this.finishedOnRestoreInput =
this.isTaskDeployedAsFinished()
? new FinishedOnRestoreInput(
streamOutputs, configuration.getInputs(userCodeClassloader).length)
: null;
// from here on, we need to make sure that the output writers are shut down again on failure
boolean success = false;
try {
//create the chain's network outputs (one RecordWriterOutput per non-chained output)
createChainOutputs(
outputsInOrder,
recordWriterDelegate,
chainedConfigs,
containingTask,
recordWriterOutputs);
// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperatorWrapper<?, ?>> allOpWrappers =
new ArrayList<>(chainedConfigs.size());
this.mainOperatorOutput =
//creates the operators chained behind the main operator, together with their outputs; the main operator itself is created below.
createOutputCollector(
containingTask,
configuration,
chainedConfigs,
userCodeClassloader,
recordWriterOutputs,
allOpWrappers,
containingTask.getMailboxExecutorFactory(),
operatorFactory != null);
if (operatorFactory != null) {
Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
StreamOperatorFactoryUtil.createOperator(
operatorFactory,
containingTask,
configuration,
mainOperatorOutput,
operatorEventDispatcher);
OP mainOperator = mainOperatorAndTimeService.f0;
mainOperator
.getMetricGroup()
.gauge(
MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
mainOperatorOutput.getWatermarkGauge());
this.mainOperatorWrapper =
createOperatorWrapper(
mainOperator,
containingTask,
configuration,
mainOperatorAndTimeService.f1,
true);
// add main operator to end of chain
allOpWrappers.add(mainOperatorWrapper);
this.tailOperatorWrapper = allOpWrappers.get(0);
} else {
checkState(allOpWrappers.size() == 0);
this.mainOperatorWrapper = null;
this.tailOperatorWrapper = null;
}
this.chainedSources =
createChainedSources(
containingTask,
configuration.getInputs(userCodeClassloader),
chainedConfigs,
userCodeClassloader,
allOpWrappers);
this.numOperators = allOpWrappers.size();
firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);
success = true;
} finally {
// make sure we clean up after ourselves in case of a failure after acquiring
// the first resources
if (!success) {
for (int i = 0; i < streamOutputs.length; i++) {
if (streamOutputs[i] != null) {
streamOutputs[i].close();
}
streamOutputs[i] = null;
}
}
}
}
org.apache.flink.streaming.runtime.tasks.OperatorChain#createChainOutputs
private void createChainOutputs(
List<NonChainedOutput> outputsInOrder,
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
Map<Integer, StreamConfig> chainedConfigs,
StreamTask<OUT, OP> containingTask,
Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs) {
for (int i = 0; i < outputsInOrder.size(); ++i) {
NonChainedOutput output = outputsInOrder.get(i);
RecordWriterOutput<?> recordWriterOutput =
//create the stream output
createStreamOutput(
recordWriterDelegate.getRecordWriter(i),
output,
chainedConfigs.get(output.getSourceNodeId()),
containingTask.getEnvironment());
this.streamOutputs[i] = recordWriterOutput;
recordWriterOutputs.put(output.getDataSetId(), recordWriterOutput);
}
}
org.apache.flink.streaming.runtime.tasks.OperatorChain#createStreamOutput
private RecordWriterOutput<OUT> createStreamOutput(
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
NonChainedOutput streamOutput,
StreamConfig upStreamConfig,
Environment taskEnvironment) {
OutputTag sideOutputTag =
streamOutput.getOutputTag(); // OutputTag, return null if not sideOutput
TypeSerializer outSerializer;
if (streamOutput.getOutputTag() != null) {
// side output
outSerializer =
upStreamConfig.getTypeSerializerSideOut(
streamOutput.getOutputTag(),
taskEnvironment.getUserCodeClassLoader().asClassLoader());
} else {
// main output
outSerializer =
upStreamConfig.getTypeSerializerOut(
taskEnvironment.getUserCodeClassLoader().asClassLoader());
}
return closer.register(
new RecordWriterOutput<OUT>(
recordWriter,
outSerializer,
sideOutputTag,
streamOutput.supportsUnalignedCheckpoints()));
}
org.apache.flink.streaming.runtime.tasks.OperatorChain#createOutputCollector
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
MailboxExecutorFactory mailboxExecutorFactory,
boolean shouldAddMetric) {
List<OutputWithChainingCheck<StreamRecord<T>>> allOutputs = new ArrayList<>(4);
// create collectors for the network outputs
for (NonChainedOutput streamOutput :
operatorConfig.getOperatorNonChainedOutputs(userCodeClassloader)) {
@SuppressWarnings("unchecked")
RecordWriterOutput<T> recordWriterOutput =
(RecordWriterOutput<T>) recordWriterOutputs.get(streamOutput.getDataSetId());
allOutputs.add(recordWriterOutput);
}
// Create collectors for the chained outputs
for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
int outputId = outputEdge.getTargetId();
StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
WatermarkGaugeExposingOutput<StreamRecord<T>> output =
//created recursively; operators are created from tail to head
createOperatorChain(
containingTask,
operatorConfig,
chainedOpConfig,
chainedConfigs,
userCodeClassloader,
recordWriterOutputs,
allOperatorWrappers,
outputEdge.getOutputTag(),
mailboxExecutorFactory,
shouldAddMetric);
checkState(output instanceof OutputWithChainingCheck);
allOutputs.add((OutputWithChainingCheck) output);
// If the operator has multiple downstream chained operators, only one of them should
// increment the recordsOutCounter for this operator. Set shouldAddMetric to false
// so that we would skip adding the counter to other downstream operators.
shouldAddMetric = false;
}
WatermarkGaugeExposingOutput<StreamRecord<T>> result;
if (allOutputs.size() == 1) {
result = allOutputs.get(0);
// only if this is a single RecordWriterOutput, reuse its numRecordOut for task.
if (result instanceof RecordWriterOutput) {
Counter numRecordsOutCounter = createNumRecordsOutCounter(containingTask);
((RecordWriterOutput<T>) result).setNumRecordsOut(numRecordsOutCounter);
}
} else {
// send to N outputs. Note that this includes the special case
// of sending to zero outputs
@SuppressWarnings({"unchecked"})
OutputWithChainingCheck<StreamRecord<T>>[] allOutputsArray =
new OutputWithChainingCheck[allOutputs.size()];
for (int i = 0; i < allOutputs.size(); i++) {
allOutputsArray[i] = allOutputs.get(i);
}
// This is the inverse of creating the normal ChainingOutput.
// If the chaining output does not copy we need to copy in the broadcast output,
// otherwise multi-chaining would not work correctly.
Counter numRecordsOutForTask = createNumRecordsOutCounter(containingTask);
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
result =
closer.register(
new CopyingBroadcastingOutputCollector<>(
allOutputsArray, numRecordsOutForTask));
} else {
result =
closer.register(
new BroadcastingOutputCollector<>(
allOutputsArray, numRecordsOutForTask));
}
}
if (shouldAddMetric) {
// Create a CountingOutput to increment the recordsOutCounter for this operator
// if we have not added the counter to any downstream chained operator.
Counter recordsOutCounter =
getOperatorRecordsOutCounter(containingTask, operatorConfig);
if (recordsOutCounter != null) {
result = new CountingOutput<>(result, recordsOutCounter);
}
}
return result;
}
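When an operator has more than one output, createOutputCollector wraps them in a broadcasting collector. When object reuse is enabled, it picks the copying variant, because, per the code comment, the chained outputs then pass records on without copying. The sketch below illustrates that idea with a hypothetical `SimpleOutput` interface and a caller-supplied copy function. It is not Flink's BroadcastingOutputCollector or CopyingBroadcastingOutputCollector. Handing the original record to the last output is an assumption made for illustration.

```java
import java.util.function.UnaryOperator;

// Hypothetical minimal output abstraction (stand-in for Flink's Output).
interface SimpleOutput<T> {
    void collect(T record);
}

// Forwards the same record instance to every output (no copying).
class BroadcastingOutput<T> implements SimpleOutput<T> {
    private final SimpleOutput<T>[] outputs;

    BroadcastingOutput(SimpleOutput<T>[] outputs) {
        this.outputs = outputs;
    }

    @Override
    public void collect(T record) {
        for (SimpleOutput<T> output : outputs) {
            output.collect(record);
        }
    }
}

// Gives every output except the last its own copy, so an output that mutates the
// record cannot affect its siblings; the last output receives the original instance.
class CopyingBroadcastingOutput<T> implements SimpleOutput<T> {
    private final SimpleOutput<T>[] outputs;
    private final UnaryOperator<T> copier;

    CopyingBroadcastingOutput(SimpleOutput<T>[] outputs, UnaryOperator<T> copier) {
        this.outputs = outputs;
        this.copier = copier;
    }

    @Override
    public void collect(T record) {
        for (int i = 0; i < outputs.length - 1; i++) {
            outputs[i].collect(copier.apply(record));
        }
        if (outputs.length > 0) {
            outputs[outputs.length - 1].collect(record);
        }
    }
}
```

With the non-copying variant, a mutation made by one output is visible to the next one. That is why copying has to happen somewhere once records are passed by reference.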
org.apache.flink.streaming.runtime.tasks.OperatorChain#createOperatorChain
//Recursively creates the chain of operators starting from the given operatorConfig.
//Operators are created tail to head and wrapped into a WatermarkGaugeExposingOutput.
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(
StreamTask<OUT, ?> containingTask,
StreamConfig prevOperatorConfig,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
OutputTag<IN> outputTag,
MailboxExecutorFactory mailboxExecutorFactory,
boolean shouldAddMetricForPrevOperator) {
// create the output that the operator writes to first. this may recursively create more
// operators
WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput =
createOutputCollector(
containingTask,
operatorConfig,
chainedConfigs,
userCodeClassloader,
recordWriterOutputs,
allOperatorWrappers,
mailboxExecutorFactory,
true);
OneInputStreamOperator<IN, OUT> chainedOperator =
createOperator(
containingTask,
operatorConfig,
userCodeClassloader,
chainedOperatorOutput,
allOperatorWrappers,
false);
return wrapOperatorIntoOutput(
chainedOperator,
containingTask,
prevOperatorConfig,
operatorConfig,
userCodeClassloader,
outputTag,
shouldAddMetricForPrevOperator);
}
org.apache.flink.streaming.runtime.tasks.OperatorChain#createOperator
//Creates and returns a single operator from the given operatorConfig that will produce records to the given output.
private <OUT, OP extends StreamOperator<OUT>> OP createOperator(
StreamTask<OUT, ?> containingTask,
StreamConfig operatorConfig,
ClassLoader userCodeClassloader,
WatermarkGaugeExposingOutput<StreamRecord<OUT>> output,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
boolean isHead) {
// now create the operator and give it the output collector to write its output to
Tuple2<OP, Optional<ProcessingTimeService>> chainedOperatorAndTimeService =
StreamOperatorFactoryUtil.createOperator(
operatorConfig.getStreamOperatorFactory(userCodeClassloader),
containingTask,
operatorConfig,
output,
operatorEventDispatcher);
OP chainedOperator = chainedOperatorAndTimeService.f0;
allOperatorWrappers.add(
createOperatorWrapper(
chainedOperator,
containingTask,
operatorConfig,
chainedOperatorAndTimeService.f1,
isHead));
chainedOperator
.getMetricGroup()
.gauge(
MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
output.getWatermarkGauge()::getValue);
return chainedOperator;
}
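The recursion in createOutputCollector and createOperatorChain builds each operator's output before the operator itself. As a result, the last operator in the chain is constructed first, which is why allOpWrappers.get(0) is the tail. The following simplified sketch assumes a linear chain described by a list of operator names and a hypothetical `ChainedOp` class that forwards records to its downstream output. In Flink the main operator is created separately after the recursion and appended last. Here the head comes out of the same recursion for brevity.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical chained operator: tags the record with its name and forwards it.
class ChainedOp implements Consumer<String> {
    final String name;
    private final Consumer<String> output;

    ChainedOp(String name, Consumer<String> output) {
        this.name = name;
        this.output = output;
    }

    @Override
    public void accept(String record) {
        output.accept(record + "->" + name);
    }
}

class ChainBuilderSketch {
    // Builds the operator at `index` after recursively building its downstream output.
    // `created` records construction order, mirroring allOpWrappers.
    static Consumer<String> build(List<String> names, int index,
                                  Consumer<String> networkOutput, List<ChainedOp> created) {
        Consumer<String> output =
                index + 1 < names.size()
                        ? build(names, index + 1, networkOutput, created) // downstream first
                        : networkOutput; // the tail writes to the network output
        ChainedOp op = new ChainedOp(names.get(index), output);
        created.add(op);
        return op;
    }
}
```

Calling `build(List.of("map", "filter", "flatMap"), 0, sink, created)` creates flatMap, then filter, then map, while records still flow from map to flatMap.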
StreamTask initialization※
Right after the OperatorChain instance is created, init() is called. This method is implemented mainly by subclasses; common StreamTask subclasses include OneInputStreamTask and SourceStreamTask.
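(1) OneInputStreamTask#init() mainly creates the StreamInputProcessor instance, i.e. the core data processor. When the task has network inputs, it first creates the CheckpointedInputGate, the DataOutput and the StreamTaskInput (wrapped with a sorted input if the input requires sorting), then combines them into a StreamOneInputProcessor.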
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask#init
public void init() throws Exception {
StreamConfig configuration = getConfiguration();
int numberOfInputs = configuration.getNumberOfNetworkInputs();
if (numberOfInputs > 0) {
CheckpointedInputGate inputGate = createCheckpointedInputGate();
Counter numRecordsIn = setupNumRecordsInCounter(mainOperator);
DataOutput<IN> output = createDataOutput(numRecordsIn);
StreamTaskInput<IN> input = createTaskInput(inputGate);
StreamConfig.InputConfig[] inputConfigs =
configuration.getInputs(getUserCodeClassLoader());
StreamConfig.InputConfig inputConfig = inputConfigs[0];
if (requiresSorting(inputConfig)) {
checkState(
!configuration.isCheckpointingEnabled(),
"Checkpointing is not allowed with sorted inputs.");
input = wrapWithSorted(input);
}
getEnvironment()
.getMetricGroup()
.getIOMetricGroup()
.reuseRecordsInputCounter(numRecordsIn);
//create the StreamInputProcessor instance, i.e. the core data processor.
inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain);
}
mainOperator
.getMetricGroup()
.gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
getEnvironment()
.getMetricGroup()
.gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge::getValue);
}
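Conceptually, the StreamOneInputProcessor created here pairs a StreamTaskInput, which reads from the input gate, with a DataOutput, which hands each element to the operator. Each processInput() call reports a status back to the task. The following is a much-simplified sketch. It assumes a queue-backed input and a hypothetical three-value `SketchInputStatus` enum rather than Flink's real DataInputStatus.

```java
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical subset of the statuses an input processor can report.
enum SketchInputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

class OneInputProcessorSketch<T> {
    private final Queue<T> input;          // stand-in for StreamTaskInput
    private final Consumer<T> dataOutput;  // stand-in for DataOutput -> operator
    private boolean endOfInput;

    OneInputProcessorSketch(Queue<T> input, Consumer<T> dataOutput) {
        this.input = input;
        this.dataOutput = dataOutput;
    }

    void markEndOfInput() {
        endOfInput = true;
    }

    // Processes at most one element per call, like one step of the mailbox default action.
    SketchInputStatus processInput() {
        T next = input.poll();
        if (next == null) {
            return endOfInput ? SketchInputStatus.END_OF_INPUT : SketchInputStatus.NOTHING_AVAILABLE;
        }
        dataOutput.accept(next);
        if (!input.isEmpty()) {
            return SketchInputStatus.MORE_AVAILABLE;
        }
        return endOfInput ? SketchInputStatus.END_OF_INPUT : SketchInputStatus.NOTHING_AVAILABLE;
    }
}
```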
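(2) SourceStreamTask#init() mainly checks whether the SourceFunction is an ExternallyInducedSource. If it is, init() installs a checkpoint trigger hook on the SourceFunction so that the source itself can induce checkpoints. It also registers a gauge for the checkpoint start delay and sets the record writer's maximum overdraft buffers per gate to 0.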
org.apache.flink.streaming.runtime.tasks.SourceStreamTask#init
protected void init() {
// we check if the source is actually inducing the checkpoints, rather
// than the trigger
SourceFunction<?> source = mainOperator.getUserFunction();
if (source instanceof ExternallyInducedSource) {
externallyInducedCheckpoints = true;
ExternallyInducedSource.CheckpointTrigger triggerHook =
new ExternallyInducedSource.CheckpointTrigger() {
@Override
public void triggerCheckpoint(long checkpointId) throws FlinkException {
// TODO - we need to see how to derive those. We should probably not
// encode this in the
// TODO - source's trigger message, but do a handshake in this task
// between the trigger
// TODO - message from the master, and the source's trigger
// notification
final CheckpointOptions checkpointOptions =
CheckpointOptions.forConfig(
CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault(),
configuration.isExactlyOnceCheckpointMode(),
configuration.isUnalignedCheckpointsEnabled(),
configuration.getAlignedCheckpointTimeout().toMillis());
final long timestamp = System.currentTimeMillis();
final CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(checkpointId, timestamp, timestamp);
try {
SourceStreamTask.super
.triggerCheckpointAsync(
checkpointMetaData, checkpointOptions)
.get();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new FlinkException(e.getMessage(), e);
}
}
};
((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
}
getEnvironment()
.getMetricGroup()
.getIOMetricGroup()
.gauge(
MetricNames.CHECKPOINT_START_DELAY_TIME,
this::getAsyncCheckpointStartDelayNanos);
recordWriter.setMaxOverdraftBuffersPerGate(0);
}
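The hook installed here inverts the usual flow. Instead of the task being triggered from outside, the source itself calls triggerCheckpoint(checkpointId) when it decides a checkpoint should happen. The task then runs its normal asynchronous checkpoint path. The following is a minimal sketch of that callback pattern. Hypothetical `CheckpointTriggerHook` and `ExternallyTriggeredSource` types stand in for ExternallyInducedSource and its CheckpointTrigger, and the task simply records the triggered ids.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ExternallyInducedSource.CheckpointTrigger.
interface CheckpointTriggerHook {
    void triggerCheckpoint(long checkpointId) throws Exception;
}

// Hypothetical source that decides on its own when checkpoints happen,
// e.g. when an external system signals a barrier.
class ExternallyTriggeredSource {
    private CheckpointTriggerHook hook;

    void setCheckpointTrigger(CheckpointTriggerHook hook) {
        this.hook = hook;
    }

    void onExternalBarrier(long checkpointId) throws Exception {
        if (hook != null) {
            hook.triggerCheckpoint(checkpointId);
        }
    }
}

class SourceTaskSketch {
    final List<Long> triggeredCheckpoints = new ArrayList<>();

    // Mirrors init(): only sources of the special type get the hook installed.
    void init(Object source) {
        if (source instanceof ExternallyTriggeredSource) {
            ((ExternallyTriggeredSource) source)
                    .setCheckpointTrigger(triggeredCheckpoints::add);
        }
    }
}
```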
Operator initialization※
Next, restoreStateAndGates() is called. One important step in this method is initializing every operator in the operator chain.
org.apache.flink.streaming.runtime.tasks.StreamTask#restoreStateAndGates
private CompletableFuture<Void> restoreStateAndGates(
SubTaskInitializationMetricsBuilder initializationMetrics) throws Exception {
long mailboxStartTs = SystemClock.getInstance().absoluteTimeMillis();
initializationMetrics.addDurationMetric(
MAILBOX_START_DURATION,
mailboxStartTs - initializationMetrics.getInitializationStartTs());
SequentialChannelStateReader reader =
getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
reader.readOutputData(
getEnvironment().getAllWriters(), !configuration.isGraphContainingLoops());
long readOutputDataTs = SystemClock.getInstance().absoluteTimeMillis();
initializationMetrics.addDurationMetric(
READ_OUTPUT_DATA_DURATION, readOutputDataTs - mailboxStartTs);
//call initializeStateAndOpenOperators(...) on the operator chain instance.
operatorChain.initializeStateAndOpenOperators(
createStreamTaskStateInitializer(initializationMetrics));
initializeStateEndTs = SystemClock.getInstance().absoluteTimeMillis();
initializationMetrics.addDurationMetric(
INITIALIZE_STATE_DURATION, initializeStateEndTs - readOutputDataTs);
IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
channelIOExecutor.execute(
() -> {
try {
//read the recovered input data (input channel state)
reader.readInputData(inputGates);
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException(
"Unable to read channel state", e);
}
});
// We wait for all input channel state to recover before we go into RUNNING state, and thus
// start checkpointing. If we implement incremental checkpointing of input channel state
// we must make sure it supports CheckpointType#FULL_CHECKPOINT
List<CompletableFuture<?>> recoveredFutures = new ArrayList<>(inputGates.length);
for (InputGate inputGate : inputGates) {
recoveredFutures.add(inputGate.getStateConsumedFuture());
inputGate
.getStateConsumedFuture()
.thenRun(
() ->
mainMailboxExecutor.execute(
//request the upstream partitions, i.e. start requesting data.
inputGate::requestPartitions,
"Input gate request partitions"));
}
return CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]))
.thenRun(mailboxProcessor::suspend);
}
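The end of restoreStateAndGates combines per-gate futures. Each gate requests its partitions once its recovered state has been consumed. The returned future completes, and suspends the mailbox, only when every gate has finished. The following sketch shows that CompletableFuture composition. It assumes gates identified by name and plain callbacks in place of mainMailboxExecutor.execute(...) and mailboxProcessor::suspend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class GateRecoverySketch {
    // For each gate, run requestPartitions(gate) once its state-consumed future completes;
    // return a future that completes after all gates and then runs onAllRecovered.
    static CompletableFuture<Void> awaitGates(
            List<String> gates,
            List<CompletableFuture<Void>> stateConsumedFutures,
            Consumer<String> requestPartitions,
            Runnable onAllRecovered) {
        List<CompletableFuture<?>> recovered = new ArrayList<>(gates.size());
        for (int i = 0; i < gates.size(); i++) {
            String gate = gates.get(i);
            CompletableFuture<Void> future = stateConsumedFutures.get(i);
            recovered.add(future);
            future.thenRun(() -> requestPartitions.accept(gate));
        }
        return CompletableFuture.allOf(recovered.toArray(new CompletableFuture[0]))
                .thenRun(onAllRecovered);
    }
}
```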
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#initializeStateAndOpenOperators
public void initializeStateAndOpenOperators(
StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
StreamOperator<?> operator = operatorWrapper.getStreamOperator();
//provides a context to initialize all state in the operator;
//mainly initializes operator members such as operatorStateBackend, keyedStateBackend and keyedStateStore.
operator.initializeState(streamTaskStateInitializer);
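//called immediately before any elements are processed; it should contain the operator's initialization logic.
//calls each operator's open() method, which in turn calls open() of the UDF.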
operator.open();
}
}
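getAllOperators(true) iterates the chain in reverse. Each operator's state is therefore initialized, and its open() called, starting from the tail, presumably so that downstream operators are ready before the upstream operators that feed them. The following tiny sketch shows that ordering. It assumes a hypothetical `LifecycleOp` type and a head-to-tail list of operators.

```java
import java.util.List;

// Hypothetical operator that only logs its lifecycle calls.
class LifecycleOp {
    final String name;
    private final List<String> events;

    LifecycleOp(String name, List<String> events) {
        this.name = name;
        this.events = events;
    }

    void initializeState() { events.add("init:" + name); }

    void open() { events.add("open:" + name); }
}

class ChainLifecycleSketch {
    // headToTail lists operators head first; like getAllOperators(true),
    // iterate from the tail back to the head.
    static void initializeStateAndOpen(List<LifecycleOp> headToTail) {
        for (int i = headToTail.size() - 1; i >= 0; i--) {
            LifecycleOp op = headToTail.get(i);
            op.initializeState();
            op.open();
        }
    }
}
```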
org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2#initializeState
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
final TypeSerializer<?> keySerializer =
config.getStateKeySerializer(getUserCodeClassloader());
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
getClass().getSimpleName(),
getProcessingTimeService(),
this,
keySerializer,
cancelables,
metrics,
config.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.STATE_BACKEND,
runtimeContext.getJobConfiguration(),
runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
runtimeContext.getUserCodeClassLoader()),
isUsingCustomRawKeyedState());
stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
timeServiceManager = context.internalTimerServiceManager();
//initialize operator state
stateHandler.initializeOperatorState(this);
if (useSplittableTimers()
&& areSplittableTimersConfigured()
&& getTimeServiceManager().isPresent()) {
watermarkProcessor =
new MailboxWatermarkProcessor(
output, mailboxExecutor, getTimeServiceManager().get());
}
}
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator#open
public void open() throws Exception {
super.open();
//call open() of the UDF.
FunctionUtils.openFunction(userFunction, DefaultOpenContext.INSTANCE);
}
org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
public static void openFunction(Function function, OpenContext openContext) throws Exception {
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.open(openContext);
}
}
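FunctionUtils.openFunction only calls open(...) when the user function is a RichFunction. Plain functions, such as a lambda MapFunction, have no lifecycle hooks and are skipped. The following self-contained sketch uses the same instanceof dispatch. Hypothetical `Fn` and `RichFn` interfaces replace Flink's Function and RichFunction, and a String stands in for the open context.

```java
// Hypothetical stand-ins for Flink's Function and RichFunction.
interface Fn {}

interface RichFn extends Fn {
    void open(String openContext) throws Exception;
}

class FunctionUtilsSketch {
    // Opens the function only if it has a lifecycle; returns whether open() was called.
    static boolean openFunction(Fn function, String openContext) throws Exception {
        if (function instanceof RichFn) {
            ((RichFn) function).open(openContext);
            return true;
        }
        return false;
    }
}
```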
This completes the preparation that precedes the actual run of the StreamTask.
Overview of how a Task reads data※
After the subclass's init() method returns, execution reaches the line mailboxProcessor.runMailboxLoop();. As runMailboxLoop() shows, its main role is to drive the mailboxDefaultAction, i.e. to execute StreamTask.processInput(...).
org.apache.flink.streaming.runtime.tasks.StreamTask#processInput
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
//let the input processor read and process the next input
DataInputStatus status = inputProcessor.processInput();
switch (status) {
case MORE_AVAILABLE:
if (taskIsAvailable()) {
return;
}
break;
case NOTHING_AVAILABLE:
break;
case END_OF_RECOVERY:
throw new IllegalStateException("We should not receive this event here.");
case STOPPED:
endData(StopMode.NO_DRAIN);
return;
case END_OF_DATA:
endData(StopMode.DRAIN);
notifyEndOfData();
return;
case END_OF_INPUT:
// Suspend the mailbox processor, it would be resumed in afterInvoke and finished
// after all records processed by the downstream tasks. We also suspend the default
// actions to avoid repeat executing the empty default operation (namely process
// records).
controller.suspendDefaultAction();
mailboxProcessor.suspend();
return;
}
TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
PeriodTimer timer;
CompletableFuture<?> resumeFuture;
if (!recordWriter.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
resumeFuture = recordWriter.getAvailableFuture();
} else if (!inputProcessor.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
resumeFuture = inputProcessor.getAvailableFuture();
} else if (changelogWriterAvailabilityProvider != null
&& !changelogWriterAvailabilityProvider.isAvailable()) {
// waiting for changelog availability is reported as busy
timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
} else {
// data availability has changed in the meantime; retry immediately
return;
}
assertNoException(
resumeFuture.thenRun(
new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
}
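The controller.suspendDefaultAction(...) calls above only make sense against the mailbox loop. The task thread alternates between running queued mails, such as checkpoint triggers or resume callbacks, and the default action (processInput), and a suspended default action is resumed later through a mail. The following heavily simplified, single-threaded sketch models that loop. All class and method names are hypothetical rather than Flink's MailboxProcessor API, and the loop exits instead of blocking when nothing is left to do.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class MailboxLoopSketch {
    // Hypothetical default action, e.g. one call of processInput.
    interface DefaultAction {
        void run(MailboxLoopSketch controller);
    }

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private boolean defaultActionSuspended;
    private boolean loopRunning = true;

    void suspendDefaultAction() { defaultActionSuspended = true; }

    void resumeDefaultAction() { defaultActionSuspended = false; }

    void suspendLoop() { loopRunning = false; }

    void enqueue(Runnable mail) { mailbox.add(mail); }

    void runLoop(DefaultAction defaultAction) {
        while (loopRunning) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run(); // mails always run before the next default-action step
            }
            if (!loopRunning) {
                break;
            }
            if (!defaultActionSuspended) {
                defaultAction.run(this); // one step of the default action
            } else if (mailbox.isEmpty()) {
                break; // a real mailbox would block here until a mail arrives
            }
        }
    }
}
```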
The subclass SourceStreamTask overrides processInput(...) as follows:
org.apache.flink.streaming.runtime.tasks.SourceStreamTask#processInput
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
controller.suspendDefaultAction();
// Against the usual contract of this method, this implementation is not step-wise but
// blocking instead for
// compatibility reasons with the current source interface (source functions run as a loop,
// not in steps).
sourceThread.setTaskDescription(getName());
// starting the thread invokes LegacySourceFunctionThread#run()
sourceThread.start();
sourceThread
.getCompletionFuture()
.whenComplete(
(Void ignore, Throwable sourceThreadThrowable) -> {
if (sourceThreadThrowable != null) {
mailboxProcessor.reportThrowable(sourceThreadThrowable);
} else {
notifyEndOfData();
mailboxProcessor.suspend();
}
});
}
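A legacy SourceFunction runs its own blocking loop. SourceStreamTask therefore suspends the default action, starts a dedicated source thread, and reacts to that thread's completion future. An exception is reported to the mailbox, while normal completion notifies end of data and suspends the mailbox. The following minimal sketch shows that hand-off with Thread and CompletableFuture. `SourceThreadSketch` and its callbacks are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class SourceThreadSketch extends Thread {
    private final Runnable sourceLoop;
    private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

    SourceThreadSketch(Runnable sourceLoop) {
        this.sourceLoop = sourceLoop;
    }

    CompletableFuture<Void> getCompletionFuture() {
        return completionFuture;
    }

    @Override
    public void run() {
        try {
            sourceLoop.run(); // blocking loop, like mainOperator.run(lock, operatorChain)
            completionFuture.complete(null);
        } catch (Throwable t) {
            completionFuture.completeExceptionally(t);
        }
    }

    // Starts the thread and wires the completion callbacks, as processInput does.
    static CompletableFuture<Void> startSource(
            Runnable sourceLoop, Consumer<Throwable> reportThrowable, Runnable onFinished) {
        SourceThreadSketch thread = new SourceThreadSketch(sourceLoop);
        thread.start();
        return thread.getCompletionFuture().whenComplete((ignore, error) -> {
            if (error != null) {
                reportThrowable.accept(error);
            } else {
                onFinished.run();
            }
        });
    }
}
```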
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.LegacySourceFunctionThread#run
@Override
public void run() {
try {
if (!operatorChain.isTaskDeployedAsFinished()) {
LOG.debug(
"Legacy source {} skip execution since the task is finished on restore",
getTaskNameWithSubtaskAndId());
mainOperator.run(lock, operatorChain);
}
completeProcessing();
completionFuture.complete(null);
} catch (Throwable t) {
// Note, t can be also an InterruptedException
if (isCanceled()
&& ExceptionUtils.findThrowable(t, InterruptedException.class)
.isPresent()) {
completionFuture.completeExceptionally(new CancelTaskException(t));
} else {
completionFuture.completeExceptionally(t);
}
}
}
org.apache.flink.streaming.api.operators.StreamSource#run
public void run(final Object lockingObject, final OperatorChain<?, ?> operatorChain)
throws Exception {
run(lockingObject, output, operatorChain);
}
org.apache.flink.streaming.api.operators.StreamSource#run
public void run(
final Object lockingObject,
final Output<StreamRecord<OUT>> collector,
final OperatorChain<?, ?> operatorChain)
throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final Configuration configuration =
this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
//latency tracking interval
final long latencyTrackingInterval =
getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: configuration.get(MetricOptions.LATENCY_INTERVAL).toMillis();
LatencyMarkerEmitter<OUT> latencyEmitter = null;
if (latencyTrackingInterval > 0) {
latencyEmitter =
new LatencyMarkerEmitter<>(
getProcessingTimeService(),
collector::emitLatencyMarker,
latencyTrackingInterval,
this.getOperatorID(),
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
}
//watermark interval
final long watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
//return the appropriate SourceFunction.SourceContext for the time characteristic
this.ctx =
StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
collector,
watermarkInterval,
-1,
emitProgressiveWatermarks);
try {
//start the source; implementations emit elements through SourceFunction.SourceContext.
userFunction.run(ctx);
} finally {
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
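Inside userFunction.run(ctx), a legacy source typically loops until it is cancelled. It emits each element through ctx.collect(...) while holding the checkpoint lock returned by ctx.getCheckpointLock(), so that emission, state updates and checkpointing do not interleave. The sketch below mimics that contract with hypothetical `SketchSourceContext`, `CountingSource` and `ListSourceContext` types. It does not depend on Flink, and the bounded count is an assumption made so that the example terminates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SourceFunction.SourceContext.
interface SketchSourceContext<T> {
    void collect(T element);

    Object getCheckpointLock();
}

// Hypothetical bounded source following the run/cancel contract of a legacy source.
class CountingSource {
    private volatile boolean isRunning = true;
    private long count; // would be checkpointed state in a real source

    void run(SketchSourceContext<Long> ctx, long limit) {
        while (isRunning && count < limit) {
            // emit and update state atomically with respect to checkpoints
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }
}

// Collects emitted elements into a list, for demonstration.
class ListSourceContext implements SketchSourceContext<Long> {
    final List<Long> collected = new ArrayList<>();
    private final Object lock = new Object();

    @Override
    public void collect(Long element) {
        collected.add(element);
    }

    @Override
    public Object getCheckpointLock() {
        return lock;
    }
}
```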
The above outlines how a Task reads data.
References: