1 + | |
2 + | |
3 + | |
4 + | |
5 + | |
6 + | |
7 + | |
8 + | |
9 + | |
10 + | |
11 + | |
12 + | |
13 + | |
14 + | |
15 + | |
16 + | |
17 + | |
18 + | |
19 + | CASA6=False |
20 + | try: |
21 + | import casatools |
22 + | from casatasks import casalog |
23 + | cb = casatools.calibrater() |
24 + | tb = casatools.table() |
25 + | CASA6 = True |
26 + | except ImportError: |
27 + | from __main__ import default |
28 + | from tasks import * |
29 + | from taskinit import * |
30 + | |
31 + | import os |
32 + | import shutil |
33 + | import unittest |
34 + | import numpy as np |
35 + | |
36 + | reg_unittest_datap = 'unittest/calibrater/' |
37 + | if CASA6: |
38 + | datapath = casatools.ctsys.resolve(reg_unittest_datap) |
39 + | else: |
40 + | datapath = os.path.join(os.path.join(os.environ.get('CASAPATH').split()[0], |
41 + | 'casatestdata'), reg_unittest_datap) |
42 + | |
43 + | |
44 + | |
45 + | if CASA6: |
46 + | validator_exc_type = AssertionError |
47 + | else: |
48 + | from casa_stack_manip import stack_frame_find |
49 + | casa_stack_rethrow = stack_frame_find().get('__rethrow_casa_exceptions', False) |
50 + | validator_exc_type = RuntimeError |
51 + | |
52 + | |
53 + | class calibrater_test(unittest.TestCase): |
54 + | |
55 + | @classmethod |
56 + | def setUpClass(cls): |
57 + | cls._vis = 'gaincaltest2.ms' |
58 + | cls._visObs = 'Itziar.ms' |
59 + | cls._cal = 'gaincaltest2.ms.G0' |
60 + | cls._lib = os.path.join(datapath, 'refcallib.txt') |
61 + | |
62 + | shutil.copytree(os.path.join(datapath, 'ngc5921.ms'), 'ngc5921.ms') |
63 + | cls._bandvis = 'ngc5921.ms' |
64 + | |
65 + | @classmethod |
66 + | def tearDownClass(cls): |
67 + | shutil.rmtree(cls._bandvis) |
68 + | |
69 + | def setUp(self): |
70 + | shutil.copytree(os.path.join(datapath, self._vis), self._vis) |
71 + | shutil.copytree(os.path.join(datapath, self._visObs), self._visObs) |
72 + | shutil.copytree(os.path.join(datapath, self._cal), self._cal) |
73 + | |
74 + | def tearDown(self): |
75 + | if os.path.exists('testlog.log'): |
76 + | os.remove('testlog.log') |
77 + | if os.path.exists('testcalout.cal'): |
78 + | shutil.rmtree('testcalout.cal') |
79 + | if os.path.exists('bpoly'): |
80 + | shutil.rmtree('bpoly') |
81 + | if os.path.exists('gainspline'): |
82 + | shutil.rmtree('gainspline') |
83 + | |
84 + | cb.close() |
85 + | cb.setvi(old=False) |
86 + | shutil.rmtree(self._vis) |
87 + | shutil.rmtree(self._cal) |
88 + | shutil.rmtree(self._visObs) |
89 + | |
90 + | |
91 + | def test_takesMs(self): |
92 + | """ Check that the calibrater tool can open and close an MS """ |
93 + | |
94 + | cb.open(self._vis) |
95 + | |
96 + | cache = tb.showcache() |
97 + | self.assertTrue(len(cache) > 0) |
98 + | |
99 + | cb.close() |
100 + | cache = tb.showcache() |
101 + | self.assertTrue(len(cache) == 0) |
102 + | |
103 + | def test_activityRecord(self): |
104 + | """ Check that using the calibrater to modify the vis shows in the activity record """ |
105 + | |
106 + | |
107 + | cb.open(self._vis) |
108 + | cb.setapply(table=self._cal) |
109 + | cb.corrupt() |
110 + | |
111 + | actRecord = cb.activityrec() |
112 + | self.assertTrue(len(actRecord) > 0) |
113 + | |
114 + | def test_standardPath(self): |
115 + | """ open setapply setsolve state solve close """ |
116 + | |
117 + | cb.open(self._vis) |
118 + | cb.setapply(table=self._cal) |
119 + | cb.setsolve(table='output.ms') |
120 + | cb.state() |
121 + | cb.solve() |
122 + | cb.close() |
123 + | |
124 + | self.assertTrue(os.path.exists('output.ms')) |
125 + | |
126 + | def test_createEmpty(self): |
127 + | """ Check that an empty cal table can be created""" |
128 + | |
129 + | cb.open(self._vis) |
130 + | cb.createcaltable(caltable='testcalout.cal', partype='', caltype='GAIN', singlechan=True) |
131 + | |
132 + | self.assertTrue(os.path.exists('testcalout.cal')) |
133 + | |
134 + | def test_writeToCorrected(self): |
135 + | """ Check that the tool writes to the CORRECTED_DATA column """ |
136 + | |
137 + | |
138 + | tb.open(self._vis) |
139 + | columns = tb.colnames() |
140 + | tb.close() |
141 + | |
142 + | self.assertFalse('CORRECTED_DATA' in columns) |
143 + | |
144 + | cb.open(self._vis) |
145 + | cb.setapply(table=self._cal) |
146 + | cb.correct() |
147 + | cb.close() |
148 + | |
149 + | |
150 + | tb.open(self._vis) |
151 + | columns = tb.colnames() |
152 + | tb.close() |
153 + | |
154 + | self.assertTrue('CORRECTED_DATA' in columns) |
155 + | |
156 + | def test_done(self): |
157 + | """ Check that done closes the active calibrator tool """ |
158 + | |
159 + | |
160 + | self.assertTrue(len(tb.showcache()) == 0, msg="The cache is not empty to begin with") |
161 + | cb.open(self._vis) |
162 + | self.assertTrue(len(tb.showcache()) > 0) |
163 + | cb.close() |
164 + | |
165 + | def test_reinitModel(self): |
166 + | """ Check that initcalset will reset the CORRECTED_DATA to unity """ |
167 + | |
168 + | |
169 + | cb.open(self._vis) |
170 + | cb.setapply(table=self._cal) |
171 + | cb.setsolve(table=self._cal) |
172 + | cb.solve() |
173 + | cb.close() |
174 + | |
175 + | |
176 + | cb.setvi(old=True) |
177 + | |
178 + | |
179 + | tb.open(self._vis, nomodify=False) |
180 + | col = tb.getcol('MODEL_DATA') + 2 |
181 + | tb.putcol('MODEL_DATA', col) |
182 + | tb.close() |
183 + | |
184 + | |
185 + | cb.open(self._vis) |
186 + | cb.initcalset() |
187 + | cb.close() |
188 + | |
189 + | |
190 + | tb.open(self.vis) |
191 + | col = tb.getcol('MODEL_DATA') |
192 + | tb.close() |
193 + | |
194 + | self.assertTrue(np.all(col[0] == 1)) |
195 + | self.assertTrue(np.all(col[1] == 0)) |
196 + | self.assertTrue(np.all(col[2] == 0)) |
197 + | self.assertTrue(np.all(col[3] == 1)) |
198 + | |
199 + | def test_applyPosAngCal(self): |
200 + | |
201 + | pass |
202 + | |
203 + | def test_resetSolveApply(self): |
204 + | """ Check that the reset function can clear set apply and solves """ |
205 + | |
206 + | |
207 + | logpath = casalog.logfile() |
208 + | |
209 + | cb.open(self._vis) |
210 + | cb.setapply(table=self._cal) |
211 + | cb.setsolve(table=self._cal) |
212 + | |
213 + | |
214 + | casalog.setlogfile('testlog.log') |
215 + | cb.state() |
216 + | casalog.setlogfile(logpath) |
217 + | |
218 + | |
219 + | counter = 0 |
220 + | applyset = False |
221 + | solveset = False |
222 + | |
223 + | with open('testlog.log') as logout: |
224 + | for line in logout: |
225 + | if counter == 1 and "(None)" not in line: |
226 + | applyset = True |
227 + | if counter == 3 and "(None)" not in line: |
228 + | solveset = True |
229 + | counter += 1 |
230 + | |
231 + | self.assertTrue(applyset) |
232 + | self.assertTrue(solveset) |
233 + | |
234 + | |
235 + | |
236 + | os.remove('testlog.log') |
237 + | cb.reset(apply=True, solve=True) |
238 + | casalog.setlogfile('testlog.log') |
239 + | cb.state() |
240 + | casalog.setlogfile(logpath) |
241 + | |
242 + | |
243 + | counter = 0 |
244 + | applyset = False |
245 + | solveset = False |
246 + | |
247 + | with open('testlog.log') as logout: |
248 + | for line in logout: |
249 + | if counter == 1 and "(None)" not in line: |
250 + | applyset = True |
251 + | if counter == 3 and "(None)" not in line: |
252 + | solveset = True |
253 + | counter += 1 |
254 + | |
255 + | |
256 + | self.assertFalse(applyset) |
257 + | self.assertFalse(solveset) |
258 + | |
259 + | cb.close() |
260 + | |
261 + | |
262 + | |
263 + | def test_selectVisSpw(self): |
264 + | """Check that spw is properly selected by selectvis""" |
265 + | |
266 + | rowswithspw = [] |
267 + | rowswithoutspw = [] |
268 + | tb.open(self._vis) |
269 + | datacol = tb.getcol('DATA_DESC_ID') |
270 + | |
271 + | for i in range(len(datacol)): |
272 + | if datacol[i] == 0: |
273 + | rowswithspw.append(i) |
274 + | else: |
275 + | rowswithoutspw.append(i) |
276 + | |
277 + | |
278 + | beforedata = tb.getcol('DATA')[0][0] |
279 + | tb.close() |
280 + | |
281 + | |
282 + | cb.open(self._vis) |
283 + | cb.selectvis(spw=0) |
284 + | cb.setapply(table=self._cal) |
285 + | cb.correct() |
286 + | cb.close() |
287 + | |
288 + | selectedrows = [] |
289 + | unselectedrows = [] |
290 + | |
291 + | |
292 + | tb.open(self._vis) |
293 + | datacol = tb.getcol('CORRECTED_DATA')[0][0] |
294 + | |
295 + | for i in rowswithspw: |
296 + | if datacol[i] != beforedata[i]: |
297 + | selectedrows.append(True) |
298 + | |
299 + | for i in rowswithoutspw: |
300 + | if datacol[i] == beforedata[i]: |
301 + | unselectedrows.append(True) |
302 + | tb.close() |
303 + | |
304 + | |
305 + | self.assertTrue(len(rowswithspw) == len(selectedrows)) |
306 + | self.assertTrue(len(rowswithoutspw) == len(unselectedrows)) |
307 + | |
308 + | |
309 + | |
310 + | def test_selectVisTime(self): |
311 + | """Check that time is properly selected by selectvis""" |
312 + | |
313 + | tb.open(self._vis) |
314 + | before = tb.getcol('DATA') |
315 + | tb.close() |
316 + | |
317 + | |
318 + | cb.open(self._vis) |
319 + | cb.selectvis(time='>04:38:23') |
320 + | cb.setapply(table=self._cal) |
321 + | cb.correct() |
322 + | cb.close() |
323 + | |
324 + | tb.open(self._vis) |
325 + | after = tb.getcol('CORRECTED_DATA') |
326 + | tb.close() |
327 + | |
328 + | |
329 + | self.assertFalse(np.array_equal(before, after)) |
330 + | |
331 + | |
332 + | def test_selectVisScan(self): |
333 + | """Check tht the scan is properly selected by selectvis""" |
334 + | rowswithscan = [] |
335 + | rowswithoutscan = [] |
336 + | tb.open(self._vis) |
337 + | datacol = tb.getcol('SCAN_NUMBER') |
338 + | |
339 + | for i in range(len(datacol)): |
340 + | if datacol[i] == 2: |
341 + | rowswithscan.append(i) |
342 + | else: |
343 + | rowswithoutscan.append(i) |
344 + | |
345 + | |
346 + | beforedata = tb.getcol('DATA')[0][0] |
347 + | tb.close() |
348 + | |
349 + | |
350 + | cb.open(self._vis) |
351 + | cb.selectvis(scan=2) |
352 + | cb.setapply(table=self._cal) |
353 + | cb.correct() |
354 + | cb.close() |
355 + | |
356 + | selectedrows = [] |
357 + | unselectedrows = [] |
358 + | |
359 + | |
360 + | tb.open(self._vis) |
361 + | datacol = tb.getcol('CORRECTED_DATA')[0][0] |
362 + | |
363 + | for i in rowswithscan: |
364 + | if datacol[i] != beforedata[i]: |
365 + | selectedrows.append(True) |
366 + | |
367 + | for i in rowswithoutscan: |
368 + | if datacol[i] == beforedata[i]: |
369 + | unselectedrows.append(True) |
370 + | tb.close() |
371 + | |
372 + | |
373 + | self.assertTrue(len(rowswithscan) == len(selectedrows)) |
374 + | self.assertTrue(len(rowswithoutscan) == len(unselectedrows)) |
375 + | |
376 + | def test_selectVisField(self): |
377 + | """Check that the field is properly selected by selectvis""" |
378 + | rowswithfield = [] |
379 + | rowswithoutfield = [] |
380 + | tb.open(self._vis) |
381 + | datacol = tb.getcol('FIELD_ID') |
382 + | |
383 + | for i in range(len(datacol)): |
384 + | if datacol[i] == 0: |
385 + | rowswithfield.append(i) |
386 + | else: |
387 + | rowswithoutfield.append(i) |
388 + | |
389 + | |
390 + | beforedata = tb.getcol('DATA')[0][0] |
391 + | tb.close() |
392 + | |
393 + | |
394 + | cb.open(self._vis) |
395 + | cb.selectvis(field=0) |
396 + | cb.setapply(table=self._cal) |
397 + | cb.correct() |
398 + | cb.close() |
399 + | |
400 + | selectedrows = [] |
401 + | unselectedrows = [] |
402 + | |
403 + | |
404 + | tb.open(self._vis) |
405 + | datacol = tb.getcol('CORRECTED_DATA')[0][0] |
406 + | |
407 + | for i in rowswithfield: |
408 + | if datacol[i] != beforedata[i]: |
409 + | selectedrows.append(True) |
410 + | |
411 + | for i in rowswithoutfield: |
412 + | if datacol[i] == beforedata[i]: |
413 + | unselectedrows.append(True) |
414 + | tb.close() |
415 + | |
416 + | |
417 + | self.assertTrue(len(rowswithfield) == len(selectedrows)) |
418 + | self.assertTrue(len(rowswithoutfield) == len(unselectedrows)) |
419 + | |
420 + | def test_selectVisIntent(self): |
421 + | """Check that the intent is properly selected by selectvis""" |
422 + | |
423 + | tb.open(self._vis + '/STATE', nomodify=False) |
424 + | tb.addrows(1) |
425 + | data = tb.getcol('OBS_MODE') |
426 + | data[1] = 'CALIBRATE_DELAY#ON_SOURCE,CALIBRATE_PHASE#ON_SOURCE,CALIBRATE_WVR#ON_SOURCE' |
427 + | tb.putcol('OBS_MODE', data) |
428 + | tb.close() |
429 + | |
430 + | |
431 + | tb.open(self._vis, nomodify=False) |
432 + | data = tb.getcol('STATE_ID') |
433 + | data[10:5000] |
434 + | tb.putcol('STATE_ID', data) |
435 + | tb.close() |
436 + | |
437 + | tb.open(self._vis) |
438 + | |
439 + | beforedata = tb.getcol('DATA')[0][0] |
440 + | tb.close() |
441 + | |
442 + | |
443 + | cb.open(self._vis) |
444 + | cb.selectvis(intent='*AMPLI*') |
445 + | cb.setapply(table=self._cal) |
446 + | cb.correct() |
447 + | cb.close() |
448 + | |
449 + | |
450 + | tb.open(self._visObs) |
451 + | datacol = tb.getcol('CORRECTED_DATA')[0][0] |
452 + | |
453 + | self.assertFalse(np.array_equal(datacol, beforedata)) |
454 + | |
455 + | |
456 + | def test_selectVisObs(self): |
457 + | |
458 + | tb.open(self._vis, nomodify=False) |
459 + | obsids = tb.getcol('OBSERVATION_ID') |
460 + | obsids[10:10000] = 1 |
461 + | tb.putcol('OBSERVATION_ID', obsids) |
462 + | tb.close() |
463 + | |
464 + | tb.open(self._cal, nomodify=False) |
465 + | obsids = tb.getcol('OBSERVATION_ID') |
466 + | obsids[20:30] = 1 |
467 + | tb.putcol('OBSERVATION_ID', obsids) |
468 + | tb.close() |
469 + | |
470 + | rowswithobs = [] |
471 + | rowswithoutobs = [] |
472 + | tb.open(self._vis) |
473 + | datacol = tb.getcol('OBSERVATION_ID') |
474 + | |
475 + | for i in range(len(datacol)): |
476 + | if datacol[i] == 0: |
477 + | rowswithobs.append(i) |
478 + | else: |
479 + | rowswithoutobs.append(i) |
480 + | |
481 + | |
482 + | beforedata = tb.getcol('DATA')[0][0] |
483 + | tb.close() |
484 + | |
485 + | |
486 + | cb.open(self._vis) |
487 + | cb.selectvis(observation=0) |
488 + | cb.setapply(table=self._cal) |
489 + | cb.correct() |
490 + | cb.close() |
491 + | |
492 + | |
493 + | tb.open(self._visObs) |
494 + | datacol = tb.getcol('CORRECTED_DATA')[0][0] |
495 + | |
496 + | self.assertFalse(np.array_equal(datacol, beforedata)) |
497 + | |
498 + | def test_selectVisUVrange(self): |
499 + | """Check that selectvis properly selects for uvrange""" |
500 + | |
501 + | |
502 + | tb.open(self._visObs) |
503 + | beforedata = tb.getcol('DATA')[0][0] |
504 + | tb.close() |
505 + | |
506 + | |
507 + | cb.open(self._vis) |
508 + | cb.selectvis(uvrange='> 500000lambda') |
509 + | cb.setapply(table=self._cal) |
510 + | cb.correct() |
511 + | cb.close() |
512 + | |
513 + | |
514 + | tb.open(self._vis) |
515 + | afterdata = tb.getcol('DATA')[0][0] |
516 + | tb.close() |
517 + | |
518 + | |
519 + | issame = True |
520 + | |
521 + | for i in range(len(beforedata)): |
522 + | if beforedata[i] != afterdata[i]: |
523 + | issame = False |
524 + | break |
525 + | |
526 + | self.assertFalse(issame) |
527 + | |
528 + | def test_selectVisBaseline(self): |
529 + | """Check that selectvis properly selects baseline/antenna""" |
530 + | rowswithant = [] |
531 + | rowswithoutant = [] |
532 + | tb.open(self._vis) |
533 + | datacol = tb.getcol('ANTENNA1') |
534 + | datacol2 = tb.getcol('ANTENNA2') |
535 + | |
536 + | for i in range(len(datacol)): |
537 + | if datacol[i] == 0 or datacol2[i] == 0: |
538 + | rowswithant.append(i) |
539 + | else: |
540 + | rowswithoutant.append(i) |
541 + | |
542 + | |
543 + | beforedata = tb.getcol('DATA')[0][0] |
544 + | tb.close() |
545 + | |
546 + | |
547 + | cb.open(self._vis) |
548 + | cb.selectvis(baseline=0) |
549 + | cb.setapply(table=self._cal) |
550 + | cb.correct() |
551 + | cb.close() |
552 + | |
553 + | selectedrows = [] |
554 + | unselectedrows = [] |
555 + | |
556 + | |
557 + | tb.open(self._vis) |
558 + | datacol = tb.getcol('CORRECTED_DATA')[0][0] |
559 + | |
560 + | for i in rowswithant: |
561 + | if datacol[i] != beforedata[i]: |
562 + | selectedrows.append(True) |
563 + | |
564 + | for i in rowswithoutant: |
565 + | if datacol[i] == beforedata[i]: |
566 + | unselectedrows.append(True) |
567 + | tb.close() |
568 + | |
569 + | print(len(rowswithant), len(selectedrows)) |
570 + | print(len(rowswithoutant), len(unselectedrows)) |
571 + | |
572 + | |
573 + | self.assertTrue(len(rowswithant) == len(selectedrows)) |
574 + | self.assertTrue(len(rowswithoutant) == len(unselectedrows)) |
575 + | |
576 + | |
577 + | |
578 + | def test_setCalLib(self): |
579 + | """ Check that a provided cal table can be used to corrupt the MODEL_DATA """ |
580 + | |
581 + | cb.open(self._vis) |
582 + | thiscallib = cb.parsecallibfile(self._lib) |
583 + | cb.setcallib(thiscallib) |
584 + | cb.corrupt() |
585 + | cb.close() |
586 + | |
587 + | tb.open(self._vis) |
588 + | ref = np.mean(tb.getcol('MODEL_DATA')) |
589 + | tb.close() |
590 + | |
591 + | self.assertTrue(np.isclose(ref, (0.49776817835921183+0.0001577823988490721j))) |
592 + | |
593 + | def test_setCorrDepFlags(self): |
594 + | """ Check that corrdepflags will be checked """ |
595 + | |
596 + | |
597 + | cb.open(self._vis) |
598 + | result = cb.setcorrdepflags(corrdepflags=True) |
599 + | |
600 + | self.assertTrue(result, msg="Setting of corrdepflags has failed") |
601 + | |
602 + | def test_setModel(self): |
603 + | |
604 + | pass |
605 + | |
606 + | def test_solveBandpass(self): |
607 + | """ Check that solve band poly creates the output table""" |
608 + | |
609 + | |
610 + | cb.setvi(old=True, quiet=False) |
611 + | |
612 + | cb.open(self._bandvis) |
613 + | cb.setsolvebandpoly(table='bpoly', degamp=5, degphase=7) |
614 + | cb.solve() |
615 + | cb.close() |
616 + | |
617 + | self.assertTrue(os.path.exists('bpoly')) |
618 + | |
619 + | def test_solveGainspline(self): |
620 + | """ Check that solve gain spline creates the output table """ |
621 + | |
622 + | cb.setvi(old=True, quiet=False) |
623 + | cb.open(self._bandvis) |
624 + | cb.setsolvegainspline(table='gainspline',mode='AMP',splinetime=10800.0) |
625 + | cb.solve() |
626 + | cb.close() |
627 + | |
628 + | |
629 + | self.assertTrue(os.path.exists('gainspline')) |
630 + | |
631 + | def test_smoothedCalTables(self): |
632 + | """ Check that the smooth command creates a smoothed cal table """ |
633 + | |
634 + | |
635 + | cb.open(self._vis) |
636 + | cb.smooth(tablein=self._cal, tableout='testcalout.cal') |
637 + | cb.close() |
638 + | |
639 + | self.assertTrue(os.path.exists('testcalout.cal')) |
640 + | |
641 + | def test_specifyCal(self): |
642 + | |
643 + | |
644 + | pass |
645 + | |
646 + | def test_validateCalLib(self): |
647 + | |
648 + | |
649 + | pass |
650 + | |
651 + | def test_corruptCal(self): |
652 + | """ Check that the MS is corrupted using the cal table """ |
653 + | |
654 + | cb.open(self._vis) |
655 + | cb.setapply(table=self._cal) |
656 + | cb.corrupt() |
657 + | cb.close() |
658 + | |
659 + | tb.open(self._vis) |
660 + | columns = tb.colnames() |
661 + | tb.close() |
662 + | |
663 + | self.assertTrue('CORRECTED_DATA' in columns) |
664 + | |
665 + | |
666 + | def suite(): |
667 + | return [calibrater_test] |
668 + | |
669 + | if __name__ == '__main__': |
670 + | unittest.main() |