この記事の主な目的は、.NET/C#を使用してTCPの高性能サービスを実装するためのさまざまな方法です。これには以下が含まれますが、これらに限定されません:
TAPメソッド、つまりタスクベースの非同期パターン
SAEAメソッド、つまりSocketAsyncEventArgs
RIOメソッド、つまりRegistered I/O
.NET/C# で S ソケットをサポート これらはすべて Windows I/O 完了ポートに基づいてポート テクノロジのパッケージ化を完了し、さまざまなノンブロッキング パッケージ構造を通じてさまざまな プログラミング ニーズに対応します。上記のメソッドは Cowboy.Sockets に完全に実装されており、APM メソッドと TAP メソッドは実際のプロジェクトに適用されています。 Cowboy.Sockets はまだ進化し、改善されています。問題がある場合は、すぐに修正してください。
非常に多くの実装メソッドがありますが、抽象的に見るとそれらは同じであり、以下の図に示すように、Accept Loop と Read Loop の 2 つのループで説明できます。 (ここで言う「Loop」は、具体的に while/for やその他のキーワードではなく、 ループ メソッド を指します。)
ending バックログがいっぱいになり、クライアントが Connect Timeout 例外を受け取ることになります。読み取りループがブロックされると、明らかにクライアントからデータを時間内に受信できなくなり、クライアントの送信バッファがいっぱいになり、データを送信できなくなります。
実装の詳細の観点から見ると、サービスのブロックを引き起こす可能性がある場所は次のとおりです。コピー
が発生する可能性があります。Module
によって処理されます。処理プロセスは低速です。1-2 には Accept プロセスと Connection 確立プロセスが含まれ、3-4 には ReceiveBuffer 処理プロセスが含まれます。アプリケーションロジック側の実装。
Java の有名な Netty ネットワーク ライブラリは、バージョン 4.0 以降、バッファ部分で新しい試みを行っています。ByteBuf と呼ばれる設計を採用して、バッファ ゼロ コピーを実装し、同時実行性が高い条件下でバッファ コピーによって引き起こされるパフォーマンスの損失と GC プレッシャーを軽減します。 。 DotNetty、Orleans、Helios などのプロジェクトは、同様の ByteBuf を C# で実装しようとしています。インターフェイス
を使用して実装されます。 TcpSocketServer の Accept ループは、BeginAccept -> EndAccept -> BeginAccept -> を参照します。 Session
を処理するため、TcpSocketSession には読み取りループが含まれます。BeginRead -> EndRead -> BeginRead -> ...
event EventHandler<TcpClientConnectedEventArgs> ClientConnected; event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected; event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;
もシンプルで使いやすく、イベントの通知を直接購読するだけです。
private static void StartServer() { _server = new TcpSocketServer(22222); _server.ClientConnected += server_ClientConnected; _server.ClientDisconnected += server_ClientDisconnected; _server.ClientDataReceived += server_ClientDataReceived; _server.Listen(); } static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session)); } static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session)); } static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e) { var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength); Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session)); Console.WriteLine(string.Format("{0}", text)); _server.Broadcast(Encoding.UTF8.GetBytes(text)); }
//************* Task-based async public methods ************************* [HostProtection(ExternalThreading = true)] public Task<Socket> AcceptSocketAsync() { return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null); } [HostProtection(ExternalThreading = true)] public Task<TcpClient> AcceptTcpClientAsync() { return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null); }
while (IsListening) { var tcpClient = await _listener.AcceptTcpClientAsync(); }
正常に確立されたすべての Connection が AsyncTcpSocketSession によって処理されるため、 Async TcpSocketSession には読み取りループが含まれます
while (State == TcpSocketConnectionState.Connected) { int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); }
public interface IAsyncTcpSocketServerMessageDispatcher { Task OnSessionStarted(AsyncTcpSocketSession session); Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count); Task OnSessionClosed(AsyncTcpSocketSession session); }
オブジェクトをAsyncTcpSocketServerのコンストラクターに注入するだけです。
public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher { public async Task OnSessionStarted(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
public AsyncTcpSocketServer( IPEndPoint listenedEndPoint, Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null, Func<AsyncTcpSocketSession, Task> onSessionStarted = null, Func<AsyncTcpSocketSession, Task> onSessionClosed = null, AsyncTcpSocketServerConfiguration configuration = null) {}
これらの機能強化の新しい SocketAsyncEventArgs コンテキスト オブジェクトを割り当てるか、アプリケーション プールから空きオブジェクトを取得します。主な特徴は、大量の非同期ソケット I 中に繰り返される すべての位置とオブジェクトの同期を回避することです。 /O. Begin/End デザイン パターンは、現在、非同期ソケット I/O 用の Socket クラスによって実装されており、それぞれの 非同期ソケット操作に System.IAsyncResult オブジェクトを割り当てる必要があります利点は、各呼び出しでネイティブ ソケットに近い IAsyncResult などのオブジェクトを生成する必要がないことです。 SocketAsyncEventArgs を使用するための推奨手順は次のとおりです:
If
非同期ソケットメソッド (xxxAsync) がコールバックで true を返した場合、完了ステータスについてコンテキスト プロパティを照会します。非同期ソケット メソッド (xxxAsync) がコールバックで false を返した場合、操作は同期的に完了し、操作結果についてコンテキスト プロパティを照会できます。
public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)
SaeaPool は QueuedObjectPool
TcpSocketSaeaServer 中的 Accept Loop 指的就是,
while (IsListening) { var saea = _acceptSaeaPool.Take(); var socketError = await _listener.AcceptAsync(saea); if (socketError == SocketError.Success) { var acceptedSocket = saea.Saea.AcceptSocket; } _acceptSaeaPool.Return(saea); }
每一个建立成功的 Connection 由 TcpSocketSaeaSession 来处理,所以 TcpSocketSaeaSession 中会包含 Read Loop,
var saea = _saeaPool.Take(); saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length); while (State == TcpSocketConnectionState.Connected) { saea.Saea.SetBuffer(0, _receiveBuffer.Length); var socketError = await _socket.ReceiveAsync(saea); if (socketError != SocketError.Success) break; var receiveCount = saea.Saea.BytesTransferred; if (receiveCount == 0) break; }
同样,TcpSocketSaeaServer 对外所暴露的接口也同样是 Awaitable 的。
public interface ITcpSocketSaeaServerMessageDispatcher { Task OnSessionStarted(TcpSocketSaeaSession session); Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count); Task OnSessionClosed(TcpSocketSaeaSession session); }
使用起来也是简单直接:
public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher { public async Task OnSessionStarted(TcpSocketSaeaSession session) { Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(TcpSocketSaeaSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
从 Windows 8.1 / Windows Server 2012 R2 开始,微软推出了 Registered I/O Networking Extensions 来支持高性能 Socket 服务的实现,简称 RIO。
The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.
RIOCloseCompletionQueue
RIOCreateCompletionQueue
RIOCreateRequestQueue
RIODequeueCompletion
RIODeregisterBuffer
RIONotify
RIOReceive
RIOReceiveEx
RIORegisterBuffer
RIOResizeCompletionQueue
RIOResizeRequestQueue
RIOSend
RIOSendEx
到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过 P/Invoke 方式,RioSharp 是开源项目中的一个比较完整的实现。
Cowboy.Sockets 直接引用了 RioSharp 的源代码,放置在 Cowboy.Sockets.Experimental 名空间下,以供实验和测试使用。
同样,通过 TcpSocketRioServer 来实现 Accept Loop,
_listener.OnAccepted = (acceptedSocket) =>{ Task.Run(async () => { await Process(acceptedSocket); }) .Forget(); };
通过 TcpSocketRioSession 来处理 Read Loop,
while (State == TcpSocketConnectionState.Connected) { int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); if (receiveCount == 0) break; }
测试代码一如既往的类似:
public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher { public async Task OnSessionStarted(TcpSocketRioSession session) { //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); Console.WriteLine(string.Format("TCP session has connected {0}.", session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.Write(string.Format("Client : --> ")); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(TcpSocketRioSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
Asynchronous Programming Model (APM)
Task-based Asynchronous Pattern (TAP)
Event-based Asynchronous Pattern (EAP)
SocketAsyncEventArgs
Registered I/O
Netty: Reference counted objects
Socket Performance Enhancements in Version 3.5
What's New for Windows Sockets for Windows 8.1 and Windows Server 2012 R2
RIO_EXTENSION_FUNCTION_TABLE structure
Windows 8 Registered I/O Networking Extensions
本篇文章《C#高性能TCP服务的多种实现方式》由 Dennis Gao 发表自博客园个人博客,未经作者本人同意禁止以任何的形式转载,任何自动的或人为的爬虫转载行为均为耍流氓。
以上がC# での高性能 TCP サービスの複数の実装の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。