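Data Partitioning Concepts
For a distributed compute engine, the main purpose of data partitioning is to split the data produced by the current stage and hand the pieces to downstream Tasks running on different physical nodes.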
Flink Data Partitioning Interface Hierarchy
Top-level interface: ChannelSelector
org.apache.flink.runtime.io.network.api.writer.ChannelSelector
public interface ChannelSelector<T extends IOReadableWritable> {
    /**
     * Initializes the channel selector with the number of output channels.
     *
     * @param numberOfChannels the total number of output channels which are attached to respective
     *     output gate.
     */
    // Sets the number of downstream channels. This is how every partitioner of an operator knows how many downstream channels exist.
    void setup(int numberOfChannels);
    /**
     * Returns the logical channel index, to which the given record should be written. It is illegal
     * to call this method for broadcast channel selectors and this method can remain not
     * implemented in that case (for example by throwing {@link UnsupportedOperationException}).
     *
     * @param record the record to determine the output channels for.
     * @return an integer number which indicates the index of the output channel through which the
     *     record shall be forwarded.
     */
    // Returns the index of the downstream channel that the given record is sent to.
    int selectChannel(T record);
    /**
     * Returns whether the channel selector always selects all the output channels.
     *
     * @return true if the selector is for broadcast mode.
     */
    // Whether every record is broadcast to all downstream channels.
    boolean isBroadcast();
}
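To make the contract concrete, here is a minimal standalone sketch of how a record writer might consult a channel selector: it calls setup() once with the channel count, then either sends a record to every channel (broadcast mode) or asks selectChannel() for a single index. SketchChannelSelector, LengthSelector, BroadcastSelector and SketchRecordWriter are hypothetical names; the real interface is bounded by IOReadableWritable and Flink's actual record writers do much more, so treat this only as an illustration of the three methods.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified mirror of Flink's ChannelSelector (the IOReadableWritable bound is dropped).
interface SketchChannelSelector<T> {
    void setup(int numberOfChannels);
    int selectChannel(T record);
    boolean isBroadcast();
}

// Hypothetical point-to-point selector: routes a string by its length (illustration only).
final class LengthSelector implements SketchChannelSelector<String> {
    private int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public int selectChannel(String record) {
        return record.length() % numberOfChannels;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }
}

// Hypothetical broadcast selector: as the Javadoc allows, selectChannel is left unsupported.
final class BroadcastSelector<T> implements SketchChannelSelector<T> {
    @Override
    public void setup(int numberOfChannels) {
        // Nothing to remember: a broadcast goes to every channel anyway.
    }

    @Override
    public int selectChannel(T record) {
        throw new UnsupportedOperationException("Broadcast selectors do not pick a single channel");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }
}

// Hypothetical writer: calls setup() once, then asks the selector where each record goes.
final class SketchRecordWriter<T> {
    private final SketchChannelSelector<T> selector;
    private final int numberOfChannels;

    SketchRecordWriter(SketchChannelSelector<T> selector, int numberOfChannels) {
        this.selector = selector;
        this.numberOfChannels = numberOfChannels;
        selector.setup(numberOfChannels);
    }

    List<Integer> targetChannels(T record) {
        List<Integer> targets = new ArrayList<>();
        if (selector.isBroadcast()) {
            // Broadcast mode: every output channel receives the record.
            for (int i = 0; i < numberOfChannels; i++) {
                targets.add(i);
            }
        } else {
            // Point-to-point mode: exactly one channel per record.
            targets.add(selector.selectChannel(record));
        }
        return targets;
    }
}

final class ChannelSelectorDemo {
    public static void main(String[] args) {
        System.out.println(new SketchRecordWriter<>(new LengthSelector(), 3).targetChannels("flink")); // prints [2]
        System.out.println(new SketchRecordWriter<>(new BroadcastSelector<String>(), 3).targetChannels("x")); // prints [0, 1, 2]
    }
}
```

With 3 channels, the length-based writer sends "flink" (length 5) to channel 2, while the broadcast writer returns all three channels for any record and never calls selectChannel().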
Abstract implementation: StreamPartitioner
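The abstract class StreamPartitioner holds one key member variable, numberOfChannels, which represents the number of downstream channels. Its value must be set (through setup()) when a concrete partitioner is initialized.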
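A minimal JDK-only sketch of this arrangement: SketchStreamPartitioner is a hypothetical, stripped-down stand-in for StreamPartitioner that keeps only numberOfChannels, setup(), and (in this sketch) a default isBroadcast() of false. SketchGlobalPartitioner and SketchModuloPartitioner are hypothetical subclasses, the first in the spirit of GlobalPartitioner, the second showing a subclass that actually reads numberOfChannels.

```java
// Hypothetical, stripped-down mirror of Flink's StreamPartitioner: only numberOfChannels is kept.
abstract class SketchStreamPartitioner<T> {
    // Number of downstream channels; stays 0 until setup() is called.
    protected int numberOfChannels;

    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    public abstract int selectChannel(T record);

    // Point-to-point by default in this sketch; a broadcast partitioner would override this.
    public boolean isBroadcast() {
        return false;
    }
}

// Hypothetical subclass in the spirit of GlobalPartitioner: every record goes to channel 0.
final class SketchGlobalPartitioner<T> extends SketchStreamPartitioner<T> {
    @Override
    public int selectChannel(T record) {
        return 0;
    }
}

// Hypothetical subclass that relies on numberOfChannels: routes integers by modulo.
final class SketchModuloPartitioner extends SketchStreamPartitioner<Integer> {
    @Override
    public int selectChannel(Integer record) {
        // floorMod keeps the result in [0, numberOfChannels) even for negative input.
        return Math.floorMod(record.intValue(), numberOfChannels);
    }
}

final class StreamPartitionerDemo {
    public static void main(String[] args) {
        SketchModuloPartitioner modulo = new SketchModuloPartitioner();
        modulo.setup(4);
        System.out.println(modulo.selectChannel(10)); // prints 2
        System.out.println(modulo.selectChannel(-1)); // prints 3
    }
}
```

Because numberOfChannels is 0 until setup() runs, calling SketchModuloPartitioner#selectChannel before setup() would throw an ArithmeticException, which illustrates why the channel count has to be supplied at initialization.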
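Flink's partitioner implementations:
- BroadcastPartitioner: broadcasts every record to all downstream partitions. Invoked via dataStream.broadcast();
- CustomPartitionerWrapper: lets the application developer supply custom partition selection logic. Invoked via dataStream.partitionCustom(partitioner, keySelector);
- ForwardPartitioner: sends each record to the downstream subtask with the same index, so upstream and downstream parallelism must be equal. When the two operators are chained into the same OperatorChain, records are handed directly to the downstream operator without going through the network. Invoked via dataStream.forward();
- GlobalPartitioner: sends all records only to the first instance (subtask 0) of the downstream operator. Invoked via dataStream.global();
- KeyGroupStreamPartitioner: used on a KeyedStream (the result of keyBy()); a later post will analyze in detail how keys are distributed.
- RebalancePartitioner: distributes records round-robin across all downstream channels. Invoked via dataStream.rebalance();
- RescalePartitioner: each upstream subtask is connected only to a subset of the downstream subtasks (the downstream range is divided evenly according to the two parallelisms), and records are distributed round-robin within that subset. Invoked via dataStream.rescale();
- ShufflePartitioner: assigns every record to a randomly chosen downstream channel. Invoked via dataStream.shuffle();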
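Of the strategies above, rescale is the least obvious, so here is a standalone sketch of it. It assumes an even integer split of subtasks between the upstream and downstream parallelism; that split rule and the names RescaleSketch, downstreamRange and assign are assumptions for illustration, not Flink's exact wiring code. Each upstream subtask is connected to a contiguous range of downstream subtasks and round-robins records only inside that range.

```java
import java.util.Arrays;

// Hypothetical sketch of rescale-style wiring plus round-robin inside the connected range.
// ASSUMPTION: subtasks are split evenly with integer division; Flink's real wiring code may differ in detail.
final class RescaleSketch {
    private RescaleSketch() {}

    // Returns {start, end} (end exclusive): the downstream subtasks that upstream subtask `upstreamIndex` sends to.
    static int[] downstreamRange(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism <= downstreamParallelism) {
            // Fewer (or equal) senders: each sender owns a contiguous block of receivers.
            int start = upstreamIndex * downstreamParallelism / upstreamParallelism;
            int end = (upstreamIndex + 1) * downstreamParallelism / upstreamParallelism;
            return new int[] {start, end};
        }
        // More senders than receivers: each receiver owns a block of senders,
        // so this sender is connected to exactly one receiver.
        for (int receiver = 0; receiver < downstreamParallelism; receiver++) {
            int start = receiver * upstreamParallelism / downstreamParallelism;
            int end = (receiver + 1) * upstreamParallelism / downstreamParallelism;
            if (upstreamIndex >= start && upstreamIndex < end) {
                return new int[] {receiver, receiver + 1};
            }
        }
        throw new IllegalArgumentException("upstreamIndex out of range: " + upstreamIndex);
    }

    // Channels chosen for `records` consecutive records, round-robin inside the connected range.
    static int[] assign(int upstreamIndex, int upstreamParallelism, int downstreamParallelism, int records) {
        int[] range = downstreamRange(upstreamIndex, upstreamParallelism, downstreamParallelism);
        int width = range[1] - range[0];
        int[] targets = new int[records];
        for (int r = 0; r < records; r++) {
            targets[r] = range[0] + (r % width);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(assign(1, 2, 4, 4))); // prints [2, 3, 2, 3]
        System.out.println(Arrays.toString(downstreamRange(2, 3, 2))); // prints [1, 2]
    }
}
```

With upstream parallelism 2 and downstream parallelism 4, upstream subtask 1 is connected to downstream subtasks 2 and 3 and alternates between them; with 3 upstream and 2 downstream subtasks, upstream subtasks 1 and 2 share downstream subtask 1. Rebalance, by contrast, lets every upstream subtask talk to every downstream subtask.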
Partitioner Implementation Analysis
CustomPartitionerWrapper analysis:
Calling partitionCustom() on a DataStream requires a Partitioner instance and a KeySelector instance as arguments.
org.apache.flink.streaming.api.datastream.DataStream#partitionCustom
/**
 * Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
 * method takes the key selector to get the key to partition on, and a partitioner that accepts
 * the key type.
 *
 * <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
 * of fields.
 *
 * @param partitioner The partitioner to assign partitions to keys.
 * @param keySelector The KeySelector with which the DataStream is partitioned.
 * @return The partitioned DataStream.
 * @see KeySelector
 */
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
    return setConnectionType(
            new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
}
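A Partitioner only needs to implement partition(): given the key obtained from the KeySelector and the number of channels, it computes a custom channel index. CustomPartitionerWrapper performs both steps in selectChannel():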
org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper#selectChannel
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
    }
    return partitioner.partition(key, numberOfChannels);
}
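The two steps above (extract the key with the KeySelector, then let the Partitioner map it to a channel) can be reproduced without Flink. In this sketch, SketchPartitioner and SketchKeySelector are hypothetical stand-ins that mirror the method shapes of Flink's Partitioner#partition(key, numPartitions) and KeySelector#getKey(value); VipFirstPartitioner and the "key,value" string record format are invented for illustration.

```java
// Hypothetical mirror of Flink's Partitioner<K>: same method shape, partition(key, numPartitions).
interface SketchPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical mirror of Flink's KeySelector<IN, KEY>.
interface SketchKeySelector<T, K> {
    K getKey(T value) throws Exception;
}

// Simplified re-creation of the wrapper logic shown above: extract the key, then delegate.
final class SketchCustomPartitionerWrapper<K, T> {
    private final SketchPartitioner<K> partitioner;
    private final SketchKeySelector<T, K> keySelector;
    private int numberOfChannels;

    SketchCustomPartitionerWrapper(SketchPartitioner<K> partitioner, SketchKeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    int selectChannel(T value) {
        K key;
        try {
            key = keySelector.getKey(value);
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + value, e);
        }
        return partitioner.partition(key, numberOfChannels);
    }
}

// Hypothetical user partitioner: keys starting with "vip" go to channel 0, the rest are hashed over the others.
final class VipFirstPartitioner implements SketchPartitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        if (key.startsWith("vip") || numPartitions == 1) {
            return 0;
        }
        // floorMod avoids a negative index when hashCode() is negative.
        return 1 + Math.floorMod(key.hashCode(), numPartitions - 1);
    }
}

final class CustomPartitionerDemo {
    public static void main(String[] args) {
        // Records are hypothetical "key,value" strings; the key is the part before the comma.
        SketchCustomPartitionerWrapper<String, String> wrapper =
                new SketchCustomPartitionerWrapper<String, String>(
                        new VipFirstPartitioner(), value -> value.substring(0, value.indexOf(',')));
        wrapper.setup(4);
        System.out.println(wrapper.selectChannel("vip-alice,42")); // prints 0
        System.out.println(wrapper.selectChannel("bob,7")); // some channel in 1..3
    }
}
```

Math.floorMod is used because String.hashCode() can be negative; a plain % could then produce a negative number, which is not a valid channel index.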
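setConnectionType() then wraps the CustomPartitionerWrapper (built from the user-defined partitioner) in a PartitionTransformation. This is a virtual transformation: it creates no physical operator and only records how data is partitioned between the upstream and downstream operators.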
org.apache.flink.streaming.api.datastream.DataStream#setConnectionType
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
    return new DataStream<>(
            this.getExecutionEnvironment(),
            new PartitionTransformation<>(this.getTransformation(), partitioner));
}
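RebalancePartitioner analysis:
RebalancePartitioner picks a random initial downstream channel in setup(); for each incoming record it advances the target channel by 1, modulo numberOfChannels, which implements round-robin.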
org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner#setup
@Override
public void setup(int numberOfChannels) {
    super.setup(numberOfChannels);
    // Randomly pick the initial downstream channel
    nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
}
org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner#selectChannel
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    // For every record, advance the target channel by 1, wrapping around at numberOfChannels
    nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
    return nextChannelToSendTo;
}
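The same round-robin logic can be exercised outside Flink. This sketch copies the setup()/selectChannel() logic above into a hypothetical SketchRebalancePartitioner; injecting a seeded java.util.Random instead of ThreadLocalRandom is an assumption made only so that runs are reproducible.

```java
import java.util.Arrays;
import java.util.Random;

// Simplified, standalone copy of the RebalancePartitioner logic shown above.
final class SketchRebalancePartitioner {
    private final Random random;
    private int numberOfChannels;
    private int nextChannelToSendTo;

    // ASSUMPTION: the Random is injected so runs are reproducible; the Flink code uses ThreadLocalRandom.
    SketchRebalancePartitioner(Random random) {
        this.random = random;
    }

    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        // Random starting channel, as in setup() above.
        nextChannelToSendTo = random.nextInt(numberOfChannels);
    }

    int selectChannel(Object record) {
        // Advance by one and wrap around: round-robin.
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    // Counts how many of `records` records land on each channel.
    static int[] distribution(int numberOfChannels, int records, long seed) {
        SketchRebalancePartitioner partitioner = new SketchRebalancePartitioner(new Random(seed));
        partitioner.setup(numberOfChannels);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[partitioner.selectChannel("record-" + i)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribution(4, 400, 7L))); // prints [100, 100, 100, 100]
    }
}
```

Whatever the random starting point, any multiple of numberOfChannels records is spread exactly evenly, here 100 records per channel for 400 records over 4 channels. The random start presumably keeps parallel upstream subtasks from all beginning on the same downstream channel.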
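ShufflePartitioner analysis:
For each record, ShufflePartitioner draws a random integer in the range [0, numberOfChannels) (Random.nextInt excludes the upper bound) and uses it as the record's downstream channel.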
org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner#selectChannel
private Random random = new Random();

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return random.nextInt(numberOfChannels);
}
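For comparison with round-robin, here is the shuffle logic in isolation. SketchShufflePartitioner is a hypothetical name, and the fixed seed is an assumption for reproducibility; the Flink code above uses an unseeded new Random().

```java
import java.util.Arrays;
import java.util.Random;

// Simplified, standalone copy of the ShufflePartitioner logic shown above.
final class SketchShufflePartitioner {
    private final Random random;
    private int numberOfChannels;

    // ASSUMPTION: a fixed seed for reproducibility; the Flink code uses an unseeded new Random().
    SketchShufflePartitioner(long seed) {
        this.random = new Random(seed);
    }

    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    int selectChannel(Object record) {
        // Uniform in [0, numberOfChannels): nextInt's bound is exclusive.
        return random.nextInt(numberOfChannels);
    }

    // Counts how many of `records` records land on each channel.
    static int[] distribution(int numberOfChannels, int records, long seed) {
        SketchShufflePartitioner partitioner = new SketchShufflePartitioner(seed);
        partitioner.setup(numberOfChannels);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[partitioner.selectChannel("record-" + i)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Roughly 2500 per channel, but generally not exactly equal.
        System.out.println(Arrays.toString(distribution(4, 10_000, 42L)));
    }
}
```

Unlike round-robin, random assignment balances channels only statistically: each count ends up close to, but generally not exactly, records / numberOfChannels.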