1 + | |
2 + | |
3 + | |
4 + | |
5 + | |
6 + | |
7 + | |
8 + | |
9 + | |
10 + | |
11 + | |
12 + | |
13 + | |
14 + | |
15 + | |
16 + | |
17 + | |
18 + | |
19 + | |
20 + | |
21 + | import contextlib |
22 + | import csv |
23 + | import os |
24 + | import shutil |
25 + | import tempfile |
26 + | import unittest |
27 + | from unittest.mock import patch |
28 + | import uuid |
29 + | |
30 + | import numpy as np |
31 + | |
32 + | from casatestutils import testhelper as th |
33 + | |
34 + | from casatasks import gencal, rmtables |
35 + | from casatasks.private import tec_maps |
36 + | from casatools import ctsys, table |
37 + | |
38 + | _tb = table() |
39 + | |
40 + | datapath = ctsys.resolve('/unittest/gencal/') |
41 + | |
42 + | |
43 + | evndata = 'n08c1.ms' |
44 + | vlbadata = 'ba123a.ms' |
45 + | vlbacal = os.path.join(datapath, 'ba123a.gc') |
46 + | evncal = os.path.join(datapath, 'n08c1.tsys') |
47 + | |
48 + | caltab = 'cal.A' |
49 + | evncopy = 'evn_copy.ms' |
50 + | vlbacopy = 'vlba_copy.ms' |
51 + | |
52 + | |
53 + | vladata = 'tdem0003gencal.ms' |
54 + | vlacopy = 'vla_copy.ms' |
55 + | vlacal = 'vla.gc' |
56 + | vlacaltab = os.path.join(datapath, 'gencalGaincurveRef.gc') |
57 + | |
58 + | |
59 + | |
60 + | ''' |
61 + | Unit tests for gencal |
62 + | ''' |
63 + | |
64 + | |
65 + | |
66 + | |
67 + | |
68 + | |
69 + | |
70 + | |
71 + | |
72 + | testmms = False |
73 + | if 'TEST_DATADIR' in os.environ: |
74 + | DATADIR = str(os.environ.get('TEST_DATADIR'))+'/gencal/' |
75 + | if os.path.isdir(DATADIR): |
76 + | testmms = True |
77 + | datapath = DATADIR |
78 + | else: |
79 + | print('WARN: directory '+DATADIR+' does not exist') |
80 + | |
81 + | print('gencal tests will use data from ' + datapath) |
82 + | |
83 + | |
84 + | class gencal_antpostest(unittest.TestCase): |
85 + | |
86 + | |
87 + | msfile = 'tdem0003gencal.ms' |
88 + | |
89 + | msfile2 = 'auto_antposcorr_evla_gencal.ms' |
90 + | |
91 + | |
92 + | caltable = 'anpos.cal' |
93 + | reffile1 = os.path.join(datapath+'evla_reference/', 'anpos.manual.cal') |
94 + | reffile2 = os.path.join(datapath+'evla_reference/', 'anpos.auto.cal') |
95 + | reffile3 = os.path.join(datapath+'evla_reference/', 'anpos.autoCAS13057.cal') |
96 + | res = False |
97 + | |
98 + | def setUp(self): |
99 + | if (os.path.exists(self.msfile)): |
100 + | shutil.rmtree(self.msfile) |
101 + | if (os.path.exists(self.msfile2)): |
102 + | shutil.rmtree(self.msfile2) |
103 + | |
104 + | shutil.copytree(os.path.join(datapath, self.msfile), self.msfile, symlinks=True) |
105 + | shutil.copytree(os.path.join(datapath, self.msfile2), self.msfile2, symlinks=True) |
106 + | |
107 + | def tearDown(self): |
108 + | if (os.path.exists(self.msfile)): |
109 + | shutil.rmtree(self.msfile) |
110 + | if (os.path.exists(self.msfile2)): |
111 + | shutil.rmtree(self.msfile2) |
112 + | |
113 + | shutil.rmtree(self.caltable, ignore_errors=True) |
114 + | |
115 + | def test_antpos_manual(self): |
116 + | """Test manual antenna position correction.""" |
117 + | gencal(vis=self.msfile, |
118 + | caltable=self.caltable, |
119 + | caltype='antpos', |
120 + | antenna='ea12,ea22', |
121 + | parameter=[-0.0072, 0.0045, -0.0017, -0.0220, 0.0040, -0.0190]) |
122 + | |
123 + | self.assertTrue(os.path.exists(self.caltable)) |
124 + | |
125 + | |
126 + | |
127 + | |
128 + | reference = self.reffile1 |
129 + | self.assertTrue(th.compTables(self.caltable, reference, ['WEIGHT', 'OBSERVATION_ID'])) |
130 + | |
131 + | def test_antpos_auto_evla(self): |
132 + | """Test automated antenna position correction.""" |
133 + | |
134 + | from urllib.request import urlopen |
135 + | from urllib.error import URLError |
136 + | |
137 + | |
138 + | evlabslncorrURL = "http://www.vla.nrao.edu/cgi-bin/evlais_blines.cgi?Year=" |
139 + | try: |
140 + | urlaccess = urlopen(evlabslncorrURL+"2010", timeout=60.0) |
141 + | gencal(vis=self.msfile, |
142 + | caltable=self.caltable, |
143 + | caltype='antpos', |
144 + | antenna='') |
145 + | |
146 + | self.assertTrue(os.path.exists(self.caltable)) |
147 + | |
148 + | |
149 + | |
150 + | |
151 + | reference = self.reffile2 |
152 + | self.assertTrue(th.compTables(self.caltable, reference, ['WEIGHT', 'OBSERVATION_ID'])) |
153 + | |
154 + | except URLError as err: |
155 + | print("Cannot access %s , skip this test" % evlabslncorrURL) |
156 + | self.res = True |
157 + | |
158 + | def test_antpos_auto_evla_CAS13057(self): |
159 + | """ |
160 + | gencal: test a bugfix of CAS-13057 for automated antenna position correction |
161 + | """ |
162 + | |
163 + | from urllib.request import urlopen |
164 + | from urllib.error import URLError |
165 + | |
166 + | |
167 + | evlabslncorrURL = "http://www.vla.nrao.edu/cgi-bin/evlais_blines.cgi?Year=" |
168 + | try: |
169 + | urlaccess = urlopen(evlabslncorrURL+"2019", timeout=60.0) |
170 + | gencal(vis=self.msfile2, |
171 + | caltable=self.caltable, |
172 + | caltype='antpos', |
173 + | antenna='') |
174 + | |
175 + | self.assertTrue(os.path.exists(self.caltable)) |
176 + | |
177 + | |
178 + | |
179 + | |
180 + | |
181 + | reference = self.reffile3 |
182 + | self.assertTrue(th.compTables(self.caltable, reference, ['WEIGHT', 'OBSERVATION_ID', 'FPARAM'])) |
183 + | |
184 + | |
185 + | |
186 + | _tb.open(self.caltable) |
187 + | curfparam=_tb.getcol('FPARAM').transpose() |
188 + | _tb.close() |
189 + | _tb.open(reference) |
190 + | reffparam=_tb.getcol('FPARAM').transpose() |
191 + | _tb.close() |
192 + | self.assertTrue((curfparam[21]==reffparam[21]).all()) |
193 + | self.assertTrue((curfparam[23]==reffparam[23]).all()) |
194 + | |
195 + | except URLError as err: |
196 + | print("Cannot access %s , skip this test" % evlabslncorrURL) |
197 + | self.res = True |
198 + | |
199 + | def test_antpos_manual_time_limit_evla(self): |
200 + | """ |
201 + | gencal: test if time limit sets cutoff date for antpos corrections |
202 + | """ |
203 + | |
204 + | gencal(vis=self.msfile2, |
205 + | caltable=self.caltable, |
206 + | caltype='antpos', |
207 + | ant_pos_time_limit=400) |
208 + | |
209 + | _tb.open(self.caltable) |
210 + | res = np.mean(_tb.getcol('FPARAM')) |
211 + | _tb.close() |
212 + | |
213 + | self.assertTrue(np.isclose(res, -1.2345658040341035e-06, atol=1e-5)) |
214 + | |
215 + | shutil.rmtree(self.caltable) |
216 + | |
217 + | |
218 + | gencal(vis=self.msfile2, |
219 + | caltable=self.caltable, |
220 + | caltype='antpos', |
221 + | ant_pos_time_limit=0) |
222 + | |
223 + | _tb.open(self.caltable) |
224 + | res = np.mean(_tb.getcol('FPARAM')) |
225 + | _tb.close() |
226 + | |
227 + | self.assertTrue(np.isclose(res, -5.308641580703818e-05, atol=1e-5)) |
228 + | |
229 + | |
230 + | |
231 + | class test_gencal_antpos_alma(unittest.TestCase): |
232 + | """Tests the automatic generation of antenna position corrections for ALMA. |
233 + | |
234 + | New REST web service: |
235 + | https://bitbucket.sco.alma.cl/projects/ALMA/repos/almasw/browse/CONTROL-SERVICES/PositionsService |
236 + | |
237 + | |
238 + | Old SOAP web service: |
239 + | http://asa.alma.cl/axis2/services/TMCDBAntennaPadService?wsdl |
240 + | Example minimalistic use of a client to query the service: |
241 + | from suds.client import Client |
242 + | srv_wsdl_url = 'http://asa.alma.cl/axis2/services/TMCDBAntennaPadService?wsdl' |
243 + | ws_cli = Client(srv_wsdl_url) |
244 + | resp = ws_cli.service.getAntennaPositions("CURRENT.AOS", "DA49", |
245 + | "2017-01-30T01:53:54") |
246 + | """ |
247 + | |
248 + | |
249 + | ALMA_SRV_WSDL_URL = 'http://asa.alma.cl/axis2/services/TMCDBAntennaPadService?wsdl' |
250 + | |
251 + | |
252 + | |
253 + | ALMA_MS = 'uid___A002_X72c4aa_X8f5_scan21_spw18_field2_corrXX.ms' |
254 + | CAL_TYPE = 'antpos' |
255 + | REF_CALTABLE_MANUAL = os.path.join(datapath, 'alma_reference/A002_X72c4aa_ref_ant_pos.manual.cal') |
256 + | REF_CALTABLE_AUTO = os.path.join(datapath, 'alma_reference/A002_X72c4aa_ref_ant_pos.auto.cal') |
257 + | IGNORE_COLS = ['WEIGHT', 'OBSERVATION_ID'] |
258 + | |
259 + | def setUp(self): |
260 + | if (os.path.exists(self.ALMA_MS)): |
261 + | shutil.rmtree(self.ALMA_MS) |
262 + | |
263 + | shutil.copytree(os.path.join(datapath, self.ALMA_MS), |
264 + | self.ALMA_MS, symlinks=True) |
265 + | |
266 + | def tearDown(self): |
267 + | if (os.path.exists(self.ALMA_MS)): |
268 + | shutil.rmtree(self.ALMA_MS) |
269 + | |
270 + | def remove_caltable(self, ct_name): |
271 + | """ Removes a cal table. ct_name: path to the caltable """ |
272 + | import shutil |
273 + | shutil.rmtree(ct_name) |
274 + | |
275 + | def test_antpos_alma_manual(self): |
276 + | """ |
277 + | gencal: manual antenna position correction on ALMA table |
278 + | """ |
279 + | |
280 + | out_caltable = 'ant_pos_man.cal' |
281 + | gencal(vis=self.ALMA_MS, |
282 + | caltable=out_caltable, |
283 + | caltype=self.CAL_TYPE, |
284 + | antenna='DV07,DV10,DV11', |
285 + | parameter=[-0.0072, 0.0045, -0.0017, -0.0220, 0.0040, -0.0190]) |
286 + | |
287 + | self.assertTrue(os.path.exists(out_caltable), |
288 + | "The output cal table should have been created") |
289 + | |
290 + | |
291 + | self.assertTrue(th.compTables(out_caltable, |
292 + | self.REF_CALTABLE_MANUAL, |
293 + | self.IGNORE_COLS)) |
294 + | |
295 + | self.remove_caltable(out_caltable) |
296 + | |
297 + | @unittest.skip('SOAP AntennaPad Positions SOAP service needs to be removed once the ' |
298 + | 'TMCDB based auto correction in gencal is validated.') |
299 + | def tmp_disabled_test_antpos_alma_server_SOAP_methods(self): |
300 + | """ |
301 + | gencal: connection to alma TCM DB AntennaPadService for ALMA |
302 + | """ |
303 + | try: |
304 + | |
305 + | import urllib2 |
306 + | from suds.client import Client |
307 + | ws_cli = Client(self.ALMA_SRV_WSDL_URL) |
308 + | |
309 + | |
310 + | method_name = 'getAntennaPositions' |
311 + | self.assertTrue(callable(getattr(ws_cli.service, method_name)), |
312 + | 'The client service should have this method: {}, and ' |
313 + | 'it should be callable.'.format(method_name)) |
314 + | except ImportError as exc: |
315 + | print('Cannot import required dependencies to query the ALMA TCM DB web service') |
316 + | raise |
317 + | except urllib2.URLError as exc: |
318 + | print('Connection/network error while querying the ALMA TCM DB web service') |
319 + | raise |
320 + | |
321 + | @unittest.skip('SOAP AntennaPad Positions SOAP service needs to be removed once the ' |
322 + | 'TMCDB based auto correction in gencal is validated.') |
323 + | def tmp_disabled_test_antpos_auto_alma_SOAP_empty_query(self): |
324 + | """ |
325 + | gencal: empty query (empty antennas list) to the (old) SOAP TCMDB AntennaPadService |
326 + | web service (ALMA) |
327 + | """ |
328 + | try: |
329 + | import correct_ant_posns_alma as almacor |
330 + | |
331 + | resp = almacor.query_tmcdb_antennas_rest([], '2017-01-01T16:53:54.000') |
332 + | if resp: |
333 + | raise RuntimeError('Unexpected response for an empty query: {0}'. |
334 + | format(resp)) |
335 + | except ImportError: |
336 + | print('Cannot import required dependencies to query the ALMA TCM DB web service') |
337 + | raise |
338 + | except urllib2.URLError as exc: |
339 + | print('Connection/network error while querying the ALMA TCM DB web service') |
340 + | raise |
341 + | |
342 + | @unittest.skip('SOAP AntennaPad Positions SOAP service needs to be removed once the ' |
343 + | 'TMCDB based auto correction in gencal is validated.') |
344 + | def tmp_disabled_test_antpos_auto_web_srv_SOAP_alma(self): |
345 + | """ |
346 + | gencal: auto gencal using data from TCM DB AntennaPadService (ALMA) |
347 + | """ |
348 + | |
349 + | import urllib2 |
350 + | |
351 + | out_caltable = 'ant_pos_web_srv.cal' |
352 + | try: |
353 + | |
354 + | |
355 + | gencal(vis=self.ALMA_MS, caltable=out_caltable, caltype=self.CAL_TYPE) |
356 + | except ImportError: |
357 + | print('Cannot import required dependencies to query the ALMA TCM DB web service') |
358 + | raise |
359 + | except urllib2.URLError: |
360 + | print('Connection/network error while querying the ALMA TCM DB web service') |
361 + | raise |
362 + | |
363 + | self.assertTrue(os.path.exists(out_caltable), |
364 + | "The output cal table should have been created: {0}". |
365 + | format(out_caltable)) |
366 + | |
367 + | |
368 + | self.assertTrue(th.compTables(out_caltable, |
369 + | self.REF_CALTABLE_AUTO, |
370 + | self.IGNORE_COLS)) |
371 + | self.remove_caltable(out_caltable) |
372 + | |
373 + | @unittest.skip('REST Position service needs validation and final deployment') |
374 + | def tmp_disabled_test_antpos_auto_alma_REST_empty_query(self): |
375 + | """ |
376 + | gencal: empty query (empty antennas list) to the (new) REST TCMDB Positions |
377 + | web service (ALMA) |
378 + | """ |
379 + | import urllib2 |
380 + | |
381 + | TEST_HOSTNAME = 'https://2018may.asa-test.alma.cl' |
382 + | |
383 + | hostname = TEST_HOSTNAME |
384 + | port = 443 |
385 + | api = 'antenna-position/position/antenna' |
386 + | try: |
387 + | import requests |
388 + | import correct_ant_posns_alma as almacor |
389 + | |
390 + | tstamp = '2017-01-01T16:53:54.000' |
391 + | |
392 + | resp = almacor.query_tmcdb_antennas_rest([], tstamp) |
393 + | if resp: |
394 + | raise RuntimeError('Unexpected response for an empty query: {0}'. |
395 + | format(resp)) |
396 + | |
397 + | |
398 + | url = '{}:{}/{}?antenna={}×tamp={}'.format(hostname, port, api, '', |
399 + | '2017-01-01T16:53:54.000') |
400 + | resp = requests.get(url) |
401 + | if resp: |
402 + | raise RuntimeError('Unexpected response for an empty query: {0}'. |
403 + | format(resp)) |
404 + | except ImportError: |
405 + | print('Cannot import required dependencies to query the ALMA TCM DB web service') |
406 + | raise |
407 + | except urllib2.URLError as exc: |
408 + | print('Connection/network error while querying the ALMA TCM DB web service') |
409 + | raise |
410 + | |
411 + | @unittest.skip('REST Position service needs validation and final deployment') |
412 + | def tmp_disabled_test_antpos_auto_web_srv_REST_alma(self): |
413 + | """ |
414 + | gencal: auto gencal using data from TCMDB Positions service (ALMA) |
415 + | """ |
416 + | |
417 + | import urllib2 |
418 + | |
419 + | out_caltable = 'ant_pos_web_srv.cal' |
420 + | try: |
421 + | |
422 + | |
423 + | gencal(vis=self.ALMA_MS, caltable=out_caltable, caltype=self.CAL_TYPE) |
424 + | except urllib2.URLError: |
425 + | print('Connection/network error while querying the ALMA TCMDB Positions web service') |
426 + | raise |
427 + | |
428 + | self.assertTrue(os.path.exists(out_caltable), |
429 + | "The output cal table should have been created: {0}". |
430 + | format(out_caltable)) |
431 + | |
432 + | |
433 + | self.assertTrue(th.compTables(out_caltable, |
434 + | self.REF_CALTABLE_AUTO, |
435 + | self.IGNORE_COLS)) |
436 + | self.remove_caltable(out_caltable) |
437 + | |
438 + | |
439 + | class gencal_test_tec_vla(unittest.TestCase): |
440 + | |
441 + | |
442 + | msfile = 'tdem0003gencal.ms' |
443 + | igsfile = 'igsg1160.10i' |
444 + | tecfile = msfile+'.IGS_TEC.im' |
445 + | rmstecfile = msfile+'.IGS_RMS_TEC.im' |
446 + | caltable = msfile+'_tec.cal' |
447 + | newigsfile='IGS0OPSFIN_20233350000_01D_02H_GIM.INX' |
448 + | |
449 + | |
450 + | |
451 + | def setUp(self): |
452 + | self.tearDown() |
453 + | shutil.copytree(os.path.join(datapath, self.msfile), self.msfile, symlinks=True) |
454 + | |
455 + | def tearDown(self): |
456 + | if os.path.exists(self.msfile): |
457 + | shutil.rmtree(self.msfile) |
458 + | |
459 + | if os.path.exists(self.igsfile): |
460 + | os.remove(self.igsfile) |
461 + | |
462 + | shutil.rmtree(self.tecfile, ignore_errors=True) |
463 + | shutil.rmtree(self.rmstecfile, ignore_errors=True) |
464 + | shutil.rmtree(self.caltable, ignore_errors=True) |
465 + | |
466 + | |
467 + | if os.path.exists(self.newigsfile): |
468 + | os.remove(self.newigsfile) |
469 + | |
470 + | def test_tec_maps(self): |
471 + | """ |
472 + | gencal: very basic test of tec_maps and gencal(caltype='tecim') |
473 + | """ |
474 + | |
475 + | try: |
476 + | tec_maps.create0(self.msfile) |
477 + | gencal(vis=self.msfile, caltable=self.caltable, caltype='tecim', infile=self.msfile+'.IGS_TEC.im') |
478 + | |
479 + | self.assertTrue(os.path.exists(self.caltable)) |
480 + | |
481 + | _tb.open(self.caltable) |
482 + | nrows = _tb.nrows() |
483 + | dtecu = abs(13.752-np.mean(_tb.getcol('FPARAM'))/1e16) |
484 + | _tb.close() |
485 + | |
486 + | |
487 + | |
488 + | self.assertTrue(nrows == 1577) |
489 + | self.assertTrue(dtecu < 1e-3) |
490 + | |
491 + | |
492 + | |
493 + | |
494 + | |
495 + | a=tec_maps.get_IGS_TEC('2023/12/01') |
496 + | self.assertTrue(os.path.exists(self.newigsfile)) |
497 + | self.assertTrue(a[9]=='IGS_Final_Product') |
498 + | |
499 + | except: |
500 + | |
501 + | raise |
502 + | |
503 + | |
504 + | class gencal_gaincurve_test(unittest.TestCase): |
505 + | |
506 + | @classmethod |
507 + | def setUpClass(cls): |
508 + | |
509 + | shutil.copytree(os.path.join(datapath, vladata), vlacopy) |
510 + | shutil.copytree(os.path.join(datapath, evndata), evncopy) |
511 + | shutil.copytree(os.path.join(datapath, vlbadata), vlbacopy) |
512 + | |
513 + | def setUp(self): |
514 + | pass |
515 + | |
516 + | def tearDown(self): |
517 + | rmtables(vlacal) |
518 + | rmtables(caltab) |
519 + | |
520 + | @classmethod |
521 + | def tearDownClass(cls): |
522 + | shutil.rmtree(vlacopy) |
523 + | shutil.rmtree(evncopy) |
524 + | shutil.rmtree(vlbacopy) |
525 + | |
526 + | def test_gainCurveVLA(self): |
527 + | ''' Test calibration table produced when gencal is run on a *VLA* MS and relying on data/nrao/VLA/GainCurves ''' |
528 + | |
529 + | gencal(vis=vlacopy, caltable=vlacal, caltype='gc') |
530 + | |
531 + | self.assertTrue(os.path.exists(vlacaltab)) |
532 + | self.assertTrue(th.compTables(vlacaltab, vlacal, ['WEIGHT'])) |
533 + | |
534 + | def test_gainCurveVLBA(self): |
535 + | ''' Test calibration table produced when gencal is run on a VLBA MS with an internal GAIN_CURVE table ''' |
536 + | |
537 + | gencal(vis=vlbacopy, caltable=caltab, caltype='gc') |
538 + | |
539 + | self.assertTrue(os.path.exists(caltab)) |
540 + | self.assertTrue(th.compTables(caltab, vlbacal, ['WEIGHT'])) |
541 + | |
542 + | def test_noGainCurveEVN(self): |
543 + | ''' Test that when gencal is run on an EVN MS with no GAIN_CURVE table it creates no calibration table ''' |
544 + | |
545 + | try: |
546 + | gencal(vis=evncopy, caltable=caltab, caltype='gc') |
547 + | except: |
548 + | pass |
549 + | |
550 + | self.assertFalse(os.path.exists(caltab)) |
551 + | |
552 + | |
553 + | class gencal_tsys_test(unittest.TestCase): |
554 + | |
555 + | @classmethod |
556 + | def setUpClass(cls): |
557 + | shutil.copytree(os.path.join(datapath, evndata), evncopy) |
558 + | shutil.copytree(os.path.join(datapath, vlbadata), vlbacopy) |
559 + | |
560 + | def setUp(self): |
561 + | pass |
562 + | |
563 + | def tearDown(self): |
564 + | rmtables(caltab) |
565 + | |
566 + | @classmethod |
567 + | def tearDownClass(cls): |
568 + | shutil.rmtree(evncopy) |
569 + | shutil.rmtree(vlbacopy) |
570 + | |
571 + | def test_tsys(self): |
572 + | ''' Test calibration table produced when gencal is run on an MS with a SYSCAL table''' |
573 + | |
574 + | gencal(vis=evncopy, caltable=caltab, caltype='tsys', uniform=False) |
575 + | |
576 + | self.assertTrue(os.path.exists(caltab)) |
577 + | self.assertTrue(th.compTables(caltab, evncal, ['WEIGHT'])) |
578 + | |
579 + | def test_tsys_nan(self): |
580 + | ''' Test calibration table produced when gencal is run on an MS with a SYSCAL table that contains NaNs''' |
581 + | |
582 + | |
583 + | |
584 + | |
585 + | _tb.open(evncopy + '/SYSCAL', nomodify=False) |
586 + | tsys = _tb.getcol('TSYS') |
587 + | tsys = np.where(tsys < 0, float('nan'), tsys) |
588 + | _tb.putcol('TSYS', tsys) |
589 + | _tb.close() |
590 + | |
591 + | gencal(vis=evncopy, caltable=caltab, caltype='tsys', uniform=False) |
592 + | |
593 + | self.assertTrue(os.path.exists(caltab)) |
594 + | self.assertTrue(th.compTables(caltab, evncal, ['FPARAM', 'WEIGHT'])) |
595 + | |
596 + | |
597 + | class TestJyPerK(unittest.TestCase): |
598 + | """Tests specifying antenna-based calibration values with external resource. |
599 + | |
600 + | The caltype jyperk is a type of amplitude correction or 'amp'. In the process |
601 + | of specifycal() executed within gencal(), the values loaded from a csv file |
602 + | with factors or obtained from the Jy/K Web API are given as the 'parameter' |
603 + | argument. |
604 + | |
605 + | Details are as follows. |
606 + | https://open-jira.nrao.edu/browse/CAS-12236 |
607 + | """ |
608 + | |
609 + | vis = 'uid___A002_X85c183_X36f.ms' |
610 + | jyperk_factor_csv = os.path.join(datapath, 'jyperk_factor.csv') |
611 + | |
612 + | @classmethod |
613 + | def setUpClass(cls): |
614 + | cls.casa_cwd_path = os.getcwd() |
615 + | |
616 + | cls.working_directory = TestJyPerK._generate_uniq_fuse_name_in_cwd( |
617 + | prefix='working_directory_for_jyperk_') |
618 + | os.mkdir(cls.working_directory) |
619 + | os.chdir(cls.working_directory) |
620 + | |
621 + | original_vis = os.path.join(datapath, f'{cls.vis}.sel') |
622 + | shutil.copytree(original_vis, cls.vis, symlinks=False) |
623 + | |
624 + | @classmethod |
625 + | def tearDownClass(cls): |
626 + | os.chdir(cls.casa_cwd_path) |
627 + | shutil.rmtree(cls.working_directory) |
628 + | |
629 + | def setUp(self): |
630 + | |
631 + | self.caltable = TestJyPerK._generate_uniq_fuse_name_in_cwd( |
632 + | prefix='generated_caltable_', suffix='.cal') |
633 + | |
634 + | def tearDown(self): |
635 + | if os.path.isdir(self.caltable): |
636 + | shutil.rmtree(self.caltable) |
637 + | |
638 + | @staticmethod |
639 + | def _generate_uniq_fuse_name_in_cwd(prefix='', suffix=''): |
640 + | while True: |
641 + | fuse_name = f'{prefix}{str(uuid.uuid4())}{suffix}' |
642 + | if not os.path.isdir(fuse_name): |
643 + | return fuse_name |
644 + | |
645 + | @classmethod |
646 + | @contextlib.contextmanager |
647 + | def _generate_jyperk_file_xxyy(cls, infile): |
648 + | with open(infile, 'r') as f: |
649 + | lines = map(lambda x: x.rstrip('\n'), f) |
650 + | header = next(lines) |
651 + | factors = list(filter( |
652 + | lambda x: x.startswith(cls.vis), |
653 + | lines |
654 + | )) |
655 + | |
656 + | with tempfile.NamedTemporaryFile() as f: |
657 + | |
658 + | f.write(f'{header}\n'.encode()) |
659 + | for line in factors: |
660 + | items = line.split(',') |
661 + | factor_org = float(items[-1]) |
662 + | |
663 + | |
664 + | for factor, pol in zip([factor_org, factor_org * 4], ['XX', 'YY']): |
665 + | _items = items[:-2] + [pol, str(factor)] |
666 + | f.write(f'{",".join(_items)}\n'.encode()) |
667 + | f.flush() |
668 + | yield f.name |
669 + | |
670 + | def _read_cparam_as_real(self, name): |
671 + | tb = table() |
672 + | tb.open(name) |
673 + | try: |
674 + | paramlist = tb.getcol('CPARAM').real |
675 + | finally: |
676 + | tb.close() |
677 + | return paramlist[0, 0], paramlist[1, 0] |
678 + | |
679 + | def _load_jyperkdb_responses(self, test_data): |
680 + | responses = {} |
681 + | with open(test_data) as f: |
682 + | reader = csv.reader(f, delimiter='\t') |
683 + | for row in reader: |
684 + | responses[row[0]] = row[1] |
685 + | return responses |
686 + | |
687 + | @patch('casatasks.private.jyperk.JyPerKDatabaseClient._try_to_get_response') |
688 + | def test_jyperk_gencal_for_web_api_error(self, mock_retrieve): |
689 + | """Test to check that the factors from the web API are applied to the caltable. |
690 + | |
691 + | The following arguments are required for this test. |
692 + | * caltype='jyperk' |
693 + | * endpoint='asdm' |
694 + | """ |
695 + | error_message = "expected error" |
696 + | |
697 + | def get_response(url): |
698 + | |
699 + | response = '{"success": false, "error": "%s"}' % (error_message) |
700 + | return response |
701 + | |
702 + | mock_retrieve.side_effect = get_response |
703 + | |
704 + | with self.assertRaisesRegex(RuntimeError, f'Failed to get Jy/K factors from DB: {error_message}'): |
705 + | gencal(vis=self.vis, |
706 + | caltable=self.caltable, |
707 + | caltype='jyperk', |
708 + | endpoint='asdm', |
709 + | uniform=False) |
710 + | |
711 + | self.assertTrue(mock_retrieve.called) |
712 + | |
713 + | @patch('casatasks.private.jyperk.JyPerKDatabaseClient._try_to_get_response') |
714 + | def test_jyperk_gencal_for_asdm_web_api(self, mock_retrieve): |
715 + | """Test to check that the factors from the web API are applied to the caltable. |
716 + | |
717 + | The following arguments are required for this test. |
718 + | * caltype='jyperk' |
719 + | * endpoint='asdm' |
720 + | """ |
721 + | def get_response(url): |
722 + | return responses[url] |
723 + | |
724 + | responses = self._load_jyperkdb_responses( |
725 + | os.path.join(datapath, 'jyperk_web_api_response/asdm.csv')) |
726 + | mock_retrieve.side_effect = get_response |
727 + | |
728 + | gencal(vis=self.vis, |
729 + | caltable=self.caltable, |
730 + | caltype='jyperk', |
731 + | endpoint='asdm', |
732 + | uniform=False) |
733 + | |
734 + | self.assertTrue(os.path.exists(self.caltable)) |
735 + | |
736 + | reference_caltable = os.path.join( |
737 + | datapath, 'jyperk_reference/web_api_with_asdm.cal') |
738 + | self.assertTrue(th.compTables(self.caltable, reference_caltable, ['WEIGHT'])) |
739 + | self.assertTrue(mock_retrieve.called) |
740 + | |
741 + | @patch('casatasks.private.jyperk.JyPerKDatabaseClient._try_to_get_response') |
742 + | def test_jyperk_gencal_for_model_fit_web_api(self, mock_retrieve): |
743 + | """Test to check that the factors from the web API are applied to the caltable. |
744 + | |
745 + | The following arguments are required for this test. |
746 + | * caltype='jyperk' |
747 + | * endpoint='model-fit' |
748 + | """ |
749 + | def get_response(url): |
750 + | return responses[url] |
751 + | |
752 + | responses = self._load_jyperkdb_responses( |
753 + | os.path.join(datapath, 'jyperk_web_api_response/model-fit.csv')) |
754 + | mock_retrieve.side_effect = get_response |
755 + | |
756 + | gencal(vis=self.vis, |
757 + | caltable=self.caltable, |
758 + | caltype='jyperk', |
759 + | endpoint='model-fit', |
760 + | uniform=False) |
761 + | |
762 + | self.assertTrue(os.path.exists(self.caltable)) |
763 + | |
764 + | reference_caltable = os.path.join( |
765 + | datapath, 'jyperk_reference/web_api_with_model_fit.cal') |
766 + | self.assertTrue(th.compTables(self.caltable, reference_caltable, ['WEIGHT'])) |
767 + | self.assertTrue(mock_retrieve.called) |
768 + | |
769 + | @patch('casatasks.private.jyperk.JyPerKDatabaseClient._try_to_get_response') |
770 + | def test_jyperk_gencal_for_interpolation_web_api(self, mock_retrieve): |
771 + | """Test to check that the factors from the web API are applied to the caltable. |
772 + | |
773 + | The following arguments are required for this test. |
774 + | * caltype='jyperk' |
775 + | * endpoint='interpolation' |
776 + | """ |
777 + | def get_response(url): |
778 + | return responses[url] |
779 + | |
780 + | responses = self._load_jyperkdb_responses( |
781 + | os.path.join(datapath, 'jyperk_web_api_response/interpolation.csv')) |
782 + | mock_retrieve.side_effect = get_response |
783 + | |
784 + | gencal(vis=self.vis, |
785 + | caltable=self.caltable, |
786 + | caltype='jyperk', |
787 + | endpoint='interpolation', |
788 + | uniform=False) |
789 + | |
790 + | self.assertTrue(os.path.exists(self.caltable)) |
791 + | |
792 + | reference_caltable = os.path.join( |
793 + | datapath, 'jyperk_reference/web_api_with_interpolation.cal') |
794 + | self.assertTrue(th.compTables(self.caltable, reference_caltable, ['WEIGHT'])) |
795 + | self.assertTrue(mock_retrieve.called) |
796 + | |
797 + | def test_jyperk_gencal_for_factor_file(self): |
798 + | """Test to check that the factors in the csv file are applied to the caltable. |
799 + | |
800 + | The following arguments are required for this test. |
801 + | * caltype='jyperk' |
802 + | * infile |
803 + | """ |
804 + | gencal(vis=self.vis, |
805 + | caltable=self.caltable, |
806 + | caltype='jyperk', |
807 + | infile=self.jyperk_factor_csv, |
808 + | uniform=False) |
809 + | |
810 + | self.assertTrue(os.path.exists(self.caltable)) |
811 + | |
812 + | reference_caltable = os.path.join( |
813 + | datapath, 'jyperk_reference/factor_file.cal') |
814 + | self.assertTrue(th.compTables(self.caltable, reference_caltable, ['WEIGHT'])) |
815 + | |
816 + | reference = \ |
817 + | np.array([1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1., |
818 + | 1.,1.,1.,1.,1.,1., 1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1., |
819 + | 0.13882191479206085,0.13882191479206085,0.13882191479206085, |
820 + | 1.,1.,1.,0.13728643953800201,0.13728643953800201,0.13728643953800201, |
821 + | 1.,1.,1.,0.13593915104866028,0.13593915104866028,0.13593915104866028, |
822 + | 1.,1.,1.,0.13782501220703125,0.13782501220703125,0.13782501220703125, |
823 + | 1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.]) |
824 + | |
825 + | p1, p2 = self._read_cparam_as_real(self.caltable) |
826 + | self.assertTrue(np.allclose(reference, p1)) |
827 + | self.assertTrue(np.allclose(reference, p2)) |
828 + | |
829 + | def test_jyperk_gencal_for_factor_file_xxyy(self): |
830 + | """Test to check that the factors in the csv file are applied to the caltable. |
831 + | |
832 + | The following arguments are required for this test. |
833 + | * caltype='jyperk' |
834 + | * infile |
835 + | """ |
836 + | with self._generate_jyperk_file_xxyy(self.jyperk_factor_csv) as temp_csv: |
837 + | |
838 + | |
839 + | |
840 + | |
841 + | gencal(vis=self.vis, |
842 + | caltable=self.caltable, |
843 + | caltype='jyperk', |
844 + | infile=temp_csv, |
845 + | uniform=False) |
846 + | |
847 + | self.assertTrue(os.path.exists(self.caltable)) |
848 + | |
849 + | reference_caltable = os.path.join( |
850 + | datapath, 'jyperk_reference/factor_file.cal') |
851 + | self.assertTrue(th.compTables(self.caltable, reference_caltable, ['WEIGHT', 'CPARAM'])) |
852 + | |
853 + | |
854 + | reference_xx = \ |
855 + | np.array([1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1., |
856 + | 1.,1.,1.,1.,1.,1., 1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1., |
857 + | 0.13882191479206085,0.13882191479206085,0.13882191479206085, |
858 + | 1.,1.,1.,0.13728643953800201,0.13728643953800201,0.13728643953800201, |
859 + | 1.,1.,1.,0.13593915104866028,0.13593915104866028,0.13593915104866028, |
860 + | 1.,1.,1.,0.13782501220703125,0.13782501220703125,0.13782501220703125, |
861 + | 1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.]) |
862 + | |
863 + | reference_yy = np.where( |
864 + | reference_xx < 1.0, reference_xx / 2, 1.0 |
865 + | ) |
866 + | |
867 + | p1, p2 = self._read_cparam_as_real(self.caltable) |
868 + | self.assertTrue(np.allclose(reference_xx, p1)) |
869 + | self.assertTrue(np.allclose(reference_yy, p2)) |
870 + | |
871 + | def test_not_vis_name_in_factor_csv(self): |
872 + | """Test to check a caltable does not been generated when there are not vis name in the factor csv file. |
873 + | """ |
874 + | vis = 'non-existent-observation.ms' |
875 + | if not os.path.isfile(vis): |
876 + | os.symlink(self.vis, vis) |
877 + | |
878 + | with self.assertRaises(Exception) as cm: |
879 + | gencal(vis=vis, |
880 + | caltable=self.caltable, |
881 + | caltype='jyperk', |
882 + | infile=self.jyperk_factor_csv, |
883 + | uniform=False) |
884 + | |
885 + | self.assertEqual(cm.exception.args[0], 'There is no factor.') |
886 + | |
887 + | def test_infile_is_incorrect_type(self): |
888 + | """Test to check for ejecting raise when infile is incorrect type.""" |
889 + | from casatasks.private.task_gencal import gencal as private_gencal |
890 + | |
891 + | with self.assertRaises(Exception) as cm: |
892 + | private_gencal(vis=self.vis, |
893 + | caltable=self.caltable, |
894 + | caltype='jyperk', |
895 + | infile=[self.jyperk_factor_csv], |
896 + | uniform=False) |
897 + | |
898 + | self.assertEqual(cm.exception.args[0], 'The infile argument should be str or None.') |
899 + | |
900 + | class gencal_eoptest(unittest.TestCase): |
901 + | |
902 + | usno_finals_erp = os.path.join(datapath, 'usno_finals.erp') |
903 + | eopc04_IAU2000 = os.path.join(datapath, 'eopc04_IAU2000.62-now') |
904 + | |
905 + | @classmethod |
906 + | def setUpClass(cls): |
907 + | shutil.copytree(os.path.join(datapath, evndata), evncopy) |
908 + | shutil.copytree(os.path.join(datapath, vlbadata), vlbacopy) |
909 + | |
910 + | def setUp(self): |
911 + | pass |
912 + | |
913 + | def tearDown(self): |
914 + | rmtables(caltab) |
915 + | |
916 + | @classmethod |
917 + | def tearDownClass(cls): |
918 + | shutil.rmtree(evncopy) |
919 + | shutil.rmtree(vlbacopy) |
920 + | |
921 + | def test_eop(self): |
922 + | """Test calibration table produced when gencal is run on an MS |
923 + | with an EARTH_ORIENTATION table.""" |
924 + | |
925 + | gencal(vis=vlbacopy, caltable=caltab, caltype='eop') |
926 + | |
927 + | self.assertTrue(os.path.exists(caltab)) |
928 + | |
929 + | |
930 + | reference = os.path.join(datapath, 'ba123a_casa.eop') |
931 + | self.assertTrue(th.compTables(caltab, reference, ['WEIGHT'], 0.002)) |
932 + | |
933 + | def test_eop_usno(self): |
934 + | """Test calibration table produced when gencal is run using an |
935 + | external file.""" |
936 + | |
937 + | gencal(vis=vlbacopy, caltable=caltab, caltype='eop', |
938 + | infile=self.usno_finals_erp) |
939 + | |
940 + | self.assertTrue(os.path.exists(caltab)) |
941 + | |
942 + | |
943 + | reference = os.path.join(datapath, 'ba123a_usno.eop') |
944 + | self.assertTrue(th.compTables(caltab, reference, ['WEIGHT'], 0.002)) |
945 + | |
946 + | def test_eop_iers(self): |
947 + | """Test calibration table produced when gencal is run using an |
948 + | external file.""" |
949 + | |
950 + | gencal(vis=vlbacopy, caltable=caltab, caltype='eop', |
951 + | infile=self.eopc04_IAU2000) |
952 + | |
953 + | self.assertTrue(os.path.exists(caltab)) |
954 + | |
955 + | |
956 + | reference = os.path.join(datapath, 'ba123a_iers.eop') |
957 + | self.assertTrue(th.compTables(caltab, reference, ['WEIGHT'], 0.002)) |
958 + | |
959 + | def test_noeop(self): |
960 + | """Test that no calibration table is produced when gencal is run on an |
961 + | MS without an EARTH_ORIENTATION table. |
962 + | |
963 + | """ |
964 + | |
965 + | try: |
966 + | gencal(vis=evncopy, caltable=caltab, caltype='eop') |
967 + | except: |
968 + | pass |
969 + | |
970 + | self.assertFalse(os.path.exists(caltab)) |
971 + | |
972 + | |
973 + | if __name__ == '__main__': |
974 + | unittest.main() |