The previous article analyzed how a Flink Task writes data: records ultimately end up in the ResultPartition model. Each ResultPartition instance contains one or more ResultSubPartitions; after the RecordWriter routes a record to its target subpartition, the data is stored as NetworkBuffers in that subpartition's buffers queue, waiting to be consumed by the downstream Task.
This article continues with how a Flink Task reads data and the data models involved.
Both articles only cover reading and writing inside the Task thread itself; they do not cover how data is requested and transferred between upstream and downstream Tasks over the network. That transfer is built on credit-based Netty communication, which will be explained in the next article.
StreamTask※
StreamTask is the execution container for operators, so the Task's read path can be traced from how StreamTask runs. As discussed earlier, the default action of the mailbox execution model is StreamTask#processInput(...), which serves as the entry point for reading data, so this article starts the analysis from processInput(...). Once the Task has started and finished initialization, processInput(...) is invoked over and over by the mailbox loop.
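To make the loop easier to picture before reading the real code, here is a minimal, self-contained sketch of the mailbox pattern. All names below are hypothetical illustrations, not Flink's actual classes: a single thread first drains queued "mails" (checkpoint triggers, timers, and so on) and otherwise keeps invoking a default action, which for a StreamTask is processInput(...).
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified single-threaded mailbox loop (hypothetical sketch, not Flink's MailboxProcessor):
// drain pending mails first, otherwise keep running the default action.
public class MailboxLoopSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Runnable defaultAction; // stands in for StreamTask#processInput
    private volatile boolean running = true;

    public MailboxLoopSketch(Runnable defaultAction) {
        this.defaultAction = defaultAction;
    }

    public void enqueueMail(Runnable mail) {
        mailbox.add(mail);
    }

    public void stop() {
        running = false;
    }

    public void runMailboxLoop() {
        while (running) {
            Runnable mail;
            // priority work queued as mail (checkpoints, timers, ...) runs first
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // otherwise keep executing the default action: read and process one piece of input
            defaultAction.run();
        }
    }
}
The important point the sketch captures is that both mails and the default action run on the same thread; the real loop below adds suspension, locking, and availability handling on top of this idea.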
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop
/**
* Runs the mailbox processing loop. This is where the main work is done. This loop can be
* suspended at any time by calling {@link #suspend()}. For resuming the loop this method should
* be called again.
*/
public void runMailboxLoop() throws Exception {
suspended = !mailboxLoopRunning;
final TaskMailbox localMailbox = mailbox;
checkState(
localMailbox.isMailboxThread(),
"Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxController mailboxController = new MailboxController(this);
while (isNextLoopPossible()) {
// The blocking `processMail` call will not return until default action is available.
processMail(localMailbox, false);
if (isNextLoopPossible()) {
mailboxDefaultAction.runDefaultAction(
mailboxController); // lock is acquired inside default action as needed
}
}
}
org.apache.flink.streaming.runtime.tasks.StreamTask#processInput
/**
* This method implements the default action of the task (e.g. processing one event from the
* input). Implementations should (in general) be non-blocking.
*
* @param controller controller object for collaborative interaction between the action and the
* stream task.
* @throws Exception on any problems in the action.
*/
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
//process the input
DataInputStatus status = inputProcessor.processInput();
// remaining handling omitted
}
Taking the StreamOneInputProcessor implementation as an example, processInput() reads data through StreamTaskInput.emitNext(output).
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor#processInput
@Override
public DataInputStatus processInput() throws Exception {
//used to read data
DataInputStatus status = input.emitNext(output);
if (status == DataInputStatus.END_OF_DATA) {
endOfInputAware.endInput(input.getInputIndex() + 1);
output = new FinishedDataOutput<>();
} else if (status == DataInputStatus.END_OF_RECOVERY) {
if (input instanceof RecoverableStreamTaskInput) {
input = ((RecoverableStreamTaskInput<IN>) input).finishRecovery();
}
return DataInputStatus.MORE_AVAILABLE;
}
return status;
}
In the AbstractStreamTaskNetworkInput implementation, processElement(...) hands each record to the operator's user-defined function via the output argument, while the actual data is fetched inside checkpointedInputGate.pollNext().
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput#emitNext
@Override
public DataInputStatus emitNext(DataOutput<T> output) throws Exception {
while (true) {
// get the stream element from the deserializer
if (currentRecordDeserializer != null) {
RecordDeserializer.DeserializationResult result;
try {
result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
} catch (IOException e) {
throw new IOException(
String.format("Can't get next record for channel %s", lastChannel), e);
}
if (result.isBufferConsumed()) {
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
final boolean breakBatchEmitting =
//pass the record to the operator's user-defined function via the output argument
processElement(deserializationDelegate.getInstance(), output);
if (canEmitBatchOfRecords.check() && !breakBatchEmitting) {
continue;
}
return DataInputStatus.MORE_AVAILABLE;
}
}
//the actual data read happens in checkpointedInputGate.pollNext()
Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
if (bufferOrEvent.isPresent()) {
// return to the mailbox after receiving a checkpoint barrier to avoid processing of
// data after the barrier before checkpoint is performed for unaligned checkpoint
// mode
if (bufferOrEvent.get().isBuffer()) {
processBuffer(bufferOrEvent.get());
} else {
DataInputStatus status = processEvent(bufferOrEvent.get());
if (status == DataInputStatus.MORE_AVAILABLE && canEmitBatchOfRecords.check()) {
continue;
}
return status;
}
} else {
if (checkpointedInputGate.isFinished()) {
checkState(
checkpointedInputGate.getAvailableFuture().isDone(),
"Finished BarrierHandler should be available");
return DataInputStatus.END_OF_INPUT;
}
return DataInputStatus.NOTHING_AVAILABLE;
}
}
}
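One reason emitNext(...) alternates between draining currentRecordDeserializer and polling the gate is that a serialized record can span several NetworkBuffers. The sketch below illustrates only that idea with a hypothetical length-prefixed format; Flink's actual RecordDeserializer implementations are considerably more elaborate.
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Optional;

// Sketch of "records may span buffers": accumulate bytes buffer by buffer and only emit a
// record once all of its bytes have arrived. Hypothetical format, not Flink's deserializer.
public class SpanningDeserializerSketch {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Append the bytes of one network buffer. */
    public void addBuffer(ByteBuffer buffer) {
        while (buffer.hasRemaining()) {
            pending.write(buffer.get());
        }
    }

    /** Return a full record if enough bytes have accumulated, otherwise empty. */
    public Optional<byte[]> getNextRecord() {
        byte[] bytes = pending.toByteArray();
        if (bytes.length < 4) {
            return Optional.empty(); // the length prefix itself is not complete yet
        }
        int length = ByteBuffer.wrap(bytes, 0, 4).getInt();
        if (bytes.length < 4 + length) {
            return Optional.empty(); // the record still spans a future buffer
        }
        byte[] record = new byte[length];
        System.arraycopy(bytes, 4, record, 0, length);
        // keep any leftover bytes: they are the beginning of the next record
        pending.reset();
        pending.write(bytes, 4 + length, bytes.length - 4 - length);
        return Optional.of(record);
    }
}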
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#pollNext
@Override
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
//read data from the underlying input gate
Optional<BufferOrEvent> next = inputGate.pollNext();
if (!next.isPresent()) {
return handleEmptyBuffer();
}
BufferOrEvent bufferOrEvent = next.get();
if (bufferOrEvent.isEvent()) {
return handleEvent(bufferOrEvent);
} else if (bufferOrEvent.isBuffer()) {
/**
* https://issues.apache.org/jira/browse/FLINK-19537 This is not entirely true, as it's
* ignoring the buffer/bytes accumulated in the record deserializers. If buffer is
* processed here, it doesn't mean it was fully processed (so we can over estimate the
* amount of processed bytes). On the other hand some records/bytes might be processed
* without polling anything from this {@link CheckpointedInputGate} (underestimating the
* amount of processed bytes). All in all this should have been calculated on the {@link
* StreamTaskNetworkInput} level, where we have an access to the records deserializers.
* However the current is on average accurate and it might be just good enough (at least
* for the time being).
*/
barrierHandler.addProcessedBytes(bufferOrEvent.getBuffer().getSize());
}
return next;
}
That covers the read path at the StreamTask level; next we move into how SingleInputGate reads data.
SingleInputGate※
InputGate has two main implementations: SingleInputGate and UnionInputGate. The common case on the read path is SingleInputGate. A UnionInputGate combines several SingleInputGates into one gate; operators such as CoStreamMap that consume two upstream streams use a UnionInputGate. The focus below is on SingleInputGate.
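As a rough picture of the relationship only (hypothetical interfaces, not Flink's InputGate classes), a union gate can be thought of as delegating pollNext() across the gates it wraps:
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified gate interfaces to illustrate how a "union" gate combines
// several single gates; this is not Flink's InputGate / UnionInputGate implementation.
interface SimpleGate {
    Optional<String> pollNext(); // a record, or empty if nothing is available right now
}

class SimpleUnionGate implements SimpleGate {
    private final List<SimpleGate> gates;
    private int nextGate = 0;

    SimpleUnionGate(List<SimpleGate> gates) {
        this.gates = gates;
    }

    @Override
    public Optional<String> pollNext() {
        // round-robin over the wrapped gates so one input cannot starve the others
        for (int i = 0; i < gates.size(); i++) {
            SimpleGate gate = gates.get(nextGate);
            nextGate = (nextGate + 1) % gates.size();
            Optional<String> record = gate.pollNext();
            if (record.isPresent()) {
                return record;
            }
        }
        return Optional.empty();
    }
}
Flink's real UnionInputGate additionally remaps channel indices and tracks per-gate availability; the sketch only captures the "combine several gates into one" idea.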
SingleInputGate
Let's first look at SingleInputGate's member structure; the meaning of its main fields is listed below (a simplified skeleton follows the list):
private final int gateIndex: the index of the upstream input consumed by this Task. In most cases an operator has a single upstream input; when there are several, gateIndex identifies which one this gate consumes.
private final IntermediateDataSetID consumedResultId: the intermediate result set of the upstream operator that this Task consumes.
private final ResultPartitionType consumedPartitionType: the type of the result partition consumed by this InputGate. Flink streaming jobs generally use PIPELINED_BOUNDED, which relies on a bounded number of buffers so that upstream and downstream can produce and consume data at the same time.
private final int numberOfInputChannels: the number of upstream result subpartitions feeding this gate.
private final Map<IntermediateResultPartitionID, Map<InputChannelInfo, InputChannel>> inputChannels: the detailed mapping from upstream result subpartitions to their input channels.
private final InputChannel[] channels: the same input channels as an array, indexed by channel index.
private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque<>(): the InputChannels that currently have data available to read.
private BufferPool bufferPool: the SingleInputGate's local buffer pool, used to buffer the data that is read.
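The following stripped-down skeleton mirrors the fields listed above; all types here are simplified stand-ins (a plain Deque instead of PrioritizedDeque, a String instead of IntermediateDataSetID, a stub instead of InputChannel), so it is a reading aid rather than Flink's class:
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Stripped-down skeleton mirroring the fields listed above; hypothetical stand-in types.
public class SingleInputGateSketch {
    private final int gateIndex;                       // which upstream input this gate consumes
    private final String consumedResultId;             // id of the consumed intermediate result
    private final int numberOfInputChannels;           // number of upstream result subpartitions
    private final Map<Integer, InputChannelStub> inputChannels = new HashMap<>();          // subpartition index -> channel
    private final Deque<InputChannelStub> inputChannelsWithData = new ArrayDeque<>();      // channels that currently have data
    // a local buffer pool (omitted here) would back the buffers handed out to the channels

    public SingleInputGateSketch(int gateIndex, String consumedResultId, int numberOfInputChannels) {
        this.gateIndex = gateIndex;
        this.consumedResultId = consumedResultId;
        this.numberOfInputChannels = numberOfInputChannels;
    }

    /** Minimal stand-in for an input channel holding received buffers. */
    static class InputChannelStub {
        final Deque<byte[]> buffers = new ArrayDeque<>();
    }
}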
How SingleInputGate reads data※
Let's continue the analysis with the SingleInputGate.pollNext() read path.
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#pollNext
@Override
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
return getNextBufferOrEvent(false);
}
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#getNextBufferOrEvent
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking)
throws IOException, InterruptedException {
if (hasReceivedAllEndOfPartitionEvents) {
return Optional.empty();
}
if (closeFuture.isDone()) {
throw new CancelTaskException("Input gate is already closed.");
}
//get an available InputChannel together with its data Buffer
Optional<InputWithData<InputChannel, Buffer>> next = waitAndGetNextData(blocking);
if (!next.isPresent()) {
throughputCalculator.pauseMeasurement();
return Optional.empty();
}
throughputCalculator.resumeMeasurement();
InputWithData<InputChannel, Buffer> inputWithData = next.get();
final BufferOrEvent bufferOrEvent =
//transform to a BufferOrEvent; the buffer may be released inside
transformToBufferOrEvent(
inputWithData.data,
inputWithData.moreAvailable,
inputWithData.input,
inputWithData.morePriorityEvents);
throughputCalculator.incomingDataSize(bufferOrEvent.getSize());
return Optional.of(bufferOrEvent);
}
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#waitAndGetNextData
private Optional<InputWithData<InputChannel, Buffer>> waitAndGetNextData(boolean blocking)
throws IOException, InterruptedException {
while (true) {
synchronized (inputChannelsWithData) {
//get an InputChannel that currently has data available to read
Optional<InputChannel> inputChannelOpt = getChannel(blocking);
if (!inputChannelOpt.isPresent()) {
return Optional.empty();
}
final InputChannel inputChannel = inputChannelOpt.get();
//read a recovered or normal buffer
Optional<Buffer> buffer = readRecoveredOrNormalBuffer(inputChannel);
if (!buffer.isPresent()) {
checkUnavailability();
continue;
}
int numSubpartitions = inputChannel.getConsumedSubpartitionIndexSet().size();
if (numSubpartitions > 1) {
switch (buffer.get().getDataType()) {
case END_OF_DATA:
endOfDatas[inputChannel.getChannelIndex()]++;
if (endOfDatas[inputChannel.getChannelIndex()] < numSubpartitions) {
buffer.get().recycleBuffer();
continue;
}
break;
case END_OF_PARTITION:
endOfPartitions[inputChannel.getChannelIndex()]++;
if (endOfPartitions[inputChannel.getChannelIndex()]
< numSubpartitions) {
buffer.get().recycleBuffer();
continue;
}
break;
default:
break;
}
}
final boolean morePriorityEvents =
inputChannelsWithData.getNumPriorityElements() > 0;
if (buffer.get().getDataType().hasPriority()) {
if (!morePriorityEvents) {
priorityAvailabilityHelper.resetUnavailable();
}
}
checkUnavailability();
return Optional.of(
new InputWithData<>(
inputChannel,
buffer.get(),
!inputChannelsWithData.isEmpty(),
morePriorityEvents));
}
}
}
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#getChannel(boolean)
private Optional<InputChannel> getChannel(boolean blocking) throws InterruptedException {
assert Thread.holdsLock(inputChannelsWithData);
while (inputChannelsWithData.isEmpty()) {
if (closeFuture.isDone()) {
throw new IllegalStateException("Released");
}
if (blocking) {
inputChannelsWithData.wait();
} else {
availabilityHelper.resetUnavailable();
return Optional.empty();
}
}
InputChannel inputChannel = inputChannelsWithData.poll();
enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
return Optional.of(inputChannel);
}
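getChannel(boolean) above is an instance of a classic blocking/non-blocking queue pattern: the reader either waits on the monitor of inputChannelsWithData or returns immediately, and whoever enqueues a channel with data wakes it up via notify. A standalone sketch of that pattern with hypothetical names:
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Standalone sketch of the blocking / non-blocking poll pattern used by getChannel:
// a queue of "channels with data", guarded by its own monitor. Hypothetical names.
public class ChannelQueueSketch<T> {
    private final Deque<T> channelsWithData = new ArrayDeque<>();

    /** Called (e.g. from a network thread) when a channel becomes non-empty. */
    public void onDataAvailable(T channel) {
        synchronized (channelsWithData) {
            channelsWithData.add(channel);
            channelsWithData.notifyAll(); // wake up a blocked reader
        }
    }

    /** Blocking mode waits until data arrives; non-blocking mode returns empty immediately. */
    public Optional<T> getChannel(boolean blocking) throws InterruptedException {
        synchronized (channelsWithData) {
            while (channelsWithData.isEmpty()) {
                if (!blocking) {
                    return Optional.empty();
                }
                channelsWithData.wait();
            }
            return Optional.of(channelsWithData.poll());
        }
    }
}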
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#readRecoveredOrNormalBuffer
private Optional<Buffer> readRecoveredOrNormalBuffer(InputChannel inputChannel)
throws IOException, InterruptedException {
// Firstly, read the buffers from the recovered channel
if (inputChannel instanceof RecoveredInputChannel && !inputChannel.isReleased()) {
//start fetching data from the InputChannel that has data available
Optional<Buffer> buffer = readBufferFromInputChannel(inputChannel);
if (!((RecoveredInputChannel) inputChannel).getStateConsumedFuture().isDone()) {
return buffer;
}
}
// After the recovered buffers are read, read the normal buffers
return enabledTieredStorage()
? readBufferFromTieredStore(inputChannel)
: readBufferFromInputChannel(inputChannel);
}
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#readBufferFromInputChannel
private Optional<Buffer> readBufferFromInputChannel(InputChannel inputChannel)
throws IOException, InterruptedException {
//get the next buffer from this input channel
Optional<BufferAndAvailability> bufferAndAvailabilityOpt = inputChannel.getNextBuffer();
if (!bufferAndAvailabilityOpt.isPresent()) {
return Optional.empty();
}
final BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
if (bufferAndAvailability.moreAvailable()) {
// enqueue the inputChannel at the end to avoid starvation
queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents());
}
if (bufferAndAvailability.hasPriority()) {
lastPrioritySequenceNumber[inputChannel.getChannelIndex()] =
bufferAndAvailability.getSequenceNumber();
}
Buffer buffer = bufferAndAvailability.buffer();
if (buffer.getDataType() == Buffer.DataType.RECOVERY_METADATA) {
RecoveryMetadata recoveryMetadata =
(RecoveryMetadata)
EventSerializer.fromSerializedEvent(
buffer.getNioBufferReadable(), getClass().getClassLoader());
lastBufferStatusMapInTieredStore.put(
inputChannel.getChannelIndex(),
Tuple2.of(
buffer.getDataType().isPartialRecord(),
recoveryMetadata.getFinalBufferSubpartitionId()));
}
return Optional.of(bufferAndAvailability.buffer());
}
InputChannel※
In a real deployment, the upstream and downstream Tasks may land on the same machine, in which case the InputChannel is a LocalInputChannel, or on different machines, in which case it is a RemoteInputChannel. Both cases are analyzed below.
Key member fields※
LocalInputChannel※
private final ResultPartitionManager partitionManager: keeps track of and manages the result partitions on this node.
private volatile ResultSubpartitionView subpartitionView: a view over the result subpartition that encapsulates reading data from the ResultSubpartition, releasing resources, and so on; its getNextBuffer() method defines how a Buffer is fetched.
RemoteInputChannel※
private final ConnectionManager connectionManager: manages the network connections to other nodes.
private final PrioritizedDeque<SequenceBuffer> receivedBuffers = new PrioritizedDeque<>(): the queue of Buffers received from upstream so far, to be consumed by the operator later.
private volatile PartitionRequestClient partitionRequestClient: the client connected to the upstream Task; Flink uses the Netty framework for data transfer between nodes.
private final int initialCredit: the initial credit, i.e. the number of exclusive Buffers owned by this InputChannel (a rough sketch of this bookkeeping follows the list).
private final BufferManager bufferManager: manages this channel's buffers.
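initialCredit and the BufferManager tie into the credit-based flow control covered in the next article. As a rough, purely illustrative sketch of the bookkeeping (hypothetical class, not Flink's BufferManager): every free exclusive buffer is one credit the receiver can announce, the sender consumes one credit per buffer it ships, and recycling a consumed buffer turns into new credit.
// Rough sketch of per-channel credit bookkeeping (hypothetical, not Flink's BufferManager).
public class CreditSketch {
    private final int initialCredit;  // number of exclusive buffers owned by this channel
    private int unannouncedCredit;    // credits not yet told to the sender
    private int senderCredit;         // credits the sender believes it may still use

    public CreditSketch(int initialCredit) {
        this.initialCredit = initialCredit;
        this.unannouncedCredit = initialCredit;
    }

    /** The receiver piggybacks new credits onto its next message to the sender. */
    public int announceCredits() {
        int announced = unannouncedCredit;
        senderCredit += announced;
        unannouncedCredit = 0;
        return announced;
    }

    /** The sender may only ship a buffer if it still has credit. */
    public boolean trySend() {
        if (senderCredit == 0) {
            return false; // back-pressure: wait until more credit is announced
        }
        senderCredit--;
        return true;
    }

    /** When the receiver recycles a consumed buffer, it becomes new credit. */
    public void onBufferRecycled() {
        unannouncedCredit++;
    }
}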
How InputChannel reads data※
The previous section ended with SingleInputGate calling InputChannel.getNextBuffer() to fetch Buffer data. Below, the getNextBuffer() implementation is walked through for each of the two InputChannel types.
LocalInputChannel※
A LocalInputChannel means the upstream and downstream Tasks run on the same machine. As covered in the previous article, data Buffers end up in the PipelinedSubpartition.buffers queue, and a LocalInputChannel fetches data from PipelinedSubpartition.buffers synchronously, so the write and the read between the two Tasks happen inside the same node.
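Conceptually this is an in-JVM producer/consumer handoff: the upstream task appends finished buffers to the subpartition's queue, and the downstream task's LocalInputChannel polls the same queue through the subpartition view. A minimal sketch of just that handoff (hypothetical class, not Flink's PipelinedSubpartition):
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Minimal in-JVM handoff sketch: the producing task appends buffers to a queue,
// the consuming task polls them directly. Hypothetical, not Flink's PipelinedSubpartition.
public class LocalHandoffSketch {
    private final Deque<byte[]> buffers = new ArrayDeque<>();

    /** Writer side: the upstream task adds a finished buffer. */
    public void add(byte[] buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            // in Flink this is where the downstream LocalInputChannel gets notified of data
        }
    }

    /** Reader side: the downstream task polls the next buffer, if any. */
    public Optional<byte[]> pollBuffer() {
        synchronized (buffers) {
            return Optional.ofNullable(buffers.poll());
        }
    }
}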
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel#getNextBuffer
//defines how a Buffer is fetched
@Override
public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
checkError();
ResultSubpartitionView subpartitionView = this.subpartitionView;
if (subpartitionView == null) {
// There is a possible race condition between writing a EndOfPartitionEvent (1) and
// flushing (3) the Local
// channel on the sender side, and reading EndOfPartitionEvent (2) and processing flush
// notification (4). When
// they happen in that order (1 - 2 - 3 - 4), flush notification can re-enqueue
// LocalInputChannel after (or
// during) it was released during reading the EndOfPartitionEvent (2).
if (isReleased) {
return Optional.empty();
}
// this can happen if the request for the partition was triggered asynchronously
// by the time trigger
// would be good to avoid that, by guaranteeing that the requestPartition() and
// getNextBuffer() always come from the same thread
// we could do that by letting the timer insert a special "requesting channel" into the
// input gate's queue
subpartitionView = checkAndWaitForSubpartitionView();
}
//get the next buffer from the subpartition view
BufferAndBacklog next = subpartitionView.getNextBuffer();
// ignore the empty buffer directly
while (next != null && next.buffer().readableBytes() == 0) {
next.buffer().recycleBuffer();
next = subpartitionView.getNextBuffer();
numBuffersIn.inc();
}
if (next == null) {
if (subpartitionView.isReleased()) {
throw new CancelTaskException(
"Consumed partition " + subpartitionView + " has been released.");
} else {
return Optional.empty();
}
}
Buffer buffer = next.buffer();
if (buffer instanceof FileRegionBuffer) {
buffer = ((FileRegionBuffer) buffer).readInto(inputGate.getUnpooledSegment());
}
if (buffer instanceof CompositeBuffer) {
buffer = ((CompositeBuffer) buffer).getFullBufferData(inputGate.getUnpooledSegment());
}
numBytesIn.inc(buffer.readableBytes());
numBuffersIn.inc();
channelStatePersister.checkForBarrier(buffer);
channelStatePersister.maybePersist(buffer);
NetworkActionsLogger.traceInput(
"LocalInputChannel#getNextBuffer",
buffer,
inputGate.getOwningTaskName(),
channelInfo,
channelStatePersister,
next.getSequenceNumber());
return Optional.of(
new BufferAndAvailability(
buffer,
next.getNextDataType(),
next.buffersInBacklog(),
next.getSequenceNumber()));
}
org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView#getNextBuffer
@Nullable
@Override
public BufferAndBacklog getNextBuffer() {
//poll a buffer from the parent subpartition
return parent.pollBuffer();
}
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition#pollBuffer
@Nullable
BufferAndBacklog pollBuffer() {
synchronized (buffers) {
if (isBlocked) {
return null;
}
Buffer buffer = null;
if (buffers.isEmpty()) {
flushRequested = false;
}
while (!buffers.isEmpty()) {
//look at the head of the buffers queue
BufferConsumerWithPartialRecordLength bufferConsumerWithPartialRecordLength =
buffers.peek();
BufferConsumer bufferConsumer =
bufferConsumerWithPartialRecordLength.getBufferConsumer();
if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
== bufferConsumer.getDataType()) {
completeTimeoutableCheckpointBarrier(bufferConsumer);
}
buffer = buildSliceBuffer(bufferConsumerWithPartialRecordLength);
checkState(
bufferConsumer.isFinished() || buffers.size() == 1,
"When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");
if (buffers.size() == 1) {
// turn off flushRequested flag if we drained all the available data
flushRequested = false;
}
if (bufferConsumer.isFinished()) {
requireNonNull(buffers.poll()).getBufferConsumer().close();
//after a finished buffer is taken from the queue, decrease buffersInBacklog, i.e. this subpartition's backlog count
decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
}
// if we have an empty finished buffer and the exclusive credit is 0, we just return
// the empty buffer so that the downstream task can release the allocated credit for
// this empty buffer, this happens in two main scenarios currently:
// 1. all data of a buffer builder has been read and after that the buffer builder
// is finished
// 2. in approximate recovery mode, a partial record takes a whole buffer builder
if (receiverExclusiveBuffersPerChannel == 0 && bufferConsumer.isFinished()) {
break;
}
if (buffer.readableBytes() > 0) {
break;
}
//release this buffer once: decrease its reference count and recycle it if the count reaches 0
buffer.recycleBuffer();
buffer = null;
if (!bufferConsumer.isFinished()) {
break;
}
}
if (buffer == null) {
return null;
}
if (buffer.getDataType().isBlockingUpstream()) {
isBlocked = true;
}
updateStatistics(buffer);
// Do not report last remaining buffer on buffers as available to read (assuming it's
// unfinished).
// It will be reported for reading either on flush or when the number of buffers in the
// queue
// will be 2 or more.
NetworkActionsLogger.traceOutput(
"PipelinedSubpartition#pollBuffer",
buffer,
parent.getOwningTaskName(),
subpartitionInfo);
return new BufferAndBacklog(
buffer,
getBuffersInBacklogUnsafe(),
isDataAvailableUnsafe() ? getNextBufferTypeUnsafe() : Buffer.DataType.NONE,
sequenceNumber++);
}
}
RemoteInputChannel※
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel#getNextBuffer
@Override
public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
checkPartitionRequestQueueInitialized();
final SequenceBuffer next;
final DataType nextDataType;
synchronized (receivedBuffers) {
//synchronously poll one Buffer from the receivedBuffers queue and return it;
//buffers are added to receivedBuffers during Netty communication, where
//CreditBasedPartitionRequestClientHandler decodes the received data and enqueues it
next = receivedBuffers.poll();
if (next != null) {
totalQueueSizeInBytes -= next.buffer.getSize();
}
nextDataType =
receivedBuffers.peek() != null
? receivedBuffers.peek().buffer.getDataType()
: DataType.NONE;
}
if (next == null) {
if (isReleased.get()) {
throw new CancelTaskException(
"Queried for a buffer after channel has been released.");
}
return Optional.empty();
}
NetworkActionsLogger.traceInput(
"RemoteInputChannel#getNextBuffer",
next.buffer,
inputGate.getOwningTaskName(),
channelInfo,
channelStatePersister,
next.sequenceNumber);
numBytesIn.inc(next.buffer.getSize());
numBuffersIn.inc();
return Optional.of(
new BufferAndAvailability(next.buffer, nextDataType, 0, next.sequenceNumber));
}
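The key difference from the local case is that receivedBuffers is filled by a Netty I/O thread (through CreditBasedPartitionRequestClientHandler) while the task thread drains it, so the two threads only meet at this queue. A minimal sketch of that handoff, with hypothetical names and byte arrays standing in for network buffers:
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Sketch of the remote handoff: a Netty I/O thread enqueues decoded buffers,
// the task thread drains them. Hypothetical, not Flink's RemoteInputChannel.
public class RemoteReceiveQueueSketch {
    private final Deque<byte[]> receivedBuffers = new ArrayDeque<>();
    private long totalQueueSizeInBytes;

    /** Network (Netty) thread: enqueue a decoded buffer. */
    public void onBufferReceived(byte[] buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
            totalQueueSizeInBytes += buffer.length;
            // in Flink the channel would also notify the input gate that data is available
        }
    }

    /** Task thread: poll the next buffer, mirroring the synchronized block in getNextBuffer(). */
    public Optional<byte[]> getNextBuffer() {
        synchronized (receivedBuffers) {
            byte[] next = receivedBuffers.poll();
            if (next != null) {
                totalQueueSizeInBytes -= next.length;
            }
            return Optional.ofNullable(next);
        }
    }
}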
That concludes the analysis of how a Flink Task reads data. Together with the write-path article, this covers a Task's basic data operations. The next article explains how the buffers in a ResultSubPartition are sent downstream and how a RemoteInputChannel obtains its data over the network.
References