查看原文
其他

优刻得:使用 USDP 实践近实时数据湖仓

DataFunTalk
2024-09-10

导读 本文将分享优刻得所提出的基于 USDP的流式数据湖仓解决方案。

主要包括以下几大部分:

1. 背景2. 实践方案3. 方案优势4. 实践场景5. 场景设计6. 总结


01

背景

在数字化转型的大潮中,企业对于数据的实时性需求日益增长。尽管传统的离线数据仓库在数据存储和管理方面已经建立了成熟的架构体系,但其周期性的数据集成和更新策略,也往往造成用户无法及时获得数据变化,更无法通过处理数据变化而获得及时的决策策略支持。

在这种模式下,数据的实时性严重受限于作业调度的频率,通常只能实现每小时或每日的更新,导致数据使用者只能接触到过时的信息,即上一小时或前一天的数据,尤其巨量的数据规模仍在按天大幅度增加的情况下,无法满足快速变化的业务需求,严重制约了用户业务发展。此外,传统数据仓库在数据更新时采用的分区覆写(OVERWRITE)策略,需要首先读取分区内的旧数据,再与新数据进行合并,最后才能生成并存储最新的数据结果。这不仅增加了数据处理的复杂性,也推高了存储、计算、数据运维成本,严重阻碍生产效率提升。

02

实践方案

为了解决上述挑战,优刻得提出了一种基于 USDP(UCloud Smart Data Platform)的流式数据湖仓解决方案。该方案以 Flink 为实时数据处理引擎,结合 Paimon 的湖存储格式,实现了流批一体的数据管理,可有效拓展传统数据仓库在实时数据处理方面的局限性

从数据源接入开始,采用 Flink MySQL CDC 连接器,以最小侵入性的方式,利用了 MySQL 的二进制日志解析技术,通过订阅并解析 binlog 中的 DML 事件,实现对数据库变更的低延迟感知。该连接器支持增量快照读取(Incremental Snapshot Read),即在初始全量快照读取后,仅同步增量变更。而在数据处理环节中,使用 Flink 作为流处理引擎,充分利用其事件驱动、精确一次(exactly-once)处理语义以及容错机制等特性,构建一个动态的高吞吐数据流处理管道,保证了数据处理的准确性和一致性;利用 Flink 的窗口函数和聚合函数,对数据进行实时聚合计算,优化查询性能。接入和处理后的数据采用数据湖格式进行存储,该格式针对数据湖环境进行了特定优化,支持高效的数据检索和快速更新,同时保持数据存储的低成本效益。

结合业务架构和实时特性等使用需求,可按需引入其他 CDC 的数据同步工具、消息管道等服务,确保数据的实时传递效率,实现秒级数据统计分析。基于流计算框架构建流数据分层处理和数据流转,复用传统数仓中对数据分层处理的经验、架构和部分算子。

如何在现有的数仓架构基础上使用新技术,避免对已有架构的颠覆性改造,并能以极低的成本进行架构演进,是用户一直追寻的路径。

本方案的优势在于其对现有数据仓库和任务流的高度兼容性,使得改造成本相对较低允许企业通过软件层面的优化实现向近实时数据湖仓的转型,而无需进行大规模的硬件投资或系统重构。

以下是几个实际改造的案例,展示了如何通过软件层面的低成本改造,实现显著的性能提升:

数据集成层升级

在基于数据仓库架构基础上,通过集成 Flink 作为数据集成层,对接企业内各业务系统的数据源(数据库/存储系统),并动态处理实时产生的数据流,复用现有的数据存储系统及设施。例如,某电商平台在不更换数据库的情况下,扩展并利用 Flink 捕获实时交易数据,并直接集成到数据湖仓中,为电商业务增加了实时调度和运营策略的业务支撑。

任务流自动化改造:

基于即有数据分析架构,通过引入 Flink 的事件驱动架构,对现有的任务流进行自动化改造,减少对传统批处理任务调度(T+1)的依赖,从天级更新提升至分钟级数据更新。例如,某制造企业的数据分析团队利用 Flink 替代原有的定时批处理作业,实现厂区产能数据的实时监控和分析。

存储格式适配:

将现有的数据存储(如 Hive 表或 Parquet 文件)适配到 Paimon 格式,在复用 Hive元数据的基础上,对数据表进行湖表格式的改造,继续使用 HDFS 存储系统,不仅提高了数据读写效率,支持了数据更新特性,而且数据分析工作流并未进行任何调整。例如,某物流公司在保留现有存储硬件的基础上,将订单数据迁移到 Paimon 格式,以支持更快速的查询和分析。

CDC 技术集成:

利用 Flink CDC Connectors,在不改变现有数据库架构的前提下,实时捕获数据库的变更数据变化,并实时进行数据统计与聚合。例如,某金融机构在不修改数据库配置的情况下,实时同步交易流水数据到数据湖仓,进而扩展实现了实时风控分析管理能力。

查询性能优化:

通过将 Paimon 作为数据湖存储格式,可以在不更换现有查询引擎的情况下,提升数据查询性能。例如,一个在线广告平台可以在保留现有查询工具的基础上,通过 Paimon 快速响应广告投放效果的实时分析需求。

现实中,企业往往受限于现有技术架构的固化约束,以及海量的数据分析任务改造成本压力,通过上面的客户改造案例,不难看出,本方案的改造成本相对还是较低的,主要因为方案依赖于分析业务软件架构的扩展升级和优化,并不对硬件投资或对架构的大规模重构。这种以软件升级为核心的改造策略,不仅经济高效,而且能够快速实现,帮助企业迅速对近实时数据处理的能力拓展,进一步夯实现有系统的稳定性和可靠性。

03

方案优势

站在企业数据处理未来的角度,优刻得推出的流式数据湖仓解决方案,不仅提供实时数据处理所需的高效性和灵活性,还确保了数据存储的经济性和可扩展性。借助 Flink 和 Paimon 以及其他流批一体相关生态等先进技术,企业可以实现从数据摄取、处理到存储和分析的全流程自动化,大幅提高决策效率。这套方案,不仅是一次技术上的突破,更是一次业务上的飞跃,它将助力企业把握每一个数据驱动的机会,是企业构建高效、灵活数据湖仓的理想选择

以下是该方案的关键优势:

实时性与低延迟查询:

实现秒级数据变更传递,将传统数据仓库的延时从数小时甚至数天缩短至分钟级,为决策者提供了即时的数据洞察和决策支撑。

高效的数据更新与维护:

Paimon 的 LSM 树与增量数据机制,确保大数据量更新的同时快速响应更新,Upsert 操作极大的简化了复杂低效的数据覆写操作。

简化的数据处理流程:

利用 Flink SQL 实现 ETL 流程,简化了数据处理的复杂性。ODS、DWD 和 DWS 层的数据统一存储于 Paimon 中,减少了数据流转的复杂性,提升了数据处理的效率。

灵活的数据合并策略:

数据湖 Paimon 灵活的数据合并机制,包括去重、部分更新和预聚合等策略,允许用户通过 merge-engine 参数进行灵活配置,根据应用场景精确控制数据合并行为。

全面的增量数据生成:

通过配置参数优化和调整,为各种输入数据流定制合适的增量数据处理策略。无论是对历史数据不敏感的系统,还是需要快速响应数据变化的应用,或是对数据延迟更新容忍度较高的场景。

优化的数据湖存储:

数据湖存储格式,支持主流且丰富的开源引擎,如 Flink、Spark、StarRocks、Doris 和 Trino 等,实现数据湖与数据仓库的无缝集成。

高效的数据同步与查询:

利用 Flink CDC 和 Paimon 的变更日志流读功能,实现数据库的实时同步和增量快照,为实时分析和批式分析提供了强大的支持。

04

实践场景

本实践案例以一家电商平台为背景,通过建立流式数据湖仓,成功实现了数据的高效处理和清洗,并提供了数据查询服务,助力电商平台实现数据的即时处理与深度洞察。利用 Flink MySQL CDC 技术,从 MySQL 中实时捕获数据流,借助 Flink 和 Spark 强大的流数据处理能力,对收集到的数据进行快速清洗与分析,从而在订单管理、库存监控、用户行为分析、实时定价、市场动态监测、广告投放优化等多个业务场景中取得显著成效,确保实时数据价值能够在关键时刻发挥其作用。这不仅提升了用户体验,还增强了市场响应速度,为电商平台的智能化转型提供了坚实基础。通过该技术的落地实践,推动电商平台以数据驱动决策,实现个性化推荐,优化营销策略,最终推动业务增长。

该场景支持复用传统数仓的数据分层架构,满足了业务报表查询(如交易监控、用户行为分析、用户标签画像)和个性化推荐等多种应用需求。以 HDFS 或以及新型高性能对象存储为基础构建数据湖,统一数据资源库。结合元数据管理工具,为整套平台提供数据质量和一致性保障,准确性大幅提升。

本实践案例通过 USDP 大数据平台获得一站式环境承载:

1、StreamPark-2.1.3

2、Flink-1.16.3

3、Paimon-0.7.0

4、StarRocks-3.2.3

05

场景设计

本场景包含以下几个部分组成:

1.数据源端:通过 StreamPark 开发平台执行 Flink DataGen 任务以产生业务模拟数据。

首先,创建名为 `order_db` 的电商订单的 MySQL 数据库,并在其中构建三个电商业务表:订单表(orders)、订单支付表(orders_pay)和商品类别表(product_catalog),随后使用 Flink DataGen 工具模拟用户生成各表数据,并将这些数据存储到 MySQL 数据库中。

在 MySQL 创建数据库和表:

-- 创建订单库CREATE DATABASE IF NOT EXISTS order_db;
USE order_db;
-- 订单表CREATE TABLE `orders` ( order_id bigint primary key, user_id bigint, shop_id bigint, product_id bigint, buy_fee bigint, create_time TIMESTAMP(3), update_time TIMESTAMP(3), state int);
-- 订单支付表CREATE TABLE `orders_pay` ( pay_id bigint primary key, order_id bigint, pay_platform int, create_time TIMESTAMP(3));
-- 商品类别表CREATE TABLE `product_catalog` ( product_id bigint primary key, catalog_name varchar(50), create_time TIMESTAMP(3));

用 Flink SQL 生成商品数据:

-- 商品CREATE TABLE product_catalog ( product_id bigint PRIMARY KEY, catalog_name varchar(50), create_time TIMESTAMP(3)) WITH ( 'connector' = 'datagen',
-- product_id: 模拟有限数量的商品 'fields.product_id.kind'='sequence', 'fields.product_id.start'='202403000001', 'fields.product_id.end'='202403002000',
-- catalog_name: 随机生成 'fields.catalog_name.kind'='random', 'fields.catalog_name.length'='6');
-- 定义 MySQL 表以接收生成的商品数据CREATE TABLE product_catalog_sink ( product_id bigint PRIMARY KEY NOT ENFORCED, catalog_name varchar(50), create_time TIMESTAMP(3)) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://usdp.ucloudstack.com:3306/order_db', 'table-name' = 'product_catalog', 'username' = 'username', 'password' = 'password');
-- 将商品类别数据写入 MySQLINSERT INTO product_catalog_sinkSELECT * FROM product_catalog;

用 Flink SQL 任务生成订单数据:

-- 订单CREATE TABLE orders ( order_id bigint primary key, user_id bigint, shop_id bigint, product_id bigint, buy_fee bigint, create_time TIMESTAMP(3), update_time TIMESTAMP(3), state int) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', -- 控制每秒生成的行数
-- order_id: 递增的整数 'fields.order_id.kind'='sequence', 'fields.order_id.start'='10001', 'fields.order_id.end'='30000', -- user_id: 随机生成,模拟有限数量的用户 'fields.user_id.kind'='random', 'fields.user_id.min'='202401000001', 'fields.user_id.max'='202401002000',
-- shop_id: 随机生成,模拟有限数量的商店 'fields.shop_id.kind'='random', 'fields.shop_id.min'='202402000001', 'fields.shop_id.max'='202402002000',
-- product_id: 随机生成,模拟有限数量的商品 'fields.product_id.kind'='random', 'fields.product_id.min'='202403000001', 'fields.product_id.max'='202403002000',
-- buy_fee: 随机生成购买费用 'fields.buy_fee.kind'='random', 'fields.buy_fee.min'='1', 'fields.buy_fee.max'='10000',
-- state: 随机生成 'fields.state.kind'='random', 'fields.state.min'='0', 'fields.state.max'='1');
-- 支付CREATE TABLE orders_pay ( pay_id bigint primary key, order_id bigint, pay_platform int, create_time TIMESTAMP(3)) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', -- 控制每秒生成的行数
-- pay_id: 递增的整数 'fields.pay_id.kind'='sequence', 'fields.pay_id.start'='2010001', 'fields.pay_id.end'='2030000',
-- order_id: 递增的整数 'fields.order_id.kind'='sequence', 'fields.order_id.start'='10001', 'fields.order_id.end'='30000',
-- pay_platform: 随机生成 'fields.pay_platform.kind'='random', 'fields.pay_platform.min'='1', 'fields.pay_platform.max'='9');
-- 定义 MySQL 表以接收生成的订单数据CREATE TABLE orders_sink ( order_id bigint primary key, user_id bigint, shop_id bigint, product_id bigint, buy_fee bigint, create_time TIMESTAMP(3), update_time TIMESTAMP(3), state int) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://usdp.ucloudstack.com:3306/order_db', 'table-name' = 'orders', 'username' = 'username', 'password' = 'password');
-- 定义 MySQL 表以接收生成的支付数据CREATE TABLE orders_pay_sink ( pay_id bigint primary key, order_id bigint, pay_platform int, create_time TIMESTAMP(3)) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://usdp.ucloudstack.com:3306/order_db', 'table-name' = 'orders_pay', 'username' = 'username', 'password' = 'password');
-- 将订单数据写入 MySQLINSERT INTO orders_sinkSELECT * FROM orders;
-- 将支付数据写入 MySQLINSERT INTO orders_pay_sinkSELECT * FROM orders_pay;
2.构建 ODS 层:利用 Flink 的实时处理能力,将 MySQL 中的订单表、订单支付表和商品类别表实时同步到 HDFS 上,并采用 Paimon 格式存储,形成操作数据存储(ODS)层。使用 paimon-flink-action 执行整库数据同步。

mysql_sync_database--warehouse hdfs://cluster-c20789/cluster-c20789/user/hive/warehouse--database order_db--mysql_conf hostname=usdp.ucloudstack.com--mysql_conf username=username--mysql_conf password=password--mysql_conf database-name=order_db--table_conf bucket=-1--table_conf changelog-producer=input--table_conf sink.parallelism=4


该数据同步方式,首先对该库中的表数据执行全量同步,并持续监听源端数据库,当有新数据产生时,对新数据变化进行实时且增量地同步到目标库表。

3.构建 DWD 层主题宽表:在这个过程中,利用 Paimon 的部分数据更新机制,通过 Flink SQL 任务将订单表(orders)、商品类别表(product_catalog)和订单支付表(orders_pay)合并成主题宽表,形成 DWD 明细数据层。

具体操作则是通过维度表关联将订单表与商品类别表合并,并将结果与订单支付表结合,最终写入订单主题宽表(dwd_orders)。Paimon 的部分更新机制允许根据 order_id 更新 orders 和 orders_pay 表的数据,实现数据的扩宽。这个过程实时生成 DWD 层的宽表,并输出数据变更记录(Changelog)。


CREATE CATALOG paimon_catalog WITH ( 'type'='paimon', 'warehouse'='hdfs://cluster-c20789/cluster-c20789/user/hive/warehouse');
USE CATALOG paimon_catalog;use order_db;
CREATE TABLE IF NOT EXISTS dwd_orders ( order_id BIGINT, order_user_id BIGINT, order_shop_id BIGINT, order_product_id BIGINT, order_product_catalog_name STRING, order_fee BIGINT, order_create_time TIMESTAMP(3), order_update_time TIMESTAMP(3), order_state INT, pay_id BIGINT, pay_platform INT, pay_create_time TIMESTAMP(3), PRIMARY KEY (order_id) NOT ENFORCED) WITH ( 'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表 'partial-update.ignore-delete' = 'true', -- 忽略 DELETE 数据,避免运行报错 'changelog-producer' = 'lookup' -- 使用 lookup 增量数据产生机制以低延时产出变更数据);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; -- 减轻检查点长尾的影响。SET 'table.exec.sink.upsert-materialize' = 'NONE'; -- 消除无用的 SinkMaterialize 算子。
-- Paimon结果表在每次检查点完成之后才会正式提交数据。-- 此处将检查点间隔缩短为 10s,是为了更快地看到结果。-- 在生产环境下,系统检查点的间隔与两次系统检查点之间的最短时间间隔根据业务对延时要求的不同,一般设置为1分钟到10分钟。SET 'execution.checkpointing.interval' = '10s';SET 'execution.checkpointing.min-pause' = '10s';
INSERT INTO dwd_orders-- orders 表提供主要字段SELECT o.order_id, o.user_id, o.shop_id, o.product_id, dim.catalog_name, o.buy_fee, o.create_time, o.update_time, o.state, CAST(NULL AS BIGINT) AS pay_id, CAST(NULL AS INT) AS pay_platform, CAST(NULL AS TIMESTAMP(3)) AS pay_create_timeFROM orders o LEFT JOIN product_catalog dim ON o.product_id = dim.product_idUNION ALL-- orders_pay 表提供支付相关字段SELECT order_id, CAST(NULL AS BIGINT) AS user_id, CAST(NULL AS BIGINT) AS shop_id, CAST(NULL AS BIGINT) AS product_id, CAST(NULL AS STRING) AS order_product_catalog_name, CAST(NULL AS BIGINT) AS order_fee, CAST(NULL AS TIMESTAMP(3)) AS order_create_time, CAST(NULL AS TIMESTAMP(3)) AS order_update_time, CAST(NULL AS INT) AS order_state, pay_id, pay_platform, create_timeFROM orders_pay;



4.构建 DWM 层:通过 Flink SQL 作业,将 dwd_orders 表的数据迁移至 dwm_users_shops 表。同时,利用 Paimon 的预聚合机制对 order_fee 字段进行求和,以计算每位用户在各个商户的总消费额。此外,通过对常数 1 进行求和,用来统计用户在各个商户的消费次数。

CREATE CATALOG paimon_catalog WITH ( 'type'='paimon', 'warehouse'='hdfs://cluster-c20789/cluster-c20789/user/hive/warehouse');
USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS order_db;use order_db;
-- 为了同时计算用户视角的聚合表以及商户视角的聚合表,另外创建一个以用户 + 商户为主键的中间表。CREATE TABLE IF NOT EXISTS dwm_users_shops ( user_id BIGINT, shop_id BIGINT, ds STRING, payed_buy_fee_sum BIGINT COMMENT '当日用户在商户完成支付的总金额', pv BIGINT COMMENT '当日用户在商户购买的次数', PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED) WITH ( 'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表 'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果 'fields.pv.aggregate-function' = 'sum', -- 对 pv 的数据求和产生聚合结果 'changelog-producer' = 'lookup', -- 使用 lookup 增量数据产生机制以低延时产出变更数据 -- dwm 层的中间表一般不直接提供上层应用查询,因此可以针对写入性能进行优化。 'file.format' = 'avro', -- 使用 avro 行存格式的写入性能更加高效。 'metadata.stats-mode' = 'none' -- 放弃统计信息会增加 OLAP 查询代价(对持续的流处理无影响),但会让写入性能更加高效。);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';SET 'execution.checkpointing.min-pause' = '10s';
INSERT INTO dwm_users_shopsSELECT order_user_id, order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, order_fee, 1 -- 一条输入记录代表一次消费FROM dwd_ordersWHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;


5.构建 DWS 层:在构建数据仓库服务层(DWS)的过程中,将使用 Flink 处理宽表的实时数据变更,并利用 Paimon 的预聚合功能来生成用户-商户聚合中间表(dwm_users_shops)。

Flink SQL 作业会将 dwm_users_shops 表中的数据汇总到 dws_users 表。通过 Paimon 的预聚合合并机制,对 payed_buy_fee_sum 字段进行求和,以计算每一位用户的总消费额。同时,数据也会被汇总到 dws_shops 表,并继续对 payed_buy_fee_sum 字段求和,得出商户的总流水。此外,还会统计支付购买费用的用户数量(通过字段 1 求和)以及总消费人次数(通过 pv 字段求和)。这一数据处理体系为电商平台的业务决策提供了坚实的数据基础。

CREATE CATALOG paimon_catalog WITH ( 'type'='paimon', 'warehouse'='hdfs://cluster-c20789/cluster-c20789/user/hive/warehouse');
USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS order_db;use order_db;
-- 用户维度聚合指标表。CREATE TABLE IF NOT EXISTS dws_users ( user_id BIGINT, ds STRING, payed_buy_fee_sum BIGINT COMMENT '当日完成支付的总金额', PRIMARY KEY (user_id, ds) NOT ENFORCED) WITH ( 'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表 'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- 对 payed_buy_fee_sum 的数据求和产生聚合结果 -- 由于 dws_users 表不再被下游流式消费,因此无需指定增量数据产生机制);
-- 商户维度聚合指标表。CREATE TABLE IF NOT EXISTS dws_shops ( shop_id BIGINT, ds STRING, payed_buy_fee_sum BIGINT COMMENT '当日完成支付总金额', uv BIGINT COMMENT '当日不同购买用户总人数', pv BIGINT COMMENT '当日购买用户总人次', PRIMARY KEY (shop_id, ds) NOT ENFORCED) WITH ( 'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表 'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果 'fields.uv.aggregate-function' = 'sum', -- 对 uv 的数据求和产生聚合结果 'fields.pv.aggregate-function' = 'sum' -- 对 pv 的数据求和产生聚合结果 -- 由于 dws_shops 表不再被下游流式消费,因此无需指定增量数据产生机制);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';SET 'execution.checkpointing.min-pause' = '10s';
-- 与 dwd 不同,此处每一条 INSERT 语句写入的是不同的 Paimon 表,可以放在同一个作业中。BEGIN STATEMENT SET;
INSERT INTO dws_usersSELECT user_id, ds, payed_buy_fee_sumFROM dwm_users_shops;
-- 以商户为主键,部分热门商户的数据量可能远高于其他商户。-- 因此使用 local merge 在写入 Paimon 之前先在内存中进行预聚合,缓解数据倾斜问题。INSERT INTO dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */SELECT shop_id, ds, payed_buy_fee_sum, 1, -- 一条输入记录代表一名用户在该商户的所有消费 pvFROM dwm_users_shops;
END;
跟踪业务数据的变化

在电商平台的业务运营中,数据的实时性和准确性对于业务决策至关重要的。通过构建流式数据湖仓,实现对数据架构的能力提升,降低数据处理成本的同时提升了数据处理效率。批流融合的技术应用,极大的提升了数据的实时价值挖掘效率,为业务增长提供了强有力的数据支撑。

通过对用户消费行为分析和实时用户付费数据,结合用户兴趣点及周期性购买需求,平台可以构建用户消费行为模型,持续预测用户未来消费趋势,实现精准营销。通过实时销售数据使商户能够及时调整销售策略,进而调整促销活动、库存分配及供应链优化。通过结合订单明细和商品销售数据,优化商品推荐系统,实现精准的商品推荐,提升用户购买意愿、复购意愿,实现平台销售额的大幅提升。借助与实时数据分析技术能力,帮助平台快速捕捉市场动态,提升大促活动中用户消费额度,并结合节日促销、季节性商品需求变化等,及时调整营销计划。这些实时统计和深入分析技术,帮助电商平台能够实现数据驱动的决策,实现业务增长和利润最大化。

如下数据查询场景中,通过使用 Flink SQL 或者其他查询引擎(例如 Hive、StarRocks、Trino)查看用户付费、店铺销售额度统计、商品排名、订单明细、数据报表等业务数据的变化。

用户付费情况实时查询

-- 查看 dws_users 表数据SELECT * FROM dws_users ORDER BY user_id;
实时关注用户单日消费额度变化,为个性化营销活动提供数据支持。通过实时数据分析,平台能够及时发现并响应用户消费行为的变化,实现动态定价、优惠推送、精准推荐。

商户销售情况明细实时查询

-- 查看 dws_shops 表数据SELECT * FROM dws_shops ORDER BY shop_id;

实时洞察各商铺销售额变化以及活跃用户数、购买人次等关键指标,结合这些数据,可进一步分析已消费用户群体、以及与这些商品相关的购买人群特征,为商户管理提供决策支持。利用实时销售数据,商户可以优化商品推荐逻辑,快速调整库存和营销策略,优化销售模式,提升销售业绩。

商户销售额排名分析

-- 查询 2024 年 4 月 12 日交易额前三高的商户SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum FROM dws_shopsWHERE ds = '20240412' ORDER BY rnLIMIT 3;

通过关注 Top 级商户在特定日期的销售额等数据,深入分析其成功因素。结合分析排名,进一步探索这些商户的用户特征和购买行为,为精准营销和商品推荐提供依据。

订单明细深入分析


-- 查询某个客户 2024 年 4 月特定支付平台支付的订单明细SELECT * FROM dwd_orders WHERE order_create_time >= '2024-04-01 00:00:00' AND order_create_time < '2024-05-01 00:00:00' AND order_user_id = 202401000063 AND pay_platform = 2ORDER BY order_create_time;

深入分析订单明细,以及特定支付平台的用户订单行为,为支付渠道优化提供数据支持。进一步了解用户消费偏好,为产品推荐算法提供输入,提升用户满意度和忠诚度。

商品数据综合报表


-- 查询 2024 年 4 月内每个品类的订单总量和订单总金额SELECT DATE_FORMAT(order_create_time, 'yyyyMMdd') AS order_create_date, order_product_catalog_name, COUNT(*), SUM(order_fee)FROM dwd_ordersWHERE order_create_time >= '2024-04-01 00:00:00' and order_create_time < '2024-05-01 00:00:00'GROUP BY order_create_date, order_product_catalog_nameORDER BY order_create_date, order_product_catalog_name;


按商品各品类的订单总量和总金额统计报表,为商品管理和采购决策提供数据支持。结合报表数据,分析商品销售趋势,预测市场动向,为新品开发和库存管理提供科学依据。

06

总结

对于电商平台而言,实时数据的处理和分析关乎用户体验、销售业绩和市场竞争力。本案例以一家领先的电商平台为例子,展示了如何通过实践流式数据湖仓解决方案,将实时数据价值转化为业务增长引擎。而在传统数据仓库模式下,数据更新周期长,无法满足快速变化的业务需求。该电商平台面临的挑战是如何在保证数据准确性的同时,实现数据的实时处理和分析。

为此,该电商平台客户引入了优刻得智能大数据平台 USDP,通过 USDP 一站式承载数据仓库业务的同时,扩展实践基于 Flink 实时数据处理引擎和 Paimon 数据湖仓解决方案,成功实现了实时订单处理,显著提升了客户满意度和订单处理效率。同时,实时数据流的应用使得库存管理更加精准,有效预防了超卖,并优化了库存周转率和供应链优化。围绕用户深入分析购买行为,抓住用户浏览瞬间,通过精准商品推荐,促进用户下单。

扫码咨询优刻得USDP大数据产品及方案

 点击『阅读原文』并获得产品资料~

继续滑动看下一个
DataFunTalk
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存