亚马逊AWS官方博客

基于开源工具构建 EMR 数据分析平台(三)使用 DolphinScheduler 进行 EMR 任务调度

在我们构建的基于开源工具与 EMR 的数据分析平台中,Amazon EMR 是核心的计算引擎,提供了 Apache Spark 执行框架,底层使用 YARN 作为资源管理。通常数据驱动的公司,每天有大量的离线任务需要执行,这些任务需要在不同的时间、以不同的周期执行,任务之间有前后依赖关系,任务也由 sql、shell、java 等不同语言编写,任务分布在不同的部门,有不同的 owner,所以需要一个统一的管理、调度、编排工具,能够按部门、按项目管理任务,使用可视化的方式编写任务调度工作流,在本方案中我们引入了开源的任务调度系统 DolphinScheduler。

DolphinScheduler 介绍

Apache DolphinScheduler 是一个分布式易扩展的可视化 DAG 工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。

Apache DolphinScheduler 旨在解决复杂的大数据任务依赖关系,并为应用程序提供数据和各种 OPS 编排中的关系。 解决数据研发 ETL 依赖错综复杂,无法监控任务健康状态的问题。 DolphinScheduler 以 DAG(Directed Acyclic Graph,DAG)流式方式组装任务,可以及时监控任务的执行状态,支持重试、指定节点恢复失败、暂停、恢复、终止任务等操作。

特性

  • 简单易用
    • 可视化 DAG:用户友好的,通过拖拽定义工作流的,运行时控制工具
    • 模块化操作:模块化有助于轻松定制和维护。
  • 丰富的使用场景
    • 支持多种任务类型:支持 Shell、MR、Spark、SQL 等 10 余种任务类型,支持跨语言,易于扩展
    • 丰富的工作流操作:工作流程可以定时、暂停、恢复和停止,便于维护和控制全局和本地参数。
  • High Reliability
    • 高可靠性: 去中心化设计,确保稳定性。 原生 HA 任务队列支持,提供过载容错能力。 DolphinScheduler 能提供高度稳健的环境。
  • High Scalability
    • 高扩展性: 支持多租户和在线资源管理。支持每天 10 万个数据任务的稳定运行。

DolphinScheduler 安装配置

DolphinScheduler 提供了多种部署选项,包括单机部署、伪集群部署、集群部署和 K8S 方式部署。官方部署方式参考【 DolphinScheduler 部署指南】,您也可以在 AWS 部署无服务器版本,部署方式参考博客【AWS 部署无服务器 DolphinScheduler】。

DolphinScheduler 任务管理与调度

任务管理

在 DolphinScheduler 中,任务按照用户、项目、工作流来组织,首先可以配置多用户,每个用户创建、上下线、管理自己的任务;然后再按照项目组织工作流,工作流中编排多个任务。可以配置用户对项目、数据源等的管理权限。这样在一个公司内,可以很好的按照部门、项目组、用户管理任务。

任务提交

DolphinScheduler 支持丰富的任务类型,支持基于 cron 表达式的定时调度和手动调度,命令类型支持:启动工作流、从当前节点开始执行,支持补历史数据。

其中针对 Spark 任务,有原生的 Spark 组件,但是使用该组件时需要 DolphinScheduler 节点上有 EMR 的环境,这样一旦 EMR 重建就需要重新配置环境,难以维护,而且无法在多个 EMR 集群上调度任务。为了减少 DolphinScheduler 和 EMR 的依赖和耦合,我们设计了不依赖与 EMR 环境的任务提交方式。

预先配置

在向 EMR 提交作业时,会使用 AWS SDK 进行作业提交,所以也需要对应的权限认证。通常步骤如下:

  1. 创建一个 IAM Policy,指定 EMR Step 相关权限
  2. 创建一个 IAM User,将步骤 1 里的 Policy 绑定到此 IAM User
  3. 获取此 IAM User 的 Access Key 与 Secret Key

一个参考的 IAM Policy 可以是:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:AddJobFlowSteps",
        "elasticmapreduce:DescribeStep",
        "elasticmapreduce:ListSteps",
        "elasticmapreduce:CancelSteps",
        "elasticmapreduce:ModifyInstanceGroups"
      ],
      "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:ListClusters",
        "elasticmapreduce:DescribeCluster"
      ],
      "Resource": "*"
    }
  ]
}

如果你需要更严格的权限控制,可以将资源 ARN 限制为特定区域或特定集群 ID,例如:

在获取了 AWS Access Key 和 Secret Key 后,编辑 worker server 的 common.properties 配置文件(路径为:worker-server/conf/common.properties),填写以下配置后保存即可:

resource.aws.access.key.id=xxxxx

resource.aws.secret.access.key=xxxxx

resource.aws.region=xxxx

SparkJar 任务

首先在项目管理中选择一个项目,进入工作流定义界面,创建一个新工作流,拖拽“云”分组下的 EMR 组件到右侧工作流编排区域,在弹出的任务编辑页面,填写相关信息,程序类型选择 ADD_JOB_FLOW_STEPS,stepsDefineJson 按照下面 Json 格式填写,其中 JobFlowId 中填写 EMR Cluster ID,前置任务选择依赖的前置任务。

该组件的功能是通过 EMR SDK 向 EMR 集群增加一个 Step 任务,是 EMR 的原生任务提交方式,由于在 Json 中指定了集群名称,并且可以采用参数的方式提交,可以灵活定义和修改集群。

完整的 stepsDefineJson 格式:

{
  "JobFlowId": "j-3JNBCJK*****",
  "Steps": [
    {
      "Name": "spark-step",
      "ActionOnFailure": "CONTINUE",
      "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
         "spark-submit",
         "--deploy-mode",
         "cluster",
         "--master",
         "yarn",
         "--executor-memory","2G",
         "--executor-cores","1",
         "--num-executors","1",
         "--class","org.apache.spark.examples.SparkPi",
         "/usr/lib/spark/examples/jars/spark-examples.jar","1000"
        ]
      }
    }
  ]
}
SparkSql 任务

参考【基于开源工具构建 EMR 数据分析平台(四)使用 Kyuubi 进行 Spark Sql 任务提交】

EMR Serverless 任务

在工作流创建页面,拖拽通用组件下的 SHELL 组件到右侧工作流编排区域,在弹出的任务编辑页面,填写相关信息,在脚本部分填写以下 shell 脚本提交任务到 EMR Serverless。

脚本模板:

#!/bin/bash
set -ex
export APP_NAME='EMR-xxx-spark-prod'
export APP_S3_HOME='s3://xxxxs'
export APP_LOCAL_HOME='/tmp'
export EMR_SERVERLESS_APP_ID='xxxxx'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::xxxx'

submit-spark-sql-job() {
    sqlFiles="$1"
    sqlParams="$2"
    cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{
    "name":"my-spark-sql-job",
    "applicationId":"$EMR_SERVERLESS_APP_ID",
    "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
    "jobDriver":{
        "sparkSubmit":{
        "entryPoint":"$APP_S3_HOME/jar/emr-serverless-utils-1.0.jar",
        "entryPointArguments":[
            $([[ -n "$sqlFiles" ]] && echo "\"--sql-files\", \"$sqlFiles\"")
            $([[ -n "$sqlParams" ]] && echo ",\"--sql-params\", \"$sqlParams\"")
        ],
         "sparkSubmitParameters":"--class com.github.emr.serverless.SparkSqlJob --conf spark.executor.cores=4 --conf spark.executor.memory=10G --conf spark.driver.memory=4G --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=80000000 --conf spark.default.parallelism=2000 --conf spark.sql.shuffle.partitions=20000 --conf spark.sql.files.maxPartitionBytes=50000000 --conf spark.sql.storeAssignmentPolicy=LEGACY --conf spark.emr-serverless.executor.disk=100G --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.sql.parquet.compression.codec=gzip"
        }
   },
   "configurationOverrides":{
        "monitoringConfiguration":{
            "s3MonitoringConfiguration":{
                "logUri":"$APP_S3_HOME/logs"
            }
        }
   }
}
EOF
    jq . $APP_LOCAL_HOME/start-job-run.json
    export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
        --no-paginate --no-cli-pager --output text \
        --name my-spark-sql-job \
        --application-id $EMR_SERVERLESS_APP_ID \
        --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
        --tags sgt:project=emr_serverless_dmp,sgt:usage=common_common_compute_emr \
        --execution-timeout-minutes 0 \
        --cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
        --query jobRunId)
    now=$(date +%s)sec
    while true; do
        jobStatus=$(aws emr-serverless get-job-run \
                        --no-paginate --no-cli-pager --output text \
                        --application-id $EMR_SERVERLESS_APP_ID \
                        --job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
                        --query jobRun.state)
        if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
            for i in {0..5}; do
                echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
                sleep 1
            done
        elif [ "$jobStatus" = "FAILED" ] || [ "$jobStatus" = "CANCELLED" ]; then
            printf "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]%50s\n\n"
            exit 1
        else 
            printf "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]%50s\n\n"
            break
        fi
    done
}

submit-spark-sql-job "$APP_S3_HOME/sql/dwm_user_app_action_sum_all.sql" "APP_S3_HOME=$APP_S3_HOME,last_hour_yyyy-MM-dd=${last_hour_yyyy-MM-dd},last_hour_yyyyMMdd=${last_hour_yyyyMMdd},last_day_yyyyMMdd=${last_day_yyyyMMdd},last_hour_HH=${last_hour_HH}"

关于用 shell 提交 EMR Serverless 任务和 emr-serverless-utils-1.0.jar 脚本,请参考博客【如何在 Amazon EMR Serverless 上执行纯 SQL 文件】,以上是提交 Spark Sql 任务的脚本,如果是 Spark Jar 任务,直接将任务 jar 替换 emr-serverless-utils-1.0.jar,其他参数根据实际情况修改即可。

通过参数优化任务配置

在 DolphinScheduler 中,有全局参数、项目参数、本地参数等各种参数可以定义,定义的参数可以在任务配置、脚本、数据源定义中使用,通过参数定义的方式,可以在一个地方定义,多处使用,这样减少配置复杂度、增加灵活性。比如,在 Kyuubi 数据源配置中使用参数,可以统一数据源定义、还能根据任务灵活调整资源用量,在 SparkJar 任务配置中灵活定义目标集群。配置样例如下:

DolphinScheduler 定制化开发

虽然通过以上介绍,我们可以看到通过 DolphinScheduler 可以灵活、方便、高效地管理、调度各种 Spark 任务并提交到 EMR,但是在实际使用中,DolphinScheduler3.2 版本还存在以下几个问题:

  1. 当通过 EMR 原生方式提交任务后,在任务实例中看到的状态是 EMR Step 的状态,不能真正体现 Yarn 上任务状态;
  2. 无法真正停止任务,在任务实例列表中停止任务,实际上停止的是 EMR Step,而不是 Yarn 上的任务;
  3. DolphinScheduler 的参数,在 Kyuubi 数据源定义中和 EMR Step Json 中无法生效;
  4. DolphinScheduler 在 Sql 任务结束后没有关闭连接,Yarn 上还一直保持运行状态,资源不释放;

对此,我们修改了部分 DolphinScheduler 源码,解决了上述问题,核心代码如下:

  • 从 Yarn rest API 读取任务的真实状态,解决读 step 状态不准确的问题
    private String getYarnAppStatus(String yarnAppId) throws Exception {
            log.info("Trying getting status from Yarn. App ID: " + yarnAppId);
            URL url = new URL(yarnAppUri + yarnAppId);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setRequestProperty("Accept", "application/json");
    
            if (conn.getResponseCode() != 200) {
                throw new RuntimeException("Failed : HTTP error code : " + conn.getResponseCode());
            }
    
            BufferedReader br = new BufferedReader(new InputStreamReader((conn.getInputStream())));
            String output, jsonResponse = "";
            while ((output = br.readLine()) != null) {
                jsonResponse += output;
            }
    
            ObjectMapper mapper = new ObjectMapper();
            JsonNode rootNode = mapper.readTree(jsonResponse);
            JsonNode appNode = rootNode.path("app");
            JsonNode stateNode = appNode.path("state");
            String appStatus = stateNode.asText();
            log.info("emr step [clusterId:{}, stepId:{}, appId:{}] running with status:{}", clusterId, stepId, yarnAppId, appStatus);
            return appStatus;
        }
    
  • 当 kill 任务时,直接通过 Yarn rest API 进行操作,防止 kill step 之后任务在 Yarn 上仍在运行
    private void killYarnApp(String yarnAppId) throws Exception {
            URL url = new URL(yarnAppUri + yarnAppId + "/state");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("PUT");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setRequestProperty("Accept", "application/json");
            String payload = "{\"state\": \"KILLED\"}";
            OutputStream os = conn.getOutputStream();
            byte[] input = payload.getBytes(StandardCharsets.UTF_8);
            os.write(input, 0, input.length);
            int responseCode = conn.getResponseCode();
    
            if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_ACCEPTED) {
                log.info("Yarn application " + yarnAppId + " has been successfully marked for kill.");
            } else {
                log.info("Failed to kill the application. Response code: {}", responseCode);
                throw new RuntimeException();
            }
        }
    
  • 在 Sql 任务重,解析参数,解决参数无法生效的问题
    public String convertParameterPlaceholders(String originParamStr) {
            String parameterPlaceholders = ParameterUtils.convertParameterPlaceholders(
                    originParamStr,
                    ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()));
            log.info("Sql task input param: {}", parameterPlaceholders);
            return parameterPlaceholders;
        }
    public void handle(TaskCallBack taskCallBack) throws TaskException {
            log.info("Full sql parameters: {}", sqlParameters);
            try {
    
                // get datasource
                baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType,
                        convertParameterPlaceholders(sqlTaskExecutionContext.getConnectionParams()));
                List<String> subSqls = DataSourceProcessorProvider.getDataSourceProcessor(dbType)
                        .splitAndRemoveComment(sqlParameters.getSql());
                ...
            }
            ...
    }
    

总结

本文介绍了如何在基于开源工具构建的 EMR 数据分析平台中,通过引入 Apache DolphinScheduler,实现对 EMR 任务的统一管理和调度。DolphinScheduler 不仅提供了可视化的任务编排界面,还支持多种任务提交方式,能够满足不同场景下的 EMR 任务调度需求。在实际应用中,我们采用了三种主要的任务提交方式:

  1. 通过 EMR SDK 提交 SparkJar 任务
  2. 使用 Kyuubi 提交 SparkSQL 任务
  3. 采用 Shell 脚本方式提交 EMR Serverless 任务

通过参数化配置,我们提高了任务管理的灵活性和复用性。虽然在使用过程中发现了一些问题,如任务状态显示、任务停止控制等,但通过源码级别的定制开发,这些问题都得到了解决。

DolphinScheduler 的引入,使我们能够更好地管理复杂的数据处理工作流,提高了任务调度的可靠性和效率,为构建完整的 EMR 数据分析平台提供了重要支撑。

系列文章

https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-one/

https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-two/

https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-four/

https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-five/

本篇作者

彭赟

亚马逊云科技资深解决方案架构师,负责基于亚马逊云科技的云计算方案架构咨询和设计,20 多年软件架构、设计、开发、项目管理交付经验,擅长业务咨询、产品设计、软件架构,在大数据、区块链、容器化方向有较深的入研究,具有丰富的解决客户实际问题的经验。

汤市建

亚马逊云科技数据分析解决方案架构师,负责客户大数据解决方案的咨询与架构设计。

郝亮

亚马逊云科技 Analytics 快速原型解决方案架构师,负责根据客户实际的业务场景,利用最新最适用于场景的大数据技术,基于亚马逊云科技服务快速搭建核心系统,解决客户的关键业务诉求,验证方案的可行性。

龚德强

亚马逊云科技资深客户解决方案经理,2019 年加入亚马逊云科技。入职亚马逊云科技之前在软件行业、电信行业工作 20 多年,也曾 2 次创业 ToB 公司,熟悉软件开发、项目管理、项目交付,擅长与客户进行需求沟通。