σ
P'^c           @@  sΣ   d  Z  d d l m Z d d l m Z d d l m Z d d l m Z m	 Z	 d d l
 m Z d d l m Z d d l m Z d	 g Z e e  Z d
   Z d   Z d e f d     YZ d	 e f d     YZ d S(   s	  
celery.contrib.batches
======================

Experimental task class that buffers messages and processes them as a list.

.. warning::

    For this to work you have to set
    :setting:`CELERYD_PREFETCH_MULTIPLIER` to zero, or some value where
    the final multiplied value is higher than ``flush_every``.

    In the future we hope to add the ability to direct batching tasks
    to a channel with different QoS requirements than the task channel.

**Simple Example**

A click counter that flushes the buffer every 100 messages, and every
10 seconds.  Does not do anything with the data, but can easily be modified
to store it in a database.

.. code-block:: python

    # Flush after 100 messages, or 10 seconds.
    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def count_click(requests):
        from collections import Counter
        count = Counter(request.kwargs['url'] for request in requests)
        for url, count in count.items():
            print('>>> Clicks: {0} -> {1}'.format(url, count))


Then you can ask for a click to be counted by doing::

    >>> count_click.delay(url='http://example.com')

**Example returning results**

An interface to the Web of Trust API that flushes the buffer every 100
messages, and every 10 seconds.

.. code-block:: python

    import requests
    from urlparse import urlparse

    from celery.contrib.batches import Batches

    wot_api_target = 'https://api.mywot.com/0.4/public_link_json'

    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def wot_api(requests):
        sig = lambda url: url
        reponses = wot_api_real(
            (sig(*request.args, **request.kwargs) for request in requests)
        )
        # use mark_as_done to manually return response data
        for response, request in zip(reponses, requests):
            app.backend.mark_as_done(request.id, response)


    def wot_api_real(urls):
        domains = [urlparse(url).netloc for url in urls]
        response = requests.get(
            wot_api_target,
            params={'hosts': ('/').join(set(domains)) + '/'}
        )
        return [response.json()[domain] for domain in domains]

Using the API is done as follows::

    >>> wot_api.delay('http://example.com')

.. note::

    If you don't have an ``app`` instance then use the current app proxy
    instead::

        from celery import current_app
        app.backend.mark_as_done(request.id, response)

i    (   t   absolute_import(   t   count(   t   Task(   t   Emptyt   Queue(   t
   get_logger(   t   Request(   t   noopt   Batchesc         c@  s5   |  j  } x% y |   VWq t k
 r, Pq Xq Wd S(   sN  Iterator yielding all immediately available items in a
    :class:`Queue.Queue`.

    The iterator stops as soon as the queue raises :exc:`Queue.Empty`.

    *Examples*

        >>> q = Queue()
        >>> map(q.put, range(4))
        >>> list(consume_queue(q))
        [0, 1, 2, 3]
        >>> list(consume_queue(q))
        []

    N(   t
   get_nowaitR   (   t   queuet   get(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   consume_queuec   s    	c         C@  sn   |  j  d | d |  zF y |  |   } Wn/ t k
 rZ } d  } t j d | d t n XWd  |  j   X| S(   Nt   loglevelt   logfiles	   Error: %rt   exc_info(   t   push_requestt	   Exceptiont   Nonet   loggert   errort   Truet   pop_request(   t   taskt   argsR   R   t   resultt   exc(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   apply_batches_task{   s     t   SimpleRequestc           B@  sJ   e  Z d  Z d Z d Z d Z i  Z d Z d Z	 d   Z
 e d    Z RS(   s   Pickleable request.c         C@  s:   | |  _  | |  _ | |  _ | |  _ | |  _ | |  _ d  S(   N(   t   idt   nameR   t   kwargst   delivery_infot   hostname(   t   selfR   R   R   R   R    R!   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   __init__   s    					c         C@  s+   |  | j  | j | j | j | j | j  S(   N(   R   R   R   R   R    R!   (   t   clst   request(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   from_request€   s    N(    (   t   __name__t
   __module__t   __doc__R   R   R   R   R   R    R!   R#   t   classmethodR&   (    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyR      s   	c           B@  sV   e  Z e Z d  Z d Z d   Z d   Z d   Z d   Z	 d   Z
 d i  d  Z RS(	   i
   i   c         C@  s1   t    |  _ t d  |  _ d  |  _ d  |  _ d  S(   Ni   (   R   t   _bufferR   t   _countR   t   _treft   _pool(   R"   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyR#   ³   s    	c         C@  s   t  d   d  S(   Ns   must implement run(requests)(   t   NotImplementedError(   R"   t   requests(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   runΉ   s    c      
   @  sy   | j   _ | j  | j  t   | j  | j 	  j j   j	            	 f
 d   } | S(   Nc         @  s     | d | d  d  d  d  d  d |  j  }  |   j d  k rm 	 j  j    _ n  t  j   j s    n  d  S(   Nt   on_ackt   appR!   t   eventsR   t   connection_errorsR    (   R    R-   R   t   call_repeatedlyt   flush_intervalt   nextR,   t   flush_every(   t   messaget   bodyt   ackt   rejectt	   callbackst   kwR%   (
   t   ReqR3   R5   t   eventert   flush_bufferR!   t
   put_bufferR"   R   t   timer(    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   task_message_handlerΖ   s    
(
   t   poolR.   R!   t   event_dispatcherR   R5   RD   R+   t   putt	   _do_flush(   R"   R   R3   t   consumerRE   (    (
   R@   R3   R5   RA   RB   R!   RC   R"   R   RD   s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   StrategyΌ   s    					*c         C@  s/   |  j  | g  | D] } t j |  ^ q f  S(   N(   t   apply_bufferR   R&   (   R"   R0   t   r(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   flushΧ   s    c         C@  s’   t  j d  d  } |  j j   rf t t |  j   } | rf t  j d t |   |  j |  qf n  | s t  j d  |  j	 r |  j	 j
   n  d  |  _	 n  d  S(   Ns#   Batches: Wake-up to flush buffer...s   Batches: Buffer complete: %ss,   Batches: Canceling timer: Nothing in buffer.(   R   t   debugR   R+   t   qsizet   listR   t   lenRN   R-   t   cancel(   R"   R0   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyRI   Ϋ   s    	c         @  s―   g  g  f   g  | D] }   | j  j j |  ^ q | rP   t sV   t sV t    f d   }   f d   } |  j j t |  | d d  f d | d   t r¨ | p« t
 S(   Nc         @  s%   g    t  D] } | j   ^ q d  S(   N(   t   Falset   acknowledge(   t   pidt   time_acceptedt   req(   t	   acks_late(    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   on_acceptedξ   s    c         @  s%   g    t  D] } | j   ^ q d  S(   N(   R   RU   (   R   RX   (   RY   (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt	   on_returnρ   s    i    t   accept_callbackt   callback(   R   RY   t   appendR   RT   t   AssertionErrorR.   t   apply_asyncR   R   R   (   R"   R0   R   R   RM   RZ   R[   (    (   RY   s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyRL   ι   s    * 	(    (   R'   R(   R   t   abstractR9   R7   R#   R1   RK   RN   RI   RL   (    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyR   ͺ   s   					N(   R)   t
   __future__R    t	   itertoolsR   t   celery.taskR   t   celery.fiveR   R   t   celery.utils.logR   t   celery.worker.jobR   t   celery.utilsR   t   __all__R'   R   R   R   t   objectR   R   (    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/batches.pyt   <module>S   s   			#