
    AVh~)                        d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ 	 ddlZej<                   G d d             Zd Z d Z!d Z"d Z#d Z$d#dZ%d Z&d Z'd Z(d Z)d Z*d Z+d$dZ,d  Z-d! Z.d" Z/y# e$ r dZY Uw xY w)%zTest utilities.    N)app)	v2_compat)collective_all_reduce_strategy)multi_process_runner)multi_worker_test_base)tpu_strategy)values)context)config)ops)tensor)	array_ops)array_ops_stack)nestc                   ,    e Zd ZU eed<   eed<   eed<   y)TestClusterParamsclustermax_num_worker
max_num_psN)__name__
__module____qualname__dict__annotations__int     V/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/test_util.pyr   r   .   s    -/r   r   c                    || j                   kD  s|| j                  kD  rt        d      | j                  0t	        j
                  | j                   | j                        | _        | j                  d   d | | j                  d   d | dS )Nz]Requesting more servers than the maximum, adjustcluster params' max_num_ps and max_num_worker)num_workersnum_psworkerps)r"   r#   )r   r   
ValueErrorr   r   create_in_process_cluster)cluster_paramsr    r!   s      r   get_cluster_defr'   5   s    N111~(((
 E F F#3MM"11((*N &&x0+>""4(&1
 r   c                 ^    t        j                  t        j                  t        |       |      S )a  Gathers value from all workers.

  This is intended for tests before we implement an official all-gather API.

  Args:
    strategy: a `tf.distribute.Strategy`.
    value: a nested structure of n-dim `tf.distribute.DistributedValue` of
      `tf.Tensor`, or of a `tf.Tensor` if the strategy only has one replica.
      Cannot contain tf.sparse.SparseTensor.

  Returns:
    a (n+1)-dim `tf.Tensor`.
  )r   map_structure	functoolspartial_gather)strategyvalues     r   gatherr/   D   s$     
		I--gx@%	HHr   c                 .   t        |t        j                        s)t        j                  t	        j
                  |      g      }t        | j                  t        j                        st        j                  |j                        S t        | j                  j                        t        |j                        k(  sJ |j                  D cg c]  }t        j                  |d       }}| j!                  t        j                  |      d      S c c}w )zGathers a single value.r   )axis)
isinstancer	   DistributedValues
PerReplicar   convert_to_tensorextendedr   CollectiveAllReduceExtendedr   stack_valueslenworker_devicesr   expand_dims_v2r/   )r-   r.   vinputss       r   r,   r,   U   s     
E633	4s44U;<=E	H%%2NN
P  //	X--	.#emm2D	DD	D9>GAI$$QQ/G&G	**62	;; Hs   Dc                    |dk  rt        d|      t        j                  |       }|st        dj	                  |             t        |      |k\  ry|t        |      z
  dz   }g }t        |      D ]^  }| j                         dk(  r&|j                  t        j                  d             <|j                  t        j                                ` t        j                  |d   |       y)	z2Create logical devices of at least a given number.   z`num` must be at least 1 not zNo {} foundNGPUi   )memory_limit)r$   r   list_physical_devicesRuntimeErrorformatr:   rangeupperappendr
   LogicalDeviceConfiguration set_logical_device_configuration)devicenumphysical_deviceslogical_devices_s        r   set_logical_devices_to_at_leastrQ   c   s    1W
#?
@@11&9	
}++F3
44	c!
 	c"##a'#/: Ca||~

,
,$
?A W??ABC 	))*:2*>Pr   c                      t        j                  d      rt        dd       t        j                  d      rt        dd       y y )NrA      CPU)r   rD   rQ   r   r   r   _set_logical_devicesrU   |   s7    !!%(#E1-!!%(#E1- )r   c                     |rt        j                  t               | rt        j                          nt        j
                          t        j                          y)z1All-in-one main function for tf.distribute tests.N)r   call_after_initrU   r   enable_v2_behaviordisable_v2_behaviorr   	test_main)rX   config_logical_devicess     r   mainr\      s;    ,-  "!!#  "r   c                    g }t        j                  | j                  | j                        D ]U  }t	        |t
        j                        r|j                  }t	        |t        j                        sJ |j                  |       W |S )zEReturns the data and control dependencies of a tf.Operation combined.)	itertoolschainr>   control_inputsr2   r   Tensoropr   	OperationrI   )rb   depsnodes      r   _op_dependenciesrf      si    	$oobii):):; d$&WWddCMM***KK	
 
+r   c                    t        j                         }t        |       D ]<  }||vrd||<   t        t        |            D ]  }|j	                  |d      dz   ||<    > g }|j                         D ]  \  }}|dk(  s|j                  |        i }d}|rW|d   |dd }}|||<   |dz  }t        t        |            D ])  }||xx   dz  cc<   ||   dk(  s|j                  |       + |rWt        |      t        |       k(  sJ |S )a  Topological sorts a list of operations.

  This does a topological sort of the operations in a graph. The edges include
  both data dependencies and control dependencies. Note that the edge goes from
  an operation to its dependencies.

  The sort is intentionally unstable, reversing orders of operations and
  dependencies on ties.

  Args:
    operations: a list of tf.Operation in the same graph.

  Returns:
    A map from a tf.Operation to its topological order.
  r   r@   N)collectionsOrderedDictreversedrf   getitemsrI   r:   )
operations
in_degreesrb   next_opnexts	in_degreeorder
next_orders           r   topological_sort_operationsrt      s:     &&(*Z  ;b	jn,R01 ;&NN7A6:j;;
 %!'') mb)A~ll2 %*a%)BE"I!OJ,R01 Q	G		!W	 	 
Us:	&&	&	,r   c                 x    | g}|r5|d   |dd }}t        |      D ]  }||k(  r y|j                  |        |r5y)zBReturns whether there exists a dependency chain from start to end.r   r@   NTF)rf   rI   )startendrp   rb   ro   s        r   _exists_dependencyrx      sS    '%a%)B#B' 	Cll7 	 
r   c           	      D    t        | fd      }t        t        |      dz
        D ]u  }t        ||   ||dz            rt	        ||   j
                  j                                t        dj                  ||   j                  ||dz      j                               y)a  Asserts there's a deterministic execution order between the operations.

  Args:
    order: a map from a tf.Operation to its topological order.
    operations: a list of operations that should be executed sequentially. It
      can be given in any order.
  c                     |    S )Nr   )rb   rr   s    r   <lambda>z-assert_sequential_execution.<locals>.<lambda>   s    r r   )keyr@   z;No dependency between {} and {}. Graph is dumped to stdout.N)
sortedrG   r:   rx   printgraphas_graph_defAssertionErrorrF   name)rr   rm   is   `  r   assert_sequential_executionr      s     j&:;*Z1$% ;ajmZA->?JqM,,./
G
N
Nm  *QU"3"8"8:; ;;r   c                      t               } t        j                         D ]*  }|j                  | j	                  |j                         , | S )z*Returns a set of all running thread names.)set	threading	enumerater   add)running_threadsthreads     r   get_running_threadsr      sE    E/##% 'f{{&++&' 
r   c                 8    |D ]  }|j                  |       s y y)zReturns whether any 'running_threads' is prefixed with 'prefix'.

  Args:
    prefix: The prefix of the expected thread name.
    running_threads: A collection of the running thread names.
  TF)
startswith)prefixr   r   s      r   
has_threadr      s)       f  
r   c                     t         t        d      t        j                         }t        j                  | ||       |j                         }|j                          |S )aB  Returns a dot graph of all the objects that are referencing the target.

  A object referencing graph is useful to debug memory leak like circular
  reference. objgraph provides a good visualization of the memory graph than
  most python built-in utilities like gc.get_referrers(), which are not
  human-readable sometimes.

  The dot graph will be written to a string IO object, and can be rendered with
  graphviz in operating system.
  E.g. dot -Tpng {$dot_graph} -o output.png
  Args:
    target: The target object for the memory graph.
    max_depth: The maximum depth of the graph. By default 3 layers of references
      are used. Increases this a lot may result in the graph growing too big.

  Returns:
    A string that contains the object reference graph.
  Raises:
    NotImplementedError: if objgraph is not installed.
  zobjgraph is not installed.)	max_depthoutput)objgraphNotImplementedErrorioStringIOshow_backrefsgetvalueclose)targetr   	string_ior   s       r   show_backrefr      sP    * 
:
;;kkm)
9YG



%//	,r   c                    t        | j                  j                        t        |      k7  rt        d      g }t	        | j                  j                  |      D ]G  \  }}t        j                  |      5  |j                  t        j                  |             ddd       I t        j                  |      S # 1 sw Y   hxY w)z4Creates a PerReplica of Tensors from the value_list.zEthe length of values must be the same as the number of worker devicesN)r:   r6   r;   r$   zipr   rL   rI   r5   r	   r4   )r-   
value_listtensorsrL   r.   s        r   create_per_replicar     s    			)	)*c*o=
OQ Q'8,,;;ZH 3mfe	F	 3nnS**5123 33 
		7	##3 3s   4%B88C	c                 t    t        | t        j                  t        j                  t        j                  f      S )z/Returns whether the strategy is a TPU strategy.)r2   r   TPUStrategyTPUStrategyV1TPUStrategyV2)r-   s    r   is_tpu_strategyr   $  s0    	H!--|/I/I!//1
2 2r   c                  ,    t        j                          y)zResets eager context.N)r
   _reset_contextr   r   r   reset_contextr   +  s    	r   )TT)   )0__doc__rh   dataclassesr*   r   r^   r   abslr   tensorflow.python.compatr   tensorflow.python.distributer   r   r   r   r	   tensorflow.python.eagerr
   tensorflow.python.frameworkr   r   r   tensorflow.python.opsr   r   tensorflow.python.utilr   r   ImportError	dataclassr   r'   r/   r,   rQ   rU   r\   rf   rt   rx   r   r   r   r   r   r   r   r   r   r   <module>r      s        	    . G = ? 5 / + . + . + 1 '
   I"
<Q2.#%P	;.
<	$2C  (s   0C   C
	C
