import os
from functools import reduce
import matplotlib
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import numpy
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.renderer.logger as logger
import pipeline.infrastructure.utils as utils
from pipeline.h.tasks.common.displays import common as common
# Module-level logger, obtained from the pipeline infrastructure and keyed on this module's name.
LOG = infrastructure.get_logger(__name__)
class PhaseOffsetPlotHelper(object):
    """
    Bundle the styling, file naming and antenna grouping choices used when
    drawing phase offset plots.

    caltable_map should be a dictionary mapping the state to caltable. Every
    state used as a key must also be a key of ``colour_map`` (currently
    'BEFORE' and 'AFTER').
    """

    # state -> polarisation/correlation key -> 3-tuple style entry.
    # NOTE(review): the tuple looks like (line style, colour, alpha) - confirm
    # against the plotting code that consumes get_symbol_and_colour().
    colour_map = {'BEFORE': {'L': ('-', 'orange', 0.6),
                             'R': ('--', 'sandybrown', 0.6),
                             'X': ('-', 'lightslategray', 0.6),
                             'Y': ('--', 'lightslategray', 0.6),
                             'XX': ('-', 'lightslategray', 0.6),
                             'YY': ('--', 'lightslategray', 0.6)},
                  'AFTER': {'L': ('-', 'green', 0.6),
                            'R': ('-', 'red', 0.6),
                            'X': ('-', 'green', 0.6),
                            'Y': ('-', 'red', 0.6),
                            'XX': ('-', 'green', 0.6),
                            'YY': ('-', 'red', 0.6)}}

    def __init__(self, rootdir, prefix, caltable_map=None, plot_per_antenna=True):
        """
        :param rootdir: directory that plot files are placed in
        :param prefix: leading part of every plot filename
        :param caltable_map: dictionary mapping state to caltable; None (the
            default) is treated as an empty mapping
        :param plot_per_antenna: True to group antennas one per plot, False
            to put all antennas in a single group
        :raises AssertionError: if caltable_map has a state that is not a key
            of colour_map
        """
        # The declared default used to fail on caltable_map.keys() with an
        # AttributeError; normalise it to "no caltables" instead.
        if caltable_map is None:
            caltable_map = {}

        # Kept as an assert (rather than a ValueError) so that callers see the
        # same exception type and message as before.
        known_states = set(PhaseOffsetPlotHelper.colour_map.keys())
        assert known_states.issuperset(caltable_map.keys()), \
            'caltables argument defines states not in colour_map'

        self._rootdir = rootdir
        self._prefix = prefix
        self.caltable_map = caltable_map
        self.plot_per_antenna = plot_per_antenna

    def get_symbol_and_colour(self, pol, state):
        """
        Return the colour_map style entry for the given state and polarisation.

        :raises KeyError: if state or pol is not present in colour_map
        """
        return self.colour_map[state][pol]

    def get_figfile(self, spw, antennas):
        """
        Return the plot filename for a spectral window and antenna group.

        The result is '<rootdir>/<prefix>[.ant<name>].spw<id>.png'. The
        antenna component appears only when exactly one antenna is given, and
        the spw id is zero-padded to at least two digits.

        :param spw: object with an integer ``id`` attribute
        :param antennas: sequence of objects with a ``name`` attribute
        """
        if len(antennas) == 1:
            antenna = '.ant%s' % antennas[0].name
        else:
            antenna = ''
        return os.path.join(self._rootdir,
                            '%s%s.spw%0.2d.png' % (self._prefix, antenna, spw.id))

    def group_antennas(self, antennas):
        """
        Split antennas into the groups that should share a plot.

        Returns one single-element list per antenna when plot_per_antenna is
        set, otherwise a one-element list holding the given sequence itself.
        """
        if self.plot_per_antenna:
            return [[ant] for ant in antennas]
        return [antennas]

    def label_antenna(self, fig, antennas):
        """
        Write the antenna label onto the figure.

        The label is the name of the first antenna when plot_per_antenna is
        set, otherwise 'All Antennas'. It is drawn with plt.text in figure
        coordinates at (0.5, 0.89), horizontally centred.
        """
        if self.plot_per_antenna:
            text = '%s' % antennas[0].name
        else:
            text = 'All Antennas'
        plt.text(0.5, 0.89, text, color='k',
                 transform=fig.transFigure, ha='center', size=9)
class PhaseOffsetPlot(object):
def __init__(self, context, ms, plothelper, scan_intent=None, scan_id=None, scan_spw=None, score_retriever=None):
    """
    Record the plotting inputs, select the scans to use and load the
    caltables named by the plot helper.

    :param context: stored as-is on the instance
    :param ms: object providing get_scans(scan_id=, scan_intent=, spw=)
    :param plothelper: object whose caltable_map is handed to _load_caltables
    :param scan_intent: forwarded to ms.get_scans
    :param scan_id: forwarded to ms.get_scans
    :param scan_spw: forwarded to ms.get_scans as its spw argument
    :param score_retriever: replaced by a common.NullScoreFinder when falsy
    """
    selected_scans = ms.get_scans(scan_id=scan_id, scan_intent=scan_intent, spw=scan_spw)
    if not score_retriever:
        score_retriever = common.NullScoreFinder()

    self._context = context
    self._ms = ms
    self._plothelper = plothelper
    self._scans = selected_scans
    self._score_retriever = score_retriever

    # The flag has to exist before _load_caltables runs, because that method
    # checks it on entry.
    self._caltables_loaded = False
    self._load_caltables(plothelper.caltable_map)
def _load_caltables(self, caltable_map):
if self._caltables_loaded:
return
data = [(state, common.CaltableWrapper.from_caltable(c)) for state, c in caltable_map.items()]