//# Applicator.cc: Implementation of Applicator.h
//# Copyright (C) 1999,2000,2002
//# Associated Universities, Inc. Washington DC, USA.
//#
//# This library is free software; you can redistribute it and/or modify it
//# under the terms of the GNU Library General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or (at your
//# option) any later version.
//#
//# This library is distributed in the hope that it will be useful, but WITHOUT
//# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
//# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
//# License for more details.
//#
//# You should have received a copy of the GNU Library General Public License
//# along with this library; if not, write to the Free Software Foundation,
//# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
//#
//# Correspondence concerning AIPS++ should be addressed as follows:
//#        Internet email: aips2-request@nrao.edu.
//#        Postal address: AIPS++ Project Office
//#                        National Radio Astronomy Observatory
//#                        520 Edgemont Road
//#                        Charlottesville, VA 22903-2475 USA
//#
//# $Id$

#include <casacore/casa/Utilities/Assert.h>

#include <synthesis/Parallel/Applicator.h>
#include <synthesis/Parallel/MPITransport.h>
#include <synthesis/Parallel/SerialTransport.h>
#include <synthesis/Parallel/Algorithm.h>
#include <synthesis/MeasurementComponents/ClarkCleanAlgorithm.h>
#include <synthesis/MeasurementComponents/ReadMSAlgorithm.h>
#include <synthesis/MeasurementComponents/MakeApproxPSFAlgorithm.h>
#include <synthesis/MeasurementComponents/PredictAlgorithm.h>
#include <synthesis/MeasurementComponents/ResidualAlgorithm.h>
#include <synthesis/ImagerObjects/CubeMajorCycleAlgorithm.h>
#include <synthesis/ImagerObjects/CubeMakeImageAlgorithm.h>
#include <synthesis/ImagerObjects/CubeMinorCycleAlgorithm.h>
#include <synthesis/Parallel/MPIError.h>

using namespace casacore;
using namespace std;
namespace casa { //# NAMESPACE CASA - BEGIN

Applicator::Applicator()
  : comm(nullptr),
    algorithmIds( ),
    knownAlgorithms( ),
    LastID(101),
    usedAllThreads(false),
    serial(true),
    nProcs(0),
    procStatus(0),
    initialized_p(false)
{
// Default constructor. No transport layer is created here (comm is null),
// the algorithm maps start empty and algorithm ids start at 101.
// init() must be called before the applicator can be used.
//
}

Applicator::~Applicator()
{
// Destructor: shut down the transport layer (if any), then release
// every algorithm held in the knownAlgorithms map.
//
  if (comm != nullptr) {
    // Only the controller, and only while the transport is not yet
    // finalized, sends the STOP message to the other processes.
    const Bool stopWorkers = isController() && !(comm->isFinalized());
    if (stopWorkers) {
      comm->setTag(STOP);
      for (Int proc = 0; proc < nProcs; proc++) {
        // The controller does not send STOP to itself
        if (proc == comm->controllerRank()) {
          continue;
        }
        comm->connect(proc);
        put(STOP);
      }
    }
    delete comm;
  }

  // The map values are raw pointers that are deleted here
  for (auto &entry : knownAlgorithms) {
    delete entry.second;
  }
}

void Applicator::initThreads(Int argc, Char *argv[]){
// Create the transport layer, using MPI when it is compiled in and
// usable, and the serial transport otherwise.
//
// argc, argv: command-line arguments; forwarded to MPI_Init() and to the
//             MPITransport constructor. Unused when built without MPI.
//
// With HAVE_MPI:
//  - returns immediately if the applicator was already initialized;
//  - goes serial when OMPI_COMM_WORLD_LOCAL_SIZE is unset or < 2;
//  - goes serial (and finalizes MPI) when MPI reports fewer than 2 processes;
//  - otherwise creates an MPITransport, sets up the process status list
//    and, on worker processes, enters loop();
//  - falls back to serial if an MPIError is thrown along the way.
// Without HAVE_MPI the serial transport is always used.
//
#ifdef HAVE_MPI
  if (initialized_p) return;

  // Only one process offered to OpenMPI although compiled with MPI: go serial
  const char *localSize = getenv("OMPI_COMM_WORLD_LOCAL_SIZE");
  if (!localSize || (String::toInt(localSize) < 2)) {
    initThreads();
  }
  else {
    // MPI_Init is only called when MPI has not been initialized already
    int flag = 0;
    MPI_Initialized(&flag);
    if (flag || MPI_Init(&argc, &argv) == MPI_SUCCESS) {
      Int numprocs = 0;
      MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
      if (numprocs < 2) {
        initThreads();
        MPI_Finalize();
        return;
      }
    }

    // Initialize the MPI transport layer
    try {
      comm = new MPITransport(argc, argv);

      // Initialize the process status list
      setupProcStatus();

      // If controller then exit, else loop, waiting for an assigned task
      if (isWorker()) {
        loop();
      }

    } catch (MPIError &x) {
      // Caught by reference to avoid copying/slicing the exception object.
      // NOTE(review): if the exception is thrown after comm was assigned,
      // the serial fallback below replaces comm without deleting it.
      cerr << x.getMesg() << " doing serial "<< endl;
      initThreads();
    }
  }

#else
  (void)argc;
  (void)argv;
  cerr << " doing serial "<< endl;
  initThreads();
#endif
}

   // Serial transport all around.
void Applicator::initThreads(){
     // Initialize a serial transport layer
  comm = new SerialTransport();
     // Initialize the process status list
  setupProcStatus();
}
void Applicator::destroyThreads(){
  if(initialized_p){
    if (comm) {
    // If controller, then stop all worker processes
      if (isController() && !isSerial() && !(comm->isFinalized())) {
              //comm->setTag(STOP);
	for (Int i=0; i<nProcs; i++) {
	  if (i != comm->controllerRank()) {
	    comm->connect(i);
	    comm->setTag(STOP);
	    put(STOP);

	  }
	}
      }
      //delete comm; ///leaking this for now as if initialized from python..it brings down the whole house
      //comm=nullptr;
    }

  }

}
void Applicator::init(Int argc, Char *argv[])
{
// Initialize the process and parallel transport layer.
// argc, argv are passed on to initThreads(argc, argv) when built with MPI
// and are unused otherwise.
//
  if (comm) {
    // A transport layer already exists. A worker in a non-serial run that
    // was released from loop() is sent back into it; nothing else is redone.
    if (isWorker() && !isSerial()) {
      loop();
    }
    return;
  }

  // Fill the map of known algorithms
  defineAlgorithms();

#ifdef HAVE_MPI
  if (debug_p) cerr << "In init threads, HAVE_MPI...\n";
  initThreads(argc, argv);
#else
  if (debug_p) cerr << "In init threads, not HAVE_MPI...\n";
  (void)argc;
  (void)argv;
  initThreads();
#endif

  initialized_p = true;
}

Bool Applicator::isController()
{
// Return T if the current process is the controller.
// Throws AipsError when no transport layer has been created yet.
//
  if (!comm) {
    throw(AipsError("Parallel transport layer not initialized"));
  }
  return comm->isController();
}

Bool Applicator::isWorker()
{
// Return T if the current process is a worker process.
// Throws AipsError when no transport layer has been created yet.
//
  if (!comm) {
    throw(AipsError("Parallel transport layer not initialized"));
  }
  return comm->isWorker();
}

void Applicator::loop()
{
// Loop, if a worker process, waiting for an assigned task
//
  Bool die(false);
  Int what;
  // Wait for a message from the controller with any Algorithm tag
  while(!die){
    comm->connectToController();
    comm->setAnyTag();
    //cerr << "in loop get" << endl;
    comm->get(what);
    if (debug_p) {
        cerr << "worker, got what (algID/stop): " << what << endl;
    }
    switch(what){
    case STOP :
      die = true;
      break;
    default :
      // In this case, an Algorithm tag is expected.
      // First check that it is known.
      if (knownAlgorithms.find(what) != knownAlgorithms.end( )) {
	// Identified algorithm tag; set for subsequent communication
	comm->setTag(what);
	// Execute (apply) the algorithm
	knownAlgorithms.at(what)->apply();
      } else {
	throw(AipsError("Unidentified parallel algorithm code"));
      }
      break;
    }
  }
  //cerr <<"getting out of loop " <<endl;
  return;
}

Bool Applicator::nextAvailProcess(Algorithm &a, Int &rank)
{
// Assign the next free process to the specified Algorithm.
// a:    algorithm to run; its name selects the tag that is sent.
// rank: set to the process that was picked (only when one is picked).
// Returns true if a process was assigned, false if all are in use.
// Throws AipsError when called on a worker process.
//
  if (isWorker()) {
    throw(AipsError("Must be the controller to assign a worker process"));
  }

  Bool assigned = False;
  if (!usedAllThreads) {
    // Pick the next free process in the status list
    Bool lastOne;
    rank = findFreeProc(lastOne);
    AlwaysAssert(rank >= 0, AipsError);
    if (lastOne) {
      usedAllThreads = true;
    }

    // Tag is the registered id of the algorithm, or 0 if it is unknown
    const auto idEntry = algorithmIds.find(a.name());
    Int tag = (idEntry == algorithmIds.end( )) ? 0 : idEntry->second;

    // Send wake-up message (containing the Algorithm tag) to
    // the assigned worker process to activate it (see loop()).
    comm->connect(rank);
    comm->setTag(tag);
    put(tag);

    assigned = true;
    procStatus(rank) = ASSIGNED;
  }

  if ((!isWorker()) && (numProcs() <= 1) && assigned) {
    // In multiprocess mode with at least 2 processes the workers consume
    // the first int (the algorithm id) in loop(). With a single process
    // nobody else reads it, so it is consumed here.
    // TODO - it could be consumed right after the put() above
    Int algID;
    get(algID);
    if (debug_p) {
      cerr << "nextAvailproc controller, got algID: " << algID << " assigned " << assigned << " donesig " << donesig_p<<  endl;
    }
  }

  return assigned;
}

bool Applicator::initialized()
{
// Report the initialized flag when built with MPI; a build without
// MPI always reports false.
//
#ifdef HAVE_MPI
  return initialized_p;
#else
  return false;
#endif
}
Int Applicator::nextProcessDone(Algorithm &a, Bool &allDone)
{
// Return the rank of the next process to complete the specified algorithm.
// a:       algorithm whose registered id is used as the message tag.
// allDone: set to true when no process is still marked ASSIGNED (the
//          controller is ignored in the parallel case); -1 is returned then.
// Throws AipsError if the received signal is not DONE.
//
  allDone = true;
  const uInt nStatus = procStatus.nelements();
  for (uInt proc = 0; proc < nStatus; proc++) {
    if (procStatus(proc) != ASSIGNED) {
      continue;
    }
    // In the serial case the controller itself can be assigned; in the
    // parallel case the controller's entry does not count.
    if (isSerial() || proc != static_cast<uInt>(comm->controllerRank())) {
      allDone = false;
    }
  }
  if (allDone) {
    return -1;
  }

  // Wait for any process to report with the tag of this algorithm
  // (0 when the algorithm name is not registered)
  comm->connectAnySource();
  const auto idEntry = algorithmIds.find(a.name());
  Int tag = (idEntry == algorithmIds.end( )) ? 0 : idEntry->second;
  comm->setTag(tag);

  Int doneSignal;
  const Int rank = get(doneSignal);

  // Consistency check; a DONE signal is expected on completion
  if (doneSignal != DONE) {
    throw(AipsError("Worker process terminated unexpectedly"));
  }

  // Connect the transport layer to the reporting process, mark it as
  // free and allow new assignments again
  comm->connect(rank);
  procStatus(rank) = FREE;
  usedAllThreads = false;
  return rank;
}

void Applicator::done()
{
// Signal that a worker process is done
//
  donesig_p=DONE;
  Int donesig=DONE;
  if(isSerial())
    put(donesig_p);
  else
    put(donesig);
  return;
}

void Applicator::apply(Algorithm &a)
{
// Execute an algorithm directly.
// Apart from resetting donesig_p this is a null operation unless the run
// is serial and this process is the controller; in the parallel case the
// algorithms are applied by the worker processes (see loop()).
//
  // NOTE(review): 10000 looks like a "not done yet" placeholder that
  // done() later overwrites with DONE - confirm.
  donesig_p = 10000;

  if (!(isSerial() && isController())) {
    return;
  }
  a.apply();
}

void Applicator::defineAlgorithm(Algorithm *a)
{
// Register an algorithm under its name.
// A name that is already registered keeps its id; a new name gets the
// next id (LastID, which is then incremented). In both cases the map
// entry for that id is set to point at a.
//
// NOTE(review): when the id already had an entry, the previous pointer is
// overwritten without being deleted here - confirm who owns it.
//
  Int theid;
  const auto known = algorithmIds.find(a->name());
  if (known != algorithmIds.end( )) {
    theid = known->second;
  } else {
    theid = LastID;
    algorithmIds[a->name()] = LastID;
    ++LastID;
  }
  knownAlgorithms[theid] = a;
}

void Applicator::defineAlgorithms()
{
// Fill the algorithm map
//
  // Clark CLEAN parallel deconvolution
  Algorithm *a1 = new ClarkCleanAlgorithm;
  knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a1) );
  algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a1->name(), LastID) );
  LastID++;
  Algorithm *a2 = new ReadMSAlgorithm;
  knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a2) );
  algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a2->name(), LastID) );
  LastID++;
  Algorithm *a3 = new MakeApproxPSFAlgorithm;
  knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a3) );
  algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a3->name(), LastID) );
  LastID++;
  Algorithm *a4 = new PredictAlgorithm;
  knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a4) );
  algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a4->name(), LastID) );
  LastID++;
  Algorithm *a5 = new ResidualAlgorithm;
  knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a5) );
  algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a5->name(), LastID) );
  LastID++;
  Algorithm *a6 = new CubeMajorCycleAlgorithm;
  knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a6) );
  algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a6->name(), LastID) );
  LastID++;
  Algorithm *a7 = new CubeMakeImageAlgorithm;
  knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a7) );
  algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a7->name(), LastID) );
  LastID++;
  Algorithm *a8 = new CubeMinorCycleAlgorithm;
  knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a8) );
  algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a8->name(), LastID) );
  LastID++;
  return;
}

void Applicator::setupProcStatus()
{
// Set up the process status list from the current transport layer.
//
  // The run counts as serial when the transport reports at most one thread
  nProcs = comm->numThreads();
  serial = (nProcs <= 1);

  // One status entry per process (at least one), all initially FREE
  usedAllThreads = false;
  procStatus.resize(max(nProcs,1));
  procStatus = FREE;

  // In the parallel case, the controller is never assigned work, so its
  // entry is marked ASSIGNED from the start
  if (!isSerial()) {
    procStatus(comm->controllerRank()) = ASSIGNED;
  }
}

Int Applicator::findFreeProc(Bool &lastOne)
{
// Search the process status list for the next free process.
// Returns the lowest index whose status is FREE, or -1 if there is none.
// lastOne: set to true when exactly one entry is FREE.
//
  Int firstFree = -1;
  Int freeCount = 0;

  const uInt nStatus = procStatus.nelements();
  for (uInt idx = 0; idx < nStatus; idx++) {
    if (procStatus(idx) != FREE) {
      continue;
    }
    // Remember only the first free entry, but count them all
    if (freeCount == 0) {
      firstFree = idx;
    }
    freeCount++;
  }

  lastOne = (freeCount == 1);
  return firstFree;
}

// The applicator is omnipresent: a single global Applicator instance
// is defined here, inside namespace casa.
// Moved here for shared libraries.
Applicator applicator;


} //# NAMESPACE CASA - END