from socket import timeout as socket_timeout
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import urlopen
from casatasks import casalog
from casatasks.private.sdutil import (table_manager, table_selector,
from casatools import measures
from casatools import ms as mstool
from casatools import msmetadata, quanta
def gen_factor_via_web_api(vis, spw='*',
                           endpoint='asdm',
                           timeout=180, retry=3, retry_wait_time=5):
    """Generate factors via Jy/K Web API.

    This function will be used by task_gencal.

    Arguments:
        vis {str} -- The file path of the visibility data.
        spw {str} -- Spectral windows.
        endpoint {str} -- The endpoint of Jy/K DB Web API to access. Options are
            'asdm' (default), 'model-fit', 'interpolation'.
        timeout {int} -- Maximum waiting time [sec] for the Web API access, defaults
            to 180 sec.
        retry {int} -- Number of retry when the Web API access fails, defaults to 3
            times.
        retry_wait_time {int} -- Waiting time [sec] until next query when the Web API
            access fails, defaults to 5 sec.

    Returns:
        The value returned by __factor_creator_via_jy_per_k_db for the
        selected endpoint.

    Raises:
        AssertionError -- If endpoint is not one of the supported options.
    """
    # Validate before the factory lookup so that an unsupported endpoint is
    # reported with an explicit message rather than a lookup failure.
    assert endpoint in ['asdm', 'model-fit', 'interpolation'], \
        'The JyPerKDatabaseClient class requires one of endpoint: asdm, model-fit or interpolation'

    return __factor_creator_via_jy_per_k_db(endpoint=endpoint, vis=vis, spw=spw,
                                            factory=__jyperk_factory[endpoint],
                                            timeout=timeout, retry=retry,
                                            retry_wait_time=retry_wait_time)
def __factor_creator_via_jy_per_k_db(endpoint='', vis=None, spw='*',
                                     factory=None,
                                     timeout=180, retry=3, retry_wait_time=5):
    """Query the Jy/K DB Web API and translate the responses into factors.

    Arguments:
        endpoint {str} -- The endpoint name handed to JyPerKDatabaseClient.
        vis {str} -- The file path of the visibility data.
        spw {str} -- Spectral windows.
        factory {sequence} -- A two-item sequence. Item 0 must provide
            get_params(vis, spw=...) and item 1 must provide
            convert(responses, vis, spw=...).
        timeout {int} -- Passed through to JyPerKDatabaseClient.
        retry {int} -- Passed through to JyPerKDatabaseClient.
        retry_wait_time {int} -- Passed through to JyPerKDatabaseClient.

    Returns:
        The value returned by the response translator's convert().
    """
    params_generator = factory[0]
    response_translator = factory[1]

    # Build the query parameters for this dataset.
    params = params_generator.get_params(vis, spw=spw)

    # Issue the queries through a client configured with the retry policy.
    client = JyPerKDatabaseClient(endpoint,
                                  timeout=timeout, retry=retry,
                                  retry_wait_time=retry_wait_time)
    manager = RequestsManager(client)
    resps = manager.get(params)

    return response_translator.convert(resps, vis, spw=spw)
# Lightweight records that keep a 'subparam' value attached to a query
# parameter set and, later, to the response obtained for it.
QueryStruct = collections.namedtuple('QueryStruct', 'param subparam')
ResponseStruct = collections.namedtuple('ResponseStruct', 'response subparam')
class ASDMParamsGenerator():
"""A class to generate required parameters for Jy/K Web API with asdm.
vis = "./uid___A002_X85c183_X36f.ms"
params = ASDMParamsGenerator.get_params(vis)
def get_params(cls, vis, spw=None):
"""Generate required parameters for Jy/K Web API.
vis {str} -- The file path of the visibility data.
spw {str} -- This parameter is not used. It is provided to align the
calling method with other classes (InterpolationParamsGenerator and