前言※
随着大数据时代的到来,数据量不断增长,HDFS也成为了数据存储和处理的重要组成部分。然而,由于HDFS的设计原理和文件存储方式,HDFS系统中存在大量的小文件,这些小文件会导致HDFS的性能下降,增加管理和维护的难度,严重影响数据处理效率和数据质量。因此,HDFS小文件的治理变得越来越重要。
小文件的危害※
小文件通常指文件大小要比 HDFS 块大小还要小很多的文件,大量的小文件会给 Hadoop 集群的扩展性和性能带来严重的影响。
NameNode 在内存中维护整个文件系统的元数据镜像、用户 HDFS 的管理,其中每个 HDFS 文件元信息(位置、大小、分块等)对象约占150字节,如果小文件过多,会占用大量内存,直接影响 NameNode 的性能。相对地,HDFS 读写小文件也会更加耗时,因为每次都需要从 NameNode 获取元信息,并与对应的 DataNode 建立连接。如果 NameNode 在宕机中恢复,也需要更多的时间从元数据文件中加载。
同时,小文件会给 Spark SQL 等查询引擎造成查询性能的损耗,大量的数据分片信息以及对应产生的 Task 元信息也会给 Spark Driver 的内存造成压力,带来单点问题。此外,入库操作最后的 commit job 操作,在 Spark Driver 端单点做,很容易出现单点的性能问题。
治理方案※
存量治理※
传统方法如手动编码合并或使用专用工具(如HDFS Merge工具)虽能直接清理历史小文件,但存在以下缺陷:
- 高风险性:合并操作可能引发数据不一致或丢失,尤其在复杂业务场景中。
- 低效性:需额外开发与维护,且对在线业务干扰较大
增量治理※
在数据可见前对文件进行合并,业界有字节Parquet快速合并方案 和 Kyuubi 的Spark extension
因为快速合并方案改动较大,且实施难度高这里不考虑
Kyuubi Spark extension 基于Spark AQE + Spark Session 扩展很好的满足当前业务需求
引入kyuubi-extension-spark-3-1_2.12-1.8.3.jar 至Spark (注意使用Spark对应版本)
配置示例:
//Spark AQE参数 这里使用的默认值 spark.sql.adaptive.enabled: true spark.sql.adaptive.coalescePartitions.enabled: true spark.sql.adaptive.coalescePartitions.minPartitionNum: 1 spark.sql.adaptive.coalescePartitions.initialPartitionNum: 200 spark.sql.adaptive.advisoryPartitionSizeInBytes: 64MB //Kyuubi extension 参数 spark.sql.extensions: org.apache.kyuubi.sql.KyuubiSparkSQLExtension spark.sql.optimizer.insertRepartitionBeforeWrite.enabled: true spark.sql.optimizer.finalStageConfigIsolation.enabled: true spark.sql.optimizer.forceShuffleBeforeJoin.enabled: true spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled: true //最终文件大小受文件格式和压缩方式影响 spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes: 256MB
生产案例※
CREATE TABLE `ods.file_merge_test`(
//忽略字段信息
)
COMMENT 'im用户维表'
PARTITIONED BY (
`dt` string COMMENT '日期')
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://hacluster/user/hive/warehouse/ods.db/file_merge_test'
TBLPROPERTIES (
'bucketing_version'='2',
'parquet.column.index.access'='true',
'parquet.compression'='snappy',
'transient_lastDdlTime'='1669253223')
insert overwrite table ods.file_merge_test
select
*
from ods.hudi_xxx where dt <= '${Y}'
Spark-3.1.1 同等资源下 (9C 18G)
改造前运行 18 min 最终文件数 7212 文件总大小 1004.6 MiB 数据条数 33792836 每个文件几十KB
改造后运行 7.7 min 最终文件数 9 文件总大小 1205.5 MiB 数据条数 33792836 每个文件140M左右
总结※
提前规避要好于事后补救,在任务开发以及表设计的前期尽可能考虑小文件问题尤为重要,因为事后修改任务或者修改表带来业务失败的概率会大得多。此外,小文件治理也是一个长期的过程,对于一个生产集群,定期的进行小文件治理是必要的。
参考资料