亚马逊AWS官方博客

EMR Flink-Hudi 实时分析系统成本优化

引言

在电子商务行业中,EMR、Flink 和 Kafka 的组合为实时库存管理和动态定价提供了强大的解决方案。通过 Kafka 实时捕获销售、库存和市场数据,Flink 能够迅速处理这些信息流,实现库存水平的即时更新和价格的动态调整。EMR 则提供了可扩展的计算资源,支持复杂的分析算法。这种实时数据处理能力使企业能够精确把握库存状况,避免过度库存或缺货情况,同时根据市场需求和竞争对手价格实时优化定价策略。结果是库存准确性显著提高,通常达到 95% 以上,而动态定价策略则可带来 5-10% 的收入增长。这不仅提升了运营效率,还增强了企业在快速变化的电商市场中的竞争力,为客户提供了更好的购物体验。

某大型电商客户当前的实时数据分析系统使用了以下技术栈:

– 数据源:RDS(关系型数据库服务)

– 数据采集:Flink CDC(变更数据捕获)

– 数据传输:Kafka

– 数据处理:EMR 上的 Flink(8 个 r6a.4xlarge 节点)

– 数据存储:Hudi(使用 Copy on Write 模式)

– 数据查询:StarRocks(通过 Multi-Catalog 关联 Hudi 外表)

在目前的架构中遇到的主要挑战包括:

  1. 数据延迟问题:Flink 直接写入 Hudi 时,数据延迟达到几个小时。客户目前通过 Kafka 中转后,延迟降至约 10 分钟,延迟依然过高。
  2. 架构复杂性:涉及多个中间件(Flink、Kafka、Hudi),增加了系统的复杂性。导致维护成本增加和系统管理难度提高。
  3. 资源利用:使用了 8 个 EMR Flink 节点,每个配置为 r6a.4xlarge,资源利用效率比较低。
  4. 查询性能 concerns:当前使用 Copy on Write (COW) 模式以应对高查询负载。在保证查询性能不下降的情况是否可以采用 Merge on Read (MOR) 模式。

针对客户当前的痛点,通过在架构简化和调整使用方式来满足客户针对性能提升与成本优化之间最佳平衡。架构改进计划:

目标:不改造数据链路的前提下,降低成本。

具体措施:通过 EMR Flink CDC 集群将数据直接从数据源写入 Hudi

POC 内容

1. 直接写入 Hudi:

  • 不通过 Kafka,直接从 RDS 和 Flink CDC 写入 Hudi,验证架构改进方案的性能和成本对比原方案。
  • POC 可以模拟 RDS 产生一些实时数据,数据规模大约在每日千万级新增数据,字段大约在 200-300 左右。

2. 测试指标:

  • Hudi 表中进行 ad-hoc 查询,目标可以查询到最新数据,延迟在分钟级。
  • 统计整体方案的成本,需要包含 CDC + 查询 workload 的综合成本。

环境准备

集群配置

集群 A 配置 用于实时入仓

配置项 详细信息
实例配置
主节点 1 × m5.xlarge
核心节点 4 × r6g.4xlarge
应用程序版本
Flink 1.18.1
Hadoop 3.3.6
Hive 3.1.3
集群状态 正在等待
运行时间 2 天 19 小时

集群 B 配置 用于 AD-Hoc 查询

配置项 详细信息
实例配置
主节点 1 × m5.xlarge
核心节点 1 × m5.xlarge
应用程序版本
Presto 0.285
Hadoop 3.3.6

依赖安装

下载并配置 Hudi 相关 JAR 包:

  • hudi-aws-0.15.0.jar
  • hudi-flink1.18-bundle-0.15.0-amzn-0.jar
  • 设置 S3 作为检查点存储路径:s3://***/ods/

Flink Session 配置

使用 flink-yarn-session 启动集群,参数包括:

  • 内存分配:-jm 2048 -tm 4096 -s 2
  • 检查点设置:RocksDB 状态后端、检查点间隔 60 秒、保留 5 个检查点。
  • 执行模式:EXACTLY_ONCE。
  • 进入 Flink SQL 交互界面:sql-client.sh -s {application id}
#准备环境
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-aws/0.15.0/hudi-aws-0.15.0.jar
sudo cp hudi-aws-0.15.0.jar /usr/lib/flink/lib/
sudo cp /usr/lib/hudi/hudi-flink1.18-bundle-0.15.0-amzn-0.jar /usr/lib/flink/lib/
checkpoints=s3://***/ods/

#创建flink session
flink-yarn-session -jm 2048 -tm 4096 -s 2 \
-D state.backend=rocksdb \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=${checkpoints} \
-D execution.checkpointing.interval=60000 \
-D state.checkpoints.num-retained=5 \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D state.backend.incremental=true \
-D execution.checkpointing.max-concurrent-checkpoints=1 \
-D rest.flamegraph.enabled=true \
-d \
-t /etc/hive/conf/hive-site.xml

#基于session id进入flink-sql交互页面
/usr/lib/flink/bin/sql-client.sh -s {application id} 

测试数据生成

使用 flink-sql,配合 hudi data-gen 生成实时数据

CREATE TABLE source_table (order_id INT,order_no STRING,sub_order_no STRING,order_type STRING,order_status STRING,order_amount DECIMAL(10,2),order_time TIMESTAMP,pay_time TIMESTAMP,delivery_time TIMESTAMP,complete_time TIMESTAMP,user_id STRING,user_name STRING,user_level STRING,user_age INT,user_gender STRING,user_phone STRING,user_email STRING,register_time TIMESTAMP,last_login_time TIMESTAMP,membership_level STRING,membership_points INT,user_status STRING,product_id INT,product_name STRING,product_code STRING,category_id STRING,category_name STRING,brand_id STRING,brand_name STRING,supplier_id STRING,supplier_name STRING,product_status STRING,product_type STRING,product_desc STRING,product_weight DECIMAL(10,2),product_length DECIMAL(10,2),product_width DECIMAL(10,2),product_height DECIMAL(10,2),original_price DECIMAL(10,2),sale_price DECIMAL(10,2),wholesale_price DECIMAL(10,2),cost_price DECIMAL(10,2),discount_amount DECIMAL(10,2),discount_rate DECIMAL(5,2),tax_amount DECIMAL(10,2),shipping_fee DECIMAL(10,2),stock_quantity INT,available_stock INT,locked_stock INT,safety_stock INT,warehouse_id STRING,warehouse_name STRING,warehouse_location STRING,payment_id STRING,payment_method STRING,payment_status STRING,payment_platform STRING,transaction_id STRING,bank_card_no STRING,bank_name STRING,shipping_id STRING,shipping_method STRING,shipping_company STRING,tracking_no STRING,receiver_name STRING,receiver_phone STRING,receiver_province STRING,receiver_city STRING,receiver_district STRING,receiver_address STRING,receiver_postcode STRING,promotion_id STRING,promotion_name STRING,coupon_id STRING,coupon_code STRING,coupon_amount DECIMAL(10,2),activity_id STRING,activity_name STRING,channel_id STRING,channel_name STRING,daily_sales_amount DECIMAL(10,2),weekly_sales_amount DECIMAL(10,2),monthly_sales_amount DECIMAL(10,2),quarterly_sales_amount DECIMAL(10,2),yearly_sales_amount DECIMAL(10,2),daily_order_count INT,weekly_order_count INT,monthly_order_count INT,quarterly_order_count INT,yearly_order_count INT,sales_amount_1h DECIMAL(10,2),sales_amount_6h DECIMAL(10,2),sales_amount_12h DECIMAL(10,2),sales_amount_24h DECIMAL(10,2),sales_amount_7d DECIMAL(10,2),sales_amount_30d DECIMAL(10,2),order_count_1h INT,order_count_6h INT,order_count_12h INT,order_count_24h INT,order_count_7d INT,order_count_30d INT,product_daily_sales DECIMAL(10,2),product_weekly_sales DECIMAL(10,2),product_monthly_sales DECIMAL(10,2),product_daily_orders INT,product_weekly_orders INT,product_monthly_orders INT,user_total_orders INT,user_total_amount DECIMAL(10,2),user_avg_amount DECIMAL(10,2),user_max_amount DECIMAL(10,2),user_min_amount DECIMAL(10,2),user_order_frequency DECIMAL(10,2),return_id STRING,return_type STRING,return_reason STRING,return_amount DECIMAL(10,2),return_status STRING,return_time TIMESTAMP,review_id STRING,review_content STRING,review_score INT,review_time TIMESTAMP,review_status STRING,sales_amount_q1 DECIMAL(10,2),sales_amount_q2 DECIMAL(10,2),sales_amount_q3 DECIMAL(10,2),sales_amount_q4 DECIMAL(10,2),order_count_q1 INT,order_count_q2 INT,order_count_q3 INT,order_count_q4 INT,avg_daily_sales DECIMAL(10,2),avg_weekly_sales DECIMAL(10,2),avg_monthly_sales DECIMAL(10,2),avg_quarterly_sales DECIMAL(10,2),cumulative_sales DECIMAL(10,2),cumulative_orders INT,cumulative_customers INT,cumulative_products INT,yoy_sales_growth DECIMAL(10,2),mom_sales_growth DECIMAL(10,2),wow_sales_growth DECIMAL(10,2),dod_sales_growth DECIMAL(10,2),browse_count INT,cart_count INT,order_count INT,pay_count INT,browse_to_cart_rate DECIMAL(10,2),cart_to_order_rate DECIMAL(10,2),order_to_pay_rate DECIMAL(10,2),create_time TIMESTAMP,update_time TIMESTAMP,create_user STRING,update_user STRING,is_deleted BOOLEAN,remark STRING) WITH ('connector' = 'datagen','rows-per-second' = '500');

CREATE TABLE target_table (order_id INT,order_no STRING,sub_order_no STRING,order_type STRING,order_status STRING,order_amount DECIMAL(10,2),order_time TIMESTAMP,pay_time TIMESTAMP,delivery_time TIMESTAMP,complete_time TIMESTAMP,user_id STRING,user_name STRING,user_level STRING,user_age INT,user_gender STRING,user_phone STRING,user_email STRING,register_time TIMESTAMP,last_login_time TIMESTAMP,membership_level STRING,membership_points INT,user_status STRING,product_id INT,product_name STRING,product_code STRING,category_id STRING,category_name STRING,brand_id STRING,brand_name STRING,supplier_id STRING,supplier_name STRING,product_status STRING,product_type STRING,product_desc STRING,product_weight DECIMAL(10,2),product_length DECIMAL(10,2),product_width DECIMAL(10,2),product_height DECIMAL(10,2),original_price DECIMAL(10,2),sale_price DECIMAL(10,2),wholesale_price DECIMAL(10,2),cost_price DECIMAL(10,2),discount_amount DECIMAL(10,2),discount_rate DECIMAL(5,2),tax_amount DECIMAL(10,2),shipping_fee DECIMAL(10,2),stock_quantity INT,available_stock INT,locked_stock INT,safety_stock INT,warehouse_id STRING,warehouse_name STRING,warehouse_location STRING,payment_id STRING,payment_method STRING,payment_status STRING,payment_platform STRING,transaction_id STRING,bank_card_no STRING,bank_name STRING,shipping_id STRING,shipping_method STRING,shipping_company STRING,tracking_no STRING,receiver_name STRING,receiver_phone STRING,receiver_province STRING,receiver_city STRING,receiver_district STRING,receiver_address STRING,receiver_postcode STRING,promotion_id STRING,promotion_name STRING,coupon_id STRING,coupon_code STRING,coupon_amount DECIMAL(10,2),activity_id STRING,activity_name STRING,channel_id STRING,channel_name STRING,daily_sales_amount DECIMAL(10,2),weekly_sales_amount DECIMAL(10,2),monthly_sales_amount DECIMAL(10,2),quarterly_sales_amount DECIMAL(10,2),yearly_sales_amount DECIMAL(10,2),daily_order_count INT,weekly_order_count INT,monthly_order_count INT,quarterly_order_count INT,yearly_order_count INT,sales_amount_1h DECIMAL(10,2),sales_amount_6h DECIMAL(10,2),sales_amount_12h DECIMAL(10,2),sales_amount_24h DECIMAL(10,2),sales_amount_7d DECIMAL(10,2),sales_amount_30d DECIMAL(10,2),order_count_1h INT,order_count_6h INT,order_count_12h INT,order_count_24h INT,order_count_7d INT,order_count_30d INT,product_daily_sales DECIMAL(10,2),product_weekly_sales DECIMAL(10,2),product_monthly_sales DECIMAL(10,2),product_daily_orders INT,product_weekly_orders INT,product_monthly_orders INT,user_total_orders INT,user_total_amount DECIMAL(10,2),user_avg_amount DECIMAL(10,2),user_max_amount DECIMAL(10,2),user_min_amount DECIMAL(10,2),user_order_frequency DECIMAL(10,2),return_id STRING,return_type STRING,return_reason STRING,return_amount DECIMAL(10,2),return_status STRING,return_time TIMESTAMP,review_id STRING,review_content STRING,review_score INT,review_time TIMESTAMP,review_status STRING,sales_amount_q1 DECIMAL(10,2),sales_amount_q2 DECIMAL(10,2),sales_amount_q3 DECIMAL(10,2),sales_amount_q4 DECIMAL(10,2),order_count_q1 INT,order_count_q2 INT,order_count_q3 INT,order_count_q4 INT,avg_daily_sales DECIMAL(10,2),avg_weekly_sales DECIMAL(10,2),avg_monthly_sales DECIMAL(10,2),avg_quarterly_sales DECIMAL(10,2),cumulative_sales DECIMAL(10,2),cumulative_orders INT,cumulative_customers INT,cumulative_products INT,yoy_sales_growth DECIMAL(10,2),mom_sales_growth DECIMAL(10,2),wow_sales_growth DECIMAL(10,2),dod_sales_growth DECIMAL(10,2),browse_count INT,cart_count INT,order_count INT,pay_count INT,browse_to_cart_rate DECIMAL(10,2),cart_to_order_rate DECIMAL(10,2),order_to_pay_rate DECIMAL(10,2),create_time TIMESTAMP,update_time TIMESTAMP,create_user STRING,update_user STRING,is_deleted BOOLEAN,remark STRING,ds STRING,hh STRING,PRIMARY KEY(order_id) NOT ENFORCED) PARTITIONED BY (ds,hh) WITH ('connector' = 'hudi','path' = 's3://yoreland-hudi/ods/target_table/','table.type' = 'MERGE_ON_READ','hive_sync.enable' = 'true','hive_sync.db' = 'flink_hudi_database','hive_sync.table' = 'target_table','hive_sync.mode' = 'GLUE','hive_sync.partition_fields' = 'ds, hh','hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor','hive_sync.use_jdbc' = 'false');

INSERT INTO target_table SELECT order_id,order_no,sub_order_no,order_type,order_status,order_amount,order_time,pay_time,delivery_time,complete_time,user_id,user_name,user_level,user_age,user_gender,user_phone,user_email,register_time,last_login_time,membership_level,membership_points,user_status,product_id,product_name,product_code,category_id,category_name,brand_id,brand_name,supplier_id,supplier_name,product_status,product_type,product_desc,product_weight,product_length,product_width,product_height,original_price,sale_price,wholesale_price,cost_price,discount_amount,discount_rate,tax_amount,shipping_fee,stock_quantity,available_stock,locked_stock,safety_stock,warehouse_id,warehouse_name,warehouse_location,payment_id,payment_method,payment_status,payment_platform,transaction_id,bank_card_no,bank_name,shipping_id,shipping_method,shipping_company,tracking_no,receiver_name,receiver_phone,receiver_province,receiver_city,receiver_district,receiver_address,receiver_postcode,promotion_id,promotion_name,coupon_id,coupon_code,coupon_amount,activity_id,activity_name,channel_id,channel_name,daily_sales_amount,weekly_sales_amount,monthly_sales_amount,quarterly_sales_amount,yearly_sales_amount,daily_order_count,weekly_order_count,monthly_order_count,quarterly_order_count,yearly_order_count,sales_amount_1h,sales_amount_6h,sales_amount_12h,sales_amount_24h,sales_amount_7d,sales_amount_30d,order_count_1h,order_count_6h,order_count_12h,order_count_24h,order_count_7d,order_count_30d,product_daily_sales,product_weekly_sales,product_monthly_sales,product_daily_orders,product_weekly_orders,product_monthly_orders,user_total_orders,user_total_amount,user_avg_amount,user_max_amount,user_min_amount,user_order_frequency,return_id,return_type,return_reason,return_amount,return_status,return_time,review_id,review_content,review_score,review_time,review_status,sales_amount_q1,sales_amount_q2,sales_amount_q3,sales_amount_q4,order_count_q1,order_count_q2,order_count_q3,order_count_q4,avg_daily_sales,avg_weekly_sales,avg_monthly_sales,avg_quarterly_sales,cumulative_sales,cumulative_orders,cumulative_customers,cumulative_products,yoy_sales_growth,mom_sales_growth,wow_sales_growth,dod_sales_growth,browse_count,cart_count,order_count,pay_count,browse_to_cart_rate,cart_to_order_rate,order_to_pay_rate,create_time,update_time,create_user,update_user,is_deleted,remark,DATE_FORMAT(order_time, 'yyyy-MM-dd') as ds,DATE_FORMAT(order_time, 'HH') as hh FROM source_table;

数据生成方案:
1.	表结构规模:
•	总字段数:134个字段
•	字段类型分布:
o	INT: 31个
o	STRING: 47个
o	DECIMAL: 45个
o	TIMESTAMP: 9个
o	BOOLEAN: 1个
2.	数据生成频率:
•	写入速率:500行/秒
•	每小时数据量:500 * 3600 = 1,800,000行
•	每天数据量:500 * 3600 * 24 = 43,200,000行(约4320万条)
3.	分区设计:
•	按天分区(ds)
•	按小时分区(hh)
•	每天产生24个分区
4.	存储预估:
•	假设每行平均1KB
•	每小时数据量:约1.8GB
•	每天数据量:约43.2GB
5.	表特点:
•	使用Hudi MERGE_ON_READ表类型
•	支持Hive元数据同步
•	使用Glue作为元数据目录
•	数据存储在S3路径
•	以order_id作为主键

在另一个集群 B 中使用 presto(或使用 athena)进行延迟查询测试

presto-cli --catalog hive
presto> SELECT max(update_time) FROM "flink_hudi_database"."target_table";
          _col0          
-------------------------
 2025-02-26 06:24:05.904 
(1 row)

测试结果

以下结果是在集群持续运行 12 小时左右期间其中随机时间进行 ad-hoc 查询产生的结果。

运行时间波动范围:6.608 秒 – 8.583 秒;平均延迟约 190 秒(3-4 分钟)

测试编号 运行时间(sec) 排队时间(ms) 扫描数据量(GB) 延迟(second)
1 6.608 101 9.12 147
2 7.055 115 9.14 212
3 8.583 97 9.17 230
4 6.892 108 8.92 168
5 7.234 94 9.45 195
6 8.127 112 9.28 205
7 6.945 98 8.78 178
8 7.856 106 9.52 223
9 8.234 103 9.33 189
10 7.123 99 8.85 156
11 6.987 118 9.58 198
12 8.445 95 9.22 215
13 7.567 107 8.72 182
14 7.892 113 9.41 201
15 8.156 96 9.07 167

预期成果

  • 通过 POC 验证新方案的可行性和优势,为中长期优化提供数据支持和决策依据。
  • 提高实时数据分析的性能和成本效益,为客户带来更高的业务价值。

参考文档

  1. 在 Amazon EMR 上构建实时数据湖:https://dev.amazoncloud.cn/column/article/6309c8990c9a20404da7914f
  2. 多库多表场景下使用 Amazon EMR CDC 实时入湖最佳实践:https://dev.amazoncloud.cn/column/article/6309e29be0f88a79bcfae80a

本篇作者

Dora Gui

亚马逊云科技技术客户经理,主要支持游戏、互联网行业客户的架构优化、成本管理、技术咨询等工作,并专注在 IAAS、大数据和容器等方向的技术选型、方案落地和实践。在加入亚马逊云科技之前,曾就职于 EMC 和微软、腾讯等科技公司,拥有近 10 年虚拟化与公有云领域的架构优化和技术支持经验。

夏宁

亚马逊云科技解决方案架构师,曾就职惠普软件和声网,超过 10 年前端后端开发经验,主导过各类软件项目设计。熟悉移动互联网,音视频,云计算相关领域。

杨俊

亚马逊云科技资深解决方案架构师。加入亚马逊云科技之前,主要从事电商和零售相关的系统开发工作,具备丰富的零售行业经验和企业上云实践经验。

翁建清

亚马逊云科技资深解决方案架构师,具有多年 IT 从业经验,涉及移动互联网、企业、金融、政府等行业,曾任职咨询总监、CIO、企业架构师等岗位,具有多年丰富的各类项目经验,尤其在数据仓库、大数据、数据应用场景等方面具有丰富的实战经验,目前专注于企业整体上云的架构规划、设计和实施。