kopia lustrzana https://github.com/ukhas/habitat-cpp-connector
Produce rfc3339 times in Uploader
rodzic
4303572e07
commit
11b8f1f7aa
|
@ -8,6 +8,7 @@ You will need these dependencies:
|
|||
|
||||
- [JsonCpp](http://jsoncpp.sourceforge.net/)
|
||||
- [libcURL](http://curl.haxx.se/)
|
||||
- habitat (for habitat.utils.rfc3339, to test the Uploader.)
|
||||
|
||||
Both build from source fairly easily, but the easiest way to acquire them for
|
||||
Ubuntu is:
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
#include <openssl/evp.h>
|
||||
#include "CouchDB.h"
|
||||
#include "EZ.h"
|
||||
#include "RFC3339.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
|
@ -89,10 +90,11 @@ static string base64(const string &data)
|
|||
return data_b64;
|
||||
}
|
||||
|
||||
static void set_time(Json::Value &thing, int time_created)
|
||||
static void set_time(Json::Value &thing, long long int time_created)
|
||||
{
|
||||
thing["time_uploaded"] = Json::Int(time(NULL));
|
||||
thing["time_created"] = time_created;
|
||||
thing["time_uploaded"] = RFC3339::now_to_rfc3339_localoffset();
|
||||
thing["time_created"] =
|
||||
RFC3339::timestamp_to_rfc3339_localoffset(time_created);
|
||||
}
|
||||
|
||||
static void payload_telemetry_new(Json::Value &doc,
|
||||
|
@ -127,7 +129,7 @@ static void payload_telemetry_merge(Json::Value &doc,
|
|||
|
||||
string Uploader::payload_telemetry(const string &data,
|
||||
const Json::Value &metadata,
|
||||
int time_created)
|
||||
long long int time_created)
|
||||
{
|
||||
EZ::MutexLock lock(mutex);
|
||||
|
||||
|
@ -202,7 +204,7 @@ string Uploader::payload_telemetry(const string &data,
|
|||
}
|
||||
|
||||
string Uploader::listener_doc(const char *type, const Json::Value &data,
|
||||
int time_created)
|
||||
long long int time_created)
|
||||
{
|
||||
if (time_created == -1)
|
||||
time_created = time(NULL);
|
||||
|
@ -226,7 +228,8 @@ string Uploader::listener_doc(const char *type, const Json::Value &data,
|
|||
return doc["_id"].asString();
|
||||
}
|
||||
|
||||
string Uploader::listener_telemetry(const Json::Value &data, int time_created)
|
||||
string Uploader::listener_telemetry(const Json::Value &data,
|
||||
long long int time_created)
|
||||
{
|
||||
EZ::MutexLock lock(mutex);
|
||||
|
||||
|
@ -235,7 +238,8 @@ string Uploader::listener_telemetry(const Json::Value &data, int time_created)
|
|||
return latest_listener_telemetry;
|
||||
}
|
||||
|
||||
string Uploader::listener_info(const Json::Value &data, int time_created)
|
||||
string Uploader::listener_info(const Json::Value &data,
|
||||
long long int time_created)
|
||||
{
|
||||
EZ::MutexLock lock(mutex);
|
||||
|
||||
|
@ -249,7 +253,7 @@ vector<Json::Value> *Uploader::flights()
|
|||
map<string,string> options;
|
||||
|
||||
Json::Value startkey(Json::arrayValue);
|
||||
startkey.append((unsigned int) time(NULL));
|
||||
startkey.append((long long int) time(NULL));
|
||||
|
||||
options["include_docs"] = "true";
|
||||
options["startkey"] = CouchDB::Database::json_query_value(startkey);
|
||||
|
|
|
@ -40,7 +40,7 @@ class Uploader
|
|||
string latest_listener_telemetry;
|
||||
|
||||
string listener_doc(const char *type, const Json::Value &data,
|
||||
int time_created);
|
||||
long long int time_created);
|
||||
|
||||
public:
|
||||
Uploader(const string &callsign,
|
||||
|
@ -50,9 +50,11 @@ public:
|
|||
~Uploader() {};
|
||||
string payload_telemetry(const string &data,
|
||||
const Json::Value &metadata=Json::Value::null,
|
||||
int time_created=-1);
|
||||
string listener_telemetry(const Json::Value &data, int time_created=-1);
|
||||
string listener_info(const Json::Value &data, int time_created=-1);
|
||||
long long int time_created=-1);
|
||||
string listener_telemetry(const Json::Value &data,
|
||||
long long int time_created=-1);
|
||||
string listener_info(const Json::Value &data,
|
||||
long long int time_created=-1);
|
||||
vector<Json::Value> *flights();
|
||||
vector<Json::Value> *payloads();
|
||||
};
|
||||
|
|
|
@ -16,6 +16,8 @@ import random
|
|||
import xml.etree.cElementTree as ET
|
||||
import urllib
|
||||
|
||||
from habitat.utils import rfc3339
|
||||
|
||||
class ProxyException:
|
||||
def __init__(self, name, what=None):
|
||||
self.name = name
|
||||
|
@ -27,7 +29,7 @@ class ProxyException:
|
|||
class Callbacks:
|
||||
def __init__(self):
|
||||
self.lock = threading.RLock()
|
||||
self.fake_time = 10000 # set in fake_main.cxx
|
||||
self.fake_time = 1300000000 # set in fake_main.cxx
|
||||
|
||||
def advance_time(self, amount=1):
|
||||
with self.lock:
|
||||
|
@ -37,9 +39,13 @@ class Callbacks:
|
|||
with self.lock:
|
||||
return self.fake_time
|
||||
|
||||
def time_project(self, value):
|
||||
"""what the time will be value seconds into the future"""
|
||||
return 10000 + value
|
||||
def fake_timestamp(self, value):
|
||||
"""what the timestamp will be `value` fake seconds into the test"""
|
||||
return 1300000000 + value
|
||||
|
||||
def fake_rfc3339(self, value):
|
||||
"""what the local RFC3339 will be `value` fake seconds into the test"""
|
||||
return rfc3339.timestamp_to_rfc3339_localoffset(1300000000 + value)
|
||||
|
||||
class Proxy:
|
||||
def __init__(self, command, callsign, couch_uri=None, couch_db=None,
|
||||
|
@ -367,8 +373,8 @@ class TestCPPConnector:
|
|||
|
||||
doc = {
|
||||
"_id": uuid,
|
||||
"time_created": self.callbacks.time_project(i),
|
||||
"time_uploaded": self.callbacks.time_project(i),
|
||||
"time_created": self.callbacks.fake_rfc3339(i),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(i),
|
||||
"data": {
|
||||
"callsign": "PROXYCALL",
|
||||
"test": 123.356
|
||||
|
@ -392,8 +398,8 @@ class TestCPPConnector:
|
|||
"_id": self.pop_uuid(),
|
||||
"data": copy.deepcopy(telemetry_data),
|
||||
"type": "listener_telemetry",
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0)
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0)
|
||||
}
|
||||
telemetry_doc["data"]["callsign"] = "PROXYCALL"
|
||||
|
||||
|
@ -402,8 +408,8 @@ class TestCPPConnector:
|
|||
"_id": self.pop_uuid(),
|
||||
"data": copy.deepcopy(info_data),
|
||||
"type": "listener_info",
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0)
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0)
|
||||
}
|
||||
info_doc["data"]["callsign"] = "PROXYCALL"
|
||||
|
||||
|
@ -424,11 +430,7 @@ class TestCPPConnector:
|
|||
|
||||
# And now again, but this time, setting time_created.
|
||||
telemetry_data = {
|
||||
"time": {
|
||||
"hour": 12,
|
||||
"minute": 40,
|
||||
"second": 5
|
||||
},
|
||||
"time": "12:40:05",
|
||||
"latitude": 35.11,
|
||||
"longitude": 137.567,
|
||||
"altitude": 12
|
||||
|
@ -437,8 +439,8 @@ class TestCPPConnector:
|
|||
"_id": self.pop_uuid(),
|
||||
"data": copy.deepcopy(telemetry_data),
|
||||
"type": "listener_telemetry",
|
||||
"time_created": 501,
|
||||
"time_uploaded": self.callbacks.time_project(0)
|
||||
"time_created": self.callbacks.fake_rfc3339(501),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(1000)
|
||||
}
|
||||
telemetry_doc["data"]["callsign"] = "PROXYCALL"
|
||||
|
||||
|
@ -452,18 +454,21 @@ class TestCPPConnector:
|
|||
"_id": self.pop_uuid(),
|
||||
"data": copy.deepcopy(info_data),
|
||||
"type": "listener_info",
|
||||
"time_created": 409,
|
||||
"time_uploaded": self.callbacks.time_project(5)
|
||||
"time_created": self.callbacks.fake_rfc3339(409),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(1005)
|
||||
}
|
||||
info_doc["data"]["callsign"] = "PROXYCALL"
|
||||
|
||||
self.expect_save_doc(telemetry_doc, advance_time_after=5)
|
||||
self.expect_save_doc(info_doc)
|
||||
|
||||
self.callbacks.advance_time(1000)
|
||||
self.couchdb.run()
|
||||
telemetry_doc_id = \
|
||||
self.uploader.listener_telemetry(telemetry_data, 501)
|
||||
info_doc_id = self.uploader.listener_info(info_data, 409)
|
||||
self.uploader.listener_telemetry(telemetry_data,
|
||||
self.callbacks.fake_timestamp(501))
|
||||
info_doc_id = self.uploader.listener_info(info_data,
|
||||
self.callbacks.fake_timestamp(409))
|
||||
self.couchdb.check()
|
||||
|
||||
assert telemetry_doc_id == telemetry_doc["_id"]
|
||||
|
@ -496,8 +501,8 @@ class TestCPPConnector:
|
|||
# isn't going to work with nulls in it.
|
||||
|
||||
receiver_info = {
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0),
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0),
|
||||
}
|
||||
|
||||
doc = copy.deepcopy(self.ptlm_doc)
|
||||
|
@ -515,8 +520,8 @@ class TestCPPConnector:
|
|||
self.add_sample_listener_docs()
|
||||
|
||||
receiver_info = {
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0),
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0),
|
||||
"latest_listener_telemetry": self.sample_telemetry_doc_id,
|
||||
"latest_listener_info": self.sample_info_doc_id
|
||||
}
|
||||
|
@ -538,8 +543,8 @@ class TestCPPConnector:
|
|||
"type": "payload_telemetry",
|
||||
"receivers": {
|
||||
"SOMEONEELSE": {
|
||||
"time_created": 200,
|
||||
"time_uploaded": 240,
|
||||
"time_created": "2011-03-13T07:10:00+00:00",
|
||||
"time_uploaded": "2011-03-13T07:10:40+00:00",
|
||||
"frequency": 434074000,
|
||||
"asdf": "World"
|
||||
}
|
||||
|
@ -548,8 +553,8 @@ class TestCPPConnector:
|
|||
|
||||
def test_ptlm_merges_payload_conflicts(self):
|
||||
receiver_info = {
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0),
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0),
|
||||
}
|
||||
|
||||
doc = copy.deepcopy(self.ptlm_doc)
|
||||
|
@ -575,7 +580,7 @@ class TestCPPConnector:
|
|||
doc_merged["receivers"]["PROXYCALL"] = \
|
||||
copy.deepcopy(self.ptlm_doc["receivers"]["PROXYCALL"])
|
||||
receiver_info = copy.deepcopy(receiver_info)
|
||||
receiver_info["time_uploaded"] = self.callbacks.time_project(10)
|
||||
receiver_info["time_uploaded"] = self.callbacks.fake_rfc3339(10)
|
||||
doc_merged["receivers"]["PROXYCALL"].update(receiver_info)
|
||||
|
||||
self.expect_save_doc(doc_merged)
|
||||
|
@ -586,8 +591,8 @@ class TestCPPConnector:
|
|||
|
||||
def test_ptlm_refuses_to_merge_collision(self):
|
||||
receiver_info = {
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0),
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0),
|
||||
}
|
||||
|
||||
doc = copy.deepcopy(self.ptlm_doc)
|
||||
|
@ -630,8 +635,8 @@ class TestCPPConnector:
|
|||
|
||||
def add_mock_conflicts(self, n):
|
||||
receiver_info = {
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0),
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0),
|
||||
}
|
||||
|
||||
doc = copy.deepcopy(self.ptlm_doc)
|
||||
|
@ -672,13 +677,14 @@ class TestCPPConnector:
|
|||
doc_merged = copy.deepcopy(doc_merged)
|
||||
|
||||
new_call = "listener_{0}".format(i)
|
||||
new_info = {"time_created": 600 + i, "time_uploaded": 641 + i}
|
||||
new_info = {"time_created": self.callbacks.fake_rfc3339(i),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(i + 1)}
|
||||
|
||||
doc_existing["receivers"][new_call] = new_info
|
||||
doc_merged["receivers"][new_call] = new_info
|
||||
|
||||
doc_merged["receivers"]["PROXYCALL"]["time_uploaded"] = \
|
||||
self.callbacks.time_project(i + 1)
|
||||
self.callbacks.fake_rfc3339(i + 1)
|
||||
|
||||
return (doc_existing, doc_merged)
|
||||
|
||||
|
@ -726,8 +732,8 @@ class TestCPPConnector:
|
|||
doc = {"_id": "flight_{0}", "type": "flight", "i": i,
|
||||
"payloads": [p["_id"] for p in payloads]}
|
||||
|
||||
start = self.callbacks.time_project(1000 + i)
|
||||
end = self.callbacks.time_project(2000 + i)
|
||||
start = self.callbacks.fake_rfc3339(1000 + i)
|
||||
end = self.callbacks.fake_rfc3339(2000 + i)
|
||||
rows.append({"id": doc["_id"], "key": [end, start, 0],
|
||||
"value": None, "doc": doc})
|
||||
for p in payloads:
|
||||
|
@ -742,7 +748,7 @@ class TestCPPConnector:
|
|||
{"total_rows": len(rows), "offset": 0, "rows": rows}
|
||||
|
||||
self.callbacks.advance_time(1925)
|
||||
view_time = self.callbacks.time_project(1925)
|
||||
view_time = self.callbacks.fake_timestamp(1925)
|
||||
view_path = "_design/flight/_view/end_start_including_payloads"
|
||||
options = "include_docs=true&startkey=[{0}]".format(view_time)
|
||||
|
||||
|
@ -786,8 +792,8 @@ class TestCPPConnectorThreaded(TestCPPConnector):
|
|||
"_id": self.pop_uuid(),
|
||||
"data": copy.deepcopy(telemetry_data),
|
||||
"type": "listener_telemetry",
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0)
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0)
|
||||
}
|
||||
telemetry_doc["data"]["callsign"] = "PROXYCALL"
|
||||
|
||||
|
@ -796,8 +802,8 @@ class TestCPPConnectorThreaded(TestCPPConnector):
|
|||
"_id": self.pop_uuid(),
|
||||
"data": copy.deepcopy(info_data),
|
||||
"type": "listener_info",
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0)
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0)
|
||||
}
|
||||
info_doc["data"]["callsign"] = "PROXYCALL"
|
||||
|
||||
|
@ -849,8 +855,8 @@ class TestCPPConnectorThreaded(TestCPPConnector):
|
|||
self.uploader.re_init("NEWCALL", self.couchdb.url)
|
||||
|
||||
receiver_info = {
|
||||
"time_created": self.callbacks.time_project(0),
|
||||
"time_uploaded": self.callbacks.time_project(0),
|
||||
"time_created": self.callbacks.fake_rfc3339(0),
|
||||
"time_uploaded": self.callbacks.fake_rfc3339(0),
|
||||
}
|
||||
|
||||
doc = copy.deepcopy(self.ptlm_doc)
|
||||
|
|
|
@ -79,7 +79,7 @@ static r_json proxy_payloads(TestSubject *u);
|
|||
static EZ::cURLGlobal cgl;
|
||||
static EZ::Mutex cout_lock;
|
||||
static SafeValue<bool> enable_callbacks(false);
|
||||
static SafeValue<int> last_time(10000);
|
||||
static SafeValue<int> last_time(1300000000);
|
||||
|
||||
#ifdef THREADED
|
||||
static EZ::Queue<Json::Value> callback_responses;
|
||||
|
|
Ładowanie…
Reference in New Issue