########################################
# MIT License
#
# Copyright (c) 2020 Miguel Ramos Pernas
########################################
'''
Functions and classes to handle sets of data.
'''
from ..base import core
from ..base import data_types
from ..base import exceptions
from ..base import parameters
from ..backends.arrays import darray
from ..backends.core import parse_backend
import logging
import numpy as np
__all__ = ['DataObject', 'DataSet', 'BinnedDataSet']
# Names of different data types
BINNED = 'binned'
UNBINNED = 'unbinned'
logger = logging.getLogger(__name__)
[docs]class DataObject(object, metaclass=core.DocMeta):
_sample_type = None # : Sample type (*binned*/*unbinned*)
def __init__(self, pars, backend):
'''
Abstract class for data objects.
:param pars: data parameters.
:type pars: Registry(Parameter)
:param backend: backend where this object is built.
:type backend: Backend
'''
self.__data_pars = pars
self.__aop = backend.aop
super().__init__()
@property
def aop(self):
'''
Object to do operations on arrays.
:type: ArrayOperations
'''
return self.__aop
@property
def backend(self):
'''
Backend interface.
:type: Backend
'''
return self.__aop.backend
@property
def data_pars(self):
'''
Data parameters associated to this sample.
:type: Registry(Parameter)
'''
return self.__data_pars
@property
def ndim(self):
'''
Number of dimensions.
:type: int
'''
return data_types.cpu_int(len(self.__data_pars))
[docs] def to_backend(self, backend):
'''
Initialize this class in a different backend.
:param backend: new backend.
:type backend: Backend
:returns: This class in the new backend.
'''
raise exceptions.MethodNotDefinedError(self.__class__, 'to_backend')
[docs]class DataSet(DataObject):
_sample_type = UNBINNED # : *unbinned*
def __init__(self, data, pars, weights=None):
'''
Definition of an unbinned data set to evaluate PDFs.
:param data: data in the provided backend.
:type data: darray
:param pars: data parameters.
:type pars: Registry(Parameter)
:param weights: possible set of weights.
:type weights: darray or None
'''
super().__init__(pars, data.aop.backend)
self.__data = data
self.__weights = weights
def __getitem__(self, var):
'''
Get the array of data for the given parameter.
:param var: name of the data parameter.
:type var: str
:returns: Data array.
:rtype: numpy.ndarray
'''
return self.__data.take_column(self.data_pars.index(var))
def __len__(self):
'''
Get the size of the sample.
:returns: Size of the sample.
:rtype: int
'''
if self.__data is not None:
return len(self.__data)
else:
return data_types.cpu_int(0)
@property
def values(self):
'''
Values of the data set.
:type: darray
'''
return self.__data
@property
def weights(self):
'''
Weights of the sample.
:type: darray or None
'''
return self.__weights
[docs] @classmethod
def from_ndarray(cls, arr, arg, weights=None, backend=None):
'''
Build the class from a single array.
:param arr: array of data.
:type arr: numpy.ndarray
:param arg: if *arr* only contains one set of values, it must be
a single data parameter. Otherwise a collection of parameters.
:type arg: Registry(Parameter)
:param weights: possible weights to use.
:type weights: numpy.ndarray or None
:param backend: backend where the data set is built.
:type backend: Backend or None
'''
aop = parse_backend(backend)
data = darray.from_ndarray(arr, aop.backend)
if weights is not None:
weights = darray.from_ndarray(weights, aop.backend)
if data.ndim > 1:
return cls(data, parameters.Registry(arg), weights)
else:
return cls(data, parameters.Registry([arg]), weights)
[docs] @classmethod
def from_records(cls, arr, data_pars, weights=None, backend=None):
'''
Build the class from a :class:`numpy.ndarray` object.
:param arr: array of data.
:type arr: numpy.ndarray
:param data_pars: data parameters.
:type data_pars: Registry(Parameter)
:param weights: possible weights to use.
:type weights: numpy.ndarray or None
:param backend: backend where the data set is built.
:type backend: Backend or None
:raises RuntimeError: If a parameter is not found in the input array.
'''
aop = parse_backend(backend)
arrs = np.empty((len(arr), len(data_pars)))
for i, p in enumerate(data_pars):
if p.name not in arr.dtype.names:
raise RuntimeError(
f'No data for parameter "{p.name}" has been found in the input array')
arrs[:, i] = arr[p.name]
data = darray.from_ndarray(arrs, aop.backend)
if weights is not None:
weights = darray.from_ndarray(weights, aop.backend)
return cls(data, data_pars, weights)
[docs] def get(self, index):
'''
Get the values given an index.
:param index: index to process.
:type index: int
:returns: Values at the index.
:rtype: numpy.ndarray
'''
return self.__data.get(index)
[docs] def make_binned(self, bins=100):
'''
Make a binned version of this sample.
:param bins: number of bins per data parameter.
:type bins: int or tuple(int, ...)
:returns: Binned data sample.
:rtype: BinnedDataSet
'''
bins = data_types.array_int(bins)
if bins.ndim > 0:
edges = self.aop.concatenate(
tuple(self.aop.linspace(*p.bounds, b + 1) for p, b in zip(self.data_pars, bins)))
gaps = bins
else:
edges = self.aop.concatenate(
tuple(self.aop.linspace(*p.bounds, bins + 1) for p in self.data_pars))
gaps = data_types.full_int(len(self.data_pars), bins)
gaps = np.cumprod(gaps) // gaps[0]
indices = edges_indices(gaps, edges)
values = self.aop.sum_inside(
indices, gaps, self.__data, edges, self.__weights)
return BinnedDataSet(edges, gaps, self.data_pars, values)
[docs] @classmethod
def merge(cls, samples, maximum=None):
'''
Merge many samples into one. If *maximum* is specified, then the last elements will
be dropped.
:param samples: samples to merge.
:type samples: tuple(DataSet)
:param maximum: maximum number of entries for the final sample.
:type maximum: int or None
:returns: Merged sample.
:rtype: DataSet
:raise RuntimeError: If the samples have different parameters or if
only some of them have weights.
... warning:: If *maximum* is specified, the last elements corresponding to the
last samples might be dropped.
'''
ns = np.sum(data_types.fromiter_int(map(len, samples)))
if maximum is not None and maximum > ns:
logger.warning(
'Specified a maximum length that exceeds the sum of lengths of the samples to merge; set to the latter')
maximum = None
data_pars = samples[0].data_pars
aop = samples[0].aop
for s in samples[1:]:
if len(data_pars) != len(s.data_pars) or not all(p in s.data_pars for p in data_pars):
raise RuntimeError(
'Attempt to merge samples with different data parameters')
mw = (samples[0].weights is None)
if any(map(lambda s: (s.weights is None) != mw, samples[1:])):
raise RuntimeError(
'Attempt to merge samples with and without weights')
data = aop.concatenate(
tuple(s.values for s in filter(lambda s: s.values is not None, samples)))
if not mw:
weights = aop.concatenate(tuple(s.weights for s in filter(
lambda s: s.weights is not None, samples)))
else:
weights = None
if maximum is not None:
data = aop.restrict_data_size(maximum, data)
if weights is not None:
weights = weights[:maximum]
return cls(data, data_pars, weights)
[docs] def subset(self, arg, rescale_weights=False):
r'''
Get a subset of this data set. If *arg* is a string, it will be
considered as a range. In case it is a :class:`barray`, then it is
considered to be a mask array. If *rescale_weights* is set to True, then
the weights are rescaled so their statistical weight in minimization
processes is proportional to the event weights:
.. math::
\omega^\prime_i = \omega_i \times \frac{\sum_{j = 0}^n \omega_j}{\sum_{j = 0}^n \omega_j^2}
:param arg: argument to create the subset.
:type arg: str or barray
:param rescale_weights: whether to rescale the sample weights.
:type rescale_weights: bool
:returns: New data set.
:rtype: DataSet
'''
if np.asarray(arg).dtype.kind == np.dtype(str).kind:
use_range = True
else:
use_range = False
if use_range:
cond = self.aop.bzeros(len(self))
elif arg is None:
cond = self.aop.bones(len(self))
else:
cond = arg
if use_range:
bounds = parameters.bounds_for_range(self.data_pars, arg)
if len(bounds) == 1:
cond |= self.aop.is_inside(self.__data, *bounds[0])
else:
for lb, ub in bounds:
cond |= self.aop.is_inside(self.__data, lb, ub)
data = self.__data.slice(cond)
if self.__weights is not None:
weights = self.__weights.slice(cond)
else:
weights = self.__weights
if weights is not None and rescale_weights:
weights = rescale_weights_sw2(weights)
return self.__class__(data, self.data_pars, weights)
[docs] def to_backend(self, backend):
data = self.__data.to_backend(backend)
if self.__weights is not None:
weights = self.__weights.to_backend(backend)
else:
weights = None
return self.__class__(data, self.data_pars, weights)
[docs] def to_records(self):
'''
Convert this class into a :class:`numpy.ndarray` object.
:returns: This object as a a :class:`numpy.ndarray` object.
:rtype: numpy.ndarray
'''
out = np.empty(len(self), dtype=[(p.name, data_types.cpu_float)
for p in self.data_pars])
data = self.__data.as_ndarray()
for i, p in enumerate(self.data_pars):
out[p.name] = data[i::self.ndim]
return out
[docs]class BinnedDataSet(DataObject):
_sample_type = BINNED # : *binned*
def __init__(self, edges, gaps, pars, values):
'''
A binned data set.
:param edges: edges of the bins.
:type edges: darray
:param gaps: gaps between edges belonging to different parameters.
:type gaps: numpy.ndarray
:param pars: data parameters.
:type pars: Registry(Parameter)
:param values: values of the data for each center.
:type values: darray
'''
super().__init__(pars, edges.aop.backend)
self.__edges = edges
self.__gaps = data_types.array_int(gaps)
self.__values = values
# The gaps refer to the bins, not to the edges
g = self.edges_indices
bounds = data_types.empty_float(2 * len(pars))
bounds[0::2] = [self.__edges.get(i) for i in g[:-1]]
bounds[1::2] = [self.__edges.get(i - 1) for i in g[1:]]
self.__bounds = bounds
def __getitem__(self, var):
'''
Get the centers of the bins for the given parameter.
:param var: name of the data parameter.
:type var: str
:returns: Centers of the bins.
:rtype: darray
'''
i = self.data_pars.index(var)
e = self.edges_indices
return self.__edges.take_slice(e[i], e[i + 1])
def __len__(self):
'''
Get the size of the sample.
:returns: Size of the sample.
:rtype: int
'''
return len(self.__values)
@property
def bounds(self):
'''
Bounds of each data parameter.
:type: numpy.ndarray
'''
return self.__bounds
@property
def edges(self):
'''
Edges of the histogram.
:type: darray
'''
return self.__edges
@property
def edges_indices(self):
'''
Indices to access the edges.
:type: numpy.ndarray
'''
return edges_indices(self.__gaps, self.__edges)
@property
def gaps(self):
'''
Gaps among the different edges.
:type: numpy.ndarray
'''
return self.__gaps
@property
def values(self):
'''
Values of the data set.
:type: darray
'''
return self.__values
[docs] @classmethod
def from_ndarray(cls, edges, data_par, values, backend=None):
'''
Build the class from the array of edges and values.
:param edges: edges of the bins.
:type edges: numpy.ndarray
:param data_par: data parameter.
:type data_par: Parameter
:param values: values at each bin.
:type values: numpy.ndarray
:param backend: backend where the data set is built.
:type backend: Backend or None
:returns: Binned data set.
:rtype: BinnedDataSet
'''
aop = parse_backend(backend)
edges = darray.from_ndarray(edges, aop.backend)
values = darray.from_ndarray(values, aop.backend)
return cls(edges, [1], parameters.Registry([data_par]), values)
[docs] def to_backend(self, backend):
edges = self.__edges.to_backend(backend)
values = self.__values.to_backend(backend)
return self.__class__(edges, self.__gaps, self.__data_pars, values)
def adaptive_grid(aop, data_par, edges, nsteps):
'''
Create a data grid where *nsteps* values are generated for each bounds in
*edges*.
:param aop: instance to do array operations.
:type aop: ArrayOperations
:param data_par: data parameter.
:type data_par: Parameter
:param edges: edges defining the bins.
:type edges: darray
:param nsteps: number of steps to consider within each set of bounds.
:type nsteps: int
:returns: Grid adapted to the given steps.
:rtype: DataSet
'''
values = aop.concatenated_linspace(edges, nsteps)
return DataSet(values, [data_par])
def edges_indices(gaps, edges):
'''
Calculate the indices to access the first element and
that following to the last of a list of edges.
:param gaps: gaps used to address the correct edges from a common array.
:type gaps: numpy.ndarray
:param edges: common array of edges.
:type edges: numpy.ndarray
:returns: Array of indices.
:rtype: numpy.ndarray
'''
g = data_types.empty_int(len(gaps) + 1)
g[0], g[-1] = 0, len(edges)
g[1:-1] = np.cumsum(np.arange(1, len(gaps)) + gaps[1:] // gaps[:-1])
return g
def evaluation_grid(aop, data_pars, bounds, size):
'''
Create a grid of points to evaluate a :class:`PDF` object.
:param data_pars: data parameters.
:type data_pars: list(Parameter)
:param size: number of entries in the output sample per set of bounds. This
means that *size* entries will be generated for each pair of (min, max)
provided, that is, per data parameter.
:type size: int or tuple(int)
:param bounds: bounds of the different data parameters. Even indices for
the lower bounds, and odd indices for the upper bounds.
:type bounds: numpy.ndarray
:returns: Uniform sample.
:rtype: DataSet
'''
if bounds.shape == (2,):
data = aop.linspace(*bounds, size)
else:
if np.asarray(size).ndim == 0:
size = data_types.full_int(len(bounds[0]), size)
data = aop.meshgrid(*bounds, size)
return DataSet(data, data_pars)
def rescale_weights_sw2(weights):
r'''
Rescale the given weights so the statistical power is treated properly in
:math:`\log\mathcal{L}` FCNs. This is done using the following equation:
.. math:: \omega^\prime_i = \omega_i \times \frac{\sum_{j = 0}^n \omega_j}{\sum_{j = 0}^n \omega_j^2}
:param weights: weights to rescale.
:type weights: numpy.ndarray
:returns: rescaled weights.
:rtype: numpy.ndarray
'''
return weights * weights.sum() / (weights**2).sum()
def uniform_sample(aop, data_pars, bounds, size):
'''
Generate a sample following an uniform distribution in all data parameters.
:param data_pars: data parameters.
:type data_pars: Registry(Parameter)
:param size: number of entries in the output sample.
:type size: int
:param bounds: bounds where to create the sample.
:type bounds: tuple(float, float)
:returns: Uniform sample.
:rtype: DataSet
'''
if bounds.shape == (2,):
data = aop.random_uniform(*bounds, size)
else:
data = aop.random_grid(*bounds, size)
return DataSet(data, data_pars)