MCP 服务器开发指南
Model Context Protocol (MCP) 是一个开放协议,允许 AI 代理通过标准化接口访问外部工具和数据源。本指南教你如何开发自定义 MCP 服务器。
这页适合作为“自己给 AI 代理造工具”的开发文档。真正的重点不是先把所有能力都做出来,而是先把输入、输出、权限边界、错误处理和调试链路设计清楚。
适合谁读
- 想把内部系统接给 AI 代理的人
- 想把脚本、数据库、SaaS API 包成标准工具的人
- 想做团队内部专用 Agent 能力层的人
MCP 协议概述
核心概念
- Server:提供工具和资源的服务
- Client:AI 代理,调用服务器的工具
- Tool:服务器暴露的功能(函数)
- Resource:服务器提供的数据(文件、API 响应等)
- Prompt:预定义的提示词模板
通信方式
MCP 支持三种传输方式:
- stdio:标准输入输出(最常用)
- SSE:Server-Sent Events(HTTP 流)
- WebSocket:双向通信
快速开始
环境准备
# Node.js 实现
npm install @modelcontextprotocol/sdk
# Python 实现
pip install mcp
最小示例(Node.js)
创建 server.js:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new Server(
{
name: "my-mcp-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
},
);
// 注册工具
server.setRequestHandler("tools/list", async () => {
return {
tools: [
{
name: "get_weather",
description: "获取指定城市的天气信息",
inputSchema: {
type: "object",
properties: {
city: {
type: "string",
description: "城市名称",
},
},
required: ["city"],
},
},
],
};
});
// 处理工具调用
server.setRequestHandler("tools/call", async (request) => {
if (request.params.name === "get_weather") {
const city = request.params.arguments.city;
// 实际应用中调用天气 API
return {
content: [
{
type: "text",
text: `${city} 的天气:晴天,25°C`,
},
],
};
}
throw new Error("Unknown tool");
});
// 启动服务器
const transport = new StdioServerTransport();
await server.connect(transport);
最小示例(Python)
创建 server.py:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("my-mcp-server")
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="get_weather",
description="获取指定城市的天气信息",
inputSchema={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称"
}
},
"required": ["city"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "get_weather":
city = arguments["city"]
# 实际应用中调用天气 API
return [
TextContent(
type="text",
text=f"{city} 的天气:晴天,25°C"
)
]
raise ValueError(f"Unknown tool: {name}")
if __name__ == "__main__":
import asyncio
asyncio.run(stdio_server(app))
配置客户端
在 AI 代理(如 nanobot、OpenClaw)中配置:
{
"mcpServers": {
"weather": {
"command": "node",
"args": ["server.js"]
}
}
}
或 Python 版本:
{
"mcpServers": {
"weather": {
"command": "python",
"args": ["server.py"]
}
}
}
工具开发
输入验证
使用 JSON Schema 定义输入:
{
name: "search_files",
description: "在指定目录搜索文件",
inputSchema: {
type: "object",
properties: {
directory: {
type: "string",
description: "搜索目录路径"
},
pattern: {
type: "string",
description: "文件名模式(支持通配符)"
},
recursive: {
type: "boolean",
description: "是否递归搜索子目录",
default: false
}
},
required: ["directory", "pattern"]
}
}
返回多种内容类型
server.setRequestHandler("tools/call", async (request) => {
if (request.params.name === "analyze_image") {
return {
content: [
{
type: "text",
text: "图像分析结果:",
},
{
type: "image",
data: base64ImageData,
mimeType: "image/png",
},
{
type: "resource",
uri: "file:///path/to/report.json",
mimeType: "application/json",
},
],
};
}
});
错误处理
server.setRequestHandler("tools/call", async (request) => {
try {
// 工具逻辑
} catch (error) {
return {
content: [
{
type: "text",
text: `错误:${error.message}`,
},
],
isError: true,
};
}
});
资源提供
列出资源
server.setRequestHandler("resources/list", async () => {
return {
resources: [
{
uri: "file:///workspace/README.md",
name: "项目说明",
mimeType: "text/markdown",
},
{
uri: "db://users/123",
name: "用户数据",
mimeType: "application/json",
},
],
};
});
读取资源
server.setRequestHandler("resources/read", async (request) => {
const uri = request.params.uri;
if (uri.startsWith("file://")) {
const path = uri.slice(7);
const content = await fs.readFile(path, "utf-8");
return {
contents: [
{
uri,
mimeType: "text/plain",
text: content,
},
],
};
}
throw new Error("Unsupported URI scheme");
});
提示词模板
定义模板
server.setRequestHandler("prompts/list", async () => {
return {
prompts: [
{
name: "code_review",
description: "代码审查提示词",
arguments: [
{
name: "language",
description: "编程语言",
required: true,
},
],
},
],
};
});
生成提示词
server.setRequestHandler("prompts/get", async (request) => {
if (request.params.name === "code_review") {
const language = request.params.arguments.language;
return {
messages: [
{
role: "user",
content: {
type: "text",
text: `请审查以下 ${language} 代码,关注:
1. 代码质量和可读性
2. 潜在的 bug 和安全问题
3. 性能优化建议`,
},
},
],
};
}
});
高级特性
流式响应
server.setRequestHandler("tools/call", async (request) => {
if (request.params.name === "generate_report") {
// 返回流式内容
return {
content: [
{
type: "text",
text: "正在生成报告...",
},
],
_meta: {
progressToken: "report-123",
},
};
}
});
// 发送进度更新
server.notification({
method: "notifications/progress",
params: {
progressToken: "report-123",
progress: 50,
total: 100,
},
});
订阅资源变化
server.setRequestHandler("resources/subscribe", async (request) => {
const uri = request.params.uri;
// 监听文件变化
fs.watch(uri.slice(7), (eventType) => {
server.notification({
method: "notifications/resources/updated",
params: { uri },
});
});
return {};
});
认证和授权
server.setRequestHandler("initialize", async (request) => {
const apiKey = request.params.clientInfo?.apiKey;
if (!isValidApiKey(apiKey)) {
throw new Error("Invalid API key");
}
return {
protocolVersion: "2024-11-05",
capabilities: {
tools: {},
},
serverInfo: {
name: "secure-server",
version: "1.0.0",
},
};
});
实战案例
案例 1:GitHub API 服务器
import { Octokit } from "@octokit/rest";
const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN });
server.setRequestHandler("tools/list", async () => {
return {
tools: [
{
name: "create_issue",
description: "创建 GitHub Issue",
inputSchema: {
type: "object",
properties: {
owner: { type: "string" },
repo: { type: "string" },
title: { type: "string" },
body: { type: "string" },
},
required: ["owner", "repo", "title"],
},
},
{
name: "list_prs",
description: "列出 Pull Requests",
inputSchema: {
type: "object",
properties: {
owner: { type: "string" },
repo: { type: "string" },
state: {
type: "string",
enum: ["open", "closed", "all"],
default: "open",
},
},
required: ["owner", "repo"],
},
},
],
};
});
server.setRequestHandler("tools/call", async (request) => {
const { name, arguments: args } = request.params;
if (name === "create_issue") {
const result = await octokit.issues.create({
owner: args.owner,
repo: args.repo,
title: args.title,
body: args.body,
});
return {
content: [
{
type: "text",
text: `Issue 创建成功:${result.data.html_url}`,
},
],
};
}
if (name === "list_prs") {
const result = await octokit.pulls.list({
owner: args.owner,
repo: args.repo,
state: args.state,
});
const prs = result.data
.map((pr) => `#${pr.number}: ${pr.title} (${pr.state})`)
.join("\n");
return {
content: [
{
type: "text",
text: prs,
},
],
};
}
});
案例 2:数据库查询服务器
import sqlite3
from mcp.server import Server
from mcp.types import Tool, TextContent
app = Server("database-server")
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="query_db",
description="执行 SQL 查询",
inputSchema={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL 查询语句"
}
},
"required": ["sql"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "query_db":
conn = sqlite3.connect("data.db")
cursor = conn.cursor()
try:
cursor.execute(arguments["sql"])
results = cursor.fetchall()
# 格式化结果
output = "\n".join(str(row) for row in results)
return [TextContent(type="text", text=output)]
except Exception as e:
return [TextContent(type="text", text=f"错误:{e}")]
finally:
conn.close()
案例 3:文件系统服务器
import fs from "fs/promises";
import path from "path";
const WORKSPACE = process.env.WORKSPACE_DIR || process.cwd();
server.setRequestHandler("tools/list", async () => {
return {
tools: [
{
name: "read_file",
description: "读取文件内容",
inputSchema: {
type: "object",
properties: {
path: { type: "string" },
},
required: ["path"],
},
},
{
name: "write_file",
description: "写入文件",
inputSchema: {
type: "object",
properties: {
path: { type: "string" },
content: { type: "string" },
},
required: ["path", "content"],
},
},
{
name: "list_directory",
description: "列出目录内容",
inputSchema: {
type: "object",
properties: {
path: { type: "string" },
},
required: ["path"],
},
},
],
};
});
server.setRequestHandler("tools/call", async (request) => {
const { name, arguments: args } = request.params;
const fullPath = path.join(WORKSPACE, args.path);
// 安全检查:防止路径遍历
if (!fullPath.startsWith(WORKSPACE)) {
throw new Error("Access denied: path outside workspace");
}
if (name === "read_file") {
const content = await fs.readFile(fullPath, "utf-8");
return {
content: [{ type: "text", text: content }],
};
}
if (name === "write_file") {
await fs.writeFile(fullPath, args.content, "utf-8");
return {
content: [{ type: "text", text: "文件写入成功" }],
};
}
if (name === "list_directory") {
const files = await fs.readdir(fullPath);
return {
content: [{ type: "text", text: files.join("\n") }],
};
}
});
测试与调试
使用 MCP Inspector
npx @modelcontextprotocol/inspector node server.js
在浏览器中打开 http://localhost:5173,可以:
- 查看服务器信息
- 测试工具调用
- 查看日志输出
单元测试
import { describe, it, expect } from "vitest";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
describe("Weather Server", () => {
it("should return weather data", async () => {
const server = createWeatherServer();
const response = await server.request({
method: "tools/call",
params: {
name: "get_weather",
arguments: { city: "北京" },
},
});
expect(response.content[0].text).toContain("北京");
});
});
日志记录
import { Logger } from "@modelcontextprotocol/sdk/shared/logger.js";
const logger = new Logger("my-server");
server.setRequestHandler("tools/call", async (request) => {
logger.info(`Tool called: ${request.params.name}`);
logger.debug(`Arguments: ${JSON.stringify(request.params.arguments)}`);
try {
// 工具逻辑
} catch (error) {
logger.error(`Error: ${error.message}`);
throw error;
}
});
部署
发布到 npm
{
"name": "mcp-server-weather",
"version": "1.0.0",
"bin": {
"mcp-server-weather": "./dist/index.js"
},
"files": ["dist"]
}
用户可以通过 npx 直接使用:
{
"mcpServers": {
"weather": {
"command": "npx",
"args": ["-y", "mcp-server-weather"]
}
}
}
发布到 PyPI
# setup.py
from setuptools import setup
setup(
name="mcp-server-weather",
version="1.0.0",
py_modules=["server"],
entry_points={
"console_scripts": [
"mcp-server-weather=server:main",
],
},
)
用户可以通过 uvx 使用:
{
"mcpServers": {
"weather": {
"command": "uvx",
"args": ["mcp-server-weather"]
}
}
}
Docker 部署
FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --production
COPY . .
CMD ["node", "server.js"]
配置:
{
"mcpServers": {
"weather": {
"command": "docker",
"args": ["run", "-i", "mcp-server-weather"]
}
}
}
最佳实践
1. 清晰的工具描述
{
name: "search_files",
description: "在指定目录搜索文件。支持通配符模式(如 *.js)和递归搜索。",
// 不好的描述:
// description: "搜索文件"
}
2. 详细的参数说明
inputSchema: {
type: "object",
properties: {
query: {
type: "string",
description: "搜索关键词。支持正则表达式,如 /pattern/i 表示不区分大小写。",
// 不好的描述:
// description: "查询"
}
}
}
3. 合理的错误处理
try {
// 工具逻辑
} catch (error) {
return {
content: [
{
type: "text",
text: `操作失败:${error.message}\n\n建议:检查文件路径是否正确`,
},
],
isError: true,
};
}
4. 安全验证
// 路径遍历防护
if (!fullPath.startsWith(WORKSPACE)) {
throw new Error("Access denied");
}
// 输入验证
if (!isValidEmail(args.email)) {
throw new Error("Invalid email format");
}
// 速率限制
if (rateLimiter.isExceeded(clientId)) {
throw new Error("Rate limit exceeded");
}
5. 性能优化
// 缓存结果
const cache = new Map();
server.setRequestHandler("tools/call", async (request) => {
const cacheKey = JSON.stringify(request.params);
if (cache.has(cacheKey)) {
return cache.get(cacheKey);
}
const result = await expensiveOperation();
cache.set(cacheKey, result);
return result;
});
推荐开发顺序
建议按这个顺序推进:
- 先做一个只暴露单个工具的最小服务器
- 再补输入校验和错误处理
- 再补资源与提示模板
- 再补日志、调试与测试
- 最后再做部署、权限和版本治理
常见问题
工具能跑,但代理不会调用
优先检查:
- 工具描述是否清晰
- 输入 schema 是否合理
- 返回内容是否足够让模型理解下一步
- 客户端是否真的加载了该服务器
该先做 Tools 还是 Resources
如果你的能力更像“执行动作”,先做 Tools;如果主要是“提供内容”,先做 Resources。
为什么服务器越写越重
通常说明把业务逻辑、适配层和协议层混在了一起。更稳的做法是把核心业务单独抽出来,MCP 只负责暴露接口。