概述
目录
- 背景
- 方案
- 代码
- openTSDBReporter
- openTSDBentity
- httpClient
背景
Flink 任务有着丰富的监控指标,但是 Flink 原生(1.10)支持的 Reporter 只有 JMXReporter、GraphiteReporter、InfluxdbReporter、PrometheusReporter、StatsDReporter、DatadogHttpReporter、Slf4jReporter 这些。而我们公司的监控系统后端使用的是 openTSDB,因此如何把 Flink 任务的 metric 接入到监控系统、最终通过 Grafana 展示,成了我们需要解决的一个问题。
方案
虽然Flink原生不支持openTSDB的Reporter,但是Flink支持我们自定义reporter,只要继承AbstractReporter类并且实现Scheduled接口,就可以实现我们自己的reporter了,将flink任务的metric对接到我们各自的监控系统。
我们在实现自己的reporter的时候,希望多加几个功能,一个是批量发送,因为和openTSDB交互是通过http进行的,所以希望通过批量发送提高发送效率。另外一个是flink metric还挺多的,我们考虑到openTSDB承受能力,希望加入白名单和黑名单机制。
代码
openTSDBReporter
public class openTSDBReporter extends AbstractReporter implements Scheduled {
private static String TSDB_URL;
private static int Batch_SIZE;
private static int Batch_TIMEOUT;
private static String WHITELIST;
private static List <String> tagBlackList = new ArrayList<>();
private ArrayList<String> whitePrefixList = new ArrayList<>();
private static final Logger log = LoggerFactory.getLogger(openTSDBRepoter.class);
private long timestamp = System.currentTimeMillis() / 1000L;
private ArrayList <openTSDBentity> entityBuffer = new ArrayList<>();
private JSONArray cmaEntityBuffer = new JSONArray();
public void open(MetricConfig metricConfig) {
TSDB_URL = metricConfig.getString("url", "disable");
Batch_SIZE = metricConfig.getInteger("batch_size", 20);
Batch_TIMEOUT = metricConfig.getInteger("batch_timeout", 30);
WHITELIST = metricConfig.getString("whitelist", "none")
+",flink.operator.commitsSucceeded,flink.operator.commitsFailed";
whitePrefixList.add("flink.operator."+metricConfig.getString("whitePrefix", "jj."));
whitePrefixList.add("flink.operator.KafkaConsumer.topic");
whitePrefixList.add("flink.tm");
whitePrefixList.add("flink.jm");
whitePrefixList.add("flink.task");
tagBlackList.add("tm_id");
tagBlackList.add("task_name");
}
public void close() {
}
public String filterCharacters(String s) {
return s;
}
public void report() {
for (Map.Entry <Counter, String> metric : counters.entrySet()) {
String metricTagString = metric.getValue();
Object value = metric.getKey().getCount();
generateMetricEntity(metricTagString, value);
}
for (Map.Entry <Gauge <?>, String> metric : gauges.entrySet()) {
String metricTagString = metric.getValue();
Object value = metric.getKey().getValue();
generateMetricEntity(metricTagString, value);
}
for (Map.Entry <Meter, String> metric : meters.entrySet()) {
String metricTagString = metric.getValue();
Object value = metric.getKey().getRate();
generateMetricEntity(metricTagString, value);
}
for (Map.Entry <Histogram, String> metric : histograms.entrySet()) {
HistogramStatistics stats = metric.getKey().getStatistics();
String metricTagString = metric.getValue();
Object valueMean = stats.getMean();
generateMetricEntity(metricTagString, valueMean,"mean");
Object valueMin = stats.getMin();
generateMetricEntity(metricTagString, valueMin,"min");
Object valueMax = stats.getMax();
generateMetricEntity(metricTagString, valueMax,"max");
Object value99 = stats.getQuantile(0.99);
generateMetricEntity(metricTagString, value99,"p99");
}
}
private void send(String metricName, long metricTs, Object metricValue, HashMap <String, String> tagMap) {
long now = System.currentTimeMillis() / 1000L;
boolean match = Pattern.matches("^(\+|-)?\d+($|\.\d+$)",String.valueOf(metricValue));
if(!match){
return;
}
//openTSDB buffer
if(!"disable".equals(TSDB_URL)) {
openTSDBentity opentsdbEntity = new openTSDBentity(metricName, metricTs, metricValue, tagMap);
entityBuffer.add(opentsdbEntity);
if ((entityBuffer.size() >= Batch_SIZE) || (now - timestamp >= Batch_TIMEOUT)) {
try {
httpClient.doPost(TSDB_URL, JSON.toJSONString(entityBuffer));
} catch (Exception e) {
log.error(e.getLocalizedMessage());
System.out.println("send metrics to openTSDB failure:" + e.getMessage());
}
entityBuffer.clear();
}
}
timestamp = now;
}
private void generateMetricEntity(String metricTagString, Object value,String suffix) {
String metricName = metricTagString.substring(metricTagString.indexOf("~flink") + 1);
String realMetricName;
if(suffix==null){
realMetricName = metricName.replaceAll(" |\~", ".");
}else {
realMetricName = metricName.replaceAll(" |\~", ".")+"."+suffix;
}
String[] tagArray = metricTagString
.substring(0, metricTagString.indexOf("~flink"))
.split("~");
if (WHITELIST.startsWith("all")
||stringStartWithArrayList(whitePrefixList,realMetricName)
||WHITELIST.contains(realMetricName)) {
HashMap <String, String> innerTagMap = new HashMap<>();
for (int i = 0; i < tagArray.length; i += 2) {
String s = tagArray[(i + 1)].replaceAll(">| ", "").replaceAll(":", "--");
if(!tagBlackList.contains(tagArray[i])) {
innerTagMap.put(tagArray[i], s);
}
}
//解决kafka connector metric的问题
if(realMetricName.startsWith("flink.operator.KafkaConsumer")) {
if (realMetricName.endsWith("committedOffsets")
|| realMetricName.endsWith("currentOffsets")) {
String ktpTag = realMetricName.substring(realMetricName.indexOf("KafkaConsumer.") + 14, realMetricName.lastIndexOf("."));
innerTagMap.put("topic", ktpTag.split("\.")[1]);
innerTagMap.put("partition", ktpTag.split("\.")[3]);
realMetricName = "flink.operator.KafkaConsumer" + realMetricName.substring(realMetricName.lastIndexOf(".") + 1);
}
}
if (value instanceof JSONArray) {
JSONArray jsonArrayValues = (JSONArray) value;
for (int i = 0; i < jsonArrayValues.size(); i++) {
HashMap <String, String> fullTagMap = new HashMap <>(innerTagMap);
JSONObject singleMetric = jsonArrayValues.getJSONObject(i);
JSONArray tags = singleMetric.getJSONArray("tags");
Object realMetricValue = singleMetric.get("value");
for(int j = 0; j < tags.size(); j++){
JSONObject tag =tags.getJSONObject(j);
fullTagMap.put(tag.getString("tagK"),tag.getString("tagV"));
}
send(realMetricName, timestamp, realMetricValue, fullTagMap);
}
} else {
send(realMetricName, timestamp, value, innerTagMap);
}
}
}
private void generateMetricEntity(String metricTagString, Object value) {
this.generateMetricEntity(metricTagString,value,null);
}
private boolean stringStartWithArrayList(ArrayList<String> list,String s){
for(String a:list){
if(s.startsWith(a)){
return true;
}
}
return false;
}
}
openTSDBentity
import java.util.HashMap;
/**
 * Plain data holder mirroring the JSON document expected by openTSDB's
 * {@code /api/put} endpoint: a metric name, an epoch timestamp, a value,
 * and a map of tag key/value pairs.
 */
public class openTSDBentity {

    private String metric;                 // fully qualified metric name
    private long timestamp;                // epoch seconds of the data point
    private Object value;                  // data point value (expected numeric)
    private HashMap<String, String> tags;  // openTSDB tag key -> tag value

    public openTSDBentity(String metric, long timestamp, Object value,
                          HashMap<String, String> tags) {
        this.metric = metric;
        this.timestamp = timestamp;
        this.value = value;
        this.tags = tags;
    }

    public String getMetric() {
        return metric;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public Object getValue() {
        return value;
    }

    public HashMap<String, String> getTags() {
        return tags;
    }

    public void setMetric(String metric) {
        this.metric = metric;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public void setValue(Object value) {
        this.value = value;
    }

    public void setTags(HashMap<String, String> tags) {
        this.tags = tags;
    }
}
httpClient
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.*;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.http.HttpVersion.HTTP;
public class httpClient {
private static SSLConnectionSocketFactory sslsf = null;
private static PoolingHttpClientConnectionManager cm = null;
private static SSLContextBuilder builder = null;
private static final Logger log = LoggerFactory.getLogger(httpClient.class);
static {
try {
builder = new SSLContextBuilder();
// 全部信任 不做身份鉴定
builder.loadTrustMaterial(null, (TrustStrategy) (x509Certificates, s) -> true);
sslsf = new SSLConnectionSocketFactory(builder.build(), new String[]{"SSLv2Hello", "SSLv3", "TLSv1", "TLSv1.2"}, null, NoopHostnameVerifier.INSTANCE);
Registry <ConnectionSocketFactory> registry = RegistryBuilder. <ConnectionSocketFactory>create()
.register(HTTP, new PlainConnectionSocketFactory())
.register("https", sslsf)
.build();
cm = new PoolingHttpClientConnectionManager(registry);
cm.setMaxTotal(200);//max connection
} catch (Exception e) {
e.printStackTrace();
}
}
public static CloseableHttpClient getHttpClient() {
CloseableHttpClient httpClient = HttpClients.custom()
.setSSLSocketFactory(sslsf)
.setConnectionManager(cm)
.setConnectionManagerShared(true)
.build();
return httpClient;
}
public static String doPost(String url, String json) throws Exception {
CloseableHttpClient httpclient = HttpClientBuilder.create().build();
HttpPost httppost = new HttpPost(url);
String result = getStatus(httpclient, httppost, new StringEntity(json, "UTF-8"));
httpclient.close();
if(result.contains("err_code")){
log.debug(url+"=="+json);
// System.out.println(url+"=="+json);
}
return result;
}
private static String getStatus(CloseableHttpClient httpclient, HttpEntityEnclosingRequestBase req, HttpEntity requestEntity) throws Exception {
req.setEntity(requestEntity);
req.setHeader("Content-Type","application/json");
HttpResponse response = httpclient.execute(req);
int code = response.getStatusLine().getStatusCode();
if (String.valueOf(code).startsWith("2")) {
HttpEntity entity = response.getEntity();
String result = "";
if (entity != null)
result = EntityUtils.toString(entity, "UTF-8");
return result;
} else {
return "{"err_code":"" + code+ ""}";
}
}
}
最后
以上就是敏感树叶为你收集整理的flink metric对接openTSDB背景方案代码的全部内容,希望文章能够帮你解决flink metric对接openTSDB背景方案代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复