全部文章

MCP 服务器开发指南

Model Context Protocol 服务器开发教程,为 AI 代理提供自定义工具

目录 48 节

MCP 服务器开发指南

Model Context Protocol (MCP) 是一个开放协议,允许 AI 代理通过标准化接口访问外部工具和数据源。本指南教你如何开发自定义 MCP 服务器。

这页适合作为“自己给 AI 代理造工具”的开发文档。真正的重点不是先把所有能力都做出来,而是先把输入、输出、权限边界、错误处理和调试链路设计清楚。

适合谁读

  • 想把内部系统接给 AI 代理的人
  • 想把脚本、数据库、SaaS API 包成标准工具的人
  • 想做团队内部专用 Agent 能力层的人

MCP 协议概述

核心概念

  • Server:提供工具和资源的服务
  • Client:AI 代理,调用服务器的工具
  • Tool:服务器暴露的功能(函数)
  • Resource:服务器提供的数据(文件、API 响应等)
  • Prompt:预定义的提示词模板

通信方式

MCP 支持三种传输方式:

  1. stdio:标准输入输出(最常用)
  2. SSE:Server-Sent Events(HTTP 流)
  3. WebSocket:双向通信

快速开始

环境准备

# Node.js 实现
npm install @modelcontextprotocol/sdk

# Python 实现
pip install mcp

最小示例(Node.js)

创建 server.js

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

const server = new Server(
  {
    name: "my-mcp-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
    },
  },
);

// 注册工具
server.setRequestHandler("tools/list", async () => {
  return {
    tools: [
      {
        name: "get_weather",
        description: "获取指定城市的天气信息",
        inputSchema: {
          type: "object",
          properties: {
            city: {
              type: "string",
              description: "城市名称",
            },
          },
          required: ["city"],
        },
      },
    ],
  };
});

// 处理工具调用
server.setRequestHandler("tools/call", async (request) => {
  if (request.params.name === "get_weather") {
    const city = request.params.arguments.city;
    // 实际应用中调用天气 API
    return {
      content: [
        {
          type: "text",
          text: `${city} 的天气:晴天,25°C`,
        },
      ],
    };
  }
  throw new Error("Unknown tool");
});

// 启动服务器
const transport = new StdioServerTransport();
await server.connect(transport);

最小示例(Python)

创建 server.py

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("my-mcp-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="get_weather",
            description="获取指定城市的天气信息",
            inputSchema={
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "城市名称"
                    }
                },
                "required": ["city"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "get_weather":
        city = arguments["city"]
        # 实际应用中调用天气 API
        return [
            TextContent(
                type="text",
                text=f"{city} 的天气:晴天,25°C"
            )
        ]
    raise ValueError(f"Unknown tool: {name}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(stdio_server(app))

配置客户端

在 AI 代理(如 nanobot、OpenClaw)中配置:

{
  "mcpServers": {
    "weather": {
      "command": "node",
      "args": ["server.js"]
    }
  }
}

或 Python 版本:

{
  "mcpServers": {
    "weather": {
      "command": "python",
      "args": ["server.py"]
    }
  }
}

工具开发

输入验证

使用 JSON Schema 定义输入:

{
  name: "search_files",
  description: "在指定目录搜索文件",
  inputSchema: {
    type: "object",
    properties: {
      directory: {
        type: "string",
        description: "搜索目录路径"
      },
      pattern: {
        type: "string",
        description: "文件名模式(支持通配符)"
      },
      recursive: {
        type: "boolean",
        description: "是否递归搜索子目录",
        default: false
      }
    },
    required: ["directory", "pattern"]
  }
}

返回多种内容类型

server.setRequestHandler("tools/call", async (request) => {
  if (request.params.name === "analyze_image") {
    return {
      content: [
        {
          type: "text",
          text: "图像分析结果:",
        },
        {
          type: "image",
          data: base64ImageData,
          mimeType: "image/png",
        },
        {
          type: "resource",
          uri: "file:///path/to/report.json",
          mimeType: "application/json",
        },
      ],
    };
  }
});

错误处理

server.setRequestHandler("tools/call", async (request) => {
  try {
    // 工具逻辑
  } catch (error) {
    return {
      content: [
        {
          type: "text",
          text: `错误:${error.message}`,
        },
      ],
      isError: true,
    };
  }
});

资源提供

列出资源

server.setRequestHandler("resources/list", async () => {
  return {
    resources: [
      {
        uri: "file:///workspace/README.md",
        name: "项目说明",
        mimeType: "text/markdown",
      },
      {
        uri: "db://users/123",
        name: "用户数据",
        mimeType: "application/json",
      },
    ],
  };
});

读取资源

server.setRequestHandler("resources/read", async (request) => {
  const uri = request.params.uri;

  if (uri.startsWith("file://")) {
    const path = uri.slice(7);
    const content = await fs.readFile(path, "utf-8");
    return {
      contents: [
        {
          uri,
          mimeType: "text/plain",
          text: content,
        },
      ],
    };
  }

  throw new Error("Unsupported URI scheme");
});

提示词模板

定义模板

server.setRequestHandler("prompts/list", async () => {
  return {
    prompts: [
      {
        name: "code_review",
        description: "代码审查提示词",
        arguments: [
          {
            name: "language",
            description: "编程语言",
            required: true,
          },
        ],
      },
    ],
  };
});

生成提示词

server.setRequestHandler("prompts/get", async (request) => {
  if (request.params.name === "code_review") {
    const language = request.params.arguments.language;
    return {
      messages: [
        {
          role: "user",
          content: {
            type: "text",
            text: `请审查以下 ${language} 代码,关注:
1. 代码质量和可读性
2. 潜在的 bug 和安全问题
3. 性能优化建议`,
          },
        },
      ],
    };
  }
});

高级特性

流式响应

server.setRequestHandler("tools/call", async (request) => {
  if (request.params.name === "generate_report") {
    // 返回流式内容
    return {
      content: [
        {
          type: "text",
          text: "正在生成报告...",
        },
      ],
      _meta: {
        progressToken: "report-123",
      },
    };
  }
});

// 发送进度更新
server.notification({
  method: "notifications/progress",
  params: {
    progressToken: "report-123",
    progress: 50,
    total: 100,
  },
});

订阅资源变化

server.setRequestHandler("resources/subscribe", async (request) => {
  const uri = request.params.uri;

  // 监听文件变化
  fs.watch(uri.slice(7), (eventType) => {
    server.notification({
      method: "notifications/resources/updated",
      params: { uri },
    });
  });

  return {};
});

认证和授权

server.setRequestHandler("initialize", async (request) => {
  const apiKey = request.params.clientInfo?.apiKey;

  if (!isValidApiKey(apiKey)) {
    throw new Error("Invalid API key");
  }

  return {
    protocolVersion: "2024-11-05",
    capabilities: {
      tools: {},
    },
    serverInfo: {
      name: "secure-server",
      version: "1.0.0",
    },
  };
});

实战案例

案例 1:GitHub API 服务器

import { Octokit } from "@octokit/rest";

const octokit = new Octokit({ auth: process.env.GITHUB_TOKEN });

server.setRequestHandler("tools/list", async () => {
  return {
    tools: [
      {
        name: "create_issue",
        description: "创建 GitHub Issue",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string" },
            repo: { type: "string" },
            title: { type: "string" },
            body: { type: "string" },
          },
          required: ["owner", "repo", "title"],
        },
      },
      {
        name: "list_prs",
        description: "列出 Pull Requests",
        inputSchema: {
          type: "object",
          properties: {
            owner: { type: "string" },
            repo: { type: "string" },
            state: {
              type: "string",
              enum: ["open", "closed", "all"],
              default: "open",
            },
          },
          required: ["owner", "repo"],
        },
      },
    ],
  };
});

server.setRequestHandler("tools/call", async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "create_issue") {
    const result = await octokit.issues.create({
      owner: args.owner,
      repo: args.repo,
      title: args.title,
      body: args.body,
    });

    return {
      content: [
        {
          type: "text",
          text: `Issue 创建成功:${result.data.html_url}`,
        },
      ],
    };
  }

  if (name === "list_prs") {
    const result = await octokit.pulls.list({
      owner: args.owner,
      repo: args.repo,
      state: args.state,
    });

    const prs = result.data
      .map((pr) => `#${pr.number}: ${pr.title} (${pr.state})`)
      .join("\n");

    return {
      content: [
        {
          type: "text",
          text: prs,
        },
      ],
    };
  }
});

案例 2:数据库查询服务器

import sqlite3
from mcp.server import Server
from mcp.types import Tool, TextContent

app = Server("database-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="query_db",
            description="执行 SQL 查询",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SQL 查询语句"
                    }
                },
                "required": ["sql"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "query_db":
        conn = sqlite3.connect("data.db")
        cursor = conn.cursor()

        try:
            cursor.execute(arguments["sql"])
            results = cursor.fetchall()

            # 格式化结果
            output = "\n".join(str(row) for row in results)

            return [TextContent(type="text", text=output)]
        except Exception as e:
            return [TextContent(type="text", text=f"错误:{e}")]
        finally:
            conn.close()

案例 3:文件系统服务器

import fs from "fs/promises";
import path from "path";

const WORKSPACE = process.env.WORKSPACE_DIR || process.cwd();

server.setRequestHandler("tools/list", async () => {
  return {
    tools: [
      {
        name: "read_file",
        description: "读取文件内容",
        inputSchema: {
          type: "object",
          properties: {
            path: { type: "string" },
          },
          required: ["path"],
        },
      },
      {
        name: "write_file",
        description: "写入文件",
        inputSchema: {
          type: "object",
          properties: {
            path: { type: "string" },
            content: { type: "string" },
          },
          required: ["path", "content"],
        },
      },
      {
        name: "list_directory",
        description: "列出目录内容",
        inputSchema: {
          type: "object",
          properties: {
            path: { type: "string" },
          },
          required: ["path"],
        },
      },
    ],
  };
});

server.setRequestHandler("tools/call", async (request) => {
  const { name, arguments: args } = request.params;
  const fullPath = path.join(WORKSPACE, args.path);

  // 安全检查:防止路径遍历
  if (!fullPath.startsWith(WORKSPACE)) {
    throw new Error("Access denied: path outside workspace");
  }

  if (name === "read_file") {
    const content = await fs.readFile(fullPath, "utf-8");
    return {
      content: [{ type: "text", text: content }],
    };
  }

  if (name === "write_file") {
    await fs.writeFile(fullPath, args.content, "utf-8");
    return {
      content: [{ type: "text", text: "文件写入成功" }],
    };
  }

  if (name === "list_directory") {
    const files = await fs.readdir(fullPath);
    return {
      content: [{ type: "text", text: files.join("\n") }],
    };
  }
});

测试与调试

使用 MCP Inspector

npx @modelcontextprotocol/inspector node server.js

在浏览器中打开 http://localhost:5173,可以:

  • 查看服务器信息
  • 测试工具调用
  • 查看日志输出

单元测试

import { describe, it, expect } from "vitest";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";

describe("Weather Server", () => {
  it("should return weather data", async () => {
    const server = createWeatherServer();

    const response = await server.request({
      method: "tools/call",
      params: {
        name: "get_weather",
        arguments: { city: "北京" },
      },
    });

    expect(response.content[0].text).toContain("北京");
  });
});

日志记录

import { Logger } from "@modelcontextprotocol/sdk/shared/logger.js";

const logger = new Logger("my-server");

server.setRequestHandler("tools/call", async (request) => {
  logger.info(`Tool called: ${request.params.name}`);
  logger.debug(`Arguments: ${JSON.stringify(request.params.arguments)}`);

  try {
    // 工具逻辑
  } catch (error) {
    logger.error(`Error: ${error.message}`);
    throw error;
  }
});

部署

发布到 npm

{
  "name": "mcp-server-weather",
  "version": "1.0.0",
  "bin": {
    "mcp-server-weather": "./dist/index.js"
  },
  "files": ["dist"]
}

用户可以通过 npx 直接使用:

{
  "mcpServers": {
    "weather": {
      "command": "npx",
      "args": ["-y", "mcp-server-weather"]
    }
  }
}

发布到 PyPI

# setup.py
from setuptools import setup

setup(
    name="mcp-server-weather",
    version="1.0.0",
    py_modules=["server"],
    entry_points={
        "console_scripts": [
            "mcp-server-weather=server:main",
        ],
    },
)

用户可以通过 uvx 使用:

{
  "mcpServers": {
    "weather": {
      "command": "uvx",
      "args": ["mcp-server-weather"]
    }
  }
}

Docker 部署

FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --production
COPY . .
CMD ["node", "server.js"]

配置:

{
  "mcpServers": {
    "weather": {
      "command": "docker",
      "args": ["run", "-i", "mcp-server-weather"]
    }
  }
}

最佳实践

1. 清晰的工具描述

{
  name: "search_files",
  description: "在指定目录搜索文件。支持通配符模式(如 *.js)和递归搜索。",
  // 不好的描述:
  // description: "搜索文件"
}

2. 详细的参数说明

inputSchema: {
  type: "object",
  properties: {
    query: {
      type: "string",
      description: "搜索关键词。支持正则表达式,如 /pattern/i 表示不区分大小写。",
      // 不好的描述:
      // description: "查询"
    }
  }
}

3. 合理的错误处理

try {
  // 工具逻辑
} catch (error) {
  return {
    content: [
      {
        type: "text",
        text: `操作失败:${error.message}\n\n建议:检查文件路径是否正确`,
      },
    ],
    isError: true,
  };
}

4. 安全验证

// 路径遍历防护
if (!fullPath.startsWith(WORKSPACE)) {
  throw new Error("Access denied");
}

// 输入验证
if (!isValidEmail(args.email)) {
  throw new Error("Invalid email format");
}

// 速率限制
if (rateLimiter.isExceeded(clientId)) {
  throw new Error("Rate limit exceeded");
}

5. 性能优化

// 缓存结果
const cache = new Map();

server.setRequestHandler("tools/call", async (request) => {
  const cacheKey = JSON.stringify(request.params);

  if (cache.has(cacheKey)) {
    return cache.get(cacheKey);
  }

  const result = await expensiveOperation();
  cache.set(cacheKey, result);

  return result;
});

推荐开发顺序

建议按这个顺序推进:

  1. 先做一个只暴露单个工具的最小服务器
  2. 再补输入校验和错误处理
  3. 再补资源与提示模板
  4. 再补日志、调试与测试
  5. 最后再做部署、权限和版本治理

常见问题

工具能跑,但代理不会调用

优先检查:

  • 工具描述是否清晰
  • 输入 schema 是否合理
  • 返回内容是否足够让模型理解下一步
  • 客户端是否真的加载了该服务器

该先做 Tools 还是 Resources

如果你的能力更像“执行动作”,先做 Tools;如果主要是“提供内容”,先做 Resources。

为什么服务器越写越重

通常说明把业务逻辑、适配层和协议层混在了一起。更稳的做法是把核心业务单独抽出来,MCP 只负责暴露接口。

延伸阅读

参考链接

阅读建议
  • - 先读标题和摘要,再结合目录决定从哪个章节开始精读。
  • - 看到具体命令、配置或步骤时,尽量在自己的环境里同步验证。
  • - 如果你只是快速查资料,可先看目录和相关文档,再决定是否深入全文。
适合谁看
  • - 希望把零散经验整理成长期可复用工作流的人
  • - 正在使用 AI 工具、Agent 或自动化工作流的人
  • - 希望阅读时顺手建立自己的操作清单或收藏体系的人
执行前检查
  • - 先浏览标题、摘要和目录,带着问题阅读会更高效
  • - 确认模型供应商、API Key、CLI 工具链与本地资源是否已准备好
  • - 如果页面里提到相关文档,尽量一起打开对照,效果通常更完整
同类内容
← 上一篇CSS 实用技巧