关于celery的介绍

栏目: 后端 · 发布时间: 6年前

内容简介:说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对 celery-task-meta-064e4262-e1ba-4e87-b4a1-52dd1418188f:data 该键值对的失效时间为24小时链式任务就是异步或者定时执行的任务由多个子任务执行完成
  • 1.1 Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具(它本身不是一个任务队列, 它是任务队列管理的工具, 它提供的接口可以帮助我们实现分布式任务队列)。

  • 1.2 Celery专注于实时任务处理,支持任务调度(跟rabbitMQ可实现多种exchange。)

说白了,它是一个分布式队列的管理工具,我们可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列。

  • 1.3 Celery 架构

    关于celery的介绍
    • 消息中间件(message broker)(邮箱, 邮局): 本身不提供消息服务,可以和第三方消息中间件集成,常用的有 redis mongodb rabbitMQ

    • 任务执行单元(worker)(寄件人): 是Celery提供的任务执行单元, worker并发的运行在分布式的系统节点中

    • 任务执行结果存储(task result store)(收件人):用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,Django ORM,AMQP等

2. 任务队列和消息队列

  • 任务队列是一种在线或机器分发任务的机制

  • 消息队列输入是工作的一个单元, 可以认为是一个任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。

  • 图解

    关于celery的介绍

2.简单示例

2.1 创建一个celery实例 创建tasks.py文件

import time
from celery import Celery

app = Celery('tasks', broker='redis:////127.0.0.1:6379/6', backend='redis:////127.0.0.1:6379/7')


@app.task
def add(x, y):
    time.sleep(10)
    return x + y
复制代码

ps: tasks为任务名称 设置reids为中间件

2.2 创建一个index.py文件调用并且检测任务、查看任务执行状态

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from tasks import add, app
from celery.result import AsyncResult
import time

# 立即告知celery去执行add任务,并传入两个参数
result = add.delay(4, 4)
print(result.id)
async = AsyncResult(id=result.id, app=app)

time.sleep(3)
if async.successful():
    result = async.get()
    print(result, "执行成功")
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')

复制代码
  • ps 如果使用 redis 作为任务队列中间人,在redis中存在两个键 celery 和 _kombu.binding.celery , _kombu.binding.celery 表示有一名为 celery 的任务队列(Celery 默认),而 celery为默认队列中的任务列表,使用list类型,可以看看添加进去的任务数据。

2.3 执行命令详解

  • celery -A app.celery_tasks.celery worker -Q queue --loglevel=info
    • A参数指定celery对象的位置,该app.celery_tasks.celery指的 是app包下面的celery_tasks.py模块的celery实例,注意一定是初始化后的实例,

    • Q参数指的是该worker接收指定的队列的任务,这是为了当多个队列有不同的任务时可以独立;如果不设会接收所有的队列的任务;

    • l参数指定worker的日志级别;

执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对 celery-task-meta-064e4262-e1ba-4e87-b4a1-52dd1418188f:data 该键值对的失效时间为24小时

2.4 消息主体分析

  • body : 是序列化后使用base64编码的信息,包括具体的任务参数,其中包括了需要执行的方法、参数和一些任务基本信息
  • content-encoding : 序列化数据编码方式
  • content-type : 任务数据的序列化方式,默认使用python内置的序列化模块pickle(ps: pickle模块支持的类型 所有python支持的原生类型:布尔值,整数,浮点数,复数,字符串,字节,None。 由任何原生类型组成的列表,元组,字典和集合。 函数,类,类的实例, 常用的方法:dumps,dump,loads,load)
{
    "body": "gAJ9cQAoWAQAAAB0YXNrcQFYCQAAAHRhc2tzLmFkZHECWAIAAABpZHEDWCQAAABjNDMwMzZkMi03Yzc3LTQ0MDUtOTYwNC1iZDc3ZTcyNzNlN2FxBFgEAAAAYXJnc3EFSwRLBIZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==",
    "content-encoding": "binary",
    "content-type": "application/x-python-serialize",
    "headers": {},
    "properties": {
        "reply_to": "caa78c3a-618a-31f0-84a9-b79db708af02",
        "correlation_id": "c43036d2-7c77-4405-9604-bd77e7273e7a",
        "delivery_mode": 2,
        "delivery_info": {
            "priority": 0,
            "exchange": "celery",
            "routing_key": "celery"
        },
        "body_encoding": "base64",
        "delivery_tag": "e7e288b5-ecbb-4ec6-912c-f42eb92dbd72"
    }
}
复制代码

2.5Celery配置

CELERY_DEFAULT_QUEUE:默认队列
BROKER_URL  : 代理人的网址
CELERY_RESULT_BACKEND:结果存储地址
CELERY_TASK_SERIALIZER:任务序列化方式
CELERY_RESULT_SERIALIZER:任务执行结果序列化方式
CELERY_TASK_RESULT_EXPIRES:任务过期时间
CELERY_ACCEPT_CONTENT:指定任务接受的内容序列化类型(序列化),一个列表;
复制代码

2.6获取执行任务执行结果的方法

r = func.delay(...)
r.ready()     			# 查看任务状态,返回布尔值,  任务执行完成, 返回 True, 否则返回 False.
r.wait()      			# 等待任务完成, 返回任务执行结果,很少使用;
r.get(timeout=1)       # 获取任务执行结果,可以设置等待时间
r.result      			# 任务执行结果.
r.state       			# PENDING, START, SUCCESS,任务当前的状态
r.status     				# PENDING, START, SUCCESS,任务当前的状态
r.successful  			# 任务成功返回true
r.traceback 				# 如果任务抛出了一个异常,你也可以获取原始的回溯信息
复制代码

2.7celery的装饰方法celery.task

  • task()把任务(函数)装饰成异步
@celery.task()
def func():
	# do something
    pass
    
复制代码
  • 可以重新定义任务的基类
class MyTask(celery.Task):
    # 任务失败时执行
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))
    # 任务成功时执行
    def on_success(self, retval, task_id, args, kwargs):
        pass
    # 任务重试时执行
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        pass


复制代码

参数

  • task_id : 任务id
  • einfo:执行失败时任务详情
  • exc: 失败时的错误类型
  • retval: 任务成功时返回的执行结果

2.8 一份完整的配置文件

# 注意,celery4版本后,CELERY_BROKER_URL改为BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虚拟主机名'
# 指定结果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任务序列化方式
CELERY_TASK_SERIALIZER = 'msgpack' 
# 指定结果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任务过期时间,celery任务执行结果的超时时间
CELERY_TASK_RESULT_EXPIRES = 60 * 20   
# 指定任务接受的序列化类型.
CELERY_ACCEPT_CONTENT = ["msgpack"]   
# 任务发送完成是否需要确认,这一项对性能有一点影响     
CELERY_ACKS_LATE = True  
# 压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据
CELERY_MESSAGE_COMPRESSION = 'zlib' 
# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 5  # 在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程
# celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目
CELERYD_CONCURRENCY = 4 
# celery worker 每次去rabbitmq预取任务的数量
CELERYD_PREFETCH_MULTIPLIER = 4 
# 每个worker执行了多少任务就会死掉,默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40 
# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default" 
# 设置详细的队列
CELERY_QUEUES = {
    "default": { # 这是上面指定的默认队列
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": { # 设置扇形交换机
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
    
}
复制代码

2.8 Celery定时任务

  • 指定定时任务并加入配置 重新启动worker
# config.py
from datetime import timedelta
from celery.schedules import crontab
 
CELERYBEAT_SCHEDULE = {
    'ptask': {
        'task': 'tasks.period_task',
        'schedule': timedelta(seconds=5),
    },
}

# 添加定时任务
@app.task(bind=True)
def period_task(self):
    print 'period task done: {0}'.format(self.request.id)
 
复制代码

PS:时间如果涉及到datatime最好设置为UTC时间

  • 启动定时任务进程
celery -A task beat

复制代码

2.9 链式任务

链式任务就是异步或者定时执行的任务由多个子任务执行完成

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()
 
@app.task()
def fetch_page(url):
    return myhttplib.get(url)
 
@app.task()
def parse_page(page):
    return myparser.parse_document(page)
 
@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

 fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
复制代码

以上所述就是小编给大家介绍的《关于celery的介绍》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Blog Design Solutions

Blog Design Solutions

Richard Rutter、Andy Budd、Simon Collison、Chris J Davis、Michael Heilemann、Phil Sherry、David Powers、John Oxton / friendsofED / 2006-2-16 / USD 39.99

Blogging has moved rapidly from being a craze to become a core feature of the Internetfrom individuals sharing their thoughts with the world via online diaries, through fans talking about their favori......一起来看看 《Blog Design Solutions》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具