Eigen(二) 旋转,平移,特征值

旋转

有旋转向量、四元数和旋转矩阵三种表示方法,在Eigen里用前两种进行定义,这样参数少,可以转化为第三种。

牢记下面这张图,根据《视觉SLAM十四讲》总结:
三种方式的转换

使用的是模板类:class Eigen::AngleAxis< Scalar >,这被称为角轴表示法,顾名思义给定旋转角度和旋转轴即可。其创建方式符合Eigen的原则,传入数据类型作为模板参数。旋转角度以弧度表示,旋转轴为Vector类型的向量,注意向量必须要被归一化(vec.normalized()即可)。Eigen中也预先定义好了AngleAxisdAngleAxisf两种方便使用。

参考:基于eigen实现欧拉角(RPY), 旋转矩阵, 旋转向量, 四元数之间的变换

1
2
3
4
5
6
7
8
9
Vector3f v1 = Vector3f::UnitX();

AngleAxisf aa(45*3.1415/180, v1);
// 返回弧度制表示的旋转角度 和 旋转轴(列向量)
cout << aa.angle() <<" "<<endl<< aa.axis() <<endl;
cout << aa.toRotationMatrix()<<endl; // 转换为3 × 3的旋转矩阵

// fromRotationMatrix():从一个3×3的旋转矩阵构建AngleAxis对象。也就是提取旋转矩阵所表示的旋转角
// inverse(): 返回以AngleAxis表示的当前旋转的逆旋转

Quaternion是四元数表示法,Quaternion的构造是标准Eigen格式:数据类型作为模板参数+构造参数,而且重载了多个构造函数,因此可以方便地从角轴、旋转矩阵等数据类型进行构造。

Eigen提供了Quaternionf和Quaterniond方便使用。Quaterniond的初始化:

1
2
3
4
5
Eigen::Quaterniond q1(x, y, z, w);

Eigen::Quaterniond q2(Vector4d(x, y, z, w));

Eigen::Quaterniond q2(Matrix3d(R));

在Quaternion内部的保存中,虚部在前,实部在后

如果以第一种方式构造四元数,则实部是x,虚部是[y,z,w]。 对于第二种方式,则实部是w,虚部是[x,y,z]; 对于第三种方式,则是用3x3的旋转矩阵初始化四元数。

所以最常用是 Eigen::Quaterniond q1(w, x, y, z); ,四个数的传入顺序是w、x、y、z,对应w+xi+yj+zk

Eigen::Quaterniond 不能使用cout

  • Identity(): 返回一个表示单位旋转的四元数

  • w(): 返回四元数中的w分量。 同理 x(), y(), z()是返回相应的分量。

  • coeffs(): 返回四元数的四个数,可以对其进行索引[]获取值。 需要注意的是返回顺序是x、y、z、w,和构造函数的不同

  • toRotationMatrix() 将当前Quaternion对象转换为3×3的旋转矩阵。另外,没有fromRotationMatrix()函数,只能用构造函数传入旋转矩阵

  • slerp():对两个四元数进行球面线性插值(Spherical linear interpolation,通常简称Slerp),是四元数的一种线性插值运算,主要用于在两个表示旋转的四元数之间平滑差值。

欧拉角 —— 旋转矩阵

可以让三个AngleAxis相乘:

1
2
3
4
5
6
// 参数顺序和 static_transform_publisher 的一致
Eigen::Vector3f angle(roll, pitch, yaw);
Eigen::Matrix3d rotation;
rotation = Eigen::AngleAxisd(angle[0], Eigen::Vector3d::UnitZ()) *
Eigen::AngleAxisd(angle[1], Eigen::Vector3d::UnitY()) *
Eigen::AngleAxisd(angle[2], Eigen::Vector3d::UnitX()) ;

这样计算的结果和每个AngleAxisf转换成旋转矩阵后再相乘得到的结果是相同的

旋转矩阵转欧拉角的结果可能不是想要的,最好不使用

四元数 —— 旋转矩阵

其实这样更简单,我们可以直接从tf2_ros::Buffergeometry_msgs::TransformStamped直接获得四元数

1
2
3
4
5
6
Eigen::Quaterniond quaternion(transform.transform.rotation.w,
transform.transform.rotation.x,
transform.transform.rotation.y,
transform.transform.rotation.z
);
Eigen::Matrix3d rotation = quaternion.matrix();

平移

采用Eigen标准构造原则,构造示例如下:

1
2
3
4
Translation<float,2>(tx, ty)
Translation<float,3>(tx, ty, tz)
Translation<float,N>(s)
Translation<float,N>(vecN)

多个平移合并用乘号而不是加号,需要注意。和旋转相比,它的成员函数也相对少一些:

  • x():获取平移的x分量(可修改)

  • y():获取平移的y分量(可修改)

  • z():获取平移的z分量(可修改),注意,千万不要对一个二维的Translation获取z分量,否则直接会运行报错

  • vector()&.translation():返回当前平移的向量表示(可修改),可以索引[]获取各分量

1
2
3
4
5
Translation2d t1(1,4);
Translation2d t2(2,7);
Translation2d t3;
t3=t1*t2;
cout<<t3.translation()<<endl;

计算特征值与特征向量

构造一个EigenSolver,然后分别调用其成员函数.eigenvalues()、.eigenvectors()即可获得特征值与特征向量。

1
2
3
4
5
6
7
8
9
#include <Eigen/Eigenvalues>

A << 2,-2,0, -2,1,-2, 0,-2,0;
EigenSolver<Matrix3d> eigensolver(A);
if (eigensolver.info() == Success)
{
cout << eigensolver.eigenvalues() << endl<<endl;
cout << eigensolver.eigenvectors() << endl;
}

输出结果并不是我们平时熟悉的形式,而是

1
2
3
4
5
6
7
(4,0)
(1,0)
(-2,0)

(-0.666667,0) (-0.666667,0) (-0.333333,0)
(0.666667,0) (-0.333333,0) (-0.666667,0)
(-0.333333,0) (0.666667,0) (-0.666667,0)

实际特征值是4,1,-2. 4对应的特征向量分别为(-0.666667, -0.666667,-0.333333)

参考:
C++ Eigen库计算矩阵特征值及特征向量
Eigen函数与Matlab对应关系


彻底解决 sudo rosdep init 和 rosdep update失败问题

如果sudo rosdep initrosdep update没有执行成功,有些ROS功能会无法执行,比如rqt_tf_tree会报错:

1
2
the rosdep view is empty: call 'sudo rosdep init' and 'rosdep update'
[WARN] wait_for_service(/rqt_gui_py_node_26204/tf2_frames): failed to contact, will keep trying

这两个老大难其实都是网络的问题,sudo rosdep init的目的是为了在/etc目录下新建/ros/rosdep/sources.list.d/20-default.list文件。

小鱼基于rosdep源码制作了rosdepc,专门服务国内ROS用户,使用pip3 和 rosdepc 可以彻底消灭这个问题

1
2
3
4
5
sudo apt-get install python3-pip 
sudo pip3 install rosdepc

sudo rosdepc init
rosdepc update


rosdep update根据20-default.list文件中的网址链接去下载相应的文件,最终在/etc/ros/rosdep下放5个文件,分别是base.yaml,gentoo.yaml,osx-homebrew.yaml,python.yaml,ruby.yaml. 但又做了一些配置,所以不是下载文件能解决的。

以下步骤先从网上下载rosdistro文件夹,放到/etc/ros

执行sudo rosdep init后报错

1
2
ERROR: cannot download default sources list from:
https://raw.githubusercontent.com/ros/rosdistro/master/rosdep/sources.list.d/20-default.list Website may be down

从网上直接下载这个文件,然后放到/etc/ros/rosdep/sources.list.d,再修改如下:

1
2
3
4
5
6
7
8
9
10
11
# os-specific listings first

yaml file:///etc/ros/rosdistro/rosdep/osx-homebrew.yaml osx

# generic
yaml file:///etc/ros/rosdistro/rosdep/base.yaml
yaml file:///etc/ros/rosdistro/rosdep/python.yaml
yaml file:///etc/ros/rosdistro/rosdep/ruby.yaml
gbpdistro file:///etc/ros/rosdistro/releases/fuerte.yaml fuerte

# newer distributions (Groovy, Hydro, ...) must not be listed anymore, they are being fetched from the rosdistro index.yaml instead

rosdep update 报错

kinetic 要修改python2.7的路径, melodic和noetic要修改python3 ,当然另一种方法是从其他电脑拷贝已经生成的文件。

kinetic 的修改

1
2
reading in sources list data from /etc/ros/rosdep/sources.list.d
ERROR: unable to process source

对下面三个文件做修改
1
2
3
4
5
6
7
8
9
10
/usr/lib/python2.7/dist-packages/rosdep2/gbpdistro_support.py
修改: FUERTE_GBPDISTRO_URL = 'file:///etc/ros/rosdistro/releases/fuerte.yaml'


/usr/lib/python2.7/dist-packages/rosdep2/rep3.py
修改: REP3_TARGETS_URL = 'file:///etc/ros/rosdistro/releases/targets.yaml'


/usr/lib/python2.7/dist-packages/rosdistro/__init__.py
修改: DEFAULT_INDEX_URL = 'file:///etc/ros/rosdistro/index-v4.yaml'

启动新终端,再次执行rosdep update

melodic和noetic 的修改:clone代码仓https://github.com/ros/rosdistro到本地,并更改其文件rosdep/sources.list.d/20-default.list,将其url改成本地文件路径,内容类似如下:

1
2
3
4
5
6
7
8
# os-specific listings first
yaml file:///home/baby/rosdistro/rosdep/osx-homebrew.yaml osx

# generic
yaml file:///home/baby/rosdistro/rosdep/base.yaml
yaml file:///home/baby/rosdistro/rosdep/python.yaml
yaml file:///home/baby/rosdistro/rosdep/ruby.yaml
gbpdistro file:///home/baby/rosdistro/releases/fuerte.yaml fuerte

由于rosdep使用python3,可直接该动源码。我们需要改动三个文件:

  • /usr/lib/python3/dist-packages/rosdep2/sources_list.py,修改 DEFAULT_SOURCES_LIST_URL = 'file:///home/baby/rosdistro/rosdep/sources.list.d/20-default.list'

  • /usr/lib/python3/dist-packages/rosdep2/rep3.py,修改 REP3_TARGETS_URL = 'file:///home/baby/rosdistro/releases/targets.yaml'

  • /usr/lib/python3/dist-packages/rosdep2/gbpdistro_support.py,修改 FUERTE_GBPDISTRO_URL = 'file:///home/baby/rosdistro/releases/fuerte.yaml'

  • /usr/lib/python3/dist-packages/rosdistro/__init__.py,修改 DEFAULT_INDEX_URL = 'file:///home/baby/rosdistro/index-v4.yaml'

完成后先把/etc/ros/rosdep/sources.list.d/20-default.list删除,再执行sudo rosdep initrosdep update就可以成功了。

参考:
解决 rosdep update


ubuntu的显卡及驱动

ubuntu-drivers devices查看显卡硬件型号,也就是其中的model。 如果同意安装推荐版本,那我们只需要终端输入:sudo ubuntu-drivers autoinstall 就可以自动安装了。
当然我们也可以使用 apt 命令安装自己想要安装的版本,比如我想安装 340 这个版本号的版本,终端输入:sudo apt install nvidia-340 就自动安装了。

lshw -numeric -C display | grep product也可以查看电脑显卡的十六进制ID

驱动安装

电脑装好好,启动一直失败,报错failed to start User Manager for UID 121,连文本界面都进不去,后来才知道是ubuntu下缺少显卡驱动

先完全卸载之前安装的显卡驱动

ppa源文件卸载:sudo apt-get remove --purge nvidia*

runfile源文件卸载:sudo ./NVIDIA-Linux-x86_64-384.59.run --uninstall

ppa源驱动安装

先查询电脑最适合的显卡驱动版本,命令 ubuntu-drivers devices,这个命令不仅适用于显卡

最佳显卡驱动版本为 nvidia-driver-435, 用命令行进行安装

1
2
3
4
sudo add-apt-repository ppa:graphics-drivers/ppa
sudo apt-get update
sudo apt-get install nvidia-driver-435 #此处数字要对应上面查询到的版本号
sudo apt-get install mesa-common-dev

如果前面没有禁用secure boot,则在安装过程中会提示设置一个密码,在重启时需要输入密码验证以禁用secure boot,重启后会出现蓝屏,这时候不能直接选择continue, 而应该按下按键,选择Enroll MOK, 确认后在下一个选项中选择continue,接着输入安装驱动时设置的密码,开机。

安装完成后重启,nvidia-smi命令显示驱动信息,若出现驱动版本,就是成功

官网驱动下载

到 NVIDIA 的官网下载相应型号的驱动,官网地址
操作系统那里,一定选择Linux 64-bit

需要先安装一些 NVIDIA 显卡依赖的软件,在终端依次执行如下命令:

1
2
3
sudo dpkg --add-architecture i386
sudo apt update
sudo apt install build-essential libc6:i386

Ubuntu 系统默认安装好是使用的一个开源的驱动:nouveau,我们需要先禁用这个开源驱动,方法如下,依次执行:
1
2
sudo bash -c "echo blacklist nouveau > /etc/modprobe.d/blacklist-nvidia-nouveau.conf"
sudo bash -c "echo options nouveau modeset=0 >> /etc/modprobe.d/blacklist-nvidia-nouveau.conf"

重启一下系统,运行之前下载的驱动。可能会出现下面的信息:
1
2
3
The distribution-provided pre-install script failed!
Are you sure you want to continue? -> CONTINUE INSTALLATION
Would you like to run the nvidia-xconfig utility? -> YES

安装完成后重启系统就可以点击软件列表中的 NVIDIA 的配置软件配置显卡驱动了,如果你遇到
1
WARNING: Unable to find suitable destination to install 32-bit compatibility libraries

解决办法:
1
2
3
sudo dpkg --add-architecture i386
sudo apt update
sudo apt install libc6:i386


某次从Nvidia网站下载安装了显卡对应的驱动,结果重启之后进不了系统,一直黑屏。只好进recovery模式,然后再进入Shell prompt模式卸载驱动

1
2
sudo apt-get --purge remove "*nvidia*"
sudo /usr/bin/nvidia-uninstall

打开文件/etc/gdm3/custom.conf,将 #WaylandEnable=false 前的#去掉。重启之后又进不了系统了,只好又卸载。

参考: ubuntu安装和卸载Nvidia显卡驱动


cartographer的缺点
abstract Welcome to my blog, enter password to read.
Read more
最后阶段 RunFinalOptimization

现在返回到最初的node_main.cc中的Run函数,还有一句node.RunFinalOptimization();,所有轨迹结束时,再执行一次全局优化。 其实就是MapBuilderBridge::RunFinalOptimization —— PoseGraph2D::RunFinalOptimization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void PoseGraph2D::RunFinalOptimization()
{
{
// 参数 max_num_final_iterations 默认 200
// 优化标志
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.max_num_final_iterations());
return WorkItem::Result::kRunOptimization;
});
// 参数 max_num_iterations, 默认 50
// 不优化标志
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.optimization_problem_options()
.ceres_solver_options()
.max_num_iterations());
return WorkItem::Result::kDoNotRunOptimization;
});
}
// 另一个调用地方在 PoseGraph2D 析构
WaitForAllComputations();
}

在建图结束之后会运行一个新的全局优化,不要求实时性,迭代次数多
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void PoseGraph2D::WaitForAllComputations()
{
int num_trajectory_nodes;
{
absl::MutexLock locker(&mutex_);
num_trajectory_nodes = data_.num_trajectory_nodes;
}
const int num_finished_nodes_at_start =
constraint_builder_.GetNumFinishedNodes();

auto report_progress = [this, num_trajectory_nodes,
num_finished_nodes_at_start]() {
// Log progress on nodes only when we are actually processing nodes.
if (num_trajectory_nodes != num_finished_nodes_at_start) {
std::ostringstream progress_info;
progress_info << "Optimizing: " << std::fixed << std::setprecision(1)
<< 100. *
(constraint_builder_.GetNumFinishedNodes() -
num_finished_nodes_at_start) /
(num_trajectory_nodes - num_finished_nodes_at_start)
<< "%...";
std::cout << "\r\x1b[K" << progress_info.str() << std::flush;
}
};

  • num_finished_nodes_at_start表示此时已经优化的节点数,假设为 10

  • GetNumFinishedNodes返回当前已经优化的节点数,假设为 11,也就是这个函数开始后,优化了1个节点

  • num_trajectory_nodes,轨迹的总节点数

progress_info 输出的是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  // First wait for the work queue to drain so that it's safe to schedule
// a WhenDone() callback.
{
const auto predicate = [this]()
EXCLUSIVE_LOCKS_REQUIRED(work_queue_mutex_) {
return work_queue_ == nullptr;
};
absl::MutexLock locker(&work_queue_mutex_);
while (!work_queue_mutex_.AwaitWithTimeout(
absl::Condition(&predicate),
absl::FromChrono(common::FromSeconds(1.)))) {
report_progress();
}
}

// Now wait for any pending constraint computations to finish.
absl::MutexLock locker(&mutex_);
bool notification = false;
constraint_builder_.WhenDone(
[this,
&notification](const constraints::ConstraintBuilder2D::Result& result)
LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
data_.constraints.insert(data_.constraints.end(), result.begin(),
result.end());
notification = true;
});
const auto predicate = [&notification]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return notification;
};
while (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
absl::FromChrono(common::FromSeconds(1.)))) {
report_progress();
}
CHECK_EQ(constraint_builder_.GetNumFinishedNodes(), num_trajectory_nodes);
std::cout << "\r\x1b[KOptimizing: Done. " << std::endl;
}

就是把ceres的迭代次数设置为200,其他部分其实不用太深入研究了。


后端的线程池 3 HandleWorkQueue和优化

优化的具体实现在优化器optimization_problem_。将优化的结果采用回调的方式,返回到result中。
后端将数据加入到 optimization_problem_ 的对应传感器队列中,并按时间排列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void PoseGraph2D::HandleWorkQueue(
const constraints::ConstraintBuilder2D::Result& result)
{
{ // 将新的约束添加到全局约束队列中
absl::MutexLock locker(&mutex_);
data_.constraints.insert(data_.constraints.end(), result.begin(),
result.end() );
}
// ceres 在这里
RunOptimization();
// 以下是 省略 的内容
//如果设置了全局优化回调函数,则进行调用
//根据约束结果,更新轨迹间的链接关系
// 优化完成后,累计节点清零
// 计算相同轨迹的 inter_constraints_same_trajectory
// 计算不同轨迹的 inter_constraints_different_trajectory

// 优化结束后,重新开启任务队列,即继续执行
// work_queue_里的 work_item
DrainWorkQueue();
}

RunOptimization

优化的实际就是成员变量node_data_submap_data_,也就是

1
2
3
4
5
6
7
8
9
10
11
12
struct NodeSpec2D
{
common::Time time;
transform::Rigid2d local_pose_2d;
transform::Rigid2d global_pose_2d;
Eigen::Quaterniond gravity_alignment;
};

struct SubmapSpec2D
{
transform::Rigid2d global_pose;
};

1
2
3
4
5
6
7
8
9
10
11
12
void PoseGraph2D::RunOptimization()
{
if (optimization_problem_->submap_data().empty() )
return;
// std::unique_ptr<optimization::OptimizationProblem2D> optimization_problem_;

// No other thread is accessing the optimization_problem_,
// data_.constraints, data_.frozen_trajectories and data_.landmark_nodes
// when executing Solve.
// 调用优化,Ceres在这里面,由于耗时间,故没加锁,防止阻塞其他线程
optimization_problem_->Solve(data_.constraints,
GetTrajectoryStates(), data_.landmark_nodes);

将所有内部约束和外部约束合并在一起执行Solve
遍历所有submap,建立参数块;遍历所有node,建立参数块。
根据约束,添加残差函数;
处理里程计问题,添加可能的残差;
求解返回结果;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
  absl::MutexLock locker(&mutex_);
// 优化后所有的submap和node数据
const auto& submap_data = optimization_problem_->submap_data();
const auto& node_data = optimization_problem_->node_data();
/* 遍历所有优化后的轨迹的所有节点位姿 */
for (const int trajectory_id : node_data.trajectory_ids())
{
for (const auto& node : node_data.trajectory(trajectory_id) )
{
auto& mutable_trajectory_node = data_.trajectory_nodes.at(node.id);
// 更新所有节点的 全局位姿
mutable_trajectory_node.global_pose =
transform::Embed3D(node.data.global_pose_2d) *
transform::Rigid3d::Rotation(
mutable_trajectory_node.constant_data->gravity_alignment);
}

// Extrapolate all point cloud poses that were not included in the
// 'optimization_problem_' yet. 注意是已经加入tarjectory但是还没有进行优化的新Node
// 由于使用采样器建立约束,有的node没有建立约束,其pose就不会被优化
// 因此,要通过已经优化的位姿转换关系来修正所有的node

// 子图的local到global的新的转移矩阵
const auto local_to_new_global =
ComputeLocalToGlobalTransform(submap_data, trajectory_id);
// 旧的转移矩阵
const auto local_to_old_global = ComputeLocalToGlobalTransform(
data_.global_submap_poses_2d, trajectory_id);
const transform::Rigid3d old_global_to_new_global =
local_to_new_global * local_to_old_global.inverse();
// 上次最后一个优化的节点
const NodeId last_optimized_node_id =
std::prev(node_data.EndOfTrajectory(trajectory_id))->id;
auto node_it =
std::next(data_.trajectory_nodes.find(last_optimized_node_id));
// 后续未优化的节点的全局pose进行转移
for (; node_it != data_.trajectory_nodes.EndOfTrajectory(trajectory_id);
++node_it)
{
// TrajectoryNode类型, 指针
auto& mutable_trajectory_node = data_.trajectory_nodes.at(node_it->id);
修正 global_pose
mutable_trajectory_node.global_pose =
old_global_to_new_global * mutable_trajectory_node.global_pose;
}
}
// 更新所有路标位姿
for (const auto& landmark : optimization_problem_->landmark_data()) {
data_.landmark_nodes[landmark.first].global_landmark_pose = landmark.second;
}
data_.global_submap_poses_2d = submap_data;
}

优化并非是实时,是在后台进行运行的,并且需要一定的时间。因此当完成优化时,前端输出结果已经对整个位姿图个数进行了增加。后面新加入的节点并未优化,所以返回优化的结果没有最新加入轨迹节点对应的结果。因此采用优化后结果中最后一个轨迹节点的位姿的转移矩阵,作为未参与优化轨迹节点的转移矩阵进行更新。

其中 为参与优化的节点,优化前位姿为 ,优化后的位姿为 。而 则为未参与优化的节点。


后端 5 计算约束

我的理解是,求inter约束也是一个 scan to map 的过程,找到和点云最相似的不同时间的子图,也就是找回环。其实是和lidar_localization的后端用NDT找关键帧的匹配是类似的,但是cartographer是点云和栅格地图匹配,不像点云匹配那样直观,分支定界的score就像NDT匹配的score,不过前者越大越好,后者越小越好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void ConstraintBuilder2D::ComputeConstraint(
const SubmapId& submap_id, const Submap2D* const submap,
const NodeId& node_id, bool match_full_submap,
const TrajectoryNode::Data* const constant_data,
const transform::Rigid2d& initial_relative_pose,
const SubmapScanMatcher& submap_scan_matcher,
std::unique_ptr<ConstraintBuilder2D::Constraint>* constraint)
{
CHECK(submap_scan_matcher.fast_correlative_scan_matcher);
/* node在local坐标系的位姿 = 子图在local坐标系中的位姿 * node在子图的位姿*/
const transform::Rigid2d initial_pose =
ComputeSubmapPose(*submap) * initial_relative_pose;

float score = 0.;
transform::Rigid2d pose_estimate = transform::Rigid2d::Identity();

整个函数是为了计算constraint_transform(节点 j 和子图 i的关系) ,需要的参数有:

  • 节点 j 的filtered_gravity_aligned_point_cloud
  • 分支定界的初值 initial_pose (节点 j 在local map坐标系的坐标)
  • Match() 的结果 pose_estimate (节点 j 在local map坐标系的坐标).
  • ComputeSubmapPose()函数 (local map坐标系转到子图 i 坐标系)

计算pose_estimate的三步:

  1. 使用 fast correlative scan matcher 做 Fast estimate
  2. Prune if the score is too low.
  3. ceres Refine

匹配所有子图 或 局部子图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 匹配所有子图,对应 MaybeAddGlobalConstraint
if (match_full_submap)
{
kGlobalConstraintsSearchedMetric->Increment();
if( submap_scan_matcher.fast_correlative_scan_matcher->MatchFullSubmap(
constant_data->filtered_gravity_aligned_point_cloud,
// 参数为 global_localization_min_score
options_.global_localization_min_score(),
&score, &pose_estimate) )
{
CHECK_GT(score, options_.global_localization_min_score());
CHECK_GE(node_id.trajectory_id, 0);
CHECK_GE(submap_id.trajectory_id, 0);
// 最后记录下全局约束的次数和统计置信度
kGlobalConstraintsFoundMetric->Increment();
kGlobalConstraintScoresMetric->Observe(score);
}
else return;
}
// 匹配局部子图
else
{
kConstraintsSearchedMetric->Increment();
if (submap_scan_matcher.fast_correlative_scan_matcher->Match(
initial_pose, constant_data->filtered_gravity_aligned_point_cloud,
// 参数为 min_score
options_.min_score(),
&score, &pose_estimate) )
{
CHECK_GT(score, options_.min_score());
kConstraintsFoundMetric->Increment();
kConstraintScoresMetric->Observe(score);
}
else return;
}
{
absl::MutexLock locker(&mutex_);
score_histogram_.Add(score);
}

分枝定界求出的位姿被称为 初始位姿 3,这个初始位姿3及其携带的点云作为输入,用于ceres与此子图进行优化匹配

在建立全局约束的时候,直接在一个超大的范围内进行分枝定界搜索,并不需要计算一个特殊的初始位姿2,而直接把初始位姿设置为地图limits的中心点,可以理解为map的中心点。 而且打分的参数也不同了。

全局约束的搜索窗口范围: [1e6 * limits_.resolution(), M_PI],角度其实是±180°

ceres refine

ceres优化匹配,得到更加准确的优化位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Use the CSM estimate as both the initial and previous pose. 
// This has the effect that, in the absence of better information,
// we prefer the original CSM estimate.
ceres::Solver::Summary unused_summary;
// ceres更新pose_estimate,获得节点在local map中的最优位姿
ceres_scan_matcher_.Match(pose_estimate.translation(), pose_estimate,
constant_data->filtered_gravity_aligned_point_cloud,
*submap_scan_matcher.grid, &pose_estimate,
&unused_summary);
// 计算得到node相对子图的位姿
const transform::Rigid2d constraint_transform =
ComputeSubmapPose(*submap).inverse() * pose_estimate;
constraint->reset(new Constraint{submap_id,
node_id,
{ transform::Embed3D(constraint_transform),
options_.loop_closure_translation_weight(),
options_.loop_closure_rotation_weight() },
Constraint::INTER_SUBMAP} );

对于局部约束,constraint_transform并不是回环边,其实就是子图和节点的普通约束。 全局约束才构造回环边

日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (options_.log_matches() )
{
std::ostringstream info;
info << "Node " << node_id << " with "
<< constant_data->filtered_gravity_aligned_point_cloud.size()
<< " points on submap " << submap_id << std::fixed;
if (match_full_submap)
info << " matches";
else
{
const transform::Rigid2d difference =
initial_pose.inverse() * pose_estimate;
info << " differs by translation " << std::setprecision(2)
<< difference.translation().norm() << " rotation "
<< std::setprecision(3) << std::abs(difference.normalized_angle());
}
info << " with score " << std::setprecision(1) << 100. * score << "%.";
LOG(INFO) << info.str();
}

思考的问题
  • 为何子图插入scan结束时CPU升高

  • 为什么要用线程池

  • To be sure there is a recent loop closure constraint, you can query PoseGraphInterface::constraints(), loop over all constraints and check if there is a recent one of type INTER that spans some time.

  • A simpler solution is to subscribe to the global slam callback GlobalSlamOptimizationCallback by setting PoseGraphInterface::SetGlobalSlamOptimizationCallback. This will call you back after Cartographer has searched for loop closures. It does not tell you if loop closures were found, it mostly tells you if the background loop closure search is keeping up or falls behind.


emplace 和 emplace_back

常见用法:

1
2
3
4
5
6
7
8
vector<int> v;
for(int i=0; i<4; i++)
{
v.emplace_back();
auto& num = v.back();
num = i;
qDebug()<< "size: "<<v.size() <<" v.back: "<<v.back();
}

结果

1
2
3
4
size:  1   v.back:  0
size: 2 v.back: 1
size: 3 v.back: 2
size: 4 v.back: 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <map>
#include <string>
#include <vector>
using namespace std;

class A{
public:
A(const string& s) {cout << "A construct " <<endl;}
A(const A& a) {cout <<"A copy"<<endl; }
private:
string s;
};

int main()
{
std::map<int, string> m;
vector<A> v;
A a1("test"); // a1 构造函数
cout << "------------------" << endl;
v.push_back(a1); // a1 拷贝构造函数
v.emplace_back(a1);

//先运行构造函数,后拷贝构造函数
v.push_back(A("test") );
v.emplace_back(A("test") );

// v.push_back("test"); 报错
// 这个才是真正的用途
v.emplace_back("test"); // 隐式转换,只有拷贝构造函数
cout << "******************" << endl;
return 0;
}

对于C++ 11里vector的emplace_back函数比较失望,都说提高了效率,其实它仅对于元素做隐式转换的情况有效,此时没有产生临时对象。对其他情况,和push_back没区别。

这里用到的c++11特性完美转发:将接收下来的参数原样完美地传递给对象的构造函数,这带来另一个方便性就是即使是构造函数声明为 explicit 它还是可以正常工作,因为它不存在临时变量和隐式转换。

map::emplace

map就只有emplace,机制也是一样的。元素是直接构建的,既不复制也不移动,仅当键不存在时才插入。
但是map有个问题:emplace 方法把它接收到的所有的参数都一起转发给 pair 的构造函数。但是对于一个 pair 来说,它既需要构造它的 key 又需要构造它的 value。如果我们按照之前普通的语法使用变参模板的话,则它是无法区分哪些参数用来构造 key, 哪些用来构造 value的。 比如下面的代码

1
2
3
4
5
// 无法区分哪个参数用来构造 key 哪些用来构造 value
// 有可能是 std::string s("hello", 1), std::complex<double> cpx(2)
// 也有可能是 std::string s("hello"), std::complex<double> cpx(1, 2)
std::map<std::string, std::complex<double>> scp;
scp.emplace("hello", 1, 2);

考虑使用可以接受异构、变长参数的 tuple 来对参数进行分组。再使用 C++11 提供的一个特殊类型 piecewise_construct_t 来帮助它们找到各自正确的构造函数了。std::piecewise_construct_t 是一个空类,全局变量 std::piecewise_construct 就是该类型的一个变量。

std::piecewise_construct作为构造 pair 对象的第一个参数传递,以选择构造函数形式,通过将两个元组对象的元素转发给它们各自的构造函数来构造其成员。即第一个 tuple 作为 key,第二个 tuple 作为 value。 使用 forward_as_tuple,该函数会构造一个 tuple 并转发给 pair。

1
2
3
4
5
// 想对于 map 避免临时变量的构造的话,就需要构建两个 tuple
std::map<std::string, std::complex<double>> scp;
scp.emplace(std::piecewise_construct, // 此常量值作为构造 pair 对象的第一个参数传递,以选择构造函数形式,通过将两个元组对象的元素转发给它们各自的构造函数来构造其成员。
std::forward_as_tuple("hello"), // 该函数会构造一个 tuple 并转发给 pair 构造,并存储在 first 字段
std::forward_as_tuple(1, 2)); //该函数会构造一个 tuple 并转发给 pair 构造,存储在 second 字段

cartographer中的代码就是这样

1
2
3
4
5
6
7
8
9
// std::map<int, ::cartographer::mapping::PoseExtrapolator>  extrapolators_;
// PoseExtrapolator的构造函数有2个参数:common::Duration pose_queue_duration,
// double imu_gravity_time_constant
extrapolators_.emplace(
std::piecewise_construct,
std::forward_as_tuple(trajectory_id),
std::forward_as_tuple(
::cartographer::common::FromSeconds(kExtrapolationEstimationTimeSec),
gravity_time_constant) );

无参的emplace_back

还有一种使用无参的vector::emplace_back() 或者 deque::emplace_back(),下面是从cartographer中学来的代码,确实有一定技巧

1
2
3
4
5
6
7
8
9
10
    vector<std::unique_ptr<A> > dd;
dd.emplace_back();
// 注意这里的 size 也是1
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
// cout << dd.at(0)->getValue() <<endl; 这里运行会崩溃

auto* a1 = &dd.back(); // 这个是 unique_ptr
a1->reset(new A(5) ); // 不能再用=,unique_ptr不能复制
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
cout << dd.at(0)->getValue() <<endl;

运行结果:

1
2
3
4
capacity: 1 size: 1
A construct
capacity: 1 size: 1
5

参考:
emplace_back VS push_back


后端的线程池 1 thread_pool 和 task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 省略部分不重要的成员
class Task
{
public:
friend class ThreadPoolInterface;
State GetState() LOCKS_EXCLUDED(mutex_); //返回本Task当前状态
// 设置Task 执行的任务 (函数)
void SetWorkItem(const WorkItem& work_item);
// 给当前任务添加 依赖任务, 如当前任务为b,AddDependency(a)表示任务a依赖b
// 把当前任务b,加入到依赖任务 a 的 dependent_tasks_ 列表
void AddDependency(std::weak_ptr<Task> dependency) LOCKS_EXCLUDED(mutex_);

private:
// 执行当前任务,比如当前任务为a,并更新依赖a的任务dependent_tasks_中所有任务状态
void Execute() LOCKS_EXCLUDED(mutex_);

// 当前任务进入线程待执行队列
void SetThreadPool(ThreadPoolInterface* thread_pool) LOCKS_EXCLUDED(mutex_);

// 当前任务的依赖任务完成时候,当前任务状态随之改变
void OnDependenyCompleted();


using WorkItem = std::function<void()>;
enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED };
// 任务具体执行过程
WorkItem work_item_ ;
// 执行当前任务的线程池
ThreadPoolInterface* thread_pool_to_notify_ = nullptr;
State state_ GUARDED_BY(mutex_) = NEW; // 初始化状态为 NEW
// 依赖当前任务的任务列表
std::set<Task*> dependent_tasks_ GUARDED_BY(mutex_);
};

Task类不是线程,可以理解为有状态和依赖的函数,核心的成员变量work_item_是仿函数,其返回类型void。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 省略部分不重要的成员
// 不明白为什么要先构造个抽象类,只有这一个派生类
class ThreadPool : public ThreadPoolInterface
{
public:
//初始化一个线程数量固定的线程池。
explicit ThreadPool(int num_threads);
// 添加想要ThreadPool执行的task,插入 tasks_not_ready_
// 如果任务满足执行要求,直接插入task_queue_准备执行
std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task);
private:
//每个线程初始化时, 执行DoWork()函数. 与线程绑定
void DoWork();
void NotifyDependenciesCompleted(Task* task);
//running_只是一个监视哨, 只有线程池在running_状态时, 才能往work_queue_加入函数
bool running_ = true;
// pool_就是一系列线程
std::vector<std::thread> pool_ ;
// 十分重要的任务队列
std::deque<std::shared_ptr<Task>> task_queue_ ;
//未准备好的 task,task可能有依赖还未完成
absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_;
};

看构造函数

1
2
3
4
5
6
7
8
9
10
ThreadPool::ThreadPool(int num_threads)
{
absl::MutexLock locker(&mutex_);
// 原来还是用了std::thread,还以为google连线程也是自己实现的
for (int i = 0; i != num_threads; ++i)
{
// std::vector<std::thread> pool_;
pool_.emplace_back([this]() { ThreadPool::DoWork(); });
}
}

大致看ThreadPool::DoWork(),最后执行了ThreadPoolInterface::Execute,其实就是Task::Execute。 每个线程与DoWork()函数绑定,也就是线程在后台不断执行DoWork()函数

新的Task如果有依赖项,通过Thread_pool::Schedule添加到 Thread_pooltasks_not_ready_队列中。队列中的Task状态是变化的,等依赖项执行完成,Task状态变为DEPENDENCIES_COMPLETED,然后再插入task_queue_队列。最终所有Task都会插入task_queue_中,在DoWork中得到执行。

如果该Task没有依赖,直接插入task_queue_,准备执行。

对任一个任务的状态转换顺序为:NEW—->DISPATCHED—->DEPENDENCIES_COMPLETED—->RUNNING—->COMPLETED

Thread_pool通过固定数量的thread与task_queue_(待执行的task队列)执行函数绑定。Thread_pool 按照队列首尾顺序不断执行Task。

示意图