概述
目录
1、引入maven依赖
2、CkConfig.java
3、CkClient.java
4、Demo.java
5、CkConnectTest.java
1、引入maven依赖
<!-- OkHttp3 dependency: the HTTP client library used by CkClient to call the ClickHouse endpoint -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.8.1</version>
</dependency>
2、CkConfig.java
import lombok.Data;
import org.springframework.stereotype.Component;
/**
* Connection settings used when querying ClickHouse ("ck") over HTTP.
*
* <p>Registered as a Spring component; accessors are generated by Lombok's {@code @Data}.
*
* Created By ZhuKeqian on 2020/8/11
*/
@Component
@Data
public class CkConfig {
// The values are hard-coded here for the demo.
// NOTE(review): this includes credentials; presumably a real deployment would load them
// from external configuration instead -- confirm before reusing this class as-is.
private String url = "http://******";
private String cluster = "your_cluster_name";
private String database = "default";
private String user = "user_name";
private String password = "ck_password";
}
3、CkClient.java
import com.alibaba.fastjson.JSONObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Created By ZhuKeqian on 2020/8/11
*/
@Slf4j
@Component
public class CkClient {
@Autowired
CkConfig ckConfig;
private static OkHttpClient client = null;
public static OkHttpClient getClient() {
if (client == null) {
client = new OkHttpClient.Builder()
.readTimeout(60, TimeUnit.SECONDS)
.connectTimeout(1, TimeUnit.SECONDS).build();
}
return client;
}
private static final String QUERY_STRING = "{"sql":" %s FORMAT JSONEachRow", "database" : "%s", "user" : "%s", "password" : "%s", "cluster" : "%s" }";
// 采用默认ckConfig获取ck数据,并且反序列化为指定的dataClass
public <T> List<T> getData(String sql, Class<T> dataClass) {
long t = System.currentTimeMillis();
List<T> list = new ArrayList<>();
String respStr = getResultString(sql);
String[] lines = respStr.split("n");
for (String line : lines) {
if (StringUtils.isBlank(line)) {
continue;
}
JSONObject obj = JSONObject.parseObject(line);
if (obj.containsKey("error")) {
log.error("query data from ck fail. sql:" + sql + " result:" + line);
throw new BigoException(
ResultCodes.CommResultCode.SYS_ERROR.getRealResultCode("query data from ck fail"));
}
list.add(JSONObject.parseObject(line, dataClass));
}
log.info("ck time spend ={}", System.currentTimeMillis() - t);
return list;
}
public <T> List<T> getData(String sql, Class<T> dataClass, CkConfig ckConfig) {
long t = System.currentTimeMillis();
List<T> list = new ArrayList<>();
String respStr = getResultString(sql, ckConfig);
String[] lines = respStr.split("n");
for (String line : lines) {
if (StringUtils.isBlank(line)) {
continue;
}
JSONObject obj = JSONObject.parseObject(line);
if (obj.containsKey("error")) {
log.error("query data from ck fail. sql:" + sql + " result:" + line);
throw new BigoException(
ResultCodes.CommResultCode.SYS_ERROR.getRealResultCode("query data from ck fail"));
}
list.add(JSONObject.parseObject(line, dataClass));
}
log.info("ck time spend ={}", System.currentTimeMillis() - t);
return list;
}
// 可自配置ckConfig,获取对应ckConfig访问ck返回的result
public String getResultString(String sql, CkConfig ckConfig) {
MediaType mediaType = MediaType.parse("application/json");
RequestBody body = RequestBody.create(mediaType, String.format(QUERY_STRING, sql, ckConfig.getDatabase(),
ckConfig.getUser(), ckConfig.getPassword(), ckConfig.getCluster()));
Request request = new Request.Builder()
.url(ckConfig.getUrl())
.post(body)
.addHeader("content-type", "application/json")
.addHeader("cache-control", "no-cache")
.build();
try {
Response response = getClient().newCall(request).execute();
String respStr = response.body().string();
return respStr;
} catch (IOException ex) {
log.error("query data from ck fail. sql:" + sql, ex);
throw new BigoException(
ResultCodes.CommResultCode.SYS_ERROR.getRealResultCode("query data from ck fail"));
}
}
// 采用默认的ckConfig获取ck的result
public String getResultString(String sql) {
MediaType mediaType = MediaType.parse("application/json");
RequestBody body = RequestBody.create(mediaType, String.format(QUERY_STRING, sql, ckConfig.getDatabase(),
ckConfig.getUser(), ckConfig.getPassword(), ckConfig.getCluster()));
Request request = new Request.Builder()
.url(ckConfig.getUrl())
.post(body)
.addHeader("content-type", "application/json")
.addHeader("cache-control", "no-cache")
.build();
try {
Response response = getClient().newCall(request).execute();
String respStr = response.body().string();
return respStr;
} catch (IOException ex) {
log.error("query data from ck fail. sql:" + sql, ex);
throw new BigoException(
ResultCodes.CommResultCode.SYS_ERROR.getRealResultCode("query data from ck fail"));
}
}
}
4、Demo.java
下面是一个使用示例:将当前时间对齐到整点并向前推 24 小时,然后查询从该整点开始 1 小时内的 ck 数据。
/**
 * Usage example for {@link CkClient}: queries one full hour of aggregated data, taken from the
 * hour that started {@value #HOURS_BACK} hours before the current hour.
 */
@Service
public class ConnectClickHouse {

    private static final Logger LOG = LoggerFactory.getLogger(ConnectClickHouse.class);

    /** How many hours to step back from the start of the current hour. */
    private static final int HOURS_BACK = 24;

    private static final long SECONDS_PER_HOUR = 3600L;

    @Autowired
    CkClient ckClient;

    /**
     * Builds the report query for the one-hour window described on the class and runs it through
     * {@link CkClient#getData(String, Class)}. The rows are only counted and logged here.
     */
    public void getDataFromCk() {
        // Epoch seconds are kept in a long: an int cast overflows in 2038.
        long nowSeconds = System.currentTimeMillis() / 1000L;

        // Round down to the start of the current hour, then move back HOURS_BACK hours.
        long windowStart = nowSeconds - nowSeconds % SECONDS_PER_HOUR - HOURS_BACK * SECONDS_PER_HOUR;
        // The upper bound is inclusive, so each call covers exactly one hour of data.
        long windowEnd = windowStart + SECONDS_PER_HOUR - 1;

        // The original note says the `day` filter is a restriction imposed by ck.
        // SimpleDateFormat formats in the JVM default time zone; the instance is local to this
        // call, so its lack of thread-safety is not a concern.
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
        String day = formatter.format(new Date(windowStart * 1000L));

        String reportSql = "select slot_name, country, sum(total) as total, sum(succ) as succ "
                + "from table_name "
                + "where day= '" + day + "' and timestamp >= " + windowStart
                + " and timestamp <= " + windowEnd
                + " group by country,slot_name limit 100000";

        List<SimpleExample> rows = ckClient.getData(reportSql, SimpleExample.class);
        LOG.info("fetched {} rows from ck for the hour starting at {}", rows.size(), windowStart);
    }
}
5、CkConnectTest.java
/**
 * Spring Boot tests that drive {@link CkClient} and {@link ConnectClickHouse} inside the
 * {@code CronApplication} context. Neither test makes assertions; each passes as long as the
 * call completes without throwing.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CronApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class SyncReportJobTest {

    @Autowired
    ConnectClickHouse connectClickHouse;

    @Autowired
    CkClient ckClient;

    /** Sends a query over a fixed day and timestamp range and discards the raw response. */
    @Test
    public void testCkClient() {
        String fixedRangeSql = "select * from table_name where day='2020-08-09' and timestamp >= "
                + "1596941000 and timestamp <= 1596945599 limit 100000";
        ckClient.getResultString(fixedRangeSql);
    }

    /** Runs the demo service end to end. */
    @Test
    public void testConnectCkClient() {
        connectClickHouse.getDataFromCk();
    }
}
最后
以上就是清新鸡翅为你收集整理的使用Http方式访问ClickHouse demo的全部内容,希望文章能够帮你解决使用Http方式访问ClickHouse demo所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复