Commits
31 31 | # |
32 32 | # <summary> |
33 33 | # Test suite for the CASA tool msmd |
34 34 | # </summary> |
35 35 | # |
36 36 | # <reviewed reviwer="" date="" tests="" demos=""> |
37 37 | # </reviewed |
38 38 | # |
39 39 | # <prerequisite> |
40 40 | # <ul> |
41 - | # <li> <linkto class="msmd:description">msmd</linkto> |
41 + | # <li> <linkto class="msmd:description">msmd</linkto> |
42 42 | # </ul> |
43 43 | # </prerequisite> |
44 44 | # |
45 45 | # <etymology> |
46 46 | # Test for the msmd tool |
47 47 | # </etymology> |
48 48 | # |
49 49 | # <synopsis> |
50 50 | # Test the msmd tool |
51 - | # </synopsis> |
51 + | # </synopsis> |
52 52 | # |
53 53 | # <example> |
54 54 | # |
55 55 | # This test runs as part of the CASA python unit test suite and can be run from |
56 56 | # the command line via eg |
57 - | # |
57 + | # |
58 58 | # `echo $CASAPATH/bin/casa | sed -e 's$ $/$'` --nologger --log2term -c `echo $CASAPATH | awk '{print $1}'`/code/xmlcasa/scripts/regressions/admin/runUnitTest.py test_msmd[test1,test2,...] |
59 59 | # |
60 60 | # </example> |
61 61 | # |
62 62 | # <motivation> |
63 63 | # To provide a test standard for the ia.subimage tool method to ensure |
64 - | # coding changes do not break the associated bits |
64 + | # coding changes do not break the associated bits |
65 65 | # </motivation> |
66 66 | # |
67 67 | |
68 68 | ########################################################################### |
69 69 | import shutil |
70 70 | import casac |
71 71 | from tasks import * |
72 72 | from taskinit import * |
73 73 | from __main__ import * |
74 74 | import unittest |
75 75 | import numpy |
76 76 | |
77 77 | datadir = os.environ.get('CASAPATH').split()[0]+'/data/' |
78 78 | fixture = datadir + 'regression/unittest/MSMetaData/MSMetaData.ms' |
79 79 | |
80 80 | writeable = datadir + 'regression/unittest/MSMetaData/checker.ms' |
81 81 | |
82 82 | def near(a, b, epsilon): |
83 83 | return abs((a-b)/max(a,b)) <= epsilon |
84 84 | |
85 85 | class msmd_test(unittest.TestCase): |
86 - | |
86 + | |
87 87 | def setUp(self): |
88 88 | self.md = msmdtool() |
89 89 | self.md.open(fixture) |
90 - | |
90 + | |
91 91 | def tearDown(self): |
92 92 | self.md.done() |
93 93 | self.assertTrue(len(tb.showcache()) == 0) |
94 - | |
94 + | |
95 95 | def test_antennanames_and_antennaids(self): |
96 96 | """Test antennanames() and antennaids()""" |
97 97 | names = [ |
98 98 | "DA43", "DA44", "DV02", "DV03", "DV05", |
99 99 | "DV07", "DV08", "DV10", "DV12", "DV13", |
100 100 | "DV14", "DV15", "DV16", "DV17", "DV18" |
101 101 | ] |
102 102 | for i in range(self.md.nantennas()): |
103 103 | got = self.md.antennanames(i) |
104 104 | self.assertTrue(got == [names[i]]) |
105 105 | got = self.md.antennaids(names[i]) |
106 106 | self.assertTrue(got == [i]) |
107 107 | expec = ["DV07", "DV02"] |
108 108 | got = self.md.antennanames([5, 2]) |
109 109 | self.assertTrue(got == expec) |
110 110 | expec = [4, 0, 7] |
111 111 | got = self.md.antennaids([names[4], names[0], names[7]]) |
112 112 | self.assertTrue((got == expec).all()) |
113 - | |
114 - | got = self.md.antennaids() |
113 + | |
114 + | got = self.md.antennaids() |
115 115 | expec = range(self.md.nantennas()) |
116 116 | self.assertTrue((got == expec).all()) |
117 - | |
117 + | |
118 118 | got = self.md.antennaids(["DV12", "DA*", "DV1*"]) |
119 119 | expec = [ 8, 0, 1, 7, 9, 10, 11, 12, 13, 14] |
120 120 | self.assertTrue((got == expec).all()) |
121 - | |
121 + | |
122 122 | got = self.md.antennaids(["DV12", "DA*", "DV1*"], "1m", qa.quantity(15,"m")) |
123 123 | expec = [ 8, 0, 1, 7, 9, 10, 11, 12, 13, 14] |
124 124 | self.assertTrue((got == expec).all()) |
125 - | |
125 + | |
126 126 | got = self.md.antennaids(["DV12", "DA*", "DV1*"], "1m", qa.quantity(2,"m")) |
127 127 | self.assertTrue(len(got) == 0) |
128 - | |
128 + | |
129 129 | got = self.md.antennaids([], mindiameter="25m") |
130 130 | self.assertTrue(len(got) == 0) |
131 - | |
131 + | |
132 132 | def test_chanavgspws(self): |
133 133 | """Test chanavgspws()""" |
134 134 | got = self.md.chanavgspws() |
135 135 | expec = numpy.array([ |
136 136 | 2, 4, 6, 8, 10, 12, 14, |
137 137 | 16, 18, 20, 22, 24 |
138 138 | ]) |
139 139 | self.assertTrue((got == expec).all()) |
140 140 | |
141 141 | def test_exposuretime(self): |
142 142 | """Test exposuretime()""" |
143 143 | # no DDID for spwid=0, polid=0 |
144 144 | self.assertRaises(Exception,self.md.exposuretime, scan=30, spwid=0, polid=0) |
145 145 | got = self.md.exposuretime(scan=30, spwid=0, polid=1) |
146 146 | self.assertTrue(got == qa.quantity("1.152s")) |
147 147 | got = self.md.exposuretime(scan=30, spwid=0) |
148 148 | self.assertTrue(got == qa.quantity("1.152s")) |
149 149 | got = self.md.exposuretime(scan=17, spwid=10, polid=0) |
150 150 | self.assertTrue(got == qa.quantity("1.008s")) |
151 151 | got = self.md.exposuretime(scan=17, spwid=10) |
152 152 | self.assertTrue(got == qa.quantity("1.008s")) |
153 - | |
153 + | |
154 154 | def test_fdmspws(self): |
155 155 | """Test fdmspws()""" |
156 156 | got = self.md.fdmspws() |
157 157 | expec = numpy.array([17, 19, 21, 23]) |
158 158 | self.assertTrue((got == expec).all()) |
159 159 | |
160 160 | def test_fieldsforintent(self): |
161 161 | """Test fieldsforintent()""" |
162 162 | for intent in self.md.intents(): |
163 163 | if intent=="CALIBRATE_AMPLI#ON_SOURCE": |
194 194 | def test_fieldsforname(self): |
195 195 | """Test fieldforname()""" |
196 196 | got = self.md.fieldsforname() |
197 197 | expec = numpy.array([0, 1, 2, 3, 4, 5]) |
198 198 | self.assertTrue((got == expec).all()) |
199 199 | names = ["3C279", "J1337-129", "Titan", "J1625-254", "V866 Sco", "RNO 90"] |
200 200 | for i in range(self.md.nfields()): |
201 201 | expec = numpy.array([i]) |
202 202 | got = self.md.fieldsforname(names[i]) |
203 203 | self.assertTrue((got==expec).all()) |
204 - | |
204 + | |
205 205 | def test_fieldsforscan(self): |
206 206 | """Test fieldsforscan() and fieldsforscans()""" |
207 207 | expec2 = numpy.array([], dtype="int") |
208 208 | scans = numpy.array([], dtype="int") |
209 209 | names = numpy.array([ |
210 210 | "3C279", "J1337-129", "Titan", |
211 211 | "J1625-254", "V866 Sco", "RNO 90" |
212 212 | ]) |
213 213 | self.assertTrue((self.md.fieldsforscans() == [0, 1, 2, 3, 4, 5]).all()) |
214 214 | for scan in self.md.scannumbers(): |
243 243 | expec = 0 |
244 244 | elif ik == 5: |
245 245 | expec = 1 |
246 246 | elif ik in [6, 7]: |
247 247 | expec = 2 |
248 248 | elif ik in [ 8, 9, 10, 13, 14, 17, 18, 21, 24, 25, 28, 31, 32]: |
249 249 | expec = 3 |
250 250 | elif ik in [11, 12, 19, 20, 26, 27]: |
251 251 | expec = 4 |
252 252 | elif ik in [15, 16, 22, 23, 29, 30]: |
253 - | expec = 5 |
253 + | expec = 5 |
254 254 | self.assertTrue(v[0] == expec) |
255 255 | |
256 256 | def test_fieldsforspw(self): |
257 257 | """Test fieldsforspw()""" |
258 258 | for i in range(self.md.nspw()): |
259 259 | if (i==0): |
260 260 | expids = [0, 1, 2, 3, 4, 5] |
261 261 | expnames = [ |
262 262 | "3C279", "J1337-129", "Titan", "J1625-254", |
263 263 | "V866 Sco", "RNO 90" |
270 270 | expnames = [ |
271 271 | "3C279", "Titan", "J1625-254", |
272 272 | "V866 Sco", "RNO 90" |
273 273 | ] |
274 274 | else: |
275 275 | expids = [] |
276 276 | expnames = [] |
277 277 | if i < 25: |
278 278 | got = self.md.fieldsforspw(i, False) |
279 279 | self.assertTrue((got == expids).all()) |
280 - | got = self.md.fieldsforspw(i, true) |
280 + | got = self.md.fieldsforspw(i, True) |
281 281 | self.assertTrue((got == expnames).all()) |
282 282 | else: |
283 283 | got = self.md.fieldsforspw(i, False) |
284 284 | self.assertTrue(len(got) == 0) |
285 - | got = self.md.fieldsforspw(i, true) |
285 + | got = self.md.fieldsforspw(i, True) |
286 286 | self.assertTrue(len(got) == 0) |
287 287 | |
288 288 | def test_fieldsfortimes(self): |
289 289 | """Test fieldsfortimes()""" |
290 290 | got = self.md.fieldsfortimes(4842824746, 10) |
291 291 | expec = numpy.array([0]) |
292 292 | self.assertTrue((got == expec).all()) |
293 293 | got = self.md.fieldsfortimes(4842824746, 10000) |
294 294 | expec = numpy.array([0, 1, 2, 3, 4, 5]) |
295 295 | self.assertTrue((got == expec).all()) |
296 - | |
296 + | |
297 297 | def test_intents(self): |
298 298 | """Test intents()""" |
299 299 | got = self.md.intents() |
300 300 | expec = numpy.array( |
301 301 | [ |
302 302 | "CALIBRATE_AMPLI#ON_SOURCE", |
303 303 | "CALIBRATE_ATMOSPHERE#OFF_SOURCE", |
304 304 | "CALIBRATE_ATMOSPHERE#ON_SOURCE", |
305 305 | "CALIBRATE_BANDPASS#ON_SOURCE", |
306 306 | "CALIBRATE_PHASE#ON_SOURCE", |
307 307 | "CALIBRATE_POINTING#ON_SOURCE", |
308 308 | "CALIBRATE_SIDEBAND_RATIO#OFF_SOURCE", |
309 309 | "CALIBRATE_SIDEBAND_RATIO#ON_SOURCE", |
310 310 | "CALIBRATE_WVR#OFF_SOURCE", |
311 311 | "CALIBRATE_WVR#ON_SOURCE", |
312 312 | "OBSERVE_TARGET#ON_SOURCE" |
313 313 | ] |
314 314 | ) |
315 315 | self.assertTrue((got == expec).all()) |
316 - | |
316 + | |
317 317 | def test_intentsforfield(self): |
318 318 | """Test intentsforfield()""" |
319 319 | for field in range(self.md.nfields()): |
320 320 | for i in [0, 1]: |
321 321 | if i == 0: |
322 322 | f = field |
323 323 | else: |
324 324 | f = self.md.namesforfields(field)[0] |
325 325 | got = self.md.intentsforfield(f) |
326 326 | if field == 0: |
353 353 | "CALIBRATE_WVR#OFF_SOURCE", "CALIBRATE_WVR#ON_SOURCE", |
354 354 | "OBSERVE_TARGET#ON_SOURCE" |
355 355 | ]) |
356 356 | if field == 5: |
357 357 | expec = numpy.array([ |
358 358 | "CALIBRATE_ATMOSPHERE#OFF_SOURCE", "CALIBRATE_ATMOSPHERE#ON_SOURCE", |
359 359 | "CALIBRATE_WVR#OFF_SOURCE", "CALIBRATE_WVR#ON_SOURCE", |
360 360 | "OBSERVE_TARGET#ON_SOURCE" |
361 361 | ]) |
362 362 | self.assertTrue((got == expec).all()) |
363 - | |
363 + | |
364 364 | def test_intentsforscan(self): |
365 365 | """Test intentsforscan()""" |
366 366 | for scan in self.md.scannumbers(): |
367 367 | got = self.md.intentsforscan(scan); |
368 368 | if [1, 5, 8].count(scan) > 0: |
369 369 | expec = numpy.array( |
370 370 | ["CALIBRATE_POINTING#ON_SOURCE", "CALIBRATE_WVR#ON_SOURCE"] |
371 371 | ) |
372 372 | elif scan==2: |
373 373 | expec = numpy.array( |
374 374 | [ |
375 375 | "CALIBRATE_SIDEBAND_RATIO#OFF_SOURCE", |
376 376 | "CALIBRATE_SIDEBAND_RATIO#ON_SOURCE", |
377 377 | "CALIBRATE_WVR#OFF_SOURCE", |
378 378 | "CALIBRATE_WVR#ON_SOURCE" |
379 379 | ] |
380 - | ) |
380 + | ) |
381 381 | elif [3, 6, 9, 11, 13, 15, 17, 19, 22, 24, 26, 29, 31].count(scan) > 0: |
382 382 | expec = numpy.array( |
383 383 | [ |
384 384 | "CALIBRATE_ATMOSPHERE#OFF_SOURCE", |
385 385 | "CALIBRATE_ATMOSPHERE#ON_SOURCE", |
386 386 | "CALIBRATE_WVR#OFF_SOURCE", |
387 387 | "CALIBRATE_WVR#ON_SOURCE" |
388 388 | ] |
389 389 | ) |
390 390 | elif scan == 4: |
406 406 | elif [10, 14, 18, 21, 25, 28, 32].count(scan) > 0: |
407 407 | expec = numpy.array( |
408 408 | [ |
409 409 | "CALIBRATE_PHASE#ON_SOURCE", |
410 410 | "CALIBRATE_WVR#ON_SOURCE" |
411 411 | ] |
412 412 | ) |
413 413 | elif [12, 16, 20, 23, 27, 30].count(scan) > 0: |
414 414 | expec = numpy.array(["OBSERVE_TARGET#ON_SOURCE"]) |
415 415 | self.assertTrue((got == expec).all()) |
416 - | |
416 + | |
417 417 | def test_intentsforspw(self): |
418 418 | """Test intentsforspw()""" |
419 419 | for spw in range(self.md.nspw()): |
420 420 | got = self.md.intentsforspw(spw) |
421 421 | if spw == 0: |
422 422 | expec = numpy.array( |
423 423 | [ |
424 424 | "CALIBRATE_AMPLI#ON_SOURCE", |
425 425 | "CALIBRATE_ATMOSPHERE#OFF_SOURCE", |
426 426 | "CALIBRATE_ATMOSPHERE#ON_SOURCE", |
476 476 | got = self.md.namesforfields(i) |
477 477 | self.assertTrue(got == [names[i]]) |
478 478 | self.assertTrue(self.md.namesforfields() == names) |
479 479 | got = self.md.namesforfields([4, 0, 2]) |
480 480 | print "*** got " + str(got) |
481 481 | self.assertTrue(got == ["V866 Sco", "3C279", "Titan"]) |
482 482 | |
483 483 | def test_nantennas(self): |
484 484 | """ Test nantennas()""" |
485 485 | self.assertTrue(self.md.nantennas() == 15) |
486 - | |
486 + | |
487 487 | def test_narrays(self): |
488 488 | """ Test narrays()""" |
489 489 | self.assertTrue(self.md.narrays() == 1) |
490 490 | |
491 491 | def test_nfields(self): |
492 492 | """ Test nfields()""" |
493 493 | self.assertTrue(self.md.nfields() == 6) |
494 - | |
494 + | |
495 495 | def test_nobservations(self): |
496 496 | """ Test nobservations()""" |
497 497 | self.assertTrue(self.md.nobservations() == 1) |
498 - | |
498 + | |
499 499 | def test_nscans(self): |
500 500 | """ Test nscans()""" |
501 501 | self.assertTrue(self.md.nscans() == 32) |
502 502 | |
503 503 | def test_nspw(self): |
504 504 | """ Test nspw()""" |
505 505 | self.assertTrue(self.md.nspw() == 40) |
506 - | |
506 + | |
507 507 | def test_nstates(self): |
508 508 | """ Test nstates()""" |
509 509 | self.assertTrue(self.md.nstates() == 43) |
510 - | |
510 + | |
511 511 | def test_nvis(self): |
512 512 | """ Test nvis()""" |
513 513 | self.assertTrue(self.md.nrows() == 15363) |
514 - | |
514 + | |
515 515 | def test_scannumbers(self): |
516 516 | """ Test scannumbers()""" |
517 517 | expec = numpy.array(range(1, 33)) |
518 518 | self.assertTrue((self.md.scannumbers()==expec).all()) |
519 - | |
519 + | |
520 520 | def test_scansforfield(self): |
521 521 | """Test scansforfield() and scansforfields()""" |
522 522 | names = ["3C279", "J1337-129", "Titan", "J1625-254", "V866 Sco", "RNO 90"] |
523 523 | mymap = self.md.scansforfields(0, 0) |
524 524 | for i in range(self.md.nfields()): |
525 525 | if i == 0: |
526 526 | expec = numpy.array([1, 2, 3, 4]) |
527 527 | elif i == 1: |
528 528 | expec = numpy.array([5]) |
529 529 | elif i == 2: |
535 535 | ]) |
536 536 | elif i == 4: |
537 537 | expec = numpy.array([11, 12, 19, 20, 26, 27]) |
538 538 | elif i == 5: |
539 539 | expec = numpy.array([15, 16, 22, 23, 29, 30]) |
540 540 | got = self.md.scansforfield(i) |
541 541 | self.assertTrue((got==expec).all()) |
542 542 | got = self.md.scansforfield(names[i]) |
543 543 | self.assertTrue((got == expec).all()) |
544 544 | self.assertTrue((mymap[str(i)] == expec).all()) |
545 - | |
545 + | |
546 546 | def test_scansforintent(self): |
547 547 | """Test scansforintent()""" |
548 548 | for i in self.md.intents(): |
549 549 | if i=="CALIBRATE_AMPLI#ON_SOURCE": |
550 550 | expec = numpy.array([7]) |
551 551 | elif ["CALIBRATE_ATMOSPHERE#OFF_SOURCE", "CALIBRATE_ATMOSPHERE#ON_SOURCE"].count(i) > 0: |
552 552 | expec = numpy.array([ |
553 553 | 3, 6, 9, 11, 13, 15, 17, |
554 554 | 19, 22, 24, 26, 29, 31 |
555 555 | ]) |
579 579 | self.assertTrue( |
580 580 | ( |
581 581 | self.md.scansforintent('*WVR*') |
582 582 | == numpy.array([ |
583 583 | 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, |
584 584 | 11, 13, 14, 15, 17, 18, 19, 21, |
585 585 | 22, 24, 25, 26, 28, 29, 31, 32 |
586 586 | ]) |
587 587 | ).all() |
588 588 | ) |
589 - | |
589 + | |
590 590 | def test_scansforspw(self): |
591 591 | """Test scansforspw() and scansforspws()""" |
592 592 | gotmap = self.md.scansforspws(0, 0) |
593 593 | for i in range(self.md.nspw()): |
594 594 | if (i==0): |
595 595 | expec = numpy.array([ |
596 596 | 1, 2, 3, 4, 5, 6, 7, 8, 9, |
597 597 | 10, 11, 12, 13, 14, 15, 16, 17, 18, |
598 598 | 19, 20, 21, 22, 23, 24, 25, 26, 27, |
599 599 | 28, 29, 30, 31, 32 |
713 713 | ] |
714 714 | ) |
715 715 | elif (i == 1): |
716 716 | expec = numpy.array([0, 1, 2, 3, 4, 5, 6, 7, 8]) |
717 717 | elif (i==2 or i==4 or i==5): |
718 718 | expec = numpy.array( |
719 719 | [ |
720 720 | 0, 9, 10, 11, 12, 13, 14, 15, 16, |
721 721 | 17, 18, 19, 20, 21, 22, 23, 24 |
722 722 | ] |
723 - | ) |
723 + | ) |
724 724 | got = self.md.spwsforfield(i) |
725 725 | self.assertTrue((got == expec).all()) |
726 726 | got = self.md.spwsforfield(names[i]) |
727 727 | self.assertTrue((got == expec).all()) |
728 728 | got = field_spw_map[str(i)] |
729 729 | self.assertTrue((got == expec).all()) |
730 730 | |
731 731 | def test_spwsforintent(self): |
732 732 | """Test spwsforintent()""" |
733 733 | for intent in self.md.intents(): |
734 734 | got = self.md.spwsforintent(intent) |
735 735 | if [ |
736 736 | "CALIBRATE_AMPLI#ON_SOURCE", |
737 737 | "CALIBRATE_BANDPASS#ON_SOURCE", |
738 738 | "CALIBRATE_PHASE#ON_SOURCE", |
739 739 | "OBSERVE_TARGET#ON_SOURCE" |
740 740 | ].count(intent) > 0: |
741 - | |
741 + | |
742 742 | expec = numpy.array([0, 17, 18, 19, 20, 21, 22, 23, 24]) |
743 743 | elif [ |
744 744 | "CALIBRATE_ATMOSPHERE#OFF_SOURCE", |
745 745 | "CALIBRATE_ATMOSPHERE#ON_SOURCE", |
746 746 | "CALIBRATE_SIDEBAND_RATIO#OFF_SOURCE", |
747 747 | "CALIBRATE_SIDEBAND_RATIO#ON_SOURCE", |
748 748 | "CALIBRATE_WVR#OFF_SOURCE" |
749 749 | ].count(intent) > 0: |
750 750 | expec = numpy.array([0, 9, 10, 11, 12, 13, 14, 15, 16]) |
751 751 | elif intent == "CALIBRATE_POINTING#ON_SOURCE": |
752 752 | expec = numpy.array([0, 1, 2, 3, 4, 5, 6, 7, 8]) |
753 753 | elif intent == "CALIBRATE_WVR#ON_SOURCE": |
754 754 | expec = numpy.array( |
755 755 | [ |
756 756 | 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, |
757 757 | 10, 11, 12, 13, 14, 15, 16, |
758 758 | 17, 18, 19, 20, 21, 22, 23, 24 |
759 759 | ] |
760 760 | ) |
761 761 | self.assertTrue((got == expec).all()) |
762 - | |
762 + | |
763 763 | got = self.md.spwsforintent('*WVR*') |
764 764 | expec = numpy.array( |
765 765 | [ |
766 766 | 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, |
767 767 | 10, 11, 12, 13, 14, 15, 16, |
768 768 | 17, 18, 19, 20, 21, 22, 23, 24 |
769 769 | ] |
770 770 | ) |
771 771 | self.assertTrue((got == expec).all()) |
772 772 | |
776 776 | for i in self.md.scannumbers(): |
777 777 | if [1, 5, 8].count(i) > 0: |
778 778 | expec = numpy.array([0, 1, 2, 3, 4, 5, 6, 7, 8]) |
779 779 | elif [2, 3, 6, 9, 11, 13, 15, 17, 19, 22, 24, 26, 29, 31].count(i) > 0: |
780 780 | expec = numpy.array([0, 9, 10, 11, 12, 13, 14, 15, 16]) |
781 781 | elif [4, 7, 10, 12, 14, 16, 18, 20, 21, 23, 25, 27, 28, 30, 32].count(i) > 0: |
782 782 | expec = numpy.array([0, 17, 18, 19, 20, 21, 22, 23, 24]) |
783 783 | got = self.md.spwsforscan(i) |
784 784 | self.assertTrue((got == expec).all()) |
785 785 | self.assertTrue((scan_to_spws[str(i)] == expec).all()) |
786 - | |
786 + | |
787 787 | def test_statesforscan(self): |
788 788 | """Test statesforscan() and statesforscans()""" |
789 789 | mymap = self.md.statesforscans() |
790 790 | for i in self.md.scannumbers(): |
791 791 | if [1, 5, 8].count(i) > 0: |
792 792 | expec = numpy.array([0, 1, 2, 3, 4]) |
793 793 | elif i == 2: |
794 794 | expec = numpy.array([5, 6]) |
795 795 | elif [3, 6, 9, 11, 13, 15, 17, 19, 22, 24, 26, 29, 31].count(i) > 0: |
796 796 | expec = numpy.array([7, 8, 9]) |
809 809 | expec = numpy.array([33, 34, 35, 36]) |
810 810 | got = self.md.statesforscan(i) |
811 811 | self.assertTrue((got == expec).all()) |
812 812 | self.assertTrue((mymap[str(i)] == expec).all()) |
813 813 | |
814 814 | def test_telescopenames(self): |
815 815 | """ Test observatorynames()""" |
816 816 | got = self.md.observatorynames() |
817 817 | expec = numpy.array(["ALMA"]) |
818 818 | self.assertTrue((got == expec).all()) |
819 - | |
819 + | |
820 820 | def test_tdmspws(self): |
821 821 | """Test tdmspws()""" |
822 822 | got = self.md.tdmspws() |
823 823 | expec = numpy.array([1, 3, 5, 7, 9, 11, 13, 15]) |
824 824 | self.assertTrue((got == expec).all()) |
825 825 | |
826 826 | def test_timesforfield(self): |
827 827 | """Test timesforfield()""" |
828 828 | for i in range(self.md.nfields()): |
829 829 | if i == 0: |
984 984 | 4842825869.1599998474, 4842825870.1680002213 |
985 985 | ] |
986 986 | self.assertTrue(len(got.keys()) == 9) |
987 987 | for i in range(9): |
988 988 | k = str(i) |
989 989 | e = numpy.array(expec[i]) |
990 990 | g = got[k] |
991 991 | self.assertTrue(len(g) == len(e)) |
992 992 | d = g - e |
993 993 | self.assertTrue(numpy.max(numpy.abs(d)) < 0.1) |
994 - | |
994 + | |
995 995 | def test_timesforscans(self): |
996 996 | """Test timesforscans()""" |
997 997 | expec = numpy.array([ |
998 998 | 4842825928.7, 4842825929.5, |
999 999 | 4842825930.0, |
1000 1000 | 4842825930.6, 4842825941.4, |
1001 1001 | 4842825942.2, 4842825942.5, |
1002 1002 | 4842825942.7, 4842825943.2, |
1003 1003 | 4842825954.0, 4842825954.9, |
1004 1004 | 4842825955.2, 4842825955.4, |
1007 1007 | 4842825004.8, 4842825005.0, |
1008 1008 | 4842825016.3, 4842825016.6, |
1009 1009 | 4842825017.1, 4842825017.5, |
1010 1010 | 4842825017.6, 4842825029.0, |
1011 1011 | 4842825029.3, 4842825029.8, |
1012 1012 | 4842825030.1, 4842825030.3 |
1013 1013 | ]) |
1014 1014 | expec.sort() |
1015 1015 | got = self.md.timesforscans([3, 6]) |
1016 1016 | self.assertTrue((abs(got - expec)).all() < 0.1) |
1017 - | |
1017 + | |
1018 1018 | def test_wvrspws(self): |
1019 1019 | """Test wvrspws()""" |
1020 1020 | got = self.md.wvrspws() |
1021 1021 | expec = numpy.array([ |
1022 1022 | 0, 25, 26, 27, 28, 29, 30, 31, |
1023 1023 | 32, 33, 34, 35, 36, 37, 38, 39 |
1024 1024 | ]) |
1025 1025 | self.assertTrue((got == expec).all()) |
1026 1026 | got = self.md.wvrspws(complement=False) |
1027 1027 | self.assertTrue((got == expec).all()) |
1047 1047 | 2, 4, 6, 8, 10, 12, 14, |
1048 1048 | 16, 18, 20, 22, 24 |
1049 1049 | ]) |
1050 1050 | self.assertTrue((got == expec).all()) |
1051 1051 | got = self.md.almaspws(chavg=True, complement=True) |
1052 1052 | jj = range(40) |
1053 1053 | for i in expec: |
1054 1054 | jj.remove(i) |
1055 1055 | expec = jj |
1056 1056 | self.assertTrue((got == expec).all()) |
1057 - | |
1057 + | |
1058 1058 | got = self.md.almaspws(fdm=True) |
1059 1059 | expec = [17, 19, 21, 23] |
1060 1060 | self.assertTrue((got == expec).all()) |
1061 1061 | got = self.md.almaspws(fdm=True, complement=True) |
1062 1062 | jj = range(40) |
1063 1063 | for i in expec: |
1064 1064 | jj.remove(i) |
1065 1065 | expec = jj |
1066 1066 | self.assertTrue((got == expec).all()) |
1067 - | |
1067 + | |
1068 1068 | got = self.md.almaspws(tdm=True) |
1069 1069 | expec = [1, 3, 5, 7, 9, 11, 13, 15] |
1070 1070 | self.assertTrue((got == expec).all()) |
1071 1071 | got = self.md.almaspws(tdm=True, complement=True) |
1072 1072 | jj = range(40) |
1073 1073 | for i in expec: |
1074 1074 | jj.remove(i) |
1075 1075 | expec = jj |
1076 1076 | self.assertTrue((got == expec).all()) |
1077 - | |
1077 + | |
1078 1078 | got = self.md.almaspws(wvr=True) |
1079 1079 | expec = numpy.array([ |
1080 1080 | 0, 25, 26, 27, 28, 29, 30, 31, |
1081 1081 | 32, 33, 34, 35, 36, 37, 38, 39 |
1082 1082 | ]) |
1083 1083 | self.assertTrue((got == expec).all()) |
1084 1084 | got = self.md.almaspws(wvr=True, complement=True) |
1085 1085 | expec = range(1, 25) |
1086 1086 | self.assertTrue((got == expec).all()) |
1087 1087 | |
1108 1108 | self.assertTrue((got == expec).all()) |
1109 1109 | got = self.md.bandwidths(-1) |
1110 1110 | self.assertTrue((got == expec).all()) |
1111 1111 | for i in range(len(expec)): |
1112 1112 | self.assertTrue(self.md.bandwidths(i) == expec[i]) |
1113 1113 | self.assertTrue(self.md.bandwidths([i]) == [expec[i]]) |
1114 1114 | self.assertTrue((self.md.bandwidths([4, 10, 5]) == [expec[4], expec[10], expec[5]]).all) |
1115 1115 | self.assertRaises(Exception, self.md.bandwidths, 50) |
1116 1116 | self.assertRaises(Exception, self.md.bandwidths, [4, 50]) |
1117 1117 | self.assertRaises(Exception, self.md.bandwidths, [4, -1]) |
1118 - | |
1118 + | |
1119 1119 | def test_chanwidths(self): |
1120 1120 | """Test chanwidths()""" |
1121 1121 | got = self.md.chanwidths(0) |
1122 1122 | expec = numpy.array([1.5e9, 2.5e9, 2e9, 1.5e9]) |
1123 1123 | self.assertTrue((got == expec).all()) |
1124 1124 | got = self.md.chanwidths(0, "MHz") |
1125 1125 | self.assertTrue((got == expec/1e6).all()) |
1126 1126 | self.assertRaises(Exception, self.md.chanwidths, 50); |
1127 1127 | self.assertRaises(Exception, self.md.chanwidths, -2); |
1128 - | |
1128 + | |
1129 1129 | self.md.close() |
1130 1130 | self.assertRaises(Exception, self.md.chanwidths, 1) |
1131 1131 | self.assertRaises(Exception, self.md.chanfreqs, 1) |
1132 1132 | self.assertRaises(Exception, self.md.meanfreq, 1) |
1133 1133 | self.assertRaises(Exception, self.md.sideband, 1) |
1134 1134 | self.assertRaises(Exception, self.md.effexposuretime) |
1135 1135 | |
1136 1136 | def test_datadescids(self): |
1137 1137 | """Test datadescids()""" |
1138 1138 | md = self.md; |
1142 1142 | self.assertTrue((got == range(25)).all()) |
1143 1143 | for i in range(25): |
1144 1144 | got = md.datadescids(i, -1) |
1145 1145 | self.assertTrue(got == [i]) |
1146 1146 | got = md.datadescids(pol=1) |
1147 1147 | self.assertTrue(got == [0]) |
1148 1148 | got = md.datadescids(pol=0) |
1149 1149 | self.assertTrue((got == range(1, 25)).all()) |
1150 1150 | got = md.datadescids(spw=10, pol=1) |
1151 1151 | self.assertTrue(len(got) == 0) |
1152 - | |
1152 + | |
1153 1153 | def test_antennastations(self): |
1154 1154 | """Test antennastations()""" |
1155 1155 | md = self.md |
1156 1156 | got = md.antennastations() |
1157 1157 | expec = numpy.array([ |
1158 1158 | 'A075', 'A068', 'A077', 'A137', 'A082', 'A076', 'A021', 'A071', |
1159 1159 | 'A011', 'A072', 'A025', 'A074', 'A069', 'A138', 'A053' |
1160 1160 | ]) |
1161 1161 | self.assertTrue((got == expec).all()) |
1162 1162 | got = md.antennastations(-1) |
1172 1172 | expec = numpy.array(['A082', 'A077']) |
1173 1173 | self.assertTrue((got == expec).all()) |
1174 1174 | self.assertRaises(Exception, md.antennastations, [1, 20]) |
1175 1175 | self.assertRaises(Exception, md.antennastations, 20) |
1176 1176 | got = md.antennastations('DV13') |
1177 1177 | expec = numpy.array(["A072"]) |
1178 1178 | self.assertTrue((got == expec).all()) |
1179 1179 | expec = numpy.array(["A072", "A075"]) |
1180 1180 | got = md.antennastations(['DV13', 'DA43']) |
1181 1181 | self.assertTrue((got == expec).all()) |
1182 - | |
1182 + | |
1183 1183 | def test_namesforspws(self): |
1184 1184 | """Test namesforspws()""" |
1185 1185 | md = self.md |
1186 1186 | got = md.namesforspws() |
1187 1187 | i = 0 |
1188 1188 | for name in got: |
1189 1189 | if i == 3: |
1190 1190 | expec = "BB_1#SQLD" |
1191 1191 | else: |
1192 1192 | expec = "" |
1193 1193 | self.assertTrue(name == expec) |
1194 1194 | i += 1 |
1195 1195 | got = md.namesforspws([4, 3]) |
1196 1196 | self.assertTrue((got == numpy.array(["", "BB_1#SQLD"])).all()) |
1197 1197 | got = md.namesforspws(3) |
1198 1198 | self.assertTrue((got == numpy.array(["BB_1#SQLD"])).all()) |
1199 - | |
1199 + | |
1200 1200 | self.assertRaises(Exception, md.namesforspws, -2) |
1201 1201 | self.assertRaises(Exception, md.namesforspws, [0,-2]) |
1202 1202 | self.assertRaises(Exception, md.namesforspws, 85) |
1203 1203 | self.assertRaises(Exception, md.namesforspws, [0,85]) |
1204 1204 | |
1205 1205 | def test_spwsfornames(self): |
1206 1206 | """Test spwsfornames()""" |
1207 1207 | md = self.md |
1208 1208 | got = md.spwsfornames() |
1209 1209 | for k,v in got.iteritems(): |
1210 1210 | if (k == ""): |
1211 1211 | self.assertEqual(len(v), 39) |
1212 1212 | elif k == 'BB_1#SQLD': |
1213 1213 | self.assertEqual(len(v), 1) |
1214 1214 | self.assertEqual(v[0], 3) |
1215 - | |
1215 + | |
1216 1216 | got = md.spwsfornames("BB_1#SQLD") |
1217 1217 | self.assertEqual(len(got), 1) |
1218 1218 | v = got["BB_1#SQLD"] |
1219 1219 | self.assertEqual(len(v), 1) |
1220 1220 | self.assertEqual(v[0], 3) |
1221 1221 | got = md.spwsfornames("blah") |
1222 1222 | self.assertEqual(len(got), 0) |
1223 1223 | |
1224 1224 | def test_fieldsforsource(self): |
1225 1225 | """Test fieldsforsource() and fieldsforsources()""" |
1255 1255 | self.assertTrue(ret['antenna1']['id'] == 7) |
1256 1256 | self.assertTrue(ret['antenna2']['id'] == 11) |
1257 1257 | eps = 1e-10 |
1258 1258 | self.assertTrue(near(ret['time'], 4842824902.632, eps)) |
1259 1259 | p1 = ret['antenna1']['pointingdirection'] |
1260 1260 | p2 = ret['antenna2']['pointingdirection'] |
1261 1261 | self.assertTrue(near(p1['m0']['value'], -1.231522504164003, eps)) |
1262 1262 | self.assertTrue(near(p1['m1']['value'], 0.8713643131745025, eps)) |
1263 1263 | self.assertTrue(near(p2['m0']['value'], -1.2315042783587336, eps)) |
1264 1264 | self.assertTrue(near(p2['m1']['value'], 0.8713175514123461, eps)) |
1265 - | |
1265 + | |
1266 1266 | def test_name(self): |
1267 1267 | """Test name(), CAS-6817""" |
1268 1268 | md = self.md |
1269 1269 | name = md.name() |
1270 - | self.assertTrue(name == os.path.abspath(fixture)) |
1271 - | |
1270 + | self.assertTrue(name == os.path.abspath(fixture)) |
1271 + | |
1272 1272 | def test_timesforintent(self): |
1273 1273 | """Test timesforintent(), CAS-6919""" |
1274 1274 | md = self.md |
1275 1275 | intents = md.intents() |
1276 1276 | for intent in intents: |
1277 1277 | times = md.timesforintent(intent) |
1278 1278 | ntimes = len(times) |
1279 1279 | expec = 0 |
1280 1280 | if intent == "CALIBRATE_AMPLI#ON_SOURCE": |
1281 1281 | expec = 234 |
1294 1294 | or intent == "CALIBRATE_SIDEBAND_RATIO#ON_SOURCE" |
1295 1295 | ): |
1296 1296 | expec = 49 |
1297 1297 | elif intent == "CALIBRATE_WVR#OFF_SOURCE": |
1298 1298 | expec = 95 |
1299 1299 | elif intent == "CALIBRATE_WVR#ON_SOURCE": |
1300 1300 | expec = 1514 |
1301 1301 | elif intent == "OBSERVE_TARGET#ON_SOURCE": |
1302 1302 | expec = 1868 |
1303 1303 | self.assertTrue(ntimes == expec) |
1304 - | |
1304 + | |
1305 1305 | def test_CAS7463(self): |
1306 1306 | md = self.md |
1307 1307 | self.assertTrue((md.chanwidths(0) == [1.5e9, 2.5e9, 2e9, 1.5e9]).all()) |
1308 1308 | self.assertTrue((md.chanwidths(0, "GHz") == [1.5, 2.5, 2, 1.5]).all()) |
1309 1309 | self.assertRaises(Exception, md.chanwidths, 0, "km/s") |
1310 1310 | |
1311 1311 | def test_sideband(self): |
1312 1312 | md = self.md |
1313 1313 | expec = [ |
1314 1314 | -1, -1, -1, -1, -1, 1, 1, 1, 1, 1, 1, 1, 1, |
1315 1315 | -1, -1, -1, -1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, |
1316 1316 | -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 |
1317 1317 | ] |
1318 1318 | expec = numpy.array(expec) |
1319 1319 | got = [] |
1320 1320 | for i in range(md.nspw()): |
1321 1321 | got.append(md.sideband(i)) |
1322 1322 | get = numpy.array(got) |
1323 - | self.assertTrue((got == expec).all()) |
1324 - | |
1323 + | self.assertTrue((got == expec).all()) |
1324 + | |
1325 1325 | def test_fieldnames(self): |
1326 1326 | md = self.md |
1327 1327 | got = md.fieldnames() |
1328 1328 | expec = ['3C279', 'J1337-129', 'Titan', 'J1625-254', 'V866 Sco', 'RNO 90'] |
1329 1329 | self.assertTrue(got == expec) |
1330 - | |
1330 + | |
1331 1331 | def test_projects(self): |
1332 1332 | """Test msmd.projects()""" |
1333 1333 | md = self.md |
1334 1334 | projects = md.projects() |
1335 1335 | self.assertTrue(len(projects) == 1) |
1336 1336 | self.assertTrue(projects[0] == "T.B.D.") |
1337 - | |
1337 + | |
1338 1338 | def test_observers(self): |
1339 1339 | """Test msmd.observers()""" |
1340 1340 | md = self.md |
1341 1341 | observers = md.observers() |
1342 1342 | self.assertTrue(len(observers) == 1) |
1343 1343 | self.assertTrue(observers[0] == "csalyk") |
1344 - | |
1344 + | |
1345 1345 | def test_schedule(self): |
1346 1346 | """Test msmd.schedule()""" |
1347 1347 | md = self.md |
1348 1348 | schedule = md.schedule(0) |
1349 1349 | self.assertTrue(len(schedule) == 2) |
1350 1350 | self.assertTrue(schedule[0] == "SchedulingBlock uid://A002/X391d0b/X5e") |
1351 1351 | self.assertTrue(schedule[1] == "ExecBlock uid://A002/X3f6a86/X5da") |
1352 - | |
1352 + | |
1353 1353 | def test_timerforobs(self): |
1354 1354 | """Test msmd.timerforobs()""" |
1355 1355 | md = self.md |
1356 1356 | timer = md.timerangeforobs(0) |
1357 1357 | self.assertTrue( |
1358 1358 | near( |
1359 1359 | qa.convert(me.getvalue(timer['begin'])['m0'],"s")['value'], |
1360 1360 | 4842824633.472, 1e-10 |
1361 1361 | ) |
1362 1362 | ) |
1363 1363 | self.assertTrue( |
1364 1364 | near( |
1365 1365 | qa.convert(me.getvalue(timer['end'])['m0'],"s")['value'], |
1366 1366 | 4842830031.632, 1e-10 |
1367 1367 | ) |
1368 1368 | ) |
1369 - | |
1369 + | |
1370 1370 | def test_reffreq(self): |
1371 1371 | """Test msmd.reffreq""" |
1372 1372 | md = self.md |
1373 1373 | nspw = md.nspw() |
1374 1374 | expec = [ |
1375 1375 | 1.83300000e+11, 2.15250000e+11, 2.15250000e+11, |
1376 1376 | 2.17250000e+11, 2.17250000e+11, 2.29250000e+11, |
1377 1377 | 2.29250000e+11, 2.31250000e+11, 2.31250000e+11, |
1378 1378 | 2.30471730e+11, 2.30471730e+11, 2.32352270e+11, |
1379 1379 | 2.32352270e+11, 2.20465062e+11, 2.20465062e+11, |
1385 1385 | 1.83360000e+11, 1.83370000e+11, 1.83380000e+11, |
1386 1386 | 1.83390000e+11, 1.83400000e+11, 1.83410000e+11, |
1387 1387 | 1.83420000e+11, 1.83430000e+11, 1.83440000e+11, |
1388 1388 | 1.83450000e+11 |
1389 1389 | ] |
1390 1390 | for i in range(nspw): |
1391 1391 | freq = md.reffreq(i) |
1392 1392 | self.assertTrue(me.getref(freq) == 'TOPO') |
1393 1393 | v = me.getvalue(freq)['m0'] |
1394 1394 | self.assertTrue(qa.getunit(v) == "Hz") |
1395 - | got = qa.getvalue(v) |
1395 + | got = qa.getvalue(v) |
1396 1396 | self.assertTrue(abs((got - expec[i])/expec[i]) < 1e-8) |
1397 - | |
1397 + | |
1398 1398 | def test_antennadiamter(self): |
1399 1399 | """Test msmd.antennadiameter""" |
1400 1400 | md = self.md |
1401 1401 | nants = md.nantennas() |
1402 1402 | for i in range(nants): |
1403 1403 | diam = md.antennadiameter(i) |
1404 1404 | self.assertTrue(qa.getvalue(diam) == 12) |
1405 1405 | self.assertTrue(qa.getunit(diam) == 'm') |
1406 - | |
1406 + | |
1407 1407 | def test_spwfordatadesc(self): |
1408 1408 | """Test msmd.spwfordatadesc()""" |
1409 1409 | md = self.md |
1410 1410 | for i in range(25): |
1411 1411 | self.assertTrue(md.spwfordatadesc(i) == i) |
1412 1412 | spws = md.spwfordatadesc(-1) |
1413 1413 | expec = numpy.array(range(25)) |
1414 1414 | self.assertTrue((spws == expec).all()) |
1415 - | |
1415 + | |
1416 1416 | def test_polidfordatadesc(self): |
1417 1417 | """Test msmd.polidfordatadesc()""" |
1418 1418 | md = self.md |
1419 1419 | for i in range(25): |
1420 1420 | polid = md.polidfordatadesc(i) |
1421 1421 | if i == 0: |
1422 1422 | self.assertTrue(polid == 1) |
1423 1423 | else: |
1424 1424 | self.assertTrue(polid == 0) |
1425 1425 | polids = md.polidfordatadesc(-1) |
1426 1426 | expec = numpy.zeros([25]) |
1427 1427 | expec[0] = 1 |
1428 1428 | self.assertTrue((polids == expec).all()) |
1429 - | |
1429 + | |
1430 1430 | def test_ncorrforpol(self): |
1431 1431 | """Test msmd.ncorrforpol()""" |
1432 1432 | md = self.md |
1433 1433 | for i in [0, 1]: |
1434 1434 | ncorr = md.ncorrforpol(i) |
1435 1435 | if i == 0: |
1436 1436 | expec = 2 |
1437 1437 | else: |
1438 1438 | expec = 1 |
1439 1439 | self.assertTrue(ncorr == expec) |
1440 1440 | ncorrs = md.ncorrforpol(-1) |
1441 1441 | self.assertTrue(len(ncorrs) == 2) |
1442 1442 | print "ncorrs", ncorrs |
1443 1443 | self.assertTrue(ncorrs[0] == 2 and ncorrs[1] == 1) |
1444 - | |
1444 + | |
1445 1445 | def test_corrtypesforpol(self): |
1446 1446 | """Test msmd.corrtypesforpol()""" |
1447 1447 | md = self.md |
1448 1448 | for i in [-1, 0, 1, 2]: |
1449 1449 | if i == -1 or i == 2: |
1450 1450 | self.assertRaises(Exception, md.corrtypesforpol, i) |
1451 1451 | elif i == 0: |
1452 1452 | ct = md.corrtypesforpol(i) |
1453 1453 | self.assertTrue(len(ct) == 2) |
1454 1454 | self.assertTrue(ct[0] == 9 and ct[1] == 12) |
1455 1455 | else: |
1456 1456 | ct = md.corrtypesforpol(i) |
1457 1457 | self.assertTrue(len(ct) == 1 and ct[0] == 1) |
1458 - | |
1458 + | |
1459 1459 | def test_corrprodsforpol(self): |
1460 1460 | """Test msmd.corrprodssforpol()""" |
1461 1461 | md = self.md |
1462 1462 | for i in [-1, 0, 1, 2]: |
1463 1463 | if i == -1 or i == 2: |
1464 1464 | self.assertRaises(Exception, md.corrprodsforpol, i) |
1465 1465 | elif i == 0: |
1466 1466 | ct = md.corrprodsforpol(i) |
1467 1467 | print "got", ct |
1468 1468 | self.assertTrue(ct.size == 4) |
1469 1469 | self.assertTrue(ct[0][0] == 0 and ct[1][1] == 1) |
1470 1470 | self.assertTrue(ct[0][1] == 1 and ct[1][0] == 0) |
1471 1471 | else: |
1472 1472 | ct = md.corrprodsforpol(i) |
1473 1473 | self.assertTrue(ct.size == 2 and (ct == 0).all()) |
1474 - | |
1474 + | |
1475 1475 | def test_sourceidforfield(self): |
1476 1476 | """Test msmd.sourceidforfield()""" |
1477 1477 | md = self.md |
1478 1478 | for i in range(6): |
1479 1479 | self.assertTrue(md.sourceidforfield(i) == i) |
1480 1480 | self.assertRaises(Exception, md.sourceidforfield, -1) |
1481 1481 | self.assertRaises(Exception, md.sourceidforfield, 6) |
1482 1482 | |
1483 1483 | def test_antennasforscan(self): |
1484 1484 | """Test msmd.antennasforscan()""" |
1544 1544 | "V866 Sco", "V866 Sco", "V866 Sco", "V866 Sco", "V866 Sco", |
1545 1545 | "V866 Sco", "V866 Sco", "V866 Sco", "V866 Sco", "V866 Sco", |
1546 1546 | "V866 Sco", "V866 Sco", "V866 Sco", "RNO 90", "RNO 90", "RNO 90", |
1547 1547 | "RNO 90", "RNO 90", "RNO 90", "RNO 90", "RNO 90", "RNO 90", |
1548 1548 | "RNO 90", "RNO 90", "RNO 90", "RNO 90", "RNO 90", "RNO 90", |
1549 1549 | "RNO 90", "RNO 90", "RNO 90", "RNO 90", "RNO 90", "RNO 90", |
1550 1550 | "RNO 90", "RNO 90", "RNO 90", "RNO 90", "RNO 90", "RNO 90", |
1551 1551 | "RNO 90", "RNO 90", "RNO 90", "RNO 90", "RNO 90" |
1552 1552 | ]) |
1553 1553 | self.assertTrue((md.sourcenames() == expec).all()) |
1554 - | |
1554 + | |
1555 1555 | def test_sourcedirs(self): |
1556 1556 | """Test msmd.sourcedirs()""" |
1557 1557 | md = self.md; |
1558 1558 | elong = [ |
1559 1559 | -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , |
1560 1560 | -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , |
1561 1561 | -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , |
1562 1562 | -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , |
1563 1563 | -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , |
1564 1564 | -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , -2.8964345 , |
1658 1658 | mu = md.propermotions() |
1659 1659 | self.assertTrue(len(mu.keys()) == 200) |
1660 1660 | for i in range(200): |
1661 1661 | mymu = mu[str(i)] |
1662 1662 | lon = mymu['longitude'] |
1663 1663 | lat = mymu['latitude'] |
1664 1664 | self.assertTrue(qa.getvalue(lon) == 0) |
1665 1665 | self.assertTrue(qa.getunit(lon) == "rad/s") |
1666 1666 | self.assertTrue(qa.getvalue(lat) == 0) |
1667 1667 | self.assertTrue(qa.getunit(lat) == "rad/s") |
1668 - | |
1668 + | |
1669 1669 | def test_nsources(self): |
1670 1670 | md = self.md |
1671 1671 | self.assertTrue(md.nsources() == 6) |
1672 1672 | |
1673 1673 | def test_refdir(self): |
1674 1674 | """Test msmd.refdir()""" |
1675 1675 | md = self.md |
1676 - | epsilon = 1e-6 |
1676 + | epsilon = 1e-6 |
1677 1677 | for i in range(md.nfields()): |
1678 1678 | res = md.refdir(i) |
1679 1679 | if i == 0: |
1680 1680 | self.assertTrue(abs(1 - res['m0']['value']/-2.8964345) < epsilon) |
1681 1681 | self.assertTrue(abs(1 - res['m1']['value']/-0.10104256) < epsilon) |
1682 1682 | elif i == 1: |
1683 1683 | self.assertTrue(abs(1 - res['m0']['value']/-2.71545722) < epsilon) |
1684 1684 | self.assertTrue(abs(1 - res['m1']['value']/-0.22613985) < epsilon) |
1685 1685 | elif i == 2: |
1686 1686 | self.assertTrue(abs(1 - res['m0']['value']/-2.72554329) < epsilon) |
1694 1694 | elif i == 5: |
1695 1695 | self.assertTrue(abs(1 - res['m0']['value']/-1.94537525) < epsilon) |
1696 1696 | self.assertTrue(abs(1 - res['m1']['value']/-0.27584353) < epsilon) |
1697 1697 | |
1698 1698 | def test_CAS7837(self): |
1699 1699 | """Test corner case with no intents to make sure it doesn't segfault""" |
1700 1700 | md = self.md |
1701 1701 | importuvfits(datadir + 'regression/cvel/input/W3OH_MC.UVFITS', 'lala.ms') |
1702 1702 | md.open('lala.ms') |
1703 1703 | self.assertTrue((md.fieldsforintent('*') == numpy.array([0])).all()) |
1704 - | |
1704 + | |
1705 1705 | def test_chaneffbws(self): |
1706 1706 | """Test chaneffbws()""" |
1707 1707 | md = self.md |
1708 1708 | nspw = md.nspw() |
1709 1709 | for i in range(nspw): |
1710 1710 | ebw = md.chaneffbws(i) |
1711 1711 | ebw2 = md.chaneffbws(i, "MHz") |
1712 1712 | nchans = len(ebw); |
1713 1713 | if (nchans == 1): |
1714 1714 | continue |
1715 - | |
1715 + | |
1716 1716 | elif nchans == 4: |
1717 1717 | expec = 7.5e9; |
1718 1718 | elif nchans == 128: |
1719 1719 | expec = 1.5625e7; |
1720 1720 | elif nchans == 3840: |
1721 1721 | expec = 30517.578125; |
1722 1722 | for w in ebw: |
1723 1723 | self.assertTrue(w == expec) |
1724 1724 | for w2 in ebw2: |
1725 1725 | self.assertTrue(w2 == expec/1e6) |
1726 1726 | self.assertTrue( |
1727 1727 | near(md.chaneffbws(9, asvel=True)[0], 20.23684342, 1e-8) |
1728 1728 | ) |
1729 1729 | self.assertTrue( |
1730 1730 | near(md.chaneffbws(9, "m/s", True)[0], 20236.84342, 1e-8) |
1731 1731 | ) |
1732 - | |
1732 + | |
1733 1733 | def test_chanres(self): |
1734 1734 | """Test chanres()""" |
1735 1735 | md = self.md |
1736 1736 | nspw = md.nspw() |
1737 1737 | for i in range(nspw): |
1738 1738 | ebw = md.chanres(i) |
1739 1739 | ebw2 = md.chanres(i, "MHz") |
1740 1740 | nchans = len(ebw); |
1741 1741 | if (nchans == 1): |
1742 1742 | continue |
1749 1749 | for w in ebw: |
1750 1750 | self.assertTrue(w == expec) |
1751 1751 | for w2 in ebw2: |
1752 1752 | self.assertTrue(w2 == expec/1e6) |
1753 1753 | self.assertTrue( |
1754 1754 | near(md.chanres(9, asvel=True)[0], 20.23684342, 1e-8) |
1755 1755 | ) |
1756 1756 | self.assertTrue( |
1757 1757 | near(md.chanres(9, "m/s", True)[0], 20236.84342, 1e-8) |
1758 1758 | ) |
1759 - | |
1759 + | |
1760 1760 | def test_restfreqs(self): |
1761 1761 | """Test restfreqs()""" |
1762 1762 | md = self.md |
1763 1763 | self.assertRaises(Exception, md.restfreqs, -1, 0) |
1764 1764 | self.assertRaises(Exception, md.restfreqs, 0, -1) |
1765 1765 | self.assertRaises(Exception, md.restfreqs, 50, 0) |
1766 1766 | self.assertRaises(Exception, md.restfreqs, 0, 50) |
1767 1767 | for i in range(40): |
1768 1768 | res = md.restfreqs(0, i) |
1769 1769 | if i == 34: |
1770 1770 | self.assertTrue(len(res) == 2) |
1771 1771 | self.assertTrue(res['0']['m0']['value'] == 1e10) |
1772 1772 | self.assertTrue(res['0']['m0']['unit'] == 'Hz') |
1773 1773 | self.assertTrue(res['1']['m0']['value'] == 2e10) |
1774 1774 | self.assertTrue(res['1']['m0']['unit'] == 'Hz') |
1775 1775 | else: |
1776 1776 | self.assertFalse(res) |
1777 - | |
1777 + | |
1778 1778 | def test_transitions(self): |
1779 1779 | """Test transitions()""" |
1780 1780 | md = self.md |
1781 1781 | self.assertRaises(Exception, md.transitions, -1, 0) |
1782 1782 | self.assertRaises(Exception, md.transitions, 0, -1) |
1783 1783 | self.assertRaises(Exception, md.transitions, 50, 0) |
1784 1784 | self.assertRaises(Exception, md.transitions, 0, 50) |
1785 1785 | for i in range(40): |
1786 1786 | res = md.transitions(0, i) |
1787 1787 | if i == 34: |
1788 1788 | self.assertTrue(len(res) == 2) |
1789 1789 | self.assertTrue(res[0] == "myline") |
1790 1790 | self.assertTrue(res[1] == "yourline") |
1791 1791 | else: |
1792 1792 | self.assertFalse(res) |
1793 - | |
1793 + | |
1794 1794 | def test_CAS7986(self): |
1795 1795 | """Verify datasets with referential integrity issues cause errors""" |
1796 1796 | myms = mstool() |
1797 1797 | mymsmd = msmdtool() |
1798 1798 | mytb = tbtool() |
1799 1799 | vis = "cas7986.ms" |
1800 1800 | def allgood(): |
1801 1801 | if (os.path.exists(vis)): |
1802 1802 | shutil.rmtree(vis) |
1803 1803 | shutil.copytree(writeable, vis) |
1804 1804 | self.assertTrue(myms.open(vis)) |
1805 1805 | self.assertTrue(myms.open(vis, check=True)) |
1806 1806 | myms.done() |
1807 1807 | self.assertTrue(mymsmd.open(vis)) |
1808 1808 | mymsmd.done() |
1809 - | |
1810 - | allgood() |
1811 - | |
1809 + | |
1810 + | allgood() |
1811 + | |
1812 1812 | def dobad(colname): |
1813 1813 | mytb.open(vis, nomodify=False) |
1814 1814 | mytb.putcell(colname, 20, 9) |
1815 1815 | mytb.done() |
1816 1816 | self.assertTrue(myms.open(vis)) |
1817 1817 | self.assertRaises( |
1818 1818 | Exception, myms.open, vis, check=True |
1819 1819 | ) |
1820 1820 | myms.done() |
1821 1821 | self.assertRaises( |
1822 1822 | Exception, mymsmd.open, vis |
1823 1823 | ) |
1824 - | mymsmd.done() |
1824 + | mymsmd.done() |
1825 1825 | |
1826 1826 | # insert a bad antenna |
1827 1827 | dobad("ANTENNA1") |
1828 1828 | allgood() |
1829 1829 | dobad("ANTENNA2") |
1830 1830 | allgood() |
1831 1831 | dobad("DATA_DESC_ID") |
1832 1832 | allgood() |
1833 1833 | dobad("FIELD_ID") |
1834 - | |
1834 + | |
1835 1835 | def test_nbaselines(self): |
1836 1836 | """Verify nbaselines()""" |
1837 1837 | md = self.md |
1838 1838 | self.assertTrue(md.nbaselines() == 21, "wrong number of baselines for default value of ac") |
1839 1839 | self.assertTrue(md.nbaselines(True) == 25, "wrong number of baselines for ac = True") |
1840 1840 | self.assertTrue(md.nbaselines(False) == 21, "wrong number of baselines for ac = False") |
1841 - | |
1841 + | |
1842 1842 | def suite(): |
1843 1843 | return [msmd_test] |