ó
O'—^c           @@  s  d  Z  d d l m Z d d l Z d d l m Z d d l m Z d d l m	 Z	 d d l
 m Z m Z d d	 l m Z m Z m Z d d
 l m Z d d l m Z m Z d d l m Z m Z d d l m Z m Z d d d d g Z d e f d „  ƒ  YZ d e f d „  ƒ  YZ d S(   sC   
kombu.messaging
===============

Sending and receiving messages.

i    (   t   absolute_importN(   t   counti   (   t   maybe_declare(   t   compress(   t   maybe_channelt   is_connection(   t   Exchanget   Queuet   DELIVERY_MODES(   t   ContentDisallowed(   t   text_tt   values(   t   dumpst   prepare_accept_content(   t   ChannelPromiset
   maybe_listR   R   t   Producert   Consumerc           B@  s4  e  Z d  Z d Z d Z d Z d Z e Z	 d Z
 d Z d d d d d d d „ Z d „  Z d „  Z d „  Z d „  Z e d „ Z d d e e d d d d d d d e d g  d d	 „ Z d
 „  Z d „  Z d „  Z e e e ƒ Z d „  Z d „  Z d „  Z d „  Z e Z d d d d d d „ Z e d „  ƒ Z RS(   s  Message Producer.

    :param channel: Connection or channel.
    :keyword exchange: Optional default exchange.
    :keyword routing_key: Optional default routing key.
    :keyword serializer: Default serializer. Default is `"json"`.
    :keyword compression: Default compression method. Default is no
        compression.
    :keyword auto_declare: Automatically declare the default exchange
      at instantiation. Default is :const:`True`.
    :keyword on_return: Callback to call for undeliverable messages,
        when the `mandatory` or `immediate` arguments to
        :meth:`publish` is used. This callback needs the following
        signature: `(exception, exchange, routing_key, message)`.
        Note that the producer needs to drain events to use this feature.

    t    c         C@  s¼   | |  _  | |  _ | p |  j |  _ | p0 |  j |  _ | pB |  j |  _ | pT |  j |  _ d  |  _ |  j d  k r„ t d ƒ |  _ n  | d  k	 rœ | |  _	 n  |  j  r¸ |  j
 |  j  ƒ n  d  S(   NR   (   t   _channelt   exchanget   routing_keyt
   serializert   compressiont	   on_returnt   Nonet   _channel_promiseR   t   auto_declaret   revive(   t   selft   channelR   R   R   R   R   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   __init__E   s    				c         C@  s   d j  |  ƒ S(   Ns   <Producer: {0._channel}>(   t   format(   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   __repr__W   s    c         C@  s   |  j  |  j ƒ  f S(   N(   t	   __class__t   __reduce_args__(   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt
   __reduce__Z   s    c         C@  s%   d  |  j |  j |  j |  j |  j f S(   N(   R   R   R   R   R   R   (   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR#   ]   s    c         C@  s    |  j  j r |  j  j ƒ  n  d S(   s€   Declare the exchange.

        This happens automatically at instantiation if
        :attr:`auto_declare` is enabled.

        N(   R   t   namet   declare(   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR&   a   s    c         K@  s    | r t  | |  j | |  Sd S(   sT   Declare the exchange if it hasn't already been declared
        during this session.N(   R   R   (   R   t   entityt   retryt   retry_policy(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR   k   s    i    c         K@  s|  |
 d k r i  n |
 }
 | d k r* i  n | } | d k rE |  j n | } | d k r` |  j n | } | pr |  j } t | t ƒ rŸ | p | j } | j } n | p® |  j j } t | t j	 ƒ sÐ t
 | } n  | | d <| d k	 rt t | d ƒ ƒ | d <n  |  j | |	 | | | |
 ƒ \ } } } |  j } | rT|  j j |  | |  } n  | | | | | |
 | | | | | | ƒ S(   s÷  Publish message to the specified exchange.

        :param body: Message body.
        :keyword routing_key: Message routing key.
        :keyword delivery_mode: See :attr:`delivery_mode`.
        :keyword mandatory: Currently not supported.
        :keyword immediate: Currently not supported.
        :keyword priority: Message priority. A number between 0 and 9.
        :keyword content_type: Content type. Default is auto-detect.
        :keyword content_encoding: Content encoding. Default is auto-detect.
        :keyword serializer: Serializer to use. Default is auto-detect.
        :keyword compression: Compression method to use.  Default is none.
        :keyword headers: Mapping of arbitrary headers to pass along
          with the message body.
        :keyword exchange: Override the exchange.  Note that this exchange
          must have been declared.
        :keyword declare: Optional list of required entities that must
            have been declared before publishing the message.  The entities
            will be declared using :func:`~kombu.common.maybe_declare`.
        :keyword retry: Retry publishing, or declaring entities if the
            connection is lost.
        :keyword retry_policy: Retry configuration, this is the keywords
            supported by :meth:`~kombu.Connection.ensure`.
        :keyword expiration: A TTL in seconds can be specified per message.
            Default is no expiration.
        :keyword \*\*properties: Additional message properties, see AMQP spec.

        t   delivery_modeiè  t
   expirationN(   R   R   R   R   t
   isinstanceR   R*   R%   t   numberst   IntegralR   t   strt   intt   _preparet   _publisht
   connectiont   ensure(   R   t   bodyR   R*   t	   mandatoryt	   immediatet   priorityt   content_typet   content_encodingR   t   headersR   R   R(   R)   R&   R+   t
   propertiest   publish(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR=   q   s.    !
		c      
   C@  s{   |  j  } | j | | | | | | ƒ } | rV |  j } g  | D] } | | ƒ ^ q= n  | j | d |
 d | d | d |	 ƒS(   NR   R   R6   R7   (   R   t   prepare_messageR   t   basic_publish(   R   R5   R8   R9   R:   R;   R<   R   R6   R7   R   R&   R   t   messageR   R'   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR2   ®   s    			 c         C@  sb   |  j  } t | t ƒ r^ | ƒ  } |  _  |  j j | ƒ |  j r^ | j d j |  j ƒ q^ n  | S(   Nt   basic_return(   R   R,   R   R   R   R   t   eventst   add(   R   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   _get_channel¿   s    		c         C@  s   | |  _  d  S(   N(   R   (   R   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   _set_channelÈ   s    c         @  s»   t  | ƒ r3 | ‰  ˆ  |  _ t ‡  f d †  ƒ } n  t | t ƒ r` | |  _ |  j | ƒ |  _ nA | |  _ |  j r |  j j d j |  j ƒ n  |  j | ƒ |  _ |  j	 r· |  j
 ƒ  n  d S(   s*   Revive the producer after connection loss.c           @  s   ˆ  j  S(   N(   t   default_channel(    (   R3   (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   <lambda>Ñ   R   RA   N(   R   t   __connection__R   R,   R   R   R   RB   RC   R   R&   (   R   R   (    (   R3   s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR   Ì   s    					c         C@  s   |  S(   N(    (   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt	   __enter__à   s    c         G@  s   |  j  ƒ  d  S(   N(   t   release(   R   t   exc_info(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   __exit__ã   s    c         C@  s   d  S(   N(    (   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyRJ   æ   s    c         C@  s¡   | s3 | p |  j  } t | d | ƒ\ } } } n? t | t ƒ rc | sQ d } n  | j | ƒ } n | sr d } n  | r” t | | ƒ \ } | d <n  | | | f S(   NR   s   utf-8t   binaryR   (   R   R   R,   R
   t   encodeR   (   R   R5   R   R9   R:   R   R;   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR1   ê   s    		c         C@  s2   y |  j  p |  j j j SWn t k
 r- n Xd  S(   N(   RH   R   R3   t   clientt   AttributeError(   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR3     s    N(    t   __name__t
   __module__t   __doc__R   R   R   R   R   t   TrueR   R   RH   R   R!   R$   R#   R&   t   FalseR   R=   R2   RD   RE   t   propertyR   R   RI   RL   RJ   t   closeR1   R3   (    (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR      sB   					
		9								c        	   B@  sU  e  Z d  Z e Z d Z d Z d Z e Z	 d Z
 d Z d Z d Z e d ƒ Z d d d d d d d d d „ Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d	 „  Z d d
 „ Z d „  Z e Z d „  Z d „  Z d „  Z d „  Z d d e d „ Z  e d „ Z! d „  Z" d e e d „ Z# d d „ Z$ d „  Z% d „  Z& e' d „  ƒ Z( RS(   s_  Message consumer.

    :param channel: see :attr:`channel`.
    :param queues: see :attr:`queues`.
    :keyword no_ack: see :attr:`no_ack`.
    :keyword auto_declare: see :attr:`auto_declare`
    :keyword callbacks: see :attr:`callbacks`.
    :keyword on_message: See :attr:`on_message`
    :keyword on_decode_error: see :attr:`on_decode_error`.

    i   c
   
      C@  sé   | |  _  | d  k r$ |  j p' g  n | |  _ | d  k rB |  j n | |  _ | d  k rf |  j pi g  n | |  _ | |  _ |	 |  _ i  |  _ | d  k	 r¢ | |  _ n  | d  k	 rº | |  _	 n  t
 | ƒ |  _ |  j  rå |  j |  j  ƒ n  d  S(   N(   R   R   t   queuest   no_ackt	   callbackst
   on_messaget
   tag_prefixt   _active_tagsR   t   on_decode_errorR   t   acceptR   (
   R   R   RX   RY   R   RZ   R^   R[   R_   R\   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR   Z  s    	$					c         C@  s‰   |  j  j ƒ  t | ƒ } |  _ g  t |  j ƒ D] } | |  j ƒ ^ q0 |  _ x |  j D] } | j | ƒ qX W|  j r… |  j ƒ  n  d S(   s&   Revive consumer after connection loss.N(	   R]   t   clearR   R   R   RX   R   R   R&   (   R   R   t   queue(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR   n  s    +	c         C@  s"   x |  j  D] } | j ƒ  q
 Wd S(   sŽ   Declare queues, exchanges and bindings.

        This is done automatically at instantiation if :attr:`auto_declare`
        is set.

        N(   RX   R&   (   R   Ra   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR&   z  s    c         C@  s   |  j  j | ƒ d S(   s7  Register a new callback to be called when a message
        is received.

        The signature of the callback needs to accept two arguments:
        `(body, message)`, which is the decoded message body
        and the `Message` instance (a subclass of
        :class:`~kombu.transport.base.Message`.

        N(   RZ   t   append(   R   t   callback(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   register_callback„  s    
c         C@  s   |  j  ƒ  |  S(   N(   t   consume(   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyRI     s    
c         G@  s&   y |  j  ƒ  Wn t k
 r! n Xd  S(   N(   t   cancelt	   Exception(   R   RK   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyRL   ”  s    c         C@  s9   | |  j  ƒ } |  j r% | j ƒ  n  |  j j | ƒ | S(   s±   Add a queue to the list of queues to consume from.

        This will not start consuming from the queue,
        for that you will have to call :meth:`consume` after.

        (   R   R   R&   RX   Rb   (   R   Ra   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt	   add_queueš  s
    	c         K@  s   |  j  t j | |  ƒ S(   sv   This method is deprecated.

        Instead please use::

            consumer.add_queue(Queue.from_dict(d))

        (   Rh   R   t	   from_dict(   R   Ra   t   options(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   add_queue_from_dict§  s    c         C@  s‰   |  j  r… | d k r |  j n | } |  j  d  |  j  d } } x' | D] } |  j | d | d t ƒqF W|  j | d | d t ƒn  d S(   s.  Start consuming messages.

        Can be called multiple times, but note that while it
        will consume from new queues added since the last call,
        it will not cancel consuming from removed queues (
        use :meth:`cancel_by_queue`).

        :param no_ack: See :attr:`no_ack`.

        iÿÿÿÿRY   t   nowaitN(   RX   R   RY   t   _basic_consumeRT   RU   (   R   RY   t   Ht   TRa   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyRe   ±  s    	c         C@  sA   |  j  j } x! t |  j ƒ D] } | | ƒ q W|  j j ƒ  d S(   sº   End all active queue consumers.

        This does not affect already delivered messages, but it does
        mean the server will not send any more messages for this consumer.

        N(   R   t   basic_cancelR   R]   R`   (   R   Rf   t   tag(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyRf   Ä  s    c         C@  sm   y |  j  j | ƒ } Wn t k
 r) n@ Xg  |  j D] } | j | k r4 | ^ q4 |  j (|  j j | ƒ d S(   s   Cancel consumer by queue name.N(   R]   t   popt   KeyErrorRX   R%   R   Rp   (   R   Ra   Rq   t   q(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   cancel_by_queueÑ  s    /c         C@  s.   | } t  | t ƒ r! | j } n  | |  j k S(   sP   Return :const:`True` if the consumer is currently
        consuming from queue'.(   R,   R   R%   R]   (   R   Ra   R%   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   consuming_fromÛ  s    c         C@  s   t  d „  |  j Dƒ ƒ S(   sš   Purge messages from all queues.

        .. warning::
            This will *delete all ready messages*, there is no
            undo operation.

        c         s@  s   |  ] } | j  ƒ  Vq d  S(   N(   t   purge(   t   .0Ra   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pys	   <genexpr>ë  s    (   t   sumRX   (   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyRw   ã  s    c         C@  s   |  j  j | ƒ d S(   s“  Enable/disable flow from peer.

        This is a simple flow-control mechanism that a peer can use
        to avoid overflowing its queues or otherwise finding itself
        receiving more messages than it can process.

        The peer that receives a request to stop sending content
        will finish sending the current content (if any), and then wait
        until flow is reactivated.

        N(   R   t   flow(   R   t   active(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyRz   í  s    i    c         C@  s   |  j  j | | | ƒ S(   sÃ  Specify quality of service.

        The client can request that messages should be sent in
        advance so that when the client finishes processing a message,
        the following message is already held locally, rather than needing
        to be sent down the channel. Prefetching gives a performance
        improvement.

        The prefetch window is Ignored if the :attr:`no_ack` option is set.

        :param prefetch_size: Specify the prefetch window in octets.
          The server will send a message in advance if it is equal to
          or smaller in size than the available prefetch size (and
          also falls within other prefetch limits). May be set to zero,
          meaning "no specific limit", although other prefetch limits
          may still apply.

        :param prefetch_count: Specify the prefetch window in terms of
          whole messages.

        :param apply_global: Apply new settings globally on all channels.

        (   R   t	   basic_qos(   R   t   prefetch_sizet   prefetch_countt   apply_global(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   qosû  s    c         C@  s   |  j  j d | ƒ S(   s™  Redeliver unacknowledged messages.

        Asks the broker to redeliver all unacknowledged messages
        on the specified channel.

        :keyword requeue: By default the messages will be redelivered
          to the original recipient. With `requeue` set to true, the
          server will attempt to requeue the message, potentially then
          delivering it to an alternative subscriber.

        t   requeue(   R   t   basic_recover(   R   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   recover  s    c         C@  sB   |  j  } | s t d ƒ ‚ n  g  | D] } | | | ƒ ^ q% d S(   s0  Method called when a message is received.

        This dispatches to the registered :attr:`callbacks`.

        :param body: The decoded message body.
        :param message: The `Message` instance.

        :raises NotImplementedError: If no consumer callbacks have been
          registered.

        s$   Consumer does not have any callbacksN(   RZ   t   NotImplementedError(   R   R5   R@   RZ   Rc   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   receive%  s    	c         C@  sY   |  j  j | j ƒ } | d  k rU |  j | | ƒ } | j | |  j d | d | ƒn  | S(   NRY   Rl   (   R]   t   getR%   R   t   _add_tagRe   t   _receive_callback(   R   Ra   t   consumer_tagRY   Rl   Rq   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyRm   6  s    c         C@  s8   | p! d j  |  j t |  j ƒ ƒ } | |  j | j <| S(   Ns   {0}{1}(   R    R\   t   nextt   _tagsR]   R%   (   R   Ra   R‰   Rq   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR‡   ?  s    c         C@  sð   |  j  } |  j |  j d  } } } yt t | d d  ƒ } | rM | | ƒ } n  | d  k	 re | | _  n  | j r~ | j |  j ƒ S| rŠ d  n	 | j ƒ  } Wn2 t	 k
 rË } |  j s¸ ‚  n  |  j | | ƒ n! X| rÜ | | ƒ S|  j
 | | ƒ Sd  S(   Nt   message_to_python(   R_   R[   R   R   t   getattrt   errorst   _reraise_errorR^   t   decodeRg   R…   (   R   R@   R_   t   on_mR   t   decodedt   m2pt   exc(    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyRˆ   E  s     			c         C@  s   d j  |  ƒ S(   Ns   <Consumer: {0.queues}>(   R    (   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR!   X  s    c         C@  s)   y |  j  j j SWn t k
 r$ n Xd  S(   N(   R   R3   RO   RP   (   R   (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR3   [  s    N()   RQ   RR   RS   R	   R   R   RX   RY   RT   R   RZ   R[   R^   R_   R   R‹   R   R   R&   Rd   RI   RL   Rh   Rk   Re   Rf   RW   Ru   Rv   Rw   Rz   RU   R€   Rƒ   R…   Rm   R‡   Rˆ   R!   RV   R3   (    (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyR     sJ   					
					
		
		
				(    RS   t
   __future__R    R-   t	   itertoolsR   t   commonR   R   R   R3   R   R   R'   R   R   R   t
   exceptionsR	   t   fiveR
   R   t   serializationR   R   t   utilsR   R   t   __all__t   objectR   R   (    (    (    s1   /tmp/pip-unpacked-wheel-UAnTfW/kombu/messaging.pyt   <module>   s   ò