SpringBoot整合DataX数据同步(自动生成job文件)

SpringBoot整合Datax数据同步

文章目录

  • SpringBoot整合Datax数据同步
    • 1.简介
        • 设计理念
      • DataX3.0框架设计
      • DataX3.0核心架构
        • 核心模块介绍
        • DataX调度流程
    • 2.DataX3.0插件体系
    • 3.数据同步
      • 1.编写job的json文件
      • 2.进入bin目录下,执行文件
    • 4.SpringBoot整合DataX生成Job文件并执行
        • 1.准备工作
        • 2.文件目录如图
        • 3.Mysql数据同步
        • 4.Elasticsearch写入Mysql数据
    • 5.Job文件参数说明
      • 1.MysqlReader
      • 2.MysqlWriter
      • 3.ElasticsearchWriter

1.简介

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

DataX 是一个异构数据源离线同步工具,致力于实现各种异构数据源之间稳定高效的数据同步功能。

Download DataX下载地址

Github主页地址:https://github.com/alibaba/DataX

请点击:Quick Start

在这里插入图片描述

  • 设计理念

    为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

DataX3.0框架设计

在这里插入图片描述

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

在这里插入图片描述

核心模块介绍
  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

2.DataX3.0插件体系

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL读 、写
Oracle读 、写
OceanBase读 、写
SQLServer读 、写
PostgreSQL读 、写
DRDS读 、写
Kingbase读 、写
通用RDBMS(支持所有关系型数据库)读 、写
阿里云数仓数据存储ODPS读 、写
ADB
ADS
OSS读 、写
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中间件datahub读 、写
SLS读 、写
图数据库阿里云 GDB读 、写
Neo4j
NoSQL数据存储OTS读 、写
Hbase0.94读 、写
Hbase1.1读 、写
Phoenix4.x读 、写
Phoenix5.x读 、写
MongoDB读 、写
Cassandra读 、写
数仓数据存储StarRocks读 、写
ApacheDoris
ClickHouse读 、写
Databend
Hive读 、写
kudu
selectdb
无结构化数据存储TxtFile读 、写
FTP读 、写
HDFS读 、写
Elasticsearch
时间序列数据库OpenTSDB
TSDB读 、写
TDengine读 、写

3.数据同步

1.编写job的json文件

mysql数据抽取到本地内存

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "name",
              "amount",
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
                ],
                "table": [
                  "user"
                ]
              }
            ],
            "password": "root",
            "username": "root"
          }
        },
        "writer": {
            "name": "streamwriter",
            "parameter": {
                 "print": false,
                 "encoding": "UTF-8"
            }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}

mysqlWriter数据写入

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                 "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19880808,
                                "type": "long"
                            },
                            {
                                "value": "1988-08-08 08:08:08",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "test"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

mysql数据同步

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "project_code",
              "category"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
                ],
                "table": [
                  "project_index"
                ]
              }
            ],
            "password": "root",
            "username": "root"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "column": [
              "id",
              "project_code",
              "category"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
                "table": [
                  "project_index"
                ]
              }
            ],
            "password": "root",
            "username": "root",
            "writeMode": "update"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}

2.进入bin目录下,执行文件

需python环境

python datax.py {YOUR_JOB.json}

4.SpringBoot整合DataX生成Job文件并执行

1.准备工作

下载datax,安装lib下的datax-commondatax-core的jar到本地maven仓库

依赖

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.25</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.21</version>
        </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.26</version>
        </dependency>


        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource

server:
  port: 8080

# datax 相关配置,在生成文件时使用
datax:
  home: D:/software/datax/
  # job文件存储位置
  save-path: D:/software/datax/job/

属性配置

/**
 * datax配置
 * @author moshangshang
 */
@Data
@Component
@ConfigurationProperties("datax")
public class DataXProperties {


    private String home;

    private String savePath;

}

公共实体

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Content {

    private Reader reader;

    private Writer writer;

}
@Data
public class DataXJobRoot {
    private Job job;
}

@Data
public class Job {

    private List<Content> content;

    private Setting setting = new Setting();

}
@Data
public class Setting {

    private Speed speed = new Speed();

    @Data
    public static class Speed {
        private String channel = "1";
    }
}
public abstract class Parameter {
}
/**
 *  读取抽象类
 *  @author moshangshang
 */
@Data
public abstract class Reader {

    private String name;

    private Parameter parameter;

}
/**
 * 写入抽象类
 *  @author moshangshang
 */
@Data
public abstract class Writer {

    private String name;
    private Parameter parameter;
}

公共处理接口

public interface DataXInterface {

     /**
      * 获取读对象
      */
     Reader getReader(String table);


     /**
      * 获取写对象
      */
     Writer getWriter(String table);


     /**
      * 同类型读取写入,如mysql到mysql
      */
     String getJobTaskName(String readerTable,String writeTable);

     /**
      * 自定义读取写入
      * @param reader 读处理
      * @param write 写处理
      * @param suffix 文件名
      */
     String getJobTaskName(Reader reader,Writer write, String suffix);

}


/**
 * 接口抽象类
 * @author moshangshang
 */
@Component
public abstract class AbstractDataXHandler implements DataXInterface {

    @Autowired
    private DataXProperties dataXProperties;

    /**
     * 自定义读取写入
     * @param reader 读处理
     * @param write 写处理
     * @param suffix 文件名
     */
    @Override
    public String getJobTaskName(Reader reader, Writer write, String suffix) {
        DataXJobRoot root = new DataXJobRoot();
        Job job = new Job();
        root.setJob(job);
        Content content = new Content(reader,write);
        job.setContent(Collections.singletonList(content));
        String jsonStr = JSONUtil.parse(root).toJSONString(2);
        String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_"+suffix+".json";
        File file = FileUtil.file(dataXProperties.getSavePath(),fileName);
        FileUtil.appendString(jsonStr, file, "utf-8");
        return fileName;
    }



}

工具方法

@Repository
public class DatabaseInfoRepository {
    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public DatabaseInfoRepository(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * 获取所有表名
     */
    public List<String> getAllTableNames() {
        String sql = "SHOW TABLES";
        return jdbcTemplate.queryForList(sql, String.class);
    }

    /**
     * 根据表名获取字段信息
     */
    public List<Map<String, Object>> getTableColumns(String tableName) {
        String sql = "SHOW FULL COLUMNS FROM " + tableName;
        return jdbcTemplate.queryForList(sql);
    }
}
@Slf4j
@Service
public class DatabaseInfoService {

    private final DatabaseInfoRepository databaseInfoRepository;

    @Autowired
    public DatabaseInfoService(DatabaseInfoRepository databaseInfoRepository) {
        this.databaseInfoRepository = databaseInfoRepository;
    }

    public void printAllTablesAndColumns() {
        // 获取所有表名
        List<String> tableNames = databaseInfoRepository.getAllTableNames();

        // 遍历表名,获取并打印每个表的字段信息
        for (String tableName : tableNames) {
            System.out.println("Table: " + tableName);

            // 获取当前表的字段信息
            List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);

            // 遍历字段信息并打印
            for (Map<String, Object> column : columns) {
                System.out.println("  Column: " + column.get("Field") + " (Type: " + column.get("Type") + ")" + " (Comment: " + column.get("Comment") + ")");
            }
            // 打印空行作为分隔
            System.out.println();
        }
    }

    /** 查询指定表的所有字段列表 */
    public List<String> getColumns(String tableName) {
        List<String> list = new ArrayList<>();
        // 获取当前表的字段信息
        List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);

        // 遍历字段信息并打印
        for (Map<String, Object> column : columns) {
            list.add(column.get("Field").toString());
        }

        return list;
    }

    /** 查询指定表的所有字段列表,封装成HdfsWriter格式 */
    public List<HdfsWriter.Column> getHdfsColumns(String tableName) {
        List<HdfsWriter.Column> list = new ArrayList<>();
        // 获取当前表的字段信息
        List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);

        // 遍历字段信息并打印
        for (Map<String, Object> column : columns) {
            String name = column.get("Field").toString();
            String typeDb = column.get("Type").toString();
            String type = "string";
            if (typeDb.equals("bigint")) {
                type = "bigint";
            } else if (typeDb.startsWith("varchar")) {
                type = "string";
            } else if (typeDb.startsWith("date") || typeDb.endsWith("timestamp")) {
                type = "date";
            }
            HdfsWriter.Column columnHdfs = new HdfsWriter.Column();
            columnHdfs.setName(name);
            columnHdfs.setType(type);
            list.add(columnHdfs);
        }

        return list;
    }
}

datax的job任务json执行方法


/**
 * 执行器
 * @author moshangshang
 */
@Slf4j
@Component
public class DataXExecuter {

    @Autowired
    private DataXProperties dataXProperties;

    public void run(String fileName) throws IOException {
        System.setProperty("datax.home", dataXProperties.getHome());
        String filePath = dataXProperties.getSavePath()+fileName;
        String dataxJson = JSONUtil.parse(FileUtils.readFileToString(new File(filePath),"UTF-8")).toJSONString(2);
        log.info("datax log:{}",dataxJson);
        String[] dataxArgs = {"-job", filePath, "-mode", "standalone", "-jobid", "-1"};
        try {
            Engine.entry(dataxArgs);
        }catch (DataXException e){
            log.error("执行失败",e);
        } catch (Throwable throwable) {
            log.error("DataX执行异常,error cause::\n" + ExceptionTracker.trace(throwable));
        }

    }

}
2.文件目录如图

在这里插入图片描述

3.Mysql数据同步

1.编写mysql读写对象,继承读写接口

@Data
public class MysqlReader extends Reader {

    public String getName() {
        return "mysqlreader";
    }


    @Data
    public static class MysqlParameter extends Parameter {
        private List<String> column;
        private List<Connection> connection;
        private String password;
        private String username;
        private String splitPk;
        private String where;
    }

    @Data
    public static class Connection {
        private List<String> jdbcUrl;
        private List<String> table;
        private List<String> querySql;
    }
}

@EqualsAndHashCode(callSuper = true)
@Data
public class MysqlWriter extends Writer {

    public String getName() {
        return "mysqlwriter";
    }
    @EqualsAndHashCode(callSuper = true)
    @Data
    public static class MysqlParameter extends Parameter {
        private List<String> column;
        private List<Connection> connection;
        private String password;
        private String username;
        private String writeMode = "update";
    }

    @Data
    public static class Connection {
        private String jdbcUrl;
        private List<String> table;
    }
}

2.配置mysql读和写的数据库信息

/**
 * mysql读写配置
 * @author moshangshang
 */
@Data
@ConfigurationProperties("datax.mysql.reader")
public class DataXMysqlReaderProperties {


    private String url;

    private String password;

    private String username;


}
/**
 * mysql读写配置
 * @author moshangshang
 */
@Data
@ConfigurationProperties("datax.mysql.writer")
public class DataXMysqlWriterProperties {


    private String url;

    private String password;

    private String username;

}
# datax 相关配置,在生成文件时使用
datax:
  mysql:
    reader:
      url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
      username: root
      password: root
    writer:
      url: jdbc:mysql://127.0.0.1:3306/ruoyi_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
      username: root
      password: root

2.编写mysql处理类,继承抽象处理接口。生成job文件

/**
 * mysql读写处理
 * @author moshangshang
 */
@Component
@EnableConfigurationProperties({DataXMysqlReaderProperties.class, DataXMysqlWriterProperties.class})
public class MysqlHandler extends AbstractDataXHandler{

    @Autowired
    private DatabaseInfoService databaseInfoService;

    @Autowired
    private DataXProperties dataXProperties;
    @Autowired
    private DataXMysqlReaderProperties dataXMysqlReaderProperties;
    @Autowired
    private DataXMysqlWriterProperties dataXMysqlWriterProperties;

    @Override
    public Reader getReader(String table) {
        MysqlReader reader = new MysqlReader();
        MysqlReader.MysqlParameter readerParameter = new MysqlReader.MysqlParameter();
        readerParameter.setPassword(dataXMysqlReaderProperties.getPassword());
        readerParameter.setUsername(dataXMysqlReaderProperties.getUsername());
        List<String> readerColumns = databaseInfoService.getColumns(table);
        readerParameter.setColumn(readerColumns);
        MysqlReader.Connection readerConnection = new MysqlReader.Connection();
        readerConnection.setJdbcUrl(Collections.singletonList(dataXMysqlReaderProperties.getUrl()));
        readerConnection.setTable(Collections.singletonList(table));
        readerParameter.setConnection(Collections.singletonList(readerConnection));
        reader.setParameter(readerParameter);
        return reader;
    }

    @Override
    public Writer getWriter(String table) {
        MysqlWriter writer = new MysqlWriter();
        MysqlWriter.MysqlParameter writerParameter = new MysqlWriter.MysqlParameter();
        writerParameter.setPassword(dataXMysqlWriterProperties.getPassword());
        writerParameter.setUsername(dataXMysqlWriterProperties.getUsername());
        List<String> columns = databaseInfoService.getColumns(table);
        writerParameter.setColumn(columns);
        MysqlWriter.Connection connection = new MysqlWriter.Connection();
        connection.setJdbcUrl(dataXMysqlWriterProperties.getUrl());
        connection.setTable(Collections.singletonList(table));
        writerParameter.setConnection(Collections.singletonList(connection));
        writer.setParameter(writerParameter);
        return writer;
    }

    @Override
    public String getJobTaskName(String readerTable,String writeTable) {
        DataXJobRoot root = new DataXJobRoot();
        Job job = new Job();
        root.setJob(job);
        Content content = new Content(getReader(readerTable),getWriter(writeTable));
        job.setContent(Collections.singletonList(content));
        String jsonStr = JSONUtil.parse(root).toJSONString(2);
        String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_h2h.json";
        File file = FileUtil.file(dataXProperties.getSavePath(),fileName);
        FileUtil.appendString(jsonStr, file, "utf-8");
        return fileName;
    }
}

3.调用执行器,执行任务job

@SpringBootTest
public class DataxTest {

    @Autowired
    private MysqlHandler mysqlHandler;


    @Autowired
    private DataXExecuter dataXExecuter;


    /**
     * 读t_user表同步到user
     */
    @Test
    public void test() throws IOException {
        String jobTask = mysqlHandler.getJobTaskName("t_user", "user");
        dataXExecuter.run(jobTask);
    }
    
    
    /**
     * 直接执行json文件
     */
    @Test
    public void test2() throws IOException {
        dataXExecuter.run("datax_job_83798b5f1766406289a44fe681dc8878_m2m.json");
    }
}

4.执行结果

在这里插入图片描述

4.Elasticsearch写入Mysql数据

注意事项:

  • es目前只支持写入不支持读取
  • mysql数据写入es时,需保证es与mysql的列数column相同,不支持类似mysql的部分字段写入
  • 需保证列的顺序相同,写入时不会根据name名称字段去自动对应,如果顺序不一致,则可能会转换错误。如id,name,写入name,id

原理:使用elasticsearch的rest api接口, 批量把从reader读入的数据写入elasticsearch

创建es索引映射

PUT datax_data
{
  "mappings": {
    "properties": {
      "name":{"type": "keyword"},
      "amount":{"type": "long"}
    }
  }
}

1.添加es配置和文件

spring:
  elasticsearch:
    #username:
    #password:
    #path-prefix:
    uris: http://127.0.0.1:9200
    #连接elasticsearch超时时间
    connection-timeout: 60000
    socket-timeout: 30000
# datax 相关配置,在生成文件时使用
datax:
  elasticsearch:
    writer:
      url: http://127.0.0.1:9200
      username:
      password:

/**
 * es写配置
 * @author moshangshang
 */
@Data
@ConfigurationProperties("datax.elasticsearch.writer")
public class DataXElasticSearchProperties {

    private String url;

    private String username;

    private String password;

}

2.编写生成job文件实体类

@EqualsAndHashCode(callSuper = true)
@Data
public class ElasticSearchWriter extends Writer {

    public String getName() {
        return "elasticsearchwriter";
    }

    @EqualsAndHashCode(callSuper = true)
    @Data
    public static class ElasticSearchParameter extends Parameter {
        private List<Column> column;
        private String endpoint;
        private String accessId;
        private String accessKey;
        private String index;
        private Settings settings;
        private String type = "default";
        private boolean cleanup = true;
        private boolean discovery = false;
        private Integer batchSize = 1000;
        private String splitter = ",";
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Column {
        private String name;
        private String type;
        private String analyzer;
    }

    @Data
    public static class Settings {
        private Map<String,Object> index;
    }


}

3.es接口扩展

/**
 * es接口扩展
 * @author moshangshang
 */
public interface DataXElasticsearchInterface extends DataXInterface {

    Writer getWriter(String table, Map<String,Object> indexSettings);

}

4.es核心处理类

@Component
@EnableConfigurationProperties({DataXElasticSearchProperties.class})
public class ElasticSearchHandler extends AbstractDataXHandler implements DataXElasticsearchInterface{

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Autowired
    private DataXElasticSearchProperties dataXElasticSearchProperties;

    @Override
    public Reader getReader(String table) {
        return null;
    }

    /**
     * 普通写入
     * @param index 索引
     * @return Writer
     */
    @Override
    public Writer getWriter(String index) {
        ElasticSearchWriter writer = new ElasticSearchWriter();
        ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);
        writer.setParameter(writerParameter);
        return writer;
    }

    @Override
    public String getJobTaskName(String readerTable, String writeTable) {
        return null;
    }

    /**
     * es写入,带setting设置
     */
    @Override
    public Writer getWriter(String index,Map<String,Object> map) {
        ElasticSearchWriter writer = new ElasticSearchWriter();
        ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);
        ElasticSearchWriter.Settings settings = new ElasticSearchWriter.Settings();
        settings.setIndex(map);
        writerParameter.setSettings(settings);
        writer.setParameter(writerParameter);
        return writer;
    }


    /**
     * 获取公共写入参数
     */
    public ElasticSearchWriter.ElasticSearchParameter getElasticSearchWriter(String index){
        ElasticSearchWriter.ElasticSearchParameter writerParameter = new ElasticSearchWriter.ElasticSearchParameter();
        List<Column> columns = getEsColumns(index);
        writerParameter.setColumn(columns);
        writerParameter.setEndpoint(dataXElasticSearchProperties.getUrl());
        writerParameter.setAccessId(dataXElasticSearchProperties.getUsername());
        writerParameter.setAccessKey(dataXElasticSearchProperties.getPassword());
        writerParameter.setIndex(index);
        return writerParameter;
    }



   /**
     * 获取指定索引的映射字段
     * 读取时和创建顺序相反
     */
    public List<ElasticSearchWriter.Column> getEsColumns(String index){
        List<ElasticSearchWriter.Column> columns = new ArrayList<>();
        // 获取操作的索引文档对象
        IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(index));
        Map<String, Object> mapping = indexOperations.getMapping();
        mapping.forEach((k,value) ->{
            JSONObject json = JSON.parseObject(JSONObject.toJSONString(value));
            for (Map.Entry<String, Object> entry : json.entrySet()) {
                String key = entry.getKey();
                JSONObject properties = JSON.parseObject(JSONObject.toJSONString(entry.getValue()));
                String type = properties.getString("type");
                String analyzer = properties.getString("analyzer");
                columns.add(new ElasticSearchWriter.Column(key,type,analyzer));
            }
        });
        return columns;
    }

}

5.测试

    @Test
    public void test3() throws IOException {
        Map<String,Object> settings = new HashMap<>();
        settings.put("number_of_shards",1);
        settings.put("number_of_replicas",1);
        String jobTask = elasticSearchHandler.getJobTaskName(mysqlHandler.getReader("t_user")
                , elasticSearchHandler.getWriter("datax_data",settings),"m2e");
        dataXExecuter.run(jobTask);
    }

5.Job文件参数说明

1.MysqlReader

参数名描述必选
jdbcUrl对端数据库的JDBC连接信息并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,MysqlReader可以依次探测ip的可连接性,直到选择一个合法的IP,如果全部连接失败,MysqlReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里外部使用情况,JSON数组填写一个JDBC连接即可。
username数据源的用户名
password数据源指定用户名的密码
table所选取的需要同步的表。支持多张表同时抽取,用户自己需保证多张表是同一schema结构,注意,table必须包含在connection配置单元中。
column所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用代表默认使用所有列配置,例如['']。
splitPk分区主键,DataX因此会启动并发任务进行数据同步。推荐splitPk用户使用表主键
where筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。注意:limit不是SQL的合法where子句。where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,均视作同步全量数据。
querySql查询SQL同步。当用户配置了这一项之后,当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。

2.MysqlWriter

参数名描述必选
jdbcUrl目的数据库的 JDBC 连接信息。作业运行时,DataX 会在你提供的 jdbcUrl 后面追加如下属性:yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true;在一个数据库上只能配置一个 jdbcUrl 值。这与 MysqlReader 支持多个备库探测不同,因为此处不支持同一个数据库存在多个主库的情况(双主导入数据情况)
username数据源的用户名
password数据源指定用户名的密码
table目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致。table 和 jdbcUrl 必须包含在 connection 配置单元中
column目的表需要写入数据的字段,例如: “column”: [“id”,“name”,“age”]。如果要依次写入全部列,使用*表示, 例如: "column": ["*"]
sessionDataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
preSql写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, … datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:"preSql":["delete from 表名"],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称
postSql写入数据到目的表后,会执行这里的标准语句
writeMode控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句;可选:insert/replace/update,默认insert
batchSize一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。默认:1024

3.ElasticsearchWriter

参数名描述必选
endpointElasticSearch的连接地址
accessIdhttp auth中的user
accessKeyhttp auth中的password
indexelasticsearch中的index名
typeelasticsearch中index的type名,默认index名
cleanup是否删除原表, 默认值:false
batchSize每次批量数据的条数,默认值:1000
trySize失败后重试的次数, 默认值:30
timeout客户端超时时间,默认值:600000
discovery启用节点发现将(轮询)并定期更新客户机中的服务器列表。默认false
compressionhttp请求,开启压缩,默认true
multiThreadhttp请求,是否有多线程,默认true
ignoreWriteError忽略写入错误,不重试,继续写入,默认false
alias数据导入完成后写入别名
aliasMode数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个),默认append
settings创建index时候的settings, 与elasticsearch官方相同
splitter如果插入数据是array,就使用指定分隔符,默认值:-,-
columnelasticsearch所支持的字段类型,样例中包含了全部
dynamic不使用datax的mappings,使用es自己的自动mappings,默认值: false

参考资料:https://blog.csdn.net/wlddhj/article/details/137585979

Github主页地址:https://github.com/alibaba/DataX

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/770474.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】目录和文件的权限意义

现在我们知道了Linux系统内文件的三种身份&#xff08;拥有者、用户组与其他人&#xff09;&#xff0c;知道每种身份都有三种权限&#xff08;rwx&#xff09;&#xff0c;也知道能够使用chown、chgrp、chmod修改这些权限与属性&#xff0c;当然&#xff0c;利用IS-l去查看文件…

一文了解“大数据招商思维”,读懂什么是大数据招商!

近年来&#xff0c;随着大数据及人工智能等新一代信息技术的快速发展&#xff0c;数据作为重要的资源和资产&#xff0c;成为推动经济发展的核心驱动力&#xff0c;广泛应用于各个领域&#xff0c;深刻的改变着我们的生产和生活方式。那么对于“招商引资”来说&#xff0c;大数…

超级加密狗——CBS(赛博锁)

智能终端设备安全现状&#xff1a; 随着网络和智能终端普及&#xff0c;云管端的智能物联应用越来越多&#xff0c;如何保证云端平台安全&#xff0c;以及各种智能终端&#xff08;含智能仪器&#xff0c;车载终端、智能摄像头、工控机、网关路由器、智能设备、 IoT设备等&…

3D模型格式转换工具HOOPS Exchange如何实现对PRC文档的支持?

随着三维模型在各个行业中的应用越来越广泛&#xff0c;高效、准确的3D模型格式转换工具变得尤为重要。在众多工具中&#xff0c;HOOPS Exchange因其强大的功能和广泛的格式支持赢得了用户的青睐。本文将详细探讨HOOPS Exchange如何实现对PRC&#xff08;Product Representatio…

XLSX + LuckySheet + LuckyExcel实现前端的excel预览

文章目录 功能简介简单代码实现效果参考 功能简介 通过LuckyExcel的transformExcelToLucky方法&#xff0c; 我们可以把一个文件直接转成LuckySheet需要的json字符串&#xff0c; 之后我们就可以用LuckySheet预览excelLuckyExcel只能解析xlsx格式的excel文件&#xff0c;因此对…

封装stater时配置导入配置类提示功能

提示功能如下 使用注解导入配置属性时添加依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>

Element中的选择器组件Select (一级选择组件el-select)

简述&#xff1a;在 Element UI 中&#xff0c;ElSelect&#xff08;或简称为 Select&#xff09;是一个非常常用的选择器组件&#xff0c;它提供了丰富的功能来帮助用户从一组预定义的选项中选择一个或多个值。这里来简单记录一下 一. 组件和属性配置 <el-selectv-model&q…

为什么说牛企查查企业超好用?

步入职场的职场人士&#xff0c;经济相关专业的学生&#xff0c;都有查企业的需求&#xff0c;市面上查企业的软件平台那么多&#xff0c;每个功能都不怎么一样。 有的便宜&#xff0c;但是信息不全。有的信息还可以&#xff0c;但是会员费又很贵&#xff0c;让我这个打工人没…

垂直领域大模型的机遇与挑战:从构建到应用

在人工智能技术的浪潮中,大模型以其强大的数据处理和学习能力,成为推动科技进步的重要力量。然而,这种跨领域应用的过程并非一帆风顺,既面临挑战也蕴含机遇。本文从复旦大学的研究工作出发,详细分析大模型的机遇与挑战。 背景 GPT4技术报告指出,GPT4仍处于通用人工智…

kpatch制作内核热补丁步骤总结

零、原理及参考 kpatch入门实践教程-CSDN博客 Kpatch 使用过程及其原理-CSDN博客 一、准备工作 安装对应版本的kpatch-build.rpm并解决依赖diff -Naur dir1 dir2 > hot.patch 拿到补丁文件下载对应内核版本的src.rpm安装好对应的开发包kernel-debuginfo&#xff0c;kern…

SpringBoot 多数据源配置

目录 一. 引入maven依赖包 二. 配置yml 三、创建 xml 分组文件 四、切换数据源 一. 引入maven依赖包 <dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.6.1&…

既美观又方便的后台框架谁需要?进来就对了。

一套既美观又方便的后台框架可以大大幅节约开发时间和成本。 我们来一起看看几个明朗大气的管理控制台页面。 本文档会持续更新 模板编号&#xff1a;翠花_001模板编号&#xff1a;翠花_002模板编号&#xff1a;翠花_003

HTTP协议深入

1.了解web和网络基础 有客户端和服务端双方参与交互 客户端发送请求:request 服务端根据请求给出响应:response 请求通过URL来指定要获取都得资源 响应内容可以是HTML网页&#xff0c;或者用json表示的数据或者其他二进制文件内容 Web使用一种名为HTTP的协议作为规范&…

如何清理电脑内存?让电脑运行如飞!

电脑内存&#xff08;RAM&#xff09;的清理对于维持系统的流畅运行至关重要。随着使用时间的增加&#xff0c;系统内存会被各种应用程序和后台进程占用&#xff0c;导致系统响应变慢&#xff0c;甚至出现卡顿现象。通过有效地清理内存&#xff0c;可以提升电脑的性能&#xff…

实验六 SQL数据查询—单表查询

题目 打开ecommerce数据库&#xff0c;用SQL语句完成下列各项查询要求&#xff1a; 查询每位员工的员工编号empno、员工姓名empname、联系电话telephone和所在部门名称depname查询已下订单的商品的orderno、memname、proname、qty、totalmoney信息查询会员订单总金额超过2000的…

机器人视觉系统的发展前景如何?

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「机器人视觉的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;机器视觉作为图像数据…

悲情短视频:成都柏煜文化传媒有限公司

悲情短视频&#xff1a;在光影交错中触动人心的温柔力量 在这个五彩斑斓的视频时代&#xff0c;悲情短视频如同一抹深秋的寒露&#xff0c;悄然落在人们的心田&#xff0c;带来一丝不易察觉却又难以抗拒的凉意。它们不以华丽的特效或激昂的音乐取胜&#xff0c;而是凭借真挚的…

UCOS-III 任务调度与就绪列表管理

01. 就绪优先级位图 在实时操作系统中&#xff0c;任务调度的效率至关重要。UCOS-III通过就绪优先级位图来快速查找最高优先级的就绪任务&#xff0c;从而实现高效调度。就绪优先级位图是一个按位表示的结构&#xff0c;每个位代表一个优先级&#xff0c;当某个优先级上有任务就…

DLS MARKETS外汇:美指牛市通道稳固,非农数据和美国大选成关键因素

摘要&#xff1a; 尽管近期美国经济数据表现疲弱&#xff0c;但美元指数&#xff08;美指&#xff09;依旧表现平稳。本周五即将公布的6月非农就业数据&#xff0c;以及即将到来的美国总统大选&#xff0c;将成为影响美元走势的关键因素。在技术面上&#xff0c;美指保持在牛市…

dell服务器RAID5磁盘阵列出现故障的解决过程二——热备盘制作与坏盘替换过程

目录 背景方案概念全局热备&#xff08;Global Hot Spare&#xff09;&#xff1a;独立热备&#xff08;Dedicated Hot Spare&#xff09;&#xff1a; 过程8号制作成热备清除配置制作独立热备热备顶替坏盘直接rebuild 更换2号盘2号热备 注意注意事项foreign状态要先清除配置 背…