概要

真面目な記事ばかりだと面白くないのでPrefectを使ってチャット一個に対してDockerコンテナ一個起動するという極めて無駄が多いTelegramチャットボットシステムを作ります。

Warning

この記事を読む方は何か起きてもソースコードを読んで自力で解決できるくらいの方を想定しています!

方針

  • Githubのモノレポ構成です。
  • DockerコンテナイメージレジストリにGithub Packages Container Registryを使用します。(Docker Registryではないので注意)
  • CI/CDにGithub Actionsを使用しDockerコンテナのビルドとPrefectのデプロイを自動化します。ActionsRunnerにはGithub hosted Runnerを使用します。
  • Prefectワークフローを使ってチャットに応答します。Telegramチャットをシリアライズし、Prefectワークロードに処理を引き渡します。

開発フロー

GithubActionsを使用し、Prefectワークフローの開発インターフェースをGithubに統一します。

sequenceDiagram
    participant Developer
    participant Github
    participant ActionsRunner
    participant GithubContainerRegistry
    participant PrefectCloud
    Developer->>Github:Push Code Changes
    Github->>ActionsRunner:Detect Code Changes
    alt Github CI pipeline
        ActionsRunner->>GithubContainerRegistry:DepolyDocker
        ActionsRunner->>PrefectCloud:Depoly Prefect
    end

運用フロー

TelegramからのチャットはPrefectCloudを経由してPrefectWorkerに引き渡されます。

sequenceDiagram
    participant User
    participant Telegram
    participant PrefectCloud
    participant PrefectWorker
    participant GithubContainerRegistry
    User->>Telegram: Send Chat
    Telegram->>PrefectCloud: WebHook Event
    PrefectCloud->>PrefectWorker: Run Deployment (Automation)
    GithubContainerRegistry->>PrefectWorker: Pull Latest Code
    PrefectWorker->>Telegram: Send Reply
    Telegram->>User: Send Reply

通信フロー

sequenceDiagram
    participant telegram client
    participant telegram server
    participant prefect cloud
    participant prefect worker
    telegram client->>telegram server: MTProto
    telegram server->>prefect cloud: HTTPS
    prefect cloud->>prefect worker: WebSocketSecure
    prefect worker->>telegram server: HTTPS
    telegram server->>telegram client: MTProto

環境

  • Telegram: 無料版アカウント 1個
  • PrefectCloud: prefect.cloud 無料版アカウント 1個
  • Github: github.com 無料版アカウント privateリポジトリ 1個
  • 開発マシン: Linux系 amd64ビット系 PC 1台
  • PrefectWorker: 無料のVPS 2台
    • OracleCloudInfrastructure Compute #1: VM.Standard.E2.1.Micro
      • CPU: 1コア(OCPU), メモリ: 1GB, OS: AlmaLinux9
      • Dockerインストール済
    • OracleCloudInfrastructure Compute #2: VM.Standard.E2.1.Micro
      • CPU: 1コア(OCPU), メモリ: 1GB, OS: AlmaLinux9
      • Dockerインストール済

環境準備

Telegram

  1. https://telegram.me/BotFather からBOTを作成し、Telegram BOT API Keyを取得する。

PrefectCloud

  1. https://app.prefect.cloud/auth/sign-up からアカウント登録する。
  2. defaultワークスペース→API KeysからPrefect Cloud API Tokenを取得する。

    Warning

    セキュリティの観点でPrefect APIキーは開発マシン、PrefectWorker#1,#2の分として3つ作って使い分けること。

開発マシン

  1. 環境変数をセット。
     export PREFECT_API_KEY="Prefect Cloud API Token"
    
  2. https://docs.astral.sh/uv/#getting-started の通りuvをインストール。
     curl -LsSf https://astral.sh/uv/install.sh | sh
     echo 'source $HOME/.local/bin/env' >> ~/.bashrc
     . $HOME/.bashrc
    
  3. uvで仮想環境prefectsを作成
     mkdir -p $HOME/prefects
     uv init --native-tls $HOME/prefects
     cd $HOME/prefects
     uv --native-tls python install 3.11
     uv --native-tls python pin 3.11
    
  4. Prefect Cloud API Tokenを使用しprefect cloudにログイン。
     echo "prefect" > requirements.txt
     echo "prefect-docker" >> requirements.txt
     uv add --native-tls -r requirements.txt
     uv run --native-tls prefect cloud login -k $PREFECT_API_KEY
     # $HOME/.prefect/profiles.toml が作成されます
    
  5. prefect work-poolを作成。
     uv run --native-tls prefect work-pool create oci-pool --set-as-default -t docker --overwrite
    
  6. 実際に https://prefect.cloud/ からWork-Poolが作成されていることを確認。 work-pool.png

    Warning

    ここで、CloudManagedのWork-Poolを無効化することを推奨します。

  7. Prefect Event Webhookを作成。
        
     uv run prefect cloud webhook create telegram_getupdate --description "Receives webhooks from telegram bot" --template '{ "event": "telegram", "resource": { "prefect.resource.id": "{{ body.update_id}}" }, "message": {{ body|tojson }} }'
    
  8. 実際に https://prefect.cloud/ からEvent Webhookが作成されていることを確認。 Event WebhookのURL https://api.prefect.cloud/hooks/<urlslug> をメモ。 event-webhook.png

    Warning

    無料版の場合クライアントの認証はできませんので、URLは秘匿してください。

PrefectWorker(#1,#2 共通)

  1. https://docs.astral.sh/uv/#getting-started の通りuvをインストール。
     curl -LsSf https://astral.sh/uv/install.sh | sh
     echo 'source $HOME/.local/bin/env' >> ~/.bashrc
     . $HOME/.bashrc
    
  2. uvで仮想環境prefectsを作成。
     mkdir -p $HOME/prefects
     uv init --native-tls $HOME/prefects
     cd $HOME/prefects
     uv python --native-tls install 3.11
     uv python --native-tls pin 3.11
    
  3. Prefect Cloud API Tokenを使用しprefect cloudにログイン。
     echo "httpx" > requirements.txt
     echo "prefect" > requirements.txt
     echo "prefect-docker" >> requirements.txt
     uv add --native-tls -r requirements.txt
     uv run --native-tls prefect cloud login -k $PREFECT_API_KEY
     # $HOME/.prefect/profiles.toml が作成されます
    
  4. prefect workerを起動
     nohup uv run --native-tls prefect worker start --pool "oci-pool" --name oci01 >~/prefect.log 2>&1 &
     # nohup uv run --native-tls prefect worker start --pool "oci-pool" --name oci02 >~/prefect.log 2>&1 & #二号機は名前をoci02にしよう
    

    Warning

    prefect workerを動かすユーザはDockerグループに所属するなど、dockerコマンドが使える状態にしてください。最初は1号機のみで試験動作をした方が、問題があった際にみるべきprefect.logを特定するのに困りません。

  5. 実際に https://prefect.cloud/ からWorkerが登録されていることを確認 worker.png

以降、PrefectWorker側は一切触りません!

Github

  1. prefectsという名前でGithubプライベートリポジトリを作成する。(名前は何でも良い)
  2. 自身のアカウントのGithub Personal Access Tokenを取得しておく。権限は以下。
    • read:packages: Dockerコンテナレジストリの利用に必要。

    Warning

    今回Dockerコンテナ内にソースコードもパッケージングするので、暗にread:repo権限も含む点にご注意ください。発行するGithub Personal Access Tokenの種類はTokens (classic)を選択してください。本記事執筆時点で、Fine-grained TokensはGithub Packagesに対応していません。

  3. prefectsリポジトリ→Settings→Actions→General→Workflow permisiionsから、GithubワークフローがGITHUB_TOKEN環境変数を使ってGithub Container Registryにアクセス可能にします。 action_priv.png
  4. prefectsリポジトリ→Settings→Secrets and Variables→Actions→Secretsから、PREFECT_API_KEYという名前で取得したPrefect Cloud API Tokenを登録します。GithubワークフローがPREFECT_API_KEY環境変数を使ってPrefectCloudにアクセスできるようにします。
  5. prefectsリポジトリ→Settings→Secrets and Variables→Actions→Secretsから、TG_BOT_TOKENという名前で取得したTelegram BOT API Keyを登録します。GithubワークフローがTG_BOT_TOKEN環境変数を使ってTelegramBOTを扱えるようにします。
  6. prefectsリポジトリ→Settings→Secrets and Variables→Actions→Secretsから、PREFECT_WEBHOOKという名前で取得したPrefect Event Webhookを登録します。GithubワークフローがPREFECT_WEBHOOK環境変数を使ってTelegramBOTを扱えるようにします。 github_secrets.png

設定

開発マシンからPrefectCloudとTelegramを設定

  1. 環境変数をセット。中身はちゃんと変えてね。
     export GITHUB_TOKEN="Github Personal Access Token"
     export TG_BOT_TOKEN="Telegram BOT API Key"
     export PREFECT_WEBHOOK="Prefect Event Webhook"
    
  2. Prefectプロジェクトを初期化
     cd $HOME/prefects
     uv run --native-tls prefect init --recipe git
    

    Warning

    自動作成されるprefect.yamlは使いません。

  3. 各種変数をPrefectCloudに登録します。
    1. 以下の内容のsetup.pyを作成します。
       cd $HOME/prefects
       vim setup.py
      
       #!/usr/bin/env python
       # encoding: utf-8
       import os
       import asyncio
      
       import httpx
       from prefect import flow, task
       from prefect.blocks.system import Secret
       from prefect_docker import DockerRegistryCredentials
      
       @task
       async def set_blocks() -> None:
           GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", None)
           TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN", None)
           PREFECT_WEBHOOK = os.getenv("PREFECT_WEBHOOK", None)
           async with asyncio.TaskGroup() as t:
               t.create_task(
                   DockerRegistryCredentials(
                       registry_url="ghcr.io", # Github Container Registry
                       username="syakesaba", # Github Username
                       password=GITHUB_TOKEN # Github Personal Access Token
                   ).save(name="ghcr-docker-registry-credential-block", overwrite=True)
               )
      
               t.create_task(
                   Secret(value=TG_BOT_TOKEN).save(
                       name="tg-bot-secret-block", overwrite=True
                   )
               )
      
               t.create_task(
                   Secret(value=PREFECT_WEBHOOK).save(
                       name="prefect-webhook-secret-block", overwrite=True
                   )
               )
      
       @task
       async def set_hooks() -> httpx.Response:
           """
           POST https://api.telegram.org/bot{my_bot_token}/setWebhook?url={url_to_send_updates_to}
           """
           TG_BOT_TOKEN = (await Secret.load("tg-bot-secret-block")).get()
           PREFECT_WEBHOOK = (await Secret.load("prefect-webhook-secret-block")).get()
           acli = httpx.AsyncClient()
           return acli.post(f"https://api.telegram.org/bot{TG_BOT_TOKEN}/setWebhook?url={PREFECT_WEBHOOK}")
      
       @flow
       async def setup() -> None:
           print(await set_blocks())
           print(await set_hooks())
      
       if __name__ == "__main__":
           asyncio.run(setup())
      
    2. 実行
       uv run --native-tls python setup.py
      

      実行することでPrefect Cloudに登録されたことが確認できます。 prefect_block.png

  4. $HOME/prefects/echo.pyというPrefectワークフローを作成します。
     vim $HOME/prefects/echo.py
    
     #!/usr/bin/env python
     # encoding: utf-8
     import asyncio
     from prefect import flow, task
     @task(log_prints=True)
     async def print_hello(name: str):
         print(f"Hello {name} from Prefect! 🤗")
     @task(log_prints=True)
     async def print_goodbye(name: str):
         print(f"Goodbye {name}!")
     @flow(log_prints=True)
     async def echo(name: str = "world", goodbye: bool = False):
         async with asyncio.TaskGroup() as tg:
             task1 = tg.create_task(print_hello(name=name))
             if goodbye:
                 task2 = tg.create_task(print_goodbye(name=name))
     if __name__ == "__main__":
         asyncio.run(echo(name="World", goodbye=True))
    
  5. $HOME/prefects/echo.pyを含めるDockerfileを作成します。
     vim $HOME/prefects/Dockerfile
    
     ARG _PYTHON_VERSION=3.11
     FROM ghcr.io/astral-sh/uv:python${_PYTHON_VERSION}-bookworm-slim
     COPY . /opt/prefect/
     WORKDIR /opt/prefect/
     ENV UV_SYSTEM_PYTHON=1
     ENV PYTHONUNBUFFERED=1
     ENV PATH="/root/.local/bin:$PATH"
     COPY requirements.txt .
     RUN --mount=type=cache,target=/root/.cache/uv \
     uv pip install -r requirements.txt
    
  6. $HOME/prefects/prefect.yamlを作成します。 ```yaml name: echo prefect-version: 3.1.15 build: null push: null pull: null deployments:
    • name: echo entrypoint: echo.py:echo work_pool: name: oci-pool work_queue_name: job_variables: image: “{{ $DOCKER_REGISTRY }}/{{ $DOCKER_IMAGE_NAME }}” registry_credentials: “{{ prefect.blocks.docker-registry-credentials.ghcr-docker-registry-credential-block }}” image_pull_policy: Always auto_remove: true pull:
      • prefect.deployments.steps.set_working_directory: directory: /opt/prefect/ ``` :::message
    • 他にjob_variablesでDocker-Poolに何が指定できるかはこちらを参照。
    • set_working_directoryでos.chdirしてます。 :::
  7. GithubActions用のディレクトリを作成します。
     mkdir -p $HOME/prefects/.github/workflows
    
  8. 以下のようなbuild_and_deploy.yamlを作成します。
     vim $HOME/prefects/.github/workflows/build_and_deploy.yaml
    
     name: Deploy Prefet Deployments
     on:
         push:
         branches:
             - main
     env:
         DOCKER_REGISTRY: ghcr.io
         DOCKER_IMAGE_NAME: ${{ github.repository }}
     jobs:
         main:
         runs-on: ubuntu-latest
         permissions:
             contents: read
             packages: write
         steps:
             -
             name: Check out source repository
             uses: actions/checkout@v4
             with:
                 fetch-depth: 1
             -
             name: Extract metadata (tags, labels) for Docker
             id: meta
             uses: docker/metadata-action@v5
             with:
                 images: ${{ env.DOCKER_REGISTRY }}/${{ env.DOCKER_IMAGE_NAME }}
                 tags: |
                 # set latest tag for default branch
                 type=raw,value=latest,enable=
             - 
             name: Login to GitHub Container Registry
             uses: docker/login-action@v3
             with:
                 registry: ghcr.io
                 username: ${{ github.actor }}
                 password: ${{ secrets.GITHUB_TOKEN }}
             -
             name: Set up Docker Buildx
             uses: docker/setup-buildx-action@v3
             -
             name: Build and push
             uses: docker/build-push-action@v6
             with:
                 context: .
                 push: ${{ github.event_name != 'pull_request' }}
                 tags: ${{ steps.meta.outputs.tags }}
                 labels: ${{ steps.meta.outputs.labels }}
                 cache-from: type=registry,ref=${{ env.DOCKER_REGISTRY }}/${{ env.DOCKER_IMAGE_NAME }}:buildcache
                 cache-to: type=registry,ref=${{ env.DOCKER_REGISTRY }}/${{ env.DOCKER_IMAGE_NAME }}:buildcache,mode=max
             -
             name: Install Prefect
             run: |
                 uv sync --native-tls
             -
             name: Login to Prefect Cloud
             env:
                 PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
             run: |
                 yes | uv run prefect cloud login -k "${PREFECT_API_KEY}"
             -
             name: Deploy Prefect
             run: |
                 yes n | uv run --native-tls prefect deploy --all
    
  9. ここまでのprefectsのコードを一旦github.comにPushします。
     echo "blocks.py" >> $HOME/prefects/.prefectgitignore
     cp $HOME/prefects/.prefectignore $HOME/prefects/.gitignore
     cp $HOME/prefects/.prefectignore $HOME/prefects/.dockerignore
     cd $HOME/prefects
     git init
     git remote add origin https://github.com/<username>/prefects.git
     git branch -M main
     git add .
     git commit -am "initial commit"
     git push -uf origin main
    

    :::message githubへのログインは適当にやってください。 :::

開発マシンからTelegramを設定

  1. PrefectCloudのWebhookのURL https://api.prefect.cloud/hooks/<urlslug>をTelegramに登録します。
    1. uvで仮想環境prefectsにhttpxを追加します。既に依存関係としてインストールされてますが念の為手動でインストールします。
       cd $HOME/prefects
       echo "httpx" >> requirements.txt
       uv add --native-tls -r requirements.txt
      
    2. 以下の内容のhooks.pyを作成します。
       cd $HOME/prefects
       vim hooks.py
      
       #!/usr/bin/env python
       # encoding: utf-8
       import httpx
       from prefect import flow, task
       from prefect.blocks.system import Secret
      
       @flow
       def add_hooks():
           secret_block = Secret.load("prefect-webhook-secret-block")
           webhook_uri = secret_block.get()
           return httpx.post("https://api.prefect.cloud/hooks/<urlslug>")
      
       if __name__ == "__main__":
           print(add_hooks())
      
    3. 実行
       uv run --native-tls python hooks.py
      

      実行することでPrefect Cloudに登録されたことが確認できます。 prefect_block.png

Prefect上にCI/CDでデプロイされたワークフローの動作確認

PrefectCloudにログイン後、DeploymentsからQuick Runをすれば実行できていることを確認できます。

echo_test_deploy.png

aiogramとTelegramのインテグレーション

aiogramは非同期処理を前提としたTelegram APIクライアントライブラリです。

  1. uvで仮想環境prefectsにaiogramを追加します。
     cd $HOME/prefects
     echo "aiogram" >> requirements.txt
     uv add --native-tls -r requirements.txt
    
  2. 公式サイトを参考に$HOME/prefects/recv.pyを作成します。
     vim $HOME/prefects/recv.py
    
        
    
  3. $HOME/prefects/recv.pyをバックグラウンドで実行し

補足: aiogramの主要な機能

  • メッセージのpin: message.pin()
  • メッセージの応答: message.reply(“pong”)
  • メッセージへのリアクション:
    • aiogram.types.reaction_type_emoji.ReactionTypeEmoji

まとめ

Kubernetesの名前空間でクローズしたCI/CDが可能になりました