
    AVh)                     v    d Z ddlmZ ddlmZ ddlmZ d Zd Z	d Z
dd	Zd
 Zd Zd Zd Zd Zd Zd Zd Zy)z3Utilities for multi-worker distribution strategies.    )cluster_pb2)distribute_coordinator_context)
server_libc                     t        | t        t        j                  f      rt	        j
                  |       S t        | t        j
                        st        d      | S )a4  Makes `cluster_spec` into a `ClusterSpec` object.

  Args:
    cluster_spec: a dict, ClusterDef or ClusterSpec object specifying the
      cluster configurations.

  Returns:
    a `ClusterSpec` object.

  Raises:
    ValueError: if `cluster_spec` is not a dict or a `ClusterSpec` or a
      `ClusterDef`.
  z[`cluster_spec' should be dict or a `tf.train.ClusterSpec` or a `tf.train.ClusterDef` object)
isinstancedictr   
ClusterDefr   ClusterSpec
ValueErrorcluster_specs    ^/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/multi_worker_util.pynormalize_cluster_specr      sT     t[%;%;<=!!,//lJ$:$:;
	'( ( 
    c                 D    	 | j                  |      S # t        $ r Y yw xY w)Nr   )	num_tasksr   r   	task_types     r   
task_countr   -   s*    !!),,	 s    	c                    dt        |       } t        fd| j                  D              rt        dj	                  |             |vrt        dj	                  |            |r!|| j                  vr|dk7  rt        d|z        t        | d      dkD  rt        d	      t        | d      dkD  rt        d
      || j                  v r |t        | |      k\  rt        d||fz        yy)aM  Validates `cluster_spec`.

  It checks:
  1) task type is one of "chief", "worker", "ps", "evaluator", or not provided
     (None).
  2) whether there is such a task type as `task_type` in the `cluster_spec`. The
     only exception is `evaluator`. In other words, it is still a valid
     configuration when `task_type` is `evaluator` but it doesn't appear in
     `cluster_spec`.
  3) whether there is at most one "chief" job.
  4) whether there is at most one "evaluator" job.
  5) whether the `task_id` is smaller than the number of tasks for that
     particular `task_type`.

  Args:
    cluster_spec: a dict, `ClusterDef` or `ClusterSpec` object to be validated.
    task_type: string indicating the type of the task.
    task_id: the id of the `task_type` in this cluster.

  Raises:
    ValueError: if `cluster_spec` fails any check.
  )chiefworker	evaluatorpsNc              3   &   K   | ]  }|v 
 y w)N ).0joballowed_task_typess     r   	<genexpr>z)_validate_cluster_spec.<locals>.<genexpr>Q   s     D3&	&Ds   z\Disallowed task type found in cluster spec. Allowed types are {} and the cluster spec is {}.z4Unrecognized task_type: {}, valid task types are: {}r   z)`task_type` %r not found in cluster_spec.r      z&There must be at most one 'chief' job.z*There must be at most one 'evaluator' job.z.The `task_id` %d exceeds the maximum id of %s.N)r   anyjobsr   formatr   )r   r   task_idr   s      @r   _validate_cluster_specr&   4   s!   2 D'5,D,2C2CDD
 @@F+\A;< < ((
>EE)	+, , I\%6%66;
@9L
MMg&*
=
>>k*Q.
A
BB ,###:I4 )
8GY;OOQ Q)#r   Nc                     t               rt        j                         j                  S t	        | ||       t        |       j                         } |dk(  s|dk(  ryd| vr|dk(  r|dk(  ryy)a  Returns whether the given task is chief in the cluster.

  Since there is at most one evaluator and the evaluator itself should be
  independent of the training cluster, the evaluator job is also a chief job on
  its own.

  If this is currently running under a `_WorkerContext` of distribute
  coordinator, the arguments can be omitted as the result is already available.

  Args:
    cluster_spec: a dict, `ClusterDef` or `ClusterSpec` object specifying the
      cluster configurations.
    task_type: the task type in the cluster.
    task_id: the task id in the cluster.

  Returns:
    a boolean indicating whether the given task is chief.

  Raises:
    ValueError: if `task_type` is not in the `cluster_spec` or `task_id` exceeds
      the maximum id of the `task_type`.
  r   r   Tr   r   F)has_worker_context
dc_contextget_current_worker_contextis_chiefr&   r   as_dictr   r   r%   s      r   r+   r+   l   sm    . 002;;;y':'5==?,'Y+5 \!i8&;1	r   c                     t        |       } | j                         syt        | ||       |dk(  ryd| j                  v ryd| j                  v sJ y)aq  Return the job name for the leader of for collective ops.

  Args:
    cluster_spec: a dict, `ClusterDef` or `ClusterSpec` object specifying the
      cluster configurations.
    task_type: the task type in the cluster.
    task_id: the task id in the cluster.

  Returns:
    a string indicating the leader job name or empty string if no need to set
    leader job.
   r   r   /job:chief/replica:0/task:0r   /job:worker/replica:0/task:0)r   r,   r&   r#   r-   s      r   collective_leaderr2      sd     (5, 
			y': + !!!( 
\&&	&&	&	'r   c                     t        |       } | j                         syd| j                  v ryd| j                  v ryd| j                  v sJ y)a  Return the task name of the coordination service leader.

  Args:
    cluster_spec: a dict, `ClusterDef` or `ClusterSpec` object sxpecifying the
      cluster configurations.

  Returns:
    a string indicating the task name of the coordination service leader.
  r/   r   z/job:ps/replica:0/task:0r   r0   r   r1   )r   r,   r#   r   s    r   coordination_leaderr4      s^     (5, 
			 
\% !!!( 
\&&	&&	&	'r   c                    t        | |d       t        |       j                         } |dvrt        d|z        |dk(  rt	        | d         S t	        | j                  dg             t	        | j                  dg             z   S )z-Returns the number of workers in the cluster.r   )r%   )r   r   r   zUnexpected `task_type` %rr   r   r   )r&   r   r,   r   lengetr   s     r   worker_countr8      s    y!<'5==?, 66
09<
==+ |K())   "-.2&2( ( )r   c                     t        | ||       t        |       j                         } |dk(  ry|dk(  r|t        | j	                  dg             z   S |dk(  r|S t        d|z        )aE  Returns a unique id for the task in the `task_type`'s cluster.

  It returns an id ranging from [0, `worker_count(task_type, task_id)`).

  Note: this function assumes that "evaluate" job is in its own cluster or its
  own partition of a cluster.

  Args:
    cluster_spec: a dict, `ClusterDef` or `ClusterSpec` object to be validated.
    task_type: string indicating the type of the task.
    task_id: the id of the `task_type` in this cluster.

  Returns:
    an int indicating the unique id.

  Throws:
    ValueError: if `task_type` is not "chief", "worker" or "evaluator".
  r   r   r   r   zThere is no id for task_type %r)r&   r   r,   r6   r7   r   r-   s      r   id_in_clusterr:      sw    & y':'5==?, '(S))'26777 +N 	4y@AAr   c                  >    t        j                         j                  S )a  Returns whether the current worker should save checkpoints.

  In multi-worker training, if saving checkpoint is requested by user, or needed
  for fault-tolerance, the cluster should save checkpoint but not necessarily
  every worker in the cluster should.

  TODO(rchao): Consider generalizing this util to be `should_save_file` as there
  can be other files to save such as summary.

  Returns:
      Whether this particular worker in the cluster should save checkpoints.
  )r)   r*   should_checkpointr   r   r   should_save_checkpointr=     s     
	.	.	0	B	BBr   c                  >    t        j                         j                  S )a^  Returns whether the current worker should load checkpoints.

  In multi-worker training, if loading checkpoint is requested by user, or
  needed for fault-tolerance, the cluster should load checkpoint but not
  necessarily every worker in the cluster should.

  Returns:
      Whether this particular worker in the cluster should load checkpoints.
  )r)   r*   experimental_should_initr   r   r   should_load_checkpointr@     s     
	.	.	0	I	IIr   c                  F    t        j                         j                         S )z>Waits for other workers to reach the same call to this method.)r)   r*   wait_for_other_workersr   r   r   rB   rB   +  s    		.	.	0	G	G	IIr   c                  .    t        j                         duS )z2Returns whether a worker context has been entered.N)r)   r*   r   r   r   r(   r(   0  s    		.	.	0	<<r   )NNN)__doc__tensorflow.core.protobufr   tensorflow.python.distributer   r)   tensorflow.python.trainingr   r   r   r&   r+   r2   r4   r8   r:   r=   r@   rB   r(   r   r   r   <module>rH      sX    : 0 U 1.5Qp%P(D(:)*#BLC 
JJ
=r   