feat: support NPU auto scaling (#197)

* feat: NPU dynamic scheduling

* feat: dataset pagination optimization

* feat: support NPU auto scaling

* feat: support NPU auto scaling

* feat: support NPU auto scaling

* feat: clean code
hhhhsc701
2025-12-24 18:03:30 +08:00
committed by GitHub
parent de7f853c83
commit 1c507ac98a
6 changed files with 239 additions and 91 deletions

View File

@@ -20,12 +20,19 @@ import com.datamate.datamanagement.common.enums.DatasetType;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
import com.datamate.operator.domain.repository.OperatorRepository;
import com.datamate.operator.infrastructure.exception.OperatorErrorCode;
import com.datamate.operator.interfaces.dto.OperatorDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.yaml.snakeyaml.DumperOptions;
@@ -39,6 +46,8 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -52,6 +61,8 @@ public class CleaningTaskService {
private final OperatorInstanceRepository operatorInstanceRepo;
private final OperatorRepository operatorRepo;
private final CleaningResultRepository cleaningResultRepo;
private final CleaningTaskScheduler taskScheduler;
@@ -66,11 +77,16 @@ public class CleaningTaskService {
private final String FLOW_PATH = "/flow";
private final Pattern LEVEL_PATTERN = Pattern.compile(
"\\b(TRACE|DEBUG|INFO|WARN|WARNING|ERROR|FATAL)\\b",
Pattern.CASE_INSENSITIVE
);
private static final Pattern STANDARD_LEVEL_PATTERN = Pattern.compile(
"\\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\\b"
);
private static final Pattern EXCEPTION_SUFFIX_PATTERN = Pattern.compile(
"\\b\\w+(Warning|Error|Exception)\\b"
);
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public List<CleaningTaskDto> getTasks(String status, String keywords, Integer page, Integer size) {
List<CleaningTaskDto> tasks = cleaningTaskRepo.findTasks(status, keywords, page, size);
tasks.forEach(this::setProcess);
@@ -133,6 +149,7 @@ public class CleaningTaskService {
}
public List<CleaningTaskLog> getTaskLog(String taskId) {
cleanTaskValidator.checkTaskId(taskId);
String logPath = FLOW_PATH + "/" + taskId + "/output.log";
try (Stream<String> lines = Files.lines(Paths.get(logPath))) {
List<CleaningTaskLog> logs = new ArrayList<>();
@@ -156,18 +173,31 @@ public class CleaningTaskService {
return defaultLevel;
}
Matcher matcher = LEVEL_PATTERN.matcher(logLine);
if (matcher.find()) {
return matcher.group(1).toUpperCase();
}
Matcher stdMatcher = STANDARD_LEVEL_PATTERN.matcher(logLine);
if (stdMatcher.find()) {
return stdMatcher.group(1).toUpperCase();
}
Matcher exMatcher = EXCEPTION_SUFFIX_PATTERN.matcher(logLine);
if (exMatcher.find()) {
String match = exMatcher.group(1).toUpperCase();
if ("WARNING".equals(match)) return "WARN";
if ("ERROR".equals(match) || "EXCEPTION".equals(match)) return "ERROR";
}
return defaultLevel;
}
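
Review note: the level parser above is now two-stage: first an exact-cased standard token, then a fallback on *Warning/*Error/*Exception class-name suffixes. A standalone sketch (class and helper names hypothetical; the patterns are copied from the diff above) of how typical lines should classify:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LevelDetectionSketch {
    private static final Pattern STANDARD_LEVEL_PATTERN = Pattern.compile(
            "\\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\\b");
    private static final Pattern EXCEPTION_SUFFIX_PATTERN = Pattern.compile(
            "\\b\\w+(Warning|Error|Exception)\\b");

    static String detectLevel(String logLine, String defaultLevel) {
        Matcher std = STANDARD_LEVEL_PATTERN.matcher(logLine);
        if (std.find()) {
            return std.group(1).toUpperCase();
        }
        Matcher ex = EXCEPTION_SUFFIX_PATTERN.matcher(logLine);
        if (ex.find()) {
            String match = ex.group(1).toUpperCase();
            if ("WARNING".equals(match)) return "WARN";
            if ("ERROR".equals(match) || "EXCEPTION".equals(match)) return "ERROR";
        }
        return defaultLevel;
    }

    public static void main(String[] args) {
        System.out.println(detectLevel("2025-12-24 WARNING disk almost full", "INFO"));  // WARNING
        System.out.println(detectLevel("java.lang.NullPointerException: null", "INFO")); // ERROR (suffix rule)
        System.out.println(detectLevel("DeprecationWarning: old API", "INFO"));          // WARN (suffix rule)
        // Lowercase "info" matched the old case-insensitive LEVEL_PATTERN but is
        // intentionally ignored now, so ordinary prose no longer masquerades as a level.
        System.out.println(detectLevel("see info in the wiki", "INFO"));                 // INFO (default)
    }
}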
@Transactional
public void deleteTask(String taskId) {
cleanTaskValidator.checkTaskId(taskId);
cleaningTaskRepo.deleteTaskById(taskId);
operatorInstanceRepo.deleteByInstanceId(taskId);
cleaningResultRepo.deleteByInstanceId(taskId);
try {
FileUtils.deleteDirectory(new File(FLOW_PATH + "/" + taskId));
} catch (IOException e) {
log.warn("Can't delete flow path with task id: {}.", taskId, e);
}
}
public void executeTask(String taskId) {
@@ -180,6 +210,11 @@ public class CleaningTaskService {
}
private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances) {
List<OperatorDto> allOperators = operatorRepo.findAllOperators();
Map<String, OperatorDto> defaultSettings = allOperators.stream()
.filter(operatorDto -> StringUtils.isNotBlank(operatorDto.getSettings()))
.collect(Collectors.toMap(OperatorDto::getId, Function.identity()));
TaskProcess process = new TaskProcess();
process.setInstanceId(task.getId());
process.setDatasetId(task.getDestDatasetId());
@@ -187,7 +222,14 @@ public class CleaningTaskService {
process.setExportPath(DATASET_PATH + "/" + task.getDestDatasetId());
process.setExecutorType(ExecutorType.DATAMATE.getValue());
process.setProcess(instances.stream()
.map(instance -> Map.of(instance.getId(), instance.getOverrides()))
.map(instance -> {
OperatorDto operatorDto = defaultSettings.get(instance.getId());
Map<String, Object> stringObjectMap = getDefaultValue(operatorDto);
stringObjectMap.putAll(instance.getOverrides());
Map<String, Object> runtime = getRuntime(operatorDto);
stringObjectMap.putAll(runtime);
return Map.of(instance.getId(), stringObjectMap);
})
.toList());
ObjectMapper jsonMapper = new ObjectMapper(new YAMLFactory());
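
Review note: the three putAll calls above establish a precedence when the same key appears in several places: operator defaults are applied first, then the instance's user overrides, then the operator's runtime block, so runtime wins on collisions. A minimal standalone sketch of that order (the keys threshold, lang, and np are invented for illustration):

import java.util.HashMap;
import java.util.Map;

public class SettingsMergeSketch {
    public static void main(String[] args) {
        Map<String, Object> effective = new HashMap<>();
        effective.putAll(Map.of("threshold", 0.5, "lang", "zh")); // getDefaultValue(): operator defaults
        effective.putAll(Map.of("threshold", 0.8));               // instance.getOverrides(): overrides beat defaults
        effective.putAll(Map.of("np", 4));                        // getRuntime(): runtime keys beat everything
        System.out.println(effective); // e.g. {np=4, threshold=0.8, lang=zh} (HashMap order varies)
    }
}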
@@ -210,67 +252,113 @@ public class CleaningTaskService {
}
}
private void scanDataset(String taskId, String srcDatasetId) {
    int pageNumber = 0;
    int pageSize = 500;
    PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
    PagedResponse<DatasetFile> datasetFiles;
    do {
        datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, null, pageRequest);
        if (datasetFiles.getContent().isEmpty()) {
            break;
        }
        List<Map<String, Object>> files = datasetFiles.getContent().stream()
                .map(content -> Map.of("fileName", (Object) content.getFileName(),
                        "fileSize", content.getFileSize(),
                        "filePath", content.getFilePath(),
                        "fileType", content.getFileType(),
                        "fileId", content.getId()))
                .toList();
        writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
        pageNumber += 1;
    } while (pageNumber < datasetFiles.getTotalPages());
}
private void scanDataset(String taskId, String srcDatasetId, Set<String> succeedFiles) {
    int pageNumber = 0;
    int pageSize = 500;
    PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
    PagedResponse<DatasetFile> datasetFiles;
    do {
        datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, null, pageRequest);
        if (datasetFiles.getContent().isEmpty()) {
            break;
        }
        List<Map<String, Object>> files = datasetFiles.getContent().stream()
                .filter(content -> !succeedFiles.contains(content.getId()))
                .map(content -> Map.of("fileName", (Object) content.getFileName(),
                        "fileSize", content.getFileSize(),
                        "filePath", content.getFilePath(),
                        "fileType", content.getFileType(),
                        "fileId", content.getId()))
                .toList();
        writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
        pageNumber += 1;
    } while (pageNumber < datasetFiles.getTotalPages());
}
private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String fileName) {
    ObjectMapper objectMapper = new ObjectMapper();
    try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName))) {
        if (!mapList.isEmpty()) { // guard against an empty list to avoid an exception
            String jsonString = objectMapper.writeValueAsString(mapList.getFirst());
            writer.write(jsonString);
            for (int i = 1; i < mapList.size(); i++) {
                writer.newLine();
                jsonString = objectMapper.writeValueAsString(mapList.get(i));
                writer.write(jsonString);
            }
        }
    } catch (IOException e) {
        log.error("Failed to prepare dataset.jsonl.", e);
        throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
    }
}
private Map<String, Object> getDefaultValue(OperatorDto operatorDto) {
    if (StringUtils.isBlank(operatorDto.getSettings())) {
        return new HashMap<>();
    }
    Map<String, Object> defaultSettings = new HashMap<>();
    try {
        Map<String, Map<String, Object>> settings = OBJECT_MAPPER.readValue(operatorDto.getSettings(), Map.class);
        for (Map.Entry<String, Map<String, Object>> entry : settings.entrySet()) {
            String key = entry.getKey();
            Map<String, Object> setting = entry.getValue();
            String type = setting.get("type").toString();
            switch (type) {
                case "slider":
                case "switch":
                case "select":
                case "input":
                case "radio":
                case "checkbox":
                    if (setting.containsKey("defaultVal")) {
                        defaultSettings.put(key, setting.get("defaultVal"));
                    }
                    break;
                case "range":
                    List<Object> rangeDefault = getRangeDefault(setting);
                    if (CollectionUtils.isNotEmpty(rangeDefault)) {
                        defaultSettings.put(key, rangeDefault);
                    }
                    break;
                default:
            }
        }
        return defaultSettings;
    } catch (JsonProcessingException e) {
        throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage());
    }
}
private List<Object> getRangeDefault(Map<String, Object> setting) {
    List<Object> defaultValue = new ArrayList<>();
    Object properties = setting.get("properties");
    if (properties instanceof List<?> list) {
        for (Object o : list) {
            Map<String, Object> map = OBJECT_MAPPER.convertValue(o, Map.class);
            if (map.containsKey("defaultVal")) {
                defaultValue.add(map.get("defaultVal"));
            }
        }
    }
    return defaultValue;
}
private Map<String, Object> getRuntime(OperatorDto operatorDto) {
    if (StringUtils.isBlank(operatorDto.getRuntime())) {
        return new HashMap<>();
    }
    try {
        return OBJECT_MAPPER.readValue(operatorDto.getRuntime(), Map.class);
    } catch (JsonProcessingException e) {
        throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage());
    }
}
private void scanDataset(String taskId, String srcDatasetId) {
    doScan(taskId, srcDatasetId, file -> true);
}
private void scanDataset(String taskId, String srcDatasetId, Set<String> succeedFiles) {
    doScan(taskId, srcDatasetId, file -> !succeedFiles.contains(file.getId()));
}
private void doScan(String taskId, String srcDatasetId, Predicate<DatasetFile> filterCondition) {
    cleanTaskValidator.checkTaskId(taskId);
    String targetFilePath = FLOW_PATH + "/" + taskId + "/dataset.jsonl";
    File targetFile = new File(targetFilePath);
    if (targetFile.getParentFile() != null && !targetFile.getParentFile().exists()) {
        targetFile.getParentFile().mkdirs();
    }
    int pageNumber = 0;
    int pageSize = 500;
    try (BufferedWriter writer = new BufferedWriter(new FileWriter(targetFile))) {
        PagedResponse<DatasetFile> datasetFiles;
        do {
            PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
            datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, null, pageRequest);
            if (datasetFiles.getContent().isEmpty()) {
                break;
            }
            for (DatasetFile content : datasetFiles.getContent()) {
                if (!filterCondition.test(content)) {
                    continue;
                }
                Map<String, Object> fileMap = Map.of(
                        "fileName", content.getFileName(),
                        "fileSize", content.getFileSize(),
                        "filePath", content.getFilePath(),
                        "fileType", content.getFileType(),
                        "fileId", content.getId()
                );
                writer.write(OBJECT_MAPPER.writeValueAsString(fileMap));
                writer.newLine();
            }
            pageNumber++;
        } while (pageNumber < datasetFiles.getTotalPages());
    } catch (IOException e) {
        log.error("Failed to write dataset.jsonl for taskId: {}", taskId, e);
        throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
    }
}
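
Review note: getDefaultValue walks a per-operator settings document keyed by control type. A self-contained sketch of the expected extraction against a hypothetical settings payload (the field names type, defaultVal, and properties come from the code above; the operator keys and the unhandled "textarea" type are invented):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DefaultSettingsSketch {
    public static void main(String[] args) throws Exception {
        String settingsJson = """
                {
                  "threshold": {"type": "slider", "defaultVal": 0.5},
                  "window":    {"type": "range", "properties": [
                      {"defaultVal": 10}, {"defaultVal": 20}]},
                  "comment":   {"type": "textarea"}
                }
                """;
        ObjectMapper mapper = new ObjectMapper();
        Map<String, Map<String, Object>> settings = mapper.readValue(settingsJson, Map.class);
        Map<String, Object> defaults = new HashMap<>();
        for (Map.Entry<String, Map<String, Object>> entry : settings.entrySet()) {
            Map<String, Object> setting = entry.getValue();
            switch (setting.get("type").toString()) {
                case "slider", "switch", "select", "input", "radio", "checkbox" -> {
                    if (setting.containsKey("defaultVal")) {
                        defaults.put(entry.getKey(), setting.get("defaultVal"));
                    }
                }
                case "range" -> {
                    // A range control carries one defaultVal per bound in "properties".
                    List<Object> range = new ArrayList<>();
                    if (setting.get("properties") instanceof List<?> list) {
                        for (Object o : list) {
                            Map<?, ?> p = mapper.convertValue(o, Map.class);
                            if (p.containsKey("defaultVal")) {
                                range.add(p.get("defaultVal"));
                            }
                        }
                    }
                    if (!range.isEmpty()) {
                        defaults.put(entry.getKey(), range);
                    }
                }
                default -> { /* unknown control types contribute no default */ }
            }
        }
        System.out.println(defaults); // e.g. {threshold=0.5, window=[10, 20]} (order varies)
    }
}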

View File

@@ -4,12 +4,14 @@ import com.datamate.cleaning.common.exception.CleanErrorCode;
import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;
@Component
@@ -17,6 +19,10 @@ import java.util.Locale;
public class CleanTaskValidator {
private final CleaningTaskRepository cleaningTaskRepo;
private final Pattern UUID_PATTERN = Pattern.compile(
"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
);
public void checkNameDuplication(String name) {
if (cleaningTaskRepo.isNameExist(name)) {
throw BusinessException.of(CleanErrorCode.DUPLICATE_TASK_NAME);
@@ -39,4 +45,10 @@ public class CleanTaskValidator {
front.getName(), back.getName()));
}
}
public void checkTaskId(String id) {
if (id == null || !UUID_PATTERN.matcher(id).matches()) {
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
}
}
}
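
Review note: because taskId is concatenated into FLOW_PATH before any file operation, rejecting everything that is not a canonical UUID also blocks path-traversal inputs. A standalone sketch of the rule (helper name hypothetical; the pattern is copied from the diff above):

import java.util.regex.Pattern;

public class TaskIdGuardSketch {
    private static final Pattern UUID_PATTERN = Pattern.compile(
            "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");

    static boolean isValidTaskId(String id) {
        return id != null && UUID_PATTERN.matcher(id).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidTaskId("123e4567-e89b-12d3-a456-426614174000")); // true
        System.out.println(isValidTaskId("../../etc/passwd"));                     // false: would escape FLOW_PATH
        System.out.println(isValidTaskId(null));                                   // false
    }
}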