This post, shared by 靠谱客 blogger 耍酷蓝天, describes a socket-based monitoring system for Flume. I found the approach worth sharing and hope it serves as a useful reference.

Overview

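The company asked for a monitoring system that watches the running state of 2000 Flume servers and can also operate on them, for example restart Flume, stop Flume, or update Flume files. I took quite a few wrong turns along the way and hope this write-up spares others the same mistakes.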

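First, the requirements: (1) continuously read and record the running state of the Flume servers and display it in a JSP page; (2) provide the server operations mentioned above.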

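The initial idea was one socket server and one socket client, each with two threads.

The server continuously receives the status information sent by the clients, and listens for operator commands so that it can act on the Flume servers accordingly.

The client works in heartbeat mode: it sends status information at a fixed interval, listens for commands from the server, and executes them.

Testing showed that this design does not work, for the following reason: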

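The multithreaded server could receive client messages continuously, and restart and stop commands sent to a client executed without trouble. File transfer was the problem. After writing the file, the sender had to close the output stream; otherwise the client never saw end-of-stream and stayed blocked while reading the file. But closing the output stream also closes the socket, so the server could no longer receive that client's status information. Some posts online suggest a half-close with socket.shutdownOutput(). That appeared to allow the file transfer while still receiving status from the client, but the next command exposed the catch: the socket's output side had already been shut down, so no further commands could be sent to the client.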
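One way around this close-or-block dilemma, which the original project does not use, is to prefix every message with its length so the receiver knows where a file ends without waiting for end-of-stream. The following is a minimal sketch under that assumption: `FrameCodec`, `writeFrame`, `readFrame`, `encodeAll`, and `decodeAll` are hypothetical names, and in-memory streams stand in for the socket streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project:
// every message travels as a 4-byte length followed by that many payload bytes.
class FrameCodec {

	// Writes one frame and leaves the stream open for the next one.
	static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
		out.writeInt(payload.length);
		out.write(payload);
		out.flush();
	}

	// Reads exactly one frame; readFully returns only once the whole payload has arrived.
	static byte[] readFrame(DataInputStream in) throws IOException {
		byte[] payload = new byte[in.readInt()];
		in.readFully(payload);
		return payload;
	}

	// Demo: puts several messages on one in-memory stream, as a single socket would carry them.
	static byte[] encodeAll(String... messages) {
		try {
			ByteArrayOutputStream wire = new ByteArrayOutputStream();
			DataOutputStream out = new DataOutputStream(wire);
			for (String message : messages) {
				writeFrame(out, message.getBytes(StandardCharsets.UTF_8));
			}
			return wire.toByteArray();
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	// Demo: reads frames back until the in-memory stream is exhausted.
	static List<String> decodeAll(byte[] wire) {
		try {
			DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
			List<String> messages = new ArrayList<>();
			while (in.available() > 0) {
				messages.add(new String(readFrame(in), StandardCharsets.UTF_8));
			}
			return messages;
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}
}
```

With framing like this, a file and a later command can share one connection because neither side has to close or half-close a stream to mark the end of a message. For large files the payload would be streamed in chunks while counting down the declared length rather than buffered in memory.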

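I could not find a good way around this, so I split the program: the server into two parts and the client into two parts, giving two servers and two clients.

Server machine: 1. Continuously receives messages from the clients. 2. When a command arrives, connects to the client machine, performs the operation, and then disconnects.

Client machine: 1. Continuously sends the running state of the local Flume. 2. Listens for commands from the server and acts on them.

Implementation of server part 1: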

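import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.ServletContext;

// FlumeInfo and DBHelper are project classes that are not shown in this post.
public class NioServer {
	private LinkedBlockingQueue<FlumeInfo> queue = new LinkedBlockingQueue<FlumeInfo>();
	private List<Socket> list = new ArrayList<Socket>();
	private static final int BUFFER_SIZE = 1024;// buffer size in bytes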
	private ServerSocketChannel serverChannel;
	ExecutorService executor = Executors.newCachedThreadPool();

	public NioServer(ServletContext servletContext) {
		try {
			executor.execute(new Handler(servletContext));
			executor.execute(new Task());
		} catch (IOException e) {
			System.err.println(e.getMessage());
			//e.printStackTrace();
		}
	}

	class Handler implements Runnable {
		private ServletContext servletContext;
		private Selector selector;
		private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); // decodes bytes into characters
		private ByteBuffer buffer;

		public Handler(ServletContext servletContext) throws IOException {
			this.servletContext = servletContext;
		}

		public void run() {
			try {
				// Open a ServerSocket channel
				serverChannel = ServerSocketChannel.open();
				serverChannel.configureBlocking(false);
				// Read the socket port from the context-param node in web.xml
				String port = this.servletContext
						.getInitParameter("socketPort");
				serverChannel.socket().bind(
						new InetSocketAddress(Integer.parseInt(port)));
				selector = Selector.open();// open the selector
				// Register the channel with the selector for SelectionKey.OP_ACCEPT events
				serverChannel.register(selector, SelectionKey.OP_ACCEPT);
				System.out.println("Waiting for client connect...");
				while (true) {
					selector.select();
					for (Iterator<SelectionKey> itr = selector.selectedKeys()
							.iterator(); itr.hasNext();) {
						SelectionKey key = itr.next();
						listen(key);
						itr.remove();
					}
				}
			} catch (IOException e) {
				if(!serverChannel.isOpen()||!selector.isOpen()||serverChannel.socket().isClosed()){
					new NioServer(servletContext);
				}
			}
		}

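		// Handle one ready key
		private void listen(SelectionKey key) throws IOException {
			if (key.isAcceptable()) {// a client is requesting a connection
				ServerSocketChannel server = (ServerSocketChannel) key
						.channel();
				SocketChannel channel = server.accept();// channel for the new client connection
				Socket s=channel.socket();
				s.setKeepAlive(true);
				s.setSoTimeout(1000);
				list.add(channel.socket());
				channel.configureBlocking(false);
				System.out.println("a new client connected!");
				// Once the connection is established, register the client channel for SelectionKey.OP_READ.
				channel.register(selector, SelectionKey.OP_READ);
			} else if (key.isReadable()) {// data is available to read
				// Channel on which the client's data can be read.
				SocketChannel channel = (SocketChannel) key.channel();
				buffer = ByteBuffer.allocate(BUFFER_SIZE);
				try {
					int len = channel.read(buffer);
					if (len < 0) {
						// The client closed the connection: release the channel so the
						// selector does not keep reporting it as readable.
						list.remove(channel.socket());
						key.cancel();
						channel.close();
						return;
					}
					if (len == 0) {
						return;
					}else{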
						buffer.flip();
						DateFormat df = new SimpleDateFormat(
								"yyyy-MM-dd HH:mm:ss");
						String createTime = df.format(new Date());
						UUID uuid = UUID.randomUUID();
						CharBuffer charBuffer = decoder.decode(buffer);
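						// The client sends each message with println, so strip the trailing line break.
						String msg = charBuffer.toString().trim();
						String[] strs = msg.split(",");
						FlumeInfo info = new FlumeInfo();
						for (String str : strs) {
							String[] texts = str.split("=", 2);
							if (texts.length < 2) {
								continue;// skip a field that has no value
							}
							String s = texts[0];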
							if ("name".equals(s)) {
								info.setName(texts[1]);
							} else if ("ip".equals(s)) {
								info.setIP(texts[1].replace("/", ""));
							} else if ("port".equals(s)) {
								info.setPort(texts[1]);
							} else if ("status".equals(s)) {
								String status = texts[1];
								if (status.equalsIgnoreCase("true")) {
									status = "1";
								} else {
									status = "0";
								}
								info.setStatus(status);
							} else if ("version".equals(s)) {
								info.setVersion(texts[1]);
							}
						}
						info.setCode("code");
						info.setSystemId(uuid.toString());
						info.setCreateTime(createTime);
						queue.put(info);
						buffer.clear();
					}
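				} catch (Exception e) {
					// Report the failure instead of silently dropping the message.
					System.err.println("Failed to handle client message: " + e.getMessage());
				}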
			}
		}
	}

	class Task implements Runnable {
		private Connection conn;
		private PreparedStatement pstmt;

		public Task() {
			DBHelper helper = new DBHelper();
			conn = helper.getConnection();
		}

		public void run() {
			try {
				// Prepare the statement once and reuse it for every row.
				pstmt = conn.prepareStatement("insert into TB1"
						+ " (systemId,name,code,ip,port,status,createTime,version) values (?,?,?,?,?,?,?,?)");
				while (true) {
					FlumeInfo flume = queue.take();
					pstmt.setString(1, flume.getSystemId());
					pstmt.setString(2, flume.getName());
					pstmt.setString(3, flume.getCode());
					pstmt.setString(4, flume.getIP());
					pstmt.setString(5, flume.getPort());
					pstmt.setString(6, flume.getStatus());
					pstmt.setString(7, flume.getCreateTime());
					pstmt.setString(8, flume.getVersion());
					pstmt.executeUpdate();
				}
			} catch (Exception e) {
				//e.printStackTrace();
				System.err.println(e.getMessage());
			} finally {
				try {
					if (pstmt != null)
						pstmt.close();
					if (conn != null)
						conn.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}
	}

	// Shut down the server
	public void closeServerSocket() {
		try {
			if (serverChannel != null && serverChannel.isOpen()) {
				serverChannel.close();
			}
		} catch (Exception ex) {
			System.out.println("SocketThread err:" + ex.getMessage());
		}
	}
}
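The `listen` method above splits each heartbeat on `,` and `=`. The client sends the heartbeat with `println`, so the raw message ends with a line break, and a field without `=` has no value to read. As an illustration only, a slightly more defensive parse could look like the sketch below; `StatusParser` is a hypothetical class that is not part of the original project, and like the original it assumes that one read delivers one complete message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses "name=host1,ip=/10.0.0.5,port=5001,status=true,version=1.2".
class StatusParser {

	// Returns the fields in the order they appear; malformed fields are skipped.
	static Map<String, String> parse(String message) {
		Map<String, String> fields = new LinkedHashMap<>();
		for (String pair : message.trim().split(",")) {
			// Limit 2 keeps any further '=' characters inside the value.
			String[] keyValue = pair.split("=", 2);
			if (keyValue.length < 2) {
				continue;
			}
			fields.put(keyValue[0].trim(), keyValue[1].trim());
		}
		return fields;
	}
}
```

The resulting map can then be copied into a `FlumeInfo` with the same `name`, `ip`, `port`, `status`, and `version` keys that the original code checks.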


Implementation of server part 2:

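import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;

// FileUtil is a project class that is not shown in this post.
public class FileSkClient {
	private static final int BUFFER_SIZE = 1024;// buffer size in bytes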
	private static final String IP = "127.0.0.1";
	private static final int PORT = 3356;
	private String filePath = null;
	private Socket socket = null;
	private String savePath = null;
	BufferedReader br = null;
	PrintWriter out = null;
	private int sumL = 0;

	public String handle(String IP, String command, String filePath,
			String savePath) throws Exception {
		String result = "FAILED";
		try {
			socket = new Socket(IP, PORT);
			this.filePath = filePath;
			this.savePath = savePath;
			br = new BufferedReader(new InputStreamReader(
					socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream());
			if ("0".equals(command)) {
				out.print("restart");
				out.flush();
				result = br.readLine();
				System.out.println(result);
			} else if ("1".equals(command)) {
				out.print("replace");
				out.flush();
				String isReady = br.readLine();
				if ("ready".equals(isReady)) {
					sumL = 0;
					boolean b=findFile(socket);
					if(b)result="SUCCESS";
					System.out.println(result);
				}
			} else if ("2".equals(command)) {
				out.print("stop");
				out.flush();
				result = br.readLine();
				System.out.println(result);
			}
		} catch (Exception e) {
			throw new Exception(e);
		} finally {
			if(out!=null)out.close();
			if(br!=null)br.close();
			if(socket!=null)socket.close();
		}
		return result;
	}

	// Select (zip) the file to send
	private boolean findFile(Socket s) throws Exception {
		System.out.println("Begin to zip the file...");
		File f = FileUtil.doZip(filePath);
		if (f == null) {
			System.err.println("Failed to zip the file; cannot continue");
			//throw new FileNotFoundException("Failed to zip the file; cannot continue");
			return false;
		}
		return sendFile(s, f);
	}

	// Send the file
	private boolean sendFile(Socket s, File file) throws Exception {
		DataOutputStream dos = null;
		DataInputStream dis = null;
		try {
			dos = new DataOutputStream(new BufferedOutputStream(
					s.getOutputStream()));
			long l = file.length();
			String dir = file.getName();
			if (savePath != null && !"".equals(savePath)) {
				dir = savePath + File.separator + file.getName();
			}
			dos.writeUTF(dir);
			dos.writeLong(l);
			dos.flush();
			dis = new DataInputStream(new BufferedInputStream(
					new FileInputStream(file)));
			byte[] bs = new byte[BUFFER_SIZE];
			int length = 0;
			System.out.println("Starting file transfer...");
			while ((length = dis.read(bs)) > 0) {
				sumL += length;
				dos.write(bs, 0, length);
				dos.flush();
			}
			dos.close();
			dis.close();
			if (sumL == l) {
				System.out.println("File transfer complete");
				return true;
			}
		} catch (Exception e) {
			throw new Exception("Failed to send file: " + e.getMessage());
		} finally {
			try {
				if (dos != null)
					dos.close();
				if (dis != null)
					dis.close();
				if (s != null)
					s.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return false;
	}

	public static void main(String[] args) {
		System.out.println("Enter a command (0 = restart, 1 = replace files and restart, 2 = stop):");
		Scanner sc = new Scanner(System.in);
		String str = sc.nextLine();
		String filePath = "";
		String savePath = "";
		try {
			new FileSkClient().handle(IP, str, filePath, savePath);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
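In `handle`, the numeric codes ("0", "1", "2") and the words sent on the wire ("restart", "replace", "stop") are paired by hand in the `if`/`else` chain. As an illustration only, a hypothetical `AgentCommand` enum (not part of the original project) could keep each pair in one place:

```java
import java.util.Optional;

// Hypothetical enum pairing each menu code with the word sent to the client machine.
enum AgentCommand {
	RESTART("0", "restart"),
	REPLACE("1", "replace"),
	STOP("2", "stop");

	private final String code;
	private final String wireName;

	AgentCommand(String code, String wireName) {
		this.code = code;
		this.wireName = wireName;
	}

	// The word written to the socket for this command.
	String wireName() {
		return wireName;
	}

	// Looks up a command by its menu code; an unknown code yields an empty Optional.
	static Optional<AgentCommand> fromCode(String code) {
		for (AgentCommand command : values()) {
			if (command.code.equals(code)) {
				return Optional.of(command);
			}
		}
		return Optional.empty();
	}
}
```

A caller could then write `fromCode(command)` once and reject unknown input explicitly, instead of falling through every branch and returning "FAILED".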




Implementation of client part 1:


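import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitorClient {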
	private static final Logger logger = LoggerFactory
			.getLogger(MonitorClient.class);
	private Runtime rt = Runtime.getRuntime();
	private Socket s = null;
	private PrintWriter w = null;
	private Timer timer = new Timer();
	private static int i = 0;
	private String hostName = InetAddress.getLocalHost().getCanonicalHostName();// host name
	private static final String configFile = "config.properties";// file that stores the version number
	private Properties pro = new Properties();// properties loaded from the configuration file
	private static final int TimeOut=60000;// interval between status messages, in milliseconds

	public static void main(String[] args) throws Exception {
		new MonitorClient();
	}

	public MonitorClient() throws Exception {
		try {
			pro.load(MonitorClient.class.getResourceAsStream(configFile));
			String IP=pro.getProperty("server.ip");
			String port=pro.getProperty("server.port");
			pro.clear();
			s = new Socket(IP, Integer.parseInt(port));
			s.setKeepAlive(true);
			System.out.println("connect successed!");
			w = new PrintWriter(s.getOutputStream());
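			timer.schedule(new Processor(), 1000, TimeOut);// first run after 1 second, then every TimeOut milliseconds
		} catch (Exception e) {
			System.err.println("Run error:" + e.getMessage());
			if (i < 50) {
				i++;// count the attempt before retrying; otherwise the limit of 50 is never reached
				new MonitorClient();// retry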
			}else{
				if(w!=null)w.close();
				destroyedTimer();
				if(s!=null)s.close();
			}
		}
	}

	public void destroyedTimer() {
		if (timer != null) {
			timer.cancel();
		}
	}

	// Periodic task that sends the status message
	class Processor extends TimerTask {
		public void run() {
			try {
				boolean isAlive = isAlive();
				String version = getVersion();
				StringBuffer sb = new StringBuffer();
				sb.append("name="+hostName + ",");
				sb.append("ip="+s.getLocalAddress() + ",");
				sb.append("port="+s.getLocalPort() + ",");
				sb.append("status="+isAlive + ",");
				sb.append("version="+version);
				w.println(sb.toString());
				w.flush();
			} catch (Exception e) {
				e.printStackTrace();
				logger.info("Run error!" + e.getMessage());
			}
		}
	}

	// Check whether the operating system is Windows
	private boolean isWin() {
		String os = System.getProperty("os.name").toLowerCase();
		if (os.startsWith("win")) {// Windows
			return true;
		}
		return false;
	}

	/**
	 * Checks whether the monitored process is running.
	 * 
	 * @return true if the process was found
	 */
	private boolean isAlive() {
		boolean isAlive = false;
		try {
			if (isWin()) {
				String program = "CWAgen.exe";
				String command = "TASKLIST.EXE /FI \"IMAGENAME EQ CWAGEN.EXE\" /FO CSV /NH";
				Process p = rt.exec(command);
				BufferedReader read = new BufferedReader(new InputStreamReader(
						p.getInputStream()));
				String line = "";
				while ((line = read.readLine()) != null) {
					if (line.indexOf(program) != -1) {
						System.out.println("This application is running:" + line);
						isAlive = true;
					} else {
						System.out.println("This application is closed");
					}
				}
				read.close();
				p.destroy();
			}else{
				// Runtime.exec does not interpret pipes, so run the pipeline through a shell;
				// "grep -v grep" keeps the grep process itself out of the count.
				String[] command = { "sh", "-c", "ps -ef | grep CWAgen | grep -v grep | wc -l" };
				Process p = rt.exec(command);
				BufferedReader read = new BufferedReader(new InputStreamReader(
						p.getInputStream()));
				String line = "";
				while ((line = read.readLine()) != null) {
					if (Integer.parseInt(line.trim()) > 0) {
						System.out.println("This application is running:" + line);
						isAlive = true;
					} else {
						System.out.println("This application is closed");
					}
				}
				read.close();
				p.destroy();
				
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return isAlive;
	}

	/**
	 * Reads the version number.
	 * 
	 * @return the value of flume.version, or null if the file cannot be read
	 */
	public String getVersion() {
		try {
			pro.load(MonitorClient.class.getResourceAsStream(configFile));
			return pro.getProperty("flume.version");
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}finally{
			pro.clear();
		}
	}
	
}
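The `MonitorClient` constructor retries a failed connection by calling itself recursively. A loop with an explicit attempt counter and a pause between attempts is one alternative. The sketch below assumes a hypothetical `Retry.call` helper that is not part of the original project; the attempt count and delay are illustrative values.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: runs an action until it succeeds or the attempts are used up.
class Retry {

	static <T> T call(Callable<T> action, int maxAttempts, long delayMillis) {
		Exception last = null;
		for (int attempt = 1; attempt <= maxAttempts; attempt++) {
			try {
				return action.call();
			} catch (Exception e) {
				last = e; // remember the failure and try again
			}
			if (attempt < maxAttempts) {
				try {
					Thread.sleep(delayMillis); // pause before the next attempt
				} catch (InterruptedException ie) {
					Thread.currentThread().interrupt();
					break; // stop retrying when the thread is interrupted
				}
			}
		}
		throw new IllegalStateException("all " + maxAttempts + " attempts failed", last);
	}
}
```

Under these assumptions the connection could be opened with something like `Socket s = Retry.call(() -> new Socket(ip, port), 50, 2000);`, where `ip` and `port` come from the configuration file as in the constructor. Each attempt then runs on the same stack frame, and only one `Timer` is created once the connection exists.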

Implementation of client part 2:

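import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Enumeration;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class FileSkServer {
	ExecutorService exec = Executors.newCachedThreadPool();
	private static final String SUCCESS_MSG = "SUCCESS";// success marker
	private static final String FAILED_MSG = "FAILED";// failure marker
	private static final String REPLACE = "replace";// command
	private static final String RESTART = "restart";// command
	private static final String STOP = "stop";// command
	private static final int BUFFER_SIZE = 1024;// buffer size in bytes
	private static final String configFile = "config.properties";// file that stores the version number
	private Properties pro = new Properties();// properties loaded from the configuration file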

	private String PATH = "";
	private Runtime rt = Runtime.getRuntime();
	private static boolean flag = true;
	Socket socket = null;

	public FileSkServer() {
		ServerSocket server = null;
		try {
			pro.load(FileSkServer.class.getResourceAsStream(configFile));
			String port = pro.getProperty("fileServer.port");
			server = new ServerSocket(Integer.parseInt(port));// bind the listening port
			System.out.println("Waiting for client connect...");
			while (flag) {
				socket = server.accept();
				exec.execute(new Handler(socket));
			}
		} catch (IOException e) {
			//e.printStackTrace();
			if(server.isClosed()||socket.isClosed()){
				new FileSkServer();
			}
		} finally {
		}
	}

	public static void main(String[] args) {
		new FileSkServer();
	}

	// Check whether the operating system is Windows
	private boolean isWin() throws IOException {
		boolean flag = false;
		String os = System.getProperty("os.name").toLowerCase();
		pro.load(FileSkServer.class.getResourceAsStream(configFile));
		if (os.startsWith("win")) {// Windows
			flag = true;
		}else{
			PATH = pro.getProperty("flume_home_linux");
		}
		return flag;
	}

	// Check the system architecture (32-bit or 64-bit)
	private boolean is64bit() throws IOException {
		boolean flag = false;
		String os = System.getProperty("os.arch");
		pro.load(FileSkServer.class.getResourceAsStream(configFile));
		if (os.contains("86") || os.contains("32")) {
			PATH = pro.getProperty("flume_home_win_32");
		} else if (os.contains("64")) {
			PATH = pro.getProperty("flume_home_win_64");
			flag= true;
		}
		return flag;
	}

	// Stop the application
	private boolean stop() {
		System.out.println("Begin to stop this application...");
		try {
			String command = "";
			if (isWin()) {
				is64bit();
				command = "cmd /c " + PATH + "bin\\stopCWAgen.bat start";
			} else {
				command = PATH + "bin/stopCWAgen.sh";
			}
			if (isAlive()) {// stop the running process
				// PATH is expected to end with a separator, so this names the bin directory on any OS.
				Process p= rt.exec(command, null, new File(PATH + "bin"));
				final BufferedReader br2 = new BufferedReader(
						new InputStreamReader(p.getErrorStream(), "UTF-8"));
				final BufferedReader br = new BufferedReader(
						new InputStreamReader(p.getInputStream(), "UTF-8"));
				Thread t1 = new Thread() {
					public void run() {
						String line = null;
						try {
							while ((line = br2.readLine()) != null) {
								System.out.println(line);
							}
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
				};
				Thread t2 = new Thread() {
					public void run() {
						String line = null;
						try {
							while ((line = br.readLine()) != null) {
								System.out.println(line);
							}
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
				};
				t1.start();
				t2.start();
				// if (p.waitFor() != 0) {
				// if (p.exitValue() == 1) {// 0 means normal exit, 1 means abnormal exit
				// System.err.println("Stop Failed!");
				// }
				// }
				if (isAlive())
					kill();// force-kill the process
			}
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
		return true;
	}

	// Start the application
	private String start() {
		System.out.println("starting run this application...");
		String result = FAILED_MSG;
		try {
			if (!isAlive()) {
				String command = "";
				Process p = null;
				if (isWin()) {
					if (is64bit()) {
						command = "cmd /c " + PATH
								+ "bin\\startCWAgen.bat start";
					} else {
						command = "cmd /c " + PATH
								+ "bin\\startCWAgen_32.bat start";
					}
				} else {
					command = PATH + "bin/startup.sh";
				}
				// PATH is expected to end with a separator, so this names the bin directory on any OS.
				p = rt.exec(command, null, new File(PATH + "bin"));
				final BufferedReader br2 = new BufferedReader(
						new InputStreamReader(p.getErrorStream(), "UTF-8"));
				final BufferedReader br = new BufferedReader(
						new InputStreamReader(p.getInputStream(), "UTF-8"));
				Thread t1 = new Thread() {
					public void run() {
						String line = null;
						try {
							while ((line = br2.readLine()) != null) {
								System.out.println(line);
							}
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
				};
				Thread t2 = new Thread() {
					public void run() {
						String line = null;
						try {
							while ((line = br.readLine()) != null) {
								System.out.println(line);
							}
						} catch (IOException e) {
							e.printStackTrace();
						}
					}
				};
				t1.start();
				t2.start();
				// if (p.waitFor() != 0) {
				// if (p.exitValue() == 1) {
				// System.err.println("Start failed!");
				// return FAILED_MSG;
				// }
				// }
				if (isAlive()) {
					System.out.println("Start successfully!");
					return SUCCESS_MSG;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return result;
	}

	/**
	 * Checks whether the monitored process is running.
	 * 
	 * @return true if the process was found
	 */
	private boolean isAlive() {
		boolean isAlive = false;
		try {
			if (isWin()) {
				String program = "CWAgen.exe";
				String command = "TASKLIST.EXE /FI \"IMAGENAME EQ CWAGEN.EXE\" /FO CSV /NH";
				Process p = rt.exec(command);
				BufferedReader read = new BufferedReader(new InputStreamReader(
						p.getInputStream()));
				String line = "";
				while ((line = read.readLine()) != null) {
					if (line.indexOf(program) != -1) {
						System.out.println("This application is running:"
								+ line);
						isAlive = true;
					} else {
						System.out.println("This application is closed");
					}
				}
				read.close();
				p.destroy();
			} else {
				// Runtime.exec does not interpret pipes, so run the pipeline through a shell;
				// "grep -v grep" keeps the grep process itself out of the count.
				String[] command = { "sh", "-c", "ps -ef | grep CWAgen | grep -v grep | wc -l" };
				Process p = rt.exec(command);
				BufferedReader read = new BufferedReader(new InputStreamReader(
						p.getInputStream()));
				String line = "";
				while ((line = read.readLine()) != null) {
					if (Integer.parseInt(line.trim()) > 0) {
						System.out.println("This application is running:"
								+ line);
						isAlive = true;
					} else {
						System.out.println("This application is closed");
					}
				}
				read.close();
				p.destroy();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return isAlive;
	}

	/**
	 * Forcibly terminates the process.
	 * 
	 * @return true if the process is no longer running
	 */
	public boolean kill() {
		try {
			System.out.println("Begin to stop this application...");
			if (isAlive()) {
				Process p = null;
				if (isWin()) {
					String cmd = "taskkill -f -im CWAgen.exe";
					p = rt.exec(cmd);
				} else {
					String[] cmd = { "sh", "-c", "ps aux | grep CWAgen" };
					p = rt.exec(cmd);
					InputStreamReader is = new InputStreamReader(
							p.getInputStream());
					BufferedReader read = new BufferedReader(is);
					String line = null;
					while ((line = read.readLine()) != null) {
						if (line.indexOf("org.apache.catalina.startup.Bootstrap start") >= 0) {
							String tomcatPid = line.split("\\s+")[1];
							rt.exec("kill -9 " + tomcatPid);
						}
					}
					is.close();
					read.close();
				}
				if (p.waitFor() != 0) {
					if (p.exitValue() == 1) {
						System.err.println("Start failed!");
					}
				}
				p.destroy();
			}
			if (!isAlive()) {
				System.out.println("Stop successfully!");
				return true;
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * Command handler.
	 * 
	 * @author T430
	 * 
	 */
	class Handler implements Runnable {
		Socket s;
		double sumL = 0;
		PrintWriter w = null;
		InputStream in = null;

		public Handler(Socket s) {
			this.s = s;
		}

		public void run() {
			DataInputStream dis = null;
			BufferedOutputStream bout = null;
			try {
				String command = null;
				if (flag) {
					int len = 0;
					byte[] b = new byte[BUFFER_SIZE];
					in = s.getInputStream();
					w = new PrintWriter(s.getOutputStream());
					if ((len = in.read(b)) != -1) {
						command = new String(b, 0, len);
						System.out.println(s.getLocalPort() + "command: "
								+ command);
					}
				}
				if (REPLACE.equalsIgnoreCase(command)) {
					flag = false;
					stop();
					System.out.println(PATH);
					w.println("ready");
					w.flush();
					dis = new DataInputStream(in);
					String fileName = dis.readUTF();
					long fileLength = dis.readLong();
					String dir = PATH + fileName;
					File file = new File(dir);
					if (!file.exists()) {
						file.createNewFile();
					}
					byte[] buf = new byte[BUFFER_SIZE];
					int read = 0;
					System.out.println("Begin to receive file(" + fileName
							+ "),size(" + fileLength + "B)...");
					bout = new BufferedOutputStream(new FileOutputStream(file));
					while ((read = dis.read(buf)) != -1) {
						sumL += read;
						bout.write(buf, 0, read);
						bout.flush();
					}
					bout.close();
					dis.close();
					if (fileLength == sumL) {
						System.out.println("Received over!The save path:"
								+ file.getPath());
						w.println("Received over!The save path:"
								+ file.getPath());
						w.flush();
						boolean isUnZip = unZip(file, file.getParent());// unzip the file
						if (isUnZip) {
							String result = start();
							w.println(result);
							w.flush();
						} else {
							w.println("Failed");
							w.flush();
						}
					}
					flag = true;
				} else if (RESTART.equalsIgnoreCase(command)) {
					String result = FAILED_MSG;
					if (stop()) {
						result = start();
					}
					w.println(result);
					w.flush();
				} else if (STOP.equalsIgnoreCase(command)) {
					boolean b = stop();
					String result = FAILED_MSG;
					if (b)
						result = SUCCESS_MSG;
					w.println(result);
					w.flush();
				}
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				try {
					if (w != null)
						w.close();
					if (dis != null)
						dis.close();
					if (bout != null)
						bout.close();
					if (s != null)
						s.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

		/**
		 * Unzips a file.
		 * 
		 * @param file
		 *            the zip file to extract
		 * @param destDir
		 *            the directory to extract into
		 */
		private boolean unZip(File file, String destDir) {
			if (destDir == null)
				destDir = file.getParent();
			OutputStream os = null;
			InputStream is = null;
			if (!file.isFile() || !file.getName().endsWith(".zip")) {
				System.out.println("The file cannot unzip");
				return false;
			} else {
				try {
					int length;
					byte b[] = new byte[1024];
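					// Use the platform separator so the path is valid on both Windows and Linux.
					destDir = destDir.endsWith(File.separator) ? destDir : destDir + File.separator;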
					ZipFile zipFile = new ZipFile(file);
					Enumeration<?> enumeration = zipFile.entries();
					ZipEntry zipEntry = null;
					System.out.println("Begin to unzip the file...");
					while (enumeration.hasMoreElements()) {
						zipEntry = (ZipEntry) enumeration.nextElement();
						File loadFile = new File(destDir + zipEntry.getName());
						// Check whether this entry is a directory or a file
						if (zipEntry.isDirectory()) {// directory entry: create it if missing, so that empty folders survive extraction
							if (!loadFile.exists()) {
								loadFile.mkdirs();
							}
						} else {
							if (!loadFile.getParentFile().exists()) {
								loadFile.getParentFile().mkdirs();
							}
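							os = new FileOutputStream(loadFile);
							is = zipFile.getInputStream(zipEntry);
							while ((length = is.read(b)) > 0) {
								os.write(b, 0, length);
								os.flush();
							}
							// Close this entry's streams before moving on, so none are leaked
							// and an archive without file entries does not cause a NullPointerException.
							is.close();
							os.close();
						}

					}
					zipFile.close();
					file.delete();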
					System.out.println("unzip successed");
					return true;
				} catch (Exception e) {
					e.printStackTrace();
					return false;
				}
			}
		}
	}
}
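`unZip` builds each output path by string concatenation of `destDir` and the entry name, so an entry whose name contains `..` can land outside the target directory. A possible alternative, sketched here with a hypothetical `ZipPaths.resolveEntry` helper that is not part of the original project, resolves the entry with `java.nio.file.Path` and rejects anything that escapes the destination:

```java
import java.nio.file.Path;

// Hypothetical helper: turns a zip entry name into a safe path below the destination directory.
class ZipPaths {

	static Path resolveEntry(Path destDir, String entryName) {
		Path base = destDir.toAbsolutePath().normalize();
		// normalize() collapses "." and ".." segments so the check below sees the real target.
		Path target = base.resolve(entryName).normalize();
		if (!target.startsWith(base)) {
			throw new IllegalArgumentException("entry escapes target directory: " + entryName);
		}
		return target;
	}
}
```

Inside the extraction loop, `new File(destDir + zipEntry.getName())` could then become `ZipPaths.resolveEntry(Paths.get(destDir), zipEntry.getName()).toFile()`, which also removes the need to append a separator by hand.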

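At this point the requirements above are basically met. Next comes the server-side UI.

It uses Spring MVC; since I do not know it well, the code is kept very simple.

The overall architecture is Spring MVC + MyBatis, the database is DB2, and the front end uses EasyUI 1.4 (very powerful; I plan to learn more about it).

The whole project, including the table creation statements and the client implementation, is available for download: http://download.csdn.net/detail/tiantang_1986/8142325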
