diff --git a/lib/install.sh b/lib/install.sh deleted file mode 100644 index 2a5ce9b537b10b811a19af6634437618c8b383a0..0000000000000000000000000000000000000000 --- a/lib/install.sh +++ /dev/null @@ -1 +0,0 @@ -mvn install:install-file -DgroupId=org.opengauss -DartifactId=opengauss-jdbc -Dversion=2.0.0 -Dpackaging=jar -Dfile=opengauss-jdbc-2.0.0.jar -DgeneratePom=true diff --git a/lib/opengauss-jdbc-2.0.0.jar b/lib/opengauss-jdbc-2.0.0.jar deleted file mode 100644 index 66cc4679d8be8a53f4bad75b3bce0df7343f4de9..0000000000000000000000000000000000000000 Binary files a/lib/opengauss-jdbc-2.0.0.jar and /dev/null differ diff --git a/pom.xml b/pom.xml index 500c77a6f934ac65f7b35d556606c52d2b961a95..aba2c203b7208de651e30c28251e0097fd85f0ef 100644 --- a/pom.xml +++ b/pom.xml @@ -149,12 +149,16 @@ ojdbc8 21.1.0.0 + + + org.postgresql + postgresql + 42.2.23 + org.opengauss opengauss-jdbc - 2.0.0 - system - ${user.dir}/lib/opengauss-jdbc-2.0.0.jar + 2.0.1-compatibility com.alibaba @@ -165,7 +169,7 @@ org.projectlombok lombok - 1.16.16 + 1.18.0 commons-configuration diff --git a/src/main/assembly/base.xml b/src/main/assembly/base.xml index f78c7ab60ad5f192515ed87c5b96e8d689a582a8..3681e464ac8eda741c782e87a4b0d7c2c99de862 100644 --- a/src/main/assembly/base.xml +++ b/src/main/assembly/base.xml @@ -40,10 +40,6 @@ ./src/main/resources/gauss.properties conf - - ./lib/opengauss-jdbc-2.0.0.jar - lib - @@ -53,4 +49,4 @@ - \ No newline at end of file + diff --git a/src/main/java/com/gauss/applier/CheckRecordApplier.java b/src/main/java/com/gauss/applier/CheckRecordApplier.java index 183c922429bec339eedcbb8956768ca01b0e9bd9..8055df308e0bf094d671c01f79f38bf37788c7fd 100644 --- a/src/main/java/com/gauss/applier/CheckRecordApplier.java +++ b/src/main/java/com/gauss/applier/CheckRecordApplier.java @@ -9,8 +9,8 @@ import java.util.List; import com.gauss.common.model.DbType; import com.gauss.common.utils.Quote; import org.apache.commons.lang.exception.ExceptionUtils; -import org.postgresql.copy.CopyManager; -import org.postgresql.core.BaseConnection; +import org.opengauss.copy.CopyManager; +import org.opengauss.core.BaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; diff --git a/src/main/java/com/gauss/common/db/meta/TableMetaGenerator.java b/src/main/java/com/gauss/common/db/meta/TableMetaGenerator.java index 5aacdbf86cd1fa2c13994a323af2d6e1380112cb..163de2d697b6ea941ae1724367f3870028ff8a16 100644 --- a/src/main/java/com/gauss/common/db/meta/TableMetaGenerator.java +++ b/src/main/java/com/gauss/common/db/meta/TableMetaGenerator.java @@ -107,7 +107,7 @@ public class TableMetaGenerator { " FROM USER_TABLES T , USER_USERS U WHERE T.TABLE_NAME ='" + tableName.toUpperCase() + "'"); } } else { - //Mysql + //Mysql and PostgreSQL if (StringUtils.isEmpty(tableName)) { // ignore system tales query = new StringBuffer("select TABLE_SCHEMA, TABLE_NAME from information_schema.tables where table_schema='"+ schemaName diff --git a/src/main/java/com/gauss/common/db/sql/OpenGaussForPostgreSQL.java b/src/main/java/com/gauss/common/db/sql/OpenGaussForPostgreSQL.java new file mode 100644 index 0000000000000000000000000000000000000000..3ca3e906c8e00fc55507fda1a405d86a733201d0 --- /dev/null +++ b/src/main/java/com/gauss/common/db/sql/OpenGaussForPostgreSQL.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021 Huawei Technologies Co.,Ltd. + */ +package com.gauss.common.db.sql; + +import com.gauss.common.db.meta.Table; +import com.gauss.common.model.GaussContext; +import com.gauss.common.utils.Quote; + +public class OpenGaussForPostgreSQL extends OpenGaussUtil { + + public OpenGaussForPostgreSQL(GaussContext context) { + super(context); + Table tableMeta = context.getTableMeta(); + String srcCompareTableName = Quote.join("", Quote.ins.quote(tableMeta.getSchema()), ".", + Quote.ins.quote(tableMeta.getName() + "_dataCheckerA")); + setSrcCompareTableName(srcCompareTableName); + String destCompareTableName = Quote.join("", Quote.ins.quote(tableMeta.getSchema()), ".", + Quote.ins.quote(tableMeta.getName() + "_dataCheckerB")); + setDestCompareTableName(destCompareTableName); + setConcatEnd(")"); + setConcatStart("concat_ws('',"); + setDelimiter(","); + String originTableName = Quote.join("", Quote.ins.quote(tableMeta.getSchema()), ".", + Quote.ins.quote(tableMeta.getName())); + setOrinTableName(originTableName); + setQuote("\""); + } +} diff --git a/src/main/java/com/gauss/common/db/sql/PostgresUtil.java b/src/main/java/com/gauss/common/db/sql/PostgresUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..811c37c6b60764c5037229d3909f37b11b9d78fa --- /dev/null +++ b/src/main/java/com/gauss/common/db/sql/PostgresUtil.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 Huawei Technologies Co.,Ltd. + */ +package com.gauss.common.db.sql; + +import com.gauss.common.db.meta.ColumnMeta; +import com.gauss.common.db.meta.Table; +import com.gauss.common.model.GaussContext; +import com.gauss.common.utils.Quote; + +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class PostgresUtil extends SqlTemplate { + + private String orinTableName; + + private GaussContext context; + + static final String convertBit = "cast(%s as int)"; + + static final String convertChar = "cast(%s as varchar)"; + + static final String convertFloat = "round(%s::numeric, 10)"; + + static final String convertGeo = "replace(cast(%s as varchar),',',' ')"; + + static final String convertVarchar = "substring(cast(%s as varchar) from 3)"; + + static final String convertIntervalDay = "round((extract(day from %s) * 60 * 60 * 24 + extract(hour from %s)" + + " * 60 * 60 + extract(min from %s) * 60 + extract(second from %s))::numeric, 10)"; + + static final String convertIntervalYear = "extract(year from %s) * 12 + extract(month from %s)"; + + static final String convertDefault = "%s"; + + public PostgresUtil(GaussContext context) { + this.context = context; + Table tableMeta = context.getTableMeta(); + this.orinTableName = Quote.join("", tableMeta.getSchema(), ".\"", tableMeta.getName(), "\""); + } + + static private String convert(ColumnMeta meta) { + String columnName = meta.getName(); + switch(meta.getType()) { + case Types.BOOLEAN: + case Types.BIT: + return String.format(convertBit, columnName); + case Types.CHAR: + case Types.ROWID: + case Types.SQLXML: + return String.format(convertChar, columnName); + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + case Types.NUMERIC: + return String.format(convertFloat, columnName); + case Types.VARBINARY: + case Types.BINARY: + case Types.LONGVARBINARY: + if (meta.getTypeName().equals("GEOMETRY")) { + return String.format(convertGeo, columnName); + } else { + return String.format(convertVarchar, columnName); + } + case INTERVAL_DAY: + return String.format(convertIntervalDay, columnName, columnName, columnName, columnName); + case INTERVAL_YEAR: + return String.format(convertIntervalYear, columnName, columnName); + default: + return String.format(convertDefault, columnName); + } + } + + @Override + public String getMd5Sql() { + StringBuilder sb = new StringBuilder(); + sb.append("md5(concat_ws('',"); + List columns = context.getTableMeta().getColumns(); + sb.append(columns.stream().map(PostgresUtil::convert).collect(Collectors.joining(","))); + sb.append("))"); + return sb.toString(); + } + + @Override + public String getExtractSql() { + return Quote.join(" ","select ", getMd5Sql(), "from", orinTableName); + } + + @Override + public String getSearchSql(ArrayList md5list) { + return Quote.join(" ", "select * from", orinTableName, "where", getMd5Sql(), "in (", + md5list.stream().map(str->"'" + str + "'").collect(Collectors.joining(",")), ")"); + } +} diff --git a/src/main/java/com/gauss/common/db/sql/SqlFactory.java b/src/main/java/com/gauss/common/db/sql/SqlFactory.java index 91609997b530ece46843189636b01d8365f6b47b..0b6bda915e97972d162a747c93c9e9e5dc452c61 100644 --- a/src/main/java/com/gauss/common/db/sql/SqlFactory.java +++ b/src/main/java/com/gauss/common/db/sql/SqlFactory.java @@ -12,12 +12,16 @@ public class SqlFactory { if (type.isMysql()) { return new MysqlUtil(context); } else if (type.isOracle()) { - return new OracleUtil(context); + return new OracleUtil(context); + } else if (type.isPostgreSQL()) { + return new PostgresUtil(context); } else if (type.isOpenGauss()) { if (srcType.isOracle()) { return new OpenGaussForOracle(context); } else if (srcType.isMysql()) { return new OpenGaussForMysql(context); + } else if (srcType.isPostgreSQL()) { + return new OpenGaussForPostgreSQL(context); } } throw new GaussException("Unknown database type"); diff --git a/src/main/java/com/gauss/common/model/DbType.java b/src/main/java/com/gauss/common/model/DbType.java index 6b148633747b94f4a8e8a59cf189bd68ab40c40d..50f95f908eb3e246afbfc619e5a6e2f595261749 100644 --- a/src/main/java/com/gauss/common/model/DbType.java +++ b/src/main/java/com/gauss/common/model/DbType.java @@ -9,11 +9,15 @@ public enum DbType { /** * openGauss DB */ - OPGS("org.postgresql.Driver"), + OPGS("org.opengauss.Driver"), /** * oracle DB */ - ORACLE("oracle.jdbc.driver.OracleDriver"); + ORACLE("oracle.jdbc.driver.OracleDriver"), + /** + * postgreSQL DB + */ + PG("org.postgresql.Driver"); private String driver; @@ -37,4 +41,8 @@ public enum DbType { return this.equals(DbType.OPGS); } + public boolean isPostgreSQL() { + return this.equals(DbType.PG); + } + } diff --git a/src/main/java/com/gauss/controller/GaussController.java b/src/main/java/com/gauss/controller/GaussController.java index deb293ce82c82f2c3e587b3d0cb3b6d39133f00c..7c06cc26381e268efef22a8631085c08c5d08c80 100644 --- a/src/main/java/com/gauss/controller/GaussController.java +++ b/src/main/java/com/gauss/controller/GaussController.java @@ -132,7 +132,7 @@ public class GaussController extends AbstractGaussLifeCycle { RecordApplier applier = chooseApplier(context); GaussInstance instance = new GaussInstance(context); StatAggregation statAggregation = new StatAggregation(statBufferSize, statPrintInterval); - instance.setPreparer(new GaussRecordPreparer(context,query_dop)); + instance.setPreparer(new GaussRecordPreparer(sourceDbType, context,query_dop)); instance.setExtractor(extractor); instance.setApplier(applier); instance.setComparer(new GaussRecordComparer(sourceDbType, context, query_dop)); @@ -227,6 +227,10 @@ public class GaussController extends AbstractGaussLifeCycle { DbOnceFullRecordExtractor recordExtractor = new DbOnceFullRecordExtractor(context, DbType.ORACLE); recordExtractor.setTracer(progressTracer); return recordExtractor; + } else if (sourceDbType == DbType.PG){ + DbOnceFullRecordExtractor recordExtractor = new DbOnceFullRecordExtractor(context, DbType.PG); + recordExtractor.setTracer(progressTracer); + return recordExtractor; } else { throw new GaussException("unsupport " + sourceDbType); } diff --git a/src/main/java/com/gauss/extractor/db/DbOnceFullRecordExtractor.java b/src/main/java/com/gauss/extractor/db/DbOnceFullRecordExtractor.java index 02ec5b4c7b5390d8961d2ef50f72de237b48267c..cd373e6ebcccd5fb7371d2348322d6d36313d68d 100644 --- a/src/main/java/com/gauss/extractor/db/DbOnceFullRecordExtractor.java +++ b/src/main/java/com/gauss/extractor/db/DbOnceFullRecordExtractor.java @@ -44,6 +44,7 @@ public class DbOnceFullRecordExtractor extends AbstractRecordExtractor { @Override public void start() { super.start(); + Runnable extractor; if (StringUtils.isEmpty(extractSql)) { SqlFactory sqlFactory = new SqlFactory(); SqlTemplate sqlTemplate = sqlFactory.getSqlTemplate(dbType, dbType, context); @@ -52,17 +53,14 @@ public class DbOnceFullRecordExtractor extends AbstractRecordExtractor { // 启动异步线程 if (dbType == DbType.ORACLE) { - //Oracle - extractorThread = new NamedThreadFactory( - this.getClass().getSimpleName() + "-" + context.getTableMeta().getFullName()).newThread( - new OracleContinueExtractor(context)); + extractor = new OracleContinueExtractor(context); + } else if (dbType == DbType.PG) { + extractor = new PGContinueExtractor(context); } else { - //Mysql - extractorThread = new NamedThreadFactory( - this.getClass().getSimpleName() + "-" + context.getTableMeta().getFullName()).newThread( - new MysqlContinueExtractor(context)); + extractor = new MysqlContinueExtractor(context); } - + extractorThread = new NamedThreadFactory( + this.getClass().getSimpleName() + "-" + context.getTableMeta().getFullName()).newThread(extractor); extractorThread.start(); queue = new LinkedBlockingQueue(context.getOnceCrawNum() * 2); @@ -173,4 +171,36 @@ public class DbOnceFullRecordExtractor extends AbstractRecordExtractor { } } + + public class PGContinueExtractor implements Runnable { + + private JdbcTemplate jdbcTemplate; + + public PGContinueExtractor(GaussContext context){ + jdbcTemplate = new JdbcTemplate(context.getSourceDs()); + } + + public void run() { + jdbcTemplate.execute(new StatementCallback() { + + public Object doInStatement(Statement stmt) throws SQLException { + stmt.setFetchSize(200); + stmt.execute(extractSql); + ResultSet rs = stmt.getResultSet(); + while (rs.next()) { + try { + queue.put(rs.getString(1)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // transfer + throw new GaussException(e); + } + } + setStatus(ExtractStatus.TABLE_END); + rs.close(); + return null; + } + }); + } + } + } diff --git a/src/main/java/com/gauss/preparer/GaussRecordPreparer.java b/src/main/java/com/gauss/preparer/GaussRecordPreparer.java index 0b7b87bbf0fe84ea86ec25b187ace85de5417b88..245e0cb23a35cfa19d91628c7469aa5c7114be6e 100644 --- a/src/main/java/com/gauss/preparer/GaussRecordPreparer.java +++ b/src/main/java/com/gauss/preparer/GaussRecordPreparer.java @@ -6,7 +6,6 @@ import com.gauss.common.db.sql.SqlTemplate; import com.gauss.common.model.DbType; import com.gauss.common.model.GaussContext; import com.gauss.common.model.PrepareStatus; -import com.gauss.common.utils.GaussUtils; import com.gauss.common.utils.Quote; import com.gauss.common.utils.thread.NamedThreadFactory; import com.gauss.exception.GaussException; @@ -41,10 +40,10 @@ public class GaussRecordPreparer extends AbstractRecordPreparer { return success; } - public GaussRecordPreparer(GaussContext context, int query_dop) { + public GaussRecordPreparer(DbType srcType, GaussContext context, int query_dop) { this.context = context; this.query_dop = query_dop; - this.srcType = GaussUtils.judgeDbType(context.getSourceDs()); + this.srcType = srcType; Table tableMeta = context.getTableMeta(); this.srcCompareTableName = Quote.join("", Quote.ins.quote(tableMeta.getSchema()), ".", Quote.ins.quote(tableMeta.getName() + "_dataCheckerA"));