1 - | import os |
2 - | import numpy |
3 - | import sys |
4 - | import shutil |
5 - | import unittest |
6 - | |
7 - | import numpy as np |
8 - | |
9 - | from casatasks import importuvfits |
10 - | |
11 - | from casatools import ctsys |
12 - | from casatools import ms as mstool |
13 - | from casatools import msmetadata as msmdtool |
14 - | from casatools import table as tbtool |
15 - | from casatools import quanta |
16 - | |
17 - | ''' |
18 - | Unit tests for UVFITS I/O tasks. |
19 - | |
20 - | Features tested: |
21 - | 0. Can multiple spws with the same # of channels be exported to UVFITS |
22 - | using padwithflags? |
23 - | 1. When that UVFITS file is read back in, is its data still correct? |
24 - | ''' |
25 - | |
26 - | datapath = ctsys.resolve('unittest/importuvfits/') |
27 - | |
28 - | def check_eq(val, expval, tol=None): |
29 - | """Checks that val matches expval within tol.""" |
30 - | try: |
31 - | if tol: |
32 - | are_eq = abs(val - expval) < tol |
33 - | else: |
34 - | are_eq = val == expval |
35 - | if hasattr(are_eq, 'all'): |
36 - | are_eq = are_eq.all() |
37 - | if not are_eq: |
38 - | raise ValueError('!=') |
39 - | except ValueError: |
40 - | raise ValueError("%r != %r" % (val, expval)) |
41 - | |
42 - | |
43 - | class importuvfits_test(unittest.TestCase): |
44 - | |
45 - | |
46 - | inpms = 'cvel/input/ANTEN_sort_hann_for_cvel_reg.ms' |
47 - | |
48 - | origms = 'start.ms' |
49 - | fitsfile = 'hanningsmoothed.UVF' |
50 - | msfromfits = 'end.ms' |
51 - | |
52 - | records = {} |
53 - | need_to_initialize = True |
54 - | do_teardown = False |
55 - | |
56 - | |
57 - | def setUp(self): |
58 - | self.qa = quanta( ) |
59 - | |
60 - | |
61 - | |
62 - | |
63 - | |
64 - | |
65 - | |
66 - | |
67 - | |
68 - | |
69 - | |
70 - | |
71 - | |
72 - | |
73 - | |
74 - | |
75 - | |
76 - | |
77 - | |
78 - | |
79 - | |
80 - | |
81 - | |
82 - | |
83 - | |
84 - | |
85 - | |
86 - | |
87 - | |
88 - | |
89 - | def tearDown(self): |
90 - | if self.do_teardown: |
91 - | self.qa.done( ) |
92 - | shutil.rmtree(self.origms) |
93 - | shutil.rmtree(self.msfromfits) |
94 - | os.remove(self.fitsfile) |
95 - | self.do_teardown = False |
96 - | |
97 - | |
98 - | |
99 - | |
100 - | |
101 - | |
102 - | |
103 - | |
104 - | |
105 - | |
106 - | |
107 - | |
108 - | |
109 - | |
110 - | |
111 - | |
112 - | |
113 - | |
114 - | |
115 - | |
116 - | |
117 - | def test_receptor_angle(self): |
118 - | """CAS-7081: Test receptor angle is preserved""" |
119 - | myms = mstool() |
120 - | msname = os.path.join(datapath, "uvfits_test.ms") |
121 - | self.assertTrue(myms.open(msname), "Input dataset not found") |
122 - | uvfits = "xyz.uvfits" |
123 - | self.assertTrue(myms.tofits(uvfits), "Failed to write uvfits") |
124 - | myms.done() |
125 - | feed = "/FEED" |
126 - | mytb = tbtool() |
127 - | mytb.open(msname + feed) |
128 - | rec_ang = "RECEPTOR_ANGLE" |
129 - | expec = mytb.getcol(rec_ang) |
130 - | mytb.done() |
131 - | importname = "kf.ms" |
132 - | importuvfits(fitsfile=uvfits, vis=importname) |
133 - | mytb.open(importname + feed) |
134 - | got = mytb.getcol(rec_ang) |
135 - | mytb.done() |
136 - | self.assertTrue(np.max(np.abs(got-expec)) < 1e-7, "Receptor angles not preserved") |
137 - | |
138 - | def suite(): |
139 - | return [importuvfits_test] |
140 - | |
141 - | if __name__ == '__main__': |
142 - | unittest.main() |