topobench.dataloader.dataloader module#

TBDataloader class.

class Any(*args, **kwargs)#

Bases: object

Special type indicating an unconstrained type.

  • Any is compatible with every type.

  • Any is assumed to have all methods.

  • All values are assumed to be instances of Any.

Note that all the above statements are true from the point of view of static type checkers. At runtime, Any should not be used with instance checks.
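As a brief illustration of the static-versus-runtime distinction above, the following sketch annotates a function with Any and then shows that an instance check against Any is rejected at runtime. The function name describe is hypothetical and used only for this example.

```python
from typing import Any


def describe(value: Any) -> str:
    # A static type checker accepts any argument here.
    return type(value).__name__


print(describe(3))       # int
print(describe("text"))  # str

# Any is meant for annotations only; an instance check against it raises TypeError.
try:
    isinstance(3, Any)
    rejected = False
except TypeError:
    rejected = True

print(rejected)  # True
```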

class DataLoader(dataset, batch_size=1, shuffle=None, sampler=None, batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None, generator=None, *, prefetch_factor=None, persistent_workers=False, pin_memory_device='')#

Bases: Generic[T_co]

Data loader combines a dataset and a sampler, and provides an iterable over the given dataset.

The DataLoader supports both map-style and iterable-style datasets with single- or multi-process loading, customizing loading order and optional automatic batching (collation) and memory pinning.

See torch.utils.data documentation page for more details.
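The combination of a dataset, a loading order and automatic batching described above can be sketched as follows. This is a minimal example on a small in-memory map-style dataset; the tensor contents, the seed and the variable names are arbitrary choices made for illustration.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# A small map-style dataset: 10 samples with 3 features each, plus integer labels.
features = torch.arange(30, dtype=torch.float32).reshape(10, 3)
labels = torch.arange(10)
dataset = TensorDataset(features, labels)

# A seeded generator makes the shuffling order repeatable.
generator = torch.Generator().manual_seed(0)

loader = DataLoader(
    dataset,
    batch_size=4,
    shuffle=True,      # reshuffle at every epoch
    drop_last=False,   # keep the final, smaller batch
    num_workers=0,     # load in the main process
    generator=generator,
)

batch_shapes = [tuple(x.shape) for x, _ in loader]
print(len(loader))    # 3
print(batch_shapes)   # [(4, 3), (4, 3), (2, 3)]
```

With drop_last=True the final batch of two samples would be discarded and len(loader) would be 2.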

Parameters:
  • dataset (Dataset) – dataset from which to load the data.

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).

  • sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.

  • batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.

  • num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)

  • collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.

  • pin_memory (bool, optional) – If True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the memory pinning section of the torch.utils.data documentation. (default: False)

  • drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)

  • timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)

  • worker_init_fn (Callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)

  • multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – If None, the default multiprocessing context of your operating system will be used. (default: None)

  • generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)

  • prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (default: None if num_workers is 0, otherwise 2)

  • persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers' Dataset instances alive. (default: False)

  • pin_memory_device (str, optional) – the device to pin_memory to if pin_memory is True.

Warning

If the spawn start method is used, worker_init_fn must be picklable, so it cannot be, e.g., a lambda function. See multiprocessing-best-practices for more details on multiprocessing in PyTorch.

Warning

The len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts user dataset code to handle multi-process loading correctly and avoid duplicate data.

However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, PyTorch cannot detect such cases in general.

See the Dataset Types section of the torch.utils.data documentation for more details on these two types of datasets, and the Multi-process data loading section for how IterableDataset interacts with worker processes.
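The two failure cases above can be made concrete with a little arithmetic. The sketch below is not PyTorch's implementation; it assumes a dataset whose samples are sharded evenly across workers, with each worker batching its own shard independently, and the helper names are hypothetical.

```python
import math


def estimated_len(num_samples: int, batch_size: int, drop_last: bool) -> int:
    # The estimate described above: len(dataset) / batch_size, rounded by drop_last.
    if drop_last:
        return num_samples // batch_size
    return math.ceil(num_samples / batch_size)


def actual_batches(shard_sizes: list[int], batch_size: int, drop_last: bool) -> int:
    # Each worker batches only the samples in its own shard.
    return sum(estimated_len(n, batch_size, drop_last) for n in shard_sizes)


# Case (1): 10 samples, 2 workers with 5 samples each, batch size 4, drop_last=False.
# Each worker yields one full batch and one partial batch.
print(estimated_len(10, 4, False), actual_batches([5, 5], 4, False))  # 3 4

# Case (2): 14 samples, 2 workers with 7 samples each, batch size 4, drop_last=True.
# Each worker drops 3 samples, 6 in total, which is more than one batch.
print(estimated_len(14, 4, True), actual_batches([7, 7], 4, True))  # 3 2
```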

Warning

See the reproducibility, dataloader-workers-random-seed, and data-loading-randomness notes for questions related to random seeds.

__init__(dataset, batch_size=1, shuffle=None, sampler=None, batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None, generator=None, *, prefetch_factor=None, persistent_workers=False, pin_memory_device='')#
check_worker_number_rationality()#
batch_size: int | None#
dataset: Dataset[T_co]#
drop_last: bool#
property multiprocessing_context#

num_workers: int#
pin_memory: bool#
pin_memory_device: str#
prefetch_factor: int | None#
sampler: Sampler | Iterable#
timeout: float#
class DataloadDataset(data_lst)#

Bases: Dataset

Custom dataset to return all the values added to the dataset object.

Parameters:
data_lst : list[torch_geometric.data.Data]

List of torch_geometric.data.Data objects.

__init__(data_lst)#
get(idx)#

Get data object from data list.

Parameters:
idx : int

Index of the data object to get.

Returns:
tuple

Tuple containing a list of all the values for the data and the corresponding keys.

len()#

Return the length of the dataset.

Returns:
int

Length of the dataset.
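The shape of the value returned by get can be sketched with a stand-in class. This is an assumption-laden illustration, not the topobench implementation: plain dictionaries replace torch_geometric.data.Data objects, and the class name ListDatasetSketch is hypothetical.

```python
class ListDatasetSketch:
    """Hypothetical stand-in for DataloadDataset; dicts replace Data objects."""

    def __init__(self, data_lst):
        self.data_lst = data_lst

    def len(self):
        # Number of stored data objects.
        return len(self.data_lst)

    def get(self, idx):
        # Return the values of one data object together with their keys.
        data = self.data_lst[idx]
        keys = list(data.keys())
        return [data[key] for key in keys], keys


dataset = ListDatasetSketch([{"x": [1.0, 2.0], "y": 0}, {"x": [3.0], "y": 1}])
values, keys = dataset.get(1)
print(dataset.len())  # 2
print(keys)           # ['x', 'y']
print(values)         # [[3.0], 1]
```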

class LightningDataModule#

Bases: DataHooks, HyperparametersMixin

A DataModule standardizes the training, val, test splits, data preparation and transforms. The main advantage is consistent data splits, data preparation and transforms across models.

Example:

import lightning as L
import torch
import torch.utils.data as data
from lightning.pytorch.demos.boring_classes import RandomDataset

class MyDataModule(L.LightningDataModule):
    def prepare_data(self):
        # download, IO, etc. Useful with shared filesystems
        # only called on 1 GPU/TPU in distributed
        ...

    def setup(self, stage):
        # make assignments here (val/train/test split)
        # called on every process in DDP
        dataset = RandomDataset(1, 100)
        self.train, self.val, self.test = data.random_split(
            dataset, [80, 10, 10], generator=torch.Generator().manual_seed(42)
        )

    def train_dataloader(self):
        return data.DataLoader(self.train)

    def val_dataloader(self):
        return data.DataLoader(self.val)

    def test_dataloader(self):
        return data.DataLoader(self.test)

    def on_exception(self, exception):
        # clean up state after the trainer faced an exception
        ...

    def teardown(self, stage):
        # clean up state after the trainer stops, delete files...
        # called on every process in DDP
        ...
__init__()#
prepare_data_per_node#

If True, each LOCAL_RANK=0 will call prepare_data. Otherwise only NODE_RANK=0, LOCAL_RANK=0 will prepare data.

allow_zero_length_dataloader_with_multiple_devices#

If True, dataloader with zero length within local rank is allowed. Default value is False.

classmethod from_datasets(train_dataset=None, val_dataset=None, test_dataset=None, predict_dataset=None, batch_size=1, num_workers=0, **datamodule_kwargs)#

Create an instance from torch.utils.data.Dataset.

Parameters:
  • train_dataset (Dataset | Iterable[Dataset] | None) – Optional dataset or iterable of datasets to be used for train_dataloader()

  • val_dataset (Dataset | Iterable[Dataset] | None) – Optional dataset or iterable of datasets to be used for val_dataloader()

  • test_dataset (Dataset | Iterable[Dataset] | None) – Optional dataset or iterable of datasets to be used for test_dataloader()

  • predict_dataset (Dataset | Iterable[Dataset] | None) – Optional dataset or iterable of datasets to be used for predict_dataloader()

  • batch_size (int) – Batch size to use for each dataloader. Default is 1. This parameter gets forwarded to the __init__ if the datamodule has such a name defined in its signature.

  • num_workers (int) – Number of subprocesses to use for data loading. 0 means that the data will be loaded in the main process. Default is 0. This parameter gets forwarded to the __init__ if the datamodule has such a name defined in its signature.

  • **datamodule_kwargs (Any) – Additional parameters that get passed down to the datamodule’s __init__.

load_from_checkpoint(checkpoint_path, map_location=None, hparams_file=None, **kwargs)#

Primary way of loading a datamodule from a checkpoint. When Lightning saves a checkpoint it stores the arguments passed to __init__ in the checkpoint under "datamodule_hyper_parameters".

Any arguments specified through **kwargs will override args stored in "datamodule_hyper_parameters".

Parameters:
  • checkpoint_path (str | Path | IO) – Path to checkpoint. This can also be a URL or a file-like object.

  • map_location (device | str | int | Callable[[UntypedStorage, str], UntypedStorage | None] | Dict[device | str | int, device | str | int] | None) – If your checkpoint saved a GPU model and you now load on CPUs or a different number of GPUs, use this to map to the new setup. The behaviour is the same as in torch.load().

  • hparams_file (str | Path | None) –

    Optional path to a .yaml or .csv file with hierarchical structure as in this example:

    dataloader:
        batch_size: 32
    

    You most likely won’t need this since Lightning will always save the hyperparameters to the checkpoint. However, if your checkpoint weights don’t have the hyperparameters saved, use this method to pass in a .yaml file with the hparams you’d like to use. These will be converted into a dict and passed into your LightningDataModule for use.

    If your datamodule’s hparams argument is Namespace and .yaml file has hierarchical structure, you need to refactor your datamodule to treat hparams as dict.

  • **kwargs (Any) – Any extra keyword args needed to init the datamodule. Can also be used to override saved hyperparameter values.

Returns:

LightningDataModule instance with loaded weights and hyperparameters (if available).

Return type:

Self

Note

load_from_checkpoint is a class method. You must use your LightningDataModule class to call it instead of the LightningDataModule instance, or a TypeError will be raised.

Example:

# load weights without mapping ...
datamodule = MyLightningDataModule.load_from_checkpoint('path/to/checkpoint.ckpt')

# or load weights and hyperparameters from separate files.
datamodule = MyLightningDataModule.load_from_checkpoint(
    'path/to/checkpoint.ckpt',
    hparams_file='/path/to/hparams_file.yaml'
)

# override some of the params with new values
datamodule = MyLightningDataModule.load_from_checkpoint(
    PATH,
    batch_size=32,
    num_workers=10,
)
load_state_dict(state_dict)#

Called when loading a checkpoint, implement to reload datamodule state given datamodule state_dict.

Parameters:

state_dict (Dict[str, Any]) – the datamodule state returned by state_dict.

on_exception(exception)#

Called when the trainer execution is interrupted by an exception.

state_dict()#

Called when saving a checkpoint, implement to generate and save datamodule state.

Returns:

A dictionary containing datamodule state.

Return type:

Dict[str, Any]

CHECKPOINT_HYPER_PARAMS_KEY = 'datamodule_hyper_parameters'#
CHECKPOINT_HYPER_PARAMS_NAME = 'datamodule_hparams_name'#
CHECKPOINT_HYPER_PARAMS_TYPE = 'datamodule_hparams_type'#
name: str | None = None#
class TBDataloader(dataset_train, dataset_val=None, dataset_test=None, batch_size=1, num_workers=0, pin_memory=False, **kwargs)#

Bases: LightningDataModule

This class takes care of returning the dataloaders for the training, validation, and test datasets.

It also handles the collate function. The class is designed to work with the torch dataloaders.

Parameters:
dataset_train : DataloadDataset

The training dataset.

dataset_val : DataloadDataset, optional

The validation dataset (default: None).

dataset_test : DataloadDataset, optional

The test dataset (default: None).

batch_size : int, optional

The batch size for the dataloader (default: 1).

num_workers : int, optional

The number of worker processes to use for data loading (default: 0).

pin_memory : bool, optional

If True, the data loader will copy tensors into pinned memory before returning them (default: False).

**kwargs : optional

Additional arguments.

References

Read the docs:

https://lightning.ai/docs/pytorch/latest/data/datamodule.html

__init__(dataset_train, dataset_val=None, dataset_test=None, batch_size=1, num_workers=0, pin_memory=False, **kwargs)#
prepare_data_per_node#

If True, each LOCAL_RANK=0 will call prepare_data. Otherwise only NODE_RANK=0, LOCAL_RANK=0 will prepare data.

allow_zero_length_dataloader_with_multiple_devices#

If True, dataloader with zero length within local rank is allowed. Default value is False.

state_dict()#

Called when saving a checkpoint. Implement to generate and save the datamodule state.

Returns:
dict

A dictionary containing the datamodule state that you want to save.

teardown(stage=None)#

Lightning hook for cleaning up after trainer.fit(), trainer.validate(), trainer.test(), and trainer.predict().

Parameters:
stage : str, optional

The stage being torn down. Either “fit”, “validate”, “test”, or “predict” (default: None).

test_dataloader()#

Create and return the test dataloader.

Returns:
torch.utils.data.DataLoader

The test dataloader.

train_dataloader()#

Create and return the train dataloader.

Returns:
torch.utils.data.DataLoader

The train dataloader.

val_dataloader()#

Create and return the validation dataloader.

Returns:
torch.utils.data.DataLoader

The validation dataloader.

collate_fn(batch)#

Override the torch_geometric.data.DataLoader collate function to use the DomainData class.

This ensures that the torch_geometric dataloaders work with sparse matrices that are not necessarily named adj. The function also generates the batch slices for the different cell dimensions.

Parameters:
batch : list

List of data objects (e.g., torch_geometric.data.Data).

Returns:
torch_geometric.data.Batch

A torch_geometric.data.Batch object.
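One plausible reading of the batch slices mentioned above is a per-rank assignment vector that maps every cell to the sample it came from. The sketch below illustrates that idea only and is an assumption, not the collate_fn implementation; the helper name batch_vector is hypothetical.

```python
import torch


def batch_vector(cell_counts):
    # cell_counts[i] is the number of cells of one rank in sample i.
    counts = torch.tensor(cell_counts)
    sample_ids = torch.arange(len(cell_counts))
    # Repeat each sample index once per cell it contributes.
    return torch.repeat_interleave(sample_ids, counts)


# Three samples with 2, 3 and 1 nodes (rank 0) and 1, 2 and 0 edges (rank 1).
print(batch_vector([2, 3, 1]).tolist())  # [0, 0, 1, 1, 1, 2]
print(batch_vector([1, 2, 0]).tolist())  # [0, 1, 1]
```

Computing one such vector per cell dimension lets downstream pooling operate on nodes, edges and higher-order cells separately.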