前回の記事でdjangoとredisを繋げることに成功したので今回はそのredisと、新しくceleryというモジュールを組み合わせて非同期処理を組み込んでいきたいと思います。

環境

Django: 2.2.4
Python: 3.6.9
Celery: 4.3.0

docker-compose.yml

僕は開発環境はdocker-composeで作っています。現在は Django、MySQL、Redisという3つのコンテナを動かしています。

version: '3.7' services: django: restart: always build: . volumes: - ./:/opt/apps depends_on: - db - redis command: /bin/sh -c "cd /opt/apps; pip install -r requirements.txt; python manage.py migrate; python manage.py runserver 0:8000" ports: - 8000:8000 db: image: mysql:5.7 environment: MYSQL_DATABASE: app MYSQL_USER: root MYSQL_ROOT_PASSWORD: パスワード tty: true ports: - 3306:3306 command: mysqld --character-set-server=utf8mb4 --explicit_defaults_for_timestamp=true redis: image: redis:latest ports: - 6379:6379 tty: true
Code language: JavaScript (javascript)

Dockerfile

FROM python:3.6.9-alpine3.10 ENV APP_PATH /opt/apps COPY requirements.txt $APP_PATH/ RUN pip install --no-cache-dir -r $APP_PATH/requirements.txt WORKDIR $APP_PATH
Code language: PHP (php)

Celery導入

では、この環境にCeleryを入れていきましょう。

requirements.txt

requirements.txtに下記を追記する。

celery==4.3.0

celery.py

まずはメインアプリケーションのディレクトリ(settings.pyがあるところ)配下に、 celery.pyを作ります。

# celery.py import os from celery import Celery from django.conf import settings os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'アプリケーションの名前(ディレクトリ名)') app = Celery('アプリケーションの名前(ディレクトリ名)') app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Code language: PHP (php)

tasks.py

任意のアプリケーションのディレクトリ配下に tasks.pyを作成します。このファイルに書かれた処理が非同期で実行されます。ここでは `app`というディレクトリ配下に作りたいと思います。

# app/tasks.py from celery.task import task @task def say_hello(): print("start say_hello") print("hello") print("end say_hello")
Code language: PHP (php)

本来であれば非同期にするという判断した処理は大きくなるかと思いますが、今回はサンプルということで文字列を出力するだけにします。

@task

メソッドの前に `@task`というのがあるのにきづきましたでしょうか。これはこのメソッドは celeryタスクですよ、というのを明示的に示しているものになります。

settings.py

settings.pyにceleryを使うための設定を書いていきます。

# settings.py CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZZER = 'json' BROKER_URL = 'redis://redis:6379'
Code language: PHP (php)

カスタムコマンド

ではceleryに非同期処理のキューを積むカスタムコマンドを作ります。

# app/management/commands/set_say_hello_queue.py from django.core.management.base import BaseCommand from app.tasks import say_hello class Command(BaseCommand): def handle(self, *args, **options): print("====== START =================") say_hello.apply_async(args=(), queue='say_hello') print("====== END =================")

さて、これで準備が整いました。

実行

celery実行

celery -A プロジェクト名 worker -Q say_hello -c 2

このコマンドの説明をします。

Aオプション

プロジェクト名を入れます

Qオプション

さばきたいキューの名前を入れます

Cオプション

Cは concurrencyの略で、最大並列数を指定します。ここに1を入れると並列数は1、つまり直列処理になります。

ここに10を入れると最大10個のスレッドで並列処理が実行されます。

このコマンドを実行して、下記のようになれば準備OKです。(今は一旦warningは無視する)

# celery -A プロジェクト名 worker -Q say_hello -c 2 /usr/local/lib/python3.6/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is absolutely not recommended! Please specify a different user using the --uid option. User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- celery@54880b09a5b0 v4.3.0 (rhubarb) ---- **** ----- --- * *** * -- Linux-4.9.93-linuxkit-aufs-x86_64-with 2019-09-23 03:33:39 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: プロジェクト名:0x7ff3f70f8a20 - ** ---------- .> transport: redis://redis:6379// - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> say_hello exchange=say_hello(direct) key=say_hello [2019-09-23 03:33:40,227: WARNING/MainProcess] /usr/local/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak, never '
Code language: PHP (php)

カスタムコマンド実行

それではキューを積むコマンドを打ってみましょう。

python manage.py set_say_hello_queue
Code language: CSS (css)

すると、celeryでは下記のように処理がされることを確認してください。

[2019-09-23 03:35:05,562: WARNING/ForkPoolWorker-2] start say_hello [2019-09-23 03:35:05,564: WARNING/ForkPoolWorker-2] hello [2019-09-23 03:35:05,564: WARNING/ForkPoolWorker-2] end say_hello
Code language: CSS (css)

積んだキューがceleryによって拾い上げられ、処理されることを確認できました!

今後はcelery実装で詰まった点や、celeryのキューを可視化するツールについてまとめていこうと思います。最後までお読みいただきありがとうございます。

By user