"""
This module contains a class for translating telemetry data produced by the ALPAO RTC, part of the PAPYRUS system.
It assumes the MATLAB files produced by ALPAO RTC have been converted to use struct objects instead of classes.
The custom MATLAB function makeTelemetryFileReadable is made for this.
Note that the usage of scipy.io.loadmat to read the .mat files in Python assumes that these files were saved with the
version -v7 of matlab or previous. scipy.io.loadmat does not work for files saved with matlab -v7.3 or later.
"""
import datetime
import importlib.resources
import numpy as np
import aotpy
from aotpy.io.fits import image_from_fits_file
from .base import BaseTranslator
try:
from scipy.io import loadmat
except (ImportError, ModuleNotFoundError):
loadmat = None
[docs]
class PAPYRUSTranslator(BaseTranslator):
def __init__(self, file) -> None:
if loadmat is None:
raise ImportError("Translating PAPYRUS data requires the scipy module.")
data = loadmat(file, simplify_cells=True)['data']
# common fields between SH and PYWFS telemetry
self.system = aotpy.AOSystem(ao_mode='SCAO', name='PAPYRUS')
papyrus_data_path = importlib.resources.files('aotpy.data.PAPYRUS')
with importlib.resources.as_file(papyrus_data_path / 'T152_pupil.fits') as p:
pupil_mask = image_from_fits_file(p, name='T152 PUPIL')
self.system.main_telescope = aotpy.MainTelescope(
uid="T152",
enclosing_diameter=1.5,
inscribed_diameter=1.5,
pupil_mask=pupil_mask)
actuator_coordinates = [aotpy.Coordinates(coor[0], coor[1]) for coor in data['wfcCommand']['coordinates']]
dm = aotpy.DeformableMirror(uid="DM_ALPAO_241",
telescope=self.system.main_telescope,
n_valid_actuators=data['wfc']['offset']['values'].size,
actuator_coordinates=actuator_coordinates,
pupil_mask=self.system.main_telescope.pupil_mask)
self.system.wavefront_correctors.append(dm)
# read timestamps
frame_number_measurements = np.array(data['wfsSlopesMetaData']['frameid'], dtype=float).tolist()
time_stamp_measurements = (np.array(data['wfsSlopesMetaData']['timestamp'], dtype=float) / 1e9).tolist()
frame_number_pixel_intensities = np.array(data['wfsImagesMetaData']['frameid'], dtype=float).tolist()
time_stamp_pixel_intensities = (np.array(data['wfsImagesMetaData']['timestamp'], dtype=float) / 1e9).tolist()
frame_number_commands = np.array(data['wfcCommandMetaData']['frameid'], dtype=float).tolist()
time_stamp_commands = (np.array(data['wfcCommandMetaData']['timestamp'], dtype=float) / 1e9).tolist()
# extract acquisition date from time stamps
self.system.date_beginning = datetime.datetime.fromtimestamp(time_stamp_commands[0], datetime.UTC)
self.system.date_end = datetime.datetime.fromtimestamp(time_stamp_commands[-1], datetime.UTC)
time_measurements = aotpy.Time(uid='Measurements time',
timestamps=time_stamp_measurements,
frame_numbers=frame_number_measurements)
time_pixel_intensities = aotpy.Time(uid='WFS detector frames time',
timestamps=time_stamp_pixel_intensities,
frame_numbers=frame_number_pixel_intensities)
time_commands = aotpy.Time(uid='Commands time',
timestamps=time_stamp_commands,
frame_numbers=frame_number_commands)
if data['wfsUid'] == 'ACE.REMOTE.AlpaoSHS.16': # if SH telemetry, run SH translator script
self.system.config = 'WFS : SH'
self.system.sources = [aotpy.NaturalGuideStar(uid=data['source'])]
slope_x = data['wfsSlopes']['sx']
slope_y = data['wfsSlopes']['sy']
measurements = np.empty((slope_x.shape[1], 2, slope_x.shape[0]))
measurements[:, 0, :] = np.transpose(slope_x)
measurements[:, 1, :] = np.transpose(slope_y)
subaperture_mask = np.array(data['wfsSlopes']['mask'], dtype=int)
subaperture_mask[np.where(subaperture_mask == 0)] = -1
index = 0
for j in range(subaperture_mask.shape[1]):
for i in range(subaperture_mask.shape[0]):
if subaperture_mask[i, j] != -1:
subaperture_mask[i, j] = index
index += 1
sh = aotpy.ShackHartmann(uid=data['wfsUid'],
source=self.system.sources[0],
n_valid_subapertures=np.count_nonzero(data['wfsSlopes']['mask']),
measurements=aotpy.Image(name="PAPYRUS Sh Slopes",
data=measurements,
time=time_measurements),
subaperture_mask=aotpy.Image('SH subaperture mask', subaperture_mask))
dark = data['detector']['dark']
detector = aotpy.Detector(uid='cblue One',
readout_noise=3,
pixel_intensities=aotpy.Image(name="cblue Frames",
data=np.moveaxis(data['wfsImages'], -1, 0),
unit='Cblue One ADU',
time=time_pixel_intensities),
frame_rate=float(data['detector']['frameRate']),
integration_time=data['detector']['exposureTime'],
gain=data['detector']['gain'],
binning=np.max(np.array(data['detector']['binning'])),
dark=aotpy.Image('Ocam2K dark', dark),
pixel_scale=5.8 / 3600 / 16 * np.pi / 180,
# on-sky angular size of one pixel of SH detector [rad]
type='CMOS')
sh.detector = detector
self.system.wavefront_sensors.append(sh)
im = data['calibrator']['interactionMatrix']
interaction_matrix = np.empty((im.shape[0] // 2, 2, im.shape[1]))
interaction_matrix[:, 0, :] = im[::2]
interaction_matrix[:, 1, :] = im[1::2]
cm = data['loop']['commandMatrix']
control_matrix = np.empty((cm.shape[0], 2, cm.shape[1] // 2))
control_matrix[:, 0, :] = cm[:, ::2]
control_matrix[:, 1, :] = cm[:, 1::2]
modes_to_commands = data['wfc']['modeToCommand']
modal_coefficients = np.transpose(data['loop']['calibrator']['ModeGains'])
gain = data['loop']['controller']['Gain']
if gain == 0:
closed = False
else:
closed = True
ref_commands = data['wfc']['offset']['values']
loop = aotpy.ControlLoop(
uid=data['uid'],
commanded_corrector=dm,
input_sensor=sh,
commands=aotpy.Image(name='PAPYRUS DM Commands',
data=np.transpose(data['wfcCommand']['values']),
unit=data['wfcCommand']['unit'],
time=time_commands),
interaction_matrix=aotpy.Image("PAPYRUS Interaction Matrix", interaction_matrix),
control_matrix=aotpy.Image("PAPYRUS Control Matrix", control_matrix),
modes_to_commands=aotpy.Image("M2C", modes_to_commands),
modal_coefficients=aotpy.Image('loop modal gains', modal_coefficients),
closed=closed,
ref_commands=aotpy.Image('dm flat(i.e closed on static aberations)', ref_commands),
time_filter_num=aotpy.Image("Loop gain", np.array([float(gain)])))
self.system.loops.append(loop)
else: # run pyramid telemetry translator script
self.system.config = 'WFS : Pyramid'
self.system.sources = [aotpy.NaturalGuideStar(uid='NGS')]
slope_x = data['wfsSlopes']['sx']
slope_y = data['wfsSlopes']['sy']
measurements = np.empty((slope_x.shape[1], 2, slope_x.shape[0]))
measurements[:, 0, :] = np.transpose(slope_x)
measurements[:, 1, :] = np.transpose(slope_y)
subaperture_mask = np.array(data['wfsSlopes']['mask'], dtype=int)
subaperture_mask[np.where(subaperture_mask == 0)] = -1
index = 0
for j in range(subaperture_mask.shape[1]):
for i in range(subaperture_mask.shape[0]):
if subaperture_mask[i, j] != -1:
subaperture_mask[i, j] = int(index)
index += 1
pyramid = aotpy.Pyramid(uid=data['wfsUid'],
source=self.system.sources[0],
dimensions=2,
n_valid_subapertures=np.count_nonzero(data['wfsSlopes']['mask']),
measurements=aotpy.Image("PAPYRUS Sh Slopes", measurements),
n_sides=4,
subaperture_mask=aotpy.Image('SH subaperture mask', subaperture_mask))
dark = data['detector']['dark']
detector = aotpy.Detector(uid='Ocam2K',
readout_noise=0,
pixel_intensities=aotpy.Image("Ocam2K frames",
np.moveaxis(data['wfsImages'], -1, 0)),
frame_rate=1500.0,
integration_time=data['detector']['exposureTime'],
gain=data['detector']['gain'],
binning=np.max(np.array(data['detector']['binning'])),
dark=aotpy.Image('Ocam2K dark', dark),
type='EMCCD')
pyramid.detector = detector
self.system.wavefront_sensors.append(pyramid)
im = data['calibrator']['interactionMatrix']
interaction_matrix = np.empty((im.shape[0] // 2, 2, im.shape[1]))
interaction_matrix[:, 0, :] = im[::2]
interaction_matrix[:, 1, :] = im[1::2]
cm = data['loop']['commandMatrix']
control_matrix = np.empty((cm.shape[0], 2, cm.shape[1] // 2))
control_matrix[:, 0, :] = cm[:, ::2]
control_matrix[:, 1, :] = cm[:, 1::2]
modes_to_commands = data['wfc']['modeToCommand']
modal_coefficients = np.transpose(data['loop']['calibrator']['ModeGains'])
gain = data['loop']['controller']['Gain']
if gain == 0:
closed = False
else:
closed = True
ref_commands = data['wfc']['offset']['values']
loop = aotpy.ControlLoop(
uid=data['uid'],
commanded_corrector=dm,
input_sensor=pyramid,
commands=aotpy.Image('PAPYRUS DM Commands', np.transpose(data['wfcCommand']['values'])),
interaction_matrix=aotpy.Image("PAPYRUS Interaction Matrix", interaction_matrix),
control_matrix=aotpy.Image("PAPYRUS Control Matrix", control_matrix),
modes_to_commands=aotpy.Image("M2C", modes_to_commands),
modal_coefficients=aotpy.Image('Loop modal gains', modal_coefficients),
closed=closed,
ref_commands=aotpy.Image('DM flat (i.e loop closed on static aberrations)', ref_commands),
time_filter_num=aotpy.Image("Loop gain", np.array([float(gain)])))
self.system.loops.append(loop)