
import logging
import math
from dataclasses import dataclass
from functools import lru_cache
from typing import Optional

import torch
import torch.distributed._functional_collectives as funcol
import torch.distributed.tensor._dtensor_spec as dtensor_spec
from torch._C._distributed_c10d import _resolve_process_group
from torch.distributed.device_mesh import _mesh_resources, DeviceMesh
from torch.distributed.distributed_c10d import (
    _get_group_size_by_name,
    broadcast,
    get_group_rank,
    get_rank,
    ProcessGroup,
    scatter,
    Work,
)


logger = logging.getLogger(__name__)


if not torch._running_with_deploy():

    @torch.library.register_fake("_dtensor::shard_dim_alltoall")
    def _shard_dim_alltoall_meta(input, gather_dim, shard_dim, group_name):
        # fake/meta kernel: only compute the output shape of the alltoall,
        # i.e. gather on ``gather_dim`` and re-shard on ``shard_dim``
        group_size = _get_group_size_by_name(group_name)
        stacked_list = [torch.empty_like(input) for _ in range(group_size)]
        group = _resolve_process_group(group_name)
        group_rank = get_group_rank(group, get_rank())

        return (
            torch.cat(stacked_list, gather_dim)
            .chunk(group_size, dim=shard_dim)[group_rank]
            .contiguous()
        )

else:
    import warnings

    warnings.warn(
        "PyTorch Distributed functional collectives do not work with torch::deploy."
    )


def shard_dim_alltoall(input, gather_dim, shard_dim, mesh, mesh_dim):
    if mesh.device_type == "cpu":
        # CPU process groups have no alltoall; emulate it with allgather + chunk
        logger.warning(
            "CPU process group does not support alltoall yet, falling back with allgather + chunk!"
        )
        out = funcol.all_gather_tensor(input, gather_dim, (mesh, mesh_dim))
        if isinstance(out, funcol.AsyncCollectiveTensor):
            out = out.wait()
        out = torch.chunk(out, mesh.size(mesh_dim), dim=shard_dim)[
            mesh.get_local_rank(mesh_dim)
        ]
        return out.contiguous()

    group_name = funcol._resolve_group_name((mesh, mesh_dim))
    return torch.ops._dtensor.shard_dim_alltoall(
        input, gather_dim, shard_dim, group_name
    )


def mesh_scatter(
    output: torch.Tensor,
    scatter_list: list[torch.Tensor],
    mesh: DeviceMesh,
    mesh_dim: int = 0,
    async_op: bool = False,
    *,
    group_src: int = 0,
) -> Optional[Work]:
    """
    scatter a list of tensors to a device mesh dimension. We by default
    use the first rank of the mesh dimension as the source of truth, i.e
    for a 2d mesh [[0, 1], [2, 3]], if we scatter on mesh_dim = 1, we will
    scatter the tensor list on rank 0 to rank 0/1, and tensor list on rank
    2 to rank 2/3.

    Args:
        output (torch.Tensor): the tensor to receive the scattered list.
        scatter_list (List[torch.Tensor]): the tensor list to be scattered.
        mesh_dim (int, optional): indicate which mesh dimension we want
            to scatter on, we by default choose the first rank on the
            mesh dimension as source of truth.

    Keyword args:
        group_src (int, optional): the group rank of the source data for the
        logical/global tensor, on the specific mesh dimension. By default, we
        use ``group_rank=0`` on each DeviceMesh dimension as the source data
        to preserve the single-device semantic. If passing ``None`` explicitly,
        this method simply uses its local data with no communication.

    Returns:
        A :class:`Work` object
    """
    # meta tensors carry no data, so there is nothing to communicate
    if output.is_meta:
        return None

    dim_group = mesh.get_group(mesh_dim)
    assert isinstance(dim_group, ProcessGroup)

    # only the source rank of this mesh dimension provides the scatter list
    if group_src == get_group_rank(dim_group, get_rank()):
        fut = scatter(
            output,
            scatter_list=scatter_list,
            group=dim_group,
            async_op=async_op,
            group_src=group_src,
        )
    else:
        fut = scatter(
            output,
            scatter_list=None,
            group=dim_group,
            async_op=async_op,
            group_src=group_src,
        )

    return fut


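# ---------------------------------------------------------------------------
# Illustrative usage sketch (editorial addition, not part of the original
# module): how ``mesh_scatter`` is typically driven.  The 2x2 mesh, the
# shapes and the "cpu" device type are arbitrary assumptions; the helper is
# defined but never called at import time and needs an already-initialized
# 4-rank process group to actually run.
# ---------------------------------------------------------------------------
def _example_mesh_scatter_usage() -> None:  # pragma: no cover - illustrative sketch
    from torch.distributed.device_mesh import init_device_mesh

    # 2x2 mesh over 4 ranks: [[0, 1], [2, 3]]
    mesh = init_device_mesh("cpu", (2, 2))

    # every rank allocates a buffer that receives its shard
    output = torch.empty(4, 8)

    if mesh.get_local_rank(mesh_dim=1) == 0:
        # group rank 0 of each mesh_dim=1 subgroup (global ranks 0 and 2)
        # provides the shards of its local 4x16 tensor
        scatter_list = list(torch.chunk(torch.randn(4, 16), 2, dim=1))
    else:
        # non-source ranks only need placeholders of the right shape
        scatter_list = [torch.empty(4, 8) for _ in range(2)]

    # rank 0 scatters to ranks 0/1, rank 2 scatters to ranks 2/3
    mesh_scatter(output, scatter_list, mesh, mesh_dim=1)

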
def mesh_broadcast(
    tensor: torch.Tensor,
    mesh: DeviceMesh,
    mesh_dim: int = 0,
    async_op: bool = False,
    *,
    group_src: int = 0,
) -> Optional[Work]:
    """
    broadcast the tensor to a device mesh dimension. We by default
    use the first rank of the mesh dimension as the source of truth, i.e
    for a 2d mesh [[0, 1], [2, 3]], if we broadcast on mesh_dim = 1, we will
    broadcast the tensor on rank 0 to rank 0/1, and tensor on rank 2
    to rank 2/3.

    Args:
        tensor (torch.Tensor): tensor to broadcast.
        mesh_dim (int, optional): indicate which mesh dimension we want
            to scatter on, we by default choose the first rank on the
            mesh dimension as source of truth.

    Keyword args:
        group_src (int, optional): the group rank of the source data for the
        logical/global tensor, on the specific mesh dimension. By default, we
        use ``group_rank=0`` on each DeviceMesh dimension as the source data
        to preserve the single-device semantic. If passing ``None`` explicitly,
        this method simply uses its local data with no communication.

    Returns:
        A :class:`Work` object
    """
    # meta tensors carry no data, so there is nothing to communicate
    if tensor.is_meta:
        return None

    dim_group = mesh.get_group(mesh_dim)
    assert isinstance(dim_group, ProcessGroup)
    return broadcast(tensor, group=dim_group, async_op=async_op, group_src=group_src)


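# ---------------------------------------------------------------------------
# Illustrative usage sketch (editorial addition, not part of the original
# module): the broadcast counterpart of the sketch above, under the same
# assumptions (initialized 4-rank process group, hypothetical 2x2 "cpu"
# mesh).  Never called at import time.
# ---------------------------------------------------------------------------
def _example_mesh_broadcast_usage() -> None:  # pragma: no cover - illustrative sketch
    from torch.distributed.device_mesh import init_device_mesh

    mesh = init_device_mesh("cpu", (2, 2))

    # group rank 0 of each mesh_dim=1 subgroup (global ranks 0 and 2) holds
    # the data; ranks 1 and 3 receive it in place
    if mesh.get_local_rank(mesh_dim=1) == 0:
        tensor = torch.randn(4, 8)
    else:
        tensor = torch.empty(4, 8)
    mesh_broadcast(tensor, mesh, mesh_dim=1)

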
def pad_tensor(tensor: torch.Tensor, pad_dim: int, pad_size: int) -> torch.Tensor:
    if pad_size == 0:
        return tensor
    # build the F.pad argument: only the trailing (high) side of ``pad_dim``
    # receives ``pad_size`` zero-padded elements
    pad = [0, 0] * (tensor.ndim - pad_dim)
    pad[-1] = pad_size
    return torch.nn.functional.pad(tensor, pad)


def unpad_tensor(tensor: torch.Tensor, pad_dim: int, pad_size: int) -> torch.Tensor:
    if pad_size == 0:
        return tensor
    return tensor.narrow(
        pad_dim,
        start=0,
        length=tensor.size(pad_dim) - pad_size,
    )


def fill_empty_tensor_to_shards(
    shards: list[torch.Tensor], shard_dim: int, num_empty_tensors: int
) -> list[torch.Tensor]:
    if num_empty_tensors == 0:
        return shards
    # pad the shard list with size-0 tensors on ``shard_dim`` so that every
    # rank holds a shard, even when there are more ranks than shards
    tensor_size = list(shards[0].size())
    tensor_size = [
        size if idx != shard_dim else 0 for idx, size in enumerate(tensor_size)
    ]
    tensor = shards[0].new_zeros(tensor_size)
    shards.extend(tensor for _ in range(num_empty_tensors))
    return shards


def check_tensor_meta(
    local_tensor, check_shape_stride=False
) -> Optional["dtensor_spec.TensorMeta"]:
    local_metadata = {
        "dtype": local_tensor.dtype,
        "requires_grad": local_tensor.requires_grad,
    }
    if check_shape_stride:
        local_metadata.update(
            {"shape": local_tensor.shape, "stride": local_tensor.stride()}
        )

    gathered_metadata = [None for _ in range(torch.distributed.get_world_size())]
    torch.distributed.all_gather_object(gathered_metadata, local_metadata)

    # the gathered metadata (dtype/requires_grad, optionally shape/stride)
    # must agree across all ranks
    if not all(meta == local_metadata for meta in gathered_metadata):
        raise ValueError(
            "Inconsistent tensor metadata (including shape and stride) across ranks."
        )
    return None


def spec_to_bytes(spec: "dtensor_spec.DTensorSpec") -> int:
    assert spec.tensor_meta is not None, "spec should have tensor meta defined!"
    return spec.tensor_meta.dtype.itemsize * math.prod(spec.shape)


@dataclass
class MeshTopoInfo:
    """
    Mesh information for collective cost estimation
    """

    mesh: DeviceMesh
    mesh_dim_devices: list[int]
    mesh_dim_bandwidth: list[float]
    mesh_dim_latency: list[float]

    @staticmethod
    @lru_cache(None)
    def build_from_mesh(mesh: DeviceMesh) -> "MeshTopoInfo":
        # derive a per-mesh-dimension bandwidth/latency estimate, assuming a
        # homogeneous mesh where the innermost dimensions stay intra-host
        num_devices_per_host = _mesh_resources.num_devices_per_host(mesh.device_type)
        # base intra-host bandwidth (GB/s)
        base_bw = 87.7
        mesh_dim_bandwidth = [base_bw] * mesh.ndim
        # intra-host latency (us)
        mesh_dim_latency = [0.6] * mesh.ndim
        mesh_dim_devices = [1] * mesh.ndim

        total_num_devices = 1
        for mesh_dim in reversed(range(mesh.ndim)):
            num_devices = mesh.size(mesh_dim)
            mesh_dim_devices[mesh_dim] = num_devices
            total_num_devices *= num_devices
            if total_num_devices > num_devices_per_host:
                # this mesh dimension spans hosts: scale down the bandwidth
                # and switch to an inter-host (ethernet-like) latency
                mesh_dim_bandwidth[mesh_dim] *= 0.22
                mesh_dim_latency[mesh_dim] = 2.7

        return MeshTopoInfo(
            mesh, mesh_dim_devices, mesh_dim_bandwidth, mesh_dim_latency
        )


def allgather_cost(bytes_gb: float, mesh_topo: MeshTopoInfo, mesh_dim: int) -> float:
    num_devices_on_mesh_dim = mesh_topo.mesh_dim_devices[mesh_dim]
    mesh_dim_bandwidth = mesh_topo.mesh_dim_bandwidth[mesh_dim]
    num_hops = num_devices_on_mesh_dim - 1
    # base latency + per-hop comm latency (us)
    latency = 6.6 + num_hops * mesh_topo.mesh_dim_latency[mesh_dim]
    # bandwidth-bound time in seconds
    bw = (bytes_gb * num_hops / num_devices_on_mesh_dim) / mesh_dim_bandwidth
    # rescale to us
    return latency + bw * 1e6


def allreduce_cost(bytes_gb: float, mesh_topo: MeshTopoInfo, mesh_dim: int) -> float:
    num_devices_on_mesh_dim = mesh_topo.mesh_dim_devices[mesh_dim]
    mesh_dim_bandwidth = mesh_topo.mesh_dim_bandwidth[mesh_dim]
    # allreduce moves almost 2x the bytes of allgather/reduce_scatter
    num_hops = 2 * (num_devices_on_mesh_dim - 1)
    latency = 6.6 + num_hops * mesh_topo.mesh_dim_latency[mesh_dim]
    bw = (bytes_gb * num_hops / num_devices_on_mesh_dim) / mesh_dim_bandwidth
    return latency + bw * 1e6


def reduce_scatter_cost(
    bytes_gb: float,
    mesh_topo: MeshTopoInfo,
    mesh_dim: int,
) -> float:
    num_devices_on_mesh_dim = mesh_topo.mesh_dim_devices[mesh_dim]
    mesh_dim_bandwidth = mesh_topo.mesh_dim_bandwidth[mesh_dim]
    num_hops = num_devices_on_mesh_dim - 1
    latency = 6.6 + num_hops * mesh_topo.mesh_dim_latency[mesh_dim]
    bw = (bytes_gb * num_hops / num_devices_on_mesh_dim) / mesh_dim_bandwidth
    return latency + bw * 1e6


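# ---------------------------------------------------------------------------
# Illustrative usage sketch (editorial addition, not part of the original
# module): how the latency/bandwidth model above is typically consumed.  The
# 2x2 mesh, the 4096x4096 fp32 tensor and the placements are arbitrary
# assumptions; ``DTensorSpec``/``TensorMeta`` are assumed to take the usual
# mesh/placements/tensor_meta and shape/stride/dtype arguments, and
# ``redistribute_cost`` is defined just below (the sketch is never called at
# import time, so the forward reference is harmless).
# ---------------------------------------------------------------------------
def _example_redistribute_cost_usage() -> None:  # pragma: no cover - illustrative sketch
    from torch.distributed.device_mesh import init_device_mesh
    from torch.distributed.tensor.placement_types import Replicate, Shard

    mesh = init_device_mesh("cuda", (2, 2))
    topo = MeshTopoInfo.build_from_mesh(mesh)

    # estimated cost (in us) of allgather-ing ~0.0625 GB along mesh dim 0
    allgather_us = allgather_cost(0.0625, topo, 0)

    # a 4096x4096 fp32 tensor (64 MiB) sharded on dim 0 across mesh dim 0 and
    # replicated across mesh dim 1, versus a fully replicated target layout
    tensor_meta = dtensor_spec.TensorMeta(
        shape=torch.Size([4096, 4096]), stride=(4096, 1), dtype=torch.float32
    )
    sharded = dtensor_spec.DTensorSpec(
        mesh=mesh, placements=(Shard(0), Replicate()), tensor_meta=tensor_meta
    )
    replicated = dtensor_spec.DTensorSpec(
        mesh=mesh, placements=(Replicate(), Replicate()), tensor_meta=tensor_meta
    )

    # comm cost of turning the sharded layout into the replicated one; this is
    # the quantity used for operator strategy estimation/selection
    cost = redistribute_cost(sharded, replicated)
    logger.debug("allgather: %.1f us, redistribute: %.1f us", allgather_us, cost)

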
def redistribute_cost(
    current_spec: "dtensor_spec.DTensorSpec",
    target_spec: "dtensor_spec.DTensorSpec",
) -> float:
    """
    This function returns the cost of redistribute from current to target DTensorSpec.

    NOTE:
    1. Only consider communication cost here, since computation costs for redistribute
       are quite trivial (i.e. we only need to narrow or do a simple division)
    2. Only consider redistribute cost on same mesh, cross mesh communication cost is
       not quite needed for operator strategy estimation/selection.
    """
    if current_spec.mesh != target_spec.mesh:
        # cross-mesh communication is not modeled; treat it as infinitely expensive
        return float("inf")

    if current_spec.is_replicated():
        # short-cut: comm cost is 0 if the current spec is already fully replicated
        return 0.0

    mesh_topo = MeshTopoInfo.build_from_mesh(current_spec.mesh)
    cost = 0.0
    comm_bytes_gb = (
        spec_to_bytes(current_spec) / current_spec.num_shards / 1024 / 1024 / 1024
    )
    # Transformations considered for redistribute cost:
    # 1. allgather 2. alltoall
    # 3. allreduce 4. reduce_scatter
    for i, (current, target) in enumerate(
        zip(current_spec.placements, target_spec.placements)
    ):
        if current == target:
            continue

        num_devices_on_mesh_dim = mesh_topo.mesh_dim_devices[i]
        if current.is_shard() and target.is_replicate():
            # allgather gives larger comm bytes
            comm_bytes_gb *= num_devices_on_mesh_dim
            # add up allgather comm cost
            cost += allgather_cost(comm_bytes_gb, mesh_topo, i)
        elif current.is_shard() and target.is_shard():
            # should be an alltoall; it is not modeled yet, so add a penalty
            # on top of the allgather estimate to favor allgather instead
            cost += allgather_cost(comm_bytes_gb, mesh_topo, i) + 1.0
        elif current.is_partial() and target.is_replicate():
            # add up allreduce comm cost
            cost += allreduce_cost(comm_bytes_gb, mesh_topo, i)
        elif current.is_partial() and target.is_shard():
            # add up reduce_scatter comm cost
            cost += reduce_scatter_cost(comm_bytes_gb, mesh_topo, i)
            # after reduce_scatter the comm bytes for further collectives shrink
            comm_bytes_gb /= num_devices_on_mesh_dim
        elif current.is_shard() and target.is_partial():
            # ban shard -> partial as it does not make sense to perform
            # this redistribute
            return float("inf")

    return cost