
    Vh                         d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 	 ddlmZ  G d	 d
      Z ee d       G d de
             Zy# e$ r dZY ,w xY w)z-
Tests for L{twisted.internet.epollreactor}.
    )skipIfConnectionDone)_ContinuousPolling)Clock)TestCase)epollreactorNc                   .    e Zd ZdZd Zd Zd Zd Zd Zy)
DescriptorzF
    Records reads and writes, as if it were a C{FileDescriptor}.
    c                     g | _         y N)eventsselfs    W/home/dcms/DCMS/lib/python3.12/site-packages/twisted/internet/test/test_epollreactor.py__init__zDescriptor.__init__   s	        c                      y)N    r   s    r   filenozDescriptor.fileno   s    r   c                 :    | j                   j                  d       y )Nreadr   appendr   s    r   doReadzDescriptor.doRead    s    6"r   c                 :    | j                   j                  d       y )Nwriter   r   s    r   doWritezDescriptor.doWrite#   s    7#r   c                 d    |j                  t               | j                  j                  d       y )Nlost)trapr   r   r   )r   reasons     r   connectionLostzDescriptor.connectionLost&   s     N#6"r   N)	__name__
__module____qualname____doc__r   r   r   r   r$   r   r   r   r   r      s     #$#r   r   z(epoll not supported in this environment.c                   ^    e Zd ZdZd Zd Zd Zd Zd Zd Z	d Z
d	 Zd
 Zd Zd Zd Zd Zy)ContinuousPollingTestsza
    L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
    objects.
    c                    t        t                     }| j                  |j                         t	               }| j                  |j                  |             |j                  |       | j                  |j                         | j                  |j                  j                         | j                  |j                  j                  |j                         | j                  |j                  |             y)zi
        Adding a reader when there was previously no reader starts up a
        C{LoopingCall}.
        N)r   r   assertIsNone_loopobjectassertFalse	isReading	addReaderassertIsNotNone
assertTruerunningassertIsclock_reactorr   pollerreaders      r   test_addReaderz%ContinuousPollingTests.test_addReader2       
 $EG,&,,'))&12 V\\*,,-fll((&//:((01r   c                    t        t                     }| j                  |j                         t	               }| j                  |j                  |             |j                  |       | j                  |j                         | j                  |j                  j                         | j                  |j                  j                  |j                         | j                  |j                  |             y)zi
        Adding a writer when there was previously no writer starts up a
        C{LoopingCall}.
        N)r   r   r,   r-   r.   r/   	isWriting	addWriterr2   r3   r4   r5   r6   r7   r   r9   writers      r   test_addWriterz%ContinuousPollingTests.test_addWriterA   r<   r   c                 L   t        t                     }t               }|j                  |       |j	                  |       | j                  |j                         | j                  |j                  j                         g        | j                  |j                  |             y)z=
        Removing a reader stops the C{LoopingCall}.
        N)r   r   r.   r1   removeReaderr,   r-   assertEqualr7   getDelayedCallsr/   r0   r8   s      r   test_removeReaderz(ContinuousPollingTests.test_removeReaderP   {     $EG, F#&,,'88:B?))&12r   c                 L   t        t                     }t               }|j                  |       |j	                  |       | j                  |j                         | j                  |j                  j                         g        | j                  |j                  |             y)z=
        Removing a writer stops the C{LoopingCall}.
        N)r   r   r.   r?   removeWriterr,   r-   rE   r7   rF   r/   r>   r@   s      r   test_removeWriterz(ContinuousPollingTests.test_removeWriter\   rH   r   c                     t        t                     }|j                  t                      |j	                  t                      y)zM
        Removing unknown readers and writers silently does nothing.
        N)r   r   rJ   r.   rD   )r   r9   s     r   test_removeUnknownz)ContinuousPollingTests.test_removeUnknownh   s0     $EG,FH%FH%r   c                    t        t                     }t               }|j                  |       | j	                  |j
                         |j                  t                      | j	                  |j
                         |j                  t                      | j	                  |j
                         |j                  t                      |j                  |       | j	                  |j
                         | j                  |j
                  j                         | j                  t        |j                  j                               d       y)za
        Adding multiple readers and writers results in a single
        C{LoopingCall}.
        r   N)r   r   r.   r?   r2   r-   r1   rJ   r3   r4   rE   lenr7   rF   r@   s      r   test_multipleReadersAndWritersz5ContinuousPollingTests.test_multipleReadersAndWritersp   s    
 $EG, V\\*"V\\*"V\\*"F#V\\*,,-V__<<>?Cr   c                    t               }t        |      }t               }|j                  |       | j	                  |j
                  g        |j                  d       | j	                  |j
                  dg       |j                  d       | j	                  |j
                  ddg       |j                  d       | j	                  |j
                  g d       y)za
        Adding a reader causes its C{doRead} to be called every 1
        milliseconds.
        gh㈵>r   )r   r   r   N)r   r   r   r1   rE   r   advancer   reactorr9   descs       r   test_readerPollingz)ContinuousPollingTests.test_readerPolling   s    
 '#G,|b) vh/ vv&67 &>?r   c                    t               }t        |      }t               }|j                  |       | j	                  |j
                  g        |j                  d       | j	                  |j
                  dg       |j                  d       | j	                  |j
                  ddg       |j                  d       | j	                  |j
                  g d       y)zb
        Adding a writer causes its C{doWrite} to be called every 1
        milliseconds.
        MbP?r   )r   r   r   N)r   r   r   r?   rE   r   rR   rS   s       r   test_writerPollingz)ContinuousPollingTests.test_writerPolling   s    
 '#G,|b)wi0w&89&ABr   c                    t               }t        |      }t               }d |_        |j	                  |       | j                  |j                  g        |j                  d       | j                  |j                  dg       y)zu
        If a C{doRead} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        c                      t               S r   r   r   r   r   <lambda>zBContinuousPollingTests.test_connectionLostOnRead.<locals>.<lambda>   s	    n. r   rX   r!   N)r   r   r   r   r1   rE   r   rR   rS   s       r   test_connectionLostOnReadz0ContinuousPollingTests.test_connectionLostOnRead   sg    
 '#G,|.b)vh/r   c                    t               }t        |      }t               }d |_        |j	                  |       | j                  |j                  g        |j                  d       | j                  |j                  dg       y)zv
        If a C{doWrite} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        c                      t               S r   r   r   r   r   r\   zCContinuousPollingTests.test_connectionLostOnWrite.<locals>.<lambda>   s	    ~/ r   rX   r!   N)r   r   r   r   r?   rE   r   rR   rS   s       r   test_connectionLostOnWritez1ContinuousPollingTests.test_connectionLostOnWrite   sg    
 '#G,|/b)vh/r   c                     t        t                     }t               }t               }t               }|j                  |       |j                  |       |j	                  |       |j	                  |       |j                         }| j                  |j                         g        | j                  |j                         g        | j                  t        |      d       | j                  t        |      |||h       y)zv
        L{_ContinuousPolling.removeAll} removes all descriptors and returns
        the readers and writers.
           N)r   r   r.   r1   r?   	removeAllrE   
getReaders
getWritersrO   set)r   r9   r:   rA   bothremoveds         r   test_removeAllz%ContinuousPollingTests.test_removeAll   s    
 $EG,x  ""$**,b1**,b1Wq)W'=>r   c                     t        t                     }t               }|j                  |       | j	                  ||j                                y)zb
        L{_ContinuousPolling.getReaders} returns a list of the read
        descriptors.
        N)r   r   r.   r1   assertInrd   r8   s      r   test_getReadersz&ContinuousPollingTests.test_getReaders   <    
 $EG, ff//12r   c                     t        t                     }t               }|j                  |       | j	                  ||j                                y)zc
        L{_ContinuousPolling.getWriters} returns a list of the write
        descriptors.
        N)r   r   r.   r?   rk   re   r@   s      r   test_getWritersz&ContinuousPollingTests.test_getWriters   rm   r   N)r%   r&   r'   r(   r;   rB   rG   rK   rM   rP   rV   rY   r]   r`   ri   rl   ro   r   r   r   r*   r*   +   sK    
22
3
3&D&@"C"00?&33r   r*   )r(   unittestr   twisted.internet.errorr   twisted.internet.posixbaser   twisted.internet.taskr   twisted.trial.unittestr   twisted.internetr	   ImportErrorr   r*   r   r   r   <module>rw      sm     1 9 ' +-
# #, LDEz3X z3 Fz37  Ls   A AA