Home
Posts
Categories
Series
Tags
About
Task queue[三] Celery(二)
postedOn: 2024-1-27   updatedOn: 2024-1-27   includedIn: 程式
wordsCount: 491   readingTime: 1 min   viewers:

任務定義

將裝飾器綁到函數


celery = Celery(__name__,broker=broker)


@celery.task
def add(x: int, y: int) -> int:
    time.sleep(5)
    return x + y


add 就是一個task

任務使用

完整版

apply_async(args[, kwargs[, …]])

ex:
add.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'},countdown=10))

簡化版(不帶選項類型參數=>控制任務)

delay(*args, **kwargs)

ex:
add.delay(arg1, arg2, kwarg1='x', kwarg2='y')

Signatures 簽名=⇒將函數當成可傳遞

完整版

add.signature((2, 2), countdown=10)

s = add.signature((2, 2), {'debug': True}, countdown=10)

可印出的方式
s.args
(2, 2)
s.kwargs
{'debug': True}
s.options
{'countdown': 10}

簡化版(不帶選項類型參數=>控制任務)

add.s(2, 2, debug=True)

簡化外掛版又加上參數

add.s(1, 2, debug=True).set(countdown=5)

為何要有簽名?

直接啟動任務,不就可以執行?還要先變成可傳遞,再去調用一次

因為要讓多個任務可以組合

Task Flow(Primitives)

對多個任務的組合方式

組合方式
group 一次等多個任務結束後,才回傳
chain 任務一個接一個
chord group + callback任務
map 一個任務,處理多個不同參數,一次回傳多個結果
starmap 同map,參數帶入(tuple)
chunks 將可迭代參數自動區分成多個小任務

提醒:

阻塞

在celery 一個任務內,不可做其他任務等待,不能用阻塞的方式,所以chord才用callback,而rpc不會存結果,在這邊不能當後端使用

阻塞類型的定期程式

有阻塞類型的定期程式,就不能用beat,因為beat的啟動就是把定期本身當成一個任務丟出去到worker 必須改用APScheduler的定期在裡面手動丟給celery任務