"""Utilities for collectives."""

import copy
import enum

from tensorflow.python.util import deprecation
from tensorflow.python.util.tf_export import tf_export


@tf_export("distribute.experimental.CommunicationImplementation",
           "distribute.experimental.CollectiveCommunication")
class CommunicationImplementation(enum.Enum):
  """Cross device communication implementation.

  Warning: The alias `tf.distribute.experimental.CollectiveCommunication` is
  deprecated and will be removed in a future version. Use
  `tf.distribute.experimental.CommunicationImplementation` instead.
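
  Both names resolve to the same enum, so code written against either keeps
  working; a minimal illustrative check (assuming `tf` is the imported
  TensorFlow package):

  ```python
  assert (tf.distribute.experimental.CollectiveCommunication.NCCL is
          tf.distribute.experimental.CommunicationImplementation.NCCL)
  ```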

  * `AUTO`: Automatically chosen by TensorFlow.
  * `RING`: TensorFlow's ring algorithms for all-reduce and
    all-gather.
  * `NCCL`: NVIDIA®'s NCCL library. This is now only used for all-reduce on
    GPUs; all-reduce on CPU, all-gather, and broadcast fall back to RING.
  """
  AUTO = "AUTO"
  RING = "RING"
  NCCL = "NCCL"


# Deprecated alias; see the warning in the class docstring above.
CollectiveCommunication = CommunicationImplementation


@tf_export("distribute.experimental.CommunicationOptions")
class _OptionsExported(object):
  """Options for cross device communications like All-reduce.

  This can be passed to methods like
  `tf.distribute.get_replica_context().all_reduce()` to optimize collective
  operation performance. Note that these are only hints, which may or may not
  change the actual behavior. Some options only apply to certain strategies
  and are ignored by others.

  One common optimization is to break gradients all-reduce into multiple packs
  so that weight updates can overlap with gradient all-reduce.

  Examples:

  ```python
  options = tf.distribute.experimental.CommunicationOptions(
      bytes_per_pack=50 * 1024 * 1024,
      timeout_seconds=120.0,
      implementation=tf.distribute.experimental.CommunicationImplementation.NCCL
  )
  grads = tf.distribute.get_replica_context().all_reduce(
      'sum', grads, options=options)
  optimizer.apply_gradients(zip(grads, vars),
      experimental_aggregate_gradients=False)
  ```
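
  Options can also be attached to a strategy at construction time; for
  example, `tf.distribute.MultiWorkerMirroredStrategy` accepts them through
  its `communication_options` argument. A minimal sketch (the variable names
  are placeholders):

  ```python
  options = tf.distribute.experimental.CommunicationOptions(
      implementation=tf.distribute.experimental.CommunicationImplementation.NCCL
  )
  strategy = tf.distribute.MultiWorkerMirroredStrategy(
      communication_options=options)
  ```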

  """

  def __new__(cls, *args, **kwargs):
    # Construction is delegated to the internal `Options` class, so the
    # returned object is an `Options` instance and `__init__` below is never
    # run on it; it exists to document the constructor arguments.
    return Options(*args, **kwargs)

  def __init__(self,
               bytes_per_pack=0,
               timeout_seconds=None,
               implementation=CommunicationImplementation.AUTO):
    """Creates a CommunicationOptions.

    Args:
      bytes_per_pack: a non-negative integer. Breaks collective operations into
        packs of certain size. If it's zero, the value is determined
        automatically. This hint is respected by all multi-replica strategies
        except `TPUStrategy`.
      timeout_seconds: a float or None, timeout in seconds. If not None, the
        collective raises `tf.errors.DeadlineExceededError` if it takes longer
        than this timeout. Zero disables timeout. This can be useful when
        debugging hanging issues.  This should only be used for debugging since
        it creates a new thread for each collective, i.e. an overhead of
        `timeout_seconds * num_collectives_per_second` more threads. This only
        works for `tf.distribute.experimental.MultiWorkerMirroredStrategy`.
      implementation: a
        `tf.distribute.experimental.CommunicationImplementation`. This is a hint
        on the preferred communication implementation. Possible values include
        `AUTO`, `RING`, and `NCCL`. NCCL is generally more performant for GPU,
        but doesn't work for CPU. This only works for
        `tf.distribute.experimental.MultiWorkerMirroredStrategy`.

    Raises:
      ValueError: When arguments have invalid value.
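
    Note that `implementation` also accepts a case-insensitive string such as
    `"NCCL"`, which is converted to the corresponding enum value by the
    internal `Options` class below. A minimal sketch:

    ```python
    options = tf.distribute.experimental.CommunicationOptions(
        implementation="NCCL")
    ```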
    """
    pass


class Options(object):
  """Implementation of OptionsInterface."""

  def __init__(self,
               bytes_per_pack=0,
               timeout_seconds=None,
               implementation=CommunicationImplementation.AUTO):
    if bytes_per_pack < 0:
      raise ValueError(
          f"Argument `bytes_per_pack` must be >=0, Received {bytes_per_pack}.")
    if isinstance(implementation, str):
      implementation = CommunicationImplementation(implementation.upper())
    if not isinstance(implementation, CommunicationImplementation):
      raise ValueError(
          "Argument `implementation` must be instance of "
          "`tf.distribute.experimental.CommunicationImplementation`.")
    self.bytes_per_pack = bytes_per_pack
    self.timeout_seconds = timeout_seconds
    self.implementation = implementation

  __init__.__doc__ = _OptionsExported.__init__.__doc__

  def merge(self, options):
    """Merges with another options and returns a new one.

    Values specified in `options` take precedence if they're not the
    default.

    Args:
      options: a `tf.distribute.experimental.CommunicationOptions`.

    Returns:
      A new `tf.distribute.experimental.CommunicationOptions`.
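
    For example (illustrative; attribute values shown in comments follow from
    the precedence rule above):

    ```python
    a = tf.distribute.experimental.CommunicationOptions(
        bytes_per_pack=32 * 1024 * 1024)
    b = tf.distribute.experimental.CommunicationOptions(timeout_seconds=30.0)
    merged = a.merge(b)
    # merged.bytes_per_pack == 32 * 1024 * 1024  (left at the default in `b`)
    # merged.timeout_seconds == 30.0  (set in `b`, so it takes precedence)
    ```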
    """
    merged = copy.deepcopy(self)
    if options is None:
      return merged
    if options.bytes_per_pack != 0:
      merged.bytes_per_pack = options.bytes_per_pack
    if options.timeout_seconds is not None:
      merged.timeout_seconds = options.timeout_seconds
    if options.implementation != CommunicationImplementation.AUTO:
      merged.implementation = options.implementation
    return merged

  def __str__(self):
    return (f"Options(bytes_per_pack={self.bytes_per_pack},"
            f"timeout_seconds={self.timeout_seconds}, "
            f"implementation={self.implementation})")


@tf_export("distribute.experimental.CollectiveHints")
class Hints(object):
  """Hints for collective operations like AllReduce.

  This can be passed to methods like
  `tf.distribute.get_replica_context().all_reduce()` to optimize collective
  operation performance. Note that these are only hints, which may or may not
  change the actual behavior. Some options only apply to certain strategies
  and are ignored by others.

  One common optimization is to break gradients all-reduce into multiple packs
  so that weight updates can overlap with gradient all-reduce.

  Examples:

  - bytes_per_pack

  ```python
  hints = tf.distribute.experimental.CollectiveHints(
      bytes_per_pack=50 * 1024 * 1024)
  grads = tf.distribute.get_replica_context().all_reduce(
      'sum', grads, experimental_hints=hints)
  optimizer.apply_gradients(zip(grads, vars),
      experimental_aggregate_gradients=False)
  ```

  - timeout_seconds

  ```python
  strategy = tf.distribute.MirroredStrategy()
  hints = tf.distribute.experimental.CollectiveHints(
      timeout_seconds=120.0)
  try:
    strategy.reduce("sum", v, axis=None, experimental_hints=hints)
  except tf.errors.DeadlineExceededError:
    do_something()
  ```
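
  The replacement API takes the same arguments, so migrating is a rename plus
  switching the `experimental_hints` keyword to `options`; a minimal sketch
  mirroring the `bytes_per_pack` example above:

  ```python
  options = tf.distribute.experimental.CommunicationOptions(
      bytes_per_pack=50 * 1024 * 1024, timeout_seconds=120.0)
  grads = tf.distribute.get_replica_context().all_reduce(
      'sum', grads, options=options)
  ```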

  """

  @deprecation.deprecated(
      None, "use distribute.experimental.CommunicationOptions instead")
  def __new__(cls, bytes_per_pack=0, timeout_seconds=None):
    return Options(
        bytes_per_pack=bytes_per_pack, timeout_seconds=timeout_seconds)

  def __init__(self, bytes_per_pack=0, timeout_seconds=None):
    """Creates a CollectiveHints.

    Args:
      bytes_per_pack: a non-negative integer. Breaks collective operations into
        packs of certain size. If it's zero, the value is determined
        automatically. This only applies to all-reduce with
        `MultiWorkerMirroredStrategy` currently.
      timeout_seconds: a float or None, timeout in seconds. If not None, the
        collective raises `tf.errors.DeadlineExceededError` if it takes longer
        than this timeout. This can be useful when debugging hanging issues.
        This should only be used for debugging since it creates a new thread for
        each collective, i.e. an overhead of `timeout_seconds *
        num_collectives_per_second` more threads.  This only works for
        `tf.distribute.experimental.MultiWorkerMirroredStrategy`.

    Raises:
      ValueError: When arguments have invalid value.
    """
    pass