# Source code for aotpy.translators.galacsi

"""
This module contains a class for translating data produced by ESO's GALACSI system.
"""

import importlib.resources
import warnings
import datetime
from pathlib import Path

import numpy as np
from astropy.io import fits

import aotpy
from aotpy.io.fits import image_from_fits_file
from .eso import ESOTranslator


# TODO set image units


class GALACSITranslator(ESOTranslator):
    """Contains functions for translating telemetry data produced by ESO's GALACSI system.

    Constructing an instance reads the LGS, IR and pixel FITS files from the given folders and
    populates ``self.system`` (an ``aotpy.AOSystem``) with the main telescope, sources,
    wavefront sensors, wavefront correctors and loops described by those files.

    Parameters
    ----------
    path_lgs
        Path to folder containing LGS data (LGSAcq, LGSCtr, LGSRecn, JitCtr, JitRecnOptimiser, RTC).
    path_ir
        Path to folder containing IR data (IRAcq, IRCtr, IRLoopMonitor).
    path_pix
        Path to folder containing pixel data.

    Notes
    -----
    The helpers ``_stack_slopes``, ``_image_from_eso_file`` and ``_get_pixel_data_from_table`` are
    not defined in this class; they are presumably inherited from ``ESOTranslator``.
    """

    def __init__(self, path_lgs: str, path_ir: str, path_pix: str):
        self.system = aotpy.AOSystem(ao_mode='LTAO', name='GALACSI')
        self.system.main_telescope = aotpy.MainTelescope(
            uid='ESO VLT UT4',
            enclosing_diameter=8.2,
            inscribed_diameter=8.2
        )

        # Order matters: the LGS step creates self.dsm and self.dsm_valid, which the NGS step reads.
        self._handle_lgs_data(path_lgs)
        self._handle_ngs_data(path_ir, path_pix)

    def _handle_lgs_data(self, path_lgs) -> None:
        """Read the LGS folder and register the DSM and the four LGS channels in ``self.system``.

        Sets ``self.dsm_valid`` and ``self.dsm`` and the system's ``date_beginning``/``date_end``.
        For each ``i`` in 1..4 it creates a laser launch telescope, a sodium laser guide star, a
        Shack-Hartmann sensor with its detector, a jitter mirror and a field steering mirror, and
        appends a high-order loop, a jitter loop and a jitter offload loop.

        Parameters
        ----------
        path_lgs
            Path to folder containing LGS data. The main telemetry table is read from
            ``<path_lgs>/<folder name>.fits``, extension ``LGSLoopFrame``.
        """
        path_lgs = Path(path_lgs)
        lgs_loop_frame = fits.getdata(path_lgs / f'{path_lgs.name}.fits', extname='LGSLoopFrame')

        # We have to subtract one because the array uses one-based indexing unlike Python
        self.dsm_valid = fits.getdata(path_lgs / 'RTC.USED_ACT_MAP.fits')[0] - 1

        self.dsm = aotpy.DeformableMirror(
            uid='DSM',
            telescope=self.system.main_telescope,
            n_valid_actuators=self.dsm_valid.size,
        )
        self.system.wavefront_correctors.append(self.dsm)

        # Timestamps are stored as separate seconds and microseconds columns.
        lgs_timestamps = lgs_loop_frame['Seconds'] + lgs_loop_frame['USeconds'] / 1.e6
        self.system.date_beginning = datetime.datetime.fromtimestamp(lgs_timestamps[0], datetime.UTC)
        self.system.date_end = datetime.datetime.fromtimestamp(lgs_timestamps[-1], datetime.UTC)

        lgs_frame_numbers = lgs_loop_frame['FrameCounter']
        lgs_time = aotpy.Time('LGS Loop Time', timestamps=lgs_timestamps.tolist(),
                              frame_numbers=lgs_frame_numbers.tolist())

        # The subaperture mask ships with the package rather than with the telemetry.
        galacsi_data_path = importlib.resources.files('aotpy.data.GALACSI')
        with importlib.resources.as_file(galacsi_data_path / 'subap.fits') as p:
            subaperture_mask = image_from_fits_file(p, name='LGS WFS SUBAPERTURE MASK')
            # Entries equal to -1 are counted as invalid subapertures.
            n_valid_subapertures = np.count_nonzero(subaperture_mask.data != -1)

        # Only the columns of the valid (used) actuators are kept.
        dsm_positions = aotpy.Image('DSM_positions', lgs_loop_frame['DSM_Positions'][:, self.dsm_valid])
        m2c = self._image_from_eso_file(path_lgs / 'LGSCtr.ACT_POS_MODAL_PROJECTION.fits')
        lgs_tfz_num = aotpy.Image('LGSCtr.A_TERMS', fits.getdata(path_lgs / 'LGSCtr.A_TERMS.fits').T)
        lgs_tfz_den = aotpy.Image('LGSCtr.B_TERMS', fits.getdata(path_lgs / 'LGSCtr.B_TERMS.fits').T)

        # Data shared by the four jitter channels; each channel takes its own slice below.
        jit_tfz_num = fits.getdata(path_lgs / 'JitCtr.A_TERMS.fits').T
        jit_tfz_den = fits.getdata(path_lgs / 'JitCtr.B_TERMS.fits').T
        jit_ref = fits.getdata(path_lgs / 'JitCtr.ACT_POS_REF_MAP_WITH_OFFSETS.fits')[:, 0]
        off_ref = fits.getdata(path_lgs / 'JitCtr.OACT_POS_REF_MAP.fits')[0, :]
        proj_map = fits.getdata(path_lgs / 'JitCtr.PROJ_MAP_SCALED.fits')
        # The reference interaction matrix is split into four equal parts along axis 0, one per WFS.
        im_list = np.split(fits.getdata(path_lgs / 'RTC.IMref4Atm.fits')[:, self.dsm_valid], 4, axis=0)

        for i in range(1, 5):
            llt = aotpy.LaserLaunchTelescope(f'LLT{i}')
            lgs = aotpy.SodiumLaserGuideStar(uid=f'LGS{i}', laser_launch_telescope=llt)
            self.system.sources.append(lgs)

            gradients = self._stack_slopes(lgs_loop_frame[f'WFS{i}_Gradients'], subap_axis=1)
            reference = self._stack_slopes(
                fits.getdata(path_lgs / f'LGSAcq.DET{i}.REFSLP_WITH_OFFSETS.fits'),
                subap_axis=1
            )[0]
            wfs = aotpy.ShackHartmann(
                uid=f'LGS WFS{i}',
                source=lgs,
                n_valid_subapertures=n_valid_subapertures,
                measurements=aotpy.Image(f'WFS{i}_Gradients', gradients),
                ref_measurements=aotpy.Image(f'LGSAcq.DET{i}.REFSLP_WITH_OFFSETS', reference),
                subaperture_mask=subaperture_mask,
                subaperture_intensities=aotpy.Image(f'WFS{i}_Intensities',
                                                    lgs_loop_frame[f'WFS{i}_Intensities'])
            )

            wfs.detector = aotpy.Detector(
                uid=f'LGS DET{i}',
                dark=self._image_from_eso_file(path_lgs / f'LGSAcq.DET{i}.DARK.fits'),
                weight_map=self._image_from_eso_file(path_lgs / f'LGSAcq.DET{i}.WEIGHT.fits')
            )
            self.system.wavefront_sensors.append(wfs)

            cm = self._stack_slopes(fits.getdata(path_lgs / f'LGSRecn.REC{i}.HOCM.fits')[self.dsm_valid],
                                    subap_axis=1)
            im = self._stack_slopes(im_list[i - 1], subap_axis=0)
            self.system.loops.append(aotpy.ControlLoop(
                uid=f'High-order loop {i}',
                input_sensor=wfs,
                commanded_corrector=self.dsm,
                commands=dsm_positions,
                time=lgs_time,
                framerate=1000,
                time_filter_num=lgs_tfz_num,
                time_filter_den=lgs_tfz_den,
                control_matrix=aotpy.Image(f'LGSRecn.REC{i}.HOCM', cm),
                interaction_matrix=aotpy.Image(f'Interaction_Matrix_LGS_WFS{i}', im)
            ))

            jit = aotpy.TipTiltMirror(
                uid=f'Jitter{i}',
                telescope=llt
            )
            cm = self._stack_slopes(fits.getdata(path_lgs / f'JitRecnOptimiser.JitCM{i}.fits'), subap_axis=1)
            # NOTE(review): the filter terms are sliced with [i - 1:i + 1], which overlaps between
            # consecutive channels, whereas every other per-channel slice in this loop uses
            # [(i - 1) * 2:i * 2]. Left unchanged; confirm against the JitCtr.*_TERMS layout.
            # NOTE(review): modes_to_commands below is the LGSCtr modal projection; confirm that it
            # is meant for the jitter loop rather than the high-order loop.
            self.system.loops.append(aotpy.ControlLoop(
                uid=f'Jitter loop {i}',
                input_sensor=wfs,
                commanded_corrector=jit,
                commands=aotpy.Image(f'Jitter{i}_Positions', lgs_loop_frame[f'Jitter{i}_Positions']),
                ref_commands=aotpy.Image(f'Jit{i}Ctr.ACT_POS_REF_MAP_WITH_OFFSETS',
                                         jit_ref[(i - 1) * 2: i * 2]),
                time=lgs_time,
                framerate=1000,
                time_filter_num=aotpy.Image(f'Jit{i}Ctr.A_TERMS', jit_tfz_num[i - 1:i + 1, :]),
                time_filter_den=aotpy.Image(f'Jit{i}Ctr.B_TERMS', jit_tfz_den[i - 1:i + 1, :]),
                control_matrix=aotpy.Image(f'JitRecnOptimiser.JitCM{i}', cm),
                modes_to_commands=m2c
            ))

            fsm = aotpy.TipTiltMirror(
                uid=f'Field Steering Mirror {i}',
                telescope=llt
            )
            self.system.loops.append(aotpy.OffloadLoop(
                uid=f'Jitter Offload loop {i}',
                input_corrector=jit,
                commanded_corrector=fsm,
                commands=aotpy.Image(f'Jitter{i}_Offload', lgs_loop_frame[f'Jitter{i}_Offload']),
                ref_commands=aotpy.Image(f'Jit{i}Ctr.OACT_POS_REF_MAP', off_ref[(i - 1) * 2: i * 2]),
                time=lgs_time,
                # Each channel uses its own 2x2 diagonal block of the projection map.
                offload_matrix=aotpy.Image(f'Jitter{i}_Offload_Matrix',
                                           proj_map[(i - 1) * 2:i * 2, (i - 1) * 2:i * 2])
            ))
            self.system.wavefront_correctors.extend([jit, fsm])

    def _handle_ngs_data(self, path_ir, path_pix) -> None:
        """Read the IR and pixel folders and register the NGS channel in ``self.system``.

        Creates the natural guide star, its Shack-Hartmann sensor and detector (including the
        pixel intensities), and appends the low-order loop that commands ``self.dsm``.

        Must be called after ``_handle_lgs_data``, because it reads ``self.dsm`` and
        ``self.dsm_valid``.

        Parameters
        ----------
        path_ir
            Path to folder containing IR data. The telemetry table is read from
            ``<path_ir>/<folder name>.fits``, extension ``IRLoopFrame``.
        path_pix
            Path to folder containing pixel data. The pixel table is read from
            ``<path_pix>/<folder name>.fits``, extension ``IRPixelFrame``.

        Warns
        -----
        UserWarning
            If the pixel frame counters are not contained in the IR frame counter range.
        """
        path_ir = Path(path_ir)
        ir_loop_frame = fits.getdata(path_ir / f'{path_ir.name}.fits', extname='IRLoopFrame')
        path_pix = Path(path_pix)
        pix_loop_frame = fits.getdata(path_pix / f'{path_pix.name}.fits', extname='IRPixelFrame')

        ngs_timestamps = ir_loop_frame['Seconds'] + ir_loop_frame['USeconds'] / 1.e6
        if np.all(ngs_timestamps == 0):
            # The file has no timestamps
            ngs_timestamps_list = []
        else:
            ngs_timestamps_list = ngs_timestamps.tolist()

        ho_frame_numbers = ir_loop_frame['HOFrameCounter']
        ir_time = aotpy.Time('NGS Loop Time', timestamps=ngs_timestamps_list,
                             frame_numbers=ho_frame_numbers.tolist())

        ngs = aotpy.NaturalGuideStar('NGS')
        self.system.sources.append(ngs)

        gradients = self._stack_slopes(ir_loop_frame['WFS_Gradients'], subap_axis=1)
        reference = self._stack_slopes(fits.getdata(path_ir / 'IRAcq.DET1.REFSLP_WITH_OFFSETS.fits'),
                                       subap_axis=1)[0]
        ngs_wfs = aotpy.ShackHartmann(
            uid='NGS WFS1',
            n_valid_subapertures=4,  # All subapertures are valid
            subaperture_mask=aotpy.Image('NGS WFS SUBAPERTURE MASK', np.array([[1, 3], [2, 4]])),
            source=ngs,
            measurements=aotpy.Image('NGS WFS_Gradients', gradients),
            ref_measurements=aotpy.Image('IRAcq.DET1.REFSLP_WITH_OFFSETS', reference),
            subaperture_intensities=aotpy.Image('WFS_Intensities', ir_loop_frame['WFS_Intensities'])
        )
        self.system.wavefront_sensors.append(ngs_wfs)

        # Find the indexes where the counter for pixels matches the counter for ngs
        # Assumes the frames for pixel are contained in the frames for ngs
        ir_fc = ir_loop_frame['FrameCounter']
        pix_fc = pix_loop_frame['FrameCounter']
        if pix_fc[0] < ir_fc[0] or pix_fc[-1] > ir_fc[-1]:
            warnings.warn('Pixel frame counter not contained in IR frame counter, pixel time data may be incorrect.')
        # searchsorted returns len(ir_fc) for counters beyond the last IR frame, which would raise
        # IndexError below even though the warning above signals the intent to continue. Clamp
        # those indexes to the last valid position; in-range counters are unaffected.
        pix_time_mask = np.minimum(np.searchsorted(ir_fc, pix_fc), len(ir_fc) - 1)

        pix_timestamps = ngs_timestamps[pix_time_mask]
        if np.all(pix_timestamps == 0):
            # The file has no timestamps
            pix_timestamps_list = []
        else:
            pix_timestamps_list = pix_timestamps.tolist()

        pix_time = aotpy.Time('Pixel Time', timestamps=pix_timestamps_list,
                              frame_numbers=ho_frame_numbers[pix_time_mask].tolist())

        ngs_wfs.detector = aotpy.Detector(
            uid='NGS DET1',
            dark=self._image_from_eso_file(path_ir / 'IRAcq.DET1.DARK.fits'),
            weight_map=self._image_from_eso_file(path_ir / 'IRAcq.DET1.WEIGHT.fits'),
            pixel_intensities=aotpy.Image(name='NGS Pixels',
                                          data=self._get_pixel_data_from_table(pix_loop_frame),
                                          time=pix_time)
        )

        s2m = self._stack_slopes(fits.getdata(path_ir / 'IRCtr.SENSOR_2_MODES.fits'), subap_axis=1)
        # Only the rows of the valid (used) actuators are kept.
        m2c = fits.getdata(path_ir / 'IRCtr.MODES_2_ACT.fits')[self.dsm_valid]
        self.system.loops.append(aotpy.ControlLoop(
            uid='Low-order loop',
            input_sensor=ngs_wfs,
            commanded_corrector=self.dsm,
            commands=aotpy.Image('LO_Positions', ir_loop_frame['LO_Positions'][:, self.dsm_valid]),
            measurements_to_modes=aotpy.Image('IRCtr.SENSOR_2_MODES', s2m),
            modes_to_commands=aotpy.Image('IRCtr.MODES_2_ACT', m2c),
            time=ir_time,
            time_filter_num=aotpy.Image('IRCtr.A_TERMS', fits.getdata(path_ir / 'IRCtr.A_TERMS.fits').T),
            time_filter_den=aotpy.Image('IRCtr.B_TERMS', fits.getdata(path_ir / 'IRCtr.B_TERMS.fits').T)
        ))

    def _get_eso_telescope_name(self) -> str:
        """Return the fixed telescope name string ``'ESO-VLT-U4'``."""
        return 'ESO-VLT-U4'

    def _get_eso_ao_name(self) -> str:
        """Return the fixed AO system name string ``'GALACSI'``."""
        return 'GALACSI'

    def _get_run_id(self) -> str:
        """Return the fixed run identifier string ``'60.A-9278(B)'``."""
        return '60.A-9278(B)'

    def _get_chip_id(self) -> str:
        """Return the fixed chip identifier string ``'GALACSI'``."""
        return 'GALACSI'