<?php
/**
 * Created by 2022/12/3.
 * User: Jone
 * Info: 2022/12/3
 * Time: 下午4:08
 */

namespace App\Logic;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class MqLogic
{
    public function pushAmq($exchange, $route_key, $queue_name, $content='')
    {
        $connection = new AMQPStreamConnection(env('RABBITMQ_HOST'), env('RABBITMQ_PORT'), env('RABBITMQ_LOGIN'), env('RABBITMQ_PASSWORD'), env('RABBITMQ_VHOST')); // 创建连接

        $channel = $connection->channel();

        $channel->exchange_declare($exchange, 'direct', false, true, false); // 初始化交换机
        $channel->queue_declare($queue_name, false, true, false, false); // 初始化队列
        $channel->queue_bind($queue_name, $exchange, $route_key);  // 将队列与某个交换机进行绑定,并使用路由关键字

        $message = new AMQPMessage($content);
        $channel->basic_publish($message, '', $queue_name); // 推送消息

        $channel->close();
        $connection->close();
    }

    public function pullAmq($queue_name='')
    {
        $connection = new AMQPStreamConnection(env('RABBITMQ_HOST'), env('RABBITMQ_PORT'), env('RABBITMQ_LOGIN'), env('RABBITMQ_PASSWORD'), env('RABBITMQ_VHOST')); // 创建连接

        $channel = $connection->channel();

        $message = $channel->basic_get($queue_name); // 取出消息

        $channel->basic_ack($message->delivery_info['delivery_tag']); // 确认取出消息后会发送一个ack来确认取出来了,然后会从rabbitmq中将这个消息移除,如果删掉这段代码,会发现rabbitmq中的消息还是没有减少

        $channel->close();
        $connection->close();

        return $message;
    }
}