公司有个需求,为了减少运维人员的工作量,需要开发一个远程部署flume的工具。我这里使用比较方便安全的SFTP协议,前提是需要客户端安装有SSH。Unix、linux、aix系统基本默认安装,windows的要自行安装。
其实在windows系统中还可以使用SMB文件共享协议来实现文件传输,还有比较通用的FTP协议、Telnet协议,但这里我们选用比较安全,操作方便的SFTP协议,其他的,我在下面也贴出来。
首先下载依赖jar包:jsch-0.1.52.jar。官网:http://www.jcraft.com/jsch/
然后编写工具类,具体看代码:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177package com.mysite.stfp; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.Map; import java.util.Properties; import java.util.Vector; import org.apache.log4j.Logger; import com.jcraft.jsch.Channel; import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.ChannelSftp.LsEntry; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.SftpException; public class SFTPChannel { private static final Logger LOG = Logger.getLogger(SFTPChannel.class.getName()); private Session session = null; private ChannelSftp sftp = null; private ChannelExec exec = null; public SFTPChannel(String host, int port, String userName, String password, int timeout) { try { JSch jsch = new JSch(); // 创建JSch对象 session = jsch.getSession(userName, host, port);// 根据用户名,主机ip,端口获取一个Session对象 LOG.debug("Session created."); if (password != null) { session.setPassword(password); // 设置密码 } Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); session.setConfig(config); // 为Session对象设置properties session.setTimeout(timeout); // 设置timeout时间 session.connect(); // 通过Session建立链接 LOG.debug("Session connected."); } catch (JSchException e) { e.printStackTrace(); } } public SFTPChannel(Map<String, String> map, int timeout) { try { String host = map.get("host").toString(); int port = Integer.parseInt(map.get("port") + ""); String userName = map.get("userName").toString(); String password = map.get("password").toString(); JSch jsch = new JSch(); // 创建JSch对象 session = jsch.getSession(userName, host, port);// 根据用户名,主机ip,端口获取一个Session对象 LOG.debug("Session created."); if (password != null) { session.setPassword(password); // 设置密码 } Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); session.setConfig(config); // 为Session对象设置properties session.setTimeout(timeout); // 设置timeout时间 session.connect(); // 通过Session建立链接 LOG.debug("Session connected."); } catch (JSchException e) { e.printStackTrace(); } } /** * 打开SFTP通道 */ public ChannelSftp getSftp() { LOG.debug("Opening Channel."); try { Channel channel = session.openChannel("sftp");// 打开SFTP通道 channel.connect(); // 建立SFTP通道的连接 sftp = (ChannelSftp) channel; } catch (JSchException e) { e.printStackTrace(); } return sftp; } /** * 打开exec通道 */ public ChannelExec getExec() { LOG.debug("Opening Channel."); try { Channel channel = session.openChannel("exec");// 打开SFTP通道 channel.setInputStream(null); channel.connect(); // 建立SFTP通道的连接 exec = (ChannelExec) channel; } catch (JSchException e) { e.printStackTrace(); } return exec; } public void execCmd(String command) throws JSchException { BufferedReader reader = null; Channel channel = null; try { channel = session.openChannel("exec"); ((ChannelExec) channel).setCommand(command); channel.setInputStream(null); ((ChannelExec) channel).setErrStream(System.err); channel.connect(); reader = new BufferedReader(new InputStreamReader(channel.getInputStream())); String buf = null; while ((buf = reader.readLine()) != null) { System.out.println(buf); } } catch (IOException e) { e.printStackTrace(); } catch (JSchException e) { e.printStackTrace(); } finally { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } channel.disconnect(); } } public void pwd() throws SftpException { LOG.info(sftp.pwd()); } public void ls() throws SftpException { ls("."); } public void ls(String path) throws SftpException { Vector<?> vector = sftp.ls(path); for (Object object : vector) { if (object instanceof LsEntry) { LsEntry entry = LsEntry.class.cast(object); LOG.info(entry.getFilename()); } } } public void closeChannel() { if (sftp != null){ sftp.quit(); sftp.disconnect(); } if (exec != null) exec.disconnect(); if (session != null) { session.disconnect(); } } public void closeSftp() { if (sftp != null){ sftp.quit(); sftp.disconnect(); } } public void closeExec() { if (exec != null) exec.disconnect(); } }
然后是调用方法:
复制代码
下面是重写的文件传输的百分比统计:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54@RequestMapping(value = "/sftp.do") public String uploadBySftp(HttpServletRequest request, HttpServletResponse response) { String result = "handle failed!"; SFTPChannel channel = null; try { if (logger.isDebugEnabled()) { logger.debug("execute uploadBySftp method..."); } Map<String, String> params = getParas(request); String path = params.get("path").toString(); String os = params.get("os").toString(); String userName = params.get("userName").toString(); params.put("userName", decoder(userName)); String password = params.get("password").toString(); params.put("password", decoder(password)); channel = new SFTPChannel(params, 60000); ChannelSftp chSftp = channel.getSftp(); try { channel.ls(path); } catch (Exception e) { chSftp.mkdir(path); } File file = new File(src); long fileSize = file.length(); chSftp.put(src, path, new FileProgressMonitor(fileSize), ChannelSftp.OVERWRITE); String fileName = file.getName(); if (file.isFile() && (fileName.endsWith(".zip") || fileName.endsWith(".ZIP"))) channel.execCmd("unzip -o " + path + fileName + " -d " + path); else if (file.isFile() && (fileName.endsWith(".tar") || fileName.endsWith(".tar.gz"))) channel.execCmd("tar -zxvf " + path + fileName); chSftp.rm(path + file.getName()); if (!"windows".equals(os)) { channel.execCmd(path + "flume/bin/flume.sh"); } else { channel.execCmd(path + "flume\bin\start.bat"); } result = "success"; return result; } catch (SftpException e) { logger.error("has a error:{}", e.getMessage()); e.printStackTrace(); result = "file create failed"; return result; } catch (JSchException e) { logger.error("has a error:{}", e.getMessage()); e.printStackTrace(); return result; } finally { if (channel != null) channel.closeChannel(); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128package com.mysite.stfp; import java.text.DecimalFormat; import java.util.Timer; import java.util.TimerTask; import com.jcraft.jsch.SftpProgressMonitor; public class FileProgressMonitor extends TimerTask implements SftpProgressMonitor { private long progressInterval = 1000; // 默认间隔时间为1秒 private boolean isEnd = false; // 记录传输是否结束 private long transfered; // 记录已传输的数据总大小 private long fileSize; // 记录文件总大小 private Timer timer; // 定时器对象 private boolean isScheduled = false; // 记录是否已启动timer记时器 public FileProgressMonitor(long fileSize) { this.fileSize = fileSize; } @Override public void run() { if (!isEnd()) { // 判断传输是否已结束 //System.out.println("Transfering is in progress."); long transfered = getTransfered(); if (transfered != fileSize) { // 判断当前已传输数据大小是否等于文件总大小 //System.out.println("Current transfered: " + transfered + " bytes"); sendProgressMessage(transfered); } else { System.out.println("File transfering is done."); setEnd(true); // 如果当前已传输数据大小等于文件总大小,说明已完成,设置end } } else { //System.out.println("Transfering done. Cancel timer."); stop(); // 如果传输结束,停止timer记时器 return; } } public void stop() { //System.out.println("Try to stop progress monitor."); if (timer != null) { timer.cancel(); timer.purge(); timer = null; isScheduled = false; } System.out.println("Progress monitor stoped."); } public void start() { //System.out.println("Try to start progress monitor."); if (timer == null) { timer = new Timer(); } timer.schedule(this, 1000, progressInterval); isScheduled = true; System.out.println("Progress monitor started."); } /** * 打印progress信息 * * @param transfered */ private void sendProgressMessage(long transfered) { if (fileSize != 0) { double d = ((double) transfered * 100) / (double) fileSize; DecimalFormat df = new DecimalFormat("#.##"); System.out.println("Sending progress message: " + df.format(d) + "%"); } else { System.out.println("Sending progress message: " + transfered); } } /** * 实现了SftpProgressMonitor接口的count方法 */ public boolean count(long count) { if (isEnd()) return false; if (!isScheduled) { start(); } add(count); return true; } /** * 实现了SftpProgressMonitor接口的end方法 */ public void end() { setEnd(true); System.out.println("transfering end."); } private synchronized void add(long count) { transfered = transfered + count; } private synchronized long getTransfered() { return transfered; } public synchronized void setTransfered(long transfered) { this.transfered = transfered; } private synchronized void setEnd(boolean isEnd) { this.isEnd = isEnd; } private synchronized boolean isEnd() { return isEnd; } public void init(int op, String src, String dest, long max) { // Not used for putting InputStream } }
------------------------------------------------------------------------------------------------
以上是SFTP协议的一些方法,下面介绍smb协议操作,依赖的包是jcifs-1.3.18.jar:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128package com.mysite.smb; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import jcifs.smb.SmbFile; import jcifs.smb.SmbFileInputStream; import jcifs.smb.SmbFileOutputStream; /** * * @author T430 * */ public class SmbUtil { /** * 方法一: * * @param remoteUrl * 远程路径 smb://192.168.75.204/test/新建 文本文档.txt * @throws IOException */ public static void smbGet(String remoteUrl) throws IOException { SmbFileInputStream in = null; try { SmbFile smbFile = new SmbFile(remoteUrl); int length = smbFile.getContentLength();// 得到文件的大小 byte buffer[] = new byte[length]; in = new SmbFileInputStream(smbFile); // 建立smb文件输入流 while ((in.read(buffer)) != -1) { System.out.write(buffer); System.out.println(buffer.length); } } catch (IOException e) { throw new IOException(e); } finally { try { if (in != null) in.close(); } catch (IOException e) { e.printStackTrace(); } } } // 从共享目录下载文件 /** * 方法二: 路径格式:smb://192.168.75.204/test/新建 文本文档.txt * smb://username:password@192.168.0.77/test * * @param remoteUrl * 远程路径 * @param localDir * 要写入的本地路径 * @throws Exception */ public static void smbGet(String remoteUrl, String localDir) throws IOException { InputStream in = null; OutputStream out = null; try { SmbFile remoteFile = new SmbFile(remoteUrl); String fileName = remoteFile.getName(); File localFile = new File(localDir + File.separator + fileName); in = new BufferedInputStream(new SmbFileInputStream(remoteFile)); out = new BufferedOutputStream(new FileOutputStream(localFile)); byte[] buffer = new byte[1024]; while (in.read(buffer) != -1) { out.write(buffer); buffer = new byte[1024]; } } catch (IOException e) { throw new IOException(e); } finally { try { if (out != null) out.close(); if (in != null) in.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 向共享目录上传文件 远程url smb://192.168.1.77/test 如果需要用户名密码就这样: * smb://username:password@192.168.1.77/test * * @param remoteUrl * @param localFilePath * @throws Exception */ public static void smbPut(String remoteUrl, String localFilePath) throws IOException { InputStream in = null; OutputStream out = null; try { File localFile = new File(localFilePath); String fileName = localFile.getName(); SmbFile remoteFile = new SmbFile(remoteUrl + "/" + fileName); in = new BufferedInputStream(new FileInputStream(localFile)); out = new BufferedOutputStream(new SmbFileOutputStream(remoteFile)); byte[] buffer = new byte[1024]; while (in.read(buffer) != -1) { out.write(buffer); buffer = new byte[1024]; } } catch (IOException e) { throw new IOException(e); } finally { try { if (out != null) out.close(); if (in != null) in.close(); } catch (IOException e) { e.printStackTrace(); } } } }
FTP的操作,依赖包commons-net-3.3.jar:
复制代码
Telnet协议,依赖包commons-net-3.3.jar:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163package com.mysite.ftp; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPReply; public class FtpUtil { private static FTPClient ftp; /** * FTP上传 * * @param map * @param src */ public static void upload(Map<String, Object> map, String src) { String host = map.get("host").toString(); String userName = map.get("userName").toString(); String password = map.get("password").toString(); String path = map.get("path") == null ? "" : map.get("path").toString(); String port = map.get("port") == null ? "21" : map.get("port").toString(); upload(host, Integer.parseInt(port), userName, password, path, src); } /** * FTP上传 * * @param host * @param userName * @param password * @param path * @param src */ public static void upload(String host, int port, String userName, String password, String path, String src) { try { boolean isConnect = connect(path, host, port, userName, password); if(isConnect){ File file = new File(src); upload(file); } } catch (IOException e) { e.printStackTrace(); throw new RuntimeException("FTP客户端出错!", e); } finally { try { ftp.disconnect(); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException("关闭FTP连接发生异常!", e); } } } /** * * @param path * 上传到ftp服务器哪个路径下 * @param addr * 地址 * @param port * 端口号 * @param username * 用户名 * @param password * 密码 * @return * @throws Exception */ private static boolean connect(String path, String addr, int port, String username, String password) throws IOException { boolean result = false; ftp = new FTPClient(); int reply; ftp.connect(addr, port); ftp.login(username, password); ftp.setFileType(FTPClient.BINARY_FILE_TYPE); reply = ftp.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftp.disconnect(); return result; } ftp.changeWorkingDirectory(path); ftp.setBufferSize(1024); ftp.setControlEncoding("GBK"); result = true; return result; } /** * * @param file * 上传的文件或文件夹 * @throws Exception */ private static void upload(File file) throws IOException { if (file.isDirectory()) { ftp.makeDirectory(file.getName()); ftp.changeWorkingDirectory(file.getName()); String[] files = file.list(); for (int i = 0; i < files.length; i++) { File file1 = new File(file.getPath() + "\" + files[i]); if (file1.isDirectory()) { upload(file1); ftp.changeToParentDirectory(); } else { File file2 = new File(file.getPath() + "\" + files[i]); FileInputStream input = new FileInputStream(file2); ftp.storeFile(file2.getName(), input); input.close(); } } } else { File file2 = new File(file.getPath()); FileInputStream input = new FileInputStream(file2); ftp.storeFile(file2.getName(), input); input.close(); } } /** * FTP下载 * * @param host * @param userName * @param password * @param path * @param src */ public static void download(String host, String userName, String password, String path, String src) { FTPClient ftpClient = new FTPClient(); FileOutputStream fos = null; try { ftpClient.connect(host); ftpClient.login(userName, password); String remoteFileName = path; fos = new FileOutputStream(src); ftpClient.setBufferSize(1024); // 设置文件类型(二进制) ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE); ftpClient.retrieveFile(remoteFileName, fos); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException("FTP客户端出错!", e); } finally { IOUtils.closeQuietly(fos); try { ftpClient.disconnect(); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException("关闭FTP连接发生异常!", e); } } } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158package com.mysite.telnet; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.SocketException; import org.apache.commons.net.telnet.TelnetClient; public class TelnetUtil { private static InputStream in; private static PrintStream out; private static String prompt = "#"; private static String os = "windows"; public static void handle(String host, int port, String os, String user, String pwd) { TelnetClient telnet = null; try { if ("windows".equals(os)) { telnet = new TelnetClient("VT220"); prompt = ""; } else { TelnetUtil.os = os; telnet = new TelnetClient(); prompt = user.equals("root") ? "#" : "$"; } telnet.connect(host, port); telnet.setSoTimeout(60000); in = telnet.getInputStream(); out = new PrintStream(telnet.getOutputStream()); login(user, pwd); //sendCommand("cd /d "D:/ftp/flume/bin""); //sendCommand("start "" "D:/ftp/flume/bin/startCWAgen.bat""); sendCommand("java -jar "D:/ftp/flume/lib/controller.jar" "D:/ftp/flume/lib/config.properties""); readUntil(); } catch (SocketException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { if (telnet != null) telnet.disconnect(); } catch (IOException e) { e.printStackTrace(); try { throw new Exception("telnet 关闭失败"); } catch (Exception e1) { e1.printStackTrace(); } } } } public static void handle(String host, String os, String user, String pwd) { int port = 23; handle(host, port, os, user, pwd); } private static void login(String user, String password) { readUntil("login:"); write(user); if ("windows".equals(os)) { readUntil("password:"); } else { readUntil("Password:"); } write(password); readUntil(prompt + " "); } private static void su(String password) { try { write("su"); readUntil("Password: "); write(password); prompt = "#"; readUntil(prompt + " "); } catch (Exception e) { e.printStackTrace(); } } private static void write(String value) { try { out.println(value); out.flush(); System.out.println(value); } catch (Exception e) { e.printStackTrace(); } } public static String sendCommand(String command) { try { write(command); return readUntil(prompt + " "); } catch (Exception e) { e.printStackTrace(); } return null; } private static String readUntil() { InputStreamReader isr = null; BufferedReader br = null; try { //StringBuffer sb = new StringBuffer(); isr = new InputStreamReader(in, "GBK"); br = new BufferedReader(isr); int str = 0; while ((str = br.read()) != -1) { //sb.append((char) str); System.out.print((char) str); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (isr != null) isr.close(); if (br != null) br.close(); } catch (Exception e) { e.printStackTrace(); } } return null; } private static String readUntil(String pattern) { StringBuffer sb = new StringBuffer(); try { char lastChar = pattern.charAt(pattern.length() - 1); char ch = (char) in.read(); while (true) { sb.append(ch); if (ch == lastChar) { if (sb.toString().endsWith(pattern)) { byte[] temp = sb.toString().getBytes("iso8859-1");// 处理编码,界面显示乱码问题 return new String(temp, "GBK"); } } ch = (char) in.read(); } } catch (Exception e) { e.printStackTrace(); } return sb.toString(); } public static void main(String[] args) { TelnetUtil.handle("127.0.0.1", "windows", "bob", "123"); } }
最后
以上就是超帅帅哥最近收集整理的关于使用JSch远程部署flume采集点的全部内容,更多相关使用JSch远程部署flume采集点内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复