#include "AsynchronousInterface.h" #include "VLAT.h" #include <stdcasa/thread/AsynchronousTools.h> #include <stdcasa/UtilJ.h> #include <casacore/casa/System/AipsrcValue.h> #include <msvis/MSVis/VisBufferAsync.h> #include <msvis/MSVis/VisibilityIteratorImplAsync.h> #include <ostream> #include <utility> using namespace casacore; using namespace casa::async; using namespace casacore; using namespace casa::utilj; using namespace std; #define Log(level, ...) \ {if (AsynchronousInterface::logThis (level)) \ Logger::get()->log (__VA_ARGS__);}; using casa::async::Mutex; using namespace casacore; namespace casa { namespace asyncio { Bool AsynchronousInterface::loggingInitialized_p = false; Int AsynchronousInterface::logLevel_p = -1; AsynchronousInterface::AsynchronousInterface (int maxNBuffers) : lookaheadTerminationRequested_p (false), sweepTerminationRequested_p (false), viResetComplete_p (false), viResetRequested_p (false), vlaData_p (maxNBuffers, mutex_p), vlat_p (NULL), writeQueue_p () {} AsynchronousInterface::~AsynchronousInterface () {} void AsynchronousInterface::addModifier (RoviaModifier * modifier) { Log (1, "AsynchronousInterface::addModifier: {%s}\n", string(*modifier).c_str()); LockGuard lg (mutex_p); roviaModifiers_p.add (modifier); } async::Mutex & AsynchronousInterface::getMutex () const { return mutex_p; } VlaData * AsynchronousInterface::getVlaData () { return & vlaData_p; } VLAT * AsynchronousInterface::getVlat () { return vlat_p; } WriteQueue & AsynchronousInterface::getWriteQueue () { return writeQueue_p; } void AsynchronousInterface::initialize () { initializeLogging (); vlaData_p.initialize (this); writeQueue_p.initialize (this); vlat_p = new VLAT (this); } Bool AsynchronousInterface::initializeLogging () { if (loggingInitialized_p){ return true; } loggingInitialized_p = true; // If the log file variable is defined then start // up the logger const String logFileVariable = "Casa_VIA_LogFile"; const String logLevelVariable = "Casa_VIA_LogLevel"; String logFilename; Bool logFileFound = AipsrcValue<String>::find (logFilename, ROVisibilityIterator::getAsyncRcBase () + ".debug.logFile", ""); if (logFileFound && ! logFilename.empty() && downcase (logFilename) != "null" && downcase (logFilename) != "none"){ Logger::get()->start (logFilename.c_str()); AipsrcValue<Int>::find (logLevel_p, ROVisibilityIterator::getAsyncRcBase () + ".debug.logLevel", 1); Logger::get()->log ("VlaData log-level is %d; async I/O: %s; nBuffers=%d\n", logLevel_p, ROVisibilityIterator::isAsynchronousIoEnabled() ? "enabled" : "disabled", ViReadImplAsync::getDefaultNBuffers() ); return true; } return false; } Bool AsynchronousInterface::isLookaheadTerminationRequested () const { return lookaheadTerminationRequested_p; } Bool AsynchronousInterface::isSweepTerminationRequested () const { return sweepTerminationRequested_p; } Bool AsynchronousInterface::logThis (Int level) { return loggingInitialized_p && level <= logLevel_p; } void AsynchronousInterface::notifyAllInterfaceChanged () const { interfaceDataChanged_p.notify_all(); } void AsynchronousInterface::requestViReset () { // Called by main thread to request that the VI reset to the // start of the MS. UniqueLock uniqueLock (mutex_p); // enter critical section Log (1, "Requesting VI reset\n"); viResetRequested_p = true; // officially request the reset viResetComplete_p = false; // clear any previous completions terminateSweep (); // Wait for the request to be completed. Log (1, "Waiting for requesting VI reset\n"); while (! viResetComplete_p){ interfaceDataChanged_p.wait (uniqueLock); } Log (1, "Notified that VI reset has completed\n"); // The VI was reset } void AsynchronousInterface::terminate () { // Destroy the VLAT vlat_p->terminate(); // request termination vlat_p->join(); // wait for it to terminate delete vlat_p; // free its storage } void AsynchronousInterface::terminateLookahead () { // Called by main thread to stop the VLAT, etc. LockGuard lg (& mutex_p); lookaheadTerminationRequested_p = true; terminateSweep(); } void AsynchronousInterface::terminateSweep () { // Called internally to terminate VI sweeping. sweepTerminationRequested_p = true; // stop filling notifyAllInterfaceChanged(); } RoviaModifiers AsynchronousInterface::transferRoviaModifiers () { return roviaModifiers_p.transferModifiers(); } void AsynchronousInterface::viResetComplete () { ////Assert (mutex_p.isLockedByThisThread()); viResetRequested_p = false; sweepTerminationRequested_p = false; viResetComplete_p = true; notifyAllInterfaceChanged(); } Bool AsynchronousInterface::viResetRequested () { ////Assert (mutex_p.isLockedByThisThread()); return viResetRequested_p; } void AsynchronousInterface::waitForInterfaceChange (async::UniqueLock & uniqueLock) const { interfaceDataChanged_p.wait (uniqueLock); } ChannelSelection::ChannelSelection (const Block< Vector<Int> > & blockNGroup, const Block< Vector<Int> > & blockStart, const Block< Vector<Int> > & blockWidth, const Block< Vector<Int> > & blockIncr, const Block< Vector<Int> > & blockSpw) { blockNGroup_p = blockNGroup; blockStart_p = blockStart; blockWidth_p = blockWidth; blockIncr_p = blockIncr; blockSpw_p = blockSpw; } ChannelSelection::ChannelSelection (const ChannelSelection & other) { * this = other; } ChannelSelection & ChannelSelection::operator= (const ChannelSelection & other) { if (this != & other){ copyBlock (other.blockNGroup_p, blockNGroup_p); copyBlock (other.blockStart_p, blockStart_p); copyBlock (other.blockWidth_p, blockWidth_p); copyBlock (other.blockIncr_p, blockIncr_p); copyBlock (other.blockSpw_p, blockSpw_p); } return * this; } void ChannelSelection::copyBlock (const Block <Vector<Int> > & src, Block <Vector<Int> > & to) const { // Since this is a Block of Vector, we need to wipe out // the original contents of "to"; otherwise the semantics // of Vector::operator= will generate an exception if there // is a difference in length of any of the vector elements. to.resize (0, true); to = src; } void ChannelSelection::get (Block< Vector<Int> > & blockNGroup, Block< Vector<Int> > & blockStart, Block< Vector<Int> > & blockWidth, Block< Vector<Int> > & blockIncr, Block< Vector<Int> > & blockSpw) const { copyBlock (blockNGroup_p, blockNGroup); copyBlock (blockStart_p, blockStart); copyBlock (blockWidth_p, blockWidth); copyBlock (blockIncr_p, blockIncr); copyBlock (blockSpw_p, blockSpw); } std::ostream & operator<< (std::ostream & o, const RoviaModifier & m) { m.print (o); return o; } RoviaModifiers::~RoviaModifiers () { // // Free the objects owned by the vector // // for (Data::iterator i = data_p.begin(); i != data_p.end(); i++){ // delete (* i); // } } void RoviaModifiers::add (RoviaModifier * modifier) { data_p.push_back (modifier); } void RoviaModifiers::apply (ROVisibilityIterator * rovi) { // Free the objects owned by the vector for (Data::iterator i = data_p.begin(); i != data_p.end(); i++){ Log (1, "Applying vi modifier: %s\n", string(** i).c_str()); (* i) -> apply (rovi); } } void RoviaModifiers::clearAndFree () { for (Data::iterator i = data_p.begin(); i != data_p.end(); i++){ delete (* i); } data_p.clear(); } RoviaModifiers RoviaModifiers::transferModifiers () { RoviaModifiers result; result.data_p.assign (data_p.begin(), data_p.end()); data_p.clear(); // remove them from the other object but do not destroy them return result; } SelectChannelModifier::SelectChannelModifier (Int nGroup, Int start, Int width, Int increment, Int spectralWindow) : channelBlocks_p (false), increment_p (increment), nGroup_p (nGroup), spectralWindow_p (spectralWindow), start_p (start), width_p (width) {} SelectChannelModifier::SelectChannelModifier (const Block< Vector<Int> > & blockNGroup, const Block< Vector<Int> > & blockStart, const Block< Vector<Int> > & blockWidth, const Block< Vector<Int> > & blockIncr, const Block< Vector<Int> > & blockSpw) : channelBlocks_p (true), channelSelection_p (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw) {} void SelectChannelModifier::apply (ROVisibilityIterator * rovi) const { if (! channelBlocks_p){ rovi->selectChannel (nGroup_p, start_p, width_p, increment_p, spectralWindow_p); } else{ Block< Vector<Int> > blockNGroup; Block< Vector<Int> > blockStart; Block< Vector<Int> > blockWidth; Block< Vector<Int> > blockIncr; Block< Vector<Int> > blockSpw; channelSelection_p.get (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw); rovi->selectChannel (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw); } } void SelectChannelModifier::print (ostream & os) const { os << "SelectChannel::{"; if (channelBlocks_p){ Block< Vector<Int> > blockNGroup; Block< Vector<Int> > blockStart; Block< Vector<Int> > blockWidth; Block< Vector<Int> > blockIncr; Block< Vector<Int> > blockSpw; channelSelection_p.get (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw); os << "nGroup=" << toCsv (blockNGroup) << ", start=" << toCsv (blockStart) << ", width=" << toCsv (blockWidth) << ", increment=" << toCsv (blockIncr) << ", spw=" << toCsv (blockSpw); } else { os << "nGroup=" << nGroup_p << ", start=" << start_p << ", width=" << width_p << ", increment=" << increment_p << ", spw=" << spectralWindow_p; } os << "}"; } String SelectChannelModifier::toCsv (const Block< Vector<Int> > & bv) const { String result = "{"; for (Block<Vector<Int> >::const_iterator v = bv.begin(); v != bv.end(); ++ v){ if (result.size() != 1) result += ","; result += "{" + toCsv (* v) + "}"; } result += "}"; return result; } String SelectChannelModifier::toCsv (const Vector<Int> & v) const { String result = ""; for (Vector<Int>::const_iterator i = v.begin(); i != v.end(); ++ i){ if (! result.empty()) result += ","; result += String::toString (* i); } return result; } SelectVelocityModifier::SelectVelocityModifier (Int nChan, const MVRadialVelocity& vStart, const MVRadialVelocity& vInc, MRadialVelocity::Types rvType, MDoppler::Types dType, Bool precise) : dType_p (dType), nChan_p (nChan), precise_p (precise), rvType_p (rvType), vInc_p (vInc), vStart_p (vStart) {} void SelectVelocityModifier::apply (ROVisibilityIterator * rovi) const { rovi-> selectVelocity (nChan_p, vStart_p, vInc_p, rvType_p, dType_p, precise_p); } void SelectVelocityModifier::print (std::ostream & os) const { os << "SelectVelocity::{" << "dType=" << dType_p << ",nChan=" << nChan_p << ",precise=" << precise_p << ",rvType=" << rvType_p << ",vInc=" << vInc_p << ",vStart=" << vStart_p << "}"; } SetIntervalModifier::SetIntervalModifier (Double timeInterval) : timeInterval_p (timeInterval) {} void SetIntervalModifier::apply (ROVisibilityIterator * rovi) const { rovi -> setInterval (timeInterval_p); } void SetIntervalModifier::print (std::ostream & os) const { os << "SetInterval::{" << timeInterval_p << "}"; } SetRowBlockingModifier::SetRowBlockingModifier (Int nRows) : nRows_p (nRows) {} void SetRowBlockingModifier::apply (ROVisibilityIterator * rovi) const { rovi->setRowBlocking (nRows_p); } void SetRowBlockingModifier::print (std::ostream & os) const { os << "SetRowBlocking::{" << "nRows=" << nRows_p << ",nGroup=" << nGroup_p << ",spectralWindow=" << spectralWindow_p << ",start=" << start_p << ",width=" << width_p << "}"; } // ************************** // * * // * VlaData Implementation * // * * // ************************** //Semaphore VlaData::debugBlockSemaphore_p (0); // used to block a thread for debugging VlaData::VlaData (Int maxNBuffers, async::Mutex & mutex) : MaxNBuffers_p (maxNBuffers), mutex_p (mutex) { timing_p.fillCycle_p = DeltaThreadTimes (true); timing_p.fillOperate_p = DeltaThreadTimes (true); timing_p.fillWait_p = DeltaThreadTimes (true); timing_p.readCycle_p = DeltaThreadTimes (true); timing_p.readOperate_p = DeltaThreadTimes (true); timing_p.readWait_p = DeltaThreadTimes (true); timing_p.timeStart_p = ThreadTimes(); } VlaData::~VlaData () { timing_p.timeStop_p = ThreadTimes(); if (statsEnabled()){ Log (1, "VlaData stats:\n%s", makeReport ().c_str()); } resetBufferData (); } Int VlaData::clock (Int arg, Int base) { Int r = arg % base; if (r < 0){ r += base; } return r; } //void //VlaData::debugBlock () //{ // // Log (1, "VlaData::debugBlock(): Blocked\n"); // // // // debugBlockSemaphore_p.wait (); // // // // Log (1, "VlaData::debugBlock(): Unblocked\n"); //} //void //VlaData::debugUnblock () //{ // // int v = debugBlockSemaphore_p.getValue(); // // // // if (v == 0){ // // Log (1, "VlaData::debugUnblock()\n"); // // debugBlockSemaphore_p.post (); // // } // // else // // Log (1, "VlaData::debugUnblock(): already unblocked; v=%d\n", v); //} void VlaData::fillComplete (VlaDatum * datum) { LockGuard lg (mutex_p); if (statsEnabled()){ timing_p.fill3_p = ThreadTimes(); timing_p.fillWait_p += timing_p.fill2_p - timing_p.fill1_p; timing_p.fillOperate_p += timing_p.fill3_p - timing_p.fill2_p; timing_p.fillCycle_p += timing_p.fill3_p - timing_p.fill1_p; } data_p.push (datum); Log (2, "VlaData::fillComplete on %s\n", datum->getSubChunkPair ().toString().c_str()); assert ((Int)data_p.size() <= MaxNBuffers_p); interface_p->notifyAllInterfaceChanged(); } Bool VlaData::fillCanStart () const { // Caller must lock Bool canStart = (int) data_p.size() < MaxNBuffers_p; return canStart; } VlaDatum * VlaData::fillStart (SubChunkPair subchunk, const ThreadTimes & fillStartTime) { LockGuard lg (mutex_p); statsEnabled () && (timing_p.fill1_p = fillStartTime, true); Assert ((int) data_p.size() < MaxNBuffers_p); VlaDatum * datum = new VlaDatum (subchunk); Log (2, "VlaData::fillStart on %s\n", datum->getSubChunkPair().toString().c_str()); if (validChunks_p.empty() || validChunks_p.back() != subchunk.chunk ()) insertValidChunk (subchunk.chunk ()); insertValidSubChunk (subchunk); statsEnabled () && (timing_p.fill2_p = ThreadTimes(), true); if (interface_p->isSweepTerminationRequested()){ delete datum; datum = NULL; // datum may not be ready to fill and shouldn't be anyway } return datum; } asyncio::ChannelSelection VlaData::getChannelSelection () const { LockGuard lg (mutex_p); return channelSelection_p; } void VlaData::initialize (const AsynchronousInterface * interface) { interface_p = interface; LockGuard lg (mutex_p); resetBufferData (); } void VlaData::insertValidChunk (Int chunkNumber) { ////Assert (mutex_p.isLockedByThisThread ()); // Caller locks mutex. validChunks_p.push (chunkNumber); interface_p->notifyAllInterfaceChanged(); } void VlaData::insertValidSubChunk (SubChunkPair subchunk) { ////Assert (mutex_p.isLockedByThisThread ()); // Caller locks mutex. validSubChunks_p.push (subchunk); interface_p->notifyAllInterfaceChanged(); } //Bool //VlaData::isSweepTerminationRequested () const //{ // return sweepTerminationRequested_p; //} Bool VlaData::isValidChunk (Int chunkNumber) const { bool validChunk = false; // Check to see if this is a valid chunk. If the data structure is empty // then sleep for a tiny bit to allow the VLAT thread to either make more // chunks available for insert the sentinel value INT_MAX into the data // structure. UniqueLock uniqueLock (mutex_p); do { while (validChunks_p.empty()){ interface_p->waitForInterfaceChange (uniqueLock); } while (! validChunks_p.empty() && validChunks_p.front() < chunkNumber){ validChunks_p.pop(); } if (! validChunks_p.empty()) validChunk = validChunks_p.front() == chunkNumber; } while (validChunks_p.empty()); Log (3, "isValidChunk (%d) --> %s\n", chunkNumber, validChunk ? "true" : "false"); return validChunk; } Bool VlaData::isValidSubChunk (SubChunkPair subchunk) const { SubChunkPair s; bool validSubChunk = false; // Check to see if this is a valid subchunk. If the data structure is empty // then sleep for a tiny bit to allow the VLAT thread to either make more // subchunks available for insert the sentinel value (INT_MAX, INT_MAX) into the data // structure. UniqueLock uniqueLock (mutex_p); do { while (validSubChunks_p.empty()){ interface_p->waitForInterfaceChange (uniqueLock); } while (! validSubChunks_p.empty() && validSubChunks_p.front() < subchunk){ validSubChunks_p.pop(); } if (! validSubChunks_p.empty()) validSubChunk = validSubChunks_p.front() == subchunk; } while (validSubChunks_p.empty()); Log (3, "isValidSubChunk %s --> %s\n", subchunk.toString().c_str(), validSubChunk ? "true" : "false"); return validSubChunk; } String VlaData::makeReport () { String report; DeltaThreadTimes duration = (timing_p.timeStop_p - timing_p.timeStart_p); // seconds report += String::format ("\nLookahead Stats: nCycles=%d, duration=%.3f sec\n...\n", timing_p.readWait_p.n(), duration.elapsed()); report += "...ReadWait: " + timing_p.readWait_p.formatAverage () + "\n"; report += "...ReadOperate: " + timing_p.readOperate_p.formatAverage() + "\n"; report += "...ReadCycle: " + timing_p.readCycle_p.formatAverage() + "\n"; report += "...FillWait: " + timing_p.fillWait_p.formatAverage() + "\n"; report += "...FillOperate: " + timing_p.fillOperate_p.formatAverage () + "\n"; report += "...FillCycle: " + timing_p.fillCycle_p.formatAverage () + "\n"; Double syncCycle = timing_p.fillOperate_p.elapsedAvg() + timing_p.readOperate_p.elapsedAvg(); Double asyncCycle = max (timing_p.fillCycle_p.elapsedAvg(), timing_p.readCycle_p.elapsedAvg()); report += String::format ("...Sync cycle would be %6.1f ms\n", syncCycle * 1000); report += String::format ("...Speedup is %5.1f%%\n", (syncCycle / asyncCycle - 1) * 100); report += String::format ("...Total time savings estimate is %7.3f seconds\n", (syncCycle - asyncCycle) * timing_p.readWait_p.n()); return report; } void VlaData::readComplete (SubChunkPair subchunk) { LockGuard lg (mutex_p); if (statsEnabled()){ timing_p.read3_p = ThreadTimes(); timing_p.readWait_p += timing_p.read2_p - timing_p.read1_p; timing_p.readOperate_p += timing_p.read3_p - timing_p.read2_p; timing_p.readCycle_p += timing_p.read3_p - timing_p.read1_p; } Log (2, "VlaData::readComplete on %s\n", subchunk.toString().c_str()); } VisBufferAsync * VlaData::readStart (SubChunkPair subchunk) { // Called by main thread UniqueLock uniqueLock (mutex_p); statsEnabled () && (timing_p.read1_p = ThreadTimes(), true); // Wait for a subchunk's worth of data to be available. while (data_p.empty()){ interface_p->waitForInterfaceChange (uniqueLock); } // Get the data off the queue and notify world of change in VlaData. VlaDatum * datum = data_p.front(); data_p.pop (); interface_p->notifyAllInterfaceChanged(); ThrowIf (! datum->isSubChunk (subchunk), String::format ("Reader wanted subchunk %s while next subchunk is %s", subchunk.toString().c_str(), datum->getSubChunkPair().toString().c_str())); Log (2, "VlaData::readStart on %s\n", subchunk.toString().c_str()); statsEnabled () && (timing_p.read2_p = ThreadTimes(), true); // Extract the VisBufferAsync enclosed in the datum for return to caller, // then destroy the rest of the datum object VisBufferAsync * vba = datum->releaseVisBufferAsync (); delete datum; return vba; } void VlaData::resetBufferData () { ////Assert (mutex_p.isLockedByThisThread ()); // Caller locks mutex. // Flush any accumulated buffers while (! data_p.empty()){ VlaDatum * datum = data_p.front(); data_p.pop (); delete datum; } // Flush the chunk and subchunk indices while (! validChunks_p.empty()) validChunks_p.pop(); while (! validSubChunks_p.empty()) validSubChunks_p.pop(); } void VlaData::setNoMoreData () { LockGuard lg (mutex_p); insertValidChunk (INT_MAX); insertValidSubChunk (SubChunkPair::noMoreData ()); } Bool VlaData::statsEnabled () const { // Determines whether asynchronous I/O is enabled by looking for the // expected AipsRc value. If not found then async i/o is disabled. Bool doStats; AipsrcValue<Bool>::find (doStats, ROVisibilityIterator::getAsyncRcBase () + ".doStats", false); return doStats; } void VlaData::storeChannelSelection (const asyncio::ChannelSelection & channelSelection) { LockGuard lg (mutex_p); channelSelection_p = channelSelection; } // *************************** // * * // * VlaDatum Implementation * // * * // *************************** VlaDatum::VlaDatum (SubChunkPair subchunk) : subchunk_p (subchunk), visBuffer_p (new VisBufferAsync ()) {} VlaDatum::~VlaDatum() { delete visBuffer_p; } SubChunkPair VlaDatum::getSubChunkPair () const { return subchunk_p; } VisBufferAsync * VlaDatum::getVisBuffer () { return visBuffer_p; } //const VisBufferAsync * //VlaDatum::getVisBuffer () const //{ // assert (state_p == Filling || state_p == Reading); // // return visBuffer_p; //} Bool VlaDatum::isSubChunk (SubChunkPair subchunk) const { return subchunk == subchunk_p; } VisBufferAsync * VlaDatum::releaseVisBufferAsync () { VisBufferAsync * vba = visBuffer_p; visBuffer_p = NULL; return vba; } WriteQueue::WriteQueue () : interface_p (NULL) {} WriteQueue::~WriteQueue () { Assert (queue_p.empty()); } WriteData * WriteQueue::dequeue () { LockGuard lg (mutex_p); WriteData * result = NULL; if (! empty (true)){ result = queue_p.front(); // get the first value queue_p.pop(); // remove it from the queue } return result; } Bool WriteQueue::empty (Bool alreadyLocked) { Bool isEmpty; if (alreadyLocked){ isEmpty = queue_p.empty(); } else { LockGuard lg (mutex_p); isEmpty = queue_p.empty(); } return isEmpty; } void WriteQueue::enqueue (WriteData * writeData) { Assert (writeData != NULL); LockGuard lg (mutex_p); queue_p.push (writeData); interface_p->notifyAllInterfaceChanged (); } void WriteQueue::initialize (const AsynchronousInterface * interface) { interface_p = interface; } } // end namespace asyncio using namespace casacore; } // end namespace casa