技能 airflow-dag-patterns
📦
建立生產就緒的 Airflow DAGs 需要了解運算子、感測器和錯誤處理模式。此技能提供經過實戰驗證的範本和資料管線編排的最佳實踐。
支援: Claude Codex Code(CC)
1
下載技能 ZIP
2
在 Claude 中上傳
前往 設定 → 功能 → 技能 → 上傳技能
3
開啟並開始使用
測試它
正在使用「airflow-dag-patterns」。 建立具有擷取、轉換和載入任務的每日 ETL DAG
預期結果:
- 已建立 DAG,排程為 '0 6 * * *'(每天早上 6 點)
- 三個 PythonOperator 任務:extract_data、transform_data、load_data
- 任務依賴關係:extract >> transform >> load
- 重試設定:3 次重試,5 分鐘指數退避
- 已設定任務失敗的電子郵件通知
正在使用「airflow-dag-patterns」。 新增感測器以在處理前等待 S3 檔案
預期結果:
- 已新增 S3KeySensor,逾時 2 小時,探測間隔 5 分鐘
- 感測器設定 mode='reschedule' 以釋放 worker 槽位
- 處理任務設定為感測器的下游依賴
- DAG 現在會在執行前等待檔案可用性
安全審計
安全v1 • 2/24/2026
Static analysis detected 41 patterns but all are false positives. Backtick characters are markdown formatting for code blocks, not shell execution. globals() usage is standard Airflow pattern for dynamic DAG generation. URLs are documentation references. This is educational documentation with no executable security risks.
2
已掃描檔案
554
分析行數
0
發現項
1
審計總數
未發現安全問題
審計者: claude
品質評分
38
架構
100
可維護性
87
內容
50
社群
100
安全
100
規範符合性
你能建構什麼
建立 ETL 管線的資料工程團隊
建立排程化的資料管線,從來源擷取、轉換資料並載入至資料倉儲,具備適當的錯誤處理和監控功能。
編排報告產生的分析團隊
排程自動化報告產生任務,依賴資料可用性和上游處理完成狀態。
排程化訓練模型的 ML 工程師
編排機器學習訓練管線,包含資料驗證、模型訓練和部署階段。
試試這些提示
基本 DAG 建立
建立一個每天早上 6 點執行的 Airflow DAG,從 CSV 檔案擷取資料、轉換後載入資料庫。包含具有重試功能的基本錯誤處理。
動態 DAG 生成
從設定清單生成多個相似的 DAG 以處理不同的資料來源。每個 DAG 應具有相同的結構,但有不同的排程和來源路徑。
具有資料品質檢查的分支管線
建立一個檢查資料品質指標的 DAG,並根據品質分數分支到不同的處理路徑。包含分支後的正確合併邏輯。
基於感測器的外部依賴
建立一個 DAG,等待檔案到達 S3、依賴另一個 DAG 完成,並在處理前監控 API 健康檢查端點。對感測器使用 reschedule 模式。
最佳實務
- 使用 TaskFlow API 獲得更乾淨的程式碼,並在任務之間自動傳遞 XCom
- 將所有任務設計為冪等,使重試不會導致資料重複
- 為任務設定適當的逾時,並對感測器使用 reschedule 模式以釋放 worker 資源
避免
- 使用 depends_on_past=True 會造成不必要的瓶頸並阻礙平行執行
- 硬編碼日期或數值,而非使用 Airflow 巨集如 {{ ds }} 來表示執行日期
- 將繁重的業務邏輯直接放在 DAG 檔案中,而非從獨立的模組匯入
常見問題
在部署到生產環境之前,我該如何測試我的 DAG?
使用 DagBag 類別在單元測試中載入和驗證 DAG。使用 pytest 分別測試個別任務函式。在本地 Airflow 執行個體中以 catchup=False 執行 DAG 以驗證執行流程。
運算子和感測器之間有什麼區別?
運算子執行動作,如執行 Python 程式碼或查詢。感測器是特殊的運算子,會等待外部條件,如檔案可用性、API 回應或其他 DAG 完成後才繼續執行。
我如何在 Airflow 的任務之間傳遞資料?
對於小型資料,透過從任務函式回傳值來使用 XCom。TaskFlow API 會自動處理 XCom。對於大型資料,將其儲存在外部儲存空間如 S3,並在任務之間傳遞參考。
為什麼我的任務在失敗後沒有重試?
確保在 default_args 或個別任務定義中設定 retries 和 retry_delay。檢查任務是否未在排程階段失敗。驗證 retry_exponential_backoff 以確保延遲遞增。
我該如何處理 DAG 對其他 DAG 的依賴?
使用 ExternalTaskSensor 等待上游 DAG 中的特定任務完成。設定 execution_date_fn 以在相依 DAG 之間比對正確的執行日期。
我應該為清理任務使用什麼觸發規則?
對於無論上游成功或失敗都必須執行的清理任務,使用 TriggerRule.ALL_DONE。對於僅在所有上游任務成功後才應執行的任務,使用 TriggerRule.ALL_SUCCESS。