AI 工具 | | 约 39 分钟 | 15,475 字

开发 API 集成 MCP Server:连接第三方服务

开发连接 REST API 的 MCP Server,以 GitHub API 为例

项目目标

我们要开发一个 GitHub MCP Server,让 AI 能够:

  • 搜索仓库和代码
  • 查看 Issue 和 PR 列表
  • 创建 Issue
  • 获取仓库信息和文件内容

这个项目的重点不仅是 GitHub 集成本身,更是展示如何将任意 REST API 封装为 MCP Server 的通用模式。


项目初始化

mkdir mcp-github-server
cd mcp-github-server
npm init -y
npm install @modelcontextprotocol/sdk zod
npm install -D typescript @types/node
// tsconfig.json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true
  },
  "include": ["src/**/*"]
}

项目结构:

src/
├── index.ts          # 入口
├── server.ts         # MCP Server 定义
├── github-client.ts  # GitHub API 封装
└── rate-limiter.ts   # 速率限制

GitHub API 客户端

先封装一个 GitHub API 客户端,处理认证、请求和错误:

// src/github-client.ts

interface GitHubRequestOptions {
  method?: string;
  body?: unknown;
  params?: Record<string, string | number>;
}

export interface Repository {
  full_name: string;
  description: string | null;
  html_url: string;
  stargazers_count: number;
  language: string | null;
  updated_at: string;
}

export interface Issue {
  number: number;
  title: string;
  state: string;
  user: { login: string };
  created_at: string;
  labels: { name: string }[];
  html_url: string;
}

export interface SearchResult<T> {
  total_count: number;
  items: T[];
}

export class GitHubClient {
  private baseUrl = "https://api.github.com";
  private token: string;
  private remainingRequests = 5000;
  private resetTime = 0;

  constructor(token: string) {
    this.token = token;
  }

  private async request<T>(
    path: string,
    options: GitHubRequestOptions = {}
  ): Promise<T> {
    const { method = "GET", body, params } = options;

    // 构建 URL
    let url = `${this.baseUrl}${path}`;
    if (params) {
      const searchParams = new URLSearchParams();
      for (const [key, value] of Object.entries(params)) {
        searchParams.set(key, String(value));
      }
      url += `?${searchParams.toString()}`;
    }

    // 检查速率限制
    if (this.remainingRequests <= 0) {
      const waitTime = this.resetTime - Date.now();
      if (waitTime > 0) {
        throw new Error(
          `GitHub API rate limit exceeded. Reset in ${Math.ceil(waitTime / 1000)} seconds.`
        );
      }
    }

    const response = await fetch(url, {
      method,
      headers: {
        Authorization: `Bearer ${this.token}`,
        Accept: "application/vnd.github.v3+json",
        "Content-Type": "application/json",
        "User-Agent": "mcp-github-server/1.0",
      },
      body: body ? JSON.stringify(body) : undefined,
    });

    // 更新速率限制信息
    this.remainingRequests = Number(
      response.headers.get("x-ratelimit-remaining") ?? 5000
    );
    this.resetTime =
      Number(response.headers.get("x-ratelimit-reset") ?? 0) * 1000;

    if (!response.ok) {
      const error = await response.json().catch(() => ({}));
      throw new Error(
        `GitHub API error ${response.status}: ${(error as any).message || response.statusText}`
      );
    }

    return response.json() as Promise<T>;
  }

  // 搜索仓库
  async searchRepositories(
    query: string,
    page = 1,
    perPage = 10
  ): Promise<SearchResult<Repository>> {
    return this.request("/search/repositories", {
      params: { q: query, page, per_page: perPage, sort: "stars" },
    });
  }

  // 搜索代码
  async searchCode(
    query: string,
    page = 1,
    perPage = 10
  ): Promise<SearchResult<{ path: string; repository: { full_name: string }; html_url: string }>> {
    return this.request("/search/code", {
      params: { q: query, page, per_page: perPage },
    });
  }

  // 获取仓库信息
  async getRepository(owner: string, repo: string): Promise<Repository> {
    return this.request(`/repos/${owner}/${repo}`);
  }

  // 列出 Issue
  async listIssues(
    owner: string,
    repo: string,
    state: "open" | "closed" | "all" = "open",
    page = 1,
    perPage = 20
  ): Promise<Issue[]> {
    return this.request(`/repos/${owner}/${repo}/issues`, {
      params: { state, page, per_page: perPage, sort: "updated" },
    });
  }

  // 创建 Issue
  async createIssue(
    owner: string,
    repo: string,
    title: string,
    body?: string,
    labels?: string[]
  ): Promise<Issue> {
    return this.request(`/repos/${owner}/${repo}/issues`, {
      method: "POST",
      body: { title, body, labels },
    });
  }

  // 获取文件内容
  async getFileContent(
    owner: string,
    repo: string,
    path: string
  ): Promise<string> {
    const result = await this.request<{ content: string; encoding: string }>(
      `/repos/${owner}/${repo}/contents/${path}`
    );

    if (result.encoding === "base64") {
      return Buffer.from(result.content, "base64").toString("utf-8");
    }
    return result.content;
  }

  // 获取速率限制状态
  getRateLimitStatus(): { remaining: number; resetAt: Date } {
    return {
      remaining: this.remainingRequests,
      resetAt: new Date(this.resetTime),
    };
  }
}

速率限制处理

GitHub API 有严格的速率限制,我们需要优雅地处理:

// src/rate-limiter.ts

export class RateLimiter {
  private requests: number[] = [];
  private maxRequests: number;
  private windowMs: number;

  constructor(maxRequests: number, windowMs: number) {
    this.maxRequests = maxRequests;
    this.windowMs = windowMs;
  }

  async acquire(): Promise<void> {
    const now = Date.now();

    // 清理过期的请求记录
    this.requests = this.requests.filter(
      (time) => now - time < this.windowMs
    );

    if (this.requests.length >= this.maxRequests) {
      const oldestRequest = this.requests[0];
      const waitTime = this.windowMs - (now - oldestRequest);

      if (waitTime > 0) {
        await new Promise((resolve) => setTimeout(resolve, waitTime));
      }
    }

    this.requests.push(Date.now());
  }
}

MCP Server 实现

// src/server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
import { GitHubClient } from "./github-client.js";

export function createServer(github: GitHubClient): McpServer {
  const server = new McpServer({
    name: "mcp-github-server",
    version: "1.0.0",
  });

  // 工具:搜索仓库
  server.tool(
    "search_repositories",
    "在 GitHub 上搜索仓库,返回仓库名、描述、Star 数等信息",
    {
      query: z.string().describe("搜索关键词,支持 GitHub 搜索语法"),
      page: z.number().optional().default(1).describe("页码"),
      per_page: z.number().optional().default(10).describe("每页数量,最大 30"),
    },
    async ({ query, page, per_page }) => {
      try {
        const result = await github.searchRepositories(query, page, per_page);

        if (result.items.length === 0) {
          return {
            content: [{ type: "text", text: `没有找到匹配 "${query}" 的仓库。` }],
          };
        }

        const repos = result.items
          .map(
            (repo) =>
              `### ${repo.full_name}\n` +
              `- 描述:${repo.description || "无"}\n` +
              `- Star:${repo.stargazers_count}\n` +
              `- 语言:${repo.language || "未知"}\n` +
              `- 链接:${repo.html_url}`
          )
          .join("\n\n");

        return {
          content: [
            {
              type: "text",
              text: `找到 ${result.total_count} 个仓库(显示第 ${page} 页):\n\n${repos}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `搜索失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:搜索代码
  server.tool(
    "search_code",
    "在 GitHub 上搜索代码片段,返回文件路径和所属仓库",
    {
      query: z.string().describe("搜索关键词,支持 GitHub 代码搜索语法"),
      page: z.number().optional().default(1).describe("页码"),
    },
    async ({ query, page }) => {
      try {
        const result = await github.searchCode(query, page);

        if (result.items.length === 0) {
          return {
            content: [{ type: "text", text: `没有找到匹配的代码。` }],
          };
        }

        const items = result.items
          .map(
            (item) =>
              `- \`${item.repository.full_name}\`\`${item.path}\`\n  ${item.html_url}`
          )
          .join("\n");

        return {
          content: [
            {
              type: "text",
              text: `找到 ${result.total_count} 个匹配(显示第 ${page} 页):\n\n${items}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `搜索失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:列出 Issue
  server.tool(
    "list_issues",
    "列出指定仓库的 Issue,支持按状态筛选",
    {
      owner: z.string().describe("仓库所有者"),
      repo: z.string().describe("仓库名称"),
      state: z
        .enum(["open", "closed", "all"])
        .optional()
        .default("open")
        .describe("Issue 状态"),
      page: z.number().optional().default(1).describe("页码"),
    },
    async ({ owner, repo, state, page }) => {
      try {
        const issues = await github.listIssues(owner, repo, state, page);

        if (issues.length === 0) {
          return {
            content: [
              { type: "text", text: `${owner}/${repo} 没有 ${state} 状态的 Issue。` },
            ],
          };
        }

        const header = "| # | 标题 | 状态 | 作者 | 标签 |";
        const separator = "|---|------|------|------|------|";
        const rows = issues
          .map(
            (issue) =>
              `| #${issue.number} | ${issue.title} | ${issue.state} | @${issue.user.login} | ${issue.labels.map((l) => l.name).join(", ") || "-"} |`
          )
          .join("\n");

        return {
          content: [
            {
              type: "text",
              text: `${owner}/${repo} 的 Issue 列表:\n\n${header}\n${separator}\n${rows}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `查询失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:创建 Issue
  server.tool(
    "create_issue",
    "在指定仓库创建新的 Issue",
    {
      owner: z.string().describe("仓库所有者"),
      repo: z.string().describe("仓库名称"),
      title: z.string().describe("Issue 标题"),
      body: z.string().optional().describe("Issue 内容(支持 Markdown)"),
      labels: z.array(z.string()).optional().describe("标签列表"),
    },
    async ({ owner, repo, title, body, labels }) => {
      try {
        const issue = await github.createIssue(owner, repo, title, body, labels);

        return {
          content: [
            {
              type: "text",
              text: `Issue 创建成功!\n\n- 编号:#${issue.number}\n- 标题:${issue.title}\n- 链接:${issue.html_url}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `创建失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:获取文件内容
  server.tool(
    "get_file_content",
    "获取 GitHub 仓库中指定文件的内容",
    {
      owner: z.string().describe("仓库所有者"),
      repo: z.string().describe("仓库名称"),
      path: z.string().describe("文件路径,如 src/index.ts"),
    },
    async ({ owner, repo, path }) => {
      try {
        const content = await github.getFileContent(owner, repo, path);

        return {
          content: [
            {
              type: "text",
              text: `文件 \`${owner}/${repo}/${path}\` 的内容:\n\n\`\`\`\n${content}\n\`\`\``,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `获取失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:查看速率限制
  server.tool(
    "rate_limit_status",
    "查看当前 GitHub API 的速率限制状态",
    {},
    async () => {
      const status = github.getRateLimitStatus();
      return {
        content: [
          {
            type: "text",
            text: `GitHub API 速率限制:\n- 剩余请求数:${status.remaining}\n- 重置时间:${status.resetAt.toLocaleString()}`,
          },
        ],
      };
    }
  );

  return server;
}

入口文件

// src/index.ts
#!/usr/bin/env node

import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { GitHubClient } from "./github-client.js";
import { createServer } from "./server.js";

async function main() {
  const token = process.env.GITHUB_TOKEN;

  if (!token) {
    console.error("Error: GITHUB_TOKEN environment variable is required");
    console.error("Create a token at: https://github.com/settings/tokens");
    process.exit(1);
  }

  const github = new GitHubClient(token);
  const server = createServer(github);

  const transport = new StdioServerTransport();
  await server.connect(transport);
}

main().catch((error) => {
  console.error("Server failed to start:", error);
  process.exit(1);
});

认证处理

创建 GitHub Token

  1. 访问 github.com/settings/tokens
  2. 点击 “Generate new token (classic)”
  3. 选择需要的权限:
    • repo:访问仓库(如果需要访问私有仓库)
    • public_repo:只访问公开仓库
  4. 生成并保存 Token

配置到 Claude Code

{
  "mcpServers": {
    "github": {
      "command": "node",
      "args": ["/path/to/mcp-github-server/dist/index.js"],
      "env": {
        "GITHUB_TOKEN": "ghp_xxxxxxxxxxxxxxxxxxxx"
      }
    }
  }
}

错误处理最佳实践

1. 分类处理错误

function handleGitHubError(error: unknown): string {
  if (error instanceof Error) {
    const message = error.message;

    if (message.includes("404")) {
      return "资源不存在,请检查仓库名或文件路径是否正确";
    }
    if (message.includes("401")) {
      return "认证失败,请检查 GitHub Token 是否有效";
    }
    if (message.includes("403")) {
      return "权限不足或速率限制,请稍后重试";
    }
    if (message.includes("422")) {
      return "请求参数无效,请检查输入";
    }

    return message;
  }
  return "未知错误";
}

2. 重试机制

async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  delay = 1000
): Promise<T> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt === maxRetries) throw error;

      const message = (error as Error).message;
      // 只对可重试的错误进行重试
      if (message.includes("rate limit") || message.includes("5")) {
        await new Promise((r) => setTimeout(r, delay * attempt));
        continue;
      }
      throw error;
    }
  }
  throw new Error("Unreachable");
}

分页处理

GitHub API 的分页是一个常见问题。我们的工具通过 page 参数支持分页,但也可以实现自动分页:

async function fetchAllPages<T>(
  fetchPage: (page: number) => Promise<T[]>,
  maxPages = 5
): Promise<T[]> {
  const allItems: T[] = [];

  for (let page = 1; page <= maxPages; page++) {
    const items = await fetchPage(page);
    allItems.push(...items);

    // 如果返回的数量少于每页数量,说明已经是最后一页
    if (items.length < 20) break;
  }

  return allItems;
}

通用模式:封装任意 REST API

这个 GitHub Server 展示了一个通用的模式,可以应用到任何 REST API:

步骤

  1. 封装 API 客户端:处理认证、请求、错误
  2. 定义工具:每个 API 端点对应一个工具
  3. 处理分页:支持分页参数
  4. 速率限制:尊重 API 的速率限制
  5. 格式化输出:将 API 响应转换为 AI 友好的格式

适用于

  • Jira API → 项目管理 MCP Server
  • Slack API → 团队通信 MCP Server
  • Stripe API → 支付管理 MCP Server
  • 任何有 REST API 的服务

总结

通过这个 GitHub MCP Server 项目,我们学到了:

  • 如何封装 REST API 为 MCP 工具
  • 认证信息通过环境变量安全传递
  • 速率限制和错误处理的重要性
  • 分页数据的处理方式
  • 工具描述要清晰准确,这直接影响 AI 的使用效果

这个模式可以复用到任何 REST API 的集成中,只需要替换 API 客户端的实现即可。

好的 MCP Server 不只是 API 的简单包装,而是让 AI 能够自然地使用这些能力。工具描述写得好,AI 用得好。

评论

加载中...

相关文章

分享:

评论

加载中...