激光雷达的配置和ROS消息

配置雷达

我用的雷达是sick LMS511,配置步骤可以参考SICK激光雷达LMS511笔记

连接直流电源24V,将雷达网线连接到无线路由器。准备一个windows电脑,连接到雷达的同一个无线网,开启雷达配送的调试工具Sick sensor intellingence,搜索设备获得雷达,记住IP:192.168.10.100,双击设备图标进入配置页面。

在另一台Ubuntu电脑上进行驱动配置,也是连到同一个无线网,先测试能否ping通,如不能则按上面步骤配置。

使用驱动lms511_laser_node,下载后放到某工作空间中,先在lms5xx_node.cpp中,在ip_add修改雷达的IP,然后使用catkin_make --pkg laser_node编译。

数据表示

激光雷达获取的数据是以以雷达为中心的极坐标系表示, 激光雷达以数据帧的方式返还数据
2020-05-31_105134.png
2020-05-31_105250.png

雷达的消息和参数

消息成员

启动roscore后,用rosrun laser_node laser_node启动驱动程序,会有很多输出信息。
使用命令rosnode list查看运行的节点列表,发现了sicklms5xx

使用rostopic list查看主题,发现了主题scan,信息如下:

其消息类型是sensor_msgs/LaserScan,发布者是sicklidar,其消息成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
Header header            # 时间戳是收到第一束激光的时间
# in frame frame_id, angles are measured around
# the positive Z axis (counterclockwise, if Z is up)
# with zero angle being forward along the x axis
float32 angle_min # 扫描起始角度[rad]
float32 angle_max # 扫描结束角度[rad]
float32 angle_increment # 角分辨率,相邻扫描点的角度增量,[弧度]
float32 time_increment # 每个数据点的时间间隔 [秒]
float32 scan_time # 当前帧数据与下一帧数据的时间间隔 [秒]
float32 range_min # 最小的扫描距离 [m]
float32 range_max # 最大的扫描距离 [m]
float32[] ranges # 各个方向的扫描数据 [m]
float32[] intensities # 雷达的强度数据,思岚有,有些SICK雷达没有

使用命令rostopic echo scan可查看雷达扫描到的数据:

扫描角度

1
2
angle_min: -1.65806281567
angle_max: 1.65806281567

经过换算,其实就是-95°~95°,而雷达的扫描范围是190°,显然扫描范围是个对称的扇形。雷达角度范围是-1.658~1.658, 角分辨率是0.00872, 这样算出来大概是380, 也就是380个激光束射出。

雷达有旋转方向的问题,大部分雷达都是逆时针旋转,这和ROS中的习惯一致,少数雷达是顺时针,这就会对建图和定位产生问题。比如EAI YDLIDAR G4既可正转也可反转。

判断angle_minangle_max的大小,就能知道雷达扫描的方向,因为雷达的扫描角度都是从angle_min到angle_max变化的。如果angle_min<angle_max,就是逆时针。这样雷达数据从极坐标系转换到笛卡尔右手系时,才能使用公式angle = angle_min + (i * angle_increment)。某些雷达已经在驱动里处理过了,即使顺时针旋转,也可以直接用此公式。

换句话说,angle_min应该叫开始角度angle_max应该叫结束角度

扫描距离

激光传感器在每次扫描都会返回几百个数据,这就是ranges数组。每秒钟还要扫描很多次,这是由scan_time描述,也就是每秒扫出一个扇形(一帧),对每个扫描束都必须处理一个ray-tracing操作.

需要保证range数组中元素的存储顺序与扫描到顺序一致:

  • ranges数组中第一个元素存的scan中第一个扫描到点
  • ranges数组中最后一个元素存的scan中最后一个扫描到的点

如果我们有个rosbag文件,使用rosbag info可以看到这样的结果:
scan消息.png
这里面有360个msg,也就是雷达扫描了360次,注意这不是ranges数组的大小

ranges是个float32类型数组,但不同雷达的ranges大小不同,获得数组大小的方法是 rostopic echo --noarr /scan/ranges,会不断显示<array type: float32, length: 381>,可以看到ranges的大小是381,和激光束的数量基本对应。

A2的量程是0.15-12/18,还是基于白色高反光物体测得。 使用A2时,不能离障碍物太近,否则会出现ranges数组很多个0的情况,这一点在导航时要注意

LMS511最大检测81m,所以说适合室外。这个参数一般是在良好的反射率情况下得到的,对黑色物体的检测量程没这么大。


有时的ranges会出现很多inf和nan的情况,这个对不同雷达的处理可能不同,对于A2,可以看一下源码是这样的:

1
2
if (read_value == 0.0)
scan_msg.ranges[node_count-1-i] = std::numeric_limits<float>::infinity();

也就是读取距离为0时,输出inf

对于北洋雷达,距离超过测量范围时候的两种情况是INF,其余由于强度或其他原因导致的数据错误会输出。

强度

雷达数据强度是反映生成某点的激光雷达脉冲回波强度的一种测量指标(针对每个点而采集)。 雷达强度数据跟激光的反射情况有关,如果射向黑色物体,可能不会有反馈 数组intensities与ranges中的数据是一一对应的,描述的是每个测量数据的强度。

雷达在室外跟室内区别, 基本的点就是光照条件的区别

问题

  • 启动地图后,移动机器人,雷达轨迹跟随移动

先看雷达安装朝上还是朝下,程序中要保持一致。如果没问题,把所有程序关闭再启动,一般会恢复正常

  • 启动雷达,雷达未分配到IP,导致无法使用;运行雷达节点时,突然报错,雷达无法扫描到数据,然后雷达IP丢失

  1. 雷达扫描的结果比实际偏一定角度

偏移的scan.png
正常scan.png
再三试验,应当是rviz视角问题

参考:
Turtlebot3中级教程-应用-自动泊车
EAI的YDLIDAR-X4激光雷达在ROS下使用方法


传递数组给函数

数组不能赋值给数组

1
2
3
int a[3] = {1,2,3};
int b[3];
b=a;

编译器报错, 数组不能直接赋值, 可以使用std::copy()或手工循环赋值, 但是不可以直接把一个数组赋值给另外一个数组.

但是std::array和std::vector是可以的
1
2
std::array<int, 5> a = {1,2,3,4,5};
std::array<int, 5> b = a;


形式参数是一个已定义大小的数组:

1
2
void myFunction(int param[10])
{}

形式参数是一个未定义大小的数组:

1
2
void myFunction(int param[])
{}

函数而言,数组的长度是无关紧要的,因为 C++ 不会对形式参数执行边界检查。

二维数组做形参

1
2
3
4
5
6
7
8
9
10
void func1(int  iArray[][10])
{

}

int main()
{
int array[10][10];
func1(array);
}

形参声明一定要给出第二个维度的大小,否则编译不过。


轮速里程计

里程计是衡量我们从初始位姿到终点位姿的一个标准,通俗的说,要实现机器人的定位与导航,就需要知道机器人行进了多少距离,是往哪个方向行进的, odom的原点是机器人启动的位置。

里程计一个很重要的特性,是它只关心局部时间上的运动,多数时候是指两个时刻间的运动。当我们以某种间隔对时间进行采样时,就可估计运动物体在各时间间隔之内的运动。由于这个估计受噪声影响,先前时刻的估计误差,会累加到后面时间的运动之上

odom坐标系是在机器人运动前以当前观测到的数据建立的临时的坐标系,避免远离map原点而造成误差。比如轮子的编码器或者视觉里程计算法或者陀螺仪和加速度计。odom坐标系原则上是固定的,但可能会随着机器人的运动漂移,这就是累计误差,漂移导致odom不是一个很有用的长期的全局坐标,但是一个有用的短期坐标。在odom坐标系下机器人的位置必须是连续变化的,不能有突变和跳跃,比如人不能把机器人搬到其他地方。

里程计中也存在线速度角速度,但是这个线速度角速度是通过距离除以时间得到的,没有IMU的精确.

里程计的发布频率不能低,move_base里面默认订阅里程计的频率是20Hz,所以里程计至少也得20Hz,目前机器人的为50Hz

运动模型

机器人在世界坐标系中的位置.png
机器人的位置X,Y可以看做车在极小时间内的位置增量在X,Y方向的分量的积分。我们采用速度推算方式确定机器人的轨迹,在极小时间Δt内,以速度V移动的距离是Δd,分解到两坐标轴可以得到

这样不断累积,就可以得到任意时间的位置:
2020-03-21_094244.png

  • Δt是我们自己取的,比如10ms,可以在ROS中使用ros::Rate实现

  • V和ω并不是我们指定的速度,而是由指定速度和角速度根据航迹推演之后,分配到两个驱动轮上的速度Vl和Vr,再从硬件层根据Kinematics返回的速度Vx和ω。由于驱动轮不是全向轮,所以Vy是0

  • 至于偏航角,有两种方法获取:编码器和IMU。编码器很依赖精度,效果不如IMU。使用IMU的话,θ就是其返回的偏航角Yaw,IMU上电时的Yaw为0,与小车航向一致,只是还要根据公式转换到(-π,π]

tf关系

用到的tf父子关系为: map–>odometry–>base_link ,随着机器人的移动,我们需要生成机器人的里程计信息,如此可以提供机器在map中大致的位置,一般在一个节点里发出odometry->base_link的tf广播,它们的转换信息是每时每刻都在发布的。

odom的原点是机器人启动的位置,因为每次机器人启动的位置不同,所以一开始odom在map上的位置或转换矩阵是未知的,但是AMCL可以根据最佳粒子的位置推算出map->odom(就是说通过最佳粒子推算出机器人在地图的中的位置)的tf转换信息并发布到tf主题上,这样三者的tf父子关系就建立起来了,我们也就知道了base_linkmap中的坐标。

不管机器人在哪启动,odom和map坐标系之间的tf理论上是不变的,发生的改变就是里程计的累计误差

一般来说map坐标系的坐标是通过传感器的信息不断的计算更新而来。比如激光雷达,视觉定位等等。因此能够有效的减少累积误差,但是也导致每次坐标更新可能会产生跳跃。map坐标系是一个很有用的长期全局坐标系。但是由于坐标会跳跃改变,这是一个比较差的局部坐标系(不适合用于避障和局部操作)。

map->odom变换代表了对里程计误差的纠正

odom话题

odom消息成员:

1
2
3
4
std_msgs/Header header
string child_frame_id
geometry_msgs/PoseWithCovariance pose
geometry_msgs/TwistWithCovariance twist

里程计向局部路径规划器传递当前机器人的速度,消息中的速度也在TrajectoryPlannerROS对象中的代价地图包含的robot_base_frame参数表示的坐标系中,一般就是base_link,这个参数是在全局代价地图的yaml文件中。



在rviz中添加Odometry,最好去掉它的Covariance项后面的勾,也就是在Odometry显示中不启用Covariance信息,否则将导致显示效果很难看

代码如下,对不同的机器人,获取线速度和角速度的方法不同,这里没有用到IMU

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include <ros/ros.h>
#include <tf/transform_broadcaster.h>
#include <nav_msgs/Odometry.h>
#include <boost/shared_ptr.hpp>
#include <geometry_msgs/Twist.h>

static ros::Publisher *odomPubPtr;
double x = 0;
double y = 0;
double theta = 0;
ros::Time current_time;
ros::Time last_time;

geometry_msgs::TransformStamped odom_trans;
nav_msgs::Odometry odom;

void callback(const geometry_msgs::TwistConstPtr& velocity)
{
static tf::TransformBroadcaster odom_broadcaster;
double vx, vy, omega;

current_time = ros::Time::now();

vx = velocity->linear.x;
omega = velocity->angular.z;
//compute odometry in a typical way given the velocities of the robot
double dt = (current_time - last_time).toSec();
double delta_x = (vx * cos(theta) - vy * sin(theta)) * dt;
double delta_y = (vx * sin(theta) + vy * cos(theta)) * dt;
double delta_th = omega * dt;

x += delta_x;
y += delta_y;
theta += delta_th;

//since all odometry is 6DOF we'll need a quaternion created from yaw
geometry_msgs::Quaternion odom_quat = tf::createQuaternionMsgFromYaw(theta);

odom_trans.header.stamp = current_time;
odom_trans.transform.translation.x = x;
odom_trans.transform.translation.y = y;
odom_trans.transform.translation.z = 0.0;
odom_trans.transform.rotation = odom_quat;

odom_broadcaster.sendTransform(odom_trans);
odom.header.stamp = current_time;

//set the position
if(fabs(x) < 0.0001 ) x=0;
if(fabs(y) < 0.0001 ) y=0;
odom.pose.pose.position.x = x;
odom.pose.pose.position.y = y;
odom.pose.pose.position.z = 0.0;
odom.pose.pose.orientation = odom_quat;

odom.twist.twist.linear.x = vx;
odom.twist.twist.linear.y = vy;
odom.twist.twist.angular.z = omega;

odomPubPtr->publish(odom);
ROS_INFO_ONCE(" 开始发布里程计信息 ");
last_time = current_time;
}

int main(int argc, char** argv)
{
ros::init(argc, argv, "odometry_publisher");

ros::NodeHandle n;
setlocale(LC_ALL,"");

last_time = ros::Time::now();
odom_trans.header.frame_id = "odom";
odom_trans.child_frame_id = "base_link";

odom.header.frame_id = "odom";
odom.child_frame_id = "base_link";

ros::Subscriber sub = n.subscribe<geometry_msgs::Twist>("velocity",20,callback);
ros::Publisher odom_pub = n.advertise<nav_msgs::Odometry>("odom", 50);
odomPubPtr = &odom_pub;
ros::spin();
}

这里的里程计频率就由话题velocity的频率决定了,因为是订阅建立后阻塞了,回调函数就一直执行

如果像这样写,每次都要通过网络获取话题的消息,会明显降低里程计的频率

1
2
3
4
5
6
7
8
9
10
boost::shared_ptr<geometry_msgs::Twist const> velEdge;    // 必须有const
velEdge = ros::topic::waitForMessage<geometry_msgs::Twist>("velocity",ros::Duration(1));
if(velEdge != NULL)
{
velocity = *velEdge;
vx = velocity.linear.x;
omega = velocity.angular.z;
}
else
ROS_WARN("linear and angular velocity not received !");

参考:里程计的信息及对其标定
tf between /map and /odom is not static?


容器中的iterator

先看vector的部分源码,只看变量声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
template <class T, class Alloc = alloc>  // 预设使用 alloc 为配置器
class vector
{
public:
typedef T value_type;
typedef value_type* pointer;
typedef value_type* iterator;
typedef const value_type* const_pointer;
typedef const value_type* const_iterator;
typedef value_type& reference;
typedef const value_type& const_reference;
...
}

容器的迭代器iterator其实是类模板,不是一个指针,但实现了指针的功能,不能赋值为NULL,也不能与NULL比较。迭代器采用了iterator模式,类似于数据库中的游标,它屏蔽了底层存储空间的不连续性,在上层使容器元素维持一种连续的假象,迭代器代表的是元素在容器中的相对位置。

如果迭代器iterator要初始化或者做空值比较,需要使用end(),它默认指向容器最后一个元素的下一个位置,可以理解为空.如果非要对iterator进行初始化的话,可以将iterator指向end()函数即可。 判断是否为空即与end()函数做比较即可

看下面代码:

1
2
3
4
5
6
7
8
9
10
std::vector<int> vec;
vec.push_back(1);
vec.push_back(3);
vec.push_back(5);
vec.push_back(7);
std::vector<int>::iterator it= vec.end();
qDebug()<<*(--it); // 像游标
it++;
if(it == vec.end())
qDebug()<<"vec end";

运行结果为:

1
2
7
vec end

迭代器失效

迭代器指向容器中的某个或某些元素的存储发生了变化,使之成为无效迭代器,使用无效迭代器和无效指针一样危险。

解决迭代器失效问题有两种方式:

  1. 使用迭代器时重新获取迭代器

  2. 在修改容器前为其预留足够的空闲空间,以避免存储空间重分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
vector<int> l;
l.push_back(1);
l.push_back(2);
l.push_back(3);
l.push_back(4);
l.push_back(5);
l.push_back(6);

vector<int>::iterator it = l.begin();
vector<int>::iterator end = l.end();
Print(l);
cout<<"size: "<<l.size()<<" capacity: "<<l.capacity()<<endl;  // 6 8
cout<< "begin: "<<*it <<endl; // 1
cout<< "end: "<< *(--end) <<endl; // 6
// 再插入元素
l.push_back(7);
l.push_back(8);
// l.push_back(9);

Print(l);
cout<<"size: "<<l.size()<<" capacity: "<<l.capacity()<<endl; // 8 8
cout<<"after push_buck begin: "<< *it <<endl;
cout<<"after push_buck end: "<< *(end) <<endl; // 6

运行结果:

1
2
3
4
5
6
7
8
9
1 2 3 4 5 6 
size: 6 capacity: 8
begin: 1
end: 6

1 2 3 4 5 6 7 8
size: 8 capacity: 8
after push_buck begin: 1
after push_buck end: 6

可以看出进行push_back后,end返回的迭代器肯定是失效的,容器内部没有对其进行更新,erase也是同样结果.

上面代码加上l.push_back(9);后,运行结果成了这样:

1
2
3
4
5
6
7
8
1 2 3 4 5 6 
size: 6 capacity: 8
begin: 1
end: 6
1 2 3 4 5 6 7 8 9
size: 9 capacity: 16
after push_buck begin: 0
after push_buck end: 6

我们往vector插入6个元素,size就是6,但vector分配的容量要稍大一些.如果再插入3个元素,就超出了容量,vector会先分配一个2倍大的内存,然后把原来的元素都拷贝过来,释放原内存,这样迭代器就像悬垂指针一样失效了.当删除容器内的数据时,其存储空间不释放

有些资料说重新分配内存是增加原内存的一半,估计是STL版本和编译器版本不同的缘故,我测试的vector的容量都是2的次方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

vector<int> l;
l.push_back(1);
l.push_back(2);
l.push_back(3);
l.push_back(4);
l.push_back(5);
l.push_back(6);

vector<int>::iterator it = l.begin();
vector<int>::iterator end = l.end();
Print(l);
cout<<"size: "<<l.size()<<" capacity: "<<l.capacity()<<endl;
cout<< "begin: "<<*it <<endl;
cout<< "end: "<< *(--end) <<endl<<endl;

l.erase(++it);
l.erase(++it);

Print(l);
cout<<"size: "<<l.size()<<" capacity: "<<l.capacity()<<endl;
cout<<"after erase begin: "<< *it <<endl;
cout<<"after erase end: "<< *(end) <<endl;

运行结果:

1
2
3
4
5
6
7
8
9
1 2 3 4 5 6 
size: 6 capacity: 8
begin: 1
end: 6

1 3 5 6
size: 4 capacity: 8
after erase begin: 5
after erase end: 6

显然begin是失效了

  • vector的push_back操作可能没事,但是一旦引发内存重分配,所有迭代器都会失效
  • vector的insert操作插入点之后的所有迭代器失效,但一旦发生内存重分配,所有迭代器都会失效
  • vector的erase操作插入点之后的所有迭代器失效
  • vector的reserve操作所有迭代器失效(导致内存重分配)

对于序列式容器(如vector,deque),删除当前的iterator会使后面所有元素的iterator都失效。这是因为vetor,deque使用了连续分配的内存,删除一个元素导致后面所有的元素会向前移动一个位置。但是erase方法可以返回下一个有效的iterator

  1. 对于节点式容器(map, list, set)元素的删除,插入操作会导致指向该元素的迭代器失效,其他元素迭代器不受影响;
  2. 对于顺序式容器(vector,string,deque)元素的删除、插入操作会导致指向该元素以及后面的元素的迭代器失效

迭代器的自增减

迭代器可以一直自增或自减,可以超出begin()end()

1
2
3
4
5
6
7
8
9
10
11
vector<int> poses;
poses.push_back(1);

auto iter = poses.end();
iter++;
iter++;
iter++;
cout << " iter - begin: " << iter - poses.begin() << endl <<
" iter - end: " << iter - poses.end() << endl;
cout << "poses size: " << poses.size() << endl;
cout << "iter pose: " << *iter << endl; // 不确定的结果

运行结果:
1
2
3
4
iter - begin: 4
iter - end: 3
poses size: 1
iter pose: 0

超出范围的迭代器,不要取对应的元素值,否则就是bug,最好加判断 if(iter - poses.end() >0)


move_base分析(一)使用GetPlan获得路径规划

ROS导航模块的框架如下:

要获得路径规划需要用到move_base中的一个服务make_plan,它就是用于获得从当前位姿到目标位姿的路径,其服务类型为nav_msgs/GetPlan

nav_msgs/GetPlan.srv成员如下:

1
2
3
4
5
6
7
geometry_msgs/PoseStamped start	  // 起始位姿
geometry_msgs/PoseStamped goal // 终点位姿

# 如果目标被阻挡了,规划失败时,规划器能到达的位置在x和y方向约束多少米
float32 tolerance
---
nav_msgs/Path plan // 服务端返回的路径

其中nav_msgs/Path是一个位姿数组,代表了机器人跟随的路径,定义:

1
2
Header header
geometry_msgs/PoseStamped[] poses

其实思路很简单,写一个service的客户端程序,通信move_base的get_plan,service的请求部分就是起始点的位姿, header.frame_id永远是map,返回值是路径,也是位姿数组。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//路线规划代码
#include <ros/ros.h>
#include <nav_msgs/GetPlan.h>
#include <geometry_msgs/PoseStamped.h>
#include <string>
#include <boost/foreach.hpp>
#define forEach BOOST_FOREACH // c++没有foreach

void fillPathRequest(nav_msgs::GetPlan::Request &request)
{
request.start.header.frame_id ="map";
request.start.pose.position.x = 12.378;//初始位置x坐标
request.start.pose.position.y = 28.638;//初始位置y坐标
request.start.pose.orientation.w = 1.0;//方向
request.goal.header.frame_id = "map";
request.goal.pose.position.x = 18.792;//终点坐标
request.goal.pose.position.y = 29.544;
request.goal.pose.orientation.w = 1.0;
request.tolerance = 0.5; //如果不能到达目标,最近可到的约束
}
//路线规划结果回调
void callPlanningService(ros::ServiceClient &serviceClient, nav_msgs::GetPlan &srv)
{
//执行实际路径规划器
if (serviceClient.call(srv))
{
//srv.response.plan.poses 为保存结果的容器,遍历取出
if (!srv.response.plan.poses.empty()) // empty()判断
{
forEach(const geometry_msgs::PoseStamped &p, srv.response.plan.poses)
{
// 一系列点的位姿
ROS_INFO("x = %f, y = %f", p.pose.position.x, p.pose.position.y);
}
}
else
{
ROS_WARN("Got empty plan");
}
}
else {
ROS_ERROR("Failed to call service %s - is the robot moving?",
serviceClient.getService().c_str());
}
}

int main(int argc, char** argv)
{
ros::init(argc, argv, "make_plan_node");
ros::NodeHandle nh;
// Init service query for make plan
//初始化路径规划服务,服务名称为"move_base_node/make_plan"
std::string service_name = "move_base_node/make_plan";
//等待服务空闲,如果已经在运行这个服务,会等到运行结束。
while (!ros::service::waitForService(service_name, ros::Duration(3.0))) {
ROS_INFO("Waiting for service move_base/make_plan to become available");
}
// 服务的客户端,类型就是 nav_msgs::GetPlan
ros::ServiceClient serviceClient = nh.serviceClient<nav_msgs::GetPlan>(service_name, true);
if (!serviceClient) {
ROS_FATAL("Could not initialize get plan service from %s",
serviceClient.getService().c_str());
return -1;
}
nav_msgs::GetPlan srv;
//请求服务:规划路线
fillPathRequest(srv.request);
if (!serviceClient) {
ROS_FATAL("Persistent service connection to %s failed",
serviceClient.getService().c_str());
return -1;
}
ROS_INFO("conntect to %s",serviceClient.getService().c_str());
callPlanningService(serviceClient, srv);
}

有时路径规划失败会报错:


使用jsoncpp读写JSON

还可以用另一个库 nlohmann

从Github上下载后,使用老三步安装:

1
2
3
4
mkdir dir && cd dir
cmake ..
make
sudo make install

安装的路径如下:

在CMakeList里进行配置,别忘了最后链接库文件:-ljsoncpp

写JSON

1
2
3
4
5
6
7
Json::Value ret_json;

ret_json["msg"] = "Hello JSON";
ret_json["code"] = 123;
ret_json["ret"] = true;

cout << ret_json.toStyledString()<< endl;

运行结果:

1
2
3
4
5
{
"code" : 123,
"msg" : "Hello JSON",
"ret" : true
}

写JSON数组如下:

1
2
3
4
5
6
7
8
9
10
Json::Value ret_json,sub;
sub.append(Json::Value(123));
sub.append(Json::Value(456));
sub.append(Json::Value(789));

ret_json["msg"] = "Hello JSON";
ret_json["code"] = sub;
ret_json["ret"] = true;

cout << ret_json.toStyledString()<< endl;

结果为:

1
2
3
4
5
{
"code" : [ 123, 456, 789 ],
"msg" : "Hello JSON",
"ret" : true
}


用SIGINT信号的回调函数关闭节点(自定义+ROS源码)

用自定义的方法关闭节点时,代码一般是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void mySigintHandler(int sig)
{
ROS_INFO("Exit from my_node !");
ros::shutdown();
ros::waitForShutdown();
}

int main(int argc, char** argv)
{
ros::init(argc, argv, "my_node", ros::init_options::NoSigintHandler);
ros::NodeHandle nh;
signal(SIGINT, mySigintHandler);
ros::spin();
return 0;
}


为什么这样做,原因看ros::start()的源码就明白了,其中有一句:

1
2
3
4
5
6
7
8
9
10
if (!(g_init_options & init_options::NoSigintHandler))
{
signal(SIGINT, basicSigintHandler);
}

void basicSigintHandler(int sig)
{
(void)sig;
ros::requestShutdown();
}

这就是配置ROS启动flag的问题,它是个枚举值,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
 //  Flags for ROS initialization,在命名空间ros::init_options
enum InitOption
{
//不安装SIGINT handler,可以安装自定义的SIGINT handler
NoSigintHandler = 1 << 0,

// 匿名节点模式,在节点名最后添加随机数,使得可以同时运行多个同名节点
AnonymousName = 1 << 1,

// brief Don't broadcast rosconsole output to the /rosout topic
NoRosout = 1 << 2,
};
typedef init_options::InitOption InitOption;

再看上面的源码,如果ROS flag不是NoSigintHandler,那么basicSigintHandler就是默认的SIGINT回调函数,它会运行ros::requestShutdown();,所以我们在运行节点时,按Ctrl+C可以退出节点。

下面我们详细看看这个过程,先看回调函数中的函数requestShutdown

1
2
3
4
void requestShutdown()
{
g_shutdown_requested = true;
}

只有一个赋值,而处理变量g_shutdown_requested的只有一个函数checkForShutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void checkForShutdown()
{
if (g_shutdown_requested) // 除非ros::requestShutdown()
{
// Since this gets run from within a mutex inside PollManager, we need to prevent ourselves from deadlocking with another thread that's already in the middle of shutdown()
boost::recursive_mutex::scoped_try_lock lock(g_shutting_down_mutex, boost::defer_lock);
while (!lock.try_lock() && !g_shutting_down)
{
ros::WallDuration(0.001).sleep();
}

if (!g_shutting_down) // ros::start()中赋值为false,之后未变
{
shutdown(); // 关闭节点
}

g_shutdown_requested = false;
}
}

而跟checkForShutdown有关的只有PollManager::instance()->addPollThreadListener(checkForShutdown);,它位于ros::start()中:

1
2
3
PollManager::instance()->addPollThreadListener(checkForShutdown);
. . . . . .
PollManager::instance()->start();

函数的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
typedef boost::signals2::signal<void(void)> VoidSignal;
VoidSignal poll_signal_;
typedef boost::function<void(void)> VoidFunc;

// 使用了观察者模式
boost::signals2::connection PollManager::addPollThreadListener(const VoidFunc& func)
{
boost::recursive_mutex::scoped_lock lock(signal_mutex_);
return poll_signal_.connect(func);
}

void PollManager::start()
{
shutting_down_ = false;
thread_ = boost::thread(&PollManager::threadFunc, this);
}

void PollManager::threadFunc()
{
disableAllSignalsInThisThread();
while (!shutting_down_) //start()里赋值为false,PollManager::shutdown()里赋值true
{
boost::recursive_mutex::scoped_lock lock(signal_mutex_);
poll_signal_(); // 触发信号,执行槽函数 checkForShutdown
if (shutting_down_)
return;
poll_set_.update(100); // PollSet  poll_set_;函数定义在 poll_set.cpp
}
}

这里用到了boost中的信号和槽,和Qt中的原理一样,关键变量poll_signal_用于连接函数,这里是连接了 checkForShutdown。PollManager是一个线程,start()开启线程,不停地运行函数threadFunc(),因为shutting_down_开始为false,所以不停运行while循环,然后执行poll_signal_(),这会触发信号然后执行它连接的函数,也就是checkForShutdown,也就是节点不停检查是否需要关闭。

这里就是关键的变量g_shutdown_requested了,它只在ros::requestShutdown()赋值为true,所以节点只有在执行了ros::requestShutdown()函数后才会执行,然后执行ros::shutdown(),节点最终的关闭都在这里。当最后一个节点句柄被销毁时,shutdown()被自动调用。

说的有点乱了,整个过程示意图如下:


节点退出问题

以前我的SIGINT回调函数是这样写的:

1
2
ros::shutdown();
ros::waitForShutdown();

这是从网上看来的,后来发现偶尔会很长时间才执行成功,甚至不能成功,一直阻塞。现在来看一下源码:

1
2
3
4
5
6
7
8
9
10
11
12
void waitForShutdown()		// Wait for this node to be shutdown
{
while (ok())
{
WallDuration(0.05).sleep(); // 0.05s
}
}

bool ok()
{
return g_ok;
}

waitForShutdown表示如果节点ok(),会一直睡眠。只有在ros::shutdown()的最后执行了g_ok = false;,有时一直无法退出,说明节点是一直ok(),因为节点在回调函数运行时无法完成shutdown,没执行到g_ok = false;


Shell小技巧

多行注释

bash脚本的多行注释可以采用HERE DOCUMENT特性,实现多行注释,比如

1
2
3
4
<<'COMMENT'
...

COMMENT

举例如下:
1
2
3
4
5
6
7
8
#!/bin/bash
echo "Say Something"
<<COMMENT
your comment 1
comment 2
blah
COMMENT
echo "Do something else"

将命令执行的结果写入日志文件

catkin_make > make.log, 注意不要用一直执行的命令,比如ping

head与tail

tail -f showMapInfo.log可以实时显示文件的更新
tail -n 5 showMapInfo.log显示文件最后5行
head -n 5 showMapInfo.log显示文件开始5行

修改指定目录下的所有文件的权限

全体可读、可修改、可执行: chmod -R 777 dir

history

使用history列出历史命令后,如果需要执行第 1024 历史命令,可以执行 !1024

如果以前执行了一个很长命令,只记录命令开头是 curl,这时就可以通过 !curl 快速执行该命令。但是执行的可能不是我们需要的,所以常常加上 :p,即 !curl:p ,打印出了搜索到的命令,如果要执行,请按 Up 键,然后回车即可。

在命令前额外多加一个空格,这样的命令是不会被记录到历史记录的

执行文件

对于可执行文件,在当前目录时用命令./file即可,如果是完整绝对路径,就用命令/path/file即可

检测温度

开始检测硬件传感器:sudo sensors-detect

要确保已经工作,运行命令:sensors,但只运行一次

设置终端初始路径

bashrc中加一行: cd PATH即可

有一次发现不管怎么改也没用,按照网上的方法修改bash_profile不奏效。 后来发现是系统环境变量$HOME被改变了,于是在/etc/profile中添加一行 export HOME=/home/user,问题解决

显示当前目录的文件和目录,按大小列表排序

ls -Slrh

如果排除目录: ls -Slrh | grep -v '^d'

当前目录下占用最大磁盘空间的目录

1
du -ahx . | sort -rh | head -5

Linux 踢出其他正在 SSH 登陆用户

查看系统当前所有在线用户: w

1
2
3
4
14:15:41 up 42 days, 56 min,  2 users,  load average: 0.07, 0.02, 0.00 
USER TTY FROM LOGIN@ IDLE JCPU PCPU WHAT
root pts/0 116.204.64.165 14:15 0.00s 0.06s 0.04s w
root pts/1 116.204.64.165 14:15 2.00s 0.02s 0.02s –bash

查看当前自己占用终端

1
2
[root@apache ~]# who am i 
root pts/0 2013-01-16 14:15 (116.204.64.165)

用 pkill 命令踢掉对方: pkill -kill -t pts/1

再用 w 命令在看看踢掉没有,如果最后查看还是没有踢掉,加上 -9 强制杀死。

反向删除

文件夹下,删除abc文件以外的所有文件

1
ls | grep -v ^abc$ | xargs -i rm -rf {}

如果abc不加正则表达式,会保留文件名包含abc的文件,比如abcd

对当前目录下的所有文件排序

du -s *. | sort -nr

显示当前目录的最大的文件

du -s *. | sort -nr | head -1

检查系统中的设备情况

需要检查日志 /var/log/messages

只显示文件名和大小

  • ls -sh . 显示当前文件夹的所有文件大小

  • du -sh folder     显示指定文件夹或文件的大小,结果容易可读,比如4G

du -s /home/user/.ros/log的执行结果为

1
1147000 /home/user/.ros/log

大小的单位为 K。

如果只取空格左边部分,使用 echo $(du -s /home/user/.ros/log) | cut -d " " -f 1,如果取空格右边部分,那么把1改为2.


  • ls -Slrh 显示当前目录的文件和目录,按大小列表排序。如果排除目录: ls -Slrh | grep -v '^d'

获取CPU核数

getconf _NPROCESSORS_ONLN 2>/dev/null || sysctl -n hw.ncpu || echo 1

tar.gz 压缩

file.txt文件压缩成file.tar.gz: tar -zcvf myfile.tar.gz myfile.txt

删除包含字符串的行

把文件中包含某字符串的行删掉,比如下面文件中的abc所在行删除,使用sed命令:

1
2
3
4
5
6
7
8
abc12
abc402
a213bc
dlifjljs
abc
904650-9
aabbcc
123abc53465

经过sed处理,变为
1
2
3
4
a213bc
dlifjljs
904650-9
aabbcc

如果执行sed -e '/abc/d' test.txt,只显示修改后的结果,未修改文件。如果执行sed -i,修改并保存文件。

延时函数

使用 sleep 或usleep函数。只看sleep函数:

1
2
3
4
5
#参数单位默认为秒。
sleep 1s 或 sleep 1 #表示延迟一秒
sleep 1m #表示延迟一分钟
sleep 1h #表示延迟一小时
sleep 1d #表示延迟一天

xargs

xargs 是给命令传递参数的一个过滤器,也是组合多个命令的一个工具。它能够捕获一个命令的输出,然后传递给另外一个命令。
xargs 可以将管道或标准输入数据转换成命令行参数,这是最常用的场合,也能够从文件的输出中读取数据。
比如执行命令find *.sh,结果如下:

1
2
3
4
5
deb软件安装.sh
Kinetic插件安装.sh
ping.sh
ubuntu初始化精简版.sh
所有alias.sh

执行find *.sh | xargs会将多行结果变为单行:
1
deb软件安装.sh Kinetic插件安装.sh ping.sh ubuntu初始化精简版.sh 所有alias.sh

执行find *.sh | xargs ls -smh会使xargs捕获find结果,然后传递给之后的命令做参数,这是管道做不到的,结果:

1
2
3
4.0K deb软件安装.sh, 4.0K Kinetic插件安装.sh,
4.0K ping.sh, 4.0K ubuntu初始化精简版.sh,
4.0K 所有alias.sh

xargs与管道不同,首先看命令echo test.cpp,结果是test.cpp.再看命令echo test.cpp | cat,结果还是test.cpp,这是管道将echo的结果定向为cat的输入,执行cat当然还是那个结果.但是echo test.cpp | xargs cat就不一样了,xargs将test.cpp做cat的命令参数,也就是第二次执行的是cat test.cpp

修改某文件的修改时间

1
touch -t 201905171210.20 test.py  # 指定新时间

查看进程PID及线程

  • pidof chrome     显示所有进程的pid,进程名必须完整
  • pgrep sublime     通过名字获取pid,可以是部分进程名
  • ps -T -p pid     进程的子进程情况

pkill kill killall

  • kill pid     按pid杀死进程
  • kill -9 pid     按pid强制终止进程,但是可能造成数据丢失,轻易不要使用。比如 kill -9 $(pgrep rosmaster)
  • killall name     按名称杀死一个进程组的所有进程,比如killall chrome杀死所有的chrome进程

  • pkill name     按名称终止包含name的所有进程

  • pkill -o chrome    杀死最旧的chrome进程
  • pkill -n chrome     杀死最新的chrome进程
  • pkill -c chrome     杀死所有chrome进程,同时显示数量

用脚本杀死阻塞的进程

1
2
3
4
5
6
7
8
#!/bin/bash

while true; do

command & sleep 2 && kill -2 $!
sleep 1
echo " ---------- start next loop ---------- "
done

command是一个阻塞的进程,只有按下ctrl+C后才能继续运行,但在这种循环的情况,显然不能一直去按ctrl+C,所以需要脚本去终止进程command,也就是kill -2 $!

杀死终端的进程

ubuntu终端的进程名为bash,如果在自己电脑pkill bash,还会剩下当前操作的终端,其他的全关。如果通过另一台电脑远程登录,执行pkill bash,可以将所有终端关闭。

xdotool

sudo apt-get install -y xdotool

  • 最小化当前窗口: xdotool windowminimize $(xdotool getactivewindow)

还有很多功能,这个工具能模拟鼠标键盘的大量操作,可参考 神器 xdotool

带参数的alias

有时需要运行一个带参数的很长的命令,这样感觉很不方便,可以与alias结合,提高效率。alias不支持直接加参数,而是通过函数来间接实现。

1
2
# test()是个函数
alias ldd='test() { ldd $1 | grep -v kinetic | grep -v x86_64-linux-gnu;}; test'

统计文件行数、字节、字数

wc命令,选项 -l, -c, -w 分别统计行数、字节、字数,可统计多个文件,但不能统计目录。

1
2
3
wc -c main.cpp
wc -l *.cpp
wc -l main.cpp test.cpp t.cpp //统计三个文件行数

只显示文件名和日期

1
ls -lSh | awk '{print $6,$7,"   " $8,"  " $9}'

统计目录中的文件个数 (包括文件夹)

ls | wc -l

当前文件夹或文件大小

当前文件夹:du -smh
当前文件夹下所有文件和子文件夹: du -mh .
某文件: du -smh file

当前目录下大于400M的文件夹和文件,并从小到大排序

1
sudo du -h | grep "[4-9][0-9][0-9]M" | sort -n

ls -lht frames.pdf 显示文件的详细信息,比如:

1
-rw-rw-r-- 1 user user 20K 6月  29 10:24 frames.pdf

时间相关

  • date -s "2024-10-15 15:50:00"    设置时间
  • date +%s    输出UTC时间

判断文件/文件夹是否存在

1
2
3
4
5
6
7
if [ -d ~/Documents/dir ]
then
echo "yes"
else
echo "no, so create it"
mkdir dir
fi

-d判断后面的文件夹是否存在
-f判断是否存在某文件
-s判断是否存在某文件,且大小不为0,这个比-f更有用

函数

组合多个命令到一个操作。下面的命令组合了 mkdir 和 cd 命令。输入 mc folder_name可以在你的工作目录创建一个名为folder_name的目录并立刻进入

1
2
3
4
mc () {
mkdir -p $1
cd $1
}

scp出错

使用scp传递文件时,出现了这样的报错:

解决方法: ssh -o StrictHostKeyChecking=no 192.168.73.14

注意: 这里的IP是发送者的IP

自动化工作

每次本机提交代码后,远程再更新,编译和移动可执行文件,过程比较繁琐,干脆都放到一个Shell脚本里,用alias执行这个脚本

1
2
3
4
5
6
7
cd /home/user/Robot/workspace/src
svn up
cd /home/user/Robot/workspace
catkin_make --pkg package1 package2 package3

cp $packagebin"/exe1" $dir
cp $packagebin"/exe2" $dir

用echo管道命令给sudo自动输入密码

1
echo password | sudo -S cmd

-S表示从标准输入读取密码。这样用于system函数就比较方便了,但是这种方式密码会明文显示,密码不安全

拆分大文件

split命令拆分大文件,以原本文件名为前缀,数字为后缀

1
split --verbose -n5 myfile.cpp -d myfile_

myfile.cpp分割为5个文件, 以myfile_为前缀, 显示分割过程,但是缺陷在于拆分的文件没有扩展名

进一步优化:把test.log拆分,每个最大200k,并且按照test_数字命名,带扩展名。
split test.log -b 200k -d -a 2 test_&&ls | grep test_ | xargs -n1 -i{} mv {} {}.log

升级版ping命令

当能ping通一个主机时,此时ping命令会一直执行,要想终止,可采用CTRL+c或CTRL+z方式退出。也可以设置选项方式,使得ping命令执行若干次包就终止。ping -c 4 ip,此时ping命令将执行4次

下面的脚步是执行ping命令时,可以只输入最后地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#/bin/bash

ip=`hostname -I`
net=$(cut -d'.' -f3<<<$ip)
var1=$(cut -d'.' -f1<<<$ip)
var2=$(cut -d'.' -f2<<<$ip)
sub=${var1}'.'${var2}'.'${net}'.'

echo -e "\033[41;37m 输入要ping的IP最后一个地址 \033[0m"
read -p ":" addr
address=${sub}${addr}

ping -c 3 $address

echo -e "\033[44;37m Ping IP address done ! \033[0m"

当前文件夹中只有一个子目录时,直接进入

可以做成alias ccd :

1
2
3
4
5
6
7
8
9
10
11
#/bin/zsh

n=$(ls | wc -l)

if [ "$n" -eq "1" ]
then
folder=$(ls)
cd $folder
else
echo "can't cd, more than 1 child folder"
fi

当前文件夹中只有一个文件时,直接用vim打开

1
2
3
4
5
6
7
8
9
10
#!/bin/bash

n=$(ls | wc -l)
if [ "$n" -eq "1" ]
then
file=$(ls)
vim $file
else
echo "more than 1 file"
fi

使用脚本在当前终端打开一个新标签

1
2
3
gnome-terminal --tab
# 打开两个标签
gnome-terminal --tab --tab

在终端中打开多个tab,每个tab运行单独的命令

1
2
3
4
5
6
7
8
9
10
11
title1="tab 1"
title2="tab 2"
title3="tab 3"

cmd1="echo tab1"
cmd2="echo tab2"
cmd3="echo tab3"

gnome-terminal --tab --title="$title1" --command="bash -c '$cmd1; $SHELL'" \
--tab --title="$title2" --command="bash -c '$cmd2; $SHELL'" \
--tab --title="$title3" --command="bash -c '$cmd3; $SHELL'"

参考链接

linux删除某文件以外的文件

1
2
3
rm -f  !(a) 
# 删除a和b以外的文件
rm -f !(a|b)

找出当前文件夹下大于100M的文件

1
find . -type f -size +100M -print0 | xargs -0 du -h | sort -nr

只显示当前所有文件的名称和大小

1
ls -smhSlr | awk '{ print $1, $10 }'

判断当前某进程的个数或是否在运行

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/bin/bash

# ps -ef |grep zookeeper 这个就是看zookeeper的启动情况
# grep -v "grep" 是为了去掉查询 grep的那一条
# wc -l 是计数的

COUNT=$(ps -ef | grep zookeeper |grep -v "grep" |wc -l)
echo $COUNT
if [ $COUNT -eq 0 ]; then
echo NOT RUNNING
else
echo is RUNNING
fi

Linux 踢出其他正在 SSH 登陆用户

查看系统当前所有在线用户: w

1
2
3
4
14:15:41 up 42 days, 56 min,  2 users,  load average: 0.07, 0.02, 0.00 
USER TTY FROM LOGIN@ IDLE JCPU PCPU WHAT
root pts/0 116.204.64.165 14:15 0.00s 0.06s 0.04s w
root pts/1 116.204.64.165 14:15 2.00s 0.02s 0.02s –bash

查看当前自己占用终端

1
2
[root@apache ~]# who am i 
root pts/0 2013-01-16 14:15 (116.204.64.165)

用 pkill 命令踢掉对方: pkill -kill -t pts/1

再用 w 命令在看看踢掉没有,如果最后查看还是没有踢掉,加上 -9 强制杀死。


ROS通信模型

ROS节点构成了一个计算图(computation graph),它是一个p2p的网络结构,节点之间的拓扑结构为 mesh(网状)。节点必须与其他节点通讯,否则就是个普通程序。ROS通讯中,节点通过XML-RPC远程调用来实现建立连接,传输数据。远程调用负责管理节点对计算图中信息的获取与更改,还有一些全局的设置, 但RPC不直接支持数据的流传输(而是通过 TCPROS与 UDPROS)

每个ROS节点运行一个XML-RPC server,ROS的通讯模块在ros_comm

topic通信模型

我运行了两个发布者节点:Pub和second,一个订阅者节点:Sub,话题名为topic,详细情况如下:


上面列出了三个节点的端口号,IP都是相同的,因为都在本机上,roscore的uri为http://192.168.0.106:11311,整个通信模型如下:

发布者节点通过一个名为registerPublisher的远程调用(XML-RPC),注册它们的话题和消息类型到主节点。同样地,订阅者通过registerSubscriber()远程调用,注册到主节点,这样主节点就获知了双方的uri、话题名称、消息类型。如果仅仅话题名一样,消息类型不一样,也是不能通信的。

订阅者注册时,主节点会回复注册同一话题的所有发布者,然后订阅者就直接与发布者通信,基于TCPROS协议传输消息,这就跟主节点无关了。当又有新的发布者注册同一个话题到主节点时,主节点执行publisherUpdate()给订阅者,回复一个新的发布者list。

二者通信是基于TCPROS协议,过程与三次握手类似,订阅者与发布者建立第一次连接,传输topic信息,然后再根据发布者返回的topic 信息,建立第二次连接,发布者开始传输具体的数据。

对于unregistration,发布者执行远程调用unregisterPublisher(),订阅者执行unregisterSubscriber()远程调用。

举个例子,我们可以看一个节点的信息:
rosnode info.png
话题tf_static是本节点传递给amcl节点的,方向outbound说明了这一点,也就是本节点负责发布,消息传递方式是TCPROS;话题joint_states的传递方向是inbound,也就是本节点从节点joint_state_publisher订阅了此话题

TCPROS中的重点:

  • Md5:如果两个话题名一样而消息类型不同,会无法通信成功。TCPROS为了保证两边传输数据类型一致,会在协议头中给出话题名的md5 hash算法处理过的值,而每次你生成新的msg时,md5的值都会因为你内部数据类型的变动而改变。

  • Subscriber 选项tcp_nodelay :如果是“1” 则给socket 设置 TCP_NODELAY 选项,降低延迟,可能会降低传输效率。

  • Service client 选项:persistent 设置为1,则service的链接会一直开放给多个 service request

使用netstat和wireshark研究通信过程

但是我在使用ROS时, 发现ROS命令显示的端口号并不准确,使用netstatwireshark是找不到的,最好以后两者为准

启动了一对发布订阅者的程序,用netstat查看如下,排除rosout节点:

1
2
# 同IP所有建立通信的节点,排除rosout和SSH
netstat -anop | grep 0.109 | grep ESTA | grep -v rosout | grep -v 109:22

结果还需要去掉3个通信,就是sub-rosout, pub-rosout, python-rosout。python对应的进程是roscore
netstat的结果
显然pub和sub的通信端口不是rosnode info显示的,不知道这算不算ROS的bug

wireshark抓到的发布和订阅TCPROS通信
两者之间的TCP通信只有一问一答:Publisher发PSH和ACK,要求Subscriber数据尽快达到应用层,Subscriber发回ACK,从wireshark里看不到传递的消息,但ROS又不是加密的,这点我没明白什么意思

参考:ROS的弊端


rosbag的使用

bag文件包含tf信息和话题消息,没有rosparam参数和node。 /clock是由ROS的录包回放系统rosbag发布的主题,主要是提供一个仿真时间

rosbag本质也是个ROS节点,whereis rosbag的结果: /opt/ros/kinetic/bin/rosbag,运行rosbag play会出现一个play开头的节点,后面是当前的utc时间

rosbag record

基本使用:rosbag record topic,可以有多个话题:rosbag record topic1 topic2 topic3,话题可以用shell脚本列出来,例如: rosbag record rostopic list | grep plan

常用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-o: 在文件名之前加指定的字符串, rosbag record -o test topic ,结果生成文件test_2019-06-24-14-32-08.bag

-node: 记录某节点订阅的所有话题,` rosbag record --node=/joy_teleop

-l: 只记录指定数量的消息, rosbag record -l 1000 /chatter,只记录1000条

--duration: 保持一定时间记录话题, rosbag record ---duration=30 /chatter, 即记录30秒,还可以有5m, 2h
rosbag record --split --duration=30 /chatter #持续时间到30s后分文件存储

-O: 大写O,指定完整文件名 rosbag record -O sess.bag /topic

--size: 指定文件大小,rosbag record --split --size=1024 /chatter #空间达到1024M后分文件存储

-e, -x: 用于正则表达式

我最常用的是rosbag record topic -o filename

rosbag info

显示指定bag文件的详细信息,rosbag info session*.bag

rosbag play

rosplay其实是重现话题数据传输的效果,比如向话题cmd_vel发数据的过程

rosbag回放过程中按空格暂停。默认模式下,rosbag play命令在公告每条消息后会等待0.2秒才真正开始发布bag文件中的消息。如果rosbag play在公告消息后立即发布,订阅器可能会接收不到几条最先发布的消息

如果同时回放两个单独的bag文件,则根据时间戳的间隔来播放。比如我先录制一个bag1,等待一个小时,然后录制另一个 bag2,在一起回放 bag1 和 bag2 时,实际是先回放bag1,然后等待1个小时才回放bag2

等待时间可以通过-d选项来指定,比如等待10秒再播放bag: rosbag play -d 10 realsense.bag

以暂停的方式启动,防止跑掉数据:

1
rosbag play --pause record.bag

要一帧一帧播放的效果,先用暂停方式启动,然后按S键,这个用的还比较少

设置以0.5倍速回放,也就是以录制频率的一半回放。可以加倍播放,但会增大CPU占用。

1
rosbag play -r 0.5 record.bag

循环播放:

1
rosbag play -l record.bag

use_sim_time的问题

播放bag时,运行算法程序,结果报警:

1
2
3
Warning: TF_OLD_DATA ignoring data from the past for frame base_footprint at time 1.59524e+09 according to authority unknown_publisher
Possible reasons are listed at http://wiki.ros.org/tf/Errors%20explained
at line 277 in /tmp/binarydeb/ros-kinetic-tf2-0.5.20/src/buffer_core.cpp

在其他节点之前运行rosbag play --clock ...,会设置好use_sim_time

--clock选项会让 rosbag play发布和订阅bag文件中的消息到/clock话题,这样会让你的其他节点运行在消息发布的时候。 如果use_sim_time是 true, /clock需要消息

这样一来launch文件里加入rosparam set use_sim_time true就不合适了,launch里常常不加入 rosbag play,即使加入了,也不一定先设置了use_sim_time,合适的执行顺序如下:

  1. 启动ROS,但是不包含你需要启动的节点
  2. rosparam set use_sim_time true
  3. 启动你需要的节点
  4. rosbag play —clock

有的资料说不用--clock才能正常,我估计这是数据集的问题,一般还是要有的

filter

可以实现对已有rosbag文件中的某些topic去除或者保留

1
rosbag filter input.bag output.bag 'topic == "/camera/image_raw/compressed" or topic == "/scan" or topic == "/timetag" or  topic == "/tf" and m.transforms[0].header.frame_id != "/odom" and m.transforms[0].child_frame_id != "/odom"'

结果是保留话题/camera/image_raw/compressed , /scan, /timetag, /tf。但是tf变换的frame_idchild_frame_id不是odom,这样的tf消息会被过滤

参考: How to remove a TF from a ROS bag

bag文件转txt csv

将file_name.bag文件中topic_name话题的消息转换到 file.txt文件中:

1
rostopic echo -b file.bag -p /topic_name > file.txt

-p 只能指定一个话题

rosbag可以直接转csv文件,用excel打开,跟转txt一样的

rosbag compress

如果录制的bag很大,我们可以压缩它,默认的压缩格式是bz2:

1
2
3
4
5
6
7
8
rosbag compress xxx.bag
# 可以添加`-j`手动指定压缩格式为bz2
rosbag compress -j xxx.bag
# 也可以使用LZ4来压缩数据
rosbag compress --lz4 xxx.bag

# 解压缩
rosbag decompress xxx.bag

实例

我们常常使用rosbag record命令录制机器人的建图或行走过程,一般先运行record命令,然后开始机器人的路径规划和行走,到达目标点后,在record命令的终端直接ctrl+C就可以保存bag文件.

这里涉及到一个时间戳的问题, bag文件的时间而非本机时间,在默认情况下,ROS使用ubuntu系统的时间,也就是wall clock time.但bag文件中记录的是历史时间,所以play之前要告诉ROS启用simulated time

1
rosparam set use_sim_time true

然后使用rosbag play --clock test.bag命令播放,当然要先打开rviz,添加对应的topic:

play结束之后,还要把参数还原:

1
rosparam set use_sim_time false

这样可以在rviz中离线分析问题了, 大大提升了效率

问题

我之前根据make_plan服务生成了路径,存放在bag文件里,在我使用rosbag play这些文件时,出现了报错:

不管怎么检查,文件命名也没有问题,不得其解。

使用rosbag info查看其信息,发现起始和终止时间相同,只有1个message。因为make_plan服务生成路径只是运行了一次回调函数,所以只有一个message,即使能在rviz显示,也只有几微妙的一瞬间,没什么意义

读取bag的大小不一致

录制bag包后可能没有正确结束掉程序,回放出现错误:

1
2
[ INFO] [1603677455.288200829]: Opening fuse_2020-10-24-18-01-07.bag
[FATAL] [1603677455.288476478]: Error reading from file: wanted 4 bytes, read 0 bytes

可使用如下命令修复:rosbag reindex xxx-xxx-xxx.bag

play时error reading version line

rosbag可能已损坏,你可以使用rosbag info XX.bag查看你想要读取的rosbag包的信息,如果无法读取到有效信息,说明该rosbag确实已损坏。