Commit 6a05c889 by mushishixian

rbmq包

parent 01ede865
Showing with 3489 additions and 5 deletions
......@@ -9,7 +9,8 @@
"laravel/framework": "5.2.*",
"ext-json": "*",
"ext-curl": "*",
"predis/predis": "^1.1"
"predis/predis": "^1.1",
"vladimir-yuldashev/laravel-queue-rabbitmq": "5.2"
},
"require-dev": {
"fzaninotto/faker": "~1.4",
......
......@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "0f1afe014feaa70879e650091e283d00",
"content-hash": "83e1cb18d31ab4c1f7e7a8af172b56d5",
"packages": [
{
"name": "classpreloader/classpreloader",
......@@ -934,6 +934,82 @@
"time": "2018-04-04T21:48:54+00:00"
},
{
"name": "php-amqplib/php-amqplib",
"version": "v2.6.3",
"source": {
"type": "git",
"url": "https://github.com/php-amqplib/php-amqplib.git",
"reference": "fa2f0d4410a11008cb36b379177291be7ee9e4f6"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/fa2f0d4410a11008cb36b379177291be7ee9e4f6",
"reference": "fa2f0d4410a11008cb36b379177291be7ee9e4f6",
"shasum": "",
"mirrors": [
{
"url": "https://mirrors.aliyun.com/composer/dists/%package%/%reference%.%type%",
"preferred": true
}
]
},
"require": {
"ext-bcmath": "*",
"ext-mbstring": "*",
"php": ">=5.3.0"
},
"replace": {
"videlalvaro/php-amqplib": "self.version"
},
"require-dev": {
"phpunit/phpunit": "^4.8",
"scrutinizer/ocular": "^1.1",
"squizlabs/php_codesniffer": "^2.5"
},
"suggest": {
"ext-sockets": "Use AMQPSocketConnection"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "2.7-dev"
}
},
"autoload": {
"psr-4": {
"PhpAmqpLib\\": "PhpAmqpLib/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"LGPL-2.1"
],
"authors": [
{
"name": "Alvaro Videla",
"role": "Original Maintainer"
},
{
"name": "John Kelly",
"email": "johnmkelly86@gmail.com",
"role": "Maintainer"
},
{
"name": "Raúl Araya",
"email": "nubeiro@gmail.com",
"role": "Maintainer"
}
],
"description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
"homepage": "https://github.com/php-amqplib/php-amqplib/",
"keywords": [
"message",
"queue",
"rabbitmq"
],
"time": "2016-04-11T14:30:01+00:00"
},
{
"name": "predis/predis",
"version": "v1.1.7",
"source": {
......@@ -2111,6 +2187,51 @@
"time": "2016-07-26T08:03:56+00:00"
},
{
"name": "vladimir-yuldashev/laravel-queue-rabbitmq",
"version": "5.2",
"source": {
"type": "git",
"url": "https://github.com/vyuldashev/laravel-queue-rabbitmq.git",
"reference": "54460897a5a52cf6d679d3995a85255049eb09e8"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/vyuldashev/laravel-queue-rabbitmq/zipball/54460897a5a52cf6d679d3995a85255049eb09e8",
"reference": "54460897a5a52cf6d679d3995a85255049eb09e8",
"shasum": "",
"mirrors": [
{
"url": "https://mirrors.aliyun.com/composer/dists/%package%/%reference%.%type%",
"preferred": true
}
]
},
"require": {
"illuminate/queue": "5.2.*",
"illuminate/support": "5.2.*",
"php": ">=5.5.9",
"php-amqplib/php-amqplib": "2.6.*"
},
"type": "library",
"autoload": {
"psr-0": {
"VladimirYuldashev\\LaravelQueueRabbitMQ": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Vladimir Yuldashev",
"email": "misterio92@gmail.com"
}
],
"description": "RabbitMQ driver for Laravel Queue",
"time": "2016-12-09T09:14:37+00:00"
},
{
"name": "vlucas/phpdotenv",
"version": "v2.6.3",
"source": {
......@@ -2291,6 +2412,7 @@
"faker",
"fixtures"
],
"abandoned": true,
"time": "2019-12-12T13:22:17+00:00"
},
{
......@@ -2929,6 +3051,7 @@
"keywords": [
"tokenizer"
],
"abandoned": true,
"time": "2017-12-04T08:55:13+00:00"
},
{
......@@ -3673,12 +3796,12 @@
"version": "1.7.0",
"source": {
"type": "git",
"url": "https://github.com/webmozart/assert.git",
"url": "https://github.com/webmozarts/assert.git",
"reference": "aed98a490f9a8f78468232db345ab9cf606cf598"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/webmozart/assert/zipball/aed98a490f9a8f78468232db345ab9cf606cf598",
"url": "https://api.github.com/repos/webmozarts/assert/zipball/aed98a490f9a8f78468232db345ab9cf606cf598",
"reference": "aed98a490f9a8f78468232db345ab9cf606cf598",
"shasum": "",
"mirrors": [
......@@ -3729,7 +3852,7 @@
"prefer-stable": false,
"prefer-lowest": false,
"platform": {
"php": ">=5.5.9",
"php": ">=5.6",
"ext-json": "*",
"ext-curl": "*"
},
......
......@@ -6,6 +6,7 @@ $vendorDir = dirname(dirname(__FILE__));
$baseDir = dirname($vendorDir);
return array(
'VladimirYuldashev\\LaravelQueueRabbitMQ' => array($vendorDir . '/vladimir-yuldashev/laravel-queue-rabbitmq/src'),
'UpdateHelper\\' => array($vendorDir . '/kylekatarnls/update-helper/src'),
'Mockery' => array($vendorDir . '/mockery/mockery/library'),
'JakubOnderka\\PhpConsoleHighlighter' => array($vendorDir . '/jakub-onderka/php-console-highlighter/src'),
......
......@@ -32,6 +32,7 @@ return array(
'Prophecy\\' => array($vendorDir . '/phpspec/prophecy/src/Prophecy'),
'Predis\\' => array($vendorDir . '/predis/predis/src'),
'PhpParser\\' => array($vendorDir . '/nikic/php-parser/lib/PhpParser'),
'PhpAmqpLib\\' => array($vendorDir . '/php-amqplib/php-amqplib/PhpAmqpLib'),
'Monolog\\' => array($vendorDir . '/monolog/monolog/src/Monolog'),
'League\\Flysystem\\' => array($vendorDir . '/league/flysystem/src'),
'JakubOnderka\\PhpConsoleColor\\' => array($vendorDir . '/jakub-onderka/php-console-color/src'),
......
......@@ -1204,6 +1204,84 @@
]
},
{
"name": "php-amqplib/php-amqplib",
"version": "v2.6.3",
"version_normalized": "2.6.3.0",
"source": {
"type": "git",
"url": "https://github.com/php-amqplib/php-amqplib.git",
"reference": "fa2f0d4410a11008cb36b379177291be7ee9e4f6"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/fa2f0d4410a11008cb36b379177291be7ee9e4f6",
"reference": "fa2f0d4410a11008cb36b379177291be7ee9e4f6",
"shasum": "",
"mirrors": [
{
"url": "https://mirrors.aliyun.com/composer/dists/%package%/%reference%.%type%",
"preferred": true
}
]
},
"require": {
"ext-bcmath": "*",
"ext-mbstring": "*",
"php": ">=5.3.0"
},
"replace": {
"videlalvaro/php-amqplib": "self.version"
},
"require-dev": {
"phpunit/phpunit": "^4.8",
"scrutinizer/ocular": "^1.1",
"squizlabs/php_codesniffer": "^2.5"
},
"suggest": {
"ext-sockets": "Use AMQPSocketConnection"
},
"time": "2016-04-11T14:30:01+00:00",
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "2.7-dev"
}
},
"installation-source": "dist",
"autoload": {
"psr-4": {
"PhpAmqpLib\\": "PhpAmqpLib/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"LGPL-2.1"
],
"authors": [
{
"name": "Alvaro Videla",
"role": "Original Maintainer"
},
{
"name": "John Kelly",
"email": "johnmkelly86@gmail.com",
"role": "Maintainer"
},
{
"name": "Raúl Araya",
"email": "nubeiro@gmail.com",
"role": "Maintainer"
}
],
"description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
"homepage": "https://github.com/php-amqplib/php-amqplib/",
"keywords": [
"message",
"queue",
"rabbitmq"
]
},
{
"name": "phpdocumentor/reflection-common",
"version": "2.0.0",
"version_normalized": "2.0.0.0",
......@@ -3713,6 +3791,53 @@
"homepage": "https://symfony.com"
},
{
"name": "vladimir-yuldashev/laravel-queue-rabbitmq",
"version": "5.2",
"version_normalized": "5.2.0.0",
"source": {
"type": "git",
"url": "https://github.com/vyuldashev/laravel-queue-rabbitmq.git",
"reference": "54460897a5a52cf6d679d3995a85255049eb09e8"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/vyuldashev/laravel-queue-rabbitmq/zipball/54460897a5a52cf6d679d3995a85255049eb09e8",
"reference": "54460897a5a52cf6d679d3995a85255049eb09e8",
"shasum": "",
"mirrors": [
{
"url": "https://mirrors.aliyun.com/composer/dists/%package%/%reference%.%type%",
"preferred": true
}
]
},
"require": {
"illuminate/queue": "5.2.*",
"illuminate/support": "5.2.*",
"php": ">=5.5.9",
"php-amqplib/php-amqplib": "2.6.*"
},
"time": "2016-12-09T09:14:37+00:00",
"type": "library",
"installation-source": "dist",
"autoload": {
"psr-0": {
"VladimirYuldashev\\LaravelQueueRabbitMQ": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Vladimir Yuldashev",
"email": "misterio92@gmail.com"
}
],
"description": "RabbitMQ driver for Laravel Queue"
},
{
"name": "vlucas/phpdotenv",
"version": "v2.6.3",
"version_normalized": "2.6.3.0",
......
# Changelog
All Notable changes to `php-amqplib` will be documented in this file
## [Unreleased]
## 2.6.3 - 2016-04-11
### Added
- Added the ability to set timeout as float
### Fixed
- Fixed restoring of error_handler on connection error
### Enhancements
- Verify read_write_timeout is at least 2x the heartbeat (if set)
- Many PHPDoc fixes
- Throw exception when trying to create an exchange on a closed connection
## 2.6.2 - 2016-03-02
### Added
- Added AMQPLazySocketConnection
- AbstractConnection::getServerProperties method to retrieve server properties.
- AMQPReader::wait() will throw IOWaitException on stream_select failure
- Add PHPDocs to Auto-generated Protocol Classes
### Fixed
- Disable heartbeat when closing connection
- Fix for when the default error handler is not restored in StreamIO
### Enhancements
- Cleanup tests and improve testing performance
- Confirm received valid frame type on wait_frame in AbstractConnection
- Update DEMO files closer to PSR-2 standards
## 2.6.1 - 2016-02-12
### Added
- Add constants for delivery modes to AMQPMessage
### Fixed
- Fix some PHPDoc problems
- AbstractCollection value de/encoding on PHP7
- StreamIO: fix "bad write retry" in SSL mode
### Enhancements
- Update PHPUnit configuration
- Add scrutinizer-ci configuration
- Organizational changes from videlalvaro to php-amqplib org
- Minor complexity optimizations, code organization, and code cleanup
## 2.6.0 - 2015-09-23
### BC Breaking Changes
- The `AMQPStreamConnection` class now throws `ErrorExceptions` when errors happen while reading/writing to the network.
### Added
- Heartbeat frames will decrease the timeout used when calling wait_channel - heartbeat frames do not reset the timeout
### Fixed
- Declared the class AbstractChannel as being an abstract class
- Reads, writes and signals respond immediately instead of waiting for a timeout
- Fatal error in some cases on Channel.wait with timeout when RabbitMQ restarted
- Remove warning when trying to push a deferred frame
# Contributor Code of Conduct
As contributors and maintainers of this project, and in the interest of
fostering an open and welcoming community, we pledge to respect all people who
contribute through reporting issues, posting feature requests, updating
documentation, submitting pull requests or patches, and other activities.
We are committed to making participation in this project a harassment-free
experience for everyone, regardless of level of experience, gender, gender
identity and expression, sexual orientation, disability, personal appearance,
body size, race, ethnicity, age, religion, or nationality.
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery
* Personal attacks
* Trolling or insulting/derogatory comments
* Public or private harassment
* Publishing other's private information, such as physical or electronic
addresses, without explicit permission
* Other unethical or unprofessional conduct
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.
By adopting this Code of Conduct, project maintainers commit themselves to
fairly and consistently applying these principles to every aspect of managing
this project. Project maintainers who do not follow or enforce the Code of
Conduct may be permanently removed from the project team.
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community.
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting a project maintainer. All complaints will be reviewed
and investigated and will result in a response that is deemed necessary and
appropriate to the circumstances. Maintainers are obligated to maintain
confidentiality with regard to the reporter of an incident.
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 1.3.0, available at
[http://contributor-covenant.org/version/1/3/0/][version]
[homepage]: http://contributor-covenant.org
[version]: http://contributor-covenant.org/version/1/3/0/
# Contributing
Contributions are **welcome** and will be fully **credited**.
We accept contributions via Pull Requests on [Github](https://github.com/videlalvaro/php-amqplib).
## Pull Requests
- **[PSR-2 Coding Standard](https://github.com/php-fig/fig-standards/blob/master/accepted/PSR-2-coding-style-guide.md)** - The easiest way to apply the conventions is to install [PHP Code Sniffer](http://pear.php.net/package/PHP_CodeSniffer).
- **Add tests!** - Your patch won't be accepted if it doesn't have tests.
- **Document any change in behaviour** - Make sure the README and any other relevant documentation are kept up-to-date.
- **Consider our release cycle** - We try to follow semver. Randomly breaking public APIs is not an option.
- **Create topic branches** - Don't ask us to pull from your master branch.
- **One pull request per feature** - If you want to do more than one thing, send multiple pull requests.
- **Send coherent history** - Make sure each individual commit in your pull request is meaningful. If you had to make multiple intermediate commits while developing, please squash them before submitting.
## Running Tests
``` bash
$ phpunit
```
**Happy coding**!
Following people contributed to this project:
Barry Pederson <bp@barryp.org> - author of original Python lib
Vadim Zaliva <lord@crocodile.org> - PHP paort
taavi013@gmail.com patches - patches
Sean Murphy http://code.google.com/u/sgmurphy/ - patches
spiderbill http://code.google.com/u/spiderbill/ - patches
<?php
namespace PhpAmqpLib\Connection;
/**
* Class AMQPConnection
*
* Kept for BC
*
* @deprecated
*/
class AMQPConnection extends AMQPStreamConnection
{
}
<?php
namespace PhpAmqpLib\Connection;
class AMQPLazyConnection extends AMQPStreamConnection
{
/**
* Gets socket from current connection
*
* @deprecated
*/
public function getSocket()
{
$this->connect();
return parent::getSocket();
}
/**
* {@inheritdoc}
*/
public function channel($channel_id = null)
{
$this->connect();
return parent::channel($channel_id);
}
/**
* @return null|\PhpAmqpLib\Wire\IO\AbstractIO
*/
protected function getIO()
{
if (!$this->io) {
$this->connect();
}
return $this->io;
}
/**
* Should the connection be attempted during construction?
*
* @return bool
*/
public function connectOnConstruct()
{
return false;
}
}
<?php
namespace PhpAmqpLib\Connection;
/**
* Yet another lazy connection. This time using sockets. Current architecture doesn't allow to wrap existing connections
*/
class AMQPLazySocketConnection extends AMQPSocketConnection
{
/**
* Gets socket from current connection
*
* @deprecated
*/
public function getSocket()
{
$this->connect();
return parent::getSocket();
}
/**
* {@inheritdoc}
*/
public function channel($channel_id = null)
{
$this->connect();
return parent::channel($channel_id);
}
/**
* @return null|\PhpAmqpLib\Wire\IO\AbstractIO
*/
protected function getIO()
{
if (!$this->io) {
$this->connect();
}
return $this->io;
}
/**
* Should the connection be attempted during construction?
*
* @return bool
*/
public function connectOnConstruct()
{
return false;
}
}
\ No newline at end of file
<?php
namespace PhpAmqpLib\Connection;
class AMQPSSLConnection extends AMQPStreamConnection
{
/**
* @param string $host
* @param int $port
* @param string $user
* @param string $password
* @param string $vhost
* @param array $ssl_options
* @param array $options
*/
public function __construct(
$host,
$port,
$user,
$password,
$vhost = '/',
$ssl_options = array(),
$options = array()
) {
$ssl_context = empty($ssl_options) ? null : $this->create_ssl_context($ssl_options);
parent::__construct(
$host,
$port,
$user,
$password,
$vhost,
isset($options['insist']) ? $options['insist'] : false,
isset($options['login_method']) ? $options['login_method'] : 'AMQPLAIN',
isset($options['login_response']) ? $options['login_response'] : null,
isset($options['locale']) ? $options['locale'] : 'en_US',
isset($options['connection_timeout']) ? $options['connection_timeout'] : 3,
isset($options['read_write_timeout']) ? $options['read_write_timeout'] : 3,
$ssl_context,
isset($options['keepalive']) ? $options['keepalive'] : false,
isset($options['heartbeat']) ? $options['heartbeat'] : 0
);
}
/**
* @param array $options
* @return resource
*/
private function create_ssl_context($options)
{
$ssl_context = stream_context_create();
foreach ($options as $k => $v) {
stream_context_set_option($ssl_context, 'ssl', $k, $v);
}
return $ssl_context;
}
}
<?php
namespace PhpAmqpLib\Connection;
use PhpAmqpLib\Wire\IO\SocketIO;
class AMQPSocketConnection extends AbstractConnection
{
/**
* @param string $host
* @param int $port
* @param string $user
* @param string $password
* @param string $vhost
* @param bool $insist
* @param string $login_method
* @param null $login_response
* @param string $locale
* @param float $timeout
* @param bool $keepalive
*/
public function __construct(
$host,
$port,
$user,
$password,
$vhost = '/',
$insist = false,
$login_method = 'AMQPLAIN',
$login_response = null,
$locale = 'en_US',
$timeout = 3,
$keepalive = false
) {
$io = new SocketIO($host, $port, $timeout, $keepalive);
parent::__construct(
$user,
$password,
$vhost,
$insist,
$login_method,
$login_response,
$locale,
$io
);
}
}
<?php
namespace PhpAmqpLib\Connection;
use PhpAmqpLib\Wire\IO\StreamIO;
class AMQPStreamConnection extends AbstractConnection
{
/**
* @param string $host
* @param string $port
* @param string $user
* @param string $password
* @param string $vhost
* @param bool $insist
* @param string $login_method
* @param null $login_response
* @param string $locale
* @param float $connection_timeout
* @param float $read_write_timeout
* @param null $context
* @param bool $keepalive
* @param int $heartbeat
*/
public function __construct(
$host,
$port,
$user,
$password,
$vhost = '/',
$insist = false,
$login_method = 'AMQPLAIN',
$login_response = null,
$locale = 'en_US',
$connection_timeout = 3.0,
$read_write_timeout = 3.0,
$context = null,
$keepalive = false,
$heartbeat = 0
) {
$io = new StreamIO(
$host,
$port,
$connection_timeout,
$read_write_timeout,
$context,
$keepalive,
$heartbeat
);
parent::__construct(
$user,
$password,
$vhost,
$insist,
$login_method,
$login_response,
$locale,
$io,
$heartbeat
);
// save the params for the use of __clone, this will overwrite the parent
$this->construct_params = func_get_args();
}
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPBasicCancelException extends \Exception implements AMQPExceptionInterface
{
/** @var string */
public $consumerTag;
/**
* @param string $consumerTag
*/
public function __construct($consumerTag)
{
$this->consumerTag = $consumerTag;
}
}
<?php
namespace PhpAmqpLib\Exception;
/**
* @deprecated use AMQPProtocolChannelException instead
*/
class AMQPChannelException extends AMQPException
{
}
<?php
namespace PhpAmqpLib\Exception;
/**
* @deprecated use AMQPProtocolConnectionException instead
*/
class AMQPConnectionException extends AMQPException
{
}
<?php
namespace PhpAmqpLib\Exception;
//TODO refactor usage of static methods
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Helper\MiscHelper;
/**
* @deprecated use AMQPProtocolException instead
*/
class AMQPException extends \Exception
{
/** @var string */
public $amqp_reply_code;
/** @var int */
public $amqp_reply_text;
/** @var \Exception */
public $amqp_method_sig;
/** @var array */
public $args;
/**
* @param string $reply_code
* @param int $reply_text
* @param array $method_sig
*/
public function __construct($reply_code, $reply_text, $method_sig)
{
parent::__construct($reply_text, $reply_code);
$this->amqp_reply_code = $reply_code; // redundant, but kept for BC
$this->amqp_reply_text = $reply_text; // redundant, but kept for BC
$this->amqp_method_sig = $method_sig;
$ms = MiscHelper::methodSig($method_sig);
$protocolClass = AbstractChannel::$PROTOCOL_CONSTANTS_CLASS;
$mn = isset($protocolClass::$GLOBAL_METHOD_NAMES[$ms])
? $protocolClass::$GLOBAL_METHOD_NAMES[$ms]
: $mn = '';
$this->args = array($reply_code, $reply_text, $method_sig, $mn);
}
}
<?php
namespace PhpAmqpLib\Exception;
interface AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPIOException extends \Exception implements AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPIOWaitException extends AMQPRuntimeException
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPInvalidArgumentException extends \RuntimeException implements AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPLogicException extends \LogicException implements AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPOutOfBoundsException extends \OutOfBoundsException implements AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPOutOfRangeException extends \OutOfRangeException implements AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPProtocolChannelException extends AMQPProtocolException
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPProtocolConnectionException extends AMQPProtocolException
{
}
<?php
namespace PhpAmqpLib\Exception;
//TODO refactor usage of static methods
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Helper\MiscHelper;
class AMQPProtocolException extends \Exception implements AMQPExceptionInterface
{
/** @var string */
public $amqp_reply_code;
/** @var int */
public $amqp_reply_text;
/** @var \Exception */
public $amqp_method_sig;
/** @var array */
public $args;
/**
* @param string $reply_code
* @param int $reply_text
* @param array $method_sig
*/
public function __construct($reply_code, $reply_text, $method_sig)
{
parent::__construct($reply_text, $reply_code);
$this->amqp_reply_code = $reply_code; // redundant, but kept for BC
$this->amqp_reply_text = $reply_text; // redundant, but kept for BC
$this->amqp_method_sig = $method_sig;
$ms = MiscHelper::methodSig($method_sig);
$protocolClass = AbstractChannel::$PROTOCOL_CONSTANTS_CLASS;
$mn = isset($protocolClass::$GLOBAL_METHOD_NAMES[$ms])
? $protocolClass::$GLOBAL_METHOD_NAMES[$ms]
: $mn = '';
$this->args = array($reply_code, $reply_text, $method_sig, $mn);
}
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPRuntimeException extends \RuntimeException implements AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPTimeoutException extends \RuntimeException implements AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Helper;
class DebugHelper
{
/**
* @var bool
*/
protected $debug;
/**
* @var string
*/
protected $PROTOCOL_CONSTANTS_CLASS;
/**
* @param string $PROTOCOL_CONSTANTS_CLASS
*/
public function __construct($PROTOCOL_CONSTANTS_CLASS) {
$this->debug = defined('AMQP_DEBUG') ? AMQP_DEBUG : false;
$this->PROTOCOL_CONSTANTS_CLASS = $PROTOCOL_CONSTANTS_CLASS;
}
/**
* @param string $msg
*/
public function debug_msg($msg) {
if ($this->debug) {
$this->print_msg($msg);
}
}
/**
* @param array $allowed_methods
*/
public function debug_allowed_methods($allowed_methods) {
if ($allowed_methods) {
$msg = 'waiting for ' . implode(', ', $allowed_methods);
} else {
$msg = 'waiting for any method';
}
$this->debug_msg($msg);
}
/**
* @param string $method_sig
*/
public function debug_method_signature1($method_sig) {
$this->debug_method_signature('< %s:', $method_sig);
}
/**
* @param string $msg
* @param string $method_sig
*/
public function debug_method_signature($msg, $method_sig) {
if ($this->debug) {
$protocolClass = $this->PROTOCOL_CONSTANTS_CLASS;
$this->debug_msg(sprintf(
$msg . ': %s',
MiscHelper::methodSig($method_sig),
$protocolClass::$GLOBAL_METHOD_NAMES[MiscHelper::methodSig($method_sig)]
));
}
}
/**
* @param string $data
*/
public function debug_hexdump($data) {
if ($this->debug) {
$this->debug_msg(sprintf(
'< [hex]: %s%s',
PHP_EOL,
MiscHelper::hexdump($data, $htmloutput = false, $uppercase = true, $return = true)
));
}
}
/**
* @param int $version_major
* @param int $version_minor
* @param array $server_properties
* @param array $mechanisms
* @param array $locales
*/
public function debug_connection_start($version_major, $version_minor, $server_properties, $mechanisms, $locales) {
if ($this->debug) {
$this->debug_msg(sprintf(
'Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s',
$version_major,
$version_minor,
MiscHelper::dump_table($server_properties),
implode(', ', $mechanisms),
implode(', ', $locales)
));
}
}
/**
* @param string $s
*/
protected function print_msg($s) {
echo $s . PHP_EOL;
}
}
<?php
namespace PhpAmqpLib\Helper;
class MiscHelper
{
/**
* @param string|array $a
* @return string
*/
public static function methodSig($a)
{
if (is_string($a)) {
return $a;
}
return sprintf('%d,%d', $a[0], $a[1]);
}
/**
* @param string $bytes
*/
public static function saveBytes($bytes)
{
$fh = fopen('/tmp/bytes', 'wb');
fwrite($fh, $bytes);
fclose($fh);
}
/**
* Gets a number (either int or float) and returns an array containing its integer part as first element and its
* decimal part mutliplied by 10^6. Useful for some PHP stream functions that need seconds and microseconds as
* different arguments
*
* @param int|float $number
* @return array
*/
public static function splitSecondsMicroseconds($number)
{
return array(floor($number), ($number - floor($number)) * 1000000);
}
/**
* View any string as a hexdump.
*
* This is most commonly used to view binary data from streams
* or sockets while debugging, but can be used to view any string
* with non-viewable characters.
*
* @version 1.3.2
* @author Aidan Lister <aidan@php.net>
* @author Peter Waller <iridum@php.net>
* @link http://aidanlister.com/repos/v/function.hexdump.php
*
* @param string $data The string to be dumped
* @param bool $htmloutput Set to false for non-HTML output
* @param bool $uppercase Set to true for uppercase hex
* @param bool $return Set to true to return the dump
* @return string|null
*/
public static function hexdump($data, $htmloutput = true, $uppercase = false, $return = false)
{
// Init
$hexi = '';
$ascii = '';
$dump = $htmloutput ? '<pre>' : '';
$offset = 0;
$len = mb_strlen($data, 'ASCII');
// Upper or lower case hexidecimal
$hexFormat = $uppercase ? 'X' : 'x';
// Iterate string
for ($i = $j = 0; $i < $len; $i++) {
// Convert to hexidecimal
// We must use concatenation here because the $hexFormat value
// is needed for sprintf() to parse the format
$hexi .= sprintf('%02' . $hexFormat . ' ', ord($data[$i]));
// Replace non-viewable bytes with '.'
if (ord($data[$i]) >= 32) {
$ascii .= $htmloutput ? htmlentities($data[$i]) : $data[$i];
} else {
$ascii .= '.';
}
// Add extra column spacing
if ($j === 7) {
$hexi .= ' ';
$ascii .= ' ';
}
// Add row
if (++$j === 16 || $i === $len - 1) {
// Join the hexi / ascii output
// We must use concatenation here because the $hexFormat value
// is needed for sprintf() to parse the format
$dump .= sprintf('%04' . $hexFormat . ' %-49s %s', $offset, $hexi, $ascii);
// Reset vars
$hexi = $ascii = '';
$offset += 16;
$j = 0;
// Add newline
if ($i !== $len - 1) {
$dump .= PHP_EOL;
}
}
}
// Finish dump
$dump .= $htmloutput ? '</pre>' : '';
$dump .= PHP_EOL;
if ($return) {
return $dump;
}
echo $dump;
}
/**
* @param array $table
* @return string
*/
public static function dump_table($table)
{
$tokens = array();
foreach ($table as $name => $value) {
switch ($value[0]) {
case 'D':
$val = $value[1]->n . 'E' . $value[1]->e;
break;
case 'F':
$val = '(' . self::dump_table($value[1]) . ')';
break;
case 'T':
$val = date('Y-m-d H:i:s', $value[1]);
break;
default:
$val = $value[1];
}
$tokens[] = $name . '=' . $val;
}
return implode(', ', $tokens);
}
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Helper\Protocol;
class MethodMap080
{
/**
* @var array
*/
protected $method_map = array(
'10,10' => 'connection_start',
'10,11' => 'connection_start_ok',
'10,20' => 'connection_secure',
'10,21' => 'connection_secure_ok',
'10,30' => 'connection_tune',
'10,31' => 'connection_tune_ok',
'10,40' => 'connection_open',
'10,41' => 'connection_open_ok',
'10,50' => 'connection_redirect',
'10,60' => 'connection_close',
'10,61' => 'connection_close_ok',
'20,10' => 'channel_open',
'20,11' => 'channel_open_ok',
'20,20' => 'channel_flow',
'20,21' => 'channel_flow_ok',
'20,30' => 'channel_alert',
'20,40' => 'channel_close',
'20,41' => 'channel_close_ok',
'30,10' => 'access_request',
'30,11' => 'access_request_ok',
'40,10' => 'exchange_declare',
'40,11' => 'exchange_declare_ok',
'40,20' => 'exchange_delete',
'40,21' => 'exchange_delete_ok',
'50,10' => 'queue_declare',
'50,11' => 'queue_declare_ok',
'50,20' => 'queue_bind',
'50,21' => 'queue_bind_ok',
'50,30' => 'queue_purge',
'50,31' => 'queue_purge_ok',
'50,40' => 'queue_delete',
'50,41' => 'queue_delete_ok',
'50,50' => 'queue_unbind',
'50,51' => 'queue_unbind_ok',
'60,10' => 'basic_qos',
'60,11' => 'basic_qos_ok',
'60,20' => 'basic_consume',
'60,21' => 'basic_consume_ok',
'60,30' => 'basic_cancel',
'60,31' => 'basic_cancel_ok',
'60,40' => 'basic_publish',
'60,50' => 'basic_return',
'60,60' => 'basic_deliver',
'60,70' => 'basic_get',
'60,71' => 'basic_get_ok',
'60,72' => 'basic_get_empty',
'60,80' => 'basic_ack',
'60,90' => 'basic_reject',
'60,100' => 'basic_recover_async',
'60,110' => 'basic_recover',
'60,111' => 'basic_recover_ok',
'70,10' => 'file_qos',
'70,11' => 'file_qos_ok',
'70,20' => 'file_consume',
'70,21' => 'file_consume_ok',
'70,30' => 'file_cancel',
'70,31' => 'file_cancel_ok',
'70,40' => 'file_open',
'70,41' => 'file_open_ok',
'70,50' => 'file_stage',
'70,60' => 'file_publish',
'70,70' => 'file_return',
'70,80' => 'file_deliver',
'70,90' => 'file_ack',
'70,100' => 'file_reject',
'80,10' => 'stream_qos',
'80,11' => 'stream_qos_ok',
'80,20' => 'stream_consume',
'80,21' => 'stream_consume_ok',
'80,30' => 'stream_cancel',
'80,31' => 'stream_cancel_ok',
'80,40' => 'stream_publish',
'80,50' => 'stream_return',
'80,60' => 'stream_deliver',
'90,10' => 'tx_select',
'90,11' => 'tx_select_ok',
'90,20' => 'tx_commit',
'90,21' => 'tx_commit_ok',
'90,30' => 'tx_rollback',
'90,31' => 'tx_rollback_ok',
'100,10' => 'dtx_select',
'100,11' => 'dtx_select_ok',
'100,20' => 'dtx_start',
'100,21' => 'dtx_start_ok',
'110,10' => 'tunnel_request',
'120,10' => 'test_integer',
'120,11' => 'test_integer_ok',
'120,20' => 'test_string',
'120,21' => 'test_string_ok',
'120,30' => 'test_table',
'120,31' => 'test_table_ok',
'120,40' => 'test_content',
'120,41' => 'test_content_ok',
);
/**
* @var string $method_sig
* @return string
*/
public function get_method($method_sig)
{
return $this->method_map[$method_sig];
}
/**
* @var string $method_sig
* @return bool
*/
public function valid_method($method_sig)
{
return array_key_exists($method_sig, $this->method_map);
}
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Helper\Protocol;
class MethodMap091
{
/**
* @var array
*/
protected $method_map = array(
'10,10' => 'connection_start',
'10,11' => 'connection_start_ok',
'10,20' => 'connection_secure',
'10,21' => 'connection_secure_ok',
'10,30' => 'connection_tune',
'10,31' => 'connection_tune_ok',
'10,40' => 'connection_open',
'10,41' => 'connection_open_ok',
'10,50' => 'connection_close',
'10,51' => 'connection_close_ok',
'10,60' => 'connection_blocked',
'10,61' => 'connection_unblocked',
'20,10' => 'channel_open',
'20,11' => 'channel_open_ok',
'20,20' => 'channel_flow',
'20,21' => 'channel_flow_ok',
'20,40' => 'channel_close',
'20,41' => 'channel_close_ok',
'30,10' => 'access_request',
'30,11' => 'access_request_ok',
'40,10' => 'exchange_declare',
'40,11' => 'exchange_declare_ok',
'40,20' => 'exchange_delete',
'40,21' => 'exchange_delete_ok',
'40,30' => 'exchange_bind',
'40,31' => 'exchange_bind_ok',
'40,40' => 'exchange_unbind',
'40,51' => 'exchange_unbind_ok',
'50,10' => 'queue_declare',
'50,11' => 'queue_declare_ok',
'50,20' => 'queue_bind',
'50,21' => 'queue_bind_ok',
'50,30' => 'queue_purge',
'50,31' => 'queue_purge_ok',
'50,40' => 'queue_delete',
'50,41' => 'queue_delete_ok',
'50,50' => 'queue_unbind',
'50,51' => 'queue_unbind_ok',
'60,10' => 'basic_qos',
'60,11' => 'basic_qos_ok',
'60,20' => 'basic_consume',
'60,21' => 'basic_consume_ok',
'60,30' => 'basic_cancel_from_server',
'60,31' => 'basic_cancel_ok',
'60,40' => 'basic_publish',
'60,50' => 'basic_return',
'60,60' => 'basic_deliver',
'60,70' => 'basic_get',
'60,71' => 'basic_get_ok',
'60,72' => 'basic_get_empty',
'60,80' => 'basic_ack_from_server',
'60,90' => 'basic_reject',
'60,100' => 'basic_recover_async',
'60,110' => 'basic_recover',
'60,111' => 'basic_recover_ok',
'60,120' => 'basic_nack_from_server',
'90,10' => 'tx_select',
'90,11' => 'tx_select_ok',
'90,20' => 'tx_commit',
'90,21' => 'tx_commit_ok',
'90,30' => 'tx_rollback',
'90,31' => 'tx_rollback_ok',
'85,10' => 'confirm_select',
'85,11' => 'confirm_select_ok',
);
/**
* @var string $method_sig
* @return string
*/
public function get_method($method_sig)
{
return $this->method_map[$method_sig];
}
/**
* @var string $method_sig
* @return bool
*/
public function valid_method($method_sig)
{
return array_key_exists($method_sig, $this->method_map);
}
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Helper\Protocol;
class Wait080
{
/**
* @var array
*/
protected $wait = array(
'connection.start' => '10,10',
'connection.start_ok' => '10,11',
'connection.secure' => '10,20',
'connection.secure_ok' => '10,21',
'connection.tune' => '10,30',
'connection.tune_ok' => '10,31',
'connection.open' => '10,40',
'connection.open_ok' => '10,41',
'connection.redirect' => '10,50',
'connection.close' => '10,60',
'connection.close_ok' => '10,61',
'channel.open' => '20,10',
'channel.open_ok' => '20,11',
'channel.flow' => '20,20',
'channel.flow_ok' => '20,21',
'channel.alert' => '20,30',
'channel.close' => '20,40',
'channel.close_ok' => '20,41',
'access.request' => '30,10',
'access.request_ok' => '30,11',
'exchange.declare' => '40,10',
'exchange.declare_ok' => '40,11',
'exchange.delete' => '40,20',
'exchange.delete_ok' => '40,21',
'queue.declare' => '50,10',
'queue.declare_ok' => '50,11',
'queue.bind' => '50,20',
'queue.bind_ok' => '50,21',
'queue.purge' => '50,30',
'queue.purge_ok' => '50,31',
'queue.delete' => '50,40',
'queue.delete_ok' => '50,41',
'queue.unbind' => '50,50',
'queue.unbind_ok' => '50,51',
'basic.qos' => '60,10',
'basic.qos_ok' => '60,11',
'basic.consume' => '60,20',
'basic.consume_ok' => '60,21',
'basic.cancel' => '60,30',
'basic.cancel_ok' => '60,31',
'basic.publish' => '60,40',
'basic.return' => '60,50',
'basic.deliver' => '60,60',
'basic.get' => '60,70',
'basic.get_ok' => '60,71',
'basic.get_empty' => '60,72',
'basic.ack' => '60,80',
'basic.reject' => '60,90',
'basic.recover_async' => '60,100',
'basic.recover' => '60,110',
'basic.recover_ok' => '60,111',
'file.qos' => '70,10',
'file.qos_ok' => '70,11',
'file.consume' => '70,20',
'file.consume_ok' => '70,21',
'file.cancel' => '70,30',
'file.cancel_ok' => '70,31',
'file.open' => '70,40',
'file.open_ok' => '70,41',
'file.stage' => '70,50',
'file.publish' => '70,60',
'file.return' => '70,70',
'file.deliver' => '70,80',
'file.ack' => '70,90',
'file.reject' => '70,100',
'stream.qos' => '80,10',
'stream.qos_ok' => '80,11',
'stream.consume' => '80,20',
'stream.consume_ok' => '80,21',
'stream.cancel' => '80,30',
'stream.cancel_ok' => '80,31',
'stream.publish' => '80,40',
'stream.return' => '80,50',
'stream.deliver' => '80,60',
'tx.select' => '90,10',
'tx.select_ok' => '90,11',
'tx.commit' => '90,20',
'tx.commit_ok' => '90,21',
'tx.rollback' => '90,30',
'tx.rollback_ok' => '90,31',
'dtx.select' => '100,10',
'dtx.select_ok' => '100,11',
'dtx.start' => '100,20',
'dtx.start_ok' => '100,21',
'tunnel.request' => '110,10',
'test.integer' => '120,10',
'test.integer_ok' => '120,11',
'test.string' => '120,20',
'test.string_ok' => '120,21',
'test.table' => '120,30',
'test.table_ok' => '120,31',
'test.content' => '120,40',
'test.content_ok' => '120,41',
);
/**
* @var string $method
* @return string
*/
public function get_wait($method)
{
return $this->wait[$method];
}
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Helper\Protocol;
class Wait091
{
/**
* @var array
*/
protected $wait = array(
'connection.start' => '10,10',
'connection.start_ok' => '10,11',
'connection.secure' => '10,20',
'connection.secure_ok' => '10,21',
'connection.tune' => '10,30',
'connection.tune_ok' => '10,31',
'connection.open' => '10,40',
'connection.open_ok' => '10,41',
'connection.close' => '10,50',
'connection.close_ok' => '10,51',
'connection.blocked' => '10,60',
'connection.unblocked' => '10,61',
'channel.open' => '20,10',
'channel.open_ok' => '20,11',
'channel.flow' => '20,20',
'channel.flow_ok' => '20,21',
'channel.close' => '20,40',
'channel.close_ok' => '20,41',
'access.request' => '30,10',
'access.request_ok' => '30,11',
'exchange.declare' => '40,10',
'exchange.declare_ok' => '40,11',
'exchange.delete' => '40,20',
'exchange.delete_ok' => '40,21',
'exchange.bind' => '40,30',
'exchange.bind_ok' => '40,31',
'exchange.unbind' => '40,40',
'exchange.unbind_ok' => '40,51',
'queue.declare' => '50,10',
'queue.declare_ok' => '50,11',
'queue.bind' => '50,20',
'queue.bind_ok' => '50,21',
'queue.purge' => '50,30',
'queue.purge_ok' => '50,31',
'queue.delete' => '50,40',
'queue.delete_ok' => '50,41',
'queue.unbind' => '50,50',
'queue.unbind_ok' => '50,51',
'basic.qos' => '60,10',
'basic.qos_ok' => '60,11',
'basic.consume' => '60,20',
'basic.consume_ok' => '60,21',
'basic.cancel' => '60,30',
'basic.cancel_ok' => '60,31',
'basic.publish' => '60,40',
'basic.return' => '60,50',
'basic.deliver' => '60,60',
'basic.get' => '60,70',
'basic.get_ok' => '60,71',
'basic.get_empty' => '60,72',
'basic.ack' => '60,80',
'basic.reject' => '60,90',
'basic.recover_async' => '60,100',
'basic.recover' => '60,110',
'basic.recover_ok' => '60,111',
'basic.nack' => '60,120',
'tx.select' => '90,10',
'tx.select_ok' => '90,11',
'tx.commit' => '90,20',
'tx.commit_ok' => '90,21',
'tx.rollback' => '90,30',
'tx.rollback_ok' => '90,31',
'confirm.select' => '85,10',
'confirm.select_ok' => '85,11',
);
/**
* @var string $method
* @return string
*/
public function get_wait($method)
{
return $this->wait[$method];
}
}
<?php
namespace PhpAmqpLib\Message;
use PhpAmqpLib\Wire\GenericContent;
/**
* A Message for use with the Channnel.basic_* methods.
*/
class AMQPMessage extends GenericContent
{
const DELIVERY_MODE_NON_PERSISTENT = 1;
const DELIVERY_MODE_PERSISTENT = 2;
/** @var string */
public $body;
/** @var int */
public $body_size;
/** @var bool */
public $is_truncated = false;
/** @var string */
public $content_encoding;
/** @var array */
protected static $propertyDefinitions = array(
'content_type' => 'shortstr',
'content_encoding' => 'shortstr',
'application_headers' => 'table_object',
'delivery_mode' => 'octet',
'priority' => 'octet',
'correlation_id' => 'shortstr',
'reply_to' => 'shortstr',
'expiration' => 'shortstr',
'message_id' => 'shortstr',
'timestamp' => 'timestamp',
'type' => 'shortstr',
'user_id' => 'shortstr',
'app_id' => 'shortstr',
'cluster_id' => 'shortstr',
);
/**
* @param string $body
* @param array $properties
*/
public function __construct($body = '', $properties = array())
{
$this->setBody($body);
parent::__construct($properties, static::$propertyDefinitions);
}
/**
* @return string
*/
public function getBody()
{
return $this->body;
}
/**
* Sets the message payload
*
* @param string $body
* @return $this
*/
public function setBody($body)
{
$this->body = $body;
return $this;
}
/**
* @return string
*/
public function getContentEncoding()
{
return $this->content_encoding;
}
/**
* @return int
*/
public function getBodySize()
{
return $this->body_size;
}
/**
* @param int $body_size Message body size in byte(s)
* @return AMQPMessage
*/
public function setBodySize($body_size)
{
$this->body_size = (int) $body_size;
return $this;
}
/**
* @return boolean
*/
public function isTruncated()
{
return $this->is_truncated;
}
/**
* @param bool $is_truncated
* @return AMQPMessage
*/
public function setIsTruncated($is_truncated)
{
$this->is_truncated = (bool) $is_truncated;
return $this;
}
}
<?php
namespace PhpAmqpLib\Wire;
class AMQPArray extends AMQPAbstractCollection
{
/**
* @param array|null $data
*/
public function __construct(array $data = null)
{
parent::__construct(empty($data) ? null : array_values($data));
}
/**
* @return int
*/
final public function getType()
{
return self::T_ARRAY;
}
/**
* @param mixed $val
* @param null $type
* @return $this
*/
public function push($val, $type = null)
{
$this->setValue($val, $type);
return $this;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
/**
* AMQP protocol decimal value.
*
* Values are represented as (n,e) pairs. The actual value
* is n * 10^(-e).
*
* From 0.8 spec: Decimal values are
* not intended to support floating point values, but rather
* business values such as currency rates and amounts. The
* 'decimals' octet is not signed.
*/
class AMQPDecimal
{
/** @var int */
protected $n;
/** @var int */
protected $e;
/**
* @param int $n
* @param int $e
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
*/
public function __construct($n, $e)
{
if ($e < 0) {
throw new AMQPOutOfBoundsException('Decimal exponent value must be unsigned!');
}
$this->n = $n;
$this->e = $e;
}
/**
* @return string
*/
public function asBCvalue()
{
return bcdiv($this->n, bcpow(10, $this->e));
}
/**
* @return int
*/
public function getE()
{
return $this->e;
}
/**
* @return int
*/
public function getN()
{
return $this->n;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception;
class AMQPTable extends AMQPAbstractCollection
{
/**
* @return int
*/
final public function getType()
{
return self::T_TABLE;
}
/**
* @param string $key
* @param mixed $val
* @param int|null $type
*/
public function set($key, $val, $type = null)
{
//https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf, https://www.rabbitmq.com/resources/specs/amqp0-8.pdf
//Field names MUST start with a letter, '$' or '#' and may continue with letters, '$' or '#', digits, or underlines, to a maximum length of 128 characters
//The server SHOULD validate field names and upon receiving an invalid field name, it SHOULD signal a connection exception with reply code 503 (syntax error)
//validating length only and delegating other stuff to server, as rabbit seems to currently support numeric keys
if (!($len = strlen($key)) || ($len > 128)) {
throw new Exception\AMQPInvalidArgumentException('Table key must be non-empty string up to 128 chars in length');
}
$this->setValue($val, $type, $key);
}
}
<?php
namespace PhpAmqpLib\Wire;
class AbstractClient
{
/**
* @var bool
*/
protected $is64bits;
/**
* @var bool
*/
protected $isLittleEndian;
public function __construct()
{
$this->is64bits = PHP_INT_SIZE == 8;
$tmp = unpack('S', "\x01\x00"); // to maintain 5.3 compatibility
$this->isLittleEndian = $tmp[1] == 1;
}
/**
* Converts byte-string between native and network byte order, in both directions
*
* @param string $byteStr
* @return string
*/
protected function correctEndianness($byteStr)
{
return $this->isLittleEndian ? strrev($byteStr) : $byteStr;
}
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Wire;
class Constants080
{
/**
* @var string
*/
public static $AMQP_PROTOCOL_HEADER = "AMQP\x01\x01\x08\x00";
/**
* @var array
*/
public static $FRAME_TYPES = array(
1 => 'FRAME-METHOD',
2 => 'FRAME-HEADER',
3 => 'FRAME-BODY',
4 => 'FRAME-OOB-METHOD',
5 => 'FRAME-OOB-HEADER',
6 => 'FRAME-OOB-BODY',
7 => 'FRAME-TRACE',
8 => 'FRAME-HEARTBEAT',
4096 => 'FRAME-MIN-SIZE',
206 => 'FRAME-END',
501 => 'FRAME-ERROR',
);
/**
* @var array
*/
public static $CONTENT_METHODS = array(
0 => '60,40',
1 => '60,50',
2 => '60,60',
3 => '60,71',
4 => '70,50',
5 => '70,70',
6 => '80,40',
7 => '80,50',
8 => '80,60',
9 => '110,10',
10 => '120,40',
11 => '120,41',
);
/**
* @var array
*/
public static $CLOSE_METHODS = array(
0 => '10,60',
1 => '20,40',
);
/**
* @var array
*/
public static $GLOBAL_METHOD_NAMES = array(
'10,10' => 'Connection.start',
'10,11' => 'Connection.start_ok',
'10,20' => 'Connection.secure',
'10,21' => 'Connection.secure_ok',
'10,30' => 'Connection.tune',
'10,31' => 'Connection.tune_ok',
'10,40' => 'Connection.open',
'10,41' => 'Connection.open_ok',
'10,50' => 'Connection.redirect',
'10,60' => 'Connection.close',
'10,61' => 'Connection.close_ok',
'20,10' => 'Channel.open',
'20,11' => 'Channel.open_ok',
'20,20' => 'Channel.flow',
'20,21' => 'Channel.flow_ok',
'20,30' => 'Channel.alert',
'20,40' => 'Channel.close',
'20,41' => 'Channel.close_ok',
'30,10' => 'Access.request',
'30,11' => 'Access.request_ok',
'40,10' => 'Exchange.declare',
'40,11' => 'Exchange.declare_ok',
'40,20' => 'Exchange.delete',
'40,21' => 'Exchange.delete_ok',
'50,10' => 'Queue.declare',
'50,11' => 'Queue.declare_ok',
'50,20' => 'Queue.bind',
'50,21' => 'Queue.bind_ok',
'50,30' => 'Queue.purge',
'50,31' => 'Queue.purge_ok',
'50,40' => 'Queue.delete',
'50,41' => 'Queue.delete_ok',
'50,50' => 'Queue.unbind',
'50,51' => 'Queue.unbind_ok',
'60,10' => 'Basic.qos',
'60,11' => 'Basic.qos_ok',
'60,20' => 'Basic.consume',
'60,21' => 'Basic.consume_ok',
'60,30' => 'Basic.cancel',
'60,31' => 'Basic.cancel_ok',
'60,40' => 'Basic.publish',
'60,50' => 'Basic.return',
'60,60' => 'Basic.deliver',
'60,70' => 'Basic.get',
'60,71' => 'Basic.get_ok',
'60,72' => 'Basic.get_empty',
'60,80' => 'Basic.ack',
'60,90' => 'Basic.reject',
'60,100' => 'Basic.recover_async',
'60,110' => 'Basic.recover',
'60,111' => 'Basic.recover_ok',
'70,10' => 'File.qos',
'70,11' => 'File.qos_ok',
'70,20' => 'File.consume',
'70,21' => 'File.consume_ok',
'70,30' => 'File.cancel',
'70,31' => 'File.cancel_ok',
'70,40' => 'File.open',
'70,41' => 'File.open_ok',
'70,50' => 'File.stage',
'70,60' => 'File.publish',
'70,70' => 'File.return',
'70,80' => 'File.deliver',
'70,90' => 'File.ack',
'70,100' => 'File.reject',
'80,10' => 'Stream.qos',
'80,11' => 'Stream.qos_ok',
'80,20' => 'Stream.consume',
'80,21' => 'Stream.consume_ok',
'80,30' => 'Stream.cancel',
'80,31' => 'Stream.cancel_ok',
'80,40' => 'Stream.publish',
'80,50' => 'Stream.return',
'80,60' => 'Stream.deliver',
'90,10' => 'Tx.select',
'90,11' => 'Tx.select_ok',
'90,20' => 'Tx.commit',
'90,21' => 'Tx.commit_ok',
'90,30' => 'Tx.rollback',
'90,31' => 'Tx.rollback_ok',
'100,10' => 'Dtx.select',
'100,11' => 'Dtx.select_ok',
'100,20' => 'Dtx.start',
'100,21' => 'Dtx.start_ok',
'110,10' => 'Tunnel.request',
'120,10' => 'Test.integer',
'120,11' => 'Test.integer_ok',
'120,20' => 'Test.string',
'120,21' => 'Test.string_ok',
'120,30' => 'Test.table',
'120,31' => 'Test.table_ok',
'120,40' => 'Test.content',
'120,41' => 'Test.content_ok',
);
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Wire;
class Constants091
{
/**
* @var string
*/
public static $AMQP_PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";
/**
* @var array
*/
public static $FRAME_TYPES = array(
1 => 'FRAME-METHOD',
2 => 'FRAME-HEADER',
3 => 'FRAME-BODY',
8 => 'FRAME-HEARTBEAT',
4096 => 'FRAME-MIN-SIZE',
206 => 'FRAME-END',
501 => 'FRAME-ERROR',
);
/**
* @var array
*/
public static $CONTENT_METHODS = array(
0 => '60,40',
1 => '60,50',
2 => '60,60',
3 => '60,71',
);
/**
* @var array
*/
public static $CLOSE_METHODS = array(
0 => '10,50',
1 => '20,40',
);
/**
* @var array
*/
public static $GLOBAL_METHOD_NAMES = array(
'10,10' => 'Connection.start',
'10,11' => 'Connection.start_ok',
'10,20' => 'Connection.secure',
'10,21' => 'Connection.secure_ok',
'10,30' => 'Connection.tune',
'10,31' => 'Connection.tune_ok',
'10,40' => 'Connection.open',
'10,41' => 'Connection.open_ok',
'10,50' => 'Connection.close',
'10,51' => 'Connection.close_ok',
'10,60' => 'Connection.blocked',
'10,61' => 'Connection.unblocked',
'20,10' => 'Channel.open',
'20,11' => 'Channel.open_ok',
'20,20' => 'Channel.flow',
'20,21' => 'Channel.flow_ok',
'20,40' => 'Channel.close',
'20,41' => 'Channel.close_ok',
'30,10' => 'Access.request',
'30,11' => 'Access.request_ok',
'40,10' => 'Exchange.declare',
'40,11' => 'Exchange.declare_ok',
'40,20' => 'Exchange.delete',
'40,21' => 'Exchange.delete_ok',
'40,30' => 'Exchange.bind',
'40,31' => 'Exchange.bind_ok',
'40,40' => 'Exchange.unbind',
'40,51' => 'Exchange.unbind_ok',
'50,10' => 'Queue.declare',
'50,11' => 'Queue.declare_ok',
'50,20' => 'Queue.bind',
'50,21' => 'Queue.bind_ok',
'50,30' => 'Queue.purge',
'50,31' => 'Queue.purge_ok',
'50,40' => 'Queue.delete',
'50,41' => 'Queue.delete_ok',
'50,50' => 'Queue.unbind',
'50,51' => 'Queue.unbind_ok',
'60,10' => 'Basic.qos',
'60,11' => 'Basic.qos_ok',
'60,20' => 'Basic.consume',
'60,21' => 'Basic.consume_ok',
'60,30' => 'Basic.cancel',
'60,31' => 'Basic.cancel_ok',
'60,40' => 'Basic.publish',
'60,50' => 'Basic.return',
'60,60' => 'Basic.deliver',
'60,70' => 'Basic.get',
'60,71' => 'Basic.get_ok',
'60,72' => 'Basic.get_empty',
'60,80' => 'Basic.ack',
'60,90' => 'Basic.reject',
'60,100' => 'Basic.recover_async',
'60,110' => 'Basic.recover',
'60,111' => 'Basic.recover_ok',
'60,120' => 'Basic.nack',
'90,10' => 'Tx.select',
'90,11' => 'Tx.select_ok',
'90,20' => 'Tx.commit',
'90,21' => 'Tx.commit_ok',
'90,30' => 'Tx.rollback',
'90,31' => 'Tx.rollback_ok',
'85,10' => 'Confirm.select',
'85,11' => 'Confirm.select_ok',
);
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Channel\AMQPChannel;
/**
* Abstract base class for AMQP content. Subclasses should override
* the propertyDefinitions attribute.
*/
abstract class GenericContent
{
/** @var AMQPChannel[] */
public $delivery_info = array();
/** @var array Final property definitions */
protected $prop_types;
/** @var array Properties content */
private $properties = array();
/** @var string Compiled properties */
private $serialized_properties;
/**
* @var array
*/
protected static $propertyDefinitions = array(
'dummy' => 'shortstr'
);
/**
* @param array $properties Message property content
* @param array $propertyTypes Message property definitions
*/
public function __construct($properties, $propertyTypes = null)
{
$this->prop_types = self::$propertyDefinitions;
if (!empty($propertyTypes)) {
$this->prop_types = $propertyTypes;
}
if (!empty($properties)) {
$this->properties = array_intersect_key($properties, $this->prop_types);
}
}
/**
* Check whether a property exists in the 'properties' dictionary
* or if present - in the 'delivery_info' dictionary.
*
* @param string $name
* @return bool
*/
public function has($name)
{
return isset($this->properties[$name]) || isset($this->delivery_info[$name]);
}
/**
* Look for additional properties in the 'properties' dictionary,
* and if present - the 'delivery_info' dictionary.
*
* @param string $name
* @throws \OutOfBoundsException
* @return mixed|AMQPChannel
*/
public function get($name)
{
if (isset($this->properties[$name])) {
return $this->properties[$name];
}
if (isset($this->delivery_info[$name])) {
return $this->delivery_info[$name];
}
throw new \OutOfBoundsException(sprintf(
'No "%s" property',
$name
));
}
/**
* Returns the properties content
*
* @return array
*/
public function get_properties()
{
return $this->properties;
}
/**
* Sets a property value
*
* @param string $name The property name (one of the property definition)
* @param mixed $value The property value
* @throws \OutOfBoundsException
*/
public function set($name, $value)
{
if (!array_key_exists($name, $this->prop_types)) {
throw new \OutOfBoundsException(sprintf(
'No "%s" property',
$name
));
}
$this->properties[$name] = $value;
}
/**
* Given the raw bytes containing the property-flags and
* property-list from a content-frame-header, parse and insert
* into a dictionary stored in this object as an attribute named
* 'properties'.
*
* @param AMQPReader $reader
* NOTE: do not mutate $reader
* @return $this
*/
public function load_properties(AMQPReader $reader)
{
// Read 16-bit shorts until we get one with a low bit set to zero
$flags = array();
while (true) {
$flag_bits = $reader->read_short();
$flags[] = $flag_bits;
if (($flag_bits & 1) === 0) {
break;
}
}
$shift = 0;
$data = array();
foreach ($this->prop_types as $key => $proptype) {
if ($shift === 0) {
if (!$flags) {
break;
}
$flag_bits = array_shift($flags);
$shift = 15;
}
if ($flag_bits & (1 << $shift)) {
$data[$key] = $reader->{'read_' . $proptype}();
}
$shift -= 1;
}
$this->properties = $data;
return $this;
}
/**
* Serializes the 'properties' attribute (a dictionary) into the
* raw bytes making up a set of property flags and a property
* list, suitable for putting into a content frame header.
*
* @return string
* @todo Inject the AMQPWriter to make the method easier to test
*/
public function serialize_properties()
{
if (!empty($this->serialized_properties)) {
return $this->serialized_properties;
}
$shift = 15;
$flag_bits = 0;
$flags = array();
$raw_bytes = new AMQPWriter();
foreach ($this->prop_types as $key => $prototype) {
$val = isset($this->properties[$key]) ? $this->properties[$key] : null;
// Very important: PHP type eval is weak, use the === to test the
// value content. Zero or false value should not be removed
if ($val === null) {
$shift -= 1;
continue;
}
if ($shift === 0) {
$flags[] = $flag_bits;
$flag_bits = 0;
$shift = 15;
}
$flag_bits |= (1 << $shift);
if ($prototype != 'bit') {
$raw_bytes->{'write_' . $prototype}($val);
}
$shift -= 1;
}
$flags[] = $flag_bits;
$result = new AMQPWriter();
foreach ($flags as $flag_bits) {
$result->write_short($flag_bits);
}
$result->write($raw_bytes->getvalue());
$this->serialized_properties = $result->getvalue();
return $this->serialized_properties;
}
}
<?php
namespace PhpAmqpLib\Wire\IO;
abstract class AbstractIO
{
/**
* @param int $n
* @return mixed
*/
abstract public function read($n);
/**
* @param string $data
* @return mixed
*/
abstract public function write($data);
/**
* @return mixed
*/
abstract public function close();
/**
* @param int $sec
* @param int $usec
* @return mixed
*/
abstract public function select($sec, $usec);
/**
* @return mixed
*/
abstract public function connect();
/**
* @return mixed
*/
abstract public function reconnect();
/**
* @return mixed
*/
abstract public function getSocket();
}
<?php
namespace PhpAmqpLib\Wire\IO;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Helper\MiscHelper;
class SocketIO extends AbstractIO
{
/** @var string */
protected $host;
/** @var int */
protected $port;
/** @var float */
protected $timeout;
/** @var resource */
private $sock;
/** @var bool */
private $keepalive;
/**
* @param string $host
* @param int $port
* @param float $timeout
* @param bool $keepalive
*/
public function __construct($host, $port, $timeout, $keepalive = false)
{
$this->host = $host;
$this->port = $port;
$this->timeout = $timeout;
$this->keepalive = $keepalive;
}
/**
* Sets up the socket connection
*
* @throws \Exception
*/
public function connect()
{
$this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->timeout);
socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));
socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
if (!socket_connect($this->sock, $this->host, $this->port)) {
$errno = socket_last_error($this->sock);
$errstr = socket_strerror($errno);
throw new AMQPIOException(sprintf(
'Error Connecting to server (%s): %s',
$errno,
$errstr
), $errno);
}
socket_set_block($this->sock);
socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
if ($this->keepalive) {
$this->enable_keepalive();
}
}
/**
* @return resource
*/
public function getSocket()
{
return $this->sock;
}
/**
* Reconnects the socket
*/
public function reconnect()
{
$this->close();
$this->connect();
}
/**
* @param int $n
* @return mixed|string
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
public function read($n)
{
$res = '';
$read = 0;
$buf = socket_read($this->sock, $n);
while ($read < $n && $buf !== '' && $buf !== false) {
// Null sockets are invalid, throw exception
if (is_null($this->sock)) {
throw new AMQPRuntimeException(sprintf(
'Socket was null! Last SocketError was: %s',
socket_strerror(socket_last_error())
));
}
$read += mb_strlen($buf, 'ASCII');
$res .= $buf;
$buf = socket_read($this->sock, $n - $read);
}
if (mb_strlen($res, 'ASCII') != $n) {
throw new AMQPIOException(sprintf(
'Error reading data. Received %s instead of expected %s bytes',
mb_strlen($res, 'ASCII'),
$n
));
}
return $res;
}
/**
* @param string $data
* @return mixed|void
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
public function write($data)
{
$len = mb_strlen($data, 'ASCII');
while (true) {
// Null sockets are invalid, throw exception
if (is_null($this->sock)) {
throw new AMQPRuntimeException(sprintf(
'Socket was null! Last SocketError was: %s',
socket_strerror(socket_last_error())
));
}
$sent = socket_write($this->sock, $data, $len);
if ($sent === false) {
throw new AMQPIOException(sprintf(
'Error sending data. Last SocketError: %s',
socket_strerror(socket_last_error())
));
}
// Check if the entire message has been sent
if ($sent < $len) {
// If not sent the entire message.
// Get the part of the message that has not yet been sent as message
$data = mb_substr($data, $sent, mb_strlen($data, 'ASCII') - $sent, 'ASCII');
// Get the length of the not sent part
$len -= $sent;
} else {
break;
}
}
}
public function close()
{
if (is_resource($this->sock)) {
socket_close($this->sock);
}
$this->sock = null;
}
/**
* @param int $sec
* @param int $usec
* @return int|mixed
*/
public function select($sec, $usec)
{
$read = array($this->sock);
$write = null;
$except = null;
return socket_select($read, $write, $except, $sec, $usec);
}
/**
* @throws \PhpAmqpLib\Exception\AMQPIOException
*/
protected function enable_keepalive()
{
if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
}
socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
}
}
{
"name": "php-amqplib/php-amqplib",
"replace": {
"videlalvaro/php-amqplib": "self.version"
},
"type": "library",
"description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
"keywords": ["rabbitmq", "message", "queue"],
"homepage": "https://github.com/php-amqplib/php-amqplib/",
"authors": [
{
"name": "Alvaro Videla",
"role": "Original Maintainer"
},
{
"name": "John Kelly",
"email": "johnmkelly86@gmail.com",
"role": "Maintainer"
},
{
"name": "Raúl Araya",
"email": "nubeiro@gmail.com",
"role": "Maintainer"
}
],
"require": {
"php": ">=5.3.0",
"ext-bcmath": "*",
"ext-mbstring": "*"
},
"require-dev": {
"phpunit/phpunit": "^4.8",
"scrutinizer/ocular": "^1.1",
"squizlabs/php_codesniffer": "^2.5"
},
"suggest": {
"ext-sockets": "Use AMQPSocketConnection"
},
"autoload": {
"psr-4": {
"PhpAmqpLib\\": "PhpAmqpLib/"
}
},
"autoload-dev": {
"psr-4": {
"PhpAmqpLib\\Tests\\Functional\\": "tests/Functional",
"PhpAmqpLib\\Tests\\Unit\\": "tests/Unit"
}
},
"license": "LGPL-2.1",
"extra": {
"branch-alias": {
"dev-master": "2.7-dev"
}
}
}
/vendor
composer.lock
\ No newline at end of file
RabbitMQ Queue driver for Laravel
======================
[![Latest Stable Version](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/v/stable)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) [![Total Downloads](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/downloads)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) [![Latest Unstable Version](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/v/unstable)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq) [![License](https://poser.pugx.org/vladimir-yuldashev/laravel-queue-rabbitmq/license)](https://packagist.org/packages/vladimir-yuldashev/laravel-queue-rabbitmq)
####Installation
Require this package in your composer.json and run composer update (IMPORTANT! DO NOT USE "dev-master"):
"vladimir-yuldashev/laravel-queue-rabbitmq": "5.2"
After composer update is finished you need to add ServiceProvider to your `providers` array in `app.php`:
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
Add these lines to your `app/config/queue.php` file to `connections` array:
'rabbitmq' => [
'driver' => 'rabbitmq',
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'vhost' => env('RABBITMQ_VHOST', '/'),
'login' => env('RABBITMQ_LOGIN', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'queue' => env('RABBITMQ_QUEUE'), // name of the default queue,
'exchange_declare' => env('RABBITMQ_EXCHANGE_DECLARE', true), // create the exchange if not exists
'queue_declare_bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true), // create the queue if not exists and bind to the exchange
'queue_params' => [
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
],
'exchange_params' => [
'name' => env('RABBITMQ_EXCHANGE_NAME', null),
'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // more info at http://www.rabbitmq.com/tutorials/amqp-concepts.html
'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true), // the exchange will survive server restarts
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
],
],
And add these properties to `.env` with proper values:
QUEUE_DRIVER=rabbitmq
RABBITMQ_HOST=127.0.0.1
RABBITMQ_PORT=5672
RABBITMQ_VHOST=/
RABBITMQ_LOGIN=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_QUEUE=queue_name
You can also find full examples in src/examples folder.
####Usage
Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues
####PHPUnit
Unit tests will be provided soon.
####Contribution
You can contribute to this package by discovering bugs and opening issues. Enjoy!
####Supported versions of Laravel
4.0, 4.1, 4.2, 5.0, 5.1, 5.2
The version is being matched by the release tag of this library.
{
"name": "vladimir-yuldashev/laravel-queue-rabbitmq",
"version": "5.2",
"description": "RabbitMQ driver for Laravel Queue",
"license": "MIT",
"authors": [
{
"name": "Vladimir Yuldashev",
"email": "misterio92@gmail.com"
}
],
"require": {
"php": ">=5.5.9",
"illuminate/support": "5.2.*",
"illuminate/queue": "5.2.*",
"php-amqplib/php-amqplib": "2.6.*"
},
"autoload": {
"psr-0": {
"VladimirYuldashev\\LaravelQueueRabbitMQ": "src/"
}
},
"minimum-stability": "stable"
}
<?xml version="1.0" encoding="UTF-8"?>
<phpunit backupGlobals="false"
backupStaticAttributes="false"
bootstrap="vendor/autoload.php"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
syntaxCheck="false"
>
<testsuites>
<testsuite name="Package Test Suite">
<directory suffix=".php">./tests/</directory>
</testsuite>
</testsuites>
</phpunit>
\ No newline at end of file
<?php
namespace VladimirYuldashev\LaravelQueueRabbitMQ;
use Illuminate\Support\ServiceProvider;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;
class LaravelQueueRabbitMQServiceProvider extends ServiceProvider
{
/**
* Register the service provider.
*
* @return void
*/
public function register()
{
$this->mergeConfigFrom(
__DIR__.'/../../config/rabbitmq.php', 'queue.connections.rabbitmq'
);
}
/**
* Register the application's event listeners.
*
* @return void
*/
public function boot()
{
/**
* @var \Illuminate\Queue\QueueManager
*/
$manager = $this->app['queue'];
$manager->addConnector('rabbitmq', function () {
return new RabbitMQConnector;
});
}
}
<?php
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors;
use Illuminate\Queue\Connectors\ConnectorInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
class RabbitMQConnector implements ConnectorInterface
{
/**
* Establish a queue connection.
*
* @param array $config
*
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
// create connection with AMQP
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['login'], $config['password'],
$config['vhost']);
return new RabbitMQQueue(
$connection,
$config
);
}
}
<?php
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Queue\Jobs\Job;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue;
class RabbitMQJob extends Job implements JobContract
{
/**
* Same as RabbitMQQueue, used for attempt counts.
*/
const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count';
protected $connection;
protected $channel;
protected $queue;
protected $message;
/**
* Creates a new instance of RabbitMQJob.
*
* @param Illuminate\Container\Container $container
* @param VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue $connection
* @param PhpAmqpLib\Channel\AMQPChannel $channel
* @param string $queue
* @param PhpAmqpLib\Message\AMQPMessage $message
*/
public function __construct(
Container $container,
RabbitMQQueue $connection,
AMQPChannel $channel,
$queue,
AMQPMessage $message
) {
$this->container = $container;
$this->connection = $connection;
$this->channel = $channel;
$this->queue = $queue;
$this->message = $message;
}
/**
* Fire the job.
*
* @return void
*/
public function fire()
{
$this->resolveAndFire($this->getParsedBody());
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->message->body;
}
/**
* Retrieves the parsed body for the job.
*
* @return array|false
*/
public function getParsedBody()
{
return json_decode($this->getRawBody(), true);
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->channel->basic_ack($this->message->get('delivery_tag'));
}
/**
* Get the queue name.
*
* @return string
*/
public function getQueue()
{
return $this->queue;
}
/**
* Release the job back into the queue.
*
* @param int $delay
*
* @return void
*/
public function release($delay = 0)
{
$this->delete();
$this->setAttempts($this->attempts() + 1);
$body = $this->getParsedBody();
/*
* Some jobs don't have the command set, so fall back to just sending it the job name string
*/
if (isset($body['data']['command']) === true) {
$job = unserialize($body['data']['command']);
} else {
$job = $this->getName();
}
$data = $body['data'];
if ($delay > 0) {
$this->connection->later($delay, $job, $data, $this->getQueue());
} else {
$this->connection->push($job, $data, $this->getQueue());
}
}
/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
if ($this->message->has('application_headers') === true) {
$headers = $this->message->get('application_headers')->getNativeData();
if (isset($headers[self::ATTEMPT_COUNT_HEADERS_KEY]) === true) {
return $headers[self::ATTEMPT_COUNT_HEADERS_KEY];
}
}
return 0;
}
/**
* Sets the count of attempts at processing this job.
*
* @param int $count
*
* @return void
*/
private function setAttempts($count)
{
$this->connection->setAttempts($count);
}
/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return $this->message->get('correlation_id');
}
/**
* Sets the job identifier.
*
* @param string $id
*
* @return void
*/
public function setJobId($id)
{
$this->connection->setCorrelationId($id);
}
}
<?php
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue;
use DateTime;
use ErrorException;
use Exception;
use Log;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Queue;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;
class RabbitMQQueue extends Queue implements QueueContract
{
/**
* Used for retry logic, to set the retries on the message metadata instead of the message body.
*/
const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count';
protected $connection;
protected $channel;
protected $declareExchange;
protected $declaredExchanges = [];
protected $declareBindQueue;
protected $declaredQueues = [];
protected $defaultQueue;
protected $configQueue;
protected $configExchange;
protected $sleepOnError;
/**
* @var int
*/
private $attempts;
/**
* @var string
*/
private $correlationId;
/**
* @param AMQPStreamConnection $amqpConnection
* @param array $config
*/
public function __construct(AMQPStreamConnection $amqpConnection, $config)
{
$this->connection = $amqpConnection;
$this->defaultQueue = $config['queue'];
$this->configQueue = $config['queue_params'];
$this->configExchange = $config['exchange_params'];
$this->declareExchange = $config['exchange_declare'];
$this->declareBindQueue = $config['queue_declare_bind'];
$this->sleepOnError = isset($config['sleep_on_error']) ? $config['sleep_on_error'] : 5;
$this->channel = $this->getChannel();
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
*
* @return bool
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue, []);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
*
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
$queue = $this->getQueueName($queue);
try {
$this->declareQueue($queue);
if (isset($options['delay']) && $options['delay'] > 0) {
list($queue, $exchange) = $this->declareDelayedQueue($queue, $options['delay']);
} else {
list($queue, $exchange) = $this->declareQueue($queue);
}
$headers = [
'Content-Type' => 'application/json',
'delivery_mode' => 2,
];
if (isset($this->attempts) === true) {
$headers['application_headers'] = [self::ATTEMPT_COUNT_HEADERS_KEY => ['I', $this->attempts]];
}
// push job to a queue
$message = new AMQPMessage($payload, $headers);
$correlationId = $this->getCorrelationId();
$message->set('correlation_id', $correlationId);
// push task to a queue
$this->channel->basic_publish($message, $exchange, $queue);
return $correlationId;
} catch (ErrorException $exception) {
$this->reportConnectionError('pushRaw', $exception);
}
return null;
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
*
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue, ['delay' => $delay]);
}
/**
* Pop the next job off of the queue.
*
* @param string|null $queue
*
* @return \Illuminate\Queue\Jobs\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueueName($queue);
try {
// declare queue if not exists
$this->declareQueue($queue);
// get envelope
$message = $this->channel->basic_get($queue);
if ($message instanceof AMQPMessage) {
return new RabbitMQJob($this->container, $this, $this->channel, $queue, $message);
}
} catch (ErrorException $exception) {
$this->reportConnectionError('pop', $exception);
}
return null;
}
/**
* @param string $queue
*
* @return string
*/
private function getQueueName($queue)
{
return $queue ?: $this->defaultQueue;
}
/**
* @return AMQPChannel
*/
private function getChannel()
{
return $this->connection->channel();
}
/**
* @param $name
* @return array
*/
private function declareQueue($name)
{
$name = $this->getQueueName($name);
$exchange = $this->configExchange['name'] ?: $name;
if ($this->declareExchange && !in_array($exchange, $this->declaredExchanges)) {
$this->declaredExchanges[] = $exchange;
// declare exchange
$this->channel->exchange_declare(
$exchange,
$this->configExchange['type'],
$this->configExchange['passive'],
$this->configExchange['durable'],
$this->configExchange['auto_delete']
);
}
if ($this->declareBindQueue && !in_array($name, $this->declaredQueues)) {
$this->declaredQueues[] = $name;
// declare queue
$this->channel->queue_declare(
$name,
$this->configQueue['passive'],
$this->configQueue['durable'],
$this->configQueue['exclusive'],
$this->configQueue['auto_delete']
);
// bind queue to the exchange
$this->channel->queue_bind($name, $exchange, $name);
}
return [$name, $exchange];
}
/**
* @param string $destination
* @param DateTime|int $delay
*
* @return string
*/
private function declareDelayedQueue($destination, $delay)
{
$delay = $this->getSeconds($delay);
$destination = $this->getQueueName($destination);
$destinationExchange = $this->configExchange['name'] ?: $destination;
$name = $this->getQueueName($destination) . '_deferred_' . $delay;
$exchange = $this->configExchange['name'] ?: $destination;
// declare exchange
$this->channel->exchange_declare(
$exchange,
$this->configExchange['type'],
$this->configExchange['passive'],
$this->configExchange['durable'],
$this->configExchange['auto_delete']
);
// declare queue
$this->channel->queue_declare(
$name,
$this->configQueue['passive'],
$this->configQueue['durable'],
$this->configQueue['exclusive'],
$this->configQueue['auto_delete'],
false,
new AMQPTable([
'x-dead-letter-exchange' => $destinationExchange,
'x-dead-letter-routing-key' => $destination,
'x-message-ttl' => $delay * 1000,
])
);
// bind queue to the exchange
$this->channel->queue_bind($name, $exchange, $name);
return [$name, $exchange];
}
/**
* Sets the attempts member variable to be used in message generation.
*
* @param int $count
*
* @return void
*/
public function setAttempts($count)
{
$this->attempts = $count;
}
/**
* Sets the correlation id for a message to be published.
*
* @param string $id
*
* @return void
*/
public function setCorrelationId($id)
{
$this->correlationId = $id;
}
/**
* Retrieves the correlation id, or a unique id.
*
* @return string
*/
public function getCorrelationId()
{
return $this->correlationId ?: uniqid();
}
/**
* @param string $action
* @param Exception $e
*/
private function reportConnectionError($action, Exception $e)
{
Log::error('AMQP error while attempting ' . $action . ': ' . $e->getMessage());
// Sleep so that we don't flood the log file
sleep($this->sleepOnError);
}
}
<?php
/**
* default configuration for laravel-queue-rabbitmq merged with project config to base key 'queue'.
* @see \MapleSyrupGroup\AMQPEvents\Providers\AMQPEventServiceProvider
*/
return [
'driver' => 'rabbitmq',
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'vhost' => env('RABBITMQ_VHOST', '/'),
'login' => env('RABBITMQ_LOGIN', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'queue' => env('RABBITMQ_QUEUE'), // name of the default queue,
'exchange_declare' => env('RABBITMQ_EXCHANGE_DECLARE', true), // create the exchange if not exists
'queue_declare_bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true), // create the queue if not exists and bind to the exchange
'queue_params' => [
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
],
'exchange_params' => [
'name' => env('RABBITMQ_EXCHANGE_NAME', null),
'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // more info at http://www.rabbitmq.com/tutorials/amqp-concepts.html
'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true), // the exchange will survive server restarts
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
],
'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5), // the number of seconds to sleep if there's an error communicating with rabbitmq
];
<?php
return [
'providers' => append_config([
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
]),
];
<?php
return [
/*
|--------------------------------------------------------------------------
| Default Queue Driver
|--------------------------------------------------------------------------
|
| The Laravel queue API supports a variety of back-ends via an unified
| API, giving you convenient access to each back-end using the same
| syntax for each one. Here you may set the default queue driver.
|
| Supported: "null", "sync", "database", "beanstalkd",
| "sqs", "iron", "redis"
|
*/
'default' => env('QUEUE_DRIVER', 'sync'),
/*
|--------------------------------------------------------------------------
| Queue Connections
|--------------------------------------------------------------------------
|
| Here you may configure the connection information for each server that
| is used by your application. A default configuration has been added
| for each back-end shipped with Laravel. You are free to add more.
|
*/
'connections' => [
'sync' => [
'driver' => 'sync',
],
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'expire' => 60,
],
'beanstalkd' => [
'driver' => 'beanstalkd',
'host' => 'localhost',
'queue' => 'default',
'ttr' => 60,
],
'sqs' => [
'driver' => 'sqs',
'key' => 'your-public-key',
'secret' => 'your-secret-key',
'queue' => 'your-queue-url',
'region' => 'us-east-1',
],
'iron' => [
'driver' => 'iron',
'host' => 'mq-aws-us-east-1.iron.io',
'token' => 'your-token',
'project' => 'your-project-id',
'queue' => 'your-queue-name',
'encrypt' => true,
],
'redis' => [
'driver' => 'redis',
'queue' => 'default',
'expire' => 60,
],
'rabbitmq' => [
'driver' => 'rabbitmq',
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'vhost' => env('RABBITMQ_VHOST', '/'),
'login' => env('RABBITMQ_LOGIN', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'queue' => env('RABBITMQ_QUEUE'), // name of the default queue,
'exchange_declare' => env('RABBITMQ_EXCHANGE_DECLARE', true), // create the exchange if not exists
'queue_declare_bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true), // create the queue if not exists and bind to the exchange
'queue_params' => [
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
],
'exchange_params' => [
'name' => env('RABBITMQ_EXCHANGE_NAME', null),
'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // more info at http://www.rabbitmq.com/tutorials/amqp-concepts.html
'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true), // the exchange will survive server restarts
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
],
],
],
/*
|--------------------------------------------------------------------------
| Failed Queue Jobs
|--------------------------------------------------------------------------
|
| These options configure the behavior of failed queue job logging so you
| can control which database and table are used to store the jobs that
| have failed. You may change them to any database / table you wish.
|
*/
'failed' => [
'database' => 'mysql', 'table' => 'failed_jobs',
],
];
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment