#########################################################################
# test_task_gencal.py
# Copyright (C) 2018
# Associated Universities, Inc. Washington DC, USA.
#
# This script is free software; you can redistribute it and/or modify it
# under the terms of the GNU Library General Public License as published by
# the Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This library is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
# License for more details.
#
#
# Based on the requirements listed in casadocs found here:
# https://casadocs.readthedocs.io/en/stable/api/tt/casatasks.calibration.gencal.html
#
##########################################################################
import contextlib
import csv
import os
import shutil
import tempfile
import unittest
from unittest.mock import patch
import uuid

import numpy as np

from casatestutils import testhelper as th
from casatasks import gencal, rmtables
from casatasks.private import tec_maps
from casatools import ctsys, table

# Table tool shared by the tests in this module
_tb = table()

datapath = ctsys.resolve('/unittest/gencal/')

# input data
evndata = 'n08c1.ms'
vlbadata = 'ba123a.ms'
swpowdata = '3C286_syspower_CAS-11860.ms'

vlbacal = os.path.join(datapath, 'ba123a.gc')
evncal = os.path.join(datapath, 'n08c1.tsys')

caltab = 'cal.A'
evncopy = 'evn_copy.ms'
vlbacopy = 'vlba_copy.ms'
swpowcopy = 'swpow_copy.ms'

'''
Unit tests for gencal
'''
#
# ToDo:
# add more tests
# once more independent tests (e.g. comparison with
# the AIPS REWAY results) add reference mses
# and do tests against them
#

# Pick up alternative data directory to run tests on MMSs
testmms = False
if 'TEST_DATADIR' in os.environ:
    DATADIR = str(os.environ.get('TEST_DATADIR'))+'/gencal/'
    if os.path.isdir(DATADIR):
        testmms = True
        datapath = DATADIR
    else:
        print('WARN: directory '+DATADIR+' does not exist')

print('gencal tests will use data from ' + datapath)


def _read_column(tablename, colname):
    """Return column *colname* of table *tablename*.

    Uses the module-level table tool and closes it even when reading fails,
    so that a failing test does not leave the shared tool attached to a table.
    """
    _tb.open(tablename)
    try:
        return _tb.getcol(colname)
    finally:
        _tb.close()


class gencal_antpostest(unittest.TestCase):
    """Tests of gencal(caltype='antpos') on EVLA data (manual and automatic)."""

    # Input and output names
    msfile = 'tdem0003gencal.ms'
    # used for test_antpos_auto_evla_CAS13057
    msfile2 = 'auto_antposcorr_evla_gencal.ms'
    # if testmms:
    #     msfile = 'tdem0003gencal.mms'
    caltable = 'anpos.cal'
    reffile1 = os.path.join(datapath+'evla_reference/', 'anpos.manual.cal')
    reffile2 = os.path.join(datapath+'evla_reference/', 'anpos.auto.cal')
    reffile3 = os.path.join(datapath+'evla_reference/', 'anpos.autoCAS13057.cal')
    res = False

    def setUp(self):
        """Start every test from fresh copies of both input MSs."""
        if (os.path.exists(self.msfile)):
            shutil.rmtree(self.msfile)
        if (os.path.exists(self.msfile2)):
            shutil.rmtree(self.msfile2)

        shutil.copytree(os.path.join(datapath, self.msfile), self.msfile, symlinks=True)
        shutil.copytree(os.path.join(datapath, self.msfile2), self.msfile2, symlinks=True)

    def tearDown(self):
        """Remove the MS copies and the generated caltable."""
        if (os.path.exists(self.msfile)):
            shutil.rmtree(self.msfile)
        if (os.path.exists(self.msfile2)):
            shutil.rmtree(self.msfile2)

        shutil.rmtree(self.caltable, ignore_errors=True)

    def test_antpos_manual(self):
        """Test manual antenna position correction."""
        gencal(vis=self.msfile,
               caltable=self.caltable,
               caltype='antpos',
               antenna='ea12,ea22',
               parameter=[-0.0072, 0.0045, -0.0017, -0.0220, 0.0040, -0.0190])

        self.assertTrue(os.path.exists(self.caltable))

        # ToDo:check generated caltable. Wait for new caltable

        # Compare with reference file from the repository
        reference = self.reffile1
        self.assertTrue(th.compTables(self.caltable, reference, ['WEIGHT', 'OBSERVATION_ID']))

    def test_antpos_auto_evla(self):
        """Test automated antenna position correction."""
        # check if the URL is reachable
        from urllib.request import urlopen
        from urllib.error import URLError

        # current EVLA baseline correction URL
        evlabslncorrURL = "http://www.vla.nrao.edu/cgi-bin/evlais_blines.cgi?Year="
        try:
            # Only reachability matters here: close the response right away
            with urlopen(evlabslncorrURL+"2010", timeout=60.0):
                pass

            gencal(vis=self.msfile,
                   caltable=self.caltable,
                   caltype='antpos',
                   antenna='')

            self.assertTrue(os.path.exists(self.caltable))

            # ToDo: check for generated caltable

            # Compare with reference file from the repository
            reference = self.reffile2
            self.assertTrue(th.compTables(self.caltable, reference, ['WEIGHT', 'OBSERVATION_ID']))

        except URLError:
            self.res = True
            # Report the test as skipped instead of silently passing
            self.skipTest("Cannot access %s , skip this test" % evlabslncorrURL)

    def test_antpos_auto_evla_CAS13057(self):
        """
        gencal: test a bugfix of CAS-13057 for automated antenna position correction
        """
        # check if the URL is reachable
        from urllib.request import urlopen
        from urllib.error import URLError

        # current EVLA baseline correction URL
        evlabslncorrURL = "http://www.vla.nrao.edu/cgi-bin/evlais_blines.cgi?Year="
        try:
            # Only reachability matters here: close the response right away
            with urlopen(evlabslncorrURL+"2019", timeout=60.0):
                pass

            gencal(vis=self.msfile2,
                   caltable=self.caltable,
                   caltype='antpos',
                   antenna='')

            self.assertTrue(os.path.exists(self.caltable))

            # Compare with reference file from the repository
            # CAS-13940 - as the correction values are accumulated, running the test at later time
            # may cause the values to deviate from the time of reference caltable generation.
            # For this specific data set, with antenna 28 on the same pad for a long timespan such
            # situation can occur. So here do the comparison of the caltables skipping the correction
            reference = self.reffile3
            self.assertTrue(th.compTables(self.caltable, reference,
                                          ['WEIGHT', 'OBSERVATION_ID', 'FPARAM']))
            # now just compare antennas 23 and 25 entries for FPARAM...
            # row 21 (=ant 23) and 23 (= ant 25)
            curfparam = _read_column(self.caltable, 'FPARAM').transpose()
            reffparam = _read_column(reference, 'FPARAM').transpose()
            self.assertTrue((curfparam[21] == reffparam[21]).all())
            self.assertTrue((curfparam[23] == reffparam[23]).all())

        except URLError:
            self.res = True
            # Report the test as skipped instead of silently passing
            self.skipTest("Cannot access %s , skip this test" % evlabslncorrURL)

    def test_antpos_manual_time_limit_evla(self):
        """
        gencal: test if time limit sets cutoff date for antpos corrections
        """
        # Mechanical test if time limit functions as expected, very short limit
        gencal(vis=self.msfile2,
               caltable=self.caltable,
               caltype='antpos',
               ant_pos_time_limit=400)

        res = np.mean(_read_column(self.caltable, 'FPARAM'))
        # NOTE(review): atol=1e-5 is larger than the magnitude of the expected
        # value below, so this check also accepts a mean of 0 - confirm whether
        # a tighter tolerance is intended.
        self.assertTrue(np.isclose(res, -1.2345658040341035e-06, atol=1e-5))

        shutil.rmtree(self.caltable)

        # Test again with no time limit/ ant_pos_time_limit = 0
        gencal(vis=self.msfile2,
               caltable=self.caltable,
               caltype='antpos',
               ant_pos_time_limit=0)

        res = np.mean(_read_column(self.caltable, 'FPARAM'))
        self.assertTrue(np.isclose(res, -5.308641580703818e-05, atol=1e-5))


class test_gencal_antpos_alma(unittest.TestCase):
    """Tests the automatic generation of antenna position corrections for ALMA.

    New REST web service:
    https://bitbucket.sco.alma.cl/projects/ALMA/repos/almasw/browse/CONTROL-SERVICES/PositionsService

    Old SOAP web service:
    http://asa.alma.cl/axis2/services/TMCDBAntennaPadService?wsdl

    Example minimalistic use of a client to query the service:
      from suds.client import Client
      srv_wsdl_url = 'http://asa.alma.cl/axis2/services/TMCDBAntennaPadService?wsdl'
      ws_cli = Client(srv_wsdl_url)
      resp = ws_cli.service.getAntennaPositions("CURRENT.AOS", "DA49", "2017-01-30T01:53:54")
    """

    # setup of the ALMA TMC DB AntennaPadService
    ALMA_SRV_WSDL_URL = 'http://asa.alma.cl/axis2/services/TMCDBAntennaPadService?wsdl'

    # For this MS, there is position information for 25 out of the 29 antennas
    # (at 2013-11-15T10:26:19)
    ALMA_MS = 'uid___A002_X72c4aa_X8f5_scan21_spw18_field2_corrXX.ms'
    CAL_TYPE = 'antpos'
    REF_CALTABLE_MANUAL = os.path.join(datapath,
                                       'alma_reference/A002_X72c4aa_ref_ant_pos.manual.cal')
    REF_CALTABLE_AUTO = os.path.join(datapath,
                                     'alma_reference/A002_X72c4aa_ref_ant_pos.auto.cal')
    IGNORE_COLS = ['WEIGHT', 'OBSERVATION_ID']

    def setUp(self):
        """Start every test from a fresh copy of the ALMA MS."""
        if (os.path.exists(self.ALMA_MS)):
            shutil.rmtree(self.ALMA_MS)
        shutil.copytree(os.path.join(datapath, self.ALMA_MS),
                        self.ALMA_MS, symlinks=True)

    def tearDown(self):
        """Remove the copy of the ALMA MS."""
        if (os.path.exists(self.ALMA_MS)):
            shutil.rmtree(self.ALMA_MS)

    def remove_caltable(self, ct_name):
        """ Removes a cal table. ct_name: path to the caltable """
        shutil.rmtree(ct_name)

    def test_antpos_alma_manual(self):
        """
        gencal: manual antenna position correction on ALMA table
        """
        out_caltable = 'ant_pos_man.cal'
        # Registered up front so the table is removed even if an assertion fails
        self.addCleanup(shutil.rmtree, out_caltable, ignore_errors=True)

        gencal(vis=self.ALMA_MS,
               caltable=out_caltable,
               caltype=self.CAL_TYPE,
               antenna='DV07,DV10,DV11',
               parameter=[-0.0072, 0.0045, -0.0017, -0.0220, 0.0040, -0.0190])

        self.assertTrue(os.path.exists(out_caltable),
                        "The output cal table should have been created")

        # Compare against ref file
        self.assertTrue(th.compTables(out_caltable,
                                      self.REF_CALTABLE_MANUAL,
                                      self.IGNORE_COLS))

    @unittest.skip('SOAP AntennaPad Positions SOAP service needs to be removed once the '
                   'TMCDB based auto correction in gencal is validated.')
    def tmp_disabled_test_antpos_alma_server_SOAP_methods(self):
        """
        gencal: connection to alma TCM DB AntennaPadService for ALMA
        """
        # urllib2 only exists in Python 2; URLError lives in urllib.error now
        from urllib.error import URLError
        try:
            # this import doesn't work in CASA6 - test is being skipped so not important
            from suds.client import Client
            ws_cli = Client(self.ALMA_SRV_WSDL_URL)

            # Basic check that the schema has the minimum requirement
            method_name = 'getAntennaPositions'
            self.assertTrue(callable(getattr(ws_cli.service, method_name)),
                            'The client service should have this method: {}, and '
                            'it should be callable.'.format(method_name))
        except ImportError:
            print('Cannot import required dependencies to query the ALMA TCM DB web service')
            raise
        except URLError:
            print('Connection/network error while querying the ALMA TCM DB web service')
            raise

    @unittest.skip('SOAP AntennaPad Positions SOAP service needs to be removed once the '
                   'TMCDB based auto correction in gencal is validated.')
    def tmp_disabled_test_antpos_auto_alma_SOAP_empty_query(self):
        """
        gencal: empty query (empty antennas list) to the (old) SOAP TCMDB AntennaPadService
        web service (ALMA)
        """
        # urllib2 only exists in Python 2; URLError lives in urllib.error now
        from urllib.error import URLError
        try:
            import correct_ant_posns_alma as almacor

            resp = almacor.query_tmcdb_antennas_rest([], '2017-01-01T16:53:54.000')
            if resp:
                raise RuntimeError('Unexpected response for an empty query: {0}'.
                                   format(resp))
        except ImportError:
            print('Cannot import required dependencies to query the ALMA TCM DB web service')
            raise
        except URLError:
            print('Connection/network error while querying the ALMA TCM DB web service')
            raise

    @unittest.skip('SOAP AntennaPad Positions SOAP service needs to be removed once the '
                   'TMCDB based auto correction in gencal is validated.')
    def tmp_disabled_test_antpos_auto_web_srv_SOAP_alma(self):
        """
        gencal: auto gencal using data from TCM DB AntennaPadService (ALMA)
        """
        # urllib2 only exists in Python 2; URLError lives in urllib.error now
        from urllib.error import URLError

        out_caltable = 'ant_pos_web_srv.cal'
        try:
            # This will import the required libraries, urllib2, suds, etc.
            # Could also use additional parameters: antenna='', parameter=''
            gencal(vis=self.ALMA_MS, caltable=out_caltable, caltype=self.CAL_TYPE)
        except ImportError:
            print('Cannot import required dependencies to query the ALMA TCM DB web service')
            raise
        except URLError:
            print('Connection/network error while querying the ALMA TCM DB web service')
            raise

        self.assertTrue(os.path.exists(out_caltable),
                        "The output cal table should have been created: {0}".
                        format(out_caltable))

        # Compare against ref file
        self.assertTrue(th.compTables(out_caltable,
                                      self.REF_CALTABLE_AUTO,
                                      self.IGNORE_COLS))
        self.remove_caltable(out_caltable)

    @unittest.skip('REST Position service needs validation and final deployment')
    def tmp_disabled_test_antpos_auto_alma_REST_empty_query(self):
        """
        gencal: empty query (empty antennas list) to the (new) REST TCMDB Positions
        web service (ALMA)
        """
        # urllib2 only exists in Python 2; URLError lives in urllib.error now
        from urllib.error import URLError

        TEST_HOSTNAME = 'https://2018may.asa-test.alma.cl'
        hostname = TEST_HOSTNAME
        port = 443
        api = 'antenna-position/position/antenna'
        try:
            import requests
            import correct_ant_posns_alma as almacor

            tstamp = '2017-01-01T16:53:54.000'
            # query via correct_ant_posns function
            resp = almacor.query_tmcdb_antennas_rest([], tstamp)
            if resp:
                raise RuntimeError('Unexpected response for an empty query: {0}'.
                                   format(resp))

            # query directly via requests
            # ('&timestamp' had been mangled into a multiplication sign + 'tamp')
            url = '{}:{}/{}?antenna={}&timestamp={}'.format(hostname, port, api, '',
                                                            tstamp)
            resp = requests.get(url)
            if resp:
                raise RuntimeError('Unexpected response for an empty query: {0}'.
                                   format(resp))
        except ImportError:
            print('Cannot import required dependencies to query the ALMA TCM DB web service')
            raise
        except URLError:
            print('Connection/network error while querying the ALMA TCM DB web service')
            raise

    @unittest.skip('REST Position service needs validation and final deployment')
    def tmp_disabled_test_antpos_auto_web_srv_REST_alma(self):
        """
        gencal: auto gencal using data from TCMDB Positions service (ALMA)
        """
        # urllib2 only exists in Python 2; URLError lives in urllib.error now
        from urllib.error import URLError

        out_caltable = 'ant_pos_web_srv.cal'
        try:
            # This will import the required libraries, urllib2, suds, etc.
            # Could also use additional parameters: antenna='', parameter=''
            gencal(vis=self.ALMA_MS, caltable=out_caltable, caltype=self.CAL_TYPE)
        except URLError:
            print('Connection/network error while querying the ALMA TCMDB Positions web service')
            raise

        self.assertTrue(os.path.exists(out_caltable),
                        "The output cal table should have been created: {0}".
                        format(out_caltable))

        # Compare against ref file
        self.assertTrue(th.compTables(out_caltable,
                                      self.REF_CALTABLE_AUTO,
                                      self.IGNORE_COLS))
        self.remove_caltable(out_caltable)


class gencal_test_tec_vla(unittest.TestCase):
    """Tests of tec_maps and gencal(caltype='tecim') on a VLA MS."""

    # Input and output names
    msfile = 'tdem0003gencal.ms'
    igsfile = 'igsg1160.10i'
    tecfile = msfile+'.IGS_TEC.im'
    rmstecfile = msfile+'.IGS_RMS_TEC.im'
    caltable = msfile+'_tec.cal'
    newigsfile = 'IGS0OPSFIN_20233350000_01D_02H_GIM.INX'

    # NEAL: Please check that these setUp and tearDown functions are ok

    def setUp(self):
        """Clear leftovers of a previous run, then copy in the input MS."""
        self.tearDown()
        shutil.copytree(os.path.join(datapath, self.msfile), self.msfile, symlinks=True)

    def tearDown(self):
        """Remove the MS copy and every file/table the test may have produced."""
        if os.path.exists(self.msfile):
            shutil.rmtree(self.msfile)

        if os.path.exists(self.igsfile):
            os.remove(self.igsfile)

        shutil.rmtree(self.tecfile, ignore_errors=True)
        shutil.rmtree(self.rmstecfile, ignore_errors=True)
        shutil.rmtree(self.caltable, ignore_errors=True)

        # this file is created by a successful test
        if os.path.exists(self.newigsfile):
            os.remove(self.newigsfile)

    def test_tec_maps(self):
        """
        gencal: very basic test of tec_maps and gencal(caltype='tecim')
        """
        # NOTE: any exception (e.g. from an internet access failure) simply
        # propagates and fails the test; it is not handled specially.
        tec_maps.create0(self.msfile)
        gencal(vis=self.msfile, caltable=self.caltable, caltype='tecim',
               infile=self.msfile+'.IGS_TEC.im')

        self.assertTrue(os.path.exists(self.caltable))

        _tb.open(self.caltable)
        try:
            nrows = _tb.nrows()
            dtecu = abs(13.752-np.mean(_tb.getcol('FPARAM'))/1e16)
        finally:
            _tb.close()

        # print(str(nrows)+' '+str(dtecu))

        self.assertEqual(nrows, 1577)
        self.assertLess(dtecu, 1e-3)

        # Test new CDDIS filename convention
        #  (file with correct name is retrieved and uncompressed)
        #  (CAS-14219, CAS-14192)
        #  (tec_maps.create0 above tests the old filename convention)
        a = tec_maps.get_IGS_TEC('2023/12/01')
        self.assertTrue(os.path.exists(self.newigsfile))
        self.assertEqual(a[9], 'IGS_Final_Product')


class gencal_gaincurve_test(unittest.TestCase):
    """Tests of gencal(caltype='gc')."""

    @classmethod
    def setUpClass(cls):
        shutil.copytree(os.path.join(datapath, evndata), evncopy)
        shutil.copytree(os.path.join(datapath, vlbadata), vlbacopy)

    def setUp(self):
        pass

    def tearDown(self):
        rmtables(caltab)

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(evncopy)
        shutil.rmtree(vlbacopy)

    def test_gainCurve(self):
        ''' Test calibration table produced when gencal is run on an MS with a GAIN_CURVE table '''
        gencal(vis=vlbacopy, caltable=caltab, caltype='gc')

        self.assertTrue(os.path.exists(caltab))
        self.assertTrue(th.compTables(caltab, vlbacal, ['WEIGHT']))

    def test_noGainCurve(self):
        ''' Test that when gencal is run on an MS with no GAIN_CURVE table it creates no calibration table '''
        # gencal may or may not raise here; only the absence of the caltable is
        # checked.  Exception (rather than a bare except) keeps Ctrl-C working.
        with contextlib.suppress(Exception):
            gencal(vis=evncopy, caltable=caltab, caltype='gc')

        self.assertFalse(os.path.exists(caltab))


class gencal_tsys_test(unittest.TestCase):
    """Tests of gencal(caltype='tsys')."""

    @classmethod
    def setUpClass(cls):
        shutil.copytree(os.path.join(datapath, evndata), evncopy)
        shutil.copytree(os.path.join(datapath, vlbadata), vlbacopy)

    def setUp(self):
        pass

    def tearDown(self):
        rmtables(caltab)

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(evncopy)
        shutil.rmtree(vlbacopy)

    def test_tsys(self):
        ''' Test calibration table produced when gencal is run on an MS with a SYSCAL table'''
        gencal(vis=evncopy, caltable=caltab, caltype='tsys', uniform=False)

        self.assertTrue(os.path.exists(caltab))
        self.assertTrue(th.compTables(caltab, evncal, ['WEIGHT']))

    def test_tsys_nan(self):
        ''' Test calibration table produced when gencal is run on an MS with a SYSCAL table that contains NaNs'''
        # Change negative values in SYSCAL to NaNs.
        # This should result in the same calibration table entries
        # being flagged.
        # NOTE: this edits the MS copy made once in setUpClass, so the change
        # is visible to any test of this class that runs afterwards.
        _tb.open(evncopy + '/SYSCAL', nomodify=False)
        try:
            tsys = _tb.getcol('TSYS')
            tsys = np.where(tsys < 0, float('nan'), tsys)
            _tb.putcol('TSYS', tsys)
        finally:
            _tb.close()

        gencal(vis=evncopy, caltable=caltab, caltype='tsys', uniform=False)

        self.assertTrue(os.path.exists(caltab))
        self.assertTrue(th.compTables(caltab, evncal, ['FPARAM', 'WEIGHT']))


class TestJyPerK(unittest.TestCase):
    """Tests specifying antenna-based calibration values with external resource.

    The caltype jyperk is a type of amplitude correction or 'amp'. In the process
    of specifycal() executed within gencal(), the values loaded from a csv file
    with factors or obtained from the Jy/K Web API are given as the 'parameter'
    argument.

    Details are as follows.
    https://open-jira.nrao.edu/browse/CAS-12236
    """

    vis = 'uid___A002_X85c183_X36f.ms'
    jyperk_factor_csv = os.path.join(datapath, 'jyperk_factor.csv')

    # Expected real part of CPARAM when the factors of jyperk_factor.csv are
    # applied (shared by the two factor-file tests).
    _FACTOR_FILE_REFERENCE = \
        np.array([1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,
                  1.,1.,1.,1.,1.,1.,
                  1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,
                  0.13882191479206085,0.13882191479206085,0.13882191479206085,
                  1.,1.,1.,0.13728643953800201,0.13728643953800201,0.13728643953800201,
                  1.,1.,1.,0.13593915104866028,0.13593915104866028,0.13593915104866028,
                  1.,1.,1.,0.13782501220703125,0.13782501220703125,0.13782501220703125,
                  1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.,1.])

    @classmethod
    def setUpClass(cls):
        """Create a uniquely named working directory, enter it and copy the MS."""
        cls.casa_cwd_path = os.getcwd()
        cls.working_directory = TestJyPerK._generate_uniq_fuse_name_in_cwd(
            prefix='working_directory_for_jyperk_')
        os.mkdir(cls.working_directory)
        os.chdir(cls.working_directory)

        original_vis = os.path.join(datapath, f'{cls.vis}.sel')
        shutil.copytree(original_vis, cls.vis, symlinks=False)

    @classmethod
    def tearDownClass(cls):
        """Return to the original directory and delete the working directory."""
        os.chdir(cls.casa_cwd_path)
        shutil.rmtree(cls.working_directory)

    def setUp(self):
        # The caltable is generated by each gencal task.
        self.caltable = TestJyPerK._generate_uniq_fuse_name_in_cwd(
            prefix='generated_caltable_', suffix='.cal')

    def tearDown(self):
        if os.path.isdir(self.caltable):
            shutil.rmtree(self.caltable)

    @staticmethod
    def _generate_uniq_fuse_name_in_cwd(prefix='', suffix=''):
        """Return '<prefix><uuid4><suffix>' that is not an existing directory."""
        while True:
            fuse_name = f'{prefix}{str(uuid.uuid4())}{suffix}'
            if not os.path.isdir(fuse_name):
                return fuse_name

    @classmethod
    @contextlib.contextmanager
    def _generate_jyperk_file_xxyy(cls, infile):
        """Yield the name of a temporary factor file with per-polarization rows.

        Every row of *infile* that starts with the vis name is written twice:
        once for XX with the original factor and once for YY with 4 times the
        original factor. The temporary file is deleted when the context exits.
        """
        with open(infile, 'r') as f:
            lines = map(lambda x: x.rstrip('\n'), f)
            header = next(lines)
            factors = list(filter(
                lambda x: x.startswith(cls.vis),
                lines
            ))

        with tempfile.NamedTemporaryFile() as f:
            # editing file here
            f.write(f'{header}\n'.encode())
            for line in factors:
                items = line.split(',')
                factor_org = float(items[-1])
                # factor for XX is original value while
                # factor for YY is 4 times original value
                for factor, pol in zip([factor_org, factor_org * 4], ['XX', 'YY']):
                    _items = items[:-2] + [pol, str(factor)]
                    f.write(f'{",".join(_items)}\n'.encode())
            # make the content visible to readers that open the file by name
            f.flush()
            yield f.name

    def _read_cparam_as_real(self, name):
        """Return the real part of CPARAM[0, 0] and CPARAM[1, 0] of table *name*."""
        tb = table()
        tb.open(name)
        try:
            paramlist = tb.getcol('CPARAM').real
        finally:
            tb.close()
        return paramlist[0, 0], paramlist[1, 0]

    def _load_jyperkdb_responses(self, test_data):
        """Load a tab-separated file into a dict {column 0: column 1}."""
        responses = {}
        # newline='' is the documented way to open files for the csv module
        with open(test_data, newline='') as f:
            reader = csv.reader(f, delimiter='\t')
            for row in reader:
                responses[row[0]] = row[1]
        return responses

    def _check_web_api_endpoint(self, mock_retrieve, endpoint, response_file, reference_name):
        """Run gencal(caltype='jyperk') against canned web API responses.

        mock_retrieve: mock that replaces JyPerKDatabaseClient._try_to_get_response
        endpoint: value passed to gencal as 'endpoint'
        response_file: canned responses, relative to datapath
        reference_name: reference caltable, relative to datapath
        """
        responses = self._load_jyperkdb_responses(os.path.join(datapath, response_file))

        def get_response(url):
            return responses[url]

        mock_retrieve.side_effect = get_response

        gencal(vis=self.vis, caltable=self.caltable, caltype='jyperk',
               endpoint=endpoint, uniform=False)

        self.assertTrue(os.path.exists(self.caltable))

        reference_caltable = os.path.join(datapath, reference_name)
        self.assertTrue(th.compTables(self.caltable, reference_caltable, ['WEIGHT']))
        self.assertTrue(mock_retrieve.called)

    @patch('casatasks.private.jyperk.JyPerKDatabaseClient._try_to_get_response')
    def test_jyperk_gencal_for_web_api_error(self, mock_retrieve):
        """Test to check that a failed response from the web API raises RuntimeError.

        The following arguments are required for this test.
        * caltype='jyperk'
        * endpoint='asdm'
        """
        error_message = "expected error"

        def get_response(url):
            # return failed response
            response = '{"success": false, "error": "%s"}' % (error_message)
            return response

        mock_retrieve.side_effect = get_response

        with self.assertRaisesRegex(RuntimeError,
                                    f'Failed to get Jy/K factors from DB: {error_message}'):
            gencal(vis=self.vis, caltable=self.caltable, caltype='jyperk',
                   endpoint='asdm', uniform=False)

        self.assertTrue(mock_retrieve.called)

    @patch('casatasks.private.jyperk.JyPerKDatabaseClient._try_to_get_response')
    def test_jyperk_gencal_for_asdm_web_api(self, mock_retrieve):
        """Test to check that the factors from the web API are applied to the caltable.

        The following arguments are required for this test.
        * caltype='jyperk'
        * endpoint='asdm'
        """
        self._check_web_api_endpoint(mock_retrieve,
                                     'asdm',
                                     'jyperk_web_api_response/asdm.csv',
                                     'jyperk_reference/web_api_with_asdm.cal')

    @patch('casatasks.private.jyperk.JyPerKDatabaseClient._try_to_get_response')
    def test_jyperk_gencal_for_model_fit_web_api(self, mock_retrieve):
        """Test to check that the factors from the web API are applied to the caltable.

        The following arguments are required for this test.
        * caltype='jyperk'
        * endpoint='model-fit'
        """
        self._check_web_api_endpoint(mock_retrieve,
                                     'model-fit',
                                     'jyperk_web_api_response/model-fit.csv',
                                     'jyperk_reference/web_api_with_model_fit.cal')

    @patch('casatasks.private.jyperk.JyPerKDatabaseClient._try_to_get_response')
    def test_jyperk_gencal_for_interpolation_web_api(self, mock_retrieve):
        """Test to check that the factors from the web API are applied to the caltable.

        The following arguments are required for this test.
        * caltype='jyperk'
        * endpoint='interpolation'
        """
        self._check_web_api_endpoint(mock_retrieve,
                                     'interpolation',
                                     'jyperk_web_api_response/interpolation.csv',
                                     'jyperk_reference/web_api_with_interpolation.cal')

    def test_jyperk_gencal_for_factor_file(self):
        """Test to check that the factors in the csv file are applied to the caltable.

        The following arguments are required for this test.
        * caltype='jyperk'
        * infile
        """
        gencal(vis=self.vis, caltable=self.caltable, caltype='jyperk',
               infile=self.jyperk_factor_csv, uniform=False)

        self.assertTrue(os.path.exists(self.caltable))

        reference_caltable = os.path.join(
            datapath, 'jyperk_reference/factor_file.cal')
        self.assertTrue(th.compTables(self.caltable, reference_caltable, ['WEIGHT']))

        reference = self._FACTOR_FILE_REFERENCE

        p1, p2 = self._read_cparam_as_real(self.caltable)
        self.assertTrue(np.allclose(reference, p1))
        self.assertTrue(np.allclose(reference, p2))

    def test_jyperk_gencal_for_factor_file_xxyy(self):
        """Test to check that the factors in the csv file are applied to the caltable.

        The following arguments are required for this test.
        * caltype='jyperk'
        * infile
        """
        with self._generate_jyperk_file_xxyy(self.jyperk_factor_csv) as temp_csv:
            # temp_csv should contain pol-dependent Jy/K factors
            # factors for XX is same as original factors for I while
            # factors for YY is 4 times original factors so that
            # CPARAM value becomes half of reference value
            gencal(vis=self.vis, caltable=self.caltable, caltype='jyperk',
                   infile=temp_csv, uniform=False)

        self.assertTrue(os.path.exists(self.caltable))

        reference_caltable = os.path.join(
            datapath, 'jyperk_reference/factor_file.cal')
        self.assertTrue(th.compTables(self.caltable, reference_caltable, ['WEIGHT', 'CPARAM']))

        # reference_xx is same as "reference" in test_jyperk_gencal_for_factor_file
        reference_xx = self._FACTOR_FILE_REFERENCE
        # reference_yy is half of original reference value (except 1.0)
        reference_yy = np.where(
            reference_xx < 1.0,
            reference_xx / 2,
            1.0
        )

        p1, p2 = self._read_cparam_as_real(self.caltable)
        self.assertTrue(np.allclose(reference_xx, p1))
        self.assertTrue(np.allclose(reference_yy, p2))

    def test_not_vis_name_in_factor_csv(self):
        """Test to check a caltable is not generated when the vis name is not in the factor csv file.
        """
        vis = 'non-existent-observation.ms'
        # lexists also detects an already existing symlink to the MS directory,
        # for which isfile() is always False
        if not os.path.lexists(vis):
            os.symlink(self.vis, vis)

        with self.assertRaises(Exception) as cm:
            gencal(vis=vis, caltable=self.caltable, caltype='jyperk',
                   infile=self.jyperk_factor_csv, uniform=False)

        self.assertEqual(cm.exception.args[0], 'There is no factor.')

    def test_infile_is_incorrect_type(self):
        """Test to check that an exception is raised when infile is of incorrect type."""
        from casatasks.private.task_gencal import gencal as private_gencal

        with self.assertRaises(Exception) as cm:
            private_gencal(vis=self.vis, caltable=self.caltable, caltype='jyperk',
                           infile=[self.jyperk_factor_csv], uniform=False)

        self.assertEqual(cm.exception.args[0], 'The infile argument should be str or None.')


class TestSwPow(unittest.TestCase):
    """Tests of gencal(caltype='swpwts')."""

    testcal = 'swpow.cal'

    def setUp(self):
        shutil.copytree(os.path.join(datapath, swpowdata), swpowcopy)

    def tearDown(self):
        if os.path.exists(swpowcopy):
            shutil.rmtree(swpowcopy)
        if os.path.exists(self.testcal):
            shutil.rmtree(self.testcal)

    def test_switched_power_weights_caltype(self):
        """Check that resulting caltable has all 1's for gains and non-trivial values for weight adjustment

        The following arguments are required for this test.
        * caltype='swpwts'
        """
        gencal(vis=swpowcopy, caltable=self.testcal, caltype='swpwts')

        res = _read_column(self.testcal, 'FPARAM')

        #self.assertTrue(np.all(res[0:1,:,:] == 1))
        self.assertNotEqual(np.mean(res[1, :, :]), 1)


class gencal_eoptest(unittest.TestCase):
    """Tests of gencal(caltype='eop'), with and without an external EOP file."""

    usno_finals_erp = os.path.join(datapath, 'usno_finals.erp')
    eopc04_IAU2000 = os.path.join(datapath, 'eopc04_IAU2000.62-now')

    @classmethod
    def setUpClass(cls):
        shutil.copytree(os.path.join(datapath, evndata), evncopy)
        shutil.copytree(os.path.join(datapath, vlbadata), vlbacopy)

    def setUp(self):
        pass

    def tearDown(self):
        rmtables(caltab)

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(evncopy)
        shutil.rmtree(vlbacopy)

    def test_eop(self):
        """Test calibration table produced when gencal is run on an MS with an EARTH_ORIENTATION table."""
        gencal(vis=vlbacopy, caltable=caltab, caltype='eop')

        self.assertTrue(os.path.exists(caltab))

        # Compare with reference file from the repository
        reference = os.path.join(datapath, 'ba123a_casa.eop')
        self.assertTrue(th.compTables(caltab, reference, ['WEIGHT'], 0.002))

    def test_eop_usno(self):
        """Test calibration table produced when gencal is run using an external file."""
        gencal(vis=vlbacopy, caltable=caltab, caltype='eop', infile=self.usno_finals_erp)

        self.assertTrue(os.path.exists(caltab))

        # Compare with reference file from the repository
        reference = os.path.join(datapath, 'ba123a_usno.eop')
        self.assertTrue(th.compTables(caltab, reference, ['WEIGHT'], 0.002))

    def test_eop_iers(self):
        """Test calibration table produced when gencal is run using an external file."""
        gencal(vis=vlbacopy, caltable=caltab, caltype='eop', infile=self.eopc04_IAU2000)

        self.assertTrue(os.path.exists(caltab))

        # Compare with reference file from the repository
        reference = os.path.join(datapath, 'ba123a_iers.eop')
        self.assertTrue(th.compTables(caltab, reference, ['WEIGHT'], 0.002))

    def test_noeop(self):
        """Test that no calibration table is produced when gencal is run on an MS
        without an EARTH_ORIENTATION table.
        """
        # gencal may or may not raise here; only the absence of the caltable is
        # checked.  Exception (rather than a bare except) keeps Ctrl-C working.
        with contextlib.suppress(Exception):
            gencal(vis=evncopy, caltable=caltab, caltype='eop')

        self.assertFalse(os.path.exists(caltab))


if __name__ == '__main__':
    unittest.main()