In einem früheren Beitrag haben wir gezeigt, wie man IoT-Gerätedaten von einem MQTT-Broker empfängt. In diesem Beitrag werden wir die Daten in einer Datenbank speichern.
In einem robusten System können wir uns dafür entscheiden, die Rohdatenereignisse in einem Data Lake zu speichern. Vielleicht werden wir das in Zukunft untersuchen; aber der Einfachheit halber speichern wir es vorerst in PostGres.
Im vorherigen Beitrag wurde der Empfang der Rohdaten und deren Unmarshalling in eine Struktur gezeigt, die bereits mit Gorm-Tags annotiert war. Gorm ist ein beliebtes ORM für Go. Wenn Sie damit nicht vertraut sind, finden Sie hier weitere Informationen.
type IoTDeviceMessage struct { BaseModel Time time.Time `json:"time" gorm:"index"` DeviceID string `json:"device_id"` DeviceType string `json:"device_type"` DeviceData json.RawMessage `json:"device_data"` }
Alles, was wir tun müssen, ist, die Postgres-Verbindung zu konfigurieren und dann Gorm zum Speichern der Ereignisdaten zu verwenden.
func setupPostgres(logger *zerolog.Logger) *Repository { dbHost := os.Getenv("POSTGRES_HOST") dbName := os.Getenv("POSTGRES_DB") dbPort := os.Getenv("POSTGRES_PORT") dbUser := os.Getenv("POSTGRES_USER") dbPassword := os.Getenv("POSTGRES_PASSWORD") dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC", dbHost, dbUser, dbPassword, dbName, dbPort) logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn)) db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) if err != nil { logger.Fatal().Err(err).Msg("failed to connect to database") } // Auto-migrate the schema err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{}) if err != nil { logger.Fatal().Err(err).Msg("failed to migrate models") } sqlDB, err := db.DB() sqlDB.SetMaxIdleConns(10) sqlDB.SetMaxOpenConns(100) sqlDB.SetConnMaxLifetime(time.Hour) repo := NewRepository(db, logger) return repo }
Hier richten wir die Postgres-Verbindung ein. Beachten Sie, dass wir Umgebungsvariablen zum Speichern unserer vertraulichen Informationen verwenden. Dies ist eine bewährte Vorgehensweise für Produktionssysteme, unabhängig davon, ob sie in Containern vorliegen oder nicht.
Wir initialisieren außerdem eine Struktur namens Repository. Diese Struktur enthält unsere eigentlichen Speicher- und Abrufmethoden. Dies bietet uns eine gewisse Trennung von der Postgres-Konfiguration.
type Repository struct { db *gorm.DB logger *zerolog.Logger } func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository { return &Repository{db: db, logger: logger} } func (r *Repository) Close() { sqlDb, err := r.db.DB() if err != nil { r.logger.Error().Err(err).Msg("failed to close database") return } _ = sqlDb.Close() } ... // Message-related functions func (r *Repository) CreateMessage(message *IoTDeviceMessage) error { return r.db.Create(message).Error } func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) { var messages []IoTDeviceMessage err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error return messages, err } func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error { return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error }
Jetzt muss die Nachricht nur noch persistiert werden. Da wir das Pipeline-Muster zum Verarbeiten der Nachrichten verwenden, werden wir den Persistenzschritt als neue Stufe in der Pipeline hinzufügen.
// pipeline stage to persist the message func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage { out := make(chan IoTRawDeviceMessage) go func() { defer close(out) for iotMsg := range input { logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID)) err := repo.CreateMessage(&iotMsg) if err != nil { logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage") } } }() return out } ... finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan)) for iotMsg := range finalChan { // now we have the IoTRawDeviceMessage that has been persisted logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg)) // do something like check for alert conditions }
Das ist alles.
Den Code dafür finden Sie hier. Sie können es mit demselben Herausgebercode wie im vorherigen Beitrag verwenden. Stellen Sie sicher, dass Sie Ihre Postgres-Einstellungen als Umgebungsvariablen konfigurieren.
Das obige ist der detaillierte Inhalt vonSpeichern Sie IoT-Gerätedaten. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!