Source code for mmtwfs.telescope

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# coding=utf-8

import subprocess
import warnings

import numpy as np
from skimage.transform import rotate as imrotate

import astropy.units as u
from astropy.io import ascii
from astropy.table import Table
from astropy import visualization

import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib.colors as col

from .config import recursive_subclasses, merge_config, mmtwfs_config
from .custom_exceptions import WFSConfigException
from .secondary import SecondaryFactory
from .zernike import ZernikeVector

import logging
import logging.handlers

# we need to wrap the poppy import in a context manager to trap its whinging about missing pysynphot stuff that we don't use.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    import poppy

log = logging.getLogger("Telescope")
log.setLevel(logging.INFO)


__all__ = ['TelescopeFactory', 'MMT', 'FLWO12']


[docs]def TelescopeFactory(telescope="mmt", secondary="f5", config={}, **kwargs): """ Build and return proper Telescope sub-class instance based on the value of 'telescope' and 'secondary'. """ config = merge_config(config, dict(**kwargs)) secondary = secondary.lower() telescope = telescope.lower() types = recursive_subclasses(Telescope) telescopes = [t.__name__.lower() for t in types] tel_map = dict(list(zip(telescopes, types))) if telescope not in telescopes: raise WFSConfigException(value=f"Specified telescope, {telescope}, not valid or not implemented.") tel_cls = tel_map[telescope](secondary=secondary, config=config) return tel_cls
class Telescope(object): """ Defines generic configuration and methods that pertain to telescope and primary mirror systems """ def __init__(self, telescope="mmt", secondary="f5", config={}, **kwargs): config = merge_config(config, dict(**kwargs)) if telescope not in mmtwfs_config['telescope']: msg = f"Invalid telescope specified, {telescope}." raise WFSConfigException(value=msg) if secondary not in mmtwfs_config['secondary']: msg = f"Invalid secondary specified, {secondary}." raise WFSConfigException(value=msg) if mmtwfs_config['secondary'][secondary]['telescope'] != telescope: msg = f"Invalid secondary, {secondary}, for telescope, {telescope}." raise WFSConfigException(value=msg) self.__dict__.update(merge_config(mmtwfs_config['telescope'][telescope], config)) self.secondary = SecondaryFactory(secondary=secondary) self.radius = self.diameter / 2. self.nmperrad = self.radius.to(u.nm).value self.nmperasec = self.nmperrad / 206265. # ratio of the size of the central obstruction of the secondary to the size of the primary self.obscuration = self.secondary.diameter / self.diameter # create model of MMTO pupil including secondary and secondary support obstructions self.pupil = self._pupil_model() # initialize poppy optical system used for calculating the PSFs self.osys = poppy.OpticalSystem() self.osys.add_pupil(self.pupil) self.osys.add_pupil(poppy.ZernikeWFE(radius=self.radius.to(u.m).value, coefficients=[0.0, 0.0, 0.0, 0.0])) self.osys.add_detector(pixelscale=self.psf_pixel_scale, fov_arcsec=self.psf_fov) def _pupil_model(self): """ Use poppy to create a model of the pupil given the configured primary and secondary mirrors. """ primary = poppy.CircularAperture(radius=self.radius.to(u.m).value) secondary = poppy.SecondaryObscuration( secondary_radius=self.secondary.diameter.to(u.m).value / 2, n_supports=self.n_supports, support_width=self.support_width.to(u.m).value, support_angle_offset=self.support_offset.to(u.deg).value ) pup_model = poppy.CompoundAnalyticOptic(opticslist=[primary, secondary], name="MMTO") return pup_model def pupil_mask(self, rotation=0.0, size=512): """ Use the pupil model to make a pupil mask that can be used as a kernel for finding pupil-like things in images """ if size >= 700: msg = "WFS pupil sizes are currently restricted to 700 pixels in diameter or less." raise WFSConfigException(value=msg) rotation = u.Quantity(rotation, u.deg) # not sure how to get the image data out directly, but the to_fits() method gives me a path... pup_im = imrotate(self.pupil.to_fits(npix=size)[0].data.astype(float), rotation.value) pup_im = pup_im / pup_im.max() return pup_im def psf(self, zv=ZernikeVector(), wavelength=550.*u.nm, plot=True): """ Take a ZernikeVector and calculate resulting PSF at given wavelength. """ # poppy wants the wavelength in meters try: w = wavelength.to(u.m).value except AttributeError: w = wavelength # if no unit provided, assumed meters # poppy wants the piston term so whack it in there if modestart isn't already 1 if zv.modestart != 1: zv.modestart = 1 zv['Z01'] = 0.0 # poppy wants coeffs in meters zv.units = u.m # poppy wants Noll normalized coefficients coeffs = zv.norm_array # pop detector out to reuse, pop old wavefront error out to make way for new det = self.osys.planes.pop() fov = det.fov_arcsec.value # add new wavefront error and put detector back in place wfe = poppy.ZernikeWFE(radius=self.radius.to(u.m).value, coefficients=coeffs) self.osys.add_pupil(wfe) self.osys.planes.append(det) psf = self.osys.calc_psf(w) psf_fig = None if plot: psf_fig, ax = plt.subplots() psf_fig.set_label("PSF at {0:0.0f}".format(wavelength)) norm = visualization.mpl_normalize.ImageNormalize(stretch=visualization.LinearStretch()) ims = ax.imshow(psf[0].data, extent=[-fov/2, fov/2, -fov/2, fov/2], cmap=cm.magma, norm=norm) ax.set_xlabel("arcsec") ax.set_ylabel("arcsec") cb = psf_fig.colorbar(ims) cb.set_label("Fraction of Total Flux") return psf, psf_fig
[docs]class FLWO12(Telescope): """ Defines configuration and methods for the FLWO 1.2-meter """ def __init__(self, config={}, **kwargs): config = merge_config(config, dict(**kwargs)) super(FLWO12, self).__init__(telescope="flwo12", secondary="flwo12", config=config)
class FLWO15(Telescope): """ Defines configuration and methods for the FLWO 1.5-meter """ def __init__(self, config={}, **kwargs): config = merge_config(config, dict(**kwargs)) super(FLWO15, self).__init__(telescope="flwo15", secondary="flwo15", config=config)
[docs]class MMT(Telescope): """ Defines configuration and methods that pertain to the MMT's telescope and primary mirror systems """ def __init__(self, secondary="f5", config={}, **kwargs): config = merge_config(config, dict(**kwargs)) super(MMT, self).__init__(telescope="mmt", secondary=secondary, config=config) # load table of finite element coordinates self.nodecoor = self.load_bcv_coordinates() self.n_node = len(self.nodecoor) # load table of actuator coordinates self.actcoor = self.load_actuator_coordinates() self.n_act = len(self.actcoor) # load actuator influence matrix that provides the surface displacement caused by 1 N of force by # each actuator at each of self.node finite element node positions. self.surf2act = self.load_influence_matrix() # use this boolean to determine if corrections are actually to be sent self.connected = False # keep track of last and total forces. a blank ZernikeVector will generate the appropriate format # table with all forces set to 0. self.last_forces = self.bending_forces(zv=ZernikeVector()) self.total_forces = self.bending_forces(zv=ZernikeVector()) self.last_m1focus = 0.0 * u.um self.total_m1focus = 0.0 * u.um
[docs] def connect(self): """ Set state to connected so that calculated corrections will be sent to the relevant systems """ self.connected = True
[docs] def disconnect(self): """ Set state to disconnected so that corrections will be calculated, but not sent """ self.connected = False
[docs] def bending_forces(self, zv=ZernikeVector(), gain=0.5): """ Given a ZernikeVector (or similar object describing a 2D polynomial surface), calculate the actuator forces required to correct for the surface displacement it describes. """ # we don't want to bend any tilts... if 'Z02' in zv: zv['Z02'] = 0.0 if 'Z03' in zv: zv['Z03'] = 0.0 # convert to nm... zv.units = u.nm # make sure we're not Noll normalized... zv.denormalize() # need to rotate the wavefront -90 degrees to match the BCV angle convention of +Y being 0 deg. zv.rotate(-90*u.deg) # get surface displacements at the BCV node positions. multiply the wavefront amplitude by 0.5 to account for # reflection off the surface. surf_corr = -0.5 * gain * zv.total_phase(self.nodecoor['bcv_rho'], self.nodecoor['bcv_phi']) if isinstance(surf_corr, float): # means we got 0.0 from zv.total_phase() force_vec = np.zeros(self.n_act) else: force_vec = np.dot(surf_corr, self.surf2act).value # remove the units that got passed through # return an astropy.table.Table so we can easily package actuator ID along with the force. its write() method # also provides a lot of flexibility in providing outputs that match the old system. t = Table([self.actcoor['act_id'], force_vec], names=['actuator', 'force']) return t
[docs] def to_rcell(self, t, filename="zfile", overwrite=True): """ Take table generated by bending_forces() and write it to a file of a format that matches the old SHWFS system """ t.write(filename, format="ascii.no_header", delimiter="\t", formats={'force': ".1f"}, overwrite=overwrite)
[docs] def calculate_primary_corrections(self, zv, mask=[], gain=0.5): """ Take ZernikeVector as input and determine corrections to apply to primary/secondary """ # leave out tilts, focus, and coma from force calcs to start with def_mask = ['Z02', 'Z03', 'Z04', 'Z07', 'Z08'] def_mask.extend(mask) # mask out all high order terms beyond 2nd order spherical for i in range(23, 99): def_mask.append("Z{0:02d}".format(i)) mask = list(set(def_mask)) zv_masked = zv.copy() zv_masked.denormalize() for k in mask: zv_masked.ignore(k) # to reduce the amount of force required to remove spherical aberration, we offset the r**2 part of that term by # bending focus into the primary and then offsetting that by adjusting the secondary. this has the effect of # reducing by ~1/4 to 1/3 the total force required to correct a given amount of spherical aberration. # # this same scheme can also be extended to the higher order spherical terms as well, Z22 and Z37. # # for reference: # Z04 ~ 2r**2 - 1 # Z11 ~ 6r**4 - 6r**2 + 1 # Z22 ~ 20r**6 - 30r**4 + 12r**2 - 1 # Z37 ~ 70r**8 - 140r**6 + 90r**4 - 20r**2 + 1 # zv_masked['Z04'] = -6.0 * zv_masked['Z11'] - 12.0 * zv_masked['Z22'] - 20.0 * zv_masked['Z37'] m1focus_corr = gain * zv_masked['Z04'] / self.secondary.focus_trans t = self.bending_forces(zv=zv_masked, gain=gain) return t, m1focus_corr, zv_masked
[docs] def bend_mirror(self, filename="zfile"): """ Take a force file and send it to the cell to apply bending forces. Return fraction of requested forces that the cell reports were applied. """ frac = 1.0 log.info(f"Using command, /mmt/scripts/cell_send_forces {filename}, to apply forces...") pipe = subprocess.Popen( ['/mmt/scripts/cell_send_forces', f"{filename}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) try: (stdout, stderr) = pipe.communicate(timeout=10) except subprocess.TimeoutExpired: pipe.kill() (stdout, stderr) = pipe.communicate() outstr = stdout.decode('utf8') outerr = stderr.decode('utf8') # had to dig into the cell code at /mmt/vxsource/mmt/cell/src/cell_inf.c to get the messages that are produces if "Able to Apply" in outstr: log.info("...forces successfully applied in full.") elif "Forces Rejected" in outstr: log.error("...forces rejected!") frac = 0.0 elif "Unable to apply forces" in outstr: log.error("...unable to apply forces!") frac = 0.0 elif "Applying partial forces" in outstr: percent = float(outstr.split()[3]) frac = percent / 100.0 log.warn(f"...applied {percent}% of requested forces.") else: log.error(f"...got unexpected reply from cell command: {outstr}") log.error(f"\t stderr: {outerr}") return frac
[docs] def correct_primary(self, t, m1focus_corr, filename="zfile"): """ Take force table and focus offset calculated by self.calculate_primary_corrections() and apply them, if connected. """ frac = 1.0 if self.connected: self.to_rcell(t, filename=filename) log.info(f"Sending forces from {filename}...") frac = self.bend_mirror(filename=filename) self.secondary.m1spherical(frac * m1focus_corr) else: log.info("Not connected; no commands sent to cell or hexapod.") self.last_forces = t.copy(copy_data=True) self.last_forces['force'] *= frac self.last_m1focus = frac * m1focus_corr.copy() self.total_forces['force'] += frac * t['force'] self.total_m1focus += frac * m1focus_corr return t, m1focus_corr
[docs] def undo_last(self, zfilename="zfile_undo"): """ Undo the last set of corrections. """ self.last_forces['force'] *= -1 self.last_m1focus *= -1 frac = 1.0 if self.connected: log.info("Undoing last set of primary mirror corrections...") self.to_rcell(self.last_forces, filename=zfilename) frac = self.bend_mirror(filename=zfilename) self.secondary.m1spherical(frac * self.last_m1focus) else: log.info("Not connected; no undo commands sent.") self.total_m1focus += frac * self.last_m1focus self.total_forces['force'] += frac * self.last_forces['force'] return self.last_forces.copy(), self.last_m1focus.copy()
[docs] def clear_forces(self): """ Clear applied forces from primary mirror and clear any m1spherical offsets from secondary hexapod """ if self.connected: log.info("Clearing forces and spherical aberration focus offsets...") self.secondary.clear_m1spherical() pipe = subprocess.Popen(['/mmt/scripts/cell_clear_forces'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) try: (stdout, stderr) = pipe.communicate(timeout=20) except subprocess.TimeoutExpired: pipe.kill() (stdout, stderr) = pipe.communicate() outstr = stdout.decode('utf8') outerr = stderr.decode('utf8') log.info(f"...{outstr.strip()}") if len(outerr) > 0: log.warn(f"Got error from cell_clear_forces: {outerr}") else: log.info("Not connected; no clearing commands sent.") # the 'last' corrections are negations of the current total. reset the totals to 0. self.last_forces = self.total_forces.copy(copy_data=True) self.last_forces['force'] *= -1 self.last_m1focus = -self.total_m1focus self.total_forces = self.bending_forces(zv=ZernikeVector()) self.total_m1focus = 0.0 return self.last_forces, self.last_m1focus
[docs] def load_influence_matrix(self): """ The influence of each actuator on the mirror surface has been modeled via finite element analysis. This method loads the influence matrix that resulted from this analysis and maps for each actuator the influence of 1 lb of force on the mirror surface at each finite element node. This matrix is stored in a binary file for compactness and speed of loading. """ surf2act = np.fromfile(self.surf2act_file, dtype=np.float32).reshape(self.n_act, self.n_node).transpose() return surf2act
[docs] def load_actuator_coordinates(self): """ The actuator IDs and X/Y positions in mm are stored in a simple ASCII table. Load it using astropy.io.ascii, convert to units of mirror radius, and add polar coordinates. """ coord = ascii.read(self.actuator_file, names=["act_id", "act_x", "act_y", "act_type"]) for ax in ["act_x", "act_y"]: coord[ax] /= self.bcv_radius.to(u.mm).value coord['act_rho'] = np.sqrt(coord['act_x']**2 + coord['act_y']**2) coord['act_phi'] = np.arctan2(coord['act_y'], coord['act_x']) coord['act_phi'].unit = u.radian return coord
[docs] def load_bcv_coordinates(self): """ The BCV finite element nodes IDs and X/Y/Z positions in mm are stored in a simple ASCII table. Load it using astropy.io.ascii, convert to units of mirror radius, and add polar coordinates. """ coord = ascii.read(self.nodecoor_file, names=["bcv_id", "bcv_x", "bcv_y", "bcv_z"]) for ax in ["bcv_x", "bcv_y"]: coord[ax] /= self.bcv_radius.to(u.mm).value coord['bcv_rho'] = np.sqrt(coord['bcv_x']**2 + coord['bcv_y']**2) coord['bcv_phi'] = np.arctan2(coord['bcv_y'], coord['bcv_x']) coord['bcv_phi'].unit = u.radian return coord
[docs] def plot_forces(self, t, m1focus=None, limit=100.): """ Plot actuator forces given force table as output from self.bending_forces() """ coords = self.actcoor r_fac = 0.5 * self.diameter / self.bcv_radius # adjust for slight difference cmap = cm.ScalarMappable(col.Normalize(-1*limit, limit), cm.bwr) cmap._A = [] # grr stupid matplotlib fig, ax = plt.subplots() fig.set_label("M1 Actuator Forces") xcor, ycor = coords['act_x']/r_fac, coords['act_y']/r_fac ax.scatter(xcor, ycor, color=cmap.to_rgba(t['force'])) for i, (x, y) in enumerate(zip(xcor, ycor)): ax.text(x, y+0.02, t['actuator'][i], horizontalalignment='center', verticalalignment='bottom', size='xx-small') ax.set_aspect(1.0) circle1 = plt.Circle((0, 0), 1.0, fill=False, color='black', alpha=0.2) circle2 = plt.Circle((0, 0), 0.9/6.5, fill=False, color='black', alpha=0.2) ax.add_artist(circle1) ax.add_artist(circle2) if m1focus is not None: ax.set_title("M1 Focus Offset: {0:0.1f}".format(m1focus)) ax.set_axis_off() cb = fig.colorbar(cmap) cb.set_label("Actuator Force (N)") return fig