Source code for IOM_plugin_lincombhawp

"""The WaveBlocks Project

IOM plugin providing functions for handling
linear combinations of Hagedorn wavepackets.

@author: R. Bourquin
@copyright: Copyright (C) 2013, 2016 R. Bourquin
@license: Modified BSD License
"""

import numpy as np


[docs]def add_lincombhawp(self, parameters, timeslots=None, lincombsize=None, wavepacketsize=None, blockid=0, key=("q", "p", "Q", "P", "S")): r"""Add storage for the linear combination of Hagedorn wavepackets. :param parameters: An :py:class:`ParameterProvider` instance with at least the keys ``dimension`` and ``ncomponents``. :param timeslots: The number of time slots we need. Can be set to ``None`` to get automatically growing datasets. :param lincombsize: The (maximal) size ``J`` of the linear combination of wavepackets. If specified this remains fixed for all timeslots. Can be set to ``None`` (default) to get automatically growing datasets. :param wavepacketsize: The (maximal) basis shape size ``K`` of each of the wavepackets. If specified this remains fixed for all timeslots. Can be set to ``None`` (default) to get automatically growing datasets. :param blockid: The ID of the data block to operate on. :param key: Specify which parameters to save. All are independent. :type key: Tuple of valid identifier strings that are ``q``, ``p``, ``Q``, ``P``, ``S`` and ``adQ``. Default is ``("q", "p", "Q", "P", "S")``. """ N = parameters["ncomponents"] D = parameters["dimension"] # TODO: Handle multi-component packets assert N == 1 if timeslots is None: T = 0 Ts = None else: T = timeslots Ts = timeslots if lincombsize is None: J = 0 Js = None csJs = 32 else: J = lincombsize Js = lincombsize csJs = min(32, Js) if wavepacketsize is None: K = 0 Ks = None csKs = 8 else: K = wavepacketsize Ks = wavepacketsize csKs = min(8, Ks) # The overall group containing all lincombwp data grp_lc = self._srf[self._prefixb + str(blockid)].require_group("lincombhawp") # The group for storing the wavepacket basis shapes grp_lc.create_group("basisshapes") # The group for storing the wavepacket parameter set Pi grp_wppi = grp_lc.create_group("Pi") # The group for storing the wavepacket coefficients grp_wpci = grp_lc.create_group("wp_coefficients") # Create the dataset with appropriate parameters daset_tg_lc = grp_lc.create_dataset("timegrid_lc_coefficients", (T,), dtype=np.integer, chunks=True, maxshape=(Ts,), fillvalue=-1) grp_lc.create_dataset("timegrid_wp_parameters", (T,), dtype=np.integer, chunks=True, maxshape=(Ts,), fillvalue=-1) grp_lc.create_dataset("timegrid_wp_coefficients", (T,), dtype=np.integer, chunks=True, maxshape=(Ts,), fillvalue=-1) grp_lc.create_dataset("lincomb_size", (T,), dtype=np.integer, chunks=True, maxshape=(Ts,), fillvalue=J) # Linear combination coefficients grp_lc.create_dataset("lc_coefficients", (T, J), dtype=np.complexfloating, chunks=(1, csJs), maxshape=(Ts, Js)) # Linear combination wavepackets grp_lc.create_dataset("basis_shapes_hashes", (T, J, N), dtype=np.integer, chunks=(1, csJs, 1), maxshape=(Ts, Js, N)) grp_lc.create_dataset("basis_sizes", (T, J, N), dtype=np.integer, chunks=(1, csJs, 1), maxshape=(Ts, Js, N)) # Wavepacket parameters if "q" in key and "q" not in grp_wppi.keys(): grp_wppi.create_dataset("q", (T, J, D), dtype=np.complexfloating, chunks=(1, csJs, D), maxshape=(Ts, Js, D)) if "p" in key and "p" not in grp_wppi.keys(): grp_wppi.create_dataset("p", (T, J, D), dtype=np.complexfloating, chunks=(1, csJs, D), maxshape=(Ts, Js, D)) if "Q" in key and "Q" not in grp_wppi.keys(): grp_wppi.create_dataset("Q", (T, J, D, D), dtype=np.complexfloating, chunks=(1, csJs, D, D), maxshape=(Ts, Js, D, D)) if "P" in key and "P" not in grp_wppi.keys(): grp_wppi.create_dataset("P", (T, J, D, D), dtype=np.complexfloating, chunks=(1, csJs, D, D), maxshape=(Ts, Js, D, D)) if "S" in key and "S" not in grp_wppi.keys(): grp_wppi.create_dataset("S", (T, J, 1), dtype=np.complexfloating, chunks=(1, csJs, 1), maxshape=(Ts, Js, 1)) # Wavepacket coefficients for i in range(N): grp_wpci.create_dataset("c_" + str(i), (T, J, K), dtype=np.complexfloating, chunks=(1, csJs, csKs), maxshape=(Ts, Js, Ks)) # Attach pointer to timegrid daset_tg_lc.attrs["pointer"] = 0 grp_wppi.attrs["pointer"] = 0 grp_wpci.attrs["pointer"] = 0
[docs]def delete_lincombhawp(self, blockid=0): r"""Remove the stored linear combination. :param blockid: The ID of the data block to operate on. """ try: del self._srf[self._prefixb + str(blockid) + "/lincombhawp"] except KeyError: pass
[docs]def has_lincombhawp(self, blockid=0): r"""Ask if the specified data block has the desired data tensor. :param blockid: The ID of the data block to operate on. """ return "lincombhawp" in self._srf[self._prefixb + str(blockid)].keys()
[docs]def save_lincombhawp_description(self, descr, blockid=0): r"""Save the description of this linear combination. :param descr: The description. :param blockid: The ID of the data block to operate on. """ pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp" # Save the description for key, value in descr.items(): self._srf[pathd].attrs[key] = self._save_attr_value(value)
[docs]def save_lincombhawp_coefficients(self, coefficients, timestep, blockid=0): r"""Save the coefficients of the linear combination to a file. :param coefficients: The coefficients of the linear combination of wavepackets. :type coefficients: A single, suitable ``ndarray``. :param timestep: The timestep at which we save the data. :param blockid: The ID of the data block to operate on. """ pathtg = "/" + self._prefixb + str(blockid) + "/lincombhawp/timegrid_lc_coefficients" pathlcs = "/" + self._prefixb + str(blockid) + "/lincombhawp/lincomb_size" pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp/lc_coefficients" timeslot = self._srf[pathtg].attrs["pointer"] # Write the data self.must_resize(pathlcs, timeslot) J = np.size(coefficients) self._srf[pathlcs][timeslot] = J self.must_resize(pathd, timeslot) if not J == 0: self.must_resize(pathd, J - 1, axis=1) self._srf[pathd][timeslot, :J] = np.squeeze(coefficients) # Write the timestep to which the stored values belong into the timegrid self.must_resize(pathtg, timeslot) self._srf[pathtg][timeslot] = timestep # Update the pointer self._srf[pathtg].attrs["pointer"] += 1
[docs]def save_lincombhawp_wavepacket_parameters(self, parameters, timestep, blockid=0, key=("q", "p", "Q", "P", "S")): r"""Save the parameter set :math:`\Pi` of the Hagedorn wavepacket :math:`\Psi` to a file. :param parameters: The parameter set of the Hagedorn wavepacket. :type parameters: A ``list`` containing the (five) ``ndarrays`` like :math:`(q,p,Q,P,S)` :param timestep: The timestep at which we save the data. :param blockid: The ID of the data block to operate on. :param key: Specify which parameters to save. All are independent. :type key: Tuple of valid identifier strings that are ``q``, ``p``, ``Q``, ``P``, ``S`` and ``adQ``. Default is ``("q", "p", "Q", "P", "S")``. """ pathtg = "/" + self._prefixb + str(blockid) + "/lincombhawp/timegrid_wp_parameters" pathlcs = "/" + self._prefixb + str(blockid) + "/lincombhawp/lincomb_size" pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp/Pi/" timeslot = self._srf[pathd].attrs["pointer"] # TODO: This an assumption based on data layout and stable J = parameters[0].shape[0] # Write the basis size self.must_resize(pathlcs, timeslot) self._srf[pathlcs][timeslot] = J # Write the parameters for key, item in zip(key, parameters): self.must_resize(pathd + key, timeslot) self.must_resize(pathd + key, J - 1, axis=1) self._srf[pathd + key][timeslot, :J, ...] = item # Write the timestep to which the stored values belong into the timegrid self.must_resize(pathtg, timeslot) self._srf[pathtg][timeslot] = timestep # Update the pointer self._srf[pathd].attrs["pointer"] += 1
[docs]def save_lincombhawp_wavepacket_coefficients(self, coefficients, basisshapes, timestep=None, blockid=0): r"""Save the coefficients of the Hagedorn wavepacket linear combination to a file. Warning: we do only save tha hash of the basis shapes here! You have to save the basis shape with the corresponding function too. :param coefficients: The coefficients of the Hagedorn wavepacket. :type coefficients: A ``list`` with :math:`N` suitable ``ndarrays``. :param basisshapes: The corresponding basis shapes of the Hagedorn wavepacket. :type basisshapes: A ``list`` with :math:`N` :py:class:`BasisShape` subclass instances. :param timestep: The timestep at which we save the data. :param blockid: The ID of the data block to operate on. """ pathtg = "/" + self._prefixb + str(blockid) + "/lincombhawp/timegrid_wp_coefficients" pathlcs = "/" + self._prefixb + str(blockid) + "/lincombhawp/lincomb_size" pathbsi = "/" + self._prefixb + str(blockid) + "/lincombhawp/basis_sizes" pathbsh = "/" + self._prefixb + str(blockid) + "/lincombhawp/basis_shapes_hashes" pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp/wp_coefficients/" timeslot = self._srf[pathd].attrs["pointer"] # Write the lincomb size basissizes = [K.get_basis_size() for K in basisshapes] J = len(basissizes) self.must_resize(pathlcs, timeslot) self._srf[pathlcs][timeslot] = J # Write all basis sizes self.must_resize(pathbsi, timeslot) self.must_resize(pathbsi, J - 1, axis=1) self._srf[pathbsi][timeslot, :J, 0] = np.array(basissizes) # Write basis shape hashes basisshapeshashes = np.array([hash(K) for K in basisshapes]) self.must_resize(pathbsh, timeslot) self.must_resize(pathbsh, J - 1, axis=1) self._srf[pathbsh][timeslot, :J, 0] = basisshapeshashes # Write the wavepackets coefficients data coefficients = np.atleast_2d(coefficients) j, k = coefficients.shape # TODO: Allow wavepackets with multiple components index = 0 pathc = pathd + "c_" + str(index) # Do we have to resize due to changed number of packets or coefficients self.must_resize(pathc, timeslot) self.must_resize(pathc, j - 1, axis=1) self.must_resize(pathc, k - 1, axis=2) self._srf[pathc][timeslot, :j, :k] = coefficients # Write the timestep to which the stored values belong into the timegrid self.must_resize(pathtg, timeslot) self._srf[pathtg][timeslot] = timestep # Update the pointer self._srf[pathd].attrs["pointer"] += 1
[docs]def save_lincombhawp_wavepacket_basisshapes(self, basisshapes, blockid=0): r"""Save the basis shapes of the linear combination of Hagedorn wavepacket to a file. :param basisshapes: A list of the basis shapes of the linear combination. :param blockid: The ID of the data block to operate on. """ pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp/basisshapes/" for basisshape in basisshapes: ha = hash(basisshape) name = "basis_shape_" + str(ha) # Chech if we already stored this basis shape if name not in self._srf[pathd].keys(): # TODO: Consider storing all hashes in one big dataset # Create new data set daset = self._srf[pathd].create_dataset("basis_shape_" + str(ha), (1,), dtype=np.integer) daset[0] = ha # Save the description descr = basisshape.get_description() for key, value in descr.items(): daset.attrs[key] = self._save_attr_value(value)
[docs]def load_lincombhawp_description(self, blockid=0): r"""Load the description of this linear combination. :param blockid: The ID of the data block to operate on. """ pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp" # Load and return all descriptions available descr = {} for key, value in self._srf[pathd].attrs.items(): descr[key] = self._load_attr_value(value) return descr
[docs]def load_lincombhawp_timegrid(self, blockid=0, key=("coeffs", "packets")): r"""Load the timegrid of this linear combination. :param blockid: The ID of the data block to operate on. :param key: Specify which linear combination timegrids to load. All are independent. :type key: Tuple of valid identifier strings that are ``ceoffs`` and ``packets``. Default is ``("coeffs", "packets")``. """ tg = [] for item in key: if item == "coeffs": pathtg = "/" + self._prefixb + str(blockid) + "/lincombhawp/timegrid_lc_coefficients" tg.append(self._srf[pathtg][:]) elif item == "packets": pathtg = "/" + self._prefixb + str(blockid) + "/lincombhawp/timegrid_lc_packets" tg.append(self._srf[pathtg][:]) if len(tg) == 1: return tg[0] else: return tuple(tg)
[docs]def load_lincombhawp_size(self, timestep=None, blockid=0): r"""Load the size (number of packets) of this linear combination. :param timestep: Load only the data of this timestep. :param blockid: The ID of the data block to operate on. """ pathtg = "/" + self._prefixb + str(blockid) + "/lincombhawp/timegrid_lc_coefficients" pathlcs = "/" + self._prefixb + str(blockid) + "/lincombhawp/lincomb_size" if timestep is not None: index = self.find_timestep_index(pathtg, timestep) return self._srf[pathlcs][index] else: index = slice(None) return self._srf[pathlcs][index]
[docs]def load_lincombhawp_coefficients(self, timestep=None, blockid=0): r"""Load the coefficients of this linear combination. :param timestep: Load only the data of this timestep. :param blockid: The ID of the data block to operate on. """ pathtg = "/" + self._prefixb + str(blockid) + "/lincombhawp/timegrid_lc_coefficients" pathlcs = "/" + self._prefixb + str(blockid) + "/lincombhawp/lincomb_size" pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp/lc_coefficients" if timestep is not None: index = self.find_timestep_index(pathtg, timestep) J = self._srf[pathlcs][index] return self._srf[pathd][index, :J] else: index = slice(None) return self._srf[pathd][index, :]
[docs]def load_lincombhawp_wavepacket_parameters(self, timestep=None, blockid=0, key=("q", "p", "Q", "P", "S")): r"""Load the wavepacket parameters. :param timestep: Load only the data of this timestep. :param blockid: The ID of the data block to operate on. :param key: Specify which parameters to load. All are independent. :type key: Tuple of valid identifier strings that are ``q``, ``p``, ``Q``, ``P``, ``S`` and ``adQ``. Default is ``("q", "p", "Q", "P", "S")``. """ pathtg = "/" + self._prefixb + str(blockid) + "/lincombhawp/timegrid_wp_parameters" pathlcs = "/" + self._prefixb + str(blockid) + "/lincombhawp/lincomb_size" pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp/Pi/" if timestep is not None: index = self.find_timestep_index(pathtg, timestep) J = self._srf[pathlcs][index] params = tuple([self._srf[pathd + k][index, :J, ...] for k in key]) else: params = tuple([self._srf[pathd + k][:, :, ...] for k in key]) return params
[docs]def load_lincombhawp_wavepacket_coefficients(self, timestep=None, get_hashes=False, blockid=0): r"""Load the wavepacket coefficients. :param timestep: Load only the data of this timestep. :param get_hashes: Return the corresponding basis shape hashes. :param blockid: The ID of the data block to operate on. """ pathtg = "/" + self._prefixb + str(blockid) + "/lincombhawp/timegrid_wp_coefficients" pathlcs = "/" + self._prefixb + str(blockid) + "/lincombhawp/lincomb_size" pathbsh = "/" + self._prefixb + str(blockid) + "/lincombhawp/basis_shapes_hashes" pathbsi = "/" + self._prefixb + str(blockid) + "/lincombhawp/basis_sizes" pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp/wp_coefficients/" # TODO: Allow wavepackets with multiple components i = 0 if timestep is not None: index = self.find_timestep_index(pathtg, timestep) Js = slice(0, self._srf[pathlcs][index]) Ks = slice(0, np.max(self._srf[pathbsi][index, :, 0])) else: index = slice(None) Js = slice(None) Ks = slice(None) # Load the hash data if get_hashes is True: hashes = self._srf[pathbsh][index, Js] # Load the coefficient data data = self._srf[pathd + "c_" + str(i)][index, Js, Ks] if get_hashes is True: return (hashes, data) else: return data
[docs]def load_lincombhawp_wavepacket_basisshapes(self, the_hash=None, blockid=0): r"""Load the basis shapes by hash. :param the_hash: The hash of the basis shape whose description we want to load. :param blockid: The ID of the data block to operate on. """ pathd = "/" + self._prefixb + str(blockid) + "/lincombhawp/basisshapes/" if the_hash is None: # Load and return all descriptions available descrs = {} for ahash in self._srf[pathd].keys(): # TODO: What data exactly do we want to return? descr = {} for key, value in self._srf[pathd + ahash].attrs.items(): descr[key] = self._load_attr_value(value) # 'ahash' is "basis_shape_..." and we want only the "..." part descrs[int(ahash[12:])] = descr return descrs else: the_hash = int(the_hash) name = "basis_shape_" + str(the_hash) # Chech if we already stored this basis shape if name in self._srf[pathd].keys(): # TODO: What data exactly do we want to return? descr = {} for key, value in self._srf[pathd + name].attrs.items(): descr[key] = self._load_attr_value(value) return descr else: raise IndexError("No basis shape with given hash {}".format(hash))
# # The following two methods are only for convenience and are NOT particularly efficient. #
[docs]def load_lincombhawp(self, timestep, blockid=0, key=("q", "p", "Q", "P", "S")): r"""Load a linear combination at a given timestep and return a fully configured :py:class:`LinearCombinationOfHAWPs` instance. This method just calls some other :py:class:`IOManager` methods in the correct order. It is included only for convenience and is not particularly efficient. :param timestep: The timestep :math:`n` we load the wavepacket. :param blockid: The ID of the data block to operate on. :return: A :py:class:`LinearCombinationOfHAWPs` instance. """ from WaveBlocksND.LinearCombinationOfHAWPs import LinearCombinationOfHAWPs from WaveBlocksND.BlockFactory import BlockFactory BF = BlockFactory() descr = self.load_lincombhawp_description(blockid=blockid) # Empty linear combination J = self.load_lincombhawp_size(timestep=timestep, blockid=blockid) if J == 0: return None # A new and empty linear combination LC = LinearCombinationOfHAWPs(descr["dimension"], descr["ncomponents"], descr["eps"], number_packets=J) # Basis shapes K_descrs = self.load_lincombhawp_wavepacket_basisshapes(blockid=blockid) K = {ha: BF.create_basis_shape(de) for ha, de in K_descrs.items()} # Coefficients and basis shape hashes hashes, coeffs = self.load_lincombhawp_wavepacket_coefficients(timestep=timestep, get_hashes=True, blockid=blockid) Ks = [K[ha] for ha in np.squeeze(hashes)] LC.set_wavepacket_coefficients(coeffs, Ks) # Parameters Pi = self.load_lincombhawp_wavepacket_parameters(timestep=timestep, blockid=blockid, key=key) LC.set_wavepacket_parameters(Pi) # Cj Cj = self.load_lincombhawp_coefficients(timestep=timestep, blockid=blockid) LC.set_coefficients(Cj) return LC
[docs]def save_lincombhawp(self, lincomb, timestep, blockid=0): r"""Save a linear combination of Hagedorn wavepackets at a given timestep and read all data to save from the :py:class:`LinearCombinationOfHAWPs` instance provided. This method just calls some other :py:class:`IOManager` methods in the correct order. It is included only for convenience and is not particularly efficient. We assume the linear combination is already set up with the correct :py:meth:`add_lincombhawp` method call. :param lincomb: The :py:class:`LinearCombinationOfHAWPs` instance we want to save. :param timestep: The timestep :math:`n` at which we save the linear combination. :param blockid: The ID of the data block to operate on. """ # Description self.save_lincombhawp_description(lincomb.get_description(), blockid=blockid) # Wavepackets Ks = lincomb.get_basis_shapes() self.save_lincombhawp_wavepacket_basisshapes(Ks, blockid=blockid) Pi = lincomb.get_wavepacket_parameters() self.save_lincombhawp_wavepacket_parameters(Pi, timestep=timestep, blockid=blockid) Ck = lincomb.get_wavepacket_coefficients() self.save_lincombhawp_wavepacket_coefficients(Ck, Ks, timestep=timestep, blockid=blockid) # Coefficients Cj = lincomb.get_coefficients() self.save_lincombhawp_coefficients(Cj, timestep=timestep, blockid=blockid)