AI 工具 | | 约 35 分钟 | 13,777 字

开发数据库 MCP Server:SQL 查询助手

从零开发一个连接 PostgreSQL/MySQL 的 MCP Server

项目目标

我们要开发一个数据库 MCP Server,让 AI 能够:

  • 列出数据库中的所有表
  • 查看表结构(列名、类型、约束)
  • 执行 SQL 查询(只读)
  • 获取表的数据统计信息

最终效果:在 Claude Code 中直接用自然语言查询数据库。

用户:帮我查一下最近 7 天注册的用户数量
AI:我来查询数据库...
    [调用 query 工具]
    最近 7 天共注册了 142 位用户。

项目初始化

创建项目

mkdir mcp-database-server
cd mcp-database-server
npm init -y

安装依赖

# MCP SDK
npm install @modelcontextprotocol/sdk

# 数据库驱动
npm install pg           # PostgreSQL
npm install better-sqlite3  # SQLite(用于本地测试)

# 开发依赖
npm install -D typescript @types/node @types/pg
npm install -D @types/better-sqlite3

TypeScript 配置

// tsconfig.json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "declaration": true
  },
  "include": ["src/**/*"]
}

package.json 配置

{
  "name": "mcp-database-server",
  "version": "1.0.0",
  "type": "module",
  "bin": {
    "mcp-database-server": "./dist/index.js"
  },
  "scripts": {
    "build": "tsc",
    "dev": "tsc --watch",
    "start": "node dist/index.js"
  }
}

项目结构

src/
├── index.ts          # 入口文件,启动 Server
├── server.ts         # MCP Server 定义
├── database.ts       # 数据库连接和查询
├── tools.ts          # 工具定义
└── types.ts          # 类型定义

实现数据库连接层

先实现数据库操作的封装:

// src/database.ts
import pg from "pg";

export interface TableInfo {
  name: string;
  schema: string;
  rowCount: number;
}

export interface ColumnInfo {
  name: string;
  type: string;
  nullable: boolean;
  defaultValue: string | null;
  isPrimaryKey: boolean;
}

export interface QueryResult {
  columns: string[];
  rows: Record<string, unknown>[];
  rowCount: number;
}

export class DatabaseConnection {
  private pool: pg.Pool;

  constructor(connectionString: string) {
    this.pool = new pg.Pool({
      connectionString,
      max: 5,
      idleTimeoutMillis: 30000,
    });
  }

  async listTables(): Promise<TableInfo[]> {
    const result = await this.pool.query(`
      SELECT
        t.table_name as name,
        t.table_schema as schema,
        COALESCE(s.n_live_tup, 0) as row_count
      FROM information_schema.tables t
      LEFT JOIN pg_stat_user_tables s
        ON t.table_name = s.relname
      WHERE t.table_schema = 'public'
        AND t.table_type = 'BASE TABLE'
      ORDER BY t.table_name
    `);

    return result.rows.map((row) => ({
      name: row.name,
      schema: row.schema,
      rowCount: Number(row.row_count),
    }));
  }

  async describeTable(tableName: string): Promise<ColumnInfo[]> {
    // 防止 SQL 注入:验证表名
    if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(tableName)) {
      throw new Error(`Invalid table name: ${tableName}`);
    }

    const result = await this.pool.query(
      `
      SELECT
        c.column_name as name,
        c.data_type as type,
        c.is_nullable = 'YES' as nullable,
        c.column_default as default_value,
        COALESCE(
          (SELECT true FROM information_schema.key_column_usage k
           JOIN information_schema.table_constraints tc
             ON k.constraint_name = tc.constraint_name
           WHERE k.column_name = c.column_name
             AND k.table_name = c.table_name
             AND tc.constraint_type = 'PRIMARY KEY'
           LIMIT 1),
          false
        ) as is_primary_key
      FROM information_schema.columns c
      WHERE c.table_name = $1
        AND c.table_schema = 'public'
      ORDER BY c.ordinal_position
    `,
      [tableName]
    );

    return result.rows.map((row) => ({
      name: row.name,
      type: row.type,
      nullable: row.nullable,
      defaultValue: row.default_value,
      isPrimaryKey: row.is_primary_key,
    }));
  }

  async query(sql: string, limit: number = 100): Promise<QueryResult> {
    // 安全检查:只允许 SELECT 语句
    const trimmed = sql.trim().toUpperCase();
    if (!trimmed.startsWith("SELECT") && !trimmed.startsWith("WITH")) {
      throw new Error("Only SELECT and WITH (CTE) queries are allowed");
    }

    // 禁止危险操作
    const forbidden = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "CREATE"];
    for (const keyword of forbidden) {
      if (trimmed.includes(keyword)) {
        throw new Error(`Forbidden keyword detected: ${keyword}`);
      }
    }

    // 添加 LIMIT 限制
    const limitedSql = sql.replace(/;?\s*$/, "") + ` LIMIT ${limit}`;

    const result = await this.pool.query(limitedSql);

    return {
      columns: result.fields.map((f) => f.name),
      rows: result.rows,
      rowCount: result.rowCount ?? 0,
    };
  }

  async getTableStats(tableName: string): Promise<Record<string, unknown>> {
    if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(tableName)) {
      throw new Error(`Invalid table name: ${tableName}`);
    }

    const countResult = await this.pool.query(
      `SELECT COUNT(*) as total FROM "${tableName}"`
    );

    const sizeResult = await this.pool.query(
      `SELECT pg_size_pretty(pg_total_relation_size($1)) as size`,
      [tableName]
    );

    return {
      tableName,
      totalRows: Number(countResult.rows[0].total),
      totalSize: sizeResult.rows[0].size,
    };
  }

  async close(): Promise<void> {
    await this.pool.end();
  }
}

实现 MCP Server

工具定义

// src/tools.ts
import { z } from "zod";

export const toolSchemas = {
  listTables: {
    name: "list_tables",
    description: "列出数据库中所有的表,包含表名和大致行数",
    schema: {},
  },

  describeTable: {
    name: "describe_table",
    description: "查看指定表的结构,包含列名、数据类型、是否可空、默认值、是否主键",
    schema: {
      table_name: z.string().describe("要查看的表名"),
    },
  },

  query: {
    name: "query",
    description:
      "执行只读 SQL 查询(仅支持 SELECT 语句),返回查询结果",
    schema: {
      sql: z.string().describe("要执行的 SQL 查询语句(仅支持 SELECT)"),
      limit: z
        .number()
        .optional()
        .default(100)
        .describe("返回结果的最大行数,默认 100"),
    },
  },

  tableStats: {
    name: "table_stats",
    description: "获取指定表的统计信息,包含总行数和占用空间",
    schema: {
      table_name: z.string().describe("要查看统计信息的表名"),
    },
  },
};

Server 主体

// src/server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { DatabaseConnection } from "./database.js";
import { toolSchemas } from "./tools.js";

export function createServer(db: DatabaseConnection): McpServer {
  const server = new McpServer({
    name: "mcp-database-server",
    version: "1.0.0",
  });

  // 工具:列出所有表
  server.tool(
    toolSchemas.listTables.name,
    toolSchemas.listTables.description,
    toolSchemas.listTables.schema,
    async () => {
      try {
        const tables = await db.listTables();

        if (tables.length === 0) {
          return {
            content: [{ type: "text", text: "数据库中没有找到任何表。" }],
          };
        }

        const tableList = tables
          .map((t) => `- ${t.name} (约 ${t.rowCount} 行)`)
          .join("\n");

        return {
          content: [
            {
              type: "text",
              text: `数据库中共有 ${tables.length} 张表:\n\n${tableList}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [
            { type: "text", text: `查询失败:${(error as Error).message}` },
          ],
          isError: true,
        };
      }
    }
  );

  // 工具:查看表结构
  server.tool(
    toolSchemas.describeTable.name,
    toolSchemas.describeTable.description,
    toolSchemas.describeTable.schema,
    async ({ table_name }) => {
      try {
        const columns = await db.describeTable(table_name);

        if (columns.length === 0) {
          return {
            content: [{ type: "text", text: `表 ${table_name} 不存在或没有列。` }],
          };
        }

        const header = "| 列名 | 类型 | 可空 | 默认值 | 主键 |";
        const separator = "|------|------|------|--------|------|";
        const rows = columns
          .map(
            (c) =>
              `| ${c.name} | ${c.type} | ${c.nullable ? "是" : "否"} | ${c.defaultValue ?? "-"} | ${c.isPrimaryKey ? "是" : "-"} |`
          )
          .join("\n");

        return {
          content: [
            {
              type: "text",
              text: `表 ${table_name} 的结构:\n\n${header}\n${separator}\n${rows}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [
            { type: "text", text: `查询失败:${(error as Error).message}` },
          ],
          isError: true,
        };
      }
    }
  );

  // 工具:执行 SQL 查询
  server.tool(
    toolSchemas.query.name,
    toolSchemas.query.description,
    toolSchemas.query.schema,
    async ({ sql, limit }) => {
      try {
        const result = await db.query(sql, limit);

        if (result.rows.length === 0) {
          return {
            content: [{ type: "text", text: "查询没有返回任何结果。" }],
          };
        }

        // 格式化为 Markdown 表格
        const header = `| ${result.columns.join(" | ")} |`;
        const separator = `| ${result.columns.map(() => "---").join(" | ")} |`;
        const rows = result.rows
          .map(
            (row) =>
              `| ${result.columns.map((col) => String(row[col] ?? "NULL")).join(" | ")} |`
          )
          .join("\n");

        return {
          content: [
            {
              type: "text",
              text: `查询返回 ${result.rowCount} 条结果:\n\n${header}\n${separator}\n${rows}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [
            { type: "text", text: `查询失败:${(error as Error).message}` },
          ],
          isError: true,
        };
      }
    }
  );

  // 工具:表统计信息
  server.tool(
    toolSchemas.tableStats.name,
    toolSchemas.tableStats.description,
    toolSchemas.tableStats.schema,
    async ({ table_name }) => {
      try {
        const stats = await db.getTableStats(table_name);

        return {
          content: [
            {
              type: "text",
              text: `表 ${table_name} 的统计信息:\n- 总行数:${stats.totalRows}\n- 占用空间:${stats.totalSize}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [
            { type: "text", text: `查询失败:${(error as Error).message}` },
          ],
          isError: true,
        };
      }
    }
  );

  return server;
}

入口文件

// src/index.ts
#!/usr/bin/env node

import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { DatabaseConnection } from "./database.js";
import { createServer } from "./server.js";

async function main() {
  // 从命令行参数或环境变量获取连接字符串
  const connectionString =
    process.argv[2] || process.env.DATABASE_URL;

  if (!connectionString) {
    console.error(
      "Usage: mcp-database-server <connection-string>"
    );
    console.error(
      "  or set DATABASE_URL environment variable"
    );
    process.exit(1);
  }

  // 创建数据库连接
  const db = new DatabaseConnection(connectionString);

  // 创建 MCP Server
  const server = createServer(db);

  // 使用 stdio 传输
  const transport = new StdioServerTransport();
  await server.connect(transport);

  // 优雅退出
  process.on("SIGINT", async () => {
    await db.close();
    process.exit(0);
  });
}

main().catch((error) => {
  console.error("Server failed to start:", error);
  process.exit(1);
});

SQL 安全处理

数据库 MCP Server 的安全性至关重要。以下是我们采取的安全措施:

1. 只读限制

// 只允许 SELECT 和 WITH 语句
const trimmed = sql.trim().toUpperCase();
if (!trimmed.startsWith("SELECT") && !trimmed.startsWith("WITH")) {
  throw new Error("Only SELECT queries are allowed");
}

2. 关键词过滤

const forbidden = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE"];
for (const keyword of forbidden) {
  if (trimmed.includes(keyword)) {
    throw new Error(`Forbidden keyword: ${keyword}`);
  }
}

3. 结果限制

// 强制添加 LIMIT,防止返回过多数据
const limitedSql = sql + ` LIMIT ${limit}`;

4. 表名验证

// 防止 SQL 注入
if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(tableName)) {
  throw new Error(`Invalid table name: ${tableName}`);
}

5. 数据库用户权限

最重要的一层防护:使用只读数据库用户。

-- 创建只读用户
CREATE USER mcp_readonly WITH PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE mydb TO mcp_readonly;
GRANT USAGE ON SCHEMA public TO mcp_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO mcp_readonly;

配置和使用

在 Claude Code 中配置

{
  "mcpServers": {
    "database": {
      "command": "node",
      "args": [
        "/path/to/mcp-database-server/dist/index.js",
        "postgresql://mcp_readonly:password@localhost:5432/mydb"
      ]
    }
  }
}

使用环境变量

{
  "mcpServers": {
    "database": {
      "command": "node",
      "args": ["/path/to/mcp-database-server/dist/index.js"],
      "env": {
        "DATABASE_URL": "postgresql://mcp_readonly:password@localhost:5432/mydb"
      }
    }
  }
}

测试

使用 MCP Inspector 测试

# 构建项目
npm run build

# 使用 Inspector 测试
npx @modelcontextprotocol/inspector node dist/index.js \
  "postgresql://user:pass@localhost:5432/testdb"

在 Inspector 界面中可以:

  1. 查看注册的 4 个工具
  2. 手动调用每个工具
  3. 验证返回结果格式

手动测试

# 直接运行 Server,通过 stdin 发送请求
echo '{"jsonrpc":"2.0","id":1,"method":"tools/list"}' | \
  node dist/index.js "postgresql://user:pass@localhost:5432/testdb"

扩展思路

这个基础版本可以继续扩展:

  • 查询缓存:对相同查询结果进行缓存,减少数据库压力
  • 查询历史:记录执行过的查询,方便回溯
  • 多数据库支持:同时连接多个数据库
  • 数据可视化:返回图表友好的数据格式
  • 慢查询告警:对执行时间过长的查询给出提示

总结

通过这个项目,我们完成了一个实用的数据库 MCP Server。核心要点:

  • 使用 @modelcontextprotocol/sdk 简化 Server 开发
  • 工具定义要清晰,description 决定了 AI 何时使用这个工具
  • 安全性是数据库 Server 的重中之重,多层防护不可少
  • 返回格式化的结果(Markdown 表格)让 AI 更容易理解和展示

让 AI 理解你的数据,从一个好的 MCP Server 开始。安全第一,功能第二。

评论

加载中...

相关文章

分享:

评论

加载中...