Source code for neuroconv.tools.nwb_helpers._configure_backend
"""Collection of helper functions related to configuration of datasets dependent on backend."""
import importlib
import math
from hdmf.common import Data
from hdmf.data_utils import AbstractDataChunkIterator, DataChunkIterator
from packaging import version
from pynwb import NWBFile, TimeSeries, get_manager
from ._configuration_models._base_dataset_io import _find_location_in_memory_nwbfile
from ._configuration_models._hdf5_backend import HDF5BackendConfiguration
from ._configuration_models._zarr_backend import ZarrBackendConfiguration
from ..hdmf import has_compound_dtype
from ..importing import get_package_version, is_package_installed

def configure_backend(
    nwbfile: NWBFile, backend_configuration: HDF5BackendConfiguration | ZarrBackendConfiguration
) -> None:
    """
    Configure all datasets specified in the `backend_configuration` with their appropriate DataIO and options.

    Parameters
    ----------
    nwbfile : pynwb.NWBFile
        The in-memory pynwb.NWBFile object to configure.
    backend_configuration : HDF5BackendConfiguration or ZarrBackendConfiguration
        The configuration model to use when configuring the datasets for this backend.
    """
    is_ndx_events_installed = is_package_installed(package_name="ndx_events")
    ndx_events = importlib.import_module("ndx_events") if is_ndx_events_installed else None

    nwbfile_is_on_disk = nwbfile.read_io is not None

    # A remapping of the object IDs in the backend configuration might be necessary
    locations_to_remap = backend_configuration.find_locations_requiring_remapping(nwbfile=nwbfile)
    if any(locations_to_remap):
        backend_configuration = backend_configuration.build_remapped_backend(locations_to_remap=locations_to_remap)

    manager = get_manager()
    builder = manager.build(nwbfile, export=True)

    # Set all DataIO based on the configuration
    data_io_class = backend_configuration.data_io_class
    for dataset_configuration in backend_configuration.dataset_configurations.values():
        object_id = dataset_configuration.object_id
        dataset_name = dataset_configuration.dataset_name
        data_io_kwargs = dataset_configuration.get_data_io_kwargs()

        # TODO: update buffer shape in iterator, if present
        neurodata_object = nwbfile.objects[object_id]
        is_dataset_linked = isinstance(neurodata_object.fields.get(dataset_name), TimeSeries)

        location_in_file = _find_location_in_memory_nwbfile(neurodata_object=neurodata_object, field_name=dataset_name)
        dtype_is_compound = has_compound_dtype(builder=builder, location_in_file=location_in_file)

        if (
            isinstance(neurodata_object.fields.get(dataset_name), AbstractDataChunkIterator)
            or dtype_is_compound
            or not nwbfile_is_on_disk
        ):
            data_chunk_iterator_class = None
            data_chunk_iterator_kwargs = dict()
        else:
            # If the dataset has already been written to disk, is not compound, and is not already an iterator,
            # wrap it in a DataChunkIterator in order to support changes to the I/O settings.
            # For more detail, see https://github.com/hdmf-dev/hdmf/issues/1170.
            data_chunk_iterator_class = DataChunkIterator
            data_chunk_iterator_kwargs = dict(buffer_size=math.prod(dataset_configuration.buffer_shape))

        # Table columns
        if isinstance(neurodata_object, Data):
            neurodata_object.set_data_io(
                data_io_class=data_io_class,
                data_io_kwargs=data_io_kwargs,
                data_chunk_iterator_class=data_chunk_iterator_class,
                data_chunk_iterator_kwargs=data_chunk_iterator_kwargs,
            )
        # TimeSeries data or timestamps
        elif isinstance(neurodata_object, TimeSeries) and not is_dataset_linked:
            neurodata_object.set_data_io(
                dataset_name=dataset_name,
                data_io_class=data_io_class,
                data_io_kwargs=data_io_kwargs,
                data_chunk_iterator_class=data_chunk_iterator_class,
                data_chunk_iterator_kwargs=data_chunk_iterator_kwargs,
            )
        # Special ndx-events v0.2.0 types
        elif is_ndx_events_installed and (get_package_version("ndx-events") <= version.parse("0.2.1")):
            # Temporarily skipping LabeledEvents
            if isinstance(neurodata_object, ndx_events.LabeledEvents):
                continue
            elif isinstance(neurodata_object, ndx_events.Events):
                neurodata_object.set_data_io(
                    dataset_name=dataset_name,
                    data_io_class=data_io_class,
                    data_io_kwargs=data_io_kwargs,
                    data_chunk_iterator_class=data_chunk_iterator_class,
                    data_chunk_iterator_kwargs=data_chunk_iterator_kwargs,
                )
        # Skip setting a DataIO when the target dataset is a link (assume it will be found in the parent)
        elif isinstance(neurodata_object, TimeSeries) and is_dataset_linked:
            continue
        # Strictly speaking, it would be odd if a `backend_configuration` got to this line, but might as well be safe
        else:
            raise NotImplementedError(
                f"Unsupported object type {type(neurodata_object)} for backend configuration "
                f"of {neurodata_object.name}!"
            )
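

# ---------------------------------------------------------------------------------------------------------------------
# Usage sketch (an illustration, not part of the module): one plausible workflow for `configure_backend`, assuming
# `get_default_backend_configuration` is importable from `neuroconv.tools.nwb_helpers` as in recent neuroconv
# releases, and using pynwb's mock helpers to build a minimal in-memory NWBFile.
if __name__ == "__main__":
    import numpy as np
    from pynwb import NWBHDF5IO
    from pynwb.testing.mock.file import mock_NWBFile

    from neuroconv.tools.nwb_helpers import configure_backend, get_default_backend_configuration

    # Build a minimal in-memory NWBFile holding a single TimeSeries.
    nwbfile = mock_NWBFile()
    nwbfile.add_acquisition(
        TimeSeries(
            name="ExampleSeries", data=np.random.rand(1_000, 8), unit="a.u.", starting_time=0.0, rate=30_000.0
        )
    )

    # Fetch the default per-dataset configuration for the HDF5 backend; individual entries in
    # `backend_configuration.dataset_configurations` could be tweaked (chunking, compression) before applying.
    backend_configuration = get_default_backend_configuration(nwbfile=nwbfile, backend="hdf5")
    configure_backend(nwbfile=nwbfile, backend_configuration=backend_configuration)

    # Write the configured file; the DataIO settings applied above take effect here.
    with NWBHDF5IO("example_configured.nwb", mode="w") as io:
        io.write(nwbfile)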