ubuntu的显卡及驱动

ubuntu-drivers devices查看显卡硬件型号,也就是其中的model。 如果同意安装推荐版本,那我们只需要终端输入:sudo ubuntu-drivers autoinstall 就可以自动安装了。
当然我们也可以使用 apt 命令安装自己想要安装的版本,比如我想安装 340 这个版本号的版本,终端输入:sudo apt install nvidia-340 就自动安装了。

lshw -numeric -C display | grep product也可以查看电脑显卡的十六进制ID

驱动安装

电脑装好好,启动一直失败,报错failed to start User Manager for UID 121,连文本界面都进不去,后来才知道是ubuntu下缺少显卡驱动

先完全卸载之前安装的显卡驱动

ppa源文件卸载:sudo apt-get remove --purge nvidia*

runfile源文件卸载:sudo ./NVIDIA-Linux-x86_64-384.59.run --uninstall

ppa源驱动安装

先查询电脑最适合的显卡驱动版本,命令 ubuntu-drivers devices,这个命令不仅适用于显卡

最佳显卡驱动版本为 nvidia-driver-435, 用命令行进行安装

1
2
3
4
sudo add-apt-repository ppa:graphics-drivers/ppa
sudo apt-get update
sudo apt-get install nvidia-driver-435 #此处数字要对应上面查询到的版本号
sudo apt-get install mesa-common-dev

如果前面没有禁用secure boot,则在安装过程中会提示设置一个密码,在重启时需要输入密码验证以禁用secure boot,重启后会出现蓝屏,这时候不能直接选择continue, 而应该按下按键,选择Enroll MOK, 确认后在下一个选项中选择continue,接着输入安装驱动时设置的密码,开机。

安装完成后重启,nvidia-smi命令显示驱动信息,若出现驱动版本,就是成功

官网驱动下载

到 NVIDIA 的官网下载相应型号的驱动,官网地址
操作系统那里,一定选择Linux 64-bit

需要先安装一些 NVIDIA 显卡依赖的软件,在终端依次执行如下命令:

1
2
3
sudo dpkg --add-architecture i386
sudo apt update
sudo apt install build-essential libc6:i386

Ubuntu 系统默认安装好是使用的一个开源的驱动:nouveau,我们需要先禁用这个开源驱动,方法如下,依次执行:
1
2
sudo bash -c "echo blacklist nouveau > /etc/modprobe.d/blacklist-nvidia-nouveau.conf"
sudo bash -c "echo options nouveau modeset=0 >> /etc/modprobe.d/blacklist-nvidia-nouveau.conf"

重启一下系统,运行之前下载的驱动。可能会出现下面的信息:
1
2
3
The distribution-provided pre-install script failed!
Are you sure you want to continue? -> CONTINUE INSTALLATION
Would you like to run the nvidia-xconfig utility? -> YES

安装完成后重启系统就可以点击软件列表中的 NVIDIA 的配置软件配置显卡驱动了,如果你遇到
1
WARNING: Unable to find suitable destination to install 32-bit compatibility libraries

解决办法:
1
2
3
sudo dpkg --add-architecture i386
sudo apt update
sudo apt install libc6:i386


禾赛16线雷达
  • 扫描频率:转速。10Hz的含义是一秒雷达转10圈

  • 帧率:一秒内激光雷达电机旋转的圈数,每秒完成一圈扫描的次数。一幅点云图像代表一帧,对应激光雷达内部就是电机旋转一圈完成扫描

  • 输出点云中相邻两个点之间的夹角就是角分辨率。 水平方向上做到高分辨率不难,垂直分辨率和发射器的几何大小相关也与其排布相关。相邻两个发射器间隔越小垂直分辨率越小,禾赛的是2°

  • 输出数据: 距离、角度、反射率、时间戳

禾赛雷达的参数,波长905nm。 最常用的波长是905nm和1550 nm。1550nm波长LiDAR传感器可以以更高的功率运行,以提高探测范围,同时对于雨雾的穿透力更强。


注意这是禾赛雷达,其他雷达的线束索引和角度的关系不一定是这样了,比如速腾的线束1对应-15°,线束2对应-13°,以此类推。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
header:
seq: 42200 # 第几帧数据
stamp:
secs: 1640071161
nsecs: 165116000
frame_id: "base_link"

# 若为有序点云,height 可以大于 1,即多行点云,每行固定点云的宽度
# 若为无序点云,height 等于1,即一行点云,此时 width 即点云的数量
height: 1
width: 60676 # 一行点云的长度
fields:
-
name: "x"
offset: 0
datatype: 7 # 对应 Float32
count: 1
-
name: "y"
offset: 4
datatype: 7
count: 1
-
name: "z"
offset: 8
datatype: 7
count: 1
-
name: "intensity"
offset: 16
datatype: 7
count: 1
-
name: "timestamp"
offset: 24
datatype: 8
count: 1
-
name: "ring"
offset: 32
datatype: 4
count: 1
is_bigendian: False
# 一个点占的字节数
point_step: 48
# 一行的长度占用的字节数,也就是 point_step * width
row_step: 2912448
data: [] # 省略

# 若点云中的数据都是有限的(不包含 inf/NaN), 则为 true, 否则为 false
is_dense: True

目前把雷达坐标系设置为base_link了。

对有序点云进行操作(降采样、刚体变换)后,有序点云就会变成无序点云。velodyne雷达选择给每个点多加了一个属性ring, 详细定义如下:

1
2
3
4
5
6
7
8
9
10
namespace velodyne_pointcloud
{
struct PointXYZIR
{
PCL_ADD_POINT4D; // quad-word XYZ
float intensity;
uint16_t ring;
EIGEN_MAKE_ALIGNED_OPERATOR_NEW // ensure proper alignment
} EIGEN_ALIGN16;
}; // namespace velodyne_pointcloud

这个ring表示每个点之前属于哪个线的,16线的话,这个值为0~15。并且高度自低向高排列。


通过官方的 PandarView 软件可以用于录制和播放点云数据,点击Record(工具栏上的红色圆点)即可。这样录制的包的格式是pcap。如果是velodyne雷达,可以在ROS中运行以下指令即可播放这个包
rosrun velodyne_driver velodyne_node _model:=VLP16 _pcap:=/home/q/VLP16Data.pcap

驱动

型号PandarXT-16
禾赛雷达的驱动

节点/hesai/hesai_lidar无订阅,发布两个话题:

  • /hesai/pandar [sensor_msgs/PointCloud2] 被节点data_pretreat_node订阅
  • /hesai/pandar_packets [hesai_lidar/PandarScan],无内容

禾赛雷达数据占用的带宽.png

参考:
各品牌雷达的参数
Velodyne VLP-16激光雷达数据分析
Velodyne16 与禾赛Pandar XT16测试


cartographer的缺点
abstract Welcome to my blog, enter password to read.
Read more
最后阶段 RunFinalOptimization

现在返回到最初的node_main.cc中的Run函数,还有一句node.RunFinalOptimization();,所有轨迹结束时,再执行一次全局优化。 其实就是MapBuilderBridge::RunFinalOptimization —— PoseGraph2D::RunFinalOptimization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void PoseGraph2D::RunFinalOptimization()
{
{
// 参数 max_num_final_iterations 默认 200
// 优化标志
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.max_num_final_iterations());
return WorkItem::Result::kRunOptimization;
});
// 参数 max_num_iterations, 默认 50
// 不优化标志
AddWorkItem([this]() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
optimization_problem_->SetMaxNumIterations(
options_.optimization_problem_options()
.ceres_solver_options()
.max_num_iterations());
return WorkItem::Result::kDoNotRunOptimization;
});
}
// 另一个调用地方在 PoseGraph2D 析构
WaitForAllComputations();
}

在建图结束之后会运行一个新的全局优化,不要求实时性,迭代次数多
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void PoseGraph2D::WaitForAllComputations()
{
int num_trajectory_nodes;
{
absl::MutexLock locker(&mutex_);
num_trajectory_nodes = data_.num_trajectory_nodes;
}
const int num_finished_nodes_at_start =
constraint_builder_.GetNumFinishedNodes();

auto report_progress = [this, num_trajectory_nodes,
num_finished_nodes_at_start]() {
// Log progress on nodes only when we are actually processing nodes.
if (num_trajectory_nodes != num_finished_nodes_at_start) {
std::ostringstream progress_info;
progress_info << "Optimizing: " << std::fixed << std::setprecision(1)
<< 100. *
(constraint_builder_.GetNumFinishedNodes() -
num_finished_nodes_at_start) /
(num_trajectory_nodes - num_finished_nodes_at_start)
<< "%...";
std::cout << "\r\x1b[K" << progress_info.str() << std::flush;
}
};

  • num_finished_nodes_at_start表示此时已经优化的节点数,假设为 10

  • GetNumFinishedNodes返回当前已经优化的节点数,假设为 11,也就是这个函数开始后,优化了1个节点

  • num_trajectory_nodes,轨迹的总节点数

progress_info 输出的是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  // First wait for the work queue to drain so that it's safe to schedule
// a WhenDone() callback.
{
const auto predicate = [this]()
EXCLUSIVE_LOCKS_REQUIRED(work_queue_mutex_) {
return work_queue_ == nullptr;
};
absl::MutexLock locker(&work_queue_mutex_);
while (!work_queue_mutex_.AwaitWithTimeout(
absl::Condition(&predicate),
absl::FromChrono(common::FromSeconds(1.)))) {
report_progress();
}
}

// Now wait for any pending constraint computations to finish.
absl::MutexLock locker(&mutex_);
bool notification = false;
constraint_builder_.WhenDone(
[this,
&notification](const constraints::ConstraintBuilder2D::Result& result)
LOCKS_EXCLUDED(mutex_) {
absl::MutexLock locker(&mutex_);
data_.constraints.insert(data_.constraints.end(), result.begin(),
result.end());
notification = true;
});
const auto predicate = [&notification]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return notification;
};
while (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
absl::FromChrono(common::FromSeconds(1.)))) {
report_progress();
}
CHECK_EQ(constraint_builder_.GetNumFinishedNodes(), num_trajectory_nodes);
std::cout << "\r\x1b[KOptimizing: Done. " << std::endl;
}

就是把ceres的迭代次数设置为200,其他部分其实不用太深入研究了。


后端的线程池 3 HandleWorkQueue和优化

优化的具体实现在优化器optimization_problem_。将优化的结果采用回调的方式,返回到result中。
后端将数据加入到 optimization_problem_ 的对应传感器队列中,并按时间排列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void PoseGraph2D::HandleWorkQueue(
const constraints::ConstraintBuilder2D::Result& result)
{
{ // 将新的约束添加到全局约束队列中
absl::MutexLock locker(&mutex_);
data_.constraints.insert(data_.constraints.end(), result.begin(),
result.end() );
}
// ceres 在这里
RunOptimization();
// 以下是 省略 的内容
//如果设置了全局优化回调函数,则进行调用
//根据约束结果,更新轨迹间的链接关系
// 优化完成后,累计节点清零
// 计算相同轨迹的 inter_constraints_same_trajectory
// 计算不同轨迹的 inter_constraints_different_trajectory

// 优化结束后,重新开启任务队列,即继续执行
// work_queue_里的 work_item
DrainWorkQueue();
}

RunOptimization

优化的实际就是成员变量node_data_submap_data_,也就是

1
2
3
4
5
6
7
8
9
10
11
12
struct NodeSpec2D
{
common::Time time;
transform::Rigid2d local_pose_2d;
transform::Rigid2d global_pose_2d;
Eigen::Quaterniond gravity_alignment;
};

struct SubmapSpec2D
{
transform::Rigid2d global_pose;
};

1
2
3
4
5
6
7
8
9
10
11
12
void PoseGraph2D::RunOptimization()
{
if (optimization_problem_->submap_data().empty() )
return;
// std::unique_ptr<optimization::OptimizationProblem2D> optimization_problem_;

// No other thread is accessing the optimization_problem_,
// data_.constraints, data_.frozen_trajectories and data_.landmark_nodes
// when executing Solve.
// 调用优化,Ceres在这里面,由于耗时间,故没加锁,防止阻塞其他线程
optimization_problem_->Solve(data_.constraints,
GetTrajectoryStates(), data_.landmark_nodes);

将所有内部约束和外部约束合并在一起执行Solve
遍历所有submap,建立参数块;遍历所有node,建立参数块。
根据约束,添加残差函数;
处理里程计问题,添加可能的残差;
求解返回结果;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
  absl::MutexLock locker(&mutex_);
// 优化后所有的submap和node数据
const auto& submap_data = optimization_problem_->submap_data();
const auto& node_data = optimization_problem_->node_data();
/* 遍历所有优化后的轨迹的所有节点位姿 */
for (const int trajectory_id : node_data.trajectory_ids())
{
for (const auto& node : node_data.trajectory(trajectory_id) )
{
auto& mutable_trajectory_node = data_.trajectory_nodes.at(node.id);
// 更新所有节点的 全局位姿
mutable_trajectory_node.global_pose =
transform::Embed3D(node.data.global_pose_2d) *
transform::Rigid3d::Rotation(
mutable_trajectory_node.constant_data->gravity_alignment);
}

// Extrapolate all point cloud poses that were not included in the
// 'optimization_problem_' yet. 注意是已经加入tarjectory但是还没有进行优化的新Node
// 由于使用采样器建立约束,有的node没有建立约束,其pose就不会被优化
// 因此,要通过已经优化的位姿转换关系来修正所有的node

// 子图的local到global的新的转移矩阵
const auto local_to_new_global =
ComputeLocalToGlobalTransform(submap_data, trajectory_id);
// 旧的转移矩阵
const auto local_to_old_global = ComputeLocalToGlobalTransform(
data_.global_submap_poses_2d, trajectory_id);
const transform::Rigid3d old_global_to_new_global =
local_to_new_global * local_to_old_global.inverse();
// 上次最后一个优化的节点
const NodeId last_optimized_node_id =
std::prev(node_data.EndOfTrajectory(trajectory_id))->id;
auto node_it =
std::next(data_.trajectory_nodes.find(last_optimized_node_id));
// 后续未优化的节点的全局pose进行转移
for (; node_it != data_.trajectory_nodes.EndOfTrajectory(trajectory_id);
++node_it)
{
// TrajectoryNode类型, 指针
auto& mutable_trajectory_node = data_.trajectory_nodes.at(node_it->id);
修正 global_pose
mutable_trajectory_node.global_pose =
old_global_to_new_global * mutable_trajectory_node.global_pose;
}
}
// 更新所有路标位姿
for (const auto& landmark : optimization_problem_->landmark_data()) {
data_.landmark_nodes[landmark.first].global_landmark_pose = landmark.second;
}
data_.global_submap_poses_2d = submap_data;
}

优化并非是实时,是在后台进行运行的,并且需要一定的时间。因此当完成优化时,前端输出结果已经对整个位姿图个数进行了增加。后面新加入的节点并未优化,所以返回优化的结果没有最新加入轨迹节点对应的结果。因此采用优化后结果中最后一个轨迹节点的位姿的转移矩阵,作为未参与优化轨迹节点的转移矩阵进行更新。

其中 为参与优化的节点,优化前位姿为 ,优化后的位姿为 。而 则为未参与优化的节点。


后端 5 计算约束

我的理解是,求inter约束也是一个 scan to map 的过程,找到和点云最相似的不同时间的子图,也就是找回环。其实是和lidar_localization的后端用NDT找关键帧的匹配是类似的,但是cartographer是点云和栅格地图匹配,不像点云匹配那样直观,分支定界的score就像NDT匹配的score,不过前者越大越好,后者越小越好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void ConstraintBuilder2D::ComputeConstraint(
const SubmapId& submap_id, const Submap2D* const submap,
const NodeId& node_id, bool match_full_submap,
const TrajectoryNode::Data* const constant_data,
const transform::Rigid2d& initial_relative_pose,
const SubmapScanMatcher& submap_scan_matcher,
std::unique_ptr<ConstraintBuilder2D::Constraint>* constraint)
{
CHECK(submap_scan_matcher.fast_correlative_scan_matcher);
/* node在local坐标系的位姿 = 子图在local坐标系中的位姿 * node在子图的位姿*/
const transform::Rigid2d initial_pose =
ComputeSubmapPose(*submap) * initial_relative_pose;

float score = 0.;
transform::Rigid2d pose_estimate = transform::Rigid2d::Identity();

整个函数是为了计算constraint_transform(节点 j 和子图 i的关系) ,需要的参数有:

  • 节点 j 的filtered_gravity_aligned_point_cloud
  • 分支定界的初值 initial_pose (节点 j 在local map坐标系的坐标)
  • Match() 的结果 pose_estimate (节点 j 在local map坐标系的坐标).
  • ComputeSubmapPose()函数 (local map坐标系转到子图 i 坐标系)

计算pose_estimate的三步:

  1. 使用 fast correlative scan matcher 做 Fast estimate
  2. Prune if the score is too low.
  3. ceres Refine

匹配所有子图 或 局部子图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 匹配所有子图,对应 MaybeAddGlobalConstraint
if (match_full_submap)
{
kGlobalConstraintsSearchedMetric->Increment();
if( submap_scan_matcher.fast_correlative_scan_matcher->MatchFullSubmap(
constant_data->filtered_gravity_aligned_point_cloud,
// 参数为 global_localization_min_score
options_.global_localization_min_score(),
&score, &pose_estimate) )
{
CHECK_GT(score, options_.global_localization_min_score());
CHECK_GE(node_id.trajectory_id, 0);
CHECK_GE(submap_id.trajectory_id, 0);
// 最后记录下全局约束的次数和统计置信度
kGlobalConstraintsFoundMetric->Increment();
kGlobalConstraintScoresMetric->Observe(score);
}
else return;
}
// 匹配局部子图
else
{
kConstraintsSearchedMetric->Increment();
if (submap_scan_matcher.fast_correlative_scan_matcher->Match(
initial_pose, constant_data->filtered_gravity_aligned_point_cloud,
// 参数为 min_score
options_.min_score(),
&score, &pose_estimate) )
{
CHECK_GT(score, options_.min_score());
kConstraintsFoundMetric->Increment();
kConstraintScoresMetric->Observe(score);
}
else return;
}
{
absl::MutexLock locker(&mutex_);
score_histogram_.Add(score);
}

分枝定界求出的位姿被称为 初始位姿 3,这个初始位姿3及其携带的点云作为输入,用于ceres与此子图进行优化匹配

在建立全局约束的时候,直接在一个超大的范围内进行分枝定界搜索,并不需要计算一个特殊的初始位姿2,而直接把初始位姿设置为地图limits的中心点,可以理解为map的中心点。 而且打分的参数也不同了。

全局约束的搜索窗口范围: [1e6 * limits_.resolution(), M_PI],角度其实是±180°

ceres refine

ceres优化匹配,得到更加准确的优化位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Use the CSM estimate as both the initial and previous pose. 
// This has the effect that, in the absence of better information,
// we prefer the original CSM estimate.
ceres::Solver::Summary unused_summary;
// ceres更新pose_estimate,获得节点在local map中的最优位姿
ceres_scan_matcher_.Match(pose_estimate.translation(), pose_estimate,
constant_data->filtered_gravity_aligned_point_cloud,
*submap_scan_matcher.grid, &pose_estimate,
&unused_summary);
// 计算得到node相对子图的位姿
const transform::Rigid2d constraint_transform =
ComputeSubmapPose(*submap).inverse() * pose_estimate;
constraint->reset(new Constraint{submap_id,
node_id,
{ transform::Embed3D(constraint_transform),
options_.loop_closure_translation_weight(),
options_.loop_closure_rotation_weight() },
Constraint::INTER_SUBMAP} );

对于局部约束,constraint_transform并不是回环边,其实就是子图和节点的普通约束。 全局约束才构造回环边

日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (options_.log_matches() )
{
std::ostringstream info;
info << "Node " << node_id << " with "
<< constant_data->filtered_gravity_aligned_point_cloud.size()
<< " points on submap " << submap_id << std::fixed;
if (match_full_submap)
info << " matches";
else
{
const transform::Rigid2d difference =
initial_pose.inverse() * pose_estimate;
info << " differs by translation " << std::setprecision(2)
<< difference.translation().norm() << " rotation "
<< std::setprecision(3) << std::abs(difference.normalized_angle());
}
info << " with score " << std::setprecision(1) << 100. * score << "%.";
LOG(INFO) << info.str();
}

思考的问题
  • 为何子图插入scan结束时CPU升高

  • 为什么要用线程池

  • To be sure there is a recent loop closure constraint, you can query PoseGraphInterface::constraints(), loop over all constraints and check if there is a recent one of type INTER that spans some time.

  • A simpler solution is to subscribe to the global slam callback GlobalSlamOptimizationCallback by setting PoseGraphInterface::SetGlobalSlamOptimizationCallback. This will call you back after Cartographer has searched for loop closures. It does not tell you if loop closures were found, it mostly tells you if the background loop closure search is keeping up or falls behind.


gmapping和AMCL的区别

粒子

AMCL的粒子只包括位姿和权重,需要大量的粒子才能较好描述机器人位姿。gmapping的粒子包括轨迹、地图、权重,粒子过多会占用很大的内存。

AMCL的粒子在初始化时,各粒子的位姿是不同的,是均值参数加高斯噪声; gmapping粒子初始化时,粒子的位姿都是相同的。 所以说AMCL更符合撒粒子的逻辑。

AMCL的粒子数不是固定的,有最小和最大粒子数。在定位过程中,KLD采样会动态减少粒子数;gmapping的粒子数一直是固定的。

AMCL的提议分布是运动模型,分布的特点是又扁又宽; gmapping的提议分布是运动模型又加入了最近的观测,分布特点是小的尖峰。

AMCL的粒子权重是似然域模型改变,gmapping是通过scan match

AMCL输出的位姿是所有粒子加权平均后的结果; gmapping是输出得分最高的粒子


emplace 和 emplace_back

常见用法:

1
2
3
4
5
6
7
8
vector<int> v;
for(int i=0; i<4; i++)
{
v.emplace_back();
auto& num = v.back();
num = i;
qDebug()<< "size: "<<v.size() <<" v.back: "<<v.back();
}

结果

1
2
3
4
size:  1   v.back:  0
size: 2 v.back: 1
size: 3 v.back: 2
size: 4 v.back: 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <map>
#include <string>
#include <vector>
using namespace std;

class A{
public:
A(const string& s) {cout << "A construct " <<endl;}
A(const A& a) {cout <<"A copy"<<endl; }
private:
string s;
};

int main()
{
std::map<int, string> m;
vector<A> v;
A a1("test"); // a1 构造函数
cout << "------------------" << endl;
v.push_back(a1); // a1 拷贝构造函数
v.emplace_back(a1);

//先运行构造函数,后拷贝构造函数
v.push_back(A("test") );
v.emplace_back(A("test") );

// v.push_back("test"); 报错
// 这个才是真正的用途
v.emplace_back("test"); // 隐式转换,只有拷贝构造函数
cout << "******************" << endl;
return 0;
}

对于C++ 11里vector的emplace_back函数比较失望,都说提高了效率,其实它仅对于元素做隐式转换的情况有效,此时没有产生临时对象。对其他情况,和push_back没区别。

这里用到的c++11特性完美转发:将接收下来的参数原样完美地传递给对象的构造函数,这带来另一个方便性就是即使是构造函数声明为 explicit 它还是可以正常工作,因为它不存在临时变量和隐式转换。

map::emplace

map就只有emplace,机制也是一样的。元素是直接构建的,既不复制也不移动,仅当键不存在时才插入。
但是map有个问题:emplace 方法把它接收到的所有的参数都一起转发给 pair 的构造函数。但是对于一个 pair 来说,它既需要构造它的 key 又需要构造它的 value。如果我们按照之前普通的语法使用变参模板的话,则它是无法区分哪些参数用来构造 key, 哪些用来构造 value的。 比如下面的代码

1
2
3
4
5
// 无法区分哪个参数用来构造 key 哪些用来构造 value
// 有可能是 std::string s("hello", 1), std::complex<double> cpx(2)
// 也有可能是 std::string s("hello"), std::complex<double> cpx(1, 2)
std::map<std::string, std::complex<double>> scp;
scp.emplace("hello", 1, 2);

考虑使用可以接受异构、变长参数的 tuple 来对参数进行分组。再使用 C++11 提供的一个特殊类型 piecewise_construct_t 来帮助它们找到各自正确的构造函数了。std::piecewise_construct_t 是一个空类,全局变量 std::piecewise_construct 就是该类型的一个变量。

std::piecewise_construct作为构造 pair 对象的第一个参数传递,以选择构造函数形式,通过将两个元组对象的元素转发给它们各自的构造函数来构造其成员。即第一个 tuple 作为 key,第二个 tuple 作为 value。 使用 forward_as_tuple,该函数会构造一个 tuple 并转发给 pair。

1
2
3
4
5
// 想对于 map 避免临时变量的构造的话,就需要构建两个 tuple
std::map<std::string, std::complex<double>> scp;
scp.emplace(std::piecewise_construct, // 此常量值作为构造 pair 对象的第一个参数传递,以选择构造函数形式,通过将两个元组对象的元素转发给它们各自的构造函数来构造其成员。
std::forward_as_tuple("hello"), // 该函数会构造一个 tuple 并转发给 pair 构造,并存储在 first 字段
std::forward_as_tuple(1, 2)); //该函数会构造一个 tuple 并转发给 pair 构造,存储在 second 字段

cartographer中的代码就是这样

1
2
3
4
5
6
7
8
9
// std::map<int, ::cartographer::mapping::PoseExtrapolator>  extrapolators_;
// PoseExtrapolator的构造函数有2个参数:common::Duration pose_queue_duration,
// double imu_gravity_time_constant
extrapolators_.emplace(
std::piecewise_construct,
std::forward_as_tuple(trajectory_id),
std::forward_as_tuple(
::cartographer::common::FromSeconds(kExtrapolationEstimationTimeSec),
gravity_time_constant) );

无参的emplace_back

还有一种使用无参的vector::emplace_back() 或者 deque::emplace_back(),下面是从cartographer中学来的代码,确实有一定技巧

1
2
3
4
5
6
7
8
9
10
    vector<std::unique_ptr<A> > dd;
dd.emplace_back();
// 注意这里的 size 也是1
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
// cout << dd.at(0)->getValue() <<endl; 这里运行会崩溃

auto* a1 = &dd.back(); // 这个是 unique_ptr
a1->reset(new A(5) ); // 不能再用=,unique_ptr不能复制
cout <<"capacity: "<< dd.capacity() <<" size: "<< dd.size() <<endl;
cout << dd.at(0)->getValue() <<endl;

运行结果:

1
2
3
4
capacity: 1 size: 1
A construct
capacity: 1 size: 1
5

参考:
emplace_back VS push_back


后端的线程池 1 thread_pool 和 task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 省略部分不重要的成员
class Task
{
public:
friend class ThreadPoolInterface;
State GetState() LOCKS_EXCLUDED(mutex_); //返回本Task当前状态
// 设置Task 执行的任务 (函数)
void SetWorkItem(const WorkItem& work_item);
// 给当前任务添加 依赖任务, 如当前任务为b,AddDependency(a)表示任务a依赖b
// 把当前任务b,加入到依赖任务 a 的 dependent_tasks_ 列表
void AddDependency(std::weak_ptr<Task> dependency) LOCKS_EXCLUDED(mutex_);

private:
// 执行当前任务,比如当前任务为a,并更新依赖a的任务dependent_tasks_中所有任务状态
void Execute() LOCKS_EXCLUDED(mutex_);

// 当前任务进入线程待执行队列
void SetThreadPool(ThreadPoolInterface* thread_pool) LOCKS_EXCLUDED(mutex_);

// 当前任务的依赖任务完成时候,当前任务状态随之改变
void OnDependenyCompleted();


using WorkItem = std::function<void()>;
enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED };
// 任务具体执行过程
WorkItem work_item_ ;
// 执行当前任务的线程池
ThreadPoolInterface* thread_pool_to_notify_ = nullptr;
State state_ GUARDED_BY(mutex_) = NEW; // 初始化状态为 NEW
// 依赖当前任务的任务列表
std::set<Task*> dependent_tasks_ GUARDED_BY(mutex_);
};

Task类不是线程,可以理解为有状态和依赖的函数,核心的成员变量work_item_是仿函数,其返回类型void。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 省略部分不重要的成员
// 不明白为什么要先构造个抽象类,只有这一个派生类
class ThreadPool : public ThreadPoolInterface
{
public:
//初始化一个线程数量固定的线程池。
explicit ThreadPool(int num_threads);
// 添加想要ThreadPool执行的task,插入 tasks_not_ready_
// 如果任务满足执行要求,直接插入task_queue_准备执行
std::weak_ptr<Task> Schedule(std::unique_ptr<Task> task);
private:
//每个线程初始化时, 执行DoWork()函数. 与线程绑定
void DoWork();
void NotifyDependenciesCompleted(Task* task);
//running_只是一个监视哨, 只有线程池在running_状态时, 才能往work_queue_加入函数
bool running_ = true;
// pool_就是一系列线程
std::vector<std::thread> pool_ ;
// 十分重要的任务队列
std::deque<std::shared_ptr<Task>> task_queue_ ;
//未准备好的 task,task可能有依赖还未完成
absl::flat_hash_map<Task*, std::shared_ptr<Task>> tasks_not_ready_;
};

看构造函数

1
2
3
4
5
6
7
8
9
10
ThreadPool::ThreadPool(int num_threads)
{
absl::MutexLock locker(&mutex_);
// 原来还是用了std::thread,还以为google连线程也是自己实现的
for (int i = 0; i != num_threads; ++i)
{
// std::vector<std::thread> pool_;
pool_.emplace_back([this]() { ThreadPool::DoWork(); });
}
}

大致看ThreadPool::DoWork(),最后执行了ThreadPoolInterface::Execute,其实就是Task::Execute。 每个线程与DoWork()函数绑定,也就是线程在后台不断执行DoWork()函数

新的Task如果有依赖项,通过Thread_pool::Schedule添加到 Thread_pooltasks_not_ready_队列中。队列中的Task状态是变化的,等依赖项执行完成,Task状态变为DEPENDENCIES_COMPLETED,然后再插入task_queue_队列。最终所有Task都会插入task_queue_中,在DoWork中得到执行。

如果该Task没有依赖,直接插入task_queue_,准备执行。

对任一个任务的状态转换顺序为:NEW—->DISPATCHED—->DEPENDENCIES_COMPLETED—->RUNNING—->COMPLETED

Thread_pool通过固定数量的thread与task_queue_(待执行的task队列)执行函数绑定。Thread_pool 按照队列首尾顺序不断执行Task。

示意图