diff --git a/Makefile b/Makefile
index 09b5e16..b8f648e 100644
--- a/Makefile
+++ b/Makefile
@@ -181,7 +181,7 @@ build-%: %-docker-build
 	@:
 
 .PHONY: build
-build: database-docker-build backend-docker-build frontend-docker-build runtime-docker-build backend-python-docker-build
+build: database-docker-build gateway-docker-build backend-docker-build frontend-docker-build runtime-docker-build backend-python-docker-build
 
 # ========== Utility Targets ==========
 
diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java
index dc1071f..ca701c6 100644
--- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java
+++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java
@@ -20,12 +20,19 @@ import com.datamate.datamanagement.common.enums.DatasetType;
 import com.datamate.datamanagement.domain.model.dataset.Dataset;
 import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
 import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
+import com.datamate.operator.domain.repository.OperatorRepository;
+import com.datamate.operator.infrastructure.exception.OperatorErrorCode;
+import com.datamate.operator.interfaces.dto.OperatorDto;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.PropertyNamingStrategies;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.yaml.snakeyaml.DumperOptions;
@@ -39,6 +46,8 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -52,6 +61,8 @@ public class CleaningTaskService {
 
     private final OperatorInstanceRepository operatorInstanceRepo;
 
+    private final OperatorRepository operatorRepo;
+
     private final CleaningResultRepository cleaningResultRepo;
 
     private final CleaningTaskScheduler taskScheduler;
@@ -66,11 +77,16 @@ public class CleaningTaskService {
 
     private final String FLOW_PATH = "/flow";
 
-    private final Pattern LEVEL_PATTERN = Pattern.compile(
-            "\\b(TRACE|DEBUG|INFO|WARN|WARNING|ERROR|FATAL)\\b",
-            Pattern.CASE_INSENSITIVE
+    private static final Pattern STANDARD_LEVEL_PATTERN = Pattern.compile(
+            "\\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\\b"
     );
 
+    private static final Pattern EXCEPTION_SUFFIX_PATTERN = Pattern.compile(
+            "\\b\\w+(Warning|Error|Exception)\\b"
+    );
+
+    private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public List<CleaningTaskDto> getTasks(String status, String keywords, Integer page, Integer size) {
         List<CleaningTaskDto> tasks = cleaningTaskRepo.findTasks(status, keywords, page, size);
         tasks.forEach(this::setProcess);
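The two patterns above split level detection in two: an explicit, deliberately case-sensitive level token, and a fallback for exception or warning class names that never print a level. A minimal, self-contained sketch of how they are intended to combine (the helper class and the sample log lines are illustrative only, not code from the service):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: mirrors the two patterns declared above to show their precedence.
class LogLevelSketch {
    private static final Pattern STANDARD_LEVEL_PATTERN = Pattern.compile(
            "\\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\\b");
    private static final Pattern EXCEPTION_SUFFIX_PATTERN = Pattern.compile(
            "\\b\\w+(Warning|Error|Exception)\\b");

    static String classify(String logLine, String defaultLevel) {
        Matcher std = STANDARD_LEVEL_PATTERN.matcher(logLine);
        if (std.find()) {
            return std.group(1).toUpperCase();          // explicit level token wins
        }
        Matcher ex = EXCEPTION_SUFFIX_PATTERN.matcher(logLine);
        if (ex.find()) {
            String suffix = ex.group(1).toUpperCase();
            // *Warning maps to WARN, *Error and *Exception both map to ERROR
            return "WARNING".equals(suffix) ? "WARN" : "ERROR";
        }
        return defaultLevel;
    }

    public static void main(String[] args) {
        System.out.println(classify("2024-05-01 INFO scan finished", "INFO"));       // INFO
        System.out.println(classify("ZeroDivisionError: division by zero", "INFO")); // ERROR
        System.out.println(classify("DeprecationWarning: use X instead", "INFO"));   // WARN
        System.out.println(classify("plain progress output", "INFO"));               // INFO
    }
}
```

Because `\\bWarning\\b` requires a word boundary, it does not match inside `DeprecationWarning`; such lines fall through to the suffix pattern, which normalizes `*Warning` to WARN and `*Error`/`*Exception` to ERROR.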
@@ -133,6 +149,7 @@ public class CleaningTaskService {
     }
 
     public List<String> getTaskLog(String taskId) {
+        cleanTaskValidator.checkTaskId(taskId);
         String logPath = FLOW_PATH + "/" + taskId + "/output.log";
         try (Stream<String> lines = Files.lines(Paths.get(logPath))) {
             List<String> logs = new ArrayList<>();
@@ -156,18 +173,31 @@
             return defaultLevel;
         }
 
-        Matcher matcher = LEVEL_PATTERN.matcher(logLine);
-        if (matcher.find()) {
-            return matcher.group(1).toUpperCase();
+        Matcher stdMatcher = STANDARD_LEVEL_PATTERN.matcher(logLine);
+        if (stdMatcher.find()) {
+            return stdMatcher.group(1).toUpperCase();
+        }
+
+        Matcher exMatcher = EXCEPTION_SUFFIX_PATTERN.matcher(logLine);
+        if (exMatcher.find()) {
+            String match = exMatcher.group(1).toUpperCase();
+            if ("WARNING".equals(match)) return "WARN";
+            if ("ERROR".equals(match) || "EXCEPTION".equals(match)) return "ERROR";
         }
 
         return defaultLevel;
     }
 
     @Transactional
     public void deleteTask(String taskId) {
+        cleanTaskValidator.checkTaskId(taskId);
         cleaningTaskRepo.deleteTaskById(taskId);
         operatorInstanceRepo.deleteByInstanceId(taskId);
         cleaningResultRepo.deleteByInstanceId(taskId);
+        try {
+            FileUtils.deleteDirectory(new File(FLOW_PATH + "/" + taskId));
+        } catch (IOException e) {
+            log.warn("Can't delete flow path with task id: {}.", taskId, e);
+        }
     }
 
     public void executeTask(String taskId) {
@@ -180,6 +210,11 @@ public class CleaningTaskService {
     }
 
     private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances) {
+        List<OperatorDto> allOperators = operatorRepo.findAllOperators();
+        Map<String, OperatorDto> defaultSettings = allOperators.stream()
+                .filter(operatorDto -> StringUtils.isNotBlank(operatorDto.getSettings()))
+                .collect(Collectors.toMap(OperatorDto::getId, Function.identity()));
+
         TaskProcess process = new TaskProcess();
         process.setInstanceId(task.getId());
         process.setDatasetId(task.getDestDatasetId());
@@ -187,7 +222,14 @@ public class CleaningTaskService {
         process.setExportPath(DATASET_PATH + "/" + task.getDestDatasetId());
         process.setExecutorType(ExecutorType.DATAMATE.getValue());
         process.setProcess(instances.stream()
-                .map(instance -> Map.of(instance.getId(), instance.getOverrides()))
+                .map(instance -> {
+                    OperatorDto operatorDto = defaultSettings.get(instance.getId());
+                    Map<String, Object> stringObjectMap = getDefaultValue(operatorDto);
+                    stringObjectMap.putAll(instance.getOverrides());
+                    Map<String, Object> runtime = getRuntime(operatorDto);
+                    stringObjectMap.putAll(runtime);
+                    return Map.of(instance.getId(), stringObjectMap);
+                })
                 .toList());
 
         ObjectMapper jsonMapper = new ObjectMapper(new YAMLFactory());
setting.get("defaultVal")); + } + break; + case "range": + List rangeDefault = getRangeDefault(setting); + if (CollectionUtils.isNotEmpty(rangeDefault)) { + defaultSettings.put(key, rangeDefault); + } + break; + default: + } } - List> files = datasetFiles.getContent().stream() - .map(content -> Map.of("fileName", (Object) content.getFileName(), - "fileSize", content.getFileSize(), - "filePath", content.getFilePath(), - "fileType", content.getFileType(), - "fileId", content.getId())) - .toList(); - writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl"); - pageNumber += 1; - } while (pageNumber < datasetFiles.getTotalPages()); + return defaultSettings; + } catch (JsonProcessingException e) { + throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage()); + } + } + + private List getRangeDefault(Map setting) { + List defaultValue = new ArrayList<>(); + Object properties = setting.get("properties"); + if (properties instanceof List list) { + for (Object o : list) { + Map map = OBJECT_MAPPER.convertValue(o, Map.class); + if (map.containsKey("defaultVal")) { + defaultValue.add(map.get("defaultVal")); + } + } + } + return defaultValue; + } + + private Map getRuntime(OperatorDto operatorDto) { + if (StringUtils.isBlank(operatorDto.getRuntime())) { + return new HashMap<>(); + } + try { + return OBJECT_MAPPER.readValue(operatorDto.getRuntime(), Map.class); + } catch (JsonProcessingException e) { + throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage()); + } + } + + private void scanDataset(String taskId, String srcDatasetId) { + doScan(taskId, srcDatasetId, file -> true); } private void scanDataset(String taskId, String srcDatasetId, Set succeedFiles) { + doScan(taskId, srcDatasetId, file -> !succeedFiles.contains(file.getId())); + } + + private void doScan(String taskId, String srcDatasetId, Predicate filterCondition) { + cleanTaskValidator.checkTaskId(taskId); + String targetFilePath = FLOW_PATH + "/" + taskId + "/dataset.jsonl"; + File targetFile = new File(targetFilePath); + if (targetFile.getParentFile() != null && !targetFile.getParentFile().exists()) { + targetFile.getParentFile().mkdirs(); + } + int pageNumber = 0; int pageSize = 500; - PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize); - PagedResponse datasetFiles; - do { - datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null,null, pageRequest); - if (datasetFiles.getContent().isEmpty()) { - break; - } - List> files = datasetFiles.getContent().stream() - .filter(content -> !succeedFiles.contains(content.getId())) - .map(content -> Map.of("fileName", (Object) content.getFileName(), + try (BufferedWriter writer = new BufferedWriter(new FileWriter(targetFile))) { + PagedResponse datasetFiles; + do { + PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize); + datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, null, pageRequest); + if (datasetFiles.getContent().isEmpty()) { + break; + } + for (DatasetFile content : datasetFiles.getContent()) { + if (!filterCondition.test(content)) { + continue; + } + Map fileMap = Map.of( + "fileName", content.getFileName(), "fileSize", content.getFileSize(), "filePath", content.getFilePath(), "fileType", content.getFileType(), - "fileId", content.getId())) - .toList(); - writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl"); - pageNumber += 1; - } while (pageNumber < datasetFiles.getTotalPages()); - } - - private void writeListMapToJsonlFile(List> 
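For reference, the parameter resolution that `prepareTask` and `getDefaultValue` implement is: schema defaults first, then per-instance overrides, then the operator's runtime block. A self-contained sketch of that merge order follows; the settings JSON shape (a `type` plus `defaultVal` per field) mirrors the switch above, while the class name, field names and values are illustrative only, and the snippet assumes Java 17 with Jackson on the classpath.

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the defaults -> overrides -> runtime merge order applied per operator instance.
// The settings payload and all values below are hypothetical.
class OperatorParamMergeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String settingsJson = """
                {
                  "threshold": {"type": "slider", "defaultVal": 0.8},
                  "language":  {"type": "select", "defaultVal": "zh"}
                }
                """;

        // 1. Defaults pulled from the settings schema (only fields that declare defaultVal).
        Map<String, Map<String, Object>> settings =
                mapper.readValue(settingsJson, new TypeReference<Map<String, Map<String, Object>>>() {});
        Map<String, Object> effective = new LinkedHashMap<>();
        settings.forEach((key, field) -> {
            if (field.containsKey("defaultVal")) {
                effective.put(key, field.get("defaultVal"));
            }
        });

        // 2. Per-instance overrides win over defaults.
        effective.putAll(Map.of("threshold", 0.5));

        // 3. Runtime hints (e.g. resource requests) are applied last.
        Map<String, Object> runtime = Map.of("cpu", 1, "memory", "2G");
        effective.putAll(runtime);

        // threshold=0.5 (override wins), language=zh (default), plus cpu and memory from runtime
        System.out.println(effective);
    }
}
```

Overrides replace defaults key by key, and runtime entries are applied last, mirroring the `putAll` order in `prepareTask`, so a runtime key that collides with a setting name would win.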
diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/validator/CleanTaskValidator.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/validator/CleanTaskValidator.java
index 8732a3e..3033895 100644
--- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/validator/CleanTaskValidator.java
+++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/validator/CleanTaskValidator.java
@@ -4,12 +4,14 @@ import com.datamate.cleaning.common.exception.CleanErrorCode;
 import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
 import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
 import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.common.infrastructure.exception.SystemErrorCode;
 import lombok.RequiredArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.regex.Pattern;
 
 
 @Component
@@ -17,6 +19,10 @@ import java.util.Locale;
 public class CleanTaskValidator {
     private final CleaningTaskRepository cleaningTaskRepo;
 
+    private final Pattern UUID_PATTERN = Pattern.compile(
+            "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
+    );
+
     public void checkNameDuplication(String name) {
         if (cleaningTaskRepo.isNameExist(name)) {
             throw BusinessException.of(CleanErrorCode.DUPLICATE_TASK_NAME);
@@ -39,4 +45,10 @@ public class CleanTaskValidator {
                     front.getName(), back.getName()));
         }
     }
+
+    public void checkTaskId(String id) {
+        if (id == null || !UUID_PATTERN.matcher(id).matches()) {
+            throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
+        }
+    }
 }
diff --git a/deployment/helm/datamate/charts/ray-cluster/values.yaml b/deployment/helm/datamate/charts/ray-cluster/values.yaml
index 0a57504..58752d0 100644
--- a/deployment/helm/datamate/charts/ray-cluster/values.yaml
+++ b/deployment/helm/datamate/charts/ray-cluster/values.yaml
@@ -126,7 +126,7 @@ worker:
   groupName: workergroup
   replicas: 1
   minReplicas: 1
-  maxReplicas: 3
+  maxReplicas: 1
   labels: {}
   serviceAccountName: ""
   restartPolicy: ""
diff --git a/deployment/helm/datamate/values.yaml b/deployment/helm/datamate/values.yaml
index 2adf629..5290d6b 100644
--- a/deployment/helm/datamate/values.yaml
+++ b/deployment/helm/datamate/values.yaml
@@ -189,8 +189,20 @@ runtime:
 ray-cluster:
   enabled: true
   head:
+    enableInTreeAutoscaling: true
+    autoscalerOptions:
+      upscalingMode: Default
+      idleTimeoutSeconds: 60
+      imagePullPolicy: IfNotPresent
+      resources:
+        limits:
+          cpu: "500m"
+          memory: "512Mi"
+        requests:
+          cpu: "500m"
+          memory: "512Mi"
     rayStartParams:
-      num-cpus: '0'
+      num-cpus: "0"
     containerEnv:
       - name: RAY_DEDUP_LOGS
         value: "0"
@@ -206,6 +218,8 @@ ray-cluster:
         value: *dbPass
       - name: MYSQL_DATABASE
         value: "datamate"
+      - name: RAY_enable_autoscaler_v2
+        value: "1"
     resources:
       limits:
         cpu: "4"
@@ -283,3 +297,58 @@ ray-cluster:
         - mountPath: /usr/local/lib/ops/site-packages
           name: operator-volume
           subPath: site-packages
+  additionalWorkerGroups:
+    npuGroup:
+      disabled: false
+      replicas: 0
+      minReplicas: 0
+      maxReplicas: 8
+      rayStartParams:
+        resources: '"{\"npu\": 1}"'
+      containerEnv:
+        - name: RAY_DEDUP_LOGS
+          value: "0"
+        - name: RAY_TQDM_PATCH_PRINT
+          value: "0"
+        - name: MYSQL_HOST
+          value: "datamate-database"
+        - name: MYSQL_PORT
+          value: "3306"
+        - name: MYSQL_USER
+          value: "root"
+        - name: MYSQL_PASSWORD
+          value: *dbPass
+        - name: MYSQL_DATABASE
+          value: "datamate"
+        - name: POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+      resources:
+        limits:
+          cpu: "8"
+          memory: "64G"
+          huawei.com/Ascend910: 1
+        requests:
+          cpu: "1"
+          memory: "2G"
+          huawei.com/Ascend910: 1
+      volumes:
+        - *datasetVolume
+        - *flowVolume
+        - *logVolume
+        - *operatorVolume
+      volumeMounts:
+        - mountPath: /tmp/ray
+          name: log-volume
+          subPathExpr: ray/$(POD_NAME)
+        - mountPath: /dataset
+          name: dataset-volume
+        - mountPath: /flow
+          name: flow-volume
+        - mountPath: /opt/runtime/datamate/ops/user
+          name: operator-volume
+          subPath: extract
+        - mountPath: /usr/local/lib/ops/site-packages
+          name: operator-volume
+          subPath: site-packages
diff --git a/runtime/python-executor/datamate/core/dataset.py b/runtime/python-executor/datamate/core/dataset.py
index 1058ed2..16d45cd 100644
--- a/runtime/python-executor/datamate/core/dataset.py
+++ b/runtime/python-executor/datamate/core/dataset.py
@@ -22,17 +22,6 @@ from core.base_op import Filter as RELATIVE_Filter, Mapper as RELATIVE_Mapper, S
 rd.DataContext.get_current().enable_progress_bars = False
 
 
-def is_valid_path(item, dataset_dir):
-    full_path = os.path.abspath(os.path.join(dataset_dir, item))
-    return os.path.exists(full_path)
-
-
-def new_get_num_npus(init_kwargs):
-    if init_kwargs.get("accelerator", "cpu") != "npu":
-        return 0.0
-    return 0.1
-
-
 class Formatters(Enum):
     """
     抽取算子和落盘算子枚举类
     """
@@ -163,22 +152,19 @@ class RayDataset(BasicDataset):
         return res
 
     def _run_single_op(self, operators_cls, init_kwargs, **kwargs):
-
-        num_npus = new_get_num_npus(init_kwargs)
         max_actor_nums = os.getenv("MAX_ACTOR_NUMS", "20")
-        # 分辨是否是onnx算子,如果是需要限制Actor并发数量
-        if self._use_onnx_model(init_kwargs['op_name']):
-            max_actor_nums = 4
-
         resources = {}
-        if num_npus > 0:
-            resources["node_npu"] = 0.1
+        if init_kwargs.get("npu", 0) > 0:
+            resources["npu"] = init_kwargs.get("npu")
 
         if init_kwargs.get("arch", "arm").startswith("x86"):
             resources["arch"] = "x86"
 
+        cpu = init_kwargs.get("cpu", 0.05)
+        memory = init_kwargs.get("memory", None)
+
         kwargs.update({"ext_params": {}, "failed_reason": {}, "target_type": None})
         try:
             if issubclass(operators_cls, (Mapper, RELATIVE_Mapper)):
@@ -186,7 +172,8 @@
                     fn_constructor_kwargs=init_kwargs,
                     fn_kwargs=kwargs,
                     resources=resources,
-                    num_cpus=0.05,
+                    num_cpus=cpu,
+                    memory=memory,
                     compute=rd.ActorPoolStrategy(min_size=1,
                                                  max_size=int(max_actor_nums)))
 
@@ -195,7 +182,8 @@
                     fn_constructor_kwargs=init_kwargs,
                     fn_kwargs=kwargs,
                     resources=resources,
-                    num_cpus=0.05,
+                    num_cpus=cpu,
+                    memory=memory,
                     compute=rd.ActorPoolStrategy(min_size=1,
                                                  max_size=int(max_actor_nums)))
 
@@ -204,7 +192,8 @@
                     fn_constructor_kwargs=init_kwargs,
                     fn_kwargs=kwargs,
                     resources=resources,
-                    num_cpus=0.05,
+                    num_cpus=cpu,
+                    memory=memory,
                     compute=rd.ActorPoolStrategy(min_size=1,
                                                  max_size=int(max_actor_nums)))
             else:
@@ -214,13 +203,3 @@ class RayDataset(BasicDataset):
         except Exception as e:
             logger.error(e)
             raise Exception("Error! Ops Details:") from e
-
-    def _use_onnx_model(self, ops_name):
-        if ops_name in self.onnx_ops_name:
-            return True
-        return False
-
-    def _use_npu_model(self, ops_name):
-        if ops_name in self.npu_ops_name:
-            return True
-        return False