
    BVhE              
       d   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddlm
Z
 ddlmZ d	efd
Z ed       G d d ej                  dg d                   Z edg        G d d             Z ed       G d d ej                  dg d                   Z edg        G d d             Zy)z0A Python interface for creating dataset servers.    N)Iterable)service_config_pb2)pywrap_tensorflow)_pywrap_server_lib)_pywrap_utils_exp)	tf_exportreturnc                     | dk(  ry| y| S )zCModifies time-based config values to account for special behaviors.r       )values    f/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/data/experimental/service/server_lib.py_get_time_or_placeholderr      s     aZ
]	,    z*data.experimental.service.DispatcherConfigc                   6     e Zd ZdZ	 	 	 	 	 	 	 	 	 d fd	Z xZS )DispatcherConfiga  Configuration class for tf.data service dispatchers.

  Fields:
    port: Specifies the port to bind to. A value of 0 indicates that the server
      may bind to any available port.
    protocol: The protocol to use for communicating with the tf.data service,
      e.g. "grpc".
    work_dir: A directory to store dispatcher state in. This
      argument is required for the dispatcher to be able to recover from
      restarts.
    fault_tolerant_mode: Whether the dispatcher should write its state to a
      journal so that it can recover from restarts. Dispatcher state, including
      registered datasets and created jobs, is synchronously written to the
      journal before responding to RPCs. If `True`, `work_dir` must also be
      specified.
    worker_addresses: If the job uses auto-sharding, it needs to specify a fixed
      list of worker addresses that will register with the dispatcher. The
      worker addresses should be in the format `"host"` or `"host:port"`, where
      `"port"` is an integer, named port, or `%port%` to match any port.
    job_gc_check_interval_ms: How often the dispatcher should scan through to
      delete old and unused jobs, in milliseconds. If not set, the runtime will
      select a reasonable default. A higher value will reduce load on the
      dispatcher, while a lower value will reduce the time it takes for the
      dispatcher to garbage collect expired jobs.
    job_gc_timeout_ms: How long a job needs to be unused before it becomes a
      candidate for garbage collection, in milliseconds. A value of -1 indicates
      that jobs should never be garbage collected. If not set, the runtime will
      select a reasonable default. A higher value will cause jobs to stay around
      longer with no consumers. This is useful if there is a large gap in
      time between when consumers read from the job. A lower value will reduce
      the time it takes to reclaim the resources from expired jobs.
    worker_timeout_ms: How long to wait for a worker to heartbeat before
      considering it missing. If not set, the runtime will select a reasonable
      default.
    worker_max_concurrent_snapshots: The maximum number of snapshots a worker
      can concurrently process.
  c
                     |t        j                         }t        |      }t        |      }t        
|   | |||||||||	
      S N)r   TF_DATA_DefaultProtocolr   super__new__)clsportprotocolwork_dirfault_tolerant_modeworker_addressesjob_gc_check_interval_msjob_gc_timeout_msworker_timeout_msworker_max_concurrent_snapshots	__class__s             r   r   zDispatcherConfig.__new__c   sb     "::<h7  "01BC7? ' r   )	r   NNFNNNNr   __name__
__module____qualname____doc__r   __classcell__r"   s   @r   r   r   ,   s0    "$P #&' r   r   	r   r   r   r   r   r   r   r    r!   z(data.experimental.service.DispatchServer)v1c                       e Zd ZdZddZd ZddZddZede	fd       Z
dd	Zdd
Zede	fd       ZdefdZdeej$                     fdZy)DispatchServera.  An in-process tf.data service dispatch server.

  A `tf.data.experimental.service.DispatchServer` coordinates a cluster of
  `tf.data.experimental.service.WorkerServer`s. When the workers start, they
  register themselves with the dispatcher.

  >>> dispatcher = tf.data.experimental.service.DispatchServer()
  >>> dispatcher_address = dispatcher.target.split("://")[1]
  >>> worker = tf.data.experimental.service.WorkerServer(
  ...     tf.data.experimental.service.WorkerConfig(
  ...     dispatcher_address=dispatcher_address))
  >>> dataset = tf.data.Dataset.range(10)
  >>> dataset = dataset.apply(tf.data.experimental.service.distribute(
  ...     processing_mode="parallel_epochs", service=dispatcher.target))
  >>> [a.item() for a in dataset.as_numpy_iterator()]
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

  When starting a dedicated tf.data dispatch process, use join() to block
  after starting up the server, until the server terminates.

  ```
  dispatcher = tf.data.experimental.service.DispatchServer(
      tf.data.experimental.service.DispatcherConfig(port=5050))
  dispatcher.join()
  ```

  Call stop() to gracefully terminate the dispatcher. The server automatically
  stops when all reference to it have been deleted.

  To start a `DispatchServer` in fault-tolerant mode, set `work_dir` and
  `fault_tolerant_mode` like below:

  ```
  dispatcher = tf.data.experimental.service.DispatchServer(
      tf.data.experimental.service.DispatcherConfig(
          port=5050,
          work_dir="gs://my-bucket/dispatcher/work_dir",
          fault_tolerant_mode=True))
  ```
  Nc                 (   |xs
 t               }|j                  r|j                  st        d      || _        t        |t        j                         r|}nxt        j                   |j                  |j                  |j                  |j                  |j                  |j                  |j                  |j                  |j                  	      }t        j                  |j!                               | _        |r| j"                  j%                          yy)aO  Creates a new dispatch server.

    Args:
      config: (Optional.) A `tf.data.experimental.service.DispatcherConfig`
        configration. If `None`, the dispatcher will use default
        configuration values.
      start: (Optional.) Boolean, indicating whether to start the server after
        creating it. Defaults to True.
    zCannot enable fault tolerant mode without configuring a work dir. Make sure to set `work_dir` in the `config` object passed to `DispatcherServer`.r*   N)r   r   r   
ValueError_config
isinstancer   r   r   r   r   r   r    r!   r   TF_DATA_NewDispatchServerSerializeToString_serverstartselfconfigr5   config_protos       r   __init__zDispatchServer.__init__   s     )')F!!&// ! ! DL&,==>l'88{{????$88!22#)#B#B"44"44*0*P*P
l &??&&(*DL
ll r   c                 8    | j                   j                          y)zStarts this server.

    >>> dispatcher = tf.data.experimental.service.DispatchServer(start=False)
    >>> dispatcher.start()

    Raises:
      tf.errors.OpError: Or one of its subclasses if an error occurs while
        starting the server.
    Nr4   r5   r7   s    r   r5   zDispatchServer.start   s     	LLr   r	   c                 8    | j                   j                          y)a  Blocks until the server has shut down.

    This is useful when starting a dedicated dispatch process.

    ```
    dispatcher = tf.data.experimental.service.DispatchServer(
        tf.data.experimental.service.DispatcherConfig(port=5050))
    dispatcher.join()
    ```

    Raises:
      tf.errors.OpError: Or one of its subclasses if an error occurs while
        joining the server.
    Nr4   joinr=   s    r   r@   zDispatchServer.join   s     	LLr   c                 $    | j                          yzStops the server.

    Raises:
      tf.errors.OpError: Or one of its subclasses if an error occurs while
        stopping the server.
    N_stopr=   s    r   stopzDispatchServer.stop        	JJLr   c                 ~    dj                  | j                  j                  | j                  j	                               S )a  Returns a target that can be used to connect to the server.

    >>> dispatcher = tf.data.experimental.service.DispatchServer()
    >>> dataset = tf.data.Dataset.range(10)
    >>> dataset = dataset.apply(tf.data.experimental.service.distribute(
    ...     processing_mode="parallel_epochs", service=dispatcher.target))

    The returned string will be in the form protocol://address, e.g.
    "grpc://localhost:5050".
    z{0}://localhost:{1})formatr0   r   r4   
bound_portr=   s    r   targetzDispatchServer.target   s5     !''(=(=(,(?(?(AC Cr   c                 8    | j                   j                          yrB   r4   rE   r=   s    r   rD   zDispatchServer._stop       	LLr   c                 $    | j                          y r   rC   r=   s    r   __del__zDispatchServer.__del__      JJLr   c                 T    dj                  | j                  j                               S zyReturns the address of the server.

    The returned string will be in the form address:port, e.g. "localhost:1000".
    zlocalhost:{0}rH   r4   rI   r=   s    r   _addresszDispatchServer._address  "     !!$,,"9"9";<<r   c                 6    | j                   j                         S )z=Returns the number of workers registered with the dispatcher.)r4   num_workersr=   s    r   _num_workerszDispatchServer._num_workers  s    <<##%%r   c                 8    | j                   j                  |      S )z9Returns information about all the streams for a snapshot.)r4   snapshot_streams)r7   paths     r   _snapshot_streamsz DispatchServer._snapshot_streams  s     <<((..r   )NTr	   N)r$   r%   r&   r'   r:   r5   r@   rE   propertystrrJ   rD   rO   rT   intrX   r   r   SnapshotStreamInfoWrapperr\   r   r   r   r-   r-      s    'R"H
" Cc C C = = =&C &/0JJK/r   r-   z&data.experimental.service.WorkerConfigc                   2     e Zd ZdZ	 	 	 	 	 	 	 d fd	Z xZS )WorkerConfiga  Configuration class for tf.data service dispatchers.

  Fields:
    dispatcher_address: Specifies the address of the dispatcher.
    worker_address: Specifies the address of the worker server. This address is
      passed to the dispatcher so that the dispatcher can tell clients how to
      connect to this worker.
    port: Specifies the port to bind to. A value of 0 indicates that the worker
      can bind to any available port.
    protocol: A string indicating the protocol to be used by the worker to
      connect to the dispatcher. E.g. "grpc".
    heartbeat_interval_ms: How often the worker should heartbeat to the
      dispatcher, in milliseconds. If not set, the runtime will select a
      reasonable default. A higher value will reduce the load on the dispatcher,
      while a lower value will reduce the time it takes to reclaim resources
      from finished jobs.
    dispatcher_timeout_ms: How long, in milliseconds, to retry requests to the
      dispatcher before giving up and reporting an error. Defaults to 1 hour.
    data_transfer_protocol: A string indicating the protocol to be used by the
      worker to transfer data to the client. E.g. "grpc".
    data_transfer_address: A string indicating the data transfer address of the
      worker server.
  c	                     |d}|t        j                         }|d}t        |      }t        |      }t        t        |   | ||||||||	      S )Nzlocalhost:%port%zlocalhost:%dts_port%)r   r   r   r   rc   r   )
r   dispatcher_addressworker_addressr   r   heartbeat_interval_msdispatcher_timeout_msdata_transfer_protocoldata_transfer_addressr"   s
            r   r   zWorkerConfig.__new__C  sx     )n"::<h$445JK45JK!#5~t&(=35K355r   )Nr   NNNNNr#   r)   s   @r   rc   rc   $  s)    4 "$($(%)$(5 5r   rc   re   rf   r   r   rg   rh   ri   rj   z&data.experimental.service.WorkerServerc                       e Zd ZdZddZddZddZddZddZdd	Z	e
defd
       ZdefdZdeej"                     fdZy)WorkerServera  An in-process tf.data service worker server.

  A `tf.data.experimental.service.WorkerServer` performs `tf.data.Dataset`
  processing for user-defined datasets, and provides the resulting elements over
  RPC. A worker is associated with a single
  `tf.data.experimental.service.DispatchServer`.

  >>> dispatcher = tf.data.experimental.service.DispatchServer()
  >>> dispatcher_address = dispatcher.target.split("://")[1]
  >>> worker = tf.data.experimental.service.WorkerServer(
  ...     tf.data.experimental.service.WorkerConfig(
  ...         dispatcher_address=dispatcher_address))
  >>> dataset = tf.data.Dataset.range(10)
  >>> dataset = dataset.apply(tf.data.experimental.service.distribute(
  ...     processing_mode="parallel_epochs", service=dispatcher.target))
  >>> [a.item() for a in dataset.as_numpy_iterator()]
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

  When starting a dedicated tf.data worker process, use join() to block
  after starting up the worker, until the worker terminates.

  ```
  worker = tf.data.experimental.service.WorkerServer(
      port=5051, dispatcher_address="localhost:5050")
  worker.join()
  ```

  Call stop() to gracefully terminate the worker. The worker automatically stops
  when all reference to it have been deleted.
  c           
         |j                   t        d      t        |t        j                        r|}nmt        j                  |j                   |j
                  |j                  |j                  |j                  |j                  |j                  |j                        }t        j                  |j                               | _        |r| j                  j!                          yy)zCreates a new worker server.

    Args:
      config: A `tf.data.experimental.service.WorkerConfig` configration.
      start: (Optional.) Boolean, indicating whether to start the server after
        creating it. Defaults to True.
    NzMMust specify a `dispatcher_address` in the `config` passed to `WorkerServer`.rk   )re   r/   r1   r   rc   rf   r   r   rg   rh   ri   rj   r   TF_DATA_NewWorkerServerr3   r4   r5   r6   s       r   r:   zWorkerServer.__init__}  s       (    &,99:l'44#66..{{?? & < < & < <!'!>!> & < <>l &==&&(*DL
ll r   r	   Nc                 8    | j                   j                          y)zStarts this server.

    Raises:
      tf.errors.OpError: Or one of its subclasses if an error occurs while
        starting the server.
    Nr<   r=   s    r   r5   zWorkerServer.start  s     	LLr   c                 8    | j                   j                          y)a  Blocks until the server has shut down.

    This is useful when starting a dedicated worker process.

    ```
    worker_server = tf.data.experimental.service.WorkerServer(
        port=5051, dispatcher_address="localhost:5050")
    worker_server.join()
    ```

    This method currently blocks forever.

    Raises:
      tf.errors.OpError: Or one of its subclasses if an error occurs while
        joining the server.
    Nr?   r=   s    r   r@   zWorkerServer.join  s    " 	LLr   c                 $    | j                          yrB   rC   r=   s    r   rE   zWorkerServer.stop  rF   r   c                 8    | j                   j                          yrB   rL   r=   s    r   rD   zWorkerServer._stop  rM   r   c                 $    | j                          y r   rC   r=   s    r   rO   zWorkerServer.__del__  rP   r   c                 T    dj                  | j                  j                               S rR   rS   r=   s    r   rT   zWorkerServer._address  rU   r   c                 6    | j                   j                         S )zCReturns the number of tasks currently being executed on the worker.)r4   	num_tasksr=   s    r   
_num_taskszWorkerServer._num_tasks  s    <<!!##r   c                 6    | j                   j                         S )zReturns the progresses of the snapshot tasks currently being executed.

    Returns:
      An `Iterable[common_pb2.SnapshotTaskProgress]`.
    )r4   snapshot_task_progressesr=   s    r   _snapshot_task_progressesz&WorkerServer._snapshot_task_progresses  s     <<0022r   )Tr]   )r$   r%   r&   r'   r:   r5   r@   rE   rD   rO   r^   r_   rT   r`   rx   r   r   SnapshotTaskProgressWrapperr{   r   r   r   rm   rm   \  se    >:& = = =$# $3*FFG3r   rm   )r'   collectionstypingr   tensorflow.core.protobufr   tensorflow.pythonr   +tensorflow.python.data.experimental.servicer   r    tensorflow.python.util.tf_exportr   r`   r   
namedtupler   r-   rc   rm   r   r   r   <module>r      s    7   8 / J I 6s   78RK
	
R 9Rj 5"=^/ ^/ >^/B 3445K> , 45 545n 3;A3 A3 <A3r   