RabbitMQ學習筆記之基本概念介紹

2019-08-22     IT技術分享

RabbitMQ , 是一個使用 erlang 編寫的 AMQP(高級消息隊列協議) 的服務實現. 簡單來說, 就是一個功能強大的消息隊列服務.流程上來說,是發消息者(producer)把消息放到隊列(queue)中去,然後收消息者(consumer)從隊列中取出消息. RabbitMQ 在這個基本概念之上, 多做了一層抽象, 在 發消息者 和 隊列 之間, 加入了 交換器 (Exchange) . 這樣 發消息者 和 隊列 就沒有直接聯繫, 轉而變成 發消息者 把消息給 交換器 , 交換器 根據調度策略再把消息再給 隊列 。

rabbitmq 中幾個比價重要的概念如下:

rabbitmq

消息隊列運轉過程:


使用示例

下面介紹下 python 如何使用 rabbitmq ,這裡假定你已經有了 rabbitmq 的環境並且已經配置好了,下面只介紹如何使用。 生產者:

import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
# 從連接上獲取channel
channel = connection.channel()
# 定義名為testexchange的交換機類型是fanout,交換機支持持久化
channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)
# 定義名為hello的隊列,設置其支持持久化
channel.queue_declare(queue='hello', durable=True)
# 將hello隊列綁定到我們定義的testexchange交換機上
channel.queue_bind(exchange='testexchange', queue='hello')
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()

消費者:

import pika
from time import sleep
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
sleep(1)
channel.basic_consume(
queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

這裡在消費者和生產者都定義了同樣的隊列,這樣做是因為你不知道消費者還是生產者哪個會先啟動起來。 我們這裡為交換機,隊列,消息都設置了 durable=True 使其支持持久化,這樣在當rabbitmq異常退出之後,你的消息不至於丟失。

交換機類型

參考在上面的生產者中定義交換機的代碼:

channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)

其中 exchange_type='fanout' 就是設置交換機的類型, RabbitMQ 常用的交換器類型有 fanout 、 direct 、 topic 、 headers 這四種,下面分別介紹下。

fanout

它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中。 參考如下代碼:

import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello')
channel.queue_bind(exchange='testexchange', queue='hello1')
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()

運行結果如下(這裡我都沒有啟動consumer):

這裡可以看到, hello 和 hello1 隊列中都有消息進入,而 hello2 沒有,因為他沒有綁定。

direct

direct類型的交換器路由規則也很簡單,它會把消息路由到那些RoutingKey完全匹配的隊列中。 參考如下代碼:

import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello', routing_key='hello')
channel.queue_bind(exchange='testexchange', queue='hello1', routing_key='hello1')
channel.queue_bind(exchange='testexchange', queue='hello2', routing_key='hello')
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()

運行結果如下:

這裡可以看到, hello 和 hello3 隊列中都有消息進入,而 hello2 沒有,因為他綁定的 routing_key 不是 hello 。

topic

topic類型的交換器在匹配規則上進行了擴展,它與direct類型的交換器相似,也是將消息路由到routing_key相匹配的隊列中。routing_key中可以存在兩種特殊字符串 * 和 # ,用於做模糊匹配,其中 * 用於匹配一個單詞, # 用於匹配多規格單詞(可以是零個)。 參考如下代碼:

import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='topic', durable=True)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello', routing_key='hello_1.*.*')
channel.queue_bind(exchange='testexchange', queue='hello1', routing_key='hello_1.#')
channel.queue_bind(exchange='testexchange', queue='hello2', routing_key='hello')
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='hello_1', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(" [x] Sent 'Hello World!{}'".format(i))
channel.basic_publish(exchange='testexchange', routing_key='hello_1.a.b', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
connection.close()

運行結果如下:

我們發出了兩條消息, hello_1 只會被 hello_1.# 匹配到,而 hello_1.a.b 會被兩個都匹配到。

headers

headers類型的交換器不依賴於路由鍵的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。在隊列與交換器綁定時, 會設定一組鍵值對規則, 消息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或全部匹配時, 消息被投送到對應隊列.

import pika
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='headers', durable=True)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello', arguments={'a': '1'})
channel.queue_bind(exchange='testexchange', queue='hello1', arguments={'b': '2', 'c': '3', 'x-match': 'all'})
channel.queue_bind(exchange='testexchange', queue='hello2', arguments={'a': '1', 'b': '4', 'c': '5', 'x-match': 'any'})
for i in range(10):
channel.basic_publish(exchange='testexchange', routing_key='', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
headers={'a': '1'}
))
print(" [x] Sent 'Hello World!{}'".format(i))
channel.basic_publish(exchange='testexchange', routing_key='', body='Hello World!{}'.format(i),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
headers={'a': '1', 'b': '2'}
))
connection.close()

end:如果你覺得本文對你有幫助的話,記得關注點贊轉發,你的支持就是我更新動力。

文章來源: https://twgreatdaily.com/zh-tw/gczfgG4BMH2_cNUgbTfF.html