最近在对项目中 Socket 通讯的服务端代码进行优化,保证能接受尽可能多的客户端连接,并且不会丢掉连接,不会丢数据包。经过一段时间的反复测试和修改,终于达到了这一要求。服务端代码采用了异步通讯的方式,并使用 ManualResetEvent 来对线程进行控制。在程序中,ManualResetEvent 的使用很关键。ManualResetEvent 允许线程通过发信号互相通信。通常,此通信涉及一个线程在其他线程进行之前必须完成的任务。当一个线程开始一个活动(此活动必须完成后,其他线程才能开始)时,它调用 Reset 以将 ManualResetEvent 置于非终止状态(即无信号状态),此线程可被视为控制 ManualResetEvent。调用 ManualResetEvent 上的 WaitOne 的线程将被阻塞,并等待信号。当控制线程完成活动时,它调用 Set 以发出等待线程可以继续进行的信号,并释放所有等待线程。一旦它被终止,ManualResetEvent 将保持终止状态(即调用 WaitOne 的线程将立即返回,并不阻塞),直到它被手动重置。可以通过将布尔值传递给构造函数来控制 ManualResetEvent 的初始状态:如果初始状态为终止状态,则传入 true;否则传入 false。现在贴出主要的代码,欢迎大家指正,代码如下所示:
void ButtonStartListenClick(object sender, System.EventArgs e)
{ try { // Check the port value if(textBoxPort.Text == "") { MessageBox.Show("Please enter a Port Number"); return; } if (txtConnectNum.Text.Trim() != "") { iConnectNum = int.Parse(txtConnectNum.Text.Trim()); Flage = 0; } else { MessageBox.Show("Please enter a Connect Number"); return; } string portStr = textBoxPort.Text; int port = System.Convert.ToInt32(portStr); // Create the listening socket... m_mainSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); IPEndPoint ipLocal = new IPEndPoint (IPAddress.Any, port); // Bind to local IP Address... m_mainSocket.Bind( ipLocal ); // Start listening... m_mainSocket.Listen(10000); // Set the event to nonsignaled state.设置无信号状态的事件 allDone.Reset(); // Create the call back for any client connections... m_mainSocket.BeginAccept(new AsyncCallback (OnClientConnect), null); // Wait until a connection is made before continuing.等待连接创建后继续 allDone.WaitOne(); UpdateControls(true); } catch(SocketException se) { MessageBox.Show ( se.Message ); } }// This is the call back function, which will be invoked when a client is connected
public void OnClientConnect(IAsyncResult asyn) { try { //让它继续处理并建立与客户端的连接 allDone.Set(); // Here we complete/end the BeginAccept() asynchronous call // by calling EndAccept() - which returns the reference to // a new Socket object Socket workerSocket = m_mainSocket.EndAccept (asyn); // Now increment the client count for this client // in a thread safe manner Interlocked.Increment(ref m_clientCount); if (m_clientCount == 1) { lock (this) { stopwatch.Start(); dtStart = DateTime.Now; writeLog("Server Start Socket Connect Time"+dtStart.ToString("yyyy-MM-dd HH:mm:ss fff")); } } // Add the workerSocket reference to our ArrayList m_workerSocketList.Add(workerSocket); // Send a welcome message to client string msg = "Welcome client " + m_clientCount + "\n"; SendMsgToClient(msg, m_clientCount); // Update the list box showing the list of clients (thread safe call) UpdateClientListControl(); // Let the worker Socket do the further processing for the // just connected client WaitForData(workerSocket, m_clientCount); // Since the main Socket is now free, it can go back and wait for // other clients who are attempting to connect m_mainSocket.BeginAccept(new AsyncCallback ( OnClientConnect ),null); // Wait until a connection is made before continuing.等待连接创建后继续 allDone.WaitOne(); } catch(ObjectDisposedException) { System.Diagnostics.Debugger.Log(0,"1","\n OnClientConnection: Socket has been closed\n"); } catch(SocketException se) { MessageBox.Show ( se.Message ); } }public class SocketPacket
{ // Constructor which takes a Socket and a client number public SocketPacket(System.Net.Sockets.Socket socket, int clientNumber) { m_currentSocket = socket; m_clientNumber = clientNumber; } public System.Net.Sockets.Socket m_currentSocket; public int m_clientNumber; // Buffer to store the data sent by the client public byte[] dataBuffer = new byte[1024]; } // Start waiting for data from the client public void WaitForData(System.Net.Sockets.Socket soc, int clientNumber) { try { if ( pfnWorkerCallBack == null ) { // Specify the call back function which is to be // invoked when there is any write activity by the // connected client pfnWorkerCallBack = new AsyncCallback (OnDataReceived); } SocketPacket theSocPkt = new SocketPacket (soc, clientNumber); //receiveDone.Reset(); soc.BeginReceive (theSocPkt.dataBuffer, 0, theSocPkt.dataBuffer.Length, SocketFlags.None, pfnWorkerCallBack, theSocPkt); //receiveDone.WaitOne(); } catch(SocketException se) { MessageBox.Show (se.Message ); } } // This the call back function which will be invoked when the socket // detects any client writing of data on the stream public void OnDataReceived(IAsyncResult asyn) { SocketPacket socketData = (SocketPacket)asyn.AsyncState ; try { // Complete the BeginReceive() asynchronous call by EndReceive() method // which will return the number of characters written to the stream // by the client //receiveDone.Set(); int iRx = socketData.m_currentSocket.EndReceive (asyn); char[] chars = new char[iRx + 1]; // Extract the characters as a buffer System.Text.Decoder d = System.Text.Encoding.UTF8.GetDecoder(); int charLen = d.GetChars(socketData.dataBuffer, 0, iRx, chars, 0); System.String szData = new System.String(chars); string msg = "" + socketData.m_clientNumber + ":"; AppendToRichEditControl(msg + szData); //writeFromClientsMsgLog(msg + szData); // Send back the reply to the client string replyMsg = "Server Reply:" + szData.ToUpper(); // Convert the reply to byte array byte[] byData = 
System.Text.Encoding.ASCII.GetBytes(replyMsg); Socket workerSocket = (Socket)socketData.m_currentSocket; workerSocket.Send(byData); if (m_clientCount == iConnectNum && Flage == 0) { Interlocked.Increment(ref Flage); string msgTime = "Server End Socket Action Time:"; lock(this) { stopwatch.Stop(); //lblTime.Text = stopwatch.Elapsed.Seconds.ToString(); int itime = stopwatch.Elapsed.Milliseconds; //msgTime += stopwatch.Elapsed.Seconds.ToString()+"--"+itime.ToString(); msgTime += DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff"); } writeLog(msgTime); writeClientConnectionLog(); //UpdateLabelTime(msgTime); //byData = System.Text.Encoding.ASCII.GetBytes(msgTime); //workerSocket.Send(byData); } // Continue the waiting for data on the Socket WaitForData(socketData.m_currentSocket, socketData.m_clientNumber ); } catch (ObjectDisposedException ) { System.Diagnostics.Debugger.Log(0,"1","\nOnDataReceived: Socket has been closed\n"); } catch(SocketException se) { if(se.ErrorCode == 10054) // Error code for Connection reset by peer { string msg = "Client " + socketData.m_clientNumber + " Disconnected" + "\n"; AppendToRichEditControl(msg); //writeFromClientsMsgLog(msg); // Remove the reference to the worker socket of the closed client // so that this object will get garbage collected m_workerSocketList[socketData.m_clientNumber - 1] = null; UpdateClientListControl(); } else { MessageBox.Show (se.Message ); } } }void CloseSockets()
{ if(m_mainSocket != null) { m_mainSocket.Close(); } Socket workerSocket = null; for(int i = 0; i < m_workerSocketList.Count; i++) { workerSocket = (Socket)m_workerSocketList[i]; if(workerSocket != null) { workerSocket.Close(); workerSocket = null; } } }void SendMsgToClient(string msg, int clientNumber)
{ // Convert the reply to byte array byte[] byData = System.Text.Encoding.ASCII.GetBytes(msg); Socket workerSocket = (Socket)m_workerSocketList[clientNumber - 1]; //workerSocket.Send(byData); workerSocket.BeginSend(byData, 0, byData.Length, 0, new AsyncCallback(SendCallback), workerSocket); } private void SendCallback(IAsyncResult asyn) { Socket client = (Socket)asyn.AsyncState; // 完成数据发送. int bytesSent = client.EndSend(asyn); }完整的代码可以在我的资源中下载到,资源的名称为。