
    AVhZ                        d Z ddlZddlZddlZddlmZ ddlmZ ddl	m
Z ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  d Z!ddZ" ejF                         Z$ejJ                  d d       Z&ejJ                  d        Z'd Z( G d de)      Z*d Z+d Z, G d dejZ                        Z. G d dej^                        Z0y)!z;Class MirroredStrategy implementing tf.distribute.Strategy.    N)
pywrap_tfe)ag_ctx)api)distribute_lib)distribute_utils)shared_variable_creator)context)def_functiondevice)ops)summary_ops_v2)variable_scope)
tf_logging)coordinator)traceback_utilsc                 Z    t         j                  j                  |       j                  dk(  S )NGPU)	tf_device
DeviceSpecfrom_stringdevice_typer   s    Y/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/mirrored_run.py_is_gpu_devicer   &   s$    				)	)&	1	=	=	FF    c                     |d}|i }t        t        j                        rj                  rDt	         j
                  j                  D cg c]  }t        |       c}      rt         ||      S  t        vrt        j                         t         <   t            j                        }|$ fd}j                  |      }|t            <    ||i |S t        j                         r=t!        j"                  t         j$                  d j&                  j(                  z  d       n(t+        j,                  t/        j0                               t         ||      S c c}w )a  Call `fn` on each worker devices(replica).

  It's highly recommended to wrap the call to this function inside a
  `tf.function`, otherwise the performance is poor.

  Args:
    strategy: `tf.distribute.Strategy`.
    fn: function to call on each worker devices.
    args: positional arguments to `fn`.
    kwargs: keyword arguments to `fn`.

  Returns:
    Wrapped returned value of `fn` from all replicas.
   c                  4    t        j                  | |      S N)call_for_each_replicapython_function)argskwargsfnstrategys     r   
wrapped_fnz)call_for_each_replica.<locals>.wrapped_fnN   s    $Xr/A/A4PPr   )r!   zUsing %s eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.   )
isinstancer
   Function_jit_compileallextendedworker_devicesr   _call_for_each_replica_cfer_fn_cacheweakrefWeakKeyDictionaryget_cloner	   executing_eagerlylogginglog_first_nWARN	__class____name__	autograph
tf_convertautograph_ctxcontrol_status_ctx)r%   r$   r"   r#   dwrappedr&   s   ``     r   r    r    *   sU    
\D^FL))* 
3$,$5$5$D$DEq	EG#Hb$??~%!(!:!:!<nXX&**2.G
Q 		$  &g%,nXr"D#F##   
 #+"4"4"="=>
 @AB 
		b-"B"B"D	EB	"dF	;;G 	Fs   E%c              #   ,  K   |rD| j                         5  t        j                         5  ||| _        d ddd       ddd       y| j                         5  ||| _        d ddd       y# 1 sw Y   9xY w# 1 sw Y   yxY w# 1 sw Y   yxY ww)z;Context manager for selecting a graph and maybe eager mode.N)
as_defaultr	   
eager_mode_variable_creator_stack)geagercreator_stacks      r   _enter_graphrG   m   s      	
 ++- 		"$1!  
 
 		"$1!    
 sJ   BA<A0A< BB'	B0A9	5A<<BBBBc              #   t   K   | r"t        j                         5  d  d d d        y d  y # 1 sw Y   y xY wwr   )r	   rB   )rE   s    r   _maybe_enter_eager_moderI   |   s6     
				   
 s   8,858c                     t         j                  j                  |       }|j                  dd      }|j	                         S )NCPUr   )r   device_index)r   r   r   replace	to_string)r   
cpu_devices     r   _cpu_devicerP      s<    ##//7*!!e!!D*				r   c                       e Zd Zy)_RequestedStopN)r9   
__module____qualname__r   r   r   rR   rR      s    r   rR   c                  v    t        j                         rt         j                  h} | S t         j                  h} | S r   )r   is_traceback_filtering_enabledenable_traceback_filteringdisable_traceback_filtering)thread_local_callabless    r   (_get_thread_local_configuration_callablerZ      s<    335-HHI 
  .IIJ	r   c                 
   d}t        j                         s"t        j                         j	                          t        j                  t        f      }i }| j                  j                  }t               }g }	t        t        |            D ]s  }
t        j                  ||
      }t        | ||
|||t         j"                  t!        j$                  |
|      t!        j$                  |
|      |
      }|	j'                  |       u |	D ]  }|j)                           	 |j+                         5  d}|so|j-                         s^g }|r|	D ]  }|j.                  j1                           |	D ]  }|j2                  j5                          |j2                  j7                          |j-                         r< ddd       |	D ]  }|j.                  j1                           |j9                  |	       y|j'                  |j:                          n|	D ]  }|j.                  j1                          |j2                  j5                          |j2                  j7                          |j-                         r< ddd       |	D ]  }|j.                  j1                           |j9                  |	       y|j'                  |j:                          |j-                         r<	 ddd       |	D ]  }|j.                  j1                           |j9                  |	       yt=        |      }|slt?        |      rtA        d      t!        jB                  tE        d |	D                    }t!        jB                  tE        d |	D                    }|	d   jF                  }|	d   jH                  }t1               }|	D ]  }|jK                  |jL                          t        jN                  |      5  t        jP                  |      5  tS        jR                  |      5  tU        |	d   jV                        5   |	d   jX                  | g|i |}ddd       ddd       ddd       ddd       t[        |	      D ]   \  }}t!        j$                  |      |_.        " |s|j-                         s^ddd       |	D ]  }|j.                  j1                           |j9                  |	       t!        jB                  tE        d |	D                    S # 1 sw Y   xY w# 1 sw Y   xY w# 1 sw Y   xY w# 1 sw Y   xY w# 1 sw Y   xY w# |	D ]  }|j.                  j1                           |j9                  |	       w xY w)	a  Run `fn` in separate threads, once per replica/worker device.

  Args:
    distribution: the DistributionStrategy object.
    fn: function to run (will be run once per replica, each in its own thread).
    args: positional arguments for `fn`
    kwargs: keyword arguments for `fn`.

  Returns:
    Merged return value of `fn` across all replicas.

  Raises:
    RuntimeError: If fn() calls get_replica_context().merge_call() a different
        number of times from the available devices.
  F)clean_stop_exception_typesNzNSome replicas made a different number of replica_context().merge_call() calls.c              3   4   K   | ]  }|j                     y wr   )
merge_args.0ts     r   	<genexpr>z)_call_for_each_replica.<locals>.<genexpr>   s     2QALL2   c              3   4   K   | ]  }|j                     y wr   )merge_kwargsr_   s     r   rb   z)_call_for_each_replica.<locals>.<genexpr>   s     4qANN4rc   r   c              3   4   K   | ]  }|j                     y wr   )main_resultr_   s     r   rb   z)_call_for_each_replica.<locals>.<genexpr>  s     'G!'Grc   )/r	   r4   r   get_default_graphswitch_to_thread_localr   CoordinatorrR   r,   r-   rZ   rangelenr   make_fn_MirroredReplicaThreadr   caching_scope_localselect_replicaappendstartstop_on_exceptionshould_stop
should_runset
has_pausedwaitclearjoindoner+   anyRuntimeErrorregrouptuplecaptured_name_scopecaptured_var_scopeupdatecaptured_control_deps
name_scopecontrol_dependenciesr   rI   merge_call_entered_in_eagermerge_fn	enumeratemerge_result)distributionr$   r"   r#   run_concurrentlycoordshared_variable_storedevicesrY   threadsindexvariable_creator_fnra   all_doner{   r^   re   mtt_captured_name_scopemtt_captured_var_scopemtt_captured_control_depsr   rs                         r   r.   r.      s   $ 		"	"	$224

!
!n=N
O%!!00'CE 'S\" 	e199u&|UE72B/CC/>>udK/>>ufM5	7A NN1	  aGGIK		 	 	" FNh!2!2!4 aLL  aLLLL   "FNP  ll	JJw} KK    aLLLLLL   "'FNP  ll	JJwm KK  -FNP  ll	JJwg t9Y  G H H (//2'224*)114G446, %,AJ$B$B
!#*1:#@#@
 &)e
# Fa%,,Q-D-DEF( ~~%' ?(+(@(@+)-?.<.K.K,/.? 0G!!*@@0B?
 /71:..| ?j ?1=?L? ? ? ?  ( Nda-<<QMANNG !2!2!4FNP  ll	JJw		!	!%'Gw'G"G	HH? ? ? ? ? ? ? ?}FN FNP  ll	JJws   T: *BT.0T: +BT.-T: (.T.T: CT.T"4T
T
	#S>=T
	TT"AT.T: >TT
	
TTTT""T+	'T..T73T: :4U.c                   J     e Zd ZdZ	 d	 fd	Zd Zd Zd Zd Zd Z	d Z
 xZS )
rn   z,A thread that runs() a function on a device.c                    t         t        |           || _        || _        || _        || _        |j                  j                  |      | _	        || _
        || _        || _        |	| _        d | _        d| _        d | _        d | _        d | _        d | _        d | _        d | _        	 |j,                  | _        |j0                  | _        t7        j8                         | _        t7        j8                         | _        t?        j@                          t?        j>                         }|jC                         | _"        | jG                          | jI                          tK        jL                  |jN                        | _(        tS        jT                         | _+        tS        jX                         5  t?        jB                         | _-        tS        jT                         | _.        d d d        | jV                  j^                  d d  | _/        ta        jb                         | _2        | jV                  jg                         | _4        | jh                  r| xjh                  dz  c_4        | j                  dkD  r5| jh                  sd| _4        | xjh                  d| j                  z  z  c_4        |
| _5        y # t4        $ r d | _        d | _        Y w xY w# 1 sw Y   xY w)NF/r    zreplica_%d/)6superrn   __init__r   r   r   
replica_idr,   _get_replica_id_in_sync_groupreplica_id_in_sync_groupr   main_fn	main_argsmain_kwargsrg   r{   r   r^   re   r   r   r   new_cache_scope_countcaching_scope_enteredcache_scope_exited_countcaching_scope_exitedAttributeError	threadingEventru   rw   r	   ensure_initializedr4   in_eager!record_thread_local_summary_state'record_thread_local_eager_context_stater   #TFE_ContextGetDevicePlacementPolicy_context_handlecontext_device_policyr   rh   graph
init_scope_init_in_eager_init_graphrC   r   get_variable_scope
_var_scopeget_name_scope_name_scope_thread_local_callables)selfdistr   r   r   r   r$   caching_scoper"   r#   rY   ctxr8   s               r   r   z_MirroredReplicaThread.__init__$  sS   	
 $02DJDDL DO33J? 	!  3DDLDNDDDI DMDODD#D"D'#0#F#Fd "/"H"Hd  oo'DOoo'DO  
//
C))+DM**,00266	! 	 &&(DJ		 1#557d..0d1 $(::#E#Ea#HD $779DOzz002D
#
-$//99#9D O  '#'d "&d'21 1s   %"J% '3K%K ?K Kc                    | j                   j                          | j                   j                          	 | j                  j	                         r	 | j
                  j                          y | j                          | j                          | j                          | j                  J| j                  >| j                  t        j                  _        | j                  t        j                  _        | j                  j!                         5  t#        | j$                  | j&                        5  t#        | j(                  | j*                  | j,                        5  t/        j0                  | j2                        5  t5        | j6                  | j8                        5  t;        j<                  | j>                  | j@                           5  t;        jB                  | jD                        5  tG        jF                  | jH                  | j@                  dkD        5  tG        jJ                  | jL                        5   | jN                  | jP                  i | jR                  | _*        d| _+        d d d        d d d        d d d        d d d        d d d        d d d        d d d        d d d        d d d        | j
                  j                          y # 1 sw Y   dxY w# 1 sw Y   hxY w# 1 sw Y   lxY w# 1 sw Y   pxY w# 1 sw Y   txY w# 1 sw Y   xxY w# 1 sw Y   |xY w# 1 sw Y   xY w# 1 sw Y   xY w# | j
                  j                          w xY w)Nr   )reuseT),ru   rx   ry   r   rt   rw   rv   "restore_thread_local_summary_staterestore_thread_local_callable(restore_thread_local_eager_context_stater   r   r   ro   r   r   rs   rG   r   r   r   r   rC   r	   device_policyr   _MirroredReplicaContextr   r   r   r   r   r   r   r   r   r   variable_creator_scoper   r   r   r   rg   r{   r   s    r   runz_MirroredReplicaThread.runi  s   OOOO				!0 oo/ --/
((*
335

$
$
0

#
#
/EIE_E_,,BHLHaHa,,E::'') 
t'')<)<
=
tzz4==335 

 : :
;	
 "$"3"3"&"?"?A **T\\$//2
3 ..))
* 
'
'ooT__q%8: 
/
/0H0H
I (4<<L4;K;KL	         oo                  oos   L8 ,B L8 !L,-,L  L9!L	-K< K0'/K$ K63K)K1K$9K0K<	L	LL !L,)L8 KKK!K$$K-)K00K95K<<LL	LLLL  L)	%L,,L51L8 8Mc                     t         j                  }|j                  | _        |j                  | _        |j                  | _        |j                  | _	        y)z.Record the thread local summary state in self.N)
r   _summary_statestep_summary_stepwriter_summary_writeris_recording_summary_recording"is_recording_distribution_strategy(_summary_recording_distribution_strategyr   summary_states     r   r   z8_MirroredReplicaThread.record_thread_local_summary_state  sJ     #11M&++D(//D+88D88 	1r   c                     t         j                  }| j                  |_        | j                  |_        | j                  |_        | j                  |_	        y)z-Restore thread local summary state from self.N)
r   r   r   r   r   r   r   r   r   r   r   s     r   r   z9_MirroredReplicaThread.restore_thread_local_summary_state  sJ     #11M++M//M!%!8!8M55 4r   c                 f    t        j                          }|j                  }|j                  | _        y r   )r	   _thread_local_dataop_callbacks_eager_context_op_callbacksr   r   eager_context_states      r   r   z>_MirroredReplicaThread.record_thread_local_eager_context_state  s(    
//
C00':'G'GD$r   c                 f    t        j                          }|j                  }| j                  |_        y r   )r	   r   r   r   r   s      r   r   z?_MirroredReplicaThread.restore_thread_local_eager_context_state  s)    
//
C00'+'G'G$r   c                 N    | j                   r| j                   D ]	  } |         y y r   )r   )r   r$   s     r   r   z4_MirroredReplicaThread.restore_thread_local_callable  s*    ##,, "
 $r   r   )r9   rS   rT   __doc__r   r   r   r   r   r   r   __classcell__)r8   s   @r   rn   rn   !  s4    4 DHC:J>:7HHr   rn   c                   &    e Zd ZdZd Zed        Zy)r   z(ReplicaContext for synchronized replica.c                    t        j                         }t        |t              sJ ||_        ||_        ||_        |j                  j                         |_	        |j                  r|xj                  dz  c_	        t        j                         |_        |j                  j                         |_        t        j                         j!                         |_        t%        j&                         |j                  k7  rt)        d      |j*                  j-                          |j.                  j1                          |j.                  j3                          |j4                  j7                         r
t9               d|_        |j:                  S )a  `merge_call()` implementation for synchronized replica.

    This pauses the current replica thread and passes `fn` and its arguments to
    the main thread. The main thread will wait until all replicas pause, then
    invoke `fn` with grouped arguments. The current replica thread will continue
    after `fn` completes.

    See `_call_for_each_replica` for the logic in the main thread.

    Args:
      fn: a function that is called in cross replica context with grouped
        arguments from each replica. `fn` should returns grouped values.
      args: positional arguments to `fn`.
      kwargs: keyward arguments to `fn`.

    Returns:
      Return value of `fn` for the current replica.

    Raises:
      RuntimeError: when merge_call happens in a different graph, e.g. in a
        different tf.function, which is not supported now.
      _RequestedStop: when stop is requested.

    r   aI  `merge_call` called while defining a new graph or a tf.function. This can often happen if the function `fn` passed to `strategy.run()` contains a nested `@tf.function`, and the nested `@tf.function` contains a synchronization point, such as aggregating gradients (e.g, optimizer.apply_gradients), or if the function `fn` uses a control flow statement which contains a synchronization point in the body. Such behaviors are not yet supported. Instead, please avoid nested `tf.function`s or control flow statements that may potentially cross a synchronization boundary, for example, wrap the `fn` passed to `strategy.run` or the entire `strategy.run` inside a `tf.function` or move the control flow out of `fn`. If you are subclassing a `tf.keras.Model`, please avoid decorating overridden methods `test_step` and `train_step` in `tf.function`.N)r   current_threadr(   rn   r   r^   re   r   r   r   r   r   r   _current_control_dependenciesr   r	   r4   r   r   rh   r}   rw   rv   ru   rx   ry   r   rt   rR   r   )r   r$   r"   r#   ra   s        r   _merge_callz#_MirroredReplicaContext._merge_call  s.   2 	  "Aa/000AJALANGG224As")<<>AggCCEA$+OO$5$G$G$IA!> !'')OP P LLLLLLww$(A!>>r   c                     t        j                  |        | j                  j                  j                  | j
                     gS r   )r   require_replica_context	_strategyr,   worker_devices_by_replica_replica_id_in_sync_groupr   s    r   r   z_MirroredReplicaContext.devices  s;    **4099**	, r   N)r9   rS   rT   r   r   propertyr   r   r   r   r   r     s     0[z  r   r   )NNr   )1r   
contextlibr   r0   tensorflow.pythonr    tensorflow.python.autograph.corer   r<    tensorflow.python.autograph.implr   r:   tensorflow.python.distributer   r   r   tensorflow.python.eagerr	   r
   tensorflow.python.frameworkr   r   r   tensorflow.python.opsr   r   tensorflow.python.platformr   r5   tensorflow.python.trainingr   tensorflow.python.utilr   r   r    r1   r/   contextmanagerrG   rI   rP   	ExceptionrR   rZ   r.   Threadrn   ReplicaContextr   r   r   r   <module>r      s    B    ( D = 7 9 @ + 0 ; + 0 0 < 2 2G<<@ +**,   
 
 Y  GITJY-- JZfn;; fr   