Commits
Federico Montesino Pouzols authored and Ville Suoranta committed 601dacfebee Merge
1 + | from typing import Dict, Optional, Tuple, Union |
1 2 | |
2 - | |
3 + | import numpy as np |
3 4 | from .mstools import write_history |
4 5 | from casatools import table, ms, mstransformer |
5 6 | from casatools import measures as me |
6 7 | from casatasks import casalog |
7 8 | from .parallel.parallel_data_helper import ParallelDataHelper |
8 9 | |
9 10 | |
10 11 | def phaseshift( |
11 - | vis=None, outputvis=None, keepmms=None, field=None, |
12 - | spw=None, scan=None, intent=None, array=None, |
13 - | observation=None, datacolumn=None, phasecenter=None |
12 + | vis: str, |
13 + | outputvis: str, |
14 + | keepmms: bool, |
15 + | field: Optional[str], |
16 + | spw: Optional[str], |
17 + | scan: Optional[str], |
18 + | intent: Optional[str], |
19 + | array: Optional[str], |
20 + | observation: Optional[str], |
21 + | datacolumn: Optional[str], |
22 + | phasecenter: Union[str, dict], |
14 23 | ): |
15 24 | """ |
16 25 | Changes the phase center for either short or large |
17 26 | offsets/angles w.r.t. the original |
18 27 | """ |
19 - | casalog.origin('phaseshift') |
28 + | casalog.origin("phaseshift") |
20 29 | |
21 30 | if len(phasecenter) == 0: |
22 - | raise ValueError('phasecenter parameter must be specified') |
31 + | raise ValueError("phasecenter parameter must be specified") |
23 32 | # Initiate the helper class |
24 33 | pdh = ParallelDataHelper("phaseshift", locals()) |
25 34 | |
26 35 | # Validate input and output parameters |
27 36 | try: |
28 37 | pdh.setupIO() |
29 38 | except Exception as instance: |
30 - | casalog.post(str(instance), 'ERROR') |
39 + | casalog.post(str(instance), "ERROR") |
31 40 | raise RuntimeError(str(instance)) |
32 41 | |
33 42 | # Input vis is an MMS |
34 43 | if pdh.isMMSAndNotServer(vis) and keepmms: |
35 44 | if not pdh.validateInputParams(): |
36 - | raise RuntimeError('Unable to continue with MMS processing') |
45 + | raise RuntimeError("Unable to continue with MMS processing") |
37 46 | |
38 - | pdh.setupCluster('phaseshift') |
47 + | pdh.setupCluster("phaseshift") |
39 48 | |
40 49 | # Execute the jobs |
41 50 | try: |
42 51 | pdh.go() |
43 52 | except Exception as instance: |
44 - | casalog.post(str(instance), 'ERROR') |
53 + | casalog.post(str(instance), "ERROR") |
45 54 | raise RuntimeError(str(instance)) |
46 55 | return |
47 56 | |
48 - | # Create local copies of tools (has to be here, otherwise |
49 - | # ParallelDataHelper has a porblem digest the locals |
57 + | # Actual task code starts here |
58 + | # Gather all the parameters in a dictionary. |
59 + | config = {} |
60 + | |
61 + | config = pdh.setupParameters( |
62 + | inputms=vis, |
63 + | outputms=outputvis, |
64 + | field=field, |
65 + | spw=spw, |
66 + | array=array, |
67 + | scan=scan, |
68 + | intent=intent, |
69 + | observation=observation, |
70 + | ) |
71 + | |
72 + | colnames = _get_col_names(vis) |
73 + | # Check if CORRECTED column exists, when requested |
74 + | datacolumn = datacolumn.upper() |
75 + | if datacolumn == "CORRECTED": |
76 + | if "CORRECTED_DATA" not in colnames: |
77 + | casalog.post( |
78 + | "Input data column CORRECTED_DATA does not exist. Will use DATA", "WARN" |
79 + | ) |
80 + | datacolumn = "DATA" |
81 + | |
82 + | casalog.post(f"Will use datacolumn = {datacolumn}", "DEBUG") |
83 + | config["datacolumn"] = datacolumn |
84 + | |
85 + | # Call MSTransform framework with tviphaseshift=True |
86 + | config["tviphaseshift"] = True |
87 + | config["reindex"] = False |
88 + | tviphaseshift_config = {"phasecenter": phasecenter} |
89 + | config["tviphaseshiftlib"] = tviphaseshift_config |
90 + | |
91 + | # Configure the tool |
92 + | casalog.post(str(config), "DEBUG1") |
50 93 | |
51 - | tblocal = table() |
52 - | mslocal = ms() |
53 94 | mtlocal = mstransformer() |
54 - | melocal = me() |
95 + | try: |
96 + | mtlocal.config(config) |
55 97 | |
56 - | # Actual task code starts here |
98 + | # Open the MS, select the data and configure the output |
99 + | mtlocal.open() |
100 + | |
101 + | # Run the tool |
102 + | casalog.post("Shift phase center") |
103 + | mtlocal.run() |
104 + | finally: |
105 + | mtlocal.done() |
106 + | |
107 + | # Write history to output MS, not the input ms. |
57 108 | try: |
58 - | dirstr = phasecenter.split(' ') |
59 - | if not melocal.direction(dirstr[0], dirstr[1], dirstr[2]): |
60 - | raise ValueError("Illegal phacecenter specification " + phasecenter) |
61 - | try: |
62 - | # Gather all the parameters in a dictionary. |
63 - | config = {} |
109 + | mslocal = ms() |
110 + | param_names = phaseshift.__code__.co_varnames[: phaseshift.__code__.co_argcount] |
111 + | vars = locals() |
112 + | param_vals = [vars[p] for p in param_names] |
113 + | casalog.post("Updating the history in the output", "DEBUG1") |
114 + | write_history( |
115 + | mslocal, outputvis, "phaseshift", param_names, param_vals, casalog |
116 + | ) |
117 + | except Exception as instance: |
118 + | casalog.post(f"*** Error {instance} updating HISTORY", "WARN") |
119 + | raise RuntimeError(str(instance)) |
120 + | finally: |
121 + | mslocal.done() |
64 122 | |
65 - | config = pdh.setupParameters( |
66 - | inputms=vis, outputms=outputvis, field=field, |
67 - | spw=spw, array=array, scan=scan, intent=intent, |
68 - | observation=observation |
69 - | ) |
123 + | casalog.post( |
124 + | "Updating the FIELD subtable of the output MeasurementSet with shifted" |
125 + | " phase centers", |
126 + | "INFO", |
127 + | ) |
128 + | _update_field_subtable(outputvis, field, phasecenter) |
70 129 | |
71 - | # Check if CORRECTED column exists, when requested |
72 - | datacolumn = datacolumn.upper() |
73 - | if datacolumn == 'CORRECTED': |
74 - | tblocal.open(vis) |
75 - | if 'CORRECTED_DATA' not in tblocal.colnames(): |
76 - | casalog.post( |
77 - | 'Input CORRECTED_DATA does not exist. Will use DATA', |
78 - | 'WARN' |
79 - | ) |
80 - | datacolumn = 'DATA' |
81 - | tblocal.close() |
82 130 | |
83 - | casalog.post('Will use datacolumn = ' + datacolumn, 'DEBUG') |
84 - | config['datacolumn'] = datacolumn |
131 + | def _get_col_names(vis: str) -> np.ndarray: |
132 + | tblocal = table() |
133 + | try: |
134 + | tblocal.open(vis) |
135 + | colnames = tblocal.colnames() |
136 + | finally: |
137 + | tblocal.done() |
138 + | return colnames |
85 139 | |
86 - | # Call MSTransform framework with tviphaseshift=True |
87 - | config['tviphaseshift'] = True |
88 - | tviphaseshift_config = {} |
89 - | tviphaseshift_config['phasecenter'] = phasecenter |
90 - | config['tviphaseshiftlib'] = dict(tviphaseshift_config) |
91 140 | |
92 - | # Configure the tool |
93 - | casalog.post(str(config), 'DEBUG1') |
94 - | mtlocal.config(config) |
141 + | def _update_field_subtable(outputvis: str, field: str, phasecenter: Union[str, dict]): |
142 + | """Update MS/FIELD subtable with shifted center(s).""" |
95 143 | |
96 - | # Open the MS, select the data and configure the output |
97 - | mtlocal.open() |
144 + | try: |
145 + | tblocal = table() |
146 + | # modify FIELD table |
147 + | tblocal.open(outputvis + "/FIELD", nomodify=False) |
148 + | pcol = tblocal.getcol("PHASE_DIR") |
98 149 | |
99 - | # Run the tool |
100 - | casalog.post('Shift phase center') |
101 - | mtlocal.run() |
150 + | field_frames = _find_field_ref_frames(tblocal) |
151 + | if isinstance(phasecenter, str): |
152 + | if field: |
153 + | try: |
154 + | field_id = int(field) |
155 + | except ValueError as _exc: |
156 + | fnames = tblocal.getcol("NAME") |
157 + | field_id = np.where(fnames == field)[0][0] |
158 + | thenewra_rad, thenewdec_rad = _convert_to_ref_frame(phasecenter, field_frames[field_id]) |
159 + | pcol[0][0][field_id] = thenewra_rad |
160 + | pcol[1][0][field_id] = thenewdec_rad |
161 + | else: |
162 + | for row in range(0, tblocal.nrows()): |
163 + | thenewra_rad, thenewdec_rad = _convert_to_ref_frame(phasecenter, field_frames[row]) |
164 + | pcol[0][0][row] = thenewra_rad |
165 + | pcol[1][0][row] = thenewdec_rad |
102 166 | |
103 - | mtlocal.done() |
167 + | elif isinstance(phasecenter, dict): |
168 + | for field_id, field_center in phasecenter.items(): |
169 + | field_iidx = int(field_id) |
170 + | thenewra_rad, thenewdec_rad = _convert_to_ref_frame(field_center, field_frames[field_iidx]) |
171 + | pcol[0][0][field_iidx] = thenewra_rad |
172 + | pcol[1][0][field_iidx] = thenewdec_rad |
104 173 | |
105 - | except Exception as instance: |
106 - | mtlocal.done() |
107 - | casalog.post(str(instance), 'ERROR') |
108 - | raise RuntimeError(str(instance)) |
174 + | tblocal.putcol("PHASE_DIR", pcol) |
109 175 | |
110 - | # Write history to output MS, not the input ms. |
111 - | try: |
112 - | param_names = phaseshift.__code__.co_varnames[ |
113 - | :phaseshift.__code__.co_argcount |
114 - | ] |
115 - | vars = locals() |
116 - | param_vals = [vars[p] for p in param_names] |
117 - | |
118 - | casalog.post('Updating the history in the output', 'DEBUG1') |
119 - | write_history( |
120 - | mslocal, outputvis, 'phaseshift', param_names, |
121 - | param_vals, casalog |
122 - | ) |
123 - | except Exception as instance: |
124 - | casalog.post( |
125 - | "*** Error " + str(instance) |
126 - | + " updating HISTORY", 'WARN' |
127 - | ) |
128 - | raise RuntimeError(str(instance)) |
176 + | except Exception as instance: |
177 + | casalog.post(f"*** Error '%s' updating FIELD subtable {instance}", "WARN") |
178 + | raise RuntimeError(str(instance)) |
179 + | finally: |
180 + | tblocal.done() |
129 181 | |
130 - | # Update field table |
131 - | try: |
132 182 | |
133 - | # Parse phase center string to obtain ra/dec |
134 - | dirstr = phasecenter.split(' ') |
135 - | try: |
136 - | thedir = melocal.direction(dirstr[0], dirstr[1], dirstr[2]) |
137 - | if (dirstr[0] != 'J2000'): |
138 - | # Convert to J2000 |
139 - | thedir = melocal.measure(thedir, 'J2000') |
140 - | thenewra_rad = thedir['m0']['value'] |
141 - | thenewdec_rad = thedir['m1']['value'] |
142 - | except Exception as instance: |
143 - | casalog.post( |
144 - | "*** Error " + str(instance) |
145 - | + " when interpreting parameter \'phasecenter\': ", |
146 - | 'SEVERE' |
147 - | ) |
148 - | raise RuntimeError(str(instance)) |
149 - | |
150 - | # modify FIELD table |
151 - | tblocal.open(outputvis + '/FIELD', nomodify=False) |
152 - | pcol = tblocal.getcol('PHASE_DIR') |
153 - | for row in range(0, tblocal.nrows()): |
154 - | pcol[0][0][row] = thenewra_rad |
155 - | pcol[1][0][row] = thenewdec_rad |
156 - | tblocal.putcol('PHASE_DIR', pcol) |
157 - | except Exception as instance: |
158 - | casalog.post( |
159 - | "*** Error \'%s\' updating FIELD" + str(instance), 'WARN' |
183 + | def _find_field_ref_frames(tblocal: table) -> Dict[int, str]: |
184 + | """ |
185 + | Given an open FIELD subtable, returns a dict of {field: reference_frame} for the PHASE_DIR |
186 + | column, where field is of type int, and reference_frame is of type str. |
187 + | |
188 + | This handles: |
189 + | - simple metadata where the reference frame is in the keywords of the PHASE_DIR column (same |
190 + | ref frame for all fields) |
191 + | - variable (per-field) reference frame metadata, usually in a PhaseDir_Ref additional column |
192 + | """ |
193 + | |
194 + | dir_col = "PHASE_DIR" |
195 + | metainfo = tblocal.getcolkeyword(dir_col, "MEASINFO") |
196 + | nrows = tblocal.nrows() |
197 + | if "Ref" in metainfo: |
198 + | ref_frame = metainfo["Ref"] |
199 + | field_ref_frames = {field_id: ref_frame for field_id in range(0, nrows)} |
200 + | elif "Ref" not in metainfo and ("VarRefCol" in metainfo and "TabRefTypes" in metainfo and |
201 + | "TabRefCodes" in metainfo): |
202 + | col = metainfo["VarRefCol"] # presumably PhaseDir_Ref |
203 + | ref_frame_codes = tblocal.getcol(col) |
204 + | ref_codes_to_frame_idx = {code: idx for idx, code in enumerate(metainfo["TabRefCodes"])} |
205 + | ref_frame_names = [metainfo["TabRefTypes"][ref_codes_to_frame_idx[code]] for code in |
206 + | ref_frame_codes] |
207 + | field_ref_frames = {field_id: ref_frame_names[field_id] for field_id in np.arange(0, nrows)} |
208 + | else: |
209 + | raise RuntimeError("Error when retrieving reference frames from the metadata of column " |
210 + | f"{dir_col}. The field 'Ref' is not present but could not find the " |
211 + | "fields 'VarRefCol', 'TabRefTypes', and 'TabRefCodes'") |
212 + | |
213 + | return field_ref_frames |
214 + | |
215 + | |
216 + | def _convert_to_ref_frame(phasecenter: str, output_ref_frame: str) -> Tuple[float, float]: |
217 + | """ |
218 + | Converts one phasecenter (presumably given as input to this task) to another reference |
219 + | frame (presumably the frame used in the (output) MS for the relevant field). |
220 + | |
221 + | When applying phaseshift, the output MS has the same reference frames as the input MS, as |
222 + | propagated by mstransform. |
223 + | |
224 + | Returns the v0,v1 (RA,Dec) values in units of radians and using the requested frame, ready to be |
225 + | written to a FIELD subtable. |
226 + | """ |
227 + | def parse_phasecenter(center: str) -> Tuple[str, str, str]: |
228 + | """ |
229 + | Parse phase center string to obtain ra/dec (in rad). Splits the: |
230 + | - (optional) frame, |
231 + | - v0 (typically RA), |
232 + | - v1 (typically Dec) |
233 + | in a phasecenter string. |
234 + | """ |
235 + | dirstr = center.split(" ") |
236 + | if 3 == len(dirstr): |
237 + | dir_frame, dir_v0, dir_v1 = dirstr[0], dirstr[1], dirstr[2] |
238 + | elif 2 == len(dirstr): |
239 + | dir_frame = "" # J2000 will be default in me.direction, etc. |
240 + | dir_v0, dir_v1 = dirstr[0], dirstr[1] |
241 + | else: |
242 + | raise AttributeError(f"Wrong phasecenter string: '{center}'. It must have 3 or 2 " |
243 + | "items separated by space(s): 'reference_frame RA Dec' or 'RA Dec'") |
244 + | |
245 + | return dir_frame, dir_v0, dir_v1 |
246 + | |
247 + | try: |
248 + | melocal = me() |
249 + | dir_frame, dir_v0, dir_v1 = parse_phasecenter(phasecenter) |
250 + | |
251 + | # Note: even if the input frame is the same as output_ref_frame, we need to ensure units of |
252 + | # radians for the FIELD subtable |
253 + | if dir_frame: |
254 + | thedir = melocal.direction(dir_frame, dir_v0, dir_v1) |
255 + | else: |
256 + | thedir = melocal.direction(v0=dir_v0, v1=dir_v1) |
257 + | if not thedir: |
258 + | raise RuntimeError( |
259 + | f"measures.direction() failed for phasecenter string: {phasecenter}" |
160 260 | ) |
161 - | raise RuntimeError(str(instance)) |
162 261 | |
262 + | if dir_frame != output_ref_frame: |
263 + | thedir = melocal.measure(thedir, output_ref_frame) |
264 + | thenewra_rad = thedir["m0"]["value"] |
265 + | thenewdec_rad = thedir["m1"]["value"] |
266 + | except Exception as instance: |
267 + | casalog.post( |
268 + | f"*** Error {instance} when interpreting parameter 'phasecenter': ", |
269 + | "SEVERE", |
270 + | ) |
271 + | raise RuntimeError(str(instance)) |
163 272 | finally: |
164 - | if tblocal: |
165 - | tblocal.done() |
166 - | tblocal = None |
167 - | if mslocal: |
168 - | mslocal.done() |
169 - | mslocal = None |
170 - | if mtlocal: |
171 - | mtlocal.done() |
172 - | mtlocal = None |
173 - | if melocal: |
174 - | melocal.done() |
175 - | melocal = None |
273 + | melocal.done() |
176 274 | |
275 + | return thenewra_rad, thenewdec_rad |