
    g                         d dl Z d dlZd dlmZ d dlZd dlZd dlmZmZm	Z	 	 d dl
mZ dZdZ G d de      Zy# e$ r	 d dlmZ Y w xY w)	    N)Thread)APIErrorDatetimeSerializer
batch_post)Emptyi  i  P c                   j    e Zd ZdZ ej
                  d      Z	 	 	 	 	 	 	 	 d
dZd Zd Z	d Z
d Zd	 Zy)Consumerz.Consumes the messages from the client's queue.posthogNc                     t        j                  |        d| _        || _        || _        || _        || _        || _        || _        || _	        d| _
        || _        |	| _        |
| _        y)zCreate a consumer thread.TN)r   __init__daemonflush_atflush_intervalapi_keyhoston_errorqueuegziprunningretriestimeouthistorical_migration)selfr   r   r   r   r   r   r   r   r   r   s              E/var/www/openai/venv/lib/python3.12/site-packages/posthog/consumer.pyr   zConsumer.__init__   sh     	 ,	 
	
 $8!    c                     | j                   j                  d       | j                  r| j                          | j                  r| j                   j                  d       y)zRuns the consumer.zconsumer is running...zconsumer exited.N)logdebugr   uploadr   s    r   runzConsumer.run=   s?    /0llKKM ll 	)*r   c                     d| _         y)zPause the consumer.FN)r   r    s    r   pausezConsumer.pauseE   s	    r   c                    d}| j                         }t        |      dk(  ry	 | j                  |       d}|D ]  }| j                  j                           |S # t        $ rF}| j                  j                  d|       d}| j                  r| j                  ||       Y d}~md}~ww xY w# |D ]  }| j                  j                           |c cY S xY w)z:Upload the next batch of items, return whether successful.Fr   Tzerror uploading: %sN)	nextlenrequest	Exceptionr   errorr   r   	task_done)r   successbatcheitems        r   r   zConsumer.uploadI   s    		u:?	LLG 

$$& N  	(HHNN0!4G}}a'		( 

$$& Ns)   A 	B("<B#B+ #B((B+ +&Cc                    | j                   }g }t        j                         }d}t        |      | j                  k  rt        j                         |z
  }|| j                  k\  r	 |S 	 |j                  d| j                  |z
        }t        t        j                  |t              j                               }|t        kD  r&| j                  j                  dt        |             |j                  |       ||z  }|t        k\  r| j                  j!                  d|       	 |S 	 t        |      | j                  k  r|S # t"        $ r Y |S w xY w)z)Return the next batch of items to upload.r   T)blockr   )clsz)Item exceeds 900kib limit, dropping. (%s)zhit batch size limit (size: %d))r   	monotonicr&   r   r   getjsondumpsr   encodeMAX_MSG_SIZEr   r)   strappendBATCH_SIZE_LIMITr   r   )r   r   items
start_time
total_sizeelapsedr.   	item_sizes           r   r%   zConsumer.next^   s7   

((*

%j4==())+j8G$--- yytT5H5H75RyS

45G H O O QR	|+HHNN#NPSTXPYZT"i'
!11HHNN#DjQ  2 %j4==($   s   (B E );E 	EEc                      d }t        j                  t         j                  t         j                  dz   |       fd       } |        y)z=Attempt to upload the batch and retry before raising an errorc                     t        | t              r;| j                  dk(  ryd| j                  cxk  xr dk  nc xr | j                  dk7  S y)NzN/AFi  i  i  )
isinstancer   status)excs    r   fatal_exceptionz)Consumer.request.<locals>.fatal_exception}   sD    #x( ::& szz/C/FSZZ35FF r      )	max_triesgiveupc                      t        j                  j                  j                  j                   j
                         y )N)r   r   r,   r   )r   r   r   r   r   r   )r,   r   s   r   send_requestz&Consumer.request.<locals>.send_request   s3    		YY%)%>%>r   N)backoffon_exceptionexpor(   r   )r   r,   rE   rJ   s   ``  r   r'   zConsumer.requestz   sD    
	 
		gllIPQAQZi	j	 
k	 	r   )d   NNg      ?F
      F)__name__
__module____qualname____doc__logging	getLoggerr   r   r!   r#   r   r%   r'    r   r   r	   r	      sP    8
'

I
&C "9B+*8r   r	   )r4   rU   	threadingr   rK   r2   posthog.requestr   r   r   r   r   ImportErrorQueuer7   r:   r	   rW   r   r   <module>r\      sS         D D
  # }v }  s   8 AA