如何手动终止DataX启动后的异源数据同步任务?

摘要:开心一刻 刚刚和老婆吵架,气到不行,想离婚女儿突然站出来劝解道:难道你们就不能打一顿孩子消消气,非要闹离婚吗?我和老婆同时看向女儿,各自挽起了衣袖女儿补充道:弟弟那么小,打他,他又不会记仇 需求背景 项目基于 DataX 来实现异源之间的数
开心一刻 刚刚和老婆吵架,气到不行,想离婚 女儿突然站出来劝解道:难道你们就不能打一顿孩子消消气,非要闹离婚吗? 我和老婆同时看向女儿,各自挽起了衣袖 女儿补充道:弟弟那么小,打他,他又不会记仇 需求背景 项目基于 DataX 来实现异源之间的数据离线同步,我对 Datax 进行了一些梳理与改造 异构数据源同步之数据同步 → datax 改造,有点意思 异构数据源同步之数据同步 → datax 再改造,开始触及源码 异构数据源同步之数据同步 → DataX 使用细节 异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密 异源数据同步 → DataX 为什么要支持 kafka? 异源数据同步 → 如何获取 DataX 已同步数据量? 本以为离线同步告一段落,不会再有新的需求,可打脸来的非常快,产品经理很快找到我,说了如下一段话 昨天我在测试开发环境试用了一下离线同步功能,很好的实现了我提的需求,给你点赞! 但是使用过程中我遇到个情况,有张的表的数据量很大,一开始我没关注其数据量,所以配置了全量同步,启动同步后迟迟没有同步完成,我才意识到表的数据量非常大,一查才知道 2 亿多条数据,我想终止同步却发现没有地方可以进行终止操作 所以需要加个功能:同步中的任务可以进行终止操作 这话术算是被产品经理给玩明白了,先对我进行肯定,然后指出使用中的痛点,针对该痛点提出新的功能,让我一点反驳的余地都没有;作为一个讲道理的开发人员,面对一个很合理的需求,我们还是很乐意接受的,你们说是不是? 需求一接,问题就来了 如何终止同步 思考这个问题之前,我们先来回顾下 DataX 的启动;还记得我们是怎么集成 DataX 的吗,异构数据源同步之数据同步 → datax 再改造,开始触及源码 中有说明,新增 qsl-datax-hook 模块,该模块中通过命令 Process process = Runtime.getRuntime().exec(realCommand); realCommand 就是启动 DataX 的 java 命令,类似 java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -Ddatax.home=/datax -classpath /datax/lib/* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job job.json 来启动 DataX,也就是给 DataX 单独启动一个 java 进程;那么如何停止 DataX,思路是不是就有了?问题是不是就转换成了 如何终止 java 进程 终止进程 如何终止进程,这个我相信你们都会 Linux:kill -9 pid Win:cmd.exe /c taskkill /PID pid /F /T 但这有个前提,需要知道 DataX 的 java 进程的 pid,而 JDK8 中 Process 的方法如下 是没有提供获取 pid 的方法,在不调整 JDK 版本的情况下,我们如何获取 DataX 进程的 pid?不同的操作系统获取方式不一样,我们分别对 Linux 和 Win 进行实现 Linux 实现就比较简单了,仅仅基于 JDK 就可以实现 Field field = process.getClass().getDeclaredField("pid"); field.setAccessible(true); int pid = field.getInt(process); 通过反射获取 process 实现类的成员变量 pid 的值;这段代码,你们应该都能看懂吧 Win Win 系统下,则需要依赖第三方工具 oshi <dependency> <groupId>com.github.oshi</groupId> <artifactId>oshi-core</artifactId> <version>6.6.5</version> </dependency> 获取 pid 实现如下 Field field = process.getClass().getDeclaredField("handle"); field.setAccessible(true); long handle = field.getLong(process); WinNT.HANDLE winntHandle = new WinNT.HANDLE(); winntHandle.setPointer(Pointer.createConstant(handle)); int pid = Kernel32.INSTANCE.GetProcessId(winntHandle); 同样用到了反射,还用到了 oshi 提供的方法 合并起来即得到获取 pid 的方法 /** * 获取进程ID * @param process 进程 * @return 进程id,-1表示获取失败 * @author 青石路 */ public static int getProcessId(Process process) { int pid = NULL_PROCESS_ID; Field field; if (Platform.isWindows()) { try { field = process.getClass().getDeclaredField("handle"); field.setAccessible(true); long handle = field.getLong(process); WinNT.HANDLE winntHandle = new WinNT.HANDLE(); winntHandle.setPointer(Pointer.createConstant(handle)); pid = Kernel32.INSTANCE.GetProcessId(winntHandle); } catch (Exception e) { LOGGER.error("获取进程id失败,异常信息:", e); } } else if (Platform.isLinux() || Platform.isAIX()) { try { field = process.getClass().getDeclaredField("pid"); field.setAccessible(true); pid = field.getInt(process); } catch (Exception e) { LOGGER.error("获取进程id失败,异常信息:", e); } } LOGGER.info("进程id={}", pid); return pid; } 得到的 pid 是不是正确的,我们是不是得验证一下?写个 mainClass /** * mainClass * @author 青石路 */ public class HookMain { public static void main(String[] args) throws Exception { String command = ""; if (Platform.isWindows()) { command = "ping -n 1000 localhost"; } else if (Platform.isLinux() || Platform.isAIX()) { command = "ping -c 1000 localhost"; } Process process = Runtime.getRuntime().exec(command); int processId = ProcessUtil.getProcessId(process); System.out.println("ping 进程id = " + processId); new Thread(() -> { try (BufferedReader reader = new BufferedReader( new InputStreamReader(process.getInputStream(), System.getProperty("sun.jnu.encoding")))) { String line; while ((line = reader.readLine()) != null) { System.out.println(line); } } catch (IOException e) { throw new RuntimeException(e); } }).start(); } } 利用 maven 打包成可执行 jar 包 <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.qsl.hook.HookMain</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy-dependencies</id> <phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>target/lib</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins> </build> 然后执行 jar java -jar qsl-datax-hook-0.0.1-SNAPSHOT.jar 我们来看下输出结果 Linux jar 输出日志如下 我们 ps 下进程 ps -ef|grep ping Win jar 输出日志如下 我们再看下任务管理器的 ping 进程 可以看出,不管是 Linux 还是 Win,得到的 pid 都是正确的;得到 pid 后,终止进程就简单了 /** * 终止进程 * @param pid 进程的PID * @return true:成功,false:失败 */ public static boolean killProcessByPid(int pid) { if (NULL_PROCESS_ID == pid) { LOGGER.error("pid[{}]异常", pid); return false; } String command = "kill -9 " + pid; boolean result; if (Platform.isWindows()) { command = "cmd.exe /c taskkill /PID " + pid + " /F /T "; } Process process = null; try { process = Runtime.getRuntime().exec(command); } catch (IOException e) { LOGGER.error("终止进程[pid={}]异常:", pid, e); return false; } try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) { //杀掉进程 String line; while ((line = reader.readLine()) != null) { LOGGER.info(line); } result = true; } catch (Exception e) { LOGGER.error("终止进程[pid={}]异常:", pid, e); result = false; } finally { if (!Objects.isNull(process)) { process.destroy(); } } return result; } 完整流程应该是 使用 Runtime.getRuntime().exec(java命令) 启动 DataX,并获取到 Process java 命令指的是启动 DataX 的 java 命令,例如 java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -Ddatax.home=/datax -classpath /datax/lib/* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job job.json 通过 ProcessUtil#getProcessId 获取 Process 的 pid,并与同步任务信息绑定进行持久化 通过任务id 可以查询到对应的 pid 触发任务 终止,通过任务id找到对应的 pid,通过 ProcessUtil#killProcessByPid 终止进程 终止了进程也就终止了同步任务 如果 qsl-datax-hook 是单节点,上述处理方案是没有问题的,但生产环境下,qsl-datax-hook 不可能是单节点,肯定是集群部署,那么上述方案就行不通了,为什么呢?我举个例子 假设 qsl-datax-hook 有 2 个节点:A、B,在 A 节点上启动 DataX 同步任务(taskId = 666)并得到对应的 pid = 1488,终止任务 666 的请求被负载均衡到 B 节点,会发生什么情况 B 节点上没有 pid = 1488 进程,那么终止失败,A、B 节点都不受影响 B 节点上有 pid = 1488 进程,这个进程可能是 DataX 同步任务进程,也可能是其他进程,那么这个终止操作就会产生可轻可重的故障了! 然而需要终止的同步任务却还在 A 节点上安然无恙的执行着 所以集群模式下,我们不仅需要将 pid 与任务进行绑定,还需要将任务执行的节点信息也绑定进来,节点信息可以是 节点ID,也可以是 节点IP,只要能唯一标识节点就行;具体实现方案,需要结合具体的负载均衡组件来做设计,由负载均衡组件将任务终止请求分发到正确的节点上,而不能采用常规的负载均衡策略进行分发了;因为负载均衡组件很多,所以实现方案没法统一设计,需要你们结合自己的项目去实现,我相信对你们来说很简单 总结 任务的启动方式不同,终止方式也会有所不同,如何优雅的终止,是我们需要考虑的重点 直接杀进程的方式,简单粗暴,但不够优雅,一旦错杀,问题可大可小,如果有其他方式,不建议选择该方式 适用单节点的终止方式不一定适用于集群,大家设计方案的时候一定要做全方位的考虑 示例代码:qsl-datax-hook