python 异步任务框架 Celery 入门,速看!

2023-09-22 15:55:34

01、简介

Celery 是使用 python 编写的分布式任务调度框架。

它有几个主要的概念:

celery 应用

  • 用户编写的代码脚本,用来定义要执行的任务,然后通过 broker 将任务发送到消息队列中

broker

  • 代理,通过消息队列在客户端和 worker 之间进行协调。

  • celery 本身并不包含消息队列,它支持一下消息队列

    RabbitMQ

    Rdis

    Amazon SQS

    Zookeeper

  • 更多关于 Broker 见官方文档(末尾点击阅读原文)

backend

  • 数据库,用来存储任务返回的结果。

worker

  • 工人,用来执行 broker 分派的任务。

任务

  • 任务,定义的需要执行的任务

版本要求

Celery5.1 要求:

  • python(3.6,3.7,3.8)

Celery 是一个资金最少的项目,所以我们不支持 Microsoft Windows。

更多更详细的版本要求见官方文档

如果你想学习自动化测试,我这边给你推荐一套视频,这个视频可以说是B站播放全网第一的自动化测试教程,同时在线人数到达1000人,并且还有笔记可以领取及各路大神技术交流:798478386    

【已更新】B站讲的最详细的Python接口自动化测试实战教程全集(实战最新版)_哔哩哔哩_bilibili【已更新】B站讲的最详细的Python接口自动化测试实战教程全集(实战最新版)共计200条视频,包括:1、接口自动化之为什么要做接口自动化、2、接口自动化之request全局观、3、接口自动化之接口实战等,UP主更多精彩视频,请关注UP账号。icon-default.png?t=N7T8https://www.bilibili.com/video/BV17p4y1B77x/?spm_id_from=333.337&vd_source=488d25e59e6c5b111f7a1a1a16ecbe9a

安装

使用 pip 安装:

pip install -U Celery

捆绑包

Celery 还定义了一组包,用于安装 Celery 和给定的依赖项。

可以在 pip 命令中实现中括号来指定这些依赖项。

pip install "celery[librabbitmq]"
pip install "celery[librabbitmq,redis,auth,msgpack]"

02、简单使用

1. 选择一个 broker

使用 celery 首先需要选择一个消息队列。安装任意你熟悉的前面提到的 celery 支持的消息队列。

2. 编写一个 celery 应用

首先我们需要编写一个 celery 应用,它用来创建任务和管理 wokers,它要能够被其他的模块导入。

创建一个tasks.py 文件:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):   
    return x + y

第一个参数tasks是当前模块的名称,它可以省略,建议以当前模块名为名称。

第二个关键字参数 broker='redis://localhost:6379/0'指定我们使用 Redis 作为消息队列,并指定连接地址。

3.运行 celery 的 worker 服务

cd 到 tasks.py 所在目录,然后运行下面的命令来启动 worker 服务

celery -A tasks worker --loglevel=INFO

4. 调用任务

>>> from tasks import add
>>> add.delay(4,4)

通过调用任务的 delay 来执行对应的任务。celery 会把执行命令发送到 broker,broker 再将消息发送给 worker 服务来执行,如果一切正常你将会在 worker 服务的日志中看到接收任务和执行任务的日志。

5. 保存结果

如果你想要跟踪任务的状态以及保存任务的返回结果,celery 需要把它发送到某个地方。celery 提供多种结果后端。

我们这里以 reids 为例,修改 tasks.py中的代码,添加一个 Redis 后端。

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

更多结果后端见官方文档。(末尾点击阅读原文)

重新启动 worker 服务,重新打开 python 解释器

>>> from tasks import add
>>> result = add.delay(4,4)

ready()方法返回任务是否执行完成:

>>> result.ready()
False

还可以等待结果完成,但很少使用这种方法,因为它将异步调用转换为同步调用

>>> result.get(timeout=1)
8

03、在应用中使用 celery

创建项目

项目结构:

proj/__init__.py
    /celery.py
    /tasks.py

 proj/celery.py

from celery import Celery

app = Celery('proj',
            broker='redis://localhost:6379/0',
            backend='redis://localhost:6379/1',             
            include=['proj.tasks']

)# 配置
app.conf.update(
   result_expires=3600, # 结果过期时间
)

在这个模块中我们创建了一个 Celery 模块。要在你的项目中使用 celery 只需要导入此实例。

proj/tasks.py

from .celery import app


@app.task
def add(x, y): 
   return x + y


@app.task
def mul(x, y):
   return x * y


@app.tas
kdef xsum(numbers)
    return sum(numbers)

启动 worker

celery -A proj worker -l INFO

 调用任务

>>> from proj.tasks import add
>>> add.delay(2, 2)

04、在 django 中使用celery

要在你的 django 项目中使用 celery,首先需要定义一个 Celery 的实例。

如果你又 django 项目如下:

- proj/ 
 - manage.py
 - proj/ 
   - __init__.py
   - settings.py
   - urls.py

 那么推荐的方法是创建一个新的proj/proj/celery.py模块来定义芹菜实例:file:proj/proj/celery.py

import os

from celery import Celery

# 为`celery`设置默认的django设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')

app = Celery('proj')

# 设置配置来源
app.config_from_object('django.conf:settings',namespace='CELERY')

# 加载所有的已注册django应用中的任务
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self): 
   print(f'Request: {self.request!r}')

然后你需要在你的 proj/proj/__init__.py模块中导入这个应用程序。这样就可以保证 Django 启动时加载应用程序,以便于 @shared_task 装饰器的使用。

proj/proj/__init__.py:

from .celery import app as celery_app
__all__ = ('celery_app',)

请注意,此示例项目布局适用于较大的项目,对于简单的项目,可以使用包含定义应用程序和任务的单个模块。

接下来我们来解释一下 celery.py 中的代码,首先,我们设置celery命令行程序的环境变量DJANGO_SETTINGS_MODULE的默认值:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

这一行的作用是加载当前 django 项目的环境设置,特别是当需要在异步任务中用到 ORM。它必须在创建应用程序实例之前。

app = Celery('proj')

我们还添加了 Django 设置模块作为 Celery 的配置源。这意味着我们不必使用多个配置文件,而是直接在 Django 的配置文件中配置 Celery。

app.config_from_object('django.conf:settings', namespace='CELERY')

大写命名空间意味着所有Celery配置项必须以大写指定,并以 CELERY_ 开头,因此例如broker_url 设置变为 CELERY_BROKER_URL。

例如,Django 项目的配置文件可能包括:

settings.py

CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30*60

接下来,可重用应用程序的常见做法是在单独的tasks.py模块中定义所有任务Celery有一种方法可以自动发现这些模块:

app.autodiscover_tasks()

 使用上面的行,Celery 将按照tasks.py 约定自动从所有已安装的应用程序中发现任务:

- app1/
   - tasks.py
   - models.py
- app2/
   - tasks.py
   - models.py

这样就不必手动将各个模块添加到CELERY_IMPORTS 设置中。

使用 @shared_task 装饰器

我们编写的任务可能会存在于可重用的应用程序中,而可重用的应用程序不能依赖与项目本身,因此无法直接导入 celery 应用实例。

@shared_task装饰器可以让我们无需任何具体的 celery 实例创建任务:demoapp/tasks.py

# Create your tasks here

from demoapp.models import Widget

from celery import shared_task


@shared_task
def add(x, y):
   return x + y


@shared_task
def mul(x, y):
   return x * y


@shared_task
def xsum(numbers):
   return sum(numbers)


@shared_task
def count_widgets(): 
   return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
   w = Widget.objects.get(id=widget_id)
   w.name = name
   w.save()

 

更多推荐

java 企业工程管理系统软件源码 自主研发 工程行业适用

工程项目管理软件(工程项目管理系统)对建设工程项目管理组织建设、项目策划决策、规划设计、施工建设到竣工交付、总结评估、运维运营,全过程、全方位的对项目进行综合管理工程项目各模块及其功能点清单一、系统管理1、数据字典:实现对数据字典标签的增删改查操作2、编码管理:实现对系统编码的增删改查操作3、用户管理:管理和查看用户角

【C++】STL之适配器---用deque实现栈和队列

目录前言一、deque1、deque的原理介绍2、deque的底层结构3、deque的迭代器4、deque的优缺点4.1、优点4.2、缺点二、stack的介绍和使用1、stack的介绍2、stack的使用3、stack的模拟实现三、queue的介绍和使用1、queue的介绍2、queue的使用3、queue的模拟实现前

Python:为何成为当下最热门的编程语言?

文章目录🍋引言🍋1.简单易学🍋2.多领域应用🍋3.强大的社区支持🍋4.丰富的库和框架🍋5.跨平台兼容🍋6.开源和免费🍋7.数据科学和人工智能的崛起🍋8.自动化和脚本编写🍋9.大型组织的采用🍋10.教育和培训🍋引言在计算机编程的世界里,有数以百计的编程语言可供选择。然而,近年来,Python已经崭

偶现来电时手机操作出现重启

问题描述:偶现来电时手机操作出现重启问题分析:从系统Log看09-0610:22:44.79182914001425WWatchdog:***WATCHDOGKILLINGSYSTEMPROCESS:Blockedinhandleronmainthread(main)09-0610:22:44.794133140014

CSRF和SSRF有什么不同?

文章目录CSRF复现SSRF复现启动环境漏洞复现探测存活IP和端口服务计划任务反弹shell区别CSRF复现打开dvwa,将难度调为low,点击CSRF,打开后发现有一个修改密码的输入框:在这里修改密码,并用bp抓包,在httphistory查看数据包,点击engagementtools中的GenerateCSRFPo

策略模式,一种广泛应用于各种情况的设计模式(设计模式与开发实践 P5)

文章目录策略模式实现思想实战-表单策略模式定义:定义一系列算法,把它们一个个封装起来,并且可以互相替换例如,我们要计算年终奖,年终奖根据绩效A、B、C来计算最终数值实现最初我们很容易想到用分支if来解决这个问题,如果绩效=A则工资x2,如果绩效=B则工资x3如果经常使用这样的分支结构,你会发现代码耦合度很高,很容易就出

瑞芯微RK3568:Debian系统如何安装Docker

本文基于HD-RK3568-IOT评估板演示Debian系统安装Docker,该方法适用于RK356X全系产品。HD-RK3568-IOT评估板基于HD-RK3568-CORE工业级核心板设计(双网口、双CAN、5路串口),接口丰富,适用于工业现场应用需求,亦方便用户评估核心板及CPU的性能。适用于工业自动化控制、人机

网络安全(黑客)自学

前言:想自学网络安全(黑客技术)首先你得了解什么是网络安全!什么是黑客网络安全可以基于攻击和防御视角来分类,我们经常听到的“红队”、“渗透测试”等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。无论网络、Web、移动、桌面、云等哪个领域,都有攻与防两面性,例如Web安全技术,既有Web渗透,也有

手写模拟Spring的底层原理2.1

先来引入两个问题第一个懒加载还是立即加载的问题,这个问题还是在于就是说,当我们xml配置了一个对象bean的时候,它在spring容器里面是什么时候开始会给我们创建这个对象那如果我们想对某个对象启动懒加载,可以添加@lazy这个注解这个注解一加上,它就只会在得到对象的时候给我们在容器中创建对象也就是在使用下面的方法的时

Reactor 第十二篇 WebFlux集成PostgreSQL

1引言在现代的应用开发中,数据库是存储和管理数据的关键组件。PostgreSQL是一种强大的开源关系型数据库,而WebFlux是Spring框架提供的响应式编程模型。本文将介绍如何使用Reactor和WebFlux集成PostgreSQL,实现响应式的数据库访问。1.环境准备首先,我们需要在项目的pom.xml文件中添

一个例子了解交叉编译

学习嵌入式Linux经常听到交叉编译这个名词,那到底什么是交叉编译,下面通过一个例子来介绍。首先新建一个C文件,其代码如下。#include"stdio.h"voidmain(){inta,b;intc;printf("请输入两个数:\n");scanf("%d%d",&a,&b);c=a+b;printf("a+b=

热文推荐