ó
P'—^c           @@  s	  d  Z  d d l m Z y, d d l Z d d l m Z e j j Z Wn e	 k
 r[ d Z n Xd d l Z d d l Z d d l m Z d d l m Z d d l m Z d d l m Z d d	 l m Z m Z d
 d l m Z d g Z e e ƒ Z d e f d „  ƒ  YZ d S(   sj   
    celery.backends.cassandra
    ~~~~~~~~~~~~~~~~~~~~~~~~~

    Apache Cassandra result store backend.

i    (   t   absolute_importN(   t   Thrift(   t   states(   t   ImproperlyConfigured(   t	   monotonic(   t
   get_logger(   t   maybe_timedeltat   timedelta_secondsi   (   t   BaseBackendt   CassandraBackendc           B@  sž   e  Z d  Z g  Z d Z d Z e Z d Z	 d Z
 e Z d d d d e d „ Z d „  Z d „  Z d „  Z d d d „ Z e d „ Z d	 „  Z d i  d
 „ Z RS(   sõ   Highly fault tolerant Cassandra backend.

    .. attribute:: servers

        List of Cassandra servers with format: ``hostname:port``.

    :raises celery.exceptions.ImproperlyConfigured: if
        module :mod:`pycassa` is not available.

    i,  i   c   
      K@  så  t  t |  ƒ j |   | j d ƒ p7 t |  j j j ƒ |  _ t	 sR t
 d ƒ ‚ n  |  j j } | py | j d ƒ py |  j |  _ | pš | j d ƒ pš |  j |  _ | p» | j d ƒ p» |  j |  _ t | j d ƒ pÖ i  | pß i   |  _ | p| j d ƒ p|  j |  _ | j d ƒ pd	 } | j d
 ƒ p0d	 }	 y t t	 j | ƒ |  _ Wn  t k
 rnt	 j j |  _ n Xy t t	 j |	 ƒ |  _ Wn  t k
 rªt	 j j |  _ n X|  j sÉ|  j sÉ|  j rØt
 d ƒ ‚ n  d |  _ d S(   s§   Initialize Cassandra backend.

        Raises :class:`celery.exceptions.ImproperlyConfigured` if
        the :setting:`CASSANDRA_SERVERS` setting is not set.

        t   expiressl   You need to install the pycassa library to use the Cassandra backend. See https://github.com/pycassa/pycassat   CASSANDRA_SERVERSt   CASSANDRA_KEYSPACEt   CASSANDRA_COLUMN_FAMILYt   CASSANDRA_OPTIONSt   CASSANDRA_DETAILED_MODEt   CASSANDRA_READ_CONSISTENCYt   LOCAL_QUORUMt   CASSANDRA_WRITE_CONSISTENCYs!   Cassandra backend not configured.N(   t   superR	   t   __init__t   getR   t   appt   conft   CELERY_TASK_RESULT_EXPIRESR
   t   pycassaR   t   serverst   keyspacet   column_familyt   dictt   cassandra_optionst   detailed_modet   getattrt   ConsistencyLevelt   read_consistencyt   AttributeErrorR   t   write_consistencyt   Nonet   _column_family(
   t   selfR   R   R   R   R   t   kwargsR   t	   read_const
   write_cons(    (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyR   5   sJ    		c         O@  s¡   t  ƒ  |  j } xŠ y | | | Ž  SWq t j t j t j t j t j t j	 t
 j f k
 r˜ } t  ƒ  | k ru ‚  n  t j d | ƒ t j |  j ƒ q Xq Wd  S(   Ns    Cassandra error: %r. Retrying...(   R   t   _retry_timeoutR   t   InvalidRequestExceptiont   TimedOutExceptiont   UnavailableExceptiont   AllServersUnavailablet   sockett   errort   timeoutR   t
   TExceptiont   loggert   warningt   timet   sleept   _retry_wait(   R'   t   funt   argsR(   t   tst   exc(    (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyt   _retry_on_errori   s    c         C@  sd   |  j  d  k r] t j |  j d |  j |  j } t j | |  j d |  j	 d |  j
 ƒ|  _  n  |  j  S(   Nt   server_listt   read_consistency_levelt   write_consistency_level(   R&   R%   R   t   ConnectionPoolR   R   R   t   ColumnFamilyR   R"   R$   (   R'   t   conn(    (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyt   _get_column_familyz   s    	c         C@  s   |  j  d  k	 r d  |  _  n  d  S(   N(   R&   R%   (   R'   (    (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyt   process_cleanup†   s    c         @  s+   ‡  ‡ ‡ ‡ ‡ ‡ f d †  } ˆ j  | ƒ S(   s2   Store return value and status of an executed task.c          @  sá   ˆ j  ƒ  }  ˆ j j ƒ  } i ˆ d 6| j d ƒ d 6ˆ j ˆ ƒ d 6ˆ j ˆ ƒ d 6ˆ j ˆ j ˆ  ƒ ƒ d 6} ˆ j rµ |  j ˆ i ˆ j | ƒ | 6d ˆ j o® t	 ˆ j ƒ ƒn( |  j ˆ | d ˆ j oÙ t	 ˆ j ƒ ƒd  S(   Nt   statuss   %Y-%m-%dT%H:%M:%SZt	   date_donet	   tracebackt   resultt   childrent   ttl(
   RD   R   t   nowt   strftimet   encodet   current_task_childrenR   t   insertR
   R   (   t   cfRG   t   meta(   t   requestRI   R'   RF   t   task_idRH   (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyt	   _do_storeŽ   s    
	(   R=   (   R'   RT   RI   RF   RH   RS   R(   RU   (    (   RS   RI   R'   RF   RT   RH   s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyt   _store_resultŠ   s    c         C@  s   d S(   Ns   cassandra://(    (   R'   t   include_password(    (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyt   as_uri¡   s    c         @  s   ‡  ‡ f d †  } ˆ  j  | ƒ S(   s#   Get task metadata for a task by id.c          @  sü   ˆ  j  ƒ  }  y¸ ˆ  j rU |  j ˆ d t d d ƒ} ˆ  j t | j ƒ  ƒ d ƒ } n |  j ˆ ƒ } i ˆ d 6| d d 6ˆ  j | d ƒ d 6| d d 6ˆ  j | d	 ƒ d	 6ˆ  j | d
 ƒ d
 6} Wn1 t t j	 f k
 r÷ i t
 j d 6d  d 6} n X| S(   Nt   column_reversedt   column_counti   i    RT   RF   RI   RG   RH   RJ   (   RD   R   R   t   Truet   decodet   listt   valuest   KeyErrorR   t   NotFoundExceptionR   t   PENDINGR%   (   RQ   t   rowt   objRR   (   R'   RT   (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyt   _do_get§   s     	"(   R=   (   R'   RT   Rd   (    (   R'   RT   s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyt   _get_task_meta_for¤   s    c      
   C@  sM   | j  t d |  j d |  j d |  j d |  j ƒ ƒ t t |  ƒ j | | ƒ S(   NR   R   R   R   (	   t   updateR   R   R   R   R   R   R	   t
   __reduce__(   R'   R:   R(   (    (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyRg   ¾   s    		N(    (   t   __name__t
   __module__t   __doc__R   R%   R   R   t   FalseR   R+   R8   R[   t   supports_autoexpireR   R=   RD   RE   RV   RX   Re   Rg   (    (    (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyR	   "   s"   
	3				(   Rj   t
   __future__R    R   t   thriftR   t	   cassandrat   ttypest   Ct   ImportErrorR%   R0   R6   t   celeryR   t   celery.exceptionsR   t   celery.fiveR   t   celery.utils.logR   t   celery.utils.timeutilsR   R   t   baseR   t   __all__Rh   R4   R	   (    (    (    s;   /tmp/pip-unpacked-wheel-gV1wwp/celery/backends/cassandra.pyt   <module>   s$   
	