Heim > Backend-Entwicklung > Python-Tutorial > Erstellen eines benutzerdefinierten Datenloggers mit Raspberry Pi für Fahrzeuge: Integration von BNO- und ELM-Sensoren

Erstellen eines benutzerdefinierten Datenloggers mit Raspberry Pi für Fahrzeuge: Integration von BNO- und ELM-Sensoren

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
Freigeben: 2024-09-10 16:30:06
Original
930 Leute haben es durchsucht

Zusammenfassung

Als Softwareentwickler in der Automobilindustrie interessiere ich mich sehr für autonome Fahrzeuge, Datenmesstechniken und Analysemethoden. In diesem Beitrag werde ich ein maßgeschneidertes Messsystem beschreiben, den Prozess von Grund auf detailliert beschreiben und einige experimentelle Ergebnisse vorstellen. Mein Datenlogger besteht aus einem Raspberry Pi 3, einem BNO055-Sensor und einem ELM327-OBD-II-Adapter, die für die Berechnung, das Sammeln von Beschleunigungs- und Drehgeschwindigkeitsdaten bzw. das Abrufen von Motorinformationen von einem tatsächlichen Fahrzeug verantwortlich sind.

Inhaltsverzeichnis

1.Hintergrund
2.Architektur
3.Sensorinformationen
4.Informationen zum Koordinatensystem
5.Datenvergleich
6.Kernlogik
7.Fazit
8.Referenz

1. Hintergrund

In den letzten Jahren hat der Aufstieg von IoT, künstlicher Intelligenz und Edge Computing zu erheblichen Fortschritten in verschiedenen Branchen geführt, darunter auch im Automobilsektor. Moderne Fahrzeuge sind mit einer Vielzahl von Sensoren ausgestattet, die zusammenarbeiten, um anspruchsvolle Systeme zu schaffen, die alles von verbesserten Sicherheitsfunktionen bis hin zu autonomen Fahrfunktionen ermöglichen.

Typischerweise erfordert die Überwachung des Fahrzeugzustands den Einsatz fortschrittlicher und teurer Sensoren, um präzise und zuverlässige Messungen zu erzielen. In bestimmten Situationen, beispielsweise bei der Erfassung von Daten über Schaltkreise für ein allgemeines Verständnis der Fahrzeugdynamik, sind diese High-End-Sensoren jedoch möglicherweise nicht unbedingt erforderlich. Als ich dies erkannte, beschloss ich, ein kostengünstiges System zu entwickeln, das den physischen Zustand eines Fahrzeugs mit günstigeren Komponenten messen kann.

Ich habe beschlossen, mich auf die Messung von Daten zur Fahrzeugdynamik statt auf den Fahrkomfort zu konzentrieren. Denn für die Analyse des Fahrkomforts müssen mehr Hochfrequenzsignale mit ausreichender Genauigkeit erfasst werden, was die Einstellung einer hohen Abtastfrequenz bei der Messung erforderlich macht.

Insbesondere bei den Frequenzgangeigenschaften der Querbeschleunigung in Abhängigkeit vom Lenkwinkel liegt die Resonanzfrequenz im Allgemeinen unter 5 Hz, obwohl sie von der Fahrzeuggeschwindigkeit abhängt. Andererseits erstrecken sich die Diskussionen bei der Fahrkomfortanalyse oft auf den Bereich von mehreren zehn Hz.

Daher besteht das Hauptziel der Entwicklung des Messsystems darin, ein kostengünstiges Datenprotokollierungssystem zu schaffen, das die Datenanalyse im Bereich der Fahrzeugdynamik erleichtert.

2. Architektur

Das Messsystem besteht aus folgender Hardware:

  • Raspberry Pi 3
  • BNO055
  • ELM327 OBD2-Scanner
  • Wechselrichter
  • Überwachen
  • Tastatur/Maus (bei Bedarf)

Der Raspberry Pi fungiert als Hauptcomputer und sammelt Daten vom BNO055 und ELM327. Der Raspberry Pi kommuniziert mit dem BNO055 über I2C und mit dem ELM327 über Bluetooth (siehe Abbildung 1).

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 1: Hardware-Architektur

3. Sensorinformationen

3-1. Adafruits BNO055

Der BNO055-Sensor ist in der Lage, verschiedene Arten von Bewegungen und Daten zu messen:★Adafruit BNO055-Übersicht[1]

  • Absolute Orientierung (Euler-Vektor, 100 Hz): Bietet dreiachsige Orientierungsdaten basierend auf einer 360°-Kugel.

  • Absolute Orientierung (Quaternion, 100 Hz): Bietet eine Vierpunkt-Quaternion-Ausgabe für eine genauere Datenmanipulation.

  • Winkelgeschwindigkeitsvektor (100 Hz): Misst die Rotationsgeschwindigkeit im Bogenmaß pro Sekunde (rad/s) über drei Achsen.

  • Beschleunigungsvektor (100 Hz): Erfasst die Beschleunigung einschließlich Schwerkraft und linearer Bewegung in Metern pro Sekunde im Quadrat (m/s²) über drei Achsen.

  • Magnetfeldstärke-Vektor (20 Hz): Erfasst die Magnetfeldstärke in Mikrotesla (µT) über drei Achsen.

  • Linearer Beschleunigungsvektor (100 Hz): Zeichnet lineare Beschleunigungsdaten (ohne Schwerkraft) in Metern pro Sekunde im Quadrat (m/s²) über drei Achsen auf.

  • Schwerkraftvektor (100 Hz): Misst die Gravitationsbeschleunigung (ohne jegliche Bewegung) in Metern pro Sekunde im Quadrat (m/s²) über drei Achsen.

  • Temperatur (1 Hz): Gibt die Umgebungstemperatur in Grad Celsius an.

Der BNO055 ist kompakt und wird von Adafruits Python-Bibliothek namens Adafruit_CircuitPython unterstützt. Darüber hinaus ist eine C-Sprachbibliothek von Adafruit erhältlich.

★Adafruit_CircuitPython_BNO055[2]

★Adafruit_BNO055[3]

Für eine Demonstration können Sie sich das Video hier ansehen:
Adafruit BNO055 Demo
★Adafruit BNO055 Demo[4]

3-2. ELM327

Der ELM327 ist ein weit verbreiteter OBD-II-Adapter (On-Board-Diagnose), der den Zugriff auf Fahrzeugmotordaten über eine Standardschnittstelle ermöglicht. Es fungiert als Brücke zwischen der ECU (Electronic Control Unit) des Fahrzeugs und externen Geräten wie Computern oder Smartphones und ermöglicht den Abruf von Diagnose- und Leistungsdaten. Zu den wichtigsten Funktionen und Funktionen des ELM327 gehören:★ELM327-Informationen[5]

  • OBD-II-Protokollunterstützung: Der ELM327 unterstützt verschiedene OBD-II-Protokolle, darunter ISO 9141, ISO 14230 (KWP2000), ISO 15765 (CAN) und mehr, wodurch er mit einer Vielzahl von Fahrzeugen kompatibel ist.

  • Diagnosefehlercodes (DTCs): Es kann Diagnosefehlercodes aus dem Steuergerät des Fahrzeugs lesen und löschen und so bei der Identifizierung und Fehlerbehebung helfen.

  • Live-Daten-Streaming: Bietet Echtzeitdaten von den Sensoren des Fahrzeugs, wie Motordrehzahl, Fahrzeuggeschwindigkeit, Kühlmitteltemperatur und Kraftstoffstand.

  • Standbilddaten: Erfasst und speichert Daten in dem Moment, in dem ein Fehler erkannt wird, was bei der Diagnose intermittierender Probleme hilft.

  • Fahrzeuginformationen: Ruft Details zum Fahrzeug ab, einschließlich VIN (Fahrzeugidentifikationsnummer), Kalibrierungs-IDs und mehr.

  • Kompatibilität: Verfügbar in verschiedenen Formen, einschließlich Bluetooth-, USB- und Wi-Fi-Versionen, was die Kompatibilität mit verschiedenen Geräten und Plattformen ermöglicht.

Typischerweise wird der ELM327 zur Diagnose des Fahrzeugstatus, insbesondere des Motorzustands, verwendet. Es können aber auch vielfältige Daten aus dem Fahrzeug erfasst werden. Der ELM327 ist auf Plattformen wie Amazon zu Preisen zwischen etwa 20 und 80 US-Dollar erhältlich.

Mit der Python-OBD-Bibliothek können Sie Abfragen erstellen, um Daten über den ELM327-Adapter zu sammeln, was eine anpassbare und detaillierte Datenerfassung ermöglicht

4. Informationen zum Koordinatensystem

Da dieses System mit mehreren Sensoren und einem tatsächlichen Fahrzeug interagiert, muss es jedes Koordinatensystem verstehen.

4-1. BNO055-Koordinatensystem für Datenlogger

Das Bild unten zeigt das BNO055-Koordinatensystem für mein Datenmesssystem. Mein System berechnet zusätzlich Drehwinkel mithilfe von Quaternionen. Das Sensorkoordinatensystem muss mit dem Fahrzeugkoordinatensystem übereinstimmen.

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 2: BNO055-Koordinatensystem

4-2. Fahrzeugkoordinatensystem (ISO 8855)

Das Messsystem basiert auf dem Fahrzeugkoordinatensystem.

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 3: Fahrzeugkoordinatensystem [6] Quelle: MathWorks, Coordinate Systems in Automated Driving Toolbox.

4-3. iPhone-Koordinatensystem

Ich habe für Validierungsaufgaben eine iPhone-Anwendung verwendet, die mehrere physische Datenpunkte messen kann, darunter Beschleunigung, Kreisel und mehr. Der Zweck der Nutzung der iPhone-Anwendung besteht darin, die Gültigkeit meines Datenmesssystems mit Daten aus dieser Anwendung zu bestätigen. Abbildung 4 stammt von der Entwicklerseite von Apple.

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 4: iPhone-Koordinatensystem [7] Quelle: Apple, Erhalten roher Gyroskopereignisse.

5. Datenvergleich

In diesem Abschnitt präsentiere ich einige experimentelle Ergebnisse. Im ersten Schritt habe ich Sinuswellenaufgaben durchgeführt, um zu überprüfen, ob mein Messsystem Messwerte ohne Fehler, wie z. B. Einheitenfehler, ausgibt. Dieser Teil ist grundlegend, aber entscheidend, um sicherzustellen, dass Daten korrekt erfasst werden.
Als nächstes habe ich während der Fahrt einige physikalische Werte an einem tatsächlichen Fahrzeug gemessen.
Dann habe ich die Leistung und Abtastgenauigkeit jedes Sensors im ELM327 überprüft.

5-1. Sinuswellentests

Ich habe Sinuswellentests durchgeführt und in diesem Abschnitt präsentiere ich die Ergebnisse. Das Ziel dieser Tests besteht darin, Folgendes zu bestätigen:

  • Ob das Koordinatensystem des Messsystems ordnungsgemäß funktioniert
  • Um die Genauigkeit der Mess-App auf dem iPhone zu überprüfen

Um dies zu erreichen, habe ich „phyphox“ ★phyphox[8] verwendet, eine Smartphone-App, die verschiedene physikalische Werte messen kann. Ich habe meinen BNO055-Sensor wie unten gezeigt am iPhone montiert [Abbildung 5].

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 5: Bild der Sensor- und iPhone-Montagemethode zur Durchführung von Sinuswellentests

Abbildung 6 zeigt die Ergebnisse des Sinuswellentests mit einer Abtastfrequenz von 10 Hz. Die Legende „VDDM“ steht für Daten, die von einem von mir entwickelten Messsystem erfasst wurden, während „iPhone“ für Daten steht, die mit „phyphox“ erfasst wurden.

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 6: Ergebnisse der Sinuswellentests
  • Die Daten von VDDM und dem iPhone zeigen ähnliche Trends auf allen Achsen, was darauf hindeutet, dass beide Anwendungen ähnliche Bewegungsmuster erfassen.

  • Die VDDM-Daten scheinen etwas variabler zu sein, insbesondere in den Beschleunigungsdiagrammen, was auf eine höhere Empfindlichkeit oder einen anderen Filteransatz im Vergleich zum iPhone hinweisen könnte.

  • Beide Daten stimmen im Allgemeinen gut überein, aber die Unterschiede in der Amplitude lassen darauf schließen, dass je nach Genauigkeitsanforderungen möglicherweise eine weitere Kalibrierung oder Anpassung erforderlich ist.

  • VDDM erfasst möglicherweise zusätzliches Rauschen oder höherfrequente Komponenten, insbesondere in den Beschleunigungsdiagrammen, die in den iPhone-Daten nicht vorhanden sind. Dies kann entweder für eine detaillierte Analyse von Vorteil sein oder eine weitere Filterung erfordern.

5-2. Auswertung von Fahrzeugdynamikdaten

Abbildung 7 zeigt die Ergebnisse eines Experiments, das sich auf Daten des BNO055 in einem tatsächlichen Fahrzeug konzentriert. Als Referenz diente eine iPhone-Anwendung namens „phyphox“. Die Abtastfrequenz betrug 50 Hz. Ich habe festgestellt, dass die Beschleunigungs- und Rotationssignale, die mit dem von mir am Fahrzeug entwickelten Datenprotokollierungssystem gemessen wurden, Rauschen, Ausreißer und NaN-Werte enthielten. Daher habe ich Nachbearbeitungsfunktionen angewendet, darunter eine IQR-Methode und einen Butterworth-Tiefpassfilter. Der Begriff „fc“ in der Legende jedes Diagramms stellt die Grenzfrequenz dar. Beispielsweise gibt fc_19.5 eine Grenzfrequenz von 19,5 Hz an. Beachten Sie, dass dieses Experiment nur mit dem BNO055-Sensor durchgeführt wurde und die ELM327-Funktion deaktiviert war.

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 7: Tatsächliches Fahrzeugtestergebnis
  • Die Daten von VDDM (Rohdaten) und dem iPhone zeigen ähnliche Trends über alle Achsen hinweg. Dies deutet darauf hin, dass beide Systeme die gleichen Bewegungsmuster erfassen.

  • Die Daten von VDDM scheinen im Vergleich zu den iPhone-Daten größere Schwankungen aufzuweisen. Dies macht sich insbesondere in den Beschleunigungsdiagrammen bemerkbar und kann darauf hindeuten, dass VDDM Daten mit höherer Empfindlichkeit erfasst oder einen anderen Filteransatz verwendet.

  • Gefilterte Daten von VDDM (mit fc 19,5 Hz und 10 Hz) weisen im Vergleich zu den Rohdaten eine geringere Variation auf und liegen näher an den iPhone-Daten. Dies deutet darauf hin, dass VDDM (Rohdaten) möglicherweise mehr Rauschen oder Hochfrequenzkomponenten enthält.

  • Rohdaten von VDDM enthielten Ausreißer und fehlende Werte, die durch die Anwendung von Filtern gemildert wurden. Dies wird insbesondere in den Rollraten- und Nickratendiagrammen deutlich.

  • VDDM scheint im Vergleich zum iPhone mehr Hochfrequenzkomponenten und Rauschen zu enthalten. Während dies für detaillierte Analysen von Vorteil sein kann, sind möglicherweise zusätzliche Filter oder Anpassungen auf Hardwareebene erforderlich, wenn die Rauschunterdrückung von entscheidender Bedeutung ist.

5-3. Bewertung der Fahrzeuggeschwindigkeit

Abbildung 8 zeigt Daten zur Motorleistung. Diese Daten wurden mit dem ELM327-Scanner mit einer Abtastfrequenz von 4 Hz erfasst. Die BNO055-Funktion wurde während der Datenerfassung deaktiviert.

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 8: Fahrzeuggeschwindigkeit und Motordaten
  • Fahrzeuggeschwindigkeitsdaten von VDDM (Rohdaten) und dem iPhone weisen ähnliche Trends auf.
  • Der ELM327 kann Informationen über Motordrehzahl und Drosselklappenstellung sammeln. Diese Daten sind nützlich, um die allgemeine Funktionsweise des Gaspedals zu verstehen. Beachten Sie, dass es sich bei dem Testfahrzeug um ein HEV handelt.

5-4. Bewertung der Genauigkeit der Abtastfrequenz

Während meiner Experimente mit BNO055 und ELM327 habe ich Verzögerungen bei der Probenentnahme beobachtet. Um die Probleme zu identifizieren, habe ich Messungen durchgeführt, indem ich die Abtastfrequenz angepasst und ELM327 und BNO055 aktiviert und deaktiviert habe.
Zunächst stelle ich den Zeitunterschied dar, der der Abtastfrequenz entspricht. Bei diesem Test werden sowohl der BNO055- als auch der ELM327-Sensor verwendet

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 9: Zeitunterschied entsprechend der Abtastfrequenz

Abbildung 9 zeigt, dass die Genauigkeit der Abtastzeit erheblich abnimmt, wenn die Abtastfrequenz auf 10 Hz eingestellt ist.
Der ELM327 kann keine so hohen Abtastfrequenzen erreichen wie der BNO055, was sich negativ auf die Gesamtsystemleistung auswirken kann. Um die Leistung jedes einzelnen Sensors besser zu verstehen, habe ich dennoch Einzelmessungen durchgeführt.

Abbildungen 10 und 11 veranschaulichen den Zusammenhang zwischen Abtastfrequenzen und der Präzision des Abtastzeitpunkts. Die Präzision des Abtastzeitpunkts wird wie folgt dargestellt:

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 10: BNO055_Sampling-Testergebnis

Gemäß Abbildung 10 kann der BNO055 auf Abtastfrequenzen bis zu 50 Hz reagieren. Jenseits von 60 Hz nimmt die Präzision des Sampling-Timings jedoch dramatisch ab.

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Abbildung 11: Ergebnis des BNO055-Probenahmetests

Abbildung 11 zeigt, dass der ELM327 im Vergleich zum BNO055 eine geringere Reaktionsfähigkeit aufweist. Der ELM327 behält eine gute Sampling-Timing-Präzision nur bis zu 5 Hz bei. Dies liegt daran, dass der ELM327 über OBD mit einem Fahrzeug kommuniziert und das Steuergerät des Fahrzeugs möglicherweise keine hochfrequenten Antworten auf OBD-Anfragen unterstützt, was zu einer niederfrequenten Kommunikation führt.

Darüber hinaus ist der ELM327 die günstigste und qualitativ minderwertige Option unter den Sensoren, die über OBD-II Daten von mehreren Steuergeräten in einem Fahrzeug abrufen können.
In diesem Beitrag heißt es: „Die Kraftstoffeffizienzwerte Ihres Bordcomputers in Frage stellen“[9]

Eine wichtige Sache, die man beim Elm327 berücksichtigen sollte, ist, dass die Reaktionszeiten keineswegs vernachlässigbar sind: Das Gerät kann 0,1 bis 0,3 Sekunden brauchen, um auf eine Nachrichtenanfrage zu antworten, was bedeutet, dass das Hinzufügen zu vieler Signale schnell zu einer Verringerung der Antwortzeit führt Abtastrate bis zu einem Punkt, an dem sie nutzlos wird

Ausführlichere Ratschläge finden Sie auf dieser Seite:★OBDII-Adapter auswählen[10]

Als Ursachen für die geringe Leistung kommen folgende Gründe in Betracht:

  • Spezifikationen des Fahrzeug-ECU
  • Zu viele Anfragen an den OBD-Scanner
  • Anfragen für mehrere Signale von mehreren Steuergeräten
  • Probleme mit der Software
  • Netzwerkprobleme
  • OBD-II-Protokoll-Overhead
  • Einschränkungen der Scanner-Hardware
  • Fahrzeugkommunikationsgeschwindigkeit
  • Firmware- und Treiberprobleme
  • Umweltfaktoren

6. Kernlogiken

Messsoftware, die ich entwickelt habe, geht von Anwendungsfällen aus, die mehrere Sensoren benötigen. Darüber hinaus kann ein Benutzer die Messeinstellungen in der Yaml-Datei ändern. Ein Benutzer kann die Messanwendung über eine einfache GUI bedienen. Den Quellcode können Sie hier sehen: ★VDDM[11]

Die Struktur des Messsystems ist unten:

VDDM/
│
├── fusion/
│   ├── __init__.py
│   ├── sensor_fusion.py   # Handles data fusion, collects data from sensors
│   └── sensors/
│       ├── __init__.py
│       ├── bno055_measurement.py   # Script for BNO055 sensor data collection
│       └── elm327_measurement.py   # Script for ELM327 OBD2 data collection
│
├── signalprocessing/
│   ├── __init__.py
│   └── filter.py           # Contains filtering algorithms (e.g., Butterworth filter)
│
├── config/
│   ├── config_manager.py    # Loads configuration from YAML
│   └── measurement_system_config.yaml   # Configuration file for sensors and system settings
│
├── utils/
│   ├── tools.py             # Utility functions (e.g., wait functions)
│   └── visualize_data.py    # Functions to format and display sensor data
│
├── measurement/
│   ├── __init__.py
│   └── measurement_control.py   # Controls the measurement process
│
├── gui/
│   ├── __init__.py
│   └── main_gui.py          # GUI setup for starting/stopping measurement
│
├── main.py                  # Entry point for starting the system
└── requirements.txt         # Project dependencies

Nach dem Login kopieren

Das folgende Diagramm zeigt den VDDM-Fluss.

main.py
  └── main_gui.py
        ├── Starts GUI interface and buttons
        └── measurement_control.py
              ├── Initializes sensor measurements
              ├── Controls start/stop of measurements
              └── sensor_fusion.py
                    ├── Manages sensor data collection and fusion
                    ├── Uses:
                    │    ├── bno055_measurement.py  (BNO055 sensor data collection)
                    │    └── elm327_measurement.py  (ELM327 OBD2 data collection)
                    └── Processes data with:
                         ├── filter.py  (Applies Butterworth low-pass filter)
                         └── visualize_data.py  (Formats and visualizes sensor data)

Nach dem Login kopieren

Die Hauptkomponenten von VDDM sind VDDM/fusion/sensor_fusion.py, VDDM/fusion/sensors/bno055_measurement.py und VDDM/fusion/sensors/elm327_measurement.py.

# sensor_fusion.py
import os
import sys
import importlib
from time import perf_counter
from collections import defaultdict
import pandas as pd
import datetime
import asyncio
import numpy as np



parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_dir)

from config import config_manager
from utils.tools import wait_process
from utils.visualize_data import format_sensor_fusion_data
from signalprocessing.filter import butterlowpass

config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml')


class SensorFactory:
    @staticmethod
    def create_sensor(sensor_type, config):
        try:
            # Import the appropriate sensor module based on sensor_type
            module = importlib.import_module(f"fusion.sensors.{sensor_type}_measurement")
            # Get a sensor class
            sensor_class = getattr(module, f"{sensor_type.upper()}")
            # Create a sensor instance and return
            return sensor_class(config)
        except (ImportError, AttributeError) as e:
            print(f"Error creating sensor {sensor_type}: {e}")
            return None


class Sensors:
    def __init__(self, config):
        self.config = config_manager.load_config(config_path)
        self.sensor_list = tuple(self.config.sensors.keys())
        self.sensor_instances = {}
        self.is_running = False


        self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz
        self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ
        self.SAVE_DATA_DIR = config.save_data_dir
        self.SAVE_BUF_CSVDATA_PATH = self.SAVE_DATA_DIR + "/" + "measurement_raw_data.csv"
        self.SEQUENCE_LENGTH = config.sequence_length # Windows size [s]
        # Buffer size is determined by the relation of sequence length and sampling frequency
        # Buffer secures data for SEQUENCE_LENGTH[s]
        self.MAX_DATA_BUF_LEN = self.SEQUENCE_LENGTH * self.SAMPLING_FREQUENCY_HZ
        self.FPASS = config.filter_params.fpass
        self.FSTOP = config.filter_params.fstop
        self.GPASS = config.filter_params.gpass
        self.GSTOP = config.filter_params.gstop
        self.is_filter = config.filter_params.is_filter
        self.is_show_real_time_data = config.is_show_real_time_data
        self.TIMEZONE = config.timezone
        self.all_data_columns_list = ()
        for sensor_name in self.sensor_list:
            self.all_data_columns_list += tuple(self.config["sensors"][sensor_name]["data_columns"])            




        self.data_buffer = pd.DataFrame()  # data buffer

        for sensor_type in self.sensor_list:
            sensor_config = self.config.sensors[sensor_type]
            sensor_instance = SensorFactory.create_sensor(sensor_type, sensor_config)
            if sensor_instance:
                self.sensor_instances[sensor_type] = sensor_instance


        if os.path.exists(self.SAVE_BUF_CSVDATA_PATH):
            os.remove(self.SAVE_BUF_CSVDATA_PATH)
            print(f"File  '{self.SAVE_BUF_CSVDATA_PATH}' was deleted for initialization")                


    def get_sensor(self, sensor_type):
        """
        Retrieve the sensor instance corresponding to the specified sensor type.

        Args:
            sensor_type (str): The type of the sensor to retrieve.

        Returns:
            object: The sensor instance corresponding to the specified sensor type.
                    Returns None if the sensor type does not exist.
        """
        return self.sensor_instances.get(sensor_type)

    def collect_data(self):
        """
        Collect data from all sensors.

        This method iterates over all sensor instances and collects data from each sensor.
        The collected data is stored in a dictionary where the keys are sensor types and
        the values are the data collected from the corresponding sensors.

        Returns:
            dict: A dictionary containing the collected data from all sensors.
                The keys are sensor types and the values are the data from each sensor.

        Raises:
            Exception: If an error occurs while collecting data from any sensor, the exception
                    is caught and printed.
        """
        data = {}
        try:
            for sensor_type, sensor in self.sensor_instances.items():
                # get data from sensors
                data[sensor_type] = sensor.get_data_from_sensor()
            return data
        except Exception as e:
            print(e)



    def on_change_start_measurement(self):
        """
        Start the measurement process.

        This method sets the is_running flag to True, indicating that the measurement
        process should start.
        """
        self.is_running = True

    def on_change_stop_measurement(self):
        """
        Stop the measurement process.

        This method sets the is_running flag to False, indicating that the measurement
        process should stop.
        """
        self.is_running = False


    def filtering(self, df, labellist):
        """
        Apply a low-pass filter to the specified columns in the DataFrame.

        This method applies a Butterworth low-pass filter to each column specified
        in the labellist. The "Time" column should be excluded from the labellist
        as it is not needed for the computation.

        Args:
            df (pd.DataFrame): The input DataFrame containing the data to be filtered.
            labellist (list of str): A list of column names to be filtered. The "Time"
                                     column should not be included in this list.

        Returns:
            pd.DataFrame: A new DataFrame with the filtered data.
        """
        filtered_df = df.copy()
        for labelname in labellist:
            # Ensure the column is converted to a numpy array
            x = df[labelname].to_numpy()
            filtered_df[labelname] = butterlowpass(
                x=x,  # Correctly pass the numpy array as 'x'
                fpass=self.FPASS,
                fstop=self.FSTOP,
                gpass=self.GPASS,
                gstop=self.GSTOP,
                fs=self.SAMPLING_FREQUENCY_HZ,
                dt=self.SAMPLING_TIME,
                checkflag=False,
                labelname=labelname
            )
        return filtered_df

    def convert_dictdata(self, current_time, sensor_data_dict):
        """
        Convert nested dictionary data from multiple sensors into a single DataFrame.

        This method converts nested dictionary data obtained from multiple sensors
        into a single dictionary and then converts it into a pandas DataFrame. The
        current_time information is associated with the data.

        Args:
            current_time (float): The current time at which the data was obtained.
            sensor_data_dict (dict): A nested dictionary containing data from multiple sensors.

        Returns:
            pd.DataFrame: A DataFrame containing the converted data with the current time information.
        """
        converted_data = {'Time': current_time}
        for sensor, data in sensor_data_dict.items():
            converted_data.update(data)

        converted_data = pd.DataFrame([converted_data])

        return converted_data



    async def update_data_buffer(self, dict_data):
        """
        Add data from sensors to the buffer and save it if necessary.

        This method adds the provided sensor data to the internal buffer. If the buffer
        exceeds the specified maximum length, the oldest data is saved to a CSV file
        and removed from the buffer.

        Args:
            dict_data (dict): The data from sensors to be added to the buffer.
        """

        # Add data to the buffer
        self.data_buffer = pd.concat([self.data_buffer, dict_data], ignore_index=True)

        # If the buffer exceeds the specified length, save the oldest data
        if len(self.data_buffer) > self.MAX_DATA_BUF_LEN:
            # Save the oldest data to a CSV file
            old_data = self.data_buffer.head(self.MAX_DATA_BUF_LEN)

            await self.save_data(old_data, self.SAVE_BUF_CSVDATA_PATH)

            # Update the buffer
            self.data_buffer = self.data_buffer.tail(len(self.data_buffer) - self.MAX_DATA_BUF_LEN)        



    async def save_data_async(self, df, path):
        """
        Save the DataFrame to a CSV file asynchronously.

        This method uses asyncio.to_thread to run the synchronous to_csv method
        in a separate thread, allowing it to be handled asynchronously.

        Args:
            df (pd.DataFrame): The DataFrame to be saved.
            path (str): The file path where the DataFrame should be saved.
        """
        if not os.path.isfile(path):
            await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=True, mode='w')
        else:
            await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=False, mode='a')

    async def save_data(self, df, path):
        """
        Save the DataFrame to a CSV file asynchronously.

        This method calls save_data_async to save the DataFrame to a CSV file
        asynchronously.

        Args:
            df (pd.DataFrame): The DataFrame to be saved.
            path (str): The file path where the DataFrame should be saved.
        """
        await self.save_data_async(df, path)



    async def finish_measurement_and_save_data(self):
        """
        Finish the measurement process and save the data.

        This method finalizes the measurement process by saving the buffered data
        to a CSV file. It also applies filtering if specified and saves the filtered
        data to a separate CSV file. The method handles time zone settings and
        generates a timestamp for the file names.

        The buffered data is saved to a temporary CSV file, which is then read back
        and saved to a final file path with a timestamp. If filtering is enabled,
        the filtered data is also saved. The temporary CSV file is deleted after
        the data is saved.

        Raises:
            Exception: If an error occurs during the file operations.
        """
        t_delta = datetime.timedelta(hours=9)
        TIMEZONE = datetime.timezone(t_delta, self.TIMEZONE)# You have to set your timezone
        now = datetime.datetime.now(TIMEZONE)
        timestamp = now.strftime('%Y%m%d%H%M%S')
        final_file_path = self.SAVE_BUF_CSVDATA_PATH.replace(self.SAVE_BUF_CSVDATA_PATH.split('/')[-1], 
                                                   timestamp + "/" + timestamp + '_' + 
                                                   self.SAVE_BUF_CSVDATA_PATH.split('/')[-1])
        await self.save_data_async(self.data_buffer, self.SAVE_BUF_CSVDATA_PATH)
        raw_df = pd.read_csv(self.SAVE_BUF_CSVDATA_PATH, header=0)
        os.makedirs(self.SAVE_DATA_DIR + "/" + timestamp, exist_ok=True)
        raw_df.to_csv(final_file_path, sep=',', encoding='utf-8', index=False, header=True)


        if self.is_filter:
            filt_df = self.filtering(df=raw_df, labellist=raw_df.columns[1:])
            filt_df.to_csv(final_file_path.replace('_raw_data.csv', '_filt_data.csv'), sep=',', encoding='utf-8', index=False, header=True)

        if os.path.exists(self.SAVE_BUF_CSVDATA_PATH):
            os.remove(self.SAVE_BUF_CSVDATA_PATH)
            print(f"File  '{self.SAVE_BUF_CSVDATA_PATH}' was deleted")
        else:
            print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' is not existed")






async def sensor_fusion_main():
    """
    Main function for sensor fusion.

    This function initializes the sensor fusion process, starts the measurement loop,
    collects data from multiple sensors, updates the data buffer, and handles real-time
    data display. It also calculates and prints the sampling delay and reliability rate
    upon termination.

    The main loop runs until the measurement process is stopped, either by an exception
    or a keyboard interrupt.

    Raises:
        Exception: If an error occurs during the measurement process, it is caught and printed.
        KeyboardInterrupt: If a keyboard interrupt occurs, the measurement process is stopped
                           and the data is saved.

    """
    print("Start sensor fusion main")
    config = config_manager.load_config(config_path)
    sensors = Sensors(config["master"])
    print("Called an instance of Sensors class")

    # sensors.start_all_measurements()
    sampling_counter = 0
    current_time = 0
    #sensors.is_running = True
    sensors.on_change_start_measurement()

    try:
        main_loop_start_time = None
        while sensors.is_running:
            iteration_start_time = perf_counter() # Start time of each iteration

            if main_loop_start_time is None:
                    main_loop_start_time = iteration_start_time  # initialize main loop start time

            current_time = perf_counter() - main_loop_start_time # Current time

            data = sensors.collect_data() # Get data from sensors                                        
            sampling_counter += 1 # Num of sampling

            converted_data = sensors.convert_dictdata(current_time, data) # Convert data to dataframe format
            # Update the data buffer. If it reaches the buffer limit, write the data to a CSV file.
            await sensors.update_data_buffer(converted_data)
            # Display data in real time. This process is executed on additional thread.
            if sensors.is_show_real_time_data:
                formatted_data = format_sensor_fusion_data(data, sensors.all_data_columns_list)    
                print("--------------------------------------------------------------------")
                print("Current Time is: {:.3f}".format(current_time))
                print(formatted_data)

            # Wait based on the sampling interval and execution time to maintain the sampling frequency.
            iteration_end_time = perf_counter()
            iteration_duration = iteration_end_time - iteration_start_time 
            print("Iteration duration is: {0} [s]".format(iteration_duration))
            sleep_time = max(0, sensors.SAMPLING_TIME - iteration_duration)
            if sleep_time > 0:
                wait_process(sleep_time)

    except Exception as e:
        print(e)

    except KeyboardInterrupt:
        sensors.on_change_stop_measurement()
        print("KeyboardInterrupt")
        await sensors.finish_measurement_and_save_data()

    finally:
        print("finish")
         # Compute delay of sampling
        main_loop_end_time = perf_counter() - main_loop_start_time
        print("Program terminated")
        print("main loop is ended. current time is: {:.3f}".format(current_time))
        print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time))
        print("sampling num is: {}".format(sampling_counter))



        # Compute ideal sampliing time
        ideal_time = ((sampling_counter - 1) / sensors.SAMPLING_FREQUENCY_HZ)
        # Cpmpute a delay
        delay_time = current_time - ideal_time
        # reliability rate
        sampling_reliability_rate = (delay_time / (sampling_counter / sensors.SAMPLING_FREQUENCY_HZ)) * 100


        print("sampling delay is: {:.3f} s".format(delay_time))
        print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate))


if __name__ == '__main__':
    asyncio.run(sensor_fusion_main())
Nach dem Login kopieren

sensor_fusion.py manages sensor data collection and processing using asynchronous operations. It dynamically creates sensor instances, collects data, applies filters, and saves the data to CSV files. The script also displays real-time data if needed and monitors performance metrics like sampling delay and reliability. The main function runs a loop that handles data collection, buffer updates, and performance reporting. It uses asynchronous operations to efficiently manage data saving and ensure smooth performance.

# bno055_measurement.py
import time
import numpy as np
import adafruit_bno055
import board
import os
import sys

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(parent_dir)

from config.config_manager import load_config
config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml')


class BNO055:
    def __init__(self, config):
        self.COLUMNS = config.data_columns    
        self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz
        self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ
        self.SAVE_DATA_DIR = config.save_data_dir
        self.SEQUENCE_LENGTH = config.sequence_length
        self.FPASS = config.filter_params.fpass
        self.FSTOP = config.filter_params.fstop
        self.GPASS = config.filter_params.gpass
        self.GSTOP = config.filter_params.gstop
        self.Isfilter = config.filter_params.is_filter

        self.IsStart = False
        self.IsStop = True
        self.Is_show_real_time_data = config.is_show_real_time_data 

        i2c_instance = board.I2C() # Create i2c instance
        self.bno055_sensor = adafruit_bno055.BNO055_I2C(i2c_instance) # create BNO055_I2C instance

    def calibration(self):
        print("Start calibration!")
        while not self.bno055_sensor.calibrated:
            print('SYS: {0}, Gyro: {1}, Accel: {2}, Mag: {3}'.format(*(self.bno055_sensor.calibration_status)))
            time.sleep(1)


    def calcEulerfromQuaternion(self, _w, _x, _y, _z):
        """
        Calculate Euler angles (roll, pitch, yaw) from quaternion components.

        This method converts quaternion components (_w, _x, _y, _z) into Euler angles
        (roll, pitch, yaw) in degrees. If any of the quaternion components are None,
        it returns (0.0, 0.0, 0.0) and prints an error message.

        Args:
            _w (float): The w component of the quaternion.
            _x (float): The x component of the quaternion.
            _y (float): The y component of the quaternion.
            _z (float): The z component of the quaternion.

        Returns:
            tuple: A tuple containing the roll, pitch, and yaw angles in degrees.
                If an error occurs, it returns (0.0, 0.0, 0.0) and prints an error message.
        """
        if None in (_w, _x, _y, _z):
            print(f"Error: One or more quaternion values are None: {_w}, {_x}, {_y}, {_z}")
            return 0.0, 0.0, 0.0

        try:
            sqw = _w ** 2
            sqx = _x ** 2
            sqy = _y ** 2
            sqz = _z ** 2
            COEF_EULER2DEG = 57.2957795131

            # Yaw
            term1 = 2.0 * (_x * _y + _z * _w)
            term2 = sqx - sqy - sqz + sqw
            yaw = np.arctan2(term1, term2)

            # Pitch
            term1 = -2.0 * (_x * _z - _y * _w)
            term2 = sqx + sqy + sqz + sqw
            pitch = np.arcsin(term1 / term2) if -1 <= term1 / term2 <= 1 else 0.0

            # Roll
            term1 = 2.0 * (_y * _z + _x * _w)
            term2 = -sqx - sqy + sqz + sqw
            roll = np.arctan2(term1, term2)

            return COEF_EULER2DEG * roll, COEF_EULER2DEG * pitch, COEF_EULER2DEG * yaw

        except Exception as e:
            print(f"Error in calcEulerfromQuaternion: {e}")
            return 0.0, 0.0, 0.0

    def get_data_from_sensor(self):
        """
        Retrieve data from the BNO055 sensor and return it as a dictionary.

        This method collects various sensor readings from the BNO055 sensor, including
        Euler angles, gyroscope data, linear acceleration, quaternion, magnetic field,
        and calibration status. It then constructs a dictionary with these values and
        returns only the columns specified in self.COLUMNS.

        Returns:
            dict: A dictionary containing the sensor data. Only the columns specified
                in self.COLUMNS are included in the returned dictionary.
        """
        # Get data
        euler_z, euler_y, euler_x = [val for val in self.bno055_sensor.euler]  # X: yaw, Y: pitch, Z: roll
        gyro_x, gyro_y, gyro_z = [val for val in self.bno055_sensor.gyro]  # Gyro[rad/s]
        linear_accel_x, linear_accel_y, linear_accel_z = [val for val in self.bno055_sensor.linear_acceleration]  # Linear acceleration[m/s^2]
        quaternion_1, quaternion_2, quaternion_3, quaternion_4 = [val for val in self.bno055_sensor.quaternion]  # Quaternion
        quat_roll, quat_pitch, quat_yaw = self.calcEulerfromQuaternion(quaternion_1, quaternion_2, quaternion_3, quaternion_4)  # Cal Euler angle from quaternion
        magnetic_x, magnetic_y, magnetic_z = [val for val in self.bno055_sensor.magnetic]  # Magnetic field
        calibstat_sys, calibstat_gyro, calibstat_accel, calibstat_mag = [val for val in self.bno055_sensor.calibration_status]  # Status of calibration


        data_dict = {
            "linear_accel_x": linear_accel_x,
            "linear_accel_y": linear_accel_y,
            "linear_accel_z": linear_accel_z,
            "gyro_x": gyro_x,
            "gyro_y": gyro_y,
            "gyro_z": gyro_z,
            "euler_x": euler_x,
            "euler_y": euler_y,
            "euler_z": euler_z,
            "quat_roll": quat_roll,
            "quat_pitch": quat_pitch,
            "quat_yaw": quat_yaw,
            "quaternion_1": quaternion_1,
            "quaternion_2": quaternion_2,
            "quaternion_3": quaternion_3,
            "quaternion_4": quaternion_4,
            "magnetic_x": magnetic_x,
            "magnetic_y": magnetic_y,
            "magnetic_z": magnetic_z,
            "calibstat_sys": calibstat_sys,
            "calibstat_gyro": calibstat_gyro,
            "calibstat_accel": calibstat_accel,
            "calibstat_mag": calibstat_mag
        }

        return {column: data_dict[column] for column in self.COLUMNS if column in data_dict}


def format_sensor_data(data, labels):
    """
    Format sensor data into a string for display.

    This method takes a dictionary of sensor data and a list of labels, and formats
    the data into a string where each label is followed by its corresponding value.
    If a value is None, it is replaced with the string "None". Each label-value pair
    is separated by " / ".

    Args:
        data (dict): The sensor data to format.
        labels (list of str): The list of labels to include in the formatted string.

    Returns:
        str: A formatted string containing the sensor data.
    """
    formatted_str = ""
    if isinstance(data, dict):
        for label in labels:
            value = data.get(label, None)
            if value is None:
                value = "None"
            else:
                value = f"{value:.4f}"
            formatted_str += f"{label}: {value} / "
    return formatted_str.rstrip(" / ")

def test_main():
    """
    Main function for testing sensor data collection and display.

    This function initializes the BNO055 sensor, starts a loop to collect data,
    formats the data for display, and prints it in real-time. It also calculates
    and prints the sampling delay and reliability rate upon termination.

    The main loop runs until interrupted by the user.

    Raises:
        KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated
                           and the final statistics are printed.
    """
    from utils.tools import wait_process
    from time import perf_counter
    import matplotlib.pyplot as plt

    print("Main start")

    config = load_config(config_path)
    meas_bno055 = BNO055(config.sensors['bno055'])

    start_time = perf_counter()
    sampling_counter = 0

    try:
        main_loop_start_time = perf_counter()
        while True:
            iteration_start_time = perf_counter()

            # Data acquisition process
            data = meas_bno055.get_data_from_sensor()
            current_time = perf_counter() - start_time
            sampling_counter += 1

            if meas_bno055.Is_show_real_time_data:
                formatted_data = format_sensor_data(data, meas_bno055.COLUMNS)
                # current time    
                print("--------------------------------------------------------------------")
                print("Current Time is: {:.3f}".format(current_time))
                print(formatted_data)

            # Wait to meet the sampling frequency based on the sampling interval and execution time
            elapsed_time = perf_counter() - iteration_start_time
            sleep_time = meas_bno055.SAMPLING_TIME - elapsed_time
            if sleep_time > 0:
                wait_process(sleep_time)

    except KeyboardInterrupt:
        print("Interrupted by user")

    finally:
        # Calculate the sampling delay from the number of samples and the current time
        main_loop_end_time = perf_counter() - main_loop_start_time
        print("Program terminated")
        print("main loop is ended. current time is: {:.3f}".format(current_time))
        print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time))
        print("sampling num is: {}".format(sampling_counter)) # Since it is 0-based, the number of samples is current_time + 1

        # Calculate the ideal sampling time
        ideal_time = ((sampling_counter - 1) / meas_bno055.SAMPLING_FREQUENCY_HZ)
        # Calculate the delay
        delay_time = current_time - ideal_time
        # The reliability rate is the delay divided by the sampling time
        sampling_reliability_rate = (delay_time / (sampling_counter / meas_bno055.SAMPLING_FREQUENCY_HZ)) * 100

        print("sampling delay is: {:.3f} s".format(delay_time))
        print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate))




if __name__ == '__main__':
    test_main()
Nach dem Login kopieren

This script interfaces with the BNO055 sensor to collect and process data. It retrieves sensor readings, such as Euler angles and gyroscope data, formats them for display, and prints real-time updates if configured. The script includes a calibration method and calculates Euler angles from quaternion data. It manages data acquisition with a defined sampling frequency and calculates sampling delay and reliability. The main loop continues until interrupted, reporting performance metrics upon termination.

# elm327_measurement.py

import obd
import os
import time
from collections import deque
import numpy as np
import pandas as pd
import datetime
import asyncio
import scipy
from scipy import signal
import matplotlib as plt
import sys
from collections import defaultdict
import random

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(parent_dir)

from config.config_manager import load_config
config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml')

class ELM327:
    def __init__(self, config):
        """
        Initialize the ELM327 class with configuration parameters.

        Args:
            config (dict): Configuration parameters for the ELM327.
        """
        self.COLUMNS = config.data_columns    
        self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz
        self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ
        self.SAVE_DATA_DIR = config.save_data_dir
        self.SEQUENCE_LENGTH = config.sequence_length
        self.FPASS = config.filter_params.fpass
        self.FSTOP = config.filter_params.fstop
        self.GPASS = config.filter_params.gpass
        self.GSTOP = config.filter_params.gstop
        self.Isfilter = config.filter_params.is_filter
        self.res = self.connect_to_elm327()

        self.is_offline = config.is_offline
        self.IsStart = False
        self.IsStop = True
        self.Is_show_real_time_data = config.is_show_real_time_data

    def initialize_BLE(self):
        """
        Initialize Bluetooth Low Energy (BLE) for ELM327 connection.
        """
        os.system('sudo hcitool scan')
        os.system('sudo hciconfig hci0 up')
        os.system('sudo rfcomm bind 0 8A:2A:D4:FF:38:F3')
        os.system('sudo rfcomm listen 0 1 &')

    def connect_to_elm327(self):
        """
        Establish a connection to the ELM327 device.

        Returns:
            res (obd.OBDStatus): The connection status of the ELM327 device.
        """
        res = None
        try:
            self.initialize_BLE()
            self.connection = obd.OBD()
            print(self.connection.status())
            res = self.connection.status()
            if res == obd.OBDStatus.CAR_CONNECTED:
                print("----------Connection establishment is successful!----------")
                return res
            else:
                print("----------Connection establishment failed!----------")
                print("End program. Please check settings of the computer and ELM327")
        except Exception as e:
            print("----------Exception!----------")
            print(e)
        finally:
            return res

    def get_data_from_sensor(self):
        """
        Retrieve data from the sensor.

        Returns:
            dict: A dictionary containing sensor data.
        """
        if self.is_offline:
            data = self.get_data_from_sensor_stub()
        else:
            # Retrieve data and save it in dictionary format
            data = {column: self.get_obd2_value(column) for column in self.COLUMNS}
        return data

    def get_obd2_value_debug(self, column):
        """
        Retrieve OBD-II value for a specific column with debug information.

        Args:
            column (str): The OBD-II command column.

        Returns:
            float or None: The value of the OBD-II command, or None if not available.
        """
        command = getattr(obd.commands, column, None)
        if command:
            response = self.connection.query(command)
            if response:
                print(f"Response for command '{command}': {response}")
                if response.value is not None:  # Check for None
                    print(f"Response value for command '{command}': {response.value}")
                    return response.value.magnitude
                else:
                    print(f"No value in response for command '{command}'")
            else:
                print(f"No response for command '{command}'")
        else:
            print(f"No command found for column '{column}'")
        return None

    def get_obd2_value(self, column):
        """
        Retrieve OBD-II value for a specific column.

        Args:
            column (str): The OBD-II command column.

        Returns:
            float or None: The value of the OBD-II command, or None if not available.
        """
        command = getattr(obd.commands, column, None)
        if command:
            response = self.connection.query(command)
            if response.value is not None:  # Check for None
                return response.value.magnitude
        return None

    def get_data_from_sensor_stub(self):
        """
        Generate stub data for the sensor.

        Returns:
            dict: A dictionary containing stub sensor data.
        """
        data_stub = {column: np.abs(np.random.randn()).astype(np.float32).item() for column in self.COLUMNS}        
        # Randomly insert None or 0.0
        if random.choice([True, False]):
            random_column = random.choice(self.COLUMNS)
            if random.choice([True, False]):
                data_stub[random_column] = None
            else:
                data_stub[random_column] = 0.0

        return data_stub

def format_data_for_display(data, labels):
    """
    Format sensor data for display.

    Args:
        data (dict): The sensor data to format.
        labels (list of str): The list of labels to include in the formatted string.

    Returns:
        str: A formatted string containing the sensor data.
    """
    formatted_str = ""
    for label, value in zip(labels, data.values()):
        if value is None:
            value = "None"
        else:
            value = f"{value:.4f}"
        formatted_str += f"{label}: {value} / "
    return formatted_str.rstrip(" / ")

def format_sensor_data(data, labels):
    """
    Format sensor data for display.

    Args:
        data (dict or list): The sensor data to format.
        labels (list of str): The list of labels to include in the formatted string.

    Returns:
        str: A formatted string containing the sensor data.
    """
    formatted_str = ""
    if isinstance(data, dict):
        for label in labels:
            value = data.get(label, None)
            if value is None:
                value = "None"
            else:
                value = f"{value:.4f}"
            formatted_str += f"{label}: {value} / "
    else:
        for label, value in zip(labels, data):
            if value is None:
                value = "None"
            else:
                value = f"{value:.4f}"
            formatted_str += f"{label}: {value} / "
    return formatted_str.rstrip(" / ")

def test_main():
    """
    Main function for testing sensor data collection and display.

    This function initializes the ELM327 sensor, starts a loop to collect data,
    formats the data for display, and prints it in real-time. It also calculates
    and prints the sampling delay and reliability rate upon termination.

    The main loop runs until interrupted by the user.

    Raises:
        KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated
                           and the final statistics are printed.
    """
    from utils.tools import wait_process
    from time import perf_counter
    import matplotlib.pyplot as plt

    print("Main start")
    config = load_config(config_path)
    meas_elm327 = ELM327(config.sensors['elm327'])
    # res = meas_elm327.connect_to_elm327()

    start_time = perf_counter()
    sampling_counter = 0
    try:
        main_loop_start_time = perf_counter()
        while True:
            iteration_start_time = perf_counter()

            # Data acquisition process
            data = meas_elm327.get_data_from_sensor()

            current_time = perf_counter() - start_time
            sampling_counter += 1

            if meas_elm327.Is_show_real_time_data:
                formatted_data = format_sensor_data(data, meas_elm327.COLUMNS)
                print("--------------------------------------------------------------------")
                print("Current Time is: {:.3f}".format(current_time))
                print(formatted_data)

            # Wait to meet the sampling frequency based on the sampling interval and execution time
            elapsed_time = perf_counter() - iteration_start_time
            sleep_time = meas_elm327.SAMPLING_TIME - elapsed_time
            if sleep_time > 0:
                wait_process(sleep_time)

    except KeyboardInterrupt:
        print("Interrupted by user")

    finally:
        main_loop_end_time = perf_counter() - main_loop_start_time
        print("Program terminated")
        print("main loop is ended. current time is: {:.3f}".format(current_time))
        print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time))
        print("sampling num is: {}".format(sampling_counter))

        # Calculate the ideal sampling time
        ideal_time = ((sampling_counter - 1) / meas_elm327.SAMPLING_FREQUENCY_HZ)
        # Calculate the delay
        delay_time = current_time - ideal_time
        # The reliability rate is the delay divided by the sampling time
        sampling_reliability_rate = (delay_time / (sampling_counter / meas_elm327.SAMPLING_FREQUENCY_HZ)) * 100

        print("sampling delay is: {:.3f} s".format(delay_time))
        print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate))

if __name__ == '__main__':
    test_main()
Nach dem Login kopieren

This script defines an ELM327 class for interfacing with an ELM327 device to collect sensor data via OBD-II. It initializes the connection, retrieves data, and formats it for display. The test_main function sets up the ELM327 sensor, collects data in a loop, and prints it in real-time. It also calculates and displays sampling delay and reliability rates upon interruption. The script handles both real-time data acquisition and simulated offline data for testing purposes.

7. Conclusion

In this post, I described the project focused on developing a measurement system to capture vehicle behavior. While the project is still ongoing, I have recognized the potential for capturing vehicle data with a focus on the vehicle dynamics domain. However, when it comes to measuring signals from vehicle ECUs, some limitations need to be considered, as the ELM327 may not reliably achieve sampling rates beyond 5 Hz with good accuracy. This issue could likely be resolved by using a more advanced OBD scanner, such as the OBDLink MX+.

8. Reference

[1] Adafruit. "Adafruit BNO055 Absolute Orientation Sensor."
https://learn.adafruit.com/adafruit-bno055-absolute-orientation-sensor/overview. Accessed September 3, 2024.

[2] Adafruit. "Adafruit_CircuitPython_BNO055." https://github.com/adafruit/Adafruit_CircuitPython_BNO055. Accessed September 3, 2024.

[3] Adafruit. "Adafruit_BNO055." https://github.com/adafruit/Adafruit_BNO055. Accessed September 3, 2024.

[4] Adafruit. "Adafruit BNO055 Demo." https://cdn-shop.adafruit.com/product-videos/1024x768/2472-04.mp4 Accessed September 3, 2024.

[5] Amazon. "Elm327 Launchh OBD2 Professional Bluetooth Scan Tool and Code Reader for Android and PC, Interface OBDII OBD2 Car Auto Diagnostic Scanner, Not Support J1850 VPW & J1850 PWM". https://a.co/d/5BFn4GN. Accessed September 3, 2024.

[6] MathWorks. "Coordinate Systems in Automated Driving Toolbox." https://www.mathworks.com/help/driving/ug/coordinate-systems.html. Accessed September 3, 2024.

[7] Apple. "Getting raw gyroscope events." https://developer.apple.com/documentation/coremotion/getting_raw_gyroscope_events. Accessed September 3, 2024.

[8] phyphox. "phyphox top page."https://phyphox.org/. Accessed September 3, 2024.

[9] Andrea Patrucco, Vehicle dynamics engineer presso Applus+ IDIADA. "Challenging your car's trip computer fuel efficiency figures." https://www.linkedin.com/pulse/challenging-your-cars-trip-computer-fuel-efficiency-figures-patrucco/. Accessed September 3, 2024.

[10] "Choosing OBDII adapter". https://www.carscanner.info/choosing-obdii-adapter/. Accessed September 3, 2024.

[11] "VDDM". https://github.com/Qooniee/VDDM/tree/master/. Accessed September 3, 2024.

Das obige ist der detaillierte Inhalt vonErstellen eines benutzerdefinierten Datenloggers mit Raspberry Pi für Fahrzeuge: Integration von BNO- und ELM-Sensoren. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage