From a98eeb530f5a005f290a2b6cd741ff9e6a5d7f60 Mon Sep 17 00:00:00 2001
From: Jerry Yan <792602257@qq.com>
Date: Fri, 9 Jan 2026 14:25:59 +0800
Subject: [PATCH] s3-compatible-fs support

---
 runtime/datax/package.xml                       |  14 ++
 runtime/datax/pom.xml                           |   2 +
 runtime/datax/s3reader/pom.xml                  |  79 +++++++
 .../s3reader/src/main/assembly/package.xml      |  35 +++
 .../plugin/reader/s3reader/S3Reader.java        | 222 ++++++++++++++++++
 .../s3reader/src/main/resources/plugin.json     |   6 +
 .../main/resources/plugin_job_template.json     |  11 +
 runtime/datax/s3writer/pom.xml                  |  79 +++++++
 .../s3writer/src/main/assembly/package.xml      |  35 +++
 .../plugin/writer/s3writer/S3Writer.java        | 181 ++++++++++++++
 .../s3writer/src/main/resources/plugin.json     |   6 +
 .../main/resources/plugin_job_template.json     |  12 +
 scripts/db/data-collection-init.sql             |   3 +-
 13 files changed, 684 insertions(+), 1 deletion(-)
 create mode 100644 runtime/datax/s3reader/pom.xml
 create mode 100644 runtime/datax/s3reader/src/main/assembly/package.xml
 create mode 100644 runtime/datax/s3reader/src/main/java/com/datamate/plugin/reader/s3reader/S3Reader.java
 create mode 100644 runtime/datax/s3reader/src/main/resources/plugin.json
 create mode 100644 runtime/datax/s3reader/src/main/resources/plugin_job_template.json
 create mode 100644 runtime/datax/s3writer/pom.xml
 create mode 100644 runtime/datax/s3writer/src/main/assembly/package.xml
 create mode 100644 runtime/datax/s3writer/src/main/java/com/datamate/plugin/writer/s3writer/S3Writer.java
 create mode 100644 runtime/datax/s3writer/src/main/resources/plugin.json
 create mode 100644 runtime/datax/s3writer/src/main/resources/plugin_job_template.json

diff --git a/runtime/datax/package.xml b/runtime/datax/package.xml
index 2c58249..fd765f9 100644
--- a/runtime/datax/package.xml
+++ b/runtime/datax/package.xml
@@ -285,6 +285,13 @@
             <includes>
                 <include>**/*.*</include>
             </includes>
             <outputDirectory>datax</outputDirectory>
         </fileSet>
+        <fileSet>
+            <directory>s3reader/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
         <fileSet>
@@ -609,6 +616,13 @@
                 <include>**/*.*</include>
             </includes>
             <outputDirectory>datax</outputDirectory>
         </fileSet>
+        <fileSet>
+            <directory>s3writer/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
         <fileSet>
             <directory>obsreader/target/datax/</directory>
diff --git a/runtime/datax/pom.xml b/runtime/datax/pom.xml
index 2ba87fc..bd8b97d 100644
--- a/runtime/datax/pom.xml
+++ b/runtime/datax/pom.xml
@@ -87,6 +87,7 @@
         <module>nfsreader</module>
         <module>glusterfsreader</module>
        <module>localreader</module>
+        <module>s3reader</module>
 
         <module>mysqlwriter</module>
         <module>starrockswriter</module>
@@ -137,6 +138,7 @@
         <module>nfswriter</module>
         <module>glusterfswriter</module>
         <module>localwriter</module>
+        <module>s3writer</module>
 
         <module>plugin-rdbms-util</module>
         <module>plugin-unstructured-storage-util</module>
diff --git a/runtime/datax/s3reader/pom.xml b/runtime/datax/s3reader/pom.xml
new file mode 100644
index 0000000..5abfd93
--- /dev/null
+++ b/runtime/datax/s3reader/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>s3reader</artifactId>
+    <name>s3reader</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-core</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/java</directory>
+                <includes>
+                    <include>**/*.properties</include>
+                </includes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/runtime/datax/s3reader/src/main/assembly/package.xml b/runtime/datax/s3reader/src/main/assembly/package.xml
new file mode 100644
index 0000000..b6a9922
--- /dev/null
+++ b/runtime/datax/s3reader/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/reader/s3reader</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>s3reader-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/reader/s3reader</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/reader/s3reader/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/runtime/datax/s3reader/src/main/java/com/datamate/plugin/reader/s3reader/S3Reader.java b/runtime/datax/s3reader/src/main/java/com/datamate/plugin/reader/s3reader/S3Reader.java
new file mode 100644
index 0000000..2e85d7b
--- /dev/null
+++ b/runtime/datax/s3reader/src/main/java/com/datamate/plugin/reader/s3reader/S3Reader.java
@@ -0,0 +1,222 @@
+package com.datamate.plugin.reader.s3reader;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.S3Configuration;
+
+/**
+ * Reader for S3-compatible object storage.
+ * Supports custom S3-compatible object stores (e.g. MinIO, Ceph).
+ */
+public class S3Reader extends Reader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3Reader.class);
+
+    public static class Job extends Reader.Job {
+        private Configuration jobConfig = null;
+
+        @Override
+        public void init() {
+            this.jobConfig = super.getPluginJobConf();
+        }
+
+        @Override
+        public void prepare() {
+            String endpoint = this.jobConfig.getString("endpoint");
+            String bucket = this.jobConfig.getString("bucket");
+            String accessKey = this.jobConfig.getString("accessKey");
+            String secretKey = this.jobConfig.getString("secretKey");
+
+            if (StringUtils.isBlank(endpoint)) {
+                throw new RuntimeException("endpoint is required for s3reader");
+            }
+            if (StringUtils.isBlank(bucket)) {
+                throw new RuntimeException("bucket is required for s3reader");
+            }
+            if (StringUtils.isBlank(accessKey)) {
+                throw new RuntimeException("accessKey is required for s3reader");
+            }
+            if (StringUtils.isBlank(secretKey)) {
+                throw new RuntimeException("secretKey is required for s3reader");
+            }
+        }
+
+        @Override
+        public List<Configuration> split(int adviceNumber) {
+            return Collections.singletonList(this.jobConfig);
+        }
+
+        @Override
+        public void post() {
+        }
+
+        @Override
+        public void destroy() {
+        }
+    }
+
+    public static class Task extends Reader.Task {
+
+        private Configuration jobConfig;
+        private Set<String> fileType;
+        private String endpoint;
+        private String accessKey;
+        private String secretKey;
+        private String bucket;
+        private String prefix;
+        private String region;
+        private S3Client s3;
+        private String effectivePrefix;
+
+        @Override
+        public void init() {
+            this.jobConfig = super.getPluginJobConf();
+            this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
+            this.endpoint = this.jobConfig.getString("endpoint");
this.jobConfig.getString("endpoint"); + this.accessKey = this.jobConfig.getString("accessKey"); + this.secretKey = this.jobConfig.getString("secretKey"); + this.bucket = this.jobConfig.getString("bucket"); + this.prefix = this.jobConfig.getString("prefix"); + this.region = this.jobConfig.getString("region", "us-east-1"); + this.s3 = getS3Client(); + this.effectivePrefix = getEffectivePrefix(); + } + + @Override + public void startRead(RecordSender recordSender) { + try { + List files = listFiles().stream() + .filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file))) + .collect(Collectors.toList()); + files.forEach(filePath -> { + Record record = recordSender.createRecord(); + record.addColumn(new StringColumn(filePath)); + recordSender.sendToWriter(record); + }); + this.jobConfig.set("columnNumber", 1); + } catch (Exception e) { + LOG.error("Error reading files from S3 compatible storage: {}", this.endpoint, e); + throw new RuntimeException(e); + } + } + + /** + * 列举 S3 对象 + * 非递归:只列举 prefix 当前目录下的对象(通过 delimiter="/" 实现) + */ + private List listFiles() { + if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(bucket)) { + throw new IllegalArgumentException("endpoint and bucket must be provided"); + } + List keys = new ArrayList<>(); + String continuationToken = null; + try { + do { + ListObjectsV2Request.Builder reqBuilder = ListObjectsV2Request.builder() + .bucket(bucket) + .prefix(effectivePrefix) + .delimiter("/"); + if (continuationToken != null) { + reqBuilder.continuationToken(continuationToken); + } + ListObjectsV2Response res = s3.listObjectsV2(reqBuilder.build()); + for (S3Object obj : res.contents()) { + String key = obj.key(); + if (isInValid(key)) { + continue; + } + keys.add(key); + } + continuationToken = res.isTruncated() ? res.nextContinuationToken() : null; + } while (continuationToken != null); + } catch (Exception e) { + LOG.warn("Failed to list S3 objects: {}", e.getMessage(), e); + } + return keys; + } + + private boolean isInValid(String key) { + if (!effectivePrefix.isEmpty() && !key.startsWith(effectivePrefix)) { + return true; + } + if (key.equals(effectivePrefix) || key.endsWith("/")) { + return true; + } + return false; + } + + private String getEffectivePrefix() { + String effectivePrefix = ""; + if (prefix != null) { + effectivePrefix = prefix.startsWith("/") ? 
+                if (!effectivePrefix.isEmpty() && !effectivePrefix.endsWith("/")) {
+                    effectivePrefix = effectivePrefix + "/";
+                }
+            }
+            return effectivePrefix;
+        }
+
+        private S3Client getS3Client() {
+            try {
+                AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey);
+                // Path-style access is required by most S3-compatible stores (e.g. MinIO).
+                S3Configuration serviceConfig = S3Configuration.builder()
+                        .pathStyleAccessEnabled(true)
+                        .build();
+                return S3Client.builder()
+                        .endpointOverride(new URI(endpoint))
+                        .region(Region.of(region))
+                        .serviceConfiguration(serviceConfig)
+                        .credentialsProvider(StaticCredentialsProvider.create(creds))
+                        .build();
+            } catch (Exception e) {
+                LOG.error("Error init S3 client: {}", this.endpoint, e);
+                throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
+            }
+        }
+
+        private String getFileSuffix(String key) {
+            String fileName = Paths.get(key).getFileName().toString();
+            int lastDotIndex = fileName.lastIndexOf('.');
+            if (lastDotIndex == -1 || lastDotIndex == fileName.length() - 1) {
+                return "";
+            }
+            return fileName.substring(lastDotIndex + 1);
+        }
+
+        @Override
+        public void destroy() {
+            if (s3 != null) {
+                try {
+                    s3.close();
+                } catch (Exception ignore) {
+                }
+            }
+        }
+    }
+}
diff --git a/runtime/datax/s3reader/src/main/resources/plugin.json b/runtime/datax/s3reader/src/main/resources/plugin.json
new file mode 100644
index 0000000..f73c711
--- /dev/null
+++ b/runtime/datax/s3reader/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+    "name": "s3reader",
+    "class": "com.datamate.plugin.reader.s3reader.S3Reader",
+    "description": "read from S3 compatible object storage",
+    "developer": "datamate"
+}
diff --git a/runtime/datax/s3reader/src/main/resources/plugin_job_template.json b/runtime/datax/s3reader/src/main/resources/plugin_job_template.json
new file mode 100644
index 0000000..a22c0f9
--- /dev/null
+++ b/runtime/datax/s3reader/src/main/resources/plugin_job_template.json
@@ -0,0 +1,11 @@
+{
+    "name": "s3reader",
+    "parameter": {
+        "endpoint": "http://127.0.0.1:9000",
+        "bucket": "test-bucket",
+        "accessKey": "ak-xxx",
+        "secretKey": "sk-xxx",
+        "prefix": "/test",
+        "region": "us-east-1"
+    }
+}
diff --git a/runtime/datax/s3writer/pom.xml b/runtime/datax/s3writer/pom.xml
new file mode 100644
index 0000000..071093d
--- /dev/null
+++ b/runtime/datax/s3writer/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>s3writer</artifactId>
+    <name>s3writer</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-core</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/java</directory>
+                <includes>
+                    <include>**/*.properties</include>
+                </includes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/runtime/datax/s3writer/src/main/assembly/package.xml b/runtime/datax/s3writer/src/main/assembly/package.xml
new file mode 100644
index 0000000..baa0337
--- /dev/null
+++ b/runtime/datax/s3writer/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/s3writer</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>s3writer-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/s3writer</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/s3writer/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/runtime/datax/s3writer/src/main/java/com/datamate/plugin/writer/s3writer/S3Writer.java b/runtime/datax/s3writer/src/main/java/com/datamate/plugin/writer/s3writer/S3Writer.java
new file mode 100644
index 0000000..5b8e719
--- /dev/null
+++ b/runtime/datax/s3writer/src/main/java/com/datamate/plugin/writer/s3writer/S3Writer.java
@@ -0,0 +1,181 @@
+package com.datamate.plugin.writer.s3writer;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+
+/**
+ * Writer for S3-compatible object storage.
+ * Downloads files from S3-compatible storage into a local destination directory.
+ */
+public class S3Writer extends Writer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class);
+
+    public static class Job extends Writer.Job {
+        private Configuration jobConfig = null;
+
+        @Override
+        public void init() {
+            this.jobConfig = super.getPluginJobConf();
+        }
+
+        @Override
+        public void prepare() {
+            String destPath = this.jobConfig.getString("destPath");
+            if (StringUtils.isBlank(destPath)) {
+                throw new RuntimeException("destPath is required for s3writer");
+            }
+            try {
+                Files.createDirectories(Paths.get(destPath));
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to create destination directory: " + destPath, e);
+            }
+        }
+
+        @Override
+        public List<Configuration> split(int adviceNumber) {
+            return Collections.singletonList(this.jobConfig);
+        }
+
+        @Override
+        public void post() {
+        }
+
+        @Override
+        public void destroy() {
+        }
+    }
+
+    public static class Task extends Writer.Task {
+
+        private Configuration jobConfig;
+        private Set<String> fileType;
+        private String endpoint;
+        private String accessKey;
+        private String secretKey;
+        private String bucket;
+        private String destPath;
+        private String region;
+        private S3Client s3;
+
+        @Override
+        public void init() {
+            this.jobConfig = super.getPluginJobConf();
+            this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
+            this.endpoint = this.jobConfig.getString("endpoint");
+            this.accessKey = this.jobConfig.getString("accessKey");
+            this.secretKey = this.jobConfig.getString("secretKey");
+            this.bucket = this.jobConfig.getString("bucket");
+            this.destPath = this.jobConfig.getString("destPath");
+            this.region = this.jobConfig.getString("region", "us-east-1");
+            this.s3 = getS3Client();
+        }
+
+        private S3Client getS3Client() {
+            try {
+                AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey);
+                // Path-style access is required by most S3-compatible stores (e.g. MinIO).
+                S3Configuration serviceConfig = S3Configuration.builder()
+                        .pathStyleAccessEnabled(true)
+                        .build();
+                return S3Client.builder()
+                        .endpointOverride(new URI(endpoint))
+                        .region(Region.of(region))
+                        .serviceConfiguration(serviceConfig)
+                        .credentialsProvider(StaticCredentialsProvider.create(creds))
+                        .build();
+            } catch (Exception e) {
+                LOG.error("Error init S3 client: {}", this.endpoint, e);
+                throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
+            }
+        }
+
+        @Override
+        public void startWrite(RecordReceiver lineReceiver) {
+            try {
+                Record record;
+                while ((record = lineReceiver.getFromReader()) != null) {
+                    String key = record.getColumn(0).asString();
+                    if (StringUtils.isBlank(key)) {
+                        continue;
+                    }
+                    copyFileFromS3(key);
+                }
+            } catch (Exception e) {
+                LOG.error("Error writing files from S3 compatible storage: {}", this.endpoint, e);
+                throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
+            }
+        }
+
+        private void copyFileFromS3(String key) throws IOException {
+            if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(bucket)) {
+                throw new IllegalArgumentException("endpoint and bucket must be provided");
+            }
+            Path targetDir = Paths.get(destPath);
+            try {
+                Files.createDirectories(targetDir);
+            } catch (IOException e) {
+                LOG.warn("Create dest dir {} failed: {}", targetDir, e.getMessage(), e);
+            }
+
+            // Only the base file name is kept, so key paths are flattened into destPath.
+            String fileName = Paths.get(key).getFileName().toString();
+            if (StringUtils.isBlank(fileName)) {
+                LOG.warn("Skip object with empty file name for key {}", key);
+                return;
+            }
+            Path target = targetDir.resolve(fileName);
+            try {
+                if (Files.exists(target)) {
+                    Files.delete(target);
+                }
+                GetObjectRequest getReq = GetObjectRequest.builder()
+                        .bucket(bucket)
+                        .key(key)
+                        .build();
+                s3.getObject(getReq, ResponseTransformer.toFile(target));
+                LOG.info("Downloaded S3 object {} to {}", key, target);
+            } catch (Exception ex) {
+                LOG.warn("Failed to download object {}: {}", key, ex.getMessage(), ex);
+            }
+        }
+
+        @Override
+        public void destroy() {
+            if (s3 != null) {
+                try {
+                    s3.close();
+                } catch (Exception ignore) {
+                }
+            }
+        }
+    }
+}
diff --git a/runtime/datax/s3writer/src/main/resources/plugin.json b/runtime/datax/s3writer/src/main/resources/plugin.json
new file mode 100644
index 0000000..0f85224
--- /dev/null
+++ b/runtime/datax/s3writer/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+    "name": "s3writer",
+    "class": "com.datamate.plugin.writer.s3writer.S3Writer",
+    "description": "write S3 compatible object storage files to local",
+    "developer": "datamate"
+}
diff --git a/runtime/datax/s3writer/src/main/resources/plugin_job_template.json b/runtime/datax/s3writer/src/main/resources/plugin_job_template.json
new file mode 100644
index 0000000..7c21749
--- /dev/null
+++ b/runtime/datax/s3writer/src/main/resources/plugin_job_template.json
@@ -0,0 +1,12 @@
+{
+    "name": "s3writer",
+    "parameter": {
+        "endpoint": "http://127.0.0.1:9000",
+        "bucket": "test-bucket",
+        "accessKey": "ak-xxx",
+        "secretKey": "sk-xxx",
+        "prefix": "/test",
+        "region": "us-east-1",
+        "destPath": "/data/dest"
+    }
+}
diff --git a/scripts/db/data-collection-init.sql b/scripts/db/data-collection-init.sql
index c8aa5cf..782bf75 100644
--- a/scripts/db/data-collection-init.sql
+++ b/scripts/db/data-collection-init.sql
@@ -78,4 +78,5 @@ VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台
     ('3', 'MYSQL归集模板', '将MYSQL数据库中的数据以csv文件的形式归集到DataMate平台上。', 'mysqlreader', 'mysqlreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
     ('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
     ('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
-    ('6', '本地文件夹归集模板', '将本地文件系统上的文件归集到DataMate平台上。', 'localreader', 'localreader', 'localwriter', 'localwriter', '{"parameter": {"path": {"name": "源目录路径","description": "本地文件系统的源目录绝对路径。","type": "input", "required": true, "index": 1}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 2}}, "reader": {}, "writer": {}}', True, 'system', 'system');
+    ('6', '本地文件夹归集模板', '将本地文件系统上的文件归集到DataMate平台上。', 'localreader', 'localreader', 'localwriter', 'localwriter', '{"parameter": {"path": {"name": "源目录路径","description": "本地文件系统的源目录绝对路径。","type": "input", "required": true, "index": 1}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 2}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
+    ('7', 'S3兼容存储归集模板', '将S3兼容对象存储(如MinIO、Ceph等)上的文件归集到DataMate平台上。', 's3reader', 's3reader', 's3writer', 's3writer', '{"parameter": {"endpoint": {"name": "服务地址","description": "S3兼容存储的服务地址(如http://minio.example.com:9000)。","type": "input", "required": true, "index": 1}, "bucket": {"name": "存储桶名称","description": "S3存储桶名称。","type": "input", "required": true, "index": 2}, "accessKey": {"name": "Access Key","description": "S3访问密钥(Access Key ID)。","type": "input", "required": true, "index": 3}, "secretKey": {"name": "Secret Key","description": "S3密钥(Secret Access Key)。","type": "password", "required": true, "index": 4}, "prefix": {"name": "匹配前缀","description": "按照匹配前缀选中S3中的文件进行归集。","type": "input", "required": false, "index": 5}, "region": {"name": "区域","description": "S3区域(默认us-east-1)。","type": "input", "required": false, "index": 6}}, "reader": {}, "writer": {}}', True, 'system', 'system');