//
// C++ Implementation: rng_queue_pool
//
// Description:
//   Member definitions for RngParaQueuePool and its nested helper
//   RngParaQueuePool::RngQueueInfo. As far as this file shows, the pool hands
//   out vectors of pre-generated random numbers to RngQueue objects and
//   regenerates consumed vectors in refillRNumberPool(), which run() invokes.
//
// Author: Frank Michler,,, , (C) 2010
//
// Copyright: See COPYING file that comes with this distribution
//
//
// NOTE(review): this copy of the file is damaged. Everything that stood
// between a '<' and the next '>' has been lost: the names of the four system
// headers below, every template argument of `vector`, and a stretch of code
// inside the RngQueueInfo constructor. The code tokens are reproduced exactly
// as found; restore the missing parts from version control before compiling.
//
// NOTE(review): isRunning()/start()/wait()/run() and the semaphore calls
// tryAcquire()/acquire()/release()/available() look like Qt's QThread and
// QSemaphore, but the declarations live in rng_paraqueue_pool.hpp, which is
// not visible here -- confirm.
#include
#include
#include
#include
#include "rng_paraqueue_pool.hpp"
#include "rngqueue.hpp"

using namespace std;

/** @brief sets up the bookkeeping for one random number queue
 *
 * Stores the arguments in the corresponding members and allocates
 * mRNVectorPool as NVectors vectors of NumbersPerVector zeros.
 *
 * NOTE(review): the text after `for (int i=0; i` is damaged. The remainder of
 * this constructor and the head of the next member function are missing; what
 * follows is the tail of that next function. The surviving tail fills
 * mRNVectorPool[VectorNum] with Poisson numbers (case kRngPoisson) or prints
 * an error (any other type) and then sets mLeasedNumber = VectorNum.
 * `VectorNum` is not declared anywhere in the surviving text.
 *
 * @param _GslR             GSL generator stored in mGslR
 * @param Type              distribution selector stored in mType
 * @param Parameter         stored in mParameter; passed to gsl_ran_poisson
 * @param NVectors          number of vectors in the pool
 * @param NumbersPerVector  length of each vector
 */
RngParaQueuePool::RngQueueInfo::RngQueueInfo(gsl_rng * _GslR, RngType Type, double Parameter, int NVectors, int NumbersPerVector):
    mGslR(_GslR),
    mType(Type),
    mParameter(Parameter),
    mNRndVectors(NVectors),
    mkNumbersPerVector(NumbersPerVector)
{
    mRNVectorPool = vector >(mNRndVectors, vector(mkNumbersPerVector, 0));
    // NOTE(review): damaged line -- see the comment above this definition.
    for (int i=0; i::iterator it=mRNVectorPool[VectorNum].begin();it!=mRNVectorPool[VectorNum].end();++it) {
        (*it) = gsl_ran_poisson(mGslR, mParameter);
    }
    break;
    default:
        cerr << "ERROR: not yet supported\n";
        break;
    }
    mLeasedNumber = VectorNum;
}

/** @brief returns current random number vector, only for use in getRngQueue(...)
 *
 * The current vector is the pool entry indexed by mLeasedNumber.
 *
 * @return pointer to random number vector
 */
vector* RngParaQueuePool::RngQueueInfo::getCurrentRandomNumberVector()
{
    return &(mRNVectorPool[mLeasedNumber]);
}

/** @brief returns current number of used random numbers, only for use in getRngQueue(...)
 *
 * NOTE(review): reads mUsedNumbers without taking mMutUsedNumbers, unlike the
 * other accessors of that container in this file -- confirm this is only
 * called where no concurrent modification can happen.
 *
 * @return number of used numbers (size of mUsedNumbers)
 */
int RngParaQueuePool::RngQueueInfo::NUsedNumbers()
{
    return mUsedNumbers.size();
}

/** @brief hands out the next fresh vector and retires the one leased so far
 *
 * Waits on mSemFreshNumbers, takes the front index of mFreshNumbers, appends
 * the previously leased index (mLeasedNumber) to mUsedNumbers and makes the
 * new index the leased one.
 *
 * @return pointer to the newly leased random number vector
 */
vector* RngParaQueuePool::RngQueueInfo::getRandomNumberVector()
{
    // The tryAcquire() branch only exists as a hook for the disabled debug
    // output below; when it fails the code falls back to a blocking acquire().
    if (!mSemFreshNumbers.tryAcquire()) {
        //cout << "-";
        mSemFreshNumbers.acquire();
    }
    mMutFreshNumbers.lock();
    int VectorNum=mFreshNumbers.front();
    mFreshNumbers.pop_front();
    mMutFreshNumbers.unlock();
    // cout << "f" << VectorNum << " \n";
    mMutUsedNumbers.lock();
    mUsedNumbers.push_back(mLeasedNumber);
    mMutUsedNumbers.unlock();
    // cout << "l" << mLeasedNumber << " \n";
    mLeasedNumber = VectorNum;
    return &(mRNVectorPool[VectorNum]);
}

/** @brief regenerates one used vector and marks it as fresh again
 *
 * Takes the front index of mUsedNumbers, refills that vector, appends the
 * index to mFreshNumbers and releases mSemFreshNumbers once.
 *
 * NOTE(review): for any mType other than kRngPoisson the vector is left
 * unchanged, an error is printed, and the vector is still queued as fresh
 * with a return value of true -- confirm that is intended.
 *
 * @return false (after printing "x") if there is no used vector, true otherwise
 */
bool RngParaQueuePool::RngQueueInfo::refreshRandomNumbers()
{
    // cout << "RngParaQueuePool::refreshRandomNumbers\n";
    // are there used vectors? which vector should be refreshed?
    int VectorNum=-1;
    mMutUsedNumbers.lock();
    if (mUsedNumbers.empty()) {
        mMutUsedNumbers.unlock();
        cout << "x";
        return false;
    } else {
        VectorNum = mUsedNumbers.front();
        mUsedNumbers.pop_front();
        /* cout << "r" << VectorNum ;*/
    }
    mMutUsedNumbers.unlock();

    // The vector is refilled outside of both mutexes; its index is in neither
    // list while this runs.
    switch (mType) {
    case kRngPoisson:
        for(vector::iterator it=mRNVectorPool[VectorNum].begin();it!=mRNVectorPool[VectorNum].end();++it) {
            (*it) = gsl_ran_poisson(mGslR, mParameter);
        }
        break;
    default:
        cerr << "ERROR: not yet supported\n";
        break;
    }

    mMutFreshNumbers.lock();
    mFreshNumbers.push_back(VectorNum);
    mMutFreshNumbers.unlock();
    mSemFreshNumbers.release();
    return true;
}

/** @brief constructs the pool and seeds the shared GSL generator
 *
 * Allocates a generator of type gsl_rng_default after gsl_rng_env_setup().
 * The seed is 42 when SEED_RNG_WITH_42 is defined, otherwise time(NULL).
 */
RngParaQueuePool::RngParaQueuePool(): mIsDead(false)
{
    cout << "Singleton-Constructor: RngParaQueuePool::RngParaQueuePool()\n";
    // initialize global random generator
    const gsl_rng_type * T;
    gsl_rng_env_setup();
    T = gsl_rng_default;
    gslr = gsl_rng_alloc (T);
#ifdef SEED_RNG_WITH_42
    unsigned long int seed = (long)(42);
#else
    unsigned long int seed = (long)(time( NULL ));
#endif
    gsl_rng_set(gslr, seed);
}

/** @brief stops the refill loop and waits for the thread
 *
 * Sets mIsDead, releases mTotalUsedNumbers once so that a refill loop blocked
 * in acquire() can continue and see the flag, then calls wait().
 *
 * NOTE(review): nothing in this file frees gslr (GSL provides gsl_rng_free)
 * or deletes RngQueueInfo objects still stored in mQueueInfoMap -- verify
 * whether that happens elsewhere.
 */
RngParaQueuePool::~RngParaQueuePool()
{
    cout << "\n Destructor\n";
    mIsDead=true;
    mTotalUsedNumbers.release();
    wait();
}

/** @brief returns the singleton pool, starting its thread if it is not running
 *
 * The instance is a function-local static. isRunning() is checked on every
 * call and start() is invoked when it reports false.
 *
 * @return pointer to the singleton instance
 */
IRngQueuePool* RngParaQueuePool::getRngQueuePool()
{
    cout << "Singleton-Request\n";
    static RngParaQueuePool instanz;
    if (!instanz.isRunning()) {
        cout << "Starting random number generator thread\n";
        instanz.start();
    }
    return &instanz;
}

/** @brief creates a queue with default settings
 *
 * Forwards to getRngQueue(kRngPoisson, 3, 10, 2000), i.e. parameter 3,
 * 10 vectors of 2000 numbers each.
 *
 * @return pointer to the new queue
 */
IRngQueue* RngParaQueuePool::getRngQueue()
{
    return getRngQueue(kRngPoisson, 3, 10, 2000);
}

/** @brief creates a queue together with its RngQueueInfo and registers both
 *
 * Both objects are allocated with new. The RngQueueInfo is deleted in
 * releaseRngQueue(); this file does not delete the RngQueue.
 * mTotalUsedNumbers is released by the new info's NUsedNumbers() before the
 * pair is inserted into mQueueInfoMap under mMutQueueInfoMap.
 *
 * @param _RType            distribution selector
 * @param _Parameter        distribution parameter
 * @param NVectors          number of vectors in the queue's pool
 * @param NumbersPerVector  length of each vector
 * @return pointer to the new queue
 */
IRngQueue* RngParaQueuePool::getRngQueue(RngType _RType, double _Parameter, int NVectors, int NumbersPerVector)
{
    RngQueueInfo* NewQueueInfo = new RngQueueInfo(gslr, _RType, _Parameter, NVectors, NumbersPerVector);
    RngQueue* NewQueue = new RngQueue(this, NewQueueInfo->getCurrentRandomNumberVector());
    /* mNotFullMutex.lock(); mNUsedNumbers += NewQueueInfo->NUsedNumbers(); mNotFullMutex.unlock(); */
    int UsedNumbers = NewQueueInfo->NUsedNumbers();
    mTotalUsedNumbers.release(UsedNumbers);
    cout << "UsedNumbers=" << UsedNumbers << "\n";
    mMutQueueInfoMap.lock();
    mQueueInfoMap[NewQueue] = NewQueueInfo;
    mMutQueueInfoMap.unlock();
    // mNotFull.wakeAll();
    return NewQueue;
}

/** @brief fetches the next fresh vector for a registered queue
 *
 * Looks the queue up in mQueueInfoMap, asks its RngQueueInfo for the next
 * vector and releases mTotalUsedNumbers once (one more vector is now used).
 *
 * NOTE(review): mQueueInfoMap is read here without mMutQueueInfoMap, although
 * getRngQueue() and refillRNumberPool() lock it -- confirm whether callers
 * guarantee exclusion.
 *
 * @param _RngQueue queue requesting numbers
 * @return pointer to the vector, or 0 (after printing a message) if the queue
 *         is not registered
 */
vector* RngParaQueuePool::getRandomNumberVector(IRngQueue* _RngQueue)
{
    IterQueInfoMap it = mQueueInfoMap.find(_RngQueue);
    if (it!=mQueueInfoMap.end()) {
        vector* RandomVect = (*it).second->getRandomNumberVector();
        mTotalUsedNumbers.release();
        /* mNotFullMutex.lock(); ++mNUsedNumbers; cout << "'" << mNUsedNumbers ; mNotFullMutex.unlock(); mNotFull.wakeAll();*/
        return RandomVect;
    }
    cout << "RngParaQueuePool::getRandomNumberVector failed\n";
    return 0;
}

/** release RngQueue, is called in destructor of RngQueue
 *
 * clean up housekeeping information for RngQueue: deletes the RngQueueInfo
 * belonging to the queue and erases the map entry. Unknown queues are ignored.
 *
 * NOTE(review): mTotalUsedNumbers is *released* (increased) by the queue's
 * used-vector count although those vectors disappear with the queue; the
 * direction looks suspicious -- confirm. The map is also modified without
 * mMutQueueInfoMap while refillRNumberPool() iterates it under that mutex.
 *
 * @param _RngQueue pointer Random Number Queue object
 */
void RngParaQueuePool::releaseRngQueue(IRngQueue* _RngQueue)
{
    /** @todo test this */
    IterQueInfoMap it = mQueueInfoMap.find(_RngQueue);
    if (it!=mQueueInfoMap.end()) {
        mTotalUsedNumbers.release((*it).second->NUsedNumbers());
        delete (*it).second;
        mQueueInfoMap.erase(it);
    }
}

/** @brief refill loop executed by run(); ends once mIsDead is set
 *
 * Each pass walks mQueueInfoMap under mMutQueueInfoMap. For every queue it
 * acquires one unit of mTotalUsedNumbers (blocking if none is available) and
 * asks the queue to refresh one vector; if that queue had nothing to refresh
 * the unit is given back.
 *
 * NOTE(review): the blocking acquire() happens while mMutQueueInfoMap is
 * held, and when the map is empty a pass does not block at all, so the outer
 * loop appears to spin -- confirm both are acceptable.
 */
void RngParaQueuePool::refillRNumberPool()
{
    while (!mIsDead) {
        /* mNotFullMutex.lock(); if (mNUsedNumbers == 0) { // cout << "\nRngQueuePool::refillRNumberPool waiting for used numbers\n"; cout << "+"; mNotFull.wait(&mNotFullMutex); } mNotFullMutex.unlock(); */
        // refill each queue in the map once
        //cout << "a" << mTotalUsedNumbers.available() << " ";
        mMutQueueInfoMap.lock();
        for (IterQueInfoMap it=mQueueInfoMap.begin(); it!= mQueueInfoMap.end();++it) {
            // tryAcquire() is only a hook for the disabled debug output; the
            // fallback is a blocking acquire().
            if (!mTotalUsedNumbers.tryAcquire()) {
                //cout << "+";
                mTotalUsedNumbers.acquire();
            }
            if (!(*it).second->refreshRandomNumbers()) {
                // This queue had no used vector: return the unit taken above.
                mTotalUsedNumbers.release();
                cout << "b" << mTotalUsedNumbers.available() << " ";
                /* mNotFullMutex.lock(); --mNUsedNumbers; mNotFullMutex.unlock(); cout << "|" << mNUsedNumbers; */
            }
        }
        mMutQueueInfoMap.unlock();
    }
    cout << "finish thread loop \n";
}

/** @brief thread entry point; runs refillRNumberPool() until it returns */
void RngParaQueuePool::run()
{
    refillRNumberPool();
}