Source code for neuroconv.tools.nwb_helpers._configuration_models._base_backend

"""Base Pydantic models for DatasetInfo and DatasetConfiguration."""

from typing import Any, ClassVar, Literal

from hdmf.container import DataIO
from pydantic import BaseModel, ConfigDict, Field
from pynwb import NWBFile
from typing_extensions import Self

from ._base_dataset_io import DatasetIOConfiguration
from ._pydantic_pure_json_schema_generator import PureJSONSchemaGenerator
from .._dataset_configuration import (
    get_default_dataset_io_configurations,
    get_existing_dataset_io_configurations,
)


class BackendConfiguration(BaseModel):
    """A model for matching collections of DatasetConfigurations to a specific backend."""

    backend: ClassVar[Literal["hdf5", "zarr"]]
    pretty_backend_name: ClassVar[Literal["HDF5", "Zarr"]]
    data_io_class: ClassVar[type[DataIO]]

    model_config = ConfigDict(validate_assignment=True)  # Re-validate model on mutation
    dataset_configurations: dict[str, DatasetIOConfiguration] = Field(
        description=(
            "A mapping from object locations (e.g. `acquisition/TestElectricalSeriesAP/data`) "
            "to their DatasetConfiguration specification that contains all information "
            "for writing the datasets to disk using the specific backend."
        )
    )

    def __str__(self) -> str:
        """Not overriding __repr__ as this is intended to render only when wrapped in print()."""
        string = (
            f"\n{self.pretty_backend_name} dataset configurations"
            f"\n{'-' * (len(self.pretty_backend_name) + 23)}"
        )
        for dataset_configuration in self.dataset_configurations.values():
            string += f"\n{dataset_configuration}"

        return string

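    # Usage sketch (illustrative, not part of the original source): a concrete subclass such as
    # HDF5BackendConfiguration keys each DatasetIOConfiguration by its location in the file, and
    # printing the model renders every entry via __str__ above. Assuming an in-memory `nwbfile`:
    #
    #     backend_configuration = HDF5BackendConfiguration.from_nwbfile_with_defaults(nwbfile=nwbfile)
    #     dataset_configuration = backend_configuration.dataset_configurations[
    #         "acquisition/TestElectricalSeriesAP/data"
    #     ]
    #     print(backend_configuration)
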
    # Pydantic models have several API calls for retrieving the schema - override all of them to work
    @classmethod
    def schema(cls, **kwargs) -> dict[str, Any]:
        return cls.model_json_schema(**kwargs)

    @classmethod
    def schema_json(cls, **kwargs) -> dict[str, Any]:
        return cls.model_json_schema(**kwargs)

    @classmethod
    def model_json_schema(cls, **kwargs) -> dict[str, Any]:
        assert "mode" not in kwargs, "The 'mode' of this method is fixed to be 'validation' and cannot be changed."
        assert "schema_generator" not in kwargs, "The 'schema_generator' of this method cannot be changed."
        return super().model_json_schema(mode="validation", schema_generator=PureJSONSchemaGenerator, **kwargs)

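    # Schema sketch (illustrative, not part of the original source): all three accessors resolve to
    # the same pure-JSON schema, so the result can be serialized directly. For example, with the
    # HDF5BackendConfiguration subclass:
    #
    #     import json
    #
    #     schema = HDF5BackendConfiguration.model_json_schema()
    #     assert schema == HDF5BackendConfiguration.schema()
    #     print(json.dumps(schema, indent=2))
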
    @classmethod
    def from_nwbfile(cls, nwbfile: NWBFile) -> Self:
        """
        Create a backend configuration from an NWBFile with default chunking and compression settings.

        .. deprecated:: 0.8.4
            The `from_nwbfile` method is deprecated and will be removed on or after June 2026.
            Use `from_nwbfile_with_defaults` or `from_nwbfile_with_existing` instead.
        """
        import warnings

        warnings.warn(
            "The 'from_nwbfile' method is deprecated and will be removed on or after June 2026. "
            "Use 'from_nwbfile_with_defaults' or 'from_nwbfile_with_existing' instead.",
            FutureWarning,
            stacklevel=2,
        )

        default_dataset_configurations = get_default_dataset_io_configurations(nwbfile=nwbfile, backend=cls.backend)
        dataset_configurations = {
            default_dataset_configuration.location_in_file: default_dataset_configuration
            for default_dataset_configuration in default_dataset_configurations
        }

        return cls(dataset_configurations=dataset_configurations)

    @classmethod
    def from_nwbfile_with_defaults(cls, nwbfile: NWBFile) -> Self:
        """
        Create a backend configuration from an NWBFile with default chunking and compression settings.

        Parameters
        ----------
        nwbfile : pynwb.NWBFile
            The NWBFile object to extract the backend configuration from.

        Returns
        -------
        Self
            The backend configuration with default chunking and compression settings for each neurodata
            object in the NWBFile.
        """
        dataset_io_configurations = get_default_dataset_io_configurations(nwbfile=nwbfile, backend=cls.backend)
        dataset_configurations = {
            default_dataset_configuration.location_in_file: default_dataset_configuration
            for default_dataset_configuration in dataset_io_configurations
        }

        return cls(dataset_configurations=dataset_configurations)

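    # Usage sketch (illustrative, not part of the original source): the typical workflow is to build
    # the default configuration, optionally tweak individual datasets, then pass it to
    # neuroconv.tools.nwb_helpers.configure_backend before writing. Assuming an in-memory `nwbfile`:
    #
    #     backend_configuration = HDF5BackendConfiguration.from_nwbfile_with_defaults(nwbfile=nwbfile)
    #     backend_configuration.dataset_configurations[
    #         "acquisition/TestElectricalSeriesAP/data"
    #     ].compression_method = "gzip"
    #     configure_backend(nwbfile=nwbfile, backend_configuration=backend_configuration)
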
    @classmethod
    def from_nwbfile_with_existing(cls, nwbfile: NWBFile) -> Self:
        """
        Create a backend configuration from an NWBFile using existing dataset settings.

        This method extracts existing chunking and compression settings from an NWBFile that has already
        been written to disk.

        Parameters
        ----------
        nwbfile : pynwb.NWBFile
            The NWBFile object to extract the backend configuration from.

        Returns
        -------
        Self
            The backend configuration with existing chunking and compression settings for each neurodata
            object in the NWBFile.
        """
        dataset_io_configurations = get_existing_dataset_io_configurations(nwbfile=nwbfile)
        dataset_configurations = {
            default_dataset_configuration.location_in_file: default_dataset_configuration
            for default_dataset_configuration in dataset_io_configurations
        }

        return cls(dataset_configurations=dataset_configurations)

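    # Usage sketch (illustrative, not part of the original source; the file path is hypothetical):
    # for a file already on disk, read it back and capture the chunking/compression that was
    # actually written:
    #
    #     from pynwb import NWBHDF5IO
    #
    #     with NWBHDF5IO("existing_file.nwb", mode="r") as io:
    #         existing_nwbfile = io.read()
    #         backend_configuration = HDF5BackendConfiguration.from_nwbfile_with_existing(
    #             nwbfile=existing_nwbfile
    #         )
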
    def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> dict[str, DatasetIOConfiguration]:
        """
        Find locations of objects with mismatched IDs in the file.

        This method identifies neurodata objects in the `nwbfile` that have matching locations with the
        current configuration but different object IDs. It returns a dictionary of remapped
        `DatasetIOConfiguration` objects for these mismatched locations.

        Parameters
        ----------
        nwbfile : pynwb.NWBFile
            The NWBFile object to check for mismatched object IDs.

        Returns
        -------
        dict[str, DatasetIOConfiguration]
            A dictionary where:

            * Keys: Locations in the NWBFile of objects with mismatched IDs.
            * Values: New `DatasetIOConfiguration` objects corresponding to the updated object IDs.

        Notes
        -----
        * This method only checks for objects with the same location but different IDs.
        * It does not identify objects missing from the current configuration.
        * The returned `DatasetIOConfiguration` objects are copies of the original configurations with
          updated `object_id` fields.
        """
        # Use a fresh default configuration to get mapping of object IDs to locations in file
        default_configurations = list(get_default_dataset_io_configurations(nwbfile=nwbfile, backend=self.backend))

        if len(default_configurations) != len(self.dataset_configurations):
            raise ValueError(
                f"The number of default configurations ({len(default_configurations)}) does not match the number of "
                f"specified configurations ({len(self.dataset_configurations)})!"
            )

        objects_requiring_remapping = {}
        for dataset_configuration in default_configurations:
            location_in_file = dataset_configuration.location_in_file
            object_id = dataset_configuration.object_id

            location_cannot_be_remapped = location_in_file not in self.dataset_configurations
            if location_cannot_be_remapped:
                raise KeyError(
                    f"Unable to remap the object IDs for object at location '{location_in_file}'! This "
                    "usually occurs if you are attempting to configure the backend for two files of "
                    "non-equivalent structure."
                )

            former_configuration = self.dataset_configurations[location_in_file]
            former_object_id = former_configuration.object_id

            if former_object_id == object_id:
                continue

            remapped_configuration = former_configuration.model_copy(update={"object_id": object_id})
            objects_requiring_remapping[location_in_file] = remapped_configuration

        return objects_requiring_remapping

    def build_remapped_backend(
        self,
        locations_to_remap: dict[str, DatasetIOConfiguration],
    ) -> Self:
        """
        Build a remapped backend configuration by updating mismatched object IDs.

        This method takes a dictionary of new `DatasetIOConfiguration` objects (as returned by
        `find_locations_requiring_remapping`) and updates a copy of the current configuration with these
        new configurations.

        Parameters
        ----------
        locations_to_remap : dict
            A dictionary mapping locations in the NWBFile to their corresponding new `DatasetIOConfiguration`
            objects with updated IDs.

        Returns
        -------
        Self
            A new instance of the backend configuration class with updated object IDs for the specified
            locations.
        """
        new_backend_configuration = self.model_copy(deep=True)
        new_backend_configuration.dataset_configurations.update(locations_to_remap)
        return new_backend_configuration

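    # Usage sketch (illustrative, not part of the original source): when applying a configuration that
    # was built from one NWBFile to a second, structurally equivalent NWBFile, the object IDs differ
    # and can be remapped before configuring the backend:
    #
    #     locations_to_remap = backend_configuration.find_locations_requiring_remapping(nwbfile=second_nwbfile)
    #     remapped_configuration = backend_configuration.build_remapped_backend(locations_to_remap=locations_to_remap)
    #     configure_backend(nwbfile=second_nwbfile, backend_configuration=remapped_configuration)
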
    def apply_global_compression(
        self,
        compression_method: str,
        compression_options: dict[str, Any] | None = None,
    ) -> None:
        """
        Apply compression settings to all datasets in this backend configuration.

        This method modifies the backend configuration in-place, applying the specified compression method
        and options to ALL datasets, regardless of their current compression settings.

        Parameters
        ----------
        compression_method : str
            The compression method to apply to all datasets (e.g., "gzip", "Blosc", "Zstd").
        compression_options : dict, optional
            Additional compression options to apply. The available options depend on the compression
            method chosen.

        Raises
        ------
        ValueError
            If the compression method is not available for this backend type.

        Examples
        --------
        >>> backend_config = get_default_backend_configuration(nwbfile, backend="hdf5")
        >>> backend_config.apply_global_compression("Blosc", {"cname": "zstd", "clevel": 5})
        """
        # Import here to avoid circular imports
        from ._hdf5_dataset_io import AVAILABLE_HDF5_COMPRESSION_METHODS
        from ._zarr_dataset_io import AVAILABLE_ZARR_COMPRESSION_METHODS

        # Validate compression method for the backend
        if self.backend == "hdf5":
            available_methods = AVAILABLE_HDF5_COMPRESSION_METHODS
        elif self.backend == "zarr":
            available_methods = AVAILABLE_ZARR_COMPRESSION_METHODS
        else:
            raise ValueError(f"Unknown backend: {self.backend}")

        if compression_method not in available_methods:
            raise ValueError(
                f"Compression method '{compression_method}' is not available for backend "
                f"'{self.backend}'. Available methods: {list(available_methods.keys())}"
            )

        # Apply global compression to ALL datasets
        for dataset_configuration in self.dataset_configurations.values():
            dataset_configuration.compression_method = compression_method
            dataset_configuration.compression_options = compression_options
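
    # Usage sketch (illustrative, not part of the original source): the method mutates the configuration
    # in place; valid option keys depend on the chosen method and backend, so the simplest call omits them:
    #
    #     backend_configuration = HDF5BackendConfiguration.from_nwbfile_with_defaults(nwbfile=nwbfile)
    #     backend_configuration.apply_global_compression(compression_method="gzip")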