Orchestrating a Regulatory Reporting Pipeline with Apache Airflow

Build a production COREP Airflow DAG: branching quality gates, XCom state passing, SLA alerts, OpenLineage per task, and an immutable audit trail for regulatory submissions.
📅 Day 13 of 18  ·  COREP Governance Pipeline Series  ·  Airflow Orchestration

The eleven modules built across Days 5–12 can each be run independently on the command line. That is fine for development. In production, you need something that runs them in order, on a schedule, retries on transient failures, branches when data quality fails, alerts when SLAs are missed, and produces an immutable audit trail of every execution. That something is Apache Airflow.

But Airflow for a regulatory reporting pipeline has requirements that a standard data engineering DAG does not. The pipeline runs on a fixed regulatory calendar — month-end, quarter-end. It must branch rather than fail silently: a COREP quality failure is not a pipeline error, it is a data quality incident that requires a different workflow path. Every task execution must be traceable: the regulator can ask “what data was used to compute the CET1 ratio in the Q1 2026 submission?” and you must be able to answer with a run ID, a lineage graph, and a timestamp.

This post builds the complete dags/corep_pipeline_dag.py from scratch, explains every design decision, and shows you how to read the Airflow UI to understand what happened in each run.

1. Why Airflow Over Prefect, Dagster, or a Cron Job

| Requirement | Cron | Prefect / Dagster | Airflow 2.9 |
|---|---|---|---|
| Fixed regulatory calendar schedule | Yes | Yes | Yes |
| Conditional branching (quality fail → quarantine) | No | Yes | Yes |
| Task-level retry with exponential backoff | No | Yes | Yes |
| SLA miss alerting per task | No | Plugin | Native |
| XCom: pass state between tasks without files | No | Yes | Yes |
| OpenLineage integration (per-task lineage) | No | Manual | apache-airflow-providers-openlineage |
| Immutable run history for audit | No (log rotation) | Yes | Yes (PostgreSQL metadata DB) |
| Self-hosted, open source, no vendor lock-in | Yes | OSS core only | Apache 2.0 |
| EBA/ECB audit familiarity (widely deployed in EU banks) | No | Emerging | Dominant |
🔎 The Decisive Factor: Native OpenLineage Integration

On Airflow 2.7 and later, the apache-airflow-providers-openlineage package (the successor to the standalone openlineage-airflow package) instruments every Airflow task automatically. When a PythonOperator runs your module, Airflow emits an OpenLineage START event before the task executes, with the DAG run recorded as its parent run, and a COMPLETE or FAIL event afterwards. Marquez therefore receives run-level lineage events for every task in every DAG run, all linked through the parent DAG run. The result is a queryable graph: asking for the full lineage of the DAG run behind the 2026-03-31 submission returns every task run in order, together with every dataset those tasks reported. PythonOperator has no built-in dataset extractor, so the input and output datasets come from the modules' own lineage events or from declared task inlets/outlets.
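The provider builds these events itself; the sketch below only makes their shape and the parent link concrete. It assembles a START/COMPLETE pair as plain dictionaries following the OpenLineage RunEvent layout. The producer and schema URLs are placeholders (assumptions), and the UUIDs are random rather than derived from a real task instance.

```python
import json
import uuid
from datetime import datetime, timezone

NAMESPACE = "corep-governance-pipeline"
PRODUCER = "https://example.com/corep-lineage-sketch"    # placeholder producer URI
SCHEMA_URL = "https://example.com/schemas/RunEvent.json"  # placeholder, not the real spec URL


def run_event(event_type: str, task_run_id: str, dag_run_id: str, task_id: str) -> dict:
    """Minimal OpenLineage-style RunEvent whose parent facet points at the DAG run."""
    return {
        "eventType": event_type,                          # START, COMPLETE or FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
        "job": {"namespace": NAMESPACE, "name": f"corep_monthly_pipeline.{task_id}"},
        "run": {
            "runId": task_run_id,                         # same ID on START and COMPLETE
            "facets": {
                "parent": {                               # links the task run to its DAG run
                    "_producer": PRODUCER,
                    "_schemaURL": SCHEMA_URL,
                    "run": {"runId": dag_run_id},
                    "job": {"namespace": NAMESPACE, "name": "corep_monthly_pipeline"},
                },
            },
        },
        "inputs": [],    # a PythonOperator task reports no datasets on its own
        "outputs": [],
    }


dag_run_id = str(uuid.uuid4())
task_run_id = str(uuid.uuid4())
start = run_event("START", task_run_id, dag_run_id, "run_ingest")
complete = run_event("COMPLETE", task_run_id, dag_run_id, "run_ingest")
print(json.dumps(start["run"]["facets"]["parent"], indent=2))
```

Because every task event carries the same parent run ID, a single lookup on the DAG run is enough to collect all of its task runs.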

2. The Five Airflow Concepts You Must Understand

| Concept | What it is | How we use it |
|---|---|---|
| DAG | Directed Acyclic Graph: the pipeline definition. A Python file that declares tasks and their dependencies. | One DAG, corep_monthly_pipeline, runs at 01:00 UTC on the last day of each month. |
| Operator | A task template. PythonOperator runs a Python function. BranchPythonOperator runs a function that returns the ID of the next task to run (branching logic). | All module tasks use PythonOperator. The two quality gates and the XBRL validation result are routed by BranchPythonOperator. |
| XCom | Cross-task communication. A task pushes a small value to the Airflow metadata DB; a downstream task pulls it by task ID and key. | The quality and validation tasks push "PASS" or "FAIL" to XCom; the BranchPythonOperator that follows pulls the value to decide the route. |
| TriggerRule | The condition under which a task runs. The default is ALL_SUCCESS. NONE_FAILED_MIN_ONE_SUCCESS runs a task when no upstream task failed and at least one succeeded, which suits a task that must run after a branch regardless of which path was taken. | The final audit-log task uses NONE_FAILED_MIN_ONE_SUCCESS, so it runs whether the pipeline passed or was quarantined. It does not run if an upstream task raised an exception. |
| SLA | Service Level Agreement: the time, measured from the scheduled start of the DAG run, by which a task should have finished. If it is exceeded, Airflow calls the DAG's sla_miss_callback. SLAs are not checked for manually triggered runs. | Set on the XBRL generation and validation tasks. The COREP submission deadline is typically T+15 business days. An SLA breach alerts the data governance team. |
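To see why the audit task needs NONE_FAILED_MIN_ONE_SUCCESS rather than the default, here is a simplified model of both rules, evaluated once every upstream task has finished. The functions are illustrative, not Airflow's own dependency-checking code, and the states in the crash scenario are the ones the scheduler would propagate after run_dbt raises.

```python
from __future__ import annotations

FAILED = {"failed", "upstream_failed"}


def all_success(upstream: list[str]) -> str:
    """Default rule: run only if every upstream task succeeded."""
    if any(s in FAILED for s in upstream):
        return "upstream_failed"
    if any(s == "skipped" for s in upstream):
        return "skipped"
    return "run"


def none_failed_min_one_success(upstream: list[str]) -> str:
    """Run if nothing upstream failed and at least one upstream task succeeded."""
    if any(s in FAILED for s in upstream):
        return "upstream_failed"
    if "success" not in upstream:
        return "skipped"
    return "run"


# Upstream of write_final_audit_log, in order: build_submission_package,
# quarantine_raw_failure, quarantine_mart_failure, quarantine_xbrl_failure
happy_path = ["success", "skipped", "skipped", "skipped"]
raw_quarantine = ["skipped", "success", "skipped", "skipped"]
dbt_crashed = ["upstream_failed", "skipped", "upstream_failed", "upstream_failed"]

for name, states in [("happy_path", happy_path), ("raw_quarantine", raw_quarantine),
                     ("dbt_crashed", dbt_crashed)]:
    print(f"{name:15} ALL_SUCCESS={all_success(states):16} "
          f"NONE_FAILED_MIN_ONE_SUCCESS={none_failed_min_one_success(states)}")
```

Under ALL_SUCCESS the audit task would be skipped on every run, because three of its four upstream tasks are always skipped by a branch.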
📌 Full DAG Architecture

3. The Complete DAG Flow

corep_monthly_pipeline  ·  schedule: last day of month 01:00 UTC  ·  Airflow 2.9 LocalExecutor

START
  │
  ▼
[ check_reporting_date ]      PythonOperator
  Derives reporting_date from data_interval_end (or dag_run.conf on manual runs)
  Pushes to XCom: reporting_date, reporting_period
  Verifies it is a valid EBA reporting date (quarter-end or month-end)
  │
  ▼
[ run_ingest ]               PythonOperator · retry=2 · retry_delay=5min
  IngestModule.run()
  Loads 6 CSVs → raw.* tables, uploads to MinIO
  │
  ▼
[ run_quality_layer1 ]        PythonOperator → pushes quality_gate_status to XCom
  QualityModule.run() — raw layer gates only
  │
  ▼
[ branch_quality_layer1 ]     BranchPythonOperator
  quality_gate_status == "PASS" → [ run_dbt ]
  quality_gate_status == "FAIL" → [ quarantine_raw_failure ]
  │                                      │
  ▼                                      ▼
[ run_dbt ]                  [ quarantine_raw_failure ]
  dbt run → all staging,       Archives raw tables to MinIO/quarantine/
  intermediate, mart models    Sends Slack/email alert
  dbt test                     Writes quarantine audit marker
  │
  ▼
[ run_quality_layer2 ]        PythonOperator → pushes mart_gate_status to XCom
  QualityModule.run() — mart layer gates
  │
  ▼
[ branch_quality_layer2 ]     BranchPythonOperator
  mart_gate_status == "PASS" → [ run_catalog ]
  mart_gate_status == "FAIL" → [ quarantine_mart_failure ]
  │
  ▼
[ run_catalog ]              PythonOperator
  CatalogModule.run() — OM metadata ingestion
  │
  ├─────────────────────────┐
  ▼                         ▼
[ run_security ]         [ refresh_superset_metadata ]  (can run in parallel)
  SecurityModule.run()     Calls Superset API to refresh schema cache
  │                         │
  └─────────┬───────────────┘
            ▼
[ run_xbrl_gen ]             PythonOperator · SLA=60min
  XbrlGenModule.run()
  Generates XBRL instance → MinIO corep-xbrl-output
  │
  ▼
[ run_xbrl_valid ]           PythonOperator · SLA=30min → pushes xbrl_valid_status
  XbrlValidModule.run()
  │
  ▼
[ branch_xbrl_valid ]        BranchPythonOperator
  xbrl_valid_status == "PASS" → [ build_submission_package ]
  xbrl_valid_status == "FAIL" → [ quarantine_xbrl_failure ]
  │
  ▼
[ build_submission_package ] PythonOperator
  Bundles: XBRL instance + validation report (JSON) + GX data docs index
  + manifest.json (reporting date, DAG run ID, XBRL file names)
  Creates COREP_<reporting_date>_<timestamp>.zip in corep-xbrl-output/submissions/
  │
  ▼
[ write_final_audit_log ]    PythonOperator
  TriggerRule: NONE_FAILED_MIN_ONE_SUCCESS
  Writes complete run summary to audit.pipeline_run_log
  Includes: gate statuses (quality L1, quality L2, XBRL validation), submission package name, run_id
  │
  ▼
END
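The first task is where a subtle scheduling detail bites. For a cron schedule, Airflow sets logical_date to the start of the data interval, so the run that fires at 2026-03-31 01:00 UTC has a logical_date of 2026-02-28. Below is a minimal sketch of deriving the date from data_interval_end instead, with a manual override and a month-end check. derive_reporting_date and is_month_end are hypothetical helpers, and the plain datetime arguments stand in for Airflow's context values.

```python
from __future__ import annotations

import calendar
from datetime import date, datetime, timezone


def is_month_end(d: date) -> bool:
    """True if d is the last calendar day of its month."""
    return d.day == calendar.monthrange(d.year, d.month)[1]


def derive_reporting_date(data_interval_end: datetime, conf: dict | None = None) -> tuple[str, str]:
    """Return (reporting_date, period_type); a conf override wins over the schedule."""
    override = (conf or {}).get("reporting_date")
    day = date.fromisoformat(override) if override else data_interval_end.date()
    if not is_month_end(day):
        raise ValueError(f"{day} is not a month-end reporting date")
    period_type = "QUARTERLY" if day.month in (3, 6, 9, 12) else "MONTHLY"
    return day.isoformat(), period_type


# The scheduled run that fires at 2026-03-31 01:00 UTC covers this data interval:
interval_start = datetime(2026, 2, 28, 1, 0, tzinfo=timezone.utc)  # what logical_date holds
interval_end = datetime(2026, 3, 31, 1, 0, tzinfo=timezone.utc)

print(interval_start.date())                                                  # 2026-02-28 (wrong period)
print(derive_reporting_date(interval_end))                                    # ('2026-03-31', 'QUARTERLY')
print(derive_reporting_date(interval_end, {"reporting_date": "2026-04-30"}))  # ('2026-04-30', 'MONTHLY')
```

Rejecting non-month-end dates up front means a mistyped --conf value fails in the first task instead of producing a submission for a date the regulator never asked for.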
📄 The Complete DAG File

4. dags/corep_pipeline_dag.py

"""
dags/corep_pipeline_dag.py — Monthly COREP regulatory reporting pipeline.

Schedule: last day of each month at 01:00 UTC.
Executor:  LocalExecutor (configured in docker-compose.yml).

Design principles:
  1. Every module runs in a PythonOperator; no BashOperators (dbt is invoked via subprocess inside its PythonOperator).
  2. Quality failures branch to quarantine — they never silently proceed.
  3. XCom carries only status strings and paths — never DataFrames.
  4. The final audit task runs regardless of branch taken.
  5. SLAs are set on the two slowest tasks (xbrl_gen, xbrl_valid).
"""

from __future__ import annotations

import json
import logging
import os
import subprocess
from datetime import datetime, timedelta, timezone
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.dates import days_ago

log = logging.getLogger(__name__)

# ── Project root (mounted into the Airflow container) ────────────────
PROJECT_ROOT = Path(os.environ.get("COREP_PROJECT_ROOT", "/opt/corep"))
DBT_PROJECT  = PROJECT_ROOT / "dbt"

# ── Default task arguments ───────────────────────────────────────────
DEFAULT_ARGS = {
    "owner":            "corep-pipeline",
    "depends_on_past":  False,
    "email_on_failure": True,
    "email_on_retry":   False,
    "email":            [os.environ.get("ALERT_EMAIL", "governance@example.com")],
    "retries":          2,
    "retry_delay":      timedelta(minutes=5),
    "retry_exponential_backoff": True,
}

# ── SLA callback — fires when a task exceeds its SLA ─────────────────
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    # task_list arrives as a pre-formatted string ("<task_id> on <date>" per line);
    # the SlaMiss records in `slas` carry the task IDs themselves.
    missed = sorted({sla.task_id for sla in slas})
    log.error(
        "[airflow] SLA MISS on DAG=%s tasks=%s; governance team alerted.",
        dag.dag_id,
        missed,
    )
    # In production: send to PagerDuty / Slack webhook
    _slack_alert(f":warning: SLA breach on {dag.dag_id}: " + ", ".join(missed))


def _slack_alert(message: str) -> None:
    webhook = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook:
        log.info("[airflow] No SLACK_WEBHOOK_URL set — alert not sent.")
        return
    try:
        import urllib.request
        payload = json.dumps({"text": message}).encode()
        req = urllib.request.Request(
            webhook, data=payload, headers={"Content-Type": "application/json"}
        )
        urllib.request.urlopen(req, timeout=10)
    except Exception as exc:
        log.warning("[airflow] Slack alert failed (non-fatal): %s", exc)


# ═══════════════════════════════════════════════════════════════════════
# TASK FUNCTIONS
# Each function is a PythonOperator callable.
# Signature must accept **context to receive the Airflow task context dict.
# ═══════════════════════════════════════════════════════════════════════

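def task_check_reporting_date(**context) -> None:
    """Derive and validate the reporting date for this DAG run.

    For a cron schedule, logical_date is the START of the data interval (the
    previous month end), so a scheduled run takes the date from
    data_interval_end. A manual run may override it with
    --conf '{"reporting_date": "YYYY-MM-DD"}'.
    """
    conf = context["dag_run"].conf or {}
    if conf.get("reporting_date"):
        reporting_day = datetime.strptime(conf["reporting_date"], "%Y-%m-%d").date()
    else:
        reporting_day = context["data_interval_end"].date()   # pendulum.DateTime → date

    # COREP reference dates are month-ends: the next day must fall in a new month
    if (reporting_day + timedelta(days=1)).month == reporting_day.month:
        raise ValueError(f"[check_reporting_date] {reporting_day} is not a month-end date")
    reporting_date = reporting_day.strftime("%Y-%m-%d")

    # Quarter-ends (03-31, 06-30, 09-30, 12-31) are QUARTERLY, all others MONTHLY
    period_type = "QUARTERLY" if reporting_day.month in (3, 6, 9, 12) else "MONTHLY"

    log.info("[check_reporting_date] date=%s period=%s", reporting_date, period_type)
    ti = context["task_instance"]
    ti.xcom_push(key="reporting_date",   value=reporting_date)
    ti.xcom_push(key="reporting_period", value=period_type)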


def task_run_ingest(**context) -> None:
    from modules.ingest import IngestModule
    reporting_date = context["task_instance"].xcom_pull(
        task_ids="check_reporting_date", key="reporting_date"
    )
    log.info("[ingest] Running for reporting_date=%s", reporting_date)
    IngestModule(run_context={"reporting_date": reporting_date}).run()


def task_run_quality_layer1(**context) -> None:
    """Run raw-layer quality gates. Push PASS/FAIL status to XCom."""
    from modules.quality import QualityModule, QualityGateError
    ti = context["task_instance"]
    try:
        QualityModule(layer="raw").run()
        ti.xcom_push(key="quality_gate_status", value="PASS")
    except QualityGateError as exc:
        log.error("[quality_layer1] Gate FAILED: %s", exc)
        ti.xcom_push(key="quality_gate_status", value="FAIL")


def branch_quality_layer1(**context) -> str:
    status = context["task_instance"].xcom_pull(
        task_ids="run_quality_layer1", key="quality_gate_status"
    )
    return "run_dbt" if status == "PASS" else "quarantine_raw_failure"


def task_quarantine_raw_failure(**context) -> None:
    """Handle a Layer 1 failure: alert the governance team and write a quarantine audit marker."""
    reporting_date = context["task_instance"].xcom_pull(
        task_ids="check_reporting_date", key="reporting_date"
    )
    log.error("[quarantine] Layer 1 quality FAILED for %s — routing to quarantine.", reporting_date)
    _slack_alert(
        f":x: COREP Layer 1 quality gate FAILED for {reporting_date}. "
        "Pipeline halted — check GX data docs in MinIO corep-gx-reports."
    )
    # Write quarantine marker to audit log
    _write_audit_marker(context, "quarantine_raw", "FAIL")


def task_run_dbt(**context) -> None:
    """Run dbt staging → intermediate → mart + dbt test."""
    reporting_date = context["task_instance"].xcom_pull(
        task_ids="check_reporting_date", key="reporting_date"
    )
    env = {
        **os.environ,
        "REPORTING_DATE": reporting_date,
    }
    for cmd in [
        ["dbt", "run",  "--profiles-dir", str(DBT_PROJECT), "--project-dir", str(DBT_PROJECT)],
        ["dbt", "test", "--profiles-dir", str(DBT_PROJECT), "--project-dir", str(DBT_PROJECT)],
    ]:
        result = subprocess.run(cmd, capture_output=True, text=True, env=env, check=False)
        log.info("[dbt] stdout:\n%s", result.stdout[-3000:])
        if result.returncode != 0:
            # dbt reports most errors on stdout, so fall back to it when stderr is empty
            raise RuntimeError(
                f"[dbt] Command failed: {' '.join(cmd)}\n" + (result.stderr or result.stdout)[-2000:]
            )
    log.info("[dbt] All models and tests passed.")


def task_run_quality_layer2(**context) -> None:
    from modules.quality import QualityModule, QualityGateError
    ti = context["task_instance"]
    try:
        QualityModule(layer="mart").run()
        ti.xcom_push(key="mart_gate_status", value="PASS")
    except QualityGateError as exc:
        log.error("[quality_layer2] Gate FAILED: %s", exc)
        ti.xcom_push(key="mart_gate_status", value="FAIL")


def branch_quality_layer2(**context) -> str:
    status = context["task_instance"].xcom_pull(
        task_ids="run_quality_layer2", key="mart_gate_status"
    )
    return "run_catalog" if status == "PASS" else "quarantine_mart_failure"


def task_quarantine_mart_failure(**context) -> None:
    reporting_date = context["task_instance"].xcom_pull(
        task_ids="check_reporting_date", key="reporting_date"
    )
    log.error("[quarantine] Layer 2 mart quality FAILED for %s.", reporting_date)
    _slack_alert(
        f":x: COREP mart quality gate FAILED for {reporting_date}. "
        "dbt mart models produced values that failed GX expectations. "
        "Check mart_corep_suite results in MinIO corep-gx-reports."
    )
    _write_audit_marker(context, "quarantine_mart", "FAIL")


def task_run_catalog(**context) -> None:
    from modules.catalog import CatalogModule
    CatalogModule().run()


def task_run_security(**context) -> None:
    from modules.security import SecurityModule
    SecurityModule().run()


def task_refresh_superset(**context) -> None:
    """Refresh Superset schema cache so new mart columns are visible."""
    superset_url  = os.environ.get("SUPERSET_URL", "http://superset:8088")
    admin_user    = os.environ.get("SUPERSET_ADMIN_USER", "admin")
    admin_pass    = os.environ.get("SUPERSET_ADMIN_PASS", "admin")
    try:
        import urllib.request
        # Log in to obtain a JWT access token
        login_data = json.dumps({
            "username": admin_user, "password": admin_pass,
            "provider": "db", "refresh": True,
        }).encode()
        req  = urllib.request.Request(
            f"{superset_url}/api/v1/security/login",
            data=login_data,
            headers={"Content-Type": "application/json"},
        )
        resp = json.loads(urllib.request.urlopen(req, timeout=15).read())
        token = resp["access_token"]
        # Re-list the schemas of database id 1 with force=true (rison "(force:!t)"),
        # which makes Superset bypass and refresh its cached schema list
        refresh_req = urllib.request.Request(
            f"{superset_url}/api/v1/database/1/schemas/?q=(force:!t)",
            headers={"Authorization": f"Bearer {token}"},
        )
        urllib.request.urlopen(refresh_req, timeout=15)
        log.info("[superset] Schema cache refreshed.")
    except Exception as exc:
        log.warning("[superset] Refresh failed (non-fatal): %s", exc)


def task_run_xbrl_gen(**context) -> None:
    from modules.xbrl_gen import XbrlGenModule
    reporting_date = context["task_instance"].xcom_pull(
        task_ids="check_reporting_date", key="reporting_date"
    )
    XbrlGenModule(run_context={"reporting_date": reporting_date}).run()


def task_run_xbrl_valid(**context) -> None:
    from modules.xbrl_valid import XbrlValidModule, XbrlValidationError
    ti = context["task_instance"]
    try:
        XbrlValidModule().run()
        ti.xcom_push(key="xbrl_valid_status", value="PASS")
    except XbrlValidationError as exc:
        log.error("[xbrl_valid] Validation FAILED: %s", exc)
        ti.xcom_push(key="xbrl_valid_status", value="FAIL")


def branch_xbrl_valid(**context) -> str:
    status = context["task_instance"].xcom_pull(
        task_ids="run_xbrl_valid", key="xbrl_valid_status"
    )
    return "build_submission_package" if status == "PASS" else "quarantine_xbrl_failure"


def task_quarantine_xbrl_failure(**context) -> None:
    reporting_date = context["task_instance"].xcom_pull(
        task_ids="check_reporting_date", key="reporting_date"
    )
    log.error("[quarantine] XBRL validation FAILED for %s.", reporting_date)
    _slack_alert(
        f":x: COREP XBRL validation FAILED for {reporting_date}. "
        "EBA formula rules not satisfied. Check validation report in MinIO corep-xbrl-output."
    )
    _write_audit_marker(context, "quarantine_xbrl", "FAIL")


def task_build_submission_package(**context) -> None:
    """
    Bundle the XBRL instance + all audit artefacts into a submission package ZIP.
    Uploaded to MinIO corep-xbrl-output/submissions/.
    """
    import tempfile
    import zipfile
    from minio import Minio

    ti             = context["task_instance"]
    reporting_date = ti.xcom_pull(task_ids="check_reporting_date", key="reporting_date")
    ts             = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    pkg_name       = f"COREP_{reporting_date}_{ts}.zip"

    xbrl_dir   = Path(os.environ.get("XBRL_OUTPUT_DIR",   "output/xbrl"))
    report_dir = Path(os.environ.get("XBRL_REPORT_DIR",  "output/validation_reports"))
    gx_dir     = Path("gx/uncommitted/data_docs/local_site")

    with tempfile.TemporaryDirectory() as tmp:
        pkg_path = Path(tmp) / pkg_name
        with zipfile.ZipFile(pkg_path, "w", zipfile.ZIP_DEFLATED) as zf:
            # XBRL instance
            for f in sorted(xbrl_dir.glob("*.xbrl")):
                zf.write(f, f"xbrl/{f.name}")
            # Validation report
            for f in sorted(report_dir.glob("*.json")):
                zf.write(f, f"validation/{f.name}")
            # GX data docs index
            index_html = gx_dir / "index.html"
            if index_html.exists():
                zf.write(index_html, "quality/gx_data_docs_index.html")
            # Pipeline manifest (run metadata)
            manifest = {
                "reporting_date": reporting_date,
                "packaged_at":    ts,
                "dag_run_id":     context["run_id"],
                "airflow_version": "2.9",
                "xbrl_files":     [f.name for f in sorted(xbrl_dir.glob("*.xbrl"))],
            }
            zf.writestr("manifest.json", json.dumps(manifest, indent=2))

        # Upload to MinIO
        client = Minio(
            os.environ.get("MINIO_ENDPOINT", "minio:9000"),
            access_key=os.environ.get("MINIO_ROOT_USER",     "minioadmin"),
            secret_key=os.environ.get("MINIO_ROOT_PASSWORD", "minioadmin"),
            secure=False,
        )
        bucket = "corep-xbrl-output"
        if not client.bucket_exists(bucket):
            client.make_bucket(bucket)
        client.fput_object(bucket, f"submissions/{pkg_name}", str(pkg_path))
        log.info("[submission] Package uploaded → minio://%s/submissions/%s", bucket, pkg_name)
        ti.xcom_push(key="submission_package", value=pkg_name)


def task_write_final_audit_log(**context) -> None:
    """Write the complete pipeline run summary to audit.pipeline_run_log."""
    from modules.base import _pg_conn

    ti             = context["task_instance"]
    reporting_date = ti.xcom_pull(task_ids="check_reporting_date", key="reporting_date")
    q1_status      = ti.xcom_pull(task_ids="run_quality_layer1",    key="quality_gate_status") or "SKIP"
    q2_status      = ti.xcom_pull(task_ids="run_quality_layer2",    key="mart_gate_status")    or "SKIP"
    xv_status      = ti.xcom_pull(task_ids="run_xbrl_valid",        key="xbrl_valid_status")   or "SKIP"
    pkg_name       = ti.xcom_pull(task_ids="build_submission_package", key="submission_package") or "N/A"

    overall = "PASS" if xv_status == "PASS" else "FAIL"

    conn = _pg_conn()
    try:
        cur = conn.cursor()
        cur.execute(
            """
            INSERT INTO audit.pipeline_run_log
                (run_id, module_name, status, metadata, ran_at)
            VALUES (%s, 'pipeline_complete', %s, %s, now())
            """,
            (
                context["run_id"],
                overall,
                json.dumps({
                    "reporting_date":    reporting_date,
                    "dag_run_id":        context["run_id"],
                    "quality_layer1":    q1_status,
                    "quality_layer2":    q2_status,
                    "xbrl_validation":   xv_status,
                    "submission_package": pkg_name,
                }),
            ),
        )
        conn.commit()
        log.info("[audit] Final run summary written — overall=%s run_id=%s", overall, context["run_id"])
    finally:
        conn.close()

    if overall == "PASS":
        _slack_alert(
            f":white_check_mark: COREP pipeline PASSED for {reporting_date}. "
            f"Submission package: {pkg_name}"
        )


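def _write_audit_marker(context: dict, stage: str, status: str) -> None:
    """Write a stage marker; metadata carries reporting_date so audit queries can filter on it."""
    from modules.base import _pg_conn
    reporting_date = context["task_instance"].xcom_pull(
        task_ids="check_reporting_date", key="reporting_date"
    )
    conn = _pg_conn()
    try:
        cur = conn.cursor()
        cur.execute(
            """
            INSERT INTO audit.pipeline_run_log
                (run_id, module_name, status, metadata, ran_at)
            VALUES (%s, %s, %s, %s, now())
            """,
            (
                context["run_id"], stage, status,
                json.dumps({"reporting_date": reporting_date, "dag_run_id": context["run_id"]}),
            ),
        )
        conn.commit()
    finally:
        conn.close()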


# ═══════════════════════════════════════════════════════════════════════
# DAG DEFINITION
# ═══════════════════════════════════════════════════════════════════════

with DAG(
    dag_id="corep_monthly_pipeline",
    description="Monthly COREP regulatory reporting: ingest → quality → dbt → catalog → security → XBRL → validate → package",
    default_args=DEFAULT_ARGS,
    schedule="0 1 L * *",            # 01:00 UTC on the last day of every month
    start_date=datetime(2025, 1, 1, tzinfo=timezone.utc),  # fixed past date; catchup=False skips history
    catchup=False,
    max_active_runs=1,               # Only ONE pipeline run at a time
    tags=["corep", "regulatory", "eba", "governance"],
    sla_miss_callback=sla_miss_callback,
    doc_md="""
    ## COREP Monthly Regulatory Reporting Pipeline

    Runs on the last calendar day of each month at 01:00 UTC.
    Produces a validated XBRL instance for EBA COREP supervisory reporting.

    ### Modules
    | Step | Module | Output |
    |------|--------|--------|
    | 1 | ingest | raw.* tables |
    | 2 | quality_layer1 | GX raw checkpoints |
    | 3 | dbt | staging/intermediate/mart |
    | 4 | quality_layer2 | GX mart checkpoints |
    | 5 | catalog | OpenMetadata indexed |
    | 6 | security | Ranger policies applied |
    | 7 | xbrl_gen | .xbrl instance file |
    | 8 | xbrl_valid | Arelle validation report |
    | 9 | submission | ZIP package in MinIO |
    """,
) as dag:

    t_check_date       = PythonOperator(task_id="check_reporting_date",  python_callable=task_check_reporting_date)
    t_ingest           = PythonOperator(task_id="run_ingest",             python_callable=task_run_ingest)
    t_quality1         = PythonOperator(task_id="run_quality_layer1",     python_callable=task_run_quality_layer1)
    t_branch_q1        = BranchPythonOperator(task_id="branch_quality_layer1", python_callable=branch_quality_layer1)
    t_quarantine_raw   = PythonOperator(task_id="quarantine_raw_failure",  python_callable=task_quarantine_raw_failure)
    t_dbt              = PythonOperator(task_id="run_dbt",                python_callable=task_run_dbt)
    t_quality2         = PythonOperator(task_id="run_quality_layer2",     python_callable=task_run_quality_layer2)
    t_branch_q2        = BranchPythonOperator(task_id="branch_quality_layer2", python_callable=branch_quality_layer2)
    t_quarantine_mart  = PythonOperator(task_id="quarantine_mart_failure", python_callable=task_quarantine_mart_failure)
    t_catalog          = PythonOperator(task_id="run_catalog",            python_callable=task_run_catalog)
    t_security         = PythonOperator(task_id="run_security",           python_callable=task_run_security)
    t_superset         = PythonOperator(task_id="refresh_superset_metadata", python_callable=task_refresh_superset)
    t_xbrl_gen         = PythonOperator(
        task_id="run_xbrl_gen",
        python_callable=task_run_xbrl_gen,
        sla=timedelta(hours=1),     # alert if not finished 60 min after the scheduled run start
    )
    t_xbrl_valid       = PythonOperator(
        task_id="run_xbrl_valid",
        python_callable=task_run_xbrl_valid,
        sla=timedelta(minutes=30),  # alert if not finished 30 min after the scheduled run start
    )
    t_branch_xv        = BranchPythonOperator(task_id="branch_xbrl_valid",     python_callable=branch_xbrl_valid)
    t_quarantine_xbrl  = PythonOperator(task_id="quarantine_xbrl_failure", python_callable=task_quarantine_xbrl_failure)
    t_submission       = PythonOperator(task_id="build_submission_package", python_callable=task_build_submission_package)
    t_audit_final      = PythonOperator(
        task_id="write_final_audit_log",
        python_callable=task_write_final_audit_log,
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,  # runs on the pass path and every quarantine path
    )

    # ── Wire dependencies ────────────────────────────────────────────────
    t_check_date >> t_ingest >> t_quality1 >> t_branch_q1
    t_branch_q1  >> [t_dbt, t_quarantine_raw]
    t_dbt        >> t_quality2 >> t_branch_q2
    t_branch_q2  >> [t_catalog, t_quarantine_mart]
    t_catalog    >> [t_security, t_superset]
    [t_security, t_superset] >> t_xbrl_gen
    t_xbrl_gen   >> t_xbrl_valid >> t_branch_xv
    t_branch_xv  >> [t_submission, t_quarantine_xbrl]
    [t_submission, t_quarantine_raw, t_quarantine_mart, t_quarantine_xbrl] >> t_audit_final
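Because each branch callable is a plain function of the task context, its routing can be unit-tested without a scheduler or metadata database. This is a minimal sketch: StubTaskInstance is a hypothetical test double that implements only the xcom_pull(task_ids=..., key=...) call the callables use, and the function body copies branch_quality_layer1 from the DAG file above.

```python
from __future__ import annotations


class StubTaskInstance:
    """Hypothetical test double: supports only the xcom_pull call the branch uses."""

    def __init__(self, xcoms: dict[tuple[str, str], str]) -> None:
        self._xcoms = xcoms

    def xcom_pull(self, task_ids: str, key: str) -> str | None:
        return self._xcoms.get((task_ids, key))


def branch_quality_layer1(**context) -> str:
    # Same routing logic as branch_quality_layer1 in dags/corep_pipeline_dag.py
    status = context["task_instance"].xcom_pull(
        task_ids="run_quality_layer1", key="quality_gate_status"
    )
    return "run_dbt" if status == "PASS" else "quarantine_raw_failure"


def route(status: str | None) -> str:
    xcoms = {("run_quality_layer1", "quality_gate_status"): status} if status else {}
    return branch_quality_layer1(task_instance=StubTaskInstance(xcoms))


print(route("PASS"))  # run_dbt
print(route("FAIL"))  # quarantine_raw_failure
print(route(None))    # quarantine_raw_failure: a missing status fails closed
```

The third case is the important one: if a quality task exits without pushing a status, the branch routes to quarantine rather than letting unchecked data reach dbt.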
▶ Triggering and Monitoring the DAG

5. How to Trigger, Monitor, and Debug

5.1 Initial Setup

# Access Airflow UI at http://localhost:8090
# Default credentials: admin / admin (set in docker-compose.yml)

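# Verify the DAG appears. The scheduler picks up new files in dags/ on its
# next folder scan (every dag_dir_list_interval seconds, 300 by default)

# Check for DAG import errors without triggering a run
docker exec corep-airflow airflow dags list-import-errors

# Or import the file directly (needs Airflow installed where you run it)
python -c "from dags.corep_pipeline_dag import dag; print('DAG OK:', dag.dag_id)"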

# Enable the DAG (it starts paused for safety)
# In the UI: toggle the pause switch next to "corep_monthly_pipeline"

# Or via Airflow CLI inside the container:
docker exec corep-airflow airflow dags unpause corep_monthly_pipeline

5.2 Trigger a Manual Run (Development)

# Trigger via CLI, passing the reporting date in --conf (simulates an end-of-month run)
docker exec corep-airflow airflow dags trigger \
    corep_monthly_pipeline \
    --conf '{"reporting_date": "2026-03-31"}'

# Print a snapshot of task states (pass the run_id printed by the trigger command)
docker exec corep-airflow airflow tasks states-for-dag-run \
    corep_monthly_pipeline \
    "manual__2026-05-07T10:00:00+00:00"

# Expected output of a passing run (abridged to task_id and state):
check_reporting_date    success
run_ingest              success
run_quality_layer1      success
branch_quality_layer1   success
run_dbt                 success
run_quality_layer2      success
branch_quality_layer2   success
run_catalog             success
run_security            success
refresh_superset_metadata success
run_xbrl_gen            success
run_xbrl_valid          success
branch_xbrl_valid       success
build_submission_package success
write_final_audit_log   success
quarantine_raw_failure  skipped
quarantine_mart_failure skipped
quarantine_xbrl_failure skipped

# Branches not taken show as "skipped" — this is correct Airflow behaviour
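For scripted checks, the same states-for-dag-run command can print JSON with -o json. The sketch below classifies a finished run from that output, assuming each JSON row carries task_id and state keys. summarise is a hypothetical helper, and the sample rows are illustrative, not captured output.

```python
from __future__ import annotations

import json

QUARANTINE_TASKS = ("quarantine_raw_failure", "quarantine_mart_failure", "quarantine_xbrl_failure")
FAILED = {"failed", "upstream_failed"}


def summarise(states_json: str) -> str:
    """Classify a finished run as ERROR, QUARANTINED:<task> or PASS."""
    states = {row["task_id"]: row["state"] for row in json.loads(states_json)}
    if any(state in FAILED for state in states.values()):
        return "ERROR"  # a task raised: a pipeline error, not a data-quality outcome
    taken = [t for t in QUARANTINE_TASKS if states.get(t) == "success"]
    return f"QUARANTINED:{taken[0]}" if taken else "PASS"


sample_pass = json.dumps([
    {"task_id": "build_submission_package", "state": "success"},
    {"task_id": "quarantine_raw_failure", "state": "skipped"},
])
sample_quarantine = json.dumps([
    {"task_id": "run_dbt", "state": "skipped"},
    {"task_id": "quarantine_raw_failure", "state": "success"},
])
print(summarise(sample_pass))        # PASS
print(summarise(sample_quarantine))  # QUARANTINED:quarantine_raw_failure
```

The helper assumes the run has finished; a run still in progress would need its own check before being reported as PASS.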

5.3 Reading a Quality Failure Run

# When a Layer 1 quality gate fails, the DAG run looks like this:
check_reporting_date    success
run_ingest              success
run_quality_layer1      success  # ← task succeeded; gate FAIL stored in XCom
branch_quality_layer1   success  # ← branch ran and chose quarantine path
quarantine_raw_failure  success  # ← quarantine task ran (status=success but pipeline=FAIL)
run_dbt                 skipped
run_quality_layer2      skipped
... all downstream      skipped
write_final_audit_log   success  # ← runs: no upstream task failed and one (quarantine) succeeded

# The entire DAG run shows as "success" in Airflow because
# no task raised an exception. The FAIL is in the audit log and XCom.
# This is intentional: a quality failure is not a pipeline ERROR,
# it is a DATA QUALITY INCIDENT handled by the designed workflow.

# Query the audit log to confirm:
docker exec corep-postgres psql -U corep_admin -d corep -c \
    "SELECT run_id, module_name, status, metadata->>'reporting_date' AS date
     FROM audit.pipeline_run_log
     ORDER BY ran_at DESC LIMIT 10;"
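The "success" colour follows from how Airflow settles a DAG run: it looks at the leaf tasks. The sketch below is a simplified model of that rule (not Airflow's implementation) applied to this DAG, whose only leaf task is write_final_audit_log.

```python
from __future__ import annotations

FAILED = {"failed", "upstream_failed"}
DONE_OK = {"success", "skipped"}


def dag_run_state(leaf_states: list[str]) -> str:
    """Simplified rule: any failed leaf fails the run; all success/skipped leaves succeed it."""
    if any(s in FAILED for s in leaf_states):
        return "failed"
    if all(s in DONE_OK for s in leaf_states):
        return "success"
    return "running"


# write_final_audit_log is the only leaf of corep_monthly_pipeline,
# so its state alone decides the state of the whole run.
print(dag_run_state(["success"]))          # success: pass path or quarantine path
print(dag_run_state(["upstream_failed"]))  # failed: e.g. run_dbt raised an exception
```

This is why the business outcome must be read from audit.pipeline_run_log: the DAG run state only distinguishes "the workflow completed" from "something crashed".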

6. Automatic OpenLineage Lineage per DAG Run

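When the OpenLineage provider is installed (apache-airflow-providers-openlineage; the standalone openlineage-airflow package on Airflow versions before 2.7), Airflow emits lineage events automatically for every task. You get job- and run-level lineage for the whole DAG in Marquez without writing a single extra line of code in your task functions; dataset-level edges for PythonOperator tasks still come from the modules' own lineage events or from declared inlets/outlets. Here is what the integration adds: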

| Event emitted | When | What Marquez receives |
|---|---|---|
| START | Before the task function runs | Job corep_monthly_pipeline.run_ingest, a run ID for the task instance, and a parent run facet pointing at the DAG run |
| COMPLETE | After the task function returns without an exception | Run completion. Output datasets appear if the module's emit_lineage() also fires; those events are merged into the same run only if they carry the same run ID, otherwise they appear as a separate run |
| FAIL | After the task function raises an exception | Error details and the run ID, so Marquez marks that job run as FAILED |
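Marquez aggregates OpenLineage events by run ID, which is why the COMPLETE row depends on the run ID the module's emit_lineage() uses. Here is a simplified model of that grouping with made-up run IDs: events from the provider and from a module that generates its own run ID land in two separate runs.

```python
from __future__ import annotations

from collections import defaultdict


def group_by_run(events: list[dict]) -> dict[str, list[str]]:
    """Collect event types per run ID, in arrival order."""
    runs: dict[str, list[str]] = defaultdict(list)
    for event in events:
        runs[event["run"]["runId"]].append(event["eventType"])
    return dict(runs)


events = [
    {"eventType": "START",    "run": {"runId": "task-run-1"}},    # from the Airflow provider
    {"eventType": "START",    "run": {"runId": "module-run-9"}},  # from the module's emit_lineage()
    {"eventType": "COMPLETE", "run": {"runId": "module-run-9"}},
    {"eventType": "COMPLETE", "run": {"runId": "task-run-1"}},
]
print(group_by_run(events))
# {'task-run-1': ['START', 'COMPLETE'], 'module-run-9': ['START', 'COMPLETE']}
```

To fold the module's datasets into the task's own run, the module would have to emit its events with the same run ID as the Airflow task instance.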
# OpenLineage provider config in airflow.cfg ([openlineage] section) or environment variables.
# Set these in docker-compose.yml for the Airflow container:

AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000"}'
AIRFLOW__OPENLINEAGE__NAMESPACE=corep-governance-pipeline

# Verify lineage is being received after a run (Marquez lists jobs per namespace):
curl "http://localhost:5000/api/v1/namespaces/corep-governance-pipeline/jobs" | python -m json.tool

# Expected: a job for each task that has run, named corep_monthly_pipeline.<task_id>
# Each job's latestRun shows startedAt, endedAt, and state (COMPLETED/FAILED)

7. The Regulatory Calendar Schedule

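⚠ Why "0 1 L * *" Is Not a Standard Cron Expression

The L (last) day-of-month token is not part of standard cron; Airflow accepts it through the croniter library, and it requires croniter ≥ 1.3.8 and Airflow ≥ 2.7. It runs the pipeline at 01:00 UTC on the last calendar day of every month. Note that 01:00 on the last day falls before the reporting period closes at the end of that day, so this schedule assumes the month-end source extracts are already final when it fires. If they only become final after close, run at 01:00 UTC on the 1st of the following month and use the previous day as the reporting date.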

If your Airflow version does not support L syntax, use a custom Timetable instead (available since Airflow 2.2) or schedule with @monthly, which runs at 00:00 UTC on the 1st of the following month. That is one day late, which may be acceptable for your NCA's T+N submission deadline.

| Reporting type | Schedule expression | Runs at | EBA deadline |
| --- | --- | --- | --- |
| Monthly (all months) | 0 1 L * * | 01:00 UTC, last day of the month | T+12 business days for most NCAs |
| Quarterly only (Q1/Q2/Q3/Q4) | 0 1 L 3,6,9,12 * | 01:00 UTC, last day of March, June, September and December | T+15 business days per EBA ITS |
| Manual trigger (backfill or rerun) | airflow dags trigger --conf ... | On demand | N/A (used for correction submissions) |
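The deadline column is easier to act on once it is a concrete date. The sketch below assumes that T+N counts business days after the reporting date, Monday to Friday, and that you supply the public holidays yourself. add_business_days and latest_finish_date are hypothetical helpers, and your NCA's own calendar remains authoritative.

```python
from datetime import date, timedelta
from typing import Container


def add_business_days(start: date, n: int, holidays: Container[date] = frozenset()) -> date:
    """Return the date n business days after start, skipping weekends and holidays."""
    if n < 0:
        raise ValueError("n must be non-negative")
    current, counted = start, 0
    while counted < n:
        current += timedelta(days=1)
        # weekday(): Monday is 0, Friday is 4.
        if current.weekday() < 5 and current not in holidays:
            counted += 1
    return current


def latest_finish_date(reporting_date: date, deadline_days: int, buffer_days: int,
                       holidays: Container[date] = frozenset()) -> date:
    """Date by which the pipeline must finish to leave buffer_days for corrections."""
    if buffer_days > deadline_days:
        raise ValueError("buffer cannot exceed the deadline")
    return add_business_days(reporting_date, deadline_days - buffer_days, holidays)
```

For the monthly row with no holidays, add_business_days(date(2026, 3, 31), 12) returns 2026-04-16. latest_finish_date with a correction buffer of a few days gives a date you could base the SLA on.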
🔒 The Regulatory Audit Trail

8. What the Audit Trail Looks Like After a Complete Run

After a successful end-of-month run, the following artefacts exist and are queryable:

-- Query the full pipeline audit log for a reporting date
SELECT
    module_name,
    status,
    metadata->>'reporting_date'    AS date,
    metadata->>'error_count'       AS errors,
    metadata->>'submission_package' AS package,
    ran_at
FROM audit.pipeline_run_log
WHERE metadata->>'reporting_date' = '2026-03-31'
ORDER BY ran_at;

-- Output:
-- module_name          | status | date       | errors | package                         | ran_at
-- ingest               | PASS   | 2026-03-31 |        |                                 | 2026-03-31 01:02:14
-- quality_raw          | PASS   | 2026-03-31 |        |                                 | 2026-03-31 01:03:45
-- quality_mart         | PASS   | 2026-03-31 |        |                                 | 2026-03-31 01:09:22
-- catalog              | PASS   | 2026-03-31 |        |                                 | 2026-03-31 01:11:05
-- security             | PASS   | 2026-03-31 |        |                                 | 2026-03-31 01:11:38
-- xbrl_valid           | PASS   | 2026-03-31 | 0      |                                 | 2026-03-31 01:18:52
-- pipeline_complete    | PASS   | 2026-03-31 | 0      | COREP_2026-03-31_20260331T011912Z.zip | 2026-03-31 01:19:03
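Each row in that log is written by the module that ran. The sketch below shows the write side, assuming three things: metadata is a JSONB column, PASS and FAIL are the only statuses, and modules only ever INSERT (never UPDATE), which is what keeps the log append-only. audit_row is a hypothetical helper that returns SQL plus parameters in psycopg2's %s placeholder style.

```python
import json
from datetime import datetime, timezone
from typing import Any, Tuple

AUDIT_INSERT = (
    "INSERT INTO audit.pipeline_run_log (module_name, status, metadata, ran_at) "
    "VALUES (%s, %s, %s::jsonb, %s)"
)


def audit_row(module_name: str, status: str, reporting_date: str, **extra: Any) -> Tuple[str, tuple]:
    """Build a parameterised INSERT for one audit record; values are never interpolated into SQL."""
    if status not in {"PASS", "FAIL"}:
        raise ValueError(f"unexpected status: {status}")
    metadata = {"reporting_date": reporting_date, **extra}
    params = (
        module_name,
        status,
        json.dumps(metadata, sort_keys=True),
        datetime.now(timezone.utc),
    )
    return AUDIT_INSERT, params
```

With a psycopg2 cursor, a call looks like cur.execute(*audit_row("xbrl_valid", "PASS", "2026-03-31", error_count=0)). To enforce append-only behaviour in the database as well, you could revoke UPDATE and DELETE on the table from the pipeline's role.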
Artefacts produced for reporting date 2026-03-31:

MinIO bucket: corep-eba-source
  └── source/capital_instruments.csv   (raw input — immutable copy)
  └── source/rwa_exposures.csv
  └── source/...

MinIO bucket: corep-gx-reports
  └── raw/corep_raw_2026-03-31/         (GX HTML data docs — Layer 1 pass)
  └── mart/corep_mart_2026-03-31/       (GX HTML data docs — Layer 2 pass)

MinIO bucket: corep-xbrl-output
  └── COREP_2026-03-31_20260331T011623Z.xbrl
  └── validation_reports/validation_report_20260331T011845Z.json
  └── submissions/COREP_2026-03-31_20260331T011912Z.zip

PostgreSQL: audit.pipeline_run_log
  └── 7 rows — one per module — all PASS

Marquez lineage graph:
  └── corep_monthly_pipeline.run_ingest (COMPLETE)
       → raw.capital_instruments, raw.rwa_exposures, ...
  └── corep_monthly_pipeline.run_dbt (COMPLETE)
       → staging.*, intermediate.*, mart.*
  └── corep_monthly_pipeline.run_xbrl_gen (COMPLETE)
       → corep-xbrl-output/COREP_2026-03-31_*.xbrl
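Before you sign off a submission, it is worth checking the database part of that artefact list mechanically rather than by eye. The sketch below assumes the seven module names from the query output above, with rows fetched in ran_at order as (module_name, status) pairs. audit_gaps and EXPECTED_MODULES are hypothetical.

```python
from typing import Iterable, List, Sequence, Tuple

# Module names as they appear in audit.pipeline_run_log for a complete run.
EXPECTED_MODULES = (
    "ingest", "quality_raw", "quality_mart", "catalog",
    "security", "xbrl_valid", "pipeline_complete",
)


def audit_gaps(
    rows: Iterable[Tuple[str, str]],
    expected: Sequence[str] = EXPECTED_MODULES,
) -> Tuple[List[str], List[str]]:
    """Return (missing modules, modules whose latest status is not PASS).

    rows must be ordered by ran_at so that a rerun's later row overrides an earlier one.
    """
    latest = {}
    for module_name, status in rows:
        latest[module_name] = status
    missing = [m for m in expected if m not in latest]
    not_passed = [m for m in expected if m in latest and latest[m] != "PASS"]
    return missing, not_passed
```

A complete run returns ([], []). Any other result should block the submission until it has been explained.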

9. Common Airflow Problems in This Pipeline and Their Fixes

| Problem | Symptom | Fix |
| --- | --- | --- |
| Module import error on task start | Task fails immediately with ModuleNotFoundError for modules.ingest | The Airflow container's Python path must include /opt/corep. Set PYTHONPATH=/opt/corep in the container environment (airflow.cfg has no pythonpath option), or place the modules under the DAGs or plugins folder, which Airflow adds to sys.path. |
| Branch downstream tasks show “upstream_failed” instead of “skipped” | All tasks after a branch show upstream_failed, blocking the final audit task | Any task downstream of a branch that must run even when one path is skipped needs an explicit trigger_rule; the default ALL_SUCCESS does not tolerate skipped upstreams. The final audit task must use TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS. |
| DAG schedule not triggering | DAG paused or start_date in the future | Use a fixed start_date in the past, e.g. pendulum.datetime(2026, 1, 1, tz="UTC"), instead of days_ago(1), which is deprecated and gives a moving start date. Set catchup=False, unpause the DAG, and check that the Airflow scheduler container is running. |
| XCom value is None in downstream task | xcom_pull returns None when a branch was not taken | Skipped tasks do not push XCom values. Always write xcom_pull(...) or "SKIP" (or "N/A") when pulling from a task that may have been skipped. |
| dbt subprocess fails silently | Task succeeds but mart tables are not populated | subprocess.run() only raises on a non-zero exit code if you pass check=True, so always check result.returncode != 0 and raise. As a second safeguard, inspect result.stdout. Match dbt's ERROR=<n> summary count rather than the bare string "ERROR", because a successful run also prints ERROR=0. |
| Max active runs exceeded | Manual trigger is queued but never starts | max_active_runs=1 prevents parallel runs. If a previous run is still in progress (or stuck), let it finish or mark that DAG run as failed in the Airflow UI; the queued run starts once the slot is free. |
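The sketch below shows a checked subprocess call for the dbt row. It assumes dbt's end-of-run summary line has the form Done. PASS=n WARN=n ERROR=n SKIP=n TOTAL=n, and it parses the ERROR= count instead of searching for the bare word ERROR, which also appears (as ERROR=0) on every successful run. run_dbt_checked is a hypothetical helper and is not part of dbt or Airflow.

```python
import re
import subprocess
from typing import Optional, Sequence

# dbt's end-of-run summary looks like "Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5" (assumed format).
SUMMARY_ERRORS = re.compile(r"\bERROR=(\d+)")


def run_dbt_checked(cmd: Sequence[str], cwd: Optional[str] = None) -> str:
    """Run a dbt command and raise RuntimeError on any failure; return stdout on success."""
    result = subprocess.run(list(cmd), cwd=cwd, capture_output=True, text=True)
    tail = (result.stdout + result.stderr)[-2000:]  # keep task logs readable

    # Primary check: dbt exits non-zero when a model or test fails.
    if result.returncode != 0:
        raise RuntimeError(f"command exited with code {result.returncode}:\n{tail}")

    # Second safeguard: read the ERROR count from the last summary match, not the bare word.
    counts = SUMMARY_ERRORS.findall(result.stdout)
    if counts and int(counts[-1]) > 0:
        raise RuntimeError(f"dbt summary reports ERROR={counts[-1]} despite exit code 0:\n{tail}")
    return result.stdout
```

Inside the Airflow task, you would call something like run_dbt_checked(["dbt", "run", "--project-dir", DBT_PROJECT_DIR]), where DBT_PROJECT_DIR is a placeholder for your project path. The RuntimeError then fails the task so its retry policy applies. This is a pipeline error, not a data quality failure, so raising here is the right behaviour.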

📚 Day 13 Key Takeaways

  • BranchPythonOperator + TriggerRule is the core pattern for regulatory pipelines. Quality failures must route to quarantine, never silently proceed. The final audit task must always run regardless of branch.
  • Never raise exceptions for quality failures — push "FAIL" to XCom instead. Let the branch decide the route. If the quality task raises, Airflow marks it as a task error, which activates its retry policy and blocks the branch logic.
  • XCom carries status strings, not data. Passing a DataFrame through XCom would bypass the PostgreSQL persistence that makes the audit trail immutable. All data flows through PostgreSQL → MinIO → PostgreSQL; XCom carries only run metadata.
  • The OpenLineage integration for Airflow is the cheapest audit investment you will ever make. A handful of environment variables is all it takes for Marquez to receive a complete DAG-level lineage graph for every run, with task-level start/end timestamps and dataset references.
  • The submission package ZIP is the regulator-facing deliverable. It contains the XBRL instance, the validation evidence, and the GX quality documentation. Everything a compliance team needs to reconstruct and defend the submission is in one file.
  • SLAs are a regulatory calendar tool, not a performance metric. Set SLAs based on the NCA submission deadline minus your correction buffer — typically T+12 business days for monthly COREP. An SLA breach means you need to investigate immediately, not eventually.
  • Next: Day 14 — Building the COREP Capital Dashboard in Apache Superset — connecting Trino as the query engine and building the CET1 ratio time-series panel from mart.corep_c0300.