"""
This module contains a class for translating data produced by ESO's ERIS system.
"""
import importlib.resources
import warnings
import datetime
from pathlib import Path
import numpy as np
from astropy.io import fits
import aotpy
from aotpy.io.fits import image_from_fits_file
from .eso import ESOTranslator
# TODO set image units
[docs]
class ERISTranslator(ESOTranslator):
"""Contains functions for translating telemetry data produced by ESO's ERIS system.
Parameters
----------
path
Path to folder containing all system data.
"""
def __init__(self, path):
self._path = Path(path)
if (ho_loop_file := self._find_bintable_files('hoLoopData')) is not None \
and (ho_pixel_file := self._find_bintable_files('hoPixelData')) is not None:
mode = 'SCAO'
handling_function = self._handle_ngs_data
function_args = (ho_loop_file, ho_pixel_file)
elif (lgs_loop_file := self._find_bintable_files('lgsLoopData')) is not None \
and (lo_loop_file := self._find_bintable_files('loLoopData')) is not None \
and (lgs_pixel_file := self._find_bintable_files('lgsPixelData')) is not None \
and (lo_pixel_file := self._find_bintable_files('loPixelData')) is not None:
mode = 'LTAO'
handling_function = self._handle_lgs_data
function_args = (lgs_loop_file, lgs_pixel_file, lo_loop_file, lo_pixel_file)
else:
raise ValueError('Path does not contain necessary telemetry data.')
self.system = aotpy.AOSystem(name='ERIS AO', ao_mode=mode)
self.system.main_telescope = aotpy.MainTelescope(
uid='ESO VLT UT4',
enclosing_diameter=8.2,
inscribed_diameter=8.2
)
self.dsm = aotpy.DeformableMirror(
uid='DSM',
telescope=self.system.main_telescope,
n_valid_actuators=1156,
)
self.system.wavefront_correctors.append(self.dsm)
handling_function(*function_args)
def _find_bintable_files(self, name):
file = list(self._path.glob(f'{name}_*.fits'))
if (size := len(file)) == 0:
return None
else:
file = file[0]
if size > 1:
warnings.warn(f"Found more than one '{name}' file. Picking '{file.name}'.")
return file
def _handle_lgs_data(self, lgs_loop_file, lgs_pixel_file, lo_loop_file, lo_pixel_file):
lgs_loop_frame = fits.getdata(lgs_loop_file, extname='LGSLoopFrame')
lgs_pix_frame = fits.getdata(lgs_pixel_file, extname='LGSPixelFrame')
lgs_timestamps = lgs_loop_frame['Seconds'] + lgs_loop_frame['USeconds'] / 1.e6
self.system.date_beginning = datetime.datetime.fromtimestamp(lgs_timestamps[0], datetime.UTC)
self.system.date_end = datetime.datetime.fromtimestamp(lgs_timestamps[-1], datetime.UTC)
lgs_frame_numbers = lgs_loop_frame['FrameCounter']
lgs_time = aotpy.Time('LGS Loop Time', timestamps=lgs_timestamps.tolist(),
frame_numbers=lgs_frame_numbers.tolist())
active_laser = fits.getheader(self._path / 'JitCtr.CFG.DYNAMIC.fits')['ACTIVE_JITTER']
llt = aotpy.LaserLaunchTelescope(f'LLT{active_laser}')
lgs = aotpy.SodiumLaserGuideStar(uid='LGS', laser_launch_telescope=llt)
self.system.sources.append(lgs)
eris_data_path = importlib.resources.files('aotpy.data.ERIS')
with importlib.resources.as_file(eris_data_path / 'ho_subap.fits') as p:
subaperture_mask = image_from_fits_file(p, name='LGS WFS SUBAPERTURE MASK')
n_valid_subapertures = np.count_nonzero(subaperture_mask.data != -1)
reference = self._stack_slopes(fits.getdata(self._path / 'LGSAcq.DET1.REFSLP_WITH_OFFSETS.fits'),
subap_axis=1)[0]
lgs_wfs = aotpy.ShackHartmann(
uid='LGS WFS',
source=lgs,
n_valid_subapertures=n_valid_subapertures,
measurements=aotpy.Image('LGS Gradients', self._stack_slopes(lgs_loop_frame['Gradients'], subap_axis=1)),
ref_measurements=aotpy.Image('LGSAcq.DET1.REFSLP_WITH_OFFSETS', reference),
subaperture_mask=subaperture_mask,
mask_offsets=[aotpy.Coordinates(0, 0)],
subaperture_intensities=aotpy.Image('LGS Intensities', lgs_loop_frame['Intensities'])
)
self.system.wavefront_sensors.append(lgs_wfs)
lgs_wfs.detector = aotpy.Detector(
uid='LGS DET1',
dark=self._image_from_eso_file(self._path / 'LGSAcq.DET1.DARK.fits'),
weight_map=self._image_from_eso_file(self._path / 'LGSAcq.DET1.WEIGHT.fits'),
sky_background=self._image_from_eso_file(self._path / 'LGSAcq.DET1.BACKGROUND.fits'),
pixel_intensities=aotpy.Image(name='LGS Pixels',
data=self._get_pixel_data_from_table(lgs_pix_frame),
time=aotpy.Time('LGS Pixel Time',
frame_numbers=lgs_pix_frame['FrameCounter'].tolist()))
)
lgs_wfs.subaperture_size = \
lgs_wfs.detector.pixel_intensities.data.shape[0] // lgs_wfs.subaperture_mask.data.shape[0]
cm = self._stack_slopes(fits.getdata(self._path / f'LGSRecn.REC1.HOCM{active_laser}.fits'), subap_axis=1)
s2m = self._stack_slopes(fits.getdata(self._path / 'CLMatrixOptimiser.S2M.fits'), subap_axis=1)
m2s = self._stack_slopes(fits.getdata(self._path / 'CLMatrixOptimiser.M2S.fits'), subap_axis=0)
im = self._stack_slopes(fits.getdata(self._path / 'CLMatrixOptimiser.SYNTHETIC_LGS_IM.fits'), subap_axis=0)
lgs_freq = fits.getheader(self._path / 'LGSDet.CFG.DYNAMIC.fits')['FREQ']
self.system.loops.append(aotpy.ControlLoop(
uid='High-order loop',
input_sensor=lgs_wfs,
commanded_corrector=self.dsm,
commands=aotpy.Image('DSM_positions', lgs_loop_frame['DSM_Positions']),
ref_commands=aotpy.Image('LGSCtr.ACT_POS_REF_MAP_WITH_OFFSETS',
fits.getdata(self._path / 'LGSCtr.ACT_POS_REF_MAP_WITH_OFFSETS.fits')[:, 0]),
time=lgs_time,
framerate=lgs_freq,
time_filter_num=aotpy.Image('LGSCtr.A_TERMS', fits.getdata(self._path / 'LGSCtr.A_TERMS.fits').T),
time_filter_den=aotpy.Image('LGSCtr.B_TERMS', fits.getdata(self._path / 'LGSCtr.B_TERMS.fits').T),
control_matrix=aotpy.Image(f'LGSRecn.REC1.HOCM{active_laser}', cm),
measurements_to_modes=aotpy.Image('CLMatrixOptimiser.S2M', s2m),
modes_to_commands=self._image_from_eso_file(self._path / 'CLMatrixOptimiser.M2V.fits'),
commands_to_modes=self._image_from_eso_file(self._path / 'CLMatrixOptimiser.V2M.fits'),
modes_to_measurements=aotpy.Image('CLMatrixOptimiser.M2S', m2s),
interaction_matrix=aotpy.Image('CLMatrixOptimiser.SYNTHETIC_LGS_IM', im),
))
jit = aotpy.TipTiltMirror(
uid='Jitter',
telescope=llt
)
cm = self._stack_slopes(fits.getdata(self._path / 'JitRecnOptimiser.JitCM.fits'), subap_axis=1)
im = self._stack_slopes(fits.getdata(self._path / 'JitRecnCalibrat.IM.fits'), subap_axis=0)
# jit_ref = fits.getdata(path_lgs / 'JitCtr.ACT_POS_REF_MAP_WITH_OFFSETS.fits')[:, 0]
self.system.loops.append(aotpy.ControlLoop(
uid='Jitter loop',
input_sensor=lgs_wfs,
commanded_corrector=jit,
commands=aotpy.Image('Jitter_Positions', lgs_loop_frame['Jitter_Positions']),
# ref_commands=aotpy.Image(f'Jit{i}Ctr.ACT_POS_REF_MAP_WITH_OFFSETS', jit_ref[(i - 1) * 2: i * 2]),
time=lgs_time,
framerate=lgs_freq,
control_matrix=aotpy.Image('JitRecnOptimiser.JitCM', cm),
interaction_matrix=aotpy.Image('JitRecnCalibrat.IM', im)
))
fsm = aotpy.TipTiltMirror(
uid='Field Steering Mirror',
telescope=llt
)
# off_ref = fits.getdata(path_lgs / 'JitCtr.OACT_POS_REF_MAP.fits')[0, :]
# proj_map = fits.getdata(path_lgs / f'JitCtr.PROJ_MAP_SCALED.fits')
self.system.loops.append(aotpy.OffloadLoop(
uid='Jitter Offload loop',
input_corrector=jit,
commanded_corrector=fsm,
commands=aotpy.Image('Jitter_Offload', lgs_loop_frame['Jitter_Offload']),
# ref_commands=aotpy.Image(f'Jit{i}Ctr.OACT_POS_REF_MAP', off_ref[(i - 1) * 2: i * 2]),
time=lgs_time,
framerate=lgs_freq,
# offload_matrix=aotpy.Image(f'Jitter{i}_Offload_Matrix', proj_map[(i - 1) * 2:i * 2, (i - 1) * 2:i * 2])
))
self.system.wavefront_correctors.extend([jit, fsm])
ngs = aotpy.NaturalGuideStar(uid='NGS')
self.system.sources.append(ngs)
lo_loop_frame = fits.getdata(lo_loop_file, extname='LOLoopFrame')
lo_pix_frame = fits.getdata(lo_pixel_file, extname='LOPixelFrame')
lo_timestamps = lo_loop_frame['Seconds'] + lo_loop_frame['USeconds'] / 1.e6
if np.all(lo_timestamps == 0):
# The file has no timestamps
lo_timestamps_list = []
else:
lo_timestamps_list = lo_timestamps.tolist()
ho_frame_numbers = lo_loop_frame['HO_FrameCounter']
lo_time = aotpy.Time('LO Loop Time', timestamps=lo_timestamps_list, frame_numbers=ho_frame_numbers.tolist())
with importlib.resources.as_file(eris_data_path / 'lo_subap.fits') as p:
subaperture_mask = image_from_fits_file(p, name='LO WFS SUBAPERTURE MASK')
n_valid_subapertures = np.count_nonzero(subaperture_mask.data != -1)
reference = self._stack_slopes(fits.getdata(self._path / 'LOAcq.DET1.REFSLP_WITH_OFFSETS.fits'),
subap_axis=1)[0]
lo_wfs = aotpy.ShackHartmann(
uid='LO WFS',
source=ngs,
n_valid_subapertures=n_valid_subapertures,
measurements=aotpy.Image('LO Gradients', self._stack_slopes(lo_loop_frame['Gradients'], subap_axis=1)),
ref_measurements=aotpy.Image('LOAcq.DET1.REFSLP_WITH_OFFSETS', reference),
subaperture_mask=subaperture_mask,
mask_offsets=[aotpy.Coordinates(0, 0)],
subaperture_intensities=aotpy.Image('LO Intensities', lo_loop_frame['Intensities'])
)
lo_pix_fc = lo_pix_frame['FrameCounter']
aux_fc = np.full_like(lo_pix_fc, -1, dtype=np.int32)
x = 0
for i, fc_loop in enumerate(lo_loop_frame['FrameCounter']):
for j, fc_pix in enumerate(lo_pix_fc[x:]):
if fc_pix > fc_loop:
# we're already past the place
x += j
break
if fc_pix == fc_loop:
x += j
aux_fc[x] = i
break
else:
break
where = np.where(aux_fc != -1)
mask = aux_fc[where]
masked_lgs = lgs_frame_numbers[mask]
step = int((masked_lgs[-1] - masked_lgs[0]) / (masked_lgs.size - 1))
aux_fc[where] = masked_lgs
first = where[0][0]
diff = 0
while True:
diff += 1
cur = first - diff
if cur < 0:
break
aux_fc[cur] = masked_lgs[0] - diff * step
last = where[0][-1]
diff = 0
while True:
diff += 1
cur = last + diff
if cur > aux_fc.size - 1:
break
aux_fc[cur] = masked_lgs[-1] + diff * step
lo_wfs.detector = aotpy.Detector(
uid='LO DET1',
dark=self._image_from_eso_file(self._path / 'LOAcq.DET1.DARK.fits'),
weight_map=self._image_from_eso_file(self._path / 'LOAcq.DET1.WEIGHT.fits'),
sky_background=self._image_from_eso_file(self._path / 'LOAcq.DET1.BACKGROUND.fits'),
pixel_intensities=aotpy.Image(name='LO Pixels',
data=self._get_pixel_data_from_table(lo_pix_frame),
time=aotpy.Time('LO Pixel Time',
frame_numbers=aux_fc.tolist()))
)
self.system.wavefront_sensors.append(lo_wfs)
lo_wfs.subaperture_size = \
lo_wfs.detector.pixel_intensities.data.shape[0] // lo_wfs.subaperture_mask.data.shape[0]
s2m = self._stack_slopes(fits.getdata(self._path / 'LOCtr.SENSOR_2_MODES.fits'), subap_axis=1)
self.system.loops.append(aotpy.ControlLoop(
uid='Low-order loop',
input_sensor=lo_wfs,
commanded_corrector=self.dsm,
commands=aotpy.Image('LO_DSM_positions', lo_loop_frame['LO_DSM_positions']),
modal_coefficients=aotpy.Image('LO_Modal', lo_loop_frame['LO_Modal']),
# ref_commands=aotpy.Image('LGSCtr.ACT_POS_REF_MAP_WITH_OFFSETS',
# fits.getdata(path_lgs / 'LGSCtr.ACT_POS_REF_MAP_WITH_OFFSETS.fits')[:, 0]),
time=lo_time,
framerate=fits.getheader(self._path / 'LODet.CFG.DYNAMIC.fits')['FREQ'],
time_filter_num=aotpy.Image('LOCtr.A_TERMS', fits.getdata(self._path / 'LOCtr.A_TERMS.fits').T),
time_filter_den=aotpy.Image('LOCtr.B_TERMS', fits.getdata(self._path / 'LOCtr.B_TERMS.fits').T),
measurements_to_modes=aotpy.Image('LOCtr.SENSOR_2_MODES', s2m),
))
trombone = aotpy.LinearStage(
uid='Trombone',
telescope=self.system.main_telescope
)
self.system.wavefront_correctors.append(trombone)
s2m = self._stack_slopes(fits.getdata(self._path / 'TruthCtr.SENSOR_2_MODES.fits'), subap_axis=1)
self.system.loops.append(aotpy.ControlLoop(
uid='Truth loop',
input_sensor=lo_wfs,
commanded_corrector=trombone,
time_filter_num=aotpy.Image('TruthCtr.A_TERMS', fits.getdata(self._path / 'TruthCtr.A_TERMS.fits').T),
time_filter_den=aotpy.Image('TruthCtr.B_TERMS', fits.getdata(self._path / 'TruthCtr.B_TERMS.fits').T),
measurements_to_modes=aotpy.Image('TruthCtr.SENSOR_2_MODES', s2m),
))
def _handle_ngs_data(self, ho_loop_file, ho_pixel_file):
ho_loop_frame = fits.getdata(ho_loop_file, extname='HOLoopFrame')
ho_pix_frame = fits.getdata(ho_pixel_file, extname='HOPixelFrame')
ho_timestamps = ho_loop_frame['Seconds'] + ho_loop_frame['USeconds'] / 1.e6
self.system.date_beginning = datetime.datetime.fromtimestamp(ho_timestamps[0], datetime.UTC)
self.system.date_end = datetime.datetime.fromtimestamp(ho_timestamps[-1], datetime.UTC)
ho_frame_numbers = ho_loop_frame['FrameCounter']
ho_time = aotpy.Time('HO Loop Time', timestamps=ho_timestamps.tolist(),
frame_numbers=ho_frame_numbers.tolist())
ngs = aotpy.NaturalGuideStar('NGS')
self.system.sources.append(ngs)
eris_data_path = importlib.resources.files('aotpy.data.ERIS')
with importlib.resources.as_file(eris_data_path / 'ho_subap.fits') as p:
subaperture_mask = image_from_fits_file(p, name='HO WFS SUBAPERTURE MASK')
n_valid_subapertures = np.count_nonzero(subaperture_mask.data != -1)
reference = self._stack_slopes(fits.getdata(self._path / 'HOAcq.DET1.REFSLP_WITH_OFFSETS.fits'),
subap_axis=1)[0]
ho_wfs = aotpy.ShackHartmann(
uid='HO WFS',
source=ngs,
n_valid_subapertures=n_valid_subapertures,
measurements=aotpy.Image('Gradients', self._stack_slopes(ho_loop_frame['Gradients'], subap_axis=1)),
ref_measurements=aotpy.Image('HOAcq.DET1.REFSLP_WITH_OFFSETS', reference),
subaperture_mask=subaperture_mask,
mask_offsets=[aotpy.Coordinates(0, 0)],
subaperture_intensities=aotpy.Image('Intensities', ho_loop_frame['Intensities'])
)
self.system.wavefront_sensors.append(ho_wfs)
ho_wfs.detector = aotpy.Detector(
uid='HO DET1',
dark=self._image_from_eso_file(self._path / 'HOAcq.DET1.DARK.fits'),
weight_map=self._image_from_eso_file(self._path / 'HOAcq.DET1.WEIGHT.fits'),
sky_background=self._image_from_eso_file(self._path / 'HOAcq.DET1.BACKGROUND.fits'),
pixel_intensities=aotpy.Image(name='HO Pixels',
data=self._get_pixel_data_from_table(ho_pix_frame),
time=aotpy.Time('HO Pixel Time',
frame_numbers=ho_pix_frame['FrameCounter'].tolist()))
)
ho_wfs.subaperture_size = \
ho_wfs.detector.pixel_intensities.data.shape[0] // ho_wfs.subaperture_mask.data.shape[0]
cm = self._stack_slopes(fits.getdata(self._path / 'HORecn.REC1.HOCM.fits'), subap_axis=1)
s2m = self._stack_slopes(fits.getdata(self._path / 'CLMatrixOptimiser.S2M.fits'), subap_axis=1)
m2s = self._stack_slopes(fits.getdata(self._path / 'CLMatrixOptimiser.M2S.fits'), subap_axis=0)
im = self._stack_slopes(fits.getdata(self._path / 'CLMatrixOptimiser.SYNTHETIC_HO_IM.fits'), subap_axis=0)
self.system.loops.append(aotpy.ControlLoop(
uid='High-order loop',
input_sensor=ho_wfs,
commanded_corrector=self.dsm,
commands=aotpy.Image('DSM_positions', ho_loop_frame['DSM_Positions']),
ref_commands=aotpy.Image('HOCtr.ACT_POS_REF_MAP_WITH_OFFSETS',
fits.getdata(self._path / 'HOCtr.ACT_POS_REF_MAP_WITH_OFFSETS.fits')[:, 0]),
time=ho_time,
framerate=fits.getheader(self._path / 'HODet.CFG.DYNAMIC.fits')['FREQ'],
time_filter_num=aotpy.Image('HOCtr.A_TERMS', fits.getdata(self._path / 'HOCtr.A_TERMS.fits').T),
time_filter_den=aotpy.Image('HOCtr.B_TERMS', fits.getdata(self._path / 'HOCtr.B_TERMS.fits').T),
control_matrix=aotpy.Image(f'HORecn.REC1.HOCM', cm),
measurements_to_modes=aotpy.Image('CLMatrixOptimiser.S2M', s2m),
modes_to_commands=self._image_from_eso_file(self._path / 'CLMatrixOptimiser.M2V.fits'),
commands_to_modes=self._image_from_eso_file(self._path / 'CLMatrixOptimiser.V2M.fits'),
modes_to_measurements=aotpy.Image('CLMatrixOptimiser.M2S', m2s),
interaction_matrix=aotpy.Image('CLMatrixOptimiser.SYNTHETIC_HO_IM', im),
))
def _get_eso_telescope_name(self) -> str:
return 'ESO-VLT-U4'
def _get_eso_ao_name(self) -> str:
return 'ERIAO'
def _get_run_id(self) -> str:
return '60.A-9278(E)'
def _get_chip_id(self) -> str:
return f'ERIAO'