Flink V1.20 Source Code Reading Notes (6) - Data Partitioning Analysis

2025-02-14

Data partitioning concepts

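In a distributed computing engine, data partitioning splits the data produced by the current stage and hands it to downstream Tasks running on different physical nodes.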

 

Flink data partitioning interface hierarchy

Top-level interface: ChannelSelector

org.apache.flink.runtime.io.network.api.writer.ChannelSelector

public interface ChannelSelector<T extends IOReadableWritable> {

    /**
     * Initializes the channel selector with the number of output channels.
     *
     * @param numberOfChannels the total number of output channels which are attached to respective
     *     output gate.
     */
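    // Sets the number of downstream output channels. This method shows that every partitioner in an operator knows how many downstream channels there are.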
    void setup(int numberOfChannels);

    /**
     * Returns the logical channel index, to which the given record should be written. It is illegal
     * to call this method for broadcast channel selectors and this method can remain not
     * implemented in that case (for example by throwing {@link UnsupportedOperationException}).
     *
     * @param record the record to determine the output channels for.
     * @return an integer number which indicates the index of the output channel through which the
     *     record shall be forwarded.
     */
    // Determines the specific downstream channel that each record is sent to.
    int selectChannel(T record);

    /**
     * Returns whether the channel selector always selects all the output channels.
     *
     * @return true if the selector is for broadcast mode.
     */
    // Whether records are broadcast to all downstream channels.
    boolean isBroadcast();
}

 

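Abstract base class: StreamPartitioner

The abstract class StreamPartitioner implements ChannelSelector. It holds one key member variable, numberOfChannels, which is the number of downstream channels. When a concrete partitioner is initialized, this field is set through setup().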
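The following Flink-free sketch makes the setup()/selectChannel() contract concrete. `SimpleChannelSelector`, `SimpleStreamPartitioner` and `SimpleGlobalPartitioner` are hypothetical stand-ins, not Flink classes. Records are plain objects here instead of SerializationDelegate<StreamRecord<T>>. Only one thing is meant to mirror the real classes: numberOfChannels is stored once in setup() and then read on every call to selectChannel().

```java
import java.util.Arrays;

// HYPOTHETICAL stand-in for Flink's ChannelSelector (records are plain objects here).
interface SimpleChannelSelector<T> {
    void setup(int numberOfChannels);

    int selectChannel(T record);

    boolean isBroadcast();
}

// HYPOTHETICAL stand-in for StreamPartitioner: remembers the downstream channel count.
abstract class SimpleStreamPartitioner<T> implements SimpleChannelSelector<T> {
    protected int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        // Called once at initialization, before any record is routed.
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }
}

// Example subclass in the spirit of GlobalPartitioner: every record goes to channel 0.
class SimpleGlobalPartitioner<T> extends SimpleStreamPartitioner<T> {
    @Override
    public int selectChannel(T record) {
        return 0;
    }
}

class StreamPartitionerSketch {
    public static void main(String[] args) {
        SimpleStreamPartitioner<String> partitioner = new SimpleGlobalPartitioner<>();
        partitioner.setup(4);
        int[] counts = new int[partitioner.numberOfChannels];
        for (String record : new String[] {"a", "b", "c"}) {
            counts[partitioner.selectChannel(record)]++;
        }
        System.out.println(Arrays.toString(counts)); // [3, 0, 0, 0]
    }
}
```

Each concrete partitioner only supplies the routing rule. The bookkeeping for the channel count lives in the shared base class.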

 

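Flink partitioner implementations:

  • BroadcastPartitioner: broadcasts every record to all downstream partitions. Invoked via dataStream.broadcast();
  • CustomPartitionerWrapper: partition selection logic defined by the application developer. Invoked via dataStream.partitionCustom(partitioner,${key});
  • ForwardPartitioner: forwards data between upstream and downstream operators in the same OperatorChain, so records are in fact handed directly to the downstream operator. Upstream and downstream parallelism must be equal. Invoked via dataStream.forward();
  • GlobalPartitioner: sends all data to the first instance of the downstream operator only. Invoked via dataStream.global();
  • KeyGroupStreamPartitioner: used on KeyedStream (created by keyBy()). A later post will analyze how keys are distributed in detail.
  • RebalancePartitioner: distributes records round-robin across all downstream channels. Invoked via dataStream.rebalance();
  • RescalePartitioner: gives each upstream instance an even share of the downstream partitions, then distributes records round-robin within that share. Invoked via dataStream.rescale();
  • ShufflePartitioner: sends each record to a randomly chosen channel. Invoked via dataStream.shuffle();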
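RescalePartitioner does not compute the even split of downstream partitions itself. Upstream and downstream instances are connected point-wise when the execution graph is built. selectChannel() then round-robins over only the channels that one upstream instance can reach.

The sketch below illustrates this under an explicit assumption. `rescaleTargets` is a hypothetical helper that uses a plain integer-division split; it is not Flink's exact assignment code.

```java
import java.util.ArrayList;
import java.util.List;

class RescaleSketch {
    // HYPOTHETICAL even split: the downstream subtasks that upstream subtask
    // `upstreamIndex` is connected to. Not Flink's exact formula.
    static List<Integer> rescaleTargets(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        int start = upstreamIndex * downstreamParallelism / upstreamParallelism;
        int end = (upstreamIndex + 1) * downstreamParallelism / upstreamParallelism;
        // With fewer downstream than upstream subtasks, several upstream subtasks share one target.
        end = Math.max(end, start + 1);
        List<Integer> targets = new ArrayList<>();
        for (int channel = start; channel < end; channel++) {
            targets.add(channel);
        }
        return targets;
    }

    // Round-robin over the connected channels (local indices 0..numberOfChannels-1).
    private int nextChannelToSendTo = -1;

    int selectChannel(int numberOfChannels) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    public static void main(String[] args) {
        // 2 upstream subtasks, 4 downstream subtasks
        for (int up = 0; up < 2; up++) {
            System.out.println("upstream " + up + " -> " + rescaleTargets(up, 2, 4));
        }
        // upstream 0 -> [0, 1]
        // upstream 1 -> [2, 3]
    }
}
```

Compared with rebalance(), each upstream instance talks to fewer downstream instances. This is why rescale() is typically described as cheaper in network connections.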

 

Partitioner implementation analysis

CustomPartitionerWrapper:

Calling partitionCustom() on a DataStream requires two arguments: a Partitioner instance and a KeySelector instance.

org.apache.flink.streaming.api.datastream.DataStream#partitionCustom

    /**
     * Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
     * method takes the key selector to get the key to partition on, and a partitioner that accepts
     * the key type.
     *
     * <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
     * of fields.
     *
     * @param partitioner The partitioner to assign partitions to keys.
     * @param keySelector The KeySelector with which the DataStream is partitioned.
     * @return The partitioned DataStream.
     * @see KeySelector
     */
    public <K> DataStream<T> partitionCustom(
            Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        return setConnectionType(
                new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
    }

 

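A user-defined Partitioner only needs to implement the partition() method. Given the key extracted by the KeySelector and the number of channels, it computes the target channel index. CustomPartitionerWrapper.selectChannel() connects the two: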

org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper#selectChannel

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }

        return partitioner.partition(key, numberOfChannels);
    }
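To experiment with this flow without a Flink dependency, the sketch below mirrors the wrapper. `SimplePartitioner` and `SimpleKeySelector` are hypothetical interfaces shaped like Flink's Partitioner#partition(K, int) and KeySelector#getKey(IN). `SimpleCustomPartitionerWrapper` copies the logic of selectChannel() shown above. The routing rule (hash of a word's first character) is only an illustrative example of user logic.

```java
// HYPOTHETICAL mirror of Flink's Partitioner<K>#partition(K key, int numPartitions).
interface SimplePartitioner<K> {
    int partition(K key, int numPartitions);
}

// HYPOTHETICAL mirror of Flink's KeySelector<IN, KEY>#getKey(IN value).
interface SimpleKeySelector<IN, KEY> {
    KEY getKey(IN value) throws Exception;
}

// Mirrors CustomPartitionerWrapper#selectChannel: extract the key, then delegate to the user logic.
class SimpleCustomPartitionerWrapper<K, T> {
    private final SimplePartitioner<K> partitioner;
    private final SimpleKeySelector<T, K> keySelector;
    private int numberOfChannels;

    SimpleCustomPartitionerWrapper(SimplePartitioner<K> partitioner, SimpleKeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    int selectChannel(T value) {
        K key;
        try {
            key = keySelector.getKey(value);
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + value, e);
        }
        return partitioner.partition(key, numberOfChannels);
    }
}

class CustomPartitionerSketch {
    public static void main(String[] args) {
        // Example user logic: route words by the hash of their first character.
        // floorMod keeps the result non-negative even for negative hash codes.
        SimplePartitioner<Character> byFirstChar = (key, n) -> Math.floorMod(key.hashCode(), n);
        SimpleKeySelector<String, Character> firstChar = word -> word.charAt(0);

        SimpleCustomPartitionerWrapper<Character, String> wrapper =
                new SimpleCustomPartitionerWrapper<>(byFirstChar, firstChar);
        wrapper.setup(3);
        for (String word : new String[] {"apple", "banana", "cherry"}) {
            System.out.println(word + " -> channel " + wrapper.selectChannel(word));
        }
        // apple -> channel 1
        // banana -> channel 2
        // cherry -> channel 0
    }
}
```

A user partitioner must return a value in [0, numberOfChannels). Using Math.floorMod on a hash, rather than `%`, is one simple way to guarantee that for keys with negative hash codes.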

 

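partitionCustom() passes the wrapper to setConnectionType(). That method wraps the current transformation and the partitioner into a PartitionTransformation. A PartitionTransformation is a virtual transformation: it adds no operator of its own and only describes how data flows from the upstream operator to the downstream one. The built-in methods such as rebalance() and shuffle() go through the same setConnectionType() call.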

org.apache.flink.streaming.api.datastream.DataStream#setConnectionType

    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        return new DataStream<>(
                this.getExecutionEnvironment(),
                new PartitionTransformation<>(this.getTransformation(), partitioner));
    }
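To show the "wrap, don't add an operator" idea in isolation, the sketch below uses heavily simplified, hypothetical classes. `MiniTransformation`, `MiniPartitionTransformation` and `MiniDataStream` are not Flink types, and the partitioner is a plain string instead of a StreamPartitioner. Only the shape of setConnectionType() is meant to match.

```java
// HYPOTHETICAL, heavily simplified stand-ins for Transformation / PartitionTransformation / DataStream.
class MiniTransformation {
    final String name;

    MiniTransformation(String name) {
        this.name = name;
    }
}

class MiniPartitionTransformation extends MiniTransformation {
    final MiniTransformation input;
    final String partitioner;

    MiniPartitionTransformation(MiniTransformation input, String partitioner) {
        super("Partition(" + partitioner + ")");
        this.input = input;
        this.partitioner = partitioner;
    }
}

class MiniDataStream {
    final MiniTransformation transformation;

    MiniDataStream(MiniTransformation transformation) {
        this.transformation = transformation;
    }

    // Same shape as setConnectionType(): no new operator, just wrap the current transformation.
    MiniDataStream setConnectionType(String partitioner) {
        return new MiniDataStream(new MiniPartitionTransformation(transformation, partitioner));
    }

    MiniDataStream rebalance() {
        return setConnectionType("REBALANCE");
    }

    MiniDataStream shuffle() {
        return setConnectionType("SHUFFLE");
    }

    public static void main(String[] args) {
        MiniDataStream source = new MiniDataStream(new MiniTransformation("Source"));
        MiniDataStream rebalanced = source.rebalance();
        MiniPartitionTransformation t = (MiniPartitionTransformation) rebalanced.transformation;
        System.out.println(t.name + " <- " + t.input.name); // Partition(REBALANCE) <- Source
    }
}
```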

 

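RebalancePartitioner:

In setup(), RebalancePartitioner randomly picks a starting downstream channel. For every incoming record it then advances the target channel by one, modulo numberOfChannels, which implements round-robin distribution.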

org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner#setup

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);

        // Randomly pick the initial downstream channel
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

 

org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner#selectChannel

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // For each record, move the target channel forward by one, wrapping around at numberOfChannels
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }
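The two methods above can be replayed outside Flink to check the round-robin property. `RebalanceSketch` is a hypothetical class that copies the setup()/selectChannel() logic. It drops the record parameter, which RebalancePartitioner does not use. Whatever the random starting channel is, any run of records whose count is a multiple of numberOfChannels is split exactly evenly.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Flink-free copy of the RebalancePartitioner logic shown above (record parameter omitted).
class RebalanceSketch {
    private int numberOfChannels;
    private int nextChannelToSendTo;

    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        // Random starting channel, so different upstream instances are unlikely to start on the same one.
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    int selectChannel() {
        // Advance by one and wrap around: pure round-robin.
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    public static void main(String[] args) {
        RebalanceSketch partitioner = new RebalanceSketch();
        partitioner.setup(4);
        int[] counts = new int[4];
        for (int i = 0; i < 100; i++) {
            counts[partitioner.selectChannel()]++;
        }
        System.out.println(Arrays.toString(counts)); // [25, 25, 25, 25]
    }
}
```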

 

ShufflePartitioner:

For each record, ShufflePartitioner draws a random integer in the range [0, numberOfChannels) and uses it as that record's downstream channel index.

org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner#selectChannel

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return random.nextInt(numberOfChannels);
    }
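For contrast with round-robin, the hypothetical `ShuffleSketch` below replays the same `random.nextInt(numberOfChannels)` call. Unlike the Flink code above, which uses `new Random()`, it uses a fixed seed so that runs are reproducible. Each channel index falls in [0, numberOfChannels), but the per-channel counts are only approximately equal rather than exact.

```java
import java.util.Arrays;
import java.util.Random;

// Flink-free copy of ShufflePartitioner#selectChannel (record parameter omitted).
class ShuffleSketch {
    private final Random random;
    private int numberOfChannels;

    // Fixed seed for reproducible experiments; the Flink code shown above uses new Random().
    ShuffleSketch(long seed) {
        this.random = new Random(seed);
    }

    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // Returns a channel index in [0, numberOfChannels).
    int selectChannel() {
        return random.nextInt(numberOfChannels);
    }

    public static void main(String[] args) {
        ShuffleSketch partitioner = new ShuffleSketch(42L);
        partitioner.setup(4);
        int[] counts = new int[4];
        for (int i = 0; i < 10_000; i++) {
            counts[partitioner.selectChannel()]++;
        }
        // Counts are only roughly equal, unlike the exact split of round-robin.
        System.out.println(Arrays.toString(counts));
    }
}
```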

 
