协方差矩阵

协方差可以表示两个变量的协同关系, 变化趋势是否一致。 相关性就是用X、Y的协方差除以X的标准差和Y的标准差。

我们机器人用到的向量是(x,y,z, roll,pitch,yaw),协方差就是一个6x6的矩阵,对角线的元素为各个随机变量的方差,非对角线的元素就是两两随机变量之间的协方差。AMCL中所用的协方差矩阵,由于z,pitch,roll都是0,矩阵的第1,8和最后的元素,分别是x,y,yaw的方差。另外还有x和y的协方差

如果协方差矩阵是单位矩阵,那么就是标准的多元高斯分布。由于协方差矩阵是对称矩阵,所以可以做特征分解

其实可以变成

因此任意一个协方差矩阵可以看做三维变换的结果,概率密度函数是由协方差矩阵的特征向量控制旋转,特征值控制尺度。 协方差矩阵就是根据这两项对标准高斯分布进行变换。

AMCL中的pf_ran_gaussian函数是以对角线元素为标准差,生成一个高斯分布。


点云地图和八叉树地图

如果点云分布非常规整,是某个特定物体的点云模型,则应该使用Octree,因为很容易求解凸包并且点与点之间相对距离无需再次比对父节点和子节点,更加明晰,典型的例子是斯坦福的兔子。

lego_loam建立的点云太稀疏,只能用于定位而不能导航。

用RGBD建立3D稠密点云图,并使用octomap进行压缩滤除地面信息。然后通过2D投影生成占据栅格地图最后利用costmap进行全局和局部路径规划导航实时避障,这又变成2D导航了

点云地图也有一些基于ICP或直接用八叉树地图的三维导航的方法,可以用于无人机在三维空间的导航。至于稀疏特征点地图,容易受光照和场景影响

点云地图: 无序,因为它的点都是无序的,无法坐标索引查询

八叉树地图特点

  • 稀疏: 不需要对空间进行稠密切分
  • 结构化: 方块排列固定,切分为八份
  • 非直接索引查询

八叉树地图基于OctoTree,可以是多分辨率的地图
八叉树

Voxel Grid就像3D的占用地图,分辨率是固定的
VoxelGrid


虚拟墙/自定义的障碍

要想在地图中添加障碍,用virtual_layercostmap_prohibition_layer都可以,效果一样,区别就是前者的yaml里需要加上朝向,其实是没必要的。下面以virtual_layer为例向代价地图加入新的障碍。

进入costmap_2d这个包,查看costmap_2d\costmap_plugins.xml文件,可以看到目前所拥有的层,基类都是costmap_2d::Layer

配置代价地图

launch里加入

1
2
<rosparam file="$(find navigation2d)/param/virtual_layer.yaml" command="load" ns="global_costmap" />
<rosparam file="$(find navigation2d)/param/virtual_layer.yaml" command="load" ns="local_costmap" />

global_costmap_params.yamllocal_costmap_params.yaml里加入 - {name: virtual_layer, type: "virtual_costmap_layer::VirtualLayer"}

最后是设置禁止区域坐标参数,首先需要在参数配置文件夹(global_costmap_params.yaml所在的文件夹)中创建新的文档,命名为prohibition_areas.yaml,然后输入:

1
2
3
4
5
6
7
costmap_prohibition_layer:
prohibition_areas:
- [[-1.3, -2.98],[-1.3, -16.77]]
- [[13.31, -26.77],[2.88, -3.45]]
- [[-4.96, -37.78],[-32.67, -37.90]]
- [[-23.39, -33.43],[-11.89, -33.20]]
- [[0.0, 6.89],[0.0, 38.05]]

如果是virtual_layer.yaml,设置类似如下:
1
2
3
4
5
6
virtual_layer:
zone_topics: ["/zone"]
obstacle_topics: ["/obstacle"]
static_forms: [[[-18.46,-12.18,0.0],[-13.21,-12.18,0.0],[-13.21,-13.4,0.0],[-18.46,-13.4,0.0]],
[[-13.99,-0.17,0.0],[-11.87,-0.14,0.0]],
[[-13.18,-11.15,0.0],[-11.81,-11.15,0.0]]]

此时加载virtual_layer成功,但是障碍没有被膨胀,只有静态障碍和代价值

查来查去,怎么也找不到原因,最后是在两个代价地图的yaml里,把- {name: virtual_layer, type: "virtual_costmap_layer::VirtualLayer"} 放到膨胀层之前 ,问题解决,看来膨胀层的源码不完善。

注意事项:

  1. 一定要严格按照上述格式来设置坐标,格式错误导致不能识别禁止区域坐标情形有: (1)坐标前的短横线没对齐 (2) 定义禁止区域或者禁止线,两坐标之间缺少了逗号
  2. 可以同时定义多个点障碍/线障碍/障碍区域。对于3个及以上的点,形成的是封闭的区域障碍。
  3. 禁止区域的坐标是map坐标系的
  4. 由于Bresenham算法,形成的线可能看起来不够直
    18_1119.png

costmap_prohibition_layer

源码在costmap_prohibition_layer

CMakeLists里可以看到add_library(costmap_prohibition_layer src/costmap_prohibition_layer.cpp), 结果在lib/libcostmap_prohibition_layer.so

看文件package.xml,最后有一句:

1
2
3
<export>
<costmap_2d plugin="${prefix}/costmap_plugins.xml"/>
</export>

costmap_plugins.xml如下:

1
2
3
4
5
<library path="lib/libcostmap_prohibition_layer">
<class type="costmap_prohibition_layer_namespace::CostmapProhibitionLayer" base_class_type="costmap_2d::Layer">
<description>ROS-Package that implements a costmap layer to add prohibited areas to the costmap-2D by a user configuration.</description>
</class>
</library>

以上几项缺一不可,放到工作空间编译后,在终端中输入rospack plugins —attrib=plugin costmap_2d ,这个命令是显示当前costmap_2d能接受的所有插件,如果出现

1
costmap_prohibition_layer /home/user/plan_ws/src/costmap_prohibition_layer/costmap_plugins.xml

说明prohibition_layer已经是一个可供使用的地图插件了。


虚拟墙的坐标.png

参考:
在Costmap_2d里面插入Prohibition_layer


rviz订阅和发布的话题

以前我没注意这个问题,看了一个程序才想起来研究。

打开rviz,发现它发布的话题如下:

1
2
3
4
5
/clicked_point
/initialpose
/move_base_simple/goal
/rosout
/rosout_agg

只看前三个,这三个话题从哪来的?其实就是工具栏的 Publish Point, 2D Pose Estimate, 2D Nav Goal三个按钮。消息类型分别为geometry_msgs/PointStamped, geometry_msgs/PoseWithCovarianceStamped, geometry_msgs/PoseStamped

我们在rviz按下后两个按钮,就可以用echo获得对应点的位姿,注意当前的固定坐标系。如果有了地图,那就可以在地图上选起始点和终点了。

至于订阅的话题,就看在显示栏的添加的项目了。


PoseExtrapolator 2. ImuTracker

IMU Tracker

  1. imu_tracker_: 存放由IMU数据得到的信息,用一帧一帧IMU来推断当前时刻的机器人位姿
  2. odometry_imu_tracker_: 存放由里程计得到的信息,表示AddPoseAddOdometryData之间(即 最新优化的位姿时刻 到 最新里程计数据时刻 之间的时间段)的姿态变化。 在AddOdometryData用于辅助计算线速度
  3. extrapolation_imu_tracker_: 存放经过数据融合后的结果,表示AddPoseExtrapolatePose之间(即 最新优化的位姿时刻 到 最新估计位姿时刻 之间的时间段)的姿态变化。 在ExtrapolatePose用于前端辅助实现激光运动畸变矫正。

重点看IMU Tracker。

一般IMU要安装在被测物体的重心上。imu的角速度测量精度往往比线速度好的多,因此在工程上,经常采用imu的角速度,角速度与时间积分计算角度,得到的角度变化量与初始角度相加,就得到目标角度,其中积分时间 越小输出的角度越精确。但积分的累积误差会随着时间的而增加,所以陀螺仪只能工作在相对较短的时间内。

所以要得到较为真实的姿态角,就要利用一阶低通滤波扬长避短,在短时间内增加陀螺仪的权值,在更长的时间增加加速度的权值,这样系统输出角度就接近真实值了。

ImuTracker是使用角速度和线加速度(就是来自IMU数据) 跟踪 朝向和重力向量。 平均化后的线加速度(假设移动很慢), 是直接测量为g倍数的,roll/pitch不会漂移,但是重力加速度会漂移,也就是说用IMU测量得到的话题/IMU/linear_acceleration/z不是固定的9.8

更新姿态 void Advance(common::Time time)

将姿态更新到时刻time。 通过计算time和上次的时间差,imu的角速度乘时间差得到角位移,原来的姿态右乘角位移得到新的姿态;同时更新时间和重力向量gravity_vector_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void ImuTracker::Advance(const common::Time time) 
{
// 要求指定的时间要比当前时间晚或相等
CHECK_LE(time_, time);
// 求当前时间对指定时间的时间差
const double delta_t = common::ToSeconds(time - time_);
//假设为匀角速度运动的计算, 求得角度, 换成四元数
const Eigen::Quaterniond rotation =
transform::AngleAxisVectorToRotationQuaternion(
Eigen::Vector3d(imu_angular_velocity_ * delta_t));
// 在原朝向基础上继续旋转, 更新方位角
// orientation_ 初始为 Eigen::Quaterniond::Identity()
orientation_ = (orientation_ * rotation).normalized();

/* 车体的旋转方向,跟重力向量的旋转方向是反向的,比如车体向上仰a角度,
重力向量是往反方向即俯方向旋转a角度,
conjugate 返回当前四元数的共轭四元数,就是逆旋转 */
// gravity_vector_ 初始为 Eigen::Vector3d::UnitZ() 向量(0,0,1)
gravity_vector_ = rotation.conjugate() * gravity_vector_;
time_ = time;
}

// 因为imu直接测得角速度,所以直接更新
void ImuTracker::AddImuAngularVelocityObservation(
const Eigen::Vector3d& imu_angular_velocity)
{
imu_angular_velocity_ = imu_angular_velocity;
}

更新重力向量

使用线加速度不断更新重力向量gravity_vector_,再根据重力向量更新 旋转矩阵和方位角。后面还会对齐点云方向

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 使用线加速度更新重力向量,使用了一阶低通滤波, 防止加速度跳动剧烈
void ImuTracker::AddImuLinearAccelerationObservation(
const Eigen::Vector3d& imu_linear_acceleration)
{
// 当前时刻离上一次时刻的差, 初值是inf
const double delta_t =
last_linear_acceleration_time_ > common::Time::min()
? common::ToSeconds(time_ - last_linear_acceleration_time_)
: std::numeric_limits<double>::infinity();
last_linear_acceleration_time_ = time_;
// 计算权重, alpha为滤波系数, delta_t越大, 当前的权重越大. delta为inf时,权重为1
// imu_gravity_time_constant_为滤波器常数, 默认为10
const double alpha = 1. - std::exp(-delta_t / imu_gravity_time_constant_);
// 一阶低通滤波, gravity_vector_ 为滤波输出值, imu_linear_acceleration为采样值
// 初始的输出值是 imu_linear_acceleration,如果没有IMU就是 (0,0,1)
gravity_vector_ =
(1. - alpha) * gravity_vector_ + alpha * imu_linear_acceleration;

// 根据新的重力向量,更新对应的旋转矩阵, FromTwoVectors返回一个表示从向量a旋转到向量b的四元数
// rotation 看做线加速度对上次线加速度的校正
// orientation_.conjugate() * Eigen::Vector3d::UnitZ()就是把这个四元数转换成向量的形式
// 在这里我们只关注重力方向上的值,即z轴上的值
const Eigen::Quaterniond rotation = FromTwoVectors(
gravity_vector_, orientation_.conjugate() * Eigen::Vector3d::UnitZ() );
// 更新方位角
orientation_ = (orientation_ * rotation).normalized();
// 如果之前的计算正确,二者的乘积应当是(0,0,1)
CHECK_GT((orientation_ * gravity_vector_).z(), 0.);
CHECK_GT((orientation_ * gravity_vector_).normalized().z(), 0.99);
}

一阶低通滤波,实现了硬件上的RC低通滤波器的功能,滤掉IMU噪声。算法公式为:


这里的X就是线加速度,Y是重力向量,滤波系数 通常远小于1,d为滤波器的常数10

从公式可以看出,时间差越大, (当前线加速度的权重 )越大,当前线加速度的权重越大,上一次的重力向量的权重越小,应该是考虑到长时间内的重力向量数据不可靠

使用我的参数和IMU,发现alpha经常保持很精确的数值0.0009995,原因当然是delta_t

没看出这里如何避免传统一阶滤波的缺点,常说的动态调整滤波系数的要求:当数据快速变化时,滤波结果能及时跟进(灵敏度优先);当数据趋于稳定,在一个固定的点上下振荡时,滤波结果能趋于平稳(平稳度优先)

其实还可以用卡尔曼滤波,效果更好,估计是因为运算负荷问题而没有使用

AdvanceImuTracker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
void PoseExtrapolator::AdvanceImuTracker(const common::Time time,
ImuTracker* const imu_tracker) const
{
CHECK_GE(time, imu_tracker->time());
// 当没有IMU数据时,输入一个假的imu数据
if (imu_data_.empty() || time < imu_data_.front().time)
{
// There is no IMU data until 'time', so we advance the ImuTracker and use
// the angular velocities from poses and fake gravity to help 2D stability.
imu_tracker->Advance(time); // 根据时间进行imu数据插值
// 给定一个理论上的重力向量
imu_tracker->AddImuLinearAccelerationObservation(Eigen::Vector3d::UnitZ());
//优先选择里程计得出的角速度给imu
//当里程计的数据小于2个时,选择从pose;否则选择里程计。因为里程计的精度更高
imu_tracker->AddImuAngularVelocityObservation(
odometry_data_.size() < 2 ? angular_velocity_from_poses_
: angular_velocity_from_odometry_ );
return; // 没有IMU可以return了
}
//有imu时,此处调用了imu数据队列,ImuTracker维护的时间早于IMU数据队列最早的时间
if (imu_tracker->time() < imu_data_.front().time)
{
// 先把ImuTracker更新到IMU数据来临的那一刻
imu_tracker->Advance(imu_data_.front().time);
}
// 依次取出IMU数据队列中的数据,更新ImuTracker,直到IMU数据的时间比指定时间time要晚
auto it = std::lower_bound(
imu_data_.begin(), imu_data_.end(), imu_tracker->time(),
[](const sensor::ImuData& imu_data, const common::Time& time) {
return imu_data.time < time;
});
// 用imu数据队列进行imu-tracker的数据更新.
// imu_tracker主要负责对角速度进行插值, 并且估计重力向量
while (it != imu_data_.end() && it->time < time)
{
imu_tracker->Advance(it->time);
imu_tracker->AddImuLinearAccelerationObservation(it->linear_acceleration);
imu_tracker->AddImuAngularVelocityObservation(it->angular_velocity);
++it;
}
imu_tracker->Advance(time);
}

参考:cartographer-位姿推测部分


PoseExtrapolator 1

位姿估计器就是用一段时间的位姿数据估计线速度和角速度,进而预测运动。

位姿估计器在Node中有一个(一条轨迹一个),在LocalTrajectoryBuilder也有一个。 Node中的估计器只有在Node::PublishTrajectoryStates有用,也就只有执行发布数据的时候才会将local_slam_pose添加到其中。所以重点看后者。

LocalTrajectoryBuilder::AddRangeData函数,发现估计器会在两种情况下初始化:如果没在lua里启用IMU,就会在LocalTrajectoryBuilder2D里直接初始化;如果在lua里启用了,会在收到IMU数据后再初始化,否则一直等待,此时也不会处理里程计数据。

从Cartographer流程图可以看出,PoseExtrapolator输入为scan match反馈的位姿IMU里程计三个数据源,估计出下一时刻的位姿,用来作为下次scan match的初值。

框架图

scan match可以获取较高精度的位姿,但是两次扫描匹配之间的运动只有里程计数据,如果里程计误差严重,将影响定位。

InitializeExtrapolator

初始化需要一个pose_queue_duration_(1ms) 和 重力加速度时间常数(10s)。
初始化之后立刻添加一个位姿为0的位姿。添加位姿的过程:实例化imu_tracker_,更新速度信息;处理imu和 odom 数据;初始化odometry_imu_tracker_extrapolator_imu_tracker_都指向 imu_tracker_

1
2
3
4
5
6
7
8
9
10
11
if(extrapolator_ != nullptr)
return;
CHECK(!options_.pose_extrapolator_options().use_imu_based());
// 创建一个PoseExtrapolator类型的位姿推理器
// pose queue 用于速度估计,pose_queue_duration是估计所用的时间
extrapolator_ = absl::make_unique<PoseExtrapolator>(
::cartographer::common::FromSeconds(
options_.pose_extrapolator_options().constant_velocity().pose_queue_duration() ),
options_.pose_extrapolator_options().constant_velocity().imu_gravity_time_constant() );
// 添加一个初始位姿,Rigid3d::Identity()对变换矩阵赋值,位移为0,旋转为1
extrapolator_->AddPose(time, transform::Rigid3d::Identity() );

PoseExtrapolator 的三条线

PoseExtrapolator中管理了三个队列:
  1. imu_data_: 保存imu的双向队列 std::deque
  2. odometry_data_: 保存里程计数据的双向队列
  3. timed_pose_queue_: 保存估计的位姿的双向队列 std::deque。只保存时间在 pose_queue_duration_范围内的位姿,旧的数据要删除掉。 位姿包括从IMU测量得到的Pose,以及从Scan Match输出的PoseObservation。

流程图

AddImuData 和 AddOdometryData

PoseExtrapolator管理里程计数据和经过ImuTracker的IMU数据,这两个函数的调用的最终源头是LocalTrajectoryBuilder2D::AddOdometryDataLocalTrajectoryBuilder2D::AddImuData

AddImuData比较简单,主要是把IMU数据加入imu_data_队列,清除旧的imu数据,保证队列中第2个数据的时间大于最后一个位姿的时间。AddOdometryData除了添加里程计数据,删除比最新位姿旧的odom数据,还根据首尾的odometry数据,计算tracking 坐标系下的线速度和角速度。

AddPose, ExtrapolatePose, EstimateGravityOrientation,这三个函数在LocalTrajectoryBuilder2D::AddRangeData中调用

ExtrapolatePose

目的是预测time时刻tracking坐标系在local坐标系下的位姿。假设传感器数据之间,机器人以恒定线速度和恒定角速度移动( local坐标系 )

估计的线速度有2种方式组成,优先级参见ExtrapolateTranslation

  1. 由于里程计的更新速度更快,优先使用里程计获得线速度
  2. 采用两次scan-match位置进行微分获得

估计的角速度有3种方式,按优先级(参见AdvanceImuTracker开始部分):

  1. 采优先使用IMU数据获取角速度
  2. 采用里程计进行微分
  3. 采用两次scan-match的朝向进行微分获得,频率最低

以上估计线速度和角速度所用的两次scan mathch的位姿的处理都在UpdateVelocitiesFromPoses。如果出现了报警: Queue too short for velocity estimation,可以尝试降低odometry_sampling_ratioimu_sampling_ratio,先从1降到0.5

最后针对线速度和角速度进行实时积分,输出当前时间的融合的估计位姿

AddPose

存入的是前端一帧一帧激光匹配的位姿,就是两个node之间的位姿。由于pose是由激光匹配产生的,而激光的频率是比IMU和odom要低的,所以这里pose的传入频率比IMU、odom低,比如传入5帧IMU,才插入一个pose。

位姿估计器十分相信scan match得到的位姿,如果遇到长走廊等环境相似的区域,这个位姿就会有问题

只在Node::PublishTrajectoryStatesLocalTrajectoryBuilder2D::AddAccumulatedRangeData调用。重点是后者,这个pose是Scan Match反馈给估计器以修正估计值, 也就是添加到timed_pose_queue_队列

函数大致流程:

  • 确保ImuTracker实例化:实例化时需要用到重力加速度常数,初始时间。重力常数主要用来做距离的修正。

  • 将该位姿pose加入到timed_pose_queue_中,删除其中比加入时间早 pose_queue_duration_的位姿。

  • 根据timed_pose_queue_中的位姿估计速度:找出队列中最久和最新的位姿和时间, 通过除以时间得到线速度(linear_velocity_from_poses_)和角速度(angular_velocity_from_poses_)。 计算角速度的时候是使用四元数old_q.inverse() * new_q 得到old_q 到 new_q 的角位移,然后除以时间得到的。队列中的位姿时间要求如下:
    timed_pose_queue队列
    一般选择1ms以内的姿态来估计,而且旧的数据要删除掉。

  • 进一步处理ImuTracker:如果到该时间之前没有imu数据,那么使用从poses中推断的角速度和一个假的重力来保障2d的稳定。如果有IMU,使用IMU数据进行姿态估计(imu_tracker_),并将此时估计出的姿态用于计算 里程计线速度 和最新估计姿态 的初值(odometry_imu_tracker_ extrapolation_imu_tracker_

GetLastPoseTime

1
2
3
4
5
6
common::Time PoseExtrapolator::GetLastPoseTime() const {
if (timed_pose_queue_.empty()) {
return common::Time::min();
}
return timed_pose_queue_.back().time;
}

就是返回队列最后一个pose对应的时间

在位姿融合中,因为IMU和里程计的采集速率比scan match快很多,因而在一定时间段内,可能没有scan match估计的位姿数据,这时需要通过 IMU 和里程计数据进行推断,但因为这两个传感器存在相对激光扫描较大的累积误差,所以需要对两者进行一些更新和初始化消除累积误差

最后以里程计数据为例,补上整个的处理流程

为什么每次新建地图的角度不同:每次开始的方向不同,从第一帧雷达数据到来时,前端子图的坐标原点来自ExtrapolatePose,此时机器人发生旋转,ExtrapolatePose输出的位姿朝向会不同;另外还有IMU的数据问题。


(五) 传感器数据的分发处理

SensorBridge::HandleLaserScan之后进入下面的流程:

雷达和imu数据队列按时间整理,没有对齐

trajectory_builder_->AddSensorData当中的trajectory_builder_其实是CollatedTrajectoryBuilder,它的根源在sensor_bridge的初始化:

1
2
3
4
5
6
7
sensor_bridges_[trajectory_id] = 
cartographer::common::make_unique<SensorBridge>(
trajectory_options.num_subdivisions_per_laser_scan,
trajectory_options.tracking_frame,
node_options_.lookup_transform_timeout_sec, tf_buffer_,
// 实际类型是 CollatedTrajectoryBuilder
map_builder_->GetTrajectoryBuilder(trajectory_id) );

GetTrajectoryBuilder的里面是trajectory_builders_.push_back( absl::make_unique<CollatedTrajectoryBuilder>() ),已经写死了,所以说 后面的TrajectoryBuilderInterface只能是CollatedTrajectoryBuilder

trajectory_builder_->AddSensorData实际是:

1
2
3
4
5
6
void AddSensorData(
const std::string& sensor_id,
const sensor::TimedPointCloudData& timed_point_cloud_data) override
{
AddData(sensor::MakeDispatchable(sensor_id, timed_point_cloud_data));
}

将一个sensor::TimedPointCloudData的数据,变换成了 Dispatchable 的格式。 Dispatchable是一个类模板,所有类型的数据, 都会被放在data_里面,这个的作用会在后面显现,靠这个去判断调用的是GlobalTrajectoryBuilder里面的哪个函数

AddData就一句sensor_collator_->AddSensorData(trajectory_id_, std::move(data));. 根据参数collate_by_trajectoryCollatorTrajectoryCollator)选择, 默认是sensor::Collator,所以是sensor::Collator::AddSensorData:

1
2
3
4
5
6
7
8
void Collator::AddSensorData(const int trajectory_id,
std::unique_ptr<Data> data)
{
QueueKey queue_key{trajectory_id, data->GetSensorId()};
// Queue keys are a pair of trajectory ID and sensor identifier
// OrderedMultiQueue queue_;
queue_.Add(std::move(queue_key), std::move(data));
}

然后会调用OrderedMultiQueue::add()的函数,把数据存入队列里,形成queues_
1
2
3
(0, scan):  {      4,   }    带callback
(0, imu) : {1, 3, 5 } 带callback
(0, odom): { 2, 6} 带callback

最后再调用OrderedMultiQueue::Dispatch()

OrderedMultiQueue::Dispatch()

函数将队列中的数据根据时间依次传入回调函数 GlobalTrajectoryBuilder::AddSensorData

这里的处理是生产者——消费者模式。 生产者 OrderedMultiQueue::Add     消费者 CollatedTrajectoryBuilder::HandleCollatedSensorData

这个分发函数就太复杂了,涉及到OrderedMultiQueue这个数据结构了,它的主要作用就是管理多个有序的传感器数据, 主要的体现就是成员变量std::map<QueueKey, Queue> queues_,它会形成这么一个组织结构:

1
2
key1(sensor_1): queue
key2(sensor_2): queue

queue里面是按时间的数据的组织:
1
2
3
4
5
struct Queue {
common::BlockingQueue<std::unique_ptr<Data>> queue;
Callback callback; //回调
bool finished = false;
};

这里发现了Queue有个成员是 callback 函数,在Dispatch函数中,如果找出来的数据, 那么就调用这个数据的callback函数。那么在哪儿引入了这个callback函数呢? 结果发现是在OrderedMultiQueue::AddQueue,它又在Collator::AddTrajectory中调用,这样就有了另一条线。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// CollatedTrajectoryBuilder 的构造函数调用该函数
// CollatedTrajectoryBuilder 的 HandleCollatedSensorData 作为回调函数callback传进来
// 添加轨迹以生成排序的传感器输出,每个topic设置一个回调函数
void Collator::AddTrajectory(
const int trajectory_id,
const absl::flat_hash_set<std::string>& expected_sensor_ids,
const Callback& callback)
{
for (const auto& sensor_id : expected_sensor_ids)
{
const auto queue_key = QueueKey{trajectory_id, sensor_id};
// 根据QueueKey,将对应的回调函数callback(CollatedTrajectoryBuilder::HandleCollatedSensorData)
// 放入 queue_ (OrderedMultiQueue::AddQueue)里
queue_.AddQueue(queue_key,
[callback, sensor_id](std::unique_ptr<Data> data) {
callback(sensor_id, std::move(data));
});
queue_keys_[trajectory_id].push_back(queue_key);
}
}

CollatedTrajectoryBuilder处理传感器数据,使其按照时间排列,然后传入GlobalTrajectoryBuilder,最终的回调函数其实就是GlobalTrajectoryBuilder::AddSensorData


(六) 核心 --- GlobalTrajectoryBuilder::AddSensorData

TrajectoryBuilder是用来创建一个trajectory的。

该类首先要保存trajectory上的传感器数据,从一个连续的多帧传感器数据中会抽取若干关键帧。一帧关键帧数据被称为trajectory上的一个节点,一条trajectory由一串节点组成,所以TrajectoryBuilder要维护一个节点列表

每一帧时的传感器数据相对于该Submap的局部坐标变换要已知

该类要创建并维护一个栅格化的Submap列表,以便在MapBuilder中对所有的submap进行优化,形成一个总的Map。每个Submap相对于世界坐标系的位姿要估计出来,这样PoseGraph才能依据不同的submap各自的位姿变换矩阵,把他们tie在一起并做全局优化。

GlobalTrajectoryBuilder

TrajectoryBuilderInterface的继承关系

分为前端和后端两部分:local_trajectory_builder和pose_graph。GlobalTrajectoryBuilder实际上是一个模板类,模板列表中的LocalTrajectoryBuilder2DPoseGraph2D分别是前端和后端的两个核心类型。

成员变量很少:

完成指定ID的trajectory中前端和后端的联系,需要获取trajectory的ID。

类体内的成员函数AddLocalSlamResultData是把前端结果直接给后端,但是不能是当前的前端。这个仅用于后端约束优化。这个函数没有被调用,LocalSlamResult2D::AddToTrajectoryBuilder中被调用的同名函数应当是CollatedTrajectoryBuilder

时间点云的 AddSensorData

类有多个AddSensorData函数,其中最重要的是时间点云对应的,也是cartographer最核心的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 参数sensor_id记录了产生数据的传感器, timed_point_cloud_data则是传感器产生的数据
void AddSensorData(
const std::string& sensor_id,
const sensor::TimedPointCloudData& timed_point_cloud_data) override
{
// 检查一下前端核心对象是否存在
CHECK(local_trajectory_builder_)
<< "Cannot add TimedPointCloudData without a LocalTrajectoryBuilder.";
// Local SLAM的业务主线, 在该结果中同时记录了子图的更新信息
std::unique_ptr<typename LocalTrajectoryBuilder::MatchingResult>
matching_result = local_trajectory_builder_->AddRangeData(
sensor_id, timed_point_cloud_data);
// 结果中包含了轨迹中节点的pose、点云和submap的相关信息
if (matching_result == nullptr)
// The range data 还没有fully accumulated yet.
return;


/* 将前端的输出结果喂给后端进行闭环检测和全局优化 */
kLocalSlamMatchingResults->Increment();
// 前端匹配的结果,即submap位置和submap
std::unique_ptr<InsertionResult> insertion_result;
// 判定前端成功的是否将传感器的数据插入到子图中
if (matching_result->insertion_result != nullptr)
{
kLocalSlamInsertionResults->Increment();
// 将匹配后的结果加入图优化节点中
// 把前端的输出结果喂给后端,即后端增加一个新的节点
// 后端优化位姿图节点ID,记录在临时对象 node_id
auto node_id = pose_graph_->AddNode(
matching_result->insertion_result->constant_data, trajectory_id_,
matching_result->insertion_result->insertion_submaps );
CHECK_EQ(node_id.trajectory_id, trajectory_id_);

// 重新封装前端匹配的后的submap结果,并增加一个node_id
insertion_result = absl::make_unique<InsertionResult>( InsertionResult{
node_id, matching_result->insertion_result->constant_data,
std::vector<std::shared_ptr<const Submap>>(
matching_result->insertion_result->insertion_submaps.begin(),
matching_result->insertion_result->insertion_submaps.end()) } );
}
// 如果提供了回调函数,就调用之,并将前端的输出和刚刚构建的
//insertion_result对象传参,也就是采用回调函数将前端匹配的结果信息传递出去
if (local_slam_result_callback_)
{
local_slam_result_callback_(
trajectory_id_, matching_result->time, matching_result->local_pose,
std::move(matching_result->range_data_in_local),
std::move(insertion_result) );
}
}

基类TrajectoryBuilderInterface中有个成员变量InsertionResult,它描述前端的一次子图更新操作,将传感器的扫描数据插入子图中。
1
2
3
4
5
6
7
8
9
struct InsertionResult
{
// 有两个字段,分别记录了轨迹的索引(trajectory_id)和一个从零开始计数的节点编号(node_index)
NodeId node_id;
// 子图更新时在局部地图中的位姿,以及有传感器原始数据转换之后的点云信息
std::shared_ptr<const TrajectoryNode::Data> constant_data;
// 被更新的子图对象
std::vector<std::shared_ptr<const Submap2D>> insertion_submaps;
};

根据NodeId,可以把一条轨迹理解为由若干个节点串联起来的一个数据结构。

LocalTrajectoryBuilder2D中也有一个InsertionResult但是注意前端类不是继承自 TrajectoryBuilderInterface, 这两个没有直接关系,前端只关心更新子图时的位姿和点云信息,不考虑在它与整个运动轨迹之间的关系,所以缺少了字段node_id

1
2
3
4
5
6
struct InsertionResult
{
// 插入的节点数据,TrajectoryNode::Data 包含了处理之后的点云数据
std::shared_ptr<const TrajectoryNode::Data> constant_data;
std::vector<std::shared_ptr<const Submap2D>> insertion_submaps;
};

里程计和IMU的 AddSensorData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void AddSensorData(const std::string& sensor_id,
const sensor::ImuData& imu_data) override
{
// InitializeExtrapolator 和 extrapolator_->AddImuData
if (local_trajectory_builder_)
local_trajectory_builder_->AddImuData(imu_data);

pose_graph_->AddImuData(trajectory_id_, imu_data);
}

void AddSensorData(const std::string& sensor_id,
const sensor::OdometryData& odometry_data) override
{
CHECK(odometry_data.pose.IsValid()) << odometry_data.pose;
if (local_trajectory_builder_)
// 实际有用的就一句 extrapolator_->AddOdometryData(odometry_data);
local_trajectory_builder_->AddOdometryData(odometry_data);

pose_graph_->AddOdometryData(trajectory_id_, odometry_data);
}

先判定存在前端对象,将数据喂给前端对象进入PoseExtrapolator,然后通过pose_graph将传感器的信息添加到全局地图中。

Cartographer将类似GPS这种具有全局定位能力的传感器输出的位姿称为 固定坐标系位姿(fixed frame pose)。由于它们的测量结果是全局的信息,所以没有喂给前端用于局部定位。此外路标数据也可以认为是全局的定位信息,也直接喂给了后端


(四) LaunchSubscribers流程

上一篇的Node::LaunchSubscribers的编程技巧比较复杂,只需要知道是执行 Node::Handle***Message那几个函数即可

流程图
然后就进入了 GlobalTrajectoryBuilder::AddSensorData

1
2
3
4
5
6
7
8
9
10
11
void SensorBridge::HandleLaserScanMessage(
const std::string& sensor_id, const sensor_msgs::LaserScan::ConstPtr& msg)
{
// 转换之后的点云数据和时间戳
carto::sensor::PointCloudWithIntensities point_cloud;
carto::common::Time time;
// 将ROS的消息转换成点云数据,函数在 msg_conversion.cc
// 只有一句 LaserScanToPointCloudWithIntensities(msg)
std::tie(point_cloud, time) = ToPointCloudWithIntensities(*msg);
HandleLaserScan(sensor_id, time, msg->header.frame_id, point_cloud);
}

只要一个时间是 carto::common::Time 类型,那么一定是绝对时间戳。

TimedPointCloud是后面一直使用的类型,转换流程在这里
类型转换

LaserScanToPointCloudWithIntensities

不考虑强度时,用雷达扫描只获得range和angle两个参数,LaserScanToPointCloudWithIntensities(msg)实际是把这两个参数转换为扫描点在laser坐标系下的坐标,时间戳以最后一束激光为基准。 同时每个激光点也包含一个相对最后一个点的时间偏移。这 样通过这个偏移就可以求出每个点的时间戳了。

不考虑多回波雷达,直接看时间戳处理部分:

1
2
3
4
5
6
7
8
9
10
::cartographer::common::Time timestamp = FromRos(msg.header.stamp);
if (!point_cloud.points.empty())
{
const double duration = point_cloud.points.back()[3];
// scan 最后一束激光的时间戳
timestamp += cartographer::common::FromSeconds(duration);
// 时间戳变成以最后一个测量数据为基准
for (Eigen::Vector4f& point : point_cloud.points)
point[3] -= duration;
}

这里把一帧激光的时间戳进行处理,有了绝对时间戳和相对时间戳两个概念。

我所用的雷达一帧有1040个点, 参数 time_increment: 1.73611115315e-05,     duration: 0.0180382

time_increment * 1040 = duration

timestamp: 634369239333347047

每个点的时间戳在处理前:

1
2
3
4
5
6
7
8
point.time:  0
point.time: 1.73611e-05
point.time: 3.47222e-05
point.time: 5.20833e-05
......
point.time: 0.0180035
point.time: 0.0180208
point.time: 0.0180382 # 同duration必然相同

然后每个时间戳减去 duration,这样最后一个点的时间戳就是0,之前的全是负值。 timestamp开始是第一个点的时间,最后处理为最后一个点的时间戳。

这里的时间戳计算是以雷达每束激光的 时间差相等和第一束激光的时间戳最小 为前提的,如果雷达的时间戳相反(逆向)或时间差不均匀,就会出错。

坐标如此处理:比如range为3,角度为45°,相应的坐标是(2.12132,2.12132,0)
转换示意图.png

HandleLaserScan 和 num_subdivisions_per_laser_scan 分段

接下来的HandleLaserScan按照参数num_subdivisions_per_laser_scan把点云分成了很多小段,每一段数据的时间偏移调整为以这一段的最后一条数据为基准,也就是把一帧scan拆成几段,每段按一个TimedPointCloudData发送,到了LocalTrajectoryBuilder2D::AddRangeData又把它们拼接起来,作为当前帧输入。

num_subdivisions_per_laser_scan一般设置为1,也就是不分段,可以不必仔细研究,直接看HandleRangefinder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void SensorBridge::HandleLaserScan(
const std::string& sensor_id, const carto::common::Time time,
const std::string& frame_id,
const carto::sensor::PointCloudWithIntensities& points)
{
if (points.points.empty()) return;
CHECK_LE(points.points.back()[3], 0); //最后一个点云数据的时间小于等于0
for (int i = 0; i != num_subdivisions_per_laser_scan_; ++i)
{
const size_t start_index =
points.points.size() * i / num_subdivisions_per_laser_scan_;
const size_t end_index =
points.points.size() * (i + 1) / num_subdivisions_per_laser_scan_;
// 每一小段
carto::sensor::TimedPointCloud subdivision(
points.points.begin() + start_index, points.points.begin() + end_index);
if (start_index == end_index)
{
continue;
}
// 参考分段中最后一个数据的时间,调整其他数据的时间
const double time_to_subdivision_end = subdivision.back()[3];
// subdivision_time is the end of the measurement so sensor::Collator will
// send all other sensor data first.
// 先确认当前的数据没有过时,如果分段的时间落后于记录值,将抛弃所对应的数据
const carto::common::Time subdivision_time =
time + carto::common::FromSeconds(time_to_subdivision_end);
auto it = sensor_to_previous_subdivision_time_.find(sensor_id);
if (it != sensor_to_previous_subdivision_time_.end() &&
it->second >= subdivision_time)
{
LOG(WARNING) << "Ignored subdivision of a LaserScan message from sensor "
<< sensor_id << " because previous subdivision time "
<< it->second << " is not before current subdivision time "
<< subdivision_time;
continue;
}
sensor_to_previous_subdivision_time_[sensor_id] = subdivision_time;
for (Eigen::Vector4f& point : subdivision)
{
point[3] -= time_to_subdivision_end;
}
CHECK_EQ(subdivision.back()[3], 0);
// 将分段数据喂给Cartographer
HandleRangefinder(sensor_id, subdivision_time, frame_id, subdivision);
}
}

HandleRangefinder

查询tracking_frame--->laser坐标系的变换, 把之前在laser坐标系的点云,转换为在tracking_frame坐标系(一般是base_footprint)的点云,类型变成时间点云数据TimedPointCloudData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void SensorBridge::HandleRangefinder(
const std::string& sensor_id, const carto::common::Time time,
const std::string& frame_id, const carto::sensor::TimedPointCloud& ranges)
{
// 查询在time时间,tracking_frame--->laser坐标系的变换,记录到对象 sensor_to_tracking
// 这里有时会卡住
const auto sensor_to_tracking =
tf_bridge_.LookupToTracking(time, CheckNoLeadingSlash(frame_id));
// 如果找到变换,通过trajectory_builder_ 添加传感器数据
if (sensor_to_tracking != nullptr)
{
trajectory_builder_->AddSensorData(
// 第一个转换是 TransformTimedPointCloud, 把数据从 sensor frame到tracking frame
// 第二个转换是从 TimedPointCloud 转到了 TimedPointCloudData
sensor_id, carto::sensor::TimedPointCloudData{
time, sensor_to_tracking->translation().cast<float>(),
carto::sensor::TransformTimedPointCloud(
ranges, sensor_to_tracking->cast<float>() ) } );
}
}

TfBridgeSensorBridge的构造函数里初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class TfBridge {
public:
TfBridge(const std::string& tracking_frame,
double lookup_transform_timeout_sec, const tf2_ros::Buffer* buffer);
~TfBridge() {}

TfBridge(const TfBridge&) = delete;
TfBridge& operator=(const TfBridge&) = delete;

// 查询在time时间,frame_id--->tracking_frame 的变换,如果存在的话
// 也就是说 tracking_frame 是 target frame
std::unique_ptr<::cartographer::transform::Rigid3d> LookupToTracking(
::cartographer::common::Time time, const std::string& frame_id) const;

private:
const std::string tracking_frame_;
const double lookup_transform_timeout_sec_;
const tf2_ros::Buffer* const buffer_;
};


Voxel Filter和Adaptive Voxel Filter

近处的物体经常扫描得到更多的点,而远处的物体的点经常比较稀少。为了降低计算量,需要对点云数据进行降采样,但简单的随机采样仍然是这个效果。

cartographer中的VoxelFilter是把空间分为很多个立方体的栅格,一个栅格内可能有很多点, 但是只取落入栅格内的第一个点,而PCL库实现的体素滤波是用所有点的重心来代表整体

VoxelFilter

LocalTrajectoryBuilder2D::AddRangeData函数的最后:

1
2
3
4
5
6
7
return AddAccumulatedRangeData(
time,
TransformToGravityAlignedFrameAndFilter(
gravity_alignment.cast<float>() * range_data_poses.back().inverse(),
accumulated_range_data_),
gravity_alignment,
sensor_duration);

也就是第二个参数,这个很长名字的函数如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sensor::RangeData
LocalTrajectoryBuilder2D::TransformToGravityAlignedFrameAndFilter(
const transform::Rigid3f& transform_to_gravity_aligned_frame,
const sensor::RangeData& range_data) const
{
//裁剪数据,指定一定z轴范围内的数据
const sensor::RangeData cropped =
sensor::CropRangeData(sensor::TransformRangeData(
range_data, transform_to_gravity_aligned_frame),
options_.min_z(), options_.max_z());
return sensor::RangeData{
cropped.origin,
sensor::VoxelFilter(options_.voxel_filter_size()).Filter(cropped.returns),
sensor::VoxelFilter(options_.voxel_filter_size()).Filter(cropped.misses) };
}

简单说,这些滤波就是对点云数据的预处理:去除不合理的范围的点、合并相同位置的点等,从而减少点云数量,提高点云质量。

滤波的基本原理:将xyz分别除以分辨率,并取整,得到xyz所在cell的id(x,y,z),再将该三个整数分别放入3*32个字节的数中(很好的技巧,避免了大量的遍历和比较),插入voxelFileter::voxel_set_中,后面如果是第一次插入该数,则保留该点,如果之前存在该数,即意味着有其他点落在该网格内,不再保留该点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// using PointCloud = std::vector<RangefinderPoint>;
PointCloud VoxelFilter::Filter(const PointCloud& point_cloud)
{
PointCloud results;
for (const RangefinderPoint& point : point_cloud)
{
auto it_inserted =
voxel_set_.insert(IndexToKey(GetCellIndex(point.position)));
// 该体素之前没有没占据过,则插入,否则不插入
if (it_inserted.second)
results.push_back(point);
}
return results;
}

GetCellIndex: 3D点除以体素大小,得到所在的cell_index

IndexToKey: 将cell_index转换为一个32字节的数

AdaptiveVoxelFilter

adaptive_voxel_filter可以在最大边长 adaptive_voxel_filter.max_length 的限制下优化确定 voxel_filter_size 来实现目标的 .adaptive_voxel_filter.min_num_points

还是LocalTrajectoryBuilder2D::AddAccumulatedRangeData,这次看中间部分代码:

1
2
3
const sensor::PointCloud& filtered_gravity_aligned_point_cloud =
sensor::AdaptiveVoxelFilter(options_.adaptive_voxel_filter_options())
.Filter(gravity_aligned_range_data.returns);

也就是

1
2
3
4
5
PointCloud AdaptiveVoxelFilter::Filter(const PointCloud& point_cloud) const 
{
return AdaptivelyVoxelFiltered(
options_, FilterByMaxRange(point_cloud, options_.max_range()) );
}

AdaptiveVoxelFilter的构造函数按老规矩还是全赋值option,然后调用Filter,接着就是AdaptivelyVoxelFiltered函数: 再调用一次体素滤波,如果体素滤波后点数大于阈值,则返回; 如果小于阈值,则接着使用二分法进行体素滤波,使最终的点云数接近 min_num_points min_num_points <= 最终点云数 <= min_num_points x 1.1 网上说这里的二分查找计算量比较大,如果参数min_num_pointsmax_length 设置不好,会增加CPU,但我试了试发现影响很小。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
PointCloud AdaptivelyVoxelFiltered(
const proto::AdaptiveVoxelFilterOptions& options,
const PointCloud& point_cloud)
{
// 已经足够稀疏,默认200
if (point_cloud.size() <= options.min_num_points()) {
return point_cloud;
}
// 还是先调用VoxelFilter来滤波,参数 max_length
PointCloud result = VoxelFilter(options.max_length()).Filter(point_cloud);
// 按max_length进行滤波,还超过这个数,说明是这个参数能实现的最稀疏
if (result.size() >= options.min_num_points()) {
return result;
}

// 如果小于min_num_points,点云太稀疏,用于扫描匹配就不准了
// 说明上面的 max_length太大了,现在要使用二分法找一个合适的参数
// Search for a 'low_length' that is known to result in a sufficiently
// dense point cloud. We give up and use the full 'point_cloud' if reducing
// the edge length by a factor of 1e-2 is not enough.
for (float high_length = options.max_length();
high_length > 1e-2f * options.max_length();
high_length /= 2.f)
{
float low_length = high_length / 2.f;
result = VoxelFilter(low_length).Filter(point_cloud);
if (result.size() >= options.min_num_points())
{
// 二分查找法找到合适的过滤器
// low_length给出足够稠密的结果
// 当 edge length is at most 10% off 时停止
while ((high_length - low_length) / low_length > 1e-1f)
{
const float mid_length = (low_length + high_length) / 2.f;
const PointCloud candidate =
VoxelFilter(mid_length).Filter(point_cloud);
// 点数又多了,增加边长,让 low_length 变大
if (candidate.size() >= options.min_num_points()) {
low_length = mid_length;
result = candidate;
}
// 点数偏少,让high_length变小
else {
high_length = mid_length;
}
}
return result;
}
}
return result;
}

举例:刚开始,点云数量390,Voxel Filter之后只有58个,而参数min_num_points默认50,此时就返回了。

参数min_num_points改为100,点云数量390,Voxel Filter之后只有58个,进入二分法过滤,最后的点云数量为110.