激光雷达的配置和ROS消息

配置雷达

我用的雷达是sick LMS511,配置步骤可以参考SICK激光雷达LMS511笔记

连接直流电源24V,将雷达网线连接到无线路由器。准备一个windows电脑,连接到雷达的同一个无线网,开启雷达配送的调试工具Sick sensor intellingence,搜索设备获得雷达,记住IP:192.168.10.100,双击设备图标进入配置页面。

在另一台Ubuntu电脑上进行驱动配置,也是连到同一个无线网,先测试能否ping通,如不能则按上面步骤配置。

使用驱动lms511_laser_node,下载后放到某工作空间中,先在lms5xx_node.cpp中,在ip_add修改雷达的IP,然后使用catkin_make --pkg laser_node编译。

数据表示

激光雷达获取的数据是以以雷达为中心的极坐标系表示, 激光雷达以数据帧的方式返还数据
2020-05-31_105134.png
2020-05-31_105250.png

雷达的消息和参数

消息成员

启动roscore后,用rosrun laser_node laser_node启动驱动程序,会有很多输出信息。
使用命令rosnode list查看运行的节点列表,发现了sicklms5xx

使用rostopic list查看主题,发现了主题scan,信息如下:

其消息类型是sensor_msgs/LaserScan,发布者是sicklidar,其消息成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
Header header            # 时间戳是收到第一束激光的时间
# in frame frame_id, angles are measured around
# the positive Z axis (counterclockwise, if Z is up)
# with zero angle being forward along the x axis
float32 angle_min # 扫描起始角度[rad]
float32 angle_max # 扫描结束角度[rad]
float32 angle_increment # 角分辨率,相邻扫描点的角度增量,[弧度]
float32 time_increment # 每个数据点的时间间隔 [秒]
float32 scan_time # 当前帧数据与下一帧数据的时间间隔 [秒]
float32 range_min # 最小的扫描距离 [m]
float32 range_max # 最大的扫描距离 [m]
float32[] ranges # 各个方向的扫描数据 [m]
float32[] intensities # 雷达的强度数据,思岚有,有些SICK雷达没有

使用命令rostopic echo scan可查看雷达扫描到的数据:

扫描角度

1
2
angle_min: -1.65806281567
angle_max: 1.65806281567

经过换算,其实就是-95°~95°,而雷达的扫描范围是190°,显然扫描范围是个对称的扇形。雷达角度范围是-1.658~1.658, 角分辨率是0.00872, 这样算出来大概是380, 也就是380个激光束射出。

雷达有旋转方向的问题,大部分雷达都是逆时针旋转,这和ROS中的习惯一致,少数雷达是顺时针,这就会对建图和定位产生问题。比如EAI YDLIDAR G4既可正转也可反转。

判断angle_minangle_max的大小,就能知道雷达扫描的方向,因为雷达的扫描角度都是从angle_min到angle_max变化的。如果angle_min<angle_max,就是逆时针。这样雷达数据从极坐标系转换到笛卡尔右手系时,才能使用公式angle = angle_min + (i * angle_increment)。某些雷达已经在驱动里处理过了,即使顺时针旋转,也可以直接用此公式。

换句话说,angle_min应该叫开始角度angle_max应该叫结束角度

扫描距离

激光传感器在每次扫描都会返回几百个数据,这就是ranges数组。每秒钟还要扫描很多次,这是由scan_time描述,也就是每秒扫出一个扇形(一帧),对每个扫描束都必须处理一个ray-tracing操作.

需要保证range数组中元素的存储顺序与扫描到顺序一致:

  • ranges数组中第一个元素存的scan中第一个扫描到点
  • ranges数组中最后一个元素存的scan中最后一个扫描到的点

如果我们有个rosbag文件,使用rosbag info可以看到这样的结果:
scan消息.png
这里面有360个msg,也就是雷达扫描了360次,注意这不是ranges数组的大小

ranges是个float32类型数组,但不同雷达的ranges大小不同,获得数组大小的方法是 rostopic echo --noarr /scan/ranges,会不断显示<array type: float32, length: 381>,可以看到ranges的大小是381,和激光束的数量基本对应。

A2的量程是0.15-12/18,还是基于白色高反光物体测得。 使用A2时,不能离障碍物太近,否则会出现ranges数组很多个0的情况,这一点在导航时要注意

LMS511最大检测81m,所以说适合室外。这个参数一般是在良好的反射率情况下得到的,对黑色物体的检测量程没这么大。


有时的ranges会出现很多inf和nan的情况,这个对不同雷达的处理可能不同,对于A2,可以看一下源码是这样的:

1
2
if (read_value == 0.0)
scan_msg.ranges[node_count-1-i] = std::numeric_limits<float>::infinity();

也就是读取距离为0时,输出inf

对于北洋雷达,距离超过测量范围时候的两种情况是INF,其余由于强度或其他原因导致的数据错误会输出。

强度

雷达数据强度是反映生成某点的激光雷达脉冲回波强度的一种测量指标(针对每个点而采集)。 雷达强度数据跟激光的反射情况有关,如果射向黑色物体,可能不会有反馈 数组intensities与ranges中的数据是一一对应的,描述的是每个测量数据的强度。

雷达在室外跟室内区别, 基本的点就是光照条件的区别

问题

  • 启动地图后,移动机器人,雷达轨迹跟随移动

先看雷达安装朝上还是朝下,程序中要保持一致。如果没问题,把所有程序关闭再启动,一般会恢复正常

  • 启动雷达,雷达未分配到IP,导致无法使用;运行雷达节点时,突然报错,雷达无法扫描到数据,然后雷达IP丢失

  1. 雷达扫描的结果比实际偏一定角度

偏移的scan.png
正常scan.png
再三试验,应当是rviz视角问题

参考:
Turtlebot3中级教程-应用-自动泊车
EAI的YDLIDAR-X4激光雷达在ROS下使用方法


用freopen函数操作输入输出流和读写文件
此方法非常不好,容易造成输入输出流混乱,最好用fopen,fread,fwrite



freopen函数用于重定向输入输出流,freopen(file, "w",stdout)用于将输出流定向到文件file,freopen(file, "r",stdin)是从文件file读文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
FILE* fp = freopen("/home/user/Documents/test.txt","w",stdout);
srand((int)time(NULL));
for(int i=0;i<8;i++)
{
int num = rand()%100;
string s = boost::lexical_cast<string>(num);
cout << s << endl;
}
// fclose(stdout);
freopen("/dev/tty", "w", stdout);

// 在终端运行会输出,在IDE里不会输出
cout<<"**************"<<endl;

代码先将输出流定向到文件test.txt,后面的cout<<s<<endl会将几个随机数写入到文件。
如果之后直接fclose(stdout),之后的cout输出肯定不管用,此时把输出流关了,对终端和文件都无法输出了。

在使用完freopen之后,如果还需要使用标准输入输出,不能把它们直接fclose,不妨再次重定向,把stdin、stdout重定向到控制台,就能从键盘接受输入、从屏幕输出。在linux中,控制台设备是/dev/tty,所以再次调用freopen。最后的星号只能在终端执行可执行文件时,才会输出。

以上函数对log4cpp的日志和ROS_INFO也有影响,显然log4cpp的本质也是操作输出流,与freopen相同。

读文件

比如文件内容如下:

1
2
3
4
5
6
31
96
5
86
98
20

读其中数字的代码:

1
2
3
4
5
6
7
8
freopen(file, "r", stdin);
vector<int> data(6,0);

for(int i = 0; i < 6; ++i)
{
cin >> data[i];
cout << "data: " << data[i] ;
}

但是文件中有其他内容时,会读取失败,例如:

第4个元素赋值会成为0,值之后的数据都不正常


传递数组给函数

数组不能赋值给数组

1
2
3
int a[3] = {1,2,3};
int b[3];
b=a;

编译器报错, 数组不能直接赋值, 可以使用std::copy()或手工循环赋值, 但是不可以直接把一个数组赋值给另外一个数组.

但是std::array和std::vector是可以的
1
2
std::array<int, 5> a = {1,2,3,4,5};
std::array<int, 5> b = a;


形式参数是一个已定义大小的数组:

1
2
void myFunction(int param[10])
{}

形式参数是一个未定义大小的数组:

1
2
void myFunction(int param[])
{}

函数而言,数组的长度是无关紧要的,因为 C++ 不会对形式参数执行边界检查。

二维数组做形参

1
2
3
4
5
6
7
8
9
10
void func1(int  iArray[][10])
{

}

int main()
{
int array[10][10];
func1(array);
}

形参声明一定要给出第二个维度的大小,否则编译不过。


轮速里程计

里程计是衡量我们从初始位姿到终点位姿的一个标准,通俗的说,要实现机器人的定位与导航,就需要知道机器人行进了多少距离,是往哪个方向行进的, odom的原点是机器人启动的位置。

里程计一个很重要的特性,是它只关心局部时间上的运动,多数时候是指两个时刻间的运动。当我们以某种间隔对时间进行采样时,就可估计运动物体在各时间间隔之内的运动。由于这个估计受噪声影响,先前时刻的估计误差,会累加到后面时间的运动之上

odom坐标系是在机器人运动前以当前观测到的数据建立的临时的坐标系,避免远离map原点而造成误差。比如轮子的编码器或者视觉里程计算法或者陀螺仪和加速度计。odom坐标系原则上是固定的,但可能会随着机器人的运动漂移,这就是累计误差,漂移导致odom不是一个很有用的长期的全局坐标,但是一个有用的短期坐标。在odom坐标系下机器人的位置必须是连续变化的,不能有突变和跳跃,比如人不能把机器人搬到其他地方。

里程计中也存在线速度角速度,但是这个线速度角速度是通过距离除以时间得到的,没有IMU的精确.

里程计的发布频率不能低,move_base里面默认订阅里程计的频率是20Hz,所以里程计至少也得20Hz,目前机器人的为50Hz

运动模型

机器人在世界坐标系中的位置.png
机器人的位置X,Y可以看做车在极小时间内的位置增量在X,Y方向的分量的积分。我们采用速度推算方式确定机器人的轨迹,在极小时间Δt内,以速度V移动的距离是Δd,分解到两坐标轴可以得到

这样不断累积,就可以得到任意时间的位置:
2020-03-21_094244.png

  • Δt是我们自己取的,比如10ms,可以在ROS中使用ros::Rate实现

  • V和ω并不是我们指定的速度,而是由指定速度和角速度根据航迹推演之后,分配到两个驱动轮上的速度Vl和Vr,再从硬件层根据Kinematics返回的速度Vx和ω。由于驱动轮不是全向轮,所以Vy是0

  • 至于偏航角,有两种方法获取:编码器和IMU。编码器很依赖精度,效果不如IMU。使用IMU的话,θ就是其返回的偏航角Yaw,IMU上电时的Yaw为0,与小车航向一致,只是还要根据公式转换到(-π,π]

tf关系

用到的tf父子关系为: map–>odometry–>base_link ,随着机器人的移动,我们需要生成机器人的里程计信息,如此可以提供机器在map中大致的位置,一般在一个节点里发出odometry->base_link的tf广播,它们的转换信息是每时每刻都在发布的。

odom的原点是机器人启动的位置,因为每次机器人启动的位置不同,所以一开始odom在map上的位置或转换矩阵是未知的,但是AMCL可以根据最佳粒子的位置推算出map->odom(就是说通过最佳粒子推算出机器人在地图的中的位置)的tf转换信息并发布到tf主题上,这样三者的tf父子关系就建立起来了,我们也就知道了base_linkmap中的坐标。

不管机器人在哪启动,odom和map坐标系之间的tf理论上是不变的,发生的改变就是里程计的累计误差

一般来说map坐标系的坐标是通过传感器的信息不断的计算更新而来。比如激光雷达,视觉定位等等。因此能够有效的减少累积误差,但是也导致每次坐标更新可能会产生跳跃。map坐标系是一个很有用的长期全局坐标系。但是由于坐标会跳跃改变,这是一个比较差的局部坐标系(不适合用于避障和局部操作)。

map->odom变换代表了对里程计误差的纠正

odom话题

odom消息成员:

1
2
3
4
std_msgs/Header header
string child_frame_id
geometry_msgs/PoseWithCovariance pose
geometry_msgs/TwistWithCovariance twist

里程计向局部路径规划器传递当前机器人的速度,消息中的速度也在TrajectoryPlannerROS对象中的代价地图包含的robot_base_frame参数表示的坐标系中,一般就是base_link,这个参数是在全局代价地图的yaml文件中。



在rviz中添加Odometry,最好去掉它的Covariance项后面的勾,也就是在Odometry显示中不启用Covariance信息,否则将导致显示效果很难看

代码如下,对不同的机器人,获取线速度和角速度的方法不同,这里没有用到IMU

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include <ros/ros.h>
#include <tf/transform_broadcaster.h>
#include <nav_msgs/Odometry.h>
#include <boost/shared_ptr.hpp>
#include <geometry_msgs/Twist.h>

static ros::Publisher *odomPubPtr;
double x = 0;
double y = 0;
double theta = 0;
ros::Time current_time;
ros::Time last_time;

geometry_msgs::TransformStamped odom_trans;
nav_msgs::Odometry odom;

void callback(const geometry_msgs::TwistConstPtr& velocity)
{
static tf::TransformBroadcaster odom_broadcaster;
double vx, vy, omega;

current_time = ros::Time::now();

vx = velocity->linear.x;
omega = velocity->angular.z;
//compute odometry in a typical way given the velocities of the robot
double dt = (current_time - last_time).toSec();
double delta_x = (vx * cos(theta) - vy * sin(theta)) * dt;
double delta_y = (vx * sin(theta) + vy * cos(theta)) * dt;
double delta_th = omega * dt;

x += delta_x;
y += delta_y;
theta += delta_th;

//since all odometry is 6DOF we'll need a quaternion created from yaw
geometry_msgs::Quaternion odom_quat = tf::createQuaternionMsgFromYaw(theta);

odom_trans.header.stamp = current_time;
odom_trans.transform.translation.x = x;
odom_trans.transform.translation.y = y;
odom_trans.transform.translation.z = 0.0;
odom_trans.transform.rotation = odom_quat;

odom_broadcaster.sendTransform(odom_trans);
odom.header.stamp = current_time;

//set the position
if(fabs(x) < 0.0001 ) x=0;
if(fabs(y) < 0.0001 ) y=0;
odom.pose.pose.position.x = x;
odom.pose.pose.position.y = y;
odom.pose.pose.position.z = 0.0;
odom.pose.pose.orientation = odom_quat;

odom.twist.twist.linear.x = vx;
odom.twist.twist.linear.y = vy;
odom.twist.twist.angular.z = omega;

odomPubPtr->publish(odom);
ROS_INFO_ONCE(" 开始发布里程计信息 ");
last_time = current_time;
}

int main(int argc, char** argv)
{
ros::init(argc, argv, "odometry_publisher");

ros::NodeHandle n;
setlocale(LC_ALL,"");

last_time = ros::Time::now();
odom_trans.header.frame_id = "odom";
odom_trans.child_frame_id = "base_link";

odom.header.frame_id = "odom";
odom.child_frame_id = "base_link";

ros::Subscriber sub = n.subscribe<geometry_msgs::Twist>("velocity",20,callback);
ros::Publisher odom_pub = n.advertise<nav_msgs::Odometry>("odom", 50);
odomPubPtr = &odom_pub;
ros::spin();
}

这里的里程计频率就由话题velocity的频率决定了,因为是订阅建立后阻塞了,回调函数就一直执行

如果像这样写,每次都要通过网络获取话题的消息,会明显降低里程计的频率

1
2
3
4
5
6
7
8
9
10
boost::shared_ptr<geometry_msgs::Twist const> velEdge;    // 必须有const
velEdge = ros::topic::waitForMessage<geometry_msgs::Twist>("velocity",ros::Duration(1));
if(velEdge != NULL)
{
velocity = *velEdge;
vx = velocity.linear.x;
omega = velocity.angular.z;
}
else
ROS_WARN("linear and angular velocity not received !");

参考:里程计的信息及对其标定
tf between /map and /odom is not static?


容器中的iterator

先看vector的部分源码,只看变量声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
template <class T, class Alloc = alloc>  // 预设使用 alloc 为配置器
class vector
{
public:
typedef T value_type;
typedef value_type* pointer;
typedef value_type* iterator;
typedef const value_type* const_pointer;
typedef const value_type* const_iterator;
typedef value_type& reference;
typedef const value_type& const_reference;
...
}

容器的迭代器iterator其实是类模板,不是一个指针,但实现了指针的功能,不能赋值为NULL,也不能与NULL比较。迭代器采用了iterator模式,类似于数据库中的游标,它屏蔽了底层存储空间的不连续性,在上层使容器元素维持一种连续的假象,迭代器代表的是元素在容器中的相对位置。

如果迭代器iterator要初始化或者做空值比较,需要使用end(),它默认指向容器最后一个元素的下一个位置,可以理解为空.如果非要对iterator进行初始化的话,可以将iterator指向end()函数即可。 判断是否为空即与end()函数做比较即可

看下面代码:

1
2
3
4
5
6
7
8
9
10
std::vector<int> vec;
vec.push_back(1);
vec.push_back(3);
vec.push_back(5);
vec.push_back(7);
std::vector<int>::iterator it= vec.end();
qDebug()<<*(--it); // 像游标
it++;
if(it == vec.end())
qDebug()<<"vec end";

运行结果为:

1
2
7
vec end

迭代器失效

迭代器指向容器中的某个或某些元素的存储发生了变化,使之成为无效迭代器,使用无效迭代器和无效指针一样危险。

解决迭代器失效问题有两种方式:

  1. 使用迭代器时重新获取迭代器

  2. 在修改容器前为其预留足够的空闲空间,以避免存储空间重分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
vector<int> l;
l.push_back(1);
l.push_back(2);
l.push_back(3);
l.push_back(4);
l.push_back(5);
l.push_back(6);

vector<int>::iterator it = l.begin();
vector<int>::iterator end = l.end();
Print(l);
cout<<"size: "<<l.size()<<" capacity: "<<l.capacity()<<endl;  // 6 8
cout<< "begin: "<<*it <<endl; // 1
cout<< "end: "<< *(--end) <<endl; // 6
// 再插入元素
l.push_back(7);
l.push_back(8);
// l.push_back(9);

Print(l);
cout<<"size: "<<l.size()<<" capacity: "<<l.capacity()<<endl; // 8 8
cout<<"after push_buck begin: "<< *it <<endl;
cout<<"after push_buck end: "<< *(end) <<endl; // 6

运行结果:

1
2
3
4
5
6
7
8
9
1 2 3 4 5 6 
size: 6 capacity: 8
begin: 1
end: 6

1 2 3 4 5 6 7 8
size: 8 capacity: 8
after push_buck begin: 1
after push_buck end: 6

可以看出进行push_back后,end返回的迭代器肯定是失效的,容器内部没有对其进行更新,erase也是同样结果.

上面代码加上l.push_back(9);后,运行结果成了这样:

1
2
3
4
5
6
7
8
1 2 3 4 5 6 
size: 6 capacity: 8
begin: 1
end: 6
1 2 3 4 5 6 7 8 9
size: 9 capacity: 16
after push_buck begin: 0
after push_buck end: 6

我们往vector插入6个元素,size就是6,但vector分配的容量要稍大一些.如果再插入3个元素,就超出了容量,vector会先分配一个2倍大的内存,然后把原来的元素都拷贝过来,释放原内存,这样迭代器就像悬垂指针一样失效了.当删除容器内的数据时,其存储空间不释放

有些资料说重新分配内存是增加原内存的一半,估计是STL版本和编译器版本不同的缘故,我测试的vector的容量都是2的次方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

vector<int> l;
l.push_back(1);
l.push_back(2);
l.push_back(3);
l.push_back(4);
l.push_back(5);
l.push_back(6);

vector<int>::iterator it = l.begin();
vector<int>::iterator end = l.end();
Print(l);
cout<<"size: "<<l.size()<<" capacity: "<<l.capacity()<<endl;
cout<< "begin: "<<*it <<endl;
cout<< "end: "<< *(--end) <<endl<<endl;

l.erase(++it);
l.erase(++it);

Print(l);
cout<<"size: "<<l.size()<<" capacity: "<<l.capacity()<<endl;
cout<<"after erase begin: "<< *it <<endl;
cout<<"after erase end: "<< *(end) <<endl;

运行结果:

1
2
3
4
5
6
7
8
9
1 2 3 4 5 6 
size: 6 capacity: 8
begin: 1
end: 6

1 3 5 6
size: 4 capacity: 8
after erase begin: 5
after erase end: 6

显然begin是失效了

  • vector的push_back操作可能没事,但是一旦引发内存重分配,所有迭代器都会失效
  • vector的insert操作插入点之后的所有迭代器失效,但一旦发生内存重分配,所有迭代器都会失效
  • vector的erase操作插入点之后的所有迭代器失效
  • vector的reserve操作所有迭代器失效(导致内存重分配)

对于序列式容器(如vector,deque),删除当前的iterator会使后面所有元素的iterator都失效。这是因为vetor,deque使用了连续分配的内存,删除一个元素导致后面所有的元素会向前移动一个位置。但是erase方法可以返回下一个有效的iterator

  1. 对于节点式容器(map, list, set)元素的删除,插入操作会导致指向该元素的迭代器失效,其他元素迭代器不受影响;
  2. 对于顺序式容器(vector,string,deque)元素的删除、插入操作会导致指向该元素以及后面的元素的迭代器失效

迭代器的自增减

迭代器可以一直自增或自减,可以超出begin()end()

1
2
3
4
5
6
7
8
9
10
11
vector<int> poses;
poses.push_back(1);

auto iter = poses.end();
iter++;
iter++;
iter++;
cout << " iter - begin: " << iter - poses.begin() << endl <<
" iter - end: " << iter - poses.end() << endl;
cout << "poses size: " << poses.size() << endl;
cout << "iter pose: " << *iter << endl; // 不确定的结果

运行结果:
1
2
3
4
iter - begin: 4
iter - end: 3
poses size: 1
iter pose: 0

超出范围的迭代器,不要取对应的元素值,否则就是bug,最好加判断 if(iter - poses.end() >0)


Boost教程(五)bind,function和signal

bind

使用boost实现回调函数常常用的是bind,bind第一个参数可以接受函数对象、函数指针(包括成员函数),bind最多可以接受9个参数,返回一个函数对象,具有operator(),而且类型可以自动推导。对于类的成员函数,需要传入一个类的对象(或引用、指针),这样就剩8个参数可用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Demo
{
public:
int add(int a, int b) {return a+b; }
static int add_2(int a, int b) {return a+b;}
};

int add_3(int a, int b) {return a+b;}

int main()
{
Demo d;
int a = 1, b=4;
// 类成员函数,d做对象,加&是函数指针
boost::function<int(int,int)> func = boost::bind(&Demo::add, d, a, b);
// 静态成员函数,不用&
boost::function<int(int,int)> func_2 = boost::bind(Demo::add_2, _1, _2);
// 普通函数
boost::function<int(int,int)> func_3 = boost::bind(&add_3, _1, _2);
// 返回函数对象,有operator ()
cout<<"add: "<<func(a,b) <<endl;
cout<<"static add: "<<func_2(a,b) <<endl;
cout<<"function add: "<<func_3(a,b) <<endl;
return 0;
}

这里的func_2,func_3中的bind用了占位符的形式,在ROS中有一个很重要的应用就是在main函数中给变量赋值,在话题订阅的回调函数中,可以获得此变量的值,用的就是bind,而且用到占位符,在回调执行时才传入值。参考:如何让回调函数与main函数互相传值

function

function可以理解为函数对象的容器,也像一种智能的函数指针,可以配合bind使用,作为bind表达式的结果,可以用于回调函数机制,而且具有很大的弹性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int add(int a, int b)
{
return a+b;
}

class Demo
{
public:
int add_2(int a, int b)
{
return a+b;
}
};

int main()
{
boost::function<int(int, int)> func, func_2;
func = add; // 函数对象
cout << sizeof(func) <<endl; // 32
if(func) // 像智能指针
cout<< func(4, 2) <<endl;
func.clear(); // 清空函数对象,这次像容器,也可以用func=0,这像指针
assert(func.empty()); // 是否为空,也是像容器的特性

// 对于类成员函数,只能经过bind
int a=3, b=1;
Demo d;
func_2 = boost::bind(&Demo::add_2, d, a, b);
cout<< func_2( a, b)<<endl;

return 0;
}

调用空的function对象会抛出bad_function_call异常,所以在使用function前最好检测有效性,就像指针一样。从function的一些成员函数来看,它确实既像容器又像指针。

两个function对象不能使用==和!=比较,这是特意的。function对象体积比一般函数指针大,速度慢一点。

signal

boost的signal和Qt的signal道理是一样的,都是观察者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <boost/signals2.hpp>
void slot1(int n)
{
cout<<"slot1: "<<n<<endl;
}

void slot2(int n)
{
cout<<"slot2: "<<n*2<<endl;
}

int main()
{
signals2::signal<void(int)> sig; // 模板与槽函数的声明对应
sig.connect(&slot1);
sig.connect(&slot2, signals2::at_front); // slot2函数放到前面,默认是插到
// sig.disconnect_all_slots(); // 解绑所有槽函数
sig(7); // 触发信号,调用槽函数
return 0;
}

运行结果:

1
2
slot2: 14
slot1: 7


move_base分析(一)使用GetPlan获得路径规划

ROS导航模块的框架如下:

要获得路径规划需要用到move_base中的一个服务make_plan,它就是用于获得从当前位姿到目标位姿的路径,其服务类型为nav_msgs/GetPlan

nav_msgs/GetPlan.srv成员如下:

1
2
3
4
5
6
7
geometry_msgs/PoseStamped start	  // 起始位姿
geometry_msgs/PoseStamped goal // 终点位姿

# 如果目标被阻挡了,规划失败时,规划器能到达的位置在x和y方向约束多少米
float32 tolerance
---
nav_msgs/Path plan // 服务端返回的路径

其中nav_msgs/Path是一个位姿数组,代表了机器人跟随的路径,定义:

1
2
Header header
geometry_msgs/PoseStamped[] poses

其实思路很简单,写一个service的客户端程序,通信move_base的get_plan,service的请求部分就是起始点的位姿, header.frame_id永远是map,返回值是路径,也是位姿数组。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//路线规划代码
#include <ros/ros.h>
#include <nav_msgs/GetPlan.h>
#include <geometry_msgs/PoseStamped.h>
#include <string>
#include <boost/foreach.hpp>
#define forEach BOOST_FOREACH // c++没有foreach

void fillPathRequest(nav_msgs::GetPlan::Request &request)
{
request.start.header.frame_id ="map";
request.start.pose.position.x = 12.378;//初始位置x坐标
request.start.pose.position.y = 28.638;//初始位置y坐标
request.start.pose.orientation.w = 1.0;//方向
request.goal.header.frame_id = "map";
request.goal.pose.position.x = 18.792;//终点坐标
request.goal.pose.position.y = 29.544;
request.goal.pose.orientation.w = 1.0;
request.tolerance = 0.5; //如果不能到达目标,最近可到的约束
}
//路线规划结果回调
void callPlanningService(ros::ServiceClient &serviceClient, nav_msgs::GetPlan &srv)
{
//执行实际路径规划器
if (serviceClient.call(srv))
{
//srv.response.plan.poses 为保存结果的容器,遍历取出
if (!srv.response.plan.poses.empty()) // empty()判断
{
forEach(const geometry_msgs::PoseStamped &p, srv.response.plan.poses)
{
// 一系列点的位姿
ROS_INFO("x = %f, y = %f", p.pose.position.x, p.pose.position.y);
}
}
else
{
ROS_WARN("Got empty plan");
}
}
else {
ROS_ERROR("Failed to call service %s - is the robot moving?",
serviceClient.getService().c_str());
}
}

int main(int argc, char** argv)
{
ros::init(argc, argv, "make_plan_node");
ros::NodeHandle nh;
// Init service query for make plan
//初始化路径规划服务,服务名称为"move_base_node/make_plan"
std::string service_name = "move_base_node/make_plan";
//等待服务空闲,如果已经在运行这个服务,会等到运行结束。
while (!ros::service::waitForService(service_name, ros::Duration(3.0))) {
ROS_INFO("Waiting for service move_base/make_plan to become available");
}
// 服务的客户端,类型就是 nav_msgs::GetPlan
ros::ServiceClient serviceClient = nh.serviceClient<nav_msgs::GetPlan>(service_name, true);
if (!serviceClient) {
ROS_FATAL("Could not initialize get plan service from %s",
serviceClient.getService().c_str());
return -1;
}
nav_msgs::GetPlan srv;
//请求服务:规划路线
fillPathRequest(srv.request);
if (!serviceClient) {
ROS_FATAL("Persistent service connection to %s failed",
serviceClient.getService().c_str());
return -1;
}
ROS_INFO("conntect to %s",serviceClient.getService().c_str());
callPlanningService(serviceClient, srv);
}

有时路径规划失败会报错:


ROS话题通信原理

发布者的代码一般是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ros::init(argc,argv, "Pub");
ros::NodeHandle nh;
ros::Publisher pub = nh.advertise<std_msgs::Int8>("topic", 50, false);
ros::Rate rate(1);

std_msgs::Int8 msg;

srand(time(NULL));

while(ros::ok())
{
msg.data = rand()%100;
pub.publish(msg);
ROS_INFO("msg:%d",msg.data);

ros::spinOnce();
rate.sleep();
}

注册话题

advertise函数的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Publisher NodeHandle::advertise(AdvertiseOptions& ops)
{
ops.topic = resolveName(ops.topic);
if (ops.callback_queue == 0)
{
if (callback_queue_)
{
ops.callback_queue = callback_queue_;
}
else // 实际执行这里
{
// spinOnce() 就是 g_global_queue->callAvailable(ros::WallDuration());
ops.callback_queue = getGlobalCallbackQueue();
}
}
SubscriberCallbacksPtr callbacks(boost::make_shared<SubscriberCallbacks>(ops.connect_cb, ops.disconnect_cb, ops.tracked_object, ops.callback_queue));

if (TopicManager::instance()->advertise(ops, callbacks))
{
Publisher pub(ops.topic, ops.md5sum, ops.datatype, *this, callbacks);
{
boost::mutex::scoped_lock lock(collection_->mutex_);
// `collection_`在NodeHandle构造函数中初始化为0,也就是空指针,在NodeHandle::construct中分配内存
// impl_是boost::shared_ptr< Impl >, pubs_ 是std::vector< Publisher::ImplWPtr >
collection_->pubs_.push_back(pub.impl_);
}
return pub; // 执行这里
}
return Publisher();
}

NodeHandle::advertise的流程.png

NodeHandle部分成员:

1
2
3
4
5
6
7
class NodeHandle
{

private:
CallbackQueueInterface * callback_queue_;
NodeHandleBackingCollection * collection_;
};

collection_在NodeHandle构造函数中初始化为0,也就是空指针。其类型如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class NodeHandleBackingCollection
{
public:
typedef std::vector< Publisher::ImplWPtr > V_PubImpl;
typedef std::vector< ServiceClient::ImplWPtr > V_SrvCImpl;
typedef std::vector< ServiceServer::ImplWPtr > V_SrvImpl;
typedef std::vector< Subscriber::ImplWPtr > V_SubImpl;

public:
ServiceManagerPtr keep_alive_service_manager = ServiceManager::instance()
TopicManagerPtr keep_alive_topic_manager = TopicManager::instance()

boost::mutex mutex_;
V_PubImpl pubs_;
V_SrvCImpl srv_cs_;
V_SrvImpl srvs_;
V_SubImpl subs_;
};


//一个简单的广播话题的函数, 至少传入两个参数,topic:话题名字,必须独一无二,不能重复, queue_size:发送到subscriber的消息缓存数量
template <class M>
Publisher advertise(const std::string& topic, uint32_t queue_size, bool latch = false){
//话题广播时的配置选项
AdvertiseOptions ops;
    //n.advertise<std_msgs::String>("topic_m",1000)语句运行时<M>为: <std_msgs::String>
    //看不懂, 貌似在调用对象ops的成员函数init(),估计类似于opt.init<M>(topic, queue_size);
ops.template init<M>(topic, queue_size);
ops.latch = latch;
return advertise(ops);//又调用了下面的advertise()函数
}

AdvertiseOptions定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*  AdvertiseOptions是一个类,在/opt/ros/kinetic/include/ros/advertise_options.h
* \brief Constructor
* \param _topic Topic to publish on
* \param _queue_size Maximum number of outgoing messages to be queued for delivery to subscribers
* \param _md5sum The md5sum of the message datatype published on this topic
* \param _datatype Datatype of the message published on this topic (eg. "std_msgs/String")
* \param _connect_cb Function to call when a subscriber connects to this topic
* \param _disconnect_cb Function to call when a subscriber disconnects from this topic
*/
AdvertiseOptions(const std::string& _topic, uint32_t _queue_size, const std::string& _md5sum,
const std::string& _datatype, const std::string& _message_definition,
const SubscriberStatusCallback& _connect_cb = SubscriberStatusCallback(),
const SubscriberStatusCallback& _disconnect_cb = SubscriberStatusCallback())
: topic(_topic)
, queue_size(_queue_size)
, md5sum(_md5sum)
, datatype(_datatype)
, message_definition(_message_definition)
, connect_cb(_connect_cb)
, disconnect_cb(_disconnect_cb)
, callback_queue(0)
, latch(false)
, has_header(false)
{}

发布话题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template <typename M>
void publish(const M& message) const
{
using namespace serialization;
namespace mt = ros::message_traits;

if (!impl_)
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher");
return;
}

if (!impl_->isValid())
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
// 验证消息和发布者的类型是否对应
ROS_ASSERT_MSG(impl_->md5sum_ == "*" || std::string(mt::md5sum<M>(message)) == "*" || impl_->md5sum_ == mt::md5sum<M>(message),
"Trying to publish message of type [%s/%s] on a publisher with type [%s/%s]",
mt::datatype<M>(message), mt::md5sum<M>(message),
impl_->datatype_.c_str(), impl_->md5sum_.c_str());

SerializedMessage m;
publish(boost::bind(serializeMessage<M>, boost::ref(message)), m);
}

接着再调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Publisher::publish(const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) const
{
// 验证publisher合法性
if (!impl_)
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
if (!impl_->isValid())
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
TopicManager::instance()->publish(impl_->topic_, serfunc, m);
}

然后是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
{
boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
if (isShuttingDown())
{
return;
}

PublicationPtr p = lookupPublicationWithoutLock(topic);
if (p->hasSubscribers() || p->isLatching())
{
ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());

// Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can do a no-copy publish.
bool nocopy = false;
bool serialize = false;
// We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
if (m.type_info && m.message)
p->getPublishTypes(serialize, nocopy, *m.type_info);
else
serialize = true;

if (!nocopy)
{
m.message.reset();
m.type_info = 0;
}
if (serialize || p->isLatching())
{
SerializedMessage m2 = serfunc();
m.buf = m2.buf;
m.num_bytes = m2.num_bytes;
m.message_start = m2.message_start;
}
p->publish(m);

// If we're not doing a serialized publish we don't need to signal the pollset. The write()
// call inside signal() is actually relatively expensive when doing a nocopy publish.
if (serialize)
{
poll_manager_->getPollSet().signal();
}
}
else
{
p->incrementSequence();
}
}


探索ROS中的XML-RPC机制(二)客户端

HelloClient

客户端程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include "xmlrpcpp/XmlRpc.h"
#include <iostream>
using namespace XmlRpc;

int main(int argc, char* argv[])
{
if (argc != 3) {
std::cerr << "Usage: HelloClient serverHost serverPort\n";
return -1;
}
int port = atoi(argv[2]);
XmlRpc::setVerbosity(5);

// Use introspection API to look up the supported methods
XmlRpcClient c(argv[1], port);
XmlRpcValue noArg, result;
for (int i = 0; i < 2000; i++)
{
if (c.execute("system.listMethods", noArg, result))
std::cout << "\nMethods:\n " << result << "\n\n";
else
std::cout << "Error calling 'listMethods'\n\n";
}
// Call the HelloName method
oneArg[0] = "Chris";
if (c.execute("HelloName", oneArg, result))
std::cout << result << "\n\n";
else
std::cout << "Error calling 'HelloName'\n\n";

return 0;
}

看完服务端,客户端程序就更简单了,XmlRpcClient的构造函数是XmlRpcClient(const char* host, int port, const char* uri=0);,第一个参数是运行服务端程序的远方主机的IP(Server在本机时可用localhost),第二个是端口。

看类XmlRpcServer会发现自带了两个系统methods:system.listMethods和system.methodHelp,各对应一个类,我们可以用客户端直接调用这两个method。

在执行HelloName时需要使用另一个重要的类XmlRpcValue,对若干基本的C++的数据类型进行了封装,涉及的主要数据类型包括如下几个:

1
2
3
4
5
6
7
8
bool          asBool;
int asInt;
double asDouble;
struct tm* asTime;
std::string* asString;
BinaryData* asBinary; //typedef std::vector<char> BinaryData;
ValueArray* asArray; //typedef std::vector<XmlRpcValue> ValueArray;
ValueStruct* asStruct; //typedef std::map<std::string, XmlRpcValue> ValueStruct;

几个重要函数:

1
2
3
4
5
6
7
8
9
bool fromXml(std::string const& valueXml, int* offset);
std::string toXml() const;

bool valid() const
Type const &getType() const
// XML解码等一系列函数
bool boolFromXml(std::string const& valueXml, int* offset);
// XML编码等一系列函数
std::string boolToXml() const;


execute函数

执行远程服务端的方法,参数含义一目了然。如果请求发送成功且受到结果,函数返回true。
这是个同步(阻塞)的实现,直到客户端收到回复或错误才返回,否则一直阻塞。使用isFault()函数判断结果是否错误。

大致说一下函数的流程,先是执行setupConnection,与服务端建立连接.然后是生成要发送的XML文本在函数XmlRpcClient::generateRequest,处理了method和参数值.

客户端执行execute后,事件分发器执行XmlRpcClient::handleEvent,处理服务端的回复,主要调用下面三个函数:

  • 首先是XmlRpcClient::writeRequest(),日志从attempt 1 到 XmlRpcClient::writeRequest: wrote 258 of 258 bytes
  • 然后是XmlRpcClient::readHeader(),日志一直到 client read conten length: 122
  • 最后是XmlRpcClient::readResponse(),日志从XmlRpcClient::readResponse (read 122 bytes) 到 response的XML文本

现在回到execute函数,打出日志XmlRpcClient::execute: method HelloName completed

服务端回复的结果赋给result是在XmlRpcClient::parseResponse,这个函数里解析XML文本取出结果.

最后调用XmlRpcClient的析构函数,实际就是close()




整个通信过程中,客户端发的请求是:

1
2
3
4
5
6
7
8
9
POST /RPC2 HTTP/1.1                 # HTTP 1.1默认进行持久连接 Keep-Alive
User-Agent: XMLRPC++ 0.7 # 客户端浏览器类别
Host: 192.168.10.14:8888
Content-Type: text/xml # 这次类型是text/xml
Content-length: 138 # 发送请求的长度

<?xml version="1.0"?>
<methodCall><methodName>HelloName</methodName>
<params><param><value>Chris</value></param></params></methodCall>

服务端回复的是:

1
2
3
4
5
6
7
8
9
HTTP/1.1 200 OK
Server: XMLRPC++ 0.7 # web服务器软件名称
Content-Type: text/xml
Content-length: 122 # 响应体的长度

<?xml version="1.0"?>
<methodResponse><params><param>
<value>Hello, Chris</value>
</param></params></methodResponse>


解读NodeHandle源码(二)ros start()函数

开始简单的几行如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
boost::mutex::scoped_lock lock(g_start_mutex);
if (g_started) //如果节点已经运行,返回
{
return;
}

g_shutdown_requested = false;
g_shutting_down = false;
g_started = true;
g_ok = true;

bool enable_debug = false;
std::string enable_debug_env;
// 获得环境变量,是否DEBUG
if ( get_environment_variable(enable_debug_env,"ROSCPP_ENABLE_DEBUG") )
{
try
{
enable_debug = boost::lexical_cast<bool>(enable_debug_env.c_str());
}
catch (boost::bad_lexical_cast&)
{
}
}

下面是重要的一句:XMLRPCManager::instance()->bind("shutdown", shutdownCallback);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool XMLRPCManager::bind(const std::string& function_name, const XMLRPCFunc& cb)
{
boost::mutex::scoped_lock lock(functions_mutex_);
if (functions_.find(function_name) != functions_.end())
{
return false;
}

FunctionInfo info;
info.name = function_name;
info.function = cb;
info.wrapper.reset(new XMLRPCCallWrapper(function_name, cb, &server_));
functions_[function_name] = info;

return true;
}

几个变量的定义:

1
2
3
4
5
6
7
8
9
10
typedef boost::shared_ptr<XMLRPCCallWrapper> XMLRPCCallWrapperPtr;
struct FunctionInfo
{
std::string name;
XMLRPCFunc function;
XMLRPCCallWrapperPtr wrapper;
};
typedef std::map<std::string, FunctionInfo> M_StringToFuncInfo;
boost::mutex functions_mutex_;
M_StringToFuncInfo functions_;

关键的XMLRPCCallWrapper其实是XML-RPC服务端的子类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class XMLRPCCallWrapper : public XmlRpcServerMethod
{
public:
XMLRPCCallWrapper(const std::string& function_name, const XMLRPCFunc& cb, XmlRpcServer *s)
: XmlRpcServerMethod(function_name, s)
, name_(function_name)
, func_(cb)
{ }

void execute(XmlRpcValue &params, XmlRpcValue &result)
{
func_(params, result);
}

private:
std::string name_;
XMLRPCFunc func_;
};