0%

Python - Celery - 调用任务

0. 前提

有3个任务:add作加法运算, mul作乘法运算, xsum为求和运算

1
2
3
4
5
6
7
8
9
10
11
@app.task(bind=True, base=BaseTask)
def add(self, a, b):
return a + b

@app.task(bind=True, base=BaseTask)
def mul(self, a, b):
return a * b

@app.task(bind=True, base=BaseTask)
def xsum(self, *args):
return sum(args)

1. 立即执行任务

1
add.apply_async()

或者简写为:

1
add.delay()

2. 延时任务

使用countdown参数使任务在60秒后执行:

1
add.apply_async(countdown=60)

3. 定时任务

使用eta参数使任务在晚上9点整执行:

1
2
3
import datetime
exec_at = datetime.datetime(2019, 7, 28, 21, 0, 0, 0)
add.apply_async(eta=exec_at)

4. 任务分组

使用group生成一组任务并发执行

1
2
3
4
5
6
tasks = group(add.si(x,x) for x in range(10))
tasks.apply_async()

# get方法可获取任务的返回值:
res = tasks()
res.get(timeout=1) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

5. 任务分块

使用chord将1000个任务分成100个一块,共10组

1
2
3
4
5
6
7
tasks = add.chunks(((x, x) for x in range(1000)), 100).group()  # 转化为group
# 每间隔60秒执行一组
tasks.skew(step=60)
tasks.apply_async()

# 注意使用了skew后eta参数会失效,想要定时效果可以转化为countdown如:
task.apply_async(countdown=(exec_at - now()).seconds)

6. 链式顺序执行

任务签名signature
subtask=signature,可以简写为s,如add.s(),当需要对任务做一定处理后执行时使用。
ssi的区别:
s相当于signature(),而si相当于signature(immutable=True)。 s会把上个任务的结果传给下个任务,而si不传递结果。

使用chain按顺序执行一队任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 前一个任务的结果作为后一个任务的第一个参数
chain(add.s(1,1), add.s(2), add.s(3))()

# 不传递结果,仅仅按顺序执行
chain(add.si(1,1), add.si(2,2), add.si(3,3))()

# 也可以使用 | 链接任务,效果是一样的,如:
all_task = (add.s(1,1) | add.s(2) | add.s(3))
all_task = (add.si(1,1) | add.si(22) | add.si(32))
all_task.delay()

# chain还支持部分传参,如:
c = (add.s(2) | mul.s(8))
res = c(1) # (1+2)*8
res.get() # 24

# 搭配列表生成式实现更灵活,如
task_list = [add.si(x,x) for x in range(3)]
chain(*task_list)() # 不要忘了加上最后的括号才可执行任务

另一种调用方法:

1
2
3
c =   (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8))) # ((4 + 16) * 2 + 4) * 8
res = c.apply_async() # 或者直接c()
res.get() # .get()方法可以获取链式任务的最终返回值352

7. 任务分割

使用chord将任务分为headerbody两部分,header任务执行完再执行body,其中header的返回结果会作为参数传递给body,如:

1
2
res = chord(header=[add.s(1,2),mul.s(3,4)],body=add.s())()  # 任务(1+2)+(3*4)
res.get() #

当一个group和另一个task组成chain时,会自动升级转化为chord,如:

1
2
3
c = (group(add.s(i, i) for i in range(10)) | xsum.s())  # (0+0)+(1+1)+(2+2)+...+(9+9)
res = c()
res.get() # 90