celery 使用
阅读原文时间:2023年07月09日阅读:1

celery

celery能用来做什么:
    1.异步任务
    2.定时任务
    3.延迟任务

1.1 理解celery的运行原理

1.可以不依赖任何服务器 通过自身命令 启动服务

2.celery服务为其他项目服务提供异步解决任务需求
注:会有两个服务同时运行 一个是项目服务 一个是celery服务 项目服务将需要异步处理的任务交给celery服务 celery就会在需要时异步完成项目的需求
项目服务正常服务 和selery服务互不打扰 当 项目服务需要异步操作时 selery服务会完成异步操作

1.2 celery架构(Broker、backend都用redis)

1.任务中间件 Broker(中间件)其他服务提交的异步任务 放在队列里
    需要借助第三方: redis rabbitmq
2.任务执行单元 worker  真正执行异步任务的进程
    celery提供的
3.结果存储 backend 结果存储 函数的返回结果 存到backend种
    需要借助于第三方:redis mysql

1.3使用场景

异步执行:解决耗时任务
延迟执行:解决延迟任务
定时执行:解决周期任务

celery 不支持win 通过eventlet支持在win上运行


# 安装---》安装完成,会有一个可执行文件 celery
    pip install celery
    celery 不支持win 通过eventlet支持在win上运行
    win:pip install eventlet


1.project
    ├── celery_task      # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │      # 所有任务函数
    ├── add_task.py      # 添加任务
    └── get_result.py   # 获取结果

1.新建一个celery_task--包

2.按照包结构创建py文件

3.celery.py

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'

# app = Celery('test', broker=broker, backend=backend, include=['celery_task.user_task', 'celery_task.order_task'])
app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])

4.order_task.py 添加任务

from celery_task.celery import app
import time
@app.task
def add(a, b):
    print('-----', a + b)
    time.sleep(2)
    return a + b

5.user_task.py 添加项目任务

import time
from celery_task.celery import app
@app.task
def send_sms_ss(phone,code):
    print("给%s发送短信成功,验证码为%s"%(phone,code))
    time.sleep(3)
    return True

6.lll_task.py 开启一个其他程序 提交项目任务

from celery_task.user_task import send_sms_ss
res = send_sms_ss.delay('123456789','66666') #立即异步执行
print(res)

'''
异步:
任务.delay
'''

7.终端 启动worker

celery  -A selery_task worker -l info -P eventlet

8.worker会执行消息中间件的任务 把结果存起来

9.查看执行结果

from main import app
from celery.result import AsyncResult
id = '51611be7-4914-4bd2-992d-749008e9c1a6'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 执行完了
        result = a.get()  #
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

lll_task.py

from datetime import datetime,timedelta
eta = datetime.utcnow()+timedelta(seconds=20)
res=send_sms_ss.apply_async(args=['123456789','66666'],eta=eta)

'''
延迟任务:
任务.apply_async(args=[],eta-eta) r
如果没有修改时区需要使用UTC时间
'''


需要启动beat和worker
    beat --- 定时提交任务的进程 -- 配置在app.conf.beat_schedule的任务
    worker --- 执行任务的

使用步骤:

#第一步:在celery中的py文件中写入
    app.conf.timezone = 'Asia/Shanghai' #时区修改为上海
    app.conf.enable_utc = False    #是否使用UTC
    #任务的定时配置
    app.conf.beat_schedule = {
            'send_sms': {
                'task': 'celery_task.user_task.send_sms',
                # 'schedule': timedelta(seconds=3),  # 时间对象
                # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
                'schedule': crontab(hour=9, minute=43),  # 每天9点43
                'args': ('18888888', '6666'),
            },
        }

#第二步:启动beat
    celery -A celery_task beat -l info

#第三步:启动worker
    celery -A celery_task worker -l info -P eventlet

'''
注意:
    1.启动命令的执行位置 如果是包结构 一定在包这一层
    2.include = ['celery_task.order_task'],路径从包名下开始导入 因为我们在包这层执行的命令
'''


如果在公司制作定时任务 还有一个更简单的框架
参考:https://blog.csdn.net/qq_41341757/article/details/118759836

使用步骤:
1.把咱们写的包复制到项目目录下
    -luffy_lzy
        -celery_task    #包路径
        -luffy_lzy        #源代码路径 小路飞

2.在使用提交异步任务的位置 导入使用即可
    在试图函数中使用 导入任务
    任务.delay() #提交任务

3.启动worker 如果有定时任务 启动beat

4.等待任务被worker执行

5.在视图函数中 查询任务执行的结果

7.1秒杀功能

逻辑分析:
    1.前端秒杀按钮 用户点击 --- 发送ajax请求
    2.视图函数 -- 提交秒杀任务 -- 借助于celery 提交到中间件中
    3.当次秒杀的请求 就回去了 携带者任务id号在前端
    4.前端开启定时任务 每隔3秒 带着任务 向后端发送请求 查看是否秒杀成功
    5.后端的情况
        1.任务还在等待被执行 --- 返回给前端 前端继续没隔3s发送一次请求
        2.任务执行完了 秒杀成功 ---返回给前端 恭喜秒杀成功 --关闭前端定时器
        3.任务执行完了 秒杀失败 ---返回给后端 秒杀失败--关闭前端定时器

7.2视图

#### 秒杀逻辑,CBV
from rest_framework.viewsets import ViewSet

from celery_task.order_task import sckill_task
from celery_task.celery import app
from celery.result import AsyncResult

class SckillView(ViewSet):
    @action(methods=['GET'], detail=False)
    def sckill(self, request):
        a = request.query_params.get('id')
        # 使用异步,提交一个秒杀任务
        res = sckill_task.delay(a)
        return APIResponse(task_id=res.id)

    @action(methods=['GET'], detail=False)
    def get_result(self, request):
        task_id = request.query_params.get('task_id')
        a = AsyncResult(id=task_id, app=app)
        if a.successful():
            result = a.get()
            if result:
                return APIResponse(msg='秒杀成功')
            else:
                return APIResponse(code=101, msg='秒杀失败')
        elif a.status == 'PENDING':
            print('任务等待中被执行')
            return APIResponse(code=666, msg='还在秒杀中')

7.3 任务 order_task.py

# 秒杀任务
import random
import time

@app.task
def sckill_task(good_id):
    # 生成订单,减库存,都要在一个事务中
    print("商品%s:秒杀开始" % good_id)
    # 这个过程,可能是1,2,3s中的任意一个
    time.sleep(random.choice([6, 7, 9]))
    print('商品%s秒杀结束' % good_id)

    return random.choice([True, False])

7.4前端 Sckill.vue

<template>
  <div>

    <button @click="handleSckill">秒杀</button>
  </div>
</template>

<script>
import Header from '@/components/Header';
import Banner from '@/components/Banner';
import Footer from '@/components/Footer';

export default {
  name: 'Sckill',
  data() {
    return {
      task_id: '',
      t: null
    }
  },
  methods: {
    handleSckill() {
      this.$axios.get(this.$settings.BASE_URL + '/user/sckill/sckill/?id=999').then(res => {
        this.task_id = res.data.task_id
        this.t = setInterval(() => {
          this.$axios.get(this.$settings.BASE_URL + '/user/sckill/get_result/?task_id=' + this.task_id).then(res => {
            if (res.data.code == 666) {
              //如果秒杀任务还没执行,定时任务继续执行
              console.log(res.data.msg)
            } else {
              // 秒杀结束,无论成功失败,这个定时任务都结束
              clearInterval(this.t)
              this.t = null
              this.$message(res.data.msg)
            }

          })
        }, 2000)
      }).catch(res => {

      })
    }
  }

}
</script>

7.5 django中使用celery

-1 把咱们写的包,复制到项目目录下
        -luffy_api
            -celery_task #celery的包路径
                celery.py  # 一定不要忘了一句话
                import os
                os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
            -luffy_api  #源代码路径

    -2 在使用提交异步任务的位置,导入使用即可
        -视图函数中使用,导入任务
        -任务.delay()  # 提交任务

    -3 启动worker,如果有定时任务,启动beat

    -4 等待任务被worker执行

    -5 在视图函数中,查询任务执行的结果

 # 重点:celery中使用djagno,有时候,任务中会使用django的orm,缓存,表模型。。。。一定要加
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')


1.轮播图接口请求来了 先去缓存去看 如果有 直接返回
2.如果没有 查数据库 然后把轮播图数据放到Redis中 缓存起来

改接口

#加入缓存的轮播图接口
class BannerView(GenericViewSet,ListModelMixin):
    queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        banner_list = cache.get('banner_list')
        #查看缓存有没有数据 如果没有再走数据库
        if banner_list:
            return APIResponse(data=banner_list)
        else: #数据库
            res = super().list(request, *args, **kwargs)
            cache.set('banner_list',res.data)
        return APIResponse(data=res.data)
        # {code:100,msg;成功,data=[{},{}]}


加入缓存后 缓存中有数据 先去缓存拿 但是如果mysql中数据变了 缓存不会自动变化 出现数据不一致问题     -- mysql和缓存数据库不一致

双写一致性
    写入mysql redis没动 数据不一致存在问题

如何解决
    1.修改数据 删除缓存
    2.修改数据 更新缓存
    3.定时更新缓存 --- 实时性差

#定时任务 :celery



第一步:在celery配置定时任务
    app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=3),  # 时间对象
    },
}

第二步:启动worker 启动beat
    # update_banner任务的代码
from home.models import Banner
from home.serializer import BannerSerializer
from django.core.cache import cache
from django.conf import settings
@app.task
def update_banner():
    # 只要这个任务一执行,就更新轮播图的缓存
    banners = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')
    ser = BannerSerializer(instance=banners, many=True)
    for item in ser.data:
        item['image'] = settings.BACKEND_URL + item['image']

    cache.set('banner_list', ser.data)  # 会出问题,轮播图地址显示不全
    return True

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章