
import inspect
import os
import warnings
from concurrent.futures import Future
from enum import Enum
from typing import cast, Optional, Union

from typing_extensions import deprecated

import torch
import torch.distributed as dist
from torch.distributed._state_dict_utils import (
    _copy_state_dict,
    _create_cpu_state_dict,
)
from torch.distributed.checkpoint._async_executor import _AsyncCheckpointExecutor
from torch.distributed.checkpoint._async_process_executor import (
    _ProcessBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._async_thread_executor import (
    _ThreadBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._storage_utils import _storage_setup
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.checkpoint.logger import _dcp_method_logger
from torch.distributed.checkpoint.metadata import Metadata, STATE_DICT_TYPE
from torch.distributed.checkpoint.planner import SavePlan, SavePlanner
from torch.distributed.checkpoint.staging import AsyncStager
from torch.distributed.checkpoint.stateful import Stateful
from torch.distributed.checkpoint.storage import StorageWriter
from torch.distributed.distributed_c10d import _get_default_group

from .utils import _api_bc_check, _DistWrapper, _profile


__all__ = ["save_state_dict", "save", "async_save", "AsyncCheckpointerType"]


class AsyncCheckpointerType(Enum):
    """Enum for async checkpointer type."""

    THREAD = "thread"
    PROCESS = "process"


@deprecated(
    "`save_state_dict` is deprecated and will be removed in future versions. "
    "Please use `save` instead.",
    category=FutureWarning,
)
def save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    """This method is deprecated. Please switch to 'save'."""
    storage_writer.reset()

    with _profile():
        return _save_state_dict(
            state_dict,
            storage_writer,
            process_group,
            coordinator_rank,
            no_dist,
            planner,
        )


@_dcp_method_logger(log_exceptions=True)
@_api_bc_check
def save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    no_dist: bool = False,
) -> Metadata:
    """
    Save a distributed model in SPMD style.

    This function is different from ``torch.save()`` as it handles
    ``ShardedTensor`` and ``DTensor`` by having each rank save only their local shards.

    For each ``Stateful`` object (having both a ``state_dict`` and a ``load_state_dict``),
    save will call ``state_dict`` before serialization.
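
    For example, a dict entry may be a ``Stateful`` object rather than a plain
    tensor container (a minimal sketch; ``Trainer`` is a hypothetical user class
    implementing ``state_dict``/``load_state_dict``)::

        >>> # xdoctest: +SKIP
        >>> class Trainer:
        ...     step = 0
        ...     def state_dict(self):
        ...         return {"step": self.step}
        ...     def load_state_dict(self, state_dict):
        ...         self.step = state_dict["step"]
        >>> torch.distributed.checkpoint.save({"trainer": Trainer()}, checkpoint_id="/checkpoint/1")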

    .. warning::
        There are no guarantees of backwards compatibility across PyTorch versions
        for saved state_dicts.

    .. warning::
        If using the `process_group` argument, make sure that only its ranks
        call `save_state_dict` and that all data in state_dict belong to it.

    .. note::
        When saving a checkpoint for FSDP's `ShardingStrategy.HYBRID_SHARD`, only one of
        the shard_groups should call `save_state_dict`, and the corresponding process
        group needs to be passed in.
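
        For example (a minimal sketch; ``shard_group`` stands in for the process
        group of the single shard group that performs the save)::

            >>> # xdoctest: +SKIP
            >>> if dist.get_rank(shard_group) != -1:  # this rank belongs to the saving shard group
            ...     torch.distributed.checkpoint.save(
            ...         state_dict, checkpoint_id="/checkpoint/1", process_group=shard_group
            ...     )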

    .. note::
        If no process group is available, this function assumes the intention is to save the
        state_dict in the local process.

    .. note::
        Rank 0 is assumed to be the coordinator rank.


    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform writes. If this is not
            specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        no_dist (bool):
            If ``True``, this function will assume the intent is to save
            a checkpoint without using cross-rank synchronization.
            (Default: ``False``)

    Returns:
        Metadata: Metadata object for the saved checkpoint.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> torch.distributed.checkpoint.save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )

    .. note::
        ``save`` uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication takes place.
        In this case, the device used is given by ``torch.cuda.current_device()``
        and it is the user's responsibility to ensure that this is set so that
        each rank has an individual GPU, via ``torch.cuda.set_device()``.
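
        For example (a minimal sketch; the one-GPU-per-rank mapping shown is an
        assumption about the launch setup, not a requirement of DCP)::

            >>> # xdoctest: +SKIP
            >>> torch.cuda.set_device(dist.get_rank() % torch.cuda.device_count())
            >>> torch.distributed.checkpoint.save(state_dict, storage_writer=fs_storage_writer)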
    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save")

    no_dist = no_dist or not dist.is_available() or not dist.is_initialized()
    if no_dist:
        warnings.warn(
            "torch.distributed is disabled, unavailable or uninitialized, "
            "assuming the intent is to save in a single process."
        )

    with _profile():
        storage_writer = cast(
            StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
        )

        return _save_state_dict(
            state_dict=_stateful_to_state_dict(state_dict),
            storage_writer=storage_writer,
            process_group=process_group,
            no_dist=no_dist,
            planner=planner,
        )


@_dcp_method_logger(log_exceptions=True)
def async_save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    async_checkpointer_type: AsyncCheckpointerType = AsyncCheckpointerType.THREAD,
) -> Future:
    """Asynchronous version of ``save``. This code first copies the state_dict to the
    staging storage (defaults to CPU memory), and then calls ``save`` in a separate thread.

    .. warning::
        This feature is experimental and subject to change.

    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform 'stage' and 'save'. If
            this is not specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
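        async_checkpointer_type (AsyncCheckpointerType):
            Whether the checkpoint is written from a background thread
            (``AsyncCheckpointerType.THREAD``) or from a separate process
            (``AsyncCheckpointerType.PROCESS``).
            (Default: ``AsyncCheckpointerType.THREAD``)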

    Returns:
        Future: A future holding the resultant Metadata object from `save`.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )
        >>>
        >>> # ... do some work ...
        >>>
        >>> checkpoint_future.result()
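        >>>
        >>> # Sketch: the checkpoint can instead be written from a spawned
        >>> # process rather than a background thread.
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>>     async_checkpointer_type=AsyncCheckpointerType.PROCESS,
        >>> )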

    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.async_save")

    if dist.is_available() and dist.is_initialized():
        pg = process_group or _get_default_group()
        assert (
            torch.device("cpu") in pg._device_types
        ), "A CPU backend must be enabled for async save; try initializing process group with 'cpu:gloo,cuda:nccl'"

    storage_writer = cast(
        StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
    )

    state_dict = _stateful_to_state_dict(state_dict)
    if isinstance(storage_writer, AsyncStager):
        staged_state_dict = storage_writer.stage(state_dict)
    else:  # backwards compatibility for storage writers not implementing AsyncStager
        staged_state_dict = _create_cpu_state_dict(state_dict)
        _copy_state_dict(state_dict, staged_state_dict, type_check=False)

    executor: _AsyncCheckpointExecutor = (
        _ProcessBasedAsyncCheckpointExecutor()
        if async_checkpointer_type == AsyncCheckpointerType.PROCESS
        else _ThreadBasedAsyncCheckpointExecutor()
    )
    f: Future = executor.execute_save(
        staged_state_dict,
        checkpoint_id=checkpoint_id,
        storage_writer=storage_writer,
        planner=planner,
        process_group=process_group,
    )
    if (
        isinstance(storage_writer, AsyncStager)
        and storage_writer.should_synchronize_after_execute
    ):
        storage_writer.synchronize_staging()

    return f


def _stateful_to_state_dict(state_dict: STATE_DICT_TYPE) -> STATE_DICT_TYPE:
    """Creates a shallow copy of `state_dict` where `state_dict` is called for each Stateful object."""
    stateful_state_dict = {}
    for key, elem in state_dict.items():
        stateful_state_dict[key] = (
            elem.state_dict() if isinstance(elem, Stateful) else elem
        )
    return stateful_state_dict


def _save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save_state_dict")

    distW = _DistWrapper(process_group, not no_dist, coordinator_rank)
    if planner is None:
        planner = DefaultSavePlanner()
    assert planner is not None

    global_metadata = None

    ckpt_kwargs = {}
    if (ckpt_id := getattr(storage_writer, "checkpoint_id", None)) is not None:
        ckpt_kwargs["checkpoint_id"] = ckpt_id
        ckpt_kwargs["process_group"] = distW.group

    @_dcp_method_logger(**ckpt_kwargs)
    def local_step():
        assert planner is not None
        storage_meta = storage_writer.storage_meta()
        if "storage_meta" not in inspect.signature(planner.set_up_planner).parameters:
            warnings.warn(
                "The function definition for SavePlanner.set_up_planner has been updated"
                " to include the storage_meta argument. Please update your implementation"
                " to include this parameter."
            )
            planner.set_up_planner(state_dict, distW.is_coordinator)
        else:
            planner.set_up_planner(
                state_dict=state_dict,
                storage_meta=storage_meta,
                is_coordinator=distW.is_coordinator,
            )
        storage_writer.set_up_storage_writer(distW.is_coordinator)

        local_plan = planner.create_local_plan()
        local_plan = storage_writer.prepare_local_plan(local_plan)
        return local_plan

    @_dcp_method_logger(**ckpt_kwargs)
    def global_step(all_local_plans):
        nonlocal global_metadata

        assert planner is not None
        all_local_plans, global_metadata = planner.create_global_plan(all_local_plans)
        all_local_plans = storage_writer.prepare_global_plan(all_local_plans)
        return all_local_plans

    central_plan: SavePlan = distW.reduce_scatter("plan", local_step, global_step)

    @_dcp_method_logger(**ckpt_kwargs)
    def write_data():
        assert planner is not None
        final_local_plan = planner.finish_plan(central_plan)
        all_writes = storage_writer.write_data(final_local_plan, planner)

        all_writes.wait()
        return all_writes.value()

    @_dcp_method_logger(**ckpt_kwargs)
    def finish_checkpoint(all_results):
        assert global_metadata is not None
        storage_writer.finish(metadata=global_metadata, results=all_results)
        return global_metadata

    return distW.all_reduce("write", write_data, finish_checkpoint)