
    AVh`                        d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	Z	ddl
mZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddl m!Z! ddl m"Z" ddl#m$Z$  G d dejJ                        Z& G d dejJ                        Z' G d dejP                        Z) G d dejP                        Z* G d dejP                        Z+ G d d ejP                        Z, G d! d"e-      Z. ej^                  d#ej`                        Z1 ej^                  d$d%       Z2d& Z3 e$d'g (      d9d)       Z4ejj                  Z5ejl                  Z6ej^                  Z/d*a7 e$d+g (      d,        Z8 G d- d.e-      Z9 e9       a: e$d/g (      d0        Z;d1 Z< ejz                  e<        ej|                  d2d3d4g      Z?d5 Z@d6 ZAd7 ZBd8 ZCy):zThis module customizes `test_combinations` for `tf.distribute.Strategy`.

Additionally it provides `generate()`, `combine()` and `times()` with
`tf.distribute.Strategy` customizations as a default.
    N)app)session)collective_all_reduce_strategy)distribute_lib)multi_process_runner)multi_worker_test_base)context)def_function)combinations)config)opstest_combinations)	test_util)flags)
tf_logging)tf_decorator)
tf_inspect)	tf_exportc                       e Zd ZdZd Zy)DistributionParameterzTransforms arguments of type `NamedDistribution`.

  Convert all arguments of type `NamedDistribution` to the value of their
  `strategy` property.
  c                     |j                  dd       }i }|j                         D ]:  \  }}t        |t              s|j                  }|r||j
                  _        |||<   < |S Nuse_var_policy)getitems
isinstanceNamedDistributionstrategyextended_use_var_policy)selfkwargsrequested_parametersr   distribution_argumentskvr   s           Y/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/combinations.pymodified_argumentsz(DistributionParameter.modified_arguments<   sl     ZZ 0$7N -1	A(	)::.<(


+$,q!- "!    N__name__
__module____qualname____doc__r)    r*   r(   r   r   5   s    "r*   r   c                       e Zd ZdZd Zy)ClusterParameterszkAdds cluster parameters if a `NamedDistribution` has it.

  It needs to be before DistributionParameter.
  c                    d }|j                         D ]H  \  }}t        |t              s|.t        |j                  |j
                        dkD  rt        d      |}J |rj|j                  }|j
                  }|j                  }|j                  }	|j                  }
d|v r|d   |k7  rt        d      d|v rm|d   |k7  ret        d      |j                  dd      }|j                  dd      }|j                  dd       }|j                  d	d
      }	|j                  dd      }
i }d|v r||d<   d|v r||d<   d|v r||d<   d	|v r|	|d	<   d|v r|
|d<   |S )N   z9Only support one NamedDistribution for multi workertests.	has_chiefz<both has_chief and strategy specified but are not compatiblenum_workersz>both num_workers and strategy specified but are not compatibleFrunner	share_gpuTnum_psr   )r   r   r   _num_total_workersr5   r6   
ValueErrorr7   r8   r9   r   )r"   r#   r$   r   _r'   r5   r6   r7   r8   r9   updates               r(   r)   z$ClusterParameters.modified_argumentsQ   s   H 1	A(	)$6q{{78}}%FHI%J $ % % $$i((kf$$if		6+#6)#CJL 	L	&	 VM%:k%ILN 	N **[%0iJJ}a0kzz(D)f**[$/izz(A&f F**%f[,,)f]''fX**%f[''fXMr*   Nr+   r0   r*   r(   r2   r2   K   s    
*r*   r2   c                       e Zd ZdZd Zd Zy)DistributionCombinationz(Sets up distribution strategy for tests.c                     |j                         D cg c]  }t        |t              s| }}t        j                         rt        d |D              ryyc c}w )Nc              3   4   K   | ]  }|j                     y wN)no_xla).0ds     r(   	<genexpr>zEDistributionCombination.should_execute_combination.<locals>.<genexpr>   s     )Jq!(()Js   )Fz@n/a: skipping strategy combination with no_xla=True in XLA testsTN)valuesr   r   r   is_xla_enabledany)r"   r#   r'   distributionss       r(   should_execute_combinationz2DistributionCombination.should_execute_combination   sW    ==?j4E&FM  !c)JM)J&JN s
   AAc                 @    t               t        j                  d      gS r   )r   combinations_libOptionalParameterr"   s    r(   parameter_modifiersz+DistributionCombination.parameter_modifiers   s!    **+;< r*   N)r,   r-   r.   r/   rL   rQ   r0   r*   r(   r?   r?   ~   s    0r*   r?   c                       e Zd ZdZd Zy)ClusterCombinationzSets up multi worker tests.c                     t               gS rB   )r2   rP   s    r(   rQ   z&ClusterCombination.parameter_modifiers   s      r*   N)r,   r-   r.   r/   rQ   r0   r*   r(   rS   rS      s
    #!r*   rS   c                   x    e Zd ZdZdZej                  r  ej                  dej                  d         Zd Z	d Z
y)GPUCombinationa  Enable tests to request GPU hardware and skip non-GPU combinations.

  This class expects test_combinations to be generated with `NamedDistribution`
  wrapping instances of `tf.distribute.Strategy`.

  Optionally, the `required_gpus` argument is supported.  GPU hardware is
  required, if its value is `True` or > 0.

  Attributes:
    GPU_TEST: The environment is considered to have GPU hardware available if
              the name of the program contains "test_gpu" or "test_xla_gpu".
  Fz(test_2?gpu|test_xla_2?gpu)$r   c                 8   |j                         D cg c]  }t        |t              s| }}|j                  dd      }|j                  dd      }|r|rt	        d      t        |g|gz   |D cg c]  }|j                  xs d c}z   |D cg c]  }|j                  xs d c}z         }t        |g|D cg c]  }|j                  xs d c}z         }|r|rt	        d      |st        j                  ry|dkD  r=t        j                         |k  r&ddj                  t        j                         |      fS |t        t        j                  d	            kD  r'dd
j                  t        j                  d	      |      fS yc c}w c c}w c c}w c c}w )Nrequired_gpusr   required_physical_gpuszLDo not use `required_gpus` and arguments of type NamedDistribution together.zOnly one of `required_physical_gpus`(number of physical GPUs required) and `required_gpus`(total number of GPUs required) should be set. )FzTest that doesn't require GPUs.Fz*Only {} of {} required GPUs are available.GPUz3Only {} of {} required physical GPUs are available.rG   )rH   r   r   r   r;   maxrY   rX   rV   GPU_TESTr	   num_gpusformatlenr   list_physical_devices)	r"   r#   r'   rK   rX   rY   rE   number_of_required_gpus number_of_required_physical_gpuss	            r(   rL   z)GPUCombination.should_execute_combination   s   ==?j4E&FM  JJ2M#ZZ(@!D 5 6 6 "	1220=>1	!	!	&Q	&>	?'45!	A	5	67 (+	 0=>1	!	!	&Q	&>	?(@$ 	= 8 9 9 #~'>'>7
!A
% #::BII



57 9 9	)C$$U+-- 
-DKK..u57MOQ Q E 	?5 	?s   FF1F
F
;F
c                 V    t        j                  d      t        j                  d      gS )NrX   rY   rN   rO   rP   s    r(   rQ   z"GPUCombination.parameter_modifiers   s+    ..?../GHJ Jr*   N)r,   r-   r.   r/   r\   sysargvresearchrL   rQ   r0   r*   r(   rV   rV      s<     (XXryy8#((1+FH#JJr*   rV   c                   Z    e Zd ZdZdZej                  rdej                  d   v Zd Zd Zy)TPUCombinationa  Allow to request TPU hardware and skip non-TPU combinations.

  This class expects test_combinations to be generated with `NamedDistribution`
  wrapping instances of `tf.distribute.Strategy`.

  Optionally, the `required_tpus` parameter is supported.  TPU hardware is
  required, if its argument is `True` or > 0.

  Optionally, the `use_cloud_tpu` parameter is supported. If TPU hardware is
  required by `required_tpus`, it specifically must be a Cloud TPU (specified
  with `--tpu`) if `use_cloud_tpu` is `True`.

  Attributes:
    TPU_TEST: The environment is considered to have TPU hardware available if
              the name of the program contains "test_tpu".
  Ftest_tpur   c                    |j                         D cg c]  }t        |t              s| }}d|v rd|v rt        d      |j	                  dd       xs |j	                  dd       }|r|rt        d      t        |xs dg|D cg c]  }|j                  xs d c}z         }t        |j	                  d      g|D cg c]  }|j                   c}z         }t        t        j                  d      xr t        j                  j                  xs d}|st        j                  ry	|rt        j                  sy
|r|sy|s|ryyc c}w c c}w c c}w )Nrequired_tpusrequired_tpuzSDo not use `required_tpu`.  Both `required_tpus` and `required_tpu` were specified.zLDo not use `required_tpus` and arguments of type NamedDistribution together.r   use_cloud_tputpu )FzTest that doesn't require TPUs.)Fz,Test requires a TPU, but it's not available.)Fz.Test requires a Cloud TPU, but none specified.)Fz1Test requires local TPU, but Cloud TPU specified.rG   )rH   r   r   r;   r   r[   rn   rJ   ro   hasattrr   FLAGSrp   rj   TPU_TEST)	r"   r#   r'   rK   rm   rE   number_of_required_tpusro   rp   s	            r(   rL   z)TPUCombination.should_execute_combination   sU   ==?j4E&FM 
 & ^v%= 8 9 9JJ5 :M  5 6 6
 "=#5A"6@M"N11>>#6Q#6"N#O PO452?@Q@A BM
%++u
%
9%++//
?RC"~'>'>7~'>'>DSFSI?$ #O@s   EEE
 E
c                 ~    t        j                  d      t        j                  d      t        j                  d      gS )Nrm   rn   ro   rd   rP   s    r(   rQ   z"TPUCombination.parameter_modifiers  s6    **?;**>:**?; r*   N)	r,   r-   r.   r/   rt   re   rf   rL   rQ   r0   r*   r(   rj   rj      s2    " (XXSXXa[(H Dr*   rj   c                   R    e Zd ZdZ	 	 	 	 	 	 	 	 	 	 ddZed        Zed        Zd Zy)r   zAWraps a `tf.distribute.Strategy` and adds a name for test titles.Nc                     t         j                  |        || _        || _        || _        || _        || _        || _        || _        || _	        |	| _
        |
| _        || _        || _        y)a  Initialize NamedDistribution.

    Args:
      name: Name that will be a part of the name of the test case.
      distribution_fn: A callable that creates a `tf.distribute.Strategy`.
      required_gpus: The number of GPUs that the strategy requires. Only one of
      `required_gpus` and `required_physical_gpus` should be set.
      required_physical_gpus: Number of physical GPUs required. Only one of
      `required_gpus` and `required_physical_gpus` should be set.
      required_tpu: Whether the strategy requires TPU.
      use_cloud_tpu: Whether the strategy requires cloud TPU.
      has_chief: Whether the strategy requires a chief worker.
      num_workers: The number of workers that the strategy requires.
      num_ps: The number of parameter servers.
      share_gpu: Whether to share GPUs among workers.
      pool_runner_fn: An optional callable that returns a MultiProcessPoolRunner
        to run the test.
      no_xla: Whether to skip in XLA tests.
    N)object__init___name_distribution_fnrX   rY   rn   ro   r5   r6   r9   r8   _pool_runner_fnrC   )r"   namedistribution_fnrX   rY   rn   ro   r5   r6   r9   r8   pool_runner_fnrC   s                r(   rz   zNamedDistribution.__init__  sl    @ OODDJ+D&D"8D$D&DDN"DDKDN)DDKr*   c                 <    | j                   | j                         S y rB   )r}   rP   s    r(   r7   zNamedDistribution.runnerE  s     '!!##r*   c                 "    | j                         S rB   )r|   rP   s    r(   r   zNamedDistribution.strategyK  s      ""r*   c                     | j                   S rB   )r{   rP   s    r(   __repr__zNamedDistribution.__repr__O  s    ::r*   )
Nr   FFFr4   r   TNF)	r,   r-   r.   r/   rz   propertyr7   r   r   r0   r*   r(   r   r     sX    I
 "&'!"",\  
 # #r*   r   
TfFunctionNoTfFunctionc                     | S rB   r0   )fs    r(   <lambda>r   b  s     r*   c                  "    g }| D ]  }||z  }	 |S )zConcats combinations.r0   )combinedresultones      r(   concatr   e  s$    & c
cMF	-r*   z-__internal__.distribute.combinations.generate)v1c                     t        j                         t        j                         t               t	               t               t               f}t        j                  | ||z         fd}|S )aT  Distributed adapter of `tf.__internal__.test.combinations.generate`.

  All tests with distributed strategy should use this one instead of
  `tf.__internal__.test.combinations.generate`. This function has support of
  strategy combinations, GPU/TPU and multi worker support.

  See `tf.__internal__.test.combinations.generate` for usage.
  r   c                 z   t        | t              r| }t        j                  |j                  j                               D ]`  \  }}|j                  t        j                  j                        s0t        |t        j                        sKt        ||t        |             b  |      S  t        |             S rB   )r   typesix	iteritems__dict__copy
startswithunittest
TestLoadertestMethodPrefixtypesFunctionTypesetattr_multi_worker_test)test_method_or_classclass_objectr~   test_methodcombination_decorators       r(   	decoratorzgenerate.<locals>.decorator  s    &-)l"}}\-B-B-G-G-IJ G
$OOH//@@A{E$6$67
,&8&E
FG #<00"#56J#KLLr*   )	framework_combinationsEagerGraphCombinationTFVersionCombinationrS   r?   rV   rj   rN   generate)r   r   default_combinationsr   r   s       @r(   r   r   m  sk     224113 +33&:=N&NPM 
r*   Fz4__internal__.distribute.combinations.in_main_processc                      t          S )zWhether it's in the main test process.

  This is normally used to prepare the test environment which should only happen
  in the main process.

  Returns:
    A boolean.
  )_running_in_workerr0   r*   r(   in_main_processr     s      	r*   c                   (     e Zd ZdZd Z fdZ xZS )TestEnvironmentzHolds the test environment information.

  Tests should modify the attributes of the instance returned by `env()` in the
  main process if needed, and it will be passed to the worker processes each
  time a test case is run.
  c                      d | _         d | _        y rB   )tf_data_service_dispatchertotal_phsyical_gpusrP   s    r(   rz   zTestEnvironment.__init__  s    &*D#  $Dr*   c                 P    t               st        d      t        |   ||       y )Nzvcombinations.env() should only be modified in the main process. Condition your code on combinations.in_main_process().)r   r;   super__setattr__)r"   r~   value	__class__s      r(   r   zTestEnvironment.__setattr__  s/    CD D 
Ge$r*   )r,   r-   r.   r/   rz   r   __classcell__)r   s   @r(   r   r     s    $% %r*   r   z(__internal__.distribute.combinations.envc                      t         S )zReturns the object holds the test environment information.

  Tests should modify this in the main process if needed, and it will be passed
  to the worker processes each time a test case is run.

  Returns:
    a TestEnvironment object.
  )_envr0   r*   r(   envr     s	     
+r*   c                      t               r:t        t        j                         j                  d            t	               _        y y )NrZ   )r   r_   r	   r`   r   r   r0   r*   r(   _set_total_phsyical_gpusr     s1     #//6!8CE r*   _TestResultstatusmessagec                    da |at        j                  j	                  |       }t        j
                         }|j                  |      }|j                  |j                  z   |j                  z   }|rt        d|d   d         }n7|j                  rt        d|j                  d   d         }nt        dd      }|j                  rt        |j                         |S )	a  Executes the test with the given test_id.

  This is a simple wrapper around TestRunner to be used with
  multi_process_runner. Similar to test.main(), but it executes only one test
  specified by test_id and returns whether the test succeeds. If the test fails,
  the function prints failures and errors to stdout.

  Args:
    test_id: TestCase.id()
    test_env: a TestEnvironment object.

  Returns:
    A boolean indicates whether the test succeeds.
  Tfailurer   r4   )r   r   skippedokN)r   r   r   defaultTestLoaderloadTestsFromNameTextTestRunnerrunfailuresexpectedFailureserrorsr   r   r   print)test_idtest_envtestr7   r   r   rets          r(   _test_runnerr     s    $ 	$		#	#	5	5g	>$""$&::d& __v666F(
YA
?C~~
Yq0A!0D
EC T4
0C 	[[	#++	*r*   c                       fd}t        j                         }|j                  xs g g dz   }|j                  |      }t	        j
                   ||      S )a  Decorate test_method so that it runs in each worker.

  We use `multi_process_runner` to simulate multiple workers. Since we run the
  this function in the main process and all worker processes, this decoration
  behaves differently in the main process and worker procssses. In the main
  process, it spawns subprocesses and runs the test on each of them; in a worker
  process, it executes test in the same way as a normal test, e.g.
  setUp()/tearDown() are called before/after the test.

  Args:
    test_method: a function which must be a test method.

  Returns:
    Decorated `test_method`. Note that the decorated function has additional
    arguments.
  c                    t        ||      dk(  st        st        j                         r#|dkD  rt	        |      5   | fi | d d d        y | j                         }|r|j                  t        |t        f      }ngt        j                  |||d      }	t        j                  t        |	||t        f|      }
|
j                          |
j                         j                  }d }|D ]J  }|j                   dk(  r| j#                  |j$                          n|j                   dk(  s?|j$                  }L || j'                  |       y y # 1 sw Y   y xY w)	Nr4   r   argsF)r5   r6   r9   has_eval)r8   r   dependence_on_chiefr   r   )r:   r   r   rI   _multi_worker_sessionidr   r   r   r   create_cluster_specr   MultiProcessRunnerstartjoinreturn_valuer   failr   skipTest)r"   r5   r6   r9   r8   r7   r#   r   resultscluster_specephemeral_runnerskip_reasonr   r   s                r(   r   z%_multi_worker_test.<locals>.decorator  sL   )%'*+,/A (668VaZ !( $D#F#$$ ggiG

<wo
>g+??!	l
 .@@

')  %%'44gK 	%	)	# 			&..!==I% nn	% 
mmK  c$s   
EE)r5   r6   r9   r8   r7   r   )decorator_argspec)r   getfullargspecr   _replacer   make_decorator)r   r   argspecdecorator_argsr   s   `    r(   r   r     sg    $B!H %%k2'LL&B + . &&N&;		$	$90A
C Cr*   c                     | r|dz   S |S )z2Returns the number of workers including the chief.r4   r0   )r5   r6   s     r(   r:   r:   k  s    ?	r*   c                 P   d}| j                         D ]O  \  }}t        |t        j                        s!|+t	        j
                  d       t        j                         c S |}Q t        j                         st        |t        j                        st        j                         S t        j                  t        j                         j                        }|j                  |      }|j                   j#                         }t%        j&                  ||      j)                         S )a  Returns a context manager that enters a session that is configured for the MultiWorkerMirroredStrategy.

  Args:
    kwargs: a dict. Keyword arguments passed to the test.

  Returns:
    A context manager. If MultiWorkerMirroredStrategy is the  one and only one
    strategy in kwargs and it's in graph mode, it's the seesion that is
    configured for that strategy.  Otherwise, it's a no-op context manager.
  NzcThe test uses multiple strategies. Skipping entering a session that is configured for the strategy.)r   target)r   r   r   StrategyBaseloggingwarningr   NullContextmanagerr	   executing_eagerlyr   CollectiveAllReduceStrategyr   deepcopyr   update_config_protocluster_resolvermasterr   Session
as_default)r#   r   r<   r'   sess_configr   s         r(   r   r   r  s     (lln da!^001		F	G %%''h  
.JJ)L!!##goo/667+,,[9+$$++-&	F	;	F	F	HHr*   )r0   )Dr/   collectionsr   rg   re   r   r   abslr   r   tensorflow.python.clientr   tensorflow.python.distributer   r   r   r   tensorflow.python.eagerr	   r
   tensorflow.python.frameworkr   r   r   r   r   rN   r   tensorflow.python.platformr   r   r   tensorflow.python.utilr   r    tensorflow.python.util.tf_exportr   ParameterModifierr   r2   TestCombinationr?   rS   rV   rj   ry   r   NamedObjectfunctiontf_functionno_tf_functionr   r   combinetimesr   r   r   r   r   r   call_after_init
namedtupler   r   r   r:   r   r0   r*   r(   <module>r     s     	 
    
 - G 7 = ? + 0 N . + M 1 , < / - 6
",>> ",0(:: 0f.>> (!)99 !8J%55 8Jv=%55 =@< <Z +**<9N9NO-!--nkJ :rB& C&R 
"
"**   AbI	  J	 %f %,  5"=	 >	8   , - %k$$]Xy4IJ'T\C~Ir*   