Initial MCP server with Docker support

2026-01-14 23:45:48 -05:00
commit f35ee34bbf
12 changed files with 1142 additions and 0 deletions

.dockerignore (Normal file, 13 lines)

@@ -0,0 +1,13 @@
.venv
__pycache__
*.pyc
*.pyo
*.pyd
.git
.gitignore
.DS_Store
.cache
.mypy_cache
.pytest_cache
.env
.env.*

.gitignore (Normal file, vendored, 11 lines)

@@ -0,0 +1,11 @@
.venv/
__pycache__/
*.pyc
*.pyo
*.pyd
*.egg-info/
.cache/
.mypy_cache/
.pytest_cache/
.env
.env.*

Dockerfile (Normal file, 31 lines)

@@ -0,0 +1,31 @@
# syntax=docker/dockerfile:1
FROM python:3.12-slim AS builder
WORKDIR /app
ENV PIP_DISABLE_PIP_VERSION_CHECK=1 \
    PIP_NO_CACHE_DIR=1
COPY pyproject.toml README.md openapi.json ./
COPY src ./src
RUN pip install --upgrade pip \
    && pip wheel . -w /wheels

FROM python:3.12-slim AS runtime
WORKDIR /app
ENV PYTHONUNBUFFERED=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1 \
    PIP_NO_CACHE_DIR=1
COPY --from=builder /wheels /wheels
RUN pip install /wheels/*.whl \
    && rm -rf /wheels
ENV MCP_TRANSPORT=streamable-http \
    MCP_HOST=0.0.0.0 \
    MCP_PORT=8000 \
    LIGHTRAG_BASE_URL=http://host.docker.internal:9621
EXPOSE 8000
ENTRYPOINT ["lightrag-mcp"]

README.md (Normal file, 52 lines)

@@ -0,0 +1,52 @@
# LightRAG MCP Server

An MCP server exposing the LightRAG Server API as tools, resources, and prompts for coding agents.

## Features

- Retrieval tools: `query_data`, `query`, `query_stream`, `query_stream_chunks`
- Ingestion tools: `ingest_text`, `ingest_texts`, `ingest_file`, `ingest_files`, `upload_document`
- Freshness tools: `scan_documents`, `scan_and_wait`, `pipeline_status`, `wait_for_idle`, `track_status`
- Memory tool: `ingest_memory` for lessons, preferences, decisions, structures, functions, relationships
- Graph tools: entity/relation CRUD, entity existence check, label search, graph export
- Health tool: `health`
- Macro tool: `refresh_and_query` (scan -> wait idle -> query_data -> query)
- Resources: health, pipeline status, documents, graph, labels (list + popular), status counts
- Prompts: evidence-first answering, refresh-then-query, record project memory

## Quickstart

```bash
# Create a venv and install
python -m venv .venv
. .venv/bin/activate
pip install -e .

# Run the MCP server (streamable HTTP)
export MCP_TRANSPORT=streamable-http
export MCP_HOST=127.0.0.1
export MCP_PORT=8000
export LIGHTRAG_BASE_URL=http://127.0.0.1:9621
lightrag-mcp

# Smoke test (health + optional retrieval)
lightrag-mcp-smoke --query "What is this project?" --format pretty
```

## Configuration

- `LIGHTRAG_BASE_URL` (default `http://127.0.0.1:9621`)
- `LIGHTRAG_TIMEOUT_S` (default `60`)
- `LIGHTRAG_POLL_INTERVAL_S` (default `1`)
- `LIGHTRAG_POLL_TIMEOUT_S` (default `120`)
- `MCP_TRANSPORT` (default `streamable-http`)
- `MCP_HOST` (default `127.0.0.1`)
- `MCP_PORT` (default `8000`)
- `MCP_SERVER_NAME` (default `LightRAG MCP`)

## Notes

- `query_stream` collects the streaming response and returns it as a single string.
- `query_stream_chunks` returns chunked output and reports progress to clients that support progress events.
- `refresh_and_query` is a convenience macro for evidence-first workflows.
- `ingest_file` and `ingest_files` chunk local files and store them with per-chunk `file_source` references.

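The evidence-first workflow the Notes describe can also be exercised directly against the LightRAG HTTP API that these tools wrap. Below is a minimal sketch using httpx, assuming a LightRAG server at the default base URL; the payload fields mirror what the `query_data` and `query` tools send.

```python
import asyncio

import httpx

BASE_URL = "http://127.0.0.1:9621"  # matches the LIGHTRAG_BASE_URL default above


async def evidence_first(question: str) -> None:
    payload = {"query": question, "mode": "mix", "top_k": 10}
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=60.0) as client:
        # Retrieval-only pass: raw entities, relations, chunks (what query_data returns).
        evidence = await client.post("/query/data", json=payload)
        evidence.raise_for_status()
        print("evidence keys:", sorted(evidence.json()))
        # Generation pass with references (what the query tool returns).
        answer = await client.post("/query", json={**payload, "include_references": True})
        answer.raise_for_status()
        print(answer.json().get("response"))


if __name__ == "__main__":
    asyncio.run(evidence_first("What is this project?"))
```
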
docker-compose.yml (Normal file, 15 lines)

@@ -0,0 +1,15 @@
services:
  lightrag-mcp:
    build:
      context: .
    image: git.baked.rocks/vasceannie/lightrag-mcp:latest
    environment:
      MCP_TRANSPORT: streamable-http
      MCP_HOST: 0.0.0.0
      MCP_PORT: 8000
      LIGHTRAG_BASE_URL: http://host.docker.internal:9621
    ports:
      - "8000:8000"
    extra_hosts:
      - "host.docker.internal:host-gateway"
    restart: unless-stopped

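Before pointing an MCP client at the container, it can help to confirm the published port is accepting connections. A small, generic readiness sketch; the host and port are the compose defaults above, and `wait_for_port` is just a local helper, not part of this project.

```python
import socket
import time


def wait_for_port(host: str = "127.0.0.1", port: int = 8000, timeout_s: float = 30.0) -> bool:
    """Poll a TCP port until it accepts a connection or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError:
            time.sleep(1.0)
    return False


if __name__ == "__main__":
    print("lightrag-mcp reachable:", wait_for_port())
```
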
openapi.json (Normal file, 1 line)

File diff suppressed because one or more lines are too long

pyproject.toml (Normal file, 33 lines)

@@ -0,0 +1,33 @@
[project]
name = "lightrag-mcp"
version = "0.1.0"
description = "MCP server for LightRAG Server API"
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [
    { name = "LightRAG MCP" }
]
dependencies = [
    "httpx>=0.27.0",
    "mcp>=1.2.0",
    "pydantic>=2.6.0",
    "pydantic-settings>=2.1.0",
]

[project.scripts]
lightrag-mcp = "lightrag_mcp.server:main"
lightrag-mcp-smoke = "lightrag_mcp.smoke:main"

[tool.ruff]
line-length = 100

[tool.ruff.lint]
select = ["E", "F", "I", "B"]

[tool.ruff.lint.isort]
known-first-party = ["lightrag_mcp"]

[build-system]
requires = ["hatchling>=1.20.0"]
build-backend = "hatchling.build"

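The `[project.scripts]` table is what puts `lightrag-mcp` and `lightrag-mcp-smoke` on the PATH. A quick sketch to confirm the entry points were registered, assuming the package has been installed (for example with `pip install -e .`):

```python
from importlib.metadata import entry_points

# console_scripts is where [project.scripts] entries end up after installation.
for ep in entry_points(group="console_scripts"):
    if ep.name.startswith("lightrag-mcp"):
        print(f"{ep.name} -> {ep.value}")
```
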
src/lightrag_mcp/__init__.py (Normal file, 5 lines)

@@ -0,0 +1,5 @@
"""LightRAG MCP server package."""
__all__ = ["__version__"]
__version__ = "0.1.0"

src/lightrag_mcp/client.py (Normal file, 105 lines)

@@ -0,0 +1,105 @@
from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any

import httpx


class LightRAGClient:
    def __init__(self, base_url: str, timeout_s: float = 60.0) -> None:
        self._base_url = base_url.rstrip("/")
        self._timeout = timeout_s

    @asynccontextmanager
    async def _client(self) -> AsyncIterator[httpx.AsyncClient]:
        async with httpx.AsyncClient(base_url=self._base_url, timeout=self._timeout) as client:
            yield client

    async def request_json(
        self,
        method: str,
        path: str,
        *,
        json: dict[str, Any] | None = None,
        params: dict[str, Any] | None = None,
        files: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        async with self._client() as client:
            response = await client.request(method, path, json=json, params=params, files=files)
            return _json_or_raise(response, method, path)

    async def request_text(
        self,
        method: str,
        path: str,
        *,
        json: dict[str, Any] | None = None,
        params: dict[str, Any] | None = None,
    ) -> str:
        async with self._client() as client:
            response = await client.request(method, path, json=json, params=params)
            return _text_or_raise(response, method, path)

    async def stream_text(
        self,
        path: str,
        *,
        json: dict[str, Any] | None = None,
    ) -> str:
        chunks = [chunk async for chunk in self.stream_chunks(path, json=json)]
        return "\n".join(chunks)

    async def stream_chunks(
        self,
        path: str,
        *,
        json: dict[str, Any] | None = None,
    ) -> AsyncIterator[str]:
        async with self._client() as client:
            async with client.stream(
                "POST",
                path,
                json=json,
                headers={"accept": "text/event-stream"},
            ) as response:
                _raise_for_status(response, "POST", path)
                async for line in response.aiter_lines():
                    if not line:
                        continue
                    if line.startswith("data:"):
                        line = line[5:].lstrip()
                    yield line

    async def upload_file(self, path: str | Path) -> dict[str, Any]:
        file_path = Path(path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        async with self._client() as client:
            with file_path.open("rb") as handle:
                files = {"file": (file_path.name, handle)}
                response = await client.request("POST", "/documents/upload", files=files)
            return _json_or_raise(response, "POST", "/documents/upload")


def _raise_for_status(response: httpx.Response, method: str, path: str) -> None:
    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        text = exc.response.text.strip()
        message = f"{method} {path} failed with {exc.response.status_code}"
        if text:
            message = f"{message}: {text}"
        raise RuntimeError(message) from exc


def _json_or_raise(response: httpx.Response, method: str, path: str) -> dict[str, Any]:
    _raise_for_status(response, method, path)
    return response.json()


def _text_or_raise(response: httpx.Response, method: str, path: str) -> str:
    _raise_for_status(response, method, path)
    return response.text

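For a sense of how this client is used outside the MCP server, here is a small sketch that checks health and consumes the SSE stream, assuming a LightRAG server is running on the default port:

```python
import asyncio

from lightrag_mcp.client import LightRAGClient


async def main() -> None:
    client = LightRAGClient("http://127.0.0.1:9621", timeout_s=30.0)
    health = await client.request_json("GET", "/health")
    print("health:", health.get("status", health))
    # stream_chunks yields SSE data lines one at a time, as the server tools do.
    async for chunk in client.stream_chunks("/query/stream", json={"query": "ping", "stream": True}):
        print(chunk)


if __name__ == "__main__":
    asyncio.run(main())
```
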
src/lightrag_mcp/server.py (Normal file, 779 lines)

@@ -0,0 +1,779 @@
from __future__ import annotations

import asyncio
import json
import time
from pathlib import Path
from typing import Any, Iterable

from mcp.server.fastmcp import Context, FastMCP

from lightrag_mcp.client import LightRAGClient
from lightrag_mcp.settings import load_settings

settings = load_settings()
client = LightRAGClient(settings.base_url, timeout_s=settings.timeout_s)
mcp = FastMCP(settings.mcp_server_name, json_response=True)


def _compact_payload(payload: dict[str, Any]) -> dict[str, Any]:
    return {key: value for key, value in payload.items() if value is not None}


def _build_query_payload(
    query: str,
    *,
    mode: str | None = None,
    top_k: int | None = None,
    chunk_top_k: int | None = None,
    include_references: bool | None = None,
    include_chunk_content: bool | None = None,
    enable_rerank: bool | None = None,
    only_need_context: bool | None = None,
    only_need_prompt: bool | None = None,
    response_type: str | None = None,
    user_prompt: str | None = None,
    conversation_history: list[dict[str, Any]] | None = None,
    hl_keywords: list[str] | None = None,
    ll_keywords: list[str] | None = None,
    max_total_tokens: int | None = None,
    max_entity_tokens: int | None = None,
    max_relation_tokens: int | None = None,
    stream: bool | None = None,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    payload: dict[str, Any] = {
        "query": query,
        "mode": mode,
        "top_k": top_k,
        "chunk_top_k": chunk_top_k,
        "include_references": include_references,
        "include_chunk_content": include_chunk_content,
        "enable_rerank": enable_rerank,
        "only_need_context": only_need_context,
        "only_need_prompt": only_need_prompt,
        "response_type": response_type,
        "user_prompt": user_prompt,
        "conversation_history": conversation_history,
        "hl_keywords": hl_keywords,
        "ll_keywords": ll_keywords,
        "max_total_tokens": max_total_tokens,
        "max_entity_tokens": max_entity_tokens,
        "max_relation_tokens": max_relation_tokens,
        "stream": stream,
    }
    payload = _compact_payload(payload)
    if options:
        payload.update(options)
    return payload


async def _wait_for_idle(timeout_s: float, interval_s: float) -> dict[str, Any]:
    started = time.monotonic()
    while True:
        status = await client.request_json("GET", "/documents/pipeline_status")
        if not status.get("busy"):
            return status
        if time.monotonic() - started > timeout_s:
            raise TimeoutError("Pipeline did not become idle within timeout")
        await asyncio.sleep(interval_s)


def _chunk_text(text: str, max_chars: int, overlap: int) -> list[str]:
    if max_chars <= 0:
        raise ValueError("max_chars must be > 0")
    if overlap < 0 or overlap >= max_chars:
        raise ValueError("overlap must be >= 0 and < max_chars")
    chunks: list[str] = []
    start = 0
    length = len(text)
    while start < length:
        end = min(start + max_chars, length)
        cut = text.rfind("\n", start, end)
        if cut == -1 or cut <= start:
            cut = end
        chunk = text[start:cut].strip()
        if chunk:
            chunks.append(chunk)
        if cut >= length:
            break
        next_start = max(cut - overlap, 0)
        if next_start <= start:
            next_start = cut
        start = next_start
    return chunks


def _format_list(items: Iterable[str]) -> str:
    return ", ".join(item.strip() for item in items if item.strip())


def _build_memory_note(
    *,
    title: str,
    content: str,
    memory_type: str | None = None,
    tags: list[str] | None = None,
    related_files: list[str] | None = None,
    related_symbols: list[str] | None = None,
    source: str | None = None,
    timestamp: str | None = None,
) -> str:
    lines = [f"Title: {title}"]
    if memory_type:
        lines.append(f"Type: {memory_type}")
    if tags:
        lines.append(f"Tags: {_format_list(tags)}")
    if related_files:
        lines.append(f"Related files: {_format_list(related_files)}")
    if related_symbols:
        lines.append(f"Related symbols: {_format_list(related_symbols)}")
    if source:
        lines.append(f"Source: {source}")
    if timestamp:
        lines.append(f"Timestamp: {timestamp}")
    lines.append("")
    lines.append(content.strip())
    return "\n".join(lines).strip()


@mcp.tool()
async def query_data(
    query: str,
    mode: str | None = "mix",
    top_k: int | None = None,
    chunk_top_k: int | None = None,
    include_chunk_content: bool | None = None,
    enable_rerank: bool | None = None,
    only_need_context: bool | None = None,
    only_need_prompt: bool | None = None,
    response_type: str | None = None,
    user_prompt: str | None = None,
    conversation_history: list[dict[str, Any]] | None = None,
    hl_keywords: list[str] | None = None,
    ll_keywords: list[str] | None = None,
    max_total_tokens: int | None = None,
    max_entity_tokens: int | None = None,
    max_relation_tokens: int | None = None,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Run retrieval-only query and return raw entities, relations, chunks, and references."""
    payload = _build_query_payload(
        query,
        mode=mode,
        top_k=top_k,
        chunk_top_k=chunk_top_k,
        include_chunk_content=include_chunk_content,
        enable_rerank=enable_rerank,
        only_need_context=only_need_context,
        only_need_prompt=only_need_prompt,
        response_type=response_type,
        user_prompt=user_prompt,
        conversation_history=conversation_history,
        hl_keywords=hl_keywords,
        ll_keywords=ll_keywords,
        max_total_tokens=max_total_tokens,
        max_entity_tokens=max_entity_tokens,
        max_relation_tokens=max_relation_tokens,
        options=options,
    )
    return await client.request_json("POST", "/query/data", json=payload)


@mcp.tool()
async def query(
    query: str,
    mode: str | None = "mix",
    top_k: int | None = None,
    chunk_top_k: int | None = None,
    include_references: bool | None = True,
    include_chunk_content: bool | None = None,
    enable_rerank: bool | None = None,
    only_need_context: bool | None = None,
    only_need_prompt: bool | None = None,
    response_type: str | None = None,
    user_prompt: str | None = None,
    conversation_history: list[dict[str, Any]] | None = None,
    hl_keywords: list[str] | None = None,
    ll_keywords: list[str] | None = None,
    max_total_tokens: int | None = None,
    max_entity_tokens: int | None = None,
    max_relation_tokens: int | None = None,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Run generation query and return response with references when available."""
    payload = _build_query_payload(
        query,
        mode=mode,
        top_k=top_k,
        chunk_top_k=chunk_top_k,
        include_references=include_references,
        include_chunk_content=include_chunk_content,
        enable_rerank=enable_rerank,
        only_need_context=only_need_context,
        only_need_prompt=only_need_prompt,
        response_type=response_type,
        user_prompt=user_prompt,
        conversation_history=conversation_history,
        hl_keywords=hl_keywords,
        ll_keywords=ll_keywords,
        max_total_tokens=max_total_tokens,
        max_entity_tokens=max_entity_tokens,
        max_relation_tokens=max_relation_tokens,
        options=options,
    )
    return await client.request_json("POST", "/query", json=payload)


@mcp.tool()
async def query_stream(
    query: str,
    mode: str | None = "mix",
    top_k: int | None = None,
    chunk_top_k: int | None = None,
    include_references: bool | None = True,
    include_chunk_content: bool | None = None,
    enable_rerank: bool | None = None,
    only_need_context: bool | None = None,
    only_need_prompt: bool | None = None,
    response_type: str | None = None,
    user_prompt: str | None = None,
    conversation_history: list[dict[str, Any]] | None = None,
    hl_keywords: list[str] | None = None,
    ll_keywords: list[str] | None = None,
    max_total_tokens: int | None = None,
    max_entity_tokens: int | None = None,
    max_relation_tokens: int | None = None,
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Run streaming query and return the collected stream output."""
    payload = _build_query_payload(
        query,
        mode=mode,
        top_k=top_k,
        chunk_top_k=chunk_top_k,
        include_references=include_references,
        include_chunk_content=include_chunk_content,
        enable_rerank=enable_rerank,
        only_need_context=only_need_context,
        only_need_prompt=only_need_prompt,
        response_type=response_type,
        user_prompt=user_prompt,
        conversation_history=conversation_history,
        hl_keywords=hl_keywords,
        ll_keywords=ll_keywords,
        max_total_tokens=max_total_tokens,
        max_entity_tokens=max_entity_tokens,
        max_relation_tokens=max_relation_tokens,
        stream=True,
        options=options,
    )
    text = await client.stream_text("/query/stream", json=payload)
    return {"response": text}


@mcp.tool()
async def query_stream_chunks(
    query: str,
    mode: str | None = "mix",
    top_k: int | None = None,
    chunk_top_k: int | None = None,
    include_references: bool | None = True,
    include_chunk_content: bool | None = None,
    enable_rerank: bool | None = None,
    only_need_context: bool | None = None,
    only_need_prompt: bool | None = None,
    response_type: str | None = None,
    user_prompt: str | None = None,
    conversation_history: list[dict[str, Any]] | None = None,
    hl_keywords: list[str] | None = None,
    ll_keywords: list[str] | None = None,
    max_total_tokens: int | None = None,
    max_entity_tokens: int | None = None,
    max_relation_tokens: int | None = None,
    max_chunks: int | None = None,
    options: dict[str, Any] | None = None,
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Stream query output as chunks, reporting progress when supported."""
    payload = _build_query_payload(
        query,
        mode=mode,
        top_k=top_k,
        chunk_top_k=chunk_top_k,
        include_references=include_references,
        include_chunk_content=include_chunk_content,
        enable_rerank=enable_rerank,
        only_need_context=only_need_context,
        only_need_prompt=only_need_prompt,
        response_type=response_type,
        user_prompt=user_prompt,
        conversation_history=conversation_history,
        hl_keywords=hl_keywords,
        ll_keywords=ll_keywords,
        max_total_tokens=max_total_tokens,
        max_entity_tokens=max_entity_tokens,
        max_relation_tokens=max_relation_tokens,
        stream=True,
        options=options,
    )
    chunks: list[str] = []
    count = 0
    async for chunk in client.stream_chunks("/query/stream", json=payload):
        chunks.append(chunk)
        count += 1
        if ctx is not None:
            await ctx.report_progress(count, None, message=f"Received chunk {count}")
        if max_chunks is not None and count >= max_chunks:
            break
    return {"chunks": chunks, "response": "\n".join(chunks), "count": count}


@mcp.tool()
async def ingest_text(text: str, file_source: str | None = None) -> dict[str, Any]:
    """Ingest a single text chunk into the knowledge base."""
    payload = _compact_payload({"text": text, "file_source": file_source})
    return await client.request_json("POST", "/documents/text", json=payload)


@mcp.tool()
async def ingest_texts(
    texts: list[str],
    file_sources: list[str] | None = None,
) -> dict[str, Any]:
    """Ingest multiple text chunks into the knowledge base."""
    payload: dict[str, Any] = {"texts": texts}
    if file_sources is not None:
        payload["file_sources"] = file_sources
    return await client.request_json("POST", "/documents/texts", json=payload)


@mcp.tool()
async def ingest_file(
    path: str,
    max_chars: int = 4000,
    overlap: int = 200,
    encoding: str = "utf-8",
) -> dict[str, Any]:
    """Read a local file, chunk it, and ingest as texts with file_sources set per chunk."""
    file_path = Path(path)
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    text = file_path.read_text(encoding=encoding, errors="replace")
    chunks = _chunk_text(text, max_chars=max_chars, overlap=overlap)
    if not chunks:
        raise ValueError(f"No content to ingest from {file_path}")
    file_sources = [f"{file_path}#chunk:{idx + 1}/{len(chunks)}" for idx in range(len(chunks))]
    payload: dict[str, Any] = {"texts": chunks, "file_sources": file_sources}
    return await client.request_json("POST", "/documents/texts", json=payload)


@mcp.tool()
async def ingest_files(
    paths: list[str],
    max_chars: int = 4000,
    overlap: int = 200,
    encoding: str = "utf-8",
) -> dict[str, Any]:
    """Ingest multiple local files by chunking each file into texts."""
    results: dict[str, Any] = {}
    for path in paths:
        results[path] = await ingest_file(
            path=path,
            max_chars=max_chars,
            overlap=overlap,
            encoding=encoding,
        )
    return {"results": results}


@mcp.tool()
async def upload_document(path: str) -> dict[str, Any]:
    """Upload a local file to the LightRAG input directory."""
    return await client.upload_file(path)


@mcp.tool()
async def scan_documents() -> dict[str, Any]:
    """Scan the input directory for new documents."""
    return await client.request_json("POST", "/documents/scan")


@mcp.tool()
async def scan_and_wait(timeout_s: float | None = None) -> dict[str, Any]:
    """Scan for new documents and wait until the pipeline is idle."""
    await client.request_json("POST", "/documents/scan")
    wait_timeout = timeout_s if timeout_s is not None else settings.poll_timeout_s
    status = await _wait_for_idle(wait_timeout, settings.poll_interval_s)
    return {"status": status}


@mcp.tool()
async def pipeline_status() -> dict[str, Any]:
    """Get the ingestion pipeline status."""
    return await client.request_json("GET", "/documents/pipeline_status")


@mcp.tool()
async def wait_for_idle(timeout_s: float | None = None) -> dict[str, Any]:
    """Wait for pipeline idle and return the final status."""
    wait_timeout = timeout_s if timeout_s is not None else settings.poll_timeout_s
    return await _wait_for_idle(wait_timeout, settings.poll_interval_s)


@mcp.tool()
async def track_status(track_id: str) -> dict[str, Any]:
    """Get document processing status for a scan track id."""
    return await client.request_json("GET", f"/documents/track_status/{track_id}")


@mcp.tool()
async def list_documents() -> dict[str, Any]:
    """List documents in the knowledge base."""
    return await client.request_json("GET", "/documents")


@mcp.tool()
async def list_documents_paginated(
    status_filter: list[str] | None = None,
    page: int | None = None,
    page_size: int | None = None,
    sort_field: str | None = None,
    sort_direction: str | None = None,
) -> dict[str, Any]:
    """List documents with pagination and status filters."""
    payload = _compact_payload(
        {
            "status_filter": status_filter,
            "page": page,
            "page_size": page_size,
            "sort_field": sort_field,
            "sort_direction": sort_direction,
        }
    )
    return await client.request_json("POST", "/documents/paginated", json=payload)


@mcp.tool()
async def status_counts() -> dict[str, Any]:
    """Get document status counts."""
    return await client.request_json("GET", "/documents/status_counts")


@mcp.tool()
async def delete_documents(
    doc_ids: list[str],
    delete_file: bool | None = None,
    delete_llm_cache: bool | None = None,
) -> dict[str, Any]:
    """Delete documents by ID."""
    payload = _compact_payload(
        {"doc_ids": doc_ids, "delete_file": delete_file, "delete_llm_cache": delete_llm_cache}
    )
    return await client.request_json("DELETE", "/documents/delete_document", json=payload)


@mcp.tool()
async def clear_documents() -> dict[str, Any]:
    """Clear all documents and associated data."""
    return await client.request_json("DELETE", "/documents")


@mcp.tool()
async def clear_cache() -> dict[str, Any]:
    """Clear cached LLM data in LightRAG."""
    return await client.request_json("POST", "/documents/clear_cache", json={})


@mcp.tool()
async def reprocess_failed() -> dict[str, Any]:
    """Reprocess failed documents in the pipeline."""
    return await client.request_json("POST", "/documents/reprocess_failed")


@mcp.tool()
async def cancel_pipeline() -> dict[str, Any]:
    """Cancel the active ingestion pipeline."""
    return await client.request_json("POST", "/documents/cancel_pipeline")


@mcp.tool()
async def create_entity(entity_name: str, entity_data: dict[str, Any]) -> dict[str, Any]:
    """Create a new entity in the knowledge graph."""
    payload = {"entity_name": entity_name, "entity_data": entity_data}
    return await client.request_json("POST", "/graph/entity/create", json=payload)


@mcp.tool()
async def edit_entity(
    entity_name: str,
    updated_data: dict[str, Any],
    allow_rename: bool | None = None,
    allow_merge: bool | None = None,
) -> dict[str, Any]:
    """Update an entity in the knowledge graph."""
    payload = _compact_payload(
        {
            "entity_name": entity_name,
            "updated_data": updated_data,
            "allow_rename": allow_rename,
            "allow_merge": allow_merge,
        }
    )
    return await client.request_json("POST", "/graph/entity/edit", json=payload)


@mcp.tool()
async def merge_entities(entities_to_change: list[str], entity_to_change_into: str) -> dict[str, Any]:
    """Merge multiple entities into one target entity."""
    payload = {
        "entities_to_change": entities_to_change,
        "entity_to_change_into": entity_to_change_into,
    }
    return await client.request_json("POST", "/graph/entities/merge", json=payload)


@mcp.tool()
async def create_relation(
    source_entity: str,
    target_entity: str,
    relation_data: dict[str, Any],
) -> dict[str, Any]:
    """Create a relation in the knowledge graph."""
    payload = {
        "source_entity": source_entity,
        "target_entity": target_entity,
        "relation_data": relation_data,
    }
    return await client.request_json("POST", "/graph/relation/create", json=payload)


@mcp.tool()
async def edit_relation(
    source_id: str,
    target_id: str,
    updated_data: dict[str, Any],
) -> dict[str, Any]:
    """Update a relation in the knowledge graph."""
    payload = {"source_id": source_id, "target_id": target_id, "updated_data": updated_data}
    return await client.request_json("POST", "/graph/relation/edit", json=payload)


@mcp.tool()
async def delete_entity(entity_name: str) -> dict[str, Any]:
    """Delete an entity from the knowledge graph."""
    payload = {"entity_name": entity_name}
    return await client.request_json("DELETE", "/documents/delete_entity", json=payload)


@mcp.tool()
async def delete_relation(source_id: str, target_id: str) -> dict[str, Any]:
    """Delete a relation from the knowledge graph."""
    payload = {"source_id": source_id, "target_id": target_id}
    return await client.request_json("DELETE", "/documents/delete_relation", json=payload)


@mcp.tool()
async def graph_labels() -> dict[str, Any]:
    """List graph labels."""
    return await client.request_json("GET", "/graph/label/list")


@mcp.tool()
async def popular_labels() -> dict[str, Any]:
    """List popular graph labels."""
    return await client.request_json("GET", "/graph/label/popular")


@mcp.tool()
async def search_labels(query: str, limit: int | None = None) -> dict[str, Any]:
    """Search graph labels by query string."""
    params: dict[str, Any] = {"q": query}
    if limit is not None:
        params["limit"] = limit
    return await client.request_json("GET", "/graph/label/search", params=params)


@mcp.tool()
async def get_graph() -> dict[str, Any]:
    """Fetch the full knowledge graph."""
    return await client.request_json("GET", "/graphs")


@mcp.tool()
async def entity_exists(name: str) -> dict[str, Any]:
    """Check if an entity exists in the knowledge graph."""
    return await client.request_json("GET", "/graph/entity/exists", params={"name": name})


@mcp.tool()
async def health() -> dict[str, Any]:
    """Fetch health and configuration status."""
    return await client.request_json("GET", "/health")


@mcp.tool()
async def ingest_memory(
    title: str,
    content: str,
    memory_type: str | None = None,
    tags: list[str] | None = None,
    related_files: list[str] | None = None,
    related_symbols: list[str] | None = None,
    source: str | None = None,
    timestamp: str | None = None,
    file_source: str | None = None,
) -> dict[str, Any]:
    """Store a structured project memory (lessons, preferences, decisions, structures, etc.)."""
    note = _build_memory_note(
        title=title,
        content=content,
        memory_type=memory_type,
        tags=tags,
        related_files=related_files,
        related_symbols=related_symbols,
        source=source,
        timestamp=timestamp,
    )
    if file_source is None:
        normalized_type = (memory_type or "memory").replace(" ", "_").lower()
        file_source = f"memory/{normalized_type}/{title}"
    return await client.request_json(
        "POST",
        "/documents/text",
        json={"text": note, "file_source": file_source},
    )


@mcp.tool()
async def refresh_and_query(
    query: str,
    mode: str | None = "mix",
    top_k: int | None = None,
    chunk_top_k: int | None = None,
    include_references: bool | None = True,
    include_chunk_content: bool | None = None,
    enable_rerank: bool | None = None,
    wait_timeout_s: float | None = None,
) -> dict[str, Any]:
    """Scan for new documents, wait for idle, then run query_data and query."""
    await client.request_json("POST", "/documents/scan")
    timeout_s = wait_timeout_s if wait_timeout_s is not None else settings.poll_timeout_s
    status = await _wait_for_idle(timeout_s, settings.poll_interval_s)
    payload = _build_query_payload(
        query,
        mode=mode,
        top_k=top_k,
        chunk_top_k=chunk_top_k,
        include_references=include_references,
        include_chunk_content=include_chunk_content,
        enable_rerank=enable_rerank,
    )
    data = await client.request_json("POST", "/query/data", json=payload)
    response = await client.request_json("POST", "/query", json=payload)
    return {"pipeline_status": status, "query_data": data, "query": response}


@mcp.resource("lightrag://health")
async def health_resource() -> str:
    """Health check and config status."""
    data = await client.request_json("GET", "/health")
    return json.dumps(data, indent=2)


@mcp.resource("lightrag://pipeline/status")
async def pipeline_status_resource() -> str:
    """Current pipeline status."""
    data = await client.request_json("GET", "/documents/pipeline_status")
    return json.dumps(data, indent=2)


@mcp.resource("lightrag://documents")
async def documents_resource() -> str:
    """Documents list."""
    data = await client.request_json("GET", "/documents")
    return json.dumps(data, indent=2)


@mcp.resource("lightrag://graph")
async def graph_resource() -> str:
    """Full knowledge graph."""
    data = await client.request_json("GET", "/graphs")
    return json.dumps(data, indent=2)


@mcp.resource("lightrag://labels/list")
async def labels_resource() -> str:
    """List of graph labels."""
    data = await client.request_json("GET", "/graph/label/list")
    return json.dumps(data, indent=2)


@mcp.resource("lightrag://labels/popular")
async def popular_labels_resource() -> str:
    """Popular graph labels."""
    data = await client.request_json("GET", "/graph/label/popular")
    return json.dumps(data, indent=2)


@mcp.resource("lightrag://documents/status_counts")
async def status_counts_resource() -> str:
    """Document status counts."""
    data = await client.request_json("GET", "/documents/status_counts")
    return json.dumps(data, indent=2)


@mcp.prompt()
async def evidence_first_answer(question: str, mode: str = "mix", top_k: int | None = None) -> str:
    """Prefer evidence-first retrieval before answering."""
    return (
        "Use query_data first to gather evidence, then decide if you need more retrieval. "
        "After that, call query with include_references=true to produce the final response.\n\n"
        f"Question: {question}\nMode: {mode}\nTop_k: {top_k}"
    )


@mcp.prompt()
async def refresh_kb_and_query(question: str, mode: str = "mix", top_k: int | None = None) -> str:
    """Refresh the knowledge base, wait idle, then retrieve and answer."""
    return (
        "Run refresh_and_query to scan, wait for idle, and then retrieve evidence and answer. "
        "Prefer include_references=true for the final answer.\n\n"
        f"Question: {question}\nMode: {mode}\nTop_k: {top_k}"
    )


@mcp.prompt()
async def record_project_memory(
    title: str,
    content: str,
    source: str | None = None,
) -> str:
    """Record a durable project memory into LightRAG."""
    source_line = f"Source: {source}\n" if source else ""
    return (
        "Summarize the memory as a concise note with bullet points and then ingest it using ingest_memory. "
        "Prefer memory types like lesson, preference, decision, structure, function, relationship.\n\n"
        f"Title: {title}\n{source_line}Content: {content}"
    )


def main() -> None:
    run_kwargs: dict[str, Any] = {"transport": settings.mcp_transport}
    try:
        import inspect
        params = inspect.signature(mcp.run).parameters
        if "host" in params:
            run_kwargs["host"] = settings.mcp_host
        if "port" in params:
            run_kwargs["port"] = settings.mcp_port
    except (TypeError, ValueError):
        pass
    mcp.run(**run_kwargs)


if __name__ == "__main__":
    main()

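Calling these tools end to end requires an MCP client speaking streamable HTTP. A rough sketch with the Python MCP SDK follows; the `streamablehttp_client` import and the `/mcp` mount path are assumptions about recent `mcp` SDK releases and FastMCP defaults rather than anything this commit pins down.

```python
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def main() -> None:
    # Assumes the server from this commit is running on the MCP_HOST/MCP_PORT defaults.
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print("tools:", [tool.name for tool in tools.tools])
            result = await session.call_tool("health", {})
            print(result.content)


if __name__ == "__main__":
    asyncio.run(main())
```
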
src/lightrag_mcp/settings.py (Normal file, 25 lines)

@@ -0,0 +1,25 @@
from __future__ import annotations

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Runtime settings for the LightRAG MCP server."""

    model_config = SettingsConfigDict(extra="ignore", populate_by_name=True)

    base_url: str = Field(default="http://127.0.0.1:9621", alias="LIGHTRAG_BASE_URL")
    timeout_s: float = Field(default=60.0, alias="LIGHTRAG_TIMEOUT_S")
    mcp_transport: str = Field(default="streamable-http", alias="MCP_TRANSPORT")
    mcp_host: str = Field(default="127.0.0.1", alias="MCP_HOST")
    mcp_port: int = Field(default=8000, alias="MCP_PORT")
    mcp_server_name: str = Field(default="LightRAG MCP", alias="MCP_SERVER_NAME")
    poll_interval_s: float = Field(default=1.0, alias="LIGHTRAG_POLL_INTERVAL_S")
    poll_timeout_s: float = Field(default=120.0, alias="LIGHTRAG_POLL_TIMEOUT_S")


def load_settings() -> Settings:
    return Settings()

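Because the field aliases double as environment variable names, configuration can be exercised without starting the server. A small sketch, assuming pydantic-settings resolves those aliases from the environment (which is how the server reads its `LIGHTRAG_*` and `MCP_*` values):

```python
import os

from lightrag_mcp.settings import load_settings

# Override two defaults before loading; unknown variables are ignored (extra="ignore").
os.environ["LIGHTRAG_BASE_URL"] = "http://127.0.0.1:9621"
os.environ["MCP_PORT"] = "8800"

settings = load_settings()
print(settings.base_url, settings.mcp_port)  # -> http://127.0.0.1:9621 8800
```
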
src/lightrag_mcp/smoke.py (Normal file, 72 lines)

@@ -0,0 +1,72 @@
from __future__ import annotations

import argparse
import asyncio
import json

from lightrag_mcp.client import LightRAGClient
from lightrag_mcp.settings import load_settings


def _build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="LightRAG MCP smoke test")
    parser.add_argument("--base-url", help="LightRAG base URL")
    parser.add_argument("--timeout", type=float, help="HTTP timeout in seconds")
    parser.add_argument("--query", help="Optional query to run via /query/data")
    parser.add_argument("--mode", default="mix", help="Query mode")
    parser.add_argument("--top-k", type=int, help="Top-k entities/relations")
    parser.add_argument("--chunk-top-k", type=int, help="Top-k chunks")
    parser.add_argument(
        "--include-chunk-content",
        action="store_true",
        help="Include chunk content in query_data results",
    )
    parser.add_argument(
        "--format",
        choices=["pretty", "json"],
        default="pretty",
        help="Output format",
    )
    return parser


async def _run_smoke(args: argparse.Namespace) -> int:
    settings = load_settings()
    base_url = args.base_url or settings.base_url
    timeout = args.timeout or settings.timeout_s
    client = LightRAGClient(base_url, timeout_s=timeout)
    health = await client.request_json("GET", "/health")
    _print_payload("health", health, args.format)
    if args.query:
        payload = {
            "query": args.query,
            "mode": args.mode,
            "top_k": args.top_k,
            "chunk_top_k": args.chunk_top_k,
            "include_chunk_content": True if args.include_chunk_content else None,
        }
        payload = {k: v for k, v in payload.items() if v is not None}
        data = await client.request_json("POST", "/query/data", json=payload)
        _print_payload("query_data", data, args.format)
    return 0


def _print_payload(label: str, payload: dict[str, object], fmt: str) -> None:
    if fmt == "json":
        print(json.dumps({label: payload}))
    else:
        print(f"[{label}]")
        print(json.dumps(payload, indent=2))


def main() -> None:
    parser = _build_parser()
    args = parser.parse_args()
    raise SystemExit(asyncio.run(_run_smoke(args)))


if __name__ == "__main__":
    main()