handlePublish method
Handles the receipt of publish messages from a message broker.
Implementation
/// Handles a publish message received from the broker.
///
/// Processing depends on the quality of service in the message header:
///
/// * `atMostOnce` - the message is passed to [_fireMessageReceived]; no
///   response is sent.
/// * `atLeastOnce` - the message is passed to [_fireMessageReceived], then
///   either a publish acknowledgement is sent through [connectionHandler]
///   or, when [manuallyAcknowledgeQos1] is set, the message is stored in
///   [awaitingManualAcknowledge] keyed by its message identifier.
/// * `exactlyOnce` - the message is stored in [receivedMessages] (unless
///   one with the same identifier is already held) and a publish received
///   message is sent through [connectionHandler]. It is not passed to
///   [_fireMessageReceived] by this method.
///
/// Any other quality of service value is ignored.
///
/// Returns `true` unless an [Exception] is thrown while processing, in
/// which case the exception is logged and `false` is returned.
///
/// The cast of [msg] is performed outside the `try` block, so a null or
/// non-publish [msg] throws instead of returning `false`. Failures that are
/// [Error]s rather than [Exception]s (for example a failed `!` null
/// assertion) are not caught either.
bool handlePublish(MqttMessage? msg) {
  final pubMsg = msg as MqttPublishMessage;
  var publishSuccess = true;
  try {
    final topic = PublicationTopic(pubMsg.variableHeader!.topicName);
    MqttLogger.log(
        'PublishingManager::handlePublish - publish received from broker with topic $topic');
    final qos = pubMsg.header!.qos;
    if (qos == MqttQos.atMostOnce) {
      // QoS 0 requires no response, just hand the message to whoever is
      // waiting for it.
      _fireMessageReceived(topic, msg);
    } else if (qos == MqttQos.atLeastOnce) {
      // QoS 1 requires an acknowledgement. Hand the message over first,
      // then deal with the acknowledgement.
      _fireMessageReceived(topic, msg);
      final messageIdentifier = pubMsg.variableHeader!.messageIdentifier;
      if (manuallyAcknowledgeQos1) {
        // The user has taken responsibility for acknowledging, remember the
        // message until they do.
        awaitingManualAcknowledge[messageIdentifier!] = pubMsg;
      } else {
        final ackMsg =
            MqttPublishAckMessage().withMessageIdentifier(messageIdentifier);
        connectionHandler!.sendMessage(ackMsg);
      }
    } else if (qos == MqttQos.exactlyOnce) {
      // QoS 2 means we can't give the message away yet, a handshake with
      // the broker has to complete first. If we already hold a message with
      // this identifier it is being republished because of a handshake
      // breakdown; the copy we already hold is kept and the publish
      // received response is simply sent again.
      final messageIdentifier = pubMsg.variableHeader!.messageIdentifier;
      receivedMessages.putIfAbsent(messageIdentifier, () => pubMsg);
      final pubRecv =
          MqttPublishReceivedMessage().withMessageIdentifier(messageIdentifier);
      connectionHandler!.sendMessage(pubRecv);
    }
  } on Exception catch (e) {
    // Report the failure to the caller through the return value, but leave
    // a trace of what went wrong rather than discarding it.
    MqttLogger.log(
        'PublishingManager::handlePublish - exception raised processing publish - $e');
    publishSuccess = false;
  }
  return publishSuccess;
}