Flink V1.20 Source Reading Notes (5.8) - How a Flink Task Writes Data

2025-02-14

This post walks through how a Task writes data. The write path involves RecordWriterOutput, RecordWriter, ChannelSelector, ResultPartition, ResultSubpartition, LocalBufferPool, and related components. Most of them have come up in earlier posts; here we focus on how data flows between them.

 

RecordWriterOutput

RecordWriterOutput structure. The Output interface faces the API layer and is responsible for passing data downstream.

RecordWriterOutput holds two important members: recordWriter and serializationDelegate. The recordWriter, which carries the partitioner, handles the stream's Record elements and forwards each element received by the Output to one downstream subpartition. The serializationDelegate serializes the Record elements.

Take the flatMap(...) API as an example. When an application developer implements a FlatMapFunction UDF, the final step is calling collector.collect(...) to send data downstream. This collector derives from the Output interface; the previous post showed that Output extends Collector, and that during OperatorChain construction every operator gets an Output instance that receives the records the operator produces and sends them downstream. When the edge between two operators is non-chainable, the Output member's concrete type is usually RecordWriterOutput. So the StreamFlatMap operator receives its Output instance during operator chain construction, wraps it into the collector member in open(), and while the job runs, processElement(...) is invoked to process and collect each record.
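The call chain above (UDF → collector.collect(...) → output.collect(...)) can be sketched with hypothetical stand-in types; MiniCollector, RecordingOutput, and flatMap here are our own names, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// A minimal sketch of how an operator's collector forwards records to its
// Output, which terminates the chain by "emitting" them downstream.
public class CollectorChainSketch {
    interface MiniCollector<T> { void collect(T record); }

    // Stands in for RecordWriterOutput: records everything it receives.
    static class RecordingOutput<T> implements MiniCollector<T> {
        final List<T> emitted = new ArrayList<>();
        @Override public void collect(T record) { emitted.add(record); }
    }

    // Stands in for a FlatMapFunction UDF: splits a line into words.
    static void flatMap(String line, MiniCollector<String> out) {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }

    public static void main(String[] args) {
        RecordingOutput<String> output = new RecordingOutput<>();
        flatMap("hello flink task", output);
        System.out.println(output.emitted); // [hello, flink, task]
    }
}
```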

 

org.apache.flink.streaming.api.operators.StreamFlatMap

public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
}

 

org.apache.flink.streaming.api.operators.TimestampedCollector#collect

    @Override
    public void collect(T record) {
        output.collect(reuse.replace(record));
    }

 

org.apache.flink.streaming.runtime.io.RecordWriterOutput#collect

    @Override
    public void collect(StreamRecord<OUT> record) {
        // collect the record and report whether it went to the main output
        if (collectAndCheckIfChained(record)) {
            numRecordsOut.inc();
        }
    }

 

org.apache.flink.streaming.runtime.io.RecordWriterOutput#collectAndCheckIfChained

    @Override
    public boolean collectAndCheckIfChained(StreamRecord<OUT> record) {
        if (this.outputTag != null) {
            // we are not responsible for emitting to the main output.
            return false;
        }

        // push to the record writer
        pushToRecordWriter(record);
        return true;
    }

 

RecordWriterOutput assigns the received StreamRecord to its serializationDelegate member, then lets the recordWriter serialize the delegate and route it to one subpartition.

org.apache.flink.streaming.runtime.io.RecordWriterOutput#pushToRecordWriter

    private <X> void pushToRecordWriter(StreamRecord<X> record) {
        serializationDelegate.setInstance(record);

        try {
            // let recordWriter serialize the delegate and route it to one subpartition
            recordWriter.emit(serializationDelegate);
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

 

That covers the high-level path by which RecordWriterOutput forwards data downstream; next we look at the RecordWriter implementation.

 

RecordWriter

RecordWriter structure. Its main members are targetPartition, numberOfSubpartitions, serializer, outputFlusher, and (in a subclass) channelSelector:

  • ResultPartitionWriter targetPartition: the result partition. Every Task instance of an operator owns one result partition, so the number of result partitions equals the operator's parallelism.
  • int numberOfSubpartitions: the number of result subpartitions, i.e. how many downstream Task instances consume this Task's output.
  • DataOutputSerializer serializer: a data output view backed by a byte array; it serializes StreamRecord elements into the array sequentially and finally wraps the written region as a ByteBuffer.
  • OutputFlusher outputFlusher: a periodic flusher implemented as a separate thread. When upstream output is slow, it pushes the buffered data downstream at a fixed interval so downstream operators do not wait too long.
  • ChannelSelector<T> channelSelector: held by the RecordWriter subclass ChannelSelectorRecordWriter; this partitioner decides which result subpartition a StreamRecord is routed to.
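As a sketch of what a ChannelSelector does, here is a hypothetical key-hash strategy (the class and method names are ours, not Flink's; Flink ships several concrete selectors, e.g. rebalance, broadcast, and keyed partitioners):

```java
// Hypothetical stand-in for a ChannelSelector: picks a subpartition by key hash.
public class HashChannelSelectorSketch {
    static int selectChannel(Object key, int numberOfSubpartitions) {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numberOfSubpartitions);
    }

    public static void main(String[] args) {
        // The same key always maps to the same subpartition.
        System.out.println(selectChannel("user-42", 4));
    }
}
```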

org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#emit

    @Override
    public void emit(T record) throws IOException {
        emit(record, channelSelector.selectChannel(record));
    }

 

RecordWriter output. When a RecordWriterOutput instance calls RecordWriter.emit, control transfers to emit(T record, int targetSubpartition). As the code below shows, the RecordWriter first serializes the StreamRecord into a ByteBuffer, then calls targetPartition.emitRecord(...) to write the buffer's contents into the concrete subpartition instance.

org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit

    public void emit(T record, int targetSubpartition) throws IOException {
        checkErroneous();

        // write the given serialized record to the target subpartition
        targetPartition.emitRecord(
                // first serialize the StreamRecord into a ByteBuffer
                serializeRecord(serializer, record), targetSubpartition);

        if (flushAlways) {
            targetPartition.flush(targetSubpartition);
        }
    }

 

org.apache.flink.runtime.io.network.api.writer.RecordWriter#serializeRecord

    @VisibleForTesting
    public static ByteBuffer serializeRecord(
            DataOutputSerializer serializer, IOReadableWritable record) throws IOException {
        // the initial capacity should be no less than 4 bytes
        serializer.setPositionUnsafe(4);

        // write data
        record.write(serializer);

        // write length
        serializer.writeIntUnsafe(serializer.length() - 4, 0);

        return serializer.wrapAsByteBuffer();
    }
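The frame layout produced by serializeRecord — reserve 4 bytes, write the payload, then backfill the length at offset 0 — can be reproduced with plain NIO. This is a sketch with our own names, not Flink code:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the frame produced by serializeRecord: a 4-byte length prefix
// followed by the serialized record bytes.
public class LengthPrefixFrameSketch {
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.position(4);                // skip the length field, like setPositionUnsafe(4)
        buf.put(payload);               // write the record data
        buf.putInt(0, payload.length);  // backfill the length, like writeIntUnsafe(length - 4, 0)
        buf.flip();                     // make the whole frame readable
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer f = frame("hi".getBytes(StandardCharsets.UTF_8));
        System.out.println(f.getInt(0)); // 2: the payload length
    }
}
```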

 

The ChannelSelector partitioner

For details, see Flink V1.20 Source Reading Notes (6) - Data Partitioning.

 

ResultPartition and ResultSubpartition

ResultPartition structure, core members:

  • int partitionIndex: the index of the result partition belonging to this Task instance.
  • ResultPartitionType partitionType: the result partition type. Streaming jobs generally use PIPELINED or PIPELINED_BOUNDED; PIPELINED_BOUNDED caps the write rate, so under backpressure the task does not cache large amounts of data in its own buffers.
  • int numSubpartitions: the number of result subpartitions contained in this result partition.
  • BufferPool bufferPool: the partition's buffer pool (a LocalBufferPool) that stores the data assigned to subpartitions; when the ResultPartition writes data, it requests a buffer from this pool and writes into it.
  • SupplierWithException<BufferPool, IOException> bufferPoolFactory: the factory that creates the bufferPool; the buffers ultimately come from the NetworkBufferPool.
  • ResultSubpartition[] subpartitions: the array of result subpartitions, held by the ResultPartition subclass BufferWritingResultPartition.

 

ResultSubpartition structure, core members:

  • buffers: in the subclass PipelinedSubpartition, this queue caches the written data until the downstream Task consumes it.
  • PipelinedSubpartitionView readView: in the subclass PipelinedSubpartition, the read view encapsulating how this subpartition's data is consumed.

 

Continuing from the previous section: targetPartition.emitRecord(...) writes the ByteBuffer's contents into the concrete subpartition. The implementation first obtains a buffer from the LocalBufferPool and copies the ByteBuffer data into it; a BufferBuilder represents the buffer, backed by a MemorySegment. If the record is larger than one buffer, the full BufferBuilder is marked finished and a new one is requested, until all remaining data has been written.

org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#emitRecord

    // Obtain a buffer from the LocalBufferPool and copy the ByteBuffer data into it.
    // A BufferBuilder represents the buffer, backed by a MemorySegment.
    // If the record does not fit, the full BufferBuilder is finished and a new
    // one is requested until the remaining data has been written.
    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
        totalWrittenBytes += record.remaining();

        // allocate a new BufferBuilder, append it to the subpartition's buffers queue,
        // then write the ByteBuffer data into it
        BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);

        while (record.hasRemaining()) {
            // full buffer, partial record
            finishUnicastBufferBuilder(targetSubpartition);
            buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
        }

        if (buffer.isFull()) {
            // full buffer, full record
            finishUnicastBufferBuilder(targetSubpartition);
        }

        // partial buffer, full record
    }
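The copy loop in emitRecord can be sketched as follows; the names are ours, and real BufferBuilders wrap MemorySegments rather than plain byte arrays:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of the emitRecord loop: a record that does not fit into one
// fixed-size buffer is split across several, each full one being "finished".
public class SpanningWriteSketch {
    static List<byte[]> writeAcrossBuffers(ByteBuffer record, int bufferSize) {
        List<byte[]> finished = new ArrayList<>();
        while (record.hasRemaining()) {
            int chunk = Math.min(bufferSize, record.remaining());
            byte[] buffer = new byte[chunk];
            record.get(buffer);   // copy as much as fits into this buffer
            finished.add(buffer); // "finish" the buffer and move on to a new one
        }
        return finished;
    }

    public static void main(String[] args) {
        List<byte[]> parts = writeAcrossBuffers(ByteBuffer.wrap(new byte[10]), 4);
        System.out.println(parts.size()); // 3 buffers: 4 + 4 + 2 bytes
    }
}
```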

 

appendUnicastDataForNewRecord(...) allocates a new BufferBuilder, appends it to the target subpartition's buffers queue, and finally writes the ByteBuffer data into the BufferBuilder.

org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#appendUnicastDataForNewRecord

    private BufferBuilder appendUnicastDataForNewRecord(
            final ByteBuffer record, final int targetSubpartition) throws IOException {
        if (targetSubpartition < 0 || targetSubpartition >= unicastBufferBuilders.length) {
            throw new ArrayIndexOutOfBoundsException(targetSubpartition);
        }
        BufferBuilder buffer = unicastBufferBuilders[targetSubpartition];

        if (buffer == null) {
            // request a new one
            buffer = requestNewUnicastBufferBuilder(targetSubpartition);
            // add the buffer to the PipelinedSubpartition's buffers queue
            addToSubpartition(buffer, targetSubpartition, 0, record.remaining());
        }

        append(record, buffer);

        return buffer;
    }

 

org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#requestNewUnicastBufferBuilder

Requests a new BufferBuilder from the LocalBufferPool.

    private BufferBuilder requestNewUnicastBufferBuilder(int targetSubpartition)
            throws IOException {
        checkInProduceState();
        ensureUnicastMode();
        // request a new BufferBuilder from the LocalBufferPool
        final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
        unicastBufferBuilders[targetSubpartition] = bufferBuilder;

        return bufferBuilder;
    }

 

org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#requestNewBufferBuilderFromPool

    private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
            throws IOException {
        // request a BufferBuilder instance
        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
        if (bufferBuilder != null) {
            return bufferBuilder;
        }

        hardBackPressuredTimeMsPerSecond.markStart();
        try {
            bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
            hardBackPressuredTimeMsPerSecond.markEnd();
            return bufferBuilder;
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for buffer");
        }
    }

 

org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#addToSubpartition

Adds the data buffer to the PipelinedSubpartition's buffers queue.

    private void addToSubpartition(
            BufferBuilder buffer,
            int targetSubpartition,
            int partialRecordLength,
            int minDesirableBufferSize)
            throws IOException {
        int desirableBufferSize =
                // add the given buffer
                subpartitions[targetSubpartition].add(
                        buffer.createBufferConsumerFromBeginning(), partialRecordLength);

        resizeBuffer(buffer, desirableBufferSize, minDesirableBufferSize);
    }

 

org.apache.flink.runtime.io.network.partition.PipelinedSubpartition#add

    @Override
    public int add(BufferConsumer bufferConsumer, int partialRecordLength) {
        return add(bufferConsumer, partialRecordLength, false);
    }

 

org.apache.flink.runtime.io.network.partition.PipelinedSubpartition#add

    private int add(BufferConsumer bufferConsumer, int partialRecordLength, boolean finish) {
        checkNotNull(bufferConsumer);

        final boolean notifyDataAvailable;
        int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
        int newBufferSize;
        synchronized (buffers) {
            if (isFinished || isReleased) {
                bufferConsumer.close();
                return ADD_BUFFER_ERROR_CODE;
            }

            // Add the bufferConsumer and update the stats
            if (addBuffer(bufferConsumer, partialRecordLength)) {
                prioritySequenceNumber = sequenceNumber;
            }
            updateStatistics(bufferConsumer);
            increaseBuffersInBacklog(bufferConsumer);
            notifyDataAvailable = finish || shouldNotifyDataAvailable();

            isFinished |= finish;
            newBufferSize = bufferSize;
        }

        notifyPriorityEvent(prioritySequenceNumber);
        if (notifyDataAvailable) {
            // notify that data is available
            notifyDataAvailable();
        }

        return newBufferSize;
    }

 

org.apache.flink.runtime.io.network.partition.PipelinedSubpartition#addBuffer

    @GuardedBy("buffers")
    private boolean addBuffer(BufferConsumer bufferConsumer, int partialRecordLength) {
        assert Thread.holdsLock(buffers);
        if (bufferConsumer.getDataType().hasPriority()) {
            return processPriorityBuffer(bufferConsumer, partialRecordLength);
        } else if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
                == bufferConsumer.getDataType()) {
            processTimeoutableCheckpointBarrier(bufferConsumer);
        }
        // enqueue the buffer
        buffers.add(new BufferConsumerWithPartialRecordLength(bufferConsumer, partialRecordLength));
        return false;
    }

 

LocalBufferPool

LocalBufferPool structure, core members:

  • NetworkBufferPool networkBufferPool: the global network buffer pool; each TaskManager has exactly one, and a LocalBufferPool's buffers come from it.
  • int numberOfRequiredMemorySegments: the minimum number of buffers this LocalBufferPool must be able to obtain.
  • ArrayDeque<MemorySegment> availableMemorySegments: the buffers currently available in this LocalBufferPool, i.e. obtained from the NetworkBufferPool but not yet holding data; a buffer's underlying storage is a MemorySegment.
  • int maxNumberOfMemorySegments: the maximum number of buffers that may be requested from the NetworkBufferPool.
  • int numberOfRequestedMemorySegments: the number of buffers requested from the NetworkBufferPool so far.
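A minimal sketch of the local-pool-over-global-pool idea follows; all names are ours, and the real LocalBufferPool additionally handles availability listeners, overdraft buffers, and per-channel limits:

```java
import java.util.ArrayDeque;

// Sketch of a LocalBufferPool: segments are drawn lazily from a shared
// global pool, up to a per-pool maximum, and recycled back locally.
public class MiniLocalPool {
    private final ArrayDeque<byte[]> globalPool;                  // stands in for NetworkBufferPool
    private final ArrayDeque<byte[]> available = new ArrayDeque<>(); // availableMemorySegments
    private final int maxSegments;                                // maxNumberOfMemorySegments
    private int requested;                                        // numberOfRequestedMemorySegments

    MiniLocalPool(ArrayDeque<byte[]> globalPool, int maxSegments) {
        this.globalPool = globalPool;
        this.maxSegments = maxSegments;
    }

    // Returns a segment from the local queue, fetching from the global pool
    // if none is available and the per-pool limit is not yet reached.
    byte[] request() {
        if (!available.isEmpty()) {
            return available.poll();
        }
        if (requested < maxSegments && !globalPool.isEmpty()) {
            requested++;
            return globalPool.poll();
        }
        return null; // the real pool would block or take an overdraft buffer here
    }

    void recycle(byte[] segment) { available.add(segment); }

    public static void main(String[] args) {
        ArrayDeque<byte[]> global = new ArrayDeque<>();
        global.add(new byte[32]);
        global.add(new byte[32]);
        MiniLocalPool pool = new MiniLocalPool(global, 1);
        byte[] s = pool.request();         // fetched from the global pool
        System.out.println(pool.request() == null); // limit of 1 reached
        pool.recycle(s);                   // returned to the local queue
        System.out.println(pool.request() == s);    // served locally again
    }
}
```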

 

How the LocalBufferPool is created:

When we analyzed the Task startup process, one step set up the result partitions; the LocalBufferPool is created in that step.

org.apache.flink.runtime.taskmanager.Task#setupPartitionsAndGates

    @VisibleForTesting
    public static void setupPartitionsAndGates(
            ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {

        for (ResultPartitionWriter partition : producedPartitions) {
            // this call creates the LocalBufferPool
            partition.setup();
        }

        // InputGates must be initialized after the partitions, since during InputGate#setup
        // we are requesting partitions
        for (InputGate gate : inputGates) {
            gate.setup();
        }
    }

 

org.apache.flink.runtime.io.network.partition.ResultPartition#setup

    // Registers a buffer pool with this result partition.
    // There is one pool per result partition, shared by all its subpartitions.
    // To match the task registration lifecycle in the TaskExecutor, the pool is
    // registered with the partition *after* it has been constructed.
    @Override
    public void setup() throws IOException {
        checkState(
                this.bufferPool == null,
                "Bug in result partition setup logic: Already registered buffer pool.");

        // create the bufferPool; the factory is
        // org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.createBufferPoolFactory
        this.bufferPool = checkNotNull(bufferPoolFactory.get());
        // subclass-specific setup
        setupInternal();
        // register the result partition
        partitionManager.registerResultPartition(this);
    }

 

org.apache.flink.runtime.io.network.partition.ResultPartitionFactory#createBufferPoolFactory

    // The minimum pool size should be numberOfSubpartitions + 1, for two reasons:
    // 1. A StreamTask can only process input when at least one buffer is available on
    //    the output side, so a minimum exactly equal to the number of subpartitions
    //    could cause stuck problems: every subpartition may hold a partially filled buffer.
    // 2. One extra buffer per output LocalBufferPool avoids a performance regression
    //    when input processing depends on at least one output buffer being available.
    @VisibleForTesting
    SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
            int numberOfSubpartitions, ResultPartitionType type) {
        return () -> {
            Pair<Integer, Integer> pair =
                    // compute the min/max size of the local buffer pool used by this result partition
                    NettyShuffleUtils.getMinMaxNetworkBuffersPerResultPartition(
                            configuredNetworkBuffersPerChannel,
                            floatingNetworkBuffersPerGate,
                            sortShuffleMinParallelism,
                            sortShuffleMinBuffers,
                            numberOfSubpartitions,
                            tieredStorage.isPresent(),
                            tieredStorage
                                    .map(ResultPartitionFactory::getNumTotalGuaranteedBuffers)
                                    .orElse(0),
                            type);

            return bufferPoolFactory.createBufferPool(
                    pair.getLeft(),
                    pair.getRight(),
                    numberOfSubpartitions,
                    maxBuffersPerChannel,
                    // whether this result partition needs overdraft buffers
                    isOverdraftBufferNeeded(type) ? maxOverdraftBuffersPerGate : 0);
        };
    }

 

Three parameters control how many buffers each LocalBufferPool gets when it is created: the number of downstream consumers of the Task, i.e. the number of result subpartitions; the per-subpartition buffer count, set by taskmanager.network.memory.buffers-per-channel; and the extra floating buffers, set by taskmanager.network.memory.floating-buffers-per-gate. The pool size is roughly subpartitions * buffers-per-channel + floating buffers.
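As a worked example of the formula above, assuming the default values buffers-per-channel = 2 and floating-buffers-per-gate = 8:

```java
// Arithmetic from the paragraph above: a result partition with N subpartitions
// gets roughly N * buffers-per-channel + floating-buffers-per-gate buffers.
public class PoolSizeSketch {
    static int maxBuffers(int numSubpartitions, int buffersPerChannel, int floatingPerGate) {
        return numSubpartitions * buffersPerChannel + floatingPerGate;
    }

    public static void main(String[] args) {
        // 100 subpartitions with the default config values 2 and 8.
        System.out.println(maxBuffers(100, 2, 8)); // 208
    }
}
```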

org.apache.flink.runtime.io.network.buffer.NetworkBufferPool#createBufferPool

    @Override
    public BufferPool createBufferPool(
            int numRequiredBuffers,
            int maxUsedBuffers,
            int numSubpartitions,
            int maxBuffersPerChannel,
            int maxOverdraftBuffersPerGate)
            throws IOException {
        return internalCreateBufferPool(
                numRequiredBuffers,
                maxUsedBuffers,
                numSubpartitions,
                maxBuffersPerChannel,
                maxOverdraftBuffersPerGate);
    }

 

org.apache.flink.runtime.io.network.buffer.NetworkBufferPool#internalCreateBufferPool

    private BufferPool internalCreateBufferPool(
            int numRequiredBuffers,
            int maxUsedBuffers,
            int numSubpartitions,
            int maxBuffersPerChannel,
            int maxOverdraftBuffersPerGate)
            throws IOException {

        // It is necessary to use a separate lock from the one used for buffer
        // requests to ensure deadlock freedom for failure cases.
        synchronized (factoryLock) {
            if (isDestroyed) {
                throw new IllegalStateException("Network buffer pool has already been destroyed.");
            }

            // Ensure that the number of required buffers can be satisfied.
            // With dynamic memory management this should become obsolete.
            if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
                throw new IOException(
                        String.format(
                                "Insufficient number of network buffers: "
                                        + "required %d, but only %d available. %s.",
                                numRequiredBuffers,
                                totalNumberOfMemorySegments - numTotalRequiredBuffers,
                                getConfigDescription()));
            }

            this.numTotalRequiredBuffers += numRequiredBuffers;

            // We are good to go, create a new buffer pool and redistribute
            // non-fixed size buffers.
            LocalBufferPool localBufferPool =
                    new LocalBufferPool(
                            this,
                            numRequiredBuffers,
                            maxUsedBuffers,
                            numSubpartitions,
                            maxBuffersPerChannel,
                            maxOverdraftBuffersPerGate);

            allBufferPools.add(localBufferPool);

            if (numRequiredBuffers < maxUsedBuffers) {
                resizableBufferPools.add(localBufferPool);
            }

            // redistribute buffers
            redistributeBuffers();

            return localBufferPool;
        }
    }

 

Now let us return to the requestBufferBuilder(...) call from earlier and look at the concrete steps:

org.apache.flink.runtime.io.network.buffer.LocalBufferPool#requestBufferBuilder

    @Override
    public BufferBuilder requestBufferBuilder() {
        return toBufferBuilder(
                // take a MemorySegment from the LocalBufferPool's available queue
                requestMemorySegment(UNKNOWN_CHANNEL), UNKNOWN_CHANNEL);
    }

 

org.apache.flink.runtime.io.network.buffer.LocalBufferPool#toBufferBuilder

    private BufferBuilder toBufferBuilder(MemorySegment memorySegment, int targetChannel) {
        if (memorySegment == null) {
            return null;
        }

        if (targetChannel == UNKNOWN_CHANNEL) {
            return new BufferBuilder(memorySegment, this);
        } else {
            // create the BufferBuilder
            return new BufferBuilder(memorySegment, subpartitionBufferRecyclers[targetChannel]);
        }
    }

 

Taking a MemorySegment from the LocalBufferPool's available queue:

org.apache.flink.runtime.io.network.buffer.LocalBufferPool#requestMemorySegment

    @Nullable
    private MemorySegment requestMemorySegment(int targetChannel) {
        MemorySegment segment = null;
        synchronized (availableMemorySegments) {
            checkDestroyed();

            if (!availableMemorySegments.isEmpty()) {
                segment = availableMemorySegments.poll();
            } else if (isRequestedSizeReached()) {
                // Only when the buffer request reaches the upper limit(i.e. current pool size),
                // requests an overdraft buffer.
                segment = requestOverdraftMemorySegmentFromGlobal();
            }

            if (segment == null) {
                return null;
            }

            if (targetChannel != UNKNOWN_CHANNEL) {
                if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {
                    unavailableSubpartitionsCount++;
                }
            }

            checkAndUpdateAvailability();
        }
        return segment;
    }

 

References:

Flink源码解析(十七)——Flink Task写数据过程解析

 

