
    Vh                         d Z ddlZddlZddlZ	 ddlZddlmZ ddl	m
Z
 ddlmZ  G d dej                        Z G d	 d
ej                        Z G d dej                        Zy# e$ r dZY Uw xY w)z&
Tests for L{twisted.internet.fdesc}.
    N)fdescznot supported on this platform)untilConcludes)unittestc                       e Zd ZdZd Zd Zy)NonBlockingTestszE
    Tests for L{fdesc.setNonBlocking} and L{fdesc.setBlocking}.
    c                    t        j                         \  }}| j                  t         j                  |       | j                  t         j                  |       | j	                  t        j
                  |t
        j                        t         j                  z         t        j                  |       | j                  t        j
                  |t
        j                        t         j                  z         y)zR
        L{fdesc.setNonBlocking} sets a file description to non-blocking.
        N)ospipe
addCleanupcloseassertFalsefcntlF_GETFL
O_NONBLOCKr   setNonBlocking
assertTrueselfrws      G/home/dcms/DCMS/lib/python3.12/site-packages/twisted/test/test_fdesc.pytest_setNonBlockingz$NonBlockingTests.test_setNonBlocking   s     wwy1!$!$Q6FGQAu}}5EF    c                    t        j                         \  }}| j                  t         j                  |       | j                  t         j                  |       t	        j
                  |       t	        j                  |       | j                  t        j                  |t        j                        t         j                  z         y)zK
        L{fdesc.setBlocking} sets a file description to blocking.
        N)r	   r
   r   r   r   r   setBlockingr   r   r   r   r   s      r   test_setBlockingz!NonBlockingTests.test_setBlocking'   sy     wwy1!$!$Q!Q6FGr   N)__name__
__module____qualname____doc__r   r    r   r   r   r      s    	G	Hr   r   c                   X    e Zd ZdZd Zd Zd Zd Zd Zd Z	d Z
d	 Zd
 Zd Zd Zd Zy)ReadWriteTestsz<
    Tests for L{fdesc.readFromFD}, L{fdesc.writeToFD}.
    c                     t        j                         \  | _        | _        t	        j
                  | j                         t	        j
                  | j                         y)zG
        Create a non-blocking pipe that can be used in tests.
        N)r	   r
   r   r   r   r   r   s    r   setUpzReadWriteTests.setUp8   s;     TVV$TVV$r   c                     	 t        j                  | j                         	 t        j                  | j                         y# t        $ r Y ,w xY w# t        $ r Y yw xY w)z
        Close pipes.
        N)r	   r   r   OSErrorr   r%   s    r   tearDownzReadWriteTests.tearDown@   sS    	HHTVV	HHTVV  		  		s"   A A 	AA	AAc                 B    t        j                  | j                  |      S )z)
        Write data to the pipe.
        )r   	writeToFDr   )r   ds     r   writezReadWriteTests.writeM   s     tvvq))r   c                 r    g }t        j                  | j                  |j                        }||r|d   S y|S )z*
        Read data from the pipe.
        r   r   )r   
readFromFDr   append)r   lress      r   readzReadWriteTests.readS   s<     tvvqxx0;tJr   c                     | j                  d      }| j                  |dkD         | j                         }| j                  t	        |      |       | j                  dd| |       y)z
        Test that the number of bytes L{fdesc.writeToFD} reports as written
        with its return value are seen by L{fdesc.readFromFD}.
        s   hellor   N)r-   r   r3   assertEquallen)r   nss      r   test_writeAndReadz ReadWriteTests.test_writeAndReada   sW    
 JJx AIIKQ#"1q)r   c                 |   ddz  }| j                  |      }| j                  |dkD         g }d}d}||k  s|dk  rA|j                  | j                                |t	        |d         z  }|dz  }||k  r;|dk  rAdj                  |      }| j                  t	        |      |       | j                  |d| |       y)	z}
        Similar to L{test_writeAndRead}, but use a much larger string to verify
        the behavior for that case.
        s
   0123456879i'  r   2      r   N)r-   r   r0   r3   r6   joinr5   )r   origwrittenresultresultlengthis         r   test_writeAndReadLargez%ReadWriteTests.test_writeAndReadLargel   s    
 u$**T"!$W$BMM$))+&Cr
O+LFA	 W$B
 &!Vg.hw0r   c                     g }t        j                  | j                  |j                        }| j	                  |g        | j                  |       y)z
        Verify that reading from a file descriptor with no data does not raise
        an exception and does not result in the callback function being called.
        N)r   r/   r   r0   r5   assertIsNone)r   r1   rA   s      r   test_readFromEmptyz!ReadWriteTests.test_readFromEmpty   sA    
 !!$&&!((3B&!r   c                     t        j                  | j                         | j                  | j	                         t
        j                         y)z
        Test that using L{fdesc.readFromFD} on a cleanly closed file descriptor
        returns a connection done indicator.
        N)r	   r   r   r5   r3   r   CONNECTION_DONEr%   s    r   test_readFromCleanClosez&ReadWriteTests.test_readFromCleanClose   0    
 	e&;&;<r   c                     t        j                  | j                         | j                  | j	                  d      t
        j                         y)z
        Verify that writing with L{fdesc.writeToFD} when the read end is closed
        results in a connection lost indicator.
           sN)r	   r   r   r5   r-   r   CONNECTION_LOSTr%   s    r   test_writeToClosedz!ReadWriteTests.test_writeToClosed   3    
 	D)5+@+@Ar   c                     t        j                  | j                         | j                  | j	                         t
        j                         y)z
        Verify that reading with L{fdesc.readFromFD} when the read end is
        closed results in a connection lost indicator.
        N)r	   r   r   r5   r3   r   rN   r%   s    r   test_readFromInvalidz#ReadWriteTests.test_readFromInvalid   rK   r   c                     t        j                  | j                         | j                  | j	                  d      t
        j                         y)z
        Verify that writing with L{fdesc.writeToFD} when the write end is
        closed results in a connection lost indicator.
        rM   N)r	   r   r   r5   r-   r   rN   r%   s    r   test_writeToInvalidz"ReadWriteTests.test_writeToInvalid   rP   r   c                 P   t         j                  }d }|t         _        	 | j                  | j                  d      d       |t         _        d }|t         _        	 | j                  | j                  d      d       |t         _        y# |t         _        w xY w# |t         _        w xY w)z8
        Test error path for L{fdesc.writeTod}.
        c                 D    t               }t        j                  |_        |N)r(   errnoEAGAINfddataerrs      r   eagainWritez4ReadWriteTests.test_writeErrors.<locals>.eagainWrite   s    )CCIIr   rM   r   c                 D    t               }t        j                  |_        |rW   )r(   rX   EINTRrZ   s      r   
eintrWritez3ReadWriteTests.test_writeErrors.<locals>.eintrWrite   s    )CCIIr   N)r	   r-   r5   )r   
oldOsWriter^   ra   s       r   test_writeErrorszReadWriteTests.test_writeErrors   s     XX
	
 	"TZZ-q1!BH	
 	"TZZ-q1!BH "BH "BHs   !B !B BB%N)r   r   r   r    r&   r)   r-   r3   r9   rD   rG   rJ   rO   rR   rT   rc   r!   r   r   r#   r#   3   sE    %*	*1("=B=B"r   r#   c                   &    e Zd ZdZdZd Zd Zd Zy)CloseOnExecTestszL
    Tests for L{fdesc._setCloseOnExec} and L{fdesc._unsetCloseOnExec}.
    z
import os, errno
try:
    os.write(%d, b'lul')
except OSError as e:
    if e.errno == errno.EBADF:
        os._exit(0)
    os._exit(5)
except BaseException:
    os._exit(10)
else:
    os._exit(20)
c                    t        j                         }|dk(  rS	 t        j                  t        j                  t        j                  d| j
                  |j                         fz  g       y t        t         j                  |d      d   S # t        $ r, dd l}|j                          t        j                  d       Y y w xY w)Nr   z-c   r=   )r	   forkexecvsys
executableprogramfilenoBaseException	traceback	print_exc_exitr   waitpid)r   fObjpidro   s       r   _execWithFileDescriptorz(CloseOnExecTests._execWithFileDescriptor   s    ggi!8	NN^^T4<<4;;=:J+JK ""**c15a88 !  ##%	s   AB 2C ?C c                 \   t        | j                         d      5 }t        j                  |j	                                | j                  |      }| j                  t        j                  |             | j                  t        j                  |      d       ddd       y# 1 sw Y   yxY w)z
        A file descriptor passed to L{fdesc._setCloseOnExec} is not inherited
        by a new process image created with one of the exec family of
        functions.
        wbr   N)openmktempr   _setCloseOnExecrm   ru   r   r	   	WIFEXITEDr5   WEXITSTATUSr   rs   statuss      r   test_setCloseOnExecz$CloseOnExecTests.test_setCloseOnExec   s}     $++-& 	8$!!$++-011$7FOOBLL01R^^F3Q7		8 	8 	8s   A>B""B+c                    t        | j                         d      5 }t        j                  |j	                                t        j
                  |j	                                | j                  |      }| j                  t        j                  |             | j                  t        j                  |      d       ddd       y# 1 sw Y   yxY w)z
        A file descriptor passed to L{fdesc._unsetCloseOnExec} is inherited by
        a new process image created with one of the exec family of functions.
        rw      N)rx   ry   r   rz   rm   _unsetCloseOnExecru   r   r	   r{   r5   r|   r}   s      r   test_unsetCloseOnExecz&CloseOnExecTests.test_unsetCloseOnExec   s    
 $++-& 	9$!!$++-0##DKKM211$7FOOBLL01R^^F3R8	9 	9 	9s   B!CCN)r   r   r   r    rl   ru   r   r   r!   r   r   re   re      s    G9$
8
9r   re   )r    rX   r	   rj   r   twisted.internetr   ImportErrorskiptwisted.python.utilr   twisted.trialr   SynchronousTestCaser   r#   re   r!   r   r   <module>r      sz     	 
' ' . "Hx33 H8Q"X11 Q"h;9x33 ;9s  ,+D,s   A& &A0/A0