Real-time incremental sync with canal + Kafka + Flink, part 2: the Kafka-to-MySQL consumer implementation

Overview

Goal: consume the MySQL binlog messages that part 1 pushed to Kafka, and stream them into MySQL via Flink.
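For context, each Kafka record produced by canal in part 1 is a JSON description of a row change, roughly of this shape (a sketch only; the exact field set depends on the canal configuration, and the database/table/column names here are made up):

```json
{
  "database": "test",
  "table": "user",
  "type": "INSERT",
  "isDdl": false,
  "ts": 1646870400000,
  "data": [
    { "id": "1", "name": "zhangsan", "age": "20" }
  ],
  "old": null
}
```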

1. Kafka consumer code overview

(1) The invoking class KafkaConsumerInvoke is not covered here; it is similar to the one in the previous post.

(2) Example of the consumer class KafkaConsumerService:

@Component
@Scope("prototype")
public class KafkaConsumerService {
    private static final Logger PRINT = LoggerFactory.getLogger(KafkaConsumerService.class);

    private static final String V_0_10 = "0.10"; // SDK version: Kafka 0.10 and above

    private static final String MAIN_CLASS_NAME = "com.ztesoft.zstream.Boot"; // main class name

    private static final String ZK_CONFIG = "config.properties"; // ZooKeeper config file

    private static final String KTS_LOG_PATH = "ktslog"; // real-time log path

    private static final String CLUSTER_CDH5_8_4 = "cdh5.8.4"; // default HBase cluster version

    private static final String RUM_MODE_SPARK = "spark"; // run mode

    private KtsElement ktsElement;

    @Autowired
    private DataIntegWorkitemMapper dataIntegWorkitemMapper;

    public void init(KtsElement ktsElement) {
        this.ktsElement = ktsElement;
    }

    /**
     * Upload the jar (the actual Kafka-to-MySQL loading lives in a separate service, packaged as a jar and deployed on the server)
     */
    public void uploadJar() {
        String kafkaVersion = ktsElement.getKafkaVersion(); // Kafka version
        if (StringUtils.isEmpty(kafkaVersion)) {
            kafkaVersion = V_0_10; // default to 0.10+
        }
        }
        if (DataSource.KAFKA.equalsIgnoreCase(ktsElement.getOriginDatasourceType())) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Use sdk kafka version is " + kafkaVersion, PRINT);
        }

        // Detect the cluster version; "**" matches the first one by default
        String clusterVersion = "**";
        if (ktsElement.getTargetDataSource() != null
            && DataSource.HBASE.equals(ktsElement.getTargetDataSource().getDatasourceType())) {
            Map<String, String> clusterInfo = ClusterInfoUtil.getClusterInfo(ktsElement.getTargetDataSource());
            clusterVersion = clusterInfo.get("clusterVersion");
        }
        if (StringUtils.isEmpty(clusterVersion) || "**".equals(clusterVersion)) {
            clusterVersion = CLUSTER_CDH5_8_4;
        }

        // Upload the SDK jar; guard against multiple shards uploading it at the same time
        String taskInstId = ktsElement.getTaskInstId();
        DataIntegWorkitemExample example = new DataIntegWorkitemExample();
        example.createCriteria().andTaskInstIdEqualTo(Integer.parseInt(taskInstId));
        List<DataIntegWorkitem> dataIntegWorkitems = dataIntegWorkitemMapper.selectByExample(example);
        boolean isSplitWorkitem = dataIntegWorkitems.size() > 1;
        SSHHelper.uploadResource(ktsElement.getSshDataSource(), "kafka/" + clusterVersion + "/" + kafkaVersion,
            PlugInvokeConstant.KTS_JAR, ktsElement.getPath(), isSplitWorkitem, ktsElement.getWorkitemId(), PRINT);
    }

    /**
     * Pre-create the task directories
     */
    public void preOperation() {
        SAScriptHelper.mkdir(ktsElement.getSshDataSource(), PRINT, ktsElement.getTaskPath());
        // Create the real-time log directory: jump-server path + ktslog + task ID
        SAScriptHelper.mkdir(ktsElement.getSshDataSource(), PRINT,
            FileUtils.transformationPath(AgentConfigProperties.getJumpserverLocalpath()) + KTS_LOG_PATH + "/"
                + ktsElement.getTaskId());
        SAScriptHelper.chmodR(ktsElement.getSshDataSource(), PRINT, "a+r+w+x", ktsElement.getTaskPath());
    }

    /**
     * Upload the mapping JSON (this JSON defines the field mappings; it will be shown in a later demo)
     */
    public void uploadJson() {
        // Delete first, then upload
        try {
            SAScriptHelper.remove(ktsElement.getSshDataSource(), PRINT,
                ktsElement.getTaskPath() + File.separator + PlugInvokeConstant.KTS_JSON);
        }
        catch (Exception e) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "rm " + ktsElement.getTaskPath() + File.separator
                + PlugInvokeConstant.KTS_JSON + "error :" + e.getMessage(), PRINT);
        }

        try {
            boolean flag = SSHHelper.transferFile(ktsElement.getDtoJson().getBytes("UTF-8"),
                PlugInvokeConstant.KTS_JSON, ktsElement.getTaskPath(), ktsElement.getSshDataSource(), null);
            if (flag) {
                SAScriptHelper.chmodR(ktsElement.getSshDataSource(), PRINT, "a+r+w+x", ktsElement.getTaskPath());
            }
            else {
                throw new SyDevOpsRuntimeException("Failed to upload " + PlugInvokeConstant.KTS_JSON);
            }
        }
        catch (Exception e) {
            throw new SyDevOpsRuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Upload the shell script. As noted above, the actual loading runs in a separate service packaged as a jar; this shell script launches that jar.
     */
    public void uploadShell() {
        // Copy config.properties in if it is missing; do not overwrite an existing one
        StringBuffer configFilePath = new StringBuffer();
        configFilePath.append(ktsElement.getTaskPath()).append(File.separator).append(ZK_CONFIG);
        boolean configExist;
        try {
            SAScriptHelper.find(ktsElement.getSshDataSource(), PRINT, configFilePath.toString());
            configExist = true;
        }
        catch (Exception e) {
            configExist = false;
        }

        if (!configExist) {
            try {
                StringBuffer cpConfigCmd = new StringBuffer();
                cpConfigCmd.append("\\cp -f ").append(ktsElement.getPath()).append(ZK_CONFIG).append(" ")
                    .append(ktsElement.getTaskPath()).append(File.separator).append(ZK_CONFIG);
                SAScriptHelper.shell(ktsElement.getSshDataSource(), PRINT, cpConfigCmd.toString());
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException("Failed to cp " + ZK_CONFIG, e);
            }
        }

        // Branch on the run mode
        String runMode = ktsElement.getRunMode();

        if (StringUtils.equalsIgnoreCase(runMode, RUM_MODE_SPARK)) {
            // In cluster mode the script must be placed in the SDK directory in advance
            StringBuffer shellPathBuf = new StringBuffer();
            shellPathBuf.append(ktsElement.getPath()).append(File.separator)
                .append(PlugInvokeConstant.KTS_SPARK_SUBMIT_SHELL);

            try {
                SAScriptHelper.find(ktsElement.getSshDataSource(), PRINT, shellPathBuf.toString());
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException("Failed to find " + shellPathBuf.toString(), e);
            }
        }
        else {

            /**
             * <pre>
             * #!/bin/sh
             * cp -f /home/dhpuser/sdkdir/config.properties /home/dhpuser/sdkdir/38327/config.properties
             * java -cp /home/dhpuser/sdkdir/KTSSubscriber.jar com.ztesoft.zstream.Boot /home/dhpuser/sdkdir/38327/ 2>&1 &
             * echo $! > /home/dhpuser/sdkdir/38327/KTSSubscriber.pid
             * </pre>
             */
            StringBuffer shell = new StringBuffer();
            shell.append("#!/bin/sh\n");
            // // Copy the zk config file into the start directory if needed
            // shell.append("\\cp -f ").append(ktsElement.getPath()).append(ZK_CONFIG).append(" ")
            // .append(ktsElement.getTaskPath()).append(ZK_CONFIG).append("\n");

            if (ktsElement.isSplit()) {
                shell.append("java ").append(ktsElement.getSdkArgument()).append(" -cp ")
                    .append(ktsElement.getKtsJarPath()).append(" ").append(MAIN_CLASS_NAME).append(" ")
                    .append(ktsElement.getTaskPath()).append(" ").append(ktsElement.getTaskInstId());
            }
            else {
                shell.append("java ").append(ktsElement.getSdkArgument()).append(" -cp ")
                    .append(ktsElement.getKtsJarPath()).append(" ").append(MAIN_CLASS_NAME).append(" ")
                    .append(ktsElement.getTaskPath());
            }
            shell.append(" 2>&1 &\n");
            shell.append("echo $! > ").append(ktsElement.getKtsPidPath()).append("\n");

            // Delete first, then upload
            try {
                SAScriptHelper.remove(ktsElement.getSshDataSource(), PRINT,
                    ktsElement.getTaskPath() + File.separator + PlugInvokeConstant.KTS_START_SHELL);
            }
            catch (Exception e) {
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "rm " + ktsElement.getTaskPath() + File.separator
                    + PlugInvokeConstant.KTS_START_SHELL + "error :" + e.getMessage(), PRINT);
            }

            try {
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Upload KTSstart.sh: " + shell.toString(),
                    PRINT);
                boolean flag = SSHHelper.transferFile(shell.toString().getBytes("UTF-8"),
                    PlugInvokeConstant.KTS_START_SHELL, ktsElement.getTaskPath(), ktsElement.getSshDataSource(), null);
                if (flag) {
                    SAScriptHelper.chmodR(ktsElement.getSshDataSource(), PRINT, "a+r+w+x", ktsElement.getTaskPath());
                }
                else {
                    throw new SyDevOpsRuntimeException("Failed to upload " + PlugInvokeConstant.KTS_START_SHELL);
                }
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException(e.getMessage(), e);
            }
        }
    }

    /**
     * Launch the jar remotely
     */
    public void executeStartShell() {
        // Branch on the run mode
        String runMode = ktsElement.getRunMode();

        if (StringUtils.equalsIgnoreCase(runMode, RUM_MODE_SPARK)) {
            /**
             * <pre>
             * Arguments:
             * confPath=$1    working directory
             * logPath=$2     log directory
             * pidPath=$3     pid file
             * sdkArgument=$4 JVM options, environment variables, etc. (startup arguments)
             *
             * cd /opt/testSDK/;nohup ./KTSSparksubmit.sh /opt/testSDK/4642 /tmp/ditagent/ktslog/4642/KTSSubscriber.log /opt/testSDK/4642/KTSSubscriber.pid  > /opt/testSDK/4642/KTSSparksubmit.log 2>&1 &
             * </pre>
             */
            // real-time task run log
            String param1 = ktsElement.getTaskPath();
            StringBuffer logPathBuf = new StringBuffer();
            logPathBuf.append(FileUtils.transformationPath(AgentConfigProperties.getJumpserverLocalpath()))
                .append(KTS_LOG_PATH).append(File.separator).append(ktsElement.getTaskId()).append(File.separator)
                .append(PlugInvokeConstant.KTS_LOG);
            String param2 = logPathBuf.toString();
            String param3 = ktsElement.getKtsPidPath();
            String param4 = ktsElement.getSdkArgument();

            // spark-submit's own log
            StringBuffer sparkSubmitLogBuf = new StringBuffer();
            sparkSubmitLogBuf.append(ktsElement.getTaskPath()).append(File.separator)
                .append(PlugInvokeConstant.KTS_SPARK_SUBMIT_LOG);

            String path = ktsElement.getPath();
            StringBuffer cmdBuf = new StringBuffer();
            cmdBuf.append("cd ").append(path).append(";");
            cmdBuf.append("nohup ./").append(PlugInvokeConstant.KTS_SPARK_SUBMIT_SHELL);
            cmdBuf.append(" ").append(param1).append(" ").append(param2).append(" ").append(param3).append(" ")
                .append(param4);
            cmdBuf.append(" > ").append(sparkSubmitLogBuf).append(" 2>&1 &");
            try {
                SAScriptHelper.shell(ktsElement.getSshDataSource(), PRINT, cmdBuf.toString());
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Start KTSSubscriber job successful !", PRINT);
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException("Failed to execute " + PlugInvokeConstant.KTS_SPARK_SUBMIT_SHELL, e);
            }
        }
        else {
            /**
             * cd /home/dhpuser/sdkdir/38327/;nohup ./KTSstart.sh > /dev/null 2>&1 &
             */
            String path = FileUtils.transformationPath(ktsElement.getTaskPath());
            StringBuffer cmdBuf = new StringBuffer();
            cmdBuf.append("cd ").append(path).append(";nohup ./").append(PlugInvokeConstant.KTS_START_SHELL)
                .append(" > /dev/null 2>&1 &");
            try {
                SAScriptHelper.shell(ktsElement.getSshDataSource(), PRINT, cmdBuf.toString());
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Start KTSSubscriber job successful !", PRINT);
            }
            catch (Exception e) {
                throw new SyDevOpsRuntimeException("Failed to execute " + PlugInvokeConstant.KTS_START_SHELL, e);
            }
            finally {
                SAScriptHelper.remove(ktsElement.getSshDataSource(), PRINT, path + PlugInvokeConstant.KTS_START_SHELL);
            }
        }
    }

    public ComponentResult monitor() {
        // PID=$(cat /KTSSubscriber.pid) ps -ef|grep -v grep|grep 'xx/xxx/xxxx'
        StringBuffer cmd = new StringBuffer();
        cmd.append("PID=$(cat " + ktsElement.getKtsPidPath() + ")\n");
        cmd.append("ps -ef|grep -v grep|grep $PID");

        String process = null;
        try {
            process = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
        }
        catch (Exception e) {
            PRINT.error("Monitor error 1:" + e.getMessage(), e);
        }

        // A missing pid does not necessarily mean the process was killed; it may be a network issue or a grep problem
        if (StringUtils.isEmpty(process)) {
            // Possibly a network hiccup; wait a moment first
            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
                PRINT.error("Thread.sleep InterruptedException:" + e.getMessage(), e);
            }

            try {
                process = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
            }
            catch (Exception e) {
                PRINT.error("Monitor error 2:" + e.getMessage(), e);
            }
            if (StringUtils.isEmpty(process)) {
                // Still nothing; grep itself may be the problem, so try a different command
                // ps -ef|grep -v grep|grep 'xx/xxx/xxxx'
                cmd = new StringBuffer();
                cmd.append("ps -ef|grep -v grep|grep '").append(ktsElement.getTaskPath()).append("'");
                try {
                    process = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
                }
                catch (Exception e) {
                    PRINT.error("Monitor error 3:" + e.getMessage(), e);
                }
            }
        }

        if (StringUtils.isEmpty(process)) {
            throw new SyDevOpsRuntimeException(
                "The process of KTSSubscriber is stopped! Task Id is " + ktsElement.getTaskId());
        }
        else {
            StringBuffer logFileBuf = new StringBuffer();
            logFileBuf.append(FileUtils.transformationPath(AgentConfigProperties.getJumpserverLocalpath()))
                .append(KTS_LOG_PATH).append(File.separator).append(ktsElement.getTaskId()).append(File.separator)
                .append(PlugInvokeConstant.KTS_LOG);

            String result = null;
            String tailCmd = "tail -50 " + logFileBuf.toString();
            // cat can miss the log while it is being compressed, so the call must be wrapped in a try/catch
            try {
                result = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, tailCmd);
            }
            catch (Exception e) {
                result = "";
            }

            ComponentResult readerResult = new ReaderResult();
            readerResult.setInvokeResult(ComponentResult.RUNNING);
            readerResult.setInvokeMsg(tailCmd + "\n" + result);
            return readerResult;
        }

    }

    public void stop(boolean ignore) {
        boolean isExists = true;
        try {
            SAScriptHelper.find(ktsElement.getSshDataSource(), null, ktsElement.getKtsPidPath());
            PRINT.info(ktsElement.getKtsPidPath());
        }
        catch (Exception e) {
            isExists = false;
        }

        /**
         * <pre>
         *   PID=$(cat /KTSSubscriber.pid)
         *   kill -9 $PID
         *   pkill -f xx/xxx/xxxx
         * </pre>
         */
        StringBuffer cmd = new StringBuffer();
        if (isExists) {
            cmd.append("PID=$(cat " + ktsElement.getKtsPidPath() + ")\n");
            cmd.append("kill -9 $PID;\n");
        }
        cmd.append("pkill -f ").append(ktsElement.getTaskPath());

        try {
            String result = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
            PRINT.info(result);
        }
        catch (Exception e) {
            if (ObjectUtils.getStackTraceAsString(e).indexOf("Operation not permitted") != -1 || !ignore) {
                throw new SyDevOpsRuntimeException(e.getMessage(), e);
            }
        }
    }

    public ComponentResult getRunningResult() {
        ComponentResult readerResult = new ReaderResult();
        readerResult.setInvokeResult(ComponentResult.RUNNING);
        readerResult.setInvokeMsg("success");
        return readerResult;
    }

    public ComponentResult getStopResult() {
        ComponentResult readerResult = new ReaderResult();
        readerResult.setInvokeResult(ComponentResult.SUCCESS);
        readerResult.setInvokeMsg("success");
        return readerResult;
    }

    /**
     * Whether the SDK jar must be re-uploaded because of its version; defaults to not replacing
     * 
     * @param sdkPath
     * @return
     */
    public boolean isNeedReplaceSDKForVersion(String sdkPath) {
        // Stay compatible with older SDK jars that may carry no version info
        try {
            String resourcesVersionCode = getVersionCodeForResources(sdkPath);
            if (StringUtils.isEmpty(resourcesVersionCode)) {
                // No version info in the resources jar; no point continuing
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Resources sdk not have versionCode.", PRINT);
                return false;
            }

            String serverVersionCode = getVersionCodeForServer();

            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Resources versionCode = " + resourcesVersionCode,
                PRINT);
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Server versionCode = " + serverVersionCode, PRINT);

            // If the SDK jar under resources has a higher version than the server's, return true: the server copy needs replacing
            if (StringUtils.isNotEmpty(resourcesVersionCode) && StringUtils.isNotEmpty(serverVersionCode)) {
                if (Long.parseLong(resourcesVersionCode) > Long.parseLong(serverVersionCode)) {
                    return true;
                }
            }
            else {
                // If the resources jar carries version info but the server copy does not, it should also be
                // replaced, since that means the server is running an old SDK jar
                if (StringUtils.isNotEmpty(resourcesVersionCode) && StringUtils.isEmpty(serverVersionCode)) {
                    // On second thought the risk here is a bit high...
                    // return true;
                }
            }
        }
        catch (Exception e) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Can not compare the sdk version", PRINT);
        }

        return false;
    }

    /**
     * Read the SDK version code from the resources directory
     * 
     * @param sdkPath
     * @return
     */
    private String getVersionCodeForResources(String sdkPath) {
        try {
            ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); // the container's resource resolver
            Resource[] resources = resolver.getResources(sdkPath);
            for (Resource resource : resources) {
                if (PlugInvokeConstant.KTS_JAR.equalsIgnoreCase(resource.getFilename())) {
                    File sdkFile = resource.getFile();
                    JarFile jarFile = null;
                    try {
                        jarFile = new JarFile(sdkFile);
                        Attributes attributes = jarFile.getManifest().getMainAttributes();
                        return attributes.getValue("versionCode"); // version code (yyyyMMddHHmmss)
                    }
                    catch (Exception e) {
                        throw new SyDevOpsRuntimeException(e.getMessage(), e);
                    }
                    finally {
                        if (jarFile != null) {
                            jarFile.close();
                        }
                    }
                }
            }
        }
        catch (Exception e) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Can not read the resources sdk version info",
                PRINT);
        }

        return null;
    }

    /**
     * Read the SDK version info from the server
     * 
     * @return
     */
    private String getVersionCodeForServer() {
        try {
            String versionFilePath = ktsElement.getPath() + File.separator + PlugInvokeConstant.KTS_VERSION;
            String version = SAScriptHelper.cat(ktsElement.getSshDataSource(), PRINT, versionFilePath);
            if (StringUtils.isNotEmpty(version)) {
                version = version.replaceAll("\n", "");
            }
            return version;
        }
        catch (Exception e) {
            CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Can not read the server sdk version info", PRINT);
        }

        return null;
    }

    /**
     * Write the SDK version file, alongside the SDK
     * 
     * @param versionInfo
     */
    public void writeSDKVersionFile(String versionInfo) {
        if (StringUtils.isNotEmpty(versionInfo)) {
            try {
                StringBuffer echoVersionCmd = new StringBuffer();
                echoVersionCmd.append("echo ").append("\"").append(versionInfo).append("\"").append(" > ")
                    .append(ktsElement.getPath()).append(File.separator).append(PlugInvokeConstant.KTS_VERSION);
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO,
                    "Create sdk version file: " + echoVersionCmd.toString(), PRINT);
                SAScriptHelper.shell(ktsElement.getSshDataSource(), null, echoVersionCmd.toString());
            }
            catch (Exception e) {
                CommonContext.getLogger().makeRunLog(LoggerLevel.INFO,
                    "Write to " + PlugInvokeConstant.KTS_VERSION + " error", PRINT);
            }
        }
    }
}
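The versionCode check above reads a custom attribute from the SDK jar's MANIFEST.MF and compares the values as longs. A small self-contained sketch of that round trip (the file name and attribute value here are made up for illustration):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class ManifestVersionDemo {
    public static void main(String[] args) throws Exception {
        // Build a throwaway jar whose manifest carries the custom versionCode attribute
        File jar = File.createTempFile("sdk-demo", ".jar");
        jar.deleteOnExit();
        Manifest mf = new Manifest();
        mf.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        mf.getMainAttributes().putValue("versionCode", "20240101120000"); // yyyyMMddHHmmss
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar), mf)) {
            // no entries needed; only the manifest matters here
        }

        // Read it back the same way getVersionCodeForResources does
        try (JarFile jarFile = new JarFile(jar)) {
            String versionCode = jarFile.getManifest().getMainAttributes().getValue("versionCode");
            System.out.println(versionCode);
            // Newer timestamp-style codes compare as larger longs, which is the replacement rule
            System.out.println(Long.parseLong("20240201000000") > Long.parseLong(versionCode));
        }
    }
}
```

Because the code is a timestamp, a plain numeric comparison is enough to decide whether the resources jar is newer than the server copy.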

2. Flink integration with the real-time compute platform (packaged as a jar)

Flink itself is not covered here; development work is divided on the team, and the big-data processing side is handled by the data engineers.
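For completeness, a minimal sketch of what such a Flink job might look like: read the canal JSON messages from Kafka and write rows to MySQL through the JDBC connector. This is an illustration only, not the platform's actual job; the topic, table, SQL, and credentials are placeholders, and it requires flink-streaming-java, flink-connector-kafka, and flink-connector-jdbc on the classpath.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class CanalToMysqlJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "canal-sync");          // placeholder

        // Each record is one canal JSON message describing a row change
        DataStream<String> binlog =
            env.addSource(new FlinkKafkaConsumer<>("canal-topic", new SimpleStringSchema(), props));

        // Upsert into the target table; a real job would dispatch on the message's
        // "type" field (INSERT/UPDATE/DELETE) and apply the uploaded mapping JSON
        binlog.addSink(JdbcSink.sink(
            "REPLACE INTO user(id, name, age) VALUES (?, ?, ?)",
            (stmt, message) -> {
                // JSON parsing elided: extract id/name/age from the canal "data" array
                // and bind them with stmt.setObject(1, ...), stmt.setObject(2, ...), etc.
            },
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://mysql:3306/test") // placeholder
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("user")
                .withPassword("pass")
                .build()));

        env.execute("canal-kafka-to-mysql");
    }
}
```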
