Ingestion pipelines

All external data — whether from a registry API or an LLM-driven OSINT crew — enters the knowledge graph through the same conceptual pipeline: acquire → map → resolve → persist → sync → score. What differs is the acquisition step and whether it runs synchronously or as a durable workflow.
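As a rough illustration of that shared shape, the sketch below threads a context dictionary through the six stages in a fixed order, with only the acquire step swapped per provider. The stage names come from the text; `run_pipeline`, `passthrough`, `acquire_from_registry`, and the context layout are hypothetical and not taken from the codebase.

```python
from typing import Any, Callable

Context = dict[str, Any]
Stage = Callable[[Context], Context]

# Stage names are taken from the text; everything else here is illustrative.
STAGE_ORDER = ("acquire", "map", "resolve", "persist", "sync", "score")


def run_pipeline(stages: dict[str, Stage], context: Context) -> Context:
    """Run each stage in the fixed order, recording which stages ran."""
    for name in STAGE_ORDER:
        context = stages[name](context)
        context.setdefault("trace", []).append(name)
    return context


def passthrough(context: Context) -> Context:
    """Placeholder stage that leaves the context unchanged."""
    return context


def acquire_from_registry(context: Context) -> Context:
    """Hypothetical sync acquisition: attaches a canned payload."""
    return {**context, "payload": {"source": "registry"}}


# Only the acquire stage differs between providers; the rest is shared.
shared = {name: passthrough for name in STAGE_ORDER[1:]}
result = run_pipeline({"acquire": acquire_from_registry, **shared}, {})
```

Swapping `acquire_from_registry` for a different callable changes where the data comes from without touching the other five stages.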

Two acquisition modes

Plugins declare an execution mode in plugin.yaml:

Mode | Transport | Examples | Orchestration
sync | HTTP request/response | KVK, NorthData | Called directly within a Temporal activity
async (investigation) | Temporal child workflow | OSINT crews | Long-running agentic loop

See Plugins for the plugin contract, and src/integrations/plugin_loader.py for the loader.

Mapping: from provider payload to ontology claims

Providers never write ontology fields directly. Each provider ships a mapping spec (plugins/<slug>/mapping_spec.yaml) describing how its source fields map to ontology entity types and attributes, with a confidence weight per field. At boot, the TranslationRegistry (Phase 114, src/integrations/translation_registry.py) reads every plugin's spec together with the active ontology and builds an index (plugin_id, source_field) → (entity_type, target_field, confidence).
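A minimal sketch of how such an index could be built is shown below. It assumes each mapping spec has already been parsed into a list of rows with `source_field`, `entity_type`, `target_field`, and `confidence` keys; that row layout, the `Target` class, the `build_index` function, and the sample KVK field are all hypothetical and may differ from the real TranslationRegistry.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Target:
    entity_type: str
    target_field: str
    confidence: float


def build_index(specs: dict[str, list[dict]]) -> dict[tuple[str, str], Target]:
    """Build (plugin_id, source_field) -> Target from parsed mapping specs."""
    index: dict[tuple[str, str], Target] = {}
    for plugin_id, rows in specs.items():
        for row in rows:
            key = (plugin_id, row["source_field"])
            if key in index:
                # Assumption: a source field maps to exactly one target.
                raise ValueError(f"duplicate mapping for {key}")
            index[key] = Target(
                entity_type=row["entity_type"],
                target_field=row["target_field"],
                confidence=float(row["confidence"]),
            )
    return index


# Invented example row; the real KVK spec is not shown in this document.
example_specs = {
    "kvk": [
        {
            "source_field": "handelsnaam",
            "entity_type": "Company",
            "target_field": "name",
            "confidence": 0.95,
        }
    ]
}
index = build_index(example_specs)
```

A lookup such as `index[("kvk", "handelsnaam")]` then yields the ontology target and the confidence weight to attach to the resulting claim.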

If a plugin declares a target_schema incompatible with the active ontology version family (e.g. targets v3.4 while v3.5.2 is active), the registry fails the boot — drift is caught at startup, not in production.
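The check can be pictured as follows. This sketch assumes that a "version family" means the major.minor prefix of the version string, which matches the v3.4 versus v3.5.2 example but is not confirmed by the text; `version_family`, `check_target_schema`, and `SchemaDriftError` are hypothetical names.

```python
class SchemaDriftError(RuntimeError):
    """Raised at boot when a plugin targets a different ontology family."""


def version_family(version: str) -> tuple[int, int]:
    """Return (major, minor) for strings such as 'v3.5.2' or 'v3.4'."""
    parts = version.lstrip("v").split(".")
    return int(parts[0]), int(parts[1])


def check_target_schema(plugin_id: str, target_schema: str, active: str) -> None:
    """Raise SchemaDriftError if the plugin's family differs from the active one."""
    if version_family(target_schema) != version_family(active):
        raise SchemaDriftError(
            f"{plugin_id} targets {target_schema}, but {active} is active"
        )


# A plugin on the same family passes silently.
check_target_schema("kvk", "v3.5", "v3.5.2")
```

Calling `check_target_schema("kvk", "v3.4", "v3.5.2")` raises `SchemaDriftError`, which in this sketch stands in for the failed boot.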

The provider router

When an investigation needs data from a provider, the ProviderRouter (src/integrations/provider_router.py) resolves which provider and which credentials to use for the current tenant. It applies a fallback chain (rollback flag → not-metered → tenant credential repository → grandfathered defaults), raising MissingTenantCredentialsError (HTTP 424) if no usable credential exists.
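The fallback chain can be sketched as an ordered list of resolvers where the first one to return a credential wins. The step names and the 424 status follow the text; what each step actually checks is not specified here, so the resolvers below are stand-ins, and `resolve_credential` with its signature is hypothetical.

```python
from typing import Callable, Optional


class MissingTenantCredentialsError(Exception):
    http_status = 424  # status code named in the text


Resolver = Callable[[str, str], Optional[str]]


def resolve_credential(
    tenant_id: str, provider: str, chain: list[tuple[str, Resolver]]
) -> tuple[str, str]:
    """Return (step_name, credential) from the first step that yields one."""
    for step_name, resolver in chain:
        credential = resolver(tenant_id, provider)
        if credential is not None:
            return step_name, credential
    raise MissingTenantCredentialsError(
        f"no usable credential for tenant={tenant_id} provider={provider}"
    )


# Stand-in data and resolvers; real step behaviour is an assumption.
tenant_repo = {("tenant-a", "kvk"): "key-123"}
chain: list[tuple[str, Resolver]] = [
    ("rollback_flag", lambda tenant, provider: None),
    ("not_metered", lambda tenant, provider: None),
    ("tenant_repository", lambda tenant, provider: tenant_repo.get((tenant, provider))),
    ("grandfathered_default", lambda tenant, provider: None),
]
step, credential = resolve_credential("tenant-a", "kvk", chain)
```

Returning the step name alongside the credential makes it easy to log which link of the chain satisfied the request.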

Sync provider flow (KVK / NorthData)

Async crew flow (OSINT)

OSINT acquisition is an agentic loop: a crew is given a prompt and a set of MCP tools, and iterates think → call-tool → observe until it has gathered enough to return findings in the ontology's JSON contract.
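The loop itself can be sketched without any LLM by replacing the model with a scripted decision function. Everything below is illustrative: `run_crew`, the action dictionary layout, the `max_steps` guard, and the `lookup` tool are assumptions, and a real crew would call a model and MCP tools instead.

```python
from typing import Any, Callable

Action = dict[str, Any]


def run_crew(
    decide: Callable[[str, list[dict]], Action],
    tools: dict[str, Callable[..., Any]],
    prompt: str,
    max_steps: int = 8,
) -> dict:
    """Iterate think -> call-tool -> observe until the policy finishes."""
    observations: list[dict] = []
    for _ in range(max_steps):
        action = decide(prompt, observations)  # think
        if action["type"] == "finish":
            return action["findings"]
        result = tools[action["tool"]](**action["args"])  # call-tool
        observations.append({"tool": action["tool"], "result": result})  # observe
    raise RuntimeError("crew did not finish within max_steps")


def scripted_decide(prompt: str, observations: list[dict]) -> Action:
    """Stand-in for the model: one lookup, then return findings."""
    if not observations:
        return {"type": "tool", "tool": "lookup", "args": {"name": prompt}}
    return {"type": "finish", "findings": {"entities": [observations[0]["result"]]}}


tools = {"lookup": lambda name: {"type": "Company", "name": name}}
findings = run_crew(scripted_decide, tools, "Acme BV")
```

The step cap is a design choice of this sketch: it bounds a loop whose termination otherwise depends entirely on the model's judgement.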

The findings then flow through the same map → resolve → persist → sync → score path as sync providers. The model is instantiated per tenant via src/pipelines/model_factory.py, which reads the tenant's LLM key (src/integrations/llm_credentials.py) — there is no global env-var key.

Freshness and provider runs

Each provider execution is recorded as a provider_run with status and extracted-entity counts. A stale_check (src/integrations/stale_check.py) compares run recency so investigations can avoid re-fetching fresh data. See Mutation queue for the provenance that ties extracted data back to the run that produced it.
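One plausible form of the recency comparison is sketched below. It assumes staleness is decided by comparing the last successful run's timestamp against a maximum age; the `is_stale` function, its parameters, and the 24-hour window are hypothetical and may not match src/integrations/stale_check.py.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(
    last_success_at: Optional[datetime],
    max_age: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """True when there is no prior run or the last one is older than max_age."""
    if last_success_at is None:
        return True
    now = now or datetime.now(timezone.utc)
    return now - last_success_at > max_age


# Fixed timestamps so the example is deterministic.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
recent_run = now - timedelta(hours=3)
old_run = now - timedelta(days=3)

refetch_recent = is_stale(recent_run, timedelta(hours=24), now=now)
refetch_old = is_stale(old_run, timedelta(hours=24), now=now)
```

With a 24-hour window, the run from three hours ago is treated as fresh and the run from three days ago triggers a re-fetch.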


Deep dive: the plugin & mapping contracts

The mechanics of the loader, the TranslationRegistry, the credential chain, and the trust-weighted merge are documented in detail on the Plugins deep dive. Two points are worth emphasising for the ingestion path specifically:

Sync vs async is a declaration, not a fork in the core

A plugin's execution.mode (sync or async) is read from plugin.yaml. Sync providers are called inside a Temporal activity as plain HTTP; async providers declare a workflow_name and run as their own workflow. The rest of the pipeline is identical: both paths produce ontology entities and claims that flow through the same resolve → persist → sync → score stages. That uniformity is why adding a provider rarely touches core code.
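A small sketch of how that declaration might drive dispatch is given below. It assumes plugin.yaml has already been parsed into a dictionary with an `execution` section holding `mode` and `workflow_name`; the exact key nesting, the `plan_acquisition` function, the returned plan shape, and the example workflow name are hypothetical.

```python
def plan_acquisition(manifest: dict) -> dict:
    """Decide how to acquire data based on the declared execution mode."""
    execution = manifest.get("execution", {})
    mode = execution.get("mode")
    if mode == "sync":
        # Plain HTTP call made from inside an activity.
        return {"kind": "activity_http_call"}
    if mode == "async":
        workflow_name = execution.get("workflow_name")
        if not workflow_name:
            raise ValueError("async plugins must declare a workflow_name")
        return {"kind": "child_workflow", "workflow_name": workflow_name}
    raise ValueError(f"unknown execution mode: {mode!r}")


# Invented manifests for illustration only.
sync_plan = plan_acquisition({"execution": {"mode": "sync"}})
async_plan = plan_acquisition(
    {"execution": {"mode": "async", "workflow_name": "OsintCrewWorkflow"}}
)
```

Only this planning step branches on the mode; whatever it returns feeds the same downstream stages.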

Boot-time drift protection

The registry validates the union of all plugins against the active ontology at startup. A field that doesn't exist, or a plugin pinned to the wrong schema family, aborts the boot — so a mapping mistake can never silently mis-route provider data in production.
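The field-existence half of that validation could look like the sketch below, which collects every problem across all plugins before aborting so that a single boot attempt reports them together. The ontology representation (entity type to set of field names), `validate_mappings`, and the sample data are assumptions made for illustration.

```python
def validate_mappings(
    mappings: dict[str, list[tuple[str, str]]],
    ontology: dict[str, set[str]],
) -> None:
    """Raise RuntimeError listing every mapped field missing from the ontology."""
    errors: list[str] = []
    for plugin_id, targets in mappings.items():
        for entity_type, target_field in targets:
            if target_field not in ontology.get(entity_type, set()):
                errors.append(
                    f"{plugin_id}: {entity_type}.{target_field} "
                    "is not in the active ontology"
                )
    if errors:
        raise RuntimeError("boot aborted:\n" + "\n".join(errors))


# Invented ontology and mappings.
ontology = {"Company": {"name", "registration_number"}}
good = {"kvk": [("Company", "name")]}
bad = {"kvk": [("Company", "name")], "northdata": [("Company", "nmae")]}

validate_mappings(good, ontology)  # passes silently

try:
    validate_mappings(bad, ontology)
    boot_error = ""
except RuntimeError as exc:
    boot_error = str(exc)
```

Here the misspelled `nmae` field stops the boot, with the offending plugin named in the error message.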

To add a provider end to end, follow Add a provider.