Apache Ranger + Trino: Centralised Data Security for a Banking Governance Pipeline

Column masking, row filters, and schema-level RBAC with Apache Ranger on Trino — enforcing GDPR Article 25 in a COREP banking pipeline with full audit trail and policy-as-code.
📅 Day 10 of 18  ·  COREP Governance Pipeline Series  ·  Data Security & Access Control

Day 9 tagged raw.counterparties.lei and raw.counterparties.name as GDPR.PII in OpenMetadata. Tags are documentation. They describe intent. They do not enforce anything.

An analyst who connects directly to PostgreSQL on port 5432 and runs SELECT * FROM raw.counterparties will see every LEI, every legal entity name, completely unmasked — regardless of how many tags you applied in the catalog. The tag said “this is PII.” Nobody stopped them.

This is the gap that Apache Ranger fills. Ranger is a centralised policy engine that intercepts every query before it reaches the data. When an analyst queries raw.counterparties through Trino, Ranger’s plugin evaluates their role against the active policies and either allows the query, masks the PII column, filters rows, or denies access entirely — before a single byte of data is read from PostgreSQL.

This post wires Ranger into your Trino layer with four policy types, documents the security.py module, and maps every policy to its GDPR, CRR, and BCBS 239 requirement.

1. Why Ranger — and Not Just PostgreSQL GRANT Statements

Every experienced DBA reading this will ask: why add Ranger complexity when PostgreSQL already has GRANT, row-level security, and column-level privileges?

Capability | PostgreSQL GRANT / RLS | Apache Ranger + Trino plugin
Column-level access control | Yes: GRANT SELECT(col) | Yes: column filter policy
Row-level filtering | Yes: Row Level Security | Yes: row filter policy
Column masking (show partial value) | No: only full hide or show | Yes: mask to SHA256, first 3 chars, NULL, custom expression
Audit log of every query | pgAudit: DDL/DML but no column-level | Every column access logged with user, timestamp, policy ID
Policies apply across ALL data sources | No: per-database only | Yes: Trino federates Postgres + Iceberg/MinIO + future sources under one policy engine
Policy-as-code (version-controlled JSON) | No: SQL DDL only | Yes: Ranger REST API, policies stored as JSON
No direct database connection required for analysts | No: analysts need DB credentials | Yes: analysts query Trino only, no PostgreSQL credentials needed
Tag-based policies (“if GDPR.PII then mask”) | No | Yes: Ranger tag-based policies from Atlas/OpenMetadata

🔒 The Architecture Rule: Trino Is the Only Query Surface

From Day 1 of this pipeline the rule has been: all consumers query through Trino. Nobody connects directly to PostgreSQL port 5432 (except pipeline service accounts). Nobody connects to MinIO via S3 API directly. This single architectural constraint means Ranger’s Trino plugin is the only enforcement point you need. If a query doesn’t go through Trino, it doesn’t happen.

In your Docker network, PostgreSQL port 5432 is not exposed to the host. Only Trino port 8080 is. This is the network-level enforcement of the architectural rule.
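
The network rule above can be checked from the host with a plain TCP probe. This is a minimal sketch, assuming the stack is published on localhost; the helper names `port_is_open` and `check_query_surface` are illustrative and not part of the pipeline.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host, and timeouts.
        return False


def check_query_surface(host: str = "localhost") -> dict:
    """Probe the two ports named in the architecture rule (host is an assumption)."""
    return {
        "trino_8080_reachable": port_is_open(host, 8080),
        "postgres_5432_reachable": port_is_open(host, 5432),
    }
```

On a host that follows the rule, `check_query_surface()` should report Trino as reachable and PostgreSQL as not reachable.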

2. Security Architecture

  Query Path with Ranger Enforcement
  ═══════════════════════════════════════════════════════════════════

  Analyst / Airflow / Superset
          │
          │  SQL query over HTTP (port 8080)
          ▼
  ┌──────────────────────────────────┐
  │         Trino Coordinator        │
  │                                  │
  │  ┌────────────────────────────┐  │
  │  │  Ranger Trino Plugin       │  │  ◄── intercepts EVERY query
  │  │  • Parses SQL AST          │  │
  │  │  • Extracts table/columns  │  │
  │  │  • Calls Ranger REST API   │  │
  │  └──────────┬─────────────────┘  │
  │             │                    │
  └─────────────┼────────────────────┘
                │
                ▼
  ┌──────────────────────────────────┐
  │    Apache Ranger Admin :6080     │
  │                                  │
  │  Policy evaluation engine:       │
  │  1. Find matching policies       │
  │     (user + resource + action)   │
  │  2. Evaluate in priority order   │
  │  3. Return: ALLOW / DENY /       │
  │             MASK / ROW_FILTER    │
  │  4. Write audit log              │
  └──────────┬───────────────────────┘
             │
             │ Decision returned to Trino plugin
             ▼
  ┌──────────────────────────────────┐
  │  Trino executes query with       │
  │  policy applied:                 │
  │                                  │
  │  ALLOW   → full result           │
  │  MASK    → lei = SHA256(lei)     │
  │  ROW FILT→ WHERE tier = 'CET1'   │
  │  DENY    → error: Access denied  │
  └──────────┬───────────────────────┘
             │
             ▼
  ┌───────────────────┐   ┌───────────────────┐
  │  PostgreSQL :5432 │   │  MinIO / Iceberg  │
  │  (internal only)  │   │  (internal only)  │
  └───────────────────┘   └───────────────────┘
             │
             ▼
  Ranger audit log → PostgreSQL audit.ranger_access_audit + container log
  (every query, every user, every column access, every decision)
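
The last box of the diagram can be read as a query rewrite. The sketch below is a conceptual model only, not the plugin's code: the `Decision` class and `apply_decision` function are hypothetical names that show how a mask becomes a projected expression and a row filter becomes a WHERE clause.

```python
from dataclasses import dataclass, field


@dataclass
class Decision:
    """Hypothetical, simplified stand-in for the outcome of policy evaluation."""
    allowed: bool = True
    column_masks: dict = field(default_factory=dict)  # column name -> SQL expression
    row_filter: str = ""                              # SQL predicate, empty if none


def apply_decision(table: str, columns: list, decision: Decision) -> str:
    """Build the SQL that effectively runs once the decision is applied."""
    if not decision.allowed:
        raise PermissionError(f"Access Denied: Cannot select from table: {table}")
    projected = [
        f"{decision.column_masks[c]} AS {c}" if c in decision.column_masks else c
        for c in columns
    ]
    sql = f"SELECT {', '.join(projected)} FROM {table}"
    if decision.row_filter:
        sql += f" WHERE {decision.row_filter}"
    return sql


masked = Decision(column_masks={"lei": "to_hex(sha256(to_utf8(lei)))"})
print(apply_decision("raw.counterparties", ["lei", "name"], masked))
# SELECT to_hex(sha256(to_utf8(lei))) AS lei, name FROM raw.counterparties
```

The analyst's original SQL is never changed on the client side; the rewrite happens inside the engine, which is why the same query text gives different results for different roles.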

3. Ranger + Trino Plugin Setup

3.1 Verify Ranger Is Running

docker compose --project-directory C:\reg_repo\platform-solution\corep-governance-pipeline ps | grep ranger
# Expected: ranger   running   0.0.0.0:6080->6080/tcp

# Default credentials — CHANGE IN PRODUCTION
# URL: http://localhost:6080
# User: admin   Password: rangerR0cks!

3.2 Install the Ranger Trino Plugin

The Ranger Trino plugin is a JAR that sits in Trino’s plugin directory and implements Trino’s system access control interface, so every query passes through it for authorisation. Your docker/trino/config.properties already has the hook. Now you configure it.

<?xml version="1.0" encoding="UTF-8"?>
<!-- docker/trino/ranger-trino-plugin/ranger-trino-security.xml -->
<!-- This file tells the Trino plugin where to find the Ranger Admin server -->
<configuration>

  <property>
    <name>ranger.plugin.trino.service.name</name>
    <value>corep-trino</value>
    <!-- Must match the service name created in Ranger Admin UI -->
  </property>

  <property>
    <name>ranger.plugin.trino.policy.source.impl</name>
    <value>org.apache.ranger.admin.client.RangerAdminRESTClient</value>
  </property>

  <property>
    <name>ranger.plugin.trino.policy.rest.url</name>
    <value>http://ranger:6080</value>
    <!-- ranger is the Docker service name -->
  </property>

  <property>
    <name>ranger.plugin.trino.policy.pollIntervalMs</name>
    <value>30000</value>
    <!-- Trino pulls policy updates every 30 seconds -->
  </property>

  <property>
    <name>ranger.plugin.trino.access.cluster.name</name>
    <value>corep-governance-pipeline</value>
  </property>

</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!-- docker/trino/ranger-trino-plugin/ranger-trino-audit.xml -->
<!-- Audit destinations: container log (log4j) and the PostgreSQL audit database -->
<configuration>

  <property>
    <name>xasecure.audit.is.enabled</name>
    <value>true</value>
  </property>

  <property>
    <name>xasecure.audit.solr.is.enabled</name>
    <value>false</value>
    <!-- Not using Solr in this stack -->
  </property>

  <property>
    <name>xasecure.audit.hdfs.is.enabled</name>
    <value>false</value>
  </property>

  <property>
    <name>xasecure.audit.log4j.is.enabled</name>
    <value>true</value>
    <!-- Write to container log — Docker log driver captures it -->
  </property>

  <property>
    <name>xasecure.audit.destination.db.is.enabled</name>
    <value>true</value>
  </property>

  <property>
    <name>xasecure.audit.destination.db.jdbc.url</name>
    <value>jdbc:postgresql://postgres:5432/corep</value>
  </property>

  <property>
    <name>xasecure.audit.destination.db.jdbc.driver</name>
    <value>org.postgresql.Driver</value>
  </property>

  <property>
    <name>xasecure.audit.destination.db.user</name>
    <value>corep_admin</value>
  </property>

  <property>
    <name>xasecure.audit.destination.db.password</name>
    <value>${POSTGRES_PASSWORD}</value>
    <!-- Injected from Docker env at container start -->
  </property>

</configuration>
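
A typo in a property name fails silently, so it can help to lint the two XML files before starting Trino. The following is a minimal sketch using only the standard library; the `REQUIRED_KEYS` set is our own choice of which properties to insist on, taken from the security file above, and the function names are illustrative.

```python
import xml.etree.ElementTree as ET

# Assumption: these three properties are the ones we treat as mandatory.
REQUIRED_KEYS = {
    "ranger.plugin.trino.service.name",
    "ranger.plugin.trino.policy.rest.url",
    "ranger.plugin.trino.policy.pollIntervalMs",
}


def read_properties(xml_text: str) -> dict:
    """Parse a <configuration> document into a name -> value dict."""
    root = ET.fromstring(xml_text.encode("utf-8"))
    props = {}
    for prop in root.findall("property"):
        name = prop.findtext("name")
        if name:
            props[name.strip()] = (prop.findtext("value") or "").strip()
    return props


def missing_keys(xml_text: str, required=frozenset(REQUIRED_KEYS)) -> set:
    """Return the required property names that the document does not define."""
    return set(required) - set(read_properties(xml_text))
```

Calling `missing_keys(Path(...).read_text())` on each file and failing the build on a non-empty result catches both missing properties and files that are not well-formed XML, since `ET.fromstring` raises `ParseError` for the latter.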

3.3 Register the Trino Service in Ranger

# catalog/scripts/create_ranger_service.py
# Run once to register the Trino service in Ranger Admin

import requests, os, json

RANGER_URL = os.environ.get("RANGER_URL", "http://localhost:6080")
RANGER_AUTH = (
    os.environ.get("RANGER_ADMIN_USER", "admin"),
    os.environ.get("RANGER_ADMIN_PASSWORD", "rangerR0cks!"),
)
HEADERS = {"Content-Type": "application/json", "Accept": "application/json"}

service_payload = {
    "name":        "corep-trino",
    "displayName": "COREP Governance Pipeline — Trino",
    "type":        "trino",
    "description": "Trino query engine for COREP regulatory pipeline. All analyst queries go through this service.",
    "isEnabled":   True,
    "configs": {
        "username":    "trino_ranger_user",  # service account Ranger uses to validate resources
        "password":    os.environ.get("TRINO_RANGER_PASSWORD", ""),
        "jdbc.driverClassName": "io.trino.jdbc.TrinoDriver",
        "jdbc.url":    "jdbc:trino://trino:8080",
    },
}

r = requests.post(
    f"{RANGER_URL}/service/plugins/services",
    json=service_payload,
    headers=HEADERS,
    auth=RANGER_AUTH,
)
r.raise_for_status()
service_id = r.json()["id"]
print(f"Ranger service created: corep-trino (id={service_id})")
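
The script above fails on a second run because the service already exists. A re-runnable variant could look like the sketch below. It reuses the same two endpoints as the script; the assumption that the list response carries its entries under a `services` key, and the function name `ensure_service`, are ours.

```python
def ensure_service(session, base_url: str, payload: dict, auth) -> tuple:
    """Create the Ranger service only if no service with that name exists.

    `session` is any object with requests-style get/post methods,
    for example requests.Session(). Returns (service_id, created).
    """
    r = session.get(f"{base_url}/service/plugins/services", auth=auth, timeout=10)
    r.raise_for_status()
    # Assumption: the list response holds services under a "services" key.
    for svc in r.json().get("services", []):
        if svc.get("name") == payload["name"]:
            return svc["id"], False

    r = session.post(
        f"{base_url}/service/plugins/services",
        json=payload, auth=auth, timeout=10,
    )
    r.raise_for_status()
    return r.json()["id"], True
```

With this shape, `ensure_service(requests.Session(), RANGER_URL, service_payload, RANGER_AUTH)` can run on every deployment without special-casing the first one.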

4. Defining Banking Roles

A banking data governance pipeline has a small but clearly defined set of consumer roles. Each maps to a real job function and a real set of data access requirements.

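Role | Job function | What they need | What they must NOT see
corep_reporting | Regulatory reporting team, submits COREP to ECB/NCAs | Full access to mart.*, read-only intermediate.* | raw.* PII columns (name, lei)
risk_analyst | Risk management: analyses capital adequacy, LCR | mart.*, staging.* aggregates, no individual borrower data | raw.loans individual rows, raw.counterparties.lei
data_engineer | Pipeline development and maintenance | All schemas, all columns, for debugging | Nothing (but all access audited)
auditor | Internal audit, external audit firms, ECB inspectors | audit.*, read-only mart.* and lineage, GX data docs | raw PII columns, staging.stg_counterparties (even though PII-free, still restricted)
pipeline_service | Airflow DAG service account | All schemas: pipeline must write raw.*, read mart.* | Nothing (service account, not human)

# catalog/scripts/create_ranger_roles.py
# Connection settings repeated from create_ranger_service.py so the script runs on its own

import os

import requests

RANGER_URL = os.environ.get("RANGER_URL", "http://localhost:6080")
RANGER_AUTH = (
    os.environ.get("RANGER_ADMIN_USER", "admin"),
    os.environ.get("RANGER_ADMIN_PASSWORD", "rangerR0cks!"),
)
HEADERS = {"Content-Type": "application/json", "Accept": "application/json"}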

ROLES = [
    {"name": "corep_reporting", "description": "Regulatory reporting team — COREP submission"},
    {"name": "risk_analyst",     "description": "Risk management — capital and liquidity analysis"},
    {"name": "data_engineer",   "description": "Pipeline engineering team — full access with audit"},
    {"name": "auditor",         "description": "Internal/external audit and ECB SREP inspectors"},
    {"name": "pipeline_service","description": "Airflow DAG service account"},
]

for role in ROLES:
    r = requests.post(
        f"{RANGER_URL}/service/roles/roles",
        json={
            "name":        role["name"],
            "description": role["description"],
            "users":       [],  # assign users via LDAP/SCIM in production
            "groups":      [],
            "roles":       [],
        },
        headers=HEADERS,
        auth=RANGER_AUTH,
    )
    r.raise_for_status()
    print(f"  Role created: {role['name']}")

5. Column Masking Policies — The Core GDPR Enforcement

Column masking is Ranger’s most powerful feature for GDPR compliance. Instead of completely hiding a column (which breaks some queries), masking replaces the value with a transformed version — a hash, a partial value, or a NULL — so the query structure works but the personal data is not exposed.

The EBA’s COREP validation rules require LEI in some C 02.00 rows. The solution: the corep_reporting role sees the real LEI because they are the submission team. The risk_analyst role sees a SHA-256 hash — consistent for joining purposes but not reversible to the actual legal entity identity.
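
Why a hash still works as a join key can be shown outside the database. The sketch below mimics the two masks in plain Python; in the pipeline the masking runs inside Trino, and the sample name and amount are made up for illustration.

```python
import hashlib


def mask_hash(value: str) -> str:
    """Hex SHA-256 of the UTF-8 value; the same input always gives the same digest."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def mask_first3(value: str) -> str:
    """Keep the first three characters and replace the rest with '***'."""
    return value[:3] + "***"


# Illustrative rows: (lei, name) and (lei, exposure amount).
counterparties = [("529900T8BM49AURSDO55", "Example Bank AG")]
exposures = [("529900T8BM49AURSDO55", 1_000_000)]

# Both sides are hashed the same way, so the masked key still matches.
masked_cp = {mask_hash(lei): mask_first3(name) for lei, name in counterparties}
joined = [(masked_cp[mask_hash(lei)], amount) for lei, amount in exposures]
print(joined)  # [('Exa***', 1000000)]
```

Because the hash is deterministic and unsalted, it is pseudonymisation and not anonymisation: anyone holding a list of candidate LEIs can hash them and compare.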

Column Mask mask-lei-analyst
Resource: catalog=postgresql / schema=raw / table=counterparties / column=lei
Applies to roles: risk_analyst, auditor
Mask type: HASH (SHA-256)
Regulatory basis: GDPR Article 25 (data protection by design); GDPR Article 4(5) (pseudonymisation)

Column Mask mask-name-analyst
Resource: catalog=postgresql / schema=raw / table=counterparties / column=name
Applies to roles: risk_analyst, auditor
Mask type: PARTIAL, first 3 chars + “***” (e.g. “Deutsche Bank AG” → “Deu***”)
Regulatory basis: GDPR Article 5(1)(c) (data minimisation)

Allow allow-full-lei-for-reporting
Resource: catalog=postgresql / schema=raw / table=counterparties / column=lei
Applies to roles: corep_reporting, data_engineer, pipeline_service
Access: SELECT (unmasked)
Justification: COREP C 02.00 EBA submission requires full unmasked LEI per EBA Validation Rule v0001

# catalog/scripts/create_ranger_policies.py: column masking policies
# Assumes the imports and RANGER_URL / RANGER_AUTH / HEADERS definitions from create_ranger_service.py

def create_column_mask_policy(
    name: str,
    schema: str,
    table: str,
    column: str,
    roles: list,
    mask_type: str,       # "MASK_HASH", "MASK_SHOW_FIRST_4", "MASK_NULL", "CUSTOM"
    mask_expr: str = "",  # for CUSTOM mask type only
) -> dict:
    return {
        "service":     "corep-trino",
        "name":        name,
        "policyType":  1,  # 0=Access, 1=Masking, 2=RowFilter
        "description": f"Column masking: {schema}.{table}.{column}",
        "isEnabled":   True,
        "resources": {
            "catalog":    {"values": ["postgresql"], "isExcludes": False, "isRecursive": False},
            "schema":     {"values": [schema],       "isExcludes": False, "isRecursive": False},
            "table":      {"values": [table],        "isExcludes": False, "isRecursive": False},
            "column":     {"values": [column],       "isExcludes": False, "isRecursive": False},
        },
        "dataMaskPolicyItems": [
            {
                "dataMaskInfo": {
                    "dataMaskType": mask_type,
                    "valueExpr":   mask_expr,
                },
                "roles":      roles,
                "groups":     [],
                "users":      [],
                "accesses":   [{"type": "select", "isAllowed": True}],
                "conditions": [],
                "delegateAdmin": False,
            }
        ],
    }


MASK_POLICIES = [
    # LEI → SHA-256 hash for analysts (pseudonymised, not anonymised)
    create_column_mask_policy(
        name="mask-lei-analyst",
        schema="raw", table="counterparties", column="lei",
        roles=["risk_analyst", "auditor"],
        mask_type="MASK_HASH",   # Ranger's hash mask; plain "MASK" is redaction, not hashing
    ),
    # Counterparty name → first 3 chars + *** for analysts
    create_column_mask_policy(
        name="mask-name-analyst",
        schema="raw", table="counterparties", column="name",
        roles=["risk_analyst", "auditor"],
        # MASK_SHOW_FIRST_4 keeps four characters and redacts the rest, so the
        # "Deu***" format needs a custom expression ({col} is the column placeholder)
        mask_type="CUSTOM",
        mask_expr="concat(substr({col}, 1, 3), '***')",
    ),
    # Borrower name in loans → NULL for all non-engineer roles
    create_column_mask_policy(
        name="mask-borrower-name",
        schema="raw", table="loans", column="borrower_name",
        roles=["risk_analyst", "corep_reporting", "auditor"],
        mask_type="MASK_NULL",
    ),
    # Borrower ID → hash (still usable for COUNT DISTINCT, GROUP BY)
    create_column_mask_policy(
        name="mask-borrower-id",
        schema="raw", table="loans", column="borrower_id",
        roles=["risk_analyst", "corep_reporting", "auditor"],
        mask_type="MASK_HASH",
    ),
]

for policy in MASK_POLICIES:
    r = requests.post(
        f"{RANGER_URL}/service/plugins/policies",
        json=policy, headers=HEADERS, auth=RANGER_AUTH,
    )
    r.raise_for_status()
    print(f"  Masking policy created: {policy['name']}")
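
A malformed payload is easier to catch before the POST than from Ranger's error response. The sketch below is a small pre-flight check of our own design, not Ranger's validation; it relies only on the field names used in the payloads in this post.

```python
# Which item list each policyType is expected to carry.
ITEM_KEY_BY_TYPE = {0: "policyItems", 1: "dataMaskPolicyItems", 2: "rowFilterPolicyItems"}


def validate_policy(policy: dict) -> list:
    """Return a list of problems; an empty list means the payload looks sane."""
    ptype = policy.get("policyType")
    item_key = ITEM_KEY_BY_TYPE.get(ptype)
    if item_key is None:
        return [f"unknown policyType: {ptype!r}"]

    problems = []
    if not policy.get(item_key):
        problems.append(f"policyType {ptype} needs a non-empty '{item_key}'")
    if ptype == 1 and "column" not in policy.get("resources", {}):
        problems.append("masking policy must name a column resource")

    for item in policy.get(item_key, []):
        if not (item.get("roles") or item.get("groups") or item.get("users")):
            problems.append("policy item has no roles, groups, or users")
        mask = item.get("dataMaskInfo", {})
        if ptype == 1 and mask.get("dataMaskType") == "CUSTOM" and not mask.get("valueExpr"):
            problems.append("CUSTOM mask requires a valueExpr")
    return problems
```

Running `validate_policy(p)` over `MASK_POLICIES` before the loop and aborting on any non-empty result keeps half-applied policy sets out of Ranger.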

6. Row-Level Filter Policies

Row filters restrict which rows a role can see, even when they have column access. In a banking pipeline, row filters enforce data domain boundaries — a risk analyst in the credit risk team should not see market risk exposure rows, and a front-office analyst should not see internal treasury positions.

Row Filter filter-rwa-credit-only
Resource: catalog=postgresql / schema=raw / table=rwa_exposures
Applies to role: risk_analyst (when tagged as credit risk)
Filter expression: exposure_class IN ('central_governments','institutions','corporates','retail','real_estate')
Excludes: equity and other exposure classes (market risk team’s domain)
Regulatory basis: CRR Article 112 (exposure class assignments); CRD IV Article 74 (internal governance)

Row Filter filter-capital-valid-tiers
Resource: catalog=postgresql / schema=raw / table=capital_instruments
Applies to roles: risk_analyst, corep_reporting
Filter expression: tier IN ('CET1', 'AT1', 'T2')
Effect: Excludes any instruments in non-standard tiers or data errors, so these roles always see clean data

# catalog/scripts/create_ranger_policies.py: row filter policies

def create_row_filter_policy(
    name: str,
    schema: str,
    table: str,
    roles: list,
    filter_expr: str,
) -> dict:
    return {
        "service":     "corep-trino",
        "name":        name,
        "policyType":  2,   # 2 = RowFilter
        "description": f"Row filter: {schema}.{table} for {', '.join(roles)}",
        "isEnabled":   True,
        "resources": {
            "catalog": {"values": ["postgresql"], "isExcludes": False, "isRecursive": False},
            "schema":  {"values": [schema],       "isExcludes": False, "isRecursive": False},
            "table":   {"values": [table],        "isExcludes": False, "isRecursive": False},
        },
        "rowFilterPolicyItems": [
            {
                "rowFilterInfo": {"filterExpr": filter_expr},
                "roles":       roles,
                "groups":      [],
                "users":       [],
                "accesses":    [{"type": "select", "isAllowed": True}],
                "conditions":  [],
                "delegateAdmin": False,
            }
        ],
    }


ROW_FILTER_POLICIES = [
    create_row_filter_policy(
        name="filter-rwa-credit-only",
        schema="raw", table="rwa_exposures",
        roles=["risk_analyst"],
        filter_expr="exposure_class IN ('central_governments','institutions','corporates','retail','real_estate')",
    ),
    create_row_filter_policy(
        name="filter-capital-valid-tiers",
        schema="raw", table="capital_instruments",
        roles=["risk_analyst", "corep_reporting"],
        filter_expr="tier IN ('CET1', 'AT1', 'T2')",
    ),
    create_row_filter_policy(
        name="filter-liquidity-hqla-only",
        schema="raw", table="liquidity_assets",
        roles=["risk_analyst"],
        filter_expr="hqla_level IN ('1', '2A', '2B')",
    ),
]

for policy in ROW_FILTER_POLICIES:
    r = requests.post(
        f"{RANGER_URL}/service/plugins/policies",
        json=policy, headers=HEADERS, auth=RANGER_AUTH,
    )
    r.raise_for_status()
    print(f"  Row filter policy created: {policy['name']}")
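
The effect of a row filter can be reproduced with an in-memory SQLite table. This is an illustration of the semantics only, assuming a hypothetical `exposure_id` column and four sample rows; in the pipeline Trino applies the predicate, not the client.

```python
import sqlite3

FILTER = (
    "exposure_class IN "
    "('central_governments','institutions','corporates','retail','real_estate')"
)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rwa_exposures (exposure_id TEXT, exposure_class TEXT)")
conn.executemany(
    "INSERT INTO rwa_exposures VALUES (?, ?)",
    [("E1", "corporates"), ("E2", "retail"), ("E3", "equity"), ("E4", "other")],
)


def select_as(role: str) -> list:
    """Run the Test 2 query; for risk_analyst the table is wrapped in the filter."""
    source = "rwa_exposures"
    if role == "risk_analyst":
        source = f"(SELECT * FROM rwa_exposures WHERE {FILTER})"
    rows = conn.execute(
        f"SELECT DISTINCT exposure_class FROM {source} ORDER BY 1"
    ).fetchall()
    return [r[0] for r in rows]


print(select_as("risk_analyst"))   # ['corporates', 'retail']
print(select_as("data_engineer"))  # ['corporates', 'equity', 'other', 'retail']
```

The filtered role cannot widen its view by adding its own WHERE clause, because the user's predicate is applied on top of the already-filtered source.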

7. Access Allow Policies — Schema-Level RBAC

# Schema-level access policy — corep_reporting can see mart.* and intermediate.*
# but NOT raw.* (only the data_engineer and pipeline_service can)

ACCESS_POLICIES = [
    {
        "service":    "corep-trino",
        "name":       "allow-mart-to-reporting",
        "policyType": 0,
        "isEnabled":  True,
        "resources": {
            "catalog": {"values": ["postgresql"]},
            "schema":  {"values": ["mart", "intermediate"]},
            "table":   {"values": ["*"]},
            "column":  {"values": ["*"]},
        },
        "policyItems": [{
            "roles":   ["corep_reporting", "risk_analyst", "auditor"],
            "accesses": [{"type": "select", "isAllowed": True}],
            "conditions": [], "delegateAdmin": False,
        }],
    },
    {
        "service":    "corep-trino",
        "name":       "allow-raw-to-engineers-only",
        "policyType": 0,
        "isEnabled":  True,
        "resources": {
            "catalog": {"values": ["postgresql"]},
            "schema":  {"values": ["raw"]},
            "table":   {"values": ["*"]},
            "column":  {"values": ["*"]},
        },
        "policyItems": [{
            "roles":   ["data_engineer", "pipeline_service"],
            "accesses": [
                {"type": "select",    "isAllowed": True},
                {"type": "insert",    "isAllowed": True},
                {"type": "update",    "isAllowed": True},
                {"type": "delete",    "isAllowed": True},
                {"type": "create",    "isAllowed": True},
                {"type": "drop",      "isAllowed": True},
            ],
            "conditions": [], "delegateAdmin": False,
        }],
        # Explicit deny for everyone else — deny overrides allow in Ranger
        "denyPolicyItems": [{
            "roles":   ["risk_analyst", "corep_reporting", "auditor"],
            "accesses": [{"type": "select", "isAllowed": True}],
            "conditions": [], "delegateAdmin": False,
        }],
    },
    {
        "service":    "corep-trino",
        "name":       "allow-audit-schema-to-auditors",
        "policyType": 0,
        "isEnabled":  True,
        "resources": {
            "catalog": {"values": ["postgresql"]},
            "schema":  {"values": ["audit"]},
            "table":   {"values": ["*"]},
            "column":  {"values": ["*"]},
        },
        "policyItems": [{
            "roles":   ["auditor", "data_engineer"],
            "accesses": [{"type": "select", "isAllowed": True}],
            "conditions": [], "delegateAdmin": False,
        }],
    },
]

for policy in ACCESS_POLICIES:
    r = requests.post(
        f"{RANGER_URL}/service/plugins/policies",
        json=policy, headers=HEADERS, auth=RANGER_AUTH,
    )
    r.raise_for_status()
    print(f"  Access policy created: {policy['name']}")
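
The comment "deny overrides allow" is worth seeing in isolation. The sketch below is a toy evaluator over policies shaped like `ACCESS_POLICIES`; it matches schemas by exact name and ignores wildcards, deny exceptions, and users or groups, so it is a teaching model and not Ranger's algorithm. `SAMPLE` is a trimmed copy of two policies from above.

```python
SAMPLE = [
    {"policyType": 0, "resources": {"schema": {"values": ["mart", "intermediate"]}},
     "policyItems": [{"roles": ["corep_reporting", "risk_analyst", "auditor"],
                      "accesses": [{"type": "select", "isAllowed": True}]}]},
    {"policyType": 0, "resources": {"schema": {"values": ["raw"]}},
     "policyItems": [{"roles": ["data_engineer", "pipeline_service"],
                      "accesses": [{"type": "select", "isAllowed": True}]}],
     "denyPolicyItems": [{"roles": ["risk_analyst", "corep_reporting", "auditor"],
                          "accesses": [{"type": "select", "isAllowed": True}]}]},
]


def is_allowed(policies: list, role: str, schema: str, access: str = "select") -> bool:
    """Deny items win over allow items; with no matching allow, the answer is deny."""
    def grants(items: list) -> bool:
        return any(
            role in item.get("roles", [])
            and any(a["type"] == access and a["isAllowed"] for a in item["accesses"])
            for item in items
        )

    relevant = [
        p for p in policies
        if p.get("policyType") == 0 and schema in p["resources"]["schema"]["values"]
    ]
    if any(grants(p.get("denyPolicyItems", [])) for p in relevant):
        return False
    return any(grants(p.get("policyItems", [])) for p in relevant)


print(is_allowed(SAMPLE, "risk_analyst", "mart"))   # True
print(is_allowed(SAMPLE, "risk_analyst", "raw"))    # False
print(is_allowed(SAMPLE, "data_engineer", "raw"))   # True
```

Under this model a role that is absent from every allow item is denied by default, so `is_allowed(SAMPLE, "data_engineer", "mart")` is False until an allow item names that role.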

8. The security.py Module

"""
modules/security.py — Apply and verify Ranger security policies as a pipeline step.

This module:
  - Creates the Ranger service if it does not exist
  - Pushes all column masking, row filter, and access allow policies
  - Verifies policies are active by polling the Ranger API
  - Exports current policy state to MinIO for audit evidence
"""

import json, logging, os, time
from pathlib import Path

import requests

from modules.base import BaseModule

log = logging.getLogger(__name__)

RANGER_URL   = os.environ.get("RANGER_URL", "http://localhost:6080")
RANGER_AUTH  = (
    os.environ.get("RANGER_ADMIN_USER",     "admin"),
    os.environ.get("RANGER_ADMIN_PASSWORD",  "rangerR0cks!"),
)
HEADERS      = {"Content-Type": "application/json", "Accept": "application/json"}
POLICY_DIR   = Path(os.environ.get("RANGER_POLICY_DIR", "catalog/ranger_policies"))


class SecurityModule(BaseModule):
    MODULE_NAME = "security"

    # Policy JSON files — loaded from POLICY_DIR
    # Each file is a Ranger policy payload (policyType 0/1/2)
    _POLICY_FILES = [
        "access_allow_policies.json",
        "column_mask_policies.json",
        "row_filter_policies.json",
    ]

    def input_check(self) -> None:
        """Verify Ranger Admin is reachable and policy files exist."""
        try:
            r = requests.get(
                f"{RANGER_URL}/service/plugins/services",
                auth=RANGER_AUTH, timeout=10
            )
            r.raise_for_status()
        except Exception as exc:
            raise RuntimeError(
                f"[security] Ranger Admin not reachable at {RANGER_URL}: {exc}"
            )

        missing = [
            str(POLICY_DIR / f)
            for f in self._POLICY_FILES
            if not (POLICY_DIR / f).exists()
        ]
        if missing:
            raise RuntimeError(
                "[security] Missing policy JSON files: " + ", ".join(missing)
                + ". Run: python catalog/scripts/export_ranger_policies.py"
            )
        log.info("[security] Ranger reachable. %d policy files present.", len(self._POLICY_FILES))

    def _execute(self) -> None:
        """Push all policies to Ranger. Existing policies are updated (PUT), new ones created (POST)."""
        for policy_file in self._POLICY_FILES:
            policies = json.loads((POLICY_DIR / policy_file).read_text())
            log.info("[security] Applying %d policies from %s", len(policies), policy_file)

            for policy in policies:
                existing = self._get_existing_policy(policy["name"])
                if existing:
                    policy["id"] = existing["id"]
                    r = requests.put(
                        f"{RANGER_URL}/service/plugins/policies/{existing['id']}",
                        json=policy, headers=HEADERS, auth=RANGER_AUTH,
                    )
                else:
                    r = requests.post(
                        f"{RANGER_URL}/service/plugins/policies",
                        json=policy, headers=HEADERS, auth=RANGER_AUTH,
                    )
                r.raise_for_status()
                log.info(
                    "[security] Policy %s: %s",
                    policy["name"],
                    "updated" if existing else "created",
                )

        time.sleep(5)   # Short pause after the writes; Trino plugins pick up changes on their next 30s poll, so output_check verifies Ranger Admin only
        self._export_policies_to_minio()

    def _get_existing_policy(self, name: str) -> dict | None:
        r = requests.get(
            f"{RANGER_URL}/service/plugins/policies",
            params={"serviceName": "corep-trino", "policyName": name},
            headers=HEADERS, auth=RANGER_AUTH,
        )
        r.raise_for_status()
        policies = r.json().get("policies", [])
        return policies[0] if policies else None

    def _export_policies_to_minio(self) -> None:
        """Export current Ranger policy state to MinIO for versioned audit evidence."""
        try:
            from minio import Minio
            import io
            from datetime import datetime, timezone

            r = requests.get(
                f"{RANGER_URL}/service/plugins/policies",
                params={"serviceName": "corep-trino"},
                headers=HEADERS, auth=RANGER_AUTH,
            )
            r.raise_for_status()
            policy_json = json.dumps(r.json(), indent=2).encode()

            client = Minio(
                os.environ.get("MINIO_ENDPOINT", "minio:9000"),
                access_key=os.environ.get("MINIO_ROOT_USER", "minioadmin"),
                secret_key=os.environ.get("MINIO_ROOT_PASSWORD", "minioadmin"),
                secure=False,
            )
            bucket = "corep-ranger-audit"
            if not client.bucket_exists(bucket):
                client.make_bucket(bucket)

            ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
            object_name = f"policies/ranger_policies_{ts}.json"
            client.put_object(
                bucket, object_name,
                io.BytesIO(policy_json), length=len(policy_json),
                content_type="application/json",
            )
            log.info("[security] Ranger policies exported → minio://%s/%s", bucket, object_name)
        except Exception as exc:
            log.warning("[security] MinIO export failed (non-fatal): %s", exc)

    def emit_lineage(self) -> None:
        log.info("[security] No lineage event — policy enforcement is orthogonal to data lineage.")

    def output_check(self) -> None:
        """Verify all expected policy names are active in Ranger."""
        expected = {"mask-lei-analyst", "mask-name-analyst", "mask-borrower-name",
                    "mask-borrower-id", "filter-rwa-credit-only",
                    "filter-capital-valid-tiers", "filter-liquidity-hqla-only",
                    "allow-mart-to-reporting", "allow-raw-to-engineers-only",
                    "allow-audit-schema-to-auditors"}

        r = requests.get(
            f"{RANGER_URL}/service/plugins/policies",
            params={"serviceName": "corep-trino"},
            headers=HEADERS, auth=RANGER_AUTH,
        )
        r.raise_for_status()
        active = {p["name"] for p in r.json().get("policies", [])}

        missing = expected - active
        if missing:
            raise RuntimeError(
                f"[security] Policies not active in Ranger: {', '.join(missing)}"
            )
        log.info("[security] All %d policies verified active in Ranger.", len(expected))
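
The module creates and updates policies, but a policy that exists in Ranger and is no longer in the JSON files goes unnoticed. A dry-run plan makes that visible. This is a sketch of one possible helper, with the hypothetical name `plan_sync`; it works on policy names only and does not call Ranger.

```python
def plan_sync(desired: list, active_names: set) -> dict:
    """Compare policy payloads from the JSON files with names active in Ranger.

    Returns the names to create, to update, and those that are orphaned
    (active in Ranger but absent from the files).
    """
    desired_names = [p["name"] for p in desired]
    duplicates = sorted({n for n in desired_names if desired_names.count(n) > 1})
    if duplicates:
        raise ValueError(f"duplicate policy names in files: {', '.join(duplicates)}")

    wanted = set(desired_names)
    return {
        "create": sorted(wanted - active_names),
        "update": sorted(wanted & active_names),
        "orphaned": sorted(active_names - wanted),
    }


plan = plan_sync(
    desired=[{"name": "mask-lei-analyst"}, {"name": "mask-borrower-id"}],
    active_names={"mask-lei-analyst", "old-test-policy"},
)
print(plan)
# {'create': ['mask-borrower-id'], 'update': ['mask-lei-analyst'], 'orphaned': ['old-test-policy']}
```

Logging the `orphaned` list in `output_check` would surface manual changes made in the Ranger UI that never reached version control.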

9. Testing Policies via Trino

After security.py runs, verify the policies work exactly as designed. Connect to Trino as different roles and confirm the masking behaviour.

-- Connect as risk_analyst role
-- Trino CLI: trino --user risk_analyst_user --catalog postgresql --schema raw

-- Test 1: Column masking on lei
SELECT instrument_id, lei, name
FROM raw.counterparties
LIMIT 3;

-- Expected output for risk_analyst_user:
-- instrument_id | lei                                                              | name
-- CP001         | a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae | Deu***
-- CP002         | 2c624232cdd221771294dfbb310acbc8d2e4b5f0d1af56af893d69ccb20b43 | Bar***
-- CP003         | 19581e27de7ced00ff1ce50b2047e7a567c76b1cbaebabe5ef03f7c3017bb5b | Soc***
-- Note: lei is SHA-256 hash. name is truncated. GDPR enforced.

-- Test 2: Row filter on rwa_exposures
SELECT DISTINCT exposure_class FROM raw.rwa_exposures;
-- Expected: only credit classes — equity and other are filtered out

-- Test 3: Schema-level deny for raw schema (analyst tries to access raw directly)
SELECT * FROM raw.capital_instruments LIMIT 1;
-- Expected: ERROR: Access Denied: Cannot select from table: postgresql.raw.capital_instruments
-- (raw schema is blocked — analyst must go through mart)

-- Test 4: Mart access works for risk_analyst (unmasked, aggregated — no PII)
SELECT * FROM mart.corep_c0300;
-- Expected: full result (cet1_ratio, tier1_ratio, etc.). No PII in mart.

-- Connect as corep_reporting role

-- Test 5: Reporting role sees full unmasked LEI
SELECT lei FROM raw.counterparties LIMIT 3;
-- Expected: 529900T8BM49AURSDO55  (real LEI — unmasked)
-- corep_reporting has the ALLOW policy for this column

-- Test 6: Reporting role cannot see raw.loans borrower data (masked to NULL)
SELECT borrower_name, borrower_id FROM raw.loans LIMIT 3;
-- Expected: NULL | a665a45920... (name NULL, id hashed)
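
The manual checks above can be turned into assertions that run after each deployment. The sketch below validates rows that have already been fetched as `risk_analyst_user`, for example through any DB-API cursor; the function names are illustrative, and the expected formats are the ones stated in Test 1.

```python
import re

SHA256_HEX = re.compile(r"^[0-9a-f]{64}$")


def is_sha256_hex(value) -> bool:
    """True for a 64-character hex string; case is ignored."""
    return isinstance(value, str) and bool(SHA256_HEX.match(value.lower()))


def check_masked_rows(rows: list) -> list:
    """rows are (lei, name) tuples as seen by risk_analyst; return a list of failures."""
    failures = []
    for i, (lei, name) in enumerate(rows):
        if not is_sha256_hex(lei):
            failures.append(f"row {i}: lei is not a 64-char hex digest")
        if not (isinstance(name, str) and name.endswith("***") and len(name) <= 6):
            failures.append(f"row {i}: name is not truncated to 3 chars + ***")
    return failures
```

An empty list from `check_masked_rows(cursor.fetchall())` means both masks were applied to every returned row; a raw LEI or full name in any row produces a failure message naming the row.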

10. The Ranger Audit Log — Supervisory Evidence

Every query Ranger evaluates, whether allowed, masked, or denied, is written to the audit log. This log is your evidence when an ECB SREP review asks: “Can you demonstrate that PII columns were not accessible to unauthorised users during the reporting period?”

Audit field | Example value | Supervisory purpose
eventTime | 2026-05-07T08:14:33Z | Timestamp for every access event
user | risk_analyst_user | Identifies who made the query
accessType | SELECT | Type of operation attempted
resourcePath | postgresql/raw/counterparties/lei | Exact column accessed
accessResult | ALLOWED | Whether access was granted (with mask applied)
policyId | 42 | Which policy made the decision, traceable to version in MinIO
tags | GDPR.PII | Tags active on the resource at decision time
clusterName | corep-governance-pipeline | Identifies the pipeline cluster

-- Query Ranger audit log from Trino (audit schema in PostgreSQL)
-- Auditors can run this themselves — no Ranger admin access needed

SELECT
    event_time,
    request_user,
    access_type,
    resource_path,
    access_result,
    policy_id,
    tags
FROM audit.ranger_access_audit
WHERE
    event_time >= DATE_TRUNC('month', CURRENT_DATE)   -- current reporting month
    AND resource_path LIKE '%lei%'                     -- all LEI column accesses
ORDER BY event_time DESC
LIMIT 100;

-- Cross-check: any ALLOWED access to lei by a user outside corep_reporting, data_engineer, or pipeline_service?
SELECT DISTINCT request_user, access_result, policy_id
FROM audit.ranger_access_audit
WHERE resource_path LIKE '%lei%'
  AND access_result = 'ALLOWED'
  AND request_user NOT IN (
      SELECT username FROM audit.role_assignments
      WHERE role_name IN ('corep_reporting', 'data_engineer', 'pipeline_service')
  );
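-- Expected: 0 rows. Any row returned indicates a policy gap.
-- Caveat: masked reads are also logged as ALLOWED (see the accessResult field above),
-- so exclude the masking policies' IDs first, or masked analyst reads will appear here.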
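
The same cross-check can be sketched in Python for a scheduled job that works on exported audit rows. This is an illustration under assumptions: the dictionary keys mirror the SQL column names above, the sample rows are invented, and `masking_policy_ids` is a parameter introduced here because the audit field table shows masked reads logged as ALLOWED, which would otherwise be reported as gaps.

```python
from typing import Iterable, Mapping, Set, Tuple

# Roles that may legitimately read the unmasked lei column (from the SQL above).
PRIVILEGED_ROLES = {"corep_reporting", "data_engineer", "pipeline_service"}


def unexpected_lei_access(
    audit_rows: Iterable[Mapping],
    role_assignments: Iterable[Mapping],
    masking_policy_ids: Set[int] = frozenset(),
) -> Set[Tuple[str, str, int]]:
    """Return distinct (user, result, policy_id) for ALLOWED lei reads
    by users who hold none of the privileged roles."""
    privileged_users = {
        a["username"] for a in role_assignments
        if a["role_name"] in PRIVILEGED_ROLES
    }
    findings = set()
    for row in audit_rows:
        if "lei" not in row["resource_path"]:      # same as LIKE '%lei%'
            continue
        if row["access_result"] != "ALLOWED":
            continue
        if row["policy_id"] in masking_policy_ids:  # masked read, not a leak
            continue
        if row["request_user"] in privileged_users:
            continue
        findings.add(
            (row["request_user"], row["access_result"], row["policy_id"])
        )
    return findings


# Invented sample data for illustration only.
audit_rows = [
    {"request_user": "reporting_user", "access_result": "ALLOWED",
     "resource_path": "postgresql/raw/counterparties/lei", "policy_id": 7},
    {"request_user": "risk_analyst_user", "access_result": "ALLOWED",
     "resource_path": "postgresql/raw/counterparties/lei", "policy_id": 42},
    {"request_user": "risk_analyst_user", "access_result": "ALLOWED",
     "resource_path": "postgresql/raw/counterparties/name", "policy_id": 43},
]
role_assignments = [
    {"username": "reporting_user", "role_name": "corep_reporting"},
    {"username": "risk_analyst_user", "role_name": "risk_analyst"},
]

gaps = unexpected_lei_access(audit_rows, role_assignments)
gaps_ignoring_masks = unexpected_lei_access(
    audit_rows, role_assignments, masking_policy_ids={42}
)
```

Without the mask exclusion the analyst's masked read is flagged; with policy 42 treated as a masking policy (an assumption in this sample), the result is empty.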

11. Complete Policy Inventory

| Policy name | Type | Resource | Roles affected | Effect |
|---|---|---|---|---|
| mask-lei-analyst | Mask | raw.counterparties.lei | risk_analyst, auditor | SHA-256 hash |
| mask-name-analyst | Mask | raw.counterparties.name | risk_analyst, auditor | First 4 chars + *** |
| mask-borrower-name | Mask | raw.loans.borrower_name | risk_analyst, corep_reporting, auditor | NULL |
| mask-borrower-id | Mask | raw.loans.borrower_id | risk_analyst, corep_reporting, auditor | SHA-256 hash |
| filter-rwa-credit-only | Row Filter | raw.rwa_exposures | risk_analyst | Credit classes only |
| filter-capital-valid-tiers | Row Filter | raw.capital_instruments | risk_analyst, corep_reporting | CET1/AT1/T2 only |
| filter-liquidity-hqla-only | Row Filter | raw.liquidity_assets | risk_analyst | HQLA levels 1/2A/2B only |
| allow-mart-to-reporting | Allow | mart.*, intermediate.* | corep_reporting, risk_analyst, auditor | Full SELECT |
| allow-raw-to-engineers-only | Allow + Deny | raw.* | engineer: full; others: DENY | Schema segregation |
| allow-audit-schema-to-auditors | Allow | audit.* | auditor, data_engineer | SELECT only |
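
To make the three mask effects in the inventory concrete, here are plain-Python equivalents. They illustrate the transformations only: Ranger applies its own mask implementations inside Trino, the function names are invented for this sketch, and the second LEI is a placeholder. The example also shows why a hashed column still supports distinct counts and grouping.

```python
import hashlib
from typing import Optional


def mask_hash(value: Optional[str]) -> Optional[str]:
    """SHA-256 hex digest of the value; NULL stays NULL."""
    if value is None:
        return None
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def mask_show_first_4(value: Optional[str]) -> Optional[str]:
    """Keep the first four characters and append ***."""
    if value is None:
        return None
    return value[:4] + "***"


def mask_null(value: Optional[str]) -> None:
    """Replace any value with NULL."""
    return None


# The first LEI is the example from this post; the last is a made-up placeholder.
leis = ["529900T8BM49AURSDO55", "529900T8BM49AURSDO55", "EXAMPLELEI0000000001"]
hashed = [mask_hash(v) for v in leis]

# Equal inputs hash to equal outputs, so COUNT(DISTINCT ...) is preserved.
print(len(set(leis)), len(set(hashed)))
print(mask_show_first_4("Deutsche Bank AG"))
```

Because the hash is deterministic, joins and GROUP BY on the masked column behave as they would on the original values, which is the property the analysts rely on.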

12. Ranger Policies Mapped to Regulations

| Regulation | Specific article / principle | Ranger policy type | How it is satisfied |
|---|---|---|---|
| GDPR Article 25 | Data protection by design and by default | Column mask | PII columns are masked by default for all non-submission roles; the analyst cannot opt out |
| GDPR Article 4(5) | Pseudonymisation definition | Column mask (HASH) | SHA-256 hash of the LEI is pseudonymisation: the analyst cannot invert the hash, and it can be matched only by someone who already holds the original value |
| GDPR Article 30 | Records of processing activities | Audit log → MinIO | Every column access is logged with timestamp, user, and policy ID as persistent evidence |
| GDPR Article 32 | Security of processing: appropriate technical measures | All policy types | Ranger enforcement at the query layer is a documented technical measure |
| CRD IV Article 74 | Internal governance: data access controls | Row filter + allow | Role-based schema access matches organisational data ownership boundaries |
| BCBS 239 Principle 2 | Data architecture aligned to risk data needs | Row filter | Exposure class row filter enforces domain boundaries between credit and market risk teams |
| EBA GL on Internal Governance | Data governance framework: access controls documented | Policy-as-code in MinIO | Policies are versioned as JSON in MinIO with timestamps, giving an auditable policy history |
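
The policy-as-code row depends on each policy version being stored as timestamped JSON. The following sketch shows one way such a versioned object could be prepared before upload. Everything here is an assumption made for illustration: the key layout, the policy fields, and the `build_policy_version` helper are not taken from the series' code, and the actual upload to MinIO is left out.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Tuple


def build_policy_version(policy: dict,
                         exported_at: datetime) -> Tuple[str, bytes, str]:
    """Return (object_key, json_body, sha256_digest) for one policy snapshot."""
    # Canonical JSON: sorted keys and fixed separators give a stable digest.
    body = json.dumps(policy, sort_keys=True,
                      separators=(",", ":")).encode("utf-8")
    digest = hashlib.sha256(body).hexdigest()
    stamp = exported_at.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    # Hypothetical key layout: one prefix per policy, one object per version.
    key = f"ranger-policies/{policy['name']}/{stamp}-{digest[:12]}.json"
    return key, body, digest


# Illustrative policy document; field names are assumptions.
policy = {
    "name": "mask-lei-analyst",
    "type": "Mask",
    "resource": "raw.counterparties.lei",
    "roles": ["risk_analyst", "auditor"],
    "effect": "SHA-256 hash",
}

key, body, digest = build_policy_version(
    policy, datetime(2026, 5, 7, 8, 14, 33, tzinfo=timezone.utc)
)
print(key)
```

Including a content digest in the key means two exports of an unchanged policy are recognisable as identical, while any edit produces a new, separately timestamped object.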

📚 Day 10 Key Takeaways

  • Tags without enforcement are documentation, not security. The GDPR.PII tags from Day 9 become enforcement only when Ranger’s masking policy reads them and acts on them at query time.
  • Column masking > column hiding. A masked SHA-256 LEI still lets analysts do COUNT DISTINCT and GROUP BY correctly. A completely hidden column breaks every query that joins on it. Masking is the regulatory-compliant middle ground.
  • Trino as the single query surface is what makes Ranger practical. One enforcement point — not one enforcement point per data source per schema per team.
  • Policy-as-code in MinIO gives you a versioned, timestamped history of who was allowed to see what, and when the policy changed. This is what a GDPR supervisory authority examines in a data breach investigation.
  • Row filters enforce domain boundaries without requiring separate physical tables — credit risk and market risk see the same raw table but different rows, governed by their organisational role.
  • The Ranger audit log is regulatory evidence — query it from Trino via the audit schema. Auditors can self-serve without needing Ranger admin credentials.
  • Next: Day 11 — XBRL generation with Arelle: converting your mart tables into a valid EBA COREP XBRL instance document, complete with EBA taxonomy validation.
