n8n 文件处理与存储 - 处理各种文档和媒体文件
文件处理是自动化工作流中的常见需求,包括文档转换、图片处理、文件上传下载等。今天我们来学习如何在 n8n 中高效地处理各种类型的文件。
文件处理基础
二进制数据概念
在 n8n 中,文件以二进制数据的形式存储和传递:
javascript
// Shape of an item that carries a file: `json` holds descriptive fields
// (name, MIME type, size) and `binary.data` holds the base64-encoded content
// together with its MIME type, file name and extension.
{
"json": {
"fileName": "document.pdf",
"mimeType": "application/pdf",
"fileSize": 1024000
},
"binary": {
"data": {
"data": "base64-encoded-content",
"mimeType": "application/pdf",
"fileName": "document.pdf",
"fileExtension": "pdf"
}
}
}
文件类型检测
javascript
/**
 * Classifies a file by the extension of its name.
 *
 * @param {string} fileName - File name or path; only the text after the last
 *   dot of the final path segment is treated as the extension.
 * @param {string} [mimeType] - MIME type reported by the source. It is echoed
 *   back as `actualMimeType` and compared against the expected type.
 * @param {*} [buffer] - Accepted for interface compatibility; not inspected.
 * @returns {{category: string, mimeType: string, extension: string,
 *   fileName: string, actualMimeType: (string|undefined)}} The expected
 *   category/MIME type for the extension, or `unknown` /
 *   `application/octet-stream` when the extension is not in the table.
 */
function detectFileType(fileName, mimeType, buffer) {
  const fileTypes = {
    // Documents
    'pdf': { category: 'document', mimeType: 'application/pdf' },
    'doc': { category: 'document', mimeType: 'application/msword' },
    'docx': { category: 'document', mimeType: 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' },
    'xls': { category: 'document', mimeType: 'application/vnd.ms-excel' },
    'xlsx': { category: 'document', mimeType: 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' },
    // Images
    'jpg': { category: 'image', mimeType: 'image/jpeg' },
    'jpeg': { category: 'image', mimeType: 'image/jpeg' },
    'png': { category: 'image', mimeType: 'image/png' },
    'gif': { category: 'image', mimeType: 'image/gif' },
    'webp': { category: 'image', mimeType: 'image/webp' },
    // Audio
    'mp3': { category: 'audio', mimeType: 'audio/mpeg' },
    'wav': { category: 'audio', mimeType: 'audio/wav' },
    'flac': { category: 'audio', mimeType: 'audio/flac' },
    // Video
    'mp4': { category: 'video', mimeType: 'video/mp4' },
    'avi': { category: 'video', mimeType: 'video/x-msvideo' },
    'mov': { category: 'video', mimeType: 'video/quicktime' }
  };
  const fallbackType = { category: 'unknown', mimeType: 'application/octet-stream' };

  // Work on the last path segment so a dot in a directory name is not
  // mistaken for an extension; a name without any dot has no extension.
  const baseName = String(fileName == null ? '' : fileName).split(/[\\/]/).pop();
  const dotIndex = baseName.lastIndexOf('.');
  const extension = dotIndex === -1 ? '' : baseName.slice(dotIndex + 1).toLowerCase();

  // Own-property check: an extension such as "constructor" must not resolve
  // to something inherited from Object.prototype.
  const isKnown = Object.prototype.hasOwnProperty.call(fileTypes, extension);
  const detected = isKnown ? fileTypes[extension] : fallbackType;

  // Verify the reported MIME type, ignoring case and parameters such as
  // "; charset=...". Unknown extensions have no expectation to compare with.
  if (isKnown && typeof mimeType === 'string' && mimeType !== '') {
    const reported = mimeType.split(';')[0].trim().toLowerCase();
    if (reported !== detected.mimeType) {
      console.warn(`MIME type mismatch: expected ${detected.mimeType}, got ${mimeType}`);
    }
  }

  return {
    ...detected,
    extension,
    fileName,
    actualMimeType: mimeType
  };
}
// 使用示例
const fileInfo = items[0].binary.data;
const detectedType = detectFileType(
fileInfo.fileName,
fileInfo.mimeType,
fileInfo.data
);
return [{ json: detectedType }];
文件上传和下载
HTTP 文件上传
javascript
/**
 * Uploads a base64-encoded file as a multipart/form-data POST request.
 *
 * Works with either FormData implementation that may be in scope: the
 * `form-data` npm package (recognised by its `getHeaders` method) or the
 * built-in WHATWG FormData, which only accepts Blob values for file parts.
 *
 * @param {{data: string, fileName: string, mimeType: string}} fileData -
 *   File content (base64) with its name and MIME type.
 * @param {{url: string, token?: string, headers?: Object,
 *   metadata?: Object}} uploadConfig - Target URL, optional bearer token,
 *   extra request headers and extra form fields.
 * @returns {Promise<*>} The parsed JSON body of the response.
 * @throws {Error} When the response status is not OK.
 */
async function uploadFile(fileData, uploadConfig) {
  const formData = new FormData();
  const buffer = Buffer.from(fileData.data, 'base64');

  // Attach the file part in the form the active FormData implementation expects.
  if (typeof formData.getHeaders === 'function') {
    formData.append('file', buffer, {
      filename: fileData.fileName,
      contentType: fileData.mimeType
    });
  } else {
    const blob = new Blob([buffer], { type: fileData.mimeType || '' });
    formData.append('file', blob, fileData.fileName);
  }

  // Attach the extra fields. Missing values are skipped so that the literal
  // strings "undefined"/"null" are never sent; primitives are stringified.
  if (uploadConfig.metadata) {
    for (const [key, value] of Object.entries(uploadConfig.metadata)) {
      if (value === undefined || value === null) {
        continue;
      }
      const isPrimitive = typeof value === 'number' || typeof value === 'boolean';
      formData.append(key, isPrimitive ? String(value) : value);
    }
  }

  // Only send an Authorization header when a token was actually supplied.
  const headers = { ...uploadConfig.headers };
  if (uploadConfig.token && !('Authorization' in headers)) {
    headers.Authorization = `Bearer ${uploadConfig.token}`;
  }

  const response = await fetch(uploadConfig.url, {
    method: 'POST',
    headers,
    body: formData
  });
  if (!response.ok) {
    throw new Error(`Upload failed: ${response.status} ${response.statusText}`);
  }
  return await response.json();
}
// 使用示例
const fileData = items[0].binary.data;
const uploadConfig = {
url: 'https://api.example.com/upload',
token: process.env.UPLOAD_TOKEN,
metadata: {
category: 'documents',
userId: items[0].json.userId
}
};
const uploadResult = await uploadFile(fileData, uploadConfig);
return [{ json: uploadResult }];
文件下载
javascript
/**
 * Extracts a file name from a Content-Disposition header value.
 * Prefers the extended `filename*=charset'lang'value` form and falls back to
 * the plain `filename=` parameter. Directory components are dropped because
 * the value is supplied by the remote server.
 *
 * @param {string} headerValue - Raw Content-Disposition header value.
 * @returns {string|null} The file name, or null when none can be extracted.
 */
function fileNameFromContentDisposition(headerValue) {
  let candidate = null;

  const extended = /filename\*\s*=\s*[^';\s]*'[^']*'([^;\s]+)/i.exec(headerValue);
  if (extended) {
    try {
      candidate = decodeURIComponent(extended[1]);
    } catch (decodeError) {
      // Malformed percent-encoding: fall back to the plain parameter below.
      candidate = null;
    }
  }

  if (candidate === null) {
    const plain = /filename\s*=\s*(?:"((?:\\.|[^"\\])*)"|([^;\n]*))/i.exec(headerValue);
    if (plain) {
      candidate = plain[1] !== undefined
        ? plain[1].replace(/\\"/g, '"')
        : plain[2].trim().replace(/^'(.*)'$/, '$1');
    }
  }

  if (candidate === null) {
    return null;
  }
  const baseName = candidate.split(/[\\/]/).pop().trim();
  return baseName === '' ? null : baseName;
}

/**
 * Downloads a URL and returns it as an item with `json` metadata and a
 * base64-encoded `binary.data` payload.
 *
 * The file name comes from the Content-Disposition header when present,
 * otherwise from the last URL path segment if it contains a dot, otherwise
 * it is `downloaded-file`.
 *
 * @param {string} url - Absolute URL to fetch.
 * @param {{headers?: Object}} [options] - Extra request headers.
 * @returns {Promise<{json: Object, binary: {data: Object}}>} The downloaded item.
 * @throws {Error} When the response status is not OK.
 */
async function downloadFile(url, options = {}) {
  const response = await fetch(url, {
    method: 'GET',
    headers: {
      'User-Agent': 'n8n-workflow/1.0',
      ...options.headers
    }
  });
  if (!response.ok) {
    throw new Error(`Download failed: ${response.status} ${response.statusText}`);
  }

  // arrayBuffer() is part of the standard Response interface, whereas
  // buffer() is a node-fetch extension that the built-in fetch lacks.
  const buffer = Buffer.from(await response.arrayBuffer());
  const contentType = response.headers.get('content-type');
  const contentDisposition = response.headers.get('content-disposition');

  let fileName = contentDisposition
    ? fileNameFromContentDisposition(contentDisposition)
    : null;

  // No usable header name: infer one from the URL path.
  if (fileName === null) {
    const urlFileName = new URL(url).pathname.split('/').pop();
    fileName = urlFileName && urlFileName.includes('.') ? urlFileName : 'downloaded-file';
  }

  // A name without a dot has no extension.
  const dotIndex = fileName.lastIndexOf('.');
  const fileExtension = dotIndex === -1 ? '' : fileName.slice(dotIndex + 1);

  return {
    json: {
      fileName,
      mimeType: contentType,
      fileSize: buffer.length,
      downloadUrl: url
    },
    binary: {
      data: {
        data: buffer.toString('base64'),
        mimeType: contentType,
        fileName: fileName,
        fileExtension
      }
    }
  };
}
// 批量下载文件
const downloadUrls = items.map(item => item.json.url);
const downloadResults = [];
for (const url of downloadUrls) {
try {
const result = await downloadFile(url);
downloadResults.push(result);
} catch (error) {
downloadResults.push({
json: {
error: error.message,
url: url,
success: false
}
});
}
}
return downloadResults;
文档处理
PDF 操作
javascript
// PDF 文本提取
async function extractTextFromPDF(pdfBuffer) {
const pdf = require('pdf-parse');
try {
const data = await pdf(pdfBuffer);
return {
text: data.text,
pages: data.numpages,
info: data.info,
metadata: data.metadata,
version: data.version
};
} catch (error) {
throw new Error(`PDF parsing failed: ${error.message}`);
}
}
// PDF 分页处理
async function splitPDF(pdfBuffer, options = {}) {
const PDFDocument = require('pdf-lib').PDFDocument;
const pdfDoc = await PDFDocument.load(pdfBuffer);
const pageCount = pdfDoc.getPageCount();
const results = [];
if (options.splitByPages) {
// 按页数分割
const pagesPerSplit = options.pagesPerSplit || 1;
for (let i = 0; i < pageCount; i += pagesPerSplit) {
const newPdf = await PDFDocument.create();
const endPage = Math.min(i + pagesPerSplit, pageCount);
const pages = await newPdf.copyPages(pdfDoc,
Array.from({ length: endPage - i }, (_, idx) => i + idx)
);
pages.forEach(page => newPdf.addPage(page));
const pdfBytes = await newPdf.save();
results.push({
json: {
fileName: `split_${i + 1}-${endPage}.pdf`,
pageRange: `${i + 1}-${endPage}`,
totalPages: endPage - i
},
binary: {
data: {
data: Buffer.from(pdfBytes).toString('base64'),
mimeType: 'application/pdf',
fileName: `split_${i + 1}-${endPage}.pdf`
}
}
});
}
}
return results;
}
// 使用示例
const pdfData = items[0].binary.data;
const pdfBuffer = Buffer.from(pdfData.data, 'base64');
// 提取文本
const textContent = await extractTextFromPDF(pdfBuffer);
// 分割 PDF
const splitResults = await splitPDF(pdfBuffer, {
splitByPages: true,
pagesPerSplit: 2
});
return [{
json: {
originalFile: pdfData.fileName,
textContent: textContent,
splitFiles: splitResults.length
}
}];
Excel 处理
javascript
// Excel 文件读取
async function readExcelFile(excelBuffer, options = {}) {
const XLSX = require('xlsx');
const workbook = XLSX.read(excelBuffer, { type: 'buffer' });
const results = {};
workbook.SheetNames.forEach(sheetName => {
const worksheet = workbook.Sheets[sheetName];
// 转换为 JSON
const jsonData = XLSX.utils.sheet_to_json(worksheet, {
header: options.header || 1,
defval: options.defaultValue || '',
blankrows: options.includeBlankRows || false
});
results[sheetName] = {
data: jsonData,
rowCount: jsonData.length,
range: worksheet['!ref']
};
});
return results;
}
// Excel 文件写入
async function writeExcelFile(data, options = {}) {
const XLSX = require('xlsx');
const workbook = XLSX.utils.book_new();
Object.entries(data).forEach(([sheetName, sheetData]) => {
const worksheet = XLSX.utils.json_to_sheet(sheetData, {
header: options.header,
skipHeader: options.skipHeader || false
});
// 设置列宽
if (options.columnWidths) {
worksheet['!cols'] = options.columnWidths.map(width => ({ wch: width }));
}
XLSX.utils.book_append_sheet(workbook, worksheet, sheetName);
});
const excelBuffer = XLSX.write(workbook, {
type: 'buffer',
bookType: options.format || 'xlsx'
});
return excelBuffer;
}
// 使用示例
const excelData = items[0].binary.data;
const excelBuffer = Buffer.from(excelData.data, 'base64');
// 读取 Excel
const excelContent = await readExcelFile(excelBuffer, {
header: 1,
includeBlankRows: false
});
// 处理数据
const processedData = {};
Object.entries(excelContent).forEach(([sheetName, sheet]) => {
processedData[sheetName] = sheet.data.map(row => ({
...row,
processed: true,
processedAt: new Date().toISOString()
}));
});
// 写入新的 Excel 文件
const newExcelBuffer = await writeExcelFile(processedData, {
format: 'xlsx',
columnWidths: [20, 30, 15, 25]
});
return [{
json: {
originalSheets: Object.keys(excelContent),
totalRows: Object.values(excelContent).reduce((sum, sheet) => sum + sheet.rowCount, 0)
},
binary: {
data: {
data: newExcelBuffer.toString('base64'),
mimeType: 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
fileName: 'processed_data.xlsx'
}
}
}];
图片处理
图片压缩和调整
javascript
// 图片处理
async function processImage(imageBuffer, options = {}) {
const sharp = require('sharp');
let image = sharp(imageBuffer);
// 获取图片信息
const metadata = await image.metadata();
// 调整大小
if (options.resize) {
image = image.resize(options.resize.width, options.resize.height, {
fit: options.resize.fit || 'inside',
withoutEnlargement: options.resize.withoutEnlargement !== false
});
}
// 压缩质量
if (options.quality) {
if (metadata.format === 'jpeg') {
image = image.jpeg({ quality: options.quality });
} else if (metadata.format === 'png') {
image = image.png({ quality: options.quality });
} else if (metadata.format === 'webp') {
image = image.webp({ quality: options.quality });
}
}
// 格式转换
if (options.format) {
switch (options.format) {
case 'jpeg':
image = image.jpeg();
break;
case 'png':
image = image.png();
break;
case 'webp':
image = image.webp();
break;
}
}
// 添加水印
if (options.watermark) {
const watermarkBuffer = Buffer.from(options.watermark.data, 'base64');
image = image.composite([{
input: watermarkBuffer,
gravity: options.watermark.position || 'southeast',
blend: options.watermark.blend || 'over'
}]);
}
const processedBuffer = await image.toBuffer();
const processedMetadata = await sharp(processedBuffer).metadata();
return {
buffer: processedBuffer,
metadata: processedMetadata,
originalMetadata: metadata,
compressionRatio: metadata.size / processedMetadata.size
};
}
// 批量图片处理
const imageProcessingOptions = {
resize: { width: 800, height: 600, fit: 'inside' },
quality: 80,
format: 'jpeg'
};
const processedImages = [];
for (const item of items) {
if (item.binary && item.binary.data) {
const imageBuffer = Buffer.from(item.binary.data.data, 'base64');
try {
const processed = await processImage(imageBuffer, imageProcessingOptions);
processedImages.push({
json: {
originalFile: item.binary.data.fileName,
originalSize: processed.originalMetadata.size,
processedSize: processed.metadata.size,
compressionRatio: processed.compressionRatio,
dimensions: {
original: `${processed.originalMetadata.width}x${processed.originalMetadata.height}`,
processed: `${processed.metadata.width}x${processed.metadata.height}`
}
},
binary: {
data: {
data: processed.buffer.toString('base64'),
mimeType: `image/${imageProcessingOptions.format}`,
fileName: item.binary.data.fileName.replace(/\.[^.]+$/, `.${imageProcessingOptions.format}`)
}
}
});
} catch (error) {
processedImages.push({
json: {
error: error.message,
originalFile: item.binary.data.fileName
}
});
}
}
}
return processedImages;
云存储集成
AWS S3 集成
javascript
/**
 * Thin wrapper around an S3 client for a single bucket.
 *
 * Expects an `AWS` object exposing `S3` to be in scope. The client methods
 * are called in the `.promise()` style.
 */
class S3FileManager {
  /**
   * @param {{accessKeyId: string, secretAccessKey: string, region: string,
   *   bucket: string}} config - Credentials, region and the bucket that
   *   every method operates on.
   */
  constructor(config) {
    this.config = config;
    this.s3Client = new AWS.S3({
      accessKeyId: config.accessKeyId,
      secretAccessKey: config.secretAccessKey,
      region: config.region
    });
  }

  /**
   * Uploads a base64-encoded file under the given key.
   *
   * @param {{data: string, mimeType: string}} fileData - Base64 content and MIME type.
   * @param {string} key - Object key inside the bucket.
   * @param {{metadata?: Object, encryption?: string, acl?: string}}
   *   [options] - Object metadata, server-side encryption (default
   *   'AES256') and an optional ACL.
   * @returns {Promise<{location: *, etag: *, bucket: *, key: *}>}
   */
  async uploadFile(fileData, key, options = {}) {
    const params = {
      Bucket: this.config.bucket,
      Key: key,
      Body: Buffer.from(fileData.data, 'base64'),
      ContentType: fileData.mimeType,
      Metadata: options.metadata || {},
      ServerSideEncryption: options.encryption || 'AES256'
    };
    // Only send an ACL when one was requested.
    if (options.acl) {
      params.ACL = options.acl;
    }
    const result = await this.s3Client.upload(params).promise();
    return {
      location: result.Location,
      etag: result.ETag,
      bucket: result.Bucket,
      key: result.Key
    };
  }

  /**
   * Downloads an object and returns its body as base64.
   *
   * @param {string} key - Object key inside the bucket.
   * @returns {Promise<{data: string, mimeType: *, lastModified: *, etag: *, metadata: *}>}
   */
  async downloadFile(key) {
    const params = {
      Bucket: this.config.bucket,
      Key: key
    };
    const result = await this.s3Client.getObject(params).promise();
    return {
      data: result.Body.toString('base64'),
      mimeType: result.ContentType,
      lastModified: result.LastModified,
      etag: result.ETag,
      metadata: result.Metadata
    };
  }

  /**
   * Deletes an object.
   *
   * @param {string} key - Object key inside the bucket.
   * @returns {Promise<{deleted: boolean, key: string}>}
   */
  async deleteFile(key) {
    const params = {
      Bucket: this.config.bucket,
      Key: key
    };
    await this.s3Client.deleteObject(params).promise();
    return { deleted: true, key };
  }

  /**
   * Lists one page of objects under a prefix.
   *
   * @param {string} [prefix=''] - Key prefix to list.
   * @param {number} [maxKeys=1000] - Maximum number of keys in the page.
   * @param {string} [continuationToken] - `nextToken` of a previous call;
   *   pass it to fetch the following page when `hasMore` was true.
   * @returns {Promise<{files: Array<{key: *, size: *, lastModified: *,
   *   etag: *}>, hasMore: *, nextToken: *}>}
   */
  async listFiles(prefix = '', maxKeys = 1000, continuationToken) {
    const params = {
      Bucket: this.config.bucket,
      Prefix: prefix,
      MaxKeys: maxKeys
    };
    if (continuationToken) {
      params.ContinuationToken = continuationToken;
    }
    const result = await this.s3Client.listObjectsV2(params).promise();
    // Tolerate a response without `Contents` by treating it as an empty page.
    const contents = result.Contents || [];
    return {
      files: contents.map((obj) => ({
        key: obj.Key,
        size: obj.Size,
        lastModified: obj.LastModified,
        etag: obj.ETag
      })),
      hasMore: result.IsTruncated,
      nextToken: result.NextContinuationToken
    };
  }
}
// 使用 S3 存储
const s3Manager = new S3FileManager({
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
region: process.env.AWS_REGION,
bucket: process.env.S3_BUCKET
});
const fileData = items[0].binary.data;
const s3Key = `uploads/${Date.now()}_${fileData.fileName}`;
const uploadResult = await s3Manager.uploadFile(fileData, s3Key, {
acl: 'private',
metadata: {
uploadedBy: 'n8n-workflow',
originalName: fileData.fileName
}
});
return [{ json: uploadResult }];
Google Drive 集成
javascript
// Google Drive 文件操作
async function uploadToGoogleDrive(fileData, options = {}) {
const { google } = require('googleapis');
const auth = new google.auth.GoogleAuth({
keyFile: process.env.GOOGLE_SERVICE_ACCOUNT_KEY,
scopes: ['https://www.googleapis.com/auth/drive.file']
});
const drive = google.drive({ version: 'v3', auth });
const fileMetadata = {
name: options.fileName || fileData.fileName,
parents: options.parentFolderId ? [options.parentFolderId] : undefined
};
const media = {
mimeType: fileData.mimeType,
body: Buffer.from(fileData.data, 'base64')
};
const response = await drive.files.create({
resource: fileMetadata,
media: media,
fields: 'id,name,size,mimeType,createdTime,webViewLink'
});
return response.data;
}
// 批量上传到 Google Drive
const uploadResults = [];
for (const item of items) {
if (item.binary && item.binary.data) {
try {
const result = await uploadToGoogleDrive(item.binary.data, {
fileName: `processed_${item.binary.data.fileName}`,
parentFolderId: process.env.GOOGLE_DRIVE_FOLDER_ID
});
uploadResults.push({
json: {
success: true,
fileId: result.id,
fileName: result.name,
size: result.size,
webViewLink: result.webViewLink
}
});
} catch (error) {
uploadResults.push({
json: {
success: false,
error: error.message,
fileName: item.binary.data.fileName
}
});
}
}
}
return uploadResults;
文件监控和管理
文件变化监控
javascript
// 文件系统监控
const chokidar = require('chokidar');
/**
 * Starts a chokidar watcher and records add/change/unlink events in memory.
 *
 * @param {*} watchPath - Path(s) handed to `chokidar.watch`.
 * @param {{ignored?: *, ignoreInitial?: boolean}} [options] - `ignored`
 *   defaults to a pattern matching dot-prefixed path segments; the initial
 *   scan is ignored unless `ignoreInitial` is explicitly false.
 * @returns {{watcher: *, getEvents: Function, clearEvents: Function,
 *   close: Function}} The watcher plus accessors for the recorded events.
 *   `getEvents` returns the live internal array.
 */
function setupFileWatcher(watchPath, options = {}) {
  const recorded = [];

  // Builds a listener that appends an event of the given type.
  const recordAs = (type) => (path) => {
    recorded.push({
      type,
      path: path,
      timestamp: new Date().toISOString()
    });
  };

  const watcherOptions = {
    ignored: options.ignored || /[\/\\]\./,
    persistent: true,
    ignoreInitial: options.ignoreInitial !== false
  };
  const watcher = chokidar.watch(watchPath, watcherOptions);

  watcher
    .on('add', recordAs('file_added'))
    .on('change', recordAs('file_changed'))
    .on('unlink', recordAs('file_deleted'));

  return {
    watcher,
    getEvents() {
      return recorded;
    },
    clearEvents() {
      // Truncate in place so references returned by getEvents stay valid.
      return recorded.length = 0;
    },
    close() {
      return watcher.close();
    }
  };
}
文件清理任务
javascript
/**
 * Deletes files directly inside `directory` that are too old, too large or
 * whose name matches a pattern. Sub-directories are neither scanned nor
 * deleted.
 *
 * NOTE(review): the three criteria are combined with OR, so a file matching
 * `pattern` is deleted regardless of its age or size — confirm this is the
 * intended policy before pointing this at a directory with live data.
 *
 * @param {string} directory - Directory to clean (non-recursive).
 * @param {{maxAge?: number, maxSize?: number, pattern?: RegExp}} [options] -
 *   `maxAge` in milliseconds (default 30 days), `maxSize` in bytes (default
 *   1 GiB) and `pattern` tested against the file name (default
 *   tmp/log/cache extensions).
 * @returns {Promise<{scanned: number, deleted: number, totalSize: number,
 *   freedSpace: number, errors: Array<Object>}>} Counters for the regular
 *   files examined and removed. Failures are collected in `errors` rather
 *   than thrown.
 */
async function cleanupOldFiles(directory, options = {}) {
  const fs = require('fs').promises;
  const path = require('path');
  const maxAge = options.maxAge || 30 * 24 * 60 * 60 * 1000; // 30 days
  const maxSize = options.maxSize || 1024 * 1024 * 1024; // 1 GiB
  const requestedPattern = options.pattern || /\.(tmp|log|cache)$/;
  // A regex with the `g` or `y` flag keeps `lastIndex` between `.test`
  // calls, which would make matches depend on the previously tested name.
  // Use a copy without those flags.
  const filePattern = requestedPattern instanceof RegExp
    ? new RegExp(requestedPattern.source, requestedPattern.flags.replace(/[gy]/g, ''))
    : requestedPattern;
  const now = Date.now();
  const results = {
    scanned: 0,
    deleted: 0,
    totalSize: 0,
    freedSpace: 0,
    errors: []
  };
  try {
    const files = await fs.readdir(directory);
    for (const file of files) {
      const filePath = path.join(directory, file);
      try {
        const stats = await fs.stat(filePath);
        // Only regular files are candidates: `unlink` cannot remove a
        // directory, and directory entries would skew the size totals.
        if (!stats.isFile()) {
          continue;
        }
        results.scanned++;
        results.totalSize += stats.size;
        const shouldDelete =
          (now - stats.mtime.getTime() > maxAge) ||
          (stats.size > maxSize) ||
          (filePattern.test(file));
        if (shouldDelete) {
          await fs.unlink(filePath);
          results.deleted++;
          results.freedSpace += stats.size;
        }
      } catch (error) {
        // Record the failure for this file and continue with the next one.
        results.errors.push({
          file: filePath,
          error: error.message
        });
      }
    }
  } catch (error) {
    // The directory itself could not be listed.
    results.errors.push({
      directory: directory,
      error: error.message
    });
  }
  return results;
}
// 执行清理任务
const cleanupResult = await cleanupOldFiles('/tmp/n8n-files', {
maxAge: 7 * 24 * 60 * 60 * 1000, // 7天
pattern: /\.(tmp|temp|cache)$/
});
return [{ json: cleanupResult }];
小结
文件处理是 n8n 工作流的重要功能:
- 理解二进制数据:掌握文件在 n8n 中的存储和传递方式
- 选择合适的处理方式:根据文件类型选择对应的处理库
- 云存储集成:利用云服务实现文件的持久化存储
- 性能优化:合理处理大文件,避免内存溢出
- 错误处理:完善的异常处理和恢复机制
下一篇文章,我们将学习邮件与通知系统,这是自动化工作流中重要的沟通环节。
记住,文件处理涉及大量的 I/O 操作和内存使用,要特别注意性能优化和资源管理。对于大文件,考虑使用流式处理或分块处理的方式。