Celeryは、Pythonで分散タスクキューを実現するための強力なライブラリです。長時間実行されるジョブの処理、定期的なタスクの実行、およびリアルタイム操作のオフロードを行うために使用されます。Celeryは、非同期タスクの実行とタスクのキューイングを容易にし、システムのパフォーマンスとスケーラビリティを向上させます。Celeryの基本的な使い方と具体的な例を紹介します。
Celeryの基本機能
タスクキューの管理
タスクをキューに投入し、ワーカーがそのタスクを処理します。
非同期タスクの実行
タスクを非同期で実行し、結果を後で取得できます。
定期的なタスクの実行
定期的なジョブやスケジュールされたタスクの実行をサポートします。
スケーラビリティ
複数のワーカーを使ってタスクを並列処理することで、システムのスケーラビリティを向上させます。
Celeryのインストール
まず、Celeryとバックエンドとして使用するメッセージブローカー(例:Redis)をインストールします。
pip install celery[redis]
pip install redis
例題1: 基本的な非同期タスクの実行
以下に、基本的な非同期タスクを実行する方法を示します。
Celeryの設定とタスクの定義
以下の内容をtasks.py
として保存します。
from celery import Celery
# Celeryインスタンスの作成
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
ワーカーの起動
以下のコマンドを実行してCeleryワーカーを起動します。
celery -A tasks worker --loglevel=info
タスクの実行
以下の内容をmain.py
として保存し、タスクを非同期で実行します。
from tasks import add
# 非同期タスクの実行
result = add.delay(4, 6)
print(f"Task result: {result.get(timeout=10)}")
main.py
を実行します。
python main.py
この例では、add
タスクが非同期で実行され、結果が取得されます。
例題2: 定期的なタスクの実行
次に、定期的にタスクを実行する方法を示します。
Celeryの設定とタスクの定義
以下の内容をtasks.py
として保存します。
from celery import Celery
from celery.schedules import crontab
# Celeryインスタンスの作成
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
# 定期的なタスクの設定
app.conf.beat_schedule = {
'add-every-minute': {
'task': 'tasks.add',
'schedule': crontab(minute='*/1'),
'args': (16, 16),
},
}
app.conf.timezone = 'UTC'
ワーカーとビートの起動
以下のコマンドを実行してCeleryワーカーとビートを起動します。
celery -A tasks worker --loglevel=info
celery -A tasks beat --loglevel=info
この例では、add
タスクが毎分実行されます。
例題3: チェインタスクの実行
複数のタスクを順次実行するチェインタスクの例を示します。
Celeryの設定とタスクの定義
以下の内容をtasks.py
として保存します。
from celery import Celery, chain
# Celeryインスタンスの作成
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
@app.task
def multiply(x, y):
return x * y
ワーカーの起動
以下のコマンドを実行してCeleryワーカーを起動します。
celery -A tasks worker --loglevel=info
チェインタスクの実行
以下の内容をmain.py
として保存し、チェインタスクを実行します。
from tasks import add, multiply
# チェインタスクの実行
result = chain(add.s(4, 6) | multiply.s(10))()
print(f"Chain task result: {result.get(timeout=10)}")
main.py
を実行します。
python main.py
この例では、add
タスクが実行され、その結果がmultiply
タスクに渡されて実行されます。
結論
Celeryは、Pythonで分散タスクキューを実現するための強力なライブラリです。Celeryを使用することで、長時間実行されるジョブの処理や定期的なタスクの実行、並列処理の効率化が容易になります。基本的な非同期タスクの実行から定期的なタスクの設定、複数のタスクのチェイン実行まで、Celeryの理解と適用は、システムのパフォーマンスとスケーラビリティの向上に非常に有用です。