In a previous post, we showed how to send and receive messages from IoT devices using an MQTT broker. In this post, we'll extend that idea to a real-world example.
Suppose you have an IoT device that measures temperature and humidity in a greenhouse (one is not hard to build with a Raspberry Pi or Arduino).
We want to monitor the greenhouse conditions remotely from another computer or, perhaps, a central logging service. The previous post included a Go implementation for sending messages, so we will extend that example.
Instead of just sending a string saying "temp is x, humidity is y", let's define a structure for the message and device. Consider that you have (or want to add in the future) a device to monitor moisture or rainfall and you want to connect that one as well.
To leave open the possibility of multiple devices and types, we need a data model to handle that.
    type Message struct {
        Time   time.Time `json:"time"`
        Device Device    `json:"device"`
    }

    type Device interface {
        ID() string
        Name() string
    }

    type TempRHDevice struct {
        Id         string  `json:"id"`
        DeviceName string  `json:"name,omitempty"`
        Temp       float32 `json:"temp,omitempty"`
        Rh         float32 `json:"rh,omitempty"`
    }

    func (t TempRHDevice) ID() string {
        return t.Id
    }

    func (t TempRHDevice) Name() string {
        return t.DeviceName
    }
The Message struct is what will be sent to the MQTT broker. We created an interface to handle the common attributes of our IoT devices and abstract the details of the specific devices.
The TempRHDevice is our device that measures temperature and humidity. It implements the Device interface, so it can be used in a Message.
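To illustrate how the interface leaves room for other sensors, here is a minimal sketch of a second device type plugged into the same Message. The names `MoistureDevice`, `Moisture`, and `describe` are hypothetical and only serve the illustration; they are not part of the original code.

```go
package main

import (
	"fmt"
	"time"
)

// Device and Message mirror the definitions shown above.
type Device interface {
	ID() string
	Name() string
}

type Message struct {
	Time   time.Time `json:"time"`
	Device Device    `json:"device"`
}

// MoistureDevice is a hypothetical soil-moisture sensor (an assumption
// made for this sketch, not a type from the original project).
type MoistureDevice struct {
	Id         string  `json:"id"`
	DeviceName string  `json:"name,omitempty"`
	Moisture   float32 `json:"moisture"`
}

func (m MoistureDevice) ID() string   { return m.Id }
func (m MoistureDevice) Name() string { return m.DeviceName }

// describe only relies on the Device interface, so it works for any
// device type that implements ID() and Name().
func describe(msg Message) string {
	return fmt.Sprintf("%s (%s)", msg.Device.Name(), msg.Device.ID())
}

func main() {
	msg := Message{
		Time:   time.Now(),
		Device: MoistureDevice{Id: "m-1", DeviceName: "bed 3", Moisture: 41.5},
	}
	fmt.Println(describe(msg)) // prints "bed 3 (m-1)"
}
```

Code that only needs the common attributes never has to know which concrete sensor produced the message.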
Next, we need to send the message to the broker. We'll use JSON in this example for its simplicity. Note that in a large-scale system with thousands of devices or more, we may want a more compact binary format instead.
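    message := generateRandomMessage()
    payload, err := json.Marshal(message)
    if err != nil {
        panic(err)
    }
    // Publish with QoS 0, not retained.
    token := client.Publish(topic, 0, false, payload)
    if token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
    }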
Go makes marshalling into JSON pretty easy. Once marshalled, the JSON message is sent to the broker.
What else might we want to do with the data once we have it? We could store it in a database, display it on a dashboard, or check the values for alarm conditions. For any of these, we'll need to convert the JSON into a usable form.
On the receiving side, we just need to unmarshal the JSON into a struct. We'll use a structure similar to the one on the sending side, but json.Unmarshal cannot populate an interface-typed field on its own, so we need a way to unmarshal into a concrete type rather than the Device interface in Message. Adding a custom UnmarshalJSON method on Message also makes the calling code a little cleaner.
    type rawMessage struct {
        Time   time.Time    `json:"time"`
        Device TempRHDevice `json:"device"`
    }

    func (m *Message) UnmarshalJSON(data []byte) error {
        var raw rawMessage
        if err := json.Unmarshal(data, &raw); err != nil {
            return err
        }
        m.Time = raw.Time
        m.Device = &raw.Device
        return nil
    }

    ...

    func processMsg(ctx context.Context, ....
    ...
        case msg, ok := <-input:
            if !ok {
                return
            }
            logger.Info().Msg(fmt.Sprintf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()))
            var iotMsg Message
            err := json.Unmarshal(msg.Payload(), &iotMsg)
            if err != nil {
                logger.Error().Err(err).Msg("Error unmarshalling Message")
            } else {
                out <- iotMsg
            }
    ...
It is worth pointing out here that this method gets complicated as more device types are added. For example, how will the UnmarshalJSON method know which device type the message contains? We could add some clever logic to UnmarshalJSON to detect the type.
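One way such detection logic could look is sketched below. It assumes the sender adds a `type` field to the payload, which the messages in this post do not currently carry; the `envelope` struct, the `decode` helper, the type strings, and `MoistureDevice` are all hypothetical names introduced for the illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type Device interface {
	ID() string
	Name() string
}

type TempRHDevice struct {
	Id         string  `json:"id"`
	DeviceName string  `json:"name,omitempty"`
	Temp       float32 `json:"temp,omitempty"`
	Rh         float32 `json:"rh,omitempty"`
}

func (t TempRHDevice) ID() string   { return t.Id }
func (t TempRHDevice) Name() string { return t.DeviceName }

// MoistureDevice is a hypothetical second device type.
type MoistureDevice struct {
	Id         string  `json:"id"`
	DeviceName string  `json:"name,omitempty"`
	Moisture   float32 `json:"moisture"`
}

func (m MoistureDevice) ID() string   { return m.Id }
func (m MoistureDevice) Name() string { return m.DeviceName }

type Message struct {
	Time   time.Time `json:"time"`
	Device Device    `json:"device"`
}

// envelope defers decoding of "device" until the assumed "type" field is known.
type envelope struct {
	Time   time.Time       `json:"time"`
	Type   string          `json:"type"`
	Device json.RawMessage `json:"device"`
}

func (m *Message) UnmarshalJSON(data []byte) error {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return err
	}
	var dev Device
	switch env.Type {
	case "temprh":
		var d TempRHDevice
		if err := json.Unmarshal(env.Device, &d); err != nil {
			return err
		}
		dev = d
	case "moisture":
		var d MoistureDevice
		if err := json.Unmarshal(env.Device, &d); err != nil {
			return err
		}
		dev = d
	default:
		return fmt.Errorf("unknown device type %q", env.Type)
	}
	m.Time = env.Time
	m.Device = dev
	return nil
}

// decode is a small helper around json.Unmarshal for the example.
func decode(payload string) (Message, error) {
	var msg Message
	err := json.Unmarshal([]byte(payload), &msg)
	return msg, err
}

func main() {
	msg, err := decode(`{"time":"2024-01-15T10:30:00Z","type":"moisture","device":{"id":"m-1","moisture":41.5}}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Device.ID()) // prints "m-1"
}
```

The cost of this approach is that every new device type means another case in the switch and a matching change on the sender.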
Alternatively, remember that MQTT clients can publish to many topics, and it is common practice to use a hierarchical naming convention for them. So with multiple device types in the greenhouse example, the recommended approach is to have each device type publish to its own topic. This is how we'll handle it going forward as we add new device types.
In the greenhouse example, the topic names could be structured like:
    /greenhouse/temprh/deviceid
    /greenhouse/moisture/deviceid
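On the sending side, the topic could be derived from the device itself. The following is a minimal sketch under that assumption; `topicFor` and `MoistureDevice` are hypothetical names, and the segment strings simply follow the layout shown above.

```go
package main

import "fmt"

type Device interface {
	ID() string
	Name() string
}

type TempRHDevice struct {
	Id         string
	DeviceName string
}

func (t TempRHDevice) ID() string   { return t.Id }
func (t TempRHDevice) Name() string { return t.DeviceName }

// MoistureDevice is a hypothetical second device type.
type MoistureDevice struct {
	Id         string
	DeviceName string
}

func (m MoistureDevice) ID() string   { return m.Id }
func (m MoistureDevice) Name() string { return m.DeviceName }

// topicFor maps a device to /greenhouse/<device type>/<device id>.
func topicFor(d Device) (string, error) {
	var kind string
	switch d.(type) {
	case TempRHDevice:
		kind = "temprh"
	case MoistureDevice:
		kind = "moisture"
	default:
		return "", fmt.Errorf("no topic defined for device type %T", d)
	}
	return "/greenhouse/" + kind + "/" + d.ID(), nil
}

func main() {
	topic, err := topicFor(TempRHDevice{Id: "th-1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(topic) // prints "/greenhouse/temprh/th-1"
}
```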
In MQTT, we can subscribe to topics using a wildcard topic, such as:
    if token := client.Subscribe("/greenhouse/#", 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }
This subscription matches every device in the greenhouse namespace. We would then just need to add logic to processMsg() that looks at the topic of each incoming message to decide how to unmarshal it.
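A minimal sketch of that topic-based dispatch might look like the following. It assumes the `/greenhouse/<device type>/<device id>` layout from above and the same JSON payload shape as on the sending side; `decodeByTopic`, `envelope`, and `MoistureDevice` are hypothetical names, and in the real subscriber the topic and payload would come from `msg.Topic()` and `msg.Payload()`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

type Device interface {
	ID() string
	Name() string
}

type TempRHDevice struct {
	Id         string  `json:"id"`
	DeviceName string  `json:"name,omitempty"`
	Temp       float32 `json:"temp,omitempty"`
	Rh         float32 `json:"rh,omitempty"`
}

func (t TempRHDevice) ID() string   { return t.Id }
func (t TempRHDevice) Name() string { return t.DeviceName }

// MoistureDevice is a hypothetical second device type.
type MoistureDevice struct {
	Id         string  `json:"id"`
	DeviceName string  `json:"name,omitempty"`
	Moisture   float32 `json:"moisture"`
}

func (m MoistureDevice) ID() string   { return m.Id }
func (m MoistureDevice) Name() string { return m.DeviceName }

type Message struct {
	Time   time.Time
	Device Device
}

// envelope keeps the device JSON raw until the topic tells us its type.
type envelope struct {
	Time   time.Time       `json:"time"`
	Device json.RawMessage `json:"device"`
}

// decodeByTopic picks the concrete device type from the second topic level.
func decodeByTopic(topic string, payload []byte) (Message, error) {
	// The leading slash would otherwise produce an empty first segment.
	parts := strings.Split(strings.TrimPrefix(topic, "/"), "/")
	if len(parts) != 3 || parts[0] != "greenhouse" {
		return Message{}, fmt.Errorf("unexpected topic %q", topic)
	}
	var env envelope
	if err := json.Unmarshal(payload, &env); err != nil {
		return Message{}, err
	}
	var dev Device
	switch parts[1] {
	case "temprh":
		var d TempRHDevice
		if err := json.Unmarshal(env.Device, &d); err != nil {
			return Message{}, err
		}
		dev = d
	case "moisture":
		var d MoistureDevice
		if err := json.Unmarshal(env.Device, &d); err != nil {
			return Message{}, err
		}
		dev = d
	default:
		return Message{}, fmt.Errorf("unknown device type %q in topic %q", parts[1], topic)
	}
	return Message{Time: env.Time, Device: dev}, nil
}

func main() {
	payload := []byte(`{"time":"2024-01-15T10:30:00Z","device":{"id":"th-1","temp":21.5,"rh":48}}`)
	msg, err := decodeByTopic("/greenhouse/temprh/th-1", payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Device.ID()) // prints "th-1"
}
```

With this layout the payload needs no extra type field, since the topic already carries that information.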
Now that we have a device message in a usable form, what should be done with it? In the next post in this series, we'll demonstrate our approach to persisting the message in PostgreSQL.
As usual, the full source code for the sender can be found here and the subscriber code can be found here.
Let me know your thoughts in the comments.
Thanks!