Ceres(三) 解析求导

最常见的一个例子是 链接,不再复制粘贴

还是之前的曲线拟合问题: ,如果改用解析求导,需要构建一个继承CostFunction的类。核心函数是bool CostFunction: :Evaluate(double const *const *parameters, double *residuals, double **jacobians)计算残差向量和雅可比矩阵

  • parameters: parameters是一个大小为CostFunction::parameter_block_sizes_.size()(输入参数块的数量和大小)的数组,parameters[i]是一个大小为parameter_block_sizes_[i]的数组,其中包含CostFunction所依赖的第i个参数块。parameters不会为nullptr

  • residuals: 是一个大小为num_residuals_(输出残差的个数)的数组。residuals也不会为nullptr

  • jacobians: 是一个大小为CostFunction::parameter_block_sizes_.size()的数组。如果jacobians是nullptr,用户只需要计算残差

  • jacobians[i]: 是一个大小为 num_residuals x parameter_block_sizes_[i] 的行数组,如果jacobians[i]不为nullptr,用户需要计算关于parameters[i]的残差向量的雅可比矩阵,并将其存储在这个数组中,即

如果jacobians为nullptr,通常我们只需要在Evaluate函数中实现residuals的计算,jacobians后面可以通过Ceres提供的自动求导(数值求导)替代,否则,还需要在Evaluate函数中实现jacobians的计算

如果parameters大小和residuals大小在编译时是已知的,就可以使用SizeCostFunction,该函数可以将paramters大小和residuals大小设置为模板参数,用户只需要在使用的时候给模板参数赋值就可以

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 模板参数依次为: number of residuals,  first parameter的维度
class MyCostFun : public SizedCostFunction<1, 1, 1, 1>
{
public:
MyCostFun(double x, double y):
m_x(x), m_y(y){}
virtual bool Evaluate(double const* const* parameters,
double* residuals,
double** jacobians) const
{
double a = parameters[0][0];
double b = parameters[0][1];
double c = parameters[0][2];
residuals[0] = m_y - exp(a*m_x*m_x + b*m_x + c);

if (jacobians != NULL && jacobians[0] != NULL)
{
jacobians[0][0] = -exp(a*m_x*m_x + b*m_x + c)* m_x * m_x;
jacobians[0][1] = -exp(a*m_x*m_x + b*m_x + c)* m_x;
jacobians[0][2] = -exp(a*m_x*m_x + b*m_x + c);

}
return true;
}
protected:
double m_x;
double m_y;
};

拟合 时,使用SizedCostFunction<1, 2>,只计算雅格比jacobians[0][0]jacobians[0][1]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int main(int argv, char** argc)
{
ros::init(argv, argc, "test_ceres");
ros::NodeHandle nh;
// 真实值
double a = 1.0, b = 2.0, c = 1.0;

// 生成带有噪声的模拟数据
vector<double> x_data, y_data;
int N = 240; // 数据数量
double w_sigma = 1.0; // 高斯标准差
cv::RNG rng;
for(unsigned int i =0; i<N; i++)
{
double x = i/100.0;
x_data.push_back(x);
y_data.push_back(exp(a*x*x + b*x + c) + rng.gaussian(w_sigma) );
}
// 作为初值
a = 0.0;
b = 0.0;
c = 0.0;
Problem problem;
for(unsigned int i=0; i<N; i++)
{
CostFunction* cost_func = new MyCostFun(x_data[i], y_data[i]);
problem.AddResidualBlock(cost_func, nullptr, &a, &b, &c);
}
Solver::Options option;
option.minimizer_progress_to_stdout = true;

Solver::Summary summary;
Solve(option, &problem, &summary);
cout << summary.BriefReport() << endl <<endl;
cout << "a: "<< a <<" b: "<< b << " c: "<< c <<endl;

return 0;
}

一开始我设置代价函数的模板为SizedCostFunction<1, 3>,结果运行报错: [problem_impl.cc:286] Check failed: num_parameter_blocks == cost_function->parameter_block_sizes().size() (3 vs. 1)

问题在于parameter_block_sizes只有1,而我们需要3,也就是对应abc三个参数。读SizedCostFunction源码发现,模板定义为template <int kNumResiduals, int... Ns>,构造函数只有两行

1
2
set_num_residuals(kNumResiduals);
*mutable_parameter_block_sizes() = std::vector<int32_t>{Ns...};

因此从第2个模板参数开始,有几个参数就对应parameter_block_sizes,改为SizedCostFunction<1, 1, 1, 1>


Ceres(四) LocalParameterization

四元数表示的是一个SO3,四元数有四维即四个自由度,但它表示的含义其实只有三个自由度,这显然是不合理的,所以产生了单位四元数这么一个东西,单位四元数指四元数的四个量的二范数是1,这其实是一个约束,约束了四元数的一个自由度,这样四元数就只剩下三个自由度了,正好符合一个SO3的维数。

这段话没有完全理解: 这里其实是一个 Manifold 变量 over parameterized的问题,即 Manifold 上变量的维度大于其自由度。这会导致 Manifold 上变量各个量之间存在约束,如果直接对这些量求导、优化,那么这就是一个有约束的优化,实现困难。为了解决这个问题,在数学上是对 Manifold 在当前变量值处形成的切空间(Tangent Space)求导,在切空间上优化,最后投影回 Manifold。

在ceres里面,如果使用的是自动求导,然后再结合爬山法,那么每步迭代中都会产生一个四维的迭代的增量delta,这样就仅仅需要将原四元数 “加上” 这个迭代产生的delta就能够得到新的四元数了。这里问题就来了,直接加上以后这个四元数就不再是一个单位四元数了,不符合二范数之和为1。如果每次迭代过后,都将这个四元数进行归一化处理,就会显然很麻烦。

同理,旋转矩阵也是如此。二者都是使用过参数化表示旋转的方式,它们是不支持广义的加法(一般使用符号 表示)。也就是说使用普通的加法就会打破约束:旋转矩阵加旋转矩阵得到的就不再是旋转矩阵,四元数加四元数得到的不是单位四元数。所以我在使用ceres对其进行迭代更新的时候就需要自定义其更新方式了,具体的做法是实现一个参数本地化的子类,需要继承LocalParameterization,它是纯虚类,所以继承的时候要把所有的纯虚函数都实现。

Ceres存储四元数的顺序是(w, x, y, z)Eigen::Quaternion内部内存分布为(x, y, z, w)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

class CERES_EXPORT QuaternionParameterization : public LocalParameterization
{
public:
virtual ~QuaternionParameterization() {}

virtual bool Plus(const double* x,
const double* delta,
double* x_plus_delta) const;
virtual bool ComputeJacobian(const double* x,
double* jacobian) const;

// 表示参数的自由度,也就是 Manifold 空间(四元数)是 4 维的
// 即四元数本身的实际维数,如果是旋转矩阵,则为 9
virtual int GlobalSize() const { return 4; }
// 表示迭代增量所在的切空间(tangent space)是 3 维的。即旋转矢量的实际维数
virtual int LocalSize() const { return 3; }
};

对两个Size,我的理解是:四元数本身的维度是 4,但其自由度只有 3,因此迭代量的维度也应该是 3

  • Plus()函数

原型: bool Plus(const double* x, const double* delta, double* x_plus_delta)

四元数的更新方法,参数分别为优化前的四元数 x,用旋转矢量表示的增量delta 而且本来就是半轴角,所以不需要除以2 ),更新后的四元数 x_plus_delta。函数首先将增量由旋转矢量转换为四元数,转换过程就是《视觉SLAM十四讲》的公式(3.19),然后用优化前的x左乘增量,获得x_plus_delta

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
bool QuaternionParameterization::Plus(const double* x,
const double* delta,
double* x_plus_delta) const
{
/* -------- 将旋转矢量转换为四元数形式-------- */
const double norm_delta =
sqrt(delta[0] * delta[0] + delta[1] * delta[1] + delta[2] * delta[2]);
if (norm_delta > 0.0)
{
// 这里的 除以norm_delta 相当于归一化
const double sin_delta_by_delta = (sin(norm_delta) / norm_delta);
double q_delta[4];
q_delta[0] = cos(norm_delta);
q_delta[1] = sin_delta_by_delta * delta[0];
q_delta[2] = sin_delta_by_delta * delta[1];
q_delta[3] = sin_delta_by_delta * delta[2];

/* --------2.采用四元数乘法更新-------- */
// 注意这个乘法的顺序关乎求雅克比矩阵,具体说明参照 ComputeJacobian
x_plus_delta = q_delta * x;
}
else
{
for (int i = 0; i < 4; ++i)
x_plus_delta[i] = x[i];
}
return true;
}
  • ComputeJacobian函数

原型:bool ComputeJacobian(const double* x, double* jacobian)

四元数相对于旋转矢量的雅克比矩阵,即 x_plus_delta( 也就是 Plus(x, delta) ) 关于 delta(接近0)的雅克比矩阵。计算该参数加上更新量的结果对更新量的雅可比矩阵

四元数维度为 4,迭代量维度为 3,因此雅可比维度应该是 4x3。也就是

如果直接在求观测误差对四元数求雅可比,得到的结果维度和该变量的维度是一致的,也就是说 雅格比同样具有冗余的自由度 ,还是开始就提出的问题,因此还需要对其迭代量进行雅可比求解。数学表示如下

(2)式为四元数上的例子,q为四元数,维度为 4,为作用在四元数上的更新量,维度为 3

上面两个公式的第一部分是对原始过参数化的优化变量(四元数)的导数,这个很容易求得,直接借助ceres的AutoDiffCostFunction() 计算即可,或者自己计算雅可比矩阵,实现一个costfunction。关键是第二部分。

先补充一下四元数知识,来自十四讲

继续推导如下

1
2
3
4
5
6
7
8
9
10
// x[0] 对应 w
bool QuaternionParameterization::ComputeJacobian(const double* x,
double* jacobian) const
{
jacobian[0] = -x[1]; jacobian[1] = -x[2]; jacobian[2] = -x[3]; // NOLINT
jacobian[3] = x[0]; jacobian[4] = x[3]; jacobian[5] = -x[2]; // NOLINT
jacobian[6] = -x[3]; jacobian[7] = x[0]; jacobian[8] = x[1]; // NOLINT
jacobian[9] = x[2]; jacobian[10] = -x[1]; jacobian[11] = x[0]; // NOLINT
return true;
}

调用的方法:

1
2
ceres::LocalParameterization* local_param = new ceres::QuaternionParameterization();
problem.AddParameterBlock(quaternion, 4, local_param) //重构参数,优化时实际使用的是3维的等效旋转矢量

注意:从 Ceres version 2.2.0开始,LocalParameterization interface and associated classes are deprecated. Please use Manifold instead.

参考:LocalParameterization子类说明
Ceres 学习记录(二)

通过Problem::AddResidualBlock()函数,记录输入parameters和residuals的尺寸

AddParameterBlock添加的优化变量并不一定是ceres内部运算时采用的优化变量,例如我们通常会采用四元数+平移也就是SE3作为外部维护的状态,在ceres优化中也被成为global parameter,但通常会对 se3(local parameter)求解jacobian,那么此时我们必须要告诉ceres,求解出se3的优化增量后如何对SE3进行更新,这个就是通过指定参数化方式完成的,即传入LocalParameterization的子类对象,

参数化类中需要实现什么?

  1. 定义该参数优化过程中,求解出来的Local parameter对Global parameter进行更新的方法,virtual bool Plus(const double* x, const double* delta, double* x_plus_delta) const

  2. 定义Global parameterLocal parameter的jacobian, virtual bool ComputeJacobian(const double* x, double* jacobian) const,因为ceres优化时,只能设置残差关于李群的jacobian(通过实现ceres::SizedCostFunction子类完成),但我们需要的是残差关于李代数的jacobian,因此通过这个函数传入李群与对应李代数的jacobian,从而实现转换。


(零) 安装cartographer

cartographer的安装非常复杂,尤其是proto脚本和各种依赖项,如果一次不成功,最好把proto或其他依赖库彻底删除重来,越改可能越乱,再也装不好了。

ros2的carto好像还是非常早期的版本,一些参数都不支持,先不必使用

安装过程的问题

下载protobuf很可能失败,只好手动下载: git clone https://github.com/google/protobuf.git

小强编译cartographer更新的lib文件在/home/xiaoqiang/Documents/ros/devel/lib/cartographer_ros

gtest问题

编译cartographer报错gtest

1
2
/usr/src/gtest/src/gtest.cc:1897:10: error: type ‘const class testing::internal::scoped_ptr<testing::internal::GTestFlagSaver>’ argument given to ‘delete’, expected pointer
delete gtest_flag_saver_;

估计是gtest的版本不兼容导致,从github上下载编译google-test,注意修改CMake支持c++14,否则连从github直接下载的源码编译也报错:

然后编译cartographer时,把几个重要的CMakeLists.txt也加上c++14的支持,否则所有的测试文件(以test.cc结尾的文件)编译都会出错

缺少liborocos-kdl库文件

默认的版本是1.3.2,这是下载地址

FindLuaGoogle

报错:

1
2
3
Could NOT find Lua (missing: LUA_LIBRARIES LUA_INCLUDE_DIR) 
CMake Error at cmake/modules/FindLuaGoogle.cmake:217 (MESSAGE):
Did not find Lua >= 5.2

解决: sudo apt-get install lua5.2 lua5.2-dev

absl库的问题

cartographer新版本要先安装abseil,否则报错

1
2
3
4
git clone https://github.com/abseil/abseil-cpp.git
cd abseil-cpp
cmake -L CMakeLists.txt && make
sudo make install

编译时会有个问题,需要加C++11,在absl文件夹里的CMakeList.txt里面添加set(CMAKE_CXX_FLAGS "-std=c++11") 或者 add_compile_options(-std=c++11)

如果还是缺少文件,就按照find_package的要求配置
缺abslTargets.cmake.png

编译cartographer_rviz出错.png
这个问题困扰我好久,最后终于发现在编译absl库时的cmake没有加 -fPIC flag,所以执行 cmake -DCMAKE_POSITION_INDEPENDENT_CODE=ON ..,这样就永久解决这个问题了。

要注意报错信息,找的是AbseilConfig.cmake 或者 abseilConfig.cmake,查看cartographer_rosCMakeLists。 我的abseil装完是在/usr/local/lib/cmake/absl/abslConfig.cmake

1
2
3
# 添加查找目录
set(Abseil_DIR "/usr/local/lib/cmake/absl")
find_package(Abseil REQUIRED)


后来我又用另一种方式解决了

从github下载 abseil-cpp-20190808.1,解压后执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
git clone https://github.com/abseil/abseil-cpp.git
cd abseil-cpp
git checkout 215105818dfde3174fe799600bb0f3cae233d0bf # 20211102.0
mkdir build
cd build
cmake -G Ninja \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_POSITION_INDEPENDENT_CODE=ON \
-DCMAKE_INSTALL_PREFIX=/usr/local/stow/absl \
..
ninja
sudo ninja install
cd /usr/local/stow
sudo stow absl

修改cartographer_ros/CMakeLists.txt的部分:

1
2
set(absl_DIR "/usr/local/stow/absl/lib/cmake/absl")
find_package(absl REQUIRED)

Protobuf

安装cartogapher新版本时候,编译后遇到Unrecognized syntax identifier "proto3". This parser only recognizes "proto2"。

检查protobuf版本: protoc --version 显示的是2.6版本。
使用locate protoc发现我有两个protc,在/usr/bin/usr/local/bin,分别检查版本,发现都是2.6。再次使用md5sum检查,发现两个文件完全一样
检查protoc.png

安装 Protobuf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 版本不要太新
VERSION="v3.4.1"

# Build and install proto3.
git clone https://github.com/google/protobuf.git
cd protobuf
git checkout tags/${VERSION}
mkdir build
cd build
cmake -G Ninja \
-DCMAKE_POSITION_INDEPENDENT_CODE=ON \
-DCMAKE_BUILD_TYPE=Release \
-Dprotobuf_BUILD_TESTS=OFF \
../cmake
ninja
sudo ninja install

或者

1
2
3
4
5
6
7
8
9
10
11
sudo apt-get install autoconf autogen
git clone https://github.com/protocolbuffers/protobuf.git
cd protobuf
git submodule update --init --recursive
./autogen.sh
./configure
make
# 这一步可能会报错,无视就好
make check
sudo make install
sudo ldconfig # refresh shared library cache.

检查一下安装后protobuf的版本,protoc --version

编译器优先找了/usr/bin/protoc的版本, 新安装的proto3是放在/usr/local/bin/protoc下的,可以删除/usr/bin/protoc,然后把/usr/local/bin/protoc放入/usr/bin,或者建立软连接:

1
2
sudo mv /usr/bin/protoc /usr/bin/protoc.bk
sudo ln -s /usr/local/bin/protoc /usr/bin/protoc

编译

有时编译遇到错误,只删除build_isolated/cartographer_ros/CMakeFiles即可,不用全删编译完成的文件


cmake教程(七) add_definitions设置编译参数

添加编译参数,例如:

  • add_definitions(-DDEBUG)将在gcc命令行添加 DEBUG 宏定义;
  • add_definitions(“-Wall -ansi –pedantic –g”)

编译时有时出现报警warning: extra ‘;’ [-Wpedantic],很影响看编译信息,可使用下面方法禁止pedantic

1
2
3
4
5
6
7
8
9
10
//save compiler switches
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"

//Bad headers with problem goes here
#include <ros/ros.h>
#include <sensor_msgs/LaserScan.h>

//restore compiler switches
#pragma GCC diagnostic pop

类似的报警还有warning: ISO C++ forbids variable length array ‘angle_compensate_nodes’ [-Wvla],把上面的"-Wpedantic"换成"-Wvla"即可


读写pbstream文件

保存文件

Node::SerializeState —— MapBuilder::SerializeState —— MapBuilder::SerializeState —— cartographer\io\internal\mapping_state_serialization.cc文件中的WritePbStream函数

实际是MapBuilderBridge::SerializeStateNode::HandleWriteState调用的也是这个函数。在最新版本里是bool MapBuilderBridge::SerializeState(const std::string& filename, const bool include_unfinished_submaps);

1
2
3
cartographer::io::ProtoStreamWriter writer(filename);
map_builder_->SerializeState(&writer);
return writer.Close();

然后是
1
2
3
4
5
6
void MapBuilder::SerializeState(bool include_unfinished_submaps,
io::ProtoStreamWriterInterface* const writer)
{
io::WritePbStream(*pose_graph_, all_trajectory_builder_options_, writer,
include_unfinished_submaps);
}

proto_stream.cc包含ProtoStreamWriterProtoStreamReader两个类,这里用的是类ProtoStreamWriter,在它的构造函数里以二进制方式,写入8个字节的魔术数以确定proto stream格式,

WritePbStream如下,这里的writer就是类ProtoStreamWriter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void WritePbStream(
const mapping::PoseGraph& pose_graph,
const std::vector<mapping::proto::TrajectoryBuilderOptionsWithSensorIds>&
trajectory_builder_options,
ProtoStreamWriterInterface* const writer, bool include_unfinished_submaps)
{
writer->WriteProto(CreateHeader());
writer->WriteProto(
SerializePoseGraph(pose_graph, include_unfinished_submaps));
writer->WriteProto(SerializeTrajectoryBuilderOptions(
trajectory_builder_options,
GetValidTrajectoryIds(pose_graph.GetTrajectoryStates())));

SerializeSubmaps(pose_graph.GetAllSubmapData(), include_unfinished_submaps,
writer);
SerializeTrajectoryNodes(pose_graph.GetTrajectoryNodes(), writer);
SerializeTrajectoryData(pose_graph.GetTrajectoryData(), writer);
SerializeImuData(pose_graph.GetImuData(), writer);
SerializeOdometryData(pose_graph.GetOdometryData(), writer);
SerializeFixedFramePoseData(pose_graph.GetFixedFramePoseData(), writer);
SerializeLandmarkNodes(pose_graph.GetLandmarkNodes(), writer);
}

保存的是子图、轨迹节点、轨迹数据、IMU数据、里程计、Landmark等等

ProtoStreamWriter保存数据时用到了common::FastGzipString函数,实质是boost::iostreams::gzip_compressor

读pbstream文件

这个就参考纯定位流程,Node::LoadState —— MapBuilderBridge::LoadState —— MapBuilder::LoadState

MapBuilderBridge::LoadState(const std::string& state_filename, bool load_frozen_state)使用ProtoStreamReader

1
2
cartographer::io::ProtoStreamReader stream(state_filename);
map_builder_->LoadState(&stream, load_frozen_state);

到了MapBuilder::LoadState,可以看出如果只是读pbstream文件,不需要实例化map_builder,除去加载数据到后端,把函数里的大部分内容提取出来即可,主要是类型io::ProtoStreamDeserializerproto::PoseGraph等等。

另外可以参考pbstream_main.cc


旭日x3中运行cartographer
abstract Welcome to my blog, enter password to read.
Read more
Docker的使用

安装Docker

1
2
3
4
5
6
7
8
# 添加docker的GPG公匙
curl -fsSL https://mirrors.ustc.edu.cn/docker-ce/linux/ubuntu/gpg | sudo apt-key add -
# 添加中科大源
sudo add-apt-repository "deb [arch=amd64] https://mirrors.ustc.edu.cn/docker-ce/linux/ubuntu $(lsb_release -cs) stable"
sudo apt-get update
apt-cache policy docker-ce
sudo apt-get install -y docker-ce
docker version #查看版本

常用操作

1
2
3
4
5
6
7
8
9
10
11
# 显示所有容器
sudo docker image ls
sudo docker ps -a
# 启动已经停止的容器
docker start ed86d97c09bd
# 停止容器
docker stop ed86d97c09bd
# 进入容器
sudo docker exec -it ed86d97c09bd /bin/bash
# 删除容器
docker rm -f ed86d97c09bd


纯定位机制的流程

纯定位其实就是SLAM,只是不保存所有子图。因为node_main.cc里会先要运行map_builder,然后再是node.LoadState,最后StartTrajectoryWithDefaultTopics进行建图

Node::LoadState(const std::string& state_filename, const bool load_frozen_state) —— map_builder_bridge_.LoadState —— map_builder.LoadState

MapBuilder::AddTrajectoryBuilder中有一句MaybeAddPureLocalizationTrimmer(trajectory_id, trajectory_options, pose_graph_.get() );,实际就是

1
2
3
4
5
6
if (trajectory_options.has_pure_localization_trimmer()) 
{
pose_graph->AddTrimmer(absl::make_unique<PureLocalizationTrimmer>(
trajectory_id,
trajectory_options.pure_localization_trimmer().max_submaps_to_keep() ) );
}

PureLocalizationTrimmer这个类的解释就是 Keeps the last num_submaps_to_keep of the trajectory with trajectory_id to implement localization without mapping.

初值的处理

1
2
3
4
5
6
7
8
9
if (trajectory_options.has_initial_trajectory_pose())
{
const auto& initial_trajectory_pose =
trajectory_options.initial_trajectory_pose();
pose_graph_->SetInitialTrajectoryPose(
trajectory_id, initial_trajectory_pose.to_trajectory_id(),
transform::ToRigid3(initial_trajectory_pose.relative_pose()),
common::FromUniversal(initial_trajectory_pose.timestamp()) );
}

这里涉及到PoseGraph2D::SetInitialTrajectoryPose,其实就一句 data_.initial_trajectory_poses[from_trajectory_id] = InitialTrajectoryPose{to_trajectory_id, pose, time};

MapBuilder::LoadState

先是轨迹ID的增加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::map<int, int> trajectory_remapping;
for (int i = 0; i < pose_graph_proto.trajectory_size(); ++i)
{
auto& trajectory_proto = *pose_graph_proto.mutable_trajectory(i);
const auto& options_with_sensor_ids_proto =
all_builder_options_proto.options_with_sensor_ids(i);
// 新的轨迹id
const int new_trajectory_id =
AddTrajectoryForDeserialization(options_with_sensor_ids_proto);

CHECK(trajectory_remapping
.emplace(trajectory_proto.trajectory_id(), new_trajectory_id)
.second)
<< "Duplicate trajectory ID: " << trajectory_proto.trajectory_id();
trajectory_proto.set_trajectory_id(new_trajectory_id);

if (load_frozen_state) {
pose_graph_->FreezeTrajectory(new_trajectory_id);
}
}

然后从pbstream文件添加轨迹,更新约束中节点和子图的轨迹id,从获取的pose graph生成 submap_posesnode_poses,将landmark_poses添加到pose graph,向pose graph添加各类信息,添加子图的附属节点。

以添加子图为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
io::ProtoStreamDeserializer deserializer(reader);

// Create a copy of the pose_graph_proto, such that we can re-write the
// trajectory ids.
proto::PoseGraph pose_graph_proto = deserializer.pose_graph();
const auto& all_builder_options_proto =
// 子图位姿
MapById<SubmapId, transform::Rigid3d> submap_poses;
for (const proto::Trajectory& trajectory_proto :
pose_graph_proto.trajectory())
{
for (const proto::Trajectory::Submap& submap_proto :
trajectory_proto.submap())
{
submap_poses.Insert(SubmapId{trajectory_proto.trajectory_id(),
submap_proto.submap_index()},
transform::ToRigid3(submap_proto.pose()));
}
}
MapById<SubmapId, mapping::proto::Submap> submap_id_to_submap;
// while 读子图数据
proto.mutable_submap()->mutable_submap_id()->set_trajectory_id(
trajectory_remapping.at(
proto.submap().submap_id().trajectory_id()));
submap_id_to_submap.Insert(
SubmapId{proto.submap().submap_id().trajectory_id(),
proto.submap().submap_id().submap_index()},
proto.submap());
// 进入后端
for (const auto& submap_id_submap : submap_id_to_submap) {
pose_graph_->AddSubmapFromProto(submap_poses.at(submap_id_submap.id),
submap_id_submap.data);
}

后端的处理

1
2
3
4
5
6
7
8
9
10
void PoseGraph2D::AddTrimmer(std::unique_ptr<PoseGraphTrimmer> trimmer)
{
PoseGraphTrimmer* const trimmer_ptr = trimmer.release();
AddWorkItem([this, trimmer_ptr]() LOCKS_EXCLUDED(mutex_)
{
absl::MutexLock locker(&mutex_);
trimmers_.emplace_back(trimmer_ptr);
return WorkItem::Result::kDoNotRunOptimization;
});
}

这里作为WorkItem加入到了队列里,到了HandleWorkQueue才处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 纯定位时,子图的裁剪,如果没有裁剪器就不裁剪
TrimmingHandle trimming_handle(this);
for (auto& trimmer : trimmers_)
{
// 这里实际就是 PureLocalizationTrimmer::Trim
trimmer->Trim(&trimming_handle);
}
// 裁剪器完成状态,删除裁剪器
trimmers_.erase(
std::remove_if(trimmers_.begin(), trimmers_.end(),
[](std::unique_ptr<PoseGraphTrimmer>& trimmer) {
return trimmer->IsFinished();
}),
trimmers_.end());

这里的裁剪子图就是PureLocalizationTrimmer::Trim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void PureLocalizationTrimmer::Trim(Trimmable* const pose_graph)
{
// trajectory 是FINISHED状态,或者没有 trajectory
if (pose_graph->IsFinished(trajectory_id_)) {
num_submaps_to_keep_ = 0;
}
// 实际调用 TrimmingHandle::GetSubmapIds
auto submap_ids = pose_graph->GetSubmapIds(trajectory_id_);
// 从序列号0开始裁剪子图,并删除相关的约束和节点
// 剩下序列号大的子图
for (std::size_t i = 0; i + num_submaps_to_keep_ < submap_ids.size(); ++i)
{
// 这是个大函数
pose_graph->TrimSubmap(submap_ids.at(i) );
}
if (num_submaps_to_keep_ == 0)
{
finished_ = true;
pose_graph->SetTrajectoryState(
trajectory_id_,
PoseGraphInterface::TrajectoryState::DELETED );
}
}

GetSubmapIds有必要学习
1
2
3
4
5
6
7
8
9
10
11
12
std::vector<SubmapId> PoseGraph2D::TrimmingHandle::GetSubmapIds(
int trajectory_id) const
{
std::vector<SubmapId> submap_ids;
// PoseGraph2D* const parent_;
const auto& submap_data = parent_->optimization_problem_->submap_data();
for (const auto& it : submap_data.trajectory(trajectory_id) )
{
submap_ids.push_back(it.id);
}
return submap_ids;
}

核心是pose_graph->TrimSubmap,函数太长了,功能如下:

  • 获取除submap_id外的所有子图的所有节点的id(nodes_to_retain),这些节点是需要保留的
  • 找到在submap.id的子图内部,同时不在别的子图内的节点(nodes_to.remove),这些节点需要删除
  • 删除submap_id相关的约束
  • 删除与nodes_to_remove中节点相关联的约束,并对对应的submap_id进行标记
  • 对标记的submap_id进行检查,看是否还存在其他子图间约束
  • 删除没有子图间约束的标记的子图的扫描匹配器
  • 删除submap_id这个子图的指针
  • 删除submap_id这个子图的匹配器,与多分辨率地图
  • 删除optimization_problem_中的submap_id这个子图
  • 删除nodes_to_remove中的节点

纯定位时的匹配流程

PoseGraph2D::ComputeConstraint(node_id, submap_id) —— ConstraintBuilder2D::MaybeAddGlobalConstraint(submap_id, submap, node_id, constant_data) —— ConstraintBuilder2D::ComputeConstraint

第2步中

1
2
3
4
// 分别根据 node_id 和 submap_id 获得 节点数据和子图
const TrajectoryNode::Data* constant_data = data_.trajectory_nodes.at(node_id).constant_data.get();
const Submap2D* submap = static_cast<const Submap2D*>(data_.submap_data.at(submap_id).submap.get() );

第3步中,submap唯一的作用是获得初值:

1
2
const transform::Rigid2d initial_pose =
ComputeSubmapPose(*submap) * initial_relative_pose;

这里就想到那句话:前端建图,后端并不改变子图本身,只改变位姿。


条件变量

boost::condition

程序如下,main函数里要把上面两个线程的join交换一下,先执行线程2,这样在线程2里的随机数如果大于90,会唤醒线程1。如果先执行线程1,会一直阻塞,程序没法向下执行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include<ros/ros.h>
#include <boost/thread.hpp>
#include "boost/thread/mutex.hpp"
#include <boost/thread/condition.hpp>
using namespace std;

boost::mutex mut;
boost::condition cond;
boost::mutex::scoped_lock _lock(mut);

void thread_1(int n)
{
while(true)
{
// 线程1在这里阻塞,但先解开互斥锁,让其他线程能够得到这个互斥锁
cond.wait(_lock);
cout<< "thread 1: "<< rand()%50 <<endl;
sleep(1);
}
}

void thread_2(int n)
{
int m;
while(true)
{
m = 50 + rand()%50;
cout<< "thread 2: "<< m <<endl;
if(m > 90)
{
// 发送一个信号给另外一个阻塞的线程,使其继续执行
cond.notify_one();
cout<<"thread 2 wake thread 1 ";
}
sleep(1);
}
}

int main()
{
unsigned int n = 2;
boost::thread th_1 = boost::thread(boost::bind(&thread_1,n));
boost::thread th_2 = boost::thread(boost::bind(&thread_2,n));

th_1.join();
th_2.join();
return 0;
}

某次的测试结果:

1
2
3
4
5
6
7
thread 2: 65
thread 2: 93
thread 2 wake thread 1 thread 1: 35
thread 2: 86
thread 2: 92
thread 2 wake thread 1 thread 1: 49
thread 2: 71

条件变量的一般用法是:线程 A 等待某个条件并挂起,直到线程 B 满足了这个条件,并通知条件变量,然后线程 A 被唤醒。经典的生产者-消费者问题就可以用条件变量来解决。

这里等待的线程可以是多个,notify_one是通知一个线程,若有多个线程在等待,则根据优先级高低和入队顺序决定取消哪个线程,当然也可以使用notify_all是激活所有线程。

多个等待线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include<ros/ros.h>
#include <boost/thread.hpp>
#include "boost/thread/mutex.hpp"
#include <boost/thread/condition.hpp>
using namespace std;

boost::mutex mut, mut_2;
boost::condition cond;

// 线程1和线程2 必须使用不同的 scoped_lock
boost::mutex::scoped_lock lock(mut);
boost::mutex::scoped_lock lock_2(mut_2);

void thread_1(int n)
{
while(true)
{
cout << " ----- thread 1 is blocking ----- " << endl;
cond.wait(lock);
cout<< " ++++++++++++++ thread 1: "<< rand()%50 << " ++++++++++++++" << endl;
sleep(1);
}
}

void thread_2(int n)
{
while(true)
{
cout << " ----- thread 2 is blocking ----- " << endl;
cond.wait(lock_2);
cout<< " ************ thread 2: "<< rand()%50 << " ************" << endl;
sleep(1);
}
}

void thread_3(int n)
{
int m;
while(true)
{
m = 50 + rand()%50;
cout<< "thread 3: "<< m <<endl;
if(m > 88)
{
// 唤醒一个
cond.notify_one();
cout<<" ------------------ thread 3 wake a thread ------------------ " <<endl;
}
sleep(1);
}
}

int main()
{
unsigned int n = 2;
boost::thread th_1 = boost::thread(boost::bind(&thread_1,n));
boost::thread th_2 = boost::thread(boost::bind(&thread_2,n));
boost::thread th_3 = boost::thread(boost::bind(&thread_3,n));

th_1.join();
th_2.join();
th_3.join();
return 0;
}

一次的运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 ----- thread 1 is blocking ----- 
thread 3: 83
----- thread 2 is blocking -----
thread 3: 86
thread 3: 77
thread 3: 65
thread 3: 93
------------------ thread 3 wake a thread ------------------
++++++++++++++ thread 1: 35 ++++++++++++++
----- thread 1 is blocking -----
thread 3: 86
thread 3: 92
------------------ thread 3 wake a thread ------------------
************ thread 2: 49 ************
thread 3: ----- thread 2 is blocking ----- 71

thread 3: 62
thread 3: 77
thread 3: 90
------------------ thread 3 wake a thread ------------------
++++++++++++++ thread 1: 9 ++++++++++++++

线程2在线程1之后,所以线程3会先激活线程1,然后线程2

notify_all就会激活两个线程,不列举结果了。


读写锁

读写锁shared_mutex是一种较一般化的互斥锁,主要用于读操作比较多、写操作少的多线程情况,也就是可以有多个读线程和一个写线程对共享区域操作。读写锁有三种状态:读锁、写锁、不锁。遵循写互斥,读共享,写优先的原则。

常见的有下面几种情况:

  1. 多个线程可以同时占有读锁
  2. 每次只能有一个写线程,一个线程加了写锁,其他线程无论读写都要阻塞,解锁后,优先唤醒占有写锁的线程
  3. 一个线程加读锁,所有试图加读锁的线程都可以访问,试图加写锁的线程阻塞。也就是读共享,写互斥。 解锁后,要加写锁的线程优先。

boost中的shared_mutex默认的实现是写者优先。 读写锁在使用前要初始化,释放底层内存前要销毁。

shared_lockunique_lock

适用场景:对数据的读次数多于写次数的场合,比如数据库,我们允许在数据库上同时执行多个读操作,但是某一时刻只能在数据库上有一个写操作来更新数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <ros/ros.h>
#include <iostream>
#include <boost/ref.hpp>
#include <boost/thread/thread.hpp>
#include <string>
using namespace boost;
using namespace std;

int g_num = 10; // 全局变量,写者改变该全局变量,读者读该全局变量
shared_mutex s_mu; // 全局shared_mutex对象


void read_(std::string &str)
{
const char* c = str.c_str();
while (1)
{
{
shared_lock<shared_mutex> m(s_mu); //读锁定,shared_lock
printf("线程 %s 进入临界区,global_num = %d\n", c, g_num);
boost::this_thread::sleep(boost::posix_time::seconds(2)); //sleep 1秒,给足够的时间看其他线程是否能进入临界区
printf("线程 %s 离开临界区...\n", c);
}
boost::this_thread::sleep(boost::posix_time::seconds(2)); //读线程离开后再sleep 1秒(改变这个值可以看到不一样的效果)
}
cout << std::endl;
}


void writer_(std::string &str)
{
const char* c = str.c_str();
while (1)
{
{
unique_lock<shared_mutex> m(s_mu); //写锁定,unique_lock
++g_num;
printf("线程 %s 进入临界区,global_num = %d\n", c, g_num);
boost::this_thread::sleep(boost::posix_time::seconds(1)); //sleep 1秒,让其他线程有时间进入临界区
printf("线程 %s 离开临界区...\n", c);
}
//写线程离开后再sleep 2秒,如果这里也是1秒,那两个写线程一起请求锁时会让读线程饥饿
boost::this_thread::sleep(boost::posix_time::seconds(2));
}
cout << std::endl;
}

int main()
{
std::string r1 = "read_1";
std::string r2 = "read_2";
std::string w1 = "writer_1";
std::string w2 = "writer_2";

boost::thread_group tg;
tg.create_thread(bind(read_, boost::ref(r1))); //两个读者
tg.create_thread(bind(read_, boost::ref(r2)));

tg.create_thread(bind(writer_, boost::ref(w1))); //两个写者
tg.create_thread(bind(writer_, boost::ref(w2)));
tg.join_all();

return 0;
}

某次的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
线程read_1进入临界区,global_num = 10
线程read_2进入临界区,global_num = 10
线程read_1离开临界区...
线程read_2离开临界区...
线程writer_1进入临界区,global_num = 11
线程writer_1离开临界区...
线程writer_2进入临界区,global_num = 12
线程writer_2离开临界区...
线程read_2进入临界区,global_num = 12
线程read_2离开临界区...
线程read_1进入临界区,global_num = 12
线程read_1离开临界区...
线程writer_2进入临界区,global_num = 13
线程writer_2离开临界区...
线程writer_1进入临界区,global_num = 14
线程writer_1离开临界区...

由此可见,多个读线程可以同时进入临界区;写线程进入后,其他线程都不能进入,只能等它离开。即同时只有一个写线程能进入临界区。

需要避免这样的结果,也就是一个写线程连续执行,轮不到读线程了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
线程read_1进入临界区,global_num = 10
线程read_2进入临界区,global_num = 10
线程read_1离开临界区...
线程read_2离开临界区...

线程writer_1进入临界区,global_num = 11
线程writer_1离开临界区...
线程writer_1进入临界区,global_num = 12
线程writer_1离开临界区...
线程writer_1进入临界区,global_num = 13
线程writer_1离开临界区...

线程writer_2进入临界区,global_num = 14
线程writer_2离开临界区...
线程writer_2进入临界区,global_num = 15
线程writer_2离开临界区...
线程writer_2进入临界区,global_num = 16
线程writer_2离开临界区...
线程writer_2进入临界区,global_num = 17

参考:
读写锁