概述
服务端
提供时间查询服务
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
*
* @author guokaige
*
*/
public class TimeServer {
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
ServerSocket server = null;
try {
server = new ServerSocket(port);
System.out.println("The server is start in port : " + port);
Socket socket = null;
//死循环
while (true) {
//没有消息,将阻塞
socket = server.accept();
//没个消息,都开启新线程处理
new Thread(new TimeServerHandler(socket)).start();
}
} finally {
if (server != null) {
System.out.println("The server close");
server.close();
server = null;
}
}
}
}
Runnable实现
异步处理server接受查询请求,返回查询结果
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
*
* @author guokaige
*
*/
public class TimeServerHandler implements Runnable {
private Socket socket;
public TimeServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String currentTime = null;
String body = null;
// 循环读取数据
while (true) {
body = in.readLine();
if (body == null)
break;
System.out.println("The server receive order : " + body);
currentTime = "QUERY TIME".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "ERROR";
out.println(currentTime);
}
} catch (Exception e) {
if (in != null) {
try {
in.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
if (out != null) {
out.close();
out = null;
}
if (this.socket != null) {
try {
this.socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
this.socket = null;
}
}
}
}
客户端端
查询时间, 接收结果
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
*
* @author guokaige
*
*/
public class TimeClient {
/**
* @param args
*/
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1", port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
//发送消息,查询时间
out.println("QUERY TIME");
System.out.println("Send query msg to server succeed.");
//查询返回结果
String resp = in.readLine();
System.out.println("receive from server : " + resp);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (out != null) {
out.close();
out = null;
}
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}
总结: 同步阻塞I/O, 一个线程处理一条消息, 并发访问量大后,会造成线程堆栈溢出.
伪异步io
伪异步io在服务端添加一个线程池, 防止线程堆栈溢出.
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import com.zzy.netty.bio.TimeServerHandler;
/**
*
* @author kevin
*
*/
public class TimeServer {
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
ServerSocket server = null;
try {
server = new ServerSocket(port);
System.out.println("The time server is start in port : " + port);
Socket socket = null;
//自定义线程池,池最大50, 消息最大保留10000
TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50, 10000);
while (true) {
socket = server.accept();
singleExecutor.execute(new TimeServerHandler(socket));
}
} finally {
if (server != null) {
System.out.println("The time server close");
server.close();
server = null;
}
}
}
}
自定义线程池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
*
* @author kevin
*
*/
public class TimeServerHandlerExecutePool {
private ExecutorService executor;
public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
//最小活跃线程数 是系统cpu核心数
int corePoolSize = Runtime.getRuntime().availableProcessors();
//大于cpu核心数的空闲线程,最长维持120秒销毁.
long keepAliveTime = 120L ;
//阻塞队列
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<java.lang.Runnable>(queueSize);
//线程工场,能自定义线程名
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
//队列超限,处理策略, 拒绝消息
//RejectedExecutionHandler rejectedExecutionHandler=new ThreadPoolExecutor.AbortPolicy();
//队列超限,处理策略, 阻塞放入队列
RejectedExecutionHandler rejectedExecutionHandler=new CustomRejectedExecutionHandler();
//创建线程池
executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue ,namedThreadFactory, rejectedExecutionHandler);
}
public void execute(java.lang.Runnable task) {
executor.execute(task);
}
/**
* 重写拒绝机制
* @author Administrator
*
*/
class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 核心改造点,由blockingqueue的offer改成put阻塞方法
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
总结: 伪异步io , 面临一个问题: 队列超限,处理策略RejectedExecutionHandler.
有两种处理方式:
第一种是丢弃信息,
第二种是阻塞的放入队列.
为了不丢消息,会选择后者. 当队列超限时,消息发送方会阻塞, 进而出现级联故障.
最后
以上就是含蓄香氛为你收集整理的《netty权威指南》同步阻塞I/O服务端Runnable实现客户端端伪异步io的全部内容,希望文章能够帮你解决《netty权威指南》同步阻塞I/O服务端Runnable实现客户端端伪异步io所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复