//# FlagDataHandler.h: This file contains the interface definition of the FlagDataHandler class. //# //# CASA - Common Astronomy Software Applications (http://casa.nrao.edu/) //# Copyright (C) Associated Universities, Inc. Washington DC, USA 2011, All rights reserved. //# Copyright (C) European Southern Observatory, 2011, All rights reserved. //# //# This library is free software; you can redistribute it and/or //# modify it under the terms of the GNU Lesser General Public //# License as published by the Free software Foundation; either //# version 2.1 of the License, or (at your option) any later version. //# //# This library is distributed in the hope that it will be useful, //# but WITHOUT ANY WARRANTY, without even the implied warranty of //# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU //# Lesser General Public License for more details. //# //# You should have received a copy of the GNU Lesser General Public //# License along with this library; if not, write to the Free Software //# Foundation, Inc., 59 Temple Place, Suite 330, Boston, //# MA 02111-1307 USA //# $Id: $ #ifndef FLAGDATAHANDLER_H_ #define FLAGDATAHANDLER_H_ // Measurement Set selection #include <ms/MeasurementSets/MeasurementSet.h> #include <ms/MSSel/MSSelection.h> #include <ms/MeasurementSets/MSAntennaColumns.h> #include <ms/MeasurementSets/MSFieldColumns.h> #include <ms/MeasurementSets/MSPolColumns.h> #include <ms/MeasurementSets/MSSpWindowColumns.h> #include <ms/MeasurementSets/MSProcessorColumns.h> // VI/VB infrastructure #include <msvis/MSVis/StokesVector.h> #include <msvis/MSVis/VisBuffer2.h> #include <msvis/MSVis/VisibilityIterator2.h> // TVI framework #include <msvis/MSVis/AveragingVi2Factory.h> #include <msvis/MSVis/AveragingTvi2.h> // .casarc interface #include <casa/System/AipsrcValue.h> // Records interface #include <casa/Containers/Record.h> // System utilities (for profiling macros) #include <casa/OS/HostInfo.h> #include <sys/time.h> // casacore::Data mapping 
#include <algorithm> #include <map> #define STARTCLOCK timeval start,stop; double elapsedTime; if (profiling_p) gettimeofday(&start,0); #define STOPCLOCK if (profiling_p) \ {\ gettimeofday(&stop,0);\ elapsedTime = (stop.tv_sec-start.tv_sec)*1000.0+(stop.tv_usec-start.tv_usec)/1000.0;\ *logger_p << casacore::LogIO::DEBUG2 << "FlagDataHandler::" << __FUNCTION__ << " Executed in: " << elapsedTime << " ms, Memory free: " << casacore::HostInfo::memoryFree( )/1024.0 << " MB" << casacore::LogIO::POST;\ } namespace casa { //# NAMESPACE CASA - BEGIN // Type definitions typedef std::map< std::pair<casacore::Int,casacore::Int>,std::vector<casacore::uInt> >::iterator antennaPairMapIterator; typedef std::map< casacore::Double,std::vector<casacore::uInt> >::iterator subIntegrationMapIterator; typedef std::map< casacore::uShort,casacore::uShort >::iterator polartizationMapIterator; typedef std::map< std::pair<casacore::Int,casacore::Int>,std::vector<casacore::uInt> > antennaPairMap; typedef std::map< casacore::Double,std::vector<casacore::uInt> > subIntegrationMap; typedef std::map< casacore::uShort,casacore::uShort > polarizationMap; typedef std::map< casacore::uInt,casacore::String > polarizationIndexMap; typedef std::vector< vector<casacore::Double> > antennaPointingMap; typedef std::map< casacore::Int,vector<casacore::Double> > scanStartStopMap; typedef std::map< casacore::Int,casacore::Double > lambdaMap; const casacore::Complex ImaginaryUnit = casacore::Complex(0,1); // We need to have the CubeView definition here because its type is used by FlagDataHandler class template<class T> class CubeView { public: CubeView(casacore::Cube<T> *parentCube, std::vector<casacore::uInt> *rows = NULL, std::vector<casacore::uInt> *channels = NULL, std::vector<casacore::uInt> *polarizations = NULL) { parentCube_p = parentCube; casacore::IPosition baseCubeShape = parentCube_p->shape(); reducedLength_p = casacore::IPosition(3); if (((polarizations != NULL) and (polarizations->size() > 0)) and 
((channels != NULL) and (channels->size() > 0)) and ((rows != NULL) and (rows->size() > 0))) { access_p = &CubeView::accessMapped; } else if (((polarizations != NULL) and (polarizations->size() > 0)) and ((channels != NULL) and (channels->size() > 0))) { access_p = &CubeView::accessIndex12Mapped; } else if (((polarizations != NULL) and (polarizations->size() > 0)) and ((rows != NULL) and (rows->size() > 0))) { access_p = &CubeView::accessIndex13Mapped; } else if (((channels != NULL) and (channels->size() > 0)) and ((rows != NULL) and (rows->size() > 0))) { access_p = &CubeView::accessIndex23Mapped; } else if ((polarizations != NULL) and (polarizations->size() > 0)) { access_p = &CubeView::accessIndex1Mapped; } else if ((channels != NULL) and (channels->size() > 0)) { access_p = &CubeView::accessIndex2Mapped; } else if ((rows != NULL) and (rows->size() > 0)) { access_p = &CubeView::accessIndex3Mapped; } else { access_p = &CubeView::accessUnmapped; } if ((polarizations != NULL) and (polarizations->size() > 0)) { polarizations_p = polarizations; reducedLength_p(0) = polarizations_p->size(); } else { polarizations_p = NULL; reducedLength_p(0) = baseCubeShape(0); } if ((channels != NULL) and (channels->size() > 0)) { channels_p = channels; reducedLength_p(1) = channels_p->size(); } else { channels_p = NULL; reducedLength_p(1) = baseCubeShape(1); } if ((rows != NULL) and (rows->size() > 0)) { rows_p = rows; reducedLength_p(2) = rows_p->size(); } else { rows_p = NULL; reducedLength_p(2) = baseCubeShape(2); } } T &operator()(casacore::uInt i1, casacore::uInt i2, casacore::uInt i3) { return (*this.*access_p)(i1,i2,i3); } const casacore::IPosition &shape() const { return reducedLength_p; } void shape(casacore::Int &s1, casacore::Int &s2, casacore::Int &s3) const { s1 = reducedLength_p(0); s2 = reducedLength_p(1); s3 = reducedLength_p(2); return; } protected: vector<casacore::uInt> *createIndex(casacore::uInt size) { vector<casacore::uInt> *index = new 
vector<casacore::uInt>(size); index->clear(); for (casacore::uInt i=0; i<size; i++ ) { index->push_back(i); } return index; } T &accessUnmapped(casacore::uInt i1, casacore::uInt i2, casacore::uInt i3) { return parentCube_p->at(i1,i2,i3); } T &accessMapped(casacore::uInt i1, casacore::uInt i2, casacore::uInt i3) { casacore::uInt i1_index = polarizations_p->at(i1); casacore::uInt i2_index = channels_p->at(i2); casacore::uInt i3_index = rows_p->at(i3); return parentCube_p->at(i1_index,i2_index,i3_index); } T &accessIndex1Mapped(casacore::uInt i1, casacore::uInt i2, casacore::uInt i3) { casacore::uInt i1_index = polarizations_p->at(i1); return parentCube_p->at(i1_index,i2,i3); } T &accessIndex2Mapped(casacore::uInt i1, casacore::uInt i2, casacore::uInt i3) { casacore::uInt i2_index = channels_p->at(i2); return parentCube_p->at(i1,i2_index,i3); } T &accessIndex3Mapped(casacore::uInt i1, casacore::uInt i2, casacore::uInt i3) { casacore::uInt i3_index = rows_p->at(i3); return parentCube_p->at(i1,i2,i3_index); } T &accessIndex12Mapped(casacore::uInt i1, casacore::uInt i2, casacore::uInt i3) { casacore::uInt i1_index = polarizations_p->at(i1); casacore::uInt i2_index = channels_p->at(i2); return parentCube_p->at(i1_index,i2_index,i3); } T &accessIndex13Mapped(casacore::uInt i1, casacore::uInt i2, casacore::uInt i3) { casacore::uInt i1_index = polarizations_p->at(i1); casacore::uInt i3_index = rows_p->at(i3); return parentCube_p->at(i1_index,i2,i3_index); } T &accessIndex23Mapped(casacore::uInt i1, casacore::uInt i2, casacore::uInt i3) { casacore::uInt i2_index = channels_p->at(i2); casacore::uInt i3_index = rows_p->at(i3); return parentCube_p->at(i1,i2_index,i3_index); } private: casacore::Cube<T> *parentCube_p; std::vector<casacore::uInt> *rows_p; std::vector<casacore::uInt> *channels_p; std::vector<casacore::uInt> *polarizations_p; casacore::IPosition reducedLength_p; T &(casa::CubeView<T>::*access_p)(casacore::uInt,casacore::uInt,casacore::uInt); }; template<class T> 
class VectorView { public: VectorView(casacore::Vector<T> *parentVector, std::vector<casacore::uInt> *rows = NULL) { casacore::IPosition parentVectorShape = parentVector->shape(); parentVector_p = parentVector; reducedLength_p = casacore::IPosition(1); if ((rows != NULL) and (rows->size() > 0)) { access_p = &VectorView::accessMapped; } else { access_p = &VectorView::accessUnmapped; } if ((rows != NULL) and (rows->size() > 0)) { rows_p = rows; reducedLength_p(0) = rows_p->size(); } else { rows_p = NULL; reducedLength_p(0) = parentVectorShape(0); } } T &operator()(casacore::uInt i1) { return (*this.*access_p)(i1); } const casacore::IPosition &shape() const { return reducedLength_p; } void shape(casacore::Int &s1) const { s1 = reducedLength_p(0); return; } protected: vector<casacore::uInt> *createIndex(casacore::uInt size) { vector<casacore::uInt> *index = new vector<casacore::uInt>(size); index->clear(); for (casacore::uInt i=0; i<size; i++ ) { index->push_back(i); } return index; } T &accessUnmapped(casacore::uInt i1) { return parentVector_p->operator()(i1); } T &accessMapped(casacore::uInt i1) { casacore::uInt i1_index = rows_p->at(i1); return parentVector_p->operator()(i1_index); } private: casacore::Vector<T> *parentVector_p; std::vector<casacore::uInt> *rows_p; casacore::IPosition reducedLength_p; T &(casa::VectorView<T>::*access_p)(casacore::uInt); }; class VisMapper { typedef casacore::Complex (casa::VisMapper::*corrProduct)(casacore::uInt,casacore::uInt); public: enum calsolutions { CALSOL1=casacore::Stokes::NumberOfTypes, CALSOL2, CALSOL3, CALSOL4 }; VisMapper(casacore::String expression,polarizationMap *polMap,CubeView<casacore::Complex> *leftVis,CubeView<casacore::Complex> *rightVis=NULL); VisMapper(casacore::String expression,polarizationMap *polMap); ~VisMapper(); void setParentCubes(CubeView<casacore::Complex> *leftVis,CubeView<casacore::Complex> *rightVis=NULL); vector< vector<casacore::uInt> > getSelectedCorrelations() { return 
selectedCorrelations_p;} vector< string > getSelectedCorrelationStrings() { return selectedCorrelationStrings_p;} casacore::Float operator()(casacore::uInt chan, casacore::uInt row); casacore::Float operator()(casacore::uInt pol, casacore::uInt chan, casacore::uInt row); // Direct access to the complex correlation product casacore::Complex correlationProduct(casacore::uInt pol, casacore::uInt chan, casacore::uInt row); // NOTE: reducedLength_p is defined as [chan,row,pol] const casacore::IPosition &shape() const { return reducedLength_p; } void shape(casacore::Int &chan, casacore::Int &row) const { chan = reducedLength_p(0); row = reducedLength_p(1); return; } void shape(casacore::Int &pol, casacore::Int &chan, casacore::Int &row) const { chan = reducedLength_p(0); row = reducedLength_p(1); pol = reducedLength_p(2); return; } protected: void setExpressionMapping(casacore::String expression,polarizationMap *polMap); casacore::Float real(casacore::Complex val) {return val.real();} casacore::Float imag(casacore::Complex val) {return val.imag();} casacore::Float abs(casacore::Complex val) {return std::abs(val);} casacore::Float arg(casacore::Complex val) {return std::arg(val);} casacore::Float norm(casacore::Complex val) {return std::norm(val);} casacore::Complex leftVis(casacore::uInt pol, casacore::uInt chan, casacore::uInt row); casacore::Complex diffVis(casacore::uInt pol, casacore::uInt chan, casacore::uInt row); casacore::Complex stokes_i(casacore::uInt pol, casacore::uInt chan); casacore::Complex stokes_q(casacore::uInt pol, casacore::uInt chan); casacore::Complex stokes_u(casacore::uInt pol, casacore::uInt chan); casacore::Complex stokes_v(casacore::uInt pol, casacore::uInt chan); casacore::Complex linear_xx(casacore::uInt pol, casacore::uInt chan); casacore::Complex linear_yy(casacore::uInt pol, casacore::uInt chan); casacore::Complex linear_xy(casacore::uInt pol, casacore::uInt chan); casacore::Complex linear_yx(casacore::uInt pol, casacore::uInt chan); 
casacore::Complex circular_rr(casacore::uInt pol, casacore::uInt chan); casacore::Complex circular_ll(casacore::uInt pol, casacore::uInt chan); casacore::Complex circular_rl(casacore::uInt pol, casacore::uInt chan); casacore::Complex circular_lr(casacore::uInt pol, casacore::uInt chan); casacore::Complex stokes_i_from_linear(casacore::uInt chan, casacore::uInt row); casacore::Complex stokes_q_from_linear(casacore::uInt chan, casacore::uInt row); casacore::Complex stokes_u_from_linear(casacore::uInt chan, casacore::uInt row); casacore::Complex stokes_v_from_linear(casacore::uInt chan, casacore::uInt row); casacore::Complex stokes_i_from_circular(casacore::uInt chan, casacore::uInt row); casacore::Complex stokes_q_from_circular(casacore::uInt chan, casacore::uInt row); casacore::Complex stokes_u_from_circular(casacore::uInt chan, casacore::uInt row); casacore::Complex stokes_v_from_circular(casacore::uInt chan, casacore::uInt row); casacore::Complex calsol1(casacore::uInt chan, casacore::uInt row); casacore::Complex calsol2(casacore::uInt chan, casacore::uInt row); casacore::Complex calsol3(casacore::uInt chan, casacore::uInt row); casacore::Complex calsol4(casacore::uInt chan, casacore::uInt row); private: casacore::Float (casa::VisMapper::*applyVisExpr_p)(casacore::Complex); casacore::Complex (casa::VisMapper::*getVis_p)(casacore::uInt,casacore::uInt,casacore::uInt); casacore::Complex (casa::VisMapper::*getCorr_p)(casacore::uInt,casacore::uInt); vector<corrProduct> selectedCorrelationProducts_p; vector< vector<casacore::uInt> > selectedCorrelations_p; vector<string> selectedCorrelationStrings_p; CubeView<casacore::Complex> *leftVis_p; CubeView<casacore::Complex> *rightVis_p; casacore::IPosition reducedLength_p; polarizationMap *polMap_p; casacore::String expression_p; }; class FlagMapper { public: FlagMapper(casacore::Bool flag, vector < vector<casacore::uInt> > selectedCorrelations, CubeView<casacore::Bool> *commonFlagsView, CubeView<casacore::Bool> 
*originalFlagsView, CubeView<casacore::Bool> *privateFlagsView=NULL, VectorView<casacore::Bool> *commonFlagRowView=NULL, VectorView<casacore::Bool> *originalFlagRowView=NULL, VectorView<casacore::Bool> *privateFlagRowView=NULL); FlagMapper(casacore::Bool flag,vector< vector<casacore::uInt> > selectedCorrelations); ~FlagMapper(); void setParentCubes(CubeView<casacore::Bool> *commonFlagsView,CubeView<casacore::Bool> *originalFlagsView,CubeView<casacore::Bool> *privateFlagsView=NULL); void setParentFlagRow(VectorView<casacore::Bool> *commonFlagRowView,VectorView<casacore::Bool> *originalFlagRowView,VectorView<casacore::Bool> *privateFlagRowView=NULL); void applyFlag(casacore::uInt chan, casacore::uInt row); void applyFlag(casacore::uInt pol, casacore::uInt channel, casacore::uInt row); void applyFlagRow(casacore::uInt row); void applyFlagInRow(casacore::uInt row); casacore::Bool getOriginalFlags(casacore::uInt chan, casacore::uInt row); casacore::Bool getModifiedFlags(casacore::uInt chan, casacore::uInt row); casacore::Bool getPrivateFlags(casacore::uInt chan, casacore::uInt row); casacore::Bool getOriginalFlags(casacore::uInt pol, casacore::uInt channel, casacore::uInt row); casacore::Bool getModifiedFlags(casacore::uInt pol, casacore::uInt channel, casacore::uInt row); casacore::Bool getPrivateFlags(casacore::uInt pol, casacore::uInt channel, casacore::uInt row); // These methods are needed for flag extension void setModifiedFlags(casacore::uInt pol, casacore::uInt channel, casacore::uInt row); void setPrivateFlags(casacore::uInt pol, casacore::uInt channel, casacore::uInt row); casacore::Bool getOriginalFlagRow(casacore::uInt row); casacore::Bool getModifiedFlagRow(casacore::uInt row); casacore::Bool getPrivateFlagRow(casacore::uInt row); const casacore::IPosition &shape() const { return reducedLength_p; } void shape(casacore::Int &chan, casacore::Int &row) const { chan = reducedLength_p(0); row = reducedLength_p(1); return; } void shape(casacore::Int &pol, 
casacore::Int &chan, casacore::Int &row) const { chan = reducedLength_p(0); row = reducedLength_p(1); pol = reducedLength_p(2); return; } vector< vector<casacore::uInt> > getSelectedCorrelations() {return selectedCorrelations_p;} void activateCheckMode() {applyFlag_p = &FlagMapper::checkCommonFlags;} casacore::uInt nSelectedCorrelations() {return nSelectedCorrelations_p;} casacore::uInt flagsPerRow() {return flagsPerRow_p;} protected: void setExpressionMapping(vector< vector<casacore::uInt> > selectedCorrelations); // Apply flags to common flag cube void applyCommonFlags(casacore::uInt pol, casacore::uInt channel, casacore::uInt row); // Apply flags to common and private flag cubes void applyPrivateFlags(casacore::uInt pol, casacore::uInt channel, casacore::uInt row); // Apply flags to common and private flag cubes void checkCommonFlags(casacore::uInt pol, casacore::uInt channel, casacore::uInt row); // Apply flags to common flag rows void applyCommonFlagRow(casacore::uInt row); // Apply flags to common and private flag rows void applyPrivateFlagRow(casacore::uInt row); private: casacore::Bool flag_p; casacore::IPosition reducedLength_p; CubeView<casacore::Bool> *commonFlagsView_p; CubeView<casacore::Bool> *originalFlagsView_p; CubeView<casacore::Bool> *privateFlagsView_p; VectorView<casacore::Bool> *commonFlagRowView_p; VectorView<casacore::Bool> *originalFlagRowView_p; VectorView<casacore::Bool> *privateFlagRowView_p; vector< vector<casacore::uInt> > selectedCorrelations_p; casacore::uInt nSelectedCorrelations_p; casacore::uInt flagsPerRow_p; void (casa::FlagMapper::*applyFlag_p)(casacore::uInt,casacore::uInt,casacore::uInt); void (casa::FlagMapper::*applyFlagRow_p)(casacore::uInt); }; // <summary> // A top level class defining the data handling interface for the flagging module // </summary> // // <use visibility=export> // // <prerequisite> // <li> <linkto class="VisBuffer:description">VisBuffer</linkto> // <li> <linkto 
class="FlagMapper:description">FlagMapper</linkto> // <li> <linkto class="VisMapper:description">VisMapper</linkto> // </prerequisite> // // <etymology> // FlagDataHandler stands for generic data handling (i.e. MSs, CalTables, ...) specific to the flagging module // </etymology> // // <synopsis> // // This is a top-level class defining the data handling interface for the flagging module. // There are various methods (virtual) that must be re-implemented by the specific derived // classes (e.g. FlagMSHandler, FlagCalTableHandler). These methods essentially cover: // // - Table access (i.e. open/close/iteration/rw) // // - Table selection (i.e. expression parsing, sub-table selection) // // Additionally there are public non-virtual methods to: // // - Set configuration parameters (pre-sort columns, data selection, time interval, async I/O) // // - Enable and access different kinds of mapping (baselines, correlation products, antenna pointing) // // Also at this top level there are public members which are used by the FlagAgent classes, // so that there is no dependency on the specific implementation classes (e.g. FlagMSHandler, // FlagCalTableHandler), and thus no re-implementation is required at the FlagAgent level. // // - VisBuffer (for accessing the data and meta data columns) // // - Chunk and Table flag counters (for statistics) // // </synopsis> // // <motivation> // The motivation for the FlagDataHandler class is having all the data operations encapsulated // in one single class, with a common interface for all types of tables (MSs, CalTables, SingleDish), // so that no table-type-specific implementation has to be done at the FlagAgent level. // </motivation> // // <example> // <srcblock> // // // The following code sets up a FlagDataHandler with either CalTable or MS implementation and // // iterates through the table applying a clip agent, flushing the flags, and extracting summaries.
// // // IMPORTANT NOTE: // // The order of FlagDataHandler and FlagAgent initialization is critical to have everything right, // // in particular data selection must happen before initializing FlagAgents, and iterator generation // // must be done after initializing FlagAgents, so that each agent can communicate to the FlagDataHandler // // which columns have to be pre-fetched (async i/o or parallel mode), and what mapping options are necessary. // // // NOTE ON ASYNC I/O: // // Asynchronous I/O is only enabled for MS-type tables, but not for CalTables, and it is necessary to switch // // it on before generating the iterators. Something else to take into account is that there are 2 global // // switches at .casarc level which invalidate the application code selection: // // // // VisibilityIterator.async.enabled rules over // // |-> FlagDataHandler.asyncio, which in turn rules over // // |-> FlagDataHandler.enableAsyncIO(true) // // // Identify table type // casacore::Table table(msname_p,casacore::TableLock(TableLock::AutoNoReadLocking)); // casacore::TableInfo& info = table.tableInfo(); // casacore::String type=info.type(); // // // Create derived FlagDataHandler object with corresponding implementation // FlagDataHandler *fdh_p = NULL; // if (type == "Measurement Set") // { // fdh_p = new FlagMSHandler(msname_p, FlagDataHandler::COMPLETE_SCAN_UNMAPPED, timeInterval_p); // } // else // { // fdh_p = new FlagCalTableHandler(msname_p, FlagDataHandler::COMPLETE_SCAN_UNMAPPED, timeInterval_p); // } // // // NOTE: It is also possible to independently set the iteration approach via the setIterationApproach // // method which accepts the following modes, defined in the FlagDataHandler iteration enumeration // // // // COMPLETE_SCAN_MAPPED: // // - Sort by OBSERVATION_ID, ARRAY_ID, SCAN_NUMBER, FIELD_ID, DATA_DESC_ID and TIME // // - Group all time steps together, so that there is no sub-chunk iteration // // - Generate baseline maps (to iterate
through rows with the same antenna1, antenna2) // // - Generate sub-integration maps (to iterate through rows with the same timestamp) // // COMPLETE_SCAN_MAP_SUB_INTEGRATIONS_ONLY: // // - Sort by OBSERVATION_ID, ARRAY_ID, SCAN_NUMBER, FIELD_ID, DATA_DESC_ID and TIME // // - Group all time steps together, so that there is no sub-chunk iteration // // * Don't generate baseline maps // // - Generate sub-integration maps (to iterate through rows with the same timestamp) // // COMPLETE_SCAN_MAP_ANTENNA_PAIRS_ONLY: // // - Sort by OBSERVATION_ID, ARRAY_ID, SCAN_NUMBER, FIELD_ID, DATA_DESC_ID and TIME // // - Group all time steps together, so that there is no sub-chunk iteration // // - Generate baseline maps (to iterate through rows with the same antenna1, antenna2) // // * Don't generate sub-integration maps // // COMPLETE_SCAN_UNMAPPED: // // - Sort by OBSERVATION_ID, ARRAY_ID, SCAN_NUMBER, FIELD_ID, DATA_DESC_ID and TIME // // - Group all time steps together, so that there is no sub-chunk iteration // // * Don't generate baseline maps // // * Don't generate sub-integration maps // // COMBINE_SCANS_MAPPED: // // - Sort by OBSERVATION_ID, ARRAY_ID, FIELD_ID, DATA_DESC_ID and TIME // // - Group all time steps together, so that there is no sub-chunk iteration // // - Generate baseline maps (to iterate through rows with the same antenna1, antenna2) // // - Generate sub-integration maps (to iterate through rows with the same timestamp) // // COMBINE_SCANS_MAP_SUB_INTEGRATIONS_ONLY: // // - Sort by OBSERVATION_ID, ARRAY_ID, FIELD_ID, DATA_DESC_ID and TIME // // - Group all time steps together, so that there is no sub-chunk iteration // // * Don't generate baseline maps // // - Generate sub-integration maps (to iterate through rows with the same timestamp) // // COMBINE_SCANS_MAP_ANTENNA_PAIRS_ONLY: // // - Sort by OBSERVATION_ID, ARRAY_ID, FIELD_ID, DATA_DESC_ID and TIME // // - Group all time steps together, so that
there is no sub-chunk iteration // // - Generate baseline maps (to iterate through rows with the same antenna1, antenna2) // // * Don't generate sub-integration maps // // COMBINE_SCANS_UNMAPPED: // // - Sort by OBSERVATION_ID, ARRAY_ID, FIELD_ID, DATA_DESC_ID and TIME // // - Group all time steps together, so that there is no sub-chunk iteration // // * Don't generate baseline maps // // * Don't generate sub-integration maps // // ANTENNA_PAIR: // // - Sort by OBSERVATION_ID, ARRAY_ID, FIELD_ID, ANTENNA1, ANTENNA2, DATA_DESC_ID and TIME // // - Group all time steps together, so that there is no sub-chunk iteration // // * Don't generate baseline maps (they are not necessary because the chunks have constant ANTENNA1,ANTENNA2) // // * Don't generate sub-integration maps // // SUB_INTEGRATION: // // - Sort by OBSERVATION_ID, ARRAY_ID, SCAN_NUMBER, FIELD_ID, DATA_DESC_ID and TIME // // - Don't group all time steps together, so it is necessary to add an inner sub-chunk iteration loop // // * Don't generate baseline maps (it is not possible because not all the rows corresponding to a given baseline are available) // // * Don't generate sub-integration maps (they are not necessary because the sub-chunks have constant TIME) // // ARRAY_FIELD: // // - Sort by OBSERVATION_ID, ARRAY_ID, FIELD_ID, DATA_DESC_ID and TIME // // - Don't group all time steps together, so it is necessary to add an inner sub-chunk iteration loop // // * Don't generate baseline maps (it is not possible because not all the rows corresponding to a given baseline are available) // // * Don't generate sub-integration maps (they are not necessary because the sub-chunks have constant TIME) // // * NOTE: This is the iteration approach used by the old flagger framework // // // Open table // fdh_p->open(); // // // Pass the data selection to the FlagDataHandler // fdh_p->setDataSelection(dataSelection); // // // Select data (thus creating selected table) //
fdh_p->selectData(); // // // Create flagging agent and list // casacore::Record agentConfig; // agentConfig.define("mode","clip"); // FlagAgentBase *agent = FlagAgentBase::create(fdh_p,agentConfig); // FlagAgentList agentList; // agentList.push_back(agent); // // // Switch on/off async i/o // fdh_p->enableAsyncIO(true); // // // Generate table iterator // fdh_p->generateIterator(); // // // Start Flag Agent // agentList.start(); // // // Iterates over chunks (constant column values) // while (fdh_p->nextChunk()) // { // // Iterates over buffers (time steps) // while (fdh_p->nextBuffer()) // { // // Apply flags // agentList.apply(); // // Flush flag cube // fdh_p->flushFlags(); // } // // // Print end-of-chunk statistics // agentList.chunkSummary(); // } // // // Print total stats from each agent // agentList.msSummary(); // // // Stop Flag Agent // agentList.terminate(); // agentList.join(); // // // Close MS // fdh_p->close(); // // // Clear Flag Agent List // agentList.clear(); // // </srcblock> // </example> // // <example> // <srcblock> // // // The following code shows how the FlagAgent-FlagDataHandler interaction works internally: // // // First of all, at construction time, each agent has to communicate to the FlagDataHandler // // which columns have to be pre-fetched (for async i/o or parallel mode) and what mapping // // options are necessary // // ...for instance in the case of FlagAgentShadow we need Ant1,Ant2,UVW,TimeCentroid and PhaseCenter: // flagDataHandler_p->preLoadColumn(vi::Antenna1); // flagDataHandler_p->preLoadColumn(vi::Antenna2); // flagDataHandler_p->preLoadColumn(vi::uvw); // flagDataHandler_p->preLoadColumn(vi::TimeCentroid); // flagDataHandler_p->preLoadColumn(vi::PhaseCenter); // // // ...and FlagAgentElevation needs to have the antenna pointing information globally available for each chunk: // flagDataHandler_p->setMapAntennaPointing(true); // // // Then, at iteration time, the FlagAgentBase class has access to the VisBuffer held // // in
the FlagDataHandler, in order to retrieve the meta-data columns needed for the // // data selection engine (agent-level row filtering). // // NOTE: The VisBuffer is actually held within an auto-pointer wrapper, // // thus there is an additional get() involved when accessing it. // // if (spwList_p.size()) // { // if (!find(spwList_p,visibilityBuffer_p->get()->spectralWindow())) return false; // } // // // The sorting columns used for the iteration are also accessible to optimize the selection engine: // // (e.g.: If scan is constant check only 1st row) // if ( (scanList_p.size()>0) and (find(flagDataHandler_p->sortOrder_p,casacore::MS::SCAN_NUMBER)==true) ) // { // if (!find(scanList_p,visibilityBuffer_p->get()->scan()[0])) return false; // } // // // Once the chunk/rows are evaluated as eligible for the flagging process // // by the data selection engine, the maps previously booked at construction // // time can be accessed in order to iterate through the data as desired: // // e.g.: Baseline (antenna pairs) iteration // for (myAntennaPairMapIterator=flagDataHandler_p->getAntennaPairMap()->begin(); // myAntennaPairMapIterator != flagDataHandler_p->getAntennaPairMap()->end(); // ++myAntennaPairMapIterator) // { // // NOTE: The following code is also encapsulated in the FlagAgentBase::processAntennaPair(casacore::Int antenna1,casacore::Int antenna2) code // // // From the antenna map we can retrieve the rows corresponding to the baseline defined by the antenna pair // vector<casacore::uInt> baselineRows = (*flagDataHandler_p->getAntennaPairMap())[std::make_pair(antennaPair.first,antennaPair.second)]; // // // These rows can now be inserted in the mapper classes (VisMapper and FlagMapper using the CubeView<T> template class) // VisMapper visibilitiesMap = VisMapper(expression_p,flagDataHandler_p->getPolarizationMap()); // FlagMapper flagsMap = FlagMapper(flag_p,visibilitiesMap.getSelectedCorrelations()); // setVisibilitiesMap(antennaRows,&visibilitiesMap); //
setFlagsMap(antennaRows,&flagsMap); // } // // // Finally, after flagging time, the FlagAgent can communicate to the FlagDataHandler // // that the modified FlagCube has to be flushed to disk, this is a small but very important // // step in order to avoid unnecessary I/O activity when a chunk is not eligible for flagging // // or the auto-flagging algorithms don't produce any flags. // // // If any row was flag, then we have to flush the flagRow // if (flagRow_p) flagDataHandler_p->flushFlagRow_p = true; // // If any flag was raised, then we have to flush the flagCube // if (visBufferFlags_p>0) flagDataHandler_p->flushFlags_p = true; // // </srcblock> // </example> class FlagDataHandler { public: enum iteration { COMPLETE_SCAN_MAPPED=0, COMPLETE_SCAN_MAP_SUB_INTEGRATIONS_ONLY, COMPLETE_SCAN_MAP_ANTENNA_PAIRS_ONLY, COMPLETE_SCAN_UNMAPPED, COMBINE_SCANS_MAPPED, COMBINE_SCANS_MAP_SUB_INTEGRATIONS_ONLY, COMBINE_SCANS_MAP_ANTENNA_PAIRS_ONLY, COMBINE_SCANS_UNMAPPED, ANTENNA_PAIR, SUB_INTEGRATION, ARRAY_FIELD }; enum tableType { MEASUREMENT_SET=0, CALIBRATION_TABLE }; // Default constructor // NOTE: casacore::Time interval 0 groups all time steps together in one chunk. 
FlagDataHandler(string msname, casacore::uShort iterationApproach = SUB_INTEGRATION, casacore::Double timeInterval = 0); // Default destructor virtual ~FlagDataHandler(); // Common casacore::MS/CalTables public interface virtual bool open() {return false;} virtual bool close() {return false;} virtual bool selectData() {return false;} virtual bool generateIterator() {return false;} virtual bool nextChunk() {return false;} virtual bool nextBuffer() {return false;} virtual bool flushFlags() {return false;} virtual casacore::String getTableName() {return casacore::String("none");} virtual bool parseExpression(casacore::MSSelection &/*parser*/) {return true;} virtual bool checkIfColumnExists(casacore::String /*column*/) {return true;} virtual bool summarySignal() {return true;} // Set the iteration approach void setIterationApproach(casacore::uShort iterationApproach); // Set casacore::Data Selection parameters bool setDataSelection(casacore::Record record); // Set time interval (also known as ntime) void setTimeInterval(casacore::Double timeInterval); // Methods to switch on/off async i/o void enableAsyncIO(casacore::Bool enable); // Pre-Load columns (in order to avoid parallelism problems when not using // async i/o, and also to know what columns to pre-fetch in async i/o mode) void preLoadColumn(VisBufferComponent2 column); void preFetchColumns(); // Stop iterating void stopIteration() {stopIteration_p = true;}; // As requested by Urvashi R.V. 
provide access to the original and modified flag cubes casacore::Cube<casacore::Bool> * getModifiedFlagCube() {return &modifiedFlagCube_p;} casacore::Cube<casacore::Bool> * getOriginalFlagCube() {return &originalFlagCube_p;} casacore::Vector<casacore::Bool> * getModifiedFlagRow() {return &modifiedFlagRow_p;} casacore::Vector<casacore::Bool> * getOriginalFlagRow() {return &originalFlagRow_p;} // Functions to switch on/off mapping functions void setMapAntennaPairs(bool activated); void setMapSubIntegrations(bool activated); void setMapPolarizations(bool activated); void setMapAntennaPointing(bool activated); void setScanStartStopMap(bool activated); void setScanStartStopFlaggedMap(bool activated); void setTimeAverageIter(bool activated); void setChanAverageIter(casacore::Vector<casacore::Int> chanbin); // Accessors for the mapping functions antennaPairMap * getAntennaPairMap() {return antennaPairMap_p;} subIntegrationMap * getSubIntegrationMap() {return subIntegrationMap_p;} polarizationMap * getPolarizationMap() {return polarizationMap_p;} polarizationIndexMap * getPolarizationIndexMap() {return polarizationIndexMap_p;} antennaPointingMap * getMapAntennaPointing() {return antennaPointingMap_p;} scanStartStopMap * getMapScanStartStop() {return scanStartStopMap_p;} lambdaMap * getLambdaMap() {return lambdaMap_p;} void setProfiling(casacore::Bool value) {profiling_p=value;} // Get a casacore::Float visCube and return a casacore::Complex one casacore::Cube<casacore::Complex>& weightVisCube(); casacore::Cube<casacore::Complex> weight_spectrum_p; // Make the logger public to that we can use it from FlagAgentBase::create casacore::LogIO *logger_p; // Measurement set section casacore::String tablename_p; casacore::MSSelection *measurementSetSelection_p; casacore::Vector<casacore::String> *antennaNames_p; std::map< string, std::pair<casacore::Int,casacore::Int> > baselineToAnt1Ant2_p; std::map< std::pair<casacore::Int,casacore::Int>, string > Ant1Ant2ToBaseline_p; 
casacore::ROScalarMeasColumn<casacore::MPosition> *antennaPositions_p; casacore::Vector<casacore::Double> *antennaDiameters_p; casacore::Vector<casacore::String> *fieldNames_p; std::vector<casacore::String> *corrProducts_p; // RO Visibility Iterator VisBufferComponents2 *prefetchColumns_p; // Iteration counters casacore::uLong processedRows; casacore::uShort chunkNo; casacore::uShort bufferNo; // FlagDataHanler-FlagAgents interaction bool flushFlags_p; bool flushFlagRow_p; casacore::uInt64 chunkCounts_p; casacore::uInt64 progressCounts_p; casacore::uInt64 msCounts_p; casacore::uShort summaryThreshold_p; bool printChunkSummary_p; casacore::uShort tableTye_p; casacore::Bool loadProcessorTable_p; // PROCESSOR sub-table section casacore::Vector<bool> isCorrelatorType_p; bool processorTableExist_p; // Visibility Buffer // WARNING: The attach mechanism only works with pointers or // referenced variables. Otherwise the VisBuffer is created // and attached, but when it is assigned to the member it is // detached because of the dynamically called destructor vi::VisBuffer2 *visibilityBuffer_p; // Vis buffer characteristics (constant values) bool groupTimeSteps_p; casacore::Block<int> sortOrder_p; // casacore::Time average iterator parameters casacore::Bool enableTimeAvg_p; casacore::Bool enableChanAvg_p; casacore::Double timeAverageBin_p; casacore::Vector <casacore::Int> chanAverageBin_p; casacore::String dataColumnType_p; vi::AveragingOptions timeAvgOptions_p; casacore::Record chanAvgOptions_p; protected: // Common casacore::MS/CalTables private interface virtual void generateAntennaPairMap(); virtual void generateSubIntegrationMap(); virtual void generatePolarizationsMap(); virtual void generateAntennaPointingMap(); virtual void generateScanStartStopMap(); // casacore::Data Selection ranges bool anySelection_p; bool inrowSelection_p; casacore::String arraySelection_p; casacore::String fieldSelection_p; casacore::String scanSelection_p; casacore::String timeSelection_p; 
casacore::String spwSelection_p; casacore::String baselineSelection_p; casacore::String uvwSelection_p; casacore::String polarizationSelection_p; casacore::String scanIntentSelection_p; casacore::String observationSelection_p; // Async I/O stuff bool asyncio_enabled_p; // Pre-Load columns (in order to avoid parallelism problems when not using // async i/o, and also to know what columns to pre-fetch in async i/o mode) vector<VisBufferComponent2> preLoadColumns_p; // Iteration parameters casacore::uShort iterationApproach_p; casacore::Double timeInterval_p; // Slurp flag bool slurp_p; // Iteration initialization parameters bool chunksInitialized_p; bool buffersInitialized_p; bool iteratorGenerated_p; bool stopIteration_p; // Flag Cubes casacore::Cube<casacore::Bool> originalFlagCube_p; casacore::Cube<casacore::Bool> modifiedFlagCube_p; // FlagRows casacore::Vector<casacore::Bool> originalFlagRow_p; casacore::Vector<casacore::Bool> modifiedFlagRow_p; // Mapping members antennaPairMap *antennaPairMap_p; subIntegrationMap *subIntegrationMap_p; polarizationMap *polarizationMap_p; polarizationIndexMap *polarizationIndexMap_p; antennaPointingMap *antennaPointingMap_p; scanStartStopMap *scanStartStopMap_p; lambdaMap *lambdaMap_p; bool mapAntennaPairs_p; bool mapSubIntegrations_p; bool mapPolarizations_p; bool mapAntennaPointing_p; bool mapScanStartStop_p; bool mapScanStartStopFlagged_p; // Stats members bool stats_p; casacore::uLong cubeAccessCounter_p; double cubeAccessTime_p; casacore::uLong cubeAccessCounterTotal_p; double cubeAccessTimeTotal_p; // Profiling bool profiling_p; }; } //# NAMESPACE CASA - END #endif /* FLAGDATAHANDLER_H_ */