#ifndef _SINGLEDISH_PTHREAD_UTILS_H_
#define _SINGLEDISH_PTHREAD_UTILS_H_

#include <pthread.h>

#include <iostream>
#include <sstream>
#include <stdexcept>
#include <unistd.h>

using namespace std;

#define THROW_IF(condition, msg) \
  do { \
    if ((condition)) { \
      throw runtime_error((msg)); \
    } \
  } while (false)

namespace casa { //# NAMESPACE CASA - BEGIN
namespace sdfiller { //# NAMESPACE SDFILLER - BEGIN

class Mutex {

public:
  Mutex() :
      mutex_(PTHREAD_MUTEX_INITIALIZER) {
//    cout << "Mutex::Mutex()" << endl;
    int ret = pthread_mutex_init(&mutex_, NULL);
    THROW_IF(ret != 0, "Mutex::Mutex() failed to initalize mutex");
  }
  ~Mutex() noexcept(false) {
//    cout << "Mutex::~Mutex()" << endl;
    int ret = pthread_mutex_destroy(&mutex_);
    THROW_IF(ret != 0, "Mutex::~Mutex() failed to destroy mutex");
  }
  int lock() {
//    cout << "Mutex::lock()" << endl;
    int ret = pthread_mutex_lock(&mutex_);
    THROW_IF(ret != 0, "Mutex::lock() failed to lock mutex");
    return ret;
  }
  int unlock() {
//    cout << "Mutex::unlock()" << endl;
    int ret = pthread_mutex_unlock(&mutex_);
    THROW_IF(ret != 0, "Mutex::unlock() failed to unlock mutex");
    return ret;
  }
  int try_lock() {
//    cout << "Mutex::try_lock()" << endl;
    return pthread_mutex_trylock(&mutex_);
  }

protected:
  pthread_mutex_t mutex_;

  friend class PCondition;
};

class PCondition {
public:
  PCondition(Mutex *mutex) :
      mutex_(&(mutex->mutex_)) {
    int ret = pthread_cond_init(&cond_, NULL);
    THROW_IF(ret != 0,
        "PCondition::PCondition() failed to initialize pthread_cond_t");
  }

  virtual ~PCondition() noexcept(false) {
    int ret = pthread_cond_destroy(&cond_);
    THROW_IF(ret != 0,
        "PCondition::~PCondition() failed to destroy pthread_cond_t");
  }
  int lock() {
//    cout << "PCondition::lock()" << endl;
    return pthread_mutex_lock(mutex_);
  }

  int unlock() {
//    cout << "PCondition::unlock()" << endl;
    return pthread_mutex_unlock(mutex_);
  }

  int wait() {
//    cout << "PCondition::wait()" << endl;
    int ret = pthread_cond_wait(&cond_, mutex_);
    THROW_IF(ret != 0, "PCondition::wait() failed to block pthread_cond_t");
    return ret;
  }

  int signal() {
//    cout << "PCondition::signal()" << endl;
    int ret = pthread_cond_signal(&cond_);
    THROW_IF(ret != 0, "PCondition::signal() failed to release pthread_cond_t");
    return ret;
  }
private:
  pthread_mutex_t *mutex_;
  pthread_cond_t cond_;
};

// implementation of producer consumer model
template<class DataType, ssize_t BufferSize>
class ProducerConsumerModelContext {
public:
  typedef ProducerConsumerModelContext<DataType, BufferSize> _Context;

  // production function
  static void produce(_Context *context, DataType item) {
    context->lock();

    // wait until buffer becomes available for production
    while (context->buffer_is_full()) {
      context->producer_wait();
    }

    assert(!context->buffer_is_full());

    context->push_product(item);

    context->producer_next();

    // send a signal to consumer since something is produced
    context->consumer_signal();

    context->unlock();
  }

  // consumption function
  // return false if no more products available
  // otherwise return true
  static bool consume(_Context *context, DataType *item) {
    context->lock();

    // wait until something is produced
    while (context->buffer_is_empty()) {
      context->consumer_wait();
    }

    assert(!context->buffer_is_empty());

    context->pop_product(item);
    bool more_products = (*item != context->end_of_production_);

    context->consumer_next();

    // send a signal to consumer since there are available slot in buffer
    context->producer_signal();

    context->unlock();

    return more_products;
  }

  // it should be called when production complete
  static void complete_production(_Context *context) {
    produce(context, context->end_of_production_);
  }

  // constructor
  ProducerConsumerModelContext(DataType const terminator) :
      end_of_production_(terminator), num_product_in_buffer_(0),
      producer_index_(0), consumer_index_(0), mutex_(),
      consumer_condition_(&mutex_), producer_condition_(&mutex_) {
    //std::cout << "end_of_production = " << end_of_production_ << std::endl;
  }

  // destructor
  ~ProducerConsumerModelContext() {
  }

  // utility
  template<class T>
  static void locked_print(T msg, _Context *context) {
    context->lock();
    cout << msg << endl;
    context->unlock();
  }

private:
  int lock() {
    return mutex_.lock();
  }

  int unlock() {
    return mutex_.unlock();
  }

  int try_lock() {
    return mutex_.try_lock();
  }

  int producer_wait() {
    return producer_condition_.wait();
  }

  int producer_signal() {
    return producer_condition_.signal();
  }

  int consumer_wait() {
    return consumer_condition_.wait();
  }

  int consumer_signal() {
    return consumer_condition_.signal();
  }

  bool buffer_is_full() {
    return num_product_in_buffer_ >= BufferSize;
  }

  bool buffer_is_empty() {
    return num_product_in_buffer_ <= 0;
  }

  void producer_next() {
    producer_index_++;
    producer_index_ %= BufferSize;
    num_product_in_buffer_++;
  }

  void consumer_next() {
    consumer_index_++;
    consumer_index_ %= BufferSize;
    num_product_in_buffer_--;
  }

  void push_product(DataType item) {
    buffer_[producer_index_] = item;
  }

  void pop_product(DataType *item) {
    *item = buffer_[consumer_index_];
  }

  // terminator data
  // (product == end_of_production_) indicates that production
  // is completed.
  DataType const end_of_production_;
  DataType buffer_[BufferSize];
  ssize_t num_product_in_buffer_;
  ssize_t producer_index_;
  ssize_t consumer_index_;
  Mutex mutex_;
  PCondition consumer_condition_;
  PCondition producer_condition_;
};

void create_thread(pthread_t *tid, pthread_attr_t *attr, void *(*func)(void *),
    void *param);
void join_thread(pthread_t *tid, void **status);

} //# NAMESPACE SDFILLER - END
} //# NAMESPACE CASA - END

#endif /* _SINGLEDISH_PTHREAD_UTILS_H_ */