
    BVhSF                         d Z ddlZddlZddlZddlZddlmZ ddlmZ	 ddl
mZ ddlmZ  ed       G d d	             Z ed
g       G d dej                               Zy)z9Coordinator to help multiple threads stop when requested.    N)errors)
tf_logging)compat)	tf_exportztrain.Coordinatorc                       e Zd ZdZddZd ZddZd Zd Ze	j                  d        Zdd	Zd
 Z	 	 ddZed        Zd Zy)Coordinatora
  A coordinator for threads.

  This class implements a simple mechanism to coordinate the termination of a
  set of threads.

  #### Usage:

  ```python
  # Create a coordinator.
  coord = Coordinator()
  # Start a number of threads, passing the coordinator to each of them.
  ...start thread 1...(coord, ...)
  ...start thread N...(coord, ...)
  # Wait for all the threads to terminate.
  coord.join(threads)
  ```

  Any of the threads can call `coord.request_stop()` to ask for all the threads
  to stop.  To cooperate with the requests, each thread must check for
  `coord.should_stop()` on a regular basis.  `coord.should_stop()` returns
  `True` as soon as `coord.request_stop()` has been called.

  A typical thread running with a coordinator will do something like:

  ```python
  while not coord.should_stop():
    ...do some work...
  ```

  #### Exception handling:

  A thread can report an exception to the coordinator as part of the
  `request_stop()` call.  The exception will be re-raised from the
  `coord.join()` call.

  Thread code:

  ```python
  try:
    while not coord.should_stop():
      ...do some work...
  except Exception as e:
    coord.request_stop(e)
  ```

  Main code:

  ```python
  try:
    ...
    coord = Coordinator()
    # Start a number of threads, passing the coordinator to each of them.
    ...start thread 1...(coord, ...)
    ...start thread N...(coord, ...)
    # Wait for all the threads to terminate.
    coord.join(threads)
  except Exception as e:
    ...exception that was passed to coord.request_stop()
  ```

  To simplify the thread implementation, the Coordinator provides a
  context handler `stop_on_exception()` that automatically requests a stop if
  an exception is raised.  Using the context handler the thread code above
  can be written as:

  ```python
  with coord.stop_on_exception():
    while not coord.should_stop():
      ...do some work...
  ```

  #### Grace period for stopping:

  After a thread has called `coord.request_stop()` the other threads have a
  fixed time to stop, this is called the 'stop grace period' and defaults to 2
  minutes.  If any of the threads is still alive after the grace period expires
  `coord.join()` raises a RuntimeError reporting the laggards.

  ```python
  try:
    ...
    coord = Coordinator()
    # Start a number of threads, passing the coordinator to each of them.
    ...start thread 1...(coord, ...)
    ...start thread N...(coord, ...)
    # Wait for all the threads to terminate, give them 10s grace period
    coord.join(threads, stop_grace_period_secs=10)
  except RuntimeError:
    ...one of the threads took more than 10s to stop after request_stop()
    ...was called.
  except Exception:
    ...exception that was passed to coord.request_stop()
  ```
  Nc                     |t         j                  f}t        |      | _        t	        j
                         | _        t	        j                         | _        d| _	        d| _
        t               | _        y)a%  Create a new Coordinator.

    Args:
      clean_stop_exception_types: Optional tuple of Exception types that should
        cause a clean stop of the coordinator. If an exception of one of these
        types is reported to `request_stop(ex)` the coordinator will behave as
        if `request_stop(None)` was called.  Defaults to
        `(tf.errors.OutOfRangeError,)` which is used by input queues to signal
        the end of input. When feeding training data from a Python iterator it
        is common to add `StopIteration` to this list.
    NF)r   OutOfRangeErrortuple_clean_stop_exception_types	threadingLock_lockEvent_stop_event_exc_info_to_raise_joinedset_registered_threads)selfclean_stop_exception_typess     V/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/training/coordinator.py__init__zCoordinator.__init__|   sa     ")$*$:$:#< ',-G'HD$!DJ (D #DDL
  #uD    c                 f    t        |t              r|d   }n|}t        || j                        rd}|S )a  Check if the exception indicated in 'ex' should be ignored.

    This method examines `ex` to check if it is an exception that should be
    reported to the users.  If yes, it returns `ex` as is, otherwise it returns
    None.

    The code returns None for exception types listed in
    `_clean_stop_exception_types`.

    Args:
      ex: None, an `Exception`, or a Python `exc_info` tuple as returned by
        `sys.exc_info()`.

    Returns:
      ex or None.
       N)
isinstancer   r   )r   exex2s      r   _filter_exceptionzCoordinator._filter_exception   s6    " "eqEcc#t778bIr   c                    | j                   5  | j                  |      }| j                  r4t        |t              r|\  }}}||t        j                         \  }}}|| j                  j                         s	|r| j                  t        |t              r6t        j                  dt        j                  |d         |       || _	        nLt        j                  dt        |      t        j                  |             t        j                         | _	        t        | j                        dk7  s| j                  d   r| j                  d   s	 t!        d| j                  z        | j                  j#                          ddd       y# t         $ r t        j                         | _	        Y Gw xY w# 1 sw Y   yxY w)	a  Request that the threads stop.

    After this is called, calls to `should_stop()` will return `True`.

    Note: If an exception is being passed in, in must be in the context of
    handling the exception (i.e. `try: ... except Exception as ex: ...`) and not
    a newly created one.

    Args:
      ex: Optional `Exception`, or Python `exc_info` tuple as returned by
        `sys.exc_info()`.  If this is the first call to `request_stop()` the
        corresponding exception is recorded and re-raised from `join()`.
    Nz!Error reported to Coordinator: %sr   )exc_infoz%Error reported to Coordinator: %s, %s   r   zHex must be a tuple or sys.exc_info must return the current exception: %s)r   r    r   r   r   sysr"   r   is_setr   logginginfor   
as_str_anytypelen
ValueErrorr   )r   r   _ex_instances       r   request_stopzCoordinator.request_stop   s    
 -!!"%b
 
b%  
![!
^ "lln
![!
$$&$))1E"LL<**2a51"$& ')D#LL@b**2.0 '*llnD# $))*a/))!,))!,	7"++,- - 	[- -N  7 ),d%	7O- -s0   EF4F)F4"F1.F40F11F44F=c                     | j                   5  d| _        d| _        | j                  j	                         r| j                  j                          ddd       y# 1 sw Y   yxY w)zcClears the stop flag.

    After this is called, calls to `should_stop()` will return `False`.
    FN)r   r   r   r   r%   clearr   s    r   
clear_stopzCoordinator.clear_stop   sS    
 
 !dl $d				 	 	" 	! ! !s   AAA"c                 6    | j                   j                         S )zSCheck if stop was requested.

    Returns:
      True if a stop was requested.
    )r   r%   r1   s    r   should_stopzCoordinator.should_stop   s     ""$$r   c              #   l   K   	 d y#  | j                  t        j                                Y yxY ww)a  Context manager to request stop when an Exception is raised.

    Code that uses a coordinator must catch exceptions and pass
    them to the `request_stop()` method to stop the other threads
    managed by the coordinator.

    This context handler simplifies the exception handling.
    Use it as follows:

    ```python
    with coord.stop_on_exception():
      # Any exception raised in the body of the with
      # clause is reported to the coordinator before terminating
      # the execution of the body.
      ...body...
    ```

    This is completely equivalent to the slightly longer code:

    ```python
    try:
      ...body...
    except:
      coord.request_stop(sys.exc_info())
    ```

    Yields:
      nothing.
    N)r   )r.   r$   r"   r1   s    r   stop_on_exceptionzCoordinator.stop_on_exception  s+     >++
3<<>*s   4	 4&14c                 8    | j                   j                  |      S )zWait till the Coordinator is told to stop.

    Args:
      timeout: Float.  Sleep for up to that many seconds waiting for
        should_stop() to become True.

    Returns:
      True if the Coordinator is told stop, False if the timeout expired.
    )r   wait)r   timeouts     r   wait_for_stopzCoordinator.wait_for_stop)  s       ))r   c                 |    | j                   5  | j                  j                  |       ddd       y# 1 sw Y   yxY w)zQRegister a thread to join.

    Args:
      thread: A Python thread to join.
    N)r   r   add)r   threads     r   register_threadzCoordinator.register_thread5  s4     
 +
""6*+ + +s   2;c                    | j                   5  || j                  }n$| j                  j                  t        |            }t	        |      }ddd       t        d |D              r6| j                  d      s%	 t        d |D              r| j                  d      s%d}t        d |D              rR|dk\  rMt        j                  |       ||z  }d|z  }t        t        ||      d      }t        d |D              r|dk\  rM|D cg c]  }|j                         s|j                  ! }}| j                   5  d| _        t               | _        | j                  r| j                  \  }}}||rE|r&t        j                   d	d
j#                  |             nt%        d	d
j#                  |      z        ddd       y# 1 sw Y   sxY wc c}w # 1 sw Y   yxY w)a  Wait for threads to terminate.

    This call blocks until a set of threads have terminated.  The set of thread
    is the union of the threads passed in the `threads` argument and the list
    of threads that registered with the coordinator by calling
    `Coordinator.register_thread()`.

    After the threads stop, if an `exc_info` was passed to `request_stop`, that
    exception is re-raised.

    Grace period handling: When `request_stop()` is called, threads are given
    'stop_grace_period_secs' seconds to terminate.  If any of them is still
    alive after that period expires, a `RuntimeError` is raised.  Note that if
    an `exc_info` was passed to `request_stop()` then it is raised instead of
    that `RuntimeError`.

    Args:
      threads: List of `threading.Threads`. The started threads to join in
        addition to the registered threads.
      stop_grace_period_secs: Number of seconds given to threads to stop after
        `request_stop()` has been called.
      ignore_live_threads: If `False`, raises an error if any of the threads are
        still alive after `stop_grace_period_secs`.

    Raises:
      RuntimeError: If any thread is still alive after `request_stop()`
        is called and the grace period expires.
    Nc              3   <   K   | ]  }|j                           y wNis_alive.0ts     r   	<genexpr>z#Coordinator.join.<locals>.<genexpr>g       ,qajjl,   g      ?gMbP?c              3   <   K   | ]  }|j                           y wrA   rB   rD   s     r   rG   z#Coordinator.join.<locals>.<genexpr>p  rH   rI   g           Tz2Coordinator stopped with threads still running: %s )r   r   unionr   listanyr:   timesleepmaxminrC   namer   r   r&   r'   joinRuntimeError)	r   threadsstop_grace_period_secsignore_live_threadsstop_wait_secsrF   
stragglersr,   r-   s	            r   rU   zCoordinator.join>  s   > 
 	****00W> Wg ,G,
,T5G5G5L
 ,G,
,T5G5G5L N
,G,
,1G31N
jj .>)n 3~/EFNn ,G,
,1G31N #*:QQZZ\!&&:J: 
 $dl!$d		 	  33;
,,Kxx
+- Bhhz"#$ $$ $A : ;$ $s$   ?F=G
G
8A<G=GGc                     | j                   S rA   )r   r1   s    r   joinedzCoordinator.joined  s    <<r   c                     | j                   5  | j                  r| j                  \  }}}|	 ddd       y# 1 sw Y   yxY w)zBIf an exception has been passed to `request_stop`, this raises it.N)r   r   )r   r,   r-   s      r   raise_requested_exceptionz%Coordinator.raise_requested_exception  sD    	 		 	  33; 
!  s   6?rA   )Nx   F)__name__
__module____qualname____doc__r   r    r.   r2   r4   
contextlibcontextmanagerr6   r:   r>   rU   propertyr]   r_    r   r   r   r      sp    ]~%>4;z	!% !+ !+F
*+ 7:$L$\  r   r   ztrain.LooperThread)v1c                   R     e Zd ZdZ	 	 d fd	Zed	d       Zd Zd Zd Z	d Z
 xZS )
LooperThreada}  A thread that runs code repeatedly, optionally on a timer.

  This thread class is intended to be used with a `Coordinator`.  It repeatedly
  runs code specified either as `target` and `args` or by the `run_loop()`
  method.

  Before each run the thread checks if the coordinator has requested stop.  In
  that case the looper thread terminates immediately.

  If the code being run raises an exception, that exception is reported to the
  coordinator and the thread terminates.  The coordinator will then request all
  the other threads it coordinates to stop.

  You typically pass looper threads to the supervisor `Join()` method.
  c                 8   t        |t              st        d|z        t        t        |           d| _        || _        || _        || _	        | j                  r|xs d| _
        |xs i | _        n|s|rt        d      | j                  j                  |        y)a  Create a LooperThread.

    Args:
      coord: A Coordinator.
      timer_interval_secs: Time boundaries at which to call Run(), or None
        if it should be called back to back.
      target: Optional callable object that will be executed in the thread.
      args: Optional arguments to pass to `target` when calling it.
      kwargs: Optional keyword arguments to pass to `target` when calling it.

    Raises:
      ValueError: If one of the arguments is invalid.
    z*'coord' argument must be a Coordinator: %sTrh   z@'args' and 'kwargs' argument require that you also pass 'target'N)r   r   r+   superrk   r   daemon_coord_timer_interval_secs_target_args_kwargsr>   )r   coordtimer_interval_secstargetargskwargs	__class__s         r   r   zLooperThread.__init__  s     e[)CeKLL	,&(DKDK 3DDL||:2dj\rdl	 ' ( (KK%r   c                 F    t        | ||||      }|j                          |S )ax  Start a LooperThread that calls a function periodically.

    If `timer_interval_secs` is None the thread calls `target(args)`
    repeatedly.  Otherwise `target(args)` is called every `timer_interval_secs`
    seconds.  The thread terminates when a stop of the coordinator is
    requested.

    Args:
      coord: A Coordinator.
      timer_interval_secs: Number. Time boundaries at which to call `target`.
      target: A callable object.
      args: Optional arguments to pass to `target` when calling it.
      kwargs: Optional keyword arguments to pass to `target` when calling it.

    Returns:
      The started thread.
    )rv   rw   rx   )rk   start)rt   ru   rv   rw   rx   loopers         r   loopzLooperThread.loop  s(    & %!4V$!')F
LLNMr   c                 n   | j                   j                         5  | j                          | j                  F| j                   j	                         s| j                          | j                   j	                         s+nt        j                         }| j                   j                  |t        j                         z
        sP|| j                  z  }| j                          | j                   j                  |t        j                         z
        sP| j                          d d d        y # 1 sw Y   y xY wrA   )	ro   r6   
start_looprp   r4   run_looprP   r:   	stop_loop)r   next_timer_times     r   runzLooperThread.run  s    		&	&	( 
oo		"	"	*++))+
--/ ++))+ ))+++++Odiik,IJ
T66
6/
--/ ++++Odiik,IJ nn  s   A!D+=BD+D++D4c                      y)zCalled when the thread starts.Nrh   r1   s    r   r   zLooperThread.start_loop      r   c                      y)zCalled when the thread stops.Nrh   r1   s    r   r   zLooperThread.stop_loop  r   r   c                 j    | j                   r' | j                   | j                  i | j                   yy)z+Called at 'timer_interval_secs' boundaries.N)rq   rr   rs   r1   s    r   r   zLooperThread.run_loop  s)    ||dllDJJ/$,,/ r   )NNN)NN)ra   rb   rc   rd   r   staticmethodr}   r   r   r   r   __classcell__)ry   s   @r   rk   rk     s?      DH&<  .		0r   rk   )rd   re   r$   r   rP   tensorflow.python.frameworkr   tensorflow.python.platformr   r&   tensorflow.python.utilr    tensorflow.python.util.tf_exportr   r   Threadrk   rh   r   r   <module>r      ss    @  
   . < ) 6 y y  yz #$%a09## a0 &a0r   