from socket import timeout as socket_timeout
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import urlopen
from casatasks import casalog
from casatasks.private.sdutil import (table_manager, table_selector,
from casatools import measures
from casatools import ms as mstool
from casatools import msmetadata, quanta
def gen_factor_via_web_api(vis, spw='*',
                           endpoint='asdm',
                           timeout=180, retry=3, retry_wait_time=5):
    """Generate factors via Jy/K Web API.

    This function is intended to be used by task_gencal.

    Arguments:
        vis {str} -- The file path of the visibility data.
        spw {str} -- Spectral windows, defaults to '*'.
        endpoint {str} -- The endpoint of Jy/K DB Web API to access. Options are
            'asdm' (default), 'model-fit', 'interpolation'.
        timeout {int} -- Maximum waiting time [sec] for the Web API access, defaults
            to 180 sec.
        retry {int} -- Number of retry when the Web API access fails, defaults to 3.
        retry_wait_time {int} -- Waiting time [sec] until next query when the Web API
            access fails, defaults to 5 sec.

    Returns:
        The value produced by __factor_creator_via_jy_per_k_db for the selected
        endpoint, passed through unchanged.

    Raises:
        AssertionError -- If endpoint is not one of the supported options.
    """
    supported_endpoints = ('asdm', 'model-fit', 'interpolation')

    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # stripped under ``python -O``, which would let an unsupported endpoint fall
    # through to the factory lookup below. AssertionError is kept so that the
    # exception type seen by existing callers does not change.
    if endpoint not in supported_endpoints:
        raise AssertionError(
            'The JyPerKDatabaseClient class requires one of endpoint: '
            'asdm, model-fit or interpolation'
        )

    # Each endpoint has its own factory entry in the module-level
    # __jyperk_factory mapping; the selected entry is handed to the creator.
    return __factor_creator_via_jy_per_k_db(endpoint=endpoint, vis=vis, spw=spw,
                                            factory=__jyperk_factory[endpoint],
                                            timeout=timeout, retry=retry,
                                            retry_wait_time=retry_wait_time)
def __factor_creator_via_jy_per_k_db(endpoint='', vis=None, spw='*',
timeout=180, retry=3, retry_wait_time=5):
params_generator = factory[0]
response_translator = factory[1]
params = params_generator.get_params(vis, spw=spw)