grain.experimental.InterleaveIterDataset

class grain.experimental.InterleaveIterDataset(datasets, *, cycle_length, num_make_iter_threads=1, make_iter_buffer_size=1, iter_buffer_size=1)

Interleaves the given sequence of datasets.

The sequence of datasets can itself be a MapDataset of datasets, as in the example below.

Concurrently processes at most cycle_length iterators and interleaves their elements. If cycle_length is larger than the number of datasets, then the behavior is similar to mixing the datasets with equal proportions. If cycle_length is 1, the datasets are chained.
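
For illustration, a minimal sketch of the two extremes of cycle_length on toy in-memory datasets (MapDataset.range and to_iter_dataset are used here only to fabricate small IterDatasets):

import grain

# Three small IterDatasets: [0, 1, 2], [10, 11, 12], [20, 21, 22].
datasets = [
    grain.MapDataset.range(i * 10, i * 10 + 3).to_iter_dataset()
    for i in range(3)
]

# cycle_length=1 chains the datasets one after another:
# [0, 1, 2, 10, 11, 12, 20, 21, 22]
chained = grain.experimental.InterleaveIterDataset(datasets, cycle_length=1)
print(list(chained))

# cycle_length=3 (>= number of datasets) mixes all three with roughly
# equal proportions, e.g. starting 0, 10, 20, 1, 11, 21, ...
mixed = grain.experimental.InterleaveIterDataset(datasets, cycle_length=3)
print(list(mixed))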

Can be used with mp_prefetch to parallelize reading from sources that do not support random access and are implemented as IterDataset:

def make_source(filename: str) -> grain.IterDataset:
  ...

ds = grain.MapDataset.source(filenames).shuffle(seed=42).map(make_source)
ds = grain.experimental.InterleaveIterDataset(ds, cycle_length=4)
ds = ...  # other transformations
# Prefetch the interleaved elements in 2 worker processes.
ds = ds.mp_prefetch(grain.MultiprocessingOptions(num_workers=2))

for element in ds:
  ...
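
Interleaving first and then calling mp_prefetch moves both iterator creation and reading into the worker processes, so the sequential reads from each non-random-access source happen off the main process.
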
Parameters:
  • datasets (Sequence[IterDataset[T] | MapDataset[T]])

  • cycle_length (int)

  • num_make_iter_threads (int)

  • make_iter_buffer_size (int)

  • iter_buffer_size (int)

__init__(datasets, *, cycle_length, num_make_iter_threads=1, make_iter_buffer_size=1, iter_buffer_size=1)

Initializes the InterleaveIterDataset.

Parameters:
  • datasets (Sequence[IterDataset[T] | MapDataset[T]]) – A sequence of IterDataset or MapDataset objects, or a MapDataset of datasets to be interleaved.

  • cycle_length (int) – The maximum number of input datasets from which elements will be processed concurrently. If cycle_length is greater than the total number of datasets, all available datasets will be interleaved. If cycle_length is 1, the datasets will be processed sequentially.

  • num_make_iter_threads (int) – Optional. The number of threads used to asynchronously create new iterators and start prefetching elements (for each iterator) from the underlying datasets. Defaults to 1, meaning a single background thread creates iterators asynchronously.

  • make_iter_buffer_size (int) – Optional. The number of iterators each preparation thread creates and keeps ready in advance. This reduces latency by ensuring an iterator is available when needed. Defaults to 1, meaning the next iterator is always kept ready in advance.

  • iter_buffer_size (int) – Optional. The number of elements to prefetch from each iterator. Default value is 1.
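
As a rough illustration of how these options combine, continuing the example above (the values are illustrative, not tuned recommendations):

# Hypothetical settings for sources with expensive iterator creation,
# such as opening remote files.
ds = grain.experimental.InterleaveIterDataset(
    ds,  # the MapDataset of source datasets from the example above
    cycle_length=4,            # read from at most 4 datasets at a time
    num_make_iter_threads=2,   # 2 background threads create iterators
    make_iter_buffer_size=2,   # each thread keeps 2 iterators ready
    iter_buffer_size=8,        # prefetch up to 8 elements per iterator
)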

Methods

  • __init__(datasets, *, cycle_length[, ...]) – Initializes the InterleaveIterDataset.

  • apply(transformations) – Returns a dataset with the given transformation(s) applied.

  • batch(batch_size, *[, drop_remainder, batch_fn]) – Returns a dataset of elements batched along a new first dimension.

  • filter(transform) – Returns a dataset containing only the elements that match the filter.

  • map(transform) – Returns a dataset containing the elements transformed by transform.

  • map_with_index(transform) – Returns a dataset of the elements transformed by the transform.

  • mp_prefetch([options, worker_init_fn]) – Returns a dataset prefetching elements in multiple processes.

  • pipe(func, /, *args, **kwargs) – Syntactic sugar for applying a callable to this dataset.

  • prefetch(multiprocessing_options) – Deprecated, use mp_prefetch instead.

  • random_map(transform, *[, seed]) – Returns a dataset containing the elements transformed by transform.

  • seed(seed) – Returns a dataset that uses the seed for default seed generation.

  • set_slice(sl)

Attributes

  • parents