1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
| import json import datetime import time import random import pika from pika.exceptions import ChannelClosed, ConnectionClosed
# Connection settings for the RabbitMQ broker, exchange and queue used by
# the classes in this module.
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a secrets store before deploying.
MQ_CONFIG = {
    # Broker endpoint.
    "hostname": "127.0.0.1",
    "port": 5672,
    "vhost": "my_vhost",
    # Login credentials.
    "username": "admin",
    "password": "adminxxx",
    # Routing topology.
    "exchange": "my_exchange",
    "queue": "my_queue",
    "routing_key": "my_key",
}
class RabbitMQServer(object):
    """Base class that owns one blocking pika connection and channel.

    The instance copies its broker settings out of a configuration mapping
    and exposes :meth:`reconnect`, which (re)builds the connection, declares
    the exchange and queue, and binds them together.
    """

    def __init__(self, config=None):
        """Read connection settings from *config*.

        Args:
            config: Mapping with the keys ``hostname``, ``port``, ``vhost``,
                ``username``, ``password``, ``exchange``, ``queue`` and
                ``routing_key``. Missing keys yield ``None`` attributes.
                Defaults to the module-level ``MQ_CONFIG`` when omitted.
        """
        self.config = MQ_CONFIG if config is None else config
        self.host = self.config.get("hostname")
        self.port = self.config.get("port")
        self.username = self.config.get("username")
        self.password = self.config.get("password")
        self.vhost = self.config.get("vhost")
        self.exchange = self.config.get("exchange")
        self.queue = self.config.get("queue")
        self.routing_key = self.config.get("routing_key")

        # Both are populated by reconnect(); None until the first attempt.
        self.connection = None
        self.channel = None

        # Optional queue arguments passed to queue_declare. Per the RabbitMQ
        # documentation the two time values are in milliseconds
        # (82 800 000 ms = 23 h).
        self.arguments = {
            'x-message-ttl': 82800000,   # per-message time to live
            'x-expires': 82800000,       # idle time before the queue is deleted
            'x-max-length': 100000,      # maximum number of queued messages
            'x-max-priority': 10,        # highest supported message priority
        }

    def _close_connection(self):
        """Close the current connection, if any, without raising."""
        try:
            if self.connection and not self.connection.is_closed:
                self.connection.close()
        except Exception as e:
            # A half-dead connection may fail to close cleanly; that must not
            # prevent a fresh connection from being opened afterwards.
            print("RECONNECT (close): ", e)

    def reconnect(self):
        """Drop any existing connection and establish a fresh one.

        Declares a durable direct exchange and a durable, non-exclusive
        queue (with ``self.arguments``), then binds the queue to the exchange
        using the routing key. For ``RabbitComsumer`` instances it also sets
        a prefetch count of 1 and registers ``consumer_callback`` with manual
        acknowledgements.

        Any failure while connecting or declaring is printed and swallowed,
        so callers are expected to retry; on failure ``self.connection`` and
        ``self.channel`` may still refer to the previous (closed) objects.
        """
        # Closing is isolated from the connect attempt so that an error while
        # tearing down the old connection cannot abort the reconnect itself.
        self._close_connection()

        try:
            credentials = pika.PlainCredentials(self.username, self.password)
            parameters = pika.ConnectionParameters(
                self.host, self.port, self.vhost, credentials
            )

            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            self.channel.exchange_declare(
                exchange=self.exchange, exchange_type="direct", durable=True
            )
            self.channel.queue_declare(
                queue=self.queue,
                exclusive=False,
                durable=True,
                arguments=self.arguments,
            )
            self.channel.queue_bind(
                exchange=self.exchange,
                queue=self.queue,
                routing_key=self.routing_key,
            )

            # Consumers must re-register their callback on every new channel.
            if isinstance(self, RabbitComsumer):
                self.channel.basic_qos(prefetch_count=1)
                self.channel.basic_consume(
                    on_message_callback=self.consumer_callback,
                    queue=self.queue,
                    auto_ack=False,
                )
        except Exception as e:
            print("RECONNECT: ", e)
class RabbitPublisher(RabbitMQServer):
    """Publisher that sends an increasing counter to the configured exchange."""

    def __init__(self):
        super(RabbitPublisher, self).__init__()

    def _recover(self, label, error):
        """Report *error* under *label*, reconnect, then pause for 2 seconds."""
        print(label, error)
        self.reconnect()
        time.sleep(2)

    def start_publish(self):
        """Publish ``{"value": n}`` messages forever, one every 3 seconds.

        The counter starts at 1 and only advances after a successful
        publish, so a message that failed to send is retried with the same
        value once the connection has been re-established. This method never
        returns.
        """
        self.reconnect()
        counter = 1
        while True:
            payload = {"value": counter}
            try:
                self.channel.basic_publish(
                    exchange=self.exchange,
                    routing_key=self.routing_key,
                    body=json.dumps(payload),
                )
                print("Publish value: ", counter)
                counter += 1
                time.sleep(3)
            except ConnectionClosed as err:
                self._recover("ConnectionClosed: ", err)
            except ChannelClosed as err:
                self._recover("ChannelClosed: ", err)
            except Exception as err:
                # Also reached when self.channel is still None because the
                # previous reconnect attempt failed.
                self._recover("basic_publish: ", err)
class RabbitComsumer(RabbitMQServer):
    """Consumer that prints the ``value`` field of each received message."""

    def __init__(self):
        super(RabbitComsumer, self).__init__()

    def execute(self, body):
        """Decode a UTF-8 JSON payload and print its ``"value"`` entry.

        Returns:
            ``True`` once the value has been printed.

        NOTE(review): a payload that is not valid UTF-8 JSON, or that lacks
        a ``"value"`` key, raises here. Nothing in ``consumer_callback``
        catches it, so it reaches the generic handler in ``start_consumer``.
        """
        payload = json.loads(body.decode('utf8'))
        print(payload["value"])
        return True

    def consumer_callback(self, channel, method, properties, body):
        """Process one delivery and acknowledge it according to the result.

        A truthy result from :meth:`execute` acks the message; a falsy one
        nacks it with ``requeue=True``. Nothing is sent when the channel is
        already closed.
        """
        succeeded = self.execute(body)

        if channel.is_open:
            tag = method.delivery_tag
            if succeeded:
                channel.basic_ack(delivery_tag=tag)
            else:
                channel.basic_nack(delivery_tag=tag, multiple=False, requeue=True)

        # Checked again on purpose: the channel state is re-read after the
        # ack/nack above. Message text: "receive channel closed, cannot ack".
        if not channel.is_open:
            print("Callback 接收频道关闭,无法ack")

    def _recover(self, label, error):
        """Report *error* under *label*, reconnect, then pause for 2 seconds."""
        print(label, error)
        self.reconnect()
        time.sleep(2)

    def start_consumer(self):
        """Consume messages forever, reconnecting after any failure.

        This method never returns.
        """
        self.reconnect()
        while True:
            try:
                self.channel.start_consuming()
            except ConnectionClosed as err:
                self._recover("ConnectionClosed: ", err)
            except ChannelClosed as err:
                self._recover("ChannelClosed: ", err)
            except Exception as err:
                # Also reached for errors raised inside consumer_callback and
                # when self.channel is None after a failed reconnect.
                self._recover("consuming: ", err)
|