Real-time incremental sync with canal + kafka + flink, part 2: implementing the kafkaTomysql consumer
Overview
This part covers the consumer side: the mysql-binlog messages pushed to Kafka in part 1 are picked up, stream-processed with Flink, and loaded into MySQL.
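Before walking through the deployment code, here is a minimal sketch of what the kafka-to-mysql consumer inside the deployed jar might do. It assumes canal's FlatMessage JSON format; the broker address, topic, and target table are hypothetical, and the real logic lives in the separately packaged KTSSubscriber.jar, driven by the mapping json described later:
// Minimal sketch (hypothetical names throughout): consume canal FlatMessage JSON
// from Kafka and replay INSERT rows into MySQL via JDBC.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Collections;
import java.util.Properties;

public class BinlogToMysqlSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092"); // assumed broker address
        props.put("group.id", "kts-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://mysql01:3306/target_db", "user", "pwd")) {
            consumer.subscribe(Collections.singletonList("canal-binlog-topic")); // assumed topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    JSONObject msg = JSON.parseObject(record.value());
                    // canal FlatMessage: "type" is INSERT/UPDATE/DELETE, "data" holds the row maps
                    if ("INSERT".equals(msg.getString("type"))) {
                        for (Object row : msg.getJSONArray("data")) {
                            JSONObject r = (JSONObject) row;
                            try (PreparedStatement ps = conn.prepareStatement(
                                    "INSERT INTO t_demo (id, name) VALUES (?, ?)")) { // assumed table
                                ps.setString(1, r.getString("id"));
                                ps.setString(2, r.getString("name"));
                                ps.executeUpdate();
                            }
                        }
                    }
                }
            }
        }
    }
}
UPDATE and DELETE would follow the same pattern, keyed on the primary-key columns taken from the mapping json.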
1. Overview of the Kafka consumer code
(1) The invoker, KafkaConsumerInvoke, is not covered here; it is similar to the one in part 1 (a hypothetical call-order sketch appears after the class below).
(2) Example of the consumer, KafkaConsumerService:
@Component
@Scope("prototype")
public class KafkaConsumerService {
private static final Logger PRINT = LoggerFactory.getLogger(KafkaConsumerService.class);
private static final String V_0_10 = "0.10"; // SDK version: Kafka 0.10 and above
private static final String MAIN_CLASS_NAME = "com.ztesoft.zstream.Boot"; // main class name
private static final String ZK_CONFIG = "config.properties"; // zk config file
private static final String KTS_LOG_PATH = "ktslog"; // real-time log path
private static final String CLUSTER_CDH5_8_4 = "cdh5.8.4"; // default HBase cluster version
private static final String RUM_MODE_SPARK = "spark"; // run mode
private KtsElement ktsElement;
@Autowired
private DataIntegWorkitemMapper dataIntegWorkitemMapper;
public void init(KtsElement ktsElement) {
this.ktsElement = ktsElement;
}
/**
* Upload the jar. (The actual kafka-to-mysql loading runs in a separate service,
* packaged as a jar and deployed on the server.)
*/
public void uploadJar() {
String kafkaVersion = ktsElement.getKafkaVersion(); // Kafka version
if (StringUtils.isEmpty(kafkaVersion)) {
kafkaVersion = V_0_10; // default to 0.10+
}
if (DataSource.KAFKA.equalsIgnoreCase(ktsElement.getOriginDatasourceType())) {
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Use sdk kafka version is " + kafkaVersion, PRINT);
}
// detect the cluster version; "**" means match the first one by default
String clusterVersion = "**";
if (ktsElement.getTargetDataSource() != null
&& DataSource.HBASE.equals(ktsElement.getTargetDataSource().getDatasourceType())) {
Map<String, String> clusterInfo = ClusterInfoUtil.getClusterInfo(ktsElement.getTargetDataSource());
clusterVersion = clusterInfo.get("clusterVersion");
}
if (StringUtils.isEmpty(clusterVersion) || "**".equals(clusterVersion)) {
clusterVersion = CLUSTER_CDH5_8_4;
}
// upload the SDK jar; guard against multiple shards uploading it at the same time
String taskInstId = ktsElement.getTaskInstId();
DataIntegWorkitemExample example = new DataIntegWorkitemExample();
example.createCriteria().andTaskInstIdEqualTo(Integer.parseInt(taskInstId));
List<DataIntegWorkitem> dataIntegWorkitems = dataIntegWorkitemMapper.selectByExample(example);
boolean isSplitWorkitem = dataIntegWorkitems.size() > 1;
SSHHelper.uploadResource(ktsElement.getSshDataSource(), "kafka/" + clusterVersion + "/" + kafkaVersion,
PlugInvokeConstant.KTS_JAR, ktsElement.getPath(), isSplitWorkitem, ktsElement.getWorkitemId(), PRINT);
}
/**
* Pre-create the task path.
*/
public void preOperation() {
SAScriptHelper.mkdir(ktsElement.getSshDataSource(), PRINT, ktsElement.getTaskPath());
// create the real-time log file path: jump-server path + ktslog + task ID
SAScriptHelper.mkdir(ktsElement.getSshDataSource(), PRINT,
FileUtils.transformationPath(AgentConfigProperties.getJumpserverLocalpath()) + KTS_LOG_PATH + "/"
+ ktsElement.getTaskId());
SAScriptHelper.chmodR(ktsElement.getSshDataSource(), PRINT, "a+r+w+x", ktsElement.getTaskPath());
}
/**
* Upload the mapping json. This json describes the field mappings; it will be
* explained in a later feature demo.
*/
public void uploadJson() {
// delete first, then upload
try {
SAScriptHelper.remove(ktsElement.getSshDataSource(), PRINT,
ktsElement.getTaskPath() + File.separator + PlugInvokeConstant.KTS_JSON);
}
catch (Exception e) {
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "rm " + ktsElement.getTaskPath() + File.separator
+ PlugInvokeConstant.KTS_JSON + "error :" + e.getMessage(), PRINT);
}
try {
boolean flag = SSHHelper.transferFile(ktsElement.getDtoJson().getBytes("UTF-8"),
PlugInvokeConstant.KTS_JSON, ktsElement.getTaskPath(), ktsElement.getSshDataSource(), null);
if (flag) {
SAScriptHelper.chmodR(ktsElement.getSshDataSource(), PRINT, "a+r+w+x", ktsElement.getTaskPath());
}
else {
throw new SyDevOpsRuntimeException("Failed to upload " + PlugInvokeConstant.KTS_JSON);
}
}
catch (Exception e) {
throw new SyDevOpsRuntimeException(e.getMessage(), e);
}
}
/**
* Upload the shell script. As noted above, the actual loading logic lives in a
* separate service packaged as a jar; here a shell script launches that jar.
*/
public void uploadShell() {
// copy config.properties in if it does not exist; never overwrite an existing one
StringBuffer configFilePath = new StringBuffer();
configFilePath.append(ktsElement.getTaskPath()).append(File.separator).append(ZK_CONFIG);
boolean configExist;
try {
SAScriptHelper.find(ktsElement.getSshDataSource(), PRINT, configFilePath.toString());
configExist = true;
}
catch (Exception e) {
configExist = false;
}
if (!configExist) {
try {
StringBuffer cpConfigCmd = new StringBuffer();
cpConfigCmd.append("\\cp -f ").append(ktsElement.getPath()).append(ZK_CONFIG).append(" ")
.append(ktsElement.getTaskPath()).append(File.separator).append(ZK_CONFIG);
SAScriptHelper.shell(ktsElement.getSshDataSource(), PRINT, cpConfigCmd.toString());
}
catch (Exception e) {
throw new SyDevOpsRuntimeException("Failed to cp " + ZK_CONFIG, e);
}
}
// branch on the run mode
String runMode = ktsElement.getRunMode();
if (StringUtils.equalsIgnoreCase(runMode, RUM_MODE_SPARK)) {
// cluster mode: the submit script must already be in the SDK directory
StringBuffer shellPathBuf = new StringBuffer();
shellPathBuf.append(ktsElement.getPath()).append(File.separator)
.append(PlugInvokeConstant.KTS_SPARK_SUBMIT_SHELL);
try {
SAScriptHelper.find(ktsElement.getSshDataSource(), PRINT, shellPathBuf.toString());
}
catch (Exception e) {
throw new SyDevOpsRuntimeException("Failed to find " + shellPathBuf.toString(), e);
}
}
else {
/**
* <pre>
* #!/bin/sh
* cp -f /home/dhpuser/sdkdir/config.properties /home/dhpuser/sdkdir/38327/config.properties
* java -cp /home/dhpuser/sdkdir/KTSSubscriber.jar com.ztesoft.zstream.Boot /home/dhpuser/sdkdir/38327/ 2>&1 &
* echo $! > /home/dhpuser/sdkdir/38327/KTSSubscriber.pid
* </pre>
*/
StringBuffer shell = new StringBuffer();
shell.append("#!/bin/sh n");
// // 需要将zk配置文件copy到启动目录下
// shell.append("\cp -f ").append(ktsElement.getPath()).append(ZK_CONFIG).append(" ")
// .append(ktsElement.getTaskPath()).append(ZK_CONFIG).append("n");
if (ktsElement.isSplit()) {
shell.append("java ").append(ktsElement.getSdkArgument()).append(" -cp ")
.append(ktsElement.getKtsJarPath()).append(" ").append(MAIN_CLASS_NAME).append(" ")
.append(ktsElement.getTaskPath()).append(" ").append(ktsElement.getTaskInstId());
}
else {
shell.append("java ").append(ktsElement.getSdkArgument()).append(" -cp ")
.append(ktsElement.getKtsJarPath()).append(" ").append(MAIN_CLASS_NAME).append(" ")
.append(ktsElement.getTaskPath());
}
shell.append(" 2>&1 & n");
shell.append("echo $! > ").append(ktsElement.getKtsPidPath()).append("n");
// 先删除后上传
try {
SAScriptHelper.remove(ktsElement.getSshDataSource(), PRINT,
ktsElement.getTaskPath() + File.separator + PlugInvokeConstant.KTS_START_SHELL);
}
catch (Exception e) {
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "rm " + ktsElement.getTaskPath() + File.separator
+ PlugInvokeConstant.KTS_START_SHELL + "error :" + e.getMessage(), PRINT);
}
try {
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Upload KTSstart.sh: " + shell.toString(),
PRINT);
boolean flag = SSHHelper.transferFile(shell.toString().getBytes("UTF-8"),
PlugInvokeConstant.KTS_START_SHELL, ktsElement.getTaskPath(), ktsElement.getSshDataSource(), null);
if (flag) {
SAScriptHelper.chmodR(ktsElement.getSshDataSource(), PRINT, "a+r+w+x", ktsElement.getTaskPath());
}
else {
throw new SyDevOpsRuntimeException("Failed to upload " + PlugInvokeConstant.KTS_START_SHELL);
}
}
catch (Exception e) {
throw new SyDevOpsRuntimeException(e.getMessage(), e);
}
}
}
/**
* Remotely launch the jar.
*/
public void executeStartShell() {
// branch on the run mode
String runMode = ktsElement.getRunMode();
if (StringUtils.equalsIgnoreCase(runMode, RUM_MODE_SPARK)) {
/**
* <pre>
* Parameters:
* confPath=$1 working directory
* logPath=$2 log directory
* pidPath=$3 pid file
* sdkArgument=$4 JVM options, environment variables, etc. (startup arguments)
*
* cd /opt/testSDK/;nohup ./KTSSparksubmit.sh /opt/testSDK/4642 /tmp/ditagent/ktslog/4642/KTSSubscriber.log /opt/testSDK/4642/KTSSubscriber.pid > /opt/testSDK/4642/KTSSparksubmit.log 2>&1 &
* </pre>
*/
// real-time task run log
String param1 = ktsElement.getTaskPath();
StringBuffer logPathBuf = new StringBuffer();
logPathBuf.append(FileUtils.transformationPath(AgentConfigProperties.getJumpserverLocalpath()))
.append(KTS_LOG_PATH).append(File.separator).append(ktsElement.getTaskId()).append(File.separator)
.append(PlugInvokeConstant.KTS_LOG);
String param2 = logPathBuf.toString();
String param3 = ktsElement.getKtsPidPath();
String param4 = ktsElement.getSdkArgument();
// spark-submit log
StringBuffer sparkSubmitLogBuf = new StringBuffer();
sparkSubmitLogBuf.append(ktsElement.getTaskPath()).append(File.separator)
.append(PlugInvokeConstant.KTS_SPARK_SUBMIT_LOG);
String path = ktsElement.getPath();
StringBuffer cmdBuf = new StringBuffer();
cmdBuf.append("cd ").append(path).append(";");
cmdBuf.append("nohup ./").append(PlugInvokeConstant.KTS_SPARK_SUBMIT_SHELL);
cmdBuf.append(" ").append(param1).append(" ").append(param2).append(" ").append(param3).append(" ")
.append(param4);
cmdBuf.append(" > ").append(sparkSubmitLogBuf).append(" 2>&1 &");
try {
SAScriptHelper.shell(ktsElement.getSshDataSource(), PRINT, cmdBuf.toString());
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Start KTSSubscriber job successful !", PRINT);
}
catch (Exception e) {
throw new SyDevOpsRuntimeException("Failed to execute " + PlugInvokeConstant.KTS_SPARK_SUBMIT_SHELL, e);
}
}
else {
/**
* cd /home/dhpuser/sdkdir/38327/;nohup ./KTSstart.sh > /dev/null 2>&1 &
*/
String path = FileUtils.transformationPath(ktsElement.getTaskPath());
StringBuffer cmdBuf = new StringBuffer();
cmdBuf.append("cd ").append(path).append(";nohup ./").append(PlugInvokeConstant.KTS_START_SHELL)
.append(" > /dev/null 2>&1 &");
try {
SAScriptHelper.shell(ktsElement.getSshDataSource(), PRINT, cmdBuf.toString());
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Start KTSSubscriber job successful !", PRINT);
}
catch (Exception e) {
throw new SyDevOpsRuntimeException("Failed to execute " + PlugInvokeConstant.KTS_START_SHELL, e);
}
finally {
SAScriptHelper.remove(ktsElement.getSshDataSource(), PRINT, path + PlugInvokeConstant.KTS_START_SHELL);
}
}
}
public ComponentResult monitor() {
// PID=$(cat /KTSSubscriber.pid) ps -ef|grep -v grep|grep 'xx/xxx/xxxx'
StringBuffer cmd = new StringBuffer();
cmd.append("PID=$(cat " + ktsElement.getKtsPidPath() + ") n");
cmd.append("ps -ef|grep -v grep|grep $PID");
String process = null;
try {
process = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
}
catch (Exception e) {
PRINT.error("Monitor error 1:" + e.getMessage(), e);
}
// an empty pid lookup does not necessarily mean the process was killed; it may be a network hiccup or a grep quirk
if (StringUtils.isEmpty(process)) {
// possibly the network: wait a moment and retry
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
PRINT.error("Thread.sleep InterruptedException:" + e.getMessage(), e);
}
try {
process = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
}
catch (Exception e) {
PRINT.error("Monitor error 2:" + e.getMessage(), e);
}
if (StringUtils.isEmpty(process)) {
// still empty: maybe a grep problem, so try another command
// ps -ef|grep -v grep|grep 'xx/xxx/xxxx'
cmd = new StringBuffer();
cmd.append("ps -ef|grep -v grep|grep '").append(ktsElement.getTaskPath()).append("'");
try {
process = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
}
catch (Exception e) {
PRINT.error("Monitor error 3:" + e.getMessage(), e);
}
}
}
if (StringUtils.isEmpty(process)) {
throw new SyDevOpsRuntimeException(
"The process of KTSSubscriber is stopped! Task Id is " + ktsElement.getTaskId());
}
else {
StringBuffer logFileBuf = new StringBuffer();
logFileBuf.append(FileUtils.transformationPath(AgentConfigProperties.getJumpserverLocalpath()))
.append(KTS_LOG_PATH).append(File.separator).append(ktsElement.getTaskId()).append(File.separator)
.append(PlugInvokeConstant.KTS_LOG);
String result = null;
String tailCmd = "tail -50 " + logFileBuf.toString();
// while the log is being compressed, tail can fail to read it, so catch exceptions here
try {
result = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, tailCmd);
}
catch (Exception e) {
result = "";
}
ComponentResult readerResult = new ReaderResult();
readerResult.setInvokeResult(ComponentResult.RUNNING);
readerResult.setInvokeMsg(tailCmd + "\n" + result);
return readerResult;
}
}
public void stop(boolean ignore) {
boolean isExists = true;
try {
SAScriptHelper.find(ktsElement.getSshDataSource(), null, ktsElement.getKtsPidPath());
PRINT.info(ktsElement.getKtsPidPath());
}
catch (Exception e) {
isExists = false;
}
/**
* <pre>
* PID=$(cat /KTSSubscriber.pid)
* kill -9 $PID
* pkill -f xx/xxx/xxxx
* </pre>
*/
StringBuffer cmd = new StringBuffer();
if (isExists) {
cmd.append("PID=$(cat " + ktsElement.getKtsPidPath() + ") n");
cmd.append("kill -9 $PID; n");
}
cmd.append("pkill -f ").append(ktsElement.getTaskPath());
try {
String result = SAScriptHelper.shell(ktsElement.getSshDataSource(), null, cmd.toString());
PRINT.info(result);
}
catch (Exception e) {
if (ObjectUtils.getStackTraceAsString(e).indexOf("Operation not permitted") != -1 || !ignore) {
throw new SyDevOpsRuntimeException(e.getMessage(), e);
}
}
}
public ComponentResult getRunningResult() {
ComponentResult readerResult = new ReaderResult();
readerResult.setInvokeResult(ComponentResult.RUNNING);
readerResult.setInvokeMsg("success");
return readerResult;
}
public ComponentResult getStopResult() {
ComponentResult readerResult = new ReaderResult();
readerResult.setInvokeResult(ComponentResult.SUCCESS);
readerResult.setInvokeMsg("success");
return readerResult;
}
/**
* Whether the SDK jar needs to be re-uploaded because of a version change; by
* default it is not replaced.
*
* @param sdkPath
* @return
*/
public boolean isNeedReplaceSDKForVersion(String sdkPath) {
// stay compatible with older SDK jars that carry no version info
try {
String resourcesVersionCode = getVersionCodeForResources(sdkPath);
if (StringUtils.isEmpty(resourcesVersionCode)) {
// the jar under resources has no version info; no point continuing
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Resources sdk not have versionCode.", PRINT);
return false;
}
String serverVersionCode = getVersionCodeForServer();
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Resources versionCode = " + resourcesVersionCode,
PRINT);
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Server versionCode = " + serverVersionCode, PRINT);
// if the resources jar is newer than the server-side one, return true: the server-side jar should be replaced
if (StringUtils.isNotEmpty(resourcesVersionCode) && StringUtils.isNotEmpty(serverVersionCode)) {
if (Long.parseLong(resourcesVersionCode) > Long.parseLong(serverVersionCode)) {
return true;
}
}
else {
// if the resources jar has version info but the server-side one does not, it
// should also be replaced: the server is running an old SDK jar
if (StringUtils.isNotEmpty(resourcesVersionCode) && StringUtils.isEmpty(serverVersionCode)) {
// on reflection, the risk here is a bit high...
// return true;
}
}
}
catch (Exception e) {
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Can not compare the sdk version", PRINT);
}
return false;
}
/**
* Read the SDK version code from the resources directory.
*
* @param sdkPath
* @return
*/
private String getVersionCodeForResources(String sdkPath) {
try {
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); // container resource resolver
Resource[] resources = resolver.getResources(sdkPath);
for (Resource resource : resources) {
if (PlugInvokeConstant.KTS_JAR.equalsIgnoreCase(resource.getFilename())) {
File sdkFile = resource.getFile();
JarFile jarFile = null;
try {
jarFile = new JarFile(sdkFile);
Attributes attributes = jarFile.getManifest().getMainAttributes();
return attributes.getValue("versionCode"); // version code (yyyyMMddHHmmss)
}
catch (Exception e) {
throw new SyDevOpsRuntimeException(e.getMessage(), e);
}
finally {
if (jarFile != null) {
jarFile.close();
}
}
}
}
}
catch (Exception e) {
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Can not read the resources sdk version info",
PRINT);
}
return null;
}
/**
* Read the SDK version info from the server side.
*
* @return
*/
private String getVersionCodeForServer() {
try {
String versionFilePath = ktsElement.getPath() + File.separator + PlugInvokeConstant.KTS_VERSION;
String version = SAScriptHelper.cat(ktsElement.getSshDataSource(), PRINT, versionFilePath);
if (StringUtils.isNotEmpty(version)) {
version = version.replaceAll("\n", "");
}
return version;
}
catch (Exception e) {
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO, "Can not read the server sdk version info", PRINT);
}
return null;
}
/**
* Write the SDK version info to a file in the same directory as the SDK jar.
*
* @param versionInfo
*/
public void writeSDKVersionFile(String versionInfo) {
if (StringUtils.isNotEmpty(versionInfo)) {
try {
StringBuffer echoVersionCmd = new StringBuffer();
echoVersionCmd.append("echo ").append("\"").append(versionInfo).append("\"").append(" > ")
.append(ktsElement.getPath()).append(File.separator).append(PlugInvokeConstant.KTS_VERSION);
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO,
"Create sdk version file: " + echoVersionCmd.toString(), PRINT);
SAScriptHelper.shell(ktsElement.getSshDataSource(), null, echoVersionCmd.toString());
}
catch (Exception e) {
CommonContext.getLogger().makeRunLog(LoggerLevel.INFO,
"Write to " + PlugInvokeConstant.KTS_VERSION + " error", PRINT);
}
}
}
}
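For reference, the invoker (KafkaConsumerInvoke, mentioned in section 1 but not shown) essentially wires these methods together. The following is a hypothetical sketch of the call order only; the class name and how ktsElement is obtained are assumptions:
// Hypothetical call-order sketch of the invoker; the real KafkaConsumerInvoke
// is not shown in this post and may order or guard these calls differently.
public class KafkaConsumerInvokeSketch {
    public ComponentResult run(KafkaConsumerService service, KtsElement ktsElement) {
        service.init(ktsElement);        // bind the task context
        service.uploadJar();             // upload KTSSubscriber.jar (the SDK)
        service.preOperation();          // create the task and log directories
        service.uploadJson();            // upload the table-mapping json
        service.uploadShell();           // generate and upload the start shell
        service.executeStartShell();     // launch the jar remotely via nohup
        return service.monitor();        // check the pid and tail the run log
    }
}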
2. Flink integration with the real-time compute platform (packaged as a jar)
I will not go into the Flink job itself here: development work is divided up, and the big-data processing is handled by the big-data engineers. Still, a rough sketch follows for orientation.
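Below is a minimal sketch of what such a Flink job could look like: it consumes the binlog topic and sinks rows into MySQL through Flink's JDBC connector. All names here (topic, table, connection settings) are illustrative assumptions, not the platform's actual job:
// Illustrative sketch only: consume the binlog topic and write into MySQL via
// Flink's JDBC connector; a real job would parse and route per target table.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.sql.PreparedStatement;
import java.util.Properties;

public class BinlogSyncJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka01:9092"); // assumed broker
        props.setProperty("group.id", "flink-binlog-sync");
        // consume the binlog JSON pushed to Kafka in part 1
        env.addSource(new FlinkKafkaConsumer<>("canal-binlog-topic", new SimpleStringSchema(), props))
           // the raw message is written as-is to keep the sketch short
           .addSink(JdbcSink.sink(
                   "INSERT INTO t_binlog_raw (payload) VALUES (?)", // assumed table
                   (PreparedStatement ps, String value) -> ps.setString(1, value),
                   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                           .withUrl("jdbc:mysql://mysql01:3306/target_db")
                           .withDriverName("com.mysql.cj.jdbc.Driver")
                           .withUsername("user")
                           .withPassword("pwd")
                           .build()));
        env.execute("canal-kafka-flink incremental sync");
    }
}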