Celery:Pythonでの分散タスクキュー

Celeryは、Pythonで分散タスクキューを実現するための強力なライブラリです。長時間実行されるジョブの処理、定期的なタスクの実行、およびリアルタイム操作のオフロードを行うために使用されます。Celeryは、非同期タスクの実行とタスクのキューイングを容易にし、システムのパフォーマンスとスケーラビリティを向上させます。Celeryの基本的な使い方と具体的な例を紹介します。

Celeryの基本機能

タスクキューの管理

タスクをキューに投入し、ワーカーがそのタスクを処理します。

非同期タスクの実行

タスクを非同期で実行し、結果を後で取得できます。

定期的なタスクの実行

定期的なジョブやスケジュールされたタスクの実行をサポートします。

スケーラビリティ

複数のワーカーを使ってタスクを並列処理することで、システムのスケーラビリティを向上させます。

    Celeryのインストール

    まず、Celeryとバックエンドとして使用するメッセージブローカー(例:Redis)をインストールします。

    pip install celery[redis]
    pip install redis

    例題1: 基本的な非同期タスクの実行

    以下に、基本的な非同期タスクを実行する方法を示します。

    Celeryの設定とタスクの定義

    以下の内容をtasks.pyとして保存します。

    from celery import Celery
    
    # Celeryインスタンスの作成
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y

    ワーカーの起動

    以下のコマンドを実行してCeleryワーカーを起動します。

    celery -A tasks worker --loglevel=info

    タスクの実行

    以下の内容をmain.pyとして保存し、タスクを非同期で実行します。

    from tasks import add
    
    # 非同期タスクの実行
    result = add.delay(4, 6)
    print(f"Task result: {result.get(timeout=10)}")

    main.pyを実行します。

    python main.py

      この例では、addタスクが非同期で実行され、結果が取得されます。

      例題2: 定期的なタスクの実行

      次に、定期的にタスクを実行する方法を示します。

      Celeryの設定とタスクの定義

      以下の内容をtasks.pyとして保存します。

      from celery import Celery
      from celery.schedules import crontab
      
      # Celeryインスタンスの作成
      app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
      
      @app.task
      def add(x, y):
          return x + y
      
      # 定期的なタスクの設定
      app.conf.beat_schedule = {
          'add-every-minute': {
              'task': 'tasks.add',
              'schedule': crontab(minute='*/1'),
              'args': (16, 16),
          },
      }
      app.conf.timezone = 'UTC'

      ワーカーとビートの起動

      以下のコマンドを実行してCeleryワーカーとビートを起動します。

        celery -A tasks worker --loglevel=info
        celery -A tasks beat --loglevel=info

        この例では、addタスクが毎分実行されます。

        例題3: チェインタスクの実行

        複数のタスクを順次実行するチェインタスクの例を示します。

        Celeryの設定とタスクの定義

        以下の内容をtasks.pyとして保存します。

        from celery import Celery, chain
        
        # Celeryインスタンスの作成
        app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
        
        @app.task
        def add(x, y):
            return x + y
        
        @app.task
        def multiply(x, y):
            return x * y

        ワーカーの起動

        以下のコマンドを実行してCeleryワーカーを起動します。

        celery -A tasks worker --loglevel=info

        チェインタスクの実行

        以下の内容をmain.pyとして保存し、チェインタスクを実行します。

        from tasks import add, multiply
        
        # チェインタスクの実行
        result = chain(add.s(4, 6) | multiply.s(10))()
        print(f"Chain task result: {result.get(timeout=10)}")

        main.pyを実行します。

        python main.py

        この例では、addタスクが実行され、その結果がmultiplyタスクに渡されて実行されます。

        結論

        Celeryは、Pythonで分散タスクキューを実現するための強力なライブラリです。Celeryを使用することで、長時間実行されるジョブの処理や定期的なタスクの実行、並列処理の効率化が容易になります。基本的な非同期タスクの実行から定期的なタスクの設定、複数のタスクのチェイン実行まで、Celeryの理解と適用は、システムのパフォーマンスとスケーラビリティの向上に非常に有用です。

        タイトルとURLをコピーしました