1
0

Implement MQTT connection and message publishing based on CO levels; refactor air quality status handling

This commit is contained in:
Jose
2025-05-16 13:05:34 +02:00
parent a50716c483
commit 51862cf0a8
5 changed files with 47 additions and 34 deletions

View File

@@ -8,9 +8,11 @@ import java.util.Set;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle; import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise; import io.vertx.core.Promise;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router; import io.vertx.ext.web.Router;
@@ -19,6 +21,8 @@ import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions; import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.handler.BodyHandler; import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler; import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import net.miarma.contaminus.common.ConfigManager; import net.miarma.contaminus.common.ConfigManager;
import net.miarma.contaminus.common.Constants; import net.miarma.contaminus.common.Constants;
import net.miarma.contaminus.entities.COValue; import net.miarma.contaminus.entities.COValue;
@@ -34,12 +38,14 @@ public class LogicLayerAPIVerticle extends AbstractVerticle {
private ConfigManager configManager; private ConfigManager configManager;
private final Gson gson = new GsonBuilder().serializeNulls().create(); private final Gson gson = new GsonBuilder().serializeNulls().create();
private RestClientUtil restClient; private RestClientUtil restClient;
private MqttClient mqttClient;
public LogicLayerAPIVerticle() { public LogicLayerAPIVerticle() {
this.configManager = ConfigManager.getInstance(); this.configManager = ConfigManager.getInstance();
WebClientOptions options = new WebClientOptions() WebClientOptions options = new WebClientOptions()
.setUserAgent("ContaminUS"); .setUserAgent("ContaminUS");
this.restClient = new RestClientUtil(WebClient.create(Vertx.vertx(), options)); this.restClient = new RestClientUtil(WebClient.create(Vertx.vertx(), options));
this.mqttClient = MqttClient.create(Vertx.vertx(), new MqttClientOptions().setAutoKeepAlive(true));
} }
@Override @Override
@@ -146,6 +152,28 @@ public class LogicLayerAPIVerticle extends AbstractVerticle {
WeatherValue weatherValue = gson.fromJson(weather.toString(), WeatherValue.class); WeatherValue weatherValue = gson.fromJson(weather.toString(), WeatherValue.class);
COValue coValue = gson.fromJson(co.toString(), COValue.class); COValue coValue = gson.fromJson(co.toString(), COValue.class);
float coAmount = coValue.getValue();
String topic = buildTopic(Integer.parseInt(groupId), deviceId, "matrix");
if (coAmount >= 80.0f) {
mqttClient.connect(1883, "miarma.net", ar -> {
if(ar.succeeded()) {
Constants.LOGGER.info("Connected to MQTT broker");
mqttClient.publish(topic, Buffer.buffer("ECO"), MqttQoS.AT_LEAST_ONCE, false, false);
} else {
Constants.LOGGER.error("Error connecting to MQTT broker: " + ar.cause().getMessage());
}
});
} else {
mqttClient.connect(1883, "miarma.net", ar -> {
if(ar.succeeded()) {
Constants.LOGGER.info("Connected to MQTT broker");
mqttClient.publish(topic, Buffer.buffer("GAS"), MqttQoS.AT_LEAST_ONCE, false, false);
} else {
Constants.LOGGER.error("Error connecting to MQTT broker: " + ar.cause().getMessage());
}
});
}
gpsValue.setDeviceId(deviceId); gpsValue.setDeviceId(deviceId);
weatherValue.setDeviceId(deviceId); weatherValue.setDeviceId(deviceId);
coValue.setDeviceId(deviceId); coValue.setDeviceId(deviceId);
@@ -168,4 +196,10 @@ public class LogicLayerAPIVerticle extends AbstractVerticle {
}) })
.onFailure(err -> context.fail(500, err)); .onFailure(err -> context.fail(500, err));
} }
private String buildTopic(int groupId, String deviceId, String topic)
{
String topicString = "group/" + groupId + "/device/" + deviceId + "/" + topic;
return topicString;
}
} }

View File

@@ -27,12 +27,6 @@ struct SensorInfo
String type; String type;
}; };
enum AirQualityStatus
{
GOOD,
BAD
};
void readMQ7(); void readMQ7();
void readBME280(); void readBME280();
void readGPS(); void readGPS();

View File

@@ -1,6 +1,7 @@
#include "MqttClient.hpp" #include "MqttClient.hpp"
extern WiFiClient wifiClient; extern WiFiClient wifiClient;
extern const char *currentMessage;
PubSubClient client(wifiClient); PubSubClient client(wifiClient);
@@ -19,9 +20,14 @@ void MQTT_OnReceived(char *topic, byte *payload, unsigned int length)
content.concat((char)payload[i]); content.concat((char)payload[i]);
} }
#ifdef DEBUG if(content == "ECO")
Serial.println(content); {
#endif currentMessage = "Solo vehiculos electricos/hibridos";
}
else
{
currentMessage = "Todo tipo de vehiculos";
}
} }
void MQTT_Init(const char *MQTTServerAddress, uint16_t MQTTServerPort) void MQTT_Init(const char *MQTTServerAddress, uint16_t MQTTServerPort)
@@ -38,7 +44,9 @@ void MQTT_Connect(const char *MQTTClientName)
if (client.connect(MQTTClientName, USER, MQTT_PASSWORD)) if (client.connect(MQTTClientName, USER, MQTT_PASSWORD))
{ {
String statusTopic = buildTopic(GROUP_ID, String(DEVICE_ID, HEX), "status"); String statusTopic = buildTopic(GROUP_ID, String(DEVICE_ID, HEX), "status");
String matrixTopic = buildTopic(GROUP_ID, String(DEVICE_ID, HEX), "matrix");
client.subscribe(statusTopic.c_str()); client.subscribe(statusTopic.c_str());
client.subscribe(matrixTopic.c_str());
client.publish(statusTopic.c_str(), "connected"); client.publish(statusTopic.c_str(), "connected");
} }
#ifdef DEBUG #ifdef DEBUG

View File

@@ -28,6 +28,6 @@ GPSData_t GPS_Read()
GPSData_t GPS_Read_Fake() GPSData_t GPS_Read_Fake()
{ {
float rnd = random(-0.005, 0.005); float rnd = random(-0.0005, 0.0005);
return {37.358201f + rnd, -5.986640f + rnd}; return {37.358201f + rnd, -5.986640f + rnd};
} }

View File

@@ -2,8 +2,6 @@
const uint32_t DEVICE_ID = getChipID(); const uint32_t DEVICE_ID = getChipID();
const int GROUP_ID = 1; const int GROUP_ID = 1;
const char ALL_VEHICLES[] = "Todo tipo de vehiculos";
const char ELECTRIC_VEHICLES[] = "Solo vehiculos electricos/hibridos";
const char *currentMessage = nullptr; const char *currentMessage = nullptr;
const String id = "CUS-" + String(DEVICE_ID, HEX); const String id = "CUS-" + String(DEVICE_ID, HEX);
@@ -18,7 +16,6 @@ extern MD_Parola display;
MQ7Data_t mq7Data; MQ7Data_t mq7Data;
BME280Data_t bme280Data; BME280Data_t bme280Data;
GPSData_t gpsData; GPSData_t gpsData;
AirQualityStatus currentAirStatus = GOOD;
void setup() void setup()
{ {
@@ -38,7 +35,7 @@ void setup()
MAX7219_Init(); MAX7219_Init();
Serial.println("Display inicializado"); Serial.println("Display inicializado");
writeMatrix(ALL_VEHICLES); writeMatrix(currentMessage);
} }
void loop() void loop()
@@ -79,23 +76,7 @@ void loop()
void readMQ7() void readMQ7()
{ {
const float CO_THRESHOLD = 100.0f; const float CO_THRESHOLD = 100.0f;
mq7Data = MQ7_Read(); mq7Data = MQ7_Read();
AirQualityStatus newStatus = (mq7Data.co >= CO_THRESHOLD) ? BAD : GOOD;
if (newStatus != currentAirStatus)
{
currentAirStatus = newStatus;
if (currentAirStatus == BAD)
{
writeMatrix(ELECTRIC_VEHICLES);
}
else
{
writeMatrix(ALL_VEHICLES);
}
}
} }
void readBME280() void readBME280()
@@ -110,10 +91,6 @@ void readGPS()
void writeMatrix(const char *message) void writeMatrix(const char *message)
{ {
if (currentMessage == message)
return;
currentMessage = message;
#ifdef DEBUG #ifdef DEBUG
Serial.println("Escribiendo en el display..."); Serial.println("Escribiendo en el display...");
#endif #endif