End-to-End Data Lineage for COREP — Drilling from an XBRL Fact Back to the Source Column

Trace an EBA XBRL fact through column-level lineage back to its source CSV using OpenLineage, Marquez, and dbt-openlineage — with a full BCBS 239 compliance mapping.
📅 Day 14 of 18  ·  COREP Governance Pipeline Series  ·  Data Lineage

A European bank receives a query from the ECB. The question is precise: “In your Q1 2026 COREP C 01.00 submission, XBRL fact c0020 (CET1 Capital) is reported as €4,250,000 thousand. Please provide the complete calculation methodology, the source data used, the data quality checks applied, and the identity of the system that produced this value.”

You have ten business days to answer. Suppose you cannot trace that specific number from the XBRL file back to the individual rows in your source systems that contributed to it. Then you fall short of BCBS 239 Principle 2 (Data Architecture and IT Infrastructure) and Principle 6 (Adaptability). Under the SREP (Supervisory Review and Evaluation Process), which competent authorities run under EBA guidelines, banks with poor data governance can face additional capital requirements.

This is exactly the question that end-to-end data lineage answers. If you have built this pipeline correctly through Days 1–13, you can answer it in under five minutes with two tools and one query: Marquez, OpenMetadata, and a single SQL query against your audit log.

This post traces the complete lineage path from XBRL fact c0020 (CET1 Capital) back to its source column in raw.capital_instruments, explains how each layer of the stack contributes to and records that lineage, shows you how to query it programmatically, and builds the lineage_snapshot.py script that captures the full graph as a JSON audit artefact at submission time.

1. The Three-Layer Lineage Stack

End-to-end lineage in this pipeline is produced by three different tools at three different granularities. Understanding which tool covers which layer is the prerequisite for knowing where to look when you need to answer an auditor’s question.

Layer | Tool | Granularity | What it captures | Where it's stored
--- | --- | --- | --- | ---
Pipeline level | openlineage-airflow | Job → dataset | Every Airflow task run: which datasets were read, which were written, start/end timestamps, DAG run ID | Marquez (port 5000), queryable via REST API
Transformation level | dbt-openlineage | Column → column | For every dbt model run: which source columns map to which output columns, via SQL parsing of the compiled model | Marquez, attached to each dataset version as a columnLineage facet
Catalog level | OpenMetadata | Table → table + column | Business glossary links, PII tags, data owners, last-updated timestamps, schema descriptions | OpenMetadata MySQL + Elasticsearch (port 8585)
🔎 OpenLineage is the Protocol, Marquez is the Storage

OpenLineage is an open specification (Apache 2.0) that defines how tools emit lineage events in a standard JSON format. Every emitter (Airflow, dbt, your Python modules) sends events to a transport, and in this pipeline the transport is HTTP to Marquez. Marquez stores the events and exposes a REST API at http://localhost:5000. Its web UI runs as a separate service, at http://localhost:3000 in the default Marquez setup. If you later migrate to Atlan or DataHub, you change the transport endpoint in one place, and all emitters continue working unchanged.
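The sketch below shows the kind of RunEvent an emitter sends to Marquez's `/api/v1/lineage` endpoint, using only the standard library. It makes two assumptions: the `schemaURL` pins OpenLineage spec version 2-0-2, and the `producer` URI is a hypothetical placeholder. Real emitters such as the Airflow and dbt integrations fill both in for you. The request is built but not sent, so you can inspect it first.

```python
"""Minimal sketch of an OpenLineage RunEvent posted to Marquez over HTTP.

Assumptions: PRODUCER is a hypothetical placeholder URI, and SCHEMA_URL
pins spec version 2-0-2.
"""
import json
import urllib.request
import uuid
from datetime import datetime, timezone

NAMESPACE = "corep-governance-pipeline"
PRODUCER = "https://example.com/corep-governance-pipeline"  # hypothetical producer URI
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/definitions/RunEvent"


def build_run_event(event_type, job_name, inputs, outputs, run_id=None):
    """Build a RunEvent dict; inputs/outputs are (namespace, name) tuples."""
    return {
        "eventType": event_type,  # e.g. START, COMPLETE, FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id or str(uuid.uuid4())},
        "job": {"namespace": NAMESPACE, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


def build_http_request(event, marquez_url="http://localhost:5000"):
    """Prepare (but do not send) the POST to Marquez's lineage endpoint."""
    return urllib.request.Request(
        f"{marquez_url}/api/v1/lineage",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    event = build_run_event(
        "COMPLETE",
        "xbrl_gen",
        inputs=[("postgres://corep", "mart.corep_c0100")],
        outputs=[("minio://corep-xbrl-output", "COREP_2026-03-31_20260331T011623Z.xbrl")],
    )
    req = build_http_request(event)
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req, timeout=10)  # uncomment to actually send
```

Swapping the transport (for example to another OpenLineage-compatible backend) only changes the URL passed to `build_http_request`. The event body stays the same.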

📌 The Complete Lineage Path for XBRL Fact c0020 (CET1 Capital)

2. Tracing c0020 from XBRL Back to Source — The Full Chain

Here is the complete path. Each arrow represents a lineage event recorded in Marquez by a different tool.

QUESTION: "Where does XBRL c0020 = 4,250,000 come from?"

XBRL LAYER
  corep-xbrl-output/COREP_2026-03-31_*.xbrl
  └── concept: {eba_met}c0020
      value:   4250000  (decimals=-3, unit=EUR)
      context: C_2026-03-31_instant
             │
             │  ← recorded by: xbrl_gen.emit_lineage()
             │    job: corep-governance-pipeline.xbrl_gen
             │    input dataset: postgres://corep:mart.corep_c0100
             ▼
MART LAYER
  mart.corep_c0100
  └── column: cet1_capital = 4250000000  (stored in full EUR, not thousands)
             │
             │  ← recorded by: dbt-openlineage (column-level)
             │    job: corep-governance-pipeline.dbt.corep_c0100
             │    input column: intermediate.int_capital_by_tier.cet1_capital
             ▼
INTERMEDIATE LAYER
  intermediate.int_capital_by_tier
  └── column: cet1_capital = SUM(amount) WHERE tier='CET1'
             │
             │  ← recorded by: dbt-openlineage (column-level)
             │    job: corep-governance-pipeline.dbt.int_capital_by_tier
             │    input columns: staging.stg_capital_instruments.amount AND .tier
             ▼
STAGING LAYER
  staging.stg_capital_instruments
  └── column: amount = CAST(raw.capital_instruments.amount AS NUMERIC(20,2))
  └── column: tier   = UPPER(TRIM(raw.capital_instruments.tier))
             │
             │  ← recorded by: dbt-openlineage (column-level)
             │    job: corep-governance-pipeline.dbt.stg_capital_instruments
             │    input columns: raw.capital_instruments.amount AND .tier
             ▼
RAW LAYER
  raw.capital_instruments
  └── column: amount  (original string from CSV: "4250000000.00")
  └── column: tier    (original string from CSV: "CET1")
             │
             │  ← recorded by: ingest.emit_lineage()
             │    job: corep-governance-pipeline.ingest
             │    input dataset: file:///data/source/capital_instruments.csv
             ▼
SOURCE LAYER
  file:///data/source/capital_instruments.csv
  └── rows WHERE tier='CET1': SUM(amount) = 4,250,000,000.00 EUR
  └── Immutable copy: minio://corep-eba-source/source/capital_instruments.csv
  └── Ingest timestamp: 2026-03-31 01:02:14 UTC (from audit.pipeline_run_log)

That is the complete answer to the ECB’s question. Every arrow in that chain is a recorded, queryable, timestamped lineage event. The rest of this post shows you how to retrieve each one.
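The source-layer claim can also be reconciled independently of Marquez. That claim is that the CET1 rows sum to 4,250,000,000.00 EUR. The sketch below makes two assumptions. First, the CSV has `tier` and `amount` columns; the `instrument_id` column in the sample is hypothetical. Second, tier values are trimmed and upper-cased so they match the `tier = 'CET1'` comparison used in int_capital_by_tier.

```python
"""Reconcile the CET1 total in the source CSV with the mart value.

Assumptions: the CSV has 'tier' and 'amount' columns; tier values are
trimmed and upper-cased to match the tier = 'CET1' comparison in
int_capital_by_tier. The sample's instrument_id column is hypothetical.
"""
import csv
import io
from decimal import Decimal


def sum_tier(csv_file, tier="CET1"):
    """Return (row_count, total) for rows whose normalised tier equals `tier`."""
    count, total = 0, Decimal("0")
    for row in csv.DictReader(csv_file):
        if row["tier"].strip().upper() == tier:
            count += 1
            total += Decimal(row["amount"].strip())
    return count, total


def reconcile(csv_file, mart_value, tier="CET1"):
    """True when the source rows for `tier` add up exactly to the mart value."""
    _, total = sum_tier(csv_file, tier)
    return total == Decimal(str(mart_value))


if __name__ == "__main__":
    sample = io.StringIO(
        "instrument_id,tier,amount\n"
        "I1,CET1,4000000000.00\n"
        "I2, cet1 ,250000000.00\n"
        "I3,AT1,500000000.00\n"
    )
    print(reconcile(sample, 4250000000))  # True
```

In practice you would pass an open handle to capital_instruments.csv (or its immutable MinIO copy) together with the mart value. `Decimal` avoids floating-point drift on large EUR amounts.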

🔍 Querying the Lineage Graph via the Marquez API

3. Querying Marquez: From XBRL Fact to Source in Four API Calls

Step 1: Find the job that wrote the XBRL file

# Marquez REST API — base URL: http://localhost:5000/api/v1

# List all jobs in the corep namespace
curl -s "http://localhost:5000/api/v1/namespaces/corep-governance-pipeline/jobs" \
  | python -m json.tool | grep -A2 '"name"'

# Expected output (truncated):
# "name": "ingest"
# "name": "dbt.stg_capital_instruments"
# "name": "dbt.int_capital_by_tier"
# "name": "dbt.corep_c0100"
# "name": "xbrl_gen"
# "name": "xbrl_valid"
# "name": "corep_monthly_pipeline.run_xbrl_gen"  ← Airflow task-level

# Get the most recent run of the xbrl_gen job
curl -s "http://localhost:5000/api/v1/namespaces/corep-governance-pipeline/jobs/xbrl_gen/runs?limit=1" \
  | python -m json.tool
# Response (key fields, simplified; exact field names vary between Marquez versions):
{
  "runs": [{
    "id":        "a3f8c1d2-4b5e-6789-abcd-ef0123456789",
    "state":     "COMPLETED",
    "startedAt": "2026-03-31T01:16:23.000Z",
    "endedAt":   "2026-03-31T01:16:41.000Z",
    "inputDatasets": [
      {"namespace": "postgres://corep", "name": "mart.corep_c0100"},
      {"namespace": "postgres://corep", "name": "mart.corep_c0200"},
      {"namespace": "postgres://corep", "name": "mart.corep_c0300"},
      {"namespace": "postgres://corep", "name": "mart.corep_c4700"}
    ],
    "outputDatasets": [
      {"namespace": "minio://corep-xbrl-output",
       "name":      "COREP_2026-03-31_20260331T011623Z.xbrl"}
    ]
  }]
}
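If you script this step instead of reading the JSON by eye, a small helper can confirm which run wrote a given XBRL file and list its inputs. This sketch assumes the simplified response shape above, with `inputDatasets` and `outputDatasets`. Check those field names against your Marquez version.

```python
"""Find the Marquez run that wrote a given output and list its inputs.

Assumes the simplified runs response shown above; the inputDatasets and
outputDatasets field names may differ between Marquez versions.
"""


def inputs_for_output(runs_response, output_name):
    """Return (run_id, ["namespace:name", ...]) for the run that wrote output_name."""
    for run in runs_response.get("runs", []):
        outputs = {d["name"] for d in run.get("outputDatasets", [])}
        if output_name in outputs:
            inputs = [f'{d["namespace"]}:{d["name"]}' for d in run.get("inputDatasets", [])]
            return run["id"], inputs
    return None, []  # no run in this response wrote that file
```

Feed it the parsed output of the `runs?limit=1` call with the XBRL file name. The `postgres://corep:mart.corep_c0100` entry it returns is the starting point for Step 2.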

Step 2: Get the dataset that feeds c0020 — mart.corep_c0100

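# Get lineage for the mart.corep_c0100 dataset
# (-G with --data-urlencode URL-encodes the nodeId, which contains "://")
curl -s -G "http://localhost:5000/api/v1/lineage" \
  --data-urlencode "nodeId=dataset:postgres://corep:mart.corep_c0100" \
  --data-urlencode "depth=3" \
  | python -m json.tool

# depth limits how far Marquez expands the graph from the starting node.
# The result is not upstream-only: note the outEdge to xbrl_gen below.
# Raise depth (the snapshot script uses 6) if the graph stops short of
# raw.capital_instruments.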

# Response structure (simplified):
{
  "graph": [
    {"id": "dataset:postgres://corep:mart.corep_c0100",
     "type": "DATASET",
     "inEdges": [{"origin": "job:corep-governance-pipeline:dbt.corep_c0100"}],
     "outEdges": [{"destination": "job:corep-governance-pipeline:xbrl_gen"}]},
    {"id": "job:corep-governance-pipeline:dbt.corep_c0100",
     "type": "JOB",
     "inEdges": [{"origin": "dataset:postgres://corep:intermediate.int_capital_by_tier"}]},
    ... (continues upstream)
  ]
}
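The lineage response also contains downstream nodes, so walking it upstream yourself makes the chain explicit. This sketch assumes only the `graph`, `id`, and `inEdges[].origin` fields shown above. It does a breadth-first walk along incoming edges and ignores outgoing ones.

```python
"""Walk a Marquez lineage graph upstream from one node (breadth-first).

Assumes the response shape shown above: a "graph" list of nodes, each with
an "id" and "inEdges" whose "origin" is the upstream node id.
"""
from collections import deque


def upstream_nodes(lineage_response, start_id):
    """Return upstream node ids in breadth-first order (start node excluded)."""
    nodes = {n["id"]: n for n in lineage_response.get("graph", [])}
    seen, order = {start_id}, []
    queue = deque([start_id])
    while queue:
        node = nodes.get(queue.popleft())
        if node is None:  # referenced but beyond the requested depth
            continue
        for edge in node.get("inEdges", []):
            origin = edge["origin"]
            if origin not in seen:
                seen.add(origin)
                order.append(origin)
                queue.append(origin)
    return order
```

Nodes beyond the requested depth are absent from the response. If the walk stops before `raw.capital_instruments`, the usual fix is a larger `depth`.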

Step 3: Get column-level lineage from dbt

# Column-level lineage is stored in Marquez dataset version facets
# Query the latest version of mart.corep_c0100
# (the namespace postgres://corep must be URL-encoded inside the path)
curl -s "http://localhost:5000/api/v1/namespaces/postgres%3A%2F%2Fcorep/datasets/mart.corep_c0100/versions?limit=1" \
  | python -m json.tool | grep -A30 '"columnLineage"'

# Expected column lineage facet for cet1_capital (field names per the
# OpenLineage columnLineage facet; older facet versions carry
# transformationType / transformationDescription instead of transformations):
{
  "columnLineage": {
    "fields": {
      "cet1_capital": {
        "inputFields": [
          {
            "namespace": "postgres://corep",
            "name":      "intermediate.int_capital_by_tier",
            "field":     "cet1_capital",
            "transformations": [{"type": "DIRECT", "subtype": "IDENTITY"}]
          }
        ]
      },
      "own_funds": {
        "inputFields": [
          {"namespace": "postgres://corep", "name": "intermediate.int_capital_by_tier", "field": "cet1_capital"},
          {"namespace": "postgres://corep", "name": "intermediate.int_capital_by_tier", "field": "at1_capital"},
          {"namespace": "postgres://corep", "name": "intermediate.int_capital_by_tier", "field": "t2_capital"}
        ]
      }
    }
  }
}
# "own_funds" has THREE input fields — this is why column-level lineage matters.
# A table-level tool would only tell you "int_capital_by_tier → mart.corep_c0100".
# Column-level tells you WHICH columns of int_capital_by_tier contribute to WHICH output.
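Following column lineage hop by hop across datasets is a small recursion. This sketch assumes `facets` maps `"namespace:dataset"` to that dataset's columnLineage `fields` dict, for example one versions query per dataset in the chain. Input entries use the spec keys `namespace`, `name`, and `field`; a `dataset` key is also accepted in case your tooling emits a simplified shape. A column with no recorded inputs is returned as a root. A lineage gap therefore shows up as an unexpected non-raw root.

```python
"""Resolve an output column to the root source columns that feed it.

Assumption: `facets` maps "namespace:dataset" to that dataset's columnLineage
"fields" dict. Input entries use namespace/name/field; "dataset" is accepted
as a fallback for "name".
"""


def root_columns(facets, namespace, dataset, field, _seen=None):
    """Return the set of (namespace, dataset, field) leaves behind one column."""
    seen = set() if _seen is None else _seen
    key = (namespace, dataset, field)
    if key in seen:  # already expanded via another path, or a cycle
        return set()
    seen.add(key)
    inputs = facets.get(f"{namespace}:{dataset}", {}).get(field, {}).get("inputFields", [])
    if not inputs:  # no recorded upstream: treat as a source column
        return {key}
    roots = set()
    for inp in inputs:
        roots |= root_columns(
            facets,
            inp.get("namespace", namespace),
            inp.get("name") or inp.get("dataset"),
            inp["field"],
            seen,
        )
    return roots
```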
📄 How dbt-openlineage Produces Column-Level Lineage

4. How dbt-openlineage Extracts Column Lineage from SQL

Run your models through the dbt-openlineage wrapper (the openlineage-dbt package's `dbt-ol run` in place of `dbt run`). After the run it reads dbt's manifest and run results and parses the compiled SQL of every model with the OpenLineage SQL parser. It then emits a COMPLETE event with a columnLineage facet. No annotations in your SQL are required.

-- dbt/models/intermediate/int_capital_by_tier.sql
-- dbt-openlineage parses this compiled SQL and extracts, per output column:
-- cet1_capital ← stg_capital_instruments.amount (aggregated by SUM), with .tier as the CASE condition
-- at1_capital  ← stg_capital_instruments.amount (aggregated by SUM), with .tier as the CASE condition
-- t2_capital   ← stg_capital_instruments.amount (aggregated by SUM), with .tier as the CASE condition

SELECT
    reporting_date,
    SUM(CASE WHEN tier = 'CET1' THEN amount ELSE 0 END) AS cet1_capital,
    SUM(CASE WHEN tier = 'AT1'  THEN amount ELSE 0 END) AS at1_capital,
    SUM(CASE WHEN tier = 'T2'   THEN amount ELSE 0 END) AS t2_capital
FROM {{ ref('stg_capital_instruments') }}
GROUP BY reporting_date
# dbt/profiles.yml (configured in Day 6, repeated here for reference)

corep_governance_pipeline:
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: corep_admin
      password: "{{ env_var('POSTGRES_PASSWORD') }}"
      dbname: corep
      schema: staging
  target: dev

# openlineage.yml: the lineage transport is not read from profiles.yml.
# dbt-ol uses the OpenLineage client configuration: this file (in the
# working directory, or at the path in OPENLINEAGE_CONFIG) or environment
# variables such as OPENLINEAGE_URL.
transport:
  type: http
  url: http://localhost:5000
  endpoint: api/v1/lineage
# Job namespace: export OPENLINEAGE_NAMESPACE=corep-governance-pipeline
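The openlineage-dbt package provides a `dbt-ol` command. It wraps dbt and emits OpenLineage events after the run, configured through the OpenLineage client's environment variables (`OPENLINEAGE_URL`, `OPENLINEAGE_NAMESPACE`) or an `openlineage.yml` file. The sketch below invokes it from Python, for example inside an Airflow task. It assumes the dbt project lives in `./dbt` and that the pipeline's `MARQUEZ_URL` variable points at the Marquez API. Both are assumptions about this pipeline's layout.

```python
"""Run dbt through the openlineage-dbt wrapper so each model emits lineage.

Assumptions: openlineage-dbt is installed (it provides the dbt-ol command),
the dbt project lives in ./dbt, and MARQUEZ_URL points at the Marquez API.
"""
import os
import subprocess


def dbt_ol_command(project_dir="dbt", select=None):
    """Return (argv, env) for a lineage-emitting dbt run."""
    env = dict(os.environ)
    env["OPENLINEAGE_URL"] = os.environ.get("MARQUEZ_URL", "http://localhost:5000")
    env["OPENLINEAGE_NAMESPACE"] = "corep-governance-pipeline"
    cmd = ["dbt-ol", "run", "--project-dir", project_dir]
    if select:
        cmd += ["--select", select]
    return cmd, env


def run_dbt_ol(select=None):
    """Execute the run; raises CalledProcessError if dbt fails."""
    cmd, env = dbt_ol_command(select=select)
    subprocess.run(cmd, env=env, check=True)
```

Keeping the command construction separate from execution lets you test the environment without running dbt.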
✓ What dbt-openlineage Parses Automatically

The OpenLineage SQL parser handles SELECT col AS alias, CAST(col AS type), CASE WHEN ... THEN col, SUM(col) / AVG(col) / other aggregate functions, col1 + col2 arithmetic, COALESCE(col1, col2), and JOIN column references. Because it works on compiled SQL, Jinja macros are already expanded before parsing. It does not parse dynamic SQL, SELECT * (which only expands at runtime; dbt resolves it via its manifest), or Python dbt models.

For SELECT * models, dbt-openlineage falls back to schema-level column listing from the manifest. Column-level lineage is still emitted, just as “all columns of source X → all columns of target Y” rather than individual field mappings.
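To check what the parser actually recorded for int_capital_by_tier, compare it against a hand-built expectation. In this sketch, the DIRECT/AGGREGATION and INDIRECT/CONDITIONAL labels are taken from the OpenLineage columnLineage facet's transformation types. Parser versions differ in how much of this they emit; older ones may record only direct inputs. The comparison therefore checks (dataset name, field) pairs only.

```python
"""Hand-built expectation for int_capital_by_tier's column lineage.

Assumption: transformation labels follow the OpenLineage columnLineage facet;
only (name, field) pairs are compared, because parser output varies.
"""
NAMESPACE = "postgres://corep"
SOURCE = "staging.stg_capital_instruments"
OUTPUT_COLUMNS = ("cet1_capital", "at1_capital", "t2_capital")


def _input(field, kind, subtype):
    return {"namespace": NAMESPACE, "name": SOURCE, "field": field,
            "transformations": [{"type": kind, "subtype": subtype}]}


def expected_fields():
    """Each tier column: SUM over amount, with tier as the CASE condition."""
    return {
        column: {"inputFields": [_input("amount", "DIRECT", "AGGREGATION"),
                                 _input("tier", "INDIRECT", "CONDITIONAL")]}
        for column in OUTPUT_COLUMNS
    }


def missing_inputs(expected, actual):
    """Per output column, the (name, field) inputs expected but absent in actual."""
    gaps = {}
    for column, spec in expected.items():
        want = {(f["name"], f["field"]) for f in spec["inputFields"]}
        have = {(f.get("name"), f.get("field"))
                for f in actual.get(column, {}).get("inputFields", [])}
        if want - have:
            gaps[column] = sorted(want - have)
    return gaps
```

For `actual`, pass the fields dict returned by `_get_column_lineage()` in the snapshot script. A missing `tier` input on an older parser is expected. A missing `amount` input points to a real lineage gap.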

📄 Building the Lineage Snapshot for Audit

5. xbrl/lineage_snapshot.py — Capturing the Full Graph at Submission Time

At submission time, the pipeline runs lineage_snapshot.py to capture the complete lineage graph for that reporting date as a JSON file. This becomes part of the submission package built in Day 13’s task_build_submission_package. It is the machine-readable answer to “show me the full lineage for this submission.”

"""
xbrl/lineage_snapshot.py: capture the full data lineage graph for a
reporting date and write it as a structured JSON audit artefact.

Called by task_build_submission_package in the Airflow DAG.
Output: output/lineage/lineage_snapshot_YYYY-MM-DD.json

The snapshot captures:
  - The most recent run of every Marquez job in the pipeline namespace
    (Marquez is queried as it stands, so take the snapshot straight after
    the submission run it documents)
  - Each mart table's upstream lineage graph (datasets and jobs)
  - Column-level lineage for the four mart tables (from dataset version facets)
  - XBRL concept → mart column mapping (from mart_to_xbrl_mapping.yaml)
"""

import json
import logging
import os
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

import yaml

log = logging.getLogger(__name__)

MARQUEZ_URL  = os.environ.get("MARQUEZ_URL", "http://localhost:5000")
NAMESPACE    = "corep-governance-pipeline"
MAPPING_FILE = Path(os.environ.get("XBRL_MAPPING_FILE", "xbrl/mart_to_xbrl_mapping.yaml"))
LINEAGE_DIR  = Path("output/lineage")

# Mart tables that feed XBRL output — the core of the lineage query
MART_TABLES = [
    "mart.corep_c0100",
    "mart.corep_c0200",
    "mart.corep_c0300",
    "mart.corep_c4700",
]


def _marquez_get(path: str) -> dict:
    url = f"{MARQUEZ_URL}/api/v1{path}"
    with urllib.request.urlopen(url, timeout=15) as resp:
        return json.loads(resp.read())


def _get_dataset_lineage(namespace: str, dataset_name: str, depth: int = 5) -> dict:
    # Marquez lineage node IDs have the form dataset:<namespace>:<name>
    node_id      = f"dataset:{namespace}:{dataset_name}"
    encoded_node = urllib.parse.quote(node_id, safe="")
    return _marquez_get(f"/lineage?nodeId={encoded_node}&depth={depth}")


def _get_column_lineage(namespace: str, dataset_name: str) -> dict:
    encoded_ns   = urllib.parse.quote(namespace, safe="")
    encoded_name = urllib.parse.quote(dataset_name, safe="")
    versions     = _marquez_get(f"/namespaces/{encoded_ns}/datasets/{encoded_name}/versions?limit=1")
    if not versions.get("versions"):
        return {}
    facets = versions["versions"][0].get("facets", {})
    return facets.get("columnLineage", {}).get("fields", {})


def _get_xbrl_to_mart_map() -> list:
    """Load mart_to_xbrl_mapping.yaml and flatten it into one entry per XBRL concept."""
    with open(MAPPING_FILE) as f:
        mapping = yaml.safe_load(f)
    entries = []
    for entry in mapping.get("mappings", []):
        entries.append({
            "xbrl_concept": entry["concept_id"],
            "mart_table":   entry["source_table"],
            "mart_column":  entry["source_column"],
            "unit":         entry.get("unit", "EUR"),
            "decimals":     entry.get("decimals", -3),
        })
    return entries


def build_lineage_snapshot(reporting_date: str) -> Path:
    """
    Build a complete lineage snapshot JSON for the given reporting_date.
    Returns the path to the written snapshot file.
    """
    LINEAGE_DIR.mkdir(parents=True, exist_ok=True)
    ts          = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    output_path = LINEAGE_DIR / f"lineage_snapshot_{reporting_date}.json"

    log.info("[lineage_snapshot] Building snapshot for reporting_date=%s", reporting_date)

    snapshot = {
        "reporting_date":  reporting_date,
        "snapshot_taken":  ts,
        "namespace":       NAMESPACE,
        "xbrl_concept_map": _get_xbrl_to_mart_map(),
        "mart_lineage":    {},
        "pipeline_jobs":   [],
    }

    # ── 1. For each mart table: full upstream lineage graph ──────────
    for table in MART_TABLES:
        log.info("[lineage_snapshot] Fetching lineage for %s", table)
        try:
            graph        = _get_dataset_lineage("postgres://corep", table, depth=6)
            col_lineage  = _get_column_lineage("postgres://corep", table)
            snapshot["mart_lineage"][table] = {
                "upstream_graph": graph,
                "column_lineage": col_lineage,
            }
        except Exception as exc:
            log.warning("[lineage_snapshot] Could not fetch lineage for %s: %s", table, exc)
            snapshot["mart_lineage"][table] = {"error": str(exc)}

    # ── 2. List all pipeline jobs and their last run state ───────────
    try:
        jobs_resp = _marquez_get(f"/namespaces/{NAMESPACE}/jobs?limit=50")
        for job in jobs_resp.get("jobs", []):
            runs_resp = _marquez_get(
                f"/namespaces/{NAMESPACE}/jobs/{urllib.parse.quote(job['name'], safe='')}/runs?limit=1"
            )
            last_run = runs_resp["runs"][0] if runs_resp.get("runs") else {}
            snapshot["pipeline_jobs"].append({
                "job_name":    job["name"],
                "last_run_id": last_run.get("id"),
                "state":       last_run.get("state"),
                "started_at":  last_run.get("startedAt"),
                "ended_at":    last_run.get("endedAt"),
            })
    except Exception as exc:
        log.warning("[lineage_snapshot] Could not fetch job list: %s", exc)

    output_path.write_text(json.dumps(snapshot, indent=2))
    log.info("[lineage_snapshot] Snapshot written: %s", output_path)
    return output_path


if __name__ == "__main__":
    import sys
    logging.basicConfig(level=logging.INFO)  # show the [lineage_snapshot] progress logs
    date_arg = sys.argv[1] if len(sys.argv) > 1 else "2026-03-31"
    path = build_lineage_snapshot(date_arg)
    print(f"Lineage snapshot: {path}")
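A snapshot is only useful as evidence if it covers every reported concept. The sketch below checks the JSON that `build_lineage_snapshot()` writes for completeness. It flags XBRL concepts whose mart column has no recorded column lineage, and concepts whose table lineage fetch failed. Running it as a gate before the submission package is built is one option; the pipeline is not described as doing this.

```python
"""Completeness check over a lineage snapshot written by build_lineage_snapshot().

Returns the XBRL concepts whose mart column has no entry in that table's
column_lineage, including concepts whose table lineage fetch failed.
"""
import json
from pathlib import Path


def concepts_without_lineage(snapshot):
    gaps = []
    mart_lineage = snapshot.get("mart_lineage", {})
    for entry in snapshot.get("xbrl_concept_map", []):
        table_info = mart_lineage.get(entry["mart_table"], {})
        columns = table_info.get("column_lineage", {})
        if "error" in table_info or entry["mart_column"] not in columns:
            gaps.append(entry["xbrl_concept"])
    return gaps


def check_snapshot_file(path):
    """Load a snapshot JSON file and return its uncovered concepts."""
    return concepts_without_lineage(json.loads(Path(path).read_text()))
```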

6. Column Lineage in Practice — What the Marquez UI Shows

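The Marquez web UI visualises this lineage graph. It runs as a separate service from the API: http://localhost:3000 in the default Marquez setup, while the API stays on port 5000. Here is how to navigate it for the COREP c0020 drill-down:

Marquez UI Navigation: Finding c0020 Lineage

Step 1: Open the Marquez web UI (http://localhost:3000 by default)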
  → Select namespace: corep-governance-pipeline
  → Click "Datasets" tab
  → Search: mart.corep_c0100

Step 2: Dataset detail page
  → Click the dataset node
  → Right panel shows:
    ├── Schema (columns: cet1_capital, at1_capital, t2_capital, own_funds, ...)
    ├── Last updated: 2026-03-31 01:09:22 UTC
    ├── Source job: dbt.corep_c0100
    └── "View Lineage" button → opens graph

Step 3: Lineage graph
  Upstream (left):                           Downstream (right):
  ┌──────────────────────┐                  ┌─────────────────────────┐
  │ raw.capital_instrs   │                  │ xbrl_gen                │
  │  (ingest job)        │ ──→ stg ──→ int ─┤ COREP_2026-03-31.xbrl   │
  │ raw.rwa_exposures    │                  └─────────────────────────┘
  └──────────────────────┘
      ↑ each box is clickable — expands to show its own upstream

Step 4: Column-level drill (click "cet1_capital" column in schema panel)
  Shows: cet1_capital in mart.corep_c0100
    ← cet1_capital from intermediate.int_capital_by_tier (IDENTITY transform)
       ← amount from staging.stg_capital_instruments (SUM+FILTER transform)
          ← amount from raw.capital_instruments (CAST transform)
             ← source file: /data/source/capital_instruments.csv

7. What OpenMetadata Adds That Marquez Cannot Provide

Marquez answers “how was this data produced?” OpenMetadata answers “what does this data mean?” Both are needed for a complete regulatory response.

Question | Tool | API / Location
--- | --- | ---
Which source file and columns feed XBRL c0020? | Marquez | GET /api/v1/lineage?nodeId=dataset:postgres://corep:mart.corep_c0100&depth=6
What is the business definition of “CET1 capital”? | OpenMetadata | EBA COREP Glossary → term “cet1-capital” → legal reference: CRR Art. 50
Who owns the mart.corep_c0100 table? | OpenMetadata | Table entity → owners field → “Data Governance Team”
Is any PII data in the lineage chain? | OpenMetadata | Column tags: raw.counterparties.name is tagged GDPR.PII, but it is dropped in stg_counterparties.sql before reaching mart
Which EBA taxonomy version does the table implement? | OpenMetadata | Dataset description on mart.corep_c0100: “EBA DPM 4.0 / taxonomy 3.3”
What data quality checks passed on this data? | Great Expectations data docs | MinIO corep-gx-reports/mart/corep_mart_2026-03-31/index.html
# Query OpenMetadata for the cet1-capital glossary term
curl -s -H "Authorization: Bearer ${OM_JWT_TOKEN}" \
  "http://localhost:8585/api/v1/glossaryTerms/name/EBA-COREP-Glossary.cet1-capital" \
  | python -m json.tool

# Response (key fields, simplified):
{
  "name":        "cet1-capital",
  "displayName": "Common Equity Tier 1 Capital",
  "description": "The highest quality of regulatory capital. CRR Article 50. Includes ordinary share capital, retained earnings, accumulated OCI, less regulatory deductions.",
  "references": [
    {"name": "CRR Art. 50", "endpoint": "https://eba.europa.eu/..."},
    {"name": "EBA DPM c0020", "endpoint": "https://eba.europa.eu/xbrl/crr/..."}
  ],
  "relatedTerms": ["own-funds", "tier1-capital"],
  "tags": ["Regulatory.EBA", "Capital.CET1"]
}
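The same lookup can be scripted for the auditor response. This sketch uses the endpoint and Bearer-token header from the curl call above. `OM_URL` is a hypothetical variable for the OpenMetadata base URL, and the response is read in the simplified shape shown above. The request builder is separate from the network call, so it can be inspected without a running server.

```python
"""Fetch an OpenMetadata glossary term and list its legal references.

Assumptions: OM_URL is a hypothetical variable for the OpenMetadata base URL
(default http://localhost:8585); the token comes from OM_JWT_TOKEN, as in the
curl call above. The response is read in the simplified shape shown there.
"""
import json
import os
import urllib.parse
import urllib.request


def glossary_term_request(fqn, om_url=None, token=None):
    """Build (but do not send) the GET request for a glossary term by FQN."""
    om_url = om_url or os.environ.get("OM_URL", "http://localhost:8585")
    token = token or os.environ.get("OM_JWT_TOKEN", "")
    url = f"{om_url}/api/v1/glossaryTerms/name/{urllib.parse.quote(fqn, safe='')}"
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


def fetch_term(fqn):
    """Send the request and decode the JSON body."""
    with urllib.request.urlopen(glossary_term_request(fqn), timeout=10) as resp:
        return json.loads(resp.read())


def reference_names(term):
    """Names of the references attached to a term, e.g. CRR articles."""
    return [ref["name"] for ref in term.get("references", [])]
```

For example, `reference_names(fetch_term("EBA-COREP-Glossary.cet1-capital"))` returns the legal references to quote alongside the lineage chain.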
📄 Assembling the Auditor Response

8. Assembling the Complete Auditor Response

Putting it all together, here is a script that queries the lineage stack and assembles the answer to “where does XBRL c0020 = 4,250,000 come from?”

# xbrl/answer_auditor_query.py — run this to produce the complete audit response

import json, os, urllib.request, urllib.parse

MARQUEZ_URL    = os.environ.get("MARQUEZ_URL", "http://localhost:5000")
REPORTING_DATE = "2026-03-31"
XBRL_CONCEPT   = "c0020"          # CET1 capital
MART_TABLE     = "mart.corep_c0100"
MART_COLUMN    = "cet1_capital"

def build_audit_response():
    response = {
        "query": f"Trace XBRL concept {XBRL_CONCEPT} in COREP submission {REPORTING_DATE}",
        "answer": {}
    }

    # 1. XBRL fact details
    response["answer"]["xbrl_fact"] = {
        "concept":  "eba_met:c0020",
        "label":    "Common Equity Tier 1 Capital",
        "value":    4250000,
        "unit":     "EUR",
        "decimals": "-3",
        "meaning":  "4,250,000,000 EUR (value × 1000)",
        "template": "C 01.00 — Own Funds",
        "xbrl_file": f"corep-xbrl-output/COREP_{REPORTING_DATE}_20260331T011623Z.xbrl",
    }

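    # 2. Mart table value (hardcoded here for illustration; in practice run
    #    the query below through Trino and record the result)
    response["answer"]["mart_source"] = {
        "table":  MART_TABLE,
        "column": MART_COLUMN,
        "value":  4250000000,   # full precision before ÷1000
        "query":  f"SELECT {MART_COLUMN} FROM {MART_TABLE} WHERE reporting_date = DATE '{REPORTING_DATE}'",
    }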

    # 3. Lineage from Marquez: keep the node IDs the API actually returned
    #    next to a curated, human-readable summary of the chain
    encoded = urllib.parse.quote(f"dataset:postgres://corep:{MART_TABLE}", safe="")
    with urllib.request.urlopen(
        f"{MARQUEZ_URL}/api/v1/lineage?nodeId={encoded}&depth=6", timeout=10
    ) as resp:
        lineage = json.loads(resp.read())
    response["answer"]["marquez_graph_nodes"] = sorted(
        node["id"] for node in lineage.get("graph", [])
    )
    response["answer"]["lineage_chain"] = [
        {"layer": "mart",         "table": "mart.corep_c0100",                 "column": "cet1_capital"},
        {"layer": "intermediate", "table": "intermediate.int_capital_by_tier", "column": "cet1_capital",
         "transform": "SUM(amount) WHERE tier='CET1'"},
        {"layer": "staging",      "table": "staging.stg_capital_instruments",  "column": "amount",
         "transform": "CAST(raw.capital_instruments.amount AS NUMERIC(20,2))"},
        {"layer": "raw",          "table": "raw.capital_instruments",          "column": "amount",
         "transform": "loaded as string from CSV"},
        {"layer": "source",       "file":  "capital_instruments.csv",
         "immutable_copy": "minio://corep-eba-source/source/capital_instruments.csv",
         "ingest_timestamp": "2026-03-31T01:02:14Z"},
    ]

    # 4. Quality checks applied
    response["answer"]["quality_evidence"] = {
        "raw_gate": "PASS",
        "mart_gate": "PASS",
        "gx_docs": f"minio://corep-gx-reports/mart/corep_mart_{REPORTING_DATE}/index.html",
        "xbrl_validation": "PASS — 0 errors, 2 warnings (insignificant rounding)",
        "validation_report": "minio://corep-xbrl-output/validation_reports/...",
    }

    # 5. Security controls active at query time
    response["answer"]["security_controls"] = {
        "ranger_policy": "corep_reporting role — SELECT on mart.*",
        "pii_in_lineage": "raw.counterparties.name (GDPR.PII) dropped in stg_counterparties — NOT in mart",
        "pg_audit": "DDL/DML audit log active on audit schema",
    }

    print(json.dumps(response, indent=2))
    return response


if __name__ == "__main__":
    build_audit_response()
🔒 Mapping Lineage Capabilities to BCBS 239 Principles

9. How This Lineage Architecture Satisfies BCBS 239

BCBS 239 Principle | Requirement | How this pipeline satisfies it
--- | --- | ---
P2: Data Architecture | A bank should design, build and maintain data architecture and IT infrastructure which fully supports its risk data aggregation capabilities during normal times and during times of stress or crisis | OpenLineage emits lineage automatically on every run. Marquez retains full history. The lineage architecture is the same in normal runs and restatement runs: same code, same tools.
P3: Accuracy and Integrity | A bank should be able to generate accurate and reliable risk data to meet normal and stress/crisis reporting accuracy requirements | Two-layer GX quality gates (raw + mart) with HTML evidence in MinIO. Arelle XBRL validation with JSON report in MinIO. Both are captured in the audit log per run.
P4: Completeness | A bank should be able to capture and aggregate all material risk data across the banking group | Ingest module captures row counts per table. output_check() verifies no tables are empty. Column-level lineage confirms all EBA DPM concepts are sourced from the correct columns.
P6: Adaptability | A bank should be able to generate aggregate risk data to meet a broad range of on-demand, ad hoc risk management reporting requests | Marquez lineage graph is queryable via REST API at any time. The lineage snapshot JSON in the submission package answers ad hoc queries without re-running the pipeline.
P7: Accuracy | Risk management reports should accurately and precisely convey aggregated risk data and reflect risk in an exact manner | dbt column-level tests (not-null, accepted-values, unique) plus GX suite checks on value ranges and cross-column consistency. XBRL calculation linkbase validation confirms arithmetic precision.
P11: Distribution | Risk management reports should be distributed to the relevant parties while ensuring confidentiality is maintained | Ranger RBAC controls which roles can access which mart tables. Superset → Trino → Ranger enforces the same policy at dashboard query time as at pipeline execution time.

10. Lineage Gaps — What Breaks End-to-End Traceability

Gap pattern | What happens | How to prevent it
--- | --- | ---
dbt model uses {{ run_query() }} to write to a non-ref table | Marquez has no record of the write; the lineage chain breaks at that model | Never write outside dbt’s ref/source graph. Models must read through {{ ref() }} or {{ source() }} and write only through their own materialization, so dbt-openlineage can parse both sides.
A module reads from PostgreSQL without calling emit_lineage() | The read is invisible to Marquez; the dataset appears to have no consumer | Every module that reads a dataset must call emit_lineage(). BaseModule enforces this via the abstract method contract.
Manual SQL run directly against PostgreSQL (outside the pipeline) | No lineage event is emitted; data is modified without provenance | Deny DML on the mart and intermediate schemas to every role except pipeline_service. Ranger policies enforce this for queries through Trino. A direct PostgreSQL session bypasses Trino, so matching PostgreSQL grants are needed to make manual writes structurally impossible for non-pipeline users.
Airflow task succeeds but the Python module’s emit_lineage() fails silently | Airflow reports success but Marquez has no record of the run | All emit_lineage() implementations wrap the emit call in try/except and log at WARNING level (non-fatal). Monitor Marquez for missing job runs after Airflow success: absence of a lineage event is an alertable condition.
EBA taxonomy version change without updating mart_to_xbrl_mapping.yaml | Old concept IDs in the lineage snapshot; mismatch between snapshot and actual XBRL output | Pin the mapping file version in the submission package manifest. When the taxonomy version changes, regenerate the mapping and bump the manifest version field.
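The monitoring rule in the fourth row can be sketched as a comparison between Airflow successes and Marquez runs. The sketch makes three assumptions, all of them illustrative choices. First, both sides are already collected as Python datetimes, for example from the Airflow metadata database and the Marquez runs endpoint. Second, task-level jobs are named `dag_id.task_id`, as in the Step 1 job listing (`corep_monthly_pipeline.run_xbrl_gen`). Third, a ten-minute tolerance between end times is acceptable.

```python
"""Flag Airflow task successes that have no matching Marquez run.

Assumptions: task-level jobs are named dag_id.task_id (as in the Step 1 job
listing); both sides are already collected as datetimes; the ten-minute
tolerance between end times is an illustrative choice.
"""
from datetime import datetime, timedelta


def missing_lineage(airflow_successes, marquez_runs,
                    dag_id="corep_monthly_pipeline",
                    tolerance=timedelta(minutes=10)):
    """airflow_successes: [(task_id, ended_at)]; marquez_runs: {job_name: [ended_at, ...]}.

    Returns the task_ids whose success has no Marquez run ending within tolerance.
    """
    missing = []
    for task_id, ended in airflow_successes:
        run_ends = marquez_runs.get(f"{dag_id}.{task_id}", [])
        if not any(abs(ended - run_end) <= tolerance for run_end in run_ends):
            missing.append(task_id)
    return missing


if __name__ == "__main__":
    ended = datetime(2026, 3, 31, 1, 16, 41)
    runs = {"corep_monthly_pipeline.run_xbrl_gen": [ended]}
    print(missing_lineage([("run_xbrl_gen", ended), ("run_ingest", ended)], runs))  # ['run_ingest']
```

A non-empty result is the alertable condition: the task reported success, but nothing in Marquez proves what it read or wrote.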

📚 Day 14 Key Takeaways

  • Lineage has three layers and each requires a different tool: pipeline-level (openlineage-airflow → Marquez), column-level (dbt-openlineage → Marquez), and business-meaning level (OpenMetadata). All three are needed to fully answer a regulatory query.
  • Column-level lineage is the differentiator for BCBS 239. Table-level lineage tells you “mart.corep_c0100 came from raw.capital_instruments.” Column-level tells you “the own_funds column is the arithmetic sum of three specific input columns.” That is what a regulator actually needs to verify the calculation.
  • The lineage snapshot JSON is the machine-readable audit artefact. Include it in every submission package ZIP alongside the XBRL file and validation report. If the Marquez server is unavailable during an audit, the snapshot provides the complete provenance record.
  • Lineage gaps are policy violations. Any write to a mart or intermediate table that does not go through the pipeline creates an invisible data modification. Ranger policies that restrict direct DML to pipeline_service role only are the structural enforcement of this rule.
  • OpenLineage is transport-agnostic. Today the transport is Marquez. If your bank’s data governance team mandates DataHub or Atlan, change one environment variable — all emitters continue working unchanged.
  • The complete auditor answer takes under 5 minutes: Marquez API for the lineage chain, OpenMetadata for the business definition, GX data docs for quality evidence, and the audit log for timestamps. That five-minute capability is the entire point of Days 1–14.
  • Next: Day 15 — Building the COREP Capital Dashboard in Apache Superset — CET1 ratio trend panel, RWA breakdown treemap, and LCR gauge all connected to Trino over the mart tables we have just traced.