c++11中线程池和定时器的使用
c++11中线程池和定时器的使⽤
简介
本篇⽂章主要介绍线程池和定时器的混合使⽤,实现多线程的时间调度
线程池:⼀种线程的使⽤模式,线程过多会带来调度开销,进⽽影响缓存局部性和整体性能。⽽线程池维护着多个线程,等待着监督管理者分配可并发执⾏的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利⽤,还能防⽌过分调度。可⽤线程数量应该取决于可⽤的并发处理器、处理器内核、内存、⽹络sockets等的数量。
定时器:本⽂的定时器是配合线程池⼀起使⽤的。
⼯作⽅式:1.当新加⼊⼀个定时任务,定时器会将任务信息(任务执⾏时间)加到任务队列中,当任务到期 时投递到线程池中进⾏处理执⾏。
2.外部直接投递⼀个任务到线程池中
系统的组成和⼯作原理
1.线程池管理
AcsThreadPool类⽤来管理线程的调度(线程的释放和循环使⽤)和来⾃定时器投递的定时任务,AcsThread类会创建⼀定数量的线程,存到⼯作线程堆栈_stackThreads中启动线程,根据定时任务状态在空闲的线程中执⾏。
2.⼯作线程
线程池中线程,在线程池中等待并执⾏分配的任务。当定时器投递到期的定时任务时的线程调度⽅式:
1.如果没有分配线程池就在当前的线程中执⾏任务;
2.如果分配了线程池,就在线程池中寻空闲的线程执⾏任务;
3.如果线程池中没有空闲的线程,且已创建的线程数⼩于线程池允许的最⼤线程数,则创建新的新的⼯作线程执⾏任务;
4.如果线程池中没有空余的线程,且已创建的线程数⼤于等于线程池允许的最⼤线程数,则该次定时 任务就不会被执⾏了。
另外线程池也可以单独使⽤(不使⽤定时器来投递定时任务),外部直接投递任务进线程池,任务在空闲的⼯作线程中执⾏,线程调度⽅式跟定时任务的相同。
3.定时器
在程序中应保持定时器⼀直在正常⼯作,每个定时任务都会有个延迟时间,定时任务信息会存储到定时任务的multimap中,这⾥⾯是按时间顺序已经排序好的定时任务,当任务到期时就会投递到线程池或当前线程进⾏处理执⾏。
主要代码
线程池代码:
AcsThreadPool::AcsThread::AcsThread(AcsThreadPool& pool, int threadID) :
_pool(pool),
_threadID(threadID){教育部学籍在线验证报告
_hevent = event_create(true, false);
_pool._threadCount++;
_pool.add_thread(this);
LOG_INFO("Create thread. number:{}", _pool._threadCount.load());
}
AcsThreadPool::AcsThread::~AcsThread(){
_bStop = true;
if (_hevent) event_set(_hevent);
if (_pthread){
_pthread->join();
delete _pthread;
}
if (_hevent) event_destroy(_hevent);
_pool._threadCount--;
_ve_thread(_threadID);
LOG_INFO("Release thread {}", _pool._threadCount.load());
}
bool AcsThreadPool::AcsThread::thread_start(){
_pthread = new std::thread(std::bind(&AcsThread::loop_proc, this));    if (!_pthread){
LOG_ERROR("Create thread failed.");
return false;
}
return true;
}
void AcsThreadPool::AcsThread::thread_stop(){
_bStop = true;
if (_hevent) event_set(_hevent);
深模式如何设置
}
void AcsThreadPool::AcsThread::invoke_work(AcsExcutePtr& work){    _work = work;
if(_hevent) event_set(_hevent);
}
void AcsThreadPool::AcsThread::loop_proc(){
while (!_bStop){
if (event_timedwait(_hevent, 1000 * 300) == 0){
event_reset(_hevent);
if (_work){
_work->acs_excute_func();
_set();
}
if (!_bStop){
_le_idle_thread(this);
}
}
else{
LOG_TRACE("Stop thread. threadIndex:{}", _threadID);
_pool.stop_idle_thread();
}
}
}
AcsThreadPool::AcsThreadPool(int min, int max){
max = max > MAX_THREAD_NUMBER ? MAX_THREAD_NUMBER : max;    if (min > 0 && min <= max){
for (int i = 0; i < min; i++){
AcsThread* thread = create_thread();
if (thread){
_stackThreads.push(thread);
}
}
}
_minCount = min;
_maxCount = max;
奥迪a4首保多少公里}
AcsThreadPool::~AcsThreadPool(){
std::map<int, AcsThread*> tmpThreads;
tmpThreads.swap(_mapThreads);
for (auto& e : tmpThreads){
delete e.second;低碳生活手抄报内容
}
}
int AcsThreadPool::get_thread_id(){
return _threadIndex.fetch_add(1);
}
AcsThreadPool::AcsThread* AcsThreadPool::create_thread(){
int threadID = get_thread_id();
AcsThread* thread = new AcsThread(*this, threadID);
if (thread && thread->thread_start()){
return thread;
}
else{
if (thread) delete thread;
LOG_ERROR("Create thread failed.");
有山有水的成语
return nullptr;
}
}
AcsThreadPool::AcsThread* AcsThreadPool::get_idle_thread(){
AcsThread* thread = nullptr;
ktv最好唱的歌
AcsMutexLock::lock(&_mutex);
if (!_pty()){
thread = _p();
_stackThreads.pop();
}
AcsMutexLock::unlock(&_mutex);
return thread;
}
void AcsThreadPool::stop_idle_thread(){
AcsMutexLock::lock(&_mutex);
if (_stackThreads.size() > _minCount && _minCount >= 0){
AcsThread* thread = _p();
_stackThreads.pop();
if (thread){
thread->thread_stop();
_vecInvalidThreads.push_back(thread);
}
}
AcsMutexLock::unlock(&_mutex);
}
void AcsThreadPool::recycle_idle_thread(AcsThread* thread){    if (thread){
AcsMutexLock::lock(&_mutex);
_stackThreads.push(thread);
AcsMutexLock::unlock(&_mutex);
}
}
void AcsThreadPool::release_invalid_thread(){
std::vector<AcsThread*> threads;
AcsMutexLock::lock(&_mutex);
threads.swap(_vecInvalidThreads);
AcsMutexLock::unlock(&_mutex);
for (auto& e : threads){
delete e;
}
}
void AcsThreadPool::add_thread(AcsThread* thread){
AcsMutexLock::lock(&_mutex);
add_map_object(_mapThreads, thread->thread_id(), thread);    AcsMutexLock::unlock(&_mutex);
}
void AcsThreadPool::remove_thread(int threadID){
AcsMutexLock::lock(&_mutex);
remove_map_object(_mapThreads, threadID);
AcsMutexLock::unlock(&_mutex);
}
bool AcsThreadPool::post_work(AcsExcutePtr& work){
if (!work) return false;
AcsThread* thread = get_idle_thread();
if (!thread){
release_invalid_thread();
if (_threadCount.load() < _maxCount){
thread = create_thread();
if (thread){
thread->invoke_work(work);
return true;
}
else{
LOG_ERROR("Create thread failed.");
return false;
}
}
else{
LOG_DEBUG("Reach the up limit of thread");
return false;
}
}
else{
thread->invoke_work(work);
return true;
}
}
定时器代码:
typedef unsigned long long TimerIndex;
typedef std::weak_ptr<AcsExcute> AcsExucteWeakPtr;
class TimerHandle{
public:
friend class AcsTimer;
TimerHandle(){}
TimerHandle(TimerIndex index, AcsExucteWeakPtr& weakRef) : _timerIndex(index), _weakRef(weakRef){} private:
bool is_finish(){
return _weakRef.use_count() == 0;
}
bool try_lock(){
bool expected = false;
return _bHoldpare_exchange_strong(expected, true);
}
void unlock(){
_bHold.store(false);
}
private:
TimerIndex        _timerIndex{ 0 };
AcsExucteWeakPtr  _weakRef;
std::atomic<bool>  _bHold{ false };
};

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。