//# VLAT.cc: Implemenation of visibility lookahead thread related functionality.
//# Copyright (C) 2011
//# Associated Universities, Inc. Washington DC, USA.
//#
//# This library is free software; you can redistribute it and/or modify it
//# under the terms of the GNU Library General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or (at your
//# option) any later version.
//#
//# This library is distributed in the hope that it will be useful, but WITHOUT
//# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
//# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
//# License for more details.
//#
//# You should have received a copy of the GNU Library General Public License
//# along with this library; if not, write to the Free Software Foundation,
//# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
//#
//# Correspondence concerning CASA should be addressed as follows:
//#        Internet email: CASA-request@nrao.edu.
//#        Postal address: CASA Project Office
//#                        National Radio Astronomy Observatory
//#                        520 Edgemont Road
//#                        Charlottesville, VA 22903-2475 USA
//#
//#
//# $Id$

#include <assert.h>
#include <time.h>
#include <sys/time.h>

#include <casacore/casa/Logging/LogIO.h>
#include <casacore/casa/System/AipsrcValue.h>
#include <msvis/MSVis/VLAT.h>
#include <msvis/MSVis/VisBufferAsync.h>

#include <stdcasa/thread/AsynchronousTools.h>
using namespace casacore;
using namespace casa::async;

#include <algorithm>
#include <cstdarg>
#include <functional>

#include <stdcasa/UtilJ.h>

using namespace casacore;
using namespace casa::utilj;
using namespace std;
using namespace casacore;
using namespace casa::asyncio;

#define Log(level, ...) \
        {if (AsynchronousInterface::logThis (level)) \
    Logger::get()->log (__VA_ARGS__);};

using namespace casacore;
namespace casa {

namespace asyncio {

//  *****************************
//  *                           *
//  * VlatAndDataImplementation *
//  *                           *
//  *****************************

VlatAndData::VlatAndData ()
: vlaData_p (NULL),
  vlat_p (NULL)
{
}

//  ***********************
//  *                     *
//  * VLAT Implementation *
//  *                     *
//  ***********************

VLAT::VLAT (AsynchronousInterface * asynchronousInterface)
{
    interface_p = asynchronousInterface;
    vlaData_p = interface_p->getVlaData();
    visibilityIterator_p = NULL;
    writeIterator_p = NULL;
    threadTerminated_p = false;
}

VLAT::~VLAT ()
{

    // Free up storage

    for (FillerDictionary::iterator f = fillerDictionary_p.begin();
            f != fillerDictionary_p.end();
            f++){
        delete (f->second);
    }

    for (Fillers::iterator f = fillers_p.begin();
            f != fillers_p.end();
            f++){
        delete (* f);
    }

    delete visibilityIterator_p;
    delete writeIterator_p;
}

void
VLAT::alignWriteIterator (SubChunkPair subchunk)
{
    Assert (subchunk <= readSubchunk_p);

    Bool done = false;

    while (subchunk > writeSubchunk_p && ! done){

        ++ (* writeIterator_p); // advance to next subchunk in chunk

        if (writeIterator_p->more()){

            // Sucessfully moved on to next subchunk

            writeSubchunk_p.incrementSubChunk();
        }
        else{

            // End of subchunks to advance to start of next chunk

            writeIterator_p->nextChunk ();

            if (writeIterator_p->moreChunks ()){

                // Moved on to next chunk; position to first subchunk

                writeIterator_p->origin ();

                if (writeIterator_p->more()){
                    writeSubchunk_p.incrementChunk();
                }
                else{
                    done = true; // no more data
                }
            }
            else{
                done = true; // no more data
            }
        }
    }

    ThrowIf (subchunk != writeSubchunk_p,
             String::format ("Failed to advance write iterator to subchunk %s; last subchunk is %s",
                             subchunk.toString().c_str(), writeSubchunk_p.toString().c_str()));
}

void
VLAT::applyModifiers (ROVisibilityIterator * rovi, VisibilityIterator * vi)
{
    // Apply modifiers to read iterator and to write iterator (if it exists)

    roviaModifiers_p.apply (rovi);

    if (vi != NULL){
        roviaModifiers_p.apply (vi);
    }

    // Get the channel selection information from the modified read VI and provide it to the
    // data object

    roviaModifiers_p.clearAndFree ();

    Block< Vector<Int> > blockNGroup;
    Block< Vector<Int> > blockStart;
    Block< Vector<Int> > blockWidth;
    Block< Vector<Int> > blockIncr;
    Block< Vector<Int> > blockSpw;

    rovi->getChannelSelection (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);

    vlaData_p -> storeChannelSelection (asyncio::ChannelSelection (blockNGroup, blockStart,
                                                                   blockWidth, blockIncr, blockSpw));
}

void
VLAT::checkFiller (VisBufferComponents::EnumType fillerId)
{
    ThrowIf (! visibilityIterator_p -> existsColumn (fillerId),
             String::format ("VLAT: Column to be prefetched, %s, does not exist!",
                            PrefetchColumns::columnName (fillerId).c_str()));
}

void
VLAT::createFillerDictionary ()
{
    // Create a dictionary of all the possible fillers using the
    // ViReadImplAsync::PrefetchColumnIds as the keys

    fillerDictionary_p.clear();

    fillerDictionary_p.add (VisBufferComponents::AllBeamOffsetsZero,
                           vlatFunctor0 (& VisBufferAsync::fillAllBeamOffsetsZero));
    fillerDictionary_p.add (VisBufferComponents::AntennaMounts,
                           vlatFunctor0 (& VisBufferAsync::fillAntennaMounts));
    fillerDictionary_p.add (VisBufferComponents::Ant1,
                           vlatFunctor0 (& VisBuffer::fillAnt1));
    fillerDictionary_p.add (VisBufferComponents::Ant2,
                           vlatFunctor0 (& VisBuffer::fillAnt2));
    fillerDictionary_p.add (VisBufferComponents::ArrayId,
                           vlatFunctor0 (& VisBuffer::fillArrayId));
    fillerDictionary_p.add (VisBufferComponents::BeamOffsets,
                           vlatFunctor0 (& VisBufferAsync::fillBeamOffsets));
    fillerDictionary_p.add (VisBufferComponents::Channel,
                           vlatFunctor0 (& VisBuffer::fillChannel));
    fillerDictionary_p.add (VisBufferComponents::Cjones,
                           vlatFunctor0 (& VisBuffer::fillCjones));
    fillerDictionary_p.add (VisBufferComponents::CorrType,
                           vlatFunctor0 (& VisBuffer::fillCorrType));
    fillerDictionary_p.add (VisBufferComponents::Corrected,
                           vlatFunctor1(& VisBuffer::fillVis,
                                        VisibilityIterator::Corrected));
    fillerDictionary_p.add (VisBufferComponents::CorrectedCube,
                           vlatFunctor1(& VisBuffer::fillVisCube,
                                        VisibilityIterator::Corrected));
    fillerDictionary_p.add (VisBufferComponents::DataDescriptionId,
                           vlatFunctor0 (& VisBuffer::fillDataDescriptionId));
//    fillerDictionary_p.add (VisBufferComponents::Direction1,
//                           vlatFunctor0 (& VisBuffer::fillDirection1));
//    fillerDictionary_p.add (VisBufferComponents::Direction2,
//                           vlatFunctor0 (& VisBuffer::fillDirection2));
    fillerDictionary_p.add (VisBufferComponents::Exposure,
                           vlatFunctor0 (& VisBuffer::fillExposure));
    fillerDictionary_p.add (VisBufferComponents::Feed1,
                           vlatFunctor0 (& VisBuffer::fillFeed1));
//    fillerDictionary_p.add (VisBufferComponents::Feed1_pa,
//                           vlatFunctor0 (& VisBuffer::fillFeed1_pa));
    fillerDictionary_p.add (VisBufferComponents::Feed2,
                           vlatFunctor0 (& VisBuffer::fillFeed2));
//    fillerDictionary_p.add (VisBufferComponents::Feed2_pa,
//                           vlatFunctor0 (& VisBuffer::fillFeed2_pa));
    fillerDictionary_p.add (VisBufferComponents::FieldId,
                           vlatFunctor0 (& VisBuffer::fillFieldId));
    fillerDictionary_p.add (VisBufferComponents::Flag,
                           vlatFunctor0 (& VisBuffer::fillFlag));
    fillerDictionary_p.add (VisBufferComponents::FlagCategory,
                           vlatFunctor0 (& VisBuffer::fillFlagCategory));
    fillerDictionary_p.add (VisBufferComponents::FlagCube,
                           vlatFunctor0 (& VisBuffer::fillFlagCube));
    fillerDictionary_p.add (VisBufferComponents::FlagRow,
                           vlatFunctor0 (& VisBuffer::fillFlagRow));
    fillerDictionary_p.add (VisBufferComponents::Freq,
                           vlatFunctor0 (& VisBuffer::fillFreq));
    fillerDictionary_p.add (VisBufferComponents::ImagingWeight,
                           new VlatFunctor ("ImagingWeight")); // do not fill this one
    fillerDictionary_p.add (VisBufferComponents::Model,
                           vlatFunctor1(& VisBuffer::fillVis,
                                        VisibilityIterator::Model));
    fillerDictionary_p.add (VisBufferComponents::ModelCube,
                           vlatFunctor1(& VisBuffer::fillVisCube,
                                        VisibilityIterator::Model));
    fillerDictionary_p.add (VisBufferComponents::NChannel,
                           vlatFunctor0 (& VisBuffer::fillnChannel));
    fillerDictionary_p.add (VisBufferComponents::NCorr,
                           vlatFunctor0 (& VisBuffer::fillnCorr));
    fillerDictionary_p.add (VisBufferComponents::NRow,
                           vlatFunctor0 (& VisBuffer::fillnRow));
    fillerDictionary_p.add (VisBufferComponents::ObservationId,
                           vlatFunctor0 (& VisBuffer::fillObservationId));
    fillerDictionary_p.add (VisBufferComponents::Observed,
                           vlatFunctor1(& VisBuffer::fillVis,
                                        VisibilityIterator::Observed));
    fillerDictionary_p.add (VisBufferComponents::ObservedCube,
                           vlatFunctor1(& VisBuffer::fillVisCube,
                                        VisibilityIterator::Observed));
    fillerDictionary_p.add (VisBufferComponents::PhaseCenter,
                           vlatFunctor0 (& VisBuffer::fillPhaseCenter));
    fillerDictionary_p.add (VisBufferComponents::PolFrame,
                           vlatFunctor0 (& VisBuffer::fillPolFrame));
    fillerDictionary_p.add (VisBufferComponents::ProcessorId,
                           vlatFunctor0 (& VisBuffer::fillProcessorId));
    fillerDictionary_p.add (VisBufferComponents::ReceptorAngles,
                           vlatFunctor0 (& VisBufferAsync::fillReceptorAngles));
    fillerDictionary_p.add (VisBufferComponents::Scan,
                           vlatFunctor0 (& VisBuffer::fillScan));
    fillerDictionary_p.add (VisBufferComponents::Sigma,
                           vlatFunctor0 (& VisBuffer::fillSigma));
    fillerDictionary_p.add (VisBufferComponents::SigmaMat,
                           vlatFunctor0 (& VisBuffer::fillSigmaMat));
    fillerDictionary_p.add (VisBufferComponents::SpW,
                           vlatFunctor0 (& VisBuffer::fillSpW));
    fillerDictionary_p.add (VisBufferComponents::StateId,
                           vlatFunctor0 (& VisBuffer::fillStateId));
    fillerDictionary_p.add (VisBufferComponents::Time,
                           vlatFunctor0 (& VisBuffer::fillTime));
    fillerDictionary_p.add (VisBufferComponents::TimeCentroid,
                           vlatFunctor0 (& VisBuffer::fillTimeCentroid));
    fillerDictionary_p.add (VisBufferComponents::TimeInterval,
                           vlatFunctor0 (& VisBuffer::fillTimeInterval));
    fillerDictionary_p.add (VisBufferComponents::Uvw,
                           vlatFunctor0 (& VisBuffer::filluvw));
    fillerDictionary_p.add (VisBufferComponents::UvwMat,
                           vlatFunctor0 (& VisBuffer::filluvwMat));
    fillerDictionary_p.add (VisBufferComponents::Weight,
                           vlatFunctor0 (& VisBuffer::fillWeight));
    fillerDictionary_p.add (VisBufferComponents::WeightMat,
                           vlatFunctor0 (& VisBuffer::fillWeightMat));
    fillerDictionary_p.add (VisBufferComponents::WeightSpectrum,
                           vlatFunctor0 (& VisBuffer::fillWeightSpectrum));

    // assert (fillerDictionary_p.size() == VisBufferComponents::N_VisBufferComponents);
    // Every supported prefetch column needs a filler

    //fillerDependencies_p.add ();

    //fillerDependencies_p.performTransitiveClosure (fillerDictionary_p);
}

void
VLAT::fillDatum (VlaDatum * datum)
{

    VisBufferComponents::EnumType fillerId = VisBufferComponents::Unknown;
    try{
        VisBufferAsync * vb = datum->getVisBuffer();

        vb->clear();
        vb->setFilling (true);
        vb->attachToVisIter (* visibilityIterator_p); // invalidates vb's cache as well

        fillDatumMiscellanyBefore (datum);

        for (Fillers::iterator filler = fillers_p.begin(); filler != fillers_p.end(); filler ++){

            //Log (2, "Filler id=%d name=%s starting\n", (* filler)->getId(), ViReadImplAsync::prefetchColumnName((* filler)->getId()).c_str());

            fillerId = (* filler)->getId();
            checkFiller (fillerId);
            (** filler) (vb);
        }

        fillDatumMiscellanyAfter (datum);

        vb->detachFromVisIter ();
        vb->setFilling (false);
    }
    catch (...){

        if (fillerId == -1){
            Log (1, "VLAT: Error while filling datum at 0x%08x, vb at 0x%08x; rethrowing\n",
                 datum, datum->getVisBuffer());
        }
        else{
            Log (1, "VLAT: Error while filling datum at 0x%08x, vb at 0x%08x; "
                 "fillingColumn='%s'; rethrowing\n",
                 datum, datum->getVisBuffer(),
                 PrefetchColumns::columnName (fillerId).c_str());
        }

        throw;
    }
}

void
VLAT::fillDatumMiscellanyAfter (VlaDatum * datum)
{
    datum->getVisBuffer()->setVisibilityShape (visibilityIterator_p->visibilityShape ());

    //////datum->getVisBuffer()->setDataDescriptionId (visibilityIterator_p->getDataDescriptionId());

    datum->getVisBuffer()->setPolarizationId (visibilityIterator_p->polarizationId());

    datum->getVisBuffer()->setNCoh(visibilityIterator_p->numberCoh ());

    datum->getVisBuffer()->setNRowChunk (visibilityIterator_p->nRowChunk());

    Vector<Int> nvischan;
    Vector<Int> spw;

    visibilityIterator_p->allSelectedSpectralWindows(spw, nvischan);

    datum->getVisBuffer()->setSelectedNVisibilityChannels (nvischan);
    datum->getVisBuffer()->setSelectedSpectralWindows (spw);

    int nSpw = visibilityIterator_p->numberSpw();
    datum->getVisBuffer()->setNSpw (nSpw);

    int nRowChunk = visibilityIterator_p->nRowChunk();
    datum->getVisBuffer()->setNRowChunk (nRowChunk);

    datum->getVisBuffer()->setMSD (visibilityIterator_p->getMSD ()); // ought to be last
}

void
VLAT::fillDatumMiscellanyBefore (VlaDatum * datum)
{
    datum->getVisBuffer()->setMeasurementSet (visibilityIterator_p->getMeasurementSet());
    datum->getVisBuffer()->setMeasurementSetId (visibilityIterator_p->getMeasurementSetId(),
                                                datum->getSubChunkPair().second == 0);

    datum->getVisBuffer()->setNewEntityFlags (visibilityIterator_p->newArrayId(),
                                              visibilityIterator_p->newFieldId(),
                                              visibilityIterator_p->newSpectralWindow());
    datum->getVisBuffer()->setNAntennas (visibilityIterator_p->getNAntennas ());
    datum->getVisBuffer()->setMEpoch (visibilityIterator_p->getEpoch ());
    datum->getVisBuffer()->setReceptor0Angle (visibilityIterator_p->getReceptor0Angle());

    fillLsrInfo (datum);

    Vector<Double> lsrFreq, selFreq;
    visibilityIterator_p->getTopoFreqs (lsrFreq, selFreq);
    datum->getVisBuffer()->setTopoFreqs (lsrFreq, selFreq);
}


void
VLAT::fillLsrInfo (VlaDatum * datum)
{
    MPosition observatoryPositon;
    MDirection phaseCenter;
    Bool velocitySelection;


    Block<Int> channelGroupNumber;
    Block<Int> channelIncrement;
    Block<Int> channelStart;
    Block<Int> channelWidth;

    visibilityIterator_p->getLsrInfo (channelGroupNumber,
                                      channelIncrement,
                                      channelStart,
                                      channelWidth,
                                      observatoryPositon,
                                      phaseCenter,
                                      velocitySelection);

    datum->getVisBuffer()->setLsrInfo (channelGroupNumber,
                                       channelIncrement,
                                       channelStart,
                                       channelWidth,
                                       observatoryPositon,
                                       phaseCenter,
                                       velocitySelection);
}

void
VLAT::flushWrittenData ()
{
    for (int i = 0; i < (int) measurementSets_p.nelements() ; i++){
        measurementSets_p [i].flush();
    }
}

void
VLAT::handleWrite ()
{
    // While there is data to write out, write it out.

    Bool done = false;

    WriteQueue & writeQueue = interface_p->getWriteQueue ();

    do {

        WriteData * writeData = writeQueue.dequeue ();

        if (writeData != NULL){

            SubChunkPair subchunk = writeData->getSubChunkPair();

            alignWriteIterator (subchunk);

            writeData->write (writeIterator_p);
            delete writeData;
        }
        else{
            done = true;
        }

    } while (! done);
}


void
VLAT::initialize (const ROVisibilityIterator & rovi)
{
    ThrowIf (isStarted(), "VLAT::initialize: thread already started");

    visibilityIterator_p = new ROVisibilityIterator (rovi);

    visibilityIterator_p->originChunks (true);
    // force the MSIter, etc., to be rewound, reinitialized, etc.
}

void
VLAT::initialize (const Block<MeasurementSet> & mss,
                  const Block<Int> & sortColumns,
                  Bool addDefaultSortCols,
                  Double timeInterval,
                  Bool writable)
{
    ThrowIf (isStarted(), "VLAT::initialize: thread already started");

    visibilityIterator_p = new ROVisibilityIterator (mss, sortColumns, addDefaultSortCols, timeInterval);

    if (writable){
        writeIterator_p = new VisibilityIterator (mss, sortColumns, addDefaultSortCols, timeInterval);
        writeIterator_p->originChunks();
        writeIterator_p->origin ();

        measurementSets_p = mss;
    }

}

Bool
VLAT::isTerminated () const
{
    return threadTerminated_p;
}

void *
VLAT::run ()
{
    // Log thread initiation

    Logger::get()->registerName ("VLAT");
    String writable = writeIterator_p != NULL ? "writable" : "readonly";
    Log (1, "VLAT starting execution; tid=%d; VI is %s.\n", gettid(), writable.c_str());

    LogIO logIo (LogOrigin ("VLAT"));
    logIo << "starting execution; tid=" << gettid() << endl << LogIO::POST;

    // Enter run loop.  The run loop will only be exited when the main thread
    // explicitly asks for the termination of this thread (or an uncaught
    // exception comes to this level).


    try {
        do{
            Log (1, "VLAT starting VI sweep\n");

            // Start sweeping the real VI over its entire range
            // (subject to the number of free buffers).  The sweep
            // can be ended abruptly if

            sweepVi ();

            Bool startNewSweep = waitForViReset ();

            if (! startNewSweep){
                break; // Not resetting so it's time to quit
            }

        } while (true);

        handleWrite (); // service any pending writes

        flushWrittenData ();

        threadTerminated_p = true;
        Log (1, "VLAT stopping execution.\n");
        logIo << "stopping execution normally; tid=" << gettid() << endl << LogIO::POST;

        return NULL;

    }
    catch (std::exception & e){

        cerr << "VLAT thread caught exception: " << e.what() << endl;
        cerr.flush();

        Log (1, "VLAT caught exception: %s.\n", e.what());
        logIo << "caught exception; tid=" << gettid() << "-->" << e.what() << endl << LogIO::POST;

        threadTerminated_p = true;
        throw;
    }
    catch (...){

        cerr << "VLAT thread caught unknown exception: " << endl;
        cerr.flush();

        Log (1, "VLAT caught unknown exception:\n");
        logIo << "caught unknown exception; tid=" << gettid() << endl << LogIO::POST;

        threadTerminated_p = true;
        throw;
    }
}

void
VLAT::setPrefetchColumns (const ViReadImplAsync::PrefetchColumns & columns)
{
    ThrowIf (isStarted(), "VLAT::setColumns: cannot do this after thread started");
    ThrowIf (! fillers_p.empty(), "VLAT::setColumns:: has already been done");

    createFillerDictionary ();

    for (ViReadImplAsync::PrefetchColumns::const_iterator c = columns.begin();
            c != columns.end();
            c ++){

        ThrowIf (! containsKey (*c, fillerDictionary_p),
                 String::format ("Unknown prefetch column id (%d)", *c));

        fillers_p.push_back (fillerDictionary_p [*c]->clone());
    }

    //	sort (fillers_p.begin(),
    //	      fillers_p.end(),
    //	      VlatFunctor::byDecreasingPrecedence);
}

void
VLAT::sweepVi ()
{

    // Configure the iterator(s) to start a new sweep through the data.
    // Reset the subchunk counters, apply any queued modifiers and if
    // the write iterator exists reset it to the chunk origin (the
    // read iterator gets reset at the start of the sweep loop).

    readSubchunk_p.resetToOrigin ();
    writeSubchunk_p.resetToOrigin ();

    applyModifiers (visibilityIterator_p, writeIterator_p);

    if (writeIterator_p != NULL){
        writeIterator_p->originChunks (true);
    }

    // Start sweeping the data with the read only iterator.  If there
    // is a write iterator it will write behind the RO iterator; the RW
    // iterator is advanced to align with the appropriate subchunk when
    // a request is made to write a particular subchunk.

    try {

        for (visibilityIterator_p->originChunks(true);
             visibilityIterator_p->moreChunks();
             visibilityIterator_p->nextChunk(), readSubchunk_p.incrementChunk ()){

            for (visibilityIterator_p->origin();
                 visibilityIterator_p->more();
                 ++ (* visibilityIterator_p), readSubchunk_p.incrementSubChunk ()){

                ThreadTimes startTime = ThreadTimes ();

                waitUntilFillCanStart ();

                throwIfSweepTerminated ();

                VlaDatum * vlaDatum = vlaData_p -> fillStart (readSubchunk_p, startTime);

                throwIfSweepTerminated();

                fillDatum (vlaDatum);

                throwIfSweepTerminated ();

                vlaData_p -> fillComplete (vlaDatum);

                throwIfSweepTerminated ();

                handleWrite ();
            }
        }

        Log (1, "VLAT: no more data\n");

        vlaData_p -> setNoMoreData ();
    }
    catch (SweepTerminated &){
        Log (1, "VLAT: VI sweep termination requested.\n");
    }
    catch (AipsError e){
        Log (1, "AipsError during sweepVi; readSubchunk=%s, writeSubChunk=%s",
             readSubchunk_p.toString().c_str(), writeSubchunk_p.toString().c_str());
        throw;
    }
}

void
VLAT::terminate ()
{
    // Called by another thread to terminate the VLAT.

    Log (2, "Terminating VLAT\n");
    //printBacktrace (cerr, "VLAT termination");

    Thread::terminate(); // ask thread to terminate

    interface_p->terminateLookahead (); // stop lookahead
}

void
VLAT::throwIfSweepTerminated ()
{
    if (interface_p->isSweepTerminationRequested()){
        throw SweepTerminated ();
    }
}

Bool
VLAT::waitForViReset()
{
    UniqueLock uniqueLock (interface_p->getMutex());

    while (! interface_p->viResetRequested () &&
           ! interface_p->isLookaheadTerminationRequested ()){

        handleWrite (); // process any pending write requests

        // Wait for the interface to change:
        //
        //   o Buffer consumed by main thread
        //   o A write was requested
        //   o A sweep or thread termination was requested

        interface_p->waitForInterfaceChange (uniqueLock);
    }

    handleWrite (); // One more time to be sure that all writes are completed before
                    // we either quit or rewind the iterator.

    if (interface_p->isLookaheadTerminationRequested ()){
        return false;
    }
    else{

        vlaData_p->resetBufferData ();

        roviaModifiers_p = interface_p->transferRoviaModifiers ();

        interface_p->viResetComplete ();

        return true;
    }
}

void
VLAT::waitUntilFillCanStart ()
{
    UniqueLock uniqueLock (interface_p->getMutex());

    while ( ! vlaData_p->fillCanStart () &&
            ! interface_p->isSweepTerminationRequested ()){

        handleWrite (); // process any pending write requests

        // Wait for the interface to change:
        //
        //   o Buffer consumed by main thread
        //   o A write was requested
        //   o A sweep or thread termination was requested

        interface_p->waitForInterfaceChange (uniqueLock);
    }
}

void
VlatFunctor::operator() (VisBuffer *)
{
    ThrowIf (true, "No filler is defined for this VisBuffer component: " + name_p);
}


} // end namespace asyncio

using namespace casacore;
} // end namespace casa