
    BVhJ                        d Z ddlmZmZmZ ddlmc mc mc m	c m
Z
 ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ  ddl!m"Z" ddl#m$Z$ dejJ                  fdZ&dejJ                  fdZ' e$dg        G d de(             Z) e$dg        G d de(             Z* G d de)      Z+ G d d e*      Z, G d! d"e(      Z-y)#z(Module to expose RPC APIs in tensorflow.    )OptionalSequenceUnionN)tf_rpc_service_pb2)context)def_function)function)constant_op)dtypes)errors)none_tensor)	type_spec)math_ops)resource_variable_ops)nested_structure_coder)core)nest)	tf_exportfuncc                     t        j                  t        j                  | j                        }t        j                  |      }|j                         S N)r   map_structurer   type_spec_from_valuestructured_outputsr   encode_structureSerializeToString)r   output_specsoutput_specs_protos      e/home/dcms/DCMS/lib/python3.12/site-packages/tensorflow/python/distribute/experimental/rpc/rpc_ops.pyget_output_specs_from_functionr    %   sC    ##I$B$B$($;$;=,->>|L		-	-	//    c                 j    | j                   \  }}t        j                  |      }|j                         S r   )structured_input_signaturer   r   r   )r   	arg_specs_arg_specs_protos       r   get_input_specs_from_functionr'   ,   s0    00,)Q*;;IF/		*	*	,,r!   z"distribute.experimental.rpc.Server)v1c                   h    e Zd ZdZed        Zdedeej                  e
j                  f   fdZd Zy)ServerzA Server base class for accepting RPCs for registered tf.functions.

    Functions can be registered on the server and are exposed via RPCs.
  c                 :    | dk7  rt        d      t        |      S )a  Create TF RPC server at given address.

    Args:
      rpc_layer: Communication layer between client and server. Only "grpc" rpc
        layer is supported at the moment.
      address: Address where RPC server is hosted.

    Returns:
      An instance of `tf.distribute.experimental.rpc.Server` class.

    Raises:
        A ValueError if rpc_layer other than "grpc" is used. Only GRPC
        is supported at the moment.

    Example usage:

      >>> import portpicker
      >>> @tf.function(input_signature=[
      ...      tf.TensorSpec([], tf.int32),
      ...      tf.TensorSpec([], tf.int32)])
      ... def remote_fn(a, b):
      ...   return tf.add(a, b)

      >>> port = portpicker.pick_unused_port()
      >>> address = "localhost:{}".format(port)
      >>> server = tf.distribute.experimental.rpc.Server.create("grpc", address)
      >>> server.register("addition", remote_fn)
      >>> server.start()

    grpc-Only GRPC backend is supported at the moment.)address)
ValueError
GrpcServer)	rpc_layerr.   s     r   createzServer.create9   s$    @ FFGGg&&r!   method_namer   c                     t        d      )a  Method for registering tf.function on server.

    Registered methods can be invoked remotely from clients.

    Args:
      method_name: Name of the tf.function. Clients use this method_name to make
        RPCs.
      func: A `tf.function` or ConcreteFunction to register.
    GPlease use create_server method to create aconcrete subclass of Server.NotImplementedError)selfr3   r   s      r   registerzServer.register]   s      = > >r!   c                     t        d      )zvStarts the RPC server on provided address.

     Server listens for new requests from client, once it is started.
    r5   r6   r8   s    r   startzServer.startl   s    
  = > >r!   N)__name__
__module____qualname____doc__staticmethodr2   strr   r   Functiontf_functionConcreteFunctionr9   r<    r!   r   r*   r*   2   sP    
 !' !'F># ><00&778 9>>r!   r*   z"distribute.experimental.rpc.Clientc                   Z    e Zd ZdZedd       Z	 	 	 ddedeee	j                        fdZy)	Clientz-Client class for invoking RPCs to the server.c                 r    | dk7  rt        d      t        j                         rd}nd}t        ||||      S )a  Create TF RPC client to connect to the given address.

    Args:
      rpc_layer: Communication layer between client and server. Only "grpc" rpc
        layer is supported at the moment.
      address: Address of the server to connect the RPC client to.
      name: Name of the RPC Client. You can create multiple clients connecting
        to same server and distinguish them using different names.
      timeout_in_ms: The default timeout to use for outgoing RPCs from client. 0
        indicates no timeout. Exceeding timeout during RPC will raise
        DeadlineExceeded error.

    Returns:
      An instance of `tf.distribute.experimental.rpc.Client` with the following
      dynamically added methods for eagerly created clients:
        * `Registered methods` e.g. multiply(**args):
            If Client is created when executing eagerly, client will request the
            list of registered methods from server during client creation.
            The convenience methods for RPCs will be dynamically added to the
            created Client instance.

            For example, when a server has method "multiply" registered, the
            client object created in eager mode will have 'multiply' method
            available. Users can use client.multiply(..) to make RPC, instead of
            client.call("multiply", ...)

            Both "call" and "multiply" methods are non-blocking i.e. they return
            a StatusOrResult object which should be used to wait for getting
            value or error.

            Along with the above, blocking versions of the registered
            methods are also dynamically added to client instance.
            e.g. multiply_blocking(**args). These methods block till the RPC is
            finished and return response for successful RPC. Otherwise raise
            exception.

            These methods are not available when Client is created inside a
            tf.function.

    Raises:
        A ValueError if rpc_layer other than "grpc" is used. Only GRPC
          is supported at the moment.
        A DeadlineExceeded exception in eager mode if timeout exceeds while
          creating and listing client methods.

    Example usage:
      >>> # Have server already started.
      >>> import portpicker
      >>> @tf.function(input_signature=[
      ...      tf.TensorSpec([], tf.int32),
      ...      tf.TensorSpec([], tf.int32)])
      ... def remote_fn(a, b):
      ...   return tf.add(a, b)

      >>> port = portpicker.pick_unused_port()
      >>> address = "localhost:{}".format(port)
      >>> server = tf.distribute.experimental.rpc.Server.create("grpc", address)
      >>> server.register("addition", remote_fn)
      >>> server.start()

      >>> # Start client
      >>> client = tf.distribute.experimental.rpc.Client.create("grpc",
      ...      address=address, name="test_client")

      >>> a = tf.constant(2, dtype=tf.int32)
      >>> b = tf.constant(3, dtype=tf.int32)

      >>> result = client.call(
      ...    args=[a, b],
      ...    method_name="addition",
      ...    output_specs=tf.TensorSpec((), tf.int32))

      >>> if result.is_ok():
      ...   result.get_value()

      >>> result = client.addition(a, b)

      >>> if result.is_ok():
      ...   result.get_value()

      >>> value = client.addition_blocking(a, b)
    r,   r-   TF)r.   namelist_registered_methodstimeout_in_ms)r/   r   executing_eagerly
GrpcClient)r1   r.   rJ   rL   rK   s        r   r2   zClient.createy   sJ    h FFGG  " $ % 7#	% %r!   Nr3   argsc                     t        d      )a  Method for making RPC calls to remote server.

    This invokes RPC to the server, executing the registered method_name
    remotely.
    Args:
      method_name: Remote registered method to invoke
      args: List of arguments for the registered method.
      output_specs: Output specs for the output from method.
         For example, if tf.function is: @tf.function(input_signature=[
           tf.TensorSpec([], tf.int32), tf.TensorSpec([], tf.int32) ])
          def multiply_fn(a, b): return tf.math.multiply(a, b)
        output_spec is: tf.TensorSpec((), tf.int32)  If you have access to TF
          Function, the output specs can be generated
       from tf.function by calling: output_specs =
         tf.nest.map_structure(tf.type_spec_from_value,
         tf_function.get_concrete_function().structured_outputs  If output_specs
         are not provided, flattened list of tensors will be returned in
         response.
      timeout_in_ms: Timeout for this call. If 0, default client timeout will be
        used.

    Returns:
      An instance of `StatusOrResult` class with the following available
      methods.
        * `is_ok()`:
            Returns True of RPC was successful.
        * `get_error()`:
            Returns TF error_code and error message for the RPC.
        * `get_value()`:
            Returns the returned value from remote TF function execution
            when RPC is successful.

      Calling any of the above methods will block till RPC is completed and
      result is available.
    z)Must be implemented in inherited classes.r6   )r8   r3   rO   r   rL   s        r   callzClient.call   s    P I
JJr!   ) r   NNr   )r=   r>   r?   r@   rA   r2   rB   r   r   core_tf_typesTensorrQ   rF   r!   r   rH   rH   u   sN    5]% ]%B =A	(K(K(=#7#789(Kr!   rH   c                   d    e Zd ZdZdefdZdedeej                  e	j                  f   fdZd Zy)	r0   a6  GrpcServer object encapsulates a resource with GRPC server.

    Functions can be registered locally and are exposed via RPCs.
    Example:
    ```
    server = rpc_ops.GrpcServer("host:port")
    @tf.function
    def add(a, b):
      return a + b

    server.register("add", add)
    server.start()
    ```
  r.   c                     t        j                  |      | _        t        j                         r;t        j                  | j                  | j                  j                        | _        y t        d      )Nhandlehandle_devicez-Please create the server outside tf.function.)
gen_rpc_ops
rpc_server_server_handler   rM   r   EagerResourceDeleterdevice_handle_deleterr7   )r8   r.   s     r   __init__zGrpcServer.__init__  sZ    %009D  "2GG$$D4G4G4N4NPd   OPPr!   r3   r   c           	         t        |t        j                        r|j                  j                  r|j
                  t        d      |j                         }t        j                  | j                  ||j                  t        |      t        |      |       yt        |t        j                        rBt        j                  | j                  ||j                  t        |      t        |      |       yt        d      )z!Method for registering functions.Nz/Input signature not specified for the function.)r3   captured_inputsinput_specsr   fz4Only TF functions are supported with Register method)
isinstancer   rC   function_spec	arg_namesinput_signaturer/   get_concrete_functionr[   rpc_server_registerr]   rc   r'   r    rD   rE   )r8   r3   r   concrete_fns       r   r9   zGrpcServer.register  s    
 $--.				%	%'LM
M..0k%%


!%553K@5kB 
D+66	7%%


!..3D95d; MNNr!   c                 B    t        j                  | j                         y)zStarts GRPC server.N)r[   rpc_server_startr]   r;   s    r   r<   zGrpcServer.start:  s      !4!45r!   N)r=   r>   r?   r@   rB   ra   r   r   rC   rD   rE   r9   r<   rF   r!   r   r0   r0     sL    Qc QO# O<00&778 9O<6r!   r0   c                   f    e Zd ZdZ	 	 	 d
dedefdZd Z	 	 	 ddedeee	j                        fd	Zy)rN   a|  Client wrapper to connect to remote RPC server using GRPC.

  If Client is created with (list_registered_methods=True):
  1. Input and output specs for the methods till this point will be fetched from
  Server.
  2. convenience methods are added to invoke registered methods directly from
  client.
  For example:
    For call a server method `add`
    client.add(a, b) or client.add_async(a, b) can be used instead of
    client.call(args=[a,b], output_specs=[..])

  Prerequisite for using list_registered_methods=True:
   1. Server should be already started with the registered methods.
   2. Client must be created in Eager mode.
  r.   rJ   c                    t        j                  ||||      \  | _        }t        j                         r;t        j                  | j                  | j                  j                        | _        nt        d      || _
        i | _        |j                         D ]  }t        j                         }|j                  |       t!        j"                  |j$                        }t!        j"                  |j&                        }	|| j                  |j(                  <   d|j(                  z   dz   |z   }
| j+                  |j(                  ||	| j                  |
        y )N)shared_nameserver_addressrK   rL   rX   z0Client creation is supported only in eager mode.zRPC Call for z method to server )r[   
rpc_client_client_handler   rM   r   r^   r_   r`   r7   _server_address_method_registrynumpyrpc_pb2RegisteredMethodParseFromStringr   decode_protor   rd   method_add_method)r8   r.   rJ   rK   rL   methodsr|   mr   rd   
doc_strings              r   ra   zGrpcClient.__init__Q  s.   
 $/#9#9 7#	$% D
   "2GG$$D4G4G4N4NPd  
<> >"DD--/ 
#

"
"
$a+88Hl*77Fk(4dAHH% #QXX-0DDwNj
qxx{D<O<O!#
#r!   c                    	 fd	dd	fd
}dd	fd
}t        | |       ||_        dz   }t        | ||       ||_        y)z/Method to add RPC methods to the client object.c                  j    | g } rt        j                  |        t        j                  |       }|S r   )r   assert_same_structureflatten)rO   flat_inputsrd   s     r   validate_and_get_flat_inputsz<GrpcClient._add_method.<locals>.validate_and_get_flat_inputss  s4    		""45LL&kr!   r   )rL   c                 \    t        j                   | |       \  }}t        ||      S NrO   r3   rL   )r[   rpc_callStatusOrResult)rL   rO   	status_ordeleterclient_handler3   r   r   s       r   call_wrapperz,GrpcClient._add_method.<locals>.call_wrapper{  s:    &//
+T2!%	'i
 Iw==r!   c                 4   t        j                   	| |       \  }}t        ||      }|j                         r|j	                         S |j                         \  }} t        j                  |j                               d d |j                               r   )	r[   r   r   is_ok	get_value	get_errorr   exception_type_from_error_coderw   )
rL   rO   r   r   
error_code	error_msgr   r3   r   r   s
         r   call_blocking_wrapperz5GrpcClient._add_method.<locals>.call_blocking_wrapper  s    &//
+T2!%	'i
 !G\Bi		""$$ ) 3 3 5
IGf33J4D4D4FG$	)+ 	+r!   	_blockingN)setattrr@   )
r8   r3   r   rd   r   r   r   r   blocking_method_namer   s
    ````    @r   r}   zGrpcClient._add_methodo  s^     +, > > 45 + + D+|,%L&4D&(=>$.!r!   Nr3   rO   c                     |g }t        j                  | j                  t        j                  |      ||      \  }}t        |||      S )a  Method to invoke remote registered functions on the connected server.

    Server should be started before making an RPC Call.

    Args:
      method_name: Registered method to invoke on Server.
      args: Input arguments for the method.
      output_specs: Output specs for the output from method.
      timeout_in_ms: Timeout for this call. If 0, default client timeout will be
       used.

    Returns:
      StatusOrResult object. This function issues the RPC call to server, it
      does not block for the duration of RPC. Please call is_ok, get_error or
      get_value methods on the returned object to blocked till RPC finishes.
    r   )r[   r   rt   r   r   r   )r8   r3   rO   r   rL   r   r   s          r   rQ   zGrpcClient.call  sO    * |d$--\\$#	%Iw
 )Wl;;r!   )rR   Fr   rS   )r=   r>   r?   r@   rB   ra   r}   r   r   rT   rU   rQ   rF   r!   r   rN   rN   ?  s^    & ',	###<'/V =A	<<(=#7#789<r!   rN   c                   6    e Zd ZdZd	dZd Zd Zd Zd Zd Z	y)
r   z3Class representing result and status from RPC Call.Nc                 J    || _         || _        || _        d | _        d | _        y r   )
_status_or_output_specs_deleter_error_code_error_message)r8   r   r   r   s       r   ra   zStatusOrResult.__init__  s(    DO%DDM%)D)-Dr!   c                 v    | j                   -t        j                  | j                        \  | _         | _        y y r   )r   r[   rpc_check_statusr   r   r;   s    r   _check_statuszStatusOrResult._check_status  s5    .9.J.J
///+d+  r!   c                 ~   t        j                         rIt        j                         5  t        j                  | j
                  | j                         d d d        y t        j                         5  t        j                  | j
                  | j                         d d d        y # 1 sw Y   y xY w# 1 sw Y   y xY w)N)rY   r   )r   rM   
eager_moder[   delete_rpc_future_resourcer   r   
graph_moder;   s    r   __del__zStatusOrResult.__del__  s      " ;..??DMM	;; ;  ;..??DMM	;; ;	; ;; ;s   ,B'2,B3'B03B<c                     | j                          t        j                  | j                  t	        j
                  dt        j                              S )zjReturns True if RPC is successful, otherwise returns False.

    This call will block for RPC result.
    r   )dtype)r   r   equalr   r
   constantr   int64r;   s    r   r   zStatusOrResult.is_ok  s?    
 	>>$**%..qEG Gr!   c                 R    | j                          | j                  | j                  fS )zhReturns (TF Error Code, Error Message) from RPC Response.

    This call will block for RPC result.
    )r   r   r   r;   s    r   r   zStatusOrResult.get_error  s'    
 	T0000r!   c                    | j                          | j                  $t        | j                  t        j                        rg }d}n8d}t        j                  | j                        D cg c]  }|j                   }}t        j                  | j                  |      }|ryt        j                  | j                  |      S c c}w )zReturns the returned response value from RPC Call when RPC is successful.

      The returned value is tensors in the output_specs format as returned from
      the RPC call


    This call will block for RPC result.
    NTF)Tout)r   r   rf   r   NoneTensorSpecr   r   r   r[   rpc_get_valuer   pack_sequence_as)r8   flat_output_dtypesreturn_nonesresults        r   r   zStatusOrResult.get_value  s     	!Z0B0B0;0J0J&Lkk-1\\$:L:L-MNAGGNN&&t=OPF""4#5#5v>> Os   )Cr   )
r=   r>   r?   r@   ra   r   r   r   r   r   rF   r!   r   r   r     s$    ;.
	;G1?r!   r   ).r@   typingr   r   r   :tensorflow.distribute.experimental.rpc.kernels.gen_rpc_ops
distributeexperimentalrpckernelsr[   ,tensorflow.distribute.experimental.rpc.protor   rx   tensorflow.python.eagerr   r   r	   rD   tensorflow.python.frameworkr
   r   r   r   r   tensorflow.python.opsr   r   tensorflow.python.saved_modelr   tensorflow.python.typesr   rT   tensorflow.python.utilr    tensorflow.python.util.tf_exportr   rE   r    r'   objectr*   rH   r0   rN   r   rF   r!   r   <module>r      s    / , , P P P V + 0 ; 3 . . 3 1 * 7 @ 9 ' 60)E)E 0-(D(D - /B7?>V ?> 8?>D /B7KKV KK 8KK\86 86vu< u<pB?V B?r!   