# geodesy and pointing and other helper functions that are useful
# to be available outside of the simdata task
# geodesy from NGS: http://www.ngs.noaa.gov/TOOLS/program_descriptions.html
from __future__ import absolute_import
from __future__ import print_function
from scipy.stats import scoreatpercentile
import scipy.special as spspec
import scipy.signal as spsig
import scipy.interpolate as spintrp
from collections import OrderedDict
from casatasks.private.casa_transition import is_CASA6
from casatools import table, image, imagepol, regionmanager, calibrater, measures, quanta, coordsys, componentlist, simulator
from casatasks import casalog
from casatools import table, image, imagepol, regionmanager, calibrater, measures, quanta, coordsys, componentlist, simulator, synthesisutils
from casatasks import casalog, tclean
from casatasks.private.cleanhelper import cleanhelper
# all I really need is casalog, but how to get it:?
from tclean import tclean
#qatool = casac.homefinder.find_home_by_name('quantaHome')
im,cb,ms,tb,me,ia,po,sm,cl,cs,rg,sl,dc,vp,msmd,fi,fn,imd,sdms=gentools(['im','cb','ms','tb','me','ia','po','sm','cl','cs','rg','sl','dc','vp','msmd','fi','fn','imd','sdms'])
_su = casac.synthesisutils( )
#im, cb, ms, tb, fl, me, ia, po, sm, cl, cs, rg, sl, dc, vp, msmd, fi, fn, imd = gentools()
# functions defined outside of the simutil class
def is_array_type(value):
array_type = [list, tuple, pl.ndarray]
if type(value) in array_type:
return self.itsnumbers[xi]
simutil contains methods to facilitate simulation.
To use these, create a simutil instance e.g.
CASA> from simutil import simutil
CASA> from casatasks.private import simutil
CASA> u=simutil.simutil()
CASA> x,y,z,d,padnames,antnames,telescope,posobs = u.readantenna("myconfig.cfg")
def __init__(self, direction="",
centerfreq=qa.quantity("245GHz"),
bandwidth=qa.quantity("1GHz"),
totaltime=qa.quantity("0h"),
self.centerfreq=centerfreq
###########################################################
def direction_splitter(self, direction=None):
Given a direction, return its epoch, x, and y parts. Epoch will be ''
if absent, or '%s ' % epoch if present. x and y will be angle qa's in
if type(direction) == type([]):
direction=self.average_direction(direction)[0]
# x, y = map(qa.toangle, dirl[-2:])
telescope=None, diam=None, nant=None,
integration=None,debug=None,
method="tsys-atm",tau0=None,t_sky=None):
if qa.convert(elevation,"deg")['value']<3:
self.msg("Elevation < ALMA limit of 3 deg",priority="error")
tmpname="tmp"+str(os.getpid())
while i<10 and len(glob.glob(tmpname+"*"))>0:
tmpname="tmp"+str(os.getpid())+str(i)
xx=glob.glob(tmpname+"*")
return model_refdir,model_cell,model_size,model_nchan,model_specrefval,model_specrefpix,model_width,model_stokes
##################################################################
def imclean(self,mstoimage,imagename,
cleanmode,cell,imsize,imcenter,interactive,niter,threshold,weighting,
outertaper,pbcor,stokes,sourcefieldlist="",modelimage="",mask=[],dryrun=False):
cleanmode,psfmode,cell,imsize,imcenter,
interactive,niter,threshold,weighting,
outertaper,pbcor,stokes,sourcefieldlist="",
modelimage="",mask=[],dryrun=False):
Wrapper function to call CASA imaging task 'clean' on a MeasurementSet
mstoimage parameter expects the path to a MeasurementSet
imsize parameter expects a length-2 list of integers
cell parameter expects a length-2 list containing qa.quantity objects
interactive and dryrun parameters expect boolean type input
Other parameters expect input in format compatible with 'clean'
from simutil import is_array_type
Creates a template '[imagename].clean.last' file in addition to
# determine channelization from (first) ms:
if is_array_type(mstoimage):
imagermode="clark" # set default to prevent UnboundLocalError
# in 3.4 clean doesn't accept just any imsize
from cleanhelper import cleanhelper
optsize[0]=cleanhelper.getOptimumSize(imsize[0])
optsize[0]=_su.getOptimumSize(imsize[0])
if nksize==1: # imsize can be a single element or array
optsize[1]=cleanhelper.getOptimumSize(imsize[1])
optsize[1]=_su.getOptimumSize(imsize[1])
if((optsize[0] != imsize[0]) or (nksize!=1 and optsize[1] != imsize[1])):
self.msg(str(imsize)+' is not an acceptable imagesize, will use '+str(optsize)+" instead",priority="warn")
# print clean inputs no matter what, so user can use them.
# and write a clean.last file
cleanlast=open(imagename+".clean.last",'w')
cleanlast.write('taskname = "clean"\n')
uvtaper=uvtaper,outertaper=outertaper, pbcor=True, mask=mask,
del freq,nchan # something is holding onto the ms in table cache
##################################################################
def imtclean(self, mstoimage, imagename,
cell, imsize, imdirection,
interactive, niter, threshold, weighting,
outertaper, pbcor, stokes,
modelimage="", mask=[], dryrun=False):
Wrapper function for Radio Interferometric Image Reconstruction from
input MeasurementSet using standard CASA imaging task ('tclean').
Duplicates the method "imclean" but with non-deprecated task call.
Selecting individual fields for imaging is not supported
mstoimage parameter expects the path to a MeasurementSet,
or list of MeasurementSets
imagename parameter expects string for output image file
imsize parameter expects a length-1 or length-2 list of integers
cell parameter expects a length-2 list containing qa.quantity objects
Just like imclean, does not yield return
Creates a template '[imagename].tclean.last' file in addition to
normal tclean task outputs
invocation_parameters = OrderedDict( )
# use the first provided MS to determine channelization for output
if is_array_type(mstoimage):
tb.open(ms0 + "/SPECTRAL_WINDOW")
self.msg("Detected more than one SPW in " + ms0,
priority="info", origin="simutil")
self.msg("Determining output cube parameters using " +
"first SPW present in " + ms0,
priority="info", origin="simutil")
freq=tb.getvarcol("CHAN_FREQ")['r1']
nchan=1 # duplicate imclean method
self.msg("nchan > 1 is not supported for dryrun = True",
priority="info", origin="simutil")
# next, define tclean call defaults
# legacy comparison of image size input against heuristic
optsize[0] = _su.getOptimumSize(imsize[0])
if len(imsize) == 1: # user expects square output images
optsize[1]=_su.getOptimumSize(imsize[1])
if((optsize[0] != imsize[0]) or
(len(imsize) != 1 and optsize[1] != imsize[1])):
self.msg(str(imsize)+" is not an acceptable imagesize, " +
" using imsize=" + str(optsize) + " instead",
priority="warn", origin="simutil")
# the cell parameter expects a list of qa.quantity objects,
formatted_correctly = [qa.isquantity(cell[i]) for i in range(len(cell))]
assert False not in formatted_correctly, "simutil function imtclean expects cell parameter input to be comprised of quantity objects"
# convert the first two elements for storage in the tclean.last file
cellparam = [str(cell[0]['value']) + str(cell[0]['unit']),
str(cell[1]['value']) + str(cell[1]['unit'])]
if os.path.exists(modelimage):
elif len(modelimage) > 0:
self.msg("Could not find modelimage, proceeding without one",
priority="warn", origin="simutil")
# set tclean top-level parameters (no parent nodes)
invocation_parameters['vis'] = mstoimage
invocation_parameters['selectdata'] = False
invocation_parameters['imagename'] = imagename
invocation_parameters['imsize'] = imsize
invocation_parameters['cell'] = cellparam
invocation_parameters['phasecenter'] = imdirection
invocation_parameters['stokes'] = stokes
invocation_parameters['startmodel'] = modelimage
invocation_parameters['specmode'] = chanmode
invocation_parameters['gridder'] = gridder
invocation_parameters['deconvolver'] = deconvolver
invocation_parameters['restoration'] = True
invocation_parameters['outlierfile'] = ''
invocation_parameters['weighting'] = weighting
invocation_parameters['niter'] = niter
invocation_parameters['usemask'] = 'user'
invocation_parameters['fastnoise'] = True
invocation_parameters['restart'] = True
invocation_parameters['savemodel'] = 'none'
invocation_parameters['calcres'] = True
invocation_parameters['calcpsf'] = True
invocation_parameters['parallel'] = False
invocation_parameters['restoringbeam'] = 'common'
invocation_parameters['pbcor'] = pbcor
invocation_parameters['uvtaper'] = outertaper
invocation_parameters['threshold'] = threshold
invocation_parameters['interactive'] = interactive
invocation_parameters['threshold'] = ''
invocation_parameters['interactive'] = False
invocation_parameters['mask'] = mask
invocation_parameters['pbmask'] = 0.0
# write the tclean.last file (template in case of dryrun=True)
# it would be preferable to have a helper function do _this_,
# then just call tclean right from simanalyze, but precedent.
filename = os.path.join(os.getcwd(),imagename+'.tclean.last')
if os.path.isfile(filename):
self.msg("Overwriting existing 'tclean.last' file",
priority="info",origin="simutil")
with open(filename, 'w'): pass
# fill in the tclean.last file
# the sections of code that handle this for tasks are
# autogenerated during the CASAshell build process. See:
# [unpacked_build]/lib/py/lib/python3.6/site-packages/casashell/private
with open(filename,'w') as f:
# fill in the used parameters first
for i in invocation_parameters:
# catch None type objects returned by repr function
if i.startswith('<') and s.endswith('>'):
f.write("{:<20}={!s}\n".format(i, None))
f.write("{:<20}={!r}\n".format(i,
invocation_parameters[i]))
# next, open and fill the task call
for i in invocation_parameters:
if i.startswith('<') and s.endswith('>'):
f.write("{!s}={!s}".format(i, None))
f.write("{!s}={!r}".format(i,
invocation_parameters[i]))
if count < len(invocation_parameters): f.write(",")
# finally close the task call
# gather tclean task call by attempting to parse the file just created
with open(filename, 'r') as _f:
if line.startswith('#tclean( '):
self.msg(task_call, priority="warn", origin="simutil")
self.msg(task_call, priority="info", origin="simutil")
# now that the tclean call is specified, it may be executed
tclean( vis = invocation_parameters['vis'],
selectdata = invocation_parameters['selectdata'],
imagename=invocation_parameters['imagename'],
imsize=invocation_parameters['imsize'],
cell=invocation_parameters['cell'],
phasecenter=invocation_parameters['phasecenter'],
stokes=invocation_parameters['stokes'],
startmodel=invocation_parameters['startmodel'],
specmode=invocation_parameters['specmode'],
gridder=invocation_parameters['gridder'],
deconvolver=invocation_parameters['deconvolver'],
restoration=invocation_parameters['restoration'],
outlierfile=invocation_parameters['outlierfile'],
weighting=invocation_parameters['weighting'],
niter=invocation_parameters['niter'],
usemask=invocation_parameters['usemask'],
fastnoise=invocation_parameters['fastnoise'],
restart=invocation_parameters['restart'],
savemodel=invocation_parameters['savemodel'],
calcres=invocation_parameters['calcres'],
calcpsf=invocation_parameters['calcpsf'],
parallel=invocation_parameters['parallel'],
restoringbeam=invocation_parameters['restoringbeam'],
pbcor=invocation_parameters['pbcor'],
uvtaper=invocation_parameters['uvtaper'],
threshold=invocation_parameters['threshold'],
interactive=invocation_parameters['interactive'],
mask=invocation_parameters['mask'],
pbmask=invocation_parameters['pbmask'] )
###################################################
def flatimage(self,image,complist="",verbose=False):
def approxBeam(self,configfile,freq):
mylengths=self.baselineLengths(configfile)
rmslength = pl.sqrt(pl.mean(mylengths.flatten()**2))
from scipy.stats import scoreatpercentile
ninety = scoreatpercentile(mylengths, 90)
return 0.2997924/freq/ninety*3600.*180/pl.pi # lambda/b converted to arcsec
def sfBeam1d(self,beam="", cell="", convsupport=-1, sampling=""):
Calculates PSF of image generated by gridfunction 'SF'.
Migrated codes from gjinc.sfBeam() in analysisUtil module
beam : Antenna primary beam (quantity)
cell : Cell size of image (quantity)
convsupport : convolution support used for imaging (integer)
sampling : pointing spacing of observation (quantity).
If not defined, comvolution of sampling kernel is
Estimated PSF of image (quantity).
import scipy.special as spspec
import scipy.signal as spsig
import scipy.interpolate as spintrp
if not qa.compare(beam, "rad"):
raise ValueError("beam should be a quantity of antenna primary beam size (angle)")
if not qa.compare(cell, "rad"):
raise ValueError("cell should be a quantity of image pixel size (angle)")
if len(sampling) > 0 and not qa.compare(sampling, "rad"):
raise ValueError("sampling should be a quantity of pointing spacing (angle)")
supportwidth = (convsupport*2 + 1)