Home > Backend Development > Python Tutorial > Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors

WBOY
Release: 2024-09-10 16:30:06
Original
862 people have browsed it

Summary

As a software engineer working in the automotive industry, I have a keen interest in autonomous vehicles, data measurement techniques, and analysis methods. In this post, I will describe a custom-built measurement system, detailing the process from scratch, and present some experimental results. My data logger consists of a Raspberry Pi 3, a BNO055 sensor, and an ELM327 OBD-II adapter, which are responsible for computing, collecting acceleration and rotational velocity data, and retrieving engine information from an actual vehicle, respectively.

Table of Contents

1.Background
2.Architecture
3.Sensor information
4.Coordinate system information
5.Data Comparison
6.Core Logics
7.Conclusion
8.Reference

1. Background

In recent years, the rise of IoT, artificial intelligence, and edge computing has brought significant advancements to various industries, including the automotive sector. Modern vehicles are equipped with a multitude of sensors that work together to create sophisticated systems, enabling everything from enhanced safety features to autonomous driving capabilities.

Typically, monitoring a vehicle’s state requires the use of advanced and expensive sensors to achieve precise and reliable measurements. However, for certain situations, such as obtaining data on circuits for a general understanding of vehicle dynamics, these high-end sensors may not be strictly necessary. Recognizing this, I decided to create a cost-effective system capable of measuring a vehicle's physical state using more affordable components.

I decided to focus on measuring data related to vehicle dynamics rather than ride comfort. This is because analyzing ride comfort requires capturing more high-frequency signals with suitable accuracy, which necessitates setting a high sampling frequency during measurement.

Specifically, in the frequency response characteristics of lateral acceleration with respect to steering angle, the resonant frequency generally appears below 5 Hz, although it depends on the vehicle speed. On the other hand, in ride comfort analysis, discussions often extend to the range of several tens of Hz.

Therefore, the primary goal of developing the measurement system is to create a low-cost data logging system that facilitates data analysis in the vehicle dynamics domain.

2. Architecture

The measurement system consists of the following hardware:

  • Raspberry Pi 3
  • BNO055
  • ELM327 OBD2 Scanner
  • Inverter
  • Monitor
  • keyboard / mouse(if need)

The Raspberry Pi acts as the main computer, collecting data from the BNO055 and ELM327. The Raspberry Pi communicates with the BNO055 via I2C and with the ELM327 via Bluetooth (see Figure 1).

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Figure 1: Hardware architecture

3. Sensor information

3-1. Adafruits BNO055

The BNO055 sensor is capable of measuring various types of motion and data:★Adafruit BNO055 Overview[1]

  • Absolute Orientation (Euler Vector, 100Hz): Provides three-axis orientation data based on a 360° sphere.

  • Absolute Orientation (Quaternion, 100Hz): Offers four-point quaternion output for more accurate data manipulation.

  • Angular Velocity Vector (100Hz): Measures rotation speed in radians per second (rad/s) across three axes.

  • Acceleration Vector (100Hz): Captures acceleration including gravity and linear motion in meters per second squared (m/s²) across three axes.

  • Magnetic Field Strength Vector (20Hz): Senses the magnetic field strength in microteslas (µT) across three axes.

  • Linear Acceleration Vector (100Hz): Records linear acceleration data (excluding gravity) in meters per second squared (m/s²) across three axes.

  • Gravity Vector (100Hz): Measures gravitational acceleration (excluding any movement) in meters per second squared (m/s²) across three axes.

  • Temperature (1Hz): Provides ambient temperature in degrees Celsius.

The BNO055 is compact and supported by Adafruit's Python library called Adafruit_CircuitPython. Additionally, a C-language library is available from Adafruit.

★Adafruit_CircuitPython_BNO055[2]

★Adafruit_BNO055[3]

For a demonstration, you can view the video here:
Adafruit BNO055 Demo
★Adafruit BNO055 Demo[4]

3-2. ELM327

ELM327 は、標準インターフェイスを介して車両エンジン データにアクセスできるようにする、広く使用されている OBD-II (オンボード診断) アダプターです。車両の ECU (電子制御ユニット) とコンピューターやスマートフォンなどの外部デバイスの間のブリッジとして機能し、診断データやパフォーマンス データの取得を可能にします。 ELM327 の主な特徴と機能は次のとおりです。★ELM327 情報[5]

  • OBD-II プロトコルのサポート: ELM327 は、ISO 9141、ISO 14230 (KWP2000)、ISO 15765 (CAN) などを含むさまざまな OBD-II プロトコルをサポートし、幅広い車両と互換性があります。

  • 診断トラブル コード (DTC): 車両の ECU から診断トラブル コードを読み取って消去し、問題の特定とトラブルシューティングを支援します。

  • ライブ データ ストリーミング: エンジン RPM、車両速度、冷却水温度、燃料レベルなどの車両センサーからのリアルタイム データを提供します。

  • フレーム データのフリーズ: 障害が検出された瞬間にデータをキャプチャして保存します。これは、断続的な問題の診断に役立ちます。

  • 車両情報: VIN (車両識別番号)、キャリブレーション ID などを含む車両に関する詳細を取得します。

  • 互換性: Bluetooth、USB、Wi-Fi バージョンなどのさまざまな形式で利用可能で、さまざまなデバイスやプラットフォームとの互換性が可能です。

通常、ELM327 は車両の状態、特にエンジンの状態を診断するために使用されます。ただし、車両から広範囲のデータを収集するために使用することもできます。 ELM327 は Amazon などのプラットフォームで購入でき、価格は約 20 ドルから 80 ドルの範囲です。

Python-OBD ライブラリを使用すると、ELM327 アダプター経由でデータを収集するクエリを作成でき、カスタマイズ可能な詳細なデータ収集が可能になります

4. 座標系情報

このシステムは複数のセンサーおよび実際の車両と対話するため、各座標系を理解する必要があります。

4-1. BNO055 データロガー用座標系

下の画像は、私のデータ測定システムの BNO055 座標系を示しています。私のシステムはさらに、四元数を使用して回転角度を計算します。センサーの座標系は車両の座標系と一致する必要があります。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 2: BNO055 座標系

4-2.車両座標系(ISO 8855)

測定システムは車両座標系に基づいて動作します。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 3: 車両座標系 [6] 出典: MathWorks、自動運転ツールボックスの座標系

4-3. iPhone 座標系

検証タスクには、加速度、ジャイロなどを含む複数の物理データ ポイントを測定できる iPhone アプリケーションを使用しました。 iPhone アプリケーションを使用する目的は、このアプリケーションのデータを使用してデータ測定システムの妥当性を確認することです。図 4 は Apple の開発者サイトから引用されています。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 4: iPhone 座標系 [7] 出典: Apple、生のジャイロスコープ イベントの取得

5. データの比較

このセクションでは、いくつかの実験結果を紹介します。最初のステップとして、測定システムが単位間違いなどのエラーなく読み取り値を出力するかどうかを確認するために、正弦波タスクを実行しました。この部分は基本的なものですが、データを収集する正しい方法を確保するために非常に重要です。
次に実車を走行させながら物理値を計測してみました。
次に、ELM327の性能と各センサーのサンプリング精度を確認してみました。

5-1.正弦波テスト

正弦波テストを実施したので、このセクションではその結果を紹介します。これらのテストの目的は、次のことを確認することです:

  • 測定システムの座標系が正しく機能しているかどうか
  • iPhone の測定アプリの精度を検証するには

これを実現するために、さまざまな物理値を測定できるスマホアプリ「phyphox」★phyphox[8]を使用しました。以下に示すように、BNO055 センサーを iPhone に取り付けました [図 5]。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 5: 正弦波テストを実施するためのセンサーと iPhone の取り付け方法のイメージ

図 6 は、サンプリング周波数 10 Hz での正弦波テストの結果を示しています。凡例の「VDDM」は私が開発した測定システムによって収集されたデータを表し、「iPhone」は「phyphox」を使用して収集されたデータを表します。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 6: 正弦波テストの結果
  • VDDM と iPhone の両方からのデータは、すべての軸にわたって同様の傾向を示しており、両方のアプリケーションが同様のモーション パターンをキャプチャしていることを示しています。

  • VDDM データは、特に加速度グラフで若干変動が大きいように見えます。これは、iPhone と比較して感度が高いか、または異なるフィルタリング アプローチを示唆している可能性があります。

  • 両方のデータは一般的によく一致していますが、振幅の違いは、精度要件に応じてさらなる校正または調整が必要になる可能性があることを示唆しています。

  • VDDM は、特に加速度グラフで、iPhone データには存在しない追加のノイズまたは高周波成分をキャプチャしている可能性があります。これは、詳細な分析に役立つ場合もあれば、さらなるフィルタリングが必要になる場合もあります。

5-2.車両運動データの評価

図7はBNO055の実車データに着目した実験結果を示しています。参考にしたのは「phyphox」というiPhoneアプリです。サンプリング周波数は50Hzであった。私が開発した車両のデータ ロギング システムによって測定された加速度と回転信号には、ノイズ、外れ値、NaN 値が含まれていることが観察されました。そこで、IQR 法やバターワース ローパス フィルターなどの後処理関数を適用しました。各グラフの凡例中の「fc」という用語はカットオフ周波数を表します。たとえば、fc_19.5 はカットオフ周波数 19.5Hz を示します。この実験は BNO055 センサーのみを使用して実行され、ELM327 機能は無効になっていることに注意してください。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 7: 実車テスト結果
  • VDDM (生データ) と iPhone からのデータは、すべての軸にわたって同様の傾向を示しています。これは、両方のシステムが同じモーション パターンをキャプチャしていることを示唆しています。

  • VDDM からのデータは、iPhone のデータと比較してばらつきが大きいようです。これは加速度グラフで特に顕著であり、VDDM がより高い感度でデータをキャプチャしているか、異なるフィルタリング アプローチを使用していることを示している可能性があります。

  • VDDM からのフィルター処理されたデータ (fc 19.5Hz および 10Hz) は、生データと比較して変動が減少しており、iPhone データに近づいています。これは、VDDM (生データ) にはより多くのノイズまたは高周波成分が含まれている可能性があることを示唆しています。

  • VDDM からの生データには外れ値と欠損値が含まれていましたが、フィルターを適用することで軽減されました。これは、ロール レートとピッチ レートのグラフで特に顕著です。

  • VDDM には iPhone に比べて高周波成分とノイズが多く含まれているようです。これは詳細な分析には有利ですが、ノイズ低減が重要な場合は追加のフィルタリングまたはハードウェア レベルの調整が必要になる場合があります。

5-3.車速評価

図 8 はエンジン性能に関するデータを示しています。これらのデータは、サンプリング周波数 4 Hz の ELM327 スキャナを使用して収集されました。データ収集中に BNO055 機能が無効になりました。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 8: 車両速度とエンジン データ
  • VDDM からの車両速度データ (生データ) と iPhone は同様の傾向を示します。
  • ELM327 はエンジン回転数とスロットル位置に関する情報を収集できます。これらのデータは、一般的なスロットル ペダルの操作を理解するのに役立ちます。なお、試験車両はHEVです。

5-4.サンプリング周波数の精度の評価

BNO055 と ELM327 を使った実験中に、サンプリングの遅延が観察されました。問題を特定するために、サンプリング周波数を調整し、ELM327 と BNO055 を有効または無効にして測定を実行しました。
まず、サンプリング周波数に対応する時間差を提示します。このテストには、BNO055 センサーと ELM327 センサーの両方の使用が含まれます

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 9: サンプリング周波数に対応する時間差

図 9 は、サンプリング周波数が 10Hz に設定されている場合、サンプリング時間の精度が大幅に低下することを示しています。
ELM327 は BNO055 ほど高いサンプリング周波数を達成できないため、システム全体のパフォーマンスに悪影響を及ぼす可能性があります。それにも関わらず、各センサーの性能をよりよく理解するために、個別の測定を実施しました。

図10と図11は、サンプリング周波数とサンプリングタイミング精度の関係を示しています。サンプリング タイミングの精度は次のように表されます。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 10: BNO055_サンプリング テストの結果

図 10 によると、BNO055 は最大 50 Hz のサンプリング周波数に応答できます。ただし、60 Hz を超えると、サンプリング タイミングの精度が大幅に低下します。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
図 11: BNO055 サンプリング テストの結果

図 11 は、ELM327 が BNO055 と比較して応答性が低いことを示しています。 ELM327 は、最大 5 Hz までのみ良好なサンプリング タイミング精度を維持します。これは、ELM327 が OBD 経由で車両と通信し、車両の ECU が OBD リクエストに対する高周波応答をサポートしていない可能性があり、その結果低周波通信が発生するためです。

さらに、ELM327 は、OBD-II 経由で車両内の複数の ECU からデータを取得できるセンサーの中で最も安価で品質の低いオプションです。
この投稿★車のトリップコンピューターの燃費数値に挑戦する[9] には、

と書かれています。

Elm327 に関して考慮すべき重要なことの 1 つは、応答時間が無視できるほどではないということです。デバイスはメッセージ要求に応答するまでに 0.1 ~ 0.3 秒かかる場合があり、これは、信号を追加しすぎると急速に応答時間が短縮されることを意味します。サンプリングレートを役に立たないレベルまで引き上げる

さらに詳しいアドバイスについては、こちらのページをご覧ください:★OBDII アダプターの選択[10]

パフォーマンスが低下する原因としては以下のことが考えられます。

  • 車両ECUの仕様
  • OBD スキャナーへのリクエストが多すぎます
  • 複数の ECU からの複数の信号のリクエスト
  • ソフトウェアの問題
  • ネットワークの問題
  • OBD-II プロトコルのオーバーヘッド
  • スキャナーのハードウェア制限
  • 車両通信速度
  • ファームウェアとドライバーの問題
  • 環境要因

6. コアロジック

私が開発している計測ソフトウェアは、複数のセンサーが必要なユースケースを想定しています。さらに、ユーザーは yaml ファイルの測定設定を変更できます。ユーザーはシンプルなGUI上で測定アプリケーションを操作できます。ソースコードはここで見ることができます: ★VDDM[11]

測定システムの構造は以下のとおりです:

VDDM/
│
├── fusion/
│   ├── __init__.py
│   ├── sensor_fusion.py   # Handles data fusion, collects data from sensors
│   └── sensors/
│       ├── __init__.py
│       ├── bno055_measurement.py   # Script for BNO055 sensor data collection
│       └── elm327_measurement.py   # Script for ELM327 OBD2 data collection
│
├── signalprocessing/
│   ├── __init__.py
│   └── filter.py           # Contains filtering algorithms (e.g., Butterworth filter)
│
├── config/
│   ├── config_manager.py    # Loads configuration from YAML
│   └── measurement_system_config.yaml   # Configuration file for sensors and system settings
│
├── utils/
│   ├── tools.py             # Utility functions (e.g., wait functions)
│   └── visualize_data.py    # Functions to format and display sensor data
│
├── measurement/
│   ├── __init__.py
│   └── measurement_control.py   # Controls the measurement process
│
├── gui/
│   ├── __init__.py
│   └── main_gui.py          # GUI setup for starting/stopping measurement
│
├── main.py                  # Entry point for starting the system
└── requirements.txt         # Project dependencies

Copy after login

以下の図は VDDM の流れを示しています。

main.py
  └── main_gui.py
        ├── Starts GUI interface and buttons
        └── measurement_control.py
              ├── Initializes sensor measurements
              ├── Controls start/stop of measurements
              └── sensor_fusion.py
                    ├── Manages sensor data collection and fusion
                    ├── Uses:
                    │    ├── bno055_measurement.py  (BNO055 sensor data collection)
                    │    └── elm327_measurement.py  (ELM327 OBD2 data collection)
                    └── Processes data with:
                         ├── filter.py  (Applies Butterworth low-pass filter)
                         └── visualize_data.py  (Formats and visualizes sensor data)

Copy after login

VDDM の主なコンポーネントは、VDDM/fusion/sensor_fusion.py、VDDM/fusion/sensors/bno055_measurement.py、および VDDM/fusion/sensors/elm327_measurement.py です。

# sensor_fusion.py
import os
import sys
import importlib
from time import perf_counter
from collections import defaultdict
import pandas as pd
import datetime
import asyncio
import numpy as np



parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_dir)

from config import config_manager
from utils.tools import wait_process
from utils.visualize_data import format_sensor_fusion_data
from signalprocessing.filter import butterlowpass

config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml')


class SensorFactory:
    @staticmethod
    def create_sensor(sensor_type, config):
        try:
            # Import the appropriate sensor module based on sensor_type
            module = importlib.import_module(f"fusion.sensors.{sensor_type}_measurement")
            # Get a sensor class
            sensor_class = getattr(module, f"{sensor_type.upper()}")
            # Create a sensor instance and return
            return sensor_class(config)
        except (ImportError, AttributeError) as e:
            print(f"Error creating sensor {sensor_type}: {e}")
            return None


class Sensors:
    def __init__(self, config):
        self.config = config_manager.load_config(config_path)
        self.sensor_list = tuple(self.config.sensors.keys())
        self.sensor_instances = {}
        self.is_running = False


        self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz
        self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ
        self.SAVE_DATA_DIR = config.save_data_dir
        self.SAVE_BUF_CSVDATA_PATH = self.SAVE_DATA_DIR + "/" + "measurement_raw_data.csv"
        self.SEQUENCE_LENGTH = config.sequence_length # Windows size [s]
        # Buffer size is determined by the relation of sequence length and sampling frequency
        # Buffer secures data for SEQUENCE_LENGTH[s]
        self.MAX_DATA_BUF_LEN = self.SEQUENCE_LENGTH * self.SAMPLING_FREQUENCY_HZ
        self.FPASS = config.filter_params.fpass
        self.FSTOP = config.filter_params.fstop
        self.GPASS = config.filter_params.gpass
        self.GSTOP = config.filter_params.gstop
        self.is_filter = config.filter_params.is_filter
        self.is_show_real_time_data = config.is_show_real_time_data
        self.TIMEZONE = config.timezone
        self.all_data_columns_list = ()
        for sensor_name in self.sensor_list:
            self.all_data_columns_list += tuple(self.config["sensors"][sensor_name]["data_columns"])            




        self.data_buffer = pd.DataFrame()  # data buffer

        for sensor_type in self.sensor_list:
            sensor_config = self.config.sensors[sensor_type]
            sensor_instance = SensorFactory.create_sensor(sensor_type, sensor_config)
            if sensor_instance:
                self.sensor_instances[sensor_type] = sensor_instance


        if os.path.exists(self.SAVE_BUF_CSVDATA_PATH):
            os.remove(self.SAVE_BUF_CSVDATA_PATH)
            print(f"File  '{self.SAVE_BUF_CSVDATA_PATH}' was deleted for initialization")                


    def get_sensor(self, sensor_type):
        """
        Retrieve the sensor instance corresponding to the specified sensor type.

        Args:
            sensor_type (str): The type of the sensor to retrieve.

        Returns:
            object: The sensor instance corresponding to the specified sensor type.
                    Returns None if the sensor type does not exist.
        """
        return self.sensor_instances.get(sensor_type)

    def collect_data(self):
        """
        Collect data from all sensors.

        This method iterates over all sensor instances and collects data from each sensor.
        The collected data is stored in a dictionary where the keys are sensor types and
        the values are the data collected from the corresponding sensors.

        Returns:
            dict: A dictionary containing the collected data from all sensors.
                The keys are sensor types and the values are the data from each sensor.

        Raises:
            Exception: If an error occurs while collecting data from any sensor, the exception
                    is caught and printed.
        """
        data = {}
        try:
            for sensor_type, sensor in self.sensor_instances.items():
                # get data from sensors
                data[sensor_type] = sensor.get_data_from_sensor()
            return data
        except Exception as e:
            print(e)



    def on_change_start_measurement(self):
        """
        Start the measurement process.

        This method sets the is_running flag to True, indicating that the measurement
        process should start.
        """
        self.is_running = True

    def on_change_stop_measurement(self):
        """
        Stop the measurement process.

        This method sets the is_running flag to False, indicating that the measurement
        process should stop.
        """
        self.is_running = False


    def filtering(self, df, labellist):
        """
        Apply a low-pass filter to the specified columns in the DataFrame.

        This method applies a Butterworth low-pass filter to each column specified
        in the labellist. The "Time" column should be excluded from the labellist
        as it is not needed for the computation.

        Args:
            df (pd.DataFrame): The input DataFrame containing the data to be filtered.
            labellist (list of str): A list of column names to be filtered. The "Time"
                                     column should not be included in this list.

        Returns:
            pd.DataFrame: A new DataFrame with the filtered data.
        """
        filtered_df = df.copy()
        for labelname in labellist:
            # Ensure the column is converted to a numpy array
            x = df[labelname].to_numpy()
            filtered_df[labelname] = butterlowpass(
                x=x,  # Correctly pass the numpy array as 'x'
                fpass=self.FPASS,
                fstop=self.FSTOP,
                gpass=self.GPASS,
                gstop=self.GSTOP,
                fs=self.SAMPLING_FREQUENCY_HZ,
                dt=self.SAMPLING_TIME,
                checkflag=False,
                labelname=labelname
            )
        return filtered_df

    def convert_dictdata(self, current_time, sensor_data_dict):
        """
        Convert nested dictionary data from multiple sensors into a single DataFrame.

        This method converts nested dictionary data obtained from multiple sensors
        into a single dictionary and then converts it into a pandas DataFrame. The
        current_time information is associated with the data.

        Args:
            current_time (float): The current time at which the data was obtained.
            sensor_data_dict (dict): A nested dictionary containing data from multiple sensors.

        Returns:
            pd.DataFrame: A DataFrame containing the converted data with the current time information.
        """
        converted_data = {'Time': current_time}
        for sensor, data in sensor_data_dict.items():
            converted_data.update(data)

        converted_data = pd.DataFrame([converted_data])

        return converted_data



    async def update_data_buffer(self, dict_data):
        """
        Add data from sensors to the buffer and save it if necessary.

        This method adds the provided sensor data to the internal buffer. If the buffer
        exceeds the specified maximum length, the oldest data is saved to a CSV file
        and removed from the buffer.

        Args:
            dict_data (dict): The data from sensors to be added to the buffer.
        """

        # Add data to the buffer
        self.data_buffer = pd.concat([self.data_buffer, dict_data], ignore_index=True)

        # If the buffer exceeds the specified length, save the oldest data
        if len(self.data_buffer) > self.MAX_DATA_BUF_LEN:
            # Save the oldest data to a CSV file
            old_data = self.data_buffer.head(self.MAX_DATA_BUF_LEN)

            await self.save_data(old_data, self.SAVE_BUF_CSVDATA_PATH)

            # Update the buffer
            self.data_buffer = self.data_buffer.tail(len(self.data_buffer) - self.MAX_DATA_BUF_LEN)        



    async def save_data_async(self, df, path):
        """
        Save the DataFrame to a CSV file asynchronously.

        This method uses asyncio.to_thread to run the synchronous to_csv method
        in a separate thread, allowing it to be handled asynchronously.

        Args:
            df (pd.DataFrame): The DataFrame to be saved.
            path (str): The file path where the DataFrame should be saved.
        """
        if not os.path.isfile(path):
            await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=True, mode='w')
        else:
            await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=False, mode='a')

    async def save_data(self, df, path):
        """
        Save the DataFrame to a CSV file asynchronously.

        This method calls save_data_async to save the DataFrame to a CSV file
        asynchronously.

        Args:
            df (pd.DataFrame): The DataFrame to be saved.
            path (str): The file path where the DataFrame should be saved.
        """
        await self.save_data_async(df, path)



    async def finish_measurement_and_save_data(self):
        """
        Finish the measurement process and save the data.

        This method finalizes the measurement process by saving the buffered data
        to a CSV file. It also applies filtering if specified and saves the filtered
        data to a separate CSV file. The method handles time zone settings and
        generates a timestamp for the file names.

        The buffered data is saved to a temporary CSV file, which is then read back
        and saved to a final file path with a timestamp. If filtering is enabled,
        the filtered data is also saved. The temporary CSV file is deleted after
        the data is saved.

        Raises:
            Exception: If an error occurs during the file operations.
        """
        t_delta = datetime.timedelta(hours=9)
        TIMEZONE = datetime.timezone(t_delta, self.TIMEZONE)# You have to set your timezone
        now = datetime.datetime.now(TIMEZONE)
        timestamp = now.strftime('%Y%m%d%H%M%S')
        final_file_path = self.SAVE_BUF_CSVDATA_PATH.replace(self.SAVE_BUF_CSVDATA_PATH.split('/')[-1], 
                                                   timestamp + "/" + timestamp + '_' + 
                                                   self.SAVE_BUF_CSVDATA_PATH.split('/')[-1])
        await self.save_data_async(self.data_buffer, self.SAVE_BUF_CSVDATA_PATH)
        raw_df = pd.read_csv(self.SAVE_BUF_CSVDATA_PATH, header=0)
        os.makedirs(self.SAVE_DATA_DIR + "/" + timestamp, exist_ok=True)
        raw_df.to_csv(final_file_path, sep=',', encoding='utf-8', index=False, header=True)


        if self.is_filter:
            filt_df = self.filtering(df=raw_df, labellist=raw_df.columns[1:])
            filt_df.to_csv(final_file_path.replace('_raw_data.csv', '_filt_data.csv'), sep=',', encoding='utf-8', index=False, header=True)

        if os.path.exists(self.SAVE_BUF_CSVDATA_PATH):
            os.remove(self.SAVE_BUF_CSVDATA_PATH)
            print(f"File  '{self.SAVE_BUF_CSVDATA_PATH}' was deleted")
        else:
            print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' is not existed")






async def sensor_fusion_main():
    """
    Main function for sensor fusion.

    This function initializes the sensor fusion process, starts the measurement loop,
    collects data from multiple sensors, updates the data buffer, and handles real-time
    data display. It also calculates and prints the sampling delay and reliability rate
    upon termination.

    The main loop runs until the measurement process is stopped, either by an exception
    or a keyboard interrupt.

    Raises:
        Exception: If an error occurs during the measurement process, it is caught and printed.
        KeyboardInterrupt: If a keyboard interrupt occurs, the measurement process is stopped
                           and the data is saved.

    """
    print("Start sensor fusion main")
    config = config_manager.load_config(config_path)
    sensors = Sensors(config["master"])
    print("Called an instance of Sensors class")

    # sensors.start_all_measurements()
    sampling_counter = 0
    current_time = 0
    #sensors.is_running = True
    sensors.on_change_start_measurement()

    try:
        main_loop_start_time = None
        while sensors.is_running:
            iteration_start_time = perf_counter() # Start time of each iteration

            if main_loop_start_time is None:
                    main_loop_start_time = iteration_start_time  # initialize main loop start time

            current_time = perf_counter() - main_loop_start_time # Current time

            data = sensors.collect_data() # Get data from sensors                                        
            sampling_counter += 1 # Num of sampling

            converted_data = sensors.convert_dictdata(current_time, data) # Convert data to dataframe format
            # Update the data buffer. If it reaches the buffer limit, write the data to a CSV file.
            await sensors.update_data_buffer(converted_data)
            # Display data in real time. This process is executed on additional thread.
            if sensors.is_show_real_time_data:
                formatted_data = format_sensor_fusion_data(data, sensors.all_data_columns_list)    
                print("--------------------------------------------------------------------")
                print("Current Time is: {:.3f}".format(current_time))
                print(formatted_data)

            # Wait based on the sampling interval and execution time to maintain the sampling frequency.
            iteration_end_time = perf_counter()
            iteration_duration = iteration_end_time - iteration_start_time 
            print("Iteration duration is: {0} [s]".format(iteration_duration))
            sleep_time = max(0, sensors.SAMPLING_TIME - iteration_duration)
            if sleep_time > 0:
                wait_process(sleep_time)

    except Exception as e:
        print(e)

    except KeyboardInterrupt:
        sensors.on_change_stop_measurement()
        print("KeyboardInterrupt")
        await sensors.finish_measurement_and_save_data()

    finally:
        print("finish")
         # Compute delay of sampling
        main_loop_end_time = perf_counter() - main_loop_start_time
        print("Program terminated")
        print("main loop is ended. current time is: {:.3f}".format(current_time))
        print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time))
        print("sampling num is: {}".format(sampling_counter))



        # Compute ideal sampliing time
        ideal_time = ((sampling_counter - 1) / sensors.SAMPLING_FREQUENCY_HZ)
        # Cpmpute a delay
        delay_time = current_time - ideal_time
        # reliability rate
        sampling_reliability_rate = (delay_time / (sampling_counter / sensors.SAMPLING_FREQUENCY_HZ)) * 100


        print("sampling delay is: {:.3f} s".format(delay_time))
        print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate))


if __name__ == '__main__':
    asyncio.run(sensor_fusion_main())
Copy after login

sensor_fusion.py manages sensor data collection and processing using asynchronous operations. It dynamically creates sensor instances, collects data, applies filters, and saves the data to CSV files. The script also displays real-time data if needed and monitors performance metrics like sampling delay and reliability. The main function runs a loop that handles data collection, buffer updates, and performance reporting. It uses asynchronous operations to efficiently manage data saving and ensure smooth performance.

# bno055_measurement.py
import time
import numpy as np
import adafruit_bno055
import board
import os
import sys

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(parent_dir)

from config.config_manager import load_config
config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml')


class BNO055:
    def __init__(self, config):
        self.COLUMNS = config.data_columns    
        self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz
        self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ
        self.SAVE_DATA_DIR = config.save_data_dir
        self.SEQUENCE_LENGTH = config.sequence_length
        self.FPASS = config.filter_params.fpass
        self.FSTOP = config.filter_params.fstop
        self.GPASS = config.filter_params.gpass
        self.GSTOP = config.filter_params.gstop
        self.Isfilter = config.filter_params.is_filter

        self.IsStart = False
        self.IsStop = True
        self.Is_show_real_time_data = config.is_show_real_time_data 

        i2c_instance = board.I2C() # Create i2c instance
        self.bno055_sensor = adafruit_bno055.BNO055_I2C(i2c_instance) # create BNO055_I2C instance

    def calibration(self):
        print("Start calibration!")
        while not self.bno055_sensor.calibrated:
            print('SYS: {0}, Gyro: {1}, Accel: {2}, Mag: {3}'.format(*(self.bno055_sensor.calibration_status)))
            time.sleep(1)


    def calcEulerfromQuaternion(self, _w, _x, _y, _z):
        """
        Calculate Euler angles (roll, pitch, yaw) from quaternion components.

        This method converts quaternion components (_w, _x, _y, _z) into Euler angles
        (roll, pitch, yaw) in degrees. If any of the quaternion components are None,
        it returns (0.0, 0.0, 0.0) and prints an error message.

        Args:
            _w (float): The w component of the quaternion.
            _x (float): The x component of the quaternion.
            _y (float): The y component of the quaternion.
            _z (float): The z component of the quaternion.

        Returns:
            tuple: A tuple containing the roll, pitch, and yaw angles in degrees.
                If an error occurs, it returns (0.0, 0.0, 0.0) and prints an error message.
        """
        if None in (_w, _x, _y, _z):
            print(f"Error: One or more quaternion values are None: {_w}, {_x}, {_y}, {_z}")
            return 0.0, 0.0, 0.0

        try:
            sqw = _w ** 2
            sqx = _x ** 2
            sqy = _y ** 2
            sqz = _z ** 2
            COEF_EULER2DEG = 57.2957795131

            # Yaw
            term1 = 2.0 * (_x * _y + _z * _w)
            term2 = sqx - sqy - sqz + sqw
            yaw = np.arctan2(term1, term2)

            # Pitch
            term1 = -2.0 * (_x * _z - _y * _w)
            term2 = sqx + sqy + sqz + sqw
            pitch = np.arcsin(term1 / term2) if -1 <= term1 / term2 <= 1 else 0.0

            # Roll
            term1 = 2.0 * (_y * _z + _x * _w)
            term2 = -sqx - sqy + sqz + sqw
            roll = np.arctan2(term1, term2)

            return COEF_EULER2DEG * roll, COEF_EULER2DEG * pitch, COEF_EULER2DEG * yaw

        except Exception as e:
            print(f"Error in calcEulerfromQuaternion: {e}")
            return 0.0, 0.0, 0.0

    def get_data_from_sensor(self):
        """
        Retrieve data from the BNO055 sensor and return it as a dictionary.

        This method collects various sensor readings from the BNO055 sensor, including
        Euler angles, gyroscope data, linear acceleration, quaternion, magnetic field,
        and calibration status. It then constructs a dictionary with these values and
        returns only the columns specified in self.COLUMNS.

        Returns:
            dict: A dictionary containing the sensor data. Only the columns specified
                in self.COLUMNS are included in the returned dictionary.
        """
        # Get data
        euler_z, euler_y, euler_x = [val for val in self.bno055_sensor.euler]  # X: yaw, Y: pitch, Z: roll
        gyro_x, gyro_y, gyro_z = [val for val in self.bno055_sensor.gyro]  # Gyro[rad/s]
        linear_accel_x, linear_accel_y, linear_accel_z = [val for val in self.bno055_sensor.linear_acceleration]  # Linear acceleration[m/s^2]
        quaternion_1, quaternion_2, quaternion_3, quaternion_4 = [val for val in self.bno055_sensor.quaternion]  # Quaternion
        quat_roll, quat_pitch, quat_yaw = self.calcEulerfromQuaternion(quaternion_1, quaternion_2, quaternion_3, quaternion_4)  # Cal Euler angle from quaternion
        magnetic_x, magnetic_y, magnetic_z = [val for val in self.bno055_sensor.magnetic]  # Magnetic field
        calibstat_sys, calibstat_gyro, calibstat_accel, calibstat_mag = [val for val in self.bno055_sensor.calibration_status]  # Status of calibration


        data_dict = {
            "linear_accel_x": linear_accel_x,
            "linear_accel_y": linear_accel_y,
            "linear_accel_z": linear_accel_z,
            "gyro_x": gyro_x,
            "gyro_y": gyro_y,
            "gyro_z": gyro_z,
            "euler_x": euler_x,
            "euler_y": euler_y,
            "euler_z": euler_z,
            "quat_roll": quat_roll,
            "quat_pitch": quat_pitch,
            "quat_yaw": quat_yaw,
            "quaternion_1": quaternion_1,
            "quaternion_2": quaternion_2,
            "quaternion_3": quaternion_3,
            "quaternion_4": quaternion_4,
            "magnetic_x": magnetic_x,
            "magnetic_y": magnetic_y,
            "magnetic_z": magnetic_z,
            "calibstat_sys": calibstat_sys,
            "calibstat_gyro": calibstat_gyro,
            "calibstat_accel": calibstat_accel,
            "calibstat_mag": calibstat_mag
        }

        return {column: data_dict[column] for column in self.COLUMNS if column in data_dict}


def format_sensor_data(data, labels):
    """
    Format sensor data into a string for display.

    This method takes a dictionary of sensor data and a list of labels, and formats
    the data into a string where each label is followed by its corresponding value.
    If a value is None, it is replaced with the string "None". Each label-value pair
    is separated by " / ".

    Args:
        data (dict): The sensor data to format.
        labels (list of str): The list of labels to include in the formatted string.

    Returns:
        str: A formatted string containing the sensor data.
    """
    formatted_str = ""
    if isinstance(data, dict):
        for label in labels:
            value = data.get(label, None)
            if value is None:
                value = "None"
            else:
                value = f"{value:.4f}"
            formatted_str += f"{label}: {value} / "
    return formatted_str.rstrip(" / ")

def test_main():
    """
    Main function for testing sensor data collection and display.

    This function initializes the BNO055 sensor, starts a loop to collect data,
    formats the data for display, and prints it in real-time. It also calculates
    and prints the sampling delay and reliability rate upon termination.

    The main loop runs until interrupted by the user.

    Raises:
        KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated
                           and the final statistics are printed.
    """
    from utils.tools import wait_process
    from time import perf_counter
    import matplotlib.pyplot as plt

    print("Main start")

    config = load_config(config_path)
    meas_bno055 = BNO055(config.sensors['bno055'])

    start_time = perf_counter()
    sampling_counter = 0

    try:
        main_loop_start_time = perf_counter()
        while True:
            iteration_start_time = perf_counter()

            # Data acquisition process
            data = meas_bno055.get_data_from_sensor()
            current_time = perf_counter() - start_time
            sampling_counter += 1

            if meas_bno055.Is_show_real_time_data:
                formatted_data = format_sensor_data(data, meas_bno055.COLUMNS)
                # current time    
                print("--------------------------------------------------------------------")
                print("Current Time is: {:.3f}".format(current_time))
                print(formatted_data)

            # Wait to meet the sampling frequency based on the sampling interval and execution time
            elapsed_time = perf_counter() - iteration_start_time
            sleep_time = meas_bno055.SAMPLING_TIME - elapsed_time
            if sleep_time > 0:
                wait_process(sleep_time)

    except KeyboardInterrupt:
        print("Interrupted by user")

    finally:
        # Calculate the sampling delay from the number of samples and the current time
        main_loop_end_time = perf_counter() - main_loop_start_time
        print("Program terminated")
        print("main loop is ended. current time is: {:.3f}".format(current_time))
        print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time))
        print("sampling num is: {}".format(sampling_counter)) # Since it is 0-based, the number of samples is current_time + 1

        # Calculate the ideal sampling time
        ideal_time = ((sampling_counter - 1) / meas_bno055.SAMPLING_FREQUENCY_HZ)
        # Calculate the delay
        delay_time = current_time - ideal_time
        # The reliability rate is the delay divided by the sampling time
        sampling_reliability_rate = (delay_time / (sampling_counter / meas_bno055.SAMPLING_FREQUENCY_HZ)) * 100

        print("sampling delay is: {:.3f} s".format(delay_time))
        print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate))




if __name__ == '__main__':
    test_main()
Copy after login

This script interfaces with the BNO055 sensor to collect and process data. It retrieves sensor readings, such as Euler angles and gyroscope data, formats them for display, and prints real-time updates if configured. The script includes a calibration method and calculates Euler angles from quaternion data. It manages data acquisition with a defined sampling frequency and calculates sampling delay and reliability. The main loop continues until interrupted, reporting performance metrics upon termination.

# elm327_measurement.py

import obd
import os
import time
from collections import deque
import numpy as np
import pandas as pd
import datetime
import asyncio
import scipy
from scipy import signal
import matplotlib as plt
import sys
from collections import defaultdict
import random

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(parent_dir)

from config.config_manager import load_config
config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml')

class ELM327:
    def __init__(self, config):
        """
        Initialize the ELM327 class with configuration parameters.

        Args:
            config (dict): Configuration parameters for the ELM327.
        """
        self.COLUMNS = config.data_columns    
        self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz
        self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ
        self.SAVE_DATA_DIR = config.save_data_dir
        self.SEQUENCE_LENGTH = config.sequence_length
        self.FPASS = config.filter_params.fpass
        self.FSTOP = config.filter_params.fstop
        self.GPASS = config.filter_params.gpass
        self.GSTOP = config.filter_params.gstop
        self.Isfilter = config.filter_params.is_filter
        self.res = self.connect_to_elm327()

        self.is_offline = config.is_offline
        self.IsStart = False
        self.IsStop = True
        self.Is_show_real_time_data = config.is_show_real_time_data

    def initialize_BLE(self):
        """
        Initialize Bluetooth Low Energy (BLE) for ELM327 connection.
        """
        os.system('sudo hcitool scan')
        os.system('sudo hciconfig hci0 up')
        os.system('sudo rfcomm bind 0 8A:2A:D4:FF:38:F3')
        os.system('sudo rfcomm listen 0 1 &')

    def connect_to_elm327(self):
        """
        Establish a connection to the ELM327 device.

        Returns:
            res (obd.OBDStatus): The connection status of the ELM327 device.
        """
        res = None
        try:
            self.initialize_BLE()
            self.connection = obd.OBD()
            print(self.connection.status())
            res = self.connection.status()
            if res == obd.OBDStatus.CAR_CONNECTED:
                print("----------Connection establishment is successful!----------")
                return res
            else:
                print("----------Connection establishment failed!----------")
                print("End program. Please check settings of the computer and ELM327")
        except Exception as e:
            print("----------Exception!----------")
            print(e)
        finally:
            return res

    def get_data_from_sensor(self):
        """
        Retrieve data from the sensor.

        Returns:
            dict: A dictionary containing sensor data.
        """
        if self.is_offline:
            data = self.get_data_from_sensor_stub()
        else:
            # Retrieve data and save it in dictionary format
            data = {column: self.get_obd2_value(column) for column in self.COLUMNS}
        return data

    def get_obd2_value_debug(self, column):
        """
        Retrieve OBD-II value for a specific column with debug information.

        Args:
            column (str): The OBD-II command column.

        Returns:
            float or None: The value of the OBD-II command, or None if not available.
        """
        command = getattr(obd.commands, column, None)
        if command:
            response = self.connection.query(command)
            if response:
                print(f"Response for command '{command}': {response}")
                if response.value is not None:  # Check for None
                    print(f"Response value for command '{command}': {response.value}")
                    return response.value.magnitude
                else:
                    print(f"No value in response for command '{command}'")
            else:
                print(f"No response for command '{command}'")
        else:
            print(f"No command found for column '{column}'")
        return None

    def get_obd2_value(self, column):
        """
        Retrieve OBD-II value for a specific column.

        Args:
            column (str): The OBD-II command column.

        Returns:
            float or None: The value of the OBD-II command, or None if not available.
        """
        command = getattr(obd.commands, column, None)
        if command:
            response = self.connection.query(command)
            if response.value is not None:  # Check for None
                return response.value.magnitude
        return None

    def get_data_from_sensor_stub(self):
        """
        Generate stub data for the sensor.

        Returns:
            dict: A dictionary containing stub sensor data.
        """
        data_stub = {column: np.abs(np.random.randn()).astype(np.float32).item() for column in self.COLUMNS}        
        # Randomly insert None or 0.0
        if random.choice([True, False]):
            random_column = random.choice(self.COLUMNS)
            if random.choice([True, False]):
                data_stub[random_column] = None
            else:
                data_stub[random_column] = 0.0

        return data_stub

def format_data_for_display(data, labels):
    """
    Format sensor data for display.

    Args:
        data (dict): The sensor data to format.
        labels (list of str): The list of labels to include in the formatted string.

    Returns:
        str: A formatted string containing the sensor data.
    """
    formatted_str = ""
    for label, value in zip(labels, data.values()):
        if value is None:
            value = "None"
        else:
            value = f"{value:.4f}"
        formatted_str += f"{label}: {value} / "
    return formatted_str.rstrip(" / ")

def format_sensor_data(data, labels):
    """
    Format sensor data for display.

    Args:
        data (dict or list): The sensor data to format.
        labels (list of str): The list of labels to include in the formatted string.

    Returns:
        str: A formatted string containing the sensor data.
    """
    formatted_str = ""
    if isinstance(data, dict):
        for label in labels:
            value = data.get(label, None)
            if value is None:
                value = "None"
            else:
                value = f"{value:.4f}"
            formatted_str += f"{label}: {value} / "
    else:
        for label, value in zip(labels, data):
            if value is None:
                value = "None"
            else:
                value = f"{value:.4f}"
            formatted_str += f"{label}: {value} / "
    return formatted_str.rstrip(" / ")

def test_main():
    """
    Main function for testing sensor data collection and display.

    This function initializes the ELM327 sensor, starts a loop to collect data,
    formats the data for display, and prints it in real-time. It also calculates
    and prints the sampling delay and reliability rate upon termination.

    The main loop runs until interrupted by the user.

    Raises:
        KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated
                           and the final statistics are printed.
    """
    from utils.tools import wait_process
    from time import perf_counter
    import matplotlib.pyplot as plt

    print("Main start")
    config = load_config(config_path)
    meas_elm327 = ELM327(config.sensors['elm327'])
    # res = meas_elm327.connect_to_elm327()

    start_time = perf_counter()
    sampling_counter = 0
    try:
        main_loop_start_time = perf_counter()
        while True:
            iteration_start_time = perf_counter()

            # Data acquisition process
            data = meas_elm327.get_data_from_sensor()

            current_time = perf_counter() - start_time
            sampling_counter += 1

            if meas_elm327.Is_show_real_time_data:
                formatted_data = format_sensor_data(data, meas_elm327.COLUMNS)
                print("--------------------------------------------------------------------")
                print("Current Time is: {:.3f}".format(current_time))
                print(formatted_data)

            # Wait to meet the sampling frequency based on the sampling interval and execution time
            elapsed_time = perf_counter() - iteration_start_time
            sleep_time = meas_elm327.SAMPLING_TIME - elapsed_time
            if sleep_time > 0:
                wait_process(sleep_time)

    except KeyboardInterrupt:
        print("Interrupted by user")

    finally:
        main_loop_end_time = perf_counter() - main_loop_start_time
        print("Program terminated")
        print("main loop is ended. current time is: {:.3f}".format(current_time))
        print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time))
        print("sampling num is: {}".format(sampling_counter))

        # Calculate the ideal sampling time
        ideal_time = ((sampling_counter - 1) / meas_elm327.SAMPLING_FREQUENCY_HZ)
        # Calculate the delay
        delay_time = current_time - ideal_time
        # The reliability rate is the delay divided by the sampling time
        sampling_reliability_rate = (delay_time / (sampling_counter / meas_elm327.SAMPLING_FREQUENCY_HZ)) * 100

        print("sampling delay is: {:.3f} s".format(delay_time))
        print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate))

if __name__ == '__main__':
    test_main()
Copy after login

This script defines an ELM327 class for interfacing with an ELM327 device to collect sensor data via OBD-II. It initializes the connection, retrieves data, and formats it for display. The test_main function sets up the ELM327 sensor, collects data in a loop, and prints it in real-time. It also calculates and displays sampling delay and reliability rates upon interruption. The script handles both real-time data acquisition and simulated offline data for testing purposes.

7. Conclusion

In this post, I described the project focused on developing a measurement system to capture vehicle behavior. While the project is still ongoing, I have recognized the potential for capturing vehicle data with a focus on the vehicle dynamics domain. However, when it comes to measuring signals from vehicle ECUs, some limitations need to be considered, as the ELM327 may not reliably achieve sampling rates beyond 5 Hz with good accuracy. This issue could likely be resolved by using a more advanced OBD scanner, such as the OBDLink MX+.

8. Reference

[1] Adafruit. "Adafruit BNO055 Absolute Orientation Sensor."
https://learn.adafruit.com/adafruit-bno055-absolute-orientation-sensor/overview. Accessed September 3, 2024.

[2] Adafruit. "Adafruit_CircuitPython_BNO055." https://github.com/adafruit/Adafruit_CircuitPython_BNO055. Accessed September 3, 2024.

[3] Adafruit. "Adafruit_BNO055." https://github.com/adafruit/Adafruit_BNO055. Accessed September 3, 2024.

[4] Adafruit. "Adafruit BNO055 Demo." https://cdn-shop.adafruit.com/product-videos/1024x768/2472-04.mp4 Accessed September 3, 2024.

[5] Amazon. "Elm327 Launchh OBD2 Professional Bluetooth Scan Tool and Code Reader for Android and PC, Interface OBDII OBD2 Car Auto Diagnostic Scanner, Not Support J1850 VPW & J1850 PWM". https://a.co/d/5BFn4GN. Accessed September 3, 2024.

[6] MathWorks. "Coordinate Systems in Automated Driving Toolbox." https://www.mathworks.com/help/driving/ug/coordinate-systems.html. Accessed September 3, 2024.

[7] Apple. "Getting raw gyroscope events." https://developer.apple.com/documentation/coremotion/getting_raw_gyroscope_events. Accessed September 3, 2024.

[8] phyphox. "phyphox top page."https://phyphox.org/. Accessed September 3, 2024.

[9] Andrea Patrucco, Vehicle dynamics engineer presso Applus+ IDIADA. "Challenging your car's trip computer fuel efficiency figures." https://www.linkedin.com/pulse/challenging-your-cars-trip-computer-fuel-efficiency-figures-patrucco/. Accessed September 3, 2024.

[10] "Choosing OBDII adapter". https://www.carscanner.info/choosing-obdii-adapter/. Accessed September 3, 2024.

[11] "VDDM". https://github.com/Qooniee/VDDM/tree/master/. Accessed September 3, 2024.

The above is the detailed content of Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors. For more information, please follow other related articles on the PHP Chinese website!

source:dev.to
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template