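In e-commerce, the combination of EMR, Flink, and Kafka provides a strong foundation for real-time inventory management and dynamic pricing. Kafka captures sales, inventory, and market data in real time, and Flink processes these streams quickly enough to update inventory levels immediately and adjust prices dynamically. EMR supplies scalable compute resources for complex analytical algorithms. With this real-time processing capability, a business can track its inventory position precisely, avoid both overstock and stockouts, and continuously tune its pricing strategy against market demand and competitor prices. As a result, inventory accuracy improves markedly, typically reaching 95% or higher, and dynamic pricing can bring revenue growth of 5-10%. This raises operational efficiency, strengthens competitiveness in a fast-changing e-commerce market, and gives customers a better shopping experience.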
1. Writing directly to Hudi:
2. Test metrics:
-- Source table: synthetic order records produced by the datagen connector at 500 rows/second
CREATE TABLE source_table (
  -- Order
  order_id INT, order_no STRING, sub_order_no STRING, order_type STRING, order_status STRING,
  order_amount DECIMAL(10,2), order_time TIMESTAMP, pay_time TIMESTAMP, delivery_time TIMESTAMP, complete_time TIMESTAMP,
  -- User
  user_id STRING, user_name STRING, user_level STRING, user_age INT, user_gender STRING, user_phone STRING, user_email STRING,
  register_time TIMESTAMP, last_login_time TIMESTAMP, membership_level STRING, membership_points INT, user_status STRING,
  -- Product
  product_id INT, product_name STRING, product_code STRING, category_id STRING, category_name STRING,
  brand_id STRING, brand_name STRING, supplier_id STRING, supplier_name STRING,
  product_status STRING, product_type STRING, product_desc STRING,
  product_weight DECIMAL(10,2), product_length DECIMAL(10,2), product_width DECIMAL(10,2), product_height DECIMAL(10,2),
  -- Pricing
  original_price DECIMAL(10,2), sale_price DECIMAL(10,2), wholesale_price DECIMAL(10,2), cost_price DECIMAL(10,2),
  discount_amount DECIMAL(10,2), discount_rate DECIMAL(5,2), tax_amount DECIMAL(10,2), shipping_fee DECIMAL(10,2),
  -- Inventory and warehouse
  stock_quantity INT, available_stock INT, locked_stock INT, safety_stock INT,
  warehouse_id STRING, warehouse_name STRING, warehouse_location STRING,
  -- Payment
  payment_id STRING, payment_method STRING, payment_status STRING, payment_platform STRING,
  transaction_id STRING, bank_card_no STRING, bank_name STRING,
  -- Shipping and receiver
  shipping_id STRING, shipping_method STRING, shipping_company STRING, tracking_no STRING,
  receiver_name STRING, receiver_phone STRING, receiver_province STRING, receiver_city STRING,
  receiver_district STRING, receiver_address STRING, receiver_postcode STRING,
  -- Promotion and channel
  promotion_id STRING, promotion_name STRING, coupon_id STRING, coupon_code STRING, coupon_amount DECIMAL(10,2),
  activity_id STRING, activity_name STRING, channel_id STRING, channel_name STRING,
  -- Sales and order counts by calendar period
  daily_sales_amount DECIMAL(10,2), weekly_sales_amount DECIMAL(10,2), monthly_sales_amount DECIMAL(10,2),
  quarterly_sales_amount DECIMAL(10,2), yearly_sales_amount DECIMAL(10,2),
  daily_order_count INT, weekly_order_count INT, monthly_order_count INT, quarterly_order_count INT, yearly_order_count INT,
  -- Sales and order counts by rolling window
  sales_amount_1h DECIMAL(10,2), sales_amount_6h DECIMAL(10,2), sales_amount_12h DECIMAL(10,2),
  sales_amount_24h DECIMAL(10,2), sales_amount_7d DECIMAL(10,2), sales_amount_30d DECIMAL(10,2),
  order_count_1h INT, order_count_6h INT, order_count_12h INT, order_count_24h INT, order_count_7d INT, order_count_30d INT,
  -- Per-product statistics
  product_daily_sales DECIMAL(10,2), product_weekly_sales DECIMAL(10,2), product_monthly_sales DECIMAL(10,2),
  product_daily_orders INT, product_weekly_orders INT, product_monthly_orders INT,
  -- Per-user statistics
  user_total_orders INT, user_total_amount DECIMAL(10,2), user_avg_amount DECIMAL(10,2),
  user_max_amount DECIMAL(10,2), user_min_amount DECIMAL(10,2), user_order_frequency DECIMAL(10,2),
  -- Returns
  return_id STRING, return_type STRING, return_reason STRING, return_amount DECIMAL(10,2), return_status STRING, return_time TIMESTAMP,
  -- Reviews
  review_id STRING, review_content STRING, review_score INT, review_time TIMESTAMP, review_status STRING,
  -- Quarterly figures
  sales_amount_q1 DECIMAL(10,2), sales_amount_q2 DECIMAL(10,2), sales_amount_q3 DECIMAL(10,2), sales_amount_q4 DECIMAL(10,2),
  order_count_q1 INT, order_count_q2 INT, order_count_q3 INT, order_count_q4 INT,
  -- Averages, cumulative totals, and growth rates
  avg_daily_sales DECIMAL(10,2), avg_weekly_sales DECIMAL(10,2), avg_monthly_sales DECIMAL(10,2), avg_quarterly_sales DECIMAL(10,2),
  cumulative_sales DECIMAL(10,2), cumulative_orders INT, cumulative_customers INT, cumulative_products INT,
  yoy_sales_growth DECIMAL(10,2), mom_sales_growth DECIMAL(10,2), wow_sales_growth DECIMAL(10,2), dod_sales_growth DECIMAL(10,2),
  -- Conversion funnel
  browse_count INT, cart_count INT, order_count INT, pay_count INT,
  browse_to_cart_rate DECIMAL(10,2), cart_to_order_rate DECIMAL(10,2), order_to_pay_rate DECIMAL(10,2),
  -- Audit columns
  create_time TIMESTAMP, update_time TIMESTAMP, create_user STRING, update_user STRING, is_deleted BOOLEAN, remark STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '500'
);
-- Target table: Hudi MERGE_ON_READ table on S3, partitioned by day (ds) and hour (hh), synced to Glue
CREATE TABLE target_table (
  -- Order
  order_id INT, order_no STRING, sub_order_no STRING, order_type STRING, order_status STRING,
  order_amount DECIMAL(10,2), order_time TIMESTAMP, pay_time TIMESTAMP, delivery_time TIMESTAMP, complete_time TIMESTAMP,
  -- User
  user_id STRING, user_name STRING, user_level STRING, user_age INT, user_gender STRING, user_phone STRING, user_email STRING,
  register_time TIMESTAMP, last_login_time TIMESTAMP, membership_level STRING, membership_points INT, user_status STRING,
  -- Product
  product_id INT, product_name STRING, product_code STRING, category_id STRING, category_name STRING,
  brand_id STRING, brand_name STRING, supplier_id STRING, supplier_name STRING,
  product_status STRING, product_type STRING, product_desc STRING,
  product_weight DECIMAL(10,2), product_length DECIMAL(10,2), product_width DECIMAL(10,2), product_height DECIMAL(10,2),
  -- Pricing
  original_price DECIMAL(10,2), sale_price DECIMAL(10,2), wholesale_price DECIMAL(10,2), cost_price DECIMAL(10,2),
  discount_amount DECIMAL(10,2), discount_rate DECIMAL(5,2), tax_amount DECIMAL(10,2), shipping_fee DECIMAL(10,2),
  -- Inventory and warehouse
  stock_quantity INT, available_stock INT, locked_stock INT, safety_stock INT,
  warehouse_id STRING, warehouse_name STRING, warehouse_location STRING,
  -- Payment
  payment_id STRING, payment_method STRING, payment_status STRING, payment_platform STRING,
  transaction_id STRING, bank_card_no STRING, bank_name STRING,
  -- Shipping and receiver
  shipping_id STRING, shipping_method STRING, shipping_company STRING, tracking_no STRING,
  receiver_name STRING, receiver_phone STRING, receiver_province STRING, receiver_city STRING,
  receiver_district STRING, receiver_address STRING, receiver_postcode STRING,
  -- Promotion and channel
  promotion_id STRING, promotion_name STRING, coupon_id STRING, coupon_code STRING, coupon_amount DECIMAL(10,2),
  activity_id STRING, activity_name STRING, channel_id STRING, channel_name STRING,
  -- Sales and order counts by calendar period
  daily_sales_amount DECIMAL(10,2), weekly_sales_amount DECIMAL(10,2), monthly_sales_amount DECIMAL(10,2),
  quarterly_sales_amount DECIMAL(10,2), yearly_sales_amount DECIMAL(10,2),
  daily_order_count INT, weekly_order_count INT, monthly_order_count INT, quarterly_order_count INT, yearly_order_count INT,
  -- Sales and order counts by rolling window
  sales_amount_1h DECIMAL(10,2), sales_amount_6h DECIMAL(10,2), sales_amount_12h DECIMAL(10,2),
  sales_amount_24h DECIMAL(10,2), sales_amount_7d DECIMAL(10,2), sales_amount_30d DECIMAL(10,2),
  order_count_1h INT, order_count_6h INT, order_count_12h INT, order_count_24h INT, order_count_7d INT, order_count_30d INT,
  -- Per-product statistics
  product_daily_sales DECIMAL(10,2), product_weekly_sales DECIMAL(10,2), product_monthly_sales DECIMAL(10,2),
  product_daily_orders INT, product_weekly_orders INT, product_monthly_orders INT,
  -- Per-user statistics
  user_total_orders INT, user_total_amount DECIMAL(10,2), user_avg_amount DECIMAL(10,2),
  user_max_amount DECIMAL(10,2), user_min_amount DECIMAL(10,2), user_order_frequency DECIMAL(10,2),
  -- Returns
  return_id STRING, return_type STRING, return_reason STRING, return_amount DECIMAL(10,2), return_status STRING, return_time TIMESTAMP,
  -- Reviews
  review_id STRING, review_content STRING, review_score INT, review_time TIMESTAMP, review_status STRING,
  -- Quarterly figures
  sales_amount_q1 DECIMAL(10,2), sales_amount_q2 DECIMAL(10,2), sales_amount_q3 DECIMAL(10,2), sales_amount_q4 DECIMAL(10,2),
  order_count_q1 INT, order_count_q2 INT, order_count_q3 INT, order_count_q4 INT,
  -- Averages, cumulative totals, and growth rates
  avg_daily_sales DECIMAL(10,2), avg_weekly_sales DECIMAL(10,2), avg_monthly_sales DECIMAL(10,2), avg_quarterly_sales DECIMAL(10,2),
  cumulative_sales DECIMAL(10,2), cumulative_orders INT, cumulative_customers INT, cumulative_products INT,
  yoy_sales_growth DECIMAL(10,2), mom_sales_growth DECIMAL(10,2), wow_sales_growth DECIMAL(10,2), dod_sales_growth DECIMAL(10,2),
  -- Conversion funnel
  browse_count INT, cart_count INT, order_count INT, pay_count INT,
  browse_to_cart_rate DECIMAL(10,2), cart_to_order_rate DECIMAL(10,2), order_to_pay_rate DECIMAL(10,2),
  -- Audit columns
  create_time TIMESTAMP, update_time TIMESTAMP, create_user STRING, update_user STRING, is_deleted BOOLEAN, remark STRING,
  -- Partition columns
  ds STRING, hh STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) PARTITIONED BY (ds, hh) WITH (
  'connector' = 'hudi',
  'path' = 's3://yoreland-hudi/ods/target_table/',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.db' = 'flink_hudi_database',
  'hive_sync.table' = 'target_table',
  'hive_sync.mode' = 'GLUE',
  'hive_sync.partition_fields' = 'ds, hh',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
  'hive_sync.use_jdbc' = 'false'
);
-- Copy every source column and derive the ds/hh partition values from order_time
INSERT INTO target_table
SELECT
  order_id, order_no, sub_order_no, order_type, order_status,
  order_amount, order_time, pay_time, delivery_time, complete_time,
  user_id, user_name, user_level, user_age, user_gender, user_phone, user_email,
  register_time, last_login_time, membership_level, membership_points, user_status,
  product_id, product_name, product_code, category_id, category_name,
  brand_id, brand_name, supplier_id, supplier_name,
  product_status, product_type, product_desc,
  product_weight, product_length, product_width, product_height,
  original_price, sale_price, wholesale_price, cost_price,
  discount_amount, discount_rate, tax_amount, shipping_fee,
  stock_quantity, available_stock, locked_stock, safety_stock,
  warehouse_id, warehouse_name, warehouse_location,
  payment_id, payment_method, payment_status, payment_platform,
  transaction_id, bank_card_no, bank_name,
  shipping_id, shipping_method, shipping_company, tracking_no,
  receiver_name, receiver_phone, receiver_province, receiver_city,
  receiver_district, receiver_address, receiver_postcode,
  promotion_id, promotion_name, coupon_id, coupon_code, coupon_amount,
  activity_id, activity_name, channel_id, channel_name,
  daily_sales_amount, weekly_sales_amount, monthly_sales_amount, quarterly_sales_amount, yearly_sales_amount,
  daily_order_count, weekly_order_count, monthly_order_count, quarterly_order_count, yearly_order_count,
  sales_amount_1h, sales_amount_6h, sales_amount_12h, sales_amount_24h, sales_amount_7d, sales_amount_30d,
  order_count_1h, order_count_6h, order_count_12h, order_count_24h, order_count_7d, order_count_30d,
  product_daily_sales, product_weekly_sales, product_monthly_sales,
  product_daily_orders, product_weekly_orders, product_monthly_orders,
  user_total_orders, user_total_amount, user_avg_amount, user_max_amount, user_min_amount, user_order_frequency,
  return_id, return_type, return_reason, return_amount, return_status, return_time,
  review_id, review_content, review_score, review_time, review_status,
  sales_amount_q1, sales_amount_q2, sales_amount_q3, sales_amount_q4,
  order_count_q1, order_count_q2, order_count_q3, order_count_q4,
  avg_daily_sales, avg_weekly_sales, avg_monthly_sales, avg_quarterly_sales,
  cumulative_sales, cumulative_orders, cumulative_customers, cumulative_products,
  yoy_sales_growth, mom_sales_growth, wow_sales_growth, dod_sales_growth,
  browse_count, cart_count, order_count, pay_count,
  browse_to_cart_rate, cart_to_order_rate, order_to_pay_rate,
  create_time, update_time, create_user, update_user, is_deleted, remark,
  DATE_FORMAT(order_time, 'yyyy-MM-dd') AS ds,
  DATE_FORMAT(order_time, 'HH') AS hh
FROM source_table;
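Hudi's Flink writer commits data when a checkpoint completes, so the INSERT job above only produces visible commits if checkpointing is enabled for the job. The original statements do not show this setting; it may already be configured at the cluster level. If it is not, a minimal sketch for the Flink SQL client could look like the following, where the 60-second interval is an assumed, illustrative value rather than one taken from the test setup.

```sql
-- Run in the SQL client session before submitting the INSERT statement.
-- The Hudi sink commits on checkpoint; without checkpointing, written
-- data never becomes a visible commit in the table.
-- Assumption: '60s' is an illustrative interval, not a value from the test.
SET 'execution.checkpointing.interval' = '60s';
```

A shorter interval makes data visible sooner but produces more, smaller commits; a longer one does the opposite.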
Data generation plan:
1. Table schema size:
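• Total columns: 158 in source_table (target_table adds the ds and hh partition columns, for 160)
• Column type distribution (source_table):
o INT: 35
o STRING: 62
o DECIMAL: 50
o TIMESTAMP: 10
o BOOLEAN: 1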
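2. Data generation rate:
• Write rate: 500 rows/second
• Rows per hour: 500 * 3600 = 1,800,000
• Rows per day: 500 * 3600 * 24 = 43,200,000 (about 43.2 million)
3. Partition design:
• Partitioned by day (ds)
• Partitioned by hour (hh)
• 24 partitions are created per day
4. Storage estimate:
• Assumes an average row size of 1 KB
• Data volume per hour: about 1.8 GB
• Data volume per day: about 43.2 GB
5. Table characteristics:
• Uses the Hudi MERGE_ON_READ table type
• Supports Hive metadata sync
• Uses Glue as the metadata catalog
• Data is stored at an S3 path
• order_id is the primary key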
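The volume figures in the plan above can be checked against what actually lands in the table. The sketch below first reproduces the row-count arithmetic, then counts rows per (ds, hh) partition. Switching the session to batch mode is an assumption made here so that the count is a one-off snapshot instead of a continuously updating result; it is not part of the original test.

```sql
-- Expected volume at 500 rows/second (pure arithmetic, no table access).
SELECT
  500 * 3600      AS rows_per_hour,  -- 1,800,000
  500 * 3600 * 24 AS rows_per_day;   -- 43,200,000

-- Assumption: batch mode, so the query below returns a single snapshot.
SET 'execution.runtime-mode' = 'batch';

-- Rows landed in each day/hour partition, to compare with rows_per_hour.
SELECT ds, hh, COUNT(*) AS row_cnt
FROM target_table
GROUP BY ds, hh
ORDER BY ds, hh;
```

Because order_id is the primary key and datagen fills it with random values by default, occasional key collisions are written as upserts rather than new rows, so the per-partition counts may come out slightly below the estimate.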