From 5da992d312f546f2c59545e3078fe935287399e9 Mon Sep 17 00:00:00 2001 From: Vincent <84168298+szc0616@users.noreply.github.com> Date: Tue, 18 Nov 2025 09:24:07 +0800 Subject: [PATCH] =?UTF-8?q?feature=EF=BC=9A=E5=A2=9E=E5=8A=A0obs=E5=BD=92?= =?UTF-8?q?=E9=9B=86=E6=96=B9=E5=BC=8F=20(#90)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feature:实现通过datax进行obs归集的插件 * feature:前端增加obs归集时的前缀参数 --- .../DataCollection/Create/CreateTask.tsx | 7 + runtime/datax/obsreader/pom.xml | 88 ++++++++ .../obsreader/src/main/assembly/package.xml | 35 +++ .../plugin/reader/obsreader/ObsReader.java | 206 ++++++++++++++++++ .../obsreader/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 10 + runtime/datax/obswriter/pom.xml | 88 ++++++++ .../obswriter/src/main/assembly/package.xml | 35 +++ .../plugin/writer/obswriter/ObsWriter.java | 177 +++++++++++++++ .../obswriter/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 11 + runtime/datax/package.xml | 14 ++ runtime/datax/pom.xml | 10 +- 13 files changed, 692 insertions(+), 1 deletion(-) create mode 100644 runtime/datax/obsreader/pom.xml create mode 100644 runtime/datax/obsreader/src/main/assembly/package.xml create mode 100644 runtime/datax/obsreader/src/main/java/com/datamate/plugin/reader/obsreader/ObsReader.java create mode 100644 runtime/datax/obsreader/src/main/resources/plugin.json create mode 100644 runtime/datax/obsreader/src/main/resources/plugin_job_template.json create mode 100644 runtime/datax/obswriter/pom.xml create mode 100644 runtime/datax/obswriter/src/main/assembly/package.xml create mode 100644 runtime/datax/obswriter/src/main/java/com/datamate/plugin/writer/obswriter/ObsWriter.java create mode 100644 runtime/datax/obswriter/src/main/resources/plugin.json create mode 100644 runtime/datax/obswriter/src/main/resources/plugin_job_template.json diff --git a/frontend/src/pages/DataCollection/Create/CreateTask.tsx b/frontend/src/pages/DataCollection/Create/CreateTask.tsx index 8b23b6d..a14aa4f 100644 --- a/frontend/src/pages/DataCollection/Create/CreateTask.tsx +++ b/frontend/src/pages/DataCollection/Create/CreateTask.tsx @@ -314,6 +314,13 @@ export default function CollectionTaskCreate() { placeholder="Secret Key" /> + + + )} diff --git a/runtime/datax/obsreader/pom.xml b/runtime/datax/obsreader/pom.xml new file mode 100644 index 0000000..b5c4398 --- /dev/null +++ b/runtime/datax/obsreader/pom.xml @@ -0,0 +1,88 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + obsreader + + + 21 + 21 + UTF-8 + + + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + software.amazon.awssdk + s3 + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + diff --git a/runtime/datax/obsreader/src/main/assembly/package.xml b/runtime/datax/obsreader/src/main/assembly/package.xml new file mode 100644 index 0000000..61fa60b --- /dev/null +++ b/runtime/datax/obsreader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/obsreader + + + target/ + + obsreader-0.0.1-SNAPSHOT.jar + + plugin/reader/obsreader + + + + + + false + plugin/reader/obsreader/libs + runtime + + + diff --git a/runtime/datax/obsreader/src/main/java/com/datamate/plugin/reader/obsreader/ObsReader.java b/runtime/datax/obsreader/src/main/java/com/datamate/plugin/reader/obsreader/ObsReader.java new file mode 100644 index 0000000..7ca1131 --- /dev/null +++ b/runtime/datax/obsreader/src/main/java/com/datamate/plugin/reader/obsreader/ObsReader.java @@ -0,0 +1,206 @@ +package com.datamate.plugin.reader.obsreader; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.exception.CommonErrorCode; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.S3Configuration; + +public class ObsReader extends Reader { + + private static final Logger LOG = LoggerFactory.getLogger(ObsReader.class); + + public static class Job extends Reader.Job { + private Configuration jobConfig = null; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + } + + @Override + public void prepare() { + } + + @Override + public List split(int adviceNumber) { + return Collections.singletonList(this.jobConfig); + } + + @Override + public void post() { + } + + @Override + public void destroy() { + } + } + + public static class Task extends Reader.Task { + + private Configuration jobConfig; + private Set fileType; + private String endpoint; + private String accessKey; + private String secretKey; + private String bucket; + private String prefix; + private S3Client s3; + private String effectivePrefix; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class)); + this.endpoint = this.jobConfig.getString("endpoint"); + this.accessKey = this.jobConfig.getString("accessKey"); + this.secretKey = this.jobConfig.getString("secretKey"); + this.bucket = this.jobConfig.getString("bucket"); + this.prefix = this.jobConfig.getString("prefix"); + this.s3 = getS3Client(); + this.effectivePrefix = getEffectivePrefix(); + } + + @Override + public void startRead(RecordSender recordSender) { + try { + List files = listFiles().stream() + .filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file))) + .collect(Collectors.toList()); + files.forEach(filePath -> { + Record record = recordSender.createRecord(); + record.addColumn(new StringColumn(filePath)); + recordSender.sendToWriter(record); + }); + this.jobConfig.set("columnNumber", 1); + } catch (Exception e) { + LOG.error("Error reading files from obs file system: {}", this.endpoint, e); + throw new RuntimeException(e); + } + } + + /** + * 使用 AWS SDK v2 列举 S3/OBS 对象并将对象下载到 /dataset/local/。 + * 非递归:只列举 prefix 当前目录下的对象(通过 delimiter="/" 实现)。 + * 返回对象 key 列表(下载后文件名为 key 的最后一段)。 + */ + private List listFiles() throws Exception { + if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(bucket)) { + throw new IllegalArgumentException("endpoint and bucket must be provided"); + } + List keys = new ArrayList<>(); + String continuationToken = null; + try { + do { + ListObjectsV2Request.Builder reqBuilder = ListObjectsV2Request.builder() + .bucket(bucket) + .prefix(effectivePrefix) + .delimiter("/"); // 非递归:只列出当前目录下的对象(不下钻子目录) + if (continuationToken != null) reqBuilder.continuationToken(continuationToken); + ListObjectsV2Response res = s3.listObjectsV2(reqBuilder.build()); + for (S3Object obj : res.contents()) { + String key = obj.key(); + if (isInValid(key)) continue; + // 到此认为是“文件”key(且位于 prefix 当前目录) + keys.add(key); + } + continuationToken = res.isTruncated() ? res.nextContinuationToken() : null; + } while (continuationToken != null); + } catch (Exception e) { + LOG.warn("Failed to build S3 client or read object: {}", e.getMessage(), e); + // 保持原行为,对下载失败记录 warn,但不抛出新的运行时错误(外层会捕获) + } + return keys; + } + + private boolean isInValid(String key) { + // 仅接受以 effectivePrefix 开头的 key(请求通常已保证),并排除目录占位符 + if (!effectivePrefix.isEmpty() && !key.startsWith(effectivePrefix)) { + return true; + } + if (key.equals(effectivePrefix) || key.endsWith("/")) { + // 这是一个目录占位符或与 prefix 相同,跳过 + return true; + } + return false; + } + + private String getEffectivePrefix() { + // 规范化 prefix:去掉前导 '/',并确保以 '/' 结尾以表示目录前缀(如果非空) + String effectivePrefix = ""; + if (prefix != null) { + effectivePrefix = prefix.startsWith("/") ? prefix.substring(1) : prefix; + if (!effectivePrefix.isEmpty() && !effectivePrefix.endsWith("/")) { + effectivePrefix = effectivePrefix + "/"; + } + } + return effectivePrefix; + } + + private S3Client getS3Client() { + try { + AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey); + S3Configuration serviceConfig = S3Configuration.builder() + .pathStyleAccessEnabled(true) + .build(); + return S3Client.builder() + .endpointOverride(new URI(endpoint)) + .region(Region.of("us-east-1")) + .serviceConfiguration(serviceConfig) + .credentialsProvider(StaticCredentialsProvider.create(creds)) + .build(); + } catch (Exception e) { + LOG.error("Error init s3 client: {}", this.endpoint, e); + throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e); + } + } + + private String getFileSuffix(String key) { + String fileName = Paths.get(key).getFileName().toString(); + int lastDotIndex = fileName.lastIndexOf('.'); + if (lastDotIndex == -1 || lastDotIndex == fileName.length() - 1) { + return ""; + } + return fileName.substring(lastDotIndex + 1); + } + + @Override + public void destroy() { + if (s3 != null) { + try { + s3.close(); + } catch (Exception ignore) { + } + } + } + } +} diff --git a/runtime/datax/obsreader/src/main/resources/plugin.json b/runtime/datax/obsreader/src/main/resources/plugin.json new file mode 100644 index 0000000..d00a3e7 --- /dev/null +++ b/runtime/datax/obsreader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "obsreader", + "class": "com.datamate.plugin.reader.obsreader.ObsReader", + "description": "read from obs file system", + "developer": "datamate" +} diff --git a/runtime/datax/obsreader/src/main/resources/plugin_job_template.json b/runtime/datax/obsreader/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000..9a79273 --- /dev/null +++ b/runtime/datax/obsreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,10 @@ +{ + "name": "obsreader", + "parameter": { + "endpoint": "127.0.0.1", + "bucket": "test", + "accessKey": "ak-xxx", + "secretKey": "sk-xxx", + "prefix": "/test" + } +} diff --git a/runtime/datax/obswriter/pom.xml b/runtime/datax/obswriter/pom.xml new file mode 100644 index 0000000..69ec1a7 --- /dev/null +++ b/runtime/datax/obswriter/pom.xml @@ -0,0 +1,88 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + obswriter + + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + software.amazon.awssdk + s3 + + + + + 21 + 21 + UTF-8 + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + diff --git a/runtime/datax/obswriter/src/main/assembly/package.xml b/runtime/datax/obswriter/src/main/assembly/package.xml new file mode 100644 index 0000000..582c9ef --- /dev/null +++ b/runtime/datax/obswriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/obswriter + + + target/ + + obswriter-0.0.1-SNAPSHOT.jar + + plugin/writer/obswriter + + + + + + false + plugin/writer/obswriter/libs + runtime + + + diff --git a/runtime/datax/obswriter/src/main/java/com/datamate/plugin/writer/obswriter/ObsWriter.java b/runtime/datax/obswriter/src/main/java/com/datamate/plugin/writer/obswriter/ObsWriter.java new file mode 100644 index 0000000..1082a04 --- /dev/null +++ b/runtime/datax/obswriter/src/main/java/com/datamate/plugin/writer/obswriter/ObsWriter.java @@ -0,0 +1,177 @@ +// java +package com.datamate.plugin.writer.obswriter; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.exception.CommonErrorCode; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; + +public class ObsWriter extends Writer { + + private static final Logger LOG = LoggerFactory.getLogger(ObsWriter.class); + + public static class Job extends Writer.Job { + private Configuration jobConfig = null; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + } + + @Override + public void prepare() { + } + + @Override + public List split(int adviceNumber) { + return Collections.singletonList(this.jobConfig); + } + + @Override + public void post() { + } + + @Override + public void destroy() { + } + } + + public static class Task extends Writer.Task { + + private Configuration jobConfig; + private Set fileType; + private String endpoint; + private String accessKey; + private String secretKey; + private String bucket; + private String destPath; + private S3Client s3; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class)); + this.endpoint = this.jobConfig.getString("endpoint"); + this.accessKey = this.jobConfig.getString("accessKey"); + this.secretKey = this.jobConfig.getString("secretKey"); + this.bucket = this.jobConfig.getString("bucket"); + this.destPath = this.jobConfig.getString("destPath"); + this.s3 = getS3Client(); + } + + private S3Client getS3Client() { + try { + AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey); + S3Configuration serviceConfig = S3Configuration.builder() + .pathStyleAccessEnabled(true) + .build(); + return S3Client.builder() + .endpointOverride(new URI(endpoint)) + .region(Region.of("us-east-1")) + .serviceConfiguration(serviceConfig) + .credentialsProvider(StaticCredentialsProvider.create(creds)) + .build(); + } catch (Exception e) { + LOG.error("Error init s3 client: {}", this.endpoint, e); + throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e); + } + } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + try { + Record record; + while ((record = lineReceiver.getFromReader()) != null) { + String key = record.getColumn(0).asString(); + if (StringUtils.isBlank(key)) { + continue; + } + copyFileFromObs(key); + } + } catch (Exception e) { + LOG.error("Error reading files from obs file system: {}", this.endpoint, e); + throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e); + } + } + + private void copyFileFromObs(String key) throws IOException { + if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(bucket)) { + throw new IllegalArgumentException("endpoint and bucket must be provided"); + } + try { + // 确保目标目录存在 + Path targetDir = Paths.get(destPath); + try { + Files.createDirectories(targetDir); + } catch (IOException e) { + LOG.warn("Create dest dir {} failed: {}", targetDir, e.getMessage(), e); + } + + // 下载对象到本地目录,文件名取 key 最后一段 + String fileName = Paths.get(key).getFileName().toString(); + if (StringUtils.isBlank(fileName)) { + // 防护:无法解析文件名则跳过下载 + LOG.warn("Skip object with empty file name for key {}", key); + return; + } + Path target = targetDir.resolve(fileName); + try { + if (Files.exists(target)) { + Files.delete(target); + } + GetObjectRequest getReq = GetObjectRequest.builder() + .bucket(bucket) + .key(key) + .build(); + s3.getObject(getReq, ResponseTransformer.toFile(target)); + LOG.info("Downloaded obs object {} to {}", key, target.toString()); + } catch (Exception ex) { + LOG.warn("Failed to download object {}: {}", key, ex.getMessage(), ex); + } + } catch (Exception e) { + LOG.warn("Failed to build S3 client or download object {}: {}", key, e.getMessage(), e); + // 保持原行为,对下载失败记录 warn,但不抛出新的运行时错误(外层会捕获) + } + } + + @Override + public void destroy() { + if (s3 != null) { + try { + s3.close(); + } catch (Exception ignore) { + } + } + } + } +} diff --git a/runtime/datax/obswriter/src/main/resources/plugin.json b/runtime/datax/obswriter/src/main/resources/plugin.json new file mode 100644 index 0000000..ca4cc8e --- /dev/null +++ b/runtime/datax/obswriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "obswriter", + "class": "com.datamate.plugin.writer.obswriter.ObsWriter", + "description": "writer obs file to local", + "developer": "datamate" +} diff --git a/runtime/datax/obswriter/src/main/resources/plugin_job_template.json b/runtime/datax/obswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000..9952c0d --- /dev/null +++ b/runtime/datax/obswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,11 @@ +{ + "name": "obswriter", + "parameter": { + "endpoint": "127.0.0.1", + "bucket": "test", + "accessKey": "ak-xxx", + "secretKey": "sk-xxx", + "prefix": "/test", + "destPath": "/test" + } +} diff --git a/runtime/datax/package.xml b/runtime/datax/package.xml index 2d3d93e..a900328 100644 --- a/runtime/datax/package.xml +++ b/runtime/datax/package.xml @@ -581,5 +581,19 @@ datax + + obsreader/target/datax/ + + **/*.* + + datax + + + obswriter/target/datax/ + + **/*.* + + datax + diff --git a/runtime/datax/pom.xml b/runtime/datax/pom.xml index 939dc68..68e9ab1 100644 --- a/runtime/datax/pom.xml +++ b/runtime/datax/pom.xml @@ -33,6 +33,7 @@ 4.13.1 5.1.22-1 1.0.0 + 2.20.0 UTF-8 UTF-8 @@ -137,7 +138,9 @@ plugin-unstructured-storage-util gaussdbreader gaussdbwriter - + obsreader + obswriter + @@ -225,6 +228,11 @@ log4j-core 2.17.1 + + software.amazon.awssdk + s3 + ${amazon.awssdk-version} +