转载至惊帆大佬的博客: https://baifachuan.com/posts/e51fd265.html
最近有一套大数据架构,中间的一个数据流是:
Flink 通过 CDC实时的把数据写入对象存储,采用的是hudi connector。 Flink开启了compaction机制,且频率较快,因为数据量比较大,所以可以理解为几乎持续在做合并。 在Hue中使用Spark SQL对表进行查询分析。 |
在这个链路下,碰到一个问题,问题是:
t1时刻:flink cdc不断的给对象存储写文件,假设写了10个文件,且开启了定期合并 t2时刻:建出一个SparkSession。执行一个SQL对表进行count,SQL可以成功执行 t3时刻:flink 启动定期合并,这时候会删除之前的10个,产生一个新的文件。 t4时刻:继续使用之前那个SparkSession对象执行SQL,会报错,报错被merge掉的文件找不到。 |
看起来就是对于一个已经存在的SparkSession,它并没有去感知对象存储中的文件发生的变化,开始以为是hudi的表格式的问题,于是去看了一下hudi的代码,对应到hudi(0.11.1)的代码是在:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala |
在这个文件中,定义了用来记录一张表有哪些文件列表的meta信息,使用了lazy的形式进行加载:
/** * NOTE: PLEASE READ THIS CAREFULLY * * Even though [[HoodieFileIndex]] initializes eagerly listing all of the files w/in the given Hudi table, * this variable itself is _lazy_ (and have to stay that way) which guarantees that it's not initialized, until * it's actually accessed */ protected lazy val fileIndex: HoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sparkSession)) |
由于 fileIndex
是lazy的,所以在定义的时候并不会去创建HoodieFileIndex
对象,而是在使用的时候才会去初始化,对应到使用的地方是listLatestBaseFiles
方法:
protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = { val partitionDirs = if (globbedPaths.isEmpty) { fileIndex.listFiles(partitionFilters, dataFilters) } else { val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths) inMemoryFileIndex.listFiles(partitionFilters, dataFilters) } val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray) val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus) latestBaseFiles.groupBy(getPartitionPath) } |
通过断点发现,对fileIndex
的初始化只有第一次的时候,才会进入,第二次后就不会进入,也就是第二次SQL开始,fileIndex
始终拿到的是第一次初始化的值,从而导致了后续的SQL出现问题。
这时候直观感觉可能不是Hudi的问题,可能是问题出在Spark(3.2.1)端,基于错误的堆栈逐步查找,最后看到Spark的执行计划这部分,基本能找到原因了,原因是在Spark里面,对于同一个SparkSession,它会Cache住相同SQL的执行计划。
对应到代码的位置是:
org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala |
中的FindDataSourceTable
:
private def readDataSourceTable( table: CatalogTable, extraOptions: CaseInsensitiveStringMap): LogicalPlan = { val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) val catalog = sparkSession.sessionState.catalog val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table) catalog.getCachedPlan(qualifiedTableName, () => { val dataSource = DataSource( sparkSession, // In older version(prior to 2.1) of Spark, the table schema can be empty and should be // inferred at runtime. We should still support it. userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema), partitionColumns = table.partitionColumnNames, bucketSpec = table.bucketSpec, className = table.provider.get, options = dsOptions, catalogTable = Some(table)) LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table) }) } |
和 org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
的:
private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = { var builder = CacheBuilder.newBuilder() .maximumSize(cacheSize) if (cacheTTL > 0) { builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS) } builder.build[QualifiedTableName, LogicalPlan]() } |
可以看到,Spark对于相同的SQL,会基于DB和table产生qualifiedTableName,然后把这个里面的meta给cache下来,从而导致后面拿到的meta都是cache后的,因此一旦出现类似flink这种做了一个merge的动作。
对于一个已经存在的SparkSession是无法感知到对应的变化的,问题明确了,要解决问题的话,就比较容易了,一般来说可以有两种办法去解决,一种是关闭cache,另一种是缩短这个cache时间。
对于缩短cache时间的话,是调整spark.sql.metadataCacheTTLSeconds
这个key,对应到引擎中的描述是:
val METADATA_CACHE_TTL_SECONDS = buildStaticConf("spark.sql.metadataCacheTTLSeconds") .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " + "session catalog cache. This configuration only has an effect when this value having " + "a positive value (> 0). It also requires setting " + s"'${StaticSQLConf.CATALOG_IMPLEMENTATION.key}' to `hive`, setting " + s"'${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key}' > 0 and setting " + s"'${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key}' to `true` " + "to be applied to the partition file metadata cache.") .version("3.1.0") .timeConf(TimeUnit.SECONDS) .createWithDefault(-1) |
如果要关闭缓存的话,则可以将spark.sql.filesourceTableRelationCacheSize
设置成0,这里本质上是将cache的容量调整为0,自然一个key都放不进去,约等于关闭了cache。