Prefect

0. 背景

自從上次介紹過 Rocketry後,又發現了一個更符合我胃口的自動化執行與任務監控服務Prefect,是不是有點喜新厭舊的感覺?有一個更好的說法叫做最佳化或持續改良,找到更適合的工具也可以作為改進的目的吧?

剛好開發的時期處於部門建置期,所以調整的成本其實不算是太高,尤其已經經歷過了兩三次的調整,已經有些經驗了,作為一個衍生性商品自營交易主管兼開發者兼交易員教練的我需求大概如下:

使用者友善的操作介面,提供人員監控、管理、手動執行與排程功能。

Prefect Dashboard

金融市場是一個週而復始的行為,所以任務排程上就要考量時間依賴的任務執行模式,例如每日收盤後更新交易資料,或盤中每X分鐘/秒計算全市場選擇權的隱含波動率,最適當的模式就是Cron,Prefect的任務排程也可使用Cron。

Prefect另外一個特點就是可以追蹤每個函數的執行狀況與自動化log,對於人工監控環節可以很清楚的知道哪個環節效率不佳或存在問題,也可以確定整體任務執行流程的完善度。


1. Prefect介紹

1.1 官方資源

Prefect: Official website
Prefect: Docs

1.2 Prefect 工作任務概念

Task

Prefect的最小任務單位,我們口中定義的完整任務(Prefect: Flow),其實是由多個子任務(Prefect: Task)所構成,可以將任務切分為不同功能的函數,例如一個常用的ETL流程:

  1. Extract: 透過資料來源(API, 爬蟲,...etc)取得目標原始資料。

  2. Transform: 將資料轉換格式或計算,以符合目標資料庫的設計。

  3. Load: 將轉換後的資料存入目標資料庫。

我們便可將此三個不同的功能轉換成三個不同的函數,並加上前綴@task

@task
def extract():
    url = "https://example.com/api/data"
    response = requests.get(url)
    data = response.json()
    return data

@task
def transform(data):
    df = pd.DataFrame(data)
    return df[['Symbol','Date','Open','High','Low','Close','Volume']]

@task    
def load(data):
    connection = sqlite3.connect('example_db.db')
    cursor = connection.cursor()
    
    for row in data.to_dict('records'):
        cursor.execute(f"INSERT INTO PriceHistory (Symbol, Date, Open, High, Low, Close, Volume) VALUES ('{data['Symbol']}',......)")

    cursor.commit()
    connection.close()    
    

Flow

延續前面Task的範例,當建立完成三個子任務(Task)後,我們需要將三個任務串起來,以達成我們真正的目的:

@flow
def etl_flow():
    original_data = extract()
    transformed_data = transform(original_data)
    load(transformed_data)

透過etl_flow()將以pipeline的模式串起,一個任務完成後,才會執行另一個任務,彼此之間相互依存,

Workpool

2. 建立第一個專案

2.1 安裝

首先建立一個專案路徑與安裝虛擬環境後,啟動虛擬環境後,就可以輸入以下指令:

pip install prefect

2.2 建立etl_workflow.py

我們來一起建立一個最最最幼稚園的範例,就是ETL任務:

Step 0: 引入必要套件

from prefect import flow, task
import requests
import sqlite3

Step 1: 建立任務

@task
def extract():
    url = "https://example.com/api/data"
    response = requests.get(url)
    data = response.json()
    return data

@task
def transform(data):
    df = pd.DataFrame(data)
    return df[['Symbol','Date','Open','High','Low','Close','Volume']]

@task    
def load(data):
    connection = sqlite3.connect('example_db.db')
    cursor = connection.cursor()
    
    for row in data.to_dict('records'):
        cursor.execute(f"INSERT INTO PriceHistory (Symbol, Date, Open, High, Low, Close, Volume) VALUES ('{data['Symbol']}',......)")

    cursor.commit()
    connection.close()    
    

Step 2: 建立流程

@flow
def etl_flow():
    original_data = extract()
    transformed_data = transform(original_data)
    load(transformed_data)

Step3: 手動單次執行

if __name__ == "__main__":
    etl_flow()
python etl_workflow.py

Last updated