# Connector Implementation Guide
Lessons learned from implementing the SEC EDGAR and Financial Reports connectors. This guide covers patterns, pitfalls, and decisions that apply to any new data provider.
## Architecture: One Connector Per Operation
Never use a single connector with an endpoint: str parameter and extra="allow".
This hides parameter schemas from the agent — to_llm() shows one opaque blob instead
of discoverable operations with typed parameters.
Always create separate connectors for each distinct operation. Each gets its own
Pydantic BaseModel with explicit, typed, documented fields. The agent discovers
available operations and their parameters automatically via to_llm().
```python
from pydantic import BaseModel, ConfigDict, Field

# BAD: agent sees nothing useful
class FetchParams(BaseModel):
    model_config = ConfigDict(extra="allow")
    endpoint: str = Field(..., description="company | filings | profile")

# GOOD: agent sees exactly what each operation accepts
class CompanySearchParams(BaseModel):
    identifier: str = Field(..., description="Company name, ticker, or CIK")

class FilingsParams(BaseModel):
    identifier: str | None = Field(default=None, description="...")
    form: str | None = Field(default=None, description="e.g. '10-K', '8-K'")
    limit: int = Field(default=20, ge=1, le=100)
```
### When to share param models
Share a param model across connectors only when the parameters are truly identical.
Example: income_statement, balance_sheet, and cashflow_statement all accept
identifier, periods, annual, view — one FinancialStatementParams model is correct.
## Result Types: Tell the Agent What .data Contains
The framework defaults to .data being a DataFrame. When a connector returns text
(markdown, context summaries), use result_type="text" on the @connector decorator:
```python
@connector(tags=["my_source"], result_type="text")
async def my_source_document(params: DocumentParams) -> Result:
    """Retrieve document as markdown text."""
    ...
    return Result(data=markdown_text, provenance=...)
```
This appends the note "-> result.data is text (not a DataFrame)." to the connector's entry in the to_llm() output, which keeps the agent from calling .iloc[0] or .head() on a string.
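To make the mechanism concrete, here is a minimal, self-contained sketch of how a decorator could record result_type and surface it in a generated description. The connector decorator, the describe() function, and _REGISTRY are hypothetical stand-ins, not the framework's actual implementation. Only the appended note text mirrors the one quoted above.

```python
from typing import Callable

# Hypothetical registry: connector name -> (docstring, result_type)
_REGISTRY: dict[str, tuple[str, str]] = {}


def connector(*, tags: list[str], result_type: str = "dataframe") -> Callable:
    """Toy decorator that records each connector's result type (tags are ignored here)."""
    def wrap(fn: Callable) -> Callable:
        _REGISTRY[fn.__name__] = ((fn.__doc__ or "").strip(), result_type)
        return fn
    return wrap


def describe(name: str) -> str:
    """Build one connector's description, appending a note for text results."""
    doc, result_type = _REGISTRY[name]
    lines = [f"{name}: {doc}"]
    if result_type == "text":
        lines.append("-> result.data is text (not a DataFrame).")
    return "\n".join(lines)


@connector(tags=["my_source"], result_type="text")
async def my_source_document(params: dict) -> str:
    """Retrieve document as markdown text."""
    return "# Item 1A. Risk Factors ..."


@connector(tags=["my_source"])
async def my_source_prices(params: dict) -> list:
    """Daily prices as a table."""
    return []


print(describe("my_source_document"))
```

The important design point is that the note is generated from metadata declared once on the decorator, so it cannot drift out of sync with what the connector returns.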
### When to use text vs DataFrame
- DataFrame: structured, tabular data the agent will filter/aggregate/chart
- Text: documents, metadata summaries, section content the agent will read/reason about
If the data is a single row with 30+ columns from a flattened API response, consider
whether to_context() (if the SDK provides it) or a curated summary would serve the
agent better than a wide DataFrame it can't easily display.
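When the SDK has no to_context(), a curated summary can be as simple as picking the fields the agent actually needs from the flattened record and rendering them as labeled lines. The field names and labels below are hypothetical. Choose them from the real response.

```python
from typing import Any

# Hypothetical selection: flattened response key -> human-readable label
SUMMARY_FIELDS = {
    "name": "Company",
    "ticker": "Ticker",
    "country_code": "Country",
    "sector.name": "Sector",
    "fiscal_year_end": "Fiscal year end",
}


def summarize_record(record: dict[str, Any]) -> str:
    """Render the curated fields of one wide record as 'Label: value' lines.

    Fields that are missing, None, or empty are skipped, so sparse records stay short.
    """
    lines = []
    for key, label in SUMMARY_FIELDS.items():
        value = record.get(key)
        if value is None or value == "":
            continue
        lines.append(f"{label}: {value}")
    return "\n".join(lines)


wide_row = {"name": "Acme Corp", "ticker": "ACME", "country_code": "US",
            "sector.name": None, "fiscal_year_end": "12-31", "internal_id": 98765}
print(summarize_record(wide_row))
```

The resulting string would be returned from a result_type="text" connector rather than as a one-row DataFrame.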
## DataFrame Sanitization
SDK responses often produce DataFrames with issues that crash downstream serialization (the agent's display layer, Arrow conversion, JSON export). Handle these in the connector:
### MultiIndex columns
Some SDKs produce DataFrames with pd.MultiIndex columns (e.g., multi-level table headers).
Downstream df.to_json(orient="table") does not support MultiIndex.
```python
import pandas as pd

if isinstance(df.columns, pd.MultiIndex):
    df.columns = [" | ".join(str(c) for c in col if str(c).strip()) for col in df.columns]
```
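The flattening rule above operates on each column's tuple of level labels, so it can be checked without pandas. This sketch wraps the same expression in a hypothetical helper, flatten_column_labels(), and shows that blank levels are dropped rather than leaving dangling separators.

```python
def flatten_column_labels(columns: list[tuple]) -> list[str]:
    """Join MultiIndex level labels with ' | ', skipping blank levels."""
    return [" | ".join(str(c) for c in col if str(c).strip()) for col in columns]


labels = [("Revenue", "2023"), ("Revenue", "2022"), ("Notes", "")]
print(flatten_column_labels(labels))
```

In a connector you would pass list(df.columns), since iterating a MultiIndex yields tuples. Flattening can itself create duplicates (for example ("2023", "") and ("2023", " ") both become "2023"), so run the duplicate-name step afterwards.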
### Duplicate column names
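Financial tables often repeat a year as a column header (for example, one column for the amount and one for the currency symbol).
With duplicate labels, df["2023"] returns a DataFrame instead of a Series, and DataFrame.to_json() raises ValueError with its default orient.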
```python
# Rename repeated labels: "2023", "2023" -> "2023", "2023_2"
seen: dict[str, int] = {}
new_cols: list[str] = []
for col in df.columns:
    col_str = str(col)
    if col_str in seen:
        seen[col_str] += 1
        new_cols.append(f"{col_str}_{seen[col_str]}")
    else:
        seen[col_str] = 1
        new_cols.append(col_str)
if new_cols != list(df.columns):
    df.columns = new_cols
```
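One edge case the loop above does not cover: a suffixed name can collide with a label that already exists. For example, columns 2023, 2023, 2023_2 end up with 2023_2 twice. A collision-safe variant, shown here as a hypothetical pure-Python helper, keeps bumping the suffix until the name is unused:

```python
def dedupe_column_names(columns: list) -> list[str]:
    """Return unique string labels, suffixing repeats with _2, _3, ... until unused."""
    used: set[str] = set()
    last_suffix: dict[str, int] = {}
    result: list[str] = []
    for col in columns:
        name = str(col)
        candidate = name
        n = last_suffix.get(name, 1)
        # Keep incrementing until the candidate collides with nothing seen so far
        while candidate in used:
            n += 1
            candidate = f"{name}_{n}"
        last_suffix[name] = n
        used.add(candidate)
        result.append(candidate)
    return result


print(dedupe_column_names(["2023", "2023", "2023_2"]))
```

In a connector this would be applied as df.columns = dedupe_column_names(list(df.columns)).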
### Mixed-type columns
Object columns that mix strings ("$1,234") with numbers (1234.5) cause ArrowTypeError
when the agent's display layer converts them to Arrow format. Coerce such columns to a single type (strings):
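```python
import math

for col in df.columns:
    if df[col].dtype == object:
        sample = df[col].dropna()
        if len(sample) > 0:
            kinds = {type(v) for v in sample}
            if len(kinds) > 1:
                # Missing values become "" instead of the literal string "nan"
                df[col] = df[col].apply(
                    lambda x: "" if x is None or (isinstance(x, float) and math.isnan(x)) else str(x)
                )
```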
### Unhashable cells (nested dicts/lists)
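pd.json_normalize() keeps lists as list cells, and with max_level set it also leaves deeper nested objects as dict cells.
Hash-based pandas operations on such columns (drop_duplicates(), value_counts(), groupby()) fail with TypeError: unhashable type.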
Use json.dumps(), not string concatenation. Comma-joining lists or calling str()
on dicts produces opaque strings the agent can't parse back. JSON strings preserve
structure and can be re-parsed downstream:
```python
import json

for col in df.columns:
    sample = df[col].dropna()
    if len(sample) == 0:
        continue
    first = sample.iloc[0]
    if isinstance(first, (list, tuple, dict)):
        df[col] = df[col].apply(
            lambda x: json.dumps(x, default=str)
            if isinstance(x, (list, tuple, dict)) else x
        )
```
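The reason to prefer json.dumps() is easy to see with the standard library alone. A JSON string round-trips through json.loads(), while str() on a dict produces Python repr syntax (single quotes) that json.loads() rejects. The to_cell() helper is a hypothetical name for the per-cell conversion used above.

```python
import json
from typing import Any


def to_cell(value: Any) -> Any:
    """Serialize nested containers to JSON text; leave scalars unchanged."""
    if isinstance(value, (list, tuple, dict)):
        return json.dumps(value, default=str)
    return value


nested = {"segments": ["Cloud", "Devices"], "share": 0.42}
cell = to_cell(nested)
print(cell)                        # valid JSON text
print(json.loads(cell) == nested)  # structure survives the round trip

try:
    json.loads(str(nested))  # repr uses single quotes, which is not JSON
except json.JSONDecodeError as err:
    print("str() output is not parseable:", err.msg)
```

Tuples come back from json.loads() as lists, and default=str turns values such as dates into strings, so the round trip preserves structure but not every Python type.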
### Timezone-aware datetimes (Pydantic TzInfo)
Pydantic v2 models use pydantic_core.TzInfo for timezone-aware datetimes. Pandas
can't serialize these ('TzInfo' object has no attribute 'zone'). Use
model_dump(mode="json") to serialize datetimes to ISO strings before creating DataFrames:
```python
raw = resp.model_dump(mode="json")  # datetimes -> ISO strings
df = pd.json_normalize(raw["results"])
```
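If an SDK hands back plain dicts that already contain datetime objects (so there is no model to call model_dump() on), the same idea can be applied by hand. This sketch is a hypothetical helper built on the standard library only. It walks nested dicts and lists and replaces every datetime or date with its ISO 8601 string before the data reaches pd.json_normalize().

```python
from datetime import date, datetime, timedelta, timezone
from typing import Any


def isoformat_datetimes(value: Any) -> Any:
    """Recursively replace datetime/date objects with ISO 8601 strings."""
    if isinstance(value, (datetime, date)):  # datetime is a subclass of date
        return value.isoformat()
    if isinstance(value, dict):
        return {k: isoformat_datetimes(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [isoformat_datetimes(v) for v in value]
    return value


record = {
    "filed_at": datetime(2024, 3, 1, 16, 30, tzinfo=timezone(timedelta(hours=-5))),
    "period": {"end": date(2023, 12, 31)},
}
print(isoformat_datetimes(record))
```

The UTC offset survives inside the string, so no timezone information is lost even though the value is no longer a datetime object.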
## Rate Limit Handling
APIs with rate limits need two distinct strategies: burst limits (short-term speed limits, retry after a few seconds) and quota limits (monthly caps, don't retry).
### Distinguish burst from quota
Parse the retry_after value from the 429 response body. If it's short (< 60s), retry
with exponential backoff. If it's long (hours/days), raise immediately with a clear
message — retrying wastes time and burns agent tool calls.
```python
import asyncio

_MAX_BURST_RETRIES = 3

# _sdk_client() and _parse_retry_after() are provider-specific helpers
async def _with_retry(coro_factory, api_key: str):
    """Execute an async SDK call with retry on burst 429 errors."""
    for attempt in range(_MAX_BURST_RETRIES + 1):
        try:
            async with _sdk_client(api_key) as client:
                return await coro_factory(client)
        except Exception as exc:
            if getattr(exc, "status", None) != 429:
                raise
            retry_after = _parse_retry_after(exc)  # extract from body
            if retry_after > 60:  # quota exhaustion: retrying cannot help
                raise ValueError(
                    "API quota exhausted for the current billing period. "
                    "Upgrade your plan or wait for the next billing cycle."
                ) from exc
            if attempt < _MAX_BURST_RETRIES:
                # Exponential backoff, never shorter than the server's hint
                wait = max(retry_after, 1.0) * (2 ** attempt)
                await asyncio.sleep(wait)
            else:
                raise ValueError(
                    f"Rate limit exceeded after {_MAX_BURST_RETRIES} retries."
                ) from exc
```
### Why this matters for agents
Without retry handling, the agent sees a raw ApiException(429) with a wall of HTTP
headers. It doesn't know whether to retry or give up, so it often loops — trying the
same call repeatedly, burning tool calls and time. With clear error messages:
- Burst: the connector retries silently, so the agent never sees the error
- Quota: the agent gets "quota exhausted" and immediately falls back to other sources
### Apply to all connectors uniformly
Route every SDK call through the retry helper. A lambda or closure lets even complex connectors (with conditional logic) use it:
```python
# Simple connector
resp = await _with_retry(lambda c: c.companies.list(**kwargs), api_key)

# Complex connector with conditional dispatch
async def _call(client):
    api = getattr(client, "isic", None) or ISICApi(client.api_client)
    methods = {"sections": api.sections_list, "divisions": api.divisions_list}
    return await methods[level](**kwargs)

resp = await _with_retry(_call, api_key)
```
## SDK Workarounds
### Broken SDK modules
If an SDK has a syntax error or broken import in a module you don't use, pre-load a
stub in sys.modules before importing:
```python
import sys
import types

stub_key = "broken_sdk.api.broken_module"
if stub_key not in sys.modules:
    stub = types.ModuleType(stub_key)

    class _Stub:
        def __init__(self, *a, **kw):
            pass

    stub.BrokenClass = _Stub
    sys.modules[stub_key] = stub

from broken_sdk import Client  # now imports cleanly
```
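This is clean and requires no disk writes. Because the stub is installed before the real module loads, it keeps shadowing that module even after the SDK is fixed; that is harmless as long as you never use the stubbed module, but remove the stub once you need it.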
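If you need this for more than one module, the pattern generalizes into a small helper. install_stub() below is a hypothetical name. It uses sys.modules.setdefault() so a module that is already loaded is never replaced, and it creates one permissive placeholder class per name the broken module is expected to export. The demo uses a top-level module name so it runs without any real package installed.

```python
import importlib
import sys
import types


def install_stub(module_name: str, *class_names: str) -> types.ModuleType:
    """Register a placeholder module unless a real one is already loaded."""
    stub = types.ModuleType(module_name)
    for name in class_names:
        # Each placeholder accepts any arguments, so import-time code cannot fail on it
        setattr(stub, name, type(name, (), {"__init__": lambda self, *a, **kw: None}))
    return sys.modules.setdefault(module_name, stub)


install_stub("demo_broken_module", "BrokenClass")
mod = importlib.import_module("demo_broken_module")  # found in sys.modules, no file needed
obj = mod.BrokenClass(1, key="value")
print(type(obj).__name__)
```

For a dotted name such as broken_sdk.api.broken_module, the parent packages still import normally; only the leaf module is replaced.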
### Sync SDKs in async connectors
Many SDKs are synchronous. Wrap blocking calls with asyncio.to_thread():
```python
import asyncio

@connector(tags=["my_source"])
async def my_source_fetch(params: FetchParams) -> Result:
    entity = await asyncio.to_thread(_resolve_entity, params.identifier)
    data = await asyncio.to_thread(entity.get_data)
    ...
```
For complex multi-step sync operations, extract them into a single sync function
and wrap the whole thing in one to_thread() call to minimize thread-switching overhead.
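The sketch below illustrates that advice with the standard library only. _fetch_report_sync() is a hypothetical stand-in for a multi-step synchronous SDK workflow, with time.sleep() simulating blocking I/O. Bundling the steps into one function means a single asyncio.to_thread() hop per request, and independent requests can still run concurrently with asyncio.gather().

```python
import asyncio
import time


def _fetch_report_sync(identifier: str) -> dict:
    """Hypothetical blocking workflow: resolve the entity, then load its data."""
    time.sleep(0.05)  # stands in for a blocking lookup call
    entity = {"id": identifier.upper()}
    time.sleep(0.05)  # stands in for a blocking data download
    return {"entity": entity["id"], "rows": 3}


async def fetch_report(identifier: str) -> dict:
    # One thread hop for the whole workflow instead of one per step
    return await asyncio.to_thread(_fetch_report_sync, identifier)


async def main() -> list:
    # The event loop stays free while the worker threads block
    return await asyncio.gather(fetch_report("acme"), fetch_report("globex"))


results = asyncio.run(main())
print(results)
```

asyncio.to_thread() requires Python 3.9 or later; it runs the function in the event loop's default thread pool.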
## OutputConfig: Match Actual Response Columns, Not API Docs
OutputConfig column names must match what json_normalize() actually produces, not what
the API documentation says the response should contain. APIs often return different
columns for summary/list views vs full/detail views.
### Verify columns empirically
Make a real API call (or inspect _to_dataframe(resp).columns) before defining the
OutputConfig. Common mismatches:
- Summary vs full view: a list endpoint returns country_code (a flat string), but the detail endpoint returns country_iso.name (a nested object that gets normalized). Your search OutputConfig must use country_code, not country_iso.name.
- Optional nested objects: sector.name only exists when the API response includes a sector object. If sector is null in every record, json_normalize won't create the sector.name column at all.
- Renamed fields: the SDK may rename fields during model_dump().
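Before pandas is even involved, you can preview the column names a list of records will produce. This standard-library sketch approximates pd.json_normalize()'s default behaviour: nested dicts are flattened with ".", everything else (including lists and None) is kept as a leaf, and columns appear in first-seen order. It is an approximation for eyeballing, not a replacement for checking the real DataFrame. The sample records are hypothetical.

```python
from typing import Any


def _flatten(record: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Flatten nested dicts into dotted keys; lists and None stay as leaves."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(_flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


def preview_columns(records: list[dict[str, Any]]) -> list[str]:
    """Column names, in first-seen order, across all flattened records."""
    columns: dict[str, None] = {}
    for record in records:
        for name in _flatten(record):
            columns.setdefault(name, None)
    return list(columns)


list_view = [{"id": 1, "name": "Acme", "country_code": "US", "sector": None}]
detail_view = [{"id": 1, "name": "Acme", "country_iso": {"name": "United States"},
                "sector": {"name": "Industrials"}}]
print(preview_columns(list_view))
print(preview_columns(detail_view))
```

The two outputs differ in exactly the way described above: the list view yields country_code and a bare sector column, while the detail view yields country_iso.name and sector.name.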
### Missing columns are silently skipped
build_table_result() matches config columns against the DataFrame. If a config column
doesn't exist in the DataFrame, it's silently skipped — no error. This means a wrong
column name doesn't crash, it just produces incomplete output. The agent still gets data,
but without the semantic roles you intended.
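Because a wrong name fails silently, it can be worth turning the mismatch into a visible warning during development. This is a minimal sketch. It assumes you can get the list of column names your OutputConfig declares; how to read them from OutputConfig depends on the framework, so the expected list is passed in directly here, and check_output_columns() is a hypothetical helper.

```python
import logging

logger = logging.getLogger("connectors")


def check_output_columns(expected: list[str], actual: list[str], *, connector: str) -> list[str]:
    """Log and return configured columns that the DataFrame does not contain."""
    present = set(actual)
    missing = [col for col in expected if col not in present]
    if missing:
        logger.warning(
            "%s: OutputConfig columns not in response: %s (available: %s)",
            connector, missing, sorted(present),
        )
    return missing


missing = check_output_columns(
    ["id", "name", "country_iso.name"],
    ["id", "name", "country_code"],
    connector="my_source_companies_search",
)
print(missing)
```

Calling it with list(df.columns) right before build_table_result(), or asserting on its return value in a test, surfaces the mismatch that build_table_result() would otherwise hide.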
### Per-resource OutputConfig mapping
When a single connector dispatches to different resources (e.g., a reference data connector that serves filing types, countries, languages), each resource has different columns. Use a mapping dict to select the right OutputConfig at runtime:
```python
_OUTPUT_MAP = {
    "filing_types": FILING_TYPES_OUTPUT,  # id, code, name, description
    "countries": COUNTRIES_OUTPUT,  # alpha_2, name
    "languages": GENERIC_OUTPUT,  # id, name
}

# In the connector:
output = _OUTPUT_MAP.get(params.resource, GENERIC_OUTPUT)
return output.build_table_result(df, provenance=...)
```
## Docstrings as Workflow Guidance
Connector docstrings are injected verbatim into the agent's system prompt via to_llm().
They're not just documentation — they're instructions that guide the agent through
multi-step workflows.
### Chain connectors explicitly
Tell the agent exactly which connector to call next and how to pass identifiers between them:
```python
@connector(output=COMPANIES_SEARCH_OUTPUT, tags=["my_source"])
async def my_source_companies_search(params: CompaniesSearchParams, *, api_key: str) -> Result:
    """Search companies by name, country, or industry.
    Returns company profiles with ID, name, and country.
    Use the company ID with my_source_filings_search(company=id) to find filings,
    my_source_company_retrieve(id=id) for the full profile, or
    my_source_next_report(id=id) for report predictions.
    Use my_source_industries to discover valid industry codes for the sector filter.
    """
```
### Anti-pattern: generic docstrings
Without chaining hints, the agent must guess the workflow. It often picks the wrong connector, passes wrong parameters, or skips steps entirely:
```python
# BAD: agent doesn't know what to do with the results
"""Search companies on the platform."""

# GOOD: agent knows the exact next steps
"""Search companies by name, country, or industry.
Use the company ID with my_source_filings_search(company=id) to find filings."""
```
### Reference data connectors need extra guidance
Reference/lookup connectors are often missed by the agent unless you tell it when to use them. Be specific about which filter parameters accept which reference codes:
```python
"""List reference data: filing types, categories, languages, countries, or sources.
Use filing type codes in my_source_filings_search(types=...).
Use country codes in my_source_filings_search(countries=...) or my_source_companies_search(countries=...)."""
```
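Chaining hints go stale when connectors are renamed. One cheap safeguard, sketched here as a hypothetical test helper, scans each docstring for calls that look like connector names and reports any that are not registered. The my_source_ prefix is an assumption; use your provider's prefix, and build the input dict from the registered functions' __name__ and __doc__.

```python
import re

# Matches "my_source_xxx(" so plain mentions without parentheses are not checked
_CALL_PATTERN = re.compile(r"\b(my_source_\w+)\(")


def stale_references(docstrings: dict[str, str]) -> dict[str, list[str]]:
    """Map each connector to referenced connector names that don't exist."""
    known = set(docstrings)
    problems: dict[str, list[str]] = {}
    for name, doc in docstrings.items():
        unknown = sorted(set(_CALL_PATTERN.findall(doc)) - known)
        if unknown:
            problems[name] = unknown
    return problems


docs = {
    "my_source_companies_search": (
        "Use the company ID with my_source_filings_search(company=id) to find filings, "
        "or my_source_company_profile(id=id) for the full profile."
    ),
    "my_source_filings_search": "Search filings by company, type, or country.",
}
print(stale_references(docs))
```

Because the regex requires an opening parenthesis, a mention such as "Use my_source_industries to discover..." is not checked; widen the pattern if your docstrings reference connectors that way.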
## Connector Design Patterns
### Discovery -> Inspect -> Fetch
Design connectors in layers that let the agent narrow down before committing to expensive operations:
- Search/List — cheap, returns identifiers + minimal metadata
- Sections/TOC — medium, returns what's available inside a resource
- Fetch item — targeted, returns only the specific content needed
Example flow:
```text
search_filings(query="AI risk") -> accession numbers
filing_sections(accession_number=...) -> list of items
filing_item(accession_number=..., item="1A") -> Risk Factors text only
```
This uses far fewer tokens than fetching a 500-page document to find one section.
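The three layers can be sketched against an in-memory store to show what each call returns. Everything here (the FILINGS data, the function names, and the placeholder accession number) is hypothetical. In a real connector, each function would be a separate @connector backed by the provider's API.

```python
FILINGS = {
    "0000000000-24-000001": {
        "form": "10-K",
        "title": "Annual report discussing AI risk",
        "items": {"1": "Business overview ...", "1A": "Risk factors: AI regulation ...", "7": "MD&A ..."},
    },
}


def search_filings(query: str) -> list[dict]:
    """Layer 1: cheap search returning identifiers and minimal metadata."""
    q = query.lower()
    return [
        {"accession_number": acc, "form": f["form"]}
        for acc, f in FILINGS.items()
        if q in f["title"].lower()
    ]


def filing_sections(accession_number: str) -> list[str]:
    """Layer 2: list what is inside one filing, without its content."""
    return list(FILINGS[accession_number]["items"])


def filing_item(accession_number: str, item: str) -> str:
    """Layer 3: return only the requested section's text."""
    return FILINGS[accession_number]["items"][item]


hits = search_filings("AI risk")
acc = hits[0]["accession_number"]
print(filing_sections(acc))
print(filing_item(acc, "1A"))
```

Only the last call returns document text, and only for one item; the first two return identifiers the agent can pass along.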
### Table listing + individual fetch
When a resource contains multiple heterogeneous tables (different schemas, column counts), don't try to return them all in one DataFrame. Instead:
- List tables — returns summary DataFrame (index, caption, type, size)
- Fetch table — returns individual table by index as a clean DataFrame
The agent uses the listing to pick the right table, then fetches only that one.
### Batch operations
If the SDK supports batch retrieval, expose it. The agent calling a connector in a loop (N sequential API calls) is slow and token-heavy. A single connector that accepts a list of identifiers and returns a combined result is much more efficient.
However, only add batch params if the SDK actually supports it server-side. Client-side batching (looping internally) doesn't save latency and adds complexity.
## Provenance
The Connector.__call__ method automatically stamps provenance.source with the
connector name and provenance.source_description with the connector's docstring.
You don't need to set these manually; just construct your Provenance with params, for example Provenance(source="my_source", params=params.model_dump()).
The framework overrides source with the connector name (e.g., my_source_filings) at call time,
and the UI uses it to show which specific connector produced each result.
If you need to pass additional metadata through provenance (e.g., API response headers,
pagination info), use the properties dict:
```python
Provenance(
    source="my_source",
    params=params.model_dump(),
    properties={"page": 1, "total_pages": 5},
)
```
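To picture what the framework does at call time, here is a minimal sketch of the stamping step using a plain dataclass. Provenance and stamp_provenance() here are simplified, hypothetical stand-ins, not the framework's real classes. The point is only that source and source_description are overwritten, while params and properties pass through unchanged.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Optional


@dataclass(frozen=True)
class Provenance:
    source: str
    params: dict[str, Any] = field(default_factory=dict)
    properties: dict[str, Any] = field(default_factory=dict)
    source_description: str = ""


def stamp_provenance(prov: Provenance, connector_name: str, docstring: Optional[str]) -> Provenance:
    """Overwrite the source fields with the connector's identity; keep everything else."""
    return replace(prov, source=connector_name, source_description=(docstring or "").strip())


original = Provenance(source="my_source", params={"company": 42},
                      properties={"page": 1, "total_pages": 5})
stamped = stamp_provenance(original, "my_source_filings", "Search filings by company.")
print(stamped.source, stamped.properties)
```

This is why the source value you pass only needs to be a sensible placeholder: anything you put in properties survives, but source is replaced.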
## SDK Feature Discovery: Use to_context() When Available
Modern data SDKs increasingly include AI-optimized methods (to_context(), to_llm(),
to_agent_tools()). Before building a manual attribute-extraction pipeline:
- Check whether the SDK objects implement a to_context() or similar method
- If they do, use it: it's maintained by the SDK authors, handles all object types, and produces structured output designed for LLM consumption
- Expose it as a result_type="text" connector
This avoids the trap of maintaining a hard-coded attribute list that falls behind the SDK's evolving data model.
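A sketch of that preference order follows. It uses the SDK's own method when the object has one and falls back to a curated attribute list only when it does not. The method name to_context, the fallback attributes, and the two toy classes are assumptions to adapt per SDK.

```python
from typing import Any

FALLBACK_ATTRS = ("name", "form", "filing_date")  # hypothetical curated fields


def as_context(obj: Any) -> str:
    """Prefer the SDK's LLM-oriented rendering; otherwise build a small summary."""
    to_context = getattr(obj, "to_context", None)
    if callable(to_context):
        return str(to_context())
    parts = [f"{attr}: {getattr(obj, attr)}" for attr in FALLBACK_ATTRS if hasattr(obj, attr)]
    return "\n".join(parts)


class WithContext:
    def to_context(self) -> str:
        return "Filing 10-K for ACME, filed 2024-03-01"


class Plain:
    name = "ACME"
    form = "10-K"


print(as_context(WithContext()))
print(as_context(Plain()))
```

The fallback list is the part that tends to fall behind the SDK, which is why it only runs when the SDK offers nothing better.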
## SDK Sub-APIs Not on the High-Level Client
Generated SDK clients often expose a convenience wrapper (e.g., client.filings,
client.companies) that covers the main endpoints, but leave specialized APIs
(ISINs, ISIC classifications, chat, watchlist) accessible only through low-level
API classes.
Check what the high-level client exposes vs what exists in the SDK's api/ directory.
For sub-APIs not on the wrapper, instantiate the low-level class directly:
```python
async def _call(client):
    # Try high-level client first, fall back to low-level API
    if hasattr(client, "isic"):
        return await client.isic.sections_list(**kwargs)
    from my_sdk import ISICClassificationsApi
    return await ISICClassificationsApi(client.api_client).sections_list(**kwargs)
```
This is forward-compatible: if the SDK later adds the sub-API to the high-level client, your code uses it automatically.
## Testing with the Agent
Run uv run python scripts/agent_eval.py "your query" to test connectors end-to-end
with the actual agent. This catches issues that unit tests miss:
### Common failures discovered only through agent testing
- Wrong attribute names: the names the SDK actually uses differ from what you assumed from the docs
- DataFrame serialization errors: MultiIndex, duplicates, mixed types, Arrow conversion
- Empty results: row_count > 0 in metadata but to_dataframe() returns an empty frame
- Agent confusion: the agent misuses connectors when to_llm() descriptions are unclear or when result types aren't communicated
- OutputConfig column mismatch: columns in the config that don't exist in the actual API response (silently skipped, produces incomplete output with no error)
- Rate limit loops: without retry handling, the agent sees raw 429 errors and loops trying the same call, burning 10-20+ tool calls before giving up
### Test workflows, not just individual connectors
The agent chains connectors together. Test the full workflow:
- Search -> list -> fetch -> analyze
- Error recovery: what happens when the first result is empty?
- Fallback behavior: does the agent find alternative connectors when one fails?
### Test degradation scenarios
- Rate limits: what does the agent see when the API throttles? A clear message like "quota exhausted" lets the agent fall back to alternatives. A raw exception causes loops.
- Partial API access: tiered APIs may return 403 for premium endpoints. The agent should get a clear message, not an opaque stack trace.
- Empty search results: raise ValueError with the search params so the agent can adjust its query rather than assuming the connector is broken.
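Both kinds of message can come from small helpers. This sketch is hypothetical: the helper names and the 403 wording are illustrations, and the status code is assumed to be available on the SDK exception, as the retry helper earlier in this guide assumes. It shows the shape of errors that tell the agent what to do next.

```python
from typing import Any, Optional


def require_results(rows: list[Any], params: dict[str, Any]) -> list[Any]:
    """Raise an actionable error instead of returning an empty result."""
    if not rows:
        raise ValueError(
            f"No results for {params}. Try a broader query, fewer filters, "
            "or a different identifier."
        )
    return rows


def explain_http_error(status: Optional[int], endpoint: str) -> Optional[str]:
    """Return an agent-friendly message for known access errors, else None."""
    if status == 403:
        return (
            f"{endpoint} is not included in the current API plan. "
            "Use another connector or data source for this information."
        )
    return None


try:
    require_results([], {"query": "AI risk", "form": "10-Q"})
except ValueError as err:
    print(err)
```

In a connector, a non-None message from explain_http_error(getattr(exc, "status", None), ...) would be raised as ValueError(message) from exc, so the agent sees the explanation instead of a stack trace.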
### Verify to_llm() output
After implementing connectors, call CONNECTORS.to_llm() and read the output. This is
exactly what the agent sees. Check that:
- Each connector's description includes workflow chaining hints
- Returns: lists the OutputConfig columns (so the agent knows what to expect)
- Parameter descriptions are specific enough to use without guessing
- result_type="text" connectors are clearly marked
## Exports Checklist
```python
# Every connector module should export:
CONNECTORS = Connectors([
    my_source_search,
    my_source_fetch,
    my_source_detail,
    ...
])

# If some connectors need API keys and others don't:
FETCH_CONNECTORS = Connectors([...])  # subset that needs bind_deps
```
In __init__.py, use bind_deps(api_key=key) for connectors requiring API keys,
and add key-free connectors directly:
```python
result = result + MY_SOURCE_FETCH.bind_deps(api_key=key)  # needs key
result = result + MY_OTHER_SOURCE  # no key needed
```
## File Size
Aim for < 400 lines per connector module. If a provider has 15+ connectors, the file will be ~500-600 lines — that's fine for a single-provider module. Don't split into multiple files unless you have genuinely separate concerns (e.g., separate SDK clients).