SDK 0.16 ‘sensor_replay’ [BETA] Feature

Feature Overview

The feature allows users to replay existing captures as if they were coming from an actual sensor.

Usage:

ouster-cli source <source_url> sensor_replay [OPTIONS]

This command will start:

  • an HTTP server to respond to sensor REST APIs calls

  • a UDP stream of sensor packets representing Lidar and IMU packets

  • an mDNS service for virtual sensor discovery

Different apps can then connect to the virtual sensor as if it were a physical one.
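As a rough illustration of what connecting to the virtual sensor looks like from a client, the sketch below builds a REST URL and fetches JSON from it. The helper names `sensor_url` and `fetch_json` are hypothetical, port 8080 is just the value used in the non-docker example in this post, and the `/api/v1/sensor/metadata` path is assumed from the physical sensor's HTTP API; whether sensor_replay answers it identically is not confirmed here.

```python
import json
import urllib.request


def sensor_url(host: str, port: int, path: str) -> str:
    """Build a REST URL for the (virtual) sensor's HTTP server."""
    return f"http://{host}:{port}/{path.lstrip('/')}"


def fetch_json(url: str, timeout: float = 2.0):
    """GET a URL and decode its JSON body; raises on network or HTTP errors."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Example (requires a running sensor_replay server; path is an assumption):
#   metadata = fetch_json(sensor_url("127.0.0.1", 8080, "/api/v1/sensor/metadata"))
#   print(sorted(metadata))
```

The network call is left in a comment so the helpers can be imported and checked without a running server.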

Usage Patterns

When you launch ouster-cli source <source_url> sensor_replay for the first time with no additional options, it prints instructions on how it can be used. There are two main modes of operation:

  • To use without docker:

    • Server:

      • ouster-cli source <source_url> sensor_replay --http-port 8080
        
    • Client:

      • http_proxy=http://0.0.0.0:8080 ouster-cli source 127.0.0.1 viz
        
  • To use with docker:

    • Precondition: must have followed the docker postinstall guide: https://docs.docker.com/engine/install/linux-postinstall/

    • Server:

      • ouster-cli source <source_url> sensor_replay --dockerize  # or just -d
        
      • First-time users will be prompted to build the docker image (about 1 min)

    • Client:

      • ouster-cli source 127.0.0.1 viz OR ./OusterStudio-2.3.1.AppImage
        
  • Other alternative approaches:

    • Reverse proxy (NGINX)

    • Elevated Permissions for ouster-cli

Command OPTIONS

The following is a full listing of the command's available options:

Usage: ouster-cli source SOURCE sensor_replay [OPTIONS]

  [BETA] Replay PCAP|BAG|OSF|MCAP as a Sensor

Options:
  -d, --dockerize                 Run the sensor replay inside a Docker
                                  container
  -p, --http-port INTEGER         Change the port used by the sensor HTTP
                                  server  [default: 80]
  --lidar-port INTEGER            lidar port  [default: -1]
  --imu-port INTEGER              imu port  [default: -1]
  --loop BOOLEAN                  When playback reaches the end of the file,
                                  restart from the beginning  [default: True]
  --rate FLOAT                    udp packets replay rate.  [default: 1.0]
  --udp-dest TEXT                 The destination IP to send the replayed
                                  packets to.  [default: 127.0.0.1]
  --operating-mode [NORMAL|STANDBY]
                                  Set sensor Operating Mode on startup.
                                  [default: NORMAL]
  --sensor-sn TEXT                Override Sensor SN.  [default: ""]
  --hide-diagnostics              Hide diagnostic information.
  --help                          Show this message and exit.
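
When the replay server has to be started from a test harness, it can help to assemble the command line programmatically. The sketch below is one possible way to do that; `build_replay_command` is a hypothetical helper, and it uses only flags that appear in the listing above. It builds the argument list but does not launch anything.

```python
import shlex


def build_replay_command(source, http_port=80, loop=True, rate=1.0,
                         udp_dest="127.0.0.1", operating_mode="NORMAL",
                         dockerize=False):
    """Return the argv list for an `ouster-cli ... sensor_replay` invocation."""
    if operating_mode not in ("NORMAL", "STANDBY"):
        raise ValueError(f"unsupported operating mode: {operating_mode}")
    cmd = [
        "ouster-cli", "source", source, "sensor_replay",
        "--http-port", str(http_port),
        "--loop", str(loop),            # BOOLEAN option, passed as True/False
        "--rate", str(rate),
        "--udp-dest", udp_dest,
        "--operating-mode", operating_mode,
    ]
    if dockerize:
        cmd.append("--dockerize")
    return cmd


# "capture.pcap" is a placeholder file name.
example = build_replay_command("capture.pcap", http_port=8080, loop=False)
print(shlex.join(example))
```

The resulting list can be handed to `subprocess.Popen` as-is, which avoids shell-quoting problems with file paths.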

Use Cases

  • Nothing replaces interacting with a real sensor (users will still need a physical sensor to validate their application)

  • However, this feature can be used to kick-start a development effort

  • Replicate multiple sensors: create multiple virtual sensors to stress test the system

  • Speed up testing: for example, switch between NORMAL and STANDBY modes faster when testing the application lifecycle (this takes ~30 seconds on a real sensor)

  • Convert to a data format that is not directly supported by ouster-cli
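
For the NORMAL/STANDBY lifecycle use case, a test could toggle the mode over HTTP. The sketch below only constructs the request. It assumes the virtual sensor accepts the same `POST /api/v1/sensor/config` call with an `operating_mode` field that physical sensors expose, which is not confirmed for sensor_replay; `operating_mode_request` is a hypothetical helper name and port 8080 is the value from the examples above.

```python
import json
import urllib.request


def operating_mode_request(host: str, port: int, mode: str) -> urllib.request.Request:
    """Build (but do not send) a request that sets the sensor operating mode."""
    if mode not in ("NORMAL", "STANDBY"):
        raise ValueError(f"unsupported operating mode: {mode}")
    body = json.dumps({"operating_mode": mode}).encode("utf-8")
    return urllib.request.Request(
        f"http://{host}:{port}/api/v1/sensor/config",  # assumed endpoint
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# To actually send it against a running replay server:
#   urllib.request.urlopen(operating_mode_request("127.0.0.1", 8080, "STANDBY"))
```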

Known Limitations

  • Some properties are mutable while others are immutable. For example, it is possible to mock the sensor ports or operating mode, but it is not possible to properly mutate the LidarMode property

  • The feature doesn't support multi-sensor datasets yet. If you supply a multi-sensor dataset, only one sensor will respond to HTTP calls and the published packets will be mixed up

  • It is not yet possible to start a sensor_replay service under macOS due to NECP restrictions


Hello,

I modified the original source_replay.py script so that the virtual lidar can generate alerts on demand.

Replace the original script with the following:

How to generate Alerts on demand

  1. Run the Virtual Lidar as explained previously
  2. In a client terminal, inject an alert into the virtual lidar (see the Table of All Alerts and Errors for the alerts the lidar can generate).

curl -X POST http://127.0.0.1:8080/api/v1/dev/inject_alert \
  -H "Content-Type: application/json" \
  -d '{
    "id": "Alert_ID",
    "level": "Alert_Level",
    "category": "Alert_Category",
    "msg": "Alert_Message",
    "msg_verbose": "Alert_DESCRIPTION"
  }'

  3. Get the alert from the virtual lidar:

curl -i -X GET http://127.0.0.1:8080/api/v1/sensor/alerts

  4. Clear the lidar alert:

curl -X POST http://127.0.0.1:8080/api/v1/dev/clear_alerts
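
For scripted tests, the same three calls can be prepared from Python. This is a minimal sketch that mirrors the curl commands above; the endpoint paths and JSON field names are taken from those commands, while the helper names are hypothetical and the placeholder values (`Alert_ID` and so on) still need to be replaced with real entries from the alerts table. The requests are built but not sent.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8080"  # replay server started with --http-port 8080


def inject_alert_request(alert_id, level, category, msg, msg_verbose, base=BASE_URL):
    """Build the POST request that injects one alert into the virtual lidar."""
    payload = {
        "id": alert_id,
        "level": level,
        "category": category,
        "msg": msg,
        "msg_verbose": msg_verbose,
    }
    return urllib.request.Request(
        f"{base}/api/v1/dev/inject_alert",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def get_alerts_request(base=BASE_URL):
    """Build the GET request that reads the current alerts."""
    return urllib.request.Request(f"{base}/api/v1/sensor/alerts", method="GET")


def clear_alerts_request(base=BASE_URL):
    """Build the POST request that clears all injected alerts."""
    return urllib.request.Request(f"{base}/api/v1/dev/clear_alerts", method="POST")


# Sending a request against a running modified replay server:
#   with urllib.request.urlopen(get_alerts_request()) as resp:
#       print(resp.read().decode("utf-8"))
```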

The virtual lidar now generates alerts on demand. To do so, it intercepts the UDP stream and updates the Alert Flags accordingly (see the Packet Header description for more information about the Alert Flags).

Real-Time Packet Header Flags Sniffing

Here is a script to monitor the Alert Flags in the Packet Header:

import socket
import argparse
import sys

def main():
    # Setup command line arguments
    parser = argparse.ArgumentParser(description="Ouster Physical Sensor UDP Alert Sniffer")
    parser.add_argument("--port", type=int, default=7502, 
                        help="UDP port to listen on (default: 7502)")
    parser.add_argument("--sensor", type=str, default=None, 
                        help="Sensor IP or hostname (e.g., os-122518000325.local) to filter traffic.")
    args = parser.parse_args()

    # Resolve hostname to IP if a sensor is specified
    filter_ip = None
    if args.sensor:
        try:
            filter_ip = socket.gethostbyname(args.sensor)
            print(f"🔍 Resolved {args.sensor} to IP: {filter_ip}")
        except socket.gaierror:
            print(f"❌ Error: Could not resolve {args.sensor}. Ensure the sensor is powered on and connected.")
            sys.exit(1)

    # 0.0.0.0 means "Listen on all network interfaces of this computer"
    UDP_IP = "0.0.0.0"
    UDP_PORT = args.port

    # Creating the UDP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    try:
        sock.bind((UDP_IP, UDP_PORT))
    except OSError as e:
        print(f"❌ Error binding to port {UDP_PORT}: {e}")
        print("Make sure no other program (like Ouster Studio or ROS) is currently using this port.")
        sys.exit(1)

    print(f"📡 Listening for Ouster UDP packets on port {UDP_PORT}...")
    if filter_ip:
        print(f"🎯 Filtering traffic exclusively for sensor: {filter_ip}")
    print("Waiting for an alert state change... (Press Ctrl+C to stop)\n")

    last_flags = None

    try:
        while True:
            # Wait and read the next UDP packet
            data, addr = sock.recvfrom(65535)
            sender_ip = addr[0]

            # If a specific sensor was requested, ignore packets from other devices
            if filter_ip and sender_ip != filter_ip:
                continue
            
            # An Ouster packet header is at least 32 bytes long
            if len(data) >= 32:
                # Capture our target 16th byte (Index 15)
                current_flags = data[15]
                
                # Print only if the value has just changed
                if current_flags != last_flags:
                    
                    # Extract bits using bitwise masks
                    alerts_active = (current_flags >> 7) & 1
                    cursor_overflow = (current_flags >> 6) & 1
                    alert_cursor = current_flags & 0x3F  # 0x3F is 00111111
                    
                    # Pretty formatting in 8-bit binary
                    bin_str = format(current_flags, '08b')
                    
                    print(f"⚠️ NEW STATE DETECTED IN UDP HEADER (from {sender_ip}):")
                    print(f"   Raw value (Byte 15) : {bin_str}")
                    print(f"   ├─ [Bit 7] Alerts Active : {'🔴 1 (YES)' if alerts_active else '🟢 0 (NO)'}")
                    print(f"   ├─ [Bit 6] Overflow      : {'🟡 1 (YES)' if cursor_overflow else '🟢 0 (NO)'}")
                    print(f"   └─ [Bit 0-5] Cursor      : {alert_cursor}")
                    print("-" * 50)
                    
                    last_flags = current_flags

    except KeyboardInterrupt:
        print("\nStopping network analyzer.")

if __name__ == "__main__":
    main()

How to use it

  1. Run the Virtual sensor as described above
  2. Run the Alert Flags sniffer
  • Use with the virtual sensor
    python3 Ouster_Alert_Bits_Sniffer.py --sensor 127.0.0.1

  • Use with a physical sensor
    python3 Ouster_Alert_Bits_Sniffer.py --sensor os-122427000408.local

  • With the sensor IP
    python3 Ouster_Alert_Bits_Sniffer.py --sensor 169.254.244.106

  • With a specific port (default: 7502)
    python3 Ouster_Alert_Bits_Sniffer.py --sensor 169.254.244.106 --port 9001
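
To check the bit decoding without any sensor attached, the masks used by the sniffer can be pulled into a small pure function. This sketch simply mirrors the shifts and masks in the script above (bit 7, bit 6, bits 0-5); the `AlertFlags` and `decode_alert_flags` names are hypothetical, and the layout is taken from that script rather than verified against the firmware documentation.

```python
from typing import NamedTuple


class AlertFlags(NamedTuple):
    alerts_active: bool    # bit 7
    cursor_overflow: bool  # bit 6
    alert_cursor: int      # bits 0-5


def decode_alert_flags(flags: int) -> AlertFlags:
    """Split one alert-flags byte into its three fields."""
    if not 0 <= flags <= 0xFF:
        raise ValueError("flags must fit in a single byte")
    return AlertFlags(
        alerts_active=bool((flags >> 7) & 1),
        cursor_overflow=bool((flags >> 6) & 1),
        alert_cursor=flags & 0x3F,
    )


# 0b10000011: alerts active, no overflow, cursor at 3
print(decode_alert_flags(0b10000011))
```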
