Exploring Intel RealSense depth camera integration for advanced computer vision applications in AIOT systems. Includes point cloud processing, SLAM implementation, and real-time depth analysis.

Introduction to RealSense AIOT Integration

Intel RealSense depth cameras provide advanced 3D sensing capabilities that enhance AIOT applications with spatial understanding. This post explores the integration of RealSense cameras with various AIOT projects for improved computer vision and environmental perception.

RealSense Hardware Overview

RealSense D435i Specifications

  • Depth Technology: Active IR stereo
  • Depth Range: 0.1m to 10m
  • Depth Resolution: Up to 1280×720
  • RGB Resolution: 1920×1080
  • Frame Rate: Up to 90 FPS
  • IMU: 6-axis (accelerometer + gyroscope)
  • Field of View: 87° × 58° (depth), 69° × 42° (RGB)

RealSense Data Visualization and Analysis

Recorded Data Playback

RealSense Recorder RealSense Recorder interface for data collection and analysis

The RealSense SDK provides comprehensive tools for recording and analyzing depth data:

import pyrealsense2 as rs
import numpy as np
import cv2

class RealSenseRecorder:
    def __init__(self):
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        
        # Configure streams
        self.config.enable_stream(rs.stream.depth, 1280, 720, rs.format.z16, 30)
        self.config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
        self.config.enable_stream(rs.stream.accel)
        self.config.enable_stream(rs.stream.gyro)
        
        # Start pipeline
        self.pipeline.start(self.config)
        
        # Get device product line for setting a supporting resolution
        pipeline_wrapper = rs.pipeline_wrapper(self.pipeline)
        pipeline_profile = self.config.resolve(pipeline_wrapper)
        device = pipeline_profile.get_device()
        
        self.align = rs.align(rs.stream.color)
        
    def record_session(self, filename, duration_seconds=60):
        """Record RealSense data to bag file"""
        self.config.enable_record_to_file(filename)
        
        frames_recorded = 0
        target_frames = duration_seconds * 30  # 30 FPS
        
        try:
            while frames_recorded < target_frames:
                frames = self.pipeline.wait_for_frames()
                aligned_frames = self.align.process(frames)
                
                depth_frame = aligned_frames.get_depth_frame()
                color_frame = aligned_frames.get_color_frame()
                
                if not depth_frame or not color_frame:
                    continue
                
                frames_recorded += 1
                
                # Optional: Display progress
                if frames_recorded % 30 == 0:
                    print(f"Recorded {frames_recorded // 30} seconds")
                    
        finally:
            self.pipeline.stop()
            print(f"Recording complete: {filename}")

Bag File Viewer and Analysis

RealSense Bag Viewer RealSense Bag Viewer for analyzing recorded sensor data

class RealSenseBagAnalyzer:
    def __init__(self, bag_filename):
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        
        # Configure pipeline to read from bag file
        self.config.enable_device_from_file(bag_filename)
        
        # Disable real-time playback for analysis
        self.config.enable_all_streams()
        
        self.profile = self.pipeline.start(self.config)
        self.device = self.profile.get_device()
        self.playback = self.device.as_playback()
        
        # Disable real-time playback
        self.playback.set_real_time(False)
        
    def analyze_depth_data(self):
        """Analyze depth data from recorded bag file"""
        depth_statistics = {
            'min_depth': float('inf'),
            'max_depth': 0,
            'mean_depth': [],
            'frame_count': 0
        }
        
        try:
            while True:
                frames = self.pipeline.wait_for_frames()
                depth_frame = frames.get_depth_frame()
                
                if not depth_frame:
                    continue
                
                # Convert to numpy array
                depth_array = np.asanyarray(depth_frame.get_data())
                
                # Calculate statistics
                valid_depths = depth_array[depth_array > 0]
                if len(valid_depths) > 0:
                    frame_min = np.min(valid_depths)
                    frame_max = np.max(valid_depths)
                    frame_mean = np.mean(valid_depths)
                    
                    depth_statistics['min_depth'] = min(depth_statistics['min_depth'], frame_min)
                    depth_statistics['max_depth'] = max(depth_statistics['max_depth'], frame_max)
                    depth_statistics['mean_depth'].append(frame_mean)
                    depth_statistics['frame_count'] += 1
                
        except RuntimeError:
            # End of file reached
            pass
        finally:
            self.pipeline.stop()
            
        return depth_statistics

AIOT Integration Applications

1. Autonomous Navigation Enhancement

Integration with the airc-rl-agent project for improved spatial awareness:

class RealSenseAugmentedNavigation:
    def __init__(self):
        self.realsense = RealSenseCamera()
        self.navigation_ai = NavigationAI()
        self.obstacle_detector = DepthObstacleDetector()
        
    def enhanced_navigation_loop(self):
        """Navigation loop with RealSense depth enhancement"""
        while True:
            # Get RealSense data
            depth_frame, color_frame, imu_data = self.realsense.get_frames()
            
            # Process depth for obstacle detection
            obstacles = self.obstacle_detector.detect_3d_obstacles(depth_frame)
            
            # Combine with traditional vision
            color_features = self.navigation_ai.extract_features(color_frame)
            
            # Enhanced decision making with 3D spatial data
            navigation_command = self.navigation_ai.decide_action(
                color_features=color_features,
                depth_obstacles=obstacles,
                imu_orientation=imu_data
            )
            
            self.execute_navigation_command(navigation_command)

class DepthObstacleDetector:
    def __init__(self):
        self.min_obstacle_distance = 0.5  # meters
        self.max_obstacle_distance = 3.0  # meters
        
    def detect_3d_obstacles(self, depth_frame):
        """Detect obstacles using depth data"""
        depth_array = np.asanyarray(depth_frame.get_data()) * 0.001  # Convert to meters
        
        obstacles = []
        
        # Divide image into sectors for analysis
        height, width = depth_array.shape
        sector_height = height // 3
        sector_width = width // 5
        
        for row in range(3):
            for col in range(5):
                y_start = row * sector_height
                y_end = (row + 1) * sector_height
                x_start = col * sector_width
                x_end = (col + 1) * sector_width
                
                sector = depth_array[y_start:y_end, x_start:x_end]
                valid_depths = sector[(sector > 0) & (sector < self.max_obstacle_distance)]
                
                if len(valid_depths) > 0:
                    min_distance = np.min(valid_depths)
                    if min_distance < self.min_obstacle_distance:
                        obstacles.append({
                            'sector': (row, col),
                            'distance': min_distance,
                            'severity': 1.0 - (min_distance / self.min_obstacle_distance)
                        })
        
        return obstacles

2. Infrastructure Inspection with 3D Mapping

Enhanced AI-Assisted-Inspection with RealSense depth data:

class RealSenseInspectionSystem:
    def __init__(self):
        self.realsense = RealSenseCamera()
        self.point_cloud_processor = PointCloudProcessor()
        self.defect_analyzer = DefectAnalyzer3D()
        
    def inspect_structure_3d(self, inspection_points):
        """Perform 3D structural inspection"""
        inspection_results = []
        
        for point in inspection_points:
            # Position camera at inspection point
            self.position_camera(point)
            
            # Capture depth and color data
            depth_frame, color_frame = self.realsense.capture_aligned_frames()
            
            # Generate point cloud
            point_cloud = self.point_cloud_processor.generate_pointcloud(
                depth_frame, color_frame
            )
            
            # Analyze for structural defects
            defects = self.defect_analyzer.analyze_3d_structure(
                point_cloud, color_frame
            )
            
            inspection_results.append({
                'location': point,
                'point_cloud': point_cloud,
                'defects': defects,
                'timestamp': time.time()
            })
        
        return inspection_results

class PointCloudProcessor:
    def __init__(self):
        self.pc = rs.pointcloud()
        self.decimate = rs.decimation_filter()
        self.spatial = rs.spatial_filter()
        self.temporal = rs.temporal_filter()
        
    def generate_pointcloud(self, depth_frame, color_frame):
        """Generate filtered point cloud from depth data"""
        # Apply filters
        depth_frame = self.decimate.process(depth_frame)
        depth_frame = self.spatial.process(depth_frame)
        depth_frame = self.temporal.process(depth_frame)
        
        # Generate point cloud
        self.pc.map_to(color_frame)
        points = self.pc.calculate(depth_frame)
        
        # Convert to numpy array
        vertices = np.asanyarray(points.get_vertices()).view(np.float32).reshape(-1, 3)
        colors = np.asanyarray(points.get_texture_coordinates()).view(np.float32).reshape(-1, 2)
        
        return {
            'vertices': vertices,
            'colors': colors,
            'point_count': len(vertices)
        }

3. SLAM Integration for Mobile Robotics

class RealSenseSLAM:
    def __init__(self):
        self.realsense = RealSenseCamera()
        self.orb_slam = ORB_SLAM3()
        self.trajectory = []
        self.map_points = []
        
    def simultaneous_localization_mapping(self):
        """Perform SLAM using RealSense data"""
        while True:
            # Get synchronized frames
            depth_frame, color_frame, imu_data = self.realsense.get_synchronized_data()
            
            # Process with ORB-SLAM
            pose, map_update = self.orb_slam.process_frame(
                color_frame, depth_frame, imu_data
            )
            
            if pose is not None:
                self.trajectory.append({
                    'timestamp': time.time(),
                    'position': pose.translation,
                    'orientation': pose.rotation,
                    'confidence': pose.confidence
                })
            
            if map_update:
                self.map_points.extend(map_update.new_landmarks)
            
            # Publish for visualization
            self.publish_slam_data(pose, self.map_points)

class RealSenseVisualOdometry:
    def __init__(self):
        self.previous_frame = None
        self.trajectory = []
        self.feature_detector = cv2.ORB_create()
        self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        
    def track_motion(self, color_frame, depth_frame):
        """Track camera motion using visual odometry"""
        current_frame = np.asanyarray(color_frame.get_data())
        depth_array = np.asanyarray(depth_frame.get_data())
        
        if self.previous_frame is None:
            self.previous_frame = current_frame
            return None
        
        # Detect and match features
        kp1, des1 = self.feature_detector.detectAndCompute(self.previous_frame, None)
        kp2, des2 = self.feature_detector.detectAndCompute(current_frame, None)
        
        if des1 is not None and des2 is not None:
            matches = self.matcher.match(des1, des2)
            matches = sorted(matches, key=lambda x: x.distance)
            
            # Extract matched points
            pts1 = np.float32([kp1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
            pts2 = np.float32([kp2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)
            
            # Calculate essential matrix
            E, mask = cv2.findEssentialMat(pts1, pts2, 
                                         focal=525.0, pp=(320, 240))
            
            # Recover pose
            _, R, t, mask = cv2.recoverPose(E, pts1, pts2)
            
            self.trajectory.append({
                'rotation': R,
                'translation': t,
                'matches': len(matches)
            })
            
            self.previous_frame = current_frame
            return R, t
        
        return None

Real-time Processing and Performance

Performance Optimization

class OptimizedRealSenseProcessor:
    def __init__(self):
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        
        # Optimize for performance
        self.config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 60)
        self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 60)
        
        # Enable hardware acceleration
        self.align = rs.align(rs.stream.color)
        self.hole_filling = rs.hole_filling_filter()
        self.spatial_filter = rs.spatial_filter()
        
    def optimized_processing_loop(self):
        """Optimized processing for real-time applications"""
        try:
            self.pipeline.start(self.config)
            
            while True:
                start_time = time.time()
                
                # Get frames
                frames = self.pipeline.wait_for_frames()
                aligned_frames = self.align.process(frames)
                
                depth_frame = aligned_frames.get_depth_frame()
                color_frame = aligned_frames.get_color_frame()
                
                # Apply filters efficiently
                depth_frame = self.spatial_filter.process(depth_frame)
                depth_frame = self.hole_filling.process(depth_frame)
                
                # Process frames
                result = self.process_frames(depth_frame, color_frame)
                
                # Calculate FPS
                processing_time = time.time() - start_time
                fps = 1.0 / processing_time if processing_time > 0 else 0
                
                print(f"Processing FPS: {fps:.1f}")
                
        finally:
            self.pipeline.stop()

Performance Metrics

Processing Task Frame Rate Latency CPU Usage
Depth + Color Capture 60 FPS 16ms 15%
Point Cloud Generation 30 FPS 33ms 35%
SLAM Processing 20 FPS 50ms 60%
3D Obstacle Detection 45 FPS 22ms 25%

Integration with Digital Twin Systems

Enhanced battery_digital_twin with spatial monitoring:

class RealSenseBatteryMonitoring:
    def __init__(self):
        self.realsense = RealSenseCamera()
        self.thermal_analyzer = ThermalAnalyzer()
        self.digital_twin = BatteryDigitalTwin()
        
    def monitor_battery_physical_state(self):
        """Monitor battery physical condition using RealSense"""
        depth_frame, color_frame = self.realsense.capture_frames()
        
        # Analyze battery surface for swelling/deformation
        surface_analysis = self.analyze_battery_surface(depth_frame)
        
        # Thermal analysis (if thermal camera available)
        thermal_data = self.thermal_analyzer.analyze(color_frame)
        
        # Update digital twin with physical observations
        self.digital_twin.update_physical_state({
            'surface_condition': surface_analysis,
            'thermal_profile': thermal_data,
            'visual_anomalies': self.detect_visual_anomalies(color_frame)
        })
        
    def analyze_battery_surface(self, depth_frame):
        """Analyze battery surface for physical deformation"""
        depth_array = np.asanyarray(depth_frame.get_data()) * 0.001
        
        # Fit plane to battery surface
        points = np.column_stack(np.where(depth_array > 0))
        
        if len(points) > 100:
            # Calculate surface deviation from ideal plane
            plane_equation = self.fit_plane(points)
            deviations = self.calculate_deviations(points, plane_equation)
            
            return {
                'max_deviation': np.max(deviations),
                'mean_deviation': np.mean(deviations),
                'swelling_detected': np.max(deviations) > 0.002  # 2mm threshold
            }
        
        return None

Conclusion

Intel RealSense cameras provide powerful 3D sensing capabilities that significantly enhance AIOT applications. The integration examples demonstrate:

  1. Enhanced Navigation: 3D obstacle detection and spatial awareness
  2. Improved Inspection: Point cloud analysis for structural assessment
  3. Advanced SLAM: Robust localization and mapping capabilities
  4. Digital Twin Enhancement: Physical monitoring and verification

The RealSense platform’s combination of depth sensing, RGB imaging, and IMU data creates opportunities for sophisticated spatial intelligence in AIOT systems, enabling more robust and capable autonomous applications.

{# nothing on index to avoid visible raw text #}