ステップ6:イベントドリブンな自動アクション - EventBridge + Lambda(オプション)
このステップはオプションです
ステップ5まで完了すれば、データ分析パイプラインの基本的な機能は完成します。ステップ6は、より高度な自動化機能を学びたい方向けのオプションコンテンツです。時間に余裕がある場合や、自動化に興味がある場合は、ぜひチャレンジしてみてください。
今回のステップの概要とデータ分析パイプラインとの関連について
このステップでは、ステップ4で構築したAthenaの分析結果と、ステップ5で作成したQuickSightダッシュボードを活用し、Amazon EventBridgeとAWS Lambdaを使って自動アクションを実現します。Athena結果を定期評価し、しきい値を超えたら即時に通知やAPI実行を行うフローを構築します。ダッシュボードに頼らず、自動で「誰が・何をするか」を動かす仕組みを作ります。
このステップで学ぶこと
- EventBridge Schedulerでの定期トリガー設定
- Lambdaによるしきい値判定ロジックの実装
- SNS/Slack通知と簡易エスカレーション
- 具体的な自動アクション例(広告停止API、Auto Scaling調整など)
- プレイブック化と再利用
リソースの関わりと構成説明
- EventBridge Scheduler:所定の間隔でLambdaを起動し、最新データを評価。
- Lambda:Athena結果(または集約済みS3オブジェクト)を読み、条件判定と通知/アクションを実行。
- SNS/Slack Webhook:通知チャネル。役割別にトピックやWebhookを分ける。
- (任意)API呼び出し先:広告停止、スケールアウトなど。
実際の手順
1. しきい値判定の入力データを決める
- ステップ4で構築したAthenaのクエリ結果をS3にCSV/Parquetで出力します。ステップ4で設定したクエリ結果の保存場所(
s3://data-analysis-lake-(あなたのアカウントID)/athena-query-results/)を活用します。 - 判定に使う指標を明確化します(例:売上前日比-30%未満、エラー率>2%、特定チャネル急増など)。
2. Lambda関数の作成
しきい値判定と通知・アクション実行を行うLambda関数を作成します。
2-1. Lambda関数の作成
- AWSマネジメントコンソールで「Lambda」を検索して選択します
- 「Create function」ボタンをクリックします
- Function nameに「data-analysis-alert-function」と入力します
- Runtimeで「Python 3.11」を選択します
- 「Create function」ボタンをクリックします
2-2. Lambda関数のコード実装
- 関数のコードエディタに、以下のような判定ロジックを実装します:
import json
import boto3
from datetime import datetime
s3 = boto3.client('s3')
sns = boto3.client('sns')
athena = boto3.client('athena')
def lambda_handler(event, context):
# S3から最新の集約データを取得
# またはAthena APIでクエリ実行→結果S3から取得
# 判定ロジックで閾値超過を確認
threshold_exceeded = check_thresholds()
if threshold_exceeded:
# 通知メッセージを整形(誰に、何を、いつまでに)
message = format_alert_message(threshold_exceeded)
# SNSに通知を送信
sns.publish(
TopicArn='arn:aws:sns:region:account:alert-topic',
Message=message,
Subject='データ分析アラート'
)
# 必要に応じて外部API/SDKを呼び出しアクションを実行
execute_action(threshold_exceeded)
return {
'statusCode': 200,
'body': json.dumps('処理完了')
}【解説】Lambda関数の実装ポイント
Lambda関数では、以下の処理を実装します:
- データ取得:S3から最新の集約データを取得するか、Athena APIでクエリを実行して結果を取得します
- 判定ロジック:取得したデータを評価し、しきい値を超えているか確認します
- 通知送信:条件が一致した場合、SNSに通知を送信します。通知メッセージには「誰に、何を、いつまでに」を含めます
- アクション実行:必要に応じて、外部APIやAWS SDKを使用して自動アクションを実行します
学習用途では、シンプルな判定ロジックから始めることをおすすめします。本番環境では、エラーハンドリングやリトライロジックを追加します。
2-3. IAMロールの設定
- Lambda関数の「Configuration」タブ→「Permissions」を選択します
- 実行ロールに以下の権限を追加します:
s3:GetObject(S3からのデータ取得)athena:StartQueryExecution、athena:GetQueryResults(Athenaクエリ実行、必要な場合)sns:Publish(SNS通知送信)
【解説】最小権限の原則
Lambda関数の実行ロールには、必要最小限の権限のみを付与することをおすすめします。これにより、セキュリティを向上させることができます。
学習用途では、上記の権限で十分です。本番環境では、さらに細かい権限設定(特定のS3バケットのみ、特定のSNSトピックのみなど)を行うことをおすすめします。
3. EventBridge Schedulerで定期実行
EventBridge Schedulerを使用して、Lambda関数を定期的に実行します。
3-1. スケジュールの作成
- AWSマネジメントコンソールで「EventBridge」を検索して選択します
- 左側のナビゲーションペインから「Scheduler」を選択します
- 「Create schedule」ボタンをクリックします
3-2. スケジュールの設定
- Schedule nameに「data-analysis-alert-schedule」と入力します
- Schedule typeで「Recurring schedule」を選択します
- Schedule patternで「Rate-based schedule」または「Cron-based schedule」を選択します
- Rate-based:例「rate(15 minutes)」(15分ごと)
- Cron-based:例「cron(0/15 * * * ? *)」(15分ごと)
【解説】スケジュールパターンの選択
Rate-based scheduleは、指定した間隔で実行するシンプルなスケジュールです。例:「rate(15 minutes)」「rate(1 hour)」など。
Cron-based scheduleは、より柔軟なスケジュール設定が可能です。例:「cron(0 9 * * ? *)」(毎日9時)、「cron(0/15 * * * ? *)」(15分ごと)など。
今回は単純な指定が可能なRate-based scheduleを使用しましたが、Cron-based scheduleで指定することもできます。
3-3. ターゲットの設定
- Target typeで「AWS Lambda Invoke」を選択します
- Lambda functionで、ステップ2-1で作成した「data-analysis-alert-function」を選択します
- Execution roleで「Create new role」を選択します(または既存のロールを使用)
3-4. リトライとDLQの設定
- Retry policyで、リトライ回数とリトライ間隔を設定します(例:最大3回、指数バックオフ)
- **Dead-letter queue (DLQ)**で、失敗したイベントを保存するSQSキューを設定します(オプション)
【解説】リトライとDLQの重要性
リトライポリシーを設定することで、Lambda関数の実行が失敗した場合に自動的に再試行できます。これにより、一時的な障害から自動的に回復できます。
デッドレタキュー(DLQ)を設定することで、リトライを尽くしても失敗したイベントを保存できます。これにより、障害の原因を後から調査できます。
学習用途では、リトライポリシーを設定することをおすすめします。DLQはオプションですが、本番環境では設定することをおすすめします。
- 「Create schedule」ボタンをクリックします
4. 通知とエスカレーション
SNSトピックを作成し、通知を送信する仕組みを構築します。
4-1. SNSトピックの作成
- AWSマネジメントコンソールで「SNS」を検索して選択します
- 左側のナビゲーションペインから「Topics」を選択します
- 「Create topic」ボタンをクリックします
- Topic typeで「Standard」を選択します
- Nameに「data-analysis-alerts」と入力します
- 「Create topic」ボタンをクリックします
4-2. サブスクリプションの追加
- 作成したトピックを選択します
- 「Create subscription」ボタンをクリックします
- Protocolで「Email」または「HTTPS」(Slack Webhook用)を選択します
- EndpointにメールアドレスまたはWebhook URLを入力します
- 「Create subscription」ボタンをクリックします
- メールの場合は、確認メールを確認してサブスクリプションを有効化します
【解説】通知チャネルの選択
SNSは、複数のプロトコル(メール、SMS、HTTP、HTTPSなど)に対応しています。用途に応じて適切なチャネルを選択します。
- メール:一般的な通知に適しています。確認が必要です
- HTTPS(Slack Webhook):チームチャットへの通知に適しています。即座に通知が届きます
- SMS:緊急通知に適しています。追加の料金がかかります
学習用途では、メールまたはSlack Webhookを使用することをおすすめします。
4-3. 通知メッセージの整形
Lambda関数内で、通知メッセージに以下の情報を含めます:
- 条件:どのしきい値を超えたか
- 検出時刻:異常が検出された時刻
- 推奨アクション:取るべき行動
- 担当:対応すべき担当者
【解説】通知メッセージの設計
通知メッセージには、「誰が・何を・いつまでに」を明確に記載することで、受信者が即座に行動できるようにします。
例:
【データ分析アラート】
条件:売上前日比-30%未満
検出時刻:2024-01-15 14:30:00
推奨アクション:広告配分の見直し、価格調整の検討
担当:マーケティングチーム
期限:当日中このような形式で通知することで、受信者が迷わず行動できます。
5. 自動アクションの実装例
Lambda関数内で、具体的な自動アクションを実装します。以下に、よくあるシナリオの実装例を示します。
5-1. 広告出稿停止APIの呼び出し
急激なCV低下やCPA悪化が検出された場合、広告出稿を自動停止します。
def execute_action(threshold_exceeded):
if threshold_exceeded['type'] == 'cv_drop':
# 広告APIを呼び出して広告を停止
# 例:Google Ads API、Facebook Ads APIなど
stop_ad_campaign(threshold_exceeded['campaign_id'])【解説】外部API呼び出しの実装
Lambda関数から外部APIを呼び出す場合、適切な認証情報(APIキー、OAuthトークンなど)を環境変数やAWS Secrets Managerに保存します。
学習用途では、実際の広告APIを呼び出す代わりに、ログ出力やSNS通知でアクションをシミュレートすることができます。本番環境では、実際のAPIを呼び出す実装を行います。
5-2. Auto Scalingの上限変更
エラー率上昇やトラフィック急増が検出された場合、Auto Scalingの上限を一時的に引き上げます。
import boto3
autoscaling = boto3.client('autoscaling')
def execute_action(threshold_exceeded):
if threshold_exceeded['type'] == 'error_rate_high':
# Auto Scalingグループの最大サイズを一時的に増やす
autoscaling.set_desired_capacity(
AutoScalingGroupName='my-asg',
DesiredCapacity=10,
HonorCooldown=False
)【解説】Auto Scalingの自動調整
Lambda関数からAuto Scalingを調整することで、異常検知時に自動的にスケールアウトできます。これにより、システムの可用性を維持できます。
学習用途では、実際のAuto Scalingグループが存在しない場合、ログ出力でアクションをシミュレートすることができます。本番環境では、実際のAuto Scalingグループを調整する実装を行います。
6. プレイブック化
- よくあるシナリオをテンプレート化:
- 売上急落(前日比-30%):通知+広告停止API
- エラー率上昇(>2%):通知+ASG上限アップ
- 通知メッセージと実行手順をMarkdownで保存し、再利用可能にする。
このステップで何をしたのか
EventBridge + Lambdaで定期評価し、条件一致時に通知やAPI実行を自動化するフローを構築しました。ダッシュボード非依存でアクションが走るため、夜間や無人時間帯でも対応が可能になります。
データ分析パイプラインでどのような影響があるのか
この構成により、データ分析パイプラインは自動アクションを実現できるようになりました。異常検知から通知・初動までの時間を短縮でき、人の監視に頼らず、行動が自動でトリガーされます。プレイブック化により運用が属人化しにくくなり、夜間や無人時間帯でも対応が可能になります。
技術比較まとめ表
| 技術領域 | AWS | オンプレミス |
|---|---|---|
| スケジュール実行 | Amazon EventBridge Scheduler サーバーレス、cron/固定間隔、リトライ機能 | cron、タスクスケジューラ 自前でサーバー構築・運用、障害時の対応が必要 |
| 自動化処理 | AWS Lambda サーバーレス、自動スケーリング、複数言語対応 | スクリプト、ジョブスケジューラ サーバー管理、リソース管理が必要 |
| 通知配信 | Amazon SNS 複数プロトコル対応、自動スケーリング | メールサーバー、チャットボット 自前で構築・運用、可用性の管理が必要 |
学習において重要な技術的違い
1. サーバーレスとセルフマネージドの違い
- AWS:サーバーの構築・運用が不要。自動的にスケーリング
- オンプレミス:自前でサーバーを構築・運用。リソース管理が必要
2. スケジュール実行の信頼性
- AWS:EventBridge Schedulerによる高可用性なスケジュール実行。リトライとDLQによる障害対応
- オンプレミス:cronやタスクスケジューラによる実行。障害時の対応が複雑
3. 通知配信の柔軟性
- AWS:SNSによる複数プロトコル対応(メール、SMS、HTTP、Slackなど)。自動スケーリング
- オンプレミス:自前でメールサーバーやチャットボットを構築。可用性の管理が必要
4. 統合性
- AWS:Lambdaから他のAWSサービス(Athena、S3、Auto Scalingなど)への統合が容易
- オンプレミス:各システム間の統合が複雑。APIやスクリプトによる個別実装が必要
実践チェック:画面キャプチャで証明しよう
- EventBridge SchedulerでLambdaが定期実行されている
- LambdaがS3/Athenaのデータを取得して判定できている
- SNS/Slack通知が想定の宛先に届いている
- しきい値超過時にAPI呼び出しなどの自動アクションが実行されている
- プレイブック(シナリオ、通知テンプレ、実行手順)が整備されている
提出方法: 各項目ごとにスクリーンショットを撮影し、まとめて提出してください。 ファイル名やコメントで「どの項目か」が分かるようにしてください。
構成図による理解度チェック
このステップで作成したリソースを、データ分析パイプラインの構成図に追加しましょう。
なぜ構成図を更新するのか?
構成図を更新することで、データ分析パイプラインの全体像を視覚的に理解できるようになります。また、各リソースの関係性やデータの流れを明確にすることで、システムの動作を深く理解することができます。
- 自動アクションの流れの理解: EventBridge SchedulerがLambdaを起動し、Athena結果を評価して通知やAPI実行を行う流れを視覚的に把握できる
- リソース間の関係性: EventBridge、Lambda、Athena、SNSがどのように連携しているかを理解できる
- システム全体の把握: パイプライン全体の構造を一目で理解できる
構成図の書き方
ステップ5で作成した構成図をベースに、以下のリソースを追加してみましょう。
- Amazon EventBridge Scheduler: 図の上部に配置。定期実行のトリガーとして表現
- AWS Lambda: EventBridgeの下に配置。判定ロジックとアクション実行を表現
- Amazon SNS: Lambdaの右側に配置。通知配信を表現
- 自動アクションの流れ: EventBridge → Lambda → Athena/S3(データ取得)、Lambda → SNS(通知)、Lambda → 外部API(アクション実行)の矢印を描く
💡 ヒント: 構成図では、データの流れ、メタデータの流れ、可視化の流れ、自動アクションの流れを区別して表現すると理解しやすくなります。また、各リソースの役割を短い説明文で補足すると、より分かりやすくなります。
理解度チェック:なぜ?を考えてみよう
Q1. なぜダッシュボード閲覧を前提にせず、イベントドリブンで動かす必要があるのでしょうか? Q2. なぜ通知に「誰が・何を・いつまでに」を含めると行動が速くなるのでしょうか? Q3. なぜリトライとDLQを設定することが運用上重要なのでしょうか?
今回のステップで利用したAWSサービス名一覧
- Amazon EventBridge:スケジュール/イベントトリガー
- AWS Lambda:判定ロジックとアクション実行
- Amazon SNS:通知配信
- Amazon Athena:分析結果の取得
- Amazon S3:データ/クエリ結果の保管