
    ug,                        d dl Z d dlZd dlmZmZmZmZmZ d dlm	Z	m
Z
mZ d dlmZ d dlmZmZ d dlmZ d dlmZmZmZmZ d dlmZmZmZ d d	lmZ d d
lmZm Z  dZ!dZ"dZ#dZ$ G d dee      Z% G d dee      Z& G d dee      Z'y)    N)AnyCallableDictOptionalcast)clientconfigwatch)ApiException)EnforceOverridesoverride)System)Member
MemberlistMemberlistProviderSegmentDirectory)OpenTelemetryGranularityadd_attributes_to_current_spantrace_method)Segment)assignmurmur3hasher<   chromazchroma.clusterzsvc.cluster.localc                   t     e Zd ZU dZeed<   def fdZedefd       Z	ede
ddfd	       Zdeddfd
Z xZS )MockMemberlistProviderz&A mock memberlist provider for testing_memberlistsystemc                 z    t         |   |       t        dd      t        dd      t        dd      g| _        y )Naz10.0.0.1idipbz10.0.0.2cz10.0.0.3)super__init__r   r   selfr   	__class__s     h/var/www/openai/venv/lib/python3.12/site-packages/chromadb/segment/impl/distributed/segment_directory.pyr'   zMockMemberlistProvider.__init__%   s8     cj)cj)cj)
    returnc                     | j                   S N)r   r)   s    r+   get_memberlistz%MockMemberlistProvider.get_memberlist-   s    r,   
memberlistNc                      y r/    r)   r2   s     r+   set_memberlist_namez*MockMemberlistProvider.set_memberlist_name1   s    r,   c                 D    || _         | j                  D ]
  } ||        y)z]Updates the memberlist and calls all registered callbacks. This mocks an update from a k8s CRN)r   	callbacksr)   r2   callbacks      r+   update_memberlistz(MockMemberlistProvider.update_memberlist5   s     %HZ  'r,   )__name__
__module____qualname____doc__r   __annotations__r   r'   r   r1   strr6   r;   __classcell__r*   s   @r+   r   r       sh    0
v 
  
     c d  !J !4 !r,   r   c                       e Zd ZU dZej
                  ed<   ee   ed<   ee	   ed<   e
j                  ed<   ee
j                     ed<   e
j                  ed<   e
j                  ed<   d	ef fd
Zed fd       Zed fd       Zedd       Zede	fd       Zededdfd       Zde	fdZddZdeeef   de	fdZde	ddfdZ xZS ) CustomResourceMemberlistProviderzMA memberlist provider that uses a k8s custom resource to store the memberlist_kubernetes_api_memberlist_name_curr_memberlist_curr_memberlist_mutex_watch_thread_kill_watch_thread_done_waiting_for_resetr   c                 >   t         |   |       t        j                          t	        j
                         | _        d | _        d | _        d | _	        t        j                         | _        t        j                         | _        t        j                         | _        y r/   )r&   r'   r	   load_configr   CustomObjectsApirF   rJ   rG   rH   	threadingLockrI   EventrK   rL   r(   s     r+   r'   z)CustomResourceMemberlistProvider.__init__G   sr     %668! $ $&/nn&6#"+//"3'0'8$r,   r-   Nc                     | j                   t        d      | j                          | j                  j	                          | j                          t        |          S )Nz+Memberlist name must be set before starting)rG   
ValueErrorr1   rL   clear_watch_worker_memberlistr&   startr)   r*   s    r+   rW   z&CustomResourceMemberlistProvider.startR   sQ      (JKK$$**,%%'w}r,   c                 2   d | _         d | _        | j                  j                          | j                  | j                  j                          d | _        | j                  j                          | j                  j                          t        | %         S r/   )
rH   rG   rK   setrJ   joinrU   rL   r&   stoprX   s    r+   r\   z%CustomResourceMemberlistProvider.stop[   s}     $ $ 	##%)##%!%%'$$**,w|~r,   c           
         | j                   j                  j                  d      st        d      | j                  r| j
                  j                          | j                  j                  t        dt        d| j                  ddg id       | j
                  j                  d	       t        j                  d
       y y )Nallow_resetzResetting the database is not allowed. Set `allow_reset` to true in the config in tests or other non-production environments where reset should be permitted.v1memberlists
MemberListmembers)kindspec)groupversion	namespacepluralnamebodyg      @g      ?)_systemsettingsrequirerT   rG   rL   rU   rF   patch_namespaced_custom_objectKUBERNETES_GROUPKUBERNETES_NAMESPACEwaittimesleepr0   s    r+   reset_statez,CustomResourceMemberlistProvider.reset_statei   s    
 ||$$,,]; p    ((..0  ??&.$**(&O @ 
 ((--c2 JJsO# !r,   c                 \    | j                   | j                         | _         | j                   S r/   )rH   _fetch_memberlistr0   s    r+   r1   z/CustomResourceMemberlistProvider.get_memberlist   s+      ($($:$:$<D!$$$r,   r2   c                     || _         y r/   )rG   r5   s     r+   r6   z4CustomResourceMemberlistProvider.set_memberlist_name   s
     *r,   c                    | j                   j                  t        dt        d| j                         }t        t        t        t        f   |      }d|vrg S t        t        t        t        f   |d         }| j                  |      S )Nr_   r`   )re   rf   rg   rh   ri   rd   )
rF   get_namespaced_custom_objectro   rp   rG   r   r   rA   r   _parse_response_memberlist)r)   api_responseresponse_specs      r+   rv   z2CustomResourceMemberlistProvider._fetch_memberlist   s    ++HH"* ))* I 
 DcNL9%IT#s(^\&-AB..}==r,   c                      d fd} j                   /t        j                  |d      }|j                          | _         y t	        d      )Nc                     t        j                         dfd} j                  j                         s#	  |         j                  j                         s#y # t        $ r}|j
                  dk(  r	 Y d }~:d }~ww xY w)Nc            	         j                  j                  j                  t        dt        ddj
                   t              D ]  } t        t        t        t        f   |       } | d   d   }t        t        t        t        f   |      }j                  5  j                  |      _        d d d        j                  j                         j                  j                   j#                  d      sj$                  j'                         rt)        j                        dkD  sj$                  j+                           y # 1 sw Y   xY w)	Nr_   r`   zmetadata.name=)re   rf   rg   rh   field_selectortimeout_secondsobjectrd   r^   r   )streamrF   list_namespaced_custom_objectro   rp   rG   WATCH_TIMEOUT_SECONDSr   r   rA   r   rI   rz   rH   _notifyrk   rl   rm   rL   is_setlenrZ   )eventr|   r)   ws     r+   do_watchz^CustomResourceMemberlistProvider._watch_worker_memberlist.<locals>.run_watch.<locals>.do_watch   s   XX((FF* 2(%3D4I4I3J#K$9 & E !c3h7E$)(OF$;M$(c3h$GM44040O0O)1- 5 LL!6!67--55mD $ < < C C E 5 56:4488:- 54s   E

E	i  r-   N)r
   WatchrK   r   r   status)r   er   r)   s     @r+   	run_watchzLCustomResourceMemberlistProvider._watch_worker_memberlist.<locals>.run_watch   sk    A;6 --446J --446 	 $ xx3s   A 	A?%A::A?T)targetdaemonz"A watch thread is already running.r   )rJ   rP   ThreadrW   	Exception)r)   r   threads   `  r+   rV   z9CustomResourceMemberlistProvider._watch_worker_memberlist   sF    %	N %%%YtDFLLN!'D@AAr,   api_response_specc                     d|vrg S g }|d   D ].  }|d   }d|v r|d   nd}|j                  t        ||             0 |S )Nrb   	member_id	member_ip r!   )appendr   )r)   r   parsedmr"   r#   s         r+   rz   z;CustomResourceMemberlistProvider._parse_response_memberlist   s[     --I"9-A;B#.!#3;BMM&B2./ . r,   c                 6    | j                   D ]
  } ||        y r/   )r8   r9   s      r+   r   z(CustomResourceMemberlistProvider._notify   s    HZ  'r,   r   )r<   r=   r>   r?   r   rO   r@   r   rA   r   rP   rQ   r   rR   r   r'   r   rW   r\   rt   r1   r6   rv   rV   r   r   rz   r   rB   rC   s   @r+   rE   rE   <   s   W,,,sm#z**%NN*I,,--!'&__,	9v 	9      6 %
 % %
 +c +d + +>: >/Bb
!%c3h
	
!* ! !r,   rE   c                       e Zd ZU eed<   ej                  ed<   ee   ed<   de	f fdZ
ed fd       Zed fd	       Zed
edefd       Zedeegdf   ddfd       Z edej*                        deddfd       Zdedee   fdZ xZS )RendezvousHashSegmentDirectory_memberlist_providerrI   rH   r   c                    t         |   |       | j                  t              | _        |j
                  j                  d      }| j                  j                  |       d | _        t        j                         | _
        y )Nworker_memberlist_name)r&   r'   rm   r   r   rl   r6   rH   rP   rQ   rI   )r)   r   memberlist_namer*   s      r+   r'   z'RendezvousHashSegmentDirectory.__init__   sb     $(LL1C$D! //112JK!!55oF $&/nn&6#r,   r-   Nc                     | j                   j                         | _        | j                   j                  | j                         t
        |          S r/   )r   r1   rH   $register_updated_memberlist_callback_update_memberlistr&   rW   rX   s    r+   rW   z$RendezvousHashSegmentDirectory.start   sE     $ 9 9 H H J!!FF##	
 w}r,   c                 j    | j                   j                  | j                         t        |          S r/   )r   &unregister_updated_memberlist_callbackr   r&   r\   rX   s    r+   r\   z#RendezvousHashSegmentDirectory.stop   s.    !!HH##	
 w|~r,   segmentc                    | j                   t        | j                         dk(  rt        d      t        |d   j                  | j                   D cg c]  }|j
                   c}t        d      d   }| j                  |      }| j                   D ]A  }|j
                  |k(  s|j                   |j                  dk7  s0|j                   d}|c S  | d| dt         dt         d}|S c c}w )Nr   zMemberlist is not initialized
collection   r   z:50051.)rH   r   rT   r   hexr"   r   extract_service_namer#   rp   HEADLESS_SERVICE)r)   r   r   
assignmentservice_namememberendpoints          r+   get_segment_endpointz3RendezvousHashSegmentDirectory.get_segment_endpoint   s      (C0E0E,F!,K<==L!%%0010aQTT01	

 
 00< ++FyyJ&99(VYY"_"())F3H#O	 , !\<.2F1GqIYHZZ`a 2s   C-
r:   c                     t               r/   )NotImplementedError)r)   r:   s     r+   !register_updated_segment_callbackz@RendezvousHashSegmentDirectory.register_updated_segment_callback  s     "##r,   z1RendezvousHashSegmentDirectory._update_memberlistr2   c                     | j                   5  t        d|D cg c]  }|j                   c}i       || _        d d d        y c c}w # 1 sw Y   y xY w)Nnew_memberlist)rI   r   r"   rH   )r)   r2   r   s      r+   r   z1RendezvousHashSegmentDirectory._update_memberlist  sN    
 ((*!*#=*QADD*#=> %/D!	 )(#= )(s   AAAAApod_namec                 j    |j                  d      }t        |      dkD  rdj                  |d d       S y )N-r   )splitr   r[   )r)   r   partss      r+   r   z3RendezvousHashSegmentDirectory.extract_service_name%  s3    s#u:>88E#2J''r,   r   )r<   r=   r>   r   r@   rP   rQ   r   r   r   r'   r   rW   r\   r   rA   r   r   r   r   r   ALLr   r   rB   rC   s   @r+   r   r      s    ,,%NN*z**7v 7     G   . $ 'D1$	$ $
 ; $$/Z /D /	/S Xc] r,   r   )(rP   rr   typingr   r   r   r   r   
kubernetesr   r	   r
   kubernetes.client.restr   	overridesr   r   chromadb.configr   chromadb.segment.distributedr   r   r   r    chromadb.telemetry.opentelemetryr   r   r   chromadb.typesr   chromadb.utils.rendezvous_hashr   r   r   rp   ro   r   r   rE   r   r4   r,   r+   <module>r      s      6 6 , , / 0 "  
 # @   # & !/1A !8`!'9;K `!FL%57G Lr,   