Source code for aotpy.translators.gpao

"""
This module contains a class for translating data produced by ESO's GRAVITY+ AO system.
"""

import re
import datetime
from pathlib import Path

import numpy as np
from astropy.io import fits

import aotpy
from .eso import ESOTranslator


# TODO set image units


[docs] class GPAOTranslator(ESOTranslator): """Contains functions for translating telemetry data produced by ESO's GRAVITY+ AO system. Parameters ---------- path_ngs_loop Path to folder containing NGS loop data. path_ngs_pix Path to folder containing NGS pixel data. ut_number : {1, 2, 3, 4} Number of the UT that produced the data. """ def __init__(self, path_ngs_loop: str, path_ngs_pix: str, ut_number: int): self._ut_number = ut_number # TODO support LGS modes """ GPAO has 4 possible modes: NGS_VIS, NGS_IR, LGS_VIS, LGS_IR. Depending on the mode we are using different pipeline and different SH configurations All LGS modes have a 30x30 LGS SH + one NGS SH (depending on VIS/IR see below) All IR modes have one 9x9 NGS SH (HOIR/LOIR pipelines both use the CIAO WFS) VIS modes either have a 40x40 NGS SH (if NGS mode, HO pipeline) or 4x4 (if LGS mode, LO pipeline) """ # Get the NGS pipeline, should be HO/HOIR/LO/LOIR pipeline = re.match(r".*\d{8}T\d{6}-(?P<Pipeline>.*)LoopRecorder", path_ngs_loop).group("Pipeline") path_ngs_loop = Path(path_ngs_loop) # TODO ao_mode differs if LGS self.system = aotpy.AOSystem(name='GPAO', ao_mode='SCAO') self.system.main_telescope = aotpy.MainTelescope( uid=f'ESO VLT UT{ut_number}', enclosing_diameter=8.2, inscribed_diameter=8.2 ) act_valid_map = fits.getdata(path_ngs_loop / 'RTC.ACT_VALID_MAP.fits')[0] dm = aotpy.DeformableMirror( uid='DM', telescope=self.system.main_telescope, n_valid_actuators=act_valid_map.size, ) self.system.wavefront_correctors.append(dm) ngs_loop_frame = fits.getdata(path_ngs_loop / f'{path_ngs_loop.name}.fits', extname=f'{pipeline}LoopFrame') ngs_time = self._get_time_from_table(f"{pipeline} Loop Time", ngs_loop_frame) self.system.date_beginning = datetime.datetime.fromtimestamp(ngs_time.timestamps[0], datetime.UTC) self.system.date_end = datetime.datetime.fromtimestamp(ngs_time.timestamps[-1], datetime.UTC) ngs = aotpy.NaturalGuideStar('NGS') self.system.sources.append(ngs) subap_valid_map = fits.getdata(path_ngs_loop / f'{pipeline}RecnOptimiser.SUBAP_VALID_MAP.fits')[0] gradients = self._stack_slopes(ngs_loop_frame['Gradients'], subap_axis=1)[:, :, subap_valid_map] ngs_wfs = aotpy.ShackHartmann( uid='NGS WFS', source=ngs, n_valid_subapertures=subap_valid_map.size, measurements=aotpy.Image('NGS Gradients', gradients), ref_measurements=self._image_from_object(path_ngs_loop, f'{pipeline}Acq.DET1.REFSLP_WITH_OFFSETS', ngs_time, subap_axis=1, subap_mask=subap_valid_map), subaperture_intensities=aotpy.Image('NGS Intensities', ngs_loop_frame['Intensities'][:, subap_valid_map]) ) self.system.wavefront_sensors.append(ngs_wfs) path_ngs_pix = Path(path_ngs_pix) ngs_pix_frame = fits.getdata(path_ngs_pix / f'{path_ngs_pix.name}.fits', extname=f'{pipeline}PixelFrame') ngs_wfs.detector = aotpy.Detector( uid='DET1', pixel_intensities=aotpy.Image(name=f'{pipeline} Pixels', data=self._get_pixel_data_from_table(ngs_pix_frame), time=aotpy.Time(f'{pipeline} Pixel Time', frame_numbers=ngs_pix_frame['FrameCounter'].tolist())) ) # ngs_wfs.subaperture_size = ngs_wfs.detector.pixel_intensities.data.shape[0] // ngs_wfs.subaperture_mask.data.shape[0] loop_freq = fits.getheader(path_ngs_loop / f'{pipeline}Ctr.CFG.DYNAMIC.fits')['LOOP_FREQ'] self.system.loops.append(aotpy.ControlLoop( uid=f'{pipeline} loop', input_sensor=ngs_wfs, commanded_corrector=dm, commands=aotpy.Image('DM_Positions', ngs_loop_frame['DM_Positions'][:, act_valid_map]), ref_commands=self._image_from_object(path_ngs_loop, 'RTC.ACT_POS_REF_MAP_WITH_OFFSETS', ngs_time, act_axis=0, act_mask=act_valid_map), time=ngs_time, framerate=loop_freq, time_filter_num=aotpy.Image(f'{pipeline}Ctr.A_TERMS', fits.getdata(path_ngs_loop / f'{pipeline}Ctr.A_TERMS.fits').T), time_filter_den=aotpy.Image(f'{pipeline}Ctr.B_TERMS', fits.getdata(path_ngs_loop / f'{pipeline}Ctr.B_TERMS.fits').T), control_matrix=self._image_from_object(path_ngs_loop, f'{pipeline}Recn.REC1.HOCM', ngs_time, subap_axis=1, subap_mask=subap_valid_map, act_axis=0, act_mask=act_valid_map), measurements_to_modes=self._image_from_object(path_ngs_loop, 'CLMatrixOptimiser.S2M', ngs_time, subap_axis=1, subap_mask=subap_valid_map), modes_to_commands=self._image_from_object(path_ngs_loop, 'CLMatrixOptimiser.M2V', ngs_time, act_axis=0, act_mask=act_valid_map), interaction_matrix=self._image_from_object(path_ngs_loop, f'{pipeline}SynthIMComp.IM', ngs_time, subap_axis=0, subap_mask=subap_valid_map, act_axis=1, act_mask=act_valid_map), commands_to_modes=self._image_from_object(path_ngs_loop, 'CLMatrixOptimiser.V2M', ngs_time, act_axis=1, act_mask=act_valid_map), modes_to_measurements=self._image_from_object(path_ngs_loop, 'CLMatrixOptimiser.M2S', ngs_time, subap_axis=0, subap_mask=subap_valid_map), )) m2 = aotpy.TipTiltMirror( uid='M2', telescope=self.system.main_telescope ) self.system.wavefront_correctors.append(m2) ttol_pos = ngs_loop_frame['TTOL_Positions'] ttol_updated = ngs_loop_frame['TTOL_OUpdated'] == 1 ttol_updated[0] = True # always keep the first position ttol_pos = ttol_pos[ttol_updated] self.system.loops.append(aotpy.OffloadLoop( uid=f'TTOL (M2 tip-tilt offload)', input_corrector=dm, commanded_corrector=m2, commands=aotpy.Image('TTOL_Positions', ttol_pos), time=aotpy.Time('TTOL time', frame_numbers=ngs_loop_frame['FrameCounter'][ttol_updated].tolist()) )) avc_term = ngs_loop_frame['AVC_Term'] # Only add AVC loop if it was enabled if not np.all(avc_term == 0): self.system.loops.append(aotpy.ControlLoop( uid=f'{pipeline} AVC loop', input_sensor=ngs_wfs, commanded_corrector=dm, commands=aotpy.Image('AVC_Term', avc_term), time=ngs_time, framerate=loop_freq )) @staticmethod def _get_pixscale_from_pipeline(pipeline): return {'HO': 0.42, 'LO': 0.21, 'HOIR': 0.51, 'LOIR': 0.51, 'LGS': 0.8}[pipeline] @staticmethod def _get_subapsize_from_pipeline(pipeline): return {'HO': 6, 'LO': 12, 'HOIR': 8, 'LOIR': 8, 'LGS': 8}[pipeline] @staticmethod def _get_wavelength_from_pipeline(pipeline): return {'HO': 0.850, 'LO': 0.850, 'HOIR': 2.100, 'LOIR': 2.100, 'LGS': 0.589}[pipeline] def _get_eso_telescope_name(self) -> str: return f'ESO-VLT-U{self._ut_number}' def _get_eso_ao_name(self) -> str: return 'GPAO' def _get_run_id(self) -> str: # It should be something like '60.A-9278(F)' raise NotImplementedError("Run ID not yet decided") def _get_chip_id(self) -> str: return f'GPAO{self._ut_number}'