译-PHP rabbitMQ Tutorial-3
Jun 23, 2016 pm 01:59 PM
(using php-amqplib)
In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".
To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.
In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.
Essentially, published log messages are going to be broadcast to all the receivers.
In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.
Let's quickly go over what we covered in the previous tutorials:
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
它将消息推送到队列。交换器必须清楚地知道如何处理收到的消息。是应该把这个消息追加到一个特定的队列?还是追加到多个队列?亦或者应将其丢弃。 要做到这个,我们可以通过交换器类型(exchange type)来定义规则。
There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of this type, and call it logs:
集中可用的交换器类型:direct,topic,headers和fanout. 我们来看最后一个--fanout. 创建一个这种类型的交换器叫“logs".
1 |
The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.
Listing exchanges(交换器列表)
To list the exchanges on the serve you can run the ever useful rabbitmqctl:
1 |
In this list there are some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.
Nameless exchange(无名氏交换器)
In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which is identified by the empty string ("").
Recall how we published a message before:
1 |
Here we use the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists. The routing key is the second argument to basic_publish
我们在这用默认的或无名氏交换器:如果有routing_key,消息就会被路由到routing_key指定的队列,routing_key就是 basic_publish的第二个参数。
Now, we can publish to our named exchange instead:
1 |
Temporary queues(临时队列)
As you may remember previously we were using queues which had a specified name (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.
But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages not in the old ones. To solve that we need two things.
但那不是我们日志系统要用的情景。 我们是要收听到所有的日志消息,不是一部分。同样,我们只对当前最新的消息感兴趣,而不是那些陈旧的。两件事儿来解决这个问题。
Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.
Secondly, once we disconnect the consumer the queue should be automatically deleted.
In the php-amqplib client, when we supply queue name as an empty string, we create a non-durable queue with a generated name:
1 |
When the method returns, the $queue_name variable contains a random queue name generated by RabbitMQ. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.
When the connection that declared it closes, the queue will be deleted because it is declared as exclusive.
We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.
1 |
From now on the logs exchange will append messages to our queue.
Listing bindings(捆绑列表)
You can list existing bindings using, you guessed it, rabbitmqctl list_bindings.
已经猜到了吧,用rabbitmqctl list_bindings来列出现存的捆绑使用。
Putting it all together(合体!!!又来!!哈哈哈)
The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our logs exchange instead of the nameless one. We need to supply a routing_key when sending, but its value is ignored for fanout exchanges. Here goes the code for emit_log.php script:
1 |
(emit_log.php source)
As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.
The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.
The code for receive_logs.php:(receive_logs.php代码)
1 |
(receive_logs.php source)
If you want to save logs to a file, just open a console and type:
1 |
If you wish to see the logs on your screen, spawn a new terminal and run:
1 |
And of course, to emit logs type:
1 |
Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two receive_logs.rb programs running you should see something like:
使用rabbitmqctl list_bindings你可以验证代码有木有真的按照我们的期许创建捆绑和队列。运行俩receive_logs.php(这里官方文档copy有误,.rb的ruby文件扩展名还没改回来),有应该会看到像这样:
1 |
The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that's exactly what we intended.
To find out how to listen for a subset of messages, let's move on to tutorial 4
要想知道怎么收听部分消息嘛,请听下回分解~~~~~~~~~~~ :)

