
"""Utilities to construct a TF subgraph implementing distributed All-Reduce."""

import collections
import math

from tensorflow.python.framework import device as device_lib
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nccl_ops


def _flatten_tensors(tensors):
  """Check tensors for isomorphism and flatten.

  Args:
    tensors: list of `tf.Tensor` which must all have the same shape.

  Returns:
    tensors: a list of `tf.Tensor` which are flattened (1D) views of tensors
    shape: the original shape of each element of input tensors

  Raises:
    ValueError: tensors are empty or non-isomorphic or have unknown shape.
  """
  if not tensors:
    raise ValueError("tensors cannot be empty")
  shape = tensors[0].shape
  for tensor in tensors:
    shape = shape.merge_with(tensor.shape)
  if not shape.is_fully_defined():
    raise ValueError("Tensors must have statically known shape.")
  if len(shape) != 1:
    reshaped = []
    for t in tensors:
      with ops.colocate_with(t):
        reshaped.append(array_ops.reshape(t, [-1]))
    tensors = reshaped
  return tensors, shape


def _reshape_tensors(tensors, shape):
  """Reshape tensors flattened by _flatten_tensors.

  Args:
    tensors: list of `tf.Tensor` of identical length 1D tensors.
    shape: list of integers describing the desired shape.  Product of
      the elements must equal the length of each tensor.

  Returns:
    list of `tf.Tensor` which are the reshaped inputs.
  """
  reshaped = []
  for t in tensors:
    with ops.colocate_with(t):
      reshaped.append(array_ops.reshape(t, shape))
  return reshaped


def _padded_split(tensor, pieces):
  """Like split for 1D tensors but pads-out case where len % pieces != 0.

  Args:
    tensor: `tf.Tensor` that must be 1D.
    pieces: a positive integer specifying the number of pieces into which
      tensor should be split.

  Returns:
    list of `tf.Tensor` of length pieces, which hold the values of
      the input tensor, in order. The final tensor may
      be zero-padded on the end to make its size equal to those of all
      of the other tensors.

  Raises:
    ValueError: The input tensor is not 1D.
  """
  shape = tensor.shape
  if 1 != len(shape):
    raise ValueError("input tensor must be 1D")
  tensor_len = shape.dims[0].value
  with ops.colocate_with(tensor):
    if tensor_len % pieces != 0:
      # pad to an even length
      chunk_size = 1 + tensor_len // pieces
      if pieces > tensor_len:
        # Edge case: more pieces than elements.  Pad the tensor out to one
        # element per piece.
        pad_len = pieces - tensor_len
        extended_whole = array_ops.concat(
            [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)
        parts = array_ops.split(extended_whole, pieces)
        return parts, pad_len
      elif (pieces - 1) * chunk_size >= tensor_len:
        # Another edge case of limited real interest.
        pad_len = (pieces * chunk_size) % tensor_len
        extended_whole = array_ops.concat(
            [tensor, array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)
        parts = array_ops.split(extended_whole, pieces)
        return parts, pad_len
      else:
        # Common case: only the last piece is short, so pad just that piece.
        last_chunk_size = tensor_len - (pieces - 1) * chunk_size
        pad_len = chunk_size - last_chunk_size
        piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]
        parts = array_ops.split(tensor, piece_lens)
        parts[-1] = array_ops.concat(
            [parts[-1], array_ops.zeros([pad_len], dtype=tensor.dtype)], 0)
        return parts, pad_len
    else:
      return array_ops.split(tensor, pieces), 0


def _strip_padding(tensors, pad_len):
  """Strip the suffix padding added by _padded_split.

  Args:
    tensors: list of `tf.Tensor` of identical length 1D tensors.
    pad_len: number of elements to be stripped from the end of each tensor.

  Returns:
    list of `tf.Tensor` which are the stripped inputs.

  Raises:
    ValueError: tensors must be a non-empty list of 1D tensors, and
      each must be longer than pad_len.
  """
  if not tensors:
    raise ValueError("tensors cannot be empty")
  shape = tensors[0].shape
  if len(shape) > 1:
    raise ValueError("tensors must be 1D")
  prefix_len = int(shape[0] - pad_len)
  if prefix_len < 0:
    raise ValueError("pad_len longer than tensor")
  stripped = []
  for t in tensors:
    with ops.colocate_with(t):
      stripped.append(array_ops.slice(t, [0], [prefix_len]))
  return stripped


def _ragged_split(tensor, pieces):
  """Like split for 1D tensors but allows case where len % pieces != 0.

  Args:
    tensor: `tf.Tensor` that must be 1D.
    pieces: a positive integer specifying the number of pieces into which
      tensor should be split.

  Returns:
    list of `tf.Tensor` of length pieces, which hold the values of
      the input tensor, in order. The final tensor may be shorter
      than the others, which will all be of equal length.

  Raises:
    ValueError: input tensor must be 1D.
  """
  shape = tensor.shape
  if 1 != len(shape):
    raise ValueError("input tensor must be 1D")
  tensor_len = shape.dims[0].value
  chunk_size = tensor_len // pieces
  with ops.colocate_with(tensor):
    if tensor_len != (pieces * chunk_size):
      # last piece will be short
      assert pieces > 1
      last_chunk_size = tensor_len - ((pieces - 1) * chunk_size)
      assert last_chunk_size > 0
      piece_lens = [chunk_size for _ in range(pieces - 1)] + [last_chunk_size]
      return array_ops.split(tensor, piece_lens)
    else:
      return array_ops.split(tensor, pieces)


def _ring_permutations(num_workers, num_subchunks, gpu_perm):
  """Generate an array of device index arrays, one for each subchunk.

  In the basic ring reduction algorithm there are size(T)/num_devices
  data chunks and each device processes one chunk per tick, i.e. sending
  one chunk and receiving one chunk.  The idea of subchunking is that
  each device processes num_subchunks smaller data regions per tick,
  and the ring rank permutation is different for each subchunk index
  so that a device is potentially sending to and receiving from
  num_subchunks different other devices at each tick.  Where multiple
  independent data channels exist between devices, this strategy
  supplies a method of using them in parallel.

  Args:
    num_workers: number of worker tasks
    num_subchunks: number of subchunks into which to divide each per-GPU chunk.
    gpu_perm: an array of integers in [0, num_gpus-1] giving the default
      ring order of GPUs at each worker.  Other permutations will be generated
      by rotating this array and splicing together per-worker instances.

  Raises:
    ValueError: the number of subchunks may not exceed the number of GPUs.

  Returns:
    pred_by_s_d: list of lists that maps (by index) from (subchunk, dev) to
        preceding device in the permutation for that subchunk.  The
        device index of GPU i at worker j is i + (j * num_gpus).
    rank_by_s_d: list of lists that maps (by index) from (subchunk, dev) to
       local rank of device d in the permutation for that subchunk.
  """
  num_gpus = len(gpu_perm)
  devices = num_workers * num_gpus
  if devices == 0:
    return [], []
  if num_subchunks > num_gpus:
    raise ValueError(
        "num_subchunks %d must be <= num_gpus %d" % (num_subchunks, num_gpus))
  rotation_interval = max(1, int(num_gpus / num_subchunks))
  perms_by_s = []
  for s in range(0, num_subchunks):
    full_order = []
    offset = s * rotation_interval
    for w in range(0, num_workers):
      default_order = [(w * num_gpus) + i for i in gpu_perm]
      dev_order = default_order[offset:] + default_order[:offset]
      full_order += dev_order
    perms_by_s.append(full_order)
  pred_by_s_d = [[-1 for d in range(0, devices)]
                 for s in range(0, num_subchunks)]
  rank_by_s_d = [[-1 for d in range(0, devices)]
                 for s in range(0, num_subchunks)]
  for s in range(0, num_subchunks):
    for d in range(0, devices):
      for t in range(0, devices):
        if d == perms_by_s[s][t]:
          rank_by_s_d[s][d] = t
          pred_by_s_d[s][d] = perms_by_s[s][(t + devices - 1) % devices]
          break
  return (pred_by_s_d, rank_by_s_d)


def build_ring_all_reduce(input_tensors, num_workers, num_subchunks,
                          gpu_perm, red_op, un_op=None):
  """Construct a subgraph performing a ring-style all-reduce of input_tensors.

  Args:
    input_tensors: a list of `tf.Tensor` objects, which must all
      have the same shape and type.
    num_workers: number of worker tasks spanned by input_tensors.
    num_subchunks: number of subchunks each device should process in one tick.
    gpu_perm: a list of ints giving a ring-wise rank ordering of GPUs at
      each worker.  All workers must have the same number of
      GPUs with the same rank ordering.  If NVLINK is available, this should
      be a ring order supported by NVLINK edges.
    red_op: a binary operator for elementwise reduction.
    un_op: an optional unary operator to apply to fully reduced values.

  Raises:
    ValueError: empty input_tensors or they don't all have same
    size.

  Returns:
    a list of `tf.Tensor` identical sum-reductions of input_tensors.
  """
  if len(input_tensors) < 2:
    raise ValueError("input_tensors must be length 2 or longer")
  input_tensors, shape = _flatten_tensors(input_tensors)
  devices = [t.device for t in input_tensors]
  (pred_by_s_d, rank_by_s_d) = _ring_permutations(
      num_workers, num_subchunks, gpu_perm)
  chunks_by_dev, pad_len = _build_ring_gather(
      input_tensors, devices,
      num_subchunks, pred_by_s_d, rank_by_s_d, red_op)
  if un_op:
    chunks_by_dev = _apply_unary_to_chunks(un_op, chunks_by_dev)
  output_tensors = _build_ring_scatter(pred_by_s_d, rank_by_s_d,
                                       chunks_by_dev)
  if pad_len > 0:
    output_tensors = _strip_padding(output_tensors, pad_len)
  if len(shape) != 1:
    output_tensors = _reshape_tensors(output_tensors, shape)
  return output_tensors


def _build_ring_gather(input_tensors, devices, num_subchunks,
                       pred_by_s_d, rank_by_s_d, red_op):
  """Construct a subgraph for the first (reduction) pass of ring all-reduce.

  Args:
    input_tensors: a list of `tf.Tensor` 1D input tensors of same
      shape and type.
    devices: array of device name strings
    num_subchunks: number of subchunks each device should process in one tick.
    pred_by_s_d: as produced by _ring_permutations
    rank_by_s_d: as produced by _ring_permutations
    red_op: a binary operator for elementwise reduction

  Raises:
    ValueError: tensors must all be one dimensional.

  Returns:
    list of list of `tf.Tensor` of (partially) reduced values where
    exactly num_subchunks chunks at each device are fully reduced.
  """
  num_devices = len(input_tensors)
  if num_devices == 0:
    return []
  if num_devices == 1:
    return input_tensors
  shape = input_tensors[0].shape
  if 1 != len(shape):
    raise ValueError("input tensors must be 1D")
  num_chunks = num_devices * num_subchunks
  num_ticks = num_devices - 1
  # Initialize chunks_by_dev with splits of the input tensors.
  chunks_by_dev = []
  split_pad_len = 0
  for d in range(0, num_devices):
    with ops.device(devices[d]):
      (splits, split_pad_len) = _padded_split(input_tensors[d], num_chunks)
      chunks_by_dev.append(splits)
  # Perform num_ticks rounds of reduction, each round in parallel
  # across devices.
  for tick in range(0, num_ticks):
    # One new partial reduction for every chunk.
    new_partial_reductions = [None for _ in range(0, num_chunks)]
    # Compute reductions with respect to last tick's values.
    for d in range(0, num_devices):
      with ops.device(devices[d]):
        for s in range(0, num_subchunks):
          rank = rank_by_s_d[s][d]
          seg_index = (rank + num_devices - (2 + tick)) % num_devices
          pred_dev = pred_by_s_d[s][d]
          chunk_index = (seg_index * num_subchunks) + s
          new_partial_reductions[chunk_index] = red_op(
              chunks_by_dev[pred_dev][chunk_index],
              chunks_by_dev[d][chunk_index])
    # Update chunks_by_dev with the new values at the end of the tick.
    for d in range(0, num_devices):
      for s in range(0, num_subchunks):
        rank = rank_by_s_d[s][d]
        seg_index = (rank + num_devices - (2 + tick)) % num_devices
        chunk_index = (seg_index * num_subchunks) + s
        chunks_by_dev[d][chunk_index] = new_partial_reductions[chunk_index]
  return chunks_by_dev, split_pad_len


def _apply_unary_to_chunks(f, chunks_by_dev):
  """Apply a unary op to each tensor in chunks_by_dev, on same device.

  Args:
    f: a unary function over `tf.Tensor`.
    chunks_by_dev: list of lists of `tf.Tensor`.

  Returns:
    new list of lists of `tf.Tensor` with the same structure as
    chunks_by_dev containing the derived tensors.
  """
  output = []
  for x in chunks_by_dev:
    with ops.colocate_with(x[0]):
      output.append([f(t) for t in x])
  return output


def _build_ring_scatter(pred_by_s_d, rank_by_s_d,
                        chunks_by_dev):
  """Construct subgraph for second (scatter) pass of ring all-reduce.

  Args:
    pred_by_s_d: as produced by _ring_permutations
    rank_by_s_d: as produced by _ring_permutations
    chunks_by_dev: list of list of `tf.Tensor` indexed by ints
      (device, chunk)

  Raises:
    ValueError: chunks_by_dev is not well-formed

  Returns:
    list of `tf.Tensor` which are the fully reduced tensors, one
    at each device corresponding to the outer dimension of chunks_by_dev.
  """
  num_devices = len(chunks_by_dev)
  num_chunks = len(chunks_by_dev[0])
  if 0 != num_chunks % num_devices:
    raise ValueError(
        "Expect number of chunks per device to be divisible by num_devices")
  num_subchunks = int(num_chunks / num_devices)
  num_ticks = num_devices - 1
  for tick in range(0, num_ticks):
    passed_values = [None for _ in range(0, num_chunks)]
    for d in range(0, num_devices):
      with ops.colocate_with(chunks_by_dev[d][0]):
        for s in range(0, num_subchunks):
          rank = rank_by_s_d[s][d]
          seg_index = (rank + num_devices - (1 + tick)) % num_devices
          pred_dev = pred_by_s_d[s][d]
          chunk_index = (seg_index * num_subchunks) + s
          passed_values[chunk_index] = array_ops.identity(
              chunks_by_dev[pred_dev][chunk_index])
    for d in range(0, num_devices):
      for s in range(0, num_subchunks):
        rank = rank_by_s_d[s][d]
        seg_index = (rank + num_devices - (1 + tick)) % num_devices
        chunk_index = (seg_index * num_subchunks) + s
        chunks_by_dev[d][chunk_index] = passed_values[chunk_index]
  # Join chunks at each device.
  output = []
  for x in chunks_by_dev:
    with ops.colocate_with(x[0]):
      output.append(array_ops.concat(x, 0))
  return output


def build_recursive_hd_all_reduce(input_tensors, red_op, un_op=None):
  """Construct a subgraph for recursive halving-doubling all-reduce.

  The recursive halving-doubling algorithm is described in
  (Thakur et al., 2005).

  The concept is to arrange the participating n devices in
  a linear sequence where devices exchange data pairwise
  with one other device in each round.  During the gather
  phase there are lg(n) rounds where devices exchange
  increasingly smaller sub-tensors with another device
  at increasingly greater distances, until at the top
  each device has 1/n of the fully reduced values.  During the
  scatter phase each device exchanges its fully reduced
  sub-tensor (which doubles in length at each round)
  with one other device at increasingly smaller distances
  until each device has all of the fully reduced values.

  Note: this preliminary version requires that len(input_tensors) be a
    power of 2.  TODO(tucker): relax this restriction.  Also, the
    number of elements in each tensor must be divisible by 2^h where h
    is the number of hops in each phase.  This will also be relaxed in
    the future with edge-case specific logic.

  Args:
    input_tensors: list of `tf.Tensor` to be elementwise reduced.
    red_op: a binary elementwise reduction Op.
    un_op: an optional unary elementwise Op to apply to reduced values.

  Returns:
    list of `tf.Tensor` which are the fully reduced tensors, one
    at each device of input_tensors.

  Raises:
    ValueError: num_devices not a power of 2, or tensor len not divisible
    by 2 the proper number of times.

  References:
    Optimization of Collective Communication Operations in MPICH:
      [Thakur et al., 2005]
      (https://journals.sagepub.com/doi/abs/10.1177/1094342005051521)
      ([pdf](http://wwwi10.lrr.in.tum.de/~gerndt/home/Teaching/HPCSeminar/mpich_multi_coll.pdf))
  """
  devices = [t.device for t in input_tensors]
  input_tensors, shape = _flatten_tensors(input_tensors)
  reduced_shards = _build_recursive_hd_gather(input_tensors, devices, red_op)
  if un_op:
    reduced_shards = [un_op(t) for t in reduced_shards]
  output_tensors = _build_recursive_hd_scatter(reduced_shards, devices)
  if len(shape) != 1:
    output_tensors = _reshape_tensors(output_tensors, shape)
  return output_tensors


def _build_recursive_hd_gather(input_tensors, devices, red_op):
  """Construct the gather phase of recursive halving-doubling all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` to be elementwise reduced.
    devices: a list of strings naming the devices hosting input_tensors,
      which will also be used to host the (partial) reduction values.
    red_op: a binary elementwise reduction Op.

  Returns:
    list of `tf.Tensor` which are the fully reduced tensor shards.

  Raises:
    ValueError: num_devices not a power of 2, or tensor len not divisible
    by 2 the proper number of times.
  """
  num_devices = len(devices)
  num_hops = int(math.log(num_devices, 2))
  if num_devices != (2 ** num_hops):
    raise ValueError("num_devices must be a power of 2")
  chunks = input_tensors
  for h in range(0, num_hops):
    span = 2 ** h
    group_size = span * 2
    new_chunks = [[] for _ in devices]
    for d in range(0, num_devices):
      if (d % group_size) >= (group_size / 2):
        # skip right half of a pair
        continue
      left_dev = devices[d]
      right_dev = devices[d + span]
      left_split = array_ops.split(chunks[d], 2)
      right_split = array_ops.split(chunks[d + span], 2)
      with ops.device(left_dev):
        new_chunks[d] = red_op(left_split[0], right_split[0])
      with ops.device(right_dev):
        new_chunks[d + span] = red_op(left_split[1], right_split[1])
    chunks = new_chunks
  return chunks


def _build_recursive_hd_scatter(input_tensors, devices):
  """Construct the scatter phase of recursive halving-doubling all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` that are fully-reduced shards.
    devices: a list of strings naming the devices on which the reconstituted
      full tensors should be placed.

  Returns:
    list of `tf.Tensor` which are the fully reduced tensors.
  """
  num_devices = len(devices)
  num_hops = int(math.log(num_devices, 2))
  assert num_devices == (2 ** num_hops), "num_devices must be a power of 2"
  chunks = input_tensors
  for h in reversed(range(0, num_hops)):
    span = 2 ** h
    group_size = span * 2
    new_chunks = [[] for _ in devices]
    for d in range(0, num_devices):
      if (d % group_size) >= (group_size / 2):
        # skip right half of a pair
        continue
      left_idx = d
      right_idx = d + span
      left_dev = devices[left_idx]
      right_dev = devices[right_idx]
      with ops.device(left_dev):
        new_chunks[left_idx] = array_ops.concat([chunks[left_idx],
                                                 chunks[right_idx]], 0)
      with ops.device(right_dev):
        new_chunks[right_idx] = array_ops.concat([chunks[left_idx],
                                                  chunks[right_idx]], 0)
    chunks = new_chunks
  return chunks


def build_shuffle_all_reduce(input_tensors, gather_devices,
                             red_op, un_op=None):
  """Construct a subgraph for shuffle all-reduce.

  Shuffle reduce is essentially the algorithm implemented when using
  parameter servers.  Suppose tensor length is n, there are d devices
  and g gather shards.  Each device sends a n/g length sub-tensor to
  each gather shard.  The gather shards perform a reduction across d
  fragments, then broadcast the result back to each device.  The
  devices then join the g fully reduced fragments they receive from
  the shards.  The gather shards could perform d-1 pairwise
  reductions, or one d-way reduction.  The first is better where
  reduction Op time is low compared to transmission time, the second
  better in the other case.

  Args:
    input_tensors: list of `tf.Tensor` values to be reduced.
    gather_devices: list of names of devices on which reduction shards
      should be placed.
    red_op: an n-array elementwise reduction Op
    un_op: optional elementwise unary Op to be applied to fully-reduced values.

  Returns:
    list of `tf.Tensor` which are the fully reduced tensors.
  """
  input_tensors, shape = _flatten_tensors(input_tensors)
  dst_devices = [t.device for t in input_tensors]
  reduced_shards = _build_shuffle_gather(input_tensors, gather_devices,
                                         red_op, un_op)
  output_tensors = _build_shuffle_scatter(reduced_shards, dst_devices)
  if len(shape) != 1:
    output_tensors = _reshape_tensors(output_tensors, shape)
  return output_tensors


def _build_shuffle_gather(input_tensors, gather_devices, red_op, un_op=None):
  """Construct the gather (concentrate and reduce) phase of shuffle all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` values to be reduced.
    gather_devices: list of names of devices on which reduction shards
      should be placed.
    red_op: the binary reduction Op
    un_op: optional elementwise unary Op to be applied to fully-reduced values.

  Returns:
    list of `tf.Tensor` which are the fully reduced shards.

  Raises:
    ValueError: inputs not well-formed.
  """
  num_source_devices = len(input_tensors)
  num_gather_devices = len(gather_devices)
  shape = input_tensors[0].shape
  if len(shape) != 1:
    raise ValueError("input_tensors must be 1D")
  shards_by_source = []
  for d in range(0, num_source_devices):
    with ops.colocate_with(input_tensors[d]):
      shards_by_source.append(
          _ragged_split(input_tensors[d], num_gather_devices))
  reduced_shards = []
  for d in range(0, num_gather_devices):
    with ops.device(gather_devices[d]):
      values = [s[d] for s in shards_by_source]
      red_shard = red_op(values)
      if un_op:
        red_shard = un_op(red_shard)
      reduced_shards.append(red_shard)
  return reduced_shards


def _build_shuffle_scatter(reduced_shards, dst_devices):
  """Build the scatter phase of shuffle all-reduce.

  Args:
    reduced_shards:  list of `tf.Tensor` fully reduced shards
    dst_devices: list of names of devices at which the fully-reduced value
      should be reconstituted.

  Returns:
    list of `tf.Tensor` scattered tensors.
  """
  num_devices = len(dst_devices)
  out_tensors = []
  for d in range(0, num_devices):
    with ops.device(dst_devices[d]):
      out_tensors.append(array_ops.concat(reduced_shards, 0))
  return out_tensors


def _split_by_task(devices, values):
  """Partition devices and values by common task.

  Args:
    devices: list of device name strings
    values: list of `tf.Tensor` of same length as devices.

  Returns:
    (per_task_devices, per_task_values) where both values are
    lists of lists with isomorphic structure: the outer list is
    indexed by task, and the inner list has length of the number
    of values belonging to that task.  per_task_devices contains
    the specific devices to which the values are local, and
    per_task_values contains the corresponding values.

  Raises:
    ValueError: devices must be same length as values.
  """
  num_devices = len(devices)
  if num_devices != len(values):
    raise ValueError("len(devices) must equal len(values)")
  per_task_devices = collections.OrderedDict()
  per_task_values = collections.OrderedDict()
  for d in range(num_devices):
    d_spec = device_lib.DeviceSpec.from_string(devices[d])
    if not hasattr(d_spec, "task") or d_spec.task is None:
      assert False, "failed to parse device %s" % devices[d]
    index = (d_spec.job or "localhost", d_spec.replica or 0, d_spec.task)
    if index not in per_task_devices:
      per_task_devices[index] = []
      per_task_values[index] = []
    per_task_devices[index].append(devices[d])
    per_task_values[index].append(values[d])

  return (list(per_task_devices.values()), list(per_task_values.values()))


def build_nccl_all_reduce(input_tensors, red_op, un_op=None):
  """Build a subgraph that does one full all-reduce, using NCCL.

  Args:
    input_tensors: list of `tf.Tensor` of same-shape and type values to
      be reduced.
    red_op: binary elementwise reduction operator. Must be one of
      {tf.add}
    un_op: optional unary elementwise Op to apply to fully-reduce values.

  Returns:
    list of `tf.Tensor` of reduced values.

  Raises:
    ValueError: red_op not supported.
  """
  if red_op == math_ops.add:
    output_tensors = nccl_ops.all_sum(input_tensors)
  else:
    raise ValueError("red_op not supported by NCCL all-reduce: ", red_op)
  if un_op:
    un_op_wrapped = []
    for t in output_tensors:
      with ops.colocate_with(t):
        un_op_wrapped.append(un_op(t))
    output_tensors = un_op_wrapped
  return output_tensors


def _build_nccl_hybrid(input_tensors, red_op, upper_level_f):
  """Construct a subgraph for NCCL hybrid all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` of same-shape and type values to
      be reduced.
    red_op: binary elementwise reduction operator.
    upper_level_f: function for reducing one value per worker, across
      workers.

  Returns:
    list of `tf.Tensor` of reduced values.

  Raises:
    ValueError: inputs not well-formed.
  """
  input_tensors, shape = _flatten_tensors(input_tensors)
  devices = [t.device for t in input_tensors]
  per_worker_devices, per_worker_values = _split_by_task(devices,
                                                         input_tensors)
  num_workers = len(per_worker_devices)
  up_values = [None for w in range(0, num_workers)]
  up_devices = up_values[:]
  down_values = up_values[:]
  # First stage: reduce within each worker using NCCL.
  for w in range(0, num_workers):
    worker_values = build_nccl_all_reduce(per_worker_values[w], red_op)
    # NOTE: these reductions will not run to completion unless
    # every output value is used.  Since we only need one, we
    # need to put control dependencies on the rest.
    with ops.control_dependencies(worker_values):
      with ops.device(worker_values[0].device):
        up_values[w] = array_ops.identity(worker_values[0])
      up_devices[w] = per_worker_devices[w][0]
  # Second stage: apply upper_level_f to reduce across the first device
  # at each worker.
  level_2_output = upper_level_f(up_values)
  # Third stage: propagate within each worker using NCCL Broadcast.
  for w in range(0, num_workers):
    dst_tensors = []
    with ops.device(per_worker_devices[w][0]):
      broadcast_src = nccl_ops.broadcast(
          array_ops.identity(level_2_output[w]))
    for d in per_worker_devices[w]:
      with ops.device(d):
        dst_tensors.append(array_ops.identity(broadcast_src))
    down_values[w] = dst_tensors
  output_tensors = [v for sublist in down_values for v in sublist]
  if len(shape) != 1:
    output_tensors = _reshape_tensors(output_tensors, shape)
  return output_tensors


def _reduce_non_singleton(input_tensors, red_f, un_op):
  """If len(input_tensors) > 1, apply red_f, else apply un_op."""
  if len(input_tensors) > 1:
    return red_f(input_tensors)
  else:
    if not un_op:
      return input_tensors
    output_tensors = []
    for t in input_tensors:
      with ops.colocate_with(t):
        output_tensors.append(un_op(t))
    return output_tensors


def build_nccl_then_ring(input_tensors, subdiv, red_op, un_op=None):
  """Construct hybrid of NCCL within workers, Ring across workers."""
  def upper_builder(y):
    return build_ring_all_reduce(y, len(y), subdiv, [0],
                                 red_op, un_op)
  def upper_level_f(x):
    return _reduce_non_singleton(x, upper_builder, un_op)
  return _build_nccl_hybrid(input_tensors, red_op, upper_level_f)


def build_nccl_then_recursive_hd(input_tensors, red_op, un_op=None):
  """Construct hybrid of NCCL within workers, Recursive-HD across workers."""
  upper_level_f = lambda x: build_recursive_hd_all_reduce(x, red_op, un_op)
  return _build_nccl_hybrid(input_tensors, red_op, upper_level_f)


def build_nccl_then_shuffle(input_tensors, gather_devices, nccl_red_op,
                            shuffle_red_op, un_op=None):
  """Construct hybrid of NCCL within workers, Shuffle across workers."""
  def upper_level_f(x):
    return build_shuffle_all_reduce(x, gather_devices, shuffle_red_op, un_op)

  return _build_nccl_hybrid(input_tensors, nccl_red_op, upper_level_f)


def _build_shuffle_hybrid(input_tensors, gather_devices, red_op,
                          upper_level_f):
  """Construct a subgraph for Shuffle hybrid all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` of same-shape and type values to
      be reduced.
    gather_devices: list of device names on which to host gather shards.
    red_op: binary elementwise reduction operator.
    upper_level_f: function for reducing one value per worker, across
      workers.

  Returns:
    list of `tf.Tensor` of reduced values.

  Raises:
    ValueError: inputs not well-formed.
  """
  input_tensors, shape = _flatten_tensors(input_tensors)
  # First stage: reduce across each worker using gather_devices.
  devices = [t.device for t in input_tensors]
  per_worker_devices, per_worker_values = _split_by_task(devices,
                                                         input_tensors)
  num_workers = len(per_worker_devices)
  up_values = []
  if len(gather_devices) != num_workers:
    raise ValueError("For shuffle hybrid, gather_devices must contain one "
                     "device per worker. ")
  for w in range(0, num_workers):
    reduced_shards = _build_shuffle_gather(
        per_worker_values[w], [gather_devices[w]], red_op)
    up_values.append(reduced_shards[0])
  # Second stage: apply upper_level_f.
  level_2_output = upper_level_f(up_values)
  # Third stage: apply shuffle scatter at each worker.
  output_tensors = []
  for w in range(0, num_workers):
    output_tensors += _build_shuffle_scatter(
        [level_2_output[w]], per_worker_devices[w])
  if len(shape) != 1:
    output_tensors = _reshape_tensors(output_tensors, shape)
  return output_tensors


def build_shuffle_then_ring(input_tensors, gather_devices, subdiv,
                            red_n_op, red_op, un_op=None):
  """Construct hybrid of Shuffle within workers, Ring across workers."""
  def upper_builder(tensors):
    return build_ring_all_reduce(tensors, len(tensors), subdiv, [0],
                                 red_op, un_op)
  def upper_level_f(tensors):
    return _reduce_non_singleton(tensors, upper_builder, un_op)
  return _build_shuffle_hybrid(
      input_tensors, gather_devices, red_n_op, upper_level_f)


def build_shuffle_then_shuffle(input_tensors, first_gather_devices,
                               second_gather_devices, red_op, un_op=None):
  """Construct hybrid of Shuffle within workers, Shuffle across workers."""
  def upper_builder(tensors):
    return build_shuffle_all_reduce(tensors, second_gather_devices,
                                    red_op, un_op)
  def upper_level_f(tensors):
    return _reduce_non_singleton(tensors, upper_builder, un_op)
  return _build_shuffle_hybrid(
      input_tensors, first_gather_devices, red_op, upper_level_f)
  Raises:
    ValueError: inputs not well-formed.
  zGFor shuffle hybrid, gather_devices must contain one device per worker. r   r
   )
r   r   r   r   r   r$   r   r   r   r   )rP   r   rQ   r   r   r   r=   r   r   r9   r   rC   rl   r   rT   s                  r   _build_shuffle_hybridr     s#   " *-8-,-!QXX-'-*8-*P''&'+)K'
 + , ,K  (a*!~a016;N^A&'(
 !+..K  4a,		/24 4N4 	Z1_%ne<N	) .s   C#c                 @    fdfd}t        | |||      S )z@Construct hybrid of Shuffle within workers, Ring across workers.c                 8    t        | t        |       dg      S r   r   )r   rQ   r   rR   s    r   r   z.build_shuffle_then_ring.<locals>.upper_builderL  s#     #g,!'0 0r   c                     t        |       S r   r   r   rR   r   s    r   r   z.build_shuffle_then_ring.<locals>.upper_level_fO       -??r   r   )rP   r   r   red_n_oprQ   rR   r   r   s     ` `` @r   build_shuffle_then_ringr   I  s$    0@	^X}
> >r   c                 @    fdfd}t        | ||      S )zCConstruct hybrid of Shuffle within workers, Shuffle across workers.c                      t        |       S r   r   )r   rQ   second_gather_devicesrR   s    r   r   z1build_shuffle_then_shuffle.<locals>.upper_builderX  s    #G-B$*E3 3r   c                     t        |       S r   r   r   s    r   r   z1build_shuffle_then_shuffle.<locals>.upper_level_f[  r   r   r   )rP   first_gather_devicesr   rQ   rR   r   r   s     ``` @r   build_shuffle_then_shuffler   U  s'    3@	)6=
B Br   r   )%__doc__r   rp   tensorflow.python.frameworkr   r   r   tensorflow.python.opsr   r   r   r   r   r.   r4   r6   rJ   rU   rM   rN   rO   rm   rj   rk   r   r   r   r   r   r   r   r   r   r   r   r   r    r   r   <module>r      s    P   < + + * *:$20j:-B:$| 37(V;&|$.b3l&R"JD"J&"KJ:0fBB 37G'V 59	> EI	Br   