diff --git a/common/src/main/java/com/alibaba/datax/common/util/ConfigurationUtil.java b/common/src/main/java/com/alibaba/datax/common/util/ConfigurationUtil.java index cbb8e0767655e9242392105608e597aa5f5997a7..9e4a43d2e09181dbf5a21411c4a7ebf99807747d 100644 --- a/common/src/main/java/com/alibaba/datax/common/util/ConfigurationUtil.java +++ b/common/src/main/java/com/alibaba/datax/common/util/ConfigurationUtil.java @@ -10,7 +10,7 @@ import java.nio.charset.StandardCharsets; public class ConfigurationUtil { public static void persistence(Configuration configuration) throws IOException { - String resume = System.getProperty("datax.resume"); + String resume = System.getProperty("datax.persistence"); if ("true".equalsIgnoreCase(resume)) { configuration.set("core.container.model", "taskGroup"); String json = configuration.toJSON(); @@ -19,7 +19,7 @@ public class ConfigurationUtil { } public static void remove(Configuration configuration) throws IOException { - String resume = System.getProperty("datax.resume"); + String resume = System.getProperty("datax.persistence"); if ("true".equalsIgnoreCase(resume)) { File file = file(configuration); if (file.exists()) { diff --git a/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/taskgroup/StandaloneTGContainerCommunicator.java b/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/taskgroup/StandaloneTGContainerCommunicator.java index e3f05cfb72aa2029bbc099c5dad0f54353502d4e..769780a6e95d443241162257858838c439c352ec 100644 --- a/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/taskgroup/StandaloneTGContainerCommunicator.java +++ b/core/src/main/java/com/alibaba/datax/core/statistics/container/communicator/taskgroup/StandaloneTGContainerCommunicator.java @@ -5,6 +5,7 @@ import com.alibaba.datax.core.statistics.communication.CommunicationTool; import com.alibaba.datax.core.statistics.container.communicator.job.StandAloneJobContainerCommunicator; import com.alibaba.datax.core.statistics.container.report.ProcessInnerReporter; import com.alibaba.datax.core.statistics.communication.Communication; +import com.alibaba.datax.core.util.container.CoreConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,21 +13,31 @@ import java.util.Map; public class StandaloneTGContainerCommunicator extends AbstractTGContainerCommunicator { - private static final Logger LOG = LoggerFactory - .getLogger(StandaloneTGContainerCommunicator.class); +// private static final Logger LOG = LoggerFactory +// .getLogger(StandaloneTGContainerCommunicator.class); + private Logger LOG = null; public StandaloneTGContainerCommunicator(Configuration configuration, Map taskGroupCommunicationMap) { super(configuration, taskGroupCommunicationMap); super.setReporter(new ProcessInnerReporter(this)); + String jobName = configuration.getString("job.name"); + if (null != jobName) { + LOG = LoggerFactory.getLogger(jobName); + } else { + LOG = LoggerFactory.getLogger(StandaloneTGContainerCommunicator.class); + } } @Override public void report(Communication communication) { super.getReporter().reportTGCommunication(super.taskGroupId, communication); // todo - LOG.info(CommunicationTool.Stringify.getSnapshot(communication)); - reportVmInfo(); + if ("taskGroup".equalsIgnoreCase(configuration + .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL))) { + LOG.info(CommunicationTool.Stringify.getSnapshot(communication)); + reportVmInfo(); + } } } diff --git a/dmt/src/main/java/com/gbase8c/dmt/Dmt.java b/dmt/src/main/java/com/gbase8c/dmt/Dmt.java index 104b5769fd6ade331167eada24630f4462c72364..ce379b71171e98d4cc500dfb0ccc89e9bb6811d7 100644 --- a/dmt/src/main/java/com/gbase8c/dmt/Dmt.java +++ b/dmt/src/main/java/com/gbase8c/dmt/Dmt.java @@ -8,17 +8,26 @@ public class Dmt { public static void main(String[] args) throws ParseException { Options options = new Options(); + options.addOption("workdir", true, "work dir."); options.addOption("taskid", true, "task id."); + options.addOption("action", true, "action."); + options.addOption("tables", true, "tables."); DefaultParser parser = new DefaultParser(); CommandLine cl = parser.parse(options, args); String workdir = cl.getOptionValue("workdir"); String taskId = cl.getOptionValue("taskid"); + String action = cl.getOptionValue("action"); + String tables = cl.getOptionValue("tables"); DmtConfig dmtConfig = DmtConfig.read(workdir, taskId); TaskService taskService = new TaskService(dmtConfig); - taskService.start(); + if (!"rbt".equalsIgnoreCase(action)) { + taskService.start(); + } else { + taskService.resumeBt(tables); + } } } diff --git a/dmt/src/main/java/com/gbase8c/dmt/migration/MigrationObjectService.java b/dmt/src/main/java/com/gbase8c/dmt/migration/MigrationObjectService.java index 4054dd0fcccab2757d26ca6e0c54e0e8486fce31..cf09b922d6a9882047cdc81975e02a215f066a42 100644 --- a/dmt/src/main/java/com/gbase8c/dmt/migration/MigrationObjectService.java +++ b/dmt/src/main/java/com/gbase8c/dmt/migration/MigrationObjectService.java @@ -148,7 +148,7 @@ public class MigrationObjectService { .jobId(jobId) .src(src) .tar(tar) - .tableDto(tableDto) +// .tableDto(tableDto) .contents(Lists.newArrayList(JSON.toJSONString(dataxMap))) .json(JSON.toJSONString(dataxMap)) .build()); diff --git a/dmt/src/main/java/com/gbase8c/dmt/model/migration/job/DataxJob.java b/dmt/src/main/java/com/gbase8c/dmt/model/migration/job/DataxJob.java index 5cf0a1933f11aef29304ad19819b5c040cab42df..3a7e029d6d260ac136999d033988c955c956897b 100644 --- a/dmt/src/main/java/com/gbase8c/dmt/model/migration/job/DataxJob.java +++ b/dmt/src/main/java/com/gbase8c/dmt/model/migration/job/DataxJob.java @@ -20,7 +20,7 @@ public class DataxJob extends MigrationObject { private Long jobId; private DataSourceDto src; private DataSourceDto tar; - private TableDto tableDto; +// private TableDto tableDto; private String json; @Override diff --git a/dmt/src/main/java/com/gbase8c/dmt/service/TaskService.java b/dmt/src/main/java/com/gbase8c/dmt/service/TaskService.java index ccea6a7f0a6ee2cb39a83084932defc6be161171..d758a0daea98135ec72433a512eaef54324cc57e 100644 --- a/dmt/src/main/java/com/gbase8c/dmt/service/TaskService.java +++ b/dmt/src/main/java/com/gbase8c/dmt/service/TaskService.java @@ -1,5 +1,8 @@ package com.gbase8c.dmt.service; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.Engine; +import com.alibaba.datax.core.util.ConfigParser; import com.gbase8c.dmt.config.DmtConfig; import com.gbase8c.dmt.dao.*; import com.gbase8c.dmt.dao.file.*; @@ -22,6 +25,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; +import java.io.File; import java.util.*; import java.util.stream.Collectors; @@ -80,6 +84,60 @@ public class TaskService { start; } + private String fileDir() { + String workdir = dmtConfig.getWorkDir(); + String id = dmtConfig.getTaskId(); + return StringUtils.joinWith(File.separator, workdir, id, "configuration"); + } + + private String file(String jsonName) { + String workdir = dmtConfig.getWorkDir(); + String id = dmtConfig.getTaskId(); + jsonName = StringUtils.trim(jsonName); + return StringUtils.joinWith(File.separator, workdir, id, "configuration", jsonName); + } + + public void resumeBt(String tables) { + List configurations = Lists.newArrayList(); + String[] ts = null; + if (StringUtils.isNotBlank(tables)) { + ts = tables.split(","); + for (int i=0; i< ts.length; i++) { + ts[i] = ts[i] + ".json"; + } + } else { + File dir = new File(fileDir()); + ts = dir.list(); + } + if (ts != null) { + configurations = Arrays.stream(ts).map(this::file) + .map(ConfigParser::parse) + .collect(Collectors.toList()); + } + + Flowable.fromIterable(configurations) + .observeOn(Schedulers.io()) + .parallel() + .runOn(Schedulers.io()) + .doOnNext(this::start) + .sequential() + .collect(Collectors.toList()) + .blockingSubscribe( + list -> { + log.info("断点续传成功"); + }, + error -> { + log.error("断点续传出错", error); + } + ); + + } + + public void start(Configuration configuration) { + Engine engine = new Engine(); + engine.start(configuration); + } + public void start() { String id = dmtConfig.getTaskId(); Task task = taskDao.get(id); diff --git a/dmt/src/test/java/com/gbase8c/dmt/Test.java b/dmt/src/test/java/com/gbase8c/dmt/Test.java index 52194698e58b5b8cef8099dcd0947bcade12f749..2e246d4f70c2eb50eb32005970435bdfe51dc999 100644 --- a/dmt/src/test/java/com/gbase8c/dmt/Test.java +++ b/dmt/src/test/java/com/gbase8c/dmt/Test.java @@ -8,11 +8,11 @@ public class Test { public static void main(String[] args) {; System.setProperty("datax.home", "D:/ora-migration-tool/target/ora-migration-tool/ora-migration-tool"); - System.setProperty("datax.resume", "true"); + System.setProperty("datax.persistence", "true"); // Configuration configuration = ConfigParser.parse("D:/workdir/1/configuration/TEST.T_TABLE.json"); - Configuration configuration = ConfigParser.parse("D:/workdir/ori.json"); - configuration.set("core.container.model", "taskGroup"); + Configuration configuration = ConfigParser.parse("D:/workdir/test.json"); +// configuration.set("core.container.model", "taskGroup"); Engine engine = new Engine(); engine.start(configuration); } diff --git a/userGuid.md b/userGuid.md index 0c4aae54387bfd6b2ab009e683b56e9557b57ec0..bafb4e30b855dba1bc6382de66474abfa7d62ab9 100644 --- a/userGuid.md +++ b/userGuid.md @@ -70,7 +70,11 @@ * 第四步:启动迁移 ``` shell - java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${ora-migration-tool.home}/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=${ora-migration-tool.home} -Ddatax.resume=true -Dlogback.configurationFile=${ora-migration-tool.home}/conf/logback.xml -classpath ${ora-migration-tool.home}\lib\* com.gbase8c.dmt.Dmt -taskid ${tasked} -workdir ${workdir} + windows: + java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${ora-migration-tool.home}/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=${ora-migration-tool.home} -Ddatax.persistence=true -Dlogback.configurationFile="${ora-migration-tool.home}/conf/logback.xml" -classpath "${ora-migration-tool.home}/lib/*” com.gbase8c.dmt.Dmt -taskid ${tasked} -workdir ${workdir} [-action rbt] [-tables schema.tn1,schema.tn2...] + + 其它(mac/linux): + java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${ora-migration-tool.home}/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=${ora-migration-tool.home} -Ddatax.persistence=true -Dlogback.configurationFile="${ora-migration-tool.home}/conf/logback.xml" -classpath "${ora-migration-tool.home}/lib/*:.” com.gbase8c.dmt.Dmt -taskid ${tasked} -workdir ${workdir} [-action rbt] [-tables schema.tn1,schema.tn2...] ``` * 配置文件说明