
P'^c           @@  s  d  Z  d d l m Z d d l Z d d l Z d d l Z d d l Z d d l m Z d d l	 m
 Z
 d d l m Z d d l m Z d d l m Z m Z m Z d d	 l m Z d d
 l m Z d d l m Z d d l m Z d d l m Z m Z d d l m Z d d l m  Z  m! Z! m" Z" d d d d g Z# e d d d Z$ e d d  Z% d Z& d Z' d   Z( d e* e j d  Z+ d   Z, d e- f d     YZ. d e f d     YZ/ d e- f d      YZ0 d S(!   s   
    celery.events
    ~~~~~~~~~~~~~

    Events is a stream of messages sent for certain actions occurring
    in the worker (and clients if :setting:`CELERY_SEND_TASK_SENT_EVENT`
    is enabled), used for monitoring purposes.

i    (   t   absolute_importN(   t   deque(   t   contextmanager(   t   copy(   t
   itemgetter(   t   Exchanget   Queuet   Producer(   t   maybe_channel(   t   ConsumerMixin(   t   cached_property(   t   app_or_default(   t   anon_nodenamet   uuid(   t
   dictfilter(   t   adjust_timestampt	   utcoffsett   maybe_s_to_mst   Eventst   Eventt   EventDispatchert   EventReceivert   celeryevt   typet   topicR   t	   timestamps   
anyjson is currently using the yajl library.
This json implementation is broken, it severely truncates floats
so timestamps will not work.

Please uninstall yajl or force anyjson to use a different library.
ic         C@  s.   t  t  } |  j j d k r* d | _ n  | S(   Nt   redist   fanout(   R   t   event_exchanget	   transportt   driver_typeR   (   t   connt   ex(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   get_exchange2   s    c         K@  sQ   | r | | |  n | } d | k rC | j  d |   d |   n
 |  | d <| S(   s   Create an event.

    An event is a dictionary, the only required field is ``type``.
    A ``timestamp`` field will be set to the current time if not provided.

    R   R   (   t   update(   R   t   _fieldst   __dict__t   __now__t   fieldst   event(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyR   :   s
    
c         C@  s   |  j  d d  d S(   s   Get the group part of an event type name.

    E.g.::

        >>> group_from('task-sent')
        'task'

        >>> group_from('custom-my-event')
        'custom'

    t   -i   i    (   t   split(   R   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt
   group_fromI   s    c        	   B@  s   e  Z d  Z e d g  Z d Z d Z d Z d d e	 d e	 d d d d  Z
 d   Z d   Z d   Z d   Z d   Z e d e e e d  Z e d	  Z d
   Z d   Z d   Z d   Z d   Z e e e  Z RS(   s  Dispatches event messages.

    :param connection: Connection to the broker.

    :keyword hostname: Hostname to identify ourselves as,
        by default uses the hostname returned by
        :func:`~celery.utils.anon_nodename`.

    :keyword groups: List of groups to send events for.  :meth:`send` will
        ignore send requests to groups not in this list.
        If this is :const:`None`, all events will be sent. Example groups
        include ``"task"`` and ``"worker"``.

    :keyword enabled: Set to :const:`False` to not actually publish any events,
        making :meth:`send` a noop operation.

    :keyword channel: Can be used instead of `connection` to specify
        an exact channel to use when sending events.

    :keyword buffer_while_offline: If enabled events will be buffered
       while the connection is down. :meth:`flush` must be called
       as soon as the connection is re-established.

    You need to :meth:`close` this after use.

    t   sqlc	   
      C@  s  t  | p |  j  |  _ | |  _ | |  _ | p6 t   |  _ | |  _ t j   |  _	 d  |  _ t   |  _ | p{ |  j j j |  _ t   |  _ t   |  _ t | p g   |  _ t j t j g |  _ |  j j |  _ | r | r | j j |  _ n  | |  _ |  j p|  j j   }	 t |	  |  _ |	 j j |  j  k rDt! |  _ n  |  j rZ|  j"   n  i |  j d 6|  _# t$ j%   |  _& |  j'   d  S(   Nt   hostname((   R   t   appt
   connectiont   channelR   R,   t   buffer_while_offlinet	   threadingt   Lockt   mutext   Nonet   producerR   t   _outbound_buffert   conft   CELERY_EVENT_SERIALIZERt
   serializert   sett
   on_enabledt   on_disabledt   groupst   timet   timezonet   altzonet   tzoffsett   clockt   clientt   enabledR!   t   exchangeR   R   t   DISABLED_TRANSPORTSt   Falset   enablet   headerst   ost   getpidt   pidt   warn_if_yajl(
   t   selfR.   R,   RD   R/   R0   R-   R9   R=   t   conninfo(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   __init__}   s4    						c         C@  s8   d d  l  } | j j d k r4 t j t t   n  d  S(   Ni    t   yajl(   t   anyjsont   implementationt   namet   warningst   warnt   UserWarningt   W_YAJL(   RN   RR   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyRM      s    c         C@  s   |  S(   N(    (   RN   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt	   __enter__   s    c         G@  s   |  j    d  S(   N(   t   close(   RN   t   exc_info(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   __exit__   s    c         C@  sU   t  |  j p |  j d |  j d |  j |  _ t |  _ x |  j D] } |   q@ Wd  S(   NRE   R9   (	   R   R/   R.   RE   R9   R5   t   TrueRD   R;   (   RN   t   callback(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyRH      s    		c         C@  s>   |  j  r: t |  _  |  j   x |  j D] } |   q& Wn  d  S(   N(   RD   RG   RZ   R<   (   RN   R^   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   disable   s
    		
c	         C@  s   |  j   | r d n |  j j   }	 | | d |  j d |   d |  j d |	 | }
 |  j } | j |
 d | j d d  d | j	 d	 | d
 | d | g d |  j
 d |  j Wd QXd S(   s8  Publish event using a custom :class:`~kombu.Producer`
        instance.

        :param type: Event type name, with group separated by dash (`-`).
        :param fields: Dictionary of event fields, must be json serializable.
        :param producer: :class:`~kombu.Producer` instance to use,
            only the ``publish`` method will be called.
        :keyword retry: Retry in the event of connection failure.
        :keyword retry_policy: Dict of custom retry policy, see
            :meth:`~kombu.Connection.ensure`.
        :keyword blind: Don't set logical clock value (also do not forward
            the internal logical clock).
        :keyword Event: Event type used to create event,
            defaults to :func:`Event`.
        :keyword utcoffset: Function returning the current utcoffset in hours.

        R,   R   RL   RB   t   routing_keyR(   t   .RE   t   retryt   retry_policyt   declareR9   RI   N(   R3   R4   RB   t   forwardR,   RL   RE   t   publisht   replaceRT   R9   RI   (   RN   R   R&   R5   Rb   Rc   t   blindR   R   RB   R'   RE   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyRf      s    
				c         K@  s   |  j  r |  j } | r. t |  | k r. d Sy |  j | | |  j |  Wq t k
 r } |  j sl   n  |  j j | | | f  q Xn  d S(   sy  Send event.

        :param type: Event type name, with group separated by dash (`-`).
        :keyword retry: Retry in the event of connection failure.
        :keyword retry_policy: Dict of custom retry policy, see
            :meth:`~kombu.Connection.ensure`.
        :keyword blind: Don't set logical clock value (also do not forward
            the internal logical clock).
        :keyword Event: Event type used to create event,
            defaults to :func:`Event`.
        :keyword utcoffset: Function returning the current utcoffset in hours.
        :keyword \*\*fields: Event fields, must be json serializable.

        N(	   RD   R=   R*   Rf   R5   t	   ExceptionR0   R6   t   append(   RN   R   Rh   R&   R=   t   exc(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   send   s    			c         C@  sU   xN |  j  rP y |  j  j   \ } } } Wn t k
 r< d SX|  j | |  q Wd S(   s   Flushes the outbound buffer.N(   R6   t   popleftt
   IndexErrorRl   (   RN   R   R&   t   _(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   flush   s    c         C@  s   |  j  j | j   d S(   s/   Copies the outbound buffer of another instance.N(   R6   t   extend(   RN   t   other(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   extend_buffer   s    c         C@  s)   |  j  j   o |  j  j   d |  _ d S(   s   Close the event dispatcher.N(   R3   t   lockedt   releaseR4   R5   (   RN   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyRZ     s    c         C@  s   |  j  S(   N(   R5   (   RN   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   _get_publisher  s    c         C@  s   | |  _  d  S(   N(   R5   (   RN   R5   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   _set_publisher	  s    N(   t   __name__t
   __module__t   __doc__R:   RF   R4   R-   R;   R<   R]   RP   RM   RY   R\   RH   R_   RG   R   R   Rf   Rl   Rp   Rs   RZ   Rv   Rw   t   propertyt	   publisher(    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyR   X   s.   								#						c           B@  s   e  Z d  Z d Z d d d d d d d  Z d   Z d   Z d   Z e	 d  Z
 d d e	 d  Z d d e	 d	  Z d d
  Z e	 e j e e e d  Z d   Z e d    Z RS(   s  Capture events.

    :param connection: Connection to the broker.
    :keyword handlers: Event handlers.

    :attr:`handlers` is a dict of event types and their handlers,
    the special handler `"*"` captures all events that doesn't have a
    handler.

    t   #R   c         C@  s9  t  | p |  j  |  _ t |  |  _ | d  k r9 i  n | |  _ | |  _ | pW t   |  _ | |  _	 t
 |  j p~ |  j j    |  _ t d j |  j	 |  j g  d |  j d |  j d t d t d |  j   |  _ |  j j |  _ |  j j |  _ |  j j |  _ | d  k r,t |  j j j d g  } n  | |  _ d  S(   NRa   RE   R`   t   auto_deletet   durablet   queue_argumentst   json(   R   R-   R   R/   R4   t   handlersR`   R   t   node_idt   queue_prefixR!   R.   RE   R   t   joinR]   RG   t   _get_queue_argumentst   queueRB   t   adjustt   adjust_clockRe   t   forward_clockR:   R7   R8   t   accept(   RN   R/   R   R`   R   R-   R   R   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyRP     s&    		!		c         C@  s6   |  j  j } t i t | j  d 6t | j  d 6 S(   Ns   x-message-ttls	   x-expires(   R-   R7   R   R   t   CELERY_EVENT_QUEUE_TTLt   CELERY_EVENT_QUEUE_EXPIRES(   RN   R7   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyR   2  s    c         C@  s8   |  j  j |  p! |  j  j d  } | o3 | |  d S(   sP   Process the received event by dispatching it to the appropriate
        handler.t   *N(   R   t   get(   RN   R   R'   t   handler(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   process9  s    $c      	   C@  s1   | d |  j  g d |  j g d t d |  j  g S(   Nt   queuest	   callbackst   no_ackR   (   R   t   _receiveR]   R   (   RN   t   ConsumerR/   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   get_consumers?  s    c         K@  s   | r |  j  d |  n  d  S(   NR/   (   t   wakeup_workers(   RN   R.   R/   t	   consumerst   wakeupt   kwargs(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   on_consume_readyD  s    c         C@  s   |  j  d | d | d |  S(   Nt   limitt   timeoutR   (   t   consume(   RN   R   R   R   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   itercaptureI  s    c         C@  s"   t  |  j d | d | d |   S(   s   Open up a consumer capturing events.

        This has to run in the main process, and it will never stop
        unless :attr:`EventDispatcher.should_stop` is set to True, or
        forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.

        R   R   R   (   t   listR   (   RN   R   R   R   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   captureL  s    c         C@  s&   |  j  j j d d |  j d | d  S(   Nt	   heartbeatR.   R/   (   R-   t   controlt	   broadcastR.   (   RN   R/   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyR   V  s    	c         C@  s   | d } | d k rD |  j  j p% d | } | d <|  j |  n? y | d }	 Wn! t k
 ru |  j   | d <n X|  j |	  | r y | |  \ }
 } Wn t k
 r q X| | |
  | d <n  |   | d <| | f S(   NR   s	   task-senti   RB   R   t   local_received(   RB   t   valueR   t   KeyErrorR   (   RN   t   bodyt   localizet   nowt   tzfieldsR   t   CLIENT_CLOCK_SKEWR   t   _cRB   t   offsetR   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   event_from_message[  s"    
c         C@  s   |  j  |  j |    d  S(   N(   R   R   (   RN   R   t   message(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyR   v  s    c         C@  s   |  j  r |  j  j j Sd  S(   N(   R/   R.   RC   R4   (   RN   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyR.   y  s    N(   Rx   Ry   Rz   R4   R-   RP   R   R   R   R]   R   R   R   R   R>   t	   _TZGETTERR   R   R   R   R{   R.   (    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyR     s$   
				
		c           B@  sY   e  Z d d   Z e d    Z e d    Z e d    Z e d e	 e
 d   Z RS(   c         C@  s   | |  _  d  S(   N(   R-   (   RN   R-   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyRP     s    c         C@  s   |  j  j t d d S(   Nt   reverses   events.Receiver(   R-   t   subclass_with_selfR   (   RN   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   Receiver  s    c         C@  s   |  j  j t d d S(   NR   s   events.Dispatcher(   R-   R   R   (   RN   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt
   Dispatcher  s    c         C@  s   |  j  j d d d S(   Ns   celery.events.state:StateR   s   events.State(   R-   R   (   RN   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   State  s    c      
   c@  sW   |  j  j j j d t  6 } |  j | j | | | j |   } | VWd  QXWd  QXd  S(   Nt   block(   R-   t   amqpt   producer_poolt   acquireR]   R   R.   R/   (   RN   R,   RD   R0   t   prodt   d(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   default_dispatcher  s    N(   Rx   Ry   R4   RP   R
   R   R   R   R   R]   RG   R   (    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyR   ~  s   (1   Rz   t
   __future__R    RJ   R>   R1   RU   t   collectionsR   t
   contextlibR   R   t   operatorR   t   kombuR   R   R   t   kombu.connectionR   t   kombu.mixinsR	   t   kombu.utilsR
   t
   celery.appR   t   celery.utilsR   R   t   celery.utils.functionalR   t   celery.utils.timeutilsR   R   R   t   __all__R   R   RX   R   R!   R4   t   dictR   R*   t   objectR   R   R   (    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/events/__init__.pyt   <module>
   s8   		p