概述
公司有个需求,为了减少运维人员的工作量,需要开发一个远程部署flume的工具。我这里使用比较方便安全的SFTP协议,前提是需要客户端安装有SSH。Unix、linux、aix系统基本默认安装,windows的要自行安装。
其实在windows系统中还可以使用SMB文件共享协议来实现文件传输,还有比较通用的FTP协议、Telnet协议,但这里我们选用比较安全,操作方便的SFTP协议,其他的,我在下面也贴出来。
首先下载依赖jar包:jsch-0.1.52.jar。官网:http://www.jcraft.com/jsch/
然后编写工具类,具体看代码:
package com.mysite.stfp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
import org.apache.log4j.Logger;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.ChannelSftp.LsEntry;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
public class SFTPChannel {
private static final Logger LOG = Logger.getLogger(SFTPChannel.class.getName());
private Session session = null;
private ChannelSftp sftp = null;
private ChannelExec exec = null;
public SFTPChannel(String host, int port, String userName, String password, int timeout) {
try {
JSch jsch = new JSch(); // 创建JSch对象
session = jsch.getSession(userName, host, port);// 根据用户名,主机ip,端口获取一个Session对象
LOG.debug("Session created.");
if (password != null) {
session.setPassword(password); // 设置密码
}
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config); // 为Session对象设置properties
session.setTimeout(timeout); // 设置timeout时间
session.connect(); // 通过Session建立链接
LOG.debug("Session connected.");
} catch (JSchException e) {
e.printStackTrace();
}
}
public SFTPChannel(Map<String, String> map, int timeout) {
try {
String host = map.get("host").toString();
int port = Integer.parseInt(map.get("port") + "");
String userName = map.get("userName").toString();
String password = map.get("password").toString();
JSch jsch = new JSch(); // 创建JSch对象
session = jsch.getSession(userName, host, port);// 根据用户名,主机ip,端口获取一个Session对象
LOG.debug("Session created.");
if (password != null) {
session.setPassword(password); // 设置密码
}
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config); // 为Session对象设置properties
session.setTimeout(timeout); // 设置timeout时间
session.connect(); // 通过Session建立链接
LOG.debug("Session connected.");
} catch (JSchException e) {
e.printStackTrace();
}
}
/**
* 打开SFTP通道
*/
public ChannelSftp getSftp() {
LOG.debug("Opening Channel.");
try {
Channel channel = session.openChannel("sftp");// 打开SFTP通道
channel.connect(); // 建立SFTP通道的连接
sftp = (ChannelSftp) channel;
} catch (JSchException e) {
e.printStackTrace();
}
return sftp;
}
/**
* 打开exec通道
*/
public ChannelExec getExec() {
LOG.debug("Opening Channel.");
try {
Channel channel = session.openChannel("exec");// 打开SFTP通道
channel.setInputStream(null);
channel.connect(); // 建立SFTP通道的连接
exec = (ChannelExec) channel;
} catch (JSchException e) {
e.printStackTrace();
}
return exec;
}
public void execCmd(String command) throws JSchException {
BufferedReader reader = null;
Channel channel = null;
try {
channel = session.openChannel("exec");
((ChannelExec) channel).setCommand(command);
channel.setInputStream(null);
((ChannelExec) channel).setErrStream(System.err);
channel.connect();
reader = new BufferedReader(new InputStreamReader(channel.getInputStream()));
String buf = null;
while ((buf = reader.readLine()) != null) {
System.out.println(buf);
}
} catch (IOException e) {
e.printStackTrace();
} catch (JSchException e) {
e.printStackTrace();
} finally {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
channel.disconnect();
}
}
public void pwd() throws SftpException {
LOG.info(sftp.pwd());
}
public void ls() throws SftpException {
ls(".");
}
public void ls(String path) throws SftpException {
Vector<?> vector = sftp.ls(path);
for (Object object : vector) {
if (object instanceof LsEntry) {
LsEntry entry = LsEntry.class.cast(object);
LOG.info(entry.getFilename());
}
}
}
public void closeChannel() {
if (sftp != null){
sftp.quit();
sftp.disconnect();
}
if (exec != null)
exec.disconnect();
if (session != null) {
session.disconnect();
}
}
public void closeSftp() {
if (sftp != null){
sftp.quit();
sftp.disconnect();
}
}
public void closeExec() {
if (exec != null)
exec.disconnect();
}
}
然后是调用方法:
@RequestMapping(value = "/sftp.do")
public String uploadBySftp(HttpServletRequest request, HttpServletResponse response) {
String result = "handle failed!";
SFTPChannel channel = null;
try {
if (logger.isDebugEnabled()) {
logger.debug("execute uploadBySftp method...");
}
Map<String, String> params = getParas(request);
String path = params.get("path").toString();
String os = params.get("os").toString();
String userName = params.get("userName").toString();
params.put("userName", decoder(userName));
String password = params.get("password").toString();
params.put("password", decoder(password));
channel = new SFTPChannel(params, 60000);
ChannelSftp chSftp = channel.getSftp();
try {
channel.ls(path);
} catch (Exception e) {
chSftp.mkdir(path);
}
File file = new File(src);
long fileSize = file.length();
chSftp.put(src, path, new FileProgressMonitor(fileSize), ChannelSftp.OVERWRITE);
String fileName = file.getName();
if (file.isFile() && (fileName.endsWith(".zip") || fileName.endsWith(".ZIP")))
channel.execCmd("unzip -o " + path + fileName + " -d " + path);
else if (file.isFile() && (fileName.endsWith(".tar") || fileName.endsWith(".tar.gz")))
channel.execCmd("tar -zxvf " + path + fileName);
chSftp.rm(path + file.getName());
if (!"windows".equals(os)) {
channel.execCmd(path + "flume/bin/flume.sh");
} else {
channel.execCmd(path + "flume\bin\start.bat");
}
result = "success";
return result;
} catch (SftpException e) {
logger.error("has a error:{}", e.getMessage());
e.printStackTrace();
result = "file create failed";
return result;
} catch (JSchException e) {
logger.error("has a error:{}", e.getMessage());
e.printStackTrace();
return result;
} finally {
if (channel != null)
channel.closeChannel();
}
}
下面是重写的文件传输的百分比统计:
package com.mysite.stfp;
import java.text.DecimalFormat;
import java.util.Timer;
import java.util.TimerTask;
import com.jcraft.jsch.SftpProgressMonitor;
public class FileProgressMonitor extends TimerTask implements SftpProgressMonitor {
private long progressInterval = 1000; // 默认间隔时间为1秒
private boolean isEnd = false; // 记录传输是否结束
private long transfered; // 记录已传输的数据总大小
private long fileSize; // 记录文件总大小
private Timer timer; // 定时器对象
private boolean isScheduled = false; // 记录是否已启动timer记时器
public FileProgressMonitor(long fileSize) {
this.fileSize = fileSize;
}
@Override
public void run() {
if (!isEnd()) { // 判断传输是否已结束
//System.out.println("Transfering is in progress.");
long transfered = getTransfered();
if (transfered != fileSize) { // 判断当前已传输数据大小是否等于文件总大小
//System.out.println("Current transfered: " + transfered + " bytes");
sendProgressMessage(transfered);
} else {
System.out.println("File transfering is done.");
setEnd(true); // 如果当前已传输数据大小等于文件总大小,说明已完成,设置end
}
} else {
//System.out.println("Transfering done. Cancel timer.");
stop(); // 如果传输结束,停止timer记时器
return;
}
}
public void stop() {
//System.out.println("Try to stop progress monitor.");
if (timer != null) {
timer.cancel();
timer.purge();
timer = null;
isScheduled = false;
}
System.out.println("Progress monitor stoped.");
}
public void start() {
//System.out.println("Try to start progress monitor.");
if (timer == null) {
timer = new Timer();
}
timer.schedule(this, 1000, progressInterval);
isScheduled = true;
System.out.println("Progress monitor started.");
}
/**
* 打印progress信息
*
* @param transfered
*/
private void sendProgressMessage(long transfered) {
if (fileSize != 0) {
double d = ((double) transfered * 100) / (double) fileSize;
DecimalFormat df = new DecimalFormat("#.##");
System.out.println("Sending progress message: " + df.format(d) + "%");
} else {
System.out.println("Sending progress message: " + transfered);
}
}
/**
* 实现了SftpProgressMonitor接口的count方法
*/
public boolean count(long count) {
if (isEnd())
return false;
if (!isScheduled) {
start();
}
add(count);
return true;
}
/**
* 实现了SftpProgressMonitor接口的end方法
*/
public void end() {
setEnd(true);
System.out.println("transfering end.");
}
private synchronized void add(long count) {
transfered = transfered + count;
}
private synchronized long getTransfered() {
return transfered;
}
public synchronized void setTransfered(long transfered) {
this.transfered = transfered;
}
private synchronized void setEnd(boolean isEnd) {
this.isEnd = isEnd;
}
private synchronized boolean isEnd() {
return isEnd;
}
public void init(int op, String src, String dest, long max) {
// Not used for putting InputStream
}
}
------------------------------------------------------------------------------------------------
以上是SFTP协议的一些方法,下面介绍smb协议操作,依赖的包是jcifs-1.3.18.jar:
package com.mysite.smb;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import jcifs.smb.SmbFile;
import jcifs.smb.SmbFileInputStream;
import jcifs.smb.SmbFileOutputStream;
/**
*
* @author T430
*
*/
public class SmbUtil {
/**
* 方法一:
*
* @param remoteUrl
* 远程路径 smb://192.168.75.204/test/新建 文本文档.txt
* @throws IOException
*/
public static void smbGet(String remoteUrl) throws IOException {
SmbFileInputStream in = null;
try {
SmbFile smbFile = new SmbFile(remoteUrl);
int length = smbFile.getContentLength();// 得到文件的大小
byte buffer[] = new byte[length];
in = new SmbFileInputStream(smbFile);
// 建立smb文件输入流
while ((in.read(buffer)) != -1) {
System.out.write(buffer);
System.out.println(buffer.length);
}
} catch (IOException e) {
throw new IOException(e);
} finally {
try {
if (in != null)
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 从共享目录下载文件
/**
* 方法二: 路径格式:smb://192.168.75.204/test/新建 文本文档.txt
* smb://username:password@192.168.0.77/test
*
* @param remoteUrl
* 远程路径
* @param localDir
* 要写入的本地路径
* @throws Exception
*/
public static void smbGet(String remoteUrl, String localDir) throws IOException {
InputStream in = null;
OutputStream out = null;
try {
SmbFile remoteFile = new SmbFile(remoteUrl);
String fileName = remoteFile.getName();
File localFile = new File(localDir + File.separator + fileName);
in = new BufferedInputStream(new SmbFileInputStream(remoteFile));
out = new BufferedOutputStream(new FileOutputStream(localFile));
byte[] buffer = new byte[1024];
while (in.read(buffer) != -1) {
out.write(buffer);
buffer = new byte[1024];
}
} catch (IOException e) {
throw new IOException(e);
} finally {
try {
if (out != null)
out.close();
if (in != null)
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 向共享目录上传文件 远程url smb://192.168.1.77/test 如果需要用户名密码就这样:
* smb://username:password@192.168.1.77/test
*
* @param remoteUrl
* @param localFilePath
* @throws Exception
*/
public static void smbPut(String remoteUrl, String localFilePath) throws IOException {
InputStream in = null;
OutputStream out = null;
try {
File localFile = new File(localFilePath);
String fileName = localFile.getName();
SmbFile remoteFile = new SmbFile(remoteUrl + "/" + fileName);
in = new BufferedInputStream(new FileInputStream(localFile));
out = new BufferedOutputStream(new SmbFileOutputStream(remoteFile));
byte[] buffer = new byte[1024];
while (in.read(buffer) != -1) {
out.write(buffer);
buffer = new byte[1024];
}
} catch (IOException e) {
throw new IOException(e);
} finally {
try {
if (out != null)
out.close();
if (in != null)
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
FTP的操作,依赖包commons-net-3.3.jar:
package com.mysite.ftp;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
public class FtpUtil {
private static FTPClient ftp;
/**
* FTP上传
*
* @param map
* @param src
*/
public static void upload(Map<String, Object> map, String src) {
String host = map.get("host").toString();
String userName = map.get("userName").toString();
String password = map.get("password").toString();
String path = map.get("path") == null ? "" : map.get("path").toString();
String port = map.get("port") == null ? "21" : map.get("port").toString();
upload(host, Integer.parseInt(port), userName, password, path, src);
}
/**
* FTP上传
*
* @param host
* @param userName
* @param password
* @param path
* @param src
*/
public static void upload(String host, int port, String userName, String password, String path, String src) {
try {
boolean isConnect = connect(path, host, port, userName, password);
if(isConnect){
File file = new File(src);
upload(file);
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("FTP客户端出错!", e);
} finally {
try {
ftp.disconnect();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("关闭FTP连接发生异常!", e);
}
}
}
/**
*
* @param path
* 上传到ftp服务器哪个路径下
* @param addr
* 地址
* @param port
* 端口号
* @param username
* 用户名
* @param password
* 密码
* @return
* @throws Exception
*/
private static boolean connect(String path, String addr, int port, String username, String password) throws IOException {
boolean result = false;
ftp = new FTPClient();
int reply;
ftp.connect(addr, port);
ftp.login(username, password);
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftp.disconnect();
return result;
}
ftp.changeWorkingDirectory(path);
ftp.setBufferSize(1024);
ftp.setControlEncoding("GBK");
result = true;
return result;
}
/**
*
* @param file
* 上传的文件或文件夹
* @throws Exception
*/
private static void upload(File file) throws IOException {
if (file.isDirectory()) {
ftp.makeDirectory(file.getName());
ftp.changeWorkingDirectory(file.getName());
String[] files = file.list();
for (int i = 0; i < files.length; i++) {
File file1 = new File(file.getPath() + "\" + files[i]);
if (file1.isDirectory()) {
upload(file1);
ftp.changeToParentDirectory();
} else {
File file2 = new File(file.getPath() + "\" + files[i]);
FileInputStream input = new FileInputStream(file2);
ftp.storeFile(file2.getName(), input);
input.close();
}
}
} else {
File file2 = new File(file.getPath());
FileInputStream input = new FileInputStream(file2);
ftp.storeFile(file2.getName(), input);
input.close();
}
}
/**
* FTP下载
*
* @param host
* @param userName
* @param password
* @param path
* @param src
*/
public static void download(String host, String userName, String password, String path, String src) {
FTPClient ftpClient = new FTPClient();
FileOutputStream fos = null;
try {
ftpClient.connect(host);
ftpClient.login(userName, password);
String remoteFileName = path;
fos = new FileOutputStream(src);
ftpClient.setBufferSize(1024);
// 设置文件类型(二进制)
ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
ftpClient.retrieveFile(remoteFileName, fos);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("FTP客户端出错!", e);
} finally {
IOUtils.closeQuietly(fos);
try {
ftpClient.disconnect();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("关闭FTP连接发生异常!", e);
}
}
}
}
Telnet协议,依赖包commons-net-3.3.jar:
package com.mysite.telnet;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.SocketException;
import org.apache.commons.net.telnet.TelnetClient;
public class TelnetUtil {
private static InputStream in;
private static PrintStream out;
private static String prompt = "#";
private static String os = "windows";
public static void handle(String host, int port, String os, String user, String pwd) {
TelnetClient telnet = null;
try {
if ("windows".equals(os)) {
telnet = new TelnetClient("VT220");
prompt = "";
} else {
TelnetUtil.os = os;
telnet = new TelnetClient();
prompt = user.equals("root") ? "#" : "$";
}
telnet.connect(host, port);
telnet.setSoTimeout(60000);
in = telnet.getInputStream();
out = new PrintStream(telnet.getOutputStream());
login(user, pwd);
//sendCommand("cd /d "D:/ftp/flume/bin"");
//sendCommand("start "" "D:/ftp/flume/bin/startCWAgen.bat"");
sendCommand("java -jar "D:/ftp/flume/lib/controller.jar" "D:/ftp/flume/lib/config.properties"");
readUntil();
} catch (SocketException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (telnet != null)
telnet.disconnect();
} catch (IOException e) {
e.printStackTrace();
try {
throw new Exception("telnet 关闭失败");
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public static void handle(String host, String os, String user, String pwd) {
int port = 23;
handle(host, port, os, user, pwd);
}
private static void login(String user, String password) {
readUntil("login:");
write(user);
if ("windows".equals(os)) {
readUntil("password:");
} else {
readUntil("Password:");
}
write(password);
readUntil(prompt + " ");
}
private static void su(String password) {
try {
write("su");
readUntil("Password: ");
write(password);
prompt = "#";
readUntil(prompt + " ");
} catch (Exception e) {
e.printStackTrace();
}
}
private static void write(String value) {
try {
out.println(value);
out.flush();
System.out.println(value);
} catch (Exception e) {
e.printStackTrace();
}
}
public static String sendCommand(String command) {
try {
write(command);
return readUntil(prompt + " ");
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private static String readUntil() {
InputStreamReader isr = null;
BufferedReader br = null;
try {
//StringBuffer sb = new StringBuffer();
isr = new InputStreamReader(in, "GBK");
br = new BufferedReader(isr);
int str = 0;
while ((str = br.read()) != -1) {
//sb.append((char) str);
System.out.print((char) str);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (isr != null)
isr.close();
if (br != null)
br.close();
} catch (Exception e) {
e.printStackTrace();
}
}
return null;
}
private static String readUntil(String pattern) {
StringBuffer sb = new StringBuffer();
try {
char lastChar = pattern.charAt(pattern.length() - 1);
char ch = (char) in.read();
while (true) {
sb.append(ch);
if (ch == lastChar) {
if (sb.toString().endsWith(pattern)) {
byte[] temp = sb.toString().getBytes("iso8859-1");// 处理编码,界面显示乱码问题
return new String(temp, "GBK");
}
}
ch = (char) in.read();
}
} catch (Exception e) {
e.printStackTrace();
}
return sb.toString();
}
public static void main(String[] args) {
TelnetUtil.handle("127.0.0.1", "windows", "bob", "123");
}
}
最后
以上就是超帅帅哥为你收集整理的使用JSch远程部署flume采集点的全部内容,希望文章能够帮你解决使用JSch远程部署flume采集点所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复