Amazon Web Services ブログ

AWS AppSync Events と Powertools for AWS の統合を簡素化

リアルタイム機能は、ユーザーがすぐに更新されたり双方向の体験を求める現代のアプリケーションにおいて不可欠となっています。チャットアプリ、ライブダッシュボード、ゲームのリーダーボード、IoT システムなどを構築する場合、AWS AppSync Events は WebSocket API を通じてこれらのリアルタイム機能を実現し、スケーリングや接続管理を気にかけることなく、スケーラブルで高性能なリアルタイムアプリケーションを構築できるようにしています。

AWS Lambda 用の Powertools は、監視、バッチ処理、AWS Systems Manager Parameter Store 統合、冪等性、フィーチャーフラグ、Amazon CloudWatch メトリクス、構造化ログなどを含む開発者向けツールキットです。Powertools for AWS は、Python、TypeScript、.NET で提供される新しい AppSyncEventsResolver を通じて、AppSync Events をサポートするようになりました。この新機能により、ビジネスロジックに集中できるように設計された機能が強化され、開発体験が向上します。AppSyncEventsResolver は、イベントの処理のためのシンプルで一貫したインターフェイスを提供し、イベントのフィルタリング、変換、ルーティングなどの一般的なパターンに対する組み込みサポートも提供されます。

この記事では、TypeScript の例を見ますが、Powertools for AWS (Python)Powertools for AWS (.NET) を使えば、Python と .NET 関数でも同じ機能を利用できます。

Real-time event handling architecture using AWS AppSync, Lambda, and Powertools

図 1 – AWS AppSync、Lambda、および Powertools を使用したリアルタイムイベントハンドリングアーキテクチャー

本ブログでは、以下の点を学びます。

  • AppSyncEventsResolver を使用してイベントハンドラを設定する
  • 最適なパフォーマンスのために、さまざまなイベント処理パターンを実装する
  • パターンベースのルーティングを使用して、イベントハンドラを整理する
  • 一般的な使用パターンに対して、組み込み機能を活用する

始め方

AppSyncEventsResolver は、AWS Lambda 関数内での AppSync Events を処理する簡単で宣言的な方法です。このイベントリゾルバーによって、PUBLISHイベントとSUBSCRIBEイベントをリッスンできます。PUBLISHイベントは、クライアントがチャネルにメッセージを送信するときに発生します。一方、SUBSCRIBEイベントは、クライアントがチャネルをサブスクリプションしようとしたときに発生します。異なる名前空間やチャネルのハンドラを登録することで、イベント駆動型の通信を管理できます。

次に、作業を開始してさまざまな開発体験を向上させるための主要機能について見ていきましょう。AppSyncEventsResolver を設定する基本的な例は次のとおりです。

import {
  AppSyncEventsResolver,
  UnauthorizedException,
} from '@aws-lambda-powertools/event-handler/appsync-events';

// Types for our message handling 
 type ChatMessage = {
    userId: string ;
    content: string ;
}

// Simple authorization check 
 const isAuthorized = (path: string, userId ?: string): boolean => {
    // check against your authorization system 
    if (path.startsWith('/chat/private') && ! userId) {
        return false ;
    }
    return true ;
};

// Message processing logic 
 const processMessage = async (payload: ChatMessage) => {
    // - Validate message content 
    // - Store in database 
    // - Enrich with additional data 
    return {
        ...payload,
        timestamp: new Date().toISOString()
    };
};

 const app = new AppSyncEventsResolver();

// Handle publish events for a specific channel 
 app.onPublish('/chat/general', async (payload: ChatMessage) => {
    // Process and return the message 
    return processMessage(payload);
});

// Handle subscription events for all channels 
 app.onSubscribe('/*', async (info) => {
    const {
        channel: { path },
        request,
    } = info ;

    // Perform access control checks 
    if (! isAuthorized(path, userId)) {
        throw new UnauthorizedException(`not allowed to subscribe to ${ path } `);
    }

    return true ;
});

 export const handler = async (event, context) =>
  app.resolve(event, context);

AppSyncEventsResolver クラスは、受信したイベントデータを解析し、イベントの種類に応じて適切なハンドラメソッドを呼び出します。ここで何が起こっているのかを解説しましょう。

パターンベースのルーティング

AppSyncEventsResolver は、チャネルパスに基づいてイベントハンドラを整理できる直感的なパターンベースのルーティングシステムを使用しています。以下のことが可能です:

  • 特定のチャネルを処理する(/chat/general)
  • 名前空間にワイルドカードを使用する(/chat/*)
  • グローバルなキャッチオールハンドラを作成する(/*)
import { AppSyncEventsResolver } from '@aws-lambda-powertools/event-handler/appsync-events';

 const app = new AppSyncEventsResolver();

// Specific channel handler 
 app.onPublish('/notifications/alerts', async (payload) => {
    // your logic here 
});

// Handle all channels in the notifications namespace 
 app.onPublish('/notifications/*', async (payload) => {
    // your logic here 
});

// Global catch-all for unhandled channels 
 app.onPublish('/*', async (payload) => {
    // your logic here 
});

 export const handler = async (event, context) =>
  app.resolve(event, context);

最も一般的なキャッチオールハンドラは /* で、これはどの名前空間やチャネルにもマッチします。一方、/default/* は、default 名前空間のあらゆるチャネルにマッチします。複数のハンドラが同じイベントにマッチする場合は、ライブラリは最も具体的なハンドラを呼び出し、より一般的なハンドラは無視されます。たとえば、/default/channel1 に対してハンドラが登録されており、さらに /default/* にもハンドラが登録されている場合、Powertools は /default/channel1 にマッチするイベントでは最初のハンドラを呼び出し、2 番目のハンドラは無視されます。このアプローチにより、イベントがどのように処理されるかを制御し、不要な処理を回避できます。デフォルトでは、Powertools はどのハンドラとも合致しないイベントについては、そのままイベントを返し、警告をログに記録します。つまり、そういったイベントは変更されずにそのまま渡されます。このアプローチは、特定のイベントに対してはカスタムロジックを適用しつつ、他のイベントはデフォルトの挙動で処理できるため、徐々にライブラリを適用していくのに役立ちます。

Subscription ハンドリング

Powertools はサブスクリプションイベントを処理する簡単な方法も提供します。受信したイベントを自動的に解析し、イベントタイプに基づいて適切なハンドラを呼び出します。デフォルトでは、Lambda ハンドラがエラーをスローしたり要求を明示的に拒否しない限り、AppSync はサブスクリプションを許可します。サブスクリプションが拒否されると、AppSync はクライアントに 4xx レスポンスを返し、サブスクリプションが確立されるのを防ぎます。

import { AppSyncEventsResolver } from '@aws-lambda-powertools/event-handler/appsync-events';
 import { Metrics, MetricUnit } from '@aws-lambda-powertools/metrics';
 import type { Context } from 'aws-lambda';

 const metrics = new Metrics({
  namespace: 'serverlessAirline',
  serviceName: 'chat',
  singleMetric: true,
});
 const app = new AppSyncEventsResolver();

 app.onSubscribe('/default/foo', (event) => {
  metrics.addDimension('channel', event.info.channel.path);
  metrics.addMetric('connections', MetricUnit.Count, 1);
});

 export const handler = async (event: unknown, context: Context) =>
  app.resolve(event, context);

サブスクリプションイベントが到着すると、このライブラリはイベントオブジェクトを第 1 引数としてハンドラを呼び出します。アクセス制御チェックの実行など、サブスクリプションイベントに基づいて必要な処理を行えます。

app.onSubscribe('/private/*', async (info) => {
  const userGroups =
    info.identity?.groups && Array.isArray(info.identity?.groups)
      ?info.identity ?.groups 
      : [] ;
  const channelGroup = 'premium-users';

  if (!userGroups.includes(channelGroup)) {
    throw new UnauthorizedException(
      `Subscription requires ${ channelGroup } group membership` 
    );
  }
})

サブスクリプションイベントは同じマッチングルールに従い、イベントとコンテキストへの完全なアクセスを提供します。ワイルドカード * 文字を使用して任意の名前空間やチャンネルに対するキャッチオールハンドラを登録でき、ハンドラ内でイベントとコンテキストオブジェクトに完全にアクセスすることもできます。

イベントとコンテキストの完全なアクセス

リゾルバーがイベントハンドリングを簡素化しますが、必要に応じてイベントとコンテキストオブジェクトに完全にアクセスできます。これは、リクエストヘッダーや Lambda コンテキストの残り実行時間など、カスタムロジックを実装するために追加情報が必要な場合に役立ちます。

リゾルバーは、全てのイベントとコンテキストを第 2 引数と第 3 引数として各ハンドラに渡します。これにより、既存のコードを変更することなく、関連する全ての情報にアクセスできます。

import { AppSyncEventsResolver } from '@aws-lambda-powertools/event-handler/appsync-events';
 import { Logger } from '@aws-lambda-powertools/logger';

 const logger = new Logger({
  logLeveL: 'INFO',
  serviceName: 'serverlessAirline'
});
 const app = new AppSyncEventsResolver({ logger });

 app.onPublish('/orders/process', async (payload, event, context) => {
    // Access request headers 
    const { headers } = event.request ;
    
    // Access Lambda context 
    const { getRemainingTimeInMillis } = context ;

    logger.info('Processing event details', {
        headers,
        remainingTime: getRemainingTimeInMillis()
    });

    return payload ;
});

 export const handler = async (event, context) =>
  app.resolve(event, context);

エラーハンドリング

AppSyncEventsResolver には、Lambda 関数の障害を防ぎながら、エラーが適切に AppSync に通知されるよう組み込みのエラー処理機能があります。AppSync はそれらのエラーをクライアントに伝播させます。ハンドラでエラーが発生した場合、リゾルバ は Lambda の呼び出し全体を失敗させるのではなく、そのエラーをイベントごとのレスポンスペイロードに含めます。

このアプローチにより、Lambda 関数は実行を継続しながら、AppSync に適切にフォーマットされたエラーメッセージを提供します。複数のイベントを処理する場合、1つのイベントが失敗しても、他のイベントは正常に処理を続けます。これは、あるイベントのエラーが他のイベントの処理に影響を与えないようにしたい並列処理シナリオで特に役立ちます。

import { AppSyncEventsResolver } from '@aws-lambda-powertools/event-handler/appsync-events';

 const app = new AppSyncEventsResolver();

 app.onPublish('/messages', async (payload) => {
    // If message contains "error", throw an exception 
    if (payload.message === "error") {
        throw new Error("Invalid message");
    }
    return payload ;
});

 export const handler = async (event, context) =>
  app.resolve(event, context);

// When processing this event:
// {
//     "id": "123",
//     "payload": {
//         "message": "error"
//     }
// }

// The resolver will return:
// {
//     "id": "123",
//     "error": "Error - Invalid message"
// }

高度なパターンとベストプラクティス

AppSyncEventsResolver には、堅牢でメンテナンス性の高いリアルタイムアプリケーションを構築するのに役立つ、高度な機能が追加されています。これらの機能とその効果的な利用方法を見ていきましょう。

パブリッシュ処理について

デフォルトでは、メッセージごとにルートハンドラを 1 回呼び出します。Powertools がメッセージの反復処理やイベントレスポンス形式の変換を行うため、ビジネスロジックに集中し、定型コードを書く必要がありません。あとはペイロードとして使いたい値を返すか、そのメッセージでエラーをスローするだけです。ライブラリがペイロードを正しいイベント ID と関連付けます。

import { AppSyncEventsResolver } from '@aws-lambda-powertools/event-handler/appsync-events';
 import { Metrics, MetricUnit } from '@aws-lambda-powertools/metrics';

 type SensorReading = {
    deviceId: string ;
    temperature: number ;
    humidity: number ;
    timestamp: string ;
}

 const app = new AppSyncEventsResolver();
 const metrics = new Metrics({ namespace: 'SensorReadings' });

 app.onPublish('/sensors/readings', async (payload: SensorReading) => {
    // Process each sensor reading independently 
    if (payload.temperature > 100) {
        metrics.addDimension('alertType', 'highTemperature');
        metrics.addMetric('HighTemperature', MetricUnit.Count, 1);
        throw new Error('Temperature reading too high');
    }

    // Enrich the payload with processing timestamp 
    return {
        ...payload,
        processed: true,
        processedAt: new Date().toISOString()
    };
});

 export const handler = async (event, context) =>
  app.resolve(event, context);

このパターンは、単一のイベントに対するロジックのみを記述すればよいため、開発を簡素化します。Powertools が残りを自動的に処理します。

集約処理

集約モードでは、イベントを個別に処理するのではなく、複数のイベントを単一のバッチとして処理することができます。これは、データベースへの複数のクエリを1回の操作で送信するなど、リソース使用を最適化したい場合や、処理前に複数のイベントをまとめて分析したい場合に特に役立ちます。どちらのモードでもイベント処理を完全に制御できますが、集約モードでは一度にイベントのリスト全体にアクセスできます。

これを実現するには、aggregate オプションを true に設定します。このモードを使用すると、リゾルバーはイベントのリスト全体を1回の呼び出しでハンドラに送信し、バッチとして処理することができます。

import { AppSyncEventsResolver } from '@aws-lambda-powertools/event-handler/appsync-events';

 const app = new AppSyncEventsResolver();

 app.onPublish('/default/*', async (events) => {
  const results = [] ;
  for (const event of events) {
    try {
      results.push(await handleDefaultNamespaceCatchAll(event));
    } catch (error) {
      results.push({
        error: {
          errorType: 'Error',
          message: error.message,
        },
        id: event.id,
      });
    }
  }

  return results ;
}, {
  aggregate: true,
});

 export const handler = async (event, context) =>
  app.resolve(event, context);

集約オプションはパブリッシュイベントにのみ使用可能であり、このオプションを使用する場合は、イベントの処理と適切なレスポンスの返却に責任を持つ必要があることに注意してください。Powertools は引き続きイベントを正しいハンドラにルーティングしますが、イベントの処理方法は完全にあなたの制御下にあります。

イベントフィルタリング

イベントをフィルタリングするには、チャネルハンドラでエラーをスローします。ハンドラが特定のイベントに対してエラーをスローすると、ライブラリはそれをキャッチし、同じインデックスのレスポンスリストにエラーオブジェクトを追加します。これは、対応するメッセージを破棄すべきであることを示します。これにより、イベントを静かにフィルタリングするか、サブスクライバーに意味のあるエラーフィードバックを提供するかを選択できます。

import { AppSyncEventsResolver } from '@aws-lambda-powertools/event-handler/appsync-events';

 app.onPublish('/moderation/*', async (payload) => {
    // Filter out inappropriate content 
    if (await containsInappropriateContent(payload)) {
        throw new CustomError('Content violates guidelines');
    }

    // Process valid content 
    return await processContent(payload);
});

 export const handler = async (event, context) =>
  app.resolve(event, context);

まとめ

Powertools for AWS の AppSyncEventsResolver は、シンプルで一貫したインターフェースを提供することで、AppSync Events を処理する開発体験を向上させます。定型コードを削減し、一般的なパターンに対する組み込みサポートを提供することで、インフラコードではなくビジネスロジックに集中できます。

詳細については:

これらの新機能で何を構築するのか楽しみにしています。フィードバックを共有し、アプリケーションで AppSyncEventsResolver をどのように使用しているかお知らせください!

本ブログは「Simplify AWS AppSync Events integration with Powertools for AWS Lambda」 を翻訳したものです。

翻訳者について

Photo of author

稲田 大陸

AWS Japan で働く筋トレが趣味のソリューションアーキテクト。好きな AWS サービスは Amazon Location Service と AWS Amplify で、日本のお客様向けに Amazon Location Service の解説ブログなどを執筆しています。