我是靠谱客的博主 苗条发带,最近开发中收集的这篇文章主要介绍基于Flink的JDBC SINK插入Nested结构数据到Clickhouse,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

package com.msyc.bi.sync.utils;

import cn.hutool.core.util.ReflectUtil;
import com.msyc.bi.sync.annotation.NestedSink;
import com.msyc.bi.sync.annotation.TableSink;
import com.msyc.bi.sync.annotation.TransientSink;
import com.msyc.bi.sync.bean.ods.GmpSodPartnerRec;
import com.mysql.cj.jdbc.ClientPreparedStatement;
import lombok.SneakyThrows;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.*;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ClickHouseUtil {
    /**
     * Ck bean转sql 工具类
     * @param param
     * @param clazz
     * @param <T>
     * @return
     * @throws NoSuchMethodException
     * @throws ClassNotFoundException
     */


    public static <T> SinkFunction<T> getSink(ParameterTool param,Class<?> clazz) throws NoSuchMethodException, ClassNotFoundException {

        String sql = createInsertSql(clazz);



        return JdbcSink.<T>sink(sql,
                new JdbcStatementBuilder<T>() {
                    @SneakyThrows
                    @Override
                    public void accept(PreparedStatement preparedStatement, T t) throws SQLException {
                        
                        //获取所有的属性信息
                        Field[] fields = t.getClass().getDeclaredFields();

                        //遍历字段
//                            int offset = 0;
                        int num = 1;
//                        int t = 0;
                        for (Field field : fields) {

                            //设置私有属性可访问
                            if (!field.isAccessible()) {
                                field.setAccessible(true);//赋权使用,否则private类型是无法操作的
                            }

                            boolean  nestedSink = field.isAnnotationPresent(NestedSink.class);
                            if (nestedSink) {

                                Type genericType = field.getGenericType();
                                if(genericType instanceof ParameterizedType) {
                                    //获取泛型类
                                    ParameterizedType pt = (ParameterizedType) field.getGenericType();
                                    Type[] actualTypeArguments = pt.getActualTypeArguments();

//
                                    //获取对象
                                    if (actualTypeArguments.length > 0 ) {
//                                        System.out.println(field.get(t));

                                        if (field.get(t) != null) {
                                            Class clazz = field.get(t).getClass();


//                                        Class<?> accountPrincipalApproveClazz = (Class<?>) actualTypeArguments[0];
                                            //获取对象list属性的class
//                                        Class clazz = field.get(t).getClass();

                                            //获取list属性的size方法
                                            Method m = clazz.getDeclaredMethod("size");

                                            int size = (Integer) m.invoke(field.get(t));//调用size方法

                                            //将存入一个map
                                            LinkedHashMap<Field, List<Object>> mapList = new LinkedHashMap<>();

                                            for (int i = 0; i < size; i++) {
                                                Method getM = clazz.getDeclaredMethod("get", int.class);//获取list属性的get方法
                                                Object u = getM.invoke(field.get(t), i);//调用get方法获取list中的对象
                                                Field[] uf = u.getClass().getDeclaredFields();
                                                for (Field fu : uf) {
                                                    if (!fu.isAccessible())
                                                        fu.setAccessible(true);
//                                                System.out.println("i=>" + i+ " fu=>" +fu +  " fu.get(u)=>" +   fu.get(u) + "n");
                                                    //将相同字段的值存在一个list里
                                                    if (mapList.containsKey(fu)) {
                                                        List<Object> objects = mapList.get(fu);
                                                        List<Object> list = new CopyOnWriteArrayList<>(objects);
                                                        list.add(fu.get(u));
                                                        mapList.put(fu, list);
                                                    } else {
                                                        List<Object> list = new CopyOnWriteArrayList<>();
                                                        list.add(fu.get(u));
                                                        mapList.put(fu, list);
                                                    }
                                                }

                                            }
                                            for (Field itemKey2 : mapList.keySet()) {
//                                            System.out.println(itemKey2.getGenericType().toString());
//                                                System.out.println("itemKey2=>" + itemKey2);
//                                                System.out.println("mapList.get(itemKey2=>" + mapList.get(itemKey2));
                                                if (itemKey2.getGenericType().toString().equals("long") || itemKey2.getGenericType().toString().equals("double")) {
                                                    preparedStatement.setObject(num++,  mapList.get(itemKey2));
                                                } else {

                                                    java.sql.Array sqlArray = preparedStatement.getConnection().createArrayOf("String", mapList.get(itemKey2).toArray());
                                                    preparedStatement.setArray(num++, sqlArray);
                                                }


                                            }

                                        }
                                    }
                                }
                                continue;
                            }
                            Object value = field.get(t);

                            preparedStatement.setObject(num++, value);
                        }

                        //此处打印了对象和 带入参数后的sql语句
//                        String rsq = ((ru.yandex.clickhouse.ClickHousePreparedStatement)preparedStatement).asSql();
//                        System.out.println("sql1=>" + num);
//                        System.out.println("num1=>" + num);
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(param.getInt("db.ck.bachSize"))
                        .withBatchIntervalMs(param.getInt("db.ck.batchIntervalSeconds"))
                        .withMaxRetries(param.getInt("db.ck.retries"))
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(param.getRequired("db.ck.driver"))
                        .withUrl(param.getRequired("db.ck.url"))
                        .withPassword(param.getRequired("db.ck.pass"))
                        .withUsername(param.getRequired("db.ck.user"))
                        .build());

    }

    /**
     * 生成占位符 sql
     *
     * @return sql
     */
    private static String createInsertSql(Class<?> clazz) throws NoSuchMethodException, ClassNotFoundException {

        TableSink annotation = clazz.getAnnotation(TableSink.class);
        String tableName = annotation.name();
        StringBuilder sql1 = new StringBuilder().append("insert into ").append(tableName).append("(");
        StringBuilder sql2 = new StringBuilder().append(" values (");
        Field[] declaredFields = clazz.getDeclaredFields();
        int indices = declaredFields.length;
        int t =1;

        for (int i = 0; i < indices; i++) {
            Field field = declaredFields[i];
            field.setAccessible(true);

            //获取字段名称
            String fieldName = field.getName();

            //判断是否为嵌套字段,如果为嵌套字段则转为[]格式
            boolean  nestedSink = field.isAnnotationPresent(NestedSink.class);
            if (nestedSink) {

                //获取泛型类
                ParameterizedType pt = (ParameterizedType)field.getGenericType();
                Class<?> accountPrincipalApproveClazz = (Class<?>)pt.getActualTypeArguments()[0];
                Field[] fields2 = accountPrincipalApproveClazz.getDeclaredFields();


                for (int j = 0; j < fields2.length; j++) {

                        //获取字段
                        Field field2 = fields2[j];

                        //设置私有属性可访问
                        field2.setAccessible(true);

                        //获取字段名称
                        String fieldName2 = field2.getName();

                        //合并一级字段名称 和二级字段名称 并转下划线加大写
                        String name2 = xX2x_x(fieldName).toUpperCase() + "." + xX2x_x(fieldName2).toUpperCase();
//                        System.out.println("name2=>" +name2);
                        if(j == fields2.length-1 && i == indices - 1){
                            sql1.append(name2).append(") ");
                            sql2.append("?)");
                            t ++;
                        }else{
                            sql1.append(name2).append(",");
                            sql2.append( "?,");
                            t ++;
                        }
                    }
                continue;
            }

                //转下划线加大写
                String name = xX2x_x(fieldName).toUpperCase();
                if(i == indices - 1){
                    sql1.append(name).append(") ");
                    sql2.append("?) ");
                    t ++;
                }else{
                    sql1.append(name).append(",");
                    sql2.append( "?,");
                    t ++;
                }



        }
//        System.out.println("num2=>" + t);
        return sql1.append(sql2).toString();
    }



    /**
     *
     * @Description 将驼峰转为下划线
     * @param str
     * @return java.lang.String
     * @Date   2022/4/22 13:11
     * @since  1.0.0
     */
    public static String xX2x_x(String str) {
        Pattern compile = Pattern.compile("[A-Z]");
        Matcher matcher = compile.matcher(str);
        StringBuffer sb = new StringBuffer();
        while(matcher.find()) {
            matcher.appendReplacement(sb,  "_" + matcher.group(0).toLowerCase());
        }
        matcher.appendTail(sb);
        return sb.toString();
    }




}
package com.msyc.bi.sync.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * Nested字段项
 */
@Target(ElementType.FIELD)
@Retention(RUNTIME)
public @interface TransientSink {
}
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version> 0.3.2</version>
</dependency>

最后

以上就是苗条发带为你收集整理的基于Flink的JDBC SINK插入Nested结构数据到Clickhouse的全部内容,希望文章能够帮你解决基于Flink的JDBC SINK插入Nested结构数据到Clickhouse所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部