Source
xxxxxxxxxx
3
3
import shutil
4
4
import unittest
5
5
import numpy as np
6
6
import os
7
7
import filecmp
8
8
import numpy
9
9
10
10
from casatasks.private.casa_transition import is_CASA6
11
11
if is_CASA6:
12
12
from casatools import ctsys, table, ms
13
-
from casatasks import setjy
13
+
from casatasks import setjy, partition
14
+
from casatasks.private.parallel.parallel_task_helper import ParallelTaskHelper
14
15
mslocal = ms()
15
16
ctsys_resolve = ctsys.resolve
16
17
else:
17
18
from tasks import *
18
19
from taskinit import *
19
20
from taskinit import tbtool as table
21
+
from parallel.parallel_task_helper import ParallelTaskHelper
20
22
from casa_stack_manip import stack_frame_find
21
23
from __main__ import default
22
24
mslocal = mstool()
23
25
dataRoot = os.path.join(os.environ.get('CASAPATH').split()[0],'data')
24
26
def ctsys_resolve(apath):
25
27
return os.path.join(dataRoot,apath)
26
28
27
29
"""
28
30
Unit tests for task setjy.
29
31
63
65
# Create working area
64
66
# setjydatapath = 'unittest/setjy/'
65
67
# 2015-02-02 TT: this seems unnessary directory layer...
66
68
#if not os.path.exists(setjydatapath):
67
69
# print "\nCreate working area..."
68
70
# os.system('mkdir -p '+setjydatapath)
69
71
70
72
# Create a new fresh copy of the MS
71
73
print("\nCreate a new local copy of the MS...")
72
74
#print(" setjydatapath=",setjydatapath, " inpms=",self.inpms)
75
+
#print(" setjydatapath=",datapath, " inpms=",self.inpms)
73
76
if testmms:
74
77
os.system('cp -rH ' + os.path.join(datapath,self.inpms) + ' ' + self.inpms)
75
-
elif ismms:
76
-
os.system('cp -rH ' + os.path.join(datapath+'/mms',self.inpms) + ' ' + self.inpms)
78
+
#elif ismms:
79
+
# os.system('cp -rH ' + os.path.join(datapath+'/mms',self.inpms) + ' ' + self.inpms)
77
80
else:
78
81
os.system('cp -rf ' + os.path.join(datapath,self.inpms) + ' ' + self.inpms)
82
+
83
+
if ismms:
84
+
self.createMMS(self.inpms)
79
85
80
86
def resetMS(self):
81
87
82
88
if os.path.exists(self.inpms):
83
89
print("\nRemoving a local copy of MS from the previous test...")
84
90
#ret = os.system('rm -rf unittest/setjy/*')
85
91
shutil.rmtree(self.inpms)
92
+
93
+
if hasattr(self, 'inpmms') and os.path.exists(self.inpmms):
94
+
print("\nRemoving a local copy of MMS from the previous test...")
95
+
shutil.rmtree(self.inpmms)
96
+
86
97
87
98
def get_last_history_line(self,vis, origin='setjy::imager::setjy()',
88
99
nback=0, maxnback=20, hint=''):
89
100
"""
90
101
Finding the right history line is a bit tricky...it helps to filter
91
102
by origin and read from the back to remain unaffected by changes
92
103
elsewhere.
93
104
94
105
This reads up to maxnback lines with origin origin until it finds one
95
106
including hint in the message, going backwards from nback lines from the
152
163
if not are_eq:
153
164
raise ValueError('!=')
154
165
except ValueError:
155
166
errmsg = "%r != %r" % (val, expval)
156
167
if (len(errmsg) > 66): # 66 = 78 - len('ValueError: ')
157
168
errmsg = "\n%r\n!=\n%r" % (val, expval)
158
169
raise ValueError(errmsg)
159
170
except Exception:
160
171
print("Error comparing", val, "to", expval)
161
172
raise
173
+
174
+
def createMMS(self, msname):
175
+
'''Create MMSs for tests with input MMS'''
176
+
prefix = msname.rstrip('.ms')
177
+
if not os.path.exists(msname):
178
+
os.system('cp -RL '+os.path.join(datapath,msname)+' '+ msname)
179
+
180
+
# Create an MMS for the tests
181
+
self.inpmms = prefix + ".test.mms"
182
+
#default(partition)
183
+
184
+
if os.path.exists(self.inpmms):
185
+
os.system("rm -rf " + self.inpmms)
186
+
if os.path.exists(self.inpmms+'.flagversions'):
187
+
os.system("rm -rf " + self.inpmms +'.flagversions')
188
+
189
+
print("................. Creating test MMS ..................")
190
+
partition(vis=msname, outputvis=self.inpmms, flagbackup=False, separationaxis='auto')
162
191
163
192
164
193
class test_SingleObservation(SetjyUnitTestBase):
165
194
"""Test single observation MS"""
166
195
167
196
def setUp(self):
168
197
# Replaced with a realistic ALMA data
169
198
# - use modified version of CASAGuide's TWHya data (X3c1_wvrtsys.ms with only first 4scans)
170
199
# 4 spws (wvr spw is already split out)
171
200
#self.setUpMS("unittest/setjy/2528.ms") # Uranus
1192
1221
outfldid = ky
1193
1222
break
1194
1223
ret = len(outfldid)
1195
1224
if not ret:
1196
1225
print("FAIL: missing field = %s in the returned dictionary" % self.field)
1197
1226
#self.check_eq(sjran['12']['0']['fluxd'][0],1.15116881972,0.0001)
1198
1227
#self.check_eq(sjran['12']['1']['fluxd'][0],1.15111995508,0.0001)
1199
1228
self.check_eq(sjran['12']['0']['fluxd'][0],0.99137,0.0001)
1200
1229
self.check_eq(sjran['12']['1']['fluxd'][0],0.99132,0.0001)
1201
1230
self.assertTrue(ret)
1202
-
print("ret=%s" % sjran)
1231
+
#print("ret=%s" % sjran)
1203
1232
1204
1233
class test_newStandards_MMS(SetjyUnitTestBase):
1205
1234
"""Test simple Stnadard Scaling with MMS data"""
1235
+
# can be just mpicasa specific tests but for now it will be tested for serial
1206
1236
def setUp(self):
1207
1237
# MMS version of the data is stored in the subdirectory, mms
1208
1238
prefix = 'n1333_1'
1209
1239
msname=prefix+'.ms'
1210
1240
# this is MMS
1211
1241
self.setUpMS(msname, True)
1212
1242
self.field='0542+498_1' #3C147
1213
1243
1214
1244
def tearDown(self):
1215
1245
self.resetMS()
1216
1246
#pass
1217
1247
1218
1248
def test_PB2013_MMS(self):
1219
1249
self.modelim = ""
1220
-
sjran = setjy(vis=self.inpms,
1250
+
sjran = setjy(vis=self.inpmms,
1221
1251
field=self.field,
1222
1252
modimage=self.modelim,
1223
1253
standard='Perley-Butler 2013',
1224
1254
usescratch=True
1225
1255
)
1226
1256
ret = True
1227
1257
if type(sjran)!=dict:
1228
1258
ret = False
1229
1259
else:
1230
1260
outfldid = ""
1235
1265
ret = len(outfldid)
1236
1266
if not ret:
1237
1267
print("FAIL: missing field = %s in the returned dictionary" % self.field)
1238
1268
self.check_eq(sjran['12']['0']['fluxd'][0],0.99137,0.0001)
1239
1269
self.check_eq(sjran['12']['1']['fluxd'][0],0.99132,0.0001)
1240
1270
self.assertTrue(ret)
1241
1271
#print("ret=%s" % sjran)
1242
1272
1243
1273
def test_PB2017_MMS(self):
1244
1274
self.modelim = ""
1245
-
sjran = setjy(vis=self.inpms,
1275
+
sjran = setjy(vis=self.inpmms,
1246
1276
field=self.field,
1247
1277
modimage=self.modelim,
1248
1278
standard='Perley-Butler 2017',
1249
1279
usescratch=True
1250
1280
)
1251
1281
ret = True
1252
1282
if type(sjran)!=dict:
1253
1283
ret = False
1254
1284
else:
1255
1285
outfldid = ""