diff --git a/lib/install.sh b/lib/install.sh
deleted file mode 100644
index 2a5ce9b537b10b811a19af6634437618c8b383a0..0000000000000000000000000000000000000000
--- a/lib/install.sh
+++ /dev/null
@@ -1 +0,0 @@
-mvn install:install-file -DgroupId=org.opengauss -DartifactId=opengauss-jdbc -Dversion=2.0.0 -Dpackaging=jar -Dfile=opengauss-jdbc-2.0.0.jar -DgeneratePom=true
diff --git a/lib/opengauss-jdbc-2.0.0.jar b/lib/opengauss-jdbc-2.0.0.jar
deleted file mode 100644
index 66cc4679d8be8a53f4bad75b3bce0df7343f4de9..0000000000000000000000000000000000000000
Binary files a/lib/opengauss-jdbc-2.0.0.jar and /dev/null differ
diff --git a/pom.xml b/pom.xml
index 500c77a6f934ac65f7b35d556606c52d2b961a95..aba2c203b7208de651e30c28251e0097fd85f0ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,12 +149,16 @@
ojdbc8
21.1.0.0
+
+
+ org.postgresql
+ postgresql
+ 42.2.23
+
org.opengauss
opengauss-jdbc
- 2.0.0
- system
- ${user.dir}/lib/opengauss-jdbc-2.0.0.jar
+ 2.0.1-compatibility
com.alibaba
@@ -165,7 +169,7 @@
org.projectlombok
lombok
- 1.16.16
+ 1.18.0
commons-configuration
diff --git a/src/main/assembly/base.xml b/src/main/assembly/base.xml
index f78c7ab60ad5f192515ed87c5b96e8d689a582a8..3681e464ac8eda741c782e87a4b0d7c2c99de862 100644
--- a/src/main/assembly/base.xml
+++ b/src/main/assembly/base.xml
@@ -40,10 +40,6 @@
./src/main/resources/gauss.properties
conf
-
- ./lib/opengauss-jdbc-2.0.0.jar
- lib
-
@@ -53,4 +49,4 @@
-
\ No newline at end of file
+
diff --git a/src/main/java/com/gauss/applier/CheckRecordApplier.java b/src/main/java/com/gauss/applier/CheckRecordApplier.java
index 183c922429bec339eedcbb8956768ca01b0e9bd9..8055df308e0bf094d671c01f79f38bf37788c7fd 100644
--- a/src/main/java/com/gauss/applier/CheckRecordApplier.java
+++ b/src/main/java/com/gauss/applier/CheckRecordApplier.java
@@ -9,8 +9,8 @@ import java.util.List;
import com.gauss.common.model.DbType;
import com.gauss.common.utils.Quote;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.postgresql.copy.CopyManager;
-import org.postgresql.core.BaseConnection;
+import org.opengauss.copy.CopyManager;
+import org.opengauss.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
diff --git a/src/main/java/com/gauss/common/db/meta/TableMetaGenerator.java b/src/main/java/com/gauss/common/db/meta/TableMetaGenerator.java
index 5aacdbf86cd1fa2c13994a323af2d6e1380112cb..163de2d697b6ea941ae1724367f3870028ff8a16 100644
--- a/src/main/java/com/gauss/common/db/meta/TableMetaGenerator.java
+++ b/src/main/java/com/gauss/common/db/meta/TableMetaGenerator.java
@@ -107,7 +107,7 @@ public class TableMetaGenerator {
" FROM USER_TABLES T , USER_USERS U WHERE T.TABLE_NAME ='" + tableName.toUpperCase() + "'");
}
} else {
- //Mysql
+ //Mysql and PostgreSQL
if (StringUtils.isEmpty(tableName)) {
// ignore system tales
query = new StringBuffer("select TABLE_SCHEMA, TABLE_NAME from information_schema.tables where table_schema='"+ schemaName
diff --git a/src/main/java/com/gauss/common/db/sql/OpenGaussForPostgreSQL.java b/src/main/java/com/gauss/common/db/sql/OpenGaussForPostgreSQL.java
new file mode 100644
index 0000000000000000000000000000000000000000..3ca3e906c8e00fc55507fda1a405d86a733201d0
--- /dev/null
+++ b/src/main/java/com/gauss/common/db/sql/OpenGaussForPostgreSQL.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2021 Huawei Technologies Co.,Ltd.
+ */
+package com.gauss.common.db.sql;
+
+import com.gauss.common.db.meta.Table;
+import com.gauss.common.model.GaussContext;
+import com.gauss.common.utils.Quote;
+
+public class OpenGaussForPostgreSQL extends OpenGaussUtil {
+
+ public OpenGaussForPostgreSQL(GaussContext context) {
+ super(context);
+ Table tableMeta = context.getTableMeta();
+ String srcCompareTableName = Quote.join("", Quote.ins.quote(tableMeta.getSchema()), ".",
+ Quote.ins.quote(tableMeta.getName() + "_dataCheckerA"));
+ setSrcCompareTableName(srcCompareTableName);
+ String destCompareTableName = Quote.join("", Quote.ins.quote(tableMeta.getSchema()), ".",
+ Quote.ins.quote(tableMeta.getName() + "_dataCheckerB"));
+ setDestCompareTableName(destCompareTableName);
+ setConcatEnd(")");
+ setConcatStart("concat_ws('',");
+ setDelimiter(",");
+ String originTableName = Quote.join("", Quote.ins.quote(tableMeta.getSchema()), ".",
+ Quote.ins.quote(tableMeta.getName()));
+ setOrinTableName(originTableName);
+ setQuote("\"");
+ }
+}
diff --git a/src/main/java/com/gauss/common/db/sql/PostgresUtil.java b/src/main/java/com/gauss/common/db/sql/PostgresUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..811c37c6b60764c5037229d3909f37b11b9d78fa
--- /dev/null
+++ b/src/main/java/com/gauss/common/db/sql/PostgresUtil.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 Huawei Technologies Co.,Ltd.
+ */
+package com.gauss.common.db.sql;
+
+import com.gauss.common.db.meta.ColumnMeta;
+import com.gauss.common.db.meta.Table;
+import com.gauss.common.model.GaussContext;
+import com.gauss.common.utils.Quote;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PostgresUtil extends SqlTemplate {
+
+ private String orinTableName;
+
+ private GaussContext context;
+
+ static final String convertBit = "cast(%s as int)";
+
+ static final String convertChar = "cast(%s as varchar)";
+
+ static final String convertFloat = "round(%s::numeric, 10)";
+
+ static final String convertGeo = "replace(cast(%s as varchar),',',' ')";
+
+ static final String convertVarchar = "substring(cast(%s as varchar) from 3)";
+
+ static final String convertIntervalDay = "round((extract(day from %s) * 60 * 60 * 24 + extract(hour from %s)" +
+ " * 60 * 60 + extract(min from %s) * 60 + extract(second from %s))::numeric, 10)";
+
+ static final String convertIntervalYear = "extract(year from %s) * 12 + extract(month from %s)";
+
+ static final String convertDefault = "%s";
+
+ public PostgresUtil(GaussContext context) {
+ this.context = context;
+ Table tableMeta = context.getTableMeta();
+ this.orinTableName = Quote.join("", tableMeta.getSchema(), ".\"", tableMeta.getName(), "\"");
+ }
+
+ static private String convert(ColumnMeta meta) {
+ String columnName = meta.getName();
+ switch(meta.getType()) {
+ case Types.BOOLEAN:
+ case Types.BIT:
+ return String.format(convertBit, columnName);
+ case Types.CHAR:
+ case Types.ROWID:
+ case Types.SQLXML:
+ return String.format(convertChar, columnName);
+ case Types.REAL:
+ case Types.FLOAT:
+ case Types.DOUBLE:
+ case Types.NUMERIC:
+ return String.format(convertFloat, columnName);
+ case Types.VARBINARY:
+ case Types.BINARY:
+ case Types.LONGVARBINARY:
+ if (meta.getTypeName().equals("GEOMETRY")) {
+ return String.format(convertGeo, columnName);
+ } else {
+ return String.format(convertVarchar, columnName);
+ }
+ case INTERVAL_DAY:
+ return String.format(convertIntervalDay, columnName, columnName, columnName, columnName);
+ case INTERVAL_YEAR:
+ return String.format(convertIntervalYear, columnName, columnName);
+ default:
+ return String.format(convertDefault, columnName);
+ }
+ }
+
+ @Override
+ public String getMd5Sql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("md5(concat_ws('',");
+ List columns = context.getTableMeta().getColumns();
+ sb.append(columns.stream().map(PostgresUtil::convert).collect(Collectors.joining(",")));
+ sb.append("))");
+ return sb.toString();
+ }
+
+ @Override
+ public String getExtractSql() {
+ return Quote.join(" ","select ", getMd5Sql(), "from", orinTableName);
+ }
+
+ @Override
+ public String getSearchSql(ArrayList md5list) {
+ return Quote.join(" ", "select * from", orinTableName, "where", getMd5Sql(), "in (",
+ md5list.stream().map(str->"'" + str + "'").collect(Collectors.joining(",")), ")");
+ }
+}
diff --git a/src/main/java/com/gauss/common/db/sql/SqlFactory.java b/src/main/java/com/gauss/common/db/sql/SqlFactory.java
index 91609997b530ece46843189636b01d8365f6b47b..0b6bda915e97972d162a747c93c9e9e5dc452c61 100644
--- a/src/main/java/com/gauss/common/db/sql/SqlFactory.java
+++ b/src/main/java/com/gauss/common/db/sql/SqlFactory.java
@@ -12,12 +12,16 @@ public class SqlFactory {
if (type.isMysql()) {
return new MysqlUtil(context);
} else if (type.isOracle()) {
- return new OracleUtil(context);
+ return new OracleUtil(context);
+ } else if (type.isPostgreSQL()) {
+ return new PostgresUtil(context);
} else if (type.isOpenGauss()) {
if (srcType.isOracle()) {
return new OpenGaussForOracle(context);
} else if (srcType.isMysql()) {
return new OpenGaussForMysql(context);
+ } else if (srcType.isPostgreSQL()) {
+ return new OpenGaussForPostgreSQL(context);
}
}
throw new GaussException("Unknown database type");
diff --git a/src/main/java/com/gauss/common/model/DbType.java b/src/main/java/com/gauss/common/model/DbType.java
index 6b148633747b94f4a8e8a59cf189bd68ab40c40d..50f95f908eb3e246afbfc619e5a6e2f595261749 100644
--- a/src/main/java/com/gauss/common/model/DbType.java
+++ b/src/main/java/com/gauss/common/model/DbType.java
@@ -9,11 +9,15 @@ public enum DbType {
/**
* openGauss DB
*/
- OPGS("org.postgresql.Driver"),
+ OPGS("org.opengauss.Driver"),
/**
* oracle DB
*/
- ORACLE("oracle.jdbc.driver.OracleDriver");
+ ORACLE("oracle.jdbc.driver.OracleDriver"),
+ /**
+ * postgreSQL DB
+ */
+ PG("org.postgresql.Driver");
private String driver;
@@ -37,4 +41,8 @@ public enum DbType {
return this.equals(DbType.OPGS);
}
+ public boolean isPostgreSQL() {
+ return this.equals(DbType.PG);
+ }
+
}
diff --git a/src/main/java/com/gauss/controller/GaussController.java b/src/main/java/com/gauss/controller/GaussController.java
index deb293ce82c82f2c3e587b3d0cb3b6d39133f00c..7c06cc26381e268efef22a8631085c08c5d08c80 100644
--- a/src/main/java/com/gauss/controller/GaussController.java
+++ b/src/main/java/com/gauss/controller/GaussController.java
@@ -132,7 +132,7 @@ public class GaussController extends AbstractGaussLifeCycle {
RecordApplier applier = chooseApplier(context);
GaussInstance instance = new GaussInstance(context);
StatAggregation statAggregation = new StatAggregation(statBufferSize, statPrintInterval);
- instance.setPreparer(new GaussRecordPreparer(context,query_dop));
+ instance.setPreparer(new GaussRecordPreparer(sourceDbType, context,query_dop));
instance.setExtractor(extractor);
instance.setApplier(applier);
instance.setComparer(new GaussRecordComparer(sourceDbType, context, query_dop));
@@ -227,6 +227,10 @@ public class GaussController extends AbstractGaussLifeCycle {
DbOnceFullRecordExtractor recordExtractor = new DbOnceFullRecordExtractor(context, DbType.ORACLE);
recordExtractor.setTracer(progressTracer);
return recordExtractor;
+ } else if (sourceDbType == DbType.PG){
+ DbOnceFullRecordExtractor recordExtractor = new DbOnceFullRecordExtractor(context, DbType.PG);
+ recordExtractor.setTracer(progressTracer);
+ return recordExtractor;
} else {
throw new GaussException("unsupport " + sourceDbType);
}
diff --git a/src/main/java/com/gauss/extractor/db/DbOnceFullRecordExtractor.java b/src/main/java/com/gauss/extractor/db/DbOnceFullRecordExtractor.java
index 02ec5b4c7b5390d8961d2ef50f72de237b48267c..cd373e6ebcccd5fb7371d2348322d6d36313d68d 100644
--- a/src/main/java/com/gauss/extractor/db/DbOnceFullRecordExtractor.java
+++ b/src/main/java/com/gauss/extractor/db/DbOnceFullRecordExtractor.java
@@ -44,6 +44,7 @@ public class DbOnceFullRecordExtractor extends AbstractRecordExtractor {
@Override
public void start() {
super.start();
+ Runnable extractor;
if (StringUtils.isEmpty(extractSql)) {
SqlFactory sqlFactory = new SqlFactory();
SqlTemplate sqlTemplate = sqlFactory.getSqlTemplate(dbType, dbType, context);
@@ -52,17 +53,14 @@ public class DbOnceFullRecordExtractor extends AbstractRecordExtractor {
// 启动异步线程
if (dbType == DbType.ORACLE) {
- //Oracle
- extractorThread = new NamedThreadFactory(
- this.getClass().getSimpleName() + "-" + context.getTableMeta().getFullName()).newThread(
- new OracleContinueExtractor(context));
+ extractor = new OracleContinueExtractor(context);
+ } else if (dbType == DbType.PG) {
+ extractor = new PGContinueExtractor(context);
} else {
- //Mysql
- extractorThread = new NamedThreadFactory(
- this.getClass().getSimpleName() + "-" + context.getTableMeta().getFullName()).newThread(
- new MysqlContinueExtractor(context));
+ extractor = new MysqlContinueExtractor(context);
}
-
+ extractorThread = new NamedThreadFactory(
+ this.getClass().getSimpleName() + "-" + context.getTableMeta().getFullName()).newThread(extractor);
extractorThread.start();
queue = new LinkedBlockingQueue(context.getOnceCrawNum() * 2);
@@ -173,4 +171,36 @@ public class DbOnceFullRecordExtractor extends AbstractRecordExtractor {
}
}
+
+ public class PGContinueExtractor implements Runnable {
+
+ private JdbcTemplate jdbcTemplate;
+
+ public PGContinueExtractor(GaussContext context){
+ jdbcTemplate = new JdbcTemplate(context.getSourceDs());
+ }
+
+ public void run() {
+ jdbcTemplate.execute(new StatementCallback() {
+
+ public Object doInStatement(Statement stmt) throws SQLException {
+ stmt.setFetchSize(200);
+ stmt.execute(extractSql);
+ ResultSet rs = stmt.getResultSet();
+ while (rs.next()) {
+ try {
+ queue.put(rs.getString(1));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // transfer
+ throw new GaussException(e);
+ }
+ }
+ setStatus(ExtractStatus.TABLE_END);
+ rs.close();
+ return null;
+ }
+ });
+ }
+ }
+
}
diff --git a/src/main/java/com/gauss/preparer/GaussRecordPreparer.java b/src/main/java/com/gauss/preparer/GaussRecordPreparer.java
index 0b7b87bbf0fe84ea86ec25b187ace85de5417b88..245e0cb23a35cfa19d91628c7469aa5c7114be6e 100644
--- a/src/main/java/com/gauss/preparer/GaussRecordPreparer.java
+++ b/src/main/java/com/gauss/preparer/GaussRecordPreparer.java
@@ -6,7 +6,6 @@ import com.gauss.common.db.sql.SqlTemplate;
import com.gauss.common.model.DbType;
import com.gauss.common.model.GaussContext;
import com.gauss.common.model.PrepareStatus;
-import com.gauss.common.utils.GaussUtils;
import com.gauss.common.utils.Quote;
import com.gauss.common.utils.thread.NamedThreadFactory;
import com.gauss.exception.GaussException;
@@ -41,10 +40,10 @@ public class GaussRecordPreparer extends AbstractRecordPreparer {
return success;
}
- public GaussRecordPreparer(GaussContext context, int query_dop) {
+ public GaussRecordPreparer(DbType srcType, GaussContext context, int query_dop) {
this.context = context;
this.query_dop = query_dop;
- this.srcType = GaussUtils.judgeDbType(context.getSourceDs());
+ this.srcType = srcType;
Table tableMeta = context.getTableMeta();
this.srcCompareTableName = Quote.join("", Quote.ins.quote(tableMeta.getSchema()),
".", Quote.ins.quote(tableMeta.getName() + "_dataCheckerA"));