# mysql-es-synchronizer
**Repository Path**: shigen/mysql-es-synchronizer
## Basic Information
- **Project Name**: mysql-es-synchronizer
- **Description**: 轻量级mysql-es数据同步工具
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 5
- **Forks**: 2
- **Created**: 2023-12-27
- **Last Updated**: 2024-12-20
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
mysql-es-synchronizer
# 项目介绍
开源的轻量级MySQL数据增量同步到elasticsearch工具,只需简单的配置即可实现增量的数据同步。
## 配置文件
> MySQL和elasticsearch 的配置基本和官方的 `spring-boot-starter-xxx` 的配置类似
```yaml
spring:
# MySQL数据库连接信息
datasource:
url: jdbc:mysql://shigen.com:3306/demo?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&autoReconnect=true&useSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: xxxx
# elasticsearch相关配置
elasticsearch:
# elasticsearch集群
uris:
- http://localhost:9200
# elasticsearch账号密码
username: elastic
password: 123456
# 同步相关配置
sync:
# 需要同步的sql查询语句,此处的 :sql_last_value是一个固定值
sql: select id, username, password, nickname, email, phone, gender, birthday, address, create_time, update_time from demo.user_info where update_time > :sql_last_value order by update_time asc
# sql数据偏移量记录的文件地址
sql_last_value_file: mysql/last_id.txt
# 同步的定时任务
cron: 0 * * * * *
```
## 中间件版本
* MySQL 5.7+
* elasticsearch 7.2.0
> 其它的elasticsearch版本还在陆续的测试接入中
## 同步原理
从文件中读取上次同步的时间,如果未读取到正确的时间,则默认从 `1970-01-01 08:00:00` (东八区元年)开始。再根据数据中的字段 `update_time` 做为条件,查询在上次同步时间之后更新的数据。此时再将查询出的数据列表转换成json格式,批量插入到elasticsearch中。
## 和其它主流工具对比
| 特性 | Canal | Logstash | mysql-es-synchronizer |
|--------------------|-----------------------------------------|--------------------------------------------------------|----------------------|
| 数据同步方式 | 基于 MySQL binlog 实时解析 | 可以使用 JDBC、Logstash-input-jdbc 等插件进行实时同步 | 可以使用JDBC实现实时同步 |
| 支持的数据源 | 仅支持 MySQL | 支持多种关系型数据库、NoSQL 数据库等 | 目前仅支持 MySQL |
| 配置灵活性 | 需要配置 Canal Server 和 Destination | 通过配置文件定义输入、过滤、输出等管道 | 需要配置配置文件 |
| 支持的数据转换功能 | 提供简单的字段映射和格式转换功能 | 支持丰富的过滤、映射、计算等功能 | -- |
| 社区活跃度 | 在 MySQL 数据同步领域较为活跃 | 在数据同步领域有着广泛的应用和社区支持 | -- |
| 性能 | 高性能,适合大规模数据同步 | 良好的性能,但在大规模数据同步时可能需要优化 | -- |
# 常见问题
## 我的MySQL数据库没有字段 `update_time` 怎么办
在进行初次开发时, `shigen` 就考虑到了这个问题,采用的方式是 `MySQL` 的存储过程,下边是数据表的设计:
```sql
CREATE TABLE IF NOT EXISTS `user_info`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户 ID,主键',
`username` varchar(50) NOT NULL COMMENT '用户名,唯一索引',
`password` varchar(255) NOT NULL COMMENT '加密后的密码',
`nickname` varchar(50) DEFAULT NULL COMMENT '用户昵称',
`email` varchar(100) DEFAULT NULL COMMENT '邮箱地址',
`phone` varchar(20) DEFAULT NULL COMMENT '手机号码',
`gender` tinyint(4) NOT NULL DEFAULT '0' COMMENT '性别:0 代表未知,1 代表男性,2 代表女性',
`birthday` date DEFAULT NULL COMMENT '出生日期',
`address` varchar(200) DEFAULT NULL COMMENT '联系地址',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
UNIQUE KEY `username` (`username`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='用户信息表';
```
### 定义存储过程
现在我们只需要在所有的表中添加字段 update_time 并设置成更新触发。在查询工具中运行创建存储过程的代码:
```sql
-- 创建存储过程
DELIMITER //
CREATE PROCEDURE add_update_time_column(
IN dbName VARCHAR(255)
)
BEGIN
DECLARE done INT DEFAULT FALSE;
DECLARE tableName VARCHAR(255);
DECLARE cur CURSOR FOR SELECT table_name FROM information_schema.tables WHERE table_schema = dbName;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
OPEN cur;
read_loop: LOOP
FETCH cur INTO tableName;
IF done THEN
LEAVE read_loop;
END IF;
SET @columnExists = (SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = dbName AND table_name = tableName AND column_name = 'update_time');
IF @columnExists = 0 THEN
SET @alterSql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT \'更新时间\'');
PREPARE stmt FROM @alterSql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
END IF;
END LOOP;
CLOSE cur;
END //
DELIMITER ;
```
### 调用
需要注意的是:存储过程我们定义成了类似于函数的形式,它的参数之一就是 `dbName` 。
```sql
call add_update_time_column('demo');
```
### 删除存储过程
```sql
DROP PROCEDURE IF EXISTS add_update_time_column;
```
## 为什么不采用开源的ORM框架来便利数据的查询展示
其实提到这个问题,确实我们很喜欢偷懒,像 `mybatis-plus` 、 `easy-es` 这类的ORM框架,只需要直接调用封装好的接口,轻松实现数据的查询转换,而作者这里使用的是原生的 `JDBC` 。但是作为中间件,我们更在意的性能,在 `shigen` 的文章[EasyExcel实现复杂数据导入](https://juejin.cn/post/7310046632672870426)中用到了原生的JDBC。
在后期,也会逐渐的摆脱 `spring-boot-starter-jdbc` 、 `spring-boot-starter-data-elasticsearch` 的依赖,采用原生的方式获得数据库的连接和操作。
## 后期迭代的方向
### 封装开箱即用的自定义 `starter`
后期实现开箱即用的 `starter` ,只需要导入 `starter` 并在配置文件中稍微配置即可实现在项目中的使用。
### 支持多个SQL查询同步
对标 `canal` 和 `logstash` ,实现多个SQL查询的数据同步。
# 后记
目前想到的就这么多,初次尝试开源,也希望伙伴们多多支持。您的建议我也会认真的采纳,共同推进项目的优化和开源生态的持续发展。也可关注[GitHub](https://github.com/shigen-fu)获得更多的开源项目。