Source code for paramspace.paramspace

"""Implementation of the ParamSpace class"""

import collections
import copy
import logging
import warnings
from collections import OrderedDict
from functools import reduce
from typing import Dict, Generator, List, Sequence, Set, Tuple, Union

import numpy as np
import numpy.ma

from .paramdim import CoupledParamDim, Masked, ParamDim, ParamDimBase
from .tools import recursive_collect, recursive_replace, recursive_update

log = logging.getLogger(__name__)

# -----------------------------------------------------------------------------


[docs]class ParamSpace: """The ParamSpace class holds dict-like data in which some entries are ParamDim objects. These objects each define one parameter dimension. The ParamSpace class then allows to iterate over the space that is created by the parameter dimensions: at each point of the space (created by the cartesian product of all dimensions), one manifestation of the underlying dict-like data is returned. """ # Define the yaml tag to use yaml_tag = "!pspace" # .........................................................................
[docs] def __init__(self, d: dict): """Initialize a ParamSpace object from a given mapping or sequence. Args: d (Union[MutableMapping, MutableSequence]): The mapping or sequence that will form the parameter space. It is crucial that this object is mutable. """ # Warn if type is unusual if not isinstance(d, collections.abc.MutableMapping): warnings.warn( f"Got unusual type {type(d)} for ParamSpace initialisation! " f"If the given object is not mutable, this might fail at some " f"unexpected later point.", UserWarning, ) # Save a deep copy of the base dictionary. This dictionary will never # be changed. self._init_dict = copy.deepcopy(d) # Initialize a working copy. The parameter dimensions embedded in this # copy will change their values self._dict = copy.deepcopy(self._init_dict) # Initialize attributes that will be used to gather parameter # dimensions and coupled parameter dimensions, and call the function # that gathers these objects self._dims = None self._dims_by_loc = None self._cdims = None self._cdims_by_loc = None self._gather_paramdims() # NOTE attributes are set within this method # Initialize caching attributes self._smap = None self._iter = None
def _gather_paramdims(self):
    """Collect all (coupled) parameter dimensions from the working dict.

    The working dict is traversed twice, once for ``ParamDim`` and once for
    ``CoupledParamDim`` objects. Both result sets are sorted by
    ``(order, path)`` and stored by location (``_dims_by_loc``,
    ``_cdims_by_loc``) and by unique name (``_dims``, ``_cdims``). Finally,
    each coupled dimension gets its target resolved and the target is told
    about the coupling.

    Raises:
        ValueError: If the coupling target of a CoupledParamDim cannot be
            resolved.
    """

    def collect_sorted(dim_type) -> list:
        """Returns ``(path, dim)`` pairs of all ``dim_type`` instances."""
        found = recursive_collect(
            self._dict,
            select_func=lambda p: isinstance(p, dim_type),
            prepend_info=("info_func", "keys"),
            info_func=lambda p: p.order,
            stop_recursion_types=(ParamDimBase,),
        )
        # Entries are (order, path, dim). Sorting looks at the order first
        # and at the path second -- very important for a consistent
        # iteration order! The order is only needed for sorting, so it is
        # dropped afterwards.
        found.sort()
        return [entry[1:] for entry in found]

    log.debug("Gathering ParamDim objects ...")
    pdims = collect_sorted(ParamDim)

    # Store by location first, then by unique string name
    self._dims_by_loc = OrderedDict(pdims)
    self._dims = OrderedDict(self._unique_dim_names(pdims))
    log.debug("Found %d ParamDim objects.", self.num_dims)

    log.debug("Gathering CoupledParamDim objects ...")
    # Sorting is less crucial here: coupled dimensions do not influence the
    # iteration order through state space
    cpdims = collect_sorted(CoupledParamDim)
    self._cdims_by_loc = OrderedDict(cpdims)
    self._cdims = OrderedDict(self._unique_dim_names(cpdims))

    # Resolve the coupling targets and register each coupled dimension with
    # its target
    for cpdim_key, cpdim in self.coupled_dims.items():
        try:
            c_target = self._get_dim(cpdim.target_name)

        except (KeyError, ValueError) as err:
            _dim_info = self._parse_dims(mode="both")
            raise ValueError(
                f"Could not resolve the coupling target for "
                f"CoupledParamDim at {cpdim_key}. Check the "
                f"`target_name` specification of that entry "
                f"and the full traceback of this error.\n"
                f"Available parameter dimensions:\n{_dim_info}"
            ) from err

        cpdim.target_pdim = c_target

        # Avoid registering the same coupled dimension twice
        already_registered = cpdim in c_target.target_of
        if not already_registered:
            c_target.target_of.append(cpdim)

    log.debug("Found %d CoupledParamDim objects.", self.num_coupled_dims)
    log.debug("Finished gathering.")
[docs] @staticmethod def _unique_dim_names( kv_pairs: Sequence[Tuple], ) -> List[Tuple[str, ParamDim]]: """Given a sequence of key-value pairs, tries to create a unique string representation of the entries, such that it can be used as a unique mapping from names to parameter dimension objects. Args: kv_pairs (Sequence[Tuple]): Pairs of (path, ParamDim), where the path is a Tuple of strings. Returns: List[Tuple[str, ParamDim]]: The now unique (name, ParamDim) pairs Raises: ValueError: For invalid names, i.e.: failure to find a unique representation. """ def unique(names: List[str]) -> bool: """Check for uniqueness of the given list of names""" return len(set(names)) == len(names) def collisions(names: List[str]) -> Set[Tuple[int]]: """For each name, find the collisons with other names and return a set of indicies that collide with other names, such that those names can be adjusted. """ def collide(a: str, b: str) -> bool: """Returns True if two names collide, with collisions defined as the following: * The shorter one is part of the longer one, seen from the back, e.g. ``foo`` vs ``spamfoo`` * The sorter one is part of the longer one, seen from the front, e.g. ``spamfoo`` vs ``spam`` """ L = min(len(a), len(b)) return (a[-L:] == b[-L:]) or (a[:L] == b[:L]) # First, determine colliding names for each combination colls = [ [j for j, other in enumerate(names) if collide(name, other)] for name in names ] # Filter out those entries that are only including themselves and # create a set, containing the colliding indices return {tuple(c) for c in colls if len(c) > 1} def join_path(path: Sequence[Union[str, int]]) -> str: """Joins a path sequence to a string, handling integer entries""" return ".".join([str(seg) for seg in path]) def initial_name(path: Sequence[Union[str, int]]) -> str: """Given a path sequence, returns an initial name, i.e. a guess for a good unique name. For purely key-based paths, simply start with the last path segment. 
For paths that contain some index-based access, start with a longer sequence that includes the name of the parent key. Examples: * ``foo.bar.baz.spam`` becomes ``spam`` * ``foo.bar.0.baz.1.spam`` becomes ``bar.0.baz.1.spam`` Args: path (Sequence[Union[str, int]]): The path sequence Returns: str: The joined path sequence that serves as initial name """ key_is_str = [isinstance(seg, str) for seg in path] if all(key_is_str): return path[-1] first_non_str = key_is_str.index(False) return join_path(path[max(0, first_non_str - 1) :]) # Extract paths and pdims paths = [("",) + path for path, _ in kv_pairs] plens = [len(p) for p in paths] pdims = [pdim for _, pdim in kv_pairs] # First, check the custom names pdim_names = [pdim.name for pdim in pdims if pdim.name] if any([not isinstance(name, str) for name in pdim_names]): raise TypeError( f"Custom parameter dimension names need to be strings, but at " f"least one of the custom names was not a string: {pdim_names}" ) elif any(["." in name for name in pdim_names]): raise ValueError( f"Custom parameter dimension names cannot contain the " f"hierarchy-separating character '.'! Please remove it from " f"the names it appears in: {pdim_names}" ) elif len(set(pdim_names)) != len(pdim_names): raise ValueError( f"There were duplicates among the manually set names of " f"parameter dimensions!\nList of names: {pdim_names}" ) # Set the custom names; with the others, determine an initial name # depending on the contents of the path sequence. names = [ pdim.name if pdim.name else initial_name(paths[cidx]) for cidx, pdim in enumerate(pdims) ] # Set a list of locks, which specifies which names are fixed and should # not change throughout the rest of the process. These are initialized # with locks for the explicitly given names. locks = [bool(pdim.name) for pdim in pdims] # With the remaining names, use path segments to generate a name, # starting in the back and adding more entries, if there are # collisions. 
By requiring at least one iteration, some pathological # cases can be resolved. i = 0 while i == 0 or not unique(names): # Go over the collisions and resolve them for colls in collisions(names): for cidx in colls: # Ignore those that are locked if locks[cidx]: continue # else: may change this name # Get the path segement, starting from the back path_seg = paths[cidx][-(i + 1) :] # Make sure the while loop has a break condition if i > max(plens): raise ValueError( f"Could not automatically find a unique string " f"representation for path {paths[cidx]}! You " f"should set a custom name for the parameter " "dimension." ) # Check there is no '.' in the (relevant!) path segement elif any(["." in str(seg) for seg in path_seg]): raise ValueError( f"A path segement of {path_seg} contains the '.' " f"character which interferes with automatically " f"creating unambiguous parameter dimension names. " f"Please select a custom name for the object at " f"path {paths[cidx]}." ) # If the resulting name would be shorter than the existing # one, discard it. This is to ensure that initial names # that were longer due to an index access segment are not # overwritten by the above path segment selection new_name = join_path(path_seg) if len(new_name) < len(names[cidx]): continue # All checks passed names[cidx] = new_name # Done with this iteration. Check for uniqueness again ... i += 1 # Generate the list of (name, ParamDim) tuples return list(zip(names, pdims))
[docs] def _get_dim(self, name: Union[str, Tuple[str]]) -> ParamDimBase: """Get the ParamDim object with the given name or location. Note that coupled parameter dimensions cannot be accessed via this method. Args: name (Union[str, Tuple[str]]): If a string, will look it up by that name, which has to match completely. If it is a tuple of strings, the location is looked up instead. Returns: ParamDimBase: the parameter dimension object Raises: KeyError: If the ParamDim could not be found ValueError: If the parameter dimension name was ambiguous """ if isinstance(name, str): try: return self._dims[name] except KeyError as err: _dim_info = self._parse_dims(mode="both") raise ValueError( f"A parameter dimension with name '{name}' was not found " f"in this ParamSpace. Available parameter dimensions:\n" f"{_dim_info}" ) from err # else: is assumed to be a path segement, i.e. a sequence of strings # Need to check whether the given key sequence suggests an abs. path is_abs = len(name) > 1 and name[0] == "" # Now go over the dimensions and try to find matching path segements pdim = None for path, _pdim in self.dims_by_loc.items(): if (not is_abs and name == path[-len(name) :]) or ( is_abs and name[1:] == path ): # Found one. if pdim is None: # Save it and continue to check for ambiguity pdim = _pdim continue # else: already set -> there was already one matching this name _dim_info = self._parse_dims(mode="both") raise ValueError( f"Could not unambiguously find a parameter dimension " f"matching the path segment {name}! " f"Pass a longer path segement to select the right " f"parameter dimension. 
To symbolize that the key sequence " f"should be regarded as absolute, start with an empty " f"string entry in the key sequence.\nAvailable parameter " f"dimensions:\n{_dim_info}" ) # If still None after all this, no such name was found if pdim is None: _dim_info = self._parse_dims(mode="both") raise ValueError( f"A parameter dimension matching location {name} was not " f"found in this ParamSpace. Available parameter dimensions:\n" f"{_dim_info}" ) return pdim
# Properties ..............................................................
# Resolving a state . . . . . . . . . . . . . . . . . . . . . . . . . . . .

@property
def default(self) -> dict:
    """A deep copy of the working dict in which every parameter dimension
    is replaced by its default value. Masked default values are unwrapped.
    """

    def unmasked_default(pdim):
        value = pdim.default
        return value.value if isinstance(value, Masked) else value

    def is_pdim(obj) -> bool:
        return isinstance(obj, ParamDimBase)

    return recursive_replace(
        copy.deepcopy(self._dict),
        select_func=is_pdim,
        replace_func=unmasked_default,
        stop_recursion_types=(ParamSpace,),
    )


@property
def current_point(self) -> dict:
    """A deep copy of the working dict in which every parameter dimension
    is replaced by its current value, i.e. the value at the current point
    of the iteration. Unlike .default, Masked values are not unwrapped.
    """

    def is_pdim(obj) -> bool:
        return isinstance(obj, ParamDimBase)

    def current_value_of(pdim):
        return pdim.current_value

    return recursive_replace(
        copy.deepcopy(self._dict),
        select_func=is_pdim,
        replace_func=current_value_of,
        stop_recursion_types=(ParamSpace,),
    )


# Dimensions: by names or locations . . . . . . . . . . . . . . . . . . . .

@property
def dims(self) -> Dict[str, ParamDim]:
    """The ParamDim objects of this ParamSpace, keyed by their unique
    names, which were created during initialization.
    """
    return self._dims


@property
def dims_by_loc(self) -> Dict[Tuple[str], ParamDim]:
    """The ParamDim objects of this ParamSpace, keyed by the paths to the
    objects in the dictionary.
    """
    return self._dims_by_loc


@property
def coupled_dims(self) -> Dict[str, CoupledParamDim]:
    """The CoupledParamDim objects of this ParamSpace, keyed by their
    unique names, which were created during initialization.
    """
    return self._cdims


@property
def coupled_dims_by_loc(self) -> Dict[Tuple[str], CoupledParamDim]:
    """The CoupledParamDim objects of this ParamSpace, keyed by the paths
    to the objects in the dictionary.
    """
    return self._cdims_by_loc


# Coordinates . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .

@property
def coords(self) -> Dict[str, tuple]:
    """The coordinates of all parameter dimensions (coupled dimensions
    excluded), keyed by dimension name.

    The values are taken from ``pdim.coords`` and converted to lists to
    make interfacing with xarray.DataArray easier. They may include Masked
    objects.
    """
    result = {}
    for name, pdim in self.dims.items():
        result[name] = list(pdim.coords)
    return result


@property
def pure_coords(self) -> Dict[str, tuple]:
    """The pure coordinates of all parameter dimensions (coupled
    dimensions excluded), keyed by dimension name.

    The values are taken from ``pdim.pure_coords``, which -- unlike the
    .coords property -- are cleaned of any Masked values, and are
    converted to lists to make interfacing with xarray.DataArray easier.
    """
    result = {}
    for name, pdim in self.dims.items():
        result[name] = list(pdim.pure_coords)
    return result


# TODO coupled coordinates?

@property
def current_coords(self) -> OrderedDict:
    """The current coordinates of all parameter dimensions; delegates to
    the get_dim_values method, called without arguments.
    """
    return self.get_dim_values()


# Shape, volume, states . . . . . . . . . . . . . . . . . . . . . . . . . .

@property
def num_dims(self) -> int:
    """The number of parameter space dimensions. Coupled dimensions are
    not counted here!
    """
    return len(self.dims)


@property
def num_coupled_dims(self) -> int:
    """The number of coupled parameter space dimensions."""
    return len(self.coupled_dims)


@property
def volume(self) -> int:
    """The active volume of the parameter space, i.e. not counting coupled
    parameter dimensions or masked values. Is zero if there are no
    parameter dimensions.
    """
    if self.num_dims == 0:
        return 0

    vol = 1
    for pdim in self.dims.values():
        # A fully masked dimension uses its default value and thus
        # contributes a factor of 1
        if pdim.mask is True:
            continue
        vol *= len(pdim)
    return vol


@property
def full_volume(self) -> int:
    """The full volume, i.e. ignoring whether parameter dimensions are
    masked. Is zero if there are no parameter dimensions.
    """
    if self.num_dims == 0:
        return 0

    return reduce(
        lambda acc, num: acc * num,
        (pdim.num_values for pdim in self.dims.values()),
        1,
    )


@property
def shape(self) -> Tuple[int]:
    """The shape of the parameter space, not counting masked values of
    parameter dimensions. A fully masked dimension is still represented
    with length 1, representing the default value being used.

    Returns:
        Tuple[int]: The iterator shape
    """
    lengths = []
    for pdim in self.dims.values():
        lengths.append(1 if pdim.mask is True else len(pdim))
    return tuple(lengths)


@property
def full_shape(self) -> Tuple[int]:
    """The shape of the parameter space, ignoring masked values

    Returns:
        Tuple[int]: The shape of the fully unmasked iterator
    """
    return tuple([pdim.num_values for pdim in self.dims.values()])


@property
def states_shape(self) -> Tuple[int]:
    """The shape of the parameter space in terms of states, i.e. the
    ``num_states`` attribute of each parameter dimension.

    Returns:
        Tuple[int]: The shape tuple
    """
    return tuple([pdim.num_states for pdim in self.dims.values()])


@property
def max_state_no(self) -> int:
    """The highest possible state number; zero if there are no
    parameter dimensions.
    """
    if not self.states_shape:
        return 0
    return reduce(lambda x, y: x * y, self.states_shape) - 1


@property
def state_vector(self) -> Tuple[int]:
    """A tuple of the current states of all parameter dimensions"""
    return tuple([pdim.state for pdim in self.dims.values()])


@state_vector.setter
def state_vector(self, vec: Tuple[int]):
    """Sets the state of all parameter dimensions

    Raises:
        ValueError: If the vector length does not match the number of
            dimensions or a dimension rejects its new state.
    """
    if len(vec) != self.num_dims:
        raise ValueError(
            f"Given vector needs to be of same length as there are number "
            f"of dimensions ({self.num_dims}), was: {vec}"
        )

    for (name, pdim), new_state in zip(self.dims.items(), vec):
        try:
            pdim.state = new_state

        except ValueError as err:
            raise ValueError(
                f"Could not set the state of parameter dimension {name} "
                f"to {new_state}!"
            ) from err

    log.debug("Successfully set state vector to %s.", vec)


@property
def state_no(self) -> Union[int, None]:
    """The current state number, calculated from the current state
    vector.
    """
    return self._calc_state_no(self.state_vector)


@state_no.setter
def state_no(self, state_no: int):
    """Set the state number. This will first calculate the state vector
    from the number and then apply it.
    """
    self.state_vector = self.get_state_vector(state_no=state_no)


# Magic methods ...........................................................
def __eq__(self, other) -> bool:
    """Tests the equality of two ParamSpace objects.

    Anything that is not a ParamSpace compares unequal. Otherwise, all
    instance attributes of ``self`` are compared to those of ``other``,
    except for the caching attributes ``_smap`` and ``_iter``.
    """
    if not isinstance(other, ParamSpace):
        return False

    # NOTE It is ok to skip the caching attributes: equality of the other
    #      content asserts that the _smap attributes will be equal, too.
    skipped = ("_smap", "_iter")
    mine, theirs = self.__dict__, other.__dict__

    comparisons = []
    for attr_name in mine.keys():
        if attr_name in skipped:
            continue
        comparisons.append(mine[attr_name] == theirs[attr_name])

    return all(comparisons)
[docs] def __str__(self) -> str: """Returns a parsed, human-readable information string""" return ( f"<{self.__class__.__name__} object at {id(self)} with " f"volume {self.volume}, shape {self.shape}>" )
[docs] def __repr__(self) -> str: """Returns the raw string representation of the ParamSpace.""" # TODO should actually be a string from which to re-create the object return "<paramspace.paramdim.{} object at {} with {}>" "".format( self.__class__.__name__, id(self), repr( dict( volume=self.volume, shape=self.shape, dims=self.dims, coupled_dims=self.coupled_dims, ) ), )
# TODO implement __format__ # Information .............................................................
def get_info_dict(self) -> dict:
    """Returns a dict with information about this ParamSpace object.

    The returned dict contains similar information as
    :py:meth:`~paramspace.paramspace.ParamSpace.get_info_str`.
    Furthermore, it uses only native data types (scalars, sequences, and
    mappings) such that it is easily serializable and usable in scenarios
    where the paramspace package is not available.

    .. note::

        This information is not meant to fully recreate the ParamSpace
        object, but merely to provide essential metadata like the volume
        or shape of the parameter space and the coordinates of each of its
        dimensions.

    Raises:
        NotImplementedError: If any of the parameter dimensions is masked.
    """

    def prepare_pdim_info(
        pdim: Union[ParamDim, ParamDimBase],
        *,
        name: str,
        keyseq: Tuple[str],
    ) -> dict:
        """Helper function to gather relevant ParamDim information"""
        # NOTE This is a safety measure, as it is currently unclear how to
        #      clearly and robustly communicate a masked parameter space
        #      via this metadata method.
        if pdim.mask:
            raise NotImplementedError(
                "Retrieving information of ParamSpace objects with masked "
                "parameter dimensions is not yet possible."
            )

        info = {
            "name": name,
            "full_path": list(keyseq),
            "values": list(pdim.values),
        }

        # Coupled dimensions additionally carry their target's name, which
        # is either a plain string or a key sequence
        if isinstance(pdim, CoupledParamDim):
            target = pdim.target_name
            info["target_name"] = (
                target if isinstance(target, str) else list(target)
            )

        return info

    def describe_all(named_dims, locations) -> list:
        """Pairs up (name, dim) items with their locations"""
        return [
            prepare_pdim_info(dim, name=name, keyseq=keyseq)
            for (name, dim), keyseq in zip(named_dims.items(), locations)
        ]

    # ParamSpace information, followed by the information of the individual
    # ParamDim and CoupledParamDim objects
    return {
        "shape": self.shape,
        "volume": self.volume,
        "num_dims": self.num_dims,
        "num_coupled_dims": self.num_coupled_dims,
        "dims": describe_all(self.dims, self.dims_by_loc.keys()),
        "coupled_dims": describe_all(
            self.coupled_dims, self.coupled_dims_by_loc.keys()
        ),
    }
def get_info_str(self) -> str:
    """Returns a multi-line string that gives information about shape and
    size of this ParamSpace and lists its (coupled) parameter dimensions.

    For each coupled dimension, the name of the dimension it resolves to
    is appended if that name differs from the specified ``target_name``.
    If the coupling target is not among the dimensions of this ParamSpace,
    no resolved name is shown.
    """
    lines = ["ParamSpace Information"]
    lines += ["======================"]
    lines += [""]

    # General information about the Parameter Space
    lines += [f" Dimensions: {self.num_dims}"]
    lines += [f" Coupled: {self.num_coupled_dims}"]
    lines += [f" Shape: {self.shape}"]
    lines += [f" Volume: {self.volume}"]
    lines += [""]

    # ParamDim information
    lines += ["Parameter Dimensions"]
    lines += ["--------------------"]
    lines += [
        " (Dimensions further up in the list are iterated over less "
        "frequently)"
    ]
    lines += [""]

    for name, pdim in self.dims.items():
        lines += [f" - {name}"]
        lines += [f" {pdim.values}"]

        if pdim.mask is True:
            lines += [f" fully masked -> using default: {pdim.default}"]

        lines += [f" order: {pdim.order}"]
        lines += [""]

    # CoupledParamDim information
    if self.num_coupled_dims:
        lines += [""]
        lines += ["Coupled Parameter Dimensions"]
        lines += ["----------------------------"]
        lines += [" (Move alongside the state of the coupled ParamDim)"]
        lines += [""]

        for name, cpdim in self.coupled_dims.items():
            lines += [f" - {name}"]
            lines += [f" Coupled to: {cpdim.target_name}"]

            # Find the name of the coupling target object. This is looked
            # up anew for every coupled dimension and stays None if there
            # is no match, such that neither a stale name from a previous
            # coupled dimension nor an unbound variable can occur.
            resolved_target_name = next(
                (
                    pdim_name
                    for pdim_name, pdim in self.dims.items()
                    if pdim is cpdim.target_pdim
                ),
                None,
            )

            # Add resolved target name, if it is known and differs
            if (
                resolved_target_name is not None
                and resolved_target_name != cpdim.target_name
            ):
                lines[-1] += f" [resolves to: {resolved_target_name}]"

            lines += [f" Values: {cpdim.values}"]
            lines += [""]

    return "\n".join(lines)
[docs] def _parse_dims( self, *, mode: str = "names", join_str: str = " -> ", prefix: str = " * ", ) -> str: """Returns a multi-line string of dimension names or locations. This function is intended mostly for internal representation, thus defaulting to the longer join strings. """ if mode in ["names"]: lines = [n for n in self.dims.keys()] elif mode in ["locs"]: lines = [ join_str.join([str(s) for s in p]) for p in self.dims_by_loc.keys() ] elif mode in ["both"]: max_name_len = max(len(n) for n in self.dims) lines = [ "{name:>{w:d}} : {path:}".format( name=name, w=max_name_len, path=join_str.join([str(s) for s in path]), ) for name, path in zip( self.dims.keys(), self.dims_by_loc.keys() ) ] else: raise ValueError(f"Invalid mode: {mode}") # Create the multi-line string return "\n" + prefix + ("\n" + prefix).join(lines)
# YAML representation .....................................................
@classmethod
def to_yaml(cls, representer, node):
    """Represents a ParamSpace as a yaml mapping tagged with ``yaml_tag``.

    Only the ``_dict`` attribute needs to be saved; it can be plugged into
    a constructor without any issues. To make the string representation a
    bit simpler, OrderedDict entries are resolved to regular dicts. This
    conversion descends only into values that are OrderedDicts themselves.

    Args:
        representer (ruamel.yaml.representer): The representer module
        node (type(self)): The node, i.e. an instance of this class

    Returns:
        a yaml mapping that is able to recreate this object
    """

    def to_dict(od: OrderedDict) -> dict:
        """Recursively casts OrderedDict values to regular dicts"""
        return {
            key: (to_dict(val) if isinstance(val, OrderedDict) else val)
            for key, val in od.items()
        }

    # Work on a copy of the node's _dict such that the node stays untouched
    plain = to_dict(copy.deepcopy(node._dict))

    return representer.represent_mapping(cls.yaml_tag, plain)
@classmethod
def from_yaml(cls, loader, node):
    """The default constructor for a ParamSpace object; delegates to
    ``_pspace_constructor`` from the ``yaml_constructors`` module, passing
    this class as ``Cls``.
    """
    # Imported here rather than at module level
    from .yaml_constructors import _pspace_constructor as construct

    return construct(loader, node, Cls=cls)
# Dict access ............................................................. # This is a restricted interface for accessing dictionary items # It ensures that the ParamSpace remains in a valid state: items are only # returned by copy or, if popping them, it is ensured that the item was not # a parameter dimension.
def get(self, key, default=None):
    """Looks up ``key`` in the underlying dict and returns a deep _copy_
    of the item (or of ``default``, if there is no such key).
    """
    item = self._dict.get(key, default)
    return copy.deepcopy(item)
def pop(self, key, default=None):
    """Pops an item from the underlying dict, if it is not a ParamDim.

    Args:
        key: The key of the item to remove
        default (optional): Returned if there is no such key

    Returns:
        The removed item or, if the key was not available, ``default``

    Raises:
        KeyError: If the item is one of the (coupled) parameter dimensions
            of this ParamSpace
    """
    item = self._dict.get(key, None)

    # Compare by identity: the question is whether this very object is one
    # of the gathered dimensions. An equality-based membership test would
    # invoke ``item == pdim``, which fails for values with element-wise
    # comparison (e.g. numpy arrays).
    is_dim = any(item is pdim for pdim in self.dims.values())
    is_coupled_dim = any(item is cpdim for cpdim in self.coupled_dims.values())

    if is_dim or is_coupled_dim:
        raise KeyError(
            f"Cannot remove item with key '{key}' as it is part of a "
            f"parameter dimension."
        )

    return self._dict.pop(key, default)
# Iterator functionality ..................................................
[docs] def __iter__(self) -> dict: """Move to the next valid point in parameter space and return the corresponding dictionary. Returns: The current value of the iteration Raises: StopIteration: When the iteration has finished """ if self._iter is None: # Associate with the iterate function self._iter = self.iterator # Let generator yield and given the return value, check how to proceed return self._iter()
# NOTE the generator will also raise StopIteration once it ended
def iterator(
    self,
    *,
    with_info: Union[str, Tuple[str]] = None,
    omit_pt: bool = False,
) -> Generator[dict, None, None]:
    """Returns an iterator (more precisely: a generator) yielding all
    unmasked points of the parameter space.

    The iteration order follows the ``order`` attribute of the parameter
    dimensions: the dimensions are kept sorted by ascending ``order`` and
    the last ones are advanced most frequently.

    To control which information is returned at each point, the
    ``with_info`` and ``omit_pt`` arguments can be used. By default, the
    generator will return a single dictionary for each iteration point.

    Note that an iteration is also possible for zero-volume parameter
    spaces, i.e. where no parameter dimensions were defined; in that case
    a single point is yielded. After the last point, the ParamSpace is
    reset.

    Args:
        with_info (Union[str, Tuple[str]], optional): Can pass strings
            here that are to be returned as the second value. Possible
            values are: ``state_no``, ``state_vector``, ``state_no_str``,
            and ``current_coords``. To get multiple of them, add them to a
            tuple.
        omit_pt (bool, optional): If true, the current value is omitted
            and *only* the information tuple is returned.

    Returns:
        Generator[dict, None, None]: yields point after point of the
            ParamSpace and the corresponding information
    """
    # Make sure with_info is a tuple (or None)
    if isinstance(with_info, str):
        with_info = (with_info,)

    def current_rv():
        """Builds the return value for the current point"""
        point = None if omit_pt else self.current_point
        return self._gen_iter_rv(point, with_info=with_info)

    if self.volume > 0:
        log.debug(
            "Starting iteration over %d points in ParamSpace ...",
            self.volume,
        )
    else:
        log.debug(
            "Starting iteration over zero-volume ParamSpace, i.e.: "
            "will return only the current state of the dict."
        )

    # Prepare parameter dimensions: set them to state 0
    for pdim in self.dims.values():
        pdim.enter_iteration()

    # The first state, followed by all other states, while available
    yield current_rv()

    while self._next_state():
        yield current_rv()

    log.debug("Iteration finished.")
    self.reset()
    return
def reset(self) -> None:
    """Resets the parameter space by calling ``reset`` on each of its
    (non-coupled) parameter dimensions.
    """
    all_dims = self.dims.values()
    for dim in all_dims:
        dim.reset()

    log.debug("Reset ParamSpace and ParamDims.")
[docs] def _next_state(self) -> bool: """Iterates the state of the parameter dimensions managed by this ParamSpace. Important: this assumes that the parameter dimensions already have been prepared for an iteration and that self.state_no == 0. Returns: bool: Returns False when iteration finishes """ # Iterate at least one parameter dimensions' state. # Do this in reverse such that the last dimensions are iterated over # most frequently. for pdim in reversed(self.dims.values()): try: pdim.iterate_state() except StopIteration: # Went through all states of this dim -> go to next dimension # and start iterating that (similar to the carry bit in # addition) # Important: prepare pdim such that it is at state zero again pdim.enter_iteration() continue else: # Iterated to next step without reaching the last dim item break else: # Loop went through # -> All states visited. # Now need to reset and communicate that iteration is finished; # do so by returning false, which is more convenient than # raising StopIteration; the iteration is handled by the # iterate method anyway. self.reset() return False # If this point is reached: broke out of loop # -> The next state was reached and we are not at the end yet. # Communicate that by returning True. return True
[docs] def _gen_iter_rv(self, pt, *, with_info: Sequence[str]) -> tuple: """Is used during iteration to generate the iteration return value, adding additional information if specified. Note that pt can also be None if iterate is a dry_run """ if not with_info: return pt # Parse the tuple and add information info_tup = tuple() for info in with_info: if info == "state_no": info_tup += (self.state_no,) elif info in ("state_no_str", "padded_state_no"): info_tup += ( "{sno:0{digs:d}d}" "".format( sno=self.state_no, digs=len(str(self.max_state_no)) ), ) elif info in ("state_vector", "state_vec"): info_tup += (self.state_vector,) elif info in ("current_coords", "coords"): info_tup += (self.current_coords,) else: raise ValueError( f"No such information '{info}' available. Check the " f"`with_info` argument!" ) # Return depending on whether a point was given or not if pt is not None: # Concatenate and return return (pt,) + info_tup elif len(info_tup) == 1: # Return only the single info entry return info_tup[0] # else: return as tuple return info_tup
# Mapping .................................................................

@property
def state_map(self) -> "xr.DataArray":
    """The inverse mapping from state vectors to state numbers.

    This is an n-dimensional array in which the index along each dimension
    corresponds to the state of the respective parameter dimension and the
    array content is the corresponding state number. The result is cached
    and the cached data is made read-only.

    Returns:
        xr.DataArray: A mapping of indices and coordinates to the state
            number. Note that it is not ensured that the coordinates are
            unique, so it _might_ not be possible to use location-based
            indexing.
    """
    import xarray as xr

    if self._smap is None:
        # Not cached yet, need to compute it. Start from an array that is
        # filled with -1, denoting entries that were not set yet.
        numbers = np.full(self.states_shape, -1, dtype=int)

        # The iterator does not allow visiting default states. Thus, go
        # over the multi-indices of the array instead: each of them is
        # equivalent to a valid state vector.
        for midx in np.ndindex(numbers.shape):
            numbers[midx] = self._calc_state_no(midx)

        # Attach dimension names and coordinates
        labelled = xr.DataArray(
            numbers,
            dims=self.pure_coords.keys(),
            coords=self.pure_coords.values(),
        )

        log.debug(
            "Finished creating inverse mapping. Caching it and making "
            "the cache read-only ..."
        )
        self._smap = labelled
        self._smap.data.flags.writeable = False
        return self._smap

    log.debug("Returning previously cached inverse mapping ...")
    return self._smap

@property
def active_state_map(self) -> "xr.DataArray":
    """The part of the state map that belongs to active coordinates only,
    i.e. with all masked coordinates removed.

    This is computed anew on every access: the mask status of the ParamDim
    objects is not controlled by the ParamSpace and can change without
    notice.

    Also: the indices of the returned array no longer match the states of
    the dimensions! Values should only be accessed via the coordinates.

    Returns:
        xr.DataArray: A reduced state map which only includes active,
            i.e.: unmasked coordinates.
    """
    # Do not touch the cached state map itself
    amap = self.state_map.copy()

    # For each dimension, find the positions of the unmasked coordinates
    keep = {}
    for dim, coords in self.coords.items():
        keep[dim] = [
            pos
            for pos, coord in enumerate(coords)
            if not isinstance(coord, Masked)
        ]

    # Select by position
    return amap.isel(keep)
def get_state_vector(self, *, state_no: int) -> Tuple[int, ...]:
    """Returns the state vector that corresponds to a state number.

    The state number is looked up in the inverse mapping (``state_map``);
    the multi-index of the first match is the state vector.

    Args:
        state_no (int): The state number to look for in the inverse mapping

    Returns:
        Tuple[int, ...]: The state vector corresponding to the state
            number, with plain Python integers as entries.

    Raises:
        ValueError: If the state number is not part of the inverse mapping
    """
    # One row of indices per matching entry; no rows if there is no match
    matches = np.argwhere(self.state_map.data == state_no)

    try:
        vec = matches[0]

    except IndexError as err:
        # The initial value keeps the product defined for an empty shape,
        # such that this error path itself cannot fail.
        max_state_no = reduce(lambda x, y: x * y, self.states_shape, 1) - 1
        raise ValueError(
            f"Did not find state number {state_no} in inverse mapping! "
            f"Make sure it is an integer in the closed interval "
            f"[0, {max_state_no}]."
        ) from err

    # Convert entries to integers, as they might be np.int64 ...
    return tuple(int(idx) for idx in vec)
def get_dim_values(
    self, *, state_no: int = None, state_vector: Tuple[int] = None
) -> OrderedDict:
    """Look up the values of all parameter dimensions, either for the
    current state or for an explicitly specified one.

    Args:
        state_no (int, optional): A state number; it is translated into a
            state vector via ``get_state_vector``.
        state_vector (Tuple[int], optional): A state vector, holding one
            index into the coordinates of each parameter dimension.

    Returns:
        OrderedDict: Maps the dimension names to the current values (if
            neither argument was given) or to the coordinates selected by
            the state vector.

    Raises:
        TypeError: If both ``state_no`` and ``state_vector`` were given
    """
    have_no = state_no is not None
    have_vec = state_vector is not None

    # The two ways of specifying a state are mutually exclusive
    if have_no and have_vec:
        raise TypeError(
            "Expected only one of the arguments `state_no` "
            "and `state_vector`, got both!"
        )

    # No state specified at all -> use the current values
    if not have_no and not have_vec:
        return OrderedDict(
            (name, pdim.current_value) for name, pdim in self.dims.items()
        )

    # A state number first needs to be resolved to a state vector
    if have_no:
        state_vector = self.get_state_vector(state_no=state_no)

    # Pair each dimension with its entry of the state vector
    return OrderedDict(
        (name, pdim.coords[s])
        for (name, pdim), s in zip(self.dims.items(), state_vector)
    )
def _calc_state_no(self, state_vector: Tuple[int]) -> int:
    """Compute the state number that belongs to the given state vector.

    Each entry of the state vector is weighted with the product of the
    sizes of all dimensions that come after it; the weighted entries are
    then summed up. The sizes are taken from ``self.states_shape``.

    Args:
        state_vector (Tuple[int]): One state index per parameter dimension

    Returns:
        int: The corresponding state number
    """
    log.debug("Calculating state number from state vector ...")
    log.debug(" state vector: %s", state_vector)

    # The full shape of the space, which includes the default states
    states_shape = self.states_shape
    log.debug(
        " states shape: %s (volume: %s)",
        states_shape,
        reduce(lambda x, y: x * y, states_shape) if states_shape else 0,
    )

    # Determine the multiplier of each dimension: the number of states
    # spanned by all dimensions behind it. The last one thus gets a 1.
    # Example: sizes [5, 20, 10, 10] lead to multipliers [2000, 100, 10, 1]
    mults = []
    for pos in range(self.num_dims):
        mult = 1
        for size in states_shape[pos + 1 :]:
            mult = mult * size
        mults.append(mult)
    log.debug(" multipliers: %s", mults)

    # The state number is the sum of the weighted state vector entries
    state_no = 0
    for s, m in zip(state_vector, mults):
        state_no = state_no + s * m
    log.debug(" state no: %s", state_no)

    return state_no
# Masking .................................................................
def set_mask(
    self,
    name: Union[str, Tuple[str]],
    mask: Union[bool, Tuple[bool]],
    invert: bool = False,
) -> None:
    """Set the mask of a single parameter dimension.

    Args:
        name (Union[str, Tuple[str]]): Identifies the dimension, either as
            a tuple of strings or as a string. A string is converted to a
            tuple by splitting at the ``/`` character. The tuple is
            compared to the paths of the dimensions, starting from the
            back; it is thus not necessary to give the whole path, as long
            as the dimension can be resolved unambiguously. For names at
            the root level that could be ambiguous, use a leading ``/`` in
            the string form or an empty string in the tuple form to
            symbolise that the dimension is at root level. The custom name
            attribute of a ParamDim can be used as well.
        mask (Union[bool, Tuple[bool]]): The new mask values. Can also be
            a slice, the result of which defines the True values of the
            mask.
        invert (bool, optional): If set, the mask will be inverted _after_
            application.
    """
    # Find the dimension this is about
    target = self._get_dim(name)

    # Apply the mask as given ...
    target.mask = mask

    # ... and only then flip it, working on the mask the dimension reports
    if invert:
        target.mask = [not flag for flag in target.mask_tuple]

    log.debug("Set mask of parameter dimension %s to %s.", name, target.mask)
def set_masks(self, *mask_specs) -> None:
    """Apply several mask specifications, one after the other.

    The specifications are applied in the given order; later ones may
    refer to the same parameter dimension as earlier ones.

    Args:
        *mask_specs: Each entry is either a dict, which is unpacked into
            keyword arguments, or a tuple/list, which is unpacked into
            positional arguments of :py:meth:`.set_mask`.
    """
    log.debug("Setting %d masks ...", len(mask_specs))

    for spec in mask_specs:
        # Dicts carry keyword arguments, everything else positional ones
        args, kwargs = ((), spec) if isinstance(spec, dict) else (spec, {})
        self.set_mask(*args, **kwargs)
# TODO consider using the xarray interface here? i.e.: sel and isel
def activate_subspace(
    self,
    *,
    allow_default: bool = False,
    reset_all_others: bool = True,
    **selector,
) -> None:
    """Selects a subspace of the parameter space and makes only that part
    active for iteration.

    This is a wrapper around set_mask, implementing more arguments and
    also checking if any dimension is reduced to a default value, which
    might cause problems elsewhere.

    Args:
        allow_default (bool, optional): If False (the default), a
            ValueError is raised when the selector would mask any of the
            dimensions completely, as such a dimension shifts to its
            default state in iteration. Set this to True in order to
            permit that.
        reset_all_others (bool, optional): If True, resets all masks
            before activating the subspace. If False, the previously
            applied masks are untouched.
        **selector: A dict specifying the *active* states. A key of the
            key-value pairs should be the name of the dimension, the value
            should be a dict with one of the following keys:

                - idx: to select by index
                - loc: to select by coordinate values
                - ``**tol_kwargs``: passed on to ``np.isclose`` when
                  comparing coordinate values.

            Non-sequence values will be put into lists. Alternatively,
            slices can be specified, which are applied on the list of all
            available indices or coordinates, respectively.
            As a shorthand, not specifying a dict but directly a list or a
            slice defaults to ``loc``-behaviour.

    Raises:
        ValueError: If totally masking a parameter dimension while
            ``allow_default`` is not set; if both or none of ``idx`` and
            ``loc`` were given for a dimension; or if the given indices or
            labels contain duplicates.
        IndexError: If selecting index 0 (the default state) or an index
            beyond the number of values of the dimension.
        KeyError: If a label given via ``loc`` is not available as a
            coordinate of the dimension.
        TypeError: If a numeric label could not be compared to the
            coordinates.
    """

    def contains_close(a, seq, **tol_kwargs) -> bool:
        """Whether ``a`` is contained in ``seq`` when comparing a
        numeric-typed ``a`` via ``np.isclose`` rather than ``==``.
        For non-numeric types, the regular ``__contains__`` is used.

        NOTE: The decision is made via the type of ``a``
        """
        if not isinstance(a, (float, int)):
            return a in seq

        try:
            return any(np.isclose(a, v, **tol_kwargs) for v in seq)

        except TypeError as err:
            raise TypeError(
                f"Could not ascertain whether {a} is contained in "
                f"{seq}! This is probably due to values of "
                f"numeric type being mixed with non-numeric ones. "
                f"Check the definition of your parameter "
                f"dimensions."
            ) from err

    def calc_mask(name, *, idx=None, loc=None, **tol_kwargs) -> List[bool]:
        """Calculates the mask to use such that the given indices or
        locations are _un_masked.

        The ``tol_kwargs`` are passed on to ``np.isclose`` for cases where
        a coordinate is selected by ``loc``.

        TODO This should be outsourced!
        """
        if idx is not None and loc is not None:
            raise ValueError(
                "Only accepting _either_ of the arguments "
                "`idx` and `loc`, but got both!"
            )

        pdim = self._get_dim(name)

        # Distinguish idx and loc
        if idx is not None:
            if isinstance(idx, slice):
                # Apply it to the list of possible indices
                idcs = list(range(1, 1 + pdim.num_values))[idx]

            else:
                # Indices explicitly given; only need to validate them
                if not isinstance(idx, (list, tuple)):
                    idx = [idx]

                if 0 in idx:
                    raise IndexError(
                        "Encountered index 0 in list of "
                        "indices to be selected! This is an "
                        "invalid value when selecting a "
                        "subspace, as that index corresponds "
                        "to the default state of a parameter "
                        "dimension; indices for iteration "
                        "values start at 1!"
                    )

                # An empty selection has no maximum; it is not an error
                # here, but leads to a totally masked dimension, which is
                # handled by the caller depending on `allow_default`.
                elif idx and max(idx) > pdim.num_values:
                    raise IndexError(
                        f"Given indices {idx} contained a value that "
                        f"exceeds the highest index, {pdim.num_values}!"
                    )

                elif len(set(idx)) != len(idx):
                    raise ValueError(
                        f"Given indices {idx} contained at least "
                        f"one duplicate element!"
                    )

                # Everything ok.
                idcs = idx

        elif loc is not None:
            # Get the coordinates (without the default, thus +1s below)
            coords = pdim.pure_coords[1:]

            if isinstance(loc, slice):
                # From the slice, extract start and stop; an open end
                # means that there is no bound on that side.
                start = loc.start if loc.start is not None else -np.inf
                stop = loc.stop if loc.stop is not None else +np.inf

                # Keep those that are within the half-open interval
                idcs = [
                    (i + 1)
                    for i, val in enumerate(coords)
                    if start <= val < stop
                ]

                # If a step was given, apply it in a second step
                if loc.step is not None:
                    idcs = idcs[slice(None, None, loc.step)]

            else:
                # Got explicit coordinates; only need a few checks
                if not isinstance(loc, (list, tuple)):
                    loc = [loc]

                # The tolerance arguments need to be forwarded here;
                # otherwise they would silently have no effect at all.
                if any(
                    not contains_close(val, coords, **tol_kwargs)
                    for val in loc
                ):
                    raise KeyError(
                        f"At least one of the labels in {loc} is not "
                        f"available as coordinate of this parameter "
                        f"dimension, {coords}!"
                    )

                elif len(set(loc)) != len(loc):
                    raise ValueError(
                        f"Given labels {loc} contained at least "
                        f"one duplicate item!"
                    )

                # Everything ok. Get the indices. Iterate over coordinates
                # rather than loc in order to ascertain the correct order
                # and have the indices available. The checks above make
                # sure that this is no issue.
                idcs = [
                    (i + 1)
                    for i, val in enumerate(coords)
                    if contains_close(val, loc, **tol_kwargs)
                ]

        else:
            raise ValueError(
                "Missing one of the required keyword "
                "arguments `idx` or `loc`!"
            )

        # Given the indices, create the mask: True for everything that was
        # _not_ selected. Indices of iteration values start at 1.
        selected = set(idcs)
        return [i not in selected for i in range(1, 1 + pdim.num_values)]

    # Determine whether to reset all masks
    if reset_all_others:
        for dim_name in self.dims:
            self.set_mask(dim_name, False)

    # Calculate all the masks; a non-dict value is the `loc` shorthand
    masks = {
        k: calc_mask(k, **v) if isinstance(v, dict) else calc_mask(k, loc=v)
        for k, v in selector.items()
    }
    log.debug("Calculated masks: %s", masks)

    # Apply the masks, checking if it would result in defaulting
    for dim_name, mask in masks.items():
        if not allow_default and all(mask):
            raise ValueError(
                f"With the given selector, parameter "
                f"dimension '{dim_name}' would be totally masked, "
                f"thus resulting in shifting to its default "
                f"state in iteration. If you want to permit "
                f"this, set the allow_default argument.\n"
                f"Selector:\n{selector}"
            )

        # Everything ok, set the mask now.
        self.set_mask(dim_name, mask)

    log.debug(
        "Selected subspace. New volume: %d, shape: %s.",
        self.volume,
        self.shape,
    )