rabbitmq 发送与接收消息之间的过程

BUG 0
k9606
k9606 2019-11-27 19:40:07
山西女婿...

生产者发送消息

  • 连接服务节点, 开启信道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
  • 声明交换器并设置属性 ( 类型, 持久化, ... )
$exchangeName = 'exchange_name';
$channel->exchange_declare($exchangeName, 'direct', false, false, false);
  • 声明队列并设置属性 ( 排他, 持久化, 自动删除, ... )
$queueName = 'queue_name';
$channel->queue_declare($queueName, false, false, false, false);
  • 通过路由键绑定交换器与队列
$routingKey = 'routing_key';
$channel->queue_bind($queueName, $exchangeName, $routingKey);
  • 发送消息并带上路由键, 交换器等信息到服务节点
$message = new AMQPMessage('message');
$channel->basic_publish($message, $exchangeName, $routingKey);
  • 交换器根据路由键查找队列
    • 若找到, 则将消息存入队列
    • 若没找到, 则选择丢弃消息或退回消息
  • 关闭信道
$channel->close();
  • 关闭连接
$connection->close();

消费者接收消息

  • 连接服务节点, 开启信道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
  • 向服务节点请求某队列中的消息, 回调
$queueName = 'queue_name';
$channel->queue_declare($queueName, false, false, false, false);

$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}
  • 服务节点回应并发送消息, 消费者接收
$callback = function ($message) {
    echo "$message->body \r\n";

    ...
};
  • 消费者确认
$callback = function ($message) {
    ...

    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
  • 服务节点删除被确认的消息
  • 关闭信道
$channel->close();
  • 关闭连接
$connection->close();

###完整代码

public function Producer()
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $exchangeName = 'exchange_name';
    // 第四个参数 true 表示持久化, 重启服务器不会丢失
    // 第五个参数 true 表示自动删除, 前提是至少有一个队列或交换器与它绑定
    $channel->exchange_declare($exchangeName, 'direct', false, false, false);

    $queueName = 'queue_name';
    // 第三个参数 true 表示持久化, 重启服务器不会丢失
    // 第四个参数 true 表示排他, 基于连接, 无法持久化, 适用于同时发送与读取场景
    // 第五个参数 true 表示自动删除, 前提是至少有一个消费者连接到它
    $channel->queue_declare($queueName, false, false, false, false);

    $routingKey = 'routing_key';
    $channel->queue_bind($queueName, $exchangeName, $routingKey);

    $message = new AMQPMessage('message');
    $channel->basic_publish($message, $exchangeName, $routingKey);

    $channel->close();
    $connection->close();
}

public function Consumer()
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $queueName = 'queue_name';
    $channel->queue_declare($queueName, false, false, false, false);

    $callback = function ($message) {
        echo "$message->body \r\n";

        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    };

    // 推模式
    // 第二个参数是消费者标签, 区分多个消费者
    // 第三个参数 true 表示不能将同一连接中生产者的消息发送给同连接中的消费者
    $channel->basic_consume($queueName, '', false, false, false, false, $callback);

    // 拉模式, 获取单条消息使用
    /*$message = $channel->basic_get($queueName);
    if ($message) {
        echo "$message->body \r\n";

        $channel->basic_ack($message->delivery_info['delivery_tag']);
    }*/

    while ($channel->is_consuming()) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}

简要版

生产
  1. 连接服务节点, 开启信道
  2. 声明交换器并设置属性 ( 类型, 持久化, ... )
  3. 声明队列并设置属性 ( 排他, 持久化, 自动删除, ... )
  4. 通过路由键绑定交换器与队列
  5. 发送消息并带上路由键, 交换器等信息到服务节点
  6. 交换器根据路由键查找队列
    • 若找到, 则将消息存入队列
    • 若没找到, 则选择丢弃消息或退回消息
  7. 关闭信道
  8. 关闭连接
消费
  1. 连接服务节点, 开启信道
  2. 向服务节点请求某队列中的消息, 回调
  3. 服务节点回应并发送消息, 消费者接收
  4. 消费者确认
  5. 服务节点删除被确认的消息
  6. 关闭信道
  7. 关闭连接
回复
  • 暂无回复