Deep Research Trajectories with NeMo Data Designer and MCP Tool Use
Data Designer v0.5.0's MCP tool-use support lets you generate multi-turn research trajectories, the kind of data needed to train deep research agents that iteratively search, read, and synthesize evidence before answering a question.

Deep research agents like OpenResearcher (Li, Jiang, Ma et al., 2026) and Universal Deep Research (Belcak & Molchanov, 2025) generate long reasoning chains interleaved with tool calls: formulating queries, retrieving documents, reading passages, refining hypotheses, and eventually synthesizing an answer. Training these agents requires trajectory data capturing the full multi-turn interaction between a model and its tools: every search, every document opened, every dead end explored.
OpenResearcher demonstrated something worth paying attention to: synthetic trajectories generated against a local retriever (BM25 over a static corpus, no web APIs) are sufficient to train Nemotron Nano 3 to outperform GPT-4.1 on deep research benchmarks. The data format (complete tool-use traces showing how a model moves through an information space) matters more than model scale. Nemotron Nano 3, with only 3B active parameters, beats models orders of magnitude larger on multi-hop research tasks.
This post shows how to generate that same kind of training data using Data Designer's MCP tool-use capabilities. We build a retriever as an MCP server, construct a corpus with known-good evidence, run a teacher model through the full research process, and use an LLM judge for rejection sampling. The result is a pipeline that produces high-quality research trajectories you can use for supervised fine-tuning or as a starting point for RL.
Here's what one of those trajectories looks like, a 4-hop question answered correctly by Claude Opus 4.5 using the pipeline described below. Each line is a tool call; parallel calls within the same turn are grouped.
Example trajectory: 4-hop question, 31 turns, 49 tool calls
Based on my extensive research, I have gathered the following evidence:
1. The Roman Empire reached its greatest territorial extent under Trajan (117 CE) — confirmed by multiple documents
2. Trajan's mother was from the Gens Marcia — the Trajan document states he "on his mother's side from the Gens Marcia, of an Italic family of Sabine origin"
3. Egypt was colonized/annexed by Rome after Cleopatra's death (30 BC), making it a Roman province
4. Egypt had border conflicts with Libya (ancient Marmaricae/Cyrenaica region) in the 1st century BC
The answer is Marcia.
Step 1: Building the Retrieval MCP Server
OpenResearcher's key design choice is a three-tool browser interface rather than a single retrieval call. The paper argues (and their ablations confirm) that separating search, document opening, and in-document search forces the model to develop genuine research strategies: skimming results, diving into promising documents, hunting for specific evidence within them. A single monolithic "retrieve" tool collapses this entire workflow into one step, which produces shorter and less useful training trajectories.
We implement the same three tools as an MCP server that Data Designer can invoke during generation. Our retriever uses BM25S for fast lexical search over the corpus:
from mcp.server.fastmcp import FastMCP
mcp_server = FastMCP("corpus-retriever")
@mcp_server.tool()
def search(query: str, top_k: int = 10) -> dict:
"""Search for candidate documents to explore."""
# BM25S search over the corpus, returns ranked results with snippets
...
@mcp_server.tool(name="open")
def open_document(doc_id: str) -> dict:
"""Open a document for detailed inspection with cursor-numbered chunks."""
# Returns content formatted as [1] paragraph... [2] paragraph...
...
@mcp_server.tool()
def find(doc_id: str, query: str) -> dict:
"""Find matching passages inside a document by keyword."""
# Returns matching chunks with cursor positions
...
if __name__ == "__main__":
mcp_server.run()
search returns a ranked list of document IDs with short snippets, enough for the model to decide which documents look promising. open returns the full document content, split into cursor-numbered chunks so the model can reference specific passages. find does targeted keyword search within a single document, letting the model locate specific evidence without reading the entire thing. The cursor-based chunking across open and find gives the model a way to scan long documents incrementally, the way a human researcher would scan a paper for the relevant section rather than reading it cover to cover.
The server runs as a local stdio process, which means Data Designer launches and manages it automatically. No external services, no API keys for retrieval, no rate limits.
Step 2: Building the Corpus
The corpus design follows directly from OpenResearcher's most striking ablation result. They tested what happens when you vary the retrieval corpus while keeping the reasoning model fixed (GPT-OSS-120B). The results, from the OpenResearcher Appendix:
| Corpus | BrowseComp-Plus Accuracy |
|---|---|
| Golden passages only (BrowseComp-Plus corpus) | 56.0% |
| 15M FineWeb + golden passages | 31.2% |
| 15M FineWeb only | 0.71% |
Without golden passages (documents known to contain evidence for the question), accuracy drops to nearly zero. The model can't learn research strategies from trajectories where every search is a dead end.
The original OpenResearcher corpus uses 15M documents from FineWeb as distractors alongside 10K golden passages. For this demonstration, we use a lighter-weight approach: we construct the corpus from multi-hop QA datasets: HotpotQA (2-hop questions requiring two pieces of linked evidence) and MuSiQue (2-4 hop questions composed from single-hop sub-questions). Each question comes with annotated supporting passages, the specific paragraphs that contain the evidence needed to answer it. Golden passages go into the corpus alongside non-supporting passages from the same datasets as distractors, at roughly a 1:9 ratio. The model has to search through noise to find the signal, which is exactly the skill we want the training data to teach.
The key constraint is that golden passages must be findable but not obvious. If the corpus is too small or the golden passages are too easy to identify, the trajectories won't transfer to real-world research where evidence is sparse. The distractor ratio controls this difficulty, and the paper's ablations give us a good starting point for tuning it.
Step 3: The Data Designer Pipeline
With the retriever server and corpus ready, the Data Designer pipeline ties everything together. We configure a teacher model, point it at the MCP retriever, and let it research each question from scratch. For this demo we hosted our own inference server, but anyone can try this pipeline using Nemotron Nano 3 on build.nvidia.com with a free API key using the model configuration shown below.
import data_designer.config as dd
from data_designer.interface import DataDesigner
# Search rollout model for trajectory generation
config = dd.DataDesignerConfigBuilder()
config.add_model_config(
dd.ModelConfig(
alias="search_rollout_model",
model="nvidia/nemotron-3-nano-30b-a3b",
provider="nvidia",
inference_parameters=dd.ChatCompletionInferenceParams(
temperature=1.0,
top_p=0.95,
max_tokens=16384,
),
)
)
The temperature and top_p settings matter here. We want diverse research strategies across seeds (different query formulations, different document exploration orders) so that rejection sampling has a rich pool to select from. Setting temperature to 1.0 with top_p at 0.95 gives enough variation that the same question can produce meaningfully different trajectories across seeds.
The MCP tool configuration tells Data Designer which server to use and how many tool-call turns to allow:
# MCP retriever tool configuration
tool_config = dd.ToolConfig(
tool_alias="knowledge-base",
providers=["corpus-retriever"],
max_tool_call_turns=150,
)
config.add_tool_config(tool_config)
We set max_tool_call_turns high (150) because deep research trajectories can be long. Our longest observed trajectory used 25 tool calls across 53 messages. Capping too low would truncate the most interesting research chains.
The seed dataset contains the research questions alongside reference answers (which we'll use for rejection sampling in Step 4):
config.with_seed_dataset(
dd.LocalFileSeedSource(path="questions.jsonl"),
)
config.add_column(
dd.ExpressionColumnConfig(
name="research_question",
expr="{{ question }}",
)
)
The core of the pipeline is the research column, where the teacher model receives a question and a system prompt instructing it to use the retriever tools:
SYSTEM_PROMPT = """You are a thorough research assistant. You have access to three tools \
for navigating a knowledge base:
- search(query, top_k): Find candidate documents relevant to your query
- open(doc_id): Open a document to read its full content in numbered chunks
- find(doc_id, query): Locate specific passages within a document by keyword
Your task is to research the given question by searching for relevant documents, \
reading their content, and synthesizing an answer from the evidence you find. \
Be systematic: formulate search queries, explore promising results, and gather \
evidence before answering. Cite specific passages when possible."""
config.add_column(
dd.LLMTextColumnConfig(
name="research_answer",
prompt="Research and answer thoroughly:\n\n{{ research_question }}",
model_alias="search_rollout_model",
system_prompt=SYSTEM_PROMPT,
tool_alias="knowledge-base",
with_trace=dd.TraceType.ALL_MESSAGES,
extract_reasoning_content=True,
)
)
Two settings are doing the important work here. with_trace=dd.TraceType.ALL_MESSAGES captures the entire interaction (every tool call, every tool response, every intermediate reasoning step) into a separate trace column in ChatML format. This is the training data: the full trajectory of how the model moved through the information space. extract_reasoning_content=True pulls out the model's internal chain-of-thought separately, so you can include or exclude it depending on your training setup.
Step 4: Rejection Sampling with an LLM Judge
Not every trajectory leads to a correct answer. OpenResearcher's approach is straightforward. Generate multiple trajectories per question, score them for correctness, and keep only the ones that got the right answer. We implement this with Data Designer's LLMJudgeColumnConfig, using a separate (smaller) model as the judge:
# Judge model for rejection sampling
config.add_model_config(
dd.ModelConfig(
alias="judge",
model="nvidia/nemotron-3-nano-30b-a3b",
provider="nvidia",
)
)
config.add_column(
dd.LLMJudgeColumnConfig(
name="correctness",
model_alias="judge",
prompt=(
"Question: {{ research_question }}\n"
"Reference answer: {{ answer }}\n"
"Generated answer: {{ research_answer }}\n"
"Does the generated answer correctly address the question?"
),
scores=[
dd.Score(
name="correct",
description="Is the answer factually correct?",
options={
1: "Correct",
0: "Incorrect",
},
),
],
)
)
The judge compares the generated answer against the reference answer from the seed dataset. Using a smaller model as judge is deliberate. We don't need the judge to reason about the question, just to compare two answers for factual agreement. This keeps costs down when scoring thousands of trajectories.
In practice, you'd generate multiple trajectories per question (varying the random seed) and filter to correctness.correct == 1. The incorrect trajectories aren't wasted; they can serve as negative examples for preference-based training methods like DPO.
Multi-Turn Tool Calling: Rough Edges in the Open Model Ecosystem
The pipeline described above is straightforward in principle. In practice, getting multi-turn tool calling to work reliably with open-weight models served through vLLM turned out to be the hardest part of this project.
We tested two open-weight models on a self-hosted vLLM (v0.15.1) instance: GPT-OSS-120B and Kimi K2.5. Both failed to produce usable research trajectories, for related but distinct reasons.
GPT-OSS-120B uses a "Harmony" output format that routes text through named channels (reasoning, final answer, tool calls). When tools are involved, vLLM's parser consistently routes the model's output to the wrong channel: everything lands in reasoning_content while the content field stays empty. This happens at all reasoning_effort levels. The model does the research (calls tools, reads documents, formulates queries) but the final synthesized answer never appears where the serving layer expects it. This is a known issue in vLLM's Harmony format handling. Here's the final message from a typical trajectory. The model has been researching for 5 tool calls but produces no answer:
{
"role": "assistant",
"content": [{"type": "text", "text": ""}],
"reasoning_content": "It seems that the knowledge base may have a page about
Colin Bateman that includes his biography. Possibly the 'md_001100' entry is
about a footballer, not the author. The author Colin Bateman likely ...",
"tool_calls": null
}
The model's reasoning shows it has the answer (it identified Colin Bateman as the author), but the content field is empty and no tool call is emitted. The trajectory ends here with nothing to show for it.
Kimi K2.5 exhibits a different failure mode. With its thinking mode enabled, it has the same channel-routing problem as GPT-OSS. With thinking mode disabled, the model produces content text, but after the first tool result, it narrates what it plans to do next rather than emitting another tool call. The serving layer sees text content without tool calls and treats it as the final answer, terminating the research loop after a single search:
{
"role": "assistant",
"content": "I found that 'Cycle of Violence' was written by Colin Bateman,
described as a 'Northern Irish author'. Now let me search for more details
about his birthplace to confirm his birth country.",
"reasoning_content": "The search results clearly show that 'Cycle of Violence'
was written by Colin Bateman, a Northern Irish author...",
"tool_calls": null
}
The model intends to keep researching ("let me search for more details") but describes the action instead of calling the tool. The framework sees content, no tool calls, and stops. We tried multiple tokenizer modes, prompt variations, and vLLM configurations; open issues on the model's HuggingFace page confirm this is a broader compatibility gap.
The original OpenResearcher codebase handles this by bypassing vLLM's tool call parser entirely. They hit the raw /completions endpoint (openai_generator.py), parse <tool_call> XML tags from the output with regex, and continue looping until the model emits an explicit answer marker like <answer> or final answer: (deploy_agent.py).
The open-source tool-calling stack is growing and maturing quickly, but multi-turn tool use with reasoning models is still a rough edge. For now, the practical path is to use models with battle-tested tool-calling support through their native APIs, which is what we do in the results below.
Results
We ran 64 questions uniformly sampled across 2, 3, and 4-hop difficulty levels from MuSiQue, with 50K FineWeb web documents as distractors (a 1:100 golden-to-distractor ratio). We tested two models, Claude Opus 4.5 (via API) and Nemotron Nano 3 (30B total / 3B active params, self-hosted via vLLM with reasoning disabled).
| Claude Opus 4.5 | Nemotron Nano 3 | |
|---|---|---|
| Samples | 64 (55 completed) | 64 (61 completed) |
| Overall accuracy | 41/55 (75%) | 32/61 (52%) |
| 2-hop accuracy | 18/23 (78%) | 13/23 (57%) |
| 3-hop accuracy | 15/18 (83%) | 11/22 (50%) |
| 4-hop accuracy | 8/14 (57%) | 8/16 (50%) |
| Avg tool calls | 16.8 | 11.8 |
| Max tool calls | 57 | 63 |
| Avg messages per trajectory | 40.4 | 26.5 |
| Max messages per trajectory | 117 | 129 |
Opus is 22 points more accurate, but Nano runs roughly 5x faster on self-hosted hardware. Both models show tool usage scaling with hop count. Nano uses fewer tools but achieves lower accuracy, with the largest gap on 2-hop questions (78% vs 57%). Splitting by correctness reveals the same pattern in both models: incorrect trajectories are longer.
Claude Opus 4.5:
| Outcome | Hops | Count | Avg Tool Calls | Avg Messages | Avg Answer Length |
|---|---|---|---|---|---|
| Correct | 2 | 18 | 7.3 | 18.9 | 1,072 chars |
| 3 | 15 | 14.9 | 35.7 | 1,372 chars | |
| 4 | 8 | 21.0 | 50.6 | 1,705 chars | |
| All | 41 | 12.8 | 31.2 | 1,305 chars | |
| Incorrect | 2 | 5 | 21.0 | 48.6 | 1,534 chars |
| 3 | 3 | 25.7 | 63.0 | 1,795 chars | |
| 4 | 6 | 36.0 | 85.2 | 1,903 chars | |
| All | 14 | 28.4 | 67.4 | 1,748 chars |
Nemotron Nano 3:
| Outcome | Hops | Count | Avg Tool Calls | Avg Messages | Avg Answer Length |
|---|---|---|---|---|---|
| Correct | 2 | 13 | 6.5 | 16.1 | 773 chars |
| 3 | 11 | 12.7 | 28.5 | 708 chars | |
| 4 | 8 | 8.0 | 19.0 | 1,600 chars | |
| All | 32 | 9.0 | 21.1 | 957 chars | |
| Incorrect | 2 | 10 | 10.1 | 23.2 | 799 chars |
| 3 | 11 | 18.0 | 39.0 | 1,163 chars | |
| 4 | 8 | 16.2 | 35.5 | 848 chars | |
| All | 29 | 14.8 | 32.6 | 951 chars |
Correct trajectories are shorter at every hop level for both models. Incorrect trajectories are roughly twice as long because the model keeps searching when it can't find evidence, then writes a longer answer to compensate. This anti-correlation between trajectory length and correctness is consistent across model scales, which means trajectory length alone could serve as a lightweight filter during rejection sampling.
Closing Remarks
Thanks to the OpenResearcher team for their work showing that synthetic research trajectories over local retrieval can train small models to compete with much larger ones. Their results suggest we're only beginning to understand how LLMs interact with search tools and how the structure of those interactions shapes what models learn. We're excited to see where the community takes synthetic data research using NeMo Data Designer as both the models and the tooling continue to improve.
Try For Yourself
Full source: openresearcher_demo.py
import data_designer.config as dd
from data_designer.interface import DataDesigner
# Models
config = dd.DataDesignerConfigBuilder()
config.add_model_config(
dd.ModelConfig(
alias="search_rollout_model",
model="nvidia/nemotron-3-nano-30b-a3b",
provider="nvidia",
inference_parameters=dd.ChatCompletionInferenceParams(
temperature=1.0,
top_p=0.95,
max_tokens=16384,
),
)
)
config.add_model_config(
dd.ModelConfig(
alias="judge",
model="nvidia/nemotron-3-nano-30b-a3b",
provider="nvidia",
)
)
# MCP retriever
tool_config = dd.ToolConfig(
tool_alias="knowledge-base",
providers=["corpus-retriever"],
max_tool_call_turns=150,
)
config.add_tool_config(tool_config)
# Seed questions with reference answers
config.with_seed_dataset(
dd.LocalFileSeedSource(path="questions.jsonl"),
)
config.add_column(
dd.ExpressionColumnConfig(
name="research_question",
expr="{{ question }}",
)
)
# Research trajectory generation
config.add_column(
dd.LLMTextColumnConfig(
name="research_answer",
prompt="Research and answer thoroughly:\n\n{{ research_question }}",
model_alias="search_rollout_model",
system_prompt=SYSTEM_PROMPT,
tool_alias="knowledge-base",
with_trace=dd.TraceType.ALL_MESSAGES,
extract_reasoning_content=True,
)
)
# Rejection sampling judge
config.add_column(
dd.LLMJudgeColumnConfig(
name="correctness",
model_alias="judge",
prompt=(
"Question: {{ research_question }}\n"
"Reference answer: {{ answer }}\n"
"Generated answer: {{ research_answer }}\n"
"Does the generated answer correctly address the question?"
),
scores=[
dd.Score(
name="correct",
description="Is the answer factually correct?",
options={
1: "Correct",
0: "Incorrect",
},
),
],
)
)
# Run
mcp_provider = dd.LocalStdioMCPProvider(
name="corpus-retriever",
command="uv",
args=["run", "retriever_mcp.py", "serve"],
env={"CORPUS_PATH": "corpus.jsonl"},
)
data_designer = DataDesigner(mcp_providers=[mcp_provider])
results = data_designer.create(
config_builder=config,
num_records=1000,
dataset_name="research-trajectories",
)
Full source: prepare_corpus.py
# /// script
# requires-python = ">=3.10"
# dependencies = ["datasets", "huggingface_hub", "pyarrow"]
# ///
"""Prepare a retrieval corpus and question set for the OpenResearcher demo.
Builds corpus.jsonl and questions.jsonl from two sources:
1. MuSiQue — multi-hop QA dataset (2/3/4-hop) with golden passages
2. FineWeb — web documents as distractors (matches the OpenResearcher paper)
Golden passages (documents containing evidence for the answer) are mixed with
FineWeb distractors at roughly 1:100 ratio, so the model must search through
noise to find the signal.
Usage:
uv run prepare_corpus.py
"""
from __future__ import annotations
import json
import random
from pathlib import Path
from urllib.parse import urlparse
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
NUM_QUESTIONS = 192 # 64 per hop level (2, 3, 4)
NUM_FINEWEB_DISTRACTORS = 50_000
FINEWEB_SHARD = 0
OUTPUT_DIR = "data"
# ---------------------------------------------------------------------------
# MuSiQue extraction
# ---------------------------------------------------------------------------
def prepare_musique(num_questions: int) -> tuple[list[dict], list[dict]]:
"""Load MuSiQue and extract multi-hop questions with golden passages.
Samples uniformly across hop counts (2, 3, 4) so the dataset has balanced
difficulty. Golden passages (is_supporting=True) go into the corpus;
non-golden passages from the same examples serve as additional distractors.
Returns:
(questions, corpus_docs) where corpus_docs have is_golden=True/False.
"""
from datasets import load_dataset
print("Loading MuSiQue (train split)...")
dataset = load_dataset("bdsaglam/musique", split="train")
# Bucket answerable examples by hop count
hop_buckets: dict[int, list[dict]] = {}
for example in dataset:
if not example.get("answerable", False):
continue
num_hops = len(example.get("question_decomposition", []))
if num_hops < 2:
continue
hop_buckets.setdefault(num_hops, []).append(example)
# Sample uniformly: equal questions per hop level
available_hops = sorted(hop_buckets.keys())
per_hop = num_questions // len(available_hops)
selected_examples = []
for h in available_hops:
bucket = hop_buckets[h]
n = min(per_hop, len(bucket))
selected_examples.extend(random.sample(bucket, n))
print(f" Selected {len(selected_examples)} questions across hops {available_hops}")
# Build questions and corpus docs
questions: list[dict] = []
golden_titles: dict[str, str] = {}
nongolden_titles: dict[str, str] = {}
for example in selected_examples:
num_hops = len(example["question_decomposition"])
questions.append({
"id": f"mq_{len(questions):06d}",
"question": example["question"],
"answer": example["answer"],
"source": "musique",
"num_hops": num_hops,
"seed_id": 0,
})
for para in example.get("paragraphs", []):
title = para.get("title", "").strip()
content = para.get("paragraph_text", "").strip()
if not title or not content:
continue
if para.get("is_supporting", False):
if len(content) > len(golden_titles.get(title, "")):
golden_titles[title] = content
else:
if len(content) > len(nongolden_titles.get(title, "")):
nongolden_titles[title] = content
# Golden passages
corpus_docs = [
{"title": t, "content": c, "source": "musique", "is_golden": True}
for t, c in sorted(golden_titles.items())
]
# Non-golden passages (skip titles already in golden set)
corpus_docs.extend(
{"title": t, "content": c, "source": "musique", "is_golden": False}
for t, c in sorted(nongolden_titles.items())
if t not in golden_titles
)
print(f" Golden passages: {len(golden_titles)}")
print(f" Non-golden passages: {len(corpus_docs) - len(golden_titles)}")
return questions, corpus_docs
# ---------------------------------------------------------------------------
# FineWeb distractor caching
# ---------------------------------------------------------------------------
def cache_fineweb(shard_index: int, max_docs: int) -> list[dict]:
"""Download a FineWeb parquet shard and extract English documents.
Uses huggingface_hub for direct shard download (faster than load_dataset)
and pyarrow for memory-efficient row-group-at-a-time reading.
Returns:
List of distractor documents with title (domain) and content (text).
"""
from huggingface_hub import hf_hub_download
import pyarrow.parquet as pq
filename = f"sample/10BT/{shard_index:03d}_00000.parquet"
print(f"Downloading FineWeb shard: {filename}")
parquet_path = hf_hub_download(
repo_id="HuggingFaceFW/fineweb",
repo_type="dataset",
filename=filename,
)
pf = pq.ParquetFile(parquet_path)
print(f" {pf.metadata.num_rows:,} rows in shard")
docs: list[dict] = []
for rg_idx in range(pf.metadata.num_row_groups):
table = pf.read_row_group(rg_idx, columns=["text", "url", "language", "token_count"])
batch = table.to_pydict()
for text, url, lang, tok_count in zip(
batch["text"], batch["url"], batch["language"], batch["token_count"]
):
if lang != "en" or tok_count < 50:
continue
text = text.strip()
if not text:
continue
# Use domain as title
try:
domain = urlparse(url).netloc.removeprefix("www.")
except Exception:
domain = "unknown"
docs.append({
"title": domain,
"content": text,
"source": "fineweb",
"is_golden": False,
})
if len(docs) >= max_docs:
break
if len(docs) >= max_docs:
break
print(f" Extracted {len(docs):,} English documents (min 50 tokens)")
return docs
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
output_dir = Path(OUTPUT_DIR)
output_dir.mkdir(parents=True, exist_ok=True)
# Extract MuSiQue questions and golden passages
questions, corpus_docs = prepare_musique(NUM_QUESTIONS)
# Download FineWeb distractors
fineweb_docs = cache_fineweb(FINEWEB_SHARD, NUM_FINEWEB_DISTRACTORS)
corpus_docs.extend(fineweb_docs)
# Deduplicate by title (keep longest content)
title_to_best: dict[str, dict] = {}
for doc in corpus_docs:
title = doc["title"]
if title not in title_to_best or len(doc["content"]) > len(title_to_best[title]["content"]):
title_to_best[title] = doc
corpus = list(title_to_best.values())
random.shuffle(corpus)
# Assign stable IDs
prefix_map = {"musique": "md", "fineweb": "fw"}
source_counters: dict[str, int] = {}
for doc in corpus:
prefix = prefix_map.get(doc["source"], "xx")
idx = source_counters.get(doc["source"], 0)
doc["id"] = f"{prefix}_{idx:06d}"
source_counters[doc["source"]] = idx + 1
# Write corpus.jsonl
corpus_path = output_dir / "corpus.jsonl"
with open(corpus_path, "w") as f:
for doc in corpus:
f.write(json.dumps(doc, ensure_ascii=False) + "\n")
# Write questions.jsonl
random.shuffle(questions)
questions_path = output_dir / "questions.jsonl"
with open(questions_path, "w") as f:
for q in questions:
f.write(json.dumps(q, ensure_ascii=False) + "\n")
# Summary
golden = sum(1 for d in corpus if d["is_golden"])
nongolden = len(corpus) - golden
print(f"\nCorpus: {len(corpus):,} docs ({golden} golden, {nongolden} distractors)")
print(f"Questions: {len(questions)}")
print(f"Output: {corpus_path.resolve()}")
print(f" {questions_path.resolve()}")
if __name__ == "__main__":
main()
Full source: retriever_mcp.py
# /// script
# requires-python = ">=3.10"
# dependencies = ["mcp", "bm25s", "PyStemmer"]
# ///
"""MCP Server: BM25S Corpus Retriever for OpenResearcher-style Deep Research
A single-file MCP server that indexes a JSONL corpus and exposes BM25S
lexical search via three browser tools:
- search(query, top_k): ranked document discovery
- open(doc_id): full document inspection with cursor-numbered chunks
- find(doc_id, query): in-document evidence lookup
Corpus format (JSONL, one document per line):
{"id": "wiki_123", "title": "Christopher Nolan", "content": "Christopher Edward Nolan is a..."}
Server mode (used by Data Designer):
CORPUS_PATH=corpus.jsonl uv run retriever_mcp.py serve
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import bm25s
from mcp.server.fastmcp import FastMCP
MCP_SERVER_NAME = "corpus-retriever"
# Global state — populated at server startup
_bm25_retriever: bm25s.BM25 | None = None
_corpus: list[dict[str, str]] = []
_id_to_index: dict[str, int] = {}
mcp_server = FastMCP(MCP_SERVER_NAME)
def load_corpus(corpus_path: str) -> list[dict[str, str]]:
"""Load a JSONL corpus file into a list of document dicts."""
docs: list[dict[str, str]] = []
with open(corpus_path, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
doc = json.loads(line)
except json.JSONDecodeError as e:
print(f"Warning: skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
continue
if "id" not in doc or "content" not in doc:
print(f"Warning: skipping line {line_num}, missing 'id' or 'content'", file=sys.stderr)
continue
docs.append({
"id": str(doc["id"]),
"title": str(doc.get("title", "")),
"content": str(doc["content"]),
})
return docs
def build_index(docs: list[dict[str, str]]) -> bm25s.BM25:
"""Build a BM25S index over title + content for each document."""
corpus_texts = [f"{d['title']} {d['content']}" for d in docs]
corpus_tokens = bm25s.tokenize(corpus_texts, stopwords="en")
retriever = bm25s.BM25()
retriever.index(corpus_tokens)
return retriever
def initialize(corpus_path: str) -> None:
"""Load corpus and build index into global state."""
global _bm25_retriever, _corpus, _id_to_index
print(f"Loading corpus from {corpus_path}...", file=sys.stderr)
_corpus = load_corpus(corpus_path)
if not _corpus:
print("Warning: corpus is empty", file=sys.stderr)
return
_id_to_index = {doc["id"]: idx for idx, doc in enumerate(_corpus)}
print(f"Building BM25S index over {len(_corpus)} documents...", file=sys.stderr)
_bm25_retriever = build_index(_corpus)
print(f"Index ready. {len(_corpus)} documents indexed.", file=sys.stderr)
def _chunk_content(content: str) -> list[str]:
"""Split document content into cursor-addressable chunks."""
paragraph_chunks = [c.strip() for c in re.split(r"\n\s*\n+", content) if c.strip()]
if len(paragraph_chunks) > 1:
return paragraph_chunks
line_chunks = [line.strip() for line in content.splitlines() if line.strip()]
if line_chunks:
return line_chunks
stripped = content.strip()
return [stripped] if stripped else []
@mcp_server.tool()
def search(query: str, top_k: int = 10) -> dict:
"""Search for candidate documents to explore.
Args:
query: Search query string.
top_k: Maximum number of ranked results (default: 10).
"""
global _bm25_retriever, _corpus
if _bm25_retriever is None or not _corpus:
return {"error": "Search index not initialized", "results": []}
query_tokens = bm25s.tokenize([query], stopwords="en")
k = max(1, min(top_k, len(_corpus)))
results, scores = _bm25_retriever.retrieve(query_tokens, k=k)
search_results: list[dict] = []
for i in range(results.shape[1]):
doc_idx = results[0, i]
score = float(scores[0, i])
if score <= 0:
continue
doc = _corpus[doc_idx]
snippet = doc["content"][:500]
if len(doc["content"]) > 500:
snippet += "..."
search_results.append({
"id": doc["id"],
"title": doc["title"],
"snippet": snippet,
"score": round(score, 4),
})
return {"results": search_results, "query": query, "total": len(search_results)}
@mcp_server.tool(name="open")
def open_document(doc_id: str) -> dict:
"""Open a document for detailed inspection with cursor-numbered chunks.
Args:
doc_id: The document ID (from search results).
"""
global _corpus, _id_to_index
if not _corpus:
return {"error": "Corpus not loaded"}
idx = _id_to_index.get(doc_id)
if idx is None:
return {"error": f"Document not found: {doc_id}"}
doc = _corpus[idx]
chunks = _chunk_content(doc["content"])
numbered_chunks = [{"cursor": i + 1, "text": chunk} for i, chunk in enumerate(chunks)]
formatted = "\n".join(f"[{e['cursor']}] {e['text']}" for e in numbered_chunks)
return {
"id": doc["id"],
"title": doc["title"],
"content": formatted,
"chunks": numbered_chunks,
"total_chunks": len(numbered_chunks),
}
@mcp_server.tool()
def find(doc_id: str, query: str) -> dict:
"""Find matching passages inside a document by keyword.
Args:
doc_id: Document ID to search within.
query: Text to find (case-insensitive substring and keyword matching).
"""
global _corpus, _id_to_index
if not _corpus:
return {"error": "Corpus not loaded", "matches": []}
idx = _id_to_index.get(doc_id)
if idx is None:
return {"error": f"Document not found: {doc_id}", "matches": []}
query_text = query.strip().lower()
if not query_text:
return {"error": "Query must be non-empty", "matches": []}
doc = _corpus[idx]
chunks = _chunk_content(doc["content"])
query_terms = [term for term in re.findall(r"\w+", query_text) if term]
matches: list[dict] = []
for i, chunk in enumerate(chunks, start=1):
haystack = chunk.lower()
if query_text in haystack or (query_terms and all(t in haystack for t in query_terms)):
matches.append({"cursor": i, "text": chunk})
return {
"doc_id": doc["id"],
"title": doc["title"],
"query": query,
"matches": matches,
"total_matches": len(matches),
}
def serve() -> None:
"""Run as MCP server subprocess (called by Data Designer)."""
corpus_path = os.environ.get("CORPUS_PATH", "corpus.jsonl")
initialize(corpus_path)
mcp_server.run()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="BM25S corpus retriever MCP server")
subparsers = parser.add_subparsers(dest="command")
subparsers.add_parser("serve", help="Run the MCP server (reads CORPUS_PATH from env)")
stats_parser = subparsers.add_parser("stats", help="Print corpus statistics")
stats_parser.add_argument("--corpus-path", default="corpus.jsonl")
args = parser.parse_args()
if args.command == "serve":
serve()
elif args.command == "stats":
docs = load_corpus(args.corpus_path)
total_chars = sum(len(d["content"]) for d in docs)
print(f"Corpus: {args.corpus_path}")
print(f"Documents: {len(docs)}")
print(f"Total content: {total_chars:,} chars (~{total_chars // 4:,} tokens)")
else:
parser.print_help()
Key Resources: