Source code for aotpy.io.fits._file

import numbers
import os
import re
import warnings
from dataclasses import dataclass, field
import datetime
from enum import IntEnum

import numpy as np
from astropy.io import fits

from . import _strings as _s
from . import compat
from ._strings import *
from .utils import _keyword_is_relevant, _valid_filename, _datetime_to_iso

_image_reference_pattern = re.compile(r'([^<]+)<(.+)>(\d+)?')
_row_reference_pattern = re.compile(rf'{_s.ROW_REFERENCE}<(.+)>')

# NaN is defined here as a single precision float (32-bits), which is the lowest possible float precision in FITS.
# The goal is to ensure that low precision numpy arrays aren't unnecessarily upcasted just because of NaN.
_nan = np.single(np.nan)

# Given that all integer values in AOT are "counts" they should not be negative. Therefore, we represent null integers
# with the lowest integer in a signed 16-bit integer, in order to avoid unnecessary upcasting.
_int_min = np.iinfo(np.int16).min


class AOTVerificationWarning(UserWarning):
    pass


class AOTVerificationException(Exception):
    pass


class AOTFITSErrorLevel(IntEnum):
    PEDANTIC = 1
    """Goes against recommendations"""
    WARNING = 2
    """No functionality loss, but unexpected behavior"""
    SERIOUS = 3
    """Functionality and/or data integrity compromised"""
    CRITICAL = 4
    """Complete loss of functionality"""


class AOTFITSErrorManager:
    def __init__(self, *, exception_level: AOTFITSErrorLevel = AOTFITSErrorLevel.SERIOUS,
                 log_level: AOTFITSErrorLevel = AOTFITSErrorLevel.PEDANTIC):
        if not isinstance(exception_level, AOTFITSErrorLevel):
            raise ValueError("Invalid exception_level parameter")

        if not isinstance(log_level, AOTFITSErrorLevel):
            raise ValueError("Invalid exception_level parameter")

        self.exception_level = exception_level
        self.log_level = log_level

        self.error: dict[str, list[tuple[AOTFITSErrorLevel, str]]] = {}

    def add_error(self, level: AOTFITSErrorLevel, message: str, context: str):
        self.error.setdefault(context, []).append((level, message))
        if level >= self.exception_level:
            # TODO be more explicit on the error
            raise AOTVerificationException(f'[{context}] {message} (Error level: {level.name})')
        if level >= self.log_level:
            warnings.warn(f'[{context}] {message} (Error level: {level.name})', AOTVerificationWarning, stacklevel=2)

    def print_full_report(self):
        # TODO make this pretty
        if not self.error:
            print('No errors found.')
        else:
            print('Some errors were found:')
            for context in self.error:
                for level, message in self.error[context]:
                    print(f'[{context}] {message} (Error level: {level.name})')


[docs] def verify_file(path: str | os.PathLike, log_level=AOTFITSErrorLevel.CRITICAL, exception_level=AOTFITSErrorLevel.CRITICAL, **kwargs): f = AOTFITSFile.from_file(path, log_level=log_level, exception_level=exception_level, **kwargs) f.issue_manager.print_full_report()
@dataclass class AOTField: name: str format: str unit: str mandatory: bool = False unique: bool = False reference: str = None description: str = None data: list = field(default_factory=list) allowed_list: list = None ignored: bool = False canonical_name: str = field(init=False, default=None) def __getitem__(self, item) -> str | int | float | list: return self.data[item] def append(self, item: str | int | float | list): self.data.append(item) def __post_init__(self): # 'name' might be modifed for reading older specifications, so we keep a canonical copy for writing self.canonical_name = self.name class AOTFieldList(list): def __init__(self, *args: AOTField): super().__init__(args) self._dict = None def _get_dict(self) -> dict: if self._dict is None: self._dict = {fld.name: fld for fld in self} return self._dict def __getitem__(self, key) -> AOTField: if isinstance(key, str): return self._get_dict()[key] return super().__getitem__(key) def __contains__(self, key): if isinstance(key, str): return key in self._get_dict() return super().__contains__(key) def rows_must_be_referenced(cls): """Decorator function for tables whose rows must be referenced.""" def check_was_referenced(self: AOTFITSTable): for uid, was_referenced in self.uid_was_referenced.items(): if not was_referenced: self.add_error(AOTFITSErrorLevel.WARNING, f"'{uid}' is never referenced.") cls.check_was_referenced = check_was_referenced return cls # TODO check for compressed image # fits.hdu.image.ExtensionHDU ? def get_image_from_hdu(hdu: fits.ImageHDU) -> tuple[str, np.ndarray, str, str, list[fits.Card]]: unit = None time_ref = None metadata: list[fits.Card] = [] for card in hdu.header.cards: if _keyword_is_relevant(card.keyword): if card.keyword == _s.IMAGE_UNIT: unit = card.value elif card.keyword == _s.TIME_REFERENCE: time_ref = card.value else: metadata.append(card) return hdu.name, hdu.data, unit, time_ref, metadata @dataclass class AOTFITSImage: extname: str data: np.ndarray unit: str time_ref: str metadata: list[fits.Card] time_uid: str = field(init=False, default=None) @dataclass class AOTFITSInternalImage(AOTFITSImage): was_referenced: bool = field(init=False, default=False) @classmethod def from_hdu(cls, hdu: fits.ImageHDU): return cls(*get_image_from_hdu(hdu)) def to_hdu(self) -> fits.ImageHDU: hdr = fits.Header(self.metadata) if self.time_uid is not None: hdr[TIME_REFERENCE] = AOTFITSFile.create_row_reference(self.time_uid) if self.unit is not None: hdr[IMAGE_UNIT] = self.unit return fits.ImageHDU(name=self.extname, data=self.data, header=hdr) def __str__(self): return f'{INTERNAL_REFERENCE}<{self.extname}>' @dataclass class AOTFITSExternalImage(AOTFITSImage): is_url: bool path: str index: int @classmethod def from_hdu(cls, hdu: fits.ImageHDU, prefix: str, path: str, index: int): return cls(*get_image_from_hdu(hdu), prefix == URL_REFERENCE, path, index) def __str__(self): ref = f'{URL_REFERENCE if self.is_url else FILE_REFERENCE}<{self.path}>' if self.index is None: return ref return f'{ref}{self.index}' # TODO check dimensions for images, warn if unexpected class AOTFITSFile: def __init__(self, *, exception_level=AOTFITSErrorLevel.SERIOUS, log_level=AOTFITSErrorLevel.WARNING, externals='manual', externals_dictionary=None, externals_directory=None, ignore_version=False, **kwargs): self.primary_header = AOTFITSPrimaryHeader(self) self.time_table = AOTFITSTableTime(self) self.atmospheric_parameters_table = AOTFITSTableAtmosphericParameters(self) self.aberrations_table = AOTFITSTableAberrations(self) self.telescopes_table = AOTFITSTableTelescopes(self) self.sources_table = AOTFITSTableSources(self) self.sources_sodium_lgs_table = AOTFITSTableSourcesSodiumLGS(self, self.sources_table) self.sources_rayleigh_lgs_table = AOTFITSTableSourcesRayleighLGS(self, self.sources_table) self.detectors_table = AOTFITSTableDetectors(self) self.scoring_cameras_table = AOTFITSTableScoringCameras(self) self.wavefront_sensors_table = AOTFITSTableWavefrontSensors(self) self.wavefront_sensors_shack_hartmann_table = AOTFITSTableWavefrontSensorsShackHartmann( self, self.wavefront_sensors_table) self.wavefront_sensors_pyramid_table = AOTFITSTableWavefrontSensorsPyramid(self, self.wavefront_sensors_table) self.wavefront_correctors_table = AOTFITSTableWavefrontCorrectors(self) self.wavefront_correctors_dm_table = AOTFITSTableWavefrontCorrectorsDM(self, self.wavefront_correctors_table) self.loops_table = AOTFITSTableLoops(self) self.loops_control_table = AOTFITSTableLoopsControl(self, self.loops_table) self.loops_offload_table = AOTFITSTableLoopsOffload(self, self.loops_table) self.tables: dict[str, AOTFITSTable] = { self.time_table.extname: self.time_table, self.atmospheric_parameters_table.extname: self.atmospheric_parameters_table, self.aberrations_table.extname: self.aberrations_table, self.telescopes_table.extname: self.telescopes_table, self.sources_table.extname: self.sources_table, self.sources_sodium_lgs_table.extname: self.sources_sodium_lgs_table, self.sources_rayleigh_lgs_table.extname: self.sources_rayleigh_lgs_table, self.detectors_table.extname: self.detectors_table, self.scoring_cameras_table.extname: self.scoring_cameras_table, self.wavefront_sensors_table.extname: self.wavefront_sensors_table, self.wavefront_sensors_shack_hartmann_table.extname: self.wavefront_sensors_shack_hartmann_table, self.wavefront_sensors_pyramid_table.extname: self.wavefront_sensors_pyramid_table, self.wavefront_correctors_table.extname: self.wavefront_correctors_table, self.wavefront_correctors_dm_table.extname: self.wavefront_correctors_dm_table, self.loops_table.extname: self.loops_table, self.loops_control_table.extname: self.loops_control_table, self.loops_offload_table.extname: self.loops_offload_table } self.internal_images: dict[str, AOTFITSInternalImage] = {} self.external_images: list[AOTFITSExternalImage] = [] self.extra_hdus: fits.HDUList = fits.HDUList() self.extra_images: fits.HDUList = fits.HDUList() self.issue_manager = AOTFITSErrorManager(exception_level=exception_level, log_level=log_level) if externals not in ['manual', 'enforce', 'ignore', 'disallow']: raise RuntimeError("Unknown value for 'externals' parameter") self.externals = externals self.externals_dictionary = externals_dictionary if externals_dictionary is not None else {} self.externals_directory = externals_directory self.ignore_version = ignore_version self.kwargs = kwargs @classmethod def from_file(cls, filename: str | os.PathLike, **kwargs): file = cls(**kwargs) file.read_from_file(filename) return file def get_extra_data(self) -> tuple[fits.HDUList, fits.HDUList, dict[str, list[fits.Column]]]: """ Return a tuple of extra data that may have been in AOTFITS file.""" return self.extra_hdus, self.extra_images, {k: v.extra_columns for k, v in self.tables.items() if v} def to_hdulist(self, discard_empty_tables=True) -> fits.HDUList: primary_hdu = self.primary_header.to_hdu() bintable_hdus = [table.to_hdu() for table in self.tables.values()] if discard_empty_tables: bintable_hdus = [x for x in bintable_hdus if (x.size > 0 or not isinstance(self.tables[x.name], AOTFITSSecondaryTable))] image_hdus = [image.to_hdu() for image in self.internal_images.values()] return fits.HDUList([primary_hdu, *bintable_hdus, *image_hdus]) def to_file(self, filename: str | os.PathLike, discard_empty_tables=True, **kwargs) -> None: self.to_hdulist(discard_empty_tables).writeto(filename, **kwargs) @staticmethod def create_row_reference(uid: str) -> str: return f'{ROW_REFERENCE}<{uid}>' def add_error(self, level: AOTFITSErrorLevel, message: str): self.issue_manager.add_error(level, message, "AOTFITSFile") def verify_all_table_contents(self): for table in self.tables.values(): table.verify_contents() def read_from_file(self, filename: str | os.PathLike): with fits.open(filename, **self.kwargs) as hdus: # TODO critical if can't be opened # TODO handle pedantic order self.primary_header.read_from_primary(hdus[0]) # Skip PrimaryHDU already handled above for i, hdu in enumerate(hdus[1:], start=1): if hdu.name in self.tables: table = self.tables[hdu.name] if table.found: # Table already found once self.extra_hdus.append(hdu) self.add_error(AOTFITSErrorLevel.WARNING, f"Table '{hdu.name}' repeated in file, ignoring further appearances.") continue table.found = True table.read_from_bintable(hdu) else: if hdu.is_image: if not hdu.name: self.extra_images.append(hdu) self.add_error(AOTFITSErrorLevel.WARNING, f"Image in HDU index {i} has no name and thus cannot be referenced.") continue if hdu.name in self.internal_images: self.extra_images.append(hdu) self.add_error(AOTFITSErrorLevel.WARNING, f"Multiple images in the file share the name '{hdu.name}'." f"Only the first instance will be used for referencing.") continue if hdu.name == 'PRIMARY': self.add_error(AOTFITSErrorLevel.PEDANTIC, "Image name 'PRIMARY' may be confused with the primary HDU.") if hdu.size == 0: self.add_error(AOTFITSErrorLevel.PEDANTIC, f"Image '{hdu.name}' has no data.") self.internal_images[hdu.name] = AOTFITSInternalImage.from_hdu(hdu) else: self.add_error(AOTFITSErrorLevel.WARNING, f"HDU '{hdu.name}' is not an AOT binary table nor an image.") self.extra_hdus.append(hdu) for table in self.tables.values(): if isinstance(table, AOTFITSSecondaryTable): table.verify_from_main() elif not table.found: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Missing mandatory table '{table.extname}'") table.verify_references() for image in self.internal_images.values(): if not image.was_referenced: self.extra_images.append(image) self.add_error(AOTFITSErrorLevel.WARNING, f"Image '{image.extname}' is never referenced") self.internal_images = {k: v for k, v in self.internal_images.items() if v.was_referenced} for image in list(self.internal_images.values()) + self.external_images: image.time_uid = self.time_table.handle_reference(image.time_ref) for table in self.tables.values(): table.check_was_referenced() self.verify_all_table_contents() def handle_reference(self, value: str): # Handles image references if value is None: return None fullmatch = _image_reference_pattern.fullmatch(value) if fullmatch is None: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Image reference '{value}' was ignored: not properly formatted") return None prefix, name, index = fullmatch.groups() if index is not None: try: index = int(index) except ValueError: self.add_error(AOTFITSErrorLevel.WARNING, f"Index in reference '{value}' was ignored: not properly formatted.") index = None match prefix: case _s.INTERNAL_REFERENCE: if index is not None: self.add_error(AOTFITSErrorLevel.PEDANTIC, f"Internal image reference '{name}' should not have an index (got '{index}').") try: im = self.internal_images[name] im.was_referenced = True except KeyError: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Image '{name}' not found in file") im = None case _s.FILE_REFERENCE: fullmatch = _valid_filename.fullmatch(name) if fullmatch is None: self.add_error(AOTFITSErrorLevel.WARNING, f"File name '{name}' contains disallowed characters.") im = self.get_external_image(prefix, name, index) if im is not None: self.external_images.append(im) case _s.URL_REFERENCE: im = self.get_external_image(prefix, name, index) if im is not None: self.external_images.append(im) case _: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Reference '{value}' was ignored: unknown reference type") return None return im def get_external_image(self, prefix: str, name: str, index: int) -> AOTFITSExternalImage | None: if self.externals == 'disallow': self.add_error(AOTFITSErrorLevel.CRITICAL, f"External image found while " f"'externals_option' parameter is set to 'disallow'") return None if self.externals == 'ignore': self.add_error(AOTFITSErrorLevel.WARNING, f"External image ignored due to 'ignore' option in" f"'externals_option' parameter.") return None if name in self.externals_dictionary: # Replace the name with a hardcoded replacement if it exists name = self.externals_dictionary[name] elif self.externals_directory is not None and prefix == FILE_REFERENCE: # If an externals directory is specified, prepend to file path name = os.path.join(self.externals_directory, name) try: with fits.open(name, **self.kwargs) as hdus: hdu = self.get_imagehdu_from_hdus(name, hdus, index) if hdu is None: return None return AOTFITSExternalImage.from_hdu(hdu, prefix, name, index) except FileNotFoundError: if self.externals == 'enforce': self.add_error(AOTFITSErrorLevel.CRITICAL, f"Could not automatically find {name}" f" while 'externals_option' parameter is set to 'enforce'.") # If it reaches this point, externals_option must be 'manual' from tkinter.filedialog import askopenfilename selected = askopenfilename(title=f"Please select '{name}'.", initialfile=name, filetypes=(('FITS files', '*.fits'), ('Compressed FITS files', '*.fits.gz'))) if not selected: self.add_error(AOTFITSErrorLevel.CRITICAL, f"Could not find '{name}' automatically." f" Manual selection is required.") return None with fits.open(selected, **self.kwargs) as hdus: hdu = self.get_imagehdu_from_hdus(selected, hdus, index) if hdu is None: return None return AOTFITSExternalImage.from_hdu(hdu, FILE_REFERENCE, selected, index) def get_imagehdu_from_hdus(self, filename: str | os.PathLike, hdus: fits.HDUList, index: int) -> fits.ImageHDU | None: if index is None: for hdu in hdus: if hdu.is_image and hdu.size > 0: break else: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Could not find any image data in '{filename}'.") return None else: hdu = hdus[index] if not hdu.is_image: self.add_error(AOTFITSErrorLevel.SERIOUS, f"HDU index {index} in '{filename}' is not an image.") return None elif hdu.size == 0: self.add_error(AOTFITSErrorLevel.PEDANTIC, f"HDU index {index} in '{filename}' has no image data.") return hdu def handle_version(self): # TODO generalize if self.primary_header.version not in compat.known_versions: self.primary_header.add_error(AOTFITSErrorLevel.CRITICAL, f"Unknown version '{self.primary_header.version}'. " f"Use option 'ignore_version' to force reading.") self.primary_header.version = compat.latest_version if self.primary_header.version == compat.latest_version: return if self.primary_header.version == compat.AOTVersion(1, 0, 0): # Columns had different names self.atmospheric_parameters_table.seeing.name = compat.LEGACY_ATMOSPHERIC_PARAMETERS_FWHM self.atmospheric_parameters_table.layers_rel_weight.name = compat.LEGACY_ATMOSPHERIC_PARAMETERS_LAYERS_WEIGHT self.sources_table.fwhm.name = compat.LEGACY_SOURCE_WIDTH # Columns didn't exist self.detectors_table.field_centre_x.ignored = True self.detectors_table.field_centre_y.ignored = True # Dimensionless was represented differently for t in self.tables.values(): for fld in t.fields: if fld.unit == UNIT_DIMENSIONLESS: fld.unit = compat.LEGACY_DIMENSIONLESS else: # This should never happen raise NotImplementedError @dataclass class AOTFITSPrimaryHeader: parent: AOTFITSFile version: compat.AOTVersion = None ao_mode: str = None date_beg: datetime.datetime = None date_end: datetime.datetime = None system_name: str = None strehl_ratio: float = None strehl_wavelength: float = None config: str = None metadata: list[fits.Card] = field(default_factory=list) def add_error(self, level: AOTFITSErrorLevel, message: str): self.parent.issue_manager.add_error(level, message, "Primary Header") def read_from_primary(self, hdu: fits.PrimaryHDU): if hdu.size != 0: self.add_error(AOTFITSErrorLevel.WARNING, f"Primary HDU should not contain data array.") hdr = hdu.header if self.parent.ignore_version: self.version = compat.latest_version else: try: self.version = compat.AOTVersion.from_string(hdr[AOT_VERSION]) except ValueError: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Keyword '{AOT_VERSION}' not formatted properly.") self.version = compat.latest_version except KeyError: self.add_error(AOTFITSErrorLevel.CRITICAL, f"Keyword '{AOT_VERSION}' not found, file is likely " f"not AOT. Use option 'ignore_version' to force reading.") self.parent.handle_version() try: if (timesys := hdr[AOT_TIMESYS]) != AOT_TIMESYS_UTC: self.add_error(AOTFITSErrorLevel.WARNING, f"Keyword '{AOT_TIMESYS}' should have the value" f" '{AOT_TIMESYS_UTC}'. Got {timesys} instead.") except KeyError: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Mandatory keyword '{AOT_TIMESYS}' missing.") try: self.ao_mode = hdr[AOT_AO_MODE] if self.ao_mode not in AOT_AO_MODE_SET: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Unknown value '{self.ao_mode}' for keyword '{AOT_AO_MODE}'." f" Expected one of {AOT_AO_MODE_SET}.") self.ao_mode = 'SCAO' except KeyError: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Mandatory keyword '{AOT_AO_MODE}' missing.") self.ao_mode = 'SCAO' if AOT_DATE_BEG in hdr and (date_str := hdr[AOT_DATE_BEG]): try: self.date_beg = datetime.datetime.fromisoformat(date_str).replace(tzinfo=datetime.UTC) except (ValueError, TypeError): self.add_error(AOTFITSErrorLevel.WARNING, f"Keyword '{AOT_DATE_BEG}' improperly formatted.") if AOT_DATE_END in hdr and (date_str := hdr[AOT_DATE_END]): try: self.date_end = datetime.datetime.fromisoformat(date_str).replace(tzinfo=datetime.UTC) except (ValueError, TypeError): self.add_error(AOTFITSErrorLevel.WARNING, f"Keyword '{AOT_DATE_END}' improperly formatted.") if AOT_SYSTEM_NAME in hdr: self.system_name = hdr[AOT_SYSTEM_NAME] if not isinstance(self.system_name, str): self.add_error(AOTFITSErrorLevel.WARNING, f"Keyword '{AOT_SYSTEM_NAME}' should have a string value.") if AOT_STREHL_RATIO in hdr: self.strehl_ratio = hdr[AOT_STREHL_RATIO] if not isinstance(self.strehl_ratio, float): self.add_error(AOTFITSErrorLevel.WARNING, f"Keyword '{AOT_STREHL_RATIO}' should have a floating-point value.") if AOT_STREHL_WAVELENGTH in hdr: self.strehl_wavelength = hdr[AOT_STREHL_WAVELENGTH] if not isinstance(self.strehl_wavelength, float): self.add_error(AOTFITSErrorLevel.WARNING, f"Keyword '{AOT_STREHL_WAVELENGTH}' should have a floating-point value.") if AOT_CONFIG in hdr: self.config = hdr[AOT_CONFIG] if not isinstance(self.config, str): self.add_error(AOTFITSErrorLevel.WARNING, f"Keyword '{AOT_CONFIG}' should have a string value.") self.metadata = [card for card in hdr.cards if card.keyword not in AOT_HEADER_SET and _keyword_is_relevant(card.keyword)] if self.metadata: self.add_error(AOTFITSErrorLevel.PEDANTIC, f"Found non-AOT keywords: {[x.keyword for x in self.metadata]}.") def to_hdu(self) -> fits.PrimaryHDU: # TODO check for types hdr = fits.Header() hdr[AOT_VERSION] = str(compat.latest_version) hdr[AOT_TIMESYS] = AOT_TIMESYS_UTC hdr[AOT_AO_MODE] = self.ao_mode if (date := _datetime_to_iso(self.date_beg)) is not None: hdr[AOT_DATE_BEG] = date if (date := _datetime_to_iso(self.date_end)) is not None: hdr[AOT_DATE_END] = date if self.system_name is not None: hdr[AOT_SYSTEM_NAME] = self.system_name if self.strehl_ratio is not None: hdr[AOT_STREHL_RATIO] = self.strehl_ratio if self.strehl_wavelength is not None: hdr[AOT_STREHL_RATIO] = self.strehl_wavelength if self.config is not None: hdr[AOT_CONFIG] = self.config hdr.extend(self.metadata) return fits.PrimaryHDU(header=hdr) @dataclass class AOTFITSTable: extname: str fields: AOTFieldList[AOTField] parent: AOTFITSFile uid: AOTField = field(init=False) uid_dict: dict[str, int] = field(init=False, default_factory=dict) uid_was_referenced: dict[str, bool] = field(init=False, default_factory=dict) found: bool = field(init=False, default=False) extra_columns: list[fits.Column] = field(init=False, default_factory=list) def add_error(self, level: AOTFITSErrorLevel, message: str): self.parent.issue_manager.add_error(level, message, self.extname) def read_from_bintable(self, hdu: fits.BinTableHDU): cols: fits.ColDefs = hdu.columns data: fits.FITS_rec = hdu.data n_rows = data.size found_columns = {} for col in cols: if col.name in self.fields: # Standard column if col.name in found_columns: # TODO AstroPy should have been able to detect this self.add_error(AOTFITSErrorLevel.WARNING, f"Column '{col.name}' repeated in table, ignoring further appearances.") continue else: found_columns[col.name] = col fld = self.fields[col.name] datacolumn = data[fld.name] fld.data = [_convert_null_to_none(x, col) for x in datacolumn] if fld.mandatory and None in fld.data: # This won't trigger on null lists, but there are no mandatory lists self.add_error(AOTFITSErrorLevel.SERIOUS, f"Mandatory column '{col.name}' contains null entries.") if _fits_type_to_aot(col.format) != fld.format: # Column doesn't match the expected format if fld.mandatory: # Very dangerous! self.add_error(AOTFITSErrorLevel.SERIOUS, f"Mandatory column '{col.name}' does not match the expected format.") elif all(v is None for v in fld.data): # It has no data, so this isn't really dangerous self.add_error(AOTFITSErrorLevel.PEDANTIC, f"Empty column '{col.name}' does not match the expected format.") else: self.add_error(AOTFITSErrorLevel.WARNING, f"Column '{col.name}' does not match the expected format.") if fld.allowed_list is not None: for i, x in enumerate(fld.data): if x is not None and x not in fld.allowed_list: if fld.mandatory: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Mandatory column '{col.name}' contains unrecognized entry {x}. " f"Expected one of {fld.allowed_list}.") else: self.add_error(AOTFITSErrorLevel.WARNING, f"Column '{col.name}' contains unrecognized entry {x}. " f"Expected one of {fld.allowed_list}.") fld.data[i] = fld.allowed_list[0] if fld.unit: # Not dimensionless if col.unit != fld.unit: self.add_error(AOTFITSErrorLevel.WARNING, f"Column '{col.name}' does not match the expected unit.") else: # Dimensionless if col.unit is not None: self.add_error(AOTFITSErrorLevel.PEDANTIC, f"Dimensionless column '{col.name}' specifies a unit.") if fld.unique: # These are the UID fields if len(np.unique(datacolumn)) != len(datacolumn): self.add_error(AOTFITSErrorLevel.SERIOUS, f"Unique column '{col.name}' contains repeated entries.") else: self.extra_columns.append(col) self.add_error(AOTFITSErrorLevel.WARNING, f"Table contains non-AOT column '{col.name}'.") if list(found_columns.keys()) != [fld.name for fld in self.fields if fld.name in found_columns]: self.add_error(AOTFITSErrorLevel.PEDANTIC, f"Columns not in recommended order.") # Verify if all fields were found if len(found_columns) != len(self.fields): # Find which fields are missing for fld in self.fields: if fld.name not in found_columns: # Field might be ignored due to version compatibility reasons if not fld.ignored: if fld.mandatory: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Mandatory column '{fld.name}' is missing.") else: self.add_error(AOTFITSErrorLevel.WARNING, f"Column '{fld.name}' is missing.") # Column not found, so we create null data in its place if fld.format == LIST_FORMAT: fld.data = [[] for _ in range(n_rows)] else: fld.data = [None for _ in range(n_rows)] for i, value in enumerate(self.uid.data): self.uid_dict[value] = i self.uid_was_referenced[value] = False def verify_references(self): for fld in self.fields: if fld.reference is None: # Not a reference field continue match fld.reference: case _s.IMAGE_REF: referenced_table = self.parent case _s.TIME_TABLE: referenced_table = self.parent.time_table case _s.ABERRATIONS_TABLE: referenced_table = self.parent.aberrations_table case _s.TELESCOPES_TABLE: referenced_table = self.parent.telescopes_table case _s.DETECTORS_TABLE: referenced_table = self.parent.detectors_table case _s.SOURCES_TABLE: referenced_table = self.parent.sources_table case _s.WAVEFRONT_CORRECTORS_TABLE: referenced_table = self.parent.wavefront_correctors_table case _s.WAVEFRONT_SENSORS_TABLE: referenced_table = self.parent.wavefront_sensors_table case _: # This should never happen raise RuntimeError fld.data = [referenced_table.handle_reference(x) for x in fld.data] if fld.mandatory and None in fld.data: # This won't trigger on null lists, but there are no mandatory lists self.add_error(AOTFITSErrorLevel.SERIOUS, f"Mandatory column '{fld.name}' contains references that could not be resolved.") def verify_contents(self): pass def _ensure_has_data(self): if not self.uid.data: self.add_error(AOTFITSErrorLevel.WARNING, f"'{self.extname}' contains no data (at least one row expected).") def _ensure_same_length(self, a: AOTField, b: AOTField, independent=False): for i, (x, y) in enumerate(zip(a.data, b.data)): if not x and not y: # Both are empty lists if independent: # This is just the case for AOT_TIME, one must be not-null self.add_error(AOTFITSErrorLevel.SERIOUS, f"In table '{self.extname}', for UID '{self.uid[i]}', " f"both entries for '{a.name}' and '{b.name}' are null.") continue if (x and not y) or (y and not x): # One is emtpy and the other isn't if not independent: self.add_error(AOTFITSErrorLevel.SERIOUS, f"In table '{self.extname}', for UID '{self.uid[i]}', one" f" of '{a.name}' and '{b.name}' is null," f" while the other is not.") continue if len(x) != len(y): self.add_error(AOTFITSErrorLevel.SERIOUS, f"In table '{self.extname}', for UID '{self.uid[i]}', the " f"entries for '{a.name}' and '{b.name}' do not have the same" f" length.") def handle_reference(self, ref): # Handles row references # TODO if reference is missing we should create a fake placeholder if ref is None: return None fullmatch = _row_reference_pattern.fullmatch(ref) if fullmatch is None: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Row reference '{ref}' was ignored: not properly formatted") return None uid = fullmatch.group(1) if uid not in self.uid_dict: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Referenced UID '{uid}' not found in table {self.extname}") return None self.uid_was_referenced[uid] = True return uid def check_was_referenced(self): # We consider that by default rows in tables don't need to be referenced. The tables that need to have their # rows refereced use the 'rows_must_be_referenced' decorator. return def to_hdu(self): cols = [] for fld in self.fields: if fld.reference is not None: if fld.reference == _s.IMAGE_REF: # AOTFITSImage subclasses have a __str__ that creates the reference string automatically data = ['' if x is None else str(x) for x in fld.data] else: data = ['' if x is None else AOTFITSFile.create_row_reference(x) for x in fld.data] else: data = [self.convert_none_to_null(x, fld) for x in fld.data] if fld.format == LIST_FORMAT: array = np.empty(len(data), dtype=np.object_) # Convert every entry to numpy array, if any 64-bit floats are detected we need to use 'D' format. flag = False for i, l in enumerate(data): aux = np.array(l) if aux.dtype != np.float32: aux = aux.astype(np.float64, casting='safe') flag = True array[i] = aux # We always use the 'Q' format, meaning the VLAs use a 64-bit descriptor. # If we used the 'P' format we could potentially save some storage (64-bits per row per VLA column). # However, in scenarios with very large amounts of VLA data, the heap offset could overflow. # This is hard to calculate ahead of time, so we just prefer to take the small bump to file size. # Realistically, this size increase is insignificant when compared to the actual data being stored. col = fits.Column(name=fld.canonical_name, format=f"Q{'D' if flag else 'E'}", unit=fld.unit, array=array) else: # Convert to numpy array and try as much as possible to keep the resulting dtype array = np.array(data) if fld.format == STRING_FORMAT: # re-implements numpy's _get_num_chars for cross compatibility between numpy versions size = array.itemsize // 4 if issubclass(array.dtype.type, np.str_) else array.itemsize col = fits.Column(name=fld.canonical_name, format=f'{size}A', unit=fld.unit, array=array) elif fld.format == _s.INTEGER_FORMAT: if (t := array.dtype) == np.int16 or t == np.int8: f = '1I' elif t == np.int32: f = '1J' else: if t != np.int64 and array.size > 0: # If not int16, int32 or int64 make one last-ditch effort to convert to int64 by default array = array.astype(np.int64, casting='safe') f = '1K' col = fits.Column(name=fld.canonical_name, format=f, null=_int_min, unit=fld.unit, array=array) elif fld.format == _s.FLOAT_FORMAT: if (t := array.dtype) == np.float32: f = '1E' else: if t != np.float64 and array.size > 0: # If not float32 or float64 make one last-ditch effort to convert to float64 by default array = array.astype(np.float64, casting='safe') f = '1D' col = fits.Column(name=fld.canonical_name, format=f, unit=fld.unit, array=array) else: # This should never happen raise RuntimeError cols.append(col) return fits.BinTableHDU.from_columns(name=self.extname, columns=cols) def convert_none_to_null(self, value, fld: AOTField): if fld.mandatory and value is None: raise ValueError(f"Got 'None' value for mandatory field '{fld.name} on table '{self.extname}") match fld.format: case _s.STRING_FORMAT: if value is None: return '' if not isinstance(value, str) or (isinstance(value, np.ndarray) and not np.issubdtype(value.dtype, np.character)): raise ValueError(f"Unxpected value in table '{self.extname}' column '{fld.name}'. " f"Expected '{fld.format}' format, got: {value}.") case _s.FLOAT_FORMAT: if value is None: return _nan if not isinstance(value, numbers.Real): raise ValueError(f"Unxpected value in table '{self.extname}' column '{fld.name}'. " f"Expected '{fld.format}' format, got: {value}.") case _s.INTEGER_FORMAT: if value is None: return _int_min if not isinstance(value, numbers.Integral): raise ValueError(f"Unxpected value in table '{self.extname}' column '{fld.name}'. " f"Expected '{fld.format}' format, got: {value}.") case _s.LIST_FORMAT: if value is None: return [] try: value = [_nan if v is None else v for v in value] except TypeError: # Not iterable raise ValueError(f"Unxpected value in table '{self.extname}' column '{fld.name}'. " f"Expected '{fld.format}' format, got: {value}.") from None case _: # This will trigger a warning later pass return value @dataclass class AOTFITSSecondaryTable(AOTFITSTable): main: AOTFITSTable type_name: str def verify_references(self): for uid in self.uid.data: if uid not in self.main.uid_dict: self.add_error(AOTFITSErrorLevel.WARNING, f"UID '{uid}' in {self.extname} does not exist in {self.main.extname}.") super().verify_references() def verify_from_main(self): # If the main table has a row with the correct type, we need to have that UID in the secondary table expected_uids = [uid for uid, index in self.main.uid_dict.items() if self.main.type.data[index] == self.type_name] if expected_uids and not self.found: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Table {self.extname} does not exist even though there" f" are rows of type {self.type_name} in {self.main.extname}.") for uid in expected_uids: if uid not in self.uid_dict: if self.found: # Only report here if table is found, otherwise we'll be double reporting self.add_error(AOTFITSErrorLevel.SERIOUS, f"UID '{uid}' in {self.main.extname} has type " f"{self.type_name} but does not exist in table " f"{self.extname}.") self.uid_dict[uid] = len(self.uid.data) self.uid.data.append(uid) for fld in self.fields: if fld.name != REFERENCE_UID: if fld.format == LIST_FORMAT: fld.data.append([]) else: fld.data.append(None) @rows_must_be_referenced class AOTFITSTableTime(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.timestamps = AOTField(TIME_TIMESTAMPS, LIST_FORMAT, UNIT_SECONDS) self.frame_numbers = AOTField(TIME_FRAME_NUMBERS, LIST_FORMAT, UNIT_COUNT) fields = AOTFieldList(self.uid, self.timestamps, self.frame_numbers) super().__init__(TIME_TABLE, fields, parent) def verify_contents(self): self._ensure_same_length(self.timestamps, self.frame_numbers, True) class AOTFITSTableAtmosphericParameters(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.wavelength = AOTField(ATMOSPHERIC_PARAMETERS_WAVELENGTH, FLOAT_FORMAT, UNIT_METERS) self.time_uid = AOTField(TIME_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=TIME_TABLE) self.r0 = AOTField(ATMOSPHERIC_PARAMETERS_R0, LIST_FORMAT, UNIT_METERS) self.seeing = AOTField(ATMOSPHERIC_PARAMETERS_SEEING, LIST_FORMAT, UNIT_ARCSEC) self.tau0 = AOTField(ATMOSPHERIC_PARAMETERS_TAU0, LIST_FORMAT, UNIT_SECONDS) self.theta0 = AOTField(ATMOSPHERIC_PARAMETERS_THETA0, LIST_FORMAT, UNIT_RADIANS) self.layers_rel_weight = AOTField(ATMOSPHERIC_PARAMETERS_LAYERS_REL_WEIGHT, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.layers_height = AOTField(ATMOSPHERIC_PARAMETERS_LAYERS_HEIGHT, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.layers_l0 = AOTField(ATMOSPHERIC_PARAMETERS_LAYERS_L0, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.layers_wind_speed = AOTField(ATMOSPHERIC_PARAMETERS_LAYERS_WIND_SPEED, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.layers_wind_direction = AOTField(ATMOSPHERIC_PARAMETERS_LAYERS_WIND_DIRECTION, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.transformation_matrix = AOTField(TRANSFORMATION_MATRIX, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) fields = AOTFieldList(self.uid, self.wavelength, self.time_uid, self.r0, self.seeing, self.tau0, self.theta0, self.layers_rel_weight, self.layers_height, self.layers_l0, self.layers_wind_speed, self.layers_wind_direction, self.transformation_matrix) super().__init__(ATMOSPHERIC_PARAMETERS_TABLE, fields, parent) @rows_must_be_referenced class AOTFITSTableAberrations(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.modes = AOTField(ABERRATION_MODES, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, reference=IMAGE_REF) self.coefficients = AOTField(ABERRATION_COEFFICIENTS, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, reference=IMAGE_REF) self.x_offsets = AOTField(ABERRATION_X_OFFSETS, LIST_FORMAT, UNIT_RADIANS) self.y_offsets = AOTField(ABERRATION_Y_OFFSETS, LIST_FORMAT, UNIT_RADIANS) fields = AOTFieldList(self.uid, self.modes, self.coefficients, self.x_offsets, self.y_offsets) super().__init__(ABERRATIONS_TABLE, fields, parent) def verify_contents(self): self._ensure_same_length(self.x_offsets, self.y_offsets) @rows_must_be_referenced class AOTFITSTableTelescopes(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.type = AOTField(TELESCOPE_TYPE, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, allowed_list=TELESCOPE_TYPE_LIST) self.latitude = AOTField(TELESCOPE_LATITUDE, FLOAT_FORMAT, UNIT_DEGREES) self.longitude = AOTField(TELESCOPE_LONGITUDE, FLOAT_FORMAT, UNIT_DEGREES) self.elevation = AOTField(TELESCOPE_ELEVATION, FLOAT_FORMAT, UNIT_DEGREES) self.azimuth = AOTField(TELESCOPE_AZIMUTH, FLOAT_FORMAT, UNIT_DEGREES) self.parallactic = AOTField(TELESCOPE_PARALLACTIC, FLOAT_FORMAT, UNIT_DEGREES) self.pupil_mask = AOTField(TELESCOPE_PUPIL_MASK, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.pupil_angle = AOTField(TELESCOPE_PUPIL_ANGLE, FLOAT_FORMAT, UNIT_RADIANS) self.enclosing_d = AOTField(TELESCOPE_ENCLOSING_D, FLOAT_FORMAT, UNIT_METERS) self.inscribed_d = AOTField(TELESCOPE_INSCRIBED_D, FLOAT_FORMAT, UNIT_METERS) self.obstruction_d = AOTField(TELESCOPE_OBSTRUCTION_D, FLOAT_FORMAT, UNIT_METERS) self.segment_type = AOTField(TELESCOPE_SEGMENTS_TYPE, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, allowed_list=TELESCOPE_SEGMENT_LIST) self.segment_size = AOTField(TELESCOPE_SEGMENTS_SIZE, FLOAT_FORMAT, UNIT_METERS) self.segments_x = AOTField(TELESCOPE_SEGMENTS_X, LIST_FORMAT, UNIT_METERS) self.segments_y = AOTField(TELESCOPE_SEGMENTS_Y, LIST_FORMAT, UNIT_METERS) self.transformation_matrix = AOTField(TRANSFORMATION_MATRIX, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.aberration_uid = AOTField(ABERRATION_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=ABERRATIONS_TABLE) fields = AOTFieldList(self.uid, self.type, self.latitude, self.longitude, self.elevation, self.azimuth, self.parallactic, self.pupil_mask, self.pupil_angle, self.enclosing_d, self.inscribed_d, self.obstruction_d, self.segment_type, self.segment_size, self.segments_x, self.segments_y, self.transformation_matrix, self.aberration_uid) self.main_telescope_uid = None super().__init__(TELESCOPES_TABLE, fields, parent) def read_from_bintable(self, hdu: fits.BinTableHDU): super().read_from_bintable(hdu) main_telescopes_list = [uid for uid, t in zip(self.uid.data, self.type.data) if t == _s.TELESCOPE_TYPE_MAIN] if (n := len(main_telescopes_list)) == 0: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Table {self.extname} does not contain a main telescope.") # If we get to this point, try to create a placeholder telescope just for functionality rows = len(self.uid.data) # Filling the mandatory fields self.main_telescope_uid = 'FAKE PLACEHOLDER MAIN TELESCOPE' self.uid.data.append(self.main_telescope_uid) self.type.data.append(_s.TELESCOPE_TYPE_MAIN) self.segment_type.data.append(_s.TELESCOPE_SEGMENT_TYPE_MONOLITHIC) for fld in self.fields: if not fld.mandatory: # Mandatory fields are handled above if fld.format == _s.LIST_FORMAT: fld.data.append([]) else: fld.data.append(None) self.uid_dict[self.main_telescope_uid] = rows self.uid_was_referenced[self.main_telescope_uid] = True else: if n > 1: self.add_error(AOTFITSErrorLevel.SERIOUS, f"Table {self.extname} contains more than one main telescope.") self.main_telescope_uid = main_telescopes_list[0] self.uid_was_referenced[self.main_telescope_uid] = True def verify_contents(self): # TODO if Monolithic we should reject segment coordinates self._ensure_same_length(self.segments_x, self.segments_y) class AOTFITSTableSources(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.type = AOTField(SOURCE_TYPE, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, allowed_list=SOURCE_TYPE_LIST) self.right_ascension = AOTField(SOURCE_RIGHT_ASCENSION, FLOAT_FORMAT, UNIT_DEGREES) self.declination = AOTField(SOURCE_DECLINATION, FLOAT_FORMAT, UNIT_DEGREES) self.elevation_offset = AOTField(SOURCE_ELEVATION_OFFSET, FLOAT_FORMAT, UNIT_DEGREES) self.azimuth_offset = AOTField(SOURCE_AZIMUTH_OFFSET, FLOAT_FORMAT, UNIT_DEGREES) self.fwhm = AOTField(SOURCE_FWHM, FLOAT_FORMAT, UNIT_RADIANS) fields = AOTFieldList(self.uid, self.type, self.right_ascension, self.declination, self.elevation_offset, self.azimuth_offset, self.fwhm) super().__init__(SOURCES_TABLE, fields, parent) def verify_contents(self): self._ensure_has_data() class AOTFITSTableSourcesSodiumLGS(AOTFITSSecondaryTable): # TODO verify that llt_uid is not a MainTelescope def __init__(self, parent: AOTFITSFile, main: AOTFITSTableSources): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.height = AOTField(SOURCE_SODIUM_LGS_HEIGHT, FLOAT_FORMAT, UNIT_METERS) self.profile = AOTField(SOURCE_SODIUM_LGS_PROFILE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.altitudes = AOTField(SOURCE_SODIUM_LGS_ALTITUDES, LIST_FORMAT, UNIT_METERS) self.llt_uid = AOTField(LASER_LAUNCH_TELESCOPE_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=TELESCOPES_TABLE) fields = AOTFieldList(self.uid, self.height, self.profile, self.altitudes, self.llt_uid) super().__init__(SOURCES_SODIUM_LGS_TABLE, fields, parent, main, _s.SOURCE_TYPE_SODIUM_LASER_GUIDE_STAR) class AOTFITSTableSourcesRayleighLGS(AOTFITSSecondaryTable): # TODO verify that llt_uid is not a MainTelescope def __init__(self, parent: AOTFITSFile, main: AOTFITSTableSources): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.distance = AOTField(SOURCE_RAYLEIGH_LGS_DISTANCE, FLOAT_FORMAT, UNIT_METERS) self.depth = AOTField(SOURCE_RAYLEIGH_LGS_DEPTH, FLOAT_FORMAT, UNIT_METERS) self.llt_uid = AOTField(LASER_LAUNCH_TELESCOPE_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=TELESCOPES_TABLE) fields = AOTFieldList(self.uid, self.distance, self.depth, self.llt_uid) super().__init__(SOURCES_RAYLEIGH_LGS_TABLE, fields, parent, main, _s.SOURCE_TYPE_RAYLEIGH_LASER_GUIDE_STAR) @rows_must_be_referenced class AOTFITSTableDetectors(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.type = AOTField(DETECTOR_TYPE, STRING_FORMAT, UNIT_DIMENSIONLESS) self.sampling_technique = AOTField(DETECTOR_SAMPLING_TECHNIQUE, STRING_FORMAT, UNIT_DIMENSIONLESS) self.shutter_type = AOTField(DETECTOR_SHUTTER_TYPE, STRING_FORMAT, UNIT_DIMENSIONLESS) self.flat_field = AOTField(DETECTOR_FLAT_FIELD, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.readout_noise = AOTField(DETECTOR_READOUT_NOISE, FLOAT_FORMAT, f'{UNIT_ELECTRONS}*{UNIT_SECONDS}^-1*{UNIT_PIXELS}^-1') self.pixel_intensities = AOTField(DETECTOR_PIXEL_INTENSITIES, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.field_centre_x = AOTField(DETECTOR_FIELD_CENTRE_X, FLOAT_FORMAT, UNIT_PIXELS) self.field_centre_y = AOTField(DETECTOR_FIELD_CENTRE_Y, FLOAT_FORMAT, UNIT_PIXELS) self.integration_time = AOTField(DETECTOR_INTEGRATION_TIME, FLOAT_FORMAT, UNIT_SECONDS) self.coadds = AOTField(DETECTOR_COADDS, INTEGER_FORMAT, UNIT_COUNT) self.dark = AOTField(DETECTOR_DARK, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.weight_map = AOTField(DETECTOR_WEIGHT_MAP, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.quantum_efficiency = AOTField(DETECTOR_QUANTUM_EFFICIENCY, FLOAT_FORMAT, UNIT_DIMENSIONLESS) self.pixel_scale = AOTField(DETECTOR_PIXEL_SCALE, FLOAT_FORMAT, f'{UNIT_RADIANS}*{UNIT_PIXELS}^-1') self.binning = AOTField(DETECTOR_BINNING, INTEGER_FORMAT, UNIT_COUNT) self.bandwidth = AOTField(DETECTOR_BANDWIDTH, FLOAT_FORMAT, UNIT_METERS) self.transmission_wavelength = AOTField(DETECTOR_TRANSMISSION_WAVELENGTH, LIST_FORMAT, UNIT_METERS) self.transmission = AOTField(DETECTOR_TRANSMISSION, LIST_FORMAT, UNIT_DIMENSIONLESS) self.sky_background = AOTField(DETECTOR_SKY_BACKGROUND, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.gain = AOTField(DETECTOR_GAIN, FLOAT_FORMAT, UNIT_ELECTRONS) self.excess_noise = AOTField(DETECTOR_EXCESS_NOISE, FLOAT_FORMAT, UNIT_ELECTRONS) self.filter = AOTField(DETECTOR_FILTER, STRING_FORMAT, UNIT_DIMENSIONLESS) self.bad_pixel_map = AOTField(DETECTOR_BAD_PIXEL_MAP, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.dynamic_range = AOTField(DETECTOR_DYNAMIC_RANGE, FLOAT_FORMAT, UNIT_DECIBELS) self.readout_rate = AOTField(DETECTOR_READOUT_RATE, FLOAT_FORMAT, f'{UNIT_PIXELS}*{UNIT_SECONDS}^-1') self.frame_rate = AOTField(DETECTOR_FRAME_RATE, FLOAT_FORMAT, f'{UNIT_FRAME}*{UNIT_SECONDS}^-1') self.transformation_matrix = AOTField(TRANSFORMATION_MATRIX, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) fields = AOTFieldList(self.uid, self.type, self.sampling_technique, self.shutter_type, self.flat_field, self.readout_noise, self.pixel_intensities, self.field_centre_x, self.field_centre_y, self.integration_time, self.coadds, self.dark, self.weight_map, self.quantum_efficiency, self.pixel_scale, self.binning, self.bandwidth, self.transmission_wavelength, self.transmission, self.sky_background, self.gain, self.excess_noise, self.filter, self.bad_pixel_map, self.dynamic_range, self.readout_rate, self.frame_rate, self.transformation_matrix) super().__init__(DETECTORS_TABLE, fields, parent) def verify_contents(self): self._ensure_same_length(self.transmission_wavelength, self.transmission) class AOTFITSTableScoringCameras(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.pupil_mask = AOTField(SCORING_CAMERA_PUPIL_MASK, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.wavelength = AOTField(SCORING_CAMERA_WAVELENGTH, FLOAT_FORMAT, UNIT_METERS) self.transformation_matrix = AOTField(TRANSFORMATION_MATRIX, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.detector_uid = AOTField(DETECTOR_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=DETECTORS_TABLE) self.aberration_uid = AOTField(ABERRATION_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=ABERRATIONS_TABLE) fields = AOTFieldList(self.uid, self.pupil_mask, self.wavelength, self.transformation_matrix, self.detector_uid, self.aberration_uid) super().__init__(SCORING_CAMERAS_TABLE, fields, parent) class AOTFITSTableWavefrontSensors(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.type = AOTField(WAVEFRONT_SENSOR_TYPE, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, allowed_list=WAVEFRONT_SENSOR_TYPE_LIST) self.source_uid = AOTField(SOURCE_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, reference=SOURCES_TABLE) self.dimensions = AOTField(WAVEFRONT_SENSOR_DIMENSIONS, INTEGER_FORMAT, UNIT_COUNT, mandatory=True) self.n_valid_subapertures = AOTField(WAVEFRONT_SENSOR_N_VALID_SUBAPERTURES, INTEGER_FORMAT, UNIT_COUNT, mandatory=True) self.measurements = AOTField(WAVEFRONT_SENSOR_MEASUREMENTS, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.ref_measurements = AOTField(WAVEFRONT_SENSOR_REF_MEASUREMENTS, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.subaperture_mask = AOTField(WAVEFRONT_SENSOR_SUBAPERTURE_MASK, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.mask_x_offsets = AOTField(WAVEFRONT_SENSOR_MASK_X_OFFSETS, LIST_FORMAT, UNIT_PIXELS) self.mask_y_offsets = AOTField(WAVEFRONT_SENSOR_MASK_Y_OFFSETS, LIST_FORMAT, UNIT_PIXELS) self.subaperture_size = AOTField(WAVEFRONT_SENSOR_SUBAPERTURE_SIZE, FLOAT_FORMAT, UNIT_PIXELS) self.subaperture_intensities = AOTField(WAVEFRONT_SENSOR_SUBAPERTURE_INTENSITIES, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.wavelength = AOTField(WAVEFRONT_SENSOR_WAVELENGTH, FLOAT_FORMAT, UNIT_METERS) self.optical_gain = AOTField(WAVEFRONT_SENSOR_OPTICAL_GAIN, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.transformation_matrix = AOTField(TRANSFORMATION_MATRIX, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.detector_uid = AOTField(DETECTOR_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=DETECTORS_TABLE) self.aberration_uid = AOTField(ABERRATION_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=ABERRATIONS_TABLE) self.ncpa_uid = AOTField(NCPA_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=ABERRATIONS_TABLE) fields = AOTFieldList(self.uid, self.type, self.source_uid, self.dimensions, self.n_valid_subapertures, self.measurements, self.ref_measurements, self.subaperture_mask, self.mask_x_offsets, self.mask_y_offsets, self.subaperture_size, self.subaperture_intensities, self.wavelength, self.optical_gain, self.transformation_matrix, self.detector_uid, self.aberration_uid, self.ncpa_uid) super().__init__(WAVEFRONT_SENSORS_TABLE, fields, parent) def verify_contents(self): # TODO "source" should not be a science star self._ensure_has_data() self._ensure_same_length(self.mask_x_offsets, self.mask_y_offsets) class AOTFITSTableWavefrontSensorsShackHartmann(AOTFITSSecondaryTable): # TODO # if dimensions != 2: # warnings.warn(f"Unexpected value for '{kw.WAVEFRONT_SENSOR_DIMENSIONS}' in wavefront sensor '{uid}'" # f" of type '{t}'. Expected 2, got {dimensions}.") def __init__(self, parent: AOTFITSFile, main: AOTFITSTableWavefrontSensors): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.centroiding_algorithm = AOTField(WAVEFRONT_SENSOR_SHACK_HARTMANN_CENTROIDING_ALGORITHM, STRING_FORMAT, UNIT_DIMENSIONLESS) self.centroid_gains = AOTField(WAVEFRONT_SENSOR_SHACK_HARTMANN_CENTROID_GAINS, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.spot_fwhm = AOTField(WAVEFRONT_SENSOR_SHACK_HARTMANN_SPOT_FWHM, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) fields = AOTFieldList(self.uid, self.centroiding_algorithm, self.centroid_gains, self.spot_fwhm) super().__init__(WAVEFRONT_SENSORS_SHACK_HARTMANN_TABLE, fields, parent, main, _s.WAVEFRONT_SENSOR_TYPE_SHACK_HARTMANN) class AOTFITSTableWavefrontSensorsPyramid(AOTFITSSecondaryTable): def __init__(self, parent: AOTFITSFile, main: AOTFITSTableWavefrontSensors): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.n_sides = AOTField(WAVEFRONT_SENSOR_PYRAMID_N_SIDES, INTEGER_FORMAT, UNIT_COUNT, mandatory=True) self.modulation = AOTField(WAVEFRONT_SENSOR_PYRAMID_MODULATION, FLOAT_FORMAT, UNIT_METERS) fields = AOTFieldList(self.uid, self.n_sides, self.modulation) super().__init__(WAVEFRONT_SENSORS_PYRAMID_TABLE, fields, parent, main, _s.WAVEFRONT_SENSOR_TYPE_PYRAMID) class AOTFITSTableWavefrontCorrectors(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.type = AOTField(WAVEFRONT_CORRECTOR_TYPE, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, allowed_list=WAVEFRONT_CORRECTOR_TYPE_LIST) self.telescope_uid = AOTField(TELESCOPE_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, reference=TELESCOPES_TABLE) self.n_valid_actuators = AOTField(WAVEFRONT_CORRECTOR_N_VALID_ACTUATORS, INTEGER_FORMAT, UNIT_COUNT) self.pupil_mask = AOTField(WAVEFRONT_CORRECTOR_PUPIL_MASK, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.tfz_num = AOTField(WAVEFRONT_CORRECTOR_TFZ_NUM, LIST_FORMAT, UNIT_DIMENSIONLESS) self.tfz_den = AOTField(WAVEFRONT_CORRECTOR_TFZ_DEN, LIST_FORMAT, UNIT_DIMENSIONLESS) self.transformation_matrix = AOTField(TRANSFORMATION_MATRIX, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.aberration_uid = AOTField(ABERRATION_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=ABERRATIONS_TABLE) fields = AOTFieldList(self.uid, self.type, self.telescope_uid, self.n_valid_actuators, self.pupil_mask, self.tfz_num, self.tfz_den, self.transformation_matrix, self.aberration_uid) super().__init__(WAVEFRONT_CORRECTORS_TABLE, fields, parent) def verify_contents(self): # TODO ensure that n_valid_actuators is 2 for TT and 1 for LS self._ensure_has_data() class AOTFITSTableWavefrontCorrectorsDM(AOTFITSSecondaryTable): def __init__(self, parent: AOTFITSFile, main: AOTFITSTableWavefrontCorrectors): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.actuators_x = AOTField(WAVEFRONT_CORRECTOR_DM_ACTUATORS_X, LIST_FORMAT, UNIT_METERS) self.actuators_y = AOTField(WAVEFRONT_CORRECTOR_DM_ACTUATORS_Y, LIST_FORMAT, UNIT_METERS) self.influence_function = AOTField(WAVEFRONT_CORRECTOR_DM_INFLUENCE_FUNCTION, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.stroke = AOTField(WAVEFRONT_CORRECTOR_DM_STROKE, FLOAT_FORMAT, UNIT_METERS) fields = AOTFieldList(self.uid, self.actuators_x, self.actuators_y, self.influence_function, self.stroke) super().__init__(WAVEFRONT_CORRECTORS_DM_TABLE, fields, parent, main, _s.WAVEFRONT_CORRECTOR_TYPE_DM) def verify_contents(self): # TOOD check that the length is equal to N_VALID_ACTUATORS self._ensure_same_length(self.actuators_x, self.actuators_y) class AOTFITSTableLoops(AOTFITSTable): def __init__(self, parent: AOTFITSFile): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.type = AOTField(LOOPS_TYPE, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, allowed_list=LOOPS_TYPE_LIST) self.commanded_uid = AOTField(LOOPS_COMMANDED, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, reference=WAVEFRONT_CORRECTORS_TABLE) self.time_uid = AOTField(TIME_REFERENCE, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=TIME_TABLE) self.status = AOTField(LOOPS_STATUS, STRING_FORMAT, UNIT_DIMENSIONLESS, allowed_list=LOOPS_STATUS_LIST) self.commands = AOTField(LOOPS_COMMANDS, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.ref_commands = AOTField(LOOPS_REF_COMMANDS, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.framerate = AOTField(LOOPS_FRAMERATE, FLOAT_FORMAT, UNIT_HERTZ) self.delay = AOTField(LOOPS_DELAY, FLOAT_FORMAT, UNIT_FRAME) self.time_filter_num = AOTField(LOOPS_TIME_FILTER_NUM, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.time_filter_den = AOTField(LOOPS_TIME_FILTER_DEN, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) fields = AOTFieldList(self.uid, self.type, self.commanded_uid, self.time_uid, self.status, self.commands, self.ref_commands, self.framerate, self.delay, self.time_filter_num, self.time_filter_den) super().__init__(LOOPS_TABLE, fields, parent) def verify_contents(self): self._ensure_has_data() class AOTFITSTableLoopsControl(AOTFITSSecondaryTable): def __init__(self, parent: AOTFITSFile, main: AOTFITSTableLoops): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.input_sensor_uid = AOTField(LOOPS_CONTROL_INPUT_SENSOR, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, reference=WAVEFRONT_SENSORS_TABLE) self.modes = AOTField(LOOPS_CONTROL_MODES, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.modal_coefficients = AOTField(LOOPS_CONTROL_MODAL_COEFFICIENTS, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.control_matrix = AOTField(LOOPS_CONTROL_CONTROL_MATRIX, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.measurements_to_modes = AOTField(LOOPS_CONTROL_MEASUREMENTS_TO_MODES, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.modes_to_commands = AOTField(LOOPS_CONTROL_MODES_TO_COMMANDS, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.interaction_matrix = AOTField(LOOPS_CONTROL_INTERACTION_MATRIX, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.commands_to_modes = AOTField(LOOPS_CONTROL_COMMANDS_TO_MODES, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.modes_to_measurements = AOTField(LOOPS_CONTROL_MODES_TO_MEASUREMENTS, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) self.residual_commands = AOTField(LOOPS_CONTROL_RESIDUAL_COMMANDS, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) fields = AOTFieldList(self.uid, self.input_sensor_uid, self.modes, self.modal_coefficients, self.control_matrix, self.measurements_to_modes, self.modes_to_commands, self.interaction_matrix, self.commands_to_modes, self.modes_to_measurements, self.residual_commands) super().__init__(LOOPS_CONTROL_TABLE, fields, parent, main, _s.LOOPS_TYPE_CONTROL) class AOTFITSTableLoopsOffload(AOTFITSSecondaryTable): def __init__(self, parent: AOTFITSFile, main: AOTFITSTableLoops): self.uid = AOTField(REFERENCE_UID, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, unique=True) self.input_corrector_uid = AOTField(LOOPS_OFFLOAD_INPUT_CORRECTOR, STRING_FORMAT, UNIT_DIMENSIONLESS, mandatory=True, reference=WAVEFRONT_CORRECTORS_TABLE) self.offload_matrix = AOTField(LOOPS_OFFLOAD_OFFLOAD_MATRIX, STRING_FORMAT, UNIT_DIMENSIONLESS, reference=IMAGE_REF) fields = AOTFieldList(self.uid, self.input_corrector_uid, self.offload_matrix) super().__init__(LOOPS_OFFLOAD_TABLE, fields, parent, main, _s.LOOPS_TYPE_OFFLOAD) def _fits_type_to_aot(fits_type: str) -> str: if fits_type in ['E', 'D']: return FLOAT_FORMAT if fits_type in ['B', 'I', 'J', 'K']: return INTEGER_FORMAT if re.fullmatch(r'\d*A', fits_type): return STRING_FORMAT if re.fullmatch(r'[QP][DE]\(\d*\)', fits_type): return LIST_FORMAT return '' def _convert_null_to_none(value, col: fits.Column): match _fits_type_to_aot(col.format): case _s.STRING_FORMAT: if value == '': value = None case _s.FLOAT_FORMAT: if np.isnan(value): value = None case _s.INTEGER_FORMAT: if col.null and value == col.null: value = None case _s.LIST_FORMAT: if value is None: value = [] value = [None if np.isnan(v) else v for v in value] case _: # This will trigger a warning later pass return value