概述
1. 背景
背景是想通过最新的 jbdc 来使waterdrop 可以导入bitmap 类型的数据
2. 实施方法
a. 配置文件 adm_dmp_clickhouse_jdbc.conf
spark {
# Waterdrop defined streaming batch duration in seconds
spark.streaming.batchDuration = 5
spark.sql.catalogImplementation = "hive"
spark.app.name = "dmp#tranadm.adm_user_id_dist_pre"
spark.ui.port = 13000
spark.dynamicAllocation.enabled=false
spark.executor.instances = 9
spark.executor.cores = 4
spark.executor.memory = "28g"
spark.default.parallelism=72
spark.sql.shuffle.partitions=72
spark.dynamicAllocation.enabled=false
}
input {
hive {
pre_sql = "select 'code' as code , label_value as value , from_unixtime(unix_timestamp(dt,'yyyyMMdd'),'yyyy-MM-dd') as dt , cast('123456' AS LONG) , collect_list(cast(id as int)) as id_bitmap from tranadm.adm_audc_user_base_label_string_inner_pro where dt='20221228' and label_name='country' and label_value is not null group by label_value,dt"
result_table_name = "adm_dmp_user_id_dist"
}
}
filter {
}
output {
clickhousebitmap {
save_mode = "overwrite"
host = ""${clickhouse_urls}""
clickhouse.socket_timeout = 100000
database = ""${desc_ck_db}""
table = ""${desc_ck_table}""
fields = ["code","value","dt","version","id_bitmap"]
username = ""
password = ""
bulk_size = 5000000
}
}
b. 代码具体逻辑
collect_list 这个里面存放的是 spark 中arraylist [string] 类型的数据
/**
* 将数组转换成bitmap
*/
private def getBitMapById(list_id: mutable.WrappedArray[Int]): RoaringBitmap = {
if (list_id.length == 0) {
new RoaringBitmap()
} else {
var startBitMap = RoaringBitmap.bitmapOf(list_id(0))
if (list_id.length > 1) {
for (i <- 1 to list_id.length - 1) {
startBitMap.add(list_id(i))
}
}
startBitMap
}
}
private def renderBaseTypeStatement(
index: Int,
fieldIndex: Int,
fieldType: String,
item: Row,
statement: PreparedStatement): Unit = {
fieldType match {
case "DateTime" | "Date" | "String" =>
statement.setString(index + 1, item.getAs[String](fieldIndex))
case "Int8" | "UInt8" | "Int16" | "UInt16" | "Int32" =>
statement.setInt(index + 1, item.getAs[Int](fieldIndex))
case "UInt32" | "UInt64" | "Int64" =>
statement.setLong(index + 1, item.getAs[Long](fieldIndex))
case "Float32" => statement.setFloat(index + 1, item.getAs[Float](fieldIndex))
case "Float64" => statement.setDouble(index + 1, item.getAs[Double](fieldIndex))
case "Decimal" => statement.setBigDecimal(index + 1, item.getAs[BigDecimal](fieldIndex))
case "AggregateFunction(groupBitmap, UInt32)" =>
{ val value = item.getAs[mutable.WrappedArray[Int]](fieldIndex)
val bitmap = getBitMapById(value)
statement.setObject(index + 1, ClickHouseBitmap.wrap(bitmap, ClickHouseDataType.UInt32))}
case _ => statement.setString(index + 1, item.getAs[String](fieldIndex))
}
}
3. 遇到的问题
在新版本的jbdc 版本里面用到的是 0.9.10版本的 roaringbitmap
RoaringBitmap-0.9.10.jar
公司spark 集群是 2.4.6 其中 Roarbitmap 是低版本的;缺少高版本用到的方法。
在自己写jdbc spark 写ck 的时候用的是 spark 程序 config 的
--deploy-mode cluster
--conf spark.executor.userClassPathFirst=true
--conf spark.driver.userClassPathFirst=true
--jar /home/xizhi.wu/waterdrop_1_5_1/waterdrop-1.5.1/lib/RoaringBitmap-0.9.10.jar
这样的方式来使spark 程序优先用 高版本的roaringbitmap。
但在sbt 项目中始终没法通过。
最后
以上就是甜甜电脑为你收集整理的waterdrop(1.5.1版本)增加bitmap类型导数的遇到的问题的全部内容,希望文章能够帮你解决waterdrop(1.5.1版本)增加bitmap类型导数的遇到的问题所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复