概述
package com.msyc.bi.sync.utils;
import cn.hutool.core.util.ReflectUtil;
import com.msyc.bi.sync.annotation.NestedSink;
import com.msyc.bi.sync.annotation.TableSink;
import com.msyc.bi.sync.annotation.TransientSink;
import com.msyc.bi.sync.bean.ods.GmpSodPartnerRec;
import com.mysql.cj.jdbc.ClientPreparedStatement;
import lombok.SneakyThrows;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHousePreparedStatement;
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.*;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class ClickHouseUtil {
/**
* Ck bean转sql 工具类
* @param param
* @param clazz
* @param <T>
* @return
* @throws NoSuchMethodException
* @throws ClassNotFoundException
*/
public static <T> SinkFunction<T> getSink(ParameterTool param,Class<?> clazz) throws NoSuchMethodException, ClassNotFoundException {
String sql = createInsertSql(clazz);
return JdbcSink.<T>sink(sql,
new JdbcStatementBuilder<T>() {
@SneakyThrows
@Override
public void accept(PreparedStatement preparedStatement, T t) throws SQLException {
//获取所有的属性信息
Field[] fields = t.getClass().getDeclaredFields();
//遍历字段
// int offset = 0;
int num = 1;
// int t = 0;
for (Field field : fields) {
//设置私有属性可访问
if (!field.isAccessible()) {
field.setAccessible(true);//赋权使用,否则private类型是无法操作的
}
boolean nestedSink = field.isAnnotationPresent(NestedSink.class);
if (nestedSink) {
Type genericType = field.getGenericType();
if(genericType instanceof ParameterizedType) {
//获取泛型类
ParameterizedType pt = (ParameterizedType) field.getGenericType();
Type[] actualTypeArguments = pt.getActualTypeArguments();
//
//获取对象
if (actualTypeArguments.length > 0 ) {
// System.out.println(field.get(t));
if (field.get(t) != null) {
Class clazz = field.get(t).getClass();
// Class<?> accountPrincipalApproveClazz = (Class<?>) actualTypeArguments[0];
//获取对象list属性的class
// Class clazz = field.get(t).getClass();
//获取list属性的size方法
Method m = clazz.getDeclaredMethod("size");
int size = (Integer) m.invoke(field.get(t));//调用size方法
//将存入一个map
LinkedHashMap<Field, List<Object>> mapList = new LinkedHashMap<>();
for (int i = 0; i < size; i++) {
Method getM = clazz.getDeclaredMethod("get", int.class);//获取list属性的get方法
Object u = getM.invoke(field.get(t), i);//调用get方法获取list中的对象
Field[] uf = u.getClass().getDeclaredFields();
for (Field fu : uf) {
if (!fu.isAccessible())
fu.setAccessible(true);
// System.out.println("i=>" + i+ " fu=>" +fu + " fu.get(u)=>" + fu.get(u) + "n");
//将相同字段的值存在一个list里
if (mapList.containsKey(fu)) {
List<Object> objects = mapList.get(fu);
List<Object> list = new CopyOnWriteArrayList<>(objects);
list.add(fu.get(u));
mapList.put(fu, list);
} else {
List<Object> list = new CopyOnWriteArrayList<>();
list.add(fu.get(u));
mapList.put(fu, list);
}
}
}
for (Field itemKey2 : mapList.keySet()) {
// System.out.println(itemKey2.getGenericType().toString());
// System.out.println("itemKey2=>" + itemKey2);
// System.out.println("mapList.get(itemKey2=>" + mapList.get(itemKey2));
if (itemKey2.getGenericType().toString().equals("long") || itemKey2.getGenericType().toString().equals("double")) {
preparedStatement.setObject(num++, mapList.get(itemKey2));
} else {
java.sql.Array sqlArray = preparedStatement.getConnection().createArrayOf("String", mapList.get(itemKey2).toArray());
preparedStatement.setArray(num++, sqlArray);
}
}
}
}
}
continue;
}
Object value = field.get(t);
preparedStatement.setObject(num++, value);
}
//此处打印了对象和 带入参数后的sql语句
// String rsq = ((ru.yandex.clickhouse.ClickHousePreparedStatement)preparedStatement).asSql();
// System.out.println("sql1=>" + num);
// System.out.println("num1=>" + num);
}
},
new JdbcExecutionOptions.Builder()
.withBatchSize(param.getInt("db.ck.bachSize"))
.withBatchIntervalMs(param.getInt("db.ck.batchIntervalSeconds"))
.withMaxRetries(param.getInt("db.ck.retries"))
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName(param.getRequired("db.ck.driver"))
.withUrl(param.getRequired("db.ck.url"))
.withPassword(param.getRequired("db.ck.pass"))
.withUsername(param.getRequired("db.ck.user"))
.build());
}
/**
* 生成占位符 sql
*
* @return sql
*/
private static String createInsertSql(Class<?> clazz) throws NoSuchMethodException, ClassNotFoundException {
TableSink annotation = clazz.getAnnotation(TableSink.class);
String tableName = annotation.name();
StringBuilder sql1 = new StringBuilder().append("insert into ").append(tableName).append("(");
StringBuilder sql2 = new StringBuilder().append(" values (");
Field[] declaredFields = clazz.getDeclaredFields();
int indices = declaredFields.length;
int t =1;
for (int i = 0; i < indices; i++) {
Field field = declaredFields[i];
field.setAccessible(true);
//获取字段名称
String fieldName = field.getName();
//判断是否为嵌套字段,如果为嵌套字段则转为[]格式
boolean nestedSink = field.isAnnotationPresent(NestedSink.class);
if (nestedSink) {
//获取泛型类
ParameterizedType pt = (ParameterizedType)field.getGenericType();
Class<?> accountPrincipalApproveClazz = (Class<?>)pt.getActualTypeArguments()[0];
Field[] fields2 = accountPrincipalApproveClazz.getDeclaredFields();
for (int j = 0; j < fields2.length; j++) {
//获取字段
Field field2 = fields2[j];
//设置私有属性可访问
field2.setAccessible(true);
//获取字段名称
String fieldName2 = field2.getName();
//合并一级字段名称 和二级字段名称 并转下划线加大写
String name2 = xX2x_x(fieldName).toUpperCase() + "." + xX2x_x(fieldName2).toUpperCase();
// System.out.println("name2=>" +name2);
if(j == fields2.length-1 && i == indices - 1){
sql1.append(name2).append(") ");
sql2.append("?)");
t ++;
}else{
sql1.append(name2).append(",");
sql2.append( "?,");
t ++;
}
}
continue;
}
//转下划线加大写
String name = xX2x_x(fieldName).toUpperCase();
if(i == indices - 1){
sql1.append(name).append(") ");
sql2.append("?) ");
t ++;
}else{
sql1.append(name).append(",");
sql2.append( "?,");
t ++;
}
}
// System.out.println("num2=>" + t);
return sql1.append(sql2).toString();
}
/**
*
* @Description 将驼峰转为下划线
* @param str
* @return java.lang.String
* @Date 2022/4/22 13:11
* @since 1.0.0
*/
public static String xX2x_x(String str) {
Pattern compile = Pattern.compile("[A-Z]");
Matcher matcher = compile.matcher(str);
StringBuffer sb = new StringBuffer();
while(matcher.find()) {
matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
}
matcher.appendTail(sb);
return sb.toString();
}
}
package com.msyc.bi.sync.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* Nested字段项
*/
@Target(ElementType.FIELD)
@Retention(RUNTIME)
public @interface TransientSink {
}
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version> 0.3.2</version>
</dependency>
最后
以上就是苗条发带为你收集整理的基于Flink的JDBC SINK插入Nested结构数据到Clickhouse的全部内容,希望文章能够帮你解决基于Flink的JDBC SINK插入Nested结构数据到Clickhouse所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复