diff --git a/drivers/onewire/ds18x20.py b/drivers/onewire/ds18x20.py index 72adcbfd46..bf06094835 100644 --- a/drivers/onewire/ds18x20.py +++ b/drivers/onewire/ds18x20.py @@ -1,101 +1,51 @@ -""" -DS18x20 temperature sensor driver for MicroPython. +# DS18x20 temperature sensor driver for MicroPython. +# MIT license; Copyright (c) 2016 Damien P. George -This driver uses the OneWire driver to control DS18S20 and DS18B20 -temperature sensors. It supports multiple devices on the same 1-wire bus. +from micropython import const -The following example assumes the ground of your DS18x20 is connected to -Y11, vcc is connected to Y9 and the data pin is connected to Y10. +_CONVERT = const(0x44) +_RD_SCRATCH = const(0xbe) +_WR_SCRATCH = const(0x4e) ->>> from machine import Pin ->>> gnd = Pin('Y11', Pin.OUT_PP) ->>> gnd.off() ->>> vcc = Pin('Y9', Pin.OUT_PP) ->>> vcc.on() +class DS18X20: + def __init__(self, onewire): + self.ow = onewire + self.buf = bytearray(9) ->>> from ds18x20 import DS18X20 ->>> d = DS18X20(Pin('Y10')) + def scan(self): + return [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28] -Call read_temps to read all sensors: + def convert_temp(self): + self.ow.reset(True) + self.ow.writebyte(self.ow.SKIP_ROM) + self.ow.writebyte(_CONVERT) ->>> result = d.read_temps() ->>> print(result) -[20.875, 20.8125] + def read_scratch(self, rom): + self.ow.reset(True) + self.ow.select_rom(rom) + self.ow.writebyte(_RD_SCRATCH) + self.ow.readinto(self.buf) + if self.ow.crc8(self.buf): + raise Exception('CRC error') + return self.buf -Call read_temp to read the temperature of a specific sensor: + def write_scratch(self, rom, buf): + self.ow.reset(True) + self.ow.select_rom(rom) + self.ow.writebyte(_WR_SCRATCH) + self.ow.write(buf) ->>> result = d.read_temp(d.roms[0]) ->>> print(result) -20.25 - -If only one DS18x20 is attached to the bus, then you don't need to -pass a ROM to read_temp: - ->>> result = d.read_temp() ->>> print(result) -20.25 - -""" - -from onewire import OneWire - -class DS18X20(object): - def __init__(self, pin): - self.ow = OneWire(pin) - # Scan the 1-wire devices, but only keep those which have the - # correct # first byte in their rom for a DS18x20 device. - self.roms = [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28] - - def read_temp(self, rom=None): - """ - Read and return the temperature of one DS18x20 device. - Pass the 8-byte bytes object with the ROM of the specific device you want to read. - If only one DS18x20 device is attached to the bus you may omit the rom parameter. - """ - rom = rom or self.roms[0] - ow = self.ow - ow.reset() - ow.select_rom(rom) - ow.write_byte(0x44) # Convert Temp - while True: - if ow.read_bit(): - break - ow.reset() - ow.select_rom(rom) - ow.write_byte(0xbe) # Read scratch - data = ow.read_bytes(9) - return self.convert_temp(rom[0], data) - - def read_temps(self): - """ - Read and return the temperatures of all attached DS18x20 devices. - """ - temps = [] - for rom in self.roms: - temps.append(self.read_temp(rom)) - return temps - - def convert_temp(self, rom0, data): - """ - Convert the raw temperature data into degrees celsius and return as a float. - """ - temp_lsb = data[0] - temp_msb = data[1] - if rom0 == 0x10: - if temp_msb != 0: - # convert negative number - temp_read = temp_lsb >> 1 | 0x80 # truncate bit 0 by shifting, fill high bit with 1. - temp_read = -((~temp_read + 1) & 0xff) # now convert from two's complement + def read_temp(self, rom): + buf = self.read_scratch(rom) + if rom[0] == 0x10: + if buf[1]: + t = buf[0] >> 1 | 0x80 + t = -((~t + 1) & 0xff) else: - temp_read = temp_lsb >> 1 # truncate bit 0 by shifting - count_remain = data[6] - count_per_c = data[7] - temp = temp_read - 0.25 + (count_per_c - count_remain) / count_per_c - return temp - elif rom0 == 0x28: - temp = (temp_msb << 8 | temp_lsb) / 16 - if (temp_msb & 0xf8) == 0xf8: # for negative temperature - temp -= 0x1000 - return temp + t = buf[0] >> 1 + return t - 0.25 + (buf[7] - buf[6]) / buf[7] else: - assert False + t = buf[1] << 8 | buf[0] + if t & 0x8000: # sign bit set + t = -((t ^ 0xffff) + 1) + return t / 16 diff --git a/drivers/onewire/onewire.py b/drivers/onewire/onewire.py index c8016c0dac..546a69b304 100644 --- a/drivers/onewire/onewire.py +++ b/drivers/onewire/onewire.py @@ -1,336 +1,91 @@ -""" -OneWire library ported to MicroPython by Jason Hildebrand. +# 1-Wire driver for MicroPython +# MIT license; Copyright (c) 2016 Damien P. George +from micropython import const +import _onewire as _ow -TODO: - * implement and test parasite-power mode (as an init option) - * port the crc checks - -The original upstream copyright and terms follow. ------------------------------------------------------------------------------- - -Copyright (c) 2007, Jim Studt (original old version - many contributors since) - -OneWire has been maintained by Paul Stoffregen (paul@pjrc.com) since -January 2010. - -26 Sept 2008 -- Robin James - -Jim Studt's original library was modified by Josh Larios. - -Tom Pollard, pollard@alum.mit.edu, contributed around May 20, 2008 - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -Much of the code was inspired by Derek Yerger's code, though I don't -think much of that remains. In any event that was.. - (copyleft) 2006 by Derek Yerger - Free to distribute freely. -""" - -import pyb -from pyb import disable_irq -from pyb import enable_irq +class OneWireError(Exception): + pass class OneWire: + SEARCH_ROM = const(0xf0) + MATCH_ROM = const(0x55) + SKIP_ROM = const(0xcc) + def __init__(self, pin): - """ - Pass the data pin connected to your one-wire device(s), for example Pin('X1'). - The one-wire protocol allows for multiple devices to be attached. - """ - self.data_pin = pin - self.write_delays = (1, 40, 40, 1) - self.read_delays = (1, 1, 40) + self.pin = pin + self.pin.init(pin.OPEN_DRAIN) - # cache a bunch of methods and attributes. This is necessary in _write_bit and - # _read_bit to achieve the timing required by the OneWire protocol. - self.cache = (pin.init, pin.value, pin.OUT_PP, pin.IN, pin.PULL_UP) + def reset(self, required=False): + reset = _ow.reset(self.pin) + if required and not reset: + raise OneWireError + return reset - pin.init(pin.IN, pin.PULL_UP) + def readbit(self): + return _ow.readbit(self.pin) - def reset(self): - """ - Perform the onewire reset function. - Returns 1 if a device asserted a presence pulse, 0 otherwise. + def readbyte(self): + return _ow.readbyte(self.pin) - If you receive 0, then check your wiring and make sure you are providing - power and ground to your devices. - """ - retries = 25 - self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_UP) + def readinto(self, buf): + for i in range(len(buf)): + buf[i] = _ow.readbyte(self.pin) - # We will wait up to 250uS for - # the bus to come high, if it doesn't then it is broken or shorted - # and we return a 0; + def writebit(self, value): + return _ow.writebit(self.pin, value) - # wait until the wire is high... just in case - while True: - if self.data_pin.value(): - break - retries -= 1 - if retries == 0: - raise OSError("OneWire pin didn't go high") - pyb.udelay(10) + def writebyte(self, value): + return _ow.writebyte(self.pin, value) - # pull the bus low for at least 480us - self.data_pin.low() - self.data_pin.init(self.data_pin.OUT_PP) - pyb.udelay(480) - - # If there is a slave present, it should pull the bus low within 60us - i = pyb.disable_irq() - self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_UP) - pyb.udelay(70) - presence = not self.data_pin.value() - pyb.enable_irq(i) - pyb.udelay(410) - return presence - - def write_bit(self, value): - """ - Write a single bit. - """ - pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache - self._write_bit(value, pin_init, pin_value, Pin_OUT_PP) - - def _write_bit(self, value, pin_init, pin_value, Pin_OUT_PP): - """ - Write a single bit - requires cached methods/attributes be passed as arguments. - See also write_bit() - """ - d0, d1, d2, d3 = self.write_delays - udelay = pyb.udelay - if value: - # write 1 - i = disable_irq() - pin_value(0) - pin_init(Pin_OUT_PP) - udelay(d0) - pin_value(1) - enable_irq(i) - udelay(d1) - else: - # write 0 - i = disable_irq() - pin_value(0) - pin_init(Pin_OUT_PP) - udelay(d2) - pin_value(1) - enable_irq(i) - udelay(d3) - - def write_byte(self, value): - """ - Write a byte. The pin will go tri-state at the end of the write to avoid - heating in a short or other mishap. - """ - pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache - for i in range(8): - self._write_bit(value & 1, pin_init, pin_value, Pin_OUT_PP) - value >>= 1 - pin_init(Pin_IN, Pin_PULL_UP) - - def write_bytes(self, bytestring): - """ - Write a sequence of bytes. - """ - for byte in bytestring: - self.write_byte(byte) - - def _read_bit(self, pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP): - """ - Read a single bit - requires cached methods/attributes be passed as arguments. - See also read_bit() - """ - d0, d1, d2 = self.read_delays - udelay = pyb.udelay - pin_init(Pin_IN, Pin_PULL_UP) # TODO why do we need this? - i = disable_irq() - pin_value(0) - pin_init(Pin_OUT_PP) - udelay(d0) - pin_init(Pin_IN, Pin_PULL_UP) - udelay(d1) - value = pin_value() - enable_irq(i) - udelay(d2) - return value - - def read_bit(self): - """ - Read a single bit. - """ - pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache - return self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP) - - def read_byte(self): - """ - Read a single byte and return the value as an integer. - See also read_bytes() - """ - pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache - value = 0 - for i in range(8): - bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP) - value |= bit << i - return value - - def read_bytes(self, count): - """ - Read a sequence of N bytes. - The bytes are returned as a bytearray. - """ - s = bytearray(count) - for i in range(count): - s[i] = self.read_byte() - return s + def write(self, buf): + for b in buf: + _ow.writebyte(self.pin, b) def select_rom(self, rom): - """ - Select a specific device to talk to. Pass in rom as a bytearray (8 bytes). - """ - assert len(rom) == 8, "ROM must be 8 bytes" self.reset() - self.write_byte(0x55) # ROM MATCH - self.write_bytes(rom) - - def read_rom(self): - """ - Read the ROM - this works if there is only a single device attached. - """ - self.reset() - self.write_byte(0x33) # READ ROM - rom = self.read_bytes(8) - # TODO: check CRC of the ROM - return rom - - def skip_rom(self): - """ - Send skip-rom command - this works if there is only one device attached. - """ - self.write_byte(0xCC) # SKIP ROM - - def depower(self): - self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_NONE) + self.writebyte(MATCH_ROM) + self.write(rom) def scan(self): - """ - Return a list of ROMs for all attached devices. - Each ROM is returned as a bytes object of 8 bytes. - """ devices = [] - self._reset_search() - while True: - rom = self._search() - if not rom: - return devices - devices.append(rom) + diff = 65 + rom = False + for i in range(0xff): + rom, diff = self._search_rom(rom, diff) + if rom: + devices += [rom] + if diff == 0: + break + return devices - def _reset_search(self): - self.last_discrepancy = 0 - self.last_device_flag = False - self.last_family_discrepancy = 0 - self.rom = bytearray(8) - - def _search(self): - # initialize for search - id_bit_number = 1 - last_zero = 0 - rom_byte_number = 0 - rom_byte_mask = 1 - search_result = 0 - pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache - - # if the last call was not the last one - if not self.last_device_flag: - # 1-Wire reset - if not self.reset(): - self._reset_search() - return None - - # issue the search command - self.write_byte(0xF0) - - # loop to do the search - while rom_byte_number < 8: # loop until through all ROM bytes 0-7 - # read a bit and its complement - id_bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP) - cmp_id_bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP) - - # check for no devices on 1-wire - if (id_bit == 1) and (cmp_id_bit == 1): - break + def _search_rom(self, l_rom, diff): + if not self.reset(): + return None, 0 + self.writebyte(SEARCH_ROM) + if not l_rom: + l_rom = bytearray(8) + rom = bytearray(8) + next_diff = 0 + i = 64 + for byte in range(8): + r_b = 0 + for bit in range(8): + b = self.readbit() + if self.readbit(): + if b: # there are no devices or there is an error on the bus + return None, 0 else: - # all devices coupled have 0 or 1 - if (id_bit != cmp_id_bit): - search_direction = id_bit # bit write value for search - else: - # if this discrepancy if before the Last Discrepancy - # on a previous next then pick the same as last time - if (id_bit_number < self.last_discrepancy): - search_direction = (self.rom[rom_byte_number] & rom_byte_mask) > 0 - else: - # if equal to last pick 1, if not then pick 0 - search_direction = (id_bit_number == self.last_discrepancy) - - # if 0 was picked then record its position in LastZero - if search_direction == 0: - last_zero = id_bit_number - - # check for Last discrepancy in family - if last_zero < 9: - self.last_family_discrepancy = last_zero + if not b: # collision, two devices with different bit meaning + if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i): + b = 1 + next_diff = i + self.writebit(b) + if b: + r_b |= 1 << bit + i -= 1 + rom[byte] = r_b + return rom, next_diff - # set or clear the bit in the ROM byte rom_byte_number - # with mask rom_byte_mask - if search_direction == 1: - self.rom[rom_byte_number] |= rom_byte_mask - else: - self.rom[rom_byte_number] &= ~rom_byte_mask - - # serial number search direction write bit - #print('sd', search_direction) - self.write_bit(search_direction) - - # increment the byte counter id_bit_number - # and shift the mask rom_byte_mask - id_bit_number += 1 - rom_byte_mask <<= 1 - - # if the mask is 0 then go to new SerialNum byte rom_byte_number and reset mask - if rom_byte_mask == 0x100: - rom_byte_number += 1 - rom_byte_mask = 1 - - # if the search was successful then - if not (id_bit_number < 65): - # search successful so set last_discrepancy,last_device_flag,search_result - self.last_discrepancy = last_zero - - # check for last device - if self.last_discrepancy == 0: - self.last_device_flag = True - search_result = True - - # if no device found then reset counters so next 'search' will be like a first - if not search_result or not self.rom[0]: - self._reset_search() - return None - else: - return bytes(self.rom) + def crc8(self, data): + return _ow.crc8(data) diff --git a/esp8266/modules/ds18x20.py b/esp8266/modules/ds18x20.py deleted file mode 100644 index bf06094835..0000000000 --- a/esp8266/modules/ds18x20.py +++ /dev/null @@ -1,51 +0,0 @@ -# DS18x20 temperature sensor driver for MicroPython. -# MIT license; Copyright (c) 2016 Damien P. George - -from micropython import const - -_CONVERT = const(0x44) -_RD_SCRATCH = const(0xbe) -_WR_SCRATCH = const(0x4e) - -class DS18X20: - def __init__(self, onewire): - self.ow = onewire - self.buf = bytearray(9) - - def scan(self): - return [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28] - - def convert_temp(self): - self.ow.reset(True) - self.ow.writebyte(self.ow.SKIP_ROM) - self.ow.writebyte(_CONVERT) - - def read_scratch(self, rom): - self.ow.reset(True) - self.ow.select_rom(rom) - self.ow.writebyte(_RD_SCRATCH) - self.ow.readinto(self.buf) - if self.ow.crc8(self.buf): - raise Exception('CRC error') - return self.buf - - def write_scratch(self, rom, buf): - self.ow.reset(True) - self.ow.select_rom(rom) - self.ow.writebyte(_WR_SCRATCH) - self.ow.write(buf) - - def read_temp(self, rom): - buf = self.read_scratch(rom) - if rom[0] == 0x10: - if buf[1]: - t = buf[0] >> 1 | 0x80 - t = -((~t + 1) & 0xff) - else: - t = buf[0] >> 1 - return t - 0.25 + (buf[7] - buf[6]) / buf[7] - else: - t = buf[1] << 8 | buf[0] - if t & 0x8000: # sign bit set - t = -((t ^ 0xffff) + 1) - return t / 16 diff --git a/esp8266/modules/onewire.py b/esp8266/modules/onewire.py deleted file mode 100644 index 83318d1a47..0000000000 --- a/esp8266/modules/onewire.py +++ /dev/null @@ -1,91 +0,0 @@ -# 1-Wire driver for MicroPython on ESP8266 -# MIT license; Copyright (c) 2016 Damien P. George - -from micropython import const -import _onewire as _ow - -class OneWireError(Exception): - pass - -class OneWire: - SEARCH_ROM = const(0xf0) - MATCH_ROM = const(0x55) - SKIP_ROM = const(0xcc) - - def __init__(self, pin): - self.pin = pin - self.pin.init(pin.OPEN_DRAIN) - - def reset(self, required=False): - reset = _ow.reset(self.pin) - if required and not reset: - raise OneWireError - return reset - - def readbit(self): - return _ow.readbit(self.pin) - - def readbyte(self): - return _ow.readbyte(self.pin) - - def readinto(self, buf): - for i in range(len(buf)): - buf[i] = _ow.readbyte(self.pin) - - def writebit(self, value): - return _ow.writebit(self.pin, value) - - def writebyte(self, value): - return _ow.writebyte(self.pin, value) - - def write(self, buf): - for b in buf: - _ow.writebyte(self.pin, b) - - def select_rom(self, rom): - self.reset() - self.writebyte(MATCH_ROM) - self.write(rom) - - def scan(self): - devices = [] - diff = 65 - rom = False - for i in range(0xff): - rom, diff = self._search_rom(rom, diff) - if rom: - devices += [rom] - if diff == 0: - break - return devices - - def _search_rom(self, l_rom, diff): - if not self.reset(): - return None, 0 - self.writebyte(SEARCH_ROM) - if not l_rom: - l_rom = bytearray(8) - rom = bytearray(8) - next_diff = 0 - i = 64 - for byte in range(8): - r_b = 0 - for bit in range(8): - b = self.readbit() - if self.readbit(): - if b: # there are no devices or there is an error on the bus - return None, 0 - else: - if not b: # collision, two devices with different bit meaning - if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i): - b = 1 - next_diff = i - self.writebit(b) - if b: - r_b |= 1 << bit - i -= 1 - rom[byte] = r_b - return rom, next_diff - - def crc8(self, data): - return _ow.crc8(data)