"""
This module contains functions to perform varius operation over data.
The contents of the module can be used separately,
but they are also used used by the benchmarking notebooks.
Notes
-----
This module has been created essentially to clean-up/refactor the
notebooks.
The implementation of functions and classes is far from perfect and
we should really try to synchronize in some way with ctaplot/ctabenchmarks.
"""
__all__ = [
"compute_weight_BTEL1010",
"add_BTEL1010_weigths_to_data",
"average_bias_of_charge_resolution",
"calculate_RMS_around_1",
"prepare_requirements",
"compute_resolution",
"compute_bias",
"get_evt_subarray_model_output",
"sum_of_squares",
"OnlineBinnedStats",
"create_lookup_function",
"compute_psf",
"load_tel_id",
]
from pathlib import Path
import astropy.units as u
import numpy as np
from astropy.table import Column, Table
from scipy.interpolate import RectBivariateSpline
from scipy.stats import binned_statistic
try:
from ctapipe.io import read_table
except ImportError:
from ctapipe.io.astropy_helpers import h5_table_to_astropy as read_table
[docs]def compute_weight_BTEL1010(true_energy, simtel_spectral_slope=-2.0):
"""Compute the weight from requirement B-TEL-1010-Intensity-Resolution.
Parameters
----------
true_energy: array_like
simtel_spectral_slope: float
Spectral slope from the simulation.
"""
target_slope = -2.62 # spectral slope from B-TEL-1010
spec_slope = simtel_spectral_slope
# each pixel of the same image (row of data table) needs the same weight
weight = np.power(true_energy / 200.0, target_slope - spec_slope)
return weight
[docs]def add_BTEL1010_weigths_to_data(data, subarray=None):
"""Update data table with weights from requirement B-TEL-1010-Intensity-Resolution."""
# and add B-TEL-1010 weights
for tel_type in sorted(
subarray.telescope_types, key=lambda t: -t.optics.equivalent_focal_length
):
true_energies = data[str(tel_type)]["true_energy"].to(u.GeV)
w = compute_weight_BTEL1010(true_energies)
n_pixels = tel_type.camera.geometry.n_pixels
weights = Column([np.repeat(w[i], n_pixels) for i in range(len(w))])
# each pixel gets its weight
data[str(tel_type)]["weights_B-TEL-1010"] = weights
return data
[docs]def average_bias_of_charge_resolution(
x_bin_edges, y_bin_edges, hist, min_phe=50, max_phe=500
):
"""Calculate the average bias of charge resolution.
Default limits are in true photoelectrons and chosen to be safely
away from saturation and NSB noise.
Parameters
----------
x_bin_edges : 1D array
Bin edges in true photoelectrons.
y_bin_edges : 1D array
Bin edges in reconstructed/true photoelectrons.
hist : 2D array
The full histogram of reconstructed/true against true photoelectrons.
Returns
-------
bias : float
Average bias of charge resolution from 50 to 500 true photoelectrons.
"""
min_edge_index = np.digitize(np.log10(min_phe), x_bin_edges) - 1
max_edge_index = np.digitize(np.log10(max_phe), x_bin_edges)
proj = np.zeros(600)
for i in range(min_edge_index, max_edge_index + 1):
proj = proj + hist[i]
y_bin_centers = 0.5 * (y_bin_edges[1:] + y_bin_edges[:-1])
bias = 1.0 / np.average(y_bin_centers, weights=proj)
return bias
[docs]def calculate_RMS_around_1(values, weights):
"""Root Mean Square around 1 as proposed from comparison with CTA-MARS.
The input values are vertical slices of the 2D histogram showing the bias-corrected charge resolution.
Parameters
----------
values : 1D array
Values in reconstructed / true photoelectrons corrected for average bias.
weights : 1D array
Counts in a cell from the weigthed histogram.
Returns
-------
rms : float
Root Mean Square of around 1 for a vertical slice.
"""
average = np.average(values, weights=weights)
variance = np.average((values - average) ** 2, weights=weights)
standard_deviation = np.sqrt(variance)
a = np.power(standard_deviation, 2)
b = np.power(average - 1, 2)
rms = np.sqrt(a + b)
return rms
[docs]def prepare_requirements(input_directory, site, obs_time):
"""Prepare requirements data as a dictionary.
Parameters
----------
input_directory : str or pathlib.Path
Directory where the requirements files are stored.
site: str
Site identifier as in the name of the requirement files
obs_time: int
Observation time as in the name of the requirement files
Returns
-------
requirements: dict
Extracted requirements organized by benchmark.
"""
requirements_input_filenames = {
"sens": f"/{site}-{obs_time}.dat",
"AngRes": f"/{site}-{obs_time}-AngRes.dat",
"ERes": f"/{site}-{obs_time}-ERes.dat",
}
requirements = {}
for key in requirements_input_filenames.keys():
requirements[key] = Table.read(
Path(input_directory) / requirements_input_filenames[key], format="ascii"
)
requirements["sens"].add_column(
Column(data=(10 ** requirements["sens"]["col1"]), name="ENERGY")
)
requirements["sens"].add_column(
Column(data=requirements["sens"]["col2"], name="SENSITIVITY")
)
return requirements
[docs]def compute_resolution(
x_bin_edges, reco, true, mask=None, statistic=lambda x: np.percentile(np.abs(x), 68)
):
"""Compute a resolution as a binned statistic."""
resolution = binned_statistic(
np.log10(true[mask]),
reco / true[mask] - 1,
statistic=statistic,
bins=x_bin_edges,
)
return resolution
[docs]def compute_bias(x_bin_edges, reco, true, mask=None, statistic="median"):
"""Compute bias as a binned statistic."""
bias = binned_statistic(
np.log10(true[mask]),
reco[mask] / true[mask] - 1,
statistic=statistic,
bins=x_bin_edges,
)
return bias
[docs]def get_evt_subarray_model_output(
data,
weight_name="reco_energy_tel_weigth",
keep_cols=["reco_energy"],
model_output_name="reco_energy_tel",
model_output_name_evt="reco_energy",
):
"""
Returns DataStore with keepcols + score/target columns of model at the
level-subarray-event.
Parameters
----------
data: `~pandas.DataFrame`
Data frame
weight_name: `str`
Variable name in data frame to weight events with
keep_cols: `list`, optional
List of variables to keep in resulting data frame
model_output_name: `str`, optional
Name of model output (image level)
model_output_name_evt: `str`, optional
Name of averaged model output (shower level)
Returns
-------
data: `~pandas.DataFrame`
Data frame
"""
keep_cols += [model_output_name]
keep_cols += [weight_name]
new_data = data[keep_cols].copy(deep=True)
new_data[model_output_name_evt] = np.zeros(len(new_data))
new_data.set_index(["tel_id"], append=True, inplace=True)
new_data[model_output_name_evt] = new_data.groupby(["obs_id", "event_id"]).apply(
lambda g: np.average(g[model_output_name], weights=g[weight_name])
)
# Remove columns
if (
model_output_name != "reco_energy_tel"
): # we want to keep the telescope-wise energy (might keep also gammaness in the future)
new_data = new_data.drop(columns=[model_output_name])
# Remove duplicates
new_data = new_data[~new_data.index.duplicated(keep="first")]
return new_data
[docs]def sum_of_squares(x):
x = np.asanyarray(x)
if len(x) == 0:
return 0
mean = x.mean()
return np.sum((x - mean) ** 2)
[docs]class OnlineBinnedStats:
"""Class to dynamically compute one-dimensional binned statistics.
Parameters
----------
bin_edges: array-like
Values which define the edges of the bins to use.
Notes
-----
This is an implementation of the Welford's online algorithm
(see [1]_ and references therein).
References
----------
.. [1] https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
"""
def __init__(self, bin_edges):
self.bin_edges = bin_edges
self.n_bins = len(bin_edges) - 1
self.n = np.zeros(self.n_bins)
self._mean = np.zeros(self.n_bins)
self._m2 = np.zeros(self.n_bins)
[docs] def update(self, x, values):
"""Update the mean and (estimated) variance of the sequence."""
n = binned_statistic(x, values, "count", self.bin_edges).statistic
mean = binned_statistic(x, values, "mean", bins=self.bin_edges).statistic
m2 = binned_statistic(x, values, sum_of_squares, bins=self.bin_edges).statistic
# empty bins are nan, but we need 0
empty = n == 0
mean[empty] = 0
m2[empty] = 0
n_total = self.n + n
delta = self._mean - mean
v = n_total > 0 # to avoid dividing by 0 and remove more NaNs
self._mean[v] = (self.n[v] * self._mean[v] + n[v] * mean[v]) / n_total[v]
self._m2[v] += m2[v] + delta[v] ** 2 * self.n[v] * n[v] / n_total[v]
self.n = n_total
@property
def mean(self):
"""Compute the mean for bins with at least 1 count."""
mean = np.full(self.n_bins, np.nan)
valid = self.n > 0
mean[valid] = self._mean[valid]
return mean
@property
def std(self):
"""Compute the standard deviation for bins with at least 1 count."""
std = np.full(self.n_bins, np.nan)
valid = self.n > 1
std[valid] = np.sqrt(self._m2[valid] / (self.n[valid] - 1))
return std
@property
def bin_centers(self):
"""Compute the center for each bin."""
return 0.5 * (self.bin_edges[:-1] + self.bin_edges[1:])
@property
def bin_width(self):
"""Compute the width of all bins."""
return np.diff(self.bin_edges)
[docs]def create_lookup_function(binned_stat):
"""
Returns a function f(x,y) that evaluates the lookup at a point.
"""
cx = 0.5 * (binned_stat.x_edge[0:-1] + binned_stat.x_edge[1:])
cy = 0.5 * (binned_stat.y_edge[0:-1] + binned_stat.y_edge[1:])
z = binned_stat.statistic
z[~np.isfinite(z)] = 0 # make sure there are no infs or nans
interpolator = RectBivariateSpline(x=cx, y=cy, z=z, kx=1, ky=1, s=0)
return lambda x, y: interpolator.ev(x, y) # go back to TeV and evaluate
[docs]def compute_psf(data, ebins, radius):
nbin = len(ebins) - 1
psf = np.zeros(nbin)
psf_err = np.zeros(nbin)
for idx in range(nbin):
emin = ebins[idx]
emax = ebins[idx + 1]
sel = data.loc[
(data["true_energy"] >= emin) & (data["true_energy"] < emax), ["xi"]
]
if len(sel) != 0:
psf[idx] = np.percentile(sel["xi"], radius)
psf_err[idx] = psf[idx] / np.sqrt(len(sel))
else:
psf[idx] = 0.0
psf_err[idx] = 0.0
return psf, psf_err
[docs]def load_tel_id(file_name=None, tel_id=None):
"""Load R0 and R1 waveforms for 1 telescope."""
if file_name is None:
raise ValueError("input information is undefined")
else:
r0_waveforms = read_table(file_name, f"/r0/event/telescope/tel_{tel_id:03d}")
r1_waveforms = read_table(file_name, f"/r1/event/telescope/tel_{tel_id:03d}")
return r0_waveforms, r1_waveforms