ADR-008: EBA Risk Matrix Engine
Status: Proposed
Date: 2026-03-28
Author: Atlas Architecture
Supersedes: Current flat risk scoring in risk_scoring.py
Context
Atlas currently computes a single 0–100 risk score per investigation using a flat, additive algorithm in risk_scoring.py. This approach has three limitations:
- No EBA alignment. The EBA ML/TF Risk Factors Guidelines (EBA/GL/2021/02, consolidated with EBA/GL/2023/03 and EBA/GL/2024/01) require obliged entities to assess risk across distinct dimensions: customer, geographic, product/service, delivery channel, and transaction. Atlas collapses all of these into one number, making it difficult for compliance officers to demonstrate a structured risk-based approach to regulators.
- No versioning. When a compliance team adjusts risk weights or adds new risk factors, the old scoring logic is overwritten. There is no way to know which version of the matrix was active when a company was evaluated, which breaks audit trails.
- No configurability. Risk weights are hardcoded in Python. Different customers (or even different business lines within the same customer) may need different matrices: a payment institution's geographic risk factors differ from a corporate finance firm's.
This ADR introduces a versioned, schema-driven EBA risk matrix that integrates with the existing ontology, the rule engine (ADR feature gap spec), and the workflow engine (ADR-007).
Decision
Build a versioned EBA risk matrix system with the following properties:
- Configurable dimensions and factors defined in a YAML/JSON schema, following the entity ontology pattern.
- Deterministic, idempotent scoring — same inputs always produce the same matrix output, regardless of when or how many times the computation runs.
- Immutable matrix versions — once published, a matrix version is frozen. Edits create a new version.
- Single published version per schema line: at most one version of a given `schema_id` may be `published` at a time.
- Company-to-version binding: each evaluation pins the company to the exact matrix version used, creating a permanent audit record.
- Ontology mapping — risk factors map to entity ontology properties and investigation module outputs, so agents can populate the matrix automatically.
- Snapshotted reference data — external risk lists used in scoring are resolved and frozen into the published matrix version.
- Manual override capability — analysts can override individual factor scores with documented justification, producing a new derived evaluation rather than mutating the original record.
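The override principle can be sketched with a frozen dataclass: applying an override returns a new record linked to its source through `derived_from_evaluation_id`, while the original stays untouched. This is a minimal in-memory sketch, not the persistence code. The `Evaluation` class, the `apply_override` helper, and the choice to mark the derived record `overridden` are illustrative assumptions.

```python
import uuid
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Evaluation:
    """Hypothetical in-memory view of a risk_matrix_evaluations row."""
    id: str
    company_id: str
    matrix_schema_id: str
    overall_score: int
    overrides: tuple = ()  # frozen=True is shallow: treat contents as immutable by convention
    derived_from_evaluation_id: Optional[str] = None
    status: str = "completed"


def apply_override(original: Evaluation, override: dict, new_overall_score: int) -> Evaluation:
    """Return a new evaluation derived from `original`; `original` is never mutated.

    `new_overall_score` would come from re-running the scorer with the extended
    override list; it is passed in here to keep the sketch small.
    """
    return replace(
        original,
        id=str(uuid.uuid4()),                    # a new record, not an update
        overrides=original.overrides + (override,),
        overall_score=new_overall_score,
        derived_from_evaluation_id=original.id,  # audit link back to the source
        status="overridden",                     # assumed status for derived records
    )
```

Because `replace()` builds a new instance, the audit chain is a linked list of immutable records rather than a history of edits to one row.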
EBA Risk Dimension Model
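The EBA ML/TF Risk Factors Guidelines (Guideline 2, sections 2.3–2.21) group risk factors into customer, geographic, products/services/transactions, and delivery channel categories. Atlas models them as five dimensions by scoring transaction risk separately from product/service risk. Each dimension contains multiple risk factors, and each factor has risk-increasing and risk-decreasing indicators.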
Dimensions
| Dimension | EBA Reference | Description | Atlas Source Modules |
|---|---|---|---|
| Customer | GL 2.3–2.8 | Nature of the customer, ownership structure, behaviour, reputation | CIR, MEBO, SPEPWS, AMLRR |
| Geographic | GL 2.9–2.15 | Countries/jurisdictions linked to the customer, operations, transactions | CIR, ROA, MEBO |
| Product / Service | GL 2.16–2.17 | Products or services the customer uses or provides | CIR, FRLS |
| Delivery Channel | GL 2.18–2.19 | How the business relationship is conducted | CIR, DFWO |
| Transaction | GL 2.20–2.21 | Nature and pattern of transactions | FRLS |
Each dimension receives its own score (0–100). The overall risk level is derived from a configurable aggregation function applied to the dimension scores — not a simple average.
Database Schema
All new tables. No modifications to existing tables.
Core Tables
```sql
-- V56__risk_matrix_schemas.sql

-- ============================================================
-- Matrix template: the configurable risk matrix definition
-- ============================================================
CREATE TABLE risk_matrix_schemas (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    schema_id VARCHAR(100) NOT NULL,          -- e.g. 'eba_standard_v1'
    version INTEGER NOT NULL DEFAULT 1,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    status VARCHAR(20) NOT NULL DEFAULT 'draft',
        -- 'draft', 'published', 'archived'

    -- The full matrix definition (dimensions, factors, weights, indicators)
    matrix_definition JSONB NOT NULL,

    -- Aggregation configuration
    aggregation_config JSONB NOT NULL DEFAULT '{
        "method": "weighted_max",
        "dimension_weights": {
            "customer": 0.30,
            "geographic": 0.25,
            "product_service": 0.20,
            "delivery_channel": 0.10,
            "transaction": 0.15
        },
        "risk_levels": {
            "critical": {"min": 90, "max": 100},
            "high":     {"min": 70, "max": 89},
            "medium":   {"min": 40, "max": 69},
            "low":      {"min": 20, "max": 39},
            "clear":    {"min": 0,  "max": 19}
        }
    }',

    -- Frozen external reference data used by this matrix version
    -- Example: FATF lists, EU high-risk countries, CPI-derived country buckets
    reference_data_snapshot JSONB NOT NULL DEFAULT '{}',

    -- Metadata
    regulatory_basis VARCHAR(100),            -- 'EBA/GL/2021/02'
    applicable_entity_types TEXT[] DEFAULT '{LegalEntity}',

    -- Authorship and audit
    created_by VARCHAR(100),
    published_by VARCHAR(100),
    published_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),

    UNIQUE(schema_id, version)
);

CREATE INDEX idx_rms_status ON risk_matrix_schemas(status);
CREATE INDEX idx_rms_schema_id ON risk_matrix_schemas(schema_id);
CREATE UNIQUE INDEX uq_rms_single_published
    ON risk_matrix_schemas(schema_id)
    WHERE status = 'published';

-- Prevent modification of published (and, afterwards, archived) schemas.
-- The content checks apply to every permitted update, including the
-- published -> archived transition itself, so a row cannot be archived
-- and edited in the same UPDATE.
CREATE OR REPLACE FUNCTION prevent_published_matrix_update()
RETURNS TRIGGER AS $$
BEGIN
    IF OLD.status IN ('published', 'archived') THEN
        -- Only permitted status change: published -> archived. Archived is terminal.
        IF NEW.status IS DISTINCT FROM OLD.status
           AND NOT (OLD.status = 'published' AND NEW.status = 'archived') THEN
            RAISE EXCEPTION 'Published risk matrix schemas may only transition to archived; archived schemas are terminal.';
        END IF;

        IF NEW.matrix_definition IS DISTINCT FROM OLD.matrix_definition
           OR NEW.aggregation_config IS DISTINCT FROM OLD.aggregation_config
           OR NEW.reference_data_snapshot IS DISTINCT FROM OLD.reference_data_snapshot
           OR NEW.name IS DISTINCT FROM OLD.name THEN
            RAISE EXCEPTION 'Cannot modify a published risk matrix schema. Create a new version instead.';
        END IF;

        IF NEW.description IS DISTINCT FROM OLD.description
           OR NEW.regulatory_basis IS DISTINCT FROM OLD.regulatory_basis
           OR NEW.applicable_entity_types IS DISTINCT FROM OLD.applicable_entity_types
           OR NEW.schema_id IS DISTINCT FROM OLD.schema_id
           OR NEW.version IS DISTINCT FROM OLD.version
           OR NEW.published_by IS DISTINCT FROM OLD.published_by
           OR NEW.published_at IS DISTINCT FROM OLD.published_at THEN
            RAISE EXCEPTION 'Cannot modify metadata of a published risk matrix schema. Create a new version instead.';
        END IF;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_prevent_published_matrix_update
    BEFORE UPDATE ON risk_matrix_schemas
    FOR EACH ROW
    EXECUTE FUNCTION prevent_published_matrix_update();

-- ============================================================
-- Matrix evaluation: a completed risk assessment for a company
-- ============================================================
CREATE TABLE risk_matrix_evaluations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- What was evaluated
    company_id UUID NOT NULL,
    entity_id UUID,                           -- optional: specific ontology entity
    investigation_id UUID REFERENCES investigations(id) ON DELETE SET NULL,

    -- Which matrix version was used (immutable reference)
    matrix_schema_id UUID NOT NULL REFERENCES risk_matrix_schemas(id) ON DELETE RESTRICT,

    -- Snapshot of the matrix definition at evaluation time
    -- (defense-in-depth: even if the schema table is corrupted, the
    -- evaluation record contains the exact rules that were applied)
    matrix_version_snapshot JSONB NOT NULL,

    -- Dimension scores (each 0–100)
    dimension_scores JSONB NOT NULL,
    -- Example:
    -- {
    --   "customer":         {"score": 65, "level": "medium", "factors": [...]},
    --   "geographic":       {"score": 82, "level": "high",   "factors": [...]},
    --   "product_service":  {"score": 30, "level": "low",    "factors": [...]},
    --   "delivery_channel": {"score": 15, "level": "clear",  "factors": [...]},
    --   "transaction":      {"score": 45, "level": "medium", "factors": [...]}
    -- }

    -- Aggregated overall score
    overall_score INTEGER NOT NULL,           -- 0–100
    overall_level VARCHAR(20) NOT NULL,       -- 'critical','high','medium','low','clear'

    -- Determinism proof
    input_hash VARCHAR(64) NOT NULL,              -- SHA-256 of canonicalized input data only
    override_hash VARCHAR(64) NOT NULL,           -- SHA-256 of canonicalized overrides
    evaluation_fingerprint VARCHAR(64) NOT NULL,  -- matrix_schema_id + input_hash + override_hash
    output_hash VARCHAR(64) NOT NULL,             -- SHA-256 of dimension_scores + overall

    -- Populated by
    populated_by VARCHAR(20) NOT NULL DEFAULT 'agent',
        -- 'agent' (auto-populated from investigation), 'manual', 'hybrid'

    -- Analyst overrides (if any)
    overrides JSONB DEFAULT '[]',
    -- [{
    --   "dimension": "customer",
    --   "factor_id": "ownership_complexity",
    --   "original_score": 20,
    --   "override_score": 60,
    --   "justification": "Nominee structure identified in manual review",
    --   "overridden_by": "analyst@example.com",
    --   "overridden_at": "2026-03-28T10:00:00Z"
    -- }]

    -- Status
    status VARCHAR(20) NOT NULL DEFAULT 'completed',
        -- 'pending', 'completed', 'overridden', 'superseded'

    -- If this evaluation was produced from an existing one
    derived_from_evaluation_id UUID REFERENCES risk_matrix_evaluations(id) ON DELETE RESTRICT,

    created_at TIMESTAMPTZ DEFAULT NOW(),
    superseded_at TIMESTAMPTZ,
    superseded_by UUID REFERENCES risk_matrix_evaluations(id) ON DELETE RESTRICT
);

CREATE INDEX idx_rme_company ON risk_matrix_evaluations(company_id, created_at DESC);
CREATE INDEX idx_rme_schema ON risk_matrix_evaluations(matrix_schema_id);
CREATE INDEX idx_rme_investigation ON risk_matrix_evaluations(investigation_id);
CREATE INDEX idx_rme_status ON risk_matrix_evaluations(status);
CREATE INDEX idx_rme_input_hash ON risk_matrix_evaluations(input_hash);

-- Idempotency is scoped per company: two different companies with identical
-- canonical input (e.g. both with no findings) share a fingerprint and must
-- not collide on this index.
CREATE UNIQUE INDEX uq_rme_fingerprint
    ON risk_matrix_evaluations(company_id, matrix_schema_id, evaluation_fingerprint);

-- ============================================================
-- Company-to-matrix-version binding
-- ============================================================

-- Required for the UUID equality operator (company_id WITH =) inside the
-- GiST exclusion constraint below.
CREATE EXTENSION IF NOT EXISTS btree_gist;

CREATE TABLE company_matrix_assignments (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id UUID NOT NULL,
    matrix_schema_id UUID NOT NULL REFERENCES risk_matrix_schemas(id) ON DELETE RESTRICT,

    -- The evaluation that established this assignment
    evaluation_id UUID REFERENCES risk_matrix_evaluations(id) ON DELETE RESTRICT,

    -- Active period (supports temporal queries)
    effective_from TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    effective_until TIMESTAMPTZ,              -- NULL = currently active

    -- Why this assignment was made
    assignment_reason VARCHAR(50) NOT NULL DEFAULT 'initial_evaluation',
        -- 'initial_evaluation', 'periodic_review', 'matrix_upgrade',
        -- 'regulatory_change', 'manual_reassignment'

    assigned_by VARCHAR(100),
    created_at TIMESTAMPTZ DEFAULT NOW(),

    -- No overlapping assignment periods per company, so at most one is active at a time
    CONSTRAINT uq_active_assignment
        EXCLUDE USING gist (
            company_id WITH =,
            tstzrange(effective_from, COALESCE(effective_until, 'infinity'::timestamptz)) WITH &&
        )
);

CREATE INDEX idx_cma_company_active
    ON company_matrix_assignments(company_id)
    WHERE effective_until IS NULL;
CREATE INDEX idx_cma_schema ON company_matrix_assignments(matrix_schema_id);
```
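The assignment table relies on half-open time ranges: closing the active row at time `t` and opening the next one at `t` must not count as an overlap. A minimal in-memory sketch of that rotation and of the temporal lookup follows. The `Assignment` class and the `overlaps`, `reassign`, and `assignment_at` helpers are hypothetical, and the sketch ignores empty ranges, which PostgreSQL treats as overlapping nothing.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

# Stand-in for PostgreSQL 'infinity' when comparing open-ended ranges.
FAR_FUTURE = datetime.max.replace(tzinfo=timezone.utc)


@dataclass
class Assignment:
    """Hypothetical in-memory view of a company_matrix_assignments row."""
    company_id: str
    matrix_schema_id: str
    effective_from: datetime
    effective_until: Optional[datetime] = None  # None = currently active

    def end(self) -> datetime:
        return self.effective_until or FAR_FUTURE

    def covers(self, at: datetime) -> bool:
        # Half-open [effective_from, effective_until), like tstzrange's default '[)' bounds.
        return self.effective_from <= at < self.end()


def overlaps(a: Assignment, b: Assignment) -> bool:
    """Mirror of the EXCLUDE constraint for non-empty ranges."""
    return (
        a.company_id == b.company_id
        and a.effective_from < b.end()
        and b.effective_from < a.end()
    )


def reassign(history: list[Assignment], company_id: str, schema_id: str, at: datetime) -> Assignment:
    """Close the company's active assignment at `at`, then open a new one from `at`.

    In the database both steps belong in one transaction, so no reader ever
    observes the company without an active assignment.
    """
    active = [r for r in history if r.company_id == company_id and r.effective_until is None]
    if any(at < r.effective_from for r in active):
        raise ValueError("reassignment cannot predate the active assignment")
    for row in active:
        row.effective_until = at
    new_row = Assignment(company_id, schema_id, effective_from=at)
    if any(overlaps(new_row, row) for row in history):
        raise ValueError("overlapping assignment period")
    history.append(new_row)
    return new_row


def assignment_at(history: list[Assignment], company_id: str, at: datetime) -> Optional[Assignment]:
    """Temporal lookup: which matrix version applied to the company at `at`?"""
    return next((r for r in history if r.company_id == company_id and r.covers(at)), None)
```

For example, after `reassign(h, "c1", "eba_v1", jan_1)` and `reassign(h, "c1", "eba_v2", mar_1)`, a lookup on February 1 returns the `eba_v1` row and a lookup on March 1 returns `eba_v2`.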
Matrix Definition Schema
The matrix_definition JSONB column in risk_matrix_schemas follows a strict structure. This is the "ontology" of the risk matrix — it defines what gets scored and how.
```yaml
# config/risk_matrix_schemas/eba_standard_v1.yaml
schema_id: eba_standard_v1
version: 1
name: "EBA Standard ML/TF Risk Matrix"
regulatory_basis: "EBA/GL/2021/02 consolidated"

dimensions:
  customer:
    label: "Customer Risk"
    eba_reference: "GL 2.3–2.8"
    weight: 0.30
    factors:
      - id: ownership_complexity
        label: "Ownership Structure Complexity"
        description: "Complex or opaque ownership structures, nominee shareholders, bearer shares"
        max_score: 25
        weight: 1.0
        # Ontology mapping: how agents find the data
        ontology_mapping:
          entity_type: LegalEntity
          fields:
            - path: "ownership_structure.layers"
              indicator: "greater_than"
              thresholds: [{ value: 3, score: 25 }, { value: 2, score: 15 }, { value: 1, score: 5 }]
            - path: "ownership_structure.has_nominee"
              indicator: "equals"
              thresholds: [{ value: true, score: 20 }]
            - path: "ownership_structure.has_bearer_shares"
              indicator: "equals"
              thresholds: [{ value: true, score: 25 }]
        # Investigation module mapping: which module outputs feed this factor
        module_mapping:
          module: MEBO
          fields:
            - "ownership_chain_depth"
            - "nominee_shareholders_detected"
            - "circular_ownership_detected"
        # Risk indicators from existing system
        risk_indicator_mapping:
          - "complex_ownership"
          - "nominee_shareholders"
          - "hidden_ubos"

      - id: pep_exposure
        label: "PEP Exposure"
        description: "Politically exposed persons in ownership or management"
        max_score: 30
        weight: 1.0
        ontology_mapping:
          entity_type: Person
          fields:
            - path: "is_pep"
              indicator: "equals"
              thresholds: [{ value: true, score: 30 }]
            - path: "pep_level"
              indicator: "in"
              thresholds:
                - { value: ["head_of_state", "senior_government"], score: 30 }
                - { value: ["member_of_parliament", "senior_military"], score: 25 }
                - { value: ["regional_political", "senior_judicial"], score: 20 }
                - { value: ["family_member", "close_associate"], score: 15 }
        module_mapping:
          module: SPEPWS
          fields:
            - "pep_matches"
            - "pep_classification"
        risk_indicator_mapping:
          - "pep_exposure"

      - id: sanctions_exposure
        label: "Sanctions Exposure"
        description: "Direct or indirect sanctions list matches"
        max_score: 50
        weight: 1.0
        ontology_mapping:
          entity_type: SanctionsMatch
          fields:
            - path: "match_type"
              indicator: "in"
              thresholds:
                - { value: ["exact_match"], score: 50 }
                - { value: ["strong_match"], score: 40 }
                - { value: ["partial_match"], score: 20 }
        module_mapping:
          module: SPEPWS
          fields:
            - "sanctions_matches"
            - "sanctions_match_confidence"
        risk_indicator_mapping:
          - "sanctions_match"

      - id: adverse_media
        label: "Adverse Media & Reputation"
        description: "Negative news coverage, litigation, fraud allegations"
        max_score: 25
        weight: 1.0
        module_mapping:
          module: AMLRR
          fields:
            - "adverse_media_count"
            - "adverse_media_severity"
            - "litigation_count"
            - "fraud_allegations"
        risk_indicator_mapping:
          - "adverse_media"
          - "fraud_allegations"
          - "ongoing_litigation"

      - id: business_profile
        label: "Business Profile & Activity"
        description: "Nature of business, cash-intensity, high-risk sectors"
        max_score: 20
        weight: 1.0
        ontology_mapping:
          entity_type: LegalEntity
          fields:
            - path: "industry_codes"
              indicator: "intersects"
              thresholds:
                - { value: ["gambling", "crypto", "arms_trade", "precious_metals"], score: 20 }
                - { value: ["real_estate", "legal_services", "art_dealing"], score: 15 }
                - { value: ["construction", "import_export"], score: 10 }
            - path: "incorporation_date"
              indicator: "recency_days"
              # The mapper converts dates into "days since incorporation" before scoring.
              thresholds: [{ value: 365, score: 10 }, { value: 730, score: 5 }]
        module_mapping:
          module: CIR
          fields:
            - "industry_classification"
            - "incorporation_date"
            - "cash_intensive_indicators"
        risk_indicator_mapping:
          - "recent_incorporation"
          - "high_risk_industry"

  geographic:
    label: "Geographic Risk"
    eba_reference: "GL 2.9–2.15"
    weight: 0.25
    factors:
      - id: jurisdiction_risk
        label: "Jurisdiction of Registration"
        description: "Country risk based on FATF, EU high-risk third countries, CPI"
        max_score: 30
        weight: 1.0
        ontology_mapping:
          entity_type: LegalEntity
          fields:
            - path: "jurisdiction"
              indicator: "country_risk_list"
              # FATF's grey list is officially titled "Jurisdictions under Increased
              # Monitoring", so fatf_grey_list and fatf_increased_monitoring name the
              # same publication. FATF's separate, higher-risk list is "High-Risk
              # Jurisdictions subject to a Call for Action".
              thresholds:
                - { list: "eu_high_risk_third_countries", score: 30 }
                - { list: "fatf_grey_list", score: 25 }
                - { list: "fatf_increased_monitoring", score: 20 }
                - { list: "cpi_below_40", score: 15 }
        module_mapping:
          module: CIR
          fields:
            - "country_code"
            - "jurisdiction_risk_level"

      - id: operational_geography
        label: "Operational Geographies"
        description: "Countries where the entity operates, has subsidiaries, or transacts"
        max_score: 25
        weight: 1.0
        ontology_mapping:
          entity_type: Address
          fields:
            - path: "country"
              indicator: "country_risk_list"
              thresholds:
                - { list: "eu_high_risk_third_countries", score: 25 }
                - { list: "fatf_grey_list", score: 20 }
                - { list: "cpi_below_40", score: 10 }
        module_mapping:
          module: ROA
          fields:
            - "operational_countries"
            - "address_jurisdictions"

      - id: ubo_geography
        label: "UBO / Director Geography"
        description: "Countries of nationality or residence of UBOs and directors"
        max_score: 25
        weight: 1.0
        ontology_mapping:
          entity_type: Person
          fields:
            - path: "nationality"
              indicator: "country_risk_list"
              thresholds:
                - { list: "eu_high_risk_third_countries", score: 25 }
                - { list: "fatf_grey_list", score: 20 }
        module_mapping:
          module: MEBO
          fields:
            - "ubo_nationalities"
            - "director_nationalities"

      - id: address_risk
        label: "Address Risk Indicators"
        description: "Virtual offices, mass registration addresses, formation agents"
        max_score: 20
        weight: 1.0
        module_mapping:
          module: ROA
          fields:
            - "virtual_office_detected"
            - "mass_registration_address"
            - "address_mismatch"
        risk_indicator_mapping:
          - "virtual_office"
          - "address_mismatch"
          - "mass_registration_address"

  product_service:
    label: "Product / Service Risk"
    eba_reference: "GL 2.16–2.17"
    weight: 0.20
    factors:
      - id: product_complexity
        label: "Product / Service Complexity"
        description: "Complex financial products, anonymity features, new technologies"
        max_score: 25
        weight: 1.0
        # This factor is primarily populated by the obliged entity's own
        # product classification, configured per customer deployment.
        input_type: "manual_or_workflow"
        default_score: 0

      - id: regulatory_status
        label: "Regulatory & Licensing Status"
        description: "Missing licenses, regulatory violations, compliance gaps"
        max_score: 25
        weight: 1.0
        module_mapping:
          module: FRLS
          fields:
            - "required_licenses"
            - "license_status"
            - "regulatory_violations"
            - "compliance_gaps"
        risk_indicator_mapping:
          - "missing_licenses"
          - "compliance_violations"

  delivery_channel:
    label: "Delivery Channel Risk"
    eba_reference: "GL 2.18–2.19"
    weight: 0.10
    factors:
      - id: non_face_to_face
        label: "Non-Face-to-Face Relationship"
        description: "Remote onboarding, no physical presence verification"
        max_score: 15
        weight: 1.0
        input_type: "manual_or_workflow"
        default_score: 0

      - id: digital_presence
        label: "Digital Presence Integrity"
        description: "Website authenticity, domain ownership, online footprint"
        max_score: 20
        weight: 1.0
        module_mapping:
          module: DFWO
          fields:
            - "domain_ownership_verified"
            - "website_content_match"
            - "domain_age_days"
            - "ssl_certificate_valid"
        risk_indicator_mapping:
          - "domain_not_owned"
          - "privacy_protected_whois"
          - "recent_domain_registration"
          - "website_content_mismatch"

  transaction:
    label: "Transaction Risk"
    eba_reference: "GL 2.20–2.21"
    weight: 0.15
    factors:
      - id: financial_profile
        label: "Financial Profile"
        description: "Revenue patterns, financial distress, unusual financial indicators"
        max_score: 25
        weight: 1.0
        module_mapping:
          module: FRLS
          fields:
            - "financial_distress_indicators"
            - "revenue_plausibility"
            - "missing_financial_statements"
        risk_indicator_mapping:
          - "financial_distress"
          - "missing_financial_statements"

      - id: transaction_patterns
        label: "Transaction Patterns"
        description: "Unusual transaction volumes, cross-border patterns, cash usage"
        max_score: 25
        weight: 1.0
        # Primarily fed by ongoing monitoring, not initial investigation
        input_type: "manual_or_workflow"
        default_score: 0

# Aggregation: how dimension scores combine into overall score
aggregation:
  method: "weighted_max"
  # weighted_max: overall = max(dimension_scores) * 0.6 + weighted_avg * 0.4
  # This ensures a single critical dimension cannot be hidden by low others
  # while still reflecting the overall risk profile.
  # Important: with the default 60/40 blend and default weights, a single
  # dimension scoring 100 while all others score 0 yields
  # 60 + 40 * (that dimension's weight): 64 for delivery_channel up to 72 for
  # customer. That lands in medium or high, never critical. This is a
  # deliberate policy choice for v1 and must be reviewed by compliance owners
  # before publish.
  # Alternative methods available:
  # - "weighted_average": pure weighted average of dimension scores
  # - "highest_dimension": overall = highest single dimension score
  # Custom user-defined formulas are intentionally out of scope for v1.
  # They weaken determinism guarantees and complicate auditability.
  risk_levels:
    critical: { min: 90, max: 100, color: "#DC2626", action: "reject_or_edd" }
    high:     { min: 70, max: 89,  color: "#EA580C", action: "enhanced_due_diligence" }
    medium:   { min: 40, max: 69,  color: "#CA8A04", action: "standard_due_diligence" }
    low:      { min: 20, max: 39,  color: "#16A34A", action: "simplified_due_diligence" }
    clear:    { min: 0,  max: 19,  color: "#0D9488", action: "simplified_due_diligence" }
```
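The 60/40 blend is easiest to review with concrete numbers. The sketch below mirrors the arithmetic of the `weighted_max` branch of the scorer's `_aggregate` method as standalone functions, using the default dimension weights and risk bands from this ADR. The function and constant names are illustrative, not part of the codebase.

```python
DEFAULT_WEIGHTS = {
    "customer": 0.30,
    "geographic": 0.25,
    "product_service": 0.20,
    "delivery_channel": 0.10,
    "transaction": 0.15,
}

# (level, min, max) from the default risk_levels bands
RISK_LEVELS = [
    ("clear", 0, 19),
    ("low", 20, 39),
    ("medium", 40, 69),
    ("high", 70, 89),
    ("critical", 90, 100),
]


def weighted_max(scores: dict[str, int], weights: dict[str, float]) -> int:
    """60% of the highest dimension score plus 40% of the rounded weighted average."""
    keys = sorted(scores)  # fixed summation order
    total_weight = sum(weights.get(k, 0) for k in keys)
    weighted_avg = (
        round(sum(scores[k] * weights.get(k, 0) for k in keys) / total_weight)
        if total_weight > 0 else 0
    )
    max_score = max(scores.values()) if scores else 0
    return round(max_score * 0.6 + weighted_avg * 0.4)


def level(score: int) -> str:
    return next(name for name, lo, hi in RISK_LEVELS if lo <= score <= hi)


def single_dimension_profile(dim: str, score: int = 100) -> dict[str, int]:
    """All dimensions at 0 except `dim`."""
    return {k: (score if k == dim else 0) for k in DEFAULT_WEIGHTS}


# Example profile from the dimension_scores comment in the migration:
example = {"customer": 65, "geographic": 82, "product_service": 30,
           "delivery_channel": 15, "transaction": 45}
overall = weighted_max(example, DEFAULT_WEIGHTS)  # 82*0.6 + 54*0.4 = 70.8 -> 71
print(overall, level(overall))                    # 71 high
```

Running `single_dimension_profile` through `weighted_max` for each dimension gives 72 (customer), 70 (geographic), 68 (product_service), 66 (transaction), and 64 (delivery_channel). A single maximal dimension therefore reaches at most "high" on its own, which is the trade-off the policy note asks compliance owners to confirm.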
Scoring Algorithm
Design Principles
- Deterministic. Given identical inputs and the same matrix version, the output is always the same. No randomness, no LLM inference in the scoring path, no floating-point non-determinism.
- Idempotent. Running the scorer twice on the same matrix version, canonical input, and canonical override set produces the same evaluation record. The unique index on `evaluation_fingerprint` enforces this at the persistence layer.
- Auditable. Every intermediate value is stored. The `dimension_scores` JSONB contains per-factor breakdowns, and `matrix_version_snapshot` preserves the exact rules used.
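Determinism and idempotency both rest on canonical serialization before hashing. The minimal sketch below uses the same `json.dumps` settings as the scorer's `_canonical_json` to show what canonicalization does and does not cover: dictionary key order is normalized, list order is not. The helper names are illustrative, and the requirement that the mapper sort set-like lists is an assumption this sketch surfaces rather than something the ADR states.

```python
import hashlib
import json
from typing import Any


def canonical_json(payload: Any) -> str:
    # Same json.dumps settings as RiskMatrixScorer._canonical_json.
    return json.dumps(payload, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=True, default=str)


def sha256_hex(payload: Any) -> str:
    return hashlib.sha256(canonical_json(payload).encode("utf-8")).hexdigest()


# Same content, different key order -> identical canonical form and hash.
a = {"customer": {"pep_exposure": {"is_pep": True}}, "geographic": {}}
b = {"geographic": {}, "customer": {"pep_exposure": {"is_pep": True}}}
assert canonical_json(a) == '{"customer":{"pep_exposure":{"is_pep":true}},"geographic":{}}'
assert sha256_hex(a) == sha256_hex(b)

# List order is NOT normalized, so set-like lists (e.g. country codes)
# should be sorted upstream or identical findings will hash differently.
assert sha256_hex({"countries": ["DE", "RU"]}) != sha256_hex({"countries": ["RU", "DE"]})
```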
Scoring Engine
# src/risk_matrix/scorer.py
import logging
import hashlib
import json
from dataclasses import dataclass, field
from typing import Any, Optional
logger = logging.getLogger(__name__)
@dataclass
class FactorResult:
"""Result of evaluating a single risk factor."""
factor_id: str
raw_score: int # Before capping to max_score
capped_score: int # After capping: min(raw_score, max_score)
max_score: int
contributing_indicators: list[dict] # Which data points contributed
override: Optional[dict] = None # Analyst override if applied
@dataclass
class DimensionResult:
"""Result of evaluating a single risk dimension."""
dimension_id: str
score: int # 0–100 normalized
level: str # risk level label
factors: list[FactorResult] = field(default_factory=list)
raw_total: int = 0 # Sum of factor scores before normalization
max_possible: int = 0 # Sum of factor max_scores
@dataclass
class MatrixEvaluationResult:
"""Complete matrix evaluation output."""
dimension_results: dict[str, DimensionResult]
overall_score: int
overall_level: str
input_hash: str
override_hash: str
evaluation_fingerprint: str
output_hash: str
class RiskMatrixScorer:
"""
Deterministic, idempotent risk matrix scorer.
Accepts a matrix schema definition and input data,
produces a fully traceable evaluation result.
"""
def __init__(
self,
matrix_definition: dict,
aggregation_config: dict,
reference_data_snapshot: dict | None = None,
):
self.matrix = matrix_definition
self.aggregation = aggregation_config
self.reference_data = reference_data_snapshot or {}
@staticmethod
def _canonical_json(payload: Any) -> str:
return json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=True, default=str)
def validate_configuration(self) -> None:
"""
Publish-time validation for aggregation config and external references.
"""
self._validate_risk_levels()
self._validate_reference_lists()
def compute_input_hash(self, input_data: dict) -> str:
"""
SHA-256 of canonicalized input data.
Ensures determinism: same inputs always produce same hash.
"""
canonical = self._canonical_json(input_data)
return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
def compute_override_hash(self, overrides: list[dict]) -> str:
canonical = self._canonical_json(self._normalize_overrides(overrides))
return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
def compute_evaluation_fingerprint(
self,
matrix_schema_id: str,
input_hash: str,
override_hash: str,
) -> str:
canonical = self._canonical_json({
"matrix_schema_id": matrix_schema_id,
"input_hash": input_hash,
"override_hash": override_hash,
})
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
def compute_output_hash(self, dimension_results: dict, overall_score: int) -> str:
"""SHA-256 of the evaluation output for tamper detection."""
output = {
"dimensions": {
k: {"score": v.score, "factors": [
{"id": f.factor_id, "score": f.capped_score}
for f in v.factors
]}
for k, v in dimension_results.items()
},
"overall_score": overall_score,
}
canonical = json.dumps(output, sort_keys=True)
return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
def evaluate(
self,
input_data: dict,
matrix_schema_id: str,
overrides: list[dict] | None = None,
) -> MatrixEvaluationResult:
"""
Evaluate the full risk matrix against input data.
Args:
input_data: Dict keyed by dimension_id, containing factor input values.
Typically assembled by the ontology mapper or investigation
results collector.
overrides: Optional list of analyst overrides:
[{"dimension": "customer", "factor_id": "pep_exposure",
"override_score": 30, ...}]
Returns:
MatrixEvaluationResult with all scores, hashes, and traceability.
"""
normalized_overrides = self._normalize_overrides(overrides or [])
input_hash = self.compute_input_hash(input_data)
override_hash = self.compute_override_hash(normalized_overrides)
evaluation_fingerprint = self.compute_evaluation_fingerprint(
matrix_schema_id, input_hash, override_hash
)
dimension_results: dict[str, DimensionResult] = {}
override_map = self._build_override_map(normalized_overrides)
for dim_id, dim_def in self.matrix["dimensions"].items():
dim_result = self._evaluate_dimension(
dim_id, dim_def, input_data.get(dim_id, {}), override_map
)
dimension_results[dim_id] = dim_result
overall_score = self._aggregate(dimension_results)
overall_level = self._score_to_level(overall_score)
output_hash = self.compute_output_hash(dimension_results, overall_score)
return MatrixEvaluationResult(
dimension_results=dimension_results,
overall_score=overall_score,
overall_level=overall_level,
input_hash=input_hash,
override_hash=override_hash,
evaluation_fingerprint=evaluation_fingerprint,
output_hash=output_hash,
)
def _evaluate_dimension(
self,
dim_id: str,
dim_def: dict,
dim_input: dict,
override_map: dict,
) -> DimensionResult:
"""
Evaluate all factors within a dimension, normalize to 0–100.
Dimension aggregation is additive across factors after per-factor capping.
Within a factor, multiple indicators collapse to the highest matching score.
This intentionally rewards breadth of moderate risk signals across distinct
factors rather than only the single strongest signal.
"""
factor_results = []
raw_total = 0
max_possible = 0
for factor_def in dim_def["factors"]:
factor_id = factor_def["id"]
max_score = factor_def["max_score"]
max_possible += max_score
# Compute raw factor score from input data
raw_score, indicators = self._evaluate_factor(
factor_def, dim_input.get(factor_id, {})
)
capped_score = min(raw_score, max_score)
# Apply override if present
override_key = f"{dim_id}.{factor_id}"
override = override_map.get(override_key)
if override:
capped_score = min(override["override_score"], max_score)
raw_total += capped_score
factor_results.append(FactorResult(
factor_id=factor_id,
raw_score=raw_score,
capped_score=capped_score,
max_score=max_score,
contributing_indicators=indicators,
override=override,
))
# Normalize to 0–100
normalized = round((raw_total / max_possible) * 100) if max_possible > 0 else 0
level = self._score_to_level(normalized)
return DimensionResult(
dimension_id=dim_id,
score=normalized,
level=level,
factors=factor_results,
raw_total=raw_total,
max_possible=max_possible,
)
def _evaluate_factor(
self, factor_def: dict, factor_input: dict
) -> tuple[int, list[dict]]:
"""
Evaluate a single factor against its input data.
Uses the threshold definitions from the matrix schema.
Returns (score, contributing_indicators).
"""
if not factor_input:
return (factor_def.get("default_score", 0), [])
score = 0
indicators = []
# Check ontology-mapped fields
if "ontology_mapping" in factor_def:
for field_def in factor_def["ontology_mapping"].get("fields", []):
field_path = field_def["path"]
field_value = factor_input.get(field_path)
if field_value is None:
continue
indicator_type = field_def["indicator"]
for threshold in field_def["thresholds"]:
matched, threshold_score = self._check_threshold(
indicator_type, field_value, threshold
)
if matched:
score = max(score, threshold_score)
indicators.append({
"field": field_path,
"value": field_value,
"indicator": indicator_type,
"threshold_score": threshold_score,
})
break # Use highest matching threshold
# Check risk indicator flags (from existing risk_indicators)
if "risk_indicator_mapping" in factor_def:
for ri_key in factor_def["risk_indicator_mapping"]:
if factor_input.get(f"ri_{ri_key}"):
ri_score = factor_input.get(f"ri_{ri_key}_score", 10)
score = max(score, ri_score)
indicators.append({
"risk_indicator": ri_key,
"score": ri_score,
})
# Check module output fields
if "module_mapping" in factor_def:
for field_name in factor_def["module_mapping"].get("fields", []):
if field_name in factor_input and factor_input[field_name]:
module_score = factor_input.get(f"{field_name}_score", 0)
if module_score > 0:
score = max(score, module_score)
indicators.append({
"module_field": field_name,
"value": factor_input[field_name],
"score": module_score,
})
return (score, indicators)
def _check_threshold(
self, indicator_type: str, value: Any, threshold: dict
) -> tuple[bool, int]:
"""Pure function: check if a value matches a threshold definition."""
t_value = threshold.get("value")
t_score = threshold.get("score", 0)
t_list = threshold.get("list")
match indicator_type:
case "equals":
if isinstance(value, list):
return (t_value in value, t_score)
return (value == t_value, t_score)
case "greater_than":
return (value > t_value, t_score) if isinstance(value, (int, float)) else (False, 0)
case "less_than":
return (value < t_value, t_score) if isinstance(value, (int, float)) else (False, 0)
case "in":
if isinstance(value, list) and isinstance(t_value, list):
return (bool(set(value) & set(t_value)), t_score)
return (value in t_value, t_score) if isinstance(t_value, list) else (False, 0)
case "intersects":
if isinstance(value, list) and isinstance(t_value, list):
return (bool(set(value) & set(t_value)), t_score)
return (False, 0)
case "country_risk_list":
if t_list not in self.reference_data:
raise ValueError(f"Unknown country risk list: {t_list}")
reference_values = self.reference_data[t_list]
if isinstance(value, list):
return (bool(set(value) & set(reference_values)), t_score)
return (value in reference_values, t_score)
case "recency_days":
if isinstance(value, (int, float)):
return (value <= t_value, t_score)
return (False, 0)
case _:
logger.warning(f"Unknown indicator type: {indicator_type}")
return (False, 0)
    def _aggregate(self, dimension_results: dict[str, DimensionResult]) -> int:
        """
        Aggregate dimension scores into the overall score.
        The method comes from the aggregation config (default: weighted_max).
        """
        method = self.aggregation.get("method", "weighted_max")
        weights = self.aggregation.get("dimension_weights", {})
        scores = {k: v.score for k, v in dimension_results.items()}
        # Dimensions without a configured weight contribute nothing to the average
        total_weight = sum(weights.get(k, 0) for k in scores)
        weighted_avg = (
            sum(scores[k] * weights.get(k, 0) for k in scores) / total_weight
            if total_weight > 0 else 0
        )
        if method == "weighted_average":
            return round(weighted_avg)
        elif method == "weighted_max":
            # Blended: 60% highest dimension, 40% weighted average, rounded once
            max_score = max(scores.values()) if scores else 0
            return round(max_score * 0.6 + weighted_avg * 0.4)
        elif method == "highest_dimension":
            return max(scores.values()) if scores else 0
        else:
            raise ValueError(f"Unknown aggregation method: {method}")

    def _score_to_level(self, score: int) -> str:
        """
        Map a numeric score to its risk level label.
        Assumes _validate_risk_levels() has already accepted the bands.
        """
        levels = self.aggregation.get("risk_levels", {})
        ordered_levels = sorted(levels.items(), key=lambda item: item[1]["min"])
        for level_name, bounds in ordered_levels:
            if bounds["min"] <= score <= bounds["max"]:
                return level_name
        return "medium"  # Fallback; unreachable for 0..100 once bands are validated

    def _validate_risk_levels(self) -> None:
        """Require contiguous, non-overlapping risk level bands covering 0..100."""
        levels = self.aggregation.get("risk_levels", {})
        ordered_levels = sorted(levels.items(), key=lambda item: item[1]["min"])
        expected_min = 0
        for level_name, bounds in ordered_levels:
            min_score = bounds["min"]
            max_score = bounds["max"]
            if min_score != expected_min:
                raise ValueError(
                    f"Risk levels must be contiguous from 0..100; gap or overlap before {level_name}"
                )
            if max_score < min_score:
                raise ValueError(f"Invalid risk level range for {level_name}")
            expected_min = max_score + 1
        if expected_min != 101:
            raise ValueError("Risk levels must fully cover 0..100")

    def _validate_reference_lists(self) -> None:
        """Require every country_risk_list threshold to name a list in the frozen snapshot."""
        for dim_def in self.matrix.get("dimensions", {}).values():
            for factor_def in dim_def.get("factors", []):
                for field_def in factor_def.get("ontology_mapping", {}).get("fields", []):
                    if field_def.get("indicator") != "country_risk_list":
                        continue
                    for threshold in field_def.get("thresholds", []):
                        list_name = threshold.get("list")
                        if list_name and list_name not in self.reference_data:
                            raise ValueError(f"Missing reference list in matrix snapshot: {list_name}")

    @staticmethod
    def _normalize_overrides(overrides: list[dict]) -> list[dict]:
        """Return overrides in canonical order for deterministic hashing."""
        return sorted(
            overrides,
            key=lambda item: (
                item.get("dimension", ""),
                item.get("factor_id", ""),
                str(item.get("override_score", "")),
            ),
        )

    @staticmethod
    def _build_override_map(overrides: list[dict]) -> dict:
        """Index overrides by 'dimension.factor_id' for O(1) lookup."""
        return {
            f"{o['dimension']}.{o['factor_id']}": o
            for o in overrides
        }
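The following standalone sketch reproduces the three aggregation methods and the band lookup outside the class, which makes the arithmetic concrete. It is illustrative only. The function names and the EXAMPLE_LEVELS bands are hypothetical; real bands come from aggregation.risk_levels in the matrix schema. The weighted average is kept unrounded until the final blend.

```python
from typing import Mapping

# Hypothetical example bands; a real matrix defines these in aggregation.risk_levels.
EXAMPLE_LEVELS = {
    "clear": {"min": 0, "max": 19},
    "low": {"min": 20, "max": 39},
    "medium": {"min": 40, "max": 59},
    "high": {"min": 60, "max": 79},
    "critical": {"min": 80, "max": 100},
}


def aggregate(
    scores: Mapping[str, int],
    weights: Mapping[str, float],
    method: str = "weighted_max",
) -> int:
    """Combine 0..100 dimension scores into one overall score."""
    total_weight = sum(weights.get(k, 0) for k in scores)
    weighted_avg = (
        sum(s * weights.get(k, 0) for k, s in scores.items()) / total_weight
        if total_weight > 0
        else 0.0
    )
    max_score = max(scores.values(), default=0)
    if method == "weighted_average":
        return round(weighted_avg)
    if method == "weighted_max":
        # 60% highest dimension + 40% weighted average, rounded once at the end
        return round(max_score * 0.6 + weighted_avg * 0.4)
    if method == "highest_dimension":
        return max_score
    raise ValueError(f"Unknown aggregation method: {method}")


def score_to_level(score: int, levels: Mapping[str, Mapping[str, int]]) -> str:
    """Return the label whose inclusive [min, max] band contains the score."""
    for name, bounds in sorted(levels.items(), key=lambda item: item[1]["min"]):
        if bounds["min"] <= score <= bounds["max"]:
            return name
    raise ValueError(f"Score {score} falls outside the configured bands")


scores = {"customer": 80, "geographic": 40}
weights = {"customer": 0.5, "geographic": 0.5}
overall = aggregate(scores, weights)  # 0.6 * 80 + 0.4 * 60 = 72
print(overall, score_to_level(overall, EXAMPLE_LEVELS))  # 72 high
```

This sketch raises on an out-of-band score instead of falling back to a default label. That is one possible design choice: it surfaces misconfigured bands instead of silently classifying a company as medium risk.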
Ontology Mapping: From Investigation to Matrix Input
The bridge between investigation outputs and the risk matrix is the OntologyMatrixMapper. It reads ontology entities, investigation module reports, and existing risk indicators, then assembles the structured input dict that the scorer expects.
# src/risk_matrix/ontology_mapper.py
from datetime import date, datetime, timezone
from typing import Any


class OntologyMatrixMapper:
    """
    Maps ontology entities and investigation results to risk matrix input format.
    This is the integration point between:
    - Entity ontology (ontology_schema_v3.yaml entities)
    - Investigation module outputs (OSINT reports)
    - Risk indicators (existing risk_indicators from investigations)
    - The risk matrix scorer's expected input structure
    """

    def __init__(self, matrix_definition: dict, schema_cache):
        self.matrix = matrix_definition
        self.schema_cache = schema_cache

    async def build_input(
        self,
        company_id: str,
        investigation_id: str,
        entity_repo,
        report_repo,
        risk_repo,
    ) -> dict:
        """
        Assemble complete matrix input from all available sources.
        Returns a dict keyed by dimension_id, each containing factor_id dicts
        with the field values the scorer needs.
        """
        # 1. Load ontology entities for this company
        entities = await entity_repo.get_entities_by_company(company_id)
        entities_by_type = self._group_entities_by_type(entities)
        # 2. Load investigation module reports, keyed by upper-cased report type
        #    (if several reports share a type, the last one returned wins)
        reports = await report_repo.get_reports_by_investigation(investigation_id)
        reports_by_module = {r["report_type"].upper(): r for r in reports}
        # 3. Load existing risk indicators, keyed by indicator_key
        risk_indicators = await risk_repo.get_indicators_by_investigation(
            investigation_id
        )
        ri_by_key = {ri["indicator_key"]: ri for ri in risk_indicators}
        # 4. Build input for each dimension
        matrix_input = {}
        for dim_id, dim_def in self.matrix["dimensions"].items():
            dim_input = {}
            for factor_def in dim_def["factors"]:
                factor_input = {}
                factor_id = factor_def["id"]
                # Extract from ontology entities
                if "ontology_mapping" in factor_def:
                    entity_type = factor_def["ontology_mapping"]["entity_type"]
                    matching_entities = entities_by_type.get(entity_type, [])
                    for field_def in factor_def["ontology_mapping"].get("fields", []):
                        value = self._extract_field_from_entities(
                            matching_entities, field_def["path"]
                        )
                        if field_def.get("indicator") == "recency_days" and value is not None:
                            value = self._days_since(value)
                        if value is not None:
                            factor_input[field_def["path"]] = value
                # Extract from module reports (report keys were upper-cased above)
                if "module_mapping" in factor_def:
                    module = factor_def["module_mapping"]["module"].upper()
                    report = reports_by_module.get(module, {})
                    findings = report.get("findings", {})
                    for field_name in factor_def["module_mapping"].get("fields", []):
                        if field_name in findings:
                            factor_input[field_name] = findings[field_name]
                        # Carry over a pre-computed score from the module, if any
                        score_key = f"{field_name}_score"
                        if score_key in findings:
                            factor_input[score_key] = findings[score_key]
                # Map existing risk indicators (severity defaults to 10 if unset)
                if "risk_indicator_mapping" in factor_def:
                    for ri_key in factor_def["risk_indicator_mapping"]:
                        if ri_key in ri_by_key:
                            factor_input[f"ri_{ri_key}"] = True
                            factor_input[f"ri_{ri_key}_score"] = (
                                ri_by_key[ri_key].get("severity_score", 10)
                            )
                dim_input[factor_id] = factor_input
            matrix_input[dim_id] = dim_input
        return matrix_input

    def _group_entities_by_type(self, entities: list[dict]) -> dict[str, list]:
        grouped = {}
        for e in entities:
            et = e.get("entity_type", "Unknown")
            grouped.setdefault(et, []).append(e)
        return grouped

    def _extract_field_from_entities(
        self, entities: list[dict], field_path: str
    ) -> Any:
        """
        Extract a field value from a list of entities.
        For boolean fields: returns True if any entity has True.
        For numeric fields: returns the maximum (higher values are treated as higher risk).
        For list fields: returns the sorted union across all entities.
        For string fields: returns the value itself when all entities agree,
        otherwise a sorted list of distinct values so downstream threshold
        checks can evaluate the full risk surface.
        """
        values = []
        for entity in entities:
            data = entity.get("entity_data", {})
            value = self._resolve_dotted_path(data, field_path)
            if value is not None:
                values.append(value)
        if not values:
            return None
        # Merge strategy based on the type of the first value
        first = values[0]
        if isinstance(first, bool):  # checked before int: bool is an int subclass
            return any(values)  # Any True → True
        elif isinstance(first, (int, float)):
            return max(values)  # Highest risk value
        elif isinstance(first, list):
            merged = set()
            for v in values:
                merged.update(v)
            return sorted(merged)  # Deterministic union of lists
        elif isinstance(first, str):
            distinct = sorted({v for v in values if isinstance(v, str)})
            return distinct[0] if len(distinct) == 1 else distinct
        return values[0]

    @staticmethod
    def _resolve_dotted_path(data: dict, path: str) -> Any:
        """Resolve a dotted path such as 'ownership_structure.layers' in a nested dict."""
        parts = path.split(".")
        current = data
        for part in parts:
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                return None
        return current

    @staticmethod
    def _days_since(value: Any, as_of: date | None = None) -> Any:
        """
        Convert date/datetime inputs (or ISO-8601 strings) into integer days
        before scoring. Numeric values pass through unchanged. For a list of
        dates, return the smallest age (the most recent event). Unparseable
        values map to None so the field is skipped.
        """
        if isinstance(value, list):
            ages = [
                a for a in (OntologyMatrixMapper._days_since(v, as_of) for v in value)
                if a is not None
            ]
            return min(ages) if ages else None
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return value
        # Wall-clock default: pass as_of explicitly when replaying an evaluation
        today = as_of or datetime.now(timezone.utc).date()
        if isinstance(value, datetime):  # datetime is a date subclass, test it first
            value = value.date()
        elif isinstance(value, str):
            try:
                value = date.fromisoformat(value[:10])
            except ValueError:
                return None
        if isinstance(value, date):
            return (today - value).days
        return None
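The merge rules and the recency conversion are easiest to verify against concrete data. The sketch below is a hypothetical, self-contained version of both. The entity fields (layers, countries, is_pep, last_adverse_media) are invented for illustration. The explicit as_of date is an assumption of this sketch: it ensures that replaying an evaluation produces the same day counts regardless of when the replay runs.

```python
from datetime import date, datetime
from typing import Any


def merge_values(values: list[Any]) -> Any:
    """Merge one field's values across entities (same rules as the mapper)."""
    values = [v for v in values if v is not None]
    if not values:
        return None
    first = values[0]
    if isinstance(first, bool):  # check bool before int: bool is an int subclass
        return any(values)
    if isinstance(first, (int, float)):
        return max(values)
    if isinstance(first, list):
        return sorted({item for v in values for item in v})
    if isinstance(first, str):
        distinct = sorted({v for v in values if isinstance(v, str)})
        return distinct[0] if len(distinct) == 1 else distinct
    return first


def days_since(value: Any, as_of: date) -> Any:
    """Age in days relative to an explicit as_of date; lists yield the most recent."""
    if isinstance(value, list):
        ages = [a for a in (days_since(v, as_of) for v in value) if a is not None]
        return min(ages) if ages else None
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return value  # already an age in days
    if isinstance(value, datetime):  # datetime is a date subclass, so test it first
        value = value.date()
    elif isinstance(value, str):
        try:
            value = date.fromisoformat(value[:10])
        except ValueError:
            return None
    if isinstance(value, date):
        return (as_of - value).days
    return None


# Two hypothetical UBO entities with overlapping data
entity_data = [
    {"layers": 3, "countries": ["DE", "CY"], "is_pep": False, "last_adverse_media": "2025-12-28"},
    {"layers": 5, "countries": ["CY", "AE"], "is_pep": True, "last_adverse_media": "2026-03-18"},
]
merged = {key: merge_values([e.get(key) for e in entity_data]) for key in entity_data[0]}
merged["last_adverse_media"] = days_since(merged["last_adverse_media"], as_of=date(2026, 3, 28))
print(merged)
# {'layers': 5, 'countries': ['AE', 'CY', 'DE'], 'is_pep': True, 'last_adverse_media': 10}
```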
Versioning & Company Binding
Matrix Lifecycle
draft ──→ published ──→ archived
          │
          │ (immutable once published)
          │
          └──→ new version (draft) ──→ published ──→ ...
Rules:
- A draft matrix can be edited freely.
- Publishing freezes the matrix. The trigger function prevent_published_matrix_update enforces this at the database level.
- Publish-time validation must call RiskMatrixScorer.validate_configuration() before the status transition is committed.
- Publishing also freezes the resolved external reference datasets inside reference_data_snapshot.
- When a published matrix needs changes, a new version is created (same schema_id, incremented version).
- Only one version per schema_id may be published at a time. Publishing a new version must archive the previously published version in the same transaction.
- Archiving a matrix does not affect existing evaluations; they reference the specific risk_matrix_schemas.id.
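The lifecycle rules can be modeled in memory to illustrate the invariants they protect. Only drafts are editable, validation runs before any status change, and publishing a version archives its predecessor in the same step. InMemoryMatrixStore, MatrixVersion, and the resolve/validate callbacks are hypothetical stand-ins for the risk_matrix_schemas table, the prevent_published_matrix_update trigger, and RiskMatrixScorer.validate_configuration(). In PostgreSQL, the single-published rule would additionally be backed by a partial unique index.

```python
import copy
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class MatrixVersion:
    id: str
    schema_id: str
    version: int
    matrix_definition: dict
    status: str = "draft"  # draft -> published -> archived
    reference_data_snapshot: Optional[dict] = None


class InMemoryMatrixStore:
    """Hypothetical stand-in for risk_matrix_schemas that enforces the rules above."""

    def __init__(self) -> None:
        self.versions: dict[str, MatrixVersion] = {}

    def add(self, v: MatrixVersion) -> MatrixVersion:
        self.versions[v.id] = v
        return v

    def update_definition(self, version_id: str, definition: dict) -> None:
        v = self.versions[version_id]
        if v.status != "draft":  # mirrors the database trigger
            raise PermissionError(f"{version_id} is {v.status} and immutable")
        v.matrix_definition = definition

    def new_version(self, from_id: str, new_id: str) -> MatrixVersion:
        src = self.versions[from_id]
        latest = max(v.version for v in self.versions.values() if v.schema_id == src.schema_id)
        return self.add(MatrixVersion(new_id, src.schema_id, latest + 1,
                                      copy.deepcopy(src.matrix_definition)))

    def publish(
        self,
        version_id: str,
        resolve_reference_data: Callable[[dict], dict],
        validate: Callable[[dict, dict], None],
    ) -> None:
        target = self.versions[version_id]
        if target.status != "draft":
            raise ValueError("only drafts can be published")
        snapshot = resolve_reference_data(target.matrix_definition)
        validate(target.matrix_definition, snapshot)  # raises before any state change
        # In the real system everything below runs in one database transaction
        for v in self.versions.values():
            if v.schema_id == target.schema_id and v.status == "published":
                v.status = "archived"
        target.reference_data_snapshot = copy.deepcopy(snapshot)
        target.status = "published"

    def published(self, schema_id: str) -> list[MatrixVersion]:
        return [v for v in self.versions.values()
                if v.schema_id == schema_id and v.status == "published"]


def resolve(_definition: dict) -> dict:
    return {"fatf_high_risk": ["XX", "YY"]}  # hypothetical list contents


def accept(_definition: dict, _snapshot: dict) -> None:
    pass


store = InMemoryMatrixStore()
store.add(MatrixVersion("m1", "eba_standard_v1", 1, {"dimensions": {}}))
store.publish("m1", resolve, accept)
store.new_version("m1", "m2")
store.update_definition("m2", {"dimensions": {"customer": {"factors": []}}})
store.publish("m2", resolve, accept)
print([(v.id, v.version, v.status) for v in store.versions.values()])
# [('m1', 1, 'archived'), ('m2', 2, 'published')]
```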
Company Binding Flow
1. Company is onboarded → investigation runs
2. Investigation completes → rule_evaluation phase triggers
3. RiskMatrixScorer evaluates against the ACTIVE published matrix
4. Evaluation record is stored with:
- matrix_schema_id = the specific version UUID
- matrix_version_snapshot = full JSON copy of the matrix definition, aggregation, and frozen reference data
- input_hash + override_hash + output_hash for determinism proof, plus the evaluation_fingerprint used for idempotency
5. company_matrix_assignments record is created:
- Links company to the matrix version
- Records effective_from timestamp
- Previous assignment gets effective_until set (temporal closure)
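Step 5 amounts to a temporal table with half-open validity intervals. The following hypothetical in-memory model uses illustrative names (Assignment and AssignmentLog), not the repository API. It shows how closing the previous row at the same instant the new one opens makes "which matrix applied to this company on date X?" answerable with a single range predicate.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Assignment:
    company_id: str
    matrix_schema_id: str
    evaluation_id: str
    assignment_reason: str
    effective_from: datetime
    effective_until: Optional[datetime] = None  # None marks the current binding


class AssignmentLog:
    """Hypothetical in-memory model of company_matrix_assignments."""

    def __init__(self) -> None:
        self.rows: list[Assignment] = []

    def rebind(self, company_id: str, matrix_schema_id: str, evaluation_id: str,
               reason: str, at: datetime) -> Assignment:
        # Temporal closure: the open assignment ends exactly when the new one starts
        for row in self.rows:
            if row.company_id == company_id and row.effective_until is None:
                row.effective_until = at
        new_row = Assignment(company_id, matrix_schema_id, evaluation_id, reason, at)
        self.rows.append(new_row)
        return new_row

    def as_of(self, company_id: str, at: datetime) -> Optional[Assignment]:
        # Half-open intervals [effective_from, effective_until) never overlap
        for row in self.rows:
            if (row.company_id == company_id and row.effective_from <= at
                    and (row.effective_until is None or at < row.effective_until)):
                return row
        return None


utc = timezone.utc
log = AssignmentLog()
log.rebind("c-1", "m1", "e-1", "onboarding", datetime(2026, 1, 10, tzinfo=utc))
log.rebind("c-1", "m2", "e-2", "matrix_upgrade", datetime(2026, 3, 1, tzinfo=utc))
print(log.as_of("c-1", datetime(2026, 2, 1, tzinfo=utc)).matrix_schema_id)  # m1
```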
Re-evaluation on Matrix Upgrade
When a new matrix version is published and a compliance officer decides to re-evaluate existing companies:
# src/risk_matrix/version_manager.py
from __future__ import annotations

import logging
from uuid import UUID

from .ontology_mapper import OntologyMatrixMapper
from .scorer import RiskMatrixScorer

logger = logging.getLogger(__name__)


class MatrixVersionManager:
    """Manages matrix versioning and company re-evaluation."""

    async def upgrade_company(
        self,
        company_id: str,
        new_schema_id: UUID,
        investigation_id: str,
        reason: str = "matrix_upgrade",
    ) -> RiskMatrixEvaluation:
        """
        Re-evaluate a company against a new matrix version.
        - Marks previous evaluation as 'superseded'
        - Creates new evaluation with new matrix version
        - Updates company_matrix_assignments temporal record
        If this exact request (matrix version + input + overrides) was already
        scored, the existing evaluation is returned unchanged.
        """
        # 1. Load new matrix schema
        new_schema = await self.schema_repo.get_by_id(new_schema_id)
        if new_schema["status"] != "published":
            raise ValueError("Cannot evaluate against unpublished matrix")
        # 2. Build input from the given investigation
        mapper = OntologyMatrixMapper(new_schema["matrix_definition"], self.schema_cache)
        input_data = await mapper.build_input(
            company_id, investigation_id,
            self.entity_repo, self.report_repo, self.risk_repo,
        )
        # 3. Score against the frozen definition, aggregation, and reference data
        scorer = RiskMatrixScorer(
            new_schema["matrix_definition"],
            new_schema["aggregation_config"],
            new_schema["reference_data_snapshot"],
        )
        result = scorer.evaluate(
            input_data,
            matrix_schema_id=str(new_schema_id),
        )
        # 4. Idempotency fast path keyed on matrix version + input + overrides.
        #    Concurrent retries that race past this check are still collapsed
        #    by the unique index on evaluation_fingerprint.
        existing = await self.eval_repo.get_by_fingerprint(
            new_schema_id, result.evaluation_fingerprint
        )
        if existing:
            logger.info(
                "Idempotent: evaluation already exists for fingerprint %s",
                result.evaluation_fingerprint,
            )
            return existing
        # 5. Store + supersede + rebind atomically
        async with self.db.transaction():
            prev_eval = await self.eval_repo.get_active_for_company(company_id)
            new_eval = await self.eval_repo.create(
                company_id=company_id,
                investigation_id=investigation_id,
                matrix_schema_id=new_schema_id,
                matrix_version_snapshot={
                    "matrix_definition": new_schema["matrix_definition"],
                    "aggregation_config": new_schema["aggregation_config"],
                    "reference_data_snapshot": new_schema["reference_data_snapshot"],
                },
                dimension_scores=self._serialize_dimension_results(result),
                overall_score=result.overall_score,
                overall_level=result.overall_level,
                input_hash=result.input_hash,
                override_hash=result.override_hash,
                evaluation_fingerprint=result.evaluation_fingerprint,
                output_hash=result.output_hash,
            )
            if prev_eval:
                await self.eval_repo.mark_superseded(prev_eval["id"], new_eval["id"])
            await self.assignment_repo.close_current(company_id)
            await self.assignment_repo.create(
                company_id=company_id,
                matrix_schema_id=new_schema_id,
                evaluation_id=new_eval["id"],
                assignment_reason=reason,
            )
        return new_eval
Workflow Integration
The risk matrix plugs into ADR-007's rule_evaluation phase type. When a workflow reaches a rule_evaluation phase with output_format: eba_risk_matrix, the engine:
- Loads the company's assigned matrix, or resolves the configured matrix_schema to the single currently published version (or uses the exact version pinned by matrix_schema_id).
- Calls OntologyMatrixMapper.build_input() to collect investigation results.
- Calls RiskMatrixScorer.evaluate() to compute the matrix.
- Stores the risk_matrix_evaluations record.
- Returns dimension scores to the workflow for routing decisions.
# In a workflow schema (e.g., kyb_onboarding_standard_v1.yaml)
phases:
  - id: risk_assessment
    type: rule_evaluation
    config:
      output_format: eba_risk_matrix
      matrix_schema: "eba_standard_v1"  # Resolves to the single published version
      # OR: matrix_schema_id: "uuid-here"  # Pin to exact version
      allow_overrides: true
    gate:
      conditions:
        - field: "risk_assessment.status"
          operator: "equals"
          value: "completed"
    routes:
      - condition: "risk_assessment.overall_level in ['critical', 'high']"
        next_phase: enhanced_review
      - condition: "risk_assessment.overall_level == 'medium'"
        next_phase: standard_review
      - condition: "risk_assessment.overall_level in ['low', 'clear']"
        next_phase: auto_approve
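The version resolution and routing rules of this phase can be sketched as plain functions. This is a hypothetical illustration, not the ADR-007 engine. The routes are hand-translated into a lookup table rather than evaluated as condition expressions. The sketch also assumes that a pin may reference an archived version but never a draft.

```python
from typing import Optional

# Hand-translated from the routes block above; the real engine evaluates
# ADR-007 condition expressions instead of a lookup table.
ROUTES = [
    ({"critical", "high"}, "enhanced_review"),
    ({"medium"}, "standard_review"),
    ({"low", "clear"}, "auto_approve"),
]


def resolve_matrix_version(config: dict, versions: list[dict]) -> dict:
    """Return the pinned version, or the single published version of the schema line."""
    pinned: Optional[str] = config.get("matrix_schema_id")
    if pinned is not None:
        for v in versions:
            if v["id"] == pinned and v["status"] != "draft":
                return v
        raise LookupError(f"Pinned matrix version {pinned} not found or still a draft")
    published = [v for v in versions
                 if v["schema_id"] == config["matrix_schema"] and v["status"] == "published"]
    if len(published) != 1:
        raise LookupError(f"Expected exactly one published version, found {len(published)}")
    return published[0]


def next_phase(phase_output: dict) -> str:
    """Apply the gate, then route on overall_level."""
    if phase_output.get("status") != "completed":
        raise RuntimeError("Gate not satisfied: risk_assessment.status != 'completed'")
    level = phase_output["overall_level"]
    for levels, phase in ROUTES:
        if level in levels:
            return phase
    raise LookupError(f"No route for overall_level={level!r}")


versions = [
    {"id": "uuid-1", "schema_id": "eba_standard_v1", "status": "archived"},
    {"id": "uuid-2", "schema_id": "eba_standard_v1", "status": "published"},
]
print(resolve_matrix_version({"matrix_schema": "eba_standard_v1"}, versions)["id"])  # uuid-2
print(next_phase({"status": "completed", "overall_level": "medium"}))  # standard_review
```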
API Endpoints
All under /risk-matrix prefix, registered behind the same feature flag pattern as the workflow engine.
# Matrix Schema CRUD
GET /risk-matrix/schemas List all schemas (with version history)
POST /risk-matrix/schemas Create new schema (draft)
GET /risk-matrix/schemas/{id} Get schema by ID
PUT /risk-matrix/schemas/{id} Update draft schema
POST /risk-matrix/schemas/{id}/publish Publish schema (freezes it)
POST /risk-matrix/schemas/{id}/archive Archive schema
POST /risk-matrix/schemas/{id}/new-version Create new version from existing
GET /risk-matrix/schemas/{schema_id}/versions List all versions of a schema
GET /risk-matrix/schemas/{schema_id}/diff/{v1}/{v2} Diff between two versions
# Evaluations
POST /risk-matrix/evaluate Trigger evaluation for a company
GET /risk-matrix/evaluations/{id} Get evaluation details
GET /risk-matrix/evaluations/company/{company_id} Get evaluation history for company
POST /risk-matrix/evaluations/{id}/override Create derived overridden evaluation
GET /risk-matrix/evaluations/{id}/verify Re-compute and verify hashes match
# Company Assignments
GET /risk-matrix/assignments/company/{company_id} Get assignment history
POST /risk-matrix/assignments/company/{company_id}/upgrade Re-evaluate with new matrix
GET /risk-matrix/assignments/schema/{schema_id} List companies on a specific matrix version
# Batch Operations
POST /risk-matrix/batch/re-evaluate Re-evaluate multiple companies
GET /risk-matrix/batch/{batch_id}/status Check batch re-evaluation progress
Batch re-evaluation is asynchronous only and must run via Temporal, not inline in the API request. The API must enforce a maximum batch size per request, tenant-scoped authorization, and rate limiting. Recommended v1 defaults: max 100 companies per batch, one active batch per tenant, and explicit operator approval/audit logging.
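An admission check enforcing these limits before a batch is started might look like the following. BatchAdmission is a hypothetical name, and the 100-company limit is the recommended v1 default from this section. Rate limiting is left out on purpose because it usually lives in API middleware rather than in the batch logic.

```python
from dataclasses import dataclass, field
from typing import Optional

MAX_BATCH_SIZE = 100  # recommended v1 default


@dataclass
class BatchAdmission:
    """Hypothetical admission check run before starting the Temporal batch workflow."""
    active_batches: dict[str, str] = field(default_factory=dict)  # tenant_id -> batch_id
    audit_log: list[tuple[str, str, str, int]] = field(default_factory=list)

    def admit(self, tenant_id: str, batch_id: str, company_ids: list[str],
              tenant_company_ids: set[str], approved_by: Optional[str]) -> list[str]:
        unique_ids = list(dict.fromkeys(company_ids))  # de-duplicate, keep order
        if not approved_by:
            raise PermissionError("Batch re-evaluation requires operator approval")
        if not unique_ids:
            raise ValueError("Batch is empty")
        if len(unique_ids) > MAX_BATCH_SIZE:
            raise ValueError(f"Batch exceeds {MAX_BATCH_SIZE} companies")
        foreign = set(unique_ids) - tenant_company_ids  # tenant-scoped authorization
        if foreign:
            raise PermissionError(f"{len(foreign)} companies belong to another tenant")
        if tenant_id in self.active_batches:
            raise RuntimeError(f"Tenant already has active batch {self.active_batches[tenant_id]}")
        self.active_batches[tenant_id] = batch_id
        self.audit_log.append((tenant_id, batch_id, approved_by, len(unique_ids)))
        return unique_ids

    def finish(self, tenant_id: str) -> None:
        self.active_batches.pop(tenant_id, None)


guard = BatchAdmission()
tenant_companies = {f"c-{i}" for i in range(200)}
ids = guard.admit("t-1", "b-1", ["c-1", "c-2", "c-1"], tenant_companies, approved_by="ops@example.com")
print(ids)  # ['c-1', 'c-2']
```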
Navigation: Compliance Studio
The risk matrix management UI lives at /risk-matrices under the Compliance Studio sidebar section, alongside Workflows (/workflows/schemas) and Risk Categories (/studio/risk-categories, see ADR-008a). This navigation restructure replaces the previous "Configure" section and groups the three compliance authoring tools together. See ADR-008a for the full navigation specification.
The Risk Matrices page should support: listing all matrix schemas with version history and status, creating new matrices (draft), editing draft matrices (dimension/factor/weight configuration), publishing (which triggers reference data resolution via ADR-008a and freezes the snapshot), archiving, version diffing, and viewing evaluation statistics per matrix.
File Structure
src/risk_matrix/ # NEW directory
├── __init__.py
├── scorer.py # RiskMatrixScorer (deterministic scoring engine)
├── ontology_mapper.py # OntologyMatrixMapper (entity → matrix input)
├── version_manager.py # MatrixVersionManager (versioning + company binding)
├── schema_loader.py # RiskMatrixSchemaCache (YAML loader + validation)
├── country_risk_lists.py # Reference data: FATF, EU high-risk, CPI lists
├── repository.py # MatrixSchemaRepo, EvaluationRepo, AssignmentRepo
├── router.py # FastAPI endpoints
└── activities.py # Temporal activities for workflow integration
config/risk_matrix_schemas/ # NEW directory
├── eba_standard_v1.yaml # Default EBA-aligned matrix
├── eba_crypto_v1.yaml # Extended matrix for CASPs (EBA/GL/2024/01)
└── custom_template.yaml # Blank template for customer customization
migrations/
└── V56__risk_matrix_schemas.sql # Matrix schemas + evaluations + assignments
Agent Population Flow
When an investigation completes inside a workflow, the agents have already produced all the data. The matrix population is a post-investigation activity, not an agent task itself.
┌──────────────────────────────────────────────────────────────┐
│  InvestigationWorkflow (existing, unchanged)                 │
│                                                              │
│  ┌─────┐  ┌─────┐  ┌──────┐  ┌────────┐  ┌───────┐           │
│  │ CIR │  │ ROA │  │ MEBO │  │ SPEPWS │  │ AMLRR │  ...      │
│  └──┬──┘  └──┬──┘  └──┬───┘  └────┬───┘  └───┬───┘           │
│     │        │        │           │          │               │
│     └────────┴────────┴─────┬─────┴──────────┘               │
│                             ▼                                │
│                 Entity Ontology + Reports                    │
│                   (PostgreSQL + Neo4j)                       │
└─────────────────────────────┬────────────────────────────────┘
                              │
                              ▼
┌──────────────────────────────────────────────────────────────┐
│  rule_evaluation phase (DynamicComplianceWorkflow)           │
│                                                              │
│  1. OntologyMatrixMapper.build_input()                       │
│     ├── Reads entities from entity_repo                      │
│     ├── Reads reports from report_repo                       │
│     └── Reads risk_indicators from risk_repo                 │
│                                                              │
│  2. RiskMatrixScorer.evaluate(input_data, matrix_schema_id)  │
│     ├── Scores each factor within each dimension             │
│     ├── Normalizes dimension scores to 0–100                 │
│     ├── Aggregates to overall score                          │
│     └── Computes input_hash + override_hash + output_hash    │
│                                                              │
│  3. Store risk_matrix_evaluations record                     │
│  4. Create/update company_matrix_assignments                 │
│  5. Return scores to workflow for routing                    │
└──────────────────────────────────────────────────────────────┘
Idempotency Guarantee
The system guarantees idempotency at three levels:
- Evaluation level. The evaluation_fingerprint is a SHA-256 over matrix_schema_id + input_hash + override_hash. A unique database index enforces one evaluation row per exact scoring request, and repository inserts must use a transaction / upsert pattern to collapse concurrent retries.
- Input/output proof level. The input_hash captures canonicalized source inputs, the override_hash captures canonicalized analyst overrides, and the output_hash captures the resulting scores. When the same inputs are re-scored (for example via the verify endpoint), an output_hash mismatch indicates either a bug in the scorer or data corruption.
- Temporal level. The workflow engine's rule_evaluation phase runs as a Temporal activity keyed by idempotency_key = f"{investigation_id}:{matrix_schema_id}:{override_hash}". Temporal retries activities with at-least-once semantics and does not deduplicate executions by a caller-supplied key, so the activity itself must be idempotent: a retried execution recomputes the same evaluation_fingerprint, and the unique index returns the existing row instead of inserting a second one.
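The following minimal, standard-library-only sketch illustrates the hashing and insert-or-get behavior behind the evaluation and Temporal levels. The colon-joined fingerprint encoding and the EvaluationStore class are assumptions. In PostgreSQL, the equivalent of insert_or_get is INSERT ... ON CONFLICT DO NOTHING followed by a SELECT on the fingerprint. Canonicalization via json.dumps is only stable within Python; cross-language verification would need a formal scheme such as RFC 8785 (JSON Canonicalization Scheme).

```python
import hashlib
import json
from typing import Any


def canonical_hash(payload: Any) -> str:
    """SHA-256 over canonical JSON: sorted keys, no insignificant whitespace."""
    text = json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def override_hash(overrides: list[dict]) -> str:
    # Sort on each override's full canonical form so list order never matters
    ordered = sorted(overrides, key=lambda o: json.dumps(o, sort_keys=True))
    return canonical_hash(ordered)


def evaluation_fingerprint(matrix_schema_id: str, input_hash: str, ovr_hash: str) -> str:
    # Assumed encoding: colon-joined fields; any fixed, unambiguous encoding works
    return hashlib.sha256(f"{matrix_schema_id}:{input_hash}:{ovr_hash}".encode()).hexdigest()


class EvaluationStore:
    """In-memory stand-in for a unique index on evaluation_fingerprint."""

    def __init__(self) -> None:
        self.rows: dict[str, dict] = {}

    def insert_or_get(self, fingerprint: str, row: dict) -> tuple[dict, bool]:
        # setdefault is the in-memory analogue of INSERT ... ON CONFLICT DO NOTHING
        stored = self.rows.setdefault(fingerprint, row)
        return stored, stored is row


schema_id = "matrix-version-uuid"  # hypothetical version identifier
inputs_a = {"customer": {"pep_exposure": {"is_pep": True}}, "geographic": {}}
inputs_b = {"geographic": {}, "customer": {"pep_exposure": {"is_pep": True}}}
overrides = [
    {"dimension": "geographic", "factor_id": "residence", "override_score": 40},
    {"dimension": "customer", "factor_id": "pep_exposure", "override_score": 90},
]
fp = evaluation_fingerprint(schema_id, canonical_hash(inputs_a), override_hash(overrides))
store = EvaluationStore()
first, created = store.insert_or_get(fp, {"overall_score": 72})
retry, created_again = store.insert_or_get(fp, {"overall_score": 72})  # simulated retry
print(created, created_again, retry is first)  # True False True
```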
Migration from Current Risk Scoring
The existing risk_scoring.py and its flat 0–100 score remain untouched. The risk matrix is additive:
- Existing investigations keep their flat risk_score and risk_level.
- New investigations (via the workflow engine) produce both the flat score (for backward compatibility) and the full matrix evaluation.
- For these new investigations, the flat score is derived from the matrix's overall_score so the two stay in sync.
- APIs that return risk_level continue to work; the matrix detail is available via the new /risk-matrix/ endpoints.
Rollback Strategy
Same pattern as ADR-007 — fully reversible, zero impact on existing functionality:
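- Remove router registration from main.py.
- Drop new tables: DROP TABLE company_matrix_assignments, risk_matrix_evaluations, risk_matrix_schemas CASCADE. Also drop the trigger function, which DROP TABLE leaves behind: DROP FUNCTION IF EXISTS prevent_published_matrix_update().
- Remove src/risk_matrix/ directory.
- Remove config/risk_matrix_schemas/ directory.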
Existing investigations, entities, risk indicators, and the flat risk score are completely unaffected.
Effort Estimate
| Component | Estimate | Dependencies |
|---|---|---|
| Database migrations (V56) | 0.5 weeks | None |
| RiskMatrixSchemaCache (YAML loader + validation) | 1 week | Migrations |
| RiskMatrixScorer (deterministic engine) | 1.5 weeks | Schema loader |
| OntologyMatrixMapper (entity → input) | 1.5 weeks | Scorer |
| MatrixVersionManager (versioning + binding) | 1 week | Mapper, Scorer |
| Country risk list reference data | 0.5 weeks | None |
| Repository layer | 1 week | Migrations |
| API endpoints (router) | 1 week | Repository |
| Temporal activity integration | 0.5 weeks | Scorer, ADR-007 Phase 2 |
| Default EBA matrix schema (YAML) | 1 week | Schema loader |
| Testing (unit + integration) | 2.5 weeks | All components |
| Total | 12 weeks | |
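The 12-week total is summed effort; components without mutual dependencies can proceed in parallel. The scorer and mapper (3 weeks combined) have no dependency on ADR-007 and can start as soon as the schema loader is in place. Only the workflow integration (0.5 weeks) waits on ADR-007 Phase 2 delivering the rule_evaluation phase type. The expanded testing allowance covers determinism edge cases, hash stability, publish-time validation, and Temporal retry/idempotency scenarios.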
Success Criteria
| Criterion | Target | Measurement |
|---|---|---|
| Same input + same matrix version = identical output | 100% | Automated test suite with hash verification |
| Published matrix cannot be modified | 100% | DB trigger + API-level validation |
| Only one published version exists per schema line | 100% | Partial unique index on schema_id WHERE status = 'published' |
| Every evaluation links to exact matrix version | 100% | Foreign key + snapshot column |
| Reference datasets are frozen per published version | 100% | reference_data_snapshot persisted and replay-tested |
| Company assignment history is temporally queryable | Yes | effective_from/effective_until range queries |
| Matrix schema validates against JSON Schema | 100% | Schema validation on load |
| Scoring latency per company | p95 < 500 ms | Activity execution metrics |
| Ontology mapper extracts all mapped fields correctly | > 95% | Integration tests against sample investigations |
| Analyst override is recorded with full audit trail | 100% | Derived evaluation row + override JSONB + supersession chain |