As a software engineer working in the automotive industry, I have a keen interest in autonomous vehicles, data measurement techniques, and analysis methods. In this post, I will describe a custom-built measurement system, detailing the process from scratch, and present some experimental results. My data logger consists of a Raspberry Pi 3, a BNO055 sensor, and an ELM327 OBD-II adapter, which are responsible for computing, collecting acceleration and rotational velocity data, and retrieving engine information from an actual vehicle, respectively.
1.Background
2.Architecture
3.Sensor information
4.Coordinate system information
5.Data Comparison
6.Core Logics
7.Conclusion
8.Reference
In recent years, the rise of IoT, artificial intelligence, and edge computing has brought significant advancements to various industries, including the automotive sector. Modern vehicles are equipped with a multitude of sensors that work together to create sophisticated systems, enabling everything from enhanced safety features to autonomous driving capabilities.
Typically, monitoring a vehicle’s state requires the use of advanced and expensive sensors to achieve precise and reliable measurements. However, for certain situations, such as obtaining data on circuits for a general understanding of vehicle dynamics, these high-end sensors may not be strictly necessary. Recognizing this, I decided to create a cost-effective system capable of measuring a vehicle's physical state using more affordable components.
I decided to focus on measuring data related to vehicle dynamics rather than ride comfort. This is because analyzing ride comfort requires capturing more high-frequency signals with suitable accuracy, which necessitates setting a high sampling frequency during measurement.
Specifically, in the frequency response characteristics of lateral acceleration with respect to steering angle, the resonant frequency generally appears below 5 Hz, although it depends on the vehicle speed. On the other hand, in ride comfort analysis, discussions often extend to the range of several tens of Hz.
Therefore, the primary goal of developing the measurement system is to create a low-cost data logging system that facilitates data analysis in the vehicle dynamics domain.
The measurement system consists of the following hardware:
The Raspberry Pi acts as the main computer, collecting data from the BNO055 and ELM327. The Raspberry Pi communicates with the BNO055 via I2C and with the ELM327 via Bluetooth (see Figure 1).
The BNO055 sensor is capable of measuring various types of motion and data:★Adafruit BNO055 Overview[1]
Absolute Orientation (Euler Vector, 100Hz): Provides three-axis orientation data based on a 360° sphere.
Absolute Orientation (Quaternion, 100Hz): Offers four-point quaternion output for more accurate data manipulation.
Angular Velocity Vector (100Hz): Measures rotation speed in radians per second (rad/s) across three axes.
Acceleration Vector (100Hz): Captures acceleration including gravity and linear motion in meters per second squared (m/s²) across three axes.
Magnetic Field Strength Vector (20Hz): Senses the magnetic field strength in microteslas (µT) across three axes.
Linear Acceleration Vector (100Hz): Records linear acceleration data (excluding gravity) in meters per second squared (m/s²) across three axes.
Gravity Vector (100Hz): Measures gravitational acceleration (excluding any movement) in meters per second squared (m/s²) across three axes.
Temperature (1Hz): Provides ambient temperature in degrees Celsius.
The BNO055 is compact and supported by Adafruit's Python library called Adafruit_CircuitPython. Additionally, a C-language library is available from Adafruit.
★Adafruit_CircuitPython_BNO055[2]
★Adafruit_BNO055[3]
For a demonstration, you can view the video here:
Adafruit BNO055 Demo
★Adafruit BNO055 Demo[4]
ELM327 は、標準インターフェイスを介して車両エンジン データにアクセスできるようにする、広く使用されている OBD-II (オンボード診断) アダプターです。車両の ECU (電子制御ユニット) とコンピューターやスマートフォンなどの外部デバイスの間のブリッジとして機能し、診断データやパフォーマンス データの取得を可能にします。 ELM327 の主な特徴と機能は次のとおりです。★ELM327 情報[5]
OBD-II プロトコルのサポート: ELM327 は、ISO 9141、ISO 14230 (KWP2000)、ISO 15765 (CAN) などを含むさまざまな OBD-II プロトコルをサポートし、幅広い車両と互換性があります。
診断トラブル コード (DTC): 車両の ECU から診断トラブル コードを読み取って消去し、問題の特定とトラブルシューティングを支援します。
ライブ データ ストリーミング: エンジン RPM、車両速度、冷却水温度、燃料レベルなどの車両センサーからのリアルタイム データを提供します。
フレーム データのフリーズ: 障害が検出された瞬間にデータをキャプチャして保存します。これは、断続的な問題の診断に役立ちます。
車両情報: VIN (車両識別番号)、キャリブレーション ID などを含む車両に関する詳細を取得します。
互換性: Bluetooth、USB、Wi-Fi バージョンなどのさまざまな形式で利用可能で、さまざまなデバイスやプラットフォームとの互換性が可能です。
通常、ELM327 は車両の状態、特にエンジンの状態を診断するために使用されます。ただし、車両から広範囲のデータを収集するために使用することもできます。 ELM327 は Amazon などのプラットフォームで購入でき、価格は約 20 ドルから 80 ドルの範囲です。
Python-OBD ライブラリを使用すると、ELM327 アダプター経由でデータを収集するクエリを作成でき、カスタマイズ可能な詳細なデータ収集が可能になります
このシステムは複数のセンサーおよび実際の車両と対話するため、各座標系を理解する必要があります。
下の画像は、私のデータ測定システムの BNO055 座標系を示しています。私のシステムはさらに、四元数を使用して回転角度を計算します。センサーの座標系は車両の座標系と一致する必要があります。
測定システムは車両座標系に基づいて動作します。
検証タスクには、加速度、ジャイロなどを含む複数の物理データ ポイントを測定できる iPhone アプリケーションを使用しました。 iPhone アプリケーションを使用する目的は、このアプリケーションのデータを使用してデータ測定システムの妥当性を確認することです。図 4 は Apple の開発者サイトから引用されています。
このセクションでは、いくつかの実験結果を紹介します。最初のステップとして、測定システムが単位間違いなどのエラーなく読み取り値を出力するかどうかを確認するために、正弦波タスクを実行しました。この部分は基本的なものですが、データを収集する正しい方法を確保するために非常に重要です。
次に実車を走行させながら物理値を計測してみました。
次に、ELM327の性能と各センサーのサンプリング精度を確認してみました。
正弦波テストを実施したので、このセクションではその結果を紹介します。これらのテストの目的は、次のことを確認することです:
これを実現するために、さまざまな物理値を測定できるスマホアプリ「phyphox」★phyphox[8]を使用しました。以下に示すように、BNO055 センサーを iPhone に取り付けました [図 5]。
図 6 は、サンプリング周波数 10 Hz での正弦波テストの結果を示しています。凡例の「VDDM」は私が開発した測定システムによって収集されたデータを表し、「iPhone」は「phyphox」を使用して収集されたデータを表します。
VDDM と iPhone の両方からのデータは、すべての軸にわたって同様の傾向を示しており、両方のアプリケーションが同様のモーション パターンをキャプチャしていることを示しています。
VDDM データは、特に加速度グラフで若干変動が大きいように見えます。これは、iPhone と比較して感度が高いか、または異なるフィルタリング アプローチを示唆している可能性があります。
両方のデータは一般的によく一致していますが、振幅の違いは、精度要件に応じてさらなる校正または調整が必要になる可能性があることを示唆しています。
VDDM は、特に加速度グラフで、iPhone データには存在しない追加のノイズまたは高周波成分をキャプチャしている可能性があります。これは、詳細な分析に役立つ場合もあれば、さらなるフィルタリングが必要になる場合もあります。
図7はBNO055の実車データに着目した実験結果を示しています。参考にしたのは「phyphox」というiPhoneアプリです。サンプリング周波数は50Hzであった。私が開発した車両のデータ ロギング システムによって測定された加速度と回転信号には、ノイズ、外れ値、NaN 値が含まれていることが観察されました。そこで、IQR 法やバターワース ローパス フィルターなどの後処理関数を適用しました。各グラフの凡例中の「fc」という用語はカットオフ周波数を表します。たとえば、fc_19.5 はカットオフ周波数 19.5Hz を示します。この実験は BNO055 センサーのみを使用して実行され、ELM327 機能は無効になっていることに注意してください。
VDDM (生データ) と iPhone からのデータは、すべての軸にわたって同様の傾向を示しています。これは、両方のシステムが同じモーション パターンをキャプチャしていることを示唆しています。
VDDM からのデータは、iPhone のデータと比較してばらつきが大きいようです。これは加速度グラフで特に顕著であり、VDDM がより高い感度でデータをキャプチャしているか、異なるフィルタリング アプローチを使用していることを示している可能性があります。
VDDM からのフィルター処理されたデータ (fc 19.5Hz および 10Hz) は、生データと比較して変動が減少しており、iPhone データに近づいています。これは、VDDM (生データ) にはより多くのノイズまたは高周波成分が含まれている可能性があることを示唆しています。
VDDM からの生データには外れ値と欠損値が含まれていましたが、フィルターを適用することで軽減されました。これは、ロール レートとピッチ レートのグラフで特に顕著です。
VDDM には iPhone に比べて高周波成分とノイズが多く含まれているようです。これは詳細な分析には有利ですが、ノイズ低減が重要な場合は追加のフィルタリングまたはハードウェア レベルの調整が必要になる場合があります。
図 8 はエンジン性能に関するデータを示しています。これらのデータは、サンプリング周波数 4 Hz の ELM327 スキャナを使用して収集されました。データ収集中に BNO055 機能が無効になりました。
BNO055 と ELM327 を使った実験中に、サンプリングの遅延が観察されました。問題を特定するために、サンプリング周波数を調整し、ELM327 と BNO055 を有効または無効にして測定を実行しました。
まず、サンプリング周波数に対応する時間差を提示します。このテストには、BNO055 センサーと ELM327 センサーの両方の使用が含まれます
図 9 は、サンプリング周波数が 10Hz に設定されている場合、サンプリング時間の精度が大幅に低下することを示しています。
ELM327 は BNO055 ほど高いサンプリング周波数を達成できないため、システム全体のパフォーマンスに悪影響を及ぼす可能性があります。それにも関わらず、各センサーの性能をよりよく理解するために、個別の測定を実施しました。
図10と図11は、サンプリング周波数とサンプリングタイミング精度の関係を示しています。サンプリング タイミングの精度は次のように表されます。
図 10 によると、BNO055 は最大 50 Hz のサンプリング周波数に応答できます。ただし、60 Hz を超えると、サンプリング タイミングの精度が大幅に低下します。
図 11 は、ELM327 が BNO055 と比較して応答性が低いことを示しています。 ELM327 は、最大 5 Hz までのみ良好なサンプリング タイミング精度を維持します。これは、ELM327 が OBD 経由で車両と通信し、車両の ECU が OBD リクエストに対する高周波応答をサポートしていない可能性があり、その結果低周波通信が発生するためです。
さらに、ELM327 は、OBD-II 経由で車両内の複数の ECU からデータを取得できるセンサーの中で最も安価で品質の低いオプションです。
この投稿★車のトリップコンピューターの燃費数値に挑戦する[9] には、
Elm327 に関して考慮すべき重要なことの 1 つは、応答時間が無視できるほどではないということです。デバイスはメッセージ要求に応答するまでに 0.1 ~ 0.3 秒かかる場合があり、これは、信号を追加しすぎると急速に応答時間が短縮されることを意味します。サンプリングレートを役に立たないレベルまで引き上げる
さらに詳しいアドバイスについては、こちらのページをご覧ください:★OBDII アダプターの選択[10]
パフォーマンスが低下する原因としては以下のことが考えられます。
私が開発している計測ソフトウェアは、複数のセンサーが必要なユースケースを想定しています。さらに、ユーザーは yaml ファイルの測定設定を変更できます。ユーザーはシンプルなGUI上で測定アプリケーションを操作できます。ソースコードはここで見ることができます: ★VDDM[11]
測定システムの構造は以下のとおりです:
VDDM/ │ ├── fusion/ │ ├── __init__.py │ ├── sensor_fusion.py # Handles data fusion, collects data from sensors │ └── sensors/ │ ├── __init__.py │ ├── bno055_measurement.py # Script for BNO055 sensor data collection │ └── elm327_measurement.py # Script for ELM327 OBD2 data collection │ ├── signalprocessing/ │ ├── __init__.py │ └── filter.py # Contains filtering algorithms (e.g., Butterworth filter) │ ├── config/ │ ├── config_manager.py # Loads configuration from YAML │ └── measurement_system_config.yaml # Configuration file for sensors and system settings │ ├── utils/ │ ├── tools.py # Utility functions (e.g., wait functions) │ └── visualize_data.py # Functions to format and display sensor data │ ├── measurement/ │ ├── __init__.py │ └── measurement_control.py # Controls the measurement process │ ├── gui/ │ ├── __init__.py │ └── main_gui.py # GUI setup for starting/stopping measurement │ ├── main.py # Entry point for starting the system └── requirements.txt # Project dependencies
以下の図は VDDM の流れを示しています。
main.py └── main_gui.py ├── Starts GUI interface and buttons └── measurement_control.py ├── Initializes sensor measurements ├── Controls start/stop of measurements └── sensor_fusion.py ├── Manages sensor data collection and fusion ├── Uses: │ ├── bno055_measurement.py (BNO055 sensor data collection) │ └── elm327_measurement.py (ELM327 OBD2 data collection) └── Processes data with: ├── filter.py (Applies Butterworth low-pass filter) └── visualize_data.py (Formats and visualizes sensor data)
VDDM の主なコンポーネントは、VDDM/fusion/sensor_fusion.py、VDDM/fusion/sensors/bno055_measurement.py、および VDDM/fusion/sensors/elm327_measurement.py です。
# sensor_fusion.py import os import sys import importlib from time import perf_counter from collections import defaultdict import pandas as pd import datetime import asyncio import numpy as np parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) sys.path.append(parent_dir) from config import config_manager from utils.tools import wait_process from utils.visualize_data import format_sensor_fusion_data from signalprocessing.filter import butterlowpass config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml') class SensorFactory: @staticmethod def create_sensor(sensor_type, config): try: # Import the appropriate sensor module based on sensor_type module = importlib.import_module(f"fusion.sensors.{sensor_type}_measurement") # Get a sensor class sensor_class = getattr(module, f"{sensor_type.upper()}") # Create a sensor instance and return return sensor_class(config) except (ImportError, AttributeError) as e: print(f"Error creating sensor {sensor_type}: {e}") return None class Sensors: def __init__(self, config): self.config = config_manager.load_config(config_path) self.sensor_list = tuple(self.config.sensors.keys()) self.sensor_instances = {} self.is_running = False self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ self.SAVE_DATA_DIR = config.save_data_dir self.SAVE_BUF_CSVDATA_PATH = self.SAVE_DATA_DIR + "/" + "measurement_raw_data.csv" self.SEQUENCE_LENGTH = config.sequence_length # Windows size [s] # Buffer size is determined by the relation of sequence length and sampling frequency # Buffer secures data for SEQUENCE_LENGTH[s] self.MAX_DATA_BUF_LEN = self.SEQUENCE_LENGTH * self.SAMPLING_FREQUENCY_HZ self.FPASS = config.filter_params.fpass self.FSTOP = config.filter_params.fstop self.GPASS = config.filter_params.gpass self.GSTOP = config.filter_params.gstop self.is_filter = config.filter_params.is_filter self.is_show_real_time_data = config.is_show_real_time_data self.TIMEZONE = config.timezone self.all_data_columns_list = () for sensor_name in self.sensor_list: self.all_data_columns_list += tuple(self.config["sensors"][sensor_name]["data_columns"]) self.data_buffer = pd.DataFrame() # data buffer for sensor_type in self.sensor_list: sensor_config = self.config.sensors[sensor_type] sensor_instance = SensorFactory.create_sensor(sensor_type, sensor_config) if sensor_instance: self.sensor_instances[sensor_type] = sensor_instance if os.path.exists(self.SAVE_BUF_CSVDATA_PATH): os.remove(self.SAVE_BUF_CSVDATA_PATH) print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' was deleted for initialization") def get_sensor(self, sensor_type): """ Retrieve the sensor instance corresponding to the specified sensor type. Args: sensor_type (str): The type of the sensor to retrieve. Returns: object: The sensor instance corresponding to the specified sensor type. Returns None if the sensor type does not exist. """ return self.sensor_instances.get(sensor_type) def collect_data(self): """ Collect data from all sensors. This method iterates over all sensor instances and collects data from each sensor. The collected data is stored in a dictionary where the keys are sensor types and the values are the data collected from the corresponding sensors. Returns: dict: A dictionary containing the collected data from all sensors. The keys are sensor types and the values are the data from each sensor. Raises: Exception: If an error occurs while collecting data from any sensor, the exception is caught and printed. """ data = {} try: for sensor_type, sensor in self.sensor_instances.items(): # get data from sensors data[sensor_type] = sensor.get_data_from_sensor() return data except Exception as e: print(e) def on_change_start_measurement(self): """ Start the measurement process. This method sets the is_running flag to True, indicating that the measurement process should start. """ self.is_running = True def on_change_stop_measurement(self): """ Stop the measurement process. This method sets the is_running flag to False, indicating that the measurement process should stop. """ self.is_running = False def filtering(self, df, labellist): """ Apply a low-pass filter to the specified columns in the DataFrame. This method applies a Butterworth low-pass filter to each column specified in the labellist. The "Time" column should be excluded from the labellist as it is not needed for the computation. Args: df (pd.DataFrame): The input DataFrame containing the data to be filtered. labellist (list of str): A list of column names to be filtered. The "Time" column should not be included in this list. Returns: pd.DataFrame: A new DataFrame with the filtered data. """ filtered_df = df.copy() for labelname in labellist: # Ensure the column is converted to a numpy array x = df[labelname].to_numpy() filtered_df[labelname] = butterlowpass( x=x, # Correctly pass the numpy array as 'x' fpass=self.FPASS, fstop=self.FSTOP, gpass=self.GPASS, gstop=self.GSTOP, fs=self.SAMPLING_FREQUENCY_HZ, dt=self.SAMPLING_TIME, checkflag=False, labelname=labelname ) return filtered_df def convert_dictdata(self, current_time, sensor_data_dict): """ Convert nested dictionary data from multiple sensors into a single DataFrame. This method converts nested dictionary data obtained from multiple sensors into a single dictionary and then converts it into a pandas DataFrame. The current_time information is associated with the data. Args: current_time (float): The current time at which the data was obtained. sensor_data_dict (dict): A nested dictionary containing data from multiple sensors. Returns: pd.DataFrame: A DataFrame containing the converted data with the current time information. """ converted_data = {'Time': current_time} for sensor, data in sensor_data_dict.items(): converted_data.update(data) converted_data = pd.DataFrame([converted_data]) return converted_data async def update_data_buffer(self, dict_data): """ Add data from sensors to the buffer and save it if necessary. This method adds the provided sensor data to the internal buffer. If the buffer exceeds the specified maximum length, the oldest data is saved to a CSV file and removed from the buffer. Args: dict_data (dict): The data from sensors to be added to the buffer. """ # Add data to the buffer self.data_buffer = pd.concat([self.data_buffer, dict_data], ignore_index=True) # If the buffer exceeds the specified length, save the oldest data if len(self.data_buffer) > self.MAX_DATA_BUF_LEN: # Save the oldest data to a CSV file old_data = self.data_buffer.head(self.MAX_DATA_BUF_LEN) await self.save_data(old_data, self.SAVE_BUF_CSVDATA_PATH) # Update the buffer self.data_buffer = self.data_buffer.tail(len(self.data_buffer) - self.MAX_DATA_BUF_LEN) async def save_data_async(self, df, path): """ Save the DataFrame to a CSV file asynchronously. This method uses asyncio.to_thread to run the synchronous to_csv method in a separate thread, allowing it to be handled asynchronously. Args: df (pd.DataFrame): The DataFrame to be saved. path (str): The file path where the DataFrame should be saved. """ if not os.path.isfile(path): await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=True, mode='w') else: await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=False, mode='a') async def save_data(self, df, path): """ Save the DataFrame to a CSV file asynchronously. This method calls save_data_async to save the DataFrame to a CSV file asynchronously. Args: df (pd.DataFrame): The DataFrame to be saved. path (str): The file path where the DataFrame should be saved. """ await self.save_data_async(df, path) async def finish_measurement_and_save_data(self): """ Finish the measurement process and save the data. This method finalizes the measurement process by saving the buffered data to a CSV file. It also applies filtering if specified and saves the filtered data to a separate CSV file. The method handles time zone settings and generates a timestamp for the file names. The buffered data is saved to a temporary CSV file, which is then read back and saved to a final file path with a timestamp. If filtering is enabled, the filtered data is also saved. The temporary CSV file is deleted after the data is saved. Raises: Exception: If an error occurs during the file operations. """ t_delta = datetime.timedelta(hours=9) TIMEZONE = datetime.timezone(t_delta, self.TIMEZONE)# You have to set your timezone now = datetime.datetime.now(TIMEZONE) timestamp = now.strftime('%Y%m%d%H%M%S') final_file_path = self.SAVE_BUF_CSVDATA_PATH.replace(self.SAVE_BUF_CSVDATA_PATH.split('/')[-1], timestamp + "/" + timestamp + '_' + self.SAVE_BUF_CSVDATA_PATH.split('/')[-1]) await self.save_data_async(self.data_buffer, self.SAVE_BUF_CSVDATA_PATH) raw_df = pd.read_csv(self.SAVE_BUF_CSVDATA_PATH, header=0) os.makedirs(self.SAVE_DATA_DIR + "/" + timestamp, exist_ok=True) raw_df.to_csv(final_file_path, sep=',', encoding='utf-8', index=False, header=True) if self.is_filter: filt_df = self.filtering(df=raw_df, labellist=raw_df.columns[1:]) filt_df.to_csv(final_file_path.replace('_raw_data.csv', '_filt_data.csv'), sep=',', encoding='utf-8', index=False, header=True) if os.path.exists(self.SAVE_BUF_CSVDATA_PATH): os.remove(self.SAVE_BUF_CSVDATA_PATH) print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' was deleted") else: print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' is not existed") async def sensor_fusion_main(): """ Main function for sensor fusion. This function initializes the sensor fusion process, starts the measurement loop, collects data from multiple sensors, updates the data buffer, and handles real-time data display. It also calculates and prints the sampling delay and reliability rate upon termination. The main loop runs until the measurement process is stopped, either by an exception or a keyboard interrupt. Raises: Exception: If an error occurs during the measurement process, it is caught and printed. KeyboardInterrupt: If a keyboard interrupt occurs, the measurement process is stopped and the data is saved. """ print("Start sensor fusion main") config = config_manager.load_config(config_path) sensors = Sensors(config["master"]) print("Called an instance of Sensors class") # sensors.start_all_measurements() sampling_counter = 0 current_time = 0 #sensors.is_running = True sensors.on_change_start_measurement() try: main_loop_start_time = None while sensors.is_running: iteration_start_time = perf_counter() # Start time of each iteration if main_loop_start_time is None: main_loop_start_time = iteration_start_time # initialize main loop start time current_time = perf_counter() - main_loop_start_time # Current time data = sensors.collect_data() # Get data from sensors sampling_counter += 1 # Num of sampling converted_data = sensors.convert_dictdata(current_time, data) # Convert data to dataframe format # Update the data buffer. If it reaches the buffer limit, write the data to a CSV file. await sensors.update_data_buffer(converted_data) # Display data in real time. This process is executed on additional thread. if sensors.is_show_real_time_data: formatted_data = format_sensor_fusion_data(data, sensors.all_data_columns_list) print("--------------------------------------------------------------------") print("Current Time is: {:.3f}".format(current_time)) print(formatted_data) # Wait based on the sampling interval and execution time to maintain the sampling frequency. iteration_end_time = perf_counter() iteration_duration = iteration_end_time - iteration_start_time print("Iteration duration is: {0} [s]".format(iteration_duration)) sleep_time = max(0, sensors.SAMPLING_TIME - iteration_duration) if sleep_time > 0: wait_process(sleep_time) except Exception as e: print(e) except KeyboardInterrupt: sensors.on_change_stop_measurement() print("KeyboardInterrupt") await sensors.finish_measurement_and_save_data() finally: print("finish") # Compute delay of sampling main_loop_end_time = perf_counter() - main_loop_start_time print("Program terminated") print("main loop is ended. current time is: {:.3f}".format(current_time)) print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time)) print("sampling num is: {}".format(sampling_counter)) # Compute ideal sampliing time ideal_time = ((sampling_counter - 1) / sensors.SAMPLING_FREQUENCY_HZ) # Cpmpute a delay delay_time = current_time - ideal_time # reliability rate sampling_reliability_rate = (delay_time / (sampling_counter / sensors.SAMPLING_FREQUENCY_HZ)) * 100 print("sampling delay is: {:.3f} s".format(delay_time)) print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate)) if __name__ == '__main__': asyncio.run(sensor_fusion_main())
sensor_fusion.py manages sensor data collection and processing using asynchronous operations. It dynamically creates sensor instances, collects data, applies filters, and saves the data to CSV files. The script also displays real-time data if needed and monitors performance metrics like sampling delay and reliability. The main function runs a loop that handles data collection, buffer updates, and performance reporting. It uses asynchronous operations to efficiently manage data saving and ensure smooth performance.
# bno055_measurement.py import time import numpy as np import adafruit_bno055 import board import os import sys parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) sys.path.append(parent_dir) from config.config_manager import load_config config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml') class BNO055: def __init__(self, config): self.COLUMNS = config.data_columns self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ self.SAVE_DATA_DIR = config.save_data_dir self.SEQUENCE_LENGTH = config.sequence_length self.FPASS = config.filter_params.fpass self.FSTOP = config.filter_params.fstop self.GPASS = config.filter_params.gpass self.GSTOP = config.filter_params.gstop self.Isfilter = config.filter_params.is_filter self.IsStart = False self.IsStop = True self.Is_show_real_time_data = config.is_show_real_time_data i2c_instance = board.I2C() # Create i2c instance self.bno055_sensor = adafruit_bno055.BNO055_I2C(i2c_instance) # create BNO055_I2C instance def calibration(self): print("Start calibration!") while not self.bno055_sensor.calibrated: print('SYS: {0}, Gyro: {1}, Accel: {2}, Mag: {3}'.format(*(self.bno055_sensor.calibration_status))) time.sleep(1) def calcEulerfromQuaternion(self, _w, _x, _y, _z): """ Calculate Euler angles (roll, pitch, yaw) from quaternion components. This method converts quaternion components (_w, _x, _y, _z) into Euler angles (roll, pitch, yaw) in degrees. If any of the quaternion components are None, it returns (0.0, 0.0, 0.0) and prints an error message. Args: _w (float): The w component of the quaternion. _x (float): The x component of the quaternion. _y (float): The y component of the quaternion. _z (float): The z component of the quaternion. Returns: tuple: A tuple containing the roll, pitch, and yaw angles in degrees. If an error occurs, it returns (0.0, 0.0, 0.0) and prints an error message. """ if None in (_w, _x, _y, _z): print(f"Error: One or more quaternion values are None: {_w}, {_x}, {_y}, {_z}") return 0.0, 0.0, 0.0 try: sqw = _w ** 2 sqx = _x ** 2 sqy = _y ** 2 sqz = _z ** 2 COEF_EULER2DEG = 57.2957795131 # Yaw term1 = 2.0 * (_x * _y + _z * _w) term2 = sqx - sqy - sqz + sqw yaw = np.arctan2(term1, term2) # Pitch term1 = -2.0 * (_x * _z - _y * _w) term2 = sqx + sqy + sqz + sqw pitch = np.arcsin(term1 / term2) if -1 <= term1 / term2 <= 1 else 0.0 # Roll term1 = 2.0 * (_y * _z + _x * _w) term2 = -sqx - sqy + sqz + sqw roll = np.arctan2(term1, term2) return COEF_EULER2DEG * roll, COEF_EULER2DEG * pitch, COEF_EULER2DEG * yaw except Exception as e: print(f"Error in calcEulerfromQuaternion: {e}") return 0.0, 0.0, 0.0 def get_data_from_sensor(self): """ Retrieve data from the BNO055 sensor and return it as a dictionary. This method collects various sensor readings from the BNO055 sensor, including Euler angles, gyroscope data, linear acceleration, quaternion, magnetic field, and calibration status. It then constructs a dictionary with these values and returns only the columns specified in self.COLUMNS. Returns: dict: A dictionary containing the sensor data. Only the columns specified in self.COLUMNS are included in the returned dictionary. """ # Get data euler_z, euler_y, euler_x = [val for val in self.bno055_sensor.euler] # X: yaw, Y: pitch, Z: roll gyro_x, gyro_y, gyro_z = [val for val in self.bno055_sensor.gyro] # Gyro[rad/s] linear_accel_x, linear_accel_y, linear_accel_z = [val for val in self.bno055_sensor.linear_acceleration] # Linear acceleration[m/s^2] quaternion_1, quaternion_2, quaternion_3, quaternion_4 = [val for val in self.bno055_sensor.quaternion] # Quaternion quat_roll, quat_pitch, quat_yaw = self.calcEulerfromQuaternion(quaternion_1, quaternion_2, quaternion_3, quaternion_4) # Cal Euler angle from quaternion magnetic_x, magnetic_y, magnetic_z = [val for val in self.bno055_sensor.magnetic] # Magnetic field calibstat_sys, calibstat_gyro, calibstat_accel, calibstat_mag = [val for val in self.bno055_sensor.calibration_status] # Status of calibration data_dict = { "linear_accel_x": linear_accel_x, "linear_accel_y": linear_accel_y, "linear_accel_z": linear_accel_z, "gyro_x": gyro_x, "gyro_y": gyro_y, "gyro_z": gyro_z, "euler_x": euler_x, "euler_y": euler_y, "euler_z": euler_z, "quat_roll": quat_roll, "quat_pitch": quat_pitch, "quat_yaw": quat_yaw, "quaternion_1": quaternion_1, "quaternion_2": quaternion_2, "quaternion_3": quaternion_3, "quaternion_4": quaternion_4, "magnetic_x": magnetic_x, "magnetic_y": magnetic_y, "magnetic_z": magnetic_z, "calibstat_sys": calibstat_sys, "calibstat_gyro": calibstat_gyro, "calibstat_accel": calibstat_accel, "calibstat_mag": calibstat_mag } return {column: data_dict[column] for column in self.COLUMNS if column in data_dict} def format_sensor_data(data, labels): """ Format sensor data into a string for display. This method takes a dictionary of sensor data and a list of labels, and formats the data into a string where each label is followed by its corresponding value. If a value is None, it is replaced with the string "None". Each label-value pair is separated by " / ". Args: data (dict): The sensor data to format. labels (list of str): The list of labels to include in the formatted string. Returns: str: A formatted string containing the sensor data. """ formatted_str = "" if isinstance(data, dict): for label in labels: value = data.get(label, None) if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " return formatted_str.rstrip(" / ") def test_main(): """ Main function for testing sensor data collection and display. This function initializes the BNO055 sensor, starts a loop to collect data, formats the data for display, and prints it in real-time. It also calculates and prints the sampling delay and reliability rate upon termination. The main loop runs until interrupted by the user. Raises: KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated and the final statistics are printed. """ from utils.tools import wait_process from time import perf_counter import matplotlib.pyplot as plt print("Main start") config = load_config(config_path) meas_bno055 = BNO055(config.sensors['bno055']) start_time = perf_counter() sampling_counter = 0 try: main_loop_start_time = perf_counter() while True: iteration_start_time = perf_counter() # Data acquisition process data = meas_bno055.get_data_from_sensor() current_time = perf_counter() - start_time sampling_counter += 1 if meas_bno055.Is_show_real_time_data: formatted_data = format_sensor_data(data, meas_bno055.COLUMNS) # current time print("--------------------------------------------------------------------") print("Current Time is: {:.3f}".format(current_time)) print(formatted_data) # Wait to meet the sampling frequency based on the sampling interval and execution time elapsed_time = perf_counter() - iteration_start_time sleep_time = meas_bno055.SAMPLING_TIME - elapsed_time if sleep_time > 0: wait_process(sleep_time) except KeyboardInterrupt: print("Interrupted by user") finally: # Calculate the sampling delay from the number of samples and the current time main_loop_end_time = perf_counter() - main_loop_start_time print("Program terminated") print("main loop is ended. current time is: {:.3f}".format(current_time)) print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time)) print("sampling num is: {}".format(sampling_counter)) # Since it is 0-based, the number of samples is current_time + 1 # Calculate the ideal sampling time ideal_time = ((sampling_counter - 1) / meas_bno055.SAMPLING_FREQUENCY_HZ) # Calculate the delay delay_time = current_time - ideal_time # The reliability rate is the delay divided by the sampling time sampling_reliability_rate = (delay_time / (sampling_counter / meas_bno055.SAMPLING_FREQUENCY_HZ)) * 100 print("sampling delay is: {:.3f} s".format(delay_time)) print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate)) if __name__ == '__main__': test_main()
This script interfaces with the BNO055 sensor to collect and process data. It retrieves sensor readings, such as Euler angles and gyroscope data, formats them for display, and prints real-time updates if configured. The script includes a calibration method and calculates Euler angles from quaternion data. It manages data acquisition with a defined sampling frequency and calculates sampling delay and reliability. The main loop continues until interrupted, reporting performance metrics upon termination.
# elm327_measurement.py import obd import os import time from collections import deque import numpy as np import pandas as pd import datetime import asyncio import scipy from scipy import signal import matplotlib as plt import sys from collections import defaultdict import random parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) sys.path.append(parent_dir) from config.config_manager import load_config config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml') class ELM327: def __init__(self, config): """ Initialize the ELM327 class with configuration parameters. Args: config (dict): Configuration parameters for the ELM327. """ self.COLUMNS = config.data_columns self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ self.SAVE_DATA_DIR = config.save_data_dir self.SEQUENCE_LENGTH = config.sequence_length self.FPASS = config.filter_params.fpass self.FSTOP = config.filter_params.fstop self.GPASS = config.filter_params.gpass self.GSTOP = config.filter_params.gstop self.Isfilter = config.filter_params.is_filter self.res = self.connect_to_elm327() self.is_offline = config.is_offline self.IsStart = False self.IsStop = True self.Is_show_real_time_data = config.is_show_real_time_data def initialize_BLE(self): """ Initialize Bluetooth Low Energy (BLE) for ELM327 connection. """ os.system('sudo hcitool scan') os.system('sudo hciconfig hci0 up') os.system('sudo rfcomm bind 0 8A:2A:D4:FF:38:F3') os.system('sudo rfcomm listen 0 1 &') def connect_to_elm327(self): """ Establish a connection to the ELM327 device. Returns: res (obd.OBDStatus): The connection status of the ELM327 device. """ res = None try: self.initialize_BLE() self.connection = obd.OBD() print(self.connection.status()) res = self.connection.status() if res == obd.OBDStatus.CAR_CONNECTED: print("----------Connection establishment is successful!----------") return res else: print("----------Connection establishment failed!----------") print("End program. Please check settings of the computer and ELM327") except Exception as e: print("----------Exception!----------") print(e) finally: return res def get_data_from_sensor(self): """ Retrieve data from the sensor. Returns: dict: A dictionary containing sensor data. """ if self.is_offline: data = self.get_data_from_sensor_stub() else: # Retrieve data and save it in dictionary format data = {column: self.get_obd2_value(column) for column in self.COLUMNS} return data def get_obd2_value_debug(self, column): """ Retrieve OBD-II value for a specific column with debug information. Args: column (str): The OBD-II command column. Returns: float or None: The value of the OBD-II command, or None if not available. """ command = getattr(obd.commands, column, None) if command: response = self.connection.query(command) if response: print(f"Response for command '{command}': {response}") if response.value is not None: # Check for None print(f"Response value for command '{command}': {response.value}") return response.value.magnitude else: print(f"No value in response for command '{command}'") else: print(f"No response for command '{command}'") else: print(f"No command found for column '{column}'") return None def get_obd2_value(self, column): """ Retrieve OBD-II value for a specific column. Args: column (str): The OBD-II command column. Returns: float or None: The value of the OBD-II command, or None if not available. """ command = getattr(obd.commands, column, None) if command: response = self.connection.query(command) if response.value is not None: # Check for None return response.value.magnitude return None def get_data_from_sensor_stub(self): """ Generate stub data for the sensor. Returns: dict: A dictionary containing stub sensor data. """ data_stub = {column: np.abs(np.random.randn()).astype(np.float32).item() for column in self.COLUMNS} # Randomly insert None or 0.0 if random.choice([True, False]): random_column = random.choice(self.COLUMNS) if random.choice([True, False]): data_stub[random_column] = None else: data_stub[random_column] = 0.0 return data_stub def format_data_for_display(data, labels): """ Format sensor data for display. Args: data (dict): The sensor data to format. labels (list of str): The list of labels to include in the formatted string. Returns: str: A formatted string containing the sensor data. """ formatted_str = "" for label, value in zip(labels, data.values()): if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " return formatted_str.rstrip(" / ") def format_sensor_data(data, labels): """ Format sensor data for display. Args: data (dict or list): The sensor data to format. labels (list of str): The list of labels to include in the formatted string. Returns: str: A formatted string containing the sensor data. """ formatted_str = "" if isinstance(data, dict): for label in labels: value = data.get(label, None) if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " else: for label, value in zip(labels, data): if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " return formatted_str.rstrip(" / ") def test_main(): """ Main function for testing sensor data collection and display. This function initializes the ELM327 sensor, starts a loop to collect data, formats the data for display, and prints it in real-time. It also calculates and prints the sampling delay and reliability rate upon termination. The main loop runs until interrupted by the user. Raises: KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated and the final statistics are printed. """ from utils.tools import wait_process from time import perf_counter import matplotlib.pyplot as plt print("Main start") config = load_config(config_path) meas_elm327 = ELM327(config.sensors['elm327']) # res = meas_elm327.connect_to_elm327() start_time = perf_counter() sampling_counter = 0 try: main_loop_start_time = perf_counter() while True: iteration_start_time = perf_counter() # Data acquisition process data = meas_elm327.get_data_from_sensor() current_time = perf_counter() - start_time sampling_counter += 1 if meas_elm327.Is_show_real_time_data: formatted_data = format_sensor_data(data, meas_elm327.COLUMNS) print("--------------------------------------------------------------------") print("Current Time is: {:.3f}".format(current_time)) print(formatted_data) # Wait to meet the sampling frequency based on the sampling interval and execution time elapsed_time = perf_counter() - iteration_start_time sleep_time = meas_elm327.SAMPLING_TIME - elapsed_time if sleep_time > 0: wait_process(sleep_time) except KeyboardInterrupt: print("Interrupted by user") finally: main_loop_end_time = perf_counter() - main_loop_start_time print("Program terminated") print("main loop is ended. current time is: {:.3f}".format(current_time)) print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time)) print("sampling num is: {}".format(sampling_counter)) # Calculate the ideal sampling time ideal_time = ((sampling_counter - 1) / meas_elm327.SAMPLING_FREQUENCY_HZ) # Calculate the delay delay_time = current_time - ideal_time # The reliability rate is the delay divided by the sampling time sampling_reliability_rate = (delay_time / (sampling_counter / meas_elm327.SAMPLING_FREQUENCY_HZ)) * 100 print("sampling delay is: {:.3f} s".format(delay_time)) print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate)) if __name__ == '__main__': test_main()
This script defines an ELM327 class for interfacing with an ELM327 device to collect sensor data via OBD-II. It initializes the connection, retrieves data, and formats it for display. The test_main function sets up the ELM327 sensor, collects data in a loop, and prints it in real-time. It also calculates and displays sampling delay and reliability rates upon interruption. The script handles both real-time data acquisition and simulated offline data for testing purposes.
In this post, I described the project focused on developing a measurement system to capture vehicle behavior. While the project is still ongoing, I have recognized the potential for capturing vehicle data with a focus on the vehicle dynamics domain. However, when it comes to measuring signals from vehicle ECUs, some limitations need to be considered, as the ELM327 may not reliably achieve sampling rates beyond 5 Hz with good accuracy. This issue could likely be resolved by using a more advanced OBD scanner, such as the OBDLink MX+.
[1] Adafruit. "Adafruit BNO055 Absolute Orientation Sensor."
https://learn.adafruit.com/adafruit-bno055-absolute-orientation-sensor/overview. Accessed September 3, 2024.
[2] Adafruit. "Adafruit_CircuitPython_BNO055." https://github.com/adafruit/Adafruit_CircuitPython_BNO055. Accessed September 3, 2024.
[3] Adafruit. "Adafruit_BNO055." https://github.com/adafruit/Adafruit_BNO055. Accessed September 3, 2024.
[4] Adafruit. "Adafruit BNO055 Demo." https://cdn-shop.adafruit.com/product-videos/1024x768/2472-04.mp4 Accessed September 3, 2024.
[5] Amazon. "Elm327 Launchh OBD2 Professional Bluetooth Scan Tool and Code Reader for Android and PC, Interface OBDII OBD2 Car Auto Diagnostic Scanner, Not Support J1850 VPW & J1850 PWM". https://a.co/d/5BFn4GN. Accessed September 3, 2024.
[6] MathWorks. "Coordinate Systems in Automated Driving Toolbox." https://www.mathworks.com/help/driving/ug/coordinate-systems.html. Accessed September 3, 2024.
[7] Apple. "Getting raw gyroscope events." https://developer.apple.com/documentation/coremotion/getting_raw_gyroscope_events. Accessed September 3, 2024.
[8] phyphox. "phyphox top page."https://phyphox.org/. Accessed September 3, 2024.
[9] Andrea Patrucco, Vehicle dynamics engineer presso Applus+ IDIADA. "Challenging your car's trip computer fuel efficiency figures." https://www.linkedin.com/pulse/challenging-your-cars-trip-computer-fuel-efficiency-figures-patrucco/. Accessed September 3, 2024.
[10] "Choosing OBDII adapter". https://www.carscanner.info/choosing-obdii-adapter/. Accessed September 3, 2024.
[11] "VDDM". https://github.com/Qooniee/VDDM/tree/master/. Accessed September 3, 2024.
The above is the detailed content of Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors. For more information, please follow other related articles on the PHP Chinese website!