first re-commit.

This commit is contained in:
2025-08-05 22:33:23 +02:00
commit e1ff579d1a
295 changed files with 107130 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
Class to read the relative humidity and temperature from a DHT11 sensor.
It implements the iterator python protocol.

View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import time
import pigpio
class DHT11(object):
"""
The DHT11 class is a stripped version of the DHT22 sensor code by joan2937.
You can find the initial implementation here:
- https://github.com/srounet/pigpio/tree/master/EXAMPLES/Python/DHT22_AM2302_SENSOR
example code:
>>> pi = pigpio.pi()
>>> sensor = DHT11(pi, 4) # 4 is the data GPIO pin connected to your sensor
>>> for response in sensor:
.... print("Temperature: {}".format(response['temperature']))
.... print("Humidity: {}".format(response['humidity']))
"""
def __init__(self, pi, gpio):
"""
pi (pigpio): an instance of pigpio
gpio (int): gpio pin number
"""
self.pi = pi
self.gpio = gpio
self.high_tick = 0
self.bit = 40
self.temperature = 0
self.humidity = 0
self.either_edge_cb = None
self.setup()
def setup(self):
"""
Clears the internal gpio pull-up/down resistor.
Kills any watchdogs.
"""
self.pi.set_pull_up_down(self.gpio, pigpio.PUD_OFF)
self.pi.set_watchdog(self.gpio, 0)
self.register_callbacks()
def register_callbacks(self):
"""
Monitors RISING_EDGE changes using callback.
"""
self.either_edge_cb = self.pi.callback(
self.gpio,
pigpio.EITHER_EDGE,
self.either_edge_callback
)
def either_edge_callback(self, gpio, level, tick):
"""
Either Edge callbacks, called each time the gpio edge changes.
Accumulate the 40 data bits from the dht11 sensor.
"""
level_handlers = {
pigpio.FALLING_EDGE: self._edge_FALL,
pigpio.RISING_EDGE: self._edge_RISE,
pigpio.EITHER_EDGE: self._edge_EITHER
}
handler = level_handlers[level]
diff = pigpio.tickDiff(self.high_tick, tick)
handler(tick, diff)
def _edge_RISE(self, tick, diff):
"""
Handle Rise signal.
"""
val = 0
if diff >= 50:
val = 1
if diff >= 200: # Bad bit?
self.checksum = 256 # Force bad checksum
if self.bit >= 40: # Message complete
self.bit = 40
elif self.bit >= 32: # In checksum byte
self.checksum = (self.checksum << 1) + val
if self.bit == 39:
# 40th bit received
self.pi.set_watchdog(self.gpio, 0)
total = self.humidity + self.temperature
# is checksum ok ?
if not (total & 255) == self.checksum:
raise
elif 16 <= self.bit < 24: # in temperature byte
self.temperature = (self.temperature << 1) + val
elif 0 <= self.bit < 8: # in humidity byte
self.humidity = (self.humidity << 1) + val
else: # skip header bits
pass
self.bit += 1
def _edge_FALL(self, tick, diff):
"""
Handle Fall signal.
"""
self.high_tick = tick
if diff <= 250000:
return
self.bit = -2
self.checksum = 0
self.temperature = 0
self.humidity = 0
def _edge_EITHER(self, tick, diff):
"""
Handle Either signal.
"""
self.pi.set_watchdog(self.gpio, 0)
def read(self):
"""
Start reading over DHT11 sensor.
"""
self.pi.write(self.gpio, pigpio.LOW)
time.sleep(0.017) # 17 ms
self.pi.set_mode(self.gpio, pigpio.INPUT)
self.pi.set_watchdog(self.gpio, 200)
time.sleep(0.2)
def close(self):
"""
Stop reading sensor, remove callbacks.
"""
self.pi.set_watchdog(self.gpio, 0)
if self.either_edge_cb:
self.either_edge_cb.cancel()
self.either_edge_cb = None
def __iter__(self):
"""
Support the iterator protocol.
"""
return self
def next(self):
"""
Call the read method and return temperature and humidity informations.
"""
self.read()
response = {
'humidity': self.humidity,
'temperature': self.temperature
}
return response
if __name__ == '__main__':
pi = pigpio.pi()
sensor = DHT11(pi, 4)
for d in sensor:
print("temperature: {}".format(d['temperature']))
print("humidity: {}".format(d['humidity']))
time.sleep(1)
sensor.close()

View File

@@ -0,0 +1,283 @@
#!/usr/bin/env python
# 2014-07-11 DHT22.py
import time
import atexit
import pigpio
class sensor:
"""
A class to read relative humidity and temperature from the
DHT22 sensor. The sensor is also known as the AM2302.
The sensor can be powered from the Pi 3V3 or the Pi 5V rail.
Powering from the 3V3 rail is simpler and safer. You may need
to power from 5V if the sensor is connected via a long cable.
For 3V3 operation connect pin 1 to 3V3 and pin 4 to ground.
Connect pin 2 to a gpio.
For 5V operation connect pin 1 to 5V and pin 4 to ground.
The following pin 2 connection works for me. Use at YOUR OWN RISK.
5V--5K_resistor--+--10K_resistor--Ground
|
DHT22 pin 2 -----+
|
gpio ------------+
"""
def __init__(self, pi, gpio, LED=None, power=None):
"""
Instantiate with the Pi and gpio to which the DHT22 output
pin is connected.
Optionally a LED may be specified. This will be blinked for
each successful reading.
Optionally a gpio used to power the sensor may be specified.
This gpio will be set high to power the sensor. If the sensor
locks it will be power cycled to restart the readings.
Taking readings more often than about once every two seconds will
eventually cause the DHT22 to hang. A 3 second interval seems OK.
"""
self.pi = pi
self.gpio = gpio
self.LED = LED
self.power = power
if power is not None:
pi.write(power, 1) # Switch sensor on.
time.sleep(2)
self.powered = True
self.cb = None
atexit.register(self.cancel)
self.bad_CS = 0 # Bad checksum count.
self.bad_SM = 0 # Short message count.
self.bad_MM = 0 # Missing message count.
self.bad_SR = 0 # Sensor reset count.
# Power cycle if timeout > MAX_TIMEOUTS.
self.no_response = 0
self.MAX_NO_RESPONSE = 2
self.rhum = -999
self.temp = -999
self.tov = None
self.high_tick = 0
self.bit = 40
pi.set_pull_up_down(gpio, pigpio.PUD_OFF)
pi.set_watchdog(gpio, 0) # Kill any watchdogs.
self.cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cb)
def _cb(self, gpio, level, tick):
"""
Accumulate the 40 data bits. Format into 5 bytes, humidity high,
humidity low, temperature high, temperature low, checksum.
"""
diff = pigpio.tickDiff(self.high_tick, tick)
if level == 0:
# Edge length determines if bit is 1 or 0.
if diff >= 50:
val = 1
if diff >= 200: # Bad bit?
self.CS = 256 # Force bad checksum.
else:
val = 0
if self.bit >= 40: # Message complete.
self.bit = 40
elif self.bit >= 32: # In checksum byte.
self.CS = (self.CS << 1) + val
if self.bit == 39:
# 40th bit received.
self.pi.set_watchdog(self.gpio, 0)
self.no_response = 0
total = self.hH + self.hL + self.tH + self.tL
if (total & 255) == self.CS: # Is checksum ok?
self.rhum = ((self.hH << 8) + self.hL) * 0.1
if self.tH & 128: # Negative temperature.
mult = -0.1
self.tH = self.tH & 127
else:
mult = 0.1
self.temp = ((self.tH << 8) + self.tL) * mult
self.tov = time.time()
if self.LED is not None:
self.pi.write(self.LED, 0)
else:
self.bad_CS += 1
elif self.bit >= 24: # in temp low byte
self.tL = (self.tL << 1) + val
elif self.bit >= 16: # in temp high byte
self.tH = (self.tH << 1) + val
elif self.bit >= 8: # in humidity low byte
self.hL = (self.hL << 1) + val
elif self.bit >= 0: # in humidity high byte
self.hH = (self.hH << 1) + val
else: # header bits
pass
self.bit += 1
elif level == 1:
self.high_tick = tick
if diff > 250000:
self.bit = -2
self.hH = 0
self.hL = 0
self.tH = 0
self.tL = 0
self.CS = 0
else: # level == pigpio.TIMEOUT:
self.pi.set_watchdog(self.gpio, 0)
if self.bit < 8: # Too few data bits received.
self.bad_MM += 1 # Bump missing message count.
self.no_response += 1
if self.no_response > self.MAX_NO_RESPONSE:
self.no_response = 0
self.bad_SR += 1 # Bump sensor reset count.
if self.power is not None:
self.powered = False
self.pi.write(self.power, 0)
time.sleep(2)
self.pi.write(self.power, 1)
time.sleep(2)
self.powered = True
elif self.bit < 39: # Short message receieved.
self.bad_SM += 1 # Bump short message count.
self.no_response = 0
else: # Full message received.
self.no_response = 0
def temperature(self):
"""Return current temperature."""
return self.temp
def humidity(self):
"""Return current relative humidity."""
return self.rhum
def staleness(self):
"""Return time since measurement made."""
if self.tov is not None:
return time.time() - self.tov
else:
return -999
def bad_checksum(self):
"""Return count of messages received with bad checksums."""
return self.bad_CS
def short_message(self):
"""Return count of short messages."""
return self.bad_SM
def missing_message(self):
"""Return count of missing messages."""
return self.bad_MM
def sensor_resets(self):
"""Return count of power cycles because of sensor hangs."""
return self.bad_SR
def trigger(self):
"""Trigger a new relative humidity and temperature reading."""
if self.powered:
if self.LED is not None:
self.pi.write(self.LED, 1)
self.pi.write(self.gpio, pigpio.LOW)
time.sleep(0.017) # 17 ms
self.pi.set_mode(self.gpio, pigpio.INPUT)
self.pi.set_watchdog(self.gpio, 200)
def cancel(self):
"""Cancel the DHT22 sensor."""
self.pi.set_watchdog(self.gpio, 0)
if self.cb is not None:
self.cb.cancel()
self.cb = None
if __name__ == "__main__":
import time
import pigpio
import DHT22
# Intervals of about 2 seconds or less will eventually hang the DHT22.
INTERVAL = 3
pi = pigpio.pi()
s = DHT22.sensor(pi, 22, LED=16, power=8)
r = 0
next_reading = time.time()
while True:
r += 1
s.trigger()
time.sleep(0.2)
print("{} {} {} {:3.2f} {} {} {} {}".format(
r, s.humidity(), s.temperature(), s.staleness(),
s.bad_checksum(), s.short_message(), s.missing_message(),
s.sensor_resets()))
next_reading += INTERVAL
time.sleep(next_reading-time.time()) # Overall INTERVAL second polling.
s.cancel()
pi.stop()

View File

@@ -0,0 +1,2 @@
Class to read the relative humidity and temperature from a DHT22/AM2302 sensor.

View File

@@ -0,0 +1,2 @@
Script to display the status of gpios 0-31.

View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python
import time
import curses
import atexit
import sys
import pigpio
GPIOS=32
MODES=["INPUT", "OUTPUT", "ALT5", "ALT4", "ALT0", "ALT1", "ALT2", "ALT3"]
def cleanup():
curses.nocbreak()
curses.echo()
curses.endwin()
pi.stop()
pi = pigpio.pi()
if not pi.connected:
sys.exit(1)
stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
atexit.register(cleanup)
cb = []
for g in range(GPIOS):
cb.append(pi.callback(g, pigpio.EITHER_EDGE))
# disable gpio 28 as the PCM clock is swamping the system
cb[28].cancel()
stdscr.nodelay(1)
stdscr.addstr(0, 23, "Status of gpios 0-31", curses.A_REVERSE)
while True:
for g in range(GPIOS):
tally = cb[g].tally()
mode = pi.get_mode(g)
col = (g // 11) * 25
row = (g % 11) + 2
stdscr.addstr(row, col, "{:2}".format(g), curses.A_BOLD)
stdscr.addstr(
"={} {:>6}: {:<10}".format(pi.read(g), MODES[mode], tally))
stdscr.refresh()
time.sleep(0.1)
c = stdscr.getch()
if c != curses.ERR:
break

View File

@@ -0,0 +1,2 @@
Program to show status changes for a Hall effect sensor.

View File

@@ -0,0 +1,32 @@
#!/usr/bin/env python
import time
import pigpio
#
# OH3144E or equivalent Hall effect sensor
#
# Pin 1 - 5V
# Pin 2 - Ground
# Pin 3 - gpio (here P1-8, gpio 14, TXD is used)
#
# The internal gpio pull-up is enabled so that the sensor
# normally reads high. It reads low when a magnet is close.
#
HALL=14
pi = pigpio.pi() # connect to local Pi
pi.set_mode(HALL, pigpio.INPUT)
pi.set_pull_up_down(HALL, pigpio.PUD_UP)
start = time.time()
while (time.time() - start) < 60:
print("Hall = {}".format(pi.read(HALL)))
time.sleep(0.2)
pi.stop()

View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python
import time
import pigpio
class sniffer:
"""
A class to passively monitor activity on an I2C bus. This should
work for an I2C bus running at 100kbps or less. You are unlikely
to get any usable results for a bus running any faster.
"""
def __init__(self, pi, SCL, SDA, set_as_inputs=True):
"""
Instantiate with the Pi and the gpios for the I2C clock
and data lines.
If you are monitoring one of the Raspberry Pi buses you
must set set_as_inputs to False so that they remain in
I2C mode.
The pigpio daemon should have been started with a higher
than default sample rate.
For an I2C bus rate of 100Kbps sudo pigpiod -s 2 should work.
A message is printed for each I2C transaction formatted with
"[" for the START
"XX" two hex characters for each data byte
"+" if the data is ACKd, "-" if the data is NACKd
"]" for the STOP
E.g. Reading the X, Y, Z values from an ADXL345 gives:
[A6+32+]
[A7+01+FF+F2+FF+06+00-]
"""
self.pi = pi
self.gSCL = SCL
self.gSDA = SDA
self.FALLING = 0
self.RISING = 1
self.STEADY = 2
self.in_data = False
self.byte = 0
self.bit = 0
self.oldSCL = 1
self.oldSDA = 1
self.transact = ""
if set_as_inputs:
self.pi.set_mode(SCL, pigpio.INPUT)
self.pi.set_mode(SDA, pigpio.INPUT)
self.cbA = self.pi.callback(SCL, pigpio.EITHER_EDGE, self._cb)
self.cbB = self.pi.callback(SDA, pigpio.EITHER_EDGE, self._cb)
def _parse(self, SCL, SDA):
"""
Accumulate all the data between START and STOP conditions
into a string and output when STOP is detected.
"""
if SCL != self.oldSCL:
self.oldSCL = SCL
if SCL:
xSCL = self.RISING
else:
xSCL = self.FALLING
else:
xSCL = self.STEADY
if SDA != self.oldSDA:
self.oldSDA = SDA
if SDA:
xSDA = self.RISING
else:
xSDA = self.FALLING
else:
xSDA = self.STEADY
if xSCL == self.RISING:
if self.in_data:
if self.bit < 8:
self.byte = (self.byte << 1) | SDA
self.bit += 1
else:
self.transact += '{:02X}'.format(self.byte)
if SDA:
self.transact += '-'
else:
self.transact += '+'
self.bit = 0
self.byte = 0
elif xSCL == self.STEADY:
if xSDA == self.RISING:
if SCL:
self.in_data = False
self.byte = 0
self.bit = 0
self.transact += ']' # STOP
print (self.transact)
self.transact = ""
if xSDA == self.FALLING:
if SCL:
self.in_data = True
self.byte = 0
self.bit = 0
self.transact += '[' # START
def _cb(self, gpio, level, tick):
"""
Check which line has altered state (ignoring watchdogs) and
call the parser with the new state.
"""
SCL = self.oldSCL
SDA = self.oldSDA
if gpio == self.gSCL:
if level == 0:
SCL = 0
elif level == 1:
SCL = 1
if gpio == self.gSDA:
if level == 0:
SDA = 0
elif level == 1:
SDA = 1
self._parse(SCL, SDA)
def cancel(self):
"""Cancel the I2C callbacks."""
self.cbA.cancel()
self.cbB.cancel()
if __name__ == "__main__":
import time
import pigpio
import I2C_sniffer
pi = pigpio.pi()
s = I2C_sniffer.sniffer(pi, 1, 0, False) # leave gpios 1/0 in I2C mode
time.sleep(60000)
s.cancel()
pi.stop()

View File

@@ -0,0 +1,2 @@
A program to passively sniff I2C transactions (100kHz bus maximum) and display the results.

View File

@@ -0,0 +1,2 @@
Class to hash a code from an IR receiver (reading an IR remote control).

View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python
import pigpio
class hasher:
"""
This class forms a hash over the IR pulses generated by an
IR remote.
The remote key press is not converted into a code in the manner of
the lirc module. No attempt is made to decode the type of protocol
used by the remote. The hash is likely to be unique for different
keys and different remotes but this is not guaranteed.
This hashing process works for some remotes/protocols but not for
others. The only way to find out if it works for one or more of
your remotes is to try it and see.
EXAMPLE CODE
#!/usr/bin/env python
import time
import pigpio
import ir_hasher
def callback(hash):
print("hash={}".format(hash));
pi = pigpio.pi()
ir = ir_hasher.hasher(pi, 7, callback, 5)
print("ctrl c to exit");
time.sleep(300)
pi.stop()
"""
def __init__(self, pi, gpio, callback, timeout=5):
"""
Initialises an IR remote hasher on a pi's gpio. A gap of timeout
milliseconds indicates the end of the remote key press.
"""
self.pi = pi
self.gpio = gpio
self.code_timeout = timeout
self.callback = callback
self.in_code = False
pi.set_mode(gpio, pigpio.INPUT)
self.cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cb)
def _hash(self, old_val, new_val):
if new_val < (old_val * 0.60):
val = 13
elif old_val < (new_val * 0.60):
val = 23
else:
val = 2
self.hash_val = self.hash_val ^ val
self.hash_val *= 16777619 # FNV_PRIME_32
self.hash_val = self.hash_val & ((1<<32)-1)
def _cb(self, gpio, level, tick):
if level != pigpio.TIMEOUT:
if self.in_code == False:
self.in_code = True
self.pi.set_watchdog(self.gpio, self.code_timeout)
self.hash_val = 2166136261 # FNV_BASIS_32
self.edges = 1
self.t1 = None
self.t2 = None
self.t3 = None
self.t4 = tick
else:
self.edges += 1
self.t1 = self.t2
self.t2 = self.t3
self.t3 = self.t4
self.t4 = tick
if self.t1 is not None:
d1 = pigpio.tickDiff(self.t1,self.t2)
d2 = pigpio.tickDiff(self.t3,self.t4)
self._hash(d1, d2)
else:
if self.in_code:
self.in_code = False
self.pi.set_watchdog(self.gpio, 0)
if self.edges > 12:
self.callback(self.hash_val)
if __name__ == "__main__":
import time
import pigpio
import ir_hasher
hashes = {
142650387: '2', 244341844: 'menu', 262513468: 'vol-',
272048826: '5', 345069212: '6', 363685443: 'prev.ch',
434191356: '1', 492745084: 'OK', 549497027: 'mute',
603729091: 'text', 646476378: 'chan-', 832916949: 'home',
923778138: 'power', 938165610: 'power', 953243510: 'forward',
1009731980:'1', 1018231875:'TV', 1142888517:'c-up',
1151589683:'chan+', 1344018636:'OK', 1348032067:'chan+',
1367109971:'prev.ch', 1370712102:'c-left', 1438405361:'rewind',
1452589043:'pause', 1518578730:'chan-', 1554432645:'8',
1583569525:'0', 1629745313:'rewind', 1666513749:'record',
1677653754:'c-down', 1825951717:'c-right', 1852412236:'6',
1894279468:'9', 1904895749:'vol+', 1941947509:'ff',
2076573637:'0', 2104823531:'back', 2141641957:'home',
2160787557:'record', 2398525299:'7', 2468117013:'8',
2476712746:'play', 2574308838:'forward', 2577952149:'4',
2706654902:'stop', 2829002741:'c-up', 2956097083:'back',
3112717386:'5', 3263244773:'ff', 3286088195:'pause',
3363767978:'c-down', 3468076364:'vol-', 3491068358:'stop',
3593710134:'c-left', 3708232515:'3', 3734134565:'back',
3766109107:'TV', 3798010010:'play', 3869937700:'menu',
3872715523:'7', 3885097091:'2', 3895301587:'text',
3931058739:'mute', 3983900853:'c-right', 4032250885:'4',
4041913909:'vol+', 4207017660:'9', 4227138677:'back',
4294027955:'3'}
def callback(hash):
if hash in hashes:
print("key={} hash={}".format(hashes[hash], hash));
pi = pigpio.pi()
ir = ir_hasher.hasher(pi, 7, callback, 5)
print("ctrl c to exit");
time.sleep(300)
pi.stop()

View File

@@ -0,0 +1,2 @@
Script to transmit the morse code corresponding to a text string.

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python
import pigpio
morse={
'a':'.-' , 'b':'-...' , 'c':'-.-.' , 'd':'-..' , 'e':'.' ,
'f':'..-.' , 'g':'--.' , 'h':'....' , 'i':'..' , 'j':'.---' ,
'k':'-.-' , 'l':'.-..' , 'm':'--' , 'n':'-.' , 'o':'---' ,
'p':'.--.' , 'q':'--.-' , 'r':'.-.' , 's':'...' , 't':'-' ,
'u':'..-' , 'v':'...-' , 'w':'.--' , 'x':'-..-' , 'y':'-.--' ,
'z':'--..' , '1':'.----', '2':'..---', '3':'...--', '4':'....-',
'5':'.....', '6':'-....', '7':'--...', '8':'---..', '9':'----.',
'0':'-----'}
GPIO=22
MICROS=100000
NONE=0
DASH=3
DOT=1
GAP=1
LETTER_GAP=3-GAP
WORD_GAP=7-LETTER_GAP
def transmit_string(pi, gpio, str):
pi.wave_clear() # start a new waveform
wf=[]
for C in str:
c=C.lower()
print(c)
if c in morse:
k = morse[c]
for x in k:
if x == '.':
wf.append(pigpio.pulse(1<<gpio, NONE, DOT * MICROS))
else:
wf.append(pigpio.pulse(1<<gpio, NONE, DASH * MICROS))
wf.append(pigpio.pulse(NONE, 1<<gpio, GAP * MICROS))
wf.append(pigpio.pulse(NONE, 1<<gpio, LETTER_GAP * MICROS))
elif c == ' ':
wf.append(pigpio.pulse(NONE, 1<<gpio, WORD_GAP * MICROS))
pi.wave_add_generic(wf)
pi.wave_tx_start()
pi = pigpio.pi()
pi.set_mode(GPIO, pigpio.OUTPUT)
transmit_string(pi, GPIO, "Now is the winter of our discontent")
while pi.wave_tx_busy():
pass
transmit_string(pi, GPIO, "made glorious summer by this sun of York")
while pi.wave_tx_busy():
pass
pi.stop()

View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python
# 2014-08-26 PCF8591.py
import time
import curses
import pigpio
# sudo pigpiod
# ./PCF8591.py
# Connect Pi 3V3 - VCC, Ground - Ground, SDA - SDA, SCL - SCL.
YL_40=0x48
pi = pigpio.pi() # Connect to local Pi.
handle = pi.i2c_open(1, YL_40, 0)
stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
aout = 0
stdscr.addstr(10, 0, "Brightness")
stdscr.addstr(12, 0, "Temperature")
stdscr.addstr(14, 0, "AOUT->AIN2")
stdscr.addstr(16, 0, "Resistor")
stdscr.nodelay(1)
try:
while True:
for a in range(0,4):
aout = aout + 1
pi.i2c_write_byte_data(handle, 0x40 | ((a+1) & 0x03), aout&0xFF)
v = pi.i2c_read_byte(handle)
hashes = v / 4
spaces = 64 - hashes
stdscr.addstr(10+a*2, 12, str(v) + ' ')
stdscr.addstr(10+a*2, 16, '#' * hashes + ' ' * spaces )
stdscr.refresh()
time.sleep(0.04)
c = stdscr.getch()
if c != curses.ERR:
break
except:
pass
curses.nocbreak()
curses.echo()
curses.endwin()
pi.i2c_close(handle)
pi.stop()

View File

@@ -0,0 +1,2 @@
Script to display readings from the (I2C) PCF8591.

View File

@@ -0,0 +1,2 @@
Script to benchmark the pigpio Python module's performance.

View File

@@ -0,0 +1,91 @@
#!/usr/bin/env python
#
# WARNING!
#
##############################################################################
# #
# Unless you know what you are doing don't run this script with anything #
# connected to the gpios. You will CAUSE damage. #
# #
##############################################################################
#
# WARNING!
#
import time
import pigpio
delay = 30
class gpioTest:
def __init__(self, pi, gpio, edge, freq, duty):
self.pi = pi
self.gpio = gpio
self.edge = edge
self.freq = freq
self.duty = duty
self.calls = 0
def cb(self, g, t, l):
self.calls = self.calls + 1
# print g,t,l
def num(self):
return self.calls
def start(self):
self.pi.set_PWM_frequency(self.gpio, self.freq)
self.pi.set_PWM_range(self.gpio, 25)
self.pi.set_PWM_dutycycle(self.gpio, self.duty)
self.n = self.pi.callback(self.gpio, self.edge, self.cb)
def stop(self):
self.pi.set_PWM_dutycycle(self.gpio, 0)
self.n.cancel()
pi = pigpio.pi()
t1 = gpioTest(pi, 4, pigpio.EITHER_EDGE, 4000, 1)
t2 = gpioTest(pi, 7, pigpio.RISING_EDGE, 8000, 2)
t3 = gpioTest(pi, 8, pigpio.FALLING_EDGE, 8000, 3)
t4 = gpioTest(pi, 9, pigpio.EITHER_EDGE, 4000, 4)
t5 = gpioTest(pi,10, pigpio.RISING_EDGE, 8000, 5)
t6 = gpioTest(pi,11, pigpio.FALLING_EDGE, 8000, 6)
t7 = gpioTest(pi,14, pigpio.EITHER_EDGE, 4000, 7)
t8 = gpioTest(pi,15, pigpio.RISING_EDGE, 8000, 8)
t9 = gpioTest(pi,17, pigpio.FALLING_EDGE, 8000, 9)
t10 = gpioTest(pi,18, pigpio.EITHER_EDGE, 4000, 10)
t11 = gpioTest(pi,22, pigpio.RISING_EDGE, 8000, 11)
t12 = gpioTest(pi,23, pigpio.FALLING_EDGE, 8000, 12)
t13 = gpioTest(pi,24, pigpio.EITHER_EDGE, 4000, 13)
t14 = gpioTest(pi,25, pigpio.RISING_EDGE, 8000, 14)
# R1: 0 1 4 7 8 9 10 11 14 15 17 18 21 22 23 24 25
# R2: 2 3 4 7 8 9 10 11 14 15 17 18 22 23 24 25 27 28 29 30 31
tests = [t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14]
for i in tests: i.start()
time.sleep(delay)
for i in tests: i.stop()
pi.stop()
tot = 0
msg = ""
for i in tests:
tot += i.num()
msg += str(i.num()) + " "
print(msg)
print("eps={} ({}/{})".format(tot/delay, tot, delay))

View File

@@ -0,0 +1,2 @@
Class to decode a mechanical rotary encoder.

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python
import pigpio
class decoder:
"""Class to decode mechanical rotary encoder pulses."""
def __init__(self, pi, gpioA, gpioB, callback):
"""
Instantiate the class with the pi and gpios connected to
rotary encoder contacts A and B. The common contact
should be connected to ground. The callback is
called when the rotary encoder is turned. It takes
one parameter which is +1 for clockwise and -1 for
counterclockwise.
EXAMPLE
import time
import pigpio
import rotary_encoder
pos = 0
def callback(way):
global pos
pos += way
print("pos={}".format(pos))
pi = pigpio.pi()
decoder = rotary_encoder.decoder(pi, 7, 8, callback)
time.sleep(300)
decoder.cancel()
pi.stop()
"""
self.pi = pi
self.gpioA = gpioA
self.gpioB = gpioB
self.callback = callback
self.levA = 0
self.levB = 0
self.lastGpio = None
self.pi.set_mode(gpioA, pigpio.INPUT)
self.pi.set_mode(gpioB, pigpio.INPUT)
self.pi.set_pull_up_down(gpioA, pigpio.PUD_UP)
self.pi.set_pull_up_down(gpioB, pigpio.PUD_UP)
self.cbA = self.pi.callback(gpioA, pigpio.EITHER_EDGE, self._pulse)
self.cbB = self.pi.callback(gpioB, pigpio.EITHER_EDGE, self._pulse)
def _pulse(self, gpio, level, tick):
"""
Decode the rotary encoder pulse.
+---------+ +---------+ 0
| | | |
A | | | |
| | | |
+---------+ +---------+ +----- 1
+---------+ +---------+ 0
| | | |
B | | | |
| | | |
----+ +---------+ +---------+ 1
"""
if gpio == self.gpioA:
self.levA = level
else:
self.levB = level;
if gpio != self.lastGpio: # debounce
self.lastGpio = gpio
if gpio == self.gpioA and level == 1:
if self.levB == 1:
self.callback(1)
elif gpio == self.gpioB and level == 1:
if self.levA == 1:
self.callback(-1)
def cancel(self):
"""
Cancel the rotary encoder decoder.
"""
self.cbA.cancel()
self.cbB.cancel()
if __name__ == "__main__":
import time
import pigpio
import rotary_encoder
pos = 0
def callback(way):
global pos
pos += way
print("pos={}".format(pos))
pi = pigpio.pi()
decoder = rotary_encoder.decoder(pi, 7, 8, callback)
time.sleep(300)
decoder.cancel()
pi.stop()

View File

@@ -0,0 +1,28 @@
# Python Class for Reading Single Edge Nibble Transmission (SENT) using the Raspberry Pi
A full description of this Python script is described at [www.surfncircuits.com](https://surfncircuits.com) in the blog entry: [Implementing a Single Edge Nibble Transmission (SENT) protocol in Python for the Raspberry Pi Zero](https://surfncircuits.com/?p=3725)
This python library will read a Raspberry Pi GPIO pin connected. Start the pigpiod daemon with one microsecond sampling to read SENT transmissions with three microsecond tick times.
## To start the daemon on Raspberry Pi
- sudo pigpiod -s 1
## SENT packet frame summary
- Sync Pulse: 56 ticks
- 4 bit Status and Message Pulse: 17-32 ticks
- 4 bit (9:12) Data1 Field: 17-32 ticks
- 4 bit (5:8) Data1 Field: 17-32 ticks
- 4 bit (1:4) Data1 Field: 17-32 ticks
- 4 bit (9-12) Data2 Field: 17-32 ticks
- 4 bit (5-8) Data2 Field: 17-32 ticks
- 4 bit (1-4) Data2 Field: 17-32 ticks
- 4 bit CRC: 17-32 ticks
## requirements
[pigpiod](http://abyz.me.uk/rpi/pigpio/) library required
## To run the script for a signal attached to GPIO BCD 18 (pin 12)
- python3 sent_READ.py

View File

@@ -0,0 +1,322 @@
#!/usr/bin/env python3
# read_PWM.py
# Public Domain by mark smith, www.surfncircuits.com
# blog:https://surfncircuits.com/2020/11/27/implementing-a-single-edge-nibble-transmission-sent-protocol-in-python-for-the-raspberry-pi-zero/
import time
import pigpio # http://abyz.co.uk/rpi/pigpio/python.html
import threading
class SENTReader:
"""
A class to read short Format SENT frames
(see the LX3302A datasheet for a SENT reference from Microchip)
(also using sent transmission mode where )
from wikiPedia: The SAE J2716 SENT (Single Edge Nibble Transmission) protocol
is a point-to-point scheme for transmitting signal values
from a sensor to a controller. It is intended to allow for
transmission of high resolution data with a low system cost.
Short sensor format:
The first is the SYNC pulse (56 ticks)
first Nibble : Status (4 bits)
2nd NIbble : DAta1 (4 bits)
3nd Nibble : Data2 (4 bits)
4th Nibble : Data3 (4 bits)
5th Nibble : Data1 (4 bits)
6th Nibble : Data2 (4 bits)
7th Nibble : Data3 (4 bits)
8th Nibble : CRC (4 bits)
"""
def __init__(self, pi, gpio, Mode = 0):
"""
Instantiate with the Pi and gpio of the SENT signal
to monitor.
SENT mode = A0: Microchip LX3302A where the two 12 bit data values are identical. there are other modes
"""
self.pi = pi
self.gpio = gpio
self.SENTMode = Mode
# the time that pulse goes high
self._high_tick = 0
# the period of the low tick
self._low_tick = 0
# the period of the pulse (total data)
self._period = 0
# the time the item was low during the period
self._low = 0
# the time the output was high during the period
self._high = 0
# setting initial value to 100
self.syncTick = 100
#keep track of the periods
self.syncWidth = 0
self.status = 0
self.data1 = 0
self.data2 = 0
self.data3 = 0
self.data4 = 0
self.data5 = 0
self.data6 = 0
self.crc = 0
#initize the sent frame . Need to use hex for data
#self.frame = [0,0,0,'0x0','0x0','0x0','0x0','0x0','0x0',0]
self.frame = [0,0,0,0,0,0,0,0,0,0]
self.syncFound = False
self.frameComplete = False
self.nibble = 0
self.numberFrames = 0
self.SampleStopped = False
self.pi.set_mode(gpio, pigpio.INPUT)
#self._cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cbf)
#sleep enougth to start reading SENT
#time.sleep(0.05)
#start thread to sample the SENT property
# this is needed for piGPIO sample of 1us and sensing the 3us
self.OutputSampleThread = threading.Thread(target = self.SampleCallBack)
self.OutputSampleThread.daemon = True
self.OutputSampleThread.start()
#give time for thread to start capturing data
time.sleep(.05)
def SampleCallBack(self):
# this will run in a loop and sample the SENT path
# this sampling is required when 1us sample rate for SENT 3us tick time
while True:
self.SampleStopped = False
self._cb = self.pi.callback(self.gpio, pigpio.EITHER_EDGE, self._cbf)
# wait until sample stopped
while self.SampleStopped == False:
#do nothing
time.sleep(.001)
# gives the callback time to cancel so we can start again.
time.sleep(0.20)
def _cbf(self, gpio, level, tick):
# depending on the system state set the tick times.
# first look for sync pulse. this is found when duty ratio >90
#print(pgio)
#print("inside _cpf")
#print(tick)
if self.syncFound == False:
if level == 1:
self._high_tick = tick
self._low = pigpio.tickDiff(self._low_tick,tick)
elif level == 0:
# this may be a syncpulse if the duty is 51/56
self._period = pigpio.tickDiff(self._low_tick,tick)
# not reset the self._low_tick
self._low_tick = tick
self._high = pigpio.tickDiff(self._high_tick,tick)
# sync pulse is detected by finding duty ratio. 51/56
# but also filter if period is > 90us*56 = 5040
if (100*self._high/self._period) > 87 and (self._period<5100):
self.syncFound = True
self.syncWidth = self._high
self.syncPeriod = self._period
#self.syncTick = round(self.syncPeriod/56.0,2)
self.syncTick = self.syncPeriod
# reset the nibble to zero
self.nibble = 0
self.SampleStopped = False
else:
# now look for the nibble information for each nibble (8 Nibbles)
if level == 1:
self._high_tick = tick
self._low = pigpio.tickDiff(self._low_tick,tick)
elif level == 0:
# This will be a data nibble
self._period = pigpio.tickDiff(self._low_tick,tick)
# not reset the self._low_tick
self._low_tick = tick
self._high = pigpio.tickDiff(self._high_tick,tick)
self.nibble = self.nibble + 1
if self.nibble == 1:
self.status = self._period
elif self.nibble == 2:
#self.data1 = hex(int(round(self._period / self.syncTick)-12))
self.data1 = self._period
elif self.nibble == 3:
self.data2 = self._period
elif self.nibble == 4:
self.data3 = self._period
elif self.nibble == 5:
self.data4 = self._period
elif self.nibble == 6:
self.data5 = self._period
elif self.nibble == 7:
self.data6 = self._period
elif self.nibble == 8:
self.crc = self._period
# now send all to the SENT Frame
self.frame = [self.syncPeriod,self.syncTick,self.status,self.data1,self.data2,self.data3,self.data4,self.data5,self.data6,self.crc]
self.syncFound = False
self.nibble = 0
self.numberFrames += 1
if self.numberFrames > 2:
self.cancel()
self.SampleStopped = True
self.numberFrames = 0
def ConvertData(self,tickdata,tickTime):
if tickdata == 0:
t = '0x0'
else:
t = hex(int(round(tickdata / tickTime)-12))
if t[0] =='-':
t='0x0'
return t
def SENTData(self):
# check that data1 = Data2 if they are not equal return fault = True
# will check the CRC code for faults. if fault, return = true
# returns status, data1, data2, crc, fault
#self._cb = self.pi.callback(self.gpio, pigpio.EITHER_EDGE, self._cbf)
#time.sleep(0.1)
fault = False
SentFrame = self.frame[:]
SENTTick = round(SentFrame[1]/56.0,2)
# the greatest SYNC sync is 90us. So trip a fault if this occurs
if SENTTick > 90:
fault = True
#print(SentFrame)
# convert SentFrame to HEX Format including the status and Crc bits
for x in range (2,10):
SentFrame[x] = self.ConvertData(SentFrame[x],SENTTick)
SENTCrc = SentFrame[9]
SENTStatus = SentFrame[2]
SENTPeriod = SentFrame[0]
#print(SentFrame)
# combine the datafield nibbles
datanibble = '0x'
datanibble2 = '0x'
for x in range (3,6):
datanibble = datanibble + str((SentFrame[x]))[2:]
for x in range (6,9):
datanibble2 = datanibble2 + str((SentFrame[x]))[2:]
# if using SENT mode 0, then data nibbles should be equal
#if self.SENTMode == 0 :
# if datanibble != datanibble2:
# fault = True
# if datanibble or datanibble2 == 0 then fault = true
if (int(datanibble,16) == 0) or (int(datanibble2,16) ==0):
fault = True
# if datanibble or datanibble2 > FFF (4096) then fault = True
if ( (int(datanibble,16) > 0xFFF) or (int(datanibble2,16) > 0xFFF)):
fault = True
#print(datanibble)
# CRC checking
# converting the datanibble values to a binary bit string.
# remove the first two characters. Not needed for crcCheck
InputBitString = bin(int((datanibble + datanibble2[2:]),16))[2:]
# converting Crcvalue to bin but remove the first two characters 0b
# format is set to remove the leading 0b, 4 charactors long
crcBitValue = format(int(str(SENTCrc),16),'04b')
#checking the crcValue
# polybitstring is 1*X^4+1*X^3+1*x^2+0*X+1 = '11101'
if self.crcCheck(InputBitString,'11101',crcBitValue) == False:
fault = True
# converter to decimnal
returnData = int(datanibble,16)
returnData2 = int(datanibble2,16)
#returns both Data values and if there is a FAULT
return (SENTStatus, returnData, returnData2,SENTTick, SENTCrc, fault, SENTPeriod)
def tick(self):
status, data1, data2, ticktime, crc, errors, syncPulse = self.SENTData()
return ticktime
def crcNibble(self):
status, data1, data2, ticktime, crc, errors, syncPulse = self.SENTData()
return crc
def dataField1(self):
status, data1, data2, ticktime, crc, errors, syncPulse = self.SENTData()
return data1
def dataField2(self):
status, data1, data2, ticktime, crc, errors, syncPulse = self.SENTData()
return data2
def statusNibble(self):
status, data1, data2, ticktime, crc, errors, syncPulse = self.SENTData()
return status
def syncPulse(self):
status, data1, data2, ticktime, crc, errors, syncPulse = self.SENTData()
return syncPulse
def errorFrame(self):
status, data1, data2, ticktime, crc, errors, syncPulse = self.SENTData()
return errors
def cancel(self):
self._cb.cancel()
def stop(self):
self.OutputSampleThread.stop()
def crcCheck(self, InputBitString, PolyBitString, crcValue ):
# the input string will be a binary string all 6 nibbles of the SENT data
# the seed value ( = '0101) is appended to the input string. Do not use zeros for SENT protocal
# this uses the SENT CRC recommended implementation.
checkOK = False
LenPolyBitString = len(PolyBitString)
PolyBitString = PolyBitString.lstrip('0')
LenInput = len(InputBitString)
InputPaddedArray = list(InputBitString + '0101')
while '1' in InputPaddedArray[:LenInput]:
cur_shift = InputPaddedArray.index('1')
for i in range(len(PolyBitString)):
InputPaddedArray[cur_shift + i] = str(int(PolyBitString[i] != InputPaddedArray[cur_shift + i]))
if (InputPaddedArray[LenInput:] == list(crcValue)):
checkOK = True
return checkOK
if __name__ == "__main__":
import time
import pigpio
import read_SENT
SENT_GPIO = 18
RUN_TIME = 6000000000.0
SAMPLE_TIME = 0.1
pi = pigpio.pi()
p = read_SENT.SENTReader(pi, SENT_GPIO)
start = time.time()
while (time.time() - start) < RUN_TIME:
time.sleep(SAMPLE_TIME)
status, data1, data2, ticktime, crc, errors, syncPulse = p.SENTData()
print("Sent Status= %s - 12-bit DATA 1= %4.0f - DATA 2= %4.0f - tickTime(uS)= %4.2f - CRC= %s - Errors= %s - PERIOD = %s" % (status,data1,data2,ticktime,crc,errors,syncPulse))
print("Sent Stat2s= %s - 12-bit DATA 1= %4.0f - DATA 2= %4.0f - tickTime(uS)= %4.2f - CRC= %s - Errors= %s - PERIOD = %s" % (p.statusNibble(),p.dataField1(),p.dataField2(),p.tick(),p.crcNibble(),p.errorFrame(),p.syncPulse()))
# stop the thread in SENTReader
p.stop()
# clear the pi object instance
pi.stop()

View File

@@ -0,0 +1,2 @@
Class to read sonar rangers with separate trigger and echo pins.

View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python
import time
import pigpio
class ranger:
"""
This class encapsulates a type of acoustic ranger. In particular
the type of ranger with separate trigger and echo pins.
A pulse on the trigger initiates the sonar ping and shortly
afterwards a sonar pulse is transmitted and the echo pin
goes high. The echo pins stays high until a sonar echo is
received (or the response times-out). The time between
the high and low edges indicates the sonar round trip time.
"""
def __init__(self, pi, trigger, echo):
"""
The class is instantiated with the Pi to use and the
gpios connected to the trigger and echo pins.
"""
self.pi = pi
self._trig = trigger
self._echo = echo
self._ping = False
self._high = None
self._time = None
self._triggered = False
self._trig_mode = pi.get_mode(self._trig)
self._echo_mode = pi.get_mode(self._echo)
pi.set_mode(self._trig, pigpio.OUTPUT)
pi.set_mode(self._echo, pigpio.INPUT)
self._cb = pi.callback(self._trig, pigpio.EITHER_EDGE, self._cbf)
self._cb = pi.callback(self._echo, pigpio.EITHER_EDGE, self._cbf)
self._inited = True
def _cbf(self, gpio, level, tick):
if gpio == self._trig:
if level == 0: # trigger sent
self._triggered = True
self._high = None
else:
if self._triggered:
if level == 1:
self._high = tick
else:
if self._high is not None:
self._time = tick - self._high
self._high = None
self._ping = True
def read(self):
"""
Triggers a reading. The returned reading is the number
of microseconds for the sonar round-trip.
round trip cms = round trip time / 1000000.0 * 34030
"""
if self._inited:
self._ping = False
self.pi.gpio_trigger(self._trig)
start = time.time()
while not self._ping:
if (time.time()-start) > 5.0:
return 20000
time.sleep(0.001)
return self._time
else:
return None
def cancel(self):
"""
Cancels the ranger and returns the gpios to their
original mode.
"""
if self._inited:
self._inited = False
self._cb.cancel()
self.pi.set_mode(self._trig, self._trig_mode)
self.pi.set_mode(self._echo, self._echo_mode)
if __name__ == "__main__":
import time
import pigpio
import sonar_trigger_echo
pi = pigpio.pi()
sonar = sonar_trigger_echo.ranger(pi, 23, 18)
end = time.time() + 600.0
r = 1
while time.time() < end:
print("{} {}".format(r, sonar.read()))
r += 1
time.sleep(0.03)
sonar.cancel()
pi.stop()

View File

@@ -0,0 +1,2 @@
Class to send and receive radio messages compatible with the Virtual Wire library for Arduinos.

View File

@@ -0,0 +1,372 @@
#!/usr/bin/env python
"""
This module provides a 313MHz/434MHz radio interface compatible
with the Virtual Wire library used on Arduinos.
It has been tested between a Pi, TI Launchpad, and Arduino Pro Mini.
"""
# 2014-08-14
# vw.py
import time
import pigpio
MAX_MESSAGE_BYTES=77
MIN_BPS=50
MAX_BPS=10000
_HEADER=[0x2a, 0x2a, 0x2a, 0x2a, 0x2a, 0x2a, 0x38, 0x2c]
_CTL=3
_SYMBOL=[
0x0d, 0x0e, 0x13, 0x15, 0x16, 0x19, 0x1a, 0x1c,
0x23, 0x25, 0x26, 0x29, 0x2a, 0x2c, 0x32, 0x34]
def _sym2nibble(symbol):
for nibble in range(16):
if symbol == _SYMBOL[nibble]:
return nibble
return 0
def _crc_ccitt_update(crc, data):
data = data ^ (crc & 0xFF);
data = (data ^ (data << 4)) & 0xFF;
return (
(((data << 8) & 0xFFFF) | (crc >> 8)) ^
((data >> 4) & 0x00FF) ^ ((data << 3) & 0xFFFF)
)
class tx():
def __init__(self, pi, txgpio, bps=2000):
"""
Instantiate a transmitter with the Pi, the transmit gpio,
and the bits per second (bps). The bps defaults to 2000.
The bps is constrained to be within MIN_BPS to MAX_BPS.
"""
self.pi = pi
self.txbit = (1<<txgpio)
if bps < MIN_BPS:
bps = MIN_BPS
elif bps > MAX_BPS:
bps = MAX_BPS
self.mics = int(1000000 / bps)
self.wave_id = None
pi.wave_add_new()
pi.set_mode(txgpio, pigpio.OUTPUT)
def _nibble(self, nibble):
for i in range(6):
if nibble & (1<<i):
self.wf.append(pigpio.pulse(self.txbit, 0, self.mics))
else:
self.wf.append(pigpio.pulse(0, self.txbit, self.mics))
def _byte(self, crc, byte):
self._nibble(_SYMBOL[byte>>4])
self._nibble(_SYMBOL[byte&0x0F])
return _crc_ccitt_update(crc, byte)
def put(self, data):
"""
Transmit a message. If the message is more than
MAX_MESSAGE_BYTES in size it is discarded. If a message
is currently being transmitted it is aborted and replaced
with the new message. True is returned if message
transmission has successfully started. False indicates
an error.
"""
if len(data) > MAX_MESSAGE_BYTES:
return False
self.wf = []
self.cancel()
for i in _HEADER:
self._nibble(i)
crc = self._byte(0xFFFF, len(data)+_CTL)
for i in data:
if type(i) == type(""):
v = ord(i)
else:
v = i
crc = self._byte(crc, v)
crc = ~crc
self._byte(0, crc&0xFF)
self._byte(0, crc>>8)
self.pi.wave_add_generic(self.wf)
self.wave_id = self.pi.wave_create()
if self.wave_id >= 0:
self.pi.wave_send_once(self.wave_id)
return True
else:
return False
def ready(self):
"""
Returns True if a new message may be transmitted.
"""
return not self.pi.wave_tx_busy()
def cancel(self):
"""
Cancels the wireless transmitter, aborting any message
in progress.
"""
if self.wave_id is not None:
self.pi.wave_tx_stop()
self.pi.wave_delete(self.wave_id)
self.pi.wave_add_new()
self.wave_id = None
class rx():
def __init__(self, pi, rxgpio, bps=2000):
"""
Instantiate a receiver with the Pi, the receive gpio, and
the bits per second (bps). The bps defaults to 2000.
The bps is constrained to be within MIN_BPS to MAX_BPS.
"""
self.pi = pi
self.rxgpio = rxgpio
self.messages = []
self.bad_CRC = 0
if bps < MIN_BPS:
bps = MIN_BPS
elif bps > MAX_BPS:
bps = MAX_BPS
slack = 0.20
self.mics = int(1000000 / bps)
slack_mics = int(slack * self.mics)
self.min_mics = self.mics - slack_mics # Shortest legal edge.
self.max_mics = (self.mics + slack_mics) * 4 # Longest legal edge.
self.timeout = 8 * self.mics / 1000 # 8 bits time in ms.
if self.timeout < 8:
self.timeout = 8
self.last_tick = None
self.good = 0
self.bits = 0
self.token = 0
self.in_message = False
self.message = [0]*(MAX_MESSAGE_BYTES+_CTL)
self.message_len = 0
self.byte = 0
pi.set_mode(rxgpio, pigpio.INPUT)
self.cb = pi.callback(rxgpio, pigpio.EITHER_EDGE, self._cb)
def _calc_crc(self):
crc = 0xFFFF
for i in range(self.message_length):
crc = _crc_ccitt_update(crc, self.message[i])
return crc
def _insert(self, bits, level):
for i in range(bits):
self.token >>= 1
if level == 0:
self.token |= 0x800
if self.in_message:
self.bits += 1
if self.bits >= 12: # Complete token.
byte = (
_sym2nibble(self.token & 0x3f) << 4 |
_sym2nibble(self.token >> 6))
if self.byte == 0:
self.message_length = byte
if byte > (MAX_MESSAGE_BYTES+_CTL):
self.in_message = False # Abort message.
return
self.message[self.byte] = byte
self.byte += 1
self.bits = 0
if self.byte >= self.message_length:
self.in_message = False
self.pi.set_watchdog(self.rxgpio, 0)
crc = self._calc_crc()
if crc == 0xF0B8: # Valid CRC.
self.messages.append(
self.message[1:self.message_length-2])
else:
self.bad_CRC += 1
else:
if self.token == 0xB38: # Start message token.
self.in_message = True
self.pi.set_watchdog(self.rxgpio, self.timeout)
self.bits = 0
self.byte = 0
def _cb(self, gpio, level, tick):
if self.last_tick is not None:
if level == pigpio.TIMEOUT:
self.pi.set_watchdog(self.rxgpio, 0) # Switch watchdog off.
if self.in_message:
self._insert(4, not self.last_level)
self.good = 0
self.in_message = False
else:
edge = pigpio.tickDiff(self.last_tick, tick)
if edge < self.min_mics:
self.good = 0
self.in_message = False
elif edge > self.max_mics:
if self.in_message:
self._insert(4, level)
self.good = 0
self.in_message = False
else:
self.good += 1
if self.good > 8:
bitlen = (100 * edge) / self.mics
if bitlen < 140:
bits = 1
elif bitlen < 240:
bits = 2
elif bitlen < 340:
bits = 3
else:
bits = 4
self._insert(bits, level)
self.last_tick = tick
self.last_level = level
def get(self):
"""
Returns the next unread message, or None if none is avaiable.
"""
if len(self.messages):
return self.messages.pop(0)
else:
return None
def ready(self):
"""
Returns True if there is a message available to be read.
"""
return len(self.messages)
def cancel(self):
"""
Cancels the wireless receiver.
"""
if self.cb is not None:
self.cb.cancel()
self.pi.set_watchdog(self.rxgpio, 0)
self.cb = None
if __name__ == "__main__":
import time
import pigpio
import vw
RX=11
TX=25
BPS=2000
pi = pigpio.pi() # Connect to local Pi.
rx = vw.rx(pi, RX, BPS) # Specify Pi, rx gpio, and baud.
tx = vw.tx(pi, TX, BPS) # Specify Pi, tx gpio, and baud.
msg = 0
start = time.time()
while (time.time()-start) < 300:
msg += 1
while not tx.ready():
time.sleep(0.1)
time.sleep(0.2)
tx.put([48, 49, 65, ((msg>>6)&0x3F)+32, (msg&0x3F)+32])
while not tx.ready():
time.sleep(0.1)
time.sleep(0.2)
tx.put("Hello World #{}!".format(msg))
while rx.ready():
print("".join(chr (c) for c in rx.get()))
rx.cancel()
tx.cancel()
pi.stop()

View File

@@ -0,0 +1,2 @@
Class to decode a Wiegand code.

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python
import pigpio
class decoder:
"""
A class to read Wiegand codes of an arbitrary length.
The code length and value are returned.
EXAMPLE
#!/usr/bin/env python
import time
import pigpio
import wiegand
def callback(bits, code):
print("bits={} code={}".format(bits, code))
pi = pigpio.pi()
w = wiegand.decoder(pi, 14, 15, callback)
time.sleep(300)
w.cancel()
pi.stop()
"""
def __init__(self, pi, gpio_0, gpio_1, callback, bit_timeout=5):
"""
Instantiate with the pi, gpio for 0 (green wire), the gpio for 1
(white wire), the callback function, and the bit timeout in
milliseconds which indicates the end of a code.
The callback is passed the code length in bits and the value.
"""
self.pi = pi
self.gpio_0 = gpio_0
self.gpio_1 = gpio_1
self.callback = callback
self.bit_timeout = bit_timeout
self.in_code = False
self.pi.set_mode(gpio_0, pigpio.INPUT)
self.pi.set_mode(gpio_1, pigpio.INPUT)
self.pi.set_pull_up_down(gpio_0, pigpio.PUD_UP)
self.pi.set_pull_up_down(gpio_1, pigpio.PUD_UP)
self.cb_0 = self.pi.callback(gpio_0, pigpio.FALLING_EDGE, self._cb)
self.cb_1 = self.pi.callback(gpio_1, pigpio.FALLING_EDGE, self._cb)
def _cb(self, gpio, level, tick):
"""
Accumulate bits until both gpios 0 and 1 timeout.
"""
if level < pigpio.TIMEOUT:
if self.in_code == False:
self.bits = 1
self.num = 0
self.in_code = True
self.code_timeout = 0
self.pi.set_watchdog(self.gpio_0, self.bit_timeout)
self.pi.set_watchdog(self.gpio_1, self.bit_timeout)
else:
self.bits += 1
self.num = self.num << 1
if gpio == self.gpio_0:
self.code_timeout = self.code_timeout & 2 # clear gpio 0 timeout
else:
self.code_timeout = self.code_timeout & 1 # clear gpio 1 timeout
self.num = self.num | 1
else:
if self.in_code:
if gpio == self.gpio_0:
self.code_timeout = self.code_timeout | 1 # timeout gpio 0
else:
self.code_timeout = self.code_timeout | 2 # timeout gpio 1
if self.code_timeout == 3: # both gpios timed out
self.pi.set_watchdog(self.gpio_0, 0)
self.pi.set_watchdog(self.gpio_1, 0)
self.in_code = False
self.callback(self.bits, self.num)
def cancel(self):
"""
Cancel the Wiegand decoder.
"""
self.cb_0.cancel()
self.cb_1.cancel()
if __name__ == "__main__":
import time
import pigpio
import wiegand
def callback(bits, value):
print("bits={} value={}".format(bits, value))
pi = pigpio.pi()
w = wiegand.decoder(pi, 14, 15, callback)
time.sleep(300)
w.cancel()
pi.stop()