localfs support

This commit is contained in:
2026-01-09 14:19:29 +08:00
parent 010ffceab5
commit ba210d3d4f
13 changed files with 497 additions and 1 deletions

View File

@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>localreader</artifactId>
<name>localreader</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/localreader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>localreader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/localreader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/localreader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -0,0 +1,116 @@
package com.datamate.plugin.reader.localreader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 本地文件夹读取器
* 从本地文件系统的指定目录读取文件列表
*/
public class LocalReader extends Reader {
private static final Logger LOG = LoggerFactory.getLogger(LocalReader.class);
public static class Job extends Reader.Job {
private Configuration jobConfig = null;
@Override
public void init() {
this.jobConfig = super.getPluginJobConf();
}
@Override
public void prepare() {
String path = this.jobConfig.getString("path");
if (path == null || path.isEmpty()) {
throw new RuntimeException("path is required for localreader");
}
Path dirPath = Paths.get(path);
if (!Files.exists(dirPath)) {
throw new RuntimeException("path does not exist: " + path);
}
if (!Files.isDirectory(dirPath)) {
throw new RuntimeException("path is not a directory: " + path);
}
}
@Override
public List<Configuration> split(int adviceNumber) {
return Collections.singletonList(this.jobConfig);
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
public static class Task extends Reader.Task {
private Configuration jobConfig;
private String path;
private Set<String> fileType;
private List<String> files;
@Override
public void init() {
this.jobConfig = super.getPluginJobConf();
this.path = this.jobConfig.getString("path");
this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class);
}
@Override
public void startRead(RecordSender recordSender) {
try (Stream<Path> stream = Files.list(Paths.get(this.path))) {
List<String> fileList = stream.filter(Files::isRegularFile)
.filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file)))
.map(p -> p.getFileName().toString())
.filter(fileName -> this.files.isEmpty() || this.files.contains(fileName))
.collect(Collectors.toList());
fileList.forEach(filePath -> {
Record record = recordSender.createRecord();
record.addColumn(new StringColumn(filePath));
recordSender.sendToWriter(record);
});
this.jobConfig.set("columnNumber", 1);
} catch (IOException e) {
LOG.error("Error reading files from local path: {}", this.path, e);
throw new RuntimeException(e);
}
}
private String getFileSuffix(Path path) {
String fileName = path.getFileName().toString();
int lastDotIndex = fileName.lastIndexOf('.');
if (lastDotIndex == -1 || lastDotIndex == fileName.length() - 1) {
return "";
}
return fileName.substring(lastDotIndex + 1);
}
@Override
public void destroy() {
}
}
}

View File

@@ -0,0 +1,6 @@
{
"name": "localreader",
"class": "com.datamate.plugin.reader.localreader.LocalReader",
"description": "read from local file system",
"developer": "datamate"
}

View File

@@ -0,0 +1,6 @@
{
"name": "localreader",
"parameter": {
"path": "/data/source"
}
}

View File

@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>localwriter</artifactId>
<name>localwriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/localwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>localwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/localwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/localwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -0,0 +1,118 @@
package com.datamate.plugin.writer.localwriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.List;
/**
* 本地文件夹写入器
* 从本地源目录复制文件到目标目录
*/
public class LocalWriter extends Writer {
private static final Logger LOG = LoggerFactory.getLogger(LocalWriter.class);
public static class Job extends Writer.Job {
private Configuration jobConfig;
@Override
public void init() {
this.jobConfig = super.getPluginJobConf();
}
@Override
public void prepare() {
String destPath = this.jobConfig.getString("destPath");
if (destPath == null || destPath.isEmpty()) {
throw new RuntimeException("destPath is required for localwriter");
}
try {
Files.createDirectories(Paths.get(destPath));
} catch (IOException e) {
throw new RuntimeException("Failed to create destination directory: " + destPath, e);
}
}
@Override
public List<Configuration> split(int mandatoryNumber) {
return Collections.singletonList(this.jobConfig);
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private Configuration jobConfig;
private String sourcePath;
private String destPath;
private List<String> files;
@Override
public void init() {
this.jobConfig = super.getPluginJobConf();
this.sourcePath = this.jobConfig.getString("path");
this.destPath = this.jobConfig.getString("destPath");
this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class);
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
try {
Record record;
while ((record = lineReceiver.getFromReader()) != null) {
String fileName = record.getColumn(0).asString();
if (StringUtils.isBlank(fileName)) {
continue;
}
if (!files.isEmpty() && !files.contains(fileName)) {
continue;
}
copyFile(fileName);
}
} catch (Exception e) {
throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
}
}
private void copyFile(String fileName) {
Path source = Paths.get(this.sourcePath, fileName);
Path target = Paths.get(this.destPath, fileName);
try {
if (!Files.exists(source)) {
LOG.warn("Source file does not exist: {}", source);
return;
}
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
LOG.info("Copied file {} to {}", source, target);
} catch (IOException e) {
LOG.error("Failed to copy file {} to {}: {}", source, target, e.getMessage(), e);
throw new RuntimeException("Failed to copy file: " + fileName, e);
}
}
@Override
public void destroy() {
}
}
}

View File

@@ -0,0 +1,6 @@
{
"name": "localwriter",
"class": "com.datamate.plugin.writer.localwriter.LocalWriter",
"description": "write to local file system",
"developer": "datamate"
}

View File

@@ -0,0 +1,7 @@
{
"name": "localwriter",
"parameter": {
"path": "/data/source",
"destPath": "/data/dest"
}
}

View File

@@ -278,6 +278,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>localreader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<!-- writer -->
<fileSet>
@@ -595,6 +602,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>localwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>obsreader/target/datax/</directory>
<includes>

View File

@@ -86,6 +86,7 @@
<!-- <module>dorisreader</module>-->
<module>nfsreader</module>
<module>glusterfsreader</module>
<module>localreader</module>
<!-- writer -->
<module>mysqlwriter</module>
<module>starrockswriter</module>
@@ -135,6 +136,7 @@
<!-- <module>milvuswriter</module>-->
<module>nfswriter</module>
<module>glusterfswriter</module>
<module>localwriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>

View File

@@ -77,4 +77,5 @@ VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台
('2', 'OBS归集模板', '将OBS存储上的文件归集到DataMate平台上。', 'obsreader', 'obsreader', 'obswriter', 'obswriter', '{"parameter": {"endpoint": {"name": "服务地址","description": "OBS的服务地址。","type": "input", "required": true, "index": 1},"bucket": {"name": "存储桶名称","description": "OBS存储桶名称。","type": "input", "required": true, "index": 2},"accessKey": {"name": "AK","description": "OBS访问密钥。","type": "input", "required": true, "index": 3},"secretKey": {"name": "SK","description": "OBS密钥。","type": "password", "required": true, "index": 4},"prefix": {"name": "匹配前缀","description": "按照匹配前缀去选中OBS中的文件进行归集。","type": "input", "required": true, "index": 5}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
('3', 'MYSQL归集模板', '将MYSQL数据库中的数据以csv文件的形式归集到DataMate平台上。', 'mysqlreader', 'mysqlreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system');
('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
('6', '本地文件夹归集模板', '将本地文件系统上的文件归集到DataMate平台上。', 'localreader', 'localreader', 'localwriter', 'localwriter', '{"parameter": {"path": {"name": "源目录路径","description": "本地文件系统的源目录绝对路径。","type": "input", "required": true, "index": 1}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 2}}, "reader": {}, "writer": {}}', True, 'system', 'system');