import socket
import struct
import logging
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
import voluptuous as vol
from homeassistant.components.fan import FanEntity, FanEntityFeature
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
# import custom_components.broan as broan
from typing import Any, final
from . import DOMAIN
_LOGGER = logging.getLogger(__name__)
ENTITYID = "fan." + DOMAIN
CONF_HOST = "host"
CONF_PORT = "port"
CONF_ADDRESS = "address"
# SPEED_MAPPING = {
# 0: SPEED_OFF,
# 1: SPEED_LOW,
# 2: SPEED_MEDIUM,
# 3: SPEED_HIGH
# }
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.positive_int,
vol.Optional(CONF_ADDRESS, default="01"): cv.string,
}),
},
extra=vol.ALLOW_EXTRA)
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_devices,
discovery_info = None
) -> None:
"""Set up the sensor platform."""
# We only want this platform to be set up via discovery.
# add_entities([ExampleSensor()])
dev = []
_LOGGER.info("setup platform")
# broan = hass.data["broan"]
dev.append(BroanFan(hass, config))
add_devices(dev, True)
class BroanFan(FanEntity):
def __init__(self, hass, config):
br = hass.data[DOMAIN]
self._hass = hass
self._config = config
# self.broan = broan
#self._status = None
self._state = "off"
self._percentage = 33
#self._direction = None
# self._mode = None
self._name = "broan485"
self._attr_supported_features = FanEntityFeature.SET_SPEED # Enable set_speed
self.host = br.get(CONF_HOST)
self.port = br.get(CONF_PORT)
self.start_flag = "AA"
self.address = br.get(CONF_ADDRESS)
self.length = "05"
self.cmd = "01" # 01: write, 02: read
self.data_addr = "01" # 01: control data, 00: sensor
self.end_flag = "66"
# sensor
self.temperature_inside = None
self.temperature_outside = None
self.humidity = None
self.co2 = None
self.pm25 = None
self.air_quality = None
self.alert = None
self.mode = None # auto, bypass, ...
self.speed_low = "AA01050101808866"
self.speed_medium = "AA01050101818966"
self.speed_high = "AA01050101828A66"
self.turn_off_cmd = "AA01050101000866"
self.bypass = "AA01050101838B66"
self.auto_comfort = "AA01050101848c66"
self.auto_intelligent = "AA01050101858D66"
self.read_all_data = "AA010502000a1266"
# # self._state = None
# self.Host_Id = "02"
# self.Mode = None
# # self.M1_speed = None
# # self.M2_speed = None
# self.New_Opt = "5a"
# self.End_Flag = "f5"
# self.Strat_Flag = "aa"
# self.Temper = None
# self.Error_Code = None
self.async_search()
hass.loop.create_task( self.async_search() )
@property
def name(self):
"""Return the name of the fan."""
return self._name
def turn_off(self, **kwargs):
self._percentage = 0
data = "00"
#cmd = self.make_cmd(self.cmd, self.data_addr, data)
#response = self.send_cmd(cmd)
response = self.send_cmd(self.turn_off_cmd)
_LOGGER.info("Turn off: " + str(response))
def send_cmd(self, cmd):
try:
_LOGGER.info("send cmd hex: " + str(cmd))
cmd = bytes.fromhex(cmd)
host = socket.gethostbyname(self.host)
port = (host, self.port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(20)
s.connect(port)
s.send(cmd)
_LOGGER.info("send cmd: " + str(cmd))
message = s.recv(1024)
s.settimeout(None)
message = ''.join(['%02x' % b for b in message])
s.close()
_LOGGER.info("send cmd recv: " + str(message))
return message
except Exception as e:
_LOGGER.info(e)
return e
def make_cmd(self, command, data_addr, data):
# if new_opt is None:
# checksum = bytes.fromhex(self.address + self.Host_Id + mode + m1_speed + m2_speed + self.New_Opt)
# a = checksum[1:]
# b = "%x"%sum(a)
# cmd = bytes.fromhex(self.Strat_Flag + self.address + self.Host_Id + mode + m1_speed + m2_speed + self.New_Opt + str(b) + self.End_Flag)
# else:
checksum = bytes.fromhex(self.address + self.length + command + data_addr + data)
a = checksum[1:]
b = "%x"%sum(a)
_LOGGER.info("check bits: " + str(b))
cmd = bytes.fromhex(self.start_flag + self.address + self.length + command + data_addr + data + str(b) + self.end_flag)
return cmd
async def async_search(self):
command = "02"
data_addr = "00"
data = "0A"
cmd = self.make_cmd(command, data_addr, data)
response = self.send_cmd(cmd)
_LOGGER.info("async_search: " + str(response))
if not str(response).startswith("aa"):
return
air_quality = response[10:11]
mode = response[12:13]
alert = response[14:15]
temperature_inside = response[16:17]
humidity = response[18:19]
temperature_outside = response[20:21]
co2 = response[22:25]
#voc = response[26:28]
pm25 = response[28:31]
#formic_acid = response[32:34]
if air_quality == "00":
self.air_quality = "Great"
elif air_quality == "01":
self.air_quality = "Good"
elif air_quality == "02":
self.air_quality = "Bad"
if mode == '00':
self._percentage = 0
self.mode = "off"
elif mode == '80':
self._percentage = 33
self.mode = "Low"
elif mode == '81':
self._percentage = 66
self.mode = "Medium"
elif mode == '82':
self._percentage = 100
self.mode = "High"
elif mode == '83':
self.mode = "bypass"
elif mode == '84':
self.mode = "Auto-Comportable"
elif mode == '85':
self.mode = "Auto-intelligent"
else:
_LOGGER.warning("unknown speed: " + str(speed))
if alert == "00":
self.alert = "No Alert"
elif alert == "01":
self.alert = "交换芯维护"
elif alert == "02":
self.alert = "Filter Unit Clean"
elif alert == "03":
self.alert = "交换芯/Filter Clean"
else:
_LOGGER.warning("unknown alert: " + str(alert))
temperature_inside = bin(int(temperature_inside, 16))[2:].rjust(8, "0")
if temperature_inside.startswith("0"):
self.temperature_inside = int(temperature_inside, 2)
else:
self.temperature_inside = -int(temperature_inside[1:], 2)
self.humidity = int(humidity, 16)
temperature_outside = bin(int(temperature_outside, 16))[2:].rjust(8, "0")
if temperature_outside.startswith("0"):
self.temperature_outside = int(temperature_outside, 2)
else:
self.temperature_outside = -int(temperature_outside[1:], 2)
co2 = co2[2:] + co2[:2]
self.co2 = int(co2, 16)
#self.voc = int(voc, 16)
pm25 = pm25[2:] + pm25[:2]
self.pm25 = int(pm25, 16)
#self.formic_acid = int(formic_acid, 16)
_LOGGER.info("air_quality: " + str(self.air_quality) +
" mode: " + str(self.mode) +
" alert: " + str(self.alert) +
" temperature_inside: " + str(self.temperature_inside) +
" temperature_outside: " + str(self.temperature_outside) +
" humidity: " + str(self.humidity) +
" CO2: " + str(self.co2) +
" PM2.5: " + str(self.pm25))
async def async_update(self) -> None:
self._state = await self.async_search()
def set_percentage(self, percentage: int) -> None:
"""Set the speed of the fan, as a percentage."""
if percentage == 0:
self.turn_off()
elif percentage <= 33:
self._percentage = 33
data = "80"
cmd = self.make_cmd(self.cmd, self.data_addr, data)
#response = self.send_cmd(cmd)
response = self.send_cmd(self.speed_low)
_LOGGER.info("Set speed: " + str(response))
elif percentage <= 66:
self._percentage = 66
data = "81"
cmd = self.make_cmd(self.cmd, self.data_addr, data)
#response = self.send_cmd(cmd)
response = self.send_cmd(self.speed_medium)
_LOGGER.info("Set speed: " + str(response))
else:
self._percentage = 100
data = "82"
cmd = self.make_cmd(self.cmd, self.data_addr, data)
#response = self.send_cmd(cmd)
response = self.send_cmd(self.speed_high)
_LOGGER.info("Set speed: " + str(response))
@property
def percentage(self) -> int | None:
"""Return the current percentage."""
return self._percentage
def turn_on(
self,
percentage: int | None = None,
preset_mode: str | None = None,
**kwargs
) -> None:
"""Turn on the fan."""
self.set_percentage(33)
@property
async def is_on(self) -> bool | None:
"""Return true if the entity is on."""
await self.async_search()
return (
self._percentage is not None and self._percentage > 0
)
@final
@property
def state_attributes(self) -> dict[str, float | str | None]:
"""Return optional _state attributes."""
data: dict[str, float | str | None] = {}
data = {"_state": self._state,
"mode": self.mode,
"air_quality": self.air_quality,
"percentage": self._percentage,
"alert": self.alert,
"temperature_inside": self.temperature_inside,
"humidity": self.humidity,
"temperature_outside": self.temperature_outside,
"co2": self.co2,
"pm2.5": self.pm25}
return data