Rocketry
我原本的排程系統都是使用APScheduler,偶然間看到Rocketry,稍微看了一下官方文件後,就開始著手進行學習,套件使用方法相當類似於FastAPI,開發者也說是被FastAPI所啟發。這位開發者其實也是在金融相關的公司工作,所以他所設計的就是那麼對味。另一個金融公司開發的就是pandas。
Rocketry特色
文字描述模式
這個功能是比較少見的模式,利用文字敘述想要執行任務時間或頻率,開放的模式也代表著相對不嚴謹,所以我是比較少用一些。
@app.task('every 10 seconds')
def do_constantly():
...
@app.task('every 1 minute')
def do_minutely():
...
@app.task('every 1 hour')
def do_hourly():
...
@app.task('every 1 day')
def do_daily():
...
@app.task('every 2 days 2 hours 20 seconds')
def do_custom():
...
提供Pipeline架構
一般的任務排程系統主要由時間驅動,並可以使用預設的固定時點(例如每日上午10點),或是時間區間執行(例如每五分鐘),但Rocketry提供了一個pipeline的模式,當任務流程成功與失敗時都可以由客製化的函數來執行。
例如下方官方文件中的範例程式,可以很直覺得看到當某一任務完成後,要執行的任務,或失敗執行的任務,甚至還可以多個都完成,或其中一個失敗的定義。例如我們就可以定義的任一函數失敗時,寄送Email進行通知。
from rocketry.conds import after_success, after_fail, after_finish
@app.task()
def do_things():
...
@app.task(after_success(do_things))
def do_after_success():
...
@app.task(after_fail(do_things))
def do_after_fail():
...
@app.task(after_finish(do_things))
def do_after_fail_or_success():
...
而Pipeline之間就一定會有資料的傳輸,例如取得某個資料後,將原始資料進行某種轉換或再計算,並整理出適合存入資料庫的格式,再進行資料庫更新。
from rocketry.conds import daily, after_success
from rocketry.args import Return
@app.task(daily)
def do_first():
return 'Hello World'
@app.task(after_success(do_first))
def do_second(arg=Return(do_first)):
# arg's value is "Hello World"
...
Rocketry安裝問題
如果直接使用pip install rocketry,而運行基本程序後,將會出現一堆Pydantic的錯誤,這時候只能將Pydantic重新以1.10.10版本進行安裝,這個問題才能解決,可能要等開發團隊更新Pydantic 2.X.X版以後才能運行。
Rocketry + Full-Stack (FastAPI)
AirFlow的網頁介面實在是相當方便,Rocketry開發者也有提供一個與FastAPI整合的專案,可以透過網頁介面管理與操作任務。
Last updated