cartographer的杂项模块

detect_floors.cc

detect_floors.h定义了关于3D扫描楼层的数据结构。

1
2
3
4
5
6
7
8
9
10
struct Timespan {
common::Time start;
common::Time end;
};
Timespan表征扫描的时间范围。

struct Floor {
std::vector< Timespan> timespans;
double z; //z轴的中值
};

一个楼层对应多个扫描timespan:有可能重复的扫描多次,但只有一个高度z。

std::vector DetectFloors(const proto::Trajectory& trajectory); 使用启发式搜索寻找building的不同楼层的z值。

normal_estimation_2d.cc

用于计算TSDF地图中的点云法向量,可以不看。 normal_estimation_2d_test.cc是测试文件,但是没有在CMakeLists中设置 。


优雅地订阅话题

这段代码其实是模仿cartographer的LaunchSubscribers函数,订阅多个传感器的数据话题,同时可以获得主函数里多个参数的赋值

主函数长这样

1
2
3
4
5
6
7
8
int main(int argc, char** argv)
{
ros::init(argc, argv, "launch_subscribers");
ros::NodeHandle nh;
Node node;
node.LaunchSubscribers("base_scan", 1, "/base_odometry/odom", 1, "/torso_lift_imu/data", 1);
ros::spin();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void Node::LaunchSubscribers(const std::string& scan_topic, const int scan_num,
const std::string& odom_topic, const int use_odom,
const std::string& imu_topic, const int use_imu
)
{
if(scan_num)
LaunchLaserScanSub(scan_topic, scan_num);
if(use_odom)
LaunchOdomSub(odom_topic, use_odom);
if(use_imu)
LaunchImuSub(imu_topic, use_imu);
}

void Node::LaunchLaserScanSub(const std::string& scan_topic, const int scan_num)
{
subscribers_.push_back(
{ SubscribeWithHandler<sensor_msgs::LaserScan>(
&Node::HandleLaserScanMessage,
scan_num, scan_topic, &node_handle_, this),
scan_topic} );
}

// 这里也可以把 int 改为 bool
template <typename MessageType>
::ros::Subscriber SubscribeWithHandler(
void (Node::*handler)(int, const std::string&,
const typename MessageType::ConstPtr&),
const int num, const std::string& topic,
::ros::NodeHandle* const node_handle, Node* const node)
{
return node_handle->subscribe<MessageType>(
topic, 1,
boost::function<void(const typename MessageType::ConstPtr&)>(
[node, handler, num,
topic](const typename MessageType::ConstPtr& msg) {
(node->*handler)(num, topic, msg);
}) );
}

// 根据上面的void (Node::*handler)部分,参数依次为int, string&, MessageType::ConstPtr&
// scan_num 就是main函数里的1
void Node::HandleLaserScanMessage(int scan_num, const std::string& sensor_id,
const sensor_msgs::LaserScan::ConstPtr& msg)
{
ROS_INFO("range_min: %f, scan_num: %d", msg->range_min, scan_num );
}

find 和 find_if, find_if_not

头文件: #include <algorithm>

返回范围 [first, last) 中满足特定判别标准的首个元素:

  • find: 搜索等于 value 的元素

  • find_if: 根据指定的查找规则,在指定区域内查找第一个符合该函数要求(使函数返回 true)的元素。如果查找成功,该迭代器指向的是查找到的那个元素;反之,如果查找失败,该迭代器的指向和 last 迭代器相同

  • find_if_not: 与find_if相反

示例代码:

1
2
3
4
5
6
7
8
9
10
11
std::vector<int> score{ 10, 20, 30, 40 };

auto it_1 = std::find(score.begin(), score.end(), 30 );
if(it_1 != score.end() )
qDebug()<< "found !";
else qDebug()<< "no found !";

auto it_2 = std::find_if(score.begin(), score.end(), [](int& m){return m>20;} );
if(it_2 != score.end() )
qDebug()<< "found: " << *it_2;
else qDebug()<< "no found !";

  • 若作为算法一部分调用的函数的执行抛出异常,且 ExecutionPolicy 为标准策略之一,则调用 std::terminate 。对于任何其他 ExecutionPolicy ,行为是实现定义的。
  • 若算法无法分配内存,则抛出 std::bad_alloc

原子操作和atomic关键字

多个线程之间共享地址空间,所以多个线程共享进程中的全局变量和堆,都可以对全局变量和堆上的数据进行读写,但是如果两个线程同时修改同一个数据,可能造成某线程的修改丢失;如果一个线程写的同时,另一个线程去读该数据时可能会读到写了一半的数据。这些行为都是线程不安全的行为,会造成程序运行逻辑出现错误。下面的程序很常见:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// boost::atomic<int> i(0);
int i=0;
boost::mutex mut;
void thread_1(int n);

int main()
{
int n=100000; // n不够大时,i不容易出现不同的情况
boost::thread th1 = boost::thread(boost::bind(&thread_1,n));
boost::thread th2 = boost::thread(boost::bind(&thread_1,n));
th1.join();
th2.join();
cout<< i<<endl;
return 0;
}

void thread_1(int n)
{
while(n--)
{
mut.lock();
i++;
mut.unlock();
}
}

如果不加lock,i最终值是不确定的,因为两个线程同时对i进行了写操作。一般地,我们用互斥锁mutex保护临界区,保证同一时间只能有一个线程可以获取锁,持有锁的线程可以对共享变量进行修改,修改完毕后释放锁,而不持有锁的线程阻塞等待直到获取到锁,然后才能对共享变量进行修改,最后i必定为200000

Boost提供了原子类型atomic,通过使用原子类型可摆脱每次对共享变量进行操作都进行的加锁解锁动作,节省了系统开销,同时避免了线程因阻塞而频繁的切换。atomic封装了不同计算机硬件的底层操作原语,提供了跨平台的原子操作功能,解决并发竞争读写变量的困扰,只需要包含文件<boost/atomic.hpp>,在上面的代码中使用boost::atomic<int> i(0);,然后去掉函数中的互斥锁,运行效果是一样的,而且节省了系统开销。

atomic可以把对类型T的操作原子化,T的要求:

  1. 标量类型(算数,枚举,指针)
  2. POD类型,可以使用memcmp,memset等函数

两种方式创建atomic对象:

1
2
3
4
5
6
atomic<int> a(10);
assert(a==10); //安全函数,若表达式不成立结束程序


atomic<long> L;
cout << L <<endl; //初始值不确定

最重要的两个成员函数: store() (operator=) 和 load() (operator T() )以原子方式存取,不会因为并发访问导致数据不一致。

1
2
3
4
5
6
7
8
9
boost::atomic<bool> b(1);
assert(b != 0);
std::cout << b << std::endl;
b.store(0);//存值
std::cout << b << std::endl;

boost::atomic<int> n1(100);
std::cout << n1.exchange(200) << std::endl; //交换两个值,并且返回原值100
std::cout << n1 << std::endl;

测试代码中临界区非常短,只有一个语句,所以显得加锁解锁操作对程序性能影响很大,但在实际应用中,我们的临界区一般不会这么短,临界区越长,加锁和解锁操作的性能损耗越微小,无锁编程和有锁编程之间的性能差距也就越微小。

无锁编程最大的优势在于两点:

  • 避免了死锁的产生。由于无锁编程避免了使用锁,所以也就不会出现并发编程中最让人头疼的死锁问题,对于提高程序健壮性有很大积极意义
  • 代码更加清晰与简洁

std::atomic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//int v = 0;
std::atomic<int> v(0);

void Test()
{
for(int i=0; i<10000; i++)
++v;
}

int main(int argc, char *argv[])
{
std::thread t1(Test);
std::thread t2(Test);
t1.join();
t2.join();

std::cout << "v: " << v << std::endl;
return 0;
}

如果只用简单的全局变量int v = 0;,结果一般不是20000。 换上std::atomic<int> v(0);,结果必为20000

参考:
Item40 Use std::atomic for concurrency
聊一聊原子操作
C++11多线程编程详解


pre和next

std::prev

原型template <class BidirectionalIterator> BidirectionalIterator prev (BidirectionalIterator it, typename iterator_traits<BidirectionalIterator>::difference_type n = 1);

该函数会返回一个距离 it 迭代器 n 个元素的新迭代器。当 n 为正数时,其返回的迭代器将位于 it 左侧;反之,当 n 为负数时,其返回的迭代器位于 it 右侧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>     // std::cout
#include <iterator> // std::next
#include <list> // std::list
using namespace std;
int main() {
//创建并初始化一个 list 容器
std::list<int> mylist{ 1,2,3,4,5 };
std::list<int>::iterator it = mylist.end();
//获取一个距离 it 迭代器 2 个元素的迭代器,由于 2 为正数,newit 位于 it 左侧
auto newit = prev(it, 2);
cout << "prev(it, 2) = " << *newit << endl; // 4

//n为负数,newit 位于 it 右侧
it = mylist.begin();
newit = prev(it, -2);
cout << "prev(it, -2) = " << *newit; // 3
return 0;
}

prev() 函数自身不会检验新迭代器的指向是否合理,需要我们自己来保证其合理性。

next()函数和prev()函数恰好相反


pbstream_main的使用

旧版本为cartographer_pbstream

源码是pbstream_main.cc,又实际调用了pbstream_info.ccpbstream_migrate.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int main(int argc, char** argv)
{
google::InitGoogleLogging(argv[0]);

FLAGS_logtostderr = true;
const std::string usage_message =
"Swiss Army knife for pbstreams.\n\n"
"Currently supported subcommands are:\n"
"\tinfo - Prints summary of pbstream.\n"
"\tmigrate - Migrates old pbstream (w/o header) to new pbstream format.";
google::ParseCommandLineFlags(&argc, &argv, true);

if (argc < 2) {
google::SetUsageMessage(usage_message);
google::ShowUsageWithFlagsRestrict(argv[0], "pbstream_info_main");
return EXIT_FAILURE;
} else if (std::string(argv[1]) == "info") {
return ::cartographer::io::pbstream_info(argc, argv);
} else if (std::string(argv[1]) == "migrate") {
return ::cartographer::io::pbstream_migrate(argc, argv);
}
// 其他情况会报错
}

可执行文件在carto_ws/install_isolated/bin

pbstream_main info

终端执行pbstream_main info,调用pbstream_info:打印出pbstream文件的概况,路径支持环境变量。

1
./pbstream_main info $MAPS/file.pbstream

结果的中间一大部分是采用的lua参数,如果不想显示这些参数,注释掉源码里的LOG(INFO) << "Trajectory options: " << trajectory_options.DebugString();

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Reading pbstream file from '/home/xiaoqiang/carto_ws/maps/file.pbstream'...
Header: format_version: 2
Trajectory options: sensor_id {
id: "scan"
}
sensor_id {
type: IMU
id: "imu"
}
sensor_id {
type: ODOMETRY
id: "odom"
}

# ......省略lua参数
Trajectory id: 0 has #nodes 9778 has #submaps 161
SerializedData package contains #node: 9778
SerializedData package contains #odometry_data: 49697

SerializedData package contains #submap: 161
SerializedData package contains #submap_2d: 161
SerializedData package contains #submap_2d_grid: 161

SerializedData package contains #submap_3d: 0
SerializedData package contains #submap_3d_high_resolution_hybrid_grid: 0
SerializedData package contains #trajectory_data: 1

不要在最后加-all_debug_strings=true,否则打印出无穷无尽的内容,包括了栅格值和子图转换表等等。

pbstream_main migrate

终端执行pbstream_main migrate,调用pbstream_migrate:把pbstream文件移植到新的子图格式。把未使用histogram的子图文件移植为包含histogram的子图格式。如果想在输出文件中排除未完成的子图,设置--include_unfinished_submaps为false


ninja

如果没有ninja,在使用时会报错: CMake was unable to find a build program corresponding to "Ninja". CMAKE_MAKE_PROGRAM is not set. You probably need to select a different build tool.

安装

ninja需要依赖于re2c,否则编译是会报错,re2c是一款语法分析器,官网地址是:http://re2c.org/
下载页面的安装说明都在: http://re2c.org/install/install.html, 这里直接从源码安装最新版本

安装re2c:

1
2
3
4
5
git clone https://github.com/skvadrik/re2c
cd re2c
./autogen.sh
./configure make
make install

可以从ninja的github仓库切换到release分支或者使用git下载:

1
2
git clone https://github.com/ninja-build/ninja.git
cd ninja

采用python的安装方式
1
./configure.py --bootstrap

等待完成即可

ninja比较精简,只需要一个可执行文件即可,现在可以做软链或者复制到/usr/bin下面就可以直接调用了

1
cp ninja /usr/bin/ 


cartographer_offline_node

cartographer_offline_node对应offline_node_main.cc,它和cartographer_grpc_offline_node对应的offline_node_grpc_main.cc极为相似

在线和离线的建图效果无区别。离线节点直接从bag读取传感器数据,使用了CPU能实现的最大速度处理bag,而且不会丢失消息,不过无法设置速度和看不清建图过程的细节。 如果想控制处理速度,就使用在线节点和rosbag play -r 2,但是如果设置速度太快,可能会丢失消息,这是很常见的。

简单看了源码,离线节点之所以运行快,是因为使用了ROS多线程ros::AsyncSpinner,而在线节点还是普通的ROS程序

运行示例

1
2
wget -P ~/Downloads https://storage.googleapis.com/cartographer-public-data/bags/backpack_2d/b2-2016-04-05-14-44-52.bag
roslaunch cartographer_ros offline_backpack_2d.launch bag_filenames:=${HOME}/Downloads/b2-2016-04-05-14-44-52.bag

此时的CPU占用果然要爆了,所以说只适合运行数据集,不适合需要优化cartographer的场合
CPU占用.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "cartographer/mapping/map_builder.h"
#include "cartographer_ros/offline_node.h"
#include "cartographer_ros/ros_log_sink.h"
#include "gflags/gflags.h"
#include "ros/ros.h"

int main(int argc, char** argv)
{
google::InitGoogleLogging(argv[0]);
google::ParseCommandLineFlags(&argc, &argv, true);

::ros::init(argc, argv, "cartographer_offline_node");
::ros::start();
cartographer_ros::ScopedRosLogSink ros_log_sink;

const cartographer_ros::MapBuilderFactory map_builder_factory =
[](const ::cartographer::mapping::proto::MapBuilderOptions&
map_builder_options) {
return absl::make_unique< ::cartographer::mapping::MapBuilder>(
const_cast<::cartographer::mapping::proto::MapBuilderOptions&>(map_builder_options));
};
cartographer_ros::RunOfflineNode(map_builder_factory);
::ros::shutdown();
}

参数

RunOfflineNode函数转到了offline_node.cc,这个文件主要就是这个函数。这个文件开头是一堆宏定义,根据字符串解释每个可用的参数

  • configuration_directory
  • configuration_basenames
  • load_state_filename

  • bag_filenames: Comma-separated list of bags to process. One bag per trajectory. Any combination of simultaneous and sequential bags is supported.

  • urdf_filenames: Comma-separated list of one or more URDF files that contain. static links for the sensor configurations

  • use_bag_transforms: 默认true,Whether to read, use and republish transforms from bags

  • keep_running: 默认false,最好改为true。Keep running the offline node after all messages from the bag have been processed

  • load_frozen_state: 默认true,Load the saved state as frozen (non-optimized) trajectories

  • skip_seconds: 默认0,Optional amount of seconds to skip from the beginning (i.e. when the earliest bag starts)

一个配置了两个机器人的的launch的关键部分是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<node name="cartographer_offline_node" pkg="cartographer_ros"
required="$(arg no_rviz)"
type="cartographer_offline_node" args="
-configuration_directory $(arg abspath)
-configuration_basenames alpha-cartographer.lua,delta-cartographer.lua
-urdf_filenames $(arg abspath)/pioneer-alpha.urdf,$(arg abspath)/pioneer-delta.urdf
-bag_filenames $(arg abspath)/alpha.bag,$(arg abspath)/delta.bag
-keep_running=true -use_bag_transforms=false"
output="screen">
<remap from="bag_1_scan" to="alpha/scan" />
<remap from="bag_1_odom" to="alpha/pose" />
<remap from="bag_2_scan" to="delta/scan" />
<remap from="bag_2_odom" to="delta/pose" />
</node>

话题和service

Publications:

  • cartographer_offline_node/bagfile_progress [cartographer_ros_msgs/BagfileProgress]
  • clock [rosgraph_msgs/Clock]
  • constraint_list [visualization_msgs/MarkerArray]
  • landmark_poses_list [visualization_msgs/MarkerArray]
  • rosout [rosgraph_msgs/Log]
  • scan_matched_points2 [sensor_msgs/PointCloud2]
  • submap_list [cartographer_ros_msgs/SubmapList]
  • tf [tf2_msgs/TFMessage]
  • tf_static [tf2_msgs/TFMessage]
  • tracked_pose [geometry_msgs/PoseStamped]
  • trajectory_node_list [visualization_msgs/MarkerArray]

Subscriptions:

  • clock [rosgraph_msgs/Clock]

Services:

  • finish_trajectory
  • get_trajectory_states
  • read_metrics
  • start_trajectory
  • submap_query
  • trajectory_query
  • write_state

离线节点是直接在程序里读取消息,所以它没有像rosbag play那样把bag里的话题发布出来,我运行bag时,因为还需要运行一个节点对bag里的话题进行类型转换,而话题又没发布出来,所以离线节点实际无法使用了。

源码的重要部分

1
2
3
4
5
6
7
8
9
constexpr char kClockTopic[] = "clock";
constexpr char kTfStaticTopic[] = "/tf_static";
constexpr char kTfTopic[] = "tf";
constexpr double kClockPublishFrequencySec = 1. / 30.;
constexpr int kSingleThreaded = 1;
// We publish tf messages one second earlier than other messages. Under
// the assumption of higher frequency tf this should ensure that tf can
// always interpolate.
const ::ros::Duration kDelay = ::ros::Duration(1.0);

sort函数

sort使用快速排序的递归形式,时间复杂度是O(nlog(n) )

Sort函数有三个参数:(第三个参数可不写)

  1. 第一个是要排序的数组的起始地址。
  2. 第二个是结束的地址(最后一位要排序的地址)
  3. 第三个参数是排序的方法,可以是从大到小也可是从小到大,还可以不写第三个参数,此时默认的排序方法是从小到大排序。
1
2
3
int b = 3, c = 12;
qDebug() << std::greater<int>()(b, c);
qDebug() << std::less<int>()(b, c);
1
2
3
4
5
6
7
8
int a[]={3,4,1,12,0,8,4};
// 默认从小到大排列
std::sort(a, a+7);
// 从大到小
std::sort(a, a+7, std::greater<int>());
for(int i=0; i < 7; i++)
cout << a[i] <<" ";
// 0 1 3 4 4 8 12

针对自定义的数据类型,需要自定义排序的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
struct Frontier
{
Frontier(double c):
cost(c){}

double cost;
};

bool compareFrontier(Frontier f1, Frontier f2)
{
return (f1.cost < f2.cost);
}
void showFrontiers(const vector<Frontier>& v)
{
for(auto f: v)
cout << f.cost << " ";
cout << endl;
}

vector<Frontier> frontiers;
frontiers.push_back(Frontier(1.2) );
frontiers.push_back(Frontier(4.9) );
frontiers.push_back(Frontier(12.7) );
frontiers.push_back(Frontier(0.3) );
frontiers.push_back(Frontier(3.6) );
showFrontiers(frontiers);

std::sort(frontiers.begin(), frontiers.end(),
[](const Frontier& f1, const Frontier& f2) { return f1.cost < f2.cost; });
// std::sort(frontiers.begin(), frontiers.end(), compareFrontier);

showFrontiers(frontiers);

frontiers一般用vector或数组,不能是list, dequeue, queue

运行结果:

1
2
1.2  4.9  12.7  0.3  3.6  
0.3 1.2 3.6 4.9 12.7

reverse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <vector>
#include <iostream>
#include <iterator>
#include <algorithm>

int main()
{
std::vector<int> v({1,2,3});
std::reverse(std::begin(v), std::end(v));
std::cout << v[0] << v[1] << v[2] << '\n';

int a[] = {4, 5, 6, 7};
std::reverse(std::begin(a), std::end(a));
std::cout << a[0] << a[1] << a[2] << a[3] << '\n';
}

期刊会议的情况

机器人顶级期刊

IEEE Transactions on Robotics (TRO)

International Journal of Robotics Research (IJRR)

Journal of Field Robotics (JFR)

IEEE Robotics & Automation Magazine (RAM)

机器人次顶级期刊

Robotics and Autonomous Systems (RAS)

Autonomous Robots (AURO)

Robotics and Computer-Integrated Manufacturing (RCIM)

期刊影响因子应该至少为2

机器人顶级会议

Robotics: Science and Systems (RSS)

IEEE The International Conference on Robotics and Automation (ICRA)

IEEE/RSJ International Conference on Intelligent Robots and Systems (IROS)

机器人次顶级会议

IEEE International Conference on Robotics and Biomimetics (ROBIO)

IEEE International Conference on Real-time Computing and Robotics (RCAR)

International Conference on Advanced Robotics (ICAR)

IEEE International Conference on CYBER Technology in Automation, Control, and Intelligent Systems (IEEE-CYBER)

IEEE International Conference on Multisensor Fusion and Integration for Intelligent Systems (MFI)

机器人一般会议

IEEE International Conference on Advanced Intelligent Mechatronics (AIM)

IEEE International Conference on Humanoid Robots (Humanoids)

International Conference on Climbing and Walking Robots (CLAWAR)

IEEE International Conference on Information and Automation (ICIA)

论文的相关工作一项也是有意义的。比如论文A,作者说他的方法很好,我看了之后找不出问题,后来又看到了论文B,B在介绍论文A时会提出A存在的问题,而它又如何避免了这种问题。 写的差的论文往往相关工作只是简单介绍,它和以往的论文没多大关系,或者介绍的论文价值不高。