kopia lustrzana https://github.com/ukhas/habitat-cpp-connector
298 wiersze
6.7 KiB
C++
298 wiersze
6.7 KiB
C++
/* Copyright 2011-2012 (C) Daniel Richman. License: GNU GPL 3; see LICENSE. */
|
|
|
|
#include "habitat/UploaderThread.h"
|
|
#include <stdexcept>
|
|
#include <sstream>
|
|
|
|
namespace habitat {
|
|
|
|
void UploaderAction::check(habitat::Uploader *u)
|
|
{
|
|
if (u == NULL)
|
|
throw NotInitialisedError();
|
|
}
|
|
|
|
void UploaderSettings::apply(UploaderThread &uthr)
|
|
{
|
|
uthr.uploader.reset(new habitat::Uploader(
|
|
callsign, couch_uri, couch_db, max_merge_attempts));
|
|
uthr.initialised();
|
|
}
|
|
|
|
string UploaderSettings::describe()
|
|
{
|
|
stringstream ss(stringstream::out);
|
|
ss << "Uploader('" << callsign << "', '" << couch_uri << "', '"
|
|
<< couch_db << "', " << max_merge_attempts << ")";
|
|
return ss.str();
|
|
}
|
|
|
|
void UploaderReset::apply(UploaderThread &uthr)
|
|
{
|
|
uthr.uploader.reset();
|
|
uthr.reset_done();
|
|
}
|
|
|
|
string UploaderReset::describe()
|
|
{
|
|
return "~Uploader()";
|
|
}
|
|
|
|
void UploaderPayloadTelemetry::apply(UploaderThread &uthr)
|
|
{
|
|
check(uthr.uploader.get());
|
|
|
|
string result;
|
|
result = uthr.uploader->payload_telemetry(data, metadata, time_created);
|
|
|
|
uthr.saved_id("payload_telemetry", result);
|
|
}
|
|
|
|
string UploaderPayloadTelemetry::describe()
|
|
{
|
|
stringstream ss(stringstream::out);
|
|
Json::FastWriter writer;
|
|
string metadata_json = writer.write(metadata);
|
|
metadata_json.erase(metadata_json.length() - 1, 1);
|
|
ss << "Uploader.payload_telemetry('" << data << "', "
|
|
<< metadata_json << ", " << time_created << ")";
|
|
return ss.str();
|
|
}
|
|
|
|
void UploaderListenerTelemetry::apply(UploaderThread &uthr)
|
|
{
|
|
check(uthr.uploader.get());
|
|
string result = uthr.uploader->listener_telemetry(data, time_created);
|
|
uthr.saved_id("listener_telemetry", result);
|
|
}
|
|
|
|
string UploaderListenerTelemetry::describe()
|
|
{
|
|
stringstream ss(stringstream::out);
|
|
Json::FastWriter writer;
|
|
string data_json = writer.write(data);
|
|
data_json.erase(data_json.length() - 1, 1);
|
|
ss << "Uploader.listener_telemetry(" << data_json << ", "
|
|
<< time_created << ")";
|
|
return ss.str();
|
|
}
|
|
|
|
void UploaderListenerInfo::apply(UploaderThread &uthr)
|
|
{
|
|
check(uthr.uploader.get());
|
|
string result = uthr.uploader->listener_information(data, time_created);
|
|
uthr.saved_id("listener_information", result);
|
|
}
|
|
|
|
string UploaderListenerInfo::describe()
|
|
{
|
|
stringstream ss(stringstream::out);
|
|
Json::FastWriter writer;
|
|
string data_json = writer.write(data);
|
|
data_json.erase(data_json.length() - 1, 1);
|
|
ss << "Uploader.listener_information(" << data_json << ", "
|
|
<< time_created << ")";
|
|
return ss.str();
|
|
}
|
|
|
|
void UploaderFlights::apply(UploaderThread &uthr)
|
|
{
|
|
check(uthr.uploader.get());
|
|
auto_ptr< vector<Json::Value> > flights;
|
|
flights.reset(uthr.uploader->flights());
|
|
uthr.got_flights(*flights);
|
|
}
|
|
|
|
string UploaderFlights::describe()
|
|
{
|
|
return "Uploader.flights()";
|
|
}
|
|
|
|
void UploaderPayloads::apply(UploaderThread &uthr)
|
|
{
|
|
check(uthr.uploader.get());
|
|
auto_ptr< vector<Json::Value> > payloads;
|
|
payloads.reset(uthr.uploader->payloads());
|
|
uthr.got_payloads(*payloads);
|
|
}
|
|
|
|
string UploaderPayloads::describe()
|
|
{
|
|
return "Uploader.payloads()";
|
|
}
|
|
|
|
void UploaderShutdown::apply(UploaderThread &uthr)
|
|
{
|
|
throw this;
|
|
}
|
|
|
|
string UploaderShutdown::describe()
|
|
{
|
|
return "Shutdown";
|
|
}
|
|
|
|
UploaderThread::UploaderThread() : queued_shutdown(false) {}
|
|
|
|
UploaderThread::~UploaderThread()
|
|
{
|
|
EZ::MutexLock lock(mutex);
|
|
|
|
if (!queued_shutdown)
|
|
shutdown();
|
|
|
|
join();
|
|
}
|
|
|
|
void UploaderThread::queue_action(UploaderAction *action)
|
|
{
|
|
auto_ptr<UploaderAction> destroyer(action);
|
|
|
|
log("Queuing " + action->describe());
|
|
queue.put(action);
|
|
destroyer.release();
|
|
}
|
|
|
|
void UploaderThread::settings(const string &callsign, const string &couch_uri,
|
|
const string &couch_db, int max_merge_attempts)
|
|
{
|
|
queue_action(
|
|
new UploaderSettings(callsign, couch_uri, couch_db, max_merge_attempts)
|
|
);
|
|
}
|
|
|
|
void UploaderThread::reset()
|
|
{
|
|
queue_action(new UploaderReset());
|
|
}
|
|
|
|
void UploaderThread::payload_telemetry(const string &data,
|
|
const Json::Value &metadata,
|
|
int time_created)
|
|
{
|
|
queue_action(new UploaderPayloadTelemetry(data, metadata, time_created));
|
|
}
|
|
|
|
void UploaderThread::listener_telemetry(const Json::Value &data,
|
|
int time_created)
|
|
{
|
|
queue_action(new UploaderListenerTelemetry(data, time_created));
|
|
}
|
|
|
|
void UploaderThread::listener_information(const Json::Value &data,
|
|
int time_created)
|
|
{
|
|
queue_action(new UploaderListenerInfo(data, time_created));
|
|
}
|
|
|
|
void UploaderThread::flights()
|
|
{
|
|
queue_action(new UploaderFlights());
|
|
}
|
|
|
|
void UploaderThread::payloads()
|
|
{
|
|
queue_action(new UploaderPayloads());
|
|
}
|
|
|
|
void UploaderThread::shutdown()
|
|
{
|
|
/* Borrow the SimpleThread mutex to make queued_shutdown access safe */
|
|
EZ::MutexLock lock(mutex);
|
|
|
|
if (!queued_shutdown)
|
|
{
|
|
queue_action(new UploaderShutdown());
|
|
queued_shutdown = true;
|
|
}
|
|
}
|
|
|
|
void *UploaderThread::run()
|
|
{
|
|
log("Started");
|
|
|
|
for (;;)
|
|
{
|
|
auto_ptr<UploaderAction> action(queue.get());
|
|
|
|
log("Running " + action->describe());
|
|
|
|
try
|
|
{
|
|
action->apply(*this);
|
|
}
|
|
catch (UploaderShutdown *s)
|
|
{
|
|
break;
|
|
}
|
|
catch (NotInitialisedError &e)
|
|
{
|
|
caught_exception(e);
|
|
continue;
|
|
}
|
|
catch (runtime_error &e)
|
|
{
|
|
caught_exception(e);
|
|
continue;
|
|
}
|
|
catch (invalid_argument &e)
|
|
{
|
|
caught_exception(e);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
log("Shutting down");
|
|
|
|
return NULL;
|
|
}
|
|
|
|
void UploaderThread::warning(const string &message)
|
|
{
|
|
log("Warning: " + message);
|
|
}
|
|
|
|
void UploaderThread::saved_id(const string &type, const string &id)
|
|
{
|
|
log("Saved " + type + " doc: " + id);
|
|
}
|
|
|
|
void UploaderThread::initialised()
|
|
{
|
|
log("Initialised Uploader");
|
|
}
|
|
|
|
void UploaderThread::reset_done()
|
|
{
|
|
log("Settings reset");
|
|
}
|
|
|
|
void UploaderThread::caught_exception(const NotInitialisedError &error)
|
|
{
|
|
const string what(error.what());
|
|
warning("Caught NotInitialisedError");
|
|
}
|
|
|
|
void UploaderThread::caught_exception(const runtime_error &error)
|
|
{
|
|
const string what(error.what());
|
|
warning("Caught runtime_error: " + what);
|
|
}
|
|
|
|
void UploaderThread::caught_exception(const invalid_argument &error)
|
|
{
|
|
const string what(error.what());
|
|
warning("Caught invalid_argument: " + what);
|
|
}
|
|
|
|
void UploaderThread::got_flights(const vector<Json::Value> &flights)
|
|
{
|
|
log("Default action: got_flights; discarding.");
|
|
}
|
|
|
|
void UploaderThread::got_payloads(const vector<Json::Value> &payloads)
|
|
{
|
|
log("Default action: got_payloads; discarding.");
|
|
}
|
|
|
|
} /* namespace habitat */
|