概述
Distributed Table Engine | ClickHouse DocsTables with Distributed engine do not store any data of their own, but allow distributed query processing on multiple servers.https://clickhouse.com/docs/en/engines/table-engines/special/distributed具有分布式引擎的表不存储自己的任何数据,但允许在多个服务器上进行分布式查询处理。 读取是自动并行的。读取数据会自动使用表索引。
建表语句
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]
根据已存在的表建立
如果distributed表指向服务器中已经存在的表:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
AS [db2.]name2
ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]
分布式表引擎的参数
-
cluster
- 集群名 -
database
- 数据库名,除了使用字符串,还能够使用currentDatabase() -
table
- 表名 -
sharding_key
- (可选)分片键 -
policy_name
- (可选)策略名称,它将用于存储异步发送的临时文件
Distributed相关的参数
-
fsync_after_insert
- 异步插入到分布式表之后,再对文件数据进行 fsync(fsync指同步内存中所有已修改的文件数据到储存设备)。保证操作系统将能将所有数据刷到磁盘文件中。 -
fsync_directories
- do thefsync
for directories. -
bytes_to_throw_insert
- 等待异步 INSERT的数据字节数超过这个值,就会抛出异常。默认为0,取消限制。 -
bytes_to_delay_insert
- 等待异步 INSERT的数据字节数超过这个值,插入将被延迟。默认为0,取消限制。 -
max_delay_to_insert
- 上面参数延迟的时间,默认为60(s) -
monitor_batch_inserts
- same as distributed_directory_monitor_batch_inserts,启用/禁用批量发送插入的数据。
启用批量发送后,分布式表引擎会尝试在一次操作中发送多个插入数据的文件,而不是单独发送。 批量发送通过更好地利用服务器和网络资源来提高集群性能。
-
monitor_split_batch_on_failure
- same as distributed_directory_monitor_split_batch_on_failure,启用/禁用在失败时拆分批次。
有时由于超出内存限制或类似错误导致某些复杂的管道(即带有 GROUP BY 的 MATERIALIZED VIEW)之后,将特定批次发送到远程分片可能会失败。 在这种情况下,重试将无济于事(这将卡住表的分布式发送),但从该批次中一一发送文件可能会成功插入。
-
monitor_sleep_time_ms
- same as distributed_directory_monitor_sleep_time_ms,分布式表引擎发送数据的基本间隔。 如果出现错误,实际间隔会呈指数增长。 -
monitor_max_sleep_time_ms
- same as distributed_directory_monitor_max_sleep_time_ms,分布式表引擎发送数据的最大间隔。
distributed不仅会从远程服务器上读取数据,而且会在允许的范围内在远程服务器上对数据进行部分处理。
比如,带group by的查询语句,将会在远程服务器上进行初步聚合,然后将聚合的中间结果发送过来进行下一步聚合。
Clusters
Clusters会定义在数据库的config.xml中。(在实际生产中,clusters的内容一般会拿出来单独在metrika.xml中设定,然后使用<include_from>/etc/clickhouse/metrika.xml</include_from>将配置包含在config.xml中)
<remote_servers>
<logs>
<!-- Inter-server per-cluster secret for Distributed queries
default: no secret (no authentication will be performed)
If set, then Distributed queries will be validated on shards, so at least:
- such cluster should exist on the shard,
- such cluster should have the same secret.
And also (and which is more important), the initial_user will
be used as current user for the query.
-->
<!-- <secret></secret> -->
<shard>
<!-- Optional. Shard weight when writing data. Default: 1. -->
<weight>1</weight>
<!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
<internal_replication>false</internal_replication>
<replica>
<!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
<priority>1</priority>
<host>example01-01-1</host>
<port>9000</port>
</replica>
<replica>
<host>example01-01-2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<weight>2</weight>
<internal_replication>false</internal_replication>
<replica>
<host>example01-02-1</host>
<port>9000</port>
</replica>
<replica>
<host>example01-02-2</host>
<secure>1</secure>
<port>9440</port>
</replica>
</shard>
</logs>
</remote_servers>
这里定义了一个名为 logs 的集群,该集群由两个分片组成,每个分片包含两个副本。
分片:是指包含不同部分数据的服务器(为了读取所有数据,您必须访问所有分片)。
副本:是复制服务器(为了读取所有数据,您可以访问任何一个副本上的数据)。
Cluster的名称不能包含'.' 。
每个服务器都需要指定参数:
- 主机(host):必选。
- 远程服务器的地址。您可以使用域或 IPv4 或 IPv6 地址。如果指定域,则服务器在启动时会发出 DNS 请求,只要服务器正在运行,就会存储结果。如果 DNS 请求失败,则服务器不会启动。如果更改 DNS 记录,请重新启动服务器。
- 端口(port):必选。
- 发送信息的TCP端口(配置中的 tcp_port,通常设置为 9000)。 不要与 http_port 混淆。
- 用户(user):可选。
- 用于连接到远程服务器的用户名。 默认值是配置中的default用户。此用户必须有权连接到指定的服务器。 访问权限在 users.xml 文件中配置。 有关详细信息,请参阅访问权限部分。
- 密码(password):可选。
- 连接到远程服务器的密码(未屏蔽)。 默认值:空字符串。
- 安全(secure):可选。
- 是否使用安全的 SSL/TLS 连接。 通常还需要指定端口(默认安全端口为 9440)。 服务器应该监听 <tcp_port_secure>9440</tcp_port_secure> 并配置正确的证书。
- 压缩方式(compression):可选。
- 使用数据压缩。,默认值true
如果设置了副本,那么读取数据的时候,会为每个分片选择一个可用副本。(参考 load_balancing 设置可以设置数据负载均衡算法。)
如果选择的副本未建立与当前服务器的连接,在一小段时间内将尝试连接。如果连接不上,那么就会选择下一个副本,以此类推。如果所有副本的连接尝试都失败,则尝试以相同的方式重复几次。
这有利于弹性,但不提供完整的容错能力: 远程服务器可能接受连接后无法正常工作,或者工作不佳。
ck支持仅指定一个分片(在这种情况下,查询处理应称为远程处理,而不是分布式处理)或指定任意数量的分片。在每个分片中,可以指定任意数量的副本。 也可以为每个分片指定不同数量的副本。
配置中可以定义任意数量的集群(clusters)。
可以通过sql查询 system.clusters表来查看当前的集群信息。
Distributed引擎可以像操作本地服务器一样操作分布式集群,但是需要在配置中配置集群信息。
通常情况下,集群中的所有服务器都应该有相同的集群配置。
配置文件中的集群会即时更新,无需重新启动服务器。
如果每次发送的查询选择的分片是随机的,那么就不需要创建分布式表了,可以选择远程表函数的功能,参考 Table functions.
向Distributed表中写入数据
直接写入分片
可以定义将哪些数据写入哪些服务器并直接在每个分片上执行写入。 换句话说,对分布式表指向的集群中的远程表执行直接 INSERT 语句。
这是最灵活的解决方案,因为您可以使用任何分片方案,即使是由于主题领域的要求而并非微不足道的分片方案。
这也是最优化的解决方案,因为数据可以完全独立地写入不同的分片。
写入distributed表
对分布式表执行 INSERT 语句。
在这种情况下,distributed表将在服务器本身之间分配插入的数据。
为了写入分布式表,它必须配置 sharding_key 参数(除非只有一个分片)。
权重的配置与数据分发
每个分片都可以在配置文件中定义一个 <weight>。 默认情况下,权重为 1。
数据以与分片权重成比例的数量分布在分片中。
比例就是,将所有分片权重相加,然后除以所有分片的总和,以确定每个分片的比例。 例如,如果有两个分片,第一个的权重为 1,而第二个的权重为 2,第一个将被发送三分之一 (1 / 3) 的插入行,第二个将被发送三分之二 (2 / 3)。
internal_replication 参数
每个分片都可以在配置文件中定义 internal_replication 参数。
如果此参数设置为 true,则写入操作会选择第一个健康的副本并向其写入数据。如果distributed表下的表引擎是replicated系列的,那么就开启这个值,一个表副本将接收写入,并将自动复制到其他副本。
如果 internal_replication 设置为 false(默认值),则数据将写入所有副本。 在这种情况下,分布式表自己复制数据,由于不会检查一致性,一段时间后必然会出现数据不一致的情况。
如何选择数据插入的分片
要选择一行数据插入哪个分片,需要计算该行对应的分片表达式。
分片表达式可以是返回整数的常量或者表的列中的任何表达式。
例如,您可以使用表达式 rand() 来进行数据的随机分布,或者使用 UserID 来通过除用户 ID 得到的余数进行分布(那么单个用户的数据将驻留在单个分片上,这样可以简化运行 IN 和 JOIN由用户)。如果其中一列分布不够均匀,您可以将其包装在哈希函数中,例如 intHash64(UserID)。
在计算完分片表达式后,将其除以分片的总权重得到余数。根据余数的范围就可以选择插入到哪个分片中。
比如:
计算的分片表达式值为n。如果有两个分片,第一个的权重为 9,而第二个的权重为 10,则余数就是n%19。
该行发送到第一个分片的范围余数范围为 [0, 9) ,发送到第二个分片的范围为 [9, 19) 。
做除法取余的这种方式解决的场景有限,在其他场景的情况下是不合适的。这种方式适合中大型的场景(数十台服务器),但是对于特大型场景(数百台或更多服务器)就不试用了。这种情况应该对区域分区,不应该使用distributed表。
在以下情况下应该关注一下选择分片的方案:
- join或者in的查询,这时候如果join或者in的两张表能够有相同的分发策略,那么join或者in查询就能够在本地实现,不用再使用global join/in,这样会更有效率。
- 使用大量服务器(数百台),并且包含大量的小查询。例如查询单个客户(例如网站、广告商或合作伙伴)的数据。
- 这种情况下将这个用户的所有数据放置在一个分片上会更有效率。
- 或者将一些分片分在一起,称之为一个“layer”,数据在每个layer随机分布,然后对这些“layer”做分布式表,每个客户的数据放置在一个layer中。为全局查询创建一个共享分布式表。
数据的写入方式
数据是异步写入的。
当插入数据的时候,数据块只会写入本地文件系统。然后在后台,数据会尽快发到各个shard。
发送数据的周期由下面两个配置项定义:
distributed_directory_monitor_sleep_time_ms
distributed_directory_monitor_max_sleep_time_ms
两个参数的含义上面写了。
Distributed 引擎会分别发送插入的文件数据,设置 distributed_directory_monitor_batch_inserts 可以批量发送文件。此设置通过更好地利用本地服务器和网络资源来提高集群性能。
可以通过查看表目录/var/lib/clickhouse/data/database/table/ 来检查数据是否发送成功。
执行后台任务的线程数可以通过 background_distributed_schedule_pool_size 来设置。
如果在 INSERT 到分布式表之后,服务器因为某些原因停止服务或者重启了,那么插入的数据可能会丢失。如果在表目录中检测到损坏的数据部分,则将其转移到损坏的子目录中,不再使用。
从Distributed表中读取数据
查询分布式表时,SELECT 查询会被发送到所有分片。
添加新分片时,不必传输旧数据。可以通过使用更大的权重向其写入新数据——数据将稍微不均匀地分布,但查询将正确有效地工作。
启用 max_parallel_replicas 后,查询处理将在单个分片内的所有副本之间进行并行处理。
Virtual Columns
_shard_num:包含表 system.clusters 中的 shard_num 值。 类型:UInt32。
由于远程和集群表函数在内部创建临时分布式表,所以 _shard_num 在那里也可用。
最后
以上就是悲凉乐曲为你收集整理的ClickHouse distributed表引擎建表语句 Distributed相关的参数Clusters向Distributed表中写入数据从Distributed表中读取数据Virtual Columns的全部内容,希望文章能够帮你解决ClickHouse distributed表引擎建表语句 Distributed相关的参数Clusters向Distributed表中写入数据从Distributed表中读取数据Virtual Columns所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复