我是靠谱客的博主温婉向日葵。最近在开发中收集了这篇《ClickHouse 知识点整理》（内容包括：前文；一、概括；二、集群；三、优化；四、表引擎；五、Bitmap；六、Flink—ClickHouse 代码），觉得挺不错，现在分享给大家，希望可以做个参考。
概述
目录
前文:
一、概括
二、集群
三、优化
四、表引擎
4.1 表引擎分类
4.2 表引擎相关说明及写入
五、Bitmap
六、Flink—ClickHouse 代码
6.0 DDL
6.1 实时 pom.xml
6.2 实时代码
6.3 离线 pom.xml
6.4 离线代码
前文:
ClickHouse 是一个开源的列式 OLAP（联机分析处理）数据库管理系统。
一、概括
二、集群
6 节点集群配置：3 个分片，每个分片 2 个副本（3 分片 × 2 副本）。
<yandex>
<!-- Cluster layout: 6 hosts arranged as 3 shards x 2 replicas per shard. -->
<!-- Custom section name; it only has to match the incl attribute configured in config.xml. -->
<ch_order_servers>
<order_cluster_1> <!-- Custom cluster name; must be globally unique. -->
<shard> <!-- Shard 1 -->
<!-- internal_replication=true: per the ClickHouse Distributed engine docs, a Distributed table then writes each row to only one replica and relies on the Replicated* table engine to copy it to the other replica. -->
<internal_replication>true</internal_replication>
<replica> <!-- Shard 1, first replica -->
<host>clickhouse1.com</host>
<port>9000</port>
</replica>
<replica> <!-- Shard 1, second replica -->
<host>clickhouse2.com</host>
<port>9000</port>
</replica>
</shard>
<shard> <!-- Shard 2 -->
<!-- Replication handled by the Replicated* table engine (see shard 1). -->
<internal_replication>true</internal_replication>
<replica> <!-- Shard 2, first replica -->
<host>clickhouse3.com</host>
<port>9000</port>
</replica>
<replica> <!-- Shard 2, second replica -->
<host>clickhouse4.com</host>
<port>9000</port>
</replica>
</shard>
<shard> <!-- Shard 3 -->
<!-- Replication handled by the Replicated* table engine (see shard 1). -->
<internal_replication>true</internal_replication>
<replica> <!-- Shard 3, first replica -->
<host>clickhouse5.com</host>
<port>9000</port>
</replica>
<replica> <!-- Shard 3, second replica -->
<host>clickhouse6.com</host>
<port>9000</port>
</replica>
</shard>
</order_cluster_1>
</ch_order_servers>
<!-- ZooKeeper ensemble (custom section name; presumably referenced from config.xml via incl like the cluster section above). -->
<!-- NOTE(review): 6 members need a majority of 4 and therefore tolerate only 2 failures, the same as 5 members; an odd-sized ensemble (3 or 5) is the usual choice - confirm. -->
<zookeeper-servers>
<node index="1"> <!-- One ensemble member; add as many as needed. -->
<host>clickhouse1.com</host>
<port>2181</port>
</node>
<node index="2">
<host>clickhouse2.com</host>
<port>2181</port>
</node>
<node index="3"> <!-- One ensemble member; add as many as needed. -->
<host>clickhouse3.com</host>
<port>2181</port>
</node>
<node index="4">
<host>clickhouse4.com</host>
<port>2181</port>
</node>
<node index="5"> <!-- One ensemble member; add as many as needed. -->
<host>clickhouse5.com</host>
<port>2181</port>
</node>
<node index="6">
<host>clickhouse6.com</host>
<port>2181</port>
</node>
</zookeeper-servers>
<!-- Shard/replica macros (not partition macros), substituted as {shard}/{replica} in Replicated* table definitions. -->
<!-- NOTE(review): these values are for clickhouse1.com only. Every host needs its own values: shard 01 for clickhouse1/2, 02 for clickhouse3/4, 03 for clickhouse5/6, and a replica name unique within its shard. -->
<macros>
<shard>01</shard>
<replica>clickhouse1.com</replica>
</macros>
</yandex>
三、优化
四、表引擎
4.1 表引擎分类
4.2 表引擎相关说明及写入
备注：Replicated* 系列引擎用于创建本地表（负责副本间的数据同步）；Distributed 引擎用于创建分布式表（本身不存储数据，负责把查询和写入路由到各分片的本地表）。
五、Bitmap
六、Flink—ClickHouse 代码
6.0 DDL
-- Demo target table (id, val pairs) written by the Flink real-time and offline examples below.
-- NOTE(review): TinyLog is a minimal Log-family engine meant for small test tables; on the
-- 3-shard x 2-replica cluster above, a Replicated*MergeTree local table plus a Distributed
-- table would be the usual choice - confirm for production use.
CREATE TABLE tmp.id_val
(
`id` UInt32,
`val` UInt32
)
ENGINE = TinyLog
6.1 实时 pom.xml
<!-- ClickHouse sink connector for Flink DataStream jobs (provides ClickhouseSink used by the real-time example). -->
<dependency>
<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.1.0</version>
</dependency>
6.2 实时代码
package sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import ru.ivi.opensource.flinkclickhousesink.ClickhouseSink;
import ru.ivi.opensource.flinkclickhousesink.model.ClickhouseClusterSettings;
import ru.ivi.opensource.flinkclickhousesink.model.ClickhouseSinkConsts;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class ClickhouseSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
Map<String, String> globalParameters = new HashMap<>();
// clickhouse cluster properties
globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_HOSTS, "http://bt-05:8123/tmp");
// globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_USER, "root");
// globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_PASSWORD, "123456");
// sink common
globalParameters.put(ClickhouseSinkConsts.TIMEOUT_SEC, "10");
globalParameters.put(ClickhouseSinkConsts.FAILED_RECORDS_PATH, "D:\project_demo\flink_demo\src\main\resources");
globalParameters.put(ClickhouseSinkConsts.NUM_WRITERS, "1");
globalParameters.put(ClickhouseSinkConsts.NUM_RETRIES, "3");
globalParameters.put(ClickhouseSinkConsts.QUEUE_MAX_CAPACITY, "3");
// set global paramaters
ParameterTool parameters = ParameterTool.fromMap(globalParameters);
env.getConfig().setGlobalJobParameters(parameters);
DataStreamSource<String> input = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<String> out = input.map(new MapFunction<String, String>() {
@Override
public String map(String in) throws Exception {
return convertToCsv(in);
}
});
// create props for sink
Properties props = new Properties();
props.put(ClickhouseSinkConsts.TARGET_TABLE_NAME, "tmp.id_val");
props.put(ClickhouseSinkConsts.MAX_BUFFER_SIZE, "2");
out.addSink(new ClickhouseSink(props));
env.execute();
}
public static String convertToCsv(String in) {
String[] arr = in.split(",");
StringBuilder builder = new StringBuilder();
builder.append("(");
// add a.str
// builder.append("'");
// builder.append(a.str);
// builder.append("', ");
// add a.intger
builder.append(String.valueOf(arr[0]));
builder.append(", ");
builder.append(String.valueOf(arr[1]));
builder.append(" )");
return builder.toString();
}
}
6.3 离线 pom.xml
<!-- ClickHouse JDBC driver (ru.yandex.clickhouse.ClickHouseDriver), used by the offline JDBC example. -->
<!-- https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc -->
<!-- NOTE(review): the offline example also imports org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink, which comes from a separate Flink JDBC connector artifact not listed here - confirm the full dependency set. -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
6.4 离线代码
package sink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
/**
 * Offline demo: reads `id,val` rows from a local CSV file (header line skipped) with the Flink
 * Table API and appends them to the ClickHouse table `id_val` through a JDBC append sink.
 * The database is presumably selected by the `/tmp` path of the JDBC URL.
 */
object StreamingJob {

  def main(args: Array[String]): Unit = {
    // Windows path: backslashes must be escaped. In a plain Scala string "\p", "\m" and "\i"
    // are invalid escapes (compile error) and "\f" / "\r" would become control characters.
    val SourceCsvPath =
      "D:\\project_demo\\flink_demo\\src\\main\\resources\\id_vid.csv"
    val CkJdbcUrl =
      "jdbc:clickhouse://bt-05:8123/tmp"
    // Uncomment (and pass to the sink builder below) if authentication is required.
    // val CkUsername = "root"
    // val CkPassword = "123456"
    val BatchSize = 500 // JDBC batch size: number of rows per batched insert

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // NOTE(review): columns are read as signed INT while tmp.id_val uses UInt32;
    // negative values would not fit - confirm the input domain.
    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("id", DataTypes.INT)
      .field("val", DataTypes.INT)
      .build()
    tEnv.registerTableSource("source", csvTableSource)
    val resultTable = tEnv.scan("source").select("id, val")

    // Leading whitespace up to '|' is removed by stripMargin.
    val insertIntoCkSql =
      """
        | INSERT INTO id_val (
        | id, val
        | ) VALUES (
        | ?, ?
        | )
        |""".stripMargin

    // Write the rows to ClickHouse through a JDBC append sink.
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(CkJdbcUrl)
      // .setUsername(CkUsername)
      // .setPassword(CkPassword)
      .setQuery(insertIntoCkSql)
      .setBatchSize(BatchSize)
      .setParameterTypes(Types.INT, Types.INT)
      .build()

    // Field names/types registered here line up with the two '?' placeholders above.
    tEnv.registerTableSink(
      "sink",
      Array("id", "val"),
      Array(Types.INT, Types.INT),
      sink
    )
    tEnv.insertInto(resultTable, "sink")
    env.execute("Flink Table API to ClickHouse Example")
  }
}
最后
以上就是温婉向日葵为你收集整理的《ClickHouse 知识点整理》（前文；一、概括；二、集群；三、优化；四、表引擎；五、Bitmap；六、Flink—ClickHouse 代码）的全部内容，希望本文能够帮你解决 ClickHouse 相关的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复