Flink V1.20 Source Code Reading Notes (5.7): Flink Task Startup Process

2024-12-23

This part continues with the work done inside the Task constructor and then moves on to how a StreamTask starts up.

 

Core Objects

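  • ResultPartitionWriter: works at the Buffer level and is the lowest layer of the data output hierarchy. Its implementations contain a BufferPool, which supplies Buffer resources, and an array ResultSubpartition[] subpartitions, which receives the data dispatched by the RecordWriter above it.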

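  • RecordWriter: writes out the data a Task has processed and works mainly with StreamRecord. It sits one level above ResultPartitionWriter, and every RecordWriter instance holds a ResultPartitionWriter implementation. RecordWriter has two main subclasses, ChannelSelectorRecordWriter and BroadcastRecordWriter. The subclasses carry the partitioner that decides, for each StreamRecord, which index of the subpartitions array in the underlying ResultPartitionWriter the record goes to. RecordWriterDelegate is a delegate for RecordWriter and can wrap zero, one, or several RecordWriter objects.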

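  • Output: the Output interface extends Collector, the same Collector type that appears as a parameter in API methods such as flatMap. Output is the abstraction an operator uses to pass data downstream; it emits StreamRecord, Watermark, LatencyMarker, and similar elements.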

    • RecordWriterOutput is mainly responsible for sending data across the network.

    • ChainingOutput passes data between operators inside an operator chain.

    • BroadcastingOutputCollector wraps a group of Outputs and broadcasts each element to every one of them, so that all downstream consumers receive it.

  • Data transfer: Flink moves data in three main ways.

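    • The first is between operators inside an operator chain: all operators of the chain run in the same thread and call each other's processElement() method one after another.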

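    • The second is local data exchange between threads: when upstream and downstream Tasks are deployed to the same TaskManager, a LocalInputChannel acts as the intermediary, and Java's wait()/notifyAll() mechanism blocks the consumer until upstream data is available and wakes it up once data is ready.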

    • The third is data exchange across the network, which uses RemoteInputChannel together with Netty's request/response mechanism.

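  • StreamInputProcessor (input processor): the abstraction StreamTask uses to read data. It covers reading the data, processing it, and handing the result downstream. Its implementations usually contain two important components, StreamTaskInput and DataOutput, which together carry out those steps.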

  • StreamTaskInput (task input): the abstraction of a StreamTask's data input. It contains an InputGate, which reads data from upstream Tasks.

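  • DataOutput (task output): its implementation is StreamTaskNetworkOutput, which only hands data to the operator for processing. Writing out the data after the operator has processed it is the job of the Output described above.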
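The layering between RecordWriter and ResultPartitionWriter described above can be illustrated with a small toy model. This is only a sketch: `OutputLayerSketch`, `ChannelSelectorSketch`, `PartitionSketch`, and `WriterSketch` are hypothetical names invented for this example, not Flink classes, and plain lists stand in for buffers and subpartition queues.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the RecordWriter -> ResultPartitionWriter layering. */
final class OutputLayerSketch {

    /** Stand-in for a channel selector: maps a record to a subpartition index. */
    interface ChannelSelectorSketch<T> {
        int selectChannel(T record, int numberOfChannels);
    }

    /** Stand-in for a result partition: one list per subpartition. */
    static final class PartitionSketch<T> {
        final List<List<T>> subpartitions = new ArrayList<>();

        PartitionSketch(int numberOfSubpartitions) {
            for (int i = 0; i < numberOfSubpartitions; i++) {
                subpartitions.add(new ArrayList<>());
            }
        }

        void emit(T record, int targetSubpartition) {
            subpartitions.get(targetSubpartition).add(record);
        }
    }

    /** Stand-in for a record writer: picks the target index, then delegates downward. */
    static final class WriterSketch<T> {
        private final PartitionSketch<T> partition;
        private final ChannelSelectorSketch<T> selector;

        WriterSketch(PartitionSketch<T> partition, ChannelSelectorSketch<T> selector) {
            this.partition = partition;
            this.selector = selector;
        }

        void emit(T record) {
            int index = selector.selectChannel(record, partition.subpartitions.size());
            partition.emit(record, index);
        }
    }

    /** Hash-style selection; Math.floorMod keeps the index non-negative. */
    static <T> ChannelSelectorSketch<T> hashSelector() {
        return (record, n) -> Math.floorMod(record.hashCode(), n);
    }

    public static void main(String[] args) {
        PartitionSketch<Integer> partition = new PartitionSketch<>(3);
        WriterSketch<Integer> writer =
                new WriterSketch<>(partition, OutputLayerSketch.<Integer>hashSelector());
        for (int i = 0; i < 6; i++) {
            writer.emit(i);
        }
        System.out.println(partition.subpartitions); // prints [[0, 3], [1, 4], [2, 5]]
    }
}
```

The point of the split is that the upper layer only decides where a record goes, while the lower layer only stores what it is given.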

 

Task Construction

How the ResultPartitionWriter[] partitionWriters member is created

org.apache.flink.runtime.taskmanager.Task#Task

    public Task(
            JobInformation jobInformation,
            TaskInformation taskInformation,
            ExecutionAttemptID executionAttemptID,
            AllocationID slotAllocationId,
            List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
            List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
            MemoryManager memManager,
            SharedResources sharedResources,
            IOManager ioManager,
            ShuffleEnvironment<?, ?> shuffleEnvironment,
            KvStateService kvStateService,
            BroadcastVariableManager bcVarManager,
            TaskEventDispatcher taskEventDispatcher,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            TaskStateManager taskStateManager,
            TaskManagerActions taskManagerActions,
            InputSplitProvider inputSplitProvider,
            CheckpointResponder checkpointResponder,
            TaskOperatorEventGateway operatorCoordinatorEventGateway,
            GlobalAggregateManager aggregateManager,
            LibraryCacheManager.ClassLoaderHandle classLoaderHandle,
            FileCache fileCache,
            TaskManagerRuntimeInfo taskManagerConfig,
            @Nonnull TaskMetricGroup metricGroup,
            PartitionProducerStateChecker partitionProducerStateChecker,
            Executor executor,
            ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory) {


        // ... other parts omitted ...
  
        // produced intermediate result partitions
        // The task's output side
        final ResultPartitionWriter[] resultPartitionWriters =
                shuffleEnvironment
                        // Creates the partitionWriters member. The arguments carry the
                        // ResultPartitionDeploymentDescriptor instances built during task
                        // submission, i.e. the description of the task's outputs.
                        .createResultPartitionWriters(
                                taskShuffleContext, resultPartitionDeploymentDescriptors)
                        .toArray(new ResultPartitionWriter[] {});

        this.partitionWriters = resultPartitionWriters;

        // ... other parts omitted ...
    }

org.apache.flink.runtime.io.network.NettyShuffleEnvironment#createResultPartitionWriters

    public List<ResultPartition> createResultPartitionWriters(
            ShuffleIOOwnerContext ownerContext,
            List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors) {
        synchronized (lock) {
            Preconditions.checkState(
                    !isClosed, "The NettyShuffleEnvironment has already been shut down.");

            ResultPartition[] resultPartitions =
                    new ResultPartition[resultPartitionDeploymentDescriptors.size()];
            for (int partitionIndex = 0;
                    partitionIndex < resultPartitions.length;
                    partitionIndex++) {
                resultPartitions[partitionIndex] =
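                        // One ResultPartition per descriptor. The array length is the number of
                        // downstream operators consuming this task's data (distinct operators,
                        // not parallel instances of the same operator).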
                        resultPartitionFactory.create(
                                ownerContext.getOwnerName(),
                                partitionIndex,
                                resultPartitionDeploymentDescriptors.get(partitionIndex));
            }

            // Register output metrics
            registerOutputMetrics(
                    config.isNetworkDetailedMetrics(),
                    ownerContext.getOutputGroup(),
                    resultPartitions);
            return Arrays.asList(resultPartitions);
        }
    }

org.apache.flink.runtime.io.network.partition.ResultPartitionFactory#create

    public ResultPartition create(
            String taskNameWithSubtaskAndId,
            int partitionIndex,
            ResultPartitionDeploymentDescriptor desc) {
        return create(
                taskNameWithSubtaskAndId,
                partitionIndex,
                desc.getShuffleDescriptor().getResultPartitionID(),
                desc.getPartitionType(),
                desc.getTotalNumberOfPartitions(),
                // How many ExecutionVertex instances of one downstream ExecutionJobVertex
                // consume the data of this task instance
                desc.getNumberOfSubpartitions(),
                desc.getMaxParallelism(),
                // Whether the intermediate result is a broadcast result
                desc.isBroadcast(),
                desc.getShuffleDescriptor(),

                // Create the BufferPool factory
                createBufferPoolFactory(
                        // Number of parallel instances of the same downstream operator that
                        // consume this task's data
                        desc.getNumberOfSubpartitions(), desc.getPartitionType()),
                desc.isNumberOfPartitionConsumerUndefined());
    }

org.apache.flink.runtime.io.network.partition.ResultPartitionFactory#create

    public ResultPartition create(
            String taskNameWithSubtaskAndId,
            int partitionIndex,
            ResultPartitionID id,
            ResultPartitionType type,
            int numberOfPartitions,
            int numberOfSubpartitions,
            int maxParallelism,
            boolean isBroadcast,
            ShuffleDescriptor shuffleDescriptor,
            SupplierWithException<BufferPool, IOException> bufferPoolFactory,
            boolean isNumberOfPartitionConsumerUndefined) {
        BufferCompressor bufferCompressor = null;
        if (type.supportCompression() && batchShuffleCompressionEnabled) {
            bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
        }
        if (tieredStorage.isPresent() && type == ResultPartitionType.BLOCKING) {
            LOG.warn(
                    "When enabling tiered storage, the BLOCKING result partition will be replaced as HYBRID_FULL.");
            type = ResultPartitionType.HYBRID_FULL;
        }

        ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];

        final ResultPartition partition;
        if (type == ResultPartitionType.PIPELINED
                || type == ResultPartitionType.PIPELINED_BOUNDED
                || type == ResultPartitionType.PIPELINED_APPROXIMATE) {
            // First create the ResultPartition instance that belongs to this task
            final PipelinedResultPartition pipelinedPartition =
                    new PipelinedResultPartition(
                            taskNameWithSubtaskAndId,
                            partitionIndex,
                            id,
                            type,
                            subpartitions,
                            maxParallelism,
                            partitionManager,
                            bufferCompressor,
                            bufferPoolFactory);

            for (int i = 0; i < subpartitions.length; i++) {
                if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
                    subpartitions[i] =
                            new PipelinedApproximateSubpartition(
                                    i, configuredNetworkBuffersPerChannel, pipelinedPartition);
                } else {
                    // Fill the ResultSubpartition array, one entry per numberOfSubpartitions.
                    // The array is a member of the ResultPartition instance.
                    subpartitions[i] =
                            new PipelinedSubpartition(
                                    i, configuredNetworkBuffersPerChannel, pipelinedPartition);
                }
            }

            partition = pipelinedPartition;
        } 
        
        // ... other parts omitted ...


        partition.isNumberOfPartitionConsumerUndefined(isNumberOfPartitionConsumerUndefined);

        LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);

        return partition;
    }
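The resulting layout is two-level: one partition per deployment descriptor, and inside each partition one subpartition per downstream consumer. The sketch below models only that shape. `PartitionLayoutSketch` and its method are hypothetical, strings stand in for ResultSubpartition objects, and the example topology (two downstream operators with parallelism 3 and 2, each connected all-to-all) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the partition/subpartition layout; all names here are hypothetical. */
final class PartitionLayoutSketch {

    /** One String[] per descriptor; each slot stands for one subpartition. */
    static List<String[]> createPartitions(int[] subpartitionsPerDescriptor) {
        List<String[]> partitions = new ArrayList<>();
        for (int p = 0; p < subpartitionsPerDescriptor.length; p++) {
            String[] subpartitions = new String[subpartitionsPerDescriptor[p]];
            for (int i = 0; i < subpartitions.length; i++) {
                subpartitions[i] = "p" + p + "/sub" + i;
            }
            partitions.add(subpartitions);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Assumed topology: two downstream operators with parallelism 3 and 2.
        for (String[] subpartitions : createPartitions(new int[] {3, 2})) {
            System.out.println(Arrays.toString(subpartitions));
        }
        // prints [p0/sub0, p0/sub1, p0/sub2]
        // prints [p1/sub0, p1/sub1]
    }
}
```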

That is how the partitionWriters member of a Task instance is created. Next comes the inputGates member.

 

How the IndexedInputGate[] inputGates member is created

The createInputGates(...) method is the entry point for creating inputGates.

org.apache.flink.runtime.taskmanager.Task#Task

    public Task(
            JobInformation jobInformation,
            TaskInformation taskInformation,
            ExecutionAttemptID executionAttemptID,
            AllocationID slotAllocationId,
            List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
            List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
            MemoryManager memManager,
            SharedResources sharedResources,
            IOManager ioManager,
            ShuffleEnvironment<?, ?> shuffleEnvironment,
            KvStateService kvStateService,
            BroadcastVariableManager bcVarManager,
            TaskEventDispatcher taskEventDispatcher,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            TaskStateManager taskStateManager,
            TaskManagerActions taskManagerActions,
            InputSplitProvider inputSplitProvider,
            CheckpointResponder checkpointResponder,
            TaskOperatorEventGateway operatorCoordinatorEventGateway,
            GlobalAggregateManager aggregateManager,
            LibraryCacheManager.ClassLoaderHandle classLoaderHandle,
            FileCache fileCache,
            TaskManagerRuntimeInfo taskManagerConfig,
            @Nonnull TaskMetricGroup metricGroup,
            PartitionProducerStateChecker partitionProducerStateChecker,
            Executor executor,
            ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory) {




        // ... other parts omitted ...

        // consumed intermediate result partitions
        final IndexedInputGate[] gates =
                shuffleEnvironment
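                        // Entry point for creating inputGates. The arguments carry the
                        // InputGateDeploymentDescriptor instances built during task submission,
                        // i.e. the description of the task's inputs.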
                        .createInputGates(taskShuffleContext, this, inputGateDeploymentDescriptors)
                        .toArray(new IndexedInputGate[0]);

        // The task's input side
        this.inputGates = new IndexedInputGate[gates.length];
        int counter = 0;
        for (IndexedInputGate gate : gates) {
            inputGates[counter++] =
                    new InputGateWithMetrics(
                            gate, metrics.getIOMetricGroup().getNumBytesInCounter());
        }


    }

org.apache.flink.runtime.io.network.NettyShuffleEnvironment#createInputGates

    public List<SingleInputGate> createInputGates(
            ShuffleIOOwnerContext ownerContext,
            PartitionProducerStateProvider partitionProducerStateProvider,
            // The number of InputGateDeploymentDescriptors is the number of upstream
            // operators whose data this task consumes
            List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
        synchronized (lock) {
            Preconditions.checkState(
                    !isClosed, "The NettyShuffleEnvironment has already been shut down.");

            MetricGroup networkInputGroup = ownerContext.getInputGroup();

            InputChannelMetrics inputChannelMetrics =
                    new InputChannelMetrics(networkInputGroup, ownerContext.getParentGroup());

            SingleInputGate[] inputGates =
                    new SingleInputGate[inputGateDeploymentDescriptors.size()];
            for (int gateIndex = 0; gateIndex < inputGates.length; gateIndex++) {
                final InputGateDeploymentDescriptor igdd =
                        inputGateDeploymentDescriptors.get(gateIndex);
                // A SingleInputGate holds an important member, the bufferPoolFactory, which
                // later creates the buffer space for the InputGate's data.
                SingleInputGate inputGate =
                        // Create one InputGate per InputGateDeploymentDescriptor.
                        singleInputGateFactory.create(
                                ownerContext,
                                gateIndex,
                                igdd,
                                partitionProducerStateProvider,
                                inputChannelMetrics);
                InputGateID id =
                        new InputGateID(
                                igdd.getConsumedResultId(), ownerContext.getExecutionAttemptID());
                Set<SingleInputGate> inputGateSet =
                        inputGatesById.computeIfAbsent(
                                id, ignored -> ConcurrentHashMap.newKeySet());
                inputGateSet.add(inputGate);
                inputGatesById.put(id, inputGateSet);
                inputGate
                        .getCloseFuture()
                        .thenRun(
                                () ->
                                        inputGatesById.computeIfPresent(
                                                id,
                                                (key, value) -> {
                                                    value.remove(inputGate);
                                                    if (value.isEmpty()) {
                                                        return null;
                                                    }
                                                    return value;
                                                }));
                inputGates[gateIndex] = inputGate;
            }

            if (config.getDebloatConfiguration().isEnabled()) {
                registerDebloatingTaskMetrics(inputGates, ownerContext.getParentGroup());
            }

            registerInputMetrics(config.isNetworkDetailedMetrics(), networkInputGroup, inputGates);
            return Arrays.asList(inputGates);
        }
    }
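The bookkeeping around inputGatesById in the method above follows a common pattern: register a member in a set per key, and when the member closes, remove it and drop the key once the set is empty. Below is a minimal sketch of that pattern with plain strings in place of InputGateID and SingleInputGate. `GateRegistrySketch` and its methods are hypothetical names, not Flink API.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that mirrors the computeIfAbsent / computeIfPresent pattern. */
final class GateRegistrySketch {

    private final Map<String, Set<String>> gatesById = new ConcurrentHashMap<>();

    /** Registers a gate and returns the future whose completion unregisters it. */
    CompletableFuture<Void> register(String id, String gate) {
        // computeIfAbsent stores the new set in the map and returns it.
        gatesById.computeIfAbsent(id, ignored -> ConcurrentHashMap.newKeySet()).add(gate);

        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        closeFuture.thenRun(
                () ->
                        gatesById.computeIfPresent(
                                id,
                                (key, gates) -> {
                                    gates.remove(gate);
                                    // Returning null removes the mapping for this key.
                                    return gates.isEmpty() ? null : gates;
                                }));
        return closeFuture;
    }

    int count(String id) {
        Set<String> gates = gatesById.get(id);
        return gates == null ? 0 : gates.size();
    }

    boolean contains(String id) {
        return gatesById.containsKey(id);
    }

    public static void main(String[] args) {
        GateRegistrySketch registry = new GateRegistrySketch();
        CompletableFuture<Void> closeA = registry.register("result-1", "gate-A");
        CompletableFuture<Void> closeB = registry.register("result-1", "gate-B");
        closeA.complete(null);
        System.out.println(registry.count("result-1")); // prints 1
        closeB.complete(null);
        System.out.println(registry.contains("result-1")); // prints false
    }
}
```

Since computeIfAbsent already stores the set it creates, this sketch needs no separate put call after it.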

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory#create

    // Creates an InputGate and all of its input channels.
    public SingleInputGate create(
            @Nonnull ShuffleIOOwnerContext owner,
            int gateIndex,
            @Nonnull InputGateDeploymentDescriptor igdd,
            @Nonnull PartitionProducerStateProvider partitionProducerStateProvider,
            @Nonnull InputChannelMetrics metrics) {
        // This variable describes whether it is supported to have one input channel consume
        // multiple subpartitions, if multiple subpartitions from the same partition are
        // assigned to this input gate.
        //
        // For now this function is only supported in the new mode of Hybrid Shuffle.
        boolean isSharedInputChannelSupported =
                igdd.getConsumedPartitionType().isHybridResultPartition()
                        && tieredStorageConfiguration != null;
                        

        // Buffer specification for the InputGate
        GateBuffersSpec gateBuffersSpec =
                createGateBuffersSpec(
                        maxRequiredBuffersPerGate,
                        configuredNetworkBuffersPerChannel,
                        floatingNetworkBuffersPerGate,
                        igdd.getConsumedPartitionType(),
                        // Compute the number of channels
                        calculateNumChannels(
                                igdd.getShuffleDescriptors().length,
                                // For streaming jobs the consumed subpartition index range
                                // usually has size 1
                                igdd.getConsumedSubpartitionIndexRange().size(),
                                isSharedInputChannelSupported),
                        tieredStorageConfiguration != null);
        SupplierWithException<BufferPool, IOException> bufferPoolFactory =
                // Factory for a buffer pool that guarantees at least the required number
                // of buffers.
                createBufferPoolFactory(
                        networkBufferPool,
                        gateBuffersSpec.getRequiredFloatingBuffers(),
                        gateBuffersSpec.getTotalFloatingBuffers());

        BufferDecompressor bufferDecompressor = null;
        if (igdd.getConsumedPartitionType().supportCompression()
                && batchShuffleCompressionEnabled) {
            // Create the decompressor for compressed buffers.
            bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec);
        }

        final String owningTaskName = owner.getOwnerName();
        final MetricGroup networkInputGroup = owner.getInputGroup();

        ResultSubpartitionIndexSet subpartitionIndexSet =
                new ResultSubpartitionIndexSet(igdd.getConsumedSubpartitionIndexRange());
        // Create the SingleInputGate instance
        SingleInputGate inputGate =
                new SingleInputGate(
                        owningTaskName,
                        gateIndex,
                        igdd.getConsumedResultId(),
                        igdd.getConsumedPartitionType(),
                        calculateNumChannels(
                                igdd.getShuffleDescriptors().length,
                                subpartitionIndexSet.size(),
                                isSharedInputChannelSupported),
                        partitionProducerStateProvider,
                        bufferPoolFactory,
                        bufferDecompressor,
                        networkBufferPool,
                        networkBufferSize,
                        new ThroughputCalculator(SystemClock.getInstance()),
                        maybeCreateBufferDebloater(
                                owningTaskName, gateIndex, networkInputGroup.addGroup(gateIndex)));

        // Create the InputChannel array of this SingleInputGate and the tiered storage service
        createInputChannelsAndTieredStorageService(
                owningTaskName,
                igdd,
                inputGate,
                subpartitionIndexSet,
                gateBuffersSpec,
                metrics,
                isSharedInputChannelSupported);
        return inputGate;
    }

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory#createInputChannelsAndTieredStorageService

    private void createInputChannelsAndTieredStorageService(
            String owningTaskName,
            InputGateDeploymentDescriptor inputGateDeploymentDescriptor,
            SingleInputGate inputGate,
            ResultSubpartitionIndexSet subpartitionIndexSet,
            GateBuffersSpec gateBuffersSpec,
            InputChannelMetrics metrics,
            boolean isSharedInputChannelSupported) {
        ShuffleDescriptor[] shuffleDescriptors =
                inputGateDeploymentDescriptor.getShuffleDescriptors();

        // Create the input channels. There is one input channel for each consumed subpartition.
        InputChannel[] inputChannels =
                new InputChannel
                        [calculateNumChannels(
                                shuffleDescriptors.length,
                                subpartitionIndexSet.size(),
                                isSharedInputChannelSupported)];

        ChannelStatistics channelStatistics = new ChannelStatistics();

        int channelIdx = 0;
        final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs = new ArrayList<>();
        List<List<TierShuffleDescriptor>> tierShuffleDescriptors = new ArrayList<>();

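        // An operator usually has several parallel instances. The task creates one InputChannel
        // for each parallel instance of each upstream operator it consumes, and each InputChannel
        // ties the task to one of those upstream instances.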
        for (ShuffleDescriptor descriptor : shuffleDescriptors) {
            TieredStoragePartitionId partitionId =
                    TieredStorageIdMappingUtils.convertId(descriptor.getResultPartitionID());
            if (isSharedInputChannelSupported) {
                inputChannels[channelIdx] =
                        createInputChannel(
                                inputGate,
                                channelIdx,
                                gateBuffersSpec.getEffectiveExclusiveBuffersPerChannel(),
                                descriptor,
                                subpartitionIndexSet,
                                channelStatistics,
                                metrics);
                if (tieredStorageConfiguration != null) {
                    addTierShuffleDescriptors(tierShuffleDescriptors, descriptor);

                    tieredStorageConsumerSpecs.add(
                            new TieredStorageConsumerSpec(
                                    inputGate.getInputGateIndex(),
                                    partitionId,
                                    new TieredStorageInputChannelId(channelIdx),
                                    subpartitionIndexSet));
                }
                channelIdx++;
            } else {
                for (int subpartitionIndex : subpartitionIndexSet.values()) {
                    inputChannels[channelIdx] =
                            createInputChannel(
                                    inputGate,
                                    channelIdx,
                                    gateBuffersSpec.getEffectiveExclusiveBuffersPerChannel(),
                                    descriptor,
                                    new ResultSubpartitionIndexSet(subpartitionIndex),
                                    channelStatistics,
                                    metrics);
                    if (tieredStorageConfiguration != null) {
                        addTierShuffleDescriptors(tierShuffleDescriptors, descriptor);

                        tieredStorageConsumerSpecs.add(
                                new TieredStorageConsumerSpec(
                                        inputGate.getInputGateIndex(),
                                        partitionId,
                                        new TieredStorageInputChannelId(channelIdx),
                                        new ResultSubpartitionIndexSet(subpartitionIndex)));
                    }
                    channelIdx++;
                }
            }
        }

        inputGate.setInputChannels(inputChannels);

        if (tieredStorageConfiguration != null) {
            TieredStorageConsumerClient tieredStorageConsumerClient =
                    new TieredStorageConsumerClient(
                            tieredStorageConfiguration.getTierFactories(),
                            tieredStorageConsumerSpecs,
                            tierShuffleDescriptors,
                            tieredStorageNettyService);
            inputGate.setTieredStorageService(
                    tieredStorageConsumerSpecs,
                    tieredStorageConsumerClient,
                    tieredStorageNettyService);
        }

        LOG.debug(
                "{}: Created {} input channels ({}).",
                owningTaskName,
                inputChannels.length,
                channelStatistics);
    }
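The body of calculateNumChannels is not shown in these notes. The sketch below is an assumption derived from the loop above: in shared mode one channel covers a whole partition, otherwise there is one channel per (partition, subpartition) pair. `ChannelCountSketch` is a hypothetical class, and `countByLoop` simply replays how channelIdx advances in the loop so the two can be compared.

```java
/** Hypothetical reconstruction of the channel count implied by the loop above. */
final class ChannelCountSketch {

    /** Assumed rule: shared channels cover a whole partition; otherwise one per subpartition. */
    static int calculateNumChannels(int numDescriptors, int numSubpartitions, boolean shared) {
        return shared ? numDescriptors : Math.multiplyExact(numDescriptors, numSubpartitions);
    }

    /** Counts channels the same way the nested loop advances channelIdx. */
    static int countByLoop(int numDescriptors, int numSubpartitions, boolean shared) {
        int channelIdx = 0;
        for (int d = 0; d < numDescriptors; d++) {
            if (shared) {
                channelIdx++;
            } else {
                for (int s = 0; s < numSubpartitions; s++) {
                    channelIdx++;
                }
            }
        }
        return channelIdx;
    }

    public static void main(String[] args) {
        // Typical streaming case: 4 upstream partitions, subpartition range of size 1.
        System.out.println(calculateNumChannels(4, 1, false)); // prints 4
        // A range of 3 subpartitions without channel sharing.
        System.out.println(calculateNumChannels(4, 3, false)); // prints 12
        // The same range with shared input channels.
        System.out.println(calculateNumChannels(4, 3, true)); // prints 4
    }
}
```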

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory#createInputChannel

    private InputChannel createInputChannel(
            SingleInputGate inputGate,
            int index,
            int buffersPerChannel,
            ShuffleDescriptor shuffleDescriptor,
            ResultSubpartitionIndexSet subpartitionIndexSet,
            ChannelStatistics channelStatistics,
            InputChannelMetrics metrics) {
        // Applies different functions to known and unknown ShuffleDescriptors,
        // and casts a known descriptor to NettyShuffleDescriptor.
        return applyWithShuffleTypeCheck(
                NettyShuffleDescriptor.class,
                shuffleDescriptor,
                unknownShuffleDescriptor -> {
                    channelStatistics.numUnknownChannels++;
                    // Create an unknown input channel
                    return new UnknownInputChannel(
                            inputGate,
                            index,
                            unknownShuffleDescriptor.getResultPartitionID(),
                            subpartitionIndexSet,
                            partitionManager,
                            taskEventPublisher,
                            connectionManager,
                            partitionRequestInitialBackoff,
                            partitionRequestMaxBackoff,
                            partitionRequestListenerTimeout,
                            buffersPerChannel,
                            metrics);
                },
                nettyShuffleDescriptor ->
                        // Create a known input channel
                        createKnownInputChannel(
                                inputGate,
                                index,
                                buffersPerChannel,
                                nettyShuffleDescriptor,
                                subpartitionIndexSet,
                                channelStatistics,
                                metrics));
    }

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory#createKnownInputChannel

    @VisibleForTesting
    protected InputChannel createKnownInputChannel(
            SingleInputGate inputGate,
            int index,
            int buffersPerChannel,
            NettyShuffleDescriptor inputChannelDescriptor,
            ResultSubpartitionIndexSet subpartitionIndexSet,
            ChannelStatistics channelStatistics,
            InputChannelMetrics metrics) {
        ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID();
        
        // Create a LocalRecoveredInputChannel or a RemoteRecoveredInputChannel, depending on
        // whether the producing task runs in the same TaskManager as this consumer.
        if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
            // Consuming task is deployed to the same TaskManager as the partition => local
            channelStatistics.numLocalChannels++;
            return new LocalRecoveredInputChannel(
                    inputGate,
                    index,
                    partitionId,
                    subpartitionIndexSet,
                    partitionManager,
                    taskEventPublisher,
                    partitionRequestInitialBackoff,
                    partitionRequestMaxBackoff,
                    buffersPerChannel,
                    metrics);
        } else {
            // Different instances => remote
            channelStatistics.numRemoteChannels++;
            return new RemoteRecoveredInputChannel(
                    inputGate,
                    index,
                    partitionId,
                    subpartitionIndexSet,
                    inputChannelDescriptor.getConnectionId(),
                    connectionManager,
                    partitionRequestInitialBackoff,
                    partitionRequestMaxBackoff,
                    partitionRequestListenerTimeout,
                    buffersPerChannel,
                    metrics);
        }
    }
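Taken together, createInputChannel and createKnownInputChannel make a three-way decision. The sketch below condenses it, under the assumption that an unknown descriptor means the producer's location is not available yet. `ChannelKindSketch` is hypothetical; an empty Optional stands in for an unknown ShuffleDescriptor, and string ids stand in for the TaskExecutor resource ids compared by isLocalTo(...).

```java
import java.util.Optional;

/** Hypothetical condensation of the unknown / local / remote channel decision. */
final class ChannelKindSketch {

    enum Kind {
        UNKNOWN,
        LOCAL,
        REMOTE
    }

    /**
     * @param producerExecutorId id of the TaskExecutor hosting the producer; empty if unknown
     * @param localExecutorId id of the TaskExecutor hosting the consuming task
     */
    static Kind classify(Optional<String> producerExecutorId, String localExecutorId) {
        if (!producerExecutorId.isPresent()) {
            return Kind.UNKNOWN;
        }
        // Same TaskExecutor => local channel, anything else => remote channel.
        return producerExecutorId.get().equals(localExecutorId) ? Kind.LOCAL : Kind.REMOTE;
    }

    public static void main(String[] args) {
        System.out.println(classify(Optional.empty(), "tm-1")); // prints UNKNOWN
        System.out.println(classify(Optional.of("tm-1"), "tm-1")); // prints LOCAL
        System.out.println(classify(Optional.of("tm-2"), "tm-1")); // prints REMOTE
    }
}
```

Note that the comparison is per TaskExecutor, not per machine: two TaskManagers on the same host would still be classified as remote in this model.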

That is how the input and output sides are created while a Task instance is initialized. Next comes the construction of StreamTask.

 

StreamTask Construction

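Task implements the Runnable interface. Once the Task instance has been created, TaskExecutor.submitTask(...) starts the task's executing thread (through task.startTaskThread()); that thread runs Task.run(), which begins the task's execution.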

org.apache.flink.runtime.taskmanager.Task#run

    @Override
    public void run() {
        try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
            doRun();
        } finally {
            terminationFuture.complete(executionState);
        }
    }
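The shape of Task#run, a Runnable that executes on its own thread and completes a termination future in a finally block, can be sketched in isolation. `TaskThreadSketch` is a hypothetical stand-in: the state strings and the body parameter are invented for the example, and the error handling that Flink performs inside doRun() is folded into run() here for brevity.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical miniature of a task that runs on a dedicated thread. */
final class TaskThreadSketch implements Runnable {

    private final Thread executingThread;
    private final Runnable body;
    private final CompletableFuture<String> terminationFuture = new CompletableFuture<>();
    private volatile String executionState = "CREATED";

    TaskThreadSketch(String name, Runnable body) {
        this.body = body;
        // The task passes itself as the thread's Runnable.
        this.executingThread = new Thread(this, name);
    }

    void startTaskThread() {
        executingThread.start();
    }

    CompletableFuture<String> getTerminationFuture() {
        return terminationFuture;
    }

    @Override
    public void run() {
        try {
            executionState = "RUNNING";
            body.run();
            executionState = "FINISHED";
        } catch (RuntimeException e) {
            executionState = "FAILED";
        } finally {
            // Whatever happened, publish the final state exactly once.
            terminationFuture.complete(executionState);
        }
    }

    public static void main(String[] args) {
        TaskThreadSketch ok = new TaskThreadSketch("ok-task", () -> {});
        ok.startTaskThread();
        System.out.println(ok.getTerminationFuture().join()); // prints FINISHED

        TaskThreadSketch bad =
                new TaskThreadSketch(
                        "bad-task",
                        () -> {
                            throw new IllegalStateException("boom");
                        });
        bad.startTaskThread();
        System.out.println(bad.getTerminationFuture().join()); // prints FAILED
    }
}
```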

org.apache.flink.runtime.taskmanager.Task#doRun

    private void doRun() {

        // ... other parts omitted ...

            // Mainly sets up the BufferPools of the ResultPartitions and InputGates.
            setupPartitionsAndGates(partitionWriters, inputGates);

            for (ResultPartitionWriter partitionWriter : partitionWriters) {
                taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
            }

            
            // When constructing invokable, separate threads can be constructed and thus should be
            // monitored for system exit (in addition to invoking thread itself monitored below).
            FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
            try {
                // now load and instantiate the task's invokable code
                invokable =
                        // For a streaming job, the invokable created reflectively here is a
                        // StreamTask instance (an instance of one of its subclasses).
                        loadAndInstantiateInvokable(
                                userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
            } finally {
                FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
            }
            
            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // Restore state and invoke the task
            restoreAndInvoke(invokable);

            // ... other parts omitted ...
        
    }

org.apache.flink.runtime.taskmanager.Task#setupPartitionsAndGates

    public static void setupPartitionsAndGates(
            ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {

        for (ResultPartitionWriter partition : producedPartitions) {
            // This call creates the LocalBufferPool
            partition.setup();
        }

        // InputGates must be initialized after the partitions, since during InputGate#setup
        // we are requesting partitions
        for (InputGate gate : inputGates) {
            gate.setup();
        }
    }

org.apache.flink.runtime.io.network.partition.ResultPartition#setup

    public void setup() throws IOException {
        checkState(
                this.bufferPool == null,
                "Bug in result partition setup logic: Already registered buffer pool.");

        // Create the bufferPool. The supplier comes from
        // org.apache.flink.runtime.io.network.partition.ResultPartitionFactory#createBufferPoolFactory
        this.bufferPool = checkNotNull(bufferPoolFactory.get());
        // Subclass-specific setup
        setupInternal();
        // Register the result partition
        partitionManager.registerResultPartition(this);
    }

org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#setup

    public void setup() throws IOException {
        checkState(
                this.bufferPool == null,
                "Bug in input gate setup logic: Already registered buffer pool.");

        // Ends up calling
        // org.apache.flink.runtime.io.network.buffer.NetworkBufferPool#createBufferPool(int, int)
        BufferPool bufferPool = bufferPoolFactory.get();
        setBufferPool(bufferPool);
        if (tieredStorageConsumerClient != null) {
            tieredStorageConsumerClient.setup(bufferPool);
        }

        // Assign exclusive buffers directly to all remote input channels for credit-based mode.
        setupChannels();
    }

org.apache.flink.runtime.taskmanager.Task#loadAndInstantiateInvokable

    private static TaskInvokable loadAndInstantiateInvokable(
            ClassLoader classLoader, String className, Environment environment) throws Throwable {

        final Class<? extends TaskInvokable> invokableClass;
        try {
            invokableClass =
                    Class.forName(className, true, classLoader).asSubclass(TaskInvokable.class);
        } catch (Throwable t) {
            throw new Exception("Could not load the task's invokable class.", t);
        }

        Constructor<? extends TaskInvokable> statelessCtor;

        try {
            // look up the StreamTask constructor that takes a single Environment argument
            statelessCtor = invokableClass.getConstructor(Environment.class);
        } catch (NoSuchMethodException ee) {
            throw new FlinkException("Task misses proper constructor", ee);
        }

        // instantiate the class
        try {
            //noinspection ConstantConditions  --> cannot happen
            // create the StreamTask instance
            return statelessCtor.newInstance(environment);
        } catch (InvocationTargetException e) {
            // directly forward exceptions from the eager initialization
            throw e.getTargetException();
        } catch (Exception e) {
            throw new FlinkException("Could not instantiate the task's invokable class.", e);
        }
    }
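
The reflective pattern above can be tried in isolation. The following is a minimal sketch, not Flink code: `Env`, `Invokable`, `EchoTask` and `FailingTask` are hypothetical stand-ins for `Environment`, `TaskInvokable` and concrete task classes. It shows the same three steps: resolve the class by name, require a public single-argument constructor, and unwrap `InvocationTargetException` so the caller sees the constructor's own exception.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

class ReflectiveInvokableLoader {

    /** Hypothetical stand-in for Flink's Environment. */
    public static final class Env {
        public final String taskName;

        public Env(String taskName) {
            this.taskName = taskName;
        }
    }

    /** Hypothetical stand-in for Flink's TaskInvokable. */
    public interface Invokable {
        String describe();
    }

    /** A task with the expected public single-argument constructor. */
    public static final class EchoTask implements Invokable {
        private final Env env;

        public EchoTask(Env env) {
            this.env = env;
        }

        @Override
        public String describe() {
            return "EchoTask(" + env.taskName + ")";
        }
    }

    /** A task whose constructor fails eagerly. */
    public static final class FailingTask implements Invokable {
        public FailingTask(Env env) {
            throw new IllegalStateException("boom");
        }

        @Override
        public String describe() {
            return "unreachable";
        }
    }

    static Invokable load(ClassLoader classLoader, String className, Env env) throws Throwable {
        // Resolve the class by name and check that it really is an Invokable.
        Class<? extends Invokable> clazz =
                Class.forName(className, true, classLoader).asSubclass(Invokable.class);
        // Only a public constructor taking exactly one Env is accepted.
        Constructor<? extends Invokable> ctor = clazz.getConstructor(Env.class);
        try {
            return ctor.newInstance(env);
        } catch (InvocationTargetException e) {
            // Surface the constructor's own exception instead of the reflective wrapper.
            throw e.getTargetException();
        }
    }
}
```

Unwrapping the target exception matters because a task constructor may fail for domain reasons (bad configuration, missing state); callers such as the task's failure handling then see that cause directly.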

org.apache.flink.streaming.runtime.tasks.StreamTask#StreamTask(org.apache.flink.runtime.execution.Environment)

    protected StreamTask(Environment env) throws Exception {
        this(env, null);
    }

org.apache.flink.streaming.runtime.tasks.StreamTask#StreamTask(org.apache.flink.runtime.execution.Environment, org.apache.flink.streaming.runtime.tasks.TimerService, java.lang.Thread.UncaughtExceptionHandler, org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor, org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox)

    protected StreamTask(
            Environment environment,
            @Nullable TimerService timerService,
            Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
            StreamTaskActionExecutor actionExecutor,
            TaskMailbox mailbox)
            throws Exception {
        // The registration of all closeable resources. The order of registration is important.
        resourceCloser = new AutoCloseableRegistry();
        try {


            // ... (other initialization omitted)

            // Data processing, checkpoint execution, timer firing, etc. may all modify state
            // at the same time. Flink introduces the mailbox threading model to avoid
            // inconsistent state access.
            // MailboxProcessor pulls and runs the mails in the mailbox (checkpoint execution,
            // timer callbacks, ...), while its member MailboxDefaultAction mailboxDefaultAction
            // handles the regular stream input: events, barriers, watermarks, etc.
            // TaskMailboxImpl is the mailbox implementation that stores those mails, and
            // MailboxExecutorImpl is used to submit them.
            this.mailboxProcessor =
                    new MailboxProcessor(
                            // processInput() becomes the mailboxDefaultAction and
                            // handles regular data processing
                            this::processInput,
                            mailbox, actionExecutor, mailboxMetricsControl);

            // Should be closed last.
            resourceCloser.registerCloseable(mailboxProcessor);

            // entry point for creating the RecordWriter(s)
            this.recordWriter = createRecordWriterDelegate(configuration, environment);

            // create the StateBackend instance
            this.stateBackend = createStateBackend();

            // ... (remaining code omitted)
       
    }
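
To make the mailbox model concrete, here is a minimal single-threaded sketch. It is an illustration only, not Flink's `MailboxProcessor`: `MiniMailbox`, `submit`, `stop` and `demo` are hypothetical names. The loop first drains pending mails (standing in for checkpoint or timer actions) and then runs the default action once (standing in for `processInput()`), so both kinds of work touch state from the same thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class MiniMailbox {
    // Mails may be submitted from any thread; they only run on the loop thread.
    private final ConcurrentLinkedQueue<Runnable> mails = new ConcurrentLinkedQueue<>();
    private final Runnable defaultAction;
    private volatile boolean running = true;

    MiniMailbox(Runnable defaultAction) {
        this.defaultAction = defaultAction;
    }

    void submit(Runnable mail) {
        mails.add(mail);
    }

    void stop() {
        running = false;
    }

    void runLoop() {
        while (running) {
            // 1) Run every pending mail before touching more input.
            Runnable mail;
            while ((mail = mails.poll()) != null) {
                mail.run();
            }
            // 2) Then perform one step of the default action.
            if (running) {
                defaultAction.run();
            }
        }
    }

    /** Processes three records and enqueues a "checkpoint" mail while handling r2. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Iterator<String> input = Arrays.asList("r1", "r2", "r3").iterator();
        MiniMailbox[] box = new MiniMailbox[1];
        box[0] = new MiniMailbox(() -> {
            if (!input.hasNext()) {
                box[0].stop();
                return;
            }
            String record = input.next();
            log.add("process " + record);
            if (record.equals("r2")) {
                box[0].submit(() -> log.add("checkpoint"));
            }
        });
        box[0].runLoop();
        return log;
    }
}
```

In `demo()` the checkpoint mail runs between `r2` and `r3`, never in the middle of a record, which is the property that lets the task avoid locking around its state.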

org.apache.flink.streaming.runtime.tasks.StreamTask#createRecordWriterDelegate

    public static <OUT>
            RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>>
                    createRecordWriterDelegate(
                            StreamConfig configuration, Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites =
                createRecordWriters(configuration, environment);
        if (recordWrites.size() == 1) {
            return new SingleRecordWriter<>(recordWrites.get(0));
        } else if (recordWrites.size() == 0) {
            return new NonRecordWriter<>();
        } else {
            return new MultipleRecordWriters<>(recordWrites);
        }
    }

org.apache.flink.streaming.runtime.tasks.StreamTask#createRecordWriters

    private static <OUT>
            List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
                    StreamConfig configuration, Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
                new ArrayList<>();
        // first fetch the non-chained output edges of this task vertex
        List<NonChainedOutput> outputsInOrder =
                configuration.getVertexNonChainedOutputs(
                        environment.getUserCodeClassLoader().asClassLoader());

        int index = 0;
        // iterate over them and create one RecordWriter per non-chained output
        for (NonChainedOutput streamOutput : outputsInOrder) {
            replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
                    environment, streamOutput, index);
            recordWriters.add(
                    // create the writer for this output
                    createRecordWriter(
                            streamOutput,
                            index++,
                            environment,
                            environment.getTaskInfo().getTaskNameWithSubtasks(),
                            streamOutput.getBufferTimeout()));
        }
        return recordWriters;
    }

org.apache.flink.streaming.runtime.tasks.StreamTask#createRecordWriter

    private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
            NonChainedOutput streamOutput,
            int outputIndex,
            Environment environment,
            String taskNameWithSubtask,
            long bufferTimeout) {

        StreamPartitioner<OUT> outputPartitioner = null;

        // Clones the partition to avoid multiple stream edges sharing the same stream partitioner,
        // like the case of https://issues.apache.org/jira/browse/FLINK-14087.
        try {
            outputPartitioner =
                    InstantiationUtil.clone(
                            (StreamPartitioner<OUT>) streamOutput.getPartitioner(),
                            environment.getUserCodeClassLoader().asClassLoader());
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }

        LOG.debug(
                "Using partitioner {} for output {} of task {}",
                outputPartitioner,
                outputIndex,
                taskNameWithSubtask);

        // look up the matching writer in the task's ResultPartitionWriter[]
        ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);

        // we initialize the partitioner here with the number of key groups (aka max. parallelism)
        if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
            int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
            if (0 < numKeyGroups) {
                ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
            }
        }

        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
                new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
                        .setChannelSelector(outputPartitioner)
                        .setTimeout(bufferTimeout)
                        .setTaskName(taskNameWithSubtask)
                        // build the RecordWriter on top of the ResultPartitionWriter
                        .build(bufferWriter);
        output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return output;
    }
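
Why the partitioner needs the number of key groups can be illustrated with a small sketch. This is a simplification under stated assumptions: `KeyGroupChannelSelector` is a hypothetical class, and the key group is derived with a plain `floorMod` of `hashCode()`, whereas Flink additionally applies a murmur hash. The second step, mapping a key group to a channel as `keyGroup * numberOfChannels / numKeyGroups`, follows the proportional assignment Flink uses for key groups.

```java
class KeyGroupChannelSelector {
    private int numKeyGroups = 1;

    /** Mirrors the configure(numKeyGroups) call: key groups equal the max parallelism. */
    void configure(int numKeyGroups) {
        if (numKeyGroups <= 0) {
            throw new IllegalArgumentException("numKeyGroups must be positive");
        }
        this.numKeyGroups = numKeyGroups;
    }

    /** Step 1: key -> key group (simplified hash, an assumption of this sketch). */
    int keyGroupFor(Object key) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /** Step 2: key group -> output channel, i.e. the subpartition index. */
    int selectChannel(Object key, int numberOfChannels) {
        return keyGroupFor(key) * numberOfChannels / numKeyGroups;
    }
}
```

Because the key group depends only on the key and the fixed number of key groups, changing the number of channels (the downstream parallelism) moves whole key groups between channels without splitting any of them.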

org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend

    private StateBackend createStateBackend() throws Exception {
        // the StateBackend set by the application, if any; the state.backend.type
        // config option is consulted by the loader below
        final StateBackend fromApplication =
                configuration.getStateBackend(getUserCodeClassLoader());

        // the state backend loader: application-defined backend, else config, else default
        return StateBackendLoader.fromApplicationOrConfigOrDefault(
                fromApplication,
                getJobConfiguration(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getUserCodeClassLoader(),
                LOG);
    }

That covers the key steps of StreamTask construction.

 

StreamTask Startup Process

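Right after the StreamTask instance is created, its restore() method is invoked to prepare the task for startup.

The method has three main steps:

  • First, create the OperatorChain instance

  • Second, create the StreamInputProcessor instance (the stream input processor)

  • Third, initialize the operators.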

org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke

    private void restoreAndInvoke(TaskInvokable finalInvokable) throws Exception {
        try {
            // switch to the INITIALIZING state, if that fails, we have been canceled/failed in the
            // meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.INITIALIZING)) {
                throw new CancelTaskException();
            }

            taskManagerActions.updateTaskExecutionState(
                    new TaskExecutionState(executionId, ExecutionState.INITIALIZING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());

            // call restore()
            runWithSystemExitMonitoring(finalInvokable::restore);

            if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(
                    new TaskExecutionState(executionId, ExecutionState.RUNNING));

            // call invoke()
            runWithSystemExitMonitoring(finalInvokable::invoke);
        } catch (Throwable throwable) {
            try {
                runWithSystemExitMonitoring(() -> finalInvokable.cleanUp(throwable));
            } catch (Throwable cleanUpThrowable) {
                throwable.addSuppressed(cleanUpThrowable);
            }
            throw throwable;
        }
        runWithSystemExitMonitoring(() -> finalInvokable.cleanUp(null));
    }
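
The guarded transitions in restoreAndInvoke can be sketched with a compare-and-set on an atomic reference. This is a simplified illustration, not Flink's Task class: `TaskLifecycleSketch`, its reduced `State` enum and the `phases` list are hypothetical. It shows why each transition is checked: if a cancel changes the state while restore is running, the next transition fails and the cleanup path runs with the error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class TaskLifecycleSketch {
    enum State { DEPLOYING, INITIALIZING, RUNNING, CANCELING }

    final AtomicReference<State> state = new AtomicReference<>(State.DEPLOYING);
    final List<String> phases = new ArrayList<>();

    /** Succeeds only if nobody changed the state since we last looked. */
    boolean transition(State expected, State next) {
        return state.compareAndSet(expected, next);
    }

    /** May be called from another thread at any time. */
    void cancel() {
        state.set(State.CANCELING);
    }

    void restoreAndInvoke(Runnable restore, Runnable invoke) {
        try {
            if (!transition(State.DEPLOYING, State.INITIALIZING)) {
                throw new IllegalStateException("canceled before restore");
            }
            restore.run();
            phases.add("restore");

            if (!transition(State.INITIALIZING, State.RUNNING)) {
                throw new IllegalStateException("canceled during restore");
            }
            invoke.run();
            phases.add("invoke");
        } catch (RuntimeException e) {
            // Failure path: clean up with the error, then rethrow it.
            phases.add("cleanUp(error)");
            throw e;
        }
        // Success path: clean up without an error.
        phases.add("cleanUp(null)");
    }
}
```

A normal run records restore, invoke, cleanUp(null); a run whose restore step triggers cancel() records restore, cleanUp(error) and never reaches invoke.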

org.apache.flink.streaming.runtime.tasks.StreamTask#restore

    public final void restore() throws Exception {
        restoreInternal();
    }

org.apache.flink.streaming.runtime.tasks.StreamTask#restoreInternal

    void restoreInternal() throws Exception {
        // ... (preceding code omitted)

            operatorChain =
                    getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
                            ? new FinishedOperatorChain<>(this, recordWriter)
                            // create the OperatorChain; it is normally a RegularOperatorChain,
                            // so execution continues in the RegularOperatorChain constructor
                            : new RegularOperatorChain<>(this, recordWriter);
            mainOperator = operatorChain.getMainOperator();



            // task specific initialization
            // init() is implemented by subclasses; common StreamTask subclasses include
            // OneInputStreamTask and SourceStreamTask.
            init();




            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            CompletableFuture<Void> allGatesRecoveredFuture =
                    // an important step inside this call is initializing every operator in the chain
                    actionExecutor.call(() -> restoreStateAndGates(initializationMetrics));

            // Run mailbox until all gates will be recovered.
            // The loop drives mailboxDefaultAction, i.e. StreamTask.processInput(...).
            mailboxProcessor.runMailboxLoop();

            // ... (remaining code omitted)
    }

 

Creating the OperatorChain Instance

org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#RegularOperatorChain

    public RegularOperatorChain(
            StreamTask<OUT, OP> containingTask,
            RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
        super(containingTask, recordWriterDelegate);
    }

org.apache.flink.streaming.runtime.tasks.OperatorChain#OperatorChain

    public OperatorChain(
            StreamTask<OUT, OP> containingTask,
            RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {

        this.operatorEventDispatcher =
                new OperatorEventDispatcherImpl(
                        containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
                        containingTask.getEnvironment().getOperatorCoordinatorEventGateway());

        final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        final StreamConfig configuration = containingTask.getConfiguration();

        // the operator factory of the main operator
        StreamOperatorFactory<OUT> operatorFactory =
                configuration.getStreamOperatorFactory(userCodeClassloader);

        // we read the chained configs, and the order of record writer registrations by output name
        // (chainedConfigs holds the configs of all operators chained into this task)
        Map<Integer, StreamConfig> chainedConfigs =
                configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

        // create the final output stream writers
        // we iterate through all the out edges from this job vertex and create a stream output
        List<NonChainedOutput> outputsInOrder =
                configuration.getVertexNonChainedOutputs(userCodeClassloader);
        Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs =
                CollectionUtil.newHashMapWithExpectedSize(outputsInOrder.size());
        this.streamOutputs = new RecordWriterOutput<?>[outputsInOrder.size()];
        this.finishedOnRestoreInput =
                this.isTaskDeployedAsFinished()
                        ? new FinishedOnRestoreInput(
                                streamOutputs, configuration.getInputs(userCodeClassloader).length)
                        : null;

        // from here on, we need to make sure that the output writers are shut down again on failure
        boolean success = false;
        try {
            // create one RecordWriterOutput per non-chained output
            createChainOutputs(
                    outputsInOrder,
                    recordWriterDelegate,
                    chainedConfigs,
                    containingTask,
                    recordWriterOutputs);

            // we create the chain of operators and grab the collector that leads into the chain
            List<StreamOperatorWrapper<?, ?>> allOpWrappers =
                    new ArrayList<>(chainedConfigs.size());
            this.mainOperatorOutput =
                    // creates all operators of the OperatorChain together with their outputs
                    createOutputCollector(
                            containingTask,
                            configuration,
                            chainedConfigs,
                            userCodeClassloader,
                            recordWriterOutputs,
                            allOpWrappers,
                            containingTask.getMailboxExecutorFactory(),
                            operatorFactory != null);

            if (operatorFactory != null) {
                Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
                        StreamOperatorFactoryUtil.createOperator(
                                operatorFactory,
                                containingTask,
                                configuration,
                                mainOperatorOutput,
                                operatorEventDispatcher);

                OP mainOperator = mainOperatorAndTimeService.f0;
                mainOperator
                        .getMetricGroup()
                        .gauge(
                                MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                                mainOperatorOutput.getWatermarkGauge());
                this.mainOperatorWrapper =
                        createOperatorWrapper(
                                mainOperator,
                                containingTask,
                                configuration,
                                mainOperatorAndTimeService.f1,
                                true);

                // add main operator to end of chain
                allOpWrappers.add(mainOperatorWrapper);

                this.tailOperatorWrapper = allOpWrappers.get(0);
            } else {
                checkState(allOpWrappers.size() == 0);
                this.mainOperatorWrapper = null;
                this.tailOperatorWrapper = null;
            }

            this.chainedSources =
                    createChainedSources(
                            containingTask,
                            configuration.getInputs(userCodeClassloader),
                            chainedConfigs,
                            userCodeClassloader,
                            allOpWrappers);

            this.numOperators = allOpWrappers.size();

            firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);

            success = true;
        } finally {
            // make sure we clean up after ourselves in case of a failure after acquiring
            // the first resources
            if (!success) {
                for (int i = 0; i < streamOutputs.length; i++) {
                    if (streamOutputs[i] != null) {
                        streamOutputs[i].close();
                    }
                    streamOutputs[i] = null;
                }
            }
        }
    }

org.apache.flink.streaming.runtime.tasks.OperatorChain#createChainOutputs

    private void createChainOutputs(
            List<NonChainedOutput> outputsInOrder,
            RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
            Map<Integer, StreamConfig> chainedConfigs,
            StreamTask<OUT, OP> containingTask,
            Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs) {
        for (int i = 0; i < outputsInOrder.size(); ++i) {
            NonChainedOutput output = outputsInOrder.get(i);

            RecordWriterOutput<?> recordWriterOutput =
                    // create the stream output for this edge
                    createStreamOutput(
                            recordWriterDelegate.getRecordWriter(i),
                            output,
                            chainedConfigs.get(output.getSourceNodeId()),
                            containingTask.getEnvironment());

            this.streamOutputs[i] = recordWriterOutput;
            recordWriterOutputs.put(output.getDataSetId(), recordWriterOutput);
        }
    }

org.apache.flink.streaming.runtime.tasks.OperatorChain#createStreamOutput

    private RecordWriterOutput<OUT> createStreamOutput(
            RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
            NonChainedOutput streamOutput,
            StreamConfig upStreamConfig,
            Environment taskEnvironment) {
        // OutputTag, return null if not sideOutput
        OutputTag sideOutputTag = streamOutput.getOutputTag();

        TypeSerializer outSerializer;

        if (streamOutput.getOutputTag() != null) {
            // side output
            outSerializer =
                    upStreamConfig.getTypeSerializerSideOut(
                            streamOutput.getOutputTag(),
                            taskEnvironment.getUserCodeClassLoader().asClassLoader());
        } else {
            // main output
            outSerializer =
                    upStreamConfig.getTypeSerializerOut(
                            taskEnvironment.getUserCodeClassLoader().asClassLoader());
        }

        return closer.register(
                new RecordWriterOutput<OUT>(
                        recordWriter,
                        outSerializer,
                        sideOutputTag,
                        streamOutput.supportsUnalignedCheckpoints()));
    }

org.apache.flink.streaming.runtime.tasks.OperatorChain#createOutputCollector

    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
            List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
            MailboxExecutorFactory mailboxExecutorFactory,
            boolean shouldAddMetric) {
        List<OutputWithChainingCheck<StreamRecord<T>>> allOutputs = new ArrayList<>(4);

        // create collectors for the network outputs
        for (NonChainedOutput streamOutput :
                operatorConfig.getOperatorNonChainedOutputs(userCodeClassloader)) {
            @SuppressWarnings("unchecked")
            RecordWriterOutput<T> recordWriterOutput =
                    (RecordWriterOutput<T>) recordWriterOutputs.get(streamOutput.getDataSetId());

            allOutputs.add(recordWriterOutput);
        }

        // Create collectors for the chained outputs
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

            WatermarkGaugeExposingOutput<StreamRecord<T>> output =
                    // created recursively; operators are built tail to head
                    createOperatorChain(
                            containingTask,
                            operatorConfig,
                            chainedOpConfig,
                            chainedConfigs,
                            userCodeClassloader,
                            recordWriterOutputs,
                            allOperatorWrappers,
                            outputEdge.getOutputTag(),
                            mailboxExecutorFactory,
                            shouldAddMetric);
            checkState(output instanceof OutputWithChainingCheck);
            allOutputs.add((OutputWithChainingCheck) output);
            // If the operator has multiple downstream chained operators, only one of them should
            // increment the recordsOutCounter for this operator. Set shouldAddMetric to false
            // so that we would skip adding the counter to other downstream operators.
            shouldAddMetric = false;
        }

        WatermarkGaugeExposingOutput<StreamRecord<T>> result;

        if (allOutputs.size() == 1) {
            result = allOutputs.get(0);
            // only if this is a single RecordWriterOutput, reuse its numRecordOut for task.
            if (result instanceof RecordWriterOutput) {
                Counter numRecordsOutCounter = createNumRecordsOutCounter(containingTask);
                ((RecordWriterOutput<T>) result).setNumRecordsOut(numRecordsOutCounter);
            }
        } else {
            // send to N outputs. Note that this includes the special case
            // of sending to zero outputs
            @SuppressWarnings({"unchecked"})
            OutputWithChainingCheck<StreamRecord<T>>[] allOutputsArray =
                    new OutputWithChainingCheck[allOutputs.size()];
            for (int i = 0; i < allOutputs.size(); i++) {
                allOutputsArray[i] = allOutputs.get(i);
            }

            // This is the inverse of creating the normal ChainingOutput.
            // If the chaining output does not copy we need to copy in the broadcast output,
            // otherwise multi-chaining would not work correctly.
            Counter numRecordsOutForTask = createNumRecordsOutCounter(containingTask);
            if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                result =
                        closer.register(
                                new CopyingBroadcastingOutputCollector<>(
                                        allOutputsArray, numRecordsOutForTask));
            } else {
                result =
                        closer.register(
                                new BroadcastingOutputCollector<>(
                                        allOutputsArray, numRecordsOutForTask));
            }
        }

        if (shouldAddMetric) {
            // Create a CountingOutput to increment the recordsOutCounter for this operator
            // if we have not added the counter to any downstream chained operator.
            Counter recordsOutCounter =
                    getOperatorRecordsOutCounter(containingTask, operatorConfig);
            if (recordsOutCounter != null) {
                result = new CountingOutput<>(result, recordsOutCounter);
            }
        }
        return result;
    }

org.apache.flink.streaming.runtime.tasks.OperatorChain#createOperatorChain

    // Recursively creates the operator chain starting from the given operatorConfig.
    // Operators are created tail to head and wrapped into a WatermarkGaugeExposingOutput.
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(
            StreamTask<OUT, ?> containingTask,
            StreamConfig prevOperatorConfig,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
            List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
            OutputTag<IN> outputTag,
            MailboxExecutorFactory mailboxExecutorFactory,
            boolean shouldAddMetricForPrevOperator) {
        // create the output that the operator writes to first. this may recursively create more
        // operators
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput =
                createOutputCollector(
                        containingTask,
                        operatorConfig,
                        chainedConfigs,
                        userCodeClassloader,
                        recordWriterOutputs,
                        allOperatorWrappers,
                        mailboxExecutorFactory,
                        true);

        OneInputStreamOperator<IN, OUT> chainedOperator =
                createOperator(
                        containingTask,
                        operatorConfig,
                        userCodeClassloader,
                        chainedOperatorOutput,
                        allOperatorWrappers,
                        false);

        return wrapOperatorIntoOutput(
                chainedOperator,
                containingTask,
                prevOperatorConfig,
                operatorConfig,
                userCodeClassloader,
                outputTag,
                shouldAddMetricForPrevOperator);
    }

org.apache.flink.streaming.runtime.tasks.OperatorChain#createOperator

    // Creates and returns a single operator from the given operatorConfig;
    // the operator emits its records to the given output.
    private <OUT, OP extends StreamOperator<OUT>> OP createOperator(
            StreamTask<OUT, ?> containingTask,
            StreamConfig operatorConfig,
            ClassLoader userCodeClassloader,
            WatermarkGaugeExposingOutput<StreamRecord<OUT>> output,
            List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
            boolean isHead) {

        // now create the operator and give it the output collector to write its output to
        Tuple2<OP, Optional<ProcessingTimeService>> chainedOperatorAndTimeService =
                StreamOperatorFactoryUtil.createOperator(
                        operatorConfig.getStreamOperatorFactory(userCodeClassloader),
                        containingTask,
                        operatorConfig,
                        output,
                        operatorEventDispatcher);

        OP chainedOperator = chainedOperatorAndTimeService.f0;
        allOperatorWrappers.add(
                createOperatorWrapper(
                        chainedOperator,
                        containingTask,
                        operatorConfig,
                        chainedOperatorAndTimeService.f1,
                        isHead));

        chainedOperator
                .getMetricGroup()
                .gauge(
                        MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                        output.getWatermarkGauge()::getValue);
        return chainedOperator;
    }
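
The tail-to-head construction order follows from the mutual recursion between createOutputCollector and createOperatorChain: an operator can only be created once the output it writes to exists. A minimal sketch, with operators reduced to integer node ids and `ChainBuildOrder` as a hypothetical class name, reproduces the order in which wrappers land in the list:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class ChainBuildOrder {

    /** Returns node ids in the order their operators would be created. */
    static List<Integer> build(int headId, Map<Integer, List<Integer>> chainedOutputs) {
        List<Integer> wrappers = new ArrayList<>();
        // The head's output is created first, which recursively creates the whole chain.
        createOutputCollector(headId, chainedOutputs, wrappers);
        // The main (head) operator is created last and appended at the end.
        wrappers.add(headId);
        return wrappers;
    }

    private static void createOutputCollector(
            int nodeId, Map<Integer, List<Integer>> chainedOutputs, List<Integer> wrappers) {
        for (int downstream : chainedOutputs.getOrDefault(nodeId, Collections.emptyList())) {
            // Like createOperatorChain: build the downstream operator's own output first...
            createOutputCollector(downstream, chainedOutputs, wrappers);
            // ...then the downstream operator itself, which writes into that output.
            wrappers.add(downstream);
        }
    }
}
```

For a linear chain 1 -> 2 -> 3 this yields [3, 2, 1], which matches the constructor above taking `allOpWrappers.get(0)` as the tail operator and adding the main operator at the end of the list.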

 

StreamTask Initialization

Once the OperatorChain instance has been created, init() is called next. The method is implemented by subclasses; common StreamTask subclasses include OneInputStreamTask and SourceStreamTask.

 

(1) OneInputStreamTask's init() mainly creates the StreamInputProcessor instance, the core data processor.

org.apache.flink.streaming.runtime.tasks.OneInputStreamTask#init

    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();
        int numberOfInputs = configuration.getNumberOfNetworkInputs();

        if (numberOfInputs > 0) {
            CheckpointedInputGate inputGate = createCheckpointedInputGate();
            Counter numRecordsIn = setupNumRecordsInCounter(mainOperator);
            DataOutput<IN> output = createDataOutput(numRecordsIn);
            StreamTaskInput<IN> input = createTaskInput(inputGate);

            StreamConfig.InputConfig[] inputConfigs =
                    configuration.getInputs(getUserCodeClassLoader());
            StreamConfig.InputConfig inputConfig = inputConfigs[0];
            if (requiresSorting(inputConfig)) {
                checkState(
                        !configuration.isCheckpointingEnabled(),
                        "Checkpointing is not allowed with sorted inputs.");
                input = wrapWithSorted(input);
            }

            getEnvironment()
                    .getMetricGroup()
                    .getIOMetricGroup()
                    .reuseRecordsInputCounter(numRecordsIn);
            
            // create the StreamInputProcessor instance, the core data processor
            inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain);
        }
        mainOperator
                .getMetricGroup()
                .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge);
        // wrap watermark gauge since registered metrics must be unique
        getEnvironment()
                .getMetricGroup()
                .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge::getValue);
    }
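
How the input processor ties an input to an output can be sketched as follows. This is a simplified model, not Flink's API: `TaskInput`, `DataOutput`, `QueueInput` and `OneInputProcessor` are hypothetical stand-ins for StreamTaskInput, the DataOutput implementation and StreamOneInputProcessor, and `emitNext` returns a plain boolean where Flink returns a status value. The input pulls one record and pushes it into the output, which hands it to the operator.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class InputProcessorSketch {

    /** Receives records read from the input and hands them to the operator. */
    interface DataOutput<T> {
        void emitRecord(T record);
    }

    /** Reads one record at a time; returns true while more records are available. */
    interface TaskInput<T> {
        boolean emitNext(DataOutput<T> output);
    }

    /** An input backed by an in-memory queue instead of an InputGate. */
    static final class QueueInput<T> implements TaskInput<T> {
        private final Queue<T> queue;

        QueueInput(List<T> records) {
            this.queue = new ArrayDeque<>(records);
        }

        @Override
        public boolean emitNext(DataOutput<T> output) {
            T record = queue.poll();
            if (record == null) {
                return false;
            }
            output.emitRecord(record);
            return !queue.isEmpty();
        }
    }

    /** Couples one input with one output, one record per processInput() call. */
    static final class OneInputProcessor<T> {
        private final TaskInput<T> input;
        private final DataOutput<T> output;

        OneInputProcessor(TaskInput<T> input, DataOutput<T> output) {
            this.input = input;
            this.output = output;
        }

        boolean processInput() {
            return input.emitNext(output);
        }
    }

    /** The "operator" here doubles each record and collects the result. */
    static List<Integer> demo(List<Integer> records) {
        List<Integer> downstream = new ArrayList<>();
        DataOutput<Integer> output = record -> downstream.add(record * 2);
        OneInputProcessor<Integer> processor =
                new OneInputProcessor<>(new QueueInput<>(records), output);
        while (processor.processInput()) {
            // keep processing while the input reports more records
        }
        return downstream;
    }
}
```

Handling a single record per call is what allows a mailbox-style loop to interleave other actions between records.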

 

(2) SourceStreamTask's init() mainly checks whether the SourceFunction is an ExternallyInducedSource; if it is, the task sets up the checkpoint-related hooks for that source, such as the checkpoint trigger.

org.apache.flink.streaming.runtime.tasks.SourceStreamTask#init

    protected void init() {
        // we check if the source is actually inducing the checkpoints, rather
        // than the trigger
        SourceFunction<?> source = mainOperator.getUserFunction();
        if (source instanceof ExternallyInducedSource) {
            externallyInducedCheckpoints = true;

            ExternallyInducedSource.CheckpointTrigger triggerHook =
                    new ExternallyInducedSource.CheckpointTrigger() {

                        @Override
                        public void triggerCheckpoint(long checkpointId) throws FlinkException {
                            // TODO - we need to see how to derive those. We should probably not
                            // encode this in the
                            // TODO -   source's trigger message, but do a handshake in this task
                            // between the trigger
                            // TODO -   message from the master, and the source's trigger
                            // notification
                            final CheckpointOptions checkpointOptions =
                                    CheckpointOptions.forConfig(
                                            CheckpointType.CHECKPOINT,
                                            CheckpointStorageLocationReference.getDefault(),
                                            configuration.isExactlyOnceCheckpointMode(),
                                            configuration.isUnalignedCheckpointsEnabled(),
                                            configuration.getAlignedCheckpointTimeout().toMillis());
                            final long timestamp = System.currentTimeMillis();

                            final CheckpointMetaData checkpointMetaData =
                                    new CheckpointMetaData(checkpointId, timestamp, timestamp);

                            try {
                                SourceStreamTask.super
                                        .triggerCheckpointAsync(
                                                checkpointMetaData, checkpointOptions)
                                        .get();
                            } catch (RuntimeException e) {
                                throw e;
                            } catch (Exception e) {
                                throw new FlinkException(e.getMessage(), e);
                            }
                        }
                    };

            ((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
        }
        getEnvironment()
                .getMetricGroup()
                .getIOMetricGroup()
                .gauge(
                        MetricNames.CHECKPOINT_START_DELAY_TIME,
                        this::getAsyncCheckpointStartDelayNanos);
        recordWriter.setMaxOverdraftBuffersPerGate(0);
    }
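
The hook mechanism in init() can be illustrated with a minimal sketch. It assumes simplified stand-in interfaces (CheckpointTriggerSketch, ExternallyInducedSourceSketch, SourceTaskSketch are hypothetical names, not Flink classes) and only models the idea that the task hands the source a callback, and the source calls it when it decides a checkpoint should happen.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a checkpoint trigger callback; NOT Flink's interface. */
interface CheckpointTriggerSketch {
    void triggerCheckpoint(long checkpointId);
}

/** Hypothetical stand-in for a source that induces checkpoints itself. */
interface ExternallyInducedSourceSketch {
    void setCheckpointTrigger(CheckpointTriggerSketch trigger);
}

final class InducingSource implements ExternallyInducedSourceSketch {
    private CheckpointTriggerSketch trigger;

    @Override
    public void setCheckpointTrigger(CheckpointTriggerSketch trigger) {
        this.trigger = trigger;
    }

    /** Called by the source when it finds a checkpoint marker in its own data. */
    void onExternalMarker(long checkpointId) {
        trigger.triggerCheckpoint(checkpointId);
    }
}

final class SourceTaskSketch {
    final List<Long> performedCheckpoints = new ArrayList<>();
    boolean externallyInducedCheckpoints;

    /** Mirrors the shape of init(): install the hook only for inducing sources. */
    void init(Object source) {
        if (source instanceof ExternallyInducedSourceSketch) {
            externallyInducedCheckpoints = true;
            // The real task would start an asynchronous checkpoint here;
            // the sketch only records the id.
            ((ExternallyInducedSourceSketch) source)
                    .setCheckpointTrigger(id -> performedCheckpoints.add(id));
        }
    }
}
```

In this model a plain source (any object that does not implement the stand-in interface) leaves externallyInducedCheckpoints false, so checkpoints would keep being driven from outside the source.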

 

Operator initialization

Next, the restoreStateAndGates() method is called. One important step in this method is initializing every operator in the operator chain.

org.apache.flink.streaming.runtime.tasks.StreamTask#restoreStateAndGates

    private CompletableFuture<Void> restoreStateAndGates(
            SubTaskInitializationMetricsBuilder initializationMetrics) throws Exception {

        long mailboxStartTs = SystemClock.getInstance().absoluteTimeMillis();
        initializationMetrics.addDurationMetric(
                MAILBOX_START_DURATION,
                mailboxStartTs - initializationMetrics.getInitializationStartTs());

        SequentialChannelStateReader reader =
                getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
        reader.readOutputData(
                getEnvironment().getAllWriters(), !configuration.isGraphContainingLoops());

        long readOutputDataTs = SystemClock.getInstance().absoluteTimeMillis();
        initializationMetrics.addDurationMetric(
                READ_OUTPUT_DATA_DURATION, readOutputDataTs - mailboxStartTs);

        // Call initializeStateAndOpenOperators(...) on the operator chain instance.
        operatorChain.initializeStateAndOpenOperators(
                createStreamTaskStateInitializer(initializationMetrics));

        initializeStateEndTs = SystemClock.getInstance().absoluteTimeMillis();
        initializationMetrics.addDurationMetric(
                INITIALIZE_STATE_DURATION, initializeStateEndTs - readOutputDataTs);
        IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();

        channelIOExecutor.execute(
                () -> {
                    try {
                        // Read the input data (recovered channel state) into the input gates.
                        reader.readInputData(inputGates);
                    } catch (Exception e) {
                        asyncExceptionHandler.handleAsyncException(
                                "Unable to read channel state", e);
                    }
                });

        // We wait for all input channel state to recover before we go into RUNNING state, and thus
        // start checkpointing. If we implement incremental checkpointing of input channel state
        // we must make sure it supports CheckpointType#FULL_CHECKPOINT
        List<CompletableFuture<?>> recoveredFutures = new ArrayList<>(inputGates.length);
        for (InputGate inputGate : inputGates) {
            recoveredFutures.add(inputGate.getStateConsumedFuture());

            inputGate
                    .getStateConsumedFuture()
                    .thenRun(
                            () ->
                                    mainMailboxExecutor.execute(
                                            // Request the partitions, i.e. start requesting data from upstream.
                                            inputGate::requestPartitions,
                                            "Input gate request partitions"));
        }

        return CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]))
                .thenRun(mailboxProcessor::suspend);
    }
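
The future handling at the end of restoreStateAndGates() can be sketched in plain Java. This is a simplified model under stated assumptions: GateSketch and RestoreSketch are hypothetical names, and the partition request runs inline here, whereas the code above submits it to the mailbox executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an input gate; NOT Flink's InputGate. */
final class GateSketch {
    final CompletableFuture<Void> stateConsumed = new CompletableFuture<>();
    boolean partitionsRequested;

    void requestPartitions() {
        partitionsRequested = true;
    }
}

final class RestoreSketch {
    /** Returns a future that completes once every gate has consumed its recovered state. */
    static CompletableFuture<Void> restore(List<GateSketch> gates, Runnable onAllRecovered) {
        List<CompletableFuture<?>> recovered = new ArrayList<>(gates.size());
        for (GateSketch gate : gates) {
            recovered.add(gate.stateConsumed);
            // Each gate asks upstream for data as soon as its own state is consumed.
            gate.stateConsumed.thenRun(gate::requestPartitions);
        }
        // The combined future fires only after all per-gate futures are done.
        return CompletableFuture.allOf(recovered.toArray(new CompletableFuture[0]))
                .thenRun(onAllRecovered);
    }
}
```

The point of the two levels is that each gate can start requesting partitions independently, while the action attached to the combined future (suspending the mailbox loop in the code above) waits for the slowest gate.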

org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#initializeStateAndOpenOperators

    public void initializeStateAndOpenOperators(
            StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
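            // Provides the context for initializing all state in the operator.
            // Mainly initializes the operator's operatorStateBackend, keyedStateBackend,
            // keyedStateStore and related members.
            operator.initializeState(streamTaskStateInitializer);
            // Called immediately before any elements are processed; it should contain the
            // operator's initialization logic.
            // Calls each operator's open() method, which in turn calls the UDF's open() method.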
            operator.open();
        }
    }

org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2#initializeState

    public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
            throws Exception {
        final TypeSerializer<?> keySerializer =
                config.getStateKeySerializer(getUserCodeClassloader());

        final StreamOperatorStateContext context =
                streamTaskStateManager.streamOperatorStateContext(
                        getOperatorID(),
                        getClass().getSimpleName(),
                        getProcessingTimeService(),
                        this,
                        keySerializer,
                        cancelables,
                        metrics,
                        config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.STATE_BACKEND,
                                runtimeContext.getJobConfiguration(),
                                runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                                runtimeContext.getUserCodeClassLoader()),
                        isUsingCustomRawKeyedState());

        stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
        timeServiceManager = context.internalTimerServiceManager();
        // Initialize the operator state.
        stateHandler.initializeOperatorState(this);

        if (useSplittableTimers()
                && areSplittableTimersConfigured()
                && getTimeServiceManager().isPresent()) {
            watermarkProcessor =
                    new MailboxWatermarkProcessor(
                            output, mailboxExecutor, getTimeServiceManager().get());
        }
    }

org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator#open

    public void open() throws Exception {
        super.open();
        // Call the open() method of the UDF.
        FunctionUtils.openFunction(userFunction, DefaultOpenContext.INSTANCE);
    }

org.apache.flink.api.common.functions.util.FunctionUtils#openFunction

    public static void openFunction(Function function, OpenContext openContext) throws Exception {
        if (function instanceof RichFunction) {
            RichFunction richFunction = (RichFunction) function;
            richFunction.open(openContext);
        }
    }
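
The call order described above (initialize state, then open the operator, then open the user function only if it is a rich function) can be sketched as follows. All types are hypothetical stand-ins, not Flink's Function, RichFunction, or StreamOperator, and state initialization is reduced to a log entry.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a user function; NOT Flink's Function. */
interface FunctionSketch {}

/** Hypothetical stand-in for a function with lifecycle hooks. */
interface RichFunctionSketch extends FunctionSketch {
    void open();
}

final class OperatorSketch {
    private final String name;
    private final FunctionSketch userFunction;
    private final List<String> log;

    OperatorSketch(String name, FunctionSketch userFunction, List<String> log) {
        this.name = name;
        this.userFunction = userFunction;
        this.log = log;
    }

    void initializeState() {
        log.add(name + ".initializeState");
    }

    void open() {
        log.add(name + ".open");
        // Only rich functions expose an open() hook; plain functions are skipped.
        if (userFunction instanceof RichFunctionSketch) {
            ((RichFunctionSketch) userFunction).open();
        }
    }
}

final class ChainSketch {
    /** State is initialized before open() for each operator, one operator at a time. */
    static void initializeStateAndOpenOperators(List<OperatorSketch> operators) {
        for (OperatorSketch operator : operators) {
            operator.initializeState();
            operator.open();
        }
    }
}
```

Because initializeState() always runs before open() for the same operator, a user function's open() can already rely on state being available, which is the ordering the real code above also follows.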

That concludes the preparation work that precedes the startup of a StreamTask.

 

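Overview of how a Task reads data

After the subclass's init() method has been called, the line mailboxProcessor.runMailboxLoop(); is executed. As the runMailboxLoop() method shows, its main job is to trigger the default action mailboxDefaultAction, that is, the StreamTask.processInput(...) method.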

org.apache.flink.streaming.runtime.tasks.StreamTask#processInput

    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        // Process the input.
        DataInputStatus status = inputProcessor.processInput();
        switch (status) {
            case MORE_AVAILABLE:
                if (taskIsAvailable()) {
                    return;
                }
                break;
            case NOTHING_AVAILABLE:
                break;
            case END_OF_RECOVERY:
                throw new IllegalStateException("We should not receive this event here.");
            case STOPPED:
                endData(StopMode.NO_DRAIN);
                return;
            case END_OF_DATA:
                endData(StopMode.DRAIN);
                notifyEndOfData();
                return;
            case END_OF_INPUT:
                // Suspend the mailbox processor, it would be resumed in afterInvoke and finished
                // after all records processed by the downstream tasks. We also suspend the default
                // actions to avoid repeat executing the empty default operation (namely process
                // records).
                controller.suspendDefaultAction();
                mailboxProcessor.suspend();
                return;
        }

        TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
        PeriodTimer timer;
        CompletableFuture<?> resumeFuture;
        if (!recordWriter.isAvailable()) {
            timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
            resumeFuture = recordWriter.getAvailableFuture();
        } else if (!inputProcessor.isAvailable()) {
            timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
            resumeFuture = inputProcessor.getAvailableFuture();
        } else if (changelogWriterAvailabilityProvider != null
                && !changelogWriterAvailabilityProvider.isAvailable()) {
            // waiting for changelog availability is reported as busy
            timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
            resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
        } else {
            // data availability has changed in the meantime; retry immediately
            return;
        }
        assertNoException(
                resumeFuture.thenRun(
                        new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
    }
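
The suspend and resume logic of processInput(...) can be modelled with a small single-threaded sketch. It rests on several assumptions: MailboxSketch and ProcessInputDemo are hypothetical names, the "<end>" string stands in for the END_OF_INPUT status, and the loop simply returns when nothing can make progress, whereas a real mailbox would block waiting for mail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical single-threaded stand-in; NOT Flink's MailboxProcessor. */
final class MailboxSketch {
    interface DefaultAction {
        void run(MailboxSketch controller);
    }

    private final Queue<Runnable> mails = new ArrayDeque<>();
    private final DefaultAction defaultAction;
    private boolean defaultActionSuspended;
    private boolean loopSuspended;

    MailboxSketch(DefaultAction defaultAction) {
        this.defaultAction = defaultAction;
    }

    void execute(Runnable mail) { mails.add(mail); }
    void suspendDefaultAction() { defaultActionSuspended = true; }
    void resumeDefaultAction() { defaultActionSuspended = false; }
    void suspend() { loopSuspended = true; }

    void runMailboxLoop() {
        while (!loopSuspended) {
            Runnable mail;
            while ((mail = mails.poll()) != null) {
                mail.run(); // mails take priority over the default action
            }
            if (loopSuspended || defaultActionSuspended) {
                return; // sketch only: a real mailbox would block here
            }
            defaultAction.run(this);
        }
    }
}

final class ProcessInputDemo {
    static List<String> run() {
        List<String> processed = new ArrayList<>();
        Queue<String> input = new ArrayDeque<>(List.of("a", "b"));
        CompletableFuture<Void> moreInput = new CompletableFuture<>();

        MailboxSketch mailbox = new MailboxSketch(controller -> {
            String record = input.poll();
            if (record == null) {
                // Nothing available: pause until the availability future completes.
                controller.suspendDefaultAction();
                moreInput.thenRun(() -> controller.execute(controller::resumeDefaultAction));
            } else if (record.equals("<end>")) {
                // End of input: stop the default action and the loop.
                controller.suspendDefaultAction();
                controller.suspend();
            } else {
                processed.add(record);
            }
        });

        mailbox.runMailboxLoop(); // processes "a" and "b", then idles
        input.add("c");
        input.add("<end>");
        moreInput.complete(null); // enqueues the resume mail
        mailbox.runMailboxLoop(); // resumes, processes "c", then stops
        return processed;
    }
}
```

The design choice worth noticing is that resuming is itself delivered as a mail, so the decision to run the default action again is always taken on the loop's own thread.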

 

The processInput(...) method of the subclass SourceStreamTask runs as follows:

org.apache.flink.streaming.runtime.tasks.SourceStreamTask#processInput

    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {

        controller.suspendDefaultAction();

        // Against the usual contract of this method, this implementation is not step-wise but
        // blocking instead for
        // compatibility reasons with the current source interface (source functions run as a loop,
        // not in steps).
        sourceThread.setTaskDescription(getName());


        // Start the source thread, which invokes its run() method.
        sourceThread.start();

        sourceThread
                .getCompletionFuture()
                .whenComplete(
                        (Void ignore, Throwable sourceThreadThrowable) -> {
                            if (sourceThreadThrowable != null) {
                                mailboxProcessor.reportThrowable(sourceThreadThrowable);
                            } else {
                                notifyEndOfData();
                                mailboxProcessor.suspend();
                            }
                        });
    }

org.apache.flink.streaming.runtime.tasks.SourceStreamTask.LegacySourceFunctionThread#run

        @Override
        public void run() {
            try {
                if (!operatorChain.isTaskDeployedAsFinished()) {
                    LOG.debug(
                            "Legacy source {} skip execution since the task is finished on restore",
                            getTaskNameWithSubtaskAndId());
                    mainOperator.run(lock, operatorChain);
                }
                completeProcessing();
                completionFuture.complete(null);
            } catch (Throwable t) {
                // Note, t can be also an InterruptedException
                if (isCanceled()
                        && ExceptionUtils.findThrowable(t, InterruptedException.class)
                                .isPresent()) {
                    completionFuture.completeExceptionally(new CancelTaskException(t));
                } else {
                    completionFuture.completeExceptionally(t);
                }
            }
        }
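
The cooperation between the task and the legacy source thread boils down to a thread that reports its outcome through a CompletableFuture. The following is a minimal sketch of that pattern only; SourceThreadSketch and SourceTaskDemo are hypothetical names, and cancellation handling is left out.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the source thread; NOT Flink's LegacySourceFunctionThread. */
final class SourceThreadSketch extends Thread {
    private final Runnable sourceLoop;
    private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

    SourceThreadSketch(Runnable sourceLoop) {
        this.sourceLoop = sourceLoop;
    }

    @Override
    public void run() {
        try {
            sourceLoop.run(); // blocks until the source finishes
            completionFuture.complete(null);
        } catch (Throwable t) {
            completionFuture.completeExceptionally(t);
        }
    }

    CompletableFuture<Void> getCompletionFuture() {
        return completionFuture;
    }
}

final class SourceTaskDemo {
    /** Returns "finished" on success, or "failed: " plus the failure message. */
    static String runSource(Runnable sourceLoop) {
        SourceThreadSketch thread = new SourceThreadSketch(sourceLoop);
        CompletableFuture<String> outcome =
                thread.getCompletionFuture()
                        .handle((ignored, failure) ->
                                failure == null
                                        ? "finished"
                                        : "failed: " + failure.getMessage());
        thread.start();
        return outcome.join(); // the sketch waits; the real task keeps serving its mailbox
    }
}
```

Unlike this sketch, the task shown above does not wait on the future. It registers a callback with whenComplete(...) and returns, so the mailbox thread stays free to process mails such as checkpoint triggers while the source loop runs.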

org.apache.flink.streaming.api.operators.StreamSource#run

    public void run(final Object lockingObject, final OperatorChain<?, ?> operatorChain)
            throws Exception {

        run(lockingObject, output, operatorChain);
    }

org.apache.flink.streaming.api.operators.StreamSource#run

    public void run(
            final Object lockingObject,
            final Output<StreamRecord<OUT>> collector,
            final OperatorChain<?, ?> operatorChain)
            throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        final Configuration configuration =
                this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        // Latency tracking interval.
        final long latencyTrackingInterval =
                getExecutionConfig().isLatencyTrackingConfigured()
                        ? getExecutionConfig().getLatencyTrackingInterval()
                        : configuration.get(MetricOptions.LATENCY_INTERVAL).toMillis();

        LatencyMarkerEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter =
                    new LatencyMarkerEmitter<>(
                            getProcessingTimeService(),
                            collector::emitLatencyMarker,
                            latencyTrackingInterval,
                            this.getOperatorID(),
                            getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
        }

        // Watermark interval.
        final long watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        // Return the appropriate SourceFunction.SourceContext for the time characteristic.
        this.ctx =
                StreamSourceContexts.getSourceContext(
                        timeCharacteristic,
                        getProcessingTimeService(),
                        lockingObject,
                        collector,
                        watermarkInterval,
                        -1,
                        emitProgressiveWatermarks);

        try {
            // Start the source. Implementations use the SourceFunction.SourceContext to emit elements.
            userFunction.run(ctx);
        } finally {
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }
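
The last step, handing a context to the user function and cleaning up in a finally block, can be sketched as below. The interfaces are hypothetical stand-ins rather than Flink's SourceFunction and SourceContext, and the assumption that every emitted element is added under the locking object is a simplification made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a source context; NOT Flink's SourceContext. */
interface SourceContextSketch<T> {
    void collect(T element);

    Object getCheckpointLock();
}

/** Hypothetical stand-in for a user-defined source function. */
interface SourceFunctionSketch<T> {
    void run(SourceContextSketch<T> ctx);
}

final class StreamSourceSketch<T> {
    final List<T> emitted = new ArrayList<>();
    boolean cleanedUp;

    void run(Object lockingObject, SourceFunctionSketch<T> userFunction) {
        SourceContextSketch<T> ctx = new SourceContextSketch<T>() {
            @Override
            public void collect(T element) {
                // Assumption: emission happens under the lock, so that it cannot
                // interleave with other work that takes the same lock.
                synchronized (lockingObject) {
                    emitted.add(element);
                }
            }

            @Override
            public Object getCheckpointLock() {
                return lockingObject;
            }
        };
        try {
            userFunction.run(ctx); // the user function loops and emits through ctx
        } finally {
            cleanedUp = true; // stands in for closing the latency emitter
        }
    }
}
```

A user function in this model is just a loop calling ctx.collect(...); the finally block runs whether that loop returns normally or throws.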

The above is an overview of how a Task reads data.

 

References:

Flink源码解析(十六)——Flink Task启动过程解析

