메시지 대기열을 사용하면 데이터 통신에 많은 이점이 있습니다. SnakeMQ는 Python으로 구현된 오픈 소스 크로스 플랫폼 MQ 라이브러리입니다. Python의 메시지 대기열 패키지 SnakeMQ 사용에 대한 예비 연구는 다음과 같습니다.
1. snakemq에 대한 공식 소개
SnakeMQ의 GitHub 프로젝트 페이지: https://github.com/dsiroky/snakemq1. 순수 Python 구현, 크로스 플랫폼
2. 연결 다시 시작
3. 안정적인 전송-구성 가능한 메시지 모드 및 메시지 시간 초과 모드
4. 영구/임시 대기열
5. 비동기식 지원--폴링()
6.대칭 -- 단일 TCP 연결을 이중 통신에 사용할 수 있습니다.
7. 다중 데이터베이스 지원 -- SQLite, MongoDB...
8.brokerless - 유사 ZeroMQ 구현 원리
9. 확장 모듈: RPC, 대역폭 조절
위 내용은 모두 공식적인 단어이며 직접 확인해야 합니다.
2. 여러 주요 문제 설명
1. 자동 재접속 지원, 하트비트 로직을 직접 작성할 필요가 없습니다. 보내고 받는 데만 집중하세요
2. 데이터 지속성을 지원합니다. 지속성이 시작되면 재접속 후 자동으로 데이터가 전송됩니다.
3. Snakemq는 콜백을 제공하여 데이터 수신을 구현합니다. 수신 메소드를 작성하고 콜백 목록에 추가하기만 하면 됩니다.
4. 여기에 전송되는 데이터는 바이트형(바이너리)이므로 변환이 필요합니다. 프로그램에서 테스트하는 것은 모두 텍스트 문자열입니다. str.encode('utf-8')를 사용하여 이를 바이트로 변환한 다음 수신 시 다시 변환합니다.
5. 용어 설명, 커넥터: 소켓과 유사한 TcpClient, 소켓과 유사한 TcpServer. 각 커넥터 또는 리스너는 데이터를 보내고 받을 때 누구의 데이터인지 알 수 있습니다.
6. sqlite 지속성을 사용하는 경우 sqlite에 대한 멀티 스레드 액세스 문제를 해결하려면 소스 코드 sqlite3.connect(filename, check_same_thread = False)를 수정해야 합니다. (교착상태가 발생할까요?)
7. 지속성 시작 시 연결이 다시 연결되면 안정성을 보장하기 위해 자동으로 전송됩니다.
8. Encapsulation을 위해 데이터를 받은 후 콜백을 통해 보낸다.
3. 코드
코드에 커스텀 로그 모듈을 사용했다는 설명
from common import nxlogger import snakemqlogger as logger
는 로깅으로 대체 가능합니다.
콜백 클래스(callbacks.py):
# -*- coding:utf-8 -*- '''synchronized callback''' class Callback(object): def __init__(self): self.callbacks = [] def add(self, func): self.callbacks.append(func) def remove(self, func): self.callbacks.remove(func) def __call__(self, *args, **kwargs): for callback in self.callbacks: callback(*args, **kwargs)
커넥터 클래스(snakemqConnector.py):
# -*- coding:utf-8 -*- import threading import snakemq import snakemq.link import snakemq.packeter import snakemq.messaging import snakemq.message from snakemq.storage.sqlite import SqliteQueuesStorage from snakemq.message import FLAG_PERSISTENT from common.callbacks import Callback from common import nxlogger import snakemqlogger as logger class SnakemqConnector(threading.Thread): def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False): super(SnakemqConnector,self).__init__() self.messaging = None self.link = None self.snakemqident = snakemqident self.pktr = None self.remoteIp = remoteIp self.remotePort = remotePort self.persistent = persistent self.on_recv = Callback() self._initConnector() def run(self): logger.info("connector start...") if self.link != None: self.link.loop() logger.info("connector end...") def terminate(self): logger.info("connetor terminating...") if self.link != None: self.link.stop() self.link.cleanup() logger.info("connetor terminated") def on_recv_message(self, conn, ident, message): try: self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data except Exception as e: logger.error("connector recv:{0}".format(e)) print(e) '''send message to dest host named destIdent''' def sendMsg(self, destIdent, byteseq): msg = None if self.persistent: msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT) else: msg = snakemq.message.Message(byteseq, ttl=60) if self.messaging == None: logger.error("connector:messaging is not initialized, send message failed") return self.messaging.send_message(destIdent, msg) ''' ''' def _initConnector(self): try: self.link = snakemq.link.Link() self.link.add_connector((self.remoteIp, self.remotePort)) self.pktr = snakemq.packeter.Packeter(self.link) if self.persistent: storage = SqliteQueuesStorage("SnakemqStorage.db") self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage) else: self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr) self.messaging.on_message_recv.add(self.on_recv_message) except Exception as e: logger.error("connector:{0}".format(e)) finally: logger.info("connector[{0}] loop ended...".format(self.snakemqident))
리스너 클래스(snakemqListener.py):
# -*- coding:utf-8 -*- import threading import snakemq import snakemq.link import snakemq.packeter import snakemq.messaging import snakemq.message from common import nxlogger import snakemqlogger as logger from common.callbacks import Callback class SnakemqListener(threading.Thread): def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False): super(SnakemqListener,self).__init__() self.messaging = None self.link = None self.pktr = None self.snakemqident = snakemqident self.ip = ip; self.port = port self.connectors = {} self.on_recv = Callback() self.persistent = persistent self._initlistener() ''' thread run ''' def run(self): logger.info("listener start...") if self.link != None: self.link.loop() logger.info("listener end...") ''' terminate snakemq listener thread ''' def terminate(self): logger.info("listener terminating...") if self.link != None: self.link.stop() self.link.cleanup() logger.info("listener terminated") ''' receive message from host named ident ''' def on_recv_message(self, conn, ident, message): try: self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8')) except Exception as e: logger.error("listener recv:{0}".format(e)) print(e) def on_drop_message(self, ident, message): print("message dropped", ident, message) logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message)) '''client connect''' def on_connect(self, ident): logger.debug("listener:{0} connected".format(ident)) self.connectors[ident] = ident self.sendMsg(ident, "hello".encode('utf-8')) '''client disconnect''' def on_disconnect(self, ident): logger.debug("listener:{0} disconnected".format(ident)) if ident in self.connectors: self.connectors.pop(ident) ''' listen start loop ''' def _initlistener(self): try: self.link = snakemq.link.Link() self.link.add_listener((self.ip, self.port)) self.pktr = snakemq.packeter.Packeter(self.link) self.pktr.on_connect.add(self.on_connect) self.pktr.on_disconnect.add(self.on_disconnect) if self.persistent: storage = SqliteQueuesStorage("SnakemqStorage.db") self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage) else: self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr) self.messaging.on_message_recv.add(self.on_recv_message) self.messaging.on_message_drop.add(self.on_drop_message) except Exception as e: logger.error("listener:{0}".format(e)) finally: logger.info("listener:loop ended...") '''send message to dest host named destIdent''' def sendMsg(self, destIdent, byteseq): msg = None if self.persistent: msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT) else: msg = snakemq.message.Message(byteseq, ttl=60) if self.messaging == None: logger.error("listener:messaging is not initialized, send message failed") return self.messaging.send_message(destIdent, msg)
테스트 코드 커넥터(testSnakeConnector.py):
로컬 1M 파일을 읽은 다음 이를 리스너에게 보내고 리스너는 다시 hello 메시지를 보냅니다.
from netComm.snakemq import snakemqConnector import time import sys import os def received(ident, data): print(data) if __name__ == "__main__": bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True) bob.on_recv.add(received) bob.start() try: with open("testfile.txt",encoding='utf-8') as f: txt = f.read() for i in range(100): bob.sendMsg("niess",txt.encode('utf-8')) time.sleep(0.1) except Exception as e: print(e) time.sleep(5) bob.terminate() 测试代码listener(testSnakeListener.py): from netComm.snakemq import snakemqListener import time def received(ident, data): filename = "log/recFile{0}.txt".format(time.strftime('%S',time.localtime())) file = open(filename,'w') file.writelines(data) file.close() if __name__ == "__main__": niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002) niess.on_recv.add(received) niess.start() print("niess start...") time.sleep(60) niess.terminate() print("niess end...")
Python의 메시지 대기열 패키지 SnakeMQ 사용과 관련된 더 많은 기사를 보려면 PHP 중국어 웹사이트를 참고하세요. !