赞
踩
主要是学习自带的example中的spawn_npc.py的源码.
发现前一天学的很多代码都可以在这里找到原型,觉得还是要先把example代码学习完,在读一下doc才能更深入的了解carla.
- #!/usr/bin/env python
-
- # Copyright (c) 2019 Computer Vision Center (CVC) at the Universitat Autonoma de
- # Barcelona (UAB).
- #
- # This work is licensed under the terms of the MIT license.
- # For a copy, see <https://opensource.org/licenses/MIT>.
-
- """Spawn NPCs into the simulation"""
-
- import glob
- """
- glob()函数是python的glob模块中的方法,是种文件通配符,glob()函数可以查找符合自己要求的文件,
- glob模块中的函数,有三个:glob.glob(pathname,*,recursive=False)
- """
- import os
- import sys
- import time
-
- try:
- sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
- sys.version_info.major,
- sys.version_info.minor,
- 'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
- except IndexError:
- pass
- """
- os.name该变量返回当前操作系统的类型,当前只注册了3个值:分别是posix , nt , java, 对应linux/windows/java虚拟机
- """
-
- import carla
-
- from carla import VehicleLightState as vls
-
- import argparse
- # argparse 是一种类似于python字典的数据结构
- # 我们可以用args.参数名来获取参数
- import logging
- from numpy import random
-
- def main():
- argparser = argparse.ArgumentParser(
- description=__doc__)
- argparser.add_argument(
- '--host',
- # 主机127.0.0.1
- metavar='H',
- default='127.0.0.1',
- help='IP of the host server (default: 127.0.0.1)')
- argparser.add_argument(
- '-p', '--port',
- # 端口2000
- metavar='P',
- default=2000,
- type=int,
- help='TCP port to listen to (default: 2000)')
- argparser.add_argument(
- '-n', '--number-of-vehicles',
- # 默认车辆个数
- metavar='N',
- default=10,
- type=int,
- help='number of vehicles (default: 10)')
- argparser.add_argument(
- '-w', '--number-of-walkers',
- # 默认行人数
- metavar='W',
- default=50,
- type=int,
- help='number of walkers (default: 50)')
- argparser.add_argument(
- '--safe',
- # 大概是安全系数?
- action='store_true',
- help='avoid spawning vehicles prone to accidents')
- argparser.add_argument(
- '--filterv',
- # 选择车辆的某种参数
- metavar='PATTERN',
- default='vehicle.*',
- help='vehicles filter (default: "vehicle.*")')
- argparser.add_argument(
- '--filterw',
- # 选择行人的某种参数
- metavar='PATTERN',
- default='walker.pedestrian.*',
- help='pedestrians filter (default: "walker.pedestrian.*")')
- argparser.add_argument(
- '--tm-port',
- # traffic_manager的参数
- metavar='P',
- default=8000,
- type=int,
- help='port to communicate with TM (default: 8000)')
- argparser.add_argument(
- '--sync',
- # 同步模式
- action='store_true',
- help='Synchronous mode execution')
- argparser.add_argument(
- '--hybrid',
- # 异步模式
- action='store_true',
- help='Enanble')
- argparser.add_argument(
- '-s', '--seed',
- # 生成设备的种子??
- metavar='S',
- type=int,
- help='Random device seed')
- argparser.add_argument(
- '--car-lights-on',
- # 车灯开启
- action='store_true',
- default=False,
- help='Enanble car lights')
- # 封装到了args里面
- args = argparser.parse_args()
-
- # 日志,先可以不用过多了解
- logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
-
- # 生成的车辆list
- vehicles_list = []
- # 生成的行人的list
- walkers_list = []
- # 所有的ID
- all_id = []
- # 获取客户端
- client = carla.Client(args.host, args.port)
- # 设定时序?
- client.set_timeout(10.0)
- # 现将同步模式关闭
- synchronous_master = False
- # 随机数种子,返回时间戳?
- random.seed(args.seed if args.seed is not None else int(time.time()))
-
- try:
- # 获取仿真世界
- world = client.get_world()
- # 设置tm相关的参数,要管理大量的车辆必须使用tm
- # 先设置tm的端口
- traffic_manager = client.get_trafficmanager(args.tm_port)
- # 设置tm的安全距离为1米
- traffic_manager.set_global_distance_to_leading_vehicle(1.0)
- # 将物理引擎打开
- if args.hybrid:
- traffic_manager.set_hybrid_physics_mode(True)
- # 生成随机数相关?
- if args.seed is not None:
- traffic_manager.set_random_device_seed(args.seed)
-
- # 把同步模式打开
- if args.sync:
- settings = world.get_settings()
- traffic_manager.set_synchronous_mode(True)
- if not settings.synchronous_mode:
- synchronous_master = True
- settings.synchronous_mode = True
- # 设置0.05秒一帧?
- settings.fixed_delta_seconds = 0.05
- # 将相关的设置应用到仿真世界中去
- world.apply_settings(settings)
- else:
- synchronous_master = False
-
- # 获取仿真世界里面的蓝图,主要用filter获取所有的车辆
- blueprints = world.get_blueprint_library().filter(args.filterv)
- # 获取仿真世界里面的蓝图,主要用filter获取所有的行人模型
- blueprintsWalkers = world.get_blueprint_library().filter(args.filterw)
-
- # 保证安全,避免产生的车辆容易发生故障?
- if args.safe:
- blueprints = [x for x in blueprints if int(x.get_attribute('number_of_wheels')) == 4]
- blueprints = [x for x in blueprints if not x.id.endswith('isetta')]
- blueprints = [x for x in blueprints if not x.id.endswith('carlacola')]
- blueprints = [x for x in blueprints if not x.id.endswith('cybertruck')]
- blueprints = [x for x in blueprints if not x.id.endswith('t2')]
-
- # 将车辆以id来排序
- blueprints = sorted(blueprints, key=lambda bp: bp.id)
-
- # 获取world中可以生成车辆的点
- spawn_points = world.get_map().get_spawn_points()
- # 生成点的个数
- number_of_spawn_points = len(spawn_points)
-
- # 如果车辆的个数少于点的个数,那么打乱点的顺序,否则就会报错,先忽略报错
- if args.number_of_vehicles < number_of_spawn_points:
- random.shuffle(spawn_points)
- elif args.number_of_vehicles > number_of_spawn_points:
- msg = 'requested %d vehicles, but could only find %d spawn points'
- logging.warning(msg, args.number_of_vehicles, number_of_spawn_points)
- args.number_of_vehicles = number_of_spawn_points
-
- # 这部分主要是carla中的命令,可能是为了方便?
- # @todo cannot import these directly.
- # 控制所有生成actor
- SpawnActor = carla.command.SpawnActor
- # 控制车辆的自动驾驶开关
- SetAutopilot = carla.command.SetAutopilot
- # 控制车的灯
- SetVehicleLightState = carla.command.SetVehicleLightState
- # 控制某种属性,默认值是0
- FutureActor = carla.command.FutureActor
-
- # --------------
- # Spawn vehicles
- # --------------
- batch = []
- # 给spawn_points加上序号0,1,2,3...
- for n, transform in enumerate(spawn_points):
- # 若n大于要求的数量就报错
- if n >= args.number_of_vehicles:
- break
- # 随机选择一辆车的蓝图,下面全是随机选择并设置车的相关属性
- blueprint = random.choice(blueprints)
- if blueprint.has_attribute('color'):
- color = random.choice(blueprint.get_attribute('color').recommended_values)
- blueprint.set_attribute('color', color)
- if blueprint.has_attribute('driver_id'):
- driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values)
- blueprint.set_attribute('driver_id', driver_id)
- blueprint.set_attribute('role_name', 'autopilot')
-
- # 设置车灯的一些属性
- # prepare the light state of the cars to spawn
- light_state = vls.NONE
- if args.car_lights_on:
- light_state = vls.Position | vls.LowBeam | vls.LowBeam
-
- # 将随机好的车辆生成并设置驾驶模式和车灯开启相关
- # spawn the cars and set their autopilot and light state all together
- batch.append(SpawnActor(blueprint, transform)
- .then(SetAutopilot(FutureActor, True, traffic_manager.get_port()))
- .then(SetVehicleLightState(FutureActor, light_state)))
-
- # 生成同步模式相关
- for response in client.apply_batch_sync(batch, synchronous_master):
- if response.error:
- logging.error(response.error)
- else:
- vehicles_list.append(response.actor_id)
-
- # -------------
- # Spawn Walkers
- # -------------
- # some settings
- percentagePedestriansRunning = 0.0 # how many pedestrians will run
- percentagePedestriansCrossing = 0.0 # how many pedestrians will walk through the road
- # 1. take all the random locations to spawn
- # 主要是将carla的transform的数据格式放入spawn_point中
- spawn_points = []
- for i in range(args.number_of_walkers):
- spawn_point = carla.Transform()
- # 只能针对walks使用,在sidewalk上使用
- loc = world.get_random_location_from_navigation()
- if (loc != None):
- spawn_point.location = loc
- spawn_points.append(spawn_point)
- # 2. we spawn the walker object
- # 设置行人的速度
- batch = []
- walker_speed = []
- for spawn_point in spawn_points:
- walker_bp = random.choice(blueprintsWalkers)
- # set as not invincible
- # 设置无敌?
- if walker_bp.has_attribute('is_invincible'):
- walker_bp.set_attribute('is_invincible', 'false')
- # set the max speed
- if walker_bp.has_attribute('speed'):
- if (random.random() > percentagePedestriansRunning):
- # walking
- walker_speed.append(walker_bp.get_attribute('speed').recommended_values[1])
- else:
- # running
- walker_speed.append(walker_bp.get_attribute('speed').recommended_values[2])
- else:
- print("Walker has no speed")
- walker_speed.append(0.0)
- batch.append(SpawnActor(walker_bp, spawn_point))
- results = client.apply_batch_sync(batch, True)
- walker_speed2 = []
- for i in range(len(results)):
- if results[i].error:
- logging.error(results[i].error)
- else:
- walkers_list.append({"id": results[i].actor_id})
- walker_speed2.append(walker_speed[i])
- walker_speed = walker_speed2
- # 3. we spawn the walker controller
- # 设置一个行人的控制器
- batch = []
- # 在蓝图库中找到行人ai控制器
- walker_controller_bp = world.get_blueprint_library().find('controller.ai.walker')
- for i in range(len(walkers_list)):
- batch.append(SpawnActor(walker_controller_bp, carla.Transform(), walkers_list[i]["id"]))
- results = client.apply_batch_sync(batch, True)
- for i in range(len(results)):
- if results[i].error:
- logging.error(results[i].error)
- else:
- walkers_list[i]["con"] = results[i].actor_id
- # 4. we put altogether the walkers and controllers id to get the objects from their id
- for i in range(len(walkers_list)):
- all_id.append(walkers_list[i]["con"])
- all_id.append(walkers_list[i]["id"])
- all_actors = world.get_actors(all_id)
-
- # wait for a tick to ensure client receives the last transform of the walkers we have just created
- if not args.sync or not synchronous_master:
- world.wait_for_tick()
- else:
- world.tick()
-
- # 5. initialize each controller and set target to walk to (list is [controler, actor, controller, actor ...])
- # set how many pedestrians can cross the road
- # 设置行人的控制相关
- world.set_pedestrians_cross_factor(percentagePedestriansCrossing)
- for i in range(0, len(all_id), 2):
- # start walker
- all_actors[i].start()
- # set walk to random point
- all_actors[i].go_to_location(world.get_random_location_from_navigation())
- # max speed
- all_actors[i].set_max_speed(float(walker_speed[int(i/2)]))
-
- print('spawned %d vehicles and %d walkers, press Ctrl+C to exit.' % (len(vehicles_list), len(walkers_list)))
-
- # example of how to use parameters
- traffic_manager.global_percentage_speed_difference(30.0)
-
- while True:
- if args.sync and synchronous_master:
- world.tick()
- else:
- world.wait_for_tick()
-
- finally:
- # 主要是恢复初始化
-
- if args.sync and synchronous_master:
- settings = world.get_settings()
- settings.synchronous_mode = False
- settings.fixed_delta_seconds = None
- world.apply_settings(settings)
-
- print('\ndestroying %d vehicles' % len(vehicles_list))
- client.apply_batch([carla.command.DestroyActor(x) for x in vehicles_list])
-
- # stop walker controllers (list is [controller, actor, controller, actor ...])
- for i in range(0, len(all_id), 2):
- all_actors[i].stop()
-
- print('\ndestroying %d walkers' % len(walkers_list))
- client.apply_batch([carla.command.DestroyActor(x) for x in all_id])
-
- time.sleep(0.5)
-
- if __name__ == '__main__':
-
- try:
- main()
- except KeyboardInterrupt:
- pass
- finally:
- print('\ndone.')
在学习的过程中,想到了昨天调用cv2的包,突发奇想,想把spectator的视角用imshow独立的窗口展示出来,但自己上手后没有成功,可能是imshow的格式属性不对,也可能是别的问题.需要在后续的学习中可以实现.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。