
    ugK                     
   d dl mZ d dlZd dlmZmZ d dlmZmZm	Z	 d dl
mZ d dlmZmZmZmZmZ d dlmZmZmZmZmZ d dlmZ d d	lmZmZmZ d d
lmZ d dl m!Z! d dl"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z,m-Z- d dl)Z)d dl.Z.d dl/m0Z0  e.jb                  e2      Z3ejh                  d ejj                  dejl                  dejn                  diZ8e8js                         D  ci c]  \  } }|| 
 c}} Z:dZ; G d deee      Z<yc c}} w )    )cached_propertyN)ConfigurationParameter$EmbeddingsQueueConfigurationInternal)SqlDBParameterValueget_sql)BatchSizeExceededError)ProducerConsumerConsumerCallbackFndecode_vectorencode_vector)OperationRecord	LogRecordScalarEncodingSeqId	Operation)System)OpenTelemetryClientOpenTelemetryGranularitytrace_method)override)defaultdict)SequenceOptionalDictSetTuplecast)UUID)Table	functions)create_topic_name         Fc                       e Zd ZU dZ G d d      Zeeee   f   ed<   e	e
   ed<   eed<   eed<   dZd	ef fd
Z edej                         ed: fd              Z edej                         ededdfd              Z edej                         ededdfd              Z edej                         edededefd              Z edej                         ededee   dee   fd              Z edej                         e	 	 	 d;dedede	e   de	e   de	e   defd              Z ed ej                         ed!eddfd"              Zedefd#       Zedefd$       Ze  ed%ej                         ede
fd&                     Z! ed'ej                         dede"e	e#   e	e   e	e   f   fd(       Z$ ed)ej                         d*eddfd+       Z% ed,ej                         de	e   de	e   de"e
e
f   fd-       Z& ed.ej                         de
fd/       Z' ed0ej                         d1edee(   ddfd2       Z) ed3ej                         d4edee(   ddfd5       Z*e+de,fd6       Z-d7e,ddfd8Z.de
fd9Z/ xZ0S )<SqlEmbeddingsQueuea  A SQL database that stores embeddings, allowing a traditional RDBMS to be used as
    the primary ingest queue and satisfying the top level Producer/Consumer interfaces.

    Note that this class is only suitable for use cases where the producer and consumer
    are in the same process.

    This is because notification of new embeddings happens solely in-process: this
    implementation does not actively listen to the the database for new records added by
    other processes.
    c            
       \    e Zd ZU eed<   eed<   eed<   eed<   eed<   dededededef
dZy)SqlEmbeddingsQueue.Subscriptionid
topic_namestartendcallbackc                 J    || _         || _        || _        || _        || _        y N)r+   r,   r-   r.   r/   )selfr+   r,   r-   r.   r/   s         X/var/www/openai/venv/lib/python3.12/site-packages/chromadb/db/mixins/embeddings_queue.py__init__z(SqlEmbeddingsQueue.Subscription.__init__I   s'     DG(DODJDH$DM    N)	__name__
__module____qualname__r    __annotations__strintr   r4    r5   r3   Subscriptionr*   B   sQ    
$$	%	% 	% 		%
 	% )	%r5   r=   _subscriptions_max_batch_size_tenant_topic_namespace   systemc                    t        t              | _        d | _        |j	                  t
              | _        |j                  j	                  d      | _        |j                  j	                  d      | _	        t        | -  |       y )N	tenant_idtopic_namespace)r   setr>   r?   requirer   _opentelemetry_clientsettingsr@   rA   superr4   )r2   rC   	__class__s     r3   r4   zSqlEmbeddingsQueue.__init__^   se    )#.#%+^^4G%H"..{; & 7 78I J r5   zSqlEmbeddingsQueue.reset_statereturnNc                 n    t         |           t        t              | _        	 | `y # t        $ r Y y w xY wr1   )rK   reset_stater   rG   r>   configAttributeError)r2   rL   s    r3   rO   zSqlEmbeddingsQueue.reset_statef   s9     	)#.	 		s   ( 	44zSqlEmbeddingsQueue.delete_topiccollection_idc                    t        | j                  | j                  |      }t        d      }| j	                         j                  |      j                  |j                  t        |      k(        j                         }| j                         5 }t        || j                               \  }}|j                  ||       d d d        y # 1 sw Y   y xY w)Nembeddings_queue)r#   r@   rA   r!   querybuilderfrom_wheretopicr   deletetxr   parameter_formatexecute)r2   rR   r,   tqcursqlparamss           r3   
delete_logzSqlEmbeddingsQueue.delete_logs   s     'LL$//

 $%U1XU177nZ889VX	 	
 WWY#!!T%:%:%<=KCKKV$ YYs   0CCzSqlEmbeddingsQueue.purge_logc                 j    t        d      } j                         j                  |      j                  t	        j
                  t        d      j                  d            j                  |j                  t         j                  |            k(        j                  t        d            j                  |j                  t        d      j                  k(        }t         j                    j"                  |      } j%                         5 }t'        | j)                               \  }}|j+                  ||       |j-                         }|rt/         fd|D              }	n
	 d d d        y t        d      }
 j                         j                  |
      j                  |
j                  t        |	      k        j                  |
j0                  t        |      k(        j3                         }t'        | j)                               \  }}|j+                  ||       d d d        y # 1 sw Y   y xY w)Nsegments
max_seq_idc              3   F   K   | ]  }j                  |d            yw)r   N)decode_seq_id).0rowr2   s     r3   	<genexpr>z/SqlEmbeddingsQueue.purge_log.<locals>.<genexpr>   s!      Ow!3!3CF!;ws   !rT   )r!   rU   rV   selectr"   Coalesceseq_idrW   
collectionr   
uuid_to_db	left_joinonr+   
segment_idr#   r@   rA   rZ   r   r[   r\   fetchallminrX   rY   )r2   rR   
segments_tsegment_ids_qr,   r_   r`   ra   results
min_seq_idr]   r^   s   `           r3   	purge_logzSqlEmbeddingsQueue.purge_log   s   
 :&
U:
 VI&&u\':'A'A2FGU%%8V)WW Yu\*+R
|!4!?!??@ 	 'LL$//

 WWY#!-1F1F1HIKCKKV$llnG  Ow OO
 Y ()A!!#qqxx."<<=qww."<<=  "!T%:%:%<=KCKKV$' YYs   AH)/B1H))H2z#SqlEmbeddingsQueue.submit_embedding	embeddingc                 \    | j                   st        d      | j                  ||g      d   S )NComponent not runningr   )_runningRuntimeErrorsubmit_embeddings)r2   rR   r{   s      r3   submit_embeddingz#SqlEmbeddingsQueue.submit_embedding   s1    
 }}677%%mi[A!DDr5   z$SqlEmbeddingsQueue.submit_embeddings
embeddingsc                 t   | j                   st        d      t        |      dk(  rg S t        |      | j                  kD  r(t	        d| j                  dd| j                  dd      | j
                  }t        | j                  | j                  |      }t        d      }| j                         j                  |      j                  |j                  |j                  |j                  |j                   |j"                  |j$                        }i }|D ]  }| j'                  |      \  }	}
}|j)                  t+        t,        |d            t+        |      t+        |d	         t+        |	      t+        |
      t+        |            }t        |      ||d	   <    | j/                         5 }t1        || j3                               \  }}| d
}|j5                  ||      j7                         }t9        t:        d       gt        |      z  }g }|D ]M  \  }}||||   <   |||      }t=        |t?        ||d   |d   |d   |d               }|jA                  |       O | jC                  ||       | j
                  jE                  d      jF                  r| jI                  |       |cd d d        S # 1 sw Y   y xY w)Nr}   r   z)
                Cannot submit more than ,zf embeddings at once.
                Please submit your embeddings in batches of size
                z or less.
                rT   	operationr+   z RETURNING seq_id, idr{   encodingmetadata)r+   r{   r   r   r   
log_offsetrecordautomatically_purge)%r~   r   lenmax_batch_sizer	   rP   r#   r@   rA   r!   rU   intocolumnsr   rX   r+   vectorr   r   !_prepare_vector_encoding_metadatainsertr   _operation_codesrZ   r   r[   r\   rt   r   r   r   r   append_notify_allget_parametervaluerz   )r2   rR   r   _r,   r]   r   	id_to_idxr{   embedding_bytesr   r   r_   r`   ra   rx   seq_idsembedding_recordsrn   r+   submit_embedding_recordembedding_records                         r3   r   z$SqlEmbeddingsQueue.submit_embeddings   s   
 }}677z?aIz?T000())-)<)<Q(? @$$Q' (  KK&LL$//

 $%T!WWQ[[!''1441::qzzR 	
 %'	#I
 66yA	]]/	+0FGHz*y//x(x(F *-YIio& $ WWY#!&$*?*?*ABKC E./Ckk#v.779GE4()C- G !#%
)/	"&*4Yr]*C' $-%*"9+"F!8!D!8!D"9+"F	$  "(()9: &  Z):;{{(()>?EE}-A YYs   'C=J..J7zSqlEmbeddingsQueue.subscribe
consume_fnr-   r.   r+   c                 X   | j                   st        d      t        | j                  | j                  |      }|xs t        j                         }| j                  ||      \  }}| j                  |||||      }| j                  |       | j                  |   j                  |       |S )Nr}   )r~   r   r#   r@   rA   uuiduuid4_validate_ranger=   	_backfillr>   add)	r2   rR   r   r-   r.   r+   r,   subscription_idsubscriptions	            r3   	subscribezSqlEmbeddingsQueue.subscribe  s     }}677&LL$//

 ,

))%5
s((ZZ

 	|$J'++L9r5   zSqlEmbeddingsQueue.unsubscriber   c                     | j                   j                         D ]J  \  }}|D ]@  }|j                  |k(  s|j                  |       t	        |      dk(  r| j                   |=   y  L y )Nr   )r>   itemsr+   remover   )r2   r   r,   subscriptionsr   s        r3   unsubscribezSqlEmbeddingsQueue.unsubscribe.  se     *.)<)<)B)B)D%J -??o5!((6=)Q. //
; !. *Er5   c                      y)Nrf   r<   r2   s    r3   	min_seqidzSqlEmbeddingsQueue.min_seqid9  s    r5   c                      y)Nl    r<   r   s    r3   	max_seqidzSqlEmbeddingsQueue.max_seqid=  s    r5   z!SqlEmbeddingsQueue.max_batch_sizec                    | j                   | j                         5 }|j                  d       |j                         }|D ]<  }d|d   v st	        |d   j                  d      d         | j                  z  | _         > | j                   d| j                  z  | _         d d d        | j                   S | j                   S # 1 sw Y   | j                   S xY w)NzPRAGMA compile_options;MAX_VARIABLE_NUMBERr   =r$   i  )r?   rZ   r\   rt   r;   splitVARIABLES_PER_RECORD)r2   r_   compile_optionsoptions       r3   r   z!SqlEmbeddingsQueue.max_batch_sizeA  s     'c56"%,,.-F,q	9/26!9??33G3J/K 550, . ''/ ,/$2K2K+KD(   ###t###!   ###s   .C AC  Cz4SqlEmbeddingsQueue._prepare_vector_encoding_metadatac                     |d   /t        t        |d         }|j                  }t        |d   |      }nd }d }|d   rt	        j
                  |d         nd }|||fS )Nr{   r   r   )r   r   r   r   jsondumps)r2   r{   encoding_typer   r   r   s         r3   r   z4SqlEmbeddingsQueue._prepare_vector_encoding_metadataX  sp     [!- :1FGM$**H+Ik,BMRO"OH8A*8M4::i
34SW(22r5   zSqlEmbeddingsQueue._backfillr   c                    t        d      }| j                         j                  |      j                  |j                  t        |j                        k(        j                  |j                  t        |j                        kD        j                  |j                  t        |j                        k        j                  |j                  |j                  |j                  |j                  |j                  |j                        j!                  |j                        }| j#                         5 }t%        || j'                               \  }}|j)                  ||       |j+                         }|D ]  }|d   rt-        |d         }	t/        |d   |	      }
nd}	d}
| j1                  |t3        |d   t5        t6        |d      |d   |
|	|d   rt9        j:                  |d         nd	      
      g        	 ddd       y# 1 sw Y   yxY w)zUBackfill the given subscription with any currently matching records in the
        DBrT   r&      Nr   r$   r%      )r   r+   r{   r   r   r   )r!   rU   rV   rW   rX   r   r,   rn   r-   r.   rl   r   r+   r   r   r   orderbyrZ   r   r[   r\   rt   r   r   _notify_oner   r   _operation_codes_invr   loads)r2   r   r]   r^   r_   r`   ra   rowsrj   r   r   s              r3   r   zSqlEmbeddingsQueue._backfilli  s    $%U1XU177n\-D-DEEFU188n\-?-?@@AU188~l.>.>??@VAHHakk1441::qzzRWQXX 	
 WWY#!!T%:%:%<=KCKKV$<<>Dq6-c!f5H*3q68<F#H!F   !'*1v#2*>s1v*F#&q6*0)1?B1vCF);4$	 	 YYs   1CHHz"SqlEmbeddingsQueue._validate_rangec                     |xs | j                         }|xs | j                         }t        |t              rt        |t              st	        d      ||k\  rt        d| d|       ||fS )z[Validate and normalize the start and end SeqIDs for a subscription using this
        impl.z2SeqIDs must be integers for sql-based EmbeddingsDBzInvalid SeqID range: z to )_next_seq_idr   
isinstancer;   	TypeError
ValueError)r2   r-   r.   s      r3   r   z"SqlEmbeddingsQueue._validate_range  sq     ,**,%T^^%%%ZS-APQQC<4UG4uEFFczr5   zSqlEmbeddingsQueue._next_seq_idc                 v   t        d      }| j                         j                  |      j                  t	        j
                  |j                              }| j                         5 }|j                  |j                                t        |j                         d         dz   cddd       S # 1 sw Y   yxY w)z%Get the next SeqID for this database.rT   r   r$   N)r!   rU   rV   rl   r"   Maxrn   rZ   r\   r   r;   fetchoner2   r]   r^   r_   s       r3   r   zSqlEmbeddingsQueue._next_seq_id  s     $%%%a(//	ahh0GHWWY#KK		$s||~a()A- YYs   '>B//B8zSqlEmbeddingsQueue._notify_allrX   c                 j    | j                   r'| j                  |   D ]  }| j                  ||        yy)z:Send a notification to each subscriber of the given topic.N)r~   r>   r   )r2   rX   r   subs       r3   r   zSqlEmbeddingsQueue._notify_all  s5     ==**51  j1 2 r5   zSqlEmbeddingsQueue._notify_oner   c                    d}g }|D ]<  }|d   |j                   k  r|d   |j                  kD  rd} n|j                  |       > 	 t        |      dkD  r|j	                  |       |r| j                  |j                         yy# t        $ rY}t        j                  d|j                  j                   d|j                   dz   t        |             t        r|Y d}~yd}~ww xY w)	z+Send a notification to a single subscriber.Fr   Tr   z6Exception occurred invoking consumer for subscription z	to topic z %sN)r-   r.   r   r   r/   r   r+   BaseExceptionloggererrorhexr,   r:   _called_from_test)r2   r   r   should_unsubscribefiltered_embeddingsr{   es          r3   r   zSqlEmbeddingsQueue._notify_one  s     # #I&#))3&0%)"&&y1 $	&'!+01!  ( " 	LLHUcnn-S12A
 ! !	s   <B 	C'AC""C'c                    t        d      }| j                         j                  |      j                  |j                        j                  d      }| j                         5 }|j                  |j                                |j                         }d d d        <| j                         dk(  }t        t        d|      g      }| j                  |       |S t        j                  |d         S # 1 sw Y   _xY w)Nembeddings_queue_configr$   r   r   )r!   rU   rV   rl   config_json_strlimitrZ   r\   r   r   _get_wal_sizer   r   
set_configfrom_json_str)r2   r]   r^   r_   resultis_fresh_systemrP   s          r3   rP   zSqlEmbeddingsQueue.config  s    +,%%a(//0A0ABHHKWWY#KK		$\\^F  >"002a7O9'(=OPF OOF#M3AA&)LL Ys   #0C11C:rP   c                     | j                         5 }|j                  dd|j                         f       d d d        	 | `y # 1 sw Y   xY w# t        $ r Y y w xY w)Nz
                INSERT OR REPLACE INTO embeddings_queue_config (id, config_json_str)
                VALUES (?, ?)
            r$   )rZ   r\   to_json_strrP   rQ   )r2   rP   r_   s      r3   r   zSqlEmbeddingsQueue.set_config  s^    WWY#KK
 &&(	 	 Y  		s   #A A  A		AAc                 \   t        d      }| j                         j                  |      j                  t	        j
                  d            }| j                         5 }|j                  |j                                t        |j                         d         cd d d        S # 1 sw Y   y xY w)NrT   *r   )r!   rU   rV   rl   r"   CountrZ   r\   r   r;   r   r   s       r3   r   z SqlEmbeddingsQueue._get_wal_size  st    $%%%a(//	0DEWWY#KK		$s||~a() YYs   ;B""B+)rM   N)NNN)1r6   r7   r8   __doc__r=   r   r:   r   r9   r   r;   r   r   r4   r   r   ALLr   rO   r    rb   rz   r   r   r   r   r   r   r   r   r   r   propertyr   r   bytesr   r   r   r   r   r   r   r   r   rP   r   r   __classcell__)rL   s   @r3   r(   r(   6   sh   	% %* c,//00c]"L!v ! 24L4P4PQ	  R	 35M5Q5QR% % %  S% 02J2N2NO)%t )% )%  P)%V 79Q9U9UVE!E.=E	E  WE 8:R:V:VWQ!Q/7/HQ	%Q  XQf 02J2N2NO
 "&#! ' 	
 e_ TN 
  P8 24L4P4PQ4 D   R 5   5   57O7S7ST$ $  U $( > $$3(3	xx}<	=3	3 02J2N2NO&l &t & P&P 68P8T8TUe_+3E?	sCx V 35M5Q5QR.c . S. 24L4P4PQ2 2(92E 2$ 2 R2 24L4P4PQ| )9L QU  R: M< M M$!E $ (*s *r5   r(   )=	functoolsr   r   chromadb.api.configurationr   r   chromadb.db.baser   r   r   chromadb.errorsr	   chromadb.ingestr
   r   r   r   r   chromadb.typesr   r   r   r   r   chromadb.configr    chromadb.telemetry.opentelemetryr   r   r   	overridesr   collectionsr   typingr   r   r   r   r   r   r   r    pypikar!   r"   loggingchromadb.ingest.impl.utilsr#   	getLoggerr6   r   ADDUPDATEUPSERTDELETEr   r   r   r   r(   )kvs   00r3   <module>r     s    %  < ; 2   # 
  # = =  #   8 
		8	$ MM1aaa	  *:)?)?)AB)AA1)AB   E*( E* Cs   C?