Task queue[三] Celery(二)
wordsCount: 491
readingTime: 1 min
viewers:
任務定義
將裝飾器綁到函數
celery = Celery(__name__,broker=broker)
@celery.task
def add(x: int, y: int) -> int:
time.sleep(5)
return x + y
add 就是一個task
任務使用
完整版
apply_async(args[, kwargs[, …]])
ex:
add.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'},countdown=10))
簡化版(不帶選項類型參數=>控制任務)
delay(*args, **kwargs)
ex:
add.delay(arg1, arg2, kwarg1='x', kwarg2='y')
Signatures 簽名=⇒將函數當成可傳遞
完整版
add.signature((2, 2), countdown=10)
s = add.signature((2, 2), {'debug': True}, countdown=10)
可印出的方式
s.args
(2, 2)
s.kwargs
{'debug': True}
s.options
{'countdown': 10}
簡化版(不帶選項類型參數=>控制任務)
add.s(2, 2, debug=True)
簡化外掛版又加上參數
add.s(1, 2, debug=True).set(countdown=5)
為何要有簽名?
直接啟動任務,不就可以執行?還要先變成可傳遞,再去調用一次
因為要讓多個任務可以組合
Task Flow(Primitives)
對多個任務的組合方式
組合方式 | |
---|---|
group | 一次等多個任務結束後,才回傳 |
chain | 任務一個接一個 |
chord | group + callback任務 |
map | 一個任務,處理多個不同參數,一次回傳多個結果 |
starmap | 同map,參數帶入(tuple) |
chunks | 將可迭代參數自動區分成多個小任務 |
提醒:
阻塞
在celery 一個任務內,不可做其他任務等待,不能用阻塞的方式,所以chord才用callback,而rpc不會存結果,在這邊不能當後端使用
阻塞類型的定期程式
有阻塞類型的定期程式,就不能用beat,因為beat的啟動就是把定期本身當成一個任務丟出去到worker 必須改用APScheduler的定期在裡面手動丟給celery任務
Table of Contents
Related Posts
Task queue[二] Celery
緣由 在工作上,遇到單機已經過多負荷,所以開始做實作分散式爬蟲 Celery定義 分散式任務隊列處理 架構 任
2024-1-27
Task queue[一] Tq基礎
TaskQueue & MessageQueue 傳送物件 抽象層級 主要設定 相同 Tq 任務 高(基於mq的封裝) 任務的使用方式與處理 異步與分散式 Mq 訊息 低
2024-1-26
Message Queue[三] Kafka
緣由 因為工作學了rabbitMQ,所以閒暇之餘來學kafka,想知道兩者的差異 比較 速度 訊息保證 訊息大
2024-11-23
Message Queue[一] Mq基礎
MessageQueue(MQ) 訊息貯列,協助訊息以非同步方式溝通,常見協議MQTT,AMQP 訊息代理人 經紀人Broker 訊息發送
2023-11-18
Message Queue[二] RabbitMq & Pika Pika
緣由 在工作上,因為需要導入發布訂閱模式,故在幾種常見的Mq中做選擇 發布/訂閱 速度 訊息保證 訊息大小 訊息
2023-11-18
Sponsor
Wechat
Alipay