前の投稿では、MQTT ブローカーから IoT デバイス データを受信する方法を示しました。この投稿では、データをデータベースに保存します。
堅牢なシステムでは、生データ イベントをデータ レイクに保存することを選択する場合があります。おそらく、将来的にはそれを調査することになるでしょう。ただし、今は簡単にするために PostGres に保存します。
前の投稿では、生データを受信し、すでに gorm タグで注釈が付けられている構造体にアンマーシャリングする方法を示しました。 Gorm は Go の人気のある ORM です。よくわからない場合は、ここで詳細を確認してください。
type IoTDeviceMessage struct { BaseModel Time time.Time `json:"time" gorm:"index"` DeviceID string `json:"device_id"` DeviceType string `json:"device_type"` DeviceData json.RawMessage `json:"device_data"` }
つまり、Postgres 接続を設定し、gorm を使用してイベント データを保存するだけです。
func setupPostgres(logger *zerolog.Logger) *Repository { dbHost := os.Getenv("POSTGRES_HOST") dbName := os.Getenv("POSTGRES_DB") dbPort := os.Getenv("POSTGRES_PORT") dbUser := os.Getenv("POSTGRES_USER") dbPassword := os.Getenv("POSTGRES_PASSWORD") dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC", dbHost, dbUser, dbPassword, dbName, dbPort) logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn)) db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) if err != nil { logger.Fatal().Err(err).Msg("failed to connect to database") } // Auto-migrate the schema err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{}) if err != nil { logger.Fatal().Err(err).Msg("failed to migrate models") } sqlDB, err := db.DB() sqlDB.SetMaxIdleConns(10) sqlDB.SetMaxOpenConns(100) sqlDB.SetConnMaxLifetime(time.Hour) repo := NewRepository(db, logger) return repo }
ここでは Postgres 接続をセットアップします。機密情報を保存するために環境変数を使用していることに注意してください。これは、コンテナ化されているかどうかに関係なく、運用システムにとって良い習慣です。
リポジトリと呼ばれる構造体も初期化しています。この構造体には、実際の保存メソッドと取得メソッドが含まれています。これにより、postgres 構成からある程度分離できるようになります。
type Repository struct { db *gorm.DB logger *zerolog.Logger } func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository { return &Repository{db: db, logger: logger} } func (r *Repository) Close() { sqlDb, err := r.db.DB() if err != nil { r.logger.Error().Err(err).Msg("failed to close database") return } _ = sqlDb.Close() } ... // Message-related functions func (r *Repository) CreateMessage(message *IoTDeviceMessage) error { return r.db.Create(message).Error } func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) { var messages []IoTDeviceMessage err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error return messages, err } func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error { return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error }
あとは、メッセージを永続化するだけです。パイプライン パターンを使用してメッセージを処理しているため、パイプラインの新しいステージに永続化ステップを追加します。
// pipeline stage to persist the message func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage { out := make(chan IoTRawDeviceMessage) go func() { defer close(out) for iotMsg := range input { logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID)) err := repo.CreateMessage(&iotMsg) if err != nil { logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage") } } }() return out } ... finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan)) for iotMsg := range finalChan { // now we have the IoTRawDeviceMessage that has been persisted logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg)) // do something like check for alert conditions }
必要なのはこれだけです。
このコードはここにあります。前回の投稿と同じ発行者コードで使用できます。 Postgres 設定を必ず環境変数として構成してください。
以上がIoTデバイスのデータを保存するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。