
    2Vh\                     .   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
Zd dlmZ d dlmZ d dlmZ  eddg       G d d	             Z G d
 de      Zi ada ej,                         Zdadad Zd Zd Z G d d      Z G d de      ZddZy)    N)closing)keras_export)data_adapter_utils)DataAdapterzkeras.utils.PyDatasetzkeras.utils.Sequencec                       e Zd ZdZddZd Zed        Zej                  d        Zed        Z	e	j                  d        Z	ed        Z
e
j                  d	        Z
d
 Zed        Zd Zd Zy)	PyDataseta  Base class for defining a parallel dataset using Python code.

    Every `PyDataset` must implement the `__getitem__()` and the `__len__()`
    methods. If you want to modify your dataset between epochs,
    you may additionally implement `on_epoch_end()`,
    or `on_epoch_begin` to be called at the start of each epoch.
    The `__getitem__()` method should return a complete batch
    (not a single sample), and the `__len__` method should return
    the number of batches in the dataset (rather than the number of samples).

    Args:
        workers: Number of workers to use in multithreading or
            multiprocessing.
        use_multiprocessing: Whether to use Python multiprocessing for
            parallelism. Setting this to `True` means that your
            dataset will be replicated in multiple forked processes.
            This is necessary to gain compute-level (rather than I/O level)
            benefits from parallelism. However it can only be set to
            `True` if your dataset can be safely pickled.
        max_queue_size: Maximum number of batches to keep in the queue
            when iterating over the dataset in a multithreaded or
            multiprocessed setting.
            Reduce this value to reduce the CPU memory consumption of
            your dataset. Defaults to 10.

    Notes:

    - `PyDataset` is a safer way to do multiprocessing.
        This structure guarantees that the model will only train
        once on each sample per epoch, which is not the case
        with Python generators.
    - The arguments `workers`, `use_multiprocessing`, and `max_queue_size`
        exist to configure how `fit()` uses parallelism to iterate
        over the dataset. They are not being used by the `PyDataset` class
        directly. When you are manually iterating over a `PyDataset`,
        no parallelism is applied.

    Example:

    ```python
    from skimage.io import imread
    from skimage.transform import resize
    import numpy as np
    import math

    # Here, `x_set` is list of path to the images
    # and `y_set` are the associated classes.

    class CIFAR10PyDataset(keras.utils.PyDataset):

        def __init__(self, x_set, y_set, batch_size, **kwargs):
            super().__init__(**kwargs)
            self.x, self.y = x_set, y_set
            self.batch_size = batch_size

        def __len__(self):
            # Return number of batches.
            return math.ceil(len(self.x) / self.batch_size)

        def __getitem__(self, idx):
            # Return x, y for batch idx.
            low = idx * self.batch_size
            # Cap upper bound at array length; the last batch may be smaller
            # if the total number of items is not a multiple of batch size.
            high = min(low + self.batch_size, len(self.x))
            batch_x = self.x[low:high]
            batch_y = self.y[low:high]

            return np.array([
                resize(imread(file_name), (200, 200))
                   for file_name in batch_x]), np.array(batch_y)
    ```
    c                 .    || _         || _        || _        y N)_workers_use_multiprocessing_max_queue_size)selfworkersuse_multiprocessingmax_queue_sizes       c/home/dcms/DCMS/lib/python3.12/site-packages/keras/src/trainers/data_adapters/py_dataset_adapter.py__init__zPyDataset.__init__]   s    $7!-    c                     d}t        | d      s	d| _        d}t        | d      s	d| _        d}t        | d      s	d| _        d}|rt	        j
                  dd	
       y y )NFr      Tr   r   
   zYour `PyDataset` class should call `super().__init__(**kwargs)` in its constructor. `**kwargs` can include `workers`, `use_multiprocessing`, `max_queue_size`. Do not pass these arguments to `fit()`, as they will be ignored.   )
stacklevel)hasattrr   r   r   warningswarn)r   r   s     r   _warn_if_super_not_calledz#PyDataset._warn_if_super_not_calledb   sl    tZ(DMDt34(-D%Dt./#%D DMMG
  r   c                 :    | j                          | j                  S r
   )r   r   r   s    r   r   zPyDataset.workersw   s    &&(}}r   c                     || _         y r
   )r   r   values     r   r   zPyDataset.workers|   s	    r   c                 :    | j                          | j                  S r
   )r   r   r   s    r   r   zPyDataset.use_multiprocessing   s    &&((((r   c                     || _         y r
   )r   r!   s     r   r   zPyDataset.use_multiprocessing   s
    $)!r   c                 :    | j                          | j                  S r
   )r   r   r   s    r   r   zPyDataset.max_queue_size   s    &&(###r   c                     || _         y r
   )r   r!   s     r   r   zPyDataset.max_queue_size   s
    $r   c                     t         )zGets batch at position `index`.

        Args:
            index: position of the batch in the PyDataset.

        Returns:
            A batch
        NotImplementedError)r   indexs     r   __getitem__zPyDataset.__getitem__   
     "!r   c                 F    t        | d      rt        |       S t        d      )zNumber of batches in the PyDataset.

        Returns:
            The number of batches in the PyDataset or `None` to indicate that
            the dataset is infinite.
        __len__z`You need to implement the `num_batches` property:

@property
def num_batches(self):
  return ...)r   lenr)   r   s    r   num_batcheszPyDataset.num_batches   s*     4#t9!>
 	
r   c                      y)z.Method called at the beginning of every epoch.N r   s    r   on_epoch_beginzPyDataset.on_epoch_begin       r   c                      y)z(Method called at the end of every epoch.Nr2   r   s    r   on_epoch_endzPyDataset.on_epoch_end   r4   r   Nr   Fr   )__name__
__module____qualname____doc__r   r   propertyr   setterr   r   r+   r0   r3   r6   r2   r   r   r   r      s    HT.
*   ^^  ) ) *  * $ $ % %	" 
 
r   r   c                       e Zd ZdZ	 	 ddZd Zd Zd Zd Zd Z	d	 Z
d
 Zd Zd Zd Zd Zd Zed        Zed        Zy)PyDatasetAdapterz.Adapter for `keras.utils.PyDataset` instances.Nc                 F   || _         || _        d | _        || _        d | _        d| _        | j                   j                  }| j                   j                  }|dkD  s|dkD  rA|r>t        | j                   ||| j                   j                  | j                        | _        y y y )NFr   r   )r   r   r   shuffle)

py_datasetclass_weightenqueuerrA   _output_signature_within_epochr   r   OrderedEnqueuerr   )r   xrC   rA   r   r   s         r   r   zPyDatasetAdapter.__init__   s     (!%"//))"ooAAQ;7Q;+>+$7#==DM ,?;r   c                    t        |t              r|S t        |t        j                        r|f}t        |t              rt        |      }t        |t
              rt        |      dvr&t        dt        |      d d  dt        |             | j                  Pt        |      dk(  rt        d      t        |      dk(  r)t        j                  |d   | j                        }||fz   }|S )	N>   r   r      zPyDataset.__getitem__() must return a tuple or a dict. If a tuple, it must be ordered either (input,) or (inputs, targets) or (inputs, targets, sample_weights). Received: d   z... of type rJ   zGYou cannot specify `class_weight` and `sample_weight` at the same time.r   r   )
isinstancedictnpndarraylisttupler/   
ValueErrorstrtyperC   r   class_weight_to_sample_weights)r   batchsws      r   _standardize_batchz#PyDatasetAdapter._standardize_batch   s    eT"LeRZZ(HEeT"%LE%'3u:Y+F !Z-.l4;-	I  (5zQ <  5zQ'FF!Hd// r   c              #   ~   K   t        j                         D ]"  }| j                  | j                  |          $ y wr
   )	itertoolscountrX   rB   )r   is     r   _infinite_generatorz$PyDatasetAdapter._infinite_generator   s6     " 	>A))$//!*<==	>s   ;=c              #      K   t        | j                  j                        }| j                  r t	        |      }t        j                  |       |D ]"  }| j                  | j                  |          $ y wr
   )rangerB   r0   rA   rP   randomrX   )r   indicesr\   s      r   _finite_generatorz"PyDatasetAdapter._finite_generator   s_     334<<7mGNN7# 	>A))$//!*<==	>s   A4A6c              #      K   | j                   j                          | j                   j                         D ]  }| j                  |        y wr
   )rD   startgetrX   )r   rV   s     r   _infinite_enqueuer_generatorz-PyDatasetAdapter._infinite_enqueuer_generator   sC     ]]&&( 	1E))%00	1s   AAc              #   0  K   | j                   j                          | j                  j                  }t	        | j                   j                               D ]<  \  }}| j                  |       ||dz
  k\  s"| j                   j                           y  y w)Nr   )rD   rd   rB   r0   	enumeratere   rX   stop)r   r0   r\   rV   s       r   _finite_enqueuer_generatorz+PyDatasetAdapter._finite_enqueuer_generator   sz     oo11!$--"3"3"56 	HAu))%00K!O#""$		s   A5B8Bc                     | j                   6| j                  j                  | j                         S | j	                         S | j                  j                  | j                         S | j                         S r
   )rD   rB   r0   r]   rb   rf   rj   r   s    r   _get_iteratorzPyDatasetAdapter._get_iterator  sh    == **2//11--//**288::6688r   c                 H    t        j                  | j                               S r
   )r   get_numpy_iteratorrl   r   s    r   rn   z#PyDatasetAdapter.get_numpy_iterator  s    !44T5G5G5IJJr   c                 H    t        j                  | j                               S r
   )r   get_jax_iteratorrl   r   s    r   rp   z!PyDatasetAdapter.get_jax_iterator  s    !2243E3E3GHHr   c                    ddl m} | j                  j                  }| j                  t
        j                  }|t        ||      }t        |      D cg c]   }| j                  | j                  |         " }}t        |      dk(  rt        d      t        j                  |      | _        |j                  j                  j                  | j                   | j                        }| j"                  O|j                  j%                         }d|j&                  _        d|j*                  _        |j/                  |      }|S |j1                  |j                  j2                        }|S c c}w )Nr   )
tensorflowzThe PyDataset has length 0)output_signatureFr   )keras.src.utils.module_utilsrr   rB   r0   rE   r   NUM_BATCHES_FOR_TENSOR_SPECminr_   rX   r/   rR   get_tensor_specdataDatasetfrom_generatorrl   rD   Optionsautotuneenabled	threadingprivate_threadpool_sizewith_optionsprefetchAUTOTUNE)r   tfr0   num_samplesr\   batchesdsoptionss           r   get_tf_datasetzPyDatasetAdapter.get_tf_dataset  sD   Aoo11!!),HHK&!+{; {+ ''(:;G  7|q  !=>>%7%G%G%PD"WW__++!33 , 
 ==$ ggoo'G',G$89G5)B 	 RWW--.B	-s   %E+c                 H    t        j                  | j                               S r
   )r   get_torch_dataloaderrl   r   s    r   r   z%PyDatasetAdapter.get_torch_dataloader:  s    !66t7I7I7KLLr   c                     | j                   rt        d      d| _         | j                  r| j                  j                          | j                  j                          y )NzL`on_epoch_begin` was called twice without `on_epoch_end` having been called.T)rF   rR   rD   rd   rB   r3   r   s    r   r3   zPyDatasetAdapter.on_epoch_begin=  sN    &  "==MM!&&(r   c                     | j                   r| j                   j                          | j                  j                          d| _        y NF)rD   ri   rB   r6   rF   r   s    r   r6   zPyDatasetAdapter.on_epoch_endH  s1    ==MM $$&"r   c                 .    | j                   j                  S r
   )rB   r0   r   s    r   r0   zPyDatasetAdapter.num_batchesN  s    ***r   c                      y r
   r2   r   s    r   
batch_sizezPyDatasetAdapter.batch_sizeR  s    r   r   )r8   r9   r:   r;   r   rX   r]   rb   rf   rj   rl   rn   rp   r   r   r3   r6   r<   r0   r   r2   r   r   r?   r?      sz    8
 	08>>1

9KI@M	)# + +  r   r?   Fc                 f    | rt         rt        j                  j                  S t        j                  S r
   )_FORCE_THREADPOOLmultiprocessingdummyPool)r   s    r   get_pool_classr   e  s&    "3$$)))r   c                  B    t         t        j                         a t         S )z,Lazily create the queue to track worker ids.)_WORKER_ID_QUEUEr   Queuer2   r   r   get_worker_id_queuer   l  s     *002r   c                     t         |    |   S )a  Get the value from the PyDataset `uid` at index `i`.

    To allow multiple PyDatasets to be used at the same time, we use `uid` to
    get a specific one. A single PyDataset would cause the validation to
    overwrite the training PyDataset.

    This methods is called from worker threads.

    Args:
        uid: int, PyDataset identifier
        i: index

    Returns:
        The value at index `i`.
    )_SHARED_SEQUENCES)uidr\   s     r   	get_indexr   t  s      S!!$$r   c                   P    e Zd ZdZ	 	 	 ddZd Zd ZddZd Zd Z	d Z
d	 Zd
 Zy)PyDatasetEnqueuera  Base class to enqueue inputs.

    The task of an Enqueuer is to use parallelism to speed up preprocessing.
    This is done with processes or threads.

    Example:

    ```python
        enqueuer = PyDatasetEnqueuer(...)
        enqueuer.start()
        datas = enqueuer.get()
        for data in datas:
            # Use the inputs; training, evaluating, predicting.
            # ... stop sometime.
        enqueuer.stop()
    ```

    The `enqueuer.get()` should be an infinite stream of data.
    c                 j   || _         t        	 t        j                  dd      at        t        t              rt        | _        t        dz  anKt        j                         5  t        j                  | _        t        xj                  dz  c_	        d d d        t        j                         | _        t        j                  |      | _        d| _        t        j                          | _        d | _        |r| j'                        | _        y fd| _        y # t        $ r daY w xY w# 1 sw Y   xY w)Nr\   r   r   Fc                 &     t        d            S r   )r   )_r   s    r   <lambda>z,PyDatasetEnqueuer.__init__.<locals>.<lambda>  s    )>)>w)G r   )rB   _SEQUENCE_COUNTERr   ValueOSErrorrL   intr   get_lockr"   queuer   ready_queuefuture_queuerunningr~   Lockstart_stop_lock
run_thread_get_executor_initexecutor_fn)r   rB   r   r   r   s     `  r   r   zPyDatasetEnqueuer.__init__  s    % $&$3$9$9#q$A! '-(DH" #++- -,22!''1,'- !;;=!KK7(~~/#66w?D  HD1  & %&!	&- -s   D #/D)D&%D&)D2c                     | j                   S )zWhether the enqueuer is running.

        This method is thread safe and called from many threads.

        Returns: boolean indicating whether this enqueuer is running.
        )r   r   s    r   
is_runningzPyDatasetEnqueuer.is_running  s     ||r   c                 \   | j                   5  | j                  r
	 ddd       yd| _        t        j                  | j                        | _        d| j                   | j
                  _        d| j
                  _        | j
                  j                          ddd       y# 1 sw Y   yxY w)zStarts the handler's workers.

        This method is thread safe but is called from the main thread.
        It is safe to call this method multiple times, extra calls are ignored.
        NT)targetWorker_)
r   r   r~   Thread_runr   r   namedaemonrd   r   s    r   rd   zPyDatasetEnqueuer.start  s     !! 	$||	$ 	$  DL'..dii@DO%,TXXJ#7DOO %)DOO"OO!!#	$ 	$ 	$s   B"A5B""B+c                    | j                   5  | j                  s
	 ddd       yd| _        |ry	 	 | j                  j                  dd      }t	        |t
              r||j                         }| j                  j                          || j                  j                  |       xd| _        dt        | j                  <   ddd       y# t        j                  $ r Y nw xY w| j                  j                          W# 1 sw Y   yxY w)ao  Stops running threads and wait for them to exit, if necessary.

        This method is thread safe and is called from various threads. Note that
        the `drain_queue_and_join` argument must be set correctly.
        It is safe to call this method multiple times, extra calls are ignored.

        Args:
            drain_queue_and_join: set to True to drain the queue of pending
                items and wait for the worker thread to complete. Set to False
                if invoked from a worker thread to avoid deadlocks. Note that
                setting this to False means this enqueuer won't be reused.
        NFT皙?blocktimeout)r   r   r   re   rL   	Exception	task_doner   putr   Emptyr   joinr   r   )r   drain_queue_and_joinr"   inputss       r   ri   zPyDatasetEnqueuer.stop  s     !! 	/<<	/ 	/ !DL# 	 $ 1 1 5 5D# 5 N%eY7"'K!&))335!- ,,008  #DO*.dhh'-	/ 	/" !;; $$&'	/ 	/s4   C=
C=A6C	%C=	CC=CC==Dc                 >    | j                   t        | j                  <   y)z&Sends current Iterable to all workers.N)rB   r   r   r   s    r   _send_py_datasetz"PyDatasetEnqueuer._send_py_dataset   s     '+oo$((#r   c                 (    | j                  d       y )NFr   )ri   r   s    r   __del__zPyDatasetEnqueuer.__del__  s    		u	-r   c                     t         )z?Submits request to the executor and queue the `Future` objects.r(   r   s    r   r   zPyDatasetEnqueuer._run  s    !!r   c                     t         )Gets the Pool initializer for multiprocessing.

        Args:
            workers: Number of workers.

        Returns:
            Function, a Function to initialize the pool
        r(   )r   r   s     r   r   z$PyDatasetEnqueuer._get_executor_init  r,   r   c                     t         )6  Creates a generator to extract data from the queue.

        Skip the data if it is `None`.

        This method is called from the main thread.

        Yields:
            The next element in the queue, i.e. a tuple
            `(inputs, targets)` or
            `(inputs, targets, sample_weights)`.
        r(   r   s    r   re   zPyDatasetEnqueuer.get  s
     "!r   Nr7   )T)r8   r9   r:   r;   r   r   rd   ri   r   r   r   r   re   r2   r   r   r   r     sB    . !%HN$#/J6
."	""r   r   c                   >     e Zd ZdZ	 	 	 	 d fd	Zd Zd Zd Z xZS )rG   a  Builds a Enqueuer from a PyDataset.

    Args:
        py_dataset: A `keras.utils.PyDataset` object.
        use_multiprocessing: use multiprocessing if True, otherwise threading
        shuffle: whether to shuffle the data at the beginning of each epoch
    c                     t         |   ||||       || _        | j                  j                  t        j                         | _        y y r
   )superr   rA   rB   r0   rZ   r[   ra   )r   rB   r   r   r   rA   	__class__s         r   r   zOrderedEnqueuer.__init__/  sJ     	!4n	
 ??&&. %??,DL /r   c                     fd}|S )r   c                 x     t        d      t        | d t               f      }t        j	                  |       |S )NT)initializerinitargs)r   init_pool_generatorr   _DATA_POOLSadd)seqspoolr   s     r   pool_fnz3OrderedEnqueuer._get_executor_init.<locals>.pool_fnJ  s;    '>$'/&9&;<D
 OOD!Kr   r2   )r   r   r   s    ` r   r   z"OrderedEnqueuer._get_executor_init@  s    	 r   c                    	 | j                   j                  [t        | j                   j                        }| j                  r t	        |      }t        j                  |       t        |      | _        | j                          t        | j                  t                    5 }| j                         rd	 t        | j                        }| j                  j                  |j!                  t"        | j$                  |f      d       | j                         rdddd       y# t&        $ r Y w xY w# 1 sw Y   yxY w# t(        $ r%}| j                  j                  |       Y d}~yd}~ww xY w)zSubmits request to the executor and queue the `Future` objects.

        This method is the run method of worker threads.
        NTr   )rB   r0   r_   rA   rP   r`   iterra   r   r   r   r   r   nextr   r   apply_asyncr   r   StopIterationr   )r   ra   executorr\   es        r   r   zOrderedEnqueuer._runU  s)   
	%**6   ; ;<<<"7mGNN7+#G}!!#))*;<= 	oo' .))--$00TXXqMJ"& .  oo'	 	 ) 	 	  	%!!!$$	%s[   BD: !D.3ADD.D: 	D+(D.*D++D..D73D: 7D: :	E(E##E(c              #     K   | j                         r"	 | j                  j                  d      }| 2t        d      # t        j                  $ r Y nw xY w	 | j
                  j                  dd      }| j
                  j                          t        |t              r||j                         }|| n9# t        j                  $ r Y n$t        $ r}| j                  d       |d}~ww xY w| j                         rЭw)	r   Fr   T   r   Nr   z@Iterator called after `on_epoch_end` or before `on_epoch_begin`.)r   r   re   r   r   r   r   rL   r   ri   rR   )r   r   r"   r   s       r   re   zOrderedEnqueuer.getr  s      oo))--E-:* N
 	
) ;; ))--D!-D!!++-eY/K% L;;  		t	4% oosX   D A  D ADADAB: 9D:C0DC0C++C00D)r   Fr   F)	r8   r9   r:   r;   r   r   r   re   __classcell__)r   s   @r   rG   rG   &  s*     !-"*%:'
r   rG   c                     | a t        j                         }d|j                   |_        |,t        j
                  j                  ||j                  z          ||j                  |j                  dd       yy)a_  Initializer function for pool workers.

    Args:
        gens: State which should be made available to worker processes.
        random_seed: An optional value with which to seed child processes.
        id_queue: A multiprocessing Queue of worker ids.
            This is used to indicate that a worker process
            was created by Keras.
    Keras_worker_NTr   r   )	r   r   current_processr   rN   r`   seedidentr   )gensrandom_seedid_queueworker_procs       r   r   r     st     !113K '{'7'7&89K
		{[%6%667[&&dC@ r   )NN) rZ   multiprocessing.dummyr   r   r`   r~   r   weakref
contextlibr   numpyrN   keras.src.api_exportr    keras.src.trainers.data_adaptersr   -keras.src.trainers.data_adapters.data_adapterr   r   r?   r   r   WeakSetr   r   r   r   r   r   r   rG   r   r2   r   r   <module>r      s             - ? E &(>?@a a AaH^{ ^D    goo   %&\" \"~s
' s
lAr   