Day 9 tagged raw.counterparties.lei and raw.counterparties.name as GDPR.PII in OpenMetadata. Tags are documentation. They describe intent. They do not enforce anything.
An analyst who connects directly to PostgreSQL on port 5432 and runs SELECT * FROM raw.counterparties will see every LEI, every legal entity name, completely unmasked — regardless of how many tags you applied in the catalog. The tag said “this is PII.” Nobody stopped them.
This is the gap that Apache Ranger fills. Ranger is a centralised policy engine that intercepts every query before it reaches the data. When an analyst queries raw.counterparties through Trino, Ranger’s plugin evaluates their role against the active policies and either allows the query, masks the PII column, filters rows, or denies access entirely — before a single byte of data is read from PostgreSQL.
This post wires Ranger into your Trino layer with four policy types, documents the security.py module, and maps every policy to its GDPR, CRR, and BCBS 239 requirement.
1. Why Ranger — and Not Just PostgreSQL GRANT Statements
Every experienced DBA reading this will ask: why add Ranger complexity when PostgreSQL already has GRANT, row-level security, and column-level privileges?
| Capability | PostgreSQL GRANT / RLS | Apache Ranger + Trino plugin |
|---|---|---|
| Column-level access control | Yes — GRANT SELECT(col) | Yes — column filter policy |
| Row-level filtering | Yes — Row Level Security | Yes — row filter policy |
| Column masking (show partial value) | No — only full hide or show | Yes — mask to SHA256, first 3 chars, NULL, custom expression |
| Audit log of every query | pg_audit — DDL/DML but no column-level | Every column access logged with user, timestamp, policy ID |
| Policies apply across ALL data sources | No — per-database only | Trino federates Postgres + Iceberg/MinIO + future sources under one policy engine |
| Policy-as-code (version-controlled JSON) | No — SQL DDL only | Yes — Ranger REST API, policies stored as JSON |
| No direct database connection required for analysts | No — analysts need DB credentials | Yes — analysts query Trino only, no PostgreSQL credentials needed |
| Tag-based policies (“if GDPR.PII then mask”) | No | Yes — Ranger tag-based policies from Atlas/OpenMetadata |
From Day 1 of this pipeline the rule has been: all consumers query through Trino. Nobody connects directly to PostgreSQL port 5432 (except pipeline service accounts). Nobody connects to MinIO via S3 API directly. This single architectural constraint means Ranger’s Trino plugin is the only enforcement point you need. If a query doesn’t go through Trino, it doesn’t happen.
In your Docker network, PostgreSQL port 5432 is not exposed to the host. Only Trino port 8080 is. This is the network-level enforcement of the architectural rule.
2. Security Architecture
Query Path with Ranger Enforcement ═══════════════════════════════════════════════════════════════════ Analyst / Airflow / Superset │ │ SQL query over HTTP (port 8080) ▼ ┌──────────────────────────────────┐ │ Trino Coordinator │ │ │ │ ┌────────────────────────────┐ │ │ │ Ranger Trino Plugin │ │ ◄── intercepts EVERY query │ │ • Parses SQL AST │ │ │ │ • Extracts table/columns │ │ │ │ • Calls Ranger REST API │ │ │ └──────────┬─────────────────┘ │ │ │ │ └─────────────┼────────────────────┘ │ ▼ ┌──────────────────────────────────┐ │ Apache Ranger Admin :6080 │ │ │ │ Policy evaluation engine: │ │ 1. Find matching policies │ │ (user + resource + action) │ │ 2. Evaluate in priority order │ │ 3. Return: ALLOW / DENY / │ │ MASK / ROW_FILTER │ │ 4. Write audit log │ └──────────┬───────────────────────┘ │ │ Decision returned to Trino plugin ▼ ┌──────────────────────────────────┐ │ Trino executes query with │ │ policy applied: │ │ │ │ ALLOW → full result │ │ MASK → lei = SHA256(lei) │ │ ROW FILT→ WHERE tier = 'CET1' │ │ DENY → error: Access denied │ └──────────┬───────────────────────┘ │ ▼ ┌───────────────────┐ ┌───────────────────┐ │ PostgreSQL :5432 │ │ MinIO / Iceberg │ │ (internal only) │ │ (internal only) │ └───────────────────┘ └───────────────────┘ │ ▼ Ranger audit log → MinIO bucket: corep-ranger-audit (every query, every user, every column access, every decision)
3. Ranger + Trino Plugin Setup
3.1 Verify Ranger Is Running
docker compose --project-directory C:\reg_repo\platform-solution\corep-governance-pipeline ps | grep ranger # Expected: ranger running 0.0.0.0:6080->6080/tcp # Default credentials — CHANGE IN PRODUCTION # URL: http://localhost:6080 # User: admin Password: rangerR0cks!
3.2 Install the Ranger Trino Plugin
The Ranger Trino plugin is a JAR that sits in Trino’s plugin directory and intercepts the system access control interface. Your docker/trino/config.properties already has the hook. Now you configure it.
# docker/trino/ranger-trino-plugin/ranger-trino-security.xml # This file tells the Trino plugin where to find the Ranger Admin server <?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>ranger.plugin.trino.service.name</name> <value>corep-trino</value> <!-- Must match the service name created in Ranger Admin UI --> </property> <property> <name>ranger.plugin.trino.policy.source.impl</name> <value>org.apache.ranger.admin.client.RangerAdminRESTClient</value> </property> <property> <name>ranger.plugin.trino.policy.rest.url</name> <value>http://ranger:6080</value> <!-- ranger is the Docker service name --> </property> <property> <name>ranger.plugin.trino.policy.pollIntervalMs</name> <value>30000</value> <!-- Trino pulls policy updates every 30 seconds --> </property> <property> <name>ranger.plugin.trino.access.cluster.name</name> <value>corep-governance-pipeline</value> </property> </configuration>
# docker/trino/ranger-trino-plugin/ranger-trino-audit.xml # Configures audit log destination — write to local file + MinIO S3 <?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>xasecure.audit.is.enabled</name> <value>true</value> </property> <property> <name>xasecure.audit.solr.is.enabled</name> <value>false</value> <!-- Not using Solr in this stack --> </property> <property> <name>xasecure.audit.hdfs.is.enabled</name> <value>false</value> </property> <property> <name>xasecure.audit.log4j.is.enabled</name> <value>true</value> <!-- Write to container log — Docker log driver captures it --> </property> <property> <name>xasecure.audit.destination.db.is.enabled</name> <value>true</value> </property> <property> <name>xasecure.audit.destination.db.jdbc.url</name> <value>jdbc:postgresql://postgres:5432/corep</value> </property> <property> <name>xasecure.audit.destination.db.jdbc.driver</name> <value>org.postgresql.Driver</value> </property> <property> <name>xasecure.audit.destination.db.user</name> <value>corep_admin</value> </property> <property> <name>xasecure.audit.destination.db.password</name> <value>${POSTGRES_PASSWORD}</value> <!-- Injected from Docker env at container start --> </property> </configuration>
3.3 Register the Trino Service in Ranger
# catalog/scripts/create_ranger_service.py # Run once to register the Trino service in Ranger Admin import requests, os, json RANGER_URL = os.environ.get("RANGER_URL", "http://localhost:6080") RANGER_AUTH = ( os.environ.get("RANGER_ADMIN_USER", "admin"), os.environ.get("RANGER_ADMIN_PASSWORD", "rangerR0cks!"), ) HEADERS = {"Content-Type": "application/json", "Accept": "application/json"} service_payload = { "name": "corep-trino", "displayName": "COREP Governance Pipeline — Trino", "type": "trino", "description": "Trino query engine for COREP regulatory pipeline. All analyst queries go through this service.", "isEnabled": True, "configs": { "username": "trino_ranger_user", # service account Ranger uses to validate resources "password": os.environ.get("TRINO_RANGER_PASSWORD", ""), "jdbc.driverClassName": "io.trino.jdbc.TrinoDriver", "jdbc.url": "jdbc:trino://trino:8080", }, } r = requests.post( f"{RANGER_URL}/service/plugins/services", json=service_payload, headers=HEADERS, auth=RANGER_AUTH, ) r.raise_for_status() service_id = r.json()["id"] print(f"Ranger service created: corep-trino (id={service_id})")
4. Defining Banking Roles
A banking data governance pipeline has a small but clearly defined set of consumer roles. Each maps to a real job function and a real set of data access requirements.
| Role | Job function | What they need | What they must NOT see |
|---|---|---|---|
corep_reporting | Regulatory reporting team — submits COREP to ECB/NCAs | Full access to mart.*, read-only intermediate.* | raw.* PII columns (name, lei) |
risk_analyst | Risk management — analyses capital adequacy, LCR | mart.*, staging.* aggregates, no individual borrower data | raw.loans individual rows, raw.counterparties.lei |
data_engineer | Pipeline development and maintenance | All schemas, all columns — for debugging | Nothing (but all access audited) |
auditor | Internal audit, external audit firms, ECB inspectors | audit.*, read-only mart.* and lineage, GX data docs | raw PII columns, staging.stg_counterparties (even though PII-free, still restricted) |
pipeline_service | Airflow DAG service account | All schemas — pipeline must write raw.*, read mart.* | Nothing (service account, not human) |
# catalog/scripts/create_ranger_roles.py ROLES = [ {"name": "corep_reporting", "description": "Regulatory reporting team — COREP submission"}, {"name": "risk_analyst", "description": "Risk management — capital and liquidity analysis"}, {"name": "data_engineer", "description": "Pipeline engineering team — full access with audit"}, {"name": "auditor", "description": "Internal/external audit and ECB SREP inspectors"}, {"name": "pipeline_service","description": "Airflow DAG service account"}, ] for role in ROLES: r = requests.post( f"{RANGER_URL}/service/roles/roles", json={ "name": role["name"], "description": role["description"], "users": [], # assign users via LDAP/SCIM in production "groups": [], "roles": [], }, headers=HEADERS, auth=RANGER_AUTH, ) r.raise_for_status() print(f" Role created: {role['name']}")
5. Column Masking Policies — The Core GDPR Enforcement
Column masking is Ranger’s most powerful feature for GDPR compliance. Instead of completely hiding a column (which breaks some queries), masking replaces the value with a transformed version — a hash, a partial value, or a NULL — so the query structure works but the personal data is not exposed.
The EBA’s COREP validation rules require LEI in some C 02.00 rows. The solution: the corep_reporting role sees the real LEI because they are the submission team. The risk_analyst role sees a SHA-256 hash — consistent for joining purposes but not reversible to the actual legal entity identity.
postgresql / schema=raw / table=counterparties / column=leiApplies to roles:
risk_analyst, auditorMask type: HASH (SHA-256)
Regulatory basis: GDPR Article 25 — data protection by design; GDPR Article 4(5) — pseudonymisation
postgresql / schema=raw / table=counterparties / column=nameApplies to roles:
risk_analyst, auditorMask type: PARTIAL — first 3 chars + “***” (e.g. “Deutsche Bank AG” → “Deu***”)
Regulatory basis: GDPR Article 4(1) — personal data minimisation
postgresql / schema=raw / table=counterparties / column=leiApplies to roles:
corep_reporting, data_engineer, pipeline_serviceAccess: SELECT (unmasked)
Justification: COREP C 02.00 EBA submission requires full unmasked LEI per EBA Validation Rule v0001
# catalog/scripts/create_ranger_policies.py — column masking policies def create_column_mask_policy( name: str, schema: str, table: str, column: str, roles: list, mask_type: str, # "MASK" (hash), "MASK_SHOW_FIRST_4", "MASK_NULL", "CUSTOM" mask_expr: str = "", # for CUSTOM mask type only ) -> dict: return { "service": "corep-trino", "name": name, "policyType": 1, # 0=Access, 1=Masking, 2=RowFilter "description": f"Column masking: {schema}.{table}.{column}", "isEnabled": True, "resources": { "catalog": {"values": ["postgresql"], "isExcludes": False, "isRecursive": False}, "schema": {"values": [schema], "isExcludes": False, "isRecursive": False}, "table": {"values": [table], "isExcludes": False, "isRecursive": False}, "column": {"values": [column], "isExcludes": False, "isRecursive": False}, }, "dataMaskPolicyItems": [ { "dataMaskInfo": { "dataMaskType": mask_type, "valueExpr": mask_expr, }, "roles": roles, "groups": [], "users": [], "accesses": [{"type": "select", "isAllowed": True}], "conditions": [], "delegateAdmin": False, } ], } MASK_POLICIES = [ # LEI → SHA-256 hash for analysts (pseudonymised, not anonymised) create_column_mask_policy( name="mask-lei-analyst", schema="raw", table="counterparties", column="lei", roles=["risk_analyst", "auditor"], mask_type="MASK", # Ranger built-in SHA-256 hash ), # Counterparty name → first 3 chars + *** for analysts create_column_mask_policy( name="mask-name-analyst", schema="raw", table="counterparties", column="name", roles=["risk_analyst", "auditor"], mask_type="MASK_SHOW_FIRST_4", ), # Borrower name in loans → NULL for all non-engineer roles create_column_mask_policy( name="mask-borrower-name", schema="raw", table="loans", column="borrower_name", roles=["risk_analyst", "corep_reporting", "auditor"], mask_type="MASK_NULL", ), # Borrower ID → hash (still usable for COUNT DISTINCT, GROUP BY) create_column_mask_policy( name="mask-borrower-id", schema="raw", table="loans", column="borrower_id", roles=["risk_analyst", "corep_reporting", "auditor"], mask_type="MASK", ), ] for policy in MASK_POLICIES: r = requests.post( f"{RANGER_URL}/service/plugins/policies", json=policy, headers=HEADERS, auth=RANGER_AUTH, ) r.raise_for_status() print(f" Masking policy created: {policy['name']}")
6. Row-Level Filter Policies
Row filters restrict which rows a role can see, even when they have column access. In a banking pipeline, row filters enforce data domain boundaries — a risk analyst in the credit risk team should not see market risk exposure rows, and a front-office analyst should not see internal treasury positions.
postgresql / schema=raw / table=rwa_exposuresApplies to role:
risk_analyst (when tagged as credit risk)Filter expression:
exposure_class IN ('central_governments','institutions','corporates','retail','real_estate')Excludes:
equity and other exposure classes (market risk team’s domain)Regulatory basis: CRR Article 112 — exposure class assignments; CRD IV Article 74 — internal governance
postgresql / schema=raw / table=capital_instrumentsApplies to role:
risk_analystFilter expression:
tier IN ('CET1', 'AT1', 'T2')Effect: Excludes any instruments in non-standard tiers or data errors — analyst always sees clean data
# catalog/scripts/create_ranger_policies.py — row filter policies def create_row_filter_policy( name: str, schema: str, table: str, roles: list, filter_expr: str, ) -> dict: return { "service": "corep-trino", "name": name, "policyType": 2, # 2 = RowFilter "description": f"Row filter: {schema}.{table} for {', '.join(roles)}", "isEnabled": True, "resources": { "catalog": {"values": ["postgresql"], "isExcludes": False, "isRecursive": False}, "schema": {"values": [schema], "isExcludes": False, "isRecursive": False}, "table": {"values": [table], "isExcludes": False, "isRecursive": False}, }, "rowFilterPolicyItems": [ { "rowFilterInfo": {"filterExpr": filter_expr}, "roles": roles, "groups": [], "users": [], "accesses": [{"type": "select", "isAllowed": True}], "conditions": [], "delegateAdmin": False, } ], } ROW_FILTER_POLICIES = [ create_row_filter_policy( name="filter-rwa-credit-only", schema="raw", table="rwa_exposures", roles=["risk_analyst"], filter_expr="exposure_class IN ('central_governments','institutions','corporates','retail','real_estate')", ), create_row_filter_policy( name="filter-capital-valid-tiers", schema="raw", table="capital_instruments", roles=["risk_analyst", "corep_reporting"], filter_expr="tier IN ('CET1', 'AT1', 'T2')", ), create_row_filter_policy( name="filter-liquidity-hqla-only", schema="raw", table="liquidity_assets", roles=["risk_analyst"], filter_expr="hqla_level IN ('1', '2A', '2B')", ), ] for policy in ROW_FILTER_POLICIES: r = requests.post( f"{RANGER_URL}/service/plugins/policies", json=policy, headers=HEADERS, auth=RANGER_AUTH, ) r.raise_for_status() print(f" Row filter policy created: {policy['name']}")
7. Access Allow Policies — Schema-Level RBAC
# Schema-level access policy — corep_reporting can see mart.* and intermediate.* # but NOT raw.* (only the data_engineer and pipeline_service can) ACCESS_POLICIES = [ { "service": "corep-trino", "name": "allow-mart-to-reporting", "policyType": 0, "isEnabled": True, "resources": { "catalog": {"values": ["postgresql"]}, "schema": {"values": ["mart", "intermediate"]}, "table": {"values": ["*"]}, "column": {"values": ["*"]}, }, "policyItems": [{ "roles": ["corep_reporting", "risk_analyst", "auditor"], "accesses": [{"type": "select", "isAllowed": True}], "conditions": [], "delegateAdmin": False, }], }, { "service": "corep-trino", "name": "allow-raw-to-engineers-only", "policyType": 0, "isEnabled": True, "resources": { "catalog": {"values": ["postgresql"]}, "schema": {"values": ["raw"]}, "table": {"values": ["*"]}, "column": {"values": ["*"]}, }, "policyItems": [{ "roles": ["data_engineer", "pipeline_service"], "accesses": [ {"type": "select", "isAllowed": True}, {"type": "insert", "isAllowed": True}, {"type": "update", "isAllowed": True}, {"type": "delete", "isAllowed": True}, {"type": "create", "isAllowed": True}, {"type": "drop", "isAllowed": True}, ], "conditions": [], "delegateAdmin": False, }], # Explicit deny for everyone else — deny overrides allow in Ranger "denyPolicyItems": [{ "roles": ["risk_analyst", "corep_reporting", "auditor"], "accesses": [{"type": "select", "isAllowed": True}], "conditions": [], "delegateAdmin": False, }], }, { "service": "corep-trino", "name": "allow-audit-schema-to-auditors", "policyType": 0, "isEnabled": True, "resources": { "catalog": {"values": ["postgresql"]}, "schema": {"values": ["audit"]}, "table": {"values": ["*"]}, "column": {"values": ["*"]}, }, "policyItems": [{ "roles": ["auditor", "data_engineer"], "accesses": [{"type": "select", "isAllowed": True}], "conditions": [], "delegateAdmin": False, }], }, ] for policy in ACCESS_POLICIES: r = requests.post( f"{RANGER_URL}/service/plugins/policies", json=policy, headers=HEADERS, auth=RANGER_AUTH, ) r.raise_for_status() print(f" Access policy created: {policy['name']}")
8. The security.py Module
""" modules/security.py — Apply and verify Ranger security policies as a pipeline step. This module: - Creates the Ranger service if it does not exist - Pushes all column masking, row filter, and access allow policies - Verifies policies are active by polling the Ranger API - Exports current policy state to MinIO for audit evidence """ import json, logging, os, time from pathlib import Path import requests from modules.base import BaseModule log = logging.getLogger(__name__) RANGER_URL = os.environ.get("RANGER_URL", "http://localhost:6080") RANGER_AUTH = ( os.environ.get("RANGER_ADMIN_USER", "admin"), os.environ.get("RANGER_ADMIN_PASSWORD", "rangerR0cks!"), ) HEADERS = {"Content-Type": "application/json", "Accept": "application/json"} POLICY_DIR = Path(os.environ.get("RANGER_POLICY_DIR", "catalog/ranger_policies")) class SecurityModule(BaseModule): MODULE_NAME = "security" # Policy JSON files — loaded from POLICY_DIR # Each file is a Ranger policy payload (policyType 0/1/2) _POLICY_FILES = [ "access_allow_policies.json", "column_mask_policies.json", "row_filter_policies.json", ] def input_check(self) -> None: """Verify Ranger Admin is reachable and policy files exist.""" try: r = requests.get( f"{RANGER_URL}/service/plugins/services", auth=RANGER_AUTH, timeout=10 ) r.raise_for_status() except Exception as exc: raise RuntimeError( f"[security] Ranger Admin not reachable at {RANGER_URL}: {exc}" ) missing = [ str(POLICY_DIR / f) for f in self._POLICY_FILES if not (POLICY_DIR / f).exists() ] if missing: raise RuntimeError( "[security] Missing policy JSON files: " + ", ".join(missing) + ". Run: python catalog/scripts/export_ranger_policies.py" ) log.info("[security] Ranger reachable. %d policy files present.", len(self._POLICY_FILES)) def _execute(self) -> None: """Push all policies to Ranger. Existing policies are updated (PUT), new ones created (POST).""" for policy_file in self._POLICY_FILES: policies = json.loads((POLICY_DIR / policy_file).read_text()) log.info("[security] Applying %d policies from %s", len(policies), policy_file) for policy in policies: existing = self._get_existing_policy(policy["name"]) if existing: policy["id"] = existing["id"] r = requests.put( f"{RANGER_URL}/service/plugins/policies/{existing['id']}", json=policy, headers=HEADERS, auth=RANGER_AUTH, ) else: r = requests.post( f"{RANGER_URL}/service/plugins/policies", json=policy, headers=HEADERS, auth=RANGER_AUTH, ) r.raise_for_status() log.info( "[security] Policy %s: %s", policy["name"], "updated" if existing else "created", ) time.sleep(5) # Trino polls Ranger every 30s; allow propagation before output_check self._export_policies_to_minio() def _get_existing_policy(self, name: str) -> dict | None: r = requests.get( f"{RANGER_URL}/service/plugins/policies", params={"serviceName": "corep-trino", "policyName": name}, headers=HEADERS, auth=RANGER_AUTH, ) r.raise_for_status() policies = r.json().get("policies", []) return policies[0] if policies else None def _export_policies_to_minio(self) -> None: """Export current Ranger policy state to MinIO for versioned audit evidence.""" try: from minio import Minio import io from datetime import datetime, timezone r = requests.get( f"{RANGER_URL}/service/plugins/policies", params={"serviceName": "corep-trino"}, headers=HEADERS, auth=RANGER_AUTH, ) r.raise_for_status() policy_json = json.dumps(r.json(), indent=2).encode() client = Minio( os.environ.get("MINIO_ENDPOINT", "minio:9000"), access_key=os.environ.get("MINIO_ROOT_USER", "minioadmin"), secret_key=os.environ.get("MINIO_ROOT_PASSWORD", "minioadmin"), secure=False, ) bucket = "corep-ranger-audit" if not client.bucket_exists(bucket): client.make_bucket(bucket) ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") object_name = f"policies/ranger_policies_{ts}.json" client.put_object( bucket, object_name, io.BytesIO(policy_json), length=len(policy_json), content_type="application/json", ) log.info("[security] Ranger policies exported → minio://%s/%s", bucket, object_name) except Exception as exc: log.warning("[security] MinIO export failed (non-fatal): %s", exc) def emit_lineage(self) -> None: log.info("[security] No lineage event — policy enforcement is orthogonal to data lineage.") def output_check(self) -> None: """Verify all expected policy names are active in Ranger.""" expected = {"mask-lei-analyst", "mask-name-analyst", "mask-borrower-name", "filter-rwa-credit-only", "allow-mart-to-reporting", "allow-raw-to-engineers-only", "allow-audit-schema-to-auditors"} r = requests.get( f"{RANGER_URL}/service/plugins/policies", params={"serviceName": "corep-trino"}, headers=HEADERS, auth=RANGER_AUTH, ) r.raise_for_status() active = {p["name"] for p in r.json().get("policies", [])} missing = expected - active if missing: raise RuntimeError( f"[security] Policies not active in Ranger: {', '.join(missing)}" ) log.info("[security] All %d policies verified active in Ranger.", len(expected))
9. Testing Policies via Trino
After security.py runs, verify the policies work exactly as designed. Connect to Trino as different roles and confirm the masking behaviour.
-- Connect as risk_analyst role -- Trino CLI: trino --user risk_analyst_user --catalog postgresql --schema raw -- Test 1: Column masking on lei SELECT instrument_id, lei, name FROM raw.counterparties LIMIT 3; -- Expected output for risk_analyst_user: -- instrument_id | lei | name -- CP001 | a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae | Deu*** -- CP002 | 2c624232cdd221771294dfbb310acbc8d2e4b5f0d1af56af893d69ccb20b43 | Bar*** -- CP003 | 19581e27de7ced00ff1ce50b2047e7a567c76b1cbaebabe5ef03f7c3017bb5b | Soc*** -- Note: lei is SHA-256 hash. name is truncated. GDPR enforced. -- Test 2: Row filter on rwa_exposures SELECT DISTINCT exposure_class FROM raw.rwa_exposures; -- Expected: only credit classes — equity and other are filtered out -- Test 3: Schema-level deny for raw schema (analyst tries to access raw directly) SELECT * FROM raw.capital_instruments LIMIT 1; -- Expected: ERROR: Access Denied: Cannot select from table: postgresql.raw.capital_instruments -- (raw schema is blocked — analyst must go through mart) -- Test 4: Mart access works for risk_analyst (unmasked, aggregated — no PII) SELECT * FROM mart.corep_c0300; -- Expected: full result — cet1_ratio, tier1_ratio, etc. No PII in mart.
-- Connect as corep_reporting role -- Test 5: Reporting role sees full unmasked LEI SELECT lei FROM raw.counterparties LIMIT 3; -- Expected: 529900T8BM49AURSDO55 (real LEI — unmasked) -- corep_reporting has the ALLOW policy for this column -- Test 6: Reporting role cannot see raw.loans borrower data (masked to NULL) SELECT borrower_name, borrower_id FROM raw.loans LIMIT 3; -- Expected: NULL | a665a45920... (name NULL, id hashed)
10. The Ranger Audit Log — Supervisory Evidence
Every query Ranger evaluates — whether allowed, masked, or denied — is written to the audit log. This log is your evidence for ECB SREP Question: “Can you demonstrate that PII columns were not accessible to unauthorised users during the reporting period?”
| Audit field | Example value | Supervisory purpose |
|---|---|---|
eventTime | 2026-05-07T08:14:33Z | Timestamp for every access event |
user | risk_analyst_user | Identifies who made the query |
accessType | SELECT | Type of operation attempted |
resourcePath | postgresql/raw/counterparties/lei | Exact column accessed |
accessResult | ALLOWED | Whether access was granted (with mask applied) |
policyId | 42 | Which policy made the decision — traceable to version in MinIO |
tags | GDPR.PII | Tags active on the resource at decision time |
clusterName | corep-governance-pipeline | Identifies the pipeline cluster |
-- Query Ranger audit log from Trino (audit schema in PostgreSQL) -- Auditors can run this themselves — no Ranger admin access needed SELECT event_time, request_user, access_type, resource_path, access_result, policy_id, tags FROM audit.ranger_access_audit WHERE event_time >= DATE_TRUNC('month', CURRENT_DATE) -- current reporting month AND resource_path LIKE '%lei%' -- all LEI column accesses ORDER BY event_time DESC LIMIT 100; -- Cross-check: any ALLOW on lei where user is not in corep_reporting or data_engineer? SELECT DISTINCT request_user, access_result, policy_id FROM audit.ranger_access_audit WHERE resource_path LIKE '%lei%' AND access_result = 'ALLOWED' AND request_user NOT IN ( SELECT username FROM audit.role_assignments WHERE role_name IN ('corep_reporting', 'data_engineer', 'pipeline_service') ); -- This query should always return 0 rows. If it doesn't, you have a policy gap.
11. Complete Policy Inventory
| Policy name | Type | Resource | Roles affected | Effect |
|---|---|---|---|---|
mask-lei-analyst | Mask | raw.counterparties.lei | risk_analyst, auditor | SHA-256 hash |
mask-name-analyst | Mask | raw.counterparties.name | risk_analyst, auditor | First 4 chars + *** |
mask-borrower-name | Mask | raw.loans.borrower_name | risk_analyst, corep_reporting, auditor | NULL |
mask-borrower-id | Mask | raw.loans.borrower_id | risk_analyst, corep_reporting, auditor | SHA-256 hash |
filter-rwa-credit-only | Row Filter | raw.rwa_exposures | risk_analyst | Credit classes only |
filter-capital-valid-tiers | Row Filter | raw.capital_instruments | risk_analyst, corep_reporting | CET1/AT1/T2 only |
filter-liquidity-hqla-only | Row Filter | raw.liquidity_assets | risk_analyst | HQLA levels 1/2A/2B only |
allow-mart-to-reporting | Allow | mart.*, intermediate.* | corep_reporting, risk_analyst, auditor | Full SELECT |
allow-raw-to-engineers-only | Allow + Deny | raw.* | engineer: full; others: DENY | Schema segregation |
allow-audit-schema-to-auditors | Allow | audit.* | auditor, data_engineer | SELECT only |
12. Ranger Policies Mapped to Regulations
| Regulation | Specific article / principle | Ranger policy type | How it is satisfied |
|---|---|---|---|
| GDPR Article 25 | Data protection by design and by default | Column mask | PII columns masked by default for all non-submission roles — analyst cannot opt out |
| GDPR Article 4(5) | Pseudonymisation definition | Column mask (HASH) | SHA-256 hash of LEI is pseudonymisation — reversible only with the original value, not by the analyst |
| GDPR Article 30 | Records of processing activities | Audit log → MinIO | Every column access logged with timestamp, user, policy ID — persistent evidence |
| GDPR Article 32 | Security of processing — appropriate technical measures | All policy types | Ranger enforcement at query layer is a documented technical measure |
| CRD IV Article 74 | Internal governance — data access controls | Row filter + allow | Role-based schema access matches organisational data ownership boundaries |
| BCBS 239 Principle 2 | Data architecture aligned to risk data needs | Row filter | Exposure class row filter enforces domain boundaries between credit and market risk teams |
| EBA GL on Internal Governance | Data governance framework — access controls documented | Policy-as-code in MinIO | Policies versioned as JSON in MinIO with timestamps — auditable policy history |
📚 Day 10 Key Takeaways
- Tags without enforcement are documentation, not security. The GDPR.PII tags from Day 9 become enforcement only when Ranger’s masking policy reads them and acts on them at query time.
- Column masking > column hiding. A masked SHA-256 LEI still lets analysts do
COUNT DISTINCTandGROUP BYcorrectly. A completely hidden column breaks every query that joins on it. Masking is the regulatory-compliant middle ground. - Trino as the single query surface is what makes Ranger practical. One enforcement point — not one enforcement point per data source per schema per team.
- Policy-as-code in MinIO gives you a versioned, timestamped history of who was allowed to see what, and when the policy changed. This is what a GDPR supervisory authority examines in a data breach investigation.
- Row filters enforce domain boundaries without requiring separate physical tables — credit risk and market risk see the same raw table but different rows, governed by their organisational role.
- The Ranger audit log is regulatory evidence — query it from Trino via the
auditschema. Auditors can self-serve without needing Ranger admin credentials. - Next: Day 11 — XBRL generation with Arelle: converting your mart tables into a valid EBA COREP XBRL instance document, complete with EBA taxonomy validation.

