cartographer的缺点
abstract Welcome to my blog, enter password to read.
Read more
最后阶段 RunFinalOptimization

现在返回到最初的node_main.cc中的Run函数,还有一句node.RunFinalOptimization();,所有轨迹结束时,再执行一次全局优化。 其实就是MapBuilderBridge::RunFinalOptimization —— PoseGraph2D::RunFinalOptimization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void PoseGraph2D::RunFinalOptimization()
{
{
// 参数 max_num_final_iterations 默认 200
// 优化标志
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.max_num_final_iterations());
return WorkItem::Result::kRunOptimization;
});
// 参数 max_num_iterations, 默认 50
// 不优化标志
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.optimization_problem_options()
.ceres_solver_options()
.max_num_iterations());
return WorkItem::Result::kDoNotRunOptimization;
});
}
// 另一个调用地方在 PoseGraph2D 析构
WaitForAllComputations();
}

在建图结束之后会运行一个新的全局优化,不要求实时性,迭代次数多
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void PoseGraph2D::WaitForAllComputations()
{
int num_trajectory_nodes;
{
absl::MutexLock locker(&mutex_);
num_trajectory_nodes = data_.num_trajectory_nodes;
}
const int num_finished_nodes_at_start =
constraint_builder_.GetNumFinishedNodes();

auto report_progress = [this, num_trajectory_nodes,
num_finished_nodes_at_start]() {
// Log progress on nodes only when we are actually processing nodes.
if (num_trajectory_nodes != num_finished_nodes_at_start) {
std::ostringstream progress_info;
progress_info << "Optimizing: " << std::fixed << std::setprecision(1)
<< 100. *
(constraint_builder_.GetNumFinishedNodes() -
num_finished_nodes_at_start) /
(num_trajectory_nodes - num_finished_nodes_at_start)
<< "%...";
std::cout << "\r\x1b[K" << progress_info.str() << std::flush;
}
};

  • num_finished_nodes_at_start表示此时已经优化的节点数,假设为 10

  • GetNumFinishedNodes返回当前已经优化的节点数,假设为 11,也就是这个函数开始后,优化了1个节点

  • num_trajectory_nodes,轨迹的总节点数

progress_info 输出的是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  // First wait for the work queue to drain so that it's safe to schedule
// a WhenDone() callback.
{
const auto predicate = [this]()
EXCLUSIVE_LOCKS_REQUIRED(work_queue_mutex_) {
return work_queue_ == nullptr;
};
absl::MutexLock locker(&work_queue_mutex_);
while (!work_queue_mutex_.AwaitWithTimeout(
absl::Condition(&predicate),
absl::FromChrono(common::FromSeconds(1.)))) {
report_progress();
}
}

// Now wait for any pending constraint computations to finish.
absl::MutexLock locker(&mutex_);
bool notification = false;
constraint_builder_.WhenDone(
[this,
&notification](const constraints::ConstraintBuilder2D::Result& result)
LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
data_.constraints.insert(data_.constraints.end(), result.begin(),
result.end());
notification = true;
});
const auto predicate = [&notification]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return notification;
};
while (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
absl::FromChrono(common::FromSeconds(1.)))) {
report_progress();
}
CHECK_EQ(constraint_builder_.GetNumFinishedNodes(), num_trajectory_nodes);
std::cout << "\r\x1b[KOptimizing: Done. " << std::endl;
}

就是把ceres的迭代次数设置为200,其他部分其实不用太深入研究了。


后端的线程池 3 HandleWorkQueue和优化

优化的具体实现在优化器optimization_problem_。将优化的结果采用回调的方式,返回到result中。
后端将数据加入到 optimization_problem_ 的对应传感器队列中,并按时间排列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void PoseGraph2D::HandleWorkQueue(
const constraints::ConstraintBuilder2D::Result& result)
{
{ // 将新的约束添加到全局约束队列中
absl::MutexLock locker(&mutex_);
data_.constraints.insert(data_.constraints.end(), result.begin(),
result.end() );
}
// ceres 在这里
RunOptimization();
// 以下是 省略 的内容
//如果设置了全局优化回调函数,则进行调用
//根据约束结果,更新轨迹间的链接关系
// 优化完成后,累计节点清零
// 计算相同轨迹的 inter_constraints_same_trajectory
// 计算不同轨迹的 inter_constraints_different_trajectory

// 优化结束后,重新开启任务队列,即继续执行
// work_queue_里的 work_item
DrainWorkQueue();
}

RunOptimization

优化的实际就是成员变量node_data_submap_data_,也就是

1
2
3
4
5
6
7
8
9
10
11
12
struct NodeSpec2D
{
common::Time time;
transform::Rigid2d local_pose_2d;
transform::Rigid2d global_pose_2d;
Eigen::Quaterniond gravity_alignment;
};

struct SubmapSpec2D
{
transform::Rigid2d global_pose;
};

1
2
3
4
5
6
7
8
9
10
11
12
void PoseGraph2D::RunOptimization()
{
if (optimization_problem_->submap_data().empty() )
return;
// std::unique_ptr<optimization::OptimizationProblem2D> optimization_problem_;

// No other thread is accessing the optimization_problem_,
// data_.constraints, data_.frozen_trajectories and data_.landmark_nodes
// when executing Solve.
// 调用优化,Ceres在这里面,由于耗时间,故没加锁,防止阻塞其他线程
optimization_problem_->Solve(data_.constraints,
GetTrajectoryStates(), data_.landmark_nodes);

将所有内部约束和外部约束合并在一起执行Solve
遍历所有submap,建立参数块;遍历所有node,建立参数块。
根据约束,添加残差函数;
处理里程计问题,添加可能的残差;
求解返回结果;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
  absl::MutexLock locker(&mutex_);
// 优化后所有的submap和node数据
const auto& submap_data = optimization_problem_->submap_data();
const auto& node_data = optimization_problem_->node_data();
/* 遍历所有优化后的轨迹的所有节点位姿 */
for (const int trajectory_id : node_data.trajectory_ids())
{
for (const auto& node : node_data.trajectory(trajectory_id) )
{
auto& mutable_trajectory_node = data_.trajectory_nodes.at(node.id);
// 更新所有节点的 全局位姿
mutable_trajectory_node.global_pose =
transform::Embed3D(node.data.global_pose_2d) *
transform::Rigid3d::Rotation(
mutable_trajectory_node.constant_data->gravity_alignment);
}

// Extrapolate all point cloud poses that were not included in the
// 'optimization_problem_' yet. 注意是已经加入tarjectory但是还没有进行优化的新Node
// 由于使用采样器建立约束,有的node没有建立约束,其pose就不会被优化
// 因此,要通过已经优化的位姿转换关系来修正所有的node

// 子图的local到global的新的转移矩阵
const auto local_to_new_global =
ComputeLocalToGlobalTransform(submap_data, trajectory_id);
// 旧的转移矩阵
const auto local_to_old_global = ComputeLocalToGlobalTransform(
data_.global_submap_poses_2d, trajectory_id);
const transform::Rigid3d old_global_to_new_global =
local_to_new_global * local_to_old_global.inverse();
// 上次最后一个优化的节点
const NodeId last_optimized_node_id =
std::prev(node_data.EndOfTrajectory(trajectory_id))->id;
auto node_it =
std::next(data_.trajectory_nodes.find(last_optimized_node_id));
// 后续未优化的节点的全局pose进行转移
for (; node_it != data_.trajectory_nodes.EndOfTrajectory(trajectory_id);
++node_it)
{
// TrajectoryNode类型, 指针
auto& mutable_trajectory_node = data_.trajectory_nodes.at(node_it->id);
修正 global_pose
mutable_trajectory_node.global_pose =
old_global_to_new_global * mutable_trajectory_node.global_pose;
}
}
// 更新所有路标位姿
for (const auto& landmark : optimization_problem_->landmark_data()) {
data_.landmark_nodes[landmark.first].global_landmark_pose = landmark.second;
}
data_.global_submap_poses_2d = submap_data;
}

优化并非是实时,是在后台进行运行的,并且需要一定的时间。因此当完成优化时,前端输出结果已经对整个位姿图个数进行了增加。后面新加入的节点并未优化,所以返回优化的结果没有最新加入轨迹节点对应的结果。因此采用优化后结果中最后一个轨迹节点的位姿的转移矩阵,作为未参与优化轨迹节点的转移矩阵进行更新。

其中 为参与优化的节点,优化前位姿为 ,优化后的位姿为 。而 则为未参与优化的节点。


后端 5 计算约束

我的理解是,求inter约束也是一个 scan to map 的过程,找到和点云最相似的不同时间的子图,也就是找回环。其实是和lidar_localization的后端用NDT找关键帧的匹配是类似的,但是cartographer是点云和栅格地图匹配,不像点云匹配那样直观,分支定界的score就像NDT匹配的score,不过前者越大越好,后者越小越好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void ConstraintBuilder2D::ComputeConstraint(
const SubmapId& submap_id, const Submap2D* const submap,
const NodeId& node_id, bool match_full_submap,
const TrajectoryNode::Data* const constant_data,
const transform::Rigid2d& initial_relative_pose,
const SubmapScanMatcher& submap_scan_matcher,
std::unique_ptr<ConstraintBuilder2D::Constraint>* constraint)
{
CHECK(submap_scan_matcher.fast_correlative_scan_matcher);
/* node在local坐标系的位姿 = 子图在local坐标系中的位姿 * node在子图的位姿*/
const transform::Rigid2d initial_pose =
ComputeSubmapPose(*submap) * initial_relative_pose;

float score = 0.;
transform::Rigid2d pose_estimate = transform::Rigid2d::Identity();

整个函数是为了计算constraint_transform(节点 j 和子图 i的关系) ,需要的参数有:

  • 节点 j 的filtered_gravity_aligned_point_cloud
  • 分支定界的初值 initial_pose (节点 j 在local map坐标系的坐标)
  • Match() 的结果 pose_estimate (节点 j 在local map坐标系的坐标).
  • ComputeSubmapPose()函数 (local map坐标系转到子图 i 坐标系)

计算pose_estimate的三步:

  1. 使用 fast correlative scan matcher 做 Fast estimate
  2. Prune if the score is too low.
  3. ceres Refine

匹配所有子图 或 局部子图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 匹配所有子图,对应 MaybeAddGlobalConstraint
if (match_full_submap)
{
kGlobalConstraintsSearchedMetric->Increment();
if( submap_scan_matcher.fast_correlative_scan_matcher->MatchFullSubmap(
constant_data->filtered_gravity_aligned_point_cloud,
// 参数为 global_localization_min_score
options_.global_localization_min_score(),
&score, &pose_estimate) )
{
CHECK_GT(score, options_.global_localization_min_score());
CHECK_GE(node_id.trajectory_id, 0);
CHECK_GE(submap_id.trajectory_id, 0);
// 最后记录下全局约束的次数和统计置信度
kGlobalConstraintsFoundMetric->Increment();
kGlobalConstraintScoresMetric->Observe(score);
}
else return;
}
// 匹配局部子图
else
{
kConstraintsSearchedMetric->Increment();
if (submap_scan_matcher.fast_correlative_scan_matcher->Match(
initial_pose, constant_data->filtered_gravity_aligned_point_cloud,
// 参数为 min_score
options_.min_score(),
&score, &pose_estimate) )
{
CHECK_GT(score, options_.min_score());
kConstraintsFoundMetric->Increment();
kConstraintScoresMetric->Observe(score);
}
else return;
}
{
absl::MutexLock locker(&mutex_);
score_histogram_.Add(score);
}

分枝定界求出的位姿被称为 初始位姿 3,这个初始位姿3及其携带的点云作为输入,用于ceres与此子图进行优化匹配

在建立全局约束的时候,直接在一个超大的范围内进行分枝定界搜索,并不需要计算一个特殊的初始位姿2,而直接把初始位姿设置为地图limits的中心点,可以理解为map的中心点。 而且打分的参数也不同了。

全局约束的搜索窗口范围: [1e6 * limits_.resolution(), M_PI],角度其实是±180°

ceres refine

ceres优化匹配,得到更加准确的优化位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Use the CSM estimate as both the initial and previous pose. 
// This has the effect that, in the absence of better information,
// we prefer the original CSM estimate.
ceres::Solver::Summary unused_summary;
// ceres更新pose_estimate,获得节点在local map中的最优位姿
ceres_scan_matcher_.Match(pose_estimate.translation(), pose_estimate,
constant_data->filtered_gravity_aligned_point_cloud,
*submap_scan_matcher.grid, &pose_estimate,
&unused_summary);
// 计算得到node相对子图的位姿
const transform::Rigid2d constraint_transform =
ComputeSubmapPose(*submap).inverse() * pose_estimate;
constraint->reset(new Constraint{submap_id,
node_id,
{ transform::Embed3D(constraint_transform),
options_.loop_closure_translation_weight(),
options_.loop_closure_rotation_weight() },
Constraint::INTER_SUBMAP} );

对于局部约束,constraint_transform并不是回环边,其实就是子图和节点的普通约束。 全局约束才构造回环边

日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (options_.log_matches() )
{
std::ostringstream info;
info << "Node " << node_id << " with "
<< constant_data->filtered_gravity_aligned_point_cloud.size()
<< " points on submap " << submap_id << std::fixed;
if (match_full_submap)
info << " matches";
else
{
const transform::Rigid2d difference =
initial_pose.inverse() * pose_estimate;
info << " differs by translation " << std::setprecision(2)
<< difference.translation().norm() << " rotation "
<< std::setprecision(3) << std::abs(difference.normalized_angle());
}
info << " with score " << std::setprecision(1) << 100. * score << "%.";
LOG(INFO) << info.str();
}

思考的问题
  • 为何子图插入scan结束时CPU升高

  • 为什么要用线程池

  • To be sure there is a recent loop closure constraint, you can query PoseGraphInterface::constraints(), loop over all constraints and check if there is a recent one of type INTER that spans some time.

  • A simpler solution is to subscribe to the global slam callback GlobalSlamOptimizationCallback by setting PoseGraphInterface::SetGlobalSlamOptimizationCallback. This will call you back after Cartographer has searched for loop closures. It does not tell you if loop closures were found, it mostly tells you if the background loop closure search is keeping up or falls behind.


gmapping和AMCL的区别

粒子

AMCL的粒子只包括位姿和权重,需要大量的粒子才能较好描述机器人位姿。gmapping的粒子包括轨迹、地图、权重,粒子过多会占用很大的内存。

AMCL的粒子在初始化时,各粒子的位姿是不同的,是均值参数加高斯噪声; gmapping粒子初始化时,粒子的位姿都是相同的。 所以说AMCL更符合撒粒子的逻辑。

AMCL的粒子数不是固定的,有最小和最大粒子数。在定位过程中,KLD采样会动态减少粒子数;gmapping的粒子数一直是固定的。

AMCL的提议分布是运动模型,分布的特点是又扁又宽; gmapping的提议分布是运动模型又加入了最近的观测,分布特点是小的尖峰。

AMCL的粒子权重是似然域模型改变,gmapping是通过scan match

AMCL输出的位姿是所有粒子加权平均后的结果; gmapping是输出得分最高的粒子


emplace 和 emplace_back

常见用法:

1
2
3
4
5
6
7
8
vector<int> v;
for(int i=0; i<4; i++)
{
v.emplace_back();
auto& num = v.back();
num = i;
qDebug()<< "size: "<<v.size() <<" v.back: "<<v.back();
}

结果

1
2
3
4
size:  1   v.back:  0
size: 2 v.back: 1
size: 3 v.back: 2
size: 4 v.back: 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <map>
#include <string>
#include <vector>
using namespace std;

class A{
public:
A(const string& s) {cout << "A construct " <<endl;}
A(const A& a) {cout <<"A copy"<<endl; }
private:
string s;
};

int main()
{
std::map<int, string> m;
vector<A> v;
A a1("test"); // a1 构造函数
cout << "------------------" << endl;
v.push_back(a1); // a1 拷贝构造函数
v.emplace_back(a1);

//先运行构造函数,后拷贝构造函数
v.push_back(A("test") );
v.emplace_back(A("test") );

// v.push_back("test"); 报错
// 这个才是真正的用途
v.emplace_back("test"); // 隐式转换,只有拷贝构造函数
cout << "******************" << endl;
return 0;
}

对于C++ 11里vector的emplace_back函数比较失望,都说提高了效率,其实它仅对于元素做隐式转换的情况有效,此时没有产生临时对象。对其他情况,和push_back没区别。

这里用到的c++11特性完美转发:将接收下来的参数原样完美地传递给对象的构造函数,这带来另一个方便性就是即使是构造函数声明为 explicit 它还是可以正常工作,因为它不存在临时变量和隐式转换。

map::emplace

map就只有emplace,机制也是一样的。元素是直接构建的,既不复制也不移动,仅当键不存在时才插入。
但是map有个问题:emplace 方法把它接收到的所有的参数都一起转发给 pair 的构造函数。但是对于一个 pair 来说,它既需要构造它的 key 又需要构造它的 value。如果我们按照之前普通的语法使用变参模板的话,则它是无法区分哪些参数用来构造 key, 哪些用来构造 value的。 比如下面的代码

1
2
3
4
5
// 无法区分哪个参数用来构造 key 哪些用来构造 value
// 有可能是 std::string s("hello", 1), std::complex<double> cpx(2)
// 也有可能是 std::string s("hello"), std::complex<double> cpx(1, 2)
std::map<std::string, std::complex<double>> scp;
scp.emplace("hello", 1, 2);

考虑使用可以接受异构、变长参数的 tuple 来对参数进行分组。再使用 C++11 提供的一个特殊类型 piecewise_construct_t 来帮助它们找到各自正确的构造函数了。std::piecewise_construct_t 是一个空类,全局变量 std::piecewise_construct 就是该类型的一个变量。

std::piecewise_construct作为构造 pair 对象的第一个参数传递,以选择构造函数形式,通过将两个元组对象的元素转发给它们各自的构造函数来构造其成员。即第一个 tuple 作为 key,第二个 tuple 作为 value。 使用 forward_as_tuple,该函数会构造一个 tuple 并转发给 pair。

1
2
3
4
5
// 想对于 map 避免临时变量的构造的话,就需要构建两个 tuple
std::map<std::string, std::complex<double>> scp;
scp.emplace(std::piecewise_construct, // 此常量值作为构造 pair 对象的第一个参数传递,以选择构造函数形式,通过将两个元组对象的元素转发给它们各自的构造函数来构造其成员。
std::forward_as_tuple("hello"), // 该函数会构造一个 tuple 并转发给 pair 构造,并存储在 first 字段
std::forward_as_tuple(1, 2)); //该函数会构造一个 tuple 并转发给 pair 构造,存储在 second 字段

cartographer中的代码就是这样

1
2
3
4
5
6
7
8
9
// std::map<int, ::cartographer::mapping::PoseExtrapolator>  extrapolators_;
// PoseExtrapolator的构造函数有2个参数:common::Duration pose_queue_duration,
// double imu_gravity_time_constant
extrapolators_.emplace(
std::piecewise_construct,
std::forward_as_tuple(trajectory_id),
std::forward_as_tuple(
::cartographer::common::FromSeconds(kExtrapolationEstimationTimeSec),
gravity_time_constant) );

无参的emplace_back

还有一种使用无参的vector::emplace_back() 或者 deque::emplace_back(),下面是从cartographer中学来的代码,确实有一定技巧

1
2
3
4
5
6
7
8
9
10
    vector<std::unique_ptr<A> > dd;
dd.emplace_back();
// 注意这里的 size 也是1
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
// cout << dd.at(0)->getValue() <<endl; 这里运行会崩溃

auto* a1 = &dd.back(); // 这个是 unique_ptr
a1->reset(new A(5) ); // 不能再用=,unique_ptr不能复制
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
cout << dd.at(0)->getValue() <<endl;

运行结果:

1
2
3
4
capacity: 1 size: 1
A construct
capacity: 1 size: 1
5

参考:
emplace_back VS push_back


后端的线程池 1 thread_pool 和 task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 省略部分不重要的成员
class Task
{
public:
friend class ThreadPoolInterface;
State GetState() LOCKS_EXCLUDED(mutex_); //返回本Task当前状态
// 设置Task 执行的任务 (函数)
void SetWorkItem(const WorkItem& work_item);
// 给当前任务添加 依赖任务, 如当前任务为b,AddDependency(a)表示任务a依赖b
// 把当前任务b,加入到依赖任务 a 的 dependent_tasks_ 列表
void AddDependency(std::weak_ptr<Task> dependency) LOCKS_EXCLUDED(mutex_);

private:
// 执行当前任务,比如当前任务为a,并更新依赖a的任务dependent_tasks_中所有任务状态
void Execute() LOCKS_EXCLUDED(mutex_);

// 当前任务进入线程待执行队列
void SetThreadPool(ThreadPoolInterface* thread_pool) LOCKS_EXCLUDED(mutex_);

// 当前任务的依赖任务完成时候,当前任务状态随之改变
void OnDependenyCompleted();


using WorkItem = std::function<void()>;
enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED };
// 任务具体执行过程
WorkItem work_item_ ;
// 执行当前任务的线程池
ThreadPoolInterface* thread_pool_to_notify_ = nullptr;
State state_ GUARDED_BY(mutex_) = NEW; // 初始化状态为 NEW
// 依赖当前任务的任务列表
std::set<Task*> dependent_tasks_ GUARDED_BY(mutex_);
};

Task类不是线程,可以理解为有状态和依赖的函数,核心的成员变量work_item_是仿函数,其返回类型void。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 省略部分不重要的成员
// 不明白为什么要先构造个抽象类,只有这一个派生类
class ThreadPool : public ThreadPoolInterface
{
public:
//初始化一个线程数量固定的线程池。
explicit ThreadPool(int num_threads);
// 添加想要ThreadPool执行的task,插入 tasks_not_ready_
// 如果任务满足执行要求,直接插入task_queue_准备执行
std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task);
private:
//每个线程初始化时, 执行DoWork()函数. 与线程绑定
void DoWork();
void NotifyDependenciesCompleted(Task* task);
//running_只是一个监视哨, 只有线程池在running_状态时, 才能往work_queue_加入函数
bool running_ = true;
// pool_就是一系列线程
std::vector<std::thread> pool_ ;
// 十分重要的任务队列
std::deque<std::shared_ptr<Task>> task_queue_ ;
//未准备好的 task,task可能有依赖还未完成
absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_;
};

看构造函数

1
2
3
4
5
6
7
8
9
10
ThreadPool::ThreadPool(int num_threads)
{
absl::MutexLock locker(&mutex_);
// 原来还是用了std::thread,还以为google连线程也是自己实现的
for (int i = 0; i != num_threads; ++i)
{
// std::vector<std::thread> pool_;
pool_.emplace_back([this]() { ThreadPool::DoWork(); });
}
}

大致看ThreadPool::DoWork(),最后执行了ThreadPoolInterface::Execute,其实就是Task::Execute。 每个线程与DoWork()函数绑定,也就是线程在后台不断执行DoWork()函数

新的Task如果有依赖项,通过Thread_pool::Schedule添加到 Thread_pooltasks_not_ready_队列中。队列中的Task状态是变化的,等依赖项执行完成,Task状态变为DEPENDENCIES_COMPLETED,然后再插入task_queue_队列。最终所有Task都会插入task_queue_中,在DoWork中得到执行。

如果该Task没有依赖,直接插入task_queue_,准备执行。

对任一个任务的状态转换顺序为:NEW—->DISPATCHED—->DEPENDENCIES_COMPLETED—->RUNNING—->COMPLETED

Thread_pool通过固定数量的thread与task_queue_(待执行的task队列)执行函数绑定。Thread_pool 按照队列首尾顺序不断执行Task。

示意图


后端的线程池 2 线程池和work_queue

cartographer_ros节点main函数所在的称为main线程,其它的称后台线程。PoseGrapher::AddNode在main线程,但计算两种约束、优化全局位姿都在后台线程。

PoseGraph有一个线程池,配置一定数量的线程来执行优化任务。线程池在MapBuilder构造函数中创建,作为参数给PoseGraph对象,PoseGraph将其作为 constraints::ConstraintBuilder2D的初始化使用。每个线程执行 ThreadPool::DoWork()函数。 DoWork()函数会一直循环遍历一个deque队列中的Task任务,每次执行完一个Task,就会从队列弹出。 即std::deque<std::shared_ptr<Task> > task_queue_

这个task_queue_的添加任务追溯到线程池的Schedule函数,Schedule函数在PoseGraph2D::AddWorkItem和添加约束的函数里

每次收到SLAM需要的信息都会生成一个work,这个work都会保存到PoseGraph的work_queue_队列中,上面说的task通过SetWorkItem进一步执行DrainWorkQueue, 进而从 work_queue_中取出 WorkItem 执行。这些 WorkItem 是由pose_graph添加各种传感器数据和 Node的时候加入到该队列的,比如AddImuData

lambda 和 仿函数

work_queue_队列之前,先要搞清lambda 和 仿函数是怎么回事。

先看一个简单的仿函数

1
2
3
4
5
6
7
int add(float a, float b)
{
return a+b;
}
// int是返回值类型,括号里是参数的类型
std::function<int(float, float)> func = add;
cout<< func(2.4, 6.9) <<endl;

lambda表达式通常仅仅是仿函数的语法糖,可以直接转换为仿函数。[]中的任何内容都会转换为构造函数参数和仿函数对象的成员,而()中的参数将转换为仿函数operator()的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void AddWorkItem(std::function<int()>&  work_item)
{
cout << work_item() <<endl;
}
void AddImuData(int a, int b)
{
// []:默认不捕获任何变量
// [=]:默认以值捕获所有变量
// 其实可以用 auto
std::function<int()> func = [=](){
cout <<"input: "<< a << " "<< b<< endl;
return a+b;
};
// std::function<int(float,float)> func2 = add;
AddWorkItem( func );
}

// main 函数中调用
AddImuData(29, 7);

把cartographer中的函数做了修改,这样容易理解。 main 函数中调用AddImuData,我们要的是运行lambda表达式,假如AddWorkItem是空函数,那么不会有运行结果。work_item()才是实际的运行lambda表达式。

workqueue 机制

PoseGraph2D有16个成员函数调用了AddWorkItem,也就是加入了work_queue_队列。操作work_queue_的函数主要是AddWorkItemDrainWorkQueue

PoseGraph2D::AddImuData为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
void PoseGraph2D::AddImuData(const int trajectory_id,
const sensor::ImuData& imu_data)
{
AddWorkItem( [=]() LOCKS_EXCLUDED(mutex_)
{
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddImuData(trajectory_id, imu_data);
}
return WorkItem::Result::kDoNotRunOptimization;
}
);
}

后面会看到AddWorkItem的参数是一个仿函数std::functionAddImuData实际上就一句,也就是调用了AddWorkItem, 这里面的AddWorkItem的参数其实是把一个lambda表达式作为仿函数,这个仿函数的返回值是WorkItem::Result,实际是枚举值kDoNotRunOptimization 或者 kRunOptimization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 将一个函数地址加入到一个工作队列中
void PoseGraph2D::AddWorkItem(
const std::function<WorkItem::Result()>& work_item)
{
absl::MutexLock locker(&work_queue_mutex_);
// 如果工作队列未被初始化,则先初始化,再打开启线程池,后台打开任务队列执行循环
// 这也是唯一初始化 work_queue_ 的地方
// std::unique_ptr<WorkQueue> work_queue_
if (work_queue_ == nullptr)
{
work_queue_ = absl::make_unique<WorkQueue>();
// DrainWorkQueue函数加入到线程池
auto task = absl::make_unique<common::Task>();
task->SetWorkItem([this]() { DrainWorkQueue(); });
thread_pool_->Schedule(std::move(task) );
}
const auto now = std::chrono::steady_clock::now();
work_queue_->push_back({now, work_item});
}

work_queue_的类型是std::unique_ptr<WorkQueue>WorkQueueWorkItem的deque。根据上面的分析, 我们要想运行lambda表达式,要的是 work_item(),但是这个函数里没看到,因为这里只有添加,是添加到work_queue_里了。实际的运行在DrainWorkQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void PoseGraph2D::DrainWorkQueue()
{
bool process_work_queue = true;
size_t work_queue_size;
while (process_work_queue)
{
std::function<WorkItem::Result()> work_item;
{
absl::MutexLock locker(&work_queue_mutex_);
// work_queue_ 队列中取出压栈的任务一个一个执行,直到为空
// work_queue_ 为空, 说明thread_pool中DrainWorkQueue执行完,
// 等待下一次addNode时候,再次执行thread_pool_->Schedule(task),再次执行.
if (work_queue_->empty())
{
work_queue_.reset(); // 智能指针 reset
return;
}
// 获取队列中最前的任务,这就是上面说的 work_item
work_item = work_queue_->front().task;
work_queue_->pop_front();
work_queue_size = work_queue_->size();
kWorkQueueSizeMetric->Set(work_queue_size);
}
//这里的 work_item()就是我们需要的 AddImuData里的lambda表达式
//回头看lambda,可以发现返回值是 kDoNotRunOptimization
// 执行工作任务后,返回的状态,赋值false或true给 process_work_queue
process_work_queue = work_item() == WorkItem::Result::kDoNotRunOptimization;
}
// 运行到这里,说明队列里运行的lambda表达式的返回值是 kRunOptimization
// 实际上pop了好几个任务
// 如果work_item()都不需要全局优化, 则直到work_queue_为空, 都不会执行优化
LOG(INFO) << "Remaining work items in queue: " << work_queue_size;
// We have to optimize again
constraint_builder_.WhenDone(
[this](const constraints::ConstraintBuilder2D::Result& result)
{
HandleWorkQueue(result);
} );
}

上面说了PoseGraph2D有16个成员函数调用了AddWorkItem,但是发现只有3个函数会返回kRunOptimization
FinishTrajectory, RunFinalOptimizationComputeConstraintsForNode

16个里面最重要的当然是后端入口函数PoseGraph2D::AddNode,在它的work_itemComputeConstraintsForNode, 在ComputeConstraintsForNode的最后部分:

1
2
3
4
5
6
if (options_.optimize_every_n_nodes() > 0 &&
num_nodes_since_last_loop_closure_ > options_.optimize_every_n_nodes() )
{
return WorkItem::Result::kRunOptimization;
}
return WorkItem::Result::kDoNotRunOptimization;

整个SLAM中实时进入任务队列的主要为计算约束,当加入节点数超过参数时,才进行一次优化。

ConstraintBuilder2D::WhenDone

最后的ConstraintBuilder2D::WhenDone又用到了lambda表达式,又涉及到线程池,实际的后台优化是在HandleWorkQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Result 就是 vector<Constraint>
void ConstraintBuilder2D::WhenDone(
const std::function<void(const ConstraintBuilder2D::Result&)>& callback)
{
absl::MutexLock locker(&mutex_);
CHECK(when_done_ == nullptr);
// 成员 unique_ptr<std::function<void(const Result&)>> when_done_;
when_done_ = absl::make_unique<std::function<void(const Result&)> > (callback);
CHECK(when_done_task_ != nullptr);
// 约束的计算结果通过回调进行回传
when_done_task_->SetWorkItem([this] { RunWhenDoneCallback(); });
thread_pool_->Schedule(std::move(when_done_task_));
when_done_task_ = absl::make_unique<common::Task>();
}

这里实际就是处理when_done_task_,它是在finish_node_task执行完之后才执行的,内容就是HandleWorkQueue

优化执行完毕时的回调函数RunWhenDoneCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void ConstraintBuilder2D::RunWhenDoneCallback()
{
Result result;
std::unique_ptr<std::function<void(const Result&)>> callback;
{
absl::MutexLock locker(&mutex_);
CHECK(when_done_ != nullptr);
// 将约束结果放入 result
// MaybeAddConstraint 里添加 constraints_
for (const std::unique_ptr<Constraint>& constraint : constraints_)
{
if (constraint == nullptr) continue;
result.push_back(*constraint);
}
if (options_.log_matches() )
{
LOG(INFO) << constraints_.size() << " computations resulted in "
<< result.size() << " additional constraints.";
LOG(INFO) << "Score histogram:\n" << score_histogram_.ToString(10);
}
constraints_.clear();
callback = std::move(when_done_);
when_done_.reset();
kQueueLengthMetric->Set(constraints_.size());
}
// 我理解的 HandleWorkQueue 是在这里
(*callback)(result);
}

scan match的过程如下

AddImuData等函数不是直接放到线程里运行的,进线程池的是DrainWorkQueue


cartographer的gflag和glog日志

gflag

CHECK宏.png
先看DEFINE宏,有三个宏:flag的名称,flag的默认值,help字符串。如何查看help信息呢,在执行可执行文件时加上–help选项即可,但是如果要显示help信息,代码里需要调用接口gflags::ParseCommandLineFlags(&argc, &argv, true)

gflags支持的变量类型如下:

  • DEFINE_bool: 布尔类型
  • DEFINE_int32: 32-bit 整型
  • DEFINE_int64: 64-bit 整型
  • DEFINE_uint64: 无符号 64-bit 整型
  • DEFINE_double: double
  • DEFINE_string: C++ string

在程序中使用flag,对于DEFINE过的flag我们可以像使用普通变量那样读写它,只需加一个FLAGS_前缀即可,如下:

1
2
3
4
5
6
7
8
if (FLAGS_is_handsome)
FLAGS_is_handsome = false;
std::cout << FLAGS_hobby << std::endl;

DEFINE_string(configuration_directory, "",
"First directory in which configuration files are searched, "
"second is always the Cartographer installation to allow "
"including files from there.");


1
2
3
4
5
6
CHECK_EQ(x,y)    ==
CHECK_NE(x,y) =
CHECK_LE(x,y) <=
CHECK_LT(x,y) <
CHECK_GE(x,y) >=
CHECK_GT(x,y) >

glog

Google glog是一个基于程序级记录日志信息的c++库,编程使用方式与c++的stream操作类似。 每个级别的日志除了输出到对应日志文件中,还输出到每个低级别日志文件中。默认ERROR和FATAL消息除了输出到日志文件中之外,还会输出到标准错误中
在CMakeLists中的配置.png
每个级别都对应有相应的日志文件,文件默认存放在临时文件中,Linux是 /tmp。运行cartographer之后,可以在/tmp里看到

glog的库文件都在/usr/lib/x86_64-linux-gnu

自定义cartographer的日志格式

glog的明显缺点就是配置输出格式不方便,还要修改源码的logging.cc,重新编译。我下载glog 0.5.0后,修改logging.cc文件再编译安装,结果导致cartographer报错,ScopedRosLogSink::send没有链接,应当是和现有的glog造成了冲突,然后一直解决不了。

然后尝试在ros_log_sink.cc中添加函数,就是把::google::LogSink::ToString重新定义到ros_log_sink.cc,这样避开glog的链接问题。头文件当然也要添加这个成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
string ScopedRosLogSink::ToString(const ::google::LogSeverity severity, const char* file, int line,
const struct ::tm* tm_time,
const char* message, size_t message_len)
{
ostringstream stream(string(message, message_len));
stream.fill('0');

stream << '['
<< setw(2) << 1+tm_time->tm_mon
<< '_'
<< setw(2) << tm_time->tm_mday
<< " "
<< setw(2) << tm_time->tm_hour << ':'
<< setw(2) << tm_time->tm_min << ':'
<< setw(2) << tm_time->tm_sec << " "
<< file << ": " << line << "] ";

stream << string(message, message_len);
return stream.str();
}

tm_time只精确到秒,这样日志就不能精确到毫秒了,不过目前还不需要。ScopedRosLogSink::send的开头改为ToString函数。这样改完后,日志格式可能还不是想要的
默认日志格式.png
后来才发现原来cartographer的日志格式是把glog的格式和ROS的格式融合一起了,所以修改环境变量如下
1
export ROSCONSOLE_FORMAT='[${message}'

修改后的日志.png

几个常用参数

1
2
3
4
5
6
7
8
9
10
11
FLAGS_logtostderr = true; // 设置日志消息是否转到标准输出而不是日志文件

FLAGS_alsologtostderr = true; // 设置日志消息除了日志文件之外是否去标准输出

FLAGS_colorlogtostderr = true; // 设置记录到标准输出的颜色消息(如果终端支持)

FLAGS_logbufsecs = ; // 设置可以缓冲日志的最大秒数,0指实时输出

FLAGS_max_log_size = ; // 设置最大日志文件大小(以MB为单位)

FLAGS_stop_logging_if_full_disk = true; // 设置是否在磁盘已满时避免日志记录到磁盘

cartographer的大部分文件都可以直接使用LOG(INFO)<<"log";这样的语句,因为都提前配置好了,但是最好重新指定日志路径。

  • google::SetLogDestination(google::INFO, "log/INFO_"); // 设置特定级别的日志的输出目录和前缀。第一个参数为日志级别,第二个参数表示输出目录及日志文件名前缀
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <glog/logging.h>

int main(int argc, char** argv)
{
google::InitGoogleLogging(argv[0]);
// INFO
std::string str_info;
// 文件名最后只能是 等级+下划线
str_info.append("/home/user/log/INFO_");
google::SetLogDestination(google::INFO, str_info.c_str());
LOG(INFO)<< "The is a info!";
// WARNING
std::string str_warn;
str_warn.append("/home/user/log/WARNING_");
google::SetLogDestination(google::WARNING, str_warn.c_str());
LOG(WARNING) << "The is a warning!";

// stop glog 可以没有
google::ShutdownGoogleLogging();
}

每次运行cartographer后发现在自定义的路径里,INFO开头的日志文件每次都会新建,而INFO结尾的日志是最新运行得到的日志