在我们构建的基于开源工具与 EMR 的数据分析平台中,我们使用 DolphinScheduler 进行离线任务调度,在 DolphinScheduler 中可以直接提交 Spark SQL 任务,但是是以 Spark-sql Shell 的模式,这种模式下,一方面正如在上一篇中讲到的,DolphinScheduler 环境与 EMR 环境依赖度、耦合性高,另一方面,driver 运行在 Worker 节点,当并行任务量较多时,会对 DolphinScheduler Worker 节点造成较大压力。所以我们使用了 Apache Kyuubi 作为 Spark SQL的提交网关,可以提供更高并发的 Spark SQL 执行能力。
Kyuubi 简介
Kyuubi 是一个 Thrift JDBC/ODBC 服务,支持多租户和分布式等特性。Kyuubi 通过 Thrift JDBC/ODBC 接口为终端用户提供了一个统一的纯 SQL 网关查询接口,使其能够以标准 SQL 语法查询和分析各种数据源,可以使用预编程和可扩展的 Spark SQL 引擎处理大规模数据。这种“开箱即用”的模型最大限度地减少了终端用户在客户端使用 Spark 的障碍和成本。在服务器端,Kyuubi Server 和引擎的多租户架构为管理员提供了一种实现计算资源隔离、数据安全性、高可用性、高客户机并发性等的方案。
尽管已有 Spark Thrift Server (STS),但 Kyuubi 封装了 Spark,将 Spark SQL 作为 serverless 服务的一种选择。使得即使没有 Spark 知识背景的用户也能轻松使用 Spark 进行查询。相比 STS 的任务共享同一 SparkContext ,Kyuubi 提供了独立的 SparkSession 实现用户隔离,支持大规模并发查询的弹性扩展。此外,Kyuubi 提供了可靠的服务层设计,避免单点故障,提升高可用性,确保查询任务的稳定性与性能优化,从而满足更高的生产需求。
Kyuubi 的技术架构主要由 Kyuubi 服务器、客户端连接和执行请求组成。Kyuubi 会将连接请求管理为会话,并将执行请求视为与会话绑定的操作。会话的创建分为轻量级和重量级两种情况,绝大多数是轻量级的,只有在首次连接或长时间未连接时,才会创建重量级会话。Kyuubi 以松散耦合的方式维护与 SparkContext 的连接,这些 SparkContext 可在不同的部署模式下创建并能够共享,统一接口方面 Kyuubi 实现了 Hive Service RPC 模块,提供与 HiveServer2 和 Spark Thrift Server 相同的数据访问方式。用户只需熟悉 SQL 和 JDBC,即可处理大规模数据,便于业务系统的设计和实现,如下图所示。
本方案中的 Kyuubi 架构
通过架构图来一起看一下 Kyuubi 的请求路径,如下所示:
- 图表顶部是客户端层。客户端可以从服务发现层的命名空间中找到多个注册的 Kyuubi 实例(i.),然后选择进行连接。注册到相同命名空间的 Kyuubi 实例可以相互进行负载均衡。
- 选定的 Kyuubi 实例将从服务发现层的引擎命名空间中选择一个可用的引擎实例(i.)以建立连接。如果没有可用的实例,Kyuubi 将创建一个新的引擎实例,等待其完成注册,然后继续连接。
- 如果同一个用户请求新的连接,连接将设置到相同或另一个 Kyuubi 实例,但会重用已有的引擎实例。
- 对于不同用户的连接,请求步骤 2 和 3 将被重复。这是因为在服务发现层中,用于存储引擎实例地址的命名空间默认是基于用户隔离的,不同用户无法访问其他用户的实例。
Kyuubi 与 Spark STS 对比,如下表格所示:
|
Spark STS |
Kyuubi |
隔离性 |
隔离性差:不同用户之间的查询可能会共享同一个 SparkContext,导致资源隔离性差,可能引起资源竞争和任务冲突。 |
多租户隔离性好:每个用户的查询任务运行在不同的 SparkSession 中,提供更好的资源隔离性和稳定性。 |
扩展性 |
扩展性有限:STS 本身是一个单点服务,无法做到弹性扩展,处理大规模并发查询时性能表现较差。 |
弹性扩展:Kyuubi 基于 Spark 的多实例架构,可以轻松水平扩展以支持大规模并发查询需求。 |
任务调度 |
任务调度问题:由于所有任务在同一个 SparkContext 中运行,任务调度变得更加复杂和不可预测,长时间运行的查询可能会阻塞其他查询。 |
资源管理更灵活:支持通过 YARN 或 Kubernetes 等资源管理平台进行细粒度的资源调度和任务执行,更加灵活。 |
服务稳定性 |
服务稳定性问题:单个 SparkContext 可能会导致整个 Thrift Server 崩溃,从而影响所有用户的查询任务。 |
高可用性:支持多实例部署,单个实例的故障不会影响整个服务,提供了更好的高可用性。 |
性能优化 |
有限的性能优化:依赖于 Spark 的内存缓存和查询优化器功能,存在资源竞争问题。 |
更好的性能优化:支持高级功能,如 SQL 缓存、结果缓存等,进一步提升查询性能,减少重复计算和 I/O 操作。 |
使用 Bootstrap(引导操作)脚本在 EMR 上部署 Kyuubi
在本方案中,我们通过 EMR 引导操作,将 Kyuubi 部署在 EMR Master 主节点。引导操作是在启动 Amazon EMR 实例后,在集群上运行的脚本。它在安装应用程序和处理数据之前执行,且在添加新节点时同样运行。通过引导操作用户可以在集群节点上安装额外的软件、配置文件或运行自定义脚本,从而满足用户的特定需求。关于引导操作更详细的内容,请您参阅文档[1]。 通过引导操作脚本部署 Kyuubi 一共涉及到 3 个步骤和 2 个脚本。具体内容如下:
步骤 1:下载 Kyuubi 安装包并上传到一个 S3 存储桶
# 指定s3路径
s3_path=s3://xxxx/tmp/
# 例如以下载1.9.2版本为例
wget https://dlcdn.apache.org/kyuubi/kyuubi-1.9.2/apache-kyuubi-1.9.2-bin.tgz
# 使用aws cli将tar包上传到指定的s3路径
aws s3 cp apache-kyuubi-1.9.2-bin.tgz $s3_path
步骤 2:准备 2 个脚本
第一个脚本为安装和配置 Kyuubi 的脚本,保存为 kyuubi_install.sh,并上传到 S3 桶。脚本内容为:
#!/bin/bash
# 保存脚本为kyuubi_install.sh,将脚本上传到S3
kyuubi_s3_path=$1
# Wait for EMR provisioning to become successful.
while [[ $(sed '/localInstance {/{:1; /}/!{N; b1}; /nodeProvision/p}; d' /emr/instance-controller/lib/info/job-flow-state.txt | sed '/nodeProvisionCheckinRecord {/{:1; /}/!{N; b1}; /status/p}; d' | awk '/SUCCESSFUL/' | xargs) != "status: SUCCESSFUL" ]];
do
sleep 3
done
# Now the EMR cluster is ready. Do your work here.
IS_MASTER=$(cat /mnt/var/lib/info/instance.json | jq -r ".isMaster" | grep "true" || true);
if [ -z $IS_MASTER ]; then
exit 0
fi;
# download
cd /mnt/
mkdir kyuubi && cd kyuubi
aws s3 cp $kyuubi_s3_path .
tar -xvzf apache-kyuubi-1.9.2-bin.tgz
ln -s apache-kyuubi-1.9.2-bin kyuubi
# config
cd /mnt/kyuubi/kyuubi/conf
mv kyuubi-env.sh.template kyuubi-env.sh
sed -i -e '$a\export SPARK_HOME=/usr/lib/spark' kyuubi-env.sh
sed -i -e '$a\export HADOOP_CONF_DIR=/etc/hadoop/conf' kyuubi-env.sh
# env中kyuubi中的jvm内存可以调大
mv /mnt/kyuubi/kyuubi/conf/kyuubi-defaults.conf.template /mnt/kyuubi/kyuubi/conf/kyuubi-defaults.conf
export zk=hostname
sed -i -e '$a\kyuubi.ha.addresses '$zk':2181' kyuubi-defaults.conf
sed -i -e '$a\kyuubi.session.engine.idle.timeout PT10M' kyuubi-defaults.conf
# USER, CONNECTION, GROUP, SERVER
sed -i -e '$a\kyuubi.engine.share.level CONNECTION' kyuubi-defaults.conf
# kyuubi-defaults.conf 添加
sed -i -e '$a\spark.dynamicAllocation.initialExecutors=1' kyuubi-defaults.conf
sed -i -e '$a\spark.dynamicAllocation.minExecutors=1' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.legacy.allowNonEmptyLocationInCTAS=true' kyuubi-defaults.conf
sed -i -e '$a\spark.serializer=org.apache.spark.serializer.KryoSerializer' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.hive.convertMetastoreParquet=false' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.storeAssignmentPolicy=LEGACY' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.session.timeZone=GMT+8' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.filesourceTableRelationCacheSize=0' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.sources.default=parquet' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED' kyuubi-defaults.conf
sed -i -e '$a\spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict' kyuubi-defaults.conf
sed -i -e '$a\spark.hadoop.hive.exec.dynamic.partition=true' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.sources.partitionOverwriteMode=DYNAMIC' kyuubi-defaults.conf
sed -i -e '$a\spark.driver.maxResultSize=10G' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.parquet.int96RebaseModeInWrite=CORRECTED' kyuubi-defaults.conf
sed -i -e '$a\spark.sql.parquet.int96RebaseModeInRead=CORRECTED' kyuubi-defaults.conf
sed -i -e '$a\spark.master=yarn' kyuubi-defaults.conf
sed -i -e '$a\spark.submit.deployMode=cluster' kyuubi-defaults.conf
sed -i -e '$a\kyuubi.session.engine.startup.waitCompletion=false' kyuubi-defaults.conf
# spark engine初始化的超时时间,默认180000ms,比如集群没有资源,spark submit提交等待资源,如果超时这个超时时间,application会被kill, client链接超时失败,可以调大
sed -i -e '$a\kyuubi.session.engine.initialize.timeout=3600000' kyuubi-defaults.conf
# 这是当前executor的内存设定,可以根据需求调整
sed -i -e '$a\spark.executor.memory=4096M' kyuubi-defaults.conf
sed -i -e '$a\spark.emr.default.executor.memory=4096M' kyuubi-defaults.conf
sed -i -e '$a\spark.executor.cores=2' kyuubi-defaults.conf
sed -i -e '$a\spark.emr.default.executor.cores=2' kyuubi-defaults.conf
# kyuubi-env KYUUBI_JAVA_OPTS -Xmx10g, 下面命令去掉kyuubi默认配置JVM 10GB配置的注释
sed -i '/^#.*export KYUUBI_JAVA_OPTS/s/^#//g' kyuubi-env.sh
# spark driver memory,也可以在kyuubi-defaults.conf设置,可以针对每个不通用户设置,改值是可以被kyuubi-defaults.conf中设置的driver内存覆盖
sudo sed -i -e '$a\spark.driver.memory 1024M' /etc/spark/conf/spark-defaults.conf
# start
sudo chown -R hadoop:hadoop /mnt/kyuubi
sudo -u hadoop /mnt/kyuubi/kyuubi/bin/kyuubi start
exit 0
第二个脚本为 EMR bootstrap 脚本,内容为下载 Kyuubi 安装脚本并运行。同样,此脚本需要上到 S3 桶
#!/bin/bash
# 保存脚本为kyuubi_bootstrap.sh 将脚本上传到S3
kyuubi_script_s3_path=$1
kyuubi_s3_path=$2
aws s3 cp $kyuubi_script_s3_path .
chmod +x kyuubi_install.sh
nohup ./kyuubi_install.sh $kyuubi_s3_path &>/dev/null &
步骤 3:启动 EMR 时指定 bootstrap 脚本
最终在 EMR 里填写 bootstrap 脚本时的配置。其中 Name 为自定义名称,Script location 为上述第 2 个脚本的 S3 路径。Arguments 里第一个参数为第 1 个脚本的 S3 路径,第二个参数为步骤 1 里上传到 S3 的 kyuubi tar 包路径。
使用 DolphinScheduler 通过 Kyuubi 提交 Spark sql 任务
以下示例使用 3.2.1 版本 DolphinScheduler。
创建数据源
- 创建数据源,点击“源中心” → “创建源”
- 选择源类型为 KYUUBI(图左)
- 填写数据源参数(图右),在这里可以通过 JDBC 的方式设置不同的 kyuubi spark session 的资源,例如 driver 和 executor 的 vcore 和 memory,同时根据上一篇博客中提到的,可以通过项目参数、本地参数的方式传入变量。
- 填写完所有参数后可以点击“测试连接”以测试连通性,点击“确定”创建数据源。
提交任务
- 导航栏点击“项目管理”,创建一个项目。
- 进入步骤 5 创建的项目,创建工作流。
- 将左侧通用组件中的 SQL 拖拽至右侧区域。
- 然后输入节点名称,选择数据源类型为“KYUUBI”,然后选择数据源实例为步骤 4 创建的数据源。
- 创建好工作流定义之后点击保存。
- 保存后就成功创建了工作流,点击上线按钮后再点击运行,工作流即开始按照配置运行。
- 查看工作流的运行结果。
- 对应在 EMR 集群的 Yarn Web UI 上的作业运行结果如下所示。
总结
通过本文的介绍,我们可以了解 Kyuubi 作为 Thrift JDBC/ODBC 服务在大数据处理中的优势,特别是在多租户隔离、资源调度和扩展性方面的显著优势。Kyuubi 相较于传统的 Spark Thrift Server,能够更好地隔离不同用户的查询任务,避免资源竞争,提高系统稳定性。此外,Kyuubi 的弹性扩展能力和高级性能优化功能,使其在处理大规模并发查询时表现尤为出色。以及如何通过 Amazon EMR 的 Bootstrap 脚本在集群中自动化安装和配置 Kyuubi,从而简化了集群管理和运维过程。此外,通过将 Kyuubi 与 DolphinScheduler 集成,可以轻松实现 Spark Sql 任务的调度和管理,进一步提升了大数据工作流的效率和可视化管理能力。Kyuubi 在数据分析和处理场景中提供了强大的支持,为企业级大数据处理提供了一个灵活且高效的解决方案。
参考文档
创建引导操作以安装其它软件 – Amazon EMR
DolphinScheduler | 文档中心 (apache.org)
Apache Kyuubi – Multi-tenant Thrift JDBC/ODBC server
系列文章
https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-one/
https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-two/
https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-three/
https://aws.amazon.com/cn/blogs/china/building-an-emr-data-analysis-platform-based-on-open-source-tools-part-five/
本篇作者