こんにちは、Assured のオリバーです。
最近、Google Platform の Workflows を業務に導入し、非同期で動作していたプロセスやバッチをパイプライン化して自動化することで、管理コストと運用コストを削減することができました。この記事では、私たちが取り入れた構成例や、参考になりそうなポイントをいくつか紹介します。これらの情報が、これから Workflows を試してみたい方や、既に使用を開始している方にとってお役に立てれば嬉しいです。
すでに Workflows をご利用の方は、「Workflowsとは」のセクションを飛ばして、「Assured のユースケース」からご覧ください。
Assured の事例は以下の二つについて話をします。
Workflowsとは
Workflows は Google Cloud のサービスで、複数の Google Cloud サービスや HTTP ベースの API サービス間でのタスク自動化と連携を可能にするフルマネージド型サービスです。
特徴
- Google Platform 外部の HTTP ベースの API と Google Platform サービスとの組み合わせが可能
- サーバレス構成で、必要に応じて自動的にスケールアップ
- ステップ間のサービス依存関係をエンドツーエンドで可視化し、モニタリングが可能
- 実行履歴はコンソールまたは REST API から確認できる
- 年間実行可能時間に対する制限あり(2024年3月現在)
- 使用した分だけの課金で、月額の無料枠が充実
料金
Workflows は、実行されるステップごとに課金されます。ステップには 内部ステップ と 外部ステップ があり、それぞれに無料枠とその超過分の料金が設定されています。
- 内部ステップ: Google Platform 内部の API やサービスの呼び出し、ワークフロー内での条件分岐や関数の実行など
- 外部ステップ: Google Platform 外部への API 送信やカスタムドメインへのリクエストなど
無料枠 (月) | 有料部分 (月額) | |
---|---|---|
内部 | 5000 ステップ | 1,000 ステップごとに $0.01 |
外部 | 2000 ステップ | 1,000 ステップごとに $0.025 |
※料金の詳細は 料金の概要ページ をご確認ください。
主な機能
Workflows は、以下の5つの制御文で構成されており、条件分岐や計算などに使用できる 標準ライブラリ を利用できます。
- ステップ: ワークフローでは、ステップとその実行順序を定義します。最低1ステップが必要で、ステップは順番に実行されます。
- 条件:
switch
ブロックを使用して、実行フローを制御することができます。条件に一致する場合、該当するステートメントが実行されます。 - 繰り返し:
for
ループを使用して、リストやマップの値を反復処理し、実行することができます。 - 並列処理:
parallel
ステップを使うと、ステップやワークフローの一部を並行して実行することができます。 - サブワークフロー: 一つまたは複数のステップをカプセル化して、再利用可能な単位として定義できます。
さらに、組み込み変数を使用したり、引数を受け取ることで、より柔軟にモニタリングやステップの定義が行えます。
続いて私たちのチームが直面した課題とその解決策に活用したWorkflowsの機能を事例を通じて紹介します。サンプルは YAML を使用して Workflows の定義方法について解説していきます。
Assured のユースケース
Assured では、複数のサブシステムを Cloud Run で実行し、それぞれの結果を集めて非同期に BigQuery で統計や分析を行っています。統計には、いくつかのデータ取得、解析のJobの実行後に計算を走らせる必要があり、Workflowsの活用前は各Jobを以下のようにそれぞれ独立して走らせていました。
flowchart TD subgraph Z["データ取得"] 非同期プロセスxN end subgraph ZA["データ解析A"] end subgraph ZB["データ解析B"] end subgraph ZC["データ解析C"] end subgraph ZZ["全体の解析結果を保存"] end Z -.定期実行(本当に終わった?).-> ZA Z -.定期実行(本当に終わった?).-> ZB Z -.定期実行(本当に終わった?).-> ZC ZA -.他は終わった?.-> ZZ ZB -.他は終わった?.-> ZZ ZC -.他は終わった?.-> ZZ
しかし、この方法では、以下のような問題が起こります。
- データ解析処理の完了を保証できない
- 憶測では一定時間の待ちに無駄が生じる
- 一部のJobが失敗した時に全体から問題を特定/把握しづらい
そこで、Workflowsで全体をパイプライン化することで、全てのキューイング処理の完了、非同期で動いていた解析の開始と終了後に無駄な待ち時間などの上記の問題を解決しました。
flowchart TD subgraph Z["データ取得"] 非同期プロセスxN end subgraph ZA["取得完了後、処理を並列に同時実行"] データ解析A データ解析B データ解析C end subgraph ZZ["全体の解析結果を保存"] end Z --データ取得完了--> ZA ZA --全部の解析終了--> ZZ
実装上は、以下の2つがのポイントとなります。
それぞれを、Google Cloud Platform のサンプルコードを用いて今回のケースに合わせてご紹介します。
1. データ解析のパイプライン化
Workflows を利用すると、非同期のプロセスを並列化しつつ各ステップを連携つしてパイプラインとして実行できます。
以下は実際のタスクを模したサンプルで、SQLの結果をBigQueryから取得し保存するものです。
- 並列化した三つの解析処理: parallel: shared: [tables] branches: - 解析A: steps: - 解析AND統計処理CallA: for: value: table in: ${tables} steps: - analyticsCallA: call: analyticsCall args: table: ${table} - 解析B: steps: - 解析AND統計処理CallB: for: value: table in: ${tables} steps: - analyticsCallB: call: analyticsCall args: table: ${table} - 解析C: steps: - 解析AND統計処理CallC: for: value: table in: ${tables} steps: - analyticsCallC: call: analyticsCall args: table: ${table}
上記のように定義したWorkflowsをタスクの可視化機能を利用すると、以下のようになります。
可視化したステップはプロセスの流れを直感的に理解しやすくなり、プロジェクトチーム内での共有やデバッグ作業が容易になります。
複数のプロセスを連携させてパイプライン化することで、各全体処理の実行単位ごとに記録を残すことができるようになります。これにより、入力データやログを明確に設定することで、従来は非同期処理で難しかったデバッグ作業がパイプラインの実行単位で関連づけて調査が可能になりました。
そのため、エラーが発生した場合の実行記録も、どのステップ(ステート)で処理が停止したのかが一覧で確認できるようになり、問題解析がスムーズになりそうです。
2. 長時間のキューイングプロセスの自動化
Workflows では、 CloudTasks API の呼び出しを利用して、キューの状態を監視し、全てのタスクが完了したかどうかを検知できます。また、特定のステップへの条件付きジャンプのようなステップを活用することで、全体の処理が完了するまで最後の処理を待機させることができます。
以下のようにステップを組み合わせて実現しています。
main: steps: - init: assign: - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} - time_start: ${sys.now()} - time_limit_seconds: 36000.0 - wait_time_seconds: 600 - is_scan_time_out: switch: - condition: ${(sys.now() - time_start) > time_limit_seconds} return: "Error: Timeout" - check_queue: call: googleapis.cloudtasks.v2beta3.projects.locations.queues.get args: name: "projects/PROJECT_ID/locations/LOCATION/queues/QUEUE_ID" readMask: stats.tasksCount result: queueRes - is_tasks_done: switch: - condition: ${map.get(queueRes, ["stats", "tasksCount"]) == null} next: create_back_up next: wait_and_recheck - wait_and_recheck: call: sys.sleep args: seconds: ${wait_time_seconds} next: is_scan_time_out - create_back_up: call: googleapis.cloudbuild.v1.projects.locations.triggers.run args: name: "projects/PROJECT/triggers/TRIGGER_ID" body:
これを可視化すると、以下のような少し複雑なステップになります。create_back_up が全体のタスク完了時に行いたいタスクのトリガーです。
- is_scan_time_out: 全体の待ち時間が超過しているかを判断する(超過した場合はエラー)
- check_queue: CloudTasks API を呼び出してキューの残りを取得する
- is_tasks_done: キューの残りを確認して、全体が完了したかを判断する
- wait_and_recheck: キューの処理を待つ
check_queue
が実際にキューとして使っている Cloud Tasks の API で呼び出し レスポンス の残りタスク数をして、 is_tasks_done
と wait_and_recheck
の処理により、check_queue
ステップをループしポーリングのような形で キュー内のすべて処理されたことを確認できます。また、is_scan_time_out
のステップを挟むことで、パイプラインが想定以上に長期化してないかを実行ステータスとして履歴に残せます。
上記のサンプルで使用した Workflowsの機能は以下のドキュメントを参考にしてみて下さい。
- 残タスクキューの確認で CloudTasks API を呼び出した Caller ステップ
- 条件によって別アクションステップへの条件付きジャンプステップ
- ポーリングを使用した待機
Assured では上記の二つの例を組み合わせて、いくつかのパプラインを構成して、複雑な条件分岐や状態の監視を Workflows で自動化することができました。
最後に
以上で、Workflows を活用したデータ処理のパイプライン化と並列処理、そして長時間のキューイングプロセスの自動化についてお話ししました。導入してみて現状あった課題を解決し、運用面でも効率化ができたのではないかと思います。このブログが、皆さんの Workflows への利用の起点として、今活用されている方々への良いワークフローを構築するきっかけになれば嬉しい限りです。利用時は Workflows のリソースの上限を確認しながら試してみて下さい。
少しでもAssuredの事業に興味をお持ちいただいた方は、ぜひ気軽にお声掛けください。以下のリンクからカジュアル面談への応募をお待ちしています。