探索ROS中的XML-RPC机制(一)概述和服务端

ROS节点的通信机制是基于XML-RPC协议,这是一个远程调用的协议,说白了也是一种进程间通信的方式,让一个进程A调用进程B上的函数,不可能直接调用,只能是将进程A的参数按一定协议封装传到进程B,进程B调用函数处理后,将返回值再封装成XML返回至进程A.

Xml-RPC是一个远程过程调用的分布式计算协议,通过XML将调用函数封装,并 使用HTTP协议作为传送机制。 RPC采用C/S模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

XML-RPC不支持在socket里实现用户轮询代码,用户要轮转一个额外的线程去做这部分工作。使用XML-RPC时,所有使用都通过XMLRPSManager组件,所有与master的通信都通过master::execute函数。



XML-RPC的优点:简单、轻量;XML编码,可读性强;支持多语言多平台;

缺点:

  1. 对字符的编码较弱,中文编码可能得需要通过base64,所以ROS话题不能是中文
  2. 浪费带宽,比如就传递两个参数,需要发送一大堆无用的xml节点
  3. 对复杂数据结构支持不够好
  4. 实时性不够好,不如基于protobuf的RPC通信

XmlRpc++是一个基于C++的XML-RPC第三方库,也是ROS所使用的,需要头文件和链接动态库。我找了半天都没找到完整的Linux版本的,只好直接用ROS自带的,CMakeLists这样写:

1
2
3
4
5
include_directories(/opt/ros/kinetic/include/)
link_directories(/opt/ros/kinetic/lib)

add_executable(HelloServer "HelloServer.cpp")
target_link_libraries(HelloServer -lxmlrpcpp ) # 链接库文件

这个库其实很小,头文件如下:

ROS源码自带了一个XmlRpc++的测试程序,就是ros_comm\utilities\xmlrpcpp\test中的HelloClient.cppHelloServer.cpp,我们先看服务端

HelloServer

要想自定义一个远程调用,服务端需要实现一个抽象类XmlRpcServerMethod的子类,其内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 // 类定义在命名空间 XmlRpc
// Representation of a parameter or result value
class XmlRpcValue;

//类XmlRpcServer处理客户端的请求以进行远程调用
class XmlRpcServer;
// 宏XMLRPCPP_DECL无定义,仅说明XmlRpc在ROS中做静态库编译
class XMLRPCPP_DECL XmlRpcServerMethod {
public:
XmlRpcServerMethod(std::string const& name, XmlRpcServer* server = 0);
virtual ~XmlRpcServerMethod();

std::string& name() { return _name; } // 远程调用的名称

// 执行method,参数 XmlRpcValue,子类必须实现此纯虚函数
virtual void execute(XmlRpcValue& params, XmlRpcValue& result) = 0;

// 返回method的帮助信息,由子类实现
virtual std::string help() { return std::string(); }

protected:
std::string _name; // method 名称赋值
XmlRpcServer* _server;
};

看来这个类很简单,关键是method名称和实现纯虚函数,服务端程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include "xmlrpcpp/XmlRpc.h"
#include <iostream>
#include <stdlib.h>
using namespace XmlRpc;

// The server
XmlRpcServer s;

// One argument is passed, result is "Hello, " + arg.
class HelloName : public XmlRpcServerMethod
{
public: // HelloName是 method 名称
HelloName(XmlRpcServer* s) : XmlRpcServerMethod("HelloName", s) {}
void execute(XmlRpcValue& params, XmlRpcValue& result)
{
std::string resultString = "Hello, ";
resultString += std::string(params[0]);
result = resultString; // 赋值给 result
}
} helloName(&s); // 类HelloName实例化

// A variable number of arguments are passed, all doubles, result is their sum.
class Sum : public XmlRpcServerMethod
{
public:
Sum(XmlRpcServer* s) : XmlRpcServerMethod("Sum", s) {}
void execute(XmlRpcValue& params, XmlRpcValue& result)
{
int nArgs = params.size();
double sum = 0.0;
for (int i=0; i<nArgs; ++i)
sum += double(params[i]);
result = sum;
}
} sum(&s);

int main(int argc, char* argv[])
{
if (argc != 2)
{
std::cerr << "Usage: HelloServer serverPort \n";
return -1;
}
int port = atoi(argv[1]);
// Specify the level of verbosity of informational messages. 0 is no output, 5 is very verbose.
XmlRpc::setVerbosity(5);

// 创建socket,绑定到端口,进入listen模式等待客户端
s.bindAndListen(port);

// Enable introspection
s.enableIntrospection(true);

// 等待一定时间以处理客户端请求,-1表示一直等待或指导exit()被调用
s.work(-1.0);
return 0;
}

服务端程序比较简单,主要是自定义method类后,使用XmlRpcServer的构造函数添加method,另外也可以用XmlRpcServer::addMethod(XmlRpcServerMethod* method);,构造函数中实际也是调用这个函数。

然后是函数setVerbosity:

1
2
3
// Easy API for log verbosity
int XmlRpc::getVerbosity() { return XmlRpcLogHandler::getVerbosity(); }
void XmlRpc::setVerbosity(int level) { XmlRpcLogHandler::setVerbosity(level); }

看来就是设置日志等级,不用再深究了。

bindAndListen其实就是封装了Linux网络编程的服务端常用函数,一看源码就清楚。至于work函数,内容如下:

1
2
3
4
5
6
// 在一定时间内处理客户端请求
void XmlRpcServer::work(double msTime)
{
XmlRpcUtil::log(2, "XmlRpcServer::work: waiting for a connection");
_disp.work(msTime);
}

这里看到日志函数了,等级是2,这里的2要与上面设置的等级相比较,也就是XmlRpcLogHandler::getVerbosity(),如果不大于所设置等级才会有日志。然后是XmlRpcDispatch::work,这个就是通信的核心了, 看源码应该是poll的方式, 懒得深入研究了。

在终端输入./HelloServer 8888运行,开始监听端口8888,运行客户端后收到了HelloName的远程调用

然后服务端回复XML,内容基于Http协议



参考:
XmlRPC简介及使用


使用jsoncpp读写JSON

还可以用另一个库 nlohmann

从Github上下载后,使用老三步安装:

1
2
3
4
mkdir dir && cd dir
cmake ..
make
sudo make install

安装的路径如下:

在CMakeList里进行配置,别忘了最后链接库文件:-ljsoncpp

写JSON

1
2
3
4
5
6
7
Json::Value ret_json;

ret_json["msg"] = "Hello JSON";
ret_json["code"] = 123;
ret_json["ret"] = true;

cout << ret_json.toStyledString()<< endl;

运行结果:

1
2
3
4
5
{
"code" : 123,
"msg" : "Hello JSON",
"ret" : true
}

写JSON数组如下:

1
2
3
4
5
6
7
8
9
10
Json::Value ret_json,sub;
sub.append(Json::Value(123));
sub.append(Json::Value(456));
sub.append(Json::Value(789));

ret_json["msg"] = "Hello JSON";
ret_json["code"] = sub;
ret_json["ret"] = true;

cout << ret_json.toStyledString()<< endl;

结果为:

1
2
3
4
5
{
"code" : [ 123, 456, 789 ],
"msg" : "Hello JSON",
"ret" : true
}


用SIGINT信号的回调函数关闭节点(自定义+ROS源码)

用自定义的方法关闭节点时,代码一般是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void mySigintHandler(int sig)
{
ROS_INFO("Exit from my_node !");
ros::shutdown();
ros::waitForShutdown();
}

int main(int argc, char** argv)
{
ros::init(argc, argv, "my_node", ros::init_options::NoSigintHandler);
ros::NodeHandle nh;
signal(SIGINT, mySigintHandler);
ros::spin();
return 0;
}


为什么这样做,原因看ros::start()的源码就明白了,其中有一句:

1
2
3
4
5
6
7
8
9
10
if (!(g_init_options & init_options::NoSigintHandler))
{
signal(SIGINT, basicSigintHandler);
}

void basicSigintHandler(int sig)
{
(void)sig;
ros::requestShutdown();
}

这就是配置ROS启动flag的问题,它是个枚举值,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
 //  Flags for ROS initialization,在命名空间ros::init_options
enum InitOption
{
//不安装SIGINT handler,可以安装自定义的SIGINT handler
NoSigintHandler = 1 << 0,

// 匿名节点模式,在节点名最后添加随机数,使得可以同时运行多个同名节点
AnonymousName = 1 << 1,

// brief Don't broadcast rosconsole output to the /rosout topic
NoRosout = 1 << 2,
};
typedef init_options::InitOption InitOption;

再看上面的源码,如果ROS flag不是NoSigintHandler,那么basicSigintHandler就是默认的SIGINT回调函数,它会运行ros::requestShutdown();,所以我们在运行节点时,按Ctrl+C可以退出节点。

下面我们详细看看这个过程,先看回调函数中的函数requestShutdown

1
2
3
4
void requestShutdown()
{
g_shutdown_requested = true;
}

只有一个赋值,而处理变量g_shutdown_requested的只有一个函数checkForShutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void checkForShutdown()
{
if (g_shutdown_requested) // 除非ros::requestShutdown()
{
// Since this gets run from within a mutex inside PollManager, we need to prevent ourselves from deadlocking with another thread that's already in the middle of shutdown()
boost::recursive_mutex::scoped_try_lock lock(g_shutting_down_mutex, boost::defer_lock);
while (!lock.try_lock() && !g_shutting_down)
{
ros::WallDuration(0.001).sleep();
}

if (!g_shutting_down) // ros::start()中赋值为false,之后未变
{
shutdown(); // 关闭节点
}

g_shutdown_requested = false;
}
}

而跟checkForShutdown有关的只有PollManager::instance()->addPollThreadListener(checkForShutdown);,它位于ros::start()中:

1
2
3
PollManager::instance()->addPollThreadListener(checkForShutdown);
. . . . . .
PollManager::instance()->start();

函数的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
typedef boost::signals2::signal<void(void)> VoidSignal;
VoidSignal poll_signal_;
typedef boost::function<void(void)> VoidFunc;

// 使用了观察者模式
boost::signals2::connection PollManager::addPollThreadListener(const VoidFunc& func)
{
boost::recursive_mutex::scoped_lock lock(signal_mutex_);
return poll_signal_.connect(func);
}

void PollManager::start()
{
shutting_down_ = false;
thread_ = boost::thread(&PollManager::threadFunc, this);
}

void PollManager::threadFunc()
{
disableAllSignalsInThisThread();
while (!shutting_down_) //start()里赋值为false,PollManager::shutdown()里赋值true
{
boost::recursive_mutex::scoped_lock lock(signal_mutex_);
poll_signal_(); // 触发信号,执行槽函数 checkForShutdown
if (shutting_down_)
return;
poll_set_.update(100); // PollSet  poll_set_;函数定义在 poll_set.cpp
}
}

这里用到了boost中的信号和槽,和Qt中的原理一样,关键变量poll_signal_用于连接函数,这里是连接了 checkForShutdown。PollManager是一个线程,start()开启线程,不停地运行函数threadFunc(),因为shutting_down_开始为false,所以不停运行while循环,然后执行poll_signal_(),这会触发信号然后执行它连接的函数,也就是checkForShutdown,也就是节点不停检查是否需要关闭。

这里就是关键的变量g_shutdown_requested了,它只在ros::requestShutdown()赋值为true,所以节点只有在执行了ros::requestShutdown()函数后才会执行,然后执行ros::shutdown(),节点最终的关闭都在这里。当最后一个节点句柄被销毁时,shutdown()被自动调用。

说的有点乱了,整个过程示意图如下:


节点退出问题

以前我的SIGINT回调函数是这样写的:

1
2
ros::shutdown();
ros::waitForShutdown();

这是从网上看来的,后来发现偶尔会很长时间才执行成功,甚至不能成功,一直阻塞。现在来看一下源码:

1
2
3
4
5
6
7
8
9
10
11
12
void waitForShutdown()		// Wait for this node to be shutdown
{
while (ok())
{
WallDuration(0.05).sleep(); // 0.05s
}
}

bool ok()
{
return g_ok;
}

waitForShutdown表示如果节点ok(),会一直睡眠。只有在ros::shutdown()的最后执行了g_ok = false;,有时一直无法退出,说明节点是一直ok(),因为节点在回调函数运行时无法完成shutdown,没执行到g_ok = false;


Log4cpp的配置和使用

下载log4cpp后,按照老三步进行安装,头文件安装到/usr/local/include/log4cpp,库文件在/usr/local/lib。如果在ARM平台,安装为

1
2
3
./configure --build=arm-linux
make
sudo make install

在CMakeList里写好调用后,编译,结果报错:

1
2
3
4
//usr/local/lib/liblog4cpp.so: undefined reference to `pthread_key_create'
//usr/local/lib/liblog4cpp.so: undefined reference to `pthread_getspecific'
//usr/local/lib/liblog4cpp.so: undefined reference to `pthread_key_delete'
//usr/local/lib/liblog4cpp.so: undefined reference to `pthread_setspecific'

这是不识别pthread,需要再链接pthread,target_link_libraries(${PROJECT_NAME} -llog4cpp -pthread),这样就正常了

我的日志配置文件是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# DEBUG是指定rootCategory的log级别门槛
# TESTAppender是输出到日志文件
# 如果有console,日志会在CGI,IDE输出和Linux终端中输出
log4cpp.rootCategory=DEBUG,TESTAppender

# -------定义console属性------- #

# consoleAppender类型:控制台输出
# 控制台输出的log输出的格式是:[%p] %d{%H:%M:%S.%l} (%c): %m%n
log4cpp.appender.console=ConsoleAppender
log4cpp.appender.console.layout=PatternLayout
log4cpp.appender.console.layout.ConversionPattern=[%p] %d{%H:%M:%S.%l} (%c): %m%n

# -------定义TESTAppender的属性-------
# RollingFileAppender类型:输出到回卷文件,即文件到达某个大小的时候产生一个新的文件
log4cpp.appender.TESTAppender=RollingFileAppender

#当日志文件到达maxFileSize大小时,将会自动滚动,这里是100K
log4cpp.appender.TESTAppender.maxFileSize=100000000

#maxBackupIndex指定可以产生的滚动文件的最大数
log4cpp.appender.TESTAppender.maxBackupIndex=10

# 日志信息输出到的文件路径
log4cpp.appender.TESTAppender.fileName=/home/user/log/package.log

# PatternLayout 表示可以灵活指定布局模式
log4cpp.appender.TESTAppender.layout=PatternLayout

#append=true 信息追加到上面指定的日志文件中,false表示将信息覆盖指定文件内容
log4cpp.appender.TESTAppender.append=true
log4cpp.appender.TESTAppender.layout.ConversionPattern=[%d{%Y-%m-%d %H:%M:%S.%l}-%p]: %m%n

log4cpp的模式设置,我常用的是 [%d{%m-%d %H:%M:%S.%l}-%p]: %m%n,日志的形式: [03-08 13:48:07.876-INFO]: this is log

默认是%m%n

  • %%%%: a single percent sign
  • %c: the category
  • %d: the date
  • %p: 优先级
  • %r: milliseconds since this layout was created.
  • %R: seconds since Jan 1, 1970
  • %l: 毫秒
  • %m the message
  • %n: the platform specific line separator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <log4cpp/Category.hh>
#include <log4cpp/PropertyConfigurator.hh>


try
{
log4cpp::PropertyConfigurator::configure(config_base_path+"/log.conf");
}
catch (log4cpp::ConfigureFailure& f)
{
std::cout << "Configure Problem: " << f.what() << std::endl;
}

log4cpp::Category& log = log4cpp::Category::getRoot(); //类Category的静态函数进行初始化
log.info("Hello log, %s", "abc");
log.info("[%s -> %s -> %d] Test option",__FILE__,__FUNCTION__,__LINE__);

如果配置文件路径不存在,会报错: Configure Problem: File /home/user/Document/test.conf does not exist
但是程序还可以继续向下运行,也就是不会受日志的问题而影响,这正是我们需要的。

如果将log作为成员变量,这样写

1
2
3
4
log4cpp::Category& log;  // 成员变量

MyClass::MyClass()
:log(log4cpp::Category::getRoot() )

不指定配置文件仍能通过编译,但是不要这样做。 Log4cpp不存在获得配置文件或日志目录的接口

运行结果生成了日志文件,内容和终端输出都是:

1
2
[06-26 06:14:21.658 - INFO]: Hello log, abc
[06-26 06:14:21.659 - INFO]: [/home/user/QtProjects/TestLog4cpp/main.cpp -> main -> 22] Test option

log4cpp::Category::shutdown();会关闭所有的appenders,这一句只能在程序的最后使用,我曾经在某个类的析构函数里调用了它,但这个类在一个函数里使用,所以这个类对象是局部变量,函数结束后就执行了shutdown(),但主函数尚未结束,导致主函数后面的日志都不能显示了

另外还有一个 spdlog

spdlog 的配置使用


Boost教程(六)其他杂项

类型转换lexical_cast

最近发现除了Qt外,用boost做一些类型转换也很好,这样可以避开QString这样的Qt数据类型了,需要使用的时模板函数lexical_cast,需要try catch

1
2
3
4
5
6
7
8
9
try
{
string s = "123";
int num = boost::lexical_cast<int>(s);
}
catch (boost::bad_lexical_cast& e)
{
cout<<e.what()<<endl;
}

BOOST_FOREACH

只需要头文件#include <boost/foreach.hpp>

1
2
3
4
5
6
7
8
9
10
11
vector<int> ages;
ages.reserve(20);
ages.push_back(2);
ages.push_back(9);
ages.push_back(14);
ages.push_back(27);
ages.push_back(39);
BOOST_FOREACH(int age, ages)
{
cout<< age <<endl;
}

实际使用中,最好定义一个宏:#define Foreach BOOST_FOREACH,宏最好不要是foreach,容易跟其他库(例如Qt)冲突


Shell小技巧

多行注释

bash脚本的多行注释可以采用HERE DOCUMENT特性,实现多行注释,比如

1
2
3
4
<<'COMMENT'
...

COMMENT

举例如下:
1
2
3
4
5
6
7
8
#!/bin/bash
echo "Say Something"
<<COMMENT
your comment 1
comment 2
blah
COMMENT
echo "Do something else"

将命令执行的结果写入日志文件

catkin_make > make.log, 注意不要用一直执行的命令,比如ping

head与tail

tail -f showMapInfo.log可以实时显示文件的更新
tail -n 5 showMapInfo.log显示文件最后5行
head -n 5 showMapInfo.log显示文件开始5行

修改指定目录下的所有文件的权限

全体可读、可修改、可执行: chmod -R 777 dir

history

使用history列出历史命令后,如果需要执行第 1024 历史命令,可以执行 !1024

如果以前执行了一个很长命令,只记录命令开头是 curl,这时就可以通过 !curl 快速执行该命令。但是执行的可能不是我们需要的,所以常常加上 :p,即 !curl:p ,打印出了搜索到的命令,如果要执行,请按 Up 键,然后回车即可。

在命令前额外多加一个空格,这样的命令是不会被记录到历史记录的

对于可执行文件,在当前目录时用命令./file即可,如果是完整绝对路径,就用命令/path/file即可

检测温度

开始检测硬件传感器:sudo sensors-detect

要确保已经工作,运行命令:sensors,但只运行一次

设置终端初始路径

bashrc中加一行: cd PATH即可

有一次发现不管怎么改也没用,按照网上的方法修改bash_profile不奏效。 后来发现是系统环境变量$HOME被改变了,于是在/etc/profile中添加一行 export HOME=/home/user,问题解决

当前目录下占用最大磁盘空间的目录

1
du -ahx . | sort -rh | head -5

反向删除

文件夹下,删除abc文件以外的所有文件

1
ls | grep -v ^abc$ | xargs -i rm -rf {}

如果abc不加正则表达式,会保留文件名包含abc的文件,比如abcd

对当前目录下的所有文件排序

du -s *. | sort -nr

显示当前目录的最大的文件

du -s *. | sort -nr | head -1

将几个文件合并为一个文件

cat file1 file2 > file

检查系统中的设备情况

需要检查日志 /var/log/messages

只显示文件名和大小

  • ls -sh . 显示当前文件夹的所有文件大小

  • du -sh folder     显示指定文件夹或文件的大小,结果容易可读,比如4G

du -s /home/user/.ros/log的执行结果为

1
1147000 /home/user/.ros/log

大小的单位为 K。

如果只取空格左边部分,使用 echo $(du -s /home/user/.ros/log) | cut -d " " -f 1,如果取空格右边部分,那么把1改为2.

获取CPU核数

getconf _NPROCESSORS_ONLN 2>/dev/null || sysctl -n hw.ncpu || echo 1

tar.gz 压缩

file.txt文件压缩成file.tar.gz: tar -zcvf myfile.tar.gz myfile.txt

删除包含字符串的行

把文件中包含某字符串的行删掉,比如下面文件中的abc所在行删除,使用sed命令:

1
2
3
4
5
6
7
8
abc12
abc402
a213bc
dlifjljs
abc
904650-9
aabbcc
123abc53465

经过sed处理,变为
1
2
3
4
a213bc
dlifjljs
904650-9
aabbcc

如果执行sed -e '/abc/d' test.txt,只显示修改后的结果,未修改文件。如果执行sed -i,修改并保存文件。

延时函数

使用 sleep 或usleep函数。只看sleep函数:

1
2
3
4
5
#参数单位默认为秒。
sleep 1s 或 sleep 1 #表示延迟一秒
sleep 1m #表示延迟一分钟
sleep 1h #表示延迟一小时
sleep 1d #表示延迟一天

xargs

xargs 是给命令传递参数的一个过滤器,也是组合多个命令的一个工具。它能够捕获一个命令的输出,然后传递给另外一个命令。
xargs 可以将管道或标准输入数据转换成命令行参数,这是最常用的场合,也能够从文件的输出中读取数据。
比如执行命令find *.sh,结果如下:

1
2
3
4
5
deb软件安装.sh
Kinetic插件安装.sh
ping.sh
ubuntu初始化精简版.sh
所有alias.sh

执行find *.sh | xargs会将多行结果变为单行:
1
deb软件安装.sh Kinetic插件安装.sh ping.sh ubuntu初始化精简版.sh 所有alias.sh

执行find *.sh | xargs ls -smh会使xargs捕获find结果,然后传递给之后的命令做参数,这是管道做不到的,结果:

1
2
3
4.0K deb软件安装.sh, 4.0K Kinetic插件安装.sh,
4.0K ping.sh, 4.0K ubuntu初始化精简版.sh,
4.0K 所有alias.sh

xargs与管道不同,首先看命令echo test.cpp,结果是test.cpp.再看命令echo test.cpp | cat,结果还是test.cpp,这是管道将echo的结果定向为cat的输入,执行cat当然还是那个结果.但是echo test.cpp | xargs cat就不一样了,xargs将test.cpp做cat的命令参数,也就是第二次执行的是cat test.cpp

修改某文件的修改时间

1
touch -t 201905171210.20 test.py  # 指定新时间

查看进程PID及线程

  • pidof chrome     显示所有进程的pid,进程名必须完整
  • pgrep sublime     通过名字获取pid,可以是部分进程名
  • ps -T -p pid     进程的子进程情况

pkill kill killall

  • kill pid     按pid杀死进程
  • kill -9 pid     按pid强制终止进程,但是可能造成数据丢失,轻易不要使用。比如 kill -9 $(pgrep rosmaster)
  • killall name     按名称杀死一个进程组的所有进程,比如killall chrome杀死所有的chrome进程

  • pkill name     按名称终止包含name的所有进程

  • pkill -o chrome    杀死最旧的chrome进程
  • pkill -n chrome     杀死最新的chrome进程
  • pkill -c chrome     杀死所有chrome进程,同时显示数量

用脚本杀死阻塞的进程

1
2
3
4
5
6
7
8
#!/bin/bash

while true; do

command & sleep 2 && kill -2 $!
sleep 1
echo " ---------- start next loop ---------- "
done

command是一个阻塞的进程,只有按下ctrl+C后才能继续运行,但在这种循环的情况,显然不能一直去按ctrl+C,所以需要脚本去终止进程command,也就是kill -2 $!

杀死终端的进程

ubuntu终端的进程名为bash,如果在自己电脑pkill bash,还会剩下当前操作的终端,其他的全关。如果通过另一台电脑远程登录,执行pkill bash,可以将所有终端关闭。

xdotool

sudo apt-get install -y xdotool

  • 最小化当前窗口: xdotool windowminimize $(xdotool getactivewindow)

还有很多功能,这个工具能模拟鼠标键盘的大量操作,可参考 神器 xdotool

带参数的alias

有时需要运行一个带参数的很长的命令,这样感觉很不方便,可以与alias结合,提高效率。alias不支持直接加参数,而是通过函数来间接实现。

1
2
# test()是个函数
alias ldd='test() { ldd $1 | grep -v kinetic | grep -v x86_64-linux-gnu;}; test'

统计文件行数、字节、字数

wc命令,选项 -l, -c, -w 分别统计行数、字节、字数,可统计多个文件,但不能统计目录。

1
2
3
wc -c main.cpp
wc -l *.cpp
wc -l main.cpp test.cpp t.cpp //统计三个文件行数

只显示文件名和日期

1
ls -lSh | awk '{print $6,$7,"   " $8,"  " $9}'

统计目录中的文件个数 (包括文件夹)

ls | wc -l

当前文件夹或文件大小

当前文件夹:du -smh
当前文件夹下所有文件和子文件夹: du -mh .
某文件: du -smh file

当前目录下大于400M的文件夹和文件,并从小到大排序

1
sudo du -h | grep "[4-9][0-9][0-9]M" | sort -n

ls -lht frames.pdf 显示文件的详细信息,比如:

1
-rw-rw-r-- 1 user user 20K 6月  29 10:24 frames.pdf

时间相关

  • date -s "2024-10-15 15:50:00"    设置时间
  • date +%s    输出UTC时间

判断文件/文件夹是否存在

1
2
3
4
5
6
7
if [ -d ~/Documents/dir ]
then
echo "yes"
else
echo "no, so create it"
mkdir dir
fi

-d判断后面的文件夹是否存在
-f判断是否存在某文件
-s判断是否存在某文件,且大小不为0,这个比-f更有用

函数

组合多个命令到一个操作。下面的命令组合了 mkdir 和 cd 命令。输入 mc folder_name可以在你的工作目录创建一个名为folder_name的目录并立刻进入

1
2
3
4
mc () {
mkdir -p $1
cd $1
}

scp出错

使用scp传递文件时,出现了这样的报错:

解决方法: ssh -o StrictHostKeyChecking=no 192.168.73.14

注意: 这里的IP是发送者的IP

自动化工作

每次本机提交代码后,远程再更新,编译和移动可执行文件,过程比较繁琐,干脆都放到一个Shell脚本里,用alias执行这个脚本

1
2
3
4
5
6
7
cd /home/user/Robot/workspace/src
svn up
cd /home/user/Robot/workspace
catkin_make --pkg package1 package2 package3

cp $packagebin"/exe1" $dir
cp $packagebin"/exe2" $dir

用echo管道命令给sudo自动输入密码

1
echo password | sudo -S cmd

-S表示从标准输入读取密码。这样用于system函数就比较方便了,但是这种方式密码会明文显示,密码不安全

拆分大文件

split命令拆分大文件,以原本文件名为前缀,数字为后缀

1
split --verbose -n5 myfile.cpp -d myfile_

myfile.cpp分割为5个文件, 以myfile_为前缀, 显示分割过程,但是缺陷在于拆分的文件没有扩展名

进一步优化:把test.log拆分,每个最大200k,并且按照test_数字命名,带扩展名。
split test.log -b 200k -d -a 2 test_&&ls | grep test_ | xargs -n1 -i{} mv {} {}.log

升级版ping命令

当能ping通一个主机时,此时ping命令会一直执行,要想终止,可采用CTRL+c或CTRL+z方式退出。也可以设置选项方式,使得ping命令执行若干次包就终止。ping -c 4 ip,此时ping命令将执行4次

下面的脚步是执行ping命令时,可以只输入最后地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#/bin/bash

ip=`hostname -I`
net=$(cut -d'.' -f3<<<$ip)
var1=$(cut -d'.' -f1<<<$ip)
var2=$(cut -d'.' -f2<<<$ip)
sub=${var1}'.'${var2}'.'${net}'.'

echo -e "\033[41;37m 输入要ping的IP最后一个地址 \033[0m"
read -p ":" addr
address=${sub}${addr}

ping -c 3 $address

echo -e "\033[44;37m Ping IP address done ! \033[0m"

当前文件夹中只有一个子目录时,直接进入

可以做成alias ccd :

1
2
3
4
5
6
7
8
9
10
11
#/bin/zsh

n=$(ls | wc -l)

if [ "$n" -eq "1" ]
then
folder=$(ls)
cd $folder
else
echo "can't cd, more than 1 child folder"
fi

当前文件夹中只有一个文件时,直接用vim打开

1
2
3
4
5
6
7
8
9
10
#!/bin/bash

n=$(ls | wc -l)
if [ "$n" -eq "1" ]
then
file=$(ls)
vim $file
else
echo "more than 1 file"
fi

使用脚本在当前终端打开一个新标签

1
2
3
gnome-terminal --tab
# 打开两个标签
gnome-terminal --tab --tab

在终端中打开多个tab,每个tab运行单独的命令

1
2
3
4
5
6
7
8
9
10
11
title1="tab 1"
title2="tab 2"
title3="tab 3"

cmd1="echo tab1"
cmd2="echo tab2"
cmd3="echo tab3"

gnome-terminal --tab --title="$title1" --command="bash -c '$cmd1; $SHELL'" \
--tab --title="$title2" --command="bash -c '$cmd2; $SHELL'" \
--tab --title="$title3" --command="bash -c '$cmd3; $SHELL'"

参考链接

linux删除某文件以外的文件

1
2
3
rm -f  !(a) 
# 删除a和b以外的文件
rm -f !(a|b)

找出当前文件夹下大于100M的文件

1
find . -type f -size +100M -print0 | xargs -0 du -h | sort -nr

只显示当前所有文件的名称和大小

1
ls -smhSlr | awk '{ print $1, $10 }'

判断当前某进程的个数或是否在运行

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/bin/bash

# ps -ef |grep zookeeper 这个就是看zookeeper的启动情况
# grep -v "grep" 是为了去掉查询 grep的那一条
# wc -l 是计数的

COUNT=$(ps -ef | grep zookeeper |grep -v "grep" |wc -l)
echo $COUNT
if [ $COUNT -eq 0 ]; then
echo NOT RUNNING
else
echo is RUNNING
fi

ROS通信模型

ROS节点构成了一个计算图(computation graph),它是一个p2p的网络结构,节点之间的拓扑结构为 mesh(网状)。节点必须与其他节点通讯,否则就是个普通程序。ROS通讯中,节点通过XML-RPC远程调用来实现建立连接,传输数据。远程调用负责管理节点对计算图中信息的获取与更改,还有一些全局的设置, 但RPC不直接支持数据的流传输(而是通过 TCPROS与 UDPROS)

每个ROS节点运行一个XML-RPC server,ROS的通讯模块在ros_comm

topic通信模型

我运行了两个发布者节点:Pub和second,一个订阅者节点:Sub,话题名为topic,详细情况如下:


上面列出了三个节点的端口号,IP都是相同的,因为都在本机上,roscore的uri为http://192.168.0.106:11311,整个通信模型如下:

发布者节点通过一个名为registerPublisher的远程调用(XML-RPC),注册它们的话题和消息类型到主节点。同样地,订阅者通过registerSubscriber()远程调用,注册到主节点,这样主节点就获知了双方的uri、话题名称、消息类型。如果仅仅话题名一样,消息类型不一样,也是不能通信的。

订阅者注册时,主节点会回复注册同一话题的所有发布者,然后订阅者就直接与发布者通信,基于TCPROS协议传输消息,这就跟主节点无关了。当又有新的发布者注册同一个话题到主节点时,主节点执行publisherUpdate()给订阅者,回复一个新的发布者list。

二者通信是基于TCPROS协议,过程与三次握手类似,订阅者与发布者建立第一次连接,传输topic信息,然后再根据发布者返回的topic 信息,建立第二次连接,发布者开始传输具体的数据。

对于unregistration,发布者执行远程调用unregisterPublisher(),订阅者执行unregisterSubscriber()远程调用。

举个例子,我们可以看一个节点的信息:
rosnode info.png
话题tf_static是本节点传递给amcl节点的,方向outbound说明了这一点,也就是本节点负责发布,消息传递方式是TCPROS;话题joint_states的传递方向是inbound,也就是本节点从节点joint_state_publisher订阅了此话题

TCPROS中的重点:

  • Md5:如果两个话题名一样而消息类型不同,会无法通信成功。TCPROS为了保证两边传输数据类型一致,会在协议头中给出话题名的md5 hash算法处理过的值,而每次你生成新的msg时,md5的值都会因为你内部数据类型的变动而改变。

  • Subscriber 选项tcp_nodelay :如果是“1” 则给socket 设置 TCP_NODELAY 选项,降低延迟,可能会降低传输效率。

  • Service client 选项:persistent 设置为1,则service的链接会一直开放给多个 service request

使用netstat和wireshark研究通信过程

但是我在使用ROS时, 发现ROS命令显示的端口号并不准确,使用netstatwireshark是找不到的,最好以后两者为准

启动了一对发布订阅者的程序,用netstat查看如下,排除rosout节点:

1
2
# 同IP所有建立通信的节点,排除rosout和SSH
netstat -anop | grep 0.109 | grep ESTA | grep -v rosout | grep -v 109:22

结果还需要去掉3个通信,就是sub-rosout, pub-rosout, python-rosout。python对应的进程是roscore
netstat的结果
显然pub和sub的通信端口不是rosnode info显示的,不知道这算不算ROS的bug

wireshark抓到的发布和订阅TCPROS通信
两者之间的TCP通信只有一问一答:Publisher发PSH和ACK,要求Subscriber数据尽快达到应用层,Subscriber发回ACK,从wireshark里看不到传递的消息,但ROS又不是加密的,这点我没明白什么意思

参考:ROS的弊端


rosbag的使用

bag文件包含tf信息和话题消息,没有rosparam参数和node。 /clock是由ROS的录包回放系统rosbag发布的主题,主要是提供一个仿真时间

rosbag本质也是个ROS节点,whereis rosbag的结果: /opt/ros/kinetic/bin/rosbag,运行rosbag play会出现一个play开头的节点,后面是当前的utc时间

rosbag record

基本使用:rosbag record topic,可以有多个话题:rosbag record topic1 topic2 topic3,话题可以用shell脚本列出来,例如: rosbag record rostopic list | grep plan

常用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-o: 在文件名之前加指定的字符串, rosbag record -o test topic ,结果生成文件test_2019-06-24-14-32-08.bag

-node: 记录某节点订阅的所有话题,` rosbag record --node=/joy_teleop

-l: 只记录指定数量的消息, rosbag record -l 1000 /chatter,只记录1000条

--duration: 保持一定时间记录话题, rosbag record ---duration=30 /chatter, 即记录30秒,还可以有5m, 2h
rosbag record --split --duration=30 /chatter #持续时间到30s后分文件存储

-O: 大写O,指定完整文件名 rosbag record -O sess.bag /topic

--size: 指定文件大小,rosbag record --split --size=1024 /chatter #空间达到1024M后分文件存储

-e, -x: 用于正则表达式

我最常用的是rosbag record topic -o filename

rosbag info

显示指定bag文件的详细信息,rosbag info session*.bag

rosbag play

rosplay其实是重现话题数据传输的效果,比如向话题cmd_vel发数据的过程

rosbag回放过程中按空格暂停。默认模式下,rosbag play命令在公告每条消息后会等待0.2秒才真正开始发布bag文件中的消息。如果rosbag play在公告消息后立即发布,订阅器可能会接收不到几条最先发布的消息

如果同时回放两个单独的bag文件,则根据时间戳的间隔来播放。比如我先录制一个bag1,等待一个小时,然后录制另一个 bag2,在一起回放 bag1 和 bag2 时,实际是先回放bag1,然后等待1个小时才回放bag2

等待时间可以通过-d选项来指定,比如等待10秒再播放bag: rosbag play -d 10 realsense.bag

以暂停的方式启动,防止跑掉数据:

1
rosbag play --pause record.bag

要一帧一帧播放的效果,先用暂停方式启动,然后按S键,这个用的还比较少

设置以0.5倍速回放,也就是以录制频率的一半回放。可以加倍播放,但会增大CPU占用。

1
rosbag play -r 0.5 record.bag

循环播放:

1
rosbag play -l record.bag

use_sim_time的问题

播放bag时,运行算法程序,结果报警:

1
2
3
Warning: TF_OLD_DATA ignoring data from the past for frame base_footprint at time 1.59524e+09 according to authority unknown_publisher
Possible reasons are listed at http://wiki.ros.org/tf/Errors%20explained
at line 277 in /tmp/binarydeb/ros-kinetic-tf2-0.5.20/src/buffer_core.cpp

在其他节点之前运行rosbag play --clock ...,会设置好use_sim_time

--clock选项会让 rosbag play发布和订阅bag文件中的消息到/clock话题,这样会让你的其他节点运行在消息发布的时候。 如果use_sim_time是 true, /clock需要消息

这样一来launch文件里加入rosparam set use_sim_time true就不合适了,launch里常常不加入 rosbag play,即使加入了,也不一定先设置了use_sim_time,合适的执行顺序如下:

  1. 启动ROS,但是不包含你需要启动的节点
  2. rosparam set use_sim_time true
  3. 启动你需要的节点
  4. rosbag play —clock

有的资料说不用--clock才能正常,我估计这是数据集的问题,一般还是要有的

filter

可以实现对已有rosbag文件中的某些topic去除或者保留

1
rosbag filter input.bag output.bag 'topic == "/camera/image_raw/compressed" or topic == "/scan" or topic == "/timetag" or  topic == "/tf" and m.transforms[0].header.frame_id != "/odom" and m.transforms[0].child_frame_id != "/odom"'

结果是保留话题/camera/image_raw/compressed , /scan, /timetag, /tf。但是tf变换的frame_idchild_frame_id不是odom,这样的tf消息会被过滤

参考: How to remove a TF from a ROS bag

bag文件转txt csv

将file_name.bag文件中topic_name话题的消息转换到 file.txt文件中:

1
rostopic echo -b file.bag -p /topic_name > file.txt

-p 只能指定一个话题

rosbag可以直接转csv文件,用excel打开,跟转txt一样的

rosbag compress

如果录制的bag很大,我们可以压缩它,默认的压缩格式是bz2:

1
2
3
4
5
6
7
8
rosbag compress xxx.bag
# 可以添加`-j`手动指定压缩格式为bz2
rosbag compress -j xxx.bag
# 也可以使用LZ4来压缩数据
rosbag compress --lz4 xxx.bag

# 解压缩
rosbag decompress xxx.bag

实例

我们常常使用rosbag record命令录制机器人的建图或行走过程,一般先运行record命令,然后开始机器人的路径规划和行走,到达目标点后,在record命令的终端直接ctrl+C就可以保存bag文件.

这里涉及到一个时间戳的问题, bag文件的时间而非本机时间,在默认情况下,ROS使用ubuntu系统的时间,也就是wall clock time.但bag文件中记录的是历史时间,所以play之前要告诉ROS启用simulated time

1
rosparam set use_sim_time true

然后使用rosbag play --clock test.bag命令播放,当然要先打开rviz,添加对应的topic:

play结束之后,还要把参数还原:

1
rosparam set use_sim_time false

这样可以在rviz中离线分析问题了, 大大提升了效率

问题

我之前根据make_plan服务生成了路径,存放在bag文件里,在我使用rosbag play这些文件时,出现了报错:

不管怎么检查,文件命名也没有问题,不得其解。

使用rosbag info查看其信息,发现起始和终止时间相同,只有1个message。因为make_plan服务生成路径只是运行了一次回调函数,所以只有一个message,即使能在rviz显示,也只有几微妙的一瞬间,没什么意义

读取bag的大小不一致

录制bag包后可能没有正确结束掉程序,回放出现错误:

1
2
[ INFO] [1603677455.288200829]: Opening fuse_2020-10-24-18-01-07.bag
[FATAL] [1603677455.288476478]: Error reading from file: wanted 4 bytes, read 0 bytes

可使用如下命令修复:rosbag reindex xxx-xxx-xxx.bag

play时error reading version line

rosbag可能已损坏,你可以使用rosbag info XX.bag查看你想要读取的rosbag包的信息,如果无法读取到有效信息,说明该rosbag确实已损坏。


C++与Qt常用的类型转换

注意: c++11不允许char* c = "123";这种初始化,通常都用const char*stringchar*是不允许的。
printf("%s")只接收char*const char*,不能是string

stringToNum

template
Type stringToNum(const std::string& str)
{
std::istringstream iss(str);
Type num;
iss >> num;

return num;

}

const char* 转 string

1
2
const char* cc = "12345";
std::string s = cc;

其实就是直接赋值

string转const char*

1
2
3
std::string s = "12345";
const char* cc = s.c_str();
const char* cc = s.data(); // 实际调用同c_str()

const char 转 char

1
char* c = const_cast<char*>(cc);

char 转 const char

直接赋值转换即可

拼接const char*

先转换成string,用+运算符,最后转为const char*类型。或者用strcat,但是要保证空间足够:

1
2
3
4
5
6
char str1[30];
memset(str1, 0, sizeof(str1));
memcpy(str1, "abcd",4);
const char* str2 = "1234";
strcat(str1, str2);
cout << str1 <<endl;

还有stringstream
1
2
3
4
5
6
#include <sstream>

std::stringstream ss;
ss << a << b;

std::string combined = ss.str();

数值类型转为string

C++11标准增加了全局函数std::to_string,有多个重载,将常用数值类型转为string

1
2
3
4
5
6
7
8
9
string to_string (int val);
string to_string (long val);
string to_string (long long val);
string to_string (unsigned val);
string to_string (unsigned long val);
string to_string (unsigned long long val);
string to_string (float val);
string to_string (double val);
string to_string (long double val)

string转数值类型

也是C++11新增加的函数

1
2
3
4
std::stod 		// string转double
std::stof // string转float
std::stoi // string转int
std::stol // string转long

QString转std::string

QString::toStdString

std::string转QString

QString::fromStdString

QString转char*

1
2
3
QString str1 = "Hello";
QByteArray a = str.toLatin1();
char *b = a.data();

char* 转QString

1
2
char *b;
QString str2 = QString(QLatin1String(b));

QByteArray与QString

QString会用UTF-16编码存储,而qDebug()等I/O函数会以UTF-8编码处理。其实转换后的字节流是正确的,只是显示时用了和字节流不同的编码方式处理导致乱码

// Qt默认会使用本机编码,所以对于中文系统,下面这句设置是多余的
QTextCodec::setCodecForLocale(QTextCodec::codecForName("GBK"));

1
2
3
4
5
6
7
8
9
//QByteArray转QString
QByteArray ba;
QString s = QString(ba); // 强制转换

//QString转QByteArray
QString s="abc";
QByteArray ba = s.toLocal8Bit(); // 受setCodecForLocale影响,会转换为设定的编码。如果本机不支持指定编码,则会按toLatin1处理
QByteArray baLatin1 = str1.toLatin1(); //不受setCodecForLocale影响,强制转换为ISO-8859-1编码
QByteArray bUtf8 = str1.toUtf8(); // 不受setCodecForLocale影响,强制转换为UTF-8编码

QByteArray为汉字时,输出会出现乱码,解决方法:

1
2
3
4
5
QByteArray ba = "汉字";
QTextCodec* tc = QTextCodec::codecForName("GBK");
qDebug()<<ba; // "\xBA\xBA\xD7\xD6"
qDebug()<<QString(ba); // "????"
qDebug()<<tc->toUnicode(ba); // 汉字

wchar_t 转 QString

1
2
wchar_t a[10];
QString str1= QString::fromWCharArray(a);

int/char 转 16进制QString 补0

1
2
int a = 0x0483;
QString str1 = QString("%1").arg(a,4,16,QLatin1Char('0'));

环境设置为GB2312,汉字会显示为乱码,使用QStringLiteral会报错error: converting to execution character set: Illegal byte sequence,发现是Mingw编译器问题,换成MSVC编译正常。

参考:Qt字符编码、乱码的一点总结


roscore源码

执行which roscore的结果是/opt/ros/kinetic/bin/roscore,本身是个python脚本。

roscore的源代码在https://github.com/ros/ros_comm/blob/melodic-devel/tools/roslaunch/scripts/roscore,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

import sys
from optparse import OptionParser
from rosmaster.master_api import NUM_WORKERS

NAME = 'roscore'

def _get_optparse():

parser = OptionParser(usage="usage: %prog [options]",
prog=NAME,
description="roscore will start up a ROS Master, a ROS Parameter Server and a rosout logging node",
epilog="See http://wiki.ros.org/roscore"
)
parser.add_option("-p", "--port",
dest="port", default=None,
help="master port. Only valid if master is launched", metavar="PORT")
parser.add_option("-v", action="store_true",
dest="verbose", default=False,
help="verbose printing")
parser.add_option("-w", "--numworkers",
dest="num_workers", default=NUM_WORKERS, type=int,
help="override number of worker threads", metavar="NUM_WORKERS")
parser.add_option("-t", "--timeout",
dest="timeout",
help="override the socket connection timeout (in seconds).", metavar="TIMEOUT")
parser.add_option("--master-logger-level",
dest="master_logger_level", default=False, type=str,
help="set rosmaster.master logger level ('debug', 'info', 'warn', 'error', 'fatal')")
return parser


parser = _get_optparse()
(options, args) = parser.parse_args(sys.argv[1:])
#从这里知道roscore 并不支持参数
if len(args) > 0:
parser.error("roscore does not take arguments")

#roscore实际只调用roslanch.main
import roslaunch
roslaunch.main(['roscore', '--core'] + sys.argv[1:])

roslaunch.main

roslaunch.main 其实主要做了几件事,

  • 创建保存log的文件夹
  • 启动roslaunch server
  • 启动roscores master和rosout

roslaunch脚本只有两句话,其实是导入了roslaunch包,执行了roslaunch.main()函数:

1
2
import roslaunch
roslaunch.main()

roslaunch.main()的源码在http://docs.ros.org/kinetic/api/roslaunch/html/ 的217行。