import numpy as np
import pandas as pd
from sklearn.metrics import auc, roc_curve
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
[docs]def prepare_data(ds, derived_features, cuts, select_data=True, label=None):
"""Add custom variables to the input data and optionally select it.
Parameters
----------
ds : pandas.DataFrame
Input data not yet selected.
derived_features: dict
Dictionary of more complex featuresread from the configuration file.
cuts: str
Fiducial cuts from protopipe.mva.utils.make_cut_list
select_data: bool
If True apply cuts to the final dataframe.
label: str
Name of the classifier target label if any.
Returns
-------
ds : pandas.DataFrame
Input data integrated with new variables and optionally selected for
the fiducial cuts.
"""
# This is always useful
ds["log10_true_energy"] = np.log10(ds["true_energy"])
if label is not None: # only for classification
ds["label"] = np.full(len(ds), label)
# This is needed because our reference analysis uses energy as
# feature for classification
# We should propably support a more elastic choice in the future.
if not all(
i in derived_features
for i in ["log10_reco_energy", "log10_reco_energy_tel"]
):
raise ValueError(
"log10_reco_energy and log10_reco_energy_tel need to be model features."
)
# Compute derived features and add them to the dataframe
for feature_name, feature_expression in derived_features.items():
ds.eval(f"{feature_name} = {feature_expression}", inplace=True)
if select_data:
ds = ds.query(cuts)
return ds
[docs]def make_cut_list(cuts):
cut_list = ""
for idx, cut in enumerate(cuts):
cut_list += cut
if idx != len(cuts) - 1:
cut_list += " and "
return cut_list
[docs]def split_train_test(survived_images, train_fraction, feature_name_list, target_name):
"""Split the data selected for cuts in train and test samples.
If the estimator is a classifier, data is split in a stratified fashion,
using this as the class labels.
Parameters
----------
survived_images: `~pandas.DataFrame`
Images that survived the selection cuts.
train_fraction: `float`
Fraction of data to be used for training.
feature_name_list: `list`
List of variables to use for training the model.
target_name: `str`
Variable against which to train.
Returns
-------
X_train: `~pandas.DataFrame`
Data frame
X_test: `~pandas.DataFrame`
Data frame
y_train: `~pandas.DataFrame`
Data frame
y_test: `~pandas.DataFrame`
Data frame
data_train: `~pandas.DataFrame`
Training data indexed by observation ID and event ID.
data_test: `~pandas.DataFrame`
Test data indexed by observation ID and event ID.
"""
# If the estimator is a classifier, data is split in a stratified fashion,
# using this as the class labels
labels = None
if target_name == "label":
labels = survived_images[target_name]
if train_fraction != 1.0:
data_train, data_test = train_test_split(
survived_images,
train_size=train_fraction,
random_state=0,
shuffle=True,
stratify=labels,
)
y_train = data_train[target_name]
X_train = data_train[feature_name_list]
y_test = data_test[target_name]
X_test = data_test[feature_name_list]
data_train = data_train.set_index(["obs_id", "event_id"])
data_test = data_test.set_index(["obs_id", "event_id"])
else:
# if the user wants to use the whole input dataset
# there is not 'test' data, though we shuffle anyway
data_train = survived_images
shuffle(data_train, random_state=0, n_samples=None)
y_train = data_train[target_name]
X_train = data_train[feature_name_list]
data_test = None
y_test = None
X_test = None
return X_train, X_test, y_train, y_test, data_train, data_test
[docs]def get_evt_subarray_model_output(
data,
weight_name=None,
keep_cols=["reco_energy"],
model_output_name="score_img",
model_output_name_evt="score",
):
"""
Returns DataStore with keepcols + score/target columns of model at the
level-subarray-event.
Parameters
----------
data: `~pandas.DataFrame`
Data frame
weight_name: `str`
Variable name in data frame to weight events with
keep_cols: `list`, optional
List of variables to keep in resulting data frame
model_output_name: `str`, optional
Name of model output (image level)
model_output_name_evt: `str`, optional
Name of averaged model output (event level)
Returns
-------
data: `~pandas.DataFrame`
Data frame
"""
keep_cols += [model_output_name]
keep_cols += [weight_name]
new_data = data[keep_cols].copy(deep=True)
new_data[model_output_name_evt] = np.zeros(len(new_data))
new_data.set_index(["tel_id"], append=True, inplace=True)
new_data[model_output_name_evt] = new_data.groupby(["obs_id", "event_id"]).apply(
lambda g: np.average(g[model_output_name], weights=g[weight_name])
)
# Remove columns
new_data = new_data.drop(columns=[model_output_name])
# Remove duplicates
new_data = new_data[~new_data.index.duplicated(keep="first")]
return new_data
[docs]def get_evt_model_output(
data_dict,
weight_name=None,
reco_energy_label="reco_energy",
model_output_name="score_img",
model_output_name_evt="score",
):
"""
Returns DataStore with reco energy + score/target columns of model at the level-event.
Parameters
----------
data: `~pandas.DataFrame`
Data frame with at least (obs_id, evt_id), image score/proba and label
weight_name: `str`
Variable name in data frame to weight events with
reco_energy_label: `list`, optional
List of variables to keep in resulting data frame
model_output_name: `str`, optional
Name of model output (image level)
model_output_name: `str`, optional
Name of averaged model output (event level)
Returns
-------
data: `~pandas.DataFrame`
Data frame
Warnings
--------
Need optimisation, too much loops.
"""
# Get list of observations
obs_ids = np.array([])
for cam_id in data_dict:
obs_ids = np.hstack((obs_ids, data_dict[cam_id].index.get_level_values(0)))
obs_ids = np.unique(obs_ids)
obs_id_list = []
evt_id_list = []
model_output_list = []
reco_energy_list = []
label_list = []
# Loop on observation Id
for obs_id in obs_ids:
# Get joint list of events
evt_ids = np.array([])
for cam_id in data_dict:
evt_ids = np.hstack(
(evt_ids, data_dict[cam_id].loc[(obs_id,)].index.get_level_values(0))
)
evt_ids = np.unique(evt_ids)
for evt_id in evt_ids:
output = np.array([])
weight = np.array([])
for cam_id in data_dict:
try: # Stack camera information
data = data_dict[cam_id].xs(obs_id).xs(evt_id)
output = np.hstack((output, data[model_output_name]))
weight = np.hstack((weight, data[weight_name]))
reco_energy = np.hstack((np.array([]), data[reco_energy_label]))[
0
] # single entry is enough
label = np.hstack((np.array([]), data["label"]))[
0
] # single entry is enough
except:
pass # No information for this type of camera
obs_id_list.append(obs_id)
evt_id_list.append(evt_id)
model_output_list.append(np.sum((output * weight)) / np.sum(weight))
reco_energy_list.append(reco_energy)
label_list.append(label)
data = {
"obs_id": obs_id_list,
"event_id": evt_id_list,
model_output_name_evt: model_output_list,
reco_energy_label: reco_energy_list,
"label": label_list,
}
return pd.DataFrame(data=data)
[docs]def plot_roc_curve(ax, model_output, y, **kwargs):
"""Plot ROC curve for a given set of model outputs and labels"""
fpr, tpr, _ = roc_curve(y_score=model_output, y_true=y)
roc_auc = auc(fpr, tpr)
label = "{} (area={:.2f})".format(kwargs.pop("label"), roc_auc) # Remove label
ax.plot(fpr, tpr, label=label, **kwargs)
return ax
[docs]def plot_hist(
ax, data, nbin, limit, norm=False, yerr=False, hist_kwargs={}, error_kw={}
):
"""Utility function to plot histogram"""
bin_edges = np.linspace(limit[0], limit[-1], nbin + 1, True)
y, tmp = np.histogram(data, bins=bin_edges)
weights = np.ones_like(y)
if norm is True:
weights = weights / float(np.sum(y))
if yerr is True:
yerr = np.sqrt(y) * weights
else:
yerr = np.zeros(len(y))
centers = 0.5 * (bin_edges[1:] + bin_edges[:-1])
width = bin_edges[1:] - bin_edges[:-1]
ax.bar(
centers, y * weights, width=width, yerr=yerr, error_kw=error_kw, **hist_kwargs
)
return ax
[docs]def plot_distributions(
feature_list, data_list, nbin=30, hist_kwargs_list={}, error_kw_list={}, ncols=2
):
"""Plot feature distributions for several data set. Returns list of axes."""
import matplotlib.pyplot as plt
n_feature = len(feature_list)
nrows = (
int(n_feature / ncols)
if n_feature % ncols == 0
else int((n_feature + 1) / ncols)
)
fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(5 * ncols, 5 * nrows))
if nrows == 1 and ncols == 1:
axes = [axes]
else:
axes = axes.flatten()
for i, colname in enumerate(feature_list):
ax = axes[i]
# Range for binning
range_min = min([data[colname].min() for data in data_list])
range_max = max([data[colname].max() for data in data_list])
myrange = [range_min, range_max]
for j, data in enumerate(data_list):
ax = plot_hist(
ax=ax,
data=data[colname],
nbin=nbin,
limit=myrange,
norm=True,
yerr=True,
hist_kwargs=hist_kwargs_list[j],
error_kw=error_kw_list[j],
)
ax.set_xlabel(colname)
ax.set_ylabel("Arbitrary units")
ax.legend(loc="upper left")
ax.grid()
plt.tight_layout()
return fig, axes
[docs]def plot_profile(ax, data, xcol, ycol, nbin, limit, hist_kwargs={}):
"""Plot profile of a distribution"""
bin_edges = np.linspace(limit[0], limit[-1], nbin + 1, True)
bin_center = 0.5 * (bin_edges[1:] + bin_edges[:-1])
bin_width = bin_edges[1:] - bin_edges[:-1]
y = []
yerr = []
for idx in range(len(bin_center)):
counts = data[
(data[xcol] > bin_edges[idx]) & (data[xcol] <= bin_edges[idx + 1])
][ycol]
y.append(counts.mean())
yerr.append(counts.std() / np.sqrt(len(counts)))
print(y)
print(yerr)
ax.errorbar(x=bin_center, y=y, xerr=bin_width / 2.0, yerr=yerr, **hist_kwargs)
return ax