1 + | |
2 + | |
3 + | |
4 + | |
5 + | |
6 + | |
7 + | |
8 + | |
9 + | |
10 + | |
11 + | |
12 + | |
13 + | |
14 + | |
15 + | |
16 + | |
17 + | |
18 + | |
19 + | |
20 + | |
21 + | |
22 + | |
23 + | |
24 + | |
25 + | |
26 + | |
27 + | |
28 + | |
29 + | |
30 + | |
31 + | |
32 + | |
33 + | |
34 + | |
35 + | import os |
36 + | import sys |
37 + | import unittest |
38 + | import copy |
39 + | import shutil |
40 + | import numpy |
41 + | |
42 + | from casatestutils import testhelper as th |
43 + | from casatestutils import compare |
44 + | |
45 + | from casatools import ctsys, synthesisutils, image, msmetadata, measures, table, quanta |
46 + | |
47 + | su = synthesisutils() |
48 + | _qa = quanta() |
49 + | |
50 + | class sutest_base(unittest.TestCase): |
51 + | |
52 + | @classmethod |
53 + | def setUpClass(cls): |
54 + | |
55 + | |
56 + | cls.datapath = '/export/home/murasame/casasrc/cas13590dev/testdata/' |
57 + | cls.ia = image() |
58 + | cls.tb = table() |
59 + | cls.msmd = msmetadata() |
60 + | @classmethod |
61 + | def tearDownClass(cls): |
62 + | cls.ia.done() |
63 + | cls.tb.done() |
64 + | cls.msmd.done() |
65 + | |
66 + | def checklistval(kind, inlist, reflist): |
67 + | state = False |
68 + | pstr = '' |
69 + | if inlist==reflist: |
70 + | state = True |
71 + | else: |
72 + | state = False |
73 + | pstr = 'Fail : {} {} expected {}'.format(kind,inlist, reflist) |
74 + | return (state, pstr) |
75 + | |
76 + | def checkdict(self,indict,refdict, testdesc=''): |
77 + | """ |
78 + | Compare dictionaries between indict and refdict |
79 + | testesc: optional test description printed with the message for failed case |
80 + | |
81 + | Returns True/False and messages in cases of failures |
82 + | also print the failure messages if at least one of the tests fails |
83 + | """ |
84 + | tests =[] |
85 + | msg = '' |
86 + | if testdesc!='': |
87 + | msg += '[ {} ]'.format(testdesc) |
88 + | for key in indict: |
89 + | if key in refdict: |
90 + | if isinstance(indict[key], numpy.ndarray): |
91 + | if isinstance(refdict[key], numpy.ndarray): |
92 + | if numpy.array_equal(indict[key],refdict[key]): |
93 + | tests.append(True) |
94 + | else: |
95 + | |
96 + | msg += "Fail: {0}:{1} (current) != {0}:{2} (reference)\n ".format(key,indict[key],refdict[key]) |
97 + | tests.append(False) |
98 + | |
99 + | else: |
100 + | msg += "Fail: Type mismatch for {}: current {}, reference {} \n".format(key,type(indict[key]), type(refdict[key])) |
101 + | elif isinstance(indict[key], list): |
102 + | if isinstance(refdict[key], list): |
103 + | if indict[key]==refdict[key]: |
104 + | tests.append(True) |
105 + | else: |
106 + | tests.append(False) |
107 + | msg += "Fail: {0}:{1} (current) != {0}:{2}(reference) \n".format(key, indict[key], refdict[key]) |
108 + | else: |
109 + | if indict[key]==refdict[key]: |
110 + | tests.append(True) |
111 + | else: |
112 + | tests.append(False) |
113 + | msg += "Fail {0}:{1} (current) != {0}:{2} (reference) \n".format(key, indict[key], refdict[key]) |
114 + | else: |
115 + | pass |
116 + | |
117 + | allpass = all(tests) |
118 + | if not allpass: |
119 + | print(msg) |
120 + | |
121 + | return (allpass, msg) |
122 + | |
123 + | def topof_to_outframef(self, vis, infreq, outframe, fieldid): |
124 + | """ |
125 + | Calulate corresponding outframe frequencies for the input TOPO |
126 + | frequency for a specific source for the given observing time stamps |
127 + | as the outframe frequency shift. |
128 + | (returns a list of measure frequency) |
129 + | """ |
130 + | _me = measures() |
131 + | |
132 + | self.tb.open(vis+'/FIELD') |
133 + | dir = self.tb.getcell('PHASE_DIR',fieldid) |
134 + | self.tb.done() |
135 + | mdir = _me.direction('J2000', str(dir[0][0])+'rad',str(dir[1][0])+'rad') |
136 + | |
137 + | |
138 + | self.tb.open(vis+'/OBSERVATION') |
139 + | tel = self.tb.getcol('TELESCOPE_NAME') |
140 + | self.tb.done() |
141 + | |
142 + | |
143 + | self.msmd.open(vis) |
144 + | tms = self.msmd.timesforfield(fieldid) |
145 + | self.msmd.done() |
146 + | |
147 + | |
148 + | _me.doframe(_me.observatory(tel[0])) |
149 + | |
150 + | _me.doframe(mdir) |
151 + | |
152 + | |
153 + | inmf = _me.frequency('TOPO',infreq) |
154 + | |
155 + | outflist=[] |
156 + | for tm in tms: |
157 + | ep = _me.epoch('UTC', str(tm)+'s') |
158 + | |
159 + | _me.doframe(ep) |
160 + | |
161 + | |
162 + | outf = _me.measure(inmf,outframe) |
163 + | |
164 + | outflist.append(outf) |
165 + | |
166 + | return outflist |
167 + | |
168 + | |
169 + | class fitpsfbeam_test(sutest_base): |
170 + | |
171 + | |
172 + | |
173 + | |
174 + | |
175 + | |
176 + | def setUp(self): |
177 + | |
178 + | self.inputims=[] |
179 + | psfim1 = 'su_fitpsfbeam_test_mfs.psf' |
180 + | self.inputims.append(psfim1) |
181 + | shutil.copytree(os.path.join(self.datapath, psfim1),psfim1) |
182 + | |
183 + | psfim2base = 'su_fitpsfbeam_test_mtmfs.psf.tt' |
184 + | for ext in ['0','1','2']: |
185 + | psfname = psfim2base+ext |
186 + | psf2ttfullpath = os.path.join(self.datapath, psfname) |
187 + | shutil.copytree(os.path.join(self.datapath, psfname),psfname) |
188 + | self.inputims.append(psfname) |
189 + | |
190 + | |
191 + | def tearDown(self): |
192 + | for img in self.inputims: |
193 + | shutil.rmtree(img) |
194 + | |
195 + | |
196 + | def test_mfs(self): |
197 + | '''Test that fitting of mfs psf works ''' |
198 + | |
199 + | |
200 + | psfim = self.inputims[0] |
201 + | self.ia.open(psfim) |
202 + | origbeam = self.ia.restoringbeam() |
203 + | |
204 + | resetbeam = copy.deepcopy(origbeam) |
205 + | resetbeam['major']['value']=1.0 |
206 + | resetbeam['minor']['value']=1.0 |
207 + | resetbeam['positionangle']['value']=0.0 |
208 + | self.ia.setrestoringbeam(major=resetbeam['major'], minor=resetbeam['minor'], pa=resetbeam['positionangle']) |
209 + | self.ia.done() |
210 + | |
211 + | |
212 + | bmref= {'major': {'unit': 'arcsec', 'value': 52.93011474609375}, 'minor': {'unit': 'arcsec', 'value': 49.147438049316406}, 'positionangle': {'unit': 'deg', 'value': -87.39420318603516}} |
213 + | |
214 + | su = synthesisutils() |
215 + | ret = su.fitPsfBeam(imagename=psfim[:-4]) |
216 + | del su |
217 + | self.ia.open(psfim) |
218 + | newbeam = self.ia.restoringbeam() |
219 + | self.ia.done() |
220 + | print("psfim=",psfim) |
221 + | print("origbeam=",origbeam) |
222 + | print("newbeam=",newbeam) |
223 + | |
224 + | self.assertTrue(ret) |
225 + | |
226 + | |
227 + | self.assertDictContainsSubset(newbeam, bmref) |
228 + | |
229 + | def test_mfs_wrong_nterms(self): |
230 + | '''Test that it catches if nterms is inconsistent with input psf (nterms=2, for a single term psf) ''' |
231 + | su = synthesisutils() |
232 + | psfim = self.inputims[0] |
233 + | |
234 + | self.assertRaises(RuntimeError, su.fitPsfBeam, imagename=psfim[:-4],nterms=2) |
235 + | del su |
236 + | |
237 + | def test_mfs_wrong_psfcutoff(self): |
238 + | '''Test that psfcutoff is given outside the allowed range (1.0) ''' |
239 + | |
240 + | |
241 + | psfim = self.inputims[0] |
242 + | |
243 + | su = synthesisutils() |
244 + | ret1 = su.fitPsfBeam(imagename=psfim[:-4],psfcutoff=1.0) |
245 + | ret2 = su.fitPsfBeam(imagename=psfim[:-4],psfcutoff=-1.0) |
246 + | del su |
247 + | self.assertFalse(ret1) |
248 + | self.assertFalse(ret2) |
249 + | |
250 + | def test_mfs_largerpsfcutoff(self): |
251 + | '''Test that psfcutoff with a valid (larger) number works ''' |
252 + | |
253 + | |
254 + | psfim = self.inputims[0] |
255 + | self.ia.open(psfim) |
256 + | origbeam = self.ia.restoringbeam() |
257 + | |
258 + | resetbeam = copy.deepcopy(origbeam) |
259 + | resetbeam['major']['value']=1.0 |
260 + | resetbeam['minor']['value']=1.0 |
261 + | resetbeam['positionangle']['value']=0.0 |
262 + | self.ia.setrestoringbeam(major=resetbeam['major'], minor=resetbeam['minor'], pa=resetbeam['positionangle']) |
263 + | self.ia.done() |
264 + | |
265 + | |
266 + | bmref= {'major': {'unit': 'arcsec', 'value': 49.89558029174805}, 'minor': {'unit': 'arcsec', 'value': 46.80238342285156}, 'positionangle': {'unit': 'deg', 'value': -88.28898620605469}} |
267 + | |
268 + | su = synthesisutils() |
269 + | ret = su.fitPsfBeam(imagename=psfim[:-4],psfcutoff=0.5) |
270 + | del su |
271 + | self.ia.open(psfim) |
272 + | newbeam = self.ia.restoringbeam() |
273 + | self.ia.done() |
274 + | print("psfim=",psfim) |
275 + | print("origbeam=",origbeam) |
276 + | print("newbeam=",newbeam) |
277 + | |
278 + | self.assertTrue(ret) |
279 + | |
280 + | |
281 + | self.assertDictContainsSubset(newbeam, bmref) |
282 + | |
283 + | def test_mtmfs_nterms2(self): |
284 + | '''Test that fitting of multiterm psf works ''' |
285 + | |
286 + | |
287 + | psfimtt0 = self.inputims[1] |
288 + | self.ia.open(psfimtt0) |
289 + | origbeam = self.ia.restoringbeam() |
290 + | |
291 + | resetbeam = copy.deepcopy(origbeam) |
292 + | resetbeam['major']['value']=1.0 |
293 + | resetbeam['minor']['value']=1.0 |
294 + | resetbeam['positionangle']['value']=0.0 |
295 + | self.ia.setrestoringbeam(major=resetbeam['major'], minor=resetbeam['minor'], pa=resetbeam['positionangle']) |
296 + | self.ia.done() |
297 + | |
298 + | |
299 + | bmref= {'major': {'unit': 'arcsec', 'value': 52.93011474609375}, 'minor': {'unit': 'arcsec', 'value': 49.147438049316406}, 'positionangle': {'unit': 'deg', 'value': -87.39420318603516}} |
300 + | |
301 + | su = synthesisutils() |
302 + | |
303 + | ret = su.fitPsfBeam(imagename=psfimtt0[:-8],nterms=2) |
304 + | del su |
305 + | self.ia.open(psfimtt0) |
306 + | newbeam = self.ia.restoringbeam() |
307 + | self.ia.done() |
308 + | |
309 + | |
310 + | |
311 + | |
312 + | self.assertTrue(ret) |
313 + | |
314 + | |
315 + | self.assertDictContainsSubset(newbeam, bmref) |
316 + | |
317 + | class getoptimumsize_test(sutest_base): |
318 + | |
319 + | |
320 + | |
321 + | def setUp(self): |
322 + | pass |
323 + | |
324 + | def tearDown(self): |
325 + | pass |
326 + | |
327 + | def test_default(self): |
328 + | '''Test default size''' |
329 + | su = synthesisutils() |
330 + | ret = su.getOptimumSize() |
331 + | del su |
332 + | self.assertEqual(ret,100) |
333 + | |
334 + | def test_2(self): |
335 + | '''Test odd non-optimal number''' |
336 + | su = synthesisutils() |
337 + | ret = su.getOptimumSize(size=501) |
338 + | del su |
339 + | self.assertEqual(ret,512) |
340 + | |
341 + | def test_3(self): |
342 + | '''Test even non-optimal number ''' |
343 + | su = synthesisutils() |
344 + | ret = su.getOptimumSize(size=510) |
345 + | del su |
346 + | self.assertEqual(ret,512) |
347 + | |
348 + | class advisechansel_test(sutest_base): |
349 + | |
350 + | |
351 + | |
352 + | def setUp(self): |
353 + | |
354 + | self.inputmses=[] |
355 + | ms1 = 'twhya.short.ms' |
356 + | ms2 = 'alma_ephemobj_icrs.ms' |
357 + | self.inputmses.append(ms1) |
358 + | self.inputmses.append(ms2) |
359 + | for inms in self.inputmses: |
360 + | if not os.path.exists(inms): |
361 + | shutil.copytree(os.path.join(self.datapath, inms),inms) |
362 + | |
363 + | |
364 + | |
365 + | |
366 + | |
367 + | |
368 + | |
369 + | |
370 + | |
371 + | |
372 + | def TearDown(self): |
373 + | for inpdata in self.inputmses: |
374 + | if os.path.exists(inpdata): |
375 + | shutil.rmtree(indata) |
376 + | |
377 + | |
378 + | def test_advisechanelsel_datasel(self): |
379 + | '''Test that data seleciton parameters or given frequency range, etc is returned correctly''' |
380 + | |
381 + | |
382 + | |
383 + | su = synthesisutils() |
384 + | |
385 + | inputms = 'twhya.short.ms' |
386 + | |
387 + | fsttopo = '356.55897119140625GHz' |
388 + | fentopo = '358.2029418946312GHz' |
389 + | res = su.advisechansel(freqstart=fsttopo,freqend=fentopo, freqframe='TOPO', fieldid=2, msname=inputms) |
390 + | |
391 + | |
392 + | refdict = {'nchan': numpy.array([3340, 3840]), 'spw': numpy.array([0, 1]), 'start': numpy.array([500, 0])} |
393 + | |
394 + | (pof, errmsg) = self.checkdict(res,refdict, 'TOPO test') |
395 + | |
396 + | |
397 + | print("inputms=",inputms) |
398 + | listoffreqs = self.topof_to_outframef(inputms, fsttopo, 'LSRK', 2) |
399 + | fval = [] |
400 + | for f in listoffreqs: |
401 + | fval.append(f['m0']['value']) |
402 + | fstlsrk = str(_qa.convert(str(min(fval))+'Hz', 'GHz')['value'])+'GHz' |
403 + | |
404 + | listoffreqs = self.topof_to_outframef(inputms, fentopo, 'LSRK', 2) |
405 + | fval.clear() |
406 + | for f in listoffreqs: |
407 + | fval.append(f['m0']['value']) |
408 + | fenlsrk = str(_qa.convert(str(min(fval))+'Hz', 'GHz')['value'])+'GHz' |
409 + | |
410 + | |
411 + | reslsrk = su.advisechansel(freqstart=fstlsrk,freqend=fenlsrk, freqframe='LSRK', fieldid=2, msname=inputms) |
412 + | del su |
413 + | |
414 + | (poflsrk, errmsglsrk) = self.checkdict(reslsrk,refdict,'LSRK test') |
415 + | self.assertTrue(all([pof,poflsrk])) |
416 + | |
417 + | def test_advisechanelsel_datasel_ephem(self): |
418 + | '''Test that data selection parameters for given frequency range,etc for ephemeris object returned correctly''' |
419 + | |
420 + | su = synthesisutils() |
421 + | |
422 + | inputms = 'alma_ephemobj_icrs.ms' |
423 + | |
424 + | fst = '220.34439739444292GHz' |
425 + | fen = '220.29556926944292GHz' |
426 + | cw = '122.0703125kHz' |
427 + | res = su.advisechansel(freqstart=fst,freqend=fen, freqstep=cw, freqframe='SOURCE', fieldid=1, ephemtable='TRACKFIELD', msname=inputms) |
428 + | |
429 + | |
430 + | refdict = {'nchan': numpy.array([402]), 'spw': numpy.array([2]), 'start': numpy.array([251])} |
431 + | |
432 + | (pof, errmsg) = self.checkdict(res,refdict, 'internal ephem table test') |
433 + | |
434 + | |
435 + | shutil.copytree(inputms+'/FIELD/EPHEM0_Uranus_57362.91000000.tab', 'external_EPHEM0_Uranus_57362.91000000.tab') |
436 + | resext = su.advisechansel(freqstart=fst,freqend=fen, freqstep=cw, freqframe='SOURCE', fieldid=1, ephemtable='external_EPHEM0_Uranus_57362.91000000.tab', msname=inputms) |
437 + | (pofext, errmsgext) = self.checkdict(resext,refdict,'external ephem table test') |
438 + | shutil.rmtree('external_EPHEM0_Uranus_57362.91000000.tab') |
439 + | |
440 + | |
441 + | resdef = su.advisechansel(freqstart=fst,freqend=fen, freqstep=cw, freqframe='SOURCE', fieldid=1, ephemtable='Uranus', msname=inputms) |
442 + | (pofdef, errmsgdef) = self.checkdict(resdef,refdict,'default ephem table test') |
443 + | |
444 + | del su |
445 + | |
446 + | self.assertTrue(all([pof,pofext,pofdef])) |
447 + | |
448 + | |
449 + | def test_su_adivsechansel_getfreqrange |
450 + | '''Test that frequency range for givne data selections returned correctly''' |
451 + | |
452 + | pass |
453 + | |
454 + | def test_su_advisechansel_getfreqrange_ephem |
455 + | '''Test that frequency range for givne data selections for an ephemeris object returned correctly''' |
456 + | |
457 + | pass |
458 + | |
459 + | def test_su_adivsechanel_defaults |
460 + | '''Test non specified parameter case for proper error/warning message ''' |
461 + | pass |
462 + | |
463 + | |
464 + | |
465 + | |
466 + | |
467 + | |
468 + | if __name__ == '__main__': |
469 + | unittest.main() |