
    Vh&                     b   d Z ddlZddlZddlmZmZmZmZmZm	Z	m
Z
mZmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ d
Zd
Z	 ej$                  j3                  d      rddlmZmZ dZdZ e       j;                         Z eee      Z ejC                           G d dee      Z"y# e$ r Y Bw xY w)z/
Tests for L{twisted.internet.asyncioreactor}.
    N)	AbstractEventLoopAbstractEventLoopPolicyDefaultEventLoopPolicyFutureSelectorEventLoopget_event_loopget_event_loop_policyset_event_loopset_event_loop_policy)skipIf)AsyncioSelectorReactor)platform)SynchronousTestCase   )ReactorBuilderFwin32)WindowsProactorEventLoopPolicyWindowsSelectorEventLoopPolicyTc            
          e Zd ZdZd ZdedefdZ ee	 dj                   ee      ej                  j                  ej                  j                    ej$                                     d        Z ee	 dj                   ee      ej                  j                  ej                  j                    ej$                                     d        Z ee	d	j                   ee      ej                  j                  ej                  j                    ej$                                     d
        Z ee d      d        Z ee d      d        Z ee d      d        Z ee d      d        Zd Zd Zd Zd Zy)AsyncioSelectorReactorTestsz*
    L{AsyncioSelectorReactor} tests.
    c                     t               }g fd}|j                  |       |j                  d       | j                  g        | j	                  d       | j                  dg       y)zn
        Ensure that C{reactor} has an event loop that works
        properly with L{asyncio.Future}.
        c                 d    j                  | j                                j                          y N)appendresultstop)futurereactorr   s    Y/home/dcms/DCMS/lib/python3.12/site-packages/twisted/internet/test/test_asyncioreactor.py	completedzRAsyncioSelectorReactorTests.assertReactorWorksWithAsyncioFuture.<locals>.completed=   s    MM&--/*LLN    Tr   )timeoutN)r   add_done_callback
set_resultassertEqual
runReactor)selfr   r   r    r   s    `  @r   #assertReactorWorksWithAsyncioFuturez?AsyncioSelectorReactorTests.assertReactorWorksWithAsyncioFuture5   sf    
 	 	  +$$+$(r!   policyreturnc                     t               t               |j                         | j                  fd       }S )z
        Make a new asyncio loop from a policy for use with a reactor, and add
        appropriate cleanup to restore any global state.
        c                  R    j                          t                t               y r   )closer
   r   )existingLoopexistingPolicyr   s   r   cleanUpz4AsyncioSelectorReactorTests.newLoop.<locals>.cleanUpQ   s    LLN<(!.1r!   )r   r	   new_event_loop
addCleanup)r'   r)   r0   r.   r/   r   s      @@@r   newLoopz#AsyncioSelectorReactorTests.newLoopH   s@    
 &'.0&&(		2 
	2
 r!   zLdefault event loop: {}
is not of type SelectorEventLoop on Python {}.{} ({})c                 :    t               }| j                  |       y)a  
        L{AsyncioSelectorReactor} wraps the global policy's event loop
        by default.  This ensures that L{asyncio.Future}s and
        coroutines created by library code that uses
        L{asyncio.get_event_loop} are bound to the same loop.
        N)r   r(   r'   r   s     r   -test_defaultSelectorEventLoopFromGlobalPolicyzIAsyncioSelectorReactorTests.test_defaultSelectorEventLoopFromGlobalPolicyY   s    " )*009r!   c                     | j                  t                     }t        |      }t        |       | j	                  |       y)z
        If we use the L{asyncio.DefaultLoopPolicy} to create a new event loop,
        and then pass that event loop to a new L{AsyncioSelectorReactor},
        this reactor should work properly with L{asyncio.Future}.
        N)r3   r   r   r
   r(   r'   
event_loopr   s      r   3test_newSelectorEventLoopFromDefaultEventLoopPolicyzOAsyncioSelectorReactorTests.test_newSelectorEventLoopFromDefaultEventLoopPolicym   s6      \\"8":;
(4z"009r!   zHdefault event loop: {}
is of type SelectorEventLoop on Python {}.{} ({})c                 8    | j                  t        t               y)ay  
        On Windows Python 3.5 to 3.7, L{get_event_loop()} returns a
        L{WindowsSelectorEventLoop} by default.
        On Windows Python 3.8+, L{get_event_loop()} returns a
        L{WindowsProactorEventLoop} by default.
        L{AsyncioSelectorReactor} should raise a
        L{TypeError} if the default event loop is not a
        L{WindowsSelectorEventLoop}.
        N)assertRaises	TypeErrorr   r'   s    r   1test_defaultNotASelectorEventLoopFromGlobalPolicyzMAsyncioSelectorReactorTests.test_defaultNotASelectorEventLoopFromGlobalPolicy   s    ( 	)%;<r!   z&WindowsProactorEventLoop not availablec                 l    | j                  t                     }| j                  t        t        |       y)z
        L{AsyncioSelectorReactor} will raise a L{TypeError}
        if instantiated with a L{asyncio.WindowsProactorEventLoop}
        N)r3   r   r<   r=   r   )r'   r9   s     r   test_WindowsProactorEventLoopz9AsyncioSelectorReactorTests.test_WindowsProactorEventLoop   s)     \\"@"BC
)%;ZHr!   z(WindowsSelectorEventLoop only on Windowsc                     | j                  t                     }t        |      }t        |       | j	                  |       y)zR
        L{WindowsSelectorEventLoop} works with L{AsyncioSelectorReactor}
        N)r3   r   r   r
   r(   r8   s      r   test_WindowsSelectorEventLoopz9AsyncioSelectorReactorTests.test_WindowsSelectorEventLoop   s6     \\"@"BC
(4z"009r!   z.WindowsProactorEventLoopPolicy only on Windowsc                     t        t                      | j                  d        | j                  t              5  t                ddd       y# 1 sw Y   yxY w)z
        L{AsyncioSelectorReactor} will raise a L{TypeError}
        if L{asyncio.WindowsProactorEventLoopPolicy} is default.
        c                      t        d       S r   r    r!   r   <lambda>zQAsyncioSelectorReactorTests.test_WindowsProactorEventLoopPolicy.<locals>.<lambda>        5d ; r!   N)r   r   r2   r<   r=   r   r>   s    r   #test_WindowsProactorEventLoopPolicyz?AsyncioSelectorReactorTests.test_WindowsProactorEventLoopPolicy   sF     	<>?;<y) 	%"$	% 	% 	%s   AAz.WindowsSelectorEventLoopPolicy only on Windowsc                     t        t                      | j                  d        t               }| j	                  |       y)zy
        L{AsyncioSelectorReactor} will work if
        if L{asyncio.WindowsSelectorEventLoopPolicy} is default.
        c                      t        d       S r   rF   rG   r!   r   rH   zQAsyncioSelectorReactorTests.test_WindowsSelectorEventLoopPolicy.<locals>.<lambda>   rI   r!   N)r   r   r2   r   r(   r5   s     r   #test_WindowsSelectorEventLoopPolicyz?AsyncioSelectorReactorTests.test_WindowsSelectorEventLoopPolicy   s3     	<>?;<(*009r!   c                     t         r%t        t                      | j                  d        t	               }|j                         }| j                  |d       | j                  |d       y)z0L{seconds} should return a plausible epoch time.c                      t        d       S r   rF   rG   r!   r   rH   z:AsyncioSelectorReactorTests.test_seconds.<locals>.<lambda>       $9$$? r!   i ^l    #G4 N)!hasWindowsSelectorEventLoopPolicyr   r   r2   r   secondsassertGreater
assertLess)r'   r   r   s      r   test_secondsz(AsyncioSelectorReactorTests.test_seconds   sT    ,!"@"BCOO?@(*" 	6:. 	
+r!   c                    t         r%t        t                      | j                  d        t	               dgfd}j                         }j                  d|      }|j                  d       j                  dj                         j                          | j                  d          | j                  d   |z
  d       y)zQ
        L{DelayedCall.reset()} properly reschedules timer to later time
        c                      t        d       S r   rF   rG   r!   r   rH   zJAsyncioSelectorReactorTests.test_delayedCallResetToLater.<locals>.<lambda>   rP   r!   Nc                  ,     j                         d<   y Nr   rR   r   timer_called_ats   r   on_timerzJAsyncioSelectorReactorTests.test_delayedCallResetToLater.<locals>.on_timer       !(!2OAr!   r         ?r   皙?)rQ   r   r   r2   r   rR   	callLaterresetr   runassertIsNotNonerS   )r'   r]   
start_timedcr   r\   s       @@r   test_delayedCallResetToLaterz8AsyncioSelectorReactorTests.test_delayedCallResetToLater   s     -!"@"BCOO?@(*&	3 __&
q(+
!W\\*_Q/0?1-
:C@r!   c                 H   t         rt        t                      t               dgfd}j	                         }j                  d|      }|j                  d       j                  dj                         ddl}ddl	m
} |j                         } ||      5  j                          ddd       | j                  |j                         d       | j                  d          | j!                  d   |z
  d       t         rt        d       yy# 1 sw Y   hxY w)	zS
        L{DelayedCall.reset()} properly reschedules timer to earlier time
        Nc                  ,     j                         d<   y rY   rZ   r[   s   r   r]   zLAsyncioSelectorReactorTests.test_delayedCallResetToEarlier.<locals>.on_timer   r^   r!   r_   r   r   )redirect_stderr r`   )rQ   r   r   r   rR   ra   rb   r   io
contextlibrj   StringIOrc   r%   getvaluerd   rT   )	r'   r]   re   rf   rl   rj   stderrr   r\   s	          @@r   test_delayedCallResetToEarlierz:AsyncioSelectorReactorTests.test_delayedCallResetToEarlier   s     -!"@"BC(*&	3 __&
sH-
!W\\*.V$ 	KKM	 	*B/_Q/0*Z7=,!$' -	 	s    DD!c                 4   t         rt        t                      t        j                         }t        j
                          	 t        t        j                               }d}t               }t        |      D ]  }|j                  dd         |j                          t        t        j                               }| j                  ||z
  |z  d       |rt        j                          	 t         rt        d       yy# |rt        j                          w w xY w)zW
        L{AsyncioSelectorReactor.callLater()} doesn't leave cyclic references
        i  r   c                       y r   rG   rG   r!   r   rH   zOAsyncioSelectorReactorTests.test_noCycleReferencesInCallLater.<locals>.<lambda>  s    r!   r   N)rQ   r   r   gc	isenableddisablelenget_objectsr   rangera   runUntilCurrentrT   enable)r'   gc_was_enabledobjects_beforetimer_countr   _objects_afters          r   !test_noCycleReferencesInCallLaterz=AsyncioSelectorReactorTests.test_noCycleReferencesInCallLater  s     -!"@"BC


	 !12NK,.G;' 3!!!\23##% 01MOO]^;{JAN		,!$' - 		 s   BC> >DN) __name__
__module____qualname____doc__r(   r   r   r3   r   _defaultEventLoopIsSelectorformattype_defaultEventLoopsysversion_infomajorminorr   getTyper6   r:   r?   !hasWindowsProactorEventLoopPolicyrA   rQ   rC   rJ   rM   rU   rg   rq   r   rG   r!   r   r   r   0   s   )&5 :K " ''	%v"#""""H	 
	:	: ''	%v"#""""H	 
		:		: #	%v"#""""H	 
	
=	
= --/WII --2:	: --8%	% --8:	:,A0(>(r!   r   )#r   rt   r   asyncior   r   r   r   r   r   r	   r
   r   unittestr   twisted.internet.asyncioreactorr   twisted.python.runtimer   twisted.trial.unittestr   reactormixinsr   r   rQ   
startswithr   r   ImportErrorr1   r   
isinstancer   r-   r   rG   r!   r   <module>r      s    
 

 
 
  B + 6 )$) !$) !
	
||w'	

 -1),0)
 +,;;= ():<MN     x(.2E x(  		s   'B& &B.-B.