AI 工具 | | 约 46 分钟 | 18,341 字

开发文件处理 MCP Server:文档解析与转换

开发处理 PDF、Excel、CSV 等文件格式的 MCP Server

项目目标

我们要开发一个文件处理 MCP Server,让 AI 能够:

  • 解析 PDF 文件,提取文本内容
  • 读取 Excel 文件,获取表格数据
  • 处理 CSV 文件,支持查询和统计
  • 转换文件格式(CSV ↔ JSON)

这在日常工作中非常实用——你可以让 AI 直接分析报表、提取合同信息、处理数据文件。


项目初始化

mkdir mcp-file-server
cd mcp-file-server
npm init -y
npm install @modelcontextprotocol/sdk zod
npm install pdf-parse xlsx csv-parse csv-stringify
npm install -D typescript @types/node

项目结构:

src/
├── index.ts           # 入口
├── server.ts          # MCP Server 定义
├── parsers/
│   ├── pdf-parser.ts  # PDF 解析
│   ├── excel-parser.ts # Excel 解析
│   └── csv-parser.ts  # CSV 解析
└── utils/
    └── file-utils.ts  # 文件工具函数

文件工具函数

// src/utils/file-utils.ts
import { readFile, stat, access } from "fs/promises";
import { extname, resolve } from "path";

// 允许访问的目录(安全限制)
let allowedDirs: string[] = [];

export function setAllowedDirs(dirs: string[]): void {
  allowedDirs = dirs.map((d) => resolve(d));
}

export function validatePath(filePath: string): string {
  const resolved = resolve(filePath);

  if (allowedDirs.length > 0) {
    const isAllowed = allowedDirs.some((dir) => resolved.startsWith(dir));
    if (!isAllowed) {
      throw new Error(
        `Access denied: ${filePath} is outside allowed directories`
      );
    }
  }

  return resolved;
}

export async function fileExists(filePath: string): Promise<boolean> {
  try {
    await access(filePath);
    return true;
  } catch {
    return false;
  }
}

export async function getFileInfo(filePath: string) {
  const resolved = validatePath(filePath);
  const stats = await stat(resolved);
  const ext = extname(resolved).toLowerCase();

  return {
    path: resolved,
    size: stats.size,
    extension: ext,
    modifiedAt: stats.mtime,
    sizeFormatted: formatFileSize(stats.size),
  };
}

export async function readFileBuffer(filePath: string): Promise<Buffer> {
  const resolved = validatePath(filePath);
  return readFile(resolved);
}

export async function readFileText(filePath: string): Promise<string> {
  const resolved = validatePath(filePath);
  return readFile(resolved, "utf-8");
}

function formatFileSize(bytes: number): string {
  if (bytes < 1024) return `${bytes} B`;
  if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
  return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
}

PDF 解析器

// src/parsers/pdf-parser.ts
import pdfParse from "pdf-parse";
import { readFileBuffer } from "../utils/file-utils.js";

export interface PdfResult {
  text: string;
  pageCount: number;
  info: {
    title?: string;
    author?: string;
    creator?: string;
  };
}

export async function parsePdf(
  filePath: string,
  options?: { maxPages?: number; pageRange?: string }
): Promise<PdfResult> {
  const buffer = await readFileBuffer(filePath);

  const parseOptions: pdfParse.Options = {};

  if (options?.maxPages) {
    parseOptions.max = options.maxPages;
  }

  const data = await pdfParse(buffer, parseOptions);

  let text = data.text;

  // 如果指定了页面范围,尝试按页分割
  if (options?.pageRange) {
    const pages = text.split(/\f/); // PDF 页面分隔符
    const range = parsePageRange(options.pageRange, pages.length);
    text = range.map((i) => pages[i] || "").join("\n\n---\n\n");
  }

  return {
    text: text.trim(),
    pageCount: data.numpages,
    info: {
      title: data.info?.Title,
      author: data.info?.Author,
      creator: data.info?.Creator,
    },
  };
}

function parsePageRange(range: string, totalPages: number): number[] {
  const pages: number[] = [];

  for (const part of range.split(",")) {
    const trimmed = part.trim();
    if (trimmed.includes("-")) {
      const [start, end] = trimmed.split("-").map(Number);
      for (let i = start - 1; i < Math.min(end, totalPages); i++) {
        pages.push(i);
      }
    } else {
      const page = Number(trimmed) - 1;
      if (page >= 0 && page < totalPages) {
        pages.push(page);
      }
    }
  }

  return pages;
}

Excel 解析器

// src/parsers/excel-parser.ts
import * as XLSX from "xlsx";
import { readFileBuffer } from "../utils/file-utils.js";

export interface ExcelSheet {
  name: string;
  rowCount: number;
  columnCount: number;
  headers: string[];
}

export interface ExcelData {
  headers: string[];
  rows: Record<string, unknown>[];
  totalRows: number;
}

export async function listSheets(filePath: string): Promise<ExcelSheet[]> {
  const buffer = await readFileBuffer(filePath);
  const workbook = XLSX.read(buffer, { type: "buffer" });

  return workbook.SheetNames.map((name) => {
    const sheet = workbook.Sheets[name];
    const range = XLSX.utils.decode_range(sheet["!ref"] || "A1");

    return {
      name,
      rowCount: range.e.r - range.s.r + 1,
      columnCount: range.e.c - range.s.c + 1,
      headers: getHeaders(sheet, range),
    };
  });
}

export async function readSheet(
  filePath: string,
  sheetName?: string,
  options?: { startRow?: number; maxRows?: number }
): Promise<ExcelData> {
  const buffer = await readFileBuffer(filePath);
  const workbook = XLSX.read(buffer, { type: "buffer" });

  const targetSheet = sheetName || workbook.SheetNames[0];
  const sheet = workbook.Sheets[targetSheet];

  if (!sheet) {
    throw new Error(`Sheet "${targetSheet}" not found`);
  }

  const jsonData = XLSX.utils.sheet_to_json<Record<string, unknown>>(sheet);
  const startRow = options?.startRow || 0;
  const maxRows = options?.maxRows || 100;

  const sliced = jsonData.slice(startRow, startRow + maxRows);
  const headers =
    sliced.length > 0 ? Object.keys(sliced[0]) : [];

  return {
    headers,
    rows: sliced,
    totalRows: jsonData.length,
  };
}

function getHeaders(
  sheet: XLSX.WorkSheet,
  range: XLSX.Range
): string[] {
  const headers: string[] = [];
  for (let c = range.s.c; c <= range.e.c; c++) {
    const cell = sheet[XLSX.utils.encode_cell({ r: range.s.r, c })];
    headers.push(cell ? String(cell.v) : `Column${c + 1}`);
  }
  return headers;
}

CSV 解析器

// src/parsers/csv-parser.ts
import { parse } from "csv-parse/sync";
import { stringify } from "csv-stringify/sync";
import { readFileText } from "../utils/file-utils.js";

export interface CsvData {
  headers: string[];
  rows: Record<string, string>[];
  totalRows: number;
}

export async function parseCsv(
  filePath: string,
  options?: {
    delimiter?: string;
    maxRows?: number;
    encoding?: BufferEncoding;
  }
): Promise<CsvData> {
  const content = await readFileText(filePath);

  const records = parse(content, {
    columns: true,
    skip_empty_lines: true,
    delimiter: options?.delimiter || ",",
    trim: true,
  }) as Record<string, string>[];

  const maxRows = options?.maxRows || 100;
  const sliced = records.slice(0, maxRows);
  const headers = sliced.length > 0 ? Object.keys(sliced[0]) : [];

  return {
    headers,
    rows: sliced,
    totalRows: records.length,
  };
}

export function csvToJson(
  rows: Record<string, string>[]
): string {
  return JSON.stringify(rows, null, 2);
}

export function jsonToCsv(
  data: Record<string, unknown>[]
): string {
  if (data.length === 0) return "";

  const headers = Object.keys(data[0]);
  return stringify(data, {
    header: true,
    columns: headers,
  });
}

export function getCsvStats(
  rows: Record<string, string>[],
  column: string
): Record<string, unknown> {
  const values = rows.map((r) => r[column]).filter(Boolean);
  const numericValues = values
    .map(Number)
    .filter((n) => !isNaN(n));

  const stats: Record<string, unknown> = {
    column,
    totalValues: values.length,
    uniqueValues: new Set(values).size,
    emptyValues: rows.length - values.length,
  };

  if (numericValues.length > 0) {
    const sorted = numericValues.sort((a, b) => a - b);
    stats.min = sorted[0];
    stats.max = sorted[sorted.length - 1];
    stats.mean =
      numericValues.reduce((a, b) => a + b, 0) / numericValues.length;
    stats.median =
      sorted.length % 2 === 0
        ? (sorted[sorted.length / 2 - 1] + sorted[sorted.length / 2]) / 2
        : sorted[Math.floor(sorted.length / 2)];
  }

  return stats;
}

MCP Server 实现

// src/server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
import { parsePdf } from "./parsers/pdf-parser.js";
import { listSheets, readSheet } from "./parsers/excel-parser.js";
import { parseCsv, csvToJson, getCsvStats } from "./parsers/csv-parser.js";
import { getFileInfo } from "./utils/file-utils.js";

export function createServer(): McpServer {
  const server = new McpServer({
    name: "mcp-file-server",
    version: "1.0.0",
  });

  // 工具:获取文件信息
  server.tool(
    "file_info",
    "获取文件的基本信息(大小、类型、修改时间)",
    {
      path: z.string().describe("文件的绝对路径"),
    },
    async ({ path }) => {
      try {
        const info = await getFileInfo(path);
        return {
          content: [
            {
              type: "text",
              text:
                `文件信息:\n` +
                `- 路径:${info.path}\n` +
                `- 大小:${info.sizeFormatted}\n` +
                `- 类型:${info.extension}\n` +
                `- 修改时间:${info.modifiedAt.toLocaleString()}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `获取文件信息失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:解析 PDF
  server.tool(
    "read_pdf",
    "解析 PDF 文件,提取文本内容。支持指定页面范围",
    {
      path: z.string().describe("PDF 文件的绝对路径"),
      max_pages: z.number().optional().describe("最多解析的页数"),
      page_range: z
        .string()
        .optional()
        .describe("页面范围,如 '1-3' 或 '1,3,5'"),
    },
    async ({ path, max_pages, page_range }) => {
      try {
        const result = await parsePdf(path, {
          maxPages: max_pages,
          pageRange: page_range,
        });

        let text = `PDF 文件解析结果:\n`;
        text += `- 总页数:${result.pageCount}\n`;
        if (result.info.title) text += `- 标题:${result.info.title}\n`;
        if (result.info.author) text += `- 作者:${result.info.author}\n`;
        text += `\n---\n\n${result.text}`;

        // 如果内容太长,截断并提示
        if (text.length > 50000) {
          text = text.substring(0, 50000) + "\n\n[内容已截断,请使用 page_range 参数指定页面范围]";
        }

        return { content: [{ type: "text", text }] };
      } catch (error) {
        return {
          content: [{ type: "text", text: `PDF 解析失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:列出 Excel 工作表
  server.tool(
    "list_excel_sheets",
    "列出 Excel 文件中的所有工作表及其基本信息",
    {
      path: z.string().describe("Excel 文件的绝对路径(.xlsx 或 .xls)"),
    },
    async ({ path }) => {
      try {
        const sheets = await listSheets(path);

        const sheetList = sheets
          .map(
            (s) =>
              `### ${s.name}\n- 行数:${s.rowCount}\n- 列数:${s.columnCount}\n- 列名:${s.headers.join(", ")}`
          )
          .join("\n\n");

        return {
          content: [
            {
              type: "text",
              text: `Excel 文件包含 ${sheets.length} 个工作表:\n\n${sheetList}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `Excel 解析失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:读取 Excel 数据
  server.tool(
    "read_excel",
    "读取 Excel 工作表的数据,返回表格格式",
    {
      path: z.string().describe("Excel 文件的绝对路径"),
      sheet_name: z.string().optional().describe("工作表名称,默认第一个"),
      start_row: z.number().optional().default(0).describe("起始行号(0 开始)"),
      max_rows: z.number().optional().default(50).describe("最大返回行数"),
    },
    async ({ path, sheet_name, start_row, max_rows }) => {
      try {
        const data = await readSheet(path, sheet_name, {
          startRow: start_row,
          maxRows: max_rows,
        });

        if (data.rows.length === 0) {
          return {
            content: [{ type: "text", text: "工作表中没有数据。" }],
          };
        }

        // 格式化为 Markdown 表格
        const header = `| ${data.headers.join(" | ")} |`;
        const separator = `| ${data.headers.map(() => "---").join(" | ")} |`;
        const rows = data.rows
          .map(
            (row) =>
              `| ${data.headers.map((h) => String(row[h] ?? "")).join(" | ")} |`
          )
          .join("\n");

        return {
          content: [
            {
              type: "text",
              text:
                `数据(共 ${data.totalRows} 行,显示 ${data.rows.length} 行):\n\n` +
                `${header}\n${separator}\n${rows}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `读取失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:解析 CSV
  server.tool(
    "read_csv",
    "解析 CSV 文件,返回表格数据",
    {
      path: z.string().describe("CSV 文件的绝对路径"),
      delimiter: z.string().optional().default(",").describe("分隔符,默认逗号"),
      max_rows: z.number().optional().default(50).describe("最大返回行数"),
    },
    async ({ path, delimiter, max_rows }) => {
      try {
        const data = await parseCsv(path, { delimiter, maxRows: max_rows });

        if (data.rows.length === 0) {
          return {
            content: [{ type: "text", text: "CSV 文件中没有数据。" }],
          };
        }

        const header = `| ${data.headers.join(" | ")} |`;
        const separator = `| ${data.headers.map(() => "---").join(" | ")} |`;
        const rows = data.rows
          .map(
            (row) =>
              `| ${data.headers.map((h) => row[h] ?? "").join(" | ")} |`
          )
          .join("\n");

        return {
          content: [
            {
              type: "text",
              text:
                `CSV 数据(共 ${data.totalRows} 行,显示 ${data.rows.length} 行):\n\n` +
                `${header}\n${separator}\n${rows}`,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `CSV 解析失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:CSV 列统计
  server.tool(
    "csv_column_stats",
    "对 CSV 文件的指定列进行统计分析(计数、唯一值、数值统计等)",
    {
      path: z.string().describe("CSV 文件的绝对路径"),
      column: z.string().describe("要统计的列名"),
    },
    async ({ path, column }) => {
      try {
        const data = await parseCsv(path, { maxRows: 10000 });

        if (!data.headers.includes(column)) {
          return {
            content: [
              {
                type: "text",
                text: `列 "${column}" 不存在。可用的列:${data.headers.join(", ")}`,
              },
            ],
          };
        }

        const stats = getCsvStats(data.rows, column);

        let text = `列 "${column}" 的统计信息:\n`;
        text += `- 总值数:${stats.totalValues}\n`;
        text += `- 唯一值:${stats.uniqueValues}\n`;
        text += `- 空值数:${stats.emptyValues}\n`;

        if (stats.min !== undefined) {
          text += `\n数值统计:\n`;
          text += `- 最小值:${stats.min}\n`;
          text += `- 最大值:${stats.max}\n`;
          text += `- 平均值:${(stats.mean as number).toFixed(2)}\n`;
          text += `- 中位数:${stats.median}\n`;
        }

        return { content: [{ type: "text", text }] };
      } catch (error) {
        return {
          content: [{ type: "text", text: `统计失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  // 工具:CSV 转 JSON
  server.tool(
    "csv_to_json",
    "将 CSV 文件转换为 JSON 格式",
    {
      path: z.string().describe("CSV 文件的绝对路径"),
      max_rows: z.number().optional().default(100).describe("最大转换行数"),
    },
    async ({ path, max_rows }) => {
      try {
        const data = await parseCsv(path, { maxRows: max_rows });
        const json = csvToJson(data.rows);

        return {
          content: [
            {
              type: "text",
              text: `转换结果(${data.rows.length} 条记录):\n\n\`\`\`json\n${json}\n\`\`\``,
            },
          ],
        };
      } catch (error) {
        return {
          content: [{ type: "text", text: `转换失败:${(error as Error).message}` }],
          isError: true,
        };
      }
    }
  );

  return server;
}

入口文件

// src/index.ts
#!/usr/bin/env node

import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { setAllowedDirs } from "./utils/file-utils.js";
import { createServer } from "./server.js";

async function main() {
  // 从命令行参数获取允许访问的目录
  const allowedDirs = process.argv.slice(2);

  if (allowedDirs.length === 0) {
    console.error("Usage: mcp-file-server <dir1> [dir2] ...");
    console.error("Specify directories that the server is allowed to access");
    process.exit(1);
  }

  setAllowedDirs(allowedDirs);

  const server = createServer();
  const transport = new StdioServerTransport();
  await server.connect(transport);
}

main().catch((error) => {
  console.error("Server failed to start:", error);
  process.exit(1);
});

安全考虑

文件处理 Server 的安全性尤为重要:

1. 目录白名单

只允许访问指定目录下的文件,防止越权访问:

{
  "mcpServers": {
    "files": {
      "command": "node",
      "args": [
        "/path/to/mcp-file-server/dist/index.js",
        "/Users/me/documents",
        "/Users/me/downloads"
      ]
    }
  }
}

2. 文件大小限制

避免处理过大的文件导致内存溢出:

const MAX_FILE_SIZE = 50 * 1024 * 1024; // 50MB

const info = await getFileInfo(path);
if (info.size > MAX_FILE_SIZE) {
  throw new Error(`File too large: ${info.sizeFormatted} (max 50MB)`);
}

3. 只读操作

这个 Server 只提供读取和分析功能,不提供写入或删除操作。

4. 路径遍历防护

// validatePath 函数中已经使用 resolve() 处理了路径
// 防止 ../../../etc/passwd 这类攻击
const resolved = resolve(filePath);

使用场景

配置好 Server 后,可以这样使用:

用户:帮我分析 /Users/me/documents/sales-report.xlsx 的销售数据

AI:我来读取这个 Excel 文件...
    [调用 list_excel_sheets]
    文件包含 3 个工作表:Q1, Q2, Q3

    [调用 read_excel,sheet_name="Q1"]
    Q1 数据如下...

    根据数据分析,Q1 的总销售额为 xxx,环比增长 xx%...

总结

文件处理 MCP Server 是一个非常实用的工具,它让 AI 能够直接处理我们日常工作中的各种文件。核心要点:

  • 每种文件格式用独立的解析器处理,保持代码清晰
  • 安全性是第一优先级:目录白名单、文件大小限制、只读操作
  • 返回格式化的数据(Markdown 表格),让 AI 更容易理解
  • 大文件要做截断处理,避免超出上下文限制

数据不应该被锁在文件里。一个好的文件处理工具,让 AI 成为你的数据分析助手。

评论

加载中...

相关文章

分享:

评论

加载中...