"""Classes for different algorithms of reduction and broadcasting."""

import collections
import copy
import multiprocessing.dummy
import multiprocessing.pool
import threading

import numpy as np
import six

from tensorflow.python.client import device_lib
from tensorflow.python.distribute import collective_util
from tensorflow.python.distribute import cross_device_utils
from tensorflow.python.distribute import device_util
from tensorflow.python.distribute import distribute_utils
from tensorflow.python.distribute import ps_values
from tensorflow.python.distribute import reduce_util
from tensorflow.python.distribute import tpu_values
from tensorflow.python.distribute import values as value_lib
from tensorflow.python.distribute import values_util
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import indexed_slices
from tensorflow.python.framework import kernels
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor as tensor_lib
from tensorflow.python.framework import tensor_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import resource_variable_ops
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.util import nest
from tensorflow.python.util.tf_export import tf_export
from tensorflow.tools.docs import doc_controls


def check_destinations(destinations):
  """Checks whether `destinations` is not empty.

  Args:
    destinations: a `DistributedValues`, variable, or string object.

  Returns:
    Boolean which is True if `destinations` is not empty.
  """
  # Calling bool() on a ResourceVariable is not allowed.
  if isinstance(destinations,
                (resource_variable_ops.BaseResourceVariable,
                 tensor_lib.Tensor)):
    return bool(destinations.device)
  return bool(destinations)


def validate_destinations(destinations):
  """Validates the `destination` is one of expected types."""
  if not isinstance(
      destinations,
      (value_lib.DistributedValues, tensor_lib.Tensor,
       indexed_slices.IndexedSlices, ps_values.AggregatingVariable,
       six.string_types, tpu_values.TPUMirroredVariable
      )) and not resource_variable_ops.is_resource_variable(destinations):
    raise ValueError("destinations must be one of a `DistributedValues` object,"
                     " a tf.Variable object, or a device string.")

  if not check_destinations(destinations):
    raise ValueError("destinations can not be empty")


def reduce_non_distributed_value(reduce_op,
                                 value,
                                 destinations,
                                 num_replicas_in_graph,
                                 canonicalize_devices=True):
  """Reduce a non-DistributedValue `value` to `destinations`."""
  if isinstance(value, value_lib.DistributedValues):
    raise ValueError("You are passing a `DistributedValues` to "
                     "`reduce_non_distributed_value`, which is not allowed.")

  # If the same value is present on all replicas then the PerReplica value
  # will be a single value. We also handle the case when `value` is a single
  # value and equal to 0.
  if not tensor_util.is_tf_type(value) and np.all(value == 0):
    return np.zeros(value.shape, dtype=value.dtype)
  # If there is only a single value and the reduce op is MEAN,
  # that value should be on all destinations.
  if reduce_op == reduce_util.ReduceOp.MEAN:
    return value
  elif num_replicas_in_graph != 1:
    # We do not support a reduce op of SUM when the value is the same across
    # all replicas: summing up identical values across replicas is not clearly
    # defined.
    raise ValueError("A non-DistributedValues value %s cannot be reduced with "
                     "the given reduce op %s." % (value, reduce_op))
  else:
    validate_destinations(destinations)
    return simple_broadcast(
        value, destinations, canonicalize_devices=canonicalize_devices)


def _make_tensor_into_per_replica(input_tensor):
  """Converts a single tensor into a PerReplica object."""
  if isinstance(input_tensor, value_lib.DistributedValues):
    return input_tensor

  # If the input is not a Tensor (e.g. a plain Python value), convert it first
  # so that it carries a device.
  if not tensor_util.is_tensor(input_tensor):
    input_tensor = ops.convert_to_tensor(input_tensor)

  if hasattr(input_tensor, "device"):
    return value_lib.PerReplica((input_tensor,))

  raise ValueError("Cannot convert `input_tensor` to a `PerReplica` object "
                   "because it doesn't have device set.")


def _normalize_value_destination_pairs(value_destination_pairs):
  """Converts each tensor into a PerReplica object in the input list."""
  result = []

  value_destination_pairs = list(value_destination_pairs)

  if not isinstance(value_destination_pairs, (list, tuple)):
    raise ValueError("`value_destination_pairs` should be a list or tuple")
  for pair in value_destination_pairs:
    if not isinstance(pair, tuple):
      raise ValueError(
          "Each element of `value_destination_pairs` should be a tuple.")
    if len(pair) != 2:
      raise ValueError("Each element of `value_destination_pairs` should be a "
                       "tuple of size 2.")

    per_replica = _make_tensor_into_per_replica(pair[0])
    result.append((per_replica, pair[1]))
  return result


def _validate_value_destination_pairs(value_destination_pairs):
  """Validates value_destination_pairs are valid."""
  if not value_destination_pairs:
    return False
  if not isinstance(value_destination_pairs, (list, tuple)):
    return False
  if not all(isinstance(pair, tuple) for pair in value_destination_pairs):
    return False
  if not all(
      isinstance(v[0], value_lib.PerReplica) for v in value_destination_pairs):
    return False
  return True


# TODO: consider calling this function in the caller of CrossDeviceOps.
def get_devices_from(destinations, canonicalize_devices=True):
  if isinstance(destinations, value_lib.DistributedValues):
    return destinations._devices  # pylint: disable=protected-access
  if canonicalize_devices:
    if isinstance(destinations, six.string_types):
      return (device_util.resolve(destinations),)
    return (device_util.resolve(destinations.device),)

  # Let placer canonicalize and resolve destination devices.
  if isinstance(destinations, six.string_types):
    return (device_util.canonicalize_without_job_and_task(destinations),)
  return (device_util.canonicalize_without_job_and_task(destinations.device),)


def _devices_match(left, right, canonicalize_devices=True):
  return left is right or set(
      get_devices_from(left, canonicalize_devices)) == set(
          get_devices_from(right, canonicalize_devices))


def _all_devices_match(value_destination_pairs, canonicalize_devices=True):
  if not all(
      _devices_match(v, d, canonicalize_devices)
      for v, d in value_destination_pairs):
    return False
  if not all(
      _devices_match(v, value_destination_pairs[0][0], canonicalize_devices)
      for v, _ in value_destination_pairs[1:]):
    return False
  return True


def simple_broadcast(value,
                     destinations,
                     always_mirrored=False,
                     canonicalize_devices=True):
  """Broadcast `value` to `destinations` using simple copies."""
  devices = get_devices_from(destinations, canonicalize_devices)
  if len(devices) == 1 and not always_mirrored:
    return cross_device_utils.copy_tensor_or_indexed_slices_to_device(
        value, devices[0])
  else:
    value_updates = []
    for d in devices:
      value_updates.append(
          cross_device_utils.copy_tensor_or_indexed_slices_to_device(value, d))
    return distribute_utils.regroup(value_updates,
                                    wrap_class=value_lib.Mirrored)


def _simple_reduce(per_replica_value, reduce_to_device, accumulation_fn,
                   reduce_op):
  """Reduces the value by accumulation_fn and reduce_op."""
  all_values = per_replica_value.values
  if not all_values:
    raise ValueError("`per_replica_value` must be non-empty")
  count = len(all_values)

  with ops.device(reduce_to_device):
    with context.device_policy(context.DEVICE_PLACEMENT_SILENT):
      reduced = cross_device_utils.aggregate_tensors_or_indexed_slices(
          all_values, accumulation_fn)
      if reduce_op == reduce_util.ReduceOp.MEAN:
        reduced = cross_device_utils.divide_by_n_tensors_or_indexed_slices(
            reduced, count)
      elif reduce_op != reduce_util.ReduceOp.SUM:
        raise ValueError("`reduce_op` must be Reduce.SUM or Reduce.MEAN.")
  return reduced


def _simple_gather(per_replica_value, reduce_to_device, axis):
  """Concatenate all values in the DistributedValues input and return."""
  all_values = per_replica_value.values
  if not all_values:
    raise ValueError("`per_replica_value` must be non-empty")

  with ops.device(reduce_to_device):
    with context.device_policy(context.DEVICE_PLACEMENT_SILENT):
      gathered = array_ops.concat(all_values, axis)
  return gathered


@tf_export("distribute.CrossDeviceOps")
class CrossDeviceOps(object):
  """Base class for cross-device reduction and broadcasting algorithms.

  The main purpose of this class is to be passed to
  `tf.distribute.MirroredStrategy` in order to choose among different cross
  device communication implementations. Prefer using the methods of
  `tf.distribute.Strategy` instead of the ones of this class.

  Implementations:
  * `tf.distribute.ReductionToOneDevice`
  * `tf.distribute.NcclAllReduce`
  * `tf.distribute.HierarchicalCopyAllReduce`
  """

  def __init__(self):
    self._canonicalize_devices = True

  @property
  def _num_between_graph_workers(self):
    # Returns 1 by default, the value may be overridden by sub classes.
    return 1

  def reduce(self, reduce_op, per_replica_value, destinations, options=None):
    """Reduce `per_replica_value` to `destinations`.

    See `tf.distribute.StrategyExtended.reduce_to`. This can only be called in
    the cross-replica context.

    Args:
      reduce_op: a `tf.distribute.ReduceOp` specifying how values should be
        combined.
      per_replica_value: a `tf.distribute.DistributedValues`, or a `tf.Tensor`
        like object.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to reduce to. To perform an all-reduce, pass the same to `value` and
        `destinations`. Note that if it's a `tf.Variable`, the value is reduced
        to the devices of that variable, and this method doesn't update the
        variable.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.

    Raises:
      ValueError: if per_replica_value can't be converted to a
        `tf.distribute.DistributedValues` or if destinations is not a string,
        `tf.Variable` or `tf.distribute.DistributedValues`.
    """
    if options is None:
      options = collective_util.Options()

    per_replica_value = _make_tensor_into_per_replica(per_replica_value)

    validate_destinations(destinations)

    # Shortcut if `per_replica_value` only contains one value.
    if self._num_between_graph_workers == 1 and len(
        per_replica_value.values) == 1 and _devices_match(
            per_replica_value, destinations, self._canonicalize_devices):
      with ops.device(per_replica_value.values[0].device):
        v = array_ops.identity(per_replica_value.values[0])
      return distribute_utils.regroup((v,), wrap_class=value_lib.Mirrored)

    return self.reduce_implementation(reduce_op, per_replica_value,
                                      destinations, options)

  def _gather(self, per_replica_value, destinations, axis, options=None):
    """Gather `per_replica_value` to `destinations`.

    Args:
      per_replica_value: a `tf.distribute.DistributedValues`, or a `tf.Tensor`
        like object.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to gather to. To perform an all-gather, pass the same to `value` and
        `destinations`. Note that if it's a `tf.Variable`, the value is gathered
        to the devices of that variable, and this method doesn't update the
        variable.
      axis: specifies the dimension to gather along within each replica's
        tensor.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`

    Raises:
      ValueError: if per_replica_value can't be converted to a
        `tf.distribute.DistributedValues` or if destinations is not a string,
        `tf.Variable` or `tf.distribute.DistributedValues`.
    """
    if isinstance(per_replica_value, indexed_slices.IndexedSlices):
      raise NotImplementedError(
          "gather/all_gather does not support IndexedSlices")
    if options is None:
      options = collective_util.Options()

    per_replica_value = _make_tensor_into_per_replica(per_replica_value)

    validate_destinations(destinations)

    # Shortcut if `per_replica_value` only contains one value.
    if self._num_between_graph_workers == 1 and len(
        per_replica_value.values) == 1 and _devices_match(
            per_replica_value, destinations, self._canonicalize_devices):
      with ops.device(per_replica_value.values[0].device):
        v = array_ops.identity(per_replica_value.values[0])
      return distribute_utils.regroup((v,), wrap_class=value_lib.Mirrored)

    return self._gather_implementation(per_replica_value, destinations, axis,
                                       options)

  def _gather_implementation(self, per_replica_value, destinations, axis,
                             options):
    """Implementation of `gather` method of `tf.distribute.CrossDeviceOps`.

    Overriding this method is useful for subclass implementers.

    Args:
      per_replica_value: a `tf.distribute.DistributedValues`, or a `tf.Tensor`
        like object.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to gather to. To perform an all-gather, pass the same to `value` and
        `destinations`. Note that if it's a `tf.Variable`, the value is gathered
        to the devices of that variable, this method doesn't update the
        variable.
      axis: specifies the dimension to gather along within each replica's
        tensor.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.

    Raises:
      ValueError: if per_replica_value can't be converted to a
        `tf.distribute.DistributedValues` or if destinations is not a string,
        `tf.Variable` or `tf.distribute.DistributedValues`.
    """
    raise NotImplementedError(
        "_gather method must be implemented in descendants.")

  def batch_reduce(self, reduce_op, value_destination_pairs, options=None):
    """Reduce values to destinations in batches.

    See `tf.distribute.StrategyExtended.batch_reduce_to`. This can only be
    called in the cross-replica context.

    Args:
      reduce_op: a `tf.distribute.ReduceOp` specifying how values should be
        combined.
      value_destination_pairs: a sequence of (value, destinations) pairs. See
        `tf.distribute.CrossDeviceOps.reduce` for descriptions.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A list of `tf.Tensor` or `tf.distribute.DistributedValues`, one per pair
      in `value_destination_pairs`.

    Raises:
      ValueError: if `value_destination_pairs` is not an iterable of
        tuples of `tf.distribute.DistributedValues` and destinations.
    """
    if options is None:
      options = collective_util.Options()
    if not _validate_value_destination_pairs(value_destination_pairs):
      # If the first element of each pair is a tensor, we try to turn it into
      # a PerReplica object.
      value_destination_pairs = _normalize_value_destination_pairs(
          value_destination_pairs)

    for _, d in value_destination_pairs:
      validate_destinations(d)

    # Shortcut when all PerReplica objects only contain one value.
    if self._num_between_graph_workers == 1 and _all_devices_match(
        value_destination_pairs, self._canonicalize_devices) and len(
            value_destination_pairs[0][0].values) == 1:
      return [
          distribute_utils.regroup(v.values, wrap_class=value_lib.Mirrored)
          for v, _ in value_destination_pairs
      ]

    return self.batch_reduce_implementation(reduce_op, value_destination_pairs,
                                            options)
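
  # Example (editor's illustrative sketch, not part of the public API
  # surface): in the cross-replica context, `batch_reduce` pairs each value
  # with its destinations and returns one reduced value per pair, e.g. when
  # aggregating two gradients onto their variables' devices:
  #
  #   reduced = cross_device_ops.batch_reduce(
  #       tf.distribute.ReduceOp.SUM,
  #       [(per_replica_grad_0, var_0), (per_replica_grad_1, var_1)])
  #
  # Here `per_replica_grad_i` and `var_i` are hypothetical placeholders for a
  # `PerReplica` of per-device gradients and a mirrored variable.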
9C0c                 <    t        |       | j                  ||      S )aC  Broadcast `tensor` to `destinations`.

    This can only be called in the cross-replica context.

    Args:
      tensor: a `tf.Tensor` like object. The value to broadcast.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to broadcast to. Note that if it's a `tf.Variable`, the value is
        broadcasted to the devices of that variable, this method doesn't update
        the variable.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.
    """
    validate_destinations(destinations)
    return self.broadcast_implementation(tensor, destinations)

  @doc_controls.for_subclass_implementers
  def reduce_implementation(self, reduce_op, per_replica_value, destinations,
                            options):
    """Implementation of `reduce`.

    Overriding this method is useful for subclass implementers.

    Args:
      reduce_op: a `tf.distribute.ReduceOp` specifying how values should be
        combined.
      per_replica_value: a `tf.distribute.DistributedValues`, or a `tf.Tensor`
        like object.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to reduce to. To perform an all-reduce, pass the same to `value` and
        `destinations`. Note that if it's a `tf.Variable`, the value is reduced
        to the devices of that variable, this method doesn't update the
        variable.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.

    Raises:
      ValueError: if per_replica_value can't be converted to a
        `tf.distribute.DistributedValues` or if destinations is not a string,
        `tf.Variable` or `tf.distribute.DistributedValues`.
    """
    raise NotImplementedError(
        "_reduce method must be implemented in descendants.")

  @doc_controls.for_subclass_implementers
  def batch_reduce_implementation(self, reduce_op, value_destination_pairs,
                                  options):
    """Implementation of `batch_reduce`.

    Overriding this method is useful for subclass implementers.

    Args:
      reduce_op: a `tf.distribute.ReduceOp` specifying how values should be
        combined.
      value_destination_pairs: a sequence of (value, destinations) pairs. See
        `reduce` for descriptions.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A list of `tf.Tensor` or `tf.distribute.DistributedValues`, one per pair
      in `value_destination_pairs`.

    Raises:
      ValueError: if `value_destination_pairs` is not an iterable of
        tuples of `tf.distribute.DistributedValues` and destinations.
    """
    raise NotImplementedError(
        "batch_reduce_implementation method must be implemented in "
        "descendants.")

  @doc_controls.for_subclass_implementers
  def broadcast_implementation(self, tensor, destinations):
    """Implementation of `broadcast`.

    Args:
      tensor: a `tf.Tensor` like object. The value to broadcast.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to broadcast to. Note that if it's a `tf.Variable`, the value is
        broadcasted to the devices of that variable, this method doesn't update
        the variable.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.
    """
    return simple_broadcast(
        tensor,
        destinations,
        always_mirrored=True,
        canonicalize_devices=self._canonicalize_devices)

  # ========================= Collective APIs =================================
  #
  # Different from `reduce`, `batch_reduce` and `broadcast` which must be
  # called in the cross-replica context, collective APIs are called in the
  # replica context.

  def _all_reduce(self, reduce_op, value, replica_id, options):
    """All-reduce the `value` across all replicas so that all get the result.

    `value` can be a nested structure of tensors or `IndexedSlices`. The
    implementation should generally batch the all-reduces when possible.
    `options` can be set to hint the batching behavior.

    This API must be called in a replica context.

    Args:
      reduce_op: A `tf.distribute.ReduceOp` value specifying how values should
        be combined.
      value: Value to be reduced. A tensor or a nested structure of tensors or
        `IndexedSlices`.
      replica_id: An integer indicating the id of the replica where this
        all_reduce is called under. This is the local replica id that ranges
        from 0 to len(local_devices) - 1.
      options: A `tf.distribute.experimental.CommunicationOptions`.

    Returns:
      A tensor/IndexedSlices or a nested structure of tensors/IndexedSlices with
      the reduced values. The structure is the same as `value`.
    """
    raise NotImplementedError(
        "_all_reduce must be implemented in descendants.")


@tf_export("distribute.ReductionToOneDevice")
class ReductionToOneDevice(CrossDeviceOps):
  """A CrossDeviceOps implementation that copies values to one device to reduce.

  This implementation always copies values to one device to reduce them, then
  broadcast reduced values to the destinations. It doesn't support efficient
  batching.

  Here is how you can use `ReductionToOneDevice` in
  `tf.distribute.MirroredStrategy`:

  ```
    strategy = tf.distribute.MirroredStrategy(
      cross_device_ops=tf.distribute.ReductionToOneDevice())
  ```
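
  The reduction device can also be pinned explicitly (an illustrative variant
  added by the editor, using the `reduce_to_device` constructor argument
  documented below):

  ```
    strategy = tf.distribute.MirroredStrategy(
      cross_device_ops=tf.distribute.ReductionToOneDevice(
          reduce_to_device="/cpu:0"))
  ```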
  """

  def __init__(self, reduce_to_device=None, accumulation_fn=None):
    """Initializes with a device to reduce to and a way to accumulate.
        |           y)aH  Initializes with a device to reduce to and a way to accumulate.

    Args:
      reduce_to_device: the intermediate device to reduce to. If None, reduce
        to the first device in `destinations` of the `reduce` method.
      accumulation_fn: a function that does accumulation.  If None,
        `tf.math.add_n` is used.
    N)r   r   add_nr   superr   r   )r   r   r   	__class__s      r$   r   zReductionToOneDevice.__init__V  s,     -D*<hnnD	
.0r&   c                 F   ~t        |      rt        || j                        }nt        || j                        }| j                  xs |d   }t	        j
                  t        j                  d|d|dd       t        ||| j                  |      }| j                  ||      S )Nr   z
Reduce to  then broadcast to r5   
   )
r%   rc   r   r   logginglog_first_nINFOr   r   r   )r   r?   r   r#   r   rw   r   r   s           r$   r   z*ReductionToOneDevice.reduce_implementationc  s    ,' t/I/IJg !2D4N4NOg,,:
0@'JBP .0@!119>G>>'<00r&   c                 0   ~t        |      rt        || j                        }nt        || j                        }| j                  xs |d   }t	        j
                  t        j                  d|d|dd       t        |||      }| j                  ||      S )Nr   z
Gather to r   r5   r   )	r%   rc   r   r   r   r   r   r   r   )r   r   r#   r   r   rw   r   r   s           r$   r   z+ReductionToOneDevice._gather_implementationr  s    ,' t/I/IJg !2D4N4NOg,,:
0@'JBP /1A4HH>>(L11r&   c           
      ^    |D cg c]  \  }}| j                  ||||       c}}S c c}}w )N)r#   r   )r   )r   r?   rP   r   tr\   s         r$   r   z0ReductionToOneDevice.batch_reduce_implementation  sC    
 , Aq 	""qq' 	# 	;  s   ))NN)	r   r   r   r   r   r   r   r   __classcell__r   s   @r$   r   r   E  s    112r&   r   c                    | d   j                   }t        t        |            D cg c]  }g  }}| D ]F  }t        |j                        D ],  \  }}|j                   |k(  sJ ||   j                  |df       . H |S c c}w )ae  Group values into sublists by their devices.

  This grouping is needed to call the all-reduce library because it expects a
  list of the following form:
    [[(grad0_gpu0, v0_gpu0), (grad1_gpu0, v1_gpu0), (grad2_gpu0, v2_gpu0) ...],
     [(grad0_gpu1, v0_gpu1), (grad1_gpu1, v1_gpu1), (grad2_gpu1, v2_gpu1) ...],
     [(grad0_gpu2, v0_gpu2), (grad1_gpu0, v1_gpu2), (grad2_gpu0, v2_gpu2) ...],
     ...
    ]

  Args:
    per_replica_values: a list of PerReplica objects.

  Returns:
    a list of lists, each sublist has components for its corresponding device of
      PerReplica objects, paired with a None.
  """
  destinations = per_replica_values[0]._devices  # pylint: disable=protected-access
  grouped = [[] for _ in range(len(destinations))]
  for per_replica_value in per_replica_values:
    # pylint: disable=protected-access
    for i, v in enumerate(per_replica_value.values):
      assert per_replica_value._devices == destinations
      grouped[i].append((v, None))
  return grouped


def _ungroup_and_make_mirrored(grouped_reduced,
                               destinations,
                               reduce_op,
                               num_between_graph_workers=1):
  """Ungroup results from all-reduce and make Mirrored objects.

  Each all-reduce result will be divided by the number of destinations before
  Mirrored objects are created if reduce_op is "mean".

  Args:
    grouped_reduced: a list of lists, each sublist has components for each
      device, paired with a None. It is the result from
      cross_device_utils.aggregate_gradients_using*.
    destinations: a value to colocate the result with.
    reduce_op: Indicates how values will be aggregated. Accepted values
      are `tf.distribute.ReduceOp.SUM`, `tf.distribute.ReduceOp.MEAN`.
    num_between_graph_workers: number of workers in the between-graph
      replication.

  Returns:
    a list of Mirrored objects.
  """
  num_replicas = len(
      get_devices_from(destinations)) * num_between_graph_workers
  index = [[] for _ in range(len(grouped_reduced[0]))]
  for per_replica_reduced in grouped_reduced:
    for i, (v, _) in enumerate(per_replica_reduced):
      if reduce_op == reduce_util.ReduceOp.MEAN:
        with ops.device(v.device):
          index[i].append(v / num_replicas)
      else:
        index[i].append(v)
  return [
      distribute_utils.regroup(v, wrap_class=value_lib.Mirrored) for v in index
  ]
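

# Worked example of the grouping contract (editor's sketch): with two values
# replicated across two devices,
#
#   [PerReplica((t0_gpu0, t0_gpu1)), PerReplica((t1_gpu0, t1_gpu1))]
#
# `_group_value_by_device` produces the per-device form expected by the
# aggregation helpers,
#
#   [[(t0_gpu0, None), (t1_gpu0, None)],
#    [(t0_gpu1, None), (t1_gpu1, None)]],
#
# and `_ungroup_and_make_mirrored` maps the aggregated result back to one
# `Mirrored` value per original input.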

    Args:
      num_packs: specifies the number of split packs that will be
        formed.

    Raises:
      ValueError: if num_packs is not greater than 0.
    r   z$num_packs must be greater than zero.N)r0   	num_packs)r   r   s     r$   r   z_ConcatAndSplitPacker.__init__  s     A~=>>DNr&   c           
         || _         g | _        g | _        g }|D ]  }t        j                  |d   d         5  |D cg c]  \  }}t        j                  |dg       }}}|D cg c]  \  }}t        j                  |       }}}|D cg c]  \  }}t        j                  |       }}}t        j                  |d      }	| j                  }
t        d |D              r7t        |D cg c]  \  }}|j                  j                         ! c}}      }nt        j                  |	      }||
z  }|||
dz
  z  z
  }|g|
dz
  z  |gz   }t        j                  |	|      }|j                  t!        |dg|
z               | j                  j                  |       | j                  j                  |       ddd        |S c c}}w c c}}w c c}}w c c}}w # 1 sw Y   xY w)zPack tensors.r   c              3   V   K   | ]!  \  }}|j                   j                          # y wrW   )r;   is_fully_defined)rX   grn   s      r$   rY   z-_ConcatAndSplitPacker.pack.<locals>.<genexpr>  s!     Ldaqww'')Lr]   r4   N)grouped_grads_and_varsall_device_shapesall_device_sizesr   colocate_withr   reshaper;   sizer   r   r9   sumnum_elementssplitrO   zip)r   r   device_grad_packsdevice_grads_and_varsr   rn   
flat_gradsdevice_shapesdevice_sizesconcat_grads
num_splitstotal_grad_size
split_sizesplit_size_lastsplit_sizes
grad_packss                   r$   packz_ConcatAndSplitPacker.pack  s   "8DDD!7 )3215a89 (3 4I
+/1aIa"&

 
 9NN1+NN6KLda	q)LL ''
A6 ^^
 L6KLL2GH$!Qqww##%HJ/ &NN<8/$
2
)J*q.,II!lj1n58II__\;?
 	  Z$*1D!EF%%m4$$\2Q(3 (3)3V Q
 OL" I3(3 (3sB   G!G
 G(G
GG
.AG/$GB&GGG(	c           
      L   g }t        || j                  | j                  | j                        D ]  \  }}}}t	        j
                  |d   d         5  |D cg c]  \  }}|	 }}}t        j                  |d      }	t        j                  |	|      }
t        ||
      D cg c]  \  }}t        j                  ||       }}}t        ||      D cg c]  \  }\  }}||f }}}}|j                  |       ddd        |S c c}}w c c}}w c c}}}w # 1 sw Y   xY w)zReverse the pack.r   N)r   r   r   r   r   r   r   r   r   r   rO   )r   summed_device_grad_packsaggregated_device_gradsr   r   r   r   rn   r   device_grads_concatgrads_with_sizesr;   gradgrads_with_shapesr\   summed_device_gradss                   r$   unpackz_ConcatAndSplitPacker.unpack  sX    @C%t'B'B##T%:%:A<< 	=	!	| 5a8;< <+CD41aQDD (../@!D %??+>M
  #=2BC
t dE*
 
 %((9(=%?
 
 q&1aQF
 
 	 &&':;+< <<: #") E

#< <s6   DD
$>D" D
DD&DDD#	Nr4   )r   r   r   r   r   r   r   r   r&   r$   r   r     s    42h #r&   r   c                 \    |dkD  r t        |      }|j                  |       }||fS d}| }||fS )zPack tensors if specified.r   N)r   r   )device_gradsr   tensor_packerr   s       r$   _pack_tensorsr  0  sI    ]))4M%**<8 
M	)) M$	M	))r&   c                 ,    |r|j                  |       S | S )z4Unpack tensors if they are packed before all-reduce.)r   )r   r  s     r$   _unpack_tensorsr  ;  s    ((	.r&   c                   H     e Zd ZdZd	 fd	Zd Zd Zd Zd Zd Z	d Z
 xZS )
AllReduceCrossDeviceOpsaI  All-reduce implementation of CrossDeviceOps.

  It performs all-reduce when applicable using NCCL or hierarchical copy. For
  the batch API, tensors will be repacked or aggregated for more efficient
  cross-device transportation.

  For reduces that are not all-reduce, it falls back to
  `tf.distribute.ReductionToOneDevice`.
  c                 d    || _         || _        t               | _        t        t
        |           y)a  Initializes the object.

    Args:
      all_reduce_alg: the all-reduce algorithm to use, currently only "nccl" or
        "hierarchical_copy" are supported.
      num_packs: a non-negative integer. The number of packs to split values
        into. If zero, no packing will be done.
    """
    self._all_reduce_alg = all_reduce_alg
    self._num_packs = num_packs
    self._simple_cross_replica_ops = ReductionToOneDevice()
    super(AllReduceCrossDeviceOps, self).__init__()

  def reduce_implementation(self, reduce_op, per_replica_value, destinations,
                            options):
    del options  # Unused.
    # To use NCCL or all-reduce, source and destination devices should match,
    # and none of the devices should be CPU.
    if (_devices_match(per_replica_value, destinations) and
        not any("cpu" in d.lower() for d in get_devices_from(destinations))):
      return self._batch_all_reduce(reduce_op, [per_replica_value])[0]
    else:
      return self._simple_cross_replica_ops.reduce(reduce_op,
                                                   per_replica_value,
                                                   destinations)

  def batch_reduce_implementation(self, reduce_op, value_destination_pairs,
                                  options):
    if _all_devices_match(value_destination_pairs):
      return self._batch_all_reduce(reduce_op,
                                    [v[0] for v in value_destination_pairs])
    else:
      return [
          self.reduce_implementation(reduce_op, value, dest, options)
          for value, dest in value_destination_pairs
      ]

  def _batch_all_reduce(self, reduce_op, per_replica_values):
    """All-reduce algorithm in a batch."""
    dense_values, dense_indices, sparse_values, sparse_indices = (
        cross_device_utils.split_by_sparsity(per_replica_values))
    if dense_values:
      dense_results = self._do_batch_all_reduce(reduce_op, dense_values)
    else:
      dense_results = []
    if sparse_values:
      sparse_results = self._do_batch_all_reduce_sparse(reduce_op,
                                                        sparse_values)
    else:
      sparse_results = []
    return cross_device_utils.stitch_values(((dense_results, dense_indices),
                                             (sparse_results, sparse_indices)))

  def _do_batch_all_reduce(self, reduce_op, dense_values):
    """Run batch all-reduces."""
    logging.log_first_n(
        logging.INFO,
        "batch_all_reduce: %d all-reduces with algorithm = %s, num_packs = %d"
        % (len(dense_values), self._all_reduce_alg, self._num_packs), 10)

    destinations = dense_values[0]._devices  # pylint: disable=protected-access
    grouped = _group_value_by_device(dense_values)

    # device_grad_packs:
    # [[(t0_gpu0, None), (t1_gpu0, None)], [(t0_gpu1, None), (t1_gpu1, None)]]
    device_grad_packs, tensor_packer = _pack_tensors(grouped, self._num_packs)

    # The actual aggregation of the repacked gradients. Note that they are
    # sharded among different aggregation trees. So it is important to strike
    # the balance on num_splits.
    if self._all_reduce_alg == "nccl":
      reduced = cross_device_utils.aggregate_gradients_using_nccl(
          device_grad_packs)
    else:
      reduced = (
          cross_device_utils.aggregate_gradients_using_hierarchical_copy(
              destinations, device_grad_packs))

    reduced = _unpack_tensors(reduced, tensor_packer)
    return _ungroup_and_make_mirrored(reduced, dense_values[0], reduce_op)

  def _do_batch_all_reduce_sparse(self, reduce_op, sparse_values):
    """Run batch all-reduce for sparse values."""
    logging.log_first_n(
        logging.WARN,
        "Efficient allreduce is not supported for %d IndexedSlices" %
        len(sparse_values), 10)
    # Use sparse_values as destinations to do all-reduces. It is effectively
    # an allgather under the hood but not an efficient one.
    return self._simple_cross_replica_ops.batch_reduce(
        reduce_op, zip(sparse_values, sparse_values))

  def _gather_implementation(self, per_replica_value, destinations, axis,
                             options):
    logging.log_first_n(
        logging.WARN,
        "gather/all_gather with NCCL or HierarchicalCopy is not supported. "
        "Falling back to gather on one device and then broadcast. We're working"
        " on a more efficient implementation.", 3)
    return ReductionToOneDevice()._gather(per_replica_value, destinations,  # pylint: disable=protected-access
                                          axis, options)


# For compatibility with code using the old name of `AllReduceCrossDeviceOps`.
AllReduceCrossTowerOps = AllReduceCrossDeviceOps


AllReduceSpecTuple = collections.namedtuple("AllReduceSpecTuple",
                                            "alg shards limit")


@tf_export("distribute.NcclAllReduce")
class NcclAllReduce(AllReduceCrossDeviceOps):
  """NCCL all-reduce implementation of CrossDeviceOps.

  It uses Nvidia NCCL for all-reduce. For the batch API, tensors will be
  repacked or aggregated for more efficient cross-device transportation.

  For reduces that are not all-reduce, it falls back to
  `tf.distribute.ReductionToOneDevice`.

  Here is how you can use `NcclAllReduce` in `tf.distribute.MirroredStrategy`:


  ```
    strategy = tf.distribute.MirroredStrategy(
      cross_device_ops=tf.distribute.NcclAllReduce())
  ```
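
  Tensors can additionally be packed into a smaller number of buffers before
  the NCCL launch (an illustrative variant added by the editor, using the
  `num_packs` constructor argument documented below):

  ```
    strategy = tf.distribute.MirroredStrategy(
      cross_device_ops=tf.distribute.NcclAllReduce(num_packs=2))
  ```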
  """

  def __init__(self, num_packs=1):
    """Initializes the object.

    Args:
      num_packs: a non-negative integer. The number of packs to split values
        into. If zero, no packing will be done.

    Raises:
      ValueError: if `num_packs` is negative.
    """
    if num_packs < 0:
      raise ValueError(
          "NCCL all-reduce requires num_packs >= 0, but {} is specified"
          .format(num_packs))
    super(NcclAllReduce, self).__init__(
        all_reduce_alg="nccl", num_packs=num_packs)


@tf_export("distribute.HierarchicalCopyAllReduce")
class HierarchicalCopyAllReduce(AllReduceCrossDeviceOps):
  """Hierarchical copy all-reduce implementation of CrossDeviceOps.
  It reduces to one GPU along edges in some hierarchy and broadcasts back to
  each GPU along the same path. For the batch API, tensors will be repacked or
  aggregated for more efficient cross-device transportation.

  This is a reduction created for Nvidia DGX-1 which assumes GPUs connects like
  that on DGX-1 machine. If you have different GPU inter-connections, it is
  likely that it would be slower than `tf.distribute.ReductionToOneDevice`.

  For reduces that are not all-reduce, it falls back to
  `tf.distribute.ReductionToOneDevice`.

  Here is how you can use `HierarchicalCopyAllReduce` in
  `tf.distribute.MirroredStrategy`:

  ```
    strategy = tf.distribute.MirroredStrategy(
      cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
  ```
  c                 n    |dk  rt        dj                  |            t        t        |   d|       y)zInitializes the object.

    Args:
      num_packs: a non-negative integer. The number of packs to split values
        into. If zero, no packing will be done.

    Raises:
      ValueError if `num_packs` is negative.
    r   z=HierarchicalCopy requires num_packs >= 0, but {} is specifiedhierarchical_copyr0  N)r0   r1  r   r5  r   r2  s     r$   r   z"HierarchicalCopyAllReduce.__init__  sC     1}
I6)  

#T3* 4 r&   r   r3  r   s   @r$   r5  r5    s    , r&   r5  c                   b     e Zd ZdZ	 	 d fd	Zed        Zd Zd Zd Z	d Z
d Zd	 Zd
 Z xZS )CollectiveAllReducezAll-reduce cross device ops using collective ops.

  In the between-graph replicated training, it will still do all-reduces across
  all workers and then put results on the right destinations.
  """

  def __init__(self,
               devices,
               group_size,
               options,
               collective_keys=None,
               canonicalize_devices=True):
    """Initializes the object.
                         | _        t        j                         | _	        |rt        d |D              | _        nt        d |D              | _        | j                  j                  | j                        }g | _        d| _        | j                  D ]X  }t	        j                  ||| j                  ||      }| j                  j!                  |       |j#                         rRd| _        Z t$        t&        | S          || _        y)a  Initializes the object.

    Args:
      devices: a list of device strings to run collectives on.
      group_size: the global group size. For between-graph replicated training
        it's the total number of devices across all workers.
      options: a `tf.distribute.experimental.CommunicationOptions`.
      collective_keys: an optional CollectiveKey object.
      canonicalize_devices: Whether to canonicalize devices for workers or not.
    r   z6group_size must be divisible by the number of devices.c              3   F   K   | ]  }t        j                  |        y wrW   r   canonicalizer  s     r$   rY   z/CollectiveAllReduce.__init__.<locals>.<genexpr>C  s     IAK44Q7I   !c              3   F   K   | ]  }t        j                  |        y wrW   )r   rb   r  s     r$   rY   z/CollectiveAllReduce.__init__.<locals>.<genexpr>E  s"      M?@+
7
7
:Mr>  FTN)rN   r0   _group_size_optionsr   CollectiveKeys_collective_keys	threadingLock_lockrM   r`   get_group_key
_launchers_limited_ncclCollectiveReplicaLauncherrO   can_order_ncclr   r9  r   r   )
r   rw   
group_sizer   collective_keysr6   	group_keyr!   launcherr   s
            r$   r   zCollectiveAllReduce.__init__  s-     CL 1$OPP!DDM, A/>>@ 	 !DJIIIdm MDKM Mdm%%33DMMBIDO D-- "#==
Z!6!6Ih
ooX&$$&!" 

t-/!5Dr&   c                 F    | j                   t        | j                        z  S rW   )r@  rN   r`   r   s    r$   r   z.CollectiveAllReduce._num_between_graph_workersW  s     c$--000r&   c           
         t        j                  |      }| j                  rr|j                  t        j
                  j                  k(  rKt        |      dk(  r=|j                  t	        j                  t        j
                  j                              }| j                  |   }t        j                  |      \  }}}	}
g }g }|r+|j                          t        j                  ||j                         }t#        j$                         sX|dk(  rSt'        j(                  dt        |      t        | j                        | j*                  |j                  t        |             |j-                  ||      }|t.        j0                  j2                  k(  rPt5        |      D ]B  \  }}t7        j8                  | j:                  |         5  || j*                  z  ||<   ddd       D |j                          |	r4t#        j$                         sN|dk(  rIt'        j(                  dt        |	      t        | j                        | j*                  |j                         |	D ]#  }|j=                  |j?                  ||             % |t.        j0                  j2                  k(  rt5        |      D ]  \  }}t7        j8                  | j:                  |         5  tA        jB                  ||   jD                  | j*                  z  ||   jF                  ||   jH                        ||<   ddd        t        jJ                  ||f||
ff      }t        jL                  ||      S # 1 sw Y   xY w# 1 sw Y   xY w)z%Implements CrossDeviceOps.all_reduce.r4   implementationr   zuCollective all_reduce tensors: %d all_reduces, num_devices = %d, group_size = %d, implementation = %s, num_packs = %dNzjCollective all_reduce IndexedSlices: %d all_reduces, num_devices =%d, group_size = %d, implementation = %s)r   indicesdense_shape)'r   flattenrI  rS  r   CommunicationImplementationNCCLrN   merger   RINGrH  r   r  reversegroup_by_sizebytes_per_packr   executing_eagerlyr   infor@  batch_all_reducer	   r<   r=   r   r   r!   r`   rO   all_reduce_indexed_slicesr   r*   r   rT  rU  r  pack_sequence_as)r   r?   r@   r   r   flat_valuesrO  r  r  r   r!  r"  r#  packsr   r\   indexed_sliceflat_resultss                     r$   r   zCollectiveAllReduce._all_reduce\  s    ,,u%K 	w5566;; <KA

!
!,HHMMOPg z*H,,[9 ?L-MN  ..|/6/E/EGe &&(Z1_Cs4??3T5E5E""CJ		0 //w?m	k**//	/m, 	4DAqzz$--
34 4 4#3#33M!4 4	4 &&(Z1_78;M8J $"2"2G4J4J	L
 ) H-..}gF	HH 
k**//	/n- 	;DAqzz$--
34 ; . < <%a(//$2B2BB&q)11*1-99!;N1; ;	; &33
	'..)IJLL  5534 4"; ;s   MAM$M!	$M-	c           
      h     j                   D cg c]  }g  c}t         j                         }|D ]3  }t        |      D ]#  }|   j                  |j                  |          % 5 t        j                         r fd} j                  5  t        j                  j                  t         j                               }	|	j                  |t        t        |                  }
|	j                          ddd       nNg }
 j                  5  t        |      D ](  }|
j                   j                  |   |             * 	 ddd       g }t        
 D ]6  }|j                  t!        j"                  |t$        j&                               8 |S c c}w # 1 sw Y   SxY w# 1 sw Y   _xY w)z'All reduce a list of per_replica_value.c                     t        j                         5  j                  |    |       cd d d        S # 1 sw Y   y xY wrW   )r   
eager_moder   )	device_idr   r?   r   values_by_devices    r$   	thread_fnzECollectiveAllReduce._all_reduce_per_replica_values.<locals>.thread_fn  sA    ! 	6!!)-=i-H"+W6	6 	6 	6s	   7A Nrq   )r`   rN   r   rO   r   r   r^  rF  multiprocessingpool
ThreadPoolmaprL   closer   r   r   rt   r(   ru   )r   r?   r   r   rn   num_devicesrS   r   rl  rn  outputs_by_devicerQ   r   rk  s   `` `         @r$   _all_reduce_per_replica_valuesz2CollectiveAllReduce._all_reduce_per_replica_values  s    %)MM2q2dmm$K) :[! :!"";#5#5a#89::   "6
 :: ##..s4==/AB HHYU;5G0HI

 
 :: L{# 	LA

"
"y*:1*=q'JL	LL
 F() Kmm

"
"6i6H6H
IKK M9 3 L Ls   	FA'F7F(F%(F1c           	         t        j                          | j                  ||g|      d   }t        || j                        }t        ||| j                        r|S t        |t        j                        st        j                  |g      }g }t        j                  |j                        5  |D ]  }t        j                  |      5  |j                  D ]7  }	|	j                  |k(  s|j                  t        j                  |	              n/ |j                  t        j                  |j                                d d d         	 d d d        t#        j$                  |t        j                        S # 1 sw Y   xY w# 1 sw Y   :xY wNr   rq   )r   mark_as_unsaveablert  rc   r   rh   r   r(   ru   r   control_dependenciesr   r!   rO   r   r   _primaryr   rt   )
r   r?   r   r#   r   all_reducedrw   r   rl   r\   s
             r$   r   z)CollectiveAllReduce.reduce_implementation  sk   ""$55i7H6I6=??@BK |T-G-GHG'002 k9#5#56&&}5k E		!	!+"4"4	5 
C 	C!ZZ] 	C%% Caxx1}ll9--a01C LL++K,@,@AB	C 	C	C
C ##Ei6H6HII	C 	C
C 
Cs+   ,F E4'AE4<
F 4E=9F  F	c           
      T   t        j                          t        || j                        }|r$| j	                  ||D cg c]  }|d   	 c}|      S |s%t        j                  t
        j                  dd       |D cg c]  \  }}| j                  ||||       c}}S c c}w c c}}w )Nr   zFEfficient batch_reduce is not supported if destinations are different.r   )	r   rw  ro   r   rt  r   r   r)  r   )r   r?   rP   r   all_devices_matchr\   r@   r  s           r$   r   z/CollectiveAllReduce.batch_reduce_implementation  s    ""$*+B+/+E+EG00
$;<qad<gG G LL *+-	/ 5eT 
$
$YtW
E  =s   B
=B$c           	         | j                  |g||      d   }t        j                          t        || j                        }t        ||| j                        r|S t        |t        j                        st        j                  |g      }g }t        j                  |j                        5  |D ]  }t        j                  |      5  |j                  D ]e  }	|	j                  |k(  r&|j                  t        j                  |	              n0|j                  t        j                  |j                                g d d d         	 d d d        t#        j$                  |t        j                        S # 1 sw Y   xY w# 1 sw Y   :xY wrv  )_batch_all_gatherr   rw  rc   r   rh   r   r(   ru   r   rx  r   r!   rO   r   r   ry  r   rt   )
r   r   r#   r   r   all_gatheredrw   r   rl   r\   s
             r$   r   z*CollectiveAllReduce._gather_implementation  s`   ))+<*=tWMaPL""$|T-G-GHG'002 lI$6$67''7l E		!	!,"5"5	6 F F!ZZ] 	F&& Faxx1}ll9--a01ll9--l.C.CDEF	F 	FFF ##Ei6H6HII	F 	FF Fs%   ,F A5E4<
F 4E=9F  F	c                     t              } j                  rij                  t        j                  j
                  k(  rB|dk(  r=j                  t        j                  t        j                  j                              t        j                  t        j                  d|t         j                         j                  j                  fz  d        fd}t        j                         r t!        j"                  |             }n |       }g }|D ]6  }|j%                  t'        j(                  |t*        j,                               8 |S )z'all gather multiple per-replica-values.r4   rR  zeCollective batch_all_gather: %d all-gathers, num_devices = %d, group_size = %d, implementation = %s, r   c                     g } j                   5  t        j                  d      5  D ]t  }g }t        t	        j
                              D ]>  }|j                  j                  |   j                  |j                  |                @ | j                  |       v 	 d d d        d d d        | S # 1 sw Y   xY w# 1 sw Y   | S xY w)N	allgather)
rF  r   
name_scoper   rN   r`   rO   rH  
all_gatherr   )gathered_valuesrS   outputsr   r   r   r   r   s       r$   compute_gathered_valueszFCollectiveAllReduce._batch_all_gather.<locals>.compute_gathered_values-  s    o:: *s~~k2 *- 	*K'T]]+, 7aNN4??1-88""1%tW6 77 
 
 
)	** * * * * s#   B?A:B3!B?3B<	8B??C	rq   )rN   rI  rS  r   rW  rX  rY  r   rZ  r   r   r   r`   r@  r   r^  r   functionrO   r   rt   r(   ru   )	r   r   r   r   
batch_sizer  r  mirroredr@   s	   ````     r$   r~  z%CollectiveAllReduce._batch_all_gather  s-   '(J 	w5566;; <a

!
!,HHMMOPg  C	SMM ,,g.D.D	FF HJ	K	   "F--.EFHo/1oH  Joo

"
"5Y5G5G
HJJ Or&   c                     t        j                  | j                  |      }t        | j                  | j
                  | j                  || j                        S rW   )copydeepcopyrC  r9  r`   r@  rA  r   )r   memorM  s      r$   __deepcopy__z CollectiveAllReduce.__deepcopy__C  sH     mmD$9$94@Ot}}d.>.>.0J0JL Lr&   r   )r   r   r   r   r   r   r   r   rt  r   r   r   r~  r  r   r   s   @r$   r9  r9    sU      $$(96v 1 1H6TB JD&J<'RLr&   r9  c           	         t        d | D              }t        j                         rt        j                         j	                  d      }t        j                         j                  d      }t        |      t        |      k7  rt        j                  d       t               S t        j                         j	                         }nt        j                  |      }t               }|D ]?  }t        j                  |j                        |v s%|j                  |j                         A t        |      t        |      k7  r1t        j                  ddj!                  t#        ||z
                     t%        d |D              rt        j                  d	       t               S t'        j(                  d
      rt+        d      S t        j                  d       t               S )aJ  Find the best `CrossDeviceOps` locally given a `tf.compat.v1.ConfigProto`.

  Args:
    devices: a list of devices passed to `tf.distribute.Strategy`.
    session_config: a `tf.compat.v1.ConfigProto` or `None`. If `None`, it will
      make decision based on all logical devices.

  Returns:
    A subclass of `CrossDeviceOps`.
  """
  requested_devices = set(device_util.canonicalize(d) for d in devices)
  if ops.executing_eagerly_outside_functions():
    logical_gpus = context.context().list_logical_devices(device_type="GPU")
    physical_gpus = context.context().list_physical_devices(device_type="GPU")
    if len(logical_gpus) != len(physical_gpus):
      logging.warning("NCCL is not supported when using virtual GPUs, "
                      "falling back to reduction to one device")
      return ReductionToOneDevice()

    machine_devices = context.context().list_logical_devices()
  else:
    machine_devices = device_lib.list_local_devices(
        session_config=session_config)
  using_devices = set()
  for d in machine_devices:
    if device_util.canonicalize(d.name) in requested_devices:
      using_devices.add(d.name)

  if len(using_devices) != len(requested_devices):
    logging.warning(
        "Some requested devices in `tf.distribute.Strategy` are not visible "
        "to TensorFlow: %s", ",".join(list(requested_devices - using_devices)))

  if any("gpu" not in d.lower() for d in requested_devices):
    logging.warning("There are non-GPU devices in `tf.distribute.Strategy`, "
                    "not using nccl allreduce.")
    return ReductionToOneDevice()

  if kernels.get_registered_kernels_for_op("NcclAllReduce"):
    return NcclAllReduce(num_packs=1)
  else:
    logging.warning("Nccl kernel is not found, not using nccl allreduce.")
    return ReductionToOneDevice()
 ,-@> @ .@F> :; 8Fe#F e#P*r3n r3l 1  ,[++,@,>@  %&!4+ !4 '!4H 12' 7 ' 3'V *EE -II sL. sLl	+"r&   