# Source code for IOM_plugin_lincombwp

"""The WaveBlocks Project

IOM plugin providing functions for handling
linear combinations of general wavepackets.

@author: R. Bourquin
@copyright: Copyright (C) 2013, 2016 R. Bourquin
@license: Modified BSD License
"""

import numpy as np


def add_lincombwp(self, parameters, timeslots=None, lincombsize=None, blockid=0):
    r"""Add storage for the linear combination of general wavepackets.

    Inside the ``lincombwp`` group of the data block this creates the
    datasets ``timegrid_coefficients``, ``timegrid_packets``,
    ``lincomb_size``, ``coefficients`` and ``packet_refs``, creates a
    block group named ``wavepacketsLCblock<blockid>`` for the individual
    packets, and sets the write pointers of both timegrids to zero.

    :param parameters: An :py:class:`ParameterProvider` instance with at
                       least the key ``ncomponents``.
    :param timeslots: The number of time slots we need. Can be set to ``None``
                      to get automatically growing datasets.
    :param lincombsize: The (maximal) size ``J`` of the linear combination of
                        wavepackets. If specified this remains fixed for all
                        timeslots. Can be set to ``None`` (default) to get
                        automatically growing datasets.
    :param blockid: The ID of the data block to operate on.
    :raise AssertionError: If ``ncomponents`` is not 1.
    """
    N = parameters["ncomponents"]
    # TODO: Handle multi-component packets
    assert N == 1

    # Initial extent along the time axis; a maximal extent of ``None``
    # requests a dataset that can grow along this axis.
    T = 0 if timeslots is None else timeslots
    Ts = timeslots

    # Same for the axis enumerating the members of the linear combination.
    # The chunk length along this axis is capped at 32 entries.
    J = 0 if lincombsize is None else lincombsize
    Js = lincombsize
    csJs = 32 if lincombsize is None else min(32, lincombsize)

    # The overall group containing all lincombwp data
    grp_lc = self._srf[self._prefixb + str(blockid)].require_group("lincombwp")

    # Create the datasets with appropriate parameters.
    # Concrete dtypes are used on purpose: converting the abstract types
    # ``np.integer`` / ``np.complexfloating`` to a dtype is deprecated in
    # NumPy. ``np.int_`` and ``np.complex128`` are what they resolved to.
    # The fill value -1 marks time slots that were never written.
    daset_tg_c = grp_lc.create_dataset("timegrid_coefficients", (T,), dtype=np.int_,
                                       chunks=True, maxshape=(Ts,), fillvalue=-1)
    daset_tg_p = grp_lc.create_dataset("timegrid_packets", (T,), dtype=np.int_,
                                       chunks=True, maxshape=(Ts,), fillvalue=-1)
    grp_lc.create_dataset("lincomb_size", (T,), dtype=np.int_,
                          chunks=True, maxshape=(Ts,))

    # Coefficients
    grp_lc.create_dataset("coefficients", (T, J), dtype=np.complex128,
                          chunks=(1, csJs), maxshape=(Ts, Js))

    # Packet IDs (32 characters is the length of a 'md5' digest in hex representation)
    daset_refs = grp_lc.create_dataset("packet_refs", (T, J), dtype=np.dtype((str, 32)),
                                       chunks=(1, csJs), maxshape=(Ts, Js))

    # The individual packets live in blocks of their own, collected in a
    # dedicated group whose ID is remembered on the reference dataset.
    gid = self.create_group(groupid="wavepacketsLCblock" + str(blockid))
    daset_refs.attrs["packet_gid"] = gid

    # Attach pointer to timegrid
    daset_tg_c.attrs["pointer"] = 0
    daset_tg_p.attrs["pointer"] = 0
def delete_lincombwp(self, blockid=0):
    r"""Delete the ``lincombwp`` group of a data block, if there is one.

    A missing group is not treated as an error: the :py:class:`KeyError`
    raised by the deletion is swallowed.

    :param blockid: The ID of the data block to operate on.
    """
    target = self._prefixb + str(blockid) + "/lincombwp"
    try:
        del self._srf[target]
    except KeyError:
        # Nothing is stored under that name, so there is nothing to remove.
        return
def has_lincombwp(self, blockid=0):
    r"""Tell whether the given data block contains a ``lincombwp`` entry.

    :param blockid: The ID of the data block to operate on.
    :return: ``True`` if the block has a member named ``lincombwp``,
             ``False`` otherwise.
    """
    block = self._srf[self._prefixb + str(blockid)]
    return "lincombwp" in block
def save_lincombwp_description(self, descr, blockid=0):
    r"""Store the description of this linear combination.

    Every entry of ``descr`` is passed through ``self._save_attr_value``
    and written as an attribute of the ``lincombwp`` group.

    :param descr: The description, a mapping from attribute names to values.
    :param blockid: The ID of the data block to operate on.
    """
    group_path = "/" + self._prefixb + str(blockid) + "/lincombwp"

    # One attribute per description entry
    for name, raw_value in descr.items():
        encoded = self._save_attr_value(raw_value)
        self._srf[group_path].attrs[name] = encoded
def save_lincombwp_coefficients(self, coefficients, timestep=None, blockid=0):
    r"""Write the coefficients of the linear combination into the next free
    time slot.

    The slot is taken from the ``pointer`` attribute of the
    ``timegrid_coefficients`` dataset. The number of coefficients goes into
    ``lincomb_size``, the squeezed coefficients into the leading entries of
    the matching ``coefficients`` row, and ``timestep`` into the timegrid.
    Finally the pointer is advanced by one.

    :param coefficients: The coefficients of the linear combination of wavepackets.
    :type coefficients: A single, suitable :py:class:`ndarray`.
    :param timestep: The timestep at which we save the data.
    :param blockid: The ID of the data block to operate on.
    """
    group = "/" + self._prefixb + str(blockid) + "/lincombwp"
    grid_path = group + "/timegrid_coefficients"
    size_path = group + "/lincomb_size"
    coeff_path = group + "/coefficients"

    slot = self._srf[grid_path].attrs["pointer"]

    # Record how many coefficients this slot holds
    self.must_resize(size_path, slot)
    count = np.size(coefficients)
    self._srf[size_path][slot] = count

    # Make room along the time axis and, unless there is nothing to store,
    # along the coefficient axis too, then write the values.
    self.must_resize(coeff_path, slot)
    if count != 0:
        self.must_resize(coeff_path, count - 1, axis=1)
    self._srf[coeff_path][slot, :count] = np.squeeze(coefficients)

    # Remember which timestep the slot belongs to
    self.must_resize(grid_path, slot)
    self._srf[grid_path][slot] = timestep

    # Advance the write pointer
    self._srf[grid_path].attrs["pointer"] += 1
def save_lincombwp_wavepackets(self, packetlist, timestep=None, blockid=0):
    r"""Save the wavepackets being part of this linear combination.

    Each packet is stored in a block of its own named
    ``"LC" + str(blockid) + "WP" + str(packet.get_id())`` inside the group
    recorded in the ``packet_gid`` attribute of ``packet_refs``. Missing
    blocks are created on the fly. The packet IDs are written to the
    ``packet_refs`` row of the current time slot, ``timestep`` is written
    to ``timegrid_packets`` and the slot pointer is advanced by one.

    .. warning:: This is quite an expensive operation.

    :param packetlist: The wavepackets to save. Each one has to provide
                       ``get_id()`` and ``get_description()``.
    :param timestep: The timestep at which we save the data.
    :param blockid: The ID of the data block to operate on.
    """
    pathtg = "/" + self._prefixb + str(blockid) + "/lincombwp/timegrid_packets"
    pathd = "/" + self._prefixb + str(blockid) + "/lincombwp/packet_refs"
    gid = self._srf[pathd].attrs["packet_gid"]
    timeslot = self._srf[pathtg].attrs["pointer"]

    # Book keeping
    self.must_resize(pathd, timeslot)
    K = len(packetlist)
    if K != 0:
        self.must_resize(pathd, K - 1, axis=1)

    # Save the packets.
    # The known block IDs are kept in a set that is extended whenever a
    # block is created. Without that, a packet occurring more than once in
    # ``packetlist`` would trigger a second creation of its block.
    known_packets = set(self.get_block_ids(groupid=gid))
    for k, packet in enumerate(packetlist):
        pid = packet.get_id()
        bid = "LC" + str(blockid) + "WP" + str(pid)
        if bid not in known_packets:
            bid = self.create_block(blockid=bid, groupid=gid)
            descr = packet.get_description()
            self.add_genericwp(descr, blockid=bid)
            known_packets.add(bid)
        self.save_genericwp(packet, timestep=timestep, blockid=bid)
        # Book keeping
        self._srf[pathd][timeslot, k] = pid

    # Write the timestep to which the stored packets belong into the timegrid
    self.must_resize(pathtg, timeslot)
    self._srf[pathtg][timeslot] = timestep

    # Update the pointer
    self._srf[pathtg].attrs["pointer"] += 1
def load_lincombwp_description(self, blockid=0):
    r"""Read back the description of this linear combination.

    All attributes of the ``lincombwp`` group are collected, each value
    being passed through ``self._load_attr_value``.

    :param blockid: The ID of the data block to operate on.
    :return: A dict mapping attribute names to the decoded values.
    """
    group_path = "/" + self._prefixb + str(blockid) + "/lincombwp"
    stored = self._srf[group_path].attrs
    return {name: self._load_attr_value(raw) for name, raw in stored.items()}
def load_lincombwp_timegrid(self, blockid=0, key=("coeffs", "packets")):
    r"""Load the timegrid(s) of this linear combination.

    :param blockid: The ID of the data block to operate on.
    :param key: Specify which linear combination timegrids to load. All are independent.
    :type key: Tuple of valid identifier strings that are ``coeffs`` and ``packets``.
               Default is ``("coeffs", "packets")``.
    :return: The timegrid itself if exactly one was loaded, otherwise a
             tuple of the loaded timegrids in the order given by ``key``.
             Entries of ``key`` other than ``coeffs`` and ``packets`` are
             skipped.
    """
    group = "/" + self._prefixb + str(blockid) + "/lincombwp/"
    dataset_for = {
        "coeffs": "timegrid_coefficients",
        "packets": "timegrid_packets",
    }

    grids = [self._srf[group + dataset_for[item]][:]
             for item in key if item in dataset_for]

    return grids[0] if len(grids) == 1 else tuple(grids)
def load_lincombwp_size(self, timestep=None, blockid=0):
    r"""Load the size (number of packets) of this linear combination.

    :param timestep: Load only the data of this timestep. With ``None``
                     the complete ``lincomb_size`` dataset is returned.
    :param blockid: The ID of the data block to operate on.
    :return: The stored size for ``timestep``, located through the
             ``timegrid_coefficients`` timegrid, or all stored sizes.
    """
    group = "/" + self._prefixb + str(blockid) + "/lincombwp"
    grid_path = group + "/timegrid_coefficients"
    size_path = group + "/lincomb_size"

    if timestep is None:
        selector = slice(None)
    else:
        selector = self.find_timestep_index(grid_path, timestep)

    return self._srf[size_path][selector]
def load_lincombwp_coefficients(self, timestep=None, blockid=0):
    r"""Load the coefficients of this linear combination.

    :param timestep: Load only the data of this timestep. With ``None``
                     the complete ``coefficients`` dataset is returned.
    :param blockid: The ID of the data block to operate on.
    :return: For a given ``timestep`` the first ``J`` coefficients of the
             matching row, ``J`` being the value stored in ``lincomb_size``
             for that row. Otherwise all rows in full width.
    """
    group = "/" + self._prefixb + str(blockid) + "/lincombwp"
    grid_path = group + "/timegrid_coefficients"
    size_path = group + "/lincomb_size"
    coeff_path = group + "/coefficients"

    if timestep is None:
        return self._srf[coeff_path][slice(None), :]

    # Locate the row through the coefficient timegrid and cut it down to
    # the number of coefficients that was stored for it.
    row = self.find_timestep_index(grid_path, timestep)
    count = self._srf[size_path][row]
    return self._srf[coeff_path][row, :count]
def load_lincombwp_wavepackets(self, timestep, packetindex=None, blockid=0):
    r"""Load the wavepackets being part of this linear combination.

    Note that this is quite an expensive operation.

    :param timestep: Load only the data of this timestep.
    :param packetindex: Load only the packet with this index. If ``None``
                        then load all packets for the given timestep.
    :param blockid: The ID of the data block to operate on.
    :return: A tuple of packets, or a single packet if ``packetindex``
             was given.
    :raise ValueError: If ``packetindex`` is not smaller than the stored
                       size of the linear combination.
    """
    group = "/" + self._prefixb + str(blockid) + "/lincombwp"
    grid_path = group + "/timegrid_packets"
    size_path = group + "/lincomb_size"
    refs_path = group + "/packet_refs"
    block_prefix = "LC" + str(blockid) + "WP"

    # NOTE(review): the row is located through the *packets* timegrid but is
    # also used to index ``lincomb_size``, which
    # ``save_lincombwp_coefficients`` fills at the *coefficients* pointer.
    # This presumably relies on both being saved in lockstep -- TODO confirm.
    row = self.find_timestep_index(grid_path, timestep)
    count = self._srf[size_path][row]
    references = self._srf[refs_path][row, :count]

    if packetindex is not None:
        if packetindex >= count:
            raise ValueError("Packet index is invalid.")
        single = block_prefix + str(references[packetindex])
        return self.load_genericwp(timestep=timestep, blockid=single)

    return tuple(
        self.load_genericwp(timestep=timestep, blockid=block_prefix + str(reference))
        for reference in references
    )
def load_lincombwp_wavepacket_refs(self, timestep=None, blockid=0):
    r"""Load the references of the wavepackets being part of
    this linear combination. References can be used as ``blockid``
    for loading selected wavepackets manually. If for example a
    ``ref`` obtained through this method is:

    >>> refs = anIom.load_lincombwp_wavepacket_refs(timestep=4)
    >>> refs
    array(['673290fd36a0fa80f28973ae31f10378',
           '075dc9d7d2c558c97608e2fe08a7d53d',
           '0aed8bf3e21b5894bf89ef894d3f7d0c'],
          dtype='|S32')

    >>> ref = refs[0]
    '673290fd36a0fa80f28973ae31f10378'

    then the corresponding block ID is:

    >>> bid = "LC" + str(blockid) + "WP" + ref
    'LC0WP673290fd36a0fa80f28973ae31f10378'

    with ``blockid`` the block ID where the linear combination
    was stored. With that ``bid`` we can now for example load
    data of a selected wavepacket:

    >>> Pi = anIom.load_wavepacket_parameters(timestep=4, blockid=bid)

    in case of a Hagedorn wavepacket.

    :param timestep: Load only the data of this timestep. With ``None``
                     the references of all time slots are returned.
    :param blockid: The ID of the data block to operate on.
    :return: A :py:class:`ndarray` of strings.
    """
    group = "/" + self._prefixb + str(blockid) + "/lincombwp"
    grid_path = group + "/timegrid_packets"
    refs_path = group + "/packet_refs"

    rows = slice(None) if timestep is None else self.find_timestep_index(grid_path, timestep)
    return self._srf[refs_path][rows, :]
#
# The following two methods are only for convenience and are NOT particularly efficient.
#
def load_lincombwp(self, timestep, blockid=0):
    r"""Load a linear combination at a given timestep and return a fully
    configured :py:class:`LinearCombinationOfWPs` instance.

    This is a convenience wrapper that calls the other
    :py:class:`IOManager` loading methods in the correct order. It is not
    particularly efficient.

    :param timestep: The timestep :math:`n` we load the wavepacket.
    :param blockid: The ID of the data block to operate on.
    :return: A :py:class:`LinearCombinationOfWPs` instance, or ``None``
             if the stored size for this timestep is zero.
    """
    from WaveBlocksND.LinearCombinationOfWPs import LinearCombinationOfWPs

    description = self.load_lincombwp_description(blockid=blockid)

    # Nothing to assemble for an empty linear combination
    size = self.load_lincombwp_size(timestep=timestep, blockid=blockid)
    if size == 0:
        return None

    # Fetch the ingredients
    coefficients = self.load_lincombwp_coefficients(timestep=timestep, blockid=blockid)
    packets = self.load_lincombwp_wavepackets(timestep=timestep, blockid=blockid)

    # Put the linear combination together
    lincomb = LinearCombinationOfWPs(description["dimension"], description["ncomponents"])
    lincomb.add_wavepackets(packets, coefficients)
    return lincomb
def save_lincombwp(self, lincomb, timestep, blockid=0):
    r"""Save a linear combination of general wavepackets at a given timestep,
    reading everything to store from the :py:class:`LinearCombinationOfWPs`
    instance provided.

    This is a convenience wrapper that calls the other
    :py:class:`IOManager` saving methods in the correct order: description
    first, then the wavepackets, then the coefficients. It is not
    particularly efficient. We assume the storage was already set up with
    the matching :py:meth:`add_lincombwp` call.

    :param lincomb: The :py:class:`LinearCombinationOfWPs` instance we want to save.
    :param timestep: The timestep :math:`n` at which we save the linear combination.
    :param blockid: The ID of the data block to operate on.
    """
    # Description
    description = lincomb.get_description()
    self.save_lincombwp_description(description, blockid=blockid)

    # Wavepackets
    packets = lincomb.get_wavepackets()
    self.save_lincombwp_wavepackets(packets, timestep=timestep, blockid=blockid)

    # Coefficients
    coefficients = lincomb.get_coefficients()
    self.save_lincombwp_coefficients(coefficients, timestep=timestep, blockid=blockid)