Amazon Web Services ブログ
AWS Lambda 高耐久関数を使用して複数ステップのアプリケーションと AI ワークフローを構築
現代のアプリケーションでは、複数段階の支払い処理、AI エージェントのオーケストレーション、または人間の決定を待つ承認プロセスなど、サービス間の複雑で長期にわたる調整がますます必要になっています。従来、これらを構築するには、状態管理を実装し、障害を処理し、複数のインフラストラクチャサービスを統合するために多大な労力が必要でした。
2025 年 12 月 2 日より、AWS Lambda の耐久性のある関数を使用して、使い慣れた AWS Lambda エクスペリエンス内で信頼性の高いマルチステップアプリケーションを直接構築できます。耐久性のある関数とは、すでにご存知のものと同じイベントハンドラーと統合を備えた通常の Lambda 関数です。任意のプログラミング言語でシーケンシャルコードを記述すれば、耐久性のある関数が進行状況を追跡し、障害発生時に自動的に再試行し、定義された時点で最大 1 年間実行を一時停止します。待機中のアイドルコンピューティングの費用を支払う必要はありません。
AWS Lambda の高耐久関数は、耐久実行と呼ばれるチェックポイントとリプレイのメカニズムを使用してこれらの機能を提供します。永続実行のための関数を有効にしたら、新しいオープンソースの永続実行 SDK を関数コードに追加します。次に、「steps」などの SDK プリミティブを使用してビジネスロジックに自動チェックポイントとリトライを追加し、「waits」を使用して計算料金なしで実行を効率的に中断します。実行が予期せず終了すると、Lambda は最後のチェックポイントから再開し、完了した操作をスキップしながらイベントハンドラーを最初からリプレイします。
AWS Lambda 高耐久関数の使用開始
耐久性のある関数の使用方法を説明します。
まず、コンソールで Lambda 関数を作成し、[ゼロから作成者] を選択します。[永続実行] セクションで、[有効化] を選択します。耐久性のある関数設定は関数の作成時にのみ設定でき、既存の Lambda 関数では現在変更できないことにご注意ください。

Lambda 高耐久関数を作成したら、提供されているコードを使用して作業を開始できます。

Lambda 高耐久関数には、状態管理と回復を処理する 2 つのコアプリミティブが導入されています。
- ステップ —
context.step()メソッドは、ビジネスロジックに自動再試行とチェックポイントを追加します。ステップが完了すると、リプレイ中はスキップされます。 - 待機 —
context.wait()メソッドは、指定された時間だけ実行を一時停止し、関数を終了し、計算料金なしで実行を一時停止して再開します。
さらに、Lambda の高耐久関数には、より複雑なパターンに対応するオペレーションが他にも用意されています。create_callback() は API レスポンスや人的承認などの外部イベントの結果を待つために使用できるコールバックを作成し、wait_for_condition() は特定の条件が満たされる (たとえば REST API をポーリングしてプロセスが完了する) まで一時停止します。また、parallel() または map() オペレーションは高度な同時実行ユースケースに利用できます。
本番稼働準備が整った注文処理ワークフローの構築
次に、デフォルトの例を拡張して、本番稼働環境ですぐに使える注文処理ワークフローを構築しましょう。これは、外部承認にコールバックを使用し、エラーを適切に処理し、再試行戦略を設定する方法を示しています。これらのコアコンセプトに焦点を当てるために、コードは意図的に簡潔にしています。完全に実装すると、Amazon Bedrock を使用して検証ステップを強化し、AI を活用した注文分析を追加することができます。
注文処理ワークフローの仕組みは次のとおりです。
- 最初に
validate_order()は注文データをチェックして、すべての必須フィールドが存在することを確認します。 - 次に、
send_for_approval()は外部からの承認を求める命令を送信し、コールバック応答を待って、コンピューティング料金なしで実行を一時停止します。 - その後、
process_order()は注文処理を完了します。 - ワークフロー全体を通して、try-catch エラー処理は、実行をすぐに停止するターミナルエラーと、自動再試行をトリガーするステップ内の回復可能なエラーを区別します。
ステップ定義とメインハンドラーを含む完全な注文処理ワークフローは次のとおりです。
import random
from aws_durable_execution_sdk_python import (
DurableContext,
StepContext,
durable_execution,
durable_step,
)
from aws_durable_execution_sdk_python.config import (
Duration,
StepConfig,
CallbackConfig,
)
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
create_retry_strategy,
)
@durable_step
def validate_order(step_context: StepContext, order_id: str) -> dict:
"""Validates order data using AI."""
step_context.logger.info(f"Validating order: {order_id}")
# 本番稼働: Amazon Bedrock を呼び出して注文の完全性と正確性を検証
return {"order_id": order_id, "status": "validated"}
@durable_step
def send_for_approval(step_context: StepContext, callback_id: str, order_id: str) -> dict:
"""Sends order for approval using the provided callback token."""
step_context.logger.info(f"Sending order {order_id} for approval with callback_id: {callback_id}")
# 本番稼働: callback_id を外部承認システムに送信
# 外部システムは Lambda SendDurableExecutionCallbackSuccess を呼び出すか
# 承認が完了したらこの callback_id を使って SendDurableExecutionCallbackFailure API を送信
return {
"order_id": order_id,
"callback_id": callback_id,
"status": "sent_for_approval"
}
@durable_step
def process_order(step_context: StepContext, order_id: str) -> dict:
"""Processes the order with retry logic for transient failures."""
step_context.logger.info(f"Processing order: {order_id}")
# 時々失敗する不安定な API をシミュレート
if random.random() > 0.4:
step_context.logger.info("Processing failed, will retry")
raise Exception("Processing failed")
return {
"order_id": order_id,
"status": "processed",
"timestamp": "2025-11-27T10:00:00Z",
}
@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
try:
order_id = event.get("order_id")
# ステップ 1: 注文を検証
validated = context.step(validate_order(order_id))
if validated["status"] != "validated":
raise Exception("Validation failed") # ターミナルエラー - 実行を停止
context.logger.info(f"Order validated: {validated}")
# ステップ 2: コールバックを作成
callback = context.create_callback(
name="awaiting-approval",
config=CallbackConfig(timeout=Duration.from_minutes(3))
)
context.logger.info(f"Created callback with id: {callback.callback_id}")
# ステップ 3: callback_id を使用して承認リクエストを送信
approval_request = context.step(send_for_approval(callback.callback_id, order_id))
context.logger.info(f"Approval request sent: {approval_request}")
# ステップ 4: コールバックの結果を待つ
# これは、外部システムが SendDurableExecutionCallbackSuccess または SendDurableExecutionCallbackFailure を呼び出すまでブロックされる
approval_result = callback.result()
context.logger.info(f"Approval received: {approval_result}")
# ステップ 5: カスタム再試行戦略による注文を処理
retry_config = RetryStrategyConfig(max_attempts=3, backoff_rate=2.0)
processed = context.step(
process_order(order_id),
config=StepConfig(retry_strategy=create_retry_strategy(retry_config)),
)
if processed["status"] != "processed":
raise Exception("Processing failed") # ターミナルエラー
context.logger.info(f"Order successfully processed: {processed}")
return processed
except Exception as error:
context.logger.error(f"Error processing order: {error}")
raise error # 再発生して実行を失敗させる
このコードは、いくつかの重要な概念を示しています。
- エラー処理 — try-catch ブロックはターミナルエラーを処理します。未処理の例外がステップの外に投げられた場合 (検証チェックなど)、実行はすぐに終了します。これは、注文データが無効であるなど、再試行しても意味がない場合に役立ちます。
- ステップ再試行 —
process_orderステップ内では、例外によってデフォルト (ステップ 1) または設定されたRetryStrategy(ステップ 5) に基づいて自動再試行がトリガーされます。これにより、一時的な API が使用できなくなるなどの一時的な障害が処理されます。 - ログ記録 — メインハンドラーには
context.loggerを使用し、ステップ内ではstep_context.loggerを使用します。コンテキストロガーは再生中の重複ログを抑制します。
次に、order_id を使用してテストイベントを作成し、関数を非同期で呼び出して注文ワークフローを開始します。[テスト] タブに移動し、オプションの耐久性のある実行名を入力して、この実行を識別します。なお、耐久性のある関数にはべき等性が組み込まれています。同じ実行名で関数を 2 回呼び出すと、2 回目の呼び出しでは複製を作成せずに既存の実行結果が返されます。

Lambda コンソールの [Durable 実行] タブに移動すると、実行状況をモニタリングできます。

ここでは、各ステップのステータスとタイミングを確認できます。実行すると、CallbackStarted の後に InvocationCompleted と表示されます。これは、承認コールバックを待っている間にアイドル料金が発生しないように、関数が終了し、実行が一時停止されたことを示します。

これで、コンソールから [送信成功] または [送信失敗] を選択するか、プログラムで Lambda API を使用してコールバックを完了できるようになりました。

[送信成功] を選択します。

コールバックが完了すると、実行が再開され、注文が処理されます。シミュレートされた不安定な API が原因で process_order ステップが失敗すると、設定した戦略に基づいて自動的に再試行されます。すべての再試行が成功すると、実行は正常に完了します。

Amazon EventBridge による実行のモニタリング
Amazon EventBridge を使用して永続的な関数の実行をモニタリングすることもできます。Lambda は実行ステータス変更イベントをデフォルトのイベントバスに自動的に送信するため、ダウンストリームのワークフローを構築したり、通知を送信したり、他の AWS サービスと統合したりできます。
これらのイベントを受信するには、デフォルトのイベントバスで次のパターンを使用して EventBridge ルールを作成します。
{
"source": ["aws.lambda"],
"detail-type": ["Durable Execution Status Change"]
}
知っておくべきこと
留意点は以下のとおりです。
- 可用性 — Lambda 高耐久関数が米国東部 (オハイオ) AWS リージョンで利用できるようになりました。最新のリージョンの可用性については、AWS Capabilities by Region ページをご覧ください。
- プログラミング言語サポート — 起動時、AWS Lambda 高耐久関数は JavaScript/TypeScript (Node.js 22/24) と Python (3.13/3.14) をサポートしています。お好みのパッケージマネージャーを使用して、耐久性のある実行 SDK を関数コードにバンドルすることをお勧めします。SDK は動きが速いため、新機能が利用可能になったときに依存関係を簡単に更新できます。
- Lambda バージョンの使用 — 耐久性のある関数を本番稼働環境にデプロイする場合は、Lambda バージョンを使用して、常に同じコードバージョンで再生が行われるようにしてください。実行が中断されている間に関数コードを更新すると、リプレイでは実行を開始したバージョンが使用されるため、長時間実行されるワークフローでのコード変更による不整合を防ぐことができます。
- 耐久性のある関数のテスト — より複雑な統合テストには、pytest 統合を備えた個別のテスト SDK と AWS サーバーレスアプリケーションモデル (AWS SAM) のコマンドラインインターフェイス (CLI) を使用して、AWS 認証情報なしで耐久性のある関数をローカルでテストできます。
- オープンソース SDK —高耐久性 SDK は、JavaScript/TypeScript および Python 向けのオープンソースです。ソースコードを確認したり、改善に貢献したり、最新の機能について最新情報を入手したりできます。
- 料金 — AWS Lambda 高耐久関数の料金の詳細については、AWS Lambda の料金表ページを参照してください。
AWS Lambda コンソールにアクセスして、AWS Lambda 高耐久関数を使い始めます。詳細については、AWS Lambda 高耐久関数のドキュメントページを参照してください。
構築がうまくいきますように!
– Donnie
原文はこちらです。