# mysql-es-synchronizer **Repository Path**: shigen/mysql-es-synchronizer ## Basic Information - **Project Name**: mysql-es-synchronizer - **Description**: 轻量级mysql-es数据同步工具 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 5 - **Forks**: 2 - **Created**: 2023-12-27 - **Last Updated**: 2024-12-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README

mysql-es-synchronizer

# 项目介绍 开源的轻量级MySQL数据增量同步到elasticsearch工具,只需简单的配置即可实现增量的数据同步。 ## 配置文件 > MySQL和elasticsearch 的配置基本和官方的 `spring-boot-starter-xxx` 的配置类似 ```yaml spring: # MySQL数据库连接信息 datasource: url: jdbc:mysql://shigen.com:3306/demo?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&autoReconnect=true&useSSL=false driver-class-name: com.mysql.cj.jdbc.Driver username: root password: xxxx # elasticsearch相关配置 elasticsearch: # elasticsearch集群 uris: - http://localhost:9200 # elasticsearch账号密码 username: elastic password: 123456 # 同步相关配置 sync: # 需要同步的sql查询语句,此处的 :sql_last_value是一个固定值 sql: select id, username, password, nickname, email, phone, gender, birthday, address, create_time, update_time from demo.user_info where update_time > :sql_last_value order by update_time asc # sql数据偏移量记录的文件地址 sql_last_value_file: mysql/last_id.txt # 同步的定时任务 cron: 0 * * * * * ``` ## 中间件版本 * MySQL 5.7+ * elasticsearch 7.2.0 > 其它的elasticsearch版本还在陆续的测试接入中 ## 同步原理 从文件中读取上次同步的时间,如果未读取到正确的时间,则默认从 `1970-01-01 08:00:00` (东八区元年)开始。再根据数据中的字段 `update_time` 做为条件,查询在上次同步时间之后更新的数据。此时再将查询出的数据列表转换成json格式,批量插入到elasticsearch中。 ## 和其它主流工具对比 | 特性 | Canal | Logstash | mysql-es-synchronizer | |--------------------|-----------------------------------------|--------------------------------------------------------|----------------------| | 数据同步方式 | 基于 MySQL binlog 实时解析 | 可以使用 JDBC、Logstash-input-jdbc 等插件进行实时同步 | 可以使用JDBC实现实时同步 | | 支持的数据源 | 仅支持 MySQL | 支持多种关系型数据库、NoSQL 数据库等 | 目前仅支持 MySQL | | 配置灵活性 | 需要配置 Canal Server 和 Destination | 通过配置文件定义输入、过滤、输出等管道 | 需要配置配置文件 | | 支持的数据转换功能 | 提供简单的字段映射和格式转换功能 | 支持丰富的过滤、映射、计算等功能 | -- | | 社区活跃度 | 在 MySQL 数据同步领域较为活跃 | 在数据同步领域有着广泛的应用和社区支持 | -- | | 性能 | 高性能,适合大规模数据同步 | 良好的性能,但在大规模数据同步时可能需要优化 | -- | # 常见问题 ## 我的MySQL数据库没有字段 `update_time` 怎么办 在进行初次开发时, `shigen` 就考虑到了这个问题,采用的方式是 `MySQL` 的存储过程,下边是数据表的设计: ```sql CREATE TABLE IF NOT EXISTS `user_info` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户 ID,主键', `username` varchar(50) NOT NULL COMMENT '用户名,唯一索引', `password` varchar(255) NOT NULL COMMENT '加密后的密码', `nickname` varchar(50) DEFAULT NULL COMMENT '用户昵称', `email` varchar(100) DEFAULT NULL COMMENT '邮箱地址', `phone` varchar(20) DEFAULT NULL COMMENT '手机号码', `gender` tinyint(4) NOT NULL DEFAULT '0' COMMENT '性别:0 代表未知,1 代表男性,2 代表女性', `birthday` date DEFAULT NULL COMMENT '出生日期', `address` varchar(200) DEFAULT NULL COMMENT '联系地址', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`), UNIQUE KEY `username` (`username`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT ='用户信息表'; ``` ### 定义存储过程 现在我们只需要在所有的表中添加字段 update_time 并设置成更新触发。在查询工具中运行创建存储过程的代码: ```sql -- 创建存储过程 DELIMITER // CREATE PROCEDURE add_update_time_column( IN dbName VARCHAR(255) ) BEGIN DECLARE done INT DEFAULT FALSE; DECLARE tableName VARCHAR(255); DECLARE cur CURSOR FOR SELECT table_name FROM information_schema.tables WHERE table_schema = dbName; DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; OPEN cur; read_loop: LOOP FETCH cur INTO tableName; IF done THEN LEAVE read_loop; END IF; SET @columnExists = (SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = dbName AND table_name = tableName AND column_name = 'update_time'); IF @columnExists = 0 THEN SET @alterSql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT \'更新时间\''); PREPARE stmt FROM @alterSql; EXECUTE stmt; DEALLOCATE PREPARE stmt; END IF; END LOOP; CLOSE cur; END // DELIMITER ; ``` ### 调用 需要注意的是:存储过程我们定义成了类似于函数的形式,它的参数之一就是 `dbName` 。 ```sql call add_update_time_column('demo'); ``` ### 删除存储过程 ```sql DROP PROCEDURE IF EXISTS add_update_time_column; ``` ## 为什么不采用开源的ORM框架来便利数据的查询展示 其实提到这个问题,确实我们很喜欢偷懒,像 `mybatis-plus` 、 `easy-es` 这类的ORM框架,只需要直接调用封装好的接口,轻松实现数据的查询转换,而作者这里使用的是原生的 `JDBC` 。但是作为中间件,我们更在意的性能,在 `shigen` 的文章[EasyExcel实现复杂数据导入](https://juejin.cn/post/7310046632672870426)中用到了原生的JDBC。
在后期,也会逐渐的摆脱 `spring-boot-starter-jdbc` 、 `spring-boot-starter-data-elasticsearch` 的依赖,采用原生的方式获得数据库的连接和操作。 ## 后期迭代的方向 ### 封装开箱即用的自定义 `starter` 后期实现开箱即用的 `starter` ,只需要导入 `starter` 并在配置文件中稍微配置即可实现在项目中的使用。 ### 支持多个SQL查询同步 对标 `canal` 和 `logstash` ,实现多个SQL查询的数据同步。 # 后记 目前想到的就这么多,初次尝试开源,也希望伙伴们多多支持。您的建议我也会认真的采纳,共同推进项目的优化和开源生态的持续发展。也可关注[GitHub](https://github.com/shigen-fu)获得更多的开源项目。