The eleven modules built across Days 5–12 can each be run independently on the command line. That is fine for development. In production, you need something that runs them in order, on a schedule, retries on transient failures, branches when data quality fails, alerts when SLAs are missed, and produces an immutable audit trail of every execution. That something is Apache Airflow.
But a regulatory reporting pipeline places requirements on Airflow that a standard data engineering DAG does not. The pipeline runs on a fixed regulatory calendar (month-end, quarter-end). It must branch rather than fail silently: a COREP quality failure is not a pipeline error but a data quality incident, and it needs a different workflow path. Every task execution must be traceable. The regulator can ask "what data was used to compute the CET1 ratio in the Q1 2026 submission?", and you must be able to answer with a run ID, a lineage graph, and a timestamp.
This post builds the complete dags/corep_pipeline_dag.py from scratch, explains every design decision, and shows you how to read the Airflow UI to understand what happened in each run.
1. Why Airflow Over Prefect, Dagster, or a Cron Job
| Requirement | Cron | Prefect / Dagster | Airflow 2.9 |
|---|---|---|---|
| Fixed regulatory calendar schedule | Yes | Yes | Yes |
| Conditional branching (quality fail → quarantine) | No | Yes | Yes |
| Task-level retry with exponential backoff | No | Yes | Yes |
| SLA miss alerting per task | No | Plugin | Native |
| XCom — pass state between tasks without files | No | Yes | Yes |
| OpenLineage integration (per-task lineage) | No | Manual | apache-airflow-providers-openlineage |
| Immutable run history for audit | No — log rotation | Yes | Yes — PostgreSQL metadata DB |
| Self-hosted, open source, no vendor lock-in | Yes | OSS core only | Apache 2.0 |
| EBA/ECB audit familiarity (widely deployed in EU banks) | No | Emerging | Dominant |
The OpenLineage integration instruments every Airflow task automatically. On Airflow 2.7 and later it ships as the apache-airflow-providers-openlineage provider, which replaces the standalone openlineage-airflow package. When a PythonOperator runs your module, Airflow emits an OpenLineage START event before the task executes, with the DAG run recorded as its parent run. After the task finishes, it emits a COMPLETE or FAIL event. Marquez therefore receives a run-level lineage event for every task in every DAG run, and the DAG run links them all. The result is a queryable graph. Ask for the lineage of one run of corep_monthly_pipeline (for example, the 2026-03-31 month-end run) and you get every task in order, plus the datasets each task reports as inputs and outputs.
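The parent link is easier to see in the data itself. Below is a sketch of the kind of START event the integration sends for one task, built as a plain Python dict. It follows the OpenLineage RunEvent shape (eventType, eventTime, run, job, inputs, outputs, producer, schemaURL) and adds a parent run facet that points at the DAG run. The producer URI and the spec versions in the schema URLs are assumptions, and the real provider attaches many more facets than this.

```python
import uuid
from datetime import datetime, timezone

# Assumptions: the producer URI is hypothetical and the spec versions in the
# schema URLs are illustrative; check openlineage.io for the ones your client emits.
PRODUCER = "https://example.com/corep-pipeline"
RUN_EVENT_SCHEMA = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/definitions/RunEvent"
PARENT_FACET_SCHEMA = "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json"


def task_start_event(namespace: str, dag_id: str, task_id: str, dag_run_id: str) -> dict:
    """Build a minimal OpenLineage START RunEvent for one task of a DAG run."""
    return {
        "eventType": "START",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": PRODUCER,
        "schemaURL": RUN_EVENT_SCHEMA,
        # Task-level job name: <dag_id>.<task_id>
        "job": {"namespace": namespace, "name": f"{dag_id}.{task_id}"},
        "run": {
            "runId": str(uuid.uuid4()),  # a fresh run ID for this task instance
            "facets": {
                # Parent facet: every task run points at the same DAG-level run
                "parent": {
                    "_producer": PRODUCER,
                    "_schemaURL": PARENT_FACET_SCHEMA,
                    "run": {"runId": dag_run_id},
                    "job": {"namespace": namespace, "name": dag_id},
                },
            },
        },
        "inputs": [],   # datasets appear here only when a task reports them
        "outputs": [],
    }


# Usage: two tasks of one DAG run share the same parent runId
dag_run_id = str(uuid.uuid4())
events = [
    task_start_event("corep-governance-pipeline", "corep_monthly_pipeline", t, dag_run_id)
    for t in ("run_ingest", "run_dbt")
]
```

The shared parent runId is what lets Marquez group every task-level run under a single DAG run.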
2. The Five Airflow Concepts You Must Understand
| Concept | What it is | How we use it |
|---|---|---|
| DAG | Directed Acyclic Graph — the pipeline definition. A Python file that declares tasks and their dependencies. | One DAG: corep_monthly_pipeline. Runs on the last day of each month at 01:00 UTC. |
| Operator | A task template. PythonOperator runs a Python function. BranchPythonOperator returns the ID of the next task to run (branching logic). | All module tasks use PythonOperator. Quality gates and XBRL validation result use BranchPythonOperator. |
| XCom | Cross-task communication. A task can push a value to the Airflow metadata DB; a downstream task pulls it by task ID and key. | The quality and XBRL validation tasks push "PASS" or "FAIL" to XCom. The BranchPythonOperator pulls this value to decide routing. |
| TriggerRule | The condition under which a task runs. The default is ALL_SUCCESS. NONE_FAILED_MIN_ONE_SUCCESS runs a task when no upstream task failed and at least one succeeded. That makes it the right rule for tasks that must run after a branch, whichever path was taken. | The final audit-log task uses NONE_FAILED_MIN_ONE_SUCCESS, so it runs whether the pipeline succeeded or was quarantined. If a task fails outright, the audit task is marked upstream_failed instead. |
| SLA | Service Level Agreement: the latest time by which a task should finish, measured from the scheduled start of the DAG run (not from the task's own start). When an SLA is missed, Airflow calls the SLA-miss callback. Manually triggered runs are not checked. | Set on the XBRL generation and validation tasks. COREP remittance dates are fixed by the EBA ITS on supervisory reporting, so an SLA breach is an early warning. SLA breach = alert to data governance team. |
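A small model of the three trigger rules above shows why the audit task uses NONE_FAILED_MIN_ONE_SUCCESS. This is a sketch of the documented semantics, not Airflow's implementation, and it only covers the case where every upstream task has already finished.

```python
from enum import Enum


class State(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"


def decide(rule: str, upstream: list) -> str:
    """Return 'run', 'skipped' or 'upstream_failed' once all upstreams are done."""
    any_failed = any(s in (State.FAILED, State.UPSTREAM_FAILED) for s in upstream)
    if rule == "all_done":
        return "run"  # runs whatever the upstream outcome
    if rule == "all_success":  # Airflow's default
        if any_failed:
            return "upstream_failed"
        return "skipped" if State.SKIPPED in upstream else "run"
    if rule == "none_failed_min_one_success":
        if any_failed:
            return "upstream_failed"
        return "run" if State.SUCCESS in upstream else "skipped"
    raise ValueError(f"rule not modelled: {rule}")


# Upstreams of write_final_audit_log after a Layer 1 failure:
# build_submission_package, quarantine_raw, quarantine_mart, quarantine_xbrl
quarantined = [State.SKIPPED, State.SUCCESS, State.SKIPPED, State.SKIPPED]
print(decide("all_success", quarantined))                  # skipped
print(decide("none_failed_min_one_success", quarantined))  # run
```

Under the default rule, one skipped upstream is enough to skip the audit task. ALL_DONE would run it even after a hard failure, but it would then also mark the whole DAG run as success.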
3. The Complete DAG Flow
```text
corep_monthly_pipeline · schedule: last day of month 01:00 UTC · Airflow 2.9 LocalExecutor

START
  │
  ▼
[ check_reporting_date ]       PythonOperator
    Derives reporting_date for the DAG run
    Pushes to XCom: reporting_date, reporting_period
    Verifies it is a valid EBA reporting date (quarter-end or month-end)
  │
  ▼
[ run_ingest ]                 PythonOperator · retry=2 · retry_delay=5min
    IngestModule.run(): loads 6 CSVs → raw.* tables, uploads to MinIO
  │
  ▼
[ run_quality_layer1 ]         PythonOperator → pushes quality_gate_status to XCom
    QualityModule.run(): raw layer gates only
  │
  ▼
[ branch_quality_layer1 ]      BranchPythonOperator
    quality_gate_status == "PASS" → [ run_dbt ]
    quality_gate_status == "FAIL" → [ quarantine_raw_failure ]
  │                                     │
  ▼                                     ▼
[ run_dbt ]                      [ quarantine_raw_failure ]
    dbt run: all staging,            Sends Slack alert
    intermediate, mart models        Writes quarantine marker to audit log
    dbt test
  │
  ▼
[ run_quality_layer2 ]         PythonOperator → pushes mart_gate_status to XCom
    QualityModule.run(): mart layer gates
  │
  ▼
[ branch_quality_layer2 ]      BranchPythonOperator
    mart_gate_status == "PASS" → [ run_catalog ]
    mart_gate_status == "FAIL" → [ quarantine_mart_failure ]
  │
  ▼
[ run_catalog ]                PythonOperator
    CatalogModule.run(): OM metadata ingestion
  │
  ├──────────────────────────────┐    (run in parallel)
  ▼                              ▼
[ run_security ]               [ refresh_superset_metadata ]
    SecurityModule.run()           Calls Superset API to refresh schema cache
  │                              │
  └──────────────┬───────────────┘
                 ▼
[ run_xbrl_gen ]               PythonOperator · SLA=60min
    XbrlGenModule.run(): generates XBRL instance → MinIO corep-xbrl-output
  │
  ▼
[ run_xbrl_valid ]             PythonOperator · SLA=30min → pushes xbrl_valid_status
    XbrlValidModule.run()
  │
  ▼
[ branch_xbrl_valid ]          BranchPythonOperator
    xbrl_valid_status == "PASS" → [ build_submission_package ]
    xbrl_valid_status == "FAIL" → [ quarantine_xbrl_failure ]
  │
  ▼
[ build_submission_package ]   PythonOperator
    Bundles: XBRL file + validation report + GX data docs index + manifest.json
    Uploads COREP_<reporting_date>_<timestamp>.zip to MinIO corep-xbrl-output/submissions/
  │
  ▼
[ write_final_audit_log ]      PythonOperator · TriggerRule: NONE_FAILED_MIN_ONE_SUCCESS
    (all three quarantine tasks also feed this task)
    Writes the run summary to audit.pipeline_run_log
    Includes: quality and XBRL validation statuses, submission package name, run_id
  │
  ▼
END
```
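The routing in the diagram can be written as a small pure function that returns the tasks that execute for a given set of gate outcomes. This is a hypothetical helper for reasoning and testing. It assumes no task raises, and it mirrors the three branch callables in the DAG file, including their treatment of any non-"PASS" value as a failure.

```python
def executed_tasks(q1: str, q2: str, xv: str) -> list:
    """Tasks that run, in order, for given gate outcomes ('PASS' or 'FAIL')."""
    path = ["check_reporting_date", "run_ingest", "run_quality_layer1", "branch_quality_layer1"]
    if q1 != "PASS":
        return path + ["quarantine_raw_failure", "write_final_audit_log"]
    path += ["run_dbt", "run_quality_layer2", "branch_quality_layer2"]
    if q2 != "PASS":
        return path + ["quarantine_mart_failure", "write_final_audit_log"]
    # run_security and refresh_superset_metadata run in parallel; order is arbitrary
    path += ["run_catalog", "run_security", "refresh_superset_metadata",
             "run_xbrl_gen", "run_xbrl_valid", "branch_xbrl_valid"]
    if xv != "PASS":
        return path + ["quarantine_xbrl_failure", "write_final_audit_log"]
    return path + ["build_submission_package", "write_final_audit_log"]


ALL_TASKS = set(executed_tasks("PASS", "PASS", "PASS")) | {
    "quarantine_raw_failure", "quarantine_mart_failure", "quarantine_xbrl_failure"}

# Everything not executed shows as "skipped" in the Airflow UI
ran = executed_tasks("PASS", "FAIL", "PASS")
skipped = sorted(ALL_TASKS - set(ran))
```

A fully passing run executes 15 of the 18 tasks, which matches the expected CLI output in section 5.2.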
4. dags/corep_pipeline_dag.py
""" dags/corep_pipeline_dag.py — Monthly COREP regulatory reporting pipeline. Schedule: last day of each month at 01:00 UTC. Executor: LocalExecutor (configured in docker-compose.yml). Design principles: 1. Every module is a PythonOperator — no BashOperators, no subprocess DAG calls. 2. Quality failures branch to quarantine — they never silently proceed. 3. XCom carries only status strings and paths — never DataFrames. 4. The final audit task runs regardless of branch taken. 5. SLAs are set on the two slowest tasks (xbrl_gen, xbrl_valid). """ from __future__ import annotations import json import logging import os import subprocess from datetime import datetime, timedelta, timezone from pathlib import Path from airflow import DAG from airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.utils.trigger_rule import TriggerRule from airflow.utils.dates import days_ago log = logging.getLogger(__name__) # ── Project root (mounted into the Airflow container) ──────────────── PROJECT_ROOT = Path(os.environ.get("COREP_PROJECT_ROOT", "/opt/corep")) DBT_PROJECT = PROJECT_ROOT / "dbt" # ── Default task arguments ─────────────────────────────────────────── DEFAULT_ARGS = { "owner": "corep-pipeline", "depends_on_past": False, "email_on_failure": True, "email_on_retry": False, "email": [os.environ.get("ALERT_EMAIL", "governance@example.com")], "retries": 2, "retry_delay": timedelta(minutes=5), "retry_exponential_backoff": True, } # ── SLA callback — fires when a task exceeds its SLA ───────────────── def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis): log.error( "[airflow] SLA MISS on DAG=%s tasks=%s — governance team alerted.", dag.dag_id, [t.task_id for t in task_list], ) # In production: send to PagerDuty / Slack webhook _slack_alert( f":warning: SLA breach on {dag.dag_id}: " + ", ".join(t.task_id for t in task_list) ) def _slack_alert(message: str) -> None: webhook = os.environ.get("SLACK_WEBHOOK_URL") if not webhook: 
log.info("[airflow] No SLACK_WEBHOOK_URL set — alert not sent.") return try: import urllib.request payload = json.dumps({"text": message}).encode() req = urllib.request.Request( webhook, data=payload, headers={"Content-Type": "application/json"} ) urllib.request.urlopen(req, timeout=10) except Exception as exc: log.warning("[airflow] Slack alert failed (non-fatal): %s", exc) # ═══════════════════════════════════════════════════════════════════════ # TASK FUNCTIONS # Each function is a PythonOperator callable. # Signature must accept **context to receive the Airflow task context dict. # ═══════════════════════════════════════════════════════════════════════ def task_check_reporting_date(**context) -> None: """Derive and validate the reporting date from the DAG run logical_date.""" logical_date = context["logical_date"] # pendulum.DateTime reporting_date = logical_date.strftime("%Y-%m-%d") # Determine the period: Q1=03-31, Q2=06-30, Q3=09-30, Q4=12-31 month = logical_date.month if month in (3, 6, 9, 12): period_type = "QUARTERLY" else: period_type = "MONTHLY" log.info("[check_reporting_date] date=%s period=%s", reporting_date, period_type) ti = context["task_instance"] ti.xcom_push(key="reporting_date", value=reporting_date) ti.xcom_push(key="reporting_period", value=period_type) def task_run_ingest(**context) -> None: from modules.ingest import IngestModule reporting_date = context["task_instance"].xcom_pull( task_ids="check_reporting_date", key="reporting_date" ) log.info("[ingest] Running for reporting_date=%s", reporting_date) IngestModule(run_context={"reporting_date": reporting_date}).run() def task_run_quality_layer1(**context) -> None: """Run raw-layer quality gates. 
Push PASS/FAIL status to XCom.""" from modules.quality import QualityModule, QualityGateError ti = context["task_instance"] try: QualityModule(layer="raw").run() ti.xcom_push(key="quality_gate_status", value="PASS") except QualityGateError as exc: log.error("[quality_layer1] Gate FAILED: %s", exc) ti.xcom_push(key="quality_gate_status", value="FAIL") def branch_quality_layer1(**context) -> str: status = context["task_instance"].xcom_pull( task_ids="run_quality_layer1", key="quality_gate_status" ) return "run_dbt" if status == "PASS" else "quarantine_raw_failure" def task_quarantine_raw_failure(**context) -> None: """Archive raw tables snapshot to MinIO quarantine on Layer 1 failure.""" reporting_date = context["task_instance"].xcom_pull( task_ids="check_reporting_date", key="reporting_date" ) log.error("[quarantine] Layer 1 quality FAILED for %s — routing to quarantine.", reporting_date) _slack_alert( f":x: COREP Layer 1 quality gate FAILED for {reporting_date}. " "Pipeline halted — check GX data docs in MinIO corep-gx-reports." 
) # Write quarantine marker to audit log _write_audit_marker(context, "quarantine_raw", "FAIL") def task_run_dbt(**context) -> None: """Run dbt staging → intermediate → mart + dbt test.""" reporting_date = context["task_instance"].xcom_pull( task_ids="check_reporting_date", key="reporting_date" ) env = { **os.environ, "REPORTING_DATE": reporting_date, } for cmd in [ ["dbt", "run", "--profiles-dir", str(DBT_PROJECT), "--project-dir", str(DBT_PROJECT)], ["dbt", "test", "--profiles-dir", str(DBT_PROJECT), "--project-dir", str(DBT_PROJECT)], ]: result = subprocess.run(cmd, capture_output=True, text=True, env=env, check=False) log.info("[dbt] stdout:\n%s", result.stdout[-3000:]) if result.returncode != 0: raise RuntimeError( f"[dbt] Command failed: {' '.join(cmd)}\n" + result.stderr[-2000:] ) log.info("[dbt] All models and tests passed.") def task_run_quality_layer2(**context) -> None: from modules.quality import QualityModule, QualityGateError ti = context["task_instance"] try: QualityModule(layer="mart").run() ti.xcom_push(key="mart_gate_status", value="PASS") except QualityGateError as exc: log.error("[quality_layer2] Gate FAILED: %s", exc) ti.xcom_push(key="mart_gate_status", value="FAIL") def branch_quality_layer2(**context) -> str: status = context["task_instance"].xcom_pull( task_ids="run_quality_layer2", key="mart_gate_status" ) return "run_catalog" if status == "PASS" else "quarantine_mart_failure" def task_quarantine_mart_failure(**context) -> None: reporting_date = context["task_instance"].xcom_pull( task_ids="check_reporting_date", key="reporting_date" ) log.error("[quarantine] Layer 2 mart quality FAILED for %s.", reporting_date) _slack_alert( f":x: COREP mart quality gate FAILED for {reporting_date}. " "dbt mart models produced values that failed GX expectations. " "Check mart_corep_suite results in MinIO corep-gx-reports." 
) _write_audit_marker(context, "quarantine_mart", "FAIL") def task_run_catalog(**context) -> None: from modules.catalog import CatalogModule CatalogModule().run() def task_run_security(**context) -> None: from modules.security import SecurityModule SecurityModule().run() def task_refresh_superset(**context) -> None: """Refresh Superset schema cache so new mart columns are visible.""" superset_url = os.environ.get("SUPERSET_URL", "http://superset:8088") admin_user = os.environ.get("SUPERSET_ADMIN_USER", "admin") admin_pass = os.environ.get("SUPERSET_ADMIN_PASS", "admin") try: import urllib.request, urllib.parse # Login to get CSRF token login_data = json.dumps({ "username": admin_user, "password": admin_pass, "provider": "db", "refresh": True, }).encode() req = urllib.request.Request( f"{superset_url}/api/v1/security/login", data=login_data, headers={"Content-Type": "application/json"}, ) resp = json.loads(urllib.request.urlopen(req, timeout=15).read()) token = resp["access_token"] # Refresh all database metadata refresh_req = urllib.request.Request( f"{superset_url}/api/v1/database/1/schemas/", headers={"Authorization": f"Bearer {token}"}, ) urllib.request.urlopen(refresh_req, timeout=15) log.info("[superset] Schema cache refreshed.") except Exception as exc: log.warning("[superset] Refresh failed (non-fatal): %s", exc) def task_run_xbrl_gen(**context) -> None: from modules.xbrl_gen import XbrlGenModule reporting_date = context["task_instance"].xcom_pull( task_ids="check_reporting_date", key="reporting_date" ) XbrlGenModule(run_context={"reporting_date": reporting_date}).run() def task_run_xbrl_valid(**context) -> None: from modules.xbrl_valid import XbrlValidModule, XbrlValidationError ti = context["task_instance"] try: XbrlValidModule().run() ti.xcom_push(key="xbrl_valid_status", value="PASS") except XbrlValidationError as exc: log.error("[xbrl_valid] Validation FAILED: %s", exc) ti.xcom_push(key="xbrl_valid_status", value="FAIL") def branch_xbrl_valid(**context) 
-> str: status = context["task_instance"].xcom_pull( task_ids="run_xbrl_valid", key="xbrl_valid_status" ) return "build_submission_package" if status == "PASS" else "quarantine_xbrl_failure" def task_quarantine_xbrl_failure(**context) -> None: reporting_date = context["task_instance"].xcom_pull( task_ids="check_reporting_date", key="reporting_date" ) log.error("[quarantine] XBRL validation FAILED for %s.", reporting_date) _slack_alert( f":x: COREP XBRL validation FAILED for {reporting_date}. " "EBA formula rules not satisfied. Check validation report in MinIO corep-xbrl-output." ) _write_audit_marker(context, "quarantine_xbrl", "FAIL") def task_build_submission_package(**context) -> None: """ Bundle the XBRL instance + all audit artefacts into a submission package ZIP. Uploaded to MinIO corep-xbrl-output/submissions/. """ import zipfile, shutil, tempfile from minio import Minio ti = context["task_instance"] reporting_date = ti.xcom_pull(task_ids="check_reporting_date", key="reporting_date") ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") pkg_name = f"COREP_{reporting_date}_{ts}.zip" xbrl_dir = Path(os.environ.get("XBRL_OUTPUT_DIR", "output/xbrl")) report_dir = Path(os.environ.get("XBRL_REPORT_DIR", "output/validation_reports")) gx_dir = Path("gx/uncommitted/data_docs/local_site") with tempfile.TemporaryDirectory() as tmp: pkg_path = Path(tmp) / pkg_name with zipfile.ZipFile(pkg_path, "w", zipfile.ZIP_DEFLATED) as zf: # XBRL instance for f in sorted(xbrl_dir.glob("*.xbrl")): zf.write(f, f"xbrl/{f.name}") # Validation report for f in sorted(report_dir.glob("*.json")): zf.write(f, f"validation/{f.name}") # GX data docs index index_html = gx_dir / "index.html" if index_html.exists(): zf.write(index_html, "quality/gx_data_docs_index.html") # Pipeline manifest (run metadata) manifest = { "reporting_date": reporting_date, "packaged_at": ts, "dag_run_id": context["run_id"], "airflow_version": "2.9", "xbrl_files": [f.name for f in 
sorted(xbrl_dir.glob("*.xbrl"))], } zf.writestr("manifest.json", json.dumps(manifest, indent=2)) # Upload to MinIO client = Minio( os.environ.get("MINIO_ENDPOINT", "minio:9000"), access_key=os.environ.get("MINIO_ROOT_USER", "minioadmin"), secret_key=os.environ.get("MINIO_ROOT_PASSWORD", "minioadmin"), secure=False, ) bucket = "corep-xbrl-output" if not client.bucket_exists(bucket): client.make_bucket(bucket) client.fput_object(bucket, f"submissions/{pkg_name}", str(pkg_path)) log.info("[submission] Package uploaded → minio://%s/submissions/%s", bucket, pkg_name) ti.xcom_push(key="submission_package", value=pkg_name) def task_write_final_audit_log(**context) -> None: """Write the complete pipeline run summary to audit.pipeline_run_log.""" from modules.base import _pg_conn ti = context["task_instance"] reporting_date = ti.xcom_pull(task_ids="check_reporting_date", key="reporting_date") q1_status = ti.xcom_pull(task_ids="run_quality_layer1", key="quality_gate_status") or "SKIP" q2_status = ti.xcom_pull(task_ids="run_quality_layer2", key="mart_gate_status") or "SKIP" xv_status = ti.xcom_pull(task_ids="run_xbrl_valid", key="xbrl_valid_status") or "SKIP" pkg_name = ti.xcom_pull(task_ids="build_submission_package", key="submission_package") or "N/A" overall = "PASS" if xv_status == "PASS" else "FAIL" conn = _pg_conn() try: cur = conn.cursor() cur.execute( """ INSERT INTO audit.pipeline_run_log (run_id, module_name, status, metadata, ran_at) VALUES (%s, 'pipeline_complete', %s, %s, now()) """, ( context["run_id"], overall, json.dumps({ "reporting_date": reporting_date, "dag_run_id": context["run_id"], "quality_layer1": q1_status, "quality_layer2": q2_status, "xbrl_validation": xv_status, "submission_package": pkg_name, }), ), ) conn.commit() log.info("[audit] Final run summary written — overall=%s run_id=%s", overall, context["run_id"]) finally: conn.close() if overall == "PASS": _slack_alert( f":white_check_mark: COREP pipeline PASSED for {reporting_date}. 
" f"Submission package: {pkg_name}" ) def _write_audit_marker(context: dict, stage: str, status: str) -> None: from modules.base import _pg_conn conn = _pg_conn() try: cur = conn.cursor() cur.execute( "INSERT INTO audit.pipeline_run_log (run_id, module_name, status, ran_at) VALUES (%s, %s, %s, now())", (context["run_id"], stage, status), ) conn.commit() finally: conn.close() # ═══════════════════════════════════════════════════════════════════════ # DAG DEFINITION # ═══════════════════════════════════════════════════════════════════════ with DAG( dag_id="corep_monthly_pipeline", description="Monthly COREP regulatory reporting: ingest → quality → dbt → catalog → security → XBRL → validate → package", default_args=DEFAULT_ARGS, schedule_interval="0 1 L * *", # 01:00 UTC on the last day of every month start_date=days_ago(1), catchup=False, max_active_runs=1, # Only ONE pipeline run at a time tags=["corep", "regulatory", "eba", "governance"], sla_miss_callback=sla_miss_callback, doc_md=""" ## COREP Monthly Regulatory Reporting Pipeline Runs on the last calendar day of each month at 01:00 UTC. Produces a validated XBRL instance for EBA COREP supervisory reporting. 
### Modules | Step | Module | Output | |------|--------|--------| | 1 | ingest | raw.* tables | | 2 | quality_layer1 | GX raw checkpoints | | 3 | dbt | staging/intermediate/mart | | 4 | quality_layer2 | GX mart checkpoints | | 5 | catalog | OpenMetadata indexed | | 6 | security | Ranger policies applied | | 7 | xbrl_gen | .xbrl instance file | | 8 | xbrl_valid | Arelle validation report | | 9 | submission | ZIP package in MinIO | """, ) as dag: t_check_date = PythonOperator(task_id="check_reporting_date", python_callable=task_check_reporting_date) t_ingest = PythonOperator(task_id="run_ingest", python_callable=task_run_ingest) t_quality1 = PythonOperator(task_id="run_quality_layer1", python_callable=task_run_quality_layer1) t_branch_q1 = BranchPythonOperator(task_id="branch_quality_layer1", python_callable=branch_quality_layer1) t_quarantine_raw = PythonOperator(task_id="quarantine_raw_failure", python_callable=task_quarantine_raw_failure) t_dbt = PythonOperator(task_id="run_dbt", python_callable=task_run_dbt) t_quality2 = PythonOperator(task_id="run_quality_layer2", python_callable=task_run_quality_layer2) t_branch_q2 = BranchPythonOperator(task_id="branch_quality_layer2", python_callable=branch_quality_layer2) t_quarantine_mart = PythonOperator(task_id="quarantine_mart_failure", python_callable=task_quarantine_mart_failure) t_catalog = PythonOperator(task_id="run_catalog", python_callable=task_run_catalog) t_security = PythonOperator(task_id="run_security", python_callable=task_run_security) t_superset = PythonOperator(task_id="refresh_superset_metadata", python_callable=task_refresh_superset) t_xbrl_gen = PythonOperator( task_id="run_xbrl_gen", python_callable=task_run_xbrl_gen, sla=timedelta(hours=1), # alert if XBRL generation > 60 min ) t_xbrl_valid = PythonOperator( task_id="run_xbrl_valid", python_callable=task_run_xbrl_valid, sla=timedelta(minutes=30), # alert if validation > 30 min ) t_branch_xv = BranchPythonOperator(task_id="branch_xbrl_valid", 
python_callable=branch_xbrl_valid) t_quarantine_xbrl = PythonOperator(task_id="quarantine_xbrl_failure", python_callable=task_quarantine_xbrl_failure) t_submission = PythonOperator(task_id="build_submission_package", python_callable=task_build_submission_package) t_audit_final = PythonOperator( task_id="write_final_audit_log", python_callable=task_write_final_audit_log, trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, # always runs ) # ── Wire dependencies ──────────────────────────────────────────────── t_check_date >> t_ingest >> t_quality1 >> t_branch_q1 t_branch_q1 >> [t_dbt, t_quarantine_raw] t_dbt >> t_quality2 >> t_branch_q2 t_branch_q2 >> [t_catalog, t_quarantine_mart] t_catalog >> [t_security, t_superset] [t_security, t_superset] >> t_xbrl_gen t_xbrl_gen >> t_xbrl_valid >> t_branch_xv t_branch_xv >> [t_submission, t_quarantine_xbrl] [t_submission, t_quarantine_raw, t_quarantine_mart, t_quarantine_xbrl] >> t_audit_final
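The date logic of check_reporting_date can live in a pure function, which makes it unit-testable without an Airflow context. This is a sketch under two assumptions. First, the run's data_interval_end identifies the reporting date; logical_date does not, because for a cron schedule it is the start of the interval (the previous month-end). Second, a manual run may override the date with a reporting_date key in dag_run.conf. The name resolve_reporting_date is hypothetical.

```python
import calendar
from datetime import date, datetime, timezone
from typing import Optional


def resolve_reporting_date(data_interval_end: datetime,
                           conf: Optional[dict] = None) -> tuple:
    """Return (reporting_date, period_type) for one COREP run."""
    conf = conf or {}
    if conf.get("reporting_date"):
        # Manual override, e.g. --conf '{"reporting_date": "2026-03-31"}'
        d = date.fromisoformat(conf["reporting_date"])
    else:
        # The "0 1 L * *" run that fires on 2026-03-31 has data_interval_end
        # 2026-03-31T01:00Z but logical_date 2026-02-28T01:00Z.
        d = data_interval_end.date()
    if d.day != calendar.monthrange(d.year, d.month)[1]:
        raise ValueError(f"{d} is not a month-end reporting date")
    period = "QUARTERLY" if d.month in (3, 6, 9, 12) else "MONTHLY"
    return d.isoformat(), period


scheduled_end = datetime(2026, 3, 31, 1, 0, tzinfo=timezone.utc)
print(resolve_reporting_date(scheduled_end))  # ('2026-03-31', 'QUARTERLY')
```

If you pass the interval start instead, the March run resolves to ('2026-02-28', 'MONTHLY'), which is exactly the off-by-one-period error to guard against.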
5. How to Trigger, Monitor, and Debug
5.1 Initial Setup
```bash
# Access Airflow UI at http://localhost:8090
# Default credentials: admin / admin (set in docker-compose.yml)

# Verify the DAG appears: it should auto-detect from the dags/ folder
# within 30 seconds of starting the Airflow container

# Check DAG syntax without triggering a run
python -c "from dags.corep_pipeline_dag import dag; print('DAG OK:', dag.dag_id)"

# Enable the DAG (it starts paused for safety)
# In the UI: toggle the pause switch next to "corep_monthly_pipeline"
# Or via Airflow CLI inside the container:
docker exec corep-airflow airflow dags unpause corep_monthly_pipeline
```
5.2 Trigger a Manual Run (Development)
```bash
# Trigger via CLI for the March 2026 month-end (simulates the end-of-month run).
# --exec-date sets the run's logical date; --conf carries an explicit reporting_date.
docker exec corep-airflow airflow dags trigger \
  corep_monthly_pipeline \
  --exec-date "2026-03-31T01:00:00+00:00" \
  --conf '{"reporting_date": "2026-03-31"}'

# Snapshot the task states for that run (re-run the command to follow progress).
# The second argument is the run ID (or logical date), passed positionally.
docker exec corep-airflow airflow tasks states-for-dag-run \
  corep_monthly_pipeline \
  "manual__2026-03-31T01:00:00+00:00"

# Expected output of a passing run (abridged to task and state):
#   check_reporting_date          success
#   run_ingest                    success
#   run_quality_layer1            success
#   branch_quality_layer1         success
#   run_dbt                       success
#   run_quality_layer2            success
#   branch_quality_layer2         success
#   run_catalog                   success
#   run_security                  success
#   refresh_superset_metadata     success
#   run_xbrl_gen                  success
#   run_xbrl_valid                success
#   branch_xbrl_valid             success
#   build_submission_package      success
#   write_final_audit_log         success
#   quarantine_raw_failure        skipped
#   quarantine_mart_failure       skipped
#   quarantine_xbrl_failure       skipped
# Branches not taken show as "skipped"; this is correct Airflow behaviour
```
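For scripted checks, the states command can also print JSON with -o json. The sketch below summarises that output. It assumes each row carries task_id and state keys, as in Airflow 2.x's states-for-dag-run output. It also assumes stdout holds only the JSON document, so strip any log lines first if your container prints them.

```python
import json


def summarise_states(states_json: str) -> dict:
    """Summarise `airflow tasks states-for-dag-run ... -o json` output."""
    states = {row["task_id"]: row["state"] for row in json.loads(states_json)}
    quarantine = [t for t in ("quarantine_raw_failure", "quarantine_mart_failure",
                              "quarantine_xbrl_failure") if states.get(t) == "success"]
    return {
        "quarantined_at": quarantine[0] if quarantine else None,
        "submitted": states.get("build_submission_package") == "success",
        "audited": states.get("write_final_audit_log") == "success",
        "failed": sorted(t for t, s in states.items() if s in ("failed", "upstream_failed")),
    }


# Hand-written sample in the assumed format (not real CLI output)
sample = json.dumps([
    {"task_id": "run_quality_layer1", "state": "success"},
    {"task_id": "quarantine_raw_failure", "state": "success"},
    {"task_id": "run_dbt", "state": "skipped"},
    {"task_id": "build_submission_package", "state": "skipped"},
    {"task_id": "write_final_audit_log", "state": "success"},
])
print(summarise_states(sample))
```

A run that was quarantined but fully audited reports a quarantine task, no submission, and an empty failed list. That is the "pipeline success, data FAIL" pattern described in the next section.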
5.3 Reading a Quality Failure Run
```bash
# When a Layer 1 quality gate fails, the DAG run looks like this:
#   check_reporting_date      success
#   run_ingest                success
#   run_quality_layer1        success   # ← task succeeded; gate FAIL stored in XCom
#   branch_quality_layer1     success   # ← branch ran and chose quarantine path
#   quarantine_raw_failure    success   # ← quarantine task ran (status=success but pipeline=FAIL)
#   run_dbt                   skipped
#   run_quality_layer2        skipped
#   ...                       all downstream skipped
#   write_final_audit_log     success   # ← always runs due to NONE_FAILED_MIN_ONE_SUCCESS
#
# The entire DAG run shows as "success" in Airflow because
# no task raised an exception. The FAIL is in the audit log and XCom.
# This is intentional: a quality failure is not a pipeline ERROR,
# it is a DATA QUALITY INCIDENT handled by the designed workflow.

# Query the audit log to confirm:
docker exec corep-postgres psql -U corep_admin -d corep -c \
  "SELECT run_id, module_name, status, metadata->>'reporting_date' AS date
   FROM audit.pipeline_run_log ORDER BY ran_at DESC LIMIT 10;"
```
6. Automatic OpenLineage Lineage per DAG Run
When the OpenLineage provider (apache-airflow-providers-openlineage) is installed and configured, Airflow emits lineage events automatically for every task. You get a complete DAG-level lineage graph in Marquez without writing a single extra line of code in your task functions. Here is what the integration adds:
| Event emitted | When | What Marquez receives |
|---|---|---|
| START | Before task function runs | Job: corep_monthly_pipeline.run_ingest, Run ID = OpenLineage run ID of the task instance, parent = DAG run |
| COMPLETE | After task function returns without exception | Output datasets, if reported. If the module's emit_lineage() also fires, both events appear; Marquez merges them into one run only if they share a run ID |
| FAIL | After task function raises an exception | Error message, stack trace, run ID; allows Marquez to mark the job run as FAILED |
```bash
# OpenLineage provider config ([openlineage] section of airflow.cfg, or environment variables)
# Set these in docker-compose.yml for the Airflow container:
AIRFLOW__OPENLINEAGE__NAMESPACE=corep-governance-pipeline
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}'

# Verify lineage is being received after a run:
curl "http://localhost:5000/api/v1/namespaces/corep-governance-pipeline/jobs" | python -m json.tool
# Expected: one job per executed task, named corep_monthly_pipeline.<task_id>
# Runs of a job are listed at
#   /api/v1/namespaces/corep-governance-pipeline/jobs/<job>/runs
# and each run has startedAt, endedAt and state (COMPLETED/FAILED)
```
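The same verification can be scripted against Marquez's REST API. Here is a minimal standard-library sketch. The host, the namespace, and the runs[].state field are assumptions taken from the curl example and Marquez's run objects, so confirm them against your Marquez version.

```python
import json
import urllib.parse
import urllib.request
from collections import Counter

# Assumptions: Marquez reachable on localhost:5000 and the namespace set in
# AIRFLOW__OPENLINEAGE__NAMESPACE, as in the configuration above.
MARQUEZ_URL = "http://localhost:5000"
NAMESPACE = "corep-governance-pipeline"


def fetch_runs(job_name: str) -> list:
    """GET /api/v1/namespaces/{namespace}/jobs/{job}/runs from Marquez."""
    url = (f"{MARQUEZ_URL}/api/v1/namespaces/{urllib.parse.quote(NAMESPACE, safe='')}"
           f"/jobs/{urllib.parse.quote(job_name, safe='')}/runs")
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)["runs"]


def summarise_runs(runs: list) -> Counter:
    """Count runs by state, e.g. Counter({'COMPLETED': 3, 'FAILED': 1})."""
    return Counter(run.get("state", "UNKNOWN") for run in runs)


# Usage (needs a running Marquez):
# print(summarise_runs(fetch_runs("corep_monthly_pipeline.run_ingest")))
```

Keeping the HTTP call apart from the counting logic means the summary can be unit-tested offline with recorded responses.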
7. The Regulatory Calendar Schedule
schedule_interval="0 1 L * *" — Not a Standard CronThe L (Last) day-of-month expression requires the croniter library ≥ 1.3.8 and Airflow ≥ 2.7. It runs the pipeline at 01:00 UTC on the last calendar day of every month — which is the correct trigger point for COREP monthly reporting (the reporting period closes on the last day of the month; processing runs early next morning after midnight UTC).
If your Airflow version does not support L syntax, use a Timetable instead (see Airflow 2.4+ custom timetables) or deploy month-end via @monthly (which runs on the 1st of the next month — one day late, which may be acceptable for your NCA’s T+N submission deadline).
| Reporting type | Schedule expression | Run at | EBA remittance deadline |
|---|---|---|---|
| Monthly (all months) | 0 1 L * * | 01:00 UTC, last day of month | 15th calendar day after the reference date (EBA ITS on supervisory reporting) |
| Quarterly only (Q1/Q2/Q3/Q4) | 0 1 L 3,6,9,12 * | 01:00 UTC, last day of March, June, September, December | 12 May, 11 August, 11 November, 11 February (EBA ITS on supervisory reporting) |
| Manual trigger (backfill or rerun) | airflow dags trigger --conf ... | On demand | N/A — used for correction submissions |
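To sanity-check what "0 1 L * *" and "0 1 L 3,6,9,12 *" resolve to, the fire times can be computed with the standard library alone. This sketch mirrors the intended meaning of the L token (the last calendar day of the month). It is not how croniter evaluates the expression.

```python
import calendar
from datetime import datetime, timezone


def month_end_runs(year: int, months=range(1, 13), hour: int = 1) -> list:
    """UTC fire times of '0 <hour> L <months> *' for one calendar year."""
    return [
        datetime(year, m, calendar.monthrange(year, m)[1], hour, 0, tzinfo=timezone.utc)
        for m in months
    ]


for run in month_end_runs(2026, months=(3, 6, 9, 12)):
    print(run.isoformat())
# 2026-03-31T01:00:00+00:00
# 2026-06-30T01:00:00+00:00
# 2026-09-30T01:00:00+00:00
# 2026-12-31T01:00:00+00:00
```

calendar.monthrange handles leap years, so February resolves to the 29th in 2028 and to the 28th otherwise.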
8. What the Audit Trail Looks Like After a Complete Run
After a successful end-of-month run, the following artefacts exist and are queryable:
```sql
-- Query the full pipeline audit log for a reporting date
SELECT module_name,
       status,
       metadata->>'reporting_date'     AS date,
       metadata->>'error_count'        AS errors,
       metadata->>'submission_package' AS package,
       ran_at
FROM audit.pipeline_run_log
WHERE metadata->>'reporting_date' = '2026-03-31'
ORDER BY ran_at;

-- Output:
-- module_name       | status | date       | errors | package                               | ran_at
-- ingest            | PASS   | 2026-03-31 |        |                                       | 2026-03-31 01:02:14
-- quality_raw       | PASS   | 2026-03-31 |        |                                       | 2026-03-31 01:03:45
-- quality_mart      | PASS   | 2026-03-31 |        |                                       | 2026-03-31 01:09:22
-- catalog           | PASS   | 2026-03-31 |        |                                       | 2026-03-31 01:11:05
-- security          | PASS   | 2026-03-31 |        |                                       | 2026-03-31 01:11:38
-- xbrl_valid        | PASS   | 2026-03-31 | 0      |                                       | 2026-03-31 01:18:52
-- pipeline_complete | PASS   | 2026-03-31 | 0      | COREP_2026-03-31_20260331T011912Z.zip | 2026-03-31 01:19:03
```
```text
Artefacts produced for reporting date 2026-03-31:

MinIO bucket: corep-eba-source
  └── source/capital_instruments.csv    (raw input: immutable copy)
  └── source/rwa_exposures.csv
  └── source/...
MinIO bucket: corep-gx-reports
  └── raw/corep_raw_2026-03-31/         (GX HTML data docs: Layer 1 pass)
  └── mart/corep_mart_2026-03-31/       (GX HTML data docs: Layer 2 pass)
MinIO bucket: corep-xbrl-output
  └── COREP_2026-03-31_20260331T011623Z.xbrl
  └── validation_reports/validation_report_20260331T011845Z.json
  └── submissions/COREP_2026-03-31_20260331T011912Z.zip
PostgreSQL: audit.pipeline_run_log
  └── 7 rows (six modules plus pipeline_complete), all PASS
Marquez lineage graph:
  └── corep_monthly_pipeline.run_ingest   (COMPLETE) → raw.capital_instruments, raw.rwa_exposures, ...
  └── corep_monthly_pipeline.run_dbt      (COMPLETE) → staging.*, intermediate.*, mart.*
  └── corep_monthly_pipeline.run_xbrl_gen (COMPLETE) → corep-xbrl-output/COREP_2026-03-31_*.xbrl
```
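The completeness of the audit log can be checked mechanically after each run. This sketch assumes that the seven module_name values in the query output above are the full expected set and that rows arrive ordered by ran_at. audit_gaps is a hypothetical helper.

```python
EXPECTED = ("ingest", "quality_raw", "quality_mart", "catalog",
            "security", "xbrl_valid", "pipeline_complete")


def audit_gaps(rows: list) -> dict:
    """Map each expected module that is missing or not PASS to its status."""
    latest = {}
    for module_name, status in rows:  # rows ordered by ran_at: later rows win
        latest[module_name] = status
    return {m: latest.get(m, "MISSING") for m in EXPECTED if latest.get(m) != "PASS"}


clean_run = [(m, "PASS") for m in EXPECTED]
print(audit_gaps(clean_run))       # {}
print(audit_gaps(clean_run[:-1]))  # {'pipeline_complete': 'MISSING'}
```

An empty result means every module recorded PASS for the reporting date. Anything else tells the governance team which step to investigate before the package is filed.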
9. Common Airflow Problems in This Pipeline and Their Fixes
| Problem | Symptom | Fix |
|---|---|---|
| Module import error on task start | Task immediately fails with ModuleNotFoundError: modules.ingest | The Airflow container's PYTHONPATH must include /opt/corep. Set PYTHONPATH=/opt/corep in the environment of every Airflow container (scheduler and webserver). airflow.cfg has no pythonpath option; Airflow itself only adds the dags, plugins and config folders to sys.path. |
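| Final audit task does not run after a branch | write_final_audit_log shows skipped (or upstream_failed) instead of running | A join task downstream of several branch paths keeps the default ALL_SUCCESS rule unless told otherwise, so a single skipped upstream skips it. Set the join task's trigger_rule to TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS; tasks on the branch paths themselves need no special rule. upstream_failed means an upstream task really failed, for example a quality task that raised instead of pushing FAIL. |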
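| DAG schedule not triggering | DAG paused or start_date in future | Use a fixed start_date in the past, such as pendulum.datetime(2026, 1, 1, tz="UTC"), rather than the deprecated, dynamic days_ago(1). Set catchup=False. Unpause the DAG. Check the airflow scheduler container is running. Remember that each scheduled run fires at the end of its interval. |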
| XCom value is None in downstream task | xcom_pull returns None when a branch was not taken | Always use or "SKIP" / or "N/A" as default when pulling XCom from tasks that may have been skipped. Skipped tasks do not push XCom values. |
| dbt subprocess fails silently | Task succeeds but mart tables not populated | subprocess.run(..., check=False) never raises on its own, so always check result.returncode != 0 and raise. dbt exits with 1 when a model errors or a test fails and with 2 on an unhandled error. Tests configured with severity: warn still exit 0, so review warnings in result.stdout. |
| Max active runs exceeded | Manual trigger creates a run that stays queued | max_active_runs=1 prevents parallel runs, so a new run waits until the previous one finishes. If that run is stuck, mark it as failed in the UI (Browse → DAG Runs) so the queued run can start. |
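The dbt fix in the table above comes down to never trusting subprocess.run(check=False) on its own. Below is a minimal sketch of a checked runner; run_checked is a hypothetical helper name. It falls back to stdout for the error detail because dbt prints most of its error output there.

```python
import subprocess
import sys


def run_checked(cmd: list, tail: int = 2000) -> str:
    """Run cmd; raise RuntimeError with the output tail if it exits non-zero."""
    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    if result.returncode != 0:
        # dbt writes most error detail to stdout, so fall back to it
        detail = result.stderr.strip() or result.stdout
        raise RuntimeError(
            f"Command failed (exit {result.returncode}): {' '.join(cmd)}\n{detail[-tail:]}"
        )
    return result.stdout


# Usage, e.g. inside task_run_dbt:
# run_checked(["dbt", "run", "--profiles-dir", "dbt", "--project-dir", "dbt"])
print(run_checked([sys.executable, "-c", "print('ok')"]).strip())  # ok
```

Raising inside the task marks it failed in Airflow. Because dbt is a hard dependency of the mart gates, that is the behaviour you want here: a dbt crash is a pipeline error, not a data quality incident.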
📚 Day 13 Key Takeaways
- BranchPythonOperator + TriggerRule is the core pattern for regulatory pipelines. Quality failures must route to quarantine, never silently proceed. The final audit task must always run regardless of branch.
- Never raise exceptions for quality failures; push "FAIL" to XCom instead and let the branch decide the route. If the quality task raises, Airflow marks it as a task error, which triggers its retry policy and leaves the branch upstream_failed instead of routing to quarantine.
- XCom carries status strings, not data. XCom values live in the Airflow metadata DB, are meant for small values, and are deleted when a task is cleared and re-run, so they cannot serve as the audit record. All data flows through PostgreSQL → MinIO → PostgreSQL; XCom carries only run metadata.
- The OpenLineage provider for Airflow is the cheapest audit investment you will ever make. With a few environment variables, Marquez receives a DAG-level lineage graph for every run, with task-level start/end timestamps and the dataset references your tasks report.
- The submission package ZIP is the regulator-facing deliverable. It contains the XBRL instance, the validation evidence, and the GX quality documentation. Everything a compliance team needs to reconstruct and defend the submission is in one file.
- SLAs are a regulatory calendar tool, not a performance metric. Airflow measures each SLA from the scheduled start of the DAG run, so size them to leave enough time before the NCA remittance date to correct and resubmit (for monthly templates, the 15th calendar day after the reference date under the EBA ITS). An SLA breach means you need to investigate immediately, not eventually.
- Next: Day 14, Building the COREP Capital Dashboard in Apache Superset: connecting Trino as the query engine and building the CET1 ratio time-series panel from mart.corep_c0300.

