diff --git a/README.md b/README.md index 0db8d88..2cb40f7 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,7 @@ -# activitypub +# ActivityPub + Prototyping a Python ActivityPub distributed server in Tornado + +``` +python -m activitypub +``` \ No newline at end of file diff --git a/activitypub/__init__.py b/activitypub/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/activitypub/__main__.py b/activitypub/__main__.py new file mode 100644 index 0000000..b982e5d --- /dev/null +++ b/activitypub/__main__.py @@ -0,0 +1,3 @@ +from .app import main + +main() diff --git a/activitypub/app.py b/activitypub/app.py new file mode 100644 index 0000000..cd66c0d --- /dev/null +++ b/activitypub/app.py @@ -0,0 +1,41 @@ +import logging +import os + +import tornado.web +from tornado.options import define, options, parse_command_line + +from .handlers import (MessageNewHandler, MessageUpdatesHandler, + MainHandler, MessageBuffer) + +define("port", default=8888, help="run on the given port", type=int) +define("debug", default=False, help="run in debug mode", type=bool) +define("directory", default=".", help="location of templates and static", type=str) + +class ActivityPubApplocation(tornado.web.Application): + """ + """ + def __init__(self, *args, **kwargs): + self.message_buffer = MessageBuffer() + super().__init__(*args, **kwargs) + + +def make_app(): + parse_command_line() + app = ActivityPubApplocation( + [ + (r"/", MainHandler), + (r"/a/message/new", MessageNewHandler), + (r"/a/message/updates", MessageUpdatesHandler), + ], + cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", + template_path=os.path.join(os.path.dirname(options.directory), "templates"), + static_path=os.path.join(os.path.dirname(options.directory), "static"), + xsrf_cookies=True, + debug=options.debug, + ) + return app + +def main(): + app = make_app() + app.listen(options.port) + tornado.ioloop.IOLoop.current().start() diff --git a/activitypub/handlers.py b/activitypub/handlers.py new file mode 100644 index 0000000..31e483a --- /dev/null +++ b/activitypub/handlers.py @@ -0,0 +1,80 @@ +import uuid +import logging + +import tornado.escape +from tornado.concurrent import Future +from tornado import gen + +class MessageBuffer(object): + def __init__(self): + self.waiters = set() + self.cache = [] + self.cache_size = 200 + + def wait_for_messages(self, cursor=None): + # Construct a Future to return to our caller. This allows + # wait_for_messages to be yielded from a coroutine even though + # it is not a coroutine itself. We will set the result of the + # Future when results are available. + result_future = Future() + if cursor: + new_count = 0 + for msg in reversed(self.cache): + if msg["id"] == cursor: + break + new_count += 1 + if new_count: + result_future.set_result(self.cache[-new_count:]) + return result_future + self.waiters.add(result_future) + return result_future + + def cancel_wait(self, future): + self.waiters.remove(future) + # Set an empty result to unblock any coroutines waiting. + future.set_result([]) + + def new_messages(self, messages): + logging.info("Sending new message to %r listeners", len(self.waiters)) + for future in self.waiters: + future.set_result(messages) + self.waiters = set() + self.cache.extend(messages) + if len(self.cache) > self.cache_size: + self.cache = self.cache[-self.cache_size:] + +class MainHandler(tornado.web.RequestHandler): + def get(self): + self.render("index.html", messages=self.application.message_buffer.cache) + +class MessageNewHandler(tornado.web.RequestHandler): + def post(self): + message = { + "id": str(uuid.uuid4()), + "body": self.get_argument("body"), + "html": "", + } + # to_basestring is necessary for Python 3's json encoder, + # which doesn't accept byte strings. + message["html"] = tornado.escape.to_basestring( + self.render_string("message.html", message=message)) + if self.get_argument("next", None): + self.redirect(self.get_argument("next")) + else: + self.write(message) + self.application.message_buffer.new_messages([message]) + +class MessageUpdatesHandler(tornado.web.RequestHandler): + @gen.coroutine + def post(self): + cursor = self.get_argument("cursor", None) + # Save the future returned by wait_for_messages so we can cancel + # it in wait_for_messages + self.future = self.application.message_buffer.wait_for_messages(cursor=cursor) + messages = yield self.future + if self.request.connection.stream.closed(): + return + self.write(dict(messages=messages)) + + def on_connection_close(self): + self.application.message_buffer.cancel_wait(self.future) diff --git a/static/chat.css b/static/chat.css new file mode 100644 index 0000000..a400c32 --- /dev/null +++ b/static/chat.css @@ -0,0 +1,56 @@ +/* + * Copyright 2009 FriendFeed + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +body { + background: white; + margin: 10px; +} + +body, +input { + font-family: sans-serif; + font-size: 10pt; + color: black; +} + +table { + border-collapse: collapse; + border: 0; +} + +td { + border: 0; + padding: 0; +} + +#body { + position: absolute; + bottom: 10px; + left: 10px; +} + +#input { + margin-top: 0.5em; +} + +#inbox .message { + padding-top: 0.25em; +} + +#nav { + float: right; + z-index: 99; +} diff --git a/static/chat.js b/static/chat.js new file mode 100644 index 0000000..151a588 --- /dev/null +++ b/static/chat.js @@ -0,0 +1,136 @@ +// Copyright 2009 FriendFeed +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +$(document).ready(function() { + if (!window.console) window.console = {}; + if (!window.console.log) window.console.log = function() {}; + + $("#messageform").on("submit", function() { + newMessage($(this)); + return false; + }); + $("#messageform").on("keypress", function(e) { + if (e.keyCode == 13) { + newMessage($(this)); + return false; + } + return true; + }); + $("#message").select(); + updater.poll(); +}); + +function newMessage(form) { + var message = form.formToDict(); + var disabled = form.find("input[type=submit]"); + disabled.disable(); + $.postJSON("/a/message/new", message, function(response) { + updater.showMessage(response); + if (message.id) { + form.parent().remove(); + } else { + form.find("input[type=text]").val("").select(); + disabled.enable(); + } + }); +} + +function getCookie(name) { + var r = document.cookie.match("\\b" + name + "=([^;]*)\\b"); + return r ? r[1] : undefined; +} + +jQuery.postJSON = function(url, args, callback) { + args._xsrf = getCookie("_xsrf"); + $.ajax({url: url, data: $.param(args), dataType: "text", type: "POST", + success: function(response) { + if (callback) callback(eval("(" + response + ")")); + }, error: function(response) { + console.log("ERROR:", response); + }}); +}; + +jQuery.fn.formToDict = function() { + var fields = this.serializeArray(); + var json = {}; + for (var i = 0; i < fields.length; i++) { + json[fields[i].name] = fields[i].value; + } + if (json.next) delete json.next; + return json; +}; + +jQuery.fn.disable = function() { + this.enable(false); + return this; +}; + +jQuery.fn.enable = function(opt_enable) { + if (arguments.length && !opt_enable) { + this.attr("disabled", "disabled"); + } else { + this.removeAttr("disabled"); + } + return this; +}; + +var updater = { + errorSleepTime: 500, + cursor: null, + + poll: function() { + var args = {"_xsrf": getCookie("_xsrf")}; + if (updater.cursor) args.cursor = updater.cursor; + $.ajax({url: "/a/message/updates", type: "POST", dataType: "text", + data: $.param(args), success: updater.onSuccess, + error: updater.onError}); + }, + + onSuccess: function(response) { + try { + updater.newMessages(eval("(" + response + ")")); + } catch (e) { + updater.onError(); + return; + } + updater.errorSleepTime = 500; + window.setTimeout(updater.poll, 0); + }, + + onError: function(response) { + updater.errorSleepTime *= 2; + console.log("Poll error; sleeping for", updater.errorSleepTime, "ms"); + window.setTimeout(updater.poll, updater.errorSleepTime); + }, + + newMessages: function(response) { + if (!response.messages) return; + updater.cursor = response.cursor; + var messages = response.messages; + updater.cursor = messages[messages.length - 1].id; + console.log(messages.length, "new messages, cursor:", updater.cursor); + for (var i = 0; i < messages.length; i++) { + updater.showMessage(messages[i]); + } + }, + + showMessage: function(message) { + var existing = $("#m" + message.id); + if (existing.length > 0) return; + var node = $(message.html); + node.hide(); + $("#inbox").append(node); + node.slideDown(); + } +}; diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..528c319 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,41 @@ + + +
+ +