使用ROS Service(一) 服务和话题的区别,基本命令

服务和话题

特点 Topic Service
通信方式 异步通信 同步通信
通信模型 Publish-Subscribe Request-Reply
多对多 多对一
接收者接到数据会回调 远程过程调用(RPC)服务器端的服务
应用场景 连续,高频的数据发布 偶尔调用的功能
举例 激光雷达 开关传感器,拍照,逆解计算
由于是同步通信,service 的回调不能写复杂的、耗时长的业务

rosservice call

rosservice call addTwoNum 3 4: 调用服务,addTwoNum是服务名称, 名称不加引号,3和4是服务中的req变量的值,注意必须按服务文件中的顺序赋值。

但上面是简单情况,srv文件稍微有点复杂时,就会怎么填也不对,个人认为这是ROS Service很失败的一个地方。
例如srv文件的请求部分是这样的:

1
2
string request_string
string album

我试了很多种组合,都不正确。其实这种情况也好办,在call srvname之后直接Tab,会给出一个模板:
1
2
rosservice call service_name "request_string: ''
album: ''"

我们要做的就是把内容填到单引号里面,不要再自己修改模板,否则容易出错。

call执行成功时,终端不会有任何结果。但是经常出现这样的错误: ERROR: service [/control_cam] responded with an error: ,但程序执行没有问题。原因是 程序中对srv文件中的应答没有处理

其他常用命令:

  • rosservice list: 显示当前所有活动的服务
  • rosservice info addTwoNum: 显示服务的信息
  • rosservice type addTwoNum: 显示服务的变量类型

其实服务的相关命令和话题的很类似

ROS的服务类型

启动乌龟节点时,有一个服务叫clear,类型:

1
2
3
rosservice type clear

std_srvs/Empty

看源码可知,服务的类型为空,这表明调用这个服务不需要参数(比如,请求不需要发送数据,响应也没有数据).调用后,服务清除了turtlesim_node的背景上的轨迹,没有响应.

std_srvs包还有两个服务:std_srvs/Triggerstd_srvs/SetBool,但是很不常用


locate和find
  • locate的速度比find快,因为它并不是真的查找文件,而是查数据库。

  • 新建的文件,我们立即用locate命令去查找,一般是找不到的,因为数据库的更新不是实时的,而是每天

  • locate命令所搜索的后台数据库在/var/lib/mlocate这个目录下,可能有些Linux系统位置不同,具体我们可以用locate locate查询
  • 并不是所有的目录下的文件都会用locate命令搜索到,/etc/updatedb.conf这个配置文件中,配置了一些locate命令的一些规则。

  • updatedb会大致每天运行,这是靠系统的crontab命令实现的

  • updatedb -U:更新指定目录相关的数据库信息。默认是整个系统,耗时比较长,因此可以使用该参数,比如sudo updatedb -U /home/user/

updatedb的配置文件 /etc/updatedb.conf

1
2
3
4
5
cat /etc/updatedb.conf 
PRUNE_BIND_MOUNTS = "yes"
PRUNEFS = "9p afs anon_inodefs auto autofs bdev binfmt_misc cgroup cifs coda configfs cpuset debugfs devpts ecryptfs exofs fuse fuse.sshfs fusectl gfs gfs2 gpfs hugetlbfs inotifyfs iso9660 jffs2 lustre mqueue ncpfs nfs nfs4 nfsd pipefs proc ramfs rootfs rpc_pipefs securityfs selinuxfs sfs sockfs sysfs tmpfs ubifs udf usbfs fuse.glusterfs ceph fuse.ceph"
PRUNENAMES = ".git .hg .svn"
PRUNEPATHS = "/afs /media /mnt /net /sfs /tmp /udev /var/cache/ccache /var/lib/yum/yumdb /var/spool/cups /var/spool/squid /var/tmp /var/lib/ceph"

  • PRUNENAMES 搜索时不搜索的文件类型
  • PRUNEPATHS 搜索时不搜索的路径
  • PRUNE_BIND_MOUNTS = "yes" 开启搜索限制
  • PRUNEFS 搜索时不搜索的文件系统

locate 常用命令

1
2
3
4
5
6
7
locate -c   # 查询指定文件的数目。(c为count的意思)
locate -e # 只显示当前存在的文件条目。(e为existing的意思)
locate -i # 查找时忽略大小写区别

# 使用正则表达式查找文件
locate -r makefile$ # 以makefile结尾的文件
locate -r ^/home/user/ # 以/home/user/开头的文件

从结果中取出词尾是config2

注意:locate的结果可能是不存在的文件,这时最好用locate -e

locate 查找文件tree.xml,也就是知道完整的文件名时,那么最好用 locate /tree.xml,如果不加/,会显示test_tree.xml的结果

find 常用命令

查找当前目录中(包括子目录)所有扩展名为cfg的文件:

1
2
# 或者 '*.cfg'
find -name *.cfg

  • find . -name '*.cpp' -mmin -30 当前目录下,最近30分钟修改的cpp文件

  • find . -name '*.cpp' -mtime 0 当前目录下,最近24小时修改的cpp文件

  • find . -type f -mtime 0 当前目录下,最近24小时修改的常规文件


深入解析乌龟追乌龟程序

ROS启动乌龟追乌龟程序用的是roslaunch turtle_tf turtle_tf_demo.launch,其内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<launch>

<node pkg="turtlesim" type="turtlesim_node" name="sim"/>
<node pkg="turtlesim" type="turtle_teleop_key" name="teleop" output="screen"/>
<!-- Axes -->
<param name="scale_linear" value="2" type="double"/>
<param name="scale_angular" value="2" type="double"/>
<!-- 针对两个乌龟的tf广播 -->
<node name="turtle1_tf_broadcaster" pkg="turtle_tf" type="turtle_tf_broadcaster.py" respawn="false" output="screen" >
<param name="turtle" type="string" value="turtle1" />
</node>
<node name="turtle2_tf_broadcaster" pkg="turtle_tf" type="turtle_tf_broadcaster.py" respawn="false" output="screen" >
<param name="turtle" type="string" value="turtle2" />
</node>

<node name="turtle_pointer" pkg="turtle_tf" type="turtle_tf_listener.py" respawn="false" output="screen" >
</node>

</launch>

乌龟的程序,按下方向键,乌龟只走一段距离,是在cmd_vel的回调函数里更新乌龟的位置,取决于发布的twist消息和更新间隔dt,由Qt实现


tf(一) 概念和基本使用

tf包处理的是任一个点在所有坐标系之间的坐标变换问题,它把各种转换关系建立在一个树结构上,树的每个节点是坐标系,每个坐标系可以有多个child,但只能有一个parent,转换只能是从parent向child。比如Tb-a表示坐标系a向b转换,也就是说a是parent,b是child,这个变换描述的就是child坐标系中的点在parent坐标系下的姿态。要实现这个变换,就是用child坐标系在parent坐标系下的描述(一个矩阵)去描述(乘以)这个点在child坐标系下的描述(坐标)。world参考系是tf树最顶端的父参考系

如果打算用tf解决你的坐标变换问题,请一定要先清晰的画出这棵树的结构,再开始写程序。比较重要的类是tf::TransformBroadcaster, tf::TransformListener, tf::Transform, tf::StampedTransform

在tf的运行机制中,由于tf会把监听到的内容放到一个缓存中。我们通过transformPose获取变换关系,是通过查询这个缓存来实现的。获取的数据不能保证实时性,会有一定的延迟。也有可能无法获得,因此这个函数在运行过程中会抛出异常,所以这里使用try-catch语句捕获这个异常并返回。

tf::TransformBroadcaster类

sendtransform接口可以建立tf树,发布一个从已有的父坐标系到新的子坐标系的变换时,这棵树就会添加一个树枝,之后就是维护。TransformBroadcaster类就是一个publisher, 如果两个frame之间发生了相对运动,TransformBroadcaster类就会发布TransformStamped消息到tf话题,当多个节点向tf话题发消息时,就形成了tf树。

tf::Transform

建立坐标系之间的位移和旋转的关系,最后用于sendTransform函数。

它是一个坐标转换。成员有:Matrix3x3 m_basis,用3*3的矩阵表示旋转; Vector3 m_origin,用3*1的向量表示平移。

tf::Transform支持乘法运算符,实际的计算是先把旋转矩阵和平移量组合为变换矩阵,变换矩阵相乘后,再转换为tf::Transform类型

tf::Transform类的重要函数如下:

1
2
3
4
5
Matrix3x3 &   getBasis ()    //Return the basis matrix for the rotation
const Vector3 & getOrigin () //Return the origin vector translation
Quaternion getRotation () //Return a quaternion representing the rotation
Transform operator* (const Transform &t) const //Return the product of this transform and the other.
Transform inverse () //Return the inverse of this transform

inverse()函数很有用,我们可以把上面程序中的transform.getOrigin().x()改成transform.inverse().getOrigin().x()就可以求出乌龟1在乌龟2坐标系中的坐标了。

tf::StampedTransform类继承自tf::Transform,它多了两个重要变量就是child_frame_id_frame_id_

tf::TransformListener

监听一个父坐标系到子坐标系的变换,waitForTransform是监听转换关系,可以指定监听的时间或一直阻塞;lookupTransform紧随其后,获取 tf::Transform

使用前需要#include <tf/transform_listener.h>

TransformListener构造函数有两个,常用的是

1
2
3
4
TransformListener::TransformListener(
ros::Duration max_cache_time = ros::Duration(DEFAULT_CACHE_TIME),
bool spin_thread = true
)

平时用的是无参构造函数,其实是默认构造函数,如果指定缓存时间,就用tf::TransformListener tf_(ros::Duration(15) );Costmap2DROS中使用的tf缓存,根源是move_base_node.cpp中的tf::TransformListener tf(ros::Duration(10) );

参考我写的程序test_costmap。开始,如果没有map—->base_link的TF转换,则报错No Transform available Error。此时发布TF变换,则不再报错。然后再关闭TF变换,test_costmap还能正常运行10s,然后报错 Extrapolation Error

transformPose

原型是void transformPose(const std::string &target_frame, const geometry_msgs::PoseStamped &stamped_in, geometry_msgs::PoseStamped &stamped_out) const

target_frame就是你要把源pose转换成哪个frame上的pose。假如你的源pose的frame_id是”odom”,你想转到”map”上,那么target_frame写成“map”就可以了。stamped_in就是源pose,而stamped_out就是目标数据了,也就是转换完成的数据。需要注意的是,从参数上来看,转换时是不需要指定源frame_id的,这是因为它已经包含在了stamped_in中,换句话说,就是这个函数一个隐含的使用条件是,stamped_in中必须指明源pose属于哪个frame

把odom坐标系的数据转换到map坐标系下

1
2
3
4
5
6
7
8
9
10
11
12
13
geometry_msgs::PoseStamped pose_odom;
pose_odom.header = odom->header;
pose_odom.pose = odom->pose.pose;

geometry_msgs::PoseStamped pose_map;
try{
listener.transformPose("map", pose_odom, pose_map);
}
catch( tf::TransformException ex)
{
ROS_WARN("transfrom exception : %s",ex.what());
return;
}

有时会出现这样的报错: transfrom exception : “map” passed to lookupTransform argument target_frame does not exist ,但是使用tf_echo发现是正常的。需要检查代码是不是在回调函数里运行了, 不需要在回调函数里创建TransformListener对象, 将它作为类成员变量或者全局变量。

全局变量是在main函数之前完成构造函数的,如果用到的类构造函数用到NodeHandle,就会报错。比如tf::TransformListener,解决方法是用全局指针,比如boost::shared_ptr<T>,然后在main函数的ros::init()之后指向一个对象。

参考:tf::TransformListener::transformPose [exception] target_frame does not exist


ROS标定摄像头
abstract Welcome to my blog, enter password to read.
Read more
ROS编程常用函数

接受中文注释

setlocale(LC_ALL, "");

ros::Time 和 double 转换

1
2
double x=ros::Time::now().toSec();     //把时间戳转化成浮点型格式
ros::Time stamp = ros::Time().fromSec(x); //把浮点型变成时间戳

读取环境变量

跨平台函数 bool get_environment_variable (std::string &str, const char *environment_variable)

clamp函数

1
2
3
4
5
6
7
8
9
template<class T>
T clamp(T x, T min, T max)
{
if (x > max)
return max;
if (x < min)
return min;
return x;
}
1
2
3
4
// 注意最后一个路径点,是 end() 的前一个点
geometry_msgs::PoseStamped global_end = *(std::prev(global_plan_.end()) );
geometry_msgs::PoseStamped base_end;
transformPose("base_link", global_end, base_end);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
bool  transformPose(
const std::string frame,
geometry_msgs::PoseStamped &in_pose,
geometry_msgs::PoseStamped &out_pose) const
{
if (in_pose.header.frame_id == frame)
{
out_pose = in_pose;
return true;
}
try
{
out_pose.header.frame_id = frame;
tf_->transformPose(frame, in_pose, out_pose);
out_pose.header.seq = in_pose.header.seq;
return true;
}
catch (tf2::TransformException &ex)
{
ROS_ERROR("Exception in transformPose : %s", ex.what() );
}
return false;
}

四元数转欧拉角yaw

1
2
3
4
5
6
7
8
9
10
11
12
13
14
double quaternionToYaw(const geometry_msgs::Quaternion&  quat)
{
tf::Quaternion tf_quat;
tf::quaternionMsgToTF(quat, tf_quat);
double roll, pitch, yaw;
tf::Matrix3x3(tf_quat).getRPY(roll, pitch, yaw);
return yaw;
}

double quaternionToYaw(const geometry_msgs::Quaternion& quat)
{
tf::Quaternion quat(0,0, quat.z, quat.w);
return tf::getYaw(quat);
}

欧拉角 —— 四元数

1
2
3
4
5
6
7
8
9
//只通过yaw计算四元数,用于平面小车
static geometry_msgs::Quaternion tf::createQuaternionMsgFromYaw(double yaw);

static tf::Quaternion createQuaternionFromYaw (double yaw)

static tf::Quaternion createQuaternionFromRPY(double roll, double pitch, double yaw)

static geometry_msgs::Quaternion createQuaternionMsgFromRollPitchYaw( double roll,
double pitch, double yaw)

获得当前节点名称

ros::this_node::getName()

bool ros::master::getNodes (V_string &nodes) 是获得当前所有运行的节点,效果和rosnode list一样。但这里有个缺陷,使用pkill杀死的节点,在rosnode list里仍然能看到,实际已经退出了。

规范角度 [-π, π)

调用g2o::normalize_theta(double theta)或者

1
2
3
4
5
6
7
void normAngle ( double& angle )
{
if( angle >= M_PI)
angle -= 2 * M_PI;
if( angle < -M_PI)
angle += 2 * M_PI;
}

判断机器人是否静止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bool robotIsStill()
{
bool velStatus;
boost::shared_ptr<nav_msgs::Odometry const> twistEdge;
twistEdge = ros::topic::waitForMessage<nav_msgs::Odometry>("odom",ros::Duration(0.3) );
if(twistEdge != NULL)
{
double velX = twistEdge->twist.twist.linear.x;
double angularZ = twistEdge->twist.twist.angular.z;
if(velX == 0 && angularZ == 0)
velStatus = true;
else
velStatus = false;
}
else velStatus = false;
return velStatus;
}

移动一定时间,也可以表示移动一定距离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
geometry_msgs::Twist cmd_vel;
cmd_vel.linear.x = -0.12;
cmd_vel.linear.y = 0.0;
cmd_vel.angular.z = 0.0;

auto t1 = std::chrono::steady_clock::now();
auto t2 = std::chrono::steady_clock::now();
ROS_INFO(" -------------- backward begin ! ------------------ ");
while( std::chrono::duration<double>(t2-t1).count() < 3 )
{
vel_pub.publish(cmd_vel);
t2 = std::chrono::steady_clock::now();
}
cmd_vel.linear.x = 0;
vel_pub.publish(cmd_vel);
ROS_INFO(" -------------- backward done, now continue ! ------------------ ");

获取当前速度

1
2
3
4
5
6
7
8
tf::Stamped<tf::Pose> robot_vel_tf;
base_local_planner::OdometryHelperRos odom_helper_;
odom_helper_.getRobotVel(robot_vel_tf);

geometry_msgs::Twist robot_vel_;
robot_vel_.linear.x = robot_vel_tf.getOrigin().getX();
robot_vel_.linear.y = robot_vel_tf.getOrigin().getY();
robot_vel_.angular.z = tf::getYaw(robot_vel_tf.getRotation());

poseStampedMsgToTF

1
2
3
4
5
6
7
8
//get the starting pose of the robot
tf::Stamped<tf::Pose> global_pose;
if(!planner_costmap_ros_->getRobotPose(global_pose)) {
ROS_WARN("Unable to get starting pose of robot, unable to create global plan");
return false;
}
geometry_msgs::PoseStamped start_pose;
tf::poseStampedTFToMsg(global_pose, start_pose);

还有类似的函数: poseTFToMsgtransformMsgToTF

angles

  • static double from_degrees (double degrees): Convert degrees to radians

  • static double normalize_angle (double angle)

  • static double shortest_angular_distance (double from, double to)

  • static double to_degrees (double radians): Convert radians to degrees

  • static double two_pi_complement (double angle): returns the angle in [-2*M_PI, 2*M_PI] going the other way along the unit circl

tf 命名空间

tf的命名空间有很多有用的函数,先列出一部分:

  • TFSIMD_FORCE_INLINE tfScalar angle (const Quaternion &q1, const Quaternion &q2)

Return the half angle between two quaternions

  • TFSIMD_FORCE_INLINE tfScalar angleShortestPath (const Quaternion &q1, const Quaternion &q2)

Return the shortest angle between two quaternions

  • void assertQuaternionValid (const geometry_msgs::Quaternion &q)

Throw InvalidArgument if quaternion is malformed

  • void assertQuaternionValid (const tf::Quaternion &q)

Throw InvalidArgument if quaternion is malformed

  • TFSIMD_FORCE_INLINE Quaternion shortestArcQuat (const Vector3 &v0, const Vector3 &v1)

TFSIMD_FORCE_INLINE Quaternion shortestArcQuatNormalize2 (Vector3 &v0, Vector3 &v1)

  • TFSIMD_FORCE_INLINE Quaternion slerp (const Quaternion &q1, const Quaternion &q2, const tfScalar &t)

Return the result of spherical linear interpolation betwen two quaternions

  • std::string strip_leading_slash (const std::string &frame_name)

  • TFSIMD_FORCE_INLINE tfScalar tfAngle (const Vector3 &v1, const Vector3 &v2)

Return the angle between two vectors

  • TFSIMD_FORCE_INLINE Vector3 tfCross (const Vector3 &v1, const Vector3 &v2)

Return the cross product of two vectors

  • TFSIMD_FORCE_INLINE tfScalar tfDistance (const Vector3 &v1, const Vector3 &v2)

Return the distance between two vectors

  • TFSIMD_FORCE_INLINE tfScalar tfDistance2 (const Vector3 &v1, const Vector3 &v2)

Return the distance squared between two vectors

  • TFSIMD_FORCE_INLINE tfScalar tfDot (const Vector3 &v1, const Vector3 &v2)

Return the dot product between two vectors

goalToGlobalFrame

geometry_msgs::PoseStamped MoveBase::goalToGlobalFrame(const geometry_msgs::PoseStamped& goal_pose_msg)
这个函数的核心功能是获得机器人坐标系和全局地图坐标系的关系:

1
tf_.transformPose(global_frame, goal_pose, global_pose);

计算机器人的内接圆和外切圆半径

1
2
3
4
5
std::vector<geometry_msgs::Point> footprint = planner_costmap_ros_->getRobotFootprint();
costmap_2d::calculateMinAndMaxDistances(footprint, robot_inscribed_radius_, robot_circumscribed_radius);
// 或者这样
planner_costmap_ros_->getLayeredCostmap()->getInscribedRadius()
planner_costmap_ros_->getLayeredCostmap()->getCircumscribedRadius()

通过footprint的代价值判断是否撞障碍

与上一条结合,参考TEB代码

1
2


计算 直角三角形的斜边长

#include <math.h>

double hypot(double x, double y),输入是两个直角边,用法: hypot(p1.x - p2.x, p1.y - p2.y)

ros::shutdown()

关闭节点并且从主节点登出,会终结所有的订阅、发布、服务,连ROS_INFO也不能再用。一般不用调用此函数,当所有的NodeHandle销毁时,就会自动关闭节点。默认的roscpp也会安装SIGINT句柄用来检测Ctrl-C,并自动为你关闭节点。

ros::waitForShutdown ()

等待节点关闭,无论时通过Ctrl-C还是ros::shutdown()

ros::isShuttingDown()

一旦ros::shutdown()调用(注意是刚开始调用,而不是调用完毕)就返回true。一般建议用ros::ok(),特殊情况可以用ros::isShuttingDown()

ros::NodeHandle

获取节点的句柄,ros::init是初始化节点,这个是启动节点。
如果不想通过对象的生命周期来管理节点的开始和结束,可以通过ros::start()ros::shutdown()来自己管理节点。

ros::ok()

判断是否退出节点,如果返回false,说明可能发生了以下事件

  • 调用了ros::shutdown()
  • 被另一同名节点踢出ROS网络
  • 节点中的所有ros::NodeHandles都已经被销毁

ros::Time::now()

获取当前时间: ros::Time time = ros::Time::now()
使用模拟时间时,当 /clock节点接受到第一条消息时,ros::Time::now()返回时刻 0,此时客户端还不知道时钟时间。

ros::Rate

1
2
3
4
5
6
ros::Rate rate(10);
while(ros::ok())
{
// do something
rate.sleep();
}

Rate类的构造函数参数是频率,不是时间。rate(10)对应时间间隔为1/10秒,也就是100毫秒。Rate::sleep()是进行睡眠,实际调用Duration::sleep(),间隔就是从构造函数开始计算,源码处理比较复杂。

获得当前运行的所有节点名

1
2
3
typedef std::vector<std::string> ros::V_string
// Retreives the currently-known list of nodes from the master. 放入
ROSCPP_DECL bool getNodes (V_string &nodes)

用这个函数很简单:

1
2
3
4
5
6
7
std::vector<std::string> nodes;
ros::master::getNodes(nodes); // bool
int len = nodes.size();
for (int i =0; i < len; i ++)
{
cout<<"node: "<<nodes.at(i)<<endl;
}

获得当前运行的所有话题名

1
2
3
4
5
6
7
8
9
10
11
struct ros::master::TopicInfo
{
TopicInfo ()
TopicInfo (const std::string &_name, const std::string &_datatype)
public:
std::string datatype // Datatype of the topic.
std::string name // Name of the topic.
}
typedef std::vector<TopicInfo> ros::master::V_TopicInfo

ROSCPP_DECL bool getTopics (V_TopicInfo &topics)

使用是类似的:

1
2
3
4
5
6
7
8
std::vector<ros::master::TopicInfo> topics;
ros::master::getTopics(topics);
int len = topics.size();
for (int i =0; i < len; i ++)
{
cout<<"topic: "<<topics.at(i).name<<" type: "
<<topics.at(i).datatype<<endl;
}

其他可能用到的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ros::isInitialized ()    // 判断ros::init()是否被调用

ros::isStarted ()  // 判断节点是否已经通过ros::start()启动

ros::requestShutdown ()  // 在节点的一个线程内要求ROS关闭

ros::this_node::getName // 返回当前节点的名称

getAdvertisedTopics()  //返回节点注册的话题
getSubscribedTopics() //返回节点订阅的主题

getNamespace() //返回节点的命名空间

ROS_ASSERT(a<b) //同assert(),若条件不满足,程序会终止

ROS_BREAK() //直接终止程序,报警FATAL


// Get the hostname where the master runs.
ROSCPP_DECL const std::string & getHost ()

// Get the port where the master runs.
ROSCPP_DECL uint32_t getPort ()

// Get the full URI to the master (eg. http://host:port/)
ROSCPP_DECL const std::string & getURI ()

ros_Time_now不能在NodeHandle之前.png

参考:
ros 初始化和关闭


spin与spinOnce

在话题发布和订阅中,消息订阅器一旦知道话题上面有消息发布,就会将消息的值作为参数传入回调函数中,把回调函数放到了一个回调函数队列中,它们的函数名一样,只是实参不一样,这就是subscribe函数的作用。但是此时还没有执行callback函数,当spinOnce函数被调用时,spinOnce就会调用回调函数队列中第一个callback函数,此时回调函数才被执行,然后等到下次spinOnce函数又被调用时,回调函数队列中第二个回调函数就会被调用,以此类推。

注意:因为回调函数队列的长度是有限的,如果发布数据的速度太快,spinOnce函数调用的频率太少,就会导致队列溢出,一些回调函数就会被挤掉,导致没被执行。

对于spin函数,一旦进入spin函数,它就不会返回了,相当于它在自己的函数里阻塞。只要回调函数队列里面有回调函数在,它就会马上去执行。如果没有的话,它就会阻塞,不会占用CPU。

spin()的目的是启动一个新的线程去获取队列中的回调函数并调用它,而回调函数本身不是线程 ,有单线程,同步多线程和异步多线程等情况,这些都有内置的语句。所有用户的调用程序将从 ros::spin()开始调用,只到节点关闭,ros::spin()才有返回值。

ros::spin其实就相当于

1
2
3
4
5
while(ros::ok())
{
// Do Something
ros::spinOnce();
}

发布和订阅话题都不一定要使用spinOnce(),如果仅仅只是响应topic,就用ros::spin(),当程序中除了响应回调函数还有其他重复性工作的时候,那就在循环中做那些工作,然后调用ros::spinOnce()

spinOnece的注意事项

我仔细试验了这几个参数,没有发现缺失回调函数的情况,一般不需要太注意

ros::spinOnce()的用法相对来说很灵活,但往往需要考虑调用消息的时机,调用频率,以及消息池的大小。

比如下面的程序,消息送达频率为10Hz,ros::spinOnce()的调用频率为5Hz,那么消息池的大小就一定要大于2,才能保证数据不丢失,无延迟,跟这里的发布消息池容量无关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//发送频率为10Hz(1秒发10次)  消息池最大容量1000。
ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000);
ros::Rate loop_rate(10);
while (ros::ok())
{
// 发布消息
}


ros::Subscriber sub = n.subscribe("chatter", 2, chatterCallback);
ros::Rate loop_rate(5);
while (ros::ok())
{
/*...TODO...*/
ros::spinOnce();
loop_rate.sleep();
}

看一下源码:

1
2
3
4
void spinOnce()
{
g_global_queue->callAvailable(ros::WallDuration());
}

调用队列中的所有回调函数,如果一个回调还没有准备好调用,再推回队列


发布和订阅

话题通信的媒介是消息,消息的产生和消费是解偶的,二者之间没有强行绑定的关系,ROS节点不在乎是谁在发布Topic,哪个节点在订阅Topic,它只关心topic的名字以及当前的消息类型时候和要求的匹配。所以说话题通信是多对多的方式

publisher注册时的消息类型,如果和发布时的消息类型不一致,编译不报错,运行报错

消息是以Boost共享指针的形式传输,我们可以存储它而又不需要复制数据。回调函数最常用的原型是这样的:

1
void callback(const std_msgs::StringConstPtr& str)

其实它就是void callback(const boost::shared_ptr<std_msgs::String const>&);,还有以下几种:
1
2
3
4
5
6
void callback(boost::shared_ptr<std_msgs::String const>);
void callback(std_msgs::StringConstPtr);
void callback(std_msgs::String::ConstPtr);
void callback(const std_msgs::String&);
void callback(std_msgs::String);
void callback(const ros::MessageEvent<std_msgs::String const>&);

MessageEvent类允许你在订阅的回调函数内获取信息的元数据。

其他常用的ros::Subscriber函数

1
2
3
4
5
6
// 注意: 只能判断话题是否发布,不能判断有没有发布消息
uint32_t getNumPublishers () const //获得连接订阅者的发布者个数

std::string getTopic () const //获得所订阅的话题

uint32_t getNumSubscribers () const  //获得连接发布者的订阅者个数

订阅话题后,回调函数的参数类型必须是对应话题的消息类型,否则能订阅成功但不运行回调。

发布者的回调函数

用于检测是否有订阅者

1
2
3
4
5
6
7
8
9
10
11
12
ros::Publisher pub = nh.advertise<std_msgs::Int8>("topic", 50, connectCb,disconnectCb);

// 回调函数如下
void connectCb(const ros::SingleSubscriberPublisher& pub)
{
ROS_INFO("topic Name: %s", pub.getTopic().c_str());
ROS_INFO("Subscriber Name: %s", pub.getSubscriberName().c_str());
}
void disconnectCb(const ros::SingleSubscriberPublisher& pub)
{
ROS_INFO("disconnectCb");
}

ROS的回调函数里定义count变量,居然报错ambiguous,看来以后最好不用count变量
image-20211227101811232

特殊需求

有时需要在订阅者的回调函数中发布消息,而且只发布一次,这时没必要用service混合topic,用static变量就可以做到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static ros::Publisher *pubPtr;
static bool published = false;

void callback(const std_msgs::Int8ConstPtr& msg)
{
int num = msg->data;
ROS_INFO("msg: %d", num);
std_msgs::String str;
str.data = std::string(to_string(num) );
if(!published && num>8) // 在num>8时发布一次,以后再也不发布
{
pubPtr->publish(str);
published = true;
}
sleep(1);
}

int main(int argc, char** argv)
{
ros::init(argc,argv, "Sub");
ros::NodeHandle nh;

ros::Subscriber sub = nh.subscribe<std_msgs::Int8>("topic",20,callback);
ros::Publisher pub = nh.advertise<std_msgs::String>("new",50,false);
pubPtr = &pub;
ros::spin();
return 0;
}

让订阅者停止订阅

ROS官方有这么一段话:

1
2
3
4
5
function   subscribe()  returns a Subscriber object that you must hold on to until you want to unsubscribe. When all copies of the Subscriber object go out of scope, this callback will automatically be unsubscribed from this topic.

function shutdown() Unsubscribe the callback associated with this Subscriber.
This method usually does not need to be explicitly called, as automatic shutdown happens when all copies of this Subscriber go out of scope
This method overrides the automatic reference counted unsubscribe, and immediately unsubscribes the callback associated with this Subscriber

根据说明,有以下三种方法:

  1. 一般让订阅者停止订阅的方法是等subscribe()生成的订阅者对象出作用域.

  2. 另一种方法就是explicitly调用shutdown(),典型方法是声明订阅者对象为全局变量,在回调函数中满足某条件时,对此对象调用shutdown(),但这样会直接退出程序,并不好

  3. 第三种方法其实与第二种类似,声明一个bool全局变量ok,订阅时执行

1
2
3
4
while(!ok)
{
ros::spinOnce();
}

在回调函数中满足某条件时,让ok=true

queue_size和buff_size

发布者的原型是这样的

1
2
3
4
Publisher ros::NodeHandle::advertise(const std::string & topic,
uint32_t queue_size,
bool latch = false
)

  • queue_size Maximum number of outgoing messages to be queued for delivery to subscribers。 消息队列是为了缓存发布节点发布的消息,一旦队列中消息的数量超过了queue_size,那么最先进入队列的(最老的)消息被舍弃。
  • latch (optional) If true, the last message published on this topic will be saved and sent to new subscribers when they connect

订阅者Subscriber ros::NodeHandle::subscribe函数里也有一个queue_size: Number of incoming messages to queue up for processing,消息队列是为了缓存节点接收到的信息,一旦自己处理的速度过慢,接收到的消息数量超过了queue_size,那么最先进入队列的(最老的)消息会被舍弃。

机器人应用中难免会遇到运算起来很费时间的操作,比如图像的特征提取、点云的匹配等等。有时候,我们需要在ROS的Subscriber的Callback回调函数中进行这些费时的操作。如果我们要是没有取舍的对于每个消息都调用一次回调函数,那么势必会导致计算越来越不实时,很有可能当下在处理的还是几秒以前的数据,之前的消息被舍弃了。所以,如果我们想只处理最新的消息,实际上只需要把两个queue_size都设置成1,那么系统不会缓存数据,自然处理的就是最新的消息。

rospy的订阅者还有个buff_size参数,要设置一个很大很大的数,这个缓冲区的大小是指消息队列使用的缓冲区物理内存空间大小。如果这个空间小于一个消息所需要的空间,比如消息是一副图片或者一帧点云,数据量超过了缓冲区的大小。这个时候为了保证通信不发生错误,就会触发网络通信的保护机制,TCP的Buffer会为你缓存消息。

但是roscpp里面没有buff_size参数,它获得 incoming message的大小,分配所需的缓冲区大小,然后一次读取整个incoming message

参考:
queue_size和buff_size的理解
ROS anwser的解释


ROS定时器

roscpp的定时器会计划在某一频率下执行一次回调操作,在订阅或服务中的回调函数队列机制中使用。

创建定时器

通过ros::NodeHandle::createTimer()创建: ros::Timer timer = nh.createTimer(ros::Duration(0.1), timerCallback); 其实有多个重载,这是用的比较多的一种。

回调函数只能是这种形式: void callback(const ros::TimerEvent&);
ros::TimerEvent结构体作为参数传入,它提供时间的相关信息,对于调试和配置非常有用。ros::TimerEvent结构体说明:

1
2
3
4
5
6
7
8
9
10
// 上次回调期望发生的时间
ros::Time last_expected
// 上次回调实际发生的时间
ros::Time last_real
// 本次回调期待发生的时间
ros::Time current_expected
// 本次回调实际发生的时间
ros::Time current_real
// 上次回调的时间间隔(结束时间-开始时间),是wall-clock时间。
ros::WallTime profile.last_duration

常用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void callback(const ros::TimerEvent&)
{
ROS_INFO("Callback triggered");
}

int main(int argc, char** argv)
{
ros::init(argc, argv, "test_timer_node");
signal(SIGINT, SigintHandler);

ros::NodeHandle nh;
ros::Timer timer = nh.createTimer(ros::Duration(0.5), callback);

ros::spin();

return(0);
}

实战

比如某些情况下,我们需要一直监视机器人的里程计数据状态,并在某个界面上显示出来。这种情况下可以用到定时器,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void odomTimerCb(const ros::TimerEvent& e);
void odomSubCb(const nav_msgs::OdometryConstPtr& m);
ros::Timer odomTimer ;
riki_msgs::HealthStatus msg;

int main(int argc, char** argv)
{
ros::init(argc,argv,"healthStatus");
ros::NodeHandle nh;
odomTimer = nh.createTimer(ros::Duration(5), odomTimerCb);
ros::Subscriber odomSub = nh.subscribe("odom", 1000, odomSubCb);

ros::Publisher pub = nh.advertise<riki_msgs::HealthStatus>("healthStatus", 1000);
ros::Rate loop_rate(1);
while(ros::ok())
{
pub.publish(msg);
ros::spinOnce();
loop_rate.sleep();
}
}

void odomTimerCb(const ros::TimerEvent& e)
{
msg.odomData = false;
}
void odomSubCb(const nav_msgs::OdometryConstPtr& m)
{
msg.odomData = true;
odomTimer.stop();
odomTimer.start();
}

未发布odom主题时,不会进入订阅者的回调函数odomSubCb,只进入定时器的回调odomTimerCb,此时消息成员odomData一直是false。
发布odom后,进入订阅者回调,此时odomData成为true,此时需要先停止计时器再打开。如果不停止,还会进入计时器回调,让消息成员赋值为false,实际造成赋值混乱。但是还得打开,因为一旦odom话题关闭了,就不会再进入订阅者回调,需要设法调用计时器回调,赋值false。订阅odom话题成功再打开计时器后,不会进入计时器回调。

参考: ROS与C++入门教程-Timers


ROS只发布和订阅一次消息

只发布一次消息

1
2
3
4
5
6
ros::init(argc,argv,"op_motor");
ros::NodeHandle nh;
ros::Publisher pub = nh.advertise<std_msgs::Bool>("Switch",1000);
//不必判断订阅者,直接等待1秒钟,只发布一次
sleep(1);
pub.publish(msg);

只发布一次消息就简单了,不必用while循环,直接发布就可以,但是最好也延时一次,防止订阅者的网络状况不好,当然如果不考虑有没有订阅者就不必延时了,不过这样没什么意义。


ROS只订阅一次消息

1
2
3
4
5
6
7
8
9
10
11
12
ros::init(argc,argv,"node");
ros::NodeHandle nh;
boost::shared_ptr<riki_msgs::Battery const> Edge; // 必须有const
Edge = ros::topic::waitForMessage<riki_msgs::Battery>("BMS",ros::Duration(0.1));
if(Edge != NULL)
{
riki_msgs::Battery bat = *Edge;
cout<<"battery:"<< bat.Ah <<endl;
cout<<"battery:"<< Edge->Ah <<endl;
}
else
cout<<"no message from topic BMS"<<endl;

waitForMessage是等待一次消息,造成的网络传输会出现延时。如果是获得消息后再发布,新话题的发布频率会明显下降 为了保证发布频率,还是尽量使用spin()

这里用到的函数是ros::topic::waitForMessage,没有涉及回调函数,它有4个重载,常用的是两个:

1
2
3
template<class M> boost::shared_ptr< M const >   waitForMessage (const std::string &topic, ros::Duration timeout)

template<class M> boost::shared_ptr< M const > waitForMessage (const std::string &topic, ros::NodeHandle &nh)

第一个是订阅话题topic,假如没有发布话题,那么等待时间timeout,超时后程序继续执行;第二个是如果没有发现话题topic,会一直阻塞,不能向下运行,除非执行rosnode kill。前一个用的比较多,注意时间不要设置太短,否则可能因为网络状况不好而订阅失败。

返回类型一看就知道是模板类的Boost共享指针,从这里就可以看出, 消息是以const Boost共享指针的形式传输,我们可以存储它而又不需要复制数据waitForMessage就是返回了订阅到的消息,取指向就可以获得消息成员,也可以直接用指针获得消息成员。
注意模板类型有const,否则不符合重载声明

回调函数中常见的MsgType::ConstPtr参数类型其实就是boost::shared_ptr<const MsgType>

MsgType::Ptr就是boost::shared_ptr<MsgType>

参考:

How do I publish exactly one message
ros::topic Namespace