
"""Module for `PreemptionCheckpointHandler`.

This is currently under development and the API is subject to change.

PreemptionCheckpointHandler reduces loss of training progress caused by
termination (preemption or maintenance) of workers in multi-worker synchronous
training, and avoids surfacing an error indistinguishable from application errors
to the job scheduler or users.
    N)gen_check_preemption_op)
checkpoint)checkpoint_context)checkpoint_management)distribute_lib)multi_worker_util)failure_handling_util)context)constant_op)dtypes)errors)file_io)	variables)gfile)
tf_logging)tf_contextlib)
deprecated)	tf_export)doc_controlsRUN_TO_CHECKPOINTLAST_RUN_TO_CHECKPOINTTERMINATED_WORKERRECEIVED_SIGNALcheckpointed_runsSTOP_WATCHER TF_DEFAULT_PREEMPTION_NOTICE_KEYc                 D   t         j                  j                  |       }t         j                  j                  |       }dt	        |      z   }t         j                  j                  ||      }t        j                  |       t         j                  j                  ||      S )z<Returns a directory for non-chief worker to save checkpoint.workertemp_)ospathdirnamebasenamestrjoinr   recursive_create_dir_v2)checkpoint_dirtask_iddirpathbasebase_dirpaths        n/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/failure_handling/failure_handling.py_non_chief_checkpoint_dirr,   @   sm    GGOON+'			.	)$W-,GGLL,/'	!!'*	gt	$$    z)distribute.experimental.TerminationConfig)v1c                        e Zd ZdZ	 	 	 	 ddZy)TerminationConfiga  Customization of `PreemptionCheckpointHandler` for various platforms.

  A `TerminationConfig` can be created and passed to a
  `tf.distribute.experimental.PreemptionCheckpointHandler` to provide
  customization based on the platform. It can deliver three pieces of
  information:

  * How to decide if there is a termination event soon

  The form of termination notification and how to fetch it vary across
  platforms. Thus `PreemptionCheckpointHandler` may take a user-defined
  function, `termination_watcher_fn`, and execute it repeatedly to check for
  termination notification. `termination_watcher_fn` should be a function
  that returns `True` if a termination notification is available and
  `False` otherwise. The function should be lightweight and non-blocking so that
  resources can be cleaned up properly if no termination signal is ever raised
  before training finishes.

  * How to exit the program

  A user can configure this through the `exit_fn`, which
  `PreemptionCheckpointHandler` executes after saving the checkpoint to exit the
  training program gracefully. For `tf.distribute.MultiWorkerMirroredStrategy`,
  a restart is necessary to reset the program's state. However, having a
  customized `exit_fn` may facilitate the restart and smooth the training
  experience. How so? Maybe the platform recognizes a `RESTART_CODE` as a
  program auto-restart signal, or maybe the user has a
  coordinating script that starts up the training, in which they can configure
  the program to auto-restart if it ever exits with this `RESTART_CODE`. In both
  cases, configuring the `exit_fn` to be `sys.exit(RESTART_CODE)` makes the
  training seamless.

  * How long does `PreemptionCheckpointHandler` have from receiving a
  termination event notice till the actual termination

  Some platforms have a gap time as long as one hour or so. In these cases,
  there is the option to utilize this gap time for training as much as possible
  before saving a checkpoint and exiting. This can be achieved by passing the
  `grace_period` argument a nonzero value. Note: for a user whose grace period
  is not several times longer than their checkpoint writing time (e.g., three
  times or more), we advise not configuring this argument, in which case
  `PreemptionCheckpointHandler` will directly save a checkpoint and exit.
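
  For example, on a platform that recognizes a particular exit code as a
  restart request and allows roughly an hour between the notice and the actual
  termination, a `TerminationConfig` might look like the following sketch. The
  `RESTART_CODE` value and the one-hour grace period are illustrative
  assumptions, not platform defaults:

  ```python
  RESTART_CODE = 42  # hypothetical platform-recognized restart code

  termination_config = tf.distribute.experimental.TerminationConfig(
      exit_fn=lambda: sys.exit(RESTART_CODE),
      grace_period=3600)
  ```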


  **The default behavior**:

  * For Google Borg Platform:
      * Automatically know how to detect preemption signal
      * Exit with a platform-recognized restart code
      * Save a checkpoint and exit immediately

  * For Google Cloud Platform:
      * Automatically know how to detect maintenance signal.
      * Exit with a code (the user may configure this)
      * Automatically utilize the extended training period before saving a
      checkpoint and exiting

  * For Other platform:
      * If `termination_watcher_fn` is `None`, we will treat `signal.SIGTERM` as
      a termination signal.
      * If `exit_fn` is not configured, we exit the program with an arbitrary
      code.
      * If `grace_period` is not configured, we will wrap up the current
      training step, save a checkpoint, and exit the program as soon as we
      receive the termination signal.
  """

  def __init__(self,
               termination_watcher_fn=None,
               exit_fn=None,
               grace_period=None,
               save_fn=None):
    """Creates a `TerminationConfig` object.

    Args:
      termination_watcher_fn: a function to execute repeatedly that returns
        `True` if a preemption signal is available and False otherwise. The
        function must not block waiting for a preemption signal, since that
        prevents proper cleanup of the program. A change is **NOT** recommended
        for users on Google Borg or Google Cloud Platform.
      exit_fn: a function to execute after a checkpoint is saved and before the
        preemption happens. Usually, it should be in the form of
        `lambda: sys.exit(RESTART_CODE)`, where `RESTART_CODE` varies by
        platform. A change is **NOT** recommended for users on Google Borg.
        Users on Google Cloud Platform may configure it to use a customized
        `RESTART_CODE`.
      grace_period: the length of time between receiving a preemption signal and
        the actual preemption. A change is **NOT** recommended for users on
        Google Borg, Google Cloud Platform, or users with a short grace period.
      save_fn: an optional function letting you configure how to save a
        checkpoint. This is useful if you'd like to pass extra arguments to
        `tf.train.CheckpointManager.save` or `tf.train.Checkpoint.save`. By
        default, if not configured, the API will save checkpoint without extra
        arguments.
    """
    self.termination_watcher_fn = termination_watcher_fn
    self.exit_fn = exit_fn
    self.grace_period = grace_period
    self.save_fn = save_fn


class GcpGpuTerminationConfig(TerminationConfig):
  """Configurations for GCP GPU VM."""

  def __init__(self,
               termination_watcher_fn=None,
               exit_fn=None,
               grace_period=None,
               save_fn=None):
    self.termination_watcher_fn = (
        termination_watcher_fn
        or failure_handling_util.termination_watcher_function_gce)
    self.exit_fn = exit_fn or failure_handling_util.gce_exit_fn
    self.grace_period = (
        grace_period if grace_period or grace_period == 0
        else failure_handling_util.GRACE_PERIOD_GCE)
    self.save_fn = save_fn


class GcpCpuTerminationConfig(TerminationConfig):
  """Configurations for GCP CPU VM."""

  def __init__(self,
               termination_watcher_fn=None,
               exit_fn=None,
               grace_period=None,
               save_fn=None):
    self.termination_watcher_fn = (
        termination_watcher_fn
        or failure_handling_util.termination_watcher_function_gce)
    self.exit_fn = exit_fn or failure_handling_util.gce_exit_fn
    self.grace_period = grace_period or 0
    self.save_fn = save_fn


class BorgTerminationConfig(TerminationConfig):
  """Configurations for Borg."""

  def __init__(self,
               termination_watcher_fn=None,
               exit_fn=None,
               grace_period=None,
               save_fn=None):
    self.termination_watcher_fn = termination_watcher_fn
    default_exit_fn = lambda: sys.exit(42)
    self.exit_fn = exit_fn or default_exit_fn
    self.grace_period = grace_period or 0
    self.save_fn = save_fn


class BorgTPUTerminationConfig(TerminationConfig):
  """Configurations for Borg."""

  def __init__(self,
               termination_watcher_fn=None,
               exit_fn=None,
               grace_period=None,
               save_fn=None):
    self.termination_watcher_fn = termination_watcher_fn
    self.exit_fn = exit_fn or failure_handling_util.default_tpu_exit_fn
    self.grace_period = grace_period or 0
    self.save_fn = save_fn


def _complete_config_for_environment(platform_device, termination_config):
  """Complete un-filled fields of TerminationConfig based on platform."""
  if not termination_config:
    termination_config = TerminationConfig()

  if platform_device is failure_handling_util.PlatformDevice.GCE_GPU:
    return GcpGpuTerminationConfig(termination_config.termination_watcher_fn,
                                   termination_config.exit_fn,
                                   termination_config.grace_period,
                                   termination_config.save_fn)
  elif platform_device is failure_handling_util.PlatformDevice.GCE_CPU:
    return GcpCpuTerminationConfig(termination_config.termination_watcher_fn,
                                   termination_config.exit_fn,
                                   termination_config.grace_period,
                                   termination_config.save_fn)
  elif platform_device is failure_handling_util.PlatformDevice.INTERNAL_TPU:
    return BorgTPUTerminationConfig(termination_config.termination_watcher_fn,
                                    termination_config.exit_fn,
                                    termination_config.grace_period,
                                    termination_config.save_fn)
  else:
    return BorgTerminationConfig(termination_config.termination_watcher_fn,
                                 termination_config.exit_fn,
                                 termination_config.grace_period,
                                 termination_config.save_fn)


@tf_export('distribute.experimental.PreemptionCheckpointHandler', v1=[])
class PreemptionCheckpointHandler(object):
  """Preemption and error handler for synchronous training.

  Note: This API only supports use with
  `tf.distribute.MultiWorkerMirroredStrategy` and `tf.distribute.TPUStrategy`.

  A `PreemptionCheckpointHandler` coordinates all workers to save a checkpoint
  upon receiving a preemption signal. It also helps disseminate application
  error messages accurately among the cluster. When a
  `PreemptionCheckpointHandler` object is created, it restores values from
  the latest checkpoint file if any exists.

  Right after the initialization, the object starts watching for a termination
  signal sent to any member in the cluster. If a signal is received, the next time the
  worker executes `PreemptionCheckpointHandler.run`, the
  `PreemptionCheckpointHandler` will align all workers to save a checkpoint.
  Then, if an `exit_fn` is configured via
  `tf.distribute.experimental.TerminationConfig`, it will be invoked. Otherwise,
  the process will simply exit and later the platform should restart it.

  Note: We advise users of `tf.distribute.MultiWorkerMirroredStrategy` who
  choose to configure their
  own `exit_fn` in `tf.distribute.experimental.TerminationConfig` to include a
  `sys.exit(CODE_OR_MESSAGE)` in the `exit_fn` so that after the restart, all
  workers can initialize communication services correctly. For users of
  `tf.distribute.TPUStrategy`, if they do not wish to do a cluster restart but
  would like an in-process restart (i.e., keep the coordinator alive and re-do
  the steps to connect to the cluster, initialize the TPU system, and make the
  `TPUStrategy` object), they could configure the `exit_fn` to a no-op.

  For users of `tf.distribute.MultiWorkerMirroredStrategy`, the core API is
  `PreemptionCheckpointHandler.run`:

  ```python
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  trained_epoch = tf.Variable(initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
  step_in_epoch = tf.Variable(initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='step_in_epoch')

  with strategy.scope():
    dataset, model, optimizer = ...

    checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                     model=model,
                                     trained_epoch=trained_epoch,
                                     step_in_epoch=step_in_epoch)

    preemption_checkpoint_handler = tf.distribute.experimental.PreemptionCheckpointHandler(cluster_resolver, checkpoint, checkpoint_dir)

  while trained_epoch.numpy() < NUM_EPOCH:

    while step_in_epoch.numpy() < STEPS_PER_EPOCH:

      # distributed_train_function contains a call to strategy.run.
      loss += preemption_checkpoint_handler.run(distributed_train_function, args=(next(iterator),))
      # For users of MultiWorkerMirroredStrategy, usually
      # STEPS_PER_TRAIN_FUNCTION = 1.
      step_in_epoch.assign_add(STEPS_PER_TRAIN_FUNCTION)
      ...

    epoch.assign_add(1)
    step_in_epoch.assign(0)
  ```

  For users of `tf.distribute.TPUStrategy`, the core APIs are
  `PreemptionCheckpointHandler.run` and
  `PreemptionCheckpointHandler.watch_preemption_scope`:

  ```python

  strategy = tf.distribute.TPUStrategy(tpu_cluster_resolver)

  # Rest of TPU init omitted, see documentation for TPUStrategy.

  with preemption_checkpoint_handler.watch_preemption_scope():
    while trained_epoch.numpy() < NUM_EPOCH:

      while step_in_epoch.numpy() < STEPS_PER_EPOCH:

        # distributed_train_function contains a call to strategy.run.
        loss += preemption_checkpoint_handler.run(distributed_train_function, args=(next(iterator),))

        # For users of TPUStrategy, usually STEPS_PER_TRAIN_FUNCTION >> 1 since
        # clustering multiple steps within a tf.function amortizes the overhead
        # of launching a multi-device function on TPU Pod.
        step_in_epoch.assign_add(STEPS_PER_TRAIN_FUNCTION)
        ...

      epoch.assign_add(1)
      step_in_epoch.assign(0)
  ```

  Not all interruptions come with advance notice that allows the
  `PreemptionCheckpointHandler` to handle them, e.g., those caused by hardware
  failure. For a user who saves checkpoints for these cases themselves outside
  the `PreemptionCheckpointHandler`, if they are using a
  `tf.train.CheckpointManager`, pass it as the
  `checkpoint_or_checkpoint_manager` argument to the
  `PreemptionCheckpointHandler`. If they do not have a
  `tf.train.CheckpointManager` but are directly working with
  `tf.train.Checkpoint`, we advise saving the checkpoints in the directory
  that's passed as the `checkpoint_dir` argument. In this way, at the program
  beginning, `PreemptionCheckpointHandler` can restore the latest checkpoint
  from the directory, no matter whether it was saved by the user or by
  the `PreemptionCheckpointHandler` before preemption happens.

  **A note on the platform:**

  `PreemptionCheckpointHandler` can only handle the kind of termination with
  advance notice. For now, the API recognizes the termination signal for CPU,
  GPU, and TPU on Google Borg and CPU and GPU on the Google Cloud Platform. In
  these cases, `PreemptionCheckpointHandler` will automatically adopt the
  correct preemption/maintenance notification detection mechanism. Users of
  other platforms can configure a detection monitoring behavior through the
  `tf.distribute.experimental.TerminationConfig`. Customization for the exit
  behavior and grace period length could also be done here.
  Nc                 f   t        |t        j                        r|st        j                  d      || _        || _        || _        || _        t        j                         | _        t        | j                  | j                        }|j                  | _        |j                  | _        |j"                  | _        |j&                  | _        d| _        | j                  t        j,                  j.                  t        j,                  j0                  fv rt3        j4                  d       nu| j                  t        j,                  j6                  k(  r| j9                          n=|r+d|j;                         j=                         v rt?        d      | jA                          t3        jB                  d       y)a:  Creates the `PreemptionCheckpointHandler`.

    Args:
      cluster_resolver: a `tf.distribute.cluster_resolver.ClusterResolver`
        object. You may also obtain it through the `cluster_resolver` attribute
        of the distribution strategy in use.
      checkpoint_or_checkpoint_manager: a `tf.train.CheckpointManager` or a
        `tf.train.Checkpoint`. If you are using a `tf.train.CheckpointManager`
        to manage checkpoints outside the `PreemptionCheckpointHandler` for
        backup purpose as well, pass it as `checkpoint_or_checkpoint_manager`
        argument. Otherwise, pass a `tf.train.Checkpoint` and the
        `PreemptionCheckpointHandler` will create
        a `tf.train.CheckpointManager` to manage it in the `checkpoint_dir`.
      checkpoint_dir: a directory where the `PreemptionCheckpointHandler` saves
        and restores checkpoints. When a `PreemptionCheckpointHandler` is
        created, the latest checkpoint in the `checkpoint_dir` will be restored.
        (This is not needed if a `tf.train.CheckpointManager` instead of a
        `tf.train.Checkpoint` is passed as the
        `checkpoint_or_checkpoint_manager` argument.)
      termination_config: optional, a
        `tf.distribute.experimental.TerminationConfig` object to configure for a
        platform other than Google Borg or GCP.
    zEWhen a checkpoint is passed, a checkpoint_dir must be passed as well.TzQPreemptionCheckpointHandler does not support usage with TPU or CPU device on GCP.pszjPreemptionCheckpointHandler does not supportusage with tf.distribute.experimental.ParameterServerStrategy.z4PreemptionCheckpointHandler initialized or restored.N)"
isinstancecheckpoint_lib
Checkpointr   InvalidArgumentError_cluster_resolver_termination_config!_checkpoint_or_checkpoint_manager_checkpoint_dirr	   detect_platform_platform_devicer`   r3   _termination_watcher_fnr4   _exit_fnr5   _grace_periodr6   _save_fn_local_moderZ   GCE_TPUr\   loggingwarningr]   _initialize_for_tpu_strategycluster_specas_dictNotImplementedError2_initialize_for_mirrored_and_multi_worker_mirroredinfo)r8   cluster_resolver checkpoint_or_checkpoint_managerr&   r_   completed_termination_configs         r+   r9   z$PreemptionCheckpointHandler.__init__  s   < 2 ++-5C'' ), - - .D1D-MD*)D1AACD#Ct77$  	%;; 	  188DM5BBD088DMD,,44,,44!  oo& 	 //<<	= '') 
d&6&C&C&E&M&M&OO!M
 	

 ==?LLGHr-   c                     d| _         d| _        d| _        | j                          | j                  j                          d| _        y)z<Makes configurations for using the handler with TPUStrategy.TNr   )	_is_chief_poll_termination_signal_thread(_cluster_wise_termination_watcher_thread _maybe_create_checkpoint_manager_read_checkpoint_managerrestore_or_initialize_run_counterr8   s    r+   rw   z8PreemptionCheckpointHandler._initialize_for_tpu_strategy  s?    DN+/D(48D1))+!!779Dr-   c                    | j                   r$| j                   j                         j                  sd| _        d| _        d| _        nd| _        t        t        j                  | j                   j                         | j                   j                  | j                   j                              | _        t        j                  | j                   j                         | j                   j                  | j                   j                        | _        t        j                  t        j                  dt         j"                        dt$              | _        | j)                          t+        | j,                  j.                  t$              s/t1        | j,                  j.                  t$        | j&                         t+        | j2                  j.                  t$              s/t1        | j2                  j.                  t$        | j&                         | j2                  j5                          d| _        d| _        | j&                  j;                         | _        t?        j@                         | _!        t?        j@                         | _"        tF        jH                  jK                  | jL                  jN                  d      jQ                  d	       | j                  sdt?        jR                  | jT                  d
| j                  z  d      | _+        tY        jZ                  d       | jV                  j]                          nd| _+        d| _/        | j`                  r| jc                          y| je                          y)zCMakes configurations and start watchers for MS, MWMS, or OneDevice.Tsingle_workerF)rx   	task_typer'   r   )dtype)initial_value	trainablenamerb      zPeerTerminationWatcher-%stargetr   daemonz Start watcher for peer's signal.N)3ri   rx   jobsrs   _id_in_clusterr   r#   r   id_in_clusterr   r'   is_chiefr   Variabler   constantr   int64_ITERATION_VARIABLE_checkpointed_runsr   hasattr_write_checkpoint_manager_checkpointsetattrr   r   _final_checkpoint_countdown_estimated_run_timenumpyr   	threadingEvent_received_own_sigterm_received_checkpoint_stepr   'distribution_strategy_input_api_counterget_cellrn   r   increase_byThread_watch_step_to_save_keyr   ru   r|   startr   ro   %_start_polling_for_termination_signal_start_watching_for_signalr   s    r+   r{   zNPreemptionCheckpointHandler._initialize_for_mirrored_and_multi_worker_mirrored%  s    ""%%22499 d+ddnd

)
)$$113$$..$$,,./d
 )11--::<**44((002dn (00!**1FLLA "D
 	))+411==&(d,,88:M%%' 400<<&(d++779L%%' 	!!779 (-D$ D //557D "+!2D &/__%6D"::CC""%''2{1~ 7@6F6F--*T-@-@@7d3 ll67
3399; 7;d3+/D(##
002
%%'r-   c                    t        | j                  t        j                        r*| j                  | _        | j                  | _        d| _        yd| _        t        j                  | j                  | j                  d      | _        | j                  r| j                  | _        yt        j                  | j                  t        | j                  | j                  j                        d      | _        y)zCCreate CheckpointManager(s) if a checkpoint is passed else take it.FTr   )	directorymax_to_keep)r   N)re   rk   r   CheckpointManagerr   r   _api_made_checkpoint_managerrl   r   r,   ri   r'   r   s    r+   r   z<PreemptionCheckpointHandler._maybe_create_checkpoint_manager  s    $88'99;&*&L&Ld#'+'M'Md$*/d'*.d' '<&M&M

0
0(('d#
 
)-)F)F& "3366)$*>*>*.*@*@*H*HJ	 	&r-   c                     t        j                  d       t        j                  t        j                  | j                         y )NzStart watcher for local signal.)ru   r|   signalSIGTERM_sigterm_handler_fnr   s    r+   r   z6PreemptionCheckpointHandler._start_watching_for_signal  s'    LL23
MM&..$":":;r-   c                     t        j                         | _        t        j                  | j                  d| j
                  z  d      | _        t        j                  d       | j                  j                          y )Nz!WorkerTerminationSignalWatcher-%sTr   z%Start polling for termination signal.)
r   r   +_poll_termination_signal_thread_should_stopr   _poll_termination_signalr   r   ru   r|   r   r   s    r+   r   zAPreemptionCheckpointHandler._start_polling_for_termination_signal  s_    7@7HD4+4+;+;,,043F3FF,D( LL89((..0r-   c                     	 | j                   j                         s| j                  ry| j                         rnt	        j
                  d       N| j                          y)z:Poll maintenance notice and notify peers if receiving one.Nr   )r   is_setr   ro   timesleep_maybe_set_received_own_sigtermr   s    r+   r   z4PreemptionCheckpointHandler._poll_termination_signal  sR    
		9	9	@	@ 
++		%	%	'
jjm  	((*r-   c                 B   | j                   rTt        j                  d| j                         t	        j                         | _        | j                  j                          y	 t        j                         j                  t        | j                         t        j                  d| j                         t	        j                         | _        | j                  j                          y# t        j                  $ r# t        j                  d| j                         Y yw xY w)z<Claim earliest preemption if no one else has done it before.z*Member %s has received termination notice.NzMember %s has received termination notice. But some other worker has received it as well! Leaving it to them to decide when to checkpoint. )rs   ru   r|   r   r   _received_own_sigterm_timer   setr
   set_config_key_value_PREEMPTION_WORKER_KEYr   AlreadyExistsErrorr   s    r+   r   z;PreemptionCheckpointHandler._maybe_set_received_own_sigterm  s    ll?&&((,		d%
  $$&oo,,-C-1-@-@Bll?&&((,		d%
  $$& $$ 	ll; 

 	s   "BC( (3DDc                     t        | dd       rQ| j                  j                          | j                  j	                          d | _        t        j                  d       y y )Nr   z2Shut down watcher for one's own termination signal)getattrr   r   r   r$   ru   r|   r   s    r+   $_stop_poll_termination_signal_threadz@PreemptionCheckpointHandler._stop_poll_termination_signal_thread  sM    t6=
66::<
**//1-1d*llGH >r-   c                 "   t        | dd      r	 t        j                         j                  t        t               	 t        j                         j                  t        t               | j                  j                          d| _        t        j                  d       yy# t
        j                  t
        j                  f$ r Y t        $ r+}t        j                  dt        |      z          Y d}~d}~ww xY w# t
        j                  t
        j                  f$ r Y t        $ r+}t        j                  dt        |      z          Y d}~d}~ww xY w# | j                  j                          d| _        t        j                  d       w xY w)z0Stop the thread that is _watch_step_to_save_key.r   NzQIgnoring error when shutting down _stop_cluster_wise_termination_watcher_thread: z0Shut down watcher for peer's termination signal.)r   r
   r   _INITIAL_RUN_COUNT_KEY_STOP_WATCHING_CLUSTER_VALUEr   r   UnavailableError	Exceptionru   r|   r#   _FINAL_RUN_COUNT_KEYr   r$   )r8   es     r+   -_stop_cluster_wise_termination_watcher_threadzIPreemptionCheckpointHandler._stop_cluster_wise_termination_watcher_thread  sZ   t?FQ.."$@	
J../C/K	M 	55::<8<5HI= G
 '')@)@A 
 	 Q 	 GILQP 	Q 	QQ '')@)@A  Q GILQP 	Q 	QQ
 	55::<8<5HIsR   ,B  ,C;  #C8C8!C33C8;#EE  E(!E	E EE 8Fc                 D    | j                          | j                          y )N)r   r   r   s    r+   __del__z#PreemptionCheckpointHandler.__del__  s    668--/r-   z<Track steps using a tf.Variable saved in checkpoint instead.c                 ~    | j                   t        j                  j                  k(  rt	        d      | j
                  S )a  Returns the number of times `PreemptionCheckpointHandler.run` is called.

    DEPRECATED: user should track total steps themselves, as this API provides
    little expressivity gain but could easily be misused and incurs extra
    synchronization cost for TPUStrategy users.

    This value tracks the number of all calls to
    `PreemptionCheckpointHandler.run` including those before the program is
    restarted and the training is restored, by saving and reading the value in
    the checkpoint. A user can compute their total number of iterations
    by `PreemptionCheckpointHandler.total_run_calls *
    number_of_steps_in_train_function`,
    while `number_of_steps_in_train_function` should be one for
    `tf.distribute.MultiWorkerMirroredStrategy` users. They can also use this
    value to infer the starting epoch and step after training restores, as shown
    in the example above.
    zNPlease create variables saved in checkpoint to keep track of steps and epochs.)rn   r	   rZ   r]   rz   r   r   s    r+   total_run_callsz+PreemptionCheckpointHandler.total_run_calls	  sB    , 	,,99	: !E F Fr-   c                 8   | j                   t        j                  j                  k(  r | j                  |g|i |S | j                   t        j                  j
                  t        j                  j                  fv r ||i |S  | j                  |g|i |S )a>
  Runs a training function with error and preemption handling.

    This function handles the preemption signal from any peer in the cluster by
    saving the training progress and exiting gracefully. It will
    also broadcast any program error encountered during the execution of
    `distributed_train_function` to all workers so that they can raise the same
    error.

    The `distributed_train_function` argument should be a distributed train
    function (i.e., containing a call to `tf.distribute.Strategy.run`). For
    `tf.distribute.MultiWorkerMirroredStrategy` users, we recommend passing in a
    single-step `distributed_train_function` to
    `PreemptionCheckpointHandler.run` so that the checkpoint can be saved in
    time in case a preemption signal or maintenance notice is sent.

    Besides the preemption and error handling part,
    `PreemptionCheckpointHandler.run(distributed_train_function, *args,
    **kwargs)` has the same effect and output as
    `distributed_train_function(*args, **kwargs)`. `distributed_train_function`
    can return either some or no result. The following is a shortened example:

    ```python

    @tf.function
    def distributed_train_step(iterator):
      # A distributed single-step training function.

      def step_fn(inputs):
        # A per-replica single-step training function.
        x, y = inputs
        ...
        return loss

      per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
      return strategy.reduce(
          tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

    for epoch in range(preemption_handler.total_run_calls // STEPS_PER_EPOCH,
                       EPOCHS_TO_RUN):
      iterator = iter(multi_worker_dataset)
      total_loss = 0.0
      num_batches = 0

      for step in range(preemption_handler.total_run_calls % STEPS_PER_EPOCH,
                        STEPS_PER_EPOCH):
        total_loss += preemption_handler.run(distributed_train_step)
        num_batches += 1

      train_loss = total_loss / num_batches
      print('Epoch: %d, train_loss: %f.' %(epoch.numpy(), train_loss))

      train_accuracy.reset_states()
    ```

    Args:
      distributed_train_function: A (single-step) distributed training function.
      *args: args for `distributed_train_function`.
      **kwargs: kwargs for `distributed_train_function`.

    Raises:
      Program error encountered by any member in the cluster while executing the
      `distributed_train_function`, or any error from the program error
      propagation process.

    Returns:
      Result of running the `distributed_train_function`.
    )rn   r	   rZ   r]   _run_for_tpurt   r\   _run_for_multi_worker_mirroredr8   distributed_train_functionargskwargss       r+   runzPreemptionCheckpointHandler.run%  s    \ 	 //<<	= T9KDKFKK			,,44,,44# 
 (8880T00
$'+/5 r-   c                 F    t        j                  t                ||i |S )z?PreemptionCheckpointHandler.run implementation for TPUStrategy.preemption_key)r   check_preemptionPREEMPTION_KEYr   s       r+   r   z(PreemptionCheckpointHandler._run_for_tpu  s     ,,NK%t6v66r-   c                 `   	 | j                          t        j                         } ||i |}t        j                         |z
  }| xj                  dz  c_        | j                  || j                  z
  | j                  z  z   | _        |S # t        j
                  $ r}| j                  s~t        j                  d||       	 t        j                         j                  |j                  |j                          # t        $ r!}t        j                  d||       Y d}~ d}~ww xY w d}~ww xY w)z8PreemptionCheckpointHandler.run implementation for MWMS.r   $Propagating error to cluster: %r: %s.Ignoring error during error propagation: %r:%sN)&_check_preemption_and_maybe_checkpointr   r   r   r   OpErrorrs   ru   r|   r
   report_error_to_cluster
error_codemessager   )	r8   r   r   r   run_begin_timeresultnew_run_timer   exs	            r+   r   z:PreemptionCheckpointHandler._run_for_multi_worker_mirrored  s   
113yy{n)4:6:fYY[>1l
1!%!9!9
11
1T5F5F=G "Gd M >> ;QB	Q
//

3
3ALL!))
L   	Q
,,GR
P
P	QsB   BB
 
D-#D(8C:9D(:	D$DD(D$$D((D-c                    | j                   t        j                  j                  k(  r:	 t	        j
                         5  t        j                  t               ddd       yy| j                   t        j                  j$                  t        j                  j&                  fv ry | j(                  |i | | xj*                  dz  c_        d| _        y# 1 sw Y   zxY w# t        j                  $ rr}|j                  j                  d      rLt        j                  d       t	        j                           | j                   |i | | j#                          n Y d}~yd}~ww xY w)a$  Saves a checkpoint if a preemption signal has been made available.

    This is an alternative API for `PreemptionCheckpointHandler.run` and
    `PreemptionCheckpointHandler.watch_preemption_scope`. This method works for
    both `tf.distribute.MultiWorkerMirroredStrategy` and
    `tf.distribute.TPUStrategy`. However, **for TPUStrategy, this method will
    add a synchronization point between workers and the coordinator** and thus
    may have performance implication. If this is a concern, use the combination
    of `PreemptionCheckpointHandler.watch_preemption_scope` and
    `PreemptionCheckpointHandler.run` instead.

    ```python
    strategy = tf.distribute.TPUStrategy(tpu_cluster_resolver)
    # initialization omitted

    with strategy.scope():
      # Save in the checkpoint.
      trained_step = tf.Variable(initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='trained_step', aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA)

      checkpoint_manager = tf.train.CheckpointManager(checkpoint, directory, max_to_keep=1)
      preemption_handler = tf.distribute.experimental.PreemptionCheckpointHandler(cluster_resolver, checkpoint_manager)

    while trained_step.numpy() < NUM_STEPS:
      # Train STEPS_IN_FUNCTION steps at once.
      train_multi_step_function()
      trained_step.assign_add(STEPS_IN_FUNCTION)
      preemption_handler.save_checkpoint_if_preempted()
    ```

    Args:
      *args: args for `tf.train.CheckpointManager.save()` to save checkpoint.
      **kwargs: kwargs for `tf.train.CheckpointManager.save()` to save.
    r   NC   type.googleapis.com/tensorflow.distributed_runtime.WorkerPreemption/Clearing preemption error to save checkpoint...r   r   )rn   r	   rZ   r]   r
   async_scoper   r   r   r   AbortedErrorexperimental_payloadsgetru   r|   async_clear_error_save_checkpointrp   rt   r\   r   r   r   )r8   r   r   abort_errors       r+   save_checkpoint_if_preemptedz8PreemptionCheckpointHandler.save_checkpoint_if_preempted  s9   F 	,,99	:  " 	-
!
2
2+-	- 	-& 
		,,44,,44# 
  2d114B6B
1!"d9	- 	-    ,,00R
 ,,H
I

#
#
%
$


0
0 --/  s/   C CC CC E#1A(EE#c              #     K   | j                   t        j                  j                  k(  r$	 t	        j
                         5  d ddd       yy	 d y# 1 sw Y   xY w# t        j                  $ rp}|j                  j                  d      rJt        j                  d       t	        j                          | j                          | j                          n Y d}~yd}~ww xY w# t        j                  $ r}| j                   s~t        j                  d||       	 t	        j                         j#                  |j$                  |j&                          # t(        $ r!}t        j                  d||       Y d}~ d}~ww xY w d}~ww xY ww)a  Syncs error and maybe save checkpoint for usage with TPUStrategy.

    Note: Usage with `tf.distribute.MultiWorkerMirroredStrategy` does not need
    this API.

    Example usage:

    ```python
    with preemption_checkpoint_handler.watch_preemption_scope():
      while trained_step.numpy() < NUM_STEPS:

        # distributed_train_function contains a call to strategy.run.
        loss += preemption_checkpoint_handler.run(distributed_train_function, args=(next(iterator),))
        trained_step.assign_add(STEPS_PER_TRAIN_FUNCTION)
    ```

    In this workflow, `PreemptionCheckpointHandler.run` will flag preemption
    signal received, and `watch_preemption_scope` will handle the preemption
    signal by saving a checkpoint and then either exit to restart or execute a
    user-passed `exit_fn` in `tf.distribute.experimental.TerminationConfig`. If
    no preemption signal is received during execution of ops and function inside
    the scope, `watch_preemption_scope` ensures the completion of all async op
    and function execution when exiting and will raises exceptions if async
    execution results in an error state.

    Yields:
      None
    Nr   r   r   r   )rn   r	   rZ   r]   r
   r   r   r   r   r   ru   r|   r   r   rp   r   rs   r   r   r   r   )r8   r   r   r   s       r+   watch_preemption_scopez2PreemptionCheckpointHandler.watch_preemption_scope  sE    <  5 D D Q QQ  " 	
	 	 	#	 	   ,,00R
 ,,H
I

#
#
%



!
--/    ^^ 
,,=q!
DSOO55allAIIN 	  SLLI2rRRSs   (FA  AA  FC& FAA   C#3A&CFC##F&F	9#F8EF	F E;6F;F  FF		Fc                    t         j                  j                  | j                  j                  d      j                  d       t        j                  d       | j                  t        j                  j                  k7  r%| j                  j                  | j                         t        j                         }t!        j"                         5  | j$                  r | j$                  |i | n | j&                  j(                  |i | ddd       t        j                         }t        j                  d| j&                  j*                         ||z
  | _        y# 1 sw Y   RxY w)z&Saves the checkpoint and exit program.z-PreemptionCheckpointHandler Saving Checkpointr   z:PreemptionCheckpointHandler: Starting saving a checkpoint.NzCheckpoint finished at path %s)r   r   r   rn   r   r   ru   r|   r	   rZ   r]   r   assignr   r   	monotonicr   preemption_save_contextrr   r   saver   _checkpoint_time)r8   r   r   
start_timeend_times        r+   r   z,PreemptionCheckpointHandler._save_checkpoint"  s   ::CC""799DQLLMN 5 D D Q QQ
$$T%9%9:!J		3	3	5 =	t&v&+&&++T<V<	= ~~HLL1//99;$z1D= =s   <EE#c                    | j                   t        j                  j                  k(  rt	        j
                  t               y| j                  rt        }nt        }| j                  j                         r | j                  t        | j                        k(  r | j                  |i | | j!                         r| j#                          | j%                          | j&                  rR| j(                  sFt+        j,                  t.        j0                  j3                  | j4                  j6                               t9        j:                  d       | j=                          yt9        j:                  d       d| _        | j                  j?                          yy| j@                  j                         r| j                  r8| jB                  tE        jD                         k  rt9        j:                  d       nyt        | j                  dz         }t9        j:                  d       | jF                  r"|| _        | j                  jI                          ntK        jJ                         jM                  ||       t9        j:                  d	||       | jF                  stO        jP                  | jR                  jU                         | jR                  jV                        }tY        |      D ]G  }tK        jJ                         j[                  t\         d
| d
|        t9        j:                  d|       I | j_                          yy)aE  Checkpoint if any worker has received a preemption signal.

    This function handles preemption signal reported by any worker in the
    cluster. The current implementation relies on the fact that all workers in a
    MultiWorkerMirroredStrategy training cluster have a step number difference
    maximum of 1.
    - If the signal comes from the worker itself (i.e., where this failure
    handler sits), the worker will notify all peers to checkpoint after they
    finish CURRENT_STEP+1 steps, where CURRENT_STEP is the step this worker has
    just finished. And the worker will wait for all peers to acknowledge that
    they have received its preemption signal and the final-step number before
    the worker proceeds on training the final step.
    - If the signal comes from another member in the cluster but NO final-step
    info is available, proceed on training, because it will be available after
    finishing the next step.
    - If the signal comes from some other member in the cluster, and final-step
    info is available, if the worker has not finished these steps yet, keep
    training; otherwise, checkpoint and exit with a cluster-recognized restart
    code.

    Args:
      *args: args for `tf.train.CheckpointManager.save()` to save checkpoint.
      **kwargs: kwargs for `tf.train.CheckpointManager.save()` to save.
    r   Nz7PreemptionCheckpointHandler: checkpoint saved. Exiting.z'Continue training for the grace period.Tz;Grace period almost ended. Final call to save a checkpoint!r   z5Termination caught in main thread on preempted workerz%s set to %s_z0Sigterm acknowledgement from replica %d received)0rn   r	   rZ   r]   r   r   r   r   r   r   r   r   _step_to_checkpointr#   r   r   _time_to_exitr   r   r   r   r   DeleteRecursivelyr   r    r!   r   r   ru   r|   rp   clearr   _target_time_for_terminationr   rs   r   r
   r   r   worker_countri   rx   r   rangeget_config_key_value_ACKNOWLEDGE_KEYB_setup_countdown_if_has_grace_period_and_not_already_counting_down)r8   r   r   run_count_config_keystep_to_save_atr  is          r+   r   zBPreemptionCheckpointHandler._check_preemption_and_maybe_checkpoint:  s   2  5 D D Q QQ..nM''1 4%%,,.		!	!S):):%;	;t.v.

3
3
5

<
<
>..t~~## > > H HIK
,,GI --/ ,,@
A-1$
*

(
(
.
.
0# 
<& 
	#	#	*	*	, 
	)	),,tyy{:
,,KM D--12ollJK			#2 &&**, 	../C/>	@^%9?K*77$$113$$..0, & PaOO22#$A&:%;1QC@BLLKQOP
 MMOS 
-r-   c                 <    | j                   dk  xs | j                  S )zEReturn whether to exit: exit if no grace period or grace period ends.r   )rq   r   r   s    r+   r  z)PreemptionCheckpointHandler._time_to_exit  s     * !#H(H(HHr-   c                     | j                   dkD  rA| j                  s4d}| j                  | j                   z   || j                  z  dz  z
  | _        yyy)zDSet up at the beginning of a countdown period for long grace period.r         N)rq   r   r   r   r
  )r8   buffer_factors     r+   r  z^PreemptionCheckpointHandler._setup_countdown_if_has_grace_period_and_not_already_counting_down  s^    Ad&F&F m
 
)
)D,>,>
>
$22
2Q
67 ' 'Gr-   c                 (    ~~| j                          y)z?Upload the to-be-preempted worker's id to coordination service.N)r   )r8   signumframes      r+   r   z/PreemptionCheckpointHandler._sigterm_handler_fn  s    ((*r-   c                    t        j                          j                  t              }|t        k7  r0|| _        | j
                  j                          t         dt         d| j                   }t        j                          j                  |d       t        j                  d|       | j                  dkD  rt        j                          j                  t              }|t        k7  rxt         dt         d| j                   }t        j                          j                  |d       t        j                  d|       | j
                  j                          || _        yyyy)zWatch out for step-to-save config key and acknowledge.

    All workers, including the one to be preempted, execute this function to get
    step-to-save.
    r  1zFPreemptionCheckpointHandler: %s set, preemption awareness acknowledgedr   zOPreemptionCheckpointHandler: %s acknowledged, final checkpoint timing received.N)r
   r  r   r   r  r   r   r  r   r   ru   r|   rq   r   )r8   
step_valueack_keyfinal_step_values       r+   r   z3PreemptionCheckpointHandler._watch_step_to_save_key  s8    "778NOJ
 11 ",d
$$((*#$A&<%=Qt?R?R>STgoo,,Wc:ll./68 
		a	"??,AA ";;'(*>)?qATAT@UV'
//

0
0#
>
,,,-46 
(
(
,
,
.%5$
" <	 
 ' 2r-   )NN)!r<   r=   r>   r?   r9   rw   r{   r   r   r   r   r   r   r   r   propertyr   r   do_not_generate_docsr   r   r   r   r   r   contextmanagerr   r   r   r  r  r   r   r@   r-   r+   rb   rb   P  s    sr #"&	QIf`(D 4<1
+BI JD0 dLN$$ %N 0Zx7
8C#J 8  8t20aPFI.8+
*6r-   rb   )9r?   r   r   rQ   r   r   .tensorflow.core.distributed_runtime.preemptionr   tensorflow.python.checkpointr   rf   r   r   tensorflow.python.distributer   r   -tensorflow.python.distribute.failure_handlingr	   tensorflow.python.eagerr