大家好,欢迎来到IT知识分享网。
导读 本文将介绍由基础到进阶的 Hudi 实时入湖解决方案。
主要内容包括:
1. 数据集成整体方案
2. 数据入湖通用方案
3. 数据入湖进阶方案
分享嘉宾|杨宣 华为 大数据开发工程师
编辑整理|成亮
内容校对|李瑶
出品社区|DataFun
01
数据集成整体方案
1. 整体方案
首先来介绍一下数据集成方案的整体概况。

数据集成大致可分为三种模式:
- 通过 JDBC 直连,通过 Sqoop,将数据写入 Hive 表。
- 通过 CDC 工具采集数据库日志,再将其写入专业的 CDC 工具所支持的存储格式中。
- 通过数据库将数据落到文件或消息队列中,作为数据源,其中文件再通过Sqoop 及 FTP 服务写入 Hive 表;非标准格式的消息队列通过 Spark/Flink 作业写入 Hive/Hudi 表,标准格式通过专业的 CDC 工具写入 Hudi 表。
入湖模式可以分为实时和批量,实时数据量单表约万级 TPS,秒级的时延要求;批量数据量较大,达到百万到千万级,周期性同步,延迟是分钟到小时级。接下来分别展开介绍批量和实时的入湖方案。
2. 批量入湖方案
批量入湖适用于存量数据搬迁和补数的场景,时效性要求不高,比如 T+1。批量入湖过程中存在的一些挑战主要包括:
- 挑战一:数据重复问题。数据库 JDBC 直连或文件导出方式无法识别更新和删除,直接写入目标表会导致数据重复问题。
- 建议:可以使用 Hudi 的行级更新能力,自动去重。建议对表进行分区,这样去重环节比较方便。
- 挑战二:JDBC 直连方式读取数据库会受到底层网络资源的影响,JDBC 协议通道也有限,在大数据量场景下不仅效率低而且会给业务库带来压力。
- 建议:可以通过备库或者文件入湖来规避。
- 挑战三:文件入湖需要上下游协同来保证完整性,依赖可以管理上下游任务调度的平台。
- 建议:可以对文件进行压缩上传和下载,降低网络通道消耗,通过 flag 来标记文件上传状态,保证文件完整性。
- 挑战四:对接消息管道批量入湖需要关注数据老化时间,防止数据丢失。
- 建议:可以通过限流来保证入湖程序的稳定性,增加监控对消费终止和积压等进行告警,降低数据老化风险。
3. 实时入湖方案
实时入湖的特点主要包括:
- 入湖频率高,单次数据量低,数据都是包含新增、更新和删除的过程数据,而非最终快照数据。
- 业务需要快速的数据计算,以满足业务实时决策需求。
- 任务常驻,资源不释放,需要降低资源消耗峰值和源库压力。
实时入湖面临的主要挑战包括:
- 挑战一:Flink 直连数据源的方案单个流只能处理单个表,资源消耗大,无并发,吞吐量小,在异常场景分析难度大,可靠性较低;Spark 直连数据源的开发成本高。
- 建议:采用专业的 CDC 工具,可以利用其可视化的界面操作,无需客户开发代码;单任务可以采集多表,资源复用,高吞吐;支持的数据源和目标存储格式丰富;最关键的是专业的 CDC 工具可以快速从异常场景恢复,丢数风险低,有丰富的告警和监控来保证任务的可靠性。
- 挑战二:实时场景的 DDL 和 DML 操作需要一定的有序性保证。
- 建议:要保证数据湖的 DDL 操作早于业务库变更,可以在入湖程序对关键表的数据格式进行检查,如字段增减、数据类型调整等,发现异常程序立即退出并告警,保证数据质量。
- 挑战三:实时场景对于数据存储模型的设计非常重要。
- 建议:实时数据入湖使用 Hudi 格式存储,特大表入湖使用 MOR 表+Bucket 索引+分区。
4. Hudi 表模型设计方案
Hudi 主要包括两种存储格式:COW 和 MOR。COW 格式经常用于批量场景中,因为其 copy on write 的特点,造成了写放大的问题,写入相对于 MOR 会慢很多,因此不适合实时场景。如果对入湖实时性要求较高可以选择 MOR 表,端到端可以控制在分钟级。Hudi 支持多种类型引擎,表字段大小写以及字段类型在不同引擎上都会有差别,因此建议统一表字段的大小写,调研各引擎之间字段类型的映射关系。
索引方面,如果业务涉及到多个引擎操作同一个 Hudi 表,需要使用统一的索引;数据量很大的情况下,可以使用 Bucket 索引,将数据进行 Hash 分发存储;Bucket 索引的分桶数要合理,分区表可以根据业务峰值预估最大的分区未压缩前的数据量/2G 来进行分桶,非分区表可以在分区的分桶基础上乘 2 来分桶;状态索引建议在 2 亿数据量以下使用,COW 多数使用 Simple 索引,大表可以采用 Bloom 索引,MOR 大数据量表建议使用 Bucket 索引。
分区方面,实时表采用日期分区,维度表采用非分区或者粗粒度的日期分区;分区要基于数据更新范围以及下游作业读取范围来确定。
写入模式方面,可以使用 Upsert 保证 Hudi 的自动去重,使用 Append 来进行日志类型数据的追加。
Hudi 在贴源层处理数据主要有两种方案:快照表和拉链表。
首先介绍快照表的方案。

快照表的增量同步,批量或实时将业务更新数据通过 Upsert 同步至 Hudi 快照表。全量同步适用于小表批量入湖,以及源表物理删除。全量同步直接使用Truncate+Insert 或者 Insert Overwrite Table。
下面介绍 Hudi 在拉链表的方案,包括三部分:增量拉链表、全量拉链表和传统数仓的拉链表。

Hudi 表会保存历史增量数据快照,使用分区标识增量时间段。使用增量同步,直接 Upsert 到 Hudi 表。如果没有时间标识增量时间段,可以直接使用全量数据生成 Hudi 最新快照,和 Hudi 旧快照表进行比对,找到该时段增量数据,直接写到 Hudi 的增量拉链表。每个分区内数据都是该时间范围产生的增量数据最新快照,分区之间可以存在主键重复,分区内没有主键重复。

Hudi 的全量拉链表每个分区数据是全量数据的最新快照,冗余很大,适合数据总量小,更新操作多的场景。通常使用批量同步任务,建议拉链表的时间分区通常为拉链程序运行日减一天,在次日凌晨进行前一天的数据同步。

数仓拉链涉及 StartTime 和 EndTime,开链直接写 Hudi,可以使用实时快照表和历史增量表组成;闭链可以使用索引表,建立主键和 EndTime 的索引,在更新时快速找到需要闭链的行,进行 EndTime 更新。
02
实时入湖通用方案
接下来介绍实时入湖的一些通用方案,以及用到的 Hudi 的核心能力。
1. 整体方案

Hudi 在实时入湖过程中承担着数据组织的角色,负责管理数据集,并提供各种查询接口。
实时入湖的存储引擎通常为 HDFS 或者云存储,查询引擎支持 Spark 或者Flink,推荐使用 Flink 进行流式加工至 Hudi,使用 Spark 进行批量补数并完成 Hudi 的表服务,这种方式可以保证实时入湖的稳定性。
表服务对于 Hudi 表来说非常重要,维护着表的健康状态,负责清理 Hudi 表的冗余数据文件,减轻 timeline 压力以及小文件问题等等。
2. 分区老化

Hudi 支持分区级数据老化能力,可以按表设置多个 TTL 策略。批量写任务可以在数据写入完成后执行 TTL。实时写任务如果进行数据老化会影响性能,可以在异步 clean 任务中完成 TTL。
3. 动态 Bucket 桶

Bucket 桶数可以进行动态扩展,以适应业务规模的激增,可以在建表时,指定预先分桶策略,设置未来某天的分桶数。此外,我们可以通过现有的桶数,以及现有桶下的文件增长情况,进行 AI 桶数预测。分区下的桶数支持原地调整,但需要覆写数据分区。
4. 隐式分区

Hudi 支持隐式分区字段,直接使用业务字段进行分区,无需再加工日期字段。我们可以选择适合业务场景的分区,从而在查询时实现加速。
5. 分区演进

指定分区规则后,可以根据业务场景随时进行调整,满足业务查询条件多样性和多变性,提升查询速度。业务高峰期可以从数据量、特定的业务查询条件等来演进分区,实现负载均衡和查询优化。
03
实时入湖进阶方案
接下来介绍 Hudi 的更多进阶能力。
1. ChangeLog

将业务记录变更的过程数据通过 ChangeLog 存储到 Hudi,供下游 Flink 加工任务使用。如上图所示,主键 ID 为 1 的一条数据发生了更新,如果没有开启 ChangeLog,在 sum 时会将两条数据求和,而开启 ChangeLog 后只取最新的一条,从而保证了数据的准确性。
另外,使用 Hudi Changelog 表来存储变更数据可以解决 Kafka 数据老化快、存储成本高和无法点查等问题。
2. 高速流表

在实时入湖流程中,可以使用高速流表降低 Flink 端到端加工任务的时延。主流程使用 Flink 任务消费 Kafka,并写入实时数仓。在中间流程,使用 HDMS(Hudi 的表管理服务,负责托管 hudi 表服务以及流表任务。其中流表任务会将 Kafka 里的数据写到 hudi 流表)消费 Kafka 数据异步写入 Hudi 流表,可以解决 Kafka 数据老化快的问题,Flink 任务也可以基于 Hudi 流表中的变更数据进行更快或更早的恢复。
3. 列簇

为了适应拉宽表的场景,我们提出了列簇的概念,将 file group 进一步细分。列簇是指写任务写宽表时要更新的字段集合,这几个字段映射到宽表的一行,每个写任务要更新列簇的几个字段。列簇适用于 MOR 表和 Bucket 索引的场景,写任务并发向列簇进行追加写。

读流程按照列簇进行并发读,最后将多个列簇的数据进行 sort merge 操作。
4. MOW

目前 Hudi 支持 MOR 和 COW 两种存储模式,我们结合两者特点提出了 MOW 模型(Merge on Write),相比 MOR 读得更快,相比 COW 写得更快。更新时每个 parquet 文件维护一个 bitmap,删除记录使用位图置 0 来标识。
5. MDT

当 COW 表存量在百 TB 级别,分区后每天增量在几 TB,我们可以使用MDT 加速查询。MDT 是由分区加文件的索引构成的表。我们可以在客户查询前,将 MDT 索引缓存到 JDBC Server 缓存。使用 MDT 加速后,单并发查询在 3 秒内,50并发查询可以控制在 10 秒内。
以上就是本次分享的内容,谢谢大家。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/180280.html