Hey, in this quick tutorial (and a reminder to myself for future use), I’m going to show you how to test with MQTT locally using docker-compose. This can easily be replicated using docker run as well, but the scope of this post is to set up Home Assistant locally for testing an IoT project that I’m working on. The purpose of using MQTT is to receive updates from sensors that will be connected to a Golang application. (I may eventually move away from Golang and go to Python if the friction to integrate my sensors is too much.)
For more info on MQTT visit – https://mqtt.org/
The aim is simple: to be able to subscribe to a topic and publish to a topic using the mosquitto_sub and mosquitto_pub commands, and also to show you how to use the Golang library to get the same result.
What you need:
- Docker + docker compose installed
- Golang installed
- A basic concept of how MQTT works
the docker-compose file:
# Local Mosquitto broker for testing.
# The mounted mosquitto.conf needs a `listener 1883` entry and, for this
# tutorial's unauthenticated clients, `allow_anonymous true` (Mosquitto 2.x
# otherwise only serves localhost inside the container).
version: '3'
services:
  mqtt:
    container_name: eclipse-mosquitto
    image: eclipse-mosquitto:2.0.12
    volumes:
      - ./mosquitto_conf/mosquitto.conf:/mosquitto/config/mosquitto.conf
    ports:
      # Quoted so YAML never interprets HOST:CONTAINER pairs as numbers.
      - "1883:1883"
      # 9001 is commonly used for MQTT over WebSockets, if the config enables such a listener.
      - "9001:9001"
# Subscribe to every topic ("#" is the MQTT multi-level wildcard).
docker run --rm -it --link eclipse-mosquitto --net home_assistant_config_default eclipse-mosquitto mosquitto_sub \
  -h eclipse-mosquitto -t "#"
Where:
-t = topic
-h = host
--net = network to use
--link = the container to link to
# Publish the message "{on}" to the topic "topic/test".
docker run --rm -it --link eclipse-mosquitto \
  --net home_assistant_config_default eclipse-mosquitto mosquitto_pub \
  -h eclipse-mosquitto -t "topic/test" -m "{on}"
-m = message
This will allow you to see if the MQTT container is running correctly.
Now, for the meatier stuff.
I created two files: service.go and main.go.
service.go contains my MQTT functions.
// MQTT wraps a paho MQTT client together with the handler types and logger
// this service intends to use.
type MQTT struct {
	// mqttClient is the client used by Publish and Subscribe.
	mqttClient mqtt.Client
	// The handler types below are embedded but are not assigned anywhere in
	// the code shown here.
	mqtt.MessageHandler // TODO: figure out how to register these funcs
	mqtt.ConnectionLostHandler
	mqtt.OnConnectHandler
	mqtt.ConnectionAttemptHandler
	mqtt.ReconnectHandler
	// logger is not set by NewMQTTClient, so it stays nil unless assigned elsewhere.
	logger *log.Logger
}
// NewMQTTClient connects to the MQTT broker at tcp://broker:port and returns
// an MQTT service wrapping the connected client.
//
// username and password are applied to the connection only when non-empty, so
// passing "" for both connects anonymously (the broker must allow that).
//
// The call blocks until the connection attempt completes. On failure it
// returns a nil service and the connection error wrapped with the broker
// address.
func NewMQTTClient(port int, broker string, username, password string) (*MQTT, error) {
	options := mqtt.NewClientOptions()
	options.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	// Previously the credentials were accepted but silently ignored.
	if username != "" {
		options.SetUsername(username)
	}
	if password != "" {
		options.SetPassword(password)
	}

	client := mqtt.NewClient(options)
	token := client.Connect()
	token.Wait()
	if err := token.Error(); err != nil {
		return nil, fmt.Errorf("connecting to MQTT broker %s:%d: %w", broker, port, err)
	}
	return &MQTT{mqttClient: client}, nil
}
// Publish sends payload to topic with QoS 0 and the retained flag unset. It
// blocks until the publish token completes and returns the token's error,
// which is nil on success.
func (mq *MQTT) Publish(topic string, payload []byte) error {
	tok := mq.mqttClient.Publish(topic, 0, false, payload)
	tok.Wait()
	return tok.Error()
}
// Subscribe registers a QoS 0 subscription on topic. Every message delivered
// to the subscription is acknowledged and its payload is sent on data; the
// send blocks the handler until a receiver takes the payload (or the channel
// has buffer space). The subscription token is not waited on or checked.
func (mq *MQTT) Subscribe(topic string, data chan []byte) {
	forward := func(_ mqtt.Client, msg mqtt.Message) {
		msg.Ack()
		data <- msg.Payload()
	}
	mq.mqttClient.Subscribe(topic, 0, forward)
}
main.go
// main connects to the local broker, subscribes to a topic, publishes num JSON
// messages to that same topic (one per second) and prints each message as it
// arrives back from the broker. It exits once num messages have been received
// or a deadline passes, whichever comes first.
func main() {
	var broker = "localhost"
	var port = 1883
	fmt.Println("starting application")
	// No username or password is used, as allow_anonymous is set to true.
	mqttService, err := NewMQTTClient(port, broker, "", "")
	if err != nil {
		panic(err)
	}

	topic := "sensor_data/temperature"
	type publishMessage struct {
		Message string `json:"message"`
	}
	num := 10

	// Buffered so the subscription handler does not stall while main is busy
	// printing. The channel is never closed: the handler is the sender, and
	// closing it from the receiving side could panic a late delivery.
	respDataChan := make(chan []byte, num)

	// Register the subscription before the first publish. Starting it in a
	// goroutine raced the publisher, so the first QoS 0 message could be lost
	// and the program would then wait forever for it.
	mqttService.Subscribe(topic, respDataChan)

	// wg tracks the publisher goroutine so main does not exit underneath it.
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < num; i++ {
			fmt.Println("publishing to topic")
			message, err := json.Marshal(&publishMessage{Message: fmt.Sprintf("message %d", i)})
			if err != nil {
				fmt.Println(err)
				continue
			}
			if err := mqttService.Publish(topic, message); err != nil {
				fmt.Println(err)
			}
			time.Sleep(time.Second)
		}
	}()

	// Publishing takes about num seconds; allow a short grace period on top so
	// a dropped message cannot hang the program.
	deadline := time.NewTimer(time.Duration(num)*time.Second + 5*time.Second)
	defer deadline.Stop()

receive:
	for received := 0; received < num; {
		select {
		case respData := <-respDataChan:
			received++
			fmt.Printf("got response %v \n", string(respData))
		case <-deadline.C:
			fmt.Println("timed out waiting for messages")
			break receive
		}
	}

	// The publisher may still be in its final one-second sleep.
	wg.Wait()
	fmt.Println("program exited")
}
mqtt "github.com/eclipse/paho.mqtt.golang"
Be sure to import the library (shown above).
If you’re new to Golang, don’t be alarmed by the use of goroutines and channels; it was only done this way to show publishing and subscribing in the same application.
This could also have easily been done using two applications, one as a publisher and the other as a subscriber.
// Publisher goroutine: sends num JSON messages to topic, pausing one second
// after each publish.
go func() {
	for i := 0; i < num; i++ {
		fmt.Println("publishing to topic")
		// The Marshal error is discarded here.
		message, _ := json.Marshal(&publishMessage{Message: fmt.Sprintf("message %d", i)})
		err := mqttService.Publish(topic, message)
		if err != nil {
			// A failed publish is only printed; the loop carries on.
			fmt.Println(err)
		}
		time.Sleep(time.Second)
	}
}()
This code block loops inside a goroutine and publishes to the topic, and we set up another goroutine to read from the topic.
// Subscriber goroutine: registers the subscription on topic so that each
// received payload is sent on respDataChan.
go func() {
	mqttService.Subscribe(topic, respDataChan)
}()
This is my starting place; I do plan to write some more as I learn and set up for my IoT project.
If you have any questions, feel free to reach out to me, or check out some other posts below.