Ingestion pipelines
All external data — whether from a registry API or an LLM-driven OSINT crew — enters the knowledge graph through the same conceptual pipeline: acquire → map → resolve → persist → sync → score. What differs is the acquisition step and whether it runs synchronously or as a durable workflow.
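The stage order can be sketched as one pluggable acquisition function followed by a fixed list of shared stages. This is a minimal illustration under assumptions, not the project's code. ingest, SHARED_STAGES and the dict-shaped batch are hypothetical names, and each shared stage is a stub that only records that it ran.

```python
from typing import Any, Callable

Batch = dict[str, Any]
Stage = Callable[[Batch], Batch]


def _stub(name: str) -> Stage:
    """Hypothetical placeholder stage: it only records that it ran."""
    def run(batch: Batch) -> Batch:
        batch["trace"].append(name)
        return batch
    return run


# Everything after acquisition is the same for every provider.
SHARED_STAGES: list[Stage] = [
    _stub(name) for name in ("map", "resolve", "persist", "sync", "score")
]


def ingest(acquire: Callable[[], dict[str, Any]]) -> Batch:
    """Run one provider-specific acquisition, then the shared stages in order."""
    batch: Batch = {"payload": acquire(), "trace": ["acquire"]}
    for step in SHARED_STAGES:
        batch = step(batch)
    return batch


# A registry lookup and an OSINT crew differ only in the acquire callable.
registry_batch = ingest(lambda: {"kvk_number": "12345678"})  # hypothetical payload
crew_batch = ingest(lambda: {"findings": []})
print(registry_batch["trace"])
# ['acquire', 'map', 'resolve', 'persist', 'sync', 'score']
```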
Two acquisition modes
Plugins declare their execution mode (execution.mode) in plugin.yaml:
| Mode | Transport | Examples | Orchestration |
|---|---|---|---|
| sync | HTTP request/response | KVK, NorthData | Called directly within a Temporal activity |
| async (investigation) | Temporal child workflow | OSINT crews | Long-running agentic loop |
See Plugins for the plugin contract, and src/integrations/plugin_loader.py for the loader itself.
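A loader has to reject a malformed declaration before anything runs. The sketch below assumes the manifest has already been parsed from YAML into a dict, and that workflow_name sits inside the execution block. ExecutionSpec and parse_execution are hypothetical names, not the API of plugin_loader.py.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ExecutionSpec:
    mode: str                     # "sync" or "async"
    workflow_name: Optional[str]  # required only for async plugins


def parse_execution(manifest: dict[str, Any]) -> ExecutionSpec:
    """Validate the execution block of an already-parsed plugin.yaml."""
    execution = manifest.get("execution", {})
    mode = execution.get("mode")
    if mode not in ("sync", "async"):
        raise ValueError(f"execution.mode must be 'sync' or 'async', got {mode!r}")
    workflow_name = execution.get("workflow_name")
    if mode == "async" and not workflow_name:
        raise ValueError("async plugins must declare execution.workflow_name")
    return ExecutionSpec(mode=mode, workflow_name=workflow_name)


kvk = parse_execution({"execution": {"mode": "sync"}})
osint = parse_execution(
    {"execution": {"mode": "async", "workflow_name": "OsintCrewWorkflow"}}  # hypothetical name
)
print(kvk.mode, osint.workflow_name)  # sync OsintCrewWorkflow
```

Failing here, rather than when the first investigation reaches the plugin, keeps a typo in plugin.yaml from surfacing as a runtime error mid-workflow.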
Mapping: from provider payload to ontology claims
Providers never write ontology fields directly. Each provider ships a mapping spec
(plugins/<slug>/mapping_spec.yaml) describing how its source fields map to ontology entity types
and attributes, with a confidence weight per field. At boot, the TranslationRegistry (Phase
114, src/integrations/translation_registry.py) reads every plugin's spec together with the active
ontology and builds an index (plugin_id, source_field) → (entity_type, target_field, confidence).
If a plugin declares a target_schema that is incompatible with the active ontology version family
(e.g. it targets v3.4 while v3.5.2 is active), the registry fails the boot: drift is caught at
startup, not in production.
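The index and the drift check can be sketched in plain Python. Assumptions: a version family is major.minor (so v3.5 and v3.5.2 match, v3.4 does not), each mapping spec has already been loaded into a dict with target_schema and fields keys, and the KVK field and entity names are illustrative. build_index and Target are hypothetical, not the TranslationRegistry API.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Target:
    entity_type: str
    target_field: str
    confidence: float


def version_family(version: str) -> tuple[int, int]:
    """Assumed rule: the family is major.minor, e.g. 'v3.5.2' -> (3, 5)."""
    major, minor, *_ = version.lstrip("v").split(".")
    return int(major), int(minor)


def build_index(
    specs: dict[str, dict[str, Any]], active_version: str
) -> dict[tuple[str, str], Target]:
    """Map (plugin_id, source_field) to its ontology target, failing on drift."""
    active = version_family(active_version)
    index: dict[tuple[str, str], Target] = {}
    for plugin_id, spec in specs.items():
        if version_family(spec["target_schema"]) != active:
            raise RuntimeError(
                f"{plugin_id} targets {spec['target_schema']}, "
                f"but the active ontology is {active_version}"
            )
        for source_field, m in spec["fields"].items():
            index[(plugin_id, source_field)] = Target(
                m["entity_type"], m["target_field"], m["confidence"]
            )
    return index


specs = {
    "kvk": {
        "target_schema": "v3.5",
        "fields": {
            # hypothetical source field and target
            "handelsnaam": {"entity_type": "Company", "target_field": "legal_name", "confidence": 0.9},
        },
    }
}
index = build_index(specs, "v3.5.2")
print(index[("kvk", "handelsnaam")])
# Target(entity_type='Company', target_field='legal_name', confidence=0.9)
```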
The provider router
When an investigation needs data from a provider, the ProviderRouter
(src/integrations/provider_router.py) resolves which provider and which credentials to use for
the current tenant. It applies a fallback chain (rollback flag → not-metered → tenant credential
repository → grandfathered defaults), raising MissingTenantCredentialsError (HTTP 424) if no
usable credential exists.
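The chain is first-match-wins: each step either yields a credential or defers to the next, and exhausting the chain is an error. The sketch keeps each step's policy opaque (the steps' exact rules are not spelled out here), models the steps as plain callables, and defines a stand-in MissingTenantCredentialsError. resolve_credential and the example lambdas are hypothetical.

```python
from typing import Callable, Optional


class MissingTenantCredentialsError(Exception):
    """Stand-in: raised when no step yields a credential (surfaced as HTTP 424)."""
    http_status = 424


# (tenant_id, provider) -> credential, or None to defer to the next step
Resolver = Callable[[str, str], Optional[str]]


def resolve_credential(
    tenant_id: str, provider: str, chain: list[tuple[str, Resolver]]
) -> tuple[str, str]:
    """Walk the chain in order; the first step that returns a credential wins."""
    for step_name, resolver in chain:
        credential = resolver(tenant_id, provider)
        if credential is not None:
            return step_name, credential
    raise MissingTenantCredentialsError(
        f"no usable {provider} credential for tenant {tenant_id}"
    )


tenant_keys = {("acme", "northdata"): "acme-nd-key"}  # hypothetical repository contents
chain = [
    ("rollback_flag", lambda t, p: None),          # yields nothing in this example
    ("not_metered", lambda t, p: None),            # yields nothing in this example
    ("tenant_repository", lambda t, p: tenant_keys.get((t, p))),
    ("grandfathered_default", lambda t, p: None),  # yields nothing in this example
]
print(resolve_credential("acme", "northdata", chain))
# ('tenant_repository', 'acme-nd-key')
```

Returning the step name alongside the credential is an optional extra; it makes it easy to log which rung of the chain served a request.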
Sync provider flow (KVK / NorthData)
Async crew flow (OSINT)
OSINT acquisition is an agentic loop: a crew is given a prompt and a set of MCP tools, and iterates think → call-tool → observe until it has gathered enough to return findings that conform to the ontology's JSON contract.
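The think → call-tool → observe loop reduces to a few lines once the model and the tools are abstracted away. This sketch is not the crew framework: MCP tools are modelled as plain Python callables, the model as a think function that returns the next Action, and the max_steps guard is an assumption added so that a crew that never converges fails instead of looping forever. All names are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Action:
    tool: Optional[str] = None                        # tool to call, or None when done
    args: Optional[dict[str, Any]] = None
    findings: Optional[list[dict[str, Any]]] = None   # set when the crew is done


Think = Callable[[list[dict[str, Any]]], Action]


def run_crew(think: Think, tools: dict[str, Callable[..., Any]], max_steps: int = 10) -> list[dict[str, Any]]:
    observations: list[dict[str, Any]] = []
    for _ in range(max_steps):
        action = think(observations)                                  # think
        if action.tool is None:
            return action.findings or []
        result = tools[action.tool](**(action.args or {}))            # call-tool
        observations.append({"tool": action.tool, "result": result})  # observe
    raise RuntimeError("crew exceeded max_steps without returning findings")


def scripted_think(observations: list[dict[str, Any]]) -> Action:
    """Fake model: search once, then report what the search returned."""
    if not observations:
        return Action(tool="search_registry", args={"name": "Acme BV"})
    return Action(findings=[{"entity_type": "Company", "name": observations[-1]["result"]["name"]}])


tools = {"search_registry": lambda name: {"name": name, "country": "NL"}}
findings = run_crew(scripted_think, tools)
print(findings)  # [{'entity_type': 'Company', 'name': 'Acme BV'}]
```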
The findings then flow through the same map → resolve → persist → sync → score path as sync
providers. The crew's LLM is instantiated per tenant via src/pipelines/model_factory.py, which
reads the tenant's LLM key (src/integrations/llm_credentials.py); there is no global env-var key.
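The no-global-key rule amounts to a factory that only ever consults the tenant's own credential and treats a missing key as an error rather than falling back to the environment. TenantModel, MissingLLMKeyError and make_model_factory are hypothetical; the real code in model_factory.py and llm_credentials.py may be structured differently.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class TenantModel:
    """Stand-in for an LLM client bound to one tenant's key."""
    tenant_id: str
    api_key: str


class MissingLLMKeyError(Exception):
    """Hypothetical: the tenant has not configured an LLM key."""


def make_model_factory(
    lookup_key: Callable[[str], Optional[str]],
) -> Callable[[str], TenantModel]:
    """Build a per-tenant factory; os.environ is never consulted."""
    def model_for(tenant_id: str) -> TenantModel:
        key = lookup_key(tenant_id)
        if not key:
            raise MissingLLMKeyError(f"tenant {tenant_id} has no LLM key configured")
        return TenantModel(tenant_id=tenant_id, api_key=key)
    return model_for


tenant_keys = {"acme": "sk-acme-example"}  # hypothetical credential store
model_for = make_model_factory(tenant_keys.get)
print(model_for("acme").tenant_id)  # acme
```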
Freshness and provider runs
Each provider execution is recorded as a provider_run with status and extracted-entity counts. A
stale_check (src/integrations/stale_check.py) compares the recency of earlier runs so that
investigations can skip re-fetching data that is still fresh. See Mutation queue for the provenance that ties
extracted data back to the run that produced it.
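A recency check over provider_run records might look like the following. The field names, the status values, and the rule that only successful runs count as fresh are assumptions, and is_stale is a hypothetical function rather than the contents of stale_check.py.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class ProviderRun:
    provider: str
    status: str               # hypothetical values: "succeeded", "failed"
    extracted_entities: int
    finished_at: datetime


def is_stale(
    runs: list[ProviderRun],
    provider: str,
    max_age: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """True when no successful run of `provider` finished within `max_age`."""
    if now is None:
        now = datetime.now(timezone.utc)
    finished = [r.finished_at for r in runs if r.provider == provider and r.status == "succeeded"]
    if not finished:
        return True  # never fetched successfully, so a fetch is needed
    return now - max(finished) > max_age


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
runs = [
    ProviderRun("kvk", "succeeded", 12, now - timedelta(days=2)),
    ProviderRun("northdata", "failed", 0, now - timedelta(hours=1)),
]
print(is_stale(runs, "kvk", timedelta(days=7), now))        # False: fetched 2 days ago
print(is_stale(runs, "northdata", timedelta(days=7), now))  # True: no successful run
```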
Deep dive: the plugin & mapping contracts
The mechanics of the loader, the TranslationRegistry, the credential chain, and the trust-weighted
merge are documented in detail on the Plugins deep dive.
Two points worth emphasising for the ingestion path specifically:
Sync vs async is a declaration, not a fork in the core
A plugin's execution.mode (sync or async) is read from plugin.yaml. Sync providers are called
inside a Temporal activity as plain HTTP requests; async providers declare a workflow_name and run
as their own Temporal child workflow. The rest of the pipeline is identical: both paths produce
ontology entities and claims that flow through the same resolve → persist → sync → score stages.
That uniformity is why adding a provider rarely touches core code.
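Because only the acquisition step branches on execution.mode, the dispatch can be a single function whose two arms return the same shape. http_fetch and run_child_workflow are injected stand-ins for the HTTP call made inside a Temporal activity and for starting the async plugin's workflow. No Temporal SDK calls are shown, and every name here is hypothetical.

```python
from typing import Any, Callable

Claims = list[dict[str, Any]]


def acquire(
    manifest: dict[str, Any],
    request: dict[str, Any],
    http_fetch: Callable[[dict[str, Any]], Claims],
    run_child_workflow: Callable[[str, dict[str, Any]], Claims],
) -> Claims:
    """Choose the acquisition path from execution.mode; both arms return claims."""
    execution = manifest["execution"]
    if execution["mode"] == "sync":
        return http_fetch(request)  # plain HTTP call inside an activity
    if execution["mode"] == "async":
        return run_child_workflow(execution["workflow_name"], request)
    raise ValueError(f"unknown execution.mode {execution['mode']!r}")


def shared_tail(claims: Claims) -> Claims:
    """Stand-in for resolve -> persist -> sync -> score, identical for both modes."""
    return [dict(claim, scored=True) for claim in claims]


def fake_http(req: dict[str, Any]) -> Claims:
    return [{"source": "kvk", "name": req["name"]}]


def fake_workflow(workflow_name: str, req: dict[str, Any]) -> Claims:
    return [{"source": workflow_name, "name": req["name"]}]


request = {"name": "Acme BV"}
sync_out = shared_tail(acquire({"execution": {"mode": "sync"}}, request, fake_http, fake_workflow))
async_out = shared_tail(acquire(
    {"execution": {"mode": "async", "workflow_name": "OsintCrewWorkflow"}},
    request, fake_http, fake_workflow,
))
```

Injecting the two transports keeps the branch testable without a Temporal server; in a real worker they would wrap the activity call and the child-workflow start respectively.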
Boot-time drift protection
The registry validates the union of all plugins against the active ontology at startup. A field that doesn't exist, or a plugin pinned to the wrong schema family, aborts the boot — so a mapping mistake can never silently mis-route provider data in production.
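Validating the union means checking every mapped target of every plugin and failing once with the full list, so an aborted boot reports all mismatches rather than only the first. This sketch covers only entity-type and field existence, not the schema-family check. The ontology is assumed to be a dict from entity type to field names, reporting every error at once is a design choice, and validate_union and boot are hypothetical names.

```python
from typing import Any


def validate_union(specs: dict[str, dict[str, Any]], ontology: dict[str, set[str]]) -> list[str]:
    """Check every plugin's mapped targets against the ontology; return all problems."""
    errors: list[str] = []
    for plugin_id, spec in sorted(specs.items()):
        for source_field, m in spec["fields"].items():
            entity, target = m["entity_type"], m["target_field"]
            if entity not in ontology:
                errors.append(f"{plugin_id}.{source_field}: unknown entity type {entity}")
            elif target not in ontology[entity]:
                errors.append(f"{plugin_id}.{source_field}: {entity} has no field {target}")
    return errors


def boot(specs: dict[str, dict[str, Any]], ontology: dict[str, set[str]]) -> None:
    """Abort startup if any plugin maps to something the ontology lacks."""
    errors = validate_union(specs, ontology)
    if errors:
        raise RuntimeError("ontology drift:\n" + "\n".join(errors))


ontology = {"Company": {"legal_name", "registration_number"}}  # hypothetical ontology slice
specs = {
    "kvk": {
        "fields": {
            "handelsnaam": {"entity_type": "Company", "target_field": "legal_name", "confidence": 0.9},
            "kvknummer": {"entity_type": "Company", "target_field": "reg_no", "confidence": 1.0},
        }
    }
}
print(validate_union(specs, ontology))
# ['kvk.kvknummer: Company has no field reg_no']
```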
To add a provider end to end, follow Add a provider.