SCCで検知した問題のあるFWルールを自動削除する予防的ガードレールをCloud Run Functionsで実装してみた!

こんにちは! 人事・技術・経営推進本部 インフラストラクチャーDiv リードインフラエンジニアの入田です!
普段の業務では社内のインフラの運用をオンプレ・クラウド問わず行っています。
2024年10月に入社し、気づいたら年を越していたので時間の速さにびっくりしています。試用期間も無事終了しひと安心です。

本記事ではGoogle CloudのSCC(Security Command Center)でセキュリティ脅威として検知した問題のあるFWルールを自動削除する予防的ガードレールを、
最近名前が変わったCloud Run Functions(旧 Cloud Functions)で実装してみたので、そちらを解説していきたいと思います!

Security Command Centerとは

Google Cloud環境のセキュリティを総合的に管理するためのプラットフォームです。
自動でセキュリティリスクや脆弱性を検出し、アラートを発信することで、クラウド環境のセキュリティ強化を支援します。
Googleのセキュリティノウハウを活かした高度な機能により、組織のセキュリティ体制を強化できます。

背景

弊社では、運用しているGoogle Cloudのプロジェクトに対して、組織単位で配下のプロジェクトに対してSCCを有効化し、脅威を検知するように設定しています。
ただ、今までは通知をSlackに飛ばすだけで、検知した脅威に対しての対処は手動で行っていました。
今回は特定のFWルールに対して、脅威として検知したものは削除するような予防的ガードレールをCloud Functionsで実装しました。

構成図

処理フロー

上記の構成図の通り、SCCで検知した脅威の情報をPub/SubでCloud Functionsに送信し、各プロジェクトのFWルールを削除します。
また、削除結果はSlackに通知します。Cloud Run Functionsの仕様上、複数回実行される場合があるため、FWルールの存在チェックを処理に入れ込み、ルールがすでに処理され存在しない場合には処理を終了させるようにしています。

  1. SCCがファイアウォールルール違反を検出
  2. 違反情報がPub/Subトピックに発行される
  3. Cloud Functionsが通知を受信
  4. 関数が違反情報を解析
  5. FWルールの存在を確認し、存在しなければ処理を終了
  6. ルールが存在すれば削除を実行
  7. 処理結果をログに記録
  8. 処理結果をSlackへ通知

実装

Pub/Sub

Cloud Functionsを実行するプロジェクトで、SCC検知のエクスポート先となるPub/Subトピックを作成します。

  1. Pub/Subの画面で「トピックを作成」をクリックします
  2. 「トピックを作成」をクリックします
  3. トピックIDを入力し、「作成」をクリックします

SCC

SCCを有効化している組織にコンソールよりアクセスし、SCCの継続エクスポートを設定します。
今回はRDPのポートをすべてのIPから許可するFWルールの検知を作成します。

  1. 組織のコンソールへログインし、セキュリティ→「リスクの概要」をクリックします
  2. 右上の「設定」をクリックします
  3. 「PUB/SUBのエクスポートを作成」をクリックします
  4. エクスポート名、エクスポートの説明を入力し、エクスポート先として作成したPub/Subトピックを選択します
  5. 検出クエリにクエリを入力し、「保存」をクリックします

例:RDPのポートをすべてのIPから許可するFWルールを検知

state="ACTIVE"
AND NOT mute="MUTED"
AND category="OPEN_RDP_PORT"

Cloud Run Functions

Pub/SubをトリガーとしたCloud Run Functionsを実装します。
サービスアカウントは事前に作成しておきます
権限は組織単位で「Compute セキュリティ管理者」を付与してあげます。

IAM権限

処理としては、SCCからPub/Subで送られてきた情報より、プロジェクト名とFWルール名を抜き出し、
対象のプロジェクトに対してFWルール削除処理を行うというシンプルなものとなっております。
また、削除対象となったプロジェクト・FWルール名は社内のSlackに通知するようにしています。
Slack通知のメッセージのテンプレート用にJinja2を利用しています。

コードは以下のとおりです。

  • main.py
import base64
import json
import os
import urllib.request

from cloudevents.http import CloudEvent
import functions_framework
from google.cloud import compute_v1
from google.api_core import exceptions
import jinja2
from jinja2.environment import Template

# Compute Engineクライアントの初期化
compute_client = compute_v1.FirewallsClient()

# SlackのWeb Hook URLを環境変数から取得
web_hook_urls: list = os.environ["WEB_HOOK_URLS"].split(",")


@functions_framework.cloud_event
def handle_scc_violation(cloud_event: CloudEvent) -> None:
    """Pub/Subメッセージを処理し、違反ファイアウォールルールを削除する"""
    # メッセージのデコード
    pubsub_message = base64.b64decode(cloud_event.data['message']['data']).decode('utf-8')
    message_data = json.loads(pubsub_message)

    # 違反情報の抽出
    finding = message_data.get('finding', {})
    resource = message_data.get('resource', {})
    
    category = finding.get('category','')
    
    project_id = resource.get('projectDisplayName')
    firewall_rule_name = resource.get('displayName')

    # FWルールの存在確認
    if not check_firewall_rule(project_id,firewall_rule_name):
        print(f"Firewall rule {firewall_rule_name} does not exist in google cloud project {project_id}. Skipping.")
        return

    # ファイアウォールルールの削除
    try:
        delete_firewall_rule(project_id,firewall_rule_name)
        print(f"Successfully deleted firewall rule: {firewall_rule_name}")
    except Exception as e:
        print(f"Error deleting firewall rule {firewall_rule_name}: {str(e)}")
        raise  # Cloud Functionsの再試行メカニズムをトリガー

    # Slack通知の作成・送信

    post_data = make_slack_notification(project_id,firewall_rule_name,category)
    send_slack_notification(post_data)

def check_firewall_rule(project_id: str, rule_name: str) -> bool:
    """
    指定されたファイアウォールルールが存在するかどうかを確認します。
    
    :param project_id: Google Cloudプロジェクトのプロジェクトiid
    :param rule_name: 確認するファイアウォールルールの名前
    :return: ルールが存在する場合はTrue、存在しない場合はFalse
    """
    client = compute_v1.FirewallsClient()
    
    try:
        client.get(project=project_id, firewall=rule_name)
        return True
    except exceptions.NotFound:
        return False

def delete_firewall_rule(project_id: str, rule_name: str) -> None:
    """ファイアウォールルールを削除する"""
    operation = compute_client.delete(project=project_id, firewall=rule_name)
    operation.result()  # 操作の完了を待つ

def make_slack_notification(project_id: str, firewall_rule_name: str, category: str) -> str:
    """Slack通知を作成する"""
    loader: jinja2.FileSystemLoader =\
        jinja2.FileSystemLoader(
            "/".join(__file__.split("/")[:-1]),
            encoding='utf-8'
        )
    environment: jinja2.Environment = jinja2.Environment(loader=loader)
    template: Template = environment.get_template("payload_template.j2")
    post_data: str = template.render(
        project_display_name=project_id,
        display_name=firewall_rule_name,
        category=category
    )
    return post_data

def send_slack_notification(post_data: str) -> None:
    """作成した通知をSlackに送信する"""
    headers : dict = {
        "Content-Type": "application/json; charset=UTF-8"
    }

    for web_hook_url in web_hook_urls:
        req: urllib.request.Request = \
            urllib.request.Request(
                web_hook_url,
                data=post_data.encode("utf-8"),
                method="POST",
                headers=headers
            )
        try:
            urllib.request.urlopen(req, timeout=50)
        except Exception as e:
            print(e)
  • requirements.txt
functions-framework==3.*
cloudevents==1.*
google-cloud-compute
jinja2
  • payload_template.j2
{
    "blocks": [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "SCCで検知したファイアウォールルールを削除しました",
                "emoji": true
            }
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*projectDisplayName:*\n{{project_display_name}}"
                },
                {
                    "type": "mrkdwn",
                    "text": "*displayName:*\n{{display_name}}"
                },
                {
                    "type": "mrkdwn",
                    "text": "*category:*\n{{category}}"
                }
            ]
        }
    ]
}
  1. コンソールより「Cloud Run関数」を選択します
  2. 「関数を作成」をクリックします
  3. 各設定値を入力し、「次へ」を入力します
    ※環境変数には以下のものを入力します
    WEB_HOOK_URLS:<SlackのWebHookURL>
  4. コードを入力し、「デプロイ」をクリックします

実際に動かしてみた

これでガードレールが実装できたので、実際にRDPポートへ任意のIPから許可するFWルールをプロジェクト内に作成してみます!

  1. 組織配下の任意のプロジェクトに入り、「VPCネットワーク」→「ファイアウォール」を選択します。
  2. 送信元IPv4範囲を「0.0.0.0/0」、プロトコルとポートを、TCP,UCPそれぞれ3389を入力します。 3.「作成」をクリックします。

一覧に作成されました!

ここで数十秒経った後、更新ボタンを押してみると…
みごと、一覧から削除されていますね。(静止画なのであまり説得力はないですが)

通知も無事Slackに飛んできました。

Slackの通知

まとめ

今回は、SCCで検知した脅威となりうるFWルールを、自動削除する予防的ガードレールを実装しました!
他にもSCCで検知できるものは色々あるので、今後も工夫してガードレールを実装していき、
弊社内のGoogle Cloud環境の秩序を保ちたいと思います!