    BVh]                     8   d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddlm
Z
 ddlmZ ddlmZ dd	Zd
 Z ed       ej"                  ej$                         G d de                    Z ed       G d de             Z ed       G d de             Zy)zFCluster Resolvers are used for dynamic cluster IP/hostname resolution.    N)session)contextconfig)ops)ClusterSpec)	tf_exportc                     |r|d| S | S )Nz:// )master	rpc_layers     n/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/cluster_resolver/cluster_resolver.pyformat_master_urlr      s    !6**M    c           	         t        j                         ryt        j                         }g }|D ]\  }|j                  dk(  s|j                  dk(  r"|j                  t        j                  |j                  |j                  dd             ^ |S t        j                         j                         5  t        j                  | |      5 }|j                         }ddd       ddd       S # 1 sw Y   xY w# 1 sw Y   S xY w)z?Returns accelerator devices given a master and a configuration.CPUXLA_CPUr   r   N)r   executing_eagerlyr   list_logical_devicesdevice_typeappendr   _DeviceAttributesnamer   Graph
as_defaultSessionlist_devices)r   config_protological_devicesdevicesdss         r   get_accelerator_devicesr#   &   s     113OG M	
%	1==I#=nnW..qvvq}}aKLM N				! #??6,7 #1.."## N# ## Ns$   0C7C+C7+C4	0C77Dz+distribute.cluster_resolver.ClusterResolverc                       e Zd ZdZej
                  d        Zej
                  dd       Z	 	 	 ddZe	d        Z
e	d        Ze	d        Zej                  d	        Zej                  d
        Zy)ClusterResolvera  Abstract class for all implementations of ClusterResolvers.

  This defines the skeleton for all implementations of ClusterResolvers.
  ClusterResolvers are a way for TensorFlow to communicate with various cluster
  management systems (e.g. GCE, AWS, etc.) and gives TensorFlow the necessary
  information to set up distributed training.

  By letting TensorFlow communicate with these systems, we will be able to
  automatically discover and resolve IP addresses for various TensorFlow
  workers. This will eventually allow us to automatically recover from
  underlying machine failures and scale TensorFlow worker clusters up and down.

  Note to implementors of `tf.distribute.cluster_resolver.ClusterResolver`
  subclasses: In addition to these abstract methods, when task_type, task_id, and
  rpc_layer attributes are applicable, you should also implement them either as
  properties with getters or setters, or directly set the attributes
  `self._task_type`, `self._task_id`, or `self._rpc_layer` so the base class'
  getters and setters are used. See
  `tf.distribute.cluster_resolver.SimpleClusterResolver.__init__` for an
  example.

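  As an illustration only (this sketch is not itself part of the API, and the
  names in it are made up), a minimal subclass resolving a statically known
  cluster could look like:

    ```python
    class FixedClusterResolver(ClusterResolver):
      # Hypothetical subclass for a fixed, statically known cluster.

      def __init__(self, cluster_spec, master=''):
        self._cluster_spec = cluster_spec
        self._master = master
        # Setting these attributes lets the base class' getters/setters work.
        self._task_type = 'worker'
        self._task_id = 0

      def cluster_spec(self):
        return self._cluster_spec

      def master(self, task_type=None, task_id=None, rpc_layer=None):
        if task_type is not None and task_id is not None:
          return format_master_url(
              self.cluster_spec().task_address(task_type, task_id), rpc_layer)
        return self._master
    ```
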
  In general, multi-client tf.distribute strategies such as
  `tf.distribute.experimental.MultiWorkerMirroredStrategy` require task_type and
  task_id properties to be available in the `ClusterResolver` they are using. On
  the other hand, these concepts are not applicable in single-client strategies,
  such as `tf.distribute.experimental.TPUStrategy`, because the program is only
  expected to be run on one task, so there should not be a need to have code
  branches according to task type and task id.

  - task_type is the name of the server's current named job (e.g. 'worker',
     'ps' in a distributed parameterized training job).
  - task_id is the ordinal index of the server within the task type.
  - rpc_layer is the protocol used by TensorFlow to communicate with other
      TensorFlow servers in a distributed environment.
  """

  @abc.abstractmethod
  def cluster_spec(self):
    """Retrieve the current state of the cluster and return a `tf.train.ClusterSpec`.

    Returns:
      A `tf.train.ClusterSpec` representing the state of the cluster at the
      moment this function is called.

    Implementors of this function must take care in ensuring that the
    ClusterSpec returned is up-to-date at the time of calling this function.
    This usually means retrieving the information from the underlying cluster
    management system every time this function is invoked and reconstructing
    a cluster_spec, rather than attempting to cache anything.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def master(self, task_type=None, task_id=None, rpc_layer=None):
    """Retrieves the name or URL of the session master.

    Note: this is only useful for TensorFlow 1.x.

    Args:
      task_type: (Optional) The type of the TensorFlow task of the master.
      task_id: (Optional) The index of the TensorFlow task of the master.
      rpc_layer: (Optional) The RPC protocol for the given cluster.

    Returns:
      The name or URL of the session master.

    Implementors of this function must take care in ensuring that the master
    returned is up-to-date at the time of calling this function. This usually
    means retrieving the master every time this function is invoked.
    r'   )r*   	task_typetask_idr   s       r   r   zClusterResolver.mastern   s    $ 
r   c                    | j                  ||      }t        ||      }t        j                  t              }|D ]D  }|)|'d|z  }d|z  }	||j
                  vs|	|j
                  vr.||j                  xx   dz  cc<   F |S )aK  Returns the number of accelerator cores per worker.

    This returns the number of accelerator cores (such as GPUs and TPUs)
    available per worker.

    Optionally, callers may specify the task_type and task_id to target a
    specific TensorFlow task when querying the number of accelerators. This is
    to support heterogeneous environments, where the number of accelerator
    cores per host differs.

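    For example (an illustrative sketch; the resolver and counts are made up,
    and actual values depend on the cluster):

    ```python
    resolver.num_accelerators(task_type="worker", task_id=0)
    # => {"GPU": 4}
    ```
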
    Args:
      task_type: (Optional) The type of the TensorFlow task of the machine we
        want to query.
      task_id: (Optional) The index of the TensorFlow task of the machine we
        want to query.
      config_proto: (Optional) Configuration for starting a new session to
        query how many accelerator cores it has.

    Returns:
      A map of accelerator types to number of cores.
    """
    master = self.master(task_type, task_id)
    devices = get_accelerator_devices(master, config_proto)
    mapping = collections.defaultdict(int)
    for device in devices:
      if task_type is not None and task_id is not None:
        job_path = '/job:%s' % task_type
        task_path = '/task:%s' % task_id
        if job_path not in device.name or task_path not in device.name:
          continue
      mapping[device.device_type] += 1
    return mapping

  @property
  def environment(self):
    """Returns the current environment which TensorFlow is running in.

    There are two possible return values: "google" (when TensorFlow is running
    in a Google-internal environment) or an empty string (when TensorFlow is
    running elsewhere).

    If you are implementing a ClusterResolver that works in both the Google
    environment and the open-source world (for instance, a TPU ClusterResolver
    or similar), you will have to return the appropriate string depending on the
    environment, which you will have to detect.

    Otherwise, if you are implementing a ClusterResolver that will only work
    in open-source TensorFlow, you do not need to implement this property.
    """
    return ''

  @property
  def task_type(self):
    """Returns the task type this `ClusterResolver` indicates.

    In a TensorFlow distributed environment, each job may have an applicable
    task type. Valid task types in TensorFlow include
    'chief': a worker that is designated with more responsibility,
    'worker': a regular worker for training/evaluation,
    'ps': a parameter server, or
    'evaluator': an evaluator that evaluates the checkpoints for metrics.

    See [Multi-worker configuration](
    https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#multi-worker_configuration)
    for more information about 'chief' and 'worker' task type, which are most
    commonly used.

    Having access to such information is useful when the user needs to run specific
    code according to task types. For example,

    ```python
    cluster_spec = tf.train.ClusterSpec({
        "ps": ["localhost:2222", "localhost:2223"],
        "worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
    })

    # SimpleClusterResolver is used here for illustration; other cluster
    # resolvers may be used for other source of task type/id.
    simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
                                            task_id=1)

    ...

    if cluster_resolver.task_type == 'worker':
      # Perform something that's only applicable on workers. This block
      # will run on this particular instance since we've specified this task to
      # be a worker in above cluster resolver.
    elif cluster_resolver.task_type == 'ps':
      # Perform something that's only applicable on parameter servers. This
      # block will not run on this particular instance.
    ```

    Returns `None` if such information is not available or is not applicable
    in the current distributed environment, such as training with
    `tf.distribute.experimental.TPUStrategy`.

    For more information, please see
    `tf.distribute.cluster_resolver.ClusterResolver`'s class doc.
    """
    return getattr(self, '_task_type', None)

  @property
  def task_id(self):
    """Returns the task id this `ClusterResolver` indicates.

    In a TensorFlow distributed environment, each job may have an applicable
    task id, which is the index of the instance within its task type. This is
    useful when the user needs to run specific code according to task index. For
    example,

    ```python
    cluster_spec = tf.train.ClusterSpec({
        "ps": ["localhost:2222", "localhost:2223"],
        "worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
    })

    # SimpleClusterResolver is used here for illustration; other cluster
    # resolvers may be used for other source of task type/id.
    simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
                                            task_id=0)

    ...

    if cluster_resolver.task_type == 'worker' and cluster_resolver.task_id == 0:
      # Perform something that's only applicable on 'worker' type, id 0. This
      # block will run on this particular instance since we've specified this
      # task to be a 'worker', id 0 in above cluster resolver.
    else:
      # Perform something that's only applicable on other ids. This block will
      # not run on this particular instance.
    ```

    Returns `None` if such information is not available or is not applicable
    in the current distributed environment, such as training with
    `tf.distribute.cluster_resolver.TPUClusterResolver`.

    For more information, please see
    `tf.distribute.cluster_resolver.ClusterResolver`'s class docstring.
    """
    return getattr(self, '_task_id', None)

  @task_type.setter
  def task_type(self, task_type):
    """Setter of `task_type` property. See `task_type` property doc."""
    self._task_type = task_type

  @task_id.setter
  def task_id(self, task_id):
    """Setter of `task_id` property. See `task_id` property doc."""
    self._task_id = task_id


@tf_export('distribute.cluster_resolver.SimpleClusterResolver')
class SimpleClusterResolver(ClusterResolver):
  """Simple implementation of ClusterResolver that accepts all attributes.

  Please see the base class for documentation of arguments of its constructor.

  It is useful if you want to specify some or all attributes.

  Usage example with `tf.distribute.Strategy`:

    ```Python
    cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
                                               "worker1.example.com:2222"]})

    # On worker 0
    cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
                                             task_id=0,
                                             num_accelerators={"GPU": 8},
                                             rpc_layer="grpc")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
        cluster_resolver=cluster_resolver)

    # On worker 1
    cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
                                             task_id=1,
                                             num_accelerators={"GPU": 8},
                                             rpc_layer="grpc")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
        cluster_resolver=cluster_resolver)
    ```
  """

  def __init__(self, cluster_spec, master='', task_type=None, task_id=None,
               environment='', num_accelerators=None, rpc_layer=None):
    """Creates a SimpleClusterResolver from a ClusterSpec."""
    super(SimpleClusterResolver, self).__init__()

    self._task_type = task_type
    self._task_id = task_id
    self._environment = environment

    self._num_accelerators = num_accelerators
    self._rpc_layer = rpc_layer

    if not isinstance(cluster_spec, ClusterSpec):
      raise TypeError('cluster_spec must be a `tf.train.ClusterSpec`.')
    self._cluster_spec = cluster_spec

    if not isinstance(master, str):
      raise TypeError('master must be a string.')
    self._master = master

  def cluster_spec(self):
    """Returns the ClusterSpec passed into the constructor."""
    return self._cluster_spec

  def master(self, task_type=None, task_id=None, rpc_layer=None):
    """Returns the master address to use when creating a session.

    Note: this is only useful for TensorFlow 1.x.

    Args:
      task_type: (Optional) The type of the TensorFlow task of the master.
      task_id: (Optional) The index of the TensorFlow task of the master.
      rpc_layer: (Optional) The RPC used by distributed TensorFlow.

    Returns:
      The name or URL of the session master.

    If a task_type and task_id is given, this will override the `master`
    string passed into the initialization function.
    """
    if task_type is not None and task_id is not None:
      master = self.cluster_spec().task_address(task_type, task_id)
    else:
      master = self._master

    return format_master_url(master, rpc_layer=rpc_layer or self._rpc_layer)

  @property
  def task_type(self):
    return self._task_type

  @property
  def task_id(self):
    return self._task_id

  @task_type.setter
  def task_type(self, task_type):
    self._task_type = task_type

  @task_id.setter
  def task_id(self, task_id):
    self._task_id = task_id

  @property
  def environment(self):
    return self._environment

  def num_accelerators(self,
                       task_type=None,
                       task_id=None,
                       config_proto=None):
    """Returns the number of accelerator cores per worker.

    The SimpleClusterResolver does not do automatic detection of accelerators,
    and thus all arguments are unused and we simply return the value provided
    in the constructor.

    Args:
      task_type: Unused.
      task_id: Unused.
      config_proto: Unused.
    """
    # Arguments are unused; the constructor-provided value is returned.
    del task_type, task_id, config_proto
    if self._num_accelerators is None:
      return {}
    return self._num_accelerators

  @property
  def rpc_layer(self):
    return self._rpc_layer

  @rpc_layer.setter
  def rpc_layer(self, rpc_layer):
    self._rpc_layer = rpc_layer


@tf_export('distribute.cluster_resolver.UnionResolver')
class UnionClusterResolver(ClusterResolver):
  """Performs a union on underlying ClusterResolvers.

  This class performs a union given two or more existing ClusterResolvers. It
  merges the underlying ClusterResolvers, and returns one unified ClusterSpec
  when cluster_spec is called. The details of the merge function are
  documented in the cluster_spec function.

  For additional ClusterResolver properties such as task type, task index,
  rpc layer, environment, etc..., we will return the value from the first
  ClusterResolver in the union.

  An example to combine two cluster resolvers:

    ```Python
    cluster_0 = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
                                                 "worker1.example.com:2222"]})
    cluster_resolver_0 = SimpleClusterResolver(cluster, task_type="worker",
                                               task_id=0,
                                               rpc_layer="grpc")

    cluster_1 = tf.train.ClusterSpec({"ps": ["ps0.example.com:2222",
                                             "ps1.example.com:2222"]})
    cluster_resolver_1 = SimpleClusterResolver(cluster, task_type="ps",
                                               task_id=0,
                                               rpc_layer="grpc")

    # Its task type would be "worker".
    cluster_resolver = UnionClusterResolver(cluster_resolver_0,
                                            cluster_resolver_1)
    ```

  An example to override the number of GPUs in a TFConfigClusterResolver
  instance:

    ```Python
    tf_config = TFConfigClusterResolver()
    gpu_override = SimpleClusterResolver(tf_config.cluster_spec(),
                                         num_accelerators={"GPU": 1})
    cluster_resolver = UnionResolver(gpu_override, tf_config)
    ```
  """

  def __init__(self, *args, **kwargs):
    """Initializes a UnionClusterResolver with other ClusterResolvers.

    Args:
      *args: `ClusterResolver` objects to be unionized.
      **kwargs:
        rpc_layer - (Optional) Override value for the RPC layer used by
          TensorFlow.
        task_type - (Optional) Override value for the current task type.
        task_id - (Optional) Override value for the current task index.

    Raises:
      TypeError: If any argument is not a subclass of `ClusterResolver`.
      ValueError: If there are no arguments passed.
    """
    super(UnionClusterResolver, self).__init__()

    self._rpc_layer = kwargs.pop('rpc_layer', None)
    self._task_type = kwargs.pop('task_type', None)
    self._task_id = kwargs.pop('task_id', None)

    if kwargs:
      raise ValueError('Unexpected kwargs provided {!r}'.format(kwargs))

    if not args:
      raise ValueError('At least one ClusterResolver is required.')

    for cluster_resolver in args:
      if not isinstance(cluster_resolver, ClusterResolver):
        raise TypeError('All arguments must be a sub-class of '
                        '`ClusterResolver`.')
    self._cluster_resolvers = args

  def cluster_spec(self):
    """Returns a union of all the ClusterSpecs from the ClusterResolvers.

    Returns:
      A ClusterSpec containing host information merged from all the underlying
      ClusterResolvers.

    Raises:
      KeyError: If conflicting keys are detected when merging two or
        more dictionaries.

    Note: If there are multiple ClusterResolvers exposing ClusterSpecs with the
    same job name, we will merge the list/dict of workers.

    If *all* underlying ClusterSpecs expose the set of workers as lists, we will
    concatenate the lists of workers, starting with the list of workers from
    the first ClusterResolver passed into the constructor.

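    For instance (an illustrative sketch; the addresses are made up), two
    list-based specs merge by concatenation:

    ```python
    resolver_a = SimpleClusterResolver(tf.train.ClusterSpec(
        {"worker": ["w0.example.com:2222"]}))
    resolver_b = SimpleClusterResolver(tf.train.ClusterSpec(
        {"worker": ["w1.example.com:2222"]}))
    union = UnionClusterResolver(resolver_a, resolver_b)
    # union.cluster_spec().as_dict() ==
    #     {"worker": ["w0.example.com:2222", "w1.example.com:2222"]}
    ```
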
    If *any* of the ClusterSpecs expose the set of workers as a dict, we will
    treat all the sets of workers as dicts (even if they are returned as lists)
    and will only merge them into a dict if there are no conflicting keys. If
    there is a conflicting key, we will raise a `KeyError`.
    """
    merged_cluster = {}

    # We figure out whether it is all lists for a particular job, or whether
    # there are dicts inside.
    for cluster_resolver in self._cluster_resolvers:
      cluster_spec = cluster_resolver.cluster_spec()
      cluster_dict = cluster_spec.as_dict()

      for job_name, tasks in cluster_dict.items():
        if job_name in merged_cluster:
          # If we see a dict, then we write a dict out regardless.
          if isinstance(tasks, dict):
            merged_cluster[job_name] = {}
        else:
          # We take whichever type is present.
          if isinstance(tasks, list):
            merged_cluster[job_name] = []
          else:
            merged_cluster[job_name] = {}

    # We then do the merge as appropriate in merged_cluster[job].
    for cluster_resolver in self._cluster_resolvers:
      cluster_spec = cluster_resolver.cluster_spec()
      cluster_dict = cluster_spec.as_dict()

      for job_name, tasks in cluster_dict.items():
        if isinstance(merged_cluster[job_name], list):
          # We all have lists, so we can just concatenate and be done.
          merged_cluster[job_name].extend(tasks)
        else:
          if isinstance(tasks, list):
            # We convert to a dictionary if the type is a list.
            task_dict = dict(zip(range(0, len(tasks)), tasks))
          else:
            # We can simply make a copy (for update) and be done.
            task_dict = tasks.copy()

          # We detect if there are duplicates, and raise an error if so.
          task_keys = set(task_dict)
          merged_keys = set(merged_cluster[job_name].keys())
          intersected_keys = task_keys.intersection(merged_keys)
          if intersected_keys:
            raise KeyError('Duplicate keys detected when merging two '
                           'ClusterSpecs: %s' % repr(intersected_keys))

          # We do the merge after all the processing.
          merged_cluster[job_name].update(task_dict)

    return ClusterSpec(merged_cluster)

  def master(self, task_type=None, task_id=None, rpc_layer=None):
    """Returns the master address to use when creating a session.

    This usually returns the master from the first ClusterResolver passed in,
    but you can override this by specifying the task_type and task_id.

    Note: this is only useful for TensorFlow 1.x.

    Args:
      task_type: (Optional) The type of the TensorFlow task of the master.
      task_id: (Optional) The index of the TensorFlow task of the master.
      rpc_layer: (Optional) The RPC protocol for the given cluster.

    Returns:
      The name or URL of the session master.
    """
    if task_type is not None and task_id is not None:
      master = self.cluster_spec().task_address(task_type, task_id)
      return format_master_url(master, rpc_layer or self._rpc_layer)

    return self._cluster_resolvers[0].master(rpc_layer=rpc_layer)

  @property
  def task_type(self):
    return self._task_type or self._cluster_resolvers[0].task_type

  @property
  def task_id(self):
    return self._task_id or self._cluster_resolvers[0].task_id

  @task_type.setter
  def task_type(self, task_type):
    self._task_type = task_type

  @task_id.setter
  def task_id(self, task_id):
    self._task_id = task_id

  @property
  def environment(self):
    return self._cluster_resolvers[0].environment

  def num_accelerators(self,
                       task_type=None,
                       task_id=None,
                       config_proto=None):
    return self._cluster_resolvers[0].num_accelerators(
        task_type, task_id, config_proto)

  @property
  def rpc_layer(self):
    return self._rpc_layer or self._cluster_resolvers[0].rpc_layer

  @rpc_layer.setter
  def rpc_layer(self, rpc_layer):
    self._rpc_layer = rpc_layer