From ba210d3d4f4418f4309765f4946367784da07039 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Fri, 9 Jan 2026 14:19:29 +0800 Subject: [PATCH] localfs support --- runtime/datax/localreader/pom.xml | 75 +++++++++++ .../localreader/src/main/assembly/package.xml | 35 ++++++ .../reader/localreader/LocalReader.java | 116 +++++++++++++++++ .../src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 6 + runtime/datax/localwriter/pom.xml | 75 +++++++++++ .../localwriter/src/main/assembly/package.xml | 35 ++++++ .../writer/localwriter/LocalWriter.java | 118 ++++++++++++++++++ .../src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 7 ++ runtime/datax/package.xml | 14 +++ runtime/datax/pom.xml | 2 + scripts/db/data-collection-init.sql | 3 +- 13 files changed, 497 insertions(+), 1 deletion(-) create mode 100644 runtime/datax/localreader/pom.xml create mode 100644 runtime/datax/localreader/src/main/assembly/package.xml create mode 100644 runtime/datax/localreader/src/main/java/com/datamate/plugin/reader/localreader/LocalReader.java create mode 100644 runtime/datax/localreader/src/main/resources/plugin.json create mode 100644 runtime/datax/localreader/src/main/resources/plugin_job_template.json create mode 100644 runtime/datax/localwriter/pom.xml create mode 100644 runtime/datax/localwriter/src/main/assembly/package.xml create mode 100644 runtime/datax/localwriter/src/main/java/com/datamate/plugin/writer/localwriter/LocalWriter.java create mode 100644 runtime/datax/localwriter/src/main/resources/plugin.json create mode 100644 runtime/datax/localwriter/src/main/resources/plugin_job_template.json diff --git a/runtime/datax/localreader/pom.xml b/runtime/datax/localreader/pom.xml new file mode 100644 index 0000000..3d6c620 --- /dev/null +++ b/runtime/datax/localreader/pom.xml @@ -0,0 +1,75 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + localreader + localreader + jar + + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + + + + + src/main/java + + **/*.properties + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/runtime/datax/localreader/src/main/assembly/package.xml b/runtime/datax/localreader/src/main/assembly/package.xml new file mode 100644 index 0000000..f87d222 --- /dev/null +++ b/runtime/datax/localreader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/localreader + + + target/ + + localreader-0.0.1-SNAPSHOT.jar + + plugin/reader/localreader + + + + + + false + plugin/reader/localreader/libs + runtime + + + diff --git a/runtime/datax/localreader/src/main/java/com/datamate/plugin/reader/localreader/LocalReader.java b/runtime/datax/localreader/src/main/java/com/datamate/plugin/reader/localreader/LocalReader.java new file mode 100644 index 0000000..ce67b5d --- /dev/null +++ b/runtime/datax/localreader/src/main/java/com/datamate/plugin/reader/localreader/LocalReader.java @@ -0,0 +1,116 @@ +package com.datamate.plugin.reader.localreader; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 本地文件夹读取器 + * 从本地文件系统的指定目录读取文件列表 + */ +public class LocalReader extends Reader { + + private static final Logger LOG = LoggerFactory.getLogger(LocalReader.class); + + public static class Job extends Reader.Job { + private Configuration jobConfig = null; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + } + + @Override + public void prepare() { + String path = this.jobConfig.getString("path"); + if (path == null || path.isEmpty()) { + throw new RuntimeException("path is required for localreader"); + } + Path dirPath = Paths.get(path); + if (!Files.exists(dirPath)) { + throw new RuntimeException("path does not exist: " + path); + } + if (!Files.isDirectory(dirPath)) { + throw new RuntimeException("path is not a directory: " + path); + } + } + + @Override + public List split(int adviceNumber) { + return Collections.singletonList(this.jobConfig); + } + + @Override + public void post() { + } + + @Override + public void destroy() { + } + } + + public static class Task extends Reader.Task { + + private Configuration jobConfig; + private String path; + private Set fileType; + private List files; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.path = this.jobConfig.getString("path"); + this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class)); + this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class); + } + + @Override + public void startRead(RecordSender recordSender) { + try (Stream stream = Files.list(Paths.get(this.path))) { + List fileList = stream.filter(Files::isRegularFile) + .filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file))) + .map(p -> p.getFileName().toString()) + .filter(fileName -> this.files.isEmpty() || this.files.contains(fileName)) + .collect(Collectors.toList()); + fileList.forEach(filePath -> { + Record record = recordSender.createRecord(); + record.addColumn(new StringColumn(filePath)); + recordSender.sendToWriter(record); + }); + this.jobConfig.set("columnNumber", 1); + } catch (IOException e) { + LOG.error("Error reading files from local path: {}", this.path, e); + throw new RuntimeException(e); + } + } + + private String getFileSuffix(Path path) { + String fileName = path.getFileName().toString(); + int lastDotIndex = fileName.lastIndexOf('.'); + if (lastDotIndex == -1 || lastDotIndex == fileName.length() - 1) { + return ""; + } + return fileName.substring(lastDotIndex + 1); + } + + @Override + public void destroy() { + } + } +} diff --git a/runtime/datax/localreader/src/main/resources/plugin.json b/runtime/datax/localreader/src/main/resources/plugin.json new file mode 100644 index 0000000..b119f02 --- /dev/null +++ b/runtime/datax/localreader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "localreader", + "class": "com.datamate.plugin.reader.localreader.LocalReader", + "description": "read from local file system", + "developer": "datamate" +} diff --git a/runtime/datax/localreader/src/main/resources/plugin_job_template.json b/runtime/datax/localreader/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000..e0e6828 --- /dev/null +++ b/runtime/datax/localreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,6 @@ +{ + "name": "localreader", + "parameter": { + "path": "/data/source" + } +} diff --git a/runtime/datax/localwriter/pom.xml b/runtime/datax/localwriter/pom.xml new file mode 100644 index 0000000..9b383be --- /dev/null +++ b/runtime/datax/localwriter/pom.xml @@ -0,0 +1,75 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + localwriter + localwriter + jar + + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + + + + + src/main/java + + **/*.properties + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/runtime/datax/localwriter/src/main/assembly/package.xml b/runtime/datax/localwriter/src/main/assembly/package.xml new file mode 100644 index 0000000..c5921e0 --- /dev/null +++ b/runtime/datax/localwriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/localwriter + + + target/ + + localwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/localwriter + + + + + + false + plugin/writer/localwriter/libs + runtime + + + diff --git a/runtime/datax/localwriter/src/main/java/com/datamate/plugin/writer/localwriter/LocalWriter.java b/runtime/datax/localwriter/src/main/java/com/datamate/plugin/writer/localwriter/LocalWriter.java new file mode 100644 index 0000000..e13f4b6 --- /dev/null +++ b/runtime/datax/localwriter/src/main/java/com/datamate/plugin/writer/localwriter/LocalWriter.java @@ -0,0 +1,118 @@ +package com.datamate.plugin.writer.localwriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.CommonErrorCode; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.Collections; +import java.util.List; + +/** + * 本地文件夹写入器 + * 从本地源目录复制文件到目标目录 + */ +public class LocalWriter extends Writer { + + private static final Logger LOG = LoggerFactory.getLogger(LocalWriter.class); + + public static class Job extends Writer.Job { + private Configuration jobConfig; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + } + + @Override + public void prepare() { + String destPath = this.jobConfig.getString("destPath"); + if (destPath == null || destPath.isEmpty()) { + throw new RuntimeException("destPath is required for localwriter"); + } + try { + Files.createDirectories(Paths.get(destPath)); + } catch (IOException e) { + throw new RuntimeException("Failed to create destination directory: " + destPath, e); + } + } + + @Override + public List split(int mandatoryNumber) { + return Collections.singletonList(this.jobConfig); + } + + @Override + public void post() { + } + + @Override + public void destroy() { + } + } + + public static class Task extends Writer.Task { + private Configuration jobConfig; + private String sourcePath; + private String destPath; + private List files; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.sourcePath = this.jobConfig.getString("path"); + this.destPath = this.jobConfig.getString("destPath"); + this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class); + } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + try { + Record record; + while ((record = lineReceiver.getFromReader()) != null) { + String fileName = record.getColumn(0).asString(); + if (StringUtils.isBlank(fileName)) { + continue; + } + if (!files.isEmpty() && !files.contains(fileName)) { + continue; + } + copyFile(fileName); + } + } catch (Exception e) { + throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e); + } + } + + private void copyFile(String fileName) { + Path source = Paths.get(this.sourcePath, fileName); + Path target = Paths.get(this.destPath, fileName); + try { + if (!Files.exists(source)) { + LOG.warn("Source file does not exist: {}", source); + return; + } + Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING); + LOG.info("Copied file {} to {}", source, target); + } catch (IOException e) { + LOG.error("Failed to copy file {} to {}: {}", source, target, e.getMessage(), e); + throw new RuntimeException("Failed to copy file: " + fileName, e); + } + } + + @Override + public void destroy() { + } + } +} diff --git a/runtime/datax/localwriter/src/main/resources/plugin.json b/runtime/datax/localwriter/src/main/resources/plugin.json new file mode 100644 index 0000000..636ce05 --- /dev/null +++ b/runtime/datax/localwriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "localwriter", + "class": "com.datamate.plugin.writer.localwriter.LocalWriter", + "description": "write to local file system", + "developer": "datamate" +} diff --git a/runtime/datax/localwriter/src/main/resources/plugin_job_template.json b/runtime/datax/localwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000..ef3a4cb --- /dev/null +++ b/runtime/datax/localwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,7 @@ +{ + "name": "localwriter", + "parameter": { + "path": "/data/source", + "destPath": "/data/dest" + } +} diff --git a/runtime/datax/package.xml b/runtime/datax/package.xml index b2124c5..2c58249 100644 --- a/runtime/datax/package.xml +++ b/runtime/datax/package.xml @@ -278,6 +278,13 @@ datax + + localreader/target/datax/ + + **/*.* + + datax + @@ -595,6 +602,13 @@ datax + + localwriter/target/datax/ + + **/*.* + + datax + obsreader/target/datax/ diff --git a/runtime/datax/pom.xml b/runtime/datax/pom.xml index 5436948..2ba87fc 100644 --- a/runtime/datax/pom.xml +++ b/runtime/datax/pom.xml @@ -86,6 +86,7 @@ nfsreader glusterfsreader + localreader mysqlwriter starrockswriter @@ -135,6 +136,7 @@ nfswriter glusterfswriter + localwriter plugin-rdbms-util plugin-unstructured-storage-util diff --git a/scripts/db/data-collection-init.sql b/scripts/db/data-collection-init.sql index 2aebb29..c8aa5cf 100644 --- a/scripts/db/data-collection-init.sql +++ b/scripts/db/data-collection-init.sql @@ -77,4 +77,5 @@ VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台 ('2', 'OBS归集模板', '将OBS存储上的文件归集到DataMate平台上。', 'obsreader', 'obsreader', 'obswriter', 'obswriter', '{"parameter": {"endpoint": {"name": "服务地址","description": "OBS的服务地址。","type": "input", "required": true, "index": 1},"bucket": {"name": "存储桶名称","description": "OBS存储桶名称。","type": "input", "required": true, "index": 2},"accessKey": {"name": "AK","description": "OBS访问密钥。","type": "input", "required": true, "index": 3},"secretKey": {"name": "SK","description": "OBS密钥。","type": "password", "required": true, "index": 4},"prefix": {"name": "匹配前缀","description": "按照匹配前缀去选中OBS中的文件进行归集。","type": "input", "required": true, "index": 5}}, "reader": {}, "writer": {}}', True, 'system', 'system'), ('3', 'MYSQL归集模板', '将MYSQL数据库中的数据以csv文件的形式归集到DataMate平台上。', 'mysqlreader', 'mysqlreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'), ('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'), - ('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system'); + ('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system'), + ('6', '本地文件夹归集模板', '将本地文件系统上的文件归集到DataMate平台上。', 'localreader', 'localreader', 'localwriter', 'localwriter', '{"parameter": {"path": {"name": "源目录路径","description": "本地文件系统的源目录绝对路径。","type": "input", "required": true, "index": 1}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 2}}, "reader": {}, "writer": {}}', True, 'system', 'system');