
ADR-008: EBA Risk Matrix Engine

Status: Proposed
Date: 2026-03-28
Author: Atlas Architecture
Supersedes: Current flat risk scoring in risk_scoring.py


Context

Atlas currently computes a single 0–100 risk score per investigation using a flat, additive algorithm in risk_scoring.py. This approach has three limitations:

  1. No EBA alignment. The EBA ML/TF Risk Factors Guidelines (EBA/GL/2021/02, consolidated with EBA/GL/2023/03 and EBA/GL/2024/01) require obliged entities to assess risk across distinct dimensions: customer, geographic, product/service, delivery channel, and transaction. Atlas collapses all of these into one number, making it difficult for compliance officers to demonstrate a structured risk-based approach to regulators.

  2. No versioning. When a compliance team adjusts risk weights or adds new risk factors, the old scoring logic is overwritten. There is no way to know which version of the matrix was active when a company was evaluated, which breaks audit trails.

  3. No configurability. Risk weights are hardcoded in Python. Different customers (or even different business lines within the same customer) may need different matrices — a payment institution's geographic risk factors differ from a corporate finance firm's.

This ADR introduces a versioned, schema-driven EBA risk matrix that integrates with the existing ontology, the rule engine (ADR feature gap spec), and the workflow engine (ADR-007).


Decision

Build a versioned EBA risk matrix system with the following properties:

  • Configurable dimensions and factors defined in a YAML/JSON schema, following the entity ontology pattern.
  • Deterministic, idempotent scoring — same inputs always produce the same matrix output, regardless of when or how many times the computation runs.
  • Immutable matrix versions — once published, a matrix version is frozen. Edits create a new version.
  • Single published version per schema line — at most one version of a given schema_id may be published at a time.
  • Company-to-version binding — each evaluation pins the company to the exact matrix version used, creating a permanent audit record.
  • Ontology mapping — risk factors map to entity ontology properties and investigation module outputs, so agents can populate the matrix automatically.
  • Snapshotted reference data — external risk lists used in scoring are resolved and frozen into the published matrix version.
  • Manual override capability — analysts can override individual factor scores with documented justification, producing a new derived evaluation rather than mutating the original record.

EBA Risk Dimension Model

The EBA ML/TF Risk Factors Guidelines (Guideline 2, sections 2.3–2.21) define five risk factor categories. Each category contains multiple risk factors, and each factor has risk-increasing and risk-decreasing indicators.

Dimensions

Dimension | EBA Reference | Description | Atlas Source Modules
Customer | GL 2.3–2.8 | Nature of the customer, ownership structure, behaviour, reputation | CIR, MEBO, SPEPWS, AMLRR
Geographic | GL 2.9–2.15 | Countries/jurisdictions linked to the customer, operations, transactions | CIR, ROA, MEBO
Product / Service | GL 2.16–2.17 | Products or services the customer uses or provides | CIR, FRLS
Delivery Channel | GL 2.18–2.19 | How the business relationship is conducted | CIR, DFWO
Transaction | GL 2.20–2.21 | Nature and pattern of transactions | FRLS

Each dimension receives its own score (0–100). The overall risk level is derived from a configurable aggregation function applied to the dimension scores — not a simple average.


Database Schema

All new tables. No modifications to existing tables.

Core Tables

-- V56__risk_matrix_schemas.sql

-- ============================================================
-- Matrix template: the configurable risk matrix definition
-- ============================================================
CREATE TABLE risk_matrix_schemas (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
schema_id VARCHAR(100) NOT NULL, -- e.g. 'eba_standard_v1'
version INTEGER NOT NULL DEFAULT 1,
name VARCHAR(255) NOT NULL,
description TEXT,
status VARCHAR(20) NOT NULL DEFAULT 'draft',
-- 'draft', 'published', 'archived'

-- The full matrix definition (dimensions, factors, weights, indicators)
matrix_definition JSONB NOT NULL,

-- Aggregation configuration
aggregation_config JSONB NOT NULL DEFAULT '{
"method": "weighted_max",
"dimension_weights": {
"customer": 0.30,
"geographic": 0.25,
"product_service": 0.20,
"delivery_channel": 0.10,
"transaction": 0.15
},
"risk_levels": {
"critical": {"min": 90, "max": 100},
"high": {"min": 70, "max": 89},
"medium": {"min": 40, "max": 69},
"low": {"min": 20, "max": 39},
"clear": {"min": 0, "max": 19}
}
}',

-- Frozen external reference data used by this matrix version
-- Example: FATF lists, EU high-risk countries, CPI-derived country buckets
reference_data_snapshot JSONB NOT NULL DEFAULT '{}',

-- Metadata
regulatory_basis VARCHAR(100), -- 'EBA/GL/2021/02'
applicable_entity_types TEXT[] DEFAULT '{LegalEntity}',

-- Authorship and audit
created_by VARCHAR(100),
published_by VARCHAR(100),
published_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),

UNIQUE(schema_id, version)
);

CREATE INDEX idx_rms_status ON risk_matrix_schemas(status);
CREATE INDEX idx_rms_schema_id ON risk_matrix_schemas(schema_id);
CREATE UNIQUE INDEX uq_rms_single_published
ON risk_matrix_schemas(schema_id)
WHERE status = 'published';

-- Prevent modification of published or archived schemas
CREATE OR REPLACE FUNCTION prevent_published_matrix_update()
RETURNS TRIGGER AS $$
BEGIN
    -- Drafts remain freely editable.
    IF OLD.status = 'draft' THEN
        RETURN NEW;
    END IF;

    -- Lifecycle: published may only move to archived; archived is terminal.
    IF OLD.status = 'published' AND NEW.status NOT IN ('published', 'archived') THEN
        RAISE EXCEPTION 'Published risk matrix schemas may only transition to archived.';
    END IF;
    IF OLD.status = 'archived' AND NEW.status <> 'archived' THEN
        RAISE EXCEPTION 'Archived risk matrix schemas cannot change status.';
    END IF;

    -- Content is frozen from publication onwards, including during the
    -- published -> archived transition and after archiving.
    IF NEW.matrix_definition IS DISTINCT FROM OLD.matrix_definition
       OR NEW.aggregation_config IS DISTINCT FROM OLD.aggregation_config
       OR NEW.reference_data_snapshot IS DISTINCT FROM OLD.reference_data_snapshot
       OR NEW.name IS DISTINCT FROM OLD.name THEN
        RAISE EXCEPTION 'Cannot modify a published risk matrix schema. Create a new version instead.';
    END IF;
    IF NEW.description IS DISTINCT FROM OLD.description
       OR NEW.regulatory_basis IS DISTINCT FROM OLD.regulatory_basis
       OR NEW.applicable_entity_types IS DISTINCT FROM OLD.applicable_entity_types
       OR NEW.schema_id IS DISTINCT FROM OLD.schema_id
       OR NEW.version IS DISTINCT FROM OLD.version THEN
        RAISE EXCEPTION 'Cannot modify metadata of a published risk matrix schema. Create a new version instead.';
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_prevent_published_matrix_update
BEFORE UPDATE ON risk_matrix_schemas
FOR EACH ROW
EXECUTE FUNCTION prevent_published_matrix_update();


-- ============================================================
-- Matrix evaluation: a completed risk assessment for a company
-- ============================================================
CREATE TABLE risk_matrix_evaluations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

-- What was evaluated
company_id UUID NOT NULL,
entity_id UUID, -- optional: specific ontology entity
investigation_id UUID REFERENCES investigations(id) ON DELETE SET NULL,

-- Which matrix version was used (immutable reference)
matrix_schema_id UUID NOT NULL REFERENCES risk_matrix_schemas(id) ON DELETE RESTRICT,

-- Snapshot of the matrix definition at evaluation time
-- (defense-in-depth: even if schema table is corrupted, the
-- evaluation record contains the exact rules that were applied)
matrix_version_snapshot JSONB NOT NULL,

-- Dimension scores (each 0–100)
dimension_scores JSONB NOT NULL,
-- Example:
-- {
-- "customer": {"score": 65, "level": "medium", "factors": [...]},
-- "geographic": {"score": 82, "level": "high", "factors": [...]},
-- "product_service": {"score": 30, "level": "low", "factors": [...]},
-- "delivery_channel": {"score": 15, "level": "clear", "factors": [...]},
-- "transaction": {"score": 45, "level": "medium", "factors": [...]}
-- }

-- Aggregated overall score
overall_score INTEGER NOT NULL, -- 0–100
overall_level VARCHAR(20) NOT NULL, -- 'critical','high','medium','low','clear'

-- Determinism proof
input_hash VARCHAR(64) NOT NULL, -- SHA-256 of canonicalized input data only
override_hash VARCHAR(64) NOT NULL, -- SHA-256 of canonicalized overrides
evaluation_fingerprint VARCHAR(64) NOT NULL, -- matrix_schema_id + input_hash + override_hash
output_hash VARCHAR(64) NOT NULL, -- SHA-256 of dimension_scores + overall

-- Populated by
populated_by VARCHAR(20) NOT NULL DEFAULT 'agent',
-- 'agent' (auto-populated from investigation), 'manual', 'hybrid'

-- Analyst overrides (if any)
overrides JSONB DEFAULT '[]',
-- [{
-- "dimension": "customer",
-- "factor_id": "ownership_complexity",
-- "original_score": 20,
-- "override_score": 60,
-- "justification": "Nominee structure identified in manual review",
-- "overridden_by": "analyst@example.com",
-- "overridden_at": "2026-03-28T10:00:00Z"
-- }]

-- Status
status VARCHAR(20) NOT NULL DEFAULT 'completed',
-- 'pending', 'completed', 'overridden', 'superseded'

-- If this evaluation was produced from an existing one
derived_from_evaluation_id UUID REFERENCES risk_matrix_evaluations(id) ON DELETE RESTRICT,

created_at TIMESTAMPTZ DEFAULT NOW(),
superseded_at TIMESTAMPTZ,
superseded_by UUID REFERENCES risk_matrix_evaluations(id) ON DELETE RESTRICT
);

CREATE INDEX idx_rme_company ON risk_matrix_evaluations(company_id, created_at DESC);
CREATE INDEX idx_rme_schema ON risk_matrix_evaluations(matrix_schema_id);
CREATE INDEX idx_rme_investigation ON risk_matrix_evaluations(investigation_id);
CREATE INDEX idx_rme_status ON risk_matrix_evaluations(status);
CREATE INDEX idx_rme_input_hash ON risk_matrix_evaluations(input_hash);
CREATE UNIQUE INDEX uq_rme_fingerprint
ON risk_matrix_evaluations(matrix_schema_id, evaluation_fingerprint);


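-- ============================================================
-- Company-to-matrix-version binding
-- ============================================================
-- The exclusion constraint below combines UUID equality with a range
-- overlap in one GiST index, which requires the btree_gist extension.
CREATE EXTENSION IF NOT EXISTS btree_gist;

CREATE TABLE company_matrix_assignments (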
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL,
matrix_schema_id UUID NOT NULL REFERENCES risk_matrix_schemas(id) ON DELETE RESTRICT,

-- The evaluation that established this assignment
evaluation_id UUID REFERENCES risk_matrix_evaluations(id) ON DELETE RESTRICT,

-- Active period (supports temporal queries)
effective_from TIMESTAMPTZ NOT NULL DEFAULT NOW(),
effective_until TIMESTAMPTZ, -- NULL = currently active

-- Why this assignment was made
assignment_reason VARCHAR(50) NOT NULL DEFAULT 'initial_evaluation',
-- 'initial_evaluation', 'periodic_review', 'matrix_upgrade',
-- 'regulatory_change', 'manual_reassignment'

assigned_by VARCHAR(100),
created_at TIMESTAMPTZ DEFAULT NOW(),

-- Only one active assignment per company at a time
CONSTRAINT uq_active_assignment
EXCLUDE USING gist (
company_id WITH =,
tstzrange(effective_from, COALESCE(effective_until, 'infinity'::timestamptz)) WITH &&
)
);

CREATE INDEX idx_cma_company_active
ON company_matrix_assignments(company_id)
WHERE effective_until IS NULL;
CREATE INDEX idx_cma_schema ON company_matrix_assignments(matrix_schema_id);
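
The temporal behaviour of company_matrix_assignments can be illustrated in plain Python. This is a minimal sketch, not part of the proposed codebase: `Assignment` and `active_assignment_at` are hypothetical names, and it assumes the half-open `[effective_from, effective_until)` semantics that PostgreSQL's `tstzrange` uses by default.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class Assignment:
    """Hypothetical in-memory mirror of one company_matrix_assignments row."""
    company_id: str
    matrix_schema_id: str
    effective_from: datetime
    effective_until: Optional[datetime] = None  # None = currently active


def active_assignment_at(
    assignments: list[Assignment], company_id: str, at: datetime
) -> Optional[Assignment]:
    """Return the assignment whose [from, until) range contains `at`, if any."""
    for a in assignments:
        if a.company_id != company_id:
            continue
        still_open = a.effective_until is None or at < a.effective_until
        if a.effective_from <= at and still_open:
            return a
    return None


utc = timezone.utc
history = [
    Assignment("acme", "schema-v1",
               datetime(2026, 1, 1, tzinfo=utc), datetime(2026, 6, 1, tzinfo=utc)),
    Assignment("acme", "schema-v2", datetime(2026, 6, 1, tzinfo=utc)),
]

# "Which matrix applied to this company on the evaluation date?"
audit_lookup = active_assignment_at(history, "acme", datetime(2026, 3, 28, tzinfo=utc))

# The boundary instant belongs to the newer assignment, so adjacent ranges never overlap.
boundary_lookup = active_assignment_at(history, "acme", datetime(2026, 6, 1, tzinfo=utc))
```

Because the upper bound is exclusive, closing one assignment and opening the next at the same timestamp should not trip the exclusion constraint.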

Matrix Definition Schema

The matrix_definition JSONB column in risk_matrix_schemas follows a strict structure. This is the "ontology" of the risk matrix — it defines what gets scored and how.

# config/risk_matrix_schemas/eba_standard_v1.yaml

schema_id: eba_standard_v1
version: 1
name: "EBA Standard ML/TF Risk Matrix"
regulatory_basis: "EBA/GL/2021/02 consolidated"

dimensions:
  customer:
    label: "Customer Risk"
    eba_reference: "GL 2.3–2.8"
    weight: 0.30
    factors:

      - id: ownership_complexity
        label: "Ownership Structure Complexity"
        description: "Complex or opaque ownership structures, nominee shareholders, bearer shares"
        max_score: 25
        weight: 1.0
        # Ontology mapping: how agents find the data
        ontology_mapping:
          entity_type: LegalEntity
          fields:
            - path: "ownership_structure.layers"
              indicator: "greater_than"
              thresholds: [{ value: 3, score: 25 }, { value: 2, score: 15 }, { value: 1, score: 5 }]
            - path: "ownership_structure.has_nominee"
              indicator: "equals"
              thresholds: [{ value: true, score: 20 }]
            - path: "ownership_structure.has_bearer_shares"
              indicator: "equals"
              thresholds: [{ value: true, score: 25 }]
        # Investigation module mapping: which module outputs feed this factor
        module_mapping:
          module: MEBO
          fields:
            - "ownership_chain_depth"
            - "nominee_shareholders_detected"
            - "circular_ownership_detected"
        # Risk indicators from existing system
        risk_indicator_mapping:
          - "complex_ownership"
          - "nominee_shareholders"
          - "hidden_ubos"

      - id: pep_exposure
        label: "PEP Exposure"
        description: "Politically exposed persons in ownership or management"
        max_score: 30
        weight: 1.0
        ontology_mapping:
          entity_type: Person
          fields:
            - path: "is_pep"
              indicator: "equals"
              thresholds: [{ value: true, score: 30 }]
            - path: "pep_level"
              indicator: "in"
              thresholds:
                - { value: ["head_of_state", "senior_government"], score: 30 }
                - { value: ["member_of_parliament", "senior_military"], score: 25 }
                - { value: ["regional_political", "senior_judicial"], score: 20 }
                - { value: ["family_member", "close_associate"], score: 15 }
        module_mapping:
          module: SPEPWS
          fields:
            - "pep_matches"
            - "pep_classification"
        risk_indicator_mapping:
          - "pep_exposure"

      - id: sanctions_exposure
        label: "Sanctions Exposure"
        description: "Direct or indirect sanctions list matches"
        max_score: 50
        weight: 1.0
        ontology_mapping:
          entity_type: SanctionsMatch
          fields:
            - path: "match_type"
              indicator: "in"
              thresholds:
                - { value: ["exact_match"], score: 50 }
                - { value: ["strong_match"], score: 40 }
                - { value: ["partial_match"], score: 20 }
        module_mapping:
          module: SPEPWS
          fields:
            - "sanctions_matches"
            - "sanctions_match_confidence"
        risk_indicator_mapping:
          - "sanctions_match"

      - id: adverse_media
        label: "Adverse Media & Reputation"
        description: "Negative news coverage, litigation, fraud allegations"
        max_score: 25
        weight: 1.0
        module_mapping:
          module: AMLRR
          fields:
            - "adverse_media_count"
            - "adverse_media_severity"
            - "litigation_count"
            - "fraud_allegations"
        risk_indicator_mapping:
          - "adverse_media"
          - "fraud_allegations"
          - "ongoing_litigation"

      - id: business_profile
        label: "Business Profile & Activity"
        description: "Nature of business, cash-intensity, high-risk sectors"
        max_score: 20
        weight: 1.0
        ontology_mapping:
          entity_type: LegalEntity
          fields:
            - path: "industry_codes"
              indicator: "intersects"
              thresholds:
                - { value: ["gambling", "crypto", "arms_trade", "precious_metals"], score: 20 }
                - { value: ["real_estate", "legal_services", "art_dealing"], score: 15 }
                - { value: ["construction", "import_export"], score: 10 }
            - path: "incorporation_date"
              indicator: "recency_days"
              thresholds: [{ value: 365, score: 10 }, { value: 730, score: 5 }]
              # The mapper converts dates into "days since incorporation" before scoring.
        module_mapping:
          module: CIR
          fields:
            - "industry_classification"
            - "incorporation_date"
            - "cash_intensive_indicators"
        risk_indicator_mapping:
          - "recent_incorporation"
          - "high_risk_industry"

  geographic:
    label: "Geographic Risk"
    eba_reference: "GL 2.9–2.15"
    weight: 0.25
    factors:

      - id: jurisdiction_risk
        label: "Jurisdiction of Registration"
        description: "Country risk based on FATF, EU high-risk third countries, CPI"
        max_score: 30
        weight: 1.0
        ontology_mapping:
          entity_type: LegalEntity
          fields:
            - path: "jurisdiction"
              indicator: "country_risk_list"
              thresholds:
                - { list: "eu_high_risk_third_countries", score: 30 }
                - { list: "fatf_grey_list", score: 25 }
                - { list: "fatf_increased_monitoring", score: 20 }
                - { list: "cpi_below_40", score: 15 }
        module_mapping:
          module: CIR
          fields:
            - "country_code"
            - "jurisdiction_risk_level"

      - id: operational_geography
        label: "Operational Geographies"
        description: "Countries where the entity operates, has subsidiaries, or transacts"
        max_score: 25
        weight: 1.0
        ontology_mapping:
          entity_type: Address
          fields:
            - path: "country"
              indicator: "country_risk_list"
              thresholds:
                - { list: "eu_high_risk_third_countries", score: 25 }
                - { list: "fatf_grey_list", score: 20 }
                - { list: "cpi_below_40", score: 10 }
        module_mapping:
          module: ROA
          fields:
            - "operational_countries"
            - "address_jurisdictions"

      - id: ubo_geography
        label: "UBO / Director Geography"
        description: "Countries of nationality or residence of UBOs and directors"
        max_score: 25
        weight: 1.0
        ontology_mapping:
          entity_type: Person
          fields:
            - path: "nationality"
              indicator: "country_risk_list"
              thresholds:
                - { list: "eu_high_risk_third_countries", score: 25 }
                - { list: "fatf_grey_list", score: 20 }
        module_mapping:
          module: MEBO
          fields:
            - "ubo_nationalities"
            - "director_nationalities"

      - id: address_risk
        label: "Address Risk Indicators"
        description: "Virtual offices, mass registration addresses, formation agents"
        max_score: 20
        weight: 1.0
        module_mapping:
          module: ROA
          fields:
            - "virtual_office_detected"
            - "mass_registration_address"
            - "address_mismatch"
        risk_indicator_mapping:
          - "virtual_office"
          - "address_mismatch"
          - "mass_registration_address"

  product_service:
    label: "Product / Service Risk"
    eba_reference: "GL 2.16–2.17"
    weight: 0.20
    factors:

      - id: product_complexity
        label: "Product / Service Complexity"
        description: "Complex financial products, anonymity features, new technologies"
        max_score: 25
        weight: 1.0
        # This dimension is primarily populated by the obliged entity's
        # own product classification, configured per customer deployment
        input_type: "manual_or_workflow"
        default_score: 0

      - id: regulatory_status
        label: "Regulatory & Licensing Status"
        description: "Missing licenses, regulatory violations, compliance gaps"
        max_score: 25
        weight: 1.0
        module_mapping:
          module: FRLS
          fields:
            - "required_licenses"
            - "license_status"
            - "regulatory_violations"
            - "compliance_gaps"
        risk_indicator_mapping:
          - "missing_licenses"
          - "compliance_violations"

  delivery_channel:
    label: "Delivery Channel Risk"
    eba_reference: "GL 2.18–2.19"
    weight: 0.10
    factors:

      - id: non_face_to_face
        label: "Non-Face-to-Face Relationship"
        description: "Remote onboarding, no physical presence verification"
        max_score: 15
        weight: 1.0
        input_type: "manual_or_workflow"
        default_score: 0

      - id: digital_presence
        label: "Digital Presence Integrity"
        description: "Website authenticity, domain ownership, online footprint"
        max_score: 20
        weight: 1.0
        module_mapping:
          module: DFWO
          fields:
            - "domain_ownership_verified"
            - "website_content_match"
            - "domain_age_days"
            - "ssl_certificate_valid"
        risk_indicator_mapping:
          - "domain_not_owned"
          - "privacy_protected_whois"
          - "recent_domain_registration"
          - "website_content_mismatch"

  transaction:
    label: "Transaction Risk"
    eba_reference: "GL 2.20–2.21"
    weight: 0.15
    factors:

      - id: financial_profile
        label: "Financial Profile"
        description: "Revenue patterns, financial distress, unusual financial indicators"
        max_score: 25
        weight: 1.0
        module_mapping:
          module: FRLS
          fields:
            - "financial_distress_indicators"
            - "revenue_plausibility"
            - "missing_financial_statements"
        risk_indicator_mapping:
          - "financial_distress"
          - "missing_financial_statements"

      - id: transaction_patterns
        label: "Transaction Patterns"
        description: "Unusual transaction volumes, cross-border patterns, cash usage"
        max_score: 25
        weight: 1.0
        # Primarily fed by ongoing monitoring, not initial investigation
        input_type: "manual_or_workflow"
        default_score: 0

# Aggregation: how dimension scores combine into overall score
aggregation:
  method: "weighted_max"
  # weighted_max: overall = max(dimension_scores) * 0.6 + weighted_avg * 0.4
  # This ensures a single critical dimension cannot be hidden by low others
  # while still reflecting overall risk profile
  # Important: the weighted average also includes the highest dimension, so with
  # the default 60/40 blend a single dimension scoring 100 while all others
  # score 0 yields 60 + 40 * w, where w is that dimension's weight. With the
  # default weights that is 64 (delivery_channel) to 72 (customer): at most
  # "high", never "critical". This is a deliberate policy choice for v1 and must
  # be reviewed by compliance owners before publish.

  # Alternative methods available:
  # - "weighted_average": pure weighted average of dimension scores
  # - "highest_dimension": overall = highest single dimension score
  # Custom user-defined formulas are intentionally out of scope for v1.
  # They weaken determinism guarantees and complicate auditability.

  risk_levels:
    critical: { min: 90, max: 100, color: "#DC2626", action: "reject_or_edd" }
    high: { min: 70, max: 89, color: "#EA580C", action: "enhanced_due_diligence" }
    medium: { min: 40, max: 69, color: "#CA8A04", action: "standard_due_diligence" }
    low: { min: 20, max: 39, color: "#16A34A", action: "simplified_due_diligence" }
    clear: { min: 0, max: 19, color: "#0D9488", action: "simplified_due_diligence" }
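
The weighted_max blend is easier to review with numbers in hand. The following is a standalone sketch of the formula in the comment above, using the default dimension weights; the function name `weighted_max` and the sample scores are illustrative assumptions, not the production scorer. It relies on Python's built-in `round`, which rounds exact halves to the nearest even integer.

```python
def weighted_max(scores: dict[str, int], weights: dict[str, float]) -> int:
    """Blend the highest dimension score (60%) with the weighted average (40%)."""
    if not scores:
        return 0
    total_weight = sum(weights.get(dim, 0) for dim in scores)
    weighted_avg = (
        round(sum(scores[dim] * weights.get(dim, 0) for dim in scores) / total_weight)
        if total_weight > 0 else 0
    )
    return round(max(scores.values()) * 0.6 + weighted_avg * 0.4)


DEFAULT_WEIGHTS = {
    "customer": 0.30,
    "geographic": 0.25,
    "product_service": 0.20,
    "delivery_channel": 0.10,
    "transaction": 0.15,
}
ZEROS = {dim: 0 for dim in DEFAULT_WEIGHTS}

# A mixed profile: the weighted average is 54, the highest dimension is 82.
mixed = weighted_max(
    {"customer": 65, "geographic": 82, "product_service": 30,
     "delivery_channel": 15, "transaction": 45},
    DEFAULT_WEIGHTS,
)  # 82 * 0.6 + 54 * 0.4 = 70.8 -> 71

# One dimension at 100, all others at 0: the result depends on that dimension's weight.
only_customer = weighted_max({**ZEROS, "customer": 100}, DEFAULT_WEIGHTS)          # 72
only_channel = weighted_max({**ZEROS, "delivery_channel": 100}, DEFAULT_WEIGHTS)   # 64
```

Under the default risk_levels, 72 falls in "high" and 64 in "medium", which is the kind of outcome compliance owners would want to confirm before publishing a matrix.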

Scoring Algorithm

Design Principles

  1. Deterministic. Given identical inputs and the same matrix version, the output is always the same. No randomness, no LLM inference in the scoring path, no floating-point non-determinism.

  2. Idempotent. Running the scorer twice on the same matrix version, canonical input, and canonical override set produces the same evaluation record. The unique evaluation_fingerprint enforces this at the persistence layer.

  3. Auditable. Every intermediate value is stored. The dimension_scores JSONB contains per-factor breakdowns. The matrix_version_snapshot preserves the exact rules used.
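
Principles 1 and 2 depend on hashing a canonical serialization of the input, so that logically equal inputs always produce the same digest. The sketch below shows the idea with the standard library only; `canonical_sha256` is a hypothetical helper name, and the sample payloads are invented for illustration.

```python
import hashlib
import json
from typing import Any


def canonical_sha256(payload: Any) -> str:
    """Hash a JSON-serializable payload independently of dict key order."""
    canonical = json.dumps(
        payload, sort_keys=True, separators=(",", ":"), ensure_ascii=True
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Same content, different key insertion order.
first = {"customer": {"pep_exposure": {"is_pep": True}}, "geographic": {}}
second = {"geographic": {}, "customer": {"pep_exposure": {"is_pep": True}}}
same_hash = canonical_sha256(first) == canonical_sha256(second)

# List order is significant in JSON, so lists (such as overrides) have to be
# sorted by the caller before hashing if their order carries no meaning.
list_order_matters = canonical_sha256(["a", "b"]) != canonical_sha256(["b", "a"])
```

Sorting keys and fixing the separators removes the two most common sources of byte-level drift between otherwise identical payloads.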

Scoring Engine

# src/risk_matrix/scorer.py

import hashlib
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Optional

logger = logging.getLogger(__name__)


@dataclass
class FactorResult:
    """Result of evaluating a single risk factor."""
    factor_id: str
    raw_score: int  # Before capping to max_score
    capped_score: int  # After capping: min(raw_score, max_score), or the capped override
    max_score: int
    contributing_indicators: list[dict]  # Which data points contributed
    override: Optional[dict] = None  # Analyst override if applied


@dataclass
class DimensionResult:
    """Result of evaluating a single risk dimension."""
    dimension_id: str
    score: int  # 0–100 normalized
    level: str  # risk level label
    factors: list[FactorResult] = field(default_factory=list)
    raw_total: int = 0  # Sum of factor scores before normalization
    max_possible: int = 0  # Sum of factor max_scores


@dataclass
class MatrixEvaluationResult:
    """Complete matrix evaluation output."""
    dimension_results: dict[str, DimensionResult]
    overall_score: int
    overall_level: str
    input_hash: str
    override_hash: str
    evaluation_fingerprint: str
    output_hash: str

class RiskMatrixScorer:
    """
    Deterministic, idempotent risk matrix scorer.

    Accepts a matrix schema definition and input data,
    produces a fully traceable evaluation result.
    """

    def __init__(
        self,
        matrix_definition: dict,
        aggregation_config: dict,
        reference_data_snapshot: dict | None = None,
    ):
        self.matrix = matrix_definition
        self.aggregation = aggregation_config
        self.reference_data = reference_data_snapshot or {}

    @staticmethod
    def _canonical_json(payload: Any) -> str:
        return json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=True, default=str)

    def validate_configuration(self) -> None:
        """
        Publish-time validation for aggregation config and external references.
        """
        self._validate_risk_levels()
        self._validate_reference_lists()

    def compute_input_hash(self, input_data: dict) -> str:
        """
        SHA-256 of canonicalized input data.
        Ensures determinism: same inputs always produce same hash.
        """
        canonical = self._canonical_json(input_data)
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def compute_override_hash(self, overrides: list[dict]) -> str:
        """SHA-256 of the overrides after sorting them into canonical order."""
        canonical = self._canonical_json(self._normalize_overrides(overrides))
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def compute_evaluation_fingerprint(
        self,
        matrix_schema_id: str,
        input_hash: str,
        override_hash: str,
    ) -> str:
        """SHA-256 over matrix version, input hash, and override hash."""
        canonical = self._canonical_json({
            "matrix_schema_id": matrix_schema_id,
            "input_hash": input_hash,
            "override_hash": override_hash,
        })
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def compute_output_hash(self, dimension_results: dict, overall_score: int) -> str:
        """SHA-256 of the evaluation output for tamper detection."""
        output = {
            "dimensions": {
                k: {"score": v.score, "factors": [
                    {"id": f.factor_id, "score": f.capped_score}
                    for f in v.factors
                ]}
                for k, v in dimension_results.items()
            },
            "overall_score": overall_score,
        }
        # Use the same canonical form as the other hashes so that every digest
        # is computed over one well-defined serialization.
        canonical = self._canonical_json(output)
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def evaluate(
        self,
        input_data: dict,
        matrix_schema_id: str,
        overrides: list[dict] | None = None,
    ) -> MatrixEvaluationResult:
        """
        Evaluate the full risk matrix against input data.

        Args:
            input_data: Dict keyed by dimension_id, containing factor input values.
                Typically assembled by the ontology mapper or investigation
                results collector.
            matrix_schema_id: Identifier of the matrix version being applied;
                it is folded into evaluation_fingerprint.
            overrides: Optional list of analyst overrides:
                [{"dimension": "customer", "factor_id": "pep_exposure",
                  "override_score": 30, ...}]

        Returns:
            MatrixEvaluationResult with all scores, hashes, and traceability.
        """
        normalized_overrides = self._normalize_overrides(overrides or [])
        input_hash = self.compute_input_hash(input_data)
        override_hash = self.compute_override_hash(normalized_overrides)
        evaluation_fingerprint = self.compute_evaluation_fingerprint(
            matrix_schema_id, input_hash, override_hash
        )
        dimension_results: dict[str, DimensionResult] = {}
        override_map = self._build_override_map(normalized_overrides)

        for dim_id, dim_def in self.matrix["dimensions"].items():
            dim_result = self._evaluate_dimension(
                dim_id, dim_def, input_data.get(dim_id, {}), override_map
            )
            dimension_results[dim_id] = dim_result

        overall_score = self._aggregate(dimension_results)
        overall_level = self._score_to_level(overall_score)
        output_hash = self.compute_output_hash(dimension_results, overall_score)

        return MatrixEvaluationResult(
            dimension_results=dimension_results,
            overall_score=overall_score,
            overall_level=overall_level,
            input_hash=input_hash,
            override_hash=override_hash,
            evaluation_fingerprint=evaluation_fingerprint,
            output_hash=output_hash,
        )

    def _evaluate_dimension(
        self,
        dim_id: str,
        dim_def: dict,
        dim_input: dict,
        override_map: dict,
    ) -> DimensionResult:
        """
        Evaluate all factors within a dimension, normalize to 0–100.

        Dimension aggregation is additive across factors after per-factor capping.
        Within a factor, multiple indicators collapse to the highest matching score.
        This intentionally rewards breadth of moderate risk signals across distinct
        factors rather than only the single strongest signal.
        """
        factor_results = []
        raw_total = 0
        max_possible = 0

        for factor_def in dim_def["factors"]:
            factor_id = factor_def["id"]
            max_score = factor_def["max_score"]
            max_possible += max_score

            # Compute raw factor score from input data
            raw_score, indicators = self._evaluate_factor(
                factor_def, dim_input.get(factor_id, {})
            )
            capped_score = min(raw_score, max_score)

            # Apply override if present, clamped to the factor's valid range
            # so a negative or oversized override cannot push the dimension
            # score outside 0–100.
            override_key = f"{dim_id}.{factor_id}"
            override = override_map.get(override_key)
            if override:
                capped_score = max(0, min(override["override_score"], max_score))

            raw_total += capped_score
            factor_results.append(FactorResult(
                factor_id=factor_id,
                raw_score=raw_score,
                capped_score=capped_score,
                max_score=max_score,
                contributing_indicators=indicators,
                override=override,
            ))

        # Normalize to 0–100
        normalized = round((raw_total / max_possible) * 100) if max_possible > 0 else 0
        level = self._score_to_level(normalized)

        return DimensionResult(
            dimension_id=dim_id,
            score=normalized,
            level=level,
            factors=factor_results,
            raw_total=raw_total,
            max_possible=max_possible,
        )

    def _evaluate_factor(
        self, factor_def: dict, factor_input: dict
    ) -> tuple[int, list[dict]]:
        """
        Evaluate a single factor against its input data.

        Uses the threshold definitions from the matrix schema.
        Returns (score, contributing_indicators).
        """
        if not factor_input:
            return (factor_def.get("default_score", 0), [])

        score = 0
        indicators = []

        # Check ontology-mapped fields
        if "ontology_mapping" in factor_def:
            for field_def in factor_def["ontology_mapping"].get("fields", []):
                field_path = field_def["path"]
                field_value = factor_input.get(field_path)
                if field_value is None:
                    continue

                indicator_type = field_def["indicator"]
                for threshold in field_def["thresholds"]:
                    matched, threshold_score = self._check_threshold(
                        indicator_type, field_value, threshold
                    )
                    if matched:
                        score = max(score, threshold_score)
                        indicators.append({
                            "field": field_path,
                            "value": field_value,
                            "indicator": indicator_type,
                            "threshold_score": threshold_score,
                        })
                        # First match wins: thresholds must be listed from
                        # highest score to lowest in the matrix definition.
                        break

        # Check risk indicator flags (from existing risk_indicators)
        if "risk_indicator_mapping" in factor_def:
            for ri_key in factor_def["risk_indicator_mapping"]:
                if factor_input.get(f"ri_{ri_key}"):
                    ri_score = factor_input.get(f"ri_{ri_key}_score", 10)
                    score = max(score, ri_score)
                    indicators.append({
                        "risk_indicator": ri_key,
                        "score": ri_score,
                    })

        # Check module output fields
        if "module_mapping" in factor_def:
            for field_name in factor_def["module_mapping"].get("fields", []):
                if field_name in factor_input and factor_input[field_name]:
                    module_score = factor_input.get(f"{field_name}_score", 0)
                    if module_score > 0:
                        score = max(score, module_score)
                        indicators.append({
                            "module_field": field_name,
                            "value": factor_input[field_name],
                            "score": module_score,
                        })

        return (score, indicators)

    def _check_threshold(
        self, indicator_type: str, value: Any, threshold: dict
    ) -> tuple[bool, int]:
        """Check whether a value matches a threshold definition (no side effects beyond logging)."""
        t_value = threshold.get("value")
        t_score = threshold.get("score", 0)
        t_list = threshold.get("list")

        match indicator_type:
            case "equals":
                if isinstance(value, list):
                    return (t_value in value, t_score)
                return (value == t_value, t_score)
            case "greater_than":
                return (value > t_value, t_score) if isinstance(value, (int, float)) else (False, 0)
            case "less_than":
                return (value < t_value, t_score) if isinstance(value, (int, float)) else (False, 0)
            case "in":
                if isinstance(value, list) and isinstance(t_value, list):
                    return (bool(set(value) & set(t_value)), t_score)
                return (value in t_value, t_score) if isinstance(t_value, list) else (False, 0)
            case "intersects":
                if isinstance(value, list) and isinstance(t_value, list):
                    return (bool(set(value) & set(t_value)), t_score)
                return (False, 0)
            case "country_risk_list":
                if t_list not in self.reference_data:
                    raise ValueError(f"Unknown country risk list: {t_list}")
                reference_values = self.reference_data[t_list]
                if isinstance(value, list):
                    return (bool(set(value) & set(reference_values)), t_score)
                return (value in reference_values, t_score)
            case "recency_days":
                if isinstance(value, (int, float)):
                    return (value <= t_value, t_score)
                return (False, 0)
            case _:
                logger.warning("Unknown indicator type: %s", indicator_type)
                return (False, 0)

    def _aggregate(self, dimension_results: dict[str, DimensionResult]) -> int:
        """
        Aggregate dimension scores into overall score.
        Method determined by aggregation config.
        """
        method = self.aggregation.get("method", "weighted_max")
        weights = self.aggregation.get("dimension_weights", {})
        scores = {k: v.score for k, v in dimension_results.items()}

        if method == "weighted_average":
            total_weight = sum(weights.get(k, 0) for k in scores)
            if total_weight == 0:
                return 0
            return round(
                sum(scores[k] * weights.get(k, 0) for k in scores) / total_weight
            )

        elif method == "weighted_max":
            # Blended: 60% highest dimension, 40% weighted average
            max_score = max(scores.values()) if scores else 0
            total_weight = sum(weights.get(k, 0) for k in scores)
            weighted_avg = (
                round(sum(scores[k] * weights.get(k, 0) for k in scores) / total_weight)
                if total_weight > 0 else 0
            )
            return round(max_score * 0.6 + weighted_avg * 0.4)

        elif method == "highest_dimension":
            return max(scores.values()) if scores else 0

        else:
            raise ValueError(f"Unknown aggregation method: {method}")

    def _score_to_level(self, score: int) -> str:
        """Map a numeric score to a risk level label after validated band checks."""
        levels = self.aggregation.get("risk_levels", {})
        ordered_levels = sorted(levels.items(), key=lambda item: item[1]["min"])
        for level_name, bounds in ordered_levels:
            if bounds["min"] <= score <= bounds["max"]:
                return level_name
        return "medium"  # Fallback

def _validate_risk_levels(self) -> None:
levels = self.aggregation.get("risk_levels", {})
ordered_levels = sorted(levels.items(), key=lambda item: item[1]["min"])
expected_min = 0
for level_name, bounds in ordered_levels:
min_score = bounds["min"]
max_score = bounds["max"]
if min_score != expected_min:
raise ValueError(
f"Risk levels must be contiguous from 0..100; gap or overlap before {level_name}"
)
if max_score < min_score:
raise ValueError(f"Invalid risk level range for {level_name}")
expected_min = max_score + 1
if expected_min != 101:
raise ValueError("Risk levels must fully cover 0..100")

def _validate_reference_lists(self) -> None:
for dim_def in self.matrix.get("dimensions", {}).values():
for factor_def in dim_def.get("factors", []):
for field_def in factor_def.get("ontology_mapping", {}).get("fields", []):
if field_def.get("indicator") != "country_risk_list":
continue
for threshold in field_def.get("thresholds", []):
list_name = threshold.get("list")
if list_name and list_name not in self.reference_data:
raise ValueError(f"Missing reference list in matrix snapshot: {list_name}")

@staticmethod
def _normalize_overrides(overrides: list[dict]) -> list[dict]:
"""Return overrides in canonical order for deterministic hashing."""
return sorted(
overrides,
key=lambda item: (
item.get("dimension", ""),
item.get("factor_id", ""),
str(item.get("override_score", "")),
),
)

@staticmethod
def _build_override_map(overrides: list[dict]) -> dict:
"""Index overrides by 'dimension.factor_id' for O(1) lookup."""
return {
f"{o['dimension']}.{o['factor_id']}": o
for o in overrides
}
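To make the three aggregation methods concrete, here is a standalone sketch that mirrors the arithmetic of _aggregate on plain dicts. The function name aggregate_scores, the sample dimension names, and the integer weights are assumptions made for illustration; they are not part of the scorer's API.

```python
def aggregate_scores(scores: dict[str, int], weights: dict[str, float], method: str) -> int:
    """Standalone mirror of the aggregation arithmetic (illustrative only)."""
    total_weight = sum(weights.get(k, 0) for k in scores)
    weighted_avg = (
        round(sum(scores[k] * weights.get(k, 0) for k in scores) / total_weight)
        if total_weight > 0 else 0
    )
    highest = max(scores.values(), default=0)

    if method == "weighted_average":
        return weighted_avg
    if method == "weighted_max":
        # Blend: 60% highest dimension, 40% weighted average
        return round(highest * 0.6 + weighted_avg * 0.4)
    if method == "highest_dimension":
        return highest
    raise ValueError(f"Unknown aggregation method: {method}")


# Hypothetical dimension scores and weights
scores = {"customer": 80, "geographic": 40, "product": 20}
weights = {"customer": 2, "geographic": 2, "product": 1}

average = aggregate_scores(scores, weights, "weighted_average")   # (160 + 80 + 20) / 5 = 52
blended = aggregate_scores(scores, weights, "weighted_max")       # 80 * 0.6 + 52 * 0.4 = 68.8 -> 69
highest = aggregate_scores(scores, weights, "highest_dimension")  # 80
```

With these inputs, weighted_max lands between the plain average and the worst dimension, so a single high-risk dimension cannot be averaged away.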

Ontology Mapping: From Investigation to Matrix Input

The bridge between investigation outputs and the risk matrix is the OntologyMatrixMapper. It reads ontology entities and investigation module results, then assembles the structured input dict that the scorer expects.

# src/risk_matrix/ontology_mapper.py

from datetime import date, datetime, timezone
from typing import Any


class OntologyMatrixMapper:
    """
    Maps ontology entities and investigation results to risk matrix input format.

    This is the integration point between:
    - Entity ontology (ontology_schema_v3.yaml entities)
    - Investigation module outputs (OSINT reports)
    - Risk indicators (existing risk_indicators from investigations)
    - The risk matrix scorer's expected input structure
    """

    def __init__(self, matrix_definition: dict, schema_cache):
        self.matrix = matrix_definition
        self.schema_cache = schema_cache

    async def build_input(
        self,
        company_id: str,
        investigation_id: str,
        entity_repo,
        report_repo,
        risk_repo,
    ) -> dict:
        """
        Assemble complete matrix input from all available sources.

        Returns dict keyed by dimension_id, each containing factor_id dicts
        with the field values the scorer needs.
        """
        # 1. Load ontology entities for this company
        entities = await entity_repo.get_entities_by_company(company_id)
        entities_by_type = self._group_entities_by_type(entities)

        # 2. Load investigation module reports
        reports = await report_repo.get_reports_by_investigation(investigation_id)
        reports_by_module = {r["report_type"].upper(): r for r in reports}

        # 3. Load existing risk indicators
        risk_indicators = await risk_repo.get_indicators_by_investigation(
            investigation_id
        )
        ri_set = {ri["indicator_key"]: ri for ri in risk_indicators}

        # 4. Build input for each dimension
        matrix_input = {}
        for dim_id, dim_def in self.matrix["dimensions"].items():
            dim_input = {}
            for factor_def in dim_def["factors"]:
                factor_input = {}
                factor_id = factor_def["id"]

                # Extract from ontology entities
                if "ontology_mapping" in factor_def:
                    entity_type = factor_def["ontology_mapping"]["entity_type"]
                    matching_entities = entities_by_type.get(entity_type, [])
                    for field_def in factor_def["ontology_mapping"].get("fields", []):
                        value = self._extract_field_from_entities(
                            matching_entities, field_def["path"]
                        )
                        if field_def.get("indicator") == "recency_days" and value is not None:
                            value = self._days_since(value)
                        if value is not None:
                            factor_input[field_def["path"]] = value

                # Extract from module reports
                if "module_mapping" in factor_def:
                    module = factor_def["module_mapping"]["module"]
                    # Keys were upper-cased above, so normalize the lookup as well
                    report = reports_by_module.get(module.upper(), {})
                    findings = report.get("findings", {})
                    for field_name in factor_def["module_mapping"].get("fields", []):
                        if field_name in findings:
                            factor_input[field_name] = findings[field_name]
                        # Look for pre-computed score from the module
                        score_key = f"{field_name}_score"
                        if score_key in findings:
                            factor_input[score_key] = findings[score_key]

                # Map existing risk indicators
                if "risk_indicator_mapping" in factor_def:
                    for ri_key in factor_def["risk_indicator_mapping"]:
                        if ri_key in ri_set:
                            factor_input[f"ri_{ri_key}"] = True
                            factor_input[f"ri_{ri_key}_score"] = (
                                ri_set[ri_key].get("severity_score", 10)
                            )

                dim_input[factor_id] = factor_input
            matrix_input[dim_id] = dim_input

        return matrix_input

    def _group_entities_by_type(self, entities: list[dict]) -> dict[str, list]:
        grouped = {}
        for e in entities:
            et = e.get("entity_type", "Unknown")
            grouped.setdefault(et, []).append(e)
        return grouped

    def _extract_field_from_entities(
        self, entities: list[dict], field_path: str
    ) -> Any:
        """
        Extract a field value from a list of entities.

        For numeric fields: returns the highest-risk value across entities.
        For boolean fields: returns True if any entity has True.
        For list fields: returns the sorted union across all entities.
        For repeated string fields: returns a sorted list of distinct values so
        downstream threshold checks can evaluate the full risk surface
        (a single distinct value is returned as a plain string).
        """
        values = []
        for entity in entities:
            data = entity.get("entity_data", {})
            value = self._resolve_dotted_path(data, field_path)
            if value is not None:
                values.append(value)

        if not values:
            return None

        # Merge strategy based on the type of the first value found
        first = values[0]
        if isinstance(first, bool):
            return any(values)  # Any True → True
        elif isinstance(first, (int, float)):
            return max(values)  # Highest risk value
        elif isinstance(first, list):
            merged = set()
            for v in values:
                merged.update(v)
            return sorted(merged)  # Deterministic union of lists
        elif isinstance(first, str):
            distinct = sorted({v for v in values if isinstance(v, str)})
            return distinct[0] if len(distinct) == 1 else distinct
        return values[0]

    @staticmethod
    def _resolve_dotted_path(data: dict, path: str) -> Any:
        """Resolve 'ownership_structure.layers' from nested dict."""
        parts = path.split(".")
        current = data
        for part in parts:
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                return None
        return current

    @staticmethod
    def _days_since(value: Any) -> Any:
        """
        Convert date/datetime inputs into integer days before scoring.
        If the value is already numeric, pass it through unchanged.
        """
        if isinstance(value, str):
            try:
                value = datetime.fromisoformat(value)
            except ValueError:
                return value  # Not an ISO date; the scorer treats it as no match
        if isinstance(value, datetime):
            value = value.date()
        if isinstance(value, date):
            # Assumption: days are counted up to the current UTC date, so this
            # input (and any hash computed over it) changes from day to day.
            return (datetime.now(timezone.utc).date() - value).days
        return value
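The merge rules in _extract_field_from_entities are easiest to follow on sample data. The sketch below re-implements them as free functions so it can run on its own; resolve_dotted_path, merge_field, and the sample field names (is_pep, countries, legal_form) are hypothetical and chosen only for illustration.

```python
from typing import Any


def resolve_dotted_path(data: dict, path: str) -> Any:
    """Walk a nested dict along a dotted path; None if any segment is missing."""
    current: Any = data
    for part in path.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            return None
    return current


def merge_field(entities: list[dict], field_path: str) -> Any:
    """Merge one field across entities using the type of the first value found."""
    values = []
    for entity in entities:
        value = resolve_dotted_path(entity.get("entity_data", {}), field_path)
        if value is not None:
            values.append(value)
    if not values:
        return None
    first = values[0]
    if isinstance(first, bool):  # checked before int: bool is a subclass of int
        return any(values)
    if isinstance(first, (int, float)):
        return max(values)
    if isinstance(first, list):
        return sorted(set().union(*values))
    if isinstance(first, str):
        distinct = sorted({v for v in values if isinstance(v, str)})
        return distinct[0] if len(distinct) == 1 else distinct
    return first


# Two hypothetical entities of the same type
entities = [
    {"entity_data": {"ownership_structure": {"layers": 2}, "is_pep": False,
                     "countries": ["DE", "CY"], "legal_form": "GmbH"}},
    {"entity_data": {"ownership_structure": {"layers": 5}, "is_pep": True,
                     "countries": ["CY", "PA"], "legal_form": "Ltd"}},
]

layers = merge_field(entities, "ownership_structure.layers")  # 5
is_pep = merge_field(entities, "is_pep")                      # True
countries = merge_field(entities, "countries")                # ["CY", "DE", "PA"]
legal_forms = merge_field(entities, "legal_form")             # ["GmbH", "Ltd"]
missing = merge_field(entities, "ownership_structure.depth")  # None
```

Sorting the merged lists keeps the resulting input independent of the order in which entities were loaded, which matters once the input is hashed.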

Versioning & Company Binding

Matrix Lifecycle

draft ──→ published ──→ archived
              │  (immutable once published)
              └──→ new version (draft) ──→ published ──→ ...

Rules:

  • A draft matrix can be edited freely.
  • Publishing freezes the matrix. The trigger function prevent_published_matrix_update enforces this at the database level.
  • Publish-time validation must call RiskMatrixScorer.validate_configuration() before the status transition is committed.
  • Publishing also freezes the resolved external reference datasets inside reference_data_snapshot.
  • When a published matrix needs changes, a new version is created (same schema_id, incremented version).
  • Only one version per schema_id may be published at a time. Publishing a new version must archive the previously published version in the same transaction.
  • Archiving a matrix does not affect existing evaluations — they reference the specific risk_matrix_schemas.id.
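
The publish rule (freeze the draft, archive the previously published sibling) can be sketched in memory as follows. This is a minimal illustration, assuming versions are plain dicts; in the real system the same steps would run inside one database transaction, and the names ALLOWED_TRANSITIONS and publish are hypothetical.

```python
# Allowed status transitions, taken from the lifecycle diagram
ALLOWED_TRANSITIONS = {
    "draft": {"published"},
    "published": {"archived"},
    "archived": set(),
}


def publish(versions: list[dict], target_id: str) -> None:
    """Publish a draft and archive the currently published version of the same schema_id."""
    target = next(v for v in versions if v["id"] == target_id)
    if "published" not in ALLOWED_TRANSITIONS[target["status"]]:
        raise ValueError(f"Cannot publish a matrix in status {target['status']!r}")
    for v in versions:
        if v["schema_id"] == target["schema_id"] and v["status"] == "published":
            v["status"] = "archived"
    target["status"] = "published"


# Hypothetical version rows for one schema line
versions = [
    {"id": "a1", "schema_id": "eba_standard", "version": 1, "status": "published"},
    {"id": "a2", "schema_id": "eba_standard", "version": 2, "status": "draft"},
]
publish(versions, "a2")
statuses = {v["version"]: v["status"] for v in versions}
# statuses == {1: "archived", 2: "published"}
```

Because archived versions have no outgoing transition, an archived matrix can never be republished; a new draft version has to be created instead.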

Company Binding Flow

1. Company is onboarded → investigation runs
2. Investigation completes → rule_evaluation phase triggers
3. RiskMatrixScorer evaluates against the ACTIVE published matrix
4. Evaluation record is stored with:
   - matrix_schema_id = the specific version UUID
   - matrix_version_snapshot = full JSON copy of the matrix definition, aggregation, and frozen reference data
   - input_hash + override_hash + output_hash for determinism proof
5. company_matrix_assignments record is created:
   - Links company to the matrix version
   - Records effective_from timestamp
   - Previous assignment gets effective_until set (temporal closure)

Re-evaluation on Matrix Upgrade

When a new matrix version is published and a compliance officer decides to re-evaluate existing companies:

# src/risk_matrix/version_manager.py

import logging
from uuid import UUID

logger = logging.getLogger(__name__)


class MatrixVersionManager:
    """Manages matrix versioning and company re-evaluation."""

    async def upgrade_company(
        self,
        company_id: str,
        new_schema_id: UUID,
        investigation_id: str,
        reason: str = "matrix_upgrade",
    ) -> RiskMatrixEvaluation:
        """
        Re-evaluate a company against a new matrix version.

        - Marks previous evaluation as 'superseded'
        - Creates new evaluation with new matrix version
        - Updates company_matrix_assignments temporal record
        """
        # 1. Load new matrix schema
        new_schema = await self.schema_repo.get_by_id(new_schema_id)
        if new_schema["status"] != "published":
            raise ValueError("Cannot evaluate against unpublished matrix")

        # 2. Build input from latest investigation
        mapper = OntologyMatrixMapper(new_schema["matrix_definition"], self.schema_cache)
        input_data = await mapper.build_input(
            company_id, investigation_id,
            self.entity_repo, self.report_repo, self.risk_repo,
        )

        # 3. Score
        scorer = RiskMatrixScorer(
            new_schema["matrix_definition"],
            new_schema["aggregation_config"],
            new_schema["reference_data_snapshot"],
        )
        result = scorer.evaluate(
            input_data,
            matrix_schema_id=str(new_schema_id),
        )

        # 4. Check idempotency using matrix version + input + overrides
        existing = await self.eval_repo.get_by_fingerprint(
            new_schema_id, result.evaluation_fingerprint
        )
        if existing:
            logger.info(
                f"Idempotent: evaluation already exists for fingerprint "
                f"{result.evaluation_fingerprint}"
            )
            return existing

        # 5. Store + supersede + rebind atomically
        async with self.db.transaction():
            prev_eval = await self.eval_repo.get_active_for_company(company_id)
            new_eval = await self.eval_repo.create(
                company_id=company_id,
                investigation_id=investigation_id,
                matrix_schema_id=new_schema_id,
                matrix_version_snapshot={
                    "matrix_definition": new_schema["matrix_definition"],
                    "aggregation_config": new_schema["aggregation_config"],
                    "reference_data_snapshot": new_schema["reference_data_snapshot"],
                },
                dimension_scores=self._serialize_dimension_results(result),
                overall_score=result.overall_score,
                overall_level=result.overall_level,
                input_hash=result.input_hash,
                override_hash=result.override_hash,
                evaluation_fingerprint=result.evaluation_fingerprint,
                output_hash=result.output_hash,
            )

            if prev_eval:
                await self.eval_repo.mark_superseded(prev_eval["id"], new_eval["id"])

            await self.assignment_repo.close_current(company_id)
            await self.assignment_repo.create(
                company_id=company_id,
                matrix_schema_id=new_schema_id,
                evaluation_id=new_eval["id"],
                assignment_reason=reason,
            )

        return new_eval

Workflow Integration

The risk matrix plugs into ADR-007's rule_evaluation phase type. When a workflow reaches a rule_evaluation phase with output_format: eba_risk_matrix, the engine:

  1. Loads the company's assigned matrix, or resolves the configured matrix_schema to the single currently published version.
  2. Calls OntologyMatrixMapper.build_input() to collect investigation results.
  3. Calls RiskMatrixScorer.evaluate() to compute the matrix.
  4. Stores the risk_matrix_evaluations record.
  5. Returns dimension scores to the workflow for routing decisions.

# In a workflow schema (e.g., kyb_onboarding_standard_v1.yaml)

phases:
  - id: risk_assessment
    type: rule_evaluation
    config:
      output_format: eba_risk_matrix
      matrix_schema: "eba_standard_v1"  # Resolves to the single published version
      # OR: matrix_schema_id: "uuid-here"  # Pin to exact version
      allow_overrides: true

    gate:
      conditions:
        - field: "risk_assessment.status"
          operator: "equals"
          value: "completed"

    routes:
      - condition: "risk_assessment.overall_level in ['critical', 'high']"
        next_phase: enhanced_review
      - condition: "risk_assessment.overall_level == 'medium'"
        next_phase: standard_review
      - condition: "risk_assessment.overall_level in ['low', 'clear']"
        next_phase: auto_approve
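
The routing table above amounts to a lookup from overall_level to the next phase. The sketch below hand-translates the three route conditions into Python; it does not model how ADR-007's engine parses condition strings, and the names ROUTES and next_phase are hypothetical.

```python
# Hand-translated from the three `routes` conditions in the workflow schema
ROUTES: list[tuple[set[str], str]] = [
    ({"critical", "high"}, "enhanced_review"),
    ({"medium"}, "standard_review"),
    ({"low", "clear"}, "auto_approve"),
]


def next_phase(overall_level: str) -> str:
    """Return the first route whose level set contains the evaluation's overall_level."""
    for levels, phase in ROUTES:
        if overall_level in levels:
            return phase
    # No silent default: an unrouted level should stop the workflow, not auto-approve
    raise ValueError(f"No route for risk level: {overall_level}")


high_route = next_phase("high")      # "enhanced_review"
medium_route = next_phase("medium")  # "standard_review"
clear_route = next_phase("clear")    # "auto_approve"
```

Raising on an unknown level is a design choice of this sketch: if a matrix defines a level name that no route covers, failing loudly is safer than falling through to approval.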

API Endpoints

All endpoints live under the /risk-matrix prefix and are registered behind the same feature-flag pattern as the workflow engine.

# Matrix Schema CRUD
GET   /risk-matrix/schemas                                   List all schemas (with version history)
POST  /risk-matrix/schemas                                   Create new schema (draft)
GET   /risk-matrix/schemas/{id}                              Get schema by ID
PUT   /risk-matrix/schemas/{id}                              Update draft schema
POST  /risk-matrix/schemas/{id}/publish                      Publish schema (freezes it)
POST  /risk-matrix/schemas/{id}/archive                      Archive schema
POST  /risk-matrix/schemas/{id}/new-version                  Create new version from existing
GET   /risk-matrix/schemas/{schema_id}/versions              List all versions of a schema
GET   /risk-matrix/schemas/{schema_id}/diff/{v1}/{v2}        Diff between two versions

# Evaluations
POST  /risk-matrix/evaluate                                  Trigger evaluation for a company
GET   /risk-matrix/evaluations/{id}                          Get evaluation details
GET   /risk-matrix/evaluations/company/{company_id}          Get evaluation history for company
POST  /risk-matrix/evaluations/{id}/override                 Create derived overridden evaluation
GET   /risk-matrix/evaluations/{id}/verify                   Re-compute and verify hashes match

# Company Assignments
GET   /risk-matrix/assignments/company/{company_id}          Get assignment history
POST  /risk-matrix/assignments/company/{company_id}/upgrade  Re-evaluate with new matrix
GET   /risk-matrix/assignments/schema/{schema_id}            List companies on a specific matrix version

# Batch Operations
POST  /risk-matrix/batch/re-evaluate                         Re-evaluate multiple companies
GET   /risk-matrix/batch/{batch_id}/status                   Check batch re-evaluation progress

Batch re-evaluation is asynchronous only and must run via Temporal, not inline in the API request. The API must enforce a maximum batch size per request, tenant-scoped authorization, and rate limiting. Recommended v1 defaults: max 100 companies per batch, one active batch per tenant, and explicit operator approval/audit logging.
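
A request-level guard for the recommended v1 defaults could look like the sketch below. The function name, the tenant_has_active_batch flag, and the choice to de-duplicate company IDs before counting are assumptions for illustration; authorization, rate limiting, and audit logging are out of scope here.

```python
MAX_BATCH_SIZE = 100  # recommended v1 default from the text above


def validate_batch_request(company_ids: list[str], tenant_has_active_batch: bool) -> list[str]:
    """Return the de-duplicated company list, or raise if the request breaks a batch limit."""
    if tenant_has_active_batch:
        raise ValueError("A batch re-evaluation is already running for this tenant")
    unique_ids = list(dict.fromkeys(company_ids))  # drop duplicates, keep request order
    if not unique_ids:
        raise ValueError("Batch must contain at least one company")
    if len(unique_ids) > MAX_BATCH_SIZE:
        raise ValueError(
            f"Batch contains {len(unique_ids)} companies; maximum is {MAX_BATCH_SIZE}"
        )
    return unique_ids


accepted = validate_batch_request(["c1", "c2", "c1"], tenant_has_active_batch=False)
# accepted == ["c1", "c2"]
```

Only after this check passes would the API hand the list to the Temporal workflow and return a batch_id for status polling.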


The risk matrix management UI lives at /risk-matrices under the Compliance Studio sidebar section, alongside Workflows (/workflows/schemas) and Risk Categories (/studio/risk-categories, see ADR-008a). This navigation restructure replaces the previous "Configure" section and groups the three compliance authoring tools together. See ADR-008a for the full navigation specification.

The Risk Matrices page should support: listing all matrix schemas with version history and status, creating new matrices (draft), editing draft matrices (dimension/factor/weight configuration), publishing (which triggers reference data resolution via ADR-008a and freezes the snapshot), archiving, version diffing, and viewing evaluation statistics per matrix.


File Structure

src/risk_matrix/ # NEW directory
├── __init__.py
├── scorer.py # RiskMatrixScorer (deterministic scoring engine)
├── ontology_mapper.py # OntologyMatrixMapper (entity → matrix input)
├── version_manager.py # MatrixVersionManager (versioning + company binding)
├── schema_loader.py # RiskMatrixSchemaCache (YAML loader + validation)
├── country_risk_lists.py # Reference data: FATF, EU high-risk, CPI lists
├── repository.py # MatrixSchemaRepo, EvaluationRepo, AssignmentRepo
├── router.py # FastAPI endpoints
└── activities.py # Temporal activities for workflow integration

config/risk_matrix_schemas/ # NEW directory
├── eba_standard_v1.yaml # Default EBA-aligned matrix
├── eba_crypto_v1.yaml # Extended matrix for CASPs (EBA/GL/2024/01)
└── custom_template.yaml # Blank template for customer customization

migrations/
└── V56__risk_matrix_schemas.sql # Matrix schemas + evaluations + assignments

Agent Population Flow

When an investigation completes inside a workflow, the agents have already produced all the data. The matrix population is a post-investigation activity, not an agent task itself.

┌────────────────────────────────────────────────────────────────┐
│ InvestigationWorkflow (existing, unchanged)                    │
│                                                                │
│  ┌─────┐ ┌─────┐ ┌──────┐ ┌────────┐ ┌───────┐                 │
│  │ CIR │ │ ROA │ │ MEBO │ │ SPEPWS │ │ AMLRR │ ...             │
│  └──┬──┘ └──┬──┘ └──┬───┘ └───┬────┘ └───┬───┘                 │
│     │       │       │         │          │                     │
│     └───────┴───────┴────┬────┴──────────┘                     │
│                          ▼                                     │
│              Entity Ontology + Reports                         │
│                (PostgreSQL + Neo4j)                            │
└───────────────────────────────┬────────────────────────────────┘
                                │
                                ▼
┌────────────────────────────────────────────────────────────────┐
│ rule_evaluation phase (DynamicComplianceWorkflow)              │
│                                                                │
│ 1. OntologyMatrixMapper.build_input()                          │
│    ├── Reads entities from entity_repo                         │
│    ├── Reads reports from report_repo                          │
│    └── Reads risk_indicators from risk_repo                    │
│                                                                │
│ 2. RiskMatrixScorer.evaluate(input_data, matrix_schema_id)     │
│    ├── Scores each factor within each dimension                │
│    ├── Normalizes dimension scores to 0–100                    │
│    ├── Aggregates to overall score                             │
│    └── Computes input_hash + override_hash + output_hash       │
│                                                                │
│ 3. Store risk_matrix_evaluations record                        │
│ 4. Create/update company_matrix_assignments                    │
│ 5. Return scores to workflow for routing                       │
└────────────────────────────────────────────────────────────────┘

Idempotency Guarantee

The system guarantees idempotency at three levels:

  1. Evaluation level. The evaluation_fingerprint is a SHA-256 over matrix_schema_id + input_hash + override_hash. A unique database index enforces one evaluation row per exact scoring request, and repository inserts must use a transaction / upsert pattern to collapse concurrent retries.

  2. Input/output proof level. The input_hash captures canonicalized source inputs, the override_hash captures canonicalized analyst overrides, and the output_hash captures the resulting scores. On re-evaluation, a mismatch indicates either a bug in the scorer or data corruption.

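  3. Temporal level. The workflow engine's rule_evaluation step is implemented as a Temporal activity that derives idempotency_key = f"{investigation_id}:{matrix_schema_id}:{override_hash}". Temporal may execute an activity more than once when it retries and does not deduplicate activities by an application-supplied key, so the activity itself must use this key, together with the unique index on evaluation_fingerprint, to return the existing evaluation instead of creating a duplicate.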
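
The hashing scheme behind the first two levels can be sketched as follows. The text only states that the fingerprint is a SHA-256 over matrix_schema_id + input_hash + override_hash; the canonical JSON encoding, the ":" separator, and the function names canonical_hash and evaluation_fingerprint are assumptions made for this example.

```python
import hashlib
import json
from typing import Any


def canonical_hash(payload: Any) -> str:
    """SHA-256 over a canonical JSON encoding (sorted keys, no insignificant whitespace)."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def evaluation_fingerprint(matrix_schema_id: str, input_hash: str, override_hash: str) -> str:
    """Combine the three identifiers into one fingerprint (separator is an assumption)."""
    combined = f"{matrix_schema_id}:{input_hash}:{override_hash}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()


# Hypothetical matrix input; key order must not affect the hash
input_a = {"geographic": {"country": {"code": "CY"}}, "customer": {"pep": {"is_pep": True}}}
input_b = {"customer": {"pep": {"is_pep": True}}, "geographic": {"country": {"code": "CY"}}}

no_overrides = canonical_hash([])
with_override = canonical_hash(
    [{"dimension": "customer", "factor_id": "pep", "override_score": 0}]
)

fp_a = evaluation_fingerprint("schema-uuid", canonical_hash(input_a), no_overrides)
fp_b = evaluation_fingerprint("schema-uuid", canonical_hash(input_b), no_overrides)
fp_overridden = evaluation_fingerprint("schema-uuid", canonical_hash(input_a), with_override)
# fp_a == fp_b (same logical input), fp_a != fp_overridden (override changes the request)
```

Because an override changes the fingerprint, an overridden evaluation is stored as a new row rather than colliding with the original under the unique index.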


Migration from Current Risk Scoring

The existing risk_scoring.py and its flat 0–100 score remain untouched. The risk matrix is additive:

  • Existing investigations keep their flat risk_score and risk_level.
  • New investigations (via workflow engine) produce both the flat score (for backward compatibility) and the full matrix evaluation.
  • For these new investigations, the flat score is derived from the matrix's overall_score, so the two stay in sync.
  • APIs that return risk_level continue to work; the matrix detail is available via the new /risk-matrix/ endpoints.

Rollback Strategy

Same pattern as ADR-007 — fully reversible, zero impact on existing functionality:

  1. Remove router registration from main.py.
  2. Drop new tables: DROP TABLE company_matrix_assignments, risk_matrix_evaluations, risk_matrix_schemas CASCADE;
  3. Remove src/risk_matrix/ directory.
  4. Remove config/risk_matrix_schemas/ directory.

Existing investigations, entities, risk indicators, and the flat risk score are completely unaffected.


Effort Estimate

| Component | Estimate | Dependencies |
|---|---|---|
| Database migrations (V56) | 0.5 weeks | None |
| RiskMatrixSchemaCache (YAML loader + validation) | 1 week | Migrations |
| RiskMatrixScorer (deterministic engine) | 1.5 weeks | Schema loader |
| OntologyMatrixMapper (entity → input) | 1.5 weeks | Scorer |
| MatrixVersionManager (versioning + binding) | 1 week | Mapper, Scorer |
| Country risk list reference data | 0.5 weeks | None |
| Repository layer | 1 week | Migrations |
| API endpoints (router) | 1 week | Repository |
| Temporal activity integration | 0.5 weeks | Scorer, ADR-007 Phase 2 |
| Default EBA matrix schema (YAML) | 1 week | Schema loader |
| Testing (unit + integration) | 2.5 weeks | All components |
| Total | 12 weeks | |

The scorer and mapper (3 weeks combined) can begin immediately. The workflow integration (0.5 weeks) depends on ADR-007 Phase 2 delivering the rule_evaluation phase type. The expanded testing allowance covers determinism edge cases, hash stability, publish-time validation, and Temporal retry/idempotency scenarios.


Success Criteria

| Criterion | Target | Measurement |
|---|---|---|
| Same input + same matrix version = identical output | 100% | Automated test suite with hash verification |
| Published matrix cannot be modified | 100% | DB trigger + API-level validation |
| Only one published version exists per schema line | 100% | Partial unique index on status = 'published' |
| Every evaluation links to exact matrix version | 100% | Foreign key + snapshot column |
| Reference datasets are frozen per published version | 100% | reference_data_snapshot persisted and replay-tested |
| Company assignment history is temporally queryable | Yes | effective_from/effective_until range queries |
| Matrix schema validates against JSON Schema | 100% | Schema validation on load |
| Scoring completes in < 500 ms per company | 95th percentile | Activity execution metrics |
| Ontology mapper extracts all mapped fields correctly | > 95% | Integration tests against sample investigations |
| Analyst override is recorded with full audit trail | 100% | Derived evaluation row + override JSONB + supersession chain |