如何排查基于chunjun纯钧的增量数据同步问题?

摘要:基于chunjun纯钧的增量数据同步 目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步 chu
基于chunjun纯钧的增量数据同步 目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步 chunjun的官网文档对增量同步已经做出了一定的说明 纯钧官方 根据文档我编写了一个SQL脚本 create table `source` ( `sfzh` STRING COMMENT '', `xm` STRING COMMENT '', `xb` STRING COMMENT '', `xbdm` STRING COMMENT '', `jzdz` STRING COMMENT '', `fzrq` DATE COMMENT '', `dsc_biz_record_id` STRING COMMENT '' ) with ( 'connector' = 'mysql-x', 'url' = 'jdbc:mysql://:/?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true', 'table-name' = '', 'username' = '', 'password' = '', 'scan.fetch-size' = '1024', 'scan.increment.column' = 'fzrq', --'scan.increment.column-type' = 'date', 'scan.start-location' = '1659974400000' ); create table `sink` ( `sfzh` STRING COMMENT '', `xm` STRING COMMENT '', `xb` STRING COMMENT '', `xbdm` STRING COMMENT '', `jzdz` STRING COMMENT '', `fzrq` DATE COMMENT '', `dsc_biz_record_id` STRING COMMENT '', PRIMARY KEY (`dsc_biz_record_id`) NOT ENFORCED ) with ( 'connector' = 'stream-x' ); 然后提交任务的时候发现已经记录了start-location 和 start-location的指标信息了,但是并没有上报到Prometheus! 在本地调试源码解决问题的大致过程 在类 com.dtstack.chunjun.source.format.BaseRichInputFormat中有一个成员变量 /** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */ protected transient CustomReporter customReporter; 该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法 @Override public void openInputFormat() throws IOException { Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables(); if (vars != null) { jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName"); jobId = vars.get(Metrics.JOB_NAME); indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX)); } LOG.info("是否使用自定义报告 {}", useCustomReporter());
阅读全文