"""The WaveBlocks Project
IOM plugin providing functions for handling
linear combinations of general wavepackets.
@author: R. Bourquin
@copyright: Copyright (C) 2013, 2016 R. Bourquin
@license: Modified BSD License
"""
import numpy as np
[docs]def add_lincombwp(self, parameters, timeslots=None, lincombsize=None, blockid=0):
r"""Add storage for the linear combination of general wavepackets.
:param parameters: An :py:class:`ParameterProvider` instance with at
least the key ``ncomponents``.
:param timeslots: The number of time slots we need. Can be set to ``None``
to get automatically growing datasets.
:param lincombsize: The (maximal) size ``J`` of the linear combination of wavepackets. If specified
this remains fixed for all timeslots. Can be set to ``None`` (default)
to get automatically growing datasets.
:param blockid: The ID of the data block to operate on.
"""
N = parameters["ncomponents"]
# TODO: Handle multi-component packets
assert N == 1
if timeslots is None:
T = 0
Ts = None
else:
T = timeslots
Ts = timeslots
if lincombsize is None:
J = 0
Js = None
csJs = 32
else:
J = lincombsize
Js = lincombsize
csJs = min(32, Js)
# The overall group containing all lincombwp data
grp_lc = self._srf[self._prefixb + str(blockid)].require_group("lincombwp")
# Create the dataset with appropriate parameters
daset_tg_c = grp_lc.create_dataset("timegrid_coefficients", (T,), dtype=np.integer, chunks=True, maxshape=(Ts,), fillvalue=-1)
daset_tg_p = grp_lc.create_dataset("timegrid_packets", (T,), dtype=np.integer, chunks=True, maxshape=(Ts,), fillvalue=-1)
grp_lc.create_dataset("lincomb_size", (T,), dtype=np.integer, chunks=True, maxshape=(Ts,))
# Coefficients
grp_lc.create_dataset("coefficients", (T, J), dtype=np.complexfloating, chunks=(1, csJs), maxshape=(Ts, Js))
# Packet IDs (32 characters is the length of a 'md5' digest in hex representation)
daset_refs = grp_lc.create_dataset("packet_refs", (T, J), dtype=np.dtype((str, 32)), chunks=(1, csJs), maxshape=(Ts, Js))
gid = self.create_group(groupid="wavepacketsLCblock" + str(blockid))
daset_refs.attrs["packet_gid"] = gid
# Attach pointer to timegrid
daset_tg_c.attrs["pointer"] = 0
daset_tg_p.attrs["pointer"] = 0
[docs]def delete_lincombwp(self, blockid=0):
r"""Remove the stored linear combination.
:param blockid: The ID of the data block to operate on.
"""
try:
del self._srf[self._prefixb + str(blockid) + "/lincombwp"]
except KeyError:
pass
[docs]def has_lincombwp(self, blockid=0):
r"""Ask if the specified data block has the desired data tensor.
:param blockid: The ID of the data block to operate on.
"""
return "lincombwp" in self._srf[self._prefixb + str(blockid)].keys()
[docs]def save_lincombwp_description(self, descr, blockid=0):
r"""Save the description of this linear combination.
:param descr: The description.
:param blockid: The ID of the data block to operate on.
"""
pathd = "/" + self._prefixb + str(blockid) + "/lincombwp"
# Save the description
for key, value in descr.items():
self._srf[pathd].attrs[key] = self._save_attr_value(value)
[docs]def save_lincombwp_coefficients(self, coefficients, timestep=None, blockid=0):
r"""Save the coefficients of the linear combination to a file.
:param coefficients: The coefficients of the linear combination of wavepackets.
:type coefficients: A single, suitable :py:class:`ndarray`.
:param timestep: The timestep at which we save the data.
:param blockid: The ID of the data block to operate on.
"""
pathtg = "/" + self._prefixb + str(blockid) + "/lincombwp/timegrid_coefficients"
pathlcs = "/" + self._prefixb + str(blockid) + "/lincombwp/lincomb_size"
pathd = "/" + self._prefixb + str(blockid) + "/lincombwp/coefficients"
timeslot = self._srf[pathtg].attrs["pointer"]
# Write the data
self.must_resize(pathlcs, timeslot)
J = np.size(coefficients)
self._srf[pathlcs][timeslot] = J
self.must_resize(pathd, timeslot)
if not J == 0:
self.must_resize(pathd, J - 1, axis=1)
self._srf[pathd][timeslot, :J] = np.squeeze(coefficients)
# Write the timestep to which the stored values belong into the timegrid
self.must_resize(pathtg, timeslot)
self._srf[pathtg][timeslot] = timestep
# Update the pointer
self._srf[pathtg].attrs["pointer"] += 1
[docs]def save_lincombwp_wavepackets(self, packetlist, timestep=None, blockid=0):
r"""Save the wavepackets being part of this linear combination.
.. warning:: This is quite an expensive operation.
:param timestep: Load only the data of this timestep.
:param blockid: The ID of the data block to operate on.
"""
pathtg = "/" + self._prefixb + str(blockid) + "/lincombwp/timegrid_packets"
pathd = "/" + self._prefixb + str(blockid) + "/lincombwp/packet_refs"
gid = self._srf[pathd].attrs["packet_gid"]
timeslot = self._srf[pathtg].attrs["pointer"]
# Book keeping
self.must_resize(pathd, timeslot)
K = len(packetlist)
if not K == 0:
self.must_resize(pathd, K - 1, axis=1)
# Save the packets
known_packets = self.get_block_ids(groupid=gid)
for k, packet in enumerate(packetlist):
bid = "LC" + str(blockid) + "WP" + str(packet.get_id())
if bid not in known_packets:
bid = self.create_block(blockid=bid, groupid=gid)
descr = packet.get_description()
self.add_genericwp(descr, blockid=bid)
self.save_genericwp(packet, timestep=timestep, blockid=bid)
# Book keeping
self._srf[pathd][timeslot, k] = packet.get_id()
# Write the timestep to which the stored packets belong into the timegrid
self.must_resize(pathtg, timeslot)
self._srf[pathtg][timeslot] = timestep
# Update the pointer
self._srf[pathtg].attrs["pointer"] += 1
[docs]def load_lincombwp_description(self, blockid=0):
r"""Load the description of this linear combination.
:param blockid: The ID of the data block to operate on.
"""
pathd = "/" + self._prefixb + str(blockid) + "/lincombwp"
# Load and return all descriptions available
descr = {}
for key, value in self._srf[pathd].attrs.items():
descr[key] = self._load_attr_value(value)
return descr
[docs]def load_lincombwp_timegrid(self, blockid=0, key=("coeffs", "packets")):
r"""Load the timegrid of this linear combination.
:param blockid: The ID of the data block to operate on.
:param key: Specify which linear combination timegrids to load. All are independent.
:type key: Tuple of valid identifier strings that are ``coeffs`` and ``packets``.
Default is ``("coeffs", "packets")``.
"""
tg = []
for item in key:
if item == "coeffs":
pathtg = "/" + self._prefixb + str(blockid) + "/lincombwp/timegrid_coefficients"
tg.append(self._srf[pathtg][:])
elif item == "packets":
pathtg = "/" + self._prefixb + str(blockid) + "/lincombwp/timegrid_packets"
tg.append(self._srf[pathtg][:])
if len(tg) == 1:
return tg[0]
else:
return tuple(tg)
[docs]def load_lincombwp_size(self, timestep=None, blockid=0):
r"""Load the size (number of packets) of this linear combination.
:param timestep: Load only the data of this timestep.
:param blockid: The ID of the data block to operate on.
"""
pathtg = "/" + self._prefixb + str(blockid) + "/lincombwp/timegrid_coefficients"
pathlcs = "/" + self._prefixb + str(blockid) + "/lincombwp/lincomb_size"
if timestep is not None:
index = self.find_timestep_index(pathtg, timestep)
return self._srf[pathlcs][index]
else:
index = slice(None)
return self._srf[pathlcs][index]
[docs]def load_lincombwp_coefficients(self, timestep=None, blockid=0):
r"""Load the coefficients of this linear combination.
:param timestep: Load only the data of this timestep.
:param blockid: The ID of the data block to operate on.
"""
pathtg = "/" + self._prefixb + str(blockid) + "/lincombwp/timegrid_coefficients"
pathlcs = "/" + self._prefixb + str(blockid) + "/lincombwp/lincomb_size"
pathd = "/" + self._prefixb + str(blockid) + "/lincombwp/coefficients"
if timestep is not None:
index = self.find_timestep_index(pathtg, timestep)
J = self._srf[pathlcs][index]
return self._srf[pathd][index, :J]
else:
index = slice(None)
return self._srf[pathd][index, :]
[docs]def load_lincombwp_wavepackets(self, timestep, packetindex=None, blockid=0):
r"""Load the wavepackets being part of this linear combination.
Note that this is quite an expensive operation.
:param timestep: Load only the data of this timestep.
:param packetindex: Load only the packet with this index. If ``None``
then load all packets for the given timestep.
:param blockid: The ID of the data block to operate on.
"""
pathtg = "/" + self._prefixb + str(blockid) + "/lincombwp/timegrid_packets"
pathlcs = "/" + self._prefixb + str(blockid) + "/lincombwp/lincomb_size"
pathd = "/" + self._prefixb + str(blockid) + "/lincombwp/packet_refs"
index = self.find_timestep_index(pathtg, timestep)
J = self._srf[pathlcs][index]
refs = self._srf[pathd][index, :J]
if packetindex is None:
packets = []
for ref in refs:
bid = "LC" + str(blockid) + "WP" + str(ref)
packets.append(self.load_genericwp(timestep=timestep, blockid=bid))
return tuple(packets)
else:
if packetindex >= J:
raise ValueError("Packet index is invalid.")
bid = "LC" + str(blockid) + "WP" + str(refs[packetindex])
return self.load_genericwp(timestep=timestep, blockid=bid)
[docs]def load_lincombwp_wavepacket_refs(self, timestep=None, blockid=0):
r"""Load the references of the wavepackets being part of
this linear combination. References can be used as ``blockid``
for loading selected wavepackets manually. If for example a
``ref`` obtained through this method is:
>>> refs = anIom.load_lincombwp_wavepacket_refs(timestep=4)
>>> refs
array(['673290fd36a0fa80f28973ae31f10378',
'075dc9d7d2c558c97608e2fe08a7d53d',
'0aed8bf3e21b5894bf89ef894d3f7d0c'],
dtype='|S32')
>>> ref = refs[0]
'673290fd36a0fa80f28973ae31f10378'
the the corresponding block ID is:
>>> bid = "LC" + str(blockid) + "WP" + ref
'LC0WP673290fd36a0fa80f28973ae31f10378'
with ``blockid`` the block ID where the linear combination
was stored. With that ``bid`` we can now for example load
data of a selected wavepacket:
>>> Pi = anIom.load_wavepacket_parameters(timestep=4, blockid=bid)
in case of a Hagedorn wavepacket.
:param timestep: Load only the data of this timestep.
:param blockid: The ID of the data block to operate on.
:return: A :py:class:`ndarray` of strings.
"""
pathtg = "/" + self._prefixb + str(blockid) + "/lincombwp/timegrid_packets"
pathd = "/" + self._prefixb + str(blockid) + "/lincombwp/packet_refs"
if timestep is not None:
index = self.find_timestep_index(pathtg, timestep)
else:
index = slice(None)
return self._srf[pathd][index, :]
#
# The following two methods are only for convenience and are NOT particularly efficient.
#
[docs]def load_lincombwp(self, timestep, blockid=0):
r"""Load a linear combination at a given timestep and return a fully configured
:py:class:`LinearCombinationOfWPs` instance. This method just calls some other
:py:class:`IOManager` methods in the correct order. It is included only for
convenience and is not particularly efficient.
:param timestep: The timestep :math:`n` we load the wavepacket.
:param blockid: The ID of the data block to operate on.
:return: A :py:class:`LinearCombinationOfWPs` instance.
"""
from WaveBlocksND.LinearCombinationOfWPs import LinearCombinationOfWPs
descr = self.load_lincombwp_description(blockid=blockid)
J = self.load_lincombwp_size(timestep=timestep, blockid=blockid)
if J == 0:
return None
# Load the data
c = self.load_lincombwp_coefficients(timestep=timestep, blockid=blockid)
psi = self.load_lincombwp_wavepackets(timestep=timestep, blockid=blockid)
# Assemble the linear combination
LC = LinearCombinationOfWPs(descr["dimension"], descr["ncomponents"])
LC.add_wavepackets(psi, c)
return LC
[docs]def save_lincombwp(self, lincomb, timestep, blockid=0):
r"""Save a linear combination of general wavepackets at a given timestep and read
all data to save from the :py:class:`LinearCombinationOfWPs` instance provided. This
method just calls some other :py:class:`IOManager` methods in the correct order.
It is included only for convenience and is not particularly efficient. We assume
the linear combination is already set up with the correct :py:meth:`add_lincombwp`
method call.
:param lincomb: The :py:class:`LinearCombinationOfWPs` instance we want to save.
:param timestep: The timestep :math:`n` at which we save the linear combination.
:param blockid: The ID of the data block to operate on.
"""
# Description
self.save_lincombwp_description(lincomb.get_description(), blockid=blockid)
# Wavepackets
self.save_lincombwp_wavepackets(lincomb.get_wavepackets(), timestep=timestep, blockid=blockid)
# Coefficients
self.save_lincombwp_coefficients(lincomb.get_coefficients(), timestep=timestep, blockid=blockid)