Celery&Flower文档笔记
阅读原文时间:2023年07月09日阅读:1
# tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379/0')  

@app.task
def add(a, b):
    print(a + b)

创建实例

app是celery的一个实例,第一个参数表示app的名称,broker申明使用的broker是谁,这里用的是Redis。backend申明后端结果存储在哪里。

@app.task表示这是app的一个任务。

启动worker

接着启动worker。到目录下执行celery -A tasks worker --loglevel=info。tasks为实例在哪个模块中,这里在tasks.py中,celery会自己去找实例,你也可以指定tasks.app。-loglevel 指定了日志级别,也可以用-l info表示。在Windows中,celery4以上在这样执行后续可能会报错,那么可以使用celery -A tasks worker -pool=solo -l info打开worker。或者在tasks.py中加一句os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

如果要停止worker,使用control + c。

把任务给worker

在启动celery worker后,可以使用delay函数把任务给worker去异步执行。

from tasks import add

if __name__ == '__main__':
    print("start add")
    add.delay(1, 2)
    print("end")

如果配置了backend,那么能接收delay的返回值result,使用result.ready()判断是否执行完毕。

配置

celery允许我们进行相关的配置,比如app.conf.task_serializer = 'json'。如果需要同时配置多条,可以使用update进行配置。

目录结构:
proj/main.py
    /config.py
    /tasks.py


app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

对于大型的工程,可以使用专门的配置模块进行配置,app.config_from_object('celeryconfig'),celeryconfig为配置模块的名称,我们可以在同级目录下创建celeryconfig.py进行配置。

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

全部代码:

# main.py
from tasks import add  

if __name__ == '__main__':
    print("start add")
    add.delay(1, 2)
    print("end")


# tasks.py
from celery import Celery
import os  

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
app = Celery('tasks')
app.config_from_object('config')  

@app.task
def add(a, b):
    print(a + b)


# config.py
broker_url='redis://localhost:6379'
result_backend='redis://localhost:6379'

关于PENDING队列:

Retrieve list of tasks in a queue in Celery

Message Protocol

监控celery

使用events得到事件发生的信息,从而进行监控。

Monitoring and Management Guide

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

打开本地监听:flower --port=5555

或者从Celery运行:celery flower --address=127.0.0.1 --port=5555

如果要开放给外网:celery flower --address=0.0.0.0 --port=5555

可以在后面加上--basic_auth=name:password用来进行简单的登录验证。

打开后进入127.0.0.1:5555进入flower界面。

然后我们使用celery就能在flower页面看到监控记录了。

flower提供了HTTP API,可以通过此对worker进行远程操控。

API Reference

Configuration and defaults

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章