Merge branch 'main' of github.com:ModelEngine-Group/DataMate
@@ -37,6 +37,7 @@
         <lombok-mapstruct-binding.version>0.2.0</lombok-mapstruct-binding.version>
         <poi.version>5.4.0</poi.version>
         <log4j2.version>2.21.1</log4j2.version>
+        <commons-compress.version>1.26.1</commons-compress.version>
     </properties>
 
     <modules>
@@ -151,6 +152,12 @@
                 <version>2.6.6</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-compress</artifactId>
+                <version>${commons-compress.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 
@@ -5,8 +5,10 @@ import com.datamate.common.domain.model.ChunkUploadPreRequest;
 import com.datamate.common.domain.model.FileUploadResult;
 import com.datamate.common.domain.service.FileService;
 import com.datamate.common.domain.utils.AnalyzerUtils;
+import com.datamate.common.domain.utils.ArchiveAnalyzer;
 import com.datamate.common.infrastructure.exception.BusinessAssert;
 import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.common.infrastructure.exception.CommonErrorCode;
 import com.datamate.common.infrastructure.exception.SystemErrorCode;
 import com.datamate.common.interfaces.PagedResponse;
 import com.datamate.common.interfaces.PagingQuery;
@@ -213,6 +215,9 @@ public class DatasetFileApplicationService {
      */
     @Transactional
     public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) {
+        if (Objects.isNull(datasetRepository.getById(datasetId))) {
+            throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
+        }
         ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build();
         request.setUploadPath(datasetBasePath + File.separator + datasetId);
         request.setTotalFileNum(chunkUploadRequest.getTotalFileNum());
@@ -225,7 +230,7 @@ public class DatasetFileApplicationService {
             String checkInfoJson = objectMapper.writeValueAsString(checkInfo);
             request.setCheckInfo(checkInfoJson);
         } catch (JsonProcessingException e) {
-            throw new IllegalArgumentException("Failed to serialize checkInfo to JSON", e);
+            log.warn("Failed to serialize checkInfo to JSON", e);
         }
         return fileService.preUpload(request);
     }
@@ -238,31 +243,54 @@ public class DatasetFileApplicationService {
     @Transactional
     public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) {
         FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest));
-        saveFileInfoToDb(uploadResult, uploadFileRequest, datasetId);
+        saveFileInfoToDb(uploadResult, datasetId);
     }
 
-    private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileRequest uploadFile, String datasetId) {
+    private void saveFileInfoToDb(FileUploadResult fileUploadResult, String datasetId) {
         if (Objects.isNull(fileUploadResult.getSavedFile())) {
             // 文件切片上传没有完成
             return;
         }
+        DatasetFileUploadCheckInfo checkInfo;
+        try {
+            ObjectMapper objectMapper = new ObjectMapper();
+            checkInfo = objectMapper.readValue(fileUploadResult.getCheckInfo(), DatasetFileUploadCheckInfo.class);
+            if (!Objects.equals(checkInfo.getDatasetId(), datasetId)) {
+                throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
+            }
+        } catch (IllegalArgumentException | JsonProcessingException e) {
+            log.warn("Failed to convert checkInfo to DatasetFileUploadCheckInfo", e);
+            throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
+        }
+        List<FileUploadResult> files;
+        if (checkInfo.isHasArchive() && AnalyzerUtils.isPackage(fileUploadResult.getSavedFile().getPath())) {
+            files = ArchiveAnalyzer.process(fileUploadResult);
+        } else {
+            files = Collections.singletonList(fileUploadResult);
+        }
+        addFileToDataset(datasetId, files);
+    }
+
+    private void addFileToDataset(String datasetId, List<FileUploadResult> unpacked) {
         Dataset dataset = datasetRepository.getById(datasetId);
-        File savedFile = fileUploadResult.getSavedFile();
-        LocalDateTime currentTime = LocalDateTime.now();
-        DatasetFile datasetFile = DatasetFile.builder()
+        dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
+        for (FileUploadResult file : unpacked) {
+            File savedFile = file.getSavedFile();
+            LocalDateTime currentTime = LocalDateTime.now();
+            DatasetFile datasetFile = DatasetFile.builder()
                 .id(UUID.randomUUID().toString())
                 .datasetId(datasetId)
                 .fileSize(savedFile.length())
                 .uploadTime(currentTime)
                 .lastAccessTime(currentTime)
-                .fileName(uploadFile.getFileName())
+                .fileName(file.getFileName())
                 .filePath(savedFile.getPath())
-                .fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
+                .fileType(AnalyzerUtils.getExtension(file.getFileName()))
                 .build();
-        dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
-        setDatasetFileId(datasetFile, dataset);
-        datasetFileRepository.saveOrUpdate(datasetFile);
-        dataset.addFile(datasetFile);
+            setDatasetFileId(datasetFile, dataset);
+            datasetFileRepository.saveOrUpdate(datasetFile);
+            dataset.addFile(datasetFile);
+        }
         dataset.active();
         datasetRepository.updateById(dataset);
     }
@@ -1,7 +1,8 @@
 package com.datamate.datamanagement.domain.model.dataset;
 
-import com.datamate.common.domain.model.UploadCheckInfo;
+import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 
 /**
@@ -9,7 +10,9 @@ import lombok.Setter;
  */
 @Getter
 @Setter
-public class DatasetFileUploadCheckInfo extends UploadCheckInfo {
+@NoArgsConstructor
+@AllArgsConstructor
+public class DatasetFileUploadCheckInfo {
     /** 数据集id */
     private String datasetId;
 
@@ -48,7 +48,6 @@
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-compress</artifactId>
-            <version>1.26.1</version>
         </dependency>
 
         <!-- 核心服务依赖 -->
@@ -42,5 +42,9 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-data-redis</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+        </dependency>
     </dependencies>
 </project>
@@ -1,7 +0,0 @@
-package com.datamate.common.domain.model;
-
-/**
- * 上传检查信息基类
- */
-public abstract class UploadCheckInfo {
-}
@@ -4,6 +4,8 @@ import com.datamate.common.domain.model.ChunkUploadPreRequest;
 import com.datamate.common.domain.model.ChunkUploadRequest;
 import com.datamate.common.domain.model.FileUploadResult;
 import com.datamate.common.domain.utils.ChunksSaver;
+import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.common.infrastructure.exception.CommonErrorCode;
 import com.datamate.common.infrastructure.mapper.ChunkUploadRequestMapper;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
@@ -46,7 +48,7 @@ public class FileService {
         uploadFileRequest.setFileSize(uploadFileRequest.getFile().getSize());
         ChunkUploadPreRequest preRequest = chunkUploadRequestMapper.findById(uploadFileRequest.getReqId());
         if (preRequest == null || preRequest.isUploadComplete() || preRequest.isRequestTimeout()) {
-            throw new IllegalArgumentException("预上传请求不存在");
+            throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
         }
         File savedFile;
         if (uploadFileRequest.getTotalChunkNum() > 1) {
@@ -55,7 +57,7 @@ public class FileService {
             savedFile = uploadFile(uploadFileRequest, preRequest);
         }
         if (chunkUploadRequestMapper.update(preRequest) == 0) {
-            throw new IllegalArgumentException("预上传请求不存在");
+            throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
        }
         boolean isFinish = Objects.equals(preRequest.getUploadedFileNum(), preRequest.getTotalFileNum());
         if (isFinish) {
@@ -37,4 +37,15 @@ public class AnalyzerUtils {
         }
         return filename.substring(firstDotIndex + 1).toLowerCase(Locale.ROOT);
     }
+
+    /**
+     * 判断是否为压缩包
+     *
+     * @param filePath 文件路径
+     * @return 返回信息
+     */
+    public static boolean isPackage(String filePath) {
+        String extension = getExtension(filePath);
+        return extension.toLowerCase(Locale.ROOT).equals(TYPE_ZIP) || extension.toLowerCase(Locale.ROOT).equals(TYPE_TAR_GZ);
+    }
 }
@@ -0,0 +1,192 @@
+package com.datamate.common.domain.utils;
+
+import com.datamate.common.domain.model.FileUploadResult;
+import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.common.infrastructure.exception.SystemErrorCode;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+
+/**
+ * Responsible for validating and unpacking archive files.
+ *
+ * @since 2023-11-17
+ */
+@Slf4j
+public class ArchiveAnalyzer {
+    private static final int DEFAULT_BUFFER_SIZE = 4096;
+
+    /**
+     * Process list.
+     *
+     * @param fileDto The uploaded file DTO
+     * @return the list
+     */
+    public static List<FileUploadResult> process(FileUploadResult fileDto) {
+        log.info("Start unpacking [{}]", fileDto.getFileName());
+        File file = fileDto.getSavedFile();
+        Path archivePath;
+        try {
+            archivePath = Paths.get(file.getCanonicalPath());
+        } catch (IOException e) {
+            log.error("Failed to get the archive file path.");
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Failed to get the archive file path.");
+        }
+
+        List<FileUploadResult> unpacked;
+        String extension = AnalyzerUtils.getExtension(fileDto.getFileName());
+        if (AnalyzerUtils.TYPE_ZIP.equalsIgnoreCase(extension)) {
+            log.info("ZIP unpacking [{}]", fileDto.getFileName());
+            unpacked = processZip(archivePath);
+            log.info("ZIP unpacking FINISHED [{}]", fileDto.getFileName());
+        } else if (AnalyzerUtils.TYPE_TAR_GZ.equalsIgnoreCase(extension)) {
+            unpacked = processTarGz(archivePath);
+        } else {
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Unrecognized archive format.");
+        }
+
+        if (!archivePath.toFile().delete()) {
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Unable to delete the archive file " + archivePath.toAbsolutePath());
+        }
+
+        log.info("Finished unpacking [{}]", fileDto.getFileName());
+        return unpacked;
+    }
+
+    private static List<FileUploadResult> processZip(Path archivePath) {
+        try (ArchiveInputStream<ZipArchiveEntry> inputStream = new ZipArchiveInputStream(
+                new BufferedInputStream(Files.newInputStream(archivePath)))) {
+            return unpackArchive(inputStream, archivePath);
+        } catch (IOException e) {
+            log.error("Failed to unpack zip archive:", e);
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Failed to unpack zip archive.");
+        }
+    }
+
+    private static List<FileUploadResult> processTarGz(Path archivePath) {
+        try (ArchiveInputStream<TarArchiveEntry> inputStream = new TarArchiveInputStream(
+                new GzipCompressorInputStream(new BufferedInputStream(Files.newInputStream(archivePath))),
+                StandardCharsets.UTF_8.toString())) {
+            return unpackArchive(inputStream, archivePath);
+        } catch (IOException e) {
+            log.error("Failed to unpack tar.gz archive:", e);
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Failed to unpack tar.gz archive.");
+        }
+    }
+
+    private static List<FileUploadResult> unpackArchive(ArchiveInputStream<?> archiveInputStream, Path archivePath) throws IOException {
+        List<FileUploadResult> unpacked = new ArrayList<>();
+        long unpackedSize = 0L;
+        try {
+            ArchiveEntry archiveEntry;
+            int entryCount = 0;
+            while ((archiveEntry = archiveInputStream.getNextEntry()) != null) {
+                if (isSymlink(archiveEntry)) {
+                    // 解压时跳过symlink文件
+                    continue;
+                }
+                entryCount++;
+                if (checkUnpackSizeAndFileSize(entryCount, unpacked) || checkVersionSize(unpackedSize, archiveEntry.getSize())) {
+                    break;
+                }
+                if (!archiveEntry.isDirectory()) {
+                    unpackedSize = addFileAndCountFileSize(archiveInputStream, archiveEntry, unpacked,
+                        unpackedSize, archivePath);
+                }
+            }
+        } catch (IOException e) {
+            unpacked.forEach(v -> deleteFile(v.getSavedFile()));
+            throw e;
+        }
+
+        return unpacked;
+    }
+
+    private static boolean checkVersionSize(long unpackedSize, long currFileSize) {
+        return false;
+    }
+
+    private static long addFileAndCountFileSize(ArchiveInputStream<?> archiveInputStream, ArchiveEntry archiveEntry,
+        List<FileUploadResult> unpacked, long unpackedSize, Path archivePath) throws IOException {
+        Optional<FileUploadResult> uploadFileDto = extractEntity(archiveInputStream, archiveEntry, archivePath);
+        long newSize = unpackedSize;
+        if (uploadFileDto.isPresent()) {
+            FileUploadResult dto = uploadFileDto.get();
+            unpacked.add(dto);
+            newSize += dto.getSavedFile().length();
+        }
+        return newSize;
+    }
+
+    private static boolean checkUnpackSizeAndFileSize(int entryCount, List<FileUploadResult> unpacked) {
+        return false;
+    }
+
+    private static Optional<FileUploadResult> extractEntity(ArchiveInputStream<?> archiveInputStream, ArchiveEntry archiveEntry, Path archivePath)
+        throws IOException {
+        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+        Path path = Paths.get(archivePath.getParent().toString(), archiveEntry.getName());
+        File file = path.toFile();
+        long fileSize = 0L;
+        String extension = AnalyzerUtils.getExtension(archiveEntry.getName());
+
+        long supportFileSize = 1024*1024*1024; // 上传大小暂定为1个G
+        try (OutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(file.toPath()))) {
+            int byteRead;
+            while ((byteRead = archiveInputStream.read(buffer)) != -1) {
+                outputStream.write(buffer, 0, byteRead);
+                fileSize += byteRead;
+                if (fileSize > supportFileSize) {
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            log.error("error happened while write entry to file system");
+            file.delete();
+            throw e;
+        }
+
+        if (fileSize > supportFileSize) {
+            // 文件大小超过限制,删除
+            log.info("file {} size exceeds limit", archiveEntry.getName());
+            file.delete();
+            return Optional.empty();
+        }
+        return Optional.of(FileUploadResult.builder().savedFile(file).fileName(CommonUtils.trimFilePath(archiveEntry.getName())).build());
+    }
+
+    private static void deleteFile(File file) {
+        Path fileToDeletePath = Paths.get(file.getPath());
+        if (Files.exists(fileToDeletePath)) {
+            try {
+                Files.delete(fileToDeletePath);
+            } catch (IOException e1) {
+                log.error("Failed to delete file.", e1);
+            }
+        }
+    }
+
+    private static boolean isSymlink(ArchiveEntry archiveEntry) {
+        if (archiveEntry instanceof TarArchiveEntry) {
+            return ((TarArchiveEntry) archiveEntry).isSymbolicLink();
+        }
+        return false;
+    }
+}
@@ -0,0 +1,17 @@
+package com.datamate.common.infrastructure.exception;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * CommonErrorCode
+ *
+ * @since 2025/12/5
+ */
+@Getter
+@AllArgsConstructor
+public enum CommonErrorCode implements ErrorCode{
+    PRE_UPLOAD_REQUEST_NOT_EXIST("common.0101", "预上传请求不存在");
+    private final String code;
+    private final String message;
+}
@@ -23,6 +23,13 @@ data:
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }
 
+       location /api/evaluation/ {
+           proxy_pass http://datamate-backend-python:18000/api/evaluation/;
+           proxy_set_header Host $host;
+           proxy_set_header X-Real-IP $remote_addr;
+           proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+       }
+
        location /api/annotation/ {
            proxy_pass http://datamate-backend-python:18000/api/annotation/;
            proxy_set_header Host $host;
@@ -91,6 +98,13 @@ data:
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }
 
+       location /api/evaluation/ {
+           proxy_pass http://datamate-backend-python:18000/api/evaluation/;
+           proxy_set_header Host $host;
+           proxy_set_header X-Real-IP $remote_addr;
+           proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+       }
+
        location /api/annotation/ {
            proxy_pass http://datamate-backend-python:18000/api/annotation/;
            proxy_set_header Host $host;
@@ -110,4 +124,4 @@ data:
            try_files $uri $uri/ /index.html;
        }
    }
 {{- end }}
@@ -95,6 +95,24 @@ backend:
     - name: operator-volume
       mountPath: /operators
 
+backend-python:
+  env:
+    - name: DB_PASSWORD
+      value: *dbPass
+  volumes:
+    - *datasetVolume
+    - *flowVolume
+    - *logVolume
+  volumeMounts:
+    - name: dataset-volume
+      mountPath: /dataset
+    - name: flow-volume
+      mountPath: /flow
+    - name: log-volume
+      mountPath: /var/log/datamate
+    - name: operator-volume
+      mountPath: /operators
+
 frontend:
   service:
     type: NodePort
@@ -31,6 +31,7 @@ export function useFileSliceUpload(
       controller,
       size: 0,
       updateEvent: detail.updateEvent,
+      hasArchive: detail.hasArchive,
     };
     taskListRef.current = [task, ...taskListRef.current];
 
@@ -112,6 +113,7 @@ export function useFileSliceUpload(
       totalFileNum: files.length,
       totalSize,
       datasetId: task.key,
+      hasArchive: task.hasArchive,
     });
 
     const newTask: TaskItem = {
@@ -222,7 +222,7 @@ export default function DatasetDetail() {
       <Tabs activeKey={activeTab} items={tabList} onChange={setActiveTab} />
       <div className="h-full overflow-auto">
         {activeTab === "overview" && (
-          <Overview dataset={dataset} filesOperation={filesOperation} />
+          <Overview dataset={dataset} filesOperation={filesOperation} fetchDataset={fetchDataset}/>
         )}
         {activeTab === "lineage" && <DataLineageFlow dataset={dataset} />}
         {activeTab === "quality" && <DataQuality />}
@@ -1,4 +1,4 @@
-import { Select, Input, Form, Radio, Modal, Button, UploadFile } from "antd";
+import { Select, Input, Form, Radio, Modal, Button, UploadFile, Switch } from "antd";
 import { InboxOutlined } from "@ant-design/icons";
 import { dataSourceOptions } from "../../dataset.const";
 import { Dataset, DataSource } from "../../dataset.model";
@@ -51,6 +51,7 @@ export default function ImportConfiguration({
           dataset,
           files: fileSliceList,
           updateEvent,
+          hasArchive: importConfig.hasArchive,
         },
       })
     );
@@ -195,29 +196,39 @@ export default function ImportConfiguration({
 
       {/* Local Upload Component */}
       {importConfig?.source === DataSource.UPLOAD && (
-        <Form.Item
-          label="上传文件"
-          name="files"
-          rules={[
-            {
-              required: true,
-              message: "请上传文件",
-            },
-          ]}
-        >
-          <Dragger
-            className="w-full"
-            onRemove={handleRemoveFile}
-            beforeUpload={handleBeforeUpload}
-            multiple
-          >
-            <p className="ant-upload-drag-icon">
-              <InboxOutlined />
-            </p>
-            <p className="ant-upload-text">本地文件上传</p>
-            <p className="ant-upload-hint">拖拽文件到此处或点击选择文件</p>
-          </Dragger>
-        </Form.Item>
+        <>
+          <Form.Item
+            label="自动解压上传的压缩包"
+            name="hasArchive"
+            valuePropName="checked"
+            initialValue={true}
+          >
+            <Switch />
+          </Form.Item>
+          <Form.Item
+            label="上传文件"
+            name="files"
+            rules={[
+              {
+                required: true,
+                message: "请上传文件",
+              },
+            ]}
+          >
+            <Dragger
+              className="w-full"
+              onRemove={handleRemoveFile}
+              beforeUpload={handleBeforeUpload}
+              multiple
+            >
+              <p className="ant-upload-drag-icon">
+                <InboxOutlined />
+              </p>
+              <p className="ant-upload-text">本地文件上传</p>
+              <p className="ant-upload-hint">拖拽文件到此处或点击选择文件</p>
+            </Dragger>
+          </Form.Item>
+        </>
       )}
 
       {/* Target Configuration */}
@@ -3,7 +3,7 @@ import { formatBytes, formatDateTime } from "@/utils/unit";
 import { Download, Trash2 } from "lucide-react";
 import { datasetTypeMap } from "../../dataset.const";
 
-export default function Overview({ dataset, filesOperation }) {
+export default function Overview({ dataset, filesOperation, fetchDataset }) {
   const {
     fileList,
     pagination,
@@ -73,6 +73,11 @@ export default function Overview({ dataset, filesOperation }) {
       label: "存储路径",
       children: dataset.targetLocation || "未知",
     },
+    {
+      key: "pvcName",
+      label: "存储名称",
+      children: dataset.pvcName || "未知",
+    },
     {
       key: "createdAt",
       label: "创建时间",
@@ -129,7 +134,11 @@ export default function Overview({ dataset, filesOperation }) {
           <Button
             size="small"
             type="link"
-            onClick={() => handleDeleteFile(record)}
+            onClick={async () => {
+                await handleDeleteFile(record);
+                fetchDataset()
+              }
+            }
           >
             删除
           </Button>
@@ -101,4 +101,5 @@ export interface TaskItem {
   cancelFn?: () => void;
   updateEvent?: string;
   size?: number;
+  hasArchive?: boolean;
 }