
    BVhWf                        d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m
Z
 ddl	mZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ da ej:                         ZdZdZ d Z!d Z" G d d      Z#y)z>Utilities for saving/loading Trackable objects asynchronously.    N)logging)checkpoint_context)trackable_view)device_util)context)def_function)executor)ops)	variables)metrics)base)object_identityasync_checkpoint!_create_copy_for_async_checkpointc                 0    || k  ryt        || z
  dz        S )a  Calculate the duration between start and end time.

  Args:
    start_time_seconds: The start time in seconds.
    end_time_seconds: The end time in seconds.

  Returns:
    The duration between the start and the end time. Return 0 if
    end_time_seconds < start_time_seconds.
  r   i@B )round)start_time_secondsend_time_secondss     d/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/checkpoint/async_checkpoint_helper.py_get_duration_microsecondsr   3   s'     **	 #55@	AA    c                 :   t        j                  |       j                         }d}|t        |      k  rE|rC||   |v r&|j	                  ||          |j                  |       n|dz  }|t        |      k  r|rCd }|D cg c]  } ||      r| }}||fS c c}w )a  Return the list of checkpointable trackables dependent on `root`.

  Args:
    root: The root trackable from where we get all its dependent trackables.
    exclude_set: An ObjectIdentitySet of Trackables to exclude before returning.
        Each element in `exclude_set` is a specific instance of a `Trackable`
        and appears precisely once in `TrackableView(root).descendants()`.

  Returns:
    saveable_trackables: All trackables that are saveable in `all_trackables`
        (see definition of "saveable" in `_trackable_needs_to_be_saved()`). A
        subset of `all_trackables`.
    all_trackables: All trackables returned by `TrackableView`'s `descendants()`
        after excluding `exclude_set`. A superset of `saveable_trackables`.
  )rootr      c                 ,   t        | d      r+d| j                  v sd| j                  v sd| j                  v ryt        |       j                         D ]A  }|t        j
                  u rd|j                  v sd|j                  v sd|j                  v sA y y)zReturns whether a trackable needs to be saved.

    Returns a bool to indicate whether obj's class has `_serialize_to_tensors`,
    `gather_saveables_for_checkpoint`, or `_copy_trackable_to_cpu` defined.

    Args:
      obj: A Trackable object.
    __dict___serialize_to_tensors _gather_saveables_for_checkpoint_copy_trackable_to_cpuTF)hasattrr   typemror   	Trackable)objts     r   _trackable_needs_to_be_savedz9_get_all_trackables.<locals>._trackable_needs_to_be_savedl   s     sJ
!S\\
1/3<<?%5 #Y]]_ 	
dnn	#qzz11QZZ?'1::5 r   )r   TrackableViewdescendantslendiscardpop)r   exclude_setall_trackablestrackable_indexr&   xsaveable_trackabless          r   _get_all_trackablesr1   D   s      "//T:FFH.
 /#n--+o&+5.9:) o 	#n--+$: %3 :q5a8  : : 
n	,,:s   Bc                       e Zd ZdZddZej                  d        Zd Zd Z	d Z
d Zd	 Zd
 Zed        ZddZddZddZddZddZd Zy)AsyncCheckpointHelperz"Helper class for async checkpoint.Nc                    |r8t        |t        j                        r |       n|}||d<   |j                          |t	        d      || _        || _        d| _        | j                          d| _	        d| _
        d| _        d| _        d| _        d| _        t        j                          xs d| _        t        j$                  | j"                        | _        d| _        d| _        d| _        t-        j.                  d      | _        t3        j4                  | j6                         d| _        t:        5  t<        t?        j>                         addd       y# 1 sw Y   yxY w)a  Initialize AsyncCheckpoint.

    Args:
      checkpointer_impl: The Checkpoint class to power the AsyncCheckpoint.
      root: The root object to checkpoint. `root` may be a trackable object or
        `WeakRef` of a trackable object.
      **kwargs: The keyword arguments representing the checkpointed variables.

    Raises:
      AttributeError: when checkpointer_impl is None.
    r   Nz;checkpointer_impl cannot be None for AsyncCheckpointHelper.FzCPU:0r   )maxsize) 
isinstanceweakrefref_maybe_initialize_trackableAttributeError_checkpointer_impl_checkpoint_items_checkpointcheckpointer_checkpoint_options_initialized_original_nodes_object_map_tpu_embedding_objects_saveable_trackablesr   current_default_devicecanonicalize_save_file_prefix_use_checkpoint_save_async_save_threadqueueQueue_queueatexitregister_join_async_save_thread_async_error"_END_TIME_OF_LAST_ASYNC_WRITE_LOCK_END_TIME_OF_LAST_ASYNC_WRITEtime)selfcheckpointer_implr   kwargstrackable_roots        r   __init__zAsyncCheckpointHelper.__init__   sI    !+D'++!>tvDn%fVn002  
G  0D#DD#D D  D D"&D !%D&..0;GD&33D4H4HID!D %D"D ++a(DK OOD001D 
, 4	&	.(,		%4 4 4s   4EE!c                     | j                   D ]  }	 |j                  | j                         ! | j                  D ]  }|j                           y# t        $ r!}t	        j
                  d||       Y d}~id}~ww xY w)zCopy the checkpointed variables from the accelerator to the host CPU.

    TODO(chienchunh): Get the concrete function before firstly called to avoid
                      hangining the accelerators idle during function tracing.
    
object_mapTrackable %s skipped due to: %sN)rD   r   rB   NotImplementedErrorr   warningrC   _retrieve_variables)rU   r%   etpu_embeddings       r   _copy_to_cpuz"AsyncCheckpointHelper._copy_to_cpu   s     && AA	  D,<,< =A 44 *'')* ! A91a@@As   A	A;A66A;c                 t    | j                   ! | j                  di | j                  | _         | j                   S )z3Gets or creates the underlying Checkpoint instance. )r=   r;   r<   rU   s    r   r>   z"AsyncCheckpointHelper.checkpointer   s8    000J43I3IJdr   c                 D   t        j                         | _        g | _        t        j                         }|j                  | j                                |j                  | j                         j                         t        | j                         |      \  | _	        }|D ]  }t        t        |      t              r| j                  |       dt        |      v s;|j                         }|D ]m  }|D ]f  }t!        |t"        j$                        s	 |j'                  ||      }t!        |t,        j.                        sL| j                  j1                  |       h o  | j                         j                  j3                         }t5        j6                  d|       | j                  | j                         j8                  _        | j                  D ]  }	 |j;                  | j                         ! | j                  D ]  }
|
jA                           tC        jD                  | jF                  d      | _$        | jH                  jK                          d| _&        y# t(        t*        f$ r Y }w xY w# t<        $ r!}	t5        j>                  d||	       Y d}	~	d}	~	ww xY w)	z/Initialize the async checkpoint internal state.)r   r,   get_slot_namesz0Initializing async checkpoint's save_counter: %dr[   r]   NT)targetdaemon)'r   ObjectIdentityDictionaryrB   rC   ObjectIdentitySetaddr>   save_counterr1   rD   r    r!   _TPU_EMBEDDING_ATTR_handle_tpu_embeddingdirrh   r6   r   Variableget_slotr:   KeyErrorr   r#   appendnumpyr   info_saverr   r^   r_   r`   	threadingThread_async_saverJ   startr@   )rU   r,   r-   r%   
slot_names	slot_nameoriginal_variableoriginal_slot_variablern   ra   rb   s              r   _ensure_initializedz)AsyncCheckpointHelper._ensure_initialized   sc    '??AD"$D "335KOOD%%'(OOD%%'4450C k1;-D~  G
 
a-	.""1% 
SV	#%%'
# 
	GI#1 	G/1C1CD'(zz2CY'O$ 0$..A''../EF	G
	GG2 $$&3399;LLLC .2-=-=D* && AA	  D,<,< =A 44 *'')* (...D!!#DC #H- , ! A91a@@As*   II5I2	1I2	5	J>JJc                     | j                   r4| j                   }d| _         t        j                  dt        |             |y)zMExpose the most recent error from the async saving thread to the caller.
    NzJPropagating the most recent error from the async thread before joining: %s)rQ   r   errorstr)rU   ra   s     r   _check_async_thread_errorz/AsyncCheckpointHelper._check_async_thread_error>  sB     


admm )*-a&2g r   c                 \   	 | j                   j                  dd       t        j                  d       | j                  | j                  j                          | j                          y# t        j                  $ r t        j                  d       Y ;w xY w# | j                          w xY w)a  Join the async save thread.

    The steps for terminating the async save thread:
    1). Put will succeed when the last async save event is done. Putting a false
        triggers the async save thread's while loop to end. We use put instead
        of sync because sync does not have a timeout argument.
    2). Join the async save thread. (The thread may finish before joining.)
    Fi,  )timeoutzJoining the async save thread.NzrTimeout waiting for the async save thread; terminating the thread instead. The last checkpoint may be incomeplete.)
rM   putr   rw   rJ   joinrK   Fullr   r   rf   s    r   rP   z-AsyncCheckpointHelper._join_async_save_threadH  s    	'
kkooeSo)ll34		 	 	,$$&
 $$&	 :: Pmm O PP $$&s$   AA+ +(BB BB B+c           	         t        j                  t        j                  dd            5  | j                  j                         rt        j                  d| j                         t        j                         }	 t        j                  | j                        5  t        j                         5  | j                  r5| j                         j!                  | j"                  | j$                         n5| j                         j'                  | j"                  | j$                         ddd       ddd       | j                  j-                          t        j                         }t/        j0                  t2        t5        ||             t6        5  t/        j8                  t2        t5        t:        |             |addd       | j                  j                         rddd       t        j                  d       y# 1 sw Y   xY w# 1 sw Y   xY w# t(        $ r}|| _        Y d}~d}~ww xY w# | j                  j-                          w xY w# 1 sw Y   xY w# 1 sw Y   xY w)z2The thread function for the async checkpoint save.F)enable_asyncenable_streaming_enqueuez0Starting async checkpoint save on the device: %s)optionsN	api_labelmicrosecondsz3Async save thread reached the end of the execution.)r   executor_scoper	   new_executorrM   getr   rw   rF   rT   r
   devicer   async_metrics_contextrI   r>   saverH   r?   _write	ExceptionrQ   	task_doner   AddAsyncCheckpointWriteDuration_ASYNC_CHECKPOINTr   rR   AddTrainingTimeSavedrS   )rU   async_save_start_timera   async_save_end_times       r   r{   z!AsyncCheckpointHelper._async_save\  s   				@
A 3@
 KKOOG))	+ !%			"zz$../ 
#99; 	**!!#((**D,D,D !!#**** 44 + 	
 ++


!"iik//'34I4GI	J 0 	@

&
&)5/1FHI +@
'	@S KKOO3@h LLFGE	 	
 
  	 $

	  ++


!	@ 	@]3@ 3@s   AI<HH
0A7G>'H
/H7AI,I<"I>HH

H	H	H0H+&H3+H00H33III	II'c                    t        t        |      t              rt        |j                        st        dt        |      z        |j	                  |j                  |j                  r|j                  d   nd|j                        }|| j                  |<   || j                  vr| j                  j                  |       yy)a  Handle TPUEmbedding.

    This is the only place where we populate object map in the class of
    `AsyncCheckpointHelper`. For all other checkpointable trackables, we
    populate object map using the trackable's own `_copy_trackable_to_cpu()`.

    Args:
      tpu_embedding: TPUEmbedding object to be handled.

    Raises:
      AttributeError: if the input trackable is not TPUEmbedding type.
    z#Expecting TPUEmbedding type; got %sr   N)feature_config	optimizer#pipeline_execution_with_tensor_core)r    r!   ro   callabler   r:   _feature_config_table_config$_pipeline_execution_with_tensor_corerB   rC   ru   )rU   rb   new_embeddings      r   rp   z+AsyncCheckpointHelper._handle_tpu_embedding  s     4&(;<H77E 
/$}2E
E  "CC$44&&  --a0,9,^,^ D M '4D]# D777
!!((7 8r   c                 6    | j                         j                  S )a  An integer variable numbering the checkpoint events.

    This is maintained by the underlying tf.train.Checkpoint object employed by
    AsyncCheckpoint class. The number starts at 0 and gets incremented for each
    checkpoint event.

    Returns:
      The save counter variable.
    )r>   rn   rf   s    r   rn   z"AsyncCheckpointHelper.save_counter  s     +++r   c                 &    | j                  ||      S )Save the checkpointed variables.

    Args:
      save_path: The file prefix of the checkpoint file.
      options: Optional CheckpointOption instance.

    Returns:
      The full path of the checkpoint file.
    )r   rU   	save_pathr   s      r   writezAsyncCheckpointHelper.write  s     ;;y'**r   c                 >   t        j                          }| j                  s| j                          n*| j                  j	                          | j                          | j                          t        j                          || _	        d| _
        |rt        j                  |      nd| _        | j                  rd| j                  _        | j                  j                  d       t        j                          }t        j                   t"        t%        ||             |S )a  Save the checkpointed variables.

    This method has exactly the same logic as save(), except it does not
    increment the underlying save_counter, which is done by the caller, e.g.,
    CheckpointManager.

    Args:
      save_path: The file prefix of the checkpoint file.
      options: Optional CheckpointOption instance.

    Returns:
      The full path of the checkpoint file.
    FNTr   )rT   r@   r   rM   r   rc   r   r   
async_waitrH   rI   copyr?   $experimental_enable_async_checkpointr   r   AddCheckpointWriteDurationr   r   )rU   r   r   write_start_timewrite_end_times        r   r   zAsyncCheckpointHelper._write  s     yy{
  kk
 	""$ &D %D 6=tyy1$DFKdCKKOODYY[N&&#/0@0>@A
 r   c                    t        j                          }| j                  s| j                          n*| j                  j	                          | j                          | j                          | j                         j                  j                         dz   }dj                  ||      }t        j                          || _        d| _        |rt        j                  |      nd| _        | j                   rd| j                   _        | j                  j%                  d       t        j                          }t'        j(                  t*        t-        ||             |S )r   r   z{}-{}TNFr   )rT   r@   r   rM   r   rc   r   r>   rn   rv   formatr   r   rH   rI   r   r?   r   r   r   r   r   r   )rU   r   r   save_start_timern   	full_pathsave_end_times          r   r   zAsyncCheckpointHelper.save  s    iikO 
  kk
 	""$ $$&3399;a?Ly,7I &D $D 6=tyy1$DFKdCKKOODIIKM&&#/OQ r   c                 &    | j                  ||      S )a  Restore the checkpointed variables.

    This method has exactly the same logic as restore(). This method is
    implemented only to fulfill the duty of subclassing tf.train.Checkpoint.

    Args:
      save_path: The full name of the checkpoint file to be restored.
      options: CheckpointOption instance.

    Returns:
      A load status object, which can be used to make assertions about the
      status of a checkpoint restoration. See tf.train.Checkpoint.restore()
      for more details.
    )restorer   s      r   readzAsyncCheckpointHelper.readF  s     <<	7++r   c                    |rt        j                   |      n| j                  | _        | j                  rd| j                  _        | j                  j	                          | j                         j                  || j                        }|S )a`  Restore the checkpointed variables.

    Args:
      save_path: The full name of the checkpoint file to be restored.
      options: CheckpointOption instance.

    Returns:
      A load status object, which can be used to make assertions about the
      status of a checkpoint restoration. See tf.train.Checkpoint.restore()
      for more details.
    F)r   r?   r   rM   r   r>   r   )rU   r   r   statuss       r   r   zAsyncCheckpointHelper.restoreW  sq     &		'4+C+C 	FKdC 	KK ((D4L4LMFMr   c                 b    | j                   j                          t        j                  d       y)z+Sync on any ongoing save or restore events.zSync on ongoing save/restore.N)rM   r   r   rw   rf   s    r   synczAsyncCheckpointHelper.syncq  s    KKLL01r   )N)__name__
__module____qualname____doc__rY   r   functionrc   r>   r   r   rP   r{   rp   propertyrn   r   r   r   r   r   r   re   r   r   r3   r3      s}    *G4R * *KZ'(6Hp#8J 
, 
,
+2h>@,"42r   r3   )$r   rN   r   rK   ry   rT   r7   abslr   tensorflow.python.checkpointr   r   tensorflow.python.distributer   tensorflow.python.eagerr   r   r	   tensorflow.python.frameworkr
   tensorflow.python.opsr   0tensorflow.python.saved_model.pywrap_saved_modelr   tensorflow.python.trackabler   tensorflow.python.utilr   rS   LockrR   r   ro   r   r1   r3   re   r   r   <module>r      s    E        ; 7 4 + 0 , + + D , 2 !% %3Y^^%5 " '  : B"H-Ve2 e2r   