power: add RF transmitter support
Signed-off: Daniel Bauer <github@dbauer.me>
This commit is contained in:
@@ -12,6 +12,7 @@ import json
|
||||
import struct
|
||||
import socket
|
||||
import asyncio
|
||||
import time
|
||||
from tornado.iostream import IOStream
|
||||
from tornado.httpclient import AsyncHTTPClient
|
||||
from tornado.escape import json_decode
|
||||
@@ -68,7 +69,8 @@ class PrinterPower:
|
||||
"shelly": Shelly,
|
||||
"homeseer": HomeSeer,
|
||||
"homeassistant": HomeAssistant,
|
||||
"loxonev1": Loxonev1
|
||||
"loxonev1": Loxonev1,
|
||||
"rf": RFDevice
|
||||
}
|
||||
try:
|
||||
for section in prefix_sections:
|
||||
@@ -79,7 +81,7 @@ class PrinterPower:
|
||||
if dev_class is None:
|
||||
raise config.error(f"Unsupported Device Type: {dev_type}")
|
||||
dev = dev_class(cfg)
|
||||
if isinstance(dev, GpioDevice):
|
||||
if isinstance(dev, GpioDevice) or isinstance(dev, RFDevice):
|
||||
if not HAS_GPIOD:
|
||||
continue
|
||||
dev.configure_line(cfg, self.chip_factory)
|
||||
@@ -470,6 +472,53 @@ class GpioDevice(PowerDevice):
|
||||
def close(self) -> None:
|
||||
self.line.release()
|
||||
|
||||
class RFDevice(GpioDevice):
|
||||
|
||||
# Protocol definition
|
||||
# [1, 3] means HIGH is set for 1x pulse_len and LOW for 3x pulse_len
|
||||
ZERO_BIT = [1, 3] # zero bit
|
||||
ONE_BIT = [3, 1] # one bit
|
||||
SYNC_BIT = [1, 31] # sync between
|
||||
PULSE_LEN = 0.00035 # length of a single pulse
|
||||
RETRIES = 10 # send the code this many times
|
||||
|
||||
def __init__(self, config: ConfigHelper):
|
||||
super().__init__(config)
|
||||
self.on = config.get("on_code").zfill(24)
|
||||
self.off = config.get("off_code").zfill(24)
|
||||
|
||||
def initialize(self) -> None:
|
||||
self.set_power("on" if self.initial_state else "off")
|
||||
|
||||
def _transmit_digit(self, waveform) -> None:
|
||||
self.line.set_value(1)
|
||||
time.sleep(waveform[0]*RFDevice.PULSE_LEN)
|
||||
self.line.set_value(0)
|
||||
time.sleep(waveform[1]*RFDevice.PULSE_LEN)
|
||||
|
||||
def _transmit_code(self, code) -> None:
|
||||
for _ in range(RFDevice.RETRIES):
|
||||
for i in code:
|
||||
if i == "1":
|
||||
self._transmit_digit(RFDevice.ONE_BIT)
|
||||
elif i == "0":
|
||||
self._transmit_digit(RFDevice.ZERO_BIT)
|
||||
self._transmit_digit(RFDevice.SYNC_BIT)
|
||||
|
||||
def set_power(self, state) -> None:
|
||||
try:
|
||||
if state == "on":
|
||||
code = self.on
|
||||
else:
|
||||
code = self.off
|
||||
self._transmit_code(code)
|
||||
except Exception:
|
||||
self.state = "error"
|
||||
msg = f"Error Toggling Device Power: {self.name}"
|
||||
logging.exception(msg)
|
||||
raise self.server.error(msg) from None
|
||||
self.state = state
|
||||
|
||||
|
||||
# This implementation based off the work tplink_smartplug
|
||||
# script by Lubomir Stroetmann available at:
|
||||
|
Reference in New Issue
Block a user