kopia lustrzana https://github.com/pimoroni/pimoroni-pico
Upload of inkylauncher example
rodzic
a46a53a709
commit
3bf10632c2
|
@ -0,0 +1,121 @@
|
|||
import gc
|
||||
import ujson
|
||||
from urllib import urequest
|
||||
|
||||
# Length of time between updates in Seconds.
|
||||
# Frequent updates will reduce battery life!
|
||||
UPDATE_INTERVAL = 2
|
||||
|
||||
# API URL
|
||||
URL = "https://www.boredapi.com/api/activity"
|
||||
|
||||
graphics = None
|
||||
text = None
|
||||
|
||||
gc.collect()
|
||||
|
||||
|
||||
def display_quote(text, ox, oy, scale, wordwrap):
|
||||
# Processing text is memory intensive
|
||||
# so we'll do it one char at a time as we draw to the screen
|
||||
line_height = 8 * scale
|
||||
html = False
|
||||
html_tag = ""
|
||||
word = ""
|
||||
space_width = graphics.measure_text(" ", scale=scale)
|
||||
x = ox
|
||||
y = oy
|
||||
for char in text:
|
||||
if char in "[]":
|
||||
continue
|
||||
if char == "<":
|
||||
html = True
|
||||
html_tag = ""
|
||||
continue
|
||||
if char == ">":
|
||||
html = False
|
||||
continue
|
||||
if html:
|
||||
if char in "/ ":
|
||||
continue
|
||||
html_tag += char
|
||||
continue
|
||||
if char in (" ", "\n") or html_tag == "br":
|
||||
w = graphics.measure_text(word, scale=scale)
|
||||
if x + w > wordwrap or char == "\n" or html_tag == "br":
|
||||
x = ox
|
||||
y += line_height
|
||||
|
||||
graphics.text(word, x, y, scale=scale)
|
||||
word = ""
|
||||
html_tag = ""
|
||||
x += w + space_width
|
||||
continue
|
||||
|
||||
word += char
|
||||
|
||||
# Last word
|
||||
w = graphics.measure_text(word, scale=scale)
|
||||
if x + w > wordwrap:
|
||||
x = ox
|
||||
y += line_height
|
||||
|
||||
graphics.text(word, x, y, scale=scale)
|
||||
|
||||
|
||||
def update():
|
||||
global text
|
||||
|
||||
gc.collect()
|
||||
|
||||
# Grab the data
|
||||
socket = urequest.urlopen(URL)
|
||||
j = ujson.load(socket)
|
||||
socket.close()
|
||||
|
||||
text = [j['activity'], j['type'], j['participants']]
|
||||
|
||||
gc.collect()
|
||||
|
||||
|
||||
def draw():
|
||||
global text
|
||||
|
||||
WIDTH, HEIGHT = graphics.get_bounds()
|
||||
|
||||
# Clear the screen
|
||||
graphics.set_pen(1)
|
||||
graphics.clear()
|
||||
graphics.set_pen(0)
|
||||
|
||||
# Page lines!
|
||||
graphics.set_pen(3)
|
||||
graphics.line(0, 65, WIDTH, 65)
|
||||
for i in range(2, 13):
|
||||
graphics.line(0, i * 35, WIDTH, i * 35)
|
||||
|
||||
gc.collect()
|
||||
|
||||
# Page margin
|
||||
graphics.set_pen(4)
|
||||
graphics.line(50, 0, 50, HEIGHT)
|
||||
graphics.set_pen(0)
|
||||
|
||||
# Main text
|
||||
graphics.set_font("cursive")
|
||||
graphics.set_pen(4)
|
||||
graphics.set_font("cursive")
|
||||
graphics.text("Activity Idea", 55, 30, WIDTH - 20, 2)
|
||||
graphics.set_pen(0)
|
||||
graphics.set_font("bitmap8")
|
||||
display_quote(text[0], 55, 170, 5, WIDTH - 20)
|
||||
|
||||
gc.collect()
|
||||
|
||||
graphics.set_pen(2)
|
||||
graphics.text("Activity Type: " + text[1], 55, HEIGHT - 45, WIDTH - 20, 2)
|
||||
graphics.text("Participants: " + str(text[2]), 400, HEIGHT - 45, WIDTH - 20, 2)
|
||||
|
||||
graphics.update()
|
||||
|
||||
gc.collect()
|
|
@ -0,0 +1,175 @@
|
|||
from pimoroni_i2c import PimoroniI2C
|
||||
from pcf85063a import PCF85063A
|
||||
import math
|
||||
from machine import Pin, PWM, Timer
|
||||
import time
|
||||
import inky_frame
|
||||
import json
|
||||
import network
|
||||
import os
|
||||
|
||||
# Pin setup for VSYS_HOLD needed to sleep and wake.
|
||||
HOLD_VSYS_EN_PIN = 2
|
||||
hold_vsys_en_pin = Pin(HOLD_VSYS_EN_PIN, Pin.OUT)
|
||||
|
||||
# intialise the pcf85063a real time clock chip
|
||||
I2C_SDA_PIN = 4
|
||||
I2C_SCL_PIN = 5
|
||||
i2c = PimoroniI2C(I2C_SDA_PIN, I2C_SCL_PIN, 100000)
|
||||
rtc = PCF85063A(i2c)
|
||||
|
||||
# set up the button LEDs
|
||||
button_a_led = Pin(11, Pin.OUT)
|
||||
button_b_led = Pin(12, Pin.OUT)
|
||||
button_c_led = Pin(13, Pin.OUT)
|
||||
button_d_led = Pin(14, Pin.OUT)
|
||||
button_e_led = Pin(15, Pin.OUT)
|
||||
led_warn = Pin(6, Pin.OUT)
|
||||
|
||||
# set up for the network LED
|
||||
network_led_pwm = PWM(Pin(7))
|
||||
network_led_pwm.freq(1000)
|
||||
network_led_pwm.duty_u16(0)
|
||||
|
||||
|
||||
# set the brightness of the network led
|
||||
def network_led(brightness):
|
||||
brightness = max(0, min(100, brightness)) # clamp to range
|
||||
# gamma correct the brightness (gamma 2.8)
|
||||
value = int(pow(brightness / 100.0, 2.8) * 65535.0 + 0.5)
|
||||
network_led_pwm.duty_u16(value)
|
||||
|
||||
|
||||
network_led_timer = Timer(-1)
|
||||
network_led_pulse_speed_hz = 1
|
||||
|
||||
|
||||
def network_led_callback(t):
|
||||
# updates the network led brightness based on a sinusoid seeded by the current time
|
||||
brightness = (math.sin(time.ticks_ms() * math.pi * 2 / (1000 / network_led_pulse_speed_hz)) * 40) + 60
|
||||
value = int(pow(brightness / 100.0, 2.8) * 65535.0 + 0.5)
|
||||
network_led_pwm.duty_u16(value)
|
||||
|
||||
|
||||
# set the network led into pulsing mode
|
||||
def pulse_network_led(speed_hz=1):
|
||||
global network_led_timer, network_led_pulse_speed_hz
|
||||
network_led_pulse_speed_hz = speed_hz
|
||||
network_led_timer.deinit()
|
||||
network_led_timer.init(period=50, mode=Timer.PERIODIC, callback=network_led_callback)
|
||||
|
||||
|
||||
# turn off the network led and disable any pulsing animation that's running
|
||||
def stop_network_led():
|
||||
global network_led_timer
|
||||
network_led_timer.deinit()
|
||||
network_led_pwm.duty_u16(0)
|
||||
|
||||
|
||||
# returns the id of the button that is currently pressed or
|
||||
# None if none are
|
||||
def pressed():
|
||||
if inky_frame.button_a.read():
|
||||
return inky_frame.button_a
|
||||
if inky_frame.button_b.read():
|
||||
return inky_frame.button_b
|
||||
if inky_frame.button_c.read():
|
||||
return inky_frame.button_c
|
||||
if inky_frame.button_d.read():
|
||||
return inky_frame.button_d
|
||||
if inky_frame.button_e.read():
|
||||
return inky_frame.button_e
|
||||
return None
|
||||
|
||||
|
||||
def sleep(t):
|
||||
# Time to have a little nap until the next update
|
||||
rtc.clear_timer_flag()
|
||||
rtc.set_timer(t, ttp=rtc.TIMER_TICK_1_OVER_60HZ)
|
||||
rtc.enable_timer_interrupt(True)
|
||||
|
||||
# Set the HOLD VSYS pin to an input
|
||||
# this allows the device to go into sleep mode when on battery power.
|
||||
hold_vsys_en_pin.init(Pin.IN)
|
||||
|
||||
# Regular time.sleep for those powering from USB
|
||||
time.sleep(60 * t)
|
||||
|
||||
|
||||
# Turns off the button LEDs
|
||||
def clear_button_leds():
|
||||
button_a_led.off()
|
||||
button_b_led.off()
|
||||
button_c_led.off()
|
||||
button_d_led.off()
|
||||
button_e_led.off()
|
||||
|
||||
|
||||
def network_connect(SSID, PSK):
|
||||
# Enable the Wireless
|
||||
wlan = network.WLAN(network.STA_IF)
|
||||
wlan.active(True)
|
||||
|
||||
# Number of attempts to make before timeout
|
||||
max_wait = 10
|
||||
|
||||
# Sets the Wireless LED pulsing and attempts to connect to your local network.
|
||||
pulse_network_led()
|
||||
wlan.connect(SSID, PSK)
|
||||
|
||||
while max_wait > 0:
|
||||
if wlan.status() < 0 or wlan.status() >= 3:
|
||||
break
|
||||
max_wait -= 1
|
||||
print('waiting for connection...')
|
||||
time.sleep(1)
|
||||
|
||||
stop_network_led()
|
||||
network_led_pwm.duty_u16(30000)
|
||||
|
||||
# Handle connection error. Switches the Warn LED on.
|
||||
if wlan.status() != 3:
|
||||
stop_network_led()
|
||||
led_warn.on()
|
||||
|
||||
|
||||
state = {"run": None}
|
||||
app = None
|
||||
|
||||
|
||||
def file_exists(filename):
|
||||
try:
|
||||
return (os.stat(filename)[0] & 0x4000) == 0
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def clear_state():
|
||||
if file_exists("state.json"):
|
||||
os.remove("state.json")
|
||||
|
||||
|
||||
def save_state(data):
|
||||
with open("/state.json", "w") as f:
|
||||
f.write(json.dumps(data))
|
||||
f.flush()
|
||||
|
||||
|
||||
def load_state():
|
||||
global state
|
||||
data = json.loads(open("/state.json", "r").read())
|
||||
if type(data) is dict:
|
||||
state = data
|
||||
|
||||
|
||||
def update_state(running):
|
||||
global state
|
||||
state['run'] = running
|
||||
save_state(state)
|
||||
|
||||
|
||||
def launch_app(app_name):
|
||||
global app
|
||||
app = __import__(app_name)
|
||||
print(app)
|
||||
update_state(app_name)
|
Plik binarny nie jest wyświetlany.
Plik binarny nie jest wyświetlany.
Plik binarny nie jest wyświetlany.
|
@ -0,0 +1,662 @@
|
|||
"""
|
||||
Tiny Web - pretty simple and powerful web server for tiny platforms like ESP8266 / ESP32
|
||||
MIT license
|
||||
(C) Konstantin Belyalov 2017-2018
|
||||
"""
|
||||
import logging
|
||||
import uasyncio as asyncio
|
||||
import uasyncio.core
|
||||
import ujson as json
|
||||
import gc
|
||||
import uos as os
|
||||
import sys
|
||||
import uerrno as errno
|
||||
import usocket as socket
|
||||
|
||||
|
||||
log = logging.getLogger('WEB')
|
||||
|
||||
type_gen = type((lambda: (yield))()) # noqa: E275
|
||||
|
||||
# uasyncio v3 is shipped with MicroPython 1.13, and contains some subtle
|
||||
# but breaking changes. See also https://github.com/peterhinch/micropython-async/blob/master/v3/README.md
|
||||
IS_UASYNCIO_V3 = hasattr(asyncio, "__version__") and asyncio.__version__ >= (3,)
|
||||
|
||||
|
||||
def urldecode_plus(s):
|
||||
"""Decode urlencoded string (including '+' char).
|
||||
Returns decoded string
|
||||
"""
|
||||
s = s.replace('+', ' ')
|
||||
arr = s.split('%')
|
||||
res = arr[0]
|
||||
for it in arr[1:]:
|
||||
if len(it) >= 2:
|
||||
res += chr(int(it[:2], 16)) + it[2:]
|
||||
elif len(it) == 0:
|
||||
res += '%'
|
||||
else:
|
||||
res += it
|
||||
return res
|
||||
|
||||
|
||||
def parse_query_string(s):
|
||||
"""Parse urlencoded string into dict.
|
||||
Returns dict
|
||||
"""
|
||||
res = {}
|
||||
pairs = s.split('&')
|
||||
for p in pairs:
|
||||
vals = [urldecode_plus(x) for x in p.split('=', 1)]
|
||||
if len(vals) == 1:
|
||||
res[vals[0]] = ''
|
||||
else:
|
||||
res[vals[0]] = vals[1]
|
||||
return res
|
||||
|
||||
|
||||
class HTTPException(Exception):
|
||||
"""HTTP protocol exceptions"""
|
||||
|
||||
def __init__(self, code=400):
|
||||
self.code = code
|
||||
|
||||
|
||||
class request:
|
||||
"""HTTP Request class"""
|
||||
|
||||
def __init__(self, _reader):
|
||||
self.reader = _reader
|
||||
self.headers = {}
|
||||
self.method = b''
|
||||
self.path = b''
|
||||
self.query_string = b''
|
||||
|
||||
async def read_request_line(self):
|
||||
"""Read and parse first line (AKA HTTP Request Line).
|
||||
Function is generator.
|
||||
Request line is something like:
|
||||
GET /something/script?param1=val1 HTTP/1.1
|
||||
"""
|
||||
while True:
|
||||
rl = await self.reader.readline()
|
||||
# skip empty lines
|
||||
if rl == b'\r\n' or rl == b'\n':
|
||||
continue
|
||||
break
|
||||
rl_frags = rl.split()
|
||||
if len(rl_frags) != 3:
|
||||
raise HTTPException(400)
|
||||
self.method = rl_frags[0]
|
||||
url_frags = rl_frags[1].split(b'?', 1)
|
||||
self.path = url_frags[0]
|
||||
if len(url_frags) > 1:
|
||||
self.query_string = url_frags[1]
|
||||
|
||||
async def read_headers(self, save_headers=[]):
|
||||
"""Read and parse HTTP headers until \r\n\r\n:
|
||||
Optional argument 'save_headers' controls which headers to save.
|
||||
This is done mostly to deal with memory constrains.
|
||||
Function is generator.
|
||||
HTTP headers could be like:
|
||||
Host: google.com
|
||||
Content-Type: blah
|
||||
\r\n
|
||||
"""
|
||||
while True:
|
||||
gc.collect()
|
||||
line = await self.reader.readline()
|
||||
if line == b'\r\n':
|
||||
break
|
||||
frags = line.split(b':', 1)
|
||||
if len(frags) != 2:
|
||||
raise HTTPException(400)
|
||||
if frags[0] in save_headers:
|
||||
self.headers[frags[0]] = frags[1].strip()
|
||||
|
||||
async def read_parse_form_data(self):
|
||||
"""Read HTTP form data (payload), if any.
|
||||
Function is generator.
|
||||
Returns:
|
||||
- dict of key / value pairs
|
||||
- None in case of no form data present
|
||||
"""
|
||||
# TODO: Probably there is better solution how to handle
|
||||
# request body, at least for simple urlencoded forms - by processing
|
||||
# chunks instead of accumulating payload.
|
||||
gc.collect()
|
||||
if b'Content-Length' not in self.headers:
|
||||
return {}
|
||||
# Parse payload depending on content type
|
||||
if b'Content-Type' not in self.headers:
|
||||
# Unknown content type, return unparsed, raw data
|
||||
return {}
|
||||
size = int(self.headers[b'Content-Length'])
|
||||
if size > self.params['max_body_size'] or size < 0:
|
||||
raise HTTPException(413)
|
||||
data = await self.reader.readexactly(size)
|
||||
# Use only string before ';', e.g:
|
||||
# application/x-www-form-urlencoded; charset=UTF-8
|
||||
ct = self.headers[b'Content-Type'].split(b';', 1)[0]
|
||||
try:
|
||||
if ct == b'application/json':
|
||||
return json.loads(data)
|
||||
elif ct == b'application/x-www-form-urlencoded':
|
||||
return parse_query_string(data.decode())
|
||||
except ValueError:
|
||||
# Re-generate exception for malformed form data
|
||||
raise HTTPException(400)
|
||||
|
||||
|
||||
class response:
|
||||
"""HTTP Response class"""
|
||||
|
||||
def __init__(self, _writer):
|
||||
self.writer = _writer
|
||||
self.send = _writer.awrite
|
||||
self.code = 200
|
||||
self.version = '1.0'
|
||||
self.headers = {}
|
||||
|
||||
async def _send_headers(self):
|
||||
"""Compose and send:
|
||||
- HTTP request line
|
||||
- HTTP headers following by \r\n.
|
||||
This function is generator.
|
||||
P.S.
|
||||
Because of usually we have only a few HTTP headers (2-5) it doesn't make sense
|
||||
to send them separately - sometimes it could increase latency.
|
||||
So combining headers together and send them as single "packet".
|
||||
"""
|
||||
# Request line
|
||||
hdrs = 'HTTP/{} {} MSG\r\n'.format(self.version, self.code)
|
||||
# Headers
|
||||
for k, v in self.headers.items():
|
||||
hdrs += '{}: {}\r\n'.format(k, v)
|
||||
hdrs += '\r\n'
|
||||
# Collect garbage after small mallocs
|
||||
gc.collect()
|
||||
await self.send(hdrs)
|
||||
|
||||
async def error(self, code, msg=None):
|
||||
"""Generate HTTP error response
|
||||
This function is generator.
|
||||
Arguments:
|
||||
code - HTTP response code
|
||||
Example:
|
||||
# Not enough permissions. Send HTTP 403 - Forbidden
|
||||
await resp.error(403)
|
||||
"""
|
||||
self.code = code
|
||||
if msg:
|
||||
self.add_header('Content-Length', len(msg))
|
||||
await self._send_headers()
|
||||
if msg:
|
||||
await self.send(msg)
|
||||
|
||||
async def redirect(self, location, msg=None):
|
||||
"""Generate HTTP redirect response to 'location'.
|
||||
Basically it will generate HTTP 302 with 'Location' header
|
||||
Arguments:
|
||||
location - URL to redirect to
|
||||
Example:
|
||||
# Redirect to /something
|
||||
await resp.redirect('/something')
|
||||
"""
|
||||
self.code = 302
|
||||
self.add_header('Location', location)
|
||||
if msg:
|
||||
self.add_header('Content-Length', len(msg))
|
||||
await self._send_headers()
|
||||
if msg:
|
||||
await self.send(msg)
|
||||
|
||||
def add_header(self, key, value):
|
||||
"""Add HTTP response header
|
||||
Arguments:
|
||||
key - header name
|
||||
value - header value
|
||||
Example:
|
||||
resp.add_header('Content-Encoding', 'gzip')
|
||||
"""
|
||||
self.headers[key] = value
|
||||
|
||||
def add_access_control_headers(self):
|
||||
"""Add Access Control related HTTP response headers.
|
||||
This is required when working with RestApi (JSON requests)
|
||||
"""
|
||||
self.add_header('Access-Control-Allow-Origin', self.params['allowed_access_control_origins'])
|
||||
self.add_header('Access-Control-Allow-Methods', self.params['allowed_access_control_methods'])
|
||||
self.add_header('Access-Control-Allow-Headers', self.params['allowed_access_control_headers'])
|
||||
|
||||
async def start_html(self):
|
||||
"""Start response with HTML content type.
|
||||
This function is generator.
|
||||
Example:
|
||||
await resp.start_html()
|
||||
await resp.send('<html><h1>Hello, world!</h1></html>')
|
||||
"""
|
||||
self.add_header('Content-Type', 'text/html')
|
||||
await self._send_headers()
|
||||
|
||||
async def send_file(self, filename, content_type=None, content_encoding=None, max_age=2592000, buf_size=128):
|
||||
"""Send local file as HTTP response.
|
||||
This function is generator.
|
||||
Arguments:
|
||||
filename - Name of file which exists in local filesystem
|
||||
Keyword arguments:
|
||||
content_type - Filetype. By default - None means auto-detect.
|
||||
max_age - Cache control. How long browser can keep this file on disk.
|
||||
By default - 30 days
|
||||
Set to 0 - to disable caching.
|
||||
Example 1: Default use case:
|
||||
await resp.send_file('images/cat.jpg')
|
||||
Example 2: Disable caching:
|
||||
await resp.send_file('static/index.html', max_age=0)
|
||||
Example 3: Override content type:
|
||||
await resp.send_file('static/file.bin', content_type='application/octet-stream')
|
||||
"""
|
||||
try:
|
||||
# Get file size
|
||||
stat = os.stat(filename)
|
||||
slen = str(stat[6])
|
||||
self.add_header('Content-Length', slen)
|
||||
# Find content type
|
||||
if content_type:
|
||||
self.add_header('Content-Type', content_type)
|
||||
# Add content-encoding, if any
|
||||
if content_encoding:
|
||||
self.add_header('Content-Encoding', content_encoding)
|
||||
# Since this is static content is totally make sense
|
||||
# to tell browser to cache it, however, you can always
|
||||
# override it by setting max_age to zero
|
||||
self.add_header('Cache-Control', 'max-age={}, public'.format(max_age))
|
||||
with open(filename) as f:
|
||||
await self._send_headers()
|
||||
gc.collect()
|
||||
buf = bytearray(min(stat[6], buf_size))
|
||||
while True:
|
||||
size = f.readinto(buf)
|
||||
if size == 0:
|
||||
break
|
||||
await self.send(buf, sz=size)
|
||||
except OSError as e:
|
||||
# special handling for ENOENT / EACCESS
|
||||
if e.args[0] in (errno.ENOENT, errno.EACCES):
|
||||
raise HTTPException(404)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
async def restful_resource_handler(req, resp, param=None):
|
||||
"""Handler for RESTful API endpoins"""
|
||||
# Gather data - query string, JSON in request body...
|
||||
data = await req.read_parse_form_data()
|
||||
# Add parameters from URI query string as well
|
||||
# This one is actually for simply development of RestAPI
|
||||
if req.query_string != b'':
|
||||
data.update(parse_query_string(req.query_string.decode()))
|
||||
# Call actual handler
|
||||
_handler, _kwargs = req.params['_callmap'][req.method]
|
||||
# Collect garbage before / after handler execution
|
||||
gc.collect()
|
||||
if param:
|
||||
res = _handler(data, param, **_kwargs)
|
||||
else:
|
||||
res = _handler(data, **_kwargs)
|
||||
gc.collect()
|
||||
# Handler result could be:
|
||||
# 1. generator - in case of large payload
|
||||
# 2. string - just string :)
|
||||
# 2. dict - meaning client what tinyweb to convert it to JSON
|
||||
# it can also return error code together with str / dict
|
||||
# res = {'blah': 'blah'}
|
||||
# res = {'blah': 'blah'}, 201
|
||||
if isinstance(res, type_gen):
|
||||
# Result is generator, use chunked response
|
||||
# NOTICE: HTTP 1.0 by itself does not support chunked responses, so, making workaround:
|
||||
# Response is HTTP/1.1 with Connection: close
|
||||
resp.version = '1.1'
|
||||
resp.add_header('Connection', 'close')
|
||||
resp.add_header('Content-Type', 'application/json')
|
||||
resp.add_header('Transfer-Encoding', 'chunked')
|
||||
resp.add_access_control_headers()
|
||||
await resp._send_headers()
|
||||
# Drain generator
|
||||
for chunk in res:
|
||||
chunk_len = len(chunk.encode('utf-8'))
|
||||
await resp.send('{:x}\r\n'.format(chunk_len))
|
||||
await resp.send(chunk)
|
||||
await resp.send('\r\n')
|
||||
gc.collect()
|
||||
await resp.send('0\r\n\r\n')
|
||||
else:
|
||||
if type(res) == tuple:
|
||||
resp.code = res[1]
|
||||
res = res[0]
|
||||
elif res is None:
|
||||
raise Exception('Result expected')
|
||||
# Send response
|
||||
if type(res) is dict:
|
||||
res_str = json.dumps(res)
|
||||
else:
|
||||
res_str = res
|
||||
resp.add_header('Content-Type', 'application/json')
|
||||
resp.add_header('Content-Length', str(len(res_str)))
|
||||
resp.add_access_control_headers()
|
||||
await resp._send_headers()
|
||||
await resp.send(res_str)
|
||||
|
||||
|
||||
class webserver:
|
||||
|
||||
def __init__(self, request_timeout=3, max_concurrency=3, backlog=16, debug=False):
|
||||
"""Tiny Web Server class.
|
||||
Keyword arguments:
|
||||
request_timeout - Time for client to send complete request
|
||||
after that connection will be closed.
|
||||
max_concurrency - How many connections can be processed concurrently.
|
||||
It is very important to limit this number because of
|
||||
memory constrain.
|
||||
Default value depends on platform
|
||||
backlog - Parameter to socket.listen() function. Defines size of
|
||||
pending to be accepted connections queue.
|
||||
Must be greater than max_concurrency
|
||||
debug - Whether send exception info (text + backtrace)
|
||||
to client together with HTTP 500 or not.
|
||||
"""
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.request_timeout = request_timeout
|
||||
self.max_concurrency = max_concurrency
|
||||
self.backlog = backlog
|
||||
self.debug = debug
|
||||
self.explicit_url_map = {}
|
||||
self.catch_all_handler = None
|
||||
self.parameterized_url_map = {}
|
||||
# Currently opened connections
|
||||
self.conns = {}
|
||||
# Statistics
|
||||
self.processed_connections = 0
|
||||
|
||||
def _find_url_handler(self, req):
|
||||
"""Helper to find URL handler.
|
||||
Returns tuple of (function, opts, param) or (None, None) if not found.
|
||||
"""
|
||||
# First try - lookup in explicit (non parameterized URLs)
|
||||
if req.path in self.explicit_url_map:
|
||||
return self.explicit_url_map[req.path]
|
||||
# Second try - strip last path segment and lookup in another map
|
||||
idx = req.path.rfind(b'/') + 1
|
||||
path2 = req.path[:idx]
|
||||
if len(path2) > 0 and path2 in self.parameterized_url_map:
|
||||
# Save parameter into request
|
||||
req._param = req.path[idx:].decode()
|
||||
return self.parameterized_url_map[path2]
|
||||
|
||||
if self.catch_all_handler:
|
||||
return self.catch_all_handler
|
||||
|
||||
# No handler found
|
||||
return (None, None)
|
||||
|
||||
async def _handle_request(self, req, resp):
|
||||
await req.read_request_line()
|
||||
# Find URL handler
|
||||
req.handler, req.params = self._find_url_handler(req)
|
||||
if not req.handler:
|
||||
# No URL handler found - read response and issue HTTP 404
|
||||
await req.read_headers()
|
||||
raise HTTPException(404)
|
||||
# req.params = params
|
||||
# req.handler = han
|
||||
resp.params = req.params
|
||||
# Read / parse headers
|
||||
await req.read_headers(req.params['save_headers'])
|
||||
|
||||
async def _handler(self, reader, writer):
|
||||
"""Handler for TCP connection with
|
||||
HTTP/1.0 protocol implementation
|
||||
"""
|
||||
gc.collect()
|
||||
|
||||
try:
|
||||
req = request(reader)
|
||||
resp = response(writer)
|
||||
# Read HTTP Request with timeout
|
||||
await asyncio.wait_for(self._handle_request(req, resp),
|
||||
self.request_timeout)
|
||||
|
||||
# OPTIONS method is handled automatically
|
||||
if req.method == b'OPTIONS':
|
||||
resp.add_access_control_headers()
|
||||
# Since we support only HTTP 1.0 - it is important
|
||||
# to tell browser that there is no payload expected
|
||||
# otherwise some webkit based browsers (Chrome)
|
||||
# treat this behavior as an error
|
||||
resp.add_header('Content-Length', '0')
|
||||
await resp._send_headers()
|
||||
return
|
||||
|
||||
# Ensure that HTTP method is allowed for this path
|
||||
if req.method not in req.params['methods']:
|
||||
raise HTTPException(405)
|
||||
|
||||
# Handle URL
|
||||
gc.collect()
|
||||
if hasattr(req, '_param'):
|
||||
await req.handler(req, resp, req._param)
|
||||
else:
|
||||
await req.handler(req, resp)
|
||||
# Done here
|
||||
except (asyncio.CancelledError, asyncio.TimeoutError):
|
||||
pass
|
||||
except OSError as e:
|
||||
# Do not send response for connection related errors - too late :)
|
||||
# P.S. code 32 - is possible BROKEN PIPE error (TODO: is it true?)
|
||||
if e.args[0] not in (errno.ECONNABORTED, errno.ECONNRESET, 32):
|
||||
try:
|
||||
await resp.error(500)
|
||||
except Exception as e:
|
||||
log.exc(e, "")
|
||||
except HTTPException as e:
|
||||
try:
|
||||
await resp.error(e.code)
|
||||
except Exception as e:
|
||||
log.exc(e)
|
||||
except Exception as e:
|
||||
# Unhandled expection in user's method
|
||||
log.error(req.path.decode())
|
||||
log.exc(e, "")
|
||||
try:
|
||||
await resp.error(500)
|
||||
# Send exception info if desired
|
||||
if self.debug:
|
||||
sys.print_exception(e, resp.writer.s)
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
await writer.aclose()
|
||||
# Max concurrency support -
|
||||
# if queue is full schedule resume of TCP server task
|
||||
if len(self.conns) == self.max_concurrency:
|
||||
self.loop.create_task(self._server_coro)
|
||||
# Delete connection, using socket as a key
|
||||
del self.conns[id(writer.s)]
|
||||
|
||||
def add_route(self, url, f, **kwargs):
|
||||
"""Add URL to function mapping.
|
||||
Arguments:
|
||||
url - url to map function with
|
||||
f - function to map
|
||||
Keyword arguments:
|
||||
methods - list of allowed methods. Defaults to ['GET', 'POST']
|
||||
save_headers - contains list of HTTP headers to be saved. Case sensitive. Default - empty.
|
||||
max_body_size - Max HTTP body size (e.g. POST form data). Defaults to 1024
|
||||
allowed_access_control_headers - Default value for the same name header. Defaults to *
|
||||
allowed_access_control_origins - Default value for the same name header. Defaults to *
|
||||
"""
|
||||
if url == '' or '?' in url:
|
||||
raise ValueError('Invalid URL')
|
||||
# Initial params for route
|
||||
params = {'methods': ['GET'],
|
||||
'save_headers': [],
|
||||
'max_body_size': 1024,
|
||||
'allowed_access_control_headers': '*',
|
||||
'allowed_access_control_origins': '*',
|
||||
}
|
||||
params.update(kwargs)
|
||||
params['allowed_access_control_methods'] = ', '.join(params['methods'])
|
||||
# Convert methods/headers to bytestring
|
||||
params['methods'] = [x.encode() for x in params['methods']]
|
||||
params['save_headers'] = [x.encode() for x in params['save_headers']]
|
||||
# If URL has a parameter
|
||||
if url.endswith('>'):
|
||||
idx = url.rfind('<')
|
||||
path = url[:idx]
|
||||
idx += 1
|
||||
param = url[idx:-1]
|
||||
if path.encode() in self.parameterized_url_map:
|
||||
raise ValueError('URL exists')
|
||||
params['_param_name'] = param
|
||||
self.parameterized_url_map[path.encode()] = (f, params)
|
||||
|
||||
if url.encode() in self.explicit_url_map:
|
||||
raise ValueError('URL exists')
|
||||
self.explicit_url_map[url.encode()] = (f, params)
|
||||
|
||||
def add_resource(self, cls, url, **kwargs):
|
||||
"""Map resource (RestAPI) to URL
|
||||
Arguments:
|
||||
cls - Resource class to map to
|
||||
url - url to map to class
|
||||
kwargs - User defined key args to pass to the handler.
|
||||
Example:
|
||||
class myres():
|
||||
def get(self, data):
|
||||
return {'hello': 'world'}
|
||||
app.add_resource(myres, '/api/myres')
|
||||
"""
|
||||
methods = []
|
||||
callmap = {}
|
||||
# Create instance of resource handler, if passed as just class (not instance)
|
||||
try:
|
||||
obj = cls()
|
||||
except TypeError:
|
||||
obj = cls
|
||||
# Get all implemented HTTP methods and make callmap
|
||||
for m in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
|
||||
fn = m.lower()
|
||||
if hasattr(obj, fn):
|
||||
methods.append(m)
|
||||
callmap[m.encode()] = (getattr(obj, fn), kwargs)
|
||||
self.add_route(url, restful_resource_handler,
|
||||
methods=methods,
|
||||
save_headers=['Content-Length', 'Content-Type'],
|
||||
_callmap=callmap)
|
||||
|
||||
def catchall(self):
|
||||
"""Decorator for catchall()
|
||||
Example:
|
||||
@app.catchall()
|
||||
def catchall_handler(req, resp):
|
||||
response.code = 404
|
||||
await response.start_html()
|
||||
await response.send('<html><body><h1>My custom 404!</h1></html>\n')
|
||||
"""
|
||||
params = {'methods': [b'GET'], 'save_headers': [], 'max_body_size': 1024, 'allowed_access_control_headers': '*', 'allowed_access_control_origins': '*'}
|
||||
|
||||
def _route(f):
|
||||
self.catch_all_handler = (f, params)
|
||||
return f
|
||||
return _route
|
||||
|
||||
def route(self, url, **kwargs):
|
||||
"""Decorator for add_route()
|
||||
Example:
|
||||
@app.route('/')
|
||||
def index(req, resp):
|
||||
await resp.start_html()
|
||||
await resp.send('<html><body><h1>Hello, world!</h1></html>\n')
|
||||
"""
|
||||
def _route(f):
|
||||
self.add_route(url, f, **kwargs)
|
||||
return f
|
||||
return _route
|
||||
|
||||
def resource(self, url, method='GET', **kwargs):
|
||||
"""Decorator for add_resource() method
|
||||
Examples:
|
||||
@app.resource('/users')
|
||||
def users(data):
|
||||
return {'a': 1}
|
||||
@app.resource('/messages/<topic_id>')
|
||||
async def index(data, topic_id):
|
||||
yield '{'
|
||||
yield '"topic_id": "{}",'.format(topic_id)
|
||||
yield '"message": "test",'
|
||||
yield '}'
|
||||
"""
|
||||
def _resource(f):
|
||||
self.add_route(url, restful_resource_handler,
|
||||
methods=[method],
|
||||
save_headers=['Content-Length', 'Content-Type'],
|
||||
_callmap={method.encode(): (f, kwargs)})
|
||||
return f
|
||||
return _resource
|
||||
|
||||
async def _tcp_server(self, host, port, backlog):
|
||||
"""TCP Server implementation.
|
||||
Opens socket for accepting connection and
|
||||
creates task for every new accepted connection
|
||||
"""
|
||||
addr = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0][-1]
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.setblocking(False)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.bind(addr)
|
||||
sock.listen(backlog)
|
||||
try:
|
||||
while True:
|
||||
if IS_UASYNCIO_V3:
|
||||
yield uasyncio.core._io_queue.queue_read(sock)
|
||||
else:
|
||||
yield asyncio.IORead(sock)
|
||||
csock, caddr = sock.accept()
|
||||
csock.setblocking(False)
|
||||
# Start handler / keep it in the map - to be able to
|
||||
# shutdown gracefully - by close all connections
|
||||
self.processed_connections += 1
|
||||
hid = id(csock)
|
||||
handler = self._handler(asyncio.StreamReader(csock),
|
||||
asyncio.StreamWriter(csock, {}))
|
||||
self.conns[hid] = handler
|
||||
self.loop.create_task(handler)
|
||||
# In case of max concurrency reached - temporary pause server:
|
||||
# 1. backlog must be greater than max_concurrency, otherwise
|
||||
# client will got "Connection Reset"
|
||||
# 2. Server task will be resumed whenever one active connection finished
|
||||
if len(self.conns) == self.max_concurrency:
|
||||
# Pause
|
||||
yield False
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
def run(self, host="127.0.0.1", port=8081, loop_forever=True):
|
||||
"""Run Web Server. By default it runs forever.
|
||||
Keyword arguments:
|
||||
host - host to listen on. By default - localhost (127.0.0.1)
|
||||
port - port to listen on. By default - 8081
|
||||
loop_forever - run loo.loop_forever(), otherwise caller must run it by itself.
|
||||
"""
|
||||
self._server_coro = self._tcp_server(host, port, self.backlog)
|
||||
self.loop.create_task(self._server_coro)
|
||||
if loop_forever:
|
||||
self.loop.run_forever()
|
||||
|
||||
def shutdown(self):
|
||||
"""Gracefully shutdown Web Server"""
|
||||
asyncio.cancel(self._server_coro)
|
||||
for hid, coro in self.conns.items():
|
||||
asyncio.cancel(coro)
|
Plik binarny nie jest wyświetlany.
|
@ -0,0 +1,111 @@
|
|||
from picographics import PicoGraphics, DISPLAY_INKY_FRAME_4 as DISPLAY # 4.0"
|
||||
import gc
|
||||
import time
|
||||
from machine import reset
|
||||
import inky_helper as ih
|
||||
|
||||
# Create a secrets.py with your Wifi details to be able to get the time
|
||||
#
|
||||
# secrets.py should contain:
|
||||
# WIFI_SSID = "Your WiFi SSID"
|
||||
# WIFI_PASSWORD = "Your WiFi password"
|
||||
|
||||
# Setup for the display.
|
||||
graphics = PicoGraphics(DISPLAY)
|
||||
WIDTH, HEIGHT = graphics.get_bounds()
|
||||
graphics.set_font("bitmap8")
|
||||
|
||||
|
||||
def launcher():
|
||||
# Draws the menu
|
||||
graphics.set_pen(1)
|
||||
graphics.clear()
|
||||
graphics.set_pen(0)
|
||||
|
||||
graphics.text("Inky Launcher", 5, 5, 350, 5)
|
||||
graphics.text("Menu", 5, 50, 350, 3)
|
||||
|
||||
graphics.text("A. NASA Picture Of the Day", 5, 90, 350, 3)
|
||||
graphics.text("B. Word Clock", 5, 130, 350, 3)
|
||||
graphics.text("C. Daily Activity", 5, 170, 350, 3)
|
||||
graphics.text("D. Headlines", 5, 210, 350, 3)
|
||||
graphics.text("E. Random Joke", 5, 250, 350, 3)
|
||||
|
||||
graphics.update()
|
||||
|
||||
# Now we've drawn the menu to the screen, we wait here for the user to select an app.
|
||||
# Then once an app is selected, we set that as the current app and reset the device and load into it.
|
||||
|
||||
# You can replace any of the included examples with one of your own,
|
||||
# just replace the name of the app in the line "ih.update_last_app("nasa_apod")"
|
||||
|
||||
while True:
|
||||
if ih.inky_frame.button_a.read():
|
||||
ih.button_a_led.on()
|
||||
ih.update_state("nasa_apod")
|
||||
time.sleep(0.5)
|
||||
reset()
|
||||
if ih.inky_frame.button_b.read():
|
||||
ih.button_b_led.on()
|
||||
ih.update_state("word_clock")
|
||||
time.sleep(0.5)
|
||||
reset()
|
||||
if ih.inky_frame.button_c.read():
|
||||
ih.button_c_led.on()
|
||||
ih.update_state("daily_activity")
|
||||
time.sleep(0.5)
|
||||
reset()
|
||||
if ih.inky_frame.button_d.read():
|
||||
ih.button_d_led.on()
|
||||
ih.update_state("news_headlines")
|
||||
time.sleep(0.5)
|
||||
reset()
|
||||
if ih.inky_frame.button_e.read():
|
||||
ih.button_e_led.on()
|
||||
ih.update_state("random_joke")
|
||||
time.sleep(0.5)
|
||||
reset()
|
||||
|
||||
|
||||
# Turn any LEDs off that may still be on from last run.
|
||||
ih.clear_button_leds()
|
||||
ih.led_warn.off()
|
||||
|
||||
if ih.pressed() == ih.inky_frame.button_a and ih.pressed() == ih.inky_frame.button_e:
|
||||
launcher()
|
||||
|
||||
ih.clear_button_leds()
|
||||
|
||||
if ih.file_exists("state.json"):
|
||||
# Loads the JSON and launches the app
|
||||
ih.load_state()
|
||||
ih.launch_app(ih.state['run'])
|
||||
|
||||
# Passes the the graphics object from the launcher to the app
|
||||
ih.app.graphics = graphics
|
||||
ih.app.WIDTH = WIDTH
|
||||
ih.app.HEIGHT = HEIGHT
|
||||
|
||||
else:
|
||||
launcher()
|
||||
|
||||
try:
|
||||
from secrets import WIFI_SSID, WIFI_PASSWORD
|
||||
ih.network_connect(WIFI_SSID, WIFI_PASSWORD)
|
||||
except ImportError:
|
||||
print("Create secrets.py with your WiFi credentials")
|
||||
|
||||
# Get some memory back, we really need it!
|
||||
gc.collect()
|
||||
|
||||
# The main loop executes the update and draw function from the imported app,
|
||||
# and then goes to sleep ZzzzZZz
|
||||
|
||||
file = ih.file_exists("state.json")
|
||||
|
||||
print(file)
|
||||
|
||||
while True:
|
||||
ih.app.update()
|
||||
ih.app.draw()
|
||||
ih.sleep(ih.app.UPDATE_INTERVAL)
|
|
@ -0,0 +1,90 @@
|
|||
import gc
|
||||
import jpegdec
|
||||
from urllib import urequest
|
||||
from ujson import load
|
||||
|
||||
gc.collect()
|
||||
|
||||
graphics = None
|
||||
|
||||
WIDTH, HEIGHT = 0, 0
|
||||
|
||||
FILENAME = "nasa-apod-640x400-daily"
|
||||
IMG_URL = "https://pimoroni.github.io/feed2image/nasa-apod-640x400-daily.jpg"
|
||||
# A Demo Key is used in this example and is IP rate limited. You can get your own API Key from https://api.nasa.gov/
|
||||
API_URL = "https://api.nasa.gov/planetary/apod?api_key=CgQGiTiyzQWEfkPgZ4btNM1FTLZQP5DeSfEwbVr7"
|
||||
|
||||
# Length of time between updates in minutes.
|
||||
# Frequent updates will reduce battery life!
|
||||
UPDATE_INTERVAL = 1
|
||||
|
||||
# Variable for storing the NASA APOD Title
|
||||
apod_title = None
|
||||
|
||||
|
||||
def show_error(text):
|
||||
graphics.set_pen(4)
|
||||
graphics.rectangle(0, 10, 640, 35)
|
||||
graphics.set_pen(1)
|
||||
graphics.text(text, 5, 16, 400, 2)
|
||||
|
||||
|
||||
def update():
|
||||
global apod_title
|
||||
|
||||
try:
|
||||
# Grab the data
|
||||
socket = urequest.urlopen(API_URL)
|
||||
gc.collect()
|
||||
j = load(socket)
|
||||
socket.close()
|
||||
apod_title = j['title']
|
||||
gc.collect()
|
||||
except OSError as e:
|
||||
print(e)
|
||||
apod_title = "Image Title Unavailable"
|
||||
|
||||
try:
|
||||
# Grab the image
|
||||
socket = urequest.urlopen(IMG_URL)
|
||||
|
||||
gc.collect()
|
||||
|
||||
data = bytearray(1024)
|
||||
with open(FILENAME, "wb") as f:
|
||||
while True:
|
||||
if socket.readinto(data) == 0:
|
||||
break
|
||||
f.write(data)
|
||||
socket.close()
|
||||
del data
|
||||
gc.collect()
|
||||
except OSError as e:
|
||||
print(e)
|
||||
show_error("Unable to download image")
|
||||
|
||||
|
||||
def draw():
|
||||
jpeg = jpegdec.JPEG(graphics)
|
||||
gc.collect() # For good measure...
|
||||
|
||||
graphics.set_pen(1)
|
||||
graphics.clear()
|
||||
|
||||
try:
|
||||
jpeg.open_file(FILENAME)
|
||||
jpeg.decode()
|
||||
except OSError:
|
||||
graphics.set_pen(4)
|
||||
graphics.rectangle(0, 170, 640, 25)
|
||||
graphics.set_pen(1)
|
||||
graphics.text("Unable to display image! :(", 5, 175, 400, 2)
|
||||
|
||||
graphics.set_pen(0)
|
||||
graphics.rectangle(0, 375, 640, 25)
|
||||
graphics.set_pen(1)
|
||||
graphics.text(apod_title, 5, 380, 400, 2)
|
||||
|
||||
gc.collect()
|
||||
|
||||
graphics.update()
|
|
@ -0,0 +1,160 @@
|
|||
from urllib import urequest
|
||||
import gc
|
||||
import qrcode
|
||||
|
||||
# Uncomment one URL to use (Top Stories, World News and technology)
|
||||
# URL = "http://feeds.bbci.co.uk/news/rss.xml"
|
||||
# URL = "http://feeds.bbci.co.uk/news/world/rss.xml"
|
||||
URL = "http://feeds.bbci.co.uk/news/technology/rss.xml"
|
||||
|
||||
# Length of time between updates in minutes.
|
||||
# Frequent updates will reduce battery life!
|
||||
UPDATE_INTERVAL = 30
|
||||
|
||||
graphics = None
|
||||
code = qrcode.QRCode()
|
||||
|
||||
|
||||
def read_until(stream, char):
|
||||
result = b""
|
||||
while True:
|
||||
c = stream.read(1)
|
||||
if c == char:
|
||||
return result
|
||||
result += c
|
||||
|
||||
|
||||
def discard_until(stream, c):
|
||||
while stream.read(1) != c:
|
||||
pass
|
||||
|
||||
|
||||
def parse_xml_stream(s, accept_tags, group_by, max_items=3):
|
||||
tag = []
|
||||
text = b""
|
||||
count = 0
|
||||
current = {}
|
||||
while True:
|
||||
char = s.read(1)
|
||||
if len(char) == 0:
|
||||
break
|
||||
|
||||
if char == b"<":
|
||||
next_char = s.read(1)
|
||||
|
||||
# Discard stuff like <?xml vers...
|
||||
if next_char == b"?":
|
||||
discard_until(s, b">")
|
||||
continue
|
||||
|
||||
# Detect <![CDATA
|
||||
elif next_char == b"!":
|
||||
s.read(1) # Discard [
|
||||
discard_until(s, b"[") # Discard CDATA[
|
||||
text = read_until(s, b"]")
|
||||
discard_until(s, b">") # Discard ]>
|
||||
gc.collect()
|
||||
|
||||
elif next_char == b"/":
|
||||
current_tag = read_until(s, b">")
|
||||
top_tag = tag[-1]
|
||||
|
||||
# Populate our result dict
|
||||
if top_tag in accept_tags:
|
||||
current[top_tag.decode("utf-8")] = text.decode("utf-8")
|
||||
|
||||
# If we've found a group of items, yield the dict
|
||||
elif top_tag == group_by:
|
||||
yield current
|
||||
current = {}
|
||||
count += 1
|
||||
if count == max_items:
|
||||
return
|
||||
tag.pop()
|
||||
text = b""
|
||||
gc.collect()
|
||||
continue
|
||||
|
||||
else:
|
||||
current_tag = read_until(s, b">")
|
||||
tag += [next_char + current_tag.split(b" ")[0]]
|
||||
text = b""
|
||||
gc.collect()
|
||||
|
||||
else:
|
||||
text += char
|
||||
|
||||
|
||||
def measure_qr_code(size, code):
|
||||
w, h = code.get_size()
|
||||
module_size = int(size / w)
|
||||
return module_size * w, module_size
|
||||
|
||||
|
||||
def draw_qr_code(ox, oy, size, code):
|
||||
size, module_size = measure_qr_code(size, code)
|
||||
graphics.set_pen(1)
|
||||
graphics.rectangle(ox, oy, size, size)
|
||||
graphics.set_pen(0)
|
||||
for x in range(size):
|
||||
for y in range(size):
|
||||
if code.get_module(x, y):
|
||||
graphics.rectangle(ox + x * module_size, oy + y * module_size, module_size, module_size)
|
||||
|
||||
|
||||
# A function to get the data from an RSS Feed, this in case BBC News.
|
||||
def get_rss():
|
||||
try:
|
||||
stream = urequest.urlopen(URL)
|
||||
output = list(parse_xml_stream(stream, [b"title", b"description", b"guid", b"pubDate"], b"item"))
|
||||
return output
|
||||
|
||||
except OSError as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
|
||||
feed = None
|
||||
|
||||
|
||||
def update():
|
||||
global feed
|
||||
# Gets Feed Data
|
||||
feed = get_rss()
|
||||
|
||||
|
||||
def draw():
|
||||
global feed
|
||||
WIDTH, HEIGHT = graphics.get_bounds()
|
||||
graphics.set_font("bitmap8")
|
||||
|
||||
# Clear the screen
|
||||
graphics.set_pen(1)
|
||||
graphics.clear()
|
||||
graphics.set_pen(0)
|
||||
|
||||
# Title
|
||||
graphics.text("Headlines from BBC News:", 10, 10, 320, 3)
|
||||
|
||||
# Draws 2 articles from the feed if they're available.
|
||||
if feed:
|
||||
graphics.set_pen(4)
|
||||
graphics.text(feed[0]["title"], 10, 70, WIDTH - 150, 3 if graphics.measure_text(feed[0]["title"]) < 600 else 2)
|
||||
graphics.text(feed[1]["title"], 130, 260, WIDTH - 140, 3 if graphics.measure_text(feed[1]["title"]) < 600 else 2)
|
||||
|
||||
graphics.set_pen(3)
|
||||
graphics.text(feed[0]["description"], 10, 135 if graphics.measure_text(feed[0]["title"]) < 650 else 90, WIDTH - 150, 2)
|
||||
graphics.text(feed[1]["description"], 130, 320 if graphics.measure_text(feed[1]["title"]) < 650 else 230, WIDTH - 145, 2)
|
||||
|
||||
graphics.line(10, 215, 620, 215)
|
||||
|
||||
code.set_text(feed[0]["guid"])
|
||||
draw_qr_code(530, 65, 100, code)
|
||||
code.set_text(feed[1]["guid"])
|
||||
draw_qr_code(10, 265, 100, code)
|
||||
|
||||
else:
|
||||
graphics.set_pen(4)
|
||||
graphics.text("Error: Unable to get feed :(", 10, 40, WIDTH - 150, 4)
|
||||
|
||||
graphics.update()
|
|
@ -0,0 +1,32 @@
|
|||
import machine
|
||||
import time
|
||||
import usocket
|
||||
import struct
|
||||
|
||||
|
||||
def fetch(synch_with_rtc=True, timeout=10):
|
||||
ntp_host = "pool.ntp.org"
|
||||
|
||||
timestamp = None
|
||||
try:
|
||||
query = bytearray(48)
|
||||
query[0] = 0x1b
|
||||
address = usocket.getaddrinfo(ntp_host, 123)[0][-1]
|
||||
socket = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
|
||||
socket.settimeout(timeout)
|
||||
socket.sendto(query, address)
|
||||
data = socket.recv(48)
|
||||
socket.close()
|
||||
local_epoch = 2208988800 # selected by Chris - blame him. :-D
|
||||
timestamp = struct.unpack("!I", data[40:44])[0] - local_epoch
|
||||
timestamp = time.gmtime(timestamp)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
# if requested set the machines RTC to the fetched timestamp
|
||||
if synch_with_rtc:
|
||||
machine.RTC().datetime((
|
||||
timestamp[0], timestamp[1], timestamp[2], timestamp[6],
|
||||
timestamp[3], timestamp[4], timestamp[5], 0))
|
||||
|
||||
return timestamp
|
|
@ -0,0 +1,80 @@
|
|||
import gc
|
||||
import random
|
||||
import jpegdec
|
||||
from urllib import urequest
|
||||
|
||||
gc.collect() # We're really gonna need that RAM!
|
||||
|
||||
graphics = None
|
||||
|
||||
WIDTH = 0
|
||||
HEIGHT = 0
|
||||
|
||||
FILENAME = "random-joke.jpg"
|
||||
|
||||
JOKE_IDS = "https://pimoroni.github.io/feed2image/jokeapi-ids.txt"
|
||||
JOKE_IMG = "https://pimoroni.github.io/feed2image/jokeapi-{}-{}x{}.jpg"
|
||||
|
||||
gc.collect() # Claw back some RAM!
|
||||
|
||||
|
||||
# We don't have the RAM to store the list of Joke IDs in memory.
|
||||
# the first line of `jokeapi-ids.txt` is a COUNT of IDs.
|
||||
# Grab it, then pick a random line between 0 and COUNT.
|
||||
# Seek to that line and ...y'know... there's our totally random joke ID
|
||||
|
||||
def update():
|
||||
|
||||
try:
|
||||
socket = urequest.urlopen(JOKE_IDS)
|
||||
except OSError:
|
||||
return
|
||||
|
||||
# Get the first line, which is a count of the joke IDs
|
||||
number_of_lines = int(socket.readline().decode("ascii"))
|
||||
print("Total jokes {}".format(number_of_lines))
|
||||
|
||||
# Pick a random joke (by its line number)
|
||||
line = random.randint(0, number_of_lines)
|
||||
print("Getting ID from line {}".format(line))
|
||||
|
||||
for x in range(line): # Throw away lines to get where we need
|
||||
socket.readline()
|
||||
|
||||
# Read our chosen joke ID!
|
||||
random_joke_id = int(socket.readline().decode("ascii"))
|
||||
socket.close()
|
||||
|
||||
print("Random joke ID: {}".format(random_joke_id))
|
||||
|
||||
url = JOKE_IMG.format(random_joke_id, WIDTH, HEIGHT)
|
||||
|
||||
socket = urequest.urlopen(url)
|
||||
|
||||
# Stream the image data from the socket onto disk in 1024 byte chunks
|
||||
# the 600x448-ish jpeg will be roughly ~24k, we really don't have the RAM!
|
||||
data = bytearray(1024)
|
||||
with open(FILENAME, "wb") as f:
|
||||
while True:
|
||||
if socket.readinto(data) == 0:
|
||||
break
|
||||
f.write(data)
|
||||
socket.close()
|
||||
del data
|
||||
gc.collect() # We really are tight on RAM!
|
||||
|
||||
|
||||
def draw():
|
||||
jpeg = jpegdec.JPEG(graphics)
|
||||
gc.collect() # For good measure...
|
||||
|
||||
graphics.set_pen(1)
|
||||
graphics.clear()
|
||||
|
||||
try:
|
||||
jpeg.open_file(FILENAME)
|
||||
jpeg.decode()
|
||||
except OSError:
|
||||
return
|
||||
|
||||
graphics.update()
|
|
@ -0,0 +1,80 @@
|
|||
import ntp
|
||||
import machine
|
||||
|
||||
# Length of time between updates in minutes.
|
||||
UPDATE_INTERVAL = 15
|
||||
graphics = None
|
||||
|
||||
rtc = machine.RTC()
|
||||
time_string = None
|
||||
words = ["it", "dx", "is", "mn", "about", "lve", "quarter", "c", "half", "to", "ou", "past", "n", "one",
|
||||
"two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve", "r", "O'Clock"]
|
||||
|
||||
|
||||
def approx_time(hours, minutes):
|
||||
nums = {0: "twelve", 1: "one", 2: "two",
|
||||
3: "three", 4: "four", 5: "five", 6: "six",
|
||||
7: "seven", 8: "eight", 9: "nine", 10: "ten",
|
||||
11: "eleven", 12: "twelve"}
|
||||
|
||||
if hours == 12:
|
||||
hours = 0
|
||||
if minutes > 0 and minutes < 8:
|
||||
return "it is about " + nums[hours] + " O'Clock"
|
||||
elif minutes >= 8 and minutes < 23:
|
||||
return "it is about quarter past " + nums[hours]
|
||||
elif minutes >= 23 and minutes < 38:
|
||||
return "Tit is about half past " + nums[hours]
|
||||
elif minutes >= 38 and minutes < 53:
|
||||
return "it is about quarter to " + nums[hours + 1]
|
||||
else:
|
||||
return "it is about " + nums[hours + 1] + " O'Clock"
|
||||
|
||||
|
||||
def update():
|
||||
global time_string
|
||||
# grab the current time from the ntp server and update the Pico RTC
|
||||
ntp.fetch()
|
||||
|
||||
current_t = rtc.datetime()
|
||||
time_string = approx_time(current_t[4] - 12 if current_t[4] > 12 else current_t[4], current_t[5])
|
||||
|
||||
# Splits the string into an array of words for displaying later
|
||||
time_string = time_string.split()
|
||||
|
||||
print(time_string)
|
||||
|
||||
|
||||
def draw():
|
||||
global time_string
|
||||
graphics.set_font("bitmap8")
|
||||
|
||||
WIDTH, HEIGHT = graphics.get_bounds()
|
||||
|
||||
# Clear the screen
|
||||
graphics.set_pen(1)
|
||||
graphics.clear()
|
||||
graphics.set_pen(6)
|
||||
|
||||
x = 10
|
||||
y = 10
|
||||
scale = 5
|
||||
spacing = 2
|
||||
|
||||
for word in words:
|
||||
|
||||
if word in time_string:
|
||||
graphics.set_pen(0)
|
||||
else:
|
||||
graphics.set_pen(7)
|
||||
|
||||
for letter in word:
|
||||
text_length = graphics.measure_text(letter, scale, spacing)
|
||||
if not x + text_length <= WIDTH:
|
||||
y += 70
|
||||
x = 5
|
||||
|
||||
graphics.text(letter.upper(), x, y, 640, scale, spacing)
|
||||
x += 40
|
||||
|
||||
graphics.update()
|
Ładowanie…
Reference in New Issue