Commits

update and extend tests for output center values in same frame as metadata, CAS-13616
No tags

casatasks/tests/tasks/test_task_phaseshift.py

Modified
95 95 Ensures that the field subtable of outputvis has the expected phase centers
96 96 (from 'new_centers' passed to the phasecenter parameter of phaseshift),
97 97 comparing also unmodified ('passthrough') fields with the 'inputvis'.
98 98
99 99 :param outputvis: output, phase-shifted MS to check
100 100 :param inputvis: input MS, to compare against when some fields unchanged
101 101 :param new_centers: string or dict with new phase center(s)
102 102 :param field_selection: field selected as integer if used (only single value supported)
103 103
104 104 """
105 - from casatasks.private.task_phaseshift import _convert_to_ra_dec_j2000
105 + from casatasks.private.task_phaseshift import (_find_field_ref_frames,
106 + _convert_to_ref_frame)
106 107
107 108 def get_expected_output_ra_dec(row, new_centers, input_phase_col,
108 - field_selection):
109 + field_selection, field_frames):
109 110 field_id = str(row)
110 111 if isinstance(new_centers, dict):
111 112 if field_id in new_centers:
112 - ra_rad, dec_rad = _convert_to_ra_dec_j2000(
113 - new_centers[field_id])
113 + ra_rad, dec_rad = _convert_to_ref_frame(new_centers[field_id],
114 + field_frames[row])
114 115 else:
115 116 ra_rad = input_phase_col[0, 0, row]
116 117 dec_rad = input_phase_col[1, 0, row]
117 118 else:
118 119 if field_selection and row != field_selection:
119 120 ra_rad = input_phase_col[0, 0, row]
120 121 dec_rad = input_phase_col[1, 0, row]
121 122 else:
122 - ra_rad, dec_rad = _convert_to_ra_dec_j2000(new_centers)
123 + ra_rad, dec_rad = _convert_to_ref_frame(new_centers, field_frames[row])
123 124
124 125 return ra_rad, dec_rad
125 126
126 127 def get_field_subt_phasedir_col(vis_path):
127 128 try:
128 129 tblocal = table()
129 130 tblocal.open(vis_path + '/FIELD', nomodify=True)
130 131 phase_col = tblocal.getcol('PHASE_DIR')
131 132 finally:
132 133 tblocal.done()
133 134
134 135 return phase_col
135 136
137 + def get_field_frame_names(vis_path):
138 + try:
139 + tblocal = table()
140 + tblocal.open(vis_path + '/FIELD', nomodify=True)
141 + field_frames = _find_field_ref_frames(tblocal)
142 + finally:
143 + tblocal.done()
144 +
145 + return field_frames
136 146
137 147 phase_col = get_field_subt_phasedir_col(outputvis)
138 148 input_phase_col = get_field_subt_phasedir_col(inputvis)
149 + # Get as reference the ref frames of the input vis (with mstransform we expect the output MS
150 + # should have the same metadata = same frames as the input MS)
151 + field_frames = get_field_frame_names(inputvis)
152 +
139 153 for row in range(0, phase_col.shape[-1]):
140 154 ra_rad, dec_rad = get_expected_output_ra_dec(row, new_centers,
141 155 input_phase_col,
142 - field_selection)
156 + field_selection,
157 + field_frames)
143 158
144 159 # The 0 in the middle is the 'NUM_POLY' axis
145 160 self.assertEqual(phase_col[0, 0, row], ra_rad,
146 161 f"unexpected PHASE_DIR ra value in row {row} "
147 162 f"(with {new_centers=})")
148 163 self.assertEqual(phase_col[1, 0, row], dec_rad,
149 164 f"unexpected PHASE_DIR dec value in row {row} "
150 165 f"(with {new_centers=})")
151 166
152 167
1020 1035 self.assertTrue(
1021 1036 x['max'][0]/x['rms'][0] < 5,
1022 1037 msg='Incorrectly found signal in empty field, got S/N of '
1023 1038 + str(x['max'][0]/x['rms'][0])
1024 1039 )
1025 1040 self.__delete_intermediate_products()
1026 1041
1027 1042
1028 1043 class phaseshift_subfunctions_test(unittest.TestCase):
1029 1044
1030 - def test__convert_to_j2000(self):
1031 - from casatasks.private.task_phaseshift import _convert_to_ra_dec_j2000
1045 + def setUp(self):
1046 + shutil.copytree(datapath, datacopy)
1047 + shutil.copytree(datapath_ngc, datacopy_ngc)
1048 + change_perms(datacopy)
1049 + change_perms(datacopy_ngc)
1050 +
1051 + def tearDown(self):
1052 + shutil.rmtree(datacopy)
1053 + shutil.rmtree(datacopy_ngc)
1054 +
1055 + def test__fiend_field_ref_frame(self):
1056 + from casatasks.private.task_phaseshift import _find_field_ref_frames
1057 +
1058 + try:
1059 + tblocal = table()
1060 + tblocal.open(datacopy + "/FIELD", nomodify=True)
1061 + ref_frames = _find_field_ref_frames(tblocal)
1062 + finally:
1063 + tblocal.close()
1064 +
1065 + self.assertEqual(ref_frames, {0: "J2000"})
1066 +
1067 + def test__fiend_field_ref_frame_b1950_vla(self):
1068 + from casatasks.private.task_phaseshift import _find_field_ref_frames
1069 +
1070 + try:
1071 + tblocal = table()
1072 + tblocal.open(datacopy_ngc + "/FIELD", nomodify=True)
1073 + ref_frames = _find_field_ref_frames(tblocal)
1074 + finally:
1075 + tblocal.close()
1076 +
1077 + self.assertEqual(ref_frames, {0: "B1950_VLA", 1: 'B1950_VLA', 2: 'B1950_VLA'})
1078 +
1079 + def test__convert_to_ref_frame(self):
1080 + from casatasks.private.task_phaseshift import _convert_to_ref_frame
1032 1081
1033 1082 phasecenter = 'J2000 19h53m50 40d06m00'
1034 - fra, fdec = _convert_to_ra_dec_j2000(phasecenter)
1083 + fra, fdec = _convert_to_ref_frame(phasecenter, "J2000")
1035 1084 places = 6
1036 1085 self.assertAlmostEqual(fra, -1.074105, places=places)
1037 1086 self.assertAlmostEqual(fdec, 0.6998770, places=places)
1038 1087
1039 - def test__convert_to_j2000_using_default_frame(self):
1040 - from casatasks.private.task_phaseshift import _convert_to_ra_dec_j2000
1088 + def test__convert_to_ref_frame_using_default_frame(self):
1089 + from casatasks.private.task_phaseshift import _convert_to_ref_frame
1041 1090
1042 1091 phasecenter = '19h53m50 40d06m00'
1043 - fra, fdec = _convert_to_ra_dec_j2000(phasecenter)
1092 + fra, fdec = _convert_to_ref_frame(phasecenter, "J2000")
1044 1093 places = 6
1045 1094 self.assertAlmostEqual(fra, -1.074105, places=places)
1046 1095 self.assertAlmostEqual(fdec, 0.6998770, places=places)
1047 1096
1048 - def test__convert_to_j2000_wrong(self):
1049 - from casatasks.private.task_phaseshift import _convert_to_ra_dec_j2000
1097 + def test__convert_to_ref_frame_wrong(self):
1098 + from casatasks.private.task_phaseshift import _convert_to_ref_frame
1050 1099
1051 1100 phasecenter = 'BOGUS xxh53m50 40d06m00'
1052 1101 with self.assertRaisesRegex(RuntimeError, expected_regex="failed"):
1053 - fra, fdec = _convert_to_ra_dec_j2000(phasecenter)
1102 + fra, fdec = _convert_to_ref_frame(phasecenter, "B1950_VLA")
1103 +
1104 + def test__convert_to_ref_frame_icrs(self):
1105 + from casatasks.private.task_phaseshift import _convert_to_ref_frame
1106 +
1107 + phasecenter = 'B1950 19h53m50 40d06m00'
1108 + fra, fdec = _convert_to_ref_frame(phasecenter, "ICRS")
1109 + places = 6
1110 + self.assertAlmostEqual(fra, -1.066522, places=places)
1111 + self.assertAlmostEqual(fdec, 0.7022094, places=places)
1054 1112
1055 1113
1056 1114 class phaseshift_multi_phasecenter_test(phaseshift_base_checks):
1057 1115 """ Tests around the use of multi-field phasecenter values (dicts) """
1058 1116
1059 1117 # Other candidates, could have been:
1060 1118 # uid___A002_X30a93d_X43e_small.ms: 3 fields, but only 3 scans, and >240MB
1061 1119 # uid___X02_X3d737_X1_01_small.ms: 3 fields, but only 3 scans
1062 1120 # twocenteredpointsources.ms: simulated, 2 fields, only 2 scans, (fixvis)
1063 1121 datadir_multifield = os.path.join('measurementset', 'alma')
1151 1209 def test_test_nodict(self):
1152 1210 new_center = 'J2000 19h53m50 40d06m00'
1153 1211 result = phaseshift(datacopy, outputvis=self.outputvis,
1154 1212 phasecenter=new_center)
1155 1213
1156 1214 self.check_nrows(self.outputvis, 15344)
1157 1215 self.check_field_subtable(self.outputvis, datacopy, new_center)
1158 1216
1159 1217 def test_phasecenter_dict_one_out(self):
1160 1218 ''' Check multiple field phasecenter(s) given as a dict, skip one field '''
1161 - new_centerA = 'J2000 19h53m50 40d06m00'
1162 - new_centerB = 'J2000 22h01m02 40d04m03'
1219 + new_centerA = 'GALACTIC 19h53m50 40d06m00'
1220 + new_centerB = 'GALACTIC 22h01m02 40d04m03'
1163 1221 phasecenter = {'0': new_centerA,
1164 1222 '2' : new_centerB}
1165 1223 result = phaseshift(datacopy, outputvis=self.outputvis,
1166 1224 phasecenter=phasecenter)
1167 1225
1168 1226 self.assertEqual(result, None)
1169 1227 self.check_nrows(self.outputvis, 15344)
1170 1228 self.check_field_subtable(self.outputvis, datacopy, phasecenter)
1171 1229
1172 1230 for ddi in self.relevant_ddis_multifield:
1173 1231 self.check_field_unchanged(datacopy, self.outputvis, ddi=ddi,
1174 1232 field='1')
1175 1233
1176 1234 def test_phasecenter_dict_with_field_selection_overlapping(self):
1177 1235 ''' Check multiple field phasecenter(s) given as a dict,
1178 1236 skip one field, with selection of all fields in phasecenter dict '''
1179 - new_centerA = 'J2000 19h53m50 40d06m00'
1180 - new_centerB = 'J2000 22h01m02 40d04m03'
1237 + new_centerA = 'ICRS 19h53m50 40d06m00'
1238 + new_centerB = 'ICRS 22h01m02 40d04m03'
1181 1239 phasecenter = {'0': new_centerA,
1182 1240 '2' : new_centerB}
1183 1241 result = phaseshift(datacopy, outputvis=self.outputvis, field='0,2',
1184 1242 phasecenter=phasecenter)
1185 1243
1186 1244 self.assertEqual(result, None)
1187 1245 self.check_nrows(self.outputvis, 11536)
1188 1246 self.check_field_subtable(self.outputvis, datacopy, phasecenter)
1189 1247
1190 1248 def test_phasecenter_dict_with_field_selection_nonoverlapping(self):

Everything looks good. We'll let you know here if there's anything you should know about.

Add shortcut