
    BVh7                     
   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ ddl
mZ dd	l
mZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ  G d de	j>                        Z  G d de       Z! eddg        G d dejD                               Z# G d dejH                        Z% G d d e&      Z'd! Z( G d" d#ejR                        Z*d$ Z+ G d% d&e'      Z,d' Z- G d( d)e#      Z.y)*zImportant value classes relevant to `ClusterCoordinator`.

This is currently under development and the API is subject to change.
    N)dataset_ops)ExternalStatePolicy)	input_lib)remote_value)context)def_function)function)composite_tensor)errors)ops)	type_spec)	array_ops)gen_dataset_ops)gen_experimental_dataset_ops)variable_scope)nest)	tf_exportc                   R    e Zd ZdZd Zd Zd Zd Zd Zd Z	d Z
d	 Zd
 Zd Zd Zy)RemoteValueImplz Implementation of `RemoteValue`.c                     || _         || _        d| _        d| _        t	        j
                         | _        d| _        d| _        t	        j                         | _
        t        j                  j                  | _        y)zInitializes a `RemoteValueImpl`.

    Args:
      closure: The closure from which the `RemoteValue` is created.
      type_spec: The type spec for this `RemoteValue` which is used to trace
        functions that take this `RemoteValue` as input.
    NF)_closure
_type_spec_values_has_fetched_to_local	threadingLock_has_fetched_to_local_lock_fetched_tensors_errorEvent_status_available_eventr   RemoteValueStatus	NOT_READY_status)selfclosurer   s      _/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/coordinator/values.py__init__zRemoteValueImpl.__init__.   sa     DMDODL!&D&/nn&6D# DDK#,??#4D 11;;DL    c                     t         j                  j                  | _        d | _        || _        | j                  j                          y N)r   r"   ABORTEDr$   r   r   r!   setr%   errors     r'   _set_abortedzRemoteValueImpl._set_aborted@   s7    1199DLDLDK 	  $$&r)   c                 n    | j                   j                          | j                  j                  |       y r+   )r!   clearr   
execute_on)r%   workers     r'   _rebuild_onzRemoteValueImpl._rebuild_onH   s&      &&(MMV$r)   c                     t         j                  j                  | _        || _        d | _        | j                  j                          y r+   r   r"   READYr$   r   r   r!   r-   )r%   tensorss     r'   _set_valueszRemoteValueImpl._set_valuesM   s5    1177DLDLDK  $$&r)   c                     t         j                  j                  | _        d | _        || _        | j                  j                          y r+   r7   r.   s     r'   
_set_errorzRemoteValueImpl._set_errorS   s5    1177DLDLDK  $$&r)   c                 N    | j                   j                          | j                  S r+   )r!   waitr   r%   s    r'   _get_valueszRemoteValueImpl._get_valuesY   s      %%'<<r)   c                 N    | j                   j                          | j                  S r+   )r!   r>   r   r?   s    r'   
_get_errorzRemoteValueImpl._get_error]   s      %%';;r)   c                     | j                   j                          | j                  t        j                  j
                  u rt        j                  d d d      | j                  | j                  y )NzFThe corresponding function is aborted. Please reschedule the function.)	r!   r>   r$   r   r"   r,   r   CancelledErrorr   r?   s    r'   _wait_and_maybe_errorz%RemoteValueImpl._wait_and_maybe_errora   sd      %%'|||55===!!
  {{KK r)   c                 L    t        j                  d | j                               S )Nc                 >    t        | d      r| j                         S | S )Nnumpy)hasattrrH   )xs    r'   <lambda>z'RemoteValueImpl.fetch.<locals>.<lambda>o   s    wq'2!'')  r)   )r   map_structuregetr?   s    r'   fetchzRemoteValueImpl.fetchk   s$     9488:G Gr)   c                 h    d }d }| j                    t        j                  || j                         }|S )Nc                     t        | t        j                        r| S t        j                  dt        j                         j                  z        5  t        j                  |       cddd       S # 1 sw Y   yxY w)z,Copy a remote tensor to local (coordinator).z/job:%sN)

isinstancer   DistributedIteratorr   devicer   get_server_defjob_namer   identity)composite_tensor_objs    r'   copy_tensorz3RemoteValueImpl._copy_to_local.<locals>.copy_tensorr   s_    	()*G*G	H $#::i'"8"8":"C"CCD 8!!"678 8 8s   A00A9)r   r   rL   )r%   rX   fetched_results      r'   _copy_to_localzRemoteValueImpl._copy_to_localq   s6    	8 N|| ))+t||Dnr)   c                     | j                          | j                  5  | j                  s| j                         | _        d| _        d d d        | j                  S # 1 sw Y   | j                  S xY w)NT)rE   r   r   rZ   r   r?   s    r'   rM   zRemoteValueImpl.get   se     		(	( *'' $ 3 3 5%)"*
    *
    s   )AA.N)__name__
__module____qualname____doc__r(   r0   r5   r:   r<   r@   rB   rE   rN   rZ   rM    r)   r'   r   r   +   s=    (<$'%
''G&!r)   r   c                       e Zd ZdZd Zy)RemoteVariablez<A RemoteValue that represents a mutable per-worker variable.c                 B    | j                          | j                         S )zERetrieve value with no caching to ensure we get the up-to-date value.)rE   rZ   r?   s    r'   rM   zRemoteVariable.get   s       r)   N)r\   r]   r^   r_   rM   r`   r)   r'   rb   rb      s
    D!r)   rb   z3distribute.experimental.coordinator.PerWorkerValuesz%distribute.coordinator.PerWorkerValue)v1c                   &    e Zd ZdZd Zed        Zy)PerWorkerValuesa9  A container that holds a list of values, one value per worker.

  `tf.distribute.experimental.coordinator.PerWorkerValues` contains a collection
  of values, where each of the values is located on its corresponding worker,
  and upon being used as one of the `args` or `kwargs` of
  `tf.distribute.experimental.coordinator.ClusterCoordinator.schedule()`, the
  value specific to a worker will be passed into the function being executed at
  that corresponding worker.

  Currently, the only supported path to create an object of
  `tf.distribute.experimental.coordinator.PerWorkerValues` is through calling
  `iter` on a `ClusterCoordinator.create_per_worker_dataset`-returned
  distributed dataset instance. The mechanism to create a custom
  `tf.distribute.experimental.coordinator.PerWorkerValues` is not yet supported.
  c                 |    |D ]'  }t        |t        j                        rt        d       t	        |      | _        y )Nz2`PerWorkerValues` should only take `RemoteValue`s.)rQ   r   RemoteValueAssertionErrortupler   )r%   valuesvs      r'   r(   zPerWorkerValues.__init__   sB     B<334@B 	BB =DLr)   c                 Z    t        | j                  d   j                  t        |             S )Nr   )PerWorkerValuesTypeSpecr   r   typer?   s    r'   r   zPerWorkerValues._type_spec   s'    "Q""T
 r)   N)r\   r]   r^   r_   r(   propertyr   r`   r)   r'   rf   rf      s      !  r)   rf   c                   N    e Zd ZdZd Zd Zed        Zd Zed        Z	d Z
d Zy	)
rn   z_TypeSpec for PerWorkerValues.

  It only support tracing a function using a PerWorkerValues.
  c                 (    |sJ || _         || _        y r+   )_value_spec_descendant_type)r%   
value_specdescendant_types      r'   r(   z PerWorkerValuesTypeSpec.__init__   s    :!D+Dr)   c                     | j                   fS r+   rs   r?   s    r'   
_serializez"PerWorkerValuesTypeSpec._serialize   s    r)   c                     | j                   S r+   )rt   r?   s    r'   
value_typez"PerWorkerValuesTypeSpec.value_type   s       r)   c                     t        d      )Nz1most_specific_common_supertype is not implementedNotImplementedError)r%   otherss     r'   most_specific_common_supertypez6PerWorkerValuesTypeSpec.most_specific_common_supertype   s    
;= =r)   c                     | j                   S r+   rx   r?   s    r'   _component_specsz(PerWorkerValuesTypeSpec._component_specs   s    r)   c                     | j                   S r+   rx   r%   values     r'   _to_componentsz&PerWorkerValuesTypeSpec._to_components   s    r)   c                     |S r+   r`   r   s     r'   _from_componentsz(PerWorkerValuesTypeSpec._from_components   s    Lr)   N)r\   r]   r^   r_   r(   ry   rp   r{   r   r   r   r   r`   r)   r'   rn   rn      sH    
,
 ! !=  r)   rn   c                   2    e Zd ZdZd Zd Zd Zed        Zy)#PerWorkerDatasetFromDatasetFunctionzERepresents worker-distributed datasets created from dataset function.c                    d }t        |t        j                        r/t        j                  |      5  |j                         }ddd       n[t        |t        j                        sAt        j                  |      5  t        j                  |      j                         }ddd       || _	        || _
        d| _        y# 1 sw Y   xY w# 1 sw Y   +xY w)zMakes an iterable from datasets created by the given function.

    Args:
      dataset_fn: A function that returns a `Dataset`.
      coordinator: a `ClusterCoordinator` object, used to create dataset
        resources.
    c                     t        d      )Nz2Creating variables in `dataset_fn` is not allowed.)
ValueError)next_creatorkwargss     r'   disallow_variable_creationzPPerWorkerDatasetFromDatasetFunction.__init__.<locals>.disallow_variable_creation   s    KLLr)   N)rQ   r   Functionr   variable_creator_scopeget_concrete_functiontf_functionConcreteFunctionr	   _dataset_fn_coordinator_element_spec)r%   
dataset_fncoordinatorr   s       r'   r(   z,PerWorkerDatasetFromDatasetFunction.__init__   s    M *l334001KL 8557
8 8
K$@$@A001KL O!**:6LLN
O!D#DD8 8O Os   B><$C
>C
Cc                       fd} j                   j                  |      } j                  j                  j                  }|j
                  D ]	  }||_         |S )zTrigger dataset creation on workers without creating an iterator.

    Returns:
      A PerWorkerValues object containing a tuple of RemoteValues, themselves
      containing the built Dataset for each worker
    c                  (    j                         } | S r+   )r   datasetr%   s    r'   _create_per_worker_datasetzMPerWorkerDatasetFromDatasetFunction.build.<locals>._create_per_worker_dataset   s      "gnr)   )r   _create_per_worker_resourcesr   structured_outputsr   r   )r%   r   per_worker_datasetdataset_fn_output_type_specdataset_remote_values   `    r'   buildz)PerWorkerDatasetFromDatasetFunction.build   se    
 **GG"$ #'"2"2"E"E"P"P 2 : : D(C%Dr)   c                     t        j                         rt        j                         j                  rt        d       fd} j                  j                  |      }|j                  D ]E  }t        j                   j                  j                   j                  j                        |_        G t        |j                        S )NzC__iter__() is not supported inside of tf.function or in graph mode.c                  :    j                         } t        |       S r+   )r   iterr   s    r'   _create_per_worker_iteratorzQPerWorkerDatasetFromDatasetFunction.__iter__.<locals>._create_per_worker_iterator  s      "g']r)   )r   executing_eagerlyr   get_default_graphbuilding_functionRuntimeErrorr   r   r   r   get_iterator_spec_from_datasetstrategyr   r   r   PerWorkerDistributedIterator)r%   r   per_worker_iteratoriterator_remote_values   `   r'   __iter__z,PerWorkerDatasetFromDatasetFunction.__iter__  s     %%'11
OQ Q ++HH#%
 "5!<!< P

2
2(($*:*:*M*MO &P
 ((;(C(CDDr)   c                     t        | j                  t        j                        st	        d      | j                  j
                  j                  S )zrThe type specification of an element of this dataset.

    This property is subject to change without notice.
    zR`element_spec` is not supported when the `dataset_fn` is not a `ConcreteFunction`.)rQ   r   r   r   r~   r   element_specr?   s    r'   r   z0PerWorkerDatasetFromDatasetFunction.element_spec"  sF     d&&(D(DE"# # ..;;;r)   N)	r\   r]   r^   r_   r(   r   r   rp   r   r`   r)   r'   r   r      s)    M,(E6 	< 	<r)   r   c                     | j                         } t        j                  | j                  t        j
                  j                  d      }|S )NT)external_state_policystrip_device_assignment)_apply_debug_optionsr   dataset_to_graph_v2_variant_tensorr   WARNr   )r   	graph_defs     r'   serialize_dataset_to_graphr   /  sC    ((*'11/44::"$) 
r)   c                   2     e Zd ZdZ fdZed        Z xZS )_RemoteDatasetz$Creates a dataset given a graph def.c                 d    || _         t        j                  |      }t        t        |   |       y r+   )
_elem_specged_opsdataset_from_graphsuperr   r(   )r%   r   r   variant_tensor	__class__s       r'   r(   z_RemoteDataset.__init__;  s)    "DO//	:N	.$(8r)   c                     | j                   S r+   )r   r?   s    r'   r   z_RemoteDataset.element_spec@  s    ??r)   )r\   r]   r^   r_   r(   rp   r   __classcell__r   s   @r'   r   r   8  s    ,9
  r)   r   c                     t        | |      S r+   )r   )r   r   s     r'   deserialize_dataset_from_graphr   E  s    		<	00r)   c                   "     e Zd ZdZ fdZ xZS )PerWorkerDatasetFromDatasetz>Represents worker-distributed datasets created from a dataset.c                 T   t        t        j                        rj                  t	              fd}nWt        t        j
                        rfd}n7t        t        j                        rt	              fd}nt        d      t        t        | /  ||       y)ah  Makes an iterable from datasets created by the given dataset.

    It creates a dataset_fn which deserializes a dataset from a graph under the
    hood.

    Args:
      dataset: A tf.data.Dataset, a DistributedDataset or a
        DistributedDatasetsFromFunction
      coordinator: a `ClusterCoordinator` object, used to create dataset
        resources.
    c                  X    t        j                        } j                  |        S )N)dataset_to_replace)r   r   r   )deserializedr   original_dataset
serializeds    r'   r   z8PerWorkerDatasetFromDataset.__init__.<locals>.dataset_fn\  s,    5(5576r)   c                  (     j                           S r+   )r   )r   s   r'   r   z8PerWorkerDatasetFromDataset.__init__.<locals>.dataset_fnb  s    r)   c                  0    t         j                        S r+   )r   r   )r   r   s   r'   r   z8PerWorkerDatasetFromDataset.__init__.<locals>.dataset_fnh  s    -j':N:NOOr)   zUnexpected dataset type!N)rQ   r   DistributedDataset_original_datasetr   DistributedDatasetsFromFunctionr   Datasetr   r   r   r(   )r%   r   r   r   r   r   r   s    `  @@r'   r(   z$PerWorkerDatasetFromDataset.__init__L  s     '9778 22-.>?j
 
GYFF	G 
G[00	1-g6jP 122	
%t5j+Nr)   )r\   r]   r^   r_   r(   r   r   s   @r'   r   r   I  s    F!O !Or)   r   c                 H    t        |       rt        | |      S t        | |      S )zBReturns a per-worker dataset from a dataset or a dataset function.)callabler   r   )dataset_or_dataset_fnr   s     r'   get_per_worker_datasetr   p  s.    #$./D/:< < ''<kJJr)   c                       e Zd ZdZd ZddZy)r   z.Distributed iterator for `ClusterCoordinator`.c                 "    | j                         S r+   )get_nextr?   s    r'   __next__z%PerWorkerDistributedIterator.__next__|  s    ==?r)   Nc                     t        d      )z:Returns the next input from the iterator for all replicas.zHIterating over an `AsyncDistributedIterator` is not supported right now.r}   )r%   names     r'   r   z%PerWorkerDistributedIterator.get_next  s    
 < = =r)   r+   )r\   r]   r^   r_   r   r   r`   r)   r'   r   r   y  s    6=r)   r   )/r_   r   tensorflow.python.data.opsr   "tensorflow.python.data.ops.optionsr   tensorflow.python.distributer   (tensorflow.python.distribute.coordinatorr   tensorflow.python.eagerr   r   r	   r   tensorflow.python.frameworkr
   r   r   r   type_spec_libtensorflow.python.opsr   r   r   r   r   tensorflow.python.utilr    tensorflow.python.util.tf_exportr   rh   r   rb   CompositeTensorrf   TypeSpecrn   objectr   r   DatasetSourcer   r   r   r   r   r`   r)   r'   <module>r      s   
  2 B 2 A + 0 ; 8 . + B + 1 I 0 ' 6
a!l.. a!H!_ ! @2r;&66 ;>m44 BR<& R<j
[.. 
1$O"E $ONK	=? 	=r)   