ADR-017: Multi-Source Disagreement Detection & Missing Data Enforcement
Status: Draft (not published)
Date: 2026-03-31
Author: Atlas Architecture
Depends on: ADR-010 (Ontology Lifecycle), ADR-013 (Analyst Interaction Layer)
Impacts: Mutation Queue pipeline, Conflict Detection, Evaluation Engine, Analyst Dashboard
Table of Contents
- Context & Problem Statement
- Decision Summary
- Gap Analysis
- Multi-Source Disagreement Detection
- Missing Data Enforcement
- Integration with Existing Pipeline
- Data Model Changes
- API Surface
- Migration Path
- Rejected Alternatives
- Open Questions
1. Context & Problem Statement
The core value proposition of connecting multiple data sources to a single ontology field is cross-validation: if two independent providers agree on a customer's incorporation date, that strengthens confidence. If they disagree, an analyst needs to investigate before the data enters the compliance record.
The current mutation pipeline (ADR-010, Section 8) processes mutations sequentially per source. Each incoming value is compared against the current provenance record (the last accepted value). This design handles the common case well but has two critical gaps for a compliance product.
Gap 1: Multi-source disagreement is invisible
When Source A writes incorporation_date = 2019-03-15 and Source B later writes incorporation_date = 2019-04-01, the conflict detector sees "proposed value differs from current value" and fires a conflict. But the system does not explicitly model that this is a source-to-source disagreement. It looks identical to a single source updating its own previously-submitted value.
This matters because the compliance response should be different. A source updating its own prior value is routine (data correction). Two independent sources providing contradictory values is a data integrity concern that may require investigation across both providers.
The current ConflictRecord captures current_source and incoming_source, so an analyst can see who disagrees. But there is no automated detection that says "these N sources provided different values for this field." The system cannot answer: "For entity X, field Y, how many sources agree vs. disagree?" without reconstructing it from mutation history.
Gap 2: Missing data on required fields has no runtime consequence
The is_required flag on field configurations is only consumed by the evaluation engine (evaluation.py). When a required field has no data from any source, the evaluation run reports it as required_missing in the coverage metrics. But the runtime pipeline (mutation processor, conflict handler) does not enforce it.
This means a schema version can be published and used in production with required fields that have zero coverage, and no alert is raised until the next manual evaluation run. For AML/KYC compliance, a missing required field (e.g., date_of_birth for an individual, registration_number for a company) should trigger a proactive data-gap alert, not wait for a scheduled evaluation.
2. Decision Summary
Add two new capabilities to the mutation pipeline, both running as post-batch hooks after all mutations in a batch have been processed:
- Disagreement Detector: After a batch completes, compare all source values for each affected field. If two or more sources have provided different values and the field's conflict_response is not accept_trusted, flag a multi-source disagreement that triggers investigation.
- Missing Data Monitor: After a batch completes, check all is_required fields for the affected entity types. If any required field has no provenance record (no source has ever provided a value), create a data-gap alert.
Both hooks are append-only: they create new records (disagreement alerts, data-gap alerts) without modifying existing provenance or mutation records. They run after the per-mutation pipeline has completed its work, so they cannot interfere with the existing merge/conflict flow.
3. Gap Analysis
What the current pipeline does well
The existing per-mutation flow handles:
- Sequential conflict detection (conflict_detector.py): catches when a new value differs from the current provenance value, including frozen field protection.
- Trust-based resolution (merge_executor.py): the highest_trust strategy correctly selects the most trusted source's value when multiple sources have written to the same field.
- Conflict response handlers (conflict_handler.py): accept_trusted auto-resolves with logging, flag_review creates analyst tasks, freeze_investigate freezes the field and triggers investigation.
- Provenance tracking: every accepted value records its source, trust score, and mutation ID.
What is missing
| Capability | Current State | Gap |
|---|---|---|
| Source-vs-source comparison | Only proposed-vs-current (sequential) | No batch-level cross-source view |
| Disagreement classification | All conflicts look the same | Cannot distinguish "source updated itself" from "two sources disagree" |
| Disagreement resolution tracking | ConflictRecord tracks pairwise resolution | No aggregate "all sources now agree" signal |
| Required field enforcement at runtime | is_required only used in evaluation metrics | No alert when batch completes with required fields still empty |
| Data-gap alerting | Manual evaluation runs only | No proactive notification to analysts |
4. Multi-Source Disagreement Detection
4.1 When it runs
After all mutations in a batch have been processed (merged, conflict-detected, provenance-upserted), the disagreement detector runs as a post-batch hook inside MutationProcessor.process_batch().
4.2 What it checks
For each unique (entity_id, field_path) touched in the batch:
- Query all mutations for this field across all sources in the batch, plus the current provenance record.
- Group distinct values by source. Each source contributes its latest proposed value (or the provenance value if the source wasn't in this batch but has a prior provenance record).
- If two or more sources have provided non-null values and those values differ, this is a multi-source disagreement.
4.3 Disagreement vs. conflict
A conflict (existing ConflictRecord) is a per-mutation event: "this specific incoming value differs from the current value." A disagreement is a field-level state: "the sources that have contributed to this field do not agree on its value."
One disagreement can be caused by multiple conflicts, or by zero conflicts (if the merge strategy auto-resolved each mutation but the underlying source values still differ). The disagreement detector looks at the final state, not the per-mutation events.
4.4 Resolution semantics
A disagreement is resolved when one of these conditions is met:
- All sources agree (a subsequent batch brings the values into alignment).
- An analyst manually resolves the disagreement, choosing a canonical value and recording justification.
- The field is frozen for investigation, which implicitly acknowledges the disagreement and blocks further changes until resolved.
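The three resolution conditions can be sketched as a single check. This is a minimal illustration, not the pipeline's implementation: the function name, its parameters, and the returned reason strings are all hypothetical.

```python
from typing import Any, Optional


def resolution_reason(
    source_values: dict[str, Any],
    analyst_resolved: bool = False,
    field_frozen: bool = False,
) -> Optional[str]:
    """Return why a disagreement counts as resolved, or None if it is still open.

    Hypothetical helper: `source_values` maps source id -> latest value.
    """
    # Null values are ignored, matching the non-null rule in Section 4.2.
    non_null = [v for v in source_values.values() if v is not None]
    if all(v == non_null[0] for v in non_null):
        return "sources_agree"
    if analyst_resolved:
        return "analyst_resolved"
    if field_frozen:
        return "frozen_for_investigation"
    return None


still_open = resolution_reason({"source_a": "2019-03-15", "source_b": "2019-04-01"})
aligned = resolution_reason({"source_a": "2019-03-15", "source_b": "2019-03-15"})
```

Here `still_open` is `None` and `aligned` is `"sources_agree"`.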
4.5 Trust-weighted disagreement severity
Not all disagreements are equal. If a source with trust 0.95 says one thing and a source with trust 0.3 says another, the system has already accepted the high-trust value via the highest_trust strategy. The disagreement still exists, but its severity is lower because the accepted value comes from the most reliable source.
Severity levels:
- critical: Two or more sources with trust >= 0.7 disagree. Both are considered reliable, so the contradiction demands investigation.
- high: The accepted (highest-trust) source has trust >= 0.7, but a source with trust 0.4-0.69 disagrees. Worth reviewing but less urgent.
- medium: All disagreeing sources have trust < 0.7. The data quality is uncertain across the board.
- low: Only one source has trust >= 0.4, and the disagreeing source has trust < 0.4. Likely a low-quality source providing noise.
The severity thresholds are configurable per tenant.
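One possible shape for the severity classification is sketched below. The level descriptions above can overlap (for example, trust 0.5 vs. 0.2 fits both medium and low), so this sketch assumes a precedence of critical, high, low, then medium as the fallback. That ordering, the Snapshot class, and the parameter names are assumptions for illustration only.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Snapshot:  # hypothetical stand-in for SourceSnapshot
    value: Any
    trust: float


def compute_severity(
    sources: dict[str, Snapshot],
    reliable: float = 0.7,  # "reliable" threshold from Section 4.5
    moderate: float = 0.4,  # lower threshold from Section 4.5
) -> str:
    """Classify a disagreement; expects at least two sources with differing values."""
    # Assumption: the accepted value comes from the highest-trust source.
    accepted = max(sources.values(), key=lambda s: s.trust)
    dissenters = [s for s in sources.values() if s.value != accepted.value]

    # critical: a reliable source contradicts the (at least as reliable) accepted one.
    if any(s.trust >= reliable for s in dissenters):
        return "critical"
    # high: reliable accepted source, contradicted by a moderately trusted one.
    if accepted.trust >= reliable and any(s.trust >= moderate for s in dissenters):
        return "high"
    # low: only low-trust sources contradict a moderately or highly trusted value.
    if accepted.trust >= moderate and all(s.trust < moderate for s in dissenters):
        return "low"
    # medium: everything else, i.e. no source involved is reliable.
    return "medium"


example = compute_severity({"a": Snapshot("2019-03-15", 0.95), "b": Snapshot("2019-04-01", 0.3)})
```

With the 0.95 vs. 0.3 example from this section, `example` evaluates to `"low"` under the assumed precedence.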
4.6 Algorithm
    import json
    from uuid import UUID

    # ProvenanceRepo, MutationRepo, FieldProvenanceRecord, SourceSnapshot and
    # DisagreementRecord are project types defined elsewhere (imports omitted).


    class DisagreementDetector:
        """Post-batch hook: detect multi-source disagreements for affected fields."""

        async def detect(
            self,
            batch_id: UUID,
            affected_fields: list[tuple[UUID, str]],  # (entity_id, field_path)
            provenance_repo: ProvenanceRepo,
            mutation_repo: MutationRepo,
        ) -> list[DisagreementRecord]:
            disagreements = []
            for entity_id, field_path in affected_fields:
                # Collect one snapshot per source: current provenance + batch mutations
                source_values: dict[str, SourceSnapshot] = {}

                # 1. Current provenance (the "winner" from merge resolution)
                prov = await provenance_repo.get_by_subject_key(
                    FieldProvenanceRecord.compute_subject_key(
                        "entity", entity_id, None, field_path
                    )
                )
                if prov and prov.current_value is not None:
                    source_values[prov.source_id] = SourceSnapshot(
                        value=prov.current_value,
                        trust=prov.source_trust,
                        received_at=prov.created_at,
                    )

                # 2. All mutations in this batch for this field
                #    (includes rejected/overridden values from lower-trust sources)
                mutations = await mutation_repo.get_by_batch_and_field(
                    batch_id, entity_id, field_path
                )
                # Oldest first, so each source ends up with its latest proposed value
                for m in sorted(mutations, key=lambda m: m.created_at):
                    if m.proposed_value is None:
                        continue  # Section 4.2: only non-null values count
                    source_values[m.source_id] = SourceSnapshot(
                        value=m.proposed_value,
                        trust=m.source_trust,
                        received_at=m.created_at,
                    )

                # 3. Check for disagreement. Values may be JSON objects or arrays
                #    (unhashable), so compare their canonical JSON encoding.
                unique_values = {
                    json.dumps(sv.value, sort_keys=True, default=str)
                    for sv in source_values.values()
                }
                if len(unique_values) > 1:
                    severity = self._compute_severity(source_values)
                    disagreements.append(
                        DisagreementRecord(
                            entity_id=entity_id,
                            field_path=field_path,
                            source_snapshots=source_values,
                            severity=severity,
                            accepted_value=prov.current_value if prov else None,
                            accepted_source=prov.source_id if prov else None,
                        )
                    )
            return disagreements
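The detector refers to SourceSnapshot and DisagreementRecord without defining them. A minimal sketch of what these could look like follows, using only the fields the algorithm reads or writes. The dataclass layout, the source ids, and the example values are assumptions; the real models may carry more columns (for example tenant_id or batch_id).

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID, uuid4


@dataclass
class SourceSnapshot:
    """One source's latest value for a field (hypothetical layout)."""
    value: Any
    trust: float
    received_at: datetime


@dataclass
class DisagreementRecord:
    """Field-level disagreement state (hypothetical layout)."""
    entity_id: UUID
    field_path: str
    source_snapshots: dict[str, SourceSnapshot]
    severity: str
    accepted_value: Optional[Any] = None
    accepted_source: Optional[str] = None


# The incorporation_date example from Section 1, with made-up ids and trust scores.
now = datetime.now(timezone.utc)
record = DisagreementRecord(
    entity_id=uuid4(),
    field_path="incorporation_date",
    source_snapshots={
        "source_a": SourceSnapshot("2019-03-15", 0.9, now),
        "source_b": SourceSnapshot("2019-04-01", 0.8, now),
    },
    severity="critical",
    accepted_value="2019-03-15",
    accepted_source="source_a",
)
```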
4.7 Relationship to existing conflict flow
The disagreement detector does not replace the existing conflict detector. They serve different purposes:
- ConflictDetector.detect() runs per-mutation, inline in the pipeline. It decides whether to trigger conflict_response handling (accept_trusted, flag_review, freeze_investigate) for that specific mutation.
- DisagreementDetector.detect() runs post-batch, outside the per-mutation loop. It provides a field-level view of source agreement that feeds the analyst dashboard and compliance reporting.
If a disagreement is detected and the field's conflict_response is freeze_investigate, the existing per-mutation conflict handler will have already frozen the field. The disagreement record provides additional context: which sources disagree, their trust levels, and severity classification.
5. Missing Data Enforcement
5.1 When it runs
After a batch completes, the missing data monitor checks whether required fields for the affected entity types have coverage.
5.2 What it checks
For each entity touched in the batch:
- Load the published schema version's field configurations for that entity type.
- Filter to fields where is_required = True.
- For each required field, check if a provenance record exists with a non-null current_value.
- If not, create a DataGapAlert.
5.3 Alert lifecycle
A data-gap alert is created when a required field has no value after a batch processes. It is auto-resolved when a subsequent batch provides a value for that field. Analysts can also manually dismiss an alert with justification (e.g., "field not applicable for this entity subtype").
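The lifecycle amounts to a small state machine with two exits from the open state. The sketch below is illustrative only: GapAlert is a hypothetical in-memory stand-in for a data_gap_alerts row, and rejecting an empty justification is an assumption rather than a stated requirement.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class GapAlert:  # hypothetical stand-in for a data_gap_alerts row
    field_path: str
    status: str = "open"
    dismiss_justification: Optional[str] = None


def on_value_received(alert: GapAlert) -> None:
    """Auto-resolve: a later batch supplied a value for the field."""
    if alert.status == "open":
        alert.status = "resolved"


def dismiss(alert: GapAlert, justification: str) -> None:
    """Manual dismissal by an analyst (assumed to require a non-empty justification)."""
    if not justification.strip():
        raise ValueError("dismissal requires a justification")
    if alert.status != "open":
        raise ValueError(f"cannot dismiss an alert in status {alert.status!r}")
    alert.status = "dismissed"
    alert.dismiss_justification = justification


auto = GapAlert("date_of_birth")
on_value_received(auto)

manual = GapAlert("registration_number")
dismiss(manual, "field not applicable for this entity subtype")
```

After this runs, `auto.status` is `"resolved"` and `manual.status` is `"dismissed"`.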
5.4 Alert vs. evaluation
The existing evaluation engine runs on-demand and produces a comprehensive coverage report across all entities and all fields. The missing data monitor is complementary: it runs automatically after every batch and only flags required fields with zero coverage for entities that were just processed.
| Concern | Evaluation Engine | Missing Data Monitor |
|---|---|---|
| Trigger | Manual / scheduled | Automatic per-batch |
| Scope | All entities, all fields | Entities in batch, required fields only |
| Output | Coverage metrics, scores | Actionable alerts with analyst routing |
| Latency | Minutes (full scan) | Sub-second (targeted check) |
5.5 Algorithm
    from uuid import UUID

    # FieldConfigRepo, ProvenanceRepo, FieldProvenanceRecord and DataGapAlert are
    # project types defined elsewhere (imports omitted). entity_type_for() is
    # assumed to be an existing helper that resolves an entity's type.


    class MissingDataMonitor:
        """Post-batch hook: check required field coverage for affected entities."""

        async def check(
            self,
            entity_ids: set[UUID],
            schema_version_id: UUID,
            field_config_repo: FieldConfigRepo,
            provenance_repo: ProvenanceRepo,
        ) -> list[DataGapAlert]:
            alerts = []
            # Required-field configs are loaded once per schema version
            configs = await field_config_repo.get_required_fields(schema_version_id)
            for entity_id in entity_ids:
                entity_type = entity_type_for(entity_id)  # resolve once per entity
                for cfg in configs:
                    if cfg["entity_type"] != entity_type:
                        continue
                    subject_key = FieldProvenanceRecord.compute_subject_key(
                        "entity", entity_id, None, cfg["field_name"]
                    )
                    prov = await provenance_repo.get_by_subject_key(subject_key)
                    # A gap is either no provenance record or a null current value
                    if prov is None or prov.current_value is None:
                        alerts.append(
                            DataGapAlert(
                                entity_id=entity_id,
                                field_path=cfg["field_name"],
                                entity_type=cfg["entity_type"],
                                severity="critical" if cfg.get("is_identity_key") else "high",
                                schema_version_id=schema_version_id,
                            )
                        )
            return alerts
5.6 Severity for missing data
- critical: The field is both is_required and is_identity_key. Identity fields (name, registration number, date of birth) are essential for entity resolution and KYC.
- high: The field is is_required but not an identity key. Important for compliance but not for entity resolution.
6. Integration with Existing Pipeline
6.1 Hook point in MutationProcessor
The post-batch hooks run after the per-mutation loop in process_provider_batch() completes, before the method returns its result summary.
Current flow (processor.py):
    for mutation in mutations:
        provenance lookup -> merge resolve -> conflict detect -> handle -> enqueue -> provenance upsert
    return result
New flow:
    for mutation in mutations:
        provenance lookup -> merge resolve -> conflict detect -> handle -> enqueue -> provenance upsert
    # Post-batch hooks (new)
    disagreements = await disagreement_detector.detect(batch_id, affected_fields, ...)
    data_gaps = await missing_data_monitor.check(entity_ids, schema_version_id, ...)
    # Persist and route
    await disagreement_repo.bulk_create(disagreements)
    await data_gap_repo.bulk_create_or_update(data_gaps)
    await alert_router.route(disagreements, data_gaps)
    return result
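The hooks take affected_fields and entity_ids as inputs, which the flow above does not show being built. One way to derive both from the batch's mutations is sketched here. The Mutation shape and the collect_hook_inputs name are hypothetical; only the (entity_id, field_path) pairing comes from Section 4.2.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class Mutation:  # hypothetical minimal shape; real mutations carry more fields
    entity_id: UUID
    field_path: str


def collect_hook_inputs(
    mutations: list[Mutation],
) -> tuple[list[tuple[UUID, str]], set[UUID]]:
    """Return unique (entity_id, field_path) pairs in first-seen order, plus entity ids."""
    # dict.fromkeys de-duplicates while preserving insertion order.
    affected_fields = list(dict.fromkeys((m.entity_id, m.field_path) for m in mutations))
    entity_ids = {entity_id for entity_id, _ in affected_fields}
    return affected_fields, entity_ids


e1, e2 = uuid4(), uuid4()
batch = [
    Mutation(e1, "incorporation_date"),  # source A
    Mutation(e1, "incorporation_date"),  # source B, same field
    Mutation(e2, "registration_number"),
]
affected_fields, entity_ids = collect_hook_inputs(batch)
```

Three mutations collapse to two (entity, field) pairs, which is why the hook cost scales with unique pairs rather than with mutation count.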
6.2 Alert routing
Both disagreement records and data-gap alerts feed into the existing analyst task system (ADR-013). The alert router determines:
- Whether to create a new review task or append to an existing investigation.
- Which analyst role receives the task (based on entity type and field sensitivity).
- Whether to send a real-time notification (WebSocket push to the Analyst Dashboard).
6.3 Performance considerations
The post-batch hooks add queries proportional to the number of unique (entity, field) pairs in the batch, not the total mutation count. For a typical batch of 50-200 mutations across 10-30 entities, this adds 30-100 provenance lookups. These are indexed by subject_key and complete in under 1ms each.
The missing data check loads field configs once per schema version (cached in the processor) and does one provenance lookup per required field per entity. With 15-20 required fields and 10-30 entities, this is 150-600 lookups, still well under 1 second.
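The lookup estimate for the missing data check can be reproduced with a few lines of arithmetic. The 1 ms figure is treated as an upper bound per lookup and lookups are assumed to run sequentially; both are simplifications for this back-of-envelope sketch.

```python
LOOKUP_MS_UPPER_BOUND = 1  # "under 1ms each", taken as a ceiling


def missing_data_lookups(entities: int, required_fields: int) -> int:
    """One provenance lookup per required field per entity."""
    return entities * required_fields


best_case = missing_data_lookups(entities=10, required_fields=15)
worst_case = missing_data_lookups(entities=30, required_fields=20)
worst_case_ms = worst_case * LOOKUP_MS_UPPER_BOUND
```

This gives 150 lookups at the low end and 600 at the high end, so at most about 600 ms even when every lookup runs one after another.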
7. Data Model Changes
7.1 New table: source_disagreements
    CREATE TABLE source_disagreements (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        tenant_id UUID NOT NULL REFERENCES tenants(id),
        entity_id UUID NOT NULL,
        field_path TEXT NOT NULL,
        batch_id UUID REFERENCES mutation_batches(id),
        severity TEXT NOT NULL CHECK (severity IN ('critical', 'high', 'medium', 'low')),
        source_snapshots JSONB NOT NULL, -- { source_id: { value, trust, received_at } }
        accepted_value JSONB,
        accepted_source TEXT,
        status TEXT NOT NULL DEFAULT 'open' CHECK (status IN ('open', 'resolved', 'dismissed')),
        resolved_at TIMESTAMPTZ,
        resolved_by TEXT,
        resolved_value JSONB,
        resolution_justification TEXT,
        investigation_id UUID,
        created_at TIMESTAMPTZ NOT NULL DEFAULT now()
    );
    -- At most one open disagreement per (tenant, entity, field). PostgreSQL does not
    -- accept a WHERE clause on an inline UNIQUE constraint, so this is declared as a
    -- partial unique index instead.
    CREATE UNIQUE INDEX uq_disagreements_open
        ON source_disagreements (tenant_id, entity_id, field_path)
        WHERE status = 'open';
    CREATE INDEX idx_disagreements_entity ON source_disagreements (entity_id, field_path);
    CREATE INDEX idx_disagreements_status ON source_disagreements (tenant_id, status, severity);
7.2 New table: data_gap_alerts
    CREATE TABLE data_gap_alerts (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        tenant_id UUID NOT NULL REFERENCES tenants(id),
        entity_id UUID NOT NULL,
        entity_type TEXT NOT NULL,
        field_path TEXT NOT NULL,
        schema_version_id UUID NOT NULL REFERENCES ontology_schema_versions(id),
        severity TEXT NOT NULL CHECK (severity IN ('critical', 'high')),
        status TEXT NOT NULL DEFAULT 'open' CHECK (status IN ('open', 'resolved', 'dismissed')),
        resolved_at TIMESTAMPTZ,
        resolved_by TEXT,
        dismiss_justification TEXT,
        created_at TIMESTAMPTZ NOT NULL DEFAULT now()
    );
    -- At most one open alert per (tenant, entity, field), declared as a partial
    -- unique index because an inline UNIQUE constraint cannot take a WHERE clause.
    CREATE UNIQUE INDEX uq_data_gaps_open
        ON data_gap_alerts (tenant_id, entity_id, field_path)
        WHERE status = 'open';
    CREATE INDEX idx_data_gaps_entity ON data_gap_alerts (entity_id);
    CREATE INDEX idx_data_gaps_status ON data_gap_alerts (tenant_id, status, severity);
7.3 Partial unique indexes
Both tables use partial unique indexes on status = 'open' to ensure that only one open disagreement or data-gap alert exists per tenant, entity, and field. Once a record is resolved or dismissed, a new open record for the same field can be created without violating the constraint.
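The behavior of a partial unique index can be demonstrated with a small runnable sketch. It uses SQLite from the Python standard library as a stand-in for PostgreSQL (both support partial unique indexes), with a trimmed-down table, simplified column types, and a hypothetical index name and sample values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE data_gap_alerts (
        id INTEGER PRIMARY KEY,
        tenant_id TEXT NOT NULL,
        entity_id TEXT NOT NULL,
        field_path TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'open'
    );
    -- Uniqueness is enforced only among rows where status = 'open'.
    CREATE UNIQUE INDEX uq_data_gaps_open
        ON data_gap_alerts (tenant_id, entity_id, field_path)
        WHERE status = 'open';
    """
)


def open_alert(db: sqlite3.Connection) -> bool:
    """Try to open an alert; return False if one is already open for the field."""
    try:
        db.execute(
            "INSERT INTO data_gap_alerts (tenant_id, entity_id, field_path) "
            "VALUES ('t1', 'e1', 'date_of_birth')"
        )
        return True
    except sqlite3.IntegrityError:
        return False


first = open_alert(conn)       # no open alert yet
duplicate = open_alert(conn)   # rejected: an open alert already exists
conn.execute("UPDATE data_gap_alerts SET status = 'resolved' WHERE status = 'open'")
reopened = open_alert(conn)    # allowed again once the earlier alert is resolved
```

The second insert fails while the first alert is open, and a new alert can be opened once the earlier one has left the open state, leaving one resolved row and one open row for the same field.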
8. API Surface
8.1 Disagreements
    GET /api/disagreements?entity_id={id}&status=open
    GET /api/disagreements/{id}
    POST /api/disagreements/{id}/resolve
        Body: { resolved_value, resolution_justification, resolved_by }
    POST /api/disagreements/{id}/dismiss
        Body: { justification, dismissed_by }
8.2 Data Gap Alerts
    GET /api/data-gaps?entity_id={id}&status=open
    GET /api/data-gaps?entity_type={type}&status=open&severity=critical
    POST /api/data-gaps/{id}/dismiss
        Body: { justification, dismissed_by }
Data-gap alerts are auto-resolved when a subsequent mutation provides a value for the field, so there is no manual resolve endpoint.
8.3 Dashboard aggregates
    GET /api/compliance-health?tenant_id={id}
    Response: {
        open_disagreements: { critical: N, high: N, medium: N, low: N },
        open_data_gaps: { critical: N, high: N },
        entities_with_issues: N,
        fields_fully_covered: N,
        fields_total: N
    }
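A sketch of how the compliance-health response could be assembled from the two tables is shown below. Rows are modeled as plain dicts, and entities_with_issues is assumed to mean distinct entities with at least one open disagreement or open data gap. The function name and that interpretation are not confirmed by this ADR.

```python
from collections import Counter
from typing import Iterable


def compliance_health(
    disagreements: Iterable[dict],
    data_gaps: Iterable[dict],
    fields_fully_covered: int,
    fields_total: int,
) -> dict:
    """Build the dashboard aggregate from disagreement and data-gap rows."""
    open_d = [d for d in disagreements if d["status"] == "open"]
    open_g = [g for g in data_gaps if g["status"] == "open"]
    d_counts = Counter(d["severity"] for d in open_d)
    g_counts = Counter(g["severity"] for g in open_g)
    return {
        # Counter returns 0 for severities that have no open rows.
        "open_disagreements": {s: d_counts[s] for s in ("critical", "high", "medium", "low")},
        "open_data_gaps": {s: g_counts[s] for s in ("critical", "high")},
        "entities_with_issues": len({r["entity_id"] for r in open_d + open_g}),
        "fields_fully_covered": fields_fully_covered,
        "fields_total": fields_total,
    }


health = compliance_health(
    disagreements=[
        {"entity_id": "e1", "severity": "critical", "status": "open"},
        {"entity_id": "e2", "severity": "low", "status": "resolved"},
    ],
    data_gaps=[
        {"entity_id": "e1", "severity": "high", "status": "open"},
        {"entity_id": "e3", "severity": "critical", "status": "open"},
    ],
    fields_fully_covered=18,
    fields_total=20,
)
```

In this made-up example, entity e1 appears in both lists but is counted once, so `entities_with_issues` is 2.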
9. Migration Path
Phase 1: Data model and detection (backend)
- Create migration for source_disagreements and data_gap_alerts tables.
- Implement DisagreementDetector and MissingDataMonitor classes.
- Add repository classes for both tables.
- Wire post-batch hooks into MutationProcessor.process_batch().
- Add API endpoints for querying and resolving.
Phase 2: Alert routing and notifications
- Extend the existing review task system to accept disagreement and data-gap sources.
- Add WebSocket notifications for real-time analyst alerts.
- Wire alert routing into the post-batch hook chain.
Phase 3: Analyst Dashboard integration (frontend)
- Add a "Data Quality" panel to the Analyst Dashboard showing open disagreements and data gaps.
- Disagreement detail view: show all source values side by side with trust scores, allow analyst to pick the canonical value.
- Data-gap view: show required fields with no coverage, grouped by entity, with links to trigger data refresh from specific sources.
Phase 4: Compliance reporting
- Add disagreement and data-gap history to the compliance audit report.
- Include time-to-resolution metrics for disagreements.
- Add data-gap trend analysis (are required fields getting coverage over time?).
Estimated effort
Phase 1: 2-3 days. Phase 2: 1-2 days. Phase 3: 2-3 days. Phase 4: 1-2 days. Total: 6-10 days (~8 at the midpoint).
10. Rejected Alternatives
Inline disagreement detection (per-mutation)
Checking all source values inside the per-mutation loop would require loading provenance history for every mutation, turning an O(1) provenance lookup into an O(S) query where S is the number of sources. This adds latency to every mutation and conflates per-mutation conflict handling with field-level disagreement analysis. Post-batch detection keeps the per-mutation path fast and separates concerns.
Blocking mutations on missing required fields
An alternative design would reject mutations for an entity if required fields are still missing, forcing data providers to supply complete records. Rejected because: (a) data providers often supply partial records that get enriched over time, (b) blocking mutations would prevent any data from entering the system until all required fields are available, creating a chicken-and-egg problem, and (c) the compliance requirement is visibility into gaps, not blocking data flow.
Extending ConflictRecord for disagreements
Rather than a new table, we could add a disagreement_type column to conflict_records. Rejected because conflicts and disagreements have different lifecycles (per-mutation vs. per-field), different resolution semantics (one resolver vs. aggregate consensus), and different query patterns (dashboard aggregates vs. investigation detail). Combining them would complicate queries and make the model harder to reason about.
Relying on evaluation runs for gap detection
The evaluation engine already reports required_missing fields. We could make evaluations run automatically after each batch. Rejected because evaluation runs are heavyweight (they scan all entities, not just the batch), and the existing evaluation model produces metrics, not actionable alerts. The missing data monitor is a lightweight, targeted check that feeds directly into the analyst task system.
11. Open Questions
- Should disagreement detection consider historical values? Currently it only looks at the latest value per source. If Source A previously agreed with Source B but then updated its value, the disagreement is between Source A's new value and Source B's old value. Should the system track this progression?
- Should data-gap alerts be scoped to published schema versions only? Draft schemas may have required fields that are aspirational. Alerting on them would create noise. Current proposal: only enforce required fields from the published schema version.
- Should the disagreement severity thresholds be global or per-field? Some fields (e.g., beneficial_owner_name) may warrant a higher sensitivity than others (e.g., phone_number). Per-field thresholds add configuration complexity.
- Should the system auto-dismiss data-gap alerts when a schema version is archived? If the schema is no longer active, its required fields are moot. But audit trail requirements may demand keeping the historical record.
- Batch size limits for post-batch hooks. If a batch contains 10,000 mutations across 5,000 entities, the post-batch hooks could be expensive. Should there be a threshold above which the hooks run asynchronously via a background job instead of inline?