ステップ4:リアルタイム通信 - WebSocket APIの実装
今回のステップの概要とモバイルアプリとの関連について
このステップでは、ステップ3で構築したデータ管理システムにリアルタイム通信機能を追加します。具体的には、WebSocket APIとリアルタイムメッセージングを実装してモバイルアプリの双方向通信機能を完成させます。
モバイルアプリにとって、リアルタイム通信システムは「デジタルトランシーバー」のような役割を果たします。WebSocketは常時接続された電話回線のようなもので、ユーザー同士がリアルタイムでやり取りでき、位置情報の変更を瞬時に共有できる本格的なモバイルアプリケーションが完成します。
このステップで学ぶこと
- WebSocket APIの構築とリアルタイム双方向通信の実装
- Lambda Authorizerを使用したWebSocket認証の実装
- WebSocketコネクション管理とメッセージブロードキャスト
- リアルタイム通信のパフォーマンス最適化とトラブルシューティング
リソースの関わりと構成説明
ステップ4で作成するリソースは、モバイルアプリケーションのリアルタイム通信層を構築するものです。それぞれのリソースがモバイルアプリにどのように関わるのかを説明します。
WebSocket APIとモバイルアプリの関わり
WebSocket API「リアルタイム通信ゲートウェイ」は、モバイルアプリケーションの「常時接続型電話回線」のような役割を果たします。ユーザー間のチャット、位置情報のリアルタイム共有、オンライン状態の表示などを瞬時に配信します。これにより、従来のHTTP通信では実現困難だった双方向のリアルタイムコミュニケーションが可能になります。
WebSocketを使用することで、サーバーからクライアントへの能動的なメッセージ送信が可能になり、ユーザーは常に最新の情報を受け取ることができます。また、接続を維持したまま双方向通信を行うため、HTTPポーリングと比較してバッテリー消費とネットワーク使用量を大幅に削減できます。
実際の手順
実際の手順では、たくさんの設定値を入力することになります。 本文中に設定値が指定されていない場合は、デフォルト値のまま作業を進めてください。
1. WebSocket APIの作成
リアルタイム双方向通信を実現するWebSocket APIを作成します。
1-1. WebSocket APIの基本設定
- API Gatewayのコンソールを開きます
- 「APIを作成」ボタンをクリックします
- 「WebSocket API」を選択し、「構築」をクリックします
- API名に「mobile-app-websocket」と入力します
- IPアドレスのタイプで「IPv4」を選択します
- ルート選択式に「$request.body.action」と入力します
- 「Next」をクリックします
- ルートを追加で「Next」をクリックします
- 統合をアタッチするで「Next」をクリックします
- ステージを追加でステージ名に「prod」を入力し、「Next」をクリックします
- 「create」をクリックします
【解説】WebSocketとHTTPの違い
HTTPは「リクエスト・レスポンス」型の通信で、クライアントからサーバーへの一方向通信が基本です。一方、WebSocketは「常時接続」型の通信で、サーバーからクライアントへの能動的な通信が可能です。
モバイルアプリでは、チャット機能、リアルタイム位置共有、オンライン状態表示などでWebSocketが威力を発揮します。従来のHTTPポーリング(定期的なリクエスト)と比較して、バッテリー消費とネットワーク使用量を大幅に削減できます。
ルート選択式は、受信メッセージをどのLambda関数に振り分けるかを決定する重要な設定で、メッセージの種類に応じた適切な処理を実現できます。
1-2. WebSocket用Lambda関数の作成
- Lambdaコンソールを開きます
- 「関数を作成」ボタンをクリックします
- 「一から作成」を選択します
- 関数名に「websocket-handler」と入力します
- ランタイムで「Python 3.9」を選択します
- アーキテクチャは「x86_64」を選択します
- 「関数を作成」をクリックします
- Lambdaコンソールで「websocket-handler」関数を開きます
- 「設定」タブの「アクセス権限」を選択します
- 実行ロール名をクリックしてIAMコンソールを開きます
- 「許可を追加」から「ポリシーをアタッチ」をクリックします
- 「AmazonDynamoDBFullAccess」と「AmazonAPIGatewayInvokeFullAccess」を検索して選択します
- 「許可を追加」をクリックします
1-3. WebSocketコネクション管理テーブルの作成
- DynamoDBコンソールを開きます
- 「テーブルの作成」をクリックします
- テーブル名に「websocket-connections」と入力します
- パーティションキーに「connection_id」(文字列)を設定します
- 「テーブルの作成」をクリックします
2. WebSocket Lambda関数の実装
WebSocket接続の管理とメッセージ配信を処理するLambda関数を実装します。
2-1. WebSocket Lambda関数のコード実装
- 「websocket-handler」関数のコードエディタで以下のPythonコードを入力します:
import json
import logging
import boto3
from datetime import datetime
# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# AWSクライアント
dynamodb = boto3.resource('dynamodb')
connections_table = dynamodb.Table('websocket-connections')
apigateway = boto3.client('apigatewaymanagementapi')
def lambda_handler(event, context):
"""
WebSocket接続とメッセージングを処理する関数
"""
try:
# イベント情報をログに記録
logger.info(f"WebSocketイベント: {json.dumps(event)}")
# 接続情報を取得
connection_id = event.get('requestContext', {}).get('connectionId')
route_key = event.get('requestContext', {}).get('routeKey')
# エンドポイントURLを設定
domain_name = event.get('requestContext', {}).get('domainName')
stage = event.get('requestContext', {}).get('stage')
endpoint_url = f"https://{domain_name}/{stage}"
global apigateway
apigateway = boto3.client('apigatewaymanagementapi', endpoint_url=endpoint_url)
# ルートに応じた処理
if route_key == '$connect':
return handle_connect(connection_id, event)
elif route_key == '$disconnect':
return handle_disconnect(connection_id)
elif route_key == '$default':
return handle_message(connection_id, event)
else:
return create_response(400, 'Unknown route')
except Exception as e:
logger.error(f"WebSocketエラー: {str(e)}")
return create_response(500, 'Internal server error')
def handle_connect(connection_id, event):
"""WebSocket接続処理"""
try:
# 認証情報を取得(Cognito Authorizerから)
authorizer = event.get('requestContext', {}).get('authorizer', {})
user_id = authorizer.get('principalId', 'anonymous')
# 接続情報を保存
connection_data = {
'connection_id': connection_id,
'user_id': user_id,
'connected_at': datetime.now().isoformat(),
'ttl': int(datetime.now().timestamp()) + 86400 # 24時間後に自動削除
}
connections_table.put_item(Item=connection_data)
logger.info(f"WebSocket接続成功: {connection_id}, ユーザー: {user_id}")
return create_response(200, 'Connected')
except Exception as e:
logger.error(f"接続処理エラー: {str(e)}")
return create_response(500, 'Connection failed')
def handle_disconnect(connection_id):
"""WebSocket切断処理"""
try:
# 接続情報を削除
connections_table.delete_item(Key={'connection_id': connection_id})
logger.info(f"WebSocket切断: {connection_id}")
return create_response(200, 'Disconnected')
except Exception as e:
logger.error(f"切断処理エラー: {str(e)}")
return create_response(500, 'Disconnection failed')
def handle_message(connection_id, event):
"""メッセージ処理"""
try:
# メッセージ内容を取得
body = json.loads(event.get('body', '{}'))
action = body.get('action', 'unknown')
message_data = body.get('data', {})
# 送信者情報を取得
sender_connection = connections_table.get_item(Key={'connection_id': connection_id})
if 'Item' not in sender_connection:
return create_response(400, 'Connection not found')
sender_user_id = sender_connection['Item']['user_id']
# アクションに応じた処理
if action == 'send_message':
return broadcast_message(sender_user_id, message_data)
elif action == 'location_update':
return broadcast_location_update(sender_user_id, message_data)
else:
return create_response(400, 'Unknown action')
except Exception as e:
logger.error(f"メッセージ処理エラー: {str(e)}")
return create_response(500, 'Message processing failed')
def broadcast_message(sender_user_id, message_data):
"""メッセージをブロードキャスト"""
try:
# アクティブな接続を取得
response = connections_table.scan()
connections = response.get('Items', [])
# ブロードキャスト用メッセージを作成
broadcast_data = {
'type': 'message',
'sender_id': sender_user_id,
'message': message_data.get('message', ''),
'timestamp': datetime.now().isoformat()
}
# 全接続にメッセージを送信
successful_sends = 0
for connection in connections:
if send_message_to_connection(connection['connection_id'], broadcast_data):
successful_sends += 1
logger.info(f"メッセージブロードキャスト完了: {successful_sends}件送信")
return create_response(200, f'Message broadcasted to {successful_sends} connections')
except Exception as e:
logger.error(f"ブロードキャストエラー: {str(e)}")
return create_response(500, 'Broadcast failed')
def send_message_to_connection(connection_id, message_data):
"""特定の接続にメッセージを送信"""
try:
apigateway.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps(message_data, ensure_ascii=False)
)
return True
except apigateway.exceptions.GoneException:
# 切断された接続を削除
logger.info(f"切断された接続を削除: {connection_id}")
connections_table.delete_item(Key={'connection_id': connection_id})
return False
except Exception as e:
logger.error(f"メッセージ送信エラー: {str(e)}")
return False
def create_response(status_code, message):
"""レスポンスの生成"""
return {
'statusCode': status_code,
'body': json.dumps({'message': message}, ensure_ascii=False)
}- 「Deploy」ボタンをクリックしてコードをデプロイします
2-2. WebSocket認証機能の実装
WebSocket接続時にCognitoトークンを検証して、認証されたユーザーのみが接続できるようにします。
2-2-1. Lambda Authorizerの作成
- Lambdaコンソールで「関数を作成」をクリックします
- 「一から作成」を選択します
- 関数名に「websocket-authorizer」と入力します
- ランタイムで「Python 3.9」を選択します
- アーキテクチャは「x86_64」を選択します
- 「関数を作成」をクリックします
2-2-2. Lambda Authorizerのコード実装
「websocket-authorizer」関数に以下のコードを実装します:
import json
import jwt
import os
import requests
from jwt.algorithms import RSAAlgorithm
# Cognito設定(環境変数から取得)
REGION = os.environ.get('AWS_REGION', 'us-east-1')
USER_POOL_ID = os.environ.get('USER_POOL_ID', 'us-east-1_xxxxxxxxx')
APP_CLIENT_ID = os.environ.get('APP_CLIENT_ID', 'your-client-id')
# Cognito公開鍵のキャッシュ
jwks_cache = None
def lambda_handler(event, context):
"""
WebSocket接続時の認証を行うLambda Authorizer
"""
try:
# トークンを取得
token = extract_token(event)
if not token:
print("トークンが見つかりません")
return generate_policy('Deny', event['methodArn'])
# トークンを検証
claims = verify_token(token)
if claims:
# 認証成功
principal_id = claims.get('sub', 'unknown')
print(f"認証成功: ユーザーID={principal_id}")
# ユーザー情報をcontextに追加
context_data = {
'principalId': principal_id,
'userId': principal_id,
'email': claims.get('email', ''),
'username': claims.get('cognito:username', '')
}
return generate_policy('Allow', event['methodArn'], context_data)
else:
# 認証失敗
print("トークン検証失敗")
return generate_policy('Deny', event['methodArn'])
except Exception as e:
print(f"認証エラー: {str(e)}")
return generate_policy('Deny', event['methodArn'])
def extract_token(event):
"""
イベントからトークンを抽出
"""
# クエリパラメータから取得
query_params = event.get('queryStringParameters', {})
if query_params and 'token' in query_params:
return query_params['token']
# ヘッダーから取得(Authorization: Bearer <token>)
headers = event.get('headers', {})
auth_header = headers.get('Authorization', '') or headers.get('authorization', '')
if auth_header.startswith('Bearer '):
return auth_header[7:]
return None
def verify_token(token):
"""
CognitoトークンをJWKSで検証
"""
try:
# JWKSを取得(キャッシュあり)
jwks = get_jwks()
# トークンのヘッダーをデコード(署名検証なし)
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header['kid']
# 対応する公開鍵を取得
key = None
for jwk in jwks['keys']:
if jwk['kid'] == kid:
key = RSAAlgorithm.from_jwk(json.dumps(jwk))
break
if not key:
print(f"公開鍵が見つかりません: kid={kid}")
return None
# トークンを検証
claims = jwt.decode(
token,
key,
algorithms=['RS256'],
audience=APP_CLIENT_ID,
options={'verify_exp': True}
)
# トークンタイプを確認(IDトークン)
if claims.get('token_use') != 'id':
print(f"無効なトークンタイプ: {claims.get('token_use')}")
return None
return claims
except jwt.ExpiredSignatureError:
print("トークンの有効期限切れ")
return None
except jwt.InvalidTokenError as e:
print(f"無効なトークン: {str(e)}")
return None
except Exception as e:
print(f"トークン検証エラー: {str(e)}")
return None
def get_jwks():
"""
Cognito JWKSを取得(キャッシュあり)
"""
global jwks_cache
if jwks_cache:
return jwks_cache
# JWKSエンドポイント
jwks_url = f'https://cognito-idp.{REGION}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json'
response = requests.get(jwks_url)
response.raise_for_status()
jwks_cache = response.json()
return jwks_cache
def generate_policy(effect, resource, context=None):
"""
IAMポリシーを生成
"""
policy = {
'principalId': context.get('principalId', 'user') if context else 'user',
'policyDocument': {
'Version': '2012-10-17',
'Statement': [
{
'Action': 'execute-api:Invoke',
'Effect': effect,
'Resource': resource
}
]
}
}
# コンテキスト情報を追加(Lambda関数で利用可能)
if context:
policy['context'] = context
return policy2-2-3. 環境変数の設定
- 「websocket-authorizer」関数の「設定」タブを開きます
- 「環境変数」セクションで「編集」をクリックします
- 以下の環境変数を追加します:
USER_POOL_ID: ステップ2で作成したCognito ユーザープールID(例:us-east-1_xxxxxxxxx)APP_CLIENT_ID: ステップ2で作成したCognito クライアント ID
- 「保存」をクリックします
2-2-4. 必要なライブラリの追加(Lambda Layer)
Lambda Authorizerではpyjwtとrequestsライブラリが必要です。Lambda Layerを作成してデプロイします。
Lambda環境でのライブラリインストール
Lambda Layerのライブラリは、Lambda実行環境(Amazon Linux 2)と同じ環境でビルドする必要があります。ローカル環境がMacやWindowsの場合、互換性の問題が発生する可能性があります。
方法1: Dockerを使用(推奨)
Lambda環境と同じAmazon Linux 2環境でパッケージをビルドします:
# 作業ディレクトリを作成
mkdir lambda-layer
cd lambda-layer
# Dockerを使用してLambda環境でパッケージをインストール
# 注意: zshシェルの場合、角括弧をクォートで囲む必要があります
docker run --rm -v $(pwd):/var/task \
--entrypoint /bin/sh \
public.ecr.aws/lambda/python:3.9 \
-c "pip install 'pyjwt[crypto]' requests cryptography -t /var/task/python/"
# zipファイルを作成
zip -r lambda-layer.zip python/方法2: ローカル環境で直接インストール
Lambdaで指定したランタイムと同じ環境の場合、直接インストールできます:
# 作業ディレクトリを作成
mkdir lambda-layer
cd lambda-layer
mkdir python
# 必要なライブラリをインストール
# 注意: zshシェルの場合、角括弧をクォートで囲む必要があります
pip install 'pyjwt[crypto]' requests cryptography --platform manylinux2014_x86_64 --only-binary=:all: -t python/
# zipファイルを作成
zip -r lambda-layer.zip python/Lambda Layerのアップロードと作成:
- Lambdaコンソールで「レイヤー」を選択します
- 「レイヤーの作成」をクリックします
- 名前に「websocket-auth-dependencies」と入力します
- 作成した
lambda-layer.zipをアップロードします - 互換性のあるアーキテクチャで「x86_64」を選択します
- 互換性のあるランタイムで「Python 3.9」を選択します
- 「作成」をクリックします
ファイルサイズの確認
Lambda Layerの最大サイズは50MBです(圧縮後)。cryptographyライブラリは大きいため、zipファイルが約20-30MBになることがあります。これは正常です。
Lambda関数にLayerを追加:
- 「websocket-authorizer」関数を開きます
- 「コード」タブの「レイヤー」セクションで「レイヤーの追加」をクリックします
- 「カスタムレイヤー」を選択します
- 作成した「websocket-auth-dependencies」レイヤーを選択します
- バージョンを選択します(最新版)
- 「追加」をクリックします
Layerが正しく追加されたことを確認:
Lambda関数の「設定」→「コード」タブの「レイヤー」セクションで、「websocket-auth-dependencies」が表示されていることを確認します。
2-2-5. WebSocket APIにAuthorizerを統合
- API Gatewayで「mobile-app-websocket」を開きます
- 左メニューから「オーソライザー」を選択します
- 「オーソライザーを作成」をクリックします
- オーソライザー名に「CognitoAuthorizer」と入力します
- Lmbda関数で「us-east-1」、「websocket-authorizer」を選択します
- IDソースタイプで「クエリ文字列」を選択します
- キーに「token」と入力します
- 「オーソライザーを作成」をクリックします
2-2-6. WebSocket APIのルート設定
- API Gatewayで「mobile-app-websocket」を開きます
- 左メニューから「ルート」を選択し、「ルートを作成」をクリックします
- ルートキーに「$connect」と入力します
- 統合タイプで「Lambda関数」を選択します
- 「Lambdaプロキシ統合」のトグルスイッチをONにします
- Lambda関数で「websocket-handler」Lambda関数を設定します
- 「$disconnect」と「$default」ルートも2.~6.の手順で同様に設定します
2-2-7. $connectルートにAuthorizerを適用
- 左メニューから「ルート」を選択します
- 「$connect」ルートをクリックします
- 「ルートリクエスト」タブの「編集」を選択します
- 認可で作成した「CognitoAuthorizer」を選択します
- 「変更を保存」をクリックします
- 「APIをデプロイ」でprodステージにデプロイします
- 表示される「WebSocket URL」をコピーします(wss://xxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod形式)
【解説】WebSocket認証の流れ
- クライアントがWebSocket接続時にCognito IDトークンをクエリパラメータとして送信
- API GatewayがLambda Authorizerを呼び出し
- Lambda Authorizerがトークンを検証:
- Cognito JWKSから公開鍵を取得
- トークンの署名を検証
- 有効期限を確認
- トークンタイプ(id token)を確認
- 認証成功時、ユーザー情報(principalId、userId等)をcontextに設定
- WebSocket Lambda関数が
event['requestContext']['authorizer']からユーザー情報を取得 - ユーザーIDをDynamoDBに保存
セキュリティのポイント:
- トークンの署名検証により、改ざんを防止
- 有効期限チェックにより、古いトークンの利用を防止
- ユーザーIDとconnection_idを紐付けて、なりすましを防止
2-2-8. WebSocket Lambda関数の認証情報取得の確認
「websocket-handler」関数のhandle_connect関数で、Lambda Authorizerから渡された認証情報を正しく取得していることを確認します。
確認する場所:handle_connect関数の先頭部分
def handle_connect(connection_id, event):
"""WebSocket接続処理"""
try:
# ===== ここで認証情報を取得 =====
# Lambda Authorizerで設定したcontextからユーザー情報を取得
authorizer = event.get('requestContext', {}).get('authorizer', {})
user_id = authorizer.get('principalId', 'anonymous')
# =================================
# 接続情報を保存
connection_data = {
'connection_id': connection_id,
'user_id': user_id, # ← Authorizerから取得したユーザーIDを保存
'connected_at': datetime.now().isoformat(),
'ttl': int(datetime.now().timestamp()) + 86400
}
connections_table.put_item(Item=connection_data)
# ... 以下省略 ...重要なポイント:
- Lambda Authorizerのcontextから取得:
event['requestContext']['authorizer']には、Lambda Authorizerで設定したcontextの内容が入っています - principalIdの取得: Lambda Authorizerで設定した
principalId(ユーザーID)を取得します - DynamoDBへの保存: 取得したユーザーIDをconnection_idと一緒にDynamoDBに保存します
このコードにより、Lambda Authorizerで認証されたユーザー情報が自動的に取得され、WebSocket接続とユーザーが紐付けられます。
3. 認証付きWebSocket接続とリアルタイム通信の動作確認
構築したWebSocket APIが認証機能と共に正しく動作することを確認します。
3-1. テストツールのインストール
WebSocket接続テストに必要なツールをインストールします:
# wscatをグローバルインストール
npm install -g wscat3-2. 認証付きWebSocket接続テスト
IDトークンを使用してWebSocket接続をテストします:
WebSocket URLとトークンを環境変数に設定:
export CLIENT_ID="xxxxxxxxxxxxxxxx"
export WS_URL="wss://xxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod"
# IDトークン
export ID_TOKEN=$(aws cognito-idp initiate-auth \
--client-id ${CLIENT_ID} \
--auth-flow USER_PASSWORD_AUTH \
--auth-parameters USERNAME=testuser@example.com,PASSWORD=Test1234! \
--region us-east-1 \
--output json | jq -r '.AuthenticationResult.IdToken')【解説】WebSocketとHTTPの違い
- HTTP: リクエスト→レスポンスの1往復で接続が切れる
- WebSocket: 一度接続すると、接続を維持したまま双方向通信が可能
WebSocketは、チャットアプリやリアルタイム位置共有など、サーバーからクライアントへの能動的な通知が必要な場合に適しています。
IDトークンの有効期限
Cognito IDトークンのデフォルト有効期限は1時間です。テスト中にトークンが期限切れになった場合は、再度取得してください。トークンが無効な場合、WebSocket接続時に401 Unauthorizedエラーが返されます。
認証付きでWebSocket接続:
# トークンをクエリパラメータとして渡す
wscat -c "${WS_URL}?token=${ID_TOKEN}"期待される出力(接続成功):
Connected (press CTRL+C to quit)
>認証失敗時の出力:
error: Unexpected server response: 401認証が失敗する主な原因
- トークンが無効: IDトークンが正しくない、または期限切れ
- User Pool IDの不一致: Lambda Authorizerの環境変数が間違っている
- App Client IDの不一致: Lambda Authorizerの環境変数が間違っている
- Authorizerの設定ミス: API Gatewayで$connectルートにAuthorizerが設定されていない
3-3. 認証付きメッセージ送信テスト
WebSocket接続が成功したら、メッセージを送信してテストします:
メッセージ送信:
{"action": "send_message", "data": {"message": "Hello from authenticated user!"}}期待される応答:
{
"type": "message",
"sender_id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"message": "Hello from authenticated user!",
"timestamp": "2025-11-14T15:56:02.183047"
}sender_idには、Lambda Authorizerで認証されたユーザーIDが表示されます(anonymousではない)。
3-4. 複数クライアントでのリアルタイムテスト(認証付き)
2つのターミナルウィンドウで同時に認証付きWebSocket接続を行い、リアルタイム通信をテストします:
ターミナル1(ユーザーA):
# ユーザーAとしてログイン
export ID_TOKEN_A=$(aws cognito-idp initiate-auth \
--client-id ${CLIENT_ID} \
--auth-flow USER_PASSWORD_AUTH \
--auth-parameters USERNAME=testuser@example.com,PASSWORD=Test1234! \
--region us-east-1 \
--output json | jq -r '.AuthenticationResult.IdToken')
wscat -c "${WS_URL}?token=${ID_TOKEN_A}"ターミナル2(ユーザーB): 別のターミナルを開き、ユーザーBとしてログインする
export CLIENT_ID="xxxxxxxxxxxxxxxx"
export WS_URL="wss://xxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod"
export USER_POOL_ID =" us-east-1_xxxxxxxxx "
# User_B作成
aws cognito-idp sign-up \
--client-id ${CLIENT_ID} \
--username testuser2@example.com \
--password Test1234! \
--user-attributes Name=email,Value=testuser2@example.com \
--region us-east-1
aws cognito-idp admin-confirm-sign-up \
--user-pool-id ${USER_POOL_ID} \
--username testuser2@example.com \
--region us-east-1
# token取得
# ユーザーBとしてログイン
export ID_TOKEN_B=$(aws cognito-idp initiate-auth \
--client-id ${CLIENT_ID} \
--auth-flow USER_PASSWORD_AUTH \
--auth-parameters USERNAME=testuser2@example.com,PASSWORD=Test1234! \
--region us-east-1 \
--output json | jq -r '.AuthenticationResult.IdToken')
wscat -c "${WS_URL}?token=${ID_TOKEN_B}"接続後、ターミナル1(ユーザーA)からメッセージを送信:
{"action": "send_message", "data": {"message": "Hello from User A!"}}ターミナル2(ユーザーB)の画面に、ユーザーAからのメッセージがリアルタイムで表示されることを確認します:
{
"type": "message",
"sender_id": "ユーザーAのID",
"message": "Hello from User A!",
"timestamp": "2025-11-14T16:30:00.000000"
}【解説】リアルタイムブロードキャスト
WebSocket接続しているすべてのクライアントに、メッセージがリアルタイムで配信されます。この仕組みにより:
- チャット機能: ユーザー間のリアルタイムメッセージング
- 位置情報共有: 他のユーザーの移動をリアルタイムで表示
- 通知配信: システムからの即時通知
が実現できます。
3-5. DynamoDB Connectionsテーブルの確認
WebSocket接続情報がDynamoDBに保存されていることを確認します:
- DynamoDBコンソールを開きます
- 「websocket-connections」テーブルを選択します
- 左メニューの「項目を探索」をクリックします
- 現在接続中のコネクションIDが表示されることを確認します
確認すべき項目:
connection_id: WebSocketコネクションの一意のIDuser_id: 認証されたユーザーのID(Lambda Authorizerから取得)connected_at: 接続日時ttl: Time To Live(自動削除用、24時間後に自動削除)
期待される内容の例:
{
"connection_id": "abc123def456",
"user_id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"connected_at": "2025-11-14T16:30:00.000000",
"ttl": 1731692400
}【解説】WebSocketコネクション管理
WebSocketの接続情報をDynamoDBに保存することで、以下が可能になります:
- 特定ユーザーへのメッセージ送信:
user_idからconnection_idを検索して送信 - 全ユーザーへのブロードキャスト: すべての
connection_idを取得して一斉送信 - アクティブユーザー数の把握: 接続中のレコード数をカウント
- 古い接続の自動削除: TTL機能で24時間後に自動削除
- ユーザーの複数デバイス対応: 同じ
user_idで複数のconnection_idを管理可能
認証されたユーザーIDの確認
user_idフィールドに実際のCognito User ID(UUID形式)が保存されていることを確認してください。anonymousと表示されている場合は、Lambda Authorizerが正しく動作していない可能性があります。
3-6. CloudWatch Logsでの実行ログ確認
WebSocket Lambda関数のログを確認します:
WebSocket Handlerのログ:
- CloudWatchコンソールを開きます
/aws/lambda/websocket-handlerを選択します- 最新のログストリームを確認します
接続時のログ例:
START RequestId: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
[INFO] WebSocketイベント: {"headers": …
[INFO] WebSocket接続成功: …
END RequestId: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxメッセージ送信時のログ例:
START RequestId: yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy
[INFO] WebSocketイベント:{"requestContext": …
[INFO] メッセージブロードキャスト完了: 2件送信
END RequestId: yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy3-7. トラブルシューティング
エラー: “error: Unexpected server response: 401”
原因:WebSocket接続時の認証に失敗している
対処法:
- IDトークンの有効期限を確認: トークンは1時間で期限切れになります。新しいトークンを取得してください
- トークンの形式を確認: クエリパラメータに正しく
token=として渡されているか確認 - Lambda Authorizerの環境変数を確認:
USER_POOL_IDとAPP_CLIENT_IDが正しく設定されているか確認 - Lambda Layerの確認:
pyjwtとrequestsライブラリが正しくインストールされているか確認(上記のImportModuleErrorを参照) - CloudWatch Logsを確認:
/aws/lambda/websocket-authorizerのログでエラー内容を確認
# CloudWatch Logsを確認
aws logs tail /aws/lambda/websocket-authorizer --followエラー: “Failed to upgrade to websocket”
原因:WebSocket URLが間違っている、またはAPI Gatewayの設定が不完全
対処法:
- WebSocket URLが
wss://で始まることを確認 - API Gatewayでルート($connect、$disconnect、$default)が正しく設定されているか確認
- Lambda統合が正しく設定されているか確認
- API Gatewayが正しくデプロイされているか確認(prodステージ)
エラー: 接続はできるがメッセージが届かない
原因:Lambda関数のメッセージ送信ロジックにエラーがある
対処法:
- CloudWatch LogsでLambda関数のエラーを確認
- DynamoDB connectionsテーブルに接続情報が保存されているか確認
- API Gateway Management APIを使用したメッセージ送信権限を確認
エラー: “GoneException: Connection no longer available”
原因:接続が既に切断されているconnection_idに対してメッセージを送信しようとしている
対処法:
- Lambda関数でGoneExceptionをキャッチして処理
- DynamoDBから該当のconnection_idを削除
- 接続の有効性を定期的にチェック
try:
api_gateway_management_api.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps(message)
)
except api_gateway_management_api.exceptions.GoneException:
# 接続が切れている場合、DynamoDBから削除
dynamodb.delete_item(
TableName='websocket-connections',
Key={'connection_id': connection_id}
)エラー: “Rate exceeded” 大量の接続時
原因:API GatewayまたはLambdaの同時実行制限に達している
対処法:
- API Gatewayのスロットリング設定を確認・調整
- Lambda関数の同時実行数の上限を引き上げ
- DynamoDBのキャパシティを増やす(オンデマンドまたはプロビジョニング)
【解説】WebSocketのスケーラビリティ
大規模なリアルタイムアプリケーションでは、以下の考慮が必要です:
- 接続数の制限: API Gatewayは1アカウントあたり10,000接続まで(上限緩和申請可能)
- Lambda同時実行数: デフォルトで1,000(リージョンごと、上限緩和申請可能)
- DynamoDBスループット: オンデマンドモードは自動スケーリング、プロビジョニングモードは事前計画が必要
- コスト最適化: 接続維持時間、メッセージ送信回数、DynamoDB読み書き量を監視
このステップで何をしたのか
このステップでは、モバイルアプリケーション向けのリアルタイム通信システムを構築しました。具体的には、WebSocket APIでリアルタイム双方向通信を実現し、Lambda Authorizerを使用したセキュアな認証機能を実装しました。また、コネクション管理テーブルで接続状態を管理し、接続中のすべてのクライアントにメッセージをブロードキャストできる効率的な配信システムを完成させました。
モバイルアプリでどのような影響があるのか
この構成により、モバイルアプリは企業レベルのリアルタイムコミュニケーション機能を提供できるようになります。ユーザー同士のリアルタイムチャット、位置情報の即座な共有、チェックイン通知の瞬時配信が可能になります。WebSocketによる常時接続により、従来のHTTPポーリングと比較してバッテリー消費を大幅に削減し、ユーザー体験を向上させます。これは、LINEやSlackのような「プロフェッショナルなコミュニケーションツール」に相当するリアルタイム機能を実現します。
技術比較まとめ表
| 技術領域 | AWS | オンプレミス |
|---|---|---|
| リアルタイム通信 | WebSocket API + Lambda マネージド接続管理、自動スケーリング | Socket.IO + Node.js 手動接続管理、サーバー管理 |
| 認証・セキュリティ | Lambda Authorizer + Cognito JWT検証、自動認証処理 | カスタム認証ミドルウェア 手動トークン検証 |
学習において重要な技術的違い
1. リアルタイム通信の管理
- AWS:WebSocket APIによる自動コネクション管理とスケーリング
- オンプレミス:Socket.IOサーバーの手動管理とロードバランシング
2. 運用・監視
- AWS:CloudWatchによる統合監視とアラート
- オンプレミス:各コンポーネントの個別監視システム構築
実践チェック:画面キャプチャで証明しよう
下記のチェック項目について、実際にAWSマネジメントコンソールで設定ができていることを確認し、各項目ごとに該当画面のスクリーンショットを撮影して提出してください。
-
WebSocket API「mobile-app-websocket」が作成されていること
-
WebSocket用Lambda関数「websocket-handler」が作成されていること
-
WebSocket認証用Lambda関数「websocket-authorizer」が作成されていること
-
Lambda Layer「websocket-auth-dependencies」が作成され、websocket-authorizer関数に追加されていること
-
WebSocket APIにAuthorizer「CognitoAuthorizer」が設定されていること
-
$connectルートにAuthorizerが適用されていること
-
DynamoDBテーブル「websocket-connections」が作成されていること
-
WebSocket APIのルート設定とLambda統合が完了していること
提出方法: 各項目ごとにスクリーンショットを撮影し、まとめて提出してください。 ファイル名やコメントで「どの項目か」が分かるようにしてください。
構成図による理解度チェック
ステップ3の構成図に、このステップで作成したリアルタイム通信層を追記して、最終的なインフラ構成を完成させましょう。
なぜ構成図を更新するのか?
リアルタイム通信層の追加により、モバイルアプリのアーキテクチャが完全なエンタープライズレベルのシステムとして完成します。WebSocketによる双方向通信、Lambda Authorizerによるセキュアな認証、DynamoDBによる接続管理の相互関係を視覚的に理解することが重要です。また、従来のHTTP通信との違いや、スケーラビリティの仕組みを明確にできます。
- リアルタイム通信フロー: WebSocket接続の確立から切断までの完全なライフサイクル
- 認証フロー: Lambda Authorizerによるトークン検証とユーザー情報の取得
- メッセージ配信: 接続中のクライアントへのブロードキャスト処理
構成図の書き方
ステップ3で作成した構成図をベースに、以下のリソースを追記してみましょう。
- WebSocket API: REST APIと並列に配置し、リアルタイム通信経路を表現
- Lambda Authorizer: WebSocket接続時の認証フローを表現
- websocket-connections テーブル: DynamoDBでのコネクション管理を表現
- メッセージフロー: Lambda関数から全接続へのブロードキャスト処理を図示
💡 ヒント: リアルタイム通信(WebSocket)と従来の通信(HTTP)を色分けして表現すると、システムの特徴が理解しやすくなります
理解度チェック:なぜ?を考えてみよう
AWSの各リソースや設計には、必ず”理由”や”目的”があります。 下記の「なぜ?」という問いに自分なりの言葉で答えてみましょう。 仕組みや設計意図を自分で説明できることが、真の理解につながります。 ぜひ、単なる暗記ではなく「なぜそうなっているのか?」を意識して考えてみてください。
Q. なぜモバイルアプリでWebSocket通信を使用するのか?従来のHTTPポーリングと比較してどのようなメリットがあるか?
Q. なぜWebSocket接続時に認証が必要なのか?Lambda Authorizerを使用する理由は何か?
Q. なぜトークンの署名検証にJWKS(JSON Web Key Set)を使用するのか?トークンをそのまま信頼することの問題点は何か?
Q. WebSocketのコネクション管理をDynamoDBで行う理由は何か?メモリ内で管理する場合と比較してどのようなメリットがあるか?
今回のステップで利用したAWSサービス名一覧
- Amazon API Gateway:WebSocket APIによるリアルタイム双方向通信を提供するサービス
- AWS Lambda:WebSocket接続管理、メッセージ処理、認証処理を行うサーバーレスコンピューティング
- Amazon DynamoDB:WebSocket接続情報を管理するデータベース
- Amazon Cognito:ユーザー認証とJWTトークン発行を行うサービス
- Amazon CloudWatch:リアルタイム通信の監視・ログ分析を行うサービス