在我们构建的基于开源工具与 EMR 的数据分析平台中,Amazon EMR 是核心的计算引擎,提供了 Apache Spark 执行框架,底层使用 YARN 作为资源管理。通常数据驱动的公司,每天有大量的离线任务需要执行,这些任务需要在不同的时间、以不同的周期执行,任务之间有前后依赖关系,任务也由 sql、shell、java 等不同语言编写,任务分布在不同的部门,有不同的 owner,所以需要一个统一的管理、调度、编排工具,能够按部门、按项目管理任务,使用可视化的方式编写任务调度工作流,在本方案中我们引入了开源的任务调度系统 DolphinScheduler。
DolphinScheduler 介绍
Apache DolphinScheduler 是一个分布式易扩展的可视化 DAG 工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。
Apache DolphinScheduler 旨在解决复杂的大数据任务依赖关系,并为应用程序提供数据和各种 OPS 编排中的关系。 解决数据研发 ETL 依赖错综复杂,无法监控任务健康状态的问题。 DolphinScheduler 以 DAG(Directed Acyclic Graph,DAG)流式方式组装任务,可以及时监控任务的执行状态,支持重试、指定节点恢复失败、暂停、恢复、终止任务等操作。
特性
- 简单易用
- 可视化 DAG:用户友好的,通过拖拽定义工作流的,运行时控制工具
- 模块化操作:模块化有助于轻松定制和维护。
- 丰富的使用场景
- 支持多种任务类型:支持 Shell、MR、Spark、SQL 等 10 余种任务类型,支持跨语言,易于扩展
- 丰富的工作流操作:工作流程可以定时、暂停、恢复和停止,便于维护和控制全局和本地参数。
- High Reliability
- 高可靠性: 去中心化设计,确保稳定性。 原生 HA 任务队列支持,提供过载容错能力。 DolphinScheduler 能提供高度稳健的环境。
- High Scalability
- 高扩展性: 支持多租户和在线资源管理。支持每天 10 万个数据任务的稳定运行。
DolphinScheduler 安装配置
DolphinScheduler 提供了多种部署选项,包括单机部署、伪集群部署、集群部署和 K8S 方式部署。官方部署方式参考【 DolphinScheduler 部署指南】,您也可以在 AWS 部署无服务器版本,部署方式参考博客【AWS 部署无服务器 DolphinScheduler】。
DolphinScheduler 任务管理与调度
任务管理
在 DolphinScheduler 中,任务按照用户、项目、工作流来组织,首先可以配置多用户,每个用户创建、上下线、管理自己的任务;然后再按照项目组织工作流,工作流中编排多个任务。可以配置用户对项目、数据源等的管理权限。这样在一个公司内,可以很好的按照部门、项目组、用户管理任务。
任务提交
DolphinScheduler 支持丰富的任务类型,支持基于 cron 表达式的定时调度和手动调度,命令类型支持:启动工作流、从当前节点开始执行,支持补历史数据。
其中针对 Spark 任务,有原生的 Spark 组件,但是使用该组件时需要 DolphinScheduler 节点上有 EMR 的环境,这样一旦 EMR 重建就需要重新配置环境,难以维护,而且无法在多个 EMR 集群上调度任务。为了减少 DolphinScheduler 和 EMR 的依赖和耦合,我们设计了不依赖与 EMR 环境的任务提交方式。
预先配置
在向 EMR 提交作业时,会使用 AWS SDK 进行作业提交,所以也需要对应的权限认证。通常步骤如下:
- 创建一个 IAM Policy,指定 EMR Step 相关权限
- 创建一个 IAM User,将步骤 1 里的 Policy 绑定到此 IAM User
- 获取此 IAM User 的 Access Key 与 Secret Key
一个参考的 IAM Policy 可以是:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:AddJobFlowSteps",
"elasticmapreduce:DescribeStep",
"elasticmapreduce:ListSteps",
"elasticmapreduce:CancelSteps",
"elasticmapreduce:ModifyInstanceGroups"
],
"Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
},
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:ListClusters",
"elasticmapreduce:DescribeCluster"
],
"Resource": "*"
}
]
}
如果你需要更严格的权限控制,可以将资源 ARN 限制为特定区域或特定集群 ID,例如:
在获取了 AWS Access Key 和 Secret Key 后,编辑 worker server 的 common.properties 配置文件(路径为:worker-server/conf/common.properties),填写以下配置后保存即可:
resource.aws.access.key.id=xxxxx
resource.aws.secret.access.key=xxxxx
resource.aws.region=xxxx
SparkJar 任务
首先在项目管理中选择一个项目,进入工作流定义界面,创建一个新工作流,拖拽“云”分组下的 EMR 组件到右侧工作流编排区域,在弹出的任务编辑页面,填写相关信息,程序类型选择 ADD_JOB_FLOW_STEPS,stepsDefineJson 按照下面 Json 格式填写,其中 JobFlowId 中填写 EMR Cluster ID,前置任务选择依赖的前置任务。
该组件的功能是通过 EMR SDK 向 EMR 集群增加一个 Step 任务,是 EMR 的原生任务提交方式,由于在 Json 中指定了集群名称,并且可以采用参数的方式提交,可以灵活定义和修改集群。
完整的 stepsDefineJson 格式:
{
"JobFlowId": "j-3JNBCJK*****",
"Steps": [
{
"Name": "spark-step",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode",
"cluster",
"--master",
"yarn",
"--executor-memory","2G",
"--executor-cores","1",
"--num-executors","1",
"--class","org.apache.spark.examples.SparkPi",
"/usr/lib/spark/examples/jars/spark-examples.jar","1000"
]
}
}
]
}
SparkSql 任务
参考【基于开源工具构建 EMR 数据分析平台(四)使用 Kyuubi 进行 Spark Sql 任务提交】
EMR Serverless 任务
在工作流创建页面,拖拽通用组件下的 SHELL 组件到右侧工作流编排区域,在弹出的任务编辑页面,填写相关信息,在脚本部分填写以下 shell 脚本提交任务到 EMR Serverless。
脚本模板:
#!/bin/bash
set -ex
export APP_NAME='EMR-xxx-spark-prod'
export APP_S3_HOME='s3://xxxxs'
export APP_LOCAL_HOME='/tmp'
export EMR_SERVERLESS_APP_ID='xxxxx'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::xxxx'
submit-spark-sql-job() {
sqlFiles="$1"
sqlParams="$2"
cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
"name":"my-spark-sql-job",
"applicationId":"$EMR_SERVERLESS_APP_ID",
"executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
"jobDriver":{
"sparkSubmit":{
"entryPoint":"$APP_S3_HOME/jar/emr-serverless-utils-1.0.jar",
"entryPointArguments":[
$([[ -n "$sqlFiles" ]] && echo "\"--sql-files\", \"$sqlFiles\"")
$([[ -n "$sqlParams" ]] && echo ",\"--sql-params\", \"$sqlParams\"")
],
"sparkSubmitParameters":"--class com.github.emr.serverless.SparkSqlJob --conf spark.executor.cores=4 --conf spark.executor.memory=10G --conf spark.driver.memory=4G --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=80000000 --conf spark.default.parallelism=2000 --conf spark.sql.shuffle.partitions=20000 --conf spark.sql.files.maxPartitionBytes=50000000 --conf spark.sql.storeAssignmentPolicy=LEGACY --conf spark.emr-serverless.executor.disk=100G --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.sql.parquet.compression.codec=gzip"
}
},
"configurationOverrides":{
"monitoringConfiguration":{
"s3MonitoringConfiguration":{
"logUri":"$APP_S3_HOME/logs"
}
}
}
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json
export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
--no-paginate --no-cli-pager --output text \
--name my-spark-sql-job \
--application-id $EMR_SERVERLESS_APP_ID \
--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
--tags sgt:project=emr_serverless_dmp,sgt:usage=common_common_compute_emr \
--execution-timeout-minutes 0 \
--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
--query jobRunId)
now=$(date +%s)sec
while true; do
jobStatus=$(aws emr-serverless get-job-run \
--no-paginate --no-cli-pager --output text \
--application-id $EMR_SERVERLESS_APP_ID \
--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
--query jobRun.state)
if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
for i in {0..5}; do
echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
sleep 1
done
elif [ "$jobStatus" = "FAILED" ] || [ "$jobStatus" = "CANCELLED" ]; then
printf "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]%50s\n\n"
exit 1
else
printf "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]%50s\n\n"
break
fi
done
}
submit-spark-sql-job "$APP_S3_HOME/sql/dwm_user_app_action_sum_all.sql" "APP_S3_HOME=$APP_S3_HOME,last_hour_yyyy-MM-dd=${last_hour_yyyy-MM-dd},last_hour_yyyyMMdd=${last_hour_yyyyMMdd},last_day_yyyyMMdd=${last_day_yyyyMMdd},last_hour_HH=${last_hour_HH}"
关于用 shell 提交 EMR Serverless 任务和 emr-serverless-utils-1.0.jar 脚本,请参考博客【如何在 Amazon EMR Serverless 上执行纯 SQL 文件】,以上是提交 Spark Sql 任务的脚本,如果是 Spark Jar 任务,直接将任务 jar 替换 emr-serverless-utils-1.0.jar,其他参数根据实际情况修改即可。
通过参数优化任务配置
在 DolphinScheduler 中,有全局参数、项目参数、本地参数等各种参数可以定义,定义的参数可以在任务配置、脚本、数据源定义中使用,通过参数定义的方式,可以在一个地方定义,多处使用,这样减少配置复杂度、增加灵活性。比如,在 Kyuubi 数据源配置中使用参数,可以统一数据源定义、还能根据任务灵活调整资源用量,在 SparkJar 任务配置中灵活定义目标集群。配置样例如下:
DolphinScheduler 定制化开发
虽然通过以上介绍,我们可以看到通过 DolphinScheduler 可以灵活、方便、高效地管理、调度各种 Spark 任务并提交到 EMR,但是在实际使用中,DolphinScheduler3.2 版本还存在以下几个问题:
- 当通过 EMR 原生方式提交任务后,在任务实例中看到的状态是 EMR Step 的状态,不能真正体现 Yarn 上任务状态;
- 无法真正停止任务,在任务实例列表中停止任务,实际上停止的是 EMR Step,而不是 Yarn 上的任务;
- DolphinScheduler 的参数,在 Kyuubi 数据源定义中和 EMR Step Json 中无法生效;
- DolphinScheduler 在 Sql 任务结束后没有关闭连接,Yarn 上还一直保持运行状态,资源不释放;
对此,我们修改了部分 DolphinScheduler 源码,解决了上述问题,核心代码如下:
- 从 Yarn rest API 读取任务的真实状态,解决读 step 状态不准确的问题
private String getYarnAppStatus(String yarnAppId) throws Exception {
log.info("Trying getting status from Yarn. App ID: " + yarnAppId);
URL url = new URL(yarnAppUri + yarnAppId);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("Accept", "application/json");
if (conn.getResponseCode() != 200) {
throw new RuntimeException("Failed : HTTP error code : " + conn.getResponseCode());
}
BufferedReader br = new BufferedReader(new InputStreamReader((conn.getInputStream())));
String output, jsonResponse = "";
while ((output = br.readLine()) != null) {
jsonResponse += output;
}
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(jsonResponse);
JsonNode appNode = rootNode.path("app");
JsonNode stateNode = appNode.path("state");
String appStatus = stateNode.asText();
log.info("emr step [clusterId:{}, stepId:{}, appId:{}] running with status:{}", clusterId, stepId, yarnAppId, appStatus);
return appStatus;
}
- 当 kill 任务时,直接通过 Yarn rest API 进行操作,防止 kill step 之后任务在 Yarn 上仍在运行
private void killYarnApp(String yarnAppId) throws Exception {
URL url = new URL(yarnAppUri + yarnAppId + "/state");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.setDoOutput(true);
conn.setRequestProperty("Content-Type", "application/json");
conn.setRequestProperty("Accept", "application/json");
String payload = "{\"state\": \"KILLED\"}";
OutputStream os = conn.getOutputStream();
byte[] input = payload.getBytes(StandardCharsets.UTF_8);
os.write(input, 0, input.length);
int responseCode = conn.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_ACCEPTED) {
log.info("Yarn application " + yarnAppId + " has been successfully marked for kill.");
} else {
log.info("Failed to kill the application. Response code: {}", responseCode);
throw new RuntimeException();
}
}
- 在 Sql 任务重,解析参数,解决参数无法生效的问题
public String convertParameterPlaceholders(String originParamStr) {
String parameterPlaceholders = ParameterUtils.convertParameterPlaceholders(
originParamStr,
ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()));
log.info("Sql task input param: {}", parameterPlaceholders);
return parameterPlaceholders;
}
public void handle(TaskCallBack taskCallBack) throws TaskException {
log.info("Full sql parameters: {}", sqlParameters);
try {
// get datasource
baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType,
convertParameterPlaceholders(sqlTaskExecutionContext.getConnectionParams()));
List<String> subSqls = DataSourceProcessorProvider.getDataSourceProcessor(dbType)
.splitAndRemoveComment(sqlParameters.getSql());
...
}
...
}
总结
本文介绍了如何在基于开源工具构建的 EMR 数据分析平台中,通过引入 Apache DolphinScheduler,实现对 EMR 任务的统一管理和调度。DolphinScheduler 不仅提供了可视化的任务编排界面,还支持多种任务提交方式,能够满足不同场景下的 EMR 任务调度需求。在实际应用中,我们采用了三种主要的任务提交方式:
- 通过 EMR SDK 提交 SparkJar 任务
- 使用 Kyuubi 提交 SparkSQL 任务
- 采用 Shell 脚本方式提交 EMR Serverless 任务
通过参数化配置,我们提高了任务管理的灵活性和复用性。虽然在使用过程中发现了一些问题,如任务状态显示、任务停止控制等,但通过源码级别的定制开发,这些问题都得到了解决。
DolphinScheduler 的引入,使我们能够更好地管理复杂的数据处理工作流,提高了任务调度的可靠性和效率,为构建完整的 EMR 数据分析平台提供了重要支撑。
系列文章
https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-one/
https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-two/
https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-four/
https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-five/
本篇作者