对于初学者来说,想要基于 Flink + Hologres 搭建实时数仓可能会感到无从下手,因为 Hologres 文档较为繁多,有时候难以找到切入点。为了帮助大家更好地理解和更快速地掌握这一过程,本文将从初学者的角度出发,详细演示如何从零开始搭建实时数仓。
原理概述
在开始演示如何通过 Flink 和 Hologres 搭建实时数仓之前,先简要介绍实时数仓的方案架构,以帮助你更好地理解实时数仓的原理。
如上图所示,这里的实时数仓分为 ODS 层、DWD 层、DWS 层。每一层的数据都存储在 Hologres 中(注:在 Hologres 中使用 Schema 来隔离不同的分层)。通过 Flink CDC 或其他数据集成工具,将 MySQL、Kafka、SLS 等数据源的数据采集到数仓里的 ODS 层。然后 Flink 通过订阅 ODS 层表的 Binlog 进行数据加工,进一步形成数据仓库的 DWD 层,并再次写入 Hologres。之后,Flink 继续订阅 DWD 层表的 Binlog,通过计算生成数据仓库的 DWS 层,并再次写入 Hologres。最后,由 Hologres 对外提供应用查询服务。
这种架构看起来与离线数据仓库相似,但它有一个独特的特点,即通过 Flink 作业串联,数据可以实现全链路实时流动。与之前的 Kappa 架构相比,此架构将每一层数据存储在 Hologres 中,而不再使用 Kafka。这样,每一层的数据都可以实时查询,实现了全链路实时可查的功能。此外,Hologres 的每一层数据都支持高效的写入和更新,解决了传统实时数仓解决方案中间层数据难以查询、更新和修正的问题。
在这个架构中,Hologres 主要有以下两大作用:
- 数据存储:Hologres 作为一个 OLAP 数据库,确保实时数仓的每一层数据都能够被查询和分析。
- 消息队列:由于实时数仓中的数据需要全链路实时流动,存储本身就需要充当消息队列,支持流式写入和读取,并且必须具备一定的顺序性。因此,Hologres 提供了 Binlog 能力,作为流式计算的上游,驱动 Flink 进行实时计算。
示例背景描述
本文以电商平台为例,通过 Flink CDC 实时采集业务系统的订单数据(MySQL)到 Hologres 中,然后对订单数据进行统计,实时计算当天的总 GMV、总有效 GMV。
GMV 指标通常称为网站成交金额,属于电商平台企业成交类指标,主要指拍下订单的总金额,包含付款和未付款两部分。
有效 GMV 指标是指付款成功的 GMV。
环境准备
- 开通 Hologres 服务;
- 开通实时计算 Flink 版;
- MySQL 数据库(用于模拟业务系统的数据源)。
注:Hologres、Flink、MySQL 网络互通。
MySQL数据准备
创建 MySQL 数据库:test:
CREATE DATABASE `test` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */
创建订单表 tb_order:
CREATE TABLE `tb_order` (
`order_id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '订单ID,主键',
`order_no` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '订单编号',
`order_status` int(11) NOT NULL COMMENT '订单状态,0-待支付,1-支付成功,2-支付失败',
`item_id` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '商品ID',
`user_id` int(11) NOT NULL COMMENT '买家ID',
`seller_id` int(11) NOT NULL COMMENT '卖家ID',
`amount` int(11) NOT NULL COMMENT '金额',
`num` int(11) NOT NULL COMMENT '数量',
`create_time` datetime NOT NULL COMMENT '创建时间',
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='订单表'
往订单表 tb_order 写入两条记录:
INSERT INTO `test`.`tb_order` ( `order_id`, `order_no`, `order_status`, `item_id`, `user_id`, `seller_id`, `amount`, `num`, `create_time` )
VALUES
( 28505748, '230103254762075600734', 1, 'AvvDdmMcK2OrzMMGHY4', 103234, 943133, 16800, 1, '2023-01-03 01:32:02' );
INSERT INTO `test`.`tb_order` ( `order_id`, `order_no`, `order_status`, `item_id`, `user_id`, `seller_id`, `amount`, `num`, `create_time` )
VALUES
( 33990167, '230103211146535712213', 2, '4NNmxMwt8AO9wxPaFpY', 34875893, 8380271, 164901, 1, '2023-01-03 18:34:45' );
Hologres准备
在 Hologres 中,我们一般通过 Schema 来实现层级的划分。如下图所示,在数据库 rtdw 中创建三个 Schema 来隔离 ODS 层、DWD 层和 DWS 层。这种方法的优点在于能够更好地组织和管理数据,同时提高数据处理的效率和准确性。
具体操作步骤如下(因为是面向初学者,这里每一步操作都会尽量详细):
- 登录阿里云:
- 成功登录后,点击页面左上角的“控制台”入口:
- 进入控制台后,找到 Hologres 入口:
- 进入 Hologres 概览页,注意选择实例所在的地区,并点击“前往HoloWeb”链接:
- HoloWeb 页面如下:
- 进到 HoloWeb 页面后,选择 Hologres 实例,右键,进行登录:
- 在弹窗中选择数据库,然后点击“确认”按钮:
- 登录 Hologres 实例后,界面如下:
- 登录实例后,点击页面上方的“SQL编辑器”,新建三个 Schema,名称分别为 ods,dwd,dws:
创建 Schema SQL 语句:
--创建ods schema。
CREATE SCHEMA ods;
--创建dwd schema。
CREATE SCHEMA dwd;
--创建dws schema。
CREATE SCHEMA dws;
- 通过 HoloWeb 的 SQL 编辑器,在 ods 的 Schema 下新建表 ods.ods_tb_order,该表用于存放从业务订单表同步过来的数据,表的 DDL 如下(注意该表需要启用 binlog):
BEGIN;
CREATE TABLE ods.ods_tb_order (
order_id BIGINT NOT NULL,
order_no TEXT NOT NULL,
order_status BIGINT NOT NULL,
item_id TEXT NOT NULL,
user_id BIGINT NOT NULL,
seller_id BIGINT NOT NULL,
amount BIGINT NOT NULL,
num BIGINT NOT NULL,
create_time TIMESTAMP WITH TIME ZONE NOT NULL,
PRIMARY KEY ( order_id )
);
CALL set_table_property ( 'ods.ods_tb_order', 'orientation', 'column' );
CALL set_table_property ( 'ods.ods_tb_order', 'storage_format', 'orc' );
CALL set_table_property ( 'ods.ods_tb_order', 'bitmap_columns', 'order_no,item_id' );
CALL set_table_property ( 'ods.ods_tb_order', 'dictionary_encoding_columns', 'order_no:auto,item_id:auto' );
CALL set_table_property ( 'ods.ods_tb_order', 'distribution_key', 'order_id' );
CALL set_table_property ( 'ods.ods_tb_order', 'segment_key', 'create_time' );
CALL set_table_property ( 'ods.ods_tb_order', 'table_storage_mode', 'any' );
CALL set_table_property ( 'ods.ods_tb_order', 'time_to_live_in_seconds', '3153600000' );
CALL set_table_property ( 'ods.ods_tb_order', 'binlog.level', 'replica' ); -- 启用binlog
CALL set_table_property ( 'ods.ods_tb_order', 'binlog.ttl', '172800' ); -- binlog存留时间,单位:秒
COMMENT ON TABLE ods.ods_tb_order IS '订单表';
COMMENT ON COLUMN ods.ods_tb_order.order_id IS '订单ID,主键';
COMMENT ON COLUMN ods.ods_tb_order.order_no IS '订单编号';
COMMENT ON COLUMN ods.ods_tb_order.order_status IS '订单状态,0-待支付,1-支付成功,2-支付失败';
COMMENT ON COLUMN ods.ods_tb_order.item_id IS '商品ID';
COMMENT ON COLUMN ods.ods_tb_order.user_id IS '买家ID';
COMMENT ON COLUMN ods.ods_tb_order.seller_id IS '卖家ID';
COMMENT ON COLUMN ods.ods_tb_order.amount IS '金额';
COMMENT ON COLUMN ods.ods_tb_order.num IS '数量';
COMMENT ON COLUMN ods.ods_tb_order.create_time IS '创建时间';
END;
- 在 dws 的 Schema 下新建表 dws.dws_order_day,该表用于存放通过 Flink 实时计算的数据指标:当日总 GMV、总有效 GMV,表的 DDL 如下:
BEGIN;
CREATE TABLE dws.dws_order_day (
dt TEXT NOT NULL,
gmv BIGINT NOT NULL,
valid_gmv BIGINT NOT NULL,
update_time TIMESTAMP WITH TIME ZONE NOT NULL,
PRIMARY KEY ( dt )
);
CALL set_table_property ( 'dws.dws_order_day', 'orientation', 'column' );
CALL set_table_property ( 'dws.dws_order_day', 'storage_format', 'orc' );
CALL set_table_property ( 'dws.dws_order_day', 'bitmap_columns', 'dt' );
CALL set_table_property ( 'dws.dws_order_day', 'dictionary_encoding_columns', 'dt:auto' );
CALL set_table_property ( 'dws.dws_order_day', 'distribution_key', 'dt' );
CALL set_table_property ( 'dws.dws_order_day', 'segment_key', 'update_time' );
CALL set_table_property ( 'dws.dws_order_day', 'table_storage_mode', 'any' );
CALL set_table_property ( 'dws.dws_order_day', 'time_to_live_in_seconds', '3153600000' );
COMMENT ON TABLE dws.dws_order_day IS '每日订单指标';
COMMENT ON COLUMN dws.dws_order_day.dt IS '日期,格式:yyyyMMdd,主键';
COMMENT ON COLUMN dws.dws_order_day.gmv IS 'GMV';
COMMENT ON COLUMN dws.dws_order_day.valid_gmv IS '有效GMV';
COMMENT ON COLUMN dws.dws_order_day.update_time IS '更新时间';
END;
至此,关于 Hologres 的准备工作已经完毕。
使用Flink进行数据处理
我们通常会通过基于 Catalog 功能进行 Flink SQL 作业开发,以便提高开发效率和保证数据的正确性。也就是说,在开发 Flink SQL 作业之前,需要在工作空间中注册源端和目标端的 Catalog。本示例的源端 Catalog 是指 MySQL Catalog,目标端 Catalog 是指 Hologres Catalog。
Catalog 提供了元数据信息,例如数据库、表、分区、视图和函数,以及访问存储在数据库或其他外部系统中的数据所需的信息。
数据处理中最关键的一个方面是管理元数据。它可以是临时元数据,如临时表,或者是注册在表环境中的 UDF。也可以是永久元数据,比如 Hive Metastore 中的元数据。Catalog 提供了一个统一的 API 来管理元数据,并使其可以从 Table API 和 SQL 查询中访问。
目录使用户能够引用其数据系统中的现有元数据,并自动映射到 Flink 的相应元数据。例如,Flink 可以自动将 JDBC 表映射到 Flink 表,用户无需手动在 Flink 中重新编写 DDL。Catalog 极大地简化了使用 Flink 与用户现有系统开始工作所需的步骤,并大大增强了用户体验。
具体操作步骤如下(因为是面向初学者,这里每一步操作同样会尽量详细):
- 与 Hologres 操作步骤 1~2 一样,成功登录后,点击页面左上角的“控制台”入口,进入控制台后,再找到“实时计算Flink版”入口:
- 进入“实时计算Flink版”概览页后,找到对应的 Flink 实例的“控制台”入口:
- 进入“系统概览”页后,再点“SQL开发”链接:
- 在“作业草稿”中新建三个文件夹:ods、dwd、dws(与 Hologres 层级划分保持一致),分别用于存放各个层级的实时计算作业。同时,为便于管理 Catalog SQL 和退役作业,也可以新建两个文件夹 catalog 和“退役作业”:
- 在 catalog 文件夹下新建一个名为 hologres-catalog 的 SQL 作业,输入创建 Hologres Catalog 的命令(修改目标参数取值),选中代码片段后单击左侧代码行上的“运行”:
创建 Hologres Catalog 的 SQL 命令如下:
--********************************************************************--
-- Author: johnson
-- Created Time: 2024-01-30 15:12:37
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
CREATE CATALOG `rtdw` WITH (
'type' = 'hologres',
'endpoint' = '***************.hologres.aliyuncs.com:80',
'username' = '***************',
'password' = '***************',
'dbname' = 'rtdw',
'binlog' = 'true', -- 创建catalog时可以设置源表、维表和结果表支持的with参数,之后在使用此catalog下的表时会默认添加这些默认参数。
'sdkMode' = 'jdbc', -- 推荐使用jdbc模式。
'cdcmode' = 'true',
'connectionpoolname' = 'the_conn_pool',
'ignoredelete' = 'true', -- 宽表merge需要开启,防止回撤。
'partial-insert.enabled' = 'true', -- 宽表merge需要开启此参数,实现部分列更新。
'mutateType' = 'insertOrUpdate', -- 宽表merge需要开启此参数,实现部分列更新。
'table_property.binlog.level' = 'replica', --也可以在创建catalog时传入持久化的hologres表属性,之后创建表时,默认都开启binlog。
'table_property.binlog.ttl' = '259200'
);
- 参考步骤 5,创建 MySQL Catalog:
--********************************************************************--
-- Author: johnson
-- Created Time: 2024-01-30 15:17:49
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
CREATE CATALOG mysqlcatalog WITH(
'type' = 'mysql',
'hostname' = '172.24.17.1',
'port' = '3306',
'username' = '*****',
'password' = '**************',
'default-database' = 'test'
);
- 接下来就是开发 Flink SQL 作业,首先使用 Flink CDC 同步业务系统数据至 Hologres 的 ODS 层。在 ods 文件夹下新建一个名为 mysql2holo_ods_tb_order 的 Flink SQL 作业,详细操作如下:
在查询脚本文本编辑区域,粘贴以下代码:
--********************************************************************--
-- Author: johnson
-- Created Time: 2024-01-30 22:52:21
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
USE CATALOG rtdw;
CREATE TABLE IF NOT EXISTS rtdw.`ods.ods_tb_order`
WITH ('jdbcWriteBatchSize' = '8', 'jdbcWriteFlushInterval' = '20')
AS TABLE mysqlcatalog.`test`.`tb_order`
/*+
OPTIONS(
'server-id'='6001-6050',
'scan.startup.mode'='earliest-offset'
)
*/
;
- 接下来是部署 Flink 作业,详细步骤如下:
- 如果作业部署失败,则根据返回的异常信息对作业进行修改。如果部署成功,则点页面左侧的“作业运维”导航,进入作业运维页面,找到你刚刚部署的作业,并点击作业的名称,进行资源配置(CPU 与内存的分配比例推荐为 1:4):
- 配置好资源后,点击页面右上角“启动”按钮即可启动作业。如果作业正常运行,你可以通过 Hologres 中查看表 ods.ods_tb_order 的数据。
最后是实时统计当天总 GMV、总有效 GMV 的作业开发。其开发与部署步骤同 Flink CDC 采集作业类似,这里不再赘述。
订单实时统计作业代码如下:
--********************************************************************--
-- Author: johnson
-- Created Time: 2024-01-31 01:34:01
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
USE CATALOG rtdw;
INSERT INTO rtdw.`dws.dws_order_day`
SELECT
DATE_FORMAT(create_time,'yyyyMMdd') AS dt
,SUM(amount) / 100 AS gmv
,SUM(amount) FILTER (WHERE order_status <> 4) / 100 AS valid_gmv
,CURRENT_TIMESTAMP AS update_time
FROM rtdw.`ods.ods_tb_order`
/*+ OPTIONS('jdbcScanFetchSize'='8','binlogBatchReadSize'='1') */
WHERE create_time IS NOT NULL
AND create_time >= CAST(CURRENT_DATE AS TIMESTAMP)
GROUP BY
DATE_FORMAT(create_time,'yyyyMMdd');
小结
本文简要概述了基于 Flink + Hologres 搭建实时数仓的架构原理,并通过电商平台实时统计 GMV 的作业,演示了在实时数仓开发中,Hologres 需要哪些操作,以及实时计算 Flink 版如何进行作业的开发与部署。