
Deep Research Trajectories with NeMo Data Designer and MCP Tool Use

Data Designer v0.5.0's MCP tool-use support lets you generate multi-turn research trajectories, the kind of data needed to train deep research agents that iteratively search, read, and synthesize evidence before answering a question.


Figure: OpenResearcher benchmark results across deep research tasks. Source: Li, Jiang, Ma et al., 2026.

Deep research agents like OpenResearcher (Li, Jiang, Ma et al., 2026) and Universal Deep Research (Belcak & Molchanov, 2025) generate long reasoning chains interleaved with tool calls: formulating queries, retrieving documents, reading passages, refining hypotheses, and eventually synthesizing an answer. Training these agents requires trajectory data capturing the full multi-turn interaction between a model and its tools: every search, every document opened, every dead end explored.

OpenResearcher demonstrated something worth paying attention to: synthetic trajectories generated against a local retriever (BM25 over a static corpus, no web APIs) are sufficient to train Nemotron Nano 3 to outperform GPT-4.1 on deep research benchmarks. The data format (complete tool-use traces showing how a model moves through an information space) matters more than model scale. Nemotron Nano 3, with only 3B active parameters, beats models orders of magnitude larger on multi-hop research tasks.

This post shows how to generate that same kind of training data using Data Designer's MCP tool-use capabilities. We build a retriever as an MCP server, construct a corpus with known-good evidence, run a teacher model through the full research process, and use an LLM judge for rejection sampling. The result is a pipeline that produces high-quality research trajectories you can use for supervised fine-tuning or as a starting point for RL.

Here's what one of those trajectories looks like: a 4-hop question answered correctly by Claude Opus 4.5 using the pipeline described below. The summary lists the documents the model opened across its turns, in order, followed by its final answer.

Example trajectory: 4-hop question, 31 turns, 49 tool calls
Q: Who was the mother of the person under whom the colonizer of the winner of a border war with Libya in the 1st century BC reached its greatest extent?
Reference: Marcia
⭐ = golden passage (contains evidence for the answer)
Documents opened across the trajectory's 31 turns, in order:
openmd_001400, openmd_000167, openmd_001676 ⭐, openfw_021746, openmd_001054 ⭐, openmd_002075 ⭐, openmd_002510 ⭐, openmd_002117, openmd_000722, openmd_002076 ⭐

Final answer (T31):
Based on my extensive research, I have gathered the following evidence:

1. The Roman Empire reached its greatest territorial extent under Trajan (117 CE) — confirmed by multiple documents
2. Trajan's mother was from the Gens Marcia — the Trajan document states he "on his mother's side from the Gens Marcia, of an Italic family of Sabine origin"
3. Egypt was colonized/annexed by Rome after Cleopatra's death (30 BC), making it a Roman province
4. Egypt had border conflicts with Libya (ancient Marmaricae/Cyrenaica region) in the 1st century BC

The answer is Marcia.

Step 1: Building the Retrieval MCP Server

OpenResearcher's key design choice is a three-tool browser interface rather than a single retrieval call. The paper argues (and their ablations confirm) that separating search, document opening, and in-document search forces the model to develop genuine research strategies: skimming results, diving into promising documents, hunting for specific evidence within them. A single monolithic "retrieve" tool collapses this entire workflow into one step, which produces shorter and less useful training trajectories.

We implement the same three tools as an MCP server that Data Designer can invoke during generation. Our retriever uses BM25S for fast lexical search over the corpus:

from mcp.server.fastmcp import FastMCP

mcp_server = FastMCP("corpus-retriever")

@mcp_server.tool()
def search(query: str, top_k: int = 10) -> dict:
    """Search for candidate documents to explore."""
    # BM25S search over the corpus, returns ranked results with snippets
    ...

@mcp_server.tool(name="open")
def open_document(doc_id: str) -> dict:
    """Open a document for detailed inspection with cursor-numbered chunks."""
    # Returns content formatted as [1] paragraph... [2] paragraph...
    ...

@mcp_server.tool()
def find(doc_id: str, query: str) -> dict:
    """Find matching passages inside a document by keyword."""
    # Returns matching chunks with cursor positions
    ...

if __name__ == "__main__":
    mcp_server.run()

search returns a ranked list of document IDs with short snippets, enough for the model to decide which documents look promising. open returns the full document content, split into cursor-numbered chunks so the model can reference specific passages. find does targeted keyword search within a single document, letting the model locate specific evidence without reading the entire thing. The cursor-based chunking across open and find gives the model a way to scan long documents incrementally, the way a human researcher would scan a paper for the relevant section rather than reading it cover to cover.
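
To make the payloads concrete, here are abbreviated, illustrative responses for each tool, matching the return shapes of the retriever implementation included at the end of this post (the document IDs and text below are made-up placeholders):

# Abbreviated, illustrative tool responses (doc IDs and text are placeholders).
search_response = {
    "query": "Roman Empire greatest territorial extent",
    "total": 2,
    "results": [
        {"id": "md_000101", "title": "Trajan", "snippet": "Trajan was Roman emperor from 98 to 117...", "score": 12.31},
        {"id": "md_000412", "title": "Hadrian", "snippet": "Hadrian was Roman emperor from 117 to 138...", "score": 9.84},
    ],
}

open_response = {
    "id": "md_000101",
    "title": "Trajan",
    "content": "[1] Trajan was Roman emperor from 98 to 117.\n[2] Under his rule the empire reached its greatest territorial extent.",
    "chunks": [
        {"cursor": 1, "text": "Trajan was Roman emperor from 98 to 117."},
        {"cursor": 2, "text": "Under his rule the empire reached its greatest territorial extent."},
    ],
    "total_chunks": 2,
}

find_response = {
    "doc_id": "md_000101",
    "title": "Trajan",
    "query": "greatest extent",
    "matches": [{"cursor": 2, "text": "Under his rule the empire reached its greatest territorial extent."}],
    "total_matches": 1,
}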

The server runs as a local stdio process, which means Data Designer launches and manages it automatically. No external services, no API keys for retrieval, no rate limits.
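
Registering the server with Data Designer is a single provider declaration; the provider name is what the tool configuration in Step 3 points at. (This snippet also appears in the full script at the end of the post.)

import data_designer.config as dd
from data_designer.interface import DataDesigner

# Data Designer launches the retriever as a local stdio subprocess.
mcp_provider = dd.LocalStdioMCPProvider(
    name="corpus-retriever",
    command="uv",
    args=["run", "retriever_mcp.py", "serve"],
    env={"CORPUS_PATH": "corpus.jsonl"},
)
data_designer = DataDesigner(mcp_providers=[mcp_provider])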


Step 2: Building the Corpus

The corpus design follows directly from OpenResearcher's most striking ablation result. They tested what happens when you vary the retrieval corpus while keeping the reasoning model fixed (GPT-OSS-120B). The results, from the OpenResearcher Appendix:

| Corpus                                        | BrowseComp-Plus Accuracy |
|-----------------------------------------------|--------------------------|
| Golden passages only (BrowseComp-Plus corpus) | 56.0%                    |
| 15M FineWeb + golden passages                 | 31.2%                    |
| 15M FineWeb only                              | 0.71%                    |

Without golden passages (documents known to contain evidence for the question), accuracy drops to nearly zero. The model can't learn research strategies from trajectories where every search is a dead end.

The original OpenResearcher corpus uses 15M documents from FineWeb as distractors alongside 10K golden passages. For this demonstration, we take a lighter-weight approach and construct the corpus from multi-hop QA datasets: HotpotQA (2-hop questions requiring two pieces of linked evidence) and MuSiQue (2-4 hop questions composed from single-hop sub-questions). Each question comes with annotated supporting passages, the specific paragraphs that contain the evidence needed to answer it. Golden passages go into the corpus alongside non-supporting passages from the same datasets as distractors, at roughly a 1:9 ratio. The model has to search through noise to find the signal, which is exactly the skill we want the training data to teach.

The key constraint is that golden passages must be findable but not obvious. If the corpus is too small or the golden passages are too easy to identify, the trajectories won't transfer to real-world research where evidence is sparse. The distractor ratio controls this difficulty, and the paper's ablations give us a good starting point for tuning it.
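
As a rough sketch of what tuning that ratio looks like in code (mix_corpus is a hypothetical helper, not part of the demo scripts):

import random

def mix_corpus(golden_docs: list[dict], distractor_pool: list[dict], distractors_per_golden: int = 9) -> list[dict]:
    """Hypothetical helper: combine golden passages with distractors at a target ratio."""
    n_distractors = min(len(distractor_pool), distractors_per_golden * len(golden_docs))
    corpus = golden_docs + random.sample(distractor_pool, n_distractors)
    random.shuffle(corpus)  # golden passages should not sit at predictable positions
    return corpus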


Step 3: The Data Designer Pipeline

With the retriever server and corpus ready, the Data Designer pipeline ties everything together. We configure a teacher model, point it at the MCP retriever, and let it research each question from scratch. For this demo we hosted our own inference server, but anyone can try the pipeline with Nemotron Nano 3 on build.nvidia.com: a free API key and the model configuration shown below are all you need.

import data_designer.config as dd
from data_designer.interface import DataDesigner

# Search rollout model for trajectory generation
config = dd.DataDesignerConfigBuilder()
config.add_model_config(
    dd.ModelConfig(
        alias="search_rollout_model",
        model="nvidia/nemotron-3-nano-30b-a3b",
        provider="nvidia",
        inference_parameters=dd.ChatCompletionInferenceParams(
            temperature=1.0,
            top_p=0.95,
            max_tokens=16384,
        ),
    )
)

The temperature and top_p settings matter here. We want diverse research strategies across seeds (different query formulations, different document exploration orders) so that rejection sampling has a rich pool to select from. Setting temperature to 1.0 with top_p at 0.95 gives enough variation that the same question can produce meaningfully different trajectories across seeds.
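
How you fan out multiple rollouts per question is up to your setup; one simple option is to replicate each row of questions.jsonl with a distinct seed_id before generation, as sketched below (a pre-processing step, not a Data Designer API):

import json

ROLLOUTS_PER_QUESTION = 4  # trajectories to attempt per question

with open("questions.jsonl") as f:
    questions = [json.loads(line) for line in f if line.strip()]

# Duplicate each question with a distinct seed_id so each copy yields its own rollout.
with open("questions_replicated.jsonl", "w") as f:
    for q in questions:
        for seed_id in range(ROLLOUTS_PER_QUESTION):
            f.write(json.dumps({**q, "seed_id": seed_id}, ensure_ascii=False) + "\n")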

The MCP tool configuration tells Data Designer which server to use and how many tool-call turns to allow:

# MCP retriever tool configuration
tool_config = dd.ToolConfig(
    tool_alias="knowledge-base",
    providers=["corpus-retriever"],
    max_tool_call_turns=150,
)
config.add_tool_config(tool_config)

We set max_tool_call_turns high (150) because deep research trajectories can be long: our longest observed trajectories ran to over 60 tool calls across nearly 130 messages (see the Results section). Capping too low would truncate the most interesting research chains.

The seed dataset contains the research questions alongside reference answers (which we'll use for rejection sampling in Step 4):

config.with_seed_dataset(
    dd.LocalFileSeedSource(path="questions.jsonl"),
)

config.add_column(
    dd.ExpressionColumnConfig(
        name="research_question",
        expr="{{ question }}",
    )
)

The core of the pipeline is the research column, where the teacher model receives a question and a system prompt instructing it to use the retriever tools:

SYSTEM_PROMPT = """You are a thorough research assistant. You have access to three tools \
for navigating a knowledge base:
- search(query, top_k): Find candidate documents relevant to your query
- open(doc_id): Open a document to read its full content in numbered chunks
- find(doc_id, query): Locate specific passages within a document by keyword

Your task is to research the given question by searching for relevant documents, \
reading their content, and synthesizing an answer from the evidence you find. \
Be systematic: formulate search queries, explore promising results, and gather \
evidence before answering. Cite specific passages when possible."""

config.add_column(
    dd.LLMTextColumnConfig(
        name="research_answer",
        prompt="Research and answer thoroughly:\n\n{{ research_question }}",
        model_alias="search_rollout_model",
        system_prompt=SYSTEM_PROMPT,
        tool_alias="knowledge-base",
        with_trace=dd.TraceType.ALL_MESSAGES,
        extract_reasoning_content=True,
    )
)

Two settings are doing the important work here. with_trace=dd.TraceType.ALL_MESSAGES captures the entire interaction (every tool call, every tool response, every intermediate reasoning step) into a separate trace column in ChatML format. This is the training data: the full trajectory of how the model moved through the information space. extract_reasoning_content=True pulls out the model's internal chain-of-thought separately, so you can include or exclude it depending on your training setup.
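
For reference, a captured trace looks roughly like the (heavily abbreviated) message list below. Field names follow the OpenAI-style convention used in the failure examples later in this post; the exact trace schema may vary with your Data Designer version.

# Illustrative, abbreviated trace: system prompt, user question, tool call, tool result, final answer.
example_trace = [
    {"role": "system", "content": "You are a thorough research assistant..."},
    {"role": "user", "content": "Research and answer thoroughly:\n\nIn which country was the author of Cycle of Violence born?"},
    {
        "role": "assistant",
        "content": None,
        "reasoning_content": "Start broad: find the novel, then its author, then the birthplace.",
        "tool_calls": [{
            "id": "call_1",
            "type": "function",
            "function": {"name": "search", "arguments": '{"query": "Cycle of Violence novel author", "top_k": 10}'},
        }],
    },
    {"role": "tool", "tool_call_id": "call_1", "content": '{"results": [{"id": "md_001100", "title": "Colin Bateman", "snippet": "..."}], "total": 1}'},
    {"role": "assistant", "content": "Cycle of Violence was written by Colin Bateman, a Northern Irish author, so he was born in Northern Ireland (United Kingdom)."},
]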


Step 4: Rejection Sampling with an LLM Judge

Not every trajectory leads to a correct answer. OpenResearcher's approach is straightforward: generate multiple trajectories per question, score them for correctness, and keep only the ones that arrive at the right answer. We implement this with Data Designer's LLMJudgeColumnConfig, using a separate (smaller) model as the judge:

# Judge model for rejection sampling
config.add_model_config(
    dd.ModelConfig(
        alias="judge",
        model="nvidia/nemotron-3-nano-30b-a3b",
        provider="nvidia",
    )
)

config.add_column(
    dd.LLMJudgeColumnConfig(
        name="correctness",
        model_alias="judge",
        prompt=(
            "Question: {{ research_question }}\n"
            "Reference answer: {{ answer }}\n"
            "Generated answer: {{ research_answer }}\n"
            "Does the generated answer correctly address the question?"
        ),
        scores=[
            dd.Score(
                name="correct",
                description="Is the answer factually correct?",
                options={
                    1: "Correct",
                    0: "Incorrect",
                },
            ),
        ],
    )
)

The judge compares the generated answer against the reference answer from the seed dataset. Using a smaller model as judge is deliberate. We don't need the judge to reason about the question, just to compare two answers for factual agreement. This keeps costs down when scoring thousands of trajectories.

In practice, you'd generate multiple trajectories per question (varying the random seed) and filter to correctness.correct == 1. The incorrect trajectories aren't wasted; they can serve as negative examples for preference-based training methods like DPO.
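
A minimal filtering sketch, assuming the generated dataset has been exported to JSONL and that the judge output is available under a correctness field containing a correct score of 0 or 1 (field names depend on your Data Designer version and export format):

import json

kept, rejected = [], []
with open("research-trajectories.jsonl") as f:
    for line in f:
        record = json.loads(line)
        score = record.get("correctness")
        if isinstance(score, dict):  # judge scores may be nested under the column name
            score = score.get("correct")
        (kept if score == 1 else rejected).append(record)

print(f"Kept {len(kept)} correct trajectories; {len(rejected)} candidates for DPO negatives")
with open("sft_trajectories.jsonl", "w") as f:
    for record in kept:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")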


Multi-Turn Tool Calling: Rough Edges in the Open Model Ecosystem

The pipeline described above is straightforward in principle. In practice, getting multi-turn tool calling to work reliably with open-weight models served through vLLM turned out to be the hardest part of this project.

We tested two open-weight models on a self-hosted vLLM (v0.15.1) instance: GPT-OSS-120B and Kimi K2.5. Both failed to produce usable research trajectories, for related but distinct reasons.

GPT-OSS-120B uses a "Harmony" output format that routes text through named channels (reasoning, final answer, tool calls). When tools are involved, vLLM's parser consistently routes the model's output to the wrong channel: everything lands in reasoning_content while the content field stays empty. This happens at all reasoning_effort levels. The model does the research (calls tools, reads documents, formulates queries) but the final synthesized answer never appears where the serving layer expects it. This is a known issue in vLLM's Harmony format handling. Here's the final message from a typical trajectory. The model has been researching for 5 tool calls but produces no answer:

{
  "role": "assistant",
  "content": [{"type": "text", "text": ""}],
  "reasoning_content": "It seems that the knowledge base may have a page about
    Colin Bateman that includes his biography. Possibly the 'md_001100' entry is
    about a footballer, not the author. The author Colin Bateman likely ...",
  "tool_calls": null
}

The model's reasoning shows it has the answer (it identified Colin Bateman as the author), but the content field is empty and no tool call is emitted. The trajectory ends here with nothing to show for it.

Kimi K2.5 exhibits a different failure mode. With its thinking mode enabled, it has the same channel-routing problem as GPT-OSS. With thinking mode disabled, the model produces content text, but after the first tool result, it narrates what it plans to do next rather than emitting another tool call. The serving layer sees text content without tool calls and treats it as the final answer, terminating the research loop after a single search:

{
  "role": "assistant",
  "content": "I found that 'Cycle of Violence' was written by Colin Bateman,
    described as a 'Northern Irish author'. Now let me search for more details
    about his birthplace to confirm his birth country.",
  "reasoning_content": "The search results clearly show that 'Cycle of Violence'
    was written by Colin Bateman, a Northern Irish author...",
  "tool_calls": null
}

The model intends to keep researching ("let me search for more details") but describes the action instead of calling the tool. The framework sees content, no tool calls, and stops. We tried multiple tokenizer modes, prompt variations, and vLLM configurations; open issues on the model's HuggingFace page confirm this is a broader compatibility gap.

The original OpenResearcher codebase handles this by bypassing vLLM's tool call parser entirely. They hit the raw /completions endpoint (openai_generator.py), parse <tool_call> XML tags from the output with regex, and continue looping until the model emits an explicit answer marker like <answer> or final answer: (deploy_agent.py).
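
A minimal sketch of that parse-it-yourself loop, assuming JSON tool calls wrapped in <tool_call> tags and an <answer> tag for the final answer (check the OpenResearcher scripts for the exact markers and formats they use):

import json
import re

TOOL_CALL_RE = re.compile(r"<tool_call>\s*(.*?)\s*</tool_call>", re.DOTALL)
ANSWER_RE = re.compile(r"<answer>(.*?)</answer>", re.DOTALL)

def parse_turn(raw_completion: str) -> tuple[list[dict], str | None]:
    """Extract tool calls and, if present, the final answer from one raw completion."""
    tool_calls = []
    for match in TOOL_CALL_RE.finditer(raw_completion):
        try:
            tool_calls.append(json.loads(match.group(1)))
        except json.JSONDecodeError:
            continue  # skip malformed calls; the agent loop simply re-prompts
    answer_match = ANSWER_RE.search(raw_completion)
    answer = answer_match.group(1).strip() if answer_match else None
    return tool_calls, answer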

The open-source tool-calling stack is growing and maturing quickly, but multi-turn tool use with reasoning models is still a rough edge. For now, the practical path is to use models with battle-tested tool-calling support through their native APIs, which is what we do in the results below.


Results

We ran 64 questions uniformly sampled across 2, 3, and 4-hop difficulty levels from MuSiQue, with 50K FineWeb web documents as distractors (a 1:100 golden-to-distractor ratio). We tested two models, Claude Opus 4.5 (via API) and Nemotron Nano 3 (30B total / 3B active params, self-hosted via vLLM with reasoning disabled).

| Metric                      | Claude Opus 4.5   | Nemotron Nano 3   |
|-----------------------------|-------------------|-------------------|
| Samples                     | 64 (55 completed) | 64 (61 completed) |
| Overall accuracy            | 41/55 (75%)       | 32/61 (52%)       |
| 2-hop accuracy              | 18/23 (78%)       | 13/23 (57%)       |
| 3-hop accuracy              | 15/18 (83%)       | 11/22 (50%)       |
| 4-hop accuracy              | 8/14 (57%)        | 8/16 (50%)        |
| Avg tool calls              | 16.8              | 11.8              |
| Max tool calls              | 57                | 63                |
| Avg messages per trajectory | 40.4              | 26.5              |
| Max messages per trajectory | 117               | 129               |

Opus is about 22 points more accurate overall, but Nano runs roughly 5x faster on self-hosted hardware. Tool usage generally grows with hop count for both models. Nano uses fewer tools and achieves lower accuracy, with the largest gap on 3-hop questions (83% vs 50%). Splitting by correctness reveals the same pattern in both models: incorrect trajectories are longer.

Claude Opus 4.5:

| Outcome   | Hops | Count | Avg Tool Calls | Avg Messages | Avg Answer Length |
|-----------|------|-------|----------------|--------------|-------------------|
| Correct   | 2    | 18    | 7.3            | 18.9         | 1,072 chars       |
| Correct   | 3    | 15    | 14.9           | 35.7         | 1,372 chars       |
| Correct   | 4    | 8     | 21.0           | 50.6         | 1,705 chars       |
| Correct   | All  | 41    | 12.8           | 31.2         | 1,305 chars       |
| Incorrect | 2    | 5     | 21.0           | 48.6         | 1,534 chars       |
| Incorrect | 3    | 3     | 25.7           | 63.0         | 1,795 chars       |
| Incorrect | 4    | 6     | 36.0           | 85.2         | 1,903 chars       |
| Incorrect | All  | 14    | 28.4           | 67.4         | 1,748 chars       |

Nemotron Nano 3:

| Outcome   | Hops | Count | Avg Tool Calls | Avg Messages | Avg Answer Length |
|-----------|------|-------|----------------|--------------|-------------------|
| Correct   | 2    | 13    | 6.5            | 16.1         | 773 chars         |
| Correct   | 3    | 11    | 12.7           | 28.5         | 708 chars         |
| Correct   | 4    | 8     | 8.0            | 19.0         | 1,600 chars       |
| Correct   | All  | 32    | 9.0            | 21.1         | 957 chars         |
| Incorrect | 2    | 10    | 10.1           | 23.2         | 799 chars         |
| Incorrect | 3    | 11    | 18.0           | 39.0         | 1,163 chars       |
| Incorrect | 4    | 8     | 16.2           | 35.5         | 848 chars         |
| Incorrect | All  | 29    | 14.8           | 32.6         | 951 chars         |

Correct trajectories are shorter at every hop level for both models. Incorrect trajectories are roughly twice as long because the model keeps searching when it can't find evidence, then writes a longer answer to compensate. This anti-correlation between trajectory length and correctness is consistent across model scales, which means trajectory length alone could serve as a lightweight filter during rejection sampling.
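
Here's a sketch of what such a filter could look like, operating on the captured trace; the per-hop budget is an illustrative threshold, not a tuned value:

def count_tool_calls(trace: list[dict]) -> int:
    """Count tool calls across all assistant messages in a ChatML-style trace."""
    return sum(len(m.get("tool_calls") or []) for m in trace if m.get("role") == "assistant")

def passes_length_filter(trace: list[dict], num_hops: int, per_hop_budget: int = 8) -> bool:
    """Heuristic pre-filter: flag trajectories far longer than typical correct ones."""
    return count_tool_calls(trace) <= per_hop_budget * num_hops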


Closing Remarks

Thanks to the OpenResearcher team for their work showing that synthetic research trajectories over local retrieval can train small models to compete with much larger ones. Their results suggest we're only beginning to understand how LLMs interact with search tools and how the structure of those interactions shapes what models learn. We're excited to see where the community takes synthetic data research using NeMo Data Designer as both the models and the tooling continue to improve.


Try For Yourself

Full source: openresearcher_demo.py
import data_designer.config as dd
from data_designer.interface import DataDesigner

SYSTEM_PROMPT = """You are a thorough research assistant. You have access to three tools \
for navigating a knowledge base:
- search(query, top_k): Find candidate documents relevant to your query
- open(doc_id): Open a document to read its full content in numbered chunks
- find(doc_id, query): Locate specific passages within a document by keyword

Your task is to research the given question by searching for relevant documents, \
reading their content, and synthesizing an answer from the evidence you find. \
Be systematic: formulate search queries, explore promising results, and gather \
evidence before answering. Cite specific passages when possible."""

# Models
config = dd.DataDesignerConfigBuilder()
config.add_model_config(
    dd.ModelConfig(
        alias="search_rollout_model",
        model="nvidia/nemotron-3-nano-30b-a3b",
        provider="nvidia",
        inference_parameters=dd.ChatCompletionInferenceParams(
            temperature=1.0,
            top_p=0.95,
            max_tokens=16384,
        ),
    )
)
config.add_model_config(
    dd.ModelConfig(
        alias="judge",
        model="nvidia/nemotron-3-nano-30b-a3b",
        provider="nvidia",
    )
)

# MCP retriever
tool_config = dd.ToolConfig(
    tool_alias="knowledge-base",
    providers=["corpus-retriever"],
    max_tool_call_turns=150,
)
config.add_tool_config(tool_config)

# Seed questions with reference answers
config.with_seed_dataset(
    dd.LocalFileSeedSource(path="questions.jsonl"),
)

config.add_column(
    dd.ExpressionColumnConfig(
        name="research_question",
        expr="{{ question }}",
    )
)

# Research trajectory generation
config.add_column(
    dd.LLMTextColumnConfig(
        name="research_answer",
        prompt="Research and answer thoroughly:\n\n{{ research_question }}",
        model_alias="search_rollout_model",
        system_prompt=SYSTEM_PROMPT,
        tool_alias="knowledge-base",
        with_trace=dd.TraceType.ALL_MESSAGES,
        extract_reasoning_content=True,
    )
)

# Rejection sampling judge
config.add_column(
    dd.LLMJudgeColumnConfig(
        name="correctness",
        model_alias="judge",
        prompt=(
            "Question: {{ research_question }}\n"
            "Reference answer: {{ answer }}\n"
            "Generated answer: {{ research_answer }}\n"
            "Does the generated answer correctly address the question?"
        ),
        scores=[
            dd.Score(
                name="correct",
                description="Is the answer factually correct?",
                options={
                    1: "Correct",
                    0: "Incorrect",
                },
            ),
        ],
    )
)

# Run
mcp_provider = dd.LocalStdioMCPProvider(
    name="corpus-retriever",
    command="uv",
    args=["run", "retriever_mcp.py", "serve"],
    env={"CORPUS_PATH": "corpus.jsonl"},
)
data_designer = DataDesigner(mcp_providers=[mcp_provider])
results = data_designer.create(
    config_builder=config,
    num_records=1000,
    dataset_name="research-trajectories",
)
Full source: prepare_corpus.py
# /// script
# requires-python = ">=3.10"
# dependencies = ["datasets", "huggingface_hub", "pyarrow"]
# ///

"""Prepare a retrieval corpus and question set for the OpenResearcher demo.

Builds corpus.jsonl and questions.jsonl from two sources:

    1. MuSiQue — multi-hop QA dataset (2/3/4-hop) with golden passages
    2. FineWeb — web documents as distractors (matches the OpenResearcher paper)

Golden passages (documents containing evidence for the answer) are mixed with
FineWeb distractors at roughly 1:100 ratio, so the model must search through
noise to find the signal.

Usage:
    uv run prepare_corpus.py
"""

from __future__ import annotations

import json
import random
from pathlib import Path
from urllib.parse import urlparse

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

NUM_QUESTIONS = 192          # 64 per hop level (2, 3, 4)
NUM_FINEWEB_DISTRACTORS = 50_000
FINEWEB_SHARD = 0
OUTPUT_DIR = "data"

# ---------------------------------------------------------------------------
# MuSiQue extraction
# ---------------------------------------------------------------------------

def prepare_musique(num_questions: int) -> tuple[list[dict], list[dict]]:
    """Load MuSiQue and extract multi-hop questions with golden passages.

    Samples uniformly across hop counts (2, 3, 4) so the dataset has balanced
    difficulty. Golden passages (is_supporting=True) go into the corpus;
    non-golden passages from the same examples serve as additional distractors.

    Returns:
        (questions, corpus_docs) where corpus_docs have is_golden=True/False.
    """
    from datasets import load_dataset

    print("Loading MuSiQue (train split)...")
    dataset = load_dataset("bdsaglam/musique", split="train")

    # Bucket answerable examples by hop count
    hop_buckets: dict[int, list[dict]] = {}
    for example in dataset:
        if not example.get("answerable", False):
            continue
        num_hops = len(example.get("question_decomposition", []))
        if num_hops < 2:
            continue
        hop_buckets.setdefault(num_hops, []).append(example)

    # Sample uniformly: equal questions per hop level
    available_hops = sorted(hop_buckets.keys())
    per_hop = num_questions // len(available_hops)
    selected_examples = []
    for h in available_hops:
        bucket = hop_buckets[h]
        n = min(per_hop, len(bucket))
        selected_examples.extend(random.sample(bucket, n))

    print(f"  Selected {len(selected_examples)} questions across hops {available_hops}")

    # Build questions and corpus docs
    questions: list[dict] = []
    golden_titles: dict[str, str] = {}
    nongolden_titles: dict[str, str] = {}

    for example in selected_examples:
        num_hops = len(example["question_decomposition"])
        questions.append({
            "id": f"mq_{len(questions):06d}",
            "question": example["question"],
            "answer": example["answer"],
            "source": "musique",
            "num_hops": num_hops,
            "seed_id": 0,
        })

        for para in example.get("paragraphs", []):
            title = para.get("title", "").strip()
            content = para.get("paragraph_text", "").strip()
            if not title or not content:
                continue
            if para.get("is_supporting", False):
                if len(content) > len(golden_titles.get(title, "")):
                    golden_titles[title] = content
            else:
                if len(content) > len(nongolden_titles.get(title, "")):
                    nongolden_titles[title] = content

    # Golden passages
    corpus_docs = [
        {"title": t, "content": c, "source": "musique", "is_golden": True}
        for t, c in sorted(golden_titles.items())
    ]
    # Non-golden passages (skip titles already in golden set)
    corpus_docs.extend(
        {"title": t, "content": c, "source": "musique", "is_golden": False}
        for t, c in sorted(nongolden_titles.items())
        if t not in golden_titles
    )

    print(f"  Golden passages: {len(golden_titles)}")
    print(f"  Non-golden passages: {len(corpus_docs) - len(golden_titles)}")
    return questions, corpus_docs


# ---------------------------------------------------------------------------
# FineWeb distractor caching
# ---------------------------------------------------------------------------

def cache_fineweb(shard_index: int, max_docs: int) -> list[dict]:
    """Download a FineWeb parquet shard and extract English documents.

    Uses huggingface_hub for direct shard download (faster than load_dataset)
    and pyarrow for memory-efficient row-group-at-a-time reading.

    Returns:
        List of distractor documents with title (domain) and content (text).
    """
    from huggingface_hub import hf_hub_download
    import pyarrow.parquet as pq

    filename = f"sample/10BT/{shard_index:03d}_00000.parquet"
    print(f"Downloading FineWeb shard: {filename}")
    parquet_path = hf_hub_download(
        repo_id="HuggingFaceFW/fineweb",
        repo_type="dataset",
        filename=filename,
    )

    pf = pq.ParquetFile(parquet_path)
    print(f"  {pf.metadata.num_rows:,} rows in shard")

    docs: list[dict] = []
    for rg_idx in range(pf.metadata.num_row_groups):
        table = pf.read_row_group(rg_idx, columns=["text", "url", "language", "token_count"])
        batch = table.to_pydict()

        for text, url, lang, tok_count in zip(
            batch["text"], batch["url"], batch["language"], batch["token_count"]
        ):
            if lang != "en" or tok_count < 50:
                continue
            text = text.strip()
            if not text:
                continue

            # Use domain as title
            try:
                domain = urlparse(url).netloc.removeprefix("www.")
            except Exception:
                domain = "unknown"

            docs.append({
                "title": domain,
                "content": text,
                "source": "fineweb",
                "is_golden": False,
            })
            if len(docs) >= max_docs:
                break

        if len(docs) >= max_docs:
            break

    print(f"  Extracted {len(docs):,} English documents (min 50 tokens)")
    return docs


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    output_dir = Path(OUTPUT_DIR)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Extract MuSiQue questions and golden passages
    questions, corpus_docs = prepare_musique(NUM_QUESTIONS)

    # Download FineWeb distractors
    fineweb_docs = cache_fineweb(FINEWEB_SHARD, NUM_FINEWEB_DISTRACTORS)
    corpus_docs.extend(fineweb_docs)

    # Deduplicate by title (keep longest content)
    title_to_best: dict[str, dict] = {}
    for doc in corpus_docs:
        title = doc["title"]
        if title not in title_to_best or len(doc["content"]) > len(title_to_best[title]["content"]):
            title_to_best[title] = doc

    corpus = list(title_to_best.values())
    random.shuffle(corpus)

    # Assign stable IDs
    prefix_map = {"musique": "md", "fineweb": "fw"}
    source_counters: dict[str, int] = {}
    for doc in corpus:
        prefix = prefix_map.get(doc["source"], "xx")
        idx = source_counters.get(doc["source"], 0)
        doc["id"] = f"{prefix}_{idx:06d}"
        source_counters[doc["source"]] = idx + 1

    # Write corpus.jsonl
    corpus_path = output_dir / "corpus.jsonl"
    with open(corpus_path, "w") as f:
        for doc in corpus:
            f.write(json.dumps(doc, ensure_ascii=False) + "\n")

    # Write questions.jsonl
    random.shuffle(questions)
    questions_path = output_dir / "questions.jsonl"
    with open(questions_path, "w") as f:
        for q in questions:
            f.write(json.dumps(q, ensure_ascii=False) + "\n")

    # Summary
    golden = sum(1 for d in corpus if d["is_golden"])
    nongolden = len(corpus) - golden
    print(f"\nCorpus: {len(corpus):,} docs ({golden} golden, {nongolden} distractors)")
    print(f"Questions: {len(questions)}")
    print(f"Output: {corpus_path.resolve()}")
    print(f"         {questions_path.resolve()}")


if __name__ == "__main__":
    main()
Full source: retriever_mcp.py
# /// script
# requires-python = ">=3.10"
# dependencies = ["mcp", "bm25s", "PyStemmer"]
# ///

"""MCP Server: BM25S Corpus Retriever for OpenResearcher-style Deep Research

A single-file MCP server that indexes a JSONL corpus and exposes BM25S
lexical search via three browser tools:

    - search(query, top_k): ranked document discovery
    - open(doc_id): full document inspection with cursor-numbered chunks
    - find(doc_id, query): in-document evidence lookup

Corpus format (JSONL, one document per line):
    {"id": "wiki_123", "title": "Christopher Nolan", "content": "Christopher Edward Nolan is a..."}

Server mode (used by Data Designer):
    CORPUS_PATH=corpus.jsonl uv run retriever_mcp.py serve
"""

from __future__ import annotations

import argparse
import json
import os
import re
import sys

import bm25s
from mcp.server.fastmcp import FastMCP

MCP_SERVER_NAME = "corpus-retriever"

# Global state — populated at server startup
_bm25_retriever: bm25s.BM25 | None = None
_corpus: list[dict[str, str]] = []
_id_to_index: dict[str, int] = {}

mcp_server = FastMCP(MCP_SERVER_NAME)


def load_corpus(corpus_path: str) -> list[dict[str, str]]:
    """Load a JSONL corpus file into a list of document dicts."""
    docs: list[dict[str, str]] = []
    with open(corpus_path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                doc = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Warning: skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
                continue
            if "id" not in doc or "content" not in doc:
                print(f"Warning: skipping line {line_num}, missing 'id' or 'content'", file=sys.stderr)
                continue
            docs.append({
                "id": str(doc["id"]),
                "title": str(doc.get("title", "")),
                "content": str(doc["content"]),
            })
    return docs


def build_index(docs: list[dict[str, str]]) -> bm25s.BM25:
    """Build a BM25S index over title + content for each document."""
    corpus_texts = [f"{d['title']} {d['content']}" for d in docs]
    corpus_tokens = bm25s.tokenize(corpus_texts, stopwords="en")
    retriever = bm25s.BM25()
    retriever.index(corpus_tokens)
    return retriever


def initialize(corpus_path: str) -> None:
    """Load corpus and build index into global state."""
    global _bm25_retriever, _corpus, _id_to_index
    print(f"Loading corpus from {corpus_path}...", file=sys.stderr)
    _corpus = load_corpus(corpus_path)
    if not _corpus:
        print("Warning: corpus is empty", file=sys.stderr)
        return
    _id_to_index = {doc["id"]: idx for idx, doc in enumerate(_corpus)}
    print(f"Building BM25S index over {len(_corpus)} documents...", file=sys.stderr)
    _bm25_retriever = build_index(_corpus)
    print(f"Index ready. {len(_corpus)} documents indexed.", file=sys.stderr)


def _chunk_content(content: str) -> list[str]:
    """Split document content into cursor-addressable chunks."""
    paragraph_chunks = [c.strip() for c in re.split(r"\n\s*\n+", content) if c.strip()]
    if len(paragraph_chunks) > 1:
        return paragraph_chunks
    line_chunks = [line.strip() for line in content.splitlines() if line.strip()]
    if line_chunks:
        return line_chunks
    stripped = content.strip()
    return [stripped] if stripped else []


@mcp_server.tool()
def search(query: str, top_k: int = 10) -> dict:
    """Search for candidate documents to explore.

    Args:
        query: Search query string.
        top_k: Maximum number of ranked results (default: 10).
    """
    global _bm25_retriever, _corpus
    if _bm25_retriever is None or not _corpus:
        return {"error": "Search index not initialized", "results": []}
    query_tokens = bm25s.tokenize([query], stopwords="en")
    k = max(1, min(top_k, len(_corpus)))
    results, scores = _bm25_retriever.retrieve(query_tokens, k=k)
    search_results: list[dict] = []
    for i in range(results.shape[1]):
        doc_idx = results[0, i]
        score = float(scores[0, i])
        if score <= 0:
            continue
        doc = _corpus[doc_idx]
        snippet = doc["content"][:500]
        if len(doc["content"]) > 500:
            snippet += "..."
        search_results.append({
            "id": doc["id"],
            "title": doc["title"],
            "snippet": snippet,
            "score": round(score, 4),
        })
    return {"results": search_results, "query": query, "total": len(search_results)}


@mcp_server.tool(name="open")
def open_document(doc_id: str) -> dict:
    """Open a document for detailed inspection with cursor-numbered chunks.

    Args:
        doc_id: The document ID (from search results).
    """
    global _corpus, _id_to_index
    if not _corpus:
        return {"error": "Corpus not loaded"}
    idx = _id_to_index.get(doc_id)
    if idx is None:
        return {"error": f"Document not found: {doc_id}"}
    doc = _corpus[idx]
    chunks = _chunk_content(doc["content"])
    numbered_chunks = [{"cursor": i + 1, "text": chunk} for i, chunk in enumerate(chunks)]
    formatted = "\n".join(f"[{e['cursor']}] {e['text']}" for e in numbered_chunks)
    return {
        "id": doc["id"],
        "title": doc["title"],
        "content": formatted,
        "chunks": numbered_chunks,
        "total_chunks": len(numbered_chunks),
    }


@mcp_server.tool()
def find(doc_id: str, query: str) -> dict:
    """Find matching passages inside a document by keyword.

    Args:
        doc_id: Document ID to search within.
        query: Text to find (case-insensitive substring and keyword matching).
    """
    global _corpus, _id_to_index
    if not _corpus:
        return {"error": "Corpus not loaded", "matches": []}
    idx = _id_to_index.get(doc_id)
    if idx is None:
        return {"error": f"Document not found: {doc_id}", "matches": []}
    query_text = query.strip().lower()
    if not query_text:
        return {"error": "Query must be non-empty", "matches": []}
    doc = _corpus[idx]
    chunks = _chunk_content(doc["content"])
    query_terms = [term for term in re.findall(r"\w+", query_text) if term]
    matches: list[dict] = []
    for i, chunk in enumerate(chunks, start=1):
        haystack = chunk.lower()
        if query_text in haystack or (query_terms and all(t in haystack for t in query_terms)):
            matches.append({"cursor": i, "text": chunk})
    return {
        "doc_id": doc["id"],
        "title": doc["title"],
        "query": query,
        "matches": matches,
        "total_matches": len(matches),
    }


def serve() -> None:
    """Run as MCP server subprocess (called by Data Designer)."""
    corpus_path = os.environ.get("CORPUS_PATH", "corpus.jsonl")
    initialize(corpus_path)
    mcp_server.run()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="BM25S corpus retriever MCP server")
    subparsers = parser.add_subparsers(dest="command")
    subparsers.add_parser("serve", help="Run the MCP server (reads CORPUS_PATH from env)")
    stats_parser = subparsers.add_parser("stats", help="Print corpus statistics")
    stats_parser.add_argument("--corpus-path", default="corpus.jsonl")
    args = parser.parse_args()
    if args.command == "serve":
        serve()
    elif args.command == "stats":
        docs = load_corpus(args.corpus_path)
        total_chars = sum(len(d["content"]) for d in docs)
        print(f"Corpus: {args.corpus_path}")
        print(f"Documents: {len(docs)}")
        print(f"Total content: {total_chars:,} chars (~{total_chars // 4:,} tokens)")
    else:
        parser.print_help()

Key Resources:

  1. NeMo Data Designer on GitHub
  2. OpenResearcher on GitHub
  3. OpenResearcher blog post
  4. HotpotQA: A Dataset for Diverse, Explainable Multi-hop Question Answering
  5. MuSiQue: Multi-hop Questions via Single-hop Question Composition
  6. BM25S: Fast lexical search in Python