现在返回到最初的node_main.cc
中的Run
函数,还有一句node.RunFinalOptimization();
,所有轨迹结束时,再执行一次全局优化。 其实就是MapBuilderBridge::RunFinalOptimization
—— PoseGraph2D::RunFinalOptimization
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25void PoseGraph2D::RunFinalOptimization()
{
{
// 参数 max_num_final_iterations 默认 200
// 优化标志
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.max_num_final_iterations());
return WorkItem::Result::kRunOptimization;
});
// 参数 max_num_iterations, 默认 50
// 不优化标志
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.optimization_problem_options()
.ceres_solver_options()
.max_num_iterations());
return WorkItem::Result::kDoNotRunOptimization;
});
}
// 另一个调用地方在 PoseGraph2D 析构
WaitForAllComputations();
}
在建图结束之后会运行一个新的全局优化,不要求实时性,迭代次数多1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24void PoseGraph2D::WaitForAllComputations()
{
int num_trajectory_nodes;
{
absl::MutexLock locker(&mutex_);
num_trajectory_nodes = data_.num_trajectory_nodes;
}
const int num_finished_nodes_at_start =
constraint_builder_.GetNumFinishedNodes();
auto report_progress = [this, num_trajectory_nodes,
num_finished_nodes_at_start]() {
// Log progress on nodes only when we are actually processing nodes.
if (num_trajectory_nodes != num_finished_nodes_at_start) {
std::ostringstream progress_info;
progress_info << "Optimizing: " << std::fixed << std::setprecision(1)
<< 100. *
(constraint_builder_.GetNumFinishedNodes() -
num_finished_nodes_at_start) /
(num_trajectory_nodes - num_finished_nodes_at_start)
<< "%...";
std::cout << "\r\x1b[K" << progress_info.str() << std::flush;
}
};
num_finished_nodes_at_start
表示此时已经优化的节点数,假设为 10
GetNumFinishedNodes
返回当前已经优化的节点数,假设为 11,也就是这个函数开始后,优化了1个节点
num_trajectory_nodes
,轨迹的总节点数
progress_info
输出的是
1 | // First wait for the work queue to drain so that it's safe to schedule |
就是把ceres的迭代次数设置为200,其他部分其实不用太深入研究了。
优化的具体实现在优化器optimization_problem_
。将优化的结果采用回调的方式,返回到result中。
后端将数据加入到 optimization_problem_
的对应传感器队列中,并按时间排列。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21void PoseGraph2D::HandleWorkQueue(
const constraints::ConstraintBuilder2D::Result& result)
{
{ // 将新的约束添加到全局约束队列中
absl::MutexLock locker(&mutex_);
data_.constraints.insert(data_.constraints.end(), result.begin(),
result.end() );
}
// ceres 在这里
RunOptimization();
// 以下是 省略 的内容
//如果设置了全局优化回调函数,则进行调用
//根据约束结果,更新轨迹间的链接关系
// 优化完成后,累计节点清零
// 计算相同轨迹的 inter_constraints_same_trajectory
// 计算不同轨迹的 inter_constraints_different_trajectory
// 优化结束后,重新开启任务队列,即继续执行
// work_queue_里的 work_item
DrainWorkQueue();
}
优化的实际就是成员变量node_data_
和 submap_data_
,也就是1
2
3
4
5
6
7
8
9
10
11
12struct NodeSpec2D
{
common::Time time;
transform::Rigid2d local_pose_2d;
transform::Rigid2d global_pose_2d;
Eigen::Quaterniond gravity_alignment;
};
struct SubmapSpec2D
{
transform::Rigid2d global_pose;
};
1 | void PoseGraph2D::RunOptimization() |
将所有内部约束和外部约束合并在一起执行Solve
遍历所有submap,建立参数块;遍历所有node,建立参数块。
根据约束,添加残差函数;
处理里程计问题,添加可能的残差;
求解返回结果;
1 | absl::MutexLock locker(&mutex_); |
优化并非是实时,是在后台进行运行的,并且需要一定的时间。因此当完成优化时,前端输出结果已经对整个位姿图个数进行了增加。后面新加入的节点并未优化,所以返回优化的结果没有最新加入轨迹节点对应的结果。因此采用优化后结果中最后一个轨迹节点的位姿的转移矩阵,作为未参与优化轨迹节点的转移矩阵进行更新。
其中 为参与优化的节点,优化前位姿为 ,优化后的位姿为 。而 则为未参与优化的节点。
我的理解是,求inter约束也是一个 scan to map 的过程,找到和点云最相似的不同时间的子图,也就是找回环。其实是和lidar_localization
的后端用NDT找关键帧的匹配是类似的,但是cartographer是点云和栅格地图匹配,不像点云匹配那样直观,分支定界的score就像NDT匹配的score,不过前者越大越好,后者越小越好。
1 | void ConstraintBuilder2D::ComputeConstraint( |
整个函数是为了计算constraint_transform
(节点 j 和子图 i的关系) ,需要的参数有:
filtered_gravity_aligned_point_cloud
initial_pose
(节点 j 在local map坐标系的坐标)Match()
的结果 pose_estimate
(节点 j 在local map坐标系的坐标).ComputeSubmapPose()
函数 (local map坐标系转到子图 i 坐标系)计算pose_estimate
的三步:
1 | // 匹配所有子图,对应 MaybeAddGlobalConstraint |
分枝定界求出的位姿被称为 初始位姿 3,这个初始位姿3及其携带的点云作为输入,用于ceres与此子图进行优化匹配
在建立全局约束的时候,直接在一个超大的范围内进行分枝定界搜索,并不需要计算一个特殊的初始位姿2,而直接把初始位姿设置为地图limits的中心点,可以理解为map的中心点。 而且打分的参数也不同了。
全局约束的搜索窗口范围: [1e6 * limits_.resolution(), M_PI]
,角度其实是±180°
ceres优化匹配,得到更加准确的优化位置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// Use the CSM estimate as both the initial and previous pose.
// This has the effect that, in the absence of better information,
// we prefer the original CSM estimate.
ceres::Solver::Summary unused_summary;
// ceres更新pose_estimate,获得节点在local map中的最优位姿
ceres_scan_matcher_.Match(pose_estimate.translation(), pose_estimate,
constant_data->filtered_gravity_aligned_point_cloud,
*submap_scan_matcher.grid, &pose_estimate,
&unused_summary);
// 计算得到node相对子图的位姿
const transform::Rigid2d constraint_transform =
ComputeSubmapPose(*submap).inverse() * pose_estimate;
constraint->reset(new Constraint{submap_id,
node_id,
{ transform::Embed3D(constraint_transform),
options_.loop_closure_translation_weight(),
options_.loop_closure_rotation_weight() },
Constraint::INTER_SUBMAP} );
对于局部约束,constraint_transform
并不是回环边,其实就是子图和节点的普通约束。 全局约束才构造回环边
1 | if (options_.log_matches() ) |
为何子图插入scan结束时CPU升高
为什么要用线程池
To be sure there is a recent loop closure constraint, you can query PoseGraphInterface::constraints()
, loop over all constraints and check if there is a recent one of type INTER that spans some time.
A simpler solution is to subscribe to the global slam callback GlobalSlamOptimizationCallback
by setting PoseGraphInterface::SetGlobalSlamOptimizationCallback
. This will call you back after Cartographer has searched for loop closures. It does not tell you if loop closures were found, it mostly tells you if the background loop closure search is keeping up or falls behind.
AMCL的粒子只包括位姿和权重,需要大量的粒子才能较好描述机器人位姿。gmapping的粒子包括轨迹、地图、权重,粒子过多会占用很大的内存。
AMCL的粒子在初始化时,各粒子的位姿是不同的,是均值参数加高斯噪声; gmapping粒子初始化时,粒子的位姿都是相同的。 所以说AMCL更符合撒粒子的逻辑。
AMCL的粒子数不是固定的,有最小和最大粒子数。在定位过程中,KLD采样会动态减少粒子数;gmapping的粒子数一直是固定的。
AMCL的提议分布是运动模型,分布的特点是又扁又宽; gmapping的提议分布是运动模型又加入了最近的观测,分布特点是小的尖峰。
AMCL的粒子权重是似然域模型改变,gmapping是通过scan match
AMCL输出的位姿是所有粒子加权平均后的结果; gmapping是输出得分最高的粒子
常见用法:
1 | vector<int> v; |
结果1
2
3
4size: 1 v.back: 0
size: 2 v.back: 1
size: 3 v.back: 2
size: 4 v.back: 3
1 |
|
对于C++ 11里vector的emplace_back
函数比较失望,都说提高了效率,其实它仅对于元素做隐式转换的情况有效,此时没有产生临时对象。对其他情况,和push_back
没区别。
这里用到的c++11特性完美转发:将接收下来的参数原样完美地传递给对象的构造函数,这带来另一个方便性就是即使是构造函数声明为 explicit
它还是可以正常工作,因为它不存在临时变量和隐式转换。
map就只有emplace
,机制也是一样的。元素是直接构建的,既不复制也不移动,仅当键不存在时才插入。
但是map有个问题:emplace 方法把它接收到的所有的参数都一起转发给 pair
的构造函数。但是对于一个 pair
来说,它既需要构造它的 key 又需要构造它的 value。如果我们按照之前普通的语法使用变参模板的话,则它是无法区分哪些参数用来构造 key, 哪些用来构造 value的。 比如下面的代码1
2
3
4
5// 无法区分哪个参数用来构造 key 哪些用来构造 value
// 有可能是 std::string s("hello", 1), std::complex<double> cpx(2)
// 也有可能是 std::string s("hello"), std::complex<double> cpx(1, 2)
std::map<std::string, std::complex<double>> scp;
scp.emplace("hello", 1, 2);
考虑使用可以接受异构、变长参数的 tuple 来对参数进行分组。再使用 C++11 提供的一个特殊类型 piecewise_construct_t
来帮助它们找到各自正确的构造函数了。std::piecewise_construct_t
是一个空类,全局变量 std::piecewise_construct
就是该类型的一个变量。
std::piecewise_construct
作为构造 pair 对象的第一个参数传递,以选择构造函数形式,通过将两个元组对象的元素转发给它们各自的构造函数来构造其成员。即第一个 tuple 作为 key,第二个 tuple 作为 value。 使用 forward_as_tuple
,该函数会构造一个 tuple 并转发给 pair。
1 | // 想对于 map 避免临时变量的构造的话,就需要构建两个 tuple |
cartographer中的代码就是这样1
2
3
4
5
6
7
8
9// std::map<int, ::cartographer::mapping::PoseExtrapolator> extrapolators_;
// PoseExtrapolator的构造函数有2个参数:common::Duration pose_queue_duration,
// double imu_gravity_time_constant
extrapolators_.emplace(
std::piecewise_construct,
std::forward_as_tuple(trajectory_id),
std::forward_as_tuple(
::cartographer::common::FromSeconds(kExtrapolationEstimationTimeSec),
gravity_time_constant) );
还有一种使用无参的vector::emplace_back()
或者 deque::emplace_back()
,下面是从cartographer中学来的代码,确实有一定技巧1
2
3
4
5
6
7
8
9
10 vector<std::unique_ptr<A> > dd;
dd.emplace_back();
// 注意这里的 size 也是1
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
// cout << dd.at(0)->getValue() <<endl; 这里运行会崩溃
auto* a1 = &dd.back(); // 这个是 unique_ptr
a1->reset(new A(5) ); // 不能再用=,unique_ptr不能复制
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
cout << dd.at(0)->getValue() <<endl;
运行结果:1
2
3
4capacity: 1 size: 1
A construct
capacity: 1 size: 1
5
1 | // 省略部分不重要的成员 |
Task类不是线程,可以理解为有状态和依赖的函数,核心的成员变量work_item_
是仿函数,其返回类型void。
1 | // 省略部分不重要的成员 |
看构造函数1
2
3
4
5
6
7
8
9
10ThreadPool::ThreadPool(int num_threads)
{
absl::MutexLock locker(&mutex_);
// 原来还是用了std::thread,还以为google连线程也是自己实现的
for (int i = 0; i != num_threads; ++i)
{
// std::vector<std::thread> pool_;
pool_.emplace_back([this]() { ThreadPool::DoWork(); });
}
}
大致看ThreadPool::DoWork()
,最后执行了ThreadPoolInterface::Execute
,其实就是Task::Execute
。 每个线程与DoWork()函数绑定,也就是线程在后台不断执行DoWork()函数
新的Task如果有依赖项,通过Thread_pool::Schedule
添加到 Thread_pool
的 tasks_not_ready_
队列中。队列中的Task状态是变化的,等依赖项执行完成,Task状态变为DEPENDENCIES_COMPLETED
,然后再插入task_queue_
队列。最终所有Task都会插入task_queue_
中,在DoWork
中得到执行。
如果该Task没有依赖,直接插入task_queue_
,准备执行。
对任一个任务的状态转换顺序为:NEW—->DISPATCHED—->DEPENDENCIES_COMPLETED—->RUNNING—->COMPLETED
Thread_pool
通过固定数量的thread与task_queue_
(待执行的task队列)执行函数绑定。Thread_pool 按照队列首尾顺序不断执行Task。
cartographer_ros
节点main函数所在的称为main线程,其它的称后台线程。PoseGrapher::AddNode
在main线程,但计算两种约束、优化全局位姿都在后台线程。
PoseGraph有一个线程池,配置一定数量的线程来执行优化任务。线程池在MapBuilder
构造函数中创建,作为参数给PoseGraph
对象,PoseGraph将其作为 constraints::ConstraintBuilder2D
的初始化使用。每个线程执行 ThreadPool::DoWork()
函数。 DoWork()
函数会一直循环遍历一个deque
队列中的Task任务,每次执行完一个Task,就会从队列弹出。 即std::deque<std::shared_ptr<Task> > task_queue_
这个task_queue_
的添加任务追溯到线程池的Schedule
函数,Schedule
函数在PoseGraph2D::AddWorkItem
和添加约束的函数里
每次收到SLAM需要的信息都会生成一个work,这个work都会保存到PoseGraph的work_queue_
队列中,上面说的task通过SetWorkItem
进一步执行DrainWorkQueue
, 进而从 work_queue_
中取出 WorkItem 执行。这些 WorkItem 是由pose_graph添加各种传感器数据和 Node的时候加入到该队列的,比如AddImuData
看work_queue_
队列之前,先要搞清lambda 和 仿函数是怎么回事。
先看一个简单的仿函数1
2
3
4
5
6
7int add(float a, float b)
{
return a+b;
}
// int是返回值类型,括号里是参数的类型
std::function<int(float, float)> func = add;
cout<< func(2.4, 6.9) <<endl;
lambda表达式通常仅仅是仿函数的语法糖,可以直接转换为仿函数。[]中的任何内容都会转换为构造函数参数和仿函数对象的成员,而()中的参数将转换为仿函数operator()的参数。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19void AddWorkItem(std::function<int()>& work_item)
{
cout << work_item() <<endl;
}
void AddImuData(int a, int b)
{
// []:默认不捕获任何变量
// [=]:默认以值捕获所有变量
// 其实可以用 auto
std::function<int()> func = [=](){
cout <<"input: "<< a << " "<< b<< endl;
return a+b;
};
// std::function<int(float,float)> func2 = add;
AddWorkItem( func );
}
// main 函数中调用
AddImuData(29, 7);
把cartographer中的函数做了修改,这样容易理解。 main 函数中调用AddImuData
,我们要的是运行lambda表达式,假如AddWorkItem
是空函数,那么不会有运行结果。work_item()
才是实际的运行lambda表达式。
PoseGraph2D
有16个成员函数调用了AddWorkItem
,也就是加入了work_queue_
队列。操作work_queue_
的函数主要是AddWorkItem
和 DrainWorkQueue
以PoseGraph2D::AddImuData
为例:1
2
3
4
5
6
7
8
9
10
11
12
13void PoseGraph2D::AddImuData(const int trajectory_id,
const sensor::ImuData& imu_data)
{
AddWorkItem( [=]() LOCKS_EXCLUDED(mutex_)
{
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddImuData(trajectory_id, imu_data);
}
return WorkItem::Result::kDoNotRunOptimization;
}
);
}
后面会看到AddWorkItem
的参数是一个仿函数std::function
。 AddImuData
实际上就一句,也就是调用了AddWorkItem
, 这里面的AddWorkItem
的参数其实是把一个lambda表达式作为仿函数,这个仿函数的返回值是WorkItem::Result
,实际是枚举值kDoNotRunOptimization
或者 kRunOptimization
1 | // 将一个函数地址加入到一个工作队列中 |
work_queue_
的类型是std::unique_ptr<WorkQueue>
, WorkQueue
是WorkItem
的deque。根据上面的分析, 我们要想运行lambda表达式,要的是 work_item(),但是这个函数里没看到,因为这里只有添加,是添加到work_queue_
里了。实际的运行在DrainWorkQueue
1 | void PoseGraph2D::DrainWorkQueue() |
上面说了PoseGraph2D
有16个成员函数调用了AddWorkItem
,但是发现只有3个函数会返回kRunOptimization
:FinishTrajectory
, RunFinalOptimization
和 ComputeConstraintsForNode
。
16个里面最重要的当然是后端入口函数PoseGraph2D::AddNode
,在它的work_item
是ComputeConstraintsForNode
, 在ComputeConstraintsForNode
的最后部分:1
2
3
4
5
6if (options_.optimize_every_n_nodes() > 0 &&
num_nodes_since_last_loop_closure_ > options_.optimize_every_n_nodes() )
{
return WorkItem::Result::kRunOptimization;
}
return WorkItem::Result::kDoNotRunOptimization;
整个SLAM中实时进入任务队列的主要为计算约束,当加入节点数超过参数时,才进行一次优化。
最后的ConstraintBuilder2D::WhenDone
又用到了lambda表达式,又涉及到线程池,实际的后台优化是在HandleWorkQueue
。1
2
3
4
5
6
7
8
9
10
11
12
13
14// Result 就是 vector<Constraint>
void ConstraintBuilder2D::WhenDone(
const std::function<void(const ConstraintBuilder2D::Result&)>& callback)
{
absl::MutexLock locker(&mutex_);
CHECK(when_done_ == nullptr);
// 成员 unique_ptr<std::function<void(const Result&)>> when_done_;
when_done_ = absl::make_unique<std::function<void(const Result&)> > (callback);
CHECK(when_done_task_ != nullptr);
// 约束的计算结果通过回调进行回传
when_done_task_->SetWorkItem([this] { RunWhenDoneCallback(); });
thread_pool_->Schedule(std::move(when_done_task_));
when_done_task_ = absl::make_unique<common::Task>();
}
这里实际就是处理when_done_task_
,它是在finish_node_task
执行完之后才执行的,内容就是HandleWorkQueue
优化执行完毕时的回调函数RunWhenDoneCallback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28void ConstraintBuilder2D::RunWhenDoneCallback()
{
Result result;
std::unique_ptr<std::function<void(const Result&)>> callback;
{
absl::MutexLock locker(&mutex_);
CHECK(when_done_ != nullptr);
// 将约束结果放入 result
// MaybeAddConstraint 里添加 constraints_
for (const std::unique_ptr<Constraint>& constraint : constraints_)
{
if (constraint == nullptr) continue;
result.push_back(*constraint);
}
if (options_.log_matches() )
{
LOG(INFO) << constraints_.size() << " computations resulted in "
<< result.size() << " additional constraints.";
LOG(INFO) << "Score histogram:\n" << score_histogram_.ToString(10);
}
constraints_.clear();
callback = std::move(when_done_);
when_done_.reset();
kQueueLengthMetric->Set(constraints_.size());
}
// 我理解的 HandleWorkQueue 是在这里
(*callback)(result);
}
scan match
的过程如下AddImuData
等函数不是直接放到线程里运行的,进线程池的是DrainWorkQueue
先看DEFINE
宏,有三个宏:flag的名称,flag的默认值,help字符串。如何查看help信息呢,在执行可执行文件时加上–help选项即可,但是如果要显示help信息,代码里需要调用接口gflags::ParseCommandLineFlags(&argc, &argv, true)
gflags支持的变量类型如下:
在程序中使用flag,对于DEFINE过的flag我们可以像使用普通变量那样读写它,只需加一个FLAGS_
前缀即可,如下:1
2
3
4
5
6
7
8if (FLAGS_is_handsome)
FLAGS_is_handsome = false;
std::cout << FLAGS_hobby << std::endl;
DEFINE_string(configuration_directory, "",
"First directory in which configuration files are searched, "
"second is always the Cartographer installation to allow "
"including files from there.");
1 | CHECK_EQ(x,y) == |
Google glog是一个基于程序级记录日志信息的c++库,编程使用方式与c++的stream操作类似。 每个级别的日志除了输出到对应日志文件中,还输出到每个低级别日志文件中。默认ERROR和FATAL消息除了输出到日志文件中之外,还会输出到标准错误中
每个级别都对应有相应的日志文件,文件默认存放在临时文件中,Linux是 /tmp
。运行cartographer之后,可以在/tmp
里看到
glog的库文件都在/usr/lib/x86_64-linux-gnu
glog
的明显缺点就是配置输出格式不方便,还要修改源码的logging.cc
,重新编译。我下载glog 0.5.0
后,修改logging.cc文件再编译安装,结果导致cartographer报错,ScopedRosLogSink::send
没有链接,应当是和现有的glog造成了冲突,然后一直解决不了。
然后尝试在ros_log_sink.cc
中添加函数,就是把::google::LogSink::ToString
重新定义到ros_log_sink.cc
,这样避开glog的链接问题。头文件当然也要添加这个成员函数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20string ScopedRosLogSink::ToString(const ::google::LogSeverity severity, const char* file, int line,
const struct ::tm* tm_time,
const char* message, size_t message_len)
{
ostringstream stream(string(message, message_len));
stream.fill('0');
stream << '['
<< setw(2) << 1+tm_time->tm_mon
<< '_'
<< setw(2) << tm_time->tm_mday
<< " "
<< setw(2) << tm_time->tm_hour << ':'
<< setw(2) << tm_time->tm_min << ':'
<< setw(2) << tm_time->tm_sec << " "
<< file << ": " << line << "] ";
stream << string(message, message_len);
return stream.str();
}tm_time
只精确到秒,这样日志就不能精确到毫秒了,不过目前还不需要。ScopedRosLogSink::send
的开头改为ToString
函数。这样改完后,日志格式可能还不是想要的
后来才发现原来cartographer的日志格式是把glog的格式和ROS的格式融合一起了,所以修改环境变量如下1
export ROSCONSOLE_FORMAT='[${message}'
1 | FLAGS_logtostderr = true; // 设置日志消息是否转到标准输出而不是日志文件 |
cartographer的大部分文件都可以直接使用LOG(INFO)<<"log";
这样的语句,因为都提前配置好了,但是最好重新指定日志路径。
google::SetLogDestination(google::INFO, "log/INFO_");
// 设置特定级别的日志的输出目录和前缀。第一个参数为日志级别,第二个参数表示输出目录及日志文件名前缀1 |
|
每次运行cartographer后发现在自定义的路径里,INFO开头的日志文件每次都会新建,而INFO结尾的日志是最新运行得到的日志