查看原文
其他

OPPO 智能湖仓的实践之路

陈哲嘉 DataFunSummit
2024-09-10

导读 随着业务的快速发展,很多企业正面临着前所未有的数据增长。这些数据无论是来自在线交易、社交媒体互动还是物联网设备,都蕴含着宝贵的洞察力,能够帮助企业做出更明智的决策。然而,随着数据量的激增,传统的数据管理解决方案开始显得力不从心。智能湖仓是一种创新的数据平台,它结合了数据湖的可扩展性和数据仓库的性能优势,旨在为企业提供一个一站式的数据分析和管理解决方案。本文将分享 OPPO 在数据服务领域的实践和相关技术。

今天的介绍会围绕下面五点展开:

1. 背景介绍

2. 实时湖仓

3. 数据入湖

4. 技术规划

5. 问答环节

分享嘉宾|陈哲嘉 OPPO 高级大数据平台工程师

编辑整理|李阳

内容校对|李瑶

出品社区|DataFun


01
背景介绍

首先介绍一下相关背景。

1. OPPO 大数据架构

OPPO 大数据平台规模是比较大的,数据量级已经达到了 EB 级别,任务日均百万,离线任务主要是 spark,实时任务主要是 flink。技术栈主要是基于开源的软件做二次开发和改造,以提升稳定性和效率。

上图展示了 OPPO 大数据平台的整体架构。
  • 接入层:做了多引擎的适配以及 HBO,由于数据湖对引擎版本要求较高,我们在接入层支持用户手动指定引擎版本。
  • 计算引擎层:部署外部的 shuffle 引擎,Spark 和 Flink 都可以用相同的外部 Shuffle 服务,从而提高了 Shuffle 效率,该项目已经开源,叫做 Shuttle
  • 调度层:也做了很多工作,特别是今年在海外云,比如针对 AWS 云的调度做了很多优化,降低了成本。
  • 运维诊断系统:对 Spark 任务、Flink 任务进行全自动的诊断,运维人员可以轻松管理集群上的任务,不需要对引擎有很深入的了解,系统可以发现并报告任务的问题。这个项目已经开始对接 Apache,未来可能会成为 Apache 的一个孵化项目。
  • 数据湖 Glacier主要的工作是流批一体、数据入湖以及索引方面的工作。功能是把引擎层和存储层衔接在一起,替换 Hive 表,达成业务目标。

2. 湖仓一体

原来 OPPO 主要使用的是大数据技术体系,使用 Hive 表。数据接入时需要用到数据集成的功能,比如从 MySQL 或者 MongoDB 中导数据,每天或者每小时导一次数据,在使用中发现如下一些问题:
  • Hive 表不够灵活,比如 MySQL 经常会加减字段,Hive 表就需要重新建表,重新导入数据;另外时效性较低,只能做到小时级,难以继续缩小延迟,无法支撑实时业务。
  • 离线数仓升级为实时数仓的过程非常复杂,会使用 Lamda 架构,Flink Kafka,需要按照原来离线的逻辑,从头开始搭建一套实时链路,维护成本非常高。
  • 针对两套引擎,开发人员既要懂离线引擎,又要懂实时引擎相关技术,因此开发门槛很高,实时业务开发效率比较低。
我们希望利用数据湖技术来解决上述问题。OPPO 智能湖仓的目标为:
  • 数据一键接入:希望数据可以非常方便地接入,并且可以实现自动的 schema 适应。
  • 流批一体:一条链路,既可以跑流也可以跑批,从而节省成本。
  • 统一入口:对外是统一的引擎,包括 SQL 也是统一的,方便业务做开发。
  • 高实时性
  • 高性能数据分析
  • 使用简化:全托管,相比原来的数仓架构,使用更加简化。
  • 降低成本:降低运维工作量和成本。

3. Glacier service

Glacier 是辅助 Iceberg 的一个服务。因为 Iceberg 是没有服务的,任务的执行和管理都依赖于 Hive,因此我们自研了 Glacier Service 对湖仓的表统一管理。

另外,Iceberg 需要很多优化,比如文件合并、数据和任务等资源的清理,Iceberg 自身支持的都不是很好,所以我们在 Glacier Service 中增加了任务管理的功能,可以知道哪些表是 active 状态。还增加了生命周期的管理,可以自动清理无效数据、元数据,以及数据的降冷、过期等功能,比如自动清理一些旧的分区。

目前 Glacier Service 架构主要使用的引擎包括 Spark、Trino、Flink。Spark 主要做离线计算,Trino 主要是提供给数据分析师使用,做实时数据分析,Flink 是主力引擎,包括正常的写数据、导数据等。

并且引入了分布式缓存,因为 Iceberg 本身存在 HDFS、S3 或者 OPPO 自己的CubeFS 上,某些情况下性能是不达标的,为了解决这个问题,我们引进了分布式缓存,对性能进行提升。

02

实时湖仓

接下来介绍实时湖仓实施过程中的技术方案和优化策略。

1. 流批一体数仓

流批一体的数仓是比较简单的一个功能,图中是典型的流批一体链路,采用 Lamda 架构,埋点数据上报到实时的 Kafka 中,实时任务从 Kafka 中读取数据,执行后续的实时任务。离线的流程是先把数据抽到 Hive 表中,再定时地跑批一些日任务或者小时任务。
  • 统一存储
上线流批一体之后,直接把实时的 Kafka 改成流批一体的 DWD 表。通过一个 Flink 任务从 Kafka 里抽数,写到数据湖的一个表里,这个表可以提供流读和批读的能力,可以查也可以改数据。
  • 统一入口
第二步是前面讲到的一个目标,可以让用户非常容易地从原来的离线链路切到实时链路。所以我们希望至少面对用户来说引擎入口是统一的,系统可以完成一键自动切换,后台自动将日任务或小时级任务切换成 Flink 任务,对于用户来说是无感知的。第二点是统一 SQL,因为切换过程中 Flink SQL 和 Spark SQL 有些语法是不一致的,所以我们做了 SQL 统一方面的工作。
  • 统一引擎
下一阶段是统一引擎,引擎方面比较明显的是流批语法不一样,我们希望大部分是能够通用的,最终可以实现使用同一套语法。终极的目标是只保留一个引擎,Flink 引擎对流批的支持都比较好,但也还存在一些问题,Flink 在执行批任务的时候,其性能和 Spark 的差距还是比较大的。所以仍需要一些优化工作,使 Flink 和 Spark 性能对齐。

2. 统一入湖

前面讲到我们实际上是从采集入湖以后开始做链路改造,而理论上的数据湖,是数据会统一落到数据湖中,生成一个大宽表,后面再根据具体业务进行下游数据的开发抽取工作,形成不同的链路。这个表会非常大,性能要求很高,并且它不是 Flink 写入,大多数是 API 写入。我们最初是使用 Iceberg,但是发现 Iceberg 存在一些问题。
  • 客户端混乱,不清楚目前有哪些客户端,存在管理上的问题。
  • 性能方面,每个客户端都会往文件系统中写数据,并且把写的文件提交到数据湖的表中。源数据这边有一个锁,只能单线程写,去生成一个 snapshot,如果有几百个客户端,性能会存在瓶颈,出现堵塞的情况。
  • 数据正确性问题,如果当前有一个 API 执行过程中挂掉,需要重启后重新开始写,因为找不到 API,数据的回滚非常麻烦,因此要保证数据不重复不丢失是很难的。

3. 统一入湖优化

针对这些场景我们对数据统一入湖进行了优化,是通过 Glacier Server 来实现的。
  • 客户端增加标识:首先我们给客户端增加标识 uid
  • 统一提交:数据的提交从 API 客户端转到了 Glacier Server,客户端只执行数据的写入,Glacier Server 会进行统一提交。比如可以把 100 个任务直接提交成一个 snapshot,这样锁的瓶颈就少了很多,并且源数据写的性能也会提高,整体性能得到了很大提升。
  • 回滚设计:snapshot 进行了改造,记录每个文件是从哪个客户端来的,什么时间生成的,解决了 API 执行任务数据失败回滚的问题。

4. 秒级延迟实现

Iceberg 的延时是分钟级的,无法支持实时性要求非常高的场景,为了解决这一问题,我们实现了秒级延迟。

针对这一问题有一些开源方案,其技术思路是额外拉一条 Kafka 线来做 log store,去实现秒级延迟。

我们则是使用缓存来实现,抽象了流式文件。流式文件区别于原来的批文件,其特点是数据写进去后,下游立即可以看到,当文件达到一定长度或者超过一定的时间,会通过 Glacier Server 把流式文件 dump 成一个批文件,这样就跟原来的状态是一样的。相当于是用流式文件实现了实时功能。

例如,批任务读的时候,会先拿到 snapshot 下面所有的批文件,并且还会拿到所有的流文件,流文件会记录当前写入数据的位置,这样批任务就可以读到刚刚写进去的数据,从而可以实现非常低的延迟。

第二部分是流任务,OPPO 实现了 Flink stream reader,流任务会一直循环读流文件,直到流任务结束为止,Flink 任务主要消费的就是流文件,这样就实现了秒级延迟。

但是这样也会出现一个问题,本身 Flink 任务写数据的时候,是通过 checkpoint 提交到元数据里,然后下游才可见。如果这个任务挂了,相当于上一个周期写的数据是无效的,因为没有被提交。当下游再重启,再重写一遍新的进行提交,这样下游整体上看到数据才是对的。

但是如果切到流文件,因为他之前写的那个文件,下游已经能消费到了,已经被拿走了,重启后 Flink 任务恢复的时候有一部分数据会被重新写入,导致下游产生一些重复的数据,这在一些场景,特别是广告推送场景下是不能接受的。我们针对这一问题做了数据一致性的优化。

将流文件转化成批文件是一个 dump 过程,我们新增了一个叫 checkpoint dump 的逻辑,上游任务写入的流文件只有经过 checkpoint 检测,才会最终生成批文件,如果没有通过检测,这个文件在 dump 过程中就会被删掉。

最终的效果是流读的任务实际上还有一部分是会重的,但是批读的任务包括这个表最终的状态是正确的。不管上游任务怎么重写,整体上最终的状态是正确的,并且dump 完以后批任务读的数据也是对的,因此实现了最终一致性的效果。针对流任务,因为已经把数据消费出来,并写到下游的表中了,因此我们会启动一个修复的 fix task,把有问题的时间段的数据重新生成一遍,保证下游表也是正确的,下游还有表就一直生成下去,这样就实现了 Lambda 架构通过离线表去修复实时表,最终保证了数据的正确性。

03

数据入湖

接下来介绍数据入湖方面的工作。

1. Schema 自适应

Schema 自适应要解决的是上下游表 schema 对齐的问题。MySQL 数据导进来之后,表结构发生变更,下游无法自动识别会引起报错,导致数据无法写入。

我们对 Flink CDC 的逻辑进行了改动,添加了一个 DDL task,从 binlog 消费数据的时候,如果读到 schema 变动的任务,系统会同步给下游的表。另一个功能是给每条数据打上一个标记,就是 DataWithSchemaID,schema id 对应的是 Iceberg 表的 schema 版本,下游 data 具体写数据的任务,在收到数据以后,会对比当前写入的数据版本和新的数据版本是否一致,如果不一致,会从表里重新拉取新的 schema,这样就实现了 schema 自适应的功能。整体对于用户来说是无感知的,当上游 MySQL 表结构发生变更,下游会自动更新成新版本。

2. 数据源合并

因为所有数据都过 DDL task,MySQL 都有同步,所以实际上流量是比较小的。但是原来 Flink CDC 有个问题,即一个表必须用一个 Flink CDC 任务去跑,因此我们做了一个任务合并功能,把一个 DB 中的表,通过一个任务直接同步到下游多个表里。

3. Delete 优化

MySQL 删数据场景比较多,特别是主键的删除。简单来讲,我们对 delete file 做了优化,把 delete file 生成 snapshot,具体删的是哪个 delete file 数据,会拆成多个文件,并且根据情况把 equal delete 转成 position delete,也就是一个 delete 文件会直接标记出删除了哪些 delete file 的文件。查询时不需要把每个 delete file 全部过滤一遍,从而提高了查询的性能。并且增加了一个 bloom 过滤器,针对主键的删除,可以直接根据 bloom 过滤器判断是否在里面。

4. 索引加速

索引加速主要是针对数据分析的场景。我们与数据接入端有联合,接入端会采集用户的 SQL,我们会根据常用的 SQL 自动创建索引,同时我们自己也实现了一些索引,比如 Bit-map、Z-order,还有主键的索引。主键不是一一对应的,只是有一个范围,为了加速。Z-order 主要是针对多条件的场景,集成了 Iceberg 新版本的功能。

5. 样本拼接

样本拼接主要是针对机器学习的场景。这一场景的特点,首先是量很大,第二是样本数据会经常变,比如算法经常会增加特征,因此数据湖的表动态 schema 是非常有优势的。要把样本的特征和标签拼接到一起,我们有两版实现,最开始是用 Flink 的 rocksdb 去实现,在内存里做拼接,但非常占内存,资源消耗太高。第二个是用数据湖的表的 upsert 功能去拼接,存在的问题是因为拼接很多是随机的,数据存在 HDFS 上会对 HDFS 造成大量随机读取,样本的数据又非常大,导致对 HDFS 造成巨大的压力,性能也非常差。

最终我们综合了两种方案的优势,把大数据存在 DFS ,把 key 放在索引里,然后做拼接,只有拼接上之后,才会记录拼上的数据,通过流读数据时,就可以读到拼接上的一批数据。这样 HDFS 只需要实现顺序的扫描,并且把有用的数据拿出来,不需要像原来那样大量的随机读,最终实现了资源比较低且性能比较好的方案。

以上是流式的拼接。批式的拼接,还是按照原来 shuffle 的方案,并且会对文件定期合并,实现了高效率的拼接,性能可以达到每秒 7000 条。

04

技术规划

最后分享一下后续的技术规划。

第一个是缓存优化,目前使用的外部缓存,性能仍有待提升,后续计划进行更深入的优化,或尝试其他缓存。

第二个是流批一体的场景,我们希望能够统一引擎,至少都统一到 Flink 上。

第三个是机器学习的场景,目前大模型非常火,而数据湖在大模型方面也有很多应用场景。目前的表都是文件系统的表,比如 ORC 格式或 Parquet 格式。我们希望引入新内存格式,使模型训练可以纯内存完成,从而更好地对接一些模型训练的场景,也可以支持一些打通或优化方面的工作。

第四个是对前面所讲到的一些功能做部分的开源。

05

问答环节

Q1:分布式缓存使用的是哪一种?

A1:使用的是 Alluxio,做了一个文件,但因为它本身不支持实时写和实时读,所以我们做了改造使它支持实时写和实时读的功能。同时对 PRC 性能进行了优化。

Q2:该项目和其他的开源项目有什么区别?比如网易的 Arctic。

A2:和网易 Arctic 的有些功能是重叠的,网易更关注元数据管理、任务调度,在元数据方面,网易做得更好,有一个专门的元数据服务。我们的平台与底层的存储、顶层的引擎做了更深的结合。比如,通过缓存加速数据读写的性能、通过索引提升查询速度等等。
以上就是本次分享的内容,谢谢大家。


分享嘉宾

INTRODUCTION


陈哲嘉

OPPO

高级大数据平台工程师

oppo 大数据平台计算引擎负责人,在大数据离线实时计算,数据湖应用和优化等方向上有丰富的经验。

活动推荐


往期推荐


腾讯游戏数据分析中的湖仓一体化实践

从“数据虚拟化”之父Denodo创始人的视角,一探数据编织技术的发展落地与未来趋势

数据工程师如何应对巨量的取数需求?

爱奇艺埋点体系建设

玩转 A/B 实验,解锁评估策略长期效果新方案

快看漫画用户画像产品搭建

为什么又造了个新词 Data Warebase:我看到了 AI 时代数据平台应当的样子

阿里面向企业数字化的文档智能技术与应用

发动机铸造模具温度智能管理调节应用落地

懂车帝准实时指标体系架构及应用

华为盘古大模型微调实践

算法&大数据如何赋能?OPPO推荐领域降本增效指南


点个在看你最好看

继续滑动看下一个
DataFunSummit
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存