Use the update function 'add_listener'

pull/1/head
Daniel Richman 2012-12-27 14:07:04 +00:00
rodzic 8dacdc32cd
commit cc2408285d
5 zmienionych plików z 194 dodań i 253 usunięć

Wyświetl plik

@ -36,6 +36,10 @@ public:
Json::Value *operator[](const string &doc_id);
Json::Value *view(const string &design_doc, const string &view_name,
const map<string,string> &options=view_default_options);
string update_put(const string &design_doc, const string &update_name,
const string &doc_id, const Json::Value &payload);
string update_put(const string &design_doc, const string &update_name,
const string &doc_id, const string &payload);
static string json_query_value(Json::Value &value);
};

Wyświetl plik

@ -22,13 +22,6 @@ public:
UnmergeableError(const string &what) : runtime_error(what) {};
};
class CollisionError : public runtime_error
{
public:
CollisionError() : runtime_error("habitat::CollisionError") {};
CollisionError(const string &what) : runtime_error(what) {};
};
class Uploader
{
EZ::Mutex mutex;

Wyświetl plik

@ -189,6 +189,49 @@ Json::Value *Database::view(const string &design_doc, const string &view_name,
return server.get_json(view_url);
}
string Database::update_put(const string &design_doc,
const string &update_name,
const string &doc_id,
const Json::Value &payload)
{
Json::FastWriter writer;
string json_payload = writer.write(payload);
return update_put(design_doc, update_name, doc_id, json_payload);
}
string Database::update_put(const string &design_doc,
const string &update_name,
const string &doc_id,
const string &payload)
{
string update_url(url);
update_url.append("_design/");
update_url.append(EZ::cURL::escape(design_doc));
update_url.append("/_update/");
update_url.append(EZ::cURL::escape(update_name));
if (doc_id.size())
{
update_url.append("/");
update_url.append(doc_id);
}
try
{
return server.curl.put(update_url, payload);
}
catch (EZ::HTTPResponse &e)
{
/* Catch HTTP 409 Resource Conflict */
if (e.response_code != 409)
throw;
throw Conflict(doc_id);
}
}
string Database::json_query_value(Json::Value &value)
{
Json::FastWriter writer;

Wyświetl plik

@ -97,36 +97,6 @@ static void set_time(Json::Value &thing, long long int time_created)
RFC3339::timestamp_to_rfc3339_localoffset(time_created);
}
static void payload_telemetry_new(Json::Value &doc,
const string &data_b64,
const string &callsign,
Json::Value &receiver_info)
{
doc["data"] = Json::Value(Json::objectValue);
doc["receivers"] = Json::Value(Json::objectValue);
doc["type"] = "payload_telemetry";
doc["data"]["_raw"] = data_b64;
doc["receivers"][callsign] = receiver_info;
}
static void payload_telemetry_merge(Json::Value &doc,
const string &data_b64,
const string &callsign,
Json::Value &receiver_info)
{
if (!doc.isObject() || !doc["data"].isObject() ||
!doc["receivers"].isObject())
throw runtime_error("Server gave us an invalid payload telemetry doc");
string other_b64 = doc["data"]["_raw"].asString();
if (!other_b64.length() || other_b64 != data_b64)
throw CollisionError();
doc["receivers"][callsign] = receiver_info;
}
string Uploader::payload_telemetry(const string &data,
const Json::Value &metadata,
long long int time_created)
@ -142,7 +112,13 @@ string Uploader::payload_telemetry(const string &data,
if (time_created == -1)
time_created = time(NULL);
Json::Value receiver_info;
Json::Value doc;
doc["data"] = Json::Value(Json::objectValue);
doc["data"]["_raw"] = data_b64;
doc["receivers"] = Json::Value(Json::objectValue);
doc["receivers"][callsign] = Json::Value(Json::objectValue);
Json::Value &receiver_info = doc["receivers"][callsign];
if (metadata.isObject())
{
@ -169,39 +145,29 @@ string Uploader::payload_telemetry(const string &data,
if (latest_listener_telemetry.length())
receiver_info["latest_listener_telemetry"] = latest_listener_telemetry;
try
for (int attempts = 0; attempts < max_merge_attempts; attempts++)
{
Json::Value doc(Json::objectValue);
set_time(receiver_info, time_created);
payload_telemetry_new(doc, data_b64, callsign, receiver_info);
doc["_id"] = doc_id;
database.save_doc(doc);
return doc_id;
}
catch (CouchDB::Conflict &e)
{
for (int attempts = 0; attempts < max_merge_attempts; attempts++)
try
{
try
{
Json::Value *doc = database[doc_id];
auto_ptr<Json::Value> doc_destroyer(doc);
set_time(receiver_info, time_created);
payload_telemetry_merge(*doc, data_b64, callsign,
receiver_info);
database.save_doc(*doc);
return doc_id;
}
catch (CouchDB::Conflict &e)
{
continue;
}
set_time(receiver_info, time_created);
database.update_put("payload_telemetry", "add_listener", doc_id,
doc);
return doc_id;
}
catch (CouchDB::Conflict &e)
{
continue;
}
catch (EZ::HTTPResponse &e)
{
if (e.response_code == 403 || e.response_code == 401)
break; // Unmergeable
else
throw;
}
throw UnmergeableError();
}
throw UnmergeableError();
}
string Uploader::listener_doc(const char *type, const Json::Value &data,

Wyświetl plik

@ -82,16 +82,16 @@ class Proxy:
self._proxy(init_args)
def _write(self, command):
print ">>", repr(command)
self.p.stdin.write(json.dumps(command))
s = json.dumps(command)
print ">>", s
self.p.stdin.write(s)
self.p.stdin.write("\n")
def _read(self):
line = self.p.stdout.readline()
assert line and line.endswith("\n")
print "<<", line.strip()
obj = json.loads(line)
print "<<", repr(obj)
return obj
def _proxy(self, command):
@ -380,6 +380,21 @@ class TestCPPConnector:
**kwargs
)
def expect_add_listener_update(self, doc_id, protodoc, **kwargs):
if "code" not in kwargs:
kwargs["code"] = 200
if "respond" not in kwargs and "respond_json" not in kwargs:
kwargs["respond"] = "OK"
self.couchdb.expect_request(
method="PUT",
path=self.db_path + "_design/payload_telemetry/" +
"_update/add_listener/" + doc_id,
body_json=protodoc,
validate_body_json=False,
**kwargs
)
def test_uses_server_uuids(self):
should_use_uuids = []
@ -501,12 +516,10 @@ class TestCPPConnector:
"8f83926278b23f5b813bdc75f6b9cdd6"
ptlm_string = "asdf blah \x12 binar\x04\x01 asdfasdfsz"
ptlm_metadata = {"frequency": 434075000, "misc": "Hi"}
ptlm_doc = {
"_id": ptlm_doc_id,
ptlm_doc_ish = {
"data": {
"_raw": "YXNkZiBibGFoIBIgYmluYXIEASBhc2RmYXNkZnN6"
},
"type": "payload_telemetry",
"receivers": {
"PROXYCALL": {
"frequency": 434075000,
@ -515,7 +528,19 @@ class TestCPPConnector:
}
}
def test_payload_telemetry(self):
def make_ptlm_doc_ish(self, time_created=0, time_uploaded=0, **extra):
receiver_info = {
"time_created": self.callbacks.fake_rfc3339(time_created),
"time_uploaded": self.callbacks.fake_rfc3339(time_uploaded),
}
receiver_info.update(extra)
doc_ish = copy.deepcopy(self.ptlm_doc_ish)
doc_ish["receivers"]["PROXYCALL"].update(receiver_info)
return doc_ish
def test_payload_telemetry_simple(self):
# WARNING: JsonCPP does not support strings with \0 in the middle of
# them, because it does not store the length of the string and instead
# later figures it out with strlen. This does not harm the uploader
@ -523,16 +548,11 @@ class TestCPPConnector:
# to the json encoder. However, the json stdin proxy call interface
# isn't going to work with nulls in it.
receiver_info = {
"time_created": self.callbacks.fake_rfc3339(0),
"time_uploaded": self.callbacks.fake_rfc3339(0),
}
doc_ish = self.make_ptlm_doc_ish()
doc = copy.deepcopy(self.ptlm_doc)
doc["receivers"]["PROXYCALL"].update(receiver_info)
self.expect_save_doc(doc)
self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
self.couchdb.run()
ret_doc_id = self.uploader.payload_telemetry(self.ptlm_string,
self.ptlm_metadata)
self.couchdb.check()
@ -542,193 +562,50 @@ class TestCPPConnector:
def test_adds_latest_listener_doc(self):
self.add_sample_listener_docs()
receiver_info = {
"time_created": self.callbacks.fake_rfc3339(0),
"time_uploaded": self.callbacks.fake_rfc3339(0),
"latest_listener_telemetry": self.sample_telemetry_doc_id,
"latest_listener_information": self.sample_info_doc_id
}
doc_ish = self.make_ptlm_doc_ish(
latest_listener_telemetry=self.sample_telemetry_doc_id,
latest_listener_information=self.sample_info_doc_id
)
doc = copy.deepcopy(self.ptlm_doc)
doc["receivers"]["PROXYCALL"].update(receiver_info)
self.expect_save_doc(doc)
self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
self.couchdb.run()
self.uploader.payload_telemetry(self.ptlm_string, self.ptlm_metadata)
self.couchdb.check()
ptlm_doc_existing = {
"_id": ptlm_doc_id,
"data": {
"_raw": "YXNkZiBibGFoIBIgYmluYXIEASBhc2RmYXNkZnN6",
"some_parsed_data": 12345
},
"type": "payload_telemetry",
"receivers": {
"SOMEONEELSE": {
"time_created": "2011-03-13T07:10:00+00:00",
"time_uploaded": "2011-03-13T07:10:40+00:00",
"frequency": 434074000,
"asdf": "World"
}
}
}
def test_ptlm_retries_conflicts(self):
doc_ish = self.make_ptlm_doc_ish()
def test_ptlm_merges_payload_conflicts(self):
receiver_info = {
"time_created": self.callbacks.fake_rfc3339(0),
"time_uploaded": self.callbacks.fake_rfc3339(0),
}
doc = copy.deepcopy(self.ptlm_doc)
doc["receivers"]["PROXYCALL"].update(receiver_info)
self.couchdb.expect_request(
method="PUT",
path=self.db_path + doc["_id"],
body_json=doc,
self.expect_add_listener_update(
self.ptlm_doc_id, doc_ish,
code=409,
respond_json={"error": "conflict"},
advance_time_after=5
)
self.couchdb.expect_request(
path=self.db_path + self.ptlm_doc_id,
code=200,
respond_json=self.ptlm_doc_existing,
advance_time_after=5,
)
doc_ish = copy.deepcopy(doc_ish)
doc_ish["receivers"]["PROXYCALL"]["time_uploaded"] = \
self.callbacks.fake_rfc3339(5)
doc_merged = copy.deepcopy(self.ptlm_doc_existing)
doc_merged["receivers"]["PROXYCALL"] = \
copy.deepcopy(self.ptlm_doc["receivers"]["PROXYCALL"])
receiver_info = copy.deepcopy(receiver_info)
receiver_info["time_uploaded"] = self.callbacks.fake_rfc3339(10)
doc_merged["receivers"]["PROXYCALL"].update(receiver_info)
self.expect_save_doc(doc_merged, validate_old=self.ptlm_doc_existing)
self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
self.couchdb.run()
self.uploader.payload_telemetry(self.ptlm_string, self.ptlm_metadata)
self.couchdb.check()
def test_ptlm_refuses_to_merge_collision(self):
receiver_info = {
"time_created": self.callbacks.fake_rfc3339(0),
"time_uploaded": self.callbacks.fake_rfc3339(0),
}
def test_ptlm_doesnt_retry_other_errors(self):
yield self.check_ptlm_doesnt_retry_code, 401
yield self.check_ptlm_doesnt_retry_code, 403
doc = copy.deepcopy(self.ptlm_doc)
doc["receivers"]["PROXYCALL"].update(receiver_info)
del doc["receivers"]["PROXYCALL"]["frequency"]
del doc["receivers"]["PROXYCALL"]["misc"]
def check_ptlm_doesnt_retry_code(self, code):
doc_ish = self.make_ptlm_doc_ish()
self.couchdb.expect_request(
method="PUT",
path=self.db_path + doc["_id"],
body_json=doc,
code=409,
respond_json={"error": "conflict"},
self.expect_add_listener_update(
self.ptlm_doc_id, doc_ish,
code=code,
respond_json={"error": "of some sort"},
advance_time_after=5
)
doc_conflict = copy.deepcopy(self.ptlm_doc_existing)
doc_conflict["data"]["_raw"] = "cGluZWFwcGxlcw=="
self.couchdb.expect_request(
path=self.db_path + self.ptlm_doc_id,
code=200,
respond_json=doc_conflict
)
self.couchdb.run()
try:
self.uploader.payload_telemetry(self.ptlm_string)
except ProxyException, e:
if e.name == "runtime_error" and \
e.what == "habitat::CollisionError":
pass
else:
raise
else:
raise AssertionError("Did not raise CollisionError")
self.couchdb.check()
def add_mock_conflicts(self, n):
receiver_info = {
"time_created": self.callbacks.fake_rfc3339(0),
"time_uploaded": self.callbacks.fake_rfc3339(0),
}
doc = copy.deepcopy(self.ptlm_doc)
doc["receivers"]["PROXYCALL"].update(receiver_info)
self.couchdb.expect_request(
method="PUT",
path=self.db_path + self.ptlm_doc_id,
body_json=doc,
code=409,
respond_json={"error": "conflict"}
)
doc_existing = copy.deepcopy(self.ptlm_doc_existing)
doc_merged = copy.deepcopy(self.ptlm_doc_existing)
doc_merged["receivers"]["PROXYCALL"] = \
copy.deepcopy(self.ptlm_doc["receivers"]["PROXYCALL"])
doc_merged["receivers"]["PROXYCALL"].update(receiver_info)
for i in xrange(n):
self.couchdb.expect_request(
path=self.db_path + self.ptlm_doc_id,
code=200,
respond_json=doc_existing,
)
self.couchdb.expect_request(
method="PUT",
path=self.db_path + self.ptlm_doc_id,
body_json=doc_merged,
validate_old=doc_existing,
code=409,
respond_json={"error": "conflict"},
advance_time_after=1
)
doc_existing = copy.deepcopy(doc_existing)
doc_merged = copy.deepcopy(doc_merged)
new_call = "listener_{0}".format(i)
new_info = {"time_created": self.callbacks.fake_rfc3339(i),
"time_uploaded": self.callbacks.fake_rfc3339(i + 1)}
doc_existing["receivers"][new_call] = new_info
doc_merged["receivers"][new_call] = new_info
doc_merged["receivers"]["PROXYCALL"]["time_uploaded"] = \
self.callbacks.fake_rfc3339(i + 1)
return (doc_existing, doc_merged)
def test_merges_multiple_conflicts(self):
(final_doc_existing, final_doc_merged) = self.add_mock_conflicts(15)
self.couchdb.expect_request(
path=self.db_path + self.ptlm_doc_id,
code=200,
respond_json=final_doc_existing,
)
self.expect_save_doc(final_doc_merged, validate_old=final_doc_existing)
self.couchdb.run()
self.uploader.payload_telemetry(self.ptlm_string, self.ptlm_metadata)
self.couchdb.check()
def test_gives_up_after_many_conflicts(self):
self.add_mock_conflicts(20)
self.couchdb.run()
try:
@ -743,6 +620,64 @@ class TestCPPConnector:
else:
raise AssertionError("Did not raise UnmergeableError")
self.couchdb.check()
def add_mock_conflicts(self, n):
doc_ish = self.make_ptlm_doc_ish()
self.expect_add_listener_update(
self.ptlm_doc_id, doc_ish,
code=409,
respond_json={"error": "conflict"},
advance_time_after=1
)
for i in xrange(n):
doc_ish = copy.deepcopy(doc_ish)
doc_ish["receivers"]["PROXYCALL"]["time_uploaded"] = \
self.callbacks.fake_rfc3339(i + 1)
self.expect_add_listener_update(
self.ptlm_doc_id, doc_ish,
code=409,
respond_json={"error": "conflict"},
advance_time_after=1
)
def test_merges_multiple_conflicts(self):
self.add_mock_conflicts(15)
doc_ish = self.make_ptlm_doc_ish(time_created=0, time_uploaded=16)
self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
self.couchdb.run()
self.uploader.payload_telemetry(self.ptlm_string, self.ptlm_metadata)
self.couchdb.check()
def test_gives_up_after_many_conflicts(self):
self.add_mock_conflicts(19)
self.couchdb.run()
try:
self.uploader.payload_telemetry(self.ptlm_string,
self.ptlm_metadata)
except ProxyException, e:
if e.name == "runtime_error" and \
e.what == "habitat::UnmergeableError":
pass
else:
raise
else:
raise AssertionError("Did not raise UnmergeableError")
def test_update_func_likes_doc(self):
doc_ish = self.make_ptlm_doc_ish()
req = {
"body": json.dumps(doc_ish),
"id": self.ptlm_doc_id
}
views.payload_telemetry.add_listener_update(None, req)
def test_flights(self):
rows = []
expect_result = []
@ -897,13 +832,13 @@ class TestCPPConnectorThreaded(TestCPPConnector):
"time_uploaded": self.callbacks.fake_rfc3339(0),
}
doc = copy.deepcopy(self.ptlm_doc)
doc["receivers"]["PROXYCALL"].update(receiver_info)
doc["receivers"]["NEWCALL"] = doc["receivers"]["PROXYCALL"]
del doc["receivers"]["PROXYCALL"]
doc_ish = self.make_ptlm_doc_ish()
doc_ish["receivers"]["NEWCALL"] = doc_ish["receivers"]["PROXYCALL"]
del doc_ish["receivers"]["PROXYCALL"]
self.expect_save_doc(doc)
self.expect_add_listener_update(self.ptlm_doc_id, doc_ish)
self.couchdb.run()
ret_doc_id = self.uploader.payload_telemetry(self.ptlm_string,
self.ptlm_metadata)
self.couchdb.check()