引言
云时代企业为提升 IT 效能,常将业务迁移至公有云。某新能源头部企业因全球化布局及技术创新需求,计划将业务负载迁移至亚马逊云科技。迁移中面临的一项挑战:亚马逊云科技 Amazon CloudWatch 原生功能缺失 EC2 批量告警配置能力,对拥有海量实例的客户而言,手动逐台配置告警耗时且易错。因此可以采用脚本化的方式批量创建 EC2 CPU 告警,有效降低运维成本。
背景与挑战
该企业迁移至亚马逊云科技后,需监控大量 EC2 实例性能,尤其需为每台实例设置 CPU 使用率阈值告警,以实时预警资源瓶颈。尽管 CloudWatch 是其核心监控服务,却缺乏批量告警功能,需逐台配置。面对数十至上百台实例,手动逐台操作效率低且易引发配置遗漏或错误,直接影响监控覆盖率和系统稳定性。
解决方案设计
为了解决这一挑战,可以利用亚马逊云科技提供的软件开发工具包——Boto3,编写一个 Python 脚本来实现批量创建 EC2 CPU 使用量告警的功能。关于 Boto3 的介绍,可以参考 Boto3 介绍。通过使用 Boto3,我们可以使用编程方式来获取所有运行的 EC2 实例信息,并为每个实例创建自定义的 CPU 使用率告警。这样就可以批量的一次性为所有目标实例配置统一的告警规则,不仅节省了大量时间,还确保了配置的一致性和准确性。
对于新建的少数的 EC2,可以采用 EventBridge 监测 EC2 的 Running 和 Terminated 事件,继而触发 Lambda 的执行,根据EC2上所加 Tag 的信息,来自动的添加和删除相应的 CloudWatch Alarm。
Python 脚本实现
以下是相关的 Python 脚本,用于批量创建 EC2 CPU 使用率告警:
"""
author: RJ.Wang
Date: 2025-03-13
email: wangrenjun@gmail.com
Description: Batch creation of AWS EC2 CPU utilization alarms with TAG filtering
"""
import boto3
import logging
from botocore.exceptions import ClientError
# 配置日志记录
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
AWS_REGION = "ap-southeast-1" # 需要修改为您的 AWS 区域
sns_topic_arn = "arn:aws:sns:xxxxxx:xxxxxx:xxxxxx" # 需要修改为您的 SNS 主题 ARN
# 若要对不同的EC2配置不同的告警参数,可根据TAG 来筛选和分组,KEY和VALUE 值均为 None 时,将对该区域所有为 running 状态的 EC2 进行操作
TAG_KEY = None
TAG_VALUE = None
# 下述示例表示具备 Monitor=yes 的Tag-Key Pair的EC2实例将被创建CloudWatch Alarm。
#TAG_KEY = "Monitor"
#TAG_VALUE = "yes"
ec2_client = boto3.client("ec2", region_name=AWS_REGION)
cw_client = boto3.client("cloudwatch", region_name=AWS_REGION)
sts_client = boto3.client("sts")
# 获取账号ID并缓存
account_id = sts_client.get_caller_identity()["Account"]
def get_instance_ids():
"""
获取所有运行的EC2实例ID,并根据TAG进行筛选
"""
paginator = ec2_client.get_paginator("describe_instances")
instance_ids = []
try:
for page in paginator.paginate():
for reservation in page.get("Reservations", []):
for instance in reservation.get("Instances", []):
instance_id = instance.get("InstanceId")
state = instance.get("State", {}).get("Name", "unknown")
tags = instance.get("Tags", [])
# 检查实例是否符合TAG筛选条件
tag_match = True
if TAG_KEY and TAG_VALUE:
tag_match = False
for tag in tags:
if tag.get("Key") == TAG_KEY and tag.get("Value") == TAG_VALUE:
tag_match = True
break
if state == "running" and tag_match:
instance_ids.append(instance_id)
logger.debug(f"Found running instance with matching TAG: {instance_id}")
else:
logger.debug(f"Skipping instance (state: {state}, TAG match: {tag_match}): {instance_id}")
except ClientError as e:
logger.error(f"Error getting instance IDs: {e}")
except Exception as e:
logger.error(f"Unexpected error getting instances: {e}")
return instance_ids
def delete_existing_alarm(alarm_name):
"""
删除已存在的同名报警
"""
try:
response = cw_client.describe_alarms(AlarmNames=[alarm_name])
if response.get("MetricAlarms"):
cw_client.delete_alarms(AlarmNames=[alarm_name])
logger.info(f"Deleted existing alarm: {alarm_name}")
else:
logger.info(f"No existing alarm found: {alarm_name}")
except ClientError as e:
logger.error(f"Error deleting alarm {alarm_name}: {e}")
def create_cpu_alarm(instance_id, created_alarms):
"""
为指定实例创建CPU使用率报警
"""
alarm_name = f"CPU-{instance_id}-Alarm"
try:
delete_existing_alarm(alarm_name)
cw_client.put_metric_alarm(
AlarmName=alarm_name,
MetricName="CPUUtilization",
Namespace="AWS/EC2",
Statistic="Average",
Period=60,
EvaluationPeriods=2,
Threshold=85.0,
ComparisonOperator="GreaterThanThreshold",
AlarmActions=[sns_topic_arn],
OKActions=[sns_topic_arn],
AlarmDescription=f"Alarm for CPUUtilization on instance {instance_id}", # 增加报警描述
Dimensions=[{"Name": "InstanceId", "Value": instance_id}]
)
logger.info(f"Created CPU alarm: {alarm_name}")
alarm_arn = f"arn:aws:cloudwatch:{AWS_REGION}:{account_id}:alarm:{alarm_name}"
created_alarms.append(alarm_arn)
except ClientError as e:
logger.error(f"Error creating alarm for instance {instance_id}: {e}")
def main():
"""
主函数,执行整个流程
"""
try:
logger.info("Starting program...")
instance_ids = get_instance_ids()
if not instance_ids:
logger.info("No running instances found with matching TAG. Exiting.")
return
logger.info(f"Found {len(instance_ids)} running instances with matching TAG: {instance_ids}")
created_alarms = []
for instance_id in instance_ids:
create_cpu_alarm(instance_id, created_alarms)
total_alarms = len(created_alarms)
logger.info(f"\nTotal new alarms created: {total_alarms}")
logger.info("Alarm ARNs:")
for arn in created_alarms:
logger.info(arn)
logger.info("Program completed successfully.")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}", exc_info=True)
if __name__ == "__main__":
main()
脚本解析
- 初始化客户端:脚本首先初始化了 EC2、CloudWatch 和 STS 客户端,用于与相应的 云服务进行交互。通过 STS 客户端获取当前账号的 ID,以便后续构造报警的 ARN。
- 获取运行的实例 ID:
get_instance_ids
函数使用 EC2 客户端的分页器获取所有运行状态的 EC2 实例 ID。它遍历每个页面的实例,筛选出状态为 “running” 的实例,并收集它们的 ID。若需要对不同的 EC2 设置不同的告警参数,可根据 TAG 来筛选和分组。
- 删除现有报警:在为每个实例创建新报警之前,
delete_existing_alarm
函数会检查是否存在同名的现有报警。如果存在,则删除它,以避免冲突。
- 创建 CPU 报警:
create_cpu_alarm
函数负责创建 CPU 使用率报警。它定义了报警的名称、指标名称、命名空间、统计方式、周期、评估周期、阈值、比较运算符、报警动作、OK 动作、描述以及维度等参数。通过调用 CloudWatch 客户端的 put_metric_alarm
方法,将报警规则发送到 亚马逊云科技 服务。
- 主函数执行流程:
main
函数作为程序的入口,协调整个流程。它首先获取运行的实例 ID 列表,然后为每个实例调用 create_cpu_alarm
函数创建报警,并收集创建的报警 ARN,最后输出总结信息。
配置 SNS 主题和订阅
为了确保告警通知能够及时发送到指定的电子邮件地址,我们需要配置 SNS 主题和订阅。以下是详细的步骤:
1. 创建 SNS 主题:
- 登录 亚马逊云科技 管理控制台,选择 SNS 服务。
- 在左侧导航栏中,选择 **主题**,然后点击 **创建主题**。
- 输入主题名称(例如:
ec2-cpu-alarms
)和显示名称(可选),然后点击 **创建主题**。
- 记录下创建的主题 ARN,例如:
arn:aws:sns:ap-southeast-1:123456789012:ec2-cpu-alarms
。
2. 订阅 SNS 主题:
- 在 SNS 控制台中,选择刚才创建的主题,点击 订阅 选项卡,然后点击 **创建订阅**。
- 在 协议 下拉菜单中选择 **电子邮件**。
- 在 终端节点 中输入要接收告警通知的电子邮件地址。
- 点击 **创建订阅**。
3. 确认订阅:
- 亚马逊云科技 会向指定的电子邮件地址发送一封确认邮件。
- 打开邮件,点击其中的 确认订阅 链接,完成订阅过程。
- 确认后,该电子邮件地址将能够接收来自 SNS 主题的告警通知。
实施步骤
1. 配置 SNS 主题和订阅:按照上述步骤创建 SNS 主题,并通过电子邮件地址订阅该主题,确保订阅已确认。
2. 更新脚本中的变量:在脚本中,将 AWS_REGION 变量的值替换为您的 亚马逊云科技的相关区域,将 sns_topic_arn 变量的值替换为您创建的 SNS 主题 ARN。
3. 在亚马逊云科技 CloudShell 中测试脚本:
- 登录到亚马逊云科技管理控制台,使用具有必要权限的 IAM 账户进行登录。确保该账户具有访问亚马逊云科技 CloudShell 和使用相关亚马逊云科技服务的权限。
- 在亚马逊云科技管理控制台中,启动亚马逊云科技 CloudShell。您可以通过点击控制台右上角的 CloudShell 图标来启动它。
- 在 CloudShell 中,直接将脚本复制到 CloudShell 环境中。
- 运行脚本:在 CloudShell 中运行
python create_ec2_alarms.py
命令。
4. 验证结果:运行脚本后,企业可以在亚马逊云科技管理控制台的 CloudWatch 服务中查看新创建的报警。检查每个实例是否都有对应的 CPU 报警,并验证报警的配置是否符合预期。
5. 收到的告警 mail 通知
新增 EC2 实例告警创建与删除
在实际工作中,批量添加告警信息后,还会有少量的新的计算实例被拉起或终止,此时,这些实例也需要根据企业运维要求增加或者删除相应告警,为了解决这些 EC2 实例的告警问题,本文还提出了另外一种方法来解决该问题。该方法架构图如下所示:
首先创建 EventBridge 规则,检测 EC2 状态的变更,注意选择 Rule with an event pattern,如下图所示:
其中,Event pattern 选择 EC2 Instance State-change Notification,注意选择“running”和‘terminated’状态,原因是需要在 EC2 实例创建时和删除时,自动创建相应告警。如下图所示:
接下来,选择将 EventBridge 的事件发送到一个 SNS Topic,注意改 topic 需要预先建好。关于如何创建 SNS Topic,可以参考文档创建 Amazon SNS 主题。
SNS Topic 收到相关 EC2 状态变化的时间通知后,会调用 Lambda 函数来进行告警的创建和删除(此处要求为 SNS Topic 创建一个订阅,订阅的 Endpoint 需要选择对应的 Lambda 函数)。如下图所示:
相关 Lambda 函数的代码如下:
import boto3
from alarm_config import ALARM_CONFIGS
import os
import json
# Initialize AWS clients
ec2_client = boto3.client('ec2')
cloudwatch_client = boto3.client('cloudwatch')
def create_cloudwatch_alarm(resource_id, metric_category, metric_type, threshold):
"""
Create CloudWatch alarm for various metrics (CPU, Network, etc.).
Args:
resource_id: Resource ID (e.g., EC2 instance ID)
metric_category: Category of metric (e.g., 'cpu', 'network')
metric_type: Type of metric (e.g., 'utilization', 'in_bytes')
threshold: Alarm threshold value
"""
try:
if metric_category not in ALARM_CONFIGS:
raise ValueError(f"Invalid metric category: {metric_category}")
category_config = ALARM_CONFIGS[metric_category]
if metric_type not in category_config:
raise ValueError(f"Invalid metric type: {metric_type}")
config = category_config[metric_type]
description = config['description'].format(resource_id=resource_id, threshold=threshold)
cloudwatch_client.put_metric_alarm(
AlarmName=f'{resource_id}-{config["name"]}',
ComparisonOperator=config["comparison"],
EvaluationPeriods=1,
MetricName=config["metric"],
Namespace=config["namespace"],
Period=300, # 5 minutes
Statistic='Average',
Threshold=threshold,
ActionsEnabled=True,
AlarmDescription=description,
Dimensions=[
{
'Name': config["dimensions_key"],
'Value': resource_id
}
]
)
except ValueError as e:
print(f"Invalid configuration: {str(e)}")
def lambda_handler(event, context):
# Parse the EventBridge event
print(f"Received event: {json.dumps(event)}")
# Extract instance details from the event
sns_message = event['Records'][0]['Sns']['Message']
message_data = json.loads(sns_message)
# Extract the instance ID and state
instance_id = message_data['detail']['instance-id']
state = message_data['detail']['state']
if state == 'running':
# Get instance tags
response = ec2_client.describe_tags(
Filters=[
{
'Name': 'resource-id',
'Values': [instance_id]
}
]
)
for tag in response['Tags']:
key = tag['Key']
value = tag['Value']
if key == 'cpu_utilization':
try:
create_cloudwatch_alarm(instance_id, 'cpu', 'utilization', float(value))
except ValueError:
print(f"Invalid CPU threshold value: {value}")
if key == 'cpu_credit_balance':
try:
create_cloudwatch_alarm(instance_id, 'cpu', 'credit_balance', float(value))
except ValueError:
print(f"Invalid CPU credit balance threshold value: {value}")
elif state == 'terminated':
# Delete all alarms for the instance
try:
# List all alarms for the instance
response = cloudwatch_client.describe_alarms()
for alarm in response['MetricAlarms']:
# Check if alarm belongs to this instance
if alarm['Dimensions'] and \
alarm['Dimensions'][0]['Name'] == 'InstanceId' and \
alarm['Dimensions'][0]['Value'] == instance_id:
# Delete the alarm
cloudwatch_client.delete_alarms(
AlarmNames=[alarm['AlarmName']]
)
print(f"Deleted alarm: {alarm['AlarmName']}")
except Exception as e:
print(f"Error deleting alarms: {str(e)}")
return {
'statusCode': 200,
'body': json.dumps('Successfully processed EC2 instance state change')
}
代码中引用的 ALARM_CONFIGS 类,定义如下:
"""
Configuration module for CloudWatch alarms.
"""
# Dictionary containing configurations for different types of CloudWatch alarms
ALARM_CONFIGS = {
'cpu': {
'utilization': {
'name': 'CPU-Utilization',
'metric': 'CPUUtilization',
'namespace': 'AWS/EC2',
'comparison': 'GreaterThanThreshold',
'description': '{resource_id}: CPU utilization exceeds {threshold}%',
'dimensions_key': 'InstanceId'
},
'credit_balance': {
'name': 'CPU-Credit-Balance',
'metric': 'CPUCreditBalance',
'namespace': 'AWS/EC2',
'comparison': 'LessThanThreshold',
'description': '{resource_id}: CPU credit balance is below {threshold}',
'dimensions_key': 'InstanceId'
}
},
'network': {
'in_bytes': {
'name': 'Network-In',
'metric': 'NetworkIn',
'namespace': 'AWS/EC2',
'comparison': 'GreaterThanThreshold',
'description': '{resource_id}: Network in exceeds {threshold} bytes',
'dimensions_key': 'InstanceId'
},
'out_bytes': {
'name': 'Network-Out',
'metric': 'NetworkOut',
'namespace': 'AWS/EC2',
'comparison': 'GreaterThanThreshold',
'description': '{resource_id}: Network out exceeds {threshold} bytes',
'dimensions_key': 'InstanceId'
}
}
}
上述代码是创建告警的主要逻辑,简介如下:Lambda 函数接收到的 SNS 消息中包含当前 EC2 的状态和 EC2 的 Instance ID,如果状态为“running”时,依据 Instance ID 可以获取当前 EC2 实例的 Tag,根据 Tag 内容,例如 cpu_utilization=85 或者 cpu_credit_balance=200,创建对应的 CPU Utilization >= 85% 或者 cpu_credit_balance<=200 时的告警。下图展示了在创建 EC2 时,如何在 EC2 上添加相应 Tag:
其中 Tag Key(例如 cpu_utilization)与 CloudWatch Alarm 的映射关系在 alarm_config.py 中定义。用户可参照样例代码中的实例,来增加其他的告警信息。 下图是一个测试结果,我们在控制台上创建了一台 EC2 实例,并添加了 CPU 使用率的 Tag,在 CloudWatch Alarm 页面,可以看到如下结果(新的告警被创建):
在 EC2 instance 终止之后,Lambda 函数会获取到 SNS 消息,其中 EC2 状态为“terminated”,Lambda 函数根据 Instance ID,找到关联在该 EC2 实例上的所有 CloudWatch Alarm,一并删除。
结果与收益
通过这个解决方案,企业成功地为其所有运行的 EC2 实例批量创建了 CPU 使用量告警。这不仅提高了监控效率,还确保了所有实例的告警配置的一致性。企业现在可以更有效地监控其 亚马逊云科技 环境中的资源使用情况,并在出现问题时及时收到通知,从而提高整体系统的可靠性和稳定性。
此外,这个案例还展示了亚马逊云科技服务的灵活性和可扩展性。尽管 CloudWatch 的原生功能中没有直接支持批量添加告警,但通过使用 SDK 和 API,企业可以轻松地实现自定义的解决方案,满足其特定的业务需求。希望这个案例能够为其他面临类似挑战的企业提供借鉴,帮助他们更好地利用亚马逊云科技的功能提升业务价值。
本篇作者