diff --git a/habitat/CouchDB.h b/habitat/CouchDB.h
index a82e2af..f061311 100644
--- a/habitat/CouchDB.h
+++ b/habitat/CouchDB.h
@@ -36,6 +36,10 @@ public:
     Json::Value *operator[](const string &doc_id);
     Json::Value *view(const string &design_doc, const string &view_name,
                       const map<string,string> &options=view_default_options);
+    string update_put(const string &design_doc, const string &update_name,
+                      const string &doc_id, const Json::Value &payload);
+    string update_put(const string &design_doc, const string &update_name,
+                      const string &doc_id, const string &payload);
     static string json_query_value(Json::Value &value);
 };
 
diff --git a/habitat/Uploader.h b/habitat/Uploader.h
index b118404..48985f4 100644
--- a/habitat/Uploader.h
+++ b/habitat/Uploader.h
@@ -22,13 +22,6 @@ public:
     UnmergeableError(const string &what) : runtime_error(what) {};
 };
 
-class CollisionError : public runtime_error
-{
-public:
-    CollisionError() : runtime_error("habitat::CollisionError") {};
-    CollisionError(const string &what) : runtime_error(what) {};
-};
-
 class Uploader
 {
     EZ::Mutex mutex;
diff --git a/src/CouchDB.cxx b/src/CouchDB.cxx
index cd4ffe1..e70fbe1 100644
--- a/src/CouchDB.cxx
+++ b/src/CouchDB.cxx
@@ -189,6 +189,49 @@ Json::Value *Database::view(const string &design_doc, const string &view_name,
     return server.get_json(view_url);
 }
 
+string Database::update_put(const string &design_doc,
+                            const string &update_name,
+                            const string &doc_id,
+                            const Json::Value &payload)
+{
+    Json::FastWriter writer;
+    string json_payload = writer.write(payload);
+    return update_put(design_doc, update_name, doc_id, json_payload);
+}
+
+string Database::update_put(const string &design_doc,
+                            const string &update_name,
+                            const string &doc_id,
+                            const string &payload)
+{
+    string update_url(url);
+
+    update_url.append("_design/");
+    update_url.append(EZ::cURL::escape(design_doc));
+    update_url.append("/_update/");
+    update_url.append(EZ::cURL::escape(update_name));
+
+    if (doc_id.size())
+    {
+        update_url.append("/");
+        update_url.append(doc_id);
+    }
+
+    try
+    {
+        return server.curl.put(update_url, payload);
+    }
+    catch (EZ::HTTPResponse &e)
+    {
+        /* Catch HTTP 409 Resource Conflict */
+
+        if (e.response_code != 409)
+            throw;
+
+        throw Conflict(doc_id);
+    }
+}
+
 string Database::json_query_value(Json::Value &value)
 {
     Json::FastWriter writer;
diff --git a/src/Uploader.cxx b/src/Uploader.cxx
index fa49000..ffe26f4 100644
--- a/src/Uploader.cxx
+++ b/src/Uploader.cxx
@@ -97,36 +97,6 @@ static void set_time(Json::Value &thing, long long int time_created)
         RFC3339::timestamp_to_rfc3339_localoffset(time_created);
 }
 
-static void payload_telemetry_new(Json::Value &doc,
-                                  const string &data_b64,
-                                  const string &callsign,
-                                  Json::Value &receiver_info)
-{
-    doc["data"] = Json::Value(Json::objectValue);
-    doc["receivers"] = Json::Value(Json::objectValue);
-    doc["type"] = "payload_telemetry";
-
-    doc["data"]["_raw"] = data_b64;
-    doc["receivers"][callsign] = receiver_info;
-}
-
-static void payload_telemetry_merge(Json::Value &doc,
-                                    const string &data_b64,
-                                    const string &callsign,
-                                    Json::Value &receiver_info)
-{
-    if (!doc.isObject() || !doc["data"].isObject() ||
-        !doc["receivers"].isObject())
-        throw runtime_error("Server gave us an invalid payload telemetry doc");
-
-    string other_b64 = doc["data"]["_raw"].asString();
-
-    if (!other_b64.length() || other_b64 != data_b64)
-        throw CollisionError();
-
-    doc["receivers"][callsign] = receiver_info;
-}
-
 string Uploader::payload_telemetry(const string &data,
                                    const Json::Value &metadata,
                                    long long int time_created)
@@ -142,7 +112,13 @@ string Uploader::payload_telemetry(const string &data,
     if (time_created == -1)
         time_created = time(NULL);
 
-    Json::Value receiver_info;
+    Json::Value doc;
+    doc["data"] = Json::Value(Json::objectValue);
+    doc["data"]["_raw"] = data_b64;
+    doc["receivers"] = Json::Value(Json::objectValue);
+    doc["receivers"][callsign] = Json::Value(Json::objectValue);
+
+    Json::Value &receiver_info = doc["receivers"][callsign];
 
     if (metadata.isObject())
     {
@@ -169,39 +145,29 @@ string Uploader::payload_telemetry(const string &data,
     if (latest_listener_telemetry.length())
         receiver_info["latest_listener_telemetry"] = latest_listener_telemetry;
 
-    try
+    for (int attempts = 0; attempts < max_merge_attempts; attempts++)
     {
-        Json::Value doc(Json::objectValue);
-        set_time(receiver_info, time_created);
-        payload_telemetry_new(doc, data_b64, callsign, receiver_info);
-        doc["_id"] = doc_id;
-        database.save_doc(doc);
-        return doc_id;
-    }
-    catch (CouchDB::Conflict &e)
-    {
-        for (int attempts = 0; attempts < max_merge_attempts; attempts++)
+        try
         {
-            try
-            {
-                Json::Value *doc = database[doc_id];
-                auto_ptr<Json::Value> doc_destroyer(doc);
-
-                set_time(receiver_info, time_created);
-                payload_telemetry_merge(*doc, data_b64, callsign,
-                                        receiver_info);
-                database.save_doc(*doc);
-
-                return doc_id;
-            }
-            catch (CouchDB::Conflict &e)
-            {
-                continue;
-            }
+            set_time(receiver_info, time_created);
+            database.update_put("payload_telemetry", "add_listener", doc_id,
+                                doc);
+            return doc_id;
+        }
+        catch (CouchDB::Conflict &e)
+        {
+            continue;
+        }
+        catch (EZ::HTTPResponse &e)
+        {
+            if (e.response_code == 403 || e.response_code == 401)
+                break; // Unmergeable
+            else
+                throw;
         }
-
-        throw UnmergeableError();
     }
+
+    throw UnmergeableError();
 }
 
 string Uploader::listener_doc(const char *type, const Json::Value &data,
diff --git a/tests/test_uploader.py b/tests/test_uploader.py
index bb754f3..92247ea 100644
--- a/tests/test_uploader.py
+++ b/tests/test_uploader.py
@@ -82,16 +82,16 @@ class Proxy:
         self._proxy(init_args)
 
     def _write(self, command):
-        print ">>", repr(command)
-        self.p.stdin.write(json.dumps(command))
+        s = json.dumps(command)
+        print ">>", s
+        self.p.stdin.write(s)
         self.p.stdin.write("\n")
 
     def _read(self):
         line = self.p.stdout.readline()
        assert line and line.endswith("\n")
+        print "<<", line.strip()
         obj = json.loads(line)
-
-        print "<<", repr(obj)
         return obj
 
     def _proxy(self, command):
@@ -380,6 +380,21 @@ class TestCPPConnector:
             **kwargs
         )
 
+    def expect_add_listener_update(self, doc_id, protodoc, **kwargs):
+        if "code" not in kwargs:
+            kwargs["code"] = 200
+        if "respond" not in kwargs and "respond_json" not in kwargs:
+            kwargs["respond"] = "OK"
+
+        self.couchdb.expect_request(
+            method="PUT",
+            path=self.db_path + "_design/payload_telemetry/" +
+                 "_update/add_listener/" + doc_id,
+            body_json=protodoc,
+            validate_body_json=False,
+            **kwargs
+        )
+
     def test_uses_server_uuids(self):
         should_use_uuids = []
 
@@ -501,12 +516,10 @@ class TestCPPConnector:
                   "8f83926278b23f5b813bdc75f6b9cdd6"
     ptlm_string = "asdf blah \x12 binar\x04\x01 asdfasdfsz"
     ptlm_metadata = {"frequency": 434075000, "misc": "Hi"}
-    ptlm_doc = {
-        "_id": ptlm_doc_id,
+    ptlm_doc_ish = {
         "data": {
             "_raw": "YXNkZiBibGFoIBIgYmluYXIEASBhc2RmYXNkZnN6"
         },
-        "type": "payload_telemetry",
         "receivers": {
             "PROXYCALL": {
                 "frequency": 434075000,
@@ -515,7 +528,19 @@ class TestCPPConnector:
         }
     }
 
-    def test_payload_telemetry(self):
+    def make_ptlm_doc_ish(self, time_created=0, time_uploaded=0, **extra):
+        receiver_info = {
+            "time_created": self.callbacks.fake_rfc3339(time_created),
+            "time_uploaded": self.callbacks.fake_rfc3339(time_uploaded),
+        }
+        receiver_info.update(extra)
+
+        doc_ish = copy.deepcopy(self.ptlm_doc_ish)
+        doc_ish["receivers"]["PROXYCALL"].update(receiver_info)
+
+        return doc_ish
+
+    def test_payload_telemetry_simple(self):
         # WARNING: JsonCPP does not support strings with \0 in the middle of
         # them, because it does not store the length of the string and instead
         # later figures it out with strlen. This does not harm the uploader
         # to the json encoder. However, the json stdin proxy call interface
         # isn't going to work with nulls in it.
 
-        receiver_info = {
-            "time_created": self.callbacks.fake_rfc3339(0),
-            "time_uploaded": self.callbacks.fake_rfc3339(0),
-        }
+        doc_ish = self.make_ptlm_doc_ish()
 
-        doc = copy.deepcopy(self.ptlm_doc)
-        doc["receivers"]["PROXYCALL"].update(receiver_info)
-
-        self.expect_save_doc(doc)
+        self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
         self.couchdb.run()
+
         ret_doc_id = self.uploader.payload_telemetry(self.ptlm_string,
                                                      self.ptlm_metadata)
         self.couchdb.check()
@@ -542,193 +562,50 @@ class TestCPPConnector:
     def test_adds_latest_listener_doc(self):
         self.add_sample_listener_docs()
 
-        receiver_info = {
-            "time_created": self.callbacks.fake_rfc3339(0),
-            "time_uploaded": self.callbacks.fake_rfc3339(0),
-            "latest_listener_telemetry": self.sample_telemetry_doc_id,
-            "latest_listener_information": self.sample_info_doc_id
-        }
+        doc_ish = self.make_ptlm_doc_ish(
+            latest_listener_telemetry=self.sample_telemetry_doc_id,
+            latest_listener_information=self.sample_info_doc_id
+        )
 
-        doc = copy.deepcopy(self.ptlm_doc)
-        doc["receivers"]["PROXYCALL"].update(receiver_info)
-
-        self.expect_save_doc(doc)
+        self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
         self.couchdb.run()
         self.uploader.payload_telemetry(self.ptlm_string, self.ptlm_metadata)
         self.couchdb.check()
 
-    ptlm_doc_existing = {
-        "_id": ptlm_doc_id,
-        "data": {
-            "_raw": "YXNkZiBibGFoIBIgYmluYXIEASBhc2RmYXNkZnN6",
-            "some_parsed_data": 12345
-        },
-        "type": "payload_telemetry",
-        "receivers": {
-            "SOMEONEELSE": {
-                "time_created": "2011-03-13T07:10:00+00:00",
-                "time_uploaded": "2011-03-13T07:10:40+00:00",
-                "frequency": 434074000,
-                "asdf": "World"
-            }
-        }
-    }
+    def test_ptlm_retries_conflicts(self):
+        doc_ish = self.make_ptlm_doc_ish()
 
-    def test_ptlm_merges_payload_conflicts(self):
-        receiver_info = {
-            "time_created": self.callbacks.fake_rfc3339(0),
-            "time_uploaded": self.callbacks.fake_rfc3339(0),
-        }
-
-        doc = copy.deepcopy(self.ptlm_doc)
-        doc["receivers"]["PROXYCALL"].update(receiver_info)
-
-        self.couchdb.expect_request(
-            method="PUT",
-            path=self.db_path + doc["_id"],
-            body_json=doc,
+        self.expect_add_listener_update(
+            self.ptlm_doc_id, doc_ish,
             code=409,
             respond_json={"error": "conflict"},
             advance_time_after=5
         )
 
-        self.couchdb.expect_request(
-            path=self.db_path + self.ptlm_doc_id,
-            code=200,
-            respond_json=self.ptlm_doc_existing,
-            advance_time_after=5,
-        )
+        doc_ish = copy.deepcopy(doc_ish)
+        doc_ish["receivers"]["PROXYCALL"]["time_uploaded"] = \
+            self.callbacks.fake_rfc3339(5)
 
-        doc_merged = copy.deepcopy(self.ptlm_doc_existing)
-        doc_merged["receivers"]["PROXYCALL"] = \
-            copy.deepcopy(self.ptlm_doc["receivers"]["PROXYCALL"])
-        receiver_info = copy.deepcopy(receiver_info)
-        receiver_info["time_uploaded"] = self.callbacks.fake_rfc3339(10)
-        doc_merged["receivers"]["PROXYCALL"].update(receiver_info)
-
-        self.expect_save_doc(doc_merged, validate_old=self.ptlm_doc_existing)
+        self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
 
         self.couchdb.run()
         self.uploader.payload_telemetry(self.ptlm_string, self.ptlm_metadata)
         self.couchdb.check()
 
-    def test_ptlm_refuses_to_merge_collision(self):
-        receiver_info = {
-            "time_created": self.callbacks.fake_rfc3339(0),
-            "time_uploaded": self.callbacks.fake_rfc3339(0),
-        }
+    def test_ptlm_doesnt_retry_other_errors(self):
+        yield self.check_ptlm_doesnt_retry_code, 401
+        yield self.check_ptlm_doesnt_retry_code, 403
 
-        doc = copy.deepcopy(self.ptlm_doc)
-        doc["receivers"]["PROXYCALL"].update(receiver_info)
-        del doc["receivers"]["PROXYCALL"]["frequency"]
-        del doc["receivers"]["PROXYCALL"]["misc"]
+    def check_ptlm_doesnt_retry_code(self, code):
+        doc_ish = self.make_ptlm_doc_ish()
 
-        self.couchdb.expect_request(
-            method="PUT",
-            path=self.db_path + doc["_id"],
-            body_json=doc,
-            code=409,
-            respond_json={"error": "conflict"},
+        self.expect_add_listener_update(
+            self.ptlm_doc_id, doc_ish,
+            code=code,
+            respond_json={"error": "of some sort"},
             advance_time_after=5
         )
 
-        doc_conflict = copy.deepcopy(self.ptlm_doc_existing)
-        doc_conflict["data"]["_raw"] = "cGluZWFwcGxlcw=="
-
-        self.couchdb.expect_request(
-            path=self.db_path + self.ptlm_doc_id,
-            code=200,
-            respond_json=doc_conflict
-        )
-
-        self.couchdb.run()
-
-        try:
-            self.uploader.payload_telemetry(self.ptlm_string)
-        except ProxyException, e:
-            if e.name == "runtime_error" and \
-               e.what == "habitat::CollisionError":
-                pass
-            else:
-                raise
-        else:
-            raise AssertionError("Did not raise CollisionError")
-
-        self.couchdb.check()
-
-    def add_mock_conflicts(self, n):
-        receiver_info = {
-            "time_created": self.callbacks.fake_rfc3339(0),
-            "time_uploaded": self.callbacks.fake_rfc3339(0),
-        }
-
-        doc = copy.deepcopy(self.ptlm_doc)
-        doc["receivers"]["PROXYCALL"].update(receiver_info)
-
-        self.couchdb.expect_request(
-            method="PUT",
-            path=self.db_path + self.ptlm_doc_id,
-            body_json=doc,
-            code=409,
-            respond_json={"error": "conflict"}
-        )
-
-        doc_existing = copy.deepcopy(self.ptlm_doc_existing)
-
-        doc_merged = copy.deepcopy(self.ptlm_doc_existing)
-        doc_merged["receivers"]["PROXYCALL"] = \
-            copy.deepcopy(self.ptlm_doc["receivers"]["PROXYCALL"])
-        doc_merged["receivers"]["PROXYCALL"].update(receiver_info)
-
-        for i in xrange(n):
-            self.couchdb.expect_request(
-                path=self.db_path + self.ptlm_doc_id,
-                code=200,
-                respond_json=doc_existing,
-            )
-
-            self.couchdb.expect_request(
-                method="PUT",
-                path=self.db_path + self.ptlm_doc_id,
-                body_json=doc_merged,
-                validate_old=doc_existing,
-                code=409,
-                respond_json={"error": "conflict"},
-                advance_time_after=1
-            )
-
-            doc_existing = copy.deepcopy(doc_existing)
-            doc_merged = copy.deepcopy(doc_merged)
-
-            new_call = "listener_{0}".format(i)
-            new_info = {"time_created": self.callbacks.fake_rfc3339(i),
-                        "time_uploaded": self.callbacks.fake_rfc3339(i + 1)}
-
-            doc_existing["receivers"][new_call] = new_info
-            doc_merged["receivers"][new_call] = new_info
-
-            doc_merged["receivers"]["PROXYCALL"]["time_uploaded"] = \
-                self.callbacks.fake_rfc3339(i + 1)
-
-        return (doc_existing, doc_merged)
-
-    def test_merges_multiple_conflicts(self):
-        (final_doc_existing, final_doc_merged) = self.add_mock_conflicts(15)
-
-        self.couchdb.expect_request(
-            path=self.db_path + self.ptlm_doc_id,
-            code=200,
-            respond_json=final_doc_existing,
-        )
-
-        self.expect_save_doc(final_doc_merged, validate_old=final_doc_existing)
-
-        self.couchdb.run()
-        self.uploader.payload_telemetry(self.ptlm_string, self.ptlm_metadata)
-        self.couchdb.check()
-
-    def test_gives_up_after_many_conflicts(self):
-        self.add_mock_conflicts(20)
         self.couchdb.run()
 
         try:
@@ -743,6 +620,64 @@ class TestCPPConnector:
             else:
                 raise AssertionError("Did not raise UnmergeableError")
 
+        self.couchdb.check()
+
+    def add_mock_conflicts(self, n):
+        doc_ish = self.make_ptlm_doc_ish()
+
+        self.expect_add_listener_update(
+            self.ptlm_doc_id, doc_ish,
+            code=409,
+            respond_json={"error": "conflict"},
+            advance_time_after=1
+        )
+
+        for i in xrange(n):
+            doc_ish = copy.deepcopy(doc_ish)
+            doc_ish["receivers"]["PROXYCALL"]["time_uploaded"] = \
+                self.callbacks.fake_rfc3339(i + 1)
+
+            self.expect_add_listener_update(
+                self.ptlm_doc_id, doc_ish,
+                code=409,
+                respond_json={"error": "conflict"},
+                advance_time_after=1
+            )
+
+    def test_merges_multiple_conflicts(self):
+        self.add_mock_conflicts(15)
+        doc_ish = self.make_ptlm_doc_ish(time_created=0, time_uploaded=16)
+        self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
+
+        self.couchdb.run()
+        self.uploader.payload_telemetry(self.ptlm_string, self.ptlm_metadata)
+        self.couchdb.check()
+
+    def test_gives_up_after_many_conflicts(self):
+        self.add_mock_conflicts(19)
+        self.couchdb.run()
+
+        try:
+            self.uploader.payload_telemetry(self.ptlm_string,
+                                            self.ptlm_metadata)
+        except ProxyException, e:
+            if e.name == "runtime_error" and \
+               e.what == "habitat::UnmergeableError":
+                pass
+            else:
+                raise
+        else:
+            raise AssertionError("Did not raise UnmergeableError")
+
+    def test_update_func_likes_doc(self):
+        doc_ish = self.make_ptlm_doc_ish()
+
+        req = {
+            "body": json.dumps(doc_ish),
+            "id": self.ptlm_doc_id
+        }
+        views.payload_telemetry.add_listener_update(None, req)
+
     def test_flights(self):
         rows = []
         expect_result = []
@@ -897,13 +832,13 @@ class TestCPPConnectorThreaded(TestCPPConnector):
             "time_uploaded": self.callbacks.fake_rfc3339(0),
         }
 
-        doc = copy.deepcopy(self.ptlm_doc)
-        doc["receivers"]["PROXYCALL"].update(receiver_info)
-        doc["receivers"]["NEWCALL"] = doc["receivers"]["PROXYCALL"]
-        del doc["receivers"]["PROXYCALL"]
+        doc_ish = self.make_ptlm_doc_ish()
+        doc_ish["receivers"]["NEWCALL"] = doc_ish["receivers"]["PROXYCALL"]
+        del doc_ish["receivers"]["PROXYCALL"]
 
-        self.expect_save_doc(doc)
+        self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
         self.couchdb.run()
+
         ret_doc_id = self.uploader.payload_telemetry(self.ptlm_string,
                                                      self.ptlm_metadata)
         self.couchdb.check()
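For context, a minimal sketch of how the new Database::update_put entry point could be driven directly, outside Uploader::payload_telemetry. This is illustrative only: it assumes the CouchDB::Server and CouchDB::Database constructors take the server URL and a database name respectively (as they are used elsewhere in this library), and the URL, database name, callsign and document id below are placeholders rather than values from the change above.

    // Usage sketch only -- not part of this change; all concrete values here
    // are placeholders/assumptions.
    #include <iostream>
    #include <string>
    #include <json/json.h>
    #include "habitat/CouchDB.h"

    int main()
    {
        // Assumed constructor signatures: Server(url), Database(server, name).
        CouchDB::Server server("http://localhost:5984/");
        CouchDB::Database database(server, "habitat");

        // Protodoc in the shape the add_listener update function receives:
        // base64 payload under data._raw plus a single receivers entry.
        Json::Value protodoc(Json::objectValue);
        protodoc["data"]["_raw"] = "aGVsbG8gd29ybGQ=";
        protodoc["receivers"]["EXAMPLECALL"]["time_created"] =
            "2012-01-01T00:00:00+00:00";

        try
        {
            // PUTs to _design/payload_telemetry/_update/add_listener/<doc_id>
            std::string response =
                database.update_put("payload_telemetry", "add_listener",
                                    "example_doc_id", protodoc);
            std::cout << response << std::endl;
        }
        catch (CouchDB::Conflict &)
        {
            // HTTP 409 from the update handler; the Uploader retries this.
        }

        return 0;
    }

Per the Uploader change above, the merge itself now happens in the server-side add_listener update function: the client simply retries the PUT on HTTP 409 and treats 401/403 responses as unmergeable.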