
    g0                     x   d Z ddlZddlZddlZddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZmZmZmZmZ ddlZej$                  r
ddlmZmZmZ  ed      Zg d	Z G d
 de      Z G d de      Zdededeej8                  f   ddfdZ G d dee         Z G d dee         Z G d de      Z  G d de      Z!y)a  Asynchronous queues for coroutines. These classes are very similar
to those provided in the standard library's `asyncio package
<https://docs.python.org/3/library/asyncio-queue.html>`_.

.. warning::

   Unlike the standard library's `queue` module, the classes defined here
   are *not* thread-safe. To use these queues from another thread,
   use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
   before calling any queue methods.

    N)genioloop)Future"future_set_result_unless_cancelled)Event)UnionTypeVarGeneric	AwaitableOptional)DequeTupleAny_T)QueuePriorityQueue	LifoQueue	QueueFull
QueueEmptyc                       e Zd ZdZy)r   z:Raised by `.Queue.get_nowait` when the queue has no items.N__name__
__module____qualname____doc__     C/var/www/openai/venv/lib/python3.12/site-packages/tornado/queues.pyr   r   /   s    Dr   r   c                       e Zd ZdZy)r   zBRaised by `.Queue.put_nowait` when a queue is at its maximum size.Nr   r   r   r   r   r   5   s    Lr   r   futuretimeoutreturnc                      |rLd fd}t         j                  j                         j                  ||       j	                  fd       y y )Nc                  n     j                         s$ j                  t        j                                y y N)doneset_exceptionr   TimeoutError)r    s   r   
on_timeoutz _set_timeout.<locals>.on_timeout@   s(    ;;=$$S%5%5%78 !r   c                 &    j                        S r%   )remove_timeout)_io_looptimeout_handles    r   <lambda>z_set_timeout.<locals>.<lambda>F   s    7+A+A.+Qr   r"   N)r   IOLoopcurrentadd_timeoutadd_done_callback)r    r!   r)   r-   r.   s   `  @@r   _set_timeoutr5   ;   sG     	9 --'') ,,WjA  !QR r   c                   &    e Zd ZddZdee   fdZy)_QueueIteratorr"   Nc                     || _         y r%   )q)selfr9   s     r   __init__z_QueueIterator.__init__J   s	    r   c                 6    | j                   j                         S r%   )r9   getr:   s    r   	__anext__z_QueueIterator.__anext__M   s    vvzz|r   )r9   z	Queue[_T]r"   N)r   r   r   r;   r   r   r?   r   r   r   r7   r7   I   s    9R= r   r7   c                      e Zd ZdZdZddeddfdZedefd       ZdefdZ	de
fdZde
fd	Z	 dd
edeeeej$                  f      ddfdZd
eddfdZ	 ddeeeej$                  f      dee   fdZdefdZddZ	 ddeeeej$                  f      ded   fdZdee   fdZddZdefdZd
eddfdZd
eddfdZddZ de!fdZ"de!fdZ#de!fdZ$y)r   a  Coordinate producer and consumer coroutines.

    If maxsize is 0 (the default) the queue size is unbounded.

    .. testcode::

        import asyncio
        from tornado.ioloop import IOLoop
        from tornado.queues import Queue

        q = Queue(maxsize=2)

        async def consumer():
            async for item in q:
                try:
                    print('Doing work on %s' % item)
                    await asyncio.sleep(0.01)
                finally:
                    q.task_done()

        async def producer():
            for item in range(5):
                await q.put(item)
                print('Put %s' % item)

        async def main():
            # Start consumer without waiting (since it never finishes).
            IOLoop.current().spawn_callback(consumer)
            await producer()     # Wait for producer to put all tasks.
            await q.join()       # Wait for consumer to finish all tasks.
            print('Done')

        asyncio.run(main())

    .. testoutput::

        Put 0
        Put 1
        Doing work on 0
        Put 2
        Doing work on 1
        Put 3
        Doing work on 2
        Put 4
        Doing work on 3
        Doing work on 4
        Done


    In versions of Python without native coroutines (before 3.5),
    ``consumer()`` could be written as::

        @gen.coroutine
        def consumer():
            while True:
                item = yield q.get()
                try:
                    print('Doing work on %s' % item)
                    yield gen.sleep(0.01)
                finally:
                    q.task_done()

    .. versionchanged:: 4.3
       Added ``async for`` support in Python 3.5.

    Nmaxsizer"   c                 4   |t        d      |dk  rt        d      || _        | j                          t	        j
                  g       | _        t	        j
                  g       | _        d| _        t               | _
        | j                  j                          y )Nzmaxsize can't be Noner   zmaxsize can't be negative)	TypeError
ValueError_maxsize_initcollectionsdeque_getters_putters_unfinished_tasksr   	_finishedset)r:   rA   s     r   r;   zQueue.__init__   s{    ?344Q;899

#))"-#))"-!"r   c                     | j                   S )z%Number of items allowed in the queue.)rE   r>   s    r   rA   zQueue.maxsize   s     }}r   c                 ,    t        | j                        S )zNumber of items in the queue.)len_queuer>   s    r   qsizezQueue.qsize   s    4;;r   c                     | j                    S r%   rQ   r>   s    r   emptyzQueue.empty   s    ;;r   c                 \    | j                   dk(  ry| j                         | j                   k\  S )Nr   F)rA   rR   r>   s    r   fullz
Queue.full   s&    <<1::<4<<//r   itemr!   zFuture[None]c                     t               }	 | j                  |       |j                  d       |S # t        $ r- | j                  j                  ||f       t        ||       Y |S w xY w)a  Put an item into the queue, perhaps waiting until there is room.

        Returns a Future, which raises `tornado.util.TimeoutError` after a
        timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.
        N)r   
put_nowait
set_resultr   rJ   appendr5   )r:   rX   r!   r    s       r   putz	Queue.put   si     	$OOD!
 d#  	*MM  $0) 	*s   0 2A&%A&c                 D   | j                          | j                  r]| j                         sJ d       | j                  j                         }| j	                  |       t        || j                                y| j                         rt        | j	                  |       y)z{Put an item into the queue without blocking.

        If no free slot is immediately available, raise `QueueFull`.
        z)queue non-empty, why are getters waiting?N)	_consume_expiredrI   rU   popleft_Queue__put_internalr   _getrW   r   )r:   rX   getters      r   rZ   zQueue.put_nowait   sw    
 	==::<L!LL<]]**,F%.vtyy{CYY[O%r   c                     t               }	 |j                  | j                                |S # t        $ r+ | j                  j                  |       t        ||       Y |S w xY w)a.  Remove and return an item from the queue.

        Returns an awaitable which resolves once an item is available, or raises
        `tornado.util.TimeoutError` after a timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.

        .. note::

           The ``timeout`` argument of this method differs from that
           of the standard library's `queue.Queue.get`. That method
           interprets numeric values as relative timeouts; this one
           interprets them as absolute deadlines and requires
           ``timedelta`` objects for relative timeouts (consistent
           with other timeouts in Tornado).

        )r   r[   
get_nowaitr   rI   r\   r5   )r:   r!   r    s      r   r=   z	Queue.get   s^    . 	*doo/0   	*MM  ()	*s   - 0A! A!c                 H   | j                          | j                  ra| j                         sJ d       | j                  j                         \  }}| j	                  |       t        |d       | j                         S | j                         r| j                         S t        )zRemove and return an item from the queue without blocking.

        Return an item if one is immediately available, else raise
        `QueueEmpty`.
        z(queue not full, why are putters waiting?N)	r_   rJ   rW   r`   ra   r   rb   rR   r   )r:   rX   putters      r   re   zQueue.get_nowait   s     	==99;J JJ;==002LD&%.vt<99;ZZ\99;r   c                     | j                   dk  rt        d      | xj                   dz  c_         | j                   dk(  r| j                  j                          yy)a  Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each `.get` used to fetch a task, a
        subsequent call to `.task_done` tells the queue that the processing
        on the task is complete.

        If a `.join` is blocking, it resumes when all items have been
        processed; that is, when every `.put` is matched by a `.task_done`.

        Raises `ValueError` if called more times than `.put`.
        r   z!task_done() called too many times   N)rK   rD   rL   rM   r>   s    r   	task_donezQueue.task_done  sR     !!Q&@AA!#!!Q&NN  'r   c                 8    | j                   j                  |      S )zBlock until all items in the queue are processed.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        )rL   wait)r:   r!   s     r   joinz
Queue.join$  s     ~~""7++r   c                     t        |       S r%   )r7   r>   s    r   	__aiter__zQueue.__aiter__.  s    d##r   c                 6    t        j                         | _        y r%   )rG   rH   rQ   r>   s    r   rF   zQueue._init2  s    !'')r   c                 6    | j                   j                         S r%   )rQ   r`   r>   s    r   rb   z
Queue._get5  s    {{""$$r   c                 :    | j                   j                  |       y r%   rQ   r\   r:   rX   s     r   _putz
Queue._put8      4 r   c                     | xj                   dz  c_         | j                  j                          | j                  |       y )Nri   )rK   rL   clearru   rt   s     r   __put_internalzQueue.__put_internal=  s.    !#		$r   c                    | j                   rg| j                   d   d   j                         rG| j                   j                          | j                   r!| j                   d   d   j                         rG| j                  rd| j                  d   j                         rF| j                  j                          | j                  r| j                  d   j                         rDy y y y )Nr   ri   )rJ   r&   r`   rI   r>   s    r   r_   zQueue._consume_expiredB  s    mma 0 3 8 8 :MM!!# mma 0 3 8 8 : mma 0 5 5 7MM!!# mma 0 5 5 7m 7mr   c                     dt        |       j                  dt        t        |             d| j	                         dS )N<z at  >)typer   hexid_formatr>   s    r   __repr__zQueue.__repr__J  s'    "&t*"5"5s2d8}dllnUUr   c                 V    dt        |       j                  d| j                         dS )Nr|   r}   r~   )r   r   r   r>   s    r   __str__zQueue.__str__M  s     J//@@r   c                 :   d| j                   }t        | dd       r|d| j                  z  z  }| j                  r|dt	        | j                        z  z  }| j
                  r|dt	        | j
                        z  z  }| j                  r|d| j                  z  z  }|S )Nzmaxsize=rQ   z	 queue=%rz getters[%s]z putters[%s]z	 tasks=%s)rA   getattrrQ   rI   rP   rJ   rK   )r:   results     r   r   zQueue._formatP  s    !%/44(kDKK//F==ns4=='999F==ns4=='999F!!kD$:$:::Fr   )r   r%   r0   )%r   r   r   r   rQ   intr;   propertyrA   rR   boolrU   rW   r   r   r   floatdatetime	timedeltar]   rZ   r   r=   re   rj   rm   r7   ro   rF   rb   ru   ra   r_   strr   r   r   r   r   r   r   r   Q   s   AJ F D     s  t 0d 0 OS!)%x7I7I0I*J!K	.&r &d &" EIeX-?-?&? @A	2>B $!& EI,eX-?-?&? @A,	4,$>"- $*%b %! ! !
2 $ 
$V# VA A
 
r   r   c                   4    e Zd ZdZddZdeddfdZdefdZy)	r   a  A `.Queue` that retrieves entries in priority order, lowest first.

    Entries are typically tuples like ``(priority number, data)``.

    .. testcode::

        import asyncio
        from tornado.queues import PriorityQueue

        async def main():
            q = PriorityQueue()
            q.put((1, 'medium-priority item'))
            q.put((0, 'high-priority item'))
            q.put((10, 'low-priority item'))

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        (0, 'high-priority item')
        (1, 'medium-priority item')
        (10, 'low-priority item')
    r"   Nc                     g | _         y r%   rT   r>   s    r   rF   zPriorityQueue._initz  	    r   rX   c                 D    t        j                  | j                  |       y r%   )heapqheappushrQ   rt   s     r   ru   zPriorityQueue._put}  s    t{{D)r   c                 @    t        j                  | j                        S r%   )r   heappoprQ   r>   s    r   rb   zPriorityQueue._get  s    }}T[[))r   r0   r   r   r   r   rF   r   ru   rb   r   r   r   r   r   ]  s+    8* * **b *r   r   c                   4    e Zd ZdZddZdeddfdZdefdZy)	r   a  A `.Queue` that retrieves the most recently put items first.

    .. testcode::

        import asyncio
        from tornado.queues import LifoQueue

        async def main():
            q = LifoQueue()
            q.put(3)
            q.put(2)
            q.put(1)

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        1
        2
        3
    r"   Nc                     g | _         y r%   rT   r>   s    r   rF   zLifoQueue._init  r   r   rX   c                 :    | j                   j                  |       y r%   rs   rt   s     r   ru   zLifoQueue._put  rv   r   c                 6    | j                   j                         S r%   )rQ   popr>   s    r   rb   zLifoQueue._get  s    {{  r   r0   r   r   r   r   r   r     s+    4! ! !!b !r   r   )"r   rG   r   r   tornador   r   tornado.concurrentr   r   tornado.locksr   typingr   r	   r
   r   r   TYPE_CHECKINGr   r   r   r   __all__	Exceptionr   r   r   r   r5   r7   r   r   r   r   r   r   <module>r      s        I  ? ? 	((T]
L	 			 	SS"40B0B#BCS	SWR[ IGBK IX$*E $*N"! "!r   