Commits

Kazuhiko Shimada authored 143142dbc11
CAS-13612: uses new xml-casa and modified casaxmlutil.py and sdcal.xml for it
No tags

casatasks/src/private/casaxmlutil.py

Modified
1 1 import os
2 2 import functools
3 3 import inspect
4 4 from xml.dom import minidom
5 -import opcode
6 5
7 6 import casatasks
8 7
9 8 # constants for generating converter method
10 9 __FUNCTION = 'override_args'
11 10 __ARGS = '_a'
12 11 __ARGS_DICT = '_d'
13 12 __ARGS_SUPPLIED = '_s'
14 13 __LOGLEVEL_IN_FUNCTION = 'INFO'
15 14
96 95 # Any errors are handled outside the task.
97 96 # however, the implementation below is effectively
98 97 # equivalent to handling it inside the task.
99 98 try:
100 99 funcname = func.__name__
101 100
102 101 # load the function name and arguments which is wrapped the decorator
103 102 # get an object reference to read informantion of argument
104 103 func_ = func.__dict__.get('__wrapped__', func)
105 104
106 - # load user-supplied arguments from current bytecode.
107 - # task calling bytecode (dumped by dis.dis()) is:
108 - #
109 - # [opcode] [opland]
110 - # ...
111 - # LOAD_CONST [n]
112 - # CALL_FUNCTION_KW [m] <- current, pointed by frame.f_lasti
113 - # ...
114 - #
115 - # At the frame next to '__call__' on the call stack,
116 - # bytecode[f_lasti] is the opcode of CALL_FUNCTION_KW executing currently.
117 - # then, bytecode[f_lasti-2] is the opcode of LOAD_CONST,
118 - # bytecode[f_lasti-1] is an index of the constant stack co_consts.
119 - # co_consts[bytecode[f_lasti-1]] points the user-supplied arguments of the task called currently.
120 -
121 105 is_recursive_load = False
122 - next_to_call = args_on_code = False
123 106 for frame in inspect.stack():
124 107 if frame.function == func_.__name__:
125 108 # when the task is called from the same task (ex: sdcal with two calmodes calls itself)
126 109 is_recursive_load = True
127 - if next_to_call:
128 - if hasattr(frame.frame, 'f_code'):
129 - const_stack = frame.frame.f_code.co_consts
130 - bytecodes = frame.frame.f_code.co_code
131 - code_index = frame.frame.f_lasti
132 - if bytecodes[code_index - 2] == opcode.opmap['LOAD_CONST']:
133 - args_on_code = const_stack[bytecodes[code_index - 1]]
134 - casatasks.casalog.post(f'user-supplied arguments:{args_on_code}', 'INFO')
135 - next_to_call = False
136 - if frame.function == '__call__':
137 - # the order of stacktrace should be '__call__' -> '<module>',
138 - # we should load user-supplied arguments from the next to '__call__'
139 - next_to_call = True
140 110
141 111 if is_recursive_load:
142 112 casatasks.casalog.post('recursive task call', 'INFO')
143 113 retval = func(*args, **kwargs)
144 114 else:
145 - arg_keys = func_.__code__.co_varnames
146 - arg_count = func_.__code__.co_argcount
147 - args_ = [''] * arg_count
148 - supplied_args_flags = [False] * arg_count
149 -
150 - # copy all arguments into args_
151 - # Note: all arguments (args, kwargs) defined a task are converted into
152 - # position args by task.__call__() generated from task XML
153 - args_[:len(args)] = args
154 - args_position_dict = {key: i for i, key in enumerate(arg_keys)}
115 +
116 + # generate the converter method
117 + args_, args_position_dict, converter_function_string = __load_xml(funcname)
118 +
119 + for i in range(len(args)):
120 + args_[i] = args[i]
121 + supplied_args_flags = [False] * len(args_position_dict)
122 +
155 123 kwargs_ = dict()
156 124 for k, v in kwargs.items():
157 - if args_position_dict.get(k) is not None and args_position_dict[k] < arg_count:
125 + if args_position_dict.get(k) is not None:
158 126 args_[args_position_dict[k]] = v
127 + supplied_args_flags[args_position_dict[k]] = True
159 128 else:
160 129 kwargs_[k] = v
161 130
162 - if args_on_code and isinstance(args_on_code, tuple):
163 - for k in args_on_code:
164 - i = args_position_dict.get(k, False)
165 - if i is not False:
166 - supplied_args_flags[i] = True
167 -
168 - # generate the converter method
169 - func_ = __generate_constraints_from_xml(funcname)
170 -
171 131 if __DEBUG:
172 - print(func_)
132 + print(converter_function_string)
173 133 pprint(args_position_dict)
174 134 pprint(args_)
175 135
176 - # override args by the converter generated
136 + # override args by the converter generated from xml
177 137 casatasks.casalog.post('loaded constraints from XML', 'INFO')
178 - exec(func_)
138 + exec(converter_function_string)
179 139 exec(f'{__FUNCTION}(args_, args_position_dict, supplied_args_flags)')
180 140
181 141 # execute task
182 142 retval = func(*args_, **kwargs_)
183 143
184 144 except Exception:
185 145 raise
186 146 return retval
187 147 return wrapper
188 148
189 149
190 150 def __get_taskxmlfilepath(task):
191 151 if not isinstance(task, str):
192 152 return False
193 153 xmlpath = os.path.abspath(casatasks.__path__[0]) + '/__xml__'
194 154 taskxmlfile = f'{xmlpath}/{task}.xml'
195 155 if os.path.isfile(taskxmlfile) and os.access(taskxmlfile, os.R_OK):
196 156 return taskxmlfile
197 157 return False
198 158
199 159
200 -def __generate_constraints_from_xml(task):
160 +def __load_xml(task):
201 161 # return False if file loadging faults
202 162 taskxml = __get_taskxmlfilepath(task)
203 163
204 164 stmt = []
205 165 if taskxml:
206 166 dom = minidom.parse(taskxml)
207 167 constraints = dom.getElementsByTagName('constraints')[0]
208 168 for s in constraints.getElementsByTagName('when'):
209 169 __handle_when(s, stmt)
210 - return __convert_stmt_to_pycode(stmt)
170 + args = [__generate_default_value(param) for param in dom.getElementsByTagName('param')]
171 + args_position_dict = {param.getAttribute('name'): i for i, param in enumerate(dom.getElementsByTagName('param'))}
172 + return args, args_position_dict, __convert_stmt_to_pycode(stmt)
173 +
174 +
175 +def __generate_default_value(param):
176 + type_ = param.getAttribute('type')
177 + value_, type_ = __handle_value(param.getElementsByTagName('value')[0], type_)
178 + if type_ == 'int':
179 + return int(value_)
180 + elif type_ == 'double':
181 + return float(value_)
182 + elif type_ == 'bool':
183 + return value_ == 'True'
184 + return value_
211 185
212 186
213 187 def __convert_stmt_to_pycode(stmt_list):
214 188 ret = f'def {__FUNCTION}({__ARGS}, {__ARGS_DICT}, {__ARGS_SUPPLIED}):\n'
215 189 if len(stmt_list) > 0:
216 190 for [stmt, indent] in stmt_list:
217 191 ret += __indent(indent) + stmt + '\n'
218 192 else:
219 193 ret += __indent(1) + 'pass\n'
220 194 return ret

Everything looks good. We'll let you know here if there's anything you should know about.

Add shortcut