
"""Synchronize replicas for training."""

from tensorflow.python.distribute import distribute_lib
from tensorflow.python.framework import indexed_slices
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import variable_v1
from tensorflow.python.ops import variables
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import optimizer
from tensorflow.python.training import queue_runner
from tensorflow.python.training import session_manager
from tensorflow.python.training import session_run_hook
from tensorflow.python.util import deprecation
from tensorflow.python.util.tf_export import tf_export


@tf_export(v1=["train.SyncReplicasOptimizer"])
class SyncReplicasOptimizer(optimizer.Optimizer):
  """Class to synchronize, aggregate gradients and pass them to the optimizer.

  This class is deprecated. For synchronous training, please use [Distribution
  Strategies](https://github.com/tensorflow/tensorflow/tree/master/tensorflow/contrib/distribute).
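
  A minimal sketch of the replacement API is shown below. It is illustrative
  only (not part of the original documentation): `build_model` and `dataset`
  are placeholders, and the exact strategy depends on your cluster setup.

  ```python
  # Sketch only: rough TF2-era replacement for SyncReplicasOptimizer.
  strategy = tf.distribute.MultiWorkerMirroredStrategy()
  with strategy.scope():
    model = build_model()                       # placeholder model factory
    model.compile(optimizer="sgd", loss="mse")
  model.fit(dataset)                            # placeholder tf.data.Dataset
  ```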

  In a typical asynchronous training environment, it's common to have some
  stale gradients. For example, with N-replica asynchronous training,
  gradients will be applied to the variables N times independently. Depending
  on each replica's training speed, some gradients might be calculated from
  copies of the variable from several steps back (N-1 steps on average). This
  optimizer avoids stale gradients by collecting gradients from all replicas,
  averaging them, then applying them to the variables in one shot, after
  which replicas can fetch the new variables and continue.

  The following accumulators/queue are created:

  * N `gradient accumulators`, one per variable to train. Gradients are pushed
    to them and the chief worker will wait until enough gradients are collected
    and then average them before applying to variables. The accumulator will
    drop all stale gradients (more details in the accumulator op).
  * 1 `token` queue where the optimizer pushes the new global_step value after
    all variables are updated.
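
  The accumulator mechanics can be pictured with a small standalone sketch
  (illustrative only; the optimizer builds the real ops for you, with one
  shared accumulator per variable):

  ```python
  # Sketch of one dense-gradient accumulator (names are placeholders).
  accum = tf.compat.v1.ConditionalAccumulator(dtype=tf.float32, shape=())
  # Each replica pushes its gradient, stamped with its local step; gradients
  # stamped with an old step are dropped by the accumulator.
  apply_op = accum.apply_grad(grad, local_step=local_step)
  # The chief's update op takes the average once enough gradients arrived.
  avg_grad = accum.take_grad(num_required=replicas_to_aggregate)
  ```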

  The following local variable is created:
  * `sync_rep_local_step`, one per replica. Compared against the global_step in
    each accumulator to check for staleness of the gradients.

  The optimizer adds nodes to the graph to collect gradients and pause the
  trainers until variables are updated.
  For the Parameter Server job:

  1. An accumulator is created for each variable, and each replica pushes the
     gradients into the accumulators instead of directly applying them to the
     variables.
  2. Each accumulator averages the gradients once enough of them
     (replicas_to_aggregate) have been accumulated.
  3. Apply the averaged gradients to the variables.
  4. Only after all variables have been updated, increment the global step.
  5. Only after step 4, push `global_step` into the `token_queue`, once for
     each worker replica. The workers can now fetch the global step, use it to
     update their local_step variables and start the next batch. Please note that
     some workers can consume multiple minibatches, while some may not consume
     even one. This is because each worker fetches minibatches as long as
     a token exists. If one worker is stuck for some reason and does not
     consume a token, another worker can use it.

  For the replicas:

  1. Start a step: fetch variables and compute gradients.
  2. Once the gradients have been computed, push them into the gradient
     accumulators. Each accumulator checks the staleness and drops stale
     gradients.
  3. After pushing all the gradients, dequeue an updated value of global_step
     from the token queue and record that step to its local_step variable. Note
     that this is effectively a barrier.
  4. Start the next batch.
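
  A small worked example of the replica bookkeeping (numbers are illustrative,
  not from the original text):

  ```python
  replicas_to_aggregate = 50
  total_num_replicas = 52   # 52 - 50 = 2 backup replicas; slowest 2 dropped
  # If instead total_num_replicas were 25, each worker would contribute about
  # two minibatches per variable update, and the chief should seed at least
  # replicas_to_aggregate - total_num_replicas = 25 tokens at start-up
  # (see `get_init_tokens_op`).
  ```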

  ### Usage

  ```python
  # Create any optimizer to update the variables, say a simple SGD:
  opt = GradientDescentOptimizer(learning_rate=0.1)

  # Wrap the optimizer with sync_replicas_optimizer with 50 replicas: at each
  # step the optimizer collects 50 gradients before applying to variables.
  # Note that if you want to have 2 backup replicas, you can change
  # total_num_replicas=52 and make sure this number matches how many physical
  # replicas you started in your job.
  opt = tf.compat.v1.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=50,
                                 total_num_replicas=50)

  # Some models have startup_delays to help stabilize the model but when using
  # sync_replicas training, set it to 0.

  # Now you can call `minimize()` or `compute_gradients()` and
  # `apply_gradients()` normally
  training_op = opt.minimize(total_loss, global_step=self.global_step)


  # You can create the hook which handles initialization and queues.
  sync_replicas_hook = opt.make_session_run_hook(is_chief)
  ```

  In the training program, every worker will run the train_op as if not
  synchronized.

  ```python
  with training.MonitoredTrainingSession(
      master=workers[worker_id].target, is_chief=is_chief,
      hooks=[sync_replicas_hook]) as mon_sess:
    while not mon_sess.should_stop():
      mon_sess.run(training_op)
  ```
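
  If you drive the session yourself instead of using the hook, the chief also
  has to run the queue runner and seed the token queue. A sketch, assuming a
  `tf.compat.v1.train.Supervisor`-style setup where `sv`, `sess` and `is_chief`
  come from your own training code:

  ```python
  chief_queue_runner = opt.get_chief_queue_runner()
  init_token_op = opt.get_init_tokens_op()

  # After the session is created and before the main training loop:
  if is_chief:
    sv.start_queue_runners(sess, [chief_queue_runner])
    sess.run(init_token_op)
  ```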
  """

  @deprecation.deprecated(
      None,
      "The `SyncReplicaOptimizer` class is deprecated. For synchronous "
      "training, please use [Distribution Strategies](https://github.com/"
      "tensorflow/tensorflow/tree/master/tensorflow/contrib/distribute).",
      warn_once=True)
  def __init__(self,
               opt,
               replicas_to_aggregate,
               total_num_replicas=None,
               variable_averages=None,
               variables_to_average=None,
               use_locking=False,
               name="sync_replicas"):
    """Construct a sync_replicas optimizer.

    Args:
      opt: The actual optimizer that will be used to compute and apply the
        gradients. Must be one of the Optimizer classes.
      replicas_to_aggregate: number of replicas to aggregate for each variable
        update.
      total_num_replicas: Total number of tasks/workers/replicas, could be
        different from replicas_to_aggregate.
        If total_num_replicas > replicas_to_aggregate: it is backup_replicas +
        replicas_to_aggregate.
        If total_num_replicas < replicas_to_aggregate: Replicas compute
        multiple batches per update to variables.
      variable_averages: Optional `ExponentialMovingAverage` object, used to
        maintain moving averages for the variables passed in
        `variables_to_average`.
      variables_to_average: a list of variables that need to be averaged. Only
        needed if variable_averages is passed in.
      use_locking: If True use locks for update operation.
      name: string. Optional name of the returned operation.
    """
    if total_num_replicas is None:
      total_num_replicas = replicas_to_aggregate

    super(SyncReplicasOptimizer, self).__init__(use_locking, name)
    logging.info(
        "SyncReplicasV2: replicas_to_aggregate=%s; total_num_replicas=%s",
        replicas_to_aggregate, total_num_replicas)
    self._opt = opt
    self._replicas_to_aggregate = replicas_to_aggregate
    self._gradients_applied = False
    self._variable_averages = variable_averages
    self._variables_to_average = variables_to_average
    self._total_num_replicas = total_num_replicas
    self._tokens_per_step = max(total_num_replicas, replicas_to_aggregate)
    self._global_step = None
    self._sync_token_queue = None

    # The sync op is executed in a queue runner which should only be run by
    # one of the replicas (usually the chief).
    self._chief_queue_runner = None

    # Remember which accumulator is on which device so that its initial step
    # can later be set to the global step. Each entry is an
    # (accumulator, device) pair.
    self._accumulator_list = []

  def compute_gradients(self, *args, **kwargs):
    """Compute gradients of "loss" for the variables in "var_list".

    This simply wraps the compute_gradients() from the real optimizer. The
    gradients will be aggregated in the apply_gradients() so that user can
    modify the gradients like clipping with per replica global norm if needed.
    The global norm with aggregated gradients can be bad as one replica's huge
    gradients can hurt the gradients from other replicas.

    Args:
      *args: Arguments for compute_gradients().
      **kwargs: Keyword arguments for compute_gradients().

    Returns:
      A list of (gradient, variable) pairs.
    """
    return self._opt.compute_gradients(*args, **kwargs)

  def apply_gradients(self, grads_and_vars, global_step=None, name=None):
    """Apply gradients to variables.

    This contains most of the synchronization implementation and also wraps the
    apply_gradients() from the real optimizer.

    Args:
      grads_and_vars: List of (gradient, variable) pairs as returned by
        compute_gradients().
      global_step: Optional Variable to increment by one after the
        variables have been updated.
      name: Optional name for the returned operation. Defaults to the
        name passed to the Optimizer constructor.

    Returns:
      train_op: The op to dequeue a token so the replicas can exit this batch
      and start the next one. This is executed by each replica.

    Raises:
      ValueError: If `grads_and_vars` is empty.
      ValueError: If global step is not provided, the staleness cannot be
        checked.
    """
    if not grads_and_vars:
      raise ValueError("Must supply at least one variable")

    if global_step is None:
      raise ValueError("Global step is required to check staleness")

    self._global_step = global_step
    train_ops = []
    aggregated_grad = []
    var_list = []

    # The local_anchor op is placed on this worker task by default; colocating
    # the local step with it keeps the local step off the parameter servers.
    local_anchor = control_flow_ops.no_op()
    distribution_strategy = distribute_lib.get_strategy()
    with distribution_strategy.extended.colocate_vars_with(local_anchor):
      self._local_step = variable_v1.VariableV1(
          initial_value=0,
          trainable=False,
          collections=[ops.GraphKeys.LOCAL_VARIABLES],
          dtype=global_step.dtype.base_dtype,
          name="sync_rep_local_step")

    self.local_step_init_op = state_ops.assign(self._local_step, global_step)
    chief_init_ops = [self.local_step_init_op]
    self.ready_for_local_init_op = variables.report_uninitialized_variables(
        variables.global_variables())

    with ops.name_scope(None, self._name):
      for grad, var in grads_and_vars:
        var_list.append(var)
        with ops.device(var.device):
          if grad is None:
            aggregated_grad.append(None)  # pass-through.
            continue
          elif isinstance(grad, tensor.Tensor):
            # Dense gradient: one shared accumulator per variable.
            grad_accum = data_flow_ops.ConditionalAccumulator(
                grad.dtype,
                shape=var.get_shape(),
                shared_name=var.name + "/grad_accum")
            train_ops.append(grad_accum.apply_grad(
                grad, local_step=self._local_step))
            aggregated_grad.append(grad_accum.take_grad(
                self._replicas_to_aggregate))
          else:
            if not isinstance(grad, indexed_slices.IndexedSlices):
              raise ValueError("Unknown grad type!")
            # Sparse gradient (IndexedSlices).
            grad_accum = data_flow_ops.SparseConditionalAccumulator(
                grad.dtype, shape=(), shared_name=var.name + "/grad_accum")
            train_ops.append(grad_accum.apply_indexed_slices_grad(
                grad, local_step=self._local_step))
            aggregated_grad.append(grad_accum.take_indexed_slices_grad(
                self._replicas_to_aggregate))

          self._accumulator_list.append((grad_accum, var.device))

      aggregated_grads_and_vars = zip(aggregated_grad, var_list)

      # The update op is assigned to the same device as the global step.
      with ops.device(global_step.device), ops.name_scope(""):
        update_op = self._opt.apply_gradients(aggregated_grads_and_vars,
                                              global_step)

      # Create the token queue.
      with ops.device(global_step.device), ops.name_scope(""):
        sync_token_queue = (
            data_flow_ops.FIFOQueue(-1,
                                    global_step.dtype.base_dtype,
                                    shapes=(),
                                    name="sync_token_q",
                                    shared_name="sync_token_q"))
        self._sync_token_queue = sync_token_queue

      with ops.device(global_step.device), ops.name_scope(""):
        # Replicas have to wait until they can get a token from the token
        # queue.
        with ops.control_dependencies(train_ops):
          token = sync_token_queue.dequeue()
        train_op = state_ops.assign(self._local_step, token)

        with ops.control_dependencies([update_op]):
          # sync_op inserts tokens into the token queue at the end of the step
          # so the replicas can fetch them and start the next step.
          tokens = array_ops.fill([self._tokens_per_step], global_step)
          sync_op = sync_token_queue.enqueue_many((tokens,))

        if self._variable_averages is not None:
          with ops.control_dependencies([sync_op]), ops.name_scope(""):
            sync_op = self._variable_averages.apply(
                self._variables_to_average)

        self._chief_queue_runner = queue_runner.QueueRunner(
            sync_token_queue, [sync_op])

      for accum, dev in self._accumulator_list:
        with ops.device(dev):
          chief_init_ops.append(
              accum.set_global_step(global_step, name="SetGlobalStep"))
      self.chief_init_op = control_flow_ops.group(*(chief_init_ops))
      self._gradients_applied = True
      return train_op

  def get_chief_queue_runner(self):
    """Returns the QueueRunner for the chief to execute.

    This includes the operations to synchronize replicas: aggregate gradients,
    apply to variables, increment global step, insert tokens to token queue.

    Note that this can only be called after calling apply_gradients() which
    actually generates this queuerunner.

    Returns:
      A `QueueRunner` for chief to execute.

    Raises:
      ValueError: If this is called before apply_gradients().
    """
    if self._gradients_applied is False:
      raise ValueError("Should be called after apply_gradients().")

    return self._chief_queue_runner

  def get_slot(self, *args, **kwargs):
    """Return a slot named "name" created for "var" by the Optimizer.

    This simply wraps the get_slot() from the actual optimizer.

    Args:
      *args: Arguments for get_slot().
      **kwargs: Keyword arguments for get_slot().

    Returns:
      The `Variable` for the slot if it was created, `None` otherwise.
    """
    return self._opt.get_slot(*args, **kwargs)

  def variables(self):
    """Fetches a list of optimizer variables in the default graph.

    This wraps `variables()` from the actual optimizer. It does not include
    the `SyncReplicasOptimizer`'s local step.

    Returns:
      A list of variables.
    """
    return self._opt.variables()

  def get_slot_names(self, *args, **kwargs):
    """Return a list of the names of slots created by the `Optimizer`.

    This simply wraps the get_slot_names() from the actual optimizer.

    Args:
      *args: Arguments for get_slot_names().
      **kwargs: Keyword arguments for get_slot_names().

    Returns:
      A list of strings.
    """
    return self._opt.get_slot_names(*args, **kwargs)

  def get_init_tokens_op(self, num_tokens=-1):
    """Returns the op to fill the sync_token_queue with the tokens.

    This is supposed to be executed in the beginning of the chief/sync thread
    so that even if the total_num_replicas is less than replicas_to_aggregate,
    the model can still proceed as the replicas can compute multiple steps per
    variable update. Make sure:
    `num_tokens >= replicas_to_aggregate - total_num_replicas`.

    Args:
      num_tokens: Number of tokens to add to the queue.

    Returns:
      An op for the chief/sync replica to fill the token queue.

    Raises:
      ValueError: If this is called before apply_gradients().
      ValueError: If num_tokens is smaller than replicas_to_aggregate -
        total_num_replicas.
    """
    if self._gradients_applied is False:
      raise ValueError("get_init_tokens_op() should be called after "
                       "apply_gradients().")

    tokens_needed = self._replicas_to_aggregate - self._total_num_replicas
    if num_tokens == -1:
      num_tokens = self._replicas_to_aggregate
    elif num_tokens < tokens_needed:
      raise ValueError(
          "Too few tokens to finish the first step: %d (given) vs %d (needed)" %
          (num_tokens, tokens_needed))

    if num_tokens > 0:
      with ops.device(self._global_step.device), ops.name_scope(""):
        tokens = array_ops.fill([num_tokens], self._global_step)
        init_tokens = self._sync_token_queue.enqueue_many((tokens,))
    else:
      init_tokens = control_flow_ops.no_op(name="no_init_tokens")

    return init_tokens

  def make_session_run_hook(self, is_chief, num_tokens=-1):
    """Creates a hook to handle SyncReplicasHook ops such as initialization."""
    return _SyncReplicasOptimizerHook(self, is_chief, num_tokens)


class _SyncReplicasOptimizerHook(session_run_hook.SessionRunHook):
  """A SessionRunHook handles ops related to SyncReplicasOptimizer."""

  def __init__(self, sync_optimizer, is_chief, num_tokens):
    """Creates hook to handle SyncReplicasOptimizer initialization ops.

    Args:
      sync_optimizer: `SyncReplicasOptimizer` which this hook will initialize.
      is_chief: `Bool`, whether this is a chief replica or not.
      num_tokens: Number of tokens to add to the queue.
    """
    self._sync_optimizer = sync_optimizer
    self._is_chief = is_chief
    self._num_tokens = num_tokens

  def begin(self):
    if self._sync_optimizer._gradients_applied is False:  # pylint: disable=protected-access
      raise ValueError(
          "SyncReplicasOptimizer.apply_gradient should be called before using "
          "the hook.")
    if self._is_chief:
      self._local_init_op = self._sync_optimizer.chief_init_op
      self._ready_for_local_init_op = (
          self._sync_optimizer.ready_for_local_init_op)
      self._q_runner = self._sync_optimizer.get_chief_queue_runner()
      self._init_tokens_op = self._sync_optimizer.get_init_tokens_op(
          self._num_tokens)
    else:
      self._local_init_op = self._sync_optimizer.local_step_init_op
      self._ready_for_local_init_op = (
          self._sync_optimizer.ready_for_local_init_op)
      self._q_runner = None
      self._init_tokens_op = None

  def after_create_session(self, session, coord):
    """Runs SyncReplicasOptimizer initialization ops."""
    local_init_success, msg = session_manager._ready(  # pylint: disable=protected-access
        self._ready_for_local_init_op, session,
        "Model is not ready for SyncReplicasOptimizer local init.")
    if not local_init_success:
      raise RuntimeError(
          "Init operations did not make model ready for SyncReplicasOptimizer "
          f"local_init. Init op: {self._local_init_op.name}, error: {msg}")
    session.run(self._local_init_op)
    if self._init_tokens_op is not None:
      session.run(self._init_tokens_op)
    if self._q_runner is not None:
      self._q_runner.create_threads(
          session, coord=coord, daemon=True, start=True)