Commits

Rui Xue authored 126dbf3ffc1 Merge
Pull request #1335: PIPE-2234/PIPE-2131: selfcal vla cube imaging updates

Merge in PIPE/pipeline from PIPE-2234-selfcal-vla-cube-imaging-updates to main * commit '1d00b7337cb2e9a0d1b2b8ee496838bff69bd6b9': (33 commits) PIPE-1346: fix a minor QA regression bug (unwanted niter-limit warnings) introduced in 9eaf2381eea; decrease maxproductsize to 100 GB in test recipes. PIPE-2275: make the heuristics-skipping conditions mutually exclusive in checkproductsize. PIPE-2234: fix typos from the code review feedback. PIPE-2234: Refactor conditions for skipping CheckProductSize heuristics. PIPE-2242: use the PL2024 hifa_image recipe for the selfcal restoration test. PIPE-2260: suppress the dubious warning message when calling the VLA restfreq heuristics method with specmode='cont'. PIPE-2260: use cube center frequencies in LSRK/SOURCE as tclean rest frequency settings. PIPE-2234: improve the logging messages in a corner case (no suitable datatype). PIPE-2234: replace 'Contline applied' with 'Cont. applied' in the weblog; note that selfcal is applied to CONT_SCIENCE rather than CONTLINE_SCIENCE for VLA. PIPE-1346: do not cont-imaging uvrange heuristics for VLA cube imaging. PIPE-1346: use the imaging base-class imsize heuristics for VLA cube imaging. PIPE-1346: use robust=2.0 for VLA cube imaging. PIPE-1346: add a simple QA scoring rule for VLA cube imaging. PIPE-2258: avoid direct use of pb-mask in selfcal_helpers.estimate_near_field_SNR(). PIPE-2181: decrease maxproductsize to 100 GB. PIPE-2275: remove the explict set for context.vla_skip_mfs_and_cube_imaging. PIPE-2260: explicitly use the spw center frequency as per-spw rest frequency for VLA cube imaging. PIPE-2255: avoid writing a blank cont.dat file if hif_findcont is bypassed in the VLA cube-imaging workflow. PIPE-2275: use the datatype registration info for "stage skipping" in the VLA cube imaging workflow. PIPE-2272: suppress the "Undefined representative bandwidth" warning when running hif_findcont on VLA data. ...
No tags

pipeline/domain/measurementset.py

Modified
6 6 import operator
7 7 import os
8 8 import re
9 9 from typing import TYPE_CHECKING, List, Optional, Tuple, Union
10 10
11 11 import numpy as np
12 12
13 13 import pipeline.infrastructure as infrastructure
14 14 import pipeline.infrastructure.utils as utils
15 15 from pipeline.infrastructure import casa_tools
16 -from pipeline.infrastructure.utils import conversion
17 16
18 17 if TYPE_CHECKING: # Avoid circular import. Used only for type annotation.
19 18 from pipeline.infrastructure.tablereader import RetrieveByIndexContainer
20 19
21 20 from pipeline.infrastructure import logging
22 -from pipeline.infrastructure.utils import range_to_list
23 21
24 22 from . import measures, spectralwindow
25 23 from .antennaarray import AntennaArray
26 24 from .datatype import DataType
27 25
28 26 LOG = infrastructure.get_logger(__name__)
29 27
30 28
31 29 class MeasurementSet(object):
32 30 """
252 250 fields_with_name = frozenset(self.get_fields(task_arg=field))
253 251 pool = [s for s in pool if not fields_with_name.isdisjoint(s.fields)]
254 252
255 253 if spw is not None:
256 254 if not isinstance(spw, collections.Sequence):
257 255 spw = (spw,)
258 256 if isinstance(spw, str):
259 257 if spw in ('', '*'):
260 258 spw = ','.join(str(spw.id) for spw in self.spectral_windows)
261 259 if '~' in spw:
262 - spw = conversion.range_to_list(spw)
260 + spw = utils.range_to_list(spw)
263 261 else:
264 262 spw = spw.split(',')
265 263 spw = {int(i) for i in spw}
266 264 pool = {scan for scan in pool for scan_spw in scan.spws if scan_spw.id in spw}
267 265 pool = sorted(pool, key=lambda s: s.id)
268 266
269 267 return pool
270 268
271 269 def get_data_description(self, spw=None, id=None):
272 270 match = None
391 389 except:
392 390 LOG.log(log_level, 'Could not translate spw name %s to ID. Trying frequency matching heuristics.' %
393 391 self.representative_window)
394 392
395 393 if target_spwid is not None:
396 394 return (target_source_name, target_spwid)
397 395
398 396 # Get the representative bandwidth
399 397 # Return if there isn't one
400 398 if not self.representative_target[2]:
401 - LOG.warning('Undefined representative bandwidth for data set %s' % self.basename)
399 + if self.antenna_array.name not in ('VLA', 'EVLA'):
400 + LOG.warning('Undefined representative bandwidth for data set %s' % self.basename)
402 401 return (target_source_name, None)
402 +
403 403 target_bw = cme.frequency('TOPO',
404 404 qa.quantity(qa.getvalue(self.representative_target[2]),
405 405 qa.getunit(self.representative_target[2])))
406 406
407 407 # Get the representative frequency
408 408 # Return if there isn't one
409 409 if not self.representative_target[1]:
410 410 LOG.warning('Undefined representative frequency for data set %s' % self.basename)
411 411 return (target_source_name, None)
412 412 target_frequency = cme.frequency('BARY',
591 591 if not science_windows_only:
592 592 return spws
593 593
594 594 if self.antenna_array.name == 'ALMA':
595 595 science_intents = {'TARGET', 'PHASE', 'BANDPASS', 'AMPLITUDE',
596 596 'POLARIZATION', 'POLANGLE', 'POLLEAKAGE',
597 597 'CHECK', 'DIFFGAINREF', 'DIFFGAINSRC'}
598 598 return [w for w in spws if w.num_channels not in self.exclude_num_chans
599 599 and not science_intents.isdisjoint(w.intents)]
600 600
601 - if self.antenna_array.name == 'VLA' or self.antenna_array.name == 'EVLA':
601 + if self.antenna_array.name in ('VLA', 'EVLA'):
602 602 science_intents = {'TARGET', 'PHASE', 'BANDPASS', 'AMPLITUDE',
603 603 'POLARIZATION', 'POLANGLE', 'POLLEAKAGE',
604 604 'CHECK'}
605 605 return [w for w in spws if w.num_channels not in self.exclude_num_chans
606 606 and not science_intents.isdisjoint(w.intents) and 'POINTING' not in w.intents]
607 607
608 608 if self.antenna_array.name == 'NRO':
609 609 science_intents = {'TARGET'}
610 610 return [w for w in spws if not science_intents.isdisjoint(w.intents)]
611 611
1359 1359 colnames = table.colnames()
1360 1360 return colnames
1361 1361
1362 1362 def data_colnames(self):
1363 1363 """
1364 1364 Return all data column names for this MS.
1365 1365 """
1366 1366 return [colname for colname in self.all_colnames() if colname in ('DATA', 'FLOAT_DATA', 'CORRECTED_DATA')]
1367 1367
1368 1368 def set_data_column(self, dtype: DataType, column: str,
1369 - source: Optional[str]=None,
1370 - spw: Optional[str]=None,
1371 - overwrite: bool=False):
1369 + source: Optional[str] = None,
1370 + spw: Optional[str] = None,
1371 + overwrite: bool = False):
1372 1372 """
1373 1373 Set data type and column.
1374 1374
1375 1375 Set data type and column to MS domain object and record the available
1376 1376 data types per (source,spw) tuple. If source or spw are unset, they
1377 1377 will be expanded to all available values.
1378 1378
1379 1379 Args:
1380 1380 dtype: data type to set
1381 1381 column: name of column in MS associated with the data type
1393 1393 column is already assigned to a type and would not
1394 1394 be overwritten.
1395 1395 """
1396 1396 # Check existence of the column
1397 1397 colnames = self.data_colnames()
1398 1398 if column not in colnames:
1399 1399 raise ValueError('Column {} does not exist in {}'.format(column, self.basename))
1400 1400
1401 1401 # Check if data type is already associated with another column
1402 1402 if not overwrite and dtype in self.data_column and self.get_data_column(dtype) != column:
1403 - raise ValueError('Data type {} is already associated with column {} in {}'.format(dtype, self.get_data_column(dtype), self.basename))
1403 + raise ValueError('Data type {} is already associated with column {} in {}'.format(
1404 + dtype, self.get_data_column(dtype), self.basename))
1404 1405
1405 1406 # Check if column is already assigned to another data type
1406 1407 if not overwrite and column in self.data_column.values() and self.get_data_column(dtype) != column:
1407 - raise ValueError('Column {} is already associated with data type {} in {}'.format(column, [k for k,v in self.data_column.items() if v == column][0], self.basename))
1408 -
1409 - # Update data types per (source,spw) selection
1410 - if source is None:
1411 - source_names = ','.join(utils.dequote(s.name) for s in self.sources)
1412 - else:
1413 - # Check for empty or blank strings
1414 - if source.strip():
1415 - source_names = ','.join(utils.dequote(s.strip()) for s in source.split(','))
1416 - else:
1417 - source_names = ','.join(utils.dequote(s.name) for s in self.sources)
1408 + raise ValueError('Column {} is already associated with data type {} in {}'.format(
1409 + column, [k for k, v in self.data_column.items() if v == column][0], self.basename))
1418 1410
1419 - if spw is None:
1420 - spw_ids = ','.join(str(s.id) for s in self.spectral_windows)
1421 - else:
1422 - # Check for empty or blank strings
1423 - if spw.strip():
1424 - spw_ids = spw
1425 - else:
1426 - spw_ids = ','.join(str(s.id) for s in self.spectral_windows)
1411 + source_name_list = self._source_select_to_list(source)
1412 + spw_id_list = self._spw_select_to_list(spw)
1427 1413
1428 - for source_name in source_names.split(','):
1429 - for spw_id in map(int, range_to_list(spw_ids)):
1414 + # Update data types per (source,spw) selection
1415 + for source_name in source_name_list:
1416 + for spw_id in spw_id_list:
1430 1417 key = (source_name, spw_id)
1431 1418 if key in self.data_types_per_source_and_spw:
1432 1419 if dtype not in self.data_types_per_source_and_spw[key]:
1433 1420 self.data_types_per_source_and_spw[key].append(dtype)
1434 1421 else:
1435 1422 self.data_types_per_source_and_spw[key] = [dtype]
1436 1423
1437 1424 # Check for existing column registration and remove it
1438 - column_keys = [k for k,v in self.data_column.items() if v == column]
1439 - if column_keys!= []:
1425 + column_keys = [k for k, v in self.data_column.items() if v == column]
1426 + if column_keys != []:
1440 1427 for k in column_keys:
1441 - del(self.data_column[k])
1428 + del (self.data_column[k])
1442 1429
1443 1430 # Update MS domain object
1444 1431 if dtype not in self.data_column:
1445 1432 self.data_column[dtype] = column
1446 1433 LOG.info('Updated data column information of {}. Set {} to column {}'.format(self.basename, dtype, column))
1447 1434
1448 - def get_data_column(self, dtype: DataType, source: Optional[str]=None, spw: Optional[str]=None) -> Optional[str]:
1435 + def get_data_column(self, dtype: DataType, source: Optional[str] = None, spw: Optional[str] = None) -> Optional[str]:
1449 1436 """
1450 1437 Return the column name associated with a DataType in an MS domain object.
1451 1438
1452 1439 Args:
1453 1440 dtype: DataType to fetch column name for
1454 - source: Comma separated list of source names to filter for.
1455 - spw: Comma separated list of real spw IDs to filter for.
1441 + source: source names (comma separated name selection string) to filter for.
1442 + If unset, all sources will be used.
1443 + spw: spectral windows (comma separated real spw ID selection string) to filter for.
1444 + If unset, all real spw IDs will be used.
1456 1445
1457 1446 If source and spw are both unset, the method will just look
1458 1447 at the MS data type and column information. If one or both
1459 1448 parameters are set, it will require all (source,spw)
1460 1449 combinations to have data of the requested data type.
1461 1450
1462 1451 Returns:
1463 1452 A name of column of a dtype. Returns None if dtype is not defined
1464 1453 in the MS.
1465 1454 """
1466 1455 if dtype not in self.data_column.keys():
1467 1456 return None
1468 1457
1469 1458 if source is None and spw is None:
1470 1459 return self.data_column[dtype]
1471 1460
1472 - if source is None:
1473 - source_names = ','.join(utils.dequote(s.name) for s in self.sources)
1474 - else:
1475 - source_names = ','.join(utils.dequote(s.strip()) for s in source.split(','))
1476 -
1477 - if spw is None:
1478 - spw_ids = ','.join(str(s.id) for s in self.spectral_windows)
1479 - else:
1480 - spw_ids = spw
1461 + source_name_list = self._source_select_to_list(source)
1462 + spw_id_list = self._spw_select_to_list(spw)
1481 1463
1482 1464 # Check all (source,spw) combinations
1483 1465 data_exists_for_all_source_spw_combinations = True
1484 - for source_name in source_names.split(','):
1485 - for spw_id in map(int, spw_ids.split(',')):
1466 + for source_name in source_name_list:
1467 + for spw_id in spw_id_list:
1486 1468 key = (source_name, spw_id)
1487 1469 if dtype not in self.data_types_per_source_and_spw.get(key, []):
1488 1470 data_exists_for_all_source_spw_combinations = False
1489 1471
1490 1472 if data_exists_for_all_source_spw_combinations:
1491 1473 return self.data_column[dtype]
1492 1474 else:
1493 1475 return None
1494 1476
1495 - def get_data_type(self, column: str, source: Optional[str]=None, spw: Optional[str]=None) -> Optional[DataType]:
1477 + def get_data_type(self, column: str, source: Optional[str] = None, spw: Optional[str] = None) -> Optional[DataType]:
1496 1478 """
1497 1479 Return the DataType associated with a column in an MS domain object.
1498 1480
1499 1481 Args:
1500 1482 column: name of column in MS
1501 - source: Comma separated list of source names to filter for.
1502 - spw: Comma separated list of real spw IDs to filter for.
1483 + source: source names (comma separated name selection string) to filter for.
1484 + If unset, all sources will be used.
1485 + spw: spectral windows (comma separated real spw ID selection string) to filter for.
1486 + If unset, all real spw IDs will be used.
1503 1487
1504 1488 If source and spw are both unset, the method will just look
1505 1489 at the MS data type and column information. If one or both
1506 1490 parameters are set, it will require all (source,spw)
1507 1491 combinations to have data of the requested data type.
1508 1492
1509 1493 Returns:
1510 1494 The DataType associated with the column name. Returns None
1511 1495 if dtype is not defined in the MS or in the source/spw
1512 1496 selection.
1513 1497 """
1514 1498 if column not in self.data_column.values():
1515 1499 return None
1516 1500
1517 1501 # Invert dictionary. This should not lead to wrong mappings
1518 1502 # because data types and columns have a 1:1 relation.
1519 1503 data_type = {v: k for k, v in self.data_column.items()}
1520 1504
1521 1505 if source is None and spw is None:
1522 1506 return data_type[column]
1523 1507
1524 - if source is None:
1525 - source_names = ','.join(utils.dequote(s.name) for s in self.sources)
1526 - else:
1527 - source_names = ','.join(utils.dequote(s.strip()) for s in source.split(','))
1528 -
1529 - if spw is None:
1530 - spw_ids = ','.join(str(s.id) for s in self.spectral_windows)
1531 - else:
1532 - spw_ids = spw
1508 + source_name_list = self._source_select_to_list(source)
1509 + spw_id_list = self._spw_select_to_list(spw)
1533 1510
1534 1511 # Check all (source,spw) combinations
1535 1512 data_exists_for_all_source_spw_combinations = True
1536 1513 column_dtype = data_type[column]
1537 - for source_name in source_names.split(','):
1538 - for spw_id in map(int, spw_ids.split(',')):
1514 + for source_name in source_name_list:
1515 + for spw_id in spw_id_list:
1539 1516 key = (source_name, spw_id)
1540 1517 if column_dtype not in self.data_types_per_source_and_spw.get(key, []):
1541 1518 data_exists_for_all_source_spw_combinations = False
1542 1519
1543 1520 if data_exists_for_all_source_spw_combinations:
1544 1521 return column_dtype
1545 1522 else:
1546 1523 return None
1524 +
1525 + def _source_select_to_list(self, source_select: Union[str, None]) -> List[str]:
1526 + """
1527 + Convert a CASA-style source selection string to a list of source names.
1528 +
1529 + Args:
1530 + source_select: source string to convert
1531 +
1532 + Returns:
1533 + A list of source names (as strings)
1534 + """
1535 +
1536 + if source_select is None or not source_select.strip():
1537 + # if None or empty or blank selection string, use all sources
1538 + source_list = [utils.dequote(s.name) for s in self.sources]
1539 + else:
1540 + source_list = [utils.dequote(s.strip()) for s in source_select.split(',')]
1541 +
1542 + return source_list
1543 +
1544 + def _spw_select_to_list(self, spw_select: Union[str, None]) -> List[int]:
1545 + """
1546 + Convert a CASA-style spw selection string to a list of spw IDs.
1547 +
1548 + Args:
1549 + spw_select: spw selection string to convert
1550 +
1551 + Returns:
1552 + A list of spw IDs (as integers)
1553 + """
1554 +
1555 + if spw_select is None or not spw_select.strip():
1556 + # if None or empty or blank selection string, use all spws
1557 + spw_list = [s.id for s in self.spectral_windows]
1558 + else:
1559 + spw_list = utils.range_to_list(spw_select)
1560 +
1561 + return spw_list

Everything looks good. We'll let you know here if there's anything you should know about.

Add shortcut