亚马逊AWS官方博客

在AWS EMR Core节点部署Flink Client的实战指南

摘要:本文为您详细介绍如何通过 Bootstrap Action 在新Amazon EMR集群的 Core 节点上自动安装完整的 Flink Client 工具链,整个方案遵循”一次打包、多次复用”的原则,只要 EMR 版本不变,打包产物可以在后续所有集群创建时重复使用,无需重复操作。


一、概述

在现代大数据架构中,Apache Flink 已成为流批一体计算的核心引擎,广泛应用于实时数据处理、ETL 流水线和复杂事件处理等场景。Amazon EMR(Elastic MapReduce)作为 AWS 托管的大数据平台,提供了对 Flink 的原生支持,极大地简化了集群的运维成本。

然而,在实际生产环境的落地过程中, EMR 的默认安装行为存在一个限制:**Flink Client 工具链仅安装在 Master 节点上,Core 节点默认不具备直接提交 Flink 任务的能力**。Core 节点的职责是运行 YARN NodeManager 和 HDFS DataNode,负责实际的计算和存储,而非任务提交。随着企业 AI 化和数据平台建设的深入,越来越多的客户选择在 EMR Core 节点上部署 DolphinScheduler、Apache Airflow 等工作流调度系统的 Worker 组件。这些调度系统的 Worker 进程需要能够直接调用 `flink`、`flink-sql-client`、`yarn-session.sh` 等命令来提交 Flink 作业。如果 Core 节点上没有 Flink Client,调度系统只能通过远程 SSH 到 Master 节点执行任务,增加了架构复杂性和维护成本。

本文以 **EMR 7.10.0(Flink 1.20.0-amzn-4)** 为例,详细记录如何通过 Bootstrap Action 在新集群的 Core 节点上自动安装完整的 Flink Client 工具链,整个方案遵循”一次打包、多次复用”的原则,只要 EMR 版本不变,打包产物可以在后续所有集群创建时重复使用,无需重复操作。

二、测试环境准备

2.1. EMR 版本与 Flink 版本的关系

在开始之前,有一点值得说明。EMR 控制台显示的 **Flink 1.20.0** 是 Apache 开源社区的基础版本号,而实际安装的 **Flink 1.20.0-amzn-4** 是 AWS 基于开源版本的定制构建,包含 AWS 服务集成优化、安全补丁和性能改进。EMR 中的 Flink 版本与 EMR 发行版绑定,选择 `emr-7.10.0` 即自动获得 `Flink 1.20.0-amzn-4`,因此打包时需要使用完全相同的 EMR 版本。

2.2. 前置条件

  • AWS CLI 已安装并配置好具有足够权限的 IAM 凭证
  • 一个用于存放 Bootstrap 脚本和 Flink 打包文件的 S3 Bucket
  • 创建一个用于打包Flink二进制文件的Source 集群:
aws emr create-cluster \
--release-label emr-7.10.0 \
--applications Name=Flink Name=Hadoop \
--instance-type m5.xlarge \
--instance-count 3 \
--service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole \
--name "Flink-Source-Cluster"

记下返回的 `ClusterId`,例如 `j-XXXXXXXXXXXXX`。等待集群进入 `WAITING` 状态(约 5-10 分钟)

[图1]

三、方案详述

从已有 EMR 集群的 Master 节点打包 Flink Client 二进制文件和配置文件,上传到 S3,然后通过 Bootstrap Action 在新集群的 Core/Task 节点上自动安装。

3.1. 步骤一:连接到已有的EMR Master 节点并验证 Flink

通过SSH或者SSM Manager连接到已有的EMR Master节点上。

# 连接后验证Flink版本

flink --version

[图2]

# 验证 Flink 可以在 YARN 上正常运行(注意:在 Master 节点上必须用完整路径,否则会因 `$bin` 路径解析问题报错;同时需要使用 `hadoop` 用户或 `sudo -u hadoop` 来获得 HDFS 写权限):
sudo -u hadoop 
# 启动一个 Flink YARN Session(验证 Flink 可以跑起来)
/usr/lib/flink/bin/yarn-session.sh -d -jm 1024 -tm 2048
# 查看 YARN 上的 Flink Session
yarn application -list

[图3]

3.2. 步骤二:打包Flink Client 并且上传到S3上

# 打包 Flink 二进制文件(包含 bin、lib、opt、plugins、examples 等目录)
tar -czf flink-client-1.20.0-amzn-4.tar.gz -C /usr/lib flink
# 打包 Flink 配置文件
tar -czf flink-conf.tar.gz -C /etc flink
# 上传到 S3(替换为实际 bucket 名称)
aws s3 cp flink-client-1.20.0-amzn-4.tar.gz s3://<your-bucket>/emr/bootstrap/
aws s3 cp flink-conf.tar.gz s3://<your-bucket>/emr/bootstrap/
# 退出 Master 节点
Exit

[图4]

打包完成后,这两个文件可长期复用于相同 EMR 版本的新集群,无需重复打包。

3.3. 步骤三:编写 Bootstrap Action 脚本

在本地创建 install_flink_client.sh,脚本会在集群启动时检测节点类型,仅在 Core/Task 节点上执行安装逻辑:

#!/bin/bash
# install_flink_client.sh
# Bootstrap Action: Install Flink client on Core/Task nodes
# Compatible with EMR 7.10.0 (Flink 1.20.0-amzn-4)
#
# Usage: Upload to S3 and reference as Bootstrap Action when creating EMR cluster
#   aws s3 cp install_flink_client.sh s3://<your-bucket>/emr/bootstrap/
set -e
S3_BUCKET="<your-bucket>"
FLINK_TARBALL="flink-client-1.20.0-amzn-4.tar.gz"
FLINK_CONF_TARBALL="flink-conf.tar.gz"
S3_PREFIX="s3://${S3_BUCKET}/emr/bootstrap"
IS_MASTER=$(cat /emr/instance-controller/lib/info/instance.json | jq -r '.isMaster')
if [ "$IS_MASTER" = "false" ]; then
    echo "Core/Task node detected, installing Flink client..."
    # 1. Download and extract Flink client binaries
    echo "Downloading Flink client binaries from S3..."
    aws s3 cp "${S3_PREFIX}/${FLINK_TARBALL}" /tmp/
    sudo tar -xzf "/tmp/${FLINK_TARBALL}" -C /usr/lib/
    # 2. Download and extract Flink config files
    echo "Downloading Flink config from S3..."
    aws s3 cp "${S3_PREFIX}/${FLINK_CONF_TARBALL}" /tmp/
    sudo tar -xzf "/tmp/${FLINK_CONF_TARBALL}" -C /etc/
    # 3. Fix broken /etc/flink/conf symlink
    #    On Core nodes, /etc/flink/conf points to /etc/alternatives/flink-conf
    #    which does not exist (only created on Master nodes by EMR)
    if [ -L "/etc/flink/conf" ] && [ ! -e "/etc/flink/conf" ]; then
        echo "Fixing broken symlink /etc/flink/conf..."
        sudo rm -f /etc/flink/conf
        if [ -d "/etc/flink/conf.dist" ]; then
            sudo cp -r /etc/flink/conf.dist /etc/flink/conf
        else
            sudo mkdir -p /etc/flink/conf
        fi
    fi
    # 4. Fix broken /usr/lib/flink/conf symlink
    if [ -L "/usr/lib/flink/conf" ] && [ ! -e "/usr/lib/flink/conf" ]; then
        echo "Fixing broken symlink /usr/lib/flink/conf..."
        sudo rm -f /usr/lib/flink/conf
        sudo ln -sf /etc/flink/conf /usr/lib/flink/conf
    fi
    # 5. Create wrapper scripts (NOT symlinks)
    #    Symlinks cause $bin to resolve to /usr/bin/ instead of /usr/lib/flink/bin/,
    #    breaking relative path references like `. "$bin"/config.sh` inside the scripts.
    echo "Creating wrapper scripts in /usr/bin/..."
    sudo tee /usr/bin/flink > /dev/null << 'WRAPPER'
#!/bin/bash
exec /usr/lib/flink/bin/flink "$@"
WRAPPER
    sudo chmod +x /usr/bin/flink
    sudo tee /usr/bin/flink-sql-client > /dev/null << 'WRAPPER'
#!/bin/bash
exec /usr/lib/flink/bin/sql-client.sh "$@"
WRAPPER
    sudo chmod +x /usr/bin/flink-sql-client
    sudo tee /usr/bin/flink-sql-gateway > /dev/null << 'WRAPPER'
#!/bin/bash
exec /usr/lib/flink/bin/sql-gateway.sh "$@"
WRAPPER
    sudo chmod +x /usr/bin/flink-sql-gateway
    sudo tee /usr/bin/yarn-session.sh > /dev/null << 'WRAPPER'
#!/bin/bash
exec /usr/lib/flink/bin/yarn-session.sh "$@"
WRAPPER
    sudo chmod +x /usr/bin/yarn-session.sh
    # 6. Set environment variables
    cat << 'ENVEOF' | sudo tee /etc/profile.d/flink.sh
export FLINK_HOME=/usr/lib/flink
export PATH=$FLINK_HOME/bin:$PATH
ENVEOF
    # 7. Clean up temp files
    rm -f "/tmp/${FLINK_TARBALL}" "/tmp/${FLINK_CONF_TARBALL}"
    echo "Flink client installation completed successfully."
else
    echo "Master node detected, skipping (Flink client already installed by EMR)."
fi

将脚本上传到 S3

aws s3 cp install_flink_client.sh s3://<your-bucket>/emr/bootstrap/

[图5]

3.4. 步骤四:创建正式集群(使用Bootstrap Action)

aws emr create-cluster \
--release-label emr-7.10.0 \
--applications Name=Flink Name=Hadoop \
--bootstrap-actions Path=s3:// <your-bucket>/emr/bootstrap/install_flink_client.sh,Name="Install Flink Client on Core Nodes" \
--instance-type m5.xlarge \
--instance-count 3 \
--service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole \
--name "Flink-Source-Cluster"

[图6]

3.5. 步骤五:验证 Core 节点安装结果

Core 节点没有公网 IP,需要通过 Master 节点跳转。首先连接到Master节点,列出 Core 节点的内网地址,并且ssh到任意一个Core节点进行验证:

flink --version
flink-sql-client --help
yarn-session.sh –help

如需排查问题,查看 Bootstrap Action 日志:

sudo cat /var/log/bootstrap-actions/1/stdout
sudo cat /var/log/bootstrap-actions/1/stderr

[图7]

四、结语

本文介绍了一种通过 Bootstrap Action 在 EMR Core 节点上安装 Flink Client 工具链的完整方案,主要解决了在 Core 节点部署 DolphinScheduler、Airflow 等调度系统 Worker 时无法直接提交 Flink 任务的痛点。方案的核心思路是从 Master 节点打包 Flink 二进制文件和配置,上传至 S3,然后在新集群启动时通过 Bootstrap Action 自动分发到 Core/Task 节点。只要 EMR 版本不变,S3 上的打包文件可以反复用于新集群创建,无需重新打包,真正实现一次配置、持续复用。

随着企业大数据架构的持续演进,EMR 上的 Flink 工作负载将越来越多地与调度、监控、数据治理等周边系统深度集成。在进行此类集成设计时,建议提前梳理各组件的部署位置与运行时依赖,充分考虑 Master 单点瓶颈与 Core 节点弹性伸缩的权衡,并在正式上线前通过 Bootstrap Action 日志和节点级别的命令验证确认安装结果,以避免调度系统在生产环境中因环境缺失而静默失败。

➡️ 下一步行动:

相关产品:

  • Amazon EMR — 轻松运行大数据框架
  • Amazon S3 — 适用于 AI、分析和存档的几乎无限的安全对象存储
  • Amazon EC2 — 安全且可调整大小的计算容量
  • Amazon IAM — 身份管理和访问权限

相关文章:

五、参考资料

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

白雪尧

亚马逊云科技解决方案架构师,曾任职于 SAP、微软的开发、技术支持部门。对高并发低延迟现代化应用架构、数据分析有丰富经验,对web3/HPC/半导体设计行业有行业经验。

殷雨濛

亚马逊云科技解决方案架构师,负责基于亚马逊云科技的云计算方案的架构设计。曾在 IBM、HPE 和花旗银行担任数据科学家和研发负责人,在 IT 咨询、数据科学和软件研发领域有丰富的实践经验。专注于推动技术创新,通过高效的解决方案设计,帮助企业实现业务转型和增长。


AWS 架构师中心:云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用