Flink V1.20 Source Code Reading Notes (5.9) - How a Flink Task Reads Data

2025-03-18

The previous article covered how a Flink Task writes data: records ultimately land in the ResultPartition model. Each ResultPartition instance contains one or more ResultSubPartitions; after the RecordWriter's partitioner picks a target subpartition, the data sits as NetworkBuffers in that subpartition's buffers queue, waiting to be consumed by the downstream Task.

This article continues with how a Flink Task reads data and the data model involved.

These two articles only cover the data read/write path inside the Flink Task thread. They do not cover how data is requested and transferred between upstream and downstream Tasks over the network; that exchange is built on Flink's credit-based Netty communication mechanism, which will be explained in the next article.

 

StreamTask

StreamTask is the execution container for operators, so the Task read path can be traced from the way StreamTask executes. As covered earlier, the default action of the MailBox thread execution model is StreamTask's processInput(...) method, which serves as the entry point for reading data. This article therefore starts from processInput(...) to analyze the read path. Once the Task has started and initialized, processInput(...) is invoked repeatedly in a loop.

 

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop

    /**
     * Runs the mailbox processing loop. This is where the main work is done. This loop can be
     * suspended at any time by calling {@link #suspend()}. For resuming the loop this method should
     * be called again.
     */
    public void runMailboxLoop() throws Exception {
        suspended = !mailboxLoopRunning;

        final TaskMailbox localMailbox = mailbox;

        checkState(
                localMailbox.isMailboxThread(),
                "Method must be executed by declared mailbox thread!");

        assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

        final MailboxController mailboxController = new MailboxController(this);

        while (isNextLoopPossible()) {
            // The blocking `processMail` call will not return until default action is available.
            processMail(localMailbox, false);
            if (isNextLoopPossible()) {
                mailboxDefaultAction.runDefaultAction(
                        mailboxController); // lock is acquired inside default action as needed
            }
        }
    }
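
To make the relationship between the mailbox loop and the default action concrete, here is a hypothetical, self-contained miniature of the pattern: a default action standing in for processInput(...) runs on every iteration unless mails (for example checkpoint triggers) are pending. All class and method names below are illustrative stand-ins, not Flink's real API.

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class MailboxLoopSketch {

        interface DefaultAction {
            void run() throws Exception;
        }

        static final class SimpleMailboxProcessor {
            private final DefaultAction defaultAction;
            private final Queue<Runnable> mails = new ArrayDeque<>();
            private boolean running = true;

            SimpleMailboxProcessor(DefaultAction defaultAction) {
                this.defaultAction = defaultAction;
            }

            void submitMail(Runnable mail) {
                mails.add(mail);
            }

            void stop() {
                running = false;
            }

            // Analogue of MailboxProcessor#runMailboxLoop: drain pending mails first,
            // then run the default action (the stand-in for StreamTask#processInput)
            // once per iteration.
            void runMailboxLoop() throws Exception {
                while (running) {
                    Runnable mail;
                    while ((mail = mails.poll()) != null) {
                        mail.run();
                    }
                    if (running) {
                        defaultAction.run();
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            int[] remainingRecords = {3};
            SimpleMailboxProcessor[] ref = new SimpleMailboxProcessor[1];
            ref[0] =
                    new SimpleMailboxProcessor(
                            () -> {
                                System.out.println("processInput(): read one record");
                                if (--remainingRecords[0] == 0) {
                                    ref[0].stop(); // like reaching the end of the input
                                }
                            });
            ref[0].submitMail(() -> System.out.println("mail: e.g. a checkpoint trigger"));
            ref[0].runMailboxLoop();
        }
    }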

 

org.apache.flink.streaming.runtime.tasks.StreamTask#processInput

    /**
     * This method implements the default action of the task (e.g. processing one event from the
     * input). Implementations should (in general) be non-blocking.
     *
     * @param controller controller object for collaborative interaction between the action and the
     *     stream task.
     * @throws Exception on any problems in the action.
     */
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        // process the input
        DataInputStatus status = inputProcessor.processInput();
        // ... rest omitted
    }

 

Taking the StreamOneInputProcessor implementation as an example, inside processInput() the call StreamTaskInput.emitNext(output) is what reads the data.

org.apache.flink.streaming.runtime.io.StreamOneInputProcessor#processInput

    @Override
    public DataInputStatus processInput() throws Exception {
        // read data from the input
        DataInputStatus status = input.emitNext(output);

        if (status == DataInputStatus.END_OF_DATA) {
            endOfInputAware.endInput(input.getInputIndex() + 1);
            output = new FinishedDataOutput<>();
        } else if (status == DataInputStatus.END_OF_RECOVERY) {
            if (input instanceof RecoverableStreamTaskInput) {
                input = ((RecoverableStreamTaskInput<IN>) input).finishRecovery();
            }
            return DataInputStatus.MORE_AVAILABLE;
        }

        return status;
    }

 

In the AbstractStreamTaskNetworkInput implementation, processElement(...) hands each record to the operator's UDF through the output argument, while the actual reading of data happens in checkpointedInputGate.pollNext(). A simplified sketch of the processElement(...) dispatch follows the listing below.

org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput#emitNext

    @Override
    public DataInputStatus emitNext(DataOutput<T> output) throws Exception {

        while (true) {
            // get the stream element from the deserializer
            if (currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult result;
                try {
                    result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
                } catch (IOException e) {
                    throw new IOException(
                            String.format("Can't get next record for channel %s", lastChannel), e);
                }
                if (result.isBufferConsumed()) {
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    final boolean breakBatchEmitting =
                            // hand the record to the operator's UDF via the output argument
                            processElement(deserializationDelegate.getInstance(), output);
                    if (canEmitBatchOfRecords.check() && !breakBatchEmitting) {
                        continue;
                    }
                    return DataInputStatus.MORE_AVAILABLE;
                }
            }

            // the actual data read happens in checkpointedInputGate.pollNext()
            Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
            if (bufferOrEvent.isPresent()) {
                // return to the mailbox after receiving a checkpoint barrier to avoid processing of
                // data after the barrier before checkpoint is performed for unaligned checkpoint
                // mode
                if (bufferOrEvent.get().isBuffer()) {
                    processBuffer(bufferOrEvent.get());
                } else {
                    DataInputStatus status = processEvent(bufferOrEvent.get());
                    if (status == DataInputStatus.MORE_AVAILABLE && canEmitBatchOfRecords.check()) {
                        continue;
                    }
                    return status;
                }
            } else {
                if (checkpointedInputGate.isFinished()) {
                    checkState(
                            checkpointedInputGate.getAvailableFuture().isDone(),
                            "Finished BarrierHandler should be available");
                    return DataInputStatus.END_OF_INPUT;
                }
                return DataInputStatus.NOTHING_AVAILABLE;
            }
        }
    }
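
As noted above, processElement(...) is what hands a fully deserialized record to the operator's UDF via the output argument. Below is a hypothetical, self-contained sketch of that dispatch; the types are simplified stand-ins rather than Flink's real StreamElement/DataOutput classes, and the real method additionally routes watermarks through alignment logic (a StatusWatermarkValve).

    import java.util.Arrays;
    import java.util.List;

    public class ProcessElementSketch {

        static class StreamElement {}

        static class Record extends StreamElement {
            final String value;
            Record(String value) { this.value = value; }
        }

        static class Watermark extends StreamElement {
            final long timestamp;
            Watermark(long timestamp) { this.timestamp = timestamp; }
        }

        /** Stand-in for the output bridge that ultimately invokes the operator's UDF. */
        interface DataOutput {
            void emitRecord(Record record);
            void emitWatermark(Watermark watermark);
        }

        // Simplified analogue of the dispatch: records go straight to the operator,
        // watermarks take the watermark path (alignment logic omitted here).
        static void processElement(StreamElement element, DataOutput output) {
            if (element instanceof Record) {
                output.emitRecord((Record) element);
            } else if (element instanceof Watermark) {
                output.emitWatermark((Watermark) element);
            }
        }

        public static void main(String[] args) {
            DataOutput output = new DataOutput() {
                @Override
                public void emitRecord(Record r) {
                    System.out.println("UDF got record: " + r.value);
                }

                @Override
                public void emitWatermark(Watermark w) {
                    System.out.println("watermark advanced to " + w.timestamp);
                }
            };
            List<StreamElement> elements =
                    Arrays.asList(new Record("a"), new Record("b"), new Watermark(100L));
            for (StreamElement e : elements) {
                processElement(e, output);
            }
        }
    }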

 

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#pollNext

    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        // read data from the underlying input gate
        Optional<BufferOrEvent> next = inputGate.pollNext();

        if (!next.isPresent()) {
            return handleEmptyBuffer();
        }

        BufferOrEvent bufferOrEvent = next.get();

        if (bufferOrEvent.isEvent()) {
            return handleEvent(bufferOrEvent);
        } else if (bufferOrEvent.isBuffer()) {
            /**
             * https://issues.apache.org/jira/browse/FLINK-19537 This is not entirely true, as it's
             * ignoring the buffer/bytes accumulated in the record deserializers. If buffer is
             * processed here, it doesn't mean it was fully processed (so we can over estimate the
             * amount of processed bytes). On the other hand some records/bytes might be processed
             * without polling anything from this {@link CheckpointedInputGate} (underestimating the
             * amount of processed bytes). All in all this should have been calculated on the {@link
             * StreamTaskNetworkInput} level, where we have an access to the records deserializers.
             * However the current is on average accurate and it might be just good enough (at least
             * for the time being).
             */
            barrierHandler.addProcessedBytes(bufferOrEvent.getBuffer().getSize());
        }
        return next;
    }

 

That completes the read path at the StreamTask level; the next section moves into how SingleInputGate reads data.

 

SingleInputGate

InputGate has two main implementations: SingleInputGate and UnionInputGate. SingleInputGate is the common case on the read path. UnionInputGate combines several SingleInputGates into one InputGate; operators such as CoStreamMap, which consume two upstream streams, use it. The rest of this section focuses on SingleInputGate.
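
To illustrate the union idea, here is a hypothetical, self-contained sketch: a gate that drains whichever member gate currently has data, which is roughly the service UnionInputGate provides to multi-input operators. The real class also re-maps channel indexes and tracks availability; the names below are simplified stand-ins, not Flink's real API.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;

    public class UnionGateSketch {

        interface Gate {
            Optional<String> pollNext(); // stand-in for pollNext() returning a BufferOrEvent
        }

        static class UnionGate implements Gate {
            private final List<Gate> memberGates;

            UnionGate(List<Gate> memberGates) {
                this.memberGates = memberGates;
            }

            @Override
            public Optional<String> pollNext() {
                // Drain whichever member gate currently has data available.
                for (Gate gate : memberGates) {
                    Optional<String> next = gate.pollNext();
                    if (next.isPresent()) {
                        return next;
                    }
                }
                return Optional.empty();
            }
        }

        public static void main(String[] args) {
            Gate first = () -> Optional.of("buffer from input 1");
            Gate second = () -> Optional.of("buffer from input 2");
            Gate union = new UnionGate(Arrays.asList(first, second));
            System.out.println(union.pollNext().orElse("nothing available"));
        }
    }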

SingleInputGate member structure

Let's first look at SingleInputGate's member structure. The meanings of its member variables are:

private final int gateIndex: the index of the upstream input this Task consumes. In most cases an operator has a single upstream input; when there are several, gateIndex identifies which one this gate belongs to.
private final IntermediateDataSetID consumedResultId: the intermediate result set of the upstream operator that this Task consumes.
private final ResultPartitionType consumedPartitionType: the partition type this InputGate consumes. Streaming jobs normally use PIPELINED_BOUNDED, which relies on a bounded set of buffers so that upstream production and downstream consumption can run concurrently.
private final int numberOfInputChannels: the number of upstream result subpartitions feeding this gate.
private final Map<IntermediateResultPartitionID, Map<InputChannelInfo, InputChannel>> inputChannels: the upstream result subpartition inputs, keyed by partition.
private final InputChannel[] channels: the same upstream result subpartition inputs, held as an array.
private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque<>(): the queue of InputChannels that currently have data to read (see the sketch after this list).
private BufferPool bufferPool: the SingleInputGate's local buffer pool, used to cache the data that is read.
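
The inputChannelsWithData queue is the hand-off point between the channels and the task thread, as the getChannel(boolean) listing later in this article shows. The following is a hypothetical, self-contained sketch of that hand-off, using simplified stand-in types rather than Flink's real classes: a channel that receives data is enqueued, and the task thread polls the queue, waiting on it in blocking mode.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Optional;

    public class ChannelQueueSketch {

        static class Channel {
            final int index;
            Channel(int index) { this.index = index; }
        }

        private final Deque<Channel> channelsWithData = new ArrayDeque<>();

        // Producer side: invoked when a channel has a new buffer available.
        void onChannelNonEmpty(Channel channel) {
            synchronized (channelsWithData) {
                channelsWithData.add(channel);
                channelsWithData.notifyAll(); // wakes up a getChannel(true) that is waiting
            }
        }

        // Consumer side: analogue of the getChannel(boolean) listing shown later.
        Optional<Channel> getChannel(boolean blocking) throws InterruptedException {
            synchronized (channelsWithData) {
                while (channelsWithData.isEmpty()) {
                    if (!blocking) {
                        return Optional.empty();
                    }
                    channelsWithData.wait();
                }
                return Optional.of(channelsWithData.poll());
            }
        }

        public static void main(String[] args) throws InterruptedException {
            ChannelQueueSketch gate = new ChannelQueueSketch();
            System.out.println(gate.getChannel(false).isPresent()); // false: nothing available yet
            gate.onChannelNonEmpty(new Channel(0));
            gate.getChannel(false).ifPresent(c -> System.out.println("got data on channel " + c.index));
        }
    }

The real SingleInputGate uses a PrioritizedDeque instead of a plain deque so that priority elements (for example unaligned checkpoint barriers) can jump ahead of regular data.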

 

How SingleInputGate reads data

We continue by analyzing how SingleInputGate.pollNext() reads data.

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#pollNext

    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        return getNextBufferOrEvent(false);
    }

 

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#getNextBufferOrEvent

    private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking)
            throws IOException, InterruptedException {
        if (hasReceivedAllEndOfPartitionEvents) {
            return Optional.empty();
        }

        if (closeFuture.isDone()) {
            throw new CancelTaskException("Input gate is already closed.");
        }
        // obtain an available InputChannel together with its data Buffer
        Optional<InputWithData<InputChannel, Buffer>> next = waitAndGetNextData(blocking);
        if (!next.isPresent()) {
            throughputCalculator.pauseMeasurement();
            return Optional.empty();
        }

        throughputCalculator.resumeMeasurement();

        InputWithData<InputChannel, Buffer> inputWithData = next.get();
        final BufferOrEvent bufferOrEvent =
                // convert to a Buffer or Event; the buffer may be released inside
                transformToBufferOrEvent(
                        inputWithData.data,
                        inputWithData.moreAvailable,
                        inputWithData.input,
                        inputWithData.morePriorityEvents);
        throughputCalculator.incomingDataSize(bufferOrEvent.getSize());
        return Optional.of(bufferOrEvent);
    }

 

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#waitAndGetNextData

    private Optional<InputWithData<InputChannel, Buffer>> waitAndGetNextData(boolean blocking)
            throws IOException, InterruptedException {
        while (true) {
            synchronized (inputChannelsWithData) {
                // get an InputChannel that currently has data available
                Optional<InputChannel> inputChannelOpt = getChannel(blocking);
                if (!inputChannelOpt.isPresent()) {
                    return Optional.empty();
                }

                final InputChannel inputChannel = inputChannelOpt.get();
                // read a recovered or normal buffer
                Optional<Buffer> buffer = readRecoveredOrNormalBuffer(inputChannel);
                if (!buffer.isPresent()) {
                    checkUnavailability();
                    continue;
                }

                int numSubpartitions = inputChannel.getConsumedSubpartitionIndexSet().size();
                if (numSubpartitions > 1) {
                    switch (buffer.get().getDataType()) {
                        case END_OF_DATA:
                            endOfDatas[inputChannel.getChannelIndex()]++;
                            if (endOfDatas[inputChannel.getChannelIndex()] < numSubpartitions) {
                                buffer.get().recycleBuffer();
                                continue;
                            }
                            break;
                        case END_OF_PARTITION:
                            endOfPartitions[inputChannel.getChannelIndex()]++;
                            if (endOfPartitions[inputChannel.getChannelIndex()]
                                    < numSubpartitions) {
                                buffer.get().recycleBuffer();
                                continue;
                            }
                            break;
                        default:
                            break;
                    }
                }

                final boolean morePriorityEvents =
                        inputChannelsWithData.getNumPriorityElements() > 0;
                if (buffer.get().getDataType().hasPriority()) {
                    if (!morePriorityEvents) {
                        priorityAvailabilityHelper.resetUnavailable();
                    }
                }
                checkUnavailability();
                return Optional.of(
                        new InputWithData<>(
                                inputChannel,
                                buffer.get(),
                                !inputChannelsWithData.isEmpty(),
                                morePriorityEvents));
            }
        }
    }

 

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#getChannel(boolean)

    private Optional<InputChannel> getChannel(boolean blocking) throws InterruptedException {
        assert Thread.holdsLock(inputChannelsWithData);

        while (inputChannelsWithData.isEmpty()) {
            if (closeFuture.isDone()) {
                throw new IllegalStateException("Released");
            }

            if (blocking) {
                inputChannelsWithData.wait();
            } else {
                availabilityHelper.resetUnavailable();
                return Optional.empty();
            }
        }

        InputChannel inputChannel = inputChannelsWithData.poll();
        enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());

        return Optional.of(inputChannel);
    }

 

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#readRecoveredOrNormalBuffer

    private Optional<Buffer> readRecoveredOrNormalBuffer(InputChannel inputChannel)
            throws IOException, InterruptedException {
        // Firstly, read the buffers from the recovered channel
        if (inputChannel instanceof RecoveredInputChannel && !inputChannel.isReleased()) {
            // start fetching data from this InputChannel
            Optional<Buffer> buffer = readBufferFromInputChannel(inputChannel);
            if (!((RecoveredInputChannel) inputChannel).getStateConsumedFuture().isDone()) {
                return buffer;
            }
        }

        //  After the recovered buffers are read, read the normal buffers
        return enabledTieredStorage()
                ? readBufferFromTieredStore(inputChannel)
                : readBufferFromInputChannel(inputChannel);
    }

 

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#readBufferFromInputChannel

    private Optional<Buffer> readBufferFromInputChannel(InputChannel inputChannel)
            throws IOException, InterruptedException {
        // get the next buffer from the channel
        Optional<BufferAndAvailability> bufferAndAvailabilityOpt = inputChannel.getNextBuffer();
        if (!bufferAndAvailabilityOpt.isPresent()) {
            return Optional.empty();
        }
        final BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
        if (bufferAndAvailability.moreAvailable()) {
            // enqueue the inputChannel at the end to avoid starvation
            queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents());
        }
        if (bufferAndAvailability.hasPriority()) {
            lastPrioritySequenceNumber[inputChannel.getChannelIndex()] =
                    bufferAndAvailability.getSequenceNumber();
        }

        Buffer buffer = bufferAndAvailability.buffer();
        if (buffer.getDataType() == Buffer.DataType.RECOVERY_METADATA) {
            RecoveryMetadata recoveryMetadata =
                    (RecoveryMetadata)
                            EventSerializer.fromSerializedEvent(
                                    buffer.getNioBufferReadable(), getClass().getClassLoader());
            lastBufferStatusMapInTieredStore.put(
                    inputChannel.getChannelIndex(),
                    Tuple2.of(
                            buffer.getDataType().isPartialRecord(),
                            recoveryMetadata.getFinalBufferSubpartitionId()));
        }
        return Optional.of(bufferAndAvailability.buffer());
    }

 

InputChannel

In a real deployment the upstream and downstream Tasks may run on the same machine, in which case the InputChannel is a LocalInputChannel, or on different machines, in which case it is a RemoteInputChannel. Both types are analyzed below.

 

Key member structure

LocalInputChannel

private final ResultPartitionManager partitionManager: registers and manages the result partitions.
private volatile ResultSubpartitionView subpartitionView: a view over the ResultSubpartition that encapsulates reading its data and releasing its resources; its getNextBuffer() method defines how Buffer data is fetched.

 

RemoteInputChannel

private final ConnectionManager connectionManager: manages the network connections to other nodes.

private final PrioritizedDeque<SequenceBuffer> receivedBuffers = new PrioritizedDeque<>(): the queue of Buffers received from upstream, later consumed by the operator (a sketch of this handoff follows the list).

private volatile PartitionRequestClient partitionRequestClient: the client connected to the upstream Task; Flink uses the Netty framework for data transfer between nodes.

private final int initialCredit: the initial credit, i.e. the number of exclusive buffers reserved for this InputChannel.

private final BufferManager bufferManager: the Buffer manager.
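
The receivedBuffers queue is filled by the network stack and drained by the task thread. Below is a hypothetical, self-contained sketch of that handoff, using simplified stand-in types rather than Flink's real classes: the network (Netty) thread, which in the real code is driven by CreditBasedPartitionRequestClientHandler, enqueues decoded buffers, and getNextBuffer() polls them from the task thread.

    import java.util.ArrayDeque;
    import java.util.Optional;
    import java.util.Queue;

    public class ReceivedBuffersSketch {

        static class SequenceBuffer {
            final byte[] buffer;
            final int sequenceNumber;

            SequenceBuffer(byte[] buffer, int sequenceNumber) {
                this.buffer = buffer;
                this.sequenceNumber = sequenceNumber;
            }
        }

        private final Queue<SequenceBuffer> receivedBuffers = new ArrayDeque<>();

        // Called from the network thread when a buffer for this channel arrives.
        void onBuffer(byte[] buffer, int sequenceNumber) {
            synchronized (receivedBuffers) {
                receivedBuffers.add(new SequenceBuffer(buffer, sequenceNumber));
            }
        }

        // Called from the task thread, analogous to the getNextBuffer() shown below.
        Optional<SequenceBuffer> getNextBuffer() {
            synchronized (receivedBuffers) {
                return Optional.ofNullable(receivedBuffers.poll());
            }
        }

        public static void main(String[] args) {
            ReceivedBuffersSketch channel = new ReceivedBuffersSketch();
            channel.onBuffer(new byte[] {1, 2, 3}, 0);
            channel.getNextBuffer()
                    .ifPresent(b -> System.out.println("polled buffer with seq=" + b.sequenceNumber));
        }
    }

How buffers arrive over the network, and how credit is announced back to the upstream Task, is covered in the next article.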

 

How InputChannel reads data

The SingleInputGate section above ended at InputChannel.getNextBuffer(), where Buffer data is actually fetched. Below, the getNextBuffer() implementation is examined for each of the two InputChannel types.

LocalInputChannel

LocalInputChannel means the upstream and downstream Tasks run on the same machine. As described in the previous article, data Buffers end up in the PipelinedSubpartition.buffers queue, and a LocalInputChannel fetches data from PipelinedSubpartition.buffers synchronously. This is how writing and reading between co-located upstream and downstream Tasks is implemented.

org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel#getNextBuffer

    // defines how Buffer data is fetched
    @Override
    public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
        checkError();

        ResultSubpartitionView subpartitionView = this.subpartitionView;
        if (subpartitionView == null) {
            // There is a possible race condition between writing a EndOfPartitionEvent (1) and
            // flushing (3) the Local
            // channel on the sender side, and reading EndOfPartitionEvent (2) and processing flush
            // notification (4). When
            // they happen in that order (1 - 2 - 3 - 4), flush notification can re-enqueue
            // LocalInputChannel after (or
            // during) it was released during reading the EndOfPartitionEvent (2).
            if (isReleased) {
                return Optional.empty();
            }

            // this can happen if the request for the partition was triggered asynchronously
            // by the time trigger
            // would be good to avoid that, by guaranteeing that the requestPartition() and
            // getNextBuffer() always come from the same thread
            // we could do that by letting the timer insert a special "requesting channel" into the
            // input gate's queue
            subpartitionView = checkAndWaitForSubpartitionView();
        }

        // get the next buffer from the subpartition view
        BufferAndBacklog next = subpartitionView.getNextBuffer();
        // ignore the empty buffer directly
        while (next != null && next.buffer().readableBytes() == 0) {
            next.buffer().recycleBuffer();
            next = subpartitionView.getNextBuffer();
            numBuffersIn.inc();
        }

        if (next == null) {
            if (subpartitionView.isReleased()) {
                throw new CancelTaskException(
                        "Consumed partition " + subpartitionView + " has been released.");
            } else {
                return Optional.empty();
            }
        }

        Buffer buffer = next.buffer();

        if (buffer instanceof FileRegionBuffer) {
            buffer = ((FileRegionBuffer) buffer).readInto(inputGate.getUnpooledSegment());
        }

        if (buffer instanceof CompositeBuffer) {
            buffer = ((CompositeBuffer) buffer).getFullBufferData(inputGate.getUnpooledSegment());
        }

        numBytesIn.inc(buffer.readableBytes());
        numBuffersIn.inc();
        channelStatePersister.checkForBarrier(buffer);
        channelStatePersister.maybePersist(buffer);
        NetworkActionsLogger.traceInput(
                "LocalInputChannel#getNextBuffer",
                buffer,
                inputGate.getOwningTaskName(),
                channelInfo,
                channelStatePersister,
                next.getSequenceNumber());
        return Optional.of(
                new BufferAndAvailability(
                        buffer,
                        next.getNextDataType(),
                        next.buffersInBacklog(),
                        next.getSequenceNumber()));
    }

 

org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView#getNextBuffer

    @Nullable
    @Override
    public BufferAndBacklog getNextBuffer() {
        // poll a buffer from the parent PipelinedSubpartition
        return parent.pollBuffer();
    }

 

org.apache.flink.runtime.io.network.partition.PipelinedSubpartition#pollBuffer

    @Nullable
    BufferAndBacklog pollBuffer() {
        synchronized (buffers) {
            if (isBlocked) {
                return null;
            }

            Buffer buffer = null;

            if (buffers.isEmpty()) {
                flushRequested = false;
            }

            while (!buffers.isEmpty()) {
                // peek at the head of the buffers queue
                BufferConsumerWithPartialRecordLength bufferConsumerWithPartialRecordLength =
                        buffers.peek();
                BufferConsumer bufferConsumer =
                        bufferConsumerWithPartialRecordLength.getBufferConsumer();
                if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
                        == bufferConsumer.getDataType()) {
                    completeTimeoutableCheckpointBarrier(bufferConsumer);
                }
                buffer = buildSliceBuffer(bufferConsumerWithPartialRecordLength);

                checkState(
                        bufferConsumer.isFinished() || buffers.size() == 1,
                        "When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");

                if (buffers.size() == 1) {
                    // turn off flushRequested flag if we drained all the available data
                    flushRequested = false;
                }

                if (bufferConsumer.isFinished()) {
                    requireNonNull(buffers.poll()).getBufferConsumer().close();
                    // after taking data off the buffers queue, decrement buffersInBacklog: the subpartition's backlog shrinks by one
                    decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
                }

                // if we have an empty finished buffer and the exclusive credit is 0, we just return
                // the empty buffer so that the downstream task can release the allocated credit for
                // this empty buffer, this happens in two main scenarios currently:
                // 1. all data of a buffer builder has been read and after that the buffer builder
                // is finished
                // 2. in approximate recovery mode, a partial record takes a whole buffer builder
                if (receiverExclusiveBuffersPerChannel == 0 && bufferConsumer.isFinished()) {
                    break;
                }

                if (buffer.readableBytes() > 0) {
                    break;
                }
                // release this buffer once: decrease the reference count and recycle the buffer when it reaches 0
                buffer.recycleBuffer();
                buffer = null;
                if (!bufferConsumer.isFinished()) {
                    break;
                }
            }

            if (buffer == null) {
                return null;
            }

            if (buffer.getDataType().isBlockingUpstream()) {
                isBlocked = true;
            }

            updateStatistics(buffer);
            // Do not report last remaining buffer on buffers as available to read (assuming it's
            // unfinished).
            // It will be reported for reading either on flush or when the number of buffers in the
            // queue
            // will be 2 or more.
            NetworkActionsLogger.traceOutput(
                    "PipelinedSubpartition#pollBuffer",
                    buffer,
                    parent.getOwningTaskName(),
                    subpartitionInfo);
            return new BufferAndBacklog(
                    buffer,
                    getBuffersInBacklogUnsafe(),
                    isDataAvailableUnsafe() ? getNextBufferTypeUnsafe() : Buffer.DataType.NONE,
                    sequenceNumber++);
        }
    }

 

RemoteInputChannel

org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel#getNextBuffer

    @Override
    public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
        checkPartitionRequestQueueInitialized();

        final SequenceBuffer next;
        final DataType nextDataType;

        synchronized (receivedBuffers) {
            // synchronously take one Buffer from the receivedBuffers queue and return it.
            // Buffers are added to receivedBuffers during Netty communication:
            // CreditBasedPartitionRequestClientHandler decodes the incoming data and enqueues it here.
            next = receivedBuffers.poll();

            if (next != null) {
                totalQueueSizeInBytes -= next.buffer.getSize();
            }
            nextDataType =
                    receivedBuffers.peek() != null
                            ? receivedBuffers.peek().buffer.getDataType()
                            : DataType.NONE;
        }

        if (next == null) {
            if (isReleased.get()) {
                throw new CancelTaskException(
                        "Queried for a buffer after channel has been released.");
            }
            return Optional.empty();
        }

        NetworkActionsLogger.traceInput(
                "RemoteInputChannel#getNextBuffer",
                next.buffer,
                inputGate.getOwningTaskName(),
                channelInfo,
                channelStatePersister,
                next.sequenceNumber);
        numBytesIn.inc(next.buffer.getSize());
        numBuffersIn.inc();
        return Optional.of(
                new BufferAndAvailability(next.buffer, nextDataType, 0, next.sequenceNumber));
    }

 

This concludes the analysis of how a Flink Task reads data. Together with the previous article on writing data, we now know the basic data operations of a Task. The next article covers how the buffers of a ResultSubPartition are sent downstream and how a RemoteInputChannel receives its data over the network.

 

References

Flink源码解析(十八)——Flink Task读数据过程解析 (Flink Source Code Analysis (18): How a Flink Task Reads Data)

