2020年5月6日水曜日

GAE/Python3/Flask で Cloud Tasks を使う

非同期処理に使いたいので、調べてみました。

こちらを参考に
https://cloud.google.com/tasks/docs/creating-appengine-tasks?hl=ja

インストール
$ pip install google-cloud-tasks

まず、タスクを作ってキューに入れます。
--
@app.route('/task_send')
def task_send():
 from google.cloud import tasks_v2

 PROJECT = <プロジェクトID>
 LOCATION = <リージョン>
 QUEUE = "default"

 client = tasks_v2.CloudTasksClient()
 parent = client.queue_path(PROJECT, LOCATION, QUEUE)

 app_route = "/task_run"

 data = {
  'a': "abc",
 }
 payload = json.dumps(data)

 task = {
  'app_engine_http_request': {
   'http_method': 'POST',
   'relative_uri': app_route,
   'headers': {
    'Content-Type': "application/json"
   },
   'body': payload.encode(),
  }
 }

 response = client.create_task(parent, task)
--

説明
・LOCATION: GAEの実行場所(リージョン)です。
 "us-central"の場合、"us-central1" とします。(1を追記)
・app_route: タスクを実行する場所(PATH)です。
・data: データをJSON形式で送りたいので、一旦dictに入れて、jsonに変換します。
・headers: Content-Typeを jsonにします。
 ※デフォルトは、"application/octet-stream"

次に、タスクを受け取ります。
--
@app.route('/task_run', methods=["POST"])
def task_run():
 data = request.json
 ....
--
説明
・data: dictデータです。

あとは、お好きに。