42 + | try: |
43 + | from casatools import ctsys, image, table, quanta, regionmanager |
44 + | _tb = table() |
45 + | _rg = regionmanager() |
46 + | _qa = quanta() |
47 + | ctsys_resolve = ctsys.resolve |
48 + | is_CASA6 = True |
49 + | except ImportError: |
50 + | from tasks import * |
51 + | from taskinit import * |
52 + | import casac |
53 + | from __main__ import * |
54 + | _tb = tbtool() |
55 + | _rg = rgtool() |
56 + | image = iatool |
57 + | _qa = qatool() |
58 + | dataRoot = os.path.join(os.environ.get('CASAPATH').split()[0],'data') |
59 + | def ctsys_resolve(apath): |
60 + | return os.path.join(dataRoot,apath) |
61 + | is_CASA6 = False |
62 + | |
63 + | def _near(got, expected, tol): |
64 + | return _qa.le( |
65 + | _qa.div(_qa.abs(_qa.sub(got, expected)), expected), tol |
66 + | ) |
67 + | |
68 + | def make_gauss2d(shape, xfwhm, yfwhm): |
69 + | fac = 4*math.log(2) |
70 + | values = np.empty(shape, dtype=float) |
71 + | for i in range(shape[0]): |
72 + | x = shape[0]/2 - i |
73 + | for j in range(shape[1]): |
74 + | y = shape[1]/2 - j |
75 + | xfac = x*x*fac/(xfwhm*xfwhm) |
76 + | yfac = y*y*fac/(yfwhm*yfwhm) |
77 + | values[i, j] = math.exp(-(xfac + yfac)); |
78 + | return values |
79 + | |
80 + | def run_convolve2d( |
81 + | imagename, major, minor, pa, targetres, |
82 + | outfile, kernel="gauss", beam={}, overwrite=False |
83 + | ): |
84 + | myia = image() |
85 + | myia.open(imagename) |
86 + | res = myia.convolve2d( |
87 + | type=kernel, |
88 + | major=major, minor=minor, pa=pa, |
89 + | targetres=targetres, outfile=outfile, |
90 + | beam=beam, overwrite=overwrite |
91 + | ) |
92 + | myia.done() |
93 + | res.done() |
94 + | |
82 - | pass |
101 + | self.assertTrue(len(_tb.showcache()) == 0, 'table cache is not empty') |
102 + | |
103 + | cwd = os.getcwd() |
104 + | for filename in os.listdir(cwd): |
105 + | file_path = os.path.join(cwd, filename) |
106 + | try: |
107 + | if os.path.isfile(file_path) or os.path.islink(file_path): |
108 + | os.unlink(file_path) |
109 + | elif os.path.isdir(file_path): |
110 + | |
111 + | if filename != 'xml': |
112 + | shutil.rmtree(file_path) |
113 + | except Exception as e: |
114 + | print('Failed to delete %s. Reason: %s' % (file_path, e)) |
115 + | |
116 + | def _compare_beams(self, beam1, beam2): |
117 + | self.assertTrue(_near(beam1["major"], beam2["major"], 2e-5)) |
118 + | self.assertTrue(_near(beam1["minor"], beam2["minor"], 2e-5)) |
119 + | pa = [] |
120 + | for b in [beam1, beam2]: |
121 + | if "positionangle" in b: |
122 + | pa.append(b["positionangle"]) |
123 + | else: |
124 + | pa.append(b["pa"]) |
125 + | |
126 + | diff = abs( |
127 + | _qa.sub( |
128 + | _qa.quantity(pa[0]), |
129 + | _qa.quantity(pa[1]) |
130 + | )["value"] |
131 + | ) |
132 + | self.assertTrue(diff < 1e-5) |
133 + | |
134 + | def test_multibeam(self): |
135 + | """Test per plane beams""" |
136 + | myia = image() |
137 + | imname = "test_image2dconvolver_multibeam.im" |
138 + | shutil.copytree(ctsys_resolve(os.path.join(self.datapath, imname)), imname) |
139 + | myia.open(imname) |
140 + | major = "10arcmin" |
141 + | minor = "8arcmin" |
142 + | pa = "80deg" |
143 + | got = myia.convolve2d(axes=[0, 1], major=major, minor=minor, pa=pa) |
144 + | shape = myia.shape() |
145 + | for i in range(5): |
146 + | blc=[0, 0, i] |
147 + | trc=[shape[0]-1, shape[1]-1, i] |
148 + | reg = _rg.box(blc=blc, trc=trc) |
149 + | xx = myia.subimage(region=reg) |
150 + | exp = xx.convolve2d(axes=[0, 1], major=major, minor=minor, pa=pa) |
151 + | expbeam = exp.restoringbeam() |
152 + | gotbeam = got.restoringbeam(channel=i) |
153 + | for j in ["major", "minor", "positionangle"]: |
154 + | self.assertTrue(_near(gotbeam[j], expbeam[j], 2e-7)) |
155 + | self.assertTrue(abs(got.getchunk(blc=blc, trc=trc) - exp.getchunk()).max() < 3e-5) |
156 + | exp.done() |
157 + | xx.done() |
158 + | myia.done() |
159 + | got.done() |
160 + | |
161 + | def test_targetres(self): |
162 + | """Test targetres parameter""" |
163 + | myia = image() |
164 + | imagename = "tres1.im" |
165 + | myia.fromshape(imagename, [100, 100]) |
166 + | csys = myia.coordsys() |
167 + | csys.setunits(["arcsec", "arcsec"]) |
168 + | csys.setincrement([-1, 1]) |
169 + | myia.setcoordsys(csys.torecord()) |
170 + | myia.setrestoringbeam(major="6arcsec", minor="3arcsec", pa="0deg") |
171 + | shape = myia.shape() |
172 + | values = make_gauss2d(shape, 3.0, 6.0) |
173 + | expected = make_gauss2d(shape, 5.0, 10.0) |
174 + | myia.putchunk(values) |
175 + | myia.done() |
176 + | emaj = _qa.quantity("10arcsec") |
177 + | emin = _qa.quantity("5arcsec") |
178 + | epa = _qa.quantity("0deg") |
179 + | for unit in ("Jy/beam", "K"): |
180 + | myia.open(imagename) |
181 + | myia.setbrightnessunit(unit) |
182 + | myia.done() |
183 + | expected = make_gauss2d(shape, 5.0, 10.0) |
184 + | if (unit == "K"): |
185 + | expected *= 3.0*6.0/5.0/10.0 |
186 + | |
187 + | for targetres in [False, True]: |
188 + | if not targetres: |
189 + | major = "8arcsec" |
190 + | minor = "4arcsec" |
191 + | pa = "0deg" |
192 + | outfile = "tres1" + unit[0] |
193 + | else: |
194 + | major = "10arcsec" |
195 + | minor = "5arcsec" |
196 + | pa = "0deg" |
197 + | outfile = "tres2" + unit[0] |
198 + | run_convolve2d( |
199 + | imagename=imagename, kernel="gaussian", |
200 + | major=major, minor=minor, pa=pa, targetres=targetres, |
201 + | outfile=outfile |
202 + | ) |
203 + | myia.open(outfile) |
204 + | gotbeam = myia.restoringbeam() |
205 + | gotvals = myia.getchunk() |
206 + | myia.done() |
207 + | self._compare_beams( |
208 + | gotbeam, {"major": emaj, "minor": emin, "pa": epa} |
209 + | ) |
211 + | def test_beam(self): |
212 + | """Test the beam parameter""" |
213 + | myia = image() |
214 + | imagename = "tbeam1.im" |
215 + | myia.fromshape(imagename, [100, 100]) |
216 + | csys = myia.coordsys() |
217 + | csys.setunits(["arcsec", "arcsec"]) |
218 + | csys.setincrement([1, 1]) |
219 + | myia.setcoordsys(csys.torecord()) |
220 + | myia.setbrightnessunit("Jy/beam") |
221 + | myia.setrestoringbeam(major="6arcsec", minor="3arcsec", pa="0deg") |
222 + | shape = myia.shape() |
223 + | myia.putchunk(make_gauss2d(shape, 3.0, 6.0)) |
224 + | expected = make_gauss2d(shape, 5.0, 10.0) |
225 + | for beam in [ |
226 + | {"major": "8arcsec", "minor": "4arcsec", "pa": "0deg"}, |
227 + | { |
228 + | "major": {"unit": "arcsec", "value": 8}, |
229 + | "minor": {"unit": "arcsec", "value": 4}, |
230 + | "pa": {"unit": "deg", "value": 0}, |
231 + | } |
232 + | ]: |
233 + | outfile = 'convolve2d' |
234 + | x = run_convolve2d( |
235 + | imagename=imagename, major="", minor="", pa="", |
236 + | beam=beam, outfile=outfile, targetres=False, |
237 + | overwrite=True |
238 + | ) |
239 + | if type(x) == type(myia): |
240 + | x.done() |
241 + | myia.open(outfile) |
242 + | maxdiff = (abs(myia.getchunk()-expected)).max() |
243 + | self.assertTrue(maxdiff < 1e-6) |
244 + | myia.done() |
245 + | |
246 + | def test_history(self): |
247 + | """Test that history is written""" |
248 + | myia = image() |
249 + | imagename = "zz.im" |
250 + | myia.fromshape(imagename, [20,20]) |
251 + | major = "2arcmin" |
252 + | minor = "2arcmin" |
253 + | pa = "0deg" |
254 + | bb = myia.convolve2d("", major=major, minor=minor, pa=pa) |
255 + | myia.done() |
256 + | msgs = bb.history() |
257 + | bb.done() |
258 + | teststr = "ia.convolve2d" |
259 + | self.assertTrue(teststr in msgs[-4], "'" + teststr + "' not found") |
260 + | self.assertTrue(teststr in msgs[-3], "'" + teststr + "' not found") |
261 + | |