diff --git a/runtime/datax/package.xml b/runtime/datax/package.xml
index 2c58249..fd765f9 100644
--- a/runtime/datax/package.xml
+++ b/runtime/datax/package.xml
@@ -285,6 +285,13 @@
            <outputDirectory>datax</outputDirectory>
+        <fileSet>
+            <directory>s3reader/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
@@ -609,6 +616,13 @@
            <outputDirectory>datax</outputDirectory>
+        <fileSet>
+            <directory>s3writer/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
            <directory>obsreader/target/datax/</directory>
diff --git a/runtime/datax/pom.xml b/runtime/datax/pom.xml
index 2ba87fc..bd8b97d 100644
--- a/runtime/datax/pom.xml
+++ b/runtime/datax/pom.xml
@@ -87,6 +87,7 @@
        <module>nfsreader</module>
        <module>glusterfsreader</module>
        <module>localreader</module>
+        <module>s3reader</module>
        <module>mysqlwriter</module>
        <module>starrockswriter</module>
@@ -137,6 +138,7 @@
        <module>nfswriter</module>
        <module>glusterfswriter</module>
        <module>localwriter</module>
+        <module>s3writer</module>
        <module>plugin-rdbms-util</module>
        <module>plugin-unstructured-storage-util</module>
diff --git a/runtime/datax/s3reader/pom.xml b/runtime/datax/s3reader/pom.xml
new file mode 100644
index 0000000..5abfd93
--- /dev/null
+++ b/runtime/datax/s3reader/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>s3reader</artifactId>
+    <name>s3reader</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-core</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/java</directory>
+                <includes>
+                    <include>**/*.properties</include>
+                </includes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/runtime/datax/s3reader/src/main/assembly/package.xml b/runtime/datax/s3reader/src/main/assembly/package.xml
new file mode 100644
index 0000000..b6a9922
--- /dev/null
+++ b/runtime/datax/s3reader/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/reader/s3reader</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>s3reader-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/reader/s3reader</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/reader/s3reader/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/runtime/datax/s3reader/src/main/java/com/datamate/plugin/reader/s3reader/S3Reader.java b/runtime/datax/s3reader/src/main/java/com/datamate/plugin/reader/s3reader/S3Reader.java
new file mode 100644
index 0000000..2e85d7b
--- /dev/null
+++ b/runtime/datax/s3reader/src/main/java/com/datamate/plugin/reader/s3reader/S3Reader.java
@@ -0,0 +1,222 @@
+package com.datamate.plugin.reader.s3reader;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.S3Configuration;
+
+/**
+ * Reader for S3-compatible object storage.
+ * Supports custom S3-compatible stores such as MinIO and Ceph.
+ */
+public class S3Reader extends Reader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3Reader.class);
+
+ public static class Job extends Reader.Job {
+ private Configuration jobConfig = null;
+
+ @Override
+ public void init() {
+ this.jobConfig = super.getPluginJobConf();
+ }
+
+ @Override
+ public void prepare() {
+ String endpoint = this.jobConfig.getString("endpoint");
+ String bucket = this.jobConfig.getString("bucket");
+ String accessKey = this.jobConfig.getString("accessKey");
+ String secretKey = this.jobConfig.getString("secretKey");
+
+ if (StringUtils.isBlank(endpoint)) {
+ throw new RuntimeException("endpoint is required for s3reader");
+ }
+ if (StringUtils.isBlank(bucket)) {
+ throw new RuntimeException("bucket is required for s3reader");
+ }
+ if (StringUtils.isBlank(accessKey)) {
+ throw new RuntimeException("accessKey is required for s3reader");
+ }
+ if (StringUtils.isBlank(secretKey)) {
+ throw new RuntimeException("secretKey is required for s3reader");
+ }
+ }
+
+ @Override
+ public List<Configuration> split(int adviceNumber) {
+ return Collections.singletonList(this.jobConfig);
+ }
+
+ @Override
+ public void post() {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+ public static class Task extends Reader.Task {
+
+ private Configuration jobConfig;
+ private Set<String> fileType;
+ private String endpoint;
+ private String accessKey;
+ private String secretKey;
+ private String bucket;
+ private String prefix;
+ private String region;
+ private S3Client s3;
+ private String effectivePrefix;
+
+ @Override
+ public void init() {
+ this.jobConfig = super.getPluginJobConf();
+ this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
+ this.endpoint = this.jobConfig.getString("endpoint");
+ this.accessKey = this.jobConfig.getString("accessKey");
+ this.secretKey = this.jobConfig.getString("secretKey");
+ this.bucket = this.jobConfig.getString("bucket");
+ this.prefix = this.jobConfig.getString("prefix");
+ this.region = this.jobConfig.getString("region", "us-east-1");
+ this.s3 = getS3Client();
+ this.effectivePrefix = getEffectivePrefix();
+ }
+
+ @Override
+ public void startRead(RecordSender recordSender) {
+ try {
+ List<String> files = listFiles().stream()
+ .filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file)))
+ .collect(Collectors.toList());
+ files.forEach(filePath -> {
+ Record record = recordSender.createRecord();
+ record.addColumn(new StringColumn(filePath));
+ recordSender.sendToWriter(record);
+ });
+ this.jobConfig.set("columnNumber", 1);
+ } catch (Exception e) {
+ LOG.error("Error reading files from S3 compatible storage: {}", this.endpoint, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Lists S3 objects.
+ * Non-recursive: only objects directly under the prefix are listed (implemented via delimiter="/").
+ */
+ private List<String> listFiles() {
+ if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(bucket)) {
+ throw new IllegalArgumentException("endpoint and bucket must be provided");
+ }
+ List<String> keys = new ArrayList<>();
+ String continuationToken = null;
+ try {
+ do {
+ ListObjectsV2Request.Builder reqBuilder = ListObjectsV2Request.builder()
+ .bucket(bucket)
+ .prefix(effectivePrefix)
+ .delimiter("/");
+ if (continuationToken != null) {
+ reqBuilder.continuationToken(continuationToken);
+ }
+ ListObjectsV2Response res = s3.listObjectsV2(reqBuilder.build());
+ for (S3Object obj : res.contents()) {
+ String key = obj.key();
+ if (isInValid(key)) {
+ continue;
+ }
+ keys.add(key);
+ }
+ continuationToken = res.isTruncated() ? res.nextContinuationToken() : null;
+ } while (continuationToken != null);
+ } catch (Exception e) {
+ LOG.warn("Failed to list S3 objects: {}", e.getMessage(), e);
+ }
+ return keys;
+ }
+
+ private boolean isInValid(String key) {
+ if (!effectivePrefix.isEmpty() && !key.startsWith(effectivePrefix)) {
+ return true;
+ }
+ if (key.equals(effectivePrefix) || key.endsWith("/")) {
+ return true;
+ }
+ return false;
+ }
+
+ private String getEffectivePrefix() {
+ String effectivePrefix = "";
+ if (prefix != null) {
+ effectivePrefix = prefix.startsWith("/") ? prefix.substring(1) : prefix;
+ if (!effectivePrefix.isEmpty() && !effectivePrefix.endsWith("/")) {
+ effectivePrefix = effectivePrefix + "/";
+ }
+ }
+ return effectivePrefix;
+ }
+
+ private S3Client getS3Client() {
+ try {
+ AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey);
+ S3Configuration serviceConfig = S3Configuration.builder()
+ .pathStyleAccessEnabled(true)
+ .build();
+ return S3Client.builder()
+ .endpointOverride(new URI(endpoint))
+ .region(Region.of(region))
+ .serviceConfiguration(serviceConfig)
+ .credentialsProvider(StaticCredentialsProvider.create(creds))
+ .build();
+ } catch (Exception e) {
+ LOG.error("Error init S3 client: {}", this.endpoint, e);
+ throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
+ }
+ }
+
+ private String getFileSuffix(String key) {
+ String fileName = Paths.get(key).getFileName().toString();
+ int lastDotIndex = fileName.lastIndexOf('.');
+ if (lastDotIndex == -1 || lastDotIndex == fileName.length() - 1) {
+ return "";
+ }
+ return fileName.substring(lastDotIndex + 1);
+ }
+
+ @Override
+ public void destroy() {
+ if (s3 != null) {
+ try {
+ s3.close();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ }
+}
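A quick worked example of the listing semantics above, assuming a hypothetical bucket layout: with prefix "/test", getEffectivePrefix() normalizes it to "test/"; because the ListObjectsV2 request sets delimiter "/", only objects directly under that prefix appear in contents(), while anything in a sub-directory (e.g. test/sub/b.csv) surfaces only as a common prefix and never reaches the loop. Given the objects test/ (a zero-byte folder marker), test/a.csv and test/sub/b.csv, isInValid() drops the folder marker, so the reader emits exactly one record whose single StringColumn holds "test/a.csv"; the downstream s3writer then receives that key and downloads the object.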
diff --git a/runtime/datax/s3reader/src/main/resources/plugin.json b/runtime/datax/s3reader/src/main/resources/plugin.json
new file mode 100644
index 0000000..f73c711
--- /dev/null
+++ b/runtime/datax/s3reader/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "s3reader",
+ "class": "com.datamate.plugin.reader.s3reader.S3Reader",
+ "description": "read from S3 compatible object storage",
+ "developer": "datamate"
+}
diff --git a/runtime/datax/s3reader/src/main/resources/plugin_job_template.json b/runtime/datax/s3reader/src/main/resources/plugin_job_template.json
new file mode 100644
index 0000000..a22c0f9
--- /dev/null
+++ b/runtime/datax/s3reader/src/main/resources/plugin_job_template.json
@@ -0,0 +1,11 @@
+{
+ "name": "s3reader",
+ "parameter": {
+ "endpoint": "http://127.0.0.1:9000",
+ "bucket": "test-bucket",
+ "accessKey": "ak-xxx",
+ "secretKey": "sk-xxx",
+ "prefix": "/test",
+ "region": "us-east-1"
+ }
+}
diff --git a/runtime/datax/s3writer/pom.xml b/runtime/datax/s3writer/pom.xml
new file mode 100644
index 0000000..071093d
--- /dev/null
+++ b/runtime/datax/s3writer/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>s3writer</artifactId>
+    <name>s3writer</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-core</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/java</directory>
+                <includes>
+                    <include>**/*.properties</include>
+                </includes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/runtime/datax/s3writer/src/main/assembly/package.xml b/runtime/datax/s3writer/src/main/assembly/package.xml
new file mode 100644
index 0000000..baa0337
--- /dev/null
+++ b/runtime/datax/s3writer/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/s3writer</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>s3writer-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/s3writer</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/s3writer/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/runtime/datax/s3writer/src/main/java/com/datamate/plugin/writer/s3writer/S3Writer.java b/runtime/datax/s3writer/src/main/java/com/datamate/plugin/writer/s3writer/S3Writer.java
new file mode 100644
index 0000000..5b8e719
--- /dev/null
+++ b/runtime/datax/s3writer/src/main/java/com/datamate/plugin/writer/s3writer/S3Writer.java
@@ -0,0 +1,181 @@
+package com.datamate.plugin.writer.s3writer;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+
+/**
+ * Writer for S3-compatible object storage.
+ * Downloads files from S3-compatible storage into a local destination directory.
+ */
+public class S3Writer extends Writer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class);
+
+ public static class Job extends Writer.Job {
+ private Configuration jobConfig = null;
+
+ @Override
+ public void init() {
+ this.jobConfig = super.getPluginJobConf();
+ }
+
+ @Override
+ public void prepare() {
+ String destPath = this.jobConfig.getString("destPath");
+ if (StringUtils.isBlank(destPath)) {
+ throw new RuntimeException("destPath is required for s3writer");
+ }
+ try {
+ Files.createDirectories(Paths.get(destPath));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create destination directory: " + destPath, e);
+ }
+ }
+
+ @Override
+ public List<Configuration> split(int adviceNumber) {
+ return Collections.singletonList(this.jobConfig);
+ }
+
+ @Override
+ public void post() {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+ public static class Task extends Writer.Task {
+
+ private Configuration jobConfig;
+ private Set<String> fileType;
+ private String endpoint;
+ private String accessKey;
+ private String secretKey;
+ private String bucket;
+ private String destPath;
+ private String region;
+ private S3Client s3;
+
+ @Override
+ public void init() {
+ this.jobConfig = super.getPluginJobConf();
+ this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
+ this.endpoint = this.jobConfig.getString("endpoint");
+ this.accessKey = this.jobConfig.getString("accessKey");
+ this.secretKey = this.jobConfig.getString("secretKey");
+ this.bucket = this.jobConfig.getString("bucket");
+ this.destPath = this.jobConfig.getString("destPath");
+ this.region = this.jobConfig.getString("region", "us-east-1");
+ this.s3 = getS3Client();
+ }
+
+ private S3Client getS3Client() {
+ try {
+ AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey);
+ S3Configuration serviceConfig = S3Configuration.builder()
+ .pathStyleAccessEnabled(true)
+ .build();
+ return S3Client.builder()
+ .endpointOverride(new URI(endpoint))
+ .region(Region.of(region))
+ .serviceConfiguration(serviceConfig)
+ .credentialsProvider(StaticCredentialsProvider.create(creds))
+ .build();
+ } catch (Exception e) {
+ LOG.error("Error init S3 client: {}", this.endpoint, e);
+ throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
+ }
+ }
+
+ @Override
+ public void startWrite(RecordReceiver lineReceiver) {
+ try {
+ Record record;
+ while ((record = lineReceiver.getFromReader()) != null) {
+ String key = record.getColumn(0).asString();
+ if (StringUtils.isBlank(key)) {
+ continue;
+ }
+ copyFileFromS3(key);
+ }
+ } catch (Exception e) {
+ LOG.error("Error writing files from S3 compatible storage: {}", this.endpoint, e);
+ throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
+ }
+ }
+
+ private void copyFileFromS3(String key) throws IOException {
+ if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(bucket)) {
+ throw new IllegalArgumentException("endpoint and bucket must be provided");
+ }
+ try {
+ Path targetDir = Paths.get(destPath);
+ try {
+ Files.createDirectories(targetDir);
+ } catch (IOException e) {
+ LOG.warn("Create dest dir {} failed: {}", targetDir, e.getMessage(), e);
+ }
+
+ String fileName = Paths.get(key).getFileName().toString();
+ if (StringUtils.isBlank(fileName)) {
+ LOG.warn("Skip object with empty file name for key {}", key);
+ return;
+ }
+ Path target = targetDir.resolve(fileName);
+ try {
+ if (Files.exists(target)) {
+ Files.delete(target);
+ }
+ GetObjectRequest getReq = GetObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .build();
+ s3.getObject(getReq, ResponseTransformer.toFile(target));
+ LOG.info("Downloaded S3 object {} to {}", key, target.toString());
+ } catch (Exception ex) {
+ LOG.warn("Failed to download object {}: {}", key, ex.getMessage(), ex);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to download object {}: {}", key, e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ if (s3 != null) {
+ try {
+ s3.close();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ }
+}
diff --git a/runtime/datax/s3writer/src/main/resources/plugin.json b/runtime/datax/s3writer/src/main/resources/plugin.json
new file mode 100644
index 0000000..0f85224
--- /dev/null
+++ b/runtime/datax/s3writer/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "s3writer",
+ "class": "com.datamate.plugin.writer.s3writer.S3Writer",
+ "description": "write S3 compatible object storage files to local",
+ "developer": "datamate"
+}
diff --git a/runtime/datax/s3writer/src/main/resources/plugin_job_template.json b/runtime/datax/s3writer/src/main/resources/plugin_job_template.json
new file mode 100644
index 0000000..7c21749
--- /dev/null
+++ b/runtime/datax/s3writer/src/main/resources/plugin_job_template.json
@@ -0,0 +1,12 @@
+{
+ "name": "s3writer",
+ "parameter": {
+ "endpoint": "http://127.0.0.1:9000",
+ "bucket": "test-bucket",
+ "accessKey": "ak-xxx",
+ "secretKey": "sk-xxx",
+ "prefix": "/test",
+ "region": "us-east-1",
+ "destPath": "/data/dest"
+ }
+}
diff --git a/scripts/db/data-collection-init.sql b/scripts/db/data-collection-init.sql
index c8aa5cf..782bf75 100644
--- a/scripts/db/data-collection-init.sql
+++ b/scripts/db/data-collection-init.sql
@@ -78,4 +78,5 @@ VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台
('3', 'MYSQL归集模板', '将MYSQL数据库中的数据以csv文件的形式归集到DataMate平台上。', 'mysqlreader', 'mysqlreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
- ('6', '本地文件夹归集模板', '将本地文件系统上的文件归集到DataMate平台上。', 'localreader', 'localreader', 'localwriter', 'localwriter', '{"parameter": {"path": {"name": "源目录路径","description": "本地文件系统的源目录绝对路径。","type": "input", "required": true, "index": 1}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 2}}, "reader": {}, "writer": {}}', True, 'system', 'system');
+ ('6', '本地文件夹归集模板', '将本地文件系统上的文件归集到DataMate平台上。', 'localreader', 'localreader', 'localwriter', 'localwriter', '{"parameter": {"path": {"name": "源目录路径","description": "本地文件系统的源目录绝对路径。","type": "input", "required": true, "index": 1}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 2}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
+ ('7', 'S3兼容存储归集模板', '将S3兼容对象存储(如MinIO、Ceph等)上的文件归集到DataMate平台上。', 's3reader', 's3reader', 's3writer', 's3writer', '{"parameter": {"endpoint": {"name": "服务地址","description": "S3兼容存储的服务地址(如http://minio.example.com:9000)。","type": "input", "required": true, "index": 1}, "bucket": {"name": "存储桶名称","description": "S3存储桶名称。","type": "input", "required": true, "index": 2}, "accessKey": {"name": "Access Key","description": "S3访问密钥(Access Key ID)。","type": "input", "required": true, "index": 3}, "secretKey": {"name": "Secret Key","description": "S3密钥(Secret Access Key)。","type": "password", "required": true, "index": 4}, "prefix": {"name": "匹配前缀","description": "按照匹配前缀选中S3中的文件进行归集。","type": "input", "required": false, "index": 5}, "region": {"name": "区域","description": "S3区域(默认us-east-1)。","type": "input", "required": false, "index": 6}}, "reader": {}, "writer": {}}', True, 'system', 'system');
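For reference, a minimal DataX job that chains s3reader to s3writer might look like the sketch below; the endpoint, bucket, credentials, prefix, destPath and the optional fileType filter are placeholders consistent with the plugin job templates above, and the surrounding job/setting/content skeleton is the standard DataX job layout.

{
  "job": {
    "setting": { "speed": { "channel": 1 } },
    "content": [
      {
        "reader": {
          "name": "s3reader",
          "parameter": {
            "endpoint": "http://127.0.0.1:9000",
            "bucket": "test-bucket",
            "accessKey": "ak-xxx",
            "secretKey": "sk-xxx",
            "prefix": "/test",
            "region": "us-east-1",
            "fileType": ["csv", "json"]
          }
        },
        "writer": {
          "name": "s3writer",
          "parameter": {
            "endpoint": "http://127.0.0.1:9000",
            "bucket": "test-bucket",
            "accessKey": "ak-xxx",
            "secretKey": "sk-xxx",
            "region": "us-east-1",
            "destPath": "/data/dest"
          }
        }
      }
    ]
  }
}

The reader emits one record per matching object key; the writer downloads each key from the same bucket into destPath, so both sides must point at the same S3-compatible endpoint.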