ClickHouse Knowledge Notes

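Contents

Preface
1. Overview
2. Cluster
3. Optimization
4. Table engines
4.1 Table engine categories
4.2 Table engine notes and writes
5. Bitmap
6. Flink-ClickHouse code
6.0 DDL
6.1 Real-time pom.xml
6.2 Real-time code
6.3 Offline (batch) pom.xml
6.4 Offline (batch) code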

Preface:

ClickHouse is a powerful column-oriented OLAP database.

1. Overview

2. Cluster

6 nodes: configuration with 3 shards and 2 replicas per shard

<yandex>
  <!-- Custom element name; it must match the incl attribute configured in config.xml -->
  <ch_order_servers>
    <order_cluster_1> <!-- Custom cluster name, globally unique -->
      <shard> <!-- Shard 1 -->
        <!-- Internal replication: the Distributed table writes to one replica and the Replicated* table copies the data to the other -->
        <internal_replication>true</internal_replication>
        <replica> <!-- Shard 1, replica 1 -->
          <host>clickhouse1.com</host>
          <port>9000</port>
        </replica>
        <replica> <!-- Shard 1, replica 2 -->
          <host>clickhouse2.com</host>
          <port>9000</port>
        </replica>
      </shard>
      <shard> <!-- Shard 2 -->
        <!-- Internal replication, as above -->
        <internal_replication>true</internal_replication>
        <replica> <!-- Shard 2, replica 1 -->
          <host>clickhouse3.com</host>
          <port>9000</port>
        </replica>
        <replica> <!-- Shard 2, replica 2 -->
          <host>clickhouse4.com</host>
          <port>9000</port>
        </replica>
      </shard>
      <shard> <!-- Shard 3 -->
        <!-- Internal replication, as above -->
        <internal_replication>true</internal_replication>
        <replica> <!-- Shard 3, replica 1 -->
          <host>clickhouse5.com</host>
          <port>9000</port>
        </replica>
        <replica> <!-- Shard 3, replica 2 -->
          <host>clickhouse6.com</host>
          <port>9000</port>
        </replica>
      </shard>
    </order_cluster_1>
  </ch_order_servers>

  <!-- ZooKeeper configuration (custom) -->
  <zookeeper-servers>
    <node index="1"> <!-- ZooKeeper node; multiple nodes can be configured -->
      <host>clickhouse1.com</host>
      <port>2181</port>
    </node>
    <node index="2">
      <host>clickhouse2.com</host>
      <port>2181</port>
    </node>
    <node index="3">
      <host>clickhouse3.com</host>
      <port>2181</port>
    </node>
    <node index="4">
      <host>clickhouse4.com</host>
      <port>2181</port>
    </node>
    <node index="5">
      <host>clickhouse5.com</host>
      <port>2181</port>
    </node>
    <node index="6">
      <host>clickhouse6.com</host>
      <port>2181</port>
    </node>
  </zookeeper-servers>

  <!-- Shard and replica macro variables for this node (here clickhouse1.com); the values differ on every node -->
  <macros>
    <shard>01</shard>
    <replica>clickhouse1.com</replica>
  </macros>
</yandex>
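The <macros> block in the configuration above holds the values for clickhouse1.com only, while each of the six nodes needs its own shard number and replica name. The following is a minimal sketch of that mapping, not part of the original setup. It assumes the hosts are listed in shard order with two consecutive replicas per shard, as in the cluster definition above. The class name MacrosSketch and the method macrosFor are hypothetical helpers introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: derives the per-node <macros> fragment from the host order.
class MacrosSketch {

    /**
     * Builds the <macros> fragment for the node at hostIndex (0-based).
     * Assumption: hosts are ordered shard by shard, replicasPerShard hosts per shard.
     */
    static String macrosFor(List<String> hosts, int replicasPerShard, int hostIndex) {
        int shardNumber = hostIndex / replicasPerShard + 1; // 1-based shard number
        return String.format(
                "<macros>\n  <shard>%02d</shard>\n  <replica>%s</replica>\n</macros>",
                shardNumber, hosts.get(hostIndex));
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList(
                "clickhouse1.com", "clickhouse2.com",
                "clickhouse3.com", "clickhouse4.com",
                "clickhouse5.com", "clickhouse6.com");
        // Print the fragment that would go into each node's configuration file.
        for (int i = 0; i < hosts.size(); i++) {
            System.out.println("<!-- " + hosts.get(i) + " -->");
            System.out.println(macrosFor(hosts, 2, i));
        }
    }
}
```

With this ordering, clickhouse1.com and clickhouse2.com get shard 01, clickhouse3.com and clickhouse4.com get shard 02, and clickhouse5.com and clickhouse6.com get shard 03. Each node uses its own host name as the replica value.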

 

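3. Optimization

4. Table engines

4.1 Table engine categories

4.2 Table engine notes and writes

Note: Replicated* engines are used for the local tables on each node; the Distributed engine is used for the distributed table that routes reads and writes to those local tables.

5. Bitmap

6. Flink-ClickHouse code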

6.0 DDL

CREATE TABLE tmp.id_val
(
    `id` UInt32,
    `val` UInt32
)
ENGINE = TinyLog

6.1 Real-time pom.xml

        <!--ClickHouse-->
        <dependency>
            <groupId>ru.ivi.opensource</groupId>
            <artifactId>flink-clickhouse-sink</artifactId>
            <version>1.1.0</version>
        </dependency>

6.2 Real-time code

package sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import ru.ivi.opensource.flinkclickhousesink.ClickhouseSink;
import ru.ivi.opensource.flinkclickhousesink.model.ClickhouseClusterSettings;
import ru.ivi.opensource.flinkclickhousesink.model.ClickhouseSinkConsts;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ClickhouseSinkDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        Map<String, String> globalParameters = new HashMap<>();
        // clickhouse cluster properties
        globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_HOSTS, "http://bt-05:8123/tmp");
//        globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_USER, "root");
//        globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_PASSWORD, "123456");

        // sink common
        globalParameters.put(ClickhouseSinkConsts.TIMEOUT_SEC, "10");
        globalParameters.put(ClickhouseSinkConsts.FAILED_RECORDS_PATH, "D:\\project_demo\\flink_demo\\src\\main\\resources");
        globalParameters.put(ClickhouseSinkConsts.NUM_WRITERS, "1");
        globalParameters.put(ClickhouseSinkConsts.NUM_RETRIES, "3");
        globalParameters.put(ClickhouseSinkConsts.QUEUE_MAX_CAPACITY, "3");

        // set global parameters
        ParameterTool parameters = ParameterTool.fromMap(globalParameters);
        env.getConfig().setGlobalJobParameters(parameters);

        DataStreamSource<String> input = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<String> out = input.map(new MapFunction<String, String>() {
            @Override
            public String map(String in) throws Exception {
                return convertToCsv(in);
            }
        });

        // create props for sink
        Properties props = new Properties();
        props.put(ClickhouseSinkConsts.TARGET_TABLE_NAME, "tmp.id_val");
        props.put(ClickhouseSinkConsts.MAX_BUFFER_SIZE, "2");
        out.addSink(new ClickhouseSink(props));

        env.execute();
    }

    public static String convertToCsv(String in) {
        String[] arr = in.split(",");

        StringBuilder builder = new StringBuilder();
        builder.append("(");

        // add a.str
        // builder.append("'");
        // builder.append(a.str);
        // builder.append("', ");

        // append the two integer columns (id, val)
        builder.append(String.valueOf(arr[0]));
        builder.append(", ");
        builder.append(String.valueOf(arr[1]));
        builder.append(" )");
        return builder.toString();
    }

}
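The convertToCsv method above reads arr[0] and arr[1] without checking the input, so a line with fewer than two fields throws ArrayIndexOutOfBoundsException inside the map operator. The following is a minimal sketch of a stricter formatter, offered as one possible approach rather than the original author's code. It assumes both fields must fit the UInt32 columns from the DDL in section 6.0. The class name RowFormatSketch and the methods toValuesRow and parseUInt32 are hypothetical.

```java
// Hypothetical helper: validates one "id,val" line and formats it as a VALUES tuple.
class RowFormatSketch {

    private static final long UINT32_MAX = 4294967295L;

    /** Converts a line such as "1, 2" into "(1, 2)"; rejects malformed input. */
    static String toValuesRow(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                    "expected 2 comma-separated fields but got " + parts.length + ": " + line);
        }
        long id = parseUInt32(parts[0]);
        long val = parseUInt32(parts[1]);
        return "(" + id + ", " + val + ")";
    }

    /** Parses a trimmed decimal string and checks the UInt32 range. */
    static long parseUInt32(String text) {
        long value = Long.parseLong(text.trim()); // throws NumberFormatException on non-numeric input
        if (value < 0 || value > UINT32_MAX) {
            throw new IllegalArgumentException("value out of UInt32 range: " + text);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(toValuesRow("1, 2"));
    }
}
```

How to handle a rejected line (drop it, log it, or fail the job) is a design choice that depends on the pipeline. The sketch only makes the failure explicit.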

6.3 Offline (batch) pom.xml

        <!-- https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>

6.4 Offline (batch) code

package sink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink


object StreamingJob {
  def main(args: Array[String]): Unit = {
    val SourceCsvPath =
      "D:\\project_demo\\flink_demo\\src\\main\\resources\\id_vid.csv"
    val CkJdbcUrl =
      "jdbc:clickhouse://bt-05:8123/tmp"
    //    val CkUsername = "root"
    //    val CkPassword = "123456"
    val BatchSize = 500 // set your batch size

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("id", DataTypes.INT)
      .field("val", DataTypes.INT)
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    val resultTable = tEnv.scan("source").select("id, val")

    val insertIntoCkSql =
      """
        |  INSERT INTO id_val (
        |    id, val
        |  ) VALUES (
        |    ?, ?
        |  )
      """.stripMargin

    // Write the data to the ClickHouse sink
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(CkJdbcUrl)
      //      .setUsername(CkUsername)
      //      .setPassword(CkPassword)
      .setQuery(insertIntoCkSql)
      .setBatchSize(BatchSize)
      .setParameterTypes(Types.INT, Types.INT)
      .build()

    tEnv.registerTableSink(
      "sink",
      Array("id", "val"),
      Array(Types.INT, Types.INT),
      sink
    )

    tEnv.insertInto(resultTable, "sink")

    env.execute("Flink Table API to ClickHouse Example")
  }
}