/*
 * VlaData.h
 *
 *  Created on: Sep 21, 2011
 *      Author: jjacobs
 */

#ifndef ASYNCHRONOUS_INTERFACE_H_
#define ASYNCHRONOUS_INTERFACE_H_

#include <stdcasa/thread/AsynchronousTools.h>
#include <stdcasa/UtilJ.h>

using casa::utilj::ThreadTimes;
using casa::utilj::DeltaThreadTimes;

#include <casacore/casa/Arrays/Cube.h>
#include <casacore/casa/Arrays/Matrix.h>
#include <casacore/casa/Arrays/Vector.h>
#include <casacore/casa/Containers/Block.h>
#include <casacore/casa/Quanta/MVRadialVelocity.h>
#include <casacore/measures/Measures/MRadialVelocity.h>
#include <casacore/measures/Measures/MDoppler.h>
#include "VisBufferAsync.h"
#include "VisibilityIterator.h"
#include "VisibilityIteratorImpl.h"

///#pragma GCC diagnostic warning "-Wno-missing-field-initializers"
#include <memory>
#include <queue>
#include <vector>

namespace casa {

class ROVisibilityIterator;

namespace asyncio {

class RoviaModifier {
public:

    friend std::ostream & operator<< (std::ostream & o, const RoviaModifier & m);

    virtual ~RoviaModifier () {}
    virtual void apply (ROVisibilityIterator *) const = 0;

    inline operator std::string( ) const {
        std::stringstream ss;
        print(ss);
        return ss.str( );
    }

private:

    virtual void print (std::ostream & o) const = 0;

};

class ChannelSelection {

public:

    ChannelSelection () {}

    ChannelSelection (const casacore::Block< casacore::Vector<casacore::Int> > & blockNGroup,
                      const casacore::Block< casacore::Vector<casacore::Int> > & blockStart,
                      const casacore::Block< casacore::Vector<casacore::Int> > & blockWidth,
                      const casacore::Block< casacore::Vector<casacore::Int> > & blockIncr,
                      const casacore::Block< casacore::Vector<casacore::Int> > & blockSpw);

    ChannelSelection (const ChannelSelection & other);
    ChannelSelection & operator= (const ChannelSelection & other);


    void
    get (casacore::Block< casacore::Vector<casacore::Int> > & blockNGroup,
         casacore::Block< casacore::Vector<casacore::Int> > & blockStart,
         casacore::Block< casacore::Vector<casacore::Int> > & blockWidth,
         casacore::Block< casacore::Vector<casacore::Int> > & blockIncr,
         casacore::Block< casacore::Vector<casacore::Int> > & blockSpw) const;

protected:

    void copyBlock (const casacore::Block <casacore::Vector<casacore::Int> > & src,
                    casacore::Block <casacore::Vector<casacore::Int> > & to) const;

private:

    casacore::Block< casacore::Vector<casacore::Int> > blockNGroup_p;
    casacore::Block< casacore::Vector<casacore::Int> > blockStart_p;
    casacore::Block< casacore::Vector<casacore::Int> > blockWidth_p;
    casacore::Block< casacore::Vector<casacore::Int> > blockIncr_p;
    casacore::Block< casacore::Vector<casacore::Int> > blockSpw_p;
};


class SelectChannelModifier : public RoviaModifier {

public:

    SelectChannelModifier (casacore::Int nGroup, casacore::Int start, casacore::Int width, casacore::Int increment, casacore::Int spectralWindow);
    SelectChannelModifier (const casacore::Block< casacore::Vector<casacore::Int> > & blockNGroup,
                           const casacore::Block< casacore::Vector<casacore::Int> > & blockStart,
                           const casacore::Block< casacore::Vector<casacore::Int> > & blockWidth,
                           const casacore::Block< casacore::Vector<casacore::Int> > & blockIncr,
                           const casacore::Block< casacore::Vector<casacore::Int> > & blockSpw);

    void apply (ROVisibilityIterator *) const;

private:

    casacore::Bool channelBlocks_p;
    ChannelSelection channelSelection_p;
    casacore::Int increment_p;
    casacore::Int nGroup_p;
    casacore::Int spectralWindow_p;
    casacore::Int start_p;
    casacore::Int width_p;

    void print (std::ostream & o) const;
    casacore::String toCsv (const casacore::Block< casacore::Vector<casacore::Int> > & bv) const;
    casacore::String toCsv (const casacore::Vector<casacore::Int> & v) const;

};

class SetIntervalModifier : public RoviaModifier {

public:

    SetIntervalModifier  (casacore::Double timeInterval);
    void apply (ROVisibilityIterator *) const;

private:

    casacore::Double timeInterval_p;

    void print (std::ostream & o) const;
};


class SetRowBlockingModifier : public RoviaModifier {

public:

    SetRowBlockingModifier (casacore::Int nRows);
    void apply (ROVisibilityIterator *) const;

private:

    casacore::Int nRows_p;
    casacore::Int nGroup_p;
    casacore::Int spectralWindow_p;
    casacore::Int start_p;
    casacore::Int width_p;

    void print (std::ostream & o) const;
};

class RoviaModifiers {

public:

    ~RoviaModifiers ();

    void add (RoviaModifier *);
    void apply (ROVisibilityIterator *);
    void clearAndFree ();
    RoviaModifiers transferModifiers ();

private:

    typedef std::vector<RoviaModifier *> Data;
    Data data_p;

};

class SelectVelocityModifier : public RoviaModifier {

public:

    SelectVelocityModifier (casacore::Int nChan, const casacore::MVRadialVelocity& vStart, const casacore::MVRadialVelocity& vInc,
                            casacore::MRadialVelocity::Types rvType, casacore::MDoppler::Types dType, casacore::Bool precise);
    void apply (ROVisibilityIterator *) const;

private:

    casacore::MDoppler::Types dType_p;
    casacore::Int nChan_p;
    casacore::Bool precise_p;
    casacore::MRadialVelocity::Types rvType_p;
    casacore::MVRadialVelocity vInc_p;
    casacore::MVRadialVelocity vStart_p;

    virtual void print (std::ostream & o) const;

};


// <summary>
//    VlaDatum is a single elemement in the VlaDatum buffer ring used to support the
//    ROVisibilityIteratorAsync.
// </summary>

// <use visibility=local>

// <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
// </reviewed>

// <prerequisite>
//   <li> VisBuffer
//   <li> VisBufferAsync
//   <li> ROVisibilityIteratorAsync
//   <li> VlaData
//   <li> VLAT
// </prerequisite>
//
// <etymology>
//    VlaDatum is the quantum of data associated with a single position of the visibility
//    look-ahead scheme.
// </etymology>
//
// <synopsis>
//    VlaDatum is a single buffer for data produced by the VLAT thread and consumed by the
//    main thread.  A collection of VlaDatum objects is organized as a buffer ring in a
//    VlaData object.
//
//    A VlaDatum object is responsible for maintaining its state as well as containing the set
//    of data accessed from a single position of a ROVisibilityIterator.  It contains a
//    VisibilityBufferAsync object to hold the data that will be used by the main thread; other
//    data is maintained in member variables.
//
//    VlaDatum has no concurrency mechanisms built in it; that is handled by the VlaData object.
//    It does support a set of states that indicate its current use:
//        Empty -> Filling -> Full -> Reading -> Empty.
//    Changing state is accomplished by the methods fillStart, fillComplete, readStart and readComplete.
// </synopsis>
//
// <example>
// </example>
//
// <motivation>
// </motivation>
//
// <thrown>
//    <li>casacore::AipsError for unhandleable errors
// </thrown>
//
// <todo asof="yyyy/mm/dd">
// </todo>


class VlaDatum {

public:

    typedef enum {Empty, Filling, Full, Reading} State;

    VlaDatum (SubChunkPair);
    ~VlaDatum ();

    SubChunkPair  getSubChunkPair () const;
    VisBufferAsync * getVisBuffer ();
    //const VisBufferAsync * getVisBuffer () const;
    casacore::Bool isSubChunk (SubChunkPair) const;

    VisBufferAsync * releaseVisBufferAsync ();
    void reset ();

protected:

private:

    SubChunkPair     subchunk_p;
    VisBufferAsync * visBuffer_p;

    // Illegal operations

    VlaDatum & operator= (const VlaDatum & other);

};

class VLAT;

// <summary>
//    The VlaData class is a buffer ring used to support the communication between
//    the visiblity lookahead thread (VLAT) and the main application thread.  It
//    implements the required concurrency control to support this communication.
// </summary>

// <use visibility=local>

// <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
// </reviewed>

// <prerequisite>
//   <li> VisBuffer
//   <li> VisBufferAsync
//   <li> ROVisibilityIteratorAsync
//   <li> VlaData
//   <li> VLAT
// </prerequisite>
//
// <etymology>
//    VlaData is the entire collection of visibility look-ahead data currently (or potentially)
//    shared between the lookahead and main threads.
// </etymology>
//
// <synopsis>
//    The VlaData object supports the sharing of information between the VLAT look ahead thread and
//    the main thread.  It contains a buffer ring of VlaDatum objects which each hold all of the
//    data that is normally access by a position of a ROVisibiltyIterator.  Other data that is shared
//    or communicated between the two threads is also managed by VlaData.
//
//    A single mutex (member variable mutex_p) is used to protect data that is shared between the
//    two threads.  In addition there is a single PThreads condition variable, vlaDataChanged_p used
//    to allow either thread to wait for the other to change the state of VlaData object.
//
//    Buffer ring concurrency has two levels: the mutex protecting VlaData and the state of the
//    VlaDatum object.  Whenever a free buffer (VlaDatum object) is available the VLAT thread will
//    fill it with the data from the next position of the ROVisibilityIterator; a buffer is free for
//    filling when it is in the Empty state.  Before the VLAT fills a buffer it must call fillStart;
//    this method will block the caller until the next buffer in the ring becomes free; as a side effect
//    the buffer's state becomes Filling.  After fillStart is complete, the VLAT owns the buffer.
//    When the VLAT is done with the buffer it calls fillComplete to relinquish the buffer; this causes
//    the buffer state to change from Filling to Full.
//
//    The main thread calls readStart to get the next filled buffer; the main thread is blocked until
//    the a filled buffer is available.  When the full buffer is ready its state is changed to Reading
//    and readStart returns.  The VLAT thread will not access the buffer while the main thread is reading.
//    The read operation is terminated by calling readComplete; this changes the buffer state to Empty and
//    does not block the main thread except potentially to acquire the mutex.
//
//    The concurrency scheme is fairly sound except for the possibility of low-level data sharing through
//    CASA data structures.  Some of the CASA containers (e.g., casacore::Array<T>) can potentially share storage
//    although it appears that normal operation they do not.  Some problems have been encountered with
//    objects that share data via reference-counted pointers.  For instance, objects derived from
//    casacore::MeasBase<T,U> (e.g., casacore::MDirection, casacore::MPosition, casacore::MEpoch, etc.) share the object that serves as the
//    frame of reference for the measurement; only by converting the object to text and back again can
//    a user easily obtain a copy which does not share values with another.  It is possible that other
//    objects deep many layers down a complex object may still be waiting to trip up VlaData's
//    concurrency scheme.
//
//    On unusual interaction mediated by VlaData occurs when it is necessary to reset the visibility
//    iterator back to the start of a MeasurementSet.  This usually happens either at the start of the MS
//    sweep (e.g., to reset the row blocking factor of the iterator) or at the end (e.g., to make an
//    additional pass through the casacore::MS).  The main thread requests a reset of the VI and then is blocked
//    until the VI is reset.  The sweepTerminationRequested_p variable is set to true; when the VLAT
//    discovers that this variable is true it resets the buffer ring, repositions its VI to the start
//    of the casacore::MS and then informs the blocked main thread by setting viResetComplete to true and
//    signalling vlaDataChanged_p.
// </synopsis>
//
// <example>
// </example>
//
// <motivation>
// </motivation>
//
// <thrown>
//    <li> AipsError
// </thrown>
//
// <todo asof="yyyy/mm/dd">
// </todo>

class AsynchronousInterface;
class InterfaceController;

class VlaData {

public:

    VlaData (casacore::Int maxNBuffers, async::Mutex & mutex);
    ~VlaData ();

    casacore::Bool fillCanStart () const;
    void fillComplete (VlaDatum * datum);
    VlaDatum * fillStart (SubChunkPair, const ThreadTimes & fillStartTime);
    asyncio::ChannelSelection getChannelSelection () const;
    void initialize (const AsynchronousInterface *);
    void insertValidChunk (casacore::Int chunkNumber);
    void insertValidSubChunk (SubChunkPair);
    casacore::Bool isValidChunk (casacore::Int chunkNumber) const;
    casacore::Bool isValidSubChunk (SubChunkPair) const;
    void readComplete (SubChunkPair);
    VisBufferAsync * readStart (SubChunkPair);
    void resetBufferData ();
    void setNoMoreData ();
    void storeChannelSelection (const asyncio::ChannelSelection & channelSelection);

    //static void debugBlock ();
    //static void debugUnblock ();
    //static casacore::Bool logThis (casacore::Int level);

    //static casacore::Bool loggingInitialized_p;
    //static casacore::Int logLevel_p;

protected:

private:

    typedef std::queue<VlaDatum *> Data;
    typedef std::queue<casacore::Int> ValidChunks;
    typedef std::queue<SubChunkPair> ValidSubChunks;

    class Timing {
    public:
        ThreadTimes      fill1_p;
        ThreadTimes      fill2_p;
        ThreadTimes      fill3_p;
        DeltaThreadTimes fillCycle_p;
        DeltaThreadTimes fillOperate_p;
        DeltaThreadTimes fillWait_p;
        ThreadTimes      read1_p;
        ThreadTimes      read2_p;
        ThreadTimes      read3_p;
        DeltaThreadTimes readCycle_p;
        DeltaThreadTimes readOperate_p;
        DeltaThreadTimes readWait_p;
        ThreadTimes      timeStart_p;
        ThreadTimes      timeStop_p;
    };

    asyncio::ChannelSelection     channelSelection_p; // last channels selected for the VI in use
    Data                          data_p;             // Buffer queue
    const AsynchronousInterface * interface_p;
    const casacore::Int                     MaxNBuffers_p;
    async::Mutex &                mutex_p; // provided by Asynchronous interface
    Timing                        timing_p;
    mutable ValidChunks           validChunks_p;       // casacore::Queue of valid chunk numbers
    mutable ValidSubChunks        validSubChunks_p; // casacore::Queue of valid subchunk pairs


    casacore::Int clock (casacore::Int arg, casacore::Int base);
    casacore::String makeReport ();

    casacore::Bool statsEnabled () const;
    void terminateSweep ();

    //// static Semaphore debugBlockSemaphore_p; // used to block a thread for debugging

    static casacore::Bool initializeLogging ();

    // Illegal operations

    VlaData (const VlaData & other);
    VlaData & operator= (const VlaData & other);
};

class WriteData {

public:

    WriteData (const SubChunkPair & subchunkPair) : subchunkPair_p (subchunkPair) {}

    virtual ~WriteData () {}

    SubChunkPair getSubChunkPair () const { return subchunkPair_p;}
    virtual void write (VisibilityIterator * vi) = 0;

private:

    SubChunkPair subchunkPair_p;

};

template <typename Data>
class WriteDataImpl : public WriteData {
public:

    typedef void (VisibilityIterator::* Setter) (const Data &);

    WriteDataImpl (const SubChunkPair & subchunkPair,
                   const Data & data,
                   Setter setter)
    : WriteData (subchunkPair),
      data_p (),
      setter_p (setter)
    {
        data_p.assign (data); // Make a pure copy
    }

    void
    write (VisibilityIterator * vi)
    {
        (vi ->* setter_p) (data_p);
    }

private:

    Data   data_p;
    Setter setter_p;

};

template <typename Data>
WriteData *
createWriteData (const SubChunkPair & subchunkPair,
                 const Data & data,
                 void (VisibilityIterator::* setter) (const Data &))
{
    return new WriteDataImpl<Data> (subchunkPair, data, setter);
}

template <typename Data>
class WriteDataImpl2 : public WriteData {
public:

    typedef ROVisibilityIterator::DataColumn DataColumn;
    typedef void (VisibilityIterator::* Setter) (const Data &, DataColumn);

    WriteDataImpl2 (const SubChunkPair & subchunkPair,
                    const Data & data,
                    DataColumn dataColumn,
                    Setter setter)
    : WriteData (subchunkPair),
      data_p (),
      dataColumn_p (dataColumn),
      setter_p (setter)
    {
        data_p.assign (data); // Make a pure copy
    }

    void
    write (VisibilityIterator * vi)
    {
        (vi ->* setter_p) (data_p, dataColumn_p);
    }

private:

    Data       data_p;
    DataColumn dataColumn_p;
    Setter     setter_p;
};

template <typename Data>
WriteData *
createWriteData (const SubChunkPair & subchunkPair,
                 const Data & data,
                 ROVisibilityIterator::DataColumn dataColumn,
                 void (VisibilityIterator::* setter) (const Data &, ROVisibilityIterator::DataColumn))
{
    return new WriteDataImpl2 <Data> (subchunkPair, data, dataColumn, setter);
}

class AsynchronousInterface;

class WriteQueue {

public:

    WriteQueue ();
    ~WriteQueue ();

    WriteData * dequeue ();
    casacore::Bool empty (casacore::Bool alreadyLocked = false);
    void enqueue (WriteData * writeData);

    void initialize (const AsynchronousInterface *);

    void write (VisibilityIterator * vi);

private:

    const AsynchronousInterface * interface_p;
    async::Mutex mutex_p;
    std::queue<WriteData *> queue_p;
};


class AsynchronousInterface {

    //friend class InterfaceController;

public:

    // make noncopyable...
    AsynchronousInterface( const AsynchronousInterface& ) = delete;
    AsynchronousInterface& operator=( const AsynchronousInterface& ) = delete;

    AsynchronousInterface (int maxNBuffers);
    ~AsynchronousInterface ();

    void addModifier (asyncio::RoviaModifier * modifier);
    async::Mutex & getMutex () const;
    //async::LockGuard getLockGuard () const;
    VlaData * getVlaData ();
    VLAT * getVlat ();
    WriteQueue & getWriteQueue ();
    void initialize ();
    casacore::Bool isSweepTerminationRequested () const;
    casacore::Bool isLookaheadTerminationRequested () const;
    void notifyAllInterfaceChanged () const;
    void requestViReset ();
    std::pair<casacore::Bool, RoviaModifiers> resetVi ();
    void terminate ();
    void terminateLookahead ();
    void terminateSweep ();
    RoviaModifiers transferRoviaModifiers ();
    void viResetComplete ();
    casacore::Bool viResetRequested ();
    void waitForInterfaceChange (async::UniqueLock & uniqueLock) const;

    static casacore::Bool initializeLogging ();
    static casacore::Bool logThis (casacore::Int level);

private:

    mutable async::Condition  interfaceDataChanged_p; // Signals interface data has changed
                                                      // o VisBuffer consumed
                                                      // o Write data queued
                                                      // o Sweep or thread termination requested
    volatile casacore::Bool             lookaheadTerminationRequested_p; // true to request thread termination
    mutable async::Mutex      mutex_p;                // casacore::Mutex protecting access to concurrent data
    asyncio::RoviaModifiers   roviaModifiers_p;
    volatile casacore::Bool             sweepTerminationRequested_p;     // true to request sweep termination
                                                               // (e.g., prior to rewinding
    volatile casacore::Bool             viResetComplete_p; // VI reset process has completed
    volatile casacore::Bool             viResetRequested_p; // VI reset has been requested
    VlaData                   vlaData_p;          // Lookahead data
    VLAT *                    vlat_p;             // Lookahead thread
    WriteQueue                writeQueue_p;       // casacore::Data to be written (writable VIs only)

    static casacore::Bool loggingInitialized_p;
    static casacore::Int logLevel_p;
};

} // end namespace asyncio

} // end namespace casa

#endif /* ASYNCHRONOUS_INTERFACE_H_ */