L'objectif principal de cet article concerne les différentes manières d'utiliser .NET/C# pour implémenter des services TCP hautes performances, y compris, mais sans s'y limiter, les éléments suivants :
Méthode APM, c'est-à-dire AsynchrProgrammation Oneuse Modèle
Méthode TAP, c'est-à-dire, Modèle asynchrone basé sur les tâches
SAEUne méthode, c'est-à-dire SocketAsyncEventArgs
Méthode RIO, c'est-à-dire enregistrée La prise en charge des E/S
Socket dans .NET/C# est basée sur les ports de complétion d'E/S Windows pour compléter l'encapsulation de la technologie des ports et utilise différentes structures d'encapsulation non bloquantes pour répondre à différents programmation besoins. Les méthodes ci-dessus ont été entièrement implémentées dans Cowboy.Sockets, et les méthodes APM et TAP ont été appliquées dans des projets réels. Cowboy.Sockets continue d'évoluer et de s'améliorer. S'il y a des problèmes, veuillez les corriger à temps.
Bien qu'il existe de nombreuses méthodes d'implémentation, dans l'abstrait, ce sont les mêmes, qui peuvent être décrites par deux boucles : Accept Loop et Read Loop , comme illustré dans la figure ci-dessous. (La "Boucle" mentionnée ici fait référence à une bouclevoie, pas spécifiquement à pendant/pour et autres mots-clés. )
Dans toute implémentation de serveur TCP, il doit y avoir une boucle de socket d'acceptation pour recevoir les requêtes de connexion du client afin d'établir un TCP. Connexion.
Dans toute implémentation de serveur TCP, il doit y avoir une boucle de socket de lecture pour recevoir les données écrites par le client.
Si la boucle d'acceptation est bloquée, la connexion ne sera pas établie rapidement et le serveur Pending Backlog sera plein, ce qui entraînera le Client à recevez l'exception Connect Timeout. Si la boucle de lecture est bloquée, cela entraînera évidemment l'incapacité de recevoir les données du client à temps, ce qui entraînera la saturation du tampon d'envoi du client et ne pourra plus envoyer de données.
Du point de vue des détails de mise en œuvre, l'emplacement qui peut provoquer un blocage de service peut être :
Accepter un nouveau Socket, et la création d'une nouvelle connexion nécessite l'allocation de diverses ressources . L'allocation des ressources est lente ;
Accepter dans un nouveau Socket et la prochaine Acceptation n'est pas déclenchée à temps
Lire dans un nouveau tampon ; et déterminez la longueur du message de charge utile, le processus de jugement est long
Lisez le nouveau tampon et constatez que la charge utile n'a pas encore été collectée. Si vous continuez à lire, cela peut ". peut" entraîner une Copie du tampon ;
Une fois la charge utile reçue, la désérialisation est effectuée et convertie en un message de protocole reconnaissable. La désérialisation est lente ;
Par Business Module pour traiter le message de protocole correspondant, le processus de traitement est lent
1-2 implique ; le processus d'acceptation et le processus d'établissement de Connection, 3-4 impliquent le processus de traitement de ReceiverBuffer, 5-6 implique la mise en œuvre du côté logique d'application.
La célèbre bibliothèque réseau Netty en Java a fait une nouvelle tentative sur la partie Buffer à partir de la version 4.0, en adoptant une conception appelée ByteBuf pour implémenter Buffer Zero Copy afin de réduire les performances causées par la copie Buffer dans des conditions de concurrence élevée. pertes et pression GC. Des projets tels que DotNetty, Orleans et Helios tentent d'implémenter un ByteBuf similaire en C#.
L'implémentation de TcpSocketServer est basée sur l'encapsulation ultérieure de TcpListener et TcpClient fournie avec le .NET Framework, en utilisant l'interface BeginXXX et EndXXX basée sur APM. accomplir.
La boucle Accept dans TcpSocketServer fait référence à,
BeginAccept -> EndAccept-> 🎜>
Session, donc TcpSocketSession contiendra une boucle de lecture,
BeginRead -> EndRead -> BeginRead -> EndRead -> ...
TcpSocketServer est implémenté en exposant des événements Notification d'établissement et de déconnexion de connexion et de réception de données.
event EventHandler<TcpClientConnectedEventArgs> ClientConnected; event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected; event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;
est également simple et direct à utiliser, il suffit de vous abonner directement à la notification événement.
private static void StartServer() { _server = new TcpSocketServer(22222); _server.ClientConnected += server_ClientConnected; _server.ClientDisconnected += server_ClientDisconnected; _server.ClientDataReceived += server_ClientDataReceived; _server.Listen(); } static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session)); } static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session)); } static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e) { var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength); Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session)); Console.WriteLine(string.Format("{0}", text)); _server.Broadcast(Encoding.UTF8.GetBytes(text)); }
L'implémentation d'AsyncTcpSocketServer est basée sur l'encapsulation ultérieure de TcpListener et TcpClient fournis avec le . NET Framework, implémenté à l'aide de l'interface XXXAsync basée sur l'async/await de TAP.
Cependant, en fait, XXXAsync ne crée aucun effet magique. Son implémentation interne convertit simplement les méthodes APM en méthodes d'appel TAP.
//************* Task-based async public methods ************************* [HostProtection(ExternalThreading = true)] public Task<Socket> AcceptSocketAsync() { return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null); } [HostProtection(ExternalThreading = true)] public Task<TcpClient> AcceptTcpClientAsync() { return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null); }
La boucle Accept dans AsyncTcpSocketServer fait référence à,
while (IsListening) { var tcpClient = await _listener.AcceptTcpClientAsync(); }
Chacun La connexion établie avec succès est gérée par AsyncTcpSocketSession, donc AsyncTcpSocketSession contiendra une boucle de lecture,
while (State == TcpSocketConnectionState.Connected) { int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); }
Afin de rendre async/wait asynchrone jusqu'à la fin, l'interface exposée par AsyncTcpSocketServer est également Awaitable.
public interface IAsyncTcpSocketServerMessageDispatcher { Task OnSessionStarted(AsyncTcpSocketSession session); Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count); Task OnSessionClosed(AsyncTcpSocketSession session); }
Lors de son utilisation, il vous suffit d'injecter un objet qui implémente cette interface dans le constructeur d'AsyncTcpSocketServer . Peut.
public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher { public async Task OnSessionStarted(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
Bien entendu, l'implémentation de l'interface n'est pas obligatoire, et l'implémentation de la méthode peut également être directement injectée dans le constructeur.
public AsyncTcpSocketServer( IPEndPoint listenedEndPoint, Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null, Func<AsyncTcpSocketSession, Task> onSessionStarted = null, Func<AsyncTcpSocketSession, Task> onSessionClosed = null, AsyncTcpSocketServerConfiguration configuration = null) {}
SAEA est l'abréviation de SocketAsyncEventArgs. SocketAsyncEventArgs est une implémentation qui prend en charge la communication Socket hautes performances à partir de .NET Framework 3.5. Les principaux avantages de SocketAsyncEventArgs par rapport à la méthode APM peuvent être décrits comme suit :
La principale caractéristique de ces améliorations est d'éviter les tous les répétitions emplacement et synchronisation des objets lors d'E/S de socket asynchrones à grand volume Le modèle de conception Begin/End actuelimplémenté par la classe Socket pour les E/S de socket asynchrones require. Un objet System.IAsyncResult doit être alloué pour chaque opération de socket asynchrone.
En d'autres termes, l'avantage est qu'il n'est pas nécessaire de générer IAsyncResult et d'autres objets pour chaque appel, plus proche du Socket natif Certains.
Les étapes recommandées pour utiliser SocketAsyncEventArgs sont les suivantes :
Allouez un nouvel objet contextuel SocketAsyncEventArgs ou obtenez-en un gratuitement à partir d'un pool d'applications.
Définissez les propriétés de l'objet contextuel sur l'opération sur le point d'être effectuée (la méthode déléguée de rappel et le tampon de données, par exemple).
Appelez la méthode socket appropriée (xxxAsync) pour lancer l'opération asynchrone.
Si la méthode socket asynchrone (xxxAsync) renvoie true dans le rappel, interrogez les propriétés de contexte pour connaître l'état d'achèvement.
Si la méthode socket asynchrone (xxxAsync) renvoie false dans le rappel, l'opération s'est terminée de manière synchrone. Les propriétés de contexte peuvent être interrogées pour le résultat de l'opération.
Réutilisez le contexte pour une autre opération, remettez-le dans le pool ou supprimez-le.
L'accent est mis sur la mise en commun. Le but de la mise en commun est de réutiliser et de réduire l'allocation de temps d'exécution. et la pression de la collecte des ordures.
TcpSocketSaeaServer est l'application et l'encapsulation de SocketAsyncEventArgs et implémente la technologie Pooling. L'objectif de TcpSocketSaeaServer est la classe SaeaAwaitable qui possède un SocketAsyncEventArgs intégré et renvoie SaeaAwaiter via GetAwaiter pour prendre en charge les opérations async/wait. Dans le même temps, l'implémentation Awaitable de SocketAsyncEventArgs est étendue via la paire de méthodes d'extension SaeaExtensions.
public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)
SaeaPool est une implémentation dérivée de QueuedObjectPool
TcpSocketSaeaServer 中的 Accept Loop 指的就是,
while (IsListening) { var saea = _acceptSaeaPool.Take(); var socketError = await _listener.AcceptAsync(saea); if (socketError == SocketError.Success) { var acceptedSocket = saea.Saea.AcceptSocket; } _acceptSaeaPool.Return(saea); }
每一个建立成功的 Connection 由 TcpSocketSaeaSession 来处理,所以 TcpSocketSaeaSession 中会包含 Read Loop,
var saea = _saeaPool.Take(); saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length); while (State == TcpSocketConnectionState.Connected) { saea.Saea.SetBuffer(0, _receiveBuffer.Length); var socketError = await _socket.ReceiveAsync(saea); if (socketError != SocketError.Success) break; var receiveCount = saea.Saea.BytesTransferred; if (receiveCount == 0) break; }
同样,TcpSocketSaeaServer 对外所暴露的接口也同样是 Awaitable 的。
public interface ITcpSocketSaeaServerMessageDispatcher { Task OnSessionStarted(TcpSocketSaeaSession session); Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count); Task OnSessionClosed(TcpSocketSaeaSession session); }
使用起来也是简单直接:
public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher { public async Task OnSessionStarted(TcpSocketSaeaSession session) { Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(TcpSocketSaeaSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
从 Windows 8.1 / Windows Server 2012 R2 开始,微软推出了 Registered I/O Networking Extensions 来支持高性能 Socket 服务的实现,简称 RIO。
The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.
RIOCloseCompletionQueue
RIOCreateCompletionQueue
RIOCreateRequestQueue
RIODequeueCompletion
RIODeregisterBuffer
RIONotify
RIOReceive
RIOReceiveEx
RIORegisterBuffer
RIOResizeCompletionQueue
RIOResizeRequestQueue
RIOSend
RIOSendEx
到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过 P/Invoke 方式,RioSharp 是开源项目中的一个比较完整的实现。
Cowboy.Sockets 直接引用了 RioSharp 的源代码,放置在 Cowboy.Sockets.Experimental 名空间下,以供实验和测试使用。
同样,通过 TcpSocketRioServer 来实现 Accept Loop,
_listener.OnAccepted = (acceptedSocket) =>{ Task.Run(async () => { await Process(acceptedSocket); }) .Forget(); };
通过 TcpSocketRioSession 来处理 Read Loop,
while (State == TcpSocketConnectionState.Connected) { int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); if (receiveCount == 0) break; }
测试代码一如既往的类似:
public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher { public async Task OnSessionStarted(TcpSocketRioSession session) { //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); Console.WriteLine(string.Format("TCP session has connected {0}.", session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.Write(string.Format("Client : --> ")); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(TcpSocketRioSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
Asynchronous Programming Model (APM)
Task-based Asynchronous Pattern (TAP)
Event-based Asynchronous Pattern (EAP)
SocketAsyncEventArgs
Registered I/O
Netty: Reference counted objects
Socket Performance Enhancements in Version 3.5
What's New for Windows Sockets for Windows 8.1 and Windows Server 2012 R2
RIO_EXTENSION_FUNCTION_TABLE structure
Windows 8 Registered I/O Networking Extensions
本篇文章《C#高性能TCP服务的多种实现方式》由 Dennis Gao 发表自博客园个人博客,未经作者本人同意禁止以任何的形式转载,任何自动的或人为的爬虫转载行为均为耍流氓。
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!