Hi Community,
I’m working on a project using the OS0-32 with ROS2 Humble for 2D object detection. As an intermediate step, I’m trying to extract a .pcap file from a rosbag recorded with ouster-ros to use it in my script.
I haven’t found any direct tools for this, so I’m currently using a custom script that extracts the UDP packets from the rosbag and generates a .pcap file. When I replay this .pcap using the ouster-ros commands, the playback works correctly and the images display as expected. However, when I try to access it directly using open_scan(...), it doesn’t work.
My question is: is there a recommended way to generate a .pcap file from a rosbag that can be used like the ones from Ouster Studio (e.g., Urban_Drive recordings), or is this approach not feasible?
This is my script for the extraction :
import argparse
import rosbag2_py
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
from ouster_sensor_msgs.msg import PacketMsg
from scapy.all import Ether, IP, UDP, Raw, PcapWriter
# ✅ Destination ports MUST match metadata
PORT_MAP = {
"/ouster/lidar_packets": 46121, # udp_port_lidar
"/ouster/imu_packets": 43466, # udp_port_imu
}
def bag_to_pcap_ros2(bag_path: str, pcap_path: str):
# Configure rosbag2 reader
storage_options = rosbag2_py.StorageOptions(
uri=bag_path,
storage_id="sqlite3"
)
converter_options = rosbag2_py.ConverterOptions(
input_serialization_format="cdr",
output_serialization_format="cdr"
)
reader = rosbag2_py.SequentialReader()
reader.open(storage_options, converter_options)
# Discover topics and types in the bag
topic_types = reader.get_all_topics_and_types()
type_map = {t.name: t.type for t in topic_types}
# ✅ Sanity check
for topic in PORT_MAP:
if topic not in type_map:
raise RuntimeError(
f"Required topic {topic} not found in bag.\n"
f"Available topics: {list(type_map.keys())}"
)
# Open PCAP writer
pcap_writer = PcapWriter(pcap_path, append=False, sync=True)
# ✅ Fake but valid network parameters
SRC_MAC = "02:00:00:00:00:01"
DST_MAC = "02:00:00:00:00:02"
SRC_IP = "192.168.131.1"
DST_IP = "192.168.131.50" # from metadata
SRC_PORT = 7502 # arbitrary / ignored by SDK
pkt_count = 0
while reader.has_next():
topic, data, t = reader.read_next()
if topic not in PORT_MAP:
continue
msg_type = get_message(type_map[topic])
msg = deserialize_message(data, msg_type)
# ✅ Raw UDP payload
payload = bytes(msg.buf)
scapy_pkt = (
Ether(src=SRC_MAC, dst=DST_MAC) /
IP(src=SRC_IP, dst=DST_IP) /
UDP(sport=SRC_PORT, dport=PORT_MAP[topic]) /
Raw(load=payload)
)
# ✅ Preserve ROS timestamp
scapy_pkt.time = t / 1e9
pcap_writer.write(scapy_pkt)
pkt_count += 1
if pkt_count % 10000 == 0:
print(f"Wrote {pkt_count} packets...")
pcap_writer.close()
print(f"✅ Done. Total packets written: {pkt_count}")
def main():
parser = argparse.ArgumentParser(
description="Convert ROS 2 Ouster packet bag to PCAP"
)
parser.add_argument(
"bag_path",
help="Path to ROS 2 bag directory"
)
parser.add_argument(
"pcap_path",
help="Output PCAP file path"
)
args = parser.parse_args()
bag_to_pcap_ros2(args.bag_path, args.pcap_path)
if __name__ == "__main__":
main()
This my commands for the rosbag and the .pcap replay :
ros2 launch ouster_ros record.launch.xml \
sensor_hostname:=<sensor host name> \
bag_file:=<optional bag file name> \
metadata:=<json file name>
ros2 launch ouster_ros replay_pcap.launch.xml \
pcap_file:=<path to ouster pcap file> \
metadata:=<json file name> # required