Create bus manager.

pull/273/head
Fredrik Öhrström 2021-03-08 17:14:03 +01:00
rodzic a53cb1f77c
commit 71c32c7852
10 zmienionych plików z 197 dodań i 58 usunięć

Wyświetl plik

@ -81,26 +81,33 @@ shared_ptr<SerialCommunicationManager> serial_manager_;
// Manage registered meters to decode and relay.
shared_ptr<MeterManager> meter_manager_;
// Current active set of wmbus devices that can receive telegrams.
// This can change during runtime, plugging/unplugging wmbus dongles.
vector<shared_ptr<WMBus>> bus_devices_;
RecursiveMutex bus_devices_mutex_("bus_devices_mutex");
#define LOCK_BUS_DEVICES(where) WITH(bus_devices_mutex_, where)
struct BusManager
{
BusManager() : bus_devices_mutex_("bus_devices_mutex") {}
// Current active set of wmbus devices that can receive telegrams.
// This can change during runtime, plugging/unplugging wmbus dongles.
vector<shared_ptr<WMBus>> bus_devices_;
RecursiveMutex bus_devices_mutex_;
#define LOCK_BUS_DEVICES(where) WITH(bus_manager_->bus_devices_mutex_, bus_devices_mutex, where)
// Remember devices that were not detected as wmbus devices.
// To avoid probing them again and again.
set<string> not_serial_wmbus_devices_;
// Remember devices that were not detected as wmbus devices.
// To avoid probing them again and again.
set<string> not_serial_wmbus_devices_;
// The software radio devices are always swradio devices
// but they might not be available for wmbusmeters.
set<string> not_swradio_wmbus_devices_;
// The software radio devices are always swradio devices
// but they might not be available for wmbusmeters.
set<string> not_swradio_wmbus_devices_;
// When manually supplying stdin or a file, then, after
// it has been read, do not open it again!
set<string> do_not_open_file_again_;
// When manually supplying stdin or a file, then, after
// it has been read, do not open it again!
set<string> do_not_open_file_again_;
// Store simulation files here.
set<string> simulation_files_;
// Store simulation files here.
set<string> simulation_files_;
};
// Manage bus devices wmbus, mbus, HAN etc.
shared_ptr<BusManager> bus_manager_;
// Rendering the telegrams to json,fields or shell calls is
// done by the printer.
@ -232,7 +239,7 @@ void check_for_dead_wmbus_devices(Configuration *config)
trace("[MAIN] checking for dead wmbus devices...\n");
vector<WMBus*> not_working;
for (auto &w : bus_devices_)
for (auto &w : bus_manager_->bus_devices_)
{
if (!w->isWorking())
{
@ -252,20 +259,20 @@ void check_for_dead_wmbus_devices(Configuration *config)
for (auto w : not_working)
{
auto i = bus_devices_.begin();
while (i != bus_devices_.end())
auto i = bus_manager_->bus_devices_.begin();
while (i != bus_manager_->bus_devices_.end())
{
if (w == (*i).get())
{
// The erased shared_ptr will delete the WMBus object.
bus_devices_.erase(i);
bus_manager_->bus_devices_.erase(i);
break;
}
i++;
}
}
if (bus_devices_.size() == 0)
if (bus_manager_->bus_devices_.size() == 0)
{
if (config->single_device_override)
{
@ -452,13 +459,13 @@ void detect_and_configure_wmbus_devices(Configuration *config, DetectionType dt)
specified_device.handled = true;
continue;
}
if (simulation_files_.count(specified_device.file) > 0)
if (bus_manager_->simulation_files_.count(specified_device.file) > 0)
{
debug("(main) %s already configured as simulation\n", specified_device.file.c_str());
specified_device.handled = true;
continue;
}
if (do_not_open_file_again_.count(specified_device.file) > 0)
if (bus_manager_->do_not_open_file_again_.count(specified_device.file) > 0)
{
// This was stdin/file, it should only be opened once.
trace("[MAIN] ignoring handled file %s\n", specified_device.file.c_str());
@ -466,13 +473,13 @@ void detect_and_configure_wmbus_devices(Configuration *config, DetectionType dt)
continue;
}
if (not_serial_wmbus_devices_.count(specified_device.file) > 0)
if (bus_manager_->not_serial_wmbus_devices_.count(specified_device.file) > 0)
{
// Enumerate all serial devices that might connect to a wmbus device.
vector<string> ttys = serial_manager_->listSerialTTYs();
// Did a non-wmbus-device get unplugged? Then remove it from the known-not-wmbus-device set.
remove_lost_serial_devices_from_ignore_list(ttys);
if (not_serial_wmbus_devices_.count(specified_device.file) > 0)
if (bus_manager_->not_serial_wmbus_devices_.count(specified_device.file) > 0)
{
trace("[MAIN] ignoring failed file %s\n", specified_device.file.c_str());
specified_device.handled = true;
@ -496,14 +503,14 @@ void detect_and_configure_wmbus_devices(Configuration *config, DetectionType dt)
if (checkCharacterDeviceExists(specified_device.file.c_str(), false))
{
// Yes, this device actually exists, there is a need to ignore it.
not_serial_wmbus_devices_.insert(specified_device.file);
bus_manager_->not_serial_wmbus_devices_.insert(specified_device.file);
}
}
if (detected.specified_device.is_stdin || detected.specified_device.is_file || detected.specified_device.is_simulation)
{
// Only read stdin and files once!
do_not_open_file_again_.insert(specified_device.file);
bus_manager_->do_not_open_file_again_.insert(specified_device.file);
}
open_bus_device_and_potentially_set_linkmodes(config, "config", &detected);
}
@ -521,7 +528,7 @@ void detect_and_configure_wmbus_devices(Configuration *config, DetectionType dt)
perform_auto_scan_of_swradio_devices(config);
}
for (shared_ptr<WMBus> &wmbus : bus_devices_)
for (shared_ptr<WMBus> &wmbus : bus_manager_->bus_devices_)
{
assert(wmbus->getDetected() != NULL);
find_specified_device_and_mark_as_handled(config, wmbus->getDetected());
@ -776,7 +783,7 @@ void open_bus_device_and_potentially_set_linkmodes(Configuration *config, string
shared_ptr<WMBus> wmbus = create_wmbus_object(detected, config, serial_manager_);
if (wmbus == NULL) return;
bus_devices_.push_back(wmbus);
bus_manager_->bus_devices_.push_back(wmbus);
// By default, reset your dongle once every 23 hours,
// so that the reset is not at the exact same time every day.
@ -800,7 +807,7 @@ void open_bus_device_and_potentially_set_linkmodes(Configuration *config, string
{
simulated = true;
debug("(main) added %s to files\n", detected->found_file.c_str());
simulation_files_.insert(detected->specified_device.file);
bus_manager_->simulation_files_.insert(detected->specified_device.file);
}
wmbus->onTelegram([&, simulated](AboutTelegram &about,vector<uchar> data){return meter_manager_->handleTelegram(about, data, simulated);});
wmbus->setTimeout(config->alarm_timeout, config->alarm_expected_activity);
@ -817,7 +824,7 @@ void perform_auto_scan_of_serial_devices(Configuration *config)
for (string& tty : ttys)
{
trace("[MAIN] serial device %s\n", tty.c_str());
if (not_serial_wmbus_devices_.count(tty) > 0)
if (bus_manager_->not_serial_wmbus_devices_.count(tty) > 0)
{
trace("[MAIN] skipping already probed not wmbus serial device %s\n", tty.c_str());
continue;
@ -859,7 +866,7 @@ void perform_auto_scan_of_serial_devices(Configuration *config)
// This serial device was something that we could not recognize.
// A modem, an android phone, a teletype Model 33, etc....
// Mark this serial device as unknown, to avoid repeated detection attempts.
not_serial_wmbus_devices_.insert(tty);
bus_manager_->not_serial_wmbus_devices_.insert(tty);
verbose("(main) ignoring %s, it does not respond as any of the supported wmbus devices.\n", tty.c_str());
}
}
@ -898,7 +905,7 @@ void perform_auto_scan_of_swradio_devices(Configuration *config)
for (string& serialnr : serialnrs)
{
trace("[MAIN] rtlsdr device %s\n", serialnr.c_str());
if (not_swradio_wmbus_devices_.count(serialnr) > 0)
if (bus_manager_->not_swradio_wmbus_devices_.count(serialnr) > 0)
{
trace("[MAIN] skipping already probed rtlsdr %s\n", serialnr.c_str());
continue;
@ -913,7 +920,7 @@ void perform_auto_scan_of_swradio_devices(Configuration *config)
if (ac != AccessCheck::AccessOK)
{
// We cannot access this swradio device.
not_swradio_wmbus_devices_.insert(serialnr);
bus_manager_->not_swradio_wmbus_devices_.insert(serialnr);
verbose("(main) ignoring rtlsdr %s since it is unavailable.\n", serialnr.c_str());
}
else
@ -950,6 +957,8 @@ void regular_checkup(Configuration *config)
}
}
meter_manager_->pollMeters();
if (serial_manager_ && config)
{
detect_and_configure_wmbus_devices(config, DetectionType::ALL);
@ -958,7 +967,7 @@ void regular_checkup(Configuration *config)
{
LOCK_BUS_DEVICES(regular_checkup);
for (auto &w : bus_devices_)
for (auto &w : bus_manager_->bus_devices_)
{
if (w->isWorking())
{
@ -973,7 +982,7 @@ void remove_lost_serial_devices_from_ignore_list(vector<string> &devices)
vector<string> to_be_removed;
// Iterate over the devices known to NOT be wmbus devices.
for (const string& nots : not_serial_wmbus_devices_)
for (const string& nots : bus_manager_->not_serial_wmbus_devices_)
{
auto i = std::find(devices.begin(), devices.end(), nots);
if (i == devices.end())
@ -988,7 +997,7 @@ void remove_lost_serial_devices_from_ignore_list(vector<string> &devices)
for (string& r : to_be_removed)
{
not_serial_wmbus_devices_.erase(r);
bus_manager_->not_serial_wmbus_devices_.erase(r);
}
}
@ -997,7 +1006,7 @@ void remove_lost_swradio_devices_from_ignore_list(vector<string> &devices)
vector<string> to_be_removed;
// Iterate over the devices known to NOT be wmbus devices.
for (const string& nots : not_swradio_wmbus_devices_)
for (const string& nots : bus_manager_->not_swradio_wmbus_devices_)
{
auto i = std::find(devices.begin(), devices.end(), nots);
if (i == devices.end())
@ -1012,7 +1021,7 @@ void remove_lost_swradio_devices_from_ignore_list(vector<string> &devices)
for (string& r : to_be_removed)
{
not_swradio_wmbus_devices_.erase(r);
bus_manager_->not_swradio_wmbus_devices_.erase(r);
}
}
@ -1041,7 +1050,20 @@ void setup_meters(Configuration *config, MeterManager *manager)
for (auto &m : config->meters)
{
m.conversions = config->conversions;
manager->addMeterTemplate(m);
if (needsPolling(m.driver))
{
// A polling meter must be defined from the start.
auto meter = createMeter(&m);
manager->addMeter(meter);
}
else
{
// Non polling meters are added lazily, when
// the first telegram arrives. Just add a template
// here.
manager->addMeterTemplate(m);
}
}
}
@ -1082,6 +1104,7 @@ bool start(Configuration *config)
// or sent to shell invocations.
printer_ = create_printer(config);
bus_manager_ = shared_ptr<BusManager>(new BusManager());
meter_manager_ = createMeterManager(config->daemon);
// When a meter is updated, print it, shell it, log it, etc.
@ -1105,7 +1128,7 @@ bool start(Configuration *config)
serial_manager_->startEventLoop();
detect_and_configure_wmbus_devices(config, DetectionType::ALL);
if (bus_devices_.size() == 0)
if (bus_manager_->bus_devices_.size() == 0)
{
if (config->nodeviceexit)
{
@ -1139,16 +1162,57 @@ bool start(Configuration *config)
Telegram t;
t.about = about;
MeterKeys mk;
t.parse(frame, &mk, false); // Try a best effort parse, do not print any warnings.
t.print();
t.explainParse("(wmbus)",0);
logTelegram(t.original, t.frame, 0, 0);
if (t.about.type == FrameType::WMBUS)
{
t.parse(frame, &mk, false); // Try a best effort parse, do not print any warnings.
t.print();
t.explainParse("(wmbus)",0);
logTelegram(t.original, t.frame, 0, 0);
}
else
{
t.parseMBusHeader(frame); // Try a best effort parse, do not print any warnings.
t.print();
t.explainParse("(mbus)",0);
logTelegram(t.original, t.frame, 0, 0);
}
return true;
});
}
for (auto &w : bus_manager_->bus_devices_)
{
if (w->type() == WMBusDeviceType::DEVICE_MBUS)
{
fprintf(stderr, "SENDING Query...\n");
vector<uchar> buf(5);
buf[0] = 0x10; // Start
buf[1] = 0x40; // SND_NKE
buf[2] = 0x00; // address 0
uchar cs = 0;
for (int i=1; i<3; ++i) cs += buf[i];
buf[3] = cs; // checksum
buf[4] = 0x16; // Stop
for (auto &w : bus_devices_)
w->serial()->send(buf);
sleep(2);
buf[0] = 0x10; // Start
buf[1] = 0x5b; // REQ_UD2
buf[2] = 0x00; // address 0
cs = 0;
for (int i=1; i<3; ++i) cs += buf[i];
buf[3] = cs; // checksum
buf[4] = 0x16; // Stop
w->serial()->send(buf);
}
}
for (auto &w : bus_manager_->bus_devices_)
{
// Real devices do nothing, but the simulator device will simulate.
w->simulate();
@ -1167,7 +1231,7 @@ bool start(Configuration *config)
}
// Destroy any remaining allocated objects.
bus_devices_.clear();
bus_manager_->bus_devices_.clear();
meter_manager_->removeAllMeters();
printer_.reset();
serial_manager_.reset();

Wyświetl plik

@ -28,7 +28,7 @@ struct MeterPIIGTH : public virtual TempHygroMeter, public virtual MeterCommonIm
double currentRelativeHumidity();
private:
void poll();
void processContent(Telegram *t);
double current_temperature_c_ {};
@ -93,6 +93,33 @@ double MeterPIIGTH::currentRelativeHumidity()
return current_relative_humidity_rh_;
}
void MeterPIIGTH::poll()
{
fprintf(stderr, "SENDING Query...\n");
vector<uchar> buf(5);
buf[0] = 0x10; // Start
buf[1] = 0x40; // SND_NKE
buf[2] = 0x00; // address 0
uchar cs = 0;
for (int i=1; i<3; ++i) cs += buf[i];
buf[3] = cs; // checksum
buf[4] = 0x16; // Stop
bus()->serial()->send(buf);
sleep(2);
buf[0] = 0x10; // Start
buf[1] = 0x5b; // REQ_UD2
buf[2] = 0x00; // address 0
cs = 0;
for (int i=1; i<3; ++i) cs += buf[i];
buf[3] = cs; // checksum
buf[4] = 0x16; // Stop
bus()->serial()->send(buf);
}
void MeterPIIGTH::processContent(Telegram *t)
{
int offset;

Wyświetl plik

@ -48,6 +48,7 @@ public:
{
meters_.push_back(meter);
meter->setIndex(meters_.size());
meter->onUpdate(on_meter_updated_);
}
Meter *lastAddedMeter()
@ -176,8 +177,6 @@ public:
}
// Now build a meter object with for this exact id.
auto meter = createMeter(&tmp);
meter->onUpdate(on_meter_updated_);
addMeter(meter);
string idsc = toIdsCommaSeparated(t.ids);
verbose("(meter) used meter template %s %s %s to match %s\n",
@ -240,11 +239,20 @@ public:
{
on_telegram_ = cb;
}
void whenMeterUpdated(std::function<void(Telegram*t,Meter*)> cb)
{
on_meter_updated_ = cb;
}
void pollMeters()
{
for (auto &m : meters_)
{
m->poll();
}
}
MeterManagerImplementation(bool daemon) : is_daemon_(daemon) {}
~MeterManagerImplementation() {}
};
@ -256,7 +264,7 @@ shared_ptr<MeterManager> createMeterManager(bool daemon)
MeterCommonImplementation::MeterCommonImplementation(MeterInfo &mi,
MeterDriver driver) :
driver_(driver), name_(mi.name)
driver_(driver), bus_(NULL), name_(mi.name)
{
ids_ = mi.ids;
idsc_ = toIdsCommaSeparated(ids_);
@ -336,6 +344,10 @@ void MeterCommonImplementation::addPrint(string vname, Quantity vquantity,
prints_.push_back( { vname, vquantity, defaultUnitForQuantity(vquantity), NULL, getValueFunc, help, field, json, vname } );
}
void MeterCommonImplementation::poll()
{
}
vector<string>& MeterCommonImplementation::ids()
{
return ids_;
@ -388,6 +400,14 @@ string MeterCommonImplementation::datetimeOfUpdateRobot()
return string(datetime);
}
bool needsPolling(MeterDriver d)
{
#define X(mname,linkmodes,info,driver,cname) if (d == MeterDriver::driver && 0 != ((linkmodes) & MBUS_bit)) return true;
LIST_OF_METERS
#undef X
return false;
}
string toMeterDriver(MeterDriver mt)
{
#define X(mname,link,info,type,cname) if (mt == MeterDriver::type) return #mname;
@ -535,6 +555,11 @@ void MeterCommonImplementation::setIndex(int i)
index_ = i;
}
WMBus *MeterCommonImplementation::bus()
{
return bus_;
}
void MeterCommonImplementation::triggerUpdate(Telegram *t)
{
datetime_of_update_ = time(NULL);

Wyświetl plik

@ -102,6 +102,9 @@ struct MeterMatch
int version;
};
// Return true for mbus and C2/T2 meters.
bool needsPolling(MeterDriver driver);
// Return a list of matching drivers, like: multical21
void detectMeterDrivers(int manufacturer, int media, int version, std::vector<std::string> *drivers);
// When entering the driver, check that the telegram is indeed known to be
@ -131,6 +134,11 @@ struct MeterInfo
vector<string> jsons; // Additional static jsons that are added to each message.
vector<Unit> conversions; // Additional units desired in json.
// If this is a meter that needs to be polled.
int poll_seconds; // Poll every x seconds.
int poll_hour_offset; // Instead of
string poll_time_period; // Poll only during these hours.
MeterInfo()
{
}
@ -169,6 +177,8 @@ struct Meter
// and no exact meter exists. Index 1 is the first meter created etc.
virtual int index() = 0;
virtual void setIndex(int i) = 0;
// Use this bus to send messages to the meter.
virtual WMBus *bus() = 0;
// This meter listens to these ids.
virtual vector<string> &ids() = 0;
// Comma separated ids.
@ -208,6 +218,7 @@ struct Meter
virtual void addConversions(std::vector<Unit> cs) = 0;
virtual void addShell(std::string cmdline) = 0;
virtual vector<string> &shellCmdlines() = 0;
virtual void poll() = 0;
virtual ~Meter() = default;
};
@ -224,6 +235,7 @@ struct MeterManager
virtual bool hasMeters() = 0;
virtual void onTelegram(function<void(AboutTelegram&,vector<uchar>)> cb) = 0;
virtual void whenMeterUpdated(std::function<void(Telegram*t,Meter*)> cb) = 0;
virtual void pollMeters() = 0;
virtual ~MeterManager() = default;
};

Wyświetl plik

@ -28,6 +28,7 @@ struct MeterCommonImplementation : public virtual Meter
{
int index();
void setIndex(int i);
WMBus *bus();
vector<string>& ids();
string idsc();
vector<string> fields();
@ -77,6 +78,9 @@ protected:
// Print the dimensionless Text quantity, no unit is needed.
void addPrint(string vname, Quantity vquantity,
function<std::string()> getValueFunc, string help, bool field, bool json);
// The default implementation of poll does nothing.
// Override for mbus meters that need to be queried and likewise for C2/T2 wmbus-meters.
void poll();
bool handleTelegram(AboutTelegram &about, vector<uchar> frame, bool simulated, string *id, bool *id_match);
void printMeter(Telegram *t,
string *human_readable,
@ -92,6 +96,7 @@ private:
int index_ {};
MeterDriver driver_ {};
WMBus *bus_ {};
MeterKeys meter_keys_ {};
ELLSecurityMode expected_ell_sec_mode_ {};
TPLSecurityMode expected_tpl_sec_mode_ {};

Wyświetl plik

@ -113,14 +113,14 @@ private:
vector<shared_ptr<SerialDevice>> serial_devices_;
RecursiveMutex serial_devices_mutex_ = { "serial_devices_mutex" };
#define LOCK_SERIAL_DEVICES(where) WITH(serial_devices_mutex_, where)
#define LOCK_SERIAL_DEVICES(where) WITH(serial_devices_mutex_, serial_devices_mutex, where)
RecursiveMutex event_loop_mutex_ = {"event_loop_mutex" };
#define LOCK_EVENT_LOOP(where) WITH(event_loop_mutex_, where)
#define LOCK_EVENT_LOOP(where) WITH(event_loop_mutex_, event_loop_mutex, where)
vector<Timer> timers_; // Protected by LOCK_TIMERS
RecursiveMutex timers_mutex_ = { "timers_mutex" };
#define LOCK_TIMERS(where) WITH(timers_mutex_, where)
#define LOCK_TIMERS(where) WITH(timers_mutex_, timers_mutex, where)
};
SerialCommunicationManagerImp::~SerialCommunicationManagerImp()
@ -177,10 +177,10 @@ struct SerialDeviceImp : public SerialDevice
protected:
RecursiveMutex read_mutex_ = { "read_mutex" };
#define LOCK_READ_SERIAL(where) WITH(read_mutex_, where)
#define LOCK_READ_SERIAL(where) WITH(read_mutex_, read_mutex, where)
RecursiveMutex write_mutex_ = { "write_mutex" };
#define LOCK_WRITE_SERIAL(where) WITH(write_mutex_, where)
#define LOCK_WRITE_SERIAL(where) WITH(write_mutex_, write_mutex, where)
function<void()> on_data_;
function<void()> on_disappear_;

Wyświetl plik

@ -68,7 +68,7 @@ size_t getCurrentRSS();
x ## pid_ = 0; \
trace("[UNLOCKED] " #x " " func "\n"); }
#define WITH(mutex,func) Lock local_ ## mutex (&mutex, #func)
#define WITH(mutex,name,func) Lock local_ ## name (&mutex, #func)
struct Lock;

Wyświetl plik

@ -3514,6 +3514,10 @@ void WMBusCommonImplementation::onTelegram(function<bool(AboutTelegram&,vector<u
telegram_listeners_.push_back(cb);
}
void WMBusCommonImplementation::sendTelegram(Telegram *t)
{
warning("(bus) Trying to send telegram to bus that has not implemented sending!\n");
}
static bool ignore_duplicate_telegrams_ = false;

Wyświetl plik

@ -553,6 +553,7 @@ struct WMBus
virtual bool canSetLinkModes(LinkModeSet lms) = 0;
virtual void setLinkModes(LinkModeSet lms) = 0;
virtual void onTelegram(function<bool(AboutTelegram&,vector<uchar>)> cb) = 0;
virtual void sendTelegram(Telegram *t) = 0;
virtual SerialDevice *serial() = 0;
// Return true of the serial has been overridden, usually with stdin or a file.
virtual bool serialOverride() = 0;

Wyświetl plik

@ -34,6 +34,7 @@ struct WMBusCommonImplementation : public virtual WMBus
bool isSerial();
WMBusDeviceType type();
void onTelegram(function<bool(AboutTelegram&,vector<uchar>)> cb);
void sendTelegram(Telegram *t);
bool handleTelegram(AboutTelegram &about, vector<uchar> frame);
void checkStatus();
bool isWorking();
@ -105,7 +106,7 @@ protected:
// Lock this mutex when you sent a request to the wmbus device
// Unlock when you received the response or it timedout.
RecursiveMutex command_mutex_;
#define LOCK_WMBUS_EXECUTING_COMMAND(where) WITH(command_mutex_, where)
#define LOCK_WMBUS_EXECUTING_COMMAND(where) WITH(command_mutex_, command_mutex, where)
// Use waitForRespones/notifyReponseIsHere to wait for a response
// while the command_mutex_ is taken.