概述
为什么80%的码农都做不了架构师?>>>
1、首先先介绍下简单的网络编程,需要编写一个服务端类(Server)和一个客户端类(Client),服务端可以接受多个客户端的连接。将客户端与服务端之间发送接收的消息封装为Msg类。代码如下:
//服务端
public class Server {
static boolean flag = true;
public static void main(String[] args){
try {
ServerSocket server = new ServerSocket(9528);
System.out.println("开始启动服务器");
while(flag){
//阻塞的IO
Socket socket = server.accept();
System.out.println("[Client Request]:"+socket.getRemoteSocketAddress().toString());
//可以读或写对象
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
//单线程
Msg msg = (Msg)ois.readObject();
System.out.println(msg.getContent());
ois.close();
oos.close();
}
server.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
//客户端
public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 9528);
//可以读或写对象
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
//单线程
Msg msg = new Msg();
msg.setContent("Hello");
oos.writeObject(msg);
ois.close();
oos.close();
//监测所有文件状态变化
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消息
* @author ZD
*/
public class Msg implements Serializable {
private static final long serialVersionUID = 1L;
private String content; //消息内容public String getContent() {
return content;
}public void setContent(String content) {
this.content = content;
}
}
运行结果如下:
以上是单线程。为了提高处理文件的效率,需改用为多线程。较为简单就不展示了。
2、解析:
现在我们需要将客户端即本地源文件上传至服务端,首先我们需将源文件分块计算文件的弱校验和与强校验和,并且设置块号,然后将消息(Msg类)传入服务端,所以此处还需要为消息添加消息类型(MsgType类),服务端再根据消息类型做相应回应。
消息类型如下:
public class MsgType {
public static final int Req_MetaData = 1; //请求元数据
public static final int Req_Patch = 2; //补丁
public static final int Req_NewFile = 3; //新文件
public static final int SUCCESS = 99;
public static final int ERROR = -1;
}
/**
* 消息
* @author ZD
*/
public class Msg implements Serializable {
private static final long serialVersionUID = 1L;
private String content;
private int msgType; //消息类型
private String filePath; //文件路径
private boolean isDir = false; //是否是目录
private boolean isNew = false; //服务器是否存在
private Map<Long, List<Chunk>> metaData; //服务器文件元数据
private Patch patch; //补丁
private byte[] datas; //如果是新文件全部放这里
... //省略set、get方法
}
客户端需要一个阻塞队列来保存消息,然后根据接收到服务端传来的Msg类型做相应处理。具体类型不同操作请看代码,如下:
//客户端
public class Client {
// 阻塞队列
public static BlockingQueue<Msg> msgQueue = new LinkedBlockingQueue<>();public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 9528);
// 可以读或写对象
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
new ClientWriteThread(oos).start();
new ClientReadThread(ois).start();
// 文件固定
Msg msg = new Msg();
msg.setMsgType(MsgType.Req_MetaData);
msg.setFilePath("Hello.txt"); msgQueue.put(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
其中ClientReadThread类为客户端读线程,若接收到MsgType为获取元数据时,需要创建补丁文件并将此消息加入队列中发送至服务端;若为服务端不存在该文件,即此文件为新文件时,需要将该文件的数据放至消息中并告诉服务端该消息为新文件类型;若为成功即代表该操作成功。为了方便观察,其中添加了一些输出语句。具体实现如下:
/**
* 客户端读线程
* @author ZD
*/
public class ClientReadThread extends Thread {
private ObjectInputStream ois;
private String clientPath="F:/rsyncTemp/local/";
public ClientReadThread(ObjectInputStream ois) {
this.ois = ois;
}@Override
public void run() {
try {
while(true){
Msg msg = (Msg) ois.readObject();
//System.out.println(msg.getContent());
switch(msg.getMsgType()){
case MsgType.Req_MetaData:
doReqMetaData(msg);
break;
case MsgType.Req_NewFile:
doNewFile(msg);
break;
case MsgType.SUCCESS:
doSuccess(msg);
default:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}private void doNewFile(Msg msg) throws IOException, InterruptedException {
System.out.println("[Client]:"+msg.getFilePath()+"服务器端不存在");
String filePath = clientPath+msg.getFilePath();
File file = new File(filePath);
byte[] datas = new byte[(int)file.length()]; //文件不是很大的时候
FileInputStream fis = new FileInputStream(file);
fis.read(datas);
fis.close();
Msg reMsg = new Msg();
reMsg.setMsgType(MsgType.Req_NewFile);
reMsg.setFilePath(msg.getFilePath());
reMsg.setDatas(datas);
Client.msgQueue.put(reMsg);
}private void doSuccess(Msg msg) {
System.out.println("[Client]:"+msg.getFilePath()+"同步成功");
}private void doReqMetaData(Msg msg) throws Exception {
System.out.println("[Client]:"+msg.getFilePath()+"获取元数据成功");
String filePath = clientPath+msg.getFilePath();
//打补丁
Patch patch = RsyncUtil.createPatch(new File(filePath), msg.getMetaData());
Msg patchMsg = new Msg();
patchMsg.setMsgType(MsgType.Req_Patch);
patchMsg.setPatch(patch);
patchMsg.setFilePath(msg.getFilePath());
Client.msgQueue.put(patchMsg);
}
}
ClientWriteThread类为客户端写线程,具体实现如下:
/**
* 客户端写线程
* @author ZD
*/
public class ClientWriteThread extends Thread {
private ObjectOutputStream oos;
public ClientWriteThread(ObjectOutputStream oos) {
this.oos = oos;
}@Override
public void run() {
while(true){
try {
Msg msg = Client.msgQueue.take();
oos.writeObject(msg);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
服务端也是根据接收到客户端的Msg做相应处理,若接收到的为请求元数据,先查看文件是否是目录,若为目录则需要查看目录是否存在,不存在则创建;若不为目录,则会先查看文件是否存在,存在则计算文件的CheckSum(调用上一篇博客中提到的RsyncUtil工具类),并且计算每块的强校验和与弱校验和,然后将元数据写给客户端。若不存在则告知客户端服务端不存在该文件;若为打补丁类型,则将客户端与服务端不同部分同步至服务端,更新文件;若为新文件类型,则将新文件同步至服务端。
啰啰嗦嗦半天,可能你还是没看懂,那就直接上代码吧!如下:
/**
* 服务器端的线程
* @author ZD
*/
public class ServerThread extends Thread{
private ObjectInputStream ois;
private ObjectOutputStream oos;
private String serverPath="F:/rsyncTemp/remote/";
public ServerThread(ObjectInputStream ois, ObjectOutputStream oos) {
this.ois = ois;
this.oos = oos;
}@Override
public void run() {
try {
//需要不停的获取消息
while(true){
Msg msg = (Msg)ois.readObject();
switch(msg.getMsgType()){
case MsgType.Req_MetaData:
if(msg.isDir()){
doDir(msg);
}else{
doReqMetaData(msg);
}
break;
case MsgType.Req_NewFile:
doNewFile(msg);
break;
case MsgType.Req_Patch:
doPatch(msg);
break;
default:
break;
}
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
private void doNewFile(Msg msg) throws IOException {
String filePath = serverPath+msg.getFilePath();
File file = new File(filePath);
FileOutputStream fos = new FileOutputStream(file);
fos.write(msg.getDatas());
fos.close();
Msg reMsg = new Msg();
reMsg.setMsgType(MsgType.SUCCESS);
reMsg.setFilePath(msg.getFilePath());
oos.writeObject(reMsg);
}
//专处理目录的
private void doDir(Msg msg) throws IOException {
String filePath = serverPath+msg.getFilePath();
File file = new File(filePath);
if(!file.exists()){
file.mkdir();
}
Msg reMsg = new Msg();
reMsg.setMsgType(MsgType.SUCCESS);
reMsg.setFilePath(msg.getFilePath());
oos.writeObject(reMsg);
}private void doPatch(Msg msg) throws Exception {
System.out.println("[Server]:"+msg.getFilePath()+"开始打补丁");
String filePath = serverPath+msg.getFilePath();
Patch patch = msg.getPatch();
RsyncUtil.applyPatch(patch, filePath);
Msg reMsg = new Msg();
reMsg.setMsgType(MsgType.SUCCESS);
reMsg.setFilePath(msg.getFilePath());
oos.writeObject(reMsg);
}
private void doReqMetaData(Msg msg) throws IOException{
System.out.println("[Server]:"+msg.getFilePath()+"开始请求元数据");
String filePath = serverPath+msg.getFilePath();
File file = new File(filePath);
Map<Long, List<Chunk>> metaData = new HashMap<>();
if(file.exists()){
metaData = RsyncUtil.calcCheckSumFromFile(filePath);
}else{
msg.setMsgType(MsgType.Req_NewFile);
msg.setNew(true);
}
msg.setMetaData(metaData);
oos.writeObject(msg);
}
}
现在差不多就完成了一个简单云盘,只是现在只能针对指定的单个文件,且输出的调试语句也很生硬。下面就给出一个监测的工具类,不仅可以对多个文件操作,还可以实现监测指定路径下所有文件的变化情况。(其实是老师写的~~~抱歉、具体实现没有特别研究,如果你有兴趣可以研究研究哦~~~~)。代码如下:
/**
* 文件监控回调函数
* @author: 0xC000005
*/
public interface WatchCallBack {
public void create(String path);
public void modify(String path);
public void rename(String from , String to);
public void delete(String path);
}
/**
* 回调实现
* @author: 0xC000005
*/
public class WatchCallBackImpl implements WatchCallBack{
private String clientPath="F:/rsyncTemp/local/";
public WatchCallBackImpl() {
}
@Override
public void create(String path) {
System.out.println("创建文件 "+path);
}
@Override
public void modify(String path) { //主要修改该方法
System.out.println("修改文件 "+path);
Msg msg = new Msg();
File file = new File(clientPath+path);
if(file.isDirectory()){
msg.setDir(true);
}
msg.setType(MsgType.Req_MetaData);
msg.setFilePath(path);
try {
Client.msgQueue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void rename( String from , String to ){
System.out.println("重命名文件 "+from + " --> "+to);
}
@Override
public void delete(String path) {}
}
/**
* 磁盘监听器,用于监听目录变化
* @author: 0xC000005
*/
public class Watcher {
@SuppressWarnings("unchecked")
public static boolean listenChanges(String fileName,WatchCallBack callback) throws Exception{
File file = new File(fileName);
if( file.exists() && file.isDirectory() ){
System.out.println("同步服务开始监听 !");
Path path = Bootstrapper.newPath(file);
WatchService ws = Bootstrapper.newWatchService();
path.register(ws,new WatchEvent.Kind<?>[]{
StandardWatchEventKind.ENTRY_CREATE,
StandardWatchEventKind.ENTRY_MODIFY,
StandardWatchEventKind.ENTRY_DELETE,
ExtendedWatchEventKind.ENTRY_RENAME_FROM,
ExtendedWatchEventKind.ENTRY_RENAME_TO},
ExtendedWatchEventModifier.FILE_TREE);
WatchKey watch = null;
boolean flag = true;
while(flag){
watch = ws.take();
List<WatchEvent<?>> events = watch.pollEvents();
watch.reset();
String from ="";
String to ="";
for(WatchEvent<?> event : events){
Kind<Path> kind = (Kind<Path>) event.kind();
PathImpl context = (PathImpl) event.context();
String filePath = context.getFile().getPath();
if( kind.equals(StandardWatchEventKind.ENTRY_MODIFY) ){
System.out.println("修改: "+filePath);
callback.modify(filePath);
}else if( kind.equals(StandardWatchEventKind.ENTRY_CREATE) ){
System.out.println("创建: "+filePath);
callback.create(filePath);
}else if( kind.equals(StandardWatchEventKind.ENTRY_DELETE) ){
System.out.println("删除: "+filePath);
callback.delete(filePath);
}else if( kind.equals(ExtendedWatchEventKind.ENTRY_RENAME_FROM) ){
System.out.println("重命名From: "+filePath);
from = filePath;
}else if( kind.equals(ExtendedWatchEventKind.ENTRY_RENAME_TO) ){
System.out.println("重命名To: "+filePath);
to = filePath;
callback.rename(from, to);
}
}
}
return true;
}else{
System.err.println("文件不存在或不是目录");
return false;
}
}
}
写在最后:若有错误,望纠正。希望我的分析能给大家带来收获。
转载于:https://my.oschina.net/eager/blog/685685
最后
以上就是积极犀牛为你收集整理的基于Rsync算法的简单云盘实现(下)的全部内容,希望文章能够帮你解决基于Rsync算法的简单云盘实现(下)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复