casatasks/src/private/task_phaseshift.py

Modified
4 4
5 5 import numpy as np
6 6 from .mstools import write_history
7 7 from casatools import table, ms, mstransformer
8 8 from casatools import measures as me
9 9 from casatasks import casalog
10 10 from .parallel.parallel_data_helper import ParallelDataHelper
11 11
12 12
13 13 def phaseshift(
14 - vis: str, outputvis: str, keepmms: bool, field: Optional[str],
15 - spw: Optional[str], scan: Optional[str], intent: Optional[str], array: Optional[str],
16 - observation: Optional[str], datacolumn: Optional[str], phasecenter: Union[str, dict]
14 + vis: str,
15 + outputvis: str,
16 + keepmms: bool,
17 + field: Optional[str],
18 + spw: Optional[str],
19 + scan: Optional[str],
20 + intent: Optional[str],
21 + array: Optional[str],
22 + observation: Optional[str],
23 + datacolumn: Optional[str],
24 + phasecenter: Union[str, dict],
17 25 ):
18 26 """
19 27 Changes the phase center for either short or large
20 28 offsets/angles w.r.t. the original
21 29 """
22 - casalog.origin('phaseshift')
30 + casalog.origin("phaseshift")
23 31
24 32 if len(phasecenter) == 0:
25 - raise ValueError('phasecenter parameter must be specified')
33 + raise ValueError("phasecenter parameter must be specified")
26 34 # Initiate the helper class
27 35 pdh = ParallelDataHelper("phaseshift", locals())
28 36
29 37 # Validate input and output parameters
30 38 try:
31 39 pdh.setupIO()
32 40 except Exception as instance:
33 - casalog.post(str(instance), 'ERROR')
41 + casalog.post(str(instance), "ERROR")
34 42 raise RuntimeError(str(instance))
35 43
36 44 # Input vis is an MMS
37 45 if pdh.isMMSAndNotServer(vis) and keepmms:
38 46 if not pdh.validateInputParams():
39 - raise RuntimeError('Unable to continue with MMS processing')
47 + raise RuntimeError("Unable to continue with MMS processing")
40 48
41 - pdh.setupCluster('phaseshift')
49 + pdh.setupCluster("phaseshift")
42 50
43 51 # Execute the jobs
44 52 try:
45 53 pdh.go()
46 54 except Exception as instance:
47 - casalog.post(str(instance), 'ERROR')
55 + casalog.post(str(instance), "ERROR")
48 56 raise RuntimeError(str(instance))
49 57 return
50 58
51 -
52 59 # Actual task code starts here
53 60 # Gather all the parameters in a dictionary.
54 61 config = {}
55 62
56 63 config = pdh.setupParameters(
57 - inputms=vis, outputms=outputvis, field=field,
58 - spw=spw, array=array, scan=scan, intent=intent,
59 - observation=observation
64 + inputms=vis,
65 + outputms=outputvis,
66 + field=field,
67 + spw=spw,
68 + array=array,
69 + scan=scan,
70 + intent=intent,
71 + observation=observation,
60 72 )
61 73
62 -
63 74 colnames = _get_col_names(vis)
64 75 # Check if CORRECTED column exists, when requested
65 76 datacolumn = datacolumn.upper()
66 - if datacolumn == 'CORRECTED':
67 - if 'CORRECTED_DATA' not in colnames:
77 + if datacolumn == "CORRECTED":
78 + if "CORRECTED_DATA" not in colnames:
68 79 casalog.post(
69 - 'Input data column CORRECTED_DATA does not exist. Will use DATA',
70 - 'WARN'
80 + "Input data column CORRECTED_DATA does not exist. Will use DATA", "WARN"
71 81 )
72 - datacolumn = 'DATA'
73 -
74 - casalog.post('Will use datacolumn = ' + datacolumn, 'DEBUG')
75 - config['datacolumn'] = datacolumn
82 + datacolumn = "DATA"
76 83
84 + casalog.post("Will use datacolumn = " + datacolumn, "DEBUG")
85 + config["datacolumn"] = datacolumn
77 86
78 87 # Call MSTransform framework with tviphaseshift=True
79 - config['tviphaseshift'] = True
80 - config['reindex'] = False
81 - tviphaseshift_config = {'phasecenter': phasecenter}
82 - config['tviphaseshiftlib'] = tviphaseshift_config
88 + config["tviphaseshift"] = True
89 + config["reindex"] = False
90 + tviphaseshift_config = {"phasecenter": phasecenter}
91 + config["tviphaseshiftlib"] = tviphaseshift_config
83 92
84 93 # Configure the tool
85 - casalog.post(str(config), 'DEBUG1')
94 + casalog.post(str(config), "DEBUG1")
86 95
87 96 mtlocal = mstransformer()
88 97 try:
89 98 mtlocal.config(config)
90 99
91 100 # Open the MS, select the data and configure the output
92 101 mtlocal.open()
93 102
94 103 # Run the tool
95 - casalog.post('Shift phase center')
104 + casalog.post("Shift phase center")
96 105 mtlocal.run()
97 106 finally:
98 107 mtlocal.done()
99 108
100 109 # Write history to output MS, not the input ms.
101 110 try:
102 111 mslocal = ms()
103 - param_names = phaseshift.__code__.co_varnames[
104 - :phaseshift.__code__.co_argcount
105 - ]
112 + param_names = phaseshift.__code__.co_varnames[: phaseshift.__code__.co_argcount]
106 113 vars = locals()
107 114 param_vals = [vars[p] for p in param_names]
108 - casalog.post('Updating the history in the output', 'DEBUG1')
115 + casalog.post("Updating the history in the output", "DEBUG1")
109 116 write_history(
110 - mslocal, outputvis, 'phaseshift', param_names,
111 - param_vals, casalog
117 + mslocal, outputvis, "phaseshift", param_names, param_vals, casalog
112 118 )
113 119 except Exception as instance:
114 - casalog.post(f"*** Error {instance} updating HISTORY", 'WARN')
120 + casalog.post(f"*** Error {instance} updating HISTORY", "WARN")
115 121 raise RuntimeError(str(instance))
116 122 finally:
117 123 mslocal.done()
118 124
119 - casalog.post('Updating the FIELD subtable of the output MeasurementSet with shifted'
120 - ' phase centers', 'INFO')
125 + casalog.post(
126 + "Updating the FIELD subtable of the output MeasurementSet with shifted"
127 + " phase centers",
128 + "INFO",
129 + )
121 130 _update_field_subtable(outputvis, field, phasecenter)
122 131
123 132
124 133 def _get_col_names(vis: str) -> np.ndarray:
125 134 tblocal = table()
126 135 try:
127 136 tblocal.open(vis)
128 137 colnames = tblocal.colnames()
129 138 finally:
130 139 tblocal.done()
131 140 return colnames
132 141
133 142
134 143 def _update_field_subtable(outputvis: str, field: str, phasecenter: Union[str, dict]):
135 - """ Update MS/FIELD subtable with shifted center(s). """
144 + """Update MS/FIELD subtable with shifted center(s)."""
136 145 try:
137 146 tblocal = table()
138 147 # modify FIELD table
139 - tblocal.open(outputvis + '/FIELD', nomodify=False)
140 - pcol = tblocal.getcol('PHASE_DIR')
148 + tblocal.open(outputvis + "/FIELD", nomodify=False)
149 + pcol = tblocal.getcol("PHASE_DIR")
141 150
142 151 if isinstance(phasecenter, str):
143 152 thenewra_rad, thenewdec_rad = _convert_to_ra_dec_j2000(phasecenter)
144 153 if field:
145 154 try:
146 155 field_id = int(field)
147 156 except ValueError as exc:
148 - fnames = tblocal.getcol('NAME')
157 + fnames = tblocal.getcol("NAME")
149 158 field_id = np.where(fnames == field)[0][0]
150 159 pcol[0][0][field_id] = thenewra_rad
151 160 pcol[1][0][field_id] = thenewdec_rad
152 161 else:
153 162 for row in range(0, tblocal.nrows()):
154 163 pcol[0][0][row] = thenewra_rad
155 164 pcol[1][0][row] = thenewdec_rad
156 165
157 166 elif isinstance(phasecenter, dict):
158 167 for field_id, field_center in phasecenter.items():
159 168 thenewra_rad, thenewdec_rad = _convert_to_ra_dec_j2000(field_center)
160 169 field_iidx = int(field_id)
161 170 pcol[0][0][field_iidx] = thenewra_rad
162 171 pcol[1][0][field_iidx] = thenewdec_rad
163 172
164 - tblocal.putcol('PHASE_DIR', pcol)
173 + tblocal.putcol("PHASE_DIR", pcol)
165 174
166 175 except Exception as instance:
167 - casalog.post(
168 - "*** Error \'%s\' updating FIELD subtable" + str(instance),
169 - 'WARN')
176 + casalog.post("*** Error '%s' updating FIELD subtable" + str(instance), "WARN")
170 177 raise RuntimeError(str(instance))
171 178 finally:
172 179 tblocal.done()
173 180
174 181
175 182 def _convert_to_ra_dec_j2000(phasecenter: str) -> tuple[float, float]:
176 - """ Parse phase center string to obtain ra/dec (in rad) """
177 - dirstr = phasecenter.split(' ')
183 + """Parse phase center string to obtain ra/dec (in rad)"""
184 + dirstr = phasecenter.split(" ")
178 185 try:
179 186 melocal = me()
180 187 thedir = melocal.direction(dirstr[0], dirstr[1], dirstr[2])
181 188 if not thedir:
182 - raise RuntimeError(f"measures.direction() failed for phasecenter string:"
183 - f" {phasecenter}")
184 - if (dirstr[0] != 'J2000'):
189 + raise RuntimeError(
190 + f"measures.direction() failed for phasecenter string:" f" {phasecenter}"
191 + )
192 + if dirstr[0] != "J2000":
185 193 # Convert to J2000
186 - thedir = melocal.measure(thedir, 'J2000')
187 - thenewra_rad = thedir['m0']['value']
188 - thenewdec_rad = thedir['m1']['value']
194 + thedir = melocal.measure(thedir, "J2000")
195 + thenewra_rad = thedir["m0"]["value"]
196 + thenewdec_rad = thedir["m1"]["value"]
189 197 except Exception as instance:
190 198 casalog.post(
191 - "*** Error " + str(instance)
192 - + " when interpreting parameter \'phasecenter\': ",
193 - 'SEVERE'
199 + "*** Error "
200 + + str(instance)
201 + + " when interpreting parameter 'phasecenter': ",
202 + "SEVERE",
194 203 )
195 204 raise RuntimeError(str(instance))
196 205 finally:
197 206 melocal.done()
198 207
199 208 return thenewra_rad, thenewdec_rad

Everything looks good. We'll let you know here if there's anything you should know about.

Add shortcut