Popular Now
Infographic illustrating production‑ready GKE architecture, showing Google Cloud services, Kubernetes clusters, DevOps/GitOps workflows, SRE practices, observability, security, and disaster recovery components.

Production-Ready GKE: The Complete Best Practices Guide for Enterprise Kubernetes Deployments

Infographic showing best practices for production‑ready EKS deployments, illustrating AWS cloud architecture, Kubernetes clusters, GitOps automation, observability, security, and disaster recovery principles.

Production-Ready EKS: The Complete Best Practices Guide for Enterprise Kubernetes Deployments

Three Layers of Metadata Every Bank Must Manage — and How OpenMetadata Handles All of Them

Technical, semantic, and operational metadata — every bank needs all three layers. Build them with OpenMetadata, the EBA regulatory glossary, GDPR PII tags, and dbt lineage.
📅 Day 9 of 18  ·  COREP Governance Pipeline Series  ·  Data Catalog & Metadata

By the end of Day 8 you had quality gates that block bad data from reaching your XBRL generator. But a quality gate that fails only helps you if someone on the team can quickly answer three questions: What is this column? Where did this data come from? Who is allowed to see it?

If your answer to any of those questions is “open a Confluence page, search Jira, ask the data engineer who built it three years ago” — you have a metadata problem. And in regulated EU banking, a metadata problem is a supervisory finding waiting to happen.

EBA’s IRRBB Guidelines, BCBS 239 Principles 1–5, and the ECB’s SREP data quality expectations all require that banks maintain documented, machine-readable metadata for every data element used in regulatory reports. “Machine-readable” is the operative phrase. A Word document is not machine-readable. OpenMetadata is.

This post builds the catalog layer for the COREP pipeline: auto-discovery of all tables, an EBA regulatory glossary, GDPR PII tagging, lineage federation from Marquez, and the connector configuration that wires it all together.

1. The Three Layers of Metadata

The financial industry’s metadata problem is always described as “we don’t know what our data means.” That statement hides three separate and distinct problems that require different tooling to solve.

LayerNameThe question it answersWho caresTool
Layer 1Technical metadata“What tables exist? What are the columns and types?”Data engineers, DBAsOpenMetadata auto-discovery
Layer 2Business / semantic metadata“What does tier mean? Is ‘CET1’ the same as ‘Common Equity Tier 1’ in the EBA glossary?”Risk officers, auditors, regulatorsOpenMetadata Glossary + EBA concept IDs
Layer 3Operational metadata“When was this table last refreshed? What pipeline produced it? Did quality pass?”Compliance, audit, ECB SREP inspectorsOpenMetadata + Marquez lineage + GX quality
⚠ The BCBS 239 Trap

Most banks have Layer 1 covered — they have a data dictionary somewhere. Very few have Layer 2 automated at the column level. Almost none have Layer 3 accessible to non-engineers. BCBS 239 requires all three, and ECB SREP Q&A teams ask questions that span all three in a single question: “Show me the definition of Own Funds in your data dictionary, trace it to your COREP C 01.00 submission, and show me the last time it was quality-checked.”

  LAYER 1 — Technical Metadata
  ┌─────────────────────────────────────────────────────────────────┐
  │  PostgreSQL schemas: raw · staging · intermediate · mart        │
  │  Tables: capital_instruments, rwa_exposures, corep_c0100 ...    │
  │  Columns: instrument_id VARCHAR, tier TEXT, amount NUMERIC ...  │
  │  Constraints: PK, NOT NULL, CHECK                               │
  │  ← OpenMetadata PostgreSQL connector auto-discovers all of this  │
  └─────────────────────────────────────────────────────────────────┘
                               │
                               ▼
  LAYER 2 — Business / Semantic Metadata
  ┌─────────────────────────────────────────────────────────────────┐
  │  column tier      → Glossary term "Capital Tier Classification"  │
  │  column amount    → EBA concept c0010 · unit EUR · decimals -3  │
  │  column cet1_ratio→ EBA concept c0050 · floor 4.5% · CRR Art.92│
  │  table corep_c0100→ EBA DPM 4.0 Template C 01.00 Own Funds      │
  │  tag: PII=FALSE   → GDPR Article 4 — no personal data in mart   │
  │  ← Manually curated EBA glossary + column-level tag assignments  │
  └─────────────────────────────────────────────────────────────────┘
                               │
                               ▼
  LAYER 3 — Operational Metadata
  ┌─────────────────────────────────────────────────────────────────┐
  │  Last refreshed: 2026-05-07T08:14:33Z (Airflow DAG run)         │
  │  Lineage: capital_instruments.csv → raw → stg → int → mart      │
  │  Quality: raw_capital_suite PASS 12/12 · mart_corep_suite PASS  │
  │  Column lineage: stg.amount → int.total_amount → c0100.own_funds│
  │  ← OpenMetadata Lineage API + Marquez federation + GX metadata   │
  └─────────────────────────────────────────────────────────────────┘

2. OpenMetadata Architecture in the COREP Stack

OpenMetadata runs as a single service on port 8585 backed by MySQL (metadata store) and Elasticsearch (full-text search index). It talks to your data sources via ingestion connectors that run as Python processes — either scheduled inside OpenMetadata itself or triggered from Airflow.

  Data Sources                OpenMetadata 1.3              Consumers
  ──────────────               ────────────────               ──────────
  PostgreSQL:5432 ──────────► Metadata Server :8585 ───────► Risk officers
  dbt project ──────────────► │  MySQL :3306              │   Auditors
  Airflow DAGs ─────────────► │  Elasticsearch :9200      │   Regulators
  Marquez :5000 ─────────────► │  OpenMetadata UI         │   Compliance
  Great Expectations ────────► │                          │   Data engineers
                               │  Connectors (Python):    │
  MinIO :9000 ──────────────► │  • postgres_connector    │
                               │  • dbt_connector         │
                               │  • airflow_connector     │
                               │  • openlineage_connector │
                               └──────────────────────────┘
                                         │
                              Glossary   │   Tags   │   Lineage
                              EBA DPM ──►│◄── PII ──│◄── Marquez

OpenMetadata is already in your docker-compose.yml from Day 1. Verify it is running:

docker compose --project-directory C:\reg_repo\platform-solution\corep-governance-pipeline ps | grep openmetadata
# Expected: openmetadata   running   0.0.0.0:8585->8585/tcp

# Default credentials (change immediately in production)
# URL: http://localhost:8585
# User: admin   Password: admin
📄 Layer 1 — Technical Metadata: Auto-Discovery

3. Connecting OpenMetadata to PostgreSQL

The PostgreSQL connector reads the information schema and pg_catalog to discover every schema, table, column, data type, constraint, and row-count statistic. It runs on a schedule and keeps the catalog current automatically.

3.1 Install the OpenMetadata ingestion library

# In your virtual environment
pip install "openmetadata-ingestion[postgres]==1.3.4"
pip install "openmetadata-ingestion[dbt]==1.3.4"
pip install "openmetadata-ingestion[airflow]==1.3.4"
⚠ Version Alignment

The ingestion library version must match the server version exactly. If your OpenMetadata Docker image is 1.3.4 then install openmetadata-ingestion==1.3.4. A mismatch produces silent failures where ingestion runs but no metadata appears in the UI.

3.2 PostgreSQL Connector Configuration

# catalog/connectors/postgres_connector.yaml
source:
  type: postgres
  serviceName: corep-postgres
  serviceConnection:
    config:
      type: Postgres
      username: ${POSTGRES_USER}
      authType:
        password: ${POSTGRES_PASSWORD}
      hostPort: localhost:5432
      database: corep
  sourceConfig:
    config:
      type: DatabaseMetadata
      schemaFilterPattern:
        includes:
          - ^raw$
          - ^staging$
          - ^intermediate$
          - ^mart$
          - ^audit$
          - ^secure$
          - ^mapping$
        excludes:
          - ^pg_.*            # exclude internal Postgres schemas
          - ^information_schema$
      tableFilterPattern:
        includes:
          - .*                # discover all tables in included schemas
      includeViews: true
      includeTags: true
      markDeletedTables: true    # flag tables dropped from DB as Deleted in catalog
      includeDDL: false          # DDL may contain sensitive info — do not store

sink:
  type: metadata-rest
  config:
    api_endpoint: http://localhost:8585/api
    auth_provider: openmetadata
    security_config:
      jwt_token: ${OM_JWT_TOKEN}   # generated in OpenMetadata Settings → Bots

workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: ${OM_JWT_TOKEN}

3.3 Run the PostgreSQL Ingestion

# Run connector
metadata ingest -c catalog/connectors/postgres_connector.yaml

# Expected output
INFO  Ingesting metadata for service: corep-postgres
INFO  Processing schema: raw         — 6 tables found
INFO  Processing schema: staging     — 5 tables found
INFO  Processing schema: intermediate— 4 tables found
INFO  Processing schema: mart        — 4 tables found
INFO  Processing schema: audit       — 2 tables found
INFO  Writing 21 tables to OpenMetadata...
INFO  ✓ Ingestion completed successfully.

After this runs, open http://localhost:8585, navigate to Explore → Tables and you will see all 21 tables with column names, data types, and cardinality statistics. This is Layer 1 complete — in under five minutes, with no manual entry.

3.4 What OpenMetadata Discovers Automatically

Metadata itemSourceAuto-discovered?
Table name, schema, ownerpg_catalog.pg_tablesYes
Column names and data typesinformation_schema.columnsYes
Primary key, NOT NULL constraintsinformation_schema.table_constraintsYes
Row count (approximate)pg_stat_user_tables.n_live_tupYes
Column-level cardinality / null fractionpg_statsYes after ANALYZE
Table description (COMMENT ON TABLE)pg_descriptionYes
Column description (COMMENT ON COLUMN)pg_descriptionYes
Business glossary termsManual or APINo — curated
EBA concept IDsManual or APINo — curated
PII tagsManual or policy ruleNo — curated
📚 Layer 2 — Business Metadata: EBA Glossary & PII Tags

4. Building the EBA Regulatory Glossary

A glossary in OpenMetadata is a hierarchical dictionary of terms where each term has a definition, synonyms, related terms, and — critically — links to the columns in your tables that represent that concept. This is what bridges the gap between a column called amount in raw.capital_instruments and the EBA concept c0010 — Own funds instruments in the COREP C 01.00 template.

4.1 Glossary Structure for COREP

# Create the EBA glossary via OpenMetadata REST API
# catalog/scripts/create_eba_glossary.py

import requests, os

BASE = os.environ.get("OM_API", "http://localhost:8585/api")
HEADERS = {
    "Authorization": f"Bearer {os.environ['OM_JWT_TOKEN']}",
    "Content-Type": "application/json",
}

# ── 1. Create the top-level glossary ─────────────────────────────
glossary_payload = {
    "name": "eba-corep-glossary",
    "displayName": "EBA COREP Regulatory Glossary",
    "description": (
        "Canonical definitions for all data elements used in COREP regulatory "
        "reporting. Terms are aligned to EBA Data Point Model 4.0 concept IDs. "
        "Reference: https://www.eba.europa.eu/risk-analysis-and-data/reporting-frameworks"
    ),
    "owners": [],
    "reviewers": [],
    "tags": ["regulatory", "eba-dpm-4.0"],
    "mutuallyExclusive": False,
}
r = requests.post(f"{BASE}/v1/glossaries", json=glossary_payload, headers=HEADERS)
r.raise_for_status()
glossary_id = r.json()["id"]
print(f"Glossary created: {glossary_id}")

# ── 2. Create glossary terms ──────────────────────────────────────
EBA_TERMS = [
    {
        "name": "own-funds",
        "displayName": "Own Funds",
        "description": (
            "Total regulatory capital comprising CET1, AT1, and Tier 2 instruments. "
            "Defined in CRR Article 4(1)(118). EBA DPM concept: c0010. "
            "Reported in COREP template C 01.00 row 010."
        ),
        "synonyms": ["Total Capital", "Regulatory Capital", "Eligible Capital"],
        "relatedTerms": [],
        "references": [
            {"name": "CRR Article 4(1)(118)", "endpoint": "https://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX%3A32013R0575"},
            {"name": "EBA DPM 4.0 — c0010", "endpoint": "https://www.eba.europa.eu/sites/default/documents/files/document_library/EBA_DPM_4.0.pdf"},
        ],
    },
    {
        "name": "cet1-capital",
        "displayName": "Common Equity Tier 1 Capital",
        "description": (
            "Highest quality regulatory capital: ordinary shares, retained earnings, "
            "other comprehensive income. CRR Article 26. Minimum ratio: 4.5% of RWA. "
            "EBA DPM concept: c0020. COREP C 01.00 row 020."
        ),
        "synonyms": ["CET1", "Core Tier 1"],
        "references": [
            {"name": "CRR Article 26", "endpoint": "https://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX%3A32013R0575"},
        ],
    },
    {
        "name": "risk-weighted-assets",
        "displayName": "Risk-Weighted Assets",
        "description": (
            "Assets adjusted for credit, market, and operational risk. The denominator "
            "of all Basel III capital ratios. Calculated under CRR Part Three. "
            "EBA DPM concept: c0060. COREP C 02.00."
        ),
        "synonyms": ["RWA", "Risk Weighted Exposures"],
        "references": [
            {"name": "CRR Part Three Title II", "endpoint": "https://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX%3A32013R0575"},
        ],
    },
    {
        "name": "hqla",
        "displayName": "High Quality Liquid Assets",
        "description": (
            "Assets that can be converted to cash with minimal loss in stressed market "
            "conditions. Classified as Level 1, 2A, or 2B under Delegated Regulation 2015/61. "
            "Numerator of the Liquidity Coverage Ratio. EBA DPM: c0010 in C 47.00."
        ),
        "synonyms": ["High-Quality Liquid Assets", "LCR Buffer", "Liquidity Buffer"],
        "references": [
            {"name": "Del. Reg. 2015/61 Article 7", "endpoint": "https://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX%3A32015R0061"},
        ],
    },
    {
        "name": "lcr",
        "displayName": "Liquidity Coverage Ratio",
        "description": (
            "Ratio of HQLA stock to net cash outflows over a 30-day stress period. "
            "Minimum requirement: 100%. Del. Reg. 2015/61 Article 4. "
            "EBA DPM concept: c0090 in COREP C 47.00."
        ),
        "synonyms": ["Liquidity Ratio", "30-Day Liquidity"],
    },
    {
        "name": "exposure-at-default",
        "displayName": "Exposure at Default",
        "description": (
            "The expected outstanding loan balance at the time of borrower default. "
            "Input to RWA calculation. CRR Article 166. EBA DPM: ead column in C 02.00."
        ),
        "synonyms": ["EAD", "Credit Exposure"],
    },
]

for term in EBA_TERMS:
    term["glossary"] = {"id": glossary_id, "type": "glossary"}
    r = requests.post(f"{BASE}/v1/glossaryTerms", json=term, headers=HEADERS)
    r.raise_for_status()
    print(f"  Term created: {term['displayName']}")

print("EBA glossary complete.")

4.2 Linking Glossary Terms to Columns

Once terms exist, you link them to the columns they describe. This is the step that makes your catalog machine-readable: a query to the OpenMetadata API can now return “column mart.corep_c0100.cet1_capital is defined as EBA concept c0020 — Common Equity Tier 1 Capital.”

# catalog/scripts/tag_columns.py — attach glossary terms to columns

import requests, os

BASE    = os.environ.get("OM_API", "http://localhost:8585/api")
HEADERS = {
    "Authorization": f"Bearer {os.environ['OM_JWT_TOKEN']}",
    "Content-Type": "application/json",
}

# Maps (schema.table, column) → glossary term name
COLUMN_TERM_MAP = [
    ("mart.corep_c0100", "own_funds",         "own-funds"),
    ("mart.corep_c0100", "cet1_capital",       "cet1-capital"),
    ("mart.corep_c0100", "total_rwa",           "risk-weighted-assets"),
    ("mart.corep_c0200", "ead",                 "exposure-at-default"),
    ("mart.corep_c0200", "rwa",                 "risk-weighted-assets"),
    ("mart.corep_c4700", "hqla_buffer",         "hqla"),
    ("mart.corep_c4700", "lcr_ratio",           "lcr"),
    ("raw.capital_instruments", "amount",       "own-funds"),
    ("raw.rwa_exposures", "ead",               "exposure-at-default"),
    ("raw.liquidity_assets", "market_value",   "hqla"),
]

def get_table_fqn(schema_table: str) -> str:
    """Build fully qualified table name for OpenMetadata API."""
    schema, table = schema_table.split(".")
    return f"corep-postgres.corep.{schema}.{table}"

def get_table_id(fqn: str) -> str:
    r = requests.get(f"{BASE}/v1/tables/name/{fqn}", headers=HEADERS)
    r.raise_for_status()
    return r.json()["id"]

def get_term_id(term_name: str) -> str:
    r = requests.get(
        f"{BASE}/v1/glossaryTerms/name/eba-corep-glossary.{term_name}",
        headers=HEADERS,
    )
    r.raise_for_status()
    return r.json()["id"]

for schema_table, column, term_name in COLUMN_TERM_MAP:
    fqn      = get_table_fqn(schema_table)
    table_id = get_table_id(fqn)
    term_id  = get_term_id(term_name)

    # PATCH the column with the glossary term tag
    patch = [{
        "op": "add",
        "path": f"/columns/{column}/tags/-",
        "value": {
            "tagFQN": f"eba-corep-glossary.{term_name}",
            "source": "Glossary",
            "labelType": "Automated",
            "state": "Confirmed",
        }
    }]
    r = requests.patch(
        f"{BASE}/v1/tables/{table_id}",
        json=patch,
        headers={**HEADERS, "Content-Type": "application/json-patch+json"},
    )
    r.raise_for_status()
    print(f"  Tagged: {schema_table}.{column} → {term_name}")

4.3 GDPR PII Tags

Your stg_counterparties.sql from Day 6 deliberately dropped name and lei columns (data minimisation). But the raw layer still has them. You must tag those raw columns as PII so that Ranger column masking policies (Day 10) can be applied automatically.

🔒 GDPR Article 4(1) — Personal Data

Under GDPR, a Legal Entity Identifier (LEI) is personal data when it can be used to identify a natural person who controls or is associated with the legal entity. The EBA’s Validation Rules require LEI reporting in C 02.00 for counterparties above certain thresholds. The raw table holds the LEI; the mart does not — this is your GDPR-compliant architecture. The tag is the documented evidence of that design decision.

# catalog/scripts/apply_pii_tags.py — create classification + apply to raw PII columns

# Step 1: Create PII classification tag (one-time setup)
classification_payload = {
    "name": "GDPR",
    "displayName": "GDPR Data Classification",
    "description": "Tags for GDPR Article 4 personal data classification.",
    "mutuallyExclusive": True,
}
r = requests.post(f"{BASE}/v1/classifications", json=classification_payload, headers=HEADERS)
r.raise_for_status()

# Step 2: Create tags within the classification
PII_TAGS = [
    {"name": "PII",          "description": "Directly identifies a natural or legal person. Access restricted."},
    {"name": "Pseudonymised", "description": "Personal data that has been pseudonymised per GDPR Article 4(5)."},
    {"name": "Non-PII",       "description": "Confirmed non-personal data — safe to aggregate and share."},
]
for tag in PII_TAGS:
    tag["classification"] = "GDPR"
    r = requests.post(f"{BASE}/v1/tags", json=tag, headers=HEADERS)
    r.raise_for_status()
    print(f"  Tag created: GDPR.{tag['name']}")

# Step 3: Apply PII tag to raw columns that contain personal data
PII_COLUMNS = [
    ("raw.counterparties", "name"),   # legal entity name — PII
    ("raw.counterparties", "lei"),    # LEI — PII under GDPR
    ("raw.loans",          "borrower_name"),
    ("raw.loans",          "borrower_id"),
]

NON_PII_COLUMNS = [
    ("mart.corep_c0100", "own_funds"),
    ("mart.corep_c0100", "cet1_capital"),
    ("mart.corep_c0300", "cet1_ratio"),
    ("mart.corep_c4700", "lcr_ratio"),
]

def apply_tag(schema_table, column, tag_fqn):
    fqn      = get_table_fqn(schema_table)
    table_id = get_table_id(fqn)
    patch = [{
        "op": "add",
        "path": f"/columns/{column}/tags/-",
        "value": {"tagFQN": tag_fqn, "source": "Classification", "labelType": "Manual", "state": "Confirmed"},
    }]
    r = requests.patch(
        f"{BASE}/v1/tables/{table_id}", json=patch,
        headers={**HEADERS, "Content-Type": "application/json-patch+json"},
    )
    r.raise_for_status()
    print(f"  Applied {tag_fqn} → {schema_table}.{column}")

for schema_table, column in PII_COLUMNS:
    apply_tag(schema_table, column, "GDPR.PII")

for schema_table, column in NON_PII_COLUMNS:
    apply_tag(schema_table, column, "GDPR.Non-PII")
⚙ Layer 3 — Operational Metadata: Lineage, Freshness & Quality

5. Connecting dbt to OpenMetadata

The dbt connector reads your dbt/target/manifest.json and catalog.json — the artefacts produced by dbt docs generate — and pushes column-level lineage, model descriptions, and test results into OpenMetadata. This is where Layer 3 becomes automated.

# catalog/connectors/dbt_connector.yaml
source:
  type: dbt
  serviceName: corep-postgres      # attach dbt metadata to the existing Postgres service
  sourceConfig:
    config:
      type: DBT
      dbtConfigSource:
        dbtCatalogFilePath:  dbt/target/catalog.json
        dbtManifestFilePath: dbt/target/manifest.json
        dbtRunResultsFilePath: dbt/target/run_results.json   # test pass/fail status
      dbtUpdateDescriptions: true   # push dbt model descriptions into OM table descriptions
      includeDbtTags: true          # push dbt tags (e.g. tag: corep, tag: pii_free)

sink:
  type: metadata-rest
  config:
    api_endpoint: http://localhost:8585/api
    auth_provider: openmetadata
    security_config:
      jwt_token: ${OM_JWT_TOKEN}

workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: ${OM_JWT_TOKEN}
# Generate dbt artefacts first, then run the connector
cd corep-governance-pipeline
dbt docs generate --project-dir dbt --profiles-dir dbt

# Push dbt metadata to OpenMetadata
metadata ingest -c catalog/connectors/dbt_connector.yaml

After this runs, open any mart table in OpenMetadata and click Lineage. You will see the full column-level DAG from CSV source file through to the mart column — driven by dbt’s manifest.json which records every ref() and source() call.

6. Connecting Marquez Lineage to OpenMetadata

Marquez holds the OpenLineage events emitted by your Python modules and Airflow DAG. OpenMetadata can federate this lineage via its OpenLineage connector, showing pipeline-level job-to-dataset edges alongside dbt column-level edges in a single graph.

# catalog/connectors/openlineage_connector.yaml
source:
  type: OpenLineage
  serviceName: corep-pipeline-lineage
  serviceConnection:
    config:
      type: OpenLineage
      openLineageApiEndpoint: http://localhost:5000/api/v1   # Marquez API
      lineageNamespace: corep-governance-pipeline
  sourceConfig:
    config:
      type: PipelineMetadata
      includeLineage: true

sink:
  type: metadata-rest
  config:
    api_endpoint: http://localhost:8585/api
    auth_provider: openmetadata
    security_config:
      jwt_token: ${OM_JWT_TOKEN}

workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: ${OM_JWT_TOKEN}

7. Adding Table Freshness to Layer 3

OpenMetadata can query your tables and compute “time since last updated” by reading the audit.pipeline_run_log table you built in Day 2. This appears as a freshness indicator on every table in the catalog — no extra infrastructure needed.

# Add table profile + freshness config to postgres_connector.yaml
source:
  type: postgres
  serviceName: corep-postgres
  serviceConnection:
    config:
      type: Postgres
      # ... (same as before)
  sourceConfig:
    config:
      type: DatabaseMetadata
      # ... (same as before)

# Add a SEPARATE profiler workflow for freshness
---
source:
  type: postgres
  serviceName: corep-postgres
  sourceConfig:
    config:
      type: Profiler
      generateSampleData: true
      computeMetrics: true
      profileSample: 30.0                   # profile 30% of rows (fast)
      timeoutSeconds: 300
      tableConfig:
        - fullyQualifiedName: corep-postgres.corep.mart.corep_c0100
          profileSample: 100.0              # always full-scan mart tables (small)
        - fullyQualifiedName: corep-postgres.corep.mart.corep_c0300
          profileSample: 100.0

8. The catalog.py Module

"""
modules/catalog.py — Push pipeline run metadata to OpenMetadata after each run.

Responsibilities:
  - Trigger PostgreSQL connector ingestion (refresh table stats)
  - Trigger dbt connector ingestion (push manifest + lineage)
  - Write pipeline run completion time to audit.pipeline_run_log
    so OpenMetadata freshness checks have a timestamp to read
"""

import logging, os, subprocess
from pathlib import Path

from modules.base import BaseModule

log = logging.getLogger(__name__)

CATALOG_DIR = Path(os.environ.get("CATALOG_DIR", "catalog"))


class CatalogModule(BaseModule):
    MODULE_NAME = "catalog"

    _CONNECTORS = [
        "connectors/postgres_connector.yaml",
        "connectors/dbt_connector.yaml",
        "connectors/openlineage_connector.yaml",
    ]

    def input_check(self) -> None:
        """Verify connector YAML files and OM_JWT_TOKEN are present."""
        if not os.environ.get("OM_JWT_TOKEN"):
            raise RuntimeError(
                "[catalog] OM_JWT_TOKEN not set. "
                "Generate a bot token in OpenMetadata Settings → Bots."
            )
        missing = [
            str(CATALOG_DIR / c)
            for c in self._CONNECTORS
            if not (CATALOG_DIR / c).exists()
        ]
        if missing:
            raise RuntimeError("[catalog] Missing connector files: " + ", ".join(missing))

    def _execute(self) -> None:
        for connector_path in self._CONNECTORS:
            full_path = CATALOG_DIR / connector_path
            log.info("[catalog] Running ingestion: %s", connector_path)
            result = subprocess.run(
                ["metadata", "ingest", "-c", str(full_path)],
                capture_output=True, text=True, timeout=300,
            )
            if result.returncode != 0:
                log.warning(
                    "[catalog] Ingestion failed for %s (non-fatal):\n%s",
                    connector_path, result.stderr[-2000:]
                )
            else:
                log.info("[catalog] Ingestion complete: %s", connector_path)

    def emit_lineage(self) -> None:
        log.info("[catalog] No lineage event — catalog is metadata, not data.")

    def output_check(self) -> None:
        """Verify OM API responds with tables present."""
        import requests
        api = os.environ.get("OM_API", "http://localhost:8585/api")
        headers = {"Authorization": f"Bearer {os.environ['OM_JWT_TOKEN']}"}
        r = requests.get(f"{api}/v1/tables?service=corep-postgres&limit=1", headers=headers, timeout=10)
        r.raise_for_status()
        count = r.json().get("paging", {}).get("total", 0)
        if count == 0:
            raise RuntimeError("[catalog] OpenMetadata reports 0 tables for corep-postgres service.")
        log.info("[catalog] OpenMetadata has %d tables indexed for corep-postgres.", count)

9. What the Catalog Looks Like After Setup

OpenMetadata screenWhat you seeMetadata layer
Explore → Tables → mart.corep_c0100Columns, types, row count (1 row), description from dbtLayer 1
corep_c0100 → Schema tab → cet1_capital columnGlossary term “CET1 Capital”, EBA concept c0020, GDPR.Non-PII tagLayer 2
corep_c0100 → Lineage tabFull DAG: CSV → raw → staging → intermediate → mart (dbt column-level) + Airflow job node (Marquez)Layer 3
corep_c0100 → Activity FeedLast ingested timestamp, dbt run result (PASS/FAIL), GX quality badgeLayer 3
raw.counterparties → Schema → name columnGDPR.PII tag in red, tooltip: “This column is masked in Trino for non-privileged roles”Layer 2
Glossary → EBA COREP Regulatory Glossary6 terms with definitions, legal references, synonyms, linked columnsLayer 2
Explore → Search “LCR”Returns corep_c4700, hqla glossary term, raw.liquidity_assets, dbt model int_lcr_hqlaLayer 2

10. Answering the ECB SREP Inspector Question in 30 Seconds

The question that triggered this blog post: “Show me the definition of Own Funds in your data dictionary, trace it to your COREP C 01.00 submission, and show me the last time it was quality-checked.”

What the inspector wantsWhere it lives in OpenMetadataHow it got there
“Definition of Own Funds”Glossary → EBA COREP → own-funds term with CRR Article 4(1)(118) referencecreate_eba_glossary.py
“Trace to COREP C 01.00”mart.corep_c0100 → own_funds column → glossary tag: own-fundstag_columns.py
“Where does the number come from?”mart.corep_c0100 → Lineage tab → traces to raw.capital_instruments.amount via dbt + Airflowdbt connector + Marquez connector
“Last quality check”mart.corep_c0100 → Activity Feed → GX checkpoint mart_corep_suite PASS with timestampquality.py + OM audit write
“Who is allowed to see this?”raw.counterparties.lei → GDPR.PII tag → Ranger policy link (Day 10)apply_pii_tags.py
✓ This Is What “Machine-Readable Metadata” Means to a Supervisor

The inspector does not need to call a data engineer. They can navigate OpenMetadata themselves. Every answer links to evidence. Every piece of evidence has a timestamp. The entire chain from raw CSV to XBRL submission is visible in one screen. This is the practical definition of BCBS 239 Principle 1 — Governance and Architecture.

11. Full Metadata Coverage Map for the COREP Pipeline

Data assetTechnical metaSemantic metaOperational meta
raw.capital_instrumentsAuto-discoveredEBA glossary + GDPR tagsGX Layer 1 results
staging.stg_capital_instrumentsAuto-discovereddbt descriptionsdbt lineage + run results
intermediate.int_capital_by_tierAuto-discovereddbt descriptionsdbt lineage
mart.corep_c0100Auto-discoveredEBA glossary + Non-PII tags + C 01.00 linkdbt lineage + GX Layer 2 results
mart.corep_c0300Auto-discoveredEBA glossary (cet1_ratio, lcr)GX pair checks logged
raw.counterpartiesAuto-discoveredGDPR.PII on name, leiAirflow job lineage
Airflow DAG corep_pipelineAirflow connectorManual descriptionRun history + SLA
dbt models (all)dbt connector (manifest)dbt model descriptionsdbt run_results.json tests

12. Which Regulation Requires Which Metadata Layer

Regulation / PrincipleSpecific requirementMetadata layerHow OpenMetadata satisfies it
BCBS 239 Principle 1Board-approved data governance framework with documented data dictionariesLayer 2EBA glossary with CRR references and owner fields
BCBS 239 Principle 3Completeness — all material risk data capturedLayer 3GX completeness results linked to tables via Activity Feed
BCBS 239 Principle 6Accuracy — data reflects actual risk positions; reconciliation possibleLayer 3Column-level lineage proves the mart number derives from the source file
GDPR Article 30Records of processing activities — document what personal data you holdLayer 2GDPR.PII tags on raw columns with column-level granularity
GDPR Article 25Data protection by design — minimise personal data in processingLayer 2GDPR.Non-PII tag on mart confirms minimisation was applied
EBA IRRBB GL 2022/14Data lineage documented for interest rate risk data elementsLayer 3Marquez OpenLineage federation in OpenMetadata lineage tab
ECB SREP supervisionBank can demonstrate data quality and lineage on demandAll threeSingle OpenMetadata URL per table answers all three questions

📚 Day 9 Key Takeaways

  • Three distinct metadata layers — Technical (what exists), Semantic (what it means), Operational (what happened to it) — require different tools and different processes to populate.
  • Layer 1 is free after five minutes — the PostgreSQL connector auto-discovers all 21 tables, all column types, all row counts with a single metadata ingest call.
  • Layer 2 requires curation — EBA glossary terms must be written by a human who understands CRR and EBA DPM. This is the most valuable and most under-invested layer in every bank.
  • GDPR.PII tags at column level are the bridge between OpenMetadata (catalog) and Apache Ranger (enforcement on Day 10). The tag triggers the masking policy automatically.
  • The dbt connector pushes column-level lineage from manifest.json directly into OpenMetadata — no OpenLineage event required for dbt models.
  • The ECB question — definition + lineage + quality timestamp — is answerable in under 30 seconds when all three layers are populated.
  • Next: Day 10 — Apache Ranger + Trino: enforcing the GDPR.PII tags as column masking policies so that no SELECT * FROM raw.counterparties ever returns an unmasked LEI to an unauthorised analyst.
Previous Post

Data Quality Gates for Regulatory Reporting — What Great Expectations Catches That dbt Tests Miss

Next Post
Add a comment

Leave a Reply

Your email address will not be published. Required fields are marked *