在当今数字化转型加速的商业环境中,企业在处理日活数据分析时面临着四大关键痛点:
- 首先,传统的 Iceberg存储层虽然易于集成,但缺乏简单高效的表管理功能;
- 其次,Kubernetes集群全部使用按需实例的方式导致计算资源成本较高,且缺乏真正有效的弹性扩缩能力,造成资源利用率低下;
- 第三,通过自行维护基础设施进行日活分析作业调度的方式,给运维增加了复杂度;
- 最后,缺乏直观的可视化分析工具,使得业务数据难以清晰呈现,严重影响决策效率和准确性。如何在控制成本降低运维复杂度的同时,快速从海量日活数据中提取有价值的业务洞察,已成为决定竞争优势的关键因素。
针对这些挑战,我们设计了一套基于 AWS 的现代化数据分析平台。该方案:
- 通过 Amazon S3 Tables 桶替代复杂的 Iceberg 格式,提供简单高效的数据操作能力;
- 采用 Amazon EMR on Amazon EKS 结合 Karpenter、Graviton 和 Spot 实例的组合,显著降低计算成本并实现高效的弹性扩缩;
- 利用 Amazon EventBridge 与 AWS Lambda 构建自动化调度系统,摆脱基础设施维护负担;
- 整合 Amazon Athena 和 Amazon QuickSight 提供强大的分析可视化能力,让业务数据直观呈现,加速决策过程。
这套解决方案适合需要处理波动性大的日活数据、希望优化数据存储,追求降低总体成本、探索简化运维流程,同时要求高效数据可视化的企业。可以加速从数据采集到业务洞察的全流程,使企业能够更敏捷地响应市场变化,做出数据驱动的决策,实现数据价值的最大化。
下面我们将重点分享这一架构的详细设计思路、环境搭建过程中的关键注意事项。
一、架构介绍
下图展示了我们基于 AWS 构建的数据分析平台架构。该架构整合了多项 AWS 服务,实现了从数据采集、存储、处理到分析可视化的完整数据流程。下面将详细介绍各个组件及其在整体架构中的作用。
核心组件及优势
- 数据存储层:Amazon S3 Tables。S3 Tables 为数据存储提供简单高效的解决方案,支持增删改查操作,避免了 Iceberg 等格式的复杂维护成本,同时兼容 Athena、Spark 等常见查询引擎。
- 计算引擎:Amazon EMR on EKS。EMR Spark 提供 100% 兼容开源 Spark 的环境,同时针对 AWS 的环境进行了性能优化,为数据分析任务提供强大且灵活的计算能力。
- 资源管理:Karpenter 结合 Graviton + Spot 实例。Karpenter 实现 EKS 计算资源的智能自动扩缩,结合 Graviton 处理器和 Spot 实例的使用,在保证性能的同时显著降低计算成本。
- 作业调度:EventBridge 结合 Lambda。EventBridge 和 Lambda 的组合实现了完全自动化的作业调度,无需手工操作即可触发 Spark 作业,采用 Serverless 方式降低了运维复杂度。
- 数据分析与可视化:Athena 和 QuickSight。Athena 提供无服务器 SQL 查询能力,QuickSight 则提供强大的 BI 分析和可视化功能,两者无缝集成,让业务数据可视化并提供直观洞察。
二、端到端运行流程
1. 在日活数据分析场景上应用该方案,需要在 S3 Tables 桶中创建两张表格:
其中:
- user_activity:用于存储用户活动数据
- daily_active_users_:用户存储分析完的日活数据结果
2. 客户使用 Amazon Data Firehose,通过 batch Direct PUT 的方式,采用 Glue 资源链接连接到 S3 Tables 桶,并将前一天的用户活动数据注入到 S3 Tables 桶中的 user_activity 表中。
3. Lambda 函数会启动用于日活数据分析的 EMR Spark 任务。此处 EMR 部署方式为在 EKS 之上。
4. 创建 EventBridge 规则,定义为每天定时触发 Lambda 函数进行日活数据分析。示例如下:
5. EMR Spark 任务分析 user_activity 中的数据,并将分析的上一日的日活结果写入到 daily_active_users 表中。EventBridge 触发的 EMR Spark 任务可在 EMR 界面查看。
6. 在 Athena 和 QuickSight 中通过查询 S3 Tables 桶中的 daily_active_users 表格来展示日活数据趋势。一个简单的图表结果示例如下:
三、实施要点与注意事项
1. 创建 S3 Tables 桶和表格
可以通过如下脚本创建 S3 Tables 桶,命名空间以及表格。
- 创建 S3 Tables 桶
aws s3tables create-table-bucket -region us-east-1 --name <your-table-bucket-name>
- 创建命名空间
aws s3tables create-namespace --table-bucket-arn <tyour-able-bucket-arn> --namespace <your-namespace-name>
- 依次创建表 user_activity 和 daily_active_users
aws s3tables create-table --cli-input-json file://user_activity.json
aws s3tables create-table --cli-input-json file://daily_active_users.json
user_activity.json 内容如下:
{
"tableBucketARN": "<your-table-bucket-arn>",
"namespace": "<your-namespace-name>",
"name": "user_event",
"format": "ICEBERG",
"metadata": {
"iceberg": {
"schema": {
"fields": [
{"name": "user_id", "type": "int","required": true},
{"name": "event_time", "type": "timestamp"},
{"name": "event_type", "type": "string"}
]
}
}
}
}
daily_active_users.json 内容如下:
{
"tableBucketARN": "< your-table-bucket-arn>",
"namespace": "<your-namespace-name>",
"name": "daily_active_users",
"format": "ICEBERG",
"metadata": {
"iceberg": {
"schema": {
"fields": [
{"name": "event_date", "type": "date", "required": true},
{"name": "dau", "type": "int"},
{"name": "ingestion_time", "type": "timestamp"}
]
}
}
}
}
2. Amazon Data Firehose 填充数据
在使用 Amazon Data Firehose 将数据填充到 S3 时,需要注意以下几个关键点:
- 要访问您的表,某些 AWS 分析服务需要一个指向您表命名空间的资源链接。资源链接是一个 Data Catalog(数据目录)对象,它充当指向其他 Data Catalog 资源(如数据库或表)的别名或指针。示例如下:
aws glue create-database
--region us-east-1 \
--catalog-id "<replace-to-your-account-id>" \
--database-input \
‘{
"Name": "resource_link_to_s3tablesdemo",
"TargetDatabase": {
"CatalogId":"<replace-to-your-account-id>:s3tablescatalog/ <replace-to-your-table-bucket-name>",
"DatabaseName": "<replace-to-your-namespace>"
},
"CreateTableDefaultPermissions": []
}'
- 创建 Amazon Data Firehose 时,配置示例如下:
其中:
- Database expression 填写 resourcelink 的名字
- Table expression 填写要操作的 S3 Table 桶中表格的名字。
另外,可以参考如下脚本将用户活动数据写入到 S3 Table 桶中的 user_activity 表格。
#!/bin/bash
# 配置参数
DELIVERY_STREAM="<your-datafirehose-name>"
BATCH_SIZE=200 # 每批次发送 500 条
NUM_BATCHES=1 # 需要发送的批次数
NUM_RECORDS=$((NUM_BATCHES * BATCH_SIZE))
EVENT_TYPES=("login" "browse" "purchase")
BATCH=()
# 生成并发送数据
for ((i=1; i<=$NUM_BATCHES; i++)); do
for ((j=1; j<=$BATCH_SIZE; j++)); do
USER_ID=$((RANDOM % 900000 + 100000))
EVENT_TIME=$(date -v -$((1))d '+%Y-%m-%dT%H:%M:%S')
EVENT_TYPE=${EVENT_TYPES[$RANDOM % ${#EVENT_TYPES[@]}]}
JSON_DATA="{\"user_id\": $USER_ID, \"event_time\": \"$EVENT_TIME\", \"event_type\": \"$EVENT_TYPE\"}"
BASE64_DATA=$(echo -n "$JSON_DATA" | base64)
BATCH+=("{\"Data\":\"$BASE64_DATA\"}")
done
JSON_BATCH="[$(IFS=,; echo "${BATCH[*]}")]"
echo "aws firehose put-record-batch --delivery-stream-name $DELIVERY_STREAM --records [batch of $BATCH_SIZE]"
aws firehose put-record-batch --delivery-stream-name "$DELIVERY_STREAM" --records "$JSON_BATCH" > /dev/null
BATCH=() # 清空批次
echo "已发送 $i * $BATCH_SIZE 条数据..."
done
echo "数据生成并发送完毕,共 $NUM_RECORDS 条记录。"
3. EMR Spark Job 设置
在配置和运行 EMR Spark 作业时,应关注以下配置:
{
"name": "demojob",
"virtualClusterId": "5noewg7p3wb509neqivxxxxxxxx",
"executionRoleArn": "arn:aws:iam::73927xxxxxxx:role/EMRContainers-JobExecutionRole",
"releaseLabel": "emr-7.5.0-latest",
"jobDriver": {
……
},
"configurationOverrides": {
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.jars.packages":"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160,software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.s3tablesbucket1": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.s3tablesbucket1.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog",
"spark.sql.catalog.s3tablesbucket1.warehouse": "arn:aws:s3tables:us-east-1: <replace-to-your-account-id>:bucket/<replace-to-your-bucket-name>"
}
}
]
}
}
其中:
- jars.packages:定义 Spark 运行时需要的依赖库 Jar 包,用于 Iceberg、AWS SDK 和 S3Tables 插件。
- sql.extensions:指定 Spark 使用 Iceberg 扩展,以便支持 Iceberg SQL 语法。
- sql.catalog.s3tablesbucket1:Spark SQL 中创建一个名为 s3tablesbucket1 的 Catalog,支持通过 SQL 访问 S3 Tables 数据。
- sql.catalog.s3tablesbucket1.catalog-impl:指定 s3tablesbucket1 Catalog 的实现实现类,这里使用 Amazon 提供的 S3TablesCatalog。
- sql.catalog.s3tablesbucket1.warehouse:指定数据存储路径(S3 Tables 存储桶)。
对用户活动数据分析的代码,示例如下:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, countDistinct, current_timestamp
import time
from datetime import datetime, timedelta
if __name__ == "__main__":
spark = SparkSession.builder\
.appName("DAU Analysis") \
.config("spark.driver.memory", "2G") \
.getOrCreate()
output_path = None
if len(sys.argv) > 1:
output_path = sys.argv[1]
else:
print("S3 output location not specified printing top 10 results to output stream")
# 计算前一天的日期
yesterday_date = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
# 读取用户行为数据(Iceberg 表)
user_activity_df = spark.read.format("iceberg").load("<your-s3-catalog-name>.<your-namespace-name>.user_event") \
.filter(to_date("event_time") == yesterday_date) # 只选前一天的数据
# 计算每日活跃用户(DAU)
dau_df = user_activity_df.withColumn("event_date", to_date("event_time")) \
.groupBy("event_date") \
.agg(countDistinct("user_id").alias("dau")) \
.withColumn("ingestion_time", current_timestamp())
# 写入 Iceberg 目标表
dau_df.write.format("iceberg").mode("append").save("<your-s3-catalog-name>.<your-namespace-name>.daily_active_users")
# 关闭 SparkSession
spark.stop()
4. 创建 Lambda 函数并配置 EventBridge 定时触发
Lambda 函数示例如下。其中 Job 的配置注意事项请参照上一节 EMR Spark Job 设置。
import json
import boto3
import os
emr_client = boto3.client('emr-containers')
def lambda_handler(event, context):
job_config = get_job_config()
# 解析必要参数
virtual_cluster_id = job_config["virtualClusterId"]
job_name = job_config["name"]
job_config_params = {
"name": job_name,
"virtualClusterId": virtual_cluster_id,
"executionRoleArn": job_config["executionRoleArn"],
"releaseLabel": job_config["releaseLabel"],
"jobDriver": job_config["jobDriver"],
"configurationOverrides": job_config.get("configurationOverrides", {})
}
try:
# 调用 start-job-run
response = emr_client.start_job_run(**job_config_params)
# 获取 Job ID
job_run_id = response["id"]
print(f"Job started successfully. Job ID: {job_run_id}")
return {
"statusCode": 200,
"body": json.dumps({"message": "Job started", "job_run_id": job_run_id})
}
except Exception as e:
print(f"Error starting job: {str(e)}")
return {
"statusCode": 500,
"body": json.dumps({"error": str(e)})
}
def get_job_config():
return {
"name": "dau-ana-trigger-by-lambda",
"releaseLabel": "emr-7.5.0-latest",
……
}
5. Karpenter node 支持 IMDS
确保 Karpenter 管理的节点正确支持实例元数据服务(IMDS)。在创建 Karpenter EC2NodeClass 时,请将 metadataOptions.httpPutResponseHopLimit 设置为 2。
注意,此 EC2NodeClass 中使用的的 ami 的区域是 us-east-1。请根据你的所在区域使用相应的 ami。
apiVersion: karpenter.k8s.aws/v1
kind: EC2NodeClass
metadata:
name: default
spec:
amiFamily: AL2 # Amazon Linux 2
role: "KarpenterNodeRole- eksdemo " # replace with your cluster name
subnetSelectorTerms:
- tags:
alpha.eksctl.io/cluster-name: "eksdemo" # replace with your cluster name
securityGroupSelectorTerms:
- tags:
alpha.eksctl.io/cluster-name: " eksdemo " # replace with your cluster name
amiSelectorTerms:
- id: "ami-07f0a903b02947a1c"
- id: "ami-0e4591ba595196441"
metadataOptions:
httpPutResponseHopLimit: 2
6. EKS 引入 Graviton 和 Spot 到 Spark job
将 Graviton 和 Spot 实例应用于 Spark 作业需要考虑:
将可中断的 Spark Executor pods 放置在 Spot 实例上,不可中断的 Spark Driver pods 放置在按需实例上。因此我们会使用 Karpenter 配置两个 nodepool,分别管理 Spark Driver pods 和 Spark Executor pods 所需的实例。
针对 Spark Driver pods 的示例如下:
apiVersion: karpenter.sh/v1
kind: NodePool
metadata:
name: nodepool-od
spec:
template:
metadata:
labels:
karpenternodepool: nodepool-od
spec:
requirements:
- key: kubernetes.io/arch
operator: In
values: [”arm64"]
- key: karpenter.sh/capacity-type
operator: In
values: ["on-demand"]
- key: "eks.amazonaws.com/instance-family"
operator: In
values: [c6g, c7g, m6g, r6g]
相应的 pod template 如下。通过设置 nodeSelector 来使用带有 label karpenternodepool 为 nodepool-od 的实例。
apiVersion: v1
kind: Pod
spec:
volumes:
- name: source-data-volume
emptyDir: {}
- name: metrics-files-volume
emptyDir: {}
nodeSelector:
karpenternodepool: nodepool-od
containers:
- name: spark-kubernetes-driver
针对 Spark Executor pods 的示例如下:
apiVersion: karpenter.sh/v1
kind: NodePool
metadata:
name: nodepool-mix
spec:
template:
metadata:
labels:
karpenternodepool: nodepool-mix
spec:
requirements:
- key: kubernetes.io/arch
operator: In
values: ["amd64","arm64"]
- key: karpenter.sh/capacity-type
operator: In
values: [“on-demand”,”spot”]
- key: "eks.amazonaws.com/instance-family"
operator: In
values: [c6g, c7g, m6g, c6i, m6i, r6g, r6i]
相应的 pod template 如下。通过设置 nodeSelector 来使用带有 karpenternodepool 标签值为 nodepool-mix 的实例。
apiVersion: v1
kind: Pod
spec:
volumes:
- name: source-data-volume
emptyDir: {}
- name: metrics-files-volume
EmptyDir: {}
nodeSelector:
karpenternodepool: nodepool-mix
containers:
- name: spark-kubernetes-executor
7. 授予 Lake Formation 对表资源的权限
使用 AWS Lake Formation 管理数据访问权限时,需要在 Lake Formation 导航窗格中,选择数据权限,然后选择授予。
对于 Athena 或 Firehose,选择 IAM 用户和角色。示例如下:
对于 Amazon QuickSight,选择 SAML 用户和组,然后输入 Amazon QuickSight 管理员用户的 ARN:
然后再继续分别授予表和资源链接的权限。
- 对表格赋予权限时操作如下:
- 对资源链接赋予权限时操作如下:
四、总结
本文介绍的架构充分利用了 AWS 的优势,构建了一个高效、低成本且易于维护的数据分析平台。通过 S3 Tables 作为数据存储、EMR on EKS 作为计算引擎、Karpenter 管理计算资源、EventBridge 和 Lambda 实现自动化调度,以及 Athena 和 QuickSight 提供分析可视化能力,我们实现了从数据采集到洞察的完整流程。
本篇作者