[![Build Status](https://travis-ci.org/inway/mojo-rabbitmq-client.svg?branch=master)](https://travis-ci.org/inway/mojo-rabbitmq-client) [![MetaCPAN Release](https://badge.fury.io/pl/Mojo-RabbitMQ-Client.svg)](https://metacpan.org/release/Mojo-RabbitMQ-Client)
# NAME

Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client

# SYNOPSIS

    use Mojo::RabbitMQ::Client;
    use feature 'say';

    # Supply URL according to (https://www.rabbitmq.com/uri-spec.html)
    my $client = Mojo::RabbitMQ::Client->new(
      url => 'amqp://guest:guest@127.0.0.1:5672/');

    # Catch all client related errors
    $client->catch(sub { warn "Some error caught in client"; });

    # When connection is in Open state, open new channel
    $client->on(
      open => sub {
        my ($client) = @_;

        # Create a new channel with auto-assigned id
        my $channel = Mojo::RabbitMQ::Client::Channel->new();

        $channel->catch(sub { warn "Error on channel received"; });

        $channel->on(
          open => sub {
            my ($channel) = @_;
            $channel->qos(prefetch_count => 1)->deliver;

            # Publish some example message to test_queue
            my $publish = $channel->publish(
              exchange    => 'test',
              routing_key => 'test_queue',
              body        => 'Test message',
              mandatory   => 0,
              immediate   => 0,
              header      => {}
            );
            # Deliver this message to server
            $publish->deliver;

            # Start consuming messages from test_queue
            my $consumer = $channel->consume(queue => 'test_queue');
            $consumer->on(message => sub { say "Got a message" });
            $consumer->deliver;
          }
        );
        $channel->on(close => sub { warn "Channel closed"; });

        $client->open_channel($channel);
      }
    );

    # Start connection
    $client->connect();

    # Start Mojo::IOLoop if not running already
    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

## CONSUMER

    use Mojo::RabbitMQ::Client;
    use feature 'say';
    my $consumer = Mojo::RabbitMQ::Client->consumer(
      url      => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo',
      defaults => {
        qos      => {prefetch_count => 1},
        queue    => {durable        => 1},
        consumer => {no_ack         => 0},
      }
    );

    $consumer->catch(sub { die "Some error caught in Consumer" } );
    $consumer->on('success' => sub { say "Consumer ready" });
    $consumer->on(
      'message' => sub {
        my ($consumer, $message) = @_;

        $consumer->channel->ack($message)->deliver;
      }
    );
    $consumer->start();

    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

## PUBLISHER

    use Mojo::RabbitMQ::Client;
    use feature 'say';
    my $publisher = Mojo::RabbitMQ::Client->publisher(
      url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo'
    );

    $publisher->catch(sub { die "Some error caught in Publisher" } );
    $publisher->on('success' => sub { say "Publisher ready" });

    $publisher->publish('plain text');
    $publisher->publish({encode => { to => 'json'}});

    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

# DESCRIPTION

[Mojo::RabbitMQ::Client](https://metacpan.org/pod/Mojo::RabbitMQ::Client) is a rewrite of [AnyEvent::RabbitMQ](https://metacpan.org/pod/AnyEvent::RabbitMQ) to work on top of [Mojo::IOLoop](https://metacpan.org/pod/Mojo::IOLoop).

# EVENTS

[Mojo::RabbitMQ::Client](https://metacpan.org/pod/Mojo::RabbitMQ::Client) inherits all events from [Mojo::EventEmitter](https://metacpan.org/pod/Mojo::EventEmitter) and can emit the
following new ones.

## connect

    $client->on(connect => sub {
      my ($client, $stream) = @_;
      ...
    });

Emitted when the TCP/IP connection with the RabbitMQ server is established.

## open

    $client->on(open => sub {
      my ($client) = @_;
      ...
    });

Emitted when the AMQP protocol Connection.Open-Ok method is received.

## close

    $client->on(close => sub {
      my ($client) = @_;
      ...
    });

Emitted on reception of Connection.Close-Ok method.

## disconnect

    $client->on(disconnect => sub {
      my ($client) = @_;
      ...
    });

Emitted when the TCP/IP connection is disconnected.

# ATTRIBUTES

[Mojo::RabbitMQ::Client](https://metacpan.org/pod/Mojo::RabbitMQ::Client) has the following attributes.

## tls

    my $tls = $client->tls;
    $client = $client->tls(1);

Forces a secure connection. Disabled by default (`0`).

## user

    my $user = $client->user;
    $client  = $client->user('guest');

Sets the username for authorization. Not defined by default.

## pass

    my $pass = $client->pass;
    $client  = $client->pass('secret');

Sets the user password for authorization. Not defined by default.

## host

    my $host = $client->host;
    $client  = $client->host('localhost');

Hostname or IP address of RabbitMQ server. Defaults to `localhost`.

## port

    my $port = $client->port;
    $client  = $client->port(1234);

Port on which RabbitMQ server listens for new connections.
Defaults to `5672`, which is standard RabbitMQ server listen port.

## vhost

    my $vhost = $client->vhost;
    $client   = $client->vhost('/');

RabbitMQ virtual host to use. Default is `/`.

## params

    my $params = $client->params;
    $client    = $client->params(Mojo::Parameters->new('verify=1'));

Sets additional parameters for the connection. Not defined by default.

For the list of supported parameters, see ["SUPPORTED QUERY PARAMETERS"](#supported-query-parameters).

## url

    my $url = $client->url;
    $client = $client->url('amqp://...');

Sets all connection parameters in one string, according to specification from
[https://www.rabbitmq.com/uri-spec.html](https://www.rabbitmq.com/uri-spec.html).

    amqp_URI       = "amqp[s]://" amqp_authority [ "/" vhost ] [ "?" query ]

    amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ]

    amqp_userinfo  = username [ ":" password ]

    username       = *( unreserved / pct-encoded / sub-delims )

    password       = *( unreserved / pct-encoded / sub-delims )

    vhost          = segment
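
To make the grammar concrete, here is a minimal sketch that splits a URL into the components it names, using only core Perl. The `parse_amqp_uri` helper and the keys of the hash it returns are hypothetical and are not part of this module, which does its own URL handling. The sketch leaves an absent or empty vhost as-is rather than guessing a default.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Mojo::RabbitMQ::Client): split an AMQP URI
# into the components named in the grammar above.
sub parse_amqp_uri {
  my ($uri) = @_;

  my ($scheme, $userinfo, $host, $port, $vhost, $query) = $uri =~ m{
    \A (amqps?) ://            # scheme: amqp or amqps
    (?: ([^\@/?]*) \@ )?       # optional amqp_userinfo
    ( \[[^\]]+\] | [^:/?]* )   # host: bracketed IPv6 literal, name or IPv4
    (?: : (\d*) )?             # optional port
    (?: / ([^/?]*) )?          # optional vhost, a single path segment
    (?: [?] (.*) )?            # optional query string
    \z
  }xi or return;

  # Split userinfo on the first colon only, so passwords may contain colons
  my ($user, $pass) = defined $userinfo ? split(/:/, $userinfo, 2) : ();

  # Decode pct-encoded octets; undefined values stay undefined
  my $decode = sub {
    my ($s) = @_;
    return undef unless defined $s;
    $s =~ s/%([0-9A-Fa-f]{2})/chr hex $1/ge;
    return $s;
  };

  return {
    tls   => (lc($scheme) eq 'amqps' ? 1 : 0),
    user  => $decode->($user),
    pass  => $decode->($pass),
    host  => $host,
    port  => (defined $port && length $port ? $port + 0 : undef),
    vhost => $decode->($vhost),
    query => $query,
  };
}

my $parts = parse_amqp_uri('amqp://guest:guest@127.0.0.1:5672/');
printf "%s:%d (tls=%d)\n", $parts->{host}, $parts->{port}, $parts->{tls};
# prints "127.0.0.1:5672 (tls=0)"
```

Note that a vhost of `/` has to be written as `%2f` in the URL, because the vhost is a single path segment.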

## heartbeat\_timeout

    my $timeout = $client->heartbeat_timeout;
    $client     = $client->heartbeat_timeout(180);

Heartbeats are used to monitor peer reachability in AMQP.
The default value is `60` seconds; if set to `0`, no heartbeats will be sent.

## connect\_timeout

    my $timeout = $client->connect_timeout;
    $client     = $client->connect_timeout(5);

Connection timeout used by [Mojo::IOLoop::Client](https://metacpan.org/pod/Mojo::IOLoop::Client).
Defaults to environment variable `MOJO_CONNECT_TIMEOUT` or `10` seconds
if nothing else is set.
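
The fallback order described above can be sketched in core Perl as follows. The `effective_connect_timeout` helper is hypothetical and only illustrates the precedence (explicit value, then `MOJO_CONNECT_TIMEOUT`, then `10`); it is not how the attribute is necessarily implemented.

```perl
use strict;
use warnings;

# Hypothetical helper illustrating the documented precedence:
# explicit value, then MOJO_CONNECT_TIMEOUT, then 10 seconds.
sub effective_connect_timeout {
  my ($explicit) = @_;
  return $explicit if defined $explicit;
  return $ENV{MOJO_CONNECT_TIMEOUT} || 10;
}

print effective_connect_timeout(5), "\n";    # explicit value wins
```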

## max\_channels

    my $max_channels = $client->max_channels;
    $client          = $client->max_channels(10);

Maximum number of channels allowed to be active. Defaults to `0` which
means no implicit limit.

When `add_channel` is called after the limit has been reached, an `error`
event is emitted on the channel with the message: _Maximum number of channels reached_.

# STATIC METHODS

## consumer

    my $client = Mojo::RabbitMQ::Client->consumer(...);

Shortcut for creating [Mojo::RabbitMQ::Client::Consumer](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Consumer).

## publisher

    my $client = Mojo::RabbitMQ::Client->publisher(...);

Shortcut for creating [Mojo::RabbitMQ::Client::Publisher](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Publisher).

# METHODS

[Mojo::RabbitMQ::Client](https://metacpan.org/pod/Mojo::RabbitMQ::Client) inherits all methods from [Mojo::EventEmitter](https://metacpan.org/pod/Mojo::EventEmitter) and implements
the following new ones.

## connect

    $client->connect();

Tries to connect to RabbitMQ server and negotiate AMQP protocol.

## close

    $client->close();

## param

    my $param = $client->param('name');
    $client   = $client->param(name => 'value');

## add\_channel

    my $channel = Mojo::RabbitMQ::Client::Channel->new();
    ...
    $channel    = $client->add_channel($channel);
    $channel->open;

## open\_channel

    my $channel = Mojo::RabbitMQ::Client::Channel->new();
    ...
    $client->open_channel($channel);

## delete\_channel

    my $removed = $client->delete_channel($channel->id);

# SUPPORTED QUERY PARAMETERS

There's no formal specification; nevertheless, a list of common parameters
recognized by officially supported RabbitMQ clients is maintained here:
[https://www.rabbitmq.com/uri-query-parameters.html](https://www.rabbitmq.com/uri-query-parameters.html).

Some shortcuts (aliases) are also supported; you'll find them in parentheses.

Aliases have lower precedence, so when both an alias and its primary
parameter are specified, only the primary value is used.
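
The precedence rule can be illustrated with a small core-Perl sketch. The `normalize_params` helper and the `%ALIAS_OF` table are hypothetical; they map the aliases documented in this section to their primary names and are not the module's actual implementation.

```perl
use strict;
use warnings;

# Alias => primary parameter name, as documented in this section
my %ALIAS_OF = (
  ca      => 'cacertfile',
  cert    => 'certfile',
  key     => 'keyfile',
  verify  => 'fail_if_no_peer_cert',
  timeout => 'connection_timeout',
);

# Hypothetical helper: fold aliases into primary names, letting the
# primary value win whenever both forms are supplied.
sub normalize_params {
  my (%given) = @_;
  my %out;
  for my $name (keys %given) {
    my $primary = $ALIAS_OF{$name} // $name;

    # Skip an alias when its primary parameter was given explicitly
    next if $primary ne $name && exists $given{$primary};
    $out{$primary} = $given{$name};
  }
  return \%out;
}

my $params = normalize_params(
  ca         => '/etc/ssl/alias-ca.pem',
  cacertfile => '/etc/ssl/primary-ca.pem',
  timeout    => 5,
);
print "$_=$params->{$_}\n" for sort keys %$params;
```

Here `cacertfile` keeps the primary value and `timeout` is reported under `connection_timeout`.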

## cacertfile (_ca_)

Path to Certificate Authority file for TLS.

## certfile (_cert_)

Path to the client certificate file for TLS.

## keyfile (_key_)

Path to the client certificate private key file for TLS.

## fail\_if\_no\_peer\_cert (_verify_)

TLS verification mode, defaults to 0x01 on the client-side if a certificate
authority file has been provided, or 0x00 otherwise.

## auth\_mechanism

Currently only AMQPLAIN is supported, **so this parameter is ignored**.

## heartbeat

Sets the requested heartbeat timeout, just like the `heartbeat_timeout` attribute.

## connection\_timeout (_timeout_)

Sets the connection timeout; see the [connect\_timeout](#connect_timeout) attribute.

## channel\_max

Sets the maximum number of channels; see the [max\_channels](#max_channels) attribute.

# SEE ALSO

[Mojo::RabbitMQ::Client::Channel](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Channel), [Mojo::RabbitMQ::Client::Consumer](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Consumer), [Mojo::RabbitMQ::Client::Publisher](https://metacpan.org/pod/Mojo::RabbitMQ::Client::Publisher)

# COPYRIGHT AND LICENSE

Copyright (C) 2015-2017, Sebastian Podjasek and others

Based on [AnyEvent::RabbitMQ](https://metacpan.org/pod/AnyEvent::RabbitMQ) - Copyright (C) 2010 Masahito Ikuta, maintained by `bobtfish@bobtfish.net`

This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.