kopia lustrzana https://github.com/peterhinch/micropython-samples
Initial commit
rodzic
0e3ec62201
commit
a534d5ed43
|
@ -0,0 +1,21 @@
|
|||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2014 Peter Hinch
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -0,0 +1,7 @@
|
|||
# mutex.py mutex_test.py
|
||||
|
||||
Implementation of a very simple mutex class
|
||||
|
||||
|
||||
|
||||
[reference](http://www.geeksforgeeks.org/mutex-vs-semaphore/)
|
|
@ -0,0 +1,55 @@
|
|||
import pyb, micropython, array, uctypes
|
||||
micropython.alloc_emergency_exception_buf(100)
|
||||
|
||||
class MutexException(OSError):
|
||||
pass
|
||||
|
||||
class Mutex:
|
||||
@micropython.asm_thumb
|
||||
def _acquire(r0, r1): # Spinlock: wait on the semaphore. Return on success.
|
||||
label(LOOP)
|
||||
ldr(r0, [r1, 0]) # Wait for lock to be zero
|
||||
cmp(r0, 0)
|
||||
bne(LOOP) # Another process has the lock: spin on it
|
||||
cpsid(0) # OK, we have lock at this instant disable interrupts
|
||||
ldr(r0, [r1, 0]) # and re-check in case an interrupt occurred
|
||||
cmp(r0, 0)
|
||||
itt(ne) # if someone got in first re-enable ints
|
||||
cpsie(0) # and start polling again
|
||||
b(LOOP)
|
||||
mov(r0, 1) # We have an exclusive access
|
||||
str(r0, [r1, 0]) # set the lock
|
||||
cpsie(0)
|
||||
|
||||
@micropython.asm_thumb
|
||||
def _attempt(r0, r1): # Nonblocking. Try to lock. Return 0 on success, 1 on fail
|
||||
cpsid(0) # disable interrupts
|
||||
ldr(r0, [r1, 0])
|
||||
cmp(r0, 0)
|
||||
bne(FAIL) # Another process has the lock: fail
|
||||
mov(r2, 1) # No lock
|
||||
str(r2, [r1, 0]) # set the lock
|
||||
label(FAIL)
|
||||
cpsie(0) # enable interrupts
|
||||
|
||||
def __init__(self):
|
||||
self.lock = array.array('i', (0,)) # 1 if a process has the lock else 0
|
||||
|
||||
# POSIX API pthread_mutex_lock() blocks the thread till resource is available.
|
||||
def __enter__(self):
|
||||
self._acquire(uctypes.addressof(self.lock))
|
||||
return self
|
||||
|
||||
def __exit__(self, *_):
|
||||
self.lock[0] = 0
|
||||
|
||||
# POSIX pthread_mutex_unlock()
|
||||
def release(self):
|
||||
if self.lock[0] == 0:
|
||||
raise MutexException('Semaphore already released')
|
||||
self.lock[0] = 0
|
||||
|
||||
# POSIX pthread_mutex_trylock() API. When mutex is not available the function returns immediately
|
||||
def test(self): # Nonblocking: try to acquire, return True if success.
|
||||
return self._attempt(uctypes.addressof(self.lock)) == 0
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
import pyb, mutex
|
||||
|
||||
mutex = mutex.Mutex()
|
||||
|
||||
def update(t): # Interrupt handler may not be able to acquire the lock
|
||||
global var1, var2 # if main loop has it
|
||||
if mutex.test(): # critical section start
|
||||
var1 += 1
|
||||
pyb.udelay(200)
|
||||
var2 += 1
|
||||
mutex.release() # critical section end
|
||||
|
||||
def main():
|
||||
global var1, var2
|
||||
var1, var2 = 0, 0
|
||||
t2 = pyb.Timer(2, freq = 995, callback = update)
|
||||
t4 = pyb.Timer(4, freq = 1000, callback = update)
|
||||
for x in range(1000000):
|
||||
with mutex: # critical section start
|
||||
a = var1
|
||||
pyb.udelay(200)
|
||||
b = var2
|
||||
result = a == b # critical section end
|
||||
if not result:
|
||||
print('Fail after {} iterations'.format(x))
|
||||
break
|
||||
pyb.delay(1)
|
||||
if x % 1000 == 0:
|
||||
print(x)
|
||||
t2.deinit()
|
||||
t4.deinit()
|
||||
print(var1, var2)
|
Ładowanie…
Reference in New Issue