Multi-Source Aggregation
ApiLinker v0.7.1 can combine records from multiple REST sources using keyed joins before you map or send data to a target API. This is useful when identifiers live in different systems (for example CRM users and a billing service).
Library API only
YAML configuration and CLI commands for aggregation are planned for v0.7.2. In v0.7.1, use the Python API shown below.
Overview
| Component | Role |
|---|---|
| MultiSourceAggregator | Joins pre-fetched payloads by key |
| ApiLinker.aggregate_sources() | Fetches from named connectors, then aggregates |
| ApiLinker.register_source() | Registers extra source connectors by alias |
Join types
- inner: only keys present in every source
- left: all keys from the first configured source
- right: all keys from the last configured source
- outer: union of keys across sources
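The four join types differ only in which join keys survive. The following is a minimal sketch of that selection in plain Python, not the library's implementation; `select_join_keys` and the sample key sets are hypothetical names used for illustration.

```python
from typing import Hashable, List, Set


def select_join_keys(join_type: str, key_sets: List[Set[Hashable]]) -> Set[Hashable]:
    """Return the join keys that survive for a given join type.

    key_sets holds one set of keys per source, in configured order.
    (Hypothetical helper; not part of ApiLinker.)
    """
    if not key_sets:
        return set()
    if join_type == "inner":
        # Keys present in every source.
        return set.intersection(*key_sets)
    if join_type == "left":
        # All keys from the first configured source.
        return set(key_sets[0])
    if join_type == "right":
        # All keys from the last configured source.
        return set(key_sets[-1])
    if join_type == "outer":
        # Union of keys across sources.
        return set.union(*key_sets)
    raise ValueError(f"unknown join type: {join_type!r}")


crm_keys = {1, 2, 3}
billing_keys = {2, 3, 4}
for join_type in ("inner", "left", "right", "outer"):
    print(join_type, sorted(select_join_keys(join_type, [crm_keys, billing_keys])))
# inner [2, 3]
# left [1, 2, 3]
# right [2, 3, 4]
# outer [1, 2, 3, 4]
```

With only two sources, left and right simply mirror each other; the distinction between "first" and "last" matters once three or more sources are configured.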
Merge strategies
- flat: merge fields into one object (use conflict_resolution when names collide)
- namespace: nest each source under its configured namespace
Conflict resolution (flat merge)
- prefer_first: keep the first source's value
- prefer_last: keep the last source's value
- error: raise if the same field exists in multiple sources
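To make the two merge strategies and the three conflict policies concrete, here is a plain-Python sketch of how one joined record could be assembled. `merge_parts` is a hypothetical helper, not an ApiLinker function, and it assumes that any repeated field name counts as a conflict even when the values are equal; the library may behave differently on that point.

```python
from typing import Any, Dict, List, Tuple

Record = Dict[str, Any]


def merge_parts(
    parts: List[Tuple[str, Record]],
    merge_strategy: str = "flat",
    conflict_resolution: str = "prefer_last",
) -> Record:
    """Merge per-source records that share a join key (hypothetical helper).

    parts is a list of (namespace, record) pairs in configured source order.
    """
    if merge_strategy == "namespace":
        # Nest each source under its namespace; fields can never collide.
        return {namespace: dict(record) for namespace, record in parts}
    if merge_strategy != "flat":
        raise ValueError(f"unknown merge strategy: {merge_strategy!r}")
    if conflict_resolution not in ("prefer_first", "prefer_last", "error"):
        raise ValueError(f"unknown conflict resolution: {conflict_resolution!r}")

    merged: Record = {}
    for _namespace, record in parts:
        for field, value in record.items():
            if field in merged:
                if conflict_resolution == "error":
                    raise ValueError(f"field {field!r} exists in multiple sources")
                if conflict_resolution == "prefer_first":
                    continue  # keep the value from the earlier source
            merged[field] = value  # new field, or prefer_last overwrite
    return merged


parts = [
    ("crm", {"id": 1, "email": "alice@crm.example"}),
    ("billing", {"plan": "pro", "email": "alice@billing.example"}),
]
print(merge_parts(parts, "flat", "prefer_first")["email"])  # alice@crm.example
print(merge_parts(parts, "flat", "prefer_last")["email"])   # alice@billing.example
print(merge_parts(parts, "namespace")["billing"]["plan"])   # pro
```

The namespace strategy trades a flatter output for safety: because every source keeps its own sub-object, no conflict policy is needed.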
Example
from apilinker import ApiLinker

linker = ApiLinker(log_level="INFO")

# Register connectors (or load from YAML and call register_source).
# crm_connector and billing_connector are connector instances created elsewhere.
linker.register_source("crm", crm_connector)
linker.register_source("billing", billing_connector)

aggregation_config = {
    "join_type": "inner",
    "merge_strategy": "flat",
    "conflict_resolution": "prefer_last",
    "sources": [
        {
            "name": "crm",
            "join_key": "id",
            "fields": [
                {"source": "id", "target": "id"},
                {"source": "name", "target": "name"},
            ],
        },
        {
            "name": "billing",
            "join_key": "customer_id",
            "fields": [{"source": "plan", "target": "plan"}],
        },
    ],
}

rows = linker.aggregate_sources(
    source_requests={
        "crm": {"connector": "crm", "endpoint": "list_users"},
        "billing": {"connector": "billing", "endpoint": "list_plans"},
    },
    aggregation_config=aggregation_config,
    parallel=True,
)
# rows is a list of merged dicts ready for mapping or target sync
Pre-fetched data
If you already have payloads, use aggregate_source_data():
merged = linker.aggregate_source_data(
    {
        "crm": [{"id": 1, "name": "Alice"}],
        "billing": [{"customer_id": 1, "plan": "pro"}],
    },
    aggregation_config,
)
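The sketch below works through the same sample payloads in plain Python to show how the join_key and fields settings interact: each source is indexed by its own join key, and only the configured fields are carried over under their target names. It illustrates the idea and does not document the library's return value; `index_source` is a hypothetical helper, and skipping records that lack the join key is an assumption.

```python
from typing import Any, Dict, Hashable, List

Record = Dict[str, Any]


def index_source(
    records: List[Record], join_key: str, fields: List[Dict[str, str]]
) -> Dict[Hashable, Record]:
    """Index records by join key, keeping only the mapped fields (hypothetical)."""
    indexed: Dict[Hashable, Record] = {}
    for record in records:
        if join_key not in record:
            continue  # assumption: records without the join key are skipped
        indexed[record[join_key]] = {
            field["target"]: record[field["source"]]
            for field in fields
            if field["source"] in record
        }
    return indexed


crm = index_source(
    [{"id": 1, "name": "Alice"}],
    "id",
    [{"source": "id", "target": "id"}, {"source": "name", "target": "name"}],
)
billing = index_source(
    [{"customer_id": 1, "plan": "pro"}],
    "customer_id",
    [{"source": "plan", "target": "plan"}],
)

# Inner join with a flat merge: keep keys found in both sources.
shared_keys = crm.keys() & billing.keys()
merged_rows = [{**crm[key], **billing[key]} for key in sorted(shared_keys)]
print(merged_rows)  # [{'id': 1, 'name': 'Alice', 'plan': 'pro'}]
```

Note that customer_id does not appear in the output: it serves only as the billing join key and is not listed in that source's fields.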
Parallel fetching
aggregate_sources(..., parallel=True) uses a thread pool (up to eight workers) when more than one source is requested. Set parallel=False for deterministic, sequential fetches.
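The fetching behaviour described above can be approximated with the standard library's ThreadPoolExecutor. This is a sketch under stated assumptions rather than ApiLinker's actual code: `fetch_all`, `MAX_WORKERS`, and the zero-argument fetch callables are hypothetical, standing in for real connector calls.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict

MAX_WORKERS = 8  # mirrors the "up to eight workers" limit described above


def fetch_all(
    fetchers: Dict[str, Callable[[], Any]], parallel: bool = True
) -> Dict[str, Any]:
    """Run one fetch callable per source and return results keyed by source name."""
    if not parallel or len(fetchers) < 2:
        # Sequential path: sources are fetched one at a time, in request order.
        return {name: fetch() for name, fetch in fetchers.items()}

    workers = min(MAX_WORKERS, len(fetchers))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {name: pool.submit(fetch) for name, fetch in fetchers.items()}
        # result() blocks until the fetch finishes and re-raises any exception.
        return {name: future.result() for name, future in futures.items()}


fetchers = {
    "crm": lambda: [{"id": 1, "name": "Alice"}],
    "billing": lambda: [{"customer_id": 1, "plan": "pro"}],
}
payloads = fetch_all(fetchers, parallel=True)
print(sorted(payloads))  # ['billing', 'crm']
```

In this sketch the returned dictionary always follows the request order, so only the timing of the network calls changes between the parallel and sequential paths, not the shape of the result.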
See also
- Core Components: streaming and connector APIs
- examples/multi_source_aggregation.py: runnable stub example
- ROADMAP.md: YAML/CLI aggregation (v0.7.2)