
P'^c           @@  s  d  Z  d d l m Z d d l Z d d l Z d d l Z d d l Z d d l Z d d l Z d d l	 Z	 d d l
 Z
 d d l m Z m Z d d l m Z d d l m Z d d l
 m Z d d l m Z m Z d d	 l m Z d d
 l m Z m Z m Z m Z m Z d d l m Z  d d l! m" Z" m# Z# m$ Z$ d d l% m& Z& d d l' m( Z( d d l) m* Z* m+ Z+ m, Z, d d l- m Z. d d l/ m0 Z0 d d l1 m2 Z2 d d l3 m4 Z4 d d l5 m6 Z6 m7 Z7 m8 Z8 m9 Z9 m: Z: d d l; m< Z< d d l= m> Z> d d l? m@ ZA yg d d lB mC ZD d d l mE ZF eG ZG eH ZI e	 jJ d d k rje	 jJ d4 k  rjeF d  ZE n eF ZE Wn; eK eL f k
 re jC d  ZD eM ZI e jN d  ZE n Xe< eO  ZP eP jQ eP jR ZQ ZR eS e jT e jU g  ZV d  ZW d! ZX d" ZY d# ZZ i eY d 6eZ d$ 6Z\ d% Z] e d& d5  Z^ d*   Z_ d+   Z` d d d d d,  Za d-   Zb d. e  jc f d/     YZc d0 e  jd f d1     YZd d2 e  je f d3     YZf d S(6   s  
    celery.concurrency.asynpool
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    .. note::

        This module will be moved soon, so don't use it directly.

    Non-blocking version of :class:`multiprocessing.Pool`.

    This code deals with three major challenges:

        1) Starting up child processes and keeping them running.
        2) Sending jobs to the processes and receiving results back.
        3) Safely shutting down this system.

i    (   t   absolute_importN(   t   dequet
   namedtuple(   t   BytesIO(   t   HIGHEST_PROTOCOL(   t   sleep(   t   WeakValueDictionaryt   ref(   t   promise(   t   RUNt	   TERMINATEt   ACKt   NACKt   WorkersJoined(   t   pool(   t   buf_tt   setblockingt
   isblocking(   t   ExceptionInfo(   t   _SimpleQueue(   t   READt   WRITEt   ERR(   t   pickle(   t   fxrange(   t	   get_errno(   t   SELECT_BAD_FD(   t   Countert   itemst   string_tt   text_tt   values(   t
   get_logger(   t   truncate(   t   state(   t   read(   t   unpack_fromi   i   i   c         C@  s   | |  | j     S(   N(   t   tobytes(   t   fmtt   viewt   _unpack_from(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR$   <   s    c         C@  s;   | |  |  } t  |  } | d k r7 | j |  n  | S(   Ni    (   t   lent   write(   t   fdt   buft   sizeR#   t   chunkt   n(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   __read__D   s
    c         C@  s   | |  | j     S(   N(   t   getvalue(   R&   t   iobuft   unpack(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR$   L   s    i   g      @i   i   t   fairi   t   Ackt   idR+   t   payloadc         C@  s   |  j  o |  j  j d k S(   Ni(   t   gi_framet   f_lasti(   t   gen(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   gen_not_startedh   s    c         C@  s,   y |  j  } Wn t k
 r  n X|   Sd  S(   N(   t   _writert   AttributeError(   t   jobt   writer(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _get_job_writerm   s
    c   	      C@  s  |  d k r t   n |  }  | d k r0 t   n | } | d k rK t   n | } yW t j |  | | |  \ } } } | r t t |  t |  B } n  | | d f SWn t j t j f k
 r} t |  t j k r g  g  d f St |  t	 k rx |  | B| BD] } y t j | g g  g  d  Wq	t j t j f k
 r} t |  t	 k rb  n  |  j
 |  | j
 |  | j
 |  q	Xq	Wg  g  d f S  n Xd S(   s  Simple wrapper to :class:`~select.select`.

    :param readers: Set of reader fds to test if readable.
    :param writers: Set of writer fds to test if writable.
    :param err: Set of fds to test for error condition.

    All fd sets passed must be mutable as this function
    will remove non-working fds from them, this also means
    the caller must make sure there are still fds in the sets
    before calling us again.

    :returns: tuple of ``(readable, writable, again)``, where
        ``readable`` is a set of fds that have data available for read,
        ``writable`` is a set of fds that is ready to be written to
        and ``again`` is a flag that if set means the caller must
        throw away the result and call us again.

    i    i   N(   t   Nonet   sett   selectt   listt   errort   socketR   t   errnot   EINTRR   t   discard(	   t   readerst   writerst   errt   timeoutt   rt   wt   et   excR+   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _selectv   s.    !c         C@  s   y t  |   SWn| t k
 r } y t |   SWnN t k
 r~ t |  t  r y |  j d d d SWq{ t k
 rw q{ Xq n Xd j |  SXd  S(   Ns   utf-8t   errorst   replaces6   <Unrepresentable: {0!r} (o.__repr__ returns unicode?)>(   t   reprt	   ExceptionR   t   UnicodeDecodeErrort
   isinstanceR   t   decodet   format(   t   objt   orig_exc(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _repr_result   s    t   Workerc           B@  s,   e  Z d  Z e Z d   Z e e d  Z RS(   s   Pool worker process.c         C@  s   |  j  j t | f f  d  S(   N(   t   outqt   putt	   WORKER_UP(   t   selft   pid(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_loop_start   s    c         C@  s&   t  | t  s" | t |  |  S| S(   N(   RX   R   R]   (   Rb   t   resultt   maxlenR!   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   prepare_result   s    (	   t   __name__t
   __module__t   __doc__t   Falset   deadRd   t   RESULT_MAXLENR!   Rg   (    (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR^      s   	t   ResultHandlerc           B@  s_   e  Z d  Z d   Z e e e e e j	 d  Z
 d   Z d   Z d   Z d   Z d   Z RS(   s)   Handles messages from the pool processes.c         O@  sQ   | j  d  |  _ | j  d  |  _ t t |   j | |   |  j |  j t <d  S(   Nt   fileno_to_outqt   on_process_alive(   t   popRo   Rp   t   superRn   t   __init__t   state_handlersRa   (   Rb   t   argst   kwargs(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyRs      s    c	         c@  s  d }	 }
 | r+ t  d  } t |  } n |   } } x |	 d k  r y* | | | r` | |	 n | d |	  } Wn0 t k
 r } t |  t k r   n  d  Vq; X| d k r |	 r t d  n t    n  |	 | 7}	 q; W| d |  \ } | rt  |  } t |  } n |   } } x |
 | k  ry* | | | rD| |
 n | | |
  } Wn0 t k
 r} t |  t k r  n  d  VqX| d k r|
 rt d  n t    n  |
 | 7}
 qW| | |  j |  | r| | |   } n | j d  | |  } | r| |  n  d  S(   Ni    i   s   End of file during messages   >i(   t	   bytearrayt
   memoryviewt   OSErrorR   t   UNAVAILt   EOFErrort   handle_eventt   seek(   Rb   t
   add_readerR+   t   callbackR0   t
   readcanbufR   R$   t   loadt   Hrt   BrR,   t   bufvR/   RQ   t	   body_sizet   message(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _recv_message   sR    
'	'	c         @  sL   |  j   |  j  | j   | j  |  j        f d   } | S(   s`   Coroutine that reads messages from the pool processes
        and calls the appropriate handler.c         @  s   y  |  Wn t  k
 r&  |   SX   |    } y t |  Wn4 t k
 rZ n1 t t t f k
 r}  |   n X  |  |  d  S(   N(   t   KeyErrort   nextt   StopIterationt   IOErrorRy   R{   (   t   filenot   it(   R~   Ro   t   on_state_changet   recv_messaget   remove_reader(    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_result_readable  s    (   Ro   R   R~   R   R   (   Rb   t   hubR   (    (   R~   Ro   R   R   R   s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _make_process_result  s    					c         C@  s   |  j  |  |  _ d  S(   N(   R   R|   (   Rb   R   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   register_with_event_loop   s    c         C@  s   t  d   d  S(   Ns   Not registered with event loop(   t   RuntimeError(   Rb   R   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR|   #  s    c   	      C@  s   |  j  } |  j } |  j } |  j } |  j } t |  } x | r | r |  j t k r | d k	 rm |   n  t   } xS | D]K } |  j	 | | j
 | |  y | d t  Wq} t k
 r t d  SXq} W| j |  q< Wd S(   sQ   This method is always used to stop when the helper thread is not
        started.t   shutdowns&   result handler: all workers terminatedN(   t   cachet   check_timeoutsRo   R   t   join_exited_workersRB   t   _stateR
   RA   t   _flush_outqueueRI   t   TrueR   t   debugt   difference_update(	   Rb   R   R   Ro   R   R   t	   outqueuest   pending_remove_fdR+   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_stop_not_started&  s&    					
	c      	   C@  s  y | | } Wn t  k
 r( | |  SX| j j } y t | d  Wn t t f k
 rf | |  SXzj y2 | j d  r | j   } n d  } t	 d  Wn t t
 f k
 r | |  SX| r | |  n  Wd  y t | d  Wn t t f k
 r| |  SXXd  S(   Ni   i    g      ?(   R   R_   t   _readerR   Ry   R   t   pollt   recvRA   R   R{   (   Rb   R+   t   removet   process_indexR   t   proct   readert   task(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR   B  s.     (   Rh   Ri   Rj   Rs   R0   R   R   R$   t   _pickleR   R   R   R   R|   R   R   (    (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyRn      s   	6				t   AsynPoolc           B@  s  e  Z d  Z e Z e Z e e e d  Z d   Z d   Z	 d   Z
 d   Z d   Z e j d  Z e j d  Z d	   Z d
   Z e e d  Z e j e j e d  Z d   Z d   Z d   Z d   Z d   Z d   Z  d   Z! d   Z" d   Z# d   Z$ d   Z% e& d    Z' d   Z( d   Z) d   Z* d   Z+ d   Z, d   Z- d   Z. e j e j e d   Z/ e0 d!    Z1 d"   Z2 e0 d#    Z3 e4 d$    Z5 RS(%   s5   Pool version that uses AIO instead of helper threads.c         @  sv  t  j | |    _ | d  k r-   j   n | } |   _ t   f d   t |  D    _ i    _	 i    _
 i    _ t   _ t     _ t     _ t     _ t     _ t     _   j j   _ t     _ t     _ t t    j | | |  x1   j D]& } |   j
 | j <|   j | j <qWd    _    _!   j" rr  j" j    _    j" j!   _! n  d  S(   Nc         3@  s!   |  ] }   j    d  f Vq d  S(   N(   t   create_process_queuesRA   (   t   .0t   _(   Rb   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pys	   <genexpr>n  s   (#   t   SCHED_STRATEGIESt   gett   sched_strategyRA   t	   cpu_countt   synackt   dictt   ranget   _queuest   _fileno_to_inqt   _fileno_to_outqt   _fileno_to_synqt   PROC_ALIVE_TIMEOUTt   _proc_alive_timeoutRB   t   _waiting_to_startt   _all_inqueuest   _active_writest   _active_writerst   _busy_workersRI   t   _mark_worker_as_availableR   t   outbound_bufferR   t   write_statsRr   R   Rs   t   _poolt   outqR_fdt   synqW_fdt   on_soft_timeoutt   on_hard_timeoutt   _timeout_handler(   Rb   t	   processesR   R   Ru   Rv   R   (    (   Rb   s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyRs   g  s4    							c         C@  s    t  j   t t |   j |  S(   N(   t   gct   collectRr   R   t   _create_worker_process(   Rb   t   i(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s    
c         C@  s   |  j  | |  |  j   d  S(   N(   t   _untrack_child_processt   maintain_pool(   Rb   R   R   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _event_process_exit  s    c         C@  sZ   y | j  } Wn- t k
 r< t j | j j  } | _  n X| j | |  j | |  d  S(   N(   t   _sentinel_pollR=   t   ost   dupt   _popent   sentinelR~   R   (   Rb   R   R   R+   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _track_child_process  s
     c         C@  sC   | j  d  k	 r? | j  d  } | _  | j |  t j |  n  d  S(   N(   R   RA   R   R   t   close(   Rb   R   R   R+   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s    c         C@  s   |  j  j |  |  j  j |  _ |  j |  |  j |  |  j |  g  |  j D] } |  j | |  ^ qP g  |  j	 D] } | j
 | |  j |  ^ qv x- t |  j  D] \ } } | j | |  q W| j j |  j  d S(   s5   Registers the async pool with the current event loop.N(   t   _result_handlerR   R|   t   handle_result_eventt   _create_timelimit_handlerst   _create_process_handlerst   _create_write_handlersR   R   R   R~   R   t   timerst   call_repeatedlyt   on_tickt   addt   on_poll_start(   Rb   R   RO   R+   t   handlert   interval(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s    &)c         @  sn    j   t     _     f d   } |  _  f d        _   f d   } |  _ d S(   sO   For async pool this sets up the handlers used
        to implement time limits.c         @  s]   | r1   |  j  |  j | |    |  j <n( | rY   |  j |  j   |  j <n  d  S(   N(   t   _on_soft_timeoutt   _jobt   _on_hard_timeout(   t   Rt   softt   hard(   t
   call_laterR   Rb   t   trefs(    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_timeout_set  s    (c         @  s>   y    j  |   } | j   ~ Wn t t f k
 r9 n Xd  S(   N(   Rq   t   cancelR   R=   (   R>   t   tref(   R   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _discard_tref  s    
c         @  s     |  j   d  S(   N(   R   (   R   (   R   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_timeout_cancel  s    N(   R   R   t   _tref_for_idR   R   R   (   Rb   R   t   nowR   R   (    (   R   R   R   Rb   R   s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s    				c         C@  s   | r3 | j  |   | | |  j |  |  j | <n  z6 y |  j | } Wn t k
 rZ n X|  j |  Wd  | s |  j |  n  Xd  S(   N(   t   call_atR   R   t   _cacheR   R   R   (   Rb   R>   R   R   R   R   Re   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s    ' c         C@  sK   z6 y |  j  | } Wn t k
 r' n X|  j |  Wd  |  j |  Xd  S(   N(   R   R   R   R   (   Rb   R>   Re   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s     c         C@  s   |  j  |  d  S(   N(   R   (   Rb   R>   R   R[   t   inqW_fd(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_job_ready  s    c         @  s   	 j  	 j 	 j     j   j   j   j   j   j   j	   j
 
  j   	  f d        	    f d   } |  _ d d           	 
     f d   } |  _ d S(   s^   For async pool this will create the handlers called
        when a process is up/down and etc.c         @  s   |    }  |  d  k	 r |  j   r |   k r |  j   k sB t    |  j |  k s[ t  |  j  j k ss t  t d |   t j |  j d  n  d  S(   Ns(   Timed out waiting for UP message from %ri	   (	   RA   t	   _is_aliveR   t   AssertionErrorRJ   RE   R   t   killRc   (   R   (   Ro   R   t   waiting_to_start(    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   verify_process_alive  s    	c         @  s   |  j  } xb t   D]T } | j rC | j j  | k rC |  | _ n  | j r | j j  | k r |  | _ q q W|   |  j < j |    t |  j j  s t	    |  j  |  j   j
 |    j  j  t |    d S(   s"   Called when a process has started.N(   R   R   t	   _write_tot   _scheduled_forR   R   R   R_   R   R   R   R   R   R   (   R   t   infdR>   (   R~   R   Ro   R   R   Rb   R   R   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_process_up%  s    	c         S@  s   y |  j    } Wn t t f k
 r* d  SXy' | | | k rQ | j | d   n  Wn t k
 re n$ X| |  | d  k	 r | |  n  | S(   N(   R   R   Ry   Rq   RA   R   (   R[   R   t   indext
   remove_funR   R+   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _remove_from_indexC  s    
c         @  s7  t  |  d d  r d S |     |  j j |     |  j r^   |  j j |   	  n    |  j j |   	 d  j } | r  j |  n  
 j |     j |   
 j	 j |  j
  	 |  j j   |  j j  |  j r |  j j  n  |  j r3
 j	 j |  j   |  j j  n  d S(   s#   Called when a worker process exits.Rl   NR   (   t   getattrRA   R_   R   t   synqR<   t   inqRI   R   R   R   t   synqR_fdR   (   R   R  (   R  t   all_inqueuest   busy_workerst   fileno_to_inqRo   t   fileno_to_synqR   t   process_flush_queuesR   t   remove_writerRb   R   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_process_downX  s.    
			N(   R~   R   R  R   R   R   R   R   R   R   R  R   R   RA   R  (   Rb   R   R   R   R   R  (    (   R  R~   R  R	  R   R
  Ro   R  R   R   R  R   R  Rb   R   R   s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s     									
$	0c         @  sw   j  	  j 
  j   j   j   j   j   j   j }  j	   j
   j   j  j    j  | j   j  | j   j j   j   j t k  t j  t j  i  j t d  t 6 j t d  t 6 t j    f d  } |  _  rS       f d   } n      f d   } |  _    	  f d   } |  _ d g       	        f d  }	 |	  _      f d   }
 |
  _    f d         f d	    t! t"        f d
  } |  _# d  
 f d    d S(   sW   For async pool this creates the handlers used to write data to
        child processes.i    c         @  sy   |  j  d  k	 s |  j  k rY |  j sF |  j d  |       d   n  |  j |  j   n |   k ru  j |   n  d  S(   N(   t   _terminatedRA   t   correlation_idt	   _acceptedt   _ackt   _set_terminatedt
   appendleft(   R>   t   _time(   t   getpidt   outboundt   revoked_tasks(    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt	   _put_back  s    	c          @  s    rZ t    t    k  rZ     }  g  |  D]" }  | d  t t Bd t ^ q1 n# g      D] }  |  ^ qg d  S(   Nt   consolidate(   R)   RA   R   R   R   (   t   inactiveR+   (   t   active_writesR  R	  t   difft   hub_addt
   hub_removeR  (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s
    -c          @  sc    r< g      D]" }   |  d  t t Bd t ^ q n# g      D] }   |   ^ qI d  S(   NR  (   RA   R   R   R   (   R+   (   R  R  R  R  R  (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s    3c         @  sp    j  |   yK  |  | k rW  j |  d     j  |    j  |    |   n  Wn t k
 rk n Xd  S(   N(   RI   Rq   RA   R   (   R+   R   (   R  R  R	  R
  R  (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_inqueue_close  s    c   
      @  s  t  |   } xt |  D]} |  | d | } |  k rS | d c d 7<q n   r{ |  k r{ | d c d 7<q n  |  k r  |  | d c d 7<q n  y    } Wn3 t k
 r x    D] }  |  q WPn X| j sy  | } | _ Wn. t k
 r8 |  | d c d 7<q n X  | | |  } t |  | _  |  
 |  	 |  y t |  Wn> t	 k
 rqt
 k
 r}	 t |	  t j k r  qqX | |  n  | d c d 7<q Wd  S(   Ni    i   (   R)   R   t
   IndexErrorR  R   R   R   R<   R   R   Ry   R   RG   t   EBADF(
   t	   ready_fdst   curindext   totalR   t   ready_fdR>   t   inqfdR   t   corRQ   (   t
   _write_jobR  t
   add_writerR  R	  R  R
  R  t   is_fair_strategyt   mark_worker_as_busyt   mark_write_fd_as_activet   mark_write_gen_as_activet   pop_messaget   put_back_message(    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   schedule_writes  sP    
	




c         @  sm    |  d  } t  |  }  d |  }  |  d d  } t |  t |  | f | _   |  d  S(   Nt   protocols   >Ii   i    (   R)   R   t   _payload(   t   tupt   bodyR   t   headerR>   (   t   append_messaget   dumpst   get_jobt   packR2  (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   send_job  s    c         @  sS   t  d |  |  j | d d |  j   r5 |  j   n    j |   j |  d  S(   Ns"   Process inqueue damaged: %r %r: %rt   exc_infoi   (   RE   t   exitcodeR   t	   terminateR   R  (   R   R+   R>   RQ   (   R   Rb   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_not_recovering"  s    c         3@  s  | j  \ } } } d } zP|  | _ |  j } d } }	 x | d k  r y | | | |  7} Wne t k
 r }
 t |
  t k r   n  | d 7} | d k r  |  | | |
  t    n  d  Vq: Xd } q: Wx |	 | k  rfy |	 | | |	  7}	 Wne t k
 r\}
 t |
  t k r  n  | d 7} | d k rT |  | | |
  t    n  d  Vq Xd } q WWd   |   |  j c d 7<  j |   | j	    Xd  S(   Ni    i   i   id   (
   R3  R   t   send_job_offsetRV   R   Rz   R   R  RI   R<   (   R   R+   R>   R6  R5  R   RS   t   sendt   Hwt   BwRQ   (   R  R  R?  t   write_generator_doneR   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR)  *  sD    		

	

	
c   	      @  sh   t  | |  |   } t   }   | | d | }  |   |  | f | _  | |  d  S(   NR   (   R5   R   Ru   (	   t   responseRc   R>   R+   R   R   t   msgR   R(  (   t
   _write_ackR*  R-  R.  t   precalcRD  (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   send_ack]  s    

c         3@  s4  | d \ } } } z y  |  } Wn t  k
 r@ t    n X| j } d } }	 xZ | d k  r y | | | |  7} WqW t k
 r }
 t |
  t k r   n  d  VqW XqW WxZ |	 | k  ry |	 | | |	  7}	 Wq t k
 r	}
 t |
  t k r  n  d  Vq Xq WWd  | r"|   n    j |   Xd  S(   Ni   i    i   (   R   R   t   send_syn_offsetRV   R   Rz   RI   (   R+   t   ackR   R6  R5  R   R   RA  RB  RC  RQ   (   R  R  (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyRG  i  s2    	

N(   i    (   i    (%   R   R   R   t   popleftt   appendR  R   R   R   R   t
   differenceR*  R   R   RI   R   t   __getitem__R   R   t   SCHED_STRATEGY_FAIRt   worker_statet   revokedR   R  t   _create_payloadR   R   t   timeR  R   R   t   consolidate_callbackt
   _quick_putR   R   RI  RA   (   Rb   R   R:  R8  R2  t   active_writersR  R   R   R1  R;  RI  (    (   RG  R)  R  R*  R  R7  R	  R  R8  R
  R  R9  R  R   R  R  R+  R,  R-  R.  R?  R  R:  R/  RH  R2  R0  R  Rb   RD  R   s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR   v  sR    																				$		<G	
	3$
	c         C@  s  |  j  t k r d  Sx- t |  j  D] } | j s# | j   q# q# W|  j r\ |  j j   n  |  j   za|  j  t	 k rt
 d d d d t } i  } x< t |  j  D]+ } t |  } | d  k	 r | | | <q q Wx |  j rt |  j  } x | D] } | j d k rWt |  rWy | | } Wn t k
 r9n X| j   |  j j |  q y | | } Wn t k
 rxq X| j } | j   r |  j | |  q q W|  j   t t |   q Wn  Wd  |  j j   |  j j   |  j j   |  j j   Xd  S(   Ng{Gz?g?t
   repeatlastR)  (   R   R
   R   R   R  t   _cancelR   t   clearR   R	   R   R   R@   RA   R   RD   Rh   R;   R   RI   R   R   t   _flush_writerR   R   R   R   (   Rb   R>   t	   intervalst   owned_byR?   RK   R:   t   job_proc(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   flush  sP    		

	
c         C@  s   t  | j j g  } z x | r | j   s1 Pn  t d | d | d d  \ } } } | r | sh | r y t |  Wq t t t t	 f k
 r Pq Xq q WWd  |  j
 j |  Xd  S(   NRK   RL   RM   g      ?(   RB   R  R<   R   RR   R   R   Ry   R   R{   R   RI   (   Rb   R   R?   t   fdst   readablet   writablet   again(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR[    s    	c         C@  s   t  d   t |  j  D  S(   s   Get queues for a new process.

        Here we will find an unused slot, as there should always
        be one available when we start a new process.
        c         s@  s'   |  ] \ } } | d  k r | Vq d  S(   N(   RA   (   R   t   qt   owner(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pys	   <genexpr>  s    	(   R   R   R   (   Rb   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   get_process_queues  s    c         @  sX   t    j t   j  d  } | rT   j j t   f d   t |  D   n  d S(   s    Grow the pool by ``n`` proceses.i    c         3@  s!   |  ] }   j    d  f Vq d  S(   N(   R   RA   (   R   R   (   Rb   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pys	   <genexpr>  s    N(   t   maxt
   _processesR)   R   t   updateR   R   (   Rb   R/   R  (    (   Rb   s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_grow  s    	c         C@  s   d S(   s#   Shrink the pool by ``n`` processes.N(    (   Rb   R/   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt	   on_shrink  s    c         C@  s   t  d t  } t  d t  } d } t | j  s9 t  t | j  sO t  t | j  se t  t | j  sz t  |  j r t  d t  } t | j  s t  t | j  s t  n  | | | f S(   sM   Creates new in, out (and optionally syn) queues,
        returned as a tuple.t	   wnonblockt	   rnonblockN(   R   R   RA   R   R   R   R<   R   (   Rb   R  R_   R  (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s    	c         @  s   y# t    f d   |  j D  } Wn t k
 rC t j d    SX| j |  j k s\ t  | j |  j k st t  |  j	 j
 |  | |  j | j <| |  j | j <|  j j | j  d S(   s   Handler called when the :const:`WORKER_UP` message is received
        from a child process, which marks the process as ready
        to receive work.c         3@  s$   |  ] } | j    k r | Vq d  S(   N(   Rc   (   R   RO   (   Rc   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pys	   <genexpr>  s    s"   process with pid=%s already exitedN(   R   R   R   t   loggert   warningR   R   R   R   R   RI   R   R   R   (   Rb   Rc   R   (    (   Rc   s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyRp     s    #c         C@  s\   | j  r/ | j  j   r/ |  j | | j   n) | j rX | j j   rX |  j |  n  d S(   sN   Handler called for each job when the process it was assigned to
        exits.N(   R   R   t   on_partial_readR   R  (   Rb   R>   t   pid_gone(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_job_process_down  s    c         C@  s   |  j  | |  d S(   s   Handler called for each *started* job when the process it
        was assigned to exited by mysterious means (error exitcodes and
        signals)N(   t   mark_as_worker_lost(   Rb   R>   Rc   R=  (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   on_job_process_lost!  s    c         @  s   |  j  d  k r d St t |  j    } t |   d     i  d 6   rc  t |  j   n d   d 6d j    f d   | D  d 6d j t t |   d	 6i t |  j	  d 6t |  j
  d
 6d 6S(   Ns   N/Ac         S@  s'   d j  |  r  t |   | d n d  S(   Ns   {0:.2f}%g      Y@i    (   RZ   t   float(   t   vR%  (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   per-  s    R%  i    t   avgs   , c         3@  s   |  ] }   |   Vq d  S(   N(    (   R   Rv  (   Rw  R%  (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pys	   <genexpr>3  s    t   allt   rawt   activet   inqueues(   R   RA   RD   R   t   sumR)   t   joint   mapt   strR   R   (   Rb   t   vals(    (   Rw  R%  s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   human_write_stats'  s    	)#c         C@  sD   | j  s@ y d |  j |  j |  <Wq@ t t f k
 r< q@ Xn  d S(   sI   Handler called to clean up a processes queues after process
        exit.N(   Rl   RA   R   t   _find_worker_queuesR   t
   ValueError(   Rb   R   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _process_cleanup_queues;  s
    	c         C@  s   x |  j  D]| } y t | j j d  Wn t t f k
 r@ q
 Xy | j j d  Wq
 t k
 r } t |  t	 j
 k r   q q
 Xq
 Wd S(   s?   Called at shutdown to tell processes that we are shutting down.i   N(   R   R   R  R<   Ry   R   R`   RA   R   RG   R"  (   t   task_handlerR   RQ   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _stop_task_handlerD  s    c         C@  s%   t  t |   j d |  j d |  j  S(   NRo   Rp   (   Rr   R   t   create_result_handlerR   Rp   (   Rb   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR  S  s    	c         C@  sP   | |  j  k s t  t |  j   } | |  j  | <| t |  j   k sL t  d S(   sR   Marks new ownership for ``queues`` so that the fileno indices are
        updated.N(   R   R   R)   (   Rb   R   t   queuest   b(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _process_register_queuesY  s    c         @  sK   y' t    f d   t |  j  D  SWn t k
 rF t     n Xd S(   s"   Find the queues owned by ``proc``.c         3@  s'   |  ] \ } } |   k r | Vq d  S(   N(    (   R   Rd  Re  (   R   (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pys	   <genexpr>d  s    	N(   R   R   R   R   R  (   Rb   R   (    (   R   s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR  a  s    'c         C@  s)   d  |  _ |  _ |  _ |  _ |  _ d  S(   N(   RA   t   _inqueuet	   _outqueueRV  t
   _quick_gett   _poll_result(   Rb   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _setup_queuesi  s    c   
      C@  sH  | j  j } |  j j } t | g  } x| rC| j rC|  j t k rCt | d | d d \ } } } | r?y | j
   } Wn t t t f k
 r}	 t |	  t j k r q* n/ t |	  t j k r Pn t d |	 | d d t |	  t k rt d |	 | d d n  Pq@X| d k r2t d |  Pq@| |  q* Pq* Wd S(   s   Flushes all queues, including the outbound buffer, so that
        all tasks that have not been started will be discarded.

        In Celery this is called whenever the transport connection is lost
        (consumer restart).

        RM   g{Gz?s    got %r while flushing process %rR<  i   s&   got sentinel while flushing process %rN(   R_   R   R   R   RB   t   closedR   R
   RR   RA   R   Ry   R   R{   R   RG   RH   t   EAGAINR   Rz   (
   Rb   R   t   resqR   R`  Ra  R   Rc  R   RQ   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR  r  s0    "!c         C@  s   | j  s |  j |  n  t |  } | rA |  j j |  ~ n  | j s t | _ t |  j  } y; |  j	 |  } |  j
 | |  r d |  j |  j   <n  Wn t k
 r n Xt |  j  | k s t  n  d S(   sV   Called when a job was only partially written to a child process
        and it exited.N(   R  R  R@   R   RI   Rl   R   R)   R   R  t   destroy_queuesRA   R   R  R   (   Rb   R>   R   R?   t   beforeR  (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyRp    s     			c         C@  s   | j    s t  |  j j |  d } y |  j j |  Wn t k
 rV d } n Xy! |  j | d j j	   |  Wn t
 k
 r n Xxh | D]` } | r xQ | j | j f D]: } | j s y | j   Wq t
 t f k
 r q Xq q Wq q W| S(   s[   Destroy queues that can no longer be used, so that they
        be replaced by new sockets.i   i    (   R   R   R   RI   R   Rq   R   R   R<   R   R   R   R  R   Ry   (   Rb   R  R   t   removedt   queuet   sock(    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR    s(    
!	c   	      C@  s@   | | | f d | } t  |  } | d |  } | | | f S(   NR2  s   >I(   R)   (	   Rb   t   type_Ru   R8  R:  R2  R5  R-   R6  (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyRS    s    c         C@  s   d  S(   N(    (   t   clsR  R   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _set_result_sentinel  s    c         C@  s
   |  j  f S(   N(   R   (   Rb   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _help_stuff_finish_args  s    c   	      C@  s   t  d  i  } t   } xO | D]G } y- | j j j   } | j |  | | | <Wq  t k
 rf q  Xq  Wxj | r t | d d \ } } } | r qn n  | s Pn  x" | D] } | | j j j   q Wt	 d  qn Wd  S(   Ns7   removing tasks from inqueue until task handler finishedRM   g      ?i    (
   R   RB   R  R   R   R   R   RR   R   R   (	   R  R   t   fileno_to_proct   inqRRO   R+   Ra  R   Rc  (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   _help_stuff_finish  s(    		c         C@  s   i d |  j  6S(   Ng      @(   R   (   Rb   (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR     s    (6   Rh   Ri   Rj   Rn   R^   RA   Rk   Rs   R   R   R   R   R   RT  R   R   R   R   R   R   R   t   structR:  R   R8  R   R   R_  R[  Rf  Rj  Rk  R   Rp   Rr  Rt  R  R  t   staticmethodR  R  R  R  R  R  Rp  R  RS  t   classmethodR  R  R  t   propertyR   (    (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyR   b  sV   5							l 	>																		%			(   i   i   i   (   R6   R+   R7   (g   Rj   t
   __future__R    RG   R   R   RC   RF   R  t   sysRT  t   collectionsR   R   t   ioR   R   R   R   t   weakrefR   R   t
   amqp.utilsR   t   billiard.poolR	   R
   R   R   R   t   billiardR   R   t   billiard.compatR   R   R   t   billiard.einfoR   t   billiard.queuesR   t   kombu.asyncR   R   R   t   kombu.serializationR   t   kombu.utilsR   t   kombu.utils.compatR   t   kombu.utils.eventioR   t   celery.fiveR   R   R   R   R   t   celery.utils.logR    t   celery.utils.textR!   t   celery.workerR"   RQ  t	   _billiardR#   R0   R$   R(   Rx   R   R   t   version_infot   ImportErrort	   NameErrorRk   R3   Rh   Rn  RE   R   t	   frozensetR  RH   Rz   Ra   R   t   SCHED_STRATEGY_PREFETCHRP  RA   R   Rm   R5   R;   R@   RR   R]   R^   Rn   t   PoolR   (    (    (    s=   /tmp/pip-unpacked-wheel-gV1wwp/celery/concurrency/asynpool.pyt   <module>   sx   (("

			-	