
    BVh                        d Z ddlZddlmZ ddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	l
mZ dd
l
mZ ddl
mZ ddlmZ  e	j"                  dd      Z e	j"                  dd      ZdZ edg        G d d             Zy)z?Provides a utility class for preemption detection and recovery.    N)logging)detect_platform)PlatformDevice)context)
monitoring)AbortedError)CancelledError)InternalError)UnavailableError)	tf_exportzD/tensorflow/api/distribution_strategy/preemption_watcher_initializedz'Counter for usages of PreemptionWatcherz@/tensorflow/api/distribution_strategy/preemption_watcher_handledzICounter for number of preempions catched and handled by PreemptionWatcher TF_DEFAULT_PREEMPTION_NOTICE_KEYz)distribute.experimental.PreemptionWatcher)v1c                   2    e Zd ZdZd Zed        Zd Zd Zy)PreemptionWatchera  Watch preemption signal and store it.

  Notice: Currently only support Borg TPU environment with TPUClusterResolver.

  This class provides a way to monitor the preemption signal during training on
  TPU. It will start a background thread to watch the training process, trying
  to fetch preemption message from the coordination service. When preemption
  happens, the preempted worker will write the preemption message to the
  coordination service. Thus getting a non-empty preemption message means there
  is a preemption happened.

  User can use the preemption message as a reliable preemption indicator, and
  then set the coordinator to reconnect to the TPU worker instead of a fully
  restart triggered by Borg. For example, a training process with
  preemption recovery will be like:

  ```python
  keep_running = True
  preemption_watcher = None
  while keep_running:
    try:
      # Initialize TPU cluster and stratygy.
      resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
      tf.config.experimental_connect_to_cluster(resolver)
      tf.tpu.experimental.initialize_tpu_system(resolver)
      strategy = tf.distribute.TPUStrategy(resolver)

      # PreemptionWatcher must be created after connected to cluster.
      preemption_watcher = tf.distribute.experimental.PreemptionWatcher()
      train_model(strategy)
      keep_running = False
    except Exception as e:
      if preemption_watcher and preemption_watcher.preemption_message:
        preemption_watcher.block_until_worker_exit()
        keep_running = True
      else:
        raise e
  ```

  Attributes:
    preemption_message: A variable to store the preemption message fetched from
      the coordination service. If it is not None, then there is a preemption
      happened.
    platform: A PlatformDevice to indicate the current job's platform. Refer to
      failure_handling_util.py for the definition of enum class PlatformDevice.
  c                 P   d | _         t               | _        | j                  t        j                  k7  r!t        j                  d| j                         y t        j                         j                  d       t        j                  | j                  d      j                          y )Nz3Preemption watcher does not support environment: %s   T)targetdaemon)_preemption_messager   	_platformr   INTERNAL_TPUr   warning*_preemption_watcher_initialization_counterget_cellincrease_by	threadingThread_watch_preemption_keystartselfs    p/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/failure_handling/preemption_watcher.py__init__zPreemptionWatcher.__init__]   ss    #D$&DN~~444oo
? 199;GGJd88FLLN    c                     | j                   S )zReturns the preemption message.)r   r    s    r"   preemption_messagez$PreemptionWatcher.preemption_messagei   s     ###r$   c                     t        j                  d       t        j                         j                  t              }t
        j                         j                  d       t        j                  d       || _        y )NzWatching preemption signal.r   zPreemption signal received.)	r   infor   get_config_key_value_PREEMPTION_KEY_preemption_handling_counterr   r   r   )r!   messages     r"   r   z'PreemptionWatcher._watch_preemption_keyn   sR    LL./oo44_EG ))+77:LL./&Dr$   c                 P   | j                   t        j                  k7  ry	 t        j                         j	                  d       y# t
        $ r.}d|j                  vr t        j                  d       Y d}~yd}~wt        t        t        f$ r t        j                  d       Y yw xY w)a  Block coordinator until workers exit.

    In some rare cases, another error could be raised during the
    preemption grace period. This will cause the coordinator to reconnect to the
    same TPU workers, which will be killed later. It prevents the coordinator to
    reconnect to new TPU workers, and falls back to a hard restart. To avoid
    this situation, this method will block the coordinator to reconnect until
    workers exit. This method will be a no-op for non-TPU platform.
    NBLOCK_TILL_EXITz$Coordination service is not enabled.zWorkers exited.)r   r   r   r   r)   r
   r,   r   r(   r   r	   r   )r!   es     r"   block_until_worker_exitz)PreemptionWatcher.block_until_worker_exitu   s     ~~444&oo,,->? &	/qyy	@ll$%%.*:; &ll$%&s   #A 	B%$A66,B%$B%N)	__name__
__module____qualname____doc__r#   propertyr&   r   r0    r$   r"   r   r   ,   s,    -^
O $ $'&r$   r   )r4   r   abslr   Ctensorflow.python.distribute.failure_handling.failure_handling_utilr   r   tensorflow.python.eagerr   r   "tensorflow.python.framework.errorsr   r	   r
   r    tensorflow.python.util.tf_exportr   Counterr   r+   r*   r   r6   r$   r"   <module>r=      s    F   _ ^ + . ; = < ? 6 .@Z-?-?J-. *  2z11FO  
 5 62>\& \& ?\&r$   