关于Spark缓存了执行计划引起的数据不准确的问题

-
-
2024-02-02

转载至惊帆大佬的博客: https://baifachuan.com/posts/e51fd265.html

 

最近有一套大数据架构,中间的一个数据流是:

Flink 通过 CDC实时的把数据写入对象存储,采用的是hudi connector。
Flink开启了compaction机制,且频率较快,因为数据量比较大,所以可以理解为几乎持续在做合并。
在Hue中使用Spark SQL对表进行查询分析。
 

在这个链路下,碰到一个问题,问题是:

t1时刻:flink cdc不断的给对象存储写文件,假设写了10个文件,且开启了定期合并
t2时刻:建出一个SparkSession。执行一个SQL对表进行count,SQL可以成功执行
t3时刻:flink 启动定期合并,这时候会删除之前的10个,产生一个新的文件。
t4时刻:继续使用之前那个SparkSession对象执行SQL,会报错,报错被merge掉的文件找不到。
 

看起来就是对于一个已经存在的SparkSession,它并没有去感知对象存储中的文件发生的变化,开始以为是hudi的表格式的问题,于是去看了一下hudi的代码,对应到hudi(0.11.1)的代码是在:

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 

在这个文件中,定义了用来记录一张表有哪些文件列表的meta信息,使用了lazy的形式进行加载:

/**
 * NOTE: PLEASE READ THIS CAREFULLY
 *
 * Even though [[HoodieFileIndex]] initializes eagerly listing all of the files w/in the given Hudi table,
 * this variable itself is _lazy_ (and have to stay that way) which guarantees that it's not initialized, until
 * it's actually accessed
 */
protected lazy val fileIndex: HoodieFileIndex =
  HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams,
    FileStatusCache.getOrCreate(sparkSession))
 

由于 fileIndex 是lazy的,所以在定义的时候并不会去创建HoodieFileIndex对象,而是在使用的时候才会去初始化,对应到使用的地方是listLatestBaseFiles方法:

protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
  val partitionDirs = if (globbedPaths.isEmpty) {
    fileIndex.listFiles(partitionFilters, dataFilters)
  } else {
    val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths)
    inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
  }

  val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
  val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)

  latestBaseFiles.groupBy(getPartitionPath)
}
 

通过断点发现,对fileIndex的初始化只有第一次的时候,才会进入,第二次后就不会进入,也就是第二次SQL开始,fileIndex始终拿到的是第一次初始化的值,从而导致了后续的SQL出现问题。

这时候直观感觉可能不是Hudi的问题,可能是问题出在Spark(3.2.1)端,基于错误的堆栈逐步查找,最后看到Spark的执行计划这部分,基本能找到原因了,原因是在Spark里面,对于同一个SparkSession,它会Cache住相同SQL的执行计划。

对应到代码的位置是:

org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 

中的FindDataSourceTable

private def readDataSourceTable(
    table: CatalogTable, extraOptions: CaseInsensitiveStringMap): LogicalPlan = {
  val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
  val catalog = sparkSession.sessionState.catalog
  val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
  catalog.getCachedPlan(qualifiedTableName, () => {
    val dataSource =
      DataSource(
        sparkSession,
        // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
        // inferred at runtime. We should still support it.
        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
        partitionColumns = table.partitionColumnNames,
        bucketSpec = table.bucketSpec,
        className = table.provider.get,
        options = dsOptions,
        catalogTable = Some(table))
    LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
  })
}
 

org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala的:

private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
  var builder = CacheBuilder.newBuilder()
    .maximumSize(cacheSize)

  if (cacheTTL > 0) {
    builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
  }

  builder.build[QualifiedTableName, LogicalPlan]()
}
 

可以看到,Spark对于相同的SQL,会基于DB和table产生qualifiedTableName,然后把这个里面的meta给cache下来,从而导致后面拿到的meta都是cache后的,因此一旦出现类似flink这种做了一个merge的动作。

对于一个已经存在的SparkSession是无法感知到对应的变化的,问题明确了,要解决问题的话,就比较容易了,一般来说可以有两种办法去解决,一种是关闭cache,另一种是缩短这个cache时间。

对于缩短cache时间的话,是调整spark.sql.metadataCacheTTLSeconds这个key,对应到引擎中的描述是:

val METADATA_CACHE_TTL_SECONDS = buildStaticConf("spark.sql.metadataCacheTTLSeconds")
    .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " +
      "session catalog cache. This configuration only has an effect when this value having " +
      "a positive value (> 0). It also requires setting " +
      s"'${StaticSQLConf.CATALOG_IMPLEMENTATION.key}' to `hive`, setting " +
      s"'${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key}' > 0 and setting " +
      s"'${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key}' to `true` " +
      "to be applied to the partition file metadata cache.")
    .version("3.1.0")
    .timeConf(TimeUnit.SECONDS)
    .createWithDefault(-1)
 

如果要关闭缓存的话,则可以将spark.sql.filesourceTableRelationCacheSize设置成0,这里本质上是将cache的容量调整为0,自然一个key都放不进去,约等于关闭了cache。


阿财
漫漫优化路,总会错几步!
公告

本网站转载的文章、图片、音视频等资料,均来源于互联网和媒体,转载目的在于个人记录。如涉及作品内容、版权和其它问题,请在30日内与本网站联系,我们将在第一时间删除内容!
最新评论

加载中...