后端的线程池 1 thread_pool 和 task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 省略部分不重要的成员
class Task
{
public:
friend class ThreadPoolInterface;
State GetState() LOCKS_EXCLUDED(mutex_); //返回本Task当前状态
// 设置Task 执行的任务 (函数)
void SetWorkItem(const WorkItem& work_item);
// 给当前任务添加 依赖任务, 如当前任务为b,AddDependency(a)表示任务a依赖b
// 把当前任务b,加入到依赖任务 a 的 dependent_tasks_ 列表
void AddDependency(std::weak_ptr<Task> dependency) LOCKS_EXCLUDED(mutex_);

private:
// 执行当前任务,比如当前任务为a,并更新依赖a的任务dependent_tasks_中所有任务状态
void Execute() LOCKS_EXCLUDED(mutex_);

// 当前任务进入线程待执行队列
void SetThreadPool(ThreadPoolInterface* thread_pool) LOCKS_EXCLUDED(mutex_);

// 当前任务的依赖任务完成时候,当前任务状态随之改变
void OnDependenyCompleted();


using WorkItem = std::function<void()>;
enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED };
// 任务具体执行过程
WorkItem work_item_ ;
// 执行当前任务的线程池
ThreadPoolInterface* thread_pool_to_notify_ = nullptr;
State state_ GUARDED_BY(mutex_) = NEW; // 初始化状态为 NEW
// 依赖当前任务的任务列表
std::set<Task*> dependent_tasks_ GUARDED_BY(mutex_);
};

Task类不是线程,可以理解为有状态和依赖的函数,核心的成员变量work_item_是仿函数,其返回类型void。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 省略部分不重要的成员
// 不明白为什么要先构造个抽象类,只有这一个派生类
class ThreadPool : public ThreadPoolInterface
{
public:
//初始化一个线程数量固定的线程池。
explicit ThreadPool(int num_threads);
// 添加想要ThreadPool执行的task,插入 tasks_not_ready_
// 如果任务满足执行要求,直接插入task_queue_准备执行
std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task);
private:
//每个线程初始化时, 执行DoWork()函数. 与线程绑定
void DoWork();
void NotifyDependenciesCompleted(Task* task);
//running_只是一个监视哨, 只有线程池在running_状态时, 才能往work_queue_加入函数
bool running_ = true;
// pool_就是一系列线程
std::vector<std::thread> pool_ ;
// 十分重要的任务队列
std::deque<std::shared_ptr<Task>> task_queue_ ;
//未准备好的 task,task可能有依赖还未完成
absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_;
};

看构造函数

1
2
3
4
5
6
7
8
9
10
ThreadPool::ThreadPool(int num_threads)
{
absl::MutexLock locker(&mutex_);
// 原来还是用了std::thread,还以为google连线程也是自己实现的
for (int i = 0; i != num_threads; ++i)
{
// std::vector<std::thread> pool_;
pool_.emplace_back([this]() { ThreadPool::DoWork(); });
}
}

大致看ThreadPool::DoWork(),最后执行了ThreadPoolInterface::Execute,其实就是Task::Execute。 每个线程与DoWork()函数绑定,也就是线程在后台不断执行DoWork()函数

新的Task如果有依赖项,通过Thread_pool::Schedule添加到 Thread_pooltasks_not_ready_队列中。队列中的Task状态是变化的,等依赖项执行完成,Task状态变为DEPENDENCIES_COMPLETED,然后再插入task_queue_队列。最终所有Task都会插入task_queue_中,在DoWork中得到执行。

如果该Task没有依赖,直接插入task_queue_,准备执行。

对任一个任务的状态转换顺序为:NEW—->DISPATCHED—->DEPENDENCIES_COMPLETED—->RUNNING—->COMPLETED

Thread_pool通过固定数量的thread与task_queue_(待执行的task队列)执行函数绑定。Thread_pool 按照队列首尾顺序不断执行Task。

示意图