
    gG                        d dl mZ d dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZ d dlmZmZmZmZmZmZmZmZ ddlmZ ddlmZmZmZ dd	lmZ dd
lmZ erddlm Z   ejB                  e"      Z# G d d      Z$y)    )annotationsN)TYPE_CHECKINGAnyCallableDictListOptional)BindingCallbackChannelEventsChannelStatesRealtimeChannelOptions"RealtimePostgresChangesListenEventRealtimePresenceStateRealtimeSubscribeStates   )http_endpoint_url   )AsyncRealtimePresencePresenceOnJoinCallbackPresenceOnLeaveCallback)	AsyncPush)
AsyncTimer)AsyncRealtimeClientc                     e Zd ZdZ	 d 	 	 	 	 	 	 	 d!dZed        Zed        Zed        Zed        Z	ed        Z
ed	        Z	 d 	 	 	 d"d
Zd Z	 d 	 	 	 	 	 	 	 d#dZd$dZ	 d 	 	 	 	 	 	 	 d%dZd&dZ	 	 	 	 	 	 d'dZ	 	 	 d(	 	 	 	 	 	 	 	 	 	 	 d)dZd*dZd+dZd,dZd-dZ	 	 	 	 d.dZ	 	 	 	 d/dZd0dZd Zd+dZd Zd0dZd d1dZd2dZ d Z!y)3AsyncRealtimeChannela  
    `Channel` is an abstraction for a topic listener for an existing socket connection.
    Each Channel has its own topic and a list of event-callbacks that responds to messages.
    Should only be instantiated through `connection.RealtimeClient().channel(topic)`.
    Nc                    | _         |xs t        dddddidd       _        | _        d _        i  _        t                _        t        j                   _
        g  _         j                   j                   _        t         t        j                   j                         _        t#         j$                  d        _         j)                          _        d fd} fd	} j                   j-                  d
|      j-                  d|        fd} fd} j/                  d|        j/                  d|        fd} j/                  t        j0                  |       y)z
        Initialize the Channel object.

        :param socket: RealtimeClient object
        :param topic: Topic that it subscribes to on the realtime server
        :param params: Optional parameters for connection.
        F)ackselfkey )	broadcastpresenceprivate)configc                    d| z  S )Nr    )triess    L/var/www/openai/venv/lib/python3.12/site-packages/realtime/_async/channel.py<lambda>z/AsyncRealtimeChannel.__init__.<locals>.<lambda>H   s    5    c                    t         j                  _        j                  j	                          j
                  D ]%  }t        j                  |j                                ' g _        y N)	r   JOINEDstaterejoin_timerreset_push_bufferasynciocreate_tasksend)payloadargspushr   s      r)   on_join_push_okz6AsyncRealtimeChannel.__init__.<locals>.on_join_push_okM   sN    &--DJ##%))##DIIK0 * "Dr+   c                     j                   sy t        j                  dj                          t        j
                  _        j                  j                          y )Nzjoin push timeout for channel )	
is_joiningloggererrortopicr   ERROREDr/   r0   schedule_timeoutr7   r   s    r)   on_join_push_timeoutz;AsyncRealtimeChannel.__init__.<locals>.on_join_push_timeoutT   sD    ??LL9$**FG&..DJ..0r+   oktimeoutc                     t         j                  dj                   d       j                  j	                          t
        j                  _        j                  j                         y )Nchannel z closed)
r<   infor>   r0   r1   r   CLOSEDr/   socketremove_channelrA   s    r)   on_closez/AsyncRealtimeChannel.__init__.<locals>.on_close`   sM    KK(4::,g67##%&--DJKK&&t,r+   c                    j                   sj                  ry t        j                  dj                   d|         t
        j                  _        j                  j                          y )NrF   z error: )

is_leaving	is_closedr<   rG   r>   r   r?   r/   r0   r@   )r6   r7   r   s     r)   on_errorz/AsyncRealtimeChannel.__init__.<locals>.on_errorf   sO    $..KK(4::,hwi@A&..DJ..0r+   closer=   c                H    j                  j                  |      |        y r-   )_trigger_reply_event_name)r6   refr   s     r)   on_replyz/AsyncRealtimeChannel.__init__.<locals>.on_replyq   s    MM$005w?r+   Nr6   Dict[str, Any])rI   r   paramsr>   _joined_oncebindingsr   r#   r   rH   r/   r2   rD   r   r   join	join_pushr   _rejoin_until_connectedr0   _broadcast_endpoint_urlbroadcast_endpoint_urlreceive_onreply)	r   rI   r>   rX   r9   rB   rK   rO   rU   s	   `        r)   __init__zAsyncRealtimeChannel.__init__)   s<     
 6%*E:"BK !
 
!24-d3"))
-/{{**"4););T[[I&((*@
 '+&B&B&D#	#	1 	t_5==+	
	-	1 	(#(#	@ 	$$h/r+   c                <    | j                   t        j                  k(  S r-   )r/   r   rH   r   s    r)   rN   zAsyncRealtimeChannel.is_closedw       zz]1111r+   c                <    | j                   t        j                  k(  S r-   )r/   r   JOININGre   s    r)   r;   zAsyncRealtimeChannel.is_joining{       zz]2222r+   c                <    | j                   t        j                  k(  S r-   )r/   r   LEAVINGre   s    r)   rM   zAsyncRealtimeChannel.is_leaving   ri   r+   c                <    | j                   t        j                  k(  S r-   )r/   r   r?   re   s    r)   
is_erroredzAsyncRealtimeChannel.is_errored   ri   r+   c                <    | j                   t        j                  k(  S r-   )r/   r   r.   re   s    r)   	is_joinedzAsyncRealtimeChannel.is_joined   rf   r+   c                .    | j                   j                  S r-   )r\   rT   re   s    r)   join_refzAsyncRealtimeChannel.join_ref   s    ~~!!!r+   c                F   K    j                   j                  s" j                   j                          d{     j                  rt	        d       j
                  j                  di       }|j                  di       }|j                  di       }|j                  dd      }i }|||t        t        d  j                  j                  d	g                   d
} j                   j                  r j                   j                  |d<    j                  j                  i d|i|       d _        d fd}dfd}fd}	 j                  j                  d|      j                  d|      j                  d|	        j                          d{     S 7 d7 	w)zg
        Subscribe to the channel.

        :return: The Channel instance for method chaining.
        NzdTried to subscribe multiple times. 'subscribe' can only be called a single time per channel instancer%   r"   r#   r$   Fc                    | j                   S r-   )filter)xs    r)   r*   z0AsyncRealtimeChannel.subscribe.<locals>.<lambda>   s    !((r+   postgres_changes)r"   r#   r$   rv   access_tokenTc                   | j                  dg       }t        |      dk(  rxr  t        j                  d        y j                  j                  dg       }g }t        |      }t        |      D ]K  }||   }|j                  j                  d      }|j                  j                  d      }|j                  j                  d      }	|j                  j                  d      }
|t        |      k  r||   nd }|ry|j                  d      |k(  re|j                  d      |k(  rQ|j                  d      |	k(  r=|j                  d      |
k(  r)|j                  d      |_        |j                  |       t        j                  j                                xr   t        j                  t        d              y  |j                  d<   xr  t        j                  d        y  y )	Nrv   r   eventschematablert   idz@mismatch between server and client bindings for postgres changes)getlenr   
SUBSCRIBEDrZ   rangert   r|   appendr3   r4   unsubscribeCHANNEL_ERROR	Exception)r6   server_postgres_changesclient_postgres_changesnew_postgres_bindingsbindings_leniclient_bindingry   rz   r{   rt   server_bindingcallbackr   s               r)   r9   z7AsyncRealtimeChannel.subscribe.<locals>.on_join_push_ok   s   @G&A' ./14S*A*L*Ld!S*.--*;*;<NPR*S'(*%"#:;|,A%<Q%?N*1155g>E+2266x@F*1155g>E+2266x@F s#:;; 02! # '*..w75@*..x8FB*..w75@*..x8FB,:,>,>t,D)-44^D++D,<,<,>?  X3AA% b& = -@ 5J01OX&=&H&H$OOr+   c                x    xr5  t         j                  t        t        j                  |                    y  y r-   )r   r   r   jsondumps)r6   r   s    r)   on_join_push_errorz:AsyncRealtimeChannel.subscribe.<locals>.on_join_push_error   s1     X+99djj12 r+   c                 @    xr  t         j                  d        y  y r-   )r   	TIMED_OUT)r7   r   s    r)   rB   z<AsyncRealtimeChannel.subscribe.<locals>.on_join_push_timeout   s    NX&=&G&GNNr+   rC   r=   rD   rV   )rI   is_connectedconnectrY   r   rX   r}   listmaprZ   rw   r\   update_payloadr`   _rejoin)
r   r   r%   r"   r#   r$   access_token_payloadr9   r   rB   s
   ``        r)   	subscribezAsyncRealtimeChannel.subscribe   s     {{''++%%'''v  [[__Xr2F

;3Izz*b1HjjE2G#% &$"$(*DMM,=,=>PRT,UV%	F {{''7;{{7O7O$^4NN))>Hf%>)=> !%D/PbO NN""49AA+gi!56,,.  { (v !s"   4F!FEF!FF!F!c                   K   t         j                   _         j                  j	                           j
                  j                           fd}t         t        j                  i       }|j                  d|      j                  d|       |j                          d {     j                         s|j                  di        y y 7 (w)Nc                     t         j                  dj                   d       j                  t        j
                  d       y )NrF   z leaveleave)r<   rG   r>   rR   r   rP   rA   s    r)   _closez0AsyncRealtimeChannel.unsubscribe.<locals>._close  s0    KK(4::,f56MM---w7r+   rC   rD   )r   rk   r/   r0   r1   r\   destroyr   r   r   r`   r5   	_can_pushtrigger)r   r   
leave_pushs   `  r)   r   z AsyncRealtimeChannel.unsubscribe   s     "**
! 	8 t]%8%8"=
4(00FCoo~~tR(   	 s   BC"C#)Cc                H  K   | j                   st        d| d| j                   d      |xs | j                  }t	        | |||      }| j                         r|j                          d {    |S |j                          | j                  j                  |       |S 7 3w)Nztried to push 'z' to 'z?' before joining. Use channel.subscribe() before pushing events)
rY   r   r>   rD   r   r   r5   start_timeoutr2   r   )r   ry   r6   rD   r8   s        r)   r8   zAsyncRealtimeChannel.push  s        !%tzzl:yz  )T\\ugw7>>))+
   $$T* s   A*B",B -4B"c                   K   	 | j                   j                  | j                  dd| j                  idd       d{    y7 # t        $ r}t        |       | cY d}~S d}~ww xY ww)zx
        Coroutine that attempts to join Phoenix Realtime server via a certain topic.

        :return: Channel
        phx_joinr%   N)r>   ry   r6   rT   )rI   r5   r>   rX   r   print)r   es     r)   r[   zAsyncRealtimeChannel.join   sb     	++""!ZZ' ($++6	    	!HK	sD   A(9A AA A(A 	A%A A%A( A%%A(c                    |j                         }t        ||xs i |      }| j                  j                  |g       j	                  |       | S )aA  
        Set up a listener for a specific event.

        :param type: The type of the event to listen for.
        :param filter: Additional parameters for the event.
        :param callback: The callback function to execute when the event is received.
        :return: The Channel instance for method chaining.
        )typert   r   )lowerr
   rZ   
setdefaultr   )r   r   r   rt   type_lowercasebindings         r)   ra   zAsyncRealtimeChannel._on4  sE     ~flXV  4;;GDr+   c                    |j                         }|| j                  v r9| j                  |   D cg c]  }|j                  |k7  r| c}| j                  |<   | S c c}w )a  
        Remove a listener for a specific event type and filter.

        :param type: The type of the event to remove the listener for.
        :param filter: The filter associated with the listener to remove.
        :return: The Channel instance for method chaining.

        This method removes all bindings for the specified event type that match
        the given filter. If no matching bindings are found, the method does nothing.
        )r   rZ   rt   )r   r   rt   r   r   s        r)   _offzAsyncRealtimeChannel._offF  sd     T]]*  $}}^<-<G>>V+ <-DMM.)
 -s   Ac                6    | j                  dd|ifd      S )a  
        Set up a listener for a specific broadcast event.

        :param event: The name of the broadcast event to listen for.
        :param callback: The callback function to execute when the event is received.
        :return: The Channel instance for method chaining.
        r"   ry   c                     |       S r-   r'   r6   _r   s     r)   r*   z3AsyncRealtimeChannel.on_broadcast.<locals>.<lambda>h  
    (9r+   rt   r   ra   )r   ry   r   s     `r)   on_broadcastz!AsyncRealtimeChannel.on_broadcast[  s)     xxU#9  
 	
r+   c                L    |||d}|r||d<   | j                  d|fd      S )a  
        Set up a listener for a specific Postgres changes event.

        :param event: The name of the Postgres changes event to listen for.
        :param table: The table name for which changes should be monitored.
        :param callback: The callback function to execute when the event is received.
        :param schema: The database schema where the table exists. Default is 'public'.
        :return: The Channel instance for method chaining.
        )ry   rz   r{   rt   rv   c                     |       S r-   r'   r   s     r)   r*   z:AsyncRealtimeChannel.on_postgres_changes.<locals>.<lambda>  r   r+   r   r   )r   ry   r   r{   rz   rt   binding_filters     `    r)   on_postgres_changesz(AsyncRealtimeChannel.on_postgres_changesk  s=    $ $)FUK'-N8$xx!9  
 	
r+   c                D   K   | j                  d|       d{    y7 w)zu
        Track a user's presence.

        :param user_status: User's presence status.
        :return: None
        trackNsend_presence)r   user_statuss     r)   r   zAsyncRealtimeChannel.track  s        +666     c                D   K   | j                  di        d{    y7 w)zC
        Untrack a user's presence.

        :return: None
        untrackNr   re   s    r)   r   zAsyncRealtimeChannel.untrack  s        B///r   c                .    | j                   j                  S r-   )r#   r/   re   s    r)   presence_statez#AsyncRealtimeChannel.presence_state  s    }}"""r+   c                <    | j                   j                  |       | S )z
        Register a callback for presence sync events.

        :param callback: The callback function to execute when a presence sync event occurs.
        :return: The Channel instance for method chaining.
        )r#   on_syncr   r   s     r)   on_presence_syncz%AsyncRealtimeChannel.on_presence_sync  s     	h'r+   c                <    | j                   j                  |       | S )z
        Register a callback for presence join events.

        :param callback: The callback function to execute when a presence join event occurs.
        :return: The Channel instance for method chaining.
        )r#   on_joinr   s     r)   on_presence_joinz%AsyncRealtimeChannel.on_presence_join  s     	h'r+   c                <    | j                   j                  |       | S )z
        Register a callback for presence leave events.

        :param callback: The callback function to execute when a presence leave event occurs.
        :return: The Channel instance for method chaining.
        )r#   on_leaver   s     r)   on_presence_leavez&AsyncRealtimeChannel.on_presence_leave  s     	x(r+   c                h   K   | j                  t        j                  d||d       d{    y7 w)a  
        Sends a broadcast message to the current channel.

        :param event: The name of the broadcast event.
        :param data: The data to be sent with the message.
        :return: An asyncio.Future object representing the send operation.
        r"   )r   ry   r6   N)r8   r   r"   r   ry   datas      r)   send_broadcastz#AsyncRealtimeChannel.send_broadcast  s1      ii## 5TB
 	
 	
s   (202c                F    t        | j                  j                         dS )Nz/api/broadcast)r   rI   http_endpointre   s    r)   r^   z,AsyncRealtimeChannel._broadcast_endpoint_url  s    #DKK$=$=>?~NNr+   c                   K   | j                   ry | j                  j                  | j                         d {    t        j
                  | _        | j                  j                          d {    y 7 <7 wr-   )	rM   rI   _leave_open_topicr>   r   rh   r/   r\   resendre   s    r)   r   zAsyncRealtimeChannel._rejoin  sX     ??kk++DJJ777"**
nn##%%% 	8%s!   6A9A56A9/A70A97A9c                J    | j                   j                  xr | j                  S r-   )rI   r   rY   re   s    r)   r   zAsyncRealtimeChannel._can_push  s    {{''=D,=,==r+   c                f   K   | j                  t        j                  ||d       d {    y 7 w)N)ry   r6   )r8   r   r#   r   s      r)   r   z"AsyncRealtimeChannel.send_presence  s%     ii..%D0QRRRs   '1/1c                \   |j                         t        j                  t        j                  t        j                  t        j
                  g}||v r|| j                  j                  k7  ry dv rCt        fd| j                  j                  dg             }|D ]  }|j                  ||        y | j                  j                  g       }|D ]T  }dv r)|j                  }|j                  j                  d      r*|j                  j                  dd      j                         nd }	|j                  d      r |j                  dd      j                         nd }
|j                  di       j                  d      r0|j                  di       j                  dd      j                         nd }|r2||j                  d	g       v r|	|k(  s|	d
k(  r|j                  ||       |	|
d
fv s|j                  ||       2|j                  k(  sC|j                  ||       W y )N)insertupdatedeletec                `    | j                   j                  dd      j                         dfv S )Nry   r!   *)rt   r}   r   )r   r   s    r)   r*   z/AsyncRealtimeChannel._trigger.<locals>.<lambda>  s,     2 27B ? E E G"C(!)r+   rv   )r"   rv   r#   ry   r!   r   r   idsr   )r   r   rP   r=   r   r[   r\   rT   rt   rZ   r}   r   r|   r   )r   r   r6   rT   eventsrv   r   rZ   bind_id
bind_eventpayload_event	data_typer   s               @r)   rR   zAsyncRealtimeChannel._trigger  s   	
 ?~7C4>>CUCU<U;;%)!!"4b9 
 ,  #. , }}((<H#!%RR%jjG #>>--g6  **7B7==?!  #;;w/  GR0668! " #;;vr266v>  FB/33FB?EEG!   #w{{5"'=='94
c8I((#6#s';;((#6\\^3$$Wc27 $r+   c                    d| S )Nchan_reply_r'   )r   rT   s     r)   rS   z&AsyncRealtimeChannel._reply_event_name  s    SE""r+   c                   K   | j                   j                          | j                  j                  r| j	                          d {    y y 7 wr-   )r0   r@   rI   r   r   re   s    r)   r]   z,AsyncRealtimeChannel._rejoin_until_connected  s<     **,;;##,,.   $ s   AAAAr-   )rI   r   r>   strrX   z Optional[RealtimeChannelOptions]returnNone)r   zHOptional[Callable[[RealtimeSubscribeStates, Optional[Exception]], None]]r   r   )ry   r   r6   rW   rD   zOptional[int]r   r   )r   r   )r   r   r   r   rt   zOptional[Dict[str, Any]]r   r   )r   r   rt   rW   r   r   )ry   r   r    Callable[[Dict[str, Any]], None]r   r   )r   publicN)ry   r   r   r   r{   r   rz   r   rt   Optional[str]r   r   )r   rW   r   r   )r   r   )r   r   )r   zCallable[[], None]r   r   )r   r   r   r   )r   r   r   r   )ry   r   r   r   r   r   )r   r   r6   zOptional[Any]rT   r   )rT   r   )"__name__
__module____qualname____doc__rc   propertyrN   r;   rM   rm   ro   rq   r   r   r8   r[   ra   r   r   r   r   r   r   r   r   r   r   r^   r   r   r   rR   rS   r]   r'   r+   r)   r   r   "   s    48	K0#K0 K0 1	K0
 
K0\ 2 2 3 3 3 3 3 3 2 2 " " 	i
i
 
iV)& MQ#1<I	&* QU#+5M	$*

$D
	
(  $
1
 3
 	

 
 
 

:70#
.
	

/
	

O&>S13f#!r+   r   )%
__future__r   r3   r   loggingtypingr   r   r   r   r   r	   realtime.typesr
   r   r   r   r   r   r   r   transformersr   r#   r   r   r   r8   r   timerr   clientr   	getLoggerr   r<   r   r'   r+   r)   <module>r      sa    "    E E	 	 	 - 
  +			8	$t! t!r+   