"""Base Pydantic models for DatasetInfo and DatasetConfiguration."""
from typing import Any, ClassVar, Literal
from hdmf.container import DataIO
from pydantic import BaseModel, ConfigDict, Field
from pynwb import NWBFile
from typing_extensions import Self
from ._base_dataset_io import DatasetIOConfiguration
from ._pydantic_pure_json_schema_generator import PureJSONSchemaGenerator
from .._dataset_configuration import (
get_default_dataset_io_configurations,
get_existing_dataset_io_configurations,
)
class BackendConfiguration(BaseModel):
"""A model for matching collections of DatasetConfigurations to a specific backend."""
backend: ClassVar[Literal["hdf5", "zarr"]]
pretty_backend_name: ClassVar[Literal["HDF5", "Zarr"]]
data_io_class: ClassVar[type[DataIO]]
model_config = ConfigDict(validate_assignment=True) # Re-validate model on mutation
dataset_configurations: dict[str, DatasetIOConfiguration] = Field(
description=(
"A mapping from object locations (e.g. `acquisition/TestElectricalSeriesAP/data`) "
"to their DatasetConfiguration specification that contains all information "
"for writing the datasets to disk using the specific backend."
)
)
def __str__(self) -> str:
"""Not overriding __repr__ as this is intended to render only when wrapped in print()."""
string = (
f"\n{self.pretty_backend_name} dataset configurations" f"\n{'-' * (len(self.pretty_backend_name) + 23)}"
)
for dataset_configuration in self.dataset_configurations.values():
string += f"\n{dataset_configuration}"
return string
# Pydantic models have several API calls for retrieving the schema - override all of them to work
@classmethod
def schema(cls, **kwargs) -> dict[str, Any]:
return cls.model_json_schema(**kwargs)
@classmethod
def schema_json(cls, **kwargs) -> dict[str, Any]:
return cls.model_json_schema(**kwargs)
@classmethod
def model_json_schema(cls, **kwargs) -> dict[str, Any]:
assert "mode" not in kwargs, "The 'mode' of this method is fixed to be 'validation' and cannot be changed."
assert "schema_generator" not in kwargs, "The 'schema_generator' of this method cannot be changed."
return super().model_json_schema(mode="validation", schema_generator=PureJSONSchemaGenerator, **kwargs)
@classmethod
def from_nwbfile(cls, nwbfile: NWBFile) -> Self:
"""
Create a backend configuration from an NWBFile with default chunking and compression settings.
.. deprecated:: 0.8.4
The `from_nwbfile` method is deprecated and will be removed on or after June 2026.
Use `from_nwbfile_with_defaults` or `from_nwbfile_with_existing` instead.
"""
import warnings
warnings.warn(
"The 'from_nwbfile' method is deprecated and will be removed on or after June 2026. "
"Use 'from_nwbfile_with_defaults' or 'from_nwbfile_with_existing' instead.",
FutureWarning,
stacklevel=2,
)
default_dataset_configurations = get_default_dataset_io_configurations(nwbfile=nwbfile, backend=cls.backend)
dataset_configurations = {
default_dataset_configuration.location_in_file: default_dataset_configuration
for default_dataset_configuration in default_dataset_configurations
}
return cls(dataset_configurations=dataset_configurations)
@classmethod
def from_nwbfile_with_defaults(cls, nwbfile: NWBFile) -> Self:
"""
Create a backend configuration from an NWBFile with default chunking and compression settings.
Parameters
----------
nwbfile : pynwb.NWBFile
The NWBFile object to extract the backend configuration from.
Returns
-------
Self
The backend configuration with default chunking and compression settings for each neurodata object in the NWBFile.
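
        Examples
        --------
        Illustrative sketch only; assumes an in-memory `nwbfile` and a concrete subclass such as
        `HDF5BackendConfiguration` (the variable names here are hypothetical, not part of this module).

        >>> backend_configuration = HDF5BackendConfiguration.from_nwbfile_with_defaults(nwbfile=nwbfile)
        >>> print(backend_configuration)  # renders the default chunking/compression settings per dataset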
"""
dataset_io_configurations = get_default_dataset_io_configurations(nwbfile=nwbfile, backend=cls.backend)
dataset_configurations = {
default_dataset_configuration.location_in_file: default_dataset_configuration
for default_dataset_configuration in dataset_io_configurations
}
return cls(dataset_configurations=dataset_configurations)
@classmethod
def from_nwbfile_with_existing(cls, nwbfile: NWBFile) -> Self:
"""
Create a backend configuration from an NWBFile using existing dataset settings.
This method extracts existing chunking and compression settings from an NWBFile that has already been written to disk.
Parameters
----------
nwbfile : pynwb.NWBFile
The NWBFile object to extract the backend configuration from.
Returns
-------
Self
The backend configuration with existing chunking and compression settings for each neurodata object in the NWBFile.
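
        Examples
        --------
        Illustrative sketch only; assumes an HDF5-backed NWB file at a hypothetical path and a concrete
        subclass such as `HDF5BackendConfiguration`.

        >>> from pynwb import NWBHDF5IO
        >>> with NWBHDF5IO(path="existing_file.nwb", mode="r") as io:
        ...     nwbfile = io.read()
        ...     backend_configuration = HDF5BackendConfiguration.from_nwbfile_with_existing(nwbfile=nwbfile)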
"""
dataset_io_configurations = get_existing_dataset_io_configurations(nwbfile=nwbfile)
dataset_configurations = {
            dataset_io_configuration.location_in_file: dataset_io_configuration
            for dataset_io_configuration in dataset_io_configurations
}
return cls(dataset_configurations=dataset_configurations)
def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> dict[str, DatasetIOConfiguration]:
"""
Find locations of objects with mismatched IDs in the file.
This function identifies neurodata objects in the `nwbfile` that have matching locations
with the current configuration but different object IDs. It returns a dictionary of
remapped `DatasetIOConfiguration` objects for these mismatched locations.
Parameters
----------
nwbfile : pynwb.NWBFile
The NWBFile object to check for mismatched object IDs.
Returns
-------
dict[str, DatasetIOConfiguration]
A dictionary where:
* Keys: Locations in the NWB of objects with mismatched IDs.
* Values: New `DatasetIOConfiguration` objects corresponding to the updated object IDs.
Notes
-----
* This function only checks for objects with the same location but different IDs.
* It does not identify objects missing from the current configuration.
* The returned `DatasetIOConfiguration` objects are copies of the original configurations
with updated `object_id` fields.
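
        Examples
        --------
        Illustrative sketch only; assumes a previously built `backend_configuration` and a second,
        structurally equivalent in-memory NWBFile (`nwbfile_to_write` is a hypothetical name).

        >>> locations_to_remap = backend_configuration.find_locations_requiring_remapping(nwbfile=nwbfile_to_write)
        >>> list(locations_to_remap)  # locations whose object IDs differ from the stored configuration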
"""
# Use a fresh default configuration to get mapping of object IDs to locations in file
default_configurations = list(get_default_dataset_io_configurations(nwbfile=nwbfile, backend=self.backend))
if len(default_configurations) != len(self.dataset_configurations):
raise ValueError(
f"The number of default configurations ({len(default_configurations)}) does not match the number of "
f"specified configurations ({len(self.dataset_configurations)})!"
)
objects_requiring_remapping = {}
for dataset_configuration in default_configurations:
location_in_file = dataset_configuration.location_in_file
object_id = dataset_configuration.object_id
location_cannot_be_remapped = location_in_file not in self.dataset_configurations
if location_cannot_be_remapped:
raise KeyError(
f"Unable to remap the object IDs for object at location '{location_in_file}'! This "
"usually occurs if you are attempting to configure the backend for two files of "
"non-equivalent structure."
)
former_configuration = self.dataset_configurations[location_in_file]
former_object_id = former_configuration.object_id
if former_object_id == object_id:
continue
remapped_configuration = former_configuration.model_copy(update={"object_id": object_id})
objects_requiring_remapping[location_in_file] = remapped_configuration
return objects_requiring_remapping
def build_remapped_backend(
self,
locations_to_remap: dict[str, DatasetIOConfiguration],
) -> Self:
"""
Build a remapped backend configuration by updating mismatched object IDs.
This function takes a dictionary of new `DatasetIOConfiguration` objects
(as returned by `find_locations_requiring_remapping`) and updates a copy of the current configuration
with these new configurations.
Parameters
----------
locations_to_remap : dict
A dictionary mapping locations in the NWBFile to their corresponding new
`DatasetIOConfiguration` objects with updated IDs.
Returns
-------
Self
A new instance of the backend configuration class with updated object IDs for
the specified locations.
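
        Examples
        --------
        Illustrative sketch only; continues the hypothetical `backend_configuration` and
        `nwbfile_to_write` names used above.

        >>> locations_to_remap = backend_configuration.find_locations_requiring_remapping(nwbfile=nwbfile_to_write)
        >>> remapped_backend_configuration = backend_configuration.build_remapped_backend(
        ...     locations_to_remap=locations_to_remap
        ... )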
"""
new_backend_configuration = self.model_copy(deep=True)
new_backend_configuration.dataset_configurations.update(locations_to_remap)
return new_backend_configuration
def apply_global_compression(
self,
compression_method: str,
compression_options: dict[str, Any] | None = None,
) -> None:
"""
Apply compression settings to all datasets in this backend configuration.
This method modifies the backend configuration in-place, applying the specified
compression method and options to ALL datasets, regardless of their current
compression settings.
Parameters
----------
compression_method : str
The compression method to apply to all datasets (e.g., "gzip", "Blosc", "Zstd").
compression_options : dict, optional
Additional compression options to apply. The available options depend on the
compression method chosen.
Raises
------
ValueError
If the compression method is not available for this backend type.
Examples
--------
>>> backend_config = get_default_backend_configuration(nwbfile, backend="hdf5")
>>> backend_config.apply_global_compression("Blosc", {"cname": "zstd", "clevel": 5})
"""
# Import here to avoid circular imports
from ._hdf5_dataset_io import AVAILABLE_HDF5_COMPRESSION_METHODS
from ._zarr_dataset_io import AVAILABLE_ZARR_COMPRESSION_METHODS
# Validate compression method for the backend
if self.backend == "hdf5":
available_methods = AVAILABLE_HDF5_COMPRESSION_METHODS
elif self.backend == "zarr":
available_methods = AVAILABLE_ZARR_COMPRESSION_METHODS
else:
raise ValueError(f"Unknown backend: {self.backend}")
if compression_method not in available_methods:
raise ValueError(
f"Compression method '{compression_method}' is not available for backend "
f"'{self.backend}'. Available methods: {list(available_methods.keys())}"
)
# Apply global compression to ALL datasets
for dataset_configuration in self.dataset_configurations.values():
dataset_configuration.compression_method = compression_method
dataset_configuration.compression_options = compression_options