Maison développement back-end Tutoriel Python Le package de file d'attente de messages de Python, SnakeMQ, est utilisé

Le package de file d'attente de messages de Python, SnakeMQ, est utilisé

Mar 01, 2017 pm 02:06 PM

L'utilisation de files d'attente de messages présente de nombreux avantages dans la communication de données. SnakeMQ est une bibliothèque MQ multiplateforme open source implémentée en Python. Eh bien, une étude préliminaire sur l'utilisation du package de file d'attente de messages de Python, SnakeMQ, c'est parti :

1. Introduction officielle à Snakemq
Page du projet GitHub de SnakeMQ : https://github.com/dsiroky/snakemq1 Implémentation python pure, multiplateforme

2. redémarrer la connexion

3. Envoi fiable - mode de message configurable et mode d'expiration des messages

4. Files d'attente persistantes/temporaires

Prise en charge asynchrone - sondage ()

6. symétrique -- une seule connexion TCP peut être utilisée pour la communication duplex

7. Prise en charge de plusieurs bases de données -- SQLite, MongoDB...

8.brokerless - similaire. principe de mise en œuvre de ZeroMQ

9. Modules d'extension : RPC, limitation de bande passante

Ce qui précède sont tous des mots officiels et doivent être vérifiés par vous-même. Je l'ai emballé moi-même et c'est mignon.


2. Description de plusieurs problèmes majeurs

1. Prise en charge de la reconnexion automatique, pas besoin d'écrire la logique du rythme cardiaque par vous-même, vous Concentrez-vous simplement sur l'envoi et la réception

2. Prend en charge la persistance des données Si la persistance est démarrée, les données seront envoyées automatiquement après la reconnexion.

3. Snakemq implémente la réception des données en fournissant des rappels. Il vous suffit d'écrire une méthode de réception et de l'ajouter à la liste de rappel.

4. Les données envoyées ici sont de type octets (binaires), elles doivent donc être converties. Ce que je teste dans le programme, ce sont toutes les chaînes de texte. J'utilise str.encode('utf-8') pour les convertir en octets, puis les reconvertir lors de la réception.

5. Explication de la terminologie, Connecteur : TcpClient similaire à socket, Listener : TcpServer similaire à socket. Chaque connecteur ou écouteur a une identification lors de l'envoi et de la réception de données, vous saurez de qui il s'agit.

6. Lorsque vous utilisez la persistance SQLite, vous devez modifier le code source, sqlite3.connect(filename, check_same_thread = False), pour résoudre le problème de l'accès multithread à SQLite. (Y aura-t-il une impasse ?)

7. Lors du démarrage de la persistance, si la connexion est reconnectée, elle sera envoyée automatiquement pour garantir la fiabilité.

8. Aux fins de l'encapsulation, une fois les données reçues, je les envoie par rappel.


3. Code

Explication selon laquelle le module de journal personnalisé est utilisé dans le code

from common import nxlogger

import snakemqlogger as logger
Copier après la connexion

peut être remplacé par la journalisation.

Classe de rappel (callbacks.py) :

# -*- coding:utf-8 -*-

'''synchronized callback'''

class Callback(object):

  def __init__(self):

    self.callbacks = []

 

  def add(self, func):

    self.callbacks.append(func)

 

  def remove(self, func):

    self.callbacks.remove(func)

 

  def __call__(self, *args, **kwargs):

    for callback in self.callbacks:

      callback(*args, **kwargs)
Copier après la connexion

Classe de connecteur (snakemqConnector.py) :

# -*- coding:utf-8 -*-

import threading

import snakemq

import snakemq.link

import snakemq.packeter

import snakemq.messaging

import snakemq.message

from snakemq.storage.sqlite import SqliteQueuesStorage

from snakemq.message import FLAG_PERSISTENT

from common.callbacks import Callback

 

from common import nxlogger

import snakemqlogger as logger

 

class SnakemqConnector(threading.Thread):

     def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False):

         super(SnakemqConnector,self).__init__()

         self.messaging = None

         self.link = None

         self.snakemqident = snakemqident

         self.pktr = None

         self.remoteIp = remoteIp

         self.remotePort = remotePort

         self.persistent = persistent

         self.on_recv = Callback()

         self._initConnector()

 

     def run(self):

         logger.info("connector start...")

         

         if self.link != None:

              self.link.loop()

 

         logger.info("connector end...")

    

     def terminate(self):

         logger.info("connetor terminating...")

         if self.link != None:

              self.link.stop()

              self.link.cleanup()

         logger.info("connetor terminated")

 

     def on_recv_message(self, conn, ident, message):

         try:

              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

         except Exception as e:

              logger.error("connector recv:{0}".format(e))

              print(e)

 

     '''send message to dest host named destIdent'''

     def sendMsg(self, destIdent, byteseq):

         msg = None

         if self.persistent:

              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)

         else:

              msg = snakemq.message.Message(byteseq, ttl=60)

         if self.messaging == None:

              logger.error("connector:messaging is not initialized, send message failed")

              return

         self.messaging.send_message(destIdent, msg)

 

     '''

    

     '''

     def _initConnector(self):

         try:

              self.link = snakemq.link.Link()

              self.link.add_connector((self.remoteIp, self.remotePort))

 

              self.pktr = snakemq.packeter.Packeter(self.link)

 

              if self.persistent:

                  storage = SqliteQueuesStorage("SnakemqStorage.db")

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

              else:

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

             

              self.messaging.on_message_recv.add(self.on_recv_message)

             

         except Exception as e:

              logger.error("connector:{0}".format(e))

         finally:

              logger.info("connector[{0}] loop ended...".format(self.snakemqident))
Copier après la connexion

Classe d'écoute (snakemqListener.py) :

# -*- coding:utf-8 -*-

import threading

import snakemq

import snakemq.link

import snakemq.packeter

import snakemq.messaging

import snakemq.message

from common import nxlogger

import snakemqlogger as logger

from common.callbacks import Callback

class SnakemqListener(threading.Thread):

     def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False):

         super(SnakemqListener,self).__init__()

         self.messaging = None

         self.link = None

         self.pktr = None

         self.snakemqident = snakemqident

         self.ip = ip;

         self.port = port

         self.connectors = {}

         self.on_recv = Callback()

         self.persistent = persistent

         self._initlistener()

 

     '''

     thread run

     '''

     def run(self):

         logger.info("listener start...")

         

         if self.link != None:

              self.link.loop()

 

         logger.info("listener end...")

 

     '''

     terminate snakemq listener thread

     '''

     def terminate(self):

         logger.info("listener terminating...")

         if self.link != None:

              self.link.stop()

              self.link.cleanup()

         logger.info("listener terminated")

 

     '''

     receive message from host named ident

     '''

     def on_recv_message(self, conn, ident, message):

         try:

              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

              self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8'))

         except Exception as e:

              logger.error("listener recv:{0}".format(e))

              print(e)

 

     def on_drop_message(self, ident, message):

         print("message dropped", ident, message)

         logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message))

 

     '''client connect'''

     def on_connect(self, ident):

         logger.debug("listener:{0} connected".format(ident))

         self.connectors[ident] = ident

         self.sendMsg(ident, "hello".encode('utf-8'))

 

     '''client disconnect'''

     def on_disconnect(self, ident):

         logger.debug("listener:{0} disconnected".format(ident))

         if ident in self.connectors:

              self.connectors.pop(ident)

 

     '''

     listen start loop

     '''

     def _initlistener(self):

         try:

              self.link = snakemq.link.Link()

              self.link.add_listener((self.ip, self.port))

 

              self.pktr = snakemq.packeter.Packeter(self.link)

              self.pktr.on_connect.add(self.on_connect)

              self.pktr.on_disconnect.add(self.on_disconnect)

 

              if self.persistent:

                  storage = SqliteQueuesStorage("SnakemqStorage.db")

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

              else:

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

             

              self.messaging.on_message_recv.add(self.on_recv_message)

              self.messaging.on_message_drop.add(self.on_drop_message)

 

         except Exception as e:

              logger.error("listener:{0}".format(e))

         finally:

              logger.info("listener:loop ended...")

     '''send message to dest host named destIdent'''

     def sendMsg(self, destIdent, byteseq):

         msg = None

         if self.persistent:

              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)

         else:

              msg = snakemq.message.Message(byteseq, ttl=60)

         if self.messaging == None:

              logger.error("listener:messaging is not initialized, send message failed")

              return

         self.messaging.send_message(destIdent, msg)
Copier après la connexion

Connecteur de code de test (testSnakeConnector.py ):

Lisez un fichier local de 1 Mo, puis envoyez-le à l'auditeur, puis l'auditeur renvoie un message de bonjour.

from netComm.snakemq import snakemqConnector

import time

import sys

import os

def received(ident, data):

     print(data)

 

if __name__ == "__main__":

     bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True)

     bob.on_recv.add(received)

     bob.start()

     try:

         with open("testfile.txt",encoding='utf-8') as f:

              txt = f.read()

              for i in range(100):

                  bob.sendMsg("niess",txt.encode('utf-8'))

                  time.sleep(0.1)

     except Exception as e:

         print(e)

     time.sleep(5)

     bob.terminate()   

 

测试代码listener(testSnakeListener.py):

from netComm.snakemq import snakemqListener

import time

 

def received(ident, data):

     filename = "log/recFile{0}.txt".format(time.strftime('%S',time.localtime()))

     file = open(filename,'w')

     file.writelines(data)

     file.close()

 

if __name__ == "__main__":

     niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002)

     niess.on_recv.add(received)

     niess.start()

     print("niess start...")

     time.sleep(60)

     niess.terminate()  

     print("niess end...")
Copier après la connexion


Pour plus d'articles liés à l'utilisation du package de file d'attente de messages de Python, SnakeMQ, veuillez faire attention au PHP Site chinois !

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

Outils d'IA chauds

Undresser.AI Undress

Undresser.AI Undress

Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover

AI Clothes Remover

Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool

Undress AI Tool

Images de déshabillage gratuites

Clothoff.io

Clothoff.io

Dissolvant de vêtements AI

AI Hentai Generator

AI Hentai Generator

Générez AI Hentai gratuitement.

Article chaud

R.E.P.O. Crystals d'énergie expliqués et ce qu'ils font (cristal jaune)
2 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
Repo: Comment relancer ses coéquipiers
4 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island Adventure: Comment obtenir des graines géantes
4 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
Combien de temps faut-il pour battre Split Fiction?
3 Il y a quelques semaines By DDD

Outils chauds

Bloc-notes++7.3.1

Bloc-notes++7.3.1

Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise

SublimeText3 version chinoise

Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1

Envoyer Studio 13.0.1

Puissant environnement de développement intégré PHP

Dreamweaver CS6

Dreamweaver CS6

Outils de développement Web visuel

SublimeText3 version Mac

SublimeText3 version Mac

Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Comment utiliser Python pour trouver la distribution ZIPF d'un fichier texte Comment utiliser Python pour trouver la distribution ZIPF d'un fichier texte Mar 05, 2025 am 09:58 AM

Ce tutoriel montre comment utiliser Python pour traiter le concept statistique de la loi de Zipf et démontre l'efficacité de la lecture et du tri de Python de gros fichiers texte lors du traitement de la loi. Vous vous demandez peut-être ce que signifie le terme distribution ZIPF. Pour comprendre ce terme, nous devons d'abord définir la loi de Zipf. Ne vous inquiétez pas, je vais essayer de simplifier les instructions. La loi de Zipf La loi de Zipf signifie simplement: dans un grand corpus en langage naturel, les mots les plus fréquents apparaissent environ deux fois plus fréquemment que les deuxième mots fréquents, trois fois comme les troisième mots fréquents, quatre fois comme quatrième mots fréquents, etc. Regardons un exemple. Si vous regardez le corpus brun en anglais américain, vous remarquerez que le mot le plus fréquent est "th

Comment utiliser la belle soupe pour analyser HTML? Comment utiliser la belle soupe pour analyser HTML? Mar 10, 2025 pm 06:54 PM

Cet article explique comment utiliser la belle soupe, une bibliothèque Python, pour analyser HTML. Il détaille des méthodes courantes comme find (), find_all (), select () et get_text () pour l'extraction des données, la gestion de diverses structures et erreurs HTML et alternatives (Sel

Filtrage d'image en python Filtrage d'image en python Mar 03, 2025 am 09:44 AM

Traiter avec des images bruyantes est un problème courant, en particulier avec des photos de téléphones portables ou de caméras basse résolution. Ce tutoriel explore les techniques de filtrage d'images dans Python à l'aide d'OpenCV pour résoudre ce problème. Filtrage d'image: un outil puissant Filtre d'image

Comment travailler avec des documents PDF à l'aide de Python Comment travailler avec des documents PDF à l'aide de Python Mar 02, 2025 am 09:54 AM

Les fichiers PDF sont populaires pour leur compatibilité multiplateforme, avec du contenu et de la mise en page cohérents sur les systèmes d'exploitation, les appareils de lecture et les logiciels. Cependant, contrairement aux fichiers de texte brut de traitement Python, les fichiers PDF sont des fichiers binaires avec des structures plus complexes et contiennent des éléments tels que des polices, des couleurs et des images. Heureusement, il n'est pas difficile de traiter les fichiers PDF avec les modules externes de Python. Cet article utilisera le module PYPDF2 pour montrer comment ouvrir un fichier PDF, imprimer une page et extraire du texte. Pour la création et l'édition des fichiers PDF, veuillez vous référer à un autre tutoriel de moi. Préparation Le noyau réside dans l'utilisation du module externe PYPDF2. Tout d'abord, l'installez en utilisant PIP: pip is p

Comment se cacher en utilisant Redis dans les applications Django Comment se cacher en utilisant Redis dans les applications Django Mar 02, 2025 am 10:10 AM

Ce tutoriel montre comment tirer parti de la mise en cache Redis pour augmenter les performances des applications Python, en particulier dans un cadre Django. Nous couvrirons l'installation redis, la configuration de Django et les comparaisons de performances pour mettre en évidence le bien

Comment effectuer l'apprentissage en profondeur avec TensorFlow ou Pytorch? Comment effectuer l'apprentissage en profondeur avec TensorFlow ou Pytorch? Mar 10, 2025 pm 06:52 PM

Cet article compare TensorFlow et Pytorch pour l'apprentissage en profondeur. Il détaille les étapes impliquées: préparation des données, construction de modèles, formation, évaluation et déploiement. Différences clés entre les cadres, en particulier en ce qui concerne le raisin informatique

Introduction à la programmation parallèle et simultanée dans Python Introduction à la programmation parallèle et simultanée dans Python Mar 03, 2025 am 10:32 AM

Python, un favori pour la science et le traitement des données, propose un écosystème riche pour l'informatique haute performance. Cependant, la programmation parallèle dans Python présente des défis uniques. Ce tutoriel explore ces défis, en se concentrant sur l'interprète mondial

Comment implémenter votre propre structure de données dans Python Comment implémenter votre propre structure de données dans Python Mar 03, 2025 am 09:28 AM

Ce didacticiel montre la création d'une structure de données de pipeline personnalisée dans Python 3, en tirant parti des classes et de la surcharge de l'opérateur pour une fonctionnalité améliorée. La flexibilité du pipeline réside dans sa capacité à appliquer une série de fonctions à un ensemble de données, GE

See all articles