Compétences airflow-dag-patterns
📦

airflow-dag-patterns

Sûr

Apache Airflow DAGs mit Produktionsmustern erstellen

Également disponible depuis: wshobson

Das Erstellen produktionsreifer Airflow DAGs erfordert das Verständnis von Operatoren, Sensoren und Fehlerbehandlungsmustern. Diese Skill bietet erprobte Vorlagen und Best Practices für die Datenpipeline-Orchestrierung.

Prend en charge: Claude Codex Code(CC)
🥉 75 Bronze
1

Télécharger le ZIP du skill

2

Importer dans Claude

Allez dans Paramètres → Capacités → Skills → Importer un skill

3

Activez et commencez à utiliser

Tester

Utilisation de "airflow-dag-patterns". Erstellen Sie einen täglichen ETL-DAG mit Extract-, Transform- und Load-Tasks

Résultat attendu:

  • DAG erstellt mit Zeitplan '0 6 * * *' (täglich um 6 Uhr)
  • Drei PythonOperator-Tasks: extract_data, transform_data, load_data
  • Task-Abhängigkeiten: extract >> transform >> load
  • Wiederholungskonfiguration: 3 Wiederholungen mit exponentiellem Backoff von 5 Minuten
  • E-Mail-Benachrichtigungen für Task-Fehler konfiguriert

Utilisation de "airflow-dag-patterns". Fügen Sie einen Sensor hinzu, der auf S3-Dateien vor der Verarbeitung wartet

Résultat attendu:

  • S3KeySensor hinzugefügt mit 2-Stunden-Timeout und 5-Minuten-Poke-Intervall
  • Sensor konfiguriert mit mode='reschedule', um Worker-Slots freizugeben
  • Verarbeitungs-Task als Downstream-Abhängigkeit des Sensors gesetzt
  • DAG wartet jetzt auf Dateiverfügbarkeit vor der Ausführung

Audit de sécurité

Sûr
v1 • 2/24/2026

Static analysis detected 41 patterns but all are false positives. Backtick characters are markdown formatting for code blocks, not shell execution. globals() usage is standard Airflow pattern for dynamic DAG generation. URLs are documentation references. This is educational documentation with no executable security risks.

2
Fichiers analysés
554
Lignes analysées
0
résultats
1
Total des audits
Aucun problème de sécurité trouvé
Audité par: claude

Score de qualité

38
Architecture
100
Maintenabilité
87
Contenu
50
Communauté
100
Sécurité
100
Conformité aux spécifications

Ce que vous pouvez construire

Data-Engineering-Teams, die ETL-Pipelines erstellen

Erstellen Sie geplante Daten-Pipelines, die Daten aus Quellen extrahieren, transformieren und in Warehouses laden, mit korrekter Fehlerbehandlung und Überwachung.

Analytics-Teams, die Berichtserstellung orchestrieren

Planen Sie automatisierte Berichtserstellungs-Tasks mit Abhängigkeiten von Datenverfügbarkeit und dem Abschluss vorgelagerter Verarbeitung.

ML-Ingenieure, die Modelle planmäßig trainieren

Orchestrieren Sie Machine-Learning-Training-Pipelines mit Datenvalidierung, Modelstraining und Bereitstellungsstufen.

Essayez ces prompts

Grundlegende DAG-Erstellung
Erstellen Sie einen Airflow DAG, der täglich um 6 Uhr morgens läuft, um Daten aus einer CSV-Datei zu extrahieren, zu transformieren und in eine Datenbank zu laden. Fügen Sie grundlegende Fehlerbehandlung mit Wiederholungen hinzu.
Dynamische DAG-Generierung
Generieren Sie mehrere ähnliche DAGs aus einer Konfigurationsliste zur Verarbeitung verschiedener Datenquellen. Jeder DAG sollte die gleiche Struktur haben, aber unterschiedliche Zeitpläne und Quellpfade.
Verzweigungspipeline mit Datenqualitätsprüfungen
Erstellen Sie einen DAG, der Datenqualitätsmetriken prüft und basierend auf der Qualitätsbewertung zu verschiedenen Verarbeitungspfaden verzweigt. Fügen Sie nach der Verzweigung korrekte Join-Logik hinzu.
Sensorbasierte externe Abhängigkeiten
Erstellen Sie einen DAG, der auf das Eintreffen von Dateien in S3 wartet, von einem anderen DAG abhängt und einen API-Gesundheitsendpunkt überwacht, bevor die Verarbeitung beginnt. Verwenden Sie den Reschedule-Modus für Sensoren.

Bonnes pratiques

  • Verwenden Sie die TaskFlow API für saubereren Code und automatische XCom-Übergabe zwischen Tasks
  • Gestalten Sie alle Tasks als idempotent, sodass Wiederholungen keine Datenduplikation verursachen
  • Setzen Sie geeignete Timeouts auf Tasks und verwenden Sie den Reschedule-Modus für Sensoren, um Worker-Ressourcen freizugeben

Éviter

  • Verwendung von depends_on_past=True, das unnötige Engpässe erzeugt und parallele Ausführung blockiert
  • Festcodierung von Daten oder Werten anstelle der Verwendung von Airflow-Makros wie {{ ds }} für Ausführungsdaten
  • Platzierung schwerer Geschäftslogik direkt in DAG-Dateien anstatt des Imports aus separaten Modulen

Foire aux questions

Wie teste ich meine DAGs vor der Bereitstellung in der Produktion?
Verwenden Sie die DagBag-Klasse, um DAGs in Unit-Tests zu laden und zu validieren. Testen Sie einzelne Task-Funktionen separat mit pytest. Führen Sie DAGs in einer lokalen Airflow-Instanz mit catchup=False aus, um den Ausführungsfluss zu verifizieren.
Was ist der Unterschied zwischen Operatoren und Sensoren?
Operatoren führen Aktionen aus, wie das Ausführen von Python-Code oder das Ausführen von Abfragen. Sensoren sind spezielle Operatoren, die auf externe Bedingungen wie Dateiverfügbarkeit, API-Antworten oder andere DAG-Abschlüsse warten, bevor sie fortfahren.
Wie übergebe ich Daten zwischen Tasks in Airflow?
Verwenden Sie XCom für kleine Daten, indem Sie Werte aus Task-Funktionen zurückgeben. Die TaskFlow API behandelt XCom automatisch. Für große Daten speichern Sie in externem Speicher wie S3 und übergeben Referenzen zwischen Tasks.
Warum werden meine Tasks bei Fehlern nicht wiederholt?
Stellen Sie sicher, dass retries und retry_delay in default_args oder einzelnen Task-Definitionen gesetzt sind. Überprüfen Sie, dass der Task nicht während der Planungsphase fehlschlägt. Verifizieren Sie retry_exponential_backoff für progressive Verzögerungserhöhungen.
Wie handhabe ich DAG-Abhängigkeiten von anderen DAGs?
Verwenden Sie ExternalTaskSensor, um auf spezifische Tasks in vorgelagerten DAGs zu warten. Konfigurieren Sie execution_date_fn, um die korrekten Ausführungsdaten zwischen abhängigen DAGs abzugleichen.
Welche Trigger-Regel sollte ich für Bereinigungs-Tasks verwenden?
Verwenden Sie TriggerRule.ALL_DONE für Bereinigung, die unabhängig vom Erfolg oder Fehler upstream ausgeführt werden muss. Verwenden Sie TriggerRule.ALL_SUCCESS für Tasks, die nur ausgeführt werden sollen, wenn alle upstream-Tasks erfolgreich waren.

Détails du développeur

Structure de fichiers