
"""Class CollectiveAllReduceStrategy implementing DistributionStrategy."""

import copy
import threading
import time
import weakref

from tensorflow.compiler.xla.tsl.protobuf import coordination_config_pb2
from tensorflow.core.protobuf import rewriter_config_pb2
from tensorflow.core.protobuf import tensorflow_server_pb2
from tensorflow.python.distribute import collective_util
from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
from tensorflow.python.distribute import cross_device_utils
from tensorflow.python.distribute import device_util
from tensorflow.python.distribute import distribute_lib
from tensorflow.python.distribute import distribute_utils
from tensorflow.python.distribute import input_lib
from tensorflow.python.distribute import input_util
from tensorflow.python.distribute import mirrored_strategy
from tensorflow.python.distribute import multi_worker_util
from tensorflow.python.distribute import numpy_dataset
from tensorflow.python.distribute import reduce_util
from tensorflow.python.distribute import values
from tensorflow.python.distribute.cluster_resolver import cluster_resolver as cluster_resolver_lib
from tensorflow.python.distribute.cluster_resolver import tfconfig_cluster_resolver
from tensorflow.python.distribute.cluster_resolver import tpu_cluster_resolver
from tensorflow.python.distribute.v1 import input_lib as input_lib_v1
from tensorflow.python.eager import context
from tensorflow.python.framework import device as tf_device
from tensorflow.python.framework import errors
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import collective_ops
from tensorflow.python.ops import control_flow_util
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.trackable import base as trackable
from tensorflow.python.util import deprecation
from tensorflow.python.util.tf_export import tf_export


@tf_export("distribute.MultiWorkerMirroredStrategy", v1=[])
class CollectiveAllReduceStrategy(distribute_lib.Strategy):
  """A distribution strategy for synchronous training on multiple workers.

  This strategy implements synchronous distributed training across multiple
  workers, each with potentially multiple GPUs. Similar to
  `tf.distribute.MirroredStrategy`, it replicates all variables and computations
  to each local device. The difference is that it uses a distributed collective
  implementation (e.g. all-reduce), so that multiple workers can work together.

  You need to launch your program on each worker and configure
  `cluster_resolver` correctly. For example, if you are using
  `tf.distribute.cluster_resolver.TFConfigClusterResolver`, each worker needs to
  have its corresponding `task_type` and `task_id` set in the `TF_CONFIG`
  environment variable. An example TF_CONFIG on worker-0 of a two worker cluster
  is:

  ```
  TF_CONFIG = '{"cluster": {"worker": ["localhost:12345", "localhost:23456"]},
  "task": {"type": "worker", "index": 0} }'
  ```
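
  For illustration, the corresponding TF_CONFIG on worker-1 of the same
  two-worker cluster only differs in the task index:

  ```
  TF_CONFIG = '{"cluster": {"worker": ["localhost:12345", "localhost:23456"]},
  "task": {"type": "worker", "index": 1} }'
  ```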

  Your program runs on each worker as-is. Note that collectives require each
  worker to participate. All `tf.distribute` and non-`tf.distribute` APIs may use
  collectives internally, e.g. checkpointing and saving, since reading a
  `tf.Variable` with `tf.VariableSynchronization.ON_READ` all-reduces the value.
  Therefore it's recommended to run exactly the same program on each worker.
  Dispatching based on `task_type` or `task_id` of the worker is error-prone.

  `cluster_resolver.num_accelerators()` determines the number of GPUs the
  strategy uses. If it's zero, the strategy uses the CPU. All workers need to
  use the same number of devices, otherwise the behavior is undefined.

  This strategy is not intended for TPU. Use `tf.distribute.TPUStrategy`
  instead.

  After setting up TF_CONFIG, using this strategy is similar to using
  `tf.distribute.MirroredStrategy` and `tf.distribute.TPUStrategy`.

  ```
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  with strategy.scope():
    model = tf.keras.Sequential([
      tf.keras.layers.Dense(2, input_shape=(5,)),
    ])
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)

  def dataset_fn(ctx):
    x = np.random.random((2, 5)).astype(np.float32)
    y = np.random.randint(2, size=(2, 1))
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    return dataset.repeat().batch(1, drop_remainder=True)
  dist_dataset = strategy.distribute_datasets_from_function(dataset_fn)

  model.compile()
  model.fit(dist_dataset)
  ```

  You can also write your own training loop:

  ```
  @tf.function
  def train_step(iterator):

    def step_fn(inputs):
      features, labels = inputs
      with tf.GradientTape() as tape:
        logits = model(features, training=True)
        loss = tf.keras.losses.sparse_categorical_crossentropy(
            labels, logits)

      grads = tape.gradient(loss, model.trainable_variables)
      optimizer.apply_gradients(zip(grads, model.trainable_variables))

    strategy.run(step_fn, args=(next(iterator),))

  for _ in range(NUM_STEP):
    train_step(iterator)
  ```

  See
  [Multi-worker training with
  Keras](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras)
  for a detailed tutorial.

  __Saving__

  You need to save and checkpoint on all workers instead of just one. This is
  because variables with `synchronization=ON_READ` trigger aggregation during
  saving. It's recommended to save to a different path on each worker to avoid
  race conditions. Each worker saves the same thing. See
  [Multi-worker training with
  Keras](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#model_saving_and_loading)
  tutorial for examples.
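
  A minimal sketch of per-worker checkpoint paths, assuming a
  `TFConfigClusterResolver` and a `model` built under the strategy's scope
  (the path format here is just for illustration):

  ```
  resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
  path = '/tmp/ckpt-%s-%d' % (resolver.task_type, resolver.task_id)
  tf.train.Checkpoint(model=model).save(path)
  ```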

  __Known Issues__

  * `tf.distribute.cluster_resolver.TFConfigClusterResolver` does not return the
  correct number of accelerators. The strategy uses all available GPUs if
  `cluster_resolver` is `tf.distribute.cluster_resolver.TFConfigClusterResolver`
  or `None`.
  * In eager mode, the strategy needs to be created before calling any other
  TensorFlow API.
  """

  # The starting index for collective keys. This should only be set in tests.
  _collective_key_base = 0

  def __init__(self,
               cluster_resolver=None,
               communication_options=None):
    """Creates the strategy.

    Args:
      cluster_resolver: optional
        `tf.distribute.cluster_resolver.ClusterResolver`. If `None`,
        `tf.distribute.cluster_resolver.TFConfigClusterResolver` is used.
      communication_options: optional
        `tf.distribute.experimental.CommunicationOptions`. This configures the
        default options for cross device communications. It can be overridden
        by options provided to the communication APIs like
        `tf.distribute.ReplicaContext.all_reduce`. See
        `tf.distribute.experimental.CommunicationOptions` for details.
    """
    if communication_options is None:
      communication_options = collective_util.Options()
    super(CollectiveAllReduceStrategy, self).__init__(
        CollectiveAllReduceExtended(
            self,
            cluster_resolver=cluster_resolver,
            communication_options=communication_options))

    distribute_lib.distribution_strategy_gauge.get_cell("V2").set(
        "MultiWorkerMirroredStrategy")
    # pylint: disable=protected-access
    distribute_lib.distribution_strategy_replica_gauge.get_cell(
        "num_workers").set(self.extended._num_workers)
    distribute_lib.distribution_strategy_replica_gauge.get_cell(
        "num_replicas_per_worker").set(self.extended._num_devices_per_worker)

  @classmethod
  def _from_local_devices(cls, devices, communication_options=None):
    """A convenience method to create an object with a list of devices."""
    obj = cls(communication_options=communication_options)
    obj.extended._initialize_local(  # pylint: disable=protected-access
        tfconfig_cluster_resolver.TFConfigClusterResolver(), devices=devices)
    return obj

  @property
  def cluster_resolver(self):
    """Returns the cluster resolver associated with this strategy.

    As a multi-worker strategy, `tf.distribute.MultiWorkerMirroredStrategy`
    provides the associated `tf.distribute.cluster_resolver.ClusterResolver`.
    If the user provides one in `__init__`, that instance is returned; if the
    user does not, a default `TFConfigClusterResolver` is provided.
    """
    return self.extended._cluster_resolver  # pylint: disable=protected-access


class _CollectiveAllReduceStrategyExperimentalMeta(type):

  @classmethod
  def __instancecheck__(cls, instance):
    # This is to make isinstance(tf.distribute.MultiWorkerMirroredStrategy(),
    # tf.distribute.experimental.MultiWorkerMirroredStrategy) return True for
    # libraries that still check against the experimental symbol.
    return isinstance(instance, CollectiveAllReduceStrategy)


@tf_export("distribute.experimental.MultiWorkerMirroredStrategy", v1=[])
class _CollectiveAllReduceStrategyExperimental(
    CollectiveAllReduceStrategy,
    metaclass=_CollectiveAllReduceStrategyExperimentalMeta):

  __doc__ = CollectiveAllReduceStrategy.__doc__

  @deprecation.deprecated(
      None, "use distribute.MultiWorkerMirroredStrategy instead")
  def __init__(self,
               communication=collective_util.CommunicationImplementation.AUTO,
               cluster_resolver=None):
    """Creates the strategy.

    Args:
      communication: optional
        `tf.distribute.experimental.CommunicationImplementation`. This is a
        hint on the preferred collective communication implementation.
        Possible values include `AUTO`, `RING`, and `NCCL`.
      cluster_resolver: optional
        `tf.distribute.cluster_resolver.ClusterResolver`. If `None`,
        `tf.distribute.cluster_resolver.TFConfigClusterResolver` is used.
    """
    communication_options = collective_util.Options(
        implementation=communication)
    super(_CollectiveAllReduceStrategyExperimental,
          self).__init__(cluster_resolver, communication_options)

  @classmethod
  def _from_local_devices(
      cls,
      devices,
      communication=collective_util.CommunicationImplementation.AUTO):
    """A convenience method to create an object with a list of devices."""
    obj = cls(communication)
    obj.extended._initialize_local(  # pylint: disable=protected-access
        tfconfig_cluster_resolver.TFConfigClusterResolver(), devices=devices)
    return obj


_CollectiveAllReduceStrategyExperimental.__name__ = (
    CollectiveAllReduceStrategy.__name__)


@tf_export(v1=["distribute.experimental.MultiWorkerMirroredStrategy"])  # pylint: disable=missing-docstring
class CollectiveAllReduceStrategyV1(distribute_lib.StrategyV1):

  __doc__ = CollectiveAllReduceStrategy.__doc__

  # The starting index for collective keys. This should only be set in tests.
  _collective_key_base = 0

  def __init__(self,
               communication=collective_util.CommunicationImplementation.AUTO,
               cluster_resolver=None):
    """Initializes the object."""
    communication_options = collective_util.Options(
        implementation=communication)
    super(CollectiveAllReduceStrategyV1, self).__init__(
        CollectiveAllReduceExtended(
            self,
            cluster_resolver=cluster_resolver,
            communication_options=communication_options))
    distribute_lib.distribution_strategy_gauge.get_cell("V1").set(
        "MultiWorkerMirroredStrategy")
    # pylint: disable=protected-access
    distribute_lib.distribution_strategy_replica_gauge.get_cell(
        "num_workers").set(self.extended._num_workers)
    distribute_lib.distribution_strategy_replica_gauge.get_cell(
        "num_gpu_per_worker").set(
            self.extended._num_devices_per_worker
            if self.extended._local_device_type == "GPU" else 0)


def _is_gpu_device(device):
  return tf_device.DeviceSpec.from_string(device).device_type == "GPU"


class CollectiveAllReduceExtended(mirrored_strategy.MirroredExtended):
  """Implementation of CollectiveAllReduceStrategy."""

  # Whether to periodically check the health of the cluster. If any worker is
  # not reachable, collectives are aborted and the user program should get a
  # `tf.errors.UnavailableError`. It's required to restart in order to recover.
  _enable_check_health = True
  # Check health interval in seconds.
  _check_health_interval = 30
  # Timeout in seconds for the first health check, which needs to wait for the
  # cluster and may therefore take longer.
  _check_health_initial_timeout = 0
  # Times to retry before considering the peer is down.
  _check_health_retry_limit = 3
  # Timeout in seconds for a single health-check attempt.
  _check_health_timeout = 10

  def __init__(self,
               container_strategy,
               cluster_resolver,
               communication_options,
               devices=None):
    if not isinstance(communication_options, collective_util.Options):
      raise ValueError(
          "communication_options must be an instance of "
          "tf.distribute.experimental.CommunicationOptions")
    if cluster_resolver and devices:
      raise ValueError(
          "cluster_resolver and devices cannot be set at the same time")

    self._cluster_resolver = (
        cluster_resolver or
        tfconfig_cluster_resolver.TFConfigClusterResolver())
    if not isinstance(self._cluster_resolver,
                      cluster_resolver_lib.ClusterResolver):
      raise ValueError(
          "cluster_resolver must be an instance of "
          "tf.distribute.cluster_resolver.ClusterResolver")
    distribute_lib.StrategyExtendedV1.__init__(self, container_strategy)
    self._communication_options = communication_options
    self._collective_key_base = container_strategy._collective_key_base  # pylint: disable=protected-access
    self._initialize_strategy(self._cluster_resolver, devices=devices)
    self._cfer_fn_cache = weakref.WeakKeyDictionary()
    self.experimental_enable_get_next_as_optional = True
    assert isinstance(self._cross_device_ops,
                      cross_device_ops_lib.CollectiveAllReduce)

  def _use_merge_call(self):
    # We currently only disable merge_call when XLA is used to compile the `fn`
    # passed to `strategy.run` and all devices are GPU.
    return not control_flow_util.GraphOrParentsInXlaContext(
        ops.get_default_graph()) or not all(
            [_is_gpu_device(d) for d in self._devices])

  def _initialize_strategy(self, cluster_resolver, devices):
    # If devices are provided or cluster_spec is not specified, initialize a
    # single worker. Otherwise initialize multi workers.
    if devices or not cluster_resolver.cluster_spec().as_dict():
      self._initialize_local(cluster_resolver, devices=devices)
    else:
      self._initialize_multi_worker(cluster_resolver)

  def _initialize_local_devices(self, cluster_resolver, worker_device):
    # TFConfigClusterResolver is not queried for the number of accelerators
    # because it's known to return the wrong number in some cases (see the
    # Known Issues section of the class docstring).
    if isinstance(cluster_resolver,
                  tfconfig_cluster_resolver.TFConfigClusterResolver):
      num_gpus = context.num_gpus()
      num_tpus = 0
    else:
      num_gpus = cluster_resolver.num_accelerators().get("GPU", 0)
      num_tpus = cluster_resolver.num_accelerators().get("TPU", 0)

    if num_gpus:
      local_device_type = "GPU"
      num_local_devices = num_gpus
    elif num_tpus:
      local_device_type = "TPU"
      num_local_devices = num_tpus
    else:
      local_device_type = "CPU"
      num_local_devices = 1
    local_devices = tuple(
        f"{worker_device}/device:{local_device_type}:{i}"
        for i in range(num_local_devices))
    return local_devices, local_device_type

  def _initialize_local(self, cluster_resolver, devices=None):
    """Initializes the object for local training."""
    self._is_chief = True
    self._num_workers = 1

    if ops.executing_eagerly_outside_functions():
      try:
        context.context().configure_collective_ops(
            scoped_allocator_enabled_ops=("CollectiveReduce",))
      except RuntimeError:
        logging.warning("Collective ops is not configured at program startup. "
                        "Some performance features may not be enabled.")
      self._collective_ops_configured = True

    if devices:
      local_devices = devices
      if "GPU" in devices[0]:
        local_device_type = "GPU"
      elif "TPU" in devices[0]:
        local_device_type = "TPU"
      else:
        local_device_type = "CPU"
    else:
      local_devices, local_device_type = self._initialize_local_devices(
          cluster_resolver, worker_device="")

    self._worker_device = device_util.canonicalize("/device:CPU:0")
    self._host_input_device = numpy_dataset.SingleDevice(self._worker_device)

    self._collective_keys = cross_device_utils.CollectiveKeys(
        group_key_start=1 + self._collective_key_base)
    self._cross_device_ops = cross_device_ops_lib.CollectiveAllReduce(
        devices=local_devices,
        group_size=len(local_devices),
        options=self._communication_options,
        collective_keys=self._collective_keys)
    # CrossDeviceOps for per host tensors.
    self._host_cross_device_ops = cross_device_ops_lib.CollectiveAllReduce(
        devices=[self._worker_device],
        group_size=self._num_workers,
        options=self._communication_options,
        collective_keys=self._collective_keys)
    super(CollectiveAllReduceExtended,
          self)._initialize_single_worker(local_devices)

    self._cluster_spec = None
    self._task_type = None
    self._task_id = None
    self._id_in_cluster = 0

    # This is a mark to tell whether we are running with standalone client or
    # independent worker. Right now with standalone client, the strategy object
    # is created as a local strategy and then turned into a multi-worker
    # strategy via the configure call.
    self._local_or_standalone_client_mode = True

    # Save the num_devices_per_worker and rpc_layer for the configure method.
    self._num_devices_per_worker = len(local_devices)
    self._local_device_type = local_device_type
    self._rpc_layer = cluster_resolver.rpc_layer
    self._warn_nccl_no_gpu()

    logging.info(
        "Single-worker MultiWorkerMirroredStrategy with local_devices = %r, "
        "communication = %s", local_devices,
        self._communication_options.implementation)

  def _initialize_multi_worker(self, cluster_resolver):
    """Initializes the object for multi-worker training."""
    cluster_spec = multi_worker_util.normalize_cluster_spec(
        cluster_resolver.cluster_spec())
    task_type = cluster_resolver.task_type
    task_id = cluster_resolver.task_id
    if task_type is None or task_id is None:
      raise ValueError("When `cluster_spec` is given, you must also specify "
                       "`task_type` and `task_id`.")
    self._cluster_spec = cluster_spec
    self._task_type = task_type
    self._task_id = task_id
    self._id_in_cluster = multi_worker_util.id_in_cluster(
        self._cluster_spec, self._task_type, self._task_id)

    self._num_workers = multi_worker_util.worker_count(cluster_spec, task_type)
    if not self._num_workers:
      raise ValueError("No `worker`, `chief` or `evaluator` tasks can be found "
                       "in `cluster_spec`.")

    self._is_chief = multi_worker_util.is_chief(cluster_spec, task_type,
                                                task_id)

    self._worker_device = "/job:%s/task:%d" % (task_type, task_id)
    self._host_input_device = numpy_dataset.SingleDevice(self._worker_device)

    if (ops.executing_eagerly_outside_functions() and
        not getattr(self, "_local_or_standalone_client_mode", False)):
      context.context().configure_collective_ops(
          collective_leader=multi_worker_util.collective_leader(
              cluster_spec, task_type, task_id),
          scoped_allocator_enabled_ops=("CollectiveReduce",),
          device_filters=("/job:%s/task:%d" % (task_type, task_id),))
      self._collective_ops_configured = True
      if context.context().coordination_service is None:
        coordinated_jobs = ["chief", "worker"]
        if task_type in coordinated_jobs:
          coordinated_job_config = []
          for job in coordinated_jobs:
            if job in cluster_spec.jobs:
              coordinated_job_config.append(
                  coordination_config_pb2.CoordinatedJob(
                      name=job, num_tasks=cluster_spec.num_tasks(job)))
          context.context().configure_coordination_service(
              service_type="standalone",
              service_leader=multi_worker_util.coordination_leader(
                  cluster_spec),
              coordinated_jobs=coordinated_job_config)

    # Starting a std server in eager mode and in independent worker mode. We
    # also check _local_or_standalone_client_mode because the std server should
    # not be created in standalone client mode.
    if (context.executing_eagerly() and
        not getattr(self, "_std_server_started", False) and
        not getattr(self, "_local_or_standalone_client_mode", False)):
      config_proto = copy.deepcopy(context.context().config)
      config_proto = self._update_config_proto(config_proto)

      # If the coordination service is enabled, use its internal heartbeat to
      # detect peer failures instead of the Python-level health check.
      if config_proto.experimental.coordination_config.service_type:
        self._enable_check_health = False

      if hasattr(cluster_resolver, "port"):
        port = cluster_resolver.port
      else:
        port = 0
      server_def = tensorflow_server_pb2.ServerDef(
          cluster=cluster_spec.as_cluster_def(),
          default_session_config=config_proto,
          job_name=task_type,
          task_index=task_id,
          protocol=cluster_resolver.rpc_layer or "grpc",
          port=port)
      context.context().enable_collective_ops(server_def)
      self._std_server_started = True
      # The `ensure_initialized` is needed before calling
      # `context.context().devices()`.
      context.context().ensure_initialized()
      logging.info(
          "Enabled multi-worker collective ops with available devices: %r",
          context.context().devices())

    local_devices, local_device_type = self._initialize_local_devices(
        cluster_resolver, self._worker_device)
    if local_device_type == "TPU":
      tpu_cluster_resolver.initialize_tpu_system()

    self._collective_keys = cross_device_utils.CollectiveKeys(
        group_key_start=1 + self._collective_key_base)
    self._cross_device_ops = cross_device_ops_lib.CollectiveAllReduce(
        devices=local_devices,
        group_size=len(local_devices) * self._num_workers,
        options=self._communication_options,
        collective_keys=self._collective_keys)
    # CrossDeviceOps for per host tensors.
    self._host_cross_device_ops = cross_device_ops_lib.CollectiveAllReduce(
        devices=[self._worker_device],
        group_size=self._num_workers,
        options=self._communication_options,
        collective_keys=self._collective_keys)
    super(CollectiveAllReduceExtended,
          self)._initialize_single_worker(local_devices)

    # Add a default device so that ops without specified devices will not end
    # up on other workers.
    self._default_device = "/job:%s/task:%d" % (task_type, task_id)

    # Save the num_devices_per_worker and rpc_layer for the configure method.
    self._num_devices_per_worker = len(local_devices)
    self._local_device_type = local_device_type
    self._rpc_layer = cluster_resolver.rpc_layer
    self._warn_nccl_no_gpu()

    if self._enable_check_health and context.executing_eagerly():
      self._start_check_health_thread()
    else:
      logging.info("Check health not enabled.")

    logging.info(
        "MultiWorkerMirroredStrategy with cluster_spec = %r, task_type = %r, "
        "task_id = %r, num_workers = %r, local_devices = %r, "
        "communication = %s", cluster_spec.as_dict(), task_type, task_id,
        self._num_workers, local_devices,
        self._communication_options.implementation)

  def __del__(self):
    self._stop_check_health_thread()

  def _input_workers_with_options(self, options=None):
    host_device = device_util.get_host_for_device(self._worker_device)
    if not options or options.experimental_fetch_to_device:
      return input_lib.InputWorkers([(host_device, self.worker_devices)])
    else:
      return input_lib.InputWorkers([(
          host_device,
          [device_util.get_host_for_device(worker)
           for worker in self.worker_devices])])

  @property
  def _input_workers(self):
    return self._input_workers_with_options()

  def _get_variable_creator_initial_value(self,
                                          replica_id,
                                          device,
                                          primary_var,
                                          **kwargs):
    if replica_id == 0:  # First replica on each worker.
      assert device is not None
      assert primary_var is None

      def initial_value_fn():  # pylint: disable=g-missing-docstring
        # Only the first device participates in the broadcast of initial
        # values.
        group_key = self._collective_keys.get_group_key([device])
        group_size = self._num_workers
        collective_instance_key = (
            self._collective_keys.get_instance_key(group_key, device))

        with ops.device(device):
          initial_value = kwargs["initial_value"]
          if callable(initial_value):
            initial_value = initial_value()
          if isinstance(initial_value, trackable.CheckpointInitialValue):
            initial_value = initial_value.wrapped_value
          assert not callable(initial_value)
          initial_value = ops.convert_to_tensor(
              initial_value, dtype=kwargs.get("dtype", None))

          if self._num_workers > 1:
            if self._is_chief:
              bcast_send = collective_ops.broadcast_send(
                  initial_value, initial_value.shape, initial_value.dtype,
                  group_size, group_key, collective_instance_key)
              with ops.control_dependencies([bcast_send]):
                return array_ops.identity(initial_value)
            else:
              return collective_ops.broadcast_recv(initial_value.shape,
                                                   initial_value.dtype,
                                                   group_size, group_key,
                                                   collective_instance_key)
          return initial_value

      return initial_value_fn
    else:
      return super(CollectiveAllReduceExtended,
                   self)._get_variable_creator_initial_value(
                       replica_id=replica_id,
                       device=device,
                       primary_var=primary_var,
                       **kwargs)

  def _make_input_context(self):
    input_context = distribute_lib.InputContext(
        num_input_pipelines=self._num_workers,
        input_pipeline_id=self._id_in_cluster,
        num_replicas_in_sync=self._num_replicas_in_sync)
    return input_context

  def _experimental_distribute_dataset(self, dataset, options):
    if (options and options.experimental_replication_mode ==
        distribute_lib.InputReplicationMode.PER_REPLICA):
      raise NotImplementedError(
          "InputReplicationMode.PER_REPLICA is only supported in "
          "`distribute_datasets_from_function` "
          "of tf.distribute.MirroredStrategy")
    input_context = self._make_input_context()
    return input_util.get_distributed_dataset(
        dataset,
        self._input_workers_with_options(options),
        self._container_strategy(),
        num_replicas_in_sync=self._num_replicas_in_sync,
        input_context=input_context,
        options=options)

  def _distribute_datasets_from_function(self, dataset_fn, options):
    if (options and options.experimental_replication_mode ==
        distribute_lib.InputReplicationMode.PER_REPLICA):
      raise NotImplementedError(
          "InputReplicationMode.PER_REPLICA is only supported in "
          "`distribute_datasets_from_function` "
          "of tf.distribute.MirroredStrategy")
    input_context = self._make_input_context()
    return input_util.get_distributed_datasets_from_function(
        dataset_fn=dataset_fn,
        input_workers=self._input_workers_with_options(options),
        input_contexts=[input_context],
        strategy=self._container_strategy(),
        options=options)

  def _experimental_distribute_values_from_function(self, value_fn):
    per_replica_values = []
    num_local_replicas = len(self.worker_devices)
    for local_replica_id in range(num_local_replicas):
      replica_id = (self._id_in_cluster * num_local_replicas +
                    local_replica_id)
      value_context = distribute_lib.ValueContext(replica_id,
                                                  self._num_replicas_in_sync)
      per_replica_values.append(value_fn(value_context))
    return distribute_utils.regroup(per_replica_values, always_wrap=True)

  def _make_dataset_iterator(self, dataset):
    """Distributes the dataset to each local GPU."""
    input_context = self._make_input_context()
    return input_lib_v1.DatasetIterator(
        dataset,
        self._input_workers,
        self._container_strategy(),
        num_replicas_in_sync=self._num_replicas_in_sync,
        input_context=input_context)

  def _make_input_fn_iterator(
      self,
      input_fn,
      replication_mode=distribute_lib.InputReplicationMode.PER_WORKER):
    """Distributes the input function to each local GPU."""
    input_context = self._make_input_context()
    return input_lib_v1.InputFunctionIterator(input_fn, self._input_workers,
                                              [input_context],
                                              self._container_strategy())

  def _configure(self,
                 session_config=None,
                 cluster_spec=None,
                 task_type=None,
                 task_id=None):
    """Configures the object.

    Args:
      session_config: a `tf.compat.v1.ConfigProto`
      cluster_spec: a dict, ClusterDef or ClusterSpec object specifying the
        cluster configurations.
      task_type: the current task type, such as "worker".
      task_id: the current task id.

    Raises:
      ValueError: if `task_type` is not in the `cluster_spec`.
    """
    if cluster_spec:
      cluster_resolver = cluster_resolver_lib.SimpleClusterResolver(
          cluster_spec=multi_worker_util.normalize_cluster_spec(cluster_spec),
          task_type=task_type,
          task_id=task_id,
          num_accelerators={
              self._local_device_type: self._num_devices_per_worker},
          rpc_layer=self._rpc_layer)
      self._initialize_multi_worker(cluster_resolver)
      assert isinstance(self._cross_device_ops,
                        cross_device_ops_lib.CollectiveAllReduce)

    if session_config:
      session_config.CopyFrom(self._update_config_proto(session_config))

  def _update_config_proto(self, config_proto):
    updated_config = copy.deepcopy(config_proto)
    # Enable the scoped allocator optimization for CollectiveOps. This
    # optimization converts many small all-reduces into fewer larger
    # all-reduces.
    rewrite_options = updated_config.graph_options.rewrite_options
    rewrite_options.scoped_allocator_optimization = (
        rewriter_config_pb2.RewriterConfig.ON)
    # We turn on ScopedAllocator only for CollectiveReduce op, i.e. enable_op =
    # ["CollectiveReduce"]. Since we can't assign to a repeated proto field, we
    # clear and then append.
    del rewrite_options.scoped_allocator_opts.enable_op[:]
    rewrite_options.scoped_allocator_opts.enable_op.append("CollectiveReduce")

    if (not ops.executing_eagerly_outside_functions() and
        self._communication_options.implementation ==
        collective_util.CommunicationImplementation.NCCL):
      updated_config.experimental.collective_nccl = True

    if not self._cluster_spec:
      return updated_config

    assert self._task_type
    assert self._task_id is not None

    # Collective group leader is needed for collective ops to coordinate
    # workers.
    updated_config.experimental.collective_group_leader = (
        multi_worker_util.collective_leader(self._cluster_spec,
                                            self._task_type, self._task_id))

    # The device filters prevent communication between workers.
    del updated_config.device_filters[:]
    updated_config.device_filters.append(
        "/job:%s/task:%d" % (self._task_type, self._task_id))

    return updated_config

  def _get_cross_device_ops(self, value):
    # CollectiveAllReduce works on a predefined set of devices. In most cases
    # they should be the compute devices, but certain use cases may reduce host
    # tensors as well (e.g. early stopping). We infer the cross_device_ops to
    # use based on the number of devices, since inputs don't always have device
    # annotations. The compute-devices one is preferred since we can
    # potentially leverage NCCL.
    if isinstance(value, values.DistributedValues):
      num_devices = len(value._values)  # pylint: disable=protected-access
    else:
      num_devices = 1
    if num_devices == len(self.worker_devices):
      return self._cross_device_ops
    else:
      return self._host_cross_device_ops

  def _gather_to_implementation(self, value, destinations, axis, options):
    return self._get_cross_device_ops(value)._gather(  # pylint: disable=protected-access
        value,
        destinations=destinations,
        axis=axis,
        options=options)

  def _reduce_to(self, reduce_op, value, destinations, options):
    if (isinstance(value, values.Mirrored) and
        reduce_op == reduce_util.ReduceOp.MEAN):
      return value
    assert not isinstance(value, values.Mirrored)

    if (isinstance(value, values.DistributedValues) and
        len(self.worker_devices) == 1):
      value = value.values[0]

    # When there are multiple workers, we need to reduce across workers using
    # collective ops.
    if (not isinstance(value, values.DistributedValues) and
        self._num_workers == 1):
      # This function handles reducing values that are not PerReplica or
      # Mirrored values. For example, the same value could be present on all
      # replicas in which case `value` would be a single value or value could
      # be 0.
      return cross_device_ops_lib.reduce_non_distributed_value(
          reduce_op, value, destinations, len(self.worker_devices))
    return self._get_cross_device_ops(value).reduce(
        reduce_op,
        value,
        destinations=destinations,
        options=self._communication_options.merge(options))

  def _replica_ctx_all_reduce(self, reduce_op, value, options=None):
    """Implements `StrategyExtendedV2._replica_ctx_all_reduce`."""
    # This implementation avoids using `merge_call` and just launches
    # collective ops in one replica.
    if options is None:
      options = collective_util.Options()

    if context.executing_eagerly():
      # In eager mode, fall back to the default implementation that uses
      # `merge_call`. Replica functions run sequentially in eager mode, and
      # due to the blocking nature of collective ops, execution will hang if
      # collective ops are launched sequentially.
      return super()._replica_ctx_all_reduce(reduce_op, value, options)

    replica_context = distribute_lib.get_replica_context()
    assert replica_context, (
        "`StrategyExtended._replica_ctx_all_reduce` must be called in a "
        "replica context")
    return self._cross_device_ops._all_reduce(  # pylint: disable=protected-access
        reduce_op,
        value,
        replica_context._replica_id,  # pylint: disable=protected-access
        options)

  def _check_health(self):
    while True:
      if self._check_health_thread_should_stop.is_set():
        return
      for job in self._cluster_spec.jobs:
        for task_id in range(self._cluster_spec.num_tasks(job)):
          peer = "/job:{}/replica:0/task:{}".format(job, task_id)
          attempts = 0
          while True:
            attempts += 1
            try:
              context.context().check_collective_ops_peer_health(
                  peer, timeout_in_ms=self._check_health_timeout * 1000)
              # If check_collective_ops_peer_health doesn't raise an Exception,
              # the peer is healthy.
              break
            except (errors.UnavailableError, errors.FailedPreconditionError,
                    errors.DeadlineExceededError) as e:
              # There could be many kinds of errors:
              # - Unavailable: the peer is not reachable, e.g. it's down.
              # - FailedPrecondition: the peer has restarted.
              if attempts < self._check_health_retry_limit:
                logging.warning("%s seems down, retrying %d/%d", peer,
                                attempts, self._check_health_retry_limit)
                continue
              logging.error(
                  "Cluster check alive failed, %s is down, "
                  "aborting collectives: %s", peer, e)
              context.context().abort_collective_ops(
                  errors.UNAVAILABLE,
                  "cluster check alive failed, {} is down".format(peer))
              return
            except Exception as e:  # pylint: disable=broad-except
              logging.error("Unexpected exception in check alive: %s", e)
              context.context().abort_collective_ops(
                  errors.INTERNAL,
                  "unexpected exception in check alive: %s" % e)
              return
      time.sleep(self._check_health_interval)

  def _start_check_health_thread(self):
    # Use a dummy all-reduce as a barrier to wait for all workers to be up,
    # otherwise the check health may fail immediately.
    #
    # Use array_ops.identity to create the dummy tensor so that we have a new
    # Tensor. If we use constant it may be a cached one from a /job:localhost
    # device, which will cause some code that relies on tensor.device to error.
    dummy_value = array_ops.identity([])
    logging.info("Waiting for the cluster, timeout = %s",
                 self._check_health_initial_timeout or "inf")
    try:
      self._host_cross_device_ops.reduce(
          reduce_util.ReduceOp.SUM,
          dummy_value,
          dummy_value,
          options=collective_util.Options(
              timeout_seconds=self._check_health_initial_timeout,
              implementation=collective_util.CommunicationImplementation.RING))
      if context.is_async():
        context.async_wait()
    except errors.DeadlineExceededError:
      raise RuntimeError(
          "Timeout waiting for the cluster, timeout is %d seconds" %
          self._check_health_initial_timeout)
    logging.info("Cluster is ready.")
    self._check_health_thread_should_stop = threading.Event()
    # Start the thread as daemon to avoid it blocking the program from exiting.
    # We try our best to shut down the thread, but __del__ is not guaranteed to
    # be called when the program exits.
    self._check_health_thread = threading.Thread(
        target=self._check_health, daemon=True)
    self._check_health_thread.start()

  def _stop_check_health_thread(self):
    if getattr(self, "_check_health_thread", None):
      logging.info("stopping check health thread")
      self._check_health_thread_should_stop.set()
      self._check_health_thread.join()
      self._check_health_thread = None
      logging.info("check health thread stopped")

  def _warn_nccl_no_gpu(self):
    if (self._communication_options.implementation ==
        collective_util.CommunicationImplementation.NCCL and
        self._local_device_type != "GPU"):
      logging.warning("Enabled NCCL communication but no GPUs detected/"
                      "specified.")

  def _in_multi_worker_mode(self):
    """Whether this strategy indicates working in multi-worker settings."""
    return self._num_workers > 1

  @property
  def experimental_between_graph(self):
    return True

  @property
  def experimental_should_init(self):
    return True

  @property
  def should_checkpoint(self):
    return self._is_chief

  @property
  def should_save_summary(self):
    return self._is_chief

  @property
  def _num_replicas_in_sync(self):
    return len(self.worker_devices) * self._num_workers

  @property
  def _global_batch_size(self):
    """`make_dataset_iterator` and `make_numpy_iterator` use global batch size.

    `make_input_fn_iterator` assumes per-replica batching.

    Returns:
      Boolean.
    """
    return True

  def _get_replica_id_in_sync_group(self, replica_id):
    return self._id_in_cluster * len(self.worker_devices) + replica_id

  def _get_local_replica_id(self, replica_id_in_sync_group):
    return (replica_id_in_sync_group -
            self._id_in_cluster * len(self.worker_devices))

  def __deepcopy__(self, memo):
    # We check the check health thread instead of whether we are in eager mode
    # to limit the backward incompatibility.
    if hasattr(self, "_check_health_thread"):
      raise ValueError(
          "MultiWorkerMirroredStrategy cannot be deep copied in eager mode.")
    # Otherwise, do a regular deepcopy.
    cls = self.__class__
    result = cls.__new__(cls)
    memo[id(self)] = result
    for k, v in self.__dict__.items():
      setattr(result, k, copy.deepcopy(v, memo))
    return result