<?php namespace App\Http\Services; use GuzzleHttp\Client; use Illuminate\Support\Facades\Log; //调用队列接口投掷队列服务 class QueueDeliveryService { const QUEUE_NAME = 'lie_queue_united_data'; //默认的队列名称 const PUSH_TYPE_QUEUE = 1; const PUSH_TYPE_HTTP = 2; const PUSH_TYPE_SYNC_HTTP = 3;//异步转发到http服务 public $queueName; public function __construct() { $this->queueName = self::QUEUE_NAME; } public function setQueueName($name) { $this->queueName = $name; return $this; } /** * 队列推送队列数据 * @param int $types 类型: * 1:队列接口soap推送; * 2:队列原生推送,自己拼接json推送到接口 * @param string $func 如果是soap,则是 的函数:getCreditAmount 检测账期 * @param array $param 推送数据,如:["CURRENCY"=>'"民币","CUSTOMER"=>"深圳小明电子科技"] * 如果 $types=3 ,则是原生推送,自己拼接json推送到接口队列: * @param string $searchKey * @param string $callbackPath 相对回调路径:/open/testa * @param int $mainTimeOut 主推送超时时间 * @param int $callbackTimeout 推送数据 * @return array * @throws \GuzzleHttp\Exception\GuzzleException */ public function push( $types = 1, $func = "", $param = [], $searchKey = "", $callbackPath = "", $mainTimeOut = 120, $callbackTimeout = 10 ) { $client = new Client(); $rabmqUrl = config('website.QueueDomain'); $res = ""; //请求返回结果 $data = []; try { switch ($types) { case self::PUSH_TYPE_QUEUE: #队列推送 $uk = md5(json_encode($param) . time()); $data = [ "queue_name" => $this->queueName, "mq_data" => [ "__from" => $this->getFromUrl() ?: gethostname(), "__insert_time" => time(), "__timeout" => $mainTimeOut, "__route_key" => $func, "__type" => "soap", "__uk" => $uk, "__search_key" => $searchKey, "data" => $param, ] ]; if ($callbackPath) { $nowDomain = str_replace(request()->path(), "", request()->url()); $data["mq_data"]["__callback"] = [ // 选填字段,有回调时必填 "__callback_url" => $nowDomain . $callbackPath, // 回调的url路由地址,暂不支持回调soap接口列 "__callback_timeout" => $callbackTimeout, // 回调该接口的超时时间,默认为10秒,选填字段 "__callback_type" => "http", // 回调该接口的请求方式,同"__type",选填字段 "__callback_search_key" => $searchKey, //"__callback_verify"=>"verify" // 若回调的接口需要校验,如请求龙哥的接口需要该字段,选填字段 ]; } $response = $client->request('POST', $rabmqUrl, [ 'json' => $data, ]); $res = $response->getBody(); break; case self::PUSH_TYPE_SYNC_HTTP://异步转发到http请求 $uk = md5(json_encode($param) . time()); $data = [ "queue_name" => $this->queueName, "mq_data" => [ "__from" => gethostname(), "__insert_time" => time(), "__timeout" => $mainTimeOut, "__route_key" => $func, "__type" => "http", "__uk" => $uk, "__search_key" => $searchKey, "data" => $param, ] ]; if ($callbackPath) { $nowDomain = str_replace(request()->path(), "", request()->url()); $data["mq_data"]["__callback"] = [ // 选填字段,有回调时必填 "__callback_url" => $nowDomain . $callbackPath, // 回调的url路由地址,暂不支持回调soap接口列 "__callback_timeout" => $callbackTimeout, // 回调该接口的超时时间,默认为10秒,选填字段 "__callback_type" => "http", // 回调该接口的请求方式,同"__type",选填字段 "__callback_search_key" => $searchKey, ]; } $response = $client->request('POST', $rabmqUrl, [ 'json' => $data ]); $res = $response->getBody(); break; case self::PUSH_TYPE_HTTP: #原生推送,自己组织数据 $data = $param; $response = $client->request('POST', $rabmqUrl, [ 'json' => $data ]); $res = $response->getBody(); break; } } catch (\Exception $e) { dd($e->getMessage()); Log::info("--- 推送数据:" . json_encode($data) . " 异常:" . $e->getMessage()); } if (data_get($_REQUEST, "debug")) { echo json_encode([ "队列接口:" => $rabmqUrl, "请求数据:" => $data, "返回结果" => json_decode($res, true), ]); } Log::info("--- 推送数据:" . json_encode($data) . " 返回数据:" . $res); $arr = json_decode($res, true); if (isset($arr['code'])) { return $arr; } else { //兼容旧接口 $arrTemp["code"] = isset($arr['0000']) ? 0 : 1; $arrTemp["msg"] = $arrTemp["code"] == 0 ? "成功" : data_get($arr, "4444"); $arrTemp["data"] = $arr; return $arrTemp; } } private function getFromUrl() { $domain = isset($_SERVER['HTTP_HOST']) ? $_SERVER['HTTP_HOST'] : ''; $requestUrl = isset($_SERVER['REQUEST_URI']) ? $_SERVER['REQUEST_URI'] : ''; $fromUrl = ''; if ($domain && $requestUrl) { $fromUrl = $domain . $requestUrl; } return $fromUrl; } }