|
本帖最后由 furong600 于 2021-8-17 16:59 编辑
家里装了新风,可以通过RS485总线控制。
所以想参考大神的新风插件,依葫芦画瓢改改。(大神新风插件:https://bbs.hassbian.com/thread-5091-1-1.html)
但集成一直报错:Component error: jing_yang - Integration 'jing_yang' not found.
请小伙伴们帮忙看下什么原因,多谢多谢。
configuration.yml:
jing_yang:
host: 192.168.19.26 ## 设备的ip地址,支持域名
port: 9999 ## 设备的端口号
custom_components/jing_yang/manifest.json
{
"domain": "jing_yang",
"name": "Fresh Air - JingYanng",
"documentation": "https://www.example.com",
"dependencies": [],
"codeowners": [],
"requirements": [],
"version": "1.0"
}
custom_components/jing_yang/__init__.py
import socket
import struct
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
import logging
from homeassistant.components.fan import SPEED_OFF, SPEED_LOW, SPEED_MEDIUM, SPEED_HIGH
from homeassistant.helpers.discovery import load_platform
DOMAIN = "jing_yang"
CONF_HOST = "host"
CONF_PORT = "port"
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.positive_int,
}),
},
extra=vol.ALLOW_EXTRA)
def setup(hass, config):
hass.data["jing_yang"] = JingYang(hass, config[DOMAIN])
jy = hass.data["jing_yang"]
load_platform(hass, 'fan', DOMAIN, {}, config)
jy.search()
return True
class JingYang:
def __init__(self, hass, domain_config):
self._hass = hass
self.config = domain_config
self.host = domain_config[CONF_HOST]
self.port = domain_config[CONF_PORT]
self.state = None
self.mode = None
# air_quality [00:Great, 01:Good, 02:Bad]
self.air_quality = None
# speed [80:low, 81:medium, 82:high, 83:bypass, 84:auto_comfort, 85:auto_intelligent, 00ff]
self.speed = None
# alert state [00:no alert, 01:交换芯提醒, 02:过滤部件清洁更换提醒, 03:两者都有]
self.alert = None
self.temperature_inside = None
self.humidity = None
self.temperature_outside = None
self.co2 = None
#self.voc = None
self.pm25 = None
#self.formic_acid = None # 甲醛
self.speed_low = "AA01050101808866"
self.speed_medium = "AA01050101818966"
self.speed_high = "AA01050101828A66"
self.turn_off_cmd = "AA01050101000866"
self.bypass = "AA01050101838B66"
self.auto_comfort = "AA01050101848c66"
self.auto_intelligent = "AA01050101858D66"
self.read_all_data = "aa010502000a1266"
def send_cmd(self, cmd):
try:
_LOGGER.info("send cmd hex: " + str(cmd))
cmd = bytes.fromhex(cmd)
host = socket.gethostbyname(self.host)
port = (host, self.port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(20)
s.connect(port)
s.send(cmd)
_LOGGER.info("send cmd: " + str(cmd))
message = s.recv(1024)
s.settimeout(None)
message = ''.join(['%02x' % b for b in message])
s.close()
_LOGGER.info("send cmd recv: " + str(message))
return message
except Exception as e:
_LOGGER.info(e)
return e
def turn_on(self, mode, speed):
self.state = "on"
self.speed = speed
if speed == SPEED_LOW:
cmd = self.speed_low
elif speed == SPEED_MEDIUM:
cmd = self.speed_medium
elif speed == SPEED_HIGH:
cmd = self.speed_high
else:
cmd = self.speed_low
self.speed = SPEED_LOW
_LOGGER.info("turn on speed of unknown: " + str(speed))
response = self.send_cmd(cmd)
_LOGGER.info("components turn on: " + str(response))
def turn_off(self):
self.state = "off"
self.speed = SPEED_OFF
response = self.send_cmd(self.turn_off_cmd)
_LOGGER.info("components turn off: " + str(response))
def set_mode(self, mode):
# forward 排风 reverse 换气
if self.mode is None:
self.search()
if mode == "bypass":
self.mode = "bypass"
cmd = self.bypass
else:
return
res = self.send_cmd(cmd)
_LOGGER.info("set mode in jing_yang.py: " + str(res))
def set_speed(self, speed_set):
_LOGGER.info("components set speed: " + str(speed_set))
if speed_set == SPEED_LOW:
cmd = self.speed_low
self.speed = SPEED_LOW
elif speed_set == SPEED_MEDIUM:
cmd = self.speed_medium
self.speed = SPEED_MEDIUM
elif speed_set == SPEED_HIGH:
cmd = self.speed_high
self.speed = SPEED_HIGH
elif speed_set == SPEED_OFF:
cmd = self.turn_off_cmd
self.speed = SPEED_OFF
else:
cmd = self.speed_low
self.speed = SPEED_LOW
_LOGGER.warning("set speed of unknown: " + str(speed_set))
res = self.send_cmd(cmd)
_LOGGER.info("set speed in jing_yang.py: " + str(res))
def search(self):
response = self.send_cmd(self.read_all_data)
_LOGGER.info("search: " + str(response))
if not str(response).startswith("aa"):
return
air_quality_temp = response[10:12]
state_temp = response[12]
speed_temp = response[13]
mode_temp = response[14]
alert_temp = response[15]
temperature_inside_temp = response[16:18]
humidity_temp = response[18:20]
temperature_outside_temp = response[20:22]
co2_temp = response[22:26]
#voc_temp = response[26:28]
pm25_temp = response[28:32]
#formic_acid_temp = response[32:34]
if air_quality_temp == "00":
self.air_quality = "Great"
elif air_quality_temp == "01":
self.air_quality = "Good"
elif air_quality_temp == "02":
self.air_quality = "Bad"
else:
self.air_quality = self.air_quality
if speed_temp == '0':
self.speed = SPEED_LOW
elif speed_temp == '1':
self.speed = SPEED_MEDIUM
elif speed_temp == '2':
self.speed = SPEED_HIGH
elif speed_temp == '3':
self.speed = "bypass"
elif speed_temp == '4':
self.speed = "Auto-Comportable"
elif speed_temp == '5':
self.speed = "Auto-intelligent"
else:
_LOGGER.warning("unknown speed: " + str(speed_temp))
if state_temp == '0':
self.state = "off"
elif state_temp == '8':
self.state = "on"
else:
_LOGGER.warning("unknown state: " + str(state_temp))
if mode_temp == '0':
self.mode = "standard"
elif mode_temp == '8':
self.mode = "bypass"
else:
_LOGGER.warning("unknown state: " + str(mode_temp))
if alert_temp == "0":
self.alert = "No Alert"
elif alert_temp == "1":
self.alert = "交换芯维护"
elif alert_temp == "2":
self.alert = "Filter Unit Clean"
elif alert_temp == "3":
self.alert = "交换芯/Filter Clean"
else:
_LOGGER.warning("unknown alert: " + str(alert_temp))
temperature_inside_temp = bin(int(temperature_inside_temp, 16))[2:].rjust(8, "0")
if temperature_inside_temp.startswith("0"):
self.temperature_inside = int(temperature_inside_temp, 2)
else:
self.temperature_inside = -int(temperature_inside_temp[1:], 2)
self.humidity = int(humidity_temp, 16)
temperature_outside_temp = bin(int(temperature_outside_temp, 16))[2:].rjust(8, "0")
if temperature_outside_temp.startswith("0"):
self.temperature_outside = int(temperature_outside_temp, 2)
else:
self.temperature_outside = -int(temperature_outside_temp[1:], 2)
co2_temp = co2_temp[2:] + co2_temp[:2]
self.co2 = int(co2_temp, 16)
#self.voc = int(voc_temp, 16)
pm25_temp = pm25_temp[2:] + pm25_temp[:2]
self.pm25 = int(pm25_temp, 16)
#self.formic_acid = int(formic_acid_temp, 16)
_LOGGER.info("air_quality: " + str(self.air_quality) +
" speed: " + str(self.speed) +
" mode: " + str(self.mode) +
" alert: " + str(self.alert) +
" temperature_inside: " + str(self.temperature_inside) +
" temperature_outside: " + str(self.temperature_outside) +
" humidity: " + str(self.humidity) +
" CO2: " + str(self.co2) +
" PM2.5: " + str(self.pm25))
custom_components/jing_yang/fan.py
import logging
import homeassistant.helpers.config_validation as cv
import voluptuous as vol
from homeassistant.components.fan import (
FanEntity, SPEED_OFF, SPEED_LOW, SPEED_MEDIUM, SPEED_HIGH, SUPPORT_SET_SPEED)
import custom_components.jing_yang as jing_yang
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['jing_yang']
DOMAIN = 'jing_yang'
ENTITYID = "fan." + DOMAIN
CONF_HOST = "host"
CONF_PORT = "port"
SPEED_MAPPING = {
0: SPEED_OFF,
1: SPEED_LOW,
2: SPEED_MEDIUM,
3: SPEED_HIGH
}
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.positive_int,
}),
},
extra=vol.ALLOW_EXTRA)
# Validation of the user's configuration
# PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
# vol.Required(CONF_HOST): cv.string,
# vol.Optional(CONF_PORT): cv.string,
# })
def setup_platform(hass, config, add_devices, discovery_info=None):
dev = []
_LOGGER.info("setup platform")
jing_yang = hass.data["jing_yang"]
dev.append(JingYangFan(hass, config, jing_yang))
add_devices(dev, True)
class JingYangFan(FanEntity):
def __init__(self, hass, config, jing_yang):
self._hass = hass
self._config = config
self.jing_yang = jing_yang
self._name = "jing_yang"
self._state = "off"
self._mode = None
self._speed = SPEED_OFF
self._air_quality = None
self._alert = None
self._temperature_inside = None
self._humidity = None
self._temperature_outside = None
self._co2 = None
#self._voc = None
self._pm25 = None
#self._formic_acid = None
self.speeds = []
@property
def name(self):
"""Return the name of the fan."""
return self._name
@property
def speed(self):
"""Return the current speed."""
return self._speed
@property
def icon(self):
"""Return the icon to use in the frontend."""
return 'mdi:air-conditioner'
@property
def speed_list(self):
"""List of available fan modes."""
return [SPEED_OFF, SPEED_LOW, SPEED_MEDIUM, SPEED_HIGH]
@property
def device_state_attributes(self):
# self._state = self.jing_yang.state
self._mode = self.jing_yang.mode
self._air_quality = self.jing_yang.air_quality
#self._speed = self.jing_yang.speed
self._alert = self.jing_yang.alert
self._temperature_inside = self.jing_yang.temperature_inside
self._humidity = self.jing_yang.humidity
self._temperature_outside = self.jing_yang.temperature_outside
self._co2 = self.jing_yang.co2
#self._voc = self.jing_yang.voc
self._pm25 = self.jing_yang.pm25
#self._formic_acid = self.jing_yang.formic_acid
data = {"state": self._state,
"mode": self._mode,
"air_quality": self._air_quality,
"speed": self._speed,
"alert": self._alert,
"temperature_inside": self._temperature_inside,
"humidity": self._humidity,
"temperature_outside": self._temperature_outside,
"co2": self._co2,
"pm2.5": self._pm25}
return data
def set_percentage(self, percentage: int) -> None:
"""Set the speed percentage of the fan."""
if percentage == 0:
self.turn_off()
else:
jingyang_speed = percentage_to_speed(percentage)
self.set_speed(jingyang_speed)
@property
def is_on(self):
"""Return true if light is on."""
_LOGGER.info("is on: " + str(self._state))
self.jing_yang.search()
self._state = self.jing_yang.state
self._mode = self.jing_yang.mode
self._air_quality = self.jing_yang.air_quality
self._speed = self.jing_yang.speed
self._alert = self.jing_yang.alert
self._temperature_inside = self.jing_yang.temperature_inside
self._humidity = self.jing_yang.humidity
self._temperature_outside = self.jing_yang.temperature_outside
self._co2 = self.jing_yang.co2
#self._voc = self.jing_yang.voc
self._pm25 = self.jing_yang.pm25
#self._formic_acid = self.jing_yang.formic_acid
return self._state not in ["off", None]
def turn_off(self, *args, **kwargs):
self._speed = SPEED_OFF
self.jing_yang.turn_off()
if self.jing_yang.state == "off":
self._state = "off"
def turn_on(
self,
speed: str = None,
percentage: int = None,
preset_mode: str = None,
**kwargs,
) -> None:
"""Turn on the fan."""
if speed is None:
speed = SPEED_LOW
if percentage is not None:
self.set_percentage(percentage)
else:
self.jing_yang.turn_on(None, speed)
#self._state = self.jing_yang.state
#self._speed = self.jing_yang.speed
def set_speed(self, speed: str):
if speed is None:
self._speed = SPEED_LOW
self.jing_yang.set_speed(self._speed)
elif speed == SPEED_LOW:
self._speed = SPEED_LOW
self.jing_yang.set_speed(self._speed)
elif speed == SPEED_MEDIUM:
self._speed = SPEED_MEDIUM
self.jing_yang.set_speed(self._speed)
elif speed == SPEED_HIGH:
self._speed = SPEED_HIGH
self.jing_yang.set_speed(self._speed)
elif speed == SPEED_OFF or speed == "":
self._speed = SPEED_OFF
self.turn_off()
else:
_LOGGER.info("set speed : " + speed + ". Warnning: Don't have this kind of speed")
_LOGGER.info("set speed : " + speed + ", _speed: " + self._speed)
return self._speed
@property
def percentage(self) -> int | None:
"""Return the current speed."""
if not self.is_on:
return 0
if self.speeds is None:
return None
return speed_to_percentage(self.speed)
@property
def supported_features(self) -> int:
"""Flag supported features."""
return SUPPORT_SET_SPEED
|
|