
    AVh4                    `   d Z ddlZddlZddlZddlZddlmZ ddlmZ ddlm	Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ  ddlm!Z! ddlm"Z" ddlm#Z# ddlm$Z$ ddlm%Z% ddlm&Z& ddlm'Z' ddl(m)Z) ddl(m*Z+ ddl(m,Z, dd l(m-Z- dd!l.m/Z/ dd"l0m1Z2 ddl3mZ4 dd#l5m6Z6 dd$l7m8Z8  ejr                  d% ejt                  d&d'd()      d*d+d,      Z; ejr                  d- ejt                  d&d'd()      d.d+d,      Z<d/ Z= G d0 d1e>      Z?d2 Z@d3 ZA G d4 d5e8j                  e4j                        ZD G d6 d7e'j                        ZF G d8 d9eF      ZG G d: d;eDej                        ZI G d< d=e8j                  e4j                        ZL G d> d?eF      ZM G d@ dAeLej                        ZN G dB dCeF      ZO G dD dEeLej                        ZPdF ZQdG ZR G dH dIe>      ZS G dJ dKe'j                        ZT G dL dMeSej                        ZU	 	 d^dNZVdO ZWdP ZXdQ ZYdR ZZdS Z[ G dT dUe>      Z\dV Z]dW Z^dX Z_dY Z`dZ Zad[ Zbd\ Zcd] Zdej                  j                  eIed       ej                  j                  eNed       y)_z0Various classes representing distributed inputs.    N)py_builtins)batching)cardinality)
distribute)dataset_ops)iterator_ops)multi_device_iterator_ops)optional_ops)device_util)distribute_lib)distribute_utils)	input_ops)reduce_util)values)InputReplicationMode)context)
monitoring)composite_tensor)device)dtypes)errors)ops)sparse_tensor)tensor_shape)tensor_util)	type_spec)	array_ops)cond)math_ops)
while_loop)ragged_tensor)
tf_logging)nest)collections_abczZ/tensorflow/api/distribution_strategy/distributed_dataset_initialization_time_milliseconds         )scalegrowth_factorbucket_countzDTrack the time (in milliseconds) to initialize distributed datasets.strategyworkerszh/tensorflow/api/distribution_strategy/distributed_dataset_from_function_initialization_time_millisecondszRTrack the time (in milliseconds) to initialize distributed datasets from function.c                    |j                   }t        |j                  t        t        f      rMt        | j                  j                         || j                  j                         d|j                  d      }|S | j                  j                  r,t        j                  | j                  j                   d       t        j                  |      }|S )a  Returns an iterator spec from dataset function.

  This function constructs type spec for iterator obtained from
  iter(dataset).

  Args:
    strategy: a `tf.distribute.Strategy` object, used to run all-reduce to
        handle last partial batch.
    dataset: A tf.data.Dataset instance. If using a function that returns a
      tf.data.Dataset instance, pass dataset_fn.structured_outputs.

  Returns:
    A type_spec for iterator for dataset instance.

  NT)optionsr   enable_get_next_as_optionalz GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources)element_spec
isinstance
_type_specDistributedDatasetSpec#DistributedDatasetsFromFunctionSpecDistributedIteratorSpecextended_input_workers_with_options_container_strategyr   _num_gpus_per_workerloggingwarningr   IteratorSpec)r+   datasetoutput_element_speciterator_type_specs       V/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/input_lib.pyget_iterator_spec_from_datasetrA   N   s    "  ,,""'467 1557--/''$(*" 
 --oo334 5  &223FG	    c                   R    e Zd ZdZ	 d
dZed        Zed        Zd Zd Z	d Z
d Zy	)InputWorkerszAA 1-to-many mapping from input worker devices to compute devices.c                     || _         t        d | j                   D              | _        || _        |r"t        d | j                   D              | _        yt        d | j                   D              | _        y)aj  Initialize an `InputWorkers` object.

    Args:
      worker_device_pairs: A sequence of pairs: `(input device, a tuple of
        compute devices fed by that input device)`.
      canonicalize_devices: Whether to canonicalize devices for workers fully or
        partially. If False, it will partially canonicalize devices by removing
        job and task.
    c              3   &   K   | ]	  \  }}|  y wN ).0d_s      r@   	<genexpr>z(InputWorkers.__init__.<locals>.<genexpr>   s     &OTQq&Os   c              3   F   K   | ]  \  }}t        d  |D                yw)c              3   F   K   | ]  }t        j                  |        y wrG   r   canonicalizerI   rJ   s     r@   rL   z2InputWorkers.__init__.<locals>.<genexpr>.<genexpr>   s#       ((+    !NtuplerI   rK   fs      r@   rL   z(InputWorkers.__init__.<locals>.<genexpr>   0       1 a    1rR   c              3   F   K   | ]  \  }}t        d  |D                yw)c              3   F   K   | ]  }t        j                  |        y wrG   r   !canonicalize_without_job_and_taskrQ   s     r@   rL   z2InputWorkers.__init__.<locals>.<genexpr>.<genexpr>   s#       ==a@ rR   NrS   rU   s      r@   rL   z(InputWorkers.__init__.<locals>.<genexpr>   rW   rR   N)_worker_device_pairsrT   _input_worker_devices_canonicalize_devices_fed_devices)selfworker_device_pairscanonicalize_devicess      r@   __init__zInputWorkers.__init__}   sv     !4D!&&OT5N5N&O!OD!5D  1 // 1 1d
    1 // 1 1drB   c                 ,    t        | j                        S rG   )lenr]   r`   s    r@   num_workerszInputWorkers.num_workers   s    t))**rB   c                     | j                   S rG   )r]   rf   s    r@   worker_deviceszInputWorkers.worker_devices   s    %%%rB   c                      | j                   |   S rG   r_   )r`   worker_indexs     r@   compute_devices_for_workerz'InputWorkers.compute_devices_for_worker   s    \**rB   c                       j                   dj                   fdt        t                    D              } j                  j
                  d|dS )Nz,
c              3   N   K   | ]  }d ||   j                   |   fz    yw)z  %d %s: %sNrk   )rI   idevicesr`   s     r@   rL   z(InputWorkers.__repr__.<locals>.<genexpr>   s8      : ! *
D,=,=a,@AB :s   "%z:{
})ri   joinrangere   	__class____name__)r`   
debug_reprrq   s   ` @r@   __repr__zInputWorkers.__repr__   sI    !!G :%*3w<%8: :J ..11:>>rB   c                 2    | j                   | j                  fS rG   )r\   r^   rf   s    r@   	serializezInputWorkers.serialize   s    %%t'A'ABBrB   c                     t        |      S rG   )rD   )r`   
serializeds     r@   deserializezInputWorkers.deserialize   s    
##rB   NT)rv   
__module____qualname____doc__rc   propertyrg   ri   rm   rx   rz   r}   rH   rB   r@   rD   rD   x   sL    I %)14 + + & &+?C$rB   rD   c           	      ~   g }t        |j                  |      D ]  \  }}t        j                  |      5  |D cg c]4  }t	        j
                  |j                         t        j                        6 }}|j                  t	        j                  |d             ddd        t	        j                  |d      }| j                  j                         rB| j                  t        j                  j                   |d      }	t#        j$                  |	g       S t#        j$                  |g       S c c}w # 1 sw Y   #xY w)a7  Computes the number of replicas that have values.

  Args:
    strategy: the `tf.distribute.Strategy`.
    input_workers: the `InputWorkers`.
    optional_list: a list of lists `tf.experimental.Optional`. The values from
      each compute device grouped by the input device.

  Returns:
    A scalar Tensor.
  T)keepdimsNaxis)zipri   r   r   r   cast	has_valuer   int64append
reduce_sumr6   _in_multi_worker_modereducer   ReduceOpSUMr   reshape)
r+   input_workersoptional_listworker_has_valuesworker	optionalsvdevice_has_valuesclient_has_valuesglobal_has_valuess
             r@   _calculate_replicas_with_valuesr      s(    };;]K Afi	F	 A>G9:(--v||
4  


/$
?A	A AA ))*;dK,,.   "3$ ( @.33.33A As   D29D-2(D2-D22D<	c                 \   t        j                  |       D ]  }t        |t        j                  t
        j                  f      r:|j                  j                  dkD  sG|j                  j                         d   e y|j                  D ]  }|j                  j                         r  y  y)a,  Test if an iterator output is statically shaped.

  For sparse and ragged tensors this only tests the batch dimension.

  Args:
    element_spec: a nest structure of `tf.TypeSpec`. The element spec of the
      dataset of the iterator.

  Returns:
    True if the shape is static, false otherwise.
  r   FT)r#   flattenr1   r   SparseTensorSpecr!   RaggedTensorSpecshaperankas_list_flat_tensor_specsis_fully_defined)r0   spec	components      r@   _is_statically_shapedr      s     ll<( d}--}/M/MNP 
1	!3!3!5a!8!@.. )//1 
rB   c                   H    e Zd ZdZ	 ddZd Zd Zd Zd ZddZ	dd	Z
d
 Zy)DistributedIteratorBasez.Common implementation for all input iterators.Nc                     t        |t              sJ |j                  st        d      || _        || _        || _        || _        || _        || _	        y )Nz3Should have at least one worker for input iterator.)
r1   rD   ri   
ValueError
_iterators_input_workers	_strategy_cardinality_enable_get_next_as_optional_replica_order)r`   r   	iteratorsr+   r   r/   replica_orders          r@   rc   z DistributedIteratorBase.__init__   sW     m\222''LMMDO'DDN#D(CD%'DrB   c                 "    | j                         S rG   )__next__rf   s    r@   nextzDistributedIteratorBase.next  s    ==?rB   c                 ^    	 | j                         S # t        j                  $ r t        w xY wrG   )get_nextr   OutOfRangeErrorStopIterationrf   s    r@   r   z DistributedIteratorBase.__next__  s.    ]]_!! s    ,c                     | S rG   rH   rf   s    r@   __iter__z DistributedIteratorBase.__iter__  s    KrB   c                      j                   t        j                  k(  r-t        j                  j                   j                               S  j                   dk(  rM j                  j                  j                         s)t        j                  j                   j                        S g t         j                  j                        D ]O  \  }}t        j                   |      5  j#                   j$                  |   j'                                d d d        Q  fd} fd}t)         j                   j                        }t+        j,                  |dkD  ||d      S # 1 sw Y   xY w)Nr   c                      t        j                  d      } j                  j                  |       } t	        | j
                        }t        j                  j                  |      S NT)produce_dummy)	_get_value_or_dummyr   r   _reorder_replicas_create_per_replicar   r
   Optional
from_value)
value_listper_replicar   r`   s     r@   _create_optional_with_dummyzQDistributedIteratorBase.get_next_as_optional.<locals>._create_optional_with_dummy%  s`    &


}DBj 
			(++J7
'
DNNCk""--k::rB   c                  V    t         j                  j                   j                        S rG   )r
   r   empty_element_specrf   s   r@   _create_empty_optionalzLDistributedIteratorBase.get_next_as_optional.<locals>._create_empty_optional/  s     ""((););<<rB   Tstrict)r   cardinality_libINFINITEr
   r   r   #_get_next_no_partial_batch_handlingr   r6   r   r   r   	enumerater   ri   r   r   r   r   get_next_as_optional_listr   tf_condr   )r`   rp   r   r   r   num_replicas_with_valuesr   s   `     @r@   get_next_as_optionalz,DistributedIteratorBase.get_next_as_optional  sA    O444""--

2
2
46 6QNN##99;""((););<<Mt22AAB M	6::f MT__Q/IIKLM MM;=  ?++] < << 1$#	 'M Ms   %-E$$E-	c                 v    t        j                   j                        5  t        j                         t	        d      	 ddd        j
                  s j                  |      S g t         j                  j                        D ]O  \  }}t        j                  |      5  j                   j                  |   j                                ddd       Q t         j                   j                        } fd} fd}t!        j"                  |dkD  ||d      S # 1 sw Y   xY w# 1 sw Y   xY w)z:Returns the next input from the iterator for all replicas.Nzqnext(iterator) should be called from outside of replica_fn. e.g. strategy.run(replica_fn, args=(next(iterator),))c                      t        j                  d      } j                  j                  |       } t	        | j
                        S r   )r   r   r   r   r   r   )r   r   r`   s    r@   _value_or_dummyz9DistributedIteratorBase.get_next.<locals>._value_or_dummyN  sJ    &


}DBj 
			(++J7
 T^^<<rB   c                  $     j                         S rG   )r   rf   s   r@   _eofz.DistributedIteratorBase.get_next.<locals>._eofW  s     5577rB   r   Tr   )r   enter_or_assert_strategyr   get_replica_contextr   r   r   r   r   ri   r   r   r   r   r   r   r   r   )r`   namerp   r   r   r   r   r   s   `      @r@   r   z DistributedIteratorBase.get_next;  s0   		0	0
 4		+	+	-	9 3 4 	4 
:4 ,,55d;;Mt22AAB M	6::f MT__Q/IIKLM MM  ?++] <=8
 << 1$otDJ J?4 4M Ms    D#$-D/#D,/D8	c                    g }t        | j                  j                        D ]  \  }}|<t        j                  j                  |      }d||j                  |j                  fz  }nd }t        j                  |      5  |j                  | j                  |   j                  |             d d d         | j                  | j                  |      }t        || j                         S # 1 sw Y   xY w)Nz%s_%s_%d)r   r   ri   	tf_device
DeviceSpecfrom_stringjobtaskr   r   extendr   get_next_as_listr   r   r   r   )r`   r   replicasrp   r   rJ   new_names          r@   r   z;DistributedIteratorBase._get_next_no_partial_batch_handling_  s    Ht22AAB G	6		  ,,V4quuaff 55::f G*;;HEFG GG &''1hx88G Gs   =.C((C1	c                     t        | j                        t        |      k(  s3J dj                  t        | j                        t        |                   | j                  D cg c]  }||   	 c}S c c}w )Nz.replica order size ({}) != replicas size ({})!)re   r   format)r`   r   rp   s      r@   r   z)DistributedIteratorBase._reorder_replicasp  sp    t""#s(  7>>D #h- 
 "&!4!45AHQK555s   #A2rG   )rv   r   r   r   rc   r   r   r   r   r   r   r   rH   rB   r@   r   r      s5    6 ((*X"JH9"6rB   r   c                   d    e Zd ZdZg dZej                  ddfdZd Zd Z	d Z
d Zd	 Zd
 Zd Zy)!DistributedDatasetAndIteratorSpeczVCommon Type specification for `DistributedDataset and DistributedDatasetsFromFunction.)r   r   r   r   r   _optionsr^   Nc                     t        |t              rt        d      || _        || _        || _        || _        || _        || _        | j
                  rt        | j
                  dd      | _
        nd| _
        || _        y )NzBDistributedIteratorSpec does not have support for deserialization.r^   T)r1   rT   NotImplementedErrorr   r   r   r   r   r   getattrr^   r   )r`   r   r0   r+   r.   r   r/   r   s           r@   rc   z*DistributedDatasetAndIteratorSpec.__init__  s     -' !7 8 8 *d'ddn%d*Ed'dm	%,T^^-Dd&L" &*")drB   c                     | j                   j                         | j                  t        | j                        t        | j
                        fS rG   )r   rz   r   idr   r   rf   s    r@   
_serializez,DistributedDatasetAndIteratorSpec._serialize  s?     ))+T-?-?t~~4== 13 3rB   c                 2    t        dt        |        d      )Nz-Deserialization is currently unsupported for .)r   typerf   s    r@   _deserializez.DistributedDatasetAndIteratorSpec._deserialize  s!    

7T
|1EG GrB   c                 0   t        |       t        |      urt        d| d|      | j                  j                         |j                  j                         k7  rt        d| d|      | j                  |j                  urt        d| d|      y)zReturns the most specific TypeSpec compatible with `self` and `other`.

    Args:
      other: A `TypeSpec`.

    Raises:
      ValueError: If there is no TypeSpec that is compatible with both `self`
        and `other`.
    z$No TypeSpec is compatible with both z and z+_input_workers is not compatible with both z3tf.distribute strategy is not compatible with both N)r   r   r   rz   r   )r`   others     r@   sanity_check_typez3DistributedDatasetAndIteratorSpec.sanity_check_type  s     Dze$e% & &$$&%*>*>*H*H*JJ#'0 1 1~~U__,#'0 1 1 -rB   c                 X   	 | j                  |       t        j                  | j                  |j                         t        j                  | j                        }t        j                  |j                        }t        d t        ||      D              S # t        t
        f$ r Y yw xY w)zXReturns True if `self` is subtype of `other`.

    Args:
      other: A `TypeSpec`.
    Fc              3   D   K   | ]  \  }}|j                  |        y wrG   )is_subtype_of)rI   self_elementother_elements      r@   rL   zBDistributedDatasetAndIteratorSpec.is_subtype_of.<locals>.<genexpr>  s)      Q)\= 	""=1Qs    )	r   r#   assert_same_structurer   	TypeErrorr   r   allr   )r`   r   self_elementsother_elementss       r@   r   z/DistributedDatasetAndIteratorSpec.is_subtype_of  s    
U#
  !3!3U5H5HI LL!3!34M\\%"5"56N Q-0-OQ Q Q z" s   ;B B)(B)c                    	 |D ]=  }| j                  |       t        j                  | j                  |j                         ? 	 t        j                  | j                        }|D cg c]!  }t        j                  |j                        # }}dgt        |      z  }t        |      D ]1  \  }}|j                  |D cg c]  }||   	 c}      ||<   ||   1 y t        j                  | j                  |      }	 t        |       | j                  |	| j                  | j                  | j                  | j                         S # t        t
        f$ r Y yw xY wc c}w c c}w )zReturns the most specific supertype of `self` and `others`.

    Args:
      others: A Sequence of `TypeSpec`.

    Returns `None` if a supertype does not exist.
    Nr   r/   )r   r#   r   r   r  r   r   re   r   most_specific_common_supertypepack_sequence_asr   r   r   r   r   r   )
r`   othersr   r  others_elementscommon_elementsrp   r   r  common_element_specs
             r@   r  z@DistributedDatasetAndIteratorSpec.most_specific_common_supertype  s^    L%u%""4#5#5u7J7JKL LL!3!34MFLMUt||E$7$78MOMfs=11O$]3 <'FF3B
C>!
CEoa			#	
 //0B0B0?A4:%%$($E$EG G z"  N
 Ds   AE (&E?E!
EEc                     t        j                  d | j                        } t        |       | j                  || j
                  | j                  | j                  | j                        S )Nc                 "    | j                         S rG   )_with_tensor_ranks_onlyss    r@   <lambda>zKDistributedDatasetAndIteratorSpec._with_tensor_ranks_only.<locals>.<lambda>  s    !++- rB   r  	r#   map_structurer   r   r   r   r   r   r   r`   r0   s     r@   r  z9DistributedDatasetAndIteratorSpec._with_tensor_ranks_only  sa    %%-L 4:%%$($E$EG GrB   c                     t        j                  d | j                        } t        |       | j                  || j
                  | j                  | j                  | j                        S )Nc                 "    | j                         S rG   )_without_tensor_namesr  s    r@   r  zIDistributedDatasetAndIteratorSpec._without_tensor_names.<locals>.<lambda>  s    !))+ rB   r  r  r  s     r@   r  z7DistributedDatasetAndIteratorSpec._without_tensor_names   sa    %%+L 4:%%$($E$EG GrB   )rv   r   r   r   	__slots__r   UNKNOWNrc   r   r   r   r   r  r  r  rH   rB   r@   r   r   y  sK    ^) "))"&*<3G1*Q& GD
G
GrB   r   c                   L    e Zd ZdZed        Zed        Zd Zd Ze	d        Z
y)r5   z-Type specification for `DistributedIterator`.c                     t         S rG   )DistributedIteratorrf   s    r@   
value_typez"DistributedIteratorSpec.value_type  s    rB   c           
      8   g }| j                   j                  }t        |      D ]s  \  }\  }}t        j                  t        j                  t        |      | j                        }|j                  t        |||| j                  | j                               u |S N)rp   )r   r\   r   r#   r  	functoolspartial_replace_per_replica_specr   r    _SingleWorkerDatasetIteratorSpecr   r^   )r`   specsra   rp   input_devicecompute_devicesr0   s          r@   _component_specsz(DistributedIteratorSpec._component_specs  s    E--BB.78K.L H**L/''


5
;T=O=OQlll
*<+7+/+E+EGHH LrB   c                     |j                   S rG   )r   r`   values     r@   _to_componentsz&DistributedIteratorSpec._to_components"  s    rB   c                     t        | j                  d || j                  | j                  | j                  | j
                  | j                  | j                  	      S )N)	r   r   
componentsr0   r+   r   r/   r.   r   )r  r   r   r   r   r   r   r   r`   r.  s     r@   _from_componentsz(DistributedIteratorSpec._from_components%  sP    ))''%%$($E$E))
 
rB   c                     t        | j                  | j                  | j                  | j                  | j
                  | j                        S )Nr  )r5   r   r   r   r   r   r   r+  s    r@   r   z"DistributedIteratorSpec.from_value2  sE     #&&$)$F$FH HrB   Nrv   r   r   r   r   r  r(  r,  r0  staticmethodr   rH   rB   r@   r5   r5     sK    5    H HrB   r5   c            	       j     e Zd ZdZdddddej
                  dddf	 fd	Zed        Zed        Z	 xZ
S )r  z)Input Iterator for a distributed dataset.NFc
                    |t        d      d}
|| _        |A||t        |
      || _        || _        || _        || _        || _        || _        |	| _        y ||t        |
      t        t        | /  ||||||	       y )Nz#`input_workers` should be provided.zSEither `input_workers` or both `components` and `element_spec` need to be provided.)r   r   r   r   r   r   r   r   r   superr  rc   )r`   r   r   r+   r.  r0   r   r/   r.   r   error_messageru   s              r@   rc   zDistributedIterator.__init__B  s      # $ $!M DM

 4'''d)d"dodn%d*Ed')d

 \%=''/




%
rB   c                     | j                   rJ| j                  j                  j                         r&t	        j
                  t        | j                  d      S | j                  S )NFexpand_compositesr   r   r6   r   r#   r  _rebatch_as_dynamicr   rf   s    r@   r0   z DistributedIterator.element_specn  sQ     	))557
t11UL LrB   c           	          t        | j                  | j                  | j                  | j                  | j
                  | j                  | j                        S rG   )r5   r   r   r   r   r   r   r   rf   s    r@   r2   zDistributedIterator._type_specz  sL    
 #)) rB   )rv   r   r   r   r   r  rc   r   r0   r2   __classcell__ru   s   @r@   r  r  >  sW    1 !))"'*X 	 	  rB   r  c                   "    e Zd ZdZd Zd Zd Zy)_IterableInputz;Base class for iterable inputs for distribution strategies.c                 6    t        |t              sJ || _        y rG   )r1   rD   r   )r`   r   s     r@   rc   z_IterableInput.__init__  s    m\222'DrB   c                     t        d      Nz"must be implemented in descendantsr   rf   s    r@   r   z_IterableInput.__iter__      
B
CCrB   c                     t        |       j                         }d }fd}t        j                  ||||gdd      \  }}|S )z9Execute a `reduce_fn` over all the elements of the input.c                 $    ~| j                         S rG   )r   )optional_datastates     r@   r   z#_IterableInput.reduce.<locals>.cond  s    
$$&&rB   c                 Z     || j                               }j                         } | |fS )z9Executes `reduce_fn` in a loop till the dataset is empty.)	get_valuer   )rJ  rK  iterator	reduce_fns     r@   	loop_bodyz(_IterableInput.reduce.<locals>.loop_body  s1    }6689e335mE!!rB   r%   T)parallel_iterationsreturn_same_structure)iterr   r    )r`   initial_staterO  rJ  r   rP  final_staterN  s     `    @r@   r   z_IterableInput.reduce  sV    DzH113M'" ",!6!6M=1"	"$M;
 rB   N)rv   r   r   r   rc   r   r   rH   rB   r@   rB  rB    s    C(DrB   rB  c                   L    e Zd ZdZed        Zed        Zd Zd Ze	d        Z
y)r3   z+Type specification for `DistributedDataset.c                     t         S rG   )DistributedDatasetrf   s    r@   r  z!DistributedDatasetSpec.value_type  s    rB   c                    g }| j                   j                  }t        |      D ]b  \  }}t        j                  t        j                  t        |      | j                        }|j                  t        j                  |             d |S r   r   r\   r   r#   r  r!  r"  r#  r   r   r   DatasetSpecr`   r%  ra   rp   rK   r0   s         r@   r(  z'DistributedDatasetSpec._component_specs  z    E--BB-. :1''


5
;T=O=OQlll;**<89: LrB   c                     |j                   S rG   )_cloned_datasetsr*  s     r@   r,  z%DistributedDatasetSpec._to_components  s    !!!rB   c           	          t        | j                  | j                  || j                  | j                  | j
                  | j                        S )N)r   r+   r.  r0   r/   r.   r   )rX  r   r   r   r   r   r   r/  s     r@   r0  z'DistributedDatasetSpec._from_components  sD    ))''$($E$E)) rB   c                     t        | j                  | j                  | j                  | j                  | j
                        S N)r/   r3   r   r   r   r   r   r2  s    r@   r   z!DistributedDatasetSpec.from_value  s<     "$)$F$FH HrB   Nr3  rH   rB   r@   r3   r3     sK    3   "	 H HrB   r3   c                        e Zd ZdZ	 	 	 	 	 	 	 	 	 d fd	ZddZd Zed        Zd Z	d Z
d Zed	        Zed
        Z xZS )rX  zBDistributed dataset that supports prefetching to multiple devices.c                 H   t         t        |   |       ||t        d      ||t        d      ||t        d      || _        || _        |
| _        || _        || _        || _	        |"|| _
        d| _        |	r| j                          yy|	st        d      |t        d      || _        t        | j                  d	         | _        || _        |J |t!        | j
                  | j                  d	   j"                        k7  rt        d
      || _        d| _        y)a  Distribute the dataset on all workers.

    If `num_replicas_in_sync` is not None, we split each batch of the dataset
    into `num_replicas_in_sync` smaller batches, to be distributed among that
    worker's replicas, so that the batch size for a global step (across all
    workers and replicas) is as expected.

    Args:
      input_workers: an `InputWorkers` object.
      strategy: a `tf.distribute.Strategy` object, used to run all-reduce to
        handle last partial batch.
      dataset: `tf.data.Dataset` that will be used as the input source. Either
        dataset or components field should be passed when constructing
        DistributedDataset. Use this when constructing DistributedDataset from a
        new `tf.data.Dataset`. Use components when constructing using
        DistributedDatasetSpec.
      num_replicas_in_sync: Optional integer. If this is not None, the value is
        used to decide how to rebatch datasets into smaller batches so that the
        total batch size for each step (across all workers and replicas) adds up
        to `dataset`'s batch size.
      input_context: `InputContext` for sharding. Only pass this in for between
        graph multi-worker cases where there is only one `input_worker`. In
        these cases, we will shard based on the `input_pipeline_id` and
        `num_input_pipelines` in the `InputContext`.
      components: datasets when DistributedDataset is constructed from
        DistributedDatasetSpec. Either field dataset or components should be
        passed.
      element_spec: element spec for DistributedDataset when constructing from
        DistributedDatasetSpec. This will be used to set the element_spec for
        DistributedDataset and verified against element_spec from components.
      enable_get_next_as_optional: this is required when components is passed
        instead of dataset.
      build: whether to build underlying datasets when this object is created.
        This is only useful for `ParameterServerStrategy` now.
      options: `tf.distribute.InputOptions` used to control options on how this
        dataset is distributed.
      replica_order: the order of the replicas, which will be used to reorder
        the iterators to match the device order.
    r   Nz1input_workers and strategy are required argumentsz3Only one of dataset or components should be presentz6At least one of dataset or components should be passedFz~When constructing DistributedDataset with components, build should not be False. This is an internal error. Please file a bug.zgWhen constructing DistributedDataset with components, enable_get_next_as_optional should also be passedr   z2Mismatched element_spec from the passed componentsT)r7  rX  rc   r   r   r   r   _input_context_num_replicas_in_syncr   _original_dataset_builtbuildr_  r   r   _create_distributed_tensor_specr0   r   )r`   r   r+   r=   num_replicas_in_syncinput_contextr.  r0   r/   rk  r.   r   ru   s               r@   rc   zDistributedDataset.__init__  sb   j 

d,=,I 0JKKz5LMM:-OPP'DDNDM'D!5D'D&ddk	

 
  	 
%	,@A 	A )d&t'<'<Q'?@d*Ed'%%%	8
..$//2??A 
AMNN'ddkrB   c                    | j                   rJ |xs | j                  }t        |      | _        t        | j                  || j                        | _        t        j                         }| j                  || j                  | j                  | j                  | j                         t        j                         rt        j                          t        j                         |z
  dz  }t        j                  | j                  j                   j"                  t%        | j                  j&                              j)                  |       t+        | j                  | j,                  d   j.                        | _        d| _         y Ni@B r   T)rj  ri  r   r   r   timetime_ns$_create_cloned_datasets_from_datasetrg  r   rh  r   executing_eagerly
async_wait5_distributed_dataset_initialization_time_millisecondsget_cellru   rv   strrg   addrl  r_  r0   r   )r`   dataset_to_replacer=   distribute_start_time_nsdistribute_duration_mss        r@   rk  zDistributedDataset.build;  s,   {{? :D$:$:G$W-D(D!2!2)4D%#||~--gt7J7J.2.A.A.2nn.2.H.HJ   " $ 8!9=F G;DD
..
"
"
+
+
d!!--
.0034J0K8--a0==?DDKrB   c           
         t        | j                        t        | j                  j                        k(  s=J dt        | j                         dt        | j                  j                                g }t	        t        | j                  j                              D ]u  }t        j                  | j                  |   j                        5  |j                  t        j                  | j                  |   ||| j                               d d d        w t        | j                  | j                  || j                  | j                  | j                         S # 1 sw Y   xY w)N
datasets: , input workers: )r.  r0   r.   r/   )re   r_  r   ri   rt   r   colocate_with_variant_tensorr   r   auto_shard_datasetrh  rX  r   r   r   r   r`   
num_shardsshard_ixsharded_datasetsrp   s        r@   
auto_shardzDistributedDataset.auto_shardR  sD   D!!"c$*=*=*L*L&MM S../0 1d11@@AB	DM
 3t**99:; T2215EEF ((%%a(*h**	  #''$($E$EG G s   ?EE!	c                 H    | j                   st        d      | j                  S Nz9Cannot get the cardinality of a dataset that is not builtrj  r   r   rf   s    r@   r   zDistributedDataset.cardinalityi  '    ;;
EG GrB   c                    |<|dkD  r7|r|j                   nt        |j                        }| j                  |||      }nd }g | _        |rn|j
                  dk(  sJ | |||j                        }t        j                  ||j                   |j                  |      }| j                  j                  |       y t        j                  ||j                        }t        |j                        D ]y  \  }	}
t        j                  |
      5  ||
   }|	 |||	      }t        j                  |t        |j                        |	|      }| j                  j                  |       d d d        { y # 1 sw Y   xY w)Nr%   )num_input_pipelinesre   ri   _make_rebatch_fnr_  rg   input_pipeline_idr   r  r   r   	replicater   r   r   )r`   r=   rn  r   r+   rm  rg   
rebatch_fnreplicated_dsrp   r   cloned_datasets               r@   rs  z7DistributedDataset._create_cloned_datasets_from_datasetp  sz    ',@1,D9FM55C

&
&M(k((+)=?j jD&&!+++		Wm&E&EF,,W-:-N-N-:-L-L-ACg ""7+ **7+8+G+GIm !=!=> 7)!VZZ 	7(0.#':N$77c-">">?"$. 


&
&~
6	7 	77	7 	7s   AE**E3	c                     z  rt        dj                              z  t        j                  |j                        5  t        j                  |      ddd       fd}|S # 1 sw Y   xY w)aj  Returns a callable that rebatches the input dataset.

    Args:
      dataset: A `tf.data.Dataset` representing the dataset to be distributed.
      num_workers: An integer representing the number of workers to distribute
        `dataset` among.
      num_replicas_in_sync: An integer representing the number of replicas in
        sync across all workers.
    ztf.distribute expects every worker to have the same number of replicas. However, encountered `num_replicas_in_sync` ({}) that cannot be divided by `num_workers` ({})Nc           
          	  fd} fd}t        j                   j                        5  t        j                  t        j                  d      ||      cd d d        S # 1 sw Y   y xY w# t        j                  $ rb}dt        |      v rJt        j                  t        t        dj                  |            t        j                         d          n Y d }~y d }~ww xY w)Nc                  t    t        j                        } j                  |       j                        S rG   )r   batch_sizes_for_workerrebatchprefetch)batch_sizes
batch_sizer=   num_replicas_per_workerrg   rl   s    r@   apply_rebatchzNDistributedDataset._make_rebatch_fn.<locals>.rebatch_fn.<locals>.apply_rebatch  s9    "99+'>N+-667NO
OrB   c                  N    t        j                         j                        S rG   )r   _LegacyRebatchDatasetr  )r=   rm  r  s   r@   apply_legacy_rebatchzUDistributedDataset._make_rebatch_fn.<locals>.rebatch_fn.<locals>.apply_legacy_rebatch  s(    11+--5X6M-NOrB   )true_fnfalse_fnzwithout encountering a batchzCall the `batch` method on the input Dataset in order to be able to split your input across {} replicas.
 Please see the tf.distribute.Strategy guide. {}r&   )r   r  r  r   r   r   	not_equalr   InvalidArgumentErrorrx  sixreraiser   r   sysexc_info)	r=   rl   r  r  er  rm  r  rg   s	   ``   r@   r  z7DistributedDataset._make_rebatch_fn.<locals>.rebatch_fn  s    	P 	P	O w667 	-  R0#+-	- 	- 	-
 (( )SV3
++99?*A:/0
 llnQ! !s5   /A6 ,A* 	A6 *A3/A6 3A6 6C+	AC&&C+)r   r   r   r  r  r   compute_batch_size)r`   r=   rg   rm  r  r  r  s     `` @@r@   r  z#DistributedDataset._make_rebatch_fn  s     k)44:F"K512 2 3kA			722	3 :009j:> E: :s   	A11A:c           	      D   t        j                         s)t        j                         j                  st        d      | j                  st        d      t        | j                  dd      }t        | j                  | j                  | j                  |      }t        | j                  || j                  | j                  | j                   | j                  | j"                        }| j$                  |_        t        j                         rt        j&                          |S )NV__iter__() is only supported inside of tf.function or when eager execution is enabled.zcTo use this dataset, you need to pass this dataset to ClusterCoordinator.create_per_worker_dataset.r^   Tr.   rb   )r   r/   r.   r   )r   rt  r   get_default_graphbuilding_functionRuntimeErrorrj  r   r   r   _create_iterators_per_workerr_  r   r   r  r   r   r   r   ru  )r`   rb   worker_iteratorsrN  s       r@   r   zDistributedDataset.__iter__  s   %%'!!#55 ? @ @;; G H H #4>>3J#') 41	3
 #%%$($E$E))H "//H
   "OrB   c                     | j                   rJ| j                  j                  j                         r&t	        j
                  t        | j                  d      S | j                  S z5The type specification of an element of this dataset.Fr:  r<  rf   s    r@   r0   zDistributedDataset.element_spec  Q     	))557
t11UL LrB   c                     t        | j                  | j                  | j                  | j                  | j
                        S rb  rc  rf   s    r@   r2   zDistributedDataset._type_spec  s:    !$($E$EG GrB   )	NNNNNNTNNrG   )rv   r   r   r   rc   rk  r  r   r   rs  r  r   r0   r2   r?  r@  s   @r@   rX  rX    s    J "&]~.G.  *7X4l"H 
 
 G GrB   rX  c                   L    e Zd ZdZed        Zed        Zd Zd Ze	d        Z
y)r4   z8Type specification for `DistributedDatasetsFromFunction.c                     t         S rG   )DistributedDatasetsFromFunctionrf   s    r@   r  z.DistributedDatasetsFromFunctionSpec.value_type  s    **rB   c                    g }| j                   j                  }t        |      D ]b  \  }}t        j                  t        j                  t        |      | j                        }|j                  t        j                  |             d |S r   rZ  r\  s         r@   r(  z4DistributedDatasetsFromFunctionSpec._component_specs  r]  rB   c                     |j                   S rG   )	_datasetsr*  s     r@   r,  z2DistributedDatasetsFromFunctionSpec._to_components  s    ??rB   c                 r    t        | j                  | j                  || j                  | j                        S )N)r   r+   r.  r0   r.   )r  r   r   r   r   r/  s     r@   r0  z4DistributedDatasetsFromFunctionSpec._from_components"  s2    *))'' rB   c                 p    t        | j                  | j                  | j                  | j                        S )N)r   r0   r+   r.   r4   r   r   r   r   r2  s    r@   r   z.DistributedDatasetsFromFunctionSpec.from_value*  s1     /**((	   rB   Nr3  rH   rB   r@   r4   r4     sI    @+ +      rB   r4   c                   t     e Zd ZdZ	 	 	 	 	 	 	 d	 fd	Zd Zd Zed        Zd Z	ed        Z
ed        Z xZS )
r  z%Inputs created from dataset function.c
                    t         t        |   |       || _        || _        || _        |	| _        ||t        d      ||t        d      ||j                  t        |      k7  r#t        d|j                  t        |      fz        || _
        | j                  d   j                  | _        || _        d| _        |r| j                          yy|t        d      |st        d	      || _        || _        d| _        d
| _        t%        | j"                  d         | _        t'        | j                  | j"                  d   | j$                        | _        y)a<  Makes an iterable from datasets created by the given function.

    Args:
      input_workers: an `InputWorkers` object.
      strategy: a `tf.distribute.Strategy` object, used to run all-reduce to
        handle last partial batch.
      input_contexts: A list of `InputContext` instances to be passed to call(s)
        to `dataset_fn`. Length and order should match worker order in
        `worker_device_pairs`.
      dataset_fn: A function that returns a `Dataset` given an `InputContext`.
        Either dataset_fn or components should be passed to construct
        DistributedDatasetsFromFunction. Use this when constructing
        DistributedDataset using a function. Use components when constructing
        using DistributedDatasetsFromFunctionSpec.
      options: `tf.distribute.InputOptions` used to control options on how this
        dataset is distributed.
      components: datasets when DistributedDatasetsFromFunction is constructed
        from DistributedDatasetsFromFunctionSpec. Only one of dataset or
        components should be passed.
      element_spec: element spec for DistributedDataset when constructing from
        DistributedDatasetSpec. This will be used to set the element_spec for
        DistributedDatasetsFromFunctionSpec and verified against element_spec
        from components.
      build: whether to build underlying datasets when this object is created.
        This is only useful for `ParameterServerStrategy` now.
      replica_order: the order of the replicas, which will be used to reorder
        the iterators to match the device order.
    rf  Nz2Only one of dataset_fn or components should be setz6At least one of dataset_fn or components should be setzINumber of input workers (%d) is not same as number of input_contexts (%d)r   Fz:element_spec should also be passed when passing componentszWhen constructing DistributedDatasetFromFunction with components, build should not be False. This is an internal error. Please file a bug.T)r7  r  rc   r   r   r   r   r   rg   re   _input_contextsrm  rh  _dataset_fnrj  rk  r   r  r   r   )r`   r   r+   input_contexts
dataset_fnr.   r.  r0   rk  r   ru   s             r@   rc   z(DistributedDatasetsFromFunction.__init__9  s}   P 

)49# : %'DDNDM'D*"8KLLj0OPP		"	"c.&9	9"&&N(;<=> 	> ,d#'#7#7#:#O#Od #ddk	

 
 
	HJ 	J 	 (d!dn#'d dk&t~~a'89d*F
..$..+T->->+@d'rB   c                    | j                   rJ t        j                         }t        | j                  | j
                  | j                        \  | _        }t        j                         rt        j                          t        j                         |z
  dz  }t        j                  | j                  j                  j                  t!        | j
                  j"                              j%                  |       t'        | j                  |      | _        t+        | j                  d         | _        t-        | j                  | j                  d   | j*                        | _        d| _         y rp  )rj  rq  rr  1_create_datasets_from_function_with_input_contextr  r   r  r  r   rt  ru  C_distributed_dataset_from_function_initialization_time_millisecondsrw  r   ru   rv   rx  rg   ry  rl  r   r   r   )r`   r{  r0   r|  s       r@   rk  z%DistributedDatasetsFromFunction.build  s   {{?#||~9  $"5"5t7G7G	I !DNL   " $ 8!9=F GIRR
..
"
"
+
+
d!!--
.0034J0K8&D$T^^A%67D(Dq)4+<+<)>D%DKrB   c           
         t        | j                        t        | j                  j                        k(  s=J dt        | j                         dt        | j                  j                                g }t	        t        | j                  j                              D ]u  }t        j                  | j                  |   j                        5  |j                  t        j                  | j                  |   ||| j                               d d d        w t        | j                  | j                  || j                  | j                        S # 1 sw Y   xY w)Nr~  r  )r.  r0   r.   )re   r  r   ri   rt   r   r  r  r   r   r  rh  r  r   r   r   r  s        r@   r  z*DistributedDatasetsFromFunction.auto_shard  s1   DNNs4#6#6#E#EFF S() *d11@@AB	DF
 3t**99:; 
T^^A.>>? 
((q!:x**	

 

 +4+>+>6F8<8J8J37==B B
 
s   ?EE	c                 H    | j                   st        d      | j                  S r  r  rf   s    r@   r   z+DistributedDatasetsFromFunction.cardinality  r  rB   c           	      D   t        j                         s)t        j                         j                  st	        d      | j
                  st        d      t        | j                  dd      }t        | j                  | j                  | j                  |      }t        | j                  || j                  | j                  | j                  | j                  | j                         }| j"                  |_        t%        j&                         rt%        j(                          |S )Nr  zMYou need to use this dataset in ClusterCoordinator.create_per_worker_dataset.r^   Tr  )r   r   r+   r   r/   r.   r   )r   #executing_eagerly_outside_functionsr  r  r  rj  r   r   r   r  r  r   r   r  r   r   r   r   r   rt  ru  )r`   rb   r   rN  s       r@   r   z(DistributedDatasetsFromFunction.__iter__  s   335!!#55 ? @ @ ;; G H H #4>>3J#') -1	3I
 #))%%$($E$E))H "//H
   "OrB   c                     | j                   rJ| j                  j                  j                         r&t	        j
                  t        | j                  d      S | j                  S r  r<  rf   s    r@   r0   z,DistributedDatasetsFromFunction.element_spec  r  rB   c                 n    t        | j                  | j                  | j                  | j                        S rG   r  rf   s    r@   r2   z*DistributedDatasetsFromFunction._type_spec  s/    .t/B/B/3/A/A/3~~t}}N NrB   )NNNNNTN)rv   r   r   r   rc   rk  r  r   r   r   r0   r2   r?  r@  s   @r@   r  r  5  st    - N@`,B*  #J 
 
 N NrB   r  c                 4    d }t        j                  ||       S )z:A function to create dummy tensors from `value_structure`.c           	         t        | d      r| j                         S t        | t        j                        rE| j
                  dd j                  | j
                  d| j                  z   d       }| j                  }n| j                  }| j                  }|r"|j                         D cg c]  }||nd
 c}ng }|rBt        | t        j                        s|j                         rt        j                  d      |d<   t        | t        j                         rYt        j"                  t%        j&                  d|      t%        j&                  dt)        |      ft*        j,                        |      S t%        j&                  t        j.                  |      |      }t        | t        j                        rPt%        j&                  d| j0                        }t        j2                  j5                  ||f| j                  z  d      }|S c c}w )z>Create a dummy tensor with possible batch dimensions set to 0._create_empty_valueNr%   r   )r   indicesdense_shapeF)validate)hasattrr  r1   r!   r   _shapeconcatenate_ragged_rank_dtyper   dtyper   r   r   	Dimensionr   r   SparseTensorr   zerosre   r   r   TensorShape_row_splits_dtypeRaggedTensorfrom_nested_row_splits)r   feature_shapefeature_typedimdimsdummy_tensor
row_splitss          r@   create_dummy_tensorz-_dummy_tensor_fn.<locals>.create_dummy_tensor  s   t*+ %%''$667 kk"1o11
++q4,,,.
/1m[[l jjmZZl  7D6K6K6MNsCOS*N"$ 	D-"@"@A//1&&q)d1g$667''L1//1c$i.&,,?  ??<#;#;D#A<PL$667 ??1d&<&<=j"//FF
(9(99E G Kl + Os   (H)r#   r  )value_structurer  s     r@   _dummy_tensor_fnr    s    1f 
		/	AArB   c                    g }t        | j                        D ]  \  }t        j                  |      5  | j	                        }t        |      D ]  \  }t        j                  |      5  |rG|j                  t        j                        j                         fdfdd             n%|j                        j                                ddd        	 ddd        |S # 1 sw Y   xY w# 1 sw Y   xY w)a  Returns the value of the optionals or dummy values.

  Args:
    input_workers: the `InputWorkers`.
    optional_list: a list of lists `tf.experimental.Optional`. The values from
      each compute device grouped by the input device.
    produce_dummy: a bool. Whether to produce dummy tensors when the optional
      doesn't have a value.

  Returns:
    A flatten list of Tensors.

  c                  0           j                         S rG   )rM  rp   jr   s   r@   r  z%_get_value_or_dummy.<locals>.<lambda>F  s    M!,Q/99; rB   c                  :    t               j                        S rG   )r  r0   r  s   r@   r  z%_get_value_or_dummy.<locals>.<lambda>G  s    ,]1-=a-@-M-MN rB   Tr   N)
r   ri   r   r   rm   r   r   r   r   rM  )	r   r   r   r   r   rq   r   rp   r  s	    `     @@r@   r   r   -  s    *]99: ?ia	F	 ?88;g ) ?)!VZZ 	?!!$Q'113;N	 mA.q1;;=>	? 	??? ??" 
	? 	?? ?s$   8D .A/C4	
D 4C=9D  D		c                   :    e Zd ZdZd	dZd Zd Zd	dZd	dZd Z	y)
 _SingleWorkerDatasetIteratorBasez(Iterator for a single `tf.data.Dataset`.Nc                 ~    || _         || _        || _        |j                  | _        || _        | j                          y)a  Create iterator for the `dataset` to fetch data to worker's `devices` .

    A `MultiDeviceIterator`  or `OwnedMultiDeviceIterator` is used to prefetch
    input to the devices on the given worker.

    Args:
      dataset: A `tf.data.Dataset` instance.
      worker: Worker on which ops should be created.
      devices: Distribute data from `dataset` to these devices.
      options: options.
    N)_dataset_worker_devicesr0   r   r   _make_iterator)r`   r=   r   rq   r.   s        r@   rc   z)_SingleWorkerDatasetIteratorBase.__init__S  s:     DMDLDM --DDMrB   c                     t        d      rE  rF  rf   s    r@   r  z/_SingleWorkerDatasetIteratorBase._make_iteratorf  rG  rB   c                     | j                   r@| j                   j                  t        j                  k(  r| j                   j                  s|gS |S )a=  Change the data in to a list type if required.

    The OwnedMultiDeviceIterator returns the list data type,
    while the PER_REPLICA iterator (when used with prefetch disabled)
    returns without the enclosed list. This is to fix the inconsistency.
    Args:
      data_list: data_list
    Returns:
      list
    )r   experimental_replication_moder   PER_REPLICAexperimental_fetch_to_device)r`   	data_lists     r@   _format_data_list_with_optionsz?_SingleWorkerDatasetIteratorBase._format_data_list_with_optionsi  s@     	$--EE(()MM66[rB   c                    ~t        j                  | j                        5  t        | j                        r$| j
                  j                  |      cddd       S | j
                  j                         cddd       S # 1 sw Y   yxY w)z&Get next element for the given device.N)r   r   r  !_should_use_multi_device_iteratorr   	_iteratorr   )r`   r   r   s      r@   r   z)_SingleWorkerDatasetIteratorBase.get_next{  sd    	DLL	! )	*4==	9~~&&v.) ) ~~&&(	) ) )s   0A>A>>Bc                     ~t        j                  | j                        5  | j                  | j                  j                               cddd       S # 1 sw Y   yxY w)a  Get next element from the underlying iterator.

    Runs the iterator get_next() within a device scope. Since this doesn't use
    get_next_as_optional(), it is considerably faster than get_next_as_list(),
    but it raises EOFError if any of the device doesn't get any data.

    Args:
      name: not used.

    Returns:
      A list consisting of the next data from each device.
    N)r   r   r  r  r  r   )r`   r   s     r@   r   z1_SingleWorkerDatasetIteratorBase.get_next_as_list  sK     		DLL	! L001H1H1JKL L Ls   )AAc                     t        j                  | j                        5  | j                  | j                  j                               cd d d        S # 1 sw Y   y xY wrG   )r   r   r  r  r  r   rf   s    r@   r   z:_SingleWorkerDatasetIteratorBase.get_next_as_optional_list  sE    	DLL	! 100
..
-
-
/11 1 1s   )AArG   )
rv   r   r   r   rc   r  r  r   r   r   rH   rB   r@   r  r  P  s%    0&D$)L"1rB   r  c                   j    e Zd ZdZg dZ	 ddZed        Zd Zd Z	ed        Z
d Zd	 Zed
        Zy)r$  z;Type specification for `_SingleWorkerOwnedDatasetIterator`.r  r  r   r   r^   c                     || _         |rt        d |D              | _        nt        d |D              | _        || _        ||nt	        j
                         | _        || _        y )Nc              3   F   K   | ]  }t        j                  |        y wrG   rO   rQ   s     r@   rL   z<_SingleWorkerDatasetIteratorSpec.__init__.<locals>.<genexpr>  s     IAK44Q7IrR   c              3   F   K   | ]  }t        j                  |        y wrG   rZ   rQ   s     r@   rL   z<_SingleWorkerDatasetIteratorSpec.__init__.<locals>.<genexpr>  s"      M?@+
7
7
:MrR   )r  rT   r  r   r   InputOptionsr   r^   )r`   r   rq   r0   r.   rb   s         r@   rc   z)_SingleWorkerDatasetIteratorSpec.__init__  se    DLIIIdm MDKM Mdm%D ' 3W#002 	M!5DrB   c                     t         S rG   )!_SingleWorkerOwnedDatasetIteratorrf   s    r@   r  z+_SingleWorkerDatasetIteratorSpec.value_type  s    ,,rB   c                 t    | j                   | j                  | j                  | j                  | j                  fS rG   r  rf   s    r@   r   z+_SingleWorkerDatasetIteratorSpec._serialize  s0    LL$--););T]]&&( (rB   c                    t        j                  | j                  t        j                               }t        j                  |      }|}|j                  t        j                  | j                  || j                               y N)r0   )
r   rP   r  currentget_host_for_devicer   r	   MultiDeviceIteratorSpecr  r   )r`   r%  device_scopehost_devicer   s        r@   _get_multi_device_iterator_specz@_SingleWorkerDatasetIteratorSpec._get_multi_device_iterator_spec  se    ++DLL+:M:M:OPL11,?K F	LL!99MM60B0B	DErB   c                     g }t        | j                        r| j                  |       |S |j                  t	        j
                  | j                               |S r  )r   r   r  r   r   r<   r   )r`   r%  s     r@   r(  z1_SingleWorkerDatasetIteratorSpec._component_specs  sL    E(7
**51 L ll<,,$:L:LMNLrB   c                     |j                   gS rG   )r  r*  s     r@   r,  z/_SingleWorkerDatasetIteratorSpec._to_components  s    OOrB   c           	          t        d | j                  | j                  || j                  | j                  | j
                        S )N)r=   r   rq   r.  r0   r.   rb   )r  r  r  r   r   r^   r/  s     r@   r0  z1_SingleWorkerDatasetIteratorSpec._from_components  s<    ,||''!779 9rB   c                     t        | j                  | j                  | j                  | j                  | j
                        S rG   r$  r  r  r   r   r^   r2  s    r@   r   z+_SingleWorkerDatasetIteratorSpec.from_value  s7     ,EMM5>>,1,?,?,1,G,GI IrB   Nr~   )rv   r   r   r   r  rc   r   r  r   r  r(  r,  r0  r4  r   rH   rB   r@   r$  r$    si    C) %)6 - -(E  9 I IrB   r$  c                        e Zd ZdZ	 	 	 	 	 	 	 d
 fd	Zd Zd Zed        Zed        Z	ed        Z
ed        Zed	        Z xZS )r  z+Iterator for a DistributedDataset instance.c                    ||t        d      d}|| _        || _        |/||t        |      || _        || _        || _        |d   | _        y||t        |      t        t        | '  |||| j                         y)a0  Create iterator for the `dataset` to fetch data to worker's `devices` .

    `OwnedMultiDeviceIterator` is used to prefetch input to the devices on the
    given worker. The lifetime of this iterator is tied to the encompassing
    python object. Once we go out of scope of the python object or return from
    a tf.function the underlying iterator resource is deleted.

    Args:
      dataset: A `tf.data.Dataset` instance.
      worker: Worker on which ops should be created.
      devices: Distribute data from `dataset` to these devices.
      components: Tensor components to construct the
        _SingleWorkerOwnedDatasetIterator from.
      element_spec: A nested structure of `TypeSpec` objects that represents the
      type specification of elements of the iterator.
      options: `tf.distribute.InputOptions` used to control options on how this
      dataset is distributed.
      canonicalize_devices: Whether to canonicalize devices for workers fully or
      partially. If False, it will partially canonicalize devices by removing
      job and task.
    Nz.Both `worker` and `devices` should be providedzMEither `dataset` or both `components` and `element_spec` need to be provided.r   )
r   r   r^   r   r  r  r  r7  r  rc   )
r`   r=   r   rq   r.  r0   r.   rb   r8  ru   s
            r@   rc   z*_SingleWorkerOwnedDatasetIterator.__init__  s    : ~GHH,M DM!5D

 4'''ddldm!!}dn

 L$<''-"FGT]]CrB   c                 r   t        j                         sHt        j                  | j                  t        j
                               }t        j                  |      }n| j                  | j                  }}t        j                  |      5  | j                  \t        j                  | j                  | j                  || j                  j                  | j                  j                        | _        n1t        j                  | j                  | j                  |      | _        d d d        y # 1 sw Y   y xY w)N)source_devicemax_buffer_sizeprefetch_buffer_size)r  )r   inside_functionr   rP   r  r  r  r   r   r	   OwnedMultiDeviceIteratorr  r  $experimental_per_replica_buffer_sizer  )r`   r  r  s      r@   #_create_owned_multi_device_iteratorzE_SingleWorkerOwnedDatasetIterator._create_owned_multi_device_iterator  s    
   --dll.9.A.A.CEl33LAk #',,Kl	L	! E		"2KKMMMM% MM11!%113 3KKMM4==EE E Es   
BD--D6c                    | j                   st        d      t        | j                        r| j	                          yt        j                  | j                         5  t        | j                        | _	        ddd       y# 1 sw Y   yxY w)z)Make appropriate iterator on the dataset.z@Worker device must be specified when creating an owned iterator.N)
r  r   r   r   r#  r   r   rS  r  r  rf   s    r@   r  z0_SingleWorkerOwnedDatasetIterator._make_iterator7  sh    << ) * *(7
..0::dll# -dmm,- - -s   BB
c                     | j                   S rG   )r   rf   s    r@   r0   z._SingleWorkerOwnedDatasetIterator.element_specB  s    rB   c                     t        | j                  | j                  | j                  | j                  | j
                        S rG   r  rf   s    r@   r2   z,_SingleWorkerOwnedDatasetIterator._type_specF  s5    +DLL$--,0,>,>,0,F,FH HrB   c                 D    t        j                  d | j                        S )a  Returns the class of each component of an element of this iterator.

    The expected values are `tf.Tensor` and `tf.SparseTensor`.

    Returns:
      A nested structure of Python `type` objects corresponding to each
      component of an element of this dataset.
    c                 "    | j                         S rG   )_to_legacy_output_classescomponent_specs    r@   r  zB_SingleWorkerOwnedDatasetIterator.output_classes.<locals>.<lambda>W  s    ~GGI rB   r#   r  r   rf   s    r@   output_classesz0_SingleWorkerOwnedDatasetIterator.output_classesL  s#     I rB   c                 D    t        j                  d | j                        S )zReturns the shape of each component of an element of this iterator.

    Returns:
      A nested structure of `tf.TensorShape` objects corresponding to each
      component of an element of this dataset.
    c                 "    | j                         S rG   )_to_legacy_output_shapesr*  s    r@   r  zA_SingleWorkerOwnedDatasetIterator.output_shapes.<locals>.<lambda>c  s    ~FFH rB   r,  rf   s    r@   output_shapesz/_SingleWorkerOwnedDatasetIterator.output_shapesZ  s#     H rB   c                 D    t        j                  d | j                        S )zReturns the type of each component of an element of this iterator.

    Returns:
      A nested structure of `tf.DType` objects corresponding to each component
      of an element of this dataset.
    c                 "    | j                         S rG   )_to_legacy_output_typesr*  s    r@   r  z@_SingleWorkerOwnedDatasetIterator.output_types.<locals>.<lambda>o  s    ~EEG rB   r,  rf   s    r@   output_typesz._SingleWorkerOwnedDatasetIterator.output_typesf  s#     G rB   )NNNNNNN)rv   r   r   r   rc   r#  r  r   r0   r2   r-  r1  r5  r?  r@  s   @r@   r  r    s    3  $(0DdE@	-   H H
   	 	 	 	rB   r  c           	      l   t        |t              sJ t        |       t        |j                        k(  sJ g }t	        |j                        D ]X  \  }}t        j                  |      5  |j                  |      }t        | |   ||||      }|j                  |       ddd       Z |S # 1 sw Y   fxY w)z5Create a multidevice iterator on each of the workers.)r=   r   rq   r.   rb   N)
r1   rD   re   ri   r   r   r   rm   r  r   )	worker_datasetsr   r.   rb   r   rp   r   ri   rN  s	            r@   r  r  s  s    
 
M<	00	0	_	]%A%A!B	BB	B)]99: 	!ia	F	 !$??Bn2!!$ 35h x ! !	! 
! !s   (6B**B3	c                     g }t        |       D ]K  \  }}|j                  |   }t        j                  |      5   ||      }|j	                  |       ddd       M |j
                  fS # 1 sw Y   exY w)z;Create device datasets per worker given a dataset function.N)r   ri   r   r   r   r0   )r  r   r  datasetsrp   ctxr   r=   s           r@   r  r    s     (.) fa))!,F	F	 3goog 
 
7''	'' s   A**A3	c                 :   t        | t        j                        r| j                  } t        | t        j                  t
        j                  f      r| S t        | t        j                  t        j                  f      rt        | j                        S t        d      )z!Get the batched dataset from `d`.zUnable to get batched dataset from the input dataset. `batch` `map_and_batch` need to be the last operations on the dataset. The batch operations can be followed by a prefetch.)r1   r   DatasetV1Adapterr  BatchDatasetr   _MapAndBatchDatasetPrefetchDataset_OptionsDataset_get_batched_dataset_input_datasetr   )rJ   s    r@   rA  rA    s     ;//0	

AK,,h.J.JKLH!k11!113 4 0 011<	= =rB   c                    t        | t        j                  t        j                  f      sJ t        | t        j                        r| j
                  }| j                  }n2t        | t        j                        r| j                  }| j                  }t        j                        rt        j                  |      }t        j                        rt        j                  |      }||fS )z.Get `batch_size`, `drop_remainder` of dataset.)r1   r   r=  r   r>  _batch_size_drop_remainder_batch_size_t_drop_remainder_tr   
is_tf_typeconstant_value)rJ   r  drop_remainders      r@   _get_batched_dataset_attributesrK    s     
A --x/K/KL
N N 
N;++,J&&N!X112J((N J'++J7JN+ //?N	^	##rB   c                 8   t        |       }t        |      \  }}d}t        | t        j                        r| j
                  }nTt        | t        j                        r:t        | j                  t        j                        r| j                  j
                  }|||fS )z6Get the underlying attributes from the dataset object.N)rA  rK  r1   r   r?  _buffer_sizer<  r  )r=   batched_datasetr  rJ  prefetch_buffers        r@   _get_dataset_attributesrP    s     )1/>O*n /445**O7K889w'')D)DE&&33O	^_	44rB   c                     | F| j                   t        j                  k(  s)| j                   t        j                  k(  r| j                  ryy)z3Determine whether to use multi_device_iterator_ops.TF)r  r   
PER_WORKERr  r  )r.   s    r@   r   r     s@    o++/C/N/NN,,0D0P0PP//	rB   c                   J    e Zd ZdZd Zed        Zd Zd	dZed        Z	d Z
y)
MultiStepContexta  A context object that can be used to capture things when running steps.

  This context object is useful when running multiple steps at a time using the
  `experimental_run_steps_on_iterator` API. For e.g. it allows the user's step
  function to specify which outputs to emit at what frequency. Currently it
  supports capturing output from the last step, as well as capturing non tensor
  outputs.  In the future it will be augmented to support other use cases such
  as output each N steps.
  c                 .    i | _         i | _        i | _        y)zHInitialize an output context.

    Returns:
      A context object.
    N)_last_step_outputs_last_step_outputs_reduce_ops_non_tensor_outputsrf   s    r@   rc   zMultiStepContext.__init__  s     !D)+D&!DrB   c                     | j                   S )a  A dictionary consisting of outputs to be captured on last step.

    Keys in the dictionary are names of tensors to be captured, as specified
    when `set_last_step_output` is called.
    Values in the dictionary are the tensors themselves. If
    `set_last_step_output` was called with a `reduce_op` for this output,
    then the value is the reduced value.

    Returns:
      A dictionary with last step outputs.
    )rV  rf   s    r@   last_step_outputsz"MultiStepContext.last_step_outputs  s     """rB   c                 H    t        |t              st        d      || _        y)z3Replace the entire dictionary of last step outputs.z+Need a dictionary to set last_step_outputs.N)r1   dictr   rV  )r`   outputss     r@   _set_last_step_outputsz'MultiStepContext._set_last_step_outputs  s     gt$DEE%DrB   Nc                 B    t        j                         rW j                  <   | j                  <   yt        j                         }|j                  |d       j                  <   yJ  fd}t        j                         j                  ||f       y)a  Set `output` with `name` to be outputted from the last step.

    Args:
      name: String, name to identify the output. Doesn't need to match tensor
        name.
      output: The tensors that should be outputted with `name`. See below for
        actual types supported.
      reduce_op: Reduction method to use to reduce outputs from multiple
        replicas. Required if `set_last_step_output` is called in a replica
        context. Optional in cross_replica_context.
        When present, the outputs from all the replicas are reduced using the
        current distribution strategy's `reduce` method. Hence, the type of
        `output` must be what's supported by the corresponding `reduce` method.
        For e.g. if using MirroredStrategy and reduction is set, output
        must be a `PerReplica` value.
        The reduce method is also recorded in a dictionary
        `_last_step_outputs_reduce_ops` for later interpreting of the
        outputs as already reduced or not.
    Nr   c                 f    | j                  |d       j                  <   j                  <   y )Nr   )r   rV  rW  )distributionr+  r   	reduce_opr`   s     r@   merge_fnz7MultiStepContext.set_last_step_output.<locals>.merge_fn'  s=    (4(;(;IuAE )< )G%
 4=**40rB   args)r   in_cross_replica_contextrW  rV  get_strategyr   r   
merge_call)r`   r   outputrb  ra  rc  s   `` `  r@   set_last_step_outputz%MultiStepContext.set_last_step_output	  s    ( ..01:d((.		(.%%224(4(;(;IvAE )< )G% """= ((*55
& 6 $rB   c                     | j                   S )zAA dictionary consisting of any non tensor outputs to be captured.)rX  rf   s    r@   non_tensor_outputsz#MultiStepContext.non_tensor_outputs2  s     ###rB   c                      t        j                         r| j                  <   y fd}t        j                         j	                  ||f       y)z?Set `output` with `name` to be captured as a non tensor output.c                 B    | j                  |      j                  <   y rG   )experimental_local_resultsrX  )ra  r+  r   r`   s     r@   rc  z8MultiStepContext.set_non_tensor_output.<locals>.merge_fn<  s#     33E: 	  &rB   rd  N)r   rf  rX  r   rh  )r`   r   ri  rc  s   ``  r@   set_non_tensor_outputz&MultiStepContext.set_non_tensor_output7  sI    ..0'-dt$<
 ((*55
& 6 $rB   rG   )rv   r   r   r   rc   r   rZ  r^  rj  rl  rp  rH   rB   r@   rT  rT    sD    " # #&'$R $ $$rB   rT  c                     t        | j                  j                        t        |       s|S fd}t	        j
                  ||      S )a  Create a `tf.TypeSpec` for a given strategy and input `tensor_spec`.

  Args:
    strategy: The given `tf.distribute` strategy.
    tensor_spec: `tf.TensorSpec` of a given value. The batch dimension of the
      shape should be None if you have partial batches.

  Returns:
    A `tf.TypeSpec` that matches the values produced by a given strategy. This
    can be a `tf.TensorSpec` or a `PerRelicaSpec`.
  c                 b    t              D cg c]  }|  }}t        j                  | S c c}w rG   )rt   r   PerReplicaSpec)tensor_spec_per_inputrK   value_specsnum_replicass      r@   _get_value_per_replicaz?_create_distributed_tensor_spec.<locals>._get_value_per_replica[  s3    272EFQ(FKF  +.. Gs   	,)re   r6   ri   _always_wrapr#   r  )r+   tensor_specrw  rv  s      @r@   rl  rl  E  sF     X&&556, 
h	/ 
		2K	@@rB   c                 X    t        | t        j                        r| j                  |   S | S )zBIf `spec` is a `PerReplicaSpec`, then return its `i`th value_spec.)r1   r   rs  _value_specs)r   rp   s     r@   r#  r#  b  s(    f++,QKrB   c                 "   t        j                         rQt        j                  | j                  j                        5  | j                         j                         cddd       S t        j                  S # 1 sw Y   t        j                  S xY w)z'Returns the cardinality of the dataset.N)	r   rt  r   r   r  r   numpyr   r  )r=   s    r@   r   r   j  sh     	G++22	3 +  "((*+ +		 	  +		 	  s   A66Bc           	          t        | j                  dt        | j                  dd            sy|t        j                  k(  ryt	        |j
                         xs | j                  j                         S )z7Returns whether to enable using partial batch handling.enable_partial_batch_handling(experimental_enable_get_next_as_optionalF)r   r6   r   r   r   r0   r   )r+   r=   r   s      r@   r   r   r  sz     
8h!K
 
 O,,," 
 
I'00FFHIrB   c                 J    t        |      }t        j                  | |      }|S )ay  Creates a PerReplica.

  For strategies other than OneDeviceStrategy, it creates a PerReplica whose
  type spec is set to the element spec of the dataset. This helps avoid
  retracing for partial batches. Retracing is problematic for multi client when
  different client retraces different time, since retracing changes the
  collective keys in the tf.function, and causes mismatches among clients.

  For single client strategies, this simply calls distribute_utils.regroup().

  Args:
    value_list: a list of values, one for each replica.
    strategy: the `tf.distribute.Strategy`.

  Returns:
    a structure of PerReplica.

  )always_wrap)rx  r   regroup)r   r+   r  per_replicass       r@   r   r     s&    ( X&+!))*+N,	rB   c                 ~    | j                   j                         xs" t        | j                   j                        dkD  S )zAReturns whether to always wrap the values in a DistributedValues.r%   )r6   r   re   ri   )r+   s    r@   rx  rx    s<    				0	0	2 
,c&&7(*+7, ,rB   c                     t        | t        j                        sJ |        d }t        j                  t        j                  || j
                         S )z3Rebatch the spec to have a dynamic batch dimension.c                 b    	 | j                         j                  d       S # t        $ r Y | S w xY wrG   )_unbatch_batchr   )r   s    r@   _rebatchz%_rebatch_as_dynamic.<locals>._rebatch  s5    ]]_##D)) 
Ks   ! 	..)r1   r   rs  r#   r  r{  )per_replica_specr  s     r@   r=  r=    sR    	$f&;&;	<N>NN	< 
		($4$A$AB
D DrB   c                 L    d| j                   j                   d}t        |      )Nzenumerate not supported with z` types within tf.functions. Use a for loop over the dataset and keep a separate counter instead.)ru   rv   r   )r  unused_startmsgs      r@   _ag_enumerate_not_implementedr    s0    %akk&:&:%; <  
 	C  rB   )NF)gr   r!  r  rq  r  %tensorflow.python.autograph.operatorsr   'tensorflow.python.data.experimental.opsr   r   r   r   tensorflow.python.data.opsr   r   r	   r
   tensorflow.python.distributer   r   r   r   r   r   +tensorflow.python.distribute.distribute_libr   tensorflow.python.eagerr   r   tensorflow.python.frameworkr   r   r   r   r   r   r   r   r   r   tensorflow.python.opsr   r   r   r   r    tensorflow.python.ops.raggedr!   tensorflow.python.platformr"   r:   tensorflow.python.typesdistribute_typestensorflow.python.utilr#   tensorflow.python.util.compatr$   SamplerExponentialBucketsrv  r  rA   objectrD   r   r   IteratorDistributedIteratorInterfacer   TypeSpecr   r5   CompositeTensorr  IterableDistributedDatasetInterfacerB  r3   rX  r4   r  r  r   r  r$  r  r  r  rA  rK  rP  r   rT  rl  r#  r   r   r   rx  r=  r  enumerate_registryregisterrH   rB   r@   <module>r     s   7  
  
 = < R > 2 3 @ 3 4 7 9 2 4 / L + . 8 ; . . + 5 4 3 1 + 1 * , 6 < B ' 9 9K
8J8J;!J!!LJ	9 5 J	M%
%%12	7	I D&T5$6 5$p4::L6o66.KKL6^QG	(:(: QGh.H? .HbI1*::IX _--%AA F(H> (HXqG)9)I)I qGh	$ *K $ P|Nn&6&F&F|N~6Br FH1v H1VCIy'9'9 CILO(H(8(H(HOh *.6;*
(=$$.5*c$v c$LA:!I22,D$!    ' '6    ' '5rB   