CloudWatchアラームが3時間続いたら、Teamsユーザーに自動発信して読み上げ通知する(ACS × AWSサーバレス実装ガイド)

※本記事は、私のアイディアを元にChatGPT(GPT-5モデル)を用いて書き上げています。

目的:Criticalなシステムアラートが3時間以上継続した場合、組織内のMicrosoft Teamsユーザーへ自動で通話し、TTS(音声合成)で状況を読み上げて知らせるしくみを、Azure Communication Services(ACS)Call AutomationAWS(EventBridge/Lambda/API Gateway) の組み合わせで構築します。
※PSTN(外線)発信は行わないため Teams Phone(電話システム)ライセンスは不要 です。


なぜACS(Call Automation)を使うのか

Teamsの自動通話には大きく二つの選択肢があります。

  • Graph Calls API(Calling Bot):柔軟ですが、常時稼働のメディア処理(ボットプロセス)が必要で、Lambda単体では完結しづらい。
  • ACS(Azure Communication Services)Call Automationサーバレス寄りで実装が軽いコールバックURL(Webhook)に対してイベントが飛び、Play(TTS/音声)→ハングアップのような単純フローはLambda+API Gatewayで作りやすい。

今回はACS案で進めます。社内のTeamsユーザーに対する内線的な通話であればPhoneライセンスは不要です(PSTNへ出なければOK)。


全体アーキテクチャ

┌───────────┐            ┌──────────────────┐
│ CloudWatch │  ALARM    │ EventBridge       │
│ Alarm      ├──────────▶│ (Scheduler)       │ ①ALARM検知→+3時間の一回実行を予約
└─────┬──────┘            └───────┬──────────┘
      │                              │ (+3h)
      │ CloudWatch.Check              ▼
      │                        ┌─────────────┐      create_call
      └───────────────────────▶│ Lambda(A)   │─────────────────────┐
                               │ 発信トリガ │                        │
                 API Gateway   └──────┬──────┘                        │
                  (任意/即時発信)     │                               ▼
                         ▲            │callbackUrl               ┌───────────────┐
                         │            ▼                          │ Azure Comm.   │
                     POST /call  ┌─────────────┐  events(POST)   │ Services(ACS) │
                                 │ API Gateway │◀───────────────▶│  Call Auto.   │
                                 │  /events    │                  └─────┬─────────┘
                                 └──────┬──────┘                        │
                                        │                               │
                                        ▼                               │
                                  ┌─────────────┐                       │
                                  │ Lambda(B)   │ ②CallConnected→TTS   │
                                  │ イベント処理│   Play→Hangup        │
                                  └──────┬──────┘                       │
                                         └──────────────────────────────┘
  • Lambda(A)
    • 「3時間後チェック」または「即時発信(API)」で呼ばれる。
    • 対象アラームがまだALARMなら ACSへ発信(callback URLは /events)。
  • Lambda(B)
    • ACSのコールバックイベント(CallConnectedなど)を受信し、TTS再生→切断まで行う。
  • EventBridge Scheduler
    • ALARM発報時に**+3時間後のワンショット実行**を予約する。

事前準備(Azure 側)

  1. ACSリソース作成
    • Azure Portalで「Communication Services」を作成。
    • 接続文字列エンドポイントを取得。
  2. Teams連携(Interop)を有効化
    • ACSから組織内のTeamsユーザーに通話するには、テナント側でACSとTeamsの相互運用を有効化する必要があります。
    • 手順は公式ドキュメントに沿って実施してください(管理者作業)。ポイントは**対象ユーザーのMicrosoft Entra ID(旧AAD)のObject ID(OID)**で特定する、という点です。
  3. 対象ユーザーのOIDを控える
    • 通話先の社内ユーザー(受信者)のEntra OIDを取得しておきます。

補足:SDKの型としては MicrosoftTeamsUserIdentifier を使い、引数にOIDを渡す実装が一般的です(SDKのバージョンにより細部は異なります)。


事前準備(AWS 側)

  • Lambda 実行ロール
    • Lambda(A):CloudWatch DescribeAlarms を実行できる権限、(必要なら)EventBridge Schedulerの CreateSchedule 権限。
    • Lambda(B):不要(ACSへの呼び出しのみ)。
  • シークレット管理
    • ACS_CONNECTION_STRINGAWS Secrets ManagerSSM パラメータストアで安全に管理。
  • API Gateway
    • POST /call:外部(監視系など)から即時発信したい時の入口。
    • POST /eventsACSコールバック受信エンドポイント(公開URL)。
  • EventBridge Scheduler
    • ALARM検知時に**+3時間後**にLambda(A)を呼ぶワンショットスケジュールを作成。
  • ネットワーク
    • /eventsインターネット公開が必要(ACSがPOSTするため)。
    • 内部向け保護として専用の秘密パスAPIキーWAFなどを検討。

実装(Lambda/A:発信トリガ)

役割:

  • action=check_and_call の場合は「アラームがまだALARMか」を確認し、継続なら発信。
  • action=call の場合は無条件で発信。
  • 発信先はTeamsユーザーOID
  • callback_urlhttps://<api-gw>/events

必要パッケージ

pip install azure-communication-callautomation azure-identity boto3

環境変数(例)

  • ACS_CONNECTION_STRING:ACS接続文字列
  • CALLBACK_URL:API Gatewayの /events のフルURL
  • SOURCE_DISPLAY_NAME:(任意)発信元表示名

コード例(Python 3.11)

※SDKのバージョンによりメソッド名や型名が一部異なる場合があります。エラーになった場合は該当バージョンのAPIリファレンスに合わせて読み替えてください(MicrosoftTeamsUserIdentifier の代わりに create_identifier_from_raw_id("8:orgid:<OID>") を使う等)。

# lambda_call.py
import json, os, boto3
from azure.communication.callautomation import (
    CallAutomationClient,
    CallInvite
)

# Optional: 一部の環境でTeamsユーザー識別子の生成が異なることがあります
try:
    from azure.communication.identity import MicrosoftTeamsUserIdentifier
    def build_teams_identifier(oid: str):
        return MicrosoftTeamsUserIdentifier(user_id=oid)
except Exception:
    from azure.communication.callautomation._shared.utils import create_identifier_from_raw_id
    def build_teams_identifier(oid: str):
        # Raw ID形式(8:orgid:<OID>)から識別子を作るfallback
        return create_identifier_from_raw_id(f"8:orgid:{oid}")

def is_alarm_active(alarm_name: str) -> bool:
    cw = boto3.client("cloudwatch")
    resp = cw.describe_alarms(AlarmNames=[alarm_name])
    if not resp.get("MetricAlarms"):
        return False
    return resp["MetricAlarms"][0]["StateValue"] == "ALARM"

def lambda_handler(event, context):
    """
    受信例:
    {
      "action": "check_and_call",   # or "call"
      "alarmName": "HighErrorRate",
      "teamsUserOid": "<EntraOID>",
      "tts": "Critical alert has been active for 3 hours."
    }
    """
    action = event.get("action", "check_and_call")
    teams_oid = event["teamsUserOid"]
    tts_text = event.get("tts", "Alert has been active for 3 hours.")

    if action == "check_and_call":
        alarm_name = event["alarmName"]
        if not is_alarm_active(alarm_name):
            return {"ok": True, "skipped": True, "reason": "Alarm cleared"}

    callback_url = os.environ["CALLBACK_URL"]
    conn_str = os.environ["ACS_CONNECTION_STRING"]

    client = CallAutomationClient.from_connection_string(conn_str)

    target = build_teams_identifier(teams_oid)
    invite = CallInvite(target_participant=target, source_display_name=os.getenv("SOURCE_DISPLAY_NAME"))

    # /events にコールバック(CallConnected等)がPOSTされる
    result = client.create_call(call_invite=invite, callback_url=f"{callback_url}")

    # callConnectionIdやserverCallIdは後続ハンドラで利用可能
    return {
        "ok": True,
        "callConnectionId": getattr(result, "call_connection_id", None),
        "serverCallId": getattr(result, "server_call_id", None),
        "note": "Call initiated. ACS will post events to /events."
    }

実装(Lambda/B:ACSイベントを受け取り、TTS→切断)

役割:/events に届くACSのコールバックを処理して、CallConnected をトリガに TTS再生 → ハングアップ
(録音やDTMF、リトライなど高度化する場合は、Play完了イベントを待って分岐させるなど拡張)

# lambda_events.py
import json, os
from azure.communication.callautomation import (
    CallAutomationClient
)

# SDKのPlay系APIはバージョンで少し名前が違うことがあります
# 代表的なTextSource / FileSourceのどちらかを使います
try:
    from azure.communication.callautomation import TextSource, FileSource
    HAS_TEXT_SOURCE = True
except Exception:
    HAS_TEXT_SOURCE = False
    # 代替: 音声ファイルURLの再生(FileSource)を推奨

def lambda_handler(event, context):
    # API Gateway(HTTP API/REST)の統合イベントからボディを取り出す
    body = event.get("body")
    if isinstance(body, str):
        payload = json.loads(body)
    else:
        payload = body or {}

    conn_str = os.environ["ACS_CONNECTION_STRING"]
    client = CallAutomationClient.from_connection_string(conn_str)

    # 単発 or バッチの両対応
    events = payload if isinstance(payload, list) else [payload]

    for e in events:
        etype = e.get("type") or e.get("eventType")
        data = e.get("data", {})

        # 代表例: CallConnected イベントで音声再生
        if etype in ("CallConnected", "Microsoft.Communication.CallConnected"):
            call_connection_id = data.get("callConnectionId") or data.get("callconnectionid")
            if not call_connection_id:
                continue

            call_conn = client.get_call_connection(call_connection_id)

            # 1) まず読み上げ(TTS)
            #    SDKによっては get_call_media().play_to_all(...) のような呼び出しになることがあります。
            #    エラー時はSDKのPlay/Mediaの最新APIに読み替えてください。
            tts_text = os.getenv("DEFAULT_TTS", "Critical alert has been active for 3 hours.")
            try:
                if HAS_TEXT_SOURCE:
                    call_conn.play_to_all(TextSource(tts_text))
                else:
                    # 代替: 事前生成した音声ファイルURLを再生
                    audio_url = os.getenv("ALERT_AUDIO_URL")  # 例: S3/Blobの公開URL
                    if audio_url:
                        call_conn.play_to_all(FileSource(audio_url))
            except Exception as ex:
                # 再生失敗時も、最低限のハングアップは試みる
                print(f"Play failed: {ex}")

            # 2) 要件が「読み上げ→すぐ切断」なら即ハングアップ
            try:
                call_conn.hang_up(is_for_everyone=True)
            except Exception as ex:
                print(f"Hangup failed: {ex}")

        # ここに PlayCompleted / HangupCompleted 等の追加ハンドリングを書けます

    return {"statusCode": 200, "body": json.dumps({"ok": True})}

備考

  • TTSが録音に含まれない環境や音質要件がある場合は、TTS→音声ファイル化→URL(Blob/S3)→FileSourceで再生が安定します。
  • ACSのイベントは重複する可能性があるため、serverCallId等で冪等制御を行うと安全です。

EventBridge Scheduler で「+3時間後に一回実行」を作る(例)

ALARM発報イベントを受けたハンドラから、3時間後にLambda(A)を呼び出すワンショットのスケジュールを作成します(CDK/TerraformでIaC化を推奨)。

# lambda_on_alarm_event.py (ALARM→スケジュール作成)
import json, os, boto3
from datetime import datetime, timedelta, timezone

def lambda_handler(event, context):
    # ここでは CloudWatch Alarm State Change イベントを受けた前提
    alarm_name = event["detail"]["alarmName"]
    teams_oid  = os.environ["DEFAULT_TEAMS_OID"]  # 環境変数 or ルックアップ

    # 3時間後(Asia/Tokyo前提なら意識してISO時間を作る。ここではUTCで表現)
    run_at = datetime.now(timezone.utc) + timedelta(hours=3)

    scheduler = boto3.client("scheduler")
    role_arn  = os.environ["SCHEDULER_ROLE_ARN"]  # SchedulerがLambdaを呼ぶためのロール
    target_arn = os.environ["TARGET_LAMBDA_ARN"]  # Lambda(A)のARN

    name = f"call-after-3h-{alarm_name}-{int(run_at.timestamp())}"
    input_json = json.dumps({
        "action": "check_and_call",
        "alarmName": alarm_name,
        "teamsUserOid": teams_oid,
        "tts": f"[{alarm_name}] Critical alert has been active for 3 hours."
    })

    scheduler.create_schedule(
        Name=name,
        ScheduleExpression=f"at({run_at.strftime('%Y-%m-%dT%H:%M:%SZ')})",
        FlexibleTimeWindow={"Mode": "OFF"},
        Target={
            "Arn": target_arn,
            "RoleArn": role_arn,
            "Input": input_json
        }
    )

    return {"ok": True, "schedule": name}

注意

  • タイムゾーンはUTCで扱うのが簡単です。読み上げメッセージに**日本時間(Asia/Tokyo)**を含めたい場合は、Lambda側で変換して埋め込んでください。
  • 同名スケジュール禁止・キャンセル運用・再通知(別ユーザーへ順次転送)などは運用設計に応じて拡張します。

API設計例(API Gateway)

  • POST /call … 即時発信
    • Body例: { "action": "call", "teamsUserOid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", "tts": "[HighErrorRate] Critical has been active for 3 hours." }
  • POST /eventsACSコールバック受信(公開URL)

/callIAM認可やAPIキーで保護、/events推測困難なパスWAF等で防御するのがおすすめです。


運用のポイント/エッジケース

  • 誰も出ない/拒否された
    • コール接続イベントやタイムアウトを契機に、次の宛先へ自動転送(エスカレーション)を実装可能。宛先ユーザー配列とリトライ回数をパラメータ化しましょう。
  • 就業時間外の扱い
    • EventBridge Schedulerの代わりにStep Functionsでウィンドウ判定→平日日中のみ発信とするなどの運用も可能。
  • 重複発信の防止
    • 同一serverCallIdやアラーム名+期間で冪等キーを設計し、DynamoDB等で状態を記録すると堅牢です。
  • 多言語TTS
    • TextSourceの音声・言語指定(SDKのオプション)を活用。日本語と英語の切り替えや、アナウンス文面のテンプレート化を。
  • セキュリティ
    • ACSコールバックは想定外のPOSTに注意。ヘッダや署名、特定のフィールド検証で簡易バリデーションを実施。
    • 秘密情報(接続文字列、APIキー)はSecrets Manager / SSMで管理。
  • コスト
    • AWSはLambda・API GW・EventBridgeの従量(小規模なら微小)。AzureはACSの通話・メディア操作分の従量。施工後に実運用のメトリクスを可視化し、しきい値やリトライ回数を調整しましょう。

よくある質問(FAQ)

Q. Teams Phone(電話システム)ライセンスは必要?
A. 不要です。今回はPSTNに発信しない前提で、Teamsユーザー(社内)への内線的なVoIP通話のみ行います。

Q. 読み上げの音質や録音への反映が気になります。
A. SDKのTextSourceで十分な場合もありますが、TTSを事前に音声ファイル化→FileSourceで再生すると安定します。録音や証跡用途では特に有効です。

Q. Call Automationのメソッド名がサンプルと違う。
A. SDKのマイナーバージョンで命名が変わることがあります。エラー時は最新のAPIリファレンスに合わせて、
call_conn.play_to_all(...)call_conn.get_call_media().play_to_all(...) のように読み替えてください。

Q. 3時間の判定は他の方法でも実装できますか?
A. はい。EventBridge Schedulerの代わりにStep FunctionsSQS遅延DynamoDB TTL+定期バッチ等でも可能です。最小構成としてはSchedulerのワンショットが手早く堅実です。


仕上げ:段階的な導入手順(チェックリスト)

  1. Azure
    • ACS作成、接続文字列取得
    • Teams連携(Interop)有効化
    • 対象ユーザーのEntra OIDを控える
  2. AWS
    • Lambda(A/B)をデプロイ、環境変数・シークレット設定
    • API Gatewayで /call/events を公開
    • CloudWatch AlarmのState Changeイベントをトリガに、+3時間のSchedulerを作るLambdaを配置(または手動でSchedulerを作成)
  3. 動作確認
    • /call に対してテスト用のJSONをPOSTして即時発信を確認
    • 発信後、CallConnected→TTS→切断のイベント流れをCloudWatch Logsで確認
    • 本番アラームと同条件でリハーサル(テスト用メトリクスや閾値で再現)
  4. 運用強化
    • エスカレーション(多段着信)
    • 就業時間帯制御
    • ログ/監査/可観測性(CloudWatch Logs Insights、メトリクス、ダッシュボード)
    • フェールセーフ(コール失敗時はSMS/メール/チャットにフォールバック等)

まとめ

  • 要件:アラートが3時間継続したら自動でTeamsユーザーに電話し、TTSで読み上げたい。
  • 解法ACS Call Automation × AWSサーバレス(EventBridge Scheduler + Lambda + API Gateway)。
  • メリット:Phoneライセンス不要/サーバレスで保守が軽い/拡張性が高い。
  • ポイント:Teams連携の有効化、コールバックURLの公開、冪等・セキュリティ・エスカレーションの設計。

コメント

タイトルとURLをコピーしました