kopia lustrzana https://github.com/meshtastic/Meshtastic-Android
feat: implement MQTT client proxy
rodzic
d71a9171ec
commit
e3b96f4bd4
|
@ -249,6 +249,9 @@ dependencies {
|
|||
|
||||
// App intro
|
||||
implementation 'com.github.AppIntro:AppIntro:6.3.1'
|
||||
|
||||
// MQTT
|
||||
implementation "org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5"
|
||||
}
|
||||
|
||||
ksp {
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
package com.geeksville.mesh.repository.network
|
||||
|
||||
import com.geeksville.mesh.MeshProtos.MqttClientProxyMessage
|
||||
import com.geeksville.mesh.ModuleConfigProtos.ModuleConfig.MQTTConfig
|
||||
import com.geeksville.mesh.android.Logging
|
||||
import com.geeksville.mesh.mqttClientProxyMessage
|
||||
import com.geeksville.mesh.repository.datastore.ModuleConfigRepository
|
||||
import com.geeksville.mesh.util.ignoreException
|
||||
import com.google.protobuf.ByteString
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions
|
||||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
|
||||
import org.eclipse.paho.client.mqttv3.MqttAsyncClient
|
||||
import org.eclipse.paho.client.mqttv3.MqttAsyncClient.generateClientId
|
||||
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
|
||||
import java.net.URI
|
||||
import java.security.SecureRandom
|
||||
import javax.inject.Inject
|
||||
import javax.inject.Singleton
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.TrustManager
|
||||
|
||||
@Singleton
|
||||
class MQTTRepository @Inject constructor(
|
||||
private val moduleConfigRepository: ModuleConfigRepository,
|
||||
) : Logging {
|
||||
|
||||
companion object {
|
||||
/**
|
||||
* Quality of Service (QoS) levels in MQTT:
|
||||
* - QoS 0: "at most once". Packets are sent once without validation if it has been received.
|
||||
* - QoS 1: "at least once". Packets are sent and stored until the client receives confirmation from the server. MQTT ensures delivery, but duplicates may occur.
|
||||
* - QoS 2: "exactly once". Similar to QoS 1, but with no duplicates.
|
||||
*/
|
||||
private const val DEFAULT_QOS = 1
|
||||
private const val DEFAULT_TOPIC_ROOT = "msh"
|
||||
private const val VERSION_TOPIC_LEVEL = "/2/c/#"
|
||||
private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
|
||||
}
|
||||
|
||||
private var mqttClient: MqttAsyncClient? = null
|
||||
|
||||
suspend fun connect(callback: MqttCallbackExtended) {
|
||||
val mqttConfig: MQTTConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt
|
||||
|
||||
val sslContext = SSLContext.getInstance("TLS")
|
||||
// Create a custom SSLContext that trusts all certificates
|
||||
sslContext.init(null, arrayOf<TrustManager>(TrustAllX509TrustManager()), SecureRandom())
|
||||
|
||||
val connectOptions = MqttConnectOptions().apply {
|
||||
userName = mqttConfig.username
|
||||
password = mqttConfig.password.toCharArray()
|
||||
isCleanSession = false // must be false to auto subscribe on reconnects
|
||||
isAutomaticReconnect = true
|
||||
if (mqttConfig.tlsEnabled) {
|
||||
socketFactory = sslContext.socketFactory
|
||||
}
|
||||
}
|
||||
|
||||
val bufferOptions = DisconnectedBufferOptions().apply {
|
||||
isBufferEnabled = true
|
||||
bufferSize = 100
|
||||
isPersistBuffer = false
|
||||
isDeleteOldestMessages = true
|
||||
}
|
||||
|
||||
val scheme = if (mqttConfig.tlsEnabled) "ssl" else "tcp"
|
||||
val (host, port) = mqttConfig.address.ifEmpty { DEFAULT_SERVER_ADDRESS }
|
||||
.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1) }
|
||||
|
||||
val serverURI: String = URI(scheme, null, host, port, "", "", "").toString()
|
||||
|
||||
val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + VERSION_TOPIC_LEVEL
|
||||
|
||||
mqttClient = MqttAsyncClient(
|
||||
serverURI,
|
||||
generateClientId(),
|
||||
MemoryPersistence(),
|
||||
)
|
||||
mqttClient?.apply {
|
||||
setCallback(callback)
|
||||
setBufferOpts(bufferOptions)
|
||||
connect(connectOptions).waitForCompletion()
|
||||
subscribe(topic, DEFAULT_QOS).waitForCompletion()
|
||||
info("MQTT Subscribed to topic: $topic")
|
||||
}
|
||||
}
|
||||
|
||||
fun disconnect() {
|
||||
info("MQTT Disconnected")
|
||||
mqttClient?.apply {
|
||||
ignoreException { disconnect() }
|
||||
close(true)
|
||||
mqttClient = null
|
||||
}
|
||||
}
|
||||
|
||||
val proxyMessageFlow: Flow<MqttClientProxyMessage> = callbackFlow {
|
||||
val callback = object : MqttCallbackExtended {
|
||||
override fun connectComplete(reconnect: Boolean, serverURI: String) {
|
||||
info("MQTT connectComplete: $serverURI reconnect: $reconnect ")
|
||||
}
|
||||
|
||||
override fun connectionLost(cause: Throwable) {
|
||||
info("MQTT connectionLost cause: $cause")
|
||||
if (cause is IllegalArgumentException) close(cause)
|
||||
}
|
||||
|
||||
override fun messageArrived(topic: String, message: MqttMessage) {
|
||||
trySend(mqttClientProxyMessage {
|
||||
this.topic = topic
|
||||
data = ByteString.copyFrom(message.payload)
|
||||
retained = message.isRetained
|
||||
})
|
||||
}
|
||||
|
||||
override fun deliveryComplete(token: IMqttDeliveryToken?) {
|
||||
info("MQTT deliveryComplete messageId: ${token?.messageId}")
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
connect(callback)
|
||||
} catch (ex: Exception) {
|
||||
errormsg("MQTT Connect error: ${ex.message}")
|
||||
close(ex)
|
||||
}
|
||||
|
||||
awaitClose {
|
||||
disconnect()
|
||||
}
|
||||
}
|
||||
|
||||
fun publish(topic: String, data: ByteArray, retained: Boolean) {
|
||||
try {
|
||||
val token = mqttClient?.publish(topic, data, DEFAULT_QOS, retained)
|
||||
info("MQTT Publish messageId: ${token?.messageId}")
|
||||
} catch (ex: Exception) {
|
||||
errormsg("MQTT Publish error: ${ex.message}")
|
||||
}
|
||||
}
|
||||
|
||||
fun publish(topic: String, message: String, retained: Boolean) {
|
||||
publish(topic, message.encodeToByteArray(), retained)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package com.geeksville.mesh.repository.network
|
||||
|
||||
import android.annotation.SuppressLint
|
||||
import java.security.cert.X509Certificate
|
||||
import javax.net.ssl.X509TrustManager
|
||||
|
||||
@SuppressLint("CustomX509TrustManager", "TrustAllX509TrustManager")
|
||||
class TrustAllX509TrustManager : X509TrustManager {
|
||||
override fun checkClientTrusted(chain: Array<X509Certificate>?, authType: String?) {}
|
||||
override fun checkServerTrusted(chain: Array<X509Certificate>?, authType: String?) {}
|
||||
override fun getAcceptedIssuers(): Array<X509Certificate> = arrayOf()
|
||||
}
|
|
@ -24,6 +24,7 @@ import com.geeksville.mesh.database.entity.Packet
|
|||
import com.geeksville.mesh.model.DeviceVersion
|
||||
import com.geeksville.mesh.repository.datastore.RadioConfigRepository
|
||||
import com.geeksville.mesh.repository.location.LocationRepository
|
||||
import com.geeksville.mesh.repository.network.MQTTRepository
|
||||
import com.geeksville.mesh.repository.radio.BluetoothInterface
|
||||
import com.geeksville.mesh.repository.radio.RadioInterfaceService
|
||||
import com.geeksville.mesh.repository.radio.RadioServiceConnectionState
|
||||
|
@ -38,6 +39,7 @@ import kotlinx.coroutines.CoroutineScope
|
|||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.withTimeoutOrNull
|
||||
|
@ -75,6 +77,9 @@ class MeshService : Service(), Logging {
|
|||
@Inject
|
||||
lateinit var radioConfigRepository: RadioConfigRepository
|
||||
|
||||
@Inject
|
||||
lateinit var mqttRepository: MQTTRepository
|
||||
|
||||
companion object : Logging {
|
||||
|
||||
/// Intents broadcast by MeshService
|
||||
|
@ -145,6 +150,7 @@ class MeshService : Service(), Logging {
|
|||
private var connectionState = ConnectionState.DISCONNECTED
|
||||
|
||||
private var locationFlow: Job? = null
|
||||
private var mqttMessageFlow: Job? = null
|
||||
|
||||
private fun getSenderName(packet: DataPacket?): String {
|
||||
val name = nodeDBbyID[packet?.from]?.user?.longName
|
||||
|
@ -184,7 +190,7 @@ class MeshService : Service(), Logging {
|
|||
|
||||
private fun stopLocationRequests() {
|
||||
if (locationFlow?.isActive == true) {
|
||||
debug("Stopping location requests")
|
||||
info("Stopping location requests")
|
||||
locationFlow?.cancel()
|
||||
locationFlow = null
|
||||
}
|
||||
|
@ -515,8 +521,7 @@ class MeshService : Service(), Logging {
|
|||
wantAck = true,
|
||||
channel = adminChannelIndex,
|
||||
priority = MeshPacket.Priority.RELIABLE
|
||||
)
|
||||
{
|
||||
) {
|
||||
this.wantResponse = wantResponse
|
||||
portnumValue = Portnums.PortNum.ADMIN_APP_VALUE
|
||||
payload = AdminProtos.AdminMessage.newBuilder().also {
|
||||
|
@ -849,7 +854,7 @@ class MeshService : Service(), Logging {
|
|||
|
||||
private fun stopPacketQueue() {
|
||||
if (queueJob?.isActive == true) {
|
||||
debug("Stopping packet queueJob")
|
||||
info("Stopping packet queueJob")
|
||||
queueJob?.cancel()
|
||||
queueJob = null
|
||||
queuedPackets.clear()
|
||||
|
@ -1038,6 +1043,7 @@ class MeshService : Service(), Logging {
|
|||
fun startDeviceSleep() {
|
||||
stopPacketQueue()
|
||||
stopLocationRequests()
|
||||
stopMqttClientProxy()
|
||||
|
||||
if (connectTimeMsec != 0L) {
|
||||
val now = System.currentTimeMillis()
|
||||
|
@ -1071,6 +1077,7 @@ class MeshService : Service(), Logging {
|
|||
fun startDisconnect() {
|
||||
stopPacketQueue()
|
||||
stopLocationRequests()
|
||||
stopMqttClientProxy()
|
||||
|
||||
GeeksvilleApplication.analytics.track(
|
||||
"mesh_disconnect",
|
||||
|
@ -1120,12 +1127,9 @@ class MeshService : Service(), Logging {
|
|||
|
||||
connectionState = c
|
||||
when (c) {
|
||||
ConnectionState.CONNECTED ->
|
||||
startConnect()
|
||||
ConnectionState.DEVICE_SLEEP ->
|
||||
startDeviceSleep()
|
||||
ConnectionState.DISCONNECTED ->
|
||||
startDisconnect()
|
||||
ConnectionState.CONNECTED -> startConnect()
|
||||
ConnectionState.DEVICE_SLEEP -> startDeviceSleep()
|
||||
ConnectionState.DISCONNECTED -> startDisconnect()
|
||||
}
|
||||
|
||||
// Update the android notification in the status bar
|
||||
|
@ -1169,6 +1173,7 @@ class MeshService : Service(), Logging {
|
|||
MeshProtos.FromRadio.MODULECONFIG_FIELD_NUMBER -> handleModuleConfig(proto.moduleConfig)
|
||||
MeshProtos.FromRadio.QUEUESTATUS_FIELD_NUMBER -> handleQueueStatus(proto.queueStatus)
|
||||
MeshProtos.FromRadio.METADATA_FIELD_NUMBER -> handleMetadata(proto.metadata)
|
||||
MeshProtos.FromRadio.MQTTCLIENTPROXYMESSAGE_FIELD_NUMBER -> handleMqttProxyMessage(proto.mqttClientProxyMessage)
|
||||
else -> errormsg("Unexpected FromRadio variant")
|
||||
}
|
||||
} catch (ex: InvalidProtocolBufferException) {
|
||||
|
@ -1364,10 +1369,52 @@ class MeshService : Service(), Logging {
|
|||
rawDeviceMetadata = metadata
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish MqttClientProxyMessage (fromRadio)
|
||||
*/
|
||||
private fun handleMqttProxyMessage(message: MeshProtos.MqttClientProxyMessage) {
|
||||
with(message) {
|
||||
when (payloadVariantCase) {
|
||||
MeshProtos.MqttClientProxyMessage.PayloadVariantCase.TEXT -> {
|
||||
mqttRepository.publish(topic, text, retained)
|
||||
}
|
||||
|
||||
MeshProtos.MqttClientProxyMessage.PayloadVariantCase.DATA -> {
|
||||
mqttRepository.publish(topic, data.toByteArray(), retained)
|
||||
}
|
||||
|
||||
else -> {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect, subscribe and receive Flow of MqttClientProxyMessage (toRadio)
|
||||
*/
|
||||
private fun startMqttClientProxy() {
|
||||
if (mqttMessageFlow?.isActive == true) return
|
||||
if (moduleConfig.mqtt.enabled && moduleConfig.mqtt.proxyToClientEnabled) {
|
||||
mqttMessageFlow = mqttRepository.proxyMessageFlow.onEach { message ->
|
||||
sendToRadio(ToRadio.newBuilder().apply { mqttClientProxyMessage = message })
|
||||
}.catch { throwable ->
|
||||
errormsg("MqttClientProxy failed: $throwable")
|
||||
}.launchIn(serviceScope)
|
||||
}
|
||||
}
|
||||
|
||||
private fun stopMqttClientProxy() {
|
||||
if (mqttMessageFlow?.isActive == true) {
|
||||
info("Stopping MqttClientProxy")
|
||||
mqttMessageFlow?.cancel()
|
||||
mqttMessageFlow = null
|
||||
}
|
||||
}
|
||||
|
||||
/// If we've received our initial config, our radio settings and all of our channels, send any queued packets and broadcast connected to clients
|
||||
private fun onHasSettings() {
|
||||
|
||||
processQueuedPackets() // send any packets that were queued up
|
||||
startMqttClientProxy()
|
||||
|
||||
// broadcast an intent with our new connection state
|
||||
serviceBroadcasts.broadcastConnection()
|
||||
|
|
Ładowanie…
Reference in New Issue