概述
在上一篇的基础上,增强功能。
common包,定义通用接口,和传递中使用的对象,对象需要实现序列化接口。
接口:
- package mina.common;
- public interface RpcInterface {
- public String getStringValue(String arg0,int arg1,Apple arg2);
- public int getIntValue();
- public void printPrice();
- }
- package mina.common;
- import java.awt.Color;
- import java.io.Serializable;
- import java.util.Date;
- public class Apple implements Serializable{
- private Color color = Color.BLACK;
- private double weight = 1.1;
- private double dia = 2.33;
- private int num = 3;
- private String name = "aaapple";
- private Date pdate = new Date();
- public Date getPdate() {
- return pdate;
- }
- public void setPdate(Date pdate) {
- this.pdate = pdate;
- }
- public Color getColor() {
- return color;
- }
- public void setColor(Color color) {
- this.color = color;
- }
- public double getDia() {
- return dia;
- }
- public void setDia(double dia) {
- this.dia = dia;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public int getNum() {
- return num;
- }
- public void setNum(int num) {
- this.num = num;
- }
- public double getWeight() {
- return weight;
- }
- public void setWeight(double weight) {
- this.weight = weight;
- }
- }
Server类,启动监听:
- package mina.server;
- import java.net.InetSocketAddress;
- import org.apache.mina.common.DefaultIoFilterChainBuilder;
- import org.apache.mina.common.IoAcceptor;
- import org.apache.mina.common.IoAcceptorConfig;
- import org.apache.mina.transport.socket.nio.SocketAcceptor;
- import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
- import org.apache.mina.transport.socket.nio.SocketSessionConfig;
- public class Server {
- /** Choose your favorite port number. */
- private static final int PORT = 8080;
- private static final boolean USE_SSL = false;
- public static void main(String[] args) throws Exception {
- IoAcceptor acceptor = new SocketAcceptor();
- IoAcceptorConfig config = new SocketAcceptorConfig();
- DefaultIoFilterChainBuilder chain = config.getFilterChain();
- ((SocketSessionConfig) config.getSessionConfig()).setReuseAddress(true);
- // Add SSL filter if SSL is enabled.
- if (USE_SSL) {
- // addSSLSupport(chain);
- }
- // Bind
- acceptor.bind(new InetSocketAddress(PORT), new ServerHandler(), config);
- System.out.println("Listening on port " + PORT);
- }
- }
- package mina.server;
- import mina.common.Apple;
- import mina.common.RpcInterface;
- public class RpcServerImpl implements RpcInterface {
- public String getStringValue(String arg0,int arg1,Apple arg2) {
- System.out.println("apple time is "+arg2.getPdate());
- return "this is sign from server.";
- }
- public int getIntValue() {
- return 3;
- }
- public void printPrice() {
- System.out.println("******price**********");
- System.out.println("$10000000000000000000");
- System.out.println("*********************");
- }
- }
- package mina.server;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import java.net.SocketAddress;
- import java.util.ArrayList;
- import java.util.Enumeration;
- import java.util.Hashtable;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Properties;
- import org.apache.mina.common.ByteBuffer;
- import org.apache.mina.common.IoSession;
- import org.apache.mina.handler.StreamIoHandler;
- import org.apache.mina.integration.jmx.IoSessionManager;
- public class ServerHandler extends StreamIoHandler {
- Hashtable sessionMgr = new Hashtable();
- List serverList = new ArrayList();
- ServerHandler() {
- serverList.add(new RpcServerImpl());
- }
- public void messageReceived(IoSession session, Object buf) {
- SocketAddress adr = session.getRemoteAddress();
- System.out.println("remote address is =" + adr.toString());
- System.out.println("buf=" + buf.toString());
- if (buf instanceof ByteBuffer) {
- ByteBuffer bb = (ByteBuffer) buf;
- System.out.println("bbb===" + bb);
- try {
- Properties prop = (Properties) bb.getObject();
- System.out.println("prop==" + prop);
- String interfaceName = (String) prop.get("interface");
- Iterator it = serverList.iterator();
- while (it.hasNext()) {// 查找实例
- Object serobj = it.next();
- Class[] clazz = serobj.getClass().getInterfaces();
- if (isContains(clazz, interfaceName)) {// 找到相应实例
- System.out.println("find.." + interfaceName);
- int argc = Integer.parseInt(String.valueOf(prop
- .get("argc")));
- Class[] types = null;// 参数类型数组
- Object[] args = null;// 参数对象数组
- // 不为最大值时表示有正常参数,否则为无参数。
- if (argc != Integer.MAX_VALUE) {
- // 重组参数列表
- types = new Class[argc];
- args = new Object[argc];
- List typeList = (List) prop.get("types");
- List argList = (List) prop.get("args");
- for (int i = 0; i < argc; i++) {
- args[i] = argList.get(i);
- types[i] = (Class) typeList.get(i);
- System.out.println("arg###=" + args[i]
- + " type@@@=" + types[i]);
- }
- }
- String methodName = prop.getProperty("method");
- Method method = serobj.getClass().getMethod(methodName,
- types);
- //调用
- Object resultObject = method.invoke(serobj, args);
- //回复
- replyCall(session.getRemoteAddress().toString(),resultObject);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- //广播操作,广播消息或事件.标记为"BROADCAST"
- public void brocastMessage(Object obj){
- Enumeration enu = sessionMgr.keys();
- while (enu.hasMoreElements()) {
- String addr = (String) enu.nextElement();
- sendMessage("BROADCAST",addr,obj);
- }
- }
- //通知操作,通知消息或事件。标记为"NOTIFY"
- public void notify(String addr,Object obj){
- sendMessage("NOTIFY",addr,obj);
- }
- //响应RPC调用。
- public void replyCall(String addr,Object obj){
- sendMessage("REPLYCALL",addr,obj);
- }
- private void sendMessage(String mark,String addr,Object obj){
- Object sessionObj = sessionMgr.get(addr);
- if(sessionObj!=null){
- IoSession session = (IoSession)sessionObj;
- if(!session.isConnected()){
- return;
- }
- Properties resultProp = new Properties();
- resultProp.setProperty("mark",mark);
- if(obj!=null) //对于无返回的情况。
- resultProp.put("Object",obj);
- ByteBuffer bb = ByteBuffer.allocate(16);
- bb.setAutoExpand(true);
- bb.putObject(resultProp);
- bb.flip();
- session.write(bb);
- }else{
- System.out.println("session null.addr="+addr);
- }
- }
- protected void processStreamIo(IoSession session, InputStream ins,
- OutputStream ous) {
- System.out.println("processStreamIo is called.");
- }
- private boolean isContains(Class[] clazz, String ifName) {
- for (int i = 0; i < clazz.length; i++) {
- if (clazz[i].getName().equals(ifName))
- return true;
- }
- return false;
- }
- public void sessionOpened(IoSession ssn) {
- System.out.println("session open for " + ssn.getRemoteAddress());
- sessionMgr.put(ssn.getRemoteAddress().toString(),ssn);
- }
- public void exceptionCaught(IoSession ssn, Throwable cause) {
- cause.printStackTrace();
- sessionMgr.remove(ssn.getRemoteAddress().toString());
- ssn.close();
- }
- public void sessionClosed(IoSession ssn) throws Exception {
- System.out.println("session closed from " + ssn.getRemoteAddress());
- sessionMgr.remove(ssn.getRemoteAddress().toString());
- }
- }
Client类,连接服务端,并通过客户端桩(RpcClientImpl)调用服务端的方法。
- package mina.client;
- import java.net.InetSocketAddress;
- import mina.common.Apple;
- import org.apache.mina.common.ConnectFuture;
- import org.apache.mina.common.IoConnectorConfig;
- import org.apache.mina.transport.socket.nio.SocketConnector;
- public class Client {
- public static void main(String[] args) throws Exception {
- // Create TCP/IP connector.
- SocketConnector connector = new SocketConnector();
- // Set connect timeout.
- ((IoConnectorConfig) connector.getDefaultConfig())
- .setConnectTimeout(15);
- ClientIoHandler handler = new ClientIoHandler();
- // Start communication.
- ConnectFuture cf = connector.connect(new InetSocketAddress("localhost",
- 8080), handler);
- // Wait for the connection attempt to be finished.
- System.out.println("start to join");
- cf.join();
- cf.getSession();
- System.out.println("test get value="
- + RpcClientImpl.getInstance().getStringValue("adsf", 222,
- new Apple()));
- System.out.println("test get int value="
- + RpcClientImpl.getInstance().getIntValue());
- System.out.println("test call process start.");
- RpcClientImpl.getInstance().printPrice();
- System.out.println("test call process end.");
- }
- }
AbstractClientImpl类,为client实现接口时使用的超类。
- package mina.client;
- import org.apache.mina.common.IoSession;
- public abstract class AbstractClientImpl {
- IoSession session = null;
- ClientIoHandler handler = null;
- public IoSession getSession() {
- return session;
- }
- public void setSession(IoSession session) {
- this.session = session;
- }
- public ClientIoHandler getHandler() {
- return handler;
- }
- public void setHandler(ClientIoHandler handler) {
- this.handler = handler;
- }
- }
- package mina.client;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import java.util.Properties;
- import mina.common.Apple;
- import mina.common.RpcInterface;
- import org.apache.mina.common.ByteBuffer;
- public class RpcClientImpl extends AbstractClientImpl implements RpcInterface {
- private static String IFNAME = RpcInterface.class.getName();
- private static RpcClientImpl instance = null;
- public static RpcClientImpl getInstance(){
- if(instance == null){
- instance = new RpcClientImpl();
- }
- return instance;
- }
- public String getStringValue(String arg0,int arg1,Apple arg2) {
- if(session!=null){
- Properties prop = new Properties();
- prop.setProperty("interface",IFNAME);
- prop.setProperty("method","getStringValue");
- prop.put("argc","3");//参数个数.
- //生成参数类型链表
- List typeList = new ArrayList();
- typeList.add(String.class);
- typeList.add(Integer.TYPE);
- typeList.add(Apple.class);
- //生成参数对象链表
- List argList = new ArrayList();
- argList.add(arg0);
- argList.add(arg1);
- argList.add(arg2);
- prop.put("types",typeList);
- prop.put("args",argList);
- ByteBuffer bb = ByteBuffer.allocate(16);
- bb.setAutoExpand( true );
- bb.putObject(prop);
- bb.flip();
- System.out.println("bbb==="+bb.toString());
- session.write(bb);
- }
- try {
- System.out.println("handler="+handler);
- System.out.println("lock="+handler.lock);
- printStamp("1");
- synchronized(handler.lock){
- printStamp("2");
- handler.lock.wait();
- printStamp("3");
- if(handler.resultObject!=null){
- return String.valueOf(handler.resultObject);
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return null;
- }
- public int getIntValue() {
- if(session!=null){
- Properties prop = new Properties();
- prop.setProperty("interface",IFNAME);
- prop.setProperty("method","getIntValue");
- prop.put("argc",Integer.MAX_VALUE);
- //prop.put("object",new Object());
- ByteBuffer bb = ByteBuffer.allocate(16);
- bb.setAutoExpand( true );
- bb.putObject(prop);
- bb.flip();
- System.out.println("bbb==="+bb.toString());
- session.write(bb);
- }
- try {
- System.out.println("handler="+handler);
- System.out.println("lock="+handler.lock);
- printStamp("1");
- synchronized(handler.lock){
- printStamp("2");
- handler.lock.wait();
- printStamp("3");
- if(handler.resultObject!=null){
- return Integer.parseInt(String.valueOf(handler.resultObject));
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return Integer.MIN_VALUE;
- }
- public void printStamp(String str){
- System.out.println((new Date().toString())+" "+str);
- }
- public void printPrice() {
- if(session!=null){
- Properties prop = new Properties();
- prop.setProperty("interface",IFNAME);
- prop.setProperty("method","printPrice");
- prop.put("argc",Integer.MAX_VALUE);//参数个数
- ByteBuffer bb = ByteBuffer.allocate(16);
- bb.setAutoExpand( true );
- bb.putObject(prop);
- bb.flip();
- System.out.println("bbb==="+bb.toString());
- session.write(bb);
- }
- try {
- System.out.println("handler="+handler);
- System.out.println("lock="+handler.lock);
- printStamp("1");
- synchronized(handler.lock){
- printStamp("2");
- handler.lock.wait();
- printStamp("3");
- return;
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return;
- }
- }
- package mina.client;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.util.ArrayList;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Properties;
- import org.apache.mina.common.ByteBuffer;
- import org.apache.mina.common.IdleStatus;
- import org.apache.mina.common.IoSession;
- import org.apache.mina.handler.StreamIoHandler;
- public class ClientIoHandler extends StreamIoHandler{
- IoSession session = null;
- List implList = new ArrayList();
- Object resultObject;
- Object lock = new Object();
- public ClientIoHandler(){
- //init impls.
- RpcClientImpl impl = RpcClientImpl.getInstance();
- implList.add(impl);
- //impl.setHandler(this);
- }
- private void initImpls(){
- Iterator it = implList.iterator();
- while(it.hasNext())
- {
- Object obj = it.next();
- if(obj instanceof AbstractClientImpl){
- AbstractClientImpl impl = (AbstractClientImpl)obj;
- impl.setSession(session);
- System.out.println("add this handler");
- impl.setHandler(this);
- }
- }
- }
- //收到服务端消息后的处理,框架内部为异步。在应用中改为同步。
- public void messageReceived(IoSession session, Object buf) {
- System.out.println("receive message.");
- System.out.println(buf.toString());
- try{
- if(buf instanceof ByteBuffer){
- Properties prop = (Properties)(((ByteBuffer)buf).getObject());
- System.out.println("received prop="+prop);
- String mark = prop.getProperty("mark");
- //根据mark类型,选择不同处理。
- if(mark.equals("REPLYCALL")){
- Object obj = prop.get("Object");
- synchronized(lock){
- System.out.println("result="+obj);
- resultObject = obj;
- lock.notify();
- }
- }else if(mark.equals("BROADCAST")){
- Object obj = prop.get("Object");
- System.out.println("BROADCAST obj="+obj);
- }else if(mark.equals("NOTIFY")){
- Object obj = prop.get("Object");
- System.out.println("NOTIFY obj="+obj);
- }else{
- System.out.println("unknow mark.mark="+mark);
- }
- }else{
- System.out.println("class type error.");
- }
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
- protected void processStreamIo(IoSession session, InputStream is, OutputStream os) {
- System.out.println("process stream info,"+session.getRemoteAddress());
- }
- public void sessionOpened(IoSession session) {
- // Set reader idle time to 10 seconds.
- // sessionIdle(...) method will be invoked when no data is read
- // for 10 seconds.
- System.out.println("open session..");
- this.session = session;
- initImpls();
- session.setIdleTime(IdleStatus.READER_IDLE, 10);
- }
- public void sessionClosed(IoSession session) {
- // Print out total number of bytes read from the remote peer.
- System.err.println("Total " + session.getReadBytes() + " byte(s)");
- }
- public void sessionIdle(IoSession session, IdleStatus status) {
- // Close the connection if reader is idle.
- if (status == IdleStatus.READER_IDLE)
- session.close();
- }
- }
需要改进的地方:
在实际应用中,还需要精简客户端的结构,让多个impl共用一个handler,即实现一个消息分发(dispatch)方法。服务端主动发给客户端的消息还需要另外定义格式,以满足不同条件下的需要,或者需要定义内容比较详细的事件。
最后
以上就是动人战斗机为你收集整理的使用Mina实现RPC调用,消息通知,广播。的全部内容,希望文章能够帮你解决使用Mina实现RPC调用,消息通知,广播。所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复