ó
P'—^c           @@ sO  d  Z  d d l m Z m Z m Z d d l Z d d l m Z d d l m	 Z	 m
 Z
 d d l m Z m Z d d l m Z d d l m Z d d	 l m Z d d
 l m Z m Z d d l m Z d d d d d d d d d d d d d d g Z d Z d e f d „  ƒ  YZ d e f d „  ƒ  YZ d d d d  d! d" g d# „ Z  d d$ „ Z! d% „  Z" e! d d d& „ Z# d' „  Z$ d d d d d d d d d( „ Z% d) „  Z& d* „  Z' d+ „  Z( d, „  Z) d d- e* d d d e* d d d d d. „ Z+ d/ „  Z, d0 „  Z- d1 „  Z. d2 „  Z/ e e% d3 e ƒZ0 e e, d3 e ƒZ1 e e- d3 e ƒZ2 e e. d3 e ƒZ3 d S(4   uN   
    celery.contrib.migrate
    ~~~~~~~~~~~~~~~~~~~~~~

    Migration tools.

i    (   t   absolute_importt   print_functiont   unicode_literalsN(   t   partial(   t   cyclet   islice(   t	   eventloopt   Queue(   t   maybe_declare(   t   ensure_bytes(   t   app_or_default(   t   stringt   string_t(   t   worker_directu   StopFilteringu   Stateu	   republishu   migrate_tasku   migrate_tasksu   moveu
   task_id_equ
   task_id_inu   start_filteru   move_task_by_idu   move_by_idmapu   move_by_taskmapu   move_directu   move_direct_by_iduG   Moving task {state.filtered}/{state.strtotal}: {body[task]}[{body[id]}]t   StopFilteringc           B@ s   e  Z RS(    (   t   __name__t
   __module__(    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyR   #   s   t   Statec           B@ s2   e  Z d  Z d  Z d  Z e d „  ƒ Z d „  Z RS(   i    c         C@ s   |  j  s d St |  j  ƒ S(   Nu   ?(   t	   total_apxR   (   t   self(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   strtotal,   s    	c         C@ s#   |  j  r d j |  ƒ Sd j |  ƒ S(   Nu   ^{0.filtered}u   {0.count}/{0.strtotal}(   t   filteredt   format(   R   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   __repr__2   s    	(   R   R   t   countR   R   t   propertyR   R   (    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyR   '   s
   u   application_headersu   content_typeu   content_encodingu   headersc         C@ sè   t  | j ƒ } | j | j | j } } } | d  k rB | d n | } | d  k r^ | d n | } | j | j }	 }
 | j d d  ƒ } x | D] } | j | d  ƒ q W|  j	 t  | ƒ d | d | d | d | d |	 d	 |
 | d  S(
   Nu   exchangeu   routing_keyu   compressiont   exchanget   routing_keyt   compressiont   headerst   content_typet   content_encoding(
   R	   t   bodyt   delivery_infoR   t
   propertiest   NoneR   R   t   popt   publish(   t   producert   messageR   R   t   remove_propsR    t   infoR   t   propst   ctypet   encR   t   key(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt	   republish8   s    c      	   C@ sX   | j  } | d  k r i  n | } t |  | d | j | d ƒ d | j | d ƒ ƒd  S(   NR   u   exchangeR   u   routing_key(   R!   R#   R.   t   get(   R&   t   body_R'   t   queuesR)   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   migrate_taskP   s
    	c         @ s   ‡  ‡ f d †  } | S(   Nc         @ s'   ˆ r |  d ˆ k r d  Sˆ  |  | ƒ S(   Nu   task(    (   R    R'   (   t   callbackt   tasks(    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyR   Z   s    (    (   R3   R4   R   (    (   R3   R4   s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   filter_callbackX   s    c      	   @ sp   t  | ƒ } t ˆ ƒ ‰ | j j | ƒ ‰  t | ˆ  d ˆ ƒ} ‡  ‡ f d †  } t | |  | d ˆ d | | S(   NR1   c         @ sž   |  ˆ  j  ƒ } ˆ j |  j |  j ƒ | _ | j |  j k rZ ˆ j |  j | j ƒ | _ n  | j j |  j k r ˆ j |  j |  j ƒ | j _ n  | j ƒ  d  S(   N(   t   channelR/   t   nameR   R   t   declare(   t   queuet	   new_queue(   R&   R1   (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   on_declare_queuei   s    !R;   (   R
   t   prepare_queuest   amqpt   TaskProducerR   t   start_filter(   t   sourcet   destt   migratet   appR1   t   kwargsR;   (    (   R&   R1   s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   migrate_tasksb   s    
c         C@ s!   t  | t ƒ r |  j j | S| S(   N(   t
   isinstanceR   R=   R1   (   RC   t   q(    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   _maybe_queuew   s    c	         @ s·   t  | ƒ } g  | p g  D] }
 t | |
 ƒ ^ q p7 d } | j | d t ƒb ‰ | j j ˆ ƒ ‰ t ƒ  ‰ ‡  ‡ ‡ ‡ ‡ ‡ ‡ ‡ ‡ f	 d †  } t | ˆ | d | |	 SWd QXd S(   u³  Find tasks by filtering them and move the tasks to a new queue.

    :param predicate: Filter function used to decide which messages
        to move.  Must accept the standard signature of ``(body, message)``
        used by Kombu consumer callbacks. If the predicate wants the message
        to be moved it must return either:

            1) a tuple of ``(exchange, routing_key)``, or

            2) a :class:`~kombu.entity.Queue` instance, or

            3) any other true value which means the specified
               ``exchange`` and ``routing_key`` arguments will be used.

    :keyword connection: Custom connection to use.
    :keyword source: Optional list of source queues to use instead of the
        default (which is the queues in :setting:`CELERY_QUEUES`).
        This list can also contain new :class:`~kombu.entity.Queue` instances.
    :keyword exchange: Default destination exchange.
    :keyword routing_key: Default destination routing key.
    :keyword limit: Limit number of messages to filter.
    :keyword callback: Callback called after message moved,
        with signature ``(state, body, message)``.
    :keyword transform: Optional function to transform the return
        value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, string_t):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    The predicate may also return a tuple of ``(exchange, routing_key)``
    to specify the destination to where the task should be moved,
    or a :class:`~kombu.entitiy.Queue` instance.
    Any other true value means that the task will be moved to the
    default exchange/routing_key.

    t   poolc         @ sí   ˆ |  | ƒ } | ré ˆ r* ˆ | ƒ } n  t  | t ƒ rb t | ˆ j ƒ | j j | j } } n t | ˆ ˆ ƒ \ } } t ˆ | d | d | ƒ| j	 ƒ  ˆ j
 d 7_
 ˆ  rÅ ˆ  ˆ |  | ƒ n  ˆ ré ˆ j
 ˆ k ré t ƒ  ‚ qé n  d  S(   NR   R   i   (   RF   R   R   t   default_channelR   R7   R   t   expand_destR.   t   ackR   R   (   R    R'   t   rett   ext   rk(	   R3   t   connR   t   limitt	   predicateR&   R   t   statet	   transform(    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   on_taskÀ   s     
t   consume_fromN(	   R
   RH   R#   t   connection_or_acquiret   FalseR=   R>   R   R?   (   RR   t
   connectionR   R   R@   RC   R3   RQ   RT   RD   R9   R1   RU   (    (	   R3   RP   R   RQ   RR   R&   R   RS   RT   s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   move}   s    =.	'c         C@ sA   y |  \ } } Wn$ t  t f k
 r6 | | } } n X| | f S(   N(   t	   TypeErrort
   ValueError(   RM   R   R   RN   RO   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyRK   ×   s
    c         C@ s   | d |  k S(   Nu   id(    (   t   task_idR    R'   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt
   task_id_eqß   s    c         C@ s   | d |  k S(   Nu   id(    (   t   idsR    R'   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt
   task_id_inã   s    c         C@ sb   t  |  t ƒ r! |  j d ƒ }  n  t  |  t ƒ rI t d „  |  Dƒ ƒ }  n  |  d  k r^ i  }  n  |  S(   Nu   ,c         s@ s6   |  ], } t  t t | j d  ƒ ƒ d d ƒ ƒ Vq d S(   u   :i   N(   t   tupleR   R   t   splitR#   (   t   .0RG   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pys	   <genexpr>ë   s   (   RF   R   Rb   t   listt   dictR#   (   R1   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyR<   ç   s    		g      ð?c      
   @ sk  ˆ p t  ƒ  ‰ t | ƒ } g  | p- t | ƒ D] } t |  | ƒ ^ q. } t | t ƒ rp t | j d ƒ ƒ } n  | d  k r‹ t g  ƒ } n  ‡  ‡ f d †  } d „  } |  j	 j
 | d | d | ƒ} | rú t | | ƒ } t | | ƒ } t | | ƒ } n  | j | ƒ | j | ƒ | r*| j | ƒ n  | d  k	 rmt | ˆ ƒ } | r]t | | ƒ } n  | j | ƒ n  xœ | j D]‘ } | r˜| j | k r˜qwn  |
 d  k	 r±|
 | ƒ n  y@ | | j ƒ j d t ƒ \ } } } | rðˆ j | 7_ n  Wqw| j k
 rqwXqwW| S y' x  t | d | d |	 ƒD] } q/WWn$ t j k
 rPn t k
 r`n XWd  QXˆ S(	   Nu   ,c         @ s4   ˆ j  d 7_  ˆ  r0 ˆ j  ˆ  k r0 t ƒ  ‚ n  d  S(   Ni   (   R   R   (   R    R'   (   RQ   RS   (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   update_stateÿ   s    c         S@ s   | j  ƒ  d  S(   N(   RL   (   R    R'   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   ack_message  s    R1   t   acceptt   passivet   timeoutt   ignore_timeouts(   R   R<   Rd   RH   RF   R   t   setRb   R#   R=   t   TaskConsumerR5   t   register_callbackR   R1   R7   R6   t   queue_declaret   TrueR   t   channel_errorsR   t   socketRj   R   (   RC   RP   t   filterRQ   Rj   t   ack_messagesR4   R1   R3   t   foreverR;   RV   RS   Rh   RD   RG   Rf   Rg   t   consumerR9   t   _t   mcount(    (   RQ   RS   s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyR?   ò   sZ    +	$
c         K@ s   t  i | |  6|  S(   uÁ   Find a task by id and move it to another queue.

    :param task_id: Id of task to move.
    :param dest: Destination queue.

    Also supports the same keyword arguments as :func:`move`.

    (   t   move_by_idmap(   R]   RA   RD   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   move_task_by_id2  s    	c         @ s(   ‡  f d †  } t  | d t ˆ  ƒ | S(   u–  Moves tasks by matching from a ``task_id: queue`` mapping,
    where ``queue`` is a queue to move the task to.

    Example::

        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])

    c         @ s   ˆ  j  |  d ƒ S(   Nu   id(   R/   (   R    R'   (   t   map(    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   task_id_in_mapK  s    RQ   (   RZ   t   len(   R{   RD   R|   (    (   R{   s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyRy   >  s    c         @ s   ‡  f d †  } t  | |  S(   u  Moves tasks by matching from a ``task_name: queue`` mapping,
    where ``queue`` is the queue to move the task to.

    Example::

        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })

    c         @ s   ˆ  j  |  d ƒ S(   Nu   task(   R/   (   R    R'   (   R{   (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   task_name_in_map`  s    (   RZ   (   R{   RD   R~   (    (   R{   s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   move_by_taskmapS  s    c         K@ s#   t  t j d |  d | |  ƒ d  S(   NRS   R    (   t   printt   MOVING_PROGRESS_FMTR   (   RS   R    R'   RD   (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   filter_statusf  s    RT   (4   t   __doc__t
   __future__R    R   R   Rr   t	   functoolsR   t	   itertoolsR   R   t   kombuR   R   t   kombu.commonR   t   kombu.utils.encodingR	   t
   celery.appR
   t   celery.fiveR   R   t   celery.utilsR   t   __all__R   t	   ExceptionR   t   objectR   R#   R.   R2   R5   RE   RH   RZ   RK   R^   R`   R<   RX   R?   Rz   Ry   R   R‚   t   move_directt   move_direct_by_idt   move_direct_by_idmapt   move_direct_by_taskmap(    (    (    s8   /tmp/pip-unpacked-wheel-gV1wwp/celery/contrib/migrate.pyt   <module>   sZ   		
		Y						=				