"""Parameter server strategy V2 class.

This is currently under development and the API is subject to change.
"""

import functools
import os
import threading

from tensorflow.compiler.xla.tsl.protobuf import coordination_config_pb2
from tensorflow.python.distribute import cross_device_ops as cross_device_ops_lib
from tensorflow.python.distribute import device_util
from tensorflow.python.distribute import distribute_lib
from tensorflow.python.distribute import input_lib
from tensorflow.python.distribute import input_util
from tensorflow.python.distribute import load_context
from tensorflow.python.distribute import mirrored_run
from tensorflow.python.distribute import multi_worker_util
from tensorflow.python.distribute import parameter_server_strategy
from tensorflow.python.distribute import ps_values
from tensorflow.python.distribute import sharded_variable
from tensorflow.python.distribute import values
from tensorflow.python.distribute.cluster_resolver import cluster_resolver as base_cluster_resolver
from tensorflow.python.distribute.coordinator import cluster_coordinator
from tensorflow.python.eager import context
from tensorflow.python.eager import remote
from tensorflow.python.framework import config
from tensorflow.python.framework import device as tf_device
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import resource_variable_ops
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.trackable import base as trackable
from tensorflow.python.training import server_lib
from tensorflow.python.util import nest
from tensorflow.python.util import tf_inspect
from tensorflow.python.util.tf_export import tf_export

ALLOWED_TASK_TYPES = ("chief", "worker", "ps")

# Timeout (in seconds) for coordination service heartbeats; converted to
# milliseconds when the coordination service is configured below.
_HEARTBEAT_TIMEOUT_SECS = 5

# Number of retries used when the coordinator sets the server def while
# connecting to the cluster.
_SET_SERVER_DEF_RETRIES = 100


@tf_export(
    "distribute.experimental.ParameterServerStrategy",
    "distribute.ParameterServerStrategy",
    v1=[])
class ParameterServerStrategyV2(distribute_lib.Strategy):
  """A multi-worker tf.distribute strategy with parameter servers.

  Parameter server training is a common data-parallel method to scale up a
  machine learning model on multiple machines. A parameter server training
  cluster consists of workers and parameter servers. Variables are created on
  parameter servers and they are read and updated by workers in each step.
  By default, workers read and update these variables independently without
  synchronizing with each other. This configuration is known as
  asynchronous training.

  In TensorFlow 2, we recommend an architecture based on central coordination
  for parameter server training. Each worker and parameter server runs a
  `tf.distribute.Server`, and on top of that, a coordinator task is responsible
  for creating resources on workers and parameter servers, dispatching
  functions, and coordinating the training. The coordinator uses a
  `tf.distribute.experimental.coordinator.ClusterCoordinator` to coordinate the
  cluster, and a `tf.distribute.experimental.ParameterServerStrategy` to define
  variables on parameter servers and computation on workers.

  For the training to work, the coordinator dispatches `tf.function`s to be
  executed on remote workers. Upon receiving requests from the coordinator, a
  worker executes the `tf.function` by reading the variables from parameter
  servers, executing the ops, and updating the variables on the parameter
  servers. Each worker only processes requests from the coordinator,
  and communicates with parameter servers, without direct interaction with
  other workers in the cluster.

  As a result, failures of some workers do not prevent the cluster from
  continuing the work, and this allows the cluster to train with instances that
  can be occasionally unavailable (e.g., preemptible or spot instances). The
  coordinator and parameter servers, though, must be available at all times for
  the cluster to make progress.

  Note that the coordinator is not one of the training workers. Instead, it
  creates resources such as variables and datasets, dispatches `tf.function`s,
  saves checkpoints and so on. In addition to workers, parameter servers and
  the coordinator, an optional evaluator can be run on the side that
  periodically reads the checkpoints saved by the coordinator and runs
  evaluations against each checkpoint.

  `ParameterServerStrategy` is supported with two training APIs: [Custom
  Training Loop (CTL)]
  (https://www.tensorflow.org/tutorials/distribute/custom_training)
  and [Keras Training API, also known as `Model.fit`]
  (https://www.tensorflow.org/tutorials/distribute/keras). CTL is recommended
  when users prefer to define the details of their training loop, and
  `Model.fit` is recommended when users prefer a high-level abstraction and
  handling of training.

  When using a CTL, `ParameterServerStrategy` has to work in conjunction with a
  `tf.distribute.experimental.coordinator.ClusterCoordinator` object.

  When using `Model.fit`, currently only the
  `tf.keras.utils.experimental.DatasetCreator` input type is supported.

  __Example code for coordinator__

  This section provides code snippets that are intended to be run on the single
  task that is designated as the coordinator. Note that `cluster_resolver`,
  `variable_partitioner`, and `dataset_fn` arguments are explained in the
  following "Cluster setup", "Variable partitioning", and "Dataset preparation"
  sections.

  With a CTL,

  ```python
  # Prepare a strategy to use with the cluster and variable partitioning info.
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver=...,
      variable_partitioner=...)
  coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
      strategy=strategy)

  # Prepare a distribute dataset that will place datasets on the workers.
  distributed_dataset = coordinator.create_per_worker_dataset(dataset_fn=...)

  with strategy.scope():
    model = ...
    optimizer, metrics = ...  # Keras optimizer/metrics are great choices
    checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
    checkpoint_manager = tf.train.CheckpointManager(
        checkpoint, checkpoint_dir, max_to_keep=2)
    # `load_checkpoint` infers initial epoch from `optimizer.iterations`.
    initial_epoch = load_checkpoint(checkpoint_manager) or 0

  @tf.function
  def worker_fn(iterator):

    def replica_fn(inputs):
      batch_data, labels = inputs
      # calculate gradients, apply gradients, update metrics, etc.

    strategy.run(replica_fn, args=(next(iterator),))

  for epoch in range(initial_epoch, num_epoch):
    distributed_iterator = iter(distributed_dataset)  # Reset iterator state.
    for step in range(steps_per_epoch):

      # Asynchronously schedule the `worker_fn` to be executed on an arbitrary
      # worker. This call returns immediately.
      coordinator.schedule(worker_fn, args=(distributed_iterator,))

    # `join` blocks until all scheduled `worker_fn`s finish execution. Once it
    # returns, we can read the metrics and save checkpoints as needed.
    coordinator.join()
    logging.info('Metric result: %r', metrics.result())
    metrics.reset_states()
    checkpoint_manager.save()
  ```

  With `Model.fit`,

  ```python
  # Prepare a strategy to use with the cluster and variable partitioning info.
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver=...,
      variable_partitioner=...)

  # A dataset function takes an `input_context` and returns a `Dataset`
  def dataset_fn(input_context):
    dataset = tf.data.Dataset.from_tensors(...)
    return dataset.repeat().shard(...).batch(...).prefetch(...)

  # With `Model.fit`, a `DatasetCreator` needs to be used.
  input = tf.keras.utils.experimental.DatasetCreator(dataset_fn=...)

  with strategy.scope():
    model = ...  # Make sure the `Model` is created within scope.
  model.compile(optimizer="rmsprop", loss="mse", steps_per_execution=..., ...)

  # Optional callbacks to checkpoint the model, back up the progress, etc.
  callbacks = [tf.keras.callbacks.ModelCheckpoint(...), ...]

  # `steps_per_epoch` is required with `ParameterServerStrategy`.
  model.fit(input, epochs=..., steps_per_epoch=..., callbacks=callbacks)
  ```

  __Example code for worker and parameter servers__

  In addition to the coordinator, there should be tasks designated as
  "worker" or "ps". They should run the following code to start a TensorFlow
  server, waiting for the coordinator's requests:

  ```python
  # Provide a `tf.distribute.cluster_resolver.ClusterResolver` that serves
  # the cluster information. See below "Cluster setup" section.
  cluster_resolver = ...

  server = tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name=cluster_resolver.task_type,
      task_index=cluster_resolver.task_id,
      protocol="grpc")

  # Block the process that starts a server from exiting.
  server.join()
  ```

  __Cluster setup__

  In order for the tasks in the cluster to know each other's addresses,
  a `tf.distribute.cluster_resolver.ClusterResolver` is required to be used
  by the coordinator, workers, and parameter servers. The
  `tf.distribute.cluster_resolver.ClusterResolver` is responsible for providing
  the cluster information, as well as the task type and id of the current task.
  See `tf.distribute.cluster_resolver.ClusterResolver` for more information.

  If the `TF_CONFIG` environment variable is set, a
  `tf.distribute.cluster_resolver.TFConfigClusterResolver` should be used as
  well.

  Since there are assumptions in
  `tf.distribute.experimental.ParameterServerStrategy` around the naming of the
  task types, "chief", "ps", and "worker" should be used in the
  `tf.distribute.cluster_resolver.ClusterResolver` to refer to the coordinator,
  parameter servers, and workers, respectively.

  The following example demonstrates setting `TF_CONFIG` for the task designated
  as a parameter server (task type "ps") and index 1 (the second task), in a
  cluster with 1 chief, 2 parameter servers, and 3 workers. Note that it needs
  to be set before the use of
  `tf.distribute.cluster_resolver.TFConfigClusterResolver`.

  Example code for cluster setup:
  ```python
  os.environ['TF_CONFIG'] = '''
  {
    "cluster": {
      "chief": ["chief.example.com:2222"],
      "ps": ["ps0.example.com:2222", "ps1.example.com:2222"],
      "worker": ["worker0.example.com:2222", "worker1.example.com:2222",
                 "worker2.example.com:2222"]
    },
    "task": {
      "type": "ps",
      "index": 1
    }
  }
  '''
  ```

  If you prefer to run the same binary for all tasks, you will need to let the
  binary branch into different roles at the beginning of the program:
  ```python
  # If coordinator, create a strategy and start the training program.
  if cluster_resolver.task_type == 'chief':
    strategy = tf.distribute.experimental.ParameterServerStrategy(
        cluster_resolver)
    ...

  # If worker/ps, create a server
  elif cluster_resolver.task_type in ("worker", "ps"):
    server = tf.distribute.Server(...)
    ...
  ```
  Alternatively, you can also start a bunch of TensorFlow servers in advance and
  connect to them later. The coordinator can be in the same cluster or on any
  machine that has connectivity to workers and parameter servers. This is
  covered in our guide and tutorial.

  __Variable creation with `strategy.scope()`__

  `tf.distribute.experimental.ParameterServerStrategy` follows the
  `tf.distribute` API contract where variable creation is expected to be inside
  the context manager returned by `strategy.scope()`, in order to be correctly
  placed on parameter servers in a round-robin manner:

  ```python
  # In this example, we assume the cluster has 3 parameter servers.
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver=...)
  coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
      strategy=strategy)

  # Variables should be created inside scope to be placed on parameter servers.
  # If created outside scope such as `v1` here, it would be placed on the
  # coordinator.
  v1 = tf.Variable(initial_value=0.0)

  with strategy.scope():
    v2 = tf.Variable(initial_value=1.0)
    v3 = tf.Variable(initial_value=2.0)
    v4 = tf.Variable(initial_value=3.0)
    v5 = tf.Variable(initial_value=4.0)

  # v2 through v5 are created in scope and are distributed on parameter servers.
  # Default placement is round-robin but the order should not be relied on.
  assert v2.device == "/job:ps/replica:0/task:0/device:CPU:0"
  assert v3.device == "/job:ps/replica:0/task:1/device:CPU:0"
  assert v4.device == "/job:ps/replica:0/task:2/device:CPU:0"
  assert v5.device == "/job:ps/replica:0/task:0/device:CPU:0"
  ```

  See `distribute.Strategy.scope` for more information.

  __Variable partitioning__

  Having dedicated servers to store variables means being able to divide up, or
  "shard", the variables across the parameter servers. Partitioning a large
  variable among parameter servers is a commonly used technique to boost
  training throughput and mitigate memory constraints. It enables parallel
  computations and updates on different shards of a variable, and often yields
  better load balancing across parameter servers. Without sharding, models with
  large variables (e.g., embeddings) that can't fit into one machine's memory
  would be unable to train.

  With `tf.distribute.experimental.ParameterServerStrategy`, if a
  `variable_partitioner` is provided to `__init__` and certain conditions are
  satisfied, the resulting variables created in scope are sharded across the
  parameter servers, in a round-robin fashion. The variable reference returned
  from `tf.Variable` becomes a type that serves as the container of the sharded
  variables. One can access the `variables` attribute of this container to get
  the actual variable components. If building a model with `tf.Module` or Keras,
  the variable components are collected in the `variables`-like attributes.

  It is recommended to use size-based partitioners like
  `tf.distribute.experimental.partitioners.MinSizePartitioner` to avoid
  partitioning small variables, which could have a negative impact on model
  training speed.

  ```python
  # Partition the embedding layer into 2 shards.
  variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
      min_shard_bytes=(256 << 10),
      max_shards = 2))
  strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver=...,
    variable_partitioner = variable_partitioner)
  with strategy.scope():
    embedding = tf.keras.layers.Embedding(input_dim=1024, output_dim=1024)
  assert len(embedding.variables) == 2
  assert isinstance(embedding.variables[0], tf.Variable)
  assert isinstance(embedding.variables[1], tf.Variable)
  assert embedding.variables[0].shape == (512, 1024)
  assert embedding.variables[1].shape == (512, 1024)
  ```

  The sharded variable container can be converted to a `Tensor` via
  `tf.convert_to_tensor`. This means the container can be directly used in most
  Python Ops where such `Tensor` conversion automatically happens. For example,
  multiplying an input batch `x` by a sharded weight matrix `w`, as in
  `tf.matmul(x, w)`, would implicitly apply the said tensor conversion. Note
  that such conversion can be expensive, as the variable
  components need to be transferred from multiple parameter servers to where
  the value is used.
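
  For example, a small sketch continuing the embedding snippet above (it
  assumes the Keras `Embedding` layer exposes its sharded table as
  `embedding.embeddings`):

  ```python
  # Fetches both shards and concatenates them into one dense tensor, which can
  # be expensive for a large table.
  full_table = tf.convert_to_tensor(embedding.embeddings)
  assert full_table.shape == (1024, 1024)
  ```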

  `tf.nn.embedding_lookup` on the other hand doesn't apply the tensor
  conversion, and performs parallel lookups on the variable components instead.
  This is crucial to scale up embedding lookups when the embedding table
  variable is large.
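
  For instance, another small sketch under the same assumption as above (the
  Keras `Embedding` layer exposing its sharded table as `embedding.embeddings`):

  ```python
  ids = tf.constant([3, 51, 72])
  # Each id is looked up on the shard that holds it; the full table is never
  # gathered onto a single device.
  vectors = tf.nn.embedding_lookup(embedding.embeddings, ids)
  ```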

  When a partitioned variable is saved to a `SavedModel`, it will be saved as if
  it were one single variable. This improves serving efficiency by eliminating
  a number of ops that handle the partition aspects.

  Known limitations of variable partitioning:

  * Number of partitions must not change across Checkpoint saving/loading.

  * After saving partitioned variables to a SavedModel, the SavedModel can't be
    loaded via `tf.saved_model.load`.

  * Partitioned variables don't work directly with `tf.GradientTape`; please use
    the `variables` attribute to get the actual variable components and use
    them in gradient APIs instead, as sketched below.
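
  A minimal sketch of this workaround (it assumes `strategy` was constructed
  with a `variable_partitioner`, as in the partitioning example above, and that
  the partitioner actually splits `v` into more than one shard):

  ```python
  with strategy.scope():
    v = tf.Variable(tf.random.uniform(shape=(1024, 1024)))  # a ShardedVariable

  with tf.GradientTape() as tape:
    # Reading the sharded container records reads of every component variable.
    loss = tf.reduce_sum(tf.convert_to_tensor(v))

  # Differentiate with respect to the component variables, not the container.
  grads = tape.gradient(loss, v.variables)
  ```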

  __Dataset preparation__

  With `tf.distribute.experimental.ParameterServerStrategy`, a dataset is
  created on each of the workers to be used for training. This is done by
  creating a `dataset_fn` that takes no argument and returns a
  `tf.data.Dataset`, and passing the `dataset_fn` into
  `tf.distribute.experimental.coordinator.
  ClusterCoordinator.create_per_worker_dataset`. We recommend that the dataset
  be shuffled and repeated so that the examples run through the training as
  evenly as possible.

  ```python
  def dataset_fn():
    filenames = ...
    dataset = tf.data.Dataset.from_tensor_slices(filenames)

    # Dataset is recommended to be shuffled, and repeated.
    return dataset.shuffle(buffer_size=...).repeat().batch(batch_size=...)

  coordinator =
      tf.distribute.experimental.coordinator.ClusterCoordinator(strategy=...)
  distributed_dataset = coordinator.create_per_worker_dataset(dataset_fn)
  ```

  __Limitations__

  * `tf.distribute.experimental.ParameterServerStrategy` in TF2 is experimental,
  and the API is subject to further changes.

  * When using `Model.fit`, `tf.distribute.experimental.ParameterServerStrategy`
  must be used with a `tf.keras.utils.experimental.DatasetCreator`, and
  `steps_per_epoch` must be specified.
  """

  def __init__(self,
               cluster_resolver: base_cluster_resolver.ClusterResolver,
               variable_partitioner: sharded_variable.Partitioner = None):
    """Initializes the TF2 parameter server strategy.

    This initializes the `tf.distribute.experimental.ParameterServerStrategy`
    object to be ready for use with
    `tf.distribute.experimental.coordinator.ClusterCoordinator`.

    Args:
      cluster_resolver: a `tf.distribute.cluster_resolver.ClusterResolver`
        object.
      variable_partitioner:
        a `distribute.experimental.partitioners.Partitioner` that specifies
        how to partition variables. If `None`, variables will not be
        partitioned.

        * Predefined partitioners in `tf.distribute.experimental.partitioners`
        can be used for this argument. A commonly used partitioner is
        `MinSizePartitioner(min_shard_bytes = 256 << 10, max_shards = num_ps)`,
        which allocates at least 256K per shard, and each ps gets at most one
        shard.

        * `variable_partitioner` will be called for each variable created under
        strategy `scope` to instruct how the variable should be partitioned.
        Variables that have only one partition along the partitioning axis
        (i.e., no need for partition) will be created as a normal `tf.Variable`.

        * Only the first / outermost axis partitioning is supported.

        * Div partition strategy is used to partition variables. Assuming we
        assign consecutive integer ids along the first axis of a variable, then
        ids are assigned to shards in a contiguous manner, while attempting to
        keep each shard size identical. If the ids do not evenly divide the
        number of shards, each of the first several shards will be assigned one
        more id. For instance, a variable whose first dimension is 13 has 13
        ids, and they are split across 5 shards as:
        `[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10], [11, 12]]`.

        * Variables created under `strategy.extended.colocate_vars_with` will
        not be partitioned.
    """
    self._cluster_resolver = cluster_resolver

    self._verify_args_and_config(cluster_resolver)
    self._cluster_coordinator = None
    logging.info(
        "`tf.distribute.experimental.ParameterServerStrategy` is initialized "
        "with cluster_spec: %s", cluster_resolver.cluster_spec())

    if os.getenv("TF_PSS_ENABLE_COORDINATION_SERVICE"):
      self._configure_coordination_service(cluster_resolver.cluster_spec())
    self._connect_to_cluster(coordinator_name="chief")
    self._extended = ParameterServerStrategyV2Extended(self, cluster_resolver,
                                                       variable_partitioner)
    super(ParameterServerStrategyV2, self).__init__(self._extended)
    distribute_lib.distribution_strategy_gauge.get_cell("V2").set(
        "ParameterServerStrategy")
    self._should_use_with_coordinator = True
    # Used while constructing distributed iterators.
    self._canonicalize_devices = False
    # Used to check isinstance() without having to import this module.
    self._is_parameter_server_strategy_v2 = True

  def _configure_coordination_service(self,
                                      cluster_spec: server_lib.ClusterSpec):
    if context.context().coordination_service is None:
      coordinated_jobs = ["worker", "ps"]
      coordinated_job_config = []
      for job in coordinated_jobs:
        if job in cluster_spec.jobs:
          coordinated_job_config.append(
              coordination_config_pb2.CoordinatedJob(
                  name=job, num_tasks=cluster_spec.num_tasks(job)))
      context.context().configure_coordination_service(
          service_type="standalone",
          service_leader=multi_worker_util.coordination_leader(cluster_spec),
          coordinated_jobs=coordinated_job_config,
          heartbeat_timeout_in_ms=_HEARTBEAT_TIMEOUT_SECS * 1000,
          allow_new_incarnation_to_reconnect=True)

  def _connect_to_cluster(self, coordinator_name: str):
    if coordinator_name in ["worker", "ps"]:
      raise ValueError("coordinator name should not be 'worker' or 'ps'.")
    cluster_spec = self._cluster_resolver.cluster_spec()
    self._num_workers = len(cluster_spec.as_dict().get("worker", ()))
    self._num_ps = len(cluster_spec.as_dict().get("ps", ()))

    device_filters = server_lib.ClusterDeviceFilters()
    # For any worker, only the devices on ps and the coordinator are visible.
    for i in range(self._num_workers):
      device_filters.set_device_filters(
          "worker", i, ["/job:ps", "/job:%s" % coordinator_name])
    # Similarly, for any ps, only the devices on workers and the coordinator
    # are visible.
    for i in range(self._num_ps):
      device_filters.set_device_filters(
          "ps", i, ["/job:worker", "/job:%s" % coordinator_name])

    # Allow at most one outstanding RPC for each worker at a certain time. This
    # simplifies worker failure handling in the runtime.
    os.environ["TF_ENABLE_EAGER_CLIENT_STREAMING_ENQUEUE"] = "False"

    # Disable async executors to make context.async_wait a no-op.
    os.environ["TF_PS_DISABLE_ASYNC_EXECUTOR_GLOBALLY"] = "True"

    logging.info("%s is now connecting to cluster with cluster_spec: %r",
                 self.__class__.__name__, cluster_spec)
    remote.connect_to_cluster(
        cluster_spec,
        job_name=coordinator_name,
        protocol=self._cluster_resolver.rpc_layer,
        cluster_device_filters=device_filters)

    distribute_lib.distribution_strategy_replica_gauge.get_cell(
        "ps_strategy_num_workers").set(self._num_workers)
    distribute_lib.distribution_strategy_replica_gauge.get_cell(
        "ps_strategy_num_ps").set(self._num_ps)

    # Retry setting the server def to tolerate transient failures while the
    # remote tasks come up.
    context.context().set_server_def_retries(_SET_SERVER_DEF_RETRIES)
    context.ensure_initialized()

  def _verify_args_and_config(
      self, cluster_resolver: base_cluster_resolver.ClusterResolver):
    if not cluster_resolver.cluster_spec():
      raise ValueError("Cluster spec must be non-empty in "
                       "`tf.distribute.cluster_resolver.ClusterResolver`.")
    cluster_spec = cluster_resolver.cluster_spec()

    # The following checks that the task types are allowed (chief, ps, worker).
    multi_worker_util._validate_cluster_spec(  # pylint: disable=protected-access
        cluster_spec, cluster_resolver.task_type, cluster_resolver.task_id)

    if multi_worker_util.task_count(cluster_spec, "ps") < 1:
      raise ValueError("There must be at least one ps.")

    if multi_worker_util.task_count(cluster_spec, "worker") < 1:
      raise ValueError("There must be at least one worker.")


class ParameterServerStrategyV2Extended(
    parameter_server_strategy.ParameterServerStrategyExtended):
  """Extended class for ParameterServerStrategyV2.

  Please see `tf.distribute.StrategyExtended` doc for more information.
  """

  def __init__(self, container_strategy, cluster_resolver,
               variable_partitioner):
    """Initialization of ParameterServerStrategyV2Extended."""
    super(ParameterServerStrategyV2Extended, self).__init__(container_strategy)
    self._num_ps = len(cluster_resolver.cluster_spec().as_dict().get("ps", []))
    self._num_workers = len(
        cluster_resolver.cluster_spec().as_dict().get("worker", []))
    self._variable_count = 0

    self._variable_partitioner = variable_partitioner

    # The following two attributes are used to verify that
    # `ParameterServerStrategy` methods are properly used with a
    # `ClusterCoordinator`.
    self._used_with_coordinator = False
    self._being_scheduled = False
    self._set_num_gpus()
    distribute_lib.distribution_strategy_replica_gauge.get_cell(
        "num_gpus_per_worker").set(self._num_gpus_per_worker)

    # Don't canonicalize the devices here since this code is executed on the
    # coordinator, but we want the reduce evaluation to be done on each worker.
    # Placer will automatically choose the right device based on the current
    # context.
    self._cross_device_ops = cross_device_ops_lib.ReductionToOneDevice(
        reduce_to_device="/device:CPU:0")
    self._cross_device_ops._canonicalize_devices = False  # pylint: disable=protected-access
    self._allow_run_without_coordinator = False
    self._coordinator_creation_lock = threading.Lock()

  def _set_num_gpus(self):
    devices = config.list_logical_devices("GPU")
    per_worker_gpus = {}
    for d in devices:
      d_spec = tf_device.DeviceSpec.from_string(d.name)
      if d_spec.device_type == "GPU" and d_spec.job == "worker":
        # Assume every worker has the same number of GPUs.
        host_spec = d_spec.replace(device_type=None, device_index=None)
        per_worker_gpus[host_spec] = per_worker_gpus.get(host_spec, 0) + 1

    num_gpus = 0
    for _, count in per_worker_gpus.items():
      if num_gpus > 0 and count != num_gpus:
        raise ValueError("Mismatched number of GPUs per worker")
      num_gpus = count

    self._num_gpus_per_worker = num_gpus
    logging.info(f"Number of GPUs on workers: {self._num_gpus_per_worker}")

  @property
  def _num_replicas_in_sync(self):
    return self._num_gpus_per_worker or 1

  def _create_var_creator(self, next_creator, **kwargs):
    aggregation = kwargs.pop("aggregation", vs.VariableAggregation.NONE)

    def var_creator(**kwargs):
      """Create an AggregatingVariable."""
      # Create and wrap the variable.
      v = next_creator(**kwargs)
      wrapped_v = ps_values.CachingVariable(v)
      wrapped = ps_values.AggregatingVariable(self._container_strategy(),
                                              wrapped_v, aggregation)
      return wrapped

    if self._num_replicas_in_sync > 1:
      if aggregation not in (vs.VariableAggregation.NONE,
                             vs.VariableAggregation.SUM,
                             vs.VariableAggregation.MEAN,
                             vs.VariableAggregation.ONLY_FIRST_REPLICA):
        raise ValueError("Invalid variable aggregation mode: %s for "
                         "variable: %s" % (aggregation, kwargs["name"]))
      return var_creator
    else:

      def variable_creator_single_replica(**kwargs):
        v = next_creator(**kwargs)
        return ps_values.CachingVariable(v)

      return variable_creator_single_replica

  def _create_per_worker_variable(self, next_creator, **kwargs):
    """Create an unsynced, unaggregated variable on each worker."""
    return ps_values.PerWorkerVariable(
        self._container_strategy(), next_creator, **kwargs)

  def _create_variable(self, next_creator, **kwargs):
    """Implements StrategyExtendedV2._create_variable.

    Creates a `Variable` or a `ShardedVariable`. A `ShardedVariable` will be
    created if satisfying all the following criteria:
      1. `self._variable_partitioner` results in more than one partition on the
         first axis.
      2. variable's rank is greater than 0.
      3. variable is not colocated with another variable.
    Otherwise a `Variable` will be created.

    Args:
      next_creator: See `variable_scope.variable_creator_scope`; the next
        creator in the chain.
      **kwargs: Passed through to the next creator.

    Returns:
      A `Variable` or `ShardedVariable`.
    """
    if kwargs.pop("per_worker_variable", False):
      logging.info("Creating per worker variable")
      return self._create_per_worker_variable(next_creator, **kwargs)

    var_creator = self._create_var_creator(next_creator, **kwargs)
    if "colocate_with" in kwargs:  # Never partition colocated_with variables.
      colocate_with = kwargs["colocate_with"]
      # Clear the device scope to avoid possible conflicts between device
      # scope and colocation scope.
      with ops.device(None):
        with ops.colocate_with(colocate_with):
          var = var_creator(**kwargs)
          logging.debug(
              "Creating variable (name:%s, shape:%r) that colocates with %s",
              var.name, var.shape, kwargs["colocate_with"].name)
          return var

    if self._variable_partitioner is None:
      return self._create_variable_round_robin(var_creator, **kwargs)

    name = kwargs.get("name", None)
    dtype = kwargs.get("dtype", None)
    shape = kwargs.get("shape", None)
    initial_value = kwargs.get("initial_value", None)
    if initial_value is None:
      # If we are loading, next_creator will return an UninitializedVariable.
      v = next_creator(**kwargs)
      if not isinstance(v, resource_variable_ops.UninitializedVariable):
        raise ValueError(
            "It looks like you are using `ParameterServerStrategy` with a "
            "`variable_partitioner`, and trying to create a variable without "
            "specifying `initial_value`. This is not allowed. Please specify "
            "the `initial_value`.")
      elif shape is None or dtype is None:
        raise ValueError(
            "It looks like you are trying to load a `SavedModel` using "
            "`tf.saved_model.load` within a `ParameterServerStrategy` scope, "
            "but the `SavedModel` is missing shape or dtype information.")
      else:

        def initializer(shape, dtype, **kwargs):
          if "partition_shape" in kwargs:
            shape = kwargs["partition_shape"]
          return array_ops.zeros(shape, dtype)

        initial_value = functools.partial(initializer, shape=shape, dtype=dtype)

    # Two cases where initial_value can be a callable:
    #   1. initial_value is passed as a callable, e.g., an `initializer` class.
    #   2. restoring from checkpoint, initial_value is a
    #      "CheckpointInitialValueCallable".
    init_from_fn = callable(initial_value)

    if init_from_fn and (shape is None or dtype is None):
      init_from_fn = False
      initial_value = initial_value()
    if not init_from_fn:
      # The initial_value is created on the coordinator and needs to be sent to
      # ps for variable initialization, which can be inefficient and can
      # potentially hit the 2GB limit on protobuf serialization.
      initial_value = ops.convert_to_tensor(initial_value, dtype=dtype)
      dtype = initial_value.dtype
      shape = initial_value.shape
    else:
      shape = tensor_shape.as_shape(shape)

    if shape.rank == 0:  # Skip partitioning rank-0 variables.
      return self._create_variable_round_robin(var_creator, **kwargs)

    num_partitions = self._variable_partitioner(shape=shape, dtype=dtype)
    if not num_partitions or num_partitions[0] == 0 or any(
        v != 1 for v in num_partitions[1:]):
      raise ValueError(
          "variable_partitioner must return a list/tuple whose elements are 1"
          " besides the first element (non-zero), got: %r" % num_partitions)

    if num_partitions[0] == 1:  # no partition
      return self._create_variable_round_robin(var_creator, **kwargs)

    # Use "div" partition strategy to partition the variable.
    num_partitions = min(num_partitions[0], shape[0])
    base = shape[0] // num_partitions
    extra = shape[0] % num_partitions
    # An example: num_partitions=4, shape[0]=10, partitions: [3, 3, 2, 2]
    # offsets: [0, 3, 6, 8, 10]
    offsets = []
    for i in range(num_partitions):
      if i == 0:
        offsets.append(0)
      else:
        prev_shard_size = base + (1 if i - 1 < extra else 0)
        offsets.append(offsets[i - 1] + prev_shard_size)
    offsets.append(shape[0])

    def init_shard_fn(shard_index):
      if not init_from_fn:
        logging.log_if(
            logging.WARN, _INEFFICIENT_INIT_WARNING % name, shard_index == 0 and
            shape.num_elements() > _LARGE_VARIABLE_NUM_ELEMENTS)
        return initial_value[offsets[shard_index]:offsets[shard_index + 1]]
      partition_shape = (offsets[shard_index + 1] -
                         offsets[shard_index],) + shape[1:]
      partition_offset = (offsets[shard_index],) + (0,) * len(shape[1:])
      arg_spec = tf_inspect.getfullargspec(initial_value)
      if ("shard_info" not in arg_spec.args and
          "shard_info" not in arg_spec.kwonlyargs):
        try:
          value = initial_value(
              partition_shape=partition_shape,
              partition_offset=partition_offset)
        except (TypeError, ValueError):
          # TypeError: the initializer doesn't accept kwargs.
          # ValueError: the initializer doesn't accept partition kwargs.
          # In both cases we go ahead creating the full value and then slice.
          value = initial_value()

        if value.shape == partition_shape:
          # Initializer supports partition: value is the partition value.
          return value
        else:
          # Initializer doesn't support partition: value is the full value and
          # needs to be sliced to get the partition value.
          logging.log_if(
              logging.WARN, _INEFFICIENT_INIT_WARNING % name,
              shard_index == 0 and
              shape.num_elements() > _LARGE_VARIABLE_NUM_ELEMENTS)
          return value[offsets[shard_index]:offsets[shard_index + 1]]
      else:
        # For compatibility with `CheckpointInitialValueCallable`.
        return initial_value(
            shard_info=trackable.ShardInfo(
                shape=tensor_shape.as_shape(partition_shape),
                offset=partition_offset))

    var_list = []
    for i in range(num_partitions):
      kwargs["shape"] = (offsets[i + 1] - offsets[i],) + shape[1:]
      kwargs["initial_value"] = lambda i=i: init_shard_fn(i)
      kwargs["name"] = "{}/part_{}".format(name, i)
      var_list.append(self._create_variable_round_robin(var_creator, **kwargs))

    result = sharded_variable.ShardedVariable(var_list)
    return result

  def _create_variable_round_robin(self, next_creator, **kwargs):
    # Clear the colocation scope to avoid possible conflicts between device
    # scope and colocation scope.
    with ops.colocate_with(None, ignore_existing=True):
      # Explicitly set CPU:0 device for ps in case create variable is called
      # inside replica_fn and the worker has a GPU:0 scope.
      with ops.device("/job:ps/task:%d/device:CPU:0" %
                      (self._variable_count % self._num_ps)):
        var = next_creator(**kwargs)
        log_method = (
            logging.info
            if os.getenv("TF_PSS_VERBOSE_VARIABLE_PLACEMENT") else
            logging.debug)
        log_method(
            "Creating variable (name:%s, shape:%r) on "
            "/job:ps/task:%d/device:CPU:0", var.name, var.shape,
            self._variable_count % self._num_ps)
        self._variable_count += 1
        return var

  def _resource_creator_scope(self):
    with self._coordinator_creation_lock:
      if not self._container_strategy()._cluster_coordinator:  # pylint: disable=protected-access
        cluster_coordinator.ClusterCoordinator(
            strategy=self._container_strategy())

    def lookup_creator(next_creator, *args, **kwargs):
      if load_context.in_load_context():
        return ps_values.RestoredDistributedTable(
            self._container_strategy(), lambda: next_creator(*args, **kwargs))
      else:
        return ps_values.DistributedTable(
            self._container_strategy(), lambda: next_creator(*args, **kwargs))

    def restored_lookup_creator(next_creator, *args, **kwargs):
      return ps_values.RestoredDistributedTable(
          self._container_strategy(), lambda: next_creator(*args, **kwargs))

    return [
        ops.resource_creator_scope("StaticHashTable", lookup_creator),
        ops.resource_creator_scope("RestoredStaticHashTable",
                                   restored_lookup_creator)
    ]

  def _assert_used_with_cluster_coordinator(self):
    if (not self._used_with_coordinator and
        not self._allow_run_without_coordinator):
      raise NotImplementedError(
          "`tf.distribute.experimental.ParameterServerStrategy` must be used "
          "with `tf.distribute.experimental.coordinator.ClusterCoordinator` in "
          "a custom training loop. If you are using `Model.fit`, please supply "
          "a dataset function directly to a "
          "`tf.keras.utils.experimental.DatasetCreator` instead.")

  def _assert_being_scheduled_by_cluster_coordinator(self):
    if not self._being_scheduled and not self._allow_run_without_coordinator:
      logging.warning(
          "A `tf.distribute.experimental.ParameterServerStrategy` method is "
          "invoked without using `ClusterCoordinator.schedule`. If you are not "
          "tracing a tf.function, this method is possibly executed on the "
          "coordinator, which can be slow. To properly dispatch functions to "
          "run on workers, methods like `run` or `reduce` should be used "
          "within a function passed to `tf.distribute.experimental.coordinator"
          ".ClusterCoordinator.schedule`.")

  # `options` is not used right now, but we may want to support it while
  # creating InputWorkers in the future, similar to MirroredStrategy.
  def _input_workers_with_options(self, options=None):
    input_workers_devices = (("/device:CPU:0", self.worker_devices),)
    return input_lib.InputWorkers(
        input_workers_devices, canonicalize_devices=False)

  def _experimental_distribute_dataset(self, dataset, options):
    input_workers_devices = self._input_workers_with_options()

    # If this DistributedDataset is created outside ClusterCoordinator, i.e.,
    # outside a tf.function, don't build its underlying datasets immediately
    # until it is passed to ClusterCoordinator.create_per_worker_dataset.
    return input_util.get_distributed_dataset(
        dataset,
        input_workers_devices,
        self._container_strategy(),
        num_replicas_in_sync=self._num_replicas_in_sync,
        options=options,
        build=ops.inside_function())  # will be built by ClusterCoordinator

  def _distribute_datasets_from_function(self, dataset_fn, options):
    # There is no synchronization beyond a worker, so the number of input
    # pipelines in sync is only 1 per worker.
    input_pipeline_id_in_sync = 0
    num_input_pipelines_in_sync = 1

    input_context = distribute_lib.InputContext(
        num_input_pipelines=num_input_pipelines_in_sync,
        input_pipeline_id=input_pipeline_id_in_sync,
        num_replicas_in_sync=self._num_replicas_in_sync)

    # If this DistributedDatasetsFromFunction is created outside
    # ClusterCoordinator, i.e., outside a tf.function, don't build its
    # underlying datasets immediately until it is passed to
    # ClusterCoordinator.create_per_worker_dataset.
    return input_util.get_distributed_datasets_from_function(
        dataset_fn,
        self._input_workers_with_options(options),
        [input_context],
        self._container_strategy(),
        options=options,
        build=ops.inside_function())  # will be built by ClusterCoordinator

  @property
  def worker_devices(self):
    num_gpus = self._num_gpus_per_worker
    if num_gpus > 0:
      compute_devices = tuple("/device:GPU:%d" % (i,) for i in range(num_gpus))
    else:
      compute_devices = ("/device:CPU:0",)
    return compute_devices

  def _call_for_each_replica(self, fn, args, kwargs):
    self._assert_being_scheduled_by_cluster_coordinator()

    return mirrored_run.call_for_each_replica(self._container_strategy(), fn,
                                              args, kwargs)

  def _reduce(self, reduce_op, value):
    self._assert_being_scheduled_by_cluster_coordinator()
    dst = device_util.current() or self._default_device or "/device:CPU:0"
    destinations = device_util.canonicalize_without_job_and_task(dst)
    result = self._local_results(
        self.reduce_to(reduce_op, value, destinations))[0]
    return result

  def _reduce_to(self, reduce_op, value, destinations, options):
    self._assert_being_scheduled_by_cluster_coordinator()

    def get_values(x):
      if isinstance(x, values.DistributedValues):
        return self._cross_device_ops.reduce(
            reduce_op, x, destinations=destinations)
      return x

    return nest.map_structure(get_values, value)


# The warning that will be logged if the way we initialize sharded variables
# is memory-inefficient.
_INEFFICIENT_INIT_WARNING = (
    "Large variable %s is partitioned but not initialized in a "
    "memory-efficient way. On each shard, the full value is first being "
    "created and then sliced into smaller values. To reduce the memory "
    "footprint, explicitly specify `dtype` and `shape` when creating "
    "variables, and use `tf.initializers` to initialize the variable. "
    "Note that some initializers (e.g., orthogonal) don't support "
    "memory-efficient initialization and there is not much you can do here.")

_LARGE_VARIABLE_NUM_ELEMENTS = 1e9