/* * AsynchronousTools.cc * * Created on: Nov 1, 2010 * Author: jjacobs */ #include <assert.h> #include <cstdarg> #include <cstring> #include <errno.h> #include <fstream> #include <queue> #include <semaphore.h> #include <fcntl.h> #include <time.h> #include <casacore/casa/aips.h> #if defined(AIPS_LINUX) #if ! defined(_GNU_SOURCE) #define _GNU_SOURCE /* or _BSD_SOURCE or _SVID_SOURCE */ #endif #include <unistd.h> #include <sys/syscall.h> /* For SYS_xxx definitions */ #endif #include <sys/time.h> #include <casacore/casa/Exceptions/Error.h> #include <casacore/casa/Logging/LogIO.h> #include <condition_variable> #include <mutex> #include <thread> #include "AsynchronousTools.h" #include <stdcasa/UtilJ.h> using namespace std; using namespace casacore; using namespace casa::utilj; namespace casa { namespace async { class ConditionImpl { friend class Condition; private: ConditionImpl () : condition_p () {} std::condition_variable condition_p; }; class MutexImpl { friend class Mutex; friend class Condition; private: MutexImpl () : mutex_p () {} ~MutexImpl () {} std::thread::id lockingThreadId_p; std::mutex mutex_p; }; class SemaphoreImpl { friend class Semaphore; private: SemaphoreImpl () : semaphore_p (NULL) {} sem_t * semaphore_p; // [use] }; struct timespec convertMsDeltaToTimespec (Int milliseconds) { // Get the time with a possible accuracy to microseconds and // then convert it into the timeout's data structure used by // pthreads struct timeval tVal; gettimeofday (& tVal, NULL); struct timespec t; t.tv_sec = tVal.tv_sec; t.tv_nsec = tVal.tv_usec * 1000; // Add the wait time in milliseconds to this structure // taking care to handle carry out and avoid overflow t.tv_sec += milliseconds / 1000; // extract seconds and add them in t.tv_nsec += (milliseconds % 1000) * 1000000L; // extract ms, convert to ns and add in t.tv_sec += t.tv_nsec / 1000000000L; // carry out of the ns field into seconds t.tv_nsec %= 1000000000L; // remove any seconds carried out of ns field return t; } Condition::Condition () { impl_p = new ConditionImpl (); } Condition::~Condition () { delete impl_p; } void Condition::broadcast () { notify_all (); } void Condition::notify_all () { impl_p->condition_p.notify_all (); } void Condition::notify_one () { impl_p->condition_p.notify_one (); } void Condition::signal () { notify_one (); } void Condition::wait (UniqueLock & uniqueLock) { impl_p->condition_p.wait (uniqueLock.uniqueLock_p); } /* Bool Condition::wait (Mutex & mutex, int milliseconds) { Assert (milliseconds >= 0); // weird if it's negative struct timespec t = convertMsDeltaToTimespec (milliseconds); int code = pthread_cond_timedwait (impl_p->condition_p, mutex.getRep(), & t); bool gotWait = true; if (code == ETIMEDOUT){ gotWait = false; } else{ ThrowIfError (code, String::format ("Condition::wait (%d)", milliseconds)); } return gotWait; } */ LockGuard::LockGuard (Mutex & mutex) { mutex_p = & mutex; mutex_p->lock (); } LockGuard::LockGuard (Mutex * mutex) { Assert (mutex != NULL); mutex_p = mutex; mutex_p->lock (); } LockGuard::~LockGuard () { mutex_p->unlock (); } LockGuardInverse::LockGuardInverse (Mutex & mutex) { mutex_p = & mutex; mutex_p->unlock (); } LockGuardInverse::LockGuardInverse (Mutex * mutex) { Assert (mutex != NULL); mutex_p = mutex; mutex_p->unlock (); } LockGuardInverse::LockGuardInverse (LockGuard & lg) { mutex_p = lg.mutex_p; mutex_p->unlock(); } LockGuardInverse::~LockGuardInverse () { mutex_p->lock (); } Logger* Logger::singleton_p = NULL; Logger::Logger () : loggingStarted_p (false), nameMutex_p (new Mutex ()) {} Logger::~Logger () { if (loggingStarted_p){ delete get(); } } std::once_flag loggerOnceFlag; Logger* Logger::get() { std::call_once (loggerOnceFlag, initialize); return singleton_p; } void Logger::initialize () { singleton_p = new Logger (); } void Logger::log (const char * format, ...) { va_list vaList; va_start (vaList, format); char buffer[4096]; // Create the text to be logged vsnprintf (buffer, sizeof (buffer), format, vaList); // Grab the timestamp and pid (for IDing threads) String threadNameText; { MutexLocker ml (* nameMutex_p); pthread_t tid = pthread_self(); ThreadNames::iterator threadName = threadNames_p.find (tid); if (threadName != threadNames_p.end ()){ threadNameText = String (" [") + (threadName->second) + "] : "; } else{ threadNameText = String::format (" [0x%08x] : ", tid); } } String prefix = utilj::getTimestamp() + threadNameText; // Allocate a buffer to put into the queue string outputText = prefix + buffer; va_end (vaList); // Lock the queue, push on the block of text and increment // the drain semaphore loggerThread_p -> log (outputText); // ownership passes to the thread } void Logger::registerName (const String & threadName) { Assert (nameMutex_p != NULL); MutexLocker ml (* nameMutex_p); threadNames_p [pthread_self()] = threadName; } void Logger::start (const char * filename) { if (! loggingStarted_p){ // ignore multiple starts loggerThread_p = new LoggerThread (); loggerThread_p ->setLogFilename (filename == NULL ? "" : filename); loggerThread_p ->startThread(); loggingStarted_p = true; } } Logger::LoggerThread::LoggerThread () {} Logger::LoggerThread::~LoggerThread () { terminate(); this->join(); if (deleteStream_p) { dynamic_cast<ofstream *> (logStream_p)->close(); delete logStream_p; } } void Logger::LoggerThread::log (const string & text) { MutexLocker m (mutex_p); outputQueue_p.push (text); loggerChanged_p.notify_all (); } void * Logger::LoggerThread::run () { LogIO logIo (LogOrigin ("Logger::LoggerThread")); logIo << "starting execution; tid=" << gettid() << endl << LogIO::POST; try { // Determine where to write the logging info. If nothing is specified or either "cerr" or // "stdout" are specified then use standard error. If "cout" or "stdout" are specified then // use standard out. Otherwise open the specified file and write to that. if (logFilename_p.empty () || logFilename_p == "cerr" || logFilename_p == "stderr"){ logStream_p = & cerr; deleteStream_p = false; } else if (logFilename_p == "cout" || logFilename_p == "stdout"){ logStream_p = & cout; deleteStream_p = false; } else{ logStream_p = new ofstream (logFilename_p.c_str(), ios::out); deleteStream_p = true; } * logStream_p << utilj::getTimestamp() << ": Logging started, tid=" << gettid() << endl; // Loop waiting on the drain semaphore. This should be incremented once // every time users add a block of text to the queue. while (true){ string text; { // Pop the front block of output off of the queue // Keep mutex locked while accessing queue. UniqueLock uniqueLock (mutex_p); while (! isTerminationRequested() && outputQueue_p.empty()){ loggerChanged_p.wait (uniqueLock); } if (isTerminationRequested() && outputQueue_p.empty()){ break; } text = outputQueue_p.front(); outputQueue_p.pop(); } // Now output the text and then delete the storage * logStream_p << text; logStream_p->flush(); } * logStream_p << "*** Logging terminated" << endl; logStream_p->flush(); return NULL; } catch (exception & e){ const char * message = "*** Logging thread caught exception: "; cerr << message << e.what() << endl; cerr.flush(); if (logStream_p != & cerr){ * logStream_p << message << e.what() << endl; logStream_p->flush(); } throw; } catch (...){ const char * message = "*** Logging thread caught unknown exception"; cerr << message << endl; cerr.flush(); if (logStream_p != & cerr){ * logStream_p << message << endl; logStream_p->flush(); } throw; } } void Logger::LoggerThread::setLogFilename (const String & filename) { logFilename_p = filename; } void Logger::LoggerThread::terminate () { Thread::terminate(); loggerChanged_p.notify_all (); } Mutex::Mutex () { impl_p = new MutexImpl (); isLocked_p = false; } Mutex::~Mutex () { delete impl_p; } std::mutex & Mutex::getMutex () { return impl_p->mutex_p; } //Bool //Mutex::isLockedByThisThread () const //{ // // Only for use in debugs or asserts // // Bool itIs = isLocked_p && std::this_thread::get_id () == impl_p->lockingThreadId_p; // // return itIs; //} void Mutex::lock () { impl_p->mutex_p.lock(); impl_p->lockingThreadId_p = std::this_thread::get_id (); isLocked_p = true; } /* Bool Mutex::lock (Int milliseconds) { Assert (milliseconds >= 0); // weird if it's negative struct timespec t = convertMsDeltaToTimespec (milliseconds); int code = pthread_mutex_timedlock (impl_p->mutex_p, & t); bool gotLock = true; if (code == ETIMEDOUT){ gotLock = false; } else{ ThrowIfError (code, String::format ("Mutex::lock (%d)", milliseconds)); } return gotLock; } */ Bool Mutex::trylock () { bool gotLock = impl_p->mutex_p.try_lock (); isLocked_p = gotLock; if (isLocked_p){ impl_p->lockingThreadId_p = std::this_thread::get_id (); } return gotLock; } void Mutex::unlock () { isLocked_p = false; impl_p->mutex_p.unlock (); } // jagonzal: Useful when locking is mandatory void Mutex::acquirelock() { while (!trylock()) { sched_yield(); } } MutexLocker::MutexLocker (Mutex & mutex) : mutex_p (& mutex) { mutex_p->lock(); } MutexLocker::MutexLocker (Mutex * mutex) : mutex_p (mutex) { Assert (mutex_p != NULL); mutex_p->lock(); } MutexLocker::~MutexLocker () { mutex_p->unlock(); } Semaphore::Semaphore (int initialValue) { Assert (initialValue >= 0); impl_p = new SemaphoreImpl (); // Since Mac doesn't support unnamed semaphores, try and find a // unique name for the semaphore. Names will be of the form // "/Semaphore_xxx" int code = 0; int i = 0; do { ++ i; name_p = String::format ("/CasaAsync_%03d", i); impl_p->semaphore_p = sem_open (name_p.c_str(), O_CREAT | O_EXCL, 0700, initialValue);//new sem_t; code = (impl_p->semaphore_p == SEM_FAILED) ? errno : 0; } while (impl_p->semaphore_p == SEM_FAILED && code == EEXIST); ThrowIfError (code, "Semaphore::open: name='" + name_p + "'"); } Semaphore::~Semaphore () { int code = sem_close (impl_p->semaphore_p); ThrowIfError (code == 0 ? 0 : errno, "Semaphore::close"); code = sem_unlink (name_p.c_str()); ThrowIfError (code == 0 ? 0 : errno, "Semaphore::unlink: name='" + name_p + "'"); delete impl_p; } Int Semaphore::getValue () { int value; int code = sem_getvalue (impl_p->semaphore_p, & value); ThrowIfError (code == 0 ? 0 : errno, "Semaphore::getValue"); return value; } void Semaphore::post () { int code = sem_post (impl_p->semaphore_p); ThrowIfError (code == 0 ? 0 : errno, "Semaphore::post"); } Bool Semaphore::trywait () { int code = sem_trywait (impl_p->semaphore_p); bool gotSemaphore = true; if (code != 0 && errno == EAGAIN){ gotSemaphore = false; } else{ ThrowIfError (code == 0 ? 0 : errno, "Semaphore::wait"); } return gotSemaphore; } void Semaphore::wait () { int errorCode = 0; int code; do { code = sem_wait (impl_p->semaphore_p); errorCode = errno; } while (code != 0 && errorCode == EINTR); ThrowIfError (code == 0 ? 0 : errorCode, "Semaphore::wait"); } Bool Semaphore::wait (int milliseconds) { Assert (milliseconds >= 0); // it's weird if it's negative //// struct timespec t = convertMsDeltaToTimespec (milliseconds); int errorCode = 0; int code; do { code = sem_wait (impl_p->semaphore_p); errorCode = errno; } while (code != 0 && errorCode == EINTR); Bool gotSemaphore = true; if (code == 0){ gotSemaphore = true; } else if (errno == ETIMEDOUT){ gotSemaphore = false; } else { ThrowIfError (errno, String::format ("Mutex::lock (%d)", milliseconds)); } return gotSemaphore; } Thread::Thread () { id_p = new pthread_t; started_p = false; terminationRequested_p = false; } Thread::~Thread () { // Make sure the thread knows it's time to quit terminate (); delete id_p; } pthread_t Thread::getId () const { return * id_p; } pid_t Thread::gettid () const { pid_t result = 0; #if defined(AIPS_LINUX) result = syscall (SYS_gettid); #endif return result; } void * Thread::join () { void * result; int code = pthread_join (* id_p, & result); ThrowIfError (code, "Thread::join"); return result; } bool Thread::isStarted () const { return started_p; } void Thread::startThread () { // Create the thread, passing a pointer to this object as its // single argument. Subclass Thread to pass other information // into the thread function. int code = pthread_create (id_p, NULL, threadFunction, this); started_p = true; ThrowIfError (code, "Thread::create"); } void Thread::terminate () { terminationRequested_p = true; } bool Thread::isTerminationRequested () const { return terminationRequested_p; } void * Thread::threadFunction (void * arg) { Thread * thread = reinterpret_cast<Thread *> (arg); void * result = thread->run (); return result; // use thread variable to store any results } UniqueLock::UniqueLock (Mutex & mutex) : uniqueLock_p (mutex.getMutex()) {} void UniqueLock::lock () { uniqueLock_p.lock (); } void UniqueLock::unlock () { uniqueLock_p.unlock (); } } // end namespace Async } // end namespace CASA