"""
This module contains classes and functions that enable writing AOT FITS files.
"""
import aotpy
from ._file import AOTFITSFile, AOTFITSInternalImage, AOTFITSExternalImage
from ._strings import TELESCOPE_TYPE_MAIN, TELESCOPE_TYPE_LLT, TELESCOPE_SEGMENT_TYPE_MONOLITHIC, \
TELESCOPE_SEGMENT_TYPE_HEXAGON, TELESCOPE_SEGMENT_TYPE_CIRCLE, SOURCE_TYPE_SCIENCE_STAR, \
SOURCE_TYPE_NATURAL_GUIDE_STAR, SOURCE_TYPE_SODIUM_LASER_GUIDE_STAR, SOURCE_TYPE_RAYLEIGH_LASER_GUIDE_STAR, \
WAVEFRONT_SENSOR_TYPE_SHACK_HARTMANN, WAVEFRONT_SENSOR_TYPE_PYRAMID, WAVEFRONT_CORRECTOR_TYPE_DM, \
WAVEFRONT_CORRECTOR_TYPE_TTM, WAVEFRONT_CORRECTOR_TYPE_LS, LOOPS_TYPE_CONTROL, LOOPS_TYPE_OFFLOAD, \
LOOPS_STATUS_CLOSED, LOOPS_STATUS_OPEN
from .compat import latest_version
from .utils import card_from_metadatum, FITSURLImage, FITSFileImage
from ..base import SystemWriter
# TODO: all checks need to happen here:
#   - check for columns of matching length
#   - check for image dimensions
class AOTFITSWriter(SystemWriter):
    """
    Writer that converts an `aotpy.AOSystem` into an AOT FITS file.

    The whole conversion happens on initialization: the objects reachable from `system` are checked and appended to
    the tables of an internal `AOTFITSFile`. The result can then be saved with `write` or retrieved with
    `get_hdulist`.

    Parameters
    ----------
    system
        System to be converted.

    Raises
    ------
    ValueError
        If one of the checks performed during conversion fails (e.g. `None` or repeated items in one of the system
        lists, different objects sharing the same UID, objects of unexpected types, or references to
        sources/sensors/correctors that are not on the corresponding system list).
    """

    def __init__(self, system: aotpy.AOSystem) -> None:
        self._file = AOTFITSFile()

        # Registries of the objects that were already converted, keyed by UID (or by name, for internal images).
        # They are used to detect repeated references and UID/name clashes between different objects.
        self._internalimages: dict[str, aotpy.Image] = {}
        self._times: dict[str, aotpy.Time] = {}
        self._aberrations: dict[str, aotpy.Aberration] = {}
        self._telescopes: dict[str, aotpy.Telescope] = {}
        self._sources: dict[str, aotpy.Source] = {}
        self._detectors: dict[str, aotpy.Detector] = {}
        self._wavefrontsensors: dict[str, aotpy.WavefrontSensor] = {}
        self._wavefrontcorrectors: dict[str, aotpy.WavefrontCorrector] = {}
        self._scoring_cameras: dict[str, aotpy.ScoringCamera] = {}
        self._loops: dict[str, aotpy.Loop] = {}
        self._atmosphericparameters: dict[str, aotpy.AtmosphericParameters] = {}
        self._sys: aotpy.AOSystem = system

        self._convert_header()
        for atm in self._sys.atmosphere_params:
            self._convert_atmospheric_parameters(atm)
        self._convert_telescope(self._sys.main_telescope, True)
        # The order below matters: sources, wavefront sensors and wavefront correctors referenced by other objects
        # must already have been registered from the system lists, otherwise the reference is rejected.
        for src in self._sys.sources:
            self._convert_source(src, True)
        for cam in self._sys.scoring_cameras:
            self._convert_scoring_camera(cam)
        for wfs in self._sys.wavefront_sensors:
            self._convert_wavefront_sensor(wfs, True)
        for cor in self._sys.wavefront_correctors:
            self._convert_wavefront_corrector(cor, True)
        for loop in self._sys.loops:
            self._convert_loop(loop)
        self._file.verify_all_table_contents()

    def write(self, filename, **kwargs) -> None:
        """
        Write the converted system to `filename`.

        Parameters
        ----------
        filename
            Destination of the file; passed as-is to `AOTFITSFile.to_file`.
        **kwargs
            Extra keyword arguments forwarded to `AOTFITSFile.to_file`.
        """
        self._file.to_file(filename, **kwargs)

    def get_hdulist(self):
        """
        Get the HDUList that produces the AOT FITS file for the initialized `system`.
        """
        return self._file.to_hdulist()

    def _handle_reference(self, name: str, d: dict, obj: aotpy.Referenceable) -> bool:
        """
        Register `obj` in the registry `d` under its UID, unless it is already there.

        Parameters
        ----------
        name
            Name of the kind of object being handled, only used in the error message.
        d
            Registry (UID -> object) of the already converted objects of this kind.
        obj
            Object being referenced.

        Returns
        -------
        bool
            `True` if this exact object was already registered, `False` if it was registered by this call.

        Raises
        ------
        ValueError
            If a different object with the same UID is already registered.
        """
        if obj.uid in d:
            if d[obj.uid] is obj:
                # Same object, already added
                return True
            raise ValueError(f"Different {name} objects share the same UID '{obj.uid}'.")
        d[obj.uid] = obj
        return False

    def _convert_header(self) -> None:
        """Fill the primary header of the file from the system-level fields and metadata."""
        hdr = self._file.primary_header
        hdr.version = latest_version
        hdr.ao_mode = self._sys.ao_mode
        hdr.date_beg = self._sys.date_beginning
        hdr.date_end = self._sys.date_end
        hdr.system_name = self._sys.name
        hdr.strehl_ratio = self._sys.strehl_ratio
        hdr.strehl_wavelength = self._sys.strehl_wavelength
        hdr.config = self._sys.config
        hdr.metadata = [card_from_metadatum(md) for md in self._sys.metadata]

    def _convert_time(self, time: aotpy.Time) -> str | None:
        """
        Add `time` to the time table (once) and return its UID, or `None` if `time` is `None`.

        Raises
        ------
        ValueError
            If both 'timestamps' and 'frame_numbers' are non-empty but have different lengths, or if a different
            Time object shares the same UID.
        """
        if time is None:
            return None
        if self._handle_reference('Time', self._times, time):
            return time.uid

        if time.timestamps and time.frame_numbers and len(time.timestamps) != len(time.frame_numbers):
            raise ValueError(f"Error in Time '{time.uid}': If both 'timestamps' and 'frame_numbers' are non-null, they "
                             f"must have the same length.")

        tbl = self._file.time_table
        tbl.uid.append(time.uid)
        tbl.timestamps.append(time.timestamps)
        tbl.frame_numbers.append(time.frame_numbers)
        return time.uid

    def _convert_atmospheric_parameters(self, atm: aotpy.AtmosphericParameters) -> None:
        """
        Add an item of 'AOSystem.atmosphere_params' to the atmospheric parameters table.

        Raises
        ------
        ValueError
            If `atm` is `None`, was already converted, or shares its UID with a different object.
        """
        if atm is None:
            raise ValueError("'AOSystem.atmosphere_params' list cannot contain 'None' items.")
        if self._handle_reference('AtmosphericParameters', self._atmosphericparameters, atm):
            raise ValueError("'AOSystem.atmosphere_params' list cannot contain repeated items.")

        tbl = self._file.atmospheric_parameters_table
        tbl.uid.append(atm.uid)
        tbl.wavelength.append(atm.wavelength)
        tbl.time_uid.append(self._convert_time(atm.time))
        tbl.r0.append(atm.r0)
        tbl.seeing.append(atm.seeing)
        tbl.tau0.append(atm.tau0)
        tbl.theta0.append(atm.theta0)
        tbl.layers_rel_weight.append(self._convert_image(atm.layers_relative_weight))
        tbl.layers_height.append(self._convert_image(atm.layers_height))
        tbl.layers_l0.append(self._convert_image(atm.layers_l0))
        tbl.layers_wind_speed.append(self._convert_image(atm.layers_wind_speed))
        tbl.layers_wind_direction.append(self._convert_image(atm.layers_wind_direction))
        tbl.transformation_matrix.append(self._convert_image(atm.transformation_matrix))

    def _convert_aberration(self, abr: aotpy.Aberration) -> str | None:
        """Add `abr` to the aberrations table (once) and return its UID, or `None` if `abr` is `None`."""
        if abr is None:
            return None
        if self._handle_reference('Aberration', self._aberrations, abr):
            return abr.uid

        tbl = self._file.aberrations_table
        tbl.uid.append(abr.uid)
        tbl.modes.append(self._convert_image(abr.modes))
        tbl.coefficients.append(self._convert_image(abr.coefficients))
        tbl.x_offsets.append([off.x for off in abr.offsets])
        tbl.y_offsets.append([off.y for off in abr.offsets])
        return abr.uid

    def _convert_telescope(self, tel: aotpy.Telescope, enforce: bool = False, llt: bool = False) -> str | None:
        """
        Add `tel` to the telescopes table (once) and return its UID.

        Parameters
        ----------
        tel
            Telescope being converted or referenced.
        enforce
            `True` when `tel` is 'AOSystem.main_telescope': it must not be `None` and must be a `MainTelescope`.
        llt
            `True` when `tel` is referenced as a laser launch telescope: it must not be a `MainTelescope`.

        Returns
        -------
        str or None
            UID of the telescope, or `None` if `tel` is `None` and `enforce` is `False`.

        Raises
        ------
        ValueError
            If any of the conditions above is violated, if a referenced `MainTelescope` is not
            'AOSystem.main_telescope', if the telescope or its segments are of unexpected type, if the segments are
            `None`, or if a different Telescope object shares the same UID.
        """
        if tel is None:
            if enforce:
                raise ValueError("'AOSystem.main_telescope' must not be 'None'.")
            return None
        if self._handle_reference('Telescope', self._telescopes, tel):
            return tel.uid

        if isinstance(tel, aotpy.MainTelescope):
            tel_type = TELESCOPE_TYPE_MAIN
            if llt:
                raise ValueError("Referenced laser launch telescope must be of type 'aotpy.LaserLaunchTelescope'.")
            elif tel is not self._sys.main_telescope:
                raise ValueError("Telescope references cannot reference a 'MainTelescope' object that"
                                 " is not AOSystem.main_telescope.")
        elif isinstance(tel, aotpy.LaserLaunchTelescope):
            tel_type = TELESCOPE_TYPE_LLT
            if enforce:
                raise ValueError("'AOSystem.main_telescope' must be of type 'aotpy.MainTelescope'.")
        else:
            raise ValueError(f"Unexpected type '{type(tel)}' for Telescope object.")

        if tel.segments is None:
            raise ValueError("'aotpy.Telescope.segments' cannot be 'None'.")
        if isinstance(tel.segments, aotpy.Monolithic):
            segments_type = TELESCOPE_SEGMENT_TYPE_MONOLITHIC
        elif isinstance(tel.segments, aotpy.HexagonalSegments):
            segments_type = TELESCOPE_SEGMENT_TYPE_HEXAGON
        elif isinstance(tel.segments, aotpy.CircularSegments):
            segments_type = TELESCOPE_SEGMENT_TYPE_CIRCLE
        else:
            raise ValueError(f"Unexpected type '{type(tel.segments)}' for Segments object.")

        tbl = self._file.telescopes_table
        tbl.uid.append(tel.uid)
        tbl.type.append(tel_type)
        tbl.latitude.append(tel.latitude)
        tbl.longitude.append(tel.longitude)
        tbl.elevation.append(tel.elevation)
        tbl.azimuth.append(tel.azimuth)
        tbl.parallactic.append(tel.parallactic)
        tbl.pupil_mask.append(self._convert_image(tel.pupil_mask))
        tbl.pupil_angle.append(tel.pupil_angle)
        tbl.enclosing_d.append(tel.enclosing_diameter)
        tbl.inscribed_d.append(tel.inscribed_diameter)
        tbl.obstruction_d.append(tel.obstruction_diameter)
        tbl.segment_type.append(segments_type)
        tbl.segment_size.append(tel.segments.size)
        tbl.segments_x.append([coord.x for coord in tel.segments.coordinates])
        tbl.segments_y.append([coord.y for coord in tel.segments.coordinates])
        tbl.transformation_matrix.append(self._convert_image(tel.transformation_matrix))
        return tel.uid

    def _convert_source(self, src: aotpy.Source, enforce: bool = False) -> str | None:
        """
        Add `src` to the sources tables, or resolve a reference to an already converted source.

        Parameters
        ----------
        src
            Source being converted or referenced.
        enforce
            `True` when `src` comes from the 'AOSystem.sources' list (it is then converted and must be neither
            `None` nor repeated). `False` when `src` is a reference from another object (it must then have been
            converted already).

        Returns
        -------
        str or None
            UID of the source, or `None` if `src` is `None` and `enforce` is `False`.

        Raises
        ------
        ValueError
            If any of the conditions above is violated, if the source is of unexpected type, or if a different
            Source object shares the same UID.
        """
        if src is None:
            if enforce:
                raise ValueError("'AOSystem.sources' list cannot contain 'None' items.")
            return None
        if self._handle_reference('Source', self._sources, src):
            if enforce:
                raise ValueError("'AOSystem.sources' list cannot contain repeated items.")
            return src.uid
        elif not enforce:
            raise ValueError(f"Source '{src.uid}' was referenced but is not on the 'AOSystem.sources' list.")

        if isinstance(src, aotpy.ScienceStar):
            src_type = SOURCE_TYPE_SCIENCE_STAR
        elif isinstance(src, aotpy.NaturalGuideStar):
            src_type = SOURCE_TYPE_NATURAL_GUIDE_STAR
        elif isinstance(src, aotpy.SodiumLaserGuideStar):
            src_type = SOURCE_TYPE_SODIUM_LASER_GUIDE_STAR
            # Type-specific fields go into a secondary table
            sec_tbl = self._file.sources_sodium_lgs_table
            sec_tbl.uid.append(src.uid)
            sec_tbl.height.append(src.height)
            sec_tbl.profile.append(self._convert_image(src.profile))
            sec_tbl.altitudes.append(src.altitudes)
            sec_tbl.llt_uid.append(self._convert_telescope(src.laser_launch_telescope, llt=True))
        elif isinstance(src, aotpy.RayleighLaserGuideStar):
            src_type = SOURCE_TYPE_RAYLEIGH_LASER_GUIDE_STAR
            # Type-specific fields go into a secondary table
            sec_tbl = self._file.sources_rayleigh_lgs_table
            sec_tbl.uid.append(src.uid)
            sec_tbl.distance.append(src.distance)
            sec_tbl.depth.append(src.depth)
            sec_tbl.llt_uid.append(self._convert_telescope(src.laser_launch_telescope, llt=True))
        else:
            raise ValueError(f"Unexpected type '{type(src)}' for Source object.")

        tbl = self._file.sources_table
        tbl.uid.append(src.uid)
        tbl.type.append(src_type)
        tbl.right_ascension.append(src.right_ascension)
        tbl.declination.append(src.declination)
        tbl.elevation_offset.append(src.elevation_offset)
        tbl.azimuth_offset.append(src.azimuth_offset)
        tbl.fwhm.append(src.fwhm)
        return src.uid

    def _convert_detector(self, det: aotpy.Detector) -> str | None:
        """Add `det` to the detectors table (once) and return its UID, or `None` if `det` is `None`."""
        if det is None:
            return None
        if self._handle_reference('Detector', self._detectors, det):
            return det.uid

        tbl = self._file.detectors_table
        tbl.uid.append(det.uid)
        tbl.type.append(det.type)
        tbl.sampling_technique.append(det.sampling_technique)
        tbl.shutter_type.append(det.shutter_type)
        tbl.flat_field.append(self._convert_image(det.flat_field))
        tbl.readout_noise.append(det.readout_noise)
        tbl.pixel_intensities.append(self._convert_image(det.pixel_intensities))
        tbl.field_centre_x.append(det.field_centre.x)
        tbl.field_centre_y.append(det.field_centre.y)
        tbl.integration_time.append(det.integration_time)
        tbl.coadds.append(det.coadds)
        tbl.dark.append(self._convert_image(det.dark))
        tbl.weight_map.append(self._convert_image(det.weight_map))
        tbl.quantum_efficiency.append(det.quantum_efficiency)
        tbl.pixel_scale.append(det.pixel_scale)
        tbl.binning.append(det.binning)
        tbl.bandwidth.append(det.bandwidth)
        tbl.transmission_wavelength.append(det.transmission_wavelength)
        tbl.transmission.append(det.transmission)
        tbl.sky_background.append(self._convert_image(det.sky_background))
        tbl.gain.append(det.gain)
        tbl.excess_noise.append(det.excess_noise)
        tbl.filter.append(det.filter)
        tbl.bad_pixel_map.append(self._convert_image(det.bad_pixel_map))
        tbl.dynamic_range.append(det.dynamic_range)
        tbl.readout_rate.append(det.readout_rate)
        tbl.frame_rate.append(det.frame_rate)
        tbl.transformation_matrix.append(self._convert_image(det.transformation_matrix))
        return det.uid

    def _convert_scoring_camera(self, cam: aotpy.ScoringCamera) -> None:
        """
        Add an item of 'AOSystem.scoring_cameras' to the scoring cameras table.

        Raises
        ------
        ValueError
            If `cam` is `None`, was already converted, or shares its UID with a different object.
        """
        if cam is None:
            raise ValueError("'AOSystem.scoring_cameras' list cannot contain 'None' items.")
        if self._handle_reference('ScoringCamera', self._scoring_cameras, cam):
            raise ValueError("'AOSystem.scoring_cameras' list cannot contain repeated items.")

        tbl = self._file.scoring_cameras_table
        tbl.uid.append(cam.uid)
        tbl.pupil_mask.append(self._convert_image(cam.pupil_mask))
        tbl.wavelength.append(cam.wavelength)
        tbl.transformation_matrix.append(self._convert_image(cam.transformation_matrix))
        tbl.detector_uid.append(self._convert_detector(cam.detector))
        tbl.aberration_uid.append(self._convert_aberration(cam.aberration))

    def _convert_wavefront_sensor(self, wfs: aotpy.WavefrontSensor, enforce: bool = False) -> str | None:
        """
        Add `wfs` to the wavefront sensors tables, or resolve a reference to an already converted sensor.

        Parameters
        ----------
        wfs
            Wavefront sensor being converted or referenced.
        enforce
            `True` when `wfs` comes from the 'AOSystem.wavefront_sensors' list (it is then converted and must be
            neither `None` nor repeated). `False` when `wfs` is a reference from another object (it must then have
            been converted already).

        Returns
        -------
        str or None
            UID of the sensor, or `None` if `wfs` is `None` and `enforce` is `False`.

        Raises
        ------
        ValueError
            If any of the conditions above is violated, if the sensor is of unexpected type, or if a different
            WavefrontSensor object shares the same UID.
        """
        if wfs is None:
            if enforce:
                raise ValueError("'AOSystem.wavefront_sensors' list cannot contain 'None' items.")
            return None
        if self._handle_reference('WavefrontSensor', self._wavefrontsensors, wfs):
            if enforce:
                raise ValueError("'AOSystem.wavefront_sensors' list cannot contain repeated items.")
            return wfs.uid
        elif not enforce:
            raise ValueError(f"WavefrontSensor '{wfs.uid}' was referenced but is not on the "
                             f"'AOSystem.wavefront_sensors' list.")

        if isinstance(wfs, aotpy.ShackHartmann):
            wfs_type = WAVEFRONT_SENSOR_TYPE_SHACK_HARTMANN
            # Type-specific fields go into a secondary table
            sec_tbl = self._file.wavefront_sensors_shack_hartmann_table
            sec_tbl.uid.append(wfs.uid)
            sec_tbl.centroiding_algorithm.append(wfs.centroiding_algorithm)
            sec_tbl.centroid_gains.append(self._convert_image(wfs.centroid_gains))
            sec_tbl.spot_fwhm.append(self._convert_image(wfs.spot_fwhm))
        elif isinstance(wfs, aotpy.Pyramid):
            wfs_type = WAVEFRONT_SENSOR_TYPE_PYRAMID
            # Type-specific fields go into a secondary table
            sec_tbl = self._file.wavefront_sensors_pyramid_table
            sec_tbl.uid.append(wfs.uid)
            sec_tbl.n_sides.append(wfs.n_sides)
            sec_tbl.modulation.append(wfs.modulation)
        else:
            raise ValueError(f"Unexpected type '{type(wfs)}' for WavefrontSensor object.")

        tbl = self._file.wavefront_sensors_table
        tbl.uid.append(wfs.uid)
        tbl.type.append(wfs_type)
        tbl.source_uid.append(self._convert_source(wfs.source))
        tbl.dimensions.append(wfs.dimensions)
        tbl.n_valid_subapertures.append(wfs.n_valid_subapertures)
        tbl.measurements.append(self._convert_image(wfs.measurements))
        tbl.ref_measurements.append(self._convert_image(wfs.ref_measurements))
        tbl.subaperture_mask.append(self._convert_image(wfs.subaperture_mask))
        tbl.mask_x_offsets.append([off.x for off in wfs.mask_offsets])
        tbl.mask_y_offsets.append([off.y for off in wfs.mask_offsets])
        tbl.subaperture_size.append(wfs.subaperture_size)
        tbl.subaperture_intensities.append(self._convert_image(wfs.subaperture_intensities))
        tbl.wavelength.append(wfs.wavelength)
        tbl.optical_gain.append(self._convert_image(wfs.optical_gain))
        tbl.transformation_matrix.append(self._convert_image(wfs.transformation_matrix))
        tbl.detector_uid.append(self._convert_detector(wfs.detector))
        tbl.aberration_uid.append(self._convert_aberration(wfs.aberration))
        tbl.ncpa_uid.append(self._convert_aberration(wfs.non_common_path_aberration))
        return wfs.uid

    def _convert_wavefront_corrector(self, wfc: aotpy.WavefrontCorrector, enforce: bool = False) -> str | None:
        """
        Add `wfc` to the wavefront correctors tables, or resolve a reference to an already converted corrector.

        Parameters
        ----------
        wfc
            Wavefront corrector being converted or referenced.
        enforce
            `True` when `wfc` comes from the 'AOSystem.wavefront_correctors' list (it is then converted and must be
            neither `None` nor repeated). `False` when `wfc` is a reference from another object (it must then have
            been converted already).

        Returns
        -------
        str or None
            UID of the corrector, or `None` if `wfc` is `None` and `enforce` is `False`.

        Raises
        ------
        ValueError
            If any of the conditions above is violated, if the corrector is of unexpected type, or if a different
            WavefrontCorrector object shares the same UID.
        """
        if wfc is None:
            if enforce:
                raise ValueError("'AOSystem.wavefront_correctors' list cannot contain 'None' items.")
            return None
        if self._handle_reference('WavefrontCorrector', self._wavefrontcorrectors, wfc):
            if enforce:
                raise ValueError("'AOSystem.wavefront_correctors' list cannot contain repeated items.")
            return wfc.uid
        elif not enforce:
            raise ValueError(f"WavefrontCorrector '{wfc.uid}' was referenced but is not on the "
                             f"'AOSystem.wavefront_correctors' list.")

        if isinstance(wfc, aotpy.DeformableMirror):
            wfc_type = WAVEFRONT_CORRECTOR_TYPE_DM
            # Only deformable mirrors have type-specific fields, which go into a secondary table
            sec_tbl = self._file.wavefront_correctors_dm_table
            sec_tbl.uid.append(wfc.uid)
            sec_tbl.actuators_x.append([coord.x for coord in wfc.actuator_coordinates])
            sec_tbl.actuators_y.append([coord.y for coord in wfc.actuator_coordinates])
            sec_tbl.influence_function.append(self._convert_image(wfc.influence_function))
            sec_tbl.stroke.append(wfc.stroke)
        elif isinstance(wfc, aotpy.TipTiltMirror):
            wfc_type = WAVEFRONT_CORRECTOR_TYPE_TTM
        elif isinstance(wfc, aotpy.LinearStage):
            wfc_type = WAVEFRONT_CORRECTOR_TYPE_LS
        else:
            raise ValueError(f"Unexpected type '{type(wfc)}' for WavefrontCorrector object.")

        tbl = self._file.wavefront_correctors_table
        tbl.uid.append(wfc.uid)
        tbl.type.append(wfc_type)
        tbl.telescope_uid.append(self._convert_telescope(wfc.telescope))
        tbl.n_valid_actuators.append(wfc.n_valid_actuators)
        tbl.pupil_mask.append(self._convert_image(wfc.pupil_mask))
        tbl.tfz_num.append(wfc.tfz_num)
        tbl.tfz_den.append(wfc.tfz_den)
        tbl.transformation_matrix.append(self._convert_image(wfc.transformation_matrix))
        tbl.aberration_uid.append(self._convert_aberration(wfc.aberration))
        return wfc.uid

    def _convert_loop(self, loop: aotpy.Loop) -> None:
        """
        Add an item of 'AOSystem.loops' to the loops tables.

        Raises
        ------
        ValueError
            If `loop` is `None`, was already converted, is of unexpected type, or shares its UID with a different
            object.
        """
        if loop is None:
            raise ValueError("'AOSystem.loops' list cannot contain 'None' items.")
        if self._handle_reference('Loop', self._loops, loop):
            raise ValueError("'AOSystem.loops' list cannot contain repeated items.")

        if isinstance(loop, aotpy.ControlLoop):
            loop_type = LOOPS_TYPE_CONTROL
            # Type-specific fields go into a secondary table
            sec_tbl = self._file.loops_control_table
            sec_tbl.uid.append(loop.uid)
            sec_tbl.input_sensor_uid.append(self._convert_wavefront_sensor(loop.input_sensor))
            sec_tbl.modes.append(self._convert_image(loop.modes))
            sec_tbl.modal_coefficients.append(self._convert_image(loop.modal_coefficients))
            sec_tbl.control_matrix.append(self._convert_image(loop.control_matrix))
            sec_tbl.measurements_to_modes.append(self._convert_image(loop.measurements_to_modes))
            sec_tbl.modes_to_commands.append(self._convert_image(loop.modes_to_commands))
            sec_tbl.interaction_matrix.append(self._convert_image(loop.interaction_matrix))
            sec_tbl.commands_to_modes.append(self._convert_image(loop.commands_to_modes))
            sec_tbl.modes_to_measurements.append(self._convert_image(loop.modes_to_measurements))
            sec_tbl.residual_commands.append(self._convert_image(loop.residual_commands))
        elif isinstance(loop, aotpy.OffloadLoop):
            loop_type = LOOPS_TYPE_OFFLOAD
            # Type-specific fields go into a secondary table
            sec_tbl = self._file.loops_offload_table
            sec_tbl.uid.append(loop.uid)
            sec_tbl.input_corrector_uid.append(self._convert_wavefront_corrector(loop.input_corrector))
            sec_tbl.offload_matrix.append(self._convert_image(loop.offload_matrix))
        else:
            raise ValueError(f"Unexpected type '{type(loop)}' for Loop object.")

        tbl = self._file.loops_table
        tbl.uid.append(loop.uid)
        tbl.type.append(loop_type)
        tbl.commanded_uid.append(self._convert_wavefront_corrector(loop.commanded_corrector))
        tbl.time_uid.append(self._convert_time(loop.time))
        tbl.status.append(LOOPS_STATUS_CLOSED if loop.closed else LOOPS_STATUS_OPEN)
        tbl.commands.append(self._convert_image(loop.commands))
        tbl.ref_commands.append(self._convert_image(loop.ref_commands))
        tbl.framerate.append(loop.framerate)
        tbl.delay.append(loop.delay)
        tbl.time_filter_num.append(self._convert_image(loop.time_filter_num))
        tbl.time_filter_den.append(self._convert_image(loop.time_filter_den))

    def _convert_image(self, img: aotpy.Image) -> AOTFITSInternalImage | AOTFITSExternalImage | None:
        """
        Convert `img` into its AOT FITS representation.

        `FITSURLImage` and `FITSFileImage` objects become a new `AOTFITSExternalImage` on every call. Any other
        `aotpy.Image` becomes an `AOTFITSInternalImage` that is stored in the file under the image name; converting
        the same object again returns the stored image.

        Returns
        -------
        AOTFITSInternalImage or AOTFITSExternalImage or None
            Converted image, or `None` if `img` is `None`.

        Raises
        ------
        ValueError
            If `img` is of unexpected type, or if different internal Image objects share the same name.
        """
        if img is None:
            return None
        time_uid = self._convert_time(img.time)

        # The external image types are checked before the generic 'aotpy.Image' check.
        # NOTE(review): presumably they are subclasses of 'aotpy.Image', which would make this order required.
        if isinstance(img, FITSURLImage):
            im = AOTFITSExternalImage(img.name, img.data, img.unit, None,
                                      [card_from_metadatum(md) for md in img.metadata], True, img.url, img.index)
        elif isinstance(img, FITSFileImage):
            im = AOTFITSExternalImage(img.name, img.data, img.unit, None,
                                      [card_from_metadatum(md) for md in img.metadata], False, img.filename, img.index)
        elif isinstance(img, aotpy.Image):
            if img.name in self._internalimages:
                if self._internalimages[img.name] is img:
                    # Same object, already added
                    return self._file.internal_images[img.name]
                raise ValueError(f"Different Image objects share the same name '{img.name}'.")
            self._internalimages[img.name] = img
            im = AOTFITSInternalImage(img.name, img.data, img.unit, None,
                                      [card_from_metadatum(md) for md in img.metadata])
            self._file.internal_images[img.name] = im
        else:
            raise ValueError(f"Unexpected type '{type(img)}' for Image object.")
        im.time_uid = time_uid
        return im