저는 자동차 산업에 종사하는 소프트웨어 엔지니어로서 자율주행차, 데이터 측정 기술, 분석 방법에 관심이 많습니다. 이 게시물에서는 맞춤형 측정 시스템에 대해 설명하고 프로세스를 처음부터 자세히 설명하고 몇 가지 실험 결과를 제시하겠습니다. 내 데이터 로거는 Raspberry Pi 3, BNO055 센서, ELM327 OBD-II 어댑터로 구성되어 있으며 각각 컴퓨팅, 가속도 및 회전 속도 데이터 수집, 실제 차량에서 엔진 정보 검색을 담당합니다.
1.배경
2.건축
3.센서정보
4.좌표계정보
5.데이터 비교
6.핵심 로직
7.결론
8.참고자료
최근 IoT, 인공지능, 엣지 컴퓨팅의 등장으로 자동차 부문을 비롯한 다양한 산업이 크게 발전했습니다. 현대 자동차에는 정교한 시스템을 만들기 위해 함께 작동하는 다양한 센서가 장착되어 있어 향상된 안전 기능부터 자율 주행 기능에 이르기까지 모든 것이 가능합니다.
일반적으로 차량 상태를 모니터링하려면 정확하고 안정적인 측정을 달성하기 위해 값비싼 첨단 센서를 사용해야 합니다. 그러나 차량 역학에 대한 일반적인 이해를 위해 회로에 대한 데이터를 얻는 것과 같은 특정 상황에서는 이러한 고급 센서가 반드시 필요하지 않을 수도 있습니다. 이를 인식하여 보다 저렴한 부품을 사용하여 차량의 물리적 상태를 측정할 수 있는 비용 효율적인 시스템을 만들기로 결정했습니다.
승차감보다는 차량 동역학과 관련된 데이터 측정에 집중하기로 했습니다. 승차감을 분석하려면 적절한 정확도로 더 많은 고주파 신호를 캡처해야 하며, 이는 측정 중에 높은 샘플링 주파수를 설정해야 하기 때문입니다.
구체적으로 조향각에 따른 횡가속도의 주파수 응답 특성에서는 차량 속도에 따라 다르지만 일반적으로 공진주파수가 5Hz 이하로 나타난다. 한편, 승차감 분석에서는 수십Hz 범위까지 논의가 확장되는 경우가 많습니다.
따라서 측정 시스템 개발의 주요 목표는 차량 동역학 영역의 데이터 분석을 용이하게 하는 저렴한 데이터 로깅 시스템을 만드는 것입니다.
측정 시스템은 다음 하드웨어로 구성됩니다.
라즈베리 파이는 BNO055와 ELM327에서 데이터를 수집하는 메인 컴퓨터 역할을 합니다. Raspberry Pi는 I2C를 통해 BNO055와 통신하고 Bluetooth를 통해 ELM327과 통신합니다(그림 1 참조).
BNO055 센서는 다양한 유형의 동작과 데이터를 측정할 수 있습니다. ★Adafruit BNO055 개요[1]
절대 방향(Euler Vector, 100Hz): 360° 구를 기준으로 3축 방향 데이터를 제공합니다.
절대 방향(쿼터니언, 100Hz): 보다 정확한 데이터 조작을 위해 4포인트 쿼터니언 출력을 제공합니다.
각속도 벡터(100Hz): 세 축에 걸쳐 초당 라디안(rad/s) 단위로 회전 속도를 측정합니다.
가속도 벡터(100Hz): 세 축에 걸쳐 중력 및 선형 운동을 포함한 가속도를 초당 제곱미터(m/s²) 단위로 캡처합니다.
자기장 강도 벡터(20Hz): 세 축에 걸친 자기장 강도를 마이크로테슬라(μT) 단위로 감지합니다.
선형 가속도 벡터(100Hz): 세 축에 걸쳐 선형 가속도 데이터(중력 제외)를 초당 제곱미터(m/s²) 단위로 기록합니다.
중력 벡터(100Hz): 3개 축에 걸쳐 중력 가속도(모든 움직임 제외)를 초당 제곱미터(m/s²) 단위로 측정합니다.
온도(1Hz): 주변 온도(섭씨)를 제공합니다.
BNO055는 컴팩트하며 Adafruit_CircuitPython이라는 Adafruit의 Python 라이브러리에서 지원됩니다. 또한 Adafruit에서 C 언어 라이브러리를 사용할 수 있습니다.
★Adafruit_CircuitPython_BNO055[2]
★Adafruit_BNO055[3]
시연을 보려면 여기에서 비디오를 시청하세요.
Adafruit BNO055 데모
★Adafruit BNO055 데모[4]
ELM327은 표준 인터페이스를 통해 차량 엔진 데이터에 액세스할 수 있게 해주는 널리 사용되는 OBD-II(온보드 진단) 어댑터입니다. 차량의 ECU(전자 제어 장치)와 컴퓨터, 스마트폰 등 외부 장치 사이의 브리지 역할을 하여 진단 및 성능 데이터를 검색할 수 있습니다. ELM327의 주요 기능은 다음과 같습니다. ★ELM327 정보[5]
OBD-II 프로토콜 지원: ELM327은 ISO 9141, ISO 14230(KWP2000), ISO 15765(CAN) 등을 포함한 다양한 OBD-II 프로토콜을 지원하므로 광범위한 차량과 호환됩니다.
진단 문제 코드(DTC): 차량 ECU에서 진단 문제 코드를 읽고 지울 수 있어 문제 식별 및 문제 해결에 도움이 됩니다.
라이브 데이터 스트리밍: 엔진 RPM, 차량 속도, 냉각수 온도, 연료량 등 차량 센서의 실시간 데이터를 제공합니다.
프레임 데이터 고정: 오류가 감지되는 순간의 데이터를 캡처하고 저장하여 간헐적인 문제 진단에 도움이 됩니다.
차량 정보: VIN(차량 식별 번호), 보정 ID 등을 포함하여 차량에 대한 세부 정보를 검색합니다.
호환성: 블루투스, USB, Wi-Fi 버전 등 다양한 형태로 제공되어 다양한 기기 및 플랫폼과 호환 가능합니다.
일반적으로 ELM327은 차량 상태, 특히 엔진 상태를 진단하는 데 사용됩니다. 그러나 차량에서 광범위한 데이터를 수집하는 데에도 사용할 수 있습니다. ELM327은 Amazon과 같은 플랫폼에서 구매할 수 있으며 가격은 약 $20~$80입니다.
python-OBD 라이브러리를 사용하면 ELM327 어댑터를 통해 데이터를 수집하는 쿼리를 생성할 수 있으므로 사용자 정의 가능하고 상세한 데이터 수집이 가능합니다.
이 시스템은 여러 센서 및 실제 차량과 상호 작용하므로 각 좌표계를 이해해야 합니다.
아래 이미지는 내 데이터 측정 시스템의 BNO055 좌표계를 보여줍니다. 내 시스템은 쿼터니언을 사용하여 회전 각도를 추가로 계산합니다. 센서 좌표계는 차량 좌표계와 일치해야 합니다.
측정 시스템은 차량 좌표계를 기준으로 작동합니다.
검증 작업을 위해 가속도, 자이로 등 여러 물리적 데이터 포인트를 측정할 수 있는 iPhone 애플리케이션을 사용했습니다. iPhone 애플리케이션을 사용하는 목적은 이 애플리케이션의 데이터로 내 데이터 측정 시스템의 유효성을 확인하는 것입니다. 그림 4는 Apple 개발자 사이트에서 인용한 것입니다.
이 섹션에서는 몇 가지 실험 결과를 제시합니다. 첫 번째 단계로 측정 시스템이 단위 실수 등의 오류 없이 판독값을 출력하는지 확인하기 위해 사인파 작업을 수행했습니다. 이 부분은 기본적이지만 올바른 데이터 수집 방법을 보장하는 데 중요합니다.
다음으로 실제 차량을 주행하면서 몇 가지 물리적인 값을 측정해봤습니다.
이어서 ELM327의 성능과 각 센서의 샘플링 정확도를 확인해보았습니다.
사인파 테스트를 진행했고, 이번 섹션에서는 그 결과를 소개합니다. 이 테스트의 목적은 다음을 확인하는 것입니다.
이를 위해 저는 다양한 신체 수치를 측정할 수 있는 스마트폰 앱인 "phyphox" ★phyphox[8]를 사용했습니다. 아래 [그림 5]와 같이 BNO055 센서를 아이폰에 장착했습니다.
그림 6은 샘플링 주파수 10Hz의 사인파 테스트 결과를 보여줍니다. 범례 "VDDM"은 제가 개발한 측정 시스템으로 수집한 데이터를 나타내고, "iPhone"은 "phyphox"를 사용하여 수집한 데이터를 나타냅니다.
VDDM과 iPhone의 데이터는 모든 축에서 유사한 추세를 보여 두 애플리케이션이 유사한 모션 패턴을 캡처하고 있음을 나타냅니다.
VDDM 데이터는 특히 가속도 그래프에서 약간 더 가변적인 것으로 보이며, 이는 iPhone에 비해 감도가 더 높거나 다른 필터링 접근 방식을 제안할 수 있습니다.
두 데이터는 일반적으로 잘 일치하지만 진폭의 차이로 인해 정확도 요구 사항에 따라 추가 보정이나 조정이 필요할 수 있습니다.
VDDM은 특히 iPhone 데이터에는 없는 가속도 그래프에서 추가 노이즈나 고주파 성분을 캡처할 수 있습니다. 이는 자세한 분석에 도움이 될 수도 있고 추가 필터링이 필요할 수도 있습니다.
그림 7은 실제 차량에서 BNO055의 데이터를 중심으로 실험한 결과입니다. 참고자료로는 'phyphox'라는 아이폰용 애플리케이션을 사용했습니다. 샘플링 주파수는 50Hz였습니다. 차량에서 개발한 데이터 로깅 시스템으로 측정한 가속도 및 회전 신호에 노이즈, 이상값, NaN 값이 포함되어 있음을 관찰했습니다. 그래서 IQR 방법과 버터워스 저역 통과 필터를 포함한 후처리 기능을 적용했습니다. 각 그래프의 범례에 있는 "fc"라는 용어는 차단 주파수를 나타냅니다. 예를 들어 fc_19.5는 차단 주파수가 19.5Hz임을 나타냅니다. 참고로 본 실험은 BNO055 센서만을 이용하여 진행되었으며, ELM327 기능은 비활성화되어 있습니다.
VDDM(원시 데이터)과 iPhone의 데이터는 모든 축에서 유사한 추세를 보여줍니다. 이는 두 시스템이 동일한 모션 패턴을 캡처하고 있음을 의미합니다.
VDDM의 데이터는 iPhone 데이터에 비해 변동폭이 더 큰 것으로 보입니다. 이는 가속도 그래프에서 특히 두드러지며 VDDM이 더 높은 감도로 데이터를 캡처하거나 다른 필터링 접근 방식을 사용함을 나타낼 수 있습니다.
VDDM(fc 19.5Hz 및 10Hz)에서 필터링된 데이터는 원시 데이터에 비해 변형이 감소하고 iPhone 데이터에 더 가깝습니다. 이는 VDDM(원시 데이터)에 더 많은 노이즈나 고주파수 구성요소가 포함될 수 있음을 나타냅니다.
VDDM의 원시 데이터에는 이상값과 누락된 값이 포함되어 있었는데, 이는 필터를 적용하여 완화되었습니다. 이는 특히 롤 레이트와 피치 레이트 그래프에서 확연히 드러납니다.
VDDM은 아이폰에 비해 고주파 성분과 노이즈가 더 많이 포함된 것으로 보입니다. 이는 상세한 분석에 유리할 수 있지만 노이즈 감소가 중요한 경우 추가 필터링이나 하드웨어 수준 조정이 필요할 수 있습니다.
그림 8은 엔진 성능에 관한 데이터를 보여줍니다. 이 데이터는 샘플링 주파수가 4Hz인 ELM327 스캐너를 사용하여 수집되었습니다. 데이터 수집 중에는 BNO055 기능이 비활성화되었습니다.
BNO055 및 ELM327을 실험하는 동안 샘플링이 지연되는 것을 관찰했습니다. 문제를 식별하기 위해 샘플링 주파수를 조정하고 ELM327 및 BNO055를 활성화 및 비활성화하여 측정을 구현했습니다.
처음에는 샘플링 주파수에 해당하는 시간차를 제시합니다. 이 테스트에는 BNO055 및 ELM327 센서를 모두 사용합니다
그림 9를 보면 샘플링 주파수를 10Hz로 설정하면 샘플링 시간의 정밀도가 크게 떨어지는 것을 알 수 있습니다.
ELM327은 BNO055만큼 높은 샘플링 주파수를 달성할 수 없으므로 전체 시스템 성능에 부정적인 영향을 미칠 수 있습니다. 그럼에도 불구하고, 각 센서의 성능을 더 잘 이해하기 위해 개별 측정을 진행했습니다.
그림 10과 11은 샘플링 주파수와 샘플링 타이밍 정밀도 사이의 관계를 보여줍니다. 샘플링 타이밍 정밀도는 다음과 같이 표시됩니다.
그림 10에 따르면 BNO055는 최대 50Hz의 샘플링 주파수에 응답할 수 있습니다. 그러나 60Hz를 초과하면 샘플링 타이밍 정밀도가 급격하게 떨어집니다.
그림 11은 ELM327이 BNO055에 비해 응답성이 낮다는 것을 보여줍니다. ELM327은 최대 5Hz까지만 우수한 샘플링 타이밍 정밀도를 유지합니다. 이는 ELM327이 Obd를 통해 차량과 통신하는데, 차량의 ECU가 obd 요청에 대한 고주파 응답을 지원하지 않아 저주파 통신이 발생할 수 있기 때문입니다.
또한 ELM327은 OBD-II를 통해 차량 내 여러 ECU에서 데이터를 검색할 수 있는 센서 중 가장 저렴하고 품질이 가장 낮은 옵션입니다.
이번 포스팅은 자동차의 여행용 컴퓨터 연비 수치에 도전[9]한 내용입니다
Elm327에서 고려해야 할 중요한 점 중 하나는 응답 시간이 무시할 수 없을 만큼 크다는 것입니다. 장치가 메시지 요청에 응답하는 데 0.1~0.3초가 걸릴 수 있습니다. 즉, 너무 많은 신호를 추가하면 빠르게 속도가 줄어들게 됩니다. 쓸모가 없을 정도의 샘플링 속도
자세한 조언을 보려면 다음 페이지를 방문하세요. ★OBDII 어댑터 선택[10]
실적이 저조한 원인으로는 다음과 같은 이유를 생각해 볼 수 있습니다.
내가 개발 중인 측정 소프트웨어는 여러 센서가 필요한 사용 사례를 가정합니다. 또한 사용자는 yaml 파일에서 측정 설정을 변경할 수 있습니다. 사용자는 간단한 GUI에서 측정 애플리케이션을 작동할 수 있습니다. 여기에서 소스 코드를 볼 수 있습니다: ★VDDM[11]
측정 시스템의 구조는 아래와 같습니다.
VDDM/ │ ├── fusion/ │ ├── __init__.py │ ├── sensor_fusion.py # Handles data fusion, collects data from sensors │ └── sensors/ │ ├── __init__.py │ ├── bno055_measurement.py # Script for BNO055 sensor data collection │ └── elm327_measurement.py # Script for ELM327 OBD2 data collection │ ├── signalprocessing/ │ ├── __init__.py │ └── filter.py # Contains filtering algorithms (e.g., Butterworth filter) │ ├── config/ │ ├── config_manager.py # Loads configuration from YAML │ └── measurement_system_config.yaml # Configuration file for sensors and system settings │ ├── utils/ │ ├── tools.py # Utility functions (e.g., wait functions) │ └── visualize_data.py # Functions to format and display sensor data │ ├── measurement/ │ ├── __init__.py │ └── measurement_control.py # Controls the measurement process │ ├── gui/ │ ├── __init__.py │ └── main_gui.py # GUI setup for starting/stopping measurement │ ├── main.py # Entry point for starting the system └── requirements.txt # Project dependencies
아래 그림은 VDDM의 흐름을 나타냅니다.
main.py └── main_gui.py ├── Starts GUI interface and buttons └── measurement_control.py ├── Initializes sensor measurements ├── Controls start/stop of measurements └── sensor_fusion.py ├── Manages sensor data collection and fusion ├── Uses: │ ├── bno055_measurement.py (BNO055 sensor data collection) │ └── elm327_measurement.py (ELM327 OBD2 data collection) └── Processes data with: ├── filter.py (Applies Butterworth low-pass filter) └── visualize_data.py (Formats and visualizes sensor data)
VDDM의 주요 구성 요소는 VDDM/fusion/sensor_fusion.py, VDDM/fusion/sensors/bno055_measurement.py 및 VDDM/fusion/sensors/elm327_measurement.py입니다.
# sensor_fusion.py import os import sys import importlib from time import perf_counter from collections import defaultdict import pandas as pd import datetime import asyncio import numpy as np parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) sys.path.append(parent_dir) from config import config_manager from utils.tools import wait_process from utils.visualize_data import format_sensor_fusion_data from signalprocessing.filter import butterlowpass config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml') class SensorFactory: @staticmethod def create_sensor(sensor_type, config): try: # Import the appropriate sensor module based on sensor_type module = importlib.import_module(f"fusion.sensors.{sensor_type}_measurement") # Get a sensor class sensor_class = getattr(module, f"{sensor_type.upper()}") # Create a sensor instance and return return sensor_class(config) except (ImportError, AttributeError) as e: print(f"Error creating sensor {sensor_type}: {e}") return None class Sensors: def __init__(self, config): self.config = config_manager.load_config(config_path) self.sensor_list = tuple(self.config.sensors.keys()) self.sensor_instances = {} self.is_running = False self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ self.SAVE_DATA_DIR = config.save_data_dir self.SAVE_BUF_CSVDATA_PATH = self.SAVE_DATA_DIR + "/" + "measurement_raw_data.csv" self.SEQUENCE_LENGTH = config.sequence_length # Windows size [s] # Buffer size is determined by the relation of sequence length and sampling frequency # Buffer secures data for SEQUENCE_LENGTH[s] self.MAX_DATA_BUF_LEN = self.SEQUENCE_LENGTH * self.SAMPLING_FREQUENCY_HZ self.FPASS = config.filter_params.fpass self.FSTOP = config.filter_params.fstop self.GPASS = config.filter_params.gpass self.GSTOP = config.filter_params.gstop self.is_filter = config.filter_params.is_filter self.is_show_real_time_data = config.is_show_real_time_data self.TIMEZONE = config.timezone self.all_data_columns_list = () for sensor_name in self.sensor_list: self.all_data_columns_list += tuple(self.config["sensors"][sensor_name]["data_columns"]) self.data_buffer = pd.DataFrame() # data buffer for sensor_type in self.sensor_list: sensor_config = self.config.sensors[sensor_type] sensor_instance = SensorFactory.create_sensor(sensor_type, sensor_config) if sensor_instance: self.sensor_instances[sensor_type] = sensor_instance if os.path.exists(self.SAVE_BUF_CSVDATA_PATH): os.remove(self.SAVE_BUF_CSVDATA_PATH) print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' was deleted for initialization") def get_sensor(self, sensor_type): """ Retrieve the sensor instance corresponding to the specified sensor type. Args: sensor_type (str): The type of the sensor to retrieve. Returns: object: The sensor instance corresponding to the specified sensor type. Returns None if the sensor type does not exist. """ return self.sensor_instances.get(sensor_type) def collect_data(self): """ Collect data from all sensors. This method iterates over all sensor instances and collects data from each sensor. The collected data is stored in a dictionary where the keys are sensor types and the values are the data collected from the corresponding sensors. Returns: dict: A dictionary containing the collected data from all sensors. The keys are sensor types and the values are the data from each sensor. Raises: Exception: If an error occurs while collecting data from any sensor, the exception is caught and printed. """ data = {} try: for sensor_type, sensor in self.sensor_instances.items(): # get data from sensors data[sensor_type] = sensor.get_data_from_sensor() return data except Exception as e: print(e) def on_change_start_measurement(self): """ Start the measurement process. This method sets the is_running flag to True, indicating that the measurement process should start. """ self.is_running = True def on_change_stop_measurement(self): """ Stop the measurement process. This method sets the is_running flag to False, indicating that the measurement process should stop. """ self.is_running = False def filtering(self, df, labellist): """ Apply a low-pass filter to the specified columns in the DataFrame. This method applies a Butterworth low-pass filter to each column specified in the labellist. The "Time" column should be excluded from the labellist as it is not needed for the computation. Args: df (pd.DataFrame): The input DataFrame containing the data to be filtered. labellist (list of str): A list of column names to be filtered. The "Time" column should not be included in this list. Returns: pd.DataFrame: A new DataFrame with the filtered data. """ filtered_df = df.copy() for labelname in labellist: # Ensure the column is converted to a numpy array x = df[labelname].to_numpy() filtered_df[labelname] = butterlowpass( x=x, # Correctly pass the numpy array as 'x' fpass=self.FPASS, fstop=self.FSTOP, gpass=self.GPASS, gstop=self.GSTOP, fs=self.SAMPLING_FREQUENCY_HZ, dt=self.SAMPLING_TIME, checkflag=False, labelname=labelname ) return filtered_df def convert_dictdata(self, current_time, sensor_data_dict): """ Convert nested dictionary data from multiple sensors into a single DataFrame. This method converts nested dictionary data obtained from multiple sensors into a single dictionary and then converts it into a pandas DataFrame. The current_time information is associated with the data. Args: current_time (float): The current time at which the data was obtained. sensor_data_dict (dict): A nested dictionary containing data from multiple sensors. Returns: pd.DataFrame: A DataFrame containing the converted data with the current time information. """ converted_data = {'Time': current_time} for sensor, data in sensor_data_dict.items(): converted_data.update(data) converted_data = pd.DataFrame([converted_data]) return converted_data async def update_data_buffer(self, dict_data): """ Add data from sensors to the buffer and save it if necessary. This method adds the provided sensor data to the internal buffer. If the buffer exceeds the specified maximum length, the oldest data is saved to a CSV file and removed from the buffer. Args: dict_data (dict): The data from sensors to be added to the buffer. """ # Add data to the buffer self.data_buffer = pd.concat([self.data_buffer, dict_data], ignore_index=True) # If the buffer exceeds the specified length, save the oldest data if len(self.data_buffer) > self.MAX_DATA_BUF_LEN: # Save the oldest data to a CSV file old_data = self.data_buffer.head(self.MAX_DATA_BUF_LEN) await self.save_data(old_data, self.SAVE_BUF_CSVDATA_PATH) # Update the buffer self.data_buffer = self.data_buffer.tail(len(self.data_buffer) - self.MAX_DATA_BUF_LEN) async def save_data_async(self, df, path): """ Save the DataFrame to a CSV file asynchronously. This method uses asyncio.to_thread to run the synchronous to_csv method in a separate thread, allowing it to be handled asynchronously. Args: df (pd.DataFrame): The DataFrame to be saved. path (str): The file path where the DataFrame should be saved. """ if not os.path.isfile(path): await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=True, mode='w') else: await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=False, mode='a') async def save_data(self, df, path): """ Save the DataFrame to a CSV file asynchronously. This method calls save_data_async to save the DataFrame to a CSV file asynchronously. Args: df (pd.DataFrame): The DataFrame to be saved. path (str): The file path where the DataFrame should be saved. """ await self.save_data_async(df, path) async def finish_measurement_and_save_data(self): """ Finish the measurement process and save the data. This method finalizes the measurement process by saving the buffered data to a CSV file. It also applies filtering if specified and saves the filtered data to a separate CSV file. The method handles time zone settings and generates a timestamp for the file names. The buffered data is saved to a temporary CSV file, which is then read back and saved to a final file path with a timestamp. If filtering is enabled, the filtered data is also saved. The temporary CSV file is deleted after the data is saved. Raises: Exception: If an error occurs during the file operations. """ t_delta = datetime.timedelta(hours=9) TIMEZONE = datetime.timezone(t_delta, self.TIMEZONE)# You have to set your timezone now = datetime.datetime.now(TIMEZONE) timestamp = now.strftime('%Y%m%d%H%M%S') final_file_path = self.SAVE_BUF_CSVDATA_PATH.replace(self.SAVE_BUF_CSVDATA_PATH.split('/')[-1], timestamp + "/" + timestamp + '_' + self.SAVE_BUF_CSVDATA_PATH.split('/')[-1]) await self.save_data_async(self.data_buffer, self.SAVE_BUF_CSVDATA_PATH) raw_df = pd.read_csv(self.SAVE_BUF_CSVDATA_PATH, header=0) os.makedirs(self.SAVE_DATA_DIR + "/" + timestamp, exist_ok=True) raw_df.to_csv(final_file_path, sep=',', encoding='utf-8', index=False, header=True) if self.is_filter: filt_df = self.filtering(df=raw_df, labellist=raw_df.columns[1:]) filt_df.to_csv(final_file_path.replace('_raw_data.csv', '_filt_data.csv'), sep=',', encoding='utf-8', index=False, header=True) if os.path.exists(self.SAVE_BUF_CSVDATA_PATH): os.remove(self.SAVE_BUF_CSVDATA_PATH) print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' was deleted") else: print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' is not existed") async def sensor_fusion_main(): """ Main function for sensor fusion. This function initializes the sensor fusion process, starts the measurement loop, collects data from multiple sensors, updates the data buffer, and handles real-time data display. It also calculates and prints the sampling delay and reliability rate upon termination. The main loop runs until the measurement process is stopped, either by an exception or a keyboard interrupt. Raises: Exception: If an error occurs during the measurement process, it is caught and printed. KeyboardInterrupt: If a keyboard interrupt occurs, the measurement process is stopped and the data is saved. """ print("Start sensor fusion main") config = config_manager.load_config(config_path) sensors = Sensors(config["master"]) print("Called an instance of Sensors class") # sensors.start_all_measurements() sampling_counter = 0 current_time = 0 #sensors.is_running = True sensors.on_change_start_measurement() try: main_loop_start_time = None while sensors.is_running: iteration_start_time = perf_counter() # Start time of each iteration if main_loop_start_time is None: main_loop_start_time = iteration_start_time # initialize main loop start time current_time = perf_counter() - main_loop_start_time # Current time data = sensors.collect_data() # Get data from sensors sampling_counter += 1 # Num of sampling converted_data = sensors.convert_dictdata(current_time, data) # Convert data to dataframe format # Update the data buffer. If it reaches the buffer limit, write the data to a CSV file. await sensors.update_data_buffer(converted_data) # Display data in real time. This process is executed on additional thread. if sensors.is_show_real_time_data: formatted_data = format_sensor_fusion_data(data, sensors.all_data_columns_list) print("--------------------------------------------------------------------") print("Current Time is: {:.3f}".format(current_time)) print(formatted_data) # Wait based on the sampling interval and execution time to maintain the sampling frequency. iteration_end_time = perf_counter() iteration_duration = iteration_end_time - iteration_start_time print("Iteration duration is: {0} [s]".format(iteration_duration)) sleep_time = max(0, sensors.SAMPLING_TIME - iteration_duration) if sleep_time > 0: wait_process(sleep_time) except Exception as e: print(e) except KeyboardInterrupt: sensors.on_change_stop_measurement() print("KeyboardInterrupt") await sensors.finish_measurement_and_save_data() finally: print("finish") # Compute delay of sampling main_loop_end_time = perf_counter() - main_loop_start_time print("Program terminated") print("main loop is ended. current time is: {:.3f}".format(current_time)) print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time)) print("sampling num is: {}".format(sampling_counter)) # Compute ideal sampliing time ideal_time = ((sampling_counter - 1) / sensors.SAMPLING_FREQUENCY_HZ) # Cpmpute a delay delay_time = current_time - ideal_time # reliability rate sampling_reliability_rate = (delay_time / (sampling_counter / sensors.SAMPLING_FREQUENCY_HZ)) * 100 print("sampling delay is: {:.3f} s".format(delay_time)) print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate)) if __name__ == '__main__': asyncio.run(sensor_fusion_main())
sensor_fusion.py manages sensor data collection and processing using asynchronous operations. It dynamically creates sensor instances, collects data, applies filters, and saves the data to CSV files. The script also displays real-time data if needed and monitors performance metrics like sampling delay and reliability. The main function runs a loop that handles data collection, buffer updates, and performance reporting. It uses asynchronous operations to efficiently manage data saving and ensure smooth performance.
# bno055_measurement.py import time import numpy as np import adafruit_bno055 import board import os import sys parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) sys.path.append(parent_dir) from config.config_manager import load_config config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml') class BNO055: def __init__(self, config): self.COLUMNS = config.data_columns self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ self.SAVE_DATA_DIR = config.save_data_dir self.SEQUENCE_LENGTH = config.sequence_length self.FPASS = config.filter_params.fpass self.FSTOP = config.filter_params.fstop self.GPASS = config.filter_params.gpass self.GSTOP = config.filter_params.gstop self.Isfilter = config.filter_params.is_filter self.IsStart = False self.IsStop = True self.Is_show_real_time_data = config.is_show_real_time_data i2c_instance = board.I2C() # Create i2c instance self.bno055_sensor = adafruit_bno055.BNO055_I2C(i2c_instance) # create BNO055_I2C instance def calibration(self): print("Start calibration!") while not self.bno055_sensor.calibrated: print('SYS: {0}, Gyro: {1}, Accel: {2}, Mag: {3}'.format(*(self.bno055_sensor.calibration_status))) time.sleep(1) def calcEulerfromQuaternion(self, _w, _x, _y, _z): """ Calculate Euler angles (roll, pitch, yaw) from quaternion components. This method converts quaternion components (_w, _x, _y, _z) into Euler angles (roll, pitch, yaw) in degrees. If any of the quaternion components are None, it returns (0.0, 0.0, 0.0) and prints an error message. Args: _w (float): The w component of the quaternion. _x (float): The x component of the quaternion. _y (float): The y component of the quaternion. _z (float): The z component of the quaternion. Returns: tuple: A tuple containing the roll, pitch, and yaw angles in degrees. If an error occurs, it returns (0.0, 0.0, 0.0) and prints an error message. """ if None in (_w, _x, _y, _z): print(f"Error: One or more quaternion values are None: {_w}, {_x}, {_y}, {_z}") return 0.0, 0.0, 0.0 try: sqw = _w ** 2 sqx = _x ** 2 sqy = _y ** 2 sqz = _z ** 2 COEF_EULER2DEG = 57.2957795131 # Yaw term1 = 2.0 * (_x * _y + _z * _w) term2 = sqx - sqy - sqz + sqw yaw = np.arctan2(term1, term2) # Pitch term1 = -2.0 * (_x * _z - _y * _w) term2 = sqx + sqy + sqz + sqw pitch = np.arcsin(term1 / term2) if -1 <= term1 / term2 <= 1 else 0.0 # Roll term1 = 2.0 * (_y * _z + _x * _w) term2 = -sqx - sqy + sqz + sqw roll = np.arctan2(term1, term2) return COEF_EULER2DEG * roll, COEF_EULER2DEG * pitch, COEF_EULER2DEG * yaw except Exception as e: print(f"Error in calcEulerfromQuaternion: {e}") return 0.0, 0.0, 0.0 def get_data_from_sensor(self): """ Retrieve data from the BNO055 sensor and return it as a dictionary. This method collects various sensor readings from the BNO055 sensor, including Euler angles, gyroscope data, linear acceleration, quaternion, magnetic field, and calibration status. It then constructs a dictionary with these values and returns only the columns specified in self.COLUMNS. Returns: dict: A dictionary containing the sensor data. Only the columns specified in self.COLUMNS are included in the returned dictionary. """ # Get data euler_z, euler_y, euler_x = [val for val in self.bno055_sensor.euler] # X: yaw, Y: pitch, Z: roll gyro_x, gyro_y, gyro_z = [val for val in self.bno055_sensor.gyro] # Gyro[rad/s] linear_accel_x, linear_accel_y, linear_accel_z = [val for val in self.bno055_sensor.linear_acceleration] # Linear acceleration[m/s^2] quaternion_1, quaternion_2, quaternion_3, quaternion_4 = [val for val in self.bno055_sensor.quaternion] # Quaternion quat_roll, quat_pitch, quat_yaw = self.calcEulerfromQuaternion(quaternion_1, quaternion_2, quaternion_3, quaternion_4) # Cal Euler angle from quaternion magnetic_x, magnetic_y, magnetic_z = [val for val in self.bno055_sensor.magnetic] # Magnetic field calibstat_sys, calibstat_gyro, calibstat_accel, calibstat_mag = [val for val in self.bno055_sensor.calibration_status] # Status of calibration data_dict = { "linear_accel_x": linear_accel_x, "linear_accel_y": linear_accel_y, "linear_accel_z": linear_accel_z, "gyro_x": gyro_x, "gyro_y": gyro_y, "gyro_z": gyro_z, "euler_x": euler_x, "euler_y": euler_y, "euler_z": euler_z, "quat_roll": quat_roll, "quat_pitch": quat_pitch, "quat_yaw": quat_yaw, "quaternion_1": quaternion_1, "quaternion_2": quaternion_2, "quaternion_3": quaternion_3, "quaternion_4": quaternion_4, "magnetic_x": magnetic_x, "magnetic_y": magnetic_y, "magnetic_z": magnetic_z, "calibstat_sys": calibstat_sys, "calibstat_gyro": calibstat_gyro, "calibstat_accel": calibstat_accel, "calibstat_mag": calibstat_mag } return {column: data_dict[column] for column in self.COLUMNS if column in data_dict} def format_sensor_data(data, labels): """ Format sensor data into a string for display. This method takes a dictionary of sensor data and a list of labels, and formats the data into a string where each label is followed by its corresponding value. If a value is None, it is replaced with the string "None". Each label-value pair is separated by " / ". Args: data (dict): The sensor data to format. labels (list of str): The list of labels to include in the formatted string. Returns: str: A formatted string containing the sensor data. """ formatted_str = "" if isinstance(data, dict): for label in labels: value = data.get(label, None) if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " return formatted_str.rstrip(" / ") def test_main(): """ Main function for testing sensor data collection and display. This function initializes the BNO055 sensor, starts a loop to collect data, formats the data for display, and prints it in real-time. It also calculates and prints the sampling delay and reliability rate upon termination. The main loop runs until interrupted by the user. Raises: KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated and the final statistics are printed. """ from utils.tools import wait_process from time import perf_counter import matplotlib.pyplot as plt print("Main start") config = load_config(config_path) meas_bno055 = BNO055(config.sensors['bno055']) start_time = perf_counter() sampling_counter = 0 try: main_loop_start_time = perf_counter() while True: iteration_start_time = perf_counter() # Data acquisition process data = meas_bno055.get_data_from_sensor() current_time = perf_counter() - start_time sampling_counter += 1 if meas_bno055.Is_show_real_time_data: formatted_data = format_sensor_data(data, meas_bno055.COLUMNS) # current time print("--------------------------------------------------------------------") print("Current Time is: {:.3f}".format(current_time)) print(formatted_data) # Wait to meet the sampling frequency based on the sampling interval and execution time elapsed_time = perf_counter() - iteration_start_time sleep_time = meas_bno055.SAMPLING_TIME - elapsed_time if sleep_time > 0: wait_process(sleep_time) except KeyboardInterrupt: print("Interrupted by user") finally: # Calculate the sampling delay from the number of samples and the current time main_loop_end_time = perf_counter() - main_loop_start_time print("Program terminated") print("main loop is ended. current time is: {:.3f}".format(current_time)) print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time)) print("sampling num is: {}".format(sampling_counter)) # Since it is 0-based, the number of samples is current_time + 1 # Calculate the ideal sampling time ideal_time = ((sampling_counter - 1) / meas_bno055.SAMPLING_FREQUENCY_HZ) # Calculate the delay delay_time = current_time - ideal_time # The reliability rate is the delay divided by the sampling time sampling_reliability_rate = (delay_time / (sampling_counter / meas_bno055.SAMPLING_FREQUENCY_HZ)) * 100 print("sampling delay is: {:.3f} s".format(delay_time)) print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate)) if __name__ == '__main__': test_main()
This script interfaces with the BNO055 sensor to collect and process data. It retrieves sensor readings, such as Euler angles and gyroscope data, formats them for display, and prints real-time updates if configured. The script includes a calibration method and calculates Euler angles from quaternion data. It manages data acquisition with a defined sampling frequency and calculates sampling delay and reliability. The main loop continues until interrupted, reporting performance metrics upon termination.
# elm327_measurement.py import obd import os import time from collections import deque import numpy as np import pandas as pd import datetime import asyncio import scipy from scipy import signal import matplotlib as plt import sys from collections import defaultdict import random parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) sys.path.append(parent_dir) from config.config_manager import load_config config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml') class ELM327: def __init__(self, config): """ Initialize the ELM327 class with configuration parameters. Args: config (dict): Configuration parameters for the ELM327. """ self.COLUMNS = config.data_columns self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ self.SAVE_DATA_DIR = config.save_data_dir self.SEQUENCE_LENGTH = config.sequence_length self.FPASS = config.filter_params.fpass self.FSTOP = config.filter_params.fstop self.GPASS = config.filter_params.gpass self.GSTOP = config.filter_params.gstop self.Isfilter = config.filter_params.is_filter self.res = self.connect_to_elm327() self.is_offline = config.is_offline self.IsStart = False self.IsStop = True self.Is_show_real_time_data = config.is_show_real_time_data def initialize_BLE(self): """ Initialize Bluetooth Low Energy (BLE) for ELM327 connection. """ os.system('sudo hcitool scan') os.system('sudo hciconfig hci0 up') os.system('sudo rfcomm bind 0 8A:2A:D4:FF:38:F3') os.system('sudo rfcomm listen 0 1 &') def connect_to_elm327(self): """ Establish a connection to the ELM327 device. Returns: res (obd.OBDStatus): The connection status of the ELM327 device. """ res = None try: self.initialize_BLE() self.connection = obd.OBD() print(self.connection.status()) res = self.connection.status() if res == obd.OBDStatus.CAR_CONNECTED: print("----------Connection establishment is successful!----------") return res else: print("----------Connection establishment failed!----------") print("End program. Please check settings of the computer and ELM327") except Exception as e: print("----------Exception!----------") print(e) finally: return res def get_data_from_sensor(self): """ Retrieve data from the sensor. Returns: dict: A dictionary containing sensor data. """ if self.is_offline: data = self.get_data_from_sensor_stub() else: # Retrieve data and save it in dictionary format data = {column: self.get_obd2_value(column) for column in self.COLUMNS} return data def get_obd2_value_debug(self, column): """ Retrieve OBD-II value for a specific column with debug information. Args: column (str): The OBD-II command column. Returns: float or None: The value of the OBD-II command, or None if not available. """ command = getattr(obd.commands, column, None) if command: response = self.connection.query(command) if response: print(f"Response for command '{command}': {response}") if response.value is not None: # Check for None print(f"Response value for command '{command}': {response.value}") return response.value.magnitude else: print(f"No value in response for command '{command}'") else: print(f"No response for command '{command}'") else: print(f"No command found for column '{column}'") return None def get_obd2_value(self, column): """ Retrieve OBD-II value for a specific column. Args: column (str): The OBD-II command column. Returns: float or None: The value of the OBD-II command, or None if not available. """ command = getattr(obd.commands, column, None) if command: response = self.connection.query(command) if response.value is not None: # Check for None return response.value.magnitude return None def get_data_from_sensor_stub(self): """ Generate stub data for the sensor. Returns: dict: A dictionary containing stub sensor data. """ data_stub = {column: np.abs(np.random.randn()).astype(np.float32).item() for column in self.COLUMNS} # Randomly insert None or 0.0 if random.choice([True, False]): random_column = random.choice(self.COLUMNS) if random.choice([True, False]): data_stub[random_column] = None else: data_stub[random_column] = 0.0 return data_stub def format_data_for_display(data, labels): """ Format sensor data for display. Args: data (dict): The sensor data to format. labels (list of str): The list of labels to include in the formatted string. Returns: str: A formatted string containing the sensor data. """ formatted_str = "" for label, value in zip(labels, data.values()): if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " return formatted_str.rstrip(" / ") def format_sensor_data(data, labels): """ Format sensor data for display. Args: data (dict or list): The sensor data to format. labels (list of str): The list of labels to include in the formatted string. Returns: str: A formatted string containing the sensor data. """ formatted_str = "" if isinstance(data, dict): for label in labels: value = data.get(label, None) if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " else: for label, value in zip(labels, data): if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " return formatted_str.rstrip(" / ") def test_main(): """ Main function for testing sensor data collection and display. This function initializes the ELM327 sensor, starts a loop to collect data, formats the data for display, and prints it in real-time. It also calculates and prints the sampling delay and reliability rate upon termination. The main loop runs until interrupted by the user. Raises: KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated and the final statistics are printed. """ from utils.tools import wait_process from time import perf_counter import matplotlib.pyplot as plt print("Main start") config = load_config(config_path) meas_elm327 = ELM327(config.sensors['elm327']) # res = meas_elm327.connect_to_elm327() start_time = perf_counter() sampling_counter = 0 try: main_loop_start_time = perf_counter() while True: iteration_start_time = perf_counter() # Data acquisition process data = meas_elm327.get_data_from_sensor() current_time = perf_counter() - start_time sampling_counter += 1 if meas_elm327.Is_show_real_time_data: formatted_data = format_sensor_data(data, meas_elm327.COLUMNS) print("--------------------------------------------------------------------") print("Current Time is: {:.3f}".format(current_time)) print(formatted_data) # Wait to meet the sampling frequency based on the sampling interval and execution time elapsed_time = perf_counter() - iteration_start_time sleep_time = meas_elm327.SAMPLING_TIME - elapsed_time if sleep_time > 0: wait_process(sleep_time) except KeyboardInterrupt: print("Interrupted by user") finally: main_loop_end_time = perf_counter() - main_loop_start_time print("Program terminated") print("main loop is ended. current time is: {:.3f}".format(current_time)) print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time)) print("sampling num is: {}".format(sampling_counter)) # Calculate the ideal sampling time ideal_time = ((sampling_counter - 1) / meas_elm327.SAMPLING_FREQUENCY_HZ) # Calculate the delay delay_time = current_time - ideal_time # The reliability rate is the delay divided by the sampling time sampling_reliability_rate = (delay_time / (sampling_counter / meas_elm327.SAMPLING_FREQUENCY_HZ)) * 100 print("sampling delay is: {:.3f} s".format(delay_time)) print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate)) if __name__ == '__main__': test_main()
This script defines an ELM327 class for interfacing with an ELM327 device to collect sensor data via OBD-II. It initializes the connection, retrieves data, and formats it for display. The test_main function sets up the ELM327 sensor, collects data in a loop, and prints it in real-time. It also calculates and displays sampling delay and reliability rates upon interruption. The script handles both real-time data acquisition and simulated offline data for testing purposes.
In this post, I described the project focused on developing a measurement system to capture vehicle behavior. While the project is still ongoing, I have recognized the potential for capturing vehicle data with a focus on the vehicle dynamics domain. However, when it comes to measuring signals from vehicle ECUs, some limitations need to be considered, as the ELM327 may not reliably achieve sampling rates beyond 5 Hz with good accuracy. This issue could likely be resolved by using a more advanced OBD scanner, such as the OBDLink MX+.
[1] Adafruit. "Adafruit BNO055 Absolute Orientation Sensor."
https://learn.adafruit.com/adafruit-bno055-absolute-orientation-sensor/overview. Accessed September 3, 2024.
[2] Adafruit. "Adafruit_CircuitPython_BNO055." https://github.com/adafruit/Adafruit_CircuitPython_BNO055. Accessed September 3, 2024.
[3] Adafruit. "Adafruit_BNO055." https://github.com/adafruit/Adafruit_BNO055. Accessed September 3, 2024.
[4] Adafruit. "Adafruit BNO055 Demo." https://cdn-shop.adafruit.com/product-videos/1024x768/2472-04.mp4 Accessed September 3, 2024.
[5] Amazon. "Elm327 Launchh OBD2 Professional Bluetooth Scan Tool and Code Reader for Android and PC, Interface OBDII OBD2 Car Auto Diagnostic Scanner, Not Support J1850 VPW & J1850 PWM". https://a.co/d/5BFn4GN. Accessed September 3, 2024.
[6] MathWorks. "Coordinate Systems in Automated Driving Toolbox." https://www.mathworks.com/help/driving/ug/coordinate-systems.html. Accessed September 3, 2024.
[7] Apple. "Getting raw gyroscope events." https://developer.apple.com/documentation/coremotion/getting_raw_gyroscope_events. Accessed September 3, 2024.
[8] phyphox. "phyphox top page."https://phyphox.org/. Accessed September 3, 2024.
[9] Andrea Patrucco, Vehicle dynamics engineer presso Applus+ IDIADA. "Challenging your car's trip computer fuel efficiency figures." https://www.linkedin.com/pulse/challenging-your-cars-trip-computer-fuel-efficiency-figures-patrucco/. Accessed September 3, 2024.
[10] "Choosing OBDII adapter". https://www.carscanner.info/choosing-obdii-adapter/. Accessed September 3, 2024.
[11] "VDDM". https://github.com/Qooniee/VDDM/tree/master/. Accessed September 3, 2024.
위 내용은 차량용 Raspberry Pi를 사용하여 맞춤형 데이터 로거 구축: BNO 및 ELM 센서 통합의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!