/*
 * VisibilityIteratorAsync.cc
 *
 *  Created on: Nov 2, 2010
 *      Author: jjacobs
 */

#include <msvis/MSVis/VisibilityIterator.h>
#include <msvis/MSVis/VisibilityIteratorImplAsync.h>
#include <msvis/MSVis/VisBufferAsync.h>
#include <msvis/MSVis/VisBufferAsyncWrapper.h>
#include <msvis/MSVis/VLAT.h>
#include <msvis/MSVis/AsynchronousInterface.h>
#include <stdcasa/UtilJ.h>
#include <casacore/casa/Utilities/GenSort.h>

#include <casacore/ms/MeasurementSets/MSColumns.h>
#include <casacore/casa/System/AipsrcValue.h>

#include <algorithm>
#include <cstdarg>
#include <ostream>

using namespace std;

using namespace casacore;
using namespace casa::utilj;

using namespace casacore;
using namespace casa::asyncio;

// Logs via Logger::get()->log when VlaData logging has been initialized and
// the requested level does not exceed VlaData::logLevel_p.  The arguments
// following level are forwarded unchanged to the log call.
#define Log(level, ...) \
        {if (VlaData::loggingInitialized_p && level <= VlaData::logLevel_p) \
    Logger::get()->log (__VA_ARGS__);};

// Throws with a message stating that a read-implementation method is not implemented.
#define NotImplemented     Throw ("VisibilityIteratorReadImplAsync: Method not implemented!");

// Throws with a message stating that the write operation is not permitted and
// pointing the caller at VisibilityIterator::writeBack instead.
#define VIWIA_NotImplemented() \
        Throw ("ViWriteImplAsync --> Operation not permitted!\n" \
               "Modify VisBuffer, mark components as dirty and use VisibilityIterator::writeBack.");

// Throws if no VisBufferAsync is currently held (visBufferAsync_p is null).
#define ThrowIfNoVbaAttached() \
        ThrowIf (visBufferAsync_p == 0, "No VisBufferAsync attached to VI; try doing vi.origin() first.");


using namespace casacore;
namespace casa {


// Creates an asynchronous read implementation over the given measurement
// sets.  The pointer members start out NULL and all remaining setup is
// delegated to construct().
ViReadImplAsync::ViReadImplAsync (const Block<MeasurementSet> & mss,
                                  const PrefetchColumns & prefetchColumns,
                                  const Block<Int> & sortColumns,
                                  const Bool addDefaultSortCols,
                                  Double timeInterval,
                                  Bool writable)
    : visBufferAsync_p (NULL), vlaData_p (NULL), vlat_p (NULL)
{
    construct (mss,
               prefetchColumns,
               sortColumns,
               addDefaultSortCols,
               timeInterval,
               writable);
}

// Creates an asynchronous read implementation that mirrors an existing read
// implementation: same measurement sets, sort columns, default-sort flag,
// time interval and imaging weight.  Any non-default interval, row blocking
// or channel selection found on other is re-applied here, followed by a
// reset to the first chunk if anything was re-applied.
ViReadImplAsync::ViReadImplAsync (const PrefetchColumns & prefetchColumns,
                                  const VisibilityIteratorReadImpl & other,
                                  Bool writable)
    : visBufferAsync_p (NULL), vlaData_p (NULL), vlat_p (NULL)
{
    // Copy the other iterator's measurement sets into a Block for construct()

    const Int nMss = (Int) other.measurementSets_p.size();
    Block<MeasurementSet> mss (other.measurementSets_p.size());

    for (int msIndex = 0; msIndex < nMss; ++ msIndex){
        mss [msIndex] = other.measurementSets_p [msIndex];
    }

    construct (mss, prefetchColumns, other.sortColumns_p,
               other.addDefaultSort_p, other.timeInterval_p, writable);

    imwgt_p = other.imwgt_p;

    // Work out which VI modifiers differ from their defaults; each of
    // those is passed on and then requires the VI to be reset.

    const Bool hasInterval = other.timeInterval_p != 0;
    const Bool hasRowBlocking = other.nRowBlocking_p != 0;
    const Bool hasChannelSelection = other.msChannels_p.nGroups_p.nelements() != 0;

    if (hasInterval){
        setInterval (other.timeInterval_p);
    }

    if (hasRowBlocking){
        setRowBlocking (other.nRowBlocking_p);
    }

    if (hasChannelSelection){
        selectChannel (other.msChannels_p.nGroups_p,
                       other.msChannels_p.start_p,
                       other.msChannels_p.width_p,
                       other.msChannels_p.inc_p,
                       other.msChannels_p.spw_p);
    }

    if (hasInterval || hasRowBlocking || hasChannelSelection){
        originChunks ();
    }
}


// Releases the buffer from the topmost attached wrapper (if any), deletes the
// current VisBufferAsync, then terminates and deletes the asynchronous
// interface.
ViReadImplAsync::~ViReadImplAsync ()
{
    if (! vbaWrapperStack_p.empty()){

        // Detach the buffer from the wrapper before it is deleted below

        VisBufferAsync * vba = vbaWrapperStack_p.top()->releaseVba ();
        assert (vba == visBufferAsync_p);
        UnusedVariable (vba); // only referenced by the assert
    }

    // Deleting a null pointer is a no-op, so no explicit check is needed

    delete visBufferAsync_p;

    interface_p->terminate ();
    delete interface_p;
}

// Moves to the next subchunk: increments the subchunk position and then
// fills the visibility buffer for the new position.  Any pending read is
// completed inside fillVisBuffer(), provided there is more data.
void
ViReadImplAsync::advance ()
{
    //readComplete (); // complete any pending read

    subchunk_p.incrementSubChunk ();

    fillVisBuffer ();
}

// Reports the all-beam-offsets-zero value held by the current VisBufferAsync.
// Throws if no VisBufferAsync is currently held.
Bool
ViReadImplAsync::allBeamOffsetsZero () const
{
    ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");

    const Bool allZero = visBufferAsync_p->getAllBeamOffsetsZero ();

    return allZero;
}

// Returns the antenna mounts held by the current VisBufferAsync.
// Throws if no VisBufferAsync is currently held.
const Vector<String> &
ViReadImplAsync::antennaMounts () const
{
    ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");

    const Vector<String> & mounts = visBufferAsync_p->getAntennaMounts ();

    return mounts;
}


// Attaches a VisBuffer, which must be a VisBufferAsyncWrapper, to this VI.
// The previously topmost wrapper (if any) gives up the current buffer, the
// new wrapper is pushed onto the stack and, when a buffer is currently held,
// the new wrapper is made to wrap it.  Throws if vb0 is not a
// VisBufferAsyncWrapper.
void
ViReadImplAsync::attachVisBuffer (VisBuffer & vb0)
{
    VisBufferAsyncWrapper * vb = dynamic_cast<VisBufferAsyncWrapper *> (& vb0);
    ThrowIf (vb == NULL, "Attempt to attach other than VisBufferAsyncWrapper");

    const Bool hadWrapper = ! vbaWrapperStack_p.empty();

    if (hadWrapper){
        vbaWrapperStack_p.top()->releaseVba ();
    }

    vbaWrapperStack_p.push (vb);

    if (visBufferAsync_p == NULL){
        return; // nothing to wrap yet
    }

    vb->wrap (visBufferAsync_p);
}

// Returns a copy of the requested prefetch columns in which each of
// Direction1, Direction2, Feed1_pa and Feed2_pa, when present, is replaced
// by the columns listed for it below.  The Direction expansions add the
// Feed_pa entries, which are in turn expanded by the later steps, so the
// order of the four steps matters.
ViReadImplAsync::PrefetchColumns
ViReadImplAsync::augmentPrefetchColumns (const PrefetchColumns & prefetchColumnsBase)
{
    PrefetchColumns augmented = prefetchColumnsBase;

    // Step 1: Direction1 --> beam offsets flag, mounts, Feed1_pa

    if (contains (VisBufferComponents::Direction1, augmented)){
        augmented.insert (VisBufferComponents::AllBeamOffsetsZero);
        augmented.insert (VisBufferComponents::AntennaMounts);
        augmented.insert (VisBufferComponents::Feed1_pa);
        augmented.erase (VisBufferComponents::Direction1);
    }

    // Step 2: Direction2 --> beam offsets flag, mounts, Feed2_pa

    if (contains (VisBufferComponents::Direction2, augmented)){
        augmented.insert (VisBufferComponents::AllBeamOffsetsZero);
        augmented.insert (VisBufferComponents::AntennaMounts);
        augmented.insert (VisBufferComponents::Feed2_pa);
        augmented.erase (VisBufferComponents::Direction2);
    }

    // Step 3: Feed1_pa (requested directly or added by step 1)

    if (contains (VisBufferComponents::Feed1_pa, augmented)){
        augmented.insert (VisBufferComponents::Ant1);
        augmented.insert (VisBufferComponents::Feed1);
        augmented.insert (VisBufferComponents::PhaseCenter);
        augmented.insert (VisBufferComponents::ReceptorAngles);
        augmented.insert (VisBufferComponents::Time);
        augmented.insert (VisBufferComponents::TimeInterval);
        augmented.erase (VisBufferComponents::Feed1_pa);
    }

    // Step 4: Feed2_pa (requested directly or added by step 2)

    if (contains (VisBufferComponents::Feed2_pa, augmented)){
        augmented.insert (VisBufferComponents::Ant2);
        augmented.insert (VisBufferComponents::Feed2);
        augmented.insert (VisBufferComponents::PhaseCenter);
        augmented.insert (VisBufferComponents::ReceptorAngles);
        augmented.insert (VisBufferComponents::Time);
        augmented.insert (VisBufferComponents::TimeInterval);
        augmented.erase (VisBufferComponents::Feed2_pa);
    }

    return augmented;
}

// Returns the beam offsets held by the current VisBufferAsync.
// Throws if no VisBufferAsync is currently held.
const Cube<RigidVector<Double, 2> >&
ViReadImplAsync::getBeamOffsets () const
{
    ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");

    const Cube<RigidVector<Double, 2> > & offsets = visBufferAsync_p->getBeamOffsets ();

    return offsets;
}



// Cloning is not supported for the asynchronous read implementation:
// this always throws and never returns a value.
VisibilityIteratorReadImpl *
ViReadImplAsync::clone () const
{
    Throw ("ViReadImplAsync: cloning not permitted!!!");
}

// Shared setup used by both constructors.  Initializes logging, records the
// measurement sets, creates and initializes the asynchronous interface,
// computes the augmented prefetch column set, then initializes the VLAT
// object obtained from the interface and starts its thread.
//
// mss                - measurement sets to iterate over
// prefetchColumns    - columns requested by the caller (augmented before use)
// sortColumns, addDefaultSortCols, timeInterval, writable
//                    - passed through unchanged to the VLAT's initialize()
void
ViReadImplAsync::construct(const Block<MeasurementSet> & mss,
                           const PrefetchColumns & prefetchColumns,
                           const Block<Int> & sortColumns,
                           const Bool addDefaultSortCols,
                           Double timeInterval,
                           Bool writable)
{
    AsynchronousInterface::initializeLogging();

    // Register the name "Main" with the logger
    // (presumably labels the calling thread -- confirm against Logger)

    casa::async::Logger::get()->registerName ("Main");

    visBufferAsync_p = NULL;

    // Keep a copy of each measurement set

    for (uint i = 0; i < mss.size(); i++){
        measurementSets_p.push_back (mss [i]);
    }

    // No MS is current until the first buffer has been read (see msId())

    msId_p = -1;

    // Create and initialize the Asynchronous Interface

    Int nReadAheadBuffers = getDefaultNBuffers ();

    interface_p = new AsynchronousInterface (nReadAheadBuffers);
    interface_p->initialize();

    // These pointers come from the interface, which is deleted in the destructor

    vlaData_p = interface_p->getVlaData ();
    vlat_p = interface_p->getVlat ();

    // Augment the list of prefetch columns with any that are implied
    // by the others.  N.B., be wary of reordering these.

    prefetchColumns_p = augmentPrefetchColumns (prefetchColumns);

    // Get the VLAT going

    vlat_p->setPrefetchColumns (prefetchColumns_p);

    // The VLAT is initialized here for both read-only and writable VIs; the
    // writable flag is simply passed along.
    // NOTE(review): an earlier comment here said the write implementation
    // handles VLAT initialization for writable VIs, which does not match
    // this code -- confirm which is intended.

    vlat_p->initialize (mss, sortColumns, addDefaultSortCols, timeInterval, writable);
    vlat_p->startThread ();
}

// Detaches a VisBuffer from this VI.  The buffer must be the
// VisBufferAsyncWrapper that was attached most recently (the top of the
// wrapper stack).  After it is removed, the wrapper that becomes the new top
// (if any) is made to wrap the current VisBufferAsync, when one is held.
//
// Throws if vb0 is not a VisBufferAsyncWrapper, or if it is not the topmost
// attached wrapper; the latter includes the case where nothing is attached.
void
ViReadImplAsync::detachVisBuffer (VisBuffer & vb0)
{
    VisBufferAsyncWrapper * vb = dynamic_cast<VisBufferAsyncWrapper *> (& vb0);

    ThrowIf (vb == NULL, "Attempt to detach other than a VisBufferAsyncWrapper");

    // The emptiness check must come first: calling top() on an empty stack
    // is not valid, and an empty stack means vb cannot be attached.

    if (vbaWrapperStack_p.empty() || vb != vbaWrapperStack_p.top()){
        Throw ("ViReadImplAsync::detachVisBuffer: VisBufferAsync not attached ");
    }

    // Get rid of the old buffer

    VisBufferAsync * vba = vb->releaseVba ();
    Assert (vba == visBufferAsync_p);
    UnusedVariable (vba); // prevent release build warning

    vbaWrapperStack_p.pop ();

    // If another wrapper is still attached, hand it the buffer for the
    // current VI position (when there is one).

    if (! vbaWrapperStack_p.empty() && visBufferAsync_p != NULL){
        vbaWrapperStack_p.top() -> wrap (visBufferAsync_p);
    }
}

void
ViReadImplAsync::dumpPrefetchColumns () const
{
    int i = 0;
    for (PrefetchColumns::const_iterator c = prefetchColumns_p.begin();
            c != prefetchColumns_p.end();
            c ++){

        cerr << PrefetchColumns::columnName (*c) << ", ";
        i ++;
        if (i == 10){
            cerr << endl;
            i = 0;
        }

    }

    if (i != 0)
        cerr << endl;

}

void
ViReadImplAsync::fillVisBuffer()
{
    // Get the next buffer from the lookahead buffer ring.
    // This could block if the next buffer is not ready.
    // Before doing the fill check to see that there's more data.

    if (more()){

        readComplete ();

        visBufferAsync_p = vlaData_p->readStart (subchunk_p);

        Assert (visBufferAsync_p != NULL);

        msId_p = visBufferAsync_p->msId ();

        // If a VisBufferAsync is attached, then copy the prefetched VisBuffer into it.

        if (! vbaWrapperStack_p.empty ()){

            vbaWrapperStack_p.top() -> wrap (visBufferAsync_p);
        }
    }
}

//Vector<MDirection>
//ViReadImplAsync::fillAzel(Double time) const
//{
//    NotImplemented;
//}




//void
//ViReadImplAsync::fillVisBuffer ()
//{
//    vlaDatum_p = impl_p->getVlaData()->getDatum (subchunk_p);
//
//    ThrowIf (datum == NULL, format("Failed to get datum for subchunk (%d, %d)", subchunk_p));
//
//    if (visBuffer_p != NULL){
//        visBuffer_p->setRealBuffer (vlaDatum_p->getVisBuffer());
//    }
//}

// Copies the channel selection currently held by the lookahead data into
// the five output blocks.
void
ViReadImplAsync::getChannelSelection(Block< Vector<Int> >& blockNGroup,
                                     Block< Vector<Int> >& blockStart,
                                     Block< Vector<Int> >& blockWidth,
                                     Block< Vector<Int> >& blockIncr,
                                     Block< Vector<Int> >& blockSpw)
{
    // Take a local copy of the selection, then unpack it

    asyncio::ChannelSelection selection = vlaData_p->getChannelSelection ();

    selection.get (blockNGroup,
                   blockStart,
                   blockWidth,
                   blockIncr,
                   blockSpw);
}

int
ViReadImplAsync::getDefaultNBuffers ()
{
    int nBuffers;
    casacore::AipsrcValue<Int>::find (nBuffers, ROVisibilityIterator::getAsyncRcBase () + ".nBuffers", 2);

    return nBuffers;
}

// Returns a copy of the epoch stored in the current VisBufferAsync.
// Throws if no VisBufferAsync is currently held.
MEpoch
ViReadImplAsync::getEpoch () const
{
    ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");

    const MEpoch & epoch = visBufferAsync_p->mEpoch_p;

    return epoch;
}

//ViReadImplAsync::PrefetchColumns
//ViReadImplAsync::getPrefetchColumns () const
//{
//    return prefetchColumns_p;
//}
// Returns the measurement set reported by the current VisBufferAsync.
// Throws if no VisBufferAsync is currently held.
const MeasurementSet &
ViReadImplAsync::getMs() const
{
    ThrowIfNoVbaAttached ();

    const MeasurementSet & ms = visBufferAsync_p->getMs ();

    return ms;
}


// Returns the most recently attached wrapper (the top of the wrapper stack)
// or NULL if none is attached.
VisBuffer *
ViReadImplAsync::getVisBuffer ()
{
    if (vbaWrapperStack_p.empty()){
        return NULL;
    }

    return vbaWrapperStack_p.top();
}

//void
//ViReadImplAsync::linkWithRovi (VisibilityIteratorReadImpl * rovi)
//{
//    linkedVisibilityIterator_p = rovi;
//}


// Returns true if the lookahead data structure reports the current
// subchunk position as valid.
Bool
ViReadImplAsync::more () const
{
    return vlaData_p->isValidSubChunk (subchunk_p);
}

// Returns true if the lookahead data structure reports the chunk of the
// current position as valid.
Bool
ViReadImplAsync::moreChunks () const
{
    return vlaData_p->isValidChunk (subchunk_p.chunk());
}

// Returns the MS columns object of the current VisBufferAsync.
// Throws if no VisBufferAsync is currently held.
const MSColumns &
ViReadImplAsync::msColumns() const
{
    ThrowIfNoVbaAttached ();

    const MSColumns & columns = visBufferAsync_p->msColumns();

    return columns;
}


// Returns the ID of the current measurement set.  The ID is -1 after
// construction and is set each time a buffer is obtained in
// fillVisBuffer(); throws while it is still negative.
Int
ViReadImplAsync::msId() const
{
    ThrowIf (msId_p < 0, "MS ID value not currently known.");

    return msId_p;
}


// Advances the position to the first subchunk of the next chunk.  If that
// chunk is valid, any pending read is completed; otherwise the current
// buffer is left as it is.  Returns this object.
ViReadImplAsync &
ViReadImplAsync::nextChunk ()
{
    // The position is advanced before the validity check, so the check
    // applies to the new chunk.

    subchunk_p.incrementChunk ();

    const Bool haveNextChunk = moreChunks();

    if (haveNextChunk){
        readComplete ();
    }

    return * this;
}

// Returns the chunk row count reported by the current VisBufferAsync.
// Throws if no VisBufferAsync is currently held.
Int
ViReadImplAsync::nRowChunk() const
{
    ThrowIfNoVbaAttached ();

    const Int nRows = visBufferAsync_p->nRowChunk ();

    return nRows;
}


// Returns the spectral window count reported by the current VisBufferAsync.
// Throws if no VisBufferAsync is currently held.
Int
ViReadImplAsync::numberSpw()
{
    ThrowIfNoVbaAttached ();

    const Int nSpw = visBufferAsync_p->getNSpw ();

    return nSpw;
}


// Moves to the first subchunk of the current chunk: completes any pending
// read, resets the subchunk position, fills the visibility buffer and then
// refreshes the derived-values object from the new buffer.  updateMsd()
// throws if no buffer was obtained (i.e., there was no data to fill).
void
ViReadImplAsync::origin ()
{
    // Terminates the current read and restarts at the chunk's first subchunk

    readComplete (); // complete any pending read

    subchunk_p.resetSubChunk();

    fillVisBuffer ();

    updateMsd ();
}

// Returns the position to the origin: completes any pending read, resets
// the subchunk position and asks the asynchronous interface for a VI reset.
// No buffer is filled here.
void
ViReadImplAsync::originChunks ()
{
    readComplete (); // complete any pending read

    subchunk_p.resetToOrigin ();

    interface_p->requestViReset ();
}

void
ViReadImplAsync::readComplete()
{
    if (visBufferAsync_p != NULL){

        // A buffer in the buffer ring was in use: clean up

        if (! vbaWrapperStack_p.empty()){

            // Break connection between our VisBufferAsync and
            // the shared data

            vbaWrapperStack_p.top()->releaseVba ();
        }

        // Clear the pointer to the shared buffer to indicate
        // internally that the read is complete

        delete visBufferAsync_p;
        visBufferAsync_p = NULL;

        // Inform the buffer ring that the read is complete.

        vlaData_p->readComplete (subchunk_p);
    }
}

void
ViReadImplAsync::updateMsd ()
{
    ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");

    msd_p.setAntennas (visBufferAsync_p->msColumns().antenna());

    MDirection phaseCenter = visBufferAsync_p -> getPhaseCenter();
    msd_p.setFieldCenter (phaseCenter);
}


// Returns the receptor angles held by the current VisBufferAsync.
// Throws if no VisBufferAsync is currently held.
const Cube<Double>&
ViReadImplAsync::receptorAngles() const
{
    ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");

    const Cube<Double> & angles = visBufferAsync_p->getReceptorAngles ();

    return angles;
}


void
ViReadImplAsync::saveMss (const Block<MeasurementSet> & mss)
{
    measurementSets_p.clear();
    for (uint i = 0; i < mss.size(); i++){
        measurementSets_p.push_back (mss [i]);
    }
}

// Replaces the stored measurement sets with the single measurement set ms.
void
ViReadImplAsync::saveMss (const MeasurementSet & ms)
{
    measurementSets_p.clear();
    measurementSets_p.push_back (ms);
}

// Queues a velocity selection with the asynchronous interface and then
// resets the iterator to the origin.  Returns this object.
VisibilityIteratorReadImpl&
ViReadImplAsync::selectVelocity (Int nChan,
                                 const MVRadialVelocity& vStart,
                                 const MVRadialVelocity& vInc,
                                 MRadialVelocity::Types rvType,
                                 MDoppler::Types dType,
                                 Bool precise)
{
    // Ownership of the new modifier is transferred by the addModifier call

    interface_p->addModifier (new SelectVelocityModifier (nChan, vStart, vInc,
                                                          rvType, dType, precise));

    originChunks ();

    return * this;
}

// Queues a single channel selection with the asynchronous interface.
// Unlike the Block-based overload, this one does not record the selection
// in msChannels_p.  Returns this object.
VisibilityIteratorReadImpl&
ViReadImplAsync::selectChannel(Int nGroup,
                               Int start,
                               Int width,
                               Int increment,
                               Int spectralWindow)
{
    // Ownership of the new modifier is transferred by the addModifier call

    interface_p->addModifier (new SelectChannelModifier (nGroup, start, width,
                                                         increment, spectralWindow));

    return * this;
}

// Queues a per-MS channel selection with the asynchronous interface and
// records the same selection in msChannels_p.  Returns this object.
VisibilityIteratorReadImpl&
ViReadImplAsync::selectChannel(const Block< Vector<Int> >& blockNGroup,
                               const Block< Vector<Int> >& blockStart,
                               const Block< Vector<Int> >& blockWidth,
                               const Block< Vector<Int> >& blockIncr,
                               const Block< Vector<Int> >& blockSpw)
{
    // Ownership of the new modifier is transferred by the addModifier call

    interface_p->addModifier (new SelectChannelModifier (blockNGroup, blockStart,
                                                         blockWidth, blockIncr,
                                                         blockSpw));

    // Remember the selection locally as well

    msChannels_p.nGroups_p = blockNGroup;
    msChannels_p.start_p   = blockStart;
    msChannels_p.width_p   = blockWidth;
    msChannels_p.inc_p     = blockIncr;
    msChannels_p.spw_p     = blockSpw;

    return * this;
}

// Queues a time-interval change with the asynchronous interface.
void
ViReadImplAsync::setInterval (Double timeInterval)
{
    interface_p->addModifier (new SetIntervalModifier (timeInterval));
}


// Queues a row-blocking change with the asynchronous interface.
void
ViReadImplAsync::setRowBlocking(Int nRow)
{
    interface_p->addModifier (new SetRowBlockingModifier (nRow));
}

//void
//ViReadImplAsync::startVlat ()
//{
//    vlat_p->startThread ();
//}

namespace asyncio {

// Returns the union of this column set and other; neither operand is modified.
PrefetchColumns
PrefetchColumns::operator+ (const PrefetchColumns & other)
{
    PrefetchColumns combined;

    combined.insert (begin(), end());
    combined.insert (other.begin(), other.end());

    return combined;
}

} // end namespace asyncio

//void
//ViReadImplAsync::setPrefetchColumns (const PrefetchColumns & columns) const
//{
//
//}


//VisibilityIteratorReadImpl&
//ViReadImplAsync::selectChannel(Int nGroup=1,
//                                        Int start=0,
//                                        Int width=0,
//                                        Int increment=1,
//                                        Int spectralWindow=-1)
//{
//    VisibilityIteratorReadImpl & rovi = VisibilityIterator::selectChannel (nGroup, start, width, increment, spectralWindow);
//
//    ThrowIf (visBuffer_p == NULL, "No VisBufferAsync attached");
//
//    visBuffer_p->setBlockSpw (blockSpw_p);
//
//    return rovi;
//}

//VisibilityIteratorReadImpl&
//ViReadImplAsync::selectChannel(Block< Vector<Int> >& blockNGroup,
//                                        Block< Vector<Int> >& blockStart,
//                                        Block< Vector<Int> >& blockWidth,
//                                        Block< Vector<Int> >& blockIncr,
//                                        Block< Vector<Int> >& blockSpw)
//{
//    VisibilityIteratorReadImpl & rovi = VisibilityIterator::selectChannel (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
//
//    ThrowIf (visBuffer_p == NULL, "No VisBufferAsync attached");
//
//    visBuffer_p->setBlockSpw (blockSpw_p);
//
//    return rovi;
//}

// Creates the asynchronous write implementation for the given
// VisibilityIterator; all setup is done by the base class constructor.
ViWriteImplAsync::ViWriteImplAsync (VisibilityIterator * vi)
: VisibilityIteratorWriteImpl (vi)
{
}

//ViWriteImplAsync::ViWriteImplAsync (VisibilityIterator * vi)
// : VisibilityIteratorWriteImpl (vi)
//{
//}

// Creates the asynchronous write implementation for the given
// VisibilityIterator.  The prefetch columns and the other write
// implementation are accepted but not used.
ViWriteImplAsync::ViWriteImplAsync (const PrefetchColumns & /*prefetchColumns*/,
                                    const VisibilityIteratorWriteImpl & /*other*/,
                                    VisibilityIterator * vi)
: VisibilityIteratorWriteImpl (vi)
{}


// Nothing to release here.
ViWriteImplAsync::~ViWriteImplAsync ()
{}

// Cloning is not supported for the asynchronous write implementation:
// this always throws and never returns a value.
VisibilityIteratorWriteImpl *
ViWriteImplAsync::clone () const
{
    Throw ("ViWriteImplAsync: cloning not permitted!!!");
}

// Returns the base class's read implementation viewed as a ViReadImplAsync.
// Because a dynamic_cast is used, the result is null when the read
// implementation is of some other type; the callers in this file
// dereference the result without checking it.
ViReadImplAsync *
ViWriteImplAsync::getReadImpl()
{
    return dynamic_cast <ViReadImplAsync *> (VisibilityIteratorWriteImpl::getReadImpl());
}

void
ViWriteImplAsync::putModel(const RecordInterface& rec, Bool iscomponentlist, Bool incremental)
{
//visBufferAsync_p;
    //Throw ("ViWriteImplAsync::putModel not implemented!!!");

  Vector<Int> fields = getReadImpl()->msColumns().fieldId().getColumn();
  const Int option = Sort::HeapSort | Sort::NoDuplicates;
  const Sort::Order order = Sort::Ascending;

  Int nfields = GenSort<Int>::sort (fields, order, option);

  // Make sure  we have the right size

  fields.resize(nfields, true);
  Int msid = getReadImpl()->msId();

  Vector<Int> spws =  getReadImpl()->msChannels_p.spw_p[msid];
  Vector<Int> starts = getReadImpl()->msChannels_p.start_p[msid];
  Vector<Int> nchan = getReadImpl()->msChannels_p.width_p[msid];
  Vector<Int> incr = getReadImpl()->msChannels_p.inc_p[msid];

  CountedPtr<VisModelDataI> visModelData = VisModelDataI::create();

  visModelData->putModelI (getReadImpl()->getMs (), rec, fields, spws, starts, nchan, incr,
                           iscomponentlist, incremental);

}



void
ViWriteImplAsync::setFlag(const Matrix<Bool>& flag)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();
    SubChunkPair subchunk = getReadImpl()->getSubchunkId ();

    writeQueue.enqueue (createWriteData (subchunk, flag, & VisibilityIterator::setFlag));
}

void
ViWriteImplAsync::setFlag(const Cube<Bool>& flag)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();

    writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
                                         flag,
                                         & VisibilityIterator::setFlag));
}

void
ViWriteImplAsync::setFlagCategory (const Array<Bool> & flagCategory)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();

    writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
                                         flagCategory,
                                         & VisibilityIterator::setFlagCategory));
}


void
ViWriteImplAsync::setFlagRow(const Vector<Bool>& rowflags)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();

    writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
                                         rowflags,
                                         & VisibilityIterator::setFlagRow));
}

void
ViWriteImplAsync::setVis(const Matrix<CStokesVector>& vis, DataColumn whichOne)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();

    writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
                                         vis,
                                         whichOne,
                                         & VisibilityIterator::setVis));
}

void
ViWriteImplAsync::setVis(const Cube<Complex>& vis, DataColumn whichOne)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();

    writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
                                         vis,
                                         whichOne,
                                         & VisibilityIterator::setVis));
}

// Queues a visibility write followed by a flag write for the current
// subchunk, by calling setVis() and then setFlag().
void
ViWriteImplAsync::setVisAndFlag(const Cube<Complex>& vis, const Cube<Bool>& flag,
                                DataColumn whichOne)
{
    setVis (vis, whichOne);
    setFlag (flag);
}

void
ViWriteImplAsync::setWeight(const Vector<Float>& wt)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();
    SubChunkPair subchunk = getReadImpl()->getSubchunkId ();

    writeQueue.enqueue (createWriteData (subchunk, wt, & VisibilityIterator::setWeight));
}

void
ViWriteImplAsync::setWeightMat(const Matrix<Float>& wtmat)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();

    writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
                                         wtmat,
                                         & VisibilityIterator::setWeightMat));
}

void
ViWriteImplAsync::setWeightSpectrum(const Cube<Float>& wtsp)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();

    writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
                                         wtsp,
                                         & VisibilityIterator::setWeightSpectrum));
}

void
ViWriteImplAsync::setSigma(const Vector<Float>& sig)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();

    writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
                                         sig,
                                         & VisibilityIterator::setSigma));
}

void
ViWriteImplAsync::setSigmaMat(const Matrix<Float>& sigmat)
{
    AsynchronousInterface * interface = getReadImpl()->interface_p;
    WriteQueue & writeQueue = interface->getWriteQueue();

    writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
                                         sigmat,
                                         & VisibilityIterator::setSigmaMat));
}

using namespace casacore;
} // end namespace casa