亚马逊AWS官方博客

玩转 Amazon Cloudwatch(一):为 EC2 实例批量创建 CPU 使用率告警

引言

云时代企业为提升 IT 效能,常将业务迁移至公有云。某新能源头部企业因全球化布局及技术创新需求,计划将业务负载迁移至亚马逊云科技。迁移中面临的一项挑战:亚马逊云科技 Amazon CloudWatch 原生功能缺失 EC2 批量告警配置能力,对拥有海量实例的客户而言,手动逐台配置告警耗时且易错。因此可以采用脚本化的方式批量创建 EC2 CPU 告警,有效降低运维成本。

背景与挑战

该企业迁移至亚马逊云科技后,需监控大量 EC2 实例性能,尤其需为每台实例设置 CPU 使用率阈值告警,以实时预警资源瓶颈。尽管 CloudWatch 是其核心监控服务,却缺乏批量告警功能,需逐台配置。面对数十至上百台实例,手动逐台操作效率低且易引发配置遗漏或错误,直接影响监控覆盖率和系统稳定性。

解决方案设计

为了解决这一挑战,可以利用亚马逊云科技提供的软件开发工具包——Boto3,编写一个 Python 脚本来实现批量创建 EC2 CPU 使用量告警的功能。关于 Boto3 的介绍,可以参考 Boto3 介绍。通过使用 Boto3,我们可以使用编程方式来获取所有运行的 EC2 实例信息,并为每个实例创建自定义的 CPU 使用率告警。这样就可以批量的一次性为所有目标实例配置统一的告警规则,不仅节省了大量时间,还确保了配置的一致性和准确性。

对于新建的少数的 EC2,可以采用 EventBridge 监测 EC2 的 Running 和 Terminated 事件,继而触发 Lambda 的执行,根据EC2上所加 Tag 的信息,来自动的添加和删除相应的 CloudWatch Alarm。

Python 脚本实现

以下是相关的 Python 脚本,用于批量创建 EC2 CPU 使用率告警:

"""
author: RJ.Wang
Date: 2025-03-13
email: wangrenjun@gmail.com
Description: Batch creation of AWS EC2 CPU utilization alarms with TAG filtering
"""
import boto3
import logging
from botocore.exceptions import ClientError

# 配置日志记录
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)

AWS_REGION = "ap-southeast-1"  # 需要修改为您的 AWS 区域
sns_topic_arn = "arn:aws:sns:xxxxxx:xxxxxx:xxxxxx"  # 需要修改为您的 SNS 主题 ARN

# 若要对不同的EC2配置不同的告警参数,可根据TAG 来筛选和分组,KEY和VALUE 值均为 None 时,将对该区域所有为 running 状态的 EC2 进行操作
TAG_KEY = None
TAG_VALUE = None
# 下述示例表示具备 Monitor=yes 的Tag-Key Pair的EC2实例将被创建CloudWatch Alarm。
#TAG_KEY = "Monitor" 
#TAG_VALUE = "yes"

ec2_client = boto3.client("ec2", region_name=AWS_REGION)
cw_client = boto3.client("cloudwatch", region_name=AWS_REGION)
sts_client = boto3.client("sts")

# 获取账号ID并缓存
account_id = sts_client.get_caller_identity()["Account"]

def get_instance_ids():
    """
    获取所有运行的EC2实例ID,并根据TAG进行筛选
    """
    paginator = ec2_client.get_paginator("describe_instances")
    instance_ids = []
    try:
        for page in paginator.paginate():
            for reservation in page.get("Reservations", []):
                for instance in reservation.get("Instances", []):
                    instance_id = instance.get("InstanceId")
                    state = instance.get("State", {}).get("Name", "unknown")
                    tags = instance.get("Tags", [])
                    
                    # 检查实例是否符合TAG筛选条件
                    tag_match = True
                    if TAG_KEY and TAG_VALUE:
                        tag_match = False
                        for tag in tags:
                            if tag.get("Key") == TAG_KEY and tag.get("Value") == TAG_VALUE:
                                tag_match = True
                                break
                    
                    if state == "running" and tag_match:
                        instance_ids.append(instance_id)
                        logger.debug(f"Found running instance with matching TAG: {instance_id}")
                    else:
                        logger.debug(f"Skipping instance (state: {state}, TAG match: {tag_match}): {instance_id}")
    except ClientError as e:
        logger.error(f"Error getting instance IDs: {e}")
    except Exception as e:
        logger.error(f"Unexpected error getting instances: {e}")
    return instance_ids

def delete_existing_alarm(alarm_name):
    """
    删除已存在的同名报警
    """
    try:
        response = cw_client.describe_alarms(AlarmNames=[alarm_name])
        if response.get("MetricAlarms"):
            cw_client.delete_alarms(AlarmNames=[alarm_name])
            logger.info(f"Deleted existing alarm: {alarm_name}")
        else:
            logger.info(f"No existing alarm found: {alarm_name}")
    except ClientError as e:
        logger.error(f"Error deleting alarm {alarm_name}: {e}")

def create_cpu_alarm(instance_id, created_alarms):
    """
    为指定实例创建CPU使用率报警
    """
    alarm_name = f"CPU-{instance_id}-Alarm"
    try:
        delete_existing_alarm(alarm_name)
        cw_client.put_metric_alarm(
            AlarmName=alarm_name,
            MetricName="CPUUtilization",
            Namespace="AWS/EC2",
            Statistic="Average",
            Period=60,
            EvaluationPeriods=2,
            Threshold=85.0,
            ComparisonOperator="GreaterThanThreshold",
            AlarmActions=[sns_topic_arn],
            OKActions=[sns_topic_arn],
            AlarmDescription=f"Alarm for CPUUtilization on instance {instance_id}",  # 增加报警描述
            Dimensions=[{"Name": "InstanceId", "Value": instance_id}]
        )
        logger.info(f"Created CPU alarm: {alarm_name}")
        
        alarm_arn = f"arn:aws:cloudwatch:{AWS_REGION}:{account_id}:alarm:{alarm_name}"
        created_alarms.append(alarm_arn)
    except ClientError as e:
        logger.error(f"Error creating alarm for instance {instance_id}: {e}")

def main():
    """
    主函数,执行整个流程
    """
    try:
        logger.info("Starting program...")
        instance_ids = get_instance_ids()
        if not instance_ids:
            logger.info("No running instances found with matching TAG. Exiting.")
            return
        
        logger.info(f"Found {len(instance_ids)} running instances with matching TAG: {instance_ids}")
        
        created_alarms = []
        for instance_id in instance_ids:
            create_cpu_alarm(instance_id, created_alarms)
        
        total_alarms = len(created_alarms)
        logger.info(f"\nTotal new alarms created: {total_alarms}")
        logger.info("Alarm ARNs:")
        for arn in created_alarms:
            logger.info(arn)
        
        logger.info("Program completed successfully.")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)

if __name__ == "__main__":
    main()

脚本解析

  1. 初始化客户端:脚本首先初始化了 EC2、CloudWatch 和 STS 客户端,用于与相应的 云服务进行交互。通过 STS 客户端获取当前账号的 ID,以便后续构造报警的 ARN。
  2. 获取运行的实例 ID:get_instance_ids 函数使用 EC2 客户端的分页器获取所有运行状态的 EC2 实例 ID。它遍历每个页面的实例,筛选出状态为 “running” 的实例,并收集它们的 ID。若需要对不同的 EC2 设置不同的告警参数,可根据 TAG 来筛选和分组。
  3. 删除现有报警:在为每个实例创建新报警之前,delete_existing_alarm 函数会检查是否存在同名的现有报警。如果存在,则删除它,以避免冲突。
  4. 创建 CPU 报警:create_cpu_alarm 函数负责创建 CPU 使用率报警。它定义了报警的名称、指标名称、命名空间、统计方式、周期、评估周期、阈值、比较运算符、报警动作、OK 动作、描述以及维度等参数。通过调用 CloudWatch 客户端的 put_metric_alarm 方法,将报警规则发送到 亚马逊云科技 服务。
  5. 主函数执行流程:main 函数作为程序的入口,协调整个流程。它首先获取运行的实例 ID 列表,然后为每个实例调用 create_cpu_alarm 函数创建报警,并收集创建的报警 ARN,最后输出总结信息。

配置 SNS 主题和订阅

为了确保告警通知能够及时发送到指定的电子邮件地址,我们需要配置 SNS 主题和订阅。以下是详细的步骤:

1. 创建 SNS 主题:

  • 登录 亚马逊云科技 管理控制台,选择 SNS 服务。
  • 在左侧导航栏中,选择 **主题**,然后点击 **创建主题**。
  • 输入主题名称(例如:ec2-cpu-alarms)和显示名称(可选),然后点击 **创建主题**。
  • 记录下创建的主题 ARN,例如:arn:aws:sns:ap-southeast-1:123456789012:ec2-cpu-alarms

2. 订阅 SNS 主题:

  • 在 SNS 控制台中,选择刚才创建的主题,点击 订阅 选项卡,然后点击 **创建订阅**。
  • 在 协议 下拉菜单中选择 **电子邮件**。
  • 在 终端节点 中输入要接收告警通知的电子邮件地址。
  • 点击 **创建订阅**。

3. 确认订阅:

  • 亚马逊云科技 会向指定的电子邮件地址发送一封确认邮件。
  • 打开邮件,点击其中的 确认订阅 链接,完成订阅过程。
  • 确认后,该电子邮件地址将能够接收来自 SNS 主题的告警通知。

实施步骤

1. 配置 SNS 主题和订阅:按照上述步骤创建 SNS 主题,并通过电子邮件地址订阅该主题,确保订阅已确认。

2. 更新脚本中的变量:在脚本中,将 AWS_REGION 变量的值替换为您的 亚马逊云科技的相关区域,将 sns_topic_arn 变量的值替换为您创建的 SNS 主题 ARN。

3. 在亚马逊云科技 CloudShell 中测试脚本:

  • 登录到亚马逊云科技管理控制台,使用具有必要权限的 IAM 账户进行登录。确保该账户具有访问亚马逊云科技 CloudShell 和使用相关亚马逊云科技服务的权限。
  • 在亚马逊云科技管理控制台中,启动亚马逊云科技 CloudShell。您可以通过点击控制台右上角的 CloudShell 图标来启动它。
  • 在 CloudShell 中,直接将脚本复制到 CloudShell 环境中。
  • 运行脚本:在 CloudShell 中运行 python create_ec2_alarms.py 命令。

4. 验证结果:运行脚本后,企业可以在亚马逊云科技管理控制台的 CloudWatch 服务中查看新创建的报警。检查每个实例是否都有对应的 CPU 报警,并验证报警的配置是否符合预期。

5. 收到的告警 mail 通知

新增 EC2 实例告警创建与删除

在实际工作中,批量添加告警信息后,还会有少量的新的计算实例被拉起或终止,此时,这些实例也需要根据企业运维要求增加或者删除相应告警,为了解决这些 EC2 实例的告警问题,本文还提出了另外一种方法来解决该问题。该方法架构图如下所示:

首先创建 EventBridge 规则,检测 EC2 状态的变更,注意选择 Rule with an event pattern,如下图所示:

其中,Event pattern 选择 EC2 Instance State-change Notification,注意选择“running”和‘terminated’状态,原因是需要在 EC2 实例创建时和删除时,自动创建相应告警。如下图所示:

接下来,选择将 EventBridge 的事件发送到一个 SNS Topic,注意改 topic 需要预先建好。关于如何创建 SNS Topic,可以参考文档创建 Amazon SNS 主题

SNS Topic 收到相关 EC2 状态变化的时间通知后,会调用 Lambda 函数来进行告警的创建和删除(此处要求为 SNS Topic 创建一个订阅,订阅的 Endpoint 需要选择对应的 Lambda 函数)。如下图所示:

相关 Lambda 函数的代码如下:

import boto3
from alarm_config import ALARM_CONFIGS
import os
import json

# Initialize AWS clients
ec2_client = boto3.client('ec2')
cloudwatch_client = boto3.client('cloudwatch')

def create_cloudwatch_alarm(resource_id, metric_category, metric_type, threshold):
    """
    Create CloudWatch alarm for various metrics (CPU, Network, etc.).
    
    Args:
        resource_id: Resource ID (e.g., EC2 instance ID)
        metric_category: Category of metric (e.g., 'cpu', 'network')
        metric_type: Type of metric (e.g., 'utilization', 'in_bytes')
        threshold: Alarm threshold value
    """
    try:
        if metric_category not in ALARM_CONFIGS:
            raise ValueError(f"Invalid metric category: {metric_category}")
        
        category_config = ALARM_CONFIGS[metric_category]
        if metric_type not in category_config:
            raise ValueError(f"Invalid metric type: {metric_type}")

        config = category_config[metric_type]
        description = config['description'].format(resource_id=resource_id, threshold=threshold)
        
        cloudwatch_client.put_metric_alarm(
            AlarmName=f'{resource_id}-{config["name"]}',
            ComparisonOperator=config["comparison"],
            EvaluationPeriods=1,
            MetricName=config["metric"],
            Namespace=config["namespace"],
            Period=300,  # 5 minutes
            Statistic='Average',
            Threshold=threshold,
            ActionsEnabled=True,
            AlarmDescription=description,
            Dimensions=[
                {
                    'Name': config["dimensions_key"],
                    'Value': resource_id
                }
            ]
        )
    except ValueError as e:
        print(f"Invalid configuration: {str(e)}")

def lambda_handler(event, context):
    # Parse the EventBridge event
    print(f"Received event: {json.dumps(event)}")
    
    # Extract instance details from the event
    sns_message = event['Records'][0]['Sns']['Message']
    message_data = json.loads(sns_message)
    
    # Extract the instance ID and state
    instance_id = message_data['detail']['instance-id']
    state = message_data['detail']['state']
    
    if state == 'running':
        # Get instance tags
        response = ec2_client.describe_tags(
            Filters=[
                {
                    'Name': 'resource-id',
                    'Values': [instance_id]
                }
            ]
        )
        
        for tag in response['Tags']:
            key = tag['Key']
            value = tag['Value']
            
            if key == 'cpu_utilization':
                try:
                    create_cloudwatch_alarm(instance_id, 'cpu', 'utilization', float(value))
                except ValueError:
                    print(f"Invalid CPU threshold value: {value}")
            
            if key == 'cpu_credit_balance':
                try:
                    create_cloudwatch_alarm(instance_id, 'cpu', 'credit_balance', float(value))
                except ValueError:
                    print(f"Invalid CPU credit balance threshold value: {value}")
    
    elif state == 'terminated':
        # Delete all alarms for the instance
        try:
            # List all alarms for the instance
            response = cloudwatch_client.describe_alarms()
            for alarm in response['MetricAlarms']:
                # Check if alarm belongs to this instance
                if alarm['Dimensions'] and \
                   alarm['Dimensions'][0]['Name'] == 'InstanceId' and \
                   alarm['Dimensions'][0]['Value'] == instance_id:
                    # Delete the alarm
                    cloudwatch_client.delete_alarms(
                        AlarmNames=[alarm['AlarmName']]
                    )
                    print(f"Deleted alarm: {alarm['AlarmName']}")
        except Exception as e:
            print(f"Error deleting alarms: {str(e)}")
    
    return {
        'statusCode': 200,
        'body': json.dumps('Successfully processed EC2 instance state change')
    }

代码中引用的 ALARM_CONFIGS 类,定义如下:

"""
Configuration module for CloudWatch alarms.
"""

# Dictionary containing configurations for different types of CloudWatch alarms
ALARM_CONFIGS = {
    'cpu': {
        'utilization': {
            'name': 'CPU-Utilization',
            'metric': 'CPUUtilization',
            'namespace': 'AWS/EC2',
            'comparison': 'GreaterThanThreshold',
            'description': '{resource_id}: CPU utilization exceeds {threshold}%',
            'dimensions_key': 'InstanceId'
        },
        'credit_balance': {
            'name': 'CPU-Credit-Balance',
            'metric': 'CPUCreditBalance',
            'namespace': 'AWS/EC2',
            'comparison': 'LessThanThreshold',
            'description': '{resource_id}: CPU credit balance is below {threshold}',
            'dimensions_key': 'InstanceId'
        }
    },
    'network': {
        'in_bytes': {
            'name': 'Network-In',
            'metric': 'NetworkIn',
            'namespace': 'AWS/EC2',
            'comparison': 'GreaterThanThreshold',
            'description': '{resource_id}: Network in exceeds {threshold} bytes',
            'dimensions_key': 'InstanceId'
        },
        'out_bytes': {
            'name': 'Network-Out',
            'metric': 'NetworkOut',
            'namespace': 'AWS/EC2',
            'comparison': 'GreaterThanThreshold',
            'description': '{resource_id}: Network out exceeds {threshold} bytes',
            'dimensions_key': 'InstanceId'
        }
    }
}

上述代码是创建告警的主要逻辑,简介如下:Lambda 函数接收到的 SNS 消息中包含当前 EC2 的状态和 EC2 的 Instance ID,如果状态为“running”时,依据 Instance ID 可以获取当前 EC2 实例的 Tag,根据 Tag 内容,例如 cpu_utilization=85 或者 cpu_credit_balance=200,创建对应的 CPU Utilization >= 85% 或者 cpu_credit_balance<=200 时的告警。下图展示了在创建 EC2 时,如何在 EC2 上添加相应 Tag:

其中 Tag Key(例如 cpu_utilization)与 CloudWatch Alarm 的映射关系在 alarm_config.py 中定义。用户可参照样例代码中的实例,来增加其他的告警信息。 下图是一个测试结果,我们在控制台上创建了一台 EC2 实例,并添加了 CPU 使用率的 Tag,在 CloudWatch Alarm 页面,可以看到如下结果(新的告警被创建):

在 EC2 instance 终止之后,Lambda 函数会获取到 SNS 消息,其中 EC2 状态为“terminated”,Lambda 函数根据 Instance ID,找到关联在该 EC2 实例上的所有 CloudWatch Alarm,一并删除。

结果与收益

通过这个解决方案,企业成功地为其所有运行的 EC2 实例批量创建了 CPU 使用量告警。这不仅提高了监控效率,还确保了所有实例的告警配置的一致性。企业现在可以更有效地监控其 亚马逊云科技 环境中的资源使用情况,并在出现问题时及时收到通知,从而提高整体系统的可靠性和稳定性。

此外,这个案例还展示了亚马逊云科技服务的灵活性和可扩展性。尽管 CloudWatch 的原生功能中没有直接支持批量添加告警,但通过使用 SDK 和 API,企业可以轻松地实现自定义的解决方案,满足其特定的业务需求。希望这个案例能够为其他面临类似挑战的企业提供借鉴,帮助他们更好地利用亚马逊云科技的功能提升业务价值。

本篇作者

汪仁君

伟仕佳杰-云计算事业部-高级工程师,拥有 20+ 年的 IT 从业经验。致力于云计算产品解决方案在企业中的推广和应用,云端架构方案制定,方案讲解演示,现场答疑,项目 POC,落地实施,交付和维护。

宋孜攀

亚马逊云科技解决方案架构师,拥有多年软件研发及云计算咨询经验,负责基于亚马逊云科技的云计算方案的架构设计,同时致力于亚马逊云科技的云服务在移动应用与互联网行业的应用和推广。

刘冀

亚马逊云科技合作伙伴解决方案架构师,负责亚马逊云科技合作伙伴的技术赋能及解决方案的架构设计与落地。在企业数字化转型、云原生架构和生成式 AI 领域拥有丰富的实践经验。与合作伙伴紧密协作,帮助客户分析业务需求,提供基于亚马逊云科技的最佳实践方案。