
import contextlib
import sys
import warnings
from typing import Any, cast, Optional, TYPE_CHECKING, Union

import torch
import torch.distributed as dist
import torch.distributed.distributed_c10d as c10d
from torch.distributed.device_mesh import DeviceMesh
from torch.fx.experimental.proxy_tensor import get_proxy_mode

from . import _functional_collectives_impl as fun_col_impl


try:
    from torch.utils._cxx_pytree import tree_map_only
except ImportError:
    from torch.utils._pytree import tree_map_only  # type: ignore[no-redef]


if torch._running_with_deploy():

    def is_torchdynamo_compiling():
        """Can't import torchdynamo in torchdeploy builds currently."""
        return False

else:
    try:
        from torch.compiler import is_dynamo_compiling as is_torchdynamo_compiling
    except Exception:
        warnings.warn(
            "Unable to import torchdynamo util `is_torchdynamo_compiling`, "
            "so won't support torchdynamo correctly"
        )

        def is_torchdynamo_compiling():
            return False


# All the forms in which a process group can be passed to the collectives below.
RANK_TYPES = Union[
    list[int],
    list[list[int]],
    dist.ProcessGroup,
    DeviceMesh,
    tuple["dist.tensor.DeviceMesh", int],
    str,
]


def wait_tensor(tensor):
    """
    Wait on a tensor returned by the collectives ops.

    Waiting follows device semantics, which means blocking on CPU and synchronizing streams on CUDA.
    """
    return torch.ops._c10d_functional.wait_tensor(tensor)


def broadcast(self: torch.Tensor, src: int, group: RANK_TYPES, tag: str = ""):
    """
    Broadcasts the tensor to all processes in the given process group.

    Args:
        src (int): Source rank
        group (ProcessGroup or List[int]): The process group to work on.
        tag (str, optional): A unique identifier for the collective. Default: empty string
    """
    group_name = _resolve_group_name(group, tag)
    tensor = torch.ops._c10d_functional.broadcast(self, src, group_name)
    return _maybe_wrap_tensor(tensor)
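

# Illustrative usage sketch (not part of the upstream module): broadcasting rank 0's
# tensor to every rank. Assumes torch.distributed has already been initialized,
# e.g. via torchrun plus dist.init_process_group().
def _example_broadcast() -> torch.Tensor:
    t = torch.arange(4.0) if dist.get_rank() == 0 else torch.zeros(4)
    # The input is left unmodified; the broadcast result is returned, possibly as an
    # AsyncCollectiveTensor that is waited on at first use.
    out = broadcast(t, src=0, group=dist.group.WORLD)
    if isinstance(out, AsyncCollectiveTensor):
        out = out.wait()
    return out
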

def all_reduce(self: torch.Tensor, reduceOp: str, group: RANK_TYPES, tag: str = ""):
    """
    Reduces the tensor data across all machines in such a way that all get
    the final result.

    The input tensor is left unmodified.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    tensor = torch.ops._c10d_functional.all_reduce(self, reduceOp.lower(), group_name)
    return _maybe_wrap_tensor(tensor)
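

# Illustrative usage sketch (not part of the upstream module): an eager "sum"
# all_reduce over the default process group. Assumes the process group has been
# initialized, e.g. with torchrun plus dist.init_process_group().
def _example_all_reduce() -> torch.Tensor:
    t = torch.ones(8)
    # Any RANK_TYPES form works here: a ProcessGroup, a DeviceMesh, a group name,
    # or an explicit rank list such as list(range(dist.get_world_size())).
    reduced = all_reduce(t, "sum", dist.group.WORLD)
    if isinstance(reduced, AsyncCollectiveTensor):
        reduced = reduced.wait()
    return reduced  # every rank now holds the elementwise sum
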

def all_gather_tensor(
    self: torch.Tensor,
    gather_dim: int,
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Gather tensor data across from all machines and concatenate over ``gather_dim``.

    Note that it currently only supports gather_dim = 0.

    The input tensor is left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    assert self.is_contiguous()
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    tensor = torch.ops._c10d_functional.all_gather_into_tensor(
        self, group_size, group_name
    )
    res = _maybe_wrap_tensor(tensor)
    if gather_dim != 0:
        # torch.cat accesses the data, so we have to wait here anyway; waiting first
        # and then chunk + cat avoids going through the ACT dispatch logic again.
        if isinstance(res, AsyncCollectiveTensor):
            res = res.wait()
        res = torch.cat(torch.chunk(res, group_size, dim=0), dim=gather_dim)
    return res
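

# Illustrative usage sketch (not part of the upstream module): gathering each rank's
# shard along dim 0. Assumes an initialized default process group of size `world`.
def _example_all_gather() -> torch.Tensor:
    rank, world = dist.get_rank(), dist.get_world_size()
    shard = torch.full((2, 4), float(rank))
    gathered = all_gather_tensor(shard, gather_dim=0, group=dist.group.WORLD)
    if isinstance(gathered, AsyncCollectiveTensor):
        gathered = gathered.wait()
    assert gathered.shape == (2 * world, 4)
    return gathered
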

def all_gather_tensor_autograd(
    self: torch.Tensor,
    gather_dim: int,
    group: RANK_TYPES,
    tag: str = "",
):
    """
    Gather tensor data across from all machines and concatenate over ``gather_dim``.

    Note that it currently only supports gather_dim = 0.

    This function is the same as all_gather_tensor but will propagate the
    backwards gradient across workers.

    See all_gather_tensor for more details on usage.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)

    tensor = torch.ops._c10d_functional_autograd.all_gather_into_tensor(
        self, group_size, group_name
    )
    res = _FromTorchTensor.apply(tensor)
    if gather_dim != 0:
        # torch.cat accesses the data, so wait first and then chunk + cat to avoid
        # going through the ACT dispatch logic again.
        if isinstance(res, AsyncCollectiveTensor):
            res = res.wait()
        res = torch.cat(torch.chunk(res, group_size, dim=0), dim=gather_dim)
    return res


def reduce_scatter_tensor(
    self: torch.Tensor,
    reduceOp: str,
    scatter_dim: int,
    group: RANK_TYPES,
    tag: str = "",
):
    """
    Reduces the tensor data across all machines in such a way that all get
    the final result, then scatter the results to corresponding ranks.


    The input tensor is left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh
    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    assert (
        self.size(scatter_dim) % group_size == 0
    ), f"input dimension 0 ({self.size(0)} must be a multiple of group_size {group_size})"
    if scatter_dim != 0:
        tensor_list = torch.chunk(self, group_size, dim=scatter_dim)
        self = torch.cat(tensor_list)

    tensor = torch.ops._c10d_functional.reduce_scatter_tensor(
        self,
        reduceOp.lower(),
        group_size,
        group_name,
    )
    res = _maybe_wrap_tensor(tensor)
    return res

def reduce_scatter_tensor_autograd(
    self: torch.Tensor,
    reduceOp: str,
    scatter_dim: int,
    group: RANK_TYPES,
    tag: str = "",
):
    """
    Reduces the tensor data across all machines in such a way that all get
    the final result, then scatter the results to corresponding ranks.

    This function is the same as reduce_scatter_tensor but will propagate the
    backwards gradient across workers.

    Currently only the "sum" reduceOp is supported.

    See reduce_scatter_tensor for more details on usage.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    assert (
        self.size(scatter_dim) % group_size == 0
    ), f"input dimension 0 ({self.size(0)} must be a multiple of group_size {group_size})"
    if scatter_dim != 0:
        tensor_list = torch.chunk(self, group_size, dim=scatter_dim)
        self = torch.cat(tensor_list)

    tensor = torch.ops._c10d_functional_autograd.reduce_scatter_tensor(
        self,
        reduceOp.lower(),
        group_size,
        group_name,
    )
    res = _FromTorchTensor.apply(tensor)
    return res


def all_reduce_coalesced(
    self: list[torch.Tensor], reduceOp: str, group: RANK_TYPES, tag: str = ""
) -> list[torch.Tensor]:
    """
    Reduces a list of tensors across all machines in such a way that all get
    the final result.

    All tensors in the input list are left unmodified.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    tensor_list = torch.ops._c10d_functional.all_reduce_coalesced(
        self,
        reduceOp.lower(),
        group_name,
    )
    return list(map(_maybe_wrap_tensor, tensor_list))
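

# Illustrative usage sketch (not part of the upstream module): reducing several
# tensors in one coalesced call instead of issuing one all_reduce per tensor.
# Assumes an initialized default process group.
def _example_all_reduce_coalesced() -> list[torch.Tensor]:
    grads = [torch.ones(4), torch.ones(2, 2)]
    reduced = all_reduce_coalesced(grads, "sum", dist.group.WORLD)
    return [t.wait() if isinstance(t, AsyncCollectiveTensor) else t for t in reduced]
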

def all_gather_into_tensor_coalesced(
    self: list[torch.Tensor], group: RANK_TYPES, tag: str = ""
) -> list[torch.Tensor]:
    """
    Gather a list of tensors across from all machines.

    Note that it currently only supports gather_dim = 0.

    The input tensor is left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    tensor_list = torch.ops._c10d_functional.all_gather_into_tensor_coalesced(
        self,
        group_size,
        group_name,
    )
    return list(map(_maybe_wrap_tensor, tensor_list))
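

# Illustrative usage sketch (not part of the upstream module): gathering two shards of
# different shapes with a single coalesced call; result i has dim 0 scaled by the group
# size. Assumes an initialized default process group.
def _example_all_gather_coalesced() -> list[torch.Tensor]:
    shards = [torch.ones(2, 4), torch.ones(3)]
    gathered = all_gather_into_tensor_coalesced(shards, dist.group.WORLD)
    return [t.wait() if isinstance(t, AsyncCollectiveTensor) else t for t in gathered]
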

def reduce_scatter_tensor_coalesced(
    inputs: list[torch.Tensor],
    reduceOp: str,
    scatter_dim: list[int],
    group: RANK_TYPES,
    tag: str = "",
) -> list[torch.Tensor]:
    """
    Reduces a list of tensors across all machines in such a way that all get
    the final result, then scatter the results to corresponding ranks.

    The input tensors are left unmodified.
    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)

    assert len(scatter_dim) == len(inputs)
    for idx, (dim, tensor) in enumerate(zip(scatter_dim, inputs)):
        assert (
            tensor.size(dim) % group_size == 0
        ), f"input dimension {dim} ({tensor.size(dim)} must be a multiple of group_size {group_size} for tensor at index {idx}"
        if dim != 0:
            tensor_list = torch.chunk(tensor, group_size, dim=dim)
            inputs[idx] = torch.cat(tensor_list)

    tensor_list = torch.ops._c10d_functional.reduce_scatter_tensor_coalesced(
        inputs,
        reduceOp.lower(),
        group_size,
        group_name,
    )

    return list(map(_maybe_wrap_tensor, tensor_list))


# This is a bit unsafe: it checks whether the first argument in the op's schema is a
# non-mutating alias, which today maps 1:1 onto "aten ops that are views".
def _is_view_op(tgt):
    assert isinstance(tgt, torch._ops.OpOverload)
    # Don't apply the view optimization to any CompositeImplicitAutograd ops.
    if torch._C._dispatch_has_kernel_for_dispatch_key(
        tgt.name(), torch.DispatchKey.CompositeImplicitAutograd
    ):
        return False
    schema = tgt._schema
    if len(schema.arguments) > 0:
        first_arg = schema.arguments[0]
        # check if op is a view
        return first_arg.alias_info is not None and not first_arg.alias_info.is_write


def all_to_all_single(
    self: torch.Tensor,
    output_split_sizes: Optional[list[int]],
    input_split_sizes: Optional[list[int]],
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Each process splits input tensor and then scatters the split list
    to all processes in a group. Then concatenate the received tensors from all
    the processes in the group and return a single output tensor.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh

    :: N.B. If you pass a PG or a 1D list to perform a MPMD collective, the compiler won't be able to recover
    that information and perform collective algebraic optimization. Use other forms of input for that.
    """
    if output_split_sizes is not None:
        assert all(
            isinstance(size, (int, torch.SymInt)) for size in output_split_sizes
        ), output_split_sizes
    if input_split_sizes is not None:
        assert all(
            isinstance(size, (int, torch.SymInt)) for size in input_split_sizes
        ), input_split_sizes
    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    if output_split_sizes is None or input_split_sizes is None:
        assert output_split_sizes is None and input_split_sizes is None, (
            "output_split_sizes and input_split_sizes must either be "
            "specified together or both set to None"
        )
        output_split_sizes = [self.shape[0] // group_size] * group_size
        input_split_sizes = output_split_sizes
    tensor = torch.ops._c10d_functional.all_to_all_single(
        self,
        output_split_sizes,
        input_split_sizes,
        group_name,
    )
    return _maybe_wrap_tensor(tensor)

def all_to_all_single_autograd(
    self: torch.Tensor,
    output_split_sizes: Optional[list[int]],
    input_split_sizes: Optional[list[int]],
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Same as all_to_all_single but supports autograd.
    """
    if output_split_sizes is not None:
        assert all(
            isinstance(size, (int, torch.SymInt)) for size in output_split_sizes
        ), output_split_sizes
    if input_split_sizes is not None:
        assert all(
            isinstance(size, (int, torch.SymInt)) for size in input_split_sizes
        ), input_split_sizes

    group_name = _resolve_group_name(group, tag)
    group_size = c10d._get_group_size_by_name(group_name)
    if output_split_sizes is None or input_split_sizes is None:
        assert output_split_sizes is None and input_split_sizes is None, (
            "output_split_sizes and input_split_sizes must either be "
            "specified together or both set to None"
        )
        output_split_sizes = [self.shape[0] // group_size] * group_size
        input_split_sizes = output_split_sizes
    tensor = torch.ops._c10d_functional_autograd.all_to_all_single(
        self,
        output_split_sizes,
        input_split_sizes,
        group_name,
    )
    return _FromTorchTensor.apply(tensor)


def permute_tensor(
    self: torch.Tensor,
    src_dst: list[int],
    group: RANK_TYPES,
    tag: str = "",
) -> torch.Tensor:
    """
    Permutes the elements of the tensor according to the given source/destination pairs. `src_dst` should
    be defined such that src_dst[m] == n means m sends to n.

    Group can be one of:
        List[int]: ranks participating in the collective.
        List[List[int]]: 2D mesh of ranks taking part of this collective in MPMD.
        ProcessGroup: Will perform a collective using the ranks and tag of the PG.
        DeviceMesh: Do a SPMD collective over all ranks of the mesh
        (DeviceMesh, int): Do a MPMD collective over one dimension of the DeviceMesh
    """
    t, rankset, group_size = _expand_group(group, tag)
    local_pg = c10d._find_or_create_pg_by_ranks_and_tag(t, rankset, group_size)

    output_split_sizes = [0] * group_size
    input_split_sizes = [0] * group_size
    for src, dst in enumerate(src_dst):
        if src == dist.get_rank(local_pg):
            input_split_sizes[dst] = self.numel()
        if dst == dist.get_rank(local_pg):
            output_split_sizes[src] = self.numel()

    return all_to_all_single(self, output_split_sizes, input_split_sizes, group, tag)

class AsyncCollectiveTensor(torch.Tensor):
    r"""
    A Tensor wrapper subclass that is used to trigger a call to wait
    prior to first use of the underlying tensor.
    Use it inside functional collective pytorch wrappers like the following:
    def functional_collective(self, group, tag):
        tag, rankset, group_size = _expand_group(group, tag)
        tensor = torch.ops.c10d_functional.{collective}(self, tag, rankset, group_size)
        return _maybe_wrap_tensor(tensor)
    """

    elem: torch.Tensor
    completed: bool

    __slots__ = ["elem", "completed"]

    @staticmethod
    def __new__(cls, elem: torch.Tensor):
        r = torch.Tensor._make_wrapper_subclass(
            cls,
            elem.size(),
            strides=elem.stride(),
            storage_offset=elem.storage_offset(),
            dtype=elem.dtype,
            layout=elem.layout,
            device=elem.device,
            requires_grad=elem.requires_grad,
        )
        r.elem = elem
        r.completed = False
        return r

    def __tensor_flatten__(self):
        return ["elem"], None

    def tolist(self):
        return self.trigger_wait().tolist()

    @staticmethod
    def __tensor_unflatten__(inner_tensors, meta, outer_size, outer_stride):
        assert meta is None
        elem = inner_tensors["elem"]
        return AsyncCollectiveTensor(elem)

    def __coerce_same_metadata_as_tangent__(
        self, expected_metadata: Any, expected_type: Optional[type] = None
    ):
        if expected_type is not torch.Tensor:
            return None
        return self.trigger_wait()

    def __repr__(self) -> str:
        return f"AsyncCollectiveTensor({self.trigger_wait()})"

    def trigger_wait(self):
        if not self.completed:
            out = wait_tensor(self.elem)
            self.completed = True
            return out
        else:
            return self.elem

    def wait(self) -> torch.Tensor:
        return wait_tensor(self.elem)

    def _get_acs_underlying_tensor(self):
        """This method enables _functional_collectives_impl to test if a tensor is an ACS"""
        return self.elem

    @classmethod
    def __torch_dispatch__(cls, func, types, args=(), kwargs=None):
        if func == torch.ops.aten.view.default:
            # Fast-path aten.view: most view-related ops eventually lower to it, and
            # handling it directly avoids the pytree overhead below.
            res = func(args[0].elem, args[1])
            wrapper_res = AsyncCollectiveTensor(res)
            return wrapper_res

        is_view_op = _is_view_op(func)

        def unwrap(e: AsyncCollectiveTensor):
            # wait_tensor is idempotent and performs the stream sync only once
            if not is_view_op:
                return e.trigger_wait()
            return e.elem

        def wrap(e: torch.Tensor):
            assert not isinstance(e, AsyncCollectiveTensor)
            return AsyncCollectiveTensor(e)

        unwrapped_args = tree_map_only(AsyncCollectiveTensor, unwrap, args)
        unwrapped_kwargs = tree_map_only(AsyncCollectiveTensor, unwrap, kwargs)

        # We don't wrap the result as it doesn't need to be waited on.
        out = func(*unwrapped_args, **unwrapped_kwargs)

        # View ops don't require a sync, so re-wrap their outputs.
        if is_view_op:
            out = tree_map_only(torch.Tensor, wrap, out)

        return out

    def numpy(self):
        return self.wait().numpy()

def _expand_group(group: RANK_TYPES, tag: str = "") -> tuple[str, list[int], int]:
    """
    _expand_group desugars the different RANK_TYPES types into a canonical format that is traceable.

    By having this be part of the explicit eager codepath, we avoid having to specialize behavior inside
    torchdynamo and can still interoperate with processgroup objects or other untraceable forms.
    """
    # The cast helpers are defined inside this function to avoid dynamo graph breaks:
    # at runtime they are plain identity functions, under TYPE_CHECKING they keep mypy happy.
    if TYPE_CHECKING:

        def cast_listlistint(x):
            return cast(list[list[int]], x)

        def cast_listint(x):
            return cast(list[int], x)

    else:

        def cast_listlistint(x):
            return x

        def cast_listint(x):
            return x

    rankset: list[int]
    if isinstance(group, list):
        if isinstance(group[0], list):
            nested_list = cast_listlistint(group)
            rankset = []
            group_size = -1
            for rs in nested_list:
                rankset.extend(rs)
                if group_size != -1 and group_size != len(rs):
                    raise ValueError(
                        f"group sizes must be identical found {group_size} and {len(rs)}"
                    )
                group_size = len(rs)
        else:
            rankset = cast_listint(group)
            group_size = len(rankset)
    elif isinstance(group, dist.ProcessGroup):
        rankset = dist.get_process_group_ranks(group)
        group_size = len(rankset)
        tag = tag or c10d._get_group_tag(group)
    elif isinstance(group, DeviceMesh):
        assert group.ndim == 1, (
            "Only 1D mesh is supported, pass in (DeviceMesh, int) together if mesh > 1D"
        )
        tag, rankset, _ = group._dim_group_infos[0]
        group_size = len(rankset)
    elif isinstance(group, tuple):
        if (
            len(group) == 2
            and isinstance(group[0], DeviceMesh)
            and isinstance(group[1], int)
        ):
            dmesh = group[0]
            dim = group[1]
            tag, rankset, _ = dmesh._dim_group_infos[dim]
            group_size = len(rankset)
        else:
            raise ValueError("Invalid tuple for group must be (DeviceMesh, int)")
    else:
        raise ValueError(
            "Invalid type for group, must be one of List, Processgroup, DeviceMesh or (DeviceMesh, int)."
        )

    return (tag, rankset, group_size)


def _resolve_group_name(group: RANK_TYPES, tag: str = "") -> str:
    """
    Given group in RANK_TYPES, return the group name.
    """
    if isinstance(group, dist.ProcessGroup):
        return group.group_name
    elif isinstance(group, str):
        return group
    elif isinstance(group, DeviceMesh):
        assert group.ndim == 1, (
            "Only 1D mesh is supported, pass in (DeviceMesh, int) together if mesh > 1D"
        )
        return group._dim_group_infos[0][2]
    elif isinstance(group, tuple):
        if (
            len(group) == 2
            and isinstance(group[0], DeviceMesh)
            and isinstance(group[1], int)
        ):
            dmesh = group[0]
            dim = group[1]
            return dmesh._dim_group_infos[dim][2]
        else:
            raise ValueError("Invalid tuple for group must be (DeviceMesh, int)")
    elif isinstance(group, list):
        if not is_torchdynamo_compiling():
            warnings.warn(
                "The combination of ranks + tag as process group "
                "identifier has been deprecated. Please switch to "
                "using ProcessGroup, DeviceMesh, or group name instead.",
                FutureWarning,
                stacklevel=3,
            )
        return c10d._resolve_group_name_by_ranks_and_tag(cast(list[int], group), tag)
    else:
        raise ValueError(f"Unsupported group type: {type(group)}, {group}")


class _FromTorchTensor(torch.autograd.Function):
    """
    _FromTorchTensor allows autograd to propagate from a normal Tensor to an
    AsyncCollectiveTensor.
    """

    @staticmethod
    def forward(ctx, input: torch.Tensor) -> torch.Tensor:
        return _maybe_wrap_tensor(input)

    @staticmethod
    def backward(ctx, grad_output: torch.Tensor) -> torch.Tensor:
        return grad_output


def _are_we_tracing() -> bool:
    if is_torchdynamo_compiling():
        return True
    # If fake mode is turned on, we are almost certainly compiling/tracing
    # (in particular, AOTAutograd traces a model once with fake tensors).
    if torch._C._get_dispatch_mode(torch._C._TorchDispatchModeKey.FAKE) is not None:
        return True
    return get_proxy_mode() is not None


def _maybe_wrap_tensor(self) -> torch.Tensor:
    if _are_we_tracing():
        return wait_tensor(self)
    res = AsyncCollectiveTensor(self)
    return cast(torch.Tensor, res)


@contextlib.contextmanager
def allow_inflight_collective_as_graph_input_ctx(value: bool = True):
    """
    Context manager to temporarily set whether inflight collectives are allowed as torch.compile graph inputs.
    Common use case is when the collective is issued in eager (with `async_op=True`) but waited in compiled region:
    ```
    def all_reduce_eager(x):
        y = x * x
        req = dist.all_reduce(y, op=dist.ReduceOp.SUM, async_op=True)
        return y


    @torch.compile(fullgraph=True)
    def all_reduce_wait_compiled(y):
        torch.ops.c10d_functional.wait_tensor(y)
        return y * y


    x = torch.ones(1280, 1280, device="cuda") + self.rank
    # the context manager ensures that `wait_tensor(y)` will wait on the correct work object
    with allow_inflight_collective_as_graph_input_ctx():
        y = all_reduce_eager(x)
        z = all_reduce_wait_compiled(y)
    ```
    With this context manager, when a collective is called, under the hood the work object of the collective
    will be registered in the work registry, and the wait_tensor() in compiled region called on
    the output tensor of the collective will wait on the correct work object.
    """
    previous = torch._C._distributed_c10d._allow_inflight_collective_as_graph_input()
    try:
        torch._C._distributed_c10d._set_allow_inflight_collective_as_graph_input(value)
        yield
    finally:
        torch._C._distributed_c10d._set_allow_inflight_collective_as_graph_input(
            previous
        )


# Meta kernels used while tracing.
def _all_gather_into_tensor_coalesced_meta(self, tag, rankset, group_size):
    def mk_out_tensor(shard):
        out_size = list(shard.size())
        out_size[0] *= group_size
        return shard.new_empty(out_size)

    return [mk_out_tensor(t) for t in self]


def _broadcast_meta(self, *args):
    return torch.empty_like(self)


def _all_reduce_meta(self, *args):
    return torch.empty_like(self)


def _wait_tensor_meta(self, *args):
    return torch.empty_like(self)


def _all_gather_into_tensor_meta(shard, tag, rankset, group_size):
    out_size = list(shard.size())
    out_size[0] *= group_size
    return shard.new_empty(out_size)


def _reduce_scatter_tensor_meta(input, reduce_op, tag, rankset, group_size):
    out_size = list(input.size())
    out_size[0] //= group_size
    return input.new_empty(out_size)


def _all_reduce_coalesced_meta(self, *args):
    return [torch.empty_like(t) for t in self]


def _all_reduce__meta(inp, *args):
    return inp


def _broadcast__meta(inp, *args):
    return inp


def _all_reduce_coalesced__meta(inputs, *args):
    return inputs


def _reduce_scatter_tensor_coalesced_meta(inputs, reduceOp, tag, rankset, group_size):
    def mk_out_tensor(input):
        out_size = list(input.size())
        out_size[0] //= group_size
        return input.new_empty(out_size)

    return [mk_out_tensor(t) for t in inputs]


def _all_to_all_single_meta(
    input, output_split_sizes, input_split_sizes, *args, **kwargs
):
    if output_split_sizes is None:
        return input.new_empty(input.size())
    else:
        for s in output_split_sizes:
            torch._check_is_size(s)
        out_size = list(input.size())
        out_size[0] = sum(output_split_sizes)
        return input.new_empty(out_size)


def _all_gather_into_tensor_out_native_meta(input, group_size, group_name, *, out):
    shape = list(input.size())
    shape[0] *= group_size
    return input.new_empty(shape)


def _all_gather_into_tensor_native_meta(input, group_size, group_name):
    shape = list(input.size())
    shape[0] *= group_size
    return input.new_empty(shape)


def _all_gather_into_tensor_coalesced_native_meta(inputs, group_size, group_name):
    return [
        _all_gather_into_tensor_native_meta(input, group_size, group_name)
        for input in inputs
    ]


def _reduce_scatter_tensor_native_meta(inp, reduce_op, group_size, group_name):
    shape = list(inp.size())
    shape[0] //= group_size
    return inp.new_empty(shape)


def _reduce_scatter_tensor_coalesced_native_meta(
    inputs, reduce_op, group_size, group_name
):
    return [
        _reduce_scatter_tensor_native_meta(inp, reduce_op, group_size, group_name)
        for inp in inputs
    ]


if not torch._running_with_deploy():
    # Library MUST be defined at module scope or it doesn't work
    lib_impl = torch.library.Library("_c10d_functional", "IMPL")
    lib_impl.impl("all_reduce", _all_reduce_meta, "Meta")
    lib_impl.impl("all_reduce_", _all_reduce__meta, "Meta")
    lib_impl.impl("all_reduce_coalesced", _all_reduce_coalesced_meta, "Meta")
    lib_impl.impl("all_reduce_coalesced_", _all_reduce_coalesced__meta, "Meta")
    lib_impl.impl("wait_tensor", _wait_tensor_meta, "Meta")
    lib_impl.impl(
        "all_gather_into_tensor_out", _all_gather_into_tensor_out_native_meta, "Meta"
    )
    lib_impl.impl("all_gather_into_tensor", _all_gather_into_tensor_native_meta, "Meta")
    lib_impl.impl(
        "all_gather_into_tensor_coalesced",
        _all_gather_into_tensor_coalesced_native_meta,
        "Meta",
    )
    lib_impl.impl("reduce_scatter_tensor", _reduce_scatter_tensor_native_meta, "Meta")
    lib_impl.impl(
        "reduce_scatter_tensor_coalesced",
        _reduce_scatter_tensor_coalesced_native_meta,
        "Meta",
    )
    lib_impl.impl("all_to_all_single", _all_to_all_single_meta, "Meta")
    lib_impl.impl("broadcast", _broadcast_meta, "Meta")
    lib_impl.impl("broadcast_", _broadcast__meta, "Meta")

    # Mark these ops as having side effects so they are not removed by DCE.
    torch.fx.node.has_side_effect(torch.ops._c10d_functional.wait_tensor.default)
    torch.fx.node.has_side_effect(torch.ops._c10d_functional.wait_tensor)

    # Register legacy ops for backward compatibility.
    legacy_lib = torch.library.Library("c10d_functional", "DEF")
    legacy_lib_impl = torch.library.Library("c10d_functional", "IMPL")
    ops_defs = [
        "broadcast(Tensor self, int src, str tag, int[] ranks, int group_size) -> Tensor",
        "all_reduce(Tensor self, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor",
        "all_reduce_coalesced(Tensor[] self, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor[]",
        "wait_tensor(Tensor self) -> Tensor",
        "all_gather_into_tensor(Tensor shard, str tag, int[] ranks, int group_size) -> Tensor",
        "all_gather_into_tensor_coalesced(Tensor[] input, str tag, int[] ranks, int group_size) -> Tensor[]",
        "reduce_scatter_tensor(Tensor input, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor",
        "reduce_scatter_tensor_coalesced(Tensor[] inputs, str reduceOp, str tag, int[] ranks, int group_size) -> Tensor[]",
        "all_to_all_single(Tensor input, SymInt[]? output_split_sizes, SymInt[]? input_split_sizes, str tag, int[] ranks, int group_size) -> Tensor",
    ]

    my_module = sys.modules[__name__]
    for op_def in ops_defs:
        op_name = op_def[0 : op_def.index("(")]
        backend_impl = getattr(fun_col_impl, f"_{op_name}")
        legacy_lib.define(op_def, tags=torch.Tag.pt2_compliant_tag)
        legacy_lib_impl.impl(op_name, backend_impl, "CompositeImplicitAutograd")

else:
    warnings.warn(
        "PyTorch Distributed functional collectives do not work with torch::deploy."
    )


# Dynamo remappings: translate supportable in-place c10d collectives into the
# functional collectives above followed by a copy_, so they can be traced into a
# functional graph. The signatures intentionally match the
# torch.distributed.distributed_c10d ops being remapped.
def all_gather_tensor_inplace(
    output_tensor: torch.Tensor,
    input_tensor: torch.Tensor,
    group,
    async_op: bool = False,
    tag: str = "",
    gather_dim: int = 0,
):
    assert not async_op, "Can't remap async version of inplace op to functional collective"

    group = group or dist.group.WORLD
    assert group is not None

    return output_tensor.copy_(all_gather_tensor(input_tensor, gather_dim, group, tag))


def reduce_scatter_tensor_inplace(
    output: torch.Tensor,
    input: torch.Tensor,
    op: str = "sum",
    group=None,
    async_op: bool = False,
    scatter_dim: int = 0,
    tag: str = "",
):
    assert not async_op, "Can't remap async version of inplace op to functional collective"

    group = group or dist.group.WORLD
    assert group is not None

    return output.copy_(reduce_scatter_tensor(input, op, scatter_dim, group, tag))


REDUCE_OP_TO_STR = {
    dist.ReduceOp.SUM: "sum",
    dist.ReduceOp.AVG: "avg",
    dist.ReduceOp.PRODUCT: "product",
    dist.ReduceOp.MIN: "min",
    dist.ReduceOp.MAX: "max",
    dist.ReduceOp.BAND: "band",
    dist.ReduceOp.BOR: "bor",
    dist.ReduceOp.BXOR: "bxor",
}


def all_reduce_inplace(
    tensor: torch.Tensor,
    op: str = "sum",
    group=None,
    async_op: bool = False,
    tag: str = "",
):
    assert not async_op, "Can't remap async version of inplace op to functional collective"

    group = group or dist.group.WORLD
    assert group is not None

    return tensor.copy_(all_reduce(tensor, op, group, tag))


def all_to_all_inplace(
    output: torch.Tensor,
    input: torch.Tensor,
    output_split_sizes=None,
    input_split_sizes=None,
    group=None,
    async_op: bool = False,
    tag: str = "",
):
    assert not async_op, "Can't remap async version of inplace op to functional collective"

    group = group or dist.group.WORLD
    assert group is not None

    return output.copy_(
        all_to_all_single(input, output_split_sizes, input_split_sizes, group, tag)
    )


def all_gather_inplace(
    tensor_list: list[torch.Tensor],
    tensor: torch.Tensor,
    group=None,
    async_op: bool = False,
    tag: str = "",
):
    assert not async_op, "Can't remap async version of inplace op to functional collective"
    assert all(
        t.size(0) == tensor.size(0) for t in tensor_list
    ), "Remapping variable size all_gather is not yet supported"

    group = group or dist.group.WORLD
    assert group is not None

    output = all_gather_tensor(tensor, 0, group, tag)

    # Slice instead of using aten.split so tensor.size(0) is not unnecessarily
    # baked in when it is a SymInt.
    output_splits = []
    offset = 0
    for t in tensor_list:
        output_splits.append(output[offset : offset + t.size(0)])
        offset += t.size(0)
    for dst, src in zip(tensor_list, output_splits):
        dst.copy_(src)
    return tensor_list


from torch.distributed.distributed_c10d import (
    _all_gather_base as legacy_all_gather_base,
    _reduce_scatter_base as legacy_reduce_scatter_base,
    all_gather as legacy_all_gather,
    all_gather_into_tensor as legacy_allgather,
    all_reduce as legacy_allreduce,
    all_to_all_single as legacy_all_to_all_single,
    reduce_scatter_tensor as legacy_reducescatter,
)

# Map the legacy torch.distributed.distributed_c10d ops to the functional
# decompositions above so dynamo can trace them.
traceable_collective_remaps = {
    legacy_allgather: all_gather_tensor_inplace,
    legacy_reducescatter: reduce_scatter_tensor_inplace,
    legacy_allreduce: all_reduce_inplace,
    legacy_all_to_all_single: all_to_all_inplace,
    legacy_all_gather: all_gather_inplace,
    legacy_reduce_scatter_base: reduce_scatter_tensor_inplace,
    legacy_all_gather_base: all_gather_tensor_inplace,
}
<LL<< 	<
 
<( 
LL<< 
< ell#LL
 
>   /7(0) =5 m$  2112  r	
	s$   [, [> ,[;:[;>\\