我是靠谱客的博主 还单身冬日,这篇文章主要介绍Socket实现发布订阅,现在分享给大家,希望可以做个参考。

片头语:

      由于工作需要,最近一直在蛋疼这些事情,实在分身不暇。原计划1周一篇的WPF系列,也就此搁置了。不过俗话说的好,生活就像强X,反抗不了,就去享受吧……

  在这段时间里,把一直不太懂的Socket编程一步一步看了一些,也算收获颇丰了。在这里给初学者推荐一篇学习Socket的博客,园子里著名高帅富JimmyZhang写的,通俗易懂由浅入深……也解答了我很多疑惑,在这里对博主的文采和原创谨慎表示感谢。

  传送门:http://www.cnblogs.com/JimmyZhang/archive/2008/09/07/1286300.html

 

下边进入正题:

    发布订阅机制,跟观察者模式相同,WCF那篇里也有说,这里简单描述一下。

  客户端--注册-->服务

  服务--广播-->客户端

 

  实现发布订阅,需要在服务端维护已连接客户端列表,并且客户端与服务端之间需要建立可靠的长连接。

 

  使用Socket实现,服务端的Socket需要创建一个Listener用于监听客户端的连接。而客户端需要Connect到服务器之后,才可以进行消息的交换。

 

      这里盗一张图,表达一下Socket通讯的生命周期……见谅……

  

      

  我们先看服务端:

    服务端最主要的就是做了2件事,一个是维护一个已连接的客户端列表,另一个就是循环对客户端进行消息广播。

      服务器端的Socket通过Accept()阻塞线程,等待客户端的连接,获取客户端Socket对象。然而我们却不能使用同步的方式进行开发,因为我们除了时刻侦听客户端连接的同时,我们还需要做广播消息的动作,因此这里对Aceept的行为,我们选择使用异步进行实现。代码如下:

  

PushServer
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/// <summary> /// 推送服务类 /// </summary> public class PushServer : IPushServer { #region 基本参数 /// <summary> /// 监听器 /// </summary> TcpListener listener; /// <summary> /// 客户端列表 /// </summary> List<TcpClient> clientList; /// <summary> /// 异步获取客户端连接回调 /// </summary> AsyncCallback acceptTcpClientCallback; #endregion #region 构造函数 public PushServer(string ip, int port) { //初始化侦听器 listener = new TcpListener(IPAddress.Parse(ip), port); //初始化客户端列表 clientList = new List<TcpClient>(); //初始化异步操作委托 acceptTcpClientCallback = new AsyncCallback(OnAcceptCallback); } #endregion #region StratListen /// <summary> /// 开始侦听并推送 /// </summary> public void Start() { this.Start(0); } /// <summary> /// 开始侦听并推送 /// </summary> /// <param name="block"></param> public void Start(int block) { //启动侦听列表 listener.Start(block); Console.WriteLine("服务启动..."); //异步操作解决,无法用同步 listener.BeginAcceptTcpClient(acceptTcpClientCallback, null); } /// <summary> /// 异步获取客户端连接的回调 /// </summary> /// <param name="ar"></param> private void OnAcceptCallback(IAsyncResult ar) { try { lock (clientList) { TcpClient remoteClient = listener.EndAcceptTcpClient(ar); //将客户端添加到列表中 clientList.Add(remoteClient); Console.WriteLine("加入远程客户端{0}到列表中", remoteClient.Client.RemoteEndPoint); //回调等待下一个客户端的加入 listener.BeginAcceptTcpClient(acceptTcpClientCallback, null); } } catch (Exception) { Console.WriteLine("发生异常,未加入客户端Socket"); } } #endregion #region Send Message public void SendMessage(string message) { byte[] buffer = Encoding.UTF8.GetBytes(message); foreach (var remoteClient in clientList.ToArray()) { if (remoteClient.Connected) { NetworkStream ns = null; try { //获取流 ns = remoteClient.GetStream(); //写入数据到客户端缓冲区 ns.Write(buffer, 0, buffer.Length); //清空缓冲区 全部发送到客户端 ns.Flush(); } catch (Exception e) { Console.WriteLine("Socket连接 {0} 出现异常,错误信息:n{1}", remoteClient.Client.RemoteEndPoint, e.Message); //从广播列表中移除 clientList.Remove(remoteClient); //进行资源释放 if (ns != null) ns.Dispose(); remoteClient.Close(); } } else { //移除 clientList.Remove(remoteClient); Console.WriteLine("客户端{0}断开连接,已回收", remoteClient.Client.RemoteEndPoint); } } } #endregion }

 

  PushServer这个类,封装了服务器的所有行为,我们在这里关注的是它的Start()方法。服务端的Listener 我们使用的是System.Net.Sockets命名空间下的TcpListener类。这个类是微软封装好的用于处理服务端Socket的类型。

  在Start()方法里,我们在建立了服务侦听之后,使用了BeginAcceptTcpClient方法。这是个异步的方法,会开启后台线程等待客户端的连接,不会阻塞主线程的执行。在方法的回调委托中,我们将获取到的客户端连接压入到我们的客户端列表中。在完成客户端的搜集之后,我们应该再次启用BeginAccpetTcpClient进行对客户端的监听,形成一个循环。这么做,是因为服务端每次只能Accept到一个客户端Socket对象,当处理完这个对象之后,需要再次Accept才能得到下一个连接的客户端对象。

 

  至于SendMessage方法就不多说了,就是通过NetworkStream向客户端写入数据即可。

  NetworkStream中数据的流向是双向的,即客户端的数据可以流向服务端,服务端的数据也可以流向客户端。

  NetworkStream有两个方法Read 与 Write 分别是对数据流的读写操作。

  其中Read方法是会阻塞线程的,也就是说当一端为Read时,线程是阻塞的,除非另一端使用Write写入了一段数据,否则会一直等待下去。

  同时Write方法在写入数据的时候,数据会立刻流向另一端的缓冲区中。

  在这里需要注意的是,在服务器与客户端交换数据的过程中,无论哪一方Close()了NetworkStream对象,都会关闭这个数据流。

 

  服务端的东西就这么多了,我们来看客户端。

 

  客户端:

  

Client
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class Client { #region 基本参数 //为了方便操作,这里设置为全局变量 private TcpClient client; private AsyncCallback receiveCallback; public event EventHandler<MessageReceiveEventArgs> MessageReceived; //设置缓冲区,8196字节 byte[] buffer = new byte[8196]; //网络流 NetworkStream ns = null; #endregion #region public Client() { client = new TcpClient(); receiveCallback = new AsyncCallback(OnReceived); } #endregion #region 连接 #endregion public void Connect(string ip, int port) { try { //尝试连接 client.Connect(IPAddress.Parse(ip), port); //输出连接信息 Console.WriteLine("连接到远程服务器{0}成功", client.Client.RemoteEndPoint); //获取数据流 ns = client.GetStream(); //异步读取信息 ns.BeginRead(buffer, 0, buffer.Length, receiveCallback, null); } catch (Exception e) { Console.WriteLine("尝试连接远程服务器失败,错误信息:n{0}", e.Message); } } #region 收取 /// <summary> /// 异步收取回调 /// </summary> /// <param name="ar"></param> private void OnReceived(IAsyncResult ar) { try { //获取读取字节数 int readBytes = ns.EndRead(ar); //如果读取到0字节,则是废包……直接跳出 if (readBytes != 0) { //编码 string msg = Encoding.UTF8.GetString(buffer, 0, readBytes); //抛出事件 if (this.MessageReceived != null) { this.MessageReceived(this, new MessageReceiveEventArgs(msg)); } } //输出读取到的数据 Console.WriteLine("读取了{0}字节的数据", readBytes); //清理数组 避免脏读 Array.Clear(buffer, 0, buffer.Length); //递归读取下一条进入缓冲区的数据 ns.BeginRead(buffer, 0, buffer.Length, receiveCallback, null); } catch (Exception e) { Console.WriteLine("异步收取失败...."); if (ns != null) ns.Dispose(); client.Close(); } } #endregion }

 

  由于客户端是各自独立的,不存在一对多的情况,因为客户端可以完全使用同步操作。本段代码中对NetWorkStream使用了异步操作,使用方式大同小异。就不多说了。

  当客户端Connect到服务端的时候,就会被存到列表中。这时候服务端就可以向客户端广播消息了。当服务端向NetWorkStream中写入数据的时候,客户端阻塞的NetWorkStream的Read()方法就开始读取数据并且在处理完毕后等待下一条消息的写入。

 

  总体实现大概就是这样,晚些会贴上完整代码。Socket实现发布订阅并不是很难,只要思想正确了,实现的方式,与WCF也是大同小异的。这里没有做粘包的处理,有兴趣的朋友可以自己完成。最近忙得焦头烂额,文章质量可能也不是很好,希望大家可以多多包涵 谢谢了。

  

 

  Demo地址:猛击这里

转载于:https://www.cnblogs.com/ShadowLoki/archive/2012/09/06/2673099.html

最后

以上就是还单身冬日最近收集整理的关于Socket实现发布订阅的全部内容,更多相关Socket实现发布订阅内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部