This post analyzes the Task write-data path. The process involves components such as RecordWriterOutput, RecordWriter, ChannelSelector, ResultPartition, ResultSubpartition, and LocalBufferPool, most of which have been touched on in earlier posts. Here we focus on how data flows between these components.
RecordWriterOutput
RecordWriterOutput structure. The Output interface faces the API layer and is responsible for passing data downstream.
RecordWriterOutput holds two important members: recordWriter and serializationDelegate. The recordWriter, which carries the partitioner information, operates on the stream's Record elements and forwards each element received by the Output to one downstream subpartition. The serializationDelegate is responsible for serializing the Record elements.
Take the flatMap(...) API as an example. When a Flink user implements a FlatMapFunction UDF, the last step calls collector.collect(...) to send data downstream. This collector is backed by the Output interface; as the previous post explained, Output extends Collector, and during OperatorChain construction every operator gets an Output member instance that receives the data the operator produces and sends it downstream. When the edge between two operators cannot be chained, the Output member's concrete type is usually RecordWriterOutput. It follows that the StreamFlatMap operator receives its Output instance during operator-chain construction and wraps it into its collector member when open() is called. While the Flink job runs, processElement(...) is invoked to process and collect data.
org.apache.flink.streaming.api.operators.StreamFlatMap
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
}
org.apache.flink.streaming.api.operators.TimestampedCollector#collect
@Override
public void collect(T record) {
    output.collect(reuse.replace(record));
}
org.apache.flink.streaming.runtime.io.RecordWriterOutput#collect
@Override
public void collect(StreamRecord<OUT> record) {
    // collect the record; returns true when it was emitted to the main output
    if (collectAndCheckIfChained(record)) {
        numRecordsOut.inc();
    }
}
org.apache.flink.streaming.runtime.io.RecordWriterOutput#collectAndCheckIfChained
@Override
public boolean collectAndCheckIfChained(StreamRecord<OUT> record) {
    if (this.outputTag != null) {
        // we are not responsible for emitting to the main output.
        return false;
    }
    // push the record to the record writer
    pushToRecordWriter(record);
    return true;
}
RecordWriterOutput assigns the received StreamRecord element to its serializationDelegate member, then uses the recordWriter to dispatch the serializationDelegate (which carries the serialization logic) to one of the subpartitions.
org.apache.flink.streaming.runtime.io.RecordWriterOutput#pushToRecordWriter
private <X> void pushToRecordWriter(StreamRecord<X> record) {
    serializationDelegate.setInstance(record);

    try {
        // let the recordWriter dispatch the serializationDelegate
        // (which carries the serialization logic) to one subpartition
        recordWriter.emit(serializationDelegate);
    } catch (IOException e) {
        throw new UncheckedIOException(e.getMessage(), e);
    }
}
That is the high-level view of how RecordWriterOutput forwards data downstream; next we look at the RecordWriter implementation.
RecordWriter
RecordWriter structure. Its main members are targetPartition, numberOfSubpartitions, serializer, and outputFlusher:
- ResultPartitionWriter targetPartition: the result partition. Every Task instance of an operator owns one result partition, so the number of result partitions equals the operator's parallelism.
- int numberOfSubpartitions: the number of result subpartitions, i.e. how many downstream Task instances consume this Task's output.
- DataOutputSerializer serializer: a data output view backed by a byte array; it serializes StreamRecord elements sequentially into the array and finally wraps the result as a ByteBuffer.
- OutputFlusher outputFlusher: a periodic flusher implemented as a separate thread. When the upstream produces data slowly, this thread sends the accumulated buffer data downstream at a fixed interval so downstream operators do not wait too long.
- ChannelSelector<T> channelSelector: the RecordWriter subclass ChannelSelectorRecordWriter holds a partitioner that decides which result subpartition a StreamRecord element is dispatched to.
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#emit
@Override
public void emit(T record) throws IOException {
    emit(record, channelSelector.selectChannel(record));
}
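The selectChannel(...) contract can be illustrated with a minimal stand-alone sketch. The names below (SimpleChannelSelector, RoundRobinSelector, KeyHashSelector) are made up for illustration; they mirror the spirit of rebalance- and keyBy-style partitioning rather than Flink's actual implementations.

```java
import java.util.function.Function;

// Minimal stand-in for the ChannelSelector contract: given a record,
// return the index of the target subpartition (channel).
interface SimpleChannelSelector<T> {
    int selectChannel(T record);
}

// Round-robin selection, similar in spirit to rebalance partitioning.
class RoundRobinSelector<T> implements SimpleChannelSelector<T> {
    private final int numberOfChannels;
    private int next = -1;

    RoundRobinSelector(int numberOfChannels) { this.numberOfChannels = numberOfChannels; }

    public int selectChannel(T record) {
        next = (next + 1) % numberOfChannels;
        return next;
    }
}

// Key-hash selection, similar in spirit to keyBy partitioning.
class KeyHashSelector<T, K> implements SimpleChannelSelector<T> {
    private final Function<T, K> keyExtractor;
    private final int numberOfChannels;

    KeyHashSelector(Function<T, K> keyExtractor, int numberOfChannels) {
        this.keyExtractor = keyExtractor;
        this.numberOfChannels = numberOfChannels;
    }

    public int selectChannel(T record) {
        // Math.floorMod avoids negative indices for negative hash codes.
        return Math.floorMod(keyExtractor.apply(record).hashCode(), numberOfChannels);
    }
}

public class SelectorDemo {
    public static void main(String[] args) {
        SimpleChannelSelector<String> rr = new RoundRobinSelector<>(3);
        System.out.println(rr.selectChannel("a")); // 0
        System.out.println(rr.selectChannel("b")); // 1
        System.out.println(rr.selectChannel("c")); // 2
        System.out.println(rr.selectChannel("d")); // wraps back to 0

        SimpleChannelSelector<String> hash = new KeyHashSelector<String, String>(s -> s, 3);
        // the same key always maps to the same channel
        System.out.println(hash.selectChannel("key") == hash.selectChannel("key"));
    }
}
```

The key property the real selectors share with this sketch is determinism per record: keyed selection must route equal keys to the same subpartition, while round-robin only balances load.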
RecordWriter data output. When a RecordWriterOutput instance calls RecordWriter.emit, control passes to emit(T record, int targetSubpartition). As the code below shows, the RecordWriter first serializes the StreamRecord element into a ByteBuffer and then calls targetPartition.emitRecord(...) to write the ByteBuffer data into the concrete subpartition.
org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit
public void emit(T record, int targetSubpartition) throws IOException {
    checkErroneous();

    // serialize the record into a ByteBuffer, then write it to the target subpartition
    targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);

    if (flushAlways) {
        targetPartition.flush(targetSubpartition);
    }
}
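When flushAlways is false, flushing is left to the OutputFlusher thread mentioned earlier. Its behavior can be sketched as a stand-alone daemon thread (illustrative names, not Flink's actual class): sleep for the flush interval, then trigger a flush callback, so partially filled buffers still reach the downstream in a timely manner.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FlusherDemo {
    // A minimal sketch of the OutputFlusher idea: a daemon thread that
    // invokes a flush callback at a fixed interval until terminated.
    static class OutputFlusher extends Thread {
        private final long intervalMs;
        private final Runnable flushAll;
        private volatile boolean running = true;

        OutputFlusher(long intervalMs, Runnable flushAll) {
            this.intervalMs = intervalMs;
            this.flushAll = flushAll;
            setDaemon(true);
        }

        void terminateFlusher() {
            running = false;
            interrupt(); // wake the thread up so it can exit promptly
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException e) {
                    // interrupted for shutdown; loop condition handles exit
                }
                if (running) {
                    flushAll.run();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger flushCount = new AtomicInteger();
        OutputFlusher flusher = new OutputFlusher(10, flushCount::incrementAndGet);
        flusher.start();
        Thread.sleep(120); // let a few intervals elapse
        flusher.terminateFlusher();
        flusher.join();
        System.out.println(flushCount.get() > 0); // at least one periodic flush ran
    }
}
```

This is the trade-off the interval (execution.buffer-timeout in Flink) controls: a small interval lowers latency, a large one lets buffers fill up and improves throughput.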
org.apache.flink.runtime.io.network.api.writer.RecordWriter#serializeRecord
@VisibleForTesting
public static ByteBuffer serializeRecord(
        DataOutputSerializer serializer, IOReadableWritable record) throws IOException {
    // the initial capacity should be no less than 4 bytes
    serializer.setPositionUnsafe(4);

    // write data
    record.write(serializer);

    // write length
    serializer.writeIntUnsafe(serializer.length() - 4, 0);

    return serializer.wrapAsByteBuffer();
}
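The framing that serializeRecord performs can be reproduced with a plain java.nio.ByteBuffer: reserve 4 bytes, write the payload, then patch the payload length into the first 4 bytes. This is a sketch of the idea only; Flink uses its own DataOutputSerializer, which additionally grows its backing array on demand.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FramingDemo {
    // Length-prefixed framing: [4-byte length][payload bytes].
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.position(4);               // skip the length slot for now
        buf.put(payload);              // write the record data
        buf.putInt(0, payload.length); // patch the length into offset 0 (absolute put)
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer framed = frame("hello".getBytes(StandardCharsets.UTF_8));

        int length = framed.getInt();  // read the 4-byte length prefix
        byte[] data = new byte[length];
        framed.get(data);

        System.out.println(length);                                   // 5
        System.out.println(new String(data, StandardCharsets.UTF_8)); // hello
    }
}
```

The length prefix is what lets the downstream deserializer reassemble a record even when its bytes are split across several network buffers.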
ChannelSelector (data partitioner)
For details, see Flink V1.20 Source Reading Notes (6) - Data Partitioning Analysis.
ResultPartition and ResultSubpartition
ResultPartition structure, with the following core members:
- int partitionIndex: the index of the result partition owned by this Task instance.
- ResultPartitionType partitionType: the result partition type. Flink streaming jobs normally use PIPELINED or PIPELINED_BOUNDED; PIPELINED_BOUNDED caps the write rate, so under backpressure the task does not buffer large amounts of data locally.
- int numSubpartitions: the number of result subpartitions in this result partition.
- BufferPool bufferPool: the partition's buffer pool (a LocalBufferPool), which stores the data assigned to the subpartitions. When the ResultPartition writes data, it requests buffers from this pool and writes into them.
- SupplierWithException<BufferPool, IOException> bufferPoolFactory: the factory that creates the bufferPool; the buffers ultimately come from the NetworkBufferPool.
- ResultSubpartition[] subpartitions: the ResultPartition subclass BufferWritingResultPartition holds the array of result subpartitions.
ResultSubpartition structure, with the following core members:
- buffers: in the subclass PipelinedSubpartition, the queue that caches the written data until the downstream Task consumes it.
- PipelinedSubpartitionView readView: in the subclass PipelinedSubpartition, the read view that encapsulates how this subpartition's data is consumed.
Continuing from the previous section: targetPartition.emitRecord(...) writes the ByteBuffer data into a concrete subpartition. The implementation first obtains a Buffer from the LocalBufferPool and writes the ByteBuffer data into it; a BufferBuilder represents the buffer, backed by a MemorySegment. If the ByteBuffer data is too large and one BufferBuilder fills up with data still remaining, that BufferBuilder is marked as finished and a new one is requested, until all remaining data has been written.
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#emitRecord
// First obtain a Buffer from the LocalBufferPool and write the ByteBuffer data into it.
// A BufferBuilder represents the buffer, backed by a MemorySegment.
// If the data is too large and one BufferBuilder fills up with data still remaining,
// that BufferBuilder is marked as finished and a new one is requested until all data is written.
@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
    totalWrittenBytes += record.remaining();

    // allocate a BufferBuilder, enqueue it in the subpartition's buffers queue,
    // and write the ByteBuffer data into it
    BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);

    while (record.hasRemaining()) {
        // full buffer, partial record
        finishUnicastBufferBuilder(targetSubpartition);
        buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
    }

    if (buffer.isFull()) {
        // full buffer, full record
        finishUnicastBufferBuilder(targetSubpartition);
    }

    // partial buffer, full record
}
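The loop above can be sketched in isolation: copy one serialized record across fixed-size buffers, finishing each full buffer and requesting the next. The helper below is illustrative only; real BufferBuilders wrap MemorySegments and are consumed asynchronously by the downstream reader.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SpanningWriteDemo {
    // Write a serialized record across fixed-size buffers, returning the
    // list of "finished" buffers (the last one may be only partially full).
    static List<byte[]> writeAcrossBuffers(ByteBuffer record, int bufferSize) {
        List<byte[]> finished = new ArrayList<>();
        while (record.hasRemaining()) {
            int chunk = Math.min(bufferSize, record.remaining());
            byte[] buffer = new byte[chunk];
            record.get(buffer);   // append as much of the record as fits
            finished.add(buffer); // buffer full (or record complete): finish it
        }
        return finished;
    }

    public static void main(String[] args) {
        ByteBuffer record = ByteBuffer.wrap(new byte[10]); // a 10-byte record
        List<byte[]> buffers = writeAcrossBuffers(record, 4); // 4-byte buffers

        System.out.println(buffers.size());        // 3 buffers: 4 + 4 + 2 bytes
        System.out.println(buffers.get(2).length); // the trailing partial chunk
    }
}
```

This is why PipelinedSubpartition tracks a partialRecordLength: a reader that picks up mid-stream needs to know how many bytes at the head of a buffer belong to a record that started in an earlier buffer.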
appendUnicastDataForNewRecord(...) allocates a new BufferBuilder, enqueues it in the result subpartition's buffers queue, and then writes the ByteBuffer data into the BufferBuilder.
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#appendUnicastDataForNewRecord
private BufferBuilder appendUnicastDataForNewRecord(
        final ByteBuffer record, final int targetSubpartition) throws IOException {
    if (targetSubpartition < 0 || targetSubpartition > unicastBufferBuilders.length) {
        throw new ArrayIndexOutOfBoundsException(targetSubpartition);
    }
    BufferBuilder buffer = unicastBufferBuilders[targetSubpartition];

    if (buffer == null) {
        // request a new BufferBuilder
        buffer = requestNewUnicastBufferBuilder(targetSubpartition);
        // enqueue the buffer in the PipelinedSubpartition's buffers queue
        addToSubpartition(buffer, targetSubpartition, 0, record.remaining());
    }

    append(record, buffer);

    return buffer;
}
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#requestNewUnicastBufferBuilder
Request a new BufferBuilder from the LocalBufferPool.
private BufferBuilder requestNewUnicastBufferBuilder(int targetSubpartition)
        throws IOException {
    checkInProduceState();
    ensureUnicastMode();
    // request a new BufferBuilder from the LocalBufferPool
    final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
    unicastBufferBuilders[targetSubpartition] = bufferBuilder;

    return bufferBuilder;
}
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#requestNewBufferBuilderFromPool
private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
        throws IOException {
    // first try a non-blocking request for a BufferBuilder
    BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
    if (bufferBuilder != null) {
        return bufferBuilder;
    }

    // no buffer available: record hard backpressure time and block until one is free
    hardBackPressuredTimeMsPerSecond.markStart();
    try {
        bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
        hardBackPressuredTimeMsPerSecond.markEnd();
        return bufferBuilder;
    } catch (InterruptedException e) {
        throw new IOException("Interrupted while waiting for buffer");
    }
}
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition#addToSubpartition
Put the data Buffer into the PipelinedSubpartition's buffers queue.
private void addToSubpartition(
        BufferBuilder buffer,
        int targetSubpartition,
        int partialRecordLength,
        int minDesirableBufferSize)
        throws IOException {
    int desirableBufferSize =
            // add the given buffer to the target subpartition
            subpartitions[targetSubpartition].add(
                    buffer.createBufferConsumerFromBeginning(), partialRecordLength);

    resizeBuffer(buffer, desirableBufferSize, minDesirableBufferSize);
}
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition#add
@Override
public int add(BufferConsumer bufferConsumer, int partialRecordLength) {
    return add(bufferConsumer, partialRecordLength, false);
}
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition#add
private int add(BufferConsumer bufferConsumer, int partialRecordLength, boolean finish) {
    checkNotNull(bufferConsumer);

    final boolean notifyDataAvailable;
    int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
    int newBufferSize;
    synchronized (buffers) {
        if (isFinished || isReleased) {
            bufferConsumer.close();
            return ADD_BUFFER_ERROR_CODE;
        }

        // Add the bufferConsumer and update the stats
        if (addBuffer(bufferConsumer, partialRecordLength)) {
            prioritySequenceNumber = sequenceNumber;
        }
        updateStatistics(bufferConsumer);
        increaseBuffersInBacklog(bufferConsumer);
        notifyDataAvailable = finish || shouldNotifyDataAvailable();

        isFinished |= finish;
        newBufferSize = bufferSize;
    }

    notifyPriorityEvent(prioritySequenceNumber);
    if (notifyDataAvailable) {
        // notify the reader that data is available
        notifyDataAvailable();
    }
    return newBufferSize;
}
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition#addBuffer
@GuardedBy("buffers")
private boolean addBuffer(BufferConsumer bufferConsumer, int partialRecordLength) {
    assert Thread.holdsLock(buffers);

    if (bufferConsumer.getDataType().hasPriority()) {
        return processPriorityBuffer(bufferConsumer, partialRecordLength);
    } else if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
            == bufferConsumer.getDataType()) {
        processTimeoutableCheckpointBarrier(bufferConsumer);
    }
    // enqueue the buffer consumer
    buffers.add(new BufferConsumerWithPartialRecordLength(bufferConsumer, partialRecordLength));
    return false;
}
LocalBufferPool
LocalBufferPool structure, with the following core members:
- NetworkBufferPool networkBufferPool: the global network buffer pool; each TaskManager has exactly one NetworkBufferPool, and the LocalBufferPool's buffers come from it.
- int numberOfRequiredMemorySegments: the minimum number of buffers this LocalBufferPool must be able to obtain.
- ArrayDeque<MemorySegment> availableMemorySegments: the buffers currently available in this LocalBufferPool, i.e. obtained from the NetworkBufferPool but not yet holding data. A Buffer is backed by a MemorySegment.
- int maxNumberOfMemorySegments: the maximum number of buffers that can be requested from the NetworkBufferPool.
- int numberOfRequestedMemorySegments: the number of buffers requested from the NetworkBufferPool so far.
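The bookkeeping these members describe can be sketched minimally (illustrative only; the real LocalBufferPool also handles recycling back to the global pool, availability listeners, and per-channel limits):

```java
import java.util.ArrayDeque;

public class PoolDemo {
    // A toy local pool: recycled segments are served from the local queue;
    // otherwise segments are drawn from a global budget until the cap is hit.
    static class SimpleLocalPool {
        private final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();
        private final int maxNumberOfMemorySegments;
        private final int segmentSize;
        private int numberOfRequestedMemorySegments = 0;

        SimpleLocalPool(int maxSegments, int segmentSize) {
            this.maxNumberOfMemorySegments = maxSegments;
            this.segmentSize = segmentSize;
        }

        // Returns a segment, or null when the pool is exhausted (the real
        // caller would then block, as in requestBufferBuilderBlocking).
        byte[] requestMemorySegment() {
            if (!availableMemorySegments.isEmpty()) {
                return availableMemorySegments.poll();
            }
            if (numberOfRequestedMemorySegments < maxNumberOfMemorySegments) {
                numberOfRequestedMemorySegments++;
                return new byte[segmentSize]; // stand-in for a global allocation
            }
            return null;
        }

        void recycle(byte[] segment) {
            availableMemorySegments.add(segment);
        }
    }

    public static void main(String[] args) {
        SimpleLocalPool pool = new SimpleLocalPool(2, 1024);
        byte[] a = pool.requestMemorySegment();
        byte[] b = pool.requestMemorySegment();
        System.out.println(pool.requestMemorySegment() == null); // cap reached
        pool.recycle(a);
        System.out.println(pool.requestMemorySegment() != null); // recycled segment served
    }
}
```

The essential invariant is that a segment is either in the available queue, handed out to a writer, or still unrequested from the global budget; backpressure arises exactly when all three sources are empty.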
How the LocalBufferPool is created:
The Task startup analysis included a step that sets up the result partitions; the LocalBufferPool is created during that setup.
org.apache.flink.runtime.taskmanager.Task#setupPartitionsAndGates
@VisibleForTesting
public static void setupPartitionsAndGates(
        ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {

    for (ResultPartitionWriter partition : producedPartitions) {
        // setup() creates the LocalBufferPool
        partition.setup();
    }

    // InputGates must be initialized after the partitions, since during InputGate#setup
    // we are requesting partitions
    for (InputGate gate : inputGates) {
        gate.setup();
    }
}
org.apache.flink.runtime.io.network.partition.ResultPartition#setup
// Registers a buffer pool with this result partition.
// There is one pool per result partition, shared by all its subpartitions.
// To match the task registration lifecycle in the TaskExecutor,
// the pool is registered with the partition after construction.
@Override
public void setup() throws IOException {
    checkState(
            this.bufferPool == null,
            "Bug in result partition setup logic: Already registered buffer pool.");

    // create the bufferPool; the factory comes from
    // org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.createBufferPoolFactory
    this.bufferPool = checkNotNull(bufferPoolFactory.get());
    // subclass-specific setup
    setupInternal();
    // register the result partition
    partitionManager.registerResultPartition(this);
}
org.apache.flink.runtime.io.network.partition.ResultPartitionFactory#createBufferPoolFactory
// The minimum pool size should be numberOfSubpartitions + 1, for two reasons:
// 1. A StreamTask can only process input when at least one buffer is available on the
//    output side, so if the minimum pool size were exactly the number of subpartitions,
//    the task could get stuck, because every subpartition may hold a partially filled buffer.
// 2. Granting each output LocalBufferPool one extra buffer avoids a performance regression
//    when input processing depends on at least one output buffer being available.
@VisibleForTesting
SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
        int numberOfSubpartitions, ResultPartitionType type) {
    return () -> {
        Pair<Integer, Integer> pair =
                // compute the min/max size of the local buffer pool used by this result partition
                NettyShuffleUtils.getMinMaxNetworkBuffersPerResultPartition(
                        configuredNetworkBuffersPerChannel,
                        floatingNetworkBuffersPerGate,
                        sortShuffleMinParallelism,
                        sortShuffleMinBuffers,
                        numberOfSubpartitions,
                        tieredStorage.isPresent(),
                        tieredStorage
                                .map(ResultPartitionFactory::getNumTotalGuaranteedBuffers)
                                .orElse(0),
                        type);

        return bufferPoolFactory.createBufferPool(
                pair.getLeft(),
                pair.getRight(),
                numberOfSubpartitions,
                maxBuffersPerChannel,
                // whether this result partition needs overdraft buffers
                isOverdraftBufferNeeded(type) ? maxOverdraftBuffersPerGate : 0);
    };
}
Three parameters control how many buffers each LocalBufferPool gets when it is created: the number of result subpartitions (i.e. the number of downstream tasks), the number of buffers per subpartition, and the number of extra floating buffers. The pool size is roughly numberOfSubpartitions * buffers-per-channel + floating buffers; the per-subpartition count is controlled by taskmanager.network.memory.buffers-per-channel and the floating count by taskmanager.network.memory.floating-buffers-per-gate.
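Plugging in the default values (buffers-per-channel = 2, floating-buffers-per-gate = 8, as documented for Flink's network config) for a partition with 4 subpartitions gives a quick feel for the sizing. This is a simplification: the exact formula in NettyShuffleUtils has extra cases for sort shuffle and tiered storage.

```java
public class PoolSizeDemo {
    public static void main(String[] args) {
        int numberOfSubpartitions = 4;
        int buffersPerChannel = 2;      // taskmanager.network.memory.buffers-per-channel
        int floatingBuffersPerGate = 8; // taskmanager.network.memory.floating-buffers-per-gate

        // Rough maximum size the LocalBufferPool may grow to.
        int maxBuffers = numberOfSubpartitions * buffersPerChannel + floatingBuffersPerGate;

        // Minimum is numberOfSubpartitions + 1, for the stuck/regression
        // reasons explained in createBufferPoolFactory's comment.
        int minBuffers = numberOfSubpartitions + 1;

        System.out.println(maxBuffers); // 4 * 2 + 8 = 16
        System.out.println(minBuffers); // 4 + 1 = 5
    }
}
```

With the default 32 KiB segment size, this example pool tops out at roughly 512 KiB of in-flight output data for the partition.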
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool#createBufferPool
@Override
public BufferPool createBufferPool(
        int numRequiredBuffers,
        int maxUsedBuffers,
        int numSubpartitions,
        int maxBuffersPerChannel,
        int maxOverdraftBuffersPerGate)
        throws IOException {
    return internalCreateBufferPool(
            numRequiredBuffers,
            maxUsedBuffers,
            numSubpartitions,
            maxBuffersPerChannel,
            maxOverdraftBuffersPerGate);
}
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool#internalCreateBufferPool
private BufferPool internalCreateBufferPool(
        int numRequiredBuffers,
        int maxUsedBuffers,
        int numSubpartitions,
        int maxBuffersPerChannel,
        int maxOverdraftBuffersPerGate)
        throws IOException {
    // It is necessary to use a separate lock from the one used for buffer
    // requests to ensure deadlock freedom for failure cases.
    synchronized (factoryLock) {
        if (isDestroyed) {
            throw new IllegalStateException("Network buffer pool has already been destroyed.");
        }

        // Ensure that the number of required buffers can be satisfied.
        // With dynamic memory management this should become obsolete.
        if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
            throw new IOException(
                    String.format(
                            "Insufficient number of network buffers: "
                                    + "required %d, but only %d available. %s.",
                            numRequiredBuffers,
                            totalNumberOfMemorySegments - numTotalRequiredBuffers,
                            getConfigDescription()));
        }

        this.numTotalRequiredBuffers += numRequiredBuffers;

        // We are good to go, create a new buffer pool and redistribute
        // non-fixed size buffers.
        LocalBufferPool localBufferPool =
                new LocalBufferPool(
                        this,
                        numRequiredBuffers,
                        maxUsedBuffers,
                        numSubpartitions,
                        maxBuffersPerChannel,
                        maxOverdraftBuffersPerGate);

        allBufferPools.add(localBufferPool);

        if (numRequiredBuffers < maxUsedBuffers) {
            resizableBufferPools.add(localBufferPool);
        }

        // redistribute buffers across the existing pools
        redistributeBuffers();

        return localBufferPool;
    }
}
Returning to the earlier requestBufferBuilder(...) call, let's look at the concrete flow:
org.apache.flink.runtime.io.network.buffer.LocalBufferPool#requestBufferBuilder
@Override
public BufferBuilder requestBufferBuilder() {
    return toBufferBuilder(
            // take a MemorySegment from the LocalBufferPool's available queue
            requestMemorySegment(UNKNOWN_CHANNEL), UNKNOWN_CHANNEL);
}
org.apache.flink.runtime.io.network.buffer.LocalBufferPool#toBufferBuilder
private BufferBuilder toBufferBuilder(MemorySegment memorySegment, int targetChannel) {
    if (memorySegment == null) {
        return null;
    }

    if (targetChannel == UNKNOWN_CHANNEL) {
        return new BufferBuilder(memorySegment, this);
    } else {
        // create a BufferBuilder with the per-subpartition recycler
        return new BufferBuilder(memorySegment, subpartitionBufferRecyclers[targetChannel]);
    }
}
Take a MemorySegment from the LocalBufferPool's available queue:
org.apache.flink.runtime.io.network.buffer.LocalBufferPool#requestMemorySegment
@Nullable
private MemorySegment requestMemorySegment(int targetChannel) {
    MemorySegment segment = null;
    synchronized (availableMemorySegments) {
        checkDestroyed();

        if (!availableMemorySegments.isEmpty()) {
            segment = availableMemorySegments.poll();
        } else if (isRequestedSizeReached()) {
            // Only when the buffer request reaches the upper limit (i.e. current pool size),
            // requests an overdraft buffer.
            segment = requestOverdraftMemorySegmentFromGlobal();
        }

        if (segment == null) {
            return null;
        }

        if (targetChannel != UNKNOWN_CHANNEL) {
            if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {
                unavailableSubpartitionsCount++;
            }
        }
        checkAndUpdateAvailability();
    }
    return segment;
}
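The decision order in requestMemorySegment (serve from the local available queue first; only once the current pool size is exhausted fall back to a bounded overdraft from the global pool) can be sketched as follows. Class and field names here are illustrative, not Flink's.

```java
import java.util.ArrayDeque;

public class OverdraftDemo {
    static class OverdraftPool {
        final ArrayDeque<byte[]> available = new ArrayDeque<>();
        final int currentPoolSize; // normal budget of this pool
        final int maxOverdraft;    // extra segments allowed beyond the budget
        int requested = 0;
        int overdraftRequested = 0;

        OverdraftPool(int currentPoolSize, int maxOverdraft) {
            this.currentPoolSize = currentPoolSize;
            this.maxOverdraft = maxOverdraft;
        }

        byte[] request() {
            // 1) serve a locally available (recycled) segment if any
            if (!available.isEmpty()) {
                return available.poll();
            }
            // 2) draw from the normal budget
            if (requested < currentPoolSize) {
                requested++;
                return new byte[32];
            }
            // 3) pool size reached: fall back to bounded overdraft buffers
            if (overdraftRequested < maxOverdraft) {
                overdraftRequested++;
                return new byte[32];
            }
            // 4) fully exhausted: hard backpressure, caller must wait
            return null;
        }
    }

    public static void main(String[] args) {
        OverdraftPool pool = new OverdraftPool(2, 1);
        pool.request();
        pool.request();                             // normal budget used up
        System.out.println(pool.request() != null); // one overdraft still granted
        System.out.println(pool.request() == null); // now fully exhausted
    }
}
```

The overdraft exists so that a record already mid-write (e.g. spanning several buffers) can complete instead of blocking while holding partially written state; taskmanager.network.memory.max-overdraft-buffers-per-gate bounds it.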
References:
Flink Source Code Analysis (17) - Flink Task Write Data Process