Implement bulk of TtnHabBridge, add configuration.

koppelting
Bertrik Sikken 2017-08-22 02:14:50 +02:00
rodzic 87ac94d809
commit 2fa6b004aa
4 zmienionych plików z 333 dodań i 6 usunięć

Wyświetl plik

@ -0,0 +1,17 @@
package nl.sikken.bertrik;
/**
* Configuration interface for the application.
*/
public interface ITtnHabBridgeConfig {
String getHabitatUrl();
int getHabitatTimeout();
String getMqttServerUrl();
String getMqttClientId();
String getMqttUserName();
char[] getMqttPassword();
String getMqttTopic();
}

Wyświetl plik

@ -1,23 +1,184 @@
package nl.sikken.bertrik;
import java.io.File;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.sikken.bertrik.hab.Sentence;
import nl.sikken.bertrik.hab.SodaqOnePayload;
import nl.sikken.bertrik.hab.habitat.HabReceiver;
import nl.sikken.bertrik.hab.habitat.HabitatUploader;
import nl.sikken.bertrik.hab.habitat.IHabitatRestApi;
import nl.sikken.bertrik.hab.habitat.Location;
import nl.sikken.bertrik.hab.ttn.MqttData;
import nl.sikken.bertrik.hab.ttn.MqttGateway;
/**
* @author bertrik
* Bridge between the-things-network and the habhub network.
*/
public final class TtnHabBridge {
private static final Logger LOG = LoggerFactory.getLogger(TtnHabBridge.class);
private static final String CONFIG_FILE = "ttnhabbridge.properties";
private final ITtnHabBridgeConfig config;
private final HabitatUploader uploader;
private final ObjectMapper mapper;
private MqttClient mqttClient;
/**
* Main application entry point.
*
* @param arguments application arguments (none taken)
* @throws IOException in case of a problem reading a config file
* @throws MqttException in case of a problem starting MQTT client
*/
public static void main(String arguments) {
final TtnHabBridge app = new TtnHabBridge();
app.run();
public static void main(String[] arguments) throws IOException, MqttException {
final ITtnHabBridgeConfig config = readConfig(new File(CONFIG_FILE));
final TtnHabBridge app = new TtnHabBridge(config);
app.start();
Runtime.getRuntime().addShutdownHook(new Thread(app::stop));
}
/**
* Runs the "main loop".
* Constructor.
*
* @param config the application configuration
*/
private void run() {
private TtnHabBridge(ITtnHabBridgeConfig config) {
this.config = config;
final IHabitatRestApi restApi = HabitatUploader.newRestClient(config.getHabitatUrl(), config.getHabitatTimeout());
this.uploader = new HabitatUploader(restApi);
this.mapper = new ObjectMapper();
}
/**
* Starts the application.
* @throws MqttException in case of a problem starting MQTT client
*/
private void start() throws MqttException {
LOG.info("Starting TTN-HAB bridge application");
// start sub-modules
uploader.start();
// start MQTT client
this.mqttClient = new MqttClient(config.getMqttServerUrl(), config.getMqttClientId());
final MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(config.getMqttUserName());
options.setPassword(config.getMqttPassword());
options.setAutomaticReconnect(true);
mqttClient.connect(options);
mqttClient.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
final String payload = new String(message.getPayload(), StandardCharsets.US_ASCII);
LOG.info("Message arrived on topic {}: {}", topic, payload);
try {
handleMessageArrived(topic, payload);
} catch (Exception e) {
LOG.info("Exception in message handling: {}", e.getMessage());
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
LOG.info("deliveryComplete");
// we don't care
}
@Override
public void connectionLost(Throwable cause) {
LOG.info("connectionLost: {}", cause.getMessage());
}
});
mqttClient.subscribe(config.getMqttTopic());
}
private void handleMessageArrived(String topic, String message) {
final Date now = new Date();
try {
// try to decode the payload
final MqttData data = mapper.readValue(message, MqttData.class);
final SodaqOnePayload sodaq = SodaqOnePayload.parse(data.getPayload());
LOG.info("Got SODAQ message: {}", sodaq);
final String callSign = data.getDevId();
final int id = data.getCounter();
final double latitude = sodaq.getLatitude();
final double longitude = sodaq.getLongitude();
final double altitude = sodaq.getAltitude();
// get the payload coordinates and construct a sentence
final Sentence sentence = new Sentence(callSign, id, now, latitude, longitude, altitude);
final String line = sentence.format();
// create listeners
final List<HabReceiver> receivers = new ArrayList<>();
for (MqttGateway gw : data.getMetaData().getMqttGateways()) {
final HabReceiver receiver =
new HabReceiver(gw.getGatewayId(), new Location(gw.getLatitude(), gw.getLongitude(), gw.getAltitude()));
receivers.add(receiver);
}
// send listener data
for (HabReceiver receiver : receivers) {
uploader.uploadListenerData(receiver, now);
}
// send payload telemetry data
uploader.uploadPayloadTelemetry(line, receivers, now);
} catch (IOException e) {
LOG.warn("JSON unmarshalling exception '{}' for {}", e.getMessage(), message);
} catch (BufferUnderflowException e) {
LOG.warn("Sodaq payload exception: {}", e.getMessage());
}
}
/**
* Stops the application.
* @throws MqttException
*/
private void stop() {
LOG.info("Stopping TTN HAB bridge application");
try {
mqttClient.close();
} catch (MqttException e) {
// what can we do about this?
LOG.warn("Error closing MQTT client...");
}
uploader.stop();
}
private static ITtnHabBridgeConfig readConfig(File file) throws IOException {
final TtnHabBridgeConfig config = new TtnHabBridgeConfig();
try {
config.load(file);
} catch (IOException e) {
LOG.info("Failed to load config, attempt to write defaults...");
config.save(file);
}
return config;
}
}

Wyświetl plik

@ -0,0 +1,123 @@
package nl.sikken.bertrik;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Configuration class.
*/
public final class TtnHabBridgeConfig implements ITtnHabBridgeConfig {
/**
* One enumeration item per configuration item.
*/
private enum EConfigItem {
HABITAT_URL("habitat.url", "http://habitat.habhub.org", "URL of the habitat server"),
HABITAT_TIMEOUT("habitat.timeout", "3000", "Timeout in milliseconds"),
MQTT_SERVER_URL("mqtt.serverurl", "eu.thethings.network", "URL to MQTT server"),
MQTT_CLIENT_ID("mqtt.clientid", "ttnhabbridge", "MQTT client id"),
MQTT_USER_NAME("mqtt.username", "ttnmapper", "TTN application name used as MQTT user name"),
MQTT_USER_PASS("mqtt.password", "ttn-account-v2.Xc8BFRKeBK5nUhc9ikDcR-sbelgSMdHKnOQKMAiwpgI", "TTN application password"),
MQTT_TOPIC("mqtt.topic", "ttnmapper/devices/+/up", "MQTT topic to subscribe to")
;
private String key;
private String def;
private String comment;
EConfigItem(String key, String def, String comment) {
this.key = key;
this.def = def;
this.comment = comment;
}
}
private final Map<EConfigItem, String> props = new HashMap<>();
/**
* Create a configuration setting with defaults.
*/
public TtnHabBridgeConfig() {
for (EConfigItem e : EConfigItem.values()) {
props.put(e, e.def);
}
}
/**
* Load settings from file.
*
* @param file the file
* @throws IOException in case of a problem reading the file
*/
public void load(File file) throws IOException {
final Properties properties = new Properties();
try (FileInputStream fis = new FileInputStream(file)) {
properties.load(fis);
}
for (EConfigItem e : EConfigItem.values()) {
String value = properties.getProperty(e.key);
if (value != null) {
props.put(e, value);
}
}
}
/**
* Save settings to file.
*
* @param file the file
* @throws IOException in case of a file problem
*/
public void save(File file) throws IOException {
try (FileWriter writer = new FileWriter(file)) {
for (EConfigItem e : EConfigItem.values()) {
// comment line
writer.append("# " + e.comment + "\n");
writer.append(e.key + "=" + e.def + "\n");
writer.append("\n");
}
}
}
@Override
public int getHabitatTimeout() {
return Integer.valueOf(props.get(EConfigItem.HABITAT_TIMEOUT));
}
@Override
public String getHabitatUrl() {
return props.get(EConfigItem.HABITAT_URL);
}
@Override
public String getMqttClientId() {
return props.get(EConfigItem.MQTT_CLIENT_ID);
}
@Override
public String getMqttUserName() {
return props.get(EConfigItem.MQTT_USER_NAME);
}
@Override
public char[] getMqttPassword() {
return props.get(EConfigItem.MQTT_USER_PASS).toCharArray();
}
@Override
public String getMqttServerUrl() {
return props.get(EConfigItem.MQTT_SERVER_URL);
}
@Override
public String getMqttTopic() {
return props.get(EConfigItem.MQTT_TOPIC);
}
}

Wyświetl plik

@ -0,0 +1,26 @@
package nl.sikken.bertrik;
import java.io.File;
import java.io.IOException;
import org.junit.Test;
/**
* Unit tests for TtnHabBridgeConfig.
*/
public final class TtnHabBridgeConfigTest {
/**
* Verifies basic loading/saving of a configuration.
*
* @throws IOException
*/
@Test
public void testLoadSave() throws IOException {
final TtnHabBridgeConfig config = new TtnHabBridgeConfig();
final File file = new File("test.properties");
config.save(file);
config.load(file);
}
}