概述
上一篇讲到了数据的处理,这一篇主要讲使用多线程收发消息
//创建消息数据模型 2
//正式项目中,消息的结构一般是消息长度+消息id+消息主体内容 3
public class Message 4
{
public IExtensible protobuf;
public int messageId; 7 } 8 9 public class SocketClientTemp : MonoBehaviour 10 { 11
const int packageMaxLength = 1024; 12 13 Socket mSocket; 14 Thread threadSend; 15
Thread threadRecive; 16 Queue<Message> allMessages = new Queue<Message>(); 17
Queue<byte[]> sendQueue = new Queue<byte[]>();
public bool Init() 20
{ 21 //创建一个socket对象 22
mSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 23
return SocketConnection("此处是ip", 1111); 24 } 25 26 void Update() 27 { 28
AnalysisMessage(); 29 } 30 31 /// <summary> 32 /// 建立服务器连接 33 /// </summary> 34
/// <param name="ip">服务器的ip地址</param> 35 /// <param name="port">端口</param> 36
bool SocketConnection(string ip, int port) 37 { 38 try 39 { 40
IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(ip), port); 41
//同步连接服务器,实际使用时推荐使用异步连接,处理方式会在下一篇讲断线重连时讲到 42
mSocket.Connect(ipep); 43 //连接成功后,创建两个线程,分别用于发送和接收消息 44
threadSend = new Thread(new ThreadStart(SendMessage)); 45 threadSend.Start(); 46
threadRecive = new Thread(new ThreadStart(ReceiveMessage)); 47 threadRecive.Start(); 48
return true; 49 } 50 catch (Exception e) 51 { 52
Debug.Log(e.ToString()); 53 Close(); 54 return false; 55
} }
#region ...发送消息 59
/// <summary> 60 /// 添加数据到发送队列 61
/// </summary> 62 /// <param name="protobufModel"></param> 63
/// <param name="messageId"></param> 64
public void AddSendMessageQueue(IExtensible protobufModel, int messageId) 65
{ 66 sendQueue.Enqueue(BuildPackage(protobufModel, messageId)); 67 }
void SendMessage() 70 { 71 //循环获取发送队列中第一个数据,然后发送到服务器 72
while (true) 73 { 74 if (sendQueue.Count == 0) 75 { 76
Thread.Sleep(100); 77 continue; 78 } 79
if (!mSocket.Connected) 80 { 81 Close(); 82
break; 83 } 84
else 85 Send(sendQueue.Peek());
//发送队列中第一条数据 86 } 87 } 88 89
void Send(byte[] bytes) 90 { 91 try 92
{ 93 mSocket.Send(bytes, SocketFlags.None); 94
//发送成功后,从发送队列中移除已发送的消息 95 sendQueue.Dequeue(); 96
} 97 catch (SocketException e) 98 { 99
//如果错误码为10035,说明服务器缓存区满了,所以等100毫秒再次发送100
if (e.NativeErrorCode == 10035)101 {102
Thread.Sleep(100);103 Send(bytes);104
}105 else106
Debug.Log(e.ToString());107 }108 }109 #endregion110 111
#region ...接收消息112 /// <summary>113 /// 解析收到的消息114
/// </summary>115 void AnalysisMessage()116 {117
while (allMessages.Count > 0)118 {119
int id = allMessages.Dequeue().messageId;120
switch (id)121 {122
//根据消息id做不同的处理123 }124 }125 }126 127
/// <summary>128 /// 接收数据129 /// </summary>130
void ReceiveMessage()131 {132 while (true)133
{134 if (!mSocket.Connected)135
break;136 byte[] recvBytesHead = GetBytesReceive(4);137
int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(recvBytesHead, 0));138
byte[] recvBytesBody = GetBytesReceive(bodyLength);139 140
byte[] messageId = new byte[4];141
Array.Copy(recvBytesBody, 0, messageId, 0, 4);142
byte[] messageBody = new byte[bodyLength - 4];143
Array.Copy(recvBytesBody, 4, messageBody, 0, bodyLength - 4);144 145
if (BitConverter.IsLittleEndian)146
Array.Reverse(messageId);147
FillAllPackages(BitConverter.ToInt32(messageId, 0), messageBody);148
}
}
/// <summary>152
/// 填充接收消息队列153 /// </summary>154
/// <param name="messageId"></param>155 /// <param name="messageBody"></param>156 void FillAllPackages(int messageId, byte[] messageBody)157 {158 switch (messageId)159 {160 //根据消息id处理消息,并添加到接收消息队列161 case 1:162 allMessages.Enqueue(new Message()
163
{164
protobuf = ProtobufSerilizer.DeSerialize<TestTemp>(messageBody),
165
messageId = messageId
166 });167
break;168 }169 }170 171
/// <summary>172 /// 接收数据并处理173
/// </summary>174 /// <param name="length"></param>175
/// <returns></returns>176 byte[] GetBytesReceive(int length)177
{178 byte[] recvBytes = new byte[length];179 while (length > 0)180
{181
byte[] receiveBytes = new byte[length < packageMaxLength ? length : packageMaxLength];182
int iBytesBody = 0;183 if (length >= receiveBytes.Length)184
iBytesBody = mSocket.Receive(receiveBytes, receiveBytes.Length, 0);185
else186 iBytesBody = mSocket.Receive(receiveBytes, length, 0);187
receiveBytes.CopyTo(recvBytes, recvBytes.Length - length);188 length -= iBytesBody;189
}190 return recvBytes;191 }192 #endregion193 194 /// <summary>195 /// 构建消息数据包196
/// </summary>197 /// <param name="protobufModel"></param>198 /// <param name="messageId"></param>199
byte[] BuildPackage(IExtensible protobufModel, int messageId)200 {201 byte[] b;202
if (protobufModel != null)203 b = ProtobufSerilizer.Serialize(protobufModel);204
else205 b = new byte[0];206 //消息长度(int数据,长度4) + 消息id(int数据,长度4) + 消息主体内容207
ByteBuffer buf = ByteBuffer.Allocate(b.Length + 4 + 4);208 //消息长度 = 消息主体内容长度 + 消息id长度209
buf.WriteInt(b.Length + 4);210 buf.WriteInt(messageId);211 212 if (protobufModel != null)213
buf.WriteBytes(b);214 return buf.GetBytes();215 }216 217 void OnDestroy()218 {219
//停止运行后,如果不关闭socket多线程,再次运行时,unity会卡死220 Close();221 }222 223
/// <summary>224 /// 关闭socket,终止线程225 /// </summary>226 public void Close()227
{228 if (mSocket != null)229 {230
//微软官方推荐在关闭socket前先shutdown,但是经过测试,发现网络断开后,shutdown会无法执行231
if (mSocket.Connected)232 mSocket.Shutdown(SocketShutdown.Both);233
mSocket.Close();234 mSocket = null;235 }236 //关闭线程237
if (threadSend != null)238 threadSend.Abort();239 if (threadRecive != null)240
threadRecive.Abort();241 threadSend = null;242 threadRecive = null;243 }244 }
登录后复制
到这里,使用socket处理消息的收发就基本结束了,但是,某些项目为了增强体验,可能还会增加断线重连的功能,这个功能会在下一篇讲到
以上就是socket传输protobuf字节流的实例介绍的详细内容,更多请关注靠谱客其它相关文章!
最后
以上就是机智香氛为你收集整理的socket传输protobuf字节流的实例介绍的全部内容,希望文章能够帮你解决socket传输protobuf字节流的实例介绍所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复