django + celeryで非同期処理を実行する方法をまとめる

前回の記事でdjangoとredisを繋げることに成功したので今回はそのredisと、新しくceleryというモジュールを組み合わせて非同期処理を組み込んでいきたいと思います。

環境

Django: 2.2.4
Python: 3.6.9
Celery: 4.3.0

docker-compose.yml

僕は開発環境はdocker-composeで作っています。現在は Django、MySQL、Redisという3つのコンテナを動かしています。

version: '3.7'
services:
  django:
    restart: always
    build: .
    volumes:
      - ./:/opt/apps
    depends_on:
      - db
      - redis
    command: /bin/sh -c "cd /opt/apps; pip install -r requirements.txt; python manage.py migrate; python manage.py runserver 0:8000"
    ports:
      - 8000:8000
  db:
    image: mysql:5.7
    environment:
      MYSQL_DATABASE: app
      MYSQL_USER: root
      MYSQL_ROOT_PASSWORD: パスワード
    tty: true
    ports:
      - 3306:3306
    command: mysqld --character-set-server=utf8mb4 --explicit_defaults_for_timestamp=true
  redis:
    image: redis:latest
    ports:
      - 6379:6379
    tty: true

Dockerfile

FROM python:3.6.9-alpine3.10
ENV APP_PATH /opt/apps
COPY requirements.txt $APP_PATH/
RUN pip install --no-cache-dir -r $APP_PATH/requirements.txt
WORKDIR $APP_PATH

Celery導入

では、この環境にCeleryを入れていきましょう。

requirements.txt

requirements.txtに下記を追記する。

celery==4.3.0

celery.py

まずはメインアプリケーションのディレクトリ(settings.pyがあるところ)配下に、 celery.pyを作ります。

# celery.py
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'アプリケーションの名前(ディレクトリ名)')
app = Celery('アプリケーションの名前(ディレクトリ名)')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

tasks.py

任意のアプリケーションのディレクトリ配下に tasks.pyを作成します。このファイルに書かれた処理が非同期で実行されます。ここでは `app`というディレクトリ配下に作りたいと思います。

# app/tasks.py
from celery.task import task
@task
def say_hello():
    print("start say_hello")
    print("hello")
    print("end say_hello")

本来であれば非同期にするという判断した処理は大きくなるかと思いますが、今回はサンプルということで文字列を出力するだけにします。

@task

メソッドの前に `@task`というのがあるのにきづきましたでしょうか。これはこのメソッドは celeryタスクですよ、というのを明示的に示しているものになります。

settings.py

settings.pyにceleryを使うための設定を書いていきます。

# settings.py
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZZER = 'json'
BROKER_URL = 'redis://redis:6379'

カスタムコマンド

ではceleryに非同期処理のキューを積むカスタムコマンドを作ります。

# app/management/commands/set_say_hello_queue.py
from django.core.management.base import BaseCommand
from app.tasks import say_hello
class Command(BaseCommand):
    def handle(self, *args, **options):
        print("====== START =================")
        say_hello.apply_async(args=(), queue='say_hello')
        print("====== END   =================")

さて、これで準備が整いました。

実行

celery実行

celery -A プロジェクト名 worker -Q say_hello -c 2

このコマンドの説明をします。

Aオプション

プロジェクト名を入れます

Qオプション

さばきたいキューの名前を入れます

Cオプション

Cは concurrencyの略で、最大並列数を指定します。ここに1を入れると並列数は1、つまり直列処理になります。

ここに10を入れると最大10個のスレッドで並列処理が実行されます。

このコマンドを実行して、下記のようになれば準備OKです。(今は一旦warningは無視する)

# celery -A プロジェクト名 worker -Q say_hello -c 2
/usr/local/lib/python3.6/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=0 euid=0 gid=0 egid=0
  uid=uid, euid=euid, gid=gid, egid=egid,
 -------------- celery@54880b09a5b0 v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Linux-4.9.93-linuxkit-aufs-x86_64-with 2019-09-23 03:33:39
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         プロジェクト名:0x7ff3f70f8a20
- ** ---------- .> transport:   redis://redis:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> say_hello        exchange=say_hello(direct) key=say_hello
[2019-09-23 03:33:40,227: WARNING/MainProcess] /usr/local/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '

カスタムコマンド実行

それではキューを積むコマンドを打ってみましょう。

python manage.py set_say_hello_queue

すると、celeryでは下記のように処理がされることを確認してください。

[2019-09-23 03:35:05,562: WARNING/ForkPoolWorker-2] start say_hello
[2019-09-23 03:35:05,564: WARNING/ForkPoolWorker-2] hello
[2019-09-23 03:35:05,564: WARNING/ForkPoolWorker-2] end say_hello

積んだキューがceleryによって拾い上げられ、処理されることを確認できました!

今後はcelery実装で詰まった点や、celeryのキューを可視化するツールについてまとめていこうと思います。最後までお読みいただきありがとうございます。

Related Posts