
ADR-017: Multi-Source Disagreement Detection & Missing Data Enforcement

Status: Draft (not published)
Date: 2026-03-31
Author: Atlas Architecture
Depends on: ADR-010 (Ontology Lifecycle), ADR-013 (Analyst Interaction Layer)
Impacts: Mutation Queue pipeline, Conflict Detection, Evaluation Engine, Analyst Dashboard


Table of Contents

  1. Context & Problem Statement
  2. Decision Summary
  3. Gap Analysis
  4. Multi-Source Disagreement Detection
  5. Missing Data Enforcement
  6. Integration with Existing Pipeline
  7. Data Model Changes
  8. API Surface
  9. Migration Path
  10. Rejected Alternatives
  11. Open Questions

1. Context & Problem Statement

The core value proposition of connecting multiple data sources to a single ontology field is cross-validation: if two independent providers agree on a customer's incorporation date, that strengthens confidence. If they disagree, an analyst needs to investigate before the data enters the compliance record.

The current mutation pipeline (ADR-010, Section 8) processes mutations sequentially per source. Each incoming value is compared against the current provenance record (the last accepted value). This design handles the common case well but has two critical gaps for a compliance product.

Gap 1: Multi-source disagreement is invisible

When Source A writes incorporation_date = 2019-03-15 and Source B later writes incorporation_date = 2019-04-01, the conflict detector sees "proposed value differs from current value" and fires a conflict. But the system does not explicitly model that this is a source-to-source disagreement. It looks identical to a single source updating its own previously-submitted value.

This matters because the compliance response should be different. A source updating its own prior value is routine (data correction). Two independent sources providing contradictory values is a data integrity concern that may require investigation across both providers.

The current ConflictRecord captures current_source and incoming_source, so an analyst can see who disagrees. But there is no automated detection that says "these N sources provided different values for this field." The system cannot answer: "For entity X, field Y, how many sources agree vs. disagree?" without reconstructing it from mutation history.

Gap 2: Missing data on required fields has no runtime consequence

The is_required flag on field configurations is only consumed by the evaluation engine (evaluation.py). When a required field has no data from any source, the evaluation run reports it as required_missing in the coverage metrics. But the runtime pipeline (mutation processor, conflict handler) does not enforce it.

This means a schema version can be published and used in production with required fields that have zero coverage, and no alert is raised until the next manual evaluation run. For AML/KYC compliance, a missing required field (e.g., date_of_birth for an individual, registration_number for a company) should trigger a proactive data-gap alert, not wait for a scheduled evaluation.


2. Decision Summary

Add two new capabilities to the mutation pipeline, both running as post-batch hooks after all mutations in a batch have been processed:

  1. Disagreement Detector: After a batch completes, compare all source values for each affected field. If two or more sources have provided different values and the field's conflict_response is not accept_trusted, flag a multi-source disagreement that triggers investigation.

  2. Missing Data Monitor: After a batch completes, check all is_required fields for the affected entity types. If any required field has no provenance record (no source has ever provided a value), create a data-gap alert.

Both hooks are append-only: they create new records (disagreement alerts, data-gap alerts) without modifying existing provenance or mutation records. They run after the per-mutation pipeline has completed its work, so they cannot interfere with the existing merge/conflict flow.


3. Gap Analysis

What the current pipeline does well

The existing per-mutation flow handles:

  • Sequential conflict detection (conflict_detector.py): catches when a new value differs from the current provenance value, including frozen field protection.
  • Trust-based resolution (merge_executor.py): the highest_trust strategy correctly selects the most trusted source's value when multiple sources have written to the same field.
  • Conflict response handlers (conflict_handler.py): accept_trusted auto-resolves with logging, flag_review creates analyst tasks, freeze_investigate freezes the field and triggers investigation.
  • Provenance tracking: every accepted value records its source, trust score, and mutation ID.

What is missing

| Capability | Current State | Gap |
| --- | --- | --- |
| Source-vs-source comparison | Only proposed-vs-current (sequential) | No batch-level cross-source view |
| Disagreement classification | All conflicts look the same | Cannot distinguish "source updated itself" from "two sources disagree" |
| Disagreement resolution tracking | ConflictRecord tracks pairwise resolution | No aggregate "all sources now agree" signal |
| Required field enforcement at runtime | is_required only used in evaluation metrics | No alert when batch completes with required fields still empty |
| Data-gap alerting | Manual evaluation runs only | No proactive notification to analysts |

4. Multi-Source Disagreement Detection

4.1 When it runs

After all mutations in a batch have been processed (merged, conflict-detected, provenance-upserted), the disagreement detector runs as a post-batch hook inside MutationProcessor.process_batch().

4.2 What it checks

For each unique (entity_id, field_path) touched in the batch:

  1. Query all mutations for this field across all sources in the batch, plus the current provenance record.
  2. Group distinct values by source. Each source contributes its latest proposed value (or the provenance value if the source wasn't in this batch but has a prior provenance record).
  3. If two or more sources have provided non-null values and those values differ, this is a multi-source disagreement.
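
Steps 2 and 3 can be illustrated in memory. This is a minimal sketch rather than the pipeline's implementation: `Observation`, `latest_per_source`, and `group_sources_by_value` are hypothetical names, null values are skipped as an assumption about how "non-null" in step 3 is applied, and values are assumed to be hashable.

```python
from datetime import datetime
from typing import NamedTuple, Optional


class Observation(NamedTuple):
    """One value a source has provided for a single (entity_id, field_path)."""
    source_id: str
    value: Optional[str]
    received_at: datetime


def latest_per_source(observations):
    """Keep only the most recent non-null observation from each source."""
    latest = {}
    for obs in observations:
        if obs.value is None:
            continue  # assumption: nulls never count towards a disagreement
        current = latest.get(obs.source_id)
        if current is None or obs.received_at > current.received_at:
            latest[obs.source_id] = obs
    return latest


def group_sources_by_value(observations):
    """Map each distinct latest value to the sorted sources that back it."""
    groups = {}
    for obs in latest_per_source(observations).values():
        groups.setdefault(obs.value, []).append(obs.source_id)
    return {value: sorted(sources) for value, sources in groups.items()}


observations = [
    Observation("source_a", "2019-03-01", datetime(2026, 1, 5)),  # superseded
    Observation("source_a", "2019-03-15", datetime(2026, 2, 1)),
    Observation("source_b", "2019-04-01", datetime(2026, 2, 3)),
    Observation("source_c", "2019-03-15", datetime(2026, 2, 4)),
]
groups = group_sources_by_value(observations)
has_disagreement = len(groups) > 1  # more than one distinct value
```

The grouping also answers the question raised under Gap 1: the size of each group is the number of sources backing that value.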

4.3 Disagreement vs. conflict

A conflict (existing ConflictRecord) is a per-mutation event: "this specific incoming value differs from the current value." A disagreement is a field-level state: "the sources that have contributed to this field do not agree on its value."

One disagreement can be caused by multiple conflicts, or by zero conflicts (if the merge strategy auto-resolved each mutation but the underlying source values still differ). The disagreement detector looks at the final state, not the per-mutation events.

4.4 Resolution semantics

A disagreement is resolved when one of these conditions is met:

  • All sources agree (a subsequent batch brings the values into alignment).
  • An analyst manually resolves the disagreement, choosing a canonical value and recording justification.
  • The field is frozen for investigation, which implicitly acknowledges the disagreement and blocks further changes until resolved.

4.5 Trust-weighted disagreement severity

Not all disagreements are equal. If a source with trust 0.95 says one thing and a source with trust 0.3 says another, the system has already accepted the high-trust value via the highest_trust strategy. The disagreement still exists, but its severity is lower because the accepted value comes from the most reliable source.

Severity levels:

  • critical: Two or more sources with trust >= 0.7 disagree. Both are considered reliable, so the contradiction demands investigation.
  • high: The accepted (highest-trust) source has trust >= 0.7, but a source with trust 0.4-0.69 disagrees. Worth reviewing but less urgent.
  • medium: All disagreeing sources have trust < 0.7. The data quality is uncertain across the board.
  • low: Only one source has trust >= 0.4, and the disagreeing source has trust < 0.4. Likely a low-quality source providing noise.

The severity thresholds are configurable per tenant.
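
As written, the four levels can overlap (for example, a 0.5 vs. 0.3 pair satisfies both medium and low), so an implementation needs an evaluation order. The sketch below is one possible reading, not the definitive `_compute_severity`: it assumes the accepted source is the highest-trust one, checks critical, then high, then low, and falls back to medium. `Snapshot`, `classify_severity`, and the parameter names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    value: object
    trust: float


def classify_severity(snapshots, high_trust=0.7, low_trust=0.4):
    """Classify a disagreement from a {source_id: Snapshot} mapping.

    Assumption: the accepted value belongs to the highest-trust source,
    matching the highest_trust merge strategy.
    """
    accepted = max(snapshots.values(), key=lambda s: s.trust)
    dissenting = [s.trust for s in snapshots.values() if s.value != accepted.value]
    if not dissenting:
        raise ValueError("all sources agree; there is no disagreement to classify")

    strongest_dissent = max(dissenting)
    if accepted.trust >= high_trust and strongest_dissent >= high_trust:
        return "critical"  # two reliable sources contradict each other
    if accepted.trust >= high_trust and strongest_dissent >= low_trust:
        return "high"      # reliable winner, mid-trust dissenter
    if accepted.trust >= low_trust and strongest_dissent < low_trust:
        return "low"       # every dissenter is a low-trust source
    return "medium"        # no reliable source on either side


example = {
    "source_a": Snapshot("2019-03-15", 0.95),
    "source_b": Snapshot("2019-04-01", 0.30),
}
severity = classify_severity(example)
```

Passing the thresholds as parameters keeps the per-tenant configuration mentioned above a matter of supplying different arguments.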

4.6 Algorithm

from uuid import UUID

# ProvenanceRepo, MutationRepo, FieldProvenanceRecord, SourceSnapshot and
# DisagreementRecord are project types, imported from their existing modules.


class DisagreementDetector:
    """Post-batch hook: detect multi-source disagreements for affected fields."""

    async def detect(
        self,
        batch_id: UUID,
        affected_fields: list[tuple[UUID, str]],  # (entity_id, field_path)
        provenance_repo: ProvenanceRepo,
        mutation_repo: MutationRepo,
    ) -> list[DisagreementRecord]:
        disagreements = []

        for entity_id, field_path in affected_fields:
            # Collect one value per source: current provenance + batch mutations
            source_values: dict[str, SourceSnapshot] = {}

            # 1. Current provenance (the "winner" from merge resolution)
            prov = await provenance_repo.get_by_subject_key(
                FieldProvenanceRecord.compute_subject_key(
                    "entity", entity_id, None, field_path
                )
            )
            if prov and prov.current_value is not None:
                source_values[prov.source_id] = SourceSnapshot(
                    value=prov.current_value,
                    trust=prov.source_trust,
                    received_at=prov.created_at,
                )

            # 2. All mutations in this batch for this field
            #    (includes rejected/overridden values from lower-trust sources).
            #    Assumes the repo returns mutations oldest-first, so a later
            #    mutation from the same source replaces its earlier one.
            mutations = await mutation_repo.get_by_batch_and_field(
                batch_id, entity_id, field_path
            )
            for m in mutations:
                if m.proposed_value is None:
                    continue  # Section 4.2, step 3: only non-null values count
                source_values[m.source_id] = SourceSnapshot(
                    value=m.proposed_value,
                    trust=m.source_trust,
                    received_at=m.created_at,
                )

            # 3. Check for disagreement (the set requires hashable values)
            unique_values = {sv.value for sv in source_values.values()}
            if len(unique_values) > 1:
                severity = self._compute_severity(source_values)
                disagreements.append(
                    DisagreementRecord(
                        entity_id=entity_id,
                        field_path=field_path,
                        source_snapshots=source_values,
                        severity=severity,
                        accepted_value=prov.current_value if prov else None,
                        accepted_source=prov.source_id if prov else None,
                    )
                )

        return disagreements
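
Step 3 builds a set of values, which only works when every value is hashable. Since accepted_value is stored as JSONB (Section 7.1), a proposed value could plausibly be a dict or list, and the set comprehension would then raise TypeError. One way to handle this, sketched here under the assumption that all values are JSON-serializable, is to compare a canonical string form instead. `canonical` and `distinct_values` are hypothetical helpers, not part of the existing codebase.

```python
import json


def canonical(value):
    """Serialize a JSON-compatible value so equal structures compare equal.

    sort_keys makes dict key order irrelevant. Note that 1 and 1.0 still
    serialize differently, so numeric normalization is out of scope here.
    """
    return json.dumps(value, sort_keys=True, separators=(",", ":"))


def distinct_values(values):
    """Return the set of canonical forms of all non-null values."""
    return {canonical(v) for v in values if v is not None}


same_address = distinct_values([
    {"city": "Oslo", "zip": "0150"},
    {"zip": "0150", "city": "Oslo"},  # same content, different key order
])
different_dates = distinct_values(["2019-03-15", "2019-04-01", None])
```

A disagreement then corresponds to `len(distinct_values(...)) > 1`, the same condition the algorithm uses on raw values.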

4.7 Relationship to existing conflict flow

The disagreement detector does not replace the existing conflict detector. They serve different purposes:

  • ConflictDetector.detect() runs per-mutation, inline in the pipeline. It decides whether to trigger conflict_response handling (accept_trusted, flag_review, freeze_investigate) for that specific mutation.
  • DisagreementDetector.detect() runs post-batch, outside the per-mutation loop. It provides a field-level view of source agreement that feeds the analyst dashboard and compliance reporting.

If a disagreement is detected and the field's conflict_response is freeze_investigate, the existing per-mutation conflict handler will have already frozen the field. The disagreement record provides additional context: which sources disagree, their trust levels, and severity classification.


5. Missing Data Enforcement

5.1 When it runs

After a batch completes, the missing data monitor checks whether required fields for the affected entity types have coverage.

5.2 What it checks

For each entity touched in the batch:

  1. Load the published schema version's field configurations for that entity type.
  2. Filter to fields where is_required = True.
  3. For each required field, check if a provenance record exists with a non-null current_value.
  4. If not, create a DataGapAlert.

5.3 Alert lifecycle

A data-gap alert is created when a required field has no value after a batch processes. It is auto-resolved when a subsequent batch provides a value for that field. Analysts can also manually dismiss an alert with justification (e.g., "field not applicable for this entity subtype").
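
The create and auto-resolve rules reduce to a small decision table per (entity, required field). The following sketch is illustrative only: `AlertAction` and `reconcile_alert` are hypothetical names, and how previously dismissed alerts should be treated is not specified here, so the sketch leaves that case out.

```python
from enum import Enum


class AlertAction(Enum):
    CREATE = "create"        # field is empty and no open alert exists yet
    RESOLVE = "resolve"      # field now has a value; close the open alert
    KEEP_OPEN = "keep_open"  # field is still empty; alert already open
    NONE = "none"            # field has a value and nothing is open


def reconcile_alert(has_open_alert: bool, field_has_value: bool) -> AlertAction:
    """Decide what the post-batch hook should do for one required field."""
    if field_has_value:
        return AlertAction.RESOLVE if has_open_alert else AlertAction.NONE
    return AlertAction.KEEP_OPEN if has_open_alert else AlertAction.CREATE


# First batch leaves date_of_birth empty, a later batch fills it in.
after_first_batch = reconcile_alert(has_open_alert=False, field_has_value=False)
after_later_batch = reconcile_alert(has_open_alert=True, field_has_value=True)
```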

5.4 Alert vs. evaluation

The existing evaluation engine runs on-demand and produces a comprehensive coverage report across all entities and all fields. The missing data monitor is complementary: it runs automatically after every batch and only flags required fields with zero coverage for entities that were just processed.

| Concern | Evaluation Engine | Missing Data Monitor |
| --- | --- | --- |
| Trigger | Manual / scheduled | Automatic per-batch |
| Scope | All entities, all fields | Entities in batch, required fields only |
| Output | Coverage metrics, scores | Actionable alerts with analyst routing |
| Latency | Minutes (full scan) | Milliseconds (targeted check) |

5.5 Algorithm

class MissingDataMonitor:
    """Post-batch hook: check required field coverage for affected entities."""

    async def check(
        self,
        entity_ids: set[UUID],
        schema_version_id: UUID,
        field_config_repo: FieldConfigRepo,
        provenance_repo: ProvenanceRepo,
    ) -> list[DataGapAlert]:
        alerts = []
        # Required-field configs for the schema version, loaded once per call
        configs = await field_config_repo.get_required_fields(schema_version_id)

        for entity_id in entity_ids:
            # entity_type_for() is not defined in this ADR; it is expected to
            # resolve an entity ID to its entity type.
            entity_type = entity_type_for(entity_id)

            for cfg in configs:
                if cfg["entity_type"] != entity_type:
                    continue

                subject_key = FieldProvenanceRecord.compute_subject_key(
                    "entity", entity_id, None, cfg["field_name"]
                )
                prov = await provenance_repo.get_by_subject_key(subject_key)

                # No record, or a record with a null value, counts as a gap
                if prov is None or prov.current_value is None:
                    alerts.append(
                        DataGapAlert(
                            entity_id=entity_id,
                            field_path=cfg["field_name"],
                            entity_type=cfg["entity_type"],
                            severity="critical" if cfg.get("is_identity_key") else "high",
                            schema_version_id=schema_version_id,
                        )
                    )

        return alerts

5.6 Severity for missing data

  • critical: The field is both is_required and is_identity_key. Identity fields (name, registration number, date of birth) are essential for entity resolution and KYC.
  • high: The field is is_required but not an identity key. Important for compliance but not for entity resolution.

6. Integration with Existing Pipeline

6.1 Hook point in MutationProcessor

The post-batch hooks run after the per-mutation loop in process_provider_batch() completes, before the method returns its result summary.

Current flow (processor.py):

    for mutation in mutations:
        provenance lookup -> merge resolve -> conflict detect -> handle -> enqueue -> provenance upsert
    return result

New flow:

    for mutation in mutations:
        provenance lookup -> merge resolve -> conflict detect -> handle -> enqueue -> provenance upsert

    # Post-batch hooks (new)
    disagreements = await disagreement_detector.detect(batch_id, affected_fields, ...)
    data_gaps = await missing_data_monitor.check(entity_ids, schema_version_id, ...)

    # Persist and route
    await disagreement_repo.bulk_create(disagreements)
    await data_gap_repo.bulk_create_or_update(data_gaps)
    await alert_router.route(disagreements, data_gaps)

    return result

6.2 Alert routing

Both disagreement records and data-gap alerts feed into the existing analyst task system (ADR-013). The alert router determines:

  • Whether to create a new review task or append to an existing investigation.
  • Which analyst role receives the task (based on entity type and field sensitivity).
  • Whether to send a real-time notification (WebSocket push to the Analyst Dashboard).

6.3 Performance considerations

The post-batch hooks add queries proportional to the number of unique (entity, field) pairs in the batch, not the total mutation count. For a typical batch of 50-200 mutations across 10-30 entities, this adds 30-100 provenance lookups. These are indexed by subject_key and complete in under 1ms each.

The missing data check loads field configs once per schema version (cached in the processor) and does one provenance lookup per required field per entity. With 15-20 required fields and 10-30 entities, this is 150-600 lookups, still well under 1 second.
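
The lookup counts above follow from multiplying required fields by entities. A short worked calculation, assuming lookups run sequentially and taking the 1 ms figure as an upper bound per lookup (`lookup_range` and `worst_case_seconds` are hypothetical helper names):

```python
def lookup_range(required_fields, entities):
    """Return (min, max) provenance lookups for (lo, hi) field and entity counts."""
    (fields_lo, fields_hi), (entities_lo, entities_hi) = required_fields, entities
    return fields_lo * entities_lo, fields_hi * entities_hi


def worst_case_seconds(lookups, per_lookup_ms=1.0):
    """Upper bound on total time if every lookup takes per_lookup_ms, one at a time."""
    return lookups * per_lookup_ms / 1000.0


# 15-20 required fields, 10-30 entities per batch
min_lookups, max_lookups = lookup_range((15, 20), (10, 30))
upper_bound = worst_case_seconds(max_lookups)
```

The same two functions can be applied to larger batches to see where a sequential, inline check stops being cheap.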


7. Data Model Changes

7.1 New table: source_disagreements

CREATE TABLE source_disagreements (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id),
    entity_id UUID NOT NULL,
    field_path TEXT NOT NULL,
    batch_id UUID REFERENCES mutation_batches(id),
    severity TEXT NOT NULL CHECK (severity IN ('critical', 'high', 'medium', 'low')),
    source_snapshots JSONB NOT NULL, -- { source_id: { value, trust, received_at } }
    accepted_value JSONB,
    accepted_source TEXT,
    status TEXT NOT NULL DEFAULT 'open' CHECK (status IN ('open', 'resolved', 'dismissed')),
    resolved_at TIMESTAMPTZ,
    resolved_by TEXT,
    resolved_value JSONB,
    resolution_justification TEXT,
    investigation_id UUID,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- At most one open disagreement per (tenant, entity, field). PostgreSQL does not
-- accept a WHERE clause on a table-level UNIQUE constraint, so this is declared
-- as a partial unique index.
CREATE UNIQUE INDEX idx_disagreements_one_open
    ON source_disagreements (tenant_id, entity_id, field_path)
    WHERE status = 'open';

CREATE INDEX idx_disagreements_entity ON source_disagreements (entity_id, field_path);
CREATE INDEX idx_disagreements_status ON source_disagreements (tenant_id, status, severity);

7.2 New table: data_gap_alerts

CREATE TABLE data_gap_alerts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id),
    entity_id UUID NOT NULL,
    entity_type TEXT NOT NULL,
    field_path TEXT NOT NULL,
    schema_version_id UUID NOT NULL REFERENCES ontology_schema_versions(id),
    severity TEXT NOT NULL CHECK (severity IN ('critical', 'high')),
    status TEXT NOT NULL DEFAULT 'open' CHECK (status IN ('open', 'resolved', 'dismissed')),
    resolved_at TIMESTAMPTZ,
    resolved_by TEXT,
    dismiss_justification TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- At most one open alert per (tenant, entity, field), declared as a partial
-- unique index because a table-level UNIQUE constraint cannot take a WHERE clause.
CREATE UNIQUE INDEX idx_data_gaps_one_open
    ON data_gap_alerts (tenant_id, entity_id, field_path)
    WHERE status = 'open';

CREATE INDEX idx_data_gaps_entity ON data_gap_alerts (entity_id);
CREATE INDEX idx_data_gaps_status ON data_gap_alerts (tenant_id, status, severity);

7.3 Partial unique indexes

Both tables use partial unique indexes on status = 'open' to ensure only one open disagreement or data-gap alert exists per entity-field pair. When resolved, a new disagreement for the same field can be created without violating the constraint.
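
The one-open-row-per-field behaviour can be tried out locally. The sketch below uses SQLite, which also supports partial unique indexes, as a stand-in for PostgreSQL; the simplified `alerts` table, the index name, and the `open_alert` helper are hypothetical and exist only for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE alerts (
        id INTEGER PRIMARY KEY,
        tenant_id TEXT NOT NULL,
        entity_id TEXT NOT NULL,
        field_path TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'open'
    )
    """
)
# Uniqueness applies only to rows whose status is 'open'.
conn.execute(
    """
    CREATE UNIQUE INDEX idx_alerts_one_open
        ON alerts (tenant_id, entity_id, field_path)
        WHERE status = 'open'
    """
)


def open_alert(tenant_id, entity_id, field_path):
    """Try to insert an open alert; return False if one is already open."""
    try:
        conn.execute(
            "INSERT INTO alerts (tenant_id, entity_id, field_path) VALUES (?, ?, ?)",
            (tenant_id, entity_id, field_path),
        )
        return True
    except sqlite3.IntegrityError:
        return False


first = open_alert("t1", "e1", "date_of_birth")      # accepted
duplicate = open_alert("t1", "e1", "date_of_birth")  # rejected by the index
conn.execute("UPDATE alerts SET status = 'resolved' WHERE status = 'open'")
reopened = open_alert("t1", "e1", "date_of_birth")   # accepted again
total_rows = conn.execute("SELECT COUNT(*) FROM alerts").fetchone()[0]
```

Resolved and dismissed rows accumulate as history, while the index keeps the open set free of duplicates.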


8. API Surface

8.1 Disagreements

GET  /api/disagreements?entity_id={id}&status=open
GET  /api/disagreements/{id}
POST /api/disagreements/{id}/resolve
     Body: { resolved_value, resolution_justification, resolved_by }
POST /api/disagreements/{id}/dismiss
     Body: { justification, dismissed_by }

8.2 Data Gap Alerts

GET  /api/data-gaps?entity_id={id}&status=open
GET  /api/data-gaps?entity_type={type}&status=open&severity=critical
POST /api/data-gaps/{id}/dismiss
     Body: { justification, dismissed_by }

Data-gap alerts are auto-resolved when a subsequent mutation provides a value for the field, so there is no manual resolve endpoint.

8.3 Dashboard aggregates

GET /api/compliance-health?tenant_id={id}
Response: {
    open_disagreements: { critical: N, high: N, medium: N, low: N },
    open_data_gaps: { critical: N, high: N },
    entities_with_issues: N,
    fields_fully_covered: N,
    fields_total: N
}
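
The alert-related parts of this response can be derived by counting open records per severity. The following is a rough sketch under two assumptions: records are plain dicts with `entity_id`, `severity`, and `status` keys, and `entities_with_issues` means distinct entities with at least one open disagreement or data gap. `fields_fully_covered` and `fields_total` are left out because their computation is not defined here. `compliance_health` is a hypothetical function name.

```python
from collections import Counter

DISAGREEMENT_LEVELS = ("critical", "high", "medium", "low")
DATA_GAP_LEVELS = ("critical", "high")


def compliance_health(disagreements, data_gaps):
    """Aggregate open disagreements and data gaps into dashboard counts."""
    open_disagreements = [d for d in disagreements if d["status"] == "open"]
    open_gaps = [g for g in data_gaps if g["status"] == "open"]

    disagreement_counts = Counter(d["severity"] for d in open_disagreements)
    gap_counts = Counter(g["severity"] for g in open_gaps)

    return {
        # Always emit every level so the dashboard sees explicit zeros
        "open_disagreements": {s: disagreement_counts.get(s, 0) for s in DISAGREEMENT_LEVELS},
        "open_data_gaps": {s: gap_counts.get(s, 0) for s in DATA_GAP_LEVELS},
        "entities_with_issues": len(
            {r["entity_id"] for r in open_disagreements + open_gaps}
        ),
    }


health = compliance_health(
    disagreements=[
        {"entity_id": "e1", "severity": "critical", "status": "open"},
        {"entity_id": "e2", "severity": "low", "status": "resolved"},
    ],
    data_gaps=[
        {"entity_id": "e1", "severity": "high", "status": "open"},
        {"entity_id": "e3", "severity": "critical", "status": "open"},
    ],
)
```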

9. Migration Path

Phase 1: Data model and detection (backend)

  1. Create migration for source_disagreements and data_gap_alerts tables.
  2. Implement DisagreementDetector and MissingDataMonitor classes.
  3. Add repository classes for both tables.
  4. Wire post-batch hooks into MutationProcessor.process_batch().
  5. Add API endpoints for querying and resolving.

Phase 2: Alert routing and notifications

  1. Extend the existing review task system to accept disagreement and data-gap sources.
  2. Add WebSocket notifications for real-time analyst alerts.
  3. Wire alert routing into the post-batch hook chain.

Phase 3: Analyst Dashboard integration (frontend)

  1. Add a "Data Quality" panel to the Analyst Dashboard showing open disagreements and data gaps.
  2. Disagreement detail view: show all source values side by side with trust scores, allow analyst to pick the canonical value.
  3. Data-gap view: show required fields with no coverage, grouped by entity, with links to trigger data refresh from specific sources.

Phase 4: Compliance reporting

  1. Add disagreement and data-gap history to the compliance audit report.
  2. Include time-to-resolution metrics for disagreements.
  3. Add data-gap trend analysis (are required fields getting coverage over time?).

Estimated effort

Phase 1: 2-3 days. Phase 2: 1-2 days. Phase 3: 2-3 days. Phase 4: 1-2 days. Total: 6-10 days (~8 days at the midpoint).


10. Rejected Alternatives

Inline disagreement detection (per-mutation)

Checking all source values inside the per-mutation loop would require loading provenance history for every mutation, turning an O(1) provenance lookup into an O(S) query where S is the number of sources. This adds latency to every mutation and conflates per-mutation conflict handling with field-level disagreement analysis. Post-batch detection keeps the per-mutation path fast and separates concerns.

Blocking mutations on missing required fields

An alternative design would reject mutations for an entity if required fields are still missing, forcing data providers to supply complete records. Rejected because: (a) data providers often supply partial records that get enriched over time, (b) blocking mutations would prevent any data from entering the system until all required fields are available, creating a chicken-and-egg problem, and (c) the compliance requirement is visibility into gaps, not blocking data flow.

Extending ConflictRecord for disagreements

Rather than a new table, we could add a disagreement_type column to conflict_records. Rejected because conflicts and disagreements have different lifecycles (per-mutation vs. per-field), different resolution semantics (one resolver vs. aggregate consensus), and different query patterns (dashboard aggregates vs. investigation detail). Combining them would complicate queries and make the model harder to reason about.

Relying on evaluation runs for gap detection

The evaluation engine already reports required_missing fields. We could make evaluations run automatically after each batch. Rejected because evaluation runs are heavyweight (they scan all entities, not just the batch), and the existing evaluation model produces metrics, not actionable alerts. The missing data monitor is a lightweight, targeted check that feeds directly into the analyst task system.


11. Open Questions

  1. Should disagreement detection consider historical values? Currently it only looks at the latest value per source. If Source A previously agreed with Source B but then updated its value, the disagreement is between Source A's new value and Source B's old value. Should the system track this progression?

  2. Should data-gap alerts be scoped to published schema versions only? Draft schemas may have required fields that are aspirational. Alerting on them would create noise. Current proposal: only enforce required fields from the published schema version.

  3. Should the disagreement severity thresholds be global or per-field? Some fields (e.g., beneficial_owner_name) may warrant a higher sensitivity than others (e.g., phone_number). Per-field thresholds add configuration complexity.

  4. Should the system auto-dismiss data-gap alerts when a schema version is archived? If the schema is no longer active, its required fields are moot. But audit trail requirements may demand keeping the historical record.

  5. Batch size limits for post-batch hooks. If a batch contains 10,000 mutations across 5,000 entities, the post-batch hooks could be expensive. Should there be a threshold above which the hooks run asynchronously via a background job instead of inline?