<?php /** * Created by 2022/12/3. * User: Jone * Info: 2022/12/3 * Time: 下午4:08 */ namespace App\Logic; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class MqLogic { public function pushAmq($exchange, $route_key, $queue_name, $content='') { $connection = new AMQPStreamConnection(env('RABBITMQ_HOST'), env('RABBITMQ_PORT'), env('RABBITMQ_LOGIN'), env('RABBITMQ_PASSWORD'), env('RABBITMQ_VHOST')); // 创建连接 $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, true, false); // 初始化交换机 $channel->queue_declare($queue_name, false, true, false, false); // 初始化队列 $channel->queue_bind($queue_name, $exchange, $route_key); // 将队列与某个交换机进行绑定,并使用路由关键字 $message = new AMQPMessage($content); $channel->basic_publish($message, '', $queue_name); // 推送消息 $channel->close(); $connection->close(); } public function pullAmq($queue_name='') { $connection = new AMQPStreamConnection(env('RABBITMQ_HOST'), env('RABBITMQ_PORT'), env('RABBITMQ_LOGIN'), env('RABBITMQ_PASSWORD'), env('RABBITMQ_VHOST')); // 创建连接 $channel = $connection->channel(); $message = $channel->basic_get($queue_name); // 取出消息 $channel->basic_ack($message->delivery_info['delivery_tag']); // 确认取出消息后会发送一个ack来确认取出来了,然后会从rabbitmq中将这个消息移除,如果删掉这段代码,会发现rabbitmq中的消息还是没有减少 $channel->close(); $connection->close(); return $message; } }