python开发celery异步任务调度与定时触发_定时任务beat

介绍


Celery 可以异步执行,也可以通过定时任务触发

环境准备


这里用redis作为中间件,django使用的版本是v2.1.2
安装django需要用到的第三方包,注意版本号

pip install celery==3.1.26.post2
pip install django-celery==3.3.1
pip install redis==2.10.6

详细的基础教程参考前面的https://www.code404.icu/1332.html。
本篇主要讲定时任务如何实现,下图中的Celery beat 定时任务
在这里插入图片描述

celery 的5个角色


  • Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)
  • Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。
    任务的消费者是Worker。
    Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
  • Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
  • Beat 定时任务调度器,根据配置定时将任务发送给Broker。
  • Backend 用于存储任务的执行结果。

Django 中使用 Celery


要在 Django 项目中使用 Celery,您必须首先定义 Celery 库的一个实例(称为“应用程序”)

如果你有一个现代的 Django 项目布局,比如:

- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py

那么推荐的方法是创建一个新的proj/proj/celery.py模块来定义 Celery 实例:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

其中debug_task是测试的任务,可以注掉

# @app.task(bind=True)
# def debug_task(self):
#     print('Request: {0!r}'.format(self.request))

上面一段只需改这句,’proj’是自己django项目的app名称

app = Celery('proj')

然后你需要在你的proj/proj/init.py 模块中导入这个应用程序。这确保在 Django 启动时加载应用程序,以便 @shared_task 装饰器(稍后提到)将使用它:

proj/proj/init.py:

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

上面这段固定的,不用改

tasks任务


在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
    print("task----------111111----------")
    return x + y

@shared_task
def mul(x, y):
    print("task----------22222----------")
    return x * y

tasks.py可以写任务函数add、mul,让它生效的最直接的方法就是添加app.task 或shared_task 这个装饰器

添加setting配置


setting.py添加配置

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

#  celery 配置连接redis
BROKER_URL = 'redis://192.168.1.1:6379'
CELERY_RESULT_BACKEND = 'redis://192.168.1.1:6379'

# 配置定时任务
from celery.schedules import crontab
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'yoyo(django app名称).tasks.add',  # 任务
        'schedule': timedelta(seconds=5),  # 每5秒执行add函数
        'args': (11, 12)  # 运行参数
    },
    'mul': {
        'task': 'yoyo(django app名称).tasks.mul',  # 任务
        'schedule': timedelta(seconds=10),  # 每10秒执行mul函数
        'args': (11, 2)  # 运行参数
    }
}

CELERYBEAT_SCHEDULE 是配置定时任务,可以添加多个任务,任务名称可以与tasks中的函数名称保持一致,也可以自己定义一个任务名称。

  • task 参数是对应app目录下的tasks文件中任务函数名称
  • schedule 运行周期,支持contrab表达式
  • args 运行任务时候带上的参数

启动worker 和beat服务


启动worker,执行任务

celery -A MyDjango(django 项目名称) worker -l info

运行日志

D:\202107django\MyDjango>celery -A MyDjango worker -l info

 -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         yoyo:0x219342ff978
- ** ---------- .> transport:   redis://192.168.1.1:6379//
- ** ---------- .> results:     redis://192.168.1.1:6379/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . yoyo.tasks.add
  . yoyo.tasks.mul

[2021-10-21 12:20:32,079: INFO/MainProcess] Connected to redis://192.168.1.1:6379//
[2021-10-21 12:20:32,167: INFO/MainProcess] mingle: searching for neighbors
[2021-10-21 12:20:33,700: INFO/MainProcess] mingle: all alone

启动beat 定时任务监听

celery -A MyDjango(django 项目名称) beat -l info

启动日志

MyDjango>celery -A MyDjango beat -l info
celery beat v3.1.26.post2 (Cipater) is starting.
__    -    ... __   -        _
Configuration ->
    . broker -> redis://192.168.1.1:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2021-10-21 12:22:56,867: INFO/MainProcess] beat: Starting...

启动完成后,会看到beat运行日志,定时任务已经推过去
在这里插入图片描述

worker运行日志,执行任务
在这里插入图片描述

crontab 周期任务


前面是设置每多少秒执行任务,这个只是测试下功能,任务很简单,我们一般用crontab 实现周期性任务,比如每周1-5早上执行一遍任务,用crontab 可以轻松实现

# crontab任务
# 每周一8:30调用task.add
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 8:30 A.M
    'add': {
        'task': 'yoyo(django app名称).tasks.add',  # 任务
        'schedule': crontab(hour=8, minute=30, day_of_week=1),
        'args': (11, 12)  # 运行参数
    }
}

crontab定时任务命令规则:

星期命令路径
minutehourdaymonthweekcommandpath
*****commandpath
  • minute:
    表示分钟,可以是从0到59之间的任何整数。
  • hour:
    表示小时,可以是从0到23之间的任何整数。
  • day:
    表示日期,可以是从1到31之间的任何整数。
  • month:
    表示月份,可以是从1到12之间的任何整数。
  • week:
    表示星期几,可以是从0到7之间的任何整数,这里的0或7代表星期日。
  • command:
    要执行的命令,可以是系统命令,也可以是自己编写的脚本文件。
  • path:
    需执行的文件,用绝对路径

crontab命令常用的特殊字符

符号说明
*表示任何时刻
,表示分割
表示一个段,如第二段里:1-5,就表示1到5点
/n表示每个n的单位执行一次,如第二段里,*/1, 就表示每隔1个小时执行一次命令。也可以写成1-23/1

版权声明:本文为作者原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原创文章,作者:老C,如若转载,请注明出处:https://www.code404.icu/1339.html

发表评论

登录后才能评论