
    VhJ4                         d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	 d dl
Z
d dlmZ g dZ G d d      Z G d d	e      Z G d
 de      Z G d d      Zy)    N)ABCabstractmethod)TracebackType)Any
NamedTupleOptional)JoinHookJoinableJoinc                   (    e Zd ZdZddZdeddfdZy)r	   a  
    This defines a join hook, which provides two entry points in the join context manager.

    Entry points : a main hook, which is called repeatedly while there exists a non-joined
    process, and a post-hook, which is called once all processes have joined.

    To implement a join hook for the generic join context manager, define a
    class that inherits from :class:`JoinHook` and override ``main_hook()`` and
    ``post_hook()`` as appropriate.
    returnNc                      y)zCall this hook while there exists a non-joined process to shadow collective communications in a training iteration.

        Training iteration i.e., in one forward pass, backward pass, and optimizer step.
        N selfs    Q/home/dcms/DCMS/lib/python3.12/site-packages/torch/distributed/algorithms/join.py	main_hookzJoinHook.main_hook           is_last_joinerc                      y)aK  
        Call hook after all processes have joined.

        It is passed an additional ``bool`` argument ``is_last_joiner``, which indicates if the rank is one of the last to join.

        Arguments:
            is_last_joiner (bool): ``True`` if the rank is one of the last to
                join; ``False`` otherwise.
        Nr   )r   r   s     r   	post_hookzJoinHook.post_hook    r   r   r   N)__name__
__module____qualname____doc__r   boolr   r   r   r   r	   r	      s    		 	 	r   r	   c                        e Zd ZdZed fd       Zedefd       Zeede	j                  fd              Zeedefd              Z xZS )r
   a_  
    This defines an abstract base class for joinable classes.

    A joinable class
    (inheriting from :class:`Joinable`) should implement :meth:`join_hook`,
    which returns a :class:`JoinHook` instance, in addition to
    :meth:`join_device` and :meth:`join_process_group` that return device and
    process group information, respectively.
    r   c                 T    t         |           t        j                         | _        y N)super__init___JoinConfigconstruct_disabled_join_config_join_config)r   	__class__s    r   r#   zJoinable.__init__7   s    'FFHr   c                      y)a  
        Return a :class:`JoinHook` instance for the given :class:`Joinable`.

        Arguments:
            kwargs (dict): a :class:`dict` containing any keyword arguments
                to modify the behavior of the join hook at run time; all
                :class:`Joinable` instances sharing the same join context
                manager are forwarded the same value for ``kwargs``.
        Nr   )r   kwargss     r   	join_hookzJoinable.join_hook<   s     	r   c                      y)zeReturn the device from which to perform collective communications needed by the join context manager.Nr   r   s    r   join_devicezJoinable.join_deviceI        	r   c                      y)zfReturns the process group for the collective communications needed by the join context manager itself.Nr   r   s    r   join_process_groupzJoinable.join_process_groupO   r-   r   r   )r   r   r   r   r   r#   r	   r*   propertytorchdevicer,   r   r/   __classcell__)r'   s   @r   r
   r
   ,   s     I I 
X 
 
 U\\    C   r   r
   c                   @    e Zd ZU dZeed<   eed<   eed<   ed        Zy)r$   zdThis includes all fields needed from a :class:`Joinable` instance for the join context manager side.enablethrow_on_early_terminationis_first_joinablec                      t        ddd      S )zReturn a :class:`_JoinConfig` instance indicating that join-related logic should be disabled.

        e.g. if the caller is not in a join context manager.
        Fr5   r6   r7   )r$   r   r   r   r%   z*_JoinConfig.construct_disabled_join_config]   s     Ue
 	
r   N)r   r   r   r   r   __annotations__staticmethodr%   r   r   r   r$   r$   V   s(    oL $$
 
r   r$   c                       e Zd ZdZ	 	 ddee   dedefdZddZddZ	d	 Z
d
eee      dee   dee   fdZd Zd Zedefd       Zy)r   a
  
    This class defines the generic join context manager, which allows custom hooks to be called after a process joins.

    These hooks should shadow the
    collective communications of non-joined processes to prevent hanging and
    erroring and to ensure algorithmic correctness. Refer to :class:`JoinHook`
    for details about the hook definition.

    .. warning::
        The context manager requires each participating :class:`Joinable` to
        call the method :meth:`notify_join_context()` before its own per-
        iteration collective communications to ensure correctness.

    .. warning::
        The context manager requires that all ``process_group`` attributes in
        the :class:`JoinHook` objects are the same. If there are multiple
        :class:`JoinHook` objects, then the ``device`` of the first is used.
        The process group and device information is used for checking for non-
        joined processes and for notifying processes to throw an exception if
        ``throw_on_early_termination`` is enabled, both of which using an all-
        reduce.

    Arguments:
        joinables (List[Joinable]): a list of the participating
            :class:`Joinable` s; their hooks are iterated over in the given
            order.

        enable (bool): a flag enabling uneven input detection; setting to
            ``False`` disables the context manager's functionality and should
            only be set when the user knows the inputs will not be uneven
            (default: ``True``).

        throw_on_early_termination (bool): a flag controlling whether to throw an
            exception upon detecting uneven inputs (default: ``False``).

    Example::

        >>> import os
        >>> import torch
        >>> import torch.distributed as dist
        >>> import torch.multiprocessing as mp
        >>> # xdoctest: +SKIP
        >>> import torch.nn.parallel.DistributedDataParallel as DDP
        >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
        >>> from torch.distributed.algorithms.join import Join
        >>>
        >>> # On each spawned worker
        >>> def worker(rank):
        >>>     dist.init_process_group("nccl", rank=rank, world_size=2)
        >>>     model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
        >>>     optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
        >>>     # Rank 1 gets one more input than rank 0
        >>>     inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
        >>>     with Join([model, optim]):
        >>>         for input in inputs:
        >>>             loss = model(input).sum()
        >>>             loss.backward()
        >>>             optim.step()
        >>>     # All ranks reach here without hanging/erroring
    	joinablesr5   r6   c                    t        |      dk(  rt        d      || _        | j                  D cg c]  } |j                  di | c}| _        || _        || _        | j                          | j                          y c c}w )Nr   z7The join context manager requires at least one joinabler   )	len
ValueError
_joinablesr*   _join_hooks_enable_throw_on_early_termination_set_joinable_configs_extract_dist_info)r   r=   r5   r6   r)   joinables         r   r#   zJoin.__init__   s~     y>QVWW#9=
-5H((
 +E(""$!
s   A?Nc                     t        | j                        dkD  sJ d}| j                  D ]+  }t        | j                  | j                  |      |_        d}- y)zESet the :class:`_JoinConfig` of each participating :class:`Joinable`.r   Tr9   FN)r?   rA   r$   rC   rD   r&   )r   r7   rG   s      r   rE   zJoin._set_joinable_configs   sZ    4??#a'''  	&H$/||+/+K+K"3%H!
 !&	&r   c                    d}d}| j                   D ]:  }||j                  }n||j                  k7  rt        d      |/|j                  }< || _        t        j                  | j                        | _        || _        y)a  
        Extract the process group and device information from the joinables.

        If there are multiple joinables, then the context manager uses the
        first specified device.

        Preconditions:
            ``self._joinables`` is not ``None`` and is non-empty.

        Raises:
            ValueError
                If there are multiple conflicting ``process_group`` attributes
                among the ``Joinable`` objects.
        Nz7Using join context manager with multiple process groups)	rA   r/   r@   r,   _process_groupdistget_rank_rank_device)r   process_groupr2   rG   s       r   rF   zJoin._extract_dist_info   s      	.H$ ( ; ;("="== M  ~!--	. ,]]4#6#67
r   c                      y r!   r   r   s    r   	__enter__zJoin.__enter__   r   r   typevalue	tracebackc           	         | j                   r|ryd}d}d}d}t        j                  d       |s||kD  r)t        j                  d| d| j                   d	| d
       | j                         }|dk(  rd}nD| j                  r| j                          | j                  D ]  }	|	j                           d}|dz  }|s| j                  D ]  }	|	j                  |        y)z
        Repeatedly runs the main hooks until all processes join; then, runs the post-hooks.

        Raises:
            RuntimeError
                If ``throw_on_early_termination=True``.
        NFTr   i  oncez+Detected uneven input skew of greater than z. This means that rank z has at least zz fewer inputs than other currently-active ranks. This level of skew could lead to performance degradation during training.   )rC   warningssimplefilterwarnrM   _get_num_nonjoined_procsrD   _notify_procs_to_terminaterB   r   r   )
r   rR   rS   rT   all_procs_joinedr   iWARN_THRESHOLDnum_nonjoined_procsr*   s
             r   __exit__zJoin.__exit__   s    ||t f%">!A%&&=zzl.0@ A33 #'"?"?"A"a'#' 33335 "&!1!1 *I'')* "'Q/ #4 )) 	0I/	0r   c                     t        j                  d| j                        }t        j                  || j
                         |j                         S )zaReturn the number of non-joined processes by shadowing an all-reduce in the non-joined processes.rW   r2   group)r1   zerosrN   rK   
all_reducerJ   item)r   r`   s     r   r[   zJoin._get_num_nonjoined_procs  s;    #kk!DLLA+43F3FG"''))r   c                     t        j                  d| j                        }t        j                  || j
                         t        d| j                   d      )zSchedule an all-reduce to notify non-joined processes to terminate.

        Also raise a ``RuntimeError`` indicating that the current process has exhausted its inputs.
        rW   rc   rd   zRank z exhausted all inputs.)r1   onesrN   rK   rg   rJ   RuntimeErrorrM   )r   rj   s     r   r\   zJoin._notify_procs_to_terminate  sE    
 zz!DLL1D$7$78U4::,.DEFFr   rG   c                    t        | d      sJ dt        |        d       | j                  }|j                  r|j                  sy| j
                  }| j                  }t        j                  d|      }t        j                  ||d      }|j                  rKt        j                  d|      }t        j                  ||	       |j                         }|rt        d
      |S )aH  
        Notifies the join context manager that the calling process has not yet joined.

        Then, if ``throw_on_early_termination=True``, checks if uneven inputs have been detected
        (i.e. if one process has already joined) and throws an exception if so.

        This method should be called from a :class:`Joinable` object before
        its per-iteration collective communications. For example, this should
        be called at the beginning of the forward pass in
        :class:`DistributedDataParallel`.

        Only the first :class:`Joinable` object passed into the context
        manager performs the collective communications in this method, and
        for the others, this method is vacuous.

        Arguments:
            joinable (Joinable): the :class:`Joinable` object calling this
                method.

        Returns:
            An async work handle for the all-reduce meant to notify the context
            manager that the process has not yet joined if ``joinable`` is the
            first one passed into the context manager; ``None`` otherwise.
        r&   zCheck that the z/ constructor calls the ``Joinable`` constructorNrW   rc   T)re   async_oprd   zLDetected at least one rank that exhausted inputs. Throwing across all ranks.)hasattrrR   r&   r7   r5   r,   r/   r1   rj   rK   rg   r6   rf   rh   rk   )rG   join_configr2   rO   rj   workrf   should_throws           r   notify_join_contextzJoin.notify_join_context'  s    4 x0 	
d8n- .' '	
0
 ++,,K4F4F%% 33 zz!F+t=4H11KK&1EOOE7 ::<L"1  r   )TFr   )r   r   r   r   listr
   r   r#   rE   rF   rQ   r   rR   BaseExceptionr   ra   r[   r\   r;   rr   r   r   r   r   r   h   s    ;@ +0	">" " %)	"$
&< 20tM*+20 &20 M*	20h*G 4h 4 4r   r   )rX   abcr   r   typesr   typingr   r   r   r1   torch.distributeddistributedrK   __all__r	   r
   r$   r   r   r   r   <module>r{      sP     #  , ,    + <'s 'T
* 
$t tr   