


Le package de file d'attente de messages de Python, SnakeMQ, est utilisé
L'utilisation de files d'attente de messages présente de nombreux avantages dans la communication de données. SnakeMQ est une bibliothèque MQ multiplateforme open source implémentée en Python. Eh bien, une étude préliminaire sur l'utilisation du package de file d'attente de messages de Python, SnakeMQ, c'est parti :
1. Introduction officielle à Snakemq
Page du projet GitHub de SnakeMQ : https://github.com/dsiroky/snakemq1 Implémentation python pure, multiplateforme
2. redémarrer la connexion
3. Envoi fiable - mode de message configurable et mode d'expiration des messages
4. Files d'attente persistantes/temporaires
Prise en charge asynchrone - sondage ()
6. symétrique -- une seule connexion TCP peut être utilisée pour la communication duplex 7. Prise en charge de plusieurs bases de données -- SQLite, MongoDB...8.brokerless - similaire. principe de mise en œuvre de ZeroMQ9. Modules d'extension : RPC, limitation de bande passanteCe qui précède sont tous des mots officiels et doivent être vérifiés par vous-même. Je l'ai emballé moi-même et c'est mignon.2. Description de plusieurs problèmes majeurs
1. Prise en charge de la reconnexion automatique, pas besoin d'écrire la logique du rythme cardiaque par vous-même, vous Concentrez-vous simplement sur l'envoi et la réception 2. Prend en charge la persistance des données Si la persistance est démarrée, les données seront envoyées automatiquement après la reconnexion. 3. Snakemq implémente la réception des données en fournissant des rappels. Il vous suffit d'écrire une méthode de réception et de l'ajouter à la liste de rappel. 4. Les données envoyées ici sont de type octets (binaires), elles doivent donc être converties. Ce que je teste dans le programme, ce sont toutes les chaînes de texte. J'utilise str.encode('utf-8') pour les convertir en octets, puis les reconvertir lors de la réception. 5. Explication de la terminologie, Connecteur : TcpClient similaire à socket, Listener : TcpServer similaire à socket. Chaque connecteur ou écouteur a une identification lors de l'envoi et de la réception de données, vous saurez de qui il s'agit. 6. Lorsque vous utilisez la persistance SQLite, vous devez modifier le code source, sqlite3.connect(filename, check_same_thread = False), pour résoudre le problème de l'accès multithread à SQLite. (Y aura-t-il une impasse ?) 7. Lors du démarrage de la persistance, si la connexion est reconnectée, elle sera envoyée automatiquement pour garantir la fiabilité. 8. Aux fins de l'encapsulation, une fois les données reçues, je les envoie par rappel.3. Code
Explication selon laquelle le module de journal personnalisé est utilisé dans le code
from common import nxlogger import snakemqlogger as logger
# -*- coding:utf-8 -*- '''synchronized callback''' class Callback(object): def __init__(self): self.callbacks = [] def add(self, func): self.callbacks.append(func) def remove(self, func): self.callbacks.remove(func) def __call__(self, *args, **kwargs): for callback in self.callbacks: callback(*args, **kwargs)
# -*- coding:utf-8 -*- import threading import snakemq import snakemq.link import snakemq.packeter import snakemq.messaging import snakemq.message from snakemq.storage.sqlite import SqliteQueuesStorage from snakemq.message import FLAG_PERSISTENT from common.callbacks import Callback from common import nxlogger import snakemqlogger as logger class SnakemqConnector(threading.Thread): def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False): super(SnakemqConnector,self).__init__() self.messaging = None self.link = None self.snakemqident = snakemqident self.pktr = None self.remoteIp = remoteIp self.remotePort = remotePort self.persistent = persistent self.on_recv = Callback() self._initConnector() def run(self): logger.info("connector start...") if self.link != None: self.link.loop() logger.info("connector end...") def terminate(self): logger.info("connetor terminating...") if self.link != None: self.link.stop() self.link.cleanup() logger.info("connetor terminated") def on_recv_message(self, conn, ident, message): try: self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data except Exception as e: logger.error("connector recv:{0}".format(e)) print(e) '''send message to dest host named destIdent''' def sendMsg(self, destIdent, byteseq): msg = None if self.persistent: msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT) else: msg = snakemq.message.Message(byteseq, ttl=60) if self.messaging == None: logger.error("connector:messaging is not initialized, send message failed") return self.messaging.send_message(destIdent, msg) ''' ''' def _initConnector(self): try: self.link = snakemq.link.Link() self.link.add_connector((self.remoteIp, self.remotePort)) self.pktr = snakemq.packeter.Packeter(self.link) if self.persistent: storage = SqliteQueuesStorage("SnakemqStorage.db") self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage) else: self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr) self.messaging.on_message_recv.add(self.on_recv_message) except Exception as e: logger.error("connector:{0}".format(e)) finally: logger.info("connector[{0}] loop ended...".format(self.snakemqident))
# -*- coding:utf-8 -*- import threading import snakemq import snakemq.link import snakemq.packeter import snakemq.messaging import snakemq.message from common import nxlogger import snakemqlogger as logger from common.callbacks import Callback class SnakemqListener(threading.Thread): def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False): super(SnakemqListener,self).__init__() self.messaging = None self.link = None self.pktr = None self.snakemqident = snakemqident self.ip = ip; self.port = port self.connectors = {} self.on_recv = Callback() self.persistent = persistent self._initlistener() ''' thread run ''' def run(self): logger.info("listener start...") if self.link != None: self.link.loop() logger.info("listener end...") ''' terminate snakemq listener thread ''' def terminate(self): logger.info("listener terminating...") if self.link != None: self.link.stop() self.link.cleanup() logger.info("listener terminated") ''' receive message from host named ident ''' def on_recv_message(self, conn, ident, message): try: self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8')) except Exception as e: logger.error("listener recv:{0}".format(e)) print(e) def on_drop_message(self, ident, message): print("message dropped", ident, message) logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message)) '''client connect''' def on_connect(self, ident): logger.debug("listener:{0} connected".format(ident)) self.connectors[ident] = ident self.sendMsg(ident, "hello".encode('utf-8')) '''client disconnect''' def on_disconnect(self, ident): logger.debug("listener:{0} disconnected".format(ident)) if ident in self.connectors: self.connectors.pop(ident) ''' listen start loop ''' def _initlistener(self): try: self.link = snakemq.link.Link() self.link.add_listener((self.ip, self.port)) self.pktr = snakemq.packeter.Packeter(self.link) self.pktr.on_connect.add(self.on_connect) self.pktr.on_disconnect.add(self.on_disconnect) if self.persistent: storage = SqliteQueuesStorage("SnakemqStorage.db") self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage) else: self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr) self.messaging.on_message_recv.add(self.on_recv_message) self.messaging.on_message_drop.add(self.on_drop_message) except Exception as e: logger.error("listener:{0}".format(e)) finally: logger.info("listener:loop ended...") '''send message to dest host named destIdent''' def sendMsg(self, destIdent, byteseq): msg = None if self.persistent: msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT) else: msg = snakemq.message.Message(byteseq, ttl=60) if self.messaging == None: logger.error("listener:messaging is not initialized, send message failed") return self.messaging.send_message(destIdent, msg)
from netComm.snakemq import snakemqConnector import time import sys import os def received(ident, data): print(data) if __name__ == "__main__": bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True) bob.on_recv.add(received) bob.start() try: with open("testfile.txt",encoding='utf-8') as f: txt = f.read() for i in range(100): bob.sendMsg("niess",txt.encode('utf-8')) time.sleep(0.1) except Exception as e: print(e) time.sleep(5) bob.terminate() 测试代码listener(testSnakeListener.py): from netComm.snakemq import snakemqListener import time def received(ident, data): filename = "log/recFile{0}.txt".format(time.strftime('%S',time.localtime())) file = open(filename,'w') file.writelines(data) file.close() if __name__ == "__main__": niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002) niess.on_recv.add(received) niess.start() print("niess start...") time.sleep(60) niess.terminate() print("niess end...")

Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

AI Hentai Generator
Générez AI Hentai gratuitement.

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise
Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1
Puissant environnement de développement intégré PHP

Dreamweaver CS6
Outils de développement Web visuel

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Sujets chauds

Ce tutoriel montre comment utiliser Python pour traiter le concept statistique de la loi de Zipf et démontre l'efficacité de la lecture et du tri de Python de gros fichiers texte lors du traitement de la loi. Vous vous demandez peut-être ce que signifie le terme distribution ZIPF. Pour comprendre ce terme, nous devons d'abord définir la loi de Zipf. Ne vous inquiétez pas, je vais essayer de simplifier les instructions. La loi de Zipf La loi de Zipf signifie simplement: dans un grand corpus en langage naturel, les mots les plus fréquents apparaissent environ deux fois plus fréquemment que les deuxième mots fréquents, trois fois comme les troisième mots fréquents, quatre fois comme quatrième mots fréquents, etc. Regardons un exemple. Si vous regardez le corpus brun en anglais américain, vous remarquerez que le mot le plus fréquent est "th

Cet article explique comment utiliser la belle soupe, une bibliothèque Python, pour analyser HTML. Il détaille des méthodes courantes comme find (), find_all (), select () et get_text () pour l'extraction des données, la gestion de diverses structures et erreurs HTML et alternatives (Sel

Traiter avec des images bruyantes est un problème courant, en particulier avec des photos de téléphones portables ou de caméras basse résolution. Ce tutoriel explore les techniques de filtrage d'images dans Python à l'aide d'OpenCV pour résoudre ce problème. Filtrage d'image: un outil puissant Filtre d'image

Les fichiers PDF sont populaires pour leur compatibilité multiplateforme, avec du contenu et de la mise en page cohérents sur les systèmes d'exploitation, les appareils de lecture et les logiciels. Cependant, contrairement aux fichiers de texte brut de traitement Python, les fichiers PDF sont des fichiers binaires avec des structures plus complexes et contiennent des éléments tels que des polices, des couleurs et des images. Heureusement, il n'est pas difficile de traiter les fichiers PDF avec les modules externes de Python. Cet article utilisera le module PYPDF2 pour montrer comment ouvrir un fichier PDF, imprimer une page et extraire du texte. Pour la création et l'édition des fichiers PDF, veuillez vous référer à un autre tutoriel de moi. Préparation Le noyau réside dans l'utilisation du module externe PYPDF2. Tout d'abord, l'installez en utilisant PIP: pip is p

Ce tutoriel montre comment tirer parti de la mise en cache Redis pour augmenter les performances des applications Python, en particulier dans un cadre Django. Nous couvrirons l'installation redis, la configuration de Django et les comparaisons de performances pour mettre en évidence le bien

Cet article compare TensorFlow et Pytorch pour l'apprentissage en profondeur. Il détaille les étapes impliquées: préparation des données, construction de modèles, formation, évaluation et déploiement. Différences clés entre les cadres, en particulier en ce qui concerne le raisin informatique

Python, un favori pour la science et le traitement des données, propose un écosystème riche pour l'informatique haute performance. Cependant, la programmation parallèle dans Python présente des défis uniques. Ce tutoriel explore ces défis, en se concentrant sur l'interprète mondial

Ce didacticiel montre la création d'une structure de données de pipeline personnalisée dans Python 3, en tirant parti des classes et de la surcharge de l'opérateur pour une fonctionnalité améliorée. La flexibilité du pipeline réside dans sa capacité à appliquer une série de fonctions à un ensemble de données, GE
