当前位置:   article > 正文

python机器人编程——无人机python联动控制实现(VREP仿真)1——手搓一个类ROS机器人消息订阅发布模块_v-rep无人机

v-rep无人机

一、前言

我们知道ROS是一个开源的机器人系统,有人叫他是操作系统,里面有丰富的生态,如导航等,很多大学搞研究都利用它。但是,使用ROS很多是基于linux系统,并且需要按照,学习,同时,很多用ROS可能只是为了使用它的消息发布订阅机制,由于其基于的是进程间的通信,可能对通信的实时性也有一些影响。本章,我们来手搓一个纯python构建的类似ROS系统的消息订阅发布系统,非常轻量级,可以在开发机器人时候,想轻量化不想装ROS系统的朋友们。并且我们把这个消息模块用在了无人机仿真控制环境如下:
在这里插入图片描述

接下来开搞…

二、总体设想

开发这个分布订阅系统的目的,我们想是作为一个机器人系统的“中枢神经”系统,用于跟硬件打交道,并为上层运动控制软件提供消息输入输出的服务,并且需要保持一定的实时性,满足实时控制,整个框架设想如下:
在这里插入图片描述

三、系统的组成

如上图所示,我们的一个消息订阅发布系统可以负责跟硬件打交道,通过如串口、以太网口通信的方式去和采集硬件如网关进行通信,这个我们在这里命名为“第一级原始信号通讯”,然后我们会进入一个预处理模块,对原始信号进行一些预处理,这个预处理模块根据需要我们可以自定义修改,预处理的型号变成了上层模块想要的格式和频率的TOPIC(主题)后,可以将这个TOCPIC发布出来,供多个用户订阅后使用,这个过程我们命名为“第二级信号预处理”。经过这个两级处理,从底层传感或者部件采集的不同频率、不同格式数据就变成了可以供上层应用的有固定周期和格式的TOPIC。接下来就可以做各种应用模块了。

四、python代码构建

构建一个MessageBroker消息代理类

python实现的消息代理类其实也非常常见了,网上到处都是,这里我们结合了一下协程的概念,对普通的消息代理类进行了一些优化,利用协程的高性能特性,势必可以在大量的消息来源服务下保持一个好的性能,这部分有待压力测试,发布本博文前并未经过性能测试。源代码如下:

以下这个是常规的MessageBroker类:

class MessageBroker:
    #同步消息订阅分发处理
    def __init__(self):
        self.message_queue = queue.Queue()
        self.subscribers = {}
        self.running=True

    def publish(self, topic, message):
        if self.running:
            self.message_queue.put((topic, message))
        else:
            print("MessageBroker is already stopped")
            

    def subscribe(self, topic, callback):
        if self.running:
            if topic not in self.subscribers:
                self.subscribers[topic] = []
            self.subscribers[topic].append(callback)
        else:
            print("MessageBroker is already stopped")
  
    def unsubscribe(self, topic, callback):
        if topic in self.subscribers:
            if callback in self.subscribers[topic]:
                self.subscribers[topic].remove(callback)

    def start(self):
        self.running=True
        def worker():
            while True:                
                topic, message = self.message_queue.get()
                #print("worker..")
                if message=="close":
                    print("MessageBroker thread stoped")
                    break
                if topic in self.subscribers:
                    for callback in self.subscribers[topic]:
                        callback(message)
                self.message_queue.task_done()
        thread = threading.Thread(target=worker)
        thread.daemon = True
        thread.start()
        print("broker started at:",thread)
    def close(self):
        self.running = False
        self.subscribers.clear()
        self.message_queue.put(('stop', "close"))
        del self.message_queue
        self.message_queue=queue.Queue()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

以下这个是引入协程的MessageBroker类:

class MessageBrokerAsy:
    #高性能异步步消息订阅分发处理
    def __init__(self):
        self.message_queue = queue.Queue()
        self.subscribers = {}        
        self.thread=None
        self.running=True
        #self.condition = asyncio.Condition()

    def publish(self, topic, message):
        if self.running:
            self.message_queue.put((topic, message))
            return True
        else:
            print("MessageBroker is already stopped")
            return 

    def subscribe(self, topic, callback):
        if self.running:
            if topic not in self.subscribers:
                self.subscribers[topic] = []
            #asy_callback=self.make_async(callback)
            self.subscribers[topic].append(callback)
            return True
        else:
            print("MessageBroker is already stopped")
            return 
         
    def unsubscribe(self, topic, callback):
        if topic in self.subscribers:
            if callback in self.subscribers[topic]:
                self.subscribers[topic].remove(callback)
   
    async def main(self):        
        while True:  
            if self.running==False:
                print("MessageBroker thread stoped")
                break
            topic, message = self.message_queue.get()
            if topic in self.subscribers:
                callbacks = self.subscribers[topic]
                for callback in callbacks:
                    if inspect.iscoroutinefunction(callback):
                        await callback(message) 
                    else:
                        callback(message)                         
                    #asyncio.run(dotasks())                    
            self.message_queue.task_done() 
    def start(self):            
        def worker():
            self.running=True
            asyncio.run(self.main())             
 
        thread = threading.Thread(target=worker)
        thread.daemon = True
        thread.start()
        self.thread=thread
        print("broker started at:",thread)
    def close(self):
        self.running = False
        time.sleep(1)
        self.subscribers.clear()       
        del self.message_queue
        self.message_queue=queue.Queue()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

如上所述,我们可以实例化这个类之后,通过start()启动这个消息服务,并且是在新线程中运行,不会影响主线程,也保持了一定的实时性,然后在main()这个消息分发处理方法中,采用了协程处理,这样理论上会提高处理时间。
此外,可以通过subscribeunsubscribe 来订阅相关的主题,用publish 发布主题:

下面是使用MessageBroker消息代理类

if __name__ == '__main__':	
	MB=MessageBrokerAsy()#创建一个消息系统
	MB.start()#启动一个消息系统
	 # 订阅者1的回调处理函数
    async def subscriber1(message):
        print("Subscriber 1 received:", message,time.perf_counter())        
   # 订阅者2的回调函数
    def subscriber2(message):
        print("Subscriber 2 received:", message)	
	# 订阅主题为"topic1"的消息,可以支持多个处理函数(用户)
    broker.subscribe("topic1", subscriber1)
    broker.subscribe("topic1", subscriber2)
	for i in range(20):
        # 模拟采集数据,发布消息到主题"topic1"
        MB.publish("topic1", np.array([11,1.5]))        
        time.sleep(0.5)
    MB.close()   

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

运行如下:
在这里插入图片描述

构建一个DataProcessor消息预处理类

根据我们的设想,是要建设一个消息预处理系统,也就是对原始数据进行加工处理,将原始数据按照一定的频率发布出去供上层订阅使用。这个DataProcessor是这么构建的:

class DataProcessor:
    def __init__(self, interval,broker=None,topic=None,processer=None):
        self.interval = interval        
        self.data_queue1 = []
        self.data_queue2 = []
        self.queue_shift=1
        if processer==None:#默认为均值过滤器
            self.processer=self.Everage_Filter
        else:
            self.processer=processer
        #输出逻辑
        if broker==None:
            self.broker=self
        else:
            self.broker=broker
        #发布的主题
        if topic==None:
            self.topic="Notopic"
        else:
            self.topic=topic
        
    def publish(self, topic, message):
        print(topic, message)        

    def Everage_Filter(self,data_queue):
        # 均值过滤,处理队列内的数据的逻辑,数据指定为np格式        
        if len(data_queue)>0:
            average = sum(data_queue) / len(data_queue)
            data_queue.clear()      
            #print("done:",average)
        else:
            print("empty done:",average,time.perf_counter())
        return average

    async def process_data(self):
        while True:
            processed_data=None
            if self.queue_shift==1:
                data_queue=self.data_queue1
                if len(data_queue) > 0:
                    self.queue_shift=2
                    # 处理队列内的数据
                    processed_data = self.processer(data_queue)
                    print("Processed data1:")
                    self.broker.publish(self.topic,processed_data)
            if self.queue_shift==2:
                data_queue=self.data_queue2                
                if len(data_queue) > 0:
                    self.queue_shift=1
                    # 处理队列内的数据
                    processed_data = self.processer(data_queue)
                    print("Processed data2:")            
                    self.broker.publish(self.topic,processed_data)
            await asyncio.sleep(self.interval)   

    def add_data(self, data):
        # 向队列中添加数据,多个容器,避免冲突
        if self.queue_shift==1:
            self.data_queue1.append(data)
        if self.queue_shift==2:
            self.data_queue2.append(data)
    async def start(self):
        # 启动主循环
        #loop = asyncio.get_event_loop()
        #loop.create_task(self.process_data())
        #loop.run_forever()       
        return asyncio.create_task(self.process_data())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

如上所述,我们在DataProcessor初始化时定义了interval 消息更新的周期,broker 指定的分发模块,topic 预处理完后的消息名称如“机械臂末端坐标”,processer 预处理的自定义函数,预处理默认我们内置了一个在周期内取平均的过滤器。这样就可以把各路原始数据来自一级系统进行预处理后在二级系统broker中进行发布了。
此外,我们知道原始数据路数非常多,如果使用串行进行预处理肯定是不合理的,特别是用到某些耗时的预处理函数时,那么我们还是引入了协程处理的方式,通过再开启一个线程,并集中创建预处理任务的方式,让各路预处理函数平行运行,这样就提高了效率。于是我们再要构建一个类DataProcessors

构建一个DataProcessors平行协程处理类

这个类的代码如下:

class DataProcessors:
    def __init__(self):
        self.DataPros=[]
        self.tasks=[]
        self.stop=False
        self.T=None
        self.main_queue= queue.Queue()
    def add(self,DataPro):
        self.DataPros.append(DataPro)       
    
    def close(self):
        self.stop=True
        self.DataPros.clear()
        
    def start(self):
        #创建新线程,启动所有周期处理
        async def main():
            # 启动主循环
            # 在主线程中创建并行的协程定时任务
            """
            async def datasource(processor):
                while True:
                    #print("Running task 2...")
                    # 在这里编写您的定时任务逻辑
                    processor.add_data(101)
                    processor.add_data(111)
                    processor.add_data(130)
                    await asyncio.sleep(0.5)  # 设置定时任务的时间间隔为10秒
            """    
            #task2=datasource            
            for DataPro in self.DataPros:
                self.tasks.append(await DataPro.start()) 
                #await asyncio.gather(task1(), task2(processor))        
                #task2_obj = asyncio.create_task(task2(DataPro))
            
            while True:
                print("DataProcessors main runing")
                await asyncio.sleep(5) 
                if self.stop:
                    break
            #task1_obj.cancel()            
            #task2_obj.cancel()
            print("DataProcessors  stoped")
        
        def run():
            print("Starting main program...")
            asyncio.run(main())
        thread = threading.Thread(target=run)
        thread.daemon = True
        thread.start()
        print("DataProcessors started at:",thread)
        self.T=thread
        return thread
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

综合应用示例

结合以上三个模块,我们就完成了本标题的一个类ROS机器人消息订阅发布模块,以下是简易使用:

if __name__ == '__main__':
    # 示例用法
    broker = MessageBrokerAsy()
    # 启动消息分发系统
    broker.start()
    
    # 创建一个DataProcessor对象
    processor = DataProcessor(interval=1,broker=broker,topic="topic2")  # 设置时间周期为5秒
    DataProcessors1=DataProcessors()
    DataProcessors1.add(processor)
    T=DataProcessors1.start() 
    
    # 订阅者1的回调函数
    async def subscriber1(message):
        #print("Subscriber 1 received:", message,time.perf_counter())
        processor.add_data(message)
        
    # 订阅者1的回调函数
    async def subscriber3(message):
        print("Subscriber 3 received:", message,time.perf_counter())
        
        
    # 订阅者2的回调函数
    def subscriber2(message):
        print("Subscriber 2 received:", message)
    # 订阅者2的回调函数
    def subscriber4(message):
        print("Subscriber 4 received:", message)
 
    # 订阅主题为"topic1"的消息
    broker.subscribe("topic1", subscriber1)
    #broker.subscribe("topic1", subscriber3)
    
    broker.subscribe("topic1", subscriber2)
    broker.subscribe("topic2", subscriber4)

    # 取消订阅主题"topic1"的消息
    #broker.unsubscribe("topic1", subscriber2)
    
    for i in range(20):
        # 发布消息到主题"topic1"
        broker.publish("topic1", np.array([11,1.5]))
        broker.publish("topic1", np.array([1.1,1.5]))
        broker.publish("topic1", np.array([12,1.5]))
        broker.publish("topic1", np.array([1.1,1.5]))
        time.sleep(0.5)
    
    DataProcessors1.close()
    time.sleep(6)
    broker.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

如上所述,我们对一级原始数据通过broker进行发布"topic1"给预处理器processor,processor在processors的线程里周期处理原始数据,处理完后按照周期为1s定时进行发布"topic2","topic2"即为成品数据,给subscriber4使用。
在这里插入图片描述
如上图,红框为原始数据,红线为预处理后的成品数据(这里是平均值)。

五、总结

好了到此,我们构建了一个非常轻量的类ROS机器人消息订阅发布模块,当然跟ROS其实没有半毛钱关系,也没法比较,只是一个说头。源码已经上传至本站资源库点击连接。需要请下载获取或者关注公众号回复获取。
下一篇我们将这个系统用于无人机的控制,尽情期待…
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/504117
推荐阅读
相关标签
  

闽ICP备14008679号