Rocketry
Rocketry特色
文字描述模式
提供Pipeline架構
Rocketry安裝問題
Rocketry + Full-Stack (FastAPI)
Last updated
Last updated
@app.task('every 10 seconds')
def do_constantly():
...
@app.task('every 1 minute')
def do_minutely():
...
@app.task('every 1 hour')
def do_hourly():
...
@app.task('every 1 day')
def do_daily():
...
@app.task('every 2 days 2 hours 20 seconds')
def do_custom():
...from rocketry.conds import after_success, after_fail, after_finish
@app.task()
def do_things():
...
@app.task(after_success(do_things))
def do_after_success():
...
@app.task(after_fail(do_things))
def do_after_fail():
...
@app.task(after_finish(do_things))
def do_after_fail_or_success():
...from rocketry.conds import daily, after_success
from rocketry.args import Return
@app.task(daily)
def do_first():
return 'Hello World'
@app.task(after_success(do_first))
def do_second(arg=Return(do_first)):
# arg's value is "Hello World"
...