Prefect
0. 背景


1. Prefect介紹
1.1 官方資源
1.2 Prefect 工作任務概念
Task
Flow
Work Pool
2. 建立第一個專案
2.1 安裝
2.2 建立etl_workflow.py
Last updated


Last updated
@task
def extract(url="https://example.com/api/data", timeout=30):
    """Fetch the raw records from the source HTTP API.

    Args:
        url: Endpoint to GET; defaults to the tutorial's example API.
        timeout: Seconds to wait for the server before giving up, so a
            stalled endpoint cannot hang the task forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within *timeout*.
    """
    response = requests.get(url, timeout=timeout)
    # Fail the task on HTTP errors instead of parsing an error page as data.
    response.raise_for_status()
    return response.json()
@task
def transform(data):
    """Select the OHLCV price columns from the raw extracted records.

    Args:
        data: Anything ``pandas.DataFrame`` accepts -- presumably the decoded
            JSON from ``extract`` (e.g. a list of record dicts); confirm
            against the real API.

    Returns:
        A new DataFrame holding only Symbol, Date, Open, High, Low, Close and
        Volume, in that order.

    Raises:
        KeyError: If any of those columns is absent from *data*.
    """
    # The listing never imports pandas at module level, so ``pd`` would be an
    # undefined name when the task runs; import it where it is used.
    import pandas as pd

    price_columns = ["Symbol", "Date", "Open", "High", "Low", "Close", "Volume"]
    df = pd.DataFrame(data)
    return df[price_columns]
@task
def load(data, db_path="example_db.db"):
    """Insert every row of *data* into the ``PriceHistory`` SQLite table.

    The ``PriceHistory`` table must already exist. All rows are written in a
    single transaction: either every row is committed or, if an error occurs,
    none is.

    Args:
        data: DataFrame with Symbol, Date, Open, High, Low, Close and Volume
            columns (the output of ``transform``).
        db_path: SQLite database file; defaults to the tutorial's example DB.
    """
    columns = ("Symbol", "Date", "Open", "High", "Low", "Close", "Volume")
    # Values are bound as parameters rather than formatted into the SQL text:
    # the data comes from an external API, so string-building is an injection
    # risk and also breaks on values that contain quotes.
    sql = (
        "INSERT INTO PriceHistory (Symbol, Date, Open, High, Low, Close, Volume) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    rows = [
        tuple(record[column] for column in columns)
        for record in data.to_dict("records")
    ]

    connection = sqlite3.connect(db_path)
    try:
        # The connection context manager commits on success and rolls back on
        # an exception (sqlite3 cursors have no commit()); it does NOT close
        # the connection, hence the explicit close in ``finally``.
        with connection:
            connection.executemany(sql, rows)
    finally:
        connection.close()
@flow
def etl_flow():
    """Run the extract -> transform -> load pipeline once as a Prefect flow.

    Each step is a Prefect task; the output of one step is passed directly
    into the next.
    """
    original_data = extract()
    transformed_data = transform(original_data)
    load(transformed_data)


# Install Prefect before running the script: pip install prefect
from prefect import flow, task
import requests
import sqlite3


@task
def extract(url="https://example.com/api/data", timeout=30):
    """Fetch the raw records from the source HTTP API.

    Args:
        url: Endpoint to GET; defaults to the tutorial's example API.
        timeout: Seconds to wait for the server before giving up, so a
            stalled endpoint cannot hang the task forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within *timeout*.
    """
    response = requests.get(url, timeout=timeout)
    # Fail the task on HTTP errors instead of parsing an error page as data.
    response.raise_for_status()
    return response.json()
@task
def transform(data):
    """Select the OHLCV price columns from the raw extracted records.

    Args:
        data: Anything ``pandas.DataFrame`` accepts -- presumably the decoded
            JSON from ``extract`` (e.g. a list of record dicts); confirm
            against the real API.

    Returns:
        A new DataFrame holding only Symbol, Date, Open, High, Low, Close and
        Volume, in that order.

    Raises:
        KeyError: If any of those columns is absent from *data*.
    """
    # The listing never imports pandas at module level, so ``pd`` would be an
    # undefined name when the task runs; import it where it is used.
    import pandas as pd

    price_columns = ["Symbol", "Date", "Open", "High", "Low", "Close", "Volume"]
    df = pd.DataFrame(data)
    return df[price_columns]
@task
def load(data, db_path="example_db.db"):
    """Insert every row of *data* into the ``PriceHistory`` SQLite table.

    The ``PriceHistory`` table must already exist. All rows are written in a
    single transaction: either every row is committed or, if an error occurs,
    none is.

    Args:
        data: DataFrame with Symbol, Date, Open, High, Low, Close and Volume
            columns (the output of ``transform``).
        db_path: SQLite database file; defaults to the tutorial's example DB.
    """
    columns = ("Symbol", "Date", "Open", "High", "Low", "Close", "Volume")
    # Values are bound as parameters rather than formatted into the SQL text:
    # the data comes from an external API, so string-building is an injection
    # risk and also breaks on values that contain quotes.
    sql = (
        "INSERT INTO PriceHistory (Symbol, Date, Open, High, Low, Close, Volume) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    rows = [
        tuple(record[column] for column in columns)
        for record in data.to_dict("records")
    ]

    connection = sqlite3.connect(db_path)
    try:
        # The connection context manager commits on success and rolls back on
        # an exception (sqlite3 cursors have no commit()); it does NOT close
        # the connection, hence the explicit close in ``finally``.
        with connection:
            connection.executemany(sql, rows)
    finally:
        connection.close()
@flow
def etl_flow():
    """Run the extract -> transform -> load pipeline once as a Prefect flow.

    Each step is a Prefect task; the output of one step is passed directly
    into the next.
    """
    original_data = extract()
    transformed_data = transform(original_data)
    load(transformed_data)


if __name__ == "__main__":
    # Run the script directly with: python etl_workflow.py
    etl_flow()