import argparse
import math
from typing import List, Union
import numpy as np
import astropy.units as u
import matplotlib.pyplot as plt
import os.path as path
from ctapipe.io import EventSource
from ctapipe.instrument import TelescopeDescription
from ctapipe.coordinates import TelescopeFrame
[docs]class bcolors:
"""Color definitions for standard and debug printing."""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
BOLDWARNING = "\033[1m\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
BOLDGREEN = "\033[1m\033[92m"
REVERSED = "\033[7m"
PURPLE = "\033[95m"
[docs]def save_fig(outdir, name, fig=None):
"""Save a figure in multiple formats."""
for ext in ["pdf", "png"]:
if fig:
fig.savefig(path.join(outdir, f"{name}.{ext}"))
else:
plt.savefig(path.join(outdir, f"{name}.{ext}"))
return None
[docs]class SignalHandler:
"""handles ctrl+c signals; set up via
`signal_handler = SignalHandler()
`signal.signal(signal.SIGINT, signal_handler)`
# or for two step interupt:
`signal.signal(signal.SIGINT, signal_handler.stop_drawing)`
"""
def __init__(self):
self.stop = False
self.draw = True
[docs] def __call__(self, signal, frame):
if self.stop:
print("you pressed Ctrl+C again -- exiting NOW")
exit(-1)
print("you pressed Ctrl+C!")
print("exiting after current event")
self.stop = True
[docs] def stop_drawing(self, signal, frame):
if self.stop:
print("you pressed Ctrl+C again -- exiting NOW")
exit(-1)
if self.draw:
print("you pressed Ctrl+C!")
print("turn off drawing")
self.draw = False
else:
print("you pressed Ctrl+C!")
print("exiting after current event")
self.stop = True
[docs]def str2bool(v):
if v.lower() in ("yes", "true", "t", "y", "1"):
return True
elif v.lower() in ("no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError("Boolean value expected.")
[docs]def make_argparser():
from os.path import expandvars
parser = argparse.ArgumentParser(description="")
# Add configuration file
parser.add_argument("--config_file", type=str, required=True)
parser.add_argument("-o", "--outfile", type=str, required=True)
parser.add_argument(
"-m",
"--max_events",
type=int,
default=None,
help="maximum number of events considered per file",
)
parser.add_argument(
"-i",
"--indir",
type=str,
required=True,
help="Input folder",
)
parser.add_argument(
"-f",
"--infile_list",
type=str,
required=True,
nargs="*",
help="give a specific list of files to run on",
)
parser.add_argument(
"--cam_ids",
type=str,
default=["LSTCam", "NectarCam"],
nargs="*",
help="give the specific list of camera types to run on",
)
parser.add_argument(
"--wave_dir",
type=str,
default=None,
help="directory where to find mr_filter. " "if not set look in $PATH",
)
parser.add_argument(
"--wave_temp_dir",
type=str,
default="/dev/shm/",
help="directory where mr_filter to store the temporary fits" " files",
)
mode_group = parser.add_mutually_exclusive_group()
mode_group.add_argument(
"--wave",
dest="mode",
action="store_const",
const="wave",
default="tail",
help="if set, use wavelet cleaning -- default",
)
mode_group.add_argument(
"--tail",
dest="mode",
action="store_const",
const="tail",
help="if set, use tail cleaning, otherwise wavelets",
)
return parser
[docs]def final_array_to_use(
original_array, subarray_selection: Union[str, List[int]], subarrays=None
):
"""Infer IDs of telescopes and cameras with equivalent focal lengths.
This is an helper function for utils.prod3b_array.
Parameters
----------
subarrays : `dict` [`str`, `list` [`int`]], optional
Dictionary of subarray names linked to lists of tel_ids
automatically extracted by telescope type.
If set, it will extract tel_ids from there, otherwise from the custom
list given by 'array'.
original_array : `ctapipe.instrument.SubarrayDescription`
Full simulated array from the first event.
subarray_selection : {`str`, `list` [`int`]}
Custom list of telescope IDs that the user wants to use or name of
specific subarray.
Returns
-------
tel_ids : `list` [`int`]
List of telescope IDs to use in a format readable by ctapipe.
cams_and_foclens : `dict` [`str`, `float`]
Dictionary containing the IDs of the involved cameras as inferred from
the involved telescopes IDs, together with the equivalent focal lengths
of the telescopes.
The camera IDs will feed both the estimators and the image cleaning.
The equivalent focal lengths will be used to calculate the radius of
the camera on which cut for truncated images.
subarray : ctapipe.instrument.SubarrayDescription
Complete subarray information of the final array/subarray selected.
"""
if subarrays:
tel_ids = subarrays[subarray_selection]
selected_subarray = original_array.select_subarray(
tel_ids, name="selected_subarray"
)
else:
selected_subarray = original_array.select_subarray(
subarray_selection, name="selected_subarray"
)
tel_ids = selected_subarray.tel_ids
tel_types = selected_subarray.telescope_types
cams_and_foclens = {
tel_types[i]
.camera.camera_name: tel_types[i]
.optics.equivalent_focal_length.value
for i in range(len(tel_types))
}
return set(tel_ids), cams_and_foclens, selected_subarray
[docs]def prod5N_array(file_name, site: str, subarray_selection: Union[str, List[int]]):
"""Return tel IDs and involved cameras from configuration and simtel file.
Alpha configurations refer to the subarrays selected in [1]_.
Parameters
----------
file_name : `str`
Name of the first file of the list of files given by the user.
subarray_selection : {`str`, `list` [`int`]}
Name or list if telescope IDs which identifies the subarray to extract.
site : `str`
Can be only "north" or "south".
Returns
-------
tel_ids : `list` [`int`]
List of telescope IDs to use in a format readable by ctapipe.
cameras : `list` [`str`]
List of camera types inferred from tel_ids.
This will feed both the estimators and the image cleaning.
References
----------
.. [1] Cherenkov Telescope Array Observatory, & Cherenkov Telescope Array
Consortium. (2021). CTAO Instrument Response Functions - prod5 version
v0.1 (v0.1) [Data set]. Zenodo. https://doi.org/10.5281/zenodo.5499840
"""
with EventSource(input_url=file_name, max_events=1) as source:
original_array = source.subarray
tot_num_tels = original_array.num_tels
# prod5N_alpha_north
# CTA North Advanced Threshold Layout D25
# Prod5 North
# 4 LSTs and 9 MSTs (NectarCam type)
# prod5N_alpha_south
# CTA South
# Prod5 layout S-M6C5-14MSTs37SSTs-MSTF
# 0 LSTs and 14 MSTs (FlashCam type), 37 SSTs
site_to_subarray_name = {
"north": ["prod5N_alpha_north"],
"south": ["prod5N_alpha_south", "prod5N_alpha_south_NectarCam"],
}
name_to_tel_ids = {
"full_array": original_array.tel_ids,
"prod5N_alpha_north": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 19, 35],
"prod5N_alpha_south": [
5,
6,
8,
10,
11,
12,
13,
14,
126,
16,
125,
20,
24,
26,
30,
33,
34,
35,
36,
37,
38,
39,
40,
41,
42,
43,
44,
45,
46,
47,
48,
49,
50,
51,
52,
53,
133,
59,
61,
66,
67,
68,
69,
70,
71,
72,
73,
143,
144,
145,
146,
],
"prod5N_alpha_south_NectarCam": [
30,
33,
34,
35,
36,
37,
38,
39,
40,
41,
42,
43,
44,
45,
46,
47,
48,
49,
50,
51,
52,
53,
59,
61,
66,
67,
68,
69,
70,
71,
72,
73,
100,
101,
103,
105,
106,
107,
108,
109,
111,
115,
119,
121,
128,
129,
133,
143,
144,
145,
146,
],
}
# Add subarrays by telescope type for the full original array
# and extract the same from the alpha congiguration subarrays
for tel_type in original_array.telescope_types:
name_to_tel_ids[f"full_array_{tel_type}"] = original_array.get_tel_ids_for_type(
tel_type
)
name_to_tel_ids[f"prod5N_alpha_north_subarray_{tel_type}"] = set(
name_to_tel_ids[f"full_array_{tel_type}"]
).intersection(name_to_tel_ids["prod5N_alpha_north"])
name_to_tel_ids[f"prod5N_alpha_south_subarray_{tel_type}"] = set(
name_to_tel_ids[f"full_array_{tel_type}"]
).intersection(name_to_tel_ids["prod5N_alpha_south"])
# Check if the user is using the correct file for the site declared in
# the configuration
if site.lower() == "north" and (tot_num_tels > 84):
raise ValueError("\033[91m Input simtel file and site are uncorrelated!\033[0m")
if site.lower() == "south" and (tot_num_tels < 180):
raise ValueError("\033[91m infile and site uncorrelated! \033[0m")
# Validate the subarray selection in case it is a string
if type(subarray_selection) is str:
if subarray_selection not in name_to_tel_ids:
raise ValueError(
f"""\033[91m {subarray_selection} is not a supported subarray_selection for this production.
Possible choices are:
- use a custom list of IDs,
- add a new subarray definition to protopipe.utils.prod5N_array.
"""
)
elif subarray_selection not in site_to_subarray_name[site]:
raise ValueError(
f"""\033[91m {subarray_selection} is not a supported subarray_selection for the {site} site.
"""
)
else:
print(
f"\033[94m Extracting telescope IDs for {subarray_selection}...\033[0m"
)
return final_array_to_use(
original_array, subarray_selection, name_to_tel_ids
)
else: # subarray_selection is a list of telescope IDs
print(
f"\033[94m Extracting telescope IDs list = {subarray_selection}...\033[0m"
)
return final_array_to_use(original_array, subarray_selection)
return final_array_to_use(original_array, subarray_selection)
[docs]def prod3b_array(file_name, site, array):
"""Return tel IDs and involved cameras from configuration and simtel file.
The initial check (and the too-high cyclomatic complexity) will disappear
with the advent of the final array layouts.
Parameters
----------
file_name : `str`
Name of the first file of the list of files given by the user.
array : {`str`, `list` [`int`]}
Name of the subarray or - if not supported - a custom list of telescope
IDs that the user wants to use
site : `str`
Can be only "north" or "south".
Currently relevant only for baseline simulations.
For non-baseline simulations only custom lists of IDs matter.
Returns
-------
tel_ids : `list` [`int`]
List of telescope IDs to use in a format readable by ctapipe.
cameras : `list` [`str`]
List of camera types inferred from tel_ids.
This will feed both the estimators and the image cleaning.
"""
with EventSource(input_url=file_name, max_events=1) as source:
original_array = source.subarray # get simulated array
# Dictionaries of subarray names for BASELINE simulations
subarrays_N = { # La Palma has only 2 cameras
"subarray_LSTs": original_array.get_tel_ids_for_type("LST_LST_LSTCam"),
"subarray_MSTs": original_array.get_tel_ids_for_type("MST_MST_NectarCam"),
"full_array": original_array.tel_ids,
}
subarrays_S = { # Paranal has only 3 cameras
"subarray_LSTs": original_array.get_tel_ids_for_type("LST_LST_LSTCam"),
"subarray_MSTs": original_array.get_tel_ids_for_type("MST_MST_FlashCam"),
"subarray_SSTs": original_array.get_tel_ids_for_type("SST_GCT_CHEC"),
"full_array": original_array.tel_ids,
}
if site.lower() == "north":
if original_array.num_tels > 19: # this means non-baseline simulation..
if (
original_array.num_tels > 125 # Paranal non-baseline
or original_array.num_tels == 99 # Paranal baseline
or original_array.num_tels == 98 # gamma_test_large
):
raise ValueError(
"\033[91m ERROR: infile and site uncorrelated! \033[0m"
)
if (type(array) is str) and (array != "full_array"):
raise ValueError(
"\033[91m ERROR: Only 'full_array' supported for this production.\n\
Please, use that or define a custom array with a list of tel_ids. \033[0m"
)
elif array == "full_array":
return final_array_to_use(original_array, array, subarrays_N)
elif (
type(array) == list
): # ..for which only custom lists are currently supported
return final_array_to_use(original_array, array)
else:
raise ValueError(
f"\033[91m ERROR: array {array} not supported. \033[0m"
)
else: # this is a baseline simulation
if type(array) == str:
if array not in subarrays_N.keys():
raise ValueError(
"\033[91m ERROR: requested missing camera from simtel file. \033[0m"
)
else:
return final_array_to_use(original_array, array, subarrays_N)
elif type(array) == list:
if any((tel_id < 1 or tel_id > 19) for tel_id in array):
raise ValueError(
"\033[91m ERROR: non-existent telescope ID. \033[0m"
)
return final_array_to_use(original_array, array)
else:
raise ValueError(
f"\033[91m ERROR: array {array} not supported. \033[0m"
)
elif site.lower() == "south":
if original_array.num_tels > 99: # this means non-baseline simulation..
if original_array.num_tels < 126:
raise ValueError(
"\033[91m ERROR: infile and site uncorrelated! \033[0m"
)
if type(array) == str and array != "full_array":
raise ValueError(
"\033[91m ERROR: Only 'full_array' supported for this production.\n\
Please, use that or define a custom array with a list of tel_ids. \033[0m"
)
if array == "full_array":
return final_array_to_use(original_array, array, subarrays_S)
elif (
type(array) == list
): # ..for which only custom lists are currently supported
return final_array_to_use(original_array, array)
else:
raise ValueError(
f"\033[91m ERROR: Array {array} not supported. \033[0m"
)
else: # this is a baseline simulation
if original_array.num_tels == 19:
raise ValueError(
"\033[91m ERROR: infile and site uncorrelated! \033[0m"
)
if type(array) == str:
if array not in subarrays_S.keys():
raise ValueError(
"\033[91m ERROR: requested missing camera from simtel file. \033[0m"
)
else:
if original_array.num_tels == 98: # this is gamma_test_large
subarrays_S[
"subarray_SSTs"
] = original_array.get_tel_ids_for_type(
"SST_ASTRI_ASTRICam" # in this file SSTs are ASTRI
)
return final_array_to_use(original_array, array, subarrays_S)
elif type(array) == list:
if any((tel_id < 1 or tel_id > 99) for tel_id in array):
raise ValueError(
"\033[91m ERROR: non-existent telescope ID. \033[0m"
)
return final_array_to_use(original_array, array)
else:
raise ValueError(
f"\033[91m ERROR: Array {array} not supported. \033[0m"
)
[docs]def effective_focal_lengths(camera_name):
"""Provide the effective focal length for the required camera.
Data comes from Konrad's table.
Parameters
----------
camera_name : str
Name of the camera from ctapipe.instrument.CameraDescription
Returns
-------
eff_foc_len : float
Effective focal length in meters
"""
effective_focal_lengths = {
"LSTCam": 29.30565 * u.m,
"NectarCam": 16.44505 * u.m,
"FlashCam": 16.44505 * u.m,
"ASTRICam": 2.15191 * u.m,
"SCTCam": 5.58310 * u.m,
"CHEC": 2.30913 * u.m,
"DigiCam": 5.69705 * u.m,
}
eff_foc_len = effective_focal_lengths[camera_name]
return eff_foc_len
[docs]def camera_radius(camid_to_efl, cam_id="all"):
"""Get camera radii.
Inspired from pywi-cta CTAMarsCriteria, CTA Mars like preselection cuts.
This should be replaced by a function in ctapipe getting the radius either
from the pixel poisitions or from an external database
Notes
-----
average_camera_radius_meters = math.tan(math.radians(average_camera_radius_degree)) * foclen
The average camera radius values are, in degrees :
- LST: 2.31
- Nectar: 4.05
- Flash: 3.95
- SST-1M: 4.56
- GCT-CHEC-S: 3.93
- ASTRI: 4.67
"""
average_camera_radii_deg = {
"ASTRICam": 4.67,
"CHEC": 3.93,
"DigiCam": 4.56,
"FlashCam": 3.95,
"NectarCam": 4.05,
"LSTCam": 2.31,
"SCTCam": 4.0, # dummy value
}
if cam_id in camid_to_efl.keys():
foclen_meters = camid_to_efl[cam_id]
average_camera_radius_meters = (
math.tan(math.radians(average_camera_radii_deg[cam_id])) * foclen_meters
)
elif cam_id == "all":
print("Available camera radii in meters:")
for cam_id in camid_to_efl.keys():
print(f"* {cam_id} : {camera_radius(camid_to_efl, cam_id)}")
average_camera_radius_meters = 0
else:
raise ValueError("Unknown camid", cam_id)
return average_camera_radius_meters
[docs]def get_cameras_radii(subarray, frame=TelescopeFrame(), ctamars=False):
"""Get the radius of a camera using ctapipe.
Parameters
----------
camera_name: str
Identifier for the camera.
frame: astropy.coordinates.baseframe.BaseCoordinateFrame
Coordinate frame in which to get the radius of the camera
ctamars: bool
If True return the hard-coded values from CTAMARS (default: False)
Returns
-------
camera_radii: dict
Dictionary with camera names as keys and their radius as value
"""
if ctamars:
average_camera_radii_deg = {
"ASTRICam": 4.67,
"CHEC": 3.93,
"DigiCam": 4.56,
"FlashCam": 3.95,
"NectarCam": 4.05,
"LSTCam": 2.31,
"SCTCam": 4.0,
}
return average_camera_radii_deg
camera_radii = {}
for tel in subarray.telescope_types:
cam_name = tel.camera.camera_name
geom = tel.camera.geometry
new_geom = geom.transform_to(frame)
camera_radii[cam_name] = new_geom.guess_radius()
return camera_radii