# Mirror of https://github.com/Yakifo/amqtt (Python, 350 lines, 10 KiB)
import asyncio
|
|
import math
|
|
|
|
import pytest
|
|
import time
|
|
from hamcrest import equal_to, assert_that, close_to, has_key, instance_of, has_entry
|
|
|
|
|
|
|
|
from amqtt.contrib.shadows import ShadowPlugin, ShadowOperation
|
|
from amqtt.contrib.shadows.states import State, StateDocument, calculate_delta_update, calculate_iota_update, \
|
|
MetaTimestamp
|
|
|
|
|
|
@pytest.mark.parametrize("topic,client_id,shadow_name,message_type,is_match", [
    ('$shadow/myclientid/myshadow/get', 'myclientid', 'myshadow', ShadowOperation.GET, True),
    ('$shadow/myshadow/get', '', '', '', False)
])
def test_shadow_topic_match(topic, client_id, shadow_name, message_type, is_match):
    """Check how ``ShadowPlugin.shadow_topic_match`` handles a topic string.

    For a matching topic the result must expose the expected device id,
    shadow name and operation; for a non-matching topic the result must be
    ``None``.
    """
    parsed = ShadowPlugin.shadow_topic_match(topic)

    # Non-matching topics are expected to yield no result at all.
    if not is_match:
        assert parsed is None
        return

    assert parsed.device_id == client_id
    assert parsed.name == shadow_name
    assert parsed.message_op in ShadowOperation
    assert parsed.message_op == message_type
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_state_add():
    """Merge an update document into a document that has explicit metadata.

    The first document carries hand-written metadata values (10/20/11/21).
    The second is built from a dict after a pause, and the merged result is
    checked key by key: untouched keys keep their original metadata, while
    keys present in the update are expected to carry metadata newer than the
    time captured at the start of the test.
    """
    cur_time = math.floor(time.time())

    data = {
        'state': {
            'desired': {
                'item1': 'value1a',
                'item2': 'value2a'
            },
            'reported': {
                'item1': 'value1a',
                'item2': 'value2b'
            }
        }
    }

    meta = {
        'metadata': {
            'desired': {
                'item1': 10,
                'item2': 20
            },
            'reported': {
                'item1': 11,
                'item2': 21
            }
        }
    }

    data_state = State.from_dict(data['state'])
    meta_state = State.from_dict(meta['metadata'])

    state_document_one = StateDocument(state=data_state, metadata=meta_state)

    # Pause so that the update document below is created strictly later than
    # ``cur_time``; the ``> cur_time`` assertions rely on that gap
    # (presumably the metadata timestamps have one-second resolution).
    await asyncio.sleep(2)

    data_update = {
        'state': {
            'desired': {
                'item2': 'value2a'
            },
            'reported': {
                'item1': 'value1c',
                'item2': 'value2c'
            }
        }
    }

    state_document_two = StateDocument.from_dict(data_update)

    final_doc = state_document_one + state_document_two

    # 'desired.item1' is absent from the update: value and metadata are kept.
    assert final_doc.state.desired['item1'] == 'value1a'
    assert final_doc.metadata.desired['item1'] == 10

    # 'desired.item2' is present in the update (same value): metadata is refreshed.
    assert final_doc.state.desired['item2'] == 'value2a'
    assert final_doc.metadata.desired['item2'] > cur_time

    # 'reported.item1' is changed by the update.
    assert final_doc.state.reported['item1'] == 'value1c'
    assert final_doc.metadata.reported['item1'] > cur_time

    # 'reported.item2' is changed by the update as well; the original test
    # repeated the item1 checks here and never verified item2.
    assert final_doc.state.reported['item2'] == 'value2c'
    assert final_doc.metadata.reported['item2'] > cur_time
|
|
|
|
|
|
def test_state_from_dict() -> None:
    """Verify ``State.from_dict`` exposes desired and reported entries by key."""
    source = {
        'desired': {'keyA': 'valueA', 'keyB': 'valueB'},
        'reported': {'keyC': 'valueC', 'keyD': 'valueD'}
    }

    parsed = State.from_dict(source)

    # Desired-section entries.
    for key, expected in (('keyA', 'valueA'), ('keyB', 'valueB')):
        assert_that(parsed.desired[key], equal_to(expected))

    # Reported-section entries.
    for key, expected in (('keyC', 'valueC'), ('keyD', 'valueD')):
        assert_that(parsed.reported[key], equal_to(expected))
|
|
|
|
|
|
def test_state_doc_from_dict() -> None:
    """Build a ``StateDocument`` from a dict that has no metadata section.

    Checks that the desired values are carried over and that a
    ``MetaTimestamp`` metadata entry exists for every desired key, stamped
    at (about) the time the document was created.
    """
    now = int(time.time())

    state_dict = {
        'state': {
            'desired': {'keyA': 'valueA', 'keyB': 'valueB'},
            'reported': {'keyC': 'valueC', 'keyD': 'valueD'}
        }
    }

    state_doc = StateDocument.from_dict(state_dict)

    assert_that(state_doc.state.desired['keyA'], equal_to('valueA'))
    assert_that(state_doc.state.desired['keyB'], equal_to('valueB'))

    assert_that(state_doc.metadata.desired, has_key('keyA'))  # noqa
    assert_that(state_doc.metadata.desired, has_key('keyB'))  # noqa

    assert_that(state_doc.metadata.desired['keyA'], instance_of(MetaTimestamp))  # noqa
    assert_that(state_doc.metadata.desired['keyB'], instance_of(MetaTimestamp))  # noqa
    # Allow one second of slack: the clock may tick over between capturing
    # ``now`` and creating the document, which made the previous zero
    # tolerance flaky. This matches the tolerance used by the other tests.
    assert_that(state_doc.metadata.desired['keyA'], close_to(now, 1))  # noqa
    # Both keys come from the same document, so their timestamps must agree.
    assert_that(state_doc.metadata.desired['keyA'], equal_to(state_doc.metadata.desired['keyB']))
|
|
|
|
|
|
def test_state_doc_including_meta() -> None:
    """Add two documents that both carry explicit metadata.

    Both documents hold the same values; the second has more recent
    metadata, and the merged document is expected to carry the second
    document's metadata values.
    """
    now = int(time.time())
    recent = now - 5

    older_doc = StateDocument(
        state=State(
            desired={'keyA': 'valueA', 'keyB': 'valueB'},
            reported={'keyC': 'valueC', 'keyD': 'valueD'}
        ),
        metadata=State(
            desired={'keyA': now - 100, 'keyB': now - 110},
            reported={'keyC': now - 90, 'keyD': now - 120}
        )
    )

    newer_doc = StateDocument(
        state=State(
            desired={'keyA': 'valueA', 'keyB': 'valueB'},
            reported={'keyC': 'valueC', 'keyD': 'valueD'}
        ),
        metadata=State(
            desired={'keyA': recent, 'keyB': recent},
            reported={'keyC': recent, 'keyD': recent}
        )
    )

    merged = older_doc + newer_doc

    assert_that(merged.metadata.desired['keyA'], equal_to(recent))
    assert_that(merged.metadata.reported['keyC'], equal_to(recent))
|
|
|
|
|
|
def test_state_doc_plus_new_key_update() -> None:
    """Add an update that introduces reported keys not present before.

    Existing reported keys must keep their original metadata, while the
    newly introduced keys must appear with metadata close to the current time.
    """
    now = int(time.time())

    base_doc = StateDocument(
        state=State(
            desired={'keyA': 'valueA', 'keyB': 'valueB'},
            reported={'keyC': 'valueC', 'keyD': 'valueD'}
        ),
        metadata=State(
            desired={'keyA': now - 100, 'keyB': now - 110},
            reported={'keyC': now - 90, 'keyD': now - 120}
        )
    )

    patch = StateDocument.from_dict(
        {'state': {'reported': {'keyE': 'valueE', 'keyF': 'valueF'}}}
    )

    merged = base_doc + patch

    # Pre-existing keys: still present, metadata untouched.
    assert_that(merged.state.reported, has_key('keyC'))
    assert_that(merged.metadata.reported['keyC'], equal_to(now - 90))
    assert_that(merged.metadata.reported['keyD'], equal_to(now - 120))

    # Newly added keys: present in the state ...
    for added_key in ('keyE', 'keyF'):
        assert_that(merged.state.reported, has_key(added_key))

    # ... and stamped with a fresh timestamp.
    for added_key in ('keyE', 'keyF'):
        assert_that(merged.metadata.reported[added_key], close_to(now, 1))
|
|
|
|
|
|
def test_state_with_updated_keys() -> None:
    """Add an update that re-sends an existing reported key.

    The re-sent key ('keyD') is expected to get fresh metadata even though
    its value is unchanged; the other reported key keeps its old metadata.
    """
    now = int(time.time())

    base_doc = StateDocument(
        state=State(
            desired={'keyA': 'valueA', 'keyB': 'valueB'},
            reported={'keyC': 'valueC', 'keyD': 'valueD'}
        ),
        metadata=State(
            desired={'keyA': now - 100, 'keyB': now - 110},
            reported={'keyC': now - 90, 'keyD': now - 120}
        )
    )

    patch = StateDocument.from_dict({'state': {'reported': {'keyD': 'valueD'}}})

    merged = base_doc + patch

    for kept_key in ('keyC', 'keyD'):
        assert_that(merged.state.reported, has_key(kept_key))

    # Untouched key keeps its timestamp; the re-sent key is refreshed.
    assert_that(merged.metadata.reported['keyC'], equal_to(now - 90))
    assert_that(merged.metadata.reported['keyD'], close_to(now, 1))
|
|
|
|
|
|
def test_update_with_empty_initial_state() -> None:
    """Add a populated document onto a document built from an empty dict.

    The merged state must equal the incoming state. The metadata checks
    inspect the incoming document itself (constructed without explicit
    metadata), not the merged result.
    """
    now = int(time.time())

    empty_doc = StateDocument.from_dict({})

    incoming = StateDocument(
        state=State(
            desired={'keyA': 'valueA', 'keyB': 'valueB'},
            reported={'keyC': 'valueC', 'keyD': 'valueD'}
        )
    )
    merged = empty_doc + incoming

    assert_that(incoming.state.desired, equal_to(merged.state.desired))
    assert_that(incoming.state.reported, equal_to(merged.state.reported))

    assert_that(incoming.metadata.reported, has_key('keyC'))
    assert_that(incoming.metadata.reported['keyC'], close_to(now, 1))
|
|
|
|
|
|
def test_update_with_clearing_key() -> None:
    """Add an update that sets a reported key to ``None``.

    The merged document is expected to still contain the key, mapped to ``None``.
    """
    base_doc = StateDocument(
        state=State(
            desired={'keyA': 'valueA', 'keyB': 'valueB'},
            reported={'keyC': 'valueC', 'keyD': 'valueD'}
        )
    )
    clearing_patch = StateDocument.from_dict({'state': {'reported': {'keyC': None}}})

    merged = base_doc + clearing_patch

    cleared_entry = has_entry(equal_to('keyC'), equal_to(None))
    assert_that(merged.state.reported, cleared_entry)
|
|
|
|
|
|
def test_empty_desired_state() -> None:
    """With only a reported section, the delta update is expected to be empty."""
    payload = {
        'state': {
            'reported': {
                'items': ['value1', 'value2', 'value3']
            }
        }
    }
    doc = StateDocument.from_dict(payload)

    delta = calculate_delta_update(doc.state.desired, doc.state.reported)

    assert delta == {}
|
|
|
|
|
|
def test_empty_reported_state() -> None:
    """With only a desired section, the delta is expected to be the whole desired content."""
    payload = {
        'state': {
            'desired': {
                'items': ['value1', 'value2', 'value3']
            }
        }
    }
    doc = StateDocument.from_dict(payload)

    delta = calculate_delta_update(doc.state.desired, doc.state.reported)

    assert delta == {'items': ['value1', 'value2', 'value3']}
|
|
|
|
|
|
def test_matching_desired_reported_state() -> None:
    """When desired and reported hold identical content, the delta is expected to be empty."""
    payload = {
        'state': {
            'desired': {
                'items': ['value1', 'value2', 'value3']
            },
            'reported': {
                'items': ['value1', 'value2', 'value3']
            }
        }
    }
    doc = StateDocument.from_dict(payload)

    delta = calculate_delta_update(doc.state.desired, doc.state.reported)

    assert delta == {}
|
|
|
|
|
|
def test_out_of_order_list() -> None:
    """A reported list with the same elements in a different order counts as a difference.

    The delta is expected to contain the desired list in its desired order.
    """
    payload = {
        'state': {
            'desired': {
                'items': ['value1', 'value2', 'value3']
            },
            'reported': {
                'items': ['value2', 'value1', 'value3']
            }
        }
    }
    doc = StateDocument.from_dict(payload)

    delta = calculate_delta_update(doc.state.desired, doc.state.reported)

    assert delta == {'items': ['value1', 'value2', 'value3']}
|
|
|
|
def test_states_with_connector_transaction() -> None:
    """With an empty desired section, the iota update is expected to map every reported key to ``None``."""
    payload = {
        'state': {
            'desired': {},
            'reported': {"transaction": False, "transactionId": "5678", "tag": "ghijk"}
        }
    }
    doc = StateDocument.from_dict(payload)

    iota = calculate_iota_update(doc.state.desired, doc.state.reported)

    assert iota == {"transaction": None, "transactionId": None, "tag": None}
|
|
|
|
|
|
def test_extra_reported_into_desired_with_overlap() -> None:
    """Reported keys missing from desired are expected in the iota update as ``None``.

    The key shared by both sections ('connectors') must not appear in the result.
    """
    reported_section = {
        "status": None, "heartbeat": "2025-02-07T04:16:51.431Z", "connectors": [1, 2, 3],
        "module_version": "2.0.1", "restartTime": "2025-02-07T03:16:51.431Z"
    }
    doc = StateDocument.from_dict({
        'state': {
            'desired': {"connectors": [1, 2, 3]},
            'reported': reported_section
        }
    })

    iota = calculate_iota_update(doc.state.desired, doc.state.reported)

    expected = {"status": None, "heartbeat": None, "module_version": None, "restartTime": None}
    assert iota == expected
|
|
|