首頁 > 後端開發 > Python教學 > python中pika模組的相關問題介紹(附程式碼)

python中pika模組的相關問題介紹(附程式碼)

不言
發布: 2018-10-13 14:30:38
轉載
3287 人瀏覽過

這篇文章帶給大家的內容是關於python中pika模組的相關問題介紹(附程式碼),有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。

工作中經常用到rabbitmq,而用的語言主要是python,所以也就經常會用到python中的pika模組,但是這個模組的使用,也給我帶了很多問題,這裡整理一下關於這個模組我在使用過程的改變歷程已經中間碰到一些問題的解決方法

剛開寫代碼的小菜鳥

##在最開始使用這個rabbitmq的時候,因為本身業​​務需求,我的程式既需要從rabbitmq消費訊息,也需要給rabbitmq發布訊息,程式碼的邏輯圖為如下:

 

#下面是我的模擬程式碼:

#! /usr/bin/env python3
# .-*- coding:utf-8 .-*-
import pika
import time
import threading
import os
import json
import datetime
from multiprocessing import Process

# rabbitmq 配置信息
MQ_CONFIG = {
    "host": "192.168.90.11",
    "port": 5672,
    "vhost": "/",
    "user": "guest",
    "passwd": "guest",
    "exchange": "ex_change",
    "serverid": "eslservice",
    "serverid2": "airservice"
}
class RabbitMQServer(object):
    _instance_lock = threading.Lock()
    def __init__(self, recv_serverid, send_serverid):
        # self.serverid = MQ_CONFIG.get("serverid")
        self.exchange = MQ_CONFIG.get("exchange")
        self.channel = None
        self.connection = None
        self.recv_serverid = recv_serverid
        self.send_serverid = send_serverid

    def reconnect(self):
        if self.connection and not self.connection.is_closed():
            self.connection.close()

        credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd"))
        parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"),
                                               credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct")
        result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid)
        self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False)
    def consumer_callback(self, channel, method, properties, body):
        """
        消费消息
        :param channel:
        :param method:
        :param properties:
        :param body:
        :return:
        """
        channel.basic_ack(delivery_tag=method.delivery_tag)
        process_id = os.getpid()
        print("current process id is {0} body is {1}".format(process_id, body))
    def publish_message(self, to_serverid, message):
        """
        发布消息
        :param to_serverid:
        :param message:
        :return:
        """
        message = dict_to_json(message)
        self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message)

    def run(self):
        while True:
            self.channel.start_consuming()
    @classmethod
    def get_instance(cls, *args, **kwargs):
        """
        单例模式
        :return:
        """
        if not hasattr(cls, "_instance"):
            with cls._instance_lock:
                if not hasattr(cls, "_instance"):
                    cls._instance = cls(*args, **kwargs)
        return cls._instance
def process1(recv_serverid, send_serverid):
    """
    用于测试同时订阅和发布消息
    :return:
    """
    # 线程1 用于去 从rabbitmq消费消息
    rabbitmq_server = RabbitMQServer.get_instance(recv_serverid, send_serverid)
    rabbitmq_server.reconnect()
    recv_threading = threading.Thread(target=rabbitmq_server.run)
    recv_threading.start()
    i = 1
    while True:
        # 主线程去发布消息
        message = {"value": i}
        rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message)
        i += 1
        time.sleep(0.01)
class CJsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        elif isinstance(obj, datetime.date):
            return obj.strftime("%Y-%m-%d")
        else:
            return json.JSONEncoder.default(self, obj)


def dict_to_json(po):
    jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder)
    return jsonstr


def json_to_dict(jsonstr):
    if isinstance(jsonstr, bytes):
        jsonstr = jsonstr.decode("utf-8")
    d = json.loads(jsonstr)
    return d


if __name__ == '__main__':
    recv_serverid = MQ_CONFIG.get("serverid")
    send_serverid = MQ_CONFIG.get("serverid2")
    # 进程1 用于模拟模拟程序1 
    p = Process(target=process1, args=(recv_serverid, send_serverid, ))
    p.start()
    
    # 主进程用于模拟程序2
    process1(send_serverid, recv_serverid)
登入後複製


上面是我的將我的實際程式碼更改的測試模組,其實就是模擬實際業務中,我的rabbitmq模組既有訂閱訊息,又有發布訊息的時候,同時,

訂閱訊息和發布訊息用的同一個rabbitmq連接的同一個channel

但是這段程式碼運行之後基本上沒有運行多久就會看到以下錯誤訊息:

Traceback (most recent call last):
  File "/app/python3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/app/python3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1
    rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message)
  File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message
    self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message)
  File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish
    mandatory, immediate)
  File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish
    immediate=immediate)
  File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish
    raise exceptions.ChannelClosed()
pika.exceptions.ChannelClosed






Traceback (most recent call last):
  File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 144, in <module>
    process1(send_serverid, recv_serverid)
  File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1
    rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message)
  File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message
    self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message)
  File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish
    mandatory, immediate)
  File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish
    immediate=immediate)
  File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish
    raise exceptions.ChannelClosed()
pika.exceptions.ChannelClosed
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/app/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/app/python3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 80, in run
    self.channel.start_consuming()
  File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming
    self.connection.process_data_events(time_limit=None)
  File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events
    self._flush_output(common_terminator)
  File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
    result.reason_text)
pika.exceptions.ConnectionClosed: (505, &#39;UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead&#39;)
登入後複製

而這個時候你查看rabbitmq服務的日誌訊息,你會看到兩種情況的錯誤日誌如下:

情況一:

=INFO REPORT==== 12-Oct-2018::18:32:37 ===
accepting AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672)

=INFO REPORT==== 12-Oct-2018::18:32:37 ===
accepting AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672)

=ERROR REPORT==== 12-Oct-2018::18:32:38 ===
AMQP connection <0.19446.2> (running), channel 1 - error:
{amqp_error,unexpected_frame,
            "expected content header for class 60, got non content header frame instead",
            &#39;basic.publish&#39;}

=INFO REPORT==== 12-Oct-2018::18:32:38 ===
closing AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672)

=ERROR REPORT==== 12-Oct-2018::18:33:59 ===
AMQP connection <0.19439.2> (running), channel 1 - error:
{amqp_error,unexpected_frame,
            "expected content header for class 60, got non content header frame instead",
            &#39;basic.publish&#39;}

=INFO REPORT==== 12-Oct-2018::18:33:59 ===
closing AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672)
登入後複製

情況二:

=INFO REPORT==== 12-Oct-2018::17:41:28 ===
accepting AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672)
=INFO REPORT==== 12-Oct-2018::17:41:28 ===
accepting AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672)
=ERROR REPORT==== 12-Oct-2018::17:41:29 ===
AMQP connection <0.19045.2> (running), channel 1 - error:
{amqp_error,unexpected_frame,
            "expected content body, got non content body frame instead",
            &#39;basic.publish&#39;}
=INFO REPORT==== 12-Oct-2018::17:41:29 ===
closing AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672)
=ERROR REPORT==== 12-Oct-2018::17:42:23 ===
AMQP connection <0.19052.2> (running), channel 1 - error:
{amqp_error,unexpected_frame,
            "expected method frame, got non method frame instead",none}
=INFO REPORT==== 12-Oct-2018::17:42:23 ===
closing AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672)
登入後複製

對於這種情況我查詢了很多資料和文檔,都沒有找到一個很好的答案,查到關於這個問題的連接有:

https:/ /stackoverflow.com/questions/49154404/pika-threaded-execution-gets-error-505-unexpected-frame

http://rabbitmq.1065348.n5.nabble.com/UNEXPECTED-FRrabbitmq.1065348.n5.nabble.com/UNEXPECTED-FRAME-expected- content-header-for-class-60-got-non-content-header-frame-instead-td34981.html

這個問題其他人碰到的也不少,不過查了最後的解決辦法基本都是創建兩個rabbitmq連接,一個連接用於訂閱訊息,一個連接用於發布訊息,這種情況的時候,就不會出現上述的問題

在這個解決方法之前,我測試了用同一個連接,不同的channel,讓訂閱訊息用一個channel, 發布訊息用另外一個channel,但是在測試過程依然會出現上述的錯誤。

有點寫程式碼能力了

最後我也是選擇了用兩個連接的方法來解決出現上述的問題,現在是測試程式碼範例:

#! /usr/bin/env python3
# .-*- coding:utf-8 .-*-


import pika
import threading
import json
import datetime
import os


from pika.exceptions import ChannelClosed
from pika.exceptions import ConnectionClosed


# rabbitmq 配置信息
MQ_CONFIG = {
    "host": "192.168.90.11",
    "port": 5672,
    "vhost": "/",
    "user": "guest",
    "passwd": "guest",
    "exchange": "ex_change",
    "serverid": "eslservice",
    "serverid2": "airservice"
}


class RabbitMQServer(object):
    _instance_lock = threading.Lock()

    def __init__(self):
        self.recv_serverid = ""
        self.send_serverid = ""
        self.exchange = MQ_CONFIG.get("exchange")
        self.connection = None
        self.channel = None

    def reconnect(self):
        if self.connection and not self.connection.is_closed:
            self.connection.close()

        credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd"))
        parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"),
                                               credentials)
        self.connection = pika.BlockingConnection(parameters)

        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct")

        if isinstance(self, RabbitComsumer):
            result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True)
            queue_name = result.method.queue
            self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid)
            self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False)


class RabbitComsumer(RabbitMQServer):

    def __init__(self):
        super(RabbitComsumer, self).__init__()

    def consumer_callback(self, ch, method, properties, body):
        """
        :param ch:
        :param method:
        :param properties:
        :param body:
        :return:
        """
        ch.basic_ack(delivery_tag=method.delivery_tag)
        process_id = threading.current_thread()
        print("current process id is {0} body is {1}".format(process_id, body))

    def start_consumer(self):
        while True:
            self.reconnect()
            self.channel.start_consuming()

    @classmethod
    def run(cls, recv_serverid):
        consumer = cls()
        consumer.recv_serverid = recv_serverid
        consumer.start_consumer()


class RabbitPublisher(RabbitMQServer):

    def __init__(self):
        super(RabbitPublisher, self).__init__()

    def start_publish(self):
        self.reconnect()
        i = 1
        while True:
            message = {"value": i}
            message = dict_to_json(message)
            self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message)
            i += 1
    @classmethod
    def run(cls, send_serverid):
        publish = cls()
        publish.send_serverid = send_serverid
        publish.start_publish()
class CJsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.strftime(&#39;%Y-%m-%d %H:%M:%S&#39;)
        elif isinstance(obj, datetime.date):
            return obj.strftime("%Y-%m-%d")
        else:
            return json.JSONEncoder.default(self, obj)
def dict_to_json(po):
    jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder)
    return jsonstr
def json_to_dict(jsonstr):
    if isinstance(jsonstr, bytes):
        jsonstr = jsonstr.decode("utf-8")
    d = json.loads(jsonstr)
    return d

if __name__ == &#39;__main__&#39;:
    recv_serverid = MQ_CONFIG.get("serverid")
    send_serverid = MQ_CONFIG.get("serverid2")
    # 这里分别用两个线程去连接和发送
    threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start()
    threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start()
    # 这里也是用两个连接去连接和发送,
    threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start()
    RabbitPublisher.run(recv_serverid)
登入後複製

上面程式碼中我分別用了兩個連接去訂閱和發布消息,同時另外一對訂閱發布也是用的兩個連接來執行訂閱和發布,這樣當再次運行程序之後,就不會在出現之前的問題

關於斷開重連

上面的程式碼雖然不會在出現之前的錯誤,但是這個程式非常脆弱,當rabbitmq服務重新啟動或斷開之後,程式並不會有重連線的機制,所以我們需要為程式碼添加重連機制,這樣即使rabbitmq服務重啟了或者

rabbitmq出現異常我們的程式也能進行重連機制

#! /usr/bin/env python3
# .-*- coding:utf-8 .-*-


import pika
import threading
import json
import datetime
import time


from pika.exceptions import ChannelClosed
from pika.exceptions import ConnectionClosed


# rabbitmq 配置信息
MQ_CONFIG = {
    "host": "192.168.90.11",
    "port": 5672,
    "vhost": "/",
    "user": "guest",
    "passwd": "guest",
    "exchange": "ex_change",
    "serverid": "eslservice",
    "serverid2": "airservice"
}


class RabbitMQServer(object):
    _instance_lock = threading.Lock()

    def __init__(self):
        self.recv_serverid = ""
        self.send_serverid = ""
        self.exchange = MQ_CONFIG.get("exchange")
        self.connection = None
        self.channel = None

    def reconnect(self):
        try:

            if self.connection and not self.connection.is_closed:
                self.connection.close()

            credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd"))
            parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"),
                                                   credentials)
            self.connection = pika.BlockingConnection(parameters)

            self.channel = self.connection.channel()
            self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct")

            if isinstance(self, RabbitComsumer):
                result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True)
                queue_name = result.method.queue
                self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid)
                self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False)
        except Exception as e:
            print(e)


class RabbitComsumer(RabbitMQServer):

    def __init__(self):
        super(RabbitComsumer, self).__init__()

    def consumer_callback(self, ch, method, properties, body):
        """
        :param ch:
        :param method:
        :param properties:
        :param body:
        :return:
        """
        ch.basic_ack(delivery_tag=method.delivery_tag)
        process_id = threading.current_thread()
        print("current process id is {0} body is {1}".format(process_id, body))

    def start_consumer(self):
        while True:
            try:
                self.reconnect()
                self.channel.start_consuming()
            except ConnectionClosed as e:
                self.reconnect()
                time.sleep(2)
            except ChannelClosed as e:
                self.reconnect()
                time.sleep(2)
            except Exception as e:
                self.reconnect()
                time.sleep(2)

    @classmethod
    def run(cls, recv_serverid):
        consumer = cls()
        consumer.recv_serverid = recv_serverid
        consumer.start_consumer()


class RabbitPublisher(RabbitMQServer):

    def __init__(self):
        super(RabbitPublisher, self).__init__()

    def start_publish(self):
        self.reconnect()
        i = 1
        while True:
            message = {"value": i}
            message = dict_to_json(message)
            try:
                self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message)
                i += 1
            except ConnectionClosed as e:
                self.reconnect()
                time.sleep(2)
            except ChannelClosed as e:
                self.reconnect()
                time.sleep(2)
            except Exception as e:
                self.reconnect()
                time.sleep(2)

    @classmethod
    def run(cls, send_serverid):
        publish = cls()
        publish.send_serverid = send_serverid
        publish.start_publish()


class CJsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.strftime(&#39;%Y-%m-%d %H:%M:%S&#39;)
        elif isinstance(obj, datetime.date):
            return obj.strftime("%Y-%m-%d")
        else:
            return json.JSONEncoder.default(self, obj)


def dict_to_json(po):
    jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder)
    return jsonstr


def json_to_dict(jsonstr):
    if isinstance(jsonstr, bytes):
        jsonstr = jsonstr.decode("utf-8")
    d = json.loads(jsonstr)
    return d

if __name__ == &#39;__main__&#39;:
    recv_serverid = MQ_CONFIG.get("serverid")
    send_serverid = MQ_CONFIG.get("serverid2")
    # 这里分别用两个线程去连接和发送
    threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start()
    threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start()
    # 这里也是用两个连接去连接和发送,
    threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start()
    RabbitPublisher.run(recv_serverid)
登入後複製

上面的程式碼運行運行之後即使rabbitmq的服務出問題了,但是當rabbitmq的服務好了之後,我們的程式仍然可以重新進行連接,但是上述這種實現方式運行了一段時間之後,因為實際的發布訊息的地方的消息是從其他線程或進程中獲取的數據,這個時候你可能透過queue隊列的方式實現,這個時候你的queue中如果長時間沒有數據,在一定時間之後來了數據需要發佈出去,這個時候你發現,你的程式會提示連線被rabbitmq 服務端給斷開了,但是畢竟你設定了重連機制,當然也可以重連,但是這裡想想為啥會出現這種情況,這個時候查看rabbitmq的日誌你會發現出現瞭如下錯誤:

=ERROR REPORT==== 8-Oct-2018::15:34:19 ===
closing AMQP connection <0.30112.1> (192.168.90.11:54960 -> 192.168.90.11:5672):
{heartbeat_timeout,running}
登入後複製

這是我之前測試環境的日誌截取的,可以看到是因為這個錯誤導致的,後來查看pika連接rabbitmq的連接參數中有這麼一個參數

#這個參數預設沒有設置,那麼這個heatbeat的心跳時間,預設是不設定的,如果不設定的話,就是根絕服務端設定的,因為這個心跳時間是和服務端進行協商的結果

當這個參數設定為0的時候則表示不發送心跳,服務端永遠不會斷開這個連接,所以這裡我為了方便我給發布訊息的線程的心跳設定為0,並且我這裡,我整理通過抓包,看一下服務端和客戶端的協商過程

#從抓包分析中可以看出服務端和客戶端首先協商的是580秒,而客戶端回覆的是:

這樣這個連線就永遠不會斷了,但是如果我們不設定heartbeat這個值,再次抓包我們會看到如下

從上圖我們可以刪除最後服務端和客戶端協商的結果就是580,這樣當時間到了之後,如果沒有資料往來,那麼就會出現連線被服務端斷開的情況了

特別注意

 需要特別注意的是,經過我實際測試python的pika==0.11.2 版本及以下版本設定heartbeat的不生效的,只有0.12.0及以上版本設定才能生效

以上是python中pika模組的相關問題介紹(附程式碼)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:cnblogs.com
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板