from __future__ import absolute_import
from __future__ import print_function
from casatasks.private.casa_transition import is_CASA6
from casatasks import cvel2, cvel, split, importuvfits, partition
from casatools import ms as mstool
from casatools import ctsys, table, quanta
from casatasks.private.parallel.parallel_task_helper import ParallelTaskHelper
from casatasks.private.parallel.parallel_data_helper import ParallelDataHelper
datapath = ctsys.resolve('unittest/cvel/')
ephedata = ctsys.resolve('ephemerides/JPL-Horizons/Jupiter_54708-55437dUTC.tab')
from __main__ import default
from tasks import cvel2, cvel, partition, importuvfits, split
from taskinit import tbtool, mstool, qa
from parallel.parallel_task_helper import ParallelTaskHelper
from parallel.parallel_data_helper import ParallelDataHelper
datapath = os.path.join(os.environ['CASAPATH'].split()[0],'casatestdata/unittest/cvel/')
ephedata = os.path.join(os.environ['CASAPATH'].split()[0],'data/ephemerides/JPL-Horizons/Jupiter_54708-55437dUTC.tab')
if 'TEST_DATADIR' in os.environ:
DATADIR = str(os.environ.get('TEST_DATADIR'))+'/cvel/'
if os.path.isdir(DATADIR):
print('cvel2 tests will use data from '+datapath)
if 'BYPASS_PARALLEL_PROCESSING' in os.environ:
ParallelTaskHelper.bypassParallelProcessing(1)
vis_b = 'ARP299F_sma_2scans_24spws_negative_chanwidth.ms'
vis_c = 'jupiter6cm.demo-thinned.ms'
vis_d = 'g19_d2usb_targets_line-shortened-thinned.ms'
vis_e = 'evla-highres-sample-thinned.ms'
outfile = 'cvel-output.ms'
def verify_ms(msname, expnumspws, expnumchan, inspw, expchanfreqs=[]):
mytb.open(msname+'/SPECTRAL_WINDOW')
nc = mytb.getcell("NUM_CHAN", inspw)
cf = mytb.getcell("CHAN_FREQ", inspw)
dimdata = mytb.getcell("FLAG", 0)[0].size
msg = "Found "+str(nr)+", expected "+str(expnumspws)+" spectral windows in "+msname
if not (nc == expnumchan):
msg = "Found "+ str(nc) +", expected "+str(expnumchan)+" channels in spw "+str(inspw)+" in "+msname
if not (dimdata == expnumchan):
msg = "Found "+ str(dimdata) +", expected "+str(expnumchan)+" channels in FLAG column in "+msname
if not (expchanfreqs==[]):
print("Testing channel frequencies ...")
if not (expchanfreqs.size == expnumchan):
msg = "Internal error: array of expected channel freqs should have dimension ", expnumchan
df = (cf - expchanfreqs)/expchanfreqs
if not (abs(df) < 1E-8).all:
msg = "channel frequencies in spw "+str(inspw)+" differ from expected values by (relative error) "+str(df)