From e3b96f4bd40e24f8be1cda1f45934647e4ca89f6 Mon Sep 17 00:00:00 2001 From: andrekir Date: Thu, 12 Oct 2023 17:52:52 -0300 Subject: [PATCH] feat: implement MQTT client proxy --- app/build.gradle | 3 + .../mesh/repository/network/MQTTRepository.kt | 151 ++++++++++++++++++ .../network/TrustAllX509TrustManager.kt | 12 ++ .../geeksville/mesh/service/MeshService.kt | 67 ++++++-- 4 files changed, 223 insertions(+), 10 deletions(-) create mode 100644 app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt create mode 100644 app/src/main/java/com/geeksville/mesh/repository/network/TrustAllX509TrustManager.kt diff --git a/app/build.gradle b/app/build.gradle index 0314e350b..e18c87492 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -249,6 +249,9 @@ dependencies { // App intro implementation 'com.github.AppIntro:AppIntro:6.3.1' + + // MQTT + implementation "org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5" } ksp { diff --git a/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt b/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt new file mode 100644 index 000000000..f96e6a6bd --- /dev/null +++ b/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt @@ -0,0 +1,151 @@ +package com.geeksville.mesh.repository.network + +import com.geeksville.mesh.MeshProtos.MqttClientProxyMessage +import com.geeksville.mesh.ModuleConfigProtos.ModuleConfig.MQTTConfig +import com.geeksville.mesh.android.Logging +import com.geeksville.mesh.mqttClientProxyMessage +import com.geeksville.mesh.repository.datastore.ModuleConfigRepository +import com.geeksville.mesh.util.ignoreException +import com.google.protobuf.ByteString +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttAsyncClient +import org.eclipse.paho.client.mqttv3.MqttAsyncClient.generateClientId +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended +import org.eclipse.paho.client.mqttv3.MqttConnectOptions +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import java.net.URI +import java.security.SecureRandom +import javax.inject.Inject +import javax.inject.Singleton +import javax.net.ssl.SSLContext +import javax.net.ssl.TrustManager + +@Singleton +class MQTTRepository @Inject constructor( + private val moduleConfigRepository: ModuleConfigRepository, +) : Logging { + + companion object { + /** + * Quality of Service (QoS) levels in MQTT: + * - QoS 0: "at most once". Packets are sent once without validation if it has been received. + * - QoS 1: "at least once". Packets are sent and stored until the client receives confirmation from the server. MQTT ensures delivery, but duplicates may occur. + * - QoS 2: "exactly once". Similar to QoS 1, but with no duplicates. + */ + private const val DEFAULT_QOS = 1 + private const val DEFAULT_TOPIC_ROOT = "msh" + private const val VERSION_TOPIC_LEVEL = "/2/c/#" + private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org" + } + + private var mqttClient: MqttAsyncClient? = null + + suspend fun connect(callback: MqttCallbackExtended) { + val mqttConfig: MQTTConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt + + val sslContext = SSLContext.getInstance("TLS") + // Create a custom SSLContext that trusts all certificates + sslContext.init(null, arrayOf(TrustAllX509TrustManager()), SecureRandom()) + + val connectOptions = MqttConnectOptions().apply { + userName = mqttConfig.username + password = mqttConfig.password.toCharArray() + isCleanSession = false // must be false to auto subscribe on reconnects + isAutomaticReconnect = true + if (mqttConfig.tlsEnabled) { + socketFactory = sslContext.socketFactory + } + } + + val bufferOptions = DisconnectedBufferOptions().apply { + isBufferEnabled = true + bufferSize = 100 + isPersistBuffer = false + isDeleteOldestMessages = true + } + + val scheme = if (mqttConfig.tlsEnabled) "ssl" else "tcp" + val (host, port) = mqttConfig.address.ifEmpty { DEFAULT_SERVER_ADDRESS } + .split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1) } + + val serverURI: String = URI(scheme, null, host, port, "", "", "").toString() + + val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + VERSION_TOPIC_LEVEL + + mqttClient = MqttAsyncClient( + serverURI, + generateClientId(), + MemoryPersistence(), + ) + mqttClient?.apply { + setCallback(callback) + setBufferOpts(bufferOptions) + connect(connectOptions).waitForCompletion() + subscribe(topic, DEFAULT_QOS).waitForCompletion() + info("MQTT Subscribed to topic: $topic") + } + } + + fun disconnect() { + info("MQTT Disconnected") + mqttClient?.apply { + ignoreException { disconnect() } + close(true) + mqttClient = null + } + } + + val proxyMessageFlow: Flow = callbackFlow { + val callback = object : MqttCallbackExtended { + override fun connectComplete(reconnect: Boolean, serverURI: String) { + info("MQTT connectComplete: $serverURI reconnect: $reconnect ") + } + + override fun connectionLost(cause: Throwable) { + info("MQTT connectionLost cause: $cause") + if (cause is IllegalArgumentException) close(cause) + } + + override fun messageArrived(topic: String, message: MqttMessage) { + trySend(mqttClientProxyMessage { + this.topic = topic + data = ByteString.copyFrom(message.payload) + retained = message.isRetained + }) + } + + override fun deliveryComplete(token: IMqttDeliveryToken?) { + info("MQTT deliveryComplete messageId: ${token?.messageId}") + } + } + + try { + connect(callback) + } catch (ex: Exception) { + errormsg("MQTT Connect error: ${ex.message}") + close(ex) + } + + awaitClose { + disconnect() + } + } + + fun publish(topic: String, data: ByteArray, retained: Boolean) { + try { + val token = mqttClient?.publish(topic, data, DEFAULT_QOS, retained) + info("MQTT Publish messageId: ${token?.messageId}") + } catch (ex: Exception) { + errormsg("MQTT Publish error: ${ex.message}") + } + } + + fun publish(topic: String, message: String, retained: Boolean) { + publish(topic, message.encodeToByteArray(), retained) + } +} diff --git a/app/src/main/java/com/geeksville/mesh/repository/network/TrustAllX509TrustManager.kt b/app/src/main/java/com/geeksville/mesh/repository/network/TrustAllX509TrustManager.kt new file mode 100644 index 000000000..1783d7617 --- /dev/null +++ b/app/src/main/java/com/geeksville/mesh/repository/network/TrustAllX509TrustManager.kt @@ -0,0 +1,12 @@ +package com.geeksville.mesh.repository.network + +import android.annotation.SuppressLint +import java.security.cert.X509Certificate +import javax.net.ssl.X509TrustManager + +@SuppressLint("CustomX509TrustManager", "TrustAllX509TrustManager") +class TrustAllX509TrustManager : X509TrustManager { + override fun checkClientTrusted(chain: Array?, authType: String?) {} + override fun checkServerTrusted(chain: Array?, authType: String?) {} + override fun getAcceptedIssuers(): Array = arrayOf() +} diff --git a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt index e15be8e70..55987aeeb 100644 --- a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt +++ b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt @@ -24,6 +24,7 @@ import com.geeksville.mesh.database.entity.Packet import com.geeksville.mesh.model.DeviceVersion import com.geeksville.mesh.repository.datastore.RadioConfigRepository import com.geeksville.mesh.repository.location.LocationRepository +import com.geeksville.mesh.repository.network.MQTTRepository import com.geeksville.mesh.repository.radio.BluetoothInterface import com.geeksville.mesh.repository.radio.RadioInterfaceService import com.geeksville.mesh.repository.radio.RadioServiceConnectionState @@ -38,6 +39,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.withTimeoutOrNull @@ -75,6 +77,9 @@ class MeshService : Service(), Logging { @Inject lateinit var radioConfigRepository: RadioConfigRepository + @Inject + lateinit var mqttRepository: MQTTRepository + companion object : Logging { /// Intents broadcast by MeshService @@ -145,6 +150,7 @@ class MeshService : Service(), Logging { private var connectionState = ConnectionState.DISCONNECTED private var locationFlow: Job? = null + private var mqttMessageFlow: Job? = null private fun getSenderName(packet: DataPacket?): String { val name = nodeDBbyID[packet?.from]?.user?.longName @@ -184,7 +190,7 @@ class MeshService : Service(), Logging { private fun stopLocationRequests() { if (locationFlow?.isActive == true) { - debug("Stopping location requests") + info("Stopping location requests") locationFlow?.cancel() locationFlow = null } @@ -515,8 +521,7 @@ class MeshService : Service(), Logging { wantAck = true, channel = adminChannelIndex, priority = MeshPacket.Priority.RELIABLE - ) - { + ) { this.wantResponse = wantResponse portnumValue = Portnums.PortNum.ADMIN_APP_VALUE payload = AdminProtos.AdminMessage.newBuilder().also { @@ -849,7 +854,7 @@ class MeshService : Service(), Logging { private fun stopPacketQueue() { if (queueJob?.isActive == true) { - debug("Stopping packet queueJob") + info("Stopping packet queueJob") queueJob?.cancel() queueJob = null queuedPackets.clear() @@ -1038,6 +1043,7 @@ class MeshService : Service(), Logging { fun startDeviceSleep() { stopPacketQueue() stopLocationRequests() + stopMqttClientProxy() if (connectTimeMsec != 0L) { val now = System.currentTimeMillis() @@ -1071,6 +1077,7 @@ class MeshService : Service(), Logging { fun startDisconnect() { stopPacketQueue() stopLocationRequests() + stopMqttClientProxy() GeeksvilleApplication.analytics.track( "mesh_disconnect", @@ -1120,12 +1127,9 @@ class MeshService : Service(), Logging { connectionState = c when (c) { - ConnectionState.CONNECTED -> - startConnect() - ConnectionState.DEVICE_SLEEP -> - startDeviceSleep() - ConnectionState.DISCONNECTED -> - startDisconnect() + ConnectionState.CONNECTED -> startConnect() + ConnectionState.DEVICE_SLEEP -> startDeviceSleep() + ConnectionState.DISCONNECTED -> startDisconnect() } // Update the android notification in the status bar @@ -1169,6 +1173,7 @@ class MeshService : Service(), Logging { MeshProtos.FromRadio.MODULECONFIG_FIELD_NUMBER -> handleModuleConfig(proto.moduleConfig) MeshProtos.FromRadio.QUEUESTATUS_FIELD_NUMBER -> handleQueueStatus(proto.queueStatus) MeshProtos.FromRadio.METADATA_FIELD_NUMBER -> handleMetadata(proto.metadata) + MeshProtos.FromRadio.MQTTCLIENTPROXYMESSAGE_FIELD_NUMBER -> handleMqttProxyMessage(proto.mqttClientProxyMessage) else -> errormsg("Unexpected FromRadio variant") } } catch (ex: InvalidProtocolBufferException) { @@ -1364,10 +1369,52 @@ class MeshService : Service(), Logging { rawDeviceMetadata = metadata } + /** + * Publish MqttClientProxyMessage (fromRadio) + */ + private fun handleMqttProxyMessage(message: MeshProtos.MqttClientProxyMessage) { + with(message) { + when (payloadVariantCase) { + MeshProtos.MqttClientProxyMessage.PayloadVariantCase.TEXT -> { + mqttRepository.publish(topic, text, retained) + } + + MeshProtos.MqttClientProxyMessage.PayloadVariantCase.DATA -> { + mqttRepository.publish(topic, data.toByteArray(), retained) + } + + else -> {} + } + } + } + + /** + * Connect, subscribe and receive Flow of MqttClientProxyMessage (toRadio) + */ + private fun startMqttClientProxy() { + if (mqttMessageFlow?.isActive == true) return + if (moduleConfig.mqtt.enabled && moduleConfig.mqtt.proxyToClientEnabled) { + mqttMessageFlow = mqttRepository.proxyMessageFlow.onEach { message -> + sendToRadio(ToRadio.newBuilder().apply { mqttClientProxyMessage = message }) + }.catch { throwable -> + errormsg("MqttClientProxy failed: $throwable") + }.launchIn(serviceScope) + } + } + + private fun stopMqttClientProxy() { + if (mqttMessageFlow?.isActive == true) { + info("Stopping MqttClientProxy") + mqttMessageFlow?.cancel() + mqttMessageFlow = null + } + } + /// If we've received our initial config, our radio settings and all of our channels, send any queued packets and broadcast connected to clients private fun onHasSettings() { processQueuedPackets() // send any packets that were queued up + startMqttClientProxy() // broadcast an intent with our new connection state serviceBroadcasts.broadcastConnection()