Skip to content

n8n 文件处理与存储 - 处理各种文档和媒体文件

文件处理是自动化工作流中的常见需求,包括文档转换、图片处理、文件上传下载等。今天我们来学习如何在 n8n 中高效地处理各种类型的文件。

文件处理基础

二进制数据概念

在 n8n 中,文件以二进制数据的形式存储和传递:

javascript
// Shape of an item that carries a file: `json` holds descriptive fields,
// while `binary.data` holds the base64-encoded content plus its own metadata.
{
  "json": {
    "fileName": "document.pdf",
    "mimeType": "application/pdf",
    "fileSize": 1024000
  },
  "binary": {
    "data": {
      "data": "base64-encoded-content",
      "mimeType": "application/pdf",
      "fileName": "document.pdf",
      "fileExtension": "pdf"
    }
  }
}

文件类型检测

javascript
/**
 * Classify a file by the extension in its name and cross-check the
 * caller-supplied MIME type against the type expected for that extension.
 *
 * @param {string} fileName - File name; the text after the last dot is the extension.
 * @param {string} [mimeType] - MIME type reported by the source of the file.
 * @param {*} [buffer] - Accepted for call-site compatibility; not inspected.
 * @returns {{category: string, mimeType: string, extension: string, fileName: string, actualMimeType: (string|undefined)}}
 *   `category` is 'unknown' and `mimeType` is 'application/octet-stream' when
 *   the extension is not in the table. `extension` is '' when the name has no dot.
 */
function detectFileType(fileName, mimeType, buffer) {
  const name = fileName === undefined || fileName === null ? '' : String(fileName);

  // A name without a dot has no extension; do not treat the whole name as one.
  const dotIndex = name.lastIndexOf('.');
  const extension = dotIndex === -1 ? '' : name.slice(dotIndex + 1).toLowerCase();

  // A Map is used so that extensions such as "constructor" or "toString"
  // cannot resolve to members inherited from Object.prototype.
  const fileTypes = new Map([
    // Documents
    ['pdf', { category: 'document', mimeType: 'application/pdf' }],
    ['doc', { category: 'document', mimeType: 'application/msword' }],
    ['docx', { category: 'document', mimeType: 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' }],
    ['xls', { category: 'document', mimeType: 'application/vnd.ms-excel' }],
    ['xlsx', { category: 'document', mimeType: 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' }],

    // Images
    ['jpg', { category: 'image', mimeType: 'image/jpeg' }],
    ['jpeg', { category: 'image', mimeType: 'image/jpeg' }],
    ['png', { category: 'image', mimeType: 'image/png' }],
    ['gif', { category: 'image', mimeType: 'image/gif' }],
    ['webp', { category: 'image', mimeType: 'image/webp' }],

    // Audio
    ['mp3', { category: 'audio', mimeType: 'audio/mpeg' }],
    ['wav', { category: 'audio', mimeType: 'audio/wav' }],
    ['flac', { category: 'audio', mimeType: 'audio/flac' }],

    // Video
    ['mp4', { category: 'video', mimeType: 'video/mp4' }],
    ['avi', { category: 'video', mimeType: 'video/x-msvideo' }],
    ['mov', { category: 'video', mimeType: 'video/quicktime' }]
  ]);

  const known = fileTypes.get(extension);
  const detected = known || { category: 'unknown', mimeType: 'application/octet-stream' };

  // Only warn when the extension is known: for unknown extensions there is no
  // real expectation to compare against. Parameters ("; charset=...") and
  // letter case are ignored for the comparison.
  if (known && mimeType) {
    const bareMimeType = String(mimeType).split(';')[0].trim().toLowerCase();
    if (bareMimeType !== detected.mimeType) {
      console.warn(`MIME type mismatch: expected ${detected.mimeType}, got ${mimeType}`);
    }
  }

  return {
    ...detected,
    extension,
    fileName,
    actualMimeType: mimeType
  };
}

// 使用示例
const fileInfo = items[0].binary.data;
const detectedType = detectFileType(
  fileInfo.fileName,
  fileInfo.mimeType,
  fileInfo.data
);

return [{ json: detectedType }];

文件上传和下载

HTTP 文件上传

javascript
/**
 * Upload one binary attachment to an HTTP endpoint as multipart form data.
 *
 * Relies on `FormData`, `fetch` and `Buffer` being available in scope.
 *
 * @param {{data: string, fileName: string, mimeType: string}} fileData
 *   Binary entry; `data` is the base64-encoded content.
 * @param {{url: string, token?: string, headers?: Object, metadata?: Object}} uploadConfig
 *   Target URL, optional bearer token, optional extra headers (these win over
 *   the generated Authorization header) and optional extra form fields.
 * @returns {Promise<*>} The parsed JSON body of the response.
 * @throws {Error} When the response is not ok (`Upload failed: <status> <statusText>`).
 */
async function uploadFile(fileData, uploadConfig) {
  const formData = new FormData();

  // The file itself is sent under the field name "file".
  formData.append('file', Buffer.from(fileData.data, 'base64'), {
    filename: fileData.fileName,
    contentType: fileData.mimeType
  });

  // Extra form fields. Missing values are skipped so the request never carries
  // an undefined/null field (the usage below passes a possibly-absent userId).
  for (const [key, value] of Object.entries(uploadConfig.metadata || {})) {
    if (value === undefined || value === null) {
      continue;
    }
    formData.append(key, value);
  }

  // Only send Authorization when a token is configured; otherwise the header
  // would read "Bearer undefined".
  const headers = {};
  if (uploadConfig.token) {
    headers['Authorization'] = `Bearer ${uploadConfig.token}`;
  }
  Object.assign(headers, uploadConfig.headers);

  const response = await fetch(uploadConfig.url, {
    method: 'POST',
    headers,
    body: formData
  });

  if (!response.ok) {
    throw new Error(`Upload failed: ${response.status} ${response.statusText}`);
  }

  return await response.json();
}

// 使用示例
const fileData = items[0].binary.data;
const uploadConfig = {
  url: 'https://api.example.com/upload',
  token: process.env.UPLOAD_TOKEN,
  metadata: {
    category: 'documents',
    userId: items[0].json.userId
  }
};

const uploadResult = await uploadFile(fileData, uploadConfig);
return [{ json: uploadResult }];

文件下载

javascript
/**
 * Extract the file name from a Content-Disposition header value.
 *
 * The extended `filename*=charset'lang'value` form is preferred and
 * percent-decoded; otherwise the plain `filename=` form is used. Any directory
 * part is dropped so a server-supplied name cannot carry path segments.
 *
 * @param {?string} contentDisposition - Raw header value, or null when absent.
 * @returns {string} The file name, or '' when none could be extracted.
 */
function parseContentDispositionFileName(contentDisposition) {
  if (!contentDisposition) {
    return '';
  }

  let name = '';

  const extended = contentDisposition.match(/filename\*\s*=\s*[^';\n]*'[^';\n]*'([^;\n]*)/i);
  if (extended) {
    try {
      name = decodeURIComponent(extended[1].trim());
    } catch (error) {
      // Malformed percent-escape: ignore this form and try the plain one.
      name = '';
    }
  }

  if (!name) {
    const plain = contentDisposition.match(/filename\s*=\s*(?:"([^"]*)"|'([^']*)'|([^;\n]*))/i);
    if (plain) {
      name = (plain[1] || plain[2] || plain[3] || '').trim();
    }
  }

  // Keep only the last path segment ("../../x.pdf" -> "x.pdf").
  return name.split(/[\\/]/).pop();
}

/**
 * Download a URL and wrap the response as an item with `json` and `binary` parts.
 *
 * The file name comes from the Content-Disposition header, then from the last
 * URL path segment (only if it contains a dot), then falls back to
 * 'downloaded-file'.
 *
 * @param {string} url - Absolute URL to fetch.
 * @param {{headers?: Object}} [options] - Extra request headers; they override the default User-Agent.
 * @returns {Promise<{json: Object, binary: {data: Object}}>} Item whose
 *   `binary.data.data` is the base64-encoded body.
 * @throws {Error} When the response is not ok (`Download failed: <status> <statusText>`).
 */
async function downloadFile(url, options = {}) {
  const response = await fetch(url, {
    method: 'GET',
    headers: {
      'User-Agent': 'n8n-workflow/1.0',
      ...options.headers
    }
  });

  if (!response.ok) {
    throw new Error(`Download failed: ${response.status} ${response.statusText}`);
  }

  // arrayBuffer() is part of the standard Response interface, whereas
  // response.buffer() only exists on some fetch implementations.
  const buffer = Buffer.from(await response.arrayBuffer());

  // Drop parameters such as "; charset=utf-8" so mimeType is a bare media type.
  const rawContentType = response.headers.get('content-type');
  const contentType = rawContentType
    ? rawContentType.split(';')[0].trim()
    : 'application/octet-stream';

  let fileName = parseContentDispositionFileName(response.headers.get('content-disposition'));

  // No usable header name: infer it from the URL path.
  if (!fileName) {
    const urlFileName = new URL(url).pathname.split('/').pop();
    fileName = urlFileName && urlFileName.includes('.') ? urlFileName : 'downloaded-file';
  }

  // A name without a dot has no extension.
  const dotIndex = fileName.lastIndexOf('.');
  const fileExtension = dotIndex === -1 ? '' : fileName.slice(dotIndex + 1);

  return {
    json: {
      fileName,
      mimeType: contentType,
      fileSize: buffer.length,
      downloadUrl: url
    },
    binary: {
      data: {
        data: buffer.toString('base64'),
        mimeType: contentType,
        fileName: fileName,
        fileExtension
      }
    }
  };
}

// 批量下载文件
const downloadUrls = items.map(item => item.json.url);
const downloadResults = [];

for (const url of downloadUrls) {
  try {
    const result = await downloadFile(url);
    downloadResults.push(result);
  } catch (error) {
    downloadResults.push({
      json: {
        error: error.message,
        url: url,
        success: false
      }
    });
  }
}

return downloadResults;

文档处理

PDF 操作

javascript
// PDF 文本提取
async function extractTextFromPDF(pdfBuffer) {
  const pdf = require('pdf-parse');
  
  try {
    const data = await pdf(pdfBuffer);
    
    return {
      text: data.text,
      pages: data.numpages,
      info: data.info,
      metadata: data.metadata,
      version: data.version
    };
  } catch (error) {
    throw new Error(`PDF parsing failed: ${error.message}`);
  }
}

// PDF 分页处理
async function splitPDF(pdfBuffer, options = {}) {
  const PDFDocument = require('pdf-lib').PDFDocument;
  
  const pdfDoc = await PDFDocument.load(pdfBuffer);
  const pageCount = pdfDoc.getPageCount();
  
  const results = [];
  
  if (options.splitByPages) {
    // 按页数分割
    const pagesPerSplit = options.pagesPerSplit || 1;
    
    for (let i = 0; i < pageCount; i += pagesPerSplit) {
      const newPdf = await PDFDocument.create();
      const endPage = Math.min(i + pagesPerSplit, pageCount);
      
      const pages = await newPdf.copyPages(pdfDoc, 
        Array.from({ length: endPage - i }, (_, idx) => i + idx)
      );
      
      pages.forEach(page => newPdf.addPage(page));
      
      const pdfBytes = await newPdf.save();
      
      results.push({
        json: {
          fileName: `split_${i + 1}-${endPage}.pdf`,
          pageRange: `${i + 1}-${endPage}`,
          totalPages: endPage - i
        },
        binary: {
          data: {
            data: Buffer.from(pdfBytes).toString('base64'),
            mimeType: 'application/pdf',
            fileName: `split_${i + 1}-${endPage}.pdf`
          }
        }
      });
    }
  }
  
  return results;
}

// 使用示例
const pdfData = items[0].binary.data;
const pdfBuffer = Buffer.from(pdfData.data, 'base64');

// 提取文本
const textContent = await extractTextFromPDF(pdfBuffer);

// 分割 PDF
const splitResults = await splitPDF(pdfBuffer, {
  splitByPages: true,
  pagesPerSplit: 2
});

return [{
  json: {
    originalFile: pdfData.fileName,
    textContent: textContent,
    splitFiles: splitResults.length
  }
}];

Excel 处理

javascript
// Excel 文件读取
async function readExcelFile(excelBuffer, options = {}) {
  const XLSX = require('xlsx');
  
  const workbook = XLSX.read(excelBuffer, { type: 'buffer' });
  const results = {};
  
  workbook.SheetNames.forEach(sheetName => {
    const worksheet = workbook.Sheets[sheetName];
    
    // 转换为 JSON
    const jsonData = XLSX.utils.sheet_to_json(worksheet, {
      header: options.header || 1,
      defval: options.defaultValue || '',
      blankrows: options.includeBlankRows || false
    });
    
    results[sheetName] = {
      data: jsonData,
      rowCount: jsonData.length,
      range: worksheet['!ref']
    };
  });
  
  return results;
}

// Excel 文件写入
async function writeExcelFile(data, options = {}) {
  const XLSX = require('xlsx');
  
  const workbook = XLSX.utils.book_new();
  
  Object.entries(data).forEach(([sheetName, sheetData]) => {
    const worksheet = XLSX.utils.json_to_sheet(sheetData, {
      header: options.header,
      skipHeader: options.skipHeader || false
    });
    
    // 设置列宽
    if (options.columnWidths) {
      worksheet['!cols'] = options.columnWidths.map(width => ({ wch: width }));
    }
    
    XLSX.utils.book_append_sheet(workbook, worksheet, sheetName);
  });
  
  const excelBuffer = XLSX.write(workbook, {
    type: 'buffer',
    bookType: options.format || 'xlsx'
  });
  
  return excelBuffer;
}

// 使用示例
const excelData = items[0].binary.data;
const excelBuffer = Buffer.from(excelData.data, 'base64');

// 读取 Excel
const excelContent = await readExcelFile(excelBuffer, {
  header: 1,
  includeBlankRows: false
});

// 处理数据
const processedData = {};
Object.entries(excelContent).forEach(([sheetName, sheet]) => {
  processedData[sheetName] = sheet.data.map(row => ({
    ...row,
    processed: true,
    processedAt: new Date().toISOString()
  }));
});

// 写入新的 Excel 文件
const newExcelBuffer = await writeExcelFile(processedData, {
  format: 'xlsx',
  columnWidths: [20, 30, 15, 25]
});

return [{
  json: {
    originalSheets: Object.keys(excelContent),
    totalRows: Object.values(excelContent).reduce((sum, sheet) => sum + sheet.rowCount, 0)
  },
  binary: {
    data: {
      data: newExcelBuffer.toString('base64'),
      mimeType: 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
      fileName: 'processed_data.xlsx'
    }
  }
}];

图片处理

图片压缩和调整

javascript
// 图片处理
async function processImage(imageBuffer, options = {}) {
  const sharp = require('sharp');
  
  let image = sharp(imageBuffer);
  
  // 获取图片信息
  const metadata = await image.metadata();
  
  // 调整大小
  if (options.resize) {
    image = image.resize(options.resize.width, options.resize.height, {
      fit: options.resize.fit || 'inside',
      withoutEnlargement: options.resize.withoutEnlargement !== false
    });
  }
  
  // 压缩质量
  if (options.quality) {
    if (metadata.format === 'jpeg') {
      image = image.jpeg({ quality: options.quality });
    } else if (metadata.format === 'png') {
      image = image.png({ quality: options.quality });
    } else if (metadata.format === 'webp') {
      image = image.webp({ quality: options.quality });
    }
  }
  
  // 格式转换
  if (options.format) {
    switch (options.format) {
      case 'jpeg':
        image = image.jpeg();
        break;
      case 'png':
        image = image.png();
        break;
      case 'webp':
        image = image.webp();
        break;
    }
  }
  
  // 添加水印
  if (options.watermark) {
    const watermarkBuffer = Buffer.from(options.watermark.data, 'base64');
    image = image.composite([{
      input: watermarkBuffer,
      gravity: options.watermark.position || 'southeast',
      blend: options.watermark.blend || 'over'
    }]);
  }
  
  const processedBuffer = await image.toBuffer();
  const processedMetadata = await sharp(processedBuffer).metadata();
  
  return {
    buffer: processedBuffer,
    metadata: processedMetadata,
    originalMetadata: metadata,
    compressionRatio: metadata.size / processedMetadata.size
  };
}

// 批量图片处理
const imageProcessingOptions = {
  resize: { width: 800, height: 600, fit: 'inside' },
  quality: 80,
  format: 'jpeg'
};

const processedImages = [];

for (const item of items) {
  if (item.binary && item.binary.data) {
    const imageBuffer = Buffer.from(item.binary.data.data, 'base64');
    
    try {
      const processed = await processImage(imageBuffer, imageProcessingOptions);
      
      processedImages.push({
        json: {
          originalFile: item.binary.data.fileName,
          originalSize: processed.originalMetadata.size,
          processedSize: processed.metadata.size,
          compressionRatio: processed.compressionRatio,
          dimensions: {
            original: `${processed.originalMetadata.width}x${processed.originalMetadata.height}`,
            processed: `${processed.metadata.width}x${processed.metadata.height}`
          }
        },
        binary: {
          data: {
            data: processed.buffer.toString('base64'),
            mimeType: `image/${imageProcessingOptions.format}`,
            fileName: item.binary.data.fileName.replace(/\.[^.]+$/, `.${imageProcessingOptions.format}`)
          }
        }
      });
    } catch (error) {
      processedImages.push({
        json: {
          error: error.message,
          originalFile: item.binary.data.fileName
        }
      });
    }
  }
}

return processedImages;

云存储集成

AWS S3 集成

javascript
/**
 * Thin wrapper around an `AWS.S3` client bound to one bucket.
 *
 * NOTE(review): `AWS` is not defined in this snippet; it is assumed to be
 * provided by the surrounding environment — confirm before use.
 */
class S3FileManager {
  /**
   * @param {{accessKeyId?: string, secretAccessKey?: string, region?: string, bucket: string}} config
   *   Credentials and region for the client, plus the bucket used by every method.
   */
  constructor(config) {
    this.config = config;
    this.s3Client = new AWS.S3({
      accessKeyId: config.accessKeyId,
      secretAccessKey: config.secretAccessKey,
      region: config.region
    });
  }

  /**
   * Upload a base64-encoded binary entry under `key`.
   *
   * @param {{data: string, mimeType: string}} fileData - `data` is base64 content.
   * @param {string} key - Object key in the bucket.
   * @param {{metadata?: Object, encryption?: string, acl?: string}} [options]
   *   `encryption` defaults to 'AES256'; `ACL` is only sent when `acl` is given.
   * @returns {Promise<{location: *, etag: *, bucket: *, key: *}>}
   */
  async uploadFile(fileData, key, options = {}) {
    const params = {
      Bucket: this.config.bucket,
      Key: key,
      Body: Buffer.from(fileData.data, 'base64'),
      ContentType: fileData.mimeType,
      Metadata: options.metadata || {},
      ServerSideEncryption: options.encryption || 'AES256'
    };

    if (options.acl) {
      params.ACL = options.acl;
    }

    const result = await this.s3Client.upload(params).promise();

    return {
      location: result.Location,
      etag: result.ETag,
      bucket: result.Bucket,
      key: result.Key
    };
  }

  /**
   * Fetch an object and return its body as base64 with its descriptive fields.
   *
   * @param {string} key - Object key in the bucket.
   * @returns {Promise<{data: string, mimeType: *, lastModified: *, etag: *, metadata: *}>}
   */
  async downloadFile(key) {
    const params = {
      Bucket: this.config.bucket,
      Key: key
    };

    const result = await this.s3Client.getObject(params).promise();

    return {
      data: result.Body.toString('base64'),
      mimeType: result.ContentType,
      lastModified: result.LastModified,
      etag: result.ETag,
      metadata: result.Metadata
    };
  }

  /**
   * Delete an object.
   *
   * @param {string} key - Object key in the bucket.
   * @returns {Promise<{deleted: boolean, key: string}>}
   */
  async deleteFile(key) {
    const params = {
      Bucket: this.config.bucket,
      Key: key
    };

    await this.s3Client.deleteObject(params).promise();
    return { deleted: true, key };
  }

  /**
   * List one page of objects under a prefix.
   *
   * @param {string} [prefix=''] - Key prefix to list.
   * @param {number} [maxKeys=1000] - Page size.
   * @param {string} [continuationToken] - `nextToken` from a previous page;
   *   pass it to fetch the following page when `hasMore` was true.
   * @returns {Promise<{files: Array<{key: *, size: *, lastModified: *, etag: *}>, hasMore: *, nextToken: *}>}
   */
  async listFiles(prefix = '', maxKeys = 1000, continuationToken) {
    const params = {
      Bucket: this.config.bucket,
      Prefix: prefix,
      MaxKeys: maxKeys
    };

    // Without this the returned nextToken could never be used to continue.
    if (continuationToken) {
      params.ContinuationToken = continuationToken;
    }

    const result = await this.s3Client.listObjectsV2(params).promise();

    // Tolerate a response without a Contents list (treated as "no objects").
    const contents = result.Contents || [];

    return {
      files: contents.map(obj => ({
        key: obj.Key,
        size: obj.Size,
        lastModified: obj.LastModified,
        etag: obj.ETag
      })),
      hasMore: result.IsTruncated,
      nextToken: result.NextContinuationToken
    };
  }
}

// 使用 S3 存储
const s3Manager = new S3FileManager({
  accessKeyId: process.env.AWS_ACCESS_KEY_ID,
  secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  region: process.env.AWS_REGION,
  bucket: process.env.S3_BUCKET
});

const fileData = items[0].binary.data;
const s3Key = `uploads/${Date.now()}_${fileData.fileName}`;

const uploadResult = await s3Manager.uploadFile(fileData, s3Key, {
  acl: 'private',
  metadata: {
    uploadedBy: 'n8n-workflow',
    originalName: fileData.fileName
  }
});

return [{ json: uploadResult }];

Google Drive 集成

javascript
// Google Drive 文件操作
async function uploadToGoogleDrive(fileData, options = {}) {
  const { google } = require('googleapis');
  
  const auth = new google.auth.GoogleAuth({
    keyFile: process.env.GOOGLE_SERVICE_ACCOUNT_KEY,
    scopes: ['https://www.googleapis.com/auth/drive.file']
  });
  
  const drive = google.drive({ version: 'v3', auth });
  
  const fileMetadata = {
    name: options.fileName || fileData.fileName,
    parents: options.parentFolderId ? [options.parentFolderId] : undefined
  };
  
  const media = {
    mimeType: fileData.mimeType,
    body: Buffer.from(fileData.data, 'base64')
  };
  
  const response = await drive.files.create({
    resource: fileMetadata,
    media: media,
    fields: 'id,name,size,mimeType,createdTime,webViewLink'
  });
  
  return response.data;
}

// 批量上传到 Google Drive
const uploadResults = [];

for (const item of items) {
  if (item.binary && item.binary.data) {
    try {
      const result = await uploadToGoogleDrive(item.binary.data, {
        fileName: `processed_${item.binary.data.fileName}`,
        parentFolderId: process.env.GOOGLE_DRIVE_FOLDER_ID
      });
      
      uploadResults.push({
        json: {
          success: true,
          fileId: result.id,
          fileName: result.name,
          size: result.size,
          webViewLink: result.webViewLink
        }
      });
    } catch (error) {
      uploadResults.push({
        json: {
          success: false,
          error: error.message,
          fileName: item.binary.data.fileName
        }
      });
    }
  }
}

return uploadResults;

文件监控和管理

文件变化监控

javascript
// 文件系统监控
const chokidar = require('chokidar');

/**
 * Start a chokidar watcher on `watchPath` and buffer the events it reports.
 *
 * @param {string|string[]} watchPath - Path(s) handed to `chokidar.watch`.
 * @param {{ignored?: *, ignoreInitial?: boolean}} [options]
 *   `ignored` defaults to a pattern matching dot-prefixed path segments;
 *   `ignoreInitial` is true unless explicitly set to false.
 * @returns {{watcher: Object, getEvents: Function, clearEvents: Function, close: Function}}
 *   `getEvents` returns the live event array, `clearEvents` empties it and
 *   `close` closes the watcher.
 */
function setupFileWatcher(watchPath, options = {}) {
  const watcher = chokidar.watch(watchPath, {
    ignored: options.ignored || /[\/\\]\./,
    persistent: true,
    ignoreInitial: options.ignoreInitial !== false
  });

  const events = [];

  // Build a listener that records the affected path under the given type.
  const recordAs = (type) => (path) => {
    events.push({
      type,
      path: path,
      timestamp: new Date().toISOString()
    });
  };

  watcher
    .on('add', recordAs('file_added'))
    .on('change', recordAs('file_changed'))
    .on('unlink', recordAs('file_deleted'))
    // Without an 'error' listener an emitted error would be thrown instead of
    // reported, so errors are recorded alongside the file events.
    .on('error', (error) => {
      events.push({
        type: 'watcher_error',
        error: error && error.message ? error.message : String(error),
        timestamp: new Date().toISOString()
      });
    });

  return {
    watcher,
    getEvents: () => events,
    clearEvents: () => events.length = 0,
    close: () => watcher.close()
  };
}

文件清理任务

javascript
/**
 * Delete files directly inside `directory` that meet ANY of three criteria:
 * last modified more than `maxAge` ago, larger than `maxSize`, or a name
 * matching `pattern`. Subdirectories are never removed or descended into.
 *
 * @param {string} directory - Directory whose direct entries are examined.
 * @param {{maxAge?: number, maxSize?: number, pattern?: RegExp}} [options]
 *   `maxAge` in milliseconds (default 30 days), `maxSize` in bytes
 *   (default 1 GB), `pattern` tested against the entry name
 *   (default: names ending in .tmp, .log or .cache).
 * @returns {Promise<{scanned: number, deleted: number, totalSize: number, freedSpace: number, errors: Array}>}
 *   `scanned` counts every entry inspected; `totalSize` and `freedSpace` are
 *   byte totals over regular files. Failures are collected in `errors`
 *   instead of being thrown.
 */
async function cleanupOldFiles(directory, options = {}) {
  const fs = require('fs').promises;
  const path = require('path');

  // Explicit undefined checks so that a caller-supplied 0 is honoured.
  const maxAge = options.maxAge !== undefined ? options.maxAge : 30 * 24 * 60 * 60 * 1000; // 30 days
  const maxSize = options.maxSize !== undefined ? options.maxSize : 1024 * 1024 * 1024; // 1 GB
  const filePattern = options.pattern || /\.(tmp|log|cache)$/;

  const now = Date.now();
  const results = {
    scanned: 0,
    deleted: 0,
    totalSize: 0,
    freedSpace: 0,
    errors: []
  };

  try {
    const files = await fs.readdir(directory);

    for (const file of files) {
      const filePath = path.join(directory, file);

      try {
        const stats = await fs.stat(filePath);
        results.scanned++;

        // Only regular files can be unlinked; skipping other entries avoids
        // failed deletions on directories.
        if (!stats.isFile()) {
          continue;
        }

        results.totalSize += stats.size;

        // A pattern with the g or y flag keeps state between test() calls;
        // reset it so every name is matched from its start.
        filePattern.lastIndex = 0;

        const shouldDelete =
          (now - stats.mtime.getTime() > maxAge) ||
          (stats.size > maxSize) ||
          (filePattern.test(file));

        if (shouldDelete) {
          await fs.unlink(filePath);
          results.deleted++;
          results.freedSpace += stats.size;
        }
      } catch (error) {
        results.errors.push({
          file: filePath,
          error: error.message
        });
      }
    }
  } catch (error) {
    results.errors.push({
      directory: directory,
      error: error.message
    });
  }

  return results;
}

// 执行清理任务
const cleanupResult = await cleanupOldFiles('/tmp/n8n-files', {
  maxAge: 7 * 24 * 60 * 60 * 1000, // 7天
  pattern: /\.(tmp|temp|cache)$/
});

return [{ json: cleanupResult }];

小结

文件处理是 n8n 工作流的重要功能:

  1. 理解二进制数据:掌握文件在 n8n 中的存储和传递方式
  2. 选择合适的处理方式:根据文件类型选择对应的处理库
  3. 云存储集成:利用云服务实现文件的持久化存储
  4. 性能优化:合理处理大文件,避免内存溢出
  5. 错误处理:完善的异常处理和恢复机制

下一篇文章,我们将学习邮件与通知系统,这是自动化工作流中重要的沟通环节。

记住,文件处理涉及大量的 I/O 操作和内存使用,要特别注意性能优化和资源管理。对于大文件,考虑使用流式处理或分块处理的方式。