I'm 敏感树叶, a blogger at 靠谱客. This article covers the background, approach, and code for connecting Flink metrics to openTSDB; I'm sharing it here as a reference.

Contents

  • Background
  • Approach
  • Code
    • openTSDBReporter
    • openTSDBentity
    • httpClient

Background

    Flink jobs expose a rich set of monitoring metrics, but the reporters that ship with Flink (1.10) are only JMXReporter, GraphiteReporter, InfluxdbReporter, PrometheusReporter, StatsDReporter, DatadogHttpReporter, and Slf4jReporter. Our company's monitoring system uses openTSDB as its backend, so getting our Flink jobs' metrics into that system, and ultimately displayed in Grafana, became a problem we had to solve.

Approach

    Although Flink has no built-in openTSDB reporter, it lets us define our own: extend the AbstractReporter class and implement the Scheduled interface, and we have a custom reporter that pushes Flink job metrics to whatever monitoring system we run.
    When implementing our reporter we wanted two extra features. The first is batched sending: the reporter talks to openTSDB over HTTP, so batching requests improves sending efficiency. The second is whitelist and blacklist mechanisms: Flink emits a large number of metrics, and we wanted to limit the load on openTSDB.
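For context, a custom reporter is enabled through flink-conf.yaml. Below is a sketch of that configuration, assuming the reporter class from the code section is packaged under a hypothetical com.example.metrics package; the keys url, batch_size, batch_timeout, whitelist, and whitePrefix match the MetricConfig lookups in the reporter's open() method:

```yaml
# Register the custom reporter (package name is a placeholder)
metrics.reporter.opentsdb.class: com.example.metrics.openTSDBReporter
# How often report() is invoked
metrics.reporter.opentsdb.interval: 30 SECONDS
# Properties read via MetricConfig in open()
metrics.reporter.opentsdb.url: http://opentsdb.example.com:4242/api/put
metrics.reporter.opentsdb.batch_size: 20
metrics.reporter.opentsdb.batch_timeout: 30
metrics.reporter.opentsdb.whitelist: all
metrics.reporter.opentsdb.whitePrefix: myjob.
```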

Code

openTSDBReporter

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class openTSDBReporter extends AbstractReporter implements Scheduled {

    private static String TSDB_URL;
    private static int BATCH_SIZE;
    private static int BATCH_TIMEOUT;
    private static String WHITELIST;
    private static List<String> tagBlackList = new ArrayList<>();
    private ArrayList<String> whitePrefixList = new ArrayList<>();
    private static final Logger log = LoggerFactory.getLogger(openTSDBReporter.class);
    private long timestamp = System.currentTimeMillis() / 1000L;
    private ArrayList<openTSDBentity> entityBuffer = new ArrayList<>();
    private JSONArray cmaEntityBuffer = new JSONArray();

    @Override
    public void open(MetricConfig metricConfig) {
        TSDB_URL = metricConfig.getString("url", "disable");
        BATCH_SIZE = metricConfig.getInteger("batch_size", 20);
        BATCH_TIMEOUT = metricConfig.getInteger("batch_timeout", 30);
        WHITELIST = metricConfig.getString("whitelist", "none")
                + ",flink.operator.commitsSucceeded,flink.operator.commitsFailed";
        whitePrefixList.add("flink.operator." + metricConfig.getString("whitePrefix", "jj."));
        whitePrefixList.add("flink.operator.KafkaConsumer.topic");
        whitePrefixList.add("flink.tm");
        whitePrefixList.add("flink.jm");
        whitePrefixList.add("flink.task");
        tagBlackList.add("tm_id");
        tagBlackList.add("task_name");
    }

    @Override
    public void close() {
    }

    @Override
    public String filterCharacters(String s) {
        return s;
    }

    @Override
    public void report() {
        for (Map.Entry<Counter, String> metric : counters.entrySet()) {
            String metricTagString = metric.getValue();
            Object value = metric.getKey().getCount();
            generateMetricEntity(metricTagString, value);
        }
        for (Map.Entry<Gauge<?>, String> metric : gauges.entrySet()) {
            String metricTagString = metric.getValue();
            Object value = metric.getKey().getValue();
            generateMetricEntity(metricTagString, value);
        }
        for (Map.Entry<Meter, String> metric : meters.entrySet()) {
            String metricTagString = metric.getValue();
            Object value = metric.getKey().getRate();
            generateMetricEntity(metricTagString, value);
        }
        for (Map.Entry<Histogram, String> metric : histograms.entrySet()) {
            HistogramStatistics stats = metric.getKey().getStatistics();
            String metricTagString = metric.getValue();
            generateMetricEntity(metricTagString, stats.getMean(), "mean");
            generateMetricEntity(metricTagString, stats.getMin(), "min");
            generateMetricEntity(metricTagString, stats.getMax(), "max");
            generateMetricEntity(metricTagString, stats.getQuantile(0.99), "p99");
        }
    }

    private void send(String metricName, long metricTs, Object metricValue,
                      HashMap<String, String> tagMap) {
        long now = System.currentTimeMillis() / 1000L;
        // Only send values that look like plain numbers; skip everything else.
        boolean match = Pattern.matches("^(\\+|-)?\\d+($|\\.\\d+$)", String.valueOf(metricValue));
        if (!match) {
            return;
        }
        // Buffer the datapoint and flush when the batch is full or the timeout elapsed.
        if (!"disable".equals(TSDB_URL)) {
            openTSDBentity opentsdbEntity = new openTSDBentity(metricName, metricTs, metricValue, tagMap);
            entityBuffer.add(opentsdbEntity);
            if ((entityBuffer.size() >= BATCH_SIZE) || (now - timestamp >= BATCH_TIMEOUT)) {
                try {
                    httpClient.doPost(TSDB_URL, JSON.toJSONString(entityBuffer));
                } catch (Exception e) {
                    log.error("send metrics to openTSDB failure: {}", e.getMessage());
                }
                entityBuffer.clear();
            }
        }
        timestamp = now;
    }

    private void generateMetricEntity(String metricTagString, Object value, String suffix) {
        String metricName = metricTagString.substring(metricTagString.indexOf("~flink") + 1);
        String realMetricName;
        if (suffix == null) {
            realMetricName = metricName.replaceAll(" |~", ".");
        } else {
            realMetricName = metricName.replaceAll(" |~", ".") + "." + suffix;
        }
        String[] tagArray = metricTagString
                .substring(0, metricTagString.indexOf("~flink"))
                .split("~");
        if (WHITELIST.startsWith("all")
                || stringStartWithArrayList(whitePrefixList, realMetricName)
                || WHITELIST.contains(realMetricName)) {
            HashMap<String, String> innerTagMap = new HashMap<>();
            for (int i = 0; i < tagArray.length; i += 2) {
                String s = tagArray[i + 1].replaceAll(">| ", "").replaceAll(":", "--");
                if (!tagBlackList.contains(tagArray[i])) {
                    innerTagMap.put(tagArray[i], s);
                }
            }
            // Kafka connector metrics embed topic and partition in the metric name;
            // pull them out into tags instead.
            if (realMetricName.startsWith("flink.operator.KafkaConsumer")) {
                if (realMetricName.endsWith("committedOffsets")
                        || realMetricName.endsWith("currentOffsets")) {
                    String ktpTag = realMetricName.substring(
                            realMetricName.indexOf("KafkaConsumer.") + 14,
                            realMetricName.lastIndexOf("."));
                    innerTagMap.put("topic", ktpTag.split("\\.")[1]);
                    innerTagMap.put("partition", ktpTag.split("\\.")[3]);
                    realMetricName = "flink.operator.KafkaConsumer."
                            + realMetricName.substring(realMetricName.lastIndexOf(".") + 1);
                }
            }
            if (value instanceof JSONArray) {
                JSONArray jsonArrayValues = (JSONArray) value;
                for (int i = 0; i < jsonArrayValues.size(); i++) {
                    HashMap<String, String> fullTagMap = new HashMap<>(innerTagMap);
                    JSONObject singleMetric = jsonArrayValues.getJSONObject(i);
                    JSONArray tags = singleMetric.getJSONArray("tags");
                    Object realMetricValue = singleMetric.get("value");
                    for (int j = 0; j < tags.size(); j++) {
                        JSONObject tag = tags.getJSONObject(j);
                        fullTagMap.put(tag.getString("tagK"), tag.getString("tagV"));
                    }
                    send(realMetricName, timestamp, realMetricValue, fullTagMap);
                }
            } else {
                send(realMetricName, timestamp, value, innerTagMap);
            }
        }
    }

    private void generateMetricEntity(String metricTagString, Object value) {
        this.generateMetricEntity(metricTagString, value, null);
    }

    private boolean stringStartWithArrayList(ArrayList<String> list, String s) {
        for (String a : list) {
            if (s.startsWith(a)) {
                return true;
            }
        }
        return false;
    }
}
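As a side note, the check at the top of send() relies on a regular expression to drop metric values that are not plain numbers (Flink gauges can return arbitrary objects). A minimal sketch isolating that check; the class and method names here are made up for illustration:

```java
import java.util.regex.Pattern;

public class ValueFilterDemo {

    // Same pattern as in send(): an optionally signed integer,
    // optionally followed by a decimal part.
    private static final Pattern NUMERIC = Pattern.compile("^(\\+|-)?\\d+($|\\.\\d+$)");

    static boolean isNumeric(Object value) {
        // send() stringifies the metric value before matching
        return NUMERIC.matcher(String.valueOf(value)).matches();
    }

    public static void main(String[] args) {
        System.out.println(isNumeric(42));      // true
        System.out.println(isNumeric(-3.14));   // true
        System.out.println(isNumeric("NaN"));   // false: not a plain number
        System.out.println(isNumeric("1.2.3")); // false: malformed decimal
    }
}
```

Anything that fails the check is silently skipped, which keeps non-numeric gauge values out of the openTSDB payload.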

openTSDBentity

import java.util.HashMap;

public class openTSDBentity {

    private String metric;
    private long timestamp;
    private Object value;
    private HashMap<String, String> tags;

    public openTSDBentity(String metric, long timestamp, Object value,
                          HashMap<String, String> tags) {
        this.metric = metric;
        this.timestamp = timestamp;
        this.value = value;
        this.tags = tags;
    }

    public String getMetric() { return metric; }
    public void setMetric(String metric) { this.metric = metric; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public Object getValue() { return value; }
    public void setValue(Object value) { this.value = value; }
    public HashMap<String, String> getTags() { return tags; }
    public void setTags(HashMap<String, String> tags) { this.tags = tags; }
}
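When the reporter flushes, JSON.toJSONString(entityBuffer) serializes a list of these entities into the datapoint array that openTSDB's /api/put endpoint accepts. The metric name, timestamp, value, and tags below are made-up sample data:

```json
[
  {
    "metric": "flink.operator.KafkaConsumer.currentOffsets",
    "timestamp": 1617181723,
    "value": 102400,
    "tags": {
      "job_name": "my-flink-job",
      "topic": "events",
      "partition": "0"
    }
  }
]
```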

httpClient

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.http.HttpVersion.HTTP;

public class httpClient {

    private static SSLConnectionSocketFactory sslsf = null;
    private static PoolingHttpClientConnectionManager cm = null;
    private static SSLContextBuilder builder = null;
    private static final Logger log = LoggerFactory.getLogger(httpClient.class);

    static {
        try {
            builder = new SSLContextBuilder();
            // Trust every certificate; no identity verification.
            builder.loadTrustMaterial(null, (TrustStrategy) (x509Certificates, s) -> true);
            sslsf = new SSLConnectionSocketFactory(builder.build(),
                    new String[]{"SSLv2Hello", "SSLv3", "TLSv1", "TLSv1.2"},
                    null,
                    NoopHostnameVerifier.INSTANCE);
            Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
                    .register(HTTP, new PlainConnectionSocketFactory())
                    .register("https", sslsf)
                    .build();
            cm = new PoolingHttpClientConnectionManager(registry);
            cm.setMaxTotal(200); // max connections in the pool
        } catch (Exception e) {
            log.error("init http client failed", e);
        }
    }

    public static CloseableHttpClient getHttpClient() {
        return HttpClients.custom()
                .setSSLSocketFactory(sslsf)
                .setConnectionManager(cm)
                .setConnectionManagerShared(true)
                .build();
    }

    public static String doPost(String url, String json) throws Exception {
        // Use the pooled client built above rather than a fresh one per request.
        CloseableHttpClient httpclient = getHttpClient();
        HttpPost httppost = new HttpPost(url);
        String result = getStatus(httpclient, httppost, new StringEntity(json, "UTF-8"));
        httpclient.close();
        if (result.contains("err_code")) {
            log.debug(url + "==" + json);
        }
        return result;
    }

    private static String getStatus(CloseableHttpClient httpclient,
                                    HttpEntityEnclosingRequestBase req,
                                    HttpEntity requestEntity) throws Exception {
        req.setEntity(requestEntity);
        req.setHeader("Content-Type", "application/json");
        HttpResponse response = httpclient.execute(req);
        int code = response.getStatusLine().getStatusCode();
        if (code >= 200 && code < 300) {
            HttpEntity entity = response.getEntity();
            String result = "";
            if (entity != null) {
                result = EntityUtils.toString(entity, "UTF-8");
            }
            return result;
        } else {
            return "{\"err_code\":\"" + code + "\"}";
        }
    }
}

Wrapping up

That's all of 敏感树叶's notes on connecting Flink metrics to openTSDB: background, approach, and code. For more Flink-related content, search 靠谱客's other articles.
