
O'^c           @@  s  d  Z  d d l m Z d d l Z d d l Z d d l Z d d l m Z m Z d d l	 Z	 d d l	 m
 Z
 d d l	 m Z d d l	 m Z d d l m Z d d	 l m Z d d
 l m Z d d l m Z d d l m Z m Z m Z d d l m Z d d l m Z m Z d d l  m! Z! m" Z" d d l# m$ Z$ d d l% m& Z& e e'  Z( e) d   e j* D  Z+ d e+ d <d   Z, e- d   e	 j. j/ d  D  Z0 e0 d d f k Z1 d Z2 d e f d     YZ3 d e& j4 f d     YZ4 d  e& j5 f d!     YZ5 d S("   sq  
kombu.transport.SQS
===================

Amazon SQS transport module for Kombu. This package implements an AMQP-like
interface on top of Amazons SQS service, with the goal of being optimized for
high performance and reliability.

The default settings for this module are focused now on high performance in
task queue situations where tasks are small, idempotent and run very fast.

SQS Features supported by this transport:
  Long Polling:
    http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/
      sqs-long-polling.html

    Long polling is enabled by setting the `wait_time_seconds` transport
    option to a number > 1. Amazon supports up to 20 seconds. This is
    disabled for now, but will be enabled by default in the near future.

  Batch API Actions:
   http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/
     sqs-batch-api.html

    The default behavior of the SQS Channel.drain_events() method is to
    request up to the 'prefetch_count' messages on every request to SQS.
    These messages are stored locally in a deque object and passed back
    to the Transport until the deque is empty, before triggering a new
    API call to Amazon.

    This behavior dramatically speeds up the rate that you can pull tasks
    from SQS when you have short-running tasks (or a large number of workers).

    When a Celery worker has multiple queues to monitor, it will pull down
    up to 'prefetch_count' messages from queueA and work on them all before
    moving on to queueB. If queueB is empty, it will wait up until
    'polling_interval' expires before moving back and checking on queueA.
i    (   t   absolute_importN(   t   loadst   dumps(   t	   exception(   t   sdb(   t   sqs(   t   Domain(   t   SDBConnection(   t   SQSConnection(   t   Message(   t   Emptyt   ranget   text_t(   t
   get_logger(   t   cached_propertyt   uuid(   t   bytes_to_strt   safe_str(   t
   schedulingi   (   t   virtualc         c@  s-   |  ]# } | d  k r t  |  d f Vq d S(   s   -_.i_   N(   t   ord(   t   .0t   c(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pys	   <genexpr>E   s   i-   i.   c         C@  s'   y t  |   SWn t k
 r" |  SXd  S(   N(   t   intt
   ValueError(   t   x(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt	   maybe_intJ   s    c         c@  s   |  ] } t  |  Vq d  S(   N(   R   (   R   t   part(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pys	   <genexpr>O   s    t   .i   i   i
   t   Tablec           B@  s   e  Z d  Z e   Z d   Z d   Z d   Z d   Z d   Z	 d   Z
 d   Z d d e d d	  Z d d
  Z d   Z d   Z d   Z RS(   s<   Amazon SimpleDB domain describing the message routing table.c         C@  s   |  j  d |  S(   s+   Iterator giving all routes for an exchange.s   WHERE exchange = '%s'(   t   select(   t   selft   exchange(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt
   routes_for[   s    c         C@  s&   |  j  |  } | r" |  j |  Sd S(   s   Get binding for queue.N(   t   _get_queue_idt   get_item(   R   t   queuet   qid(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt	   get_queue_   s    c         C@  s?   |  j  |  } | r# | | d f St   } |  j |  | f S(   sT   Get binding item for queue.

        Creates the item if it doesn't exist.

        t   id(   R&   R   t   new_item(   R   R$   t   itemR'   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   create_bindinge   s
    	c         C@  s   | |  j  k r{ |  j |  \ } } | j d | d | p< d d | pH d d | pT d d |  | j   |  j  j |  n  d  S(   NR    t   routing_keyt    t   patternR$   R'   (   t   _already_boundR*   t   updatet   savet   add(   R   R    R+   R-   R$   t   bindingR'   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt
   queue_bindq   s    
c         C@  s9   |  j  j |  |  j |  } | r5 |  j |  n  d S(   s   delete queue by name.N(   R.   t   discardt   _get_queue_itemt   delete_item(   R   R$   R)   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   queue_delete|   s    c         C@  s/   x( |  j  |  D] } |  j | d  q Wd S(   s!   Delete all routes for `exchange`.R'   N(   R!   R6   (   R   R    R)   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   exchange_delete   s    c         C@  s:   x3 t  t f D]% } t j |  | |  } | r | Sq Wd S(   s"   Uses `consistent_read` by default.N(   t   Falset   TrueR   R#   (   R   t	   item_namet   consistent_readR)   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR#      s    R,   c         C@  s,   d |  j  | f } t j |  | | | |  S(   s"   Uses `consistent_read` by default.s   SELECT * FROM `%s` %s(   t   nameR   R   (   R   t   queryt
   next_tokenR<   t	   max_items(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR      s    c         K@  s?   x8 t  t f D]* } x! |  j | d | | D] } | SWq Wd  S(   NR<   (   R9   R:   R   (   R   R>   t   kwargsR   R)   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt
   _try_first   s    c         C@  s    t  t d   |  j   D   S(   Nc         s@  s   |  ] } | d  Vq d S(   R    N(    (   R   t   i(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pys	   <genexpr>   s    (   t   listt   setR   (   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   get_exchanges   s    c         C@  s   |  j  d |  S(   Ns   WHERE queue = '%s' limit 1(   RB   (   R   R$   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR5      s    c         C@  s!   |  j  |  } | r | d Sd  S(   NR'   (   R5   (   R   R$   R)   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR"      s    N(   t   __name__t
   __module__t   __doc__RE   R.   R!   R&   R*   R3   R7   R8   R#   t   NoneR:   R   RB   RF   R5   R"   (    (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR   V   s   										t   Channelc           B@  s  e  Z e Z d  Z d Z d Z d Z d. Z d. Z	 i  Z
 e   Z d   Z d   Z d   Z d. d  Z d   Z e d	  Z d
   Z d. d d. d  Z d   Z d   Z d   Z d   Z d   Z d   Z d   Z d   Z d d  Z d   Z d   Z  e! d  Z" d   Z# d/ d  Z$ d   Z% d   Z& d    Z' d!   Z( d"   Z) d#   Z* e+ d$    Z, e+ d%    Z- e+ d&    Z. e+ d'    Z/ e+ d(    Z0 e1 d)    Z2 e1 d*    Z3 e1 d+    Z4 e1 d,    Z5 e1 d-    Z6 RS(0   s	   us-east-1i  i    s   kombu%(vhost)sc         O@  sq   t  t |   j | |   |  j j d |  j  } x | D] } | |  j | j <q8 Wt   |  _	 t
 j   |  _ d  S(   Nt   prefix(   t   superRK   t   __init__R   t   get_all_queuest   queue_name_prefixt   _queue_cacheR=   RE   t   _fanout_queuest   collectionst   dequet   _queue_message_cache(   R   t   argsRA   t   queuesR$   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyRN      s    c         O@  s8   | r |  j  j |  n  t t |   j | | | |  S(   N(   t   _noack_queuesR1   RM   RK   t   basic_consume(   R   R$   t   no_ackRV   RA   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyRY      s    c         C@  sE   | |  j  k r/ |  j | } |  j j |  n  t t |   j |  S(   N(   t
   _consumerst   _tag_to_queueRX   R4   RM   RK   t   basic_cancel(   R   t   consumer_tagR$   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR]      s    c         @  s   |  j  s |  j j   r& t    n  |  j } y | j   SWn t k
 rP n X|  j |  j d | \ }   | j	   f d   | D  y | j   SWn t k
 r t    n Xd S(   ss   Return a single payload message from one of our queues.

        :raises Empty: if no messages available.

        t   timeoutc         3@  s   |  ] } |   f Vq d  S(   N(    (   R   t   r(   R$   (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pys	   <genexpr>   s    N(
   R[   t   qost   can_consumeR
   RU   t   popleftt
   IndexErrort   _pollt   cyclet   extend(   R   R_   t   message_cachet   res(    (   R$   s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   drain_events   s    	c         C@  s"   t  j |  j |  j t  |  _ d S(   s  Reset the consume cycle.

        :returns: a FairCycle object that points to our _get_bulk() method
          rather than the standard _get() method. This allows for multiple
          messages to be returned at once from SQS (based on the prefetch
          limit).

        N(   R   t	   FairCyclet	   _get_bulkt   _active_queuesR
   t   _cycle(   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _reset_cycle   s    	c         C@  s   t  t |   j |  S(   s3   Format AMQP queue name into a legal SQS queue name.(   R   R   t	   translate(   R   R=   t   table(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   entity_name   s    c         K@  sa   |  j  |  j |  } y |  j | SWn5 t k
 r\ |  j j | |  j  } |  j | <| SXd S(   s-   Ensure a queue with given name exists in SQS.N(   Rr   RP   RQ   t   KeyErrorR   t   create_queuet   visibility_timeout(   R   R$   RA   t   q(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt
   _new_queue   s    	R,   c         K@  sQ   t  t |   j | | | | |  |  j |  j d k rM |  j j |  n  d  S(   Nt   fanout(   RM   RK   R3   t   typeoft   typeRR   R1   (   R   R$   R    R+   t	   argumentsRA   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR3     s    
c         G@  s    |  j  r |  j j |   n  d S(   sn   Bind ``queue`` to ``exchange`` with routing key.

        Route will be stored in SDB if so enabled.

        N(   t   supports_fanoutRq   R3   (   R   RV   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _queue_bind  s    	c         C@  sW   |  j  rA g  |  j j |  D]! } | d | d | d f ^ q St t |   j |  S(   sT   Get routing table.

        Retrieved from SDB if :attr:`supports_fanout`.

        R+   R-   R$   (   R|   Rq   R!   RM   RK   t	   get_table(   R   R    R`   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR~     s    	5c         C@  s)   |  j  r |  j j   St t |   j   S(   N(   R|   Rq   RF   RM   RK   (   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyRF   (  s    	c         G@  sI   |  j  r |  j j |  n  t t |   j |  |  j j | d  d S(   s   delete queue by name.N(	   R|   Rq   R7   RM   RK   t   _deleteRQ   t   popRJ   (   R   R$   RV   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR   -  s    	c         K@  s9   |  j  r |  j j |  n  t t |   j | |  d S(   s   Delete exchange by name.N(   R|   Rq   R8   RM   RK   (   R   R    RA   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR8   4  s    	c         K@  s5   |  j  r t |  j j |   St t |   j |  S(   s1   Return True if ``queue`` was previously declared.(   R|   t   boolRq   R&   RM   RK   t
   _has_queue(   R   R$   RA   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR   :  s    	c         K@  s<   |  j  |  } t   } | j t |   | j |  d S(   s   Put message onto queue.N(   Rw   R	   t   set_bodyR   t   write(   R   R$   t   messageRA   Rv   t   m(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _put@  s    	c         K@  s8   x1 |  j  j |  D] } |  j | d | |  q Wd S(   s5   Deliver fanout message to all queues in ``exchange``.R$   N(   Rq   R!   R   (   R   R    R   R+   RA   t   route(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _put_fanoutG  s    i   c         C@  sK   |  j  |  } t r: | |  j k r: | j | d |  j S| j |  Sd S(   s|   Retrieve messages from SQS and returns the raw SQS message objects.

        :returns: List of SQS message objects

        t   wait_time_secondsN(   Rw   t   W_LONG_POLLINGRR   t   get_messagesR   (   R   R$   t   countRv   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _get_from_sqsL  s
    c         C@  s^   t  t | j     } | |  j k r7 | j |  n# | d d j i | d 6| d 6 | S(   Nt
   propertiest   delivery_infot   sqs_messaget	   sqs_queue(   R   R   t   get_bodyRX   t   delete_messageR/   (   R   R   t
   queue_nameR$   t   payload(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _message_to_pythonZ  s    c         C@  s5   |  j  |  } g  | D] } |  j | | |  ^ q S(   s  Convert a list of SQS Message objects into Payloads.

        This method handles converting SQS Message objects into
        Payloads, and appropriately updating the queue depending on
        the 'ack' settings for that queue.

        :param messages: A list of SQS Message objects.
        :param queue: String name representing the queue they came from

        :returns: A list of Payload objects

        (   Rw   R   (   R   t   messagesR$   Rv   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _messages_to_pythond  s    c         C@  sz   |  j  j   } | d k r! | n t | d  } | rm |  j | d t | t  } | rm |  j | |  Sn  t    d S(   s+  Try to retrieve multiple messages off ``queue``.

        Where _get() returns a single Payload object, this method returns a
        list of Payload objects. The number of objects returned is determined
        by the total number of messages available in the queue and the
        number of messages that the QoS object allows (based on the
        prefetch_count).

        .. note::
            Ignores QoS limits so caller is responsible for checking
            that we are allowed to consume at least one message from the
            queue.  get_bulk will then ask QoS for an estimate of
            the number of extra messages that we can consume.

        args:
            queue: The queue name (string) to pull from

        returns:
            payloads: A list of payload objects returned
        i   R   N(	   Ra   t   can_consume_max_estimateRJ   t   maxR   t   mint   SQS_MAX_MESSAGESR   R
   (   R   R$   t   max_if_unlimitedt   maxcountR   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyRl   t  s    !c         C@  s<   |  j  | d d } | r/ |  j | |  d St    d S(   s/   Try to retrieve a single message off ``queue``.R   i   i    N(   R   R   R
   (   R   R$   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _get  s    R   R   c         C@  s:   x! | D] } | j  j | d   q Wt t |   j |  S(   N(   R   R   RJ   RM   RK   t   _restore(   R   R   t   unwanted_delivery_infot   unwanted_key(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s    c         C@  sb   |  j  j |  j } y | d } Wn t k
 r6 n X| j | d  t t |   j |  d  S(   NR   R   (   Ra   t   getR   Rs   R   RM   RK   t	   basic_ack(   R   t   delivery_tagR   R$   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s    c         C@  s   |  j  |  j   S(   s)   Return the number of messages in a queue.(   Rw   R   (   R   R$   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _size  s    c         C@  sT   |  j  |  } d } x. t d  D]  } | | j   7} | s" Pq" q" W| j   | S(   s'   Delete all current messages in a queue.i    i
   (   Rw   R   R   t   clear(   R   R$   Rv   t   sizeRC   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _purge  s    
c         C@  sy   t  t |   j   x_ |  j |  j f D]K } | r& y | j   Wqq t k
 rm } d t |  k rn   qn qq Xq& q& Wd  S(   Ns   can't set attribute(   RM   RK   t   closet   _sqst   _sdbt   AttributeErrort   str(   R   t   connt   exc(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s    c         C@  s7   |  j  r3 x' | D] } | j |  j  k r | Sq Wn  d  S(   N(   t   regionR=   (   R   t   regionst   _r(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _get_regioninfo  s    	c      	   C@  s@   |  j  } |  j |  } | d | d | j d | j d | j  S(   NR   t   aws_access_key_idt   aws_secret_access_keyt   port(   t   conninfoR   t   useridt   passwordR   (   R   t   funR   R   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   _aws_connect_to  s    			c         C@  s4   |  j  d  k r- |  j t t  j    |  _  n  |  j  S(   N(   R   RJ   R   R   R   (   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s    c         C@  s4   |  j  d  k r- |  j t t  j    |  _  n  |  j  S(   N(   R   RJ   R   R   R   (   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s    c         C@  sR   |  j  |  j i |  j j d 6 } |  j j d i | d 6|  j  } | | _ | S(   Nt   vhostt   CreateDomaint
   DomainName(   Rr   t   domain_formatR   t   virtual_hostR   t
   get_objectR   R=   (   R   R=   t   d(    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyRq     s    		c         C@  s
   |  j  j S(   N(   t
   connectiont   client(   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s    c         C@  s   |  j  j j S(   N(   R   R   t   transport_options(   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s    c         C@  s   |  j  j d  p |  j S(   NRu   (   R   R   t   default_visibility_timeout(   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyRu     s    c         C@  s   |  j  j d d  S(   NRP   R,   (   R   R   (   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyRP     s    c         C@  s   |  j  j d t  S(   Nt   sdb_persistence(   R   R   R9   (   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR|     s    c         C@  s   |  j  j d  p |  j S(   NR   (   R   R   t   default_region(   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s    c         C@  s   |  j  j d |  j  S(   NR   (   R   R   t   default_wait_time_seconds(   R   (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s    N(   R   R   (7   RG   RH   R   R   R   R   R   RJ   R   R   RQ   RE   RX   RN   RY   R]   Rj   Ro   t   CHARS_REPLACE_TABLERr   Rw   R3   R}   R~   RF   R   R8   R   R   R   R   R   R   R   Rl   R   R   R   R   R   R   R   R   t   propertyR   R   Rq   R   R   R   Ru   RP   R|   R   R   (    (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyRK      s`   																
	#			
			
			t	   Transportc           B@  s^   e  Z e Z d  Z d Z d Z e j j	 e
 j e j f Z	 e j j e
 j f Z d Z d Z RS(   i   i    R   N(   RG   RH   RK   t   polling_intervalR   RJ   t   default_portR   R   t   connection_errorsR   t   SQSErrort   sockett   errort   channel_errorst   SQSDecodeErrort   driver_typet   driver_name(    (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyR     s   	(6   RI   t
   __future__R    RS   R   t   stringt   anyjsonR   R   t   botoR   R   R   R   R   t   boto.sdb.domainR   t   boto.sdb.connectionR   t   boto.sqs.connectionR   t   boto.sqs.messageR	   t
   kombu.fiveR
   R   R   t	   kombu.logR   t   kombu.utilsR   R   t   kombu.utils.encodingR   R   t   kombu.transport.virtualR   R,   R   RG   t   loggert   dictt   punctuationR   R   t   tuplet   __version__t   splitt   BOTO_VERSIONR   R   R   RK   R   (    (    (    s5   /tmp/pip-unpacked-wheel-UAnTfW/kombu/transport/SQS.pyt   <module>&   s>   	
	"R f