# Prefect

## 0. 背景

自從上次介紹過 [Rocketry](/technology/task-automation/rocketry.md)後，又發現了一個更符合我胃口的自動化執行與任務監控服務**Prefect**，是不是有點喜新厭舊的感覺？有一個更好的說法叫做最佳化或持續改良，找到更適合的工具也可以作為改進的目的吧？

剛好開發的時期處於部門建置期，所以調整的成本其實不算是太高，尤其已經經歷過了兩三次的調整，已經有些經驗了，作為一個衍生性商品自營交易主管兼開發者兼交易員教練的我需求大概如下：

> 使用者友善的操作介面，提供人員監控、管理、手動執行與排程功能。

<figure><img src="/files/CslHjvoizT53K5c0Kpxm" alt=""><figcaption><p>Prefect Dashboard</p></figcaption></figure>

金融市場是一個週而復始的行為，所以任務排程上就要考量時間依賴的任務執行模式，例如每日收盤後更新交易資料，或盤中每X分鐘／秒計算全市場選擇權的隱含波動率，最適當的模式就是Cron，Prefect的任務排程也可使用Cron。

Prefect另外一個特點就是可以追蹤每個函數的執行狀況與自動化log，對於人工監控環節可以很清楚的知道哪個環節效率不佳或存在問題，也可以確定整體任務執行流程的完善度。

<figure><img src="/files/L4kbV4sa4Prr7rQ3liFv" alt=""><figcaption></figcaption></figure>

***

## 1. Prefect介紹

### 1.1 官方資源

{% embed url="<https://www.prefect.io/>" %}
Prefect: Official website
{% endembed %}

{% embed url="<https://docs.prefect.io/latest/>" %}
Prefect: Docs
{% endembed %}

### 1.2 Prefect 工作任務概念

#### Task

Prefect的最小任務單位，我們口中定義的完整任務(Prefect: Flow)，其實是由多個子任務(Prefect: Task)所構成，可以將任務切分為不同功能的函數，例如一個常用的ETL流程：

1. Extract: 透過資料來源(API, 爬蟲,...etc)取得目標原始資料。
2. Transform: 將資料轉換格式或計算，以符合目標資料庫的設計。
3. Load: 將轉換後的資料存入目標資料庫。

我們便可將此三個不同的功能轉換成三個不同的函數，並加上前綴`@task`：

<pre class="language-python"><code class="lang-python"><strong>@task
</strong>def extract():
    url = "https://example.com/api/data"
    response = requests.get(url)
    data = response.json()
    return data

@task
def transform(data):
    df = pd.DataFrame(data)
    return df[['Symbol','Date','Open','High','Low','Close','Volume']]

@task    
def load(data):
    connection = sqlite3.connect('example_db.db')
    cursor = connection.cursor()
    
    for row in data.to_dict('records'):
        cursor.execute(f"INSERT INTO PriceHistory (Symbol, Date, Open, High, Low, Close, Volume) VALUES ('{data['Symbol']}',......)")

    cursor.commit()
    connection.close()    
    
</code></pre>

#### Flow

延續前面Task的範例，當建立完成三個子任務(Task)後，我們需要將三個任務串起來，以達成我們真正的目的：

```python
@flow
def etl_flow():
    original_data = extract()
    transformed_data = transform(original_data)
    load(transformed_data)
```

透過etl\_flow()將以pipeline的模式串起，一個任務完成後，才會執行另一個任務，彼此之間相互依存，

#### Workpool

## 2. 建立第一個專案

#### 2.1 安裝

首先建立一個專案路徑與安裝虛擬環境後，啟動虛擬環境後，就可以輸入以下指令：

```bash
pip install prefect
```

#### 2.2 建立etl\_workflow\.py

我們來一起建立一個最最最幼稚園的範例，就是ETL任務：

**Step 0: 引入必要套件**

```python
from prefect import flow, task
import requests
import sqlite3
```

**Step 1: 建立任務**

```python
@task
def extract():
    url = "https://example.com/api/data"
    response = requests.get(url)
    data = response.json()
    return data

@task
def transform(data):
    df = pd.DataFrame(data)
    return df[['Symbol','Date','Open','High','Low','Close','Volume']]

@task    
def load(data):
    connection = sqlite3.connect('example_db.db')
    cursor = connection.cursor()
    
    for row in data.to_dict('records'):
        cursor.execute(f"INSERT INTO PriceHistory (Symbol, Date, Open, High, Low, Close, Volume) VALUES ('{data['Symbol']}',......)")

    cursor.commit()
    connection.close()    
    
```

**Step 2: 建立流程**

```python
@flow
def etl_flow():
    original_data = extract()
    transformed_data = transform(original_data)
    load(transformed_data)
```

**Step3: 手動單次執行**

```python
if __name__ == "__main__":
    etl_flow()
```

```bash
python etl_workflow.py
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://go.raymondctw.dev/technology/task-automation/prefect.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
