#########################################################################
# test_task_flagcmd.py
# Copyright (C) 2018
# Associated Universities, Inc. Washington DC, USA.
#
# This script is free software; you can redistribute it and/or modify it
# under the terms of the GNU Library General Public License as published by
# the Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This library is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
# License for more details.
#
#
# Based on the requirements listed in casadocs found here:
# https://casadocs.readthedocs.io/en/stable/api/tt/casatasks.flagging.flagcmd.html
#
##########################################################################
import shutil
import unittest
import os
import filecmp

from casatasks import flagcmd, flagdata, flagmanager
from casatools import ctsys, agentflagger, table
from collections import OrderedDict

# Local copy of the agentflagger tool
aflocal = agentflagger()

# Path for data
datapath = ctsys.resolve("unittest/flagcmd/")

#
# Test of flagcmd task. It uses flagdata to unflag and summary
#


def _remove_path(path, recursive=True):
    '''Best-effort removal of a file or (when recursive) a directory tree.

    Replaces shelling out to ``rm -f`` / ``rm -rf``.  A missing path or a
    failed removal is ignored, so cleanup code never raises.
    '''
    if recursive and os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path, ignore_errors=True)
        return
    try:
        os.remove(path)
    except OSError:
        # Deliberately best-effort, as the shell command it replaces was.
        pass


def check_eq(result, total, flagged):
    '''Assert that a summary dictionary reports the expected counts.

    result  -- dictionary with 'total' and 'flagged' entries
    total   -- expected value of result['total']
    flagged -- expected value of result['flagged']

    Raises AssertionError when either count differs.
    '''
    print("%s of %s data was flagged, expected %s of %s" %
          (result['flagged'], result['total'], flagged, total))
    assert result['total'] == total, \
        "%s data in total; %s expected" % (result['total'], total)
    assert result['flagged'] == flagged, \
        "%s flags set; %s expected" % (result['flagged'], flagged)


def create_input(str_text, fname=''):
    '''Save the string in a text file and return the name of that file.

    str_text -- text to write
    fname    -- output file name; an empty name means 'flagcmd.txt'
    '''
    inp = fname or 'flagcmd.txt'

    # Remove any previous file first so a fresh one is always created
    _remove_path(inp, recursive=False)

    # The with-statement closes the file; no explicit close() is needed
    with open(inp, 'w') as f:
        f.write(str_text)

    return inp


def create_input1(str_text, filename):
    '''Save the string in the given text file (returns nothing).'''
    # Remove any previous file first so a fresh one is always created
    _remove_path(filename, recursive=False)

    with open(filename, 'w') as f:
        f.write(str_text)

    return


# Base class which defines setUp functions
# for importing different data sets
class test_base(unittest.TestCase):

    def _setup_dataset(self, vis, kind='MS', announce_copy=True):
        '''Make a local, unflagged copy of a data set available as self.vis.

        vis           -- name of the data set below datapath
        kind          -- word used in the "already around" message
        announce_copy -- print "Moving data..." before copying
        '''
        self.vis = vis
        if os.path.exists(self.vis):
            print("The %s is already around, just unflag" % kind)
        else:
            if announce_copy:
                print("Moving data...")
            shutil.copytree(os.path.join(datapath, self.vis), self.vis)

        # NOTE(review): the collapsed source does not show whether this cleanup
        # ran only after a fresh copy; it is done unconditionally here - confirm.
        _remove_path(self.vis + '.flagversions')
        self.unflag_ms()

    def setUp_ngc5921(self):
        self._setup_dataset("ngc5921.ms", announce_copy=False)

    def setUp_multi(self):
        self._setup_dataset("multiobs.ms")

    def setUp_alma_ms(self):
        '''ALMA MS, scan=1,8,10 spw=0~3 4,128,128,1 chans, I,XX,YY'''
        self._setup_dataset("uid___A002_X30a93d_X43e_small.ms")

    def setUp_evla(self):
        self._setup_dataset("tosr0001_scan3_noonline.ms")

    def setUp_shadowdata(self):
        self._setup_dataset("shadowtest_part.ms")

    def setUp_data4rflag(self):
        self._setup_dataset("Four_ants_3C286.ms")

    def setUp_bpass_case(self):
        self._setup_dataset("cal.fewscans.bpass", kind='CalTable')

    def unflag_ms(self):
        '''Run the unflag agent over the whole of self.vis.'''
        aflocal.open(self.vis)
        aflocal.selectdata()
        agentUnflag = {'apply': True, 'mode': 'unflag'}
        aflocal.parseagentparameters(agentUnflag)
        aflocal.init()
        aflocal.run(writeflags=True)
        aflocal.done()


class test_manual(test_base):
    '''Test manual selections'''

    def setUp(self):
        self.setUp_multi()
        self.filename = ''

    def tearDown(self) -> None:
        _remove_path(self.filename)

    @classmethod
    def tearDownClass(cls) -> None:
        shutil.rmtree('multiobs.ms', ignore_errors=True)
        shutil.rmtree('ngc5921.ms', ignore_errors=True)
        shutil.rmtree('ngc5921.ms.flagversions', ignore_errors=True)

    def test_observation(self):
        myinput = "observation='1'"
        self.filename = create_input(myinput)

        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=False, flagbackup=False)
        check_eq(flagdata(vis=self.vis, mode='summary'), 2882778, 28500)

    def test_compatibility(self):
        myinput = "observation='1' mode='manualflag'"
        self.filename = create_input(myinput)

        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=False, flagbackup=False)
        check_eq(flagdata(vis=self.vis, mode='summary'), 2882778, 28500)

    def test_autocorr(self):
        '''flagcmd: autocorr=True'''
        self.setUp_ngc5921()
        flagcmd(vis=self.vis, inpmode='list', inpfile=['autocorr=True'],
                action='apply')
        res = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(res['flagged'], 203994,
                         'Should flag only the auto-correlations')


class test_alma(test_base):
    # Test various selections for alma data

    def setUp(self):
        self.setUp_alma_ms()
        self.filename = ''

    def tearDown(self) -> None:
        _remove_path(self.filename)

    @classmethod
    def tearDownClass(cls) -> None:
        shutil.rmtree('uid___A002_X30a93d_X43e_small.ms', ignore_errors=True)

    def test_intent(self):
        '''flagcmd: test scan intent selection'''
        myinput = "intent='CAL*POINT*'\n"\
                  "#scan=3,4"
        self.filename = create_input(myinput)

        # flag POINTING CALIBRATION scans and ignore comment line
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=False, flagbackup=False)
        res = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(res['scan']['1']['flagged'], 192416.0)

    def test_cmd(self):
        '''flagcmd: inpmode=list with empty parameter'''
        # Test the correct parsing with empty parameter such as antenna=''
        flagcmd(vis=self.vis, inpmode='list',
                inpfile=["intent='CAL*POINT*' field=''",
                         "scan='1,8' antenna=''",
                         "scan='10'"],
                action='apply', savepars=False, flagbackup=False)
        res = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(res['scan']['1']['flagged'], 192416)
        self.assertEqual(res['scan']['8']['flagged'], 39096)
        self.assertEqual(res['scan']['10']['flagged'], 39096)
        self.assertEqual(res['flagged'], 270608)

    def test_extract(self):
        '''flagcmd: action = extract and apply clip on WVR'''
        # Remove any cmd from table
        flagcmd(vis=self.vis, action='clear', clearall=True)

        # Save cmd to FLAG_CMD
        cmd = "mode='clip' clipminmax=[0,50] correlation='ABS_WVR'"
        flagcmd(vis=self.vis, inpmode='list', inpfile=[cmd], action='list',
                savepars=True)

        # Extract it
        res = flagcmd(vis=self.vis, action='extract', useapplied=True)

        # Apply cmd to clip only WVR
        flagcmd(vis=self.vis, inpmode='list', inpfile=[res[0]['command']],
                savepars=False, action='apply', flagbackup=False)
        ret = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(ret['flagged'], 498)
        self.assertEqual(ret['correlation']['I']['flagged'], 498)
        self.assertEqual(ret['correlation']['XX']['flagged'], 0)
        self.assertEqual(ret['correlation']['YY']['flagged'], 0)


class test_unapply(test_base):
    # Action unapply

    def setUp(self):
        self.setUp_ngc5921()
        self.filename = ''

    def tearDown(self) -> None:
        _remove_path(self.filename)

    @classmethod
    def tearDownClass(cls) -> None:
        shutil.rmtree('ngc5921.ms', ignore_errors=True)
        shutil.rmtree('ngc5921.ms.flagversions', ignore_errors=True)

    def _scan4_flag_row(self):
        '''Return the FLAG_ROW column of the rows with SCAN_NUMBER 4.'''
        mytb = table()
        mytb.open(self.vis)
        selectedtb = mytb.query('SCAN_NUMBER in [4]')
        flag_row = selectedtb.getcol('FLAG_ROW')
        mytb.close()
        selectedtb.close()
        return flag_row

    def _check_unapply_unsets_flagrow(self, agent_cmd):
        '''Flag scan 4 manually, apply agent_cmd on top, unapply it and
        check that FLAG_ROW ends up completely unset.

        agent_cmd -- flag command text that is saved to FLAG_CMD and later
                     unapplied
        '''
        # Remove any cmd from table
        flagcmd(vis=self.vis, action='clear', clearall=True)

        # Flag using manual agent (not saved to FLAG_CMD)
        self.filename = create_input("scan='4'")
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=False)

        # Check FLAG_ROW is all set to true
        flag_row = self._scan4_flag_row()
        self.assertEqual(flag_row.sum(), flag_row.size)

        # Flag using the agent under test, from file
        self.filename = create_input(agent_cmd)
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=True, flagbackup=False)

        # Check FLAG_ROW is still all set to true
        flag_row = self._scan4_flag_row()
        self.assertEqual(flag_row.sum(), flag_row.size)

        # Unapply only the saved agent line
        flagcmd(vis=self.vis, action='unapply', useapplied=True, tablerows=0,
                savepars=False)

        # Check FLAG_ROW is now all set to false
        flag_row = self._scan4_flag_row()
        self.assertEqual(flag_row.sum(), 0)

    def test_unsupported_unapply(self):
        '''flagcmd: raise exception from inpmode=list and unapply'''
        try:
            result = flagcmd(vis=self.vis, action='unapply', inpmode='list',
                             inpfile=["spw='0' reason='MANUAL'"])
        except ValueError:
            # CASA6 throws an exception
            print('Expected error!')
        else:
            # CASA5 returns False instead of raising
            self.assertFalse(result)

    def test_utfcrop(self):
        '''flagcmd: unapply tfcrop agent'''
        # Remove any cmd from table
        flagcmd(vis=self.vis, action='clear', clearall=True)

        # Flag using manual agent
        myinput = "scan='1'"
        self.filename = create_input(myinput)
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=True)

        # Flag using tfcrop agent from file
        # Note : For this test, scan=4 gives identical flags on 32/64 bit machines,
        #        and one flag difference on a Mac (32)
        #        Other scans give differences at the 0.005% level.
        myinput = "scan='4' mode=tfcrop correlation='ABS_RR' extendflags=False"
        self.filename = create_input(myinput)
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=True, flagbackup=False)
        res = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(res['scan']['1']['flagged'], 568134,
                         'Whole scan=1 should be flagged')
        self.assertTrue(1200 <= res['scan']['4']['flagged'] <= 1204,
                        'scan=4 should be partially flagged')

        # Unapply only the tfcrop line
        flagcmd(vis=self.vis, action='unapply', useapplied=True, tablerows=1,
                savepars=False)
        result = flagdata(vis=self.vis, mode='summary', scan='4')
        self.assertEqual(result['flagged'], 0,
                         'Expected 0 flags, found %s' % result['flagged'])
        self.assertEqual(result['total'], 95256,
                         'Expected total 95256, found %s' % result['total'])

    def test_unapply_tfcrop_and_unset_flagrow(self):
        '''flagcmd: Check that FLAG_ROW is unset after un-applying an tfcrop agent'''
        self._check_unapply_unsets_flagrow(
            "scan='4' mode=tfcrop correlation='ABS_RR' extendflags=False")

    def test_unapply_rflag_and_unset_flagrow(self):
        '''flagcmd: Check that FLAG_ROW is unset after un-applying an rflag agent'''
        self._check_unapply_unsets_flagrow("scan='4' mode=rflag ")

    def test_unapply_clip_and_unset_flagrow(self):
        '''flagcmd: Check that FLAG_ROW is unset after un-applying a clip agent'''
        self._check_unapply_unsets_flagrow("scan='4' mode=clip ")

    def test_unapply_antint(self):
        '''flagcmd: unapply antint agent'''
        # Remove any cmd from table
        flagcmd(vis=self.vis, action='clear', clearall=True)

        ant_name = 'VA01'

        # Flag with antint
        # NOTE(review): the command text carries no explicit mode= entry -
        # confirm that this is intended.
        in_cmd = "antint_ref_antenna={0} spw='0' minchanfrac=0.8".format(ant_name)
        self.filename = create_input(in_cmd)
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=True, flagbackup=False)
        res = flagdata(vis=self.vis, mode='summary')
        antint_flagged = res['antenna'][ant_name]['flagged']

        # Flag using manual agent
        in_manual = "scan='1'"
        self.filename = create_input(in_manual)
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=True, flagbackup=False)
        result = flagdata(vis=self.vis, mode='summary')
        man_flagged = result['antenna'][ant_name]['flagged']

        # Unapply only the antint line
        flagcmd(vis=self.vis, action='unapply', useapplied=True, tablerows=0,
                savepars=True)
        unapplied_res = flagdata(vis=self.vis, mode='summary')
        unapplied_flagged = unapplied_res['antenna'][ant_name]['flagged']

        self.assertEqual(unapplied_res['total'], 2854278)
        self.assertEqual(unapplied_res['flagged'], 0)
        self.assertEqual(man_flagged, 203994)
        self.assertEqual(antint_flagged, 203994)
        self.assertEqual(unapplied_flagged, 0)
        self.assertEqual(unapplied_res['antenna'][ant_name]['flagged'], 0)

    def test_uquack(self):
        '''flagcmd: unapply quack agent'''
        # Remove any cmd from table
        flagcmd(vis=self.vis, action='clear', clearall=True)

        # Flag using the quack agent
        myinput = "scan='1~3' mode=quack quackinterval=1.0"
        self.filename = create_input(myinput)
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=True, flagbackup=False)
        result = flagdata(vis=self.vis, mode='summary')
        quack_flags = result['scan']['1']['flagged']

        # Flag using manual agent
        myinput = "scan='1'"
        self.filename = create_input(myinput)
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=True, flagbackup=False)
        result = flagdata(vis=self.vis, mode='summary')
        scan1_flags = result['scan']['1']['flagged']

        # Unapply only the quack line
        flagcmd(vis=self.vis, action='unapply', useapplied=True, tablerows=0,
                savepars=True)
        result = flagdata(vis=self.vis, mode='summary')
        manual_flags = result['scan']['1']['flagged']

        # Only the manual flags should be there
        # CAS-5377. New unapply action
        self.assertEqual(result['scan']['3']['flagged'], 0)
        self.assertEqual(manual_flags, scan1_flags - quack_flags)

    def test_umanualflag(self):
        '''flagcmd: unapply manual agent'''
        # Remove any cmd from table
        flagcmd(vis=self.vis, action='clear', clearall=True)

        # Flag using manual agent
        myinput = "scan='1'"
        self.filename = create_input(myinput)
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=True, flagbackup=False)

        # Flag using the quack agent
        myinput = "scan='1~3' mode=quack quackinterval=1.0"
        self.filename = create_input(myinput)
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=True, flagbackup=False)

        # Unapply the manual line
        flagcmd(vis=self.vis, action='unapply', useapplied=True, tablerows=0,
                savepars=False)
        result = flagdata(vis=self.vis, mode='summary', scan='1,2,3')

        # scan 1 should be fully unflagged
        self.assertEqual(result['scan']['1']['flagged'], 0)
        self.assertEqual(result['scan']['2']['flagged'], 47628)
        self.assertEqual(result['scan']['3']['flagged'], 47628)
        self.assertEqual(result['flagged'], 47628 + 47628)

    def test_uscans(self):
        '''flagcmd: Unapply only APPLIED=True'''
        # Remove any cmd from table
        flagcmd(vis=self.vis, action='clear', clearall=True)

        # Flag several scans and save them to FLAG_CMD with APPLIED=True
        for scan in ('7', '1', '2', '3', '4'):
            flagdata(vis=self.vis, scan=scan, savepars=True, flagbackup=False)

        # There should be 5 cmds in FLAG_CMD. Unapply row=1 and set APPLIED to False
        flagcmd(vis=self.vis, action='unapply', tablerows=1, useapplied=True)

        # Unapply scans 2 and 3 only. It should not re-apply scan=1 (row 1)
        flagcmd(vis=self.vis, action='unapply', tablerows=[2, 3], useapplied=True)

        # We should have left only scans 4 and 7 flagged.
        res = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(res['scan']['1']['flagged'], 0,
                         "It should not re-apply tablerows=1")
        self.assertEqual(res['scan']['4']['flagged'], 95256,
                         "It should not unapply tablerows=4")
        self.assertEqual(res['scan']['7']['flagged'], 190512,
                         "It should not unapply tablerows=7")
        self.assertEqual(res['flagged'], 285768)


class test_savepars(test_base):
    # Action apply, with savepars=True/False

    def setUp(self):
        self.setUp_ngc5921()
        self.filename = ''

    def tearDown(self) -> None:
        _remove_path(self.filename)

    @classmethod
    def tearDownClass(cls) -> None:
        shutil.rmtree('ngc5921.ms', ignore_errors=True)
        shutil.rmtree('ngc5921.ms.flagversions', ignore_errors=True)
        for leftover in ('myflags.txt', 'newfile.txt', 'filename1.txt'):
            _remove_path(leftover)

    def test_list1(self):
        '''flagcmd: list and savepars=True/False'''
        # Remove any cmd from table
        flagcmd(vis=self.vis, action='clear', clearall=True)

        ########## TEST 1
        # create text file called flagcmd.txt
        myinput = "scan='4' mode='clip' correlation='ABS_RR' clipminmax=[0, 4]\n"
        self.filename = create_input(myinput)
        filename1 = 'filename1.txt'
        shutil.copyfile(self.filename, filename1)

        # save command to MS
        flagcmd(vis=self.vis, action='list', inpmode='list', inpfile=[myinput],
                savepars=True)

        # list/save to a file
        _remove_path('myflags.txt')
        flagcmd(vis=self.vis, action='list', outfile='myflags.txt', savepars=True)

        # compare saved file with original input file
        self.assertTrue(filecmp.cmp(filename1, 'myflags.txt', 1),
                        'Files should be equal')

        ########## TEST 2
        # create another input
        myinput = "scan='1~3' mode='manual'\n"
        self.filename = create_input(myinput)

        # apply and don't save to MS
        flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename,
                action='apply', savepars=False, flagbackup=False)

        # list and check that parameters were not saved to MS
        _remove_path('myflags.txt')
        flagcmd(vis=self.vis, action='list', outfile='myflags.txt', savepars=True)
        self.assertFalse(filecmp.cmp(self.filename, 'myflags.txt', 1),
                         'Files should not be equal')

        ########### TEST 3
        # apply cmd from TEST 1 and update APPLIED column
        flagcmd(vis=self.vis, action='apply', savepars=False, flagbackup=False)

        # scans=1~3 should be fully flagged
        res = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(res['scan']['1']['flagged'], 568134)
        self.assertEqual(res['scan']['1']['total'], 568134)
        self.assertEqual(res['scan']['2']['flagged'], 238140)
        self.assertEqual(res['scan']['2']['total'], 238140)
        self.assertEqual(res['scan']['3']['flagged'], 762048)
        self.assertEqual(res['scan']['3']['total'], 762048)
        # scan 4 should be partially flagged by clip mode
        self.assertEqual(res['scan']['4']['flagged'], 3348,
                         'Only RR should be flagged')
        self.assertEqual(res['scan']['4']['total'], 95256)

        # Only cmd from TEST 1 should be in MS
        _remove_path('myflags.txt')
        flagcmd(vis=self.vis, action='list', outfile='myflags.txt',
                useapplied=True, savepars=True)
        self.assertTrue(filecmp.cmp(filename1, 'myflags.txt', 1),
                        'Files should be equal')

    def test_overwrite_true(self):
        '''flagcmd: Use savepars and overwrite=True'''
        # Create flag commands in file called flagcmd.txt
        myinput = "scan='4' mode='clip' correlation='ABS_RR' clipminmax=[0, 4]\n"
        self.filename = create_input(myinput)

        # Copy it to a new file
        newfile = 'newfile.txt'
        _remove_path(newfile)
        shutil.copyfile(self.filename, newfile)

        # Create different flag command
        myinput = "scan='1'\n"
        self.filename = create_input(myinput)

        # Apply flags from filename and try to save in newfile
        # Overwrite parameter should allow this
        flagcmd(vis=self.vis, action='apply', inpmode='list',
                inpfile=self.filename, savepars=True, outfile=newfile,
                flagbackup=False)

        # newfile should contain what was in filename
        self.assertTrue(filecmp.cmp(self.filename, newfile, 1),
                        'Files should be equal')

    def test_overwrite_false(self):
        '''flagcmd: Use savepars and overwrite=False'''
        # Create flag commands in file called flagcmd.txt
        myinput = "scan='4' mode='clip' correlation='ABS_RR' clipminmax=[0, 4]\n"
        self.filename = create_input(myinput)

        # Copy it to a new file
        newfile = 'newfile.txt'
        _remove_path(newfile)
        shutil.copyfile(self.filename, newfile)

        # Create different flag command
        myinput = "scan='1'\n"
        self.filename = create_input(myinput)

        try:
            # Apply flags from filename and try to save in newfile
            # Overwrite parameter at False should reject this
            flagcmd(vis=self.vis, action='apply', inpmode='list',
                    inpfile=self.filename, savepars=True, outfile=newfile,
                    flagbackup=False, overwrite=False)
        except Exception:
            # CASA 6 throws an exception
            print('Expected error!')
        else:
            # CASA5 gets here, that's OK.  The comparison is made outside the
            # try body so that a failing assertion is reported rather than
            # being mistaken for the expected error.
            self.assertFalse(filecmp.cmp(self.filename, newfile, 1),
                             'Files should be different')

    def test_overwrite_false1(self):
        '''flagcmd: Use savepars and overwrite=False'''
        # Remove any cmd from table
        flagcmd(vis=self.vis, action='clear', clearall=True)

        # Create flag commands in file called flagcmd.txt
        myinput = "scan='4' mode='clip' correlation='ABS_RR' clipminmax=[0, 4]\n"
        self.filename = create_input(myinput)

        newfile = 'myflags.txt'
        _remove_path(newfile)

        # Apply flags from filename and try to save in newfile
        # Overwrite=False shouldn't do anything as newfile doesn't exist
        flagcmd(vis=self.vis, action='apply', inpmode='list',
                inpfile=self.filename, savepars=True, outfile=newfile,
                flagbackup=False, overwrite=False)

        # newfile should contain what was in filename
        self.assertTrue(filecmp.cmp(self.filename, newfile, 1),
                        'Files should be the same')


class test_XML(test_base):

    def setUp(self):
        self.setUp_evla()
        self.filename = ''
        self.filename1 = ''

    # During implementation of CAS-13049, test_xml2 would fail without a tearDown
    def tearDown(self):
        os.system('rm -rf tosr0001_scan3*.ms*')
        # Drop whichever output files the test created
        for produced in (self.filename, self.filename1):
            if os.path.exists(produced):
                os.system('rm -rf ' + produced)

    def test_xml1(self):
        '''flagcmd: list xml file and save in outfile'''
        # The MS only contains clip and shadow commands
        # The XML contain the online flags
        self.filename = 'origxml.txt'
        flagcmd(vis=self.vis, action='list', inpmode='xml', savepars=True,
                outfile=self.filename)

        # Now save the online flags to the FLAG_CMD without applying
        flagcmd(vis=self.vis, action='list', inpmode='xml', savepars=True)

        # Now apply them by selecting the reasons and save in another file
        # 507 cmds crash on my computer.
        wanted_reasons = ['ANTENNA_NOT_ON_SOURCE',
                          'FOCUS_ERROR',
                          'SUBREFLECTOR_ERROR']
        self.filename1 = 'myxml.txt'
        flagcmd(vis=self.vis, action='apply', reason=wanted_reasons,
                savepars=True, outfile=self.filename1, flagbackup=False)

        # Compare with original XML
        same = filecmp.cmp(self.filename, self.filename1, 1)
        self.assertTrue(same, 'Files should be equal')

        # Check that APPLIED column has been updated to TRUE

    def test_xml2(self):
        '''flagcmd: list xml file and save in outfile'''
        # The MS only contains clip and shadow commands

        # CASA6 needs this
        os.system('rsync -a ' + os.path.join(datapath, self.vis) + ' .')

        # Apply the shadow command
        flagcmd(vis=self.vis, action='apply', reason='SHADOW', flagbackup=False)
        summary = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(summary['flagged'], 240640)

    def test_missing_corr(self):
        '''flagcmd: CAS-4234, non-existing correlation raise no error'''
        flagcmd(vis=self.vis, action='clear', clearall=True)

        flagcmd(vis=self.vis, inpmode='list',
                inpfile=["correlation='XX,RR,RL'"],
                action='list', savepars=True)
        flagcmd(vis=self.vis, action='apply', flagbackup=False)

        summary = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(summary['flagged'], 208000 + 208000,
                         'Should only flag RR and RL and not fail')


class test_shadow(test_base):

    filename = ''

    def setUp(self):
        self.setUp_shadowdata()

    def tearDown(self) -> None:
        shutil.rmtree(self.vis, ignore_errors=True)
        if os.path.exists(self.filename):
            os.system('rm -rf ' + self.filename)

    def test_CAS2399(self):
        '''flagcmd: shadow by antennas not present in MS'''
        # Antenna description written to disk; create_input() stores it in
        # its default file, flagcmd.txt, which the shadow command reads.
        antenna_text = (
            'name=VLA01\n'
            'diameter=25.0\n'
            'position=[-1601144.96146691, -5041998.01971858, 3554864.76811967]\n'
            'name=VLA02\n'
            'diameter=25.0\n'
            'position=[-1601105.7664601889, -5042022.3917835914, 3554847.245159178]\n'
            'name=VLA09\n'
            'diameter=25.0\n'
            'position=[-1601197.2182404203, -5041974.3604805721, 3554875.1995636248]\n'
            'name=VLA10\n'
            'diameter=25.0\n'
            'position=[-1601227.3367843349,-5041975.7011900628,3554859.1642644769]\n'
        )
        self.filename = create_input(antenna_text)

        # Create command line
        shadow_cmds = ["mode='shadow' tolerance=0.0 addantenna='flagcmd.txt'"]

        # Flag
        flagcmd(vis=self.vis, action='clear', clearall=True)
        flagcmd(vis=self.vis, action='apply', inpmode='list',
                inpfile=shadow_cmds, flagbackup=False)

        # Check flags
        summary = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(summary['antenna']['VLA3']['flagged'], 3752)
        self.assertEqual(summary['antenna']['VLA4']['flagged'], 1320)
        self.assertEqual(summary['antenna']['VLA5']['flagged'], 1104)


class test_antint(test_base):
    ''' For antint mode with inputs from files'''

    def setUp(self):
        self.setUp_evla()

    @classmethod
    def tearDownClass(cls) -> None:
        shutil.rmtree('tosr0001_scan3_noonline.ms', ignore_errors=True)

    def test_antint_defaults(self):
        '''flagcmd:: Test of antint with defaults'''
        ant_name = 'ea01'
        spw_spec = '0'

        flagdata(vis=self.vis, mode='unflag')

        antint_cmd = "mode=antint antint_ref_antenna='{ant}' spw='{spw}'".format(
            ant=ant_name, spw=spw_spec)
        summary_cmd = "mode=summary spw='{spw}'".format(spw=spw_spec)
        flagcmd(vis=self.vis, inpmode='list',
                inpfile=[antint_cmd, summary_cmd],
                action='apply', flagbackup=False)

        summary = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(summary['total'], 832000)
        self.assertEqual(summary['flagged'], 0)
        self.assertEqual(summary['antenna'][ant_name]['total'], 64000)
        self.assertEqual(summary['antenna'][ant_name]['flagged'], 0)

    def test_antint_inputs(self):
        '''flagcmd: test of inputs to flagcmd in antint mode'''
        antenna_spec = 'ea01'
        threshold_spec = '-0.1'
        spw_spec = '2'

        # Unflag
        flagdata(vis=self.vis, mode='unflag', flagbackup=False)

        # Flag passing parameter values in the command string
        antint_cmd = ("mode=antint antint_ref_antenna='{ant}' spw='{spw}' "
                      "minchanfrac={frac} verbose=True").format(
                          ant=antenna_spec, spw=spw_spec, frac=threshold_spec)
        flagcmd(vis=self.vis, inpmode='list', inpfile=[antint_cmd],
                action='apply', flagbackup=False)

        res_str = flagdata(vis=self.vis, mode='summary')
        self.assertEqual(res_str['total'], 832000)
        self.assertEqual(res_str['flagged'], 416000)
        self.assertEqual(res_str['antenna'][antenna_spec]['total'], 64000)
        self.assertEqual(res_str['antenna'][antenna_spec]['flagged'], 32000)


# Test rflag inputs with filenames, as well as inline thresholds.
# rflag does not generate output thresholds for action='apply'.
class test_rflag(test_base):

    def setUp(self):
        self.setUp_data4rflag()

    def tearDown(self) -> None:
        if os.path.exists('rflag_output_thresholds_freqdev0.txt'):
            os.system('rm -rf rflag_output_thresholds*')

        if os.path.exists('fdevfile.txt'):
            os.system('rm -rf *devfile.txt')

        if os.path.exists('outcmd.txt'):
            os.system('rm -rf outcmd.txt')

    @classmethod
    def tearDownClass(cls) -> None:
        os.system('rm -rf Four_ants_3C286.ms*')

    def test_rflaginputs(self):
        """flagcmd:: Test of rflag threshold-inputs of both types (file and inline) """

        # (1) and (2) are the same as test_flagdata[test_rflag3].
        # -- (3),(4),(5) should produce the same answers as (1) and (2)
        #
        # (1) Test input/output files, through the task, mode='rflag'
        #flagdata(vis=self.vis, mode='rflag', spw='9,10', timedev='tdevfile.txt', \
        #              freqdev='fdevfile.txt', action='calculate');
        #flagdata(vis=self.vis, mode='rflag', spw='9,10', timedev='tdevfile.txt', \
        #              freqdev='fdevfile.txt', action='apply');
        #res1 = flagdata(vis=self.vis, mode='summary')
        #print "(1) Finished flagdata : test 1 : ", res1['flagged']

        # (2) Test rflag output written to cmd file via mode='rflag' and 'savepars'
        #      and then read back in via list mode.
        #      Also test the 'savepars' when timedev and freqdev are specified differently...
        #flagdata(vis=self.vis,mode='unflag');
        #flagdata(vis=self.vis, mode='rflag', spw='9,10', timedev='', \
        #              freqdev=[],action='calculate',savepars=True,outfile='outcmd.txt');
        #os.system('cat outcmd.txt')
        #flagdata(vis=self.vis, mode='list', inpfile='outcmd.txt');
        #res2 = flagdata(vis=self.vis, mode='summary')
        #print "(2) Finished flagdata : test 2 : ", res2['flagged']

        # (3) flagcmd : Send in the same text files produces/used in (1)
        # Note after CAS-5808 flagdata multiplies by the time/freqdevscale
        # factors when action='apply'. Default scale factors are 5.
        tdev1 = 0.038859101518899999
        tdev2 = 0.16283325492600001
        fdev1 = 0.079151260994399994
        fdev2 = 0.20569361620099999

        timedev_text = ("{'name':'Rflag',"
                        + "'timedev':[[1.0,9.0,{0}],[1.0,10.0,{1}]]".format(tdev1, tdev2)
                        + "}\n")
        freqdev_text = ("{'name':'Rflag',"
                        + "'freqdev':[[1.0,9.0,{0}],[1.0,10.0,{1}]]".format(fdev1, fdev2)
                        + "}\n")

        filename1 = create_input(timedev_text, 'tdevfile.txt')
        filename2 = create_input(freqdev_text, 'fdevfile.txt')

        file_cmd = ("mode='rflag' spw='9,10' extendflags=False timedev='"
                    + filename1 + "' "
                    + "freqdev='" + filename2
                    + "' timedevscale=5 freqdevscale=5")
        commlist = [file_cmd]

        flagcmd(vis=self.vis, inpmode='list', inpfile=commlist, action='apply',
                flagbackup=False)
        res3 = flagdata(vis=self.vis, mode='summary')
        print("(3) Finished flagcmd test : using tdevfile, fdevfile in the cmd (test 1)) : ", res3['flagged'])

        # (4) Give the values directly in the cmd input.....
        inline_template = ("mode='rflag' spw='9,10' extendflags=False "
                           "timedev=[[1.0,9.0,{0}],[1.0,10.0,{1}]] "
                           "freqdev=[[1.0,9.0,{2}],[1.0,10.0,{3}]] "
                           "timedevscale=5 freqdevscale=5")
        commstr = inline_template.format(tdev1, tdev2, fdev1, fdev2)

        flagdata(vis=self.vis, mode='unflag', flagbackup=False)
        flagcmd(vis=self.vis, inpmode='list', inpfile=[commstr], action='apply',
                flagbackup=False)
        res4 = flagdata(vis=self.vis, mode='summary')
        print("(4) Finished flagcmd test : using cmd arrays : ", res4['flagged'])

        # (5) Use the outcmd.txt file generated by (2).
        #       i.e. re-run the threshold-generation of (2) with savepars=True
        flagdata(vis=self.vis, mode='unflag', flagbackup=False)
        flagdata(vis=self.vis, mode='rflag', spw='9,10', timedev='', freqdev=[],
                 action='calculate', savepars=True, outfile='outcmd.txt',
                 extendflags=False)
        # Note after CAS-5808, when mode='rflag' and action='calculate' the
        # time/freqdevscale parameters are ignored, or forced to 1.0.
        # So the threshold scale for res3 and res4 is x 5.0 whereas the
        # scale for res5 is X 1.0
        flagcmd(vis=self.vis, inpmode='list', inpfile='outcmd.txt')
        res5 = flagdata(vis=self.vis, mode='summary')
        print("(5) Finished flagcmd test : using outcmd.txt from flagdata (test 2) : ", res5['flagged'])

        self.assertEqual(res3['flagged'], res4['flagged'])
        self.assertEqual(res5['flagged'], 39504)
        self.assertEqual(res3['flagged'], 5971.0)

    def test_rflagauto(self):
        """flagcmd:: Test of rflag with defaults
        """
        # (6) flagcmd AUTO. Should give same answers as test_flagdata[test_rflag_auto_thresholds]
        flagdata(vis=self.vis, mode='unflag')
        flagcmd(vis=self.vis, inpmode='list',
                inpfile=['mode=rflag spw=9,10 extendflags=False'],
                action='apply', flagbackup=False)
        res6 = flagdata(vis=self.vis, mode='summary')
        print("(6) Finished flagcmd test : auto : ", res6['flagged'])

        #(7) flagdata AUTO (same as test_flagdata[test_rflag_auto_thresholds])
        #flagdata(vis=self.vis,mode='unflag');
        #flagdata(vis=self.vis, mode='rflag', spw='9,10');
        #res7 = flagdata(vis=self.vis, mode='summary')
        #print("\n---------------------- Finished flagdata test : auto : ", res7['flagged'])

        self.assertEqual(res6['flagged'], 42728.0)


# NOTE(review): this class runs past the end of the visible source; its code
# is reproduced unchanged apart from layout.
class test_actions(test_base):

    def setUp(self):
        self.setUp_data4rflag()

    def tearDown(self) -> None:
        os.system('rm -rf list*txt *plot*png reasonfile*txt')
        if os.path.exists('flagcmd.txt'):
            os.system('rm -rf flagcmd.txt')
        if os.path.exists('cas9366.flags.txt'):
            os.system('rm -rf cas9366.flags.txt')

    @classmethod
    def tearDownClass(cls) -> None:
        os.system('rm -rf ngc5921.ms*')
        os.system('rm -rf Four_ants_3C286.ms*')

    def test_action_plot_table(self):
        '''flagcmd: Test action=plot, nothing plotted'''
        outplot = 'noplot.png'
        flagcmd(vis=self.vis, inpmode='list',
                inpfile=["intent='CAL*POINT*' field=''", "scan='5'"],
                action='list', savepars=True)

        flagcmd(vis=self.vis, inpmode='table', useapplied=True, action='plot',
                plotfile=outplot)

        self.assertTrue(os.path.exists(outplot),'Plot file was not created')
        #
CAS-5180 def test_action_plot_list(self): '''flagcmd: Test action=plot to plot 4 antennas and no timerange''' outplot = 'fourplot.png' cmds = ["antenna='ea01' reason='none'", "antenna='ea11' reason='no_reason'", "antenna='ea19' reason='none'", "antenna='ea24' reason='other'"] flagcmd(vis=self.vis, inpmode='list', inpfile=cmds, action='plot',plotfile=outplot) self.assertTrue(os.path.exists(outplot),'Plot file was not created') def test_action_plot_large(self): '''flagcmd: Test action=plot to plot many antennas into multiple pages''' cmds = [] for i in range(60): cmds.append("antenna='ea%03d' reason='none%d'" % (i, i % 5)) r = flagcmd(vis=self.vis, inpmode='list', inpfile=cmds, action='plot', plotfile='manyplot.png') self.assertIn('plotfiles', r) self.assertEqual(len(r['plotfiles']), 3) self.assertEqual(r['plotfiles'], ['manyplot-001.png', 'manyplot-002.png','manyplot-003.png']) for f in r['plotfiles']: self.assertTrue(os.path.exists(f)) def test_action_plot_da(self): '''flagcmd: Test action=plot to plot page with DA antenna CAS-5187''' cmds = [] for i in range(28): cmds.append("antenna='DA%03d' reason='none%d'" % (i, i % 5)) r = flagcmd(vis=self.vis, inpmode='list', inpfile=cmds, action='plot', plotfile='manyplot.png') self.assertIn('plotfiles', r) self.assertEqual(len(r['plotfiles']), 1) self.assertEqual(r['plotfiles'], ['manyplot.png']) self.assertTrue(os.path.exists('manyplot.png')) def test_action_list1(self): '''flagcmd: action=list with inpmode from a list''' flagcmd(vis=self.vis, action='clear', clearall=True) cmd = ["spw='5~7'","spw='1'"] flagcmd(vis=self.vis, action='list', inpmode='list', inpfile=cmd, savepars=True) # Apply the flags flagcmd(vis=self.vis) res=flagdata(vis=self.vis, mode='summary') self.assertEqual(res['flagged'],1099776) def test_action_list2(self): '''flagcmd: action=list with inpmode from a file''' flagcmd(vis=self.vis, action='clear', clearall=True) cmd = "spw='5~7'\n"+\ "spw='1'" self.filename = create_input(cmd) flagcmd(vis=self.vis, 
action='list', inpmode='list', inpfile=self.filename, savepars=True) # Apply the flags flagcmd(vis=self.vis) res=flagdata(vis=self.vis, mode='summary') self.assertEqual(res['flagged'],1099776) def test_CAS4819(self): '''flagcmd: CAS-4819, Flag commands from three files''' self.setUp_ngc5921() # creat first input file myinput = "scan='1'\n"\ "scan='2'\n"\ "scan='3'" filename1 = 'list7a.txt' create_input1(myinput, filename1) # Create second input file myinput = "scan='5'\n"\ "scan='6'\n"\ "scan='7'" filename2 = 'list7b.txt' create_input1(myinput, filename2) # Create third input file myinput = "scan='4' mode='clip' clipminmax=[0,4]" filename3 = 'list7c.txt' create_input1(myinput, filename3) flagcmd(vis=self.vis, inpmode='list', inpfile=[filename1,filename2,filename3], flagbackup=False, action='apply') res = flagdata(vis=self.vis, mode='summary') self.assertEqual(res['scan']['1']['flagged'], 568134) self.assertEqual(res['scan']['2']['flagged'], 238140) self.assertEqual(res['scan']['3']['flagged'], 762048) self.assertEqual(res['scan']['4']['flagged'], 6696) self.assertEqual(res['scan']['5']['flagged'], 142884) self.assertEqual(res['scan']['6']['flagged'], 857304) self.assertEqual(res['scan']['7']['flagged'], 190512) self.assertEqual(res['total'],2854278) self.assertEqual(res['flagged'],2765718) def test_list_reason1(self): '''flagcmd: select by reason from two files''' self.setUp_ngc5921() # creat first input file myinput = "scan='1' spw='0:10~20' reason='NONE'\n"\ "scan='2' reason='EVEN'\n"\ "scan='3' reason='ODD'" filename1 = 'reasonfile1.txt' create_input1(myinput, filename1) # Create second input file myinput = "scan='5' reason='ODD'\n"\ "scan='6' reason='EVEN'\n"\ "scan='7' reason='ODD'" filename2 = 'reasonfile2.txt' create_input1(myinput, filename2) # Apply flag cmds on ODD reason flagcmd(vis=self.vis, inpmode='list', inpfile=[filename1,filename2], reason='ODD', flagbackup=False, action='apply') res = flagdata(vis=self.vis, mode='summary') 
self.assertEqual(res['scan']['3']['flagged'], 762048) self.assertEqual(res['scan']['5']['flagged'], 142884) self.assertEqual(res['scan']['6']['flagged'], 0) self.assertEqual(res['scan']['7']['flagged'], 190512) self.assertEqual(res['flagged'], 762048+142884+190512) def test_spw_error_handler(self): '''flagcmd: A non-existing spw name in a compound with a existing spw should not fail''' # CAS-9366: flagcmd fails when applying flags based on an spw selection by name, when # one of the spws do not exist # The spw names in Four_ants_3C286.ms are not unique. They are: # Subband:0 Subband:1 Subband:2 Subband:3 Subband:4 Subband:5 Subband:6 Subband:7 # spw=0,8 1,9 2,10 etc. # Copy the input flagcmd file with a non-existing spw name # flagsfile has spw='"Subband:1","Subband:2","Subband:8" flagsfile = 'cas9366.flags.txt' os.system('cp -RH '+os.path.join(datapath,flagsfile)+' '+ ' .') # Save flags commands to FLAG_CMD table flagcmd(self.vis, inpmode='list', inpfile=flagsfile, action='list', savepars=True) # Try to flag flagcmd(self.vis, action='apply') # Check flagged spws s = flagdata(self.vis, mode='summary') # should flag spws 1,2,9,10 self.assertEqual(s['flagged'],274944.0*4) class test_cmdbandpass(test_base): """Flagcmd:: Test flagging task with Bpass-based CalTable """ def setUp(self): self.setUp_bpass_case() def tearDown(self): shutil.rmtree(self.vis, ignore_errors=True) os.system('rm -rf flagcmd.txt') @classmethod def tearDownClass(cls) -> None: os.system('rm -rf cal.fewscans*') os.system('rm -rf Four_ants_3C286.ms*') if os.path.exists('calflags.txt'): os.system('rm -rf calflags.txt') def test_unsupported_mode_in_list(self): '''Flagcmd: elevation and shadow are not supported in cal tables''' res = flagcmd(vis=self.vis, inpmode='list', inpfile=["mode='elevation'", "spw='1'"]) res = flagdata(vis=self.vis, mode='summary') self.assertEqual(res['spw']['1']['flagged'], 83200) def test_default_cparam(self): '''Flagcmd: flag CPARAM data column''' flist = ["mode='clip' 
clipzeros=True datacolumn='CPARAM'"] flagcmd(vis=self.vis, inpmode='list', inpfile=flist, flagbackup=False) res = flagdata(vis=self.vis, mode='summary') self.assertEqual(res['flagged'], 11078, 'Should use CPARAM as the default column') def test_manual_field_selection_for_bpass(self): """Flagcmd:: Manually flag a bpass-based CalTable using field selection""" flagcmd(vis=self.vis, inpmode='list', inpfile=["field='3C286_A'"], flagbackup=False) summary=flagdata(vis=self.vis, mode='summary') self.assertEqual(summary['field']['3C286_A']['flagged'], 499200.0) self.assertEqual(summary['field']['3C286_B']['flagged'], 0) self.assertEqual(summary['field']['3C286_C']['flagged'], 0) self.assertEqual(summary['field']['3C286_D']['flagged'], 0) def test_list_field_Selection_for_bpass(self): """Flagcmd:: Manually flag a bpass-based CalTable using file in list mode """ myinput = "field='3C286_A'" self.filename = create_input(myinput) flagcmd(vis=self.vis, inpmode='list', inpfile=self.filename, flagbackup=False) summary=flagdata(vis=self.vis, mode='summary') self.assertEqual(summary['field']['3C286_A']['flagged'], 499200.0) self.assertEqual(summary['field']['3C286_B']['flagged'], 0) self.assertEqual(summary['field']['3C286_C']['flagged'], 0) self.assertEqual(summary['field']['3C286_D']['flagged'], 0) def test_MS_flagcmds(self): """Flagcmd:: Save flags to MS and apply to cal table""" self.setUp_data4rflag() msfile = self.vis flagcmd(vis=msfile, action='clear', clearall=True) flagdata(vis=msfile, antenna='ea09', action='', savepars=True) self.setUp_bpass_case() flagcmd(vis=self.vis, inpfile=msfile, action='apply', flagbackup=False) summary=flagdata(vis=self.vis, mode='summary') self.assertEqual(summary['antenna']['ea09']['flagged'], 48000.0) self.assertEqual(summary['antenna']['ea10']['flagged'], 0.0) def test_clip_one_list(self): '''Flagcmd: Flag one solution using one command in a list''' flist = ["mode='clip' clipminmax=[0,3] correlation='REAL_Sol1' datacolumn='CPARAM'"] 
flagcmd(vis=self.vis, inpmode='list', inpfile=flist) res = flagdata(vis=self.vis, mode='summary') self.assertEqual(res['flagged'], 309388) self.assertEqual(res['correlation']['Sol2']['flagged'], 0) def test_flagbackup(self): '''Flagcmd: backup cal table flags''' flagmanager(vis=self.vis, mode='list') aflocal.open(self.vis) self.assertEqual(len(aflocal.getflagversionlist()), 2) aflocal.done() flagcmd(vis=self.vis, inpmode='list', inpfile=["spw='3'"]) flagmanager(vis=self.vis, mode='list') aflocal.open(self.vis) self.assertEqual(len(aflocal.getflagversionlist()), 3) aflocal.done() flagcmd(vis=self.vis, inpmode='list', inpfile=["spw='4'"], flagbackup=False) flagmanager(vis=self.vis, mode='list') aflocal.open(self.vis) self.assertEqual(len(aflocal.getflagversionlist()), 3) aflocal.done() newname = 'BackupBeforeSpwFlags' flagmanager(vis=self.vis, mode='rename', oldname='flagcmd_1', versionname=newname, comment='Backup of flags before applying flags on spw') flagmanager(vis=self.vis, mode='list') aflocal.open(self.vis) self.assertEqual(len(aflocal.getflagversionlist()), 3) aflocal.done() # Apply spw=5 flags flagcmd(vis=self.vis, inpmode='list', inpfile=["spw='5'"], flagbackup=True) res = flagdata(vis=self.vis, mode='summary') self.assertEqual(res['spw']['3']['flagged'], 83200) self.assertEqual(res['spw']['4']['flagged'], 83200) self.assertEqual(res['spw']['5']['flagged'], 83200) self.assertEqual(res['flagged'], 83200*3) # Restore backup flagmanager(vis=self.vis, mode='restore', versionname='BackupBeforeSpwFlags', merge='replace') res = flagdata(vis=self.vis, mode='summary') self.assertEqual(res['spw']['5']['flagged'], 0) self.assertEqual(res['flagged'], 0) def test_save_cal(self): '''Flagcmd: list flag cmds from MS and save to a file''' self.setUp_data4rflag() msfile = self.vis flagcmd(vis=msfile, action='clear', clearall=True) # Save cmds to FLAG_CMD table flagcmd(vis=msfile, inpmode='list', inpfile=["spw='1,3", "mode='clip' clipminmax=[0,500] datacolumn='SNR'"], 
action='list', savepars=True) self.setUp_bpass_case() flagcmds = 'calflags.txt' if os.path.exists(flagcmds): os.system('rm -rf '+flagcmds) # Apply to cal table and save to an external file flagcmd(vis=self.vis, inpmode='table', inpfile=msfile, savepars=True, outfile=flagcmds) self.assertTrue(os.path.exists(flagcmds)) res = flagdata(vis=self.vis, mode='summary') self.assertEqual(res['flagged'], 246315) # Apply from list in file and compare flagdata(vis=self.vis, mode='unflag') flagcmd(vis=self.vis, inpmode='list', inpfile=flagcmds, action='apply') res = flagdata(vis=self.vis, mode='summary') self.assertEqual(res['flagged'], 246315) def test_cal_time_field(self): '''Flagcmd: clip a timerange from a field''' # this timerange corresponds to field 3C286_D flags = "mode='clip' timerange='>14:58:33.6' clipzeros=True clipminmax=[0.,0.4]"\ " datacolumn='CPARAM'" # Apply the flags flagcmd(vis=self.vis, inpmode='list', inpfile=[flags], flagbackup=False) res=flagdata(vis=self.vis, mode='summary') self.assertEqual(res['field']['3C286_A']['flagged'],0) self.assertEqual(res['field']['3C286_B']['flagged'],0) self.assertEqual(res['field']['3C286_C']['flagged'],0) self.assertEqual(res['field']['3C286_D']['flagged'],2221) self.assertEqual(res['flagged'],2221) def test_cal_observation(self): '''Flagcmd: flag an observation from an old cal table format''' # Note: this cal table does not have an observation column. # The column and sub-table should be added and the flagging # should happen after this. flagcmd(vis=self.vis, inpmode='list', inpfile=["observation='0'"], flagbackup=False) res=flagdata(vis=self.vis, mode='summary') self.assertEqual(res['flagged'],1248000) self.assertEqual(res['total'],1248000) if __name__ == '__main__': unittest.main()