概述
1、SocketAsyncEventArgs介绍
SocketAsyncEventArgs是微软提供的高性能异步Socket实现类,主要为高性能网络服务器应用程序而设计,主要是为了避免在在异步套接字 I/O 量非常大时发生重复的对象分配和同步。使用此类执行异步套接字操作的模式包含以下步骤:
1.分配一个新的 SocketAsyncEventArgs 上下文对象,或者从应用程序池中获取一个空闲的此类对象。
2.将该上下文对象的属性设置为要执行的操作(例如,完成回调方法、数据缓冲区、缓冲区偏移量以及要传输的最大数据量)。
3.调用适当的套接字方法 (xxxAsync) 以启动异步操作。
4.如果异步套接字方法 (xxxAsync) 返回 true,则在回调中查询上下文属性来获取完成状态。
5.如果异步套接字方法 (xxxAsync) 返回 false,则说明操作是同步完成的。 可以查询上下文属性来获取操作结果。
6.将该上下文重用于另一个操作,将它放回到应用程序池中,或者将它丢弃。
2、SocketAsyncEventArgs封装
使用SocketAsyncEventArgs之前需要先建立一个Socket监听对象,使用如下代码:
然后开始接受连接,SocketAsyncEventArgs有连接时会通过Completed事件通知外面,所以接受连接的代码如下:public void Start(IPEndPoint localEndPoint) { listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); listenSocket.Bind(localEndPoint); listenSocket.Listen(m_numConnections); Program.Logger.InfoFormat("Start listen socket {0} success", localEndPoint.ToString()); //for (int i = 0; i < 64; i++) //不能循环投递多次AcceptAsync,会造成只接收8000连接后不接收连接了 StartAccept(null); m_daemonThread = new DaemonThread(this); }
接受连接响应事件代码:public void StartAccept(SocketAsyncEventArgs acceptEventArgs) { if (acceptEventArgs == null) { acceptEventArgs = new SocketAsyncEventArgs(); acceptEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed); } else { acceptEventArgs.AcceptSocket = null; //释放上次绑定的Socket,等待下一个Socket连接 } m_maxNumberAcceptedClients.WaitOne(); //获取信号量 bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs); if (!willRaiseEvent) { ProcessAccept(acceptEventArgs); } }
void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs) { try { ProcessAccept(acceptEventArgs); } catch (Exception E) { Program.Logger.ErrorFormat("Accept client {0} error, message: {1}", acceptEventArgs.AcceptSocket, E.Message); Program.Logger.Error(E.StackTrace); } }
接受连接后,从当前Socket缓冲池AsyncSocketUserTokenPool中获取一个用户对象AsyncSocketUserToken,AsyncSocketUserToken包含一个接收异步事件m_receiveEventArgs,一个发送异步事件m_sendEventArgs,接收数据缓冲区m_receiveBuffer,发送数据缓冲区m_sendBuffer,协议逻辑调用对象m_asyncSocketInvokeElement,建立服务对象后,需要实现接收和发送的事件响应函数:private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs) { Program.Logger.InfoFormat("Client connection accepted. Local Address: {0}, Remote Address: {1}", acceptEventArgs.AcceptSocket.LocalEndPoint, acceptEventArgs.AcceptSocket.RemoteEndPoint); AsyncSocketUserToken userToken = m_asyncSocketUserTokenPool.Pop(); m_asyncSocketUserTokenList.Add(userToken); //添加到正在连接列表 userToken.ConnectSocket = acceptEventArgs.AcceptSocket; userToken.ConnectDateTime = DateTime.Now; try { bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求 if (!willRaiseEvent) { lock (userToken) { ProcessReceive(userToken.ReceiveEventArgs); } } } catch (Exception E) { Program.Logger.ErrorFormat("Accept client {0} error, message: {1}", userToken.ConnectSocket, E.Message); Program.Logger.Error(E.StackTrace); } StartAccept(acceptEventArgs); //把当前异步事件释放,等待下次连接 }
在Completed事件中需要处理发送和接收的具体逻辑代码,其中接收的逻辑实现如下:void IO_Completed(object sender, SocketAsyncEventArgs asyncEventArgs) { AsyncSocketUserToken userToken = asyncEventArgs.UserToken as AsyncSocketUserToken; userToken.ActiveDateTime = DateTime.Now; try { lock (userToken) { if (asyncEventArgs.LastOperation == SocketAsyncOperation.Receive) ProcessReceive(asyncEventArgs); else if (asyncEventArgs.LastOperation == SocketAsyncOperation.Send) ProcessSend(asyncEventArgs); else throw new ArgumentException("The last operation completed on the socket was not a receive or send"); } } catch (Exception E) { Program.Logger.ErrorFormat("IO_Completed {0} error, message: {1}", userToken.ConnectSocket, E.Message); Program.Logger.Error(E.StackTrace); } }
由于我们制定的协议第一个字节是协议标识,因此在接收到第一个字节的时候需要绑定协议解析对象,具体代码实现如下:private void ProcessReceive(SocketAsyncEventArgs receiveEventArgs) { AsyncSocketUserToken userToken = receiveEventArgs.UserToken as AsyncSocketUserToken; if (userToken.ConnectSocket == null) return; userToken.ActiveDateTime = DateTime.Now; if (userToken.ReceiveEventArgs.BytesTransferred > 0 && userToken.ReceiveEventArgs.SocketError == SocketError.Success) { int offset = userToken.ReceiveEventArgs.Offset; int count = userToken.ReceiveEventArgs.BytesTransferred; if ((userToken.AsyncSocketInvokeElement == null) & (userToken.ConnectSocket != null)) //存在Socket对象,并且没有绑定协议对象,则进行协议对象绑定 { BuildingSocketInvokeElement(userToken); offset = offset + 1; count = count - 1; } if (userToken.AsyncSocketInvokeElement == null) //如果没有解析对象,提示非法连接并关闭连接 { Program.Logger.WarnFormat("Illegal client connection. Local Address: {0}, Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint, userToken.ConnectSocket.RemoteEndPoint); CloseClientSocket(userToken); } else { if (count > 0) //处理接收数据 { if (!userToken.AsyncSocketInvokeElement.ProcessReceive(userToken.ReceiveEventArgs.Buffer, offset, count)) { //如果处理数据返回失败,则断开连接 CloseClientSocket(userToken); } else //否则投递下次介绍数据请求 { bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求 if (!willRaiseEvent) ProcessReceive(userToken.ReceiveEventArgs); } } else { bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求 if (!willRaiseEvent) ProcessReceive(userToken.ReceiveEventArgs); } } } else { CloseClientSocket(userToken); } }
发送响应函数实现需要注意,我们是把发送数据放到一个列表中,当上一个发送事件完成响应Completed事件,这时我们需要检测发送队列中是否存在未发送的数据,如果存在则继续发送。private void BuildingSocketInvokeElement(AsyncSocketUserToken userToken) { byte flag = userToken.ReceiveEventArgs.Buffer[userToken.ReceiveEventArgs.Offset]; if (flag == (byte)SocketFlag.Upload) userToken.AsyncSocketInvokeElement = new UploadSocketProtocol(this, userToken); else if (flag == (byte)SocketFlag.Download) userToken.AsyncSocketInvokeElement = new DownloadSocketProtocol(this, userToken); else if (flag == (byte)SocketFlag.RemoteStream) userToken.AsyncSocketInvokeElement = new RemoteStreamSocketProtocol(this, userToken); else if (flag == (byte)SocketFlag.Throughput) userToken.AsyncSocketInvokeElement = new ThroughputSocketProtocol(this, userToken); else if (flag == (byte)SocketFlag.Control) userToken.AsyncSocketInvokeElement = new ControlSocketProtocol(this, userToken); else if (flag == (byte)SocketFlag.LogOutput) userToken.AsyncSocketInvokeElement = new LogOutputSocketProtocol(this, userToken); if (userToken.AsyncSocketInvokeElement != null) { Program.Logger.InfoFormat("Building socket invoke element {0}.Local Address: {1}, Remote Address: {2}", userToken.AsyncSocketInvokeElement, userToken.ConnectSocket.LocalEndPoint, userToken.ConnectSocket.RemoteEndPoint); } }
SendCompleted用于回调下次需要发送的数据,具体实现过程如下:private bool ProcessSend(SocketAsyncEventArgs sendEventArgs) { AsyncSocketUserToken userToken = sendEventArgs.UserToken as AsyncSocketUserToken; if (userToken.AsyncSocketInvokeElement == null) return false; userToken.ActiveDateTime = DateTime.Now; if (sendEventArgs.SocketError == SocketError.Success) return userToken.AsyncSocketInvokeElement.SendCompleted(); //调用子类回调函数 else { CloseClientSocket(userToken); return false; } }
当一个SocketAsyncEventArgs断开后,我们需要断开对应的Socket连接,并释放对应资源,具体实现函数如下:public virtual bool SendCompleted() { m_activeDT = DateTime.UtcNow; m_sendAsync = false; AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer; asyncSendBufferManager.ClearFirstPacket(); //清除已发送的包 int offset = 0; int count = 0; if (asyncSendBufferManager.GetFirstPacket(ref offset, ref count)) { m_sendAsync = true; return m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs, asyncSendBufferManager.DynamicBufferManager.Buffer, offset, count); } else return SendCallback(); } //发送回调函数,用于连续下发数据 public virtual bool SendCallback() { return true; }
3、SocketAsyncEventArgs封装和MSDN的不同点public void CloseClientSocket(AsyncSocketUserToken userToken) { if (userToken.ConnectSocket == null) return; string socketInfo = string.Format("Local Address: {0} Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint, userToken.ConnectSocket.RemoteEndPoint); Program.Logger.InfoFormat("Client connection disconnected. {0}", socketInfo); try { userToken.ConnectSocket.Shutdown(SocketShutdown.Both); } catch (Exception E) { Program.Logger.ErrorFormat("CloseClientSocket Disconnect client {0} error, message: {1}", socketInfo, E.Message); } userToken.ConnectSocket.Close(); userToken.ConnectSocket = null; //释放引用,并清理缓存,包括释放协议对象等资源 m_maxNumberAcceptedClients.Release(); m_asyncSocketUserTokenPool.Push(userToken); m_asyncSocketUserTokenList.Remove(userToken); }
MSDN在http://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx实现了示例代码,并实现了初步的池化处理,我们是在它的基础上扩展实现了接收数据缓冲,发送数据队列,并把发送SocketAsyncEventArgs和接收SocketAsyncEventArgs分开,并实现了协议解析单元,这样做的好处是方便后续逻辑实现文件的上传,下载和日志输出。
DEMO下载地址:http://download.csdn.net/detail/sqldebug_fan/7467745
免责声明:此代码只是为了演示C#完成端口编程,仅用于学习和研究,切勿用于商业用途。水平有限,C#也属于初学,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。
最后
以上就是欢呼斑马为你收集整理的C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装的全部内容,希望文章能够帮你解决C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复