kopia lustrzana https://github.com/meshtastic/Meshtastic-Android
feat: add JSON topic subscription to MQTT client
rodzic
91943860e9
commit
f8a7596219
|
@ -48,61 +48,6 @@ class MQTTRepository @Inject constructor(
|
|||
|
||||
private var mqttClient: MqttAsyncClient? = null
|
||||
|
||||
suspend fun connect(callback: MqttCallbackExtended) {
|
||||
val ownerId = radioConfigRepository.nodeDB.myId.value ?: generateClientId()
|
||||
val channelSet = radioConfigRepository.channelSetFlow.first()
|
||||
val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
|
||||
|
||||
val sslContext = SSLContext.getInstance("TLS")
|
||||
// Create a custom SSLContext that trusts all certificates
|
||||
sslContext.init(null, arrayOf<TrustManager>(TrustAllX509TrustManager()), SecureRandom())
|
||||
|
||||
val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId
|
||||
val connectOptions = MqttConnectOptions().apply {
|
||||
userName = mqttConfig.username
|
||||
password = mqttConfig.password.toCharArray()
|
||||
isCleanSession = false // must be false to auto subscribe on reconnects
|
||||
isAutomaticReconnect = true
|
||||
if (mqttConfig.tlsEnabled) {
|
||||
socketFactory = sslContext.socketFactory
|
||||
}
|
||||
setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true)
|
||||
}
|
||||
|
||||
val bufferOptions = DisconnectedBufferOptions().apply {
|
||||
isBufferEnabled = true
|
||||
bufferSize = 512
|
||||
isPersistBuffer = false
|
||||
isDeleteOldestMessages = true
|
||||
}
|
||||
|
||||
val scheme = if (mqttConfig.tlsEnabled) "ssl" else "tcp"
|
||||
val (host, port) = mqttConfig.address.ifEmpty { DEFAULT_SERVER_ADDRESS }
|
||||
.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1) }
|
||||
|
||||
val serverURI: String = URI(scheme, null, host, port, "", "", "").toString()
|
||||
|
||||
val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } +
|
||||
DEFAULT_TOPIC_LEVEL // FIXME if (mqttConfig.jsonEnabled) JSON_TOPIC_LEVEL else DEFAULT_TOPIC_LEVEL
|
||||
|
||||
mqttClient = MqttAsyncClient(
|
||||
serverURI,
|
||||
ownerId,
|
||||
MemoryPersistence(),
|
||||
)
|
||||
mqttClient?.apply {
|
||||
setCallback(callback)
|
||||
setBufferOpts(bufferOptions)
|
||||
connect(connectOptions).waitForCompletion()
|
||||
|
||||
channelSet.subscribeList.forEach { globalId ->
|
||||
val topicFilter = "$topic$globalId/#"
|
||||
subscribe(topicFilter, DEFAULT_QOS).waitForCompletion()
|
||||
info("MQTT Subscribed to topic: $topicFilter")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun disconnect() {
|
||||
info("MQTT Disconnected")
|
||||
mqttClient?.apply {
|
||||
|
@ -113,9 +58,42 @@ class MQTTRepository @Inject constructor(
|
|||
}
|
||||
|
||||
val proxyMessageFlow: Flow<MqttClientProxyMessage> = callbackFlow {
|
||||
val ownerId = radioConfigRepository.nodeDB.myId.value ?: generateClientId()
|
||||
val channelSet = radioConfigRepository.channelSetFlow.first()
|
||||
val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
|
||||
|
||||
val sslContext = SSLContext.getInstance("TLS")
|
||||
// Create a custom SSLContext that trusts all certificates
|
||||
sslContext.init(null, arrayOf<TrustManager>(TrustAllX509TrustManager()), SecureRandom())
|
||||
|
||||
val rootTopic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT }
|
||||
val statTopic = "$rootTopic$STAT_TOPIC_LEVEL$ownerId"
|
||||
|
||||
val connectOptions = MqttConnectOptions().apply {
|
||||
userName = mqttConfig.username
|
||||
password = mqttConfig.password.toCharArray()
|
||||
isAutomaticReconnect = true
|
||||
if (mqttConfig.tlsEnabled) {
|
||||
socketFactory = sslContext.socketFactory
|
||||
}
|
||||
setWill(statTopic, "offline".encodeToByteArray(), DEFAULT_QOS, true)
|
||||
}
|
||||
|
||||
val bufferOptions = DisconnectedBufferOptions().apply {
|
||||
isBufferEnabled = true
|
||||
bufferSize = 512
|
||||
isPersistBuffer = false
|
||||
isDeleteOldestMessages = true
|
||||
}
|
||||
|
||||
val callback = object : MqttCallbackExtended {
|
||||
override fun connectComplete(reconnect: Boolean, serverURI: String) {
|
||||
info("MQTT connectComplete: $serverURI reconnect: $reconnect ")
|
||||
channelSet.subscribeList.forEach { globalId ->
|
||||
subscribe("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/#")
|
||||
if (mqttConfig.jsonEnabled) subscribe("$rootTopic$JSON_TOPIC_LEVEL$globalId/#")
|
||||
}
|
||||
// publish(statTopic, "online".encodeToByteArray(), DEFAULT_QOS, true)
|
||||
}
|
||||
|
||||
override fun connectionLost(cause: Throwable) {
|
||||
|
@ -135,11 +113,29 @@ class MQTTRepository @Inject constructor(
|
|||
info("MQTT deliveryComplete messageId: ${token?.messageId}")
|
||||
}
|
||||
}
|
||||
connect(callback)
|
||||
|
||||
val scheme = if (mqttConfig.tlsEnabled) "ssl" else "tcp"
|
||||
val (host, port) = mqttConfig.address.ifEmpty { DEFAULT_SERVER_ADDRESS }
|
||||
.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1) }
|
||||
|
||||
mqttClient = MqttAsyncClient(
|
||||
URI(scheme, null, host, port, "", "", "").toString(),
|
||||
ownerId,
|
||||
MemoryPersistence(),
|
||||
).apply {
|
||||
setCallback(callback)
|
||||
setBufferOpts(bufferOptions)
|
||||
connect(connectOptions)
|
||||
}
|
||||
|
||||
awaitClose { disconnect() }
|
||||
}
|
||||
|
||||
private fun subscribe(topic: String) {
|
||||
mqttClient?.subscribe(topic, DEFAULT_QOS)
|
||||
info("MQTT Subscribed to topic: $topic")
|
||||
}
|
||||
|
||||
fun publish(topic: String, data: ByteArray, retained: Boolean) {
|
||||
try {
|
||||
val token = mqttClient?.publish(topic, data, DEFAULT_QOS, retained)
|
||||
|
|
Ładowanie…
Reference in New Issue