ROS话题通信原理

发布者的代码一般是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ros::init(argc,argv, "Pub");
ros::NodeHandle nh;
ros::Publisher pub = nh.advertise<std_msgs::Int8>("topic", 50, false);
ros::Rate rate(1);

std_msgs::Int8 msg;

srand(time(NULL));

while(ros::ok())
{
msg.data = rand()%100;
pub.publish(msg);
ROS_INFO("msg:%d",msg.data);

ros::spinOnce();
rate.sleep();
}

注册话题

advertise函数的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Publisher NodeHandle::advertise(AdvertiseOptions& ops)
{
ops.topic = resolveName(ops.topic);
if (ops.callback_queue == 0)
{
if (callback_queue_)
{
ops.callback_queue = callback_queue_;
}
else // 实际执行这里
{
// spinOnce() 就是 g_global_queue->callAvailable(ros::WallDuration());
ops.callback_queue = getGlobalCallbackQueue();
}
}
SubscriberCallbacksPtr callbacks(boost::make_shared<SubscriberCallbacks>(ops.connect_cb, ops.disconnect_cb, ops.tracked_object, ops.callback_queue));

if (TopicManager::instance()->advertise(ops, callbacks))
{
Publisher pub(ops.topic, ops.md5sum, ops.datatype, *this, callbacks);
{
boost::mutex::scoped_lock lock(collection_->mutex_);
// `collection_`在NodeHandle构造函数中初始化为0,也就是空指针,在NodeHandle::construct中分配内存
// impl_是boost::shared_ptr< Impl >, pubs_ 是std::vector< Publisher::ImplWPtr >
collection_->pubs_.push_back(pub.impl_);
}
return pub; // 执行这里
}
return Publisher();
}

NodeHandle::advertise的流程.png

NodeHandle部分成员:

1
2
3
4
5
6
7
class NodeHandle
{

private:
CallbackQueueInterface * callback_queue_;
NodeHandleBackingCollection * collection_;
};

collection_在NodeHandle构造函数中初始化为0,也就是空指针。其类型如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class NodeHandleBackingCollection
{
public:
typedef std::vector< Publisher::ImplWPtr > V_PubImpl;
typedef std::vector< ServiceClient::ImplWPtr > V_SrvCImpl;
typedef std::vector< ServiceServer::ImplWPtr > V_SrvImpl;
typedef std::vector< Subscriber::ImplWPtr > V_SubImpl;

public:
ServiceManagerPtr keep_alive_service_manager = ServiceManager::instance()
TopicManagerPtr keep_alive_topic_manager = TopicManager::instance()

boost::mutex mutex_;
V_PubImpl pubs_;
V_SrvCImpl srv_cs_;
V_SrvImpl srvs_;
V_SubImpl subs_;
};


//一个简单的广播话题的函数, 至少传入两个参数,topic:话题名字,必须独一无二,不能重复, queue_size:发送到subscriber的消息缓存数量
template <class M>
Publisher advertise(const std::string& topic, uint32_t queue_size, bool latch = false){
//话题广播时的配置选项
AdvertiseOptions ops;
    //n.advertise<std_msgs::String>("topic_m",1000)语句运行时<M>为: <std_msgs::String>
    //看不懂, 貌似在调用对象ops的成员函数init(),估计类似于opt.init<M>(topic, queue_size);
ops.template init<M>(topic, queue_size);
ops.latch = latch;
return advertise(ops);//又调用了下面的advertise()函数
}

AdvertiseOptions定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*  AdvertiseOptions是一个类,在/opt/ros/kinetic/include/ros/advertise_options.h
* \brief Constructor
* \param _topic Topic to publish on
* \param _queue_size Maximum number of outgoing messages to be queued for delivery to subscribers
* \param _md5sum The md5sum of the message datatype published on this topic
* \param _datatype Datatype of the message published on this topic (eg. "std_msgs/String")
* \param _connect_cb Function to call when a subscriber connects to this topic
* \param _disconnect_cb Function to call when a subscriber disconnects from this topic
*/
AdvertiseOptions(const std::string& _topic, uint32_t _queue_size, const std::string& _md5sum,
const std::string& _datatype, const std::string& _message_definition,
const SubscriberStatusCallback& _connect_cb = SubscriberStatusCallback(),
const SubscriberStatusCallback& _disconnect_cb = SubscriberStatusCallback())
: topic(_topic)
, queue_size(_queue_size)
, md5sum(_md5sum)
, datatype(_datatype)
, message_definition(_message_definition)
, connect_cb(_connect_cb)
, disconnect_cb(_disconnect_cb)
, callback_queue(0)
, latch(false)
, has_header(false)
{}

发布话题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template <typename M>
void publish(const M& message) const
{
using namespace serialization;
namespace mt = ros::message_traits;

if (!impl_)
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher");
return;
}

if (!impl_->isValid())
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
// 验证消息和发布者的类型是否对应
ROS_ASSERT_MSG(impl_->md5sum_ == "*" || std::string(mt::md5sum<M>(message)) == "*" || impl_->md5sum_ == mt::md5sum<M>(message),
"Trying to publish message of type [%s/%s] on a publisher with type [%s/%s]",
mt::datatype<M>(message), mt::md5sum<M>(message),
impl_->datatype_.c_str(), impl_->md5sum_.c_str());

SerializedMessage m;
publish(boost::bind(serializeMessage<M>, boost::ref(message)), m);
}

接着再调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Publisher::publish(const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) const
{
// 验证publisher合法性
if (!impl_)
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
if (!impl_->isValid())
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
TopicManager::instance()->publish(impl_->topic_, serfunc, m);
}

然后是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
{
boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
if (isShuttingDown())
{
return;
}

PublicationPtr p = lookupPublicationWithoutLock(topic);
if (p->hasSubscribers() || p->isLatching())
{
ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());

// Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can do a no-copy publish.
bool nocopy = false;
bool serialize = false;
// We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
if (m.type_info && m.message)
p->getPublishTypes(serialize, nocopy, *m.type_info);
else
serialize = true;

if (!nocopy)
{
m.message.reset();
m.type_info = 0;
}
if (serialize || p->isLatching())
{
SerializedMessage m2 = serfunc();
m.buf = m2.buf;
m.num_bytes = m2.num_bytes;
m.message_start = m2.message_start;
}
p->publish(m);

// If we're not doing a serialized publish we don't need to signal the pollset. The write()
// call inside signal() is actually relatively expensive when doing a nocopy publish.
if (serialize)
{
poll_manager_->getPollSet().signal();
}
}
else
{
p->incrementSequence();
}
}