1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| class Task { public: friend class ThreadPoolInterface; State GetState() LOCKS_EXCLUDED(mutex_); void SetWorkItem(const WorkItem& work_item); void AddDependency(std::weak_ptr<Task> dependency) LOCKS_EXCLUDED(mutex_);
private: void Execute() LOCKS_EXCLUDED(mutex_);
void SetThreadPool(ThreadPoolInterface* thread_pool) LOCKS_EXCLUDED(mutex_);
void OnDependenyCompleted();
using WorkItem = std::function<void()>; enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED }; WorkItem work_item_ ; ThreadPoolInterface* thread_pool_to_notify_ = nullptr; State state_ GUARDED_BY(mutex_) = NEW; std::set<Task*> dependent_tasks_ GUARDED_BY(mutex_); };
|
Task类不是线程,可以理解为有状态和依赖的函数,核心的成员变量work_item_
是仿函数,其返回类型void。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
class ThreadPool : public ThreadPoolInterface { public: explicit ThreadPool(int num_threads); std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task); private: void DoWork(); void NotifyDependenciesCompleted(Task* task); bool running_ = true; std::vector<std::thread> pool_ ; std::deque<std::shared_ptr<Task>> task_queue_ ; absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_; };
|
看构造函数
1 2 3 4 5 6 7 8 9 10
| ThreadPool::ThreadPool(int num_threads) { absl::MutexLock locker(&mutex_); for (int i = 0; i != num_threads; ++i) { pool_.emplace_back([this]() { ThreadPool::DoWork(); }); } }
|
大致看ThreadPool::DoWork()
,最后执行了ThreadPoolInterface::Execute
,其实就是Task::Execute
。 每个线程与DoWork()函数绑定,也就是线程在后台不断执行DoWork()函数
新的Task如果有依赖项,通过Thread_pool::Schedule
添加到 Thread_pool
的 tasks_not_ready_
队列中。队列中的Task状态是变化的,等依赖项执行完成,Task状态变为DEPENDENCIES_COMPLETED
,然后再插入task_queue_
队列。最终所有Task都会插入task_queue_
中,在DoWork
中得到执行。
如果该Task没有依赖,直接插入task_queue_
,准备执行。
对任一个任务的状态转换顺序为:NEW—->DISPATCHED—->DEPENDENCIES_COMPLETED—->RUNNING—->COMPLETED
Thread_pool
通过固定数量的thread与task_queue_
(待执行的task队列)执行函数绑定。Thread_pool 按照队列首尾顺序不断执行Task。