Overview
Table of Contents
I. Create the Kafka topic and producer
II. Produce data to Kafka
III. Configure the Apache Druid DataSource
1) Start
2) Connect
3) Parse Data
4) Parse Time
5) Transform [optional]
6) Filter [optional]
7) Configure Schema [key configuration]
8) Partition
9) Tune
10) Publish
11) Edit JSON spec
Case 1: Sales data query examples
Case 2: Ingest using the server system time in milliseconds as the timestamp
Configure the Apache Druid DataSource
Case 3: Ingest using the server system time in seconds as the timestamp
Configure the Apache Druid DataSource
Case 4: Ingest using the server system time formatted as 20210101082805 as the timestamp
I. Create the Kafka topic and producer
1. Create the topic
kafka-topics.sh --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 1 --partitions 1 --topic fast_sales
2. Create the producer
kafka-console-producer.sh --broker-list node-01:9092,node-02:9092,node-03:9092 --topic fast_sales
3. Create the consumer
kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic fast_sales --group topic_test1_g1
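To confirm the topic was created with the expected partitions and replication factor, it can be described against the same ZooKeeper quorum; a quick sanity check, assuming the cluster layout above:
# verify partition count and replication factor of the new topic
kafka-topics.sh --describe --zookeeper node-01:2181,node-02:2181,node-03:2181 --topic fast_sales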
II. Produce data to Kafka
Enter the following sample records into the console producer started above:
{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"2020-08-08T01:03.00z","category":"家电","areaName":"北京","monye":"1550"}
{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"2020-08-08T01:03.01z","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"2020-08-08T01:04.01z","category":"手机","areaName":"深圳","monye":"2200"}
III. Configure the Apache Druid DataSource
1) Start
2) Connect
3) Parse Data
4) Parse Time
5) Transform [optional]
6) Filter [optional]
7) Configure Schema [key configuration]
8) Partition
9) Tune
10) Publish
Max parse exceptions: 2147483647 (a very large cap, set in the Tune step, so malformed rows are logged instead of failing the ingestion task)
11) Edit JSON spec
{
"type": "kafka",
"dataSchema": {
"dataSource": "fast_sales",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"areaName",
"category"
]
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "longSum",
"name": "sum_monye",
"fieldName": "monye",
"expression": null
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "MINUTE",
"rollup": true,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/usr/local/imply-3.0.4/var/tmp/1609509057384-0",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "concise"
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs"
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": true,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": true,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false
},
"ioConfig": {
"topic": "fast_sales",
"replicas": 1,
"taskCount": 1,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "node-01:9092,node-02:9092,node-03:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"stream": "fast_sales",
"useEarliestSequenceNumber": false,
"type": "kafka"
},
"context": null,
"suspended": false
}
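The wizard submits this spec for you, but the same supervisor definition can also be posted directly to the Overlord API. A sketch, assuming the Overlord runs on node-01 at its default port 8090 and the spec above is saved as kafka-supervisor-spec.json (a hypothetical filename):
# register the Kafka ingestion supervisor with the Overlord
curl -X POST -H 'Content-Type: application/json' -d @kafka-supervisor-spec.json http://node-01:8090/druid/indexer/v1/supervisor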
Case 1: Sales data query examples
1) Data source
2) Recall the data produced to Kafka in Part II:
{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"2020-08-08T01:03.00z","category":"家电","areaName":"北京","monye":"1550"}
{"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"2020-08-08T01:03.01z","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"2020-08-08T01:04.01z","category":"手机","areaName":"深圳","monye":"2200"}
SELECT * FROM "fast_sales" -- Query all rows
SELECT * FROM "fast_sales" WHERE __time <= '2020-08-08T01:04:00.000Z' -- Query by time range
SELECT SUM("count") FROM "fast_sales" -- Total number of ingested input rows
SELECT areaName, category, SUM(sum_monye) FROM "fast_sales" GROUP BY areaName, category -- Total sales by region and product category
SELECT areaName, SUM(sum_monye) FROM "fast_sales" GROUP BY areaName -- Total sales grouped by region
SELECT category, SUM(sum_monye) FROM "fast_sales" GROUP BY category -- Total sales grouped by product category
SELECT areaName, category, SUM(sum_monye) FROM "fast_sales"
WHERE __time <= '2020-08-08T01:04:00.000Z' GROUP BY areaName, category -- Filter by time range first, then group by region and category
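These statements can be run in the Druid console's SQL tab, or sent to the Broker's SQL endpoint over HTTP. A sketch, assuming the Broker runs on node-01 at its default port 8082:
# run one of the aggregation queries through the Druid SQL API
curl -X POST -H 'Content-Type: application/json' http://node-01:8082/druid/v2/sql -d '{"query":"SELECT areaName, SUM(sum_monye) FROM \"fast_sales\" GROUP BY areaName"}'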
Case 2: Ingest using the server system time in milliseconds as the timestamp
1. Create the topic
kafka-topics.sh --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 1 --partitions 1 --topic fast_sales_test_timestamp
2. Create the producer
kafka-console-producer.sh --broker-list node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_timestamp
3. Create the consumer
kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_timestamp --group topic_test1_g1
4. Produce data to Kafka
-- Add an 8-hour offset (28,800,000 ms) to the server's epoch timestamp. Druid stores and displays __time in UTC, so shifting by UTC+8 makes the displayed __time match Beijing wall-clock time.
1609549913324 corresponds to 2021-01-02 09:11:53; + 28800000 = 1609578713324, which corresponds to 2021-01-02 17:11:53
1609550385297 corresponds to 2021-01-02 09:19:45; + 28800000 = 1609579185297, which corresponds to 2021-01-02 17:19:45
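A shifted epoch-milliseconds value can be generated directly in the shell. A sketch assuming GNU date (%3N, the millisecond specifier, is a GNU extension):
# current epoch milliseconds plus the 8-hour offset (28,800,000 ms)
echo $(( $(date +%s%3N) + 28800000 ))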
{"timestamp":"1609549913324","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609549913324","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609549913324","category":"家电","areaName":"北京","monye":"1550"}
{"timestamp":"1609550385297","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"1609550385297","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"1609550385297","category":"手机","areaName":"深圳","monye":"2200"}
============================== Data after applying the 8-hour offset ==============================
{"timestamp":"1609578713324","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609578713324","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609578713324","category":"家电","areaName":"北京","monye":"1550"}
{"timestamp":"1609579185297","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"1609579185297","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"1609579185297","category":"手机","areaName":"深圳","monye":"2200"}
Configure the Apache Druid DataSource
{
"type": "kafka",
"dataSchema": {
"dataSource": "fast_sales_test_timestamp",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
"areaName",
"category"
]
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "longSum",
"name": "sum_monye",
"fieldName": "monye",
"expression": null
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "MINUTE",
"rollup": true,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/usr/local/imply-3.0.4/var/tmp/1609509057384-0",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "concise"
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs"
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": true,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": true,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false
},
"ioConfig": {
"topic": "fast_sales_test_timestamp",
"replicas": 1,
"taskCount": 1,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "node-01:9092,node-02:9092,node-03:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"stream": "fast_sales_test_timestamp",
"useEarliestSequenceNumber": false,
"type": "kafka"
},
"context": null,
"suspended": false
}
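Once the spec is submitted, the supervisor's health can be checked against the Overlord. A sketch, with the host and port assumed as before:
# fetch the supervisor's current state and recent task reports
curl http://node-01:8090/druid/indexer/v1/supervisor/fast_sales_test_timestamp/status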
Example queries:
SELECT * FROM "fast_sales_test_timestamp" -- Query all rows
SELECT * FROM "fast_sales_test_timestamp" WHERE __time <= '2021-01-02T09:11:00.000Z' -- Query by time range
SELECT * FROM "fast_sales_test_timestamp" WHERE __time > '2021-01-02T09:11:00.000Z' -- Query by time range
SELECT SUM("count") FROM "fast_sales_test_timestamp" -- Total number of ingested input rows
SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_timestamp" GROUP BY areaName, category -- Total sales by region and product category
SELECT areaName, SUM(sum_monye) FROM "fast_sales_test_timestamp" GROUP BY areaName -- Total sales grouped by region
SELECT category, SUM(sum_monye) FROM "fast_sales_test_timestamp" GROUP BY category -- Total sales grouped by product category
SELECT SUM(sum_monye) FROM "fast_sales_test_timestamp" -- Total sales overall
SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_timestamp"
WHERE __time <= '2021-01-02T09:11:00.000Z' GROUP BY areaName, category -- Filter by time range first, then group by region and category
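Note that with rollup enabled and a MINUTE queryGranularity, input rows that share the same minute, areaName, and category are merged at ingestion time. COUNT(*) therefore returns the number of stored (rolled-up) rows, while SUM("count") recovers the original input row count:
SELECT COUNT(*) AS stored_rows, SUM("count") AS input_rows FROM "fast_sales_test_timestamp"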
Case 3: Ingest using the server system time in seconds as the timestamp
1. Create the topic
kafka-topics.sh --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 1 --partitions 1 --topic fast_sales_test_second
2. Create the producer
kafka-console-producer.sh --broker-list node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_second
3. Create the consumer
kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_second --group topic_test1_g1
4. Produce data to Kafka
-- Add an 8-hour offset in seconds (28,800 s) to the server's epoch timestamp, for the same UTC-display reason as in Case 2.
1609555315 corresponds to 2021-01-02 10:41:55; + 28800 = 1609584115, which corresponds to 2021-01-02 18:41:55
1609555530 corresponds to 2021-01-02 10:45:30; + 28800 = 1609584330, which corresponds to 2021-01-02 18:45:30
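As in Case 2, a shifted epoch-seconds value can be produced in the shell. A sketch using the standard date utility:
# current epoch seconds plus the 8-hour offset (28,800 s)
echo $(( $(date +%s) + 28800 ))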
{"timestamp":"1609555315","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609555315","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609555315","category":"家电","areaName":"北京","monye":"1550"}
{"timestamp":"1609555530","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"1609555530","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"1609555530","category":"手机","areaName":"深圳","monye":"2200"}
============================== Data after applying the 8-hour (28,800 s) offset ==============================
{"timestamp":"1609584115","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609584115","category":"手机","areaName":"北京","monye":"1450"}
{"timestamp":"1609584115","category":"家电","areaName":"北京","monye":"1550"}
{"timestamp":"1609584330","category":"手机","areaName":"深圳","monye":"1000"}
{"timestamp":"1609584330","category":"手机","areaName":"深圳","monye":"2000"}
{"timestamp":"1609584330","category":"手机","areaName":"深圳","monye":"2200"}
Configure the Apache Druid DataSource
{
"type": "kafka",
"dataSchema": {
"dataSource": "fast_sales_test_second",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "posix"
},
"dimensionsSpec": {
"dimensions": [
"areaName",
"category"
]
}
}
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "sum_monye",
"type": "longSum",
"fieldName": "monye"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "MINUTE",
"rollup": true,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/usr/local/imply-3.0.4/var/tmp/1609509057384-0",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "concise"
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs"
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": true,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": true,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false
},
"ioConfig": {
"topic": "fast_sales_test_second",
"replicas": 1,
"taskCount": 1,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "node-01:9092,node-02:9092,node-03:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"stream": "fast_sales_test_timestamp",
"useEarliestSequenceNumber": false,
"type": "kafka"
},
"context": null,
"suspended": false
}
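If ingestion ever gets stuck on offsets that no longer exist in Kafka, the supervisor can be reset through the Overlord API; this is the manual counterpart of the resetOffsetAutomatically setting in the tuningConfig above. A sketch (note this discards the stored offsets, so use it with care):
# reset the supervisor's stored Kafka offsets
curl -X POST http://node-01:8090/druid/indexer/v1/supervisor/fast_sales_test_second/reset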
Example queries:
SELECT * FROM "fast_sales_test_second" -- Query all rows
SELECT * FROM "fast_sales_test_second" WHERE __time <= '2021-01-02T10:41:00.000Z' -- Query by time range
SELECT * FROM "fast_sales_test_second" WHERE __time > '2021-01-02T10:41:00.000Z' -- Query by time range
SELECT SUM("count") FROM "fast_sales_test_second" -- Total number of ingested input rows
SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_second" GROUP BY areaName, category -- Total sales by region and product category
SELECT areaName, SUM(sum_monye) FROM "fast_sales_test_second" GROUP BY areaName -- Total sales grouped by region
SELECT category, SUM(sum_monye) FROM "fast_sales_test_second" GROUP BY category -- Total sales grouped by product category
SELECT SUM(sum_monye) FROM "fast_sales_test_second" -- Total sales overall
SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_second"
WHERE __time <= '2021-01-02T10:41:00.000Z' GROUP BY areaName, category -- Filter by time range first, then group by region and category
Case 4: Ingest using the server system time formatted as 20210101082805 as the timestamp
This approach does not work here; pass the time as an epoch timestamp instead, which also makes time corrections straightforward.