病毒|作业帮基于 DeltaLake 的数据湖建设最佳实践( 三 )


如何界定数据完全就绪
流式数据一般会有乱序的情况 , 在乱序的情况下 , 即使采用 watermark 的机制 , 也只能保障一定时间范围内的数据有序 , 而对于离线数仓来说 , 数据需要100%可靠不丢 。 而如果我们可以解决数据源的有序性问题 , 那么数据就绪问题的解决就会简化很多:假如数据按照天级分区 , 那么当出现12-31的数据时 , 就可以认为12-30的数据都就绪了 。
因此 , 我们的方案拆解为两个子问题:
流数据有序后界定批数据边界 保障流数据有序的机制 首先对于前者 , 总体方案如下:

设定数据表的逻辑分区字段 dt 以及对应的时间单位信息 。
当 Spark 读取某一个 batch 数据后 , 根据上述表元数据使用数据中的 event time 生成对应的 dt 值 , 如数据流中 event time 的值均属于T+1 , 则会触发生成数据版本T的 snapshot , 数据读取时根据 snapshot 找到对应的数据版本信息进行读取 。如何解决流数据的乱序问题
不论是 app-log 还是 MySQL-Binlog , 对于日志本身都是有序的 , 以 MySQL-Binlog 举例 , 单个物理表的 Binlog 必然有序 , 但是实际业务场景下 , 业务系统会经常进行分库分表的使用 , 对于使用分表的场景 , 一张逻辑表 Table 会分为 Table1、Table2、……几张表 , 对于离线数仓的 ODS 表 , 则需要屏蔽掉业务侧 MySQL 分表的细节和逻辑 , 这样 , 问题就聚焦为如何解决分表场景下数据有序的问题 。
保障分库分表 , 甚至不同分表在不同集群的情况下 , 数据写入到 Kafka 后的有序性 。 即写入 DeltaLake 的 Spark 从某个 topic 读取到逻辑表的数据是 partition 粒度有序的 。保障 ODS 表就绪的时效性 , 如区分无 Binlog 数据的情况下 , ODS 层数据也可以按期就绪 。此处需要对原有系统进行升级改造 , 方案如下:

如上图所示:某个 MySQL 集群的 Binlog 经 Canal 采集后写入到特定的 Kafka-topic , 但是由于写入时按照db和 Table(去分表_*后缀)做 hash 确定 partition , 因此单个 partition 内部会存在多个物理表的 Binlog , 对于写入 DeltaLake 来说非常不友好 。 考虑到对其他数据应用方的兼容性 , 我们新增了数据分发服务:
将逻辑表名(去分表_*后缀)的数据写入到对应的 topic , 并使用物理表名进行 hash 。 保障单 partition 内部数据始终有序 , 单 topic 内仅包括一张逻辑表的数据 。在 MySQL 集群内构建了内部的心跳表 , 来做 Canal 采集的延迟异常监控 , 并基于此功能设置一定的阈值来判断当系统没有 Binlog 数据时是系统出问题了还是真的没数据了 。 如果是后者 , 也会触发 DeltaLake 进行 savepoint , 进而及时触发 snapshot来保障 ODS 表的及时就绪 。通过上述方案 , 我们将 Binlog 数据流式的写入 DeltaLake 中 , 且表分区就绪时间延迟10mins 。
读写性能优化
下面讲下我们在使用 DeltaLake 过程中遇到的性能问题以及对应的解法 。
通过 DPP 提高写性能
DeltaLake 支持通过 SparkStreamingSQL 的方式来写入数据 。

因为要做记录的合并去重 , 因此这里需要通过 merge into 的方式写入 。 DeltaLake 更新数据时分为两步:
定位到要更新的文件 , 默认情况下需要读取全部的文件和 Spark 内 batch 的增量数据做 Join , 关联出需要更新的文件来 。Merge 后重新写入这些文件 , 把老的文件标记为删除 。
如上左图所示 , 由于 DeltaLake 默认会读取上个版本的全量文件 , 因此导致写入性能极低 , 一次合并操作无法在 Spark一个 batch 内完成 。
针对这种场景 , 对 DeltaLake 做了升级:使用 DPP 做分区剪枝来优化 Megre into 的性能 , 如上右图所示: