diff --git a/_design/uploader_v1/views/flights/map.js b/_design/uploader_v1/views/flights/map.js deleted file mode 100644 index a6bd573..0000000 --- a/_design/uploader_v1/views/flights/map.js +++ /dev/null @@ -1,6 +0,0 @@ -/* Copyright 2011 (C) Daniel Richman; GNU GPL 3 */ - -function (doc) { - if (doc.type == "flight") - emit(doc.end, null); -} diff --git a/src/CouchDB.cxx b/src/CouchDB.cxx index e2e234e..d4572bf 100644 --- a/src/CouchDB.cxx +++ b/src/CouchDB.cxx @@ -192,7 +192,11 @@ Json::Value *Database::view(const string &design_doc, const string &view_name, string Database::json_query_value(Json::Value &value) { Json::FastWriter writer; - return writer.write(value); + string str = writer.write(value); + int final = str.length() - 1; + if (final >= 0 && str[final] == '\n') + str.erase(final); + return str; } } /* namespace CouchDB */ diff --git a/src/Uploader.cxx b/src/Uploader.cxx index e73a287..11ab7df 100644 --- a/src/Uploader.cxx +++ b/src/Uploader.cxx @@ -246,13 +246,59 @@ string Uploader::listener_info(const Json::Value &data, int time_created) vector *Uploader::flights() { map options; - ostringstream timefmt; - timefmt << time(NULL); + + Json::Value startkey(Json::arrayValue); + startkey.append((unsigned int) time(NULL)); options["include_docs"] = "true"; - options["startkey"] = timefmt.str(); + options["startkey"] = CouchDB::Database::json_query_value(startkey); - Json::Value *response = database.view("uploader_v1", "flights", options); + Json::Value *response = + database.view("flight", "end_start_including_payloads", options); + auto_ptr response_destroyer(response); + + vector *result = new vector; + auto_ptr< vector > result_destroyer(result); + + const Json::Value &rows = (*response)["rows"]; + Json::Value::const_iterator it; + + result->reserve(rows.size()); + Json::Value *current_pcfg_list = NULL; + + for (it = rows.begin(); it != rows.end(); it++) + { + const Json::Value &row = *it; + const Json::Value &key = row["key"], &doc = row["doc"]; + bool is_pcfg = (key[2u].asInt() == 1); + + if (!is_pcfg) + { + result->push_back(doc); + /* copies the doc */ + + Json::Value &doc_copy = result->back(); + doc_copy["_payload_docs"] = Json::Value(Json::arrayValue); + current_pcfg_list = &(doc_copy["_payload_docs"]); + } + else + { + current_pcfg_list->append(doc); + } + } + + result_destroyer.release(); + + return result; +} + +vector *Uploader::payloads() +{ + map options; + options["include_docs"] = "true"; + + Json::Value *response = + database.view("payload_configuration", "name_time_created", options); auto_ptr response_destroyer(response); vector *result = new vector; @@ -269,7 +315,6 @@ vector *Uploader::flights() } result_destroyer.release(); - return result; } diff --git a/src/Uploader.h b/src/Uploader.h index 891d92d..42b57f1 100644 --- a/src/Uploader.h +++ b/src/Uploader.h @@ -54,6 +54,7 @@ public: string listener_telemetry(const Json::Value &data, int time_created=-1); string listener_info(const Json::Value &data, int time_created=-1); vector *flights(); + vector *payloads(); }; } /* namespace habitat */ diff --git a/src/UploaderThread.cxx b/src/UploaderThread.cxx index b40a399..c836e23 100644 --- a/src/UploaderThread.cxx +++ b/src/UploaderThread.cxx @@ -108,6 +108,19 @@ string UploaderFlights::describe() return "Uploader.flights()"; } +void UploaderPayloads::apply(UploaderThread &uthr) +{ + check(uthr.uploader.get()); + auto_ptr< vector > payloads; + payloads.reset(uthr.uploader->payloads()); + uthr.got_payloads(*payloads); +} + +string UploaderPayloads::describe() +{ + return "Uploader.payloads()"; +} + void UploaderShutdown::apply(UploaderThread &uthr) { throw this; @@ -176,6 +189,11 @@ void UploaderThread::flights() queue_action(new UploaderFlights()); } +void UploaderThread::payloads() +{ + queue_action(new UploaderPayloads()); +} + void UploaderThread::shutdown() { /* Borrow the SimpleThread mutex to make queued_shutdown access safe */ @@ -260,4 +278,9 @@ void UploaderThread::got_flights(const vector &flights) log("Default action: got_flights; discarding."); } +void UploaderThread::got_payloads(const vector &payloads) +{ + log("Default action: got_payloads; discarding."); +} + } /* namespace habitat */ diff --git a/src/UploaderThread.h b/src/UploaderThread.h index e74cdb7..de282ea 100644 --- a/src/UploaderThread.h +++ b/src/UploaderThread.h @@ -125,6 +125,15 @@ public: string describe(); }; +class UploaderPayloads : public UploaderAction +{ + void apply(UploaderThread &uthr); + friend class UploaderThread; + +public: + string describe(); +}; + class UploaderShutdown : public UploaderAction { void apply(UploaderThread &uthr); @@ -150,6 +159,7 @@ class UploaderThread : public EZ::SimpleThread friend class UploaderListenerTelemetry; friend class UploaderListenerInfo; friend class UploaderFlights; + friend class UploaderPayloads; public: UploaderThread(); @@ -166,6 +176,7 @@ public: void listener_telemetry(const Json::Value &data, int time_created=-1); void listener_info(const Json::Value &data, int time_created=-1); void flights(); + void payloads(); void shutdown(); void *run(); @@ -179,6 +190,7 @@ public: virtual void caught_exception(const runtime_error &error); virtual void caught_exception(const invalid_argument &error); virtual void got_flights(const vector &flights); + virtual void got_payloads(const vector &payloads); }; } /* namespace habitat */ diff --git a/tests/test_uploader.py b/tests/test_uploader.py index b1f3236..bf6383a 100644 --- a/tests/test_uploader.py +++ b/tests/test_uploader.py @@ -12,11 +12,8 @@ import collections import time import uuid import copy - -try: - import elementtree.ElementTree -except: - elementtree = None +import random +import xml.etree.cElementTree as ET class ProxyException: def __init__(self, name, what=None): @@ -151,12 +148,9 @@ class Proxy: self._check_valgrind() def _check_valgrind(self): - if elementtree == None: - raise AssertionError("Need elementtree in order to check Valgrind") - if self.xmlfile: self.xmlfile.seek(0) - tree = elementtree.ElementTree.parse(self.xmlfile) + tree = ET.parse(self.xmlfile) assert tree.find("error") == None def payload_telemetry(self, data, *args): @@ -171,10 +165,13 @@ class Proxy: def flights(self): return self._proxy(["flights"]) + def payloads(self): + return self._proxy(["payloads"]) + def reset(self): return self._proxy(["reset"]) -temp_port = 51205 +temp_port = 55205 def next_temp_port(): global temp_port @@ -716,10 +713,30 @@ class TestCPPConnector: raise AssertionError("Did not raise UnmergeableError") def test_flights(self): - flights= [{"_id": "flight_{0}".format(i), "a flight": i} - for i in xrange(100)] - rows = [{"id": doc["_id"], "key": None, "value": None, "doc": doc} - for doc in flights] + rows = [] + expect_result = [] + pcfgs = [] + + for i in xrange(100): + pcfgs.append({"_id": "pcfg_{0}".format(i), + "type": "payload_configuration", "i": i}) + for i in xrange(100): + payloads = random.sample(pcfgs, random.randint(1, 5)) + doc = {"_id": "flight_{0}", "type": "flight", "i": i, + "payloads": [p["_id"] for p in payloads]} + + start = self.callbacks.time_project(1000 + i) + end = self.callbacks.time_project(2000 + i) + rows.append({"id": doc["_id"], "key": [end, start, 0], + "value": None, "doc": doc}) + for p in payloads: + rows.append({"id": doc["_id"], "key": [end, start, 1], + "value": {"_id": p["_id"]}, "doc": p}) + + doc = copy.deepcopy(doc) + doc["_payload_docs"] = payloads + expect_result.append(doc) + fake_view_response = \ {"total_rows": len(rows), "offset": 0, "rows": rows} @@ -728,17 +745,39 @@ class TestCPPConnector: self.callbacks.advance_time(1925) view_time = self.callbacks.time_project(1925) - options = "include%5Fdocs=true&startkey=" + str(view_time) + view_path = "_design/flight/_view/end%5Fstart%5Fincluding%5Fpayloads" + options = "include%5Fdocs=true&startkey=%5B{0}%5D".format(view_time) self.couchdb.expect_request( - path=self.db_path + "_design/uploader%5Fv1/_view/flights?" + options, + path=self.db_path + view_path + "?" + options, code=200, respond_json=copy.deepcopy(fake_view_response) ) self.couchdb.run() result = self.uploader.flights() - assert result == flights + assert result == expect_result + + def test_payloads(self): + payloads = [{"_id": "pcfg_{0}".format(i), "a flight": i} + for i in xrange(100)] + rows = [{"id": doc["_id"], "key": None, "value": None, "doc": doc} + for doc in payloads] + fake_view_response = \ + {"total_rows": len(rows), "offset": 0, "rows": rows} + + view_path = "_design/payload%5Fconfiguration/_view/name%5Ftime%5Fcreated" + options = "include%5Fdocs=true" + + self.couchdb.expect_request( + path=self.db_path + view_path + "?" + options, + code=200, + respond_json=copy.deepcopy(fake_view_response) + ) + self.couchdb.run() + + result = self.uploader.payloads() + assert result == payloads class TestCPPConnectorThreaded(TestCPPConnector): command = "tests/cpp_connector_threaded" diff --git a/tests/test_uploader_main.cxx b/tests/test_uploader_main.cxx index 4f79092..06a99a2 100644 --- a/tests/test_uploader_main.cxx +++ b/tests/test_uploader_main.cxx @@ -28,7 +28,7 @@ public: }; static Json::Value proxy_callback(const string &name, const Json::Value &args); -static Json::Value repackage_flights(const vector &flights); +static Json::Value vector_to_json(const vector &vect); static void report_result(const Json::Value &arg1, const Json::Value &arg2=Json::Value::null, const Json::Value &arg3=Json::Value::null); @@ -57,7 +57,10 @@ class TestUploaderThread : public habitat::UploaderThread { report_result("error", "invalid_argument", error.what()); } void got_flights(const vector &flights) - { report_result("return", repackage_flights(flights)); } + { report_result("return", vector_to_json(flights)); } + + void got_payloads(const vector &payloads) + { report_result("return", vector_to_json(payloads)); } }; typedef TestUploaderThread TestSubject; @@ -71,6 +74,7 @@ static r_string proxy_listener_info(TestSubject *u, Json::Value command); static r_string proxy_listener_telemetry(TestSubject *u, Json::Value command); static r_string proxy_payload_telemetry(TestSubject *u, Json::Value command); static r_json proxy_flights(TestSubject *u); +static r_json proxy_payloads(TestSubject *u); static EZ::cURLGlobal cgl; static EZ::Mutex cout_lock; @@ -135,6 +139,8 @@ int main(int argc, char **argv) return_value = proxy_payload_telemetry(u.get(), command); else if (command_name == "flights") return_value = proxy_flights(u.get()); + else if (command_name == "payloads") + return_value = proxy_payloads(u.get()); else throw runtime_error("invalid command name"); @@ -173,6 +179,8 @@ int main(int argc, char **argv) proxy_payload_telemetry(&thread, command); else if (command_name == "flights") proxy_flights(&thread); + else if (command_name == "payloads") + proxy_payloads(&thread); else if (command_name == "return") callback_responses.put(command); #endif @@ -330,17 +338,28 @@ static r_json proxy_flights(TestSubject *u) #ifndef THREADED vector *result = u->flights(); auto_ptr< vector > destroyer(result); - return repackage_flights(*result); + return vector_to_json(*result); #else u->flights(); #endif } -static Json::Value repackage_flights(const vector &flights) +static r_json proxy_payloads(TestSubject *u) +{ +#ifndef THREADED + vector *result = u->payloads(); + auto_ptr< vector > destroyer(result); + return vector_to_json(*result); +#else + u->payloads(); +#endif +} + +static Json::Value vector_to_json(const vector &vect) { Json::Value list(Json::arrayValue); vector::const_iterator it; - for (it = flights.begin(); it != flights.end(); it++) + for (it = vect.begin(); it != vect.end(); it++) list.append(*it); return list; }