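The company asked for a monitoring system to track the running state of 2,000 Flume servers. It also had to perform operations on those servers, such as restarting Flume, stopping Flume, and updating Flume files. I took quite a few wrong turns along the way, and I hope this write-up saves others from repeating them.
First, the requirements: (1) continuously read and record the running state of the Flume servers and display it in a JSP page; (2) provide the server operations mentioned above.
My first idea was one socket server and one socket client, each with two threads.
The server would continuously receive the status messages sent by the clients, and also listen for command input and operate on the Flume servers accordingly.
The client would work in heartbeat mode: it sends status messages, listens for commands from the server, and executes them.
After a series of tests, this turned out not to work, for the following reasons:
The server uses multiple threads to receive client messages continuously, and restart and stop commands sent to a client execute without trouble. File transfer is where it breaks. Once the file has been written, the output stream has to be closed; otherwise the client never sees the end of the file and stays blocked on the read. But closing the output stream also closes the socket, so the server can no longer receive that client's status messages. Some posts online suggest a half-close with Socket.shutdownOutput(). That appeared to work: the file was transferred and the server kept receiving status messages. The problem shows up when you send the next command: the socket's output stream has been half-closed, so there is no way to send commands to that client any more.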
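The blocking described above comes from using end-of-stream as the only "file finished" signal. One common alternative, which is not the route this post ends up taking, is to prefix each payload with its length so the receiver knows when to stop reading while the connection stays open. The sketch below is a minimal illustration under that assumption; the class and method names (FramedTransfer, writeFrame, readFrame) are hypothetical, and it keeps each frame in memory, which would need to be streamed in chunks for large files.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper: length-prefixed frames over a stream that is never closed between messages.
public class FramedTransfer {

    // Write one frame: an 8-byte length followed by the payload bytes.
    public static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeLong(payload.length);
        out.write(payload);
        out.flush(); // flush only; the stream stays usable for the next frame
    }

    // Read exactly one frame and return as soon as the announced number of bytes has arrived.
    public static byte[] readFrame(DataInputStream in) throws IOException {
        long length = in.readLong();
        if (length < 0 || length > Integer.MAX_VALUE) {
            throw new IOException("Unsupported frame length: " + length);
        }
        byte[] payload = new byte[(int) length];
        in.readFully(payload); // throws EOFException if the stream ends before the frame is complete
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // An in-memory "wire" stands in for the socket streams.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        writeFrame(out, "file-bytes".getBytes("UTF-8")); // e.g. a file body
        writeFrame(out, "restart".getBytes("UTF-8"));    // a later command on the same stream

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println(new String(readFrame(in), "UTF-8"));
        System.out.println(new String(readFrame(in), "UTF-8"));
    }
}
```

With real sockets the same two methods would wrap socket.getOutputStream() and socket.getInputStream(); the point is only that neither side needs to close or half-close anything to mark the end of a file.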
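After thinking it over I could not find a good fix, so I split the program. The server side is split in two, and so is the client side, which gives two server programs and two client programs.
Server machine: 1. Continuously receive messages from the clients. 2. When a command comes in, connect to the client machine, perform the operation, then disconnect.
Client machine: 1. Continuously send the running state of the local Flume. 2. Listen for commands from the server and act on them.
Implementation of server machine part 1: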
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.ServletContext;

// FlumeInfo and DBHelper are project classes that are not shown in this post.
public class NioServer {
    private LinkedBlockingQueue<FlumeInfo> queue = new LinkedBlockingQueue<FlumeInfo>();
    private List<Socket> list = new ArrayList<Socket>();
    private static final int BUFFER_SIZE = 1024; // buffer size
    private ServerSocketChannel serverChannel;
    ExecutorService executor = Executors.newCachedThreadPool();

    public NioServer(ServletContext servletContext) {
        try {
            executor.execute(new Handler(servletContext)); // receives heartbeats
            executor.execute(new Task());                  // writes them to the database
        } catch (IOException e) {
            System.err.println(e.getMessage());
        }
    }

    class Handler implements Runnable {
        private ServletContext servletContext;
        private Selector selector;
        private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); // bytes to chars
        private ByteBuffer buffer;

        public Handler(ServletContext servletContext) throws IOException {
            this.servletContext = servletContext;
        }

        public void run() {
            try {
                // Open a server socket channel
                serverChannel = ServerSocketChannel.open();
                serverChannel.configureBlocking(false);
                // Read the socket port from the context-param node in web.xml
                String port = this.servletContext.getInitParameter("socketPort");
                serverChannel.socket().bind(new InetSocketAddress(Integer.parseInt(port)));
                selector = Selector.open(); // channel manager
                // Register the channel with the selector for OP_ACCEPT events
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("Waiting for client connect……");
                while (true) {
                    selector.select();
                    for (Iterator<SelectionKey> itr = selector.selectedKeys().iterator(); itr.hasNext();) {
                        SelectionKey key = itr.next();
                        listen(key);
                        itr.remove();
                    }
                }
            } catch (IOException e) {
                // Restart the server if the channel or selector has been closed
                if (!serverChannel.isOpen() || !selector.isOpen() || serverChannel.socket().isClosed()) {
                    new NioServer(servletContext);
                }
            }
        }

        // Handle one selected key
        private void listen(SelectionKey key) throws IOException {
            if (key.isAcceptable()) { // a client is requesting a connection
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                SocketChannel channel = server.accept(); // client channel
                Socket s = channel.socket();
                s.setKeepAlive(true);
                s.setSoTimeout(1000);
                list.add(channel.socket());
                channel.configureBlocking(false);
                System.out.println("a new client connected!");
                // Once connected, register the client channel for OP_READ events
                channel.register(selector, SelectionKey.OP_READ);
            } else if (key.isReadable()) { // data is available to read
                SocketChannel channel = (SocketChannel) key.channel();
                buffer = ByteBuffer.allocate(BUFFER_SIZE);
                try {
                    int len = channel.read(buffer);
                    if (len < 0) {
                        // The client closed the connection: release the key and channel,
                        // otherwise select() keeps reporting this channel as readable.
                        key.cancel();
                        list.remove(channel.socket());
                        channel.close();
                        return;
                    }
                    if (len == 0) {
                        return;
                    }
                    buffer.flip();
                    DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    String createTime = df.format(new Date());
                    UUID uuid = UUID.randomUUID();
                    CharBuffer charBuffer = decoder.decode(buffer);
                    // The client sends with println(), so strip the trailing line break
                    String msg = charBuffer.toString().trim();
                    String[] strs = msg.split(",");
                    FlumeInfo info = new FlumeInfo();
                    for (String str : strs) {
                        String[] texts = str.split("=");
                        String s = texts[0];
                        if ("name".equals(s)) {
                            info.setName(texts[1]);
                        } else if ("ip".equals(s)) {
                            info.setIP(texts[1].replace("/", ""));
                        } else if ("port".equals(s)) {
                            info.setPort(texts[1]);
                        } else if ("status".equals(s)) {
                            String status = texts[1];
                            if (status.equalsIgnoreCase("true")) {
                                status = "1";
                            } else {
                                status = "0";
                            }
                            info.setStatus(status);
                        } else if ("version".equals(s)) {
                            info.setVersion(texts[1]);
                        }
                    }
                    info.setCode("code");
                    info.setSystemId(uuid.toString());
                    info.setCreateTime(createTime);
                    queue.put(info);
                    buffer.clear();
                } catch (Exception e) {
                    // Do not swallow malformed messages silently
                    System.err.println("Failed to handle client message: " + e.getMessage());
                }
            }
        }
    }

    class Task implements Runnable {
        private Connection conn;
        private PreparedStatement pstmt;

        public Task() {
            DBHelper helper = new DBHelper();
            conn = helper.getConnection();
        }

        public void run() {
            try {
                // Prepare the statement once and reuse it for every row
                pstmt = conn.prepareStatement("insert into TB1"
                        + " (systemId,name,code,ip,port,status,createTime,version) values (?,?,?,?,?,?,?,?)");
                while (true) {
                    FlumeInfo flume = queue.take();
                    pstmt.setString(1, flume.getSystemId());
                    pstmt.setString(2, flume.getName());
                    pstmt.setString(3, flume.getCode());
                    pstmt.setString(4, flume.getIP());
                    pstmt.setString(5, flume.getPort());
                    pstmt.setString(6, flume.getStatus());
                    pstmt.setString(7, flume.getCreateTime());
                    pstmt.setString(8, flume.getVersion());
                    pstmt.executeUpdate();
                }
            } catch (Exception e) {
                System.err.println(e.getMessage());
            } finally {
                try {
                    if (pstmt != null)
                        pstmt.close();
                    if (conn != null)
                        conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Shut down the server
    public void closeServerSocket() {
        try {
            if (serverChannel != null && serverChannel.isOpen()) {
                serverChannel.close();
            }
        } catch (Exception ex) {
            System.out.println("SocketThread err:" + ex.getMessage());
        }
    }
}
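The heartbeat that the server parses is a single line of comma-separated key=value pairs (name, ip, port, status, version). As a small illustration of that wire format, the sketch below parses one such line into a map and applies the same two normalisations as the server code: stripping the leading "/" from the address and mapping true/false to 1/0. The class name HeartbeatParser and the sample values are made up for the example; the real code fills a FlumeInfo object instead of a map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone parser for the heartbeat line sent by the monitoring client.
public class HeartbeatParser {

    public static Map<String, String> parse(String message) {
        Map<String, String> fields = new LinkedHashMap<String, String>();
        // trim() removes the line break that println() appends on the client side
        for (String pair : message.trim().split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip fragments that have no key
            }
            fields.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        // Socket.getLocalAddress() prints as "/10.0.0.5", so drop the slash
        if (fields.containsKey("ip")) {
            fields.put("ip", fields.get("ip").replace("/", ""));
        }
        // Store the status as "1" (running) or "0" (not running)
        if (fields.containsKey("status")) {
            fields.put("status", "true".equalsIgnoreCase(fields.get("status")) ? "1" : "0");
        }
        return fields;
    }

    public static void main(String[] args) {
        // Sample values are invented for illustration
        System.out.println(parse("name=host1,ip=/10.0.0.5,port=5001,status=true,version=1.2\n"));
    }
}
```

Using indexOf('=') rather than split("=") means a fragment without a value is skipped instead of raising an ArrayIndexOutOfBoundsException.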
Implementation of server machine part 2:
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;

// FileUtil is a project class that is not shown in this post.
public class FileSkClient {
    private static final int BUFFER_SIZE = 1024; // buffer size
    private static final String IP = "127.0.0.1";
    private static final int PORT = 3356;
    private String filePath = null;
    private Socket socket = null;
    private String savePath = null;
    BufferedReader br = null;
    PrintWriter out = null;
    private int sumL = 0;

    // Connect to one client machine, send a single command, then disconnect
    public String handle(String IP, String command, String filePath, String savePath) throws Exception {
        String result = "FAILED";
        try {
            socket = new Socket(IP, PORT);
            this.filePath = filePath;
            this.savePath = savePath;
            br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream());
            if ("0".equals(command)) {
                out.print("restart");
                out.flush();
                result = br.readLine();
                System.out.println(result);
            } else if ("1".equals(command)) {
                out.print("replace");
                out.flush();
                String isReady = br.readLine();
                if ("ready".equals(isReady)) {
                    sumL = 0;
                    boolean b = findFile(socket);
                    if (b)
                        result = "SUCCESS";
                    System.out.println(result);
                }
            } else if ("2".equals(command)) {
                out.print("stop");
                out.flush();
                result = br.readLine();
                System.out.println(result);
            }
        } catch (Exception e) {
            throw new Exception(e);
        } finally {
            if (out != null)
                out.close();
            if (br != null)
                br.close();
            if (socket != null)
                socket.close();
        }
        return result;
    }

    // Package the file to send
    private boolean findFile(Socket s) throws Exception {
        System.out.println("Begin to zip the file...");
        File f = FileUtil.doZip(filePath);
        if (f == null) {
            System.err.println("Failed to package the file; cannot continue");
            // throw new FileNotFoundException("Failed to package the file; cannot continue");
            return false;
        }
        return sendFile(s, f);
    }

    // Send the file: target path, length, then the bytes
    private boolean sendFile(Socket s, File file) throws Exception {
        DataOutputStream dos = null;
        DataInputStream dis = null;
        try {
            dos = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
            long l = file.length();
            String dir = file.getName();
            if (savePath != null && !"".equals(savePath)) {
                dir = savePath + File.separator + file.getName();
            }
            dos.writeUTF(dir);
            dos.writeLong(l);
            dos.flush();
            dis = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
            byte[] bs = new byte[BUFFER_SIZE];
            int length = 0;
            System.out.println("Start transferring the file...");
            while ((length = dis.read(bs)) > 0) {
                sumL += length;
                dos.write(bs, 0, length);
                dos.flush();
            }
            // Closing the stream closes the socket; this is how the receiver sees end-of-file
            dos.close();
            dis.close();
            if (sumL == l) {
                System.out.println("File transfer complete");
                return true;
            }
        } catch (Exception e) {
            throw new Exception("Failed to send file: " + e.getMessage());
        } finally {
            try {
                if (dos != null)
                    dos.close();
                if (dis != null)
                    dis.close();
                if (s != null)
                    s.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("Enter a command (0 = restart, 1 = replace and restart, 2 = stop):");
        Scanner sc = new Scanner(System.in);
        String str = sc.nextLine();
        String filePath = "";
        String savePath = "";
        try {
            new FileSkClient().handle(IP, str, filePath, savePath);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Implementation of client machine part 1:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitorClient {
    private static final Logger logger = LoggerFactory.getLogger(MonitorClient.class);
    private Runtime rt = Runtime.getRuntime();
    private Socket s = null;
    private PrintWriter w = null;
    private Timer timer = new Timer();
    private static int i = 0;
    private String hostName = InetAddress.getLocalHost().getCanonicalHostName(); // host name
    private static final String configFile = "config.properties"; // file that stores the version number
    private Properties pro = new Properties(); // loads the properties file
    private static final int TimeOut = 60000; // interval between heartbeats, in milliseconds

    public static void main(String[] args) throws Exception {
        new MonitorClient();
    }

    public MonitorClient() throws Exception {
        try {
            pro.load(MonitorClient.class.getResourceAsStream(configFile));
            String IP = pro.getProperty("server.ip");
            String port = pro.getProperty("server.port");
            pro.clear();
            s = new Socket(IP, Integer.parseInt(port));
            s.setKeepAlive(true);
            System.out.println("connect successed!");
            w = new PrintWriter(s.getOutputStream());
            timer.schedule(new Processor(), 1000, TimeOut); // first run after 1 s, then every TimeOut ms
        } catch (Exception e) {
            System.err.println("Run error:" + e.getMessage());
            if (i < 50) {
                // Count the attempt before retrying; incrementing after the recursive
                // call would never take effect while the connection keeps failing.
                i++;
                new MonitorClient(); // retry
            } else {
                if (w != null)
                    w.close();
                destroyedTimer();
                if (s != null)
                    s.close();
            }
        }
    }

    public void destroyedTimer() {
        if (timer != null) {
            timer.cancel();
        }
    }

    // Heartbeat task
    class Processor extends TimerTask {
        public void run() {
            try {
                boolean isAlive = isAlive();
                String version = getVersion();
                StringBuffer sb = new StringBuffer();
                sb.append("name=" + hostName + ",");
                sb.append("ip=" + s.getLocalAddress() + ",");
                sb.append("port=" + s.getLocalPort() + ",");
                sb.append("status=" + isAlive + ",");
                sb.append("version=" + version);
                w.println(sb.toString());
                w.flush();
            } catch (Exception e) {
                e.printStackTrace();
                logger.info("Run error!" + e.getMessage());
            }
        }
    }

    // Whether the operating system is Windows
    private boolean isWin() {
        String os = System.getProperty("os.name").toLowerCase();
        if (os.startsWith("win")) {
            return true;
        }
        return false;
    }

    /**
     * Check whether the monitored process is running.
     *
     * @return true if it is running
     */
    private boolean isAlive() {
        boolean isAlive = false;
        try {
            if (isWin()) {
                String program = "CWAgen.exe";
                String command = "TASKLIST.EXE /FI \"IMAGENAME EQ CWAGEN.EXE\" /FO CSV /NH";
                Process p = rt.exec(command);
                BufferedReader read = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = "";
                while ((line = read.readLine()) != null) {
                    if (line.indexOf(program) != -1) {
                        System.out.println("This application is running:" + line);
                        isAlive = true;
                    } else {
                        System.out.println("This application is closed");
                    }
                }
                read.close();
                p.destroy();
            } else {
                // Runtime.exec does not interpret "|", so the pipeline has to run under a shell.
                // "grep -v grep" drops the grep process itself from the count.
                String[] command = { "sh", "-c", "ps -ef | grep CWAgen | grep -v grep | wc -l" };
                Process p = rt.exec(command);
                BufferedReader read = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = "";
                while ((line = read.readLine()) != null) {
                    if (Integer.parseInt(line.trim()) > 0) {
                        System.out.println("This application is running:" + line);
                        isAlive = true;
                    } else {
                        System.out.println("This application is closed");
                    }
                }
                read.close();
                p.destroy();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return isAlive;
    }

    /**
     * Read the version number.
     *
     * @return the value of flume.version, or null if the file cannot be read
     */
    public String getVersion() {
        try {
            pro.load(MonitorClient.class.getResourceAsStream(configFile));
            return pro.getProperty("flume.version");
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        } finally {
            pro.clear();
        }
    }
}
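A note on the liveness check: Runtime.exec(String) splits the command on whitespace and starts the program directly, without a shell, so a pipeline such as ps ... | grep ... | wc -l only works when it is passed to sh -c. As an alternative that avoids external commands altogether, the sketch below uses the ProcessHandle API. It assumes Java 9 or later, which the original code does not require, and the class and method names (ProcessCheck, isRunning) are hypothetical. On some systems the command line of other users' processes is not visible, in which case this check would report false for them.

```java
import java.util.Optional;

// Hypothetical helper, assuming Java 9+: look for a running process by a fragment of its command.
public class ProcessCheck {

    // Returns true if any visible process has an executable path containing the given fragment.
    public static boolean isRunning(String fragment) {
        String needle = fragment.toLowerCase();
        return ProcessHandle.allProcesses().anyMatch(handle -> {
            Optional<String> command = handle.info().command(); // may be empty if not permitted
            return command.isPresent() && command.get().toLowerCase().contains(needle);
        });
    }

    public static void main(String[] args) {
        // "CWAgen" is the process name used in this post
        System.out.println("CWAgen running: " + isRunning("CWAgen"));
    }
}
```

Note that info().command() is the executable path only; for a program started as a Java main class, matching against info().commandLine() would be needed instead, where the platform exposes it.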
Implementation of client machine part 2:
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Enumeration;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class FileSkServer {
    ExecutorService exec = Executors.newCachedThreadPool();
    private static final String SUCCESS_MSG = "SUCCESS"; // success marker
    private static final String FAILED_MSG = "FAILED"; // failure marker
    private static final String REPLACE = "replace"; // command
    private static final String RESTART = "restart"; // command
    private static final String STOP = "stop"; // command
    private static final int BUFFER_SIZE = 1024; // buffer size
    private static final String configFile = "config.properties"; // file that stores the version number
    private Properties pro = new Properties(); // loads the properties file
    private String PATH = "";
    private Runtime rt = Runtime.getRuntime();
    private static boolean flag = true;
    Socket socket = null;

    public FileSkServer() {
        ServerSocket server = null;
        try {
            pro.load(FileSkServer.class.getResourceAsStream(configFile));
            String port = pro.getProperty("fileServer.port");
            server = new ServerSocket(Integer.parseInt(port)); // bind the listening port
            System.out.println("Waiting for client connect……");
            while (flag) {
                socket = server.accept();
                exec.execute(new Handler(socket));
            }
        } catch (IOException e) {
            // e.printStackTrace();
            if (server.isClosed() || socket.isClosed()) {
                new FileSkServer();
            }
        } finally {
        }
    }

    public static void main(String[] args) {
        new FileSkServer();
    }

    // Whether the operating system is Windows; on other systems this also sets PATH
    private boolean isWin() throws IOException {
        boolean flag = false;
        String os = System.getProperty("os.name").toLowerCase();
        pro.load(FileSkServer.class.getResourceAsStream(configFile));
        if (os.startsWith("win")) {
            flag = true;
        } else {
            PATH = pro.getProperty("flume_home_linux");
        }
        return flag;
    }

    // Detect the architecture (32-bit or 64-bit) and set PATH for Windows
    private boolean is64bit() throws IOException {
        boolean flag = false;
        String os = System.getProperty("os.arch");
        pro.load(FileSkServer.class.getResourceAsStream(configFile));
        // Test for "64" first: "x86_64" would otherwise match the "86" check
        if (os.contains("64")) {
            PATH = pro.getProperty("flume_home_win_64");
            flag = true;
        } else if (os.contains("86") || os.contains("32")) {
            PATH = pro.getProperty("flume_home_win_32");
        }
        return flag;
    }

    // Print a process stream on a background thread so the child never blocks on a full pipe
    private void drainAsync(InputStream stream) throws IOException {
        final BufferedReader reader = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
        new Thread() {
            public void run() {
                String line = null;
                try {
                    while ((line = reader.readLine()) != null) {
                        System.out.println(line);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }

    // Stop the application
    private boolean stop() {
        System.out.println("Begin to stop this application...");
        try {
            String command = "";
            if (isWin()) {
                is64bit();
                command = "cmd /c " + PATH + "bin\\stopCWAgen.bat start";
            } else {
                command = PATH + "bin/stopCWAgen.sh";
            }
            if (isAlive()) { // stop the process
                Process p = rt.exec(command, null, new File(PATH + "bin"));
                drainAsync(p.getErrorStream());
                drainAsync(p.getInputStream());
                // if (p.waitFor() != 0) {
                //     if (p.exitValue() == 1) { // 0 means normal exit, 1 means abnormal exit
                //         System.err.println("Stop Failed!");
                //     }
                // }
                if (isAlive())
                    kill(); // force-kill the process
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    // Start the application
    private String start() {
        System.out.println("starting run this application...");
        String result = FAILED_MSG;
        try {
            if (!isAlive()) {
                String command = "";
                Process p = null;
                if (isWin()) {
                    if (is64bit()) {
                        command = "cmd /c " + PATH + "bin\\startCWAgen.bat start";
                    } else {
                        command = "cmd /c " + PATH + "bin\\startCWAgen_32.bat start";
                    }
                } else {
                    command = PATH + "bin/startup.sh";
                }
                p = rt.exec(command, null, new File(PATH + "bin"));
                drainAsync(p.getErrorStream());
                drainAsync(p.getInputStream());
                // if (p.waitFor() != 0) {
                //     if (p.exitValue() == 1) {
                //         System.err.println("Start failed!");
                //         return FAILED_MSG;
                //     }
                // }
                if (isAlive()) {
                    System.out.println("Start successfully!");
                    return SUCCESS_MSG;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Check whether the monitored process is running.
     *
     * @return true if it is running
     */
    private boolean isAlive() {
        boolean isAlive = false;
        try {
            if (isWin()) {
                String program = "CWAgen.exe";
                String command = "TASKLIST.EXE /FI \"IMAGENAME EQ CWAGEN.EXE\" /FO CSV /NH";
                Process p = rt.exec(command);
                BufferedReader read = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = "";
                while ((line = read.readLine()) != null) {
                    if (line.indexOf(program) != -1) {
                        System.out.println("This application is running:" + line);
                        isAlive = true;
                    } else {
                        System.out.println("This application is closed");
                    }
                }
                read.close();
                p.destroy();
            } else {
                // Runtime.exec does not interpret "|", so the pipeline has to run under a shell.
                // "grep -v grep" drops the grep process itself from the count.
                String[] command = { "sh", "-c", "ps -ef | grep CWAgen | grep -v grep | wc -l" };
                Process p = rt.exec(command);
                BufferedReader read = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = "";
                while ((line = read.readLine()) != null) {
                    if (Integer.parseInt(line.trim()) > 0) {
                        System.out.println("This application is running:" + line);
                        isAlive = true;
                    } else {
                        System.out.println("This application is closed");
                    }
                }
                read.close();
                p.destroy();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return isAlive;
    }

    /**
     * Force-kill the process.
     *
     * @return true if the process is no longer running
     */
    public boolean kill() {
        try {
            System.out.println("Begin to stop this application...");
            if (isAlive()) {
                Process p = null;
                if (isWin()) {
                    String cmd = "taskkill -f -im CWAgen.exe";
                    p = rt.exec(cmd);
                } else {
                    String[] cmd = { "sh", "-c", "ps aux | grep CWAgen" };
                    p = rt.exec(cmd);
                    InputStreamReader is = new InputStreamReader(p.getInputStream());
                    BufferedReader read = new BufferedReader(is);
                    String line = null;
                    while ((line = read.readLine()) != null) {
                        if (line.indexOf("org.apache.catalina.startup.Bootstrap start") >= 0) {
                            String tomcatPid = line.split("\\s+")[1]; // second column of ps aux is the PID
                            rt.exec("kill -9 " + tomcatPid);
                        }
                    }
                    is.close();
                    read.close();
                }
                if (p.waitFor() != 0) {
                    if (p.exitValue() == 1) {
                        System.err.println("Kill failed!");
                    }
                }
                p.destroy();
            }
            if (!isAlive()) {
                System.out.println("Stop successfully!");
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Command handler
     *
     * @author T430
     */
    class Handler implements Runnable {
        Socket s;
        double sumL = 0;
        PrintWriter w = null;
        InputStream in = null;

        public Handler(Socket s) {
            this.s = s;
        }

        public void run() {
            DataInputStream dis = null;
            BufferedOutputStream bout = null;
            try {
                String command = null;
                if (flag) {
                    int len = 0;
                    byte[] b = new byte[BUFFER_SIZE];
                    in = s.getInputStream();
                    w = new PrintWriter(s.getOutputStream());
                    if ((len = in.read(b)) != -1) {
                        command = new String(b, 0, len);
                        System.out.println(s.getLocalPort() + "command: " + command);
                    }
                }
                if (REPLACE.equalsIgnoreCase(command)) {
                    flag = false;
                    stop();
                    System.out.println(PATH);
                    w.println("ready");
                    w.flush();
                    dis = new DataInputStream(in);
                    String fileName = dis.readUTF();
                    long fileLength = dis.readLong();
                    String dir = PATH + fileName;
                    File file = new File(dir);
                    if (!file.exists()) {
                        file.createNewFile();
                    }
                    byte[] buf = new byte[BUFFER_SIZE];
                    int read = 0;
                    System.out.println("Begin to receive file(" + fileName + "),size(" + fileLength + "B)...");
                    bout = new BufferedOutputStream(new FileOutputStream(file));
                    // Read until the sender closes the connection
                    while ((read = dis.read(buf)) != -1) {
                        sumL += read;
                        bout.write(buf, 0, read);
                        bout.flush();
                    }
                    bout.close();
                    dis.close();
                    if (fileLength == sumL) {
                        System.out.println("Received over!The save path:" + file.getPath());
                        w.println("Received over!The save path:" + file.getPath());
                        w.flush();
                        boolean isUnZip = unZip(file, file.getParent()); // unzip the file
                        if (isUnZip) {
                            String result = start();
                            w.println(result);
                            w.flush();
                        } else {
                            w.println("Failed");
                            w.flush();
                        }
                    }
                    flag = true;
                } else if (RESTART.equalsIgnoreCase(command)) {
                    String result = FAILED_MSG;
                    if (stop()) {
                        result = start();
                    }
                    w.println(result);
                    w.flush();
                } else if (STOP.equalsIgnoreCase(command)) {
                    boolean b = stop();
                    String result = FAILED_MSG;
                    if (b)
                        result = SUCCESS_MSG;
                    w.println(result);
                    w.flush();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (w != null)
                        w.close();
                    if (dis != null)
                        dis.close();
                    if (bout != null)
                        bout.close();
                    if (s != null)
                        s.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Unzip a file.
         *
         * @param file
         *            the zip file to extract
         * @param destDir
         *            the directory to extract into
         */
        private boolean unZip(File file, String destDir) {
            if (destDir == null)
                destDir = file.getParent();
            if (!file.isFile() || !file.getName().endsWith(".zip")) {
                System.out.println("The file cannot unzip");
                return false;
            }
            try {
                int length;
                byte b[] = new byte[1024];
                destDir = destDir.endsWith(File.separator) ? destDir : destDir + File.separator;
                ZipFile zipFile = new ZipFile(file);
                Enumeration<?> enumeration = zipFile.entries();
                System.out.println("Begin to unzip the file...");
                while (enumeration.hasMoreElements()) {
                    ZipEntry zipEntry = (ZipEntry) enumeration.nextElement();
                    File loadFile = new File(destDir + zipEntry.getName());
                    if (zipEntry.isDirectory()) {
                        // Create directory entries explicitly so that empty folders are not lost
                        if (!loadFile.exists()) {
                            loadFile.mkdirs();
                        }
                    } else {
                        if (!loadFile.getParentFile().exists()) {
                            loadFile.getParentFile().mkdirs();
                        }
                        // Open and close the streams per entry so none are leaked
                        OutputStream os = new FileOutputStream(loadFile);
                        InputStream is = zipFile.getInputStream(zipEntry);
                        try {
                            while ((length = is.read(b)) > 0) {
                                os.write(b, 0, length);
                            }
                        } finally {
                            is.close();
                            os.close();
                        }
                    }
                }
                zipFile.close();
                file.delete();
                System.out.println("unzip successed");
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
    }
}
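Because the archive in the replace command arrives over the network, one hardening step worth considering for unZip is to check that each entry name resolves inside the target directory before writing it, so that an entry such as ../evil.sh cannot land outside the Flume home. This is an addition, not part of the original project; the class and method names (SafeUnzipPath, resolveEntry) are hypothetical.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical helper: resolve a zip entry name and reject paths that escape the target directory.
public class SafeUnzipPath {

    public static File resolveEntry(File destDir, String entryName) throws IOException {
        File target = new File(destDir, entryName);
        // Canonical paths have "." and ".." segments resolved, which makes the prefix check reliable
        String destPath = destDir.getCanonicalPath() + File.separator;
        if (!target.getCanonicalPath().startsWith(destPath)) {
            throw new IOException("Zip entry escapes target directory: " + entryName);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        File dest = new File(System.getProperty("java.io.tmpdir"), "unzip-dest");
        System.out.println(resolveEntry(dest, "conf/flume.conf"));
        try {
            resolveEntry(dest, "../evil.sh");
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the unZip loop this would replace new File(destDir + zipEntry.getName()) with a call to resolveEntry, leaving the rest of the extraction unchanged.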
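At this point the requirements above are basically met. Next comes the server-side UI.
It uses Spring MVC; because I do not know the framework well, what I wrote is very simple.
Overall the architecture is Spring MVC + MyBatis, the database is DB2, and the front end uses EasyUI 1.4 (very powerful; I plan to learn more about it later).
The whole project, including the table-creation statements and the client implementation, is available for download: http://download.csdn.net/detail/tiantang_1986/8142325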