Prefect Echo: PrefectによるTelegramチャットボットシステム
概要
真面目な記事ばかりだと面白くないのでPrefectを使ってチャット一個に対してDockerコンテナ一個起動するという極めて無駄が多いTelegramチャットボットシステムを作ります。
Warning
この記事を読む方は何か起きてもソースコードを読んで自力で解決できるくらいの方を想定しています!
方針
- Githubのモノレポ構成です。
- DockerコンテナイメージレジストリにGithub Packages Container Registryを使用します。(Docker Registryではないので注意)
- CI/CDにGithub Actionsを使用しDockerコンテナのビルドとPrefectのデプロイを自動化します。ActionsRunnerにはGithub hosted Runnerを使用します。
- Prefectワークフローを使ってチャットに応答します。Telegramチャットをシリアライズし、Prefectワークロードに処理を引き渡します。
開発フロー
GithubActionsを使用し、Prefectワークフローの開発インターフェースをGithubに統一します。
sequenceDiagram
participant Developer
participant Github
participant ActionsRunner
participant GithubContainerRegistry
participant PrefectCloud
Developer->>Github:Push Code Changes
Github->>ActionsRunner:Detect Code Changes
alt Github CI pipeline
ActionsRunner->>GithubContainerRegistry:DepolyDocker
ActionsRunner->>PrefectCloud:Depoly Prefect
end
運用フロー
TelegramからのチャットはPrefectCloudを経由してPrefectWorkerに引き渡されます。
sequenceDiagram
participant User
participant Telegram
participant PrefectCloud
participant PrefectWorker
participant GithubContainerRegistry
User->>Telegram: Send Chat
Telegram->>PrefectCloud: WebHook Event
PrefectCloud->>PrefectWorker: Run Deployment (Automation)
GithubContainerRegistry->>PrefectWorker: Pull Latest Code
PrefectWorker->>Telegram: Send Reply
Telegram->>User: Send Reply
通信フロー
sequenceDiagram
participant telegram client
participant telegram server
participant prefect cloud
participant prefect worker
telegram client->>telegram server: MTProto
telegram server->>prefect cloud: HTTPS
prefect cloud->>prefect worker: WebSocketSecure
prefect worker->>telegram server: HTTPS
telegram server->>telegram client: MTProto
環境
- Telegram: 無料版アカウント 1個
- PrefectCloud: prefect.cloud 無料版アカウント 1個
- Github: github.com 無料版アカウント privateリポジトリ 1個
- 開発マシン: Linux系 amd64ビット系 PC 1台
- PrefectWorker: 無料のVPS 2台
- OracleCloudInfrastructure Compute #1: VM.Standard.E2.1.Micro
- CPU: 1コア(OCPU), メモリ: 1GB, OS: AlmaLinux9
- Dockerインストール済
- OracleCloudInfrastructure Compute #2: VM.Standard.E2.1.Micro
- CPU: 1コア(OCPU), メモリ: 1GB, OS: AlmaLinux9
- Dockerインストール済
- OracleCloudInfrastructure Compute #1: VM.Standard.E2.1.Micro
環境準備
Telegram
- https://telegram.me/BotFather からBOTを作成し、
Telegram BOT API Key
を取得する。
PrefectCloud
- https://app.prefect.cloud/auth/sign-up からアカウント登録する。
- defaultワークスペース→API Keysから
Prefect Cloud API Token
を取得する。Warning
セキュリティの観点でPrefect APIキーは開発マシン、PrefectWorker#1,#2の分として3つ作って使い分けること。
開発マシン
- 環境変数をセット。
export PREFECT_API_KEY="Prefect Cloud API Token"
- https://docs.astral.sh/uv/#getting-started の通りuvをインストール。
curl -LsSf https://astral.sh/uv/install.sh | sh echo 'source $HOME/.local/bin/env' >> ~/.bashrc . $HOME/.bashrc
- uvで仮想環境
prefects
を作成mkdir -p $HOME/prefects uv init --native-tls $HOME/prefects cd $HOME/prefects uv --native-tls python install 3.11 uv --native-tls python pin 3.11
Prefect Cloud API Token
を使用しprefect cloudにログイン。echo "prefect" > requirements.txt echo "prefect-docker" >> requirements.txt uv add --native-tls -r requirements.txt uv run --native-tls prefect cloud login -k $PREFECT_API_KEY # $HOME/.prefect/profiles.toml が作成されます
- prefect work-poolを作成。
uv run --native-tls prefect work-pool create oci-pool --set-as-default -t docker --overwrite
- 実際に https://prefect.cloud/ からWork-Poolが作成されていることを確認。
Warning
ここで、CloudManagedのWork-Poolを無効化することを推奨します。
Prefect Event Webhook
を作成。uv run prefect cloud webhook create telegram_getupdate --description "Receives webhooks from telegram bot" --template '{ "event": "telegram", "resource": { "prefect.resource.id": "{{ body.update_id}}" }, "message": {{ body|tojson }} }'
- 実際に https://prefect.cloud/ からEvent Webhookが作成されていることを確認。
Event WebhookのURL
https://api.prefect.cloud/hooks/<urlslug>
をメモ。Warning
無料版の場合クライアントの認証はできませんので、URLは秘匿してください。
PrefectWorker(#1,#2 共通)
- https://docs.astral.sh/uv/#getting-started の通りuvをインストール。
curl -LsSf https://astral.sh/uv/install.sh | sh echo 'source $HOME/.local/bin/env' >> ~/.bashrc . $HOME/.bashrc
- uvで仮想環境
prefects
を作成。mkdir -p $HOME/prefects uv init --native-tls $HOME/prefects cd $HOME/prefects uv python --native-tls install 3.11 uv python --native-tls pin 3.11
Prefect Cloud API Token
を使用しprefect cloudにログイン。echo "httpx" > requirements.txt echo "prefect" > requirements.txt echo "prefect-docker" >> requirements.txt uv add --native-tls -r requirements.txt uv run --native-tls prefect cloud login -k $PREFECT_API_KEY # $HOME/.prefect/profiles.toml が作成されます
- prefect workerを起動
nohup uv run --native-tls prefect worker start --pool "oci-pool" --name oci01 >~/prefect.log 2>&1 & # nohup uv run --native-tls prefect worker start --pool "oci-pool" --name oci02 >~/prefect.log 2>&1 & #二号機は名前をoci02にしよう
Warning
prefect workerを動かすユーザはDockerグループに所属するなど、dockerコマンドが使える状態にしてください。最初は1号機のみで試験動作をした方が、問題があった際にみるべきprefect.logを特定するのに困りません。
- 実際に https://prefect.cloud/ からWorkerが登録されていることを確認
以降、PrefectWorker側は一切触りません!
Github
prefects
という名前でGithubプライベートリポジトリを作成する。(名前は何でも良い)- 自身のアカウントの
Github Personal Access Token
を取得しておく。権限は以下。read:packages
: Dockerコンテナレジストリの利用に必要。
Warning
今回Dockerコンテナ内にソースコードもパッケージングするので、暗にread:repo権限も含む点にご注意ください。発行するGithub Personal Access Tokenの種類はTokens (classic)を選択してください。本記事執筆時点で、Fine-grained TokensはGithub Packagesに対応していません。
prefects
リポジトリ→Settings→Actions→General→Workflow permisiionsから、GithubワークフローがGITHUB_TOKEN
環境変数を使ってGithub Container Registryにアクセス可能にします。prefects
リポジトリ→Settings→Secrets and Variables→Actions→Secretsから、PREFECT_API_KEY
という名前で取得したPrefect Cloud API Token
を登録します。GithubワークフローがPREFECT_API_KEY
環境変数を使ってPrefectCloudにアクセスできるようにします。prefects
リポジトリ→Settings→Secrets and Variables→Actions→Secretsから、TG_BOT_TOKEN
という名前で取得したTelegram BOT API Key
を登録します。GithubワークフローがTG_BOT_TOKEN
環境変数を使ってTelegramBOTを扱えるようにします。prefects
リポジトリ→Settings→Secrets and Variables→Actions→Secretsから、PREFECT_WEBHOOK
という名前で取得したPrefect Event Webhook
を登録します。GithubワークフローがPREFECT_WEBHOOK
環境変数を使ってTelegramBOTを扱えるようにします。
設定
開発マシンからPrefectCloudとTelegramを設定
- 環境変数をセット。中身はちゃんと変えてね。
export GITHUB_TOKEN="Github Personal Access Token" export TG_BOT_TOKEN="Telegram BOT API Key" export PREFECT_WEBHOOK="Prefect Event Webhook"
- Prefectプロジェクトを初期化
cd $HOME/prefects uv run --native-tls prefect init --recipe git
Warning
自動作成されるprefect.yamlは使いません。
- 各種変数をPrefectCloudに登録します。
- 以下の内容のsetup.pyを作成します。
cd $HOME/prefects vim setup.py
#!/usr/bin/env python # encoding: utf-8 import os import asyncio import httpx from prefect import flow, task from prefect.blocks.system import Secret from prefect_docker import DockerRegistryCredentials @task async def set_blocks() -> None: GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", None) TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN", None) PREFECT_WEBHOOK = os.getenv("PREFECT_WEBHOOK", None) async with asyncio.TaskGroup() as t: t.create_task( DockerRegistryCredentials( registry_url="ghcr.io", # Github Container Registry username="syakesaba", # Github Username password=GITHUB_TOKEN # Github Personal Access Token ).save(name="ghcr-docker-registry-credential-block", overwrite=True) ) t.create_task( Secret(value=TG_BOT_TOKEN).save( name="tg-bot-secret-block", overwrite=True ) ) t.create_task( Secret(value=PREFECT_WEBHOOK).save( name="prefect-webhook-secret-block", overwrite=True ) ) @task async def set_hooks() -> httpx.Response: """ POST https://api.telegram.org/bot{my_bot_token}/setWebhook?url={url_to_send_updates_to} """ TG_BOT_TOKEN = (await Secret.load("tg-bot-secret-block")).get() PREFECT_WEBHOOK = (await Secret.load("prefect-webhook-secret-block")).get() acli = httpx.AsyncClient() return acli.post(f"https://api.telegram.org/bot{TG_BOT_TOKEN}/setWebhook?url={PREFECT_WEBHOOK}") @flow async def setup() -> None: print(await set_blocks()) print(await set_hooks()) if __name__ == "__main__": asyncio.run(setup())
- 実行
uv run --native-tls python setup.py
実行することでPrefect Cloudに登録されたことが確認できます。
- 以下の内容のsetup.pyを作成します。
$HOME/prefects/echo.py
というPrefectワークフローを作成します。vim $HOME/prefects/echo.py
#!/usr/bin/env python # encoding: utf-8 import asyncio from prefect import flow, task @task(log_prints=True) async def print_hello(name: str): print(f"Hello {name} from Prefect! 🤗") @task(log_prints=True) async def print_goodbye(name: str): print(f"Goodbye {name}!") @flow(log_prints=True) async def echo(name: str = "world", goodbye: bool = False): async with asyncio.TaskGroup() as tg: task1 = tg.create_task(print_hello(name=name)) if goodbye: task2 = tg.create_task(print_goodbye(name=name)) if __name__ == "__main__": asyncio.run(echo(name="World", goodbye=True))
$HOME/prefects/echo.py
を含めるDockerfileを作成します。vim $HOME/prefects/Dockerfile
ARG _PYTHON_VERSION=3.11 FROM ghcr.io/astral-sh/uv:python${_PYTHON_VERSION}-bookworm-slim COPY . /opt/prefect/ WORKDIR /opt/prefect/ ENV UV_SYSTEM_PYTHON=1 ENV PYTHONUNBUFFERED=1 ENV PATH="/root/.local/bin:$PATH" COPY requirements.txt . RUN --mount=type=cache,target=/root/.cache/uv \ uv pip install -r requirements.txt
$HOME/prefects/prefect.yaml
を作成します。 ```yaml name: echo prefect-version: 3.1.15 build: null push: null pull: null deployments:- name: echo
entrypoint: echo.py:echo
work_pool:
name: oci-pool
work_queue_name:
job_variables:
image: “{{ $DOCKER_REGISTRY }}/{{ $DOCKER_IMAGE_NAME }}”
registry_credentials: “{{ prefect.blocks.docker-registry-credentials.ghcr-docker-registry-credential-block }}”
image_pull_policy: Always
auto_remove: true
pull:
- prefect.deployments.steps.set_working_directory: directory: /opt/prefect/ ``` :::message
- 他にjob_variablesでDocker-Poolに何が指定できるかはこちらを参照。
- set_working_directoryでos.chdirしてます。 :::
- name: echo
entrypoint: echo.py:echo
work_pool:
name: oci-pool
work_queue_name:
job_variables:
image: “{{ $DOCKER_REGISTRY }}/{{ $DOCKER_IMAGE_NAME }}”
registry_credentials: “{{ prefect.blocks.docker-registry-credentials.ghcr-docker-registry-credential-block }}”
image_pull_policy: Always
auto_remove: true
pull:
- GithubActions用のディレクトリを作成します。
mkdir -p $HOME/prefects/.github/workflows
- 以下のような
build_and_deploy.yaml
を作成します。vim $HOME/prefects/.github/workflows/build_and_deploy.yaml
name: Deploy Prefet Deployments on: push: branches: - main env: DOCKER_REGISTRY: ghcr.io DOCKER_IMAGE_NAME: ${{ github.repository }} jobs: main: runs-on: ubuntu-latest permissions: contents: read packages: write steps: - name: Check out source repository uses: actions/checkout@v4 with: fetch-depth: 1 - name: Extract metadata (tags, labels) for Docker id: meta uses: docker/metadata-action@v5 with: images: ${{ env.DOCKER_REGISTRY }}/${{ env.DOCKER_IMAGE_NAME }} tags: | # set latest tag for default branch type=raw,value=latest,enable= - name: Login to GitHub Container Registry uses: docker/login-action@v3 with: registry: ghcr.io username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - name: Build and push uses: docker/build-push-action@v6 with: context: . push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} cache-from: type=registry,ref=${{ env.DOCKER_REGISTRY }}/${{ env.DOCKER_IMAGE_NAME }}:buildcache cache-to: type=registry,ref=${{ env.DOCKER_REGISTRY }}/${{ env.DOCKER_IMAGE_NAME }}:buildcache,mode=max - name: Install Prefect run: | uv sync --native-tls - name: Login to Prefect Cloud env: PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }} run: | yes | uv run prefect cloud login -k "${PREFECT_API_KEY}" - name: Deploy Prefect run: | yes n | uv run --native-tls prefect deploy --all
- ここまでの
prefects
のコードを一旦github.comにPushします。echo "blocks.py" >> $HOME/prefects/.prefectgitignore cp $HOME/prefects/.prefectignore $HOME/prefects/.gitignore cp $HOME/prefects/.prefectignore $HOME/prefects/.dockerignore cd $HOME/prefects git init git remote add origin https://github.com/<username>/prefects.git git branch -M main git add . git commit -am "initial commit" git push -uf origin main
:::message githubへのログインは適当にやってください。 :::
開発マシンからTelegramを設定
- PrefectCloudのWebhookのURL
https://api.prefect.cloud/hooks/<urlslug>
をTelegramに登録します。- uvで仮想環境
prefects
にhttpxを追加します。既に依存関係としてインストールされてますが念の為手動でインストールします。cd $HOME/prefects echo "httpx" >> requirements.txt uv add --native-tls -r requirements.txt
- 以下の内容のhooks.pyを作成します。
cd $HOME/prefects vim hooks.py
#!/usr/bin/env python # encoding: utf-8 import httpx from prefect import flow, task from prefect.blocks.system import Secret @flow def add_hooks(): secret_block = Secret.load("prefect-webhook-secret-block") webhook_uri = secret_block.get() return httpx.post("https://api.prefect.cloud/hooks/<urlslug>") if __name__ == "__main__": print(add_hooks())
- 実行
uv run --native-tls python hooks.py
実行することでPrefect Cloudに登録されたことが確認できます。
- uvで仮想環境
Prefect上にCI/CDでデプロイされたワークフローの動作確認
PrefectCloudにログイン後、DeploymentsからQuick Runをすれば実行できていることを確認できます。
aiogramとTelegramのインテグレーション
aiogramは非同期処理を前提としたTelegram APIクライアントライブラリです。
- uvで仮想環境
prefects
にaiogramを追加します。cd $HOME/prefects echo "aiogram" >> requirements.txt uv add --native-tls -r requirements.txt
- 公式サイトを参考に
$HOME/prefects/recv.py
を作成します。vim $HOME/prefects/recv.py
$HOME/prefects/recv.py
をバックグラウンドで実行し
補足: aiogramの主要な機能
- メッセージのpin: message.pin()
- メッセージの応答: message.reply(“pong”)
- メッセージへのリアクション:
- aiogram.types.reaction_type_emoji.ReactionTypeEmoji
まとめ
Kubernetesの名前空間でクローズしたCI/CDが可能になりました