思考的问题
  • 为何子图插入scan结束时CPU升高

  • 为什么要用线程池

  • To be sure there is a recent loop closure constraint, you can query PoseGraphInterface::constraints(), loop over all constraints and check if there is a recent one of type INTER that spans some time.

  • A simpler solution is to subscribe to the global slam callback GlobalSlamOptimizationCallback by setting PoseGraphInterface::SetGlobalSlamOptimizationCallback. This will call you back after Cartographer has searched for loop closures. It does not tell you if loop closures were found, it mostly tells you if the background loop closure search is keeping up or falls behind.


gmapping和AMCL的区别

粒子

AMCL的粒子只包括位姿和权重,需要大量的粒子才能较好描述机器人位姿。gmapping的粒子包括轨迹、地图、权重,粒子过多会占用很大的内存。

AMCL的粒子在初始化时,各粒子的位姿是不同的,是均值参数加高斯噪声; gmapping粒子初始化时,粒子的位姿都是相同的。 所以说AMCL更符合撒粒子的逻辑。

AMCL的粒子数不是固定的,有最小和最大粒子数。在定位过程中,KLD采样会动态减少粒子数;gmapping的粒子数一直是固定的。

AMCL的提议分布是运动模型,分布的特点是又扁又宽; gmapping的提议分布是运动模型又加入了最近的观测,分布特点是小的尖峰。

AMCL的粒子权重是似然域模型改变,gmapping是通过scan match

AMCL输出的位姿是所有粒子加权平均后的结果; gmapping是输出得分最高的粒子


emplace 和 emplace_back

常见用法:

1
2
3
4
5
6
7
8
vector<int> v;
for(int i=0; i<4; i++)
{
v.emplace_back();
auto& num = v.back();
num = i;
qDebug()<< "size: "<<v.size() <<" v.back: "<<v.back();
}

结果

1
2
3
4
size:  1   v.back:  0
size: 2 v.back: 1
size: 3 v.back: 2
size: 4 v.back: 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <map>
#include <string>
#include <vector>
using namespace std;

class A{
public:
A(const string& s) {cout << "A construct " <<endl;}
A(const A& a) {cout <<"A copy"<<endl; }
private:
string s;
};

int main()
{
std::map<int, string> m;
vector<A> v;
A a1("test"); // a1 构造函数
cout << "------------------" << endl;
v.push_back(a1); // a1 拷贝构造函数
v.emplace_back(a1);

//先运行构造函数,后拷贝构造函数
v.push_back(A("test") );
v.emplace_back(A("test") );

// v.push_back("test"); 报错
// 这个才是真正的用途
v.emplace_back("test"); // 隐式转换,只有拷贝构造函数
cout << "******************" << endl;
return 0;
}

对于C++ 11里vector的emplace_back函数比较失望,都说提高了效率,其实它仅对于元素做隐式转换的情况有效,此时没有产生临时对象。对其他情况,和push_back没区别。

这里用到的c++11特性完美转发:将接收下来的参数原样完美地传递给对象的构造函数,这带来另一个方便性就是即使是构造函数声明为 explicit 它还是可以正常工作,因为它不存在临时变量和隐式转换。

map::emplace

map就只有emplace,机制也是一样的。元素是直接构建的,既不复制也不移动,仅当键不存在时才插入。
但是map有个问题:emplace 方法把它接收到的所有的参数都一起转发给 pair 的构造函数。但是对于一个 pair 来说,它既需要构造它的 key 又需要构造它的 value。如果我们按照之前普通的语法使用变参模板的话,则它是无法区分哪些参数用来构造 key, 哪些用来构造 value的。 比如下面的代码

1
2
3
4
5
// 无法区分哪个参数用来构造 key 哪些用来构造 value
// 有可能是 std::string s("hello", 1), std::complex<double> cpx(2)
// 也有可能是 std::string s("hello"), std::complex<double> cpx(1, 2)
std::map<std::string, std::complex<double>> scp;
scp.emplace("hello", 1, 2);

考虑使用可以接受异构、变长参数的 tuple 来对参数进行分组。再使用 C++11 提供的一个特殊类型 piecewise_construct_t 来帮助它们找到各自正确的构造函数了。std::piecewise_construct_t 是一个空类,全局变量 std::piecewise_construct 就是该类型的一个变量。

std::piecewise_construct作为构造 pair 对象的第一个参数传递,以选择构造函数形式,通过将两个元组对象的元素转发给它们各自的构造函数来构造其成员。即第一个 tuple 作为 key,第二个 tuple 作为 value。 使用 forward_as_tuple,该函数会构造一个 tuple 并转发给 pair。

1
2
3
4
5
// 想对于 map 避免临时变量的构造的话,就需要构建两个 tuple
std::map<std::string, std::complex<double>> scp;
scp.emplace(std::piecewise_construct, // 此常量值作为构造 pair 对象的第一个参数传递,以选择构造函数形式,通过将两个元组对象的元素转发给它们各自的构造函数来构造其成员。
std::forward_as_tuple("hello"), // 该函数会构造一个 tuple 并转发给 pair 构造,并存储在 first 字段
std::forward_as_tuple(1, 2)); //该函数会构造一个 tuple 并转发给 pair 构造,存储在 second 字段

cartographer中的代码就是这样

1
2
3
4
5
6
7
8
9
// std::map<int, ::cartographer::mapping::PoseExtrapolator>  extrapolators_;
// PoseExtrapolator的构造函数有2个参数:common::Duration pose_queue_duration,
// double imu_gravity_time_constant
extrapolators_.emplace(
std::piecewise_construct,
std::forward_as_tuple(trajectory_id),
std::forward_as_tuple(
::cartographer::common::FromSeconds(kExtrapolationEstimationTimeSec),
gravity_time_constant) );

无参的emplace_back

还有一种使用无参的vector::emplace_back() 或者 deque::emplace_back(),下面是从cartographer中学来的代码,确实有一定技巧

1
2
3
4
5
6
7
8
9
10
    vector<std::unique_ptr<A> > dd;
dd.emplace_back();
// 注意这里的 size 也是1
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
// cout << dd.at(0)->getValue() <<endl; 这里运行会崩溃

auto* a1 = &dd.back(); // 这个是 unique_ptr
a1->reset(new A(5) ); // 不能再用=,unique_ptr不能复制
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
cout << dd.at(0)->getValue() <<endl;

运行结果:

1
2
3
4
capacity: 1 size: 1
A construct
capacity: 1 size: 1
5

参考:
emplace_back VS push_back


后端的线程池 1 thread_pool 和 task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 省略部分不重要的成员
class Task
{
public:
friend class ThreadPoolInterface;
State GetState() LOCKS_EXCLUDED(mutex_); //返回本Task当前状态
// 设置Task 执行的任务 (函数)
void SetWorkItem(const WorkItem& work_item);
// 给当前任务添加 依赖任务, 如当前任务为b,AddDependency(a)表示任务a依赖b
// 把当前任务b,加入到依赖任务 a 的 dependent_tasks_ 列表
void AddDependency(std::weak_ptr<Task> dependency) LOCKS_EXCLUDED(mutex_);

private:
// 执行当前任务,比如当前任务为a,并更新依赖a的任务dependent_tasks_中所有任务状态
void Execute() LOCKS_EXCLUDED(mutex_);

// 当前任务进入线程待执行队列
void SetThreadPool(ThreadPoolInterface* thread_pool) LOCKS_EXCLUDED(mutex_);

// 当前任务的依赖任务完成时候,当前任务状态随之改变
void OnDependenyCompleted();


using WorkItem = std::function<void()>;
enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED };
// 任务具体执行过程
WorkItem work_item_ ;
// 执行当前任务的线程池
ThreadPoolInterface* thread_pool_to_notify_ = nullptr;
State state_ GUARDED_BY(mutex_) = NEW; // 初始化状态为 NEW
// 依赖当前任务的任务列表
std::set<Task*> dependent_tasks_ GUARDED_BY(mutex_);
};

Task类不是线程,可以理解为有状态和依赖的函数,核心的成员变量work_item_是仿函数,其返回类型void。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 省略部分不重要的成员
// 不明白为什么要先构造个抽象类,只有这一个派生类
class ThreadPool : public ThreadPoolInterface
{
public:
//初始化一个线程数量固定的线程池。
explicit ThreadPool(int num_threads);
// 添加想要ThreadPool执行的task,插入 tasks_not_ready_
// 如果任务满足执行要求,直接插入task_queue_准备执行
std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task);
private:
//每个线程初始化时, 执行DoWork()函数. 与线程绑定
void DoWork();
void NotifyDependenciesCompleted(Task* task);
//running_只是一个监视哨, 只有线程池在running_状态时, 才能往work_queue_加入函数
bool running_ = true;
// pool_就是一系列线程
std::vector<std::thread> pool_ ;
// 十分重要的任务队列
std::deque<std::shared_ptr<Task>> task_queue_ ;
//未准备好的 task,task可能有依赖还未完成
absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_;
};

看构造函数

1
2
3
4
5
6
7
8
9
10
ThreadPool::ThreadPool(int num_threads)
{
absl::MutexLock locker(&mutex_);
// 原来还是用了std::thread,还以为google连线程也是自己实现的
for (int i = 0; i != num_threads; ++i)
{
// std::vector<std::thread> pool_;
pool_.emplace_back([this]() { ThreadPool::DoWork(); });
}
}

大致看ThreadPool::DoWork(),最后执行了ThreadPoolInterface::Execute,其实就是Task::Execute。 每个线程与DoWork()函数绑定,也就是线程在后台不断执行DoWork()函数

新的Task如果有依赖项,通过Thread_pool::Schedule添加到 Thread_pooltasks_not_ready_队列中。队列中的Task状态是变化的,等依赖项执行完成,Task状态变为DEPENDENCIES_COMPLETED,然后再插入task_queue_队列。最终所有Task都会插入task_queue_中,在DoWork中得到执行。

如果该Task没有依赖,直接插入task_queue_,准备执行。

对任一个任务的状态转换顺序为:NEW—->DISPATCHED—->DEPENDENCIES_COMPLETED—->RUNNING—->COMPLETED

Thread_pool通过固定数量的thread与task_queue_(待执行的task队列)执行函数绑定。Thread_pool 按照队列首尾顺序不断执行Task。

示意图


后端的线程池 2 线程池和work_queue

cartographer_ros节点main函数所在的称为main线程,其它的称后台线程。PoseGrapher::AddNode在main线程,但计算两种约束、优化全局位姿都在后台线程。

PoseGraph有一个线程池,配置一定数量的线程来执行优化任务。线程池在MapBuilder构造函数中创建,作为参数给PoseGraph对象,PoseGraph将其作为 constraints::ConstraintBuilder2D的初始化使用。每个线程执行 ThreadPool::DoWork()函数。 DoWork()函数会一直循环遍历一个deque队列中的Task任务,每次执行完一个Task,就会从队列弹出。 即std::deque<std::shared_ptr<Task> > task_queue_

这个task_queue_的添加任务追溯到线程池的Schedule函数,Schedule函数在PoseGraph2D::AddWorkItem和添加约束的函数里

每次收到SLAM需要的信息都会生成一个work,这个work都会保存到PoseGraph的work_queue_队列中,上面说的task通过SetWorkItem进一步执行DrainWorkQueue, 进而从 work_queue_中取出 WorkItem 执行。这些 WorkItem 是由pose_graph添加各种传感器数据和 Node的时候加入到该队列的,比如AddImuData

lambda 和 仿函数

work_queue_队列之前,先要搞清lambda 和 仿函数是怎么回事。

先看一个简单的仿函数

1
2
3
4
5
6
7
int add(float a, float b)
{
return a+b;
}
// int是返回值类型,括号里是参数的类型
std::function<int(float, float)> func = add;
cout<< func(2.4, 6.9) <<endl;

lambda表达式通常仅仅是仿函数的语法糖,可以直接转换为仿函数。[]中的任何内容都会转换为构造函数参数和仿函数对象的成员,而()中的参数将转换为仿函数operator()的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void AddWorkItem(std::function<int()>&  work_item)
{
cout << work_item() <<endl;
}
void AddImuData(int a, int b)
{
// []:默认不捕获任何变量
// [=]:默认以值捕获所有变量
// 其实可以用 auto
std::function<int()> func = [=](){
cout <<"input: "<< a << " "<< b<< endl;
return a+b;
};
// std::function<int(float,float)> func2 = add;
AddWorkItem( func );
}

// main 函数中调用
AddImuData(29, 7);

把cartographer中的函数做了修改,这样容易理解。 main 函数中调用AddImuData,我们要的是运行lambda表达式,假如AddWorkItem是空函数,那么不会有运行结果。work_item()才是实际的运行lambda表达式。

workqueue 机制

PoseGraph2D有16个成员函数调用了AddWorkItem,也就是加入了work_queue_队列。操作work_queue_的函数主要是AddWorkItemDrainWorkQueue

PoseGraph2D::AddImuData为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
void PoseGraph2D::AddImuData(const int trajectory_id,
const sensor::ImuData& imu_data)
{
AddWorkItem( [=]() LOCKS_EXCLUDED(mutex_)
{
absl::MutexLock locker(&mutex_);
if (CanAddWorkItemModifying(trajectory_id)) {
optimization_problem_->AddImuData(trajectory_id, imu_data);
}
return WorkItem::Result::kDoNotRunOptimization;
}
);
}

后面会看到AddWorkItem的参数是一个仿函数std::functionAddImuData实际上就一句,也就是调用了AddWorkItem, 这里面的AddWorkItem的参数其实是把一个lambda表达式作为仿函数,这个仿函数的返回值是WorkItem::Result,实际是枚举值kDoNotRunOptimization 或者 kRunOptimization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 将一个函数地址加入到一个工作队列中
void PoseGraph2D::AddWorkItem(
const std::function<WorkItem::Result()>& work_item)
{
absl::MutexLock locker(&work_queue_mutex_);
// 如果工作队列未被初始化,则先初始化,再打开启线程池,后台打开任务队列执行循环
// 这也是唯一初始化 work_queue_ 的地方
// std::unique_ptr<WorkQueue> work_queue_
if (work_queue_ == nullptr)
{
work_queue_ = absl::make_unique<WorkQueue>();
// DrainWorkQueue函数加入到线程池
auto task = absl::make_unique<common::Task>();
task->SetWorkItem([this]() { DrainWorkQueue(); });
thread_pool_->Schedule(std::move(task) );
}
const auto now = std::chrono::steady_clock::now();
work_queue_->push_back({now, work_item});
}

work_queue_的类型是std::unique_ptr<WorkQueue>WorkQueueWorkItem的deque。根据上面的分析, 我们要想运行lambda表达式,要的是 work_item(),但是这个函数里没看到,因为这里只有添加,是添加到work_queue_里了。实际的运行在DrainWorkQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void PoseGraph2D::DrainWorkQueue()
{
bool process_work_queue = true;
size_t work_queue_size;
while (process_work_queue)
{
std::function<WorkItem::Result()> work_item;
{
absl::MutexLock locker(&work_queue_mutex_);
// work_queue_ 队列中取出压栈的任务一个一个执行,直到为空
// work_queue_ 为空, 说明thread_pool中DrainWorkQueue执行完,
// 等待下一次addNode时候,再次执行thread_pool_->Schedule(task),再次执行.
if (work_queue_->empty())
{
work_queue_.reset(); // 智能指针 reset
return;
}
// 获取队列中最前的任务,这就是上面说的 work_item
work_item = work_queue_->front().task;
work_queue_->pop_front();
work_queue_size = work_queue_->size();
kWorkQueueSizeMetric->Set(work_queue_size);
}
//这里的 work_item()就是我们需要的 AddImuData里的lambda表达式
//回头看lambda,可以发现返回值是 kDoNotRunOptimization
// 执行工作任务后,返回的状态,赋值false或true给 process_work_queue
process_work_queue = work_item() == WorkItem::Result::kDoNotRunOptimization;
}
// 运行到这里,说明队列里运行的lambda表达式的返回值是 kRunOptimization
// 实际上pop了好几个任务
// 如果work_item()都不需要全局优化, 则直到work_queue_为空, 都不会执行优化
LOG(INFO) << "Remaining work items in queue: " << work_queue_size;
// We have to optimize again
constraint_builder_.WhenDone(
[this](const constraints::ConstraintBuilder2D::Result& result)
{
HandleWorkQueue(result);
} );
}

上面说了PoseGraph2D有16个成员函数调用了AddWorkItem,但是发现只有3个函数会返回kRunOptimization
FinishTrajectory, RunFinalOptimizationComputeConstraintsForNode

16个里面最重要的当然是后端入口函数PoseGraph2D::AddNode,在它的work_itemComputeConstraintsForNode, 在ComputeConstraintsForNode的最后部分:

1
2
3
4
5
6
if (options_.optimize_every_n_nodes() > 0 &&
num_nodes_since_last_loop_closure_ > options_.optimize_every_n_nodes() )
{
return WorkItem::Result::kRunOptimization;
}
return WorkItem::Result::kDoNotRunOptimization;

整个SLAM中实时进入任务队列的主要为计算约束,当加入节点数超过参数时,才进行一次优化。

ConstraintBuilder2D::WhenDone

最后的ConstraintBuilder2D::WhenDone又用到了lambda表达式,又涉及到线程池,实际的后台优化是在HandleWorkQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Result 就是 vector<Constraint>
void ConstraintBuilder2D::WhenDone(
const std::function<void(const ConstraintBuilder2D::Result&)>& callback)
{
absl::MutexLock locker(&mutex_);
CHECK(when_done_ == nullptr);
// 成员 unique_ptr<std::function<void(const Result&)>> when_done_;
when_done_ = absl::make_unique<std::function<void(const Result&)> > (callback);
CHECK(when_done_task_ != nullptr);
// 约束的计算结果通过回调进行回传
when_done_task_->SetWorkItem([this] { RunWhenDoneCallback(); });
thread_pool_->Schedule(std::move(when_done_task_));
when_done_task_ = absl::make_unique<common::Task>();
}

这里实际就是处理when_done_task_,它是在finish_node_task执行完之后才执行的,内容就是HandleWorkQueue

优化执行完毕时的回调函数RunWhenDoneCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void ConstraintBuilder2D::RunWhenDoneCallback()
{
Result result;
std::unique_ptr<std::function<void(const Result&)>> callback;
{
absl::MutexLock locker(&mutex_);
CHECK(when_done_ != nullptr);
// 将约束结果放入 result
// MaybeAddConstraint 里添加 constraints_
for (const std::unique_ptr<Constraint>& constraint : constraints_)
{
if (constraint == nullptr) continue;
result.push_back(*constraint);
}
if (options_.log_matches() )
{
LOG(INFO) << constraints_.size() << " computations resulted in "
<< result.size() << " additional constraints.";
LOG(INFO) << "Score histogram:\n" << score_histogram_.ToString(10);
}
constraints_.clear();
callback = std::move(when_done_);
when_done_.reset();
kQueueLengthMetric->Set(constraints_.size());
}
// 我理解的 HandleWorkQueue 是在这里
(*callback)(result);
}

scan match的过程如下

AddImuData等函数不是直接放到线程里运行的,进线程池的是DrainWorkQueue


cartographer的gflag和glog日志

gflag

CHECK宏.png
先看DEFINE宏,有三个宏:flag的名称,flag的默认值,help字符串。如何查看help信息呢,在执行可执行文件时加上–help选项即可,但是如果要显示help信息,代码里需要调用接口gflags::ParseCommandLineFlags(&argc, &argv, true)

gflags支持的变量类型如下:

  • DEFINE_bool: 布尔类型
  • DEFINE_int32: 32-bit 整型
  • DEFINE_int64: 64-bit 整型
  • DEFINE_uint64: 无符号 64-bit 整型
  • DEFINE_double: double
  • DEFINE_string: C++ string

在程序中使用flag,对于DEFINE过的flag我们可以像使用普通变量那样读写它,只需加一个FLAGS_前缀即可,如下:

1
2
3
4
5
6
7
8
if (FLAGS_is_handsome)
FLAGS_is_handsome = false;
std::cout << FLAGS_hobby << std::endl;

DEFINE_string(configuration_directory, "",
"First directory in which configuration files are searched, "
"second is always the Cartographer installation to allow "
"including files from there.");


1
2
3
4
5
6
CHECK_EQ(x,y)    ==
CHECK_NE(x,y) =
CHECK_LE(x,y) <=
CHECK_LT(x,y) <
CHECK_GE(x,y) >=
CHECK_GT(x,y) >

glog

Google glog是一个基于程序级记录日志信息的c++库,编程使用方式与c++的stream操作类似。 每个级别的日志除了输出到对应日志文件中,还输出到每个低级别日志文件中。默认ERROR和FATAL消息除了输出到日志文件中之外,还会输出到标准错误中
在CMakeLists中的配置.png
每个级别都对应有相应的日志文件,文件默认存放在临时文件中,Linux是 /tmp。运行cartographer之后,可以在/tmp里看到

glog的库文件都在/usr/lib/x86_64-linux-gnu

自定义cartographer的日志格式

glog的明显缺点就是配置输出格式不方便,还要修改源码的logging.cc,重新编译。我下载glog 0.5.0后,修改logging.cc文件再编译安装,结果导致cartographer报错,ScopedRosLogSink::send没有链接,应当是和现有的glog造成了冲突,然后一直解决不了。

然后尝试在ros_log_sink.cc中添加函数,就是把::google::LogSink::ToString重新定义到ros_log_sink.cc,这样避开glog的链接问题。头文件当然也要添加这个成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
string ScopedRosLogSink::ToString(const ::google::LogSeverity severity, const char* file, int line,
const struct ::tm* tm_time,
const char* message, size_t message_len)
{
ostringstream stream(string(message, message_len));
stream.fill('0');

stream << '['
<< setw(2) << 1+tm_time->tm_mon
<< '_'
<< setw(2) << tm_time->tm_mday
<< " "
<< setw(2) << tm_time->tm_hour << ':'
<< setw(2) << tm_time->tm_min << ':'
<< setw(2) << tm_time->tm_sec << " "
<< file << ": " << line << "] ";

stream << string(message, message_len);
return stream.str();
}

tm_time只精确到秒,这样日志就不能精确到毫秒了,不过目前还不需要。ScopedRosLogSink::send的开头改为ToString函数。这样改完后,日志格式可能还不是想要的
默认日志格式.png
后来才发现原来cartographer的日志格式是把glog的格式和ROS的格式融合一起了,所以修改环境变量如下
1
export ROSCONSOLE_FORMAT='[${message}'

修改后的日志.png

几个常用参数

1
2
3
4
5
6
7
8
9
10
11
FLAGS_logtostderr = true; // 设置日志消息是否转到标准输出而不是日志文件

FLAGS_alsologtostderr = true; // 设置日志消息除了日志文件之外是否去标准输出

FLAGS_colorlogtostderr = true; // 设置记录到标准输出的颜色消息(如果终端支持)

FLAGS_logbufsecs = ; // 设置可以缓冲日志的最大秒数,0指实时输出

FLAGS_max_log_size = ; // 设置最大日志文件大小(以MB为单位)

FLAGS_stop_logging_if_full_disk = true; // 设置是否在磁盘已满时避免日志记录到磁盘

cartographer的大部分文件都可以直接使用LOG(INFO)<<"log";这样的语句,因为都提前配置好了,但是最好重新指定日志路径。

  • google::SetLogDestination(google::INFO, "log/INFO_"); // 设置特定级别的日志的输出目录和前缀。第一个参数为日志级别,第二个参数表示输出目录及日志文件名前缀
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <glog/logging.h>

int main(int argc, char** argv)
{
google::InitGoogleLogging(argv[0]);
// INFO
std::string str_info;
// 文件名最后只能是 等级+下划线
str_info.append("/home/user/log/INFO_");
google::SetLogDestination(google::INFO, str_info.c_str());
LOG(INFO)<< "The is a info!";
// WARNING
std::string str_warn;
str_warn.append("/home/user/log/WARNING_");
google::SetLogDestination(google::WARNING, str_warn.c_str());
LOG(WARNING) << "The is a warning!";

// stop glog 可以没有
google::ShutdownGoogleLogging();
}

每次运行cartographer后发现在自定义的路径里,INFO开头的日志文件每次都会新建,而INFO结尾的日志是最新运行得到的日志


后端 4 计算约束的准备工作

接上一篇,看两次计算约束的函数 ComputeConstraint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 根据指定的 nodeid 和 submapid 计算其约束
void PoseGraph2D::ComputeConstraint(const NodeId& node_id,
const SubmapId& submap_id)
{
bool maybe_add_local_constraint = false;
bool maybe_add_global_constraint = false;
const TrajectoryNode::Data* constant_data;
const Submap2D* submap;
{
absl::MutexLock locker(&mutex_);
CHECK(data_.submap_data.at(submap_id).state == SubmapState::kFinished);
if (!data_.submap_data.at(submap_id).submap->insertion_finished() )
{
// Uplink server only receives grids when they are finished,
// so skip constraint search before that.
return;
}
// 获取两个id最新的那个时刻
const common::Time node_time = GetLatestNodeTime(node_id, submap_id);
const common::Time last_connection_time =
data_.trajectory_connectivity_state.LastConnectionTime(
node_id.trajectory_id, submap_id.trajectory_id );
// 如果节点与submap在同一轨迹内或者距离上次全局约束时间较短,则计算局部约束
if (node_id.trajectory_id == submap_id.trajectory_id ||
node_time <
last_connection_time +
common::FromSeconds(
options_.global_constraint_search_after_n_seconds()) )
{
maybe_add_local_constraint = true;
}
// 如果不在同一轨迹内,一定间隔计算全局约束
else if (global_localization_samplers_[node_id.trajectory_id]->Pulse() )
{
maybe_add_global_constraint = true;
}
constant_data = data_.trajectory_nodes.at(node_id).constant_data.get();
submap = static_cast<const Submap2D*>(
data_.submap_data.at(submap_id).submap.get() );
}

  • 建图模式,节点与submap在同一轨迹内 或者 存在一个最近的全局约束把节点的轨迹和子图的轨迹连接起来时,使用local search window计算局部约束

  • 纯定位模式,节点与submap不在同一轨迹,使用全局搜索窗口计算约束(对整体子图进行回环检测) 。纯定位进行慢,主要就是 global_constraint_search_after_n_seconds 较大导致,迟迟不能确认maybe_add_global_constraint为true

local约束在求解时,搜索窗口小,有初值;    global约束在求解时,搜索窗口大,没有初值。 记住二者都是计算 Constraint::INTER_SUBMAP

准备计算局部和全局约束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (maybe_add_local_constraint)
{
const transform::Rigid2d initial_relative_pose =
optimization_problem_->submap_data()
.at(submap_id)
.global_pose.inverse() *
optimization_problem_->node_data().at(node_id).global_pose_2d;
// 添加局部约束,进行闭环匹配,更新相对位置
constraint_builder_.MaybeAddConstraint(
submap_id, submap, node_id, constant_data, initial_relative_pose);
}
// 添加全局约束
else if (maybe_add_global_constraint)
constraint_builder_.MaybeAddGlobalConstraint(
submap_id, submap, node_id, constant_data);

前端得到节点相对于世界的位姿,也可以得到某个子图的世界位姿,因此得到这个节点相对于这个子图的相对位姿,把这个位姿称为 初始位姿 1。 之所以要用世界坐标系作为桥梁,是因为子图和这个节点并不一定在在同一条轨迹坐标系中(local map坐标系)

这里主要是ConstraintBuilder2D类的两个函数: MaybeAddConstraintMaybeAddGlobalConstraint,它们只有细微不同,前者的开头有这样两句:

1
2
3
if (initial_relative_pose.translation().norm() >
options_.max_constraint_distance() )
return;

然后就是其中调用的ConstraintBuilder2D::ComputeConstraint不同,局部约束的是 ComputeConstraint(submap_id, submap, node_id, false, constant_data, initial_relative_pose, *scan_matcher, constraint);

全局约束的是 ComputeConstraint( submap_id, submap, node_id, true, constant_data, transform::Rigid2d::Identity(), *scan_matcher, constraint);,也就是 match full submap

局部约束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void ConstraintBuilder2D::MaybeAddConstraint(
const SubmapId& submap_id, const Submap2D* const submap,
const NodeId& node_id,
const TrajectoryNode::Data* const constant_data,
const transform::Rigid2d& initial_relative_pose)
{
// 参数 max_constraint_distance
// 初始位姿 1 的模不能太大
// 约束并非无限制距离,若子图和节点的距离太远,则无需考虑约束
if (initial_relative_pose.translation().norm() >
options_.max_constraint_distance() )
return;
/* 这是 ConstraintBuilder2D类 唯一使用采样器的地方
ratio 默认 0.3, 参数 sampling_ratio
sampling_ratio越小, Pulse()返回的false越多,更容易return*/
if (!sampler_.Pulse()) return;

absl::MutexLock locker(&mutex_);
// std::unique_ptr<std::function<void(const Result&)>> when_done_;
if (when_done_)
{
LOG(WARNING)
<< "MaybeAddConstraint was called while WhenDone was scheduled.";
}
constraints_.emplace_back(); // 添加空元素
kQueueLengthMetric->Set(constraints_.size() );
// 指针指向,对最后一个元素赋值
auto* const constraint = &constraints_.back();
// fast time scan matcher 在这里
const auto* scan_matcher =
DispatchScanMatcherConstruction(submap_id, submap->grid() );
}

参数max_constraint_distance很重要,如果建图回到同一位置,但没有出现回环,可能是因为过程中的累计误差过大了,大于这个参数,导致没有求 inter 约束。

DispatchScanMatcherConstruction

针对某一个submap_id的submap构建一个扫描匹配器,先看返回类型

1
2
3
4
5
6
7
8
struct SubmapScanMatcher
{
const Grid2D* grid = nullptr;
std::unique_ptr<scan_matching::FastCorrelativeScanMatcher2D>
fast_correlative_scan_matcher;
// 线程池用的 Task
std::weak_ptr<common::Task> creation_task_handle;
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const ConstraintBuilder2D::SubmapScanMatcher*
ConstraintBuilder2D::DispatchScanMatcherConstruction(
const SubmapId& submap_id, const Grid2D* const grid)
{
CHECK(grid);
if (submap_scan_matchers_.count(submap_id) != 0)
return &submap_scan_matchers_.at(submap_id);

// Map of dispatched or constructed scan matchers by 'submap_id'
// std::map<SubmapId, SubmapScanMatcher> submap_scan_matchers_
auto& submap_scan_matcher = submap_scan_matchers_[submap_id];
// 下面都是成员赋值
submap_scan_matcher.grid = grid;
auto& scan_matcher_options = options_.fast_correlative_scan_matcher_options();

auto scan_matcher_task = absl::make_unique<common::Task>();
// 这里也是 work_item 机制
scan_matcher_task->SetWorkItem(
[&submap_scan_matcher, &scan_matcher_options]() {
submap_scan_matcher.fast_correlative_scan_matcher =
absl::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(
*submap_scan_matcher.grid, scan_matcher_options);
} );

submap_scan_matcher.creation_task_handle =
thread_pool_->Schedule(std::move(scan_matcher_task));
return &submap_scan_matchers_.at(submap_id);
}

这里就是构造了scan_matcher_taskwork_item,返回 scan_matcher

继续看MaybeAddConstraint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 // 线程池
auto constraint_task = absl::make_unique<common::Task>();
constraint_task->SetWorkItem([=]() LOCKS_EXCLUDED(mutex_)
{
ComputeConstraint(submap_id, submap, node_id, false, /* match */
constant_data, initial_relative_pose, *scan_matcher,
constraint);
});
// constraint_task 依赖 scan_matcher_task
constraint_task->AddDependency(scan_matcher->creation_task_handle);
auto constraint_task_handle =
thread_pool_->Schedule(std::move(constraint_task));
// finish_node_task_ 依赖 constraint_task
finish_node_task_->AddDependency(constraint_task_handle);

其实MaybeAddConstraint做的就是下面的工作,接下来的重点就是 ConstraintBuilder2D::ComputeConstraint


二叉树

四叉树和八叉树就是2D和3D的“二分法”,搜索过程与二叉树搜索也类似,二叉树中是将数组sort后存入二叉树中,从而在查找中实现时间复杂度为log2N;四叉树/八叉树是按平面/空间范围划分有序node,将所有points(坐标已知,但是每个点的point在vector中的index可以认为是随机的,没有规律的,所以不能直接根据index取出point(x,y))放入所属node中,实现所有points的sorted,进而在搜索时,实现时间复杂度为logN


后端 3 ComputeConstraintsForNode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct Constraint
{
struct Pose
{
transform::Rigid3d zbar_ij;
double translation_weight;
double rotation_weight;
};
SubmapId submap_id; // 'i' in the paper.
NodeId node_id; // 'j' in the paper.
// Pose of the node 'j' relative to submap 'i'.
Pose pose;
// Differentiates between intra-submap (where node 'j' was inserted into
// submap 'i') and inter-submap constraints (where node 'j' was not
// inserted into submap 'i').
enum Tag { INTRA_SUBMAP, INTER_SUBMAP } tag;
};

现在看PoseGraph2D::AddNode最后的计算约束

1
2
3
4
5
6
7
8
WorkItem::Result PoseGraph2D::ComputeConstraintsForNode(
const NodeId& node_id,
std::vector<std::shared_ptr<const Submap2D>> insertion_submaps,
const bool newly_finished_submap)
{
std::vector<SubmapId> submap_ids;
std::vector<SubmapId> finished_submap_ids;
std::set<NodeId> newly_finished_submap_node_ids;

下面的内容都放到一个大括号内了

1
2
3
4
5
6
7
{
absl::MutexLock locker(&mutex_);
const auto& constant_data =
data_.trajectory_nodes.at(node_id).constant_data;
//如果只有1个子图,则返回index为0,否则返回最新的两个连续的submap id
submap_ids = InitializeGlobalSubmapPoses(
node_id.trajectory_id, constant_data->time, insertion_submaps);

这里有重要函数InitializeGlobalSubmapPoses

InitializeGlobalSubmapPoses

这个主要是为了计算论文里的 ,最终返回的是子图Id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
std::vector<SubmapId> PoseGraph2D::InitializeGlobalSubmapPoses(
const int trajectory_id, const common::Time time,
const std::vector<std::shared_ptr<const Submap2D>>& insertion_submaps)
{
CHECK(!insertion_submaps.empty());
// std::unique_ptr<optimization::OptimizationProblem2D> optimization_problem_;
// MapById<SubmapId, SubmapSpec2D> submap_data
const auto& submap_data = optimization_problem_->submap_data();
// insertion_submaps 只有一个子图的情况
// 说明这是该trajectory的第一张 submap
if (insertion_submaps.size() == 1)
{
// If we don't already have an entry for the first submap, add one.
if (submap_data.SizeOfTrajectoryOrZero(trajectory_id) == 0)
{
if (data_.initial_trajectory_poses.count(trajectory_id) > 0)
{
data_.trajectory_connectivity_state.Connect(
trajectory_id,
data_.initial_trajectory_poses.at(trajectory_id).to_trajectory_id,
time);
}
// 计算该submap的global_pose(查找轨迹的初始位姿来计算), 后加入优化问题
// ComputeLocalToGlobalTransform 是 local to global map frame transform
optimization_problem_->AddSubmap(
trajectory_id, transform::Project2D(
ComputeLocalToGlobalTransform(
data_.global_submap_poses_2d, trajectory_id) *
insertion_submaps[0]->local_pose()
)
);
}
CHECK_EQ(1, submap_data.SizeOfTrajectoryOrZero(trajectory_id));
// 该trajectory的第一张 submap, 索引 0
const SubmapId submap_id{trajectory_id, 0};
CHECK(data_.submap_data.at(submap_id).submap == insertion_submaps.front());
return {submap_id};
}
// 前端维护两个子图了
CHECK_EQ(2, insertion_submaps.size());
const auto end_it = submap_data.EndOfTrajectory(trajectory_id);
CHECK(submap_data.BeginOfTrajectory(trajectory_id) != end_it);
const SubmapId last_submap_id = std::prev(end_it)->id;
// submap 是 InternalSubmapData类型的成员
if (data_.submap_data.at(last_submap_id).submap ==
insertion_submaps.front() )
{
// 此时,'last_submap_id' is the ID of 'insertions_submaps.front()'
// 'insertions_submaps.back()' is new,也就是没插入

// 如果有2个子图,把第1个子图的 global_pose 加入优化问题
// 计算第一张子图的 global_pose,last_submap_id已经在后端里
const auto& first_submap_pose = submap_data.at(last_submap_id).global_pose;
// ComputeSubmapPose函数 其实就是 submap.local_pose()
optimization_problem_->AddSubmap(
trajectory_id,
first_submap_pose *
constraints::ComputeSubmapPose(*insertion_submaps[0]).inverse() *
constraints::ComputeSubmapPose(*insertion_submaps[1]) );
return { last_submap_id,
SubmapId{trajectory_id, last_submap_id.submap_index + 1} };
}
// 不是上面两种情况,也就是已经加入了 back()
CHECK(data_.submap_data.at(last_submap_id).submap ==
insertion_submaps.back() );
const SubmapId front_submap_id{trajectory_id,
last_submap_id.submap_index - 1};
CHECK(data_.submap_data.at(front_submap_id).submap ==
insertion_submaps.front());
// 如果不是上面两种情况,那就返回目前最后的两个子图,front_submap_id是更旧的子图
return {front_submap_id, last_submap_id};
}

这里之所以出现三种情况,是之前的AppendNode中的data_.submap_data.Append插入子图。

第1种情况用到的ComputeLocalToGlobalTransform在前面讲过。

第2种情况的公式实际是 , 也就是 ,把现有的local to global map的变换用作新子图的位姿计算。

第3种很简单,相当于没处理全局位姿。

计算 INTRA_SUBMAP

返回继续看 ComputeConstraintsForNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
CHECK_EQ(submap_ids.size(), insertion_submaps.size());
const SubmapId matching_id = submap_ids.front();
// node(scan) 在 local map坐标系的位姿
const transform::Rigid2d local_pose_2d =
transform::Project2D(constant_data->local_pose *
transform::Rigid3d::Rotation(
constant_data->gravity_alignment.inverse() ) );
// 全局位姿,通过优化后的 submap全局位姿和 local位姿求出矩阵
// ComputeSubmapPose返回 参数子图的 local_pose
const transform::Rigid2d global_pose_2d =
optimization_problem_->submap_data().at(matching_id).global_pose *
constraints::ComputeSubmapPose(*insertion_submaps.front()).inverse() *
local_pose_2d;
// 优化器中增加轨迹节点信息
optimization_problem_->AddTrajectoryNode(
matching_id.trajectory_id,
optimization::NodeSpec2D{constant_data->time, local_pose_2d,
global_pose_2d,
constant_data->gravity_alignment} );

for (size_t i = 0; i < insertion_submaps.size(); ++i)
{
const SubmapId submap_id = submap_ids[i];
// Even if this was the last node added to 'submap_id', the submap will
// only be marked as finished in 'data_.submap_data' further below.
CHECK( data_.submap_data.at(submap_id).state ==
SubmapState::kNoConstraintSearch );
data_.submap_data.at(submap_id).node_ids.emplace(node_id);
// node_id的 scan pose 在子图中的 相对位置,就是后面的 intra constrait
const transform::Rigid2d constraint_transform =
constraints::ComputeSubmapPose(*insertion_submaps[i]).inverse() *
local_pose_2d;
/* 遍历matching结果的所有submap,其新增加的轨迹节点必定组成了其submap,
故其约束为 INTRA_SUBMAP 类型约束,可直接将相对位置加入约束队列中 */
data_.constraints.push_back(
Constraint{ submap_id, node_id,
{ transform::Embed3D(constraint_transform),
options_.matcher_translation_weight(),
options_.matcher_rotation_weight()},
Constraint::INTRA_SUBMAP} );
}

这里是计算 intra 约束的地方。

匹配的过程就是建立约束的过程,也就是回环检测的过程,因为Cartographer 采用的是图优化,有节点和约束的概念,,节点可以理解为激光点的位姿,约束就是激光数据与子图之间相对位姿。

一共建立3种约束:

  • 匹配结果的所有子图,必定包含了新增加的轨迹节点,它们的约束是 INTRA_SUBMAP
  • 新增的节点和已经完成约束计算的子图都要计算约束
  • 对于新的子图不再添加新的轨迹节点时,建立其他节点和该子图之间的约束

后两个就是下面的两次ComputeConstraint函数,是INTER约束

子图在后台线程有两种状态 kNoConstraintSearchkFinished。对于前者的子图,只有内部node去匹配;对后者的子图,所有node(包括新node)都匹配这个子图

两次 ComputeConstraint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
for (const auto& submap_id_data : data_.submap_data)
{
// 完成过约束计算的子图
if (submap_id_data.data.state == SubmapState::kFinished)
{
// 不包含当前的节点 node_id
CHECK_EQ(submap_id_data.data.node_ids.count(node_id), 0);
finished_submap_ids.emplace_back(submap_id_data.id);
}
}
// 前端匹配输出的submap是刚完成的一个,即维护的两个submap中front不再更新
if (newly_finished_submap)
{
const SubmapId newly_finished_submap_id = submap_ids.front();
InternalSubmapData& finished_submap_data =
data_.submap_data.at(newly_finished_submap_id);
CHECK(finished_submap_data.state == SubmapState::kNoConstraintSearch);
// 将新的submap设置为finished,表明已经增加约束条件了
finished_submap_data.state = SubmapState::kFinished;
newly_finished_submap_node_ids = finished_submap_data.node_ids;
}
}

要遍历的submap_dataMapById<SubmapId, InternalSubmapData>,该容器记录了所有的子图数据及其内部节点,InternalSubmapData除了描述了子图的数据之外还记录了所有内部的节点,成员是

1
2
3
4
5
6
7
8
// enum class SubmapState { kActive, kFinished };    
struct InternalSubmapData
{
std::shared_ptr<const Submap2D> submap;
// 所有直接插入submap的节点
std::set<NodeId> node_ids;
SubmapState state = SubmapState::kActive;
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  // 新加入的节点与之前所有完成约束计算的子图,均计算一次约束
for (const auto& submap_id : finished_submap_ids)
ComputeConstraint(node_id, submap_id);
/* 每次新的子图完成,不再更新即不再添加新的轨迹节点时,
则需要计算此子图与所有优化位姿图node之间的约束 */
if (newly_finished_submap)
{
const SubmapId newly_finished_submap_id = submap_ids.front();
// We have a new completed submap, so we look into
// adding constraints for old nodes
for (const auto& node_id_data : optimization_problem_->node_data() )
{
const NodeId& node_id = node_id_data.id;
if (newly_finished_submap_node_ids.count(node_id) == 0)
{
ComputeConstraint(node_id, newly_finished_submap_id);
}
}
}

子图的状态主要是给后台的线程提供的。一开始子图的状态都是kActive的,当它完成之后(kFinished的状态)就会与所有旧的节点 计算约束

此外新增的节点也会与所有完成的子图 计算约束。这一次可以理解为是进行闭环检测,根据节点和循环中的各个子图之间的情况不同,可能构建的是全局约束,也可能是局部约束。 回环检测是一种全局约束。

PoseGraph2D::ComputeConstraint只有这里的两次调用。

最后部分

1
2
3
4
5
6
7
8
9
10
11
12
13
  // 每次新添加一个节点时,均需执行,通知约束构建器。
constraint_builder_.NotifyEndOfNode();
absl::MutexLock locker(&mutex_);
// 记录距离上次闭环处理增加的节点个数
++num_nodes_since_last_loop_closure_;
// 当新增加节点个数大于阈值时,则调用优化处理结果
if (options_.optimize_every_n_nodes() > 0 &&
num_nodes_since_last_loop_closure_ > options_.optimize_every_n_nodes())
{
return WorkItem::Result::kRunOptimization;
}
return WorkItem::Result::kDoNotRunOptimization;
}

添加节点和约束是实时的,但是优化不是,更不需要每个节点都进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void ConstraintBuilder2D::NotifyEndOfNode()
{
absl::MutexLock locker(&mutex_);
CHECK(finish_node_task_ != nullptr);
// 完成一个后端节点
finish_node_task_->SetWorkItem([this] {
absl::MutexLock locker(&mutex_);
++num_finished_nodes_;
});
auto finish_node_task_handle =
thread_pool_->Schedule(std::move(finish_node_task_));
// 重新开始添加节点任务
finish_node_task_ = absl::make_unique<common::Task>();
when_done_task_->AddDependency(finish_node_task_handle);
++num_started_nodes_;
}

这里就是处理finish_node_task_,每次完成一个node约束计算时均会进行调用。目的是完成finish_node_task_,同时开启一个新的task用于下一次约束任务。


后端 7 分支定界

算法思想

在Local SLAM中,通过在子图中的Scan-to-map匹配过程可以得到了一个比较理想的机器人位姿估计。 但由于Local SLAM只使用了一段时间内的局部信息,所以定位误差还会随着时间累积。为了能够进一步的降低局部误差累积的影响,Cartographer还需要通过Pixel-accurate扫描匹配来进行闭环检测, 进一步的优化机器人的位姿估计。 分支定界时,地图一定要完整

为雷达扫描到的 个hit点集合, 是第k个hit点在scan frame下的坐标,在地图坐标系下的坐标表示为

其中 ,用表示位姿估计描述的坐标变换。

Pixel-accurate扫描匹配问题可以用下式来描述:

其中 是一个搜索窗口, 是离最近的栅格单元的占用概率。也就是在搜索窗口中找到一个最优的位姿,使得hit点集合出现的概率最大化。


这里和前端的实时相关性匹配有部分相似,但是多分辨率地图和分支定界是新加入的。机器人的位姿是用表示,对于暴力匹配法来说,该方法的搜索步长为1,搜索窗口的栅格索引集合 可以通过笛卡尔积枚举:

其中是x 和 y方向上的最大搜索索引,是搜索角度的最大索引。搜索窗口就可以用集合 表示。 是搜索中心,也是机器人位姿的初始估计, 分别是位移和角度的搜索步长。

如果直接用暴力搜索就太复杂了,所以使用深度优先的分支定界法。使用一个满四叉树的概念,这跟二叉树是类似的。

其根节点代表整个搜索窗口 。树中的每一个节点的孩子都是对该节点所代表的搜索空间的一个划分, 每个叶子节点都对应着一个解。

树共有8层,分别表示从顶层()到底层/叶子(),树的深度depth每增加1,搜索步长step就减半,表示查找的分辨率越高。每次对x和y都减半操作,像分田一样。如果栅格地图分辨率是0.05m(一个栅格的物理长度),step与depth的关系如下:


比如在树的最底层,, 此时 ,显然只有1个解。

对于一个candidate,可构建四个子Candidate。C层搜索空间:

搜索树上的每个节点的上界可以通过下式计算得到:

Cartographer仍然采取了一种空间换时间的方法来进一步的提高搜索效率。预算图(precomputed grids)又叫膨胀图,从上面我们知道了树的每一层都对应一个步长,根据这个步长,生成一个对应的膨胀图。步长越大,膨胀级别越高。 根节点的分辨率是最粗的。Cartographer还为每个子图预先计算了不同尺度下的占用概率,以后的搜索过程只需要简单的查表就可以完成了。

需要注意的是膨胀不会改变栅格图的分辨率,改变的仅仅是栅格图中像素值而已。这个膨胀的过程比较复杂,源码里是用一种滑动窗口的机制实现的。

整个搜索过程借助了一个栈{C}来完成,首先将整个搜索空间分割,得到初始的子空间节点集合 。 然后对集合 中的节点根据它们的上界进行排序,并依次入栈{C}保证得分最高的节点在栈顶的位置。这样可以优先搜索最有可能的解空间。



最底层的节点是所有候选的评分, 这是以两个叶子节点做示范。 父节点的贪心得分永远大于子节点的贪心得分。从而保证,一旦父节点的贪心得分小于best_score,那么这个父节点的子树全部被剪枝,因为其子树的叶子节点的得分肯定也小于best_score。

源码

接上一篇的分支定界部分

1
2
3
4
5
const Candidate2D best_candidate = BranchAndBound(
discrete_scans, search_parameters,
lowest_resolution_candidates,
precomputation_grid_stack_->max_depth(),
min_score);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// 初始传入的candidates为最上层的候选解
// 初始 candidate_depth = 7
// min_score 在lua里设置
Candidate2D FastCorrelativeScanMatcher2D::BranchAndBound(
const std::vector<DiscreteScan2D>& discrete_scans,
const SearchParameters& search_parameters,
const std::vector<Candidate2D>& candidates,
const int candidate_depth,
float min_score) const
{
// 检查是否是最底层(叶子层),如果已经查找到最底层了,则返回分数最高的候选解
if (candidate_depth == 0)
return *candidates.begin();
// Candidate2D的构造
// 参数分别为:0:scan_index,即旋转序号
// 0:x方向的偏移序号 0:y方向的偏移数 search_parameters:搜索参数
Candidate2D best_high_resolution_candidate(0, 0, 0, search_parameters);

best_high_resolution_candidate.score = min_score;
// 对传入的候选人candidate进行循环,即从最上层开始遍历
for (const Candidate2D& candidate : candidates)
{
// 将叶子节点与某棵分枝树的非叶子节点进行比较,如果非叶子节点的分数小于
// 之前选出的最高分叶子节点,则直接将此非叶子节点candidate及其子树全部砍掉
if(candidate.score <= min_score)
break;

// 一个容器,盛放这个节点引出的四个下一层的候选者
std::vector<Candidate2D> higher_resolution_candidates;

// 区域边长右移,相当于步长减半,进行分枝
const int half_width = 1 << (candidate_depth - 1);

// 对x、y偏移进行遍历,求出这一个candidate的四个子节点候选人(即最上面遍历的那个元素)
for (int x_offset : {0, half_width} )
{ // 只能取0和half_width
if (candidate.x_index_offset + x_offset >
search_parameters.linear_bounds[candidate.scan_index].max_x)
// 超出边界则break
break;
for (int y_offset : {0, half_width} )
{ // 只能取0和half_width xy一共遍历四个子节点
if (candidate.y_index_offset + y_offset >
search_parameters.linear_bounds[candidate.scan_index].max_y)
// 超出边界则break
break;

// 候选者依次推进来,一共4个
// 可以看出,分枝定界方法的分枝是向右下角的四个子节点进行分枝
higher_resolution_candidates.emplace_back(
candidate.scan_index, candidate.x_index_offset + x_offset,
candidate.y_index_offset + y_offset, search_parameters);
}
}
// 对candidate四个子节点进行打分,并将
// higher_resolution_candidates 按照score从大到小的顺序进行排列
ScoreCandidates(precomputation_grid_stack_->Get(candidate_depth - 1),
discrete_scans, search_parameters,
&higher_resolution_candidates);
// 开始递归
best_high_resolution_candidate = std::max(
best_high_resolution_candidate,
BranchAndBound(discrete_scans, search_parameters,
higher_resolution_candidates, candidate_depth - 1,
best_high_resolution_candidate.score));
}
return best_high_resolution_candidate;
}

第一步应先求取顶层的解及其对应评分(即可能位置和对应匹配置信度)。每层的当前节点的对应的评分均大于等于其所有下层枝叶节点,即上边界。由于不同分辨率地图存储格式,显然满足上边界条件。低分辨地图下的匹配置信度显然高于下层的高分辨地图下的匹配,然后采用迭代方法裁剪枝叶,直到遍历所有叶子节点。

在地图预处理时,其分辨率按照2的层数次方进行压缩的,由于地图有x和y两个方向,因此此层的一个节点,在下层会分为4个节点,即分辨率会放大2倍

参考:分枝定界图解