You've already forked DataMate
refactor(utils): 重构文件流式分割上传功能
- 将 streamSplitAndUpload 函数拆分为独立的 processFileLines 函数 - 简化文件按行处理逻辑,移除冗余的行收集和缓存机制 - 优化并发上传实现,使用 Promise 集合管理上传任务 - 修复上传过程中断信号处理和错误传播机制 - 统一进度回调参数结构,改进字节和行数跟踪逻辑 - 优化空行跳过计数和上传结果返回值处理
This commit is contained in:
@@ -417,6 +417,63 @@ export interface StreamUploadResult {
|
||||
skippedEmptyCount: number;
|
||||
}
|
||||
|
||||
async function processFileLines(
|
||||
file: File,
|
||||
chunkSize: number,
|
||||
signal: AbortSignal | undefined,
|
||||
onLine?: (line: string, index: number) => Promise<void> | void,
|
||||
onProgress?: (currentBytes: number, totalBytes: number, processedLines: number) => void
|
||||
): Promise<{ lineCount: number; skippedEmptyCount: number }> {
|
||||
const fileSize = file.size;
|
||||
let offset = 0;
|
||||
let buffer = "";
|
||||
let skippedEmptyCount = 0;
|
||||
let lineIndex = 0;
|
||||
|
||||
while (offset < fileSize) {
|
||||
if (signal?.aborted) {
|
||||
throw new Error("Upload cancelled");
|
||||
}
|
||||
|
||||
const end = Math.min(offset + chunkSize, fileSize);
|
||||
const chunk = file.slice(offset, end);
|
||||
const text = await readFileAsText(chunk);
|
||||
const combined = buffer + text;
|
||||
const lines = combined.split(/\r?\n/);
|
||||
buffer = lines.pop() || "";
|
||||
|
||||
for (const line of lines) {
|
||||
if (signal?.aborted) {
|
||||
throw new Error("Upload cancelled");
|
||||
}
|
||||
if (!line.trim()) {
|
||||
skippedEmptyCount++;
|
||||
continue;
|
||||
}
|
||||
const currentIndex = lineIndex;
|
||||
lineIndex += 1;
|
||||
if (onLine) {
|
||||
await onLine(line, currentIndex);
|
||||
}
|
||||
}
|
||||
|
||||
offset = end;
|
||||
onProgress?.(offset, fileSize, lineIndex);
|
||||
}
|
||||
|
||||
if (buffer.trim()) {
|
||||
const currentIndex = lineIndex;
|
||||
lineIndex += 1;
|
||||
if (onLine) {
|
||||
await onLine(buffer, currentIndex);
|
||||
}
|
||||
} else if (buffer.length > 0) {
|
||||
skippedEmptyCount++;
|
||||
}
|
||||
|
||||
return { lineCount: lineIndex, skippedEmptyCount };
|
||||
}
|
||||
|
||||
export async function streamSplitAndUpload(
|
||||
file: File,
|
||||
uploadFn: (formData: FormData, config?: { onUploadProgress?: (e: { loaded: number; total: number }) => void }) => Promise<unknown>,
|
||||
@@ -435,11 +492,8 @@ export async function streamSplitAndUpload(
|
||||
} = options;
|
||||
|
||||
const fileSize = file.size;
|
||||
let offset = 0;
|
||||
let buffer = "";
|
||||
let uploadedCount = 0;
|
||||
let skippedEmptyCount = 0;
|
||||
let currentBytes = 0;
|
||||
|
||||
// 获取文件名基础部分和扩展名
|
||||
const originalFileName = fileNamePrefix || file.name;
|
||||
@@ -447,57 +501,11 @@ export async function streamSplitAndUpload(
|
||||
const baseName = lastDotIndex > 0 ? originalFileName.slice(0, lastDotIndex) : originalFileName;
|
||||
const fileExtension = lastDotIndex > 0 ? originalFileName.slice(lastDotIndex) : "";
|
||||
|
||||
// 收集所有需要上传的行
|
||||
const pendingLines: { line: string; index: number }[] = [];
|
||||
let lineIndex = 0;
|
||||
|
||||
// 逐块读取文件并收集非空行
|
||||
while (offset < fileSize) {
|
||||
// 检查是否已取消
|
||||
if (signal?.aborted) {
|
||||
throw new Error("Upload cancelled");
|
||||
}
|
||||
|
||||
const end = Math.min(offset + chunkSize, fileSize);
|
||||
const chunk = file.slice(offset, end);
|
||||
const text = await readFileAsText(chunk);
|
||||
|
||||
// 将新读取的内容追加到 buffer
|
||||
const combined = buffer + text;
|
||||
|
||||
// 按换行符分割(支持 \n 和 \r\n)
|
||||
const lines = combined.split(/\r?\n/);
|
||||
|
||||
// 保留最后一行(可能不完整)
|
||||
buffer = lines.pop() || "";
|
||||
|
||||
// 收集完整行(跳过空行)
|
||||
for (const line of lines) {
|
||||
if (signal?.aborted) {
|
||||
throw new Error("Upload cancelled");
|
||||
}
|
||||
if (!line.trim()) {
|
||||
skippedEmptyCount++;
|
||||
continue;
|
||||
}
|
||||
pendingLines.push({ line, index: lineIndex++ });
|
||||
}
|
||||
|
||||
currentBytes = end;
|
||||
offset = end;
|
||||
|
||||
// 每处理完一个 chunk,更新进度
|
||||
onProgress?.(currentBytes, fileSize, uploadedCount);
|
||||
}
|
||||
|
||||
// 处理最后剩余的 buffer(如果文件不以换行符结尾)
|
||||
if (buffer.trim()) {
|
||||
pendingLines.push({ line: buffer, index: lineIndex++ });
|
||||
} else if (buffer.length > 0) {
|
||||
skippedEmptyCount++;
|
||||
}
|
||||
|
||||
const totalFileNum = pendingLines.length;
|
||||
let resolvedReqId = initialReqId;
|
||||
if (!resolvedReqId) {
|
||||
const scanResult = await processFileLines(file, chunkSize, signal);
|
||||
const totalFileNum = scanResult.lineCount;
|
||||
skippedEmptyCount = scanResult.skippedEmptyCount;
|
||||
if (totalFileNum === 0) {
|
||||
return {
|
||||
uploadedCount: 0,
|
||||
@@ -505,13 +513,9 @@ export async function streamSplitAndUpload(
|
||||
skippedEmptyCount,
|
||||
};
|
||||
}
|
||||
|
||||
if (signal?.aborted) {
|
||||
throw new Error("Upload cancelled");
|
||||
}
|
||||
|
||||
let resolvedReqId = initialReqId;
|
||||
if (!resolvedReqId) {
|
||||
if (!resolveReqId) {
|
||||
throw new Error("Missing pre-upload request id");
|
||||
}
|
||||
@@ -521,6 +525,9 @@ export async function streamSplitAndUpload(
|
||||
}
|
||||
onReqIdResolved?.(resolvedReqId);
|
||||
}
|
||||
if (!resolvedReqId) {
|
||||
throw new Error("Missing pre-upload request id");
|
||||
}
|
||||
|
||||
/**
|
||||
* 上传单行内容
|
||||
@@ -573,55 +580,65 @@ export async function streamSplitAndUpload(
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 带并发控制的上传队列执行器
|
||||
* 使用任务队列模式,确保不会同时启动所有上传任务
|
||||
*/
|
||||
async function executeUploadsWithConcurrency(): Promise<void> {
|
||||
const lines = [...pendingLines];
|
||||
let currentIndex = 0;
|
||||
let activeCount = 0;
|
||||
let resolvedCount = 0;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
function tryStartNext() {
|
||||
// 检查是否已完成
|
||||
if (resolvedCount >= lines.length) {
|
||||
if (activeCount === 0) {
|
||||
resolve();
|
||||
const inFlight = new Set<Promise<void>>();
|
||||
let uploadError: unknown = null;
|
||||
const enqueueUpload = async (line: string, index: number) => {
|
||||
if (signal?.aborted) {
|
||||
throw new Error("Upload cancelled");
|
||||
}
|
||||
return;
|
||||
if (uploadError) {
|
||||
throw uploadError;
|
||||
}
|
||||
|
||||
// 启动新的上传任务,直到达到最大并发数
|
||||
while (activeCount < maxConcurrency && currentIndex < lines.length) {
|
||||
const { line, index } = lines[currentIndex++];
|
||||
activeCount++;
|
||||
|
||||
uploadLine(line, index)
|
||||
const uploadPromise = uploadLine(line, index)
|
||||
.then(() => {
|
||||
uploadedCount++;
|
||||
onProgress?.(fileSize, fileSize, uploadedCount);
|
||||
})
|
||||
.catch((err) => {
|
||||
reject(err);
|
||||
})
|
||||
.finally(() => {
|
||||
activeCount--;
|
||||
resolvedCount++;
|
||||
// 尝试启动下一个任务
|
||||
tryStartNext();
|
||||
uploadError = err;
|
||||
});
|
||||
inFlight.add(uploadPromise);
|
||||
uploadPromise.finally(() => inFlight.delete(uploadPromise));
|
||||
if (inFlight.size >= maxConcurrency) {
|
||||
await Promise.race(inFlight);
|
||||
if (uploadError) {
|
||||
throw uploadError;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let uploadResult: { lineCount: number; skippedEmptyCount: number } | null = null;
|
||||
try {
|
||||
uploadResult = await processFileLines(
|
||||
file,
|
||||
chunkSize,
|
||||
signal,
|
||||
enqueueUpload,
|
||||
(currentBytes, totalBytes) => {
|
||||
onProgress?.(currentBytes, totalBytes, uploadedCount);
|
||||
}
|
||||
);
|
||||
if (uploadError) {
|
||||
throw uploadError;
|
||||
}
|
||||
} finally {
|
||||
if (inFlight.size > 0) {
|
||||
await Promise.allSettled(inFlight);
|
||||
}
|
||||
}
|
||||
|
||||
// 开始执行
|
||||
tryStartNext();
|
||||
});
|
||||
if (!uploadResult || (initialReqId && uploadResult.lineCount === 0)) {
|
||||
return {
|
||||
uploadedCount: 0,
|
||||
totalBytes: fileSize,
|
||||
skippedEmptyCount: uploadResult?.skippedEmptyCount ?? 0,
|
||||
};
|
||||
}
|
||||
|
||||
// 使用并发控制执行所有上传
|
||||
await executeUploadsWithConcurrency();
|
||||
if (!initialReqId) {
|
||||
skippedEmptyCount = skippedEmptyCount || uploadResult.skippedEmptyCount;
|
||||
} else {
|
||||
skippedEmptyCount = uploadResult.skippedEmptyCount;
|
||||
}
|
||||
|
||||
return {
|
||||
uploadedCount,
|
||||
|
||||
Reference in New Issue
Block a user