『瀚思彼岸』» 智能家居技术论坛

 找回密码
 立即注册
查看: 845|回复: 2

[插件集成] 【求教】如何让插件自动同步状态

[复制链接]

4

主题

92

帖子

1210

积分

金牌会员

Rank: 6Rank: 6

积分
1210
金钱
1113
HASS币
20
发表于 2022-8-6 23:22:58 | 显示全部楼层 |阅读模式
本帖最后由 furong600 于 2022-8-7 19:24 编辑

背景:家里有个新风是485控制的,一直用论坛大神的插件。但现在HA版本更新了,插件也要随着更新。
问题:自己依葫芦画瓢,写了一些代码。但不明白如何让插件自动同步状态?

官网虽然有介绍:https://developers.home-assistan ... _working_with_async
但一直没搞明白,特向论坛大神请教。

代码主动控制没问题,就是不能同步状态,开关状态也不对。

下图实为关机状态,但还显示开着。
新风.png

插件主要代码如下:
fan.py
import socket
import struct
import logging

from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
import voluptuous as vol
from homeassistant.components.fan import FanEntity, FanEntityFeature
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType

# import custom_components.broan as broan

from typing import Any, final

from . import DOMAIN

_LOGGER = logging.getLogger(__name__)


ENTITYID = "fan." + DOMAIN

CONF_HOST = "host"
CONF_PORT = "port"
CONF_ADDRESS = "address"

# SPEED_MAPPING = {
#     0: SPEED_OFF,
#     1: SPEED_LOW,
#     2: SPEED_MEDIUM,
#     3: SPEED_HIGH
# }

CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Required(CONF_HOST): cv.string,
                vol.Required(CONF_PORT): cv.positive_int,
                vol.Optional(CONF_ADDRESS, default="01"): cv.string,
            }),
    },
    extra=vol.ALLOW_EXTRA)

def setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    add_devices,
    discovery_info = None
) -> None:
    """Set up the sensor platform."""
    # We only want this platform to be set up via discovery.
    # add_entities([ExampleSensor()])
    dev = []
    _LOGGER.info("setup platform")
    # broan = hass.data["broan"]
    dev.append(BroanFan(hass, config))
    add_devices(dev, True)


class BroanFan(FanEntity):

    def __init__(self, hass, config):
        br = hass.data[DOMAIN]

        self._hass = hass
        self._config = config
        # self.broan = broan
        #self._status = None
        self._state = "off"
        self._percentage = 33
        #self._direction = None
        # self._mode = None
        self._name = "broan485"
        self._attr_supported_features = FanEntityFeature.SET_SPEED   # Enable set_speed


        self.host = br.get(CONF_HOST)
        self.port = br.get(CONF_PORT)

        self.start_flag = "AA"
        self.address = br.get(CONF_ADDRESS)
        self.length = "05"
        self.cmd = "01"       # 01: write, 02: read
        self.data_addr = "01"   # 01: control data, 00: sensor
        self.end_flag = "66"

        # sensor
        self.temperature_inside = None
        self.temperature_outside = None
        self.humidity = None
        self.co2 = None
        self.pm25 = None
        self.air_quality = None
        self.alert = None
        self.mode = None     # auto, bypass, ...

        self.speed_low = "AA01050101808866"
        self.speed_medium = "AA01050101818966"
        self.speed_high = "AA01050101828A66"
        self.turn_off_cmd = "AA01050101000866"
        self.bypass = "AA01050101838B66"
        self.auto_comfort = "AA01050101848c66"
        self.auto_intelligent = "AA01050101858D66"
        self.read_all_data = "AA010502000a1266"

        # # self._state = None
        # self.Host_Id = "02"
        # self.Mode = None
        # # self.M1_speed = None
        # # self.M2_speed = None
        # self.New_Opt = "5a"
        # self.End_Flag = "f5"
        # self.Strat_Flag = "aa"

        
        # self.Temper = None
        # self.Error_Code = None
        self.async_search()
        hass.loop.create_task( self.async_search() )
        
    @property
    def name(self):
        """Return the name of the fan."""
        return self._name

    def turn_off(self, **kwargs):
        self._percentage = 0
        data      = "00"
        #cmd = self.make_cmd(self.cmd, self.data_addr, data)
        #response = self.send_cmd(cmd)
        response = self.send_cmd(self.turn_off_cmd)
        _LOGGER.info("Turn off: " + str(response))

    def send_cmd(self, cmd):
        try:
            _LOGGER.info("send cmd hex: " + str(cmd))
            cmd = bytes.fromhex(cmd)
            host = socket.gethostbyname(self.host)
            port = (host, self.port)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(20)
            s.connect(port)
            s.send(cmd)
            _LOGGER.info("send cmd: " + str(cmd))
            message = s.recv(1024)
            s.settimeout(None)
            message = ''.join(['%02x' % b for b in message])
            s.close()
            _LOGGER.info("send cmd recv: " + str(message))
            return message
        except Exception as e:
            _LOGGER.info(e)
            return e

    def make_cmd(self, command, data_addr, data):
        # if new_opt is None:
        #     checksum = bytes.fromhex(self.address + self.Host_Id + mode + m1_speed + m2_speed + self.New_Opt)
        #     a = checksum[1:]
        #     b = "%x"%sum(a)
        #     cmd = bytes.fromhex(self.Strat_Flag + self.address + self.Host_Id + mode + m1_speed + m2_speed + self.New_Opt + str(b) + self.End_Flag)
        # else:
        checksum = bytes.fromhex(self.address + self.length + command + data_addr + data)
        a = checksum[1:]
        b = "%x"%sum(a)
        _LOGGER.info("check bits: " + str(b))
        cmd = bytes.fromhex(self.start_flag + self.address + self.length + command + data_addr + data + str(b) + self.end_flag)
        return cmd

    async def async_search(self):
        command = "02"
        data_addr = "00"
        data = "0A"

        cmd = self.make_cmd(command, data_addr, data)
        response = self.send_cmd(cmd)
        _LOGGER.info("async_search: " + str(response))

        if not str(response).startswith("aa"):
            return
        air_quality         = response[10:11]
        mode                = response[12:13]
        alert               = response[14:15]
        temperature_inside  = response[16:17]
        humidity            = response[18:19]
        temperature_outside = response[20:21]
        co2                 = response[22:25]
        #voc                 = response[26:28]
        pm25                = response[28:31]
        #formic_acid         = response[32:34]

        if air_quality == "00":
            self.air_quality = "Great"
        elif air_quality == "01":
            self.air_quality = "Good"
        elif air_quality == "02":
            self.air_quality = "Bad"

        if mode == '00':
            self._percentage = 0
            self.mode = "off"
        elif mode == '80':
            self._percentage = 33
            self.mode = "Low" 
        elif mode == '81':
            self._percentage = 66
            self.mode = "Medium"
        elif mode == '82':
            self._percentage = 100
            self.mode = "High" 
        elif mode == '83':
            self.mode = "bypass"
        elif mode == '84':
            self.mode = "Auto-Comportable"
        elif mode == '85':
            self.mode = "Auto-intelligent"
        else:
            _LOGGER.warning("unknown speed: " + str(speed))

        if alert == "00":
            self.alert = "No Alert"
        elif alert == "01":
            self.alert = "交换芯维护"
        elif alert == "02":
            self.alert = "Filter Unit Clean"
        elif alert == "03":
            self.alert = "交换芯/Filter Clean"
        else:
            _LOGGER.warning("unknown alert: " + str(alert))

        temperature_inside = bin(int(temperature_inside, 16))[2:].rjust(8, "0")
        if temperature_inside.startswith("0"):
            self.temperature_inside = int(temperature_inside, 2)
        else:
            self.temperature_inside = -int(temperature_inside[1:], 2)

        self.humidity = int(humidity, 16)

        temperature_outside = bin(int(temperature_outside, 16))[2:].rjust(8, "0")
        if temperature_outside.startswith("0"):
            self.temperature_outside = int(temperature_outside, 2)
        else:
            self.temperature_outside = -int(temperature_outside[1:], 2)

        co2 = co2[2:] + co2[:2]
        self.co2 = int(co2, 16)

        #self.voc = int(voc, 16)

        pm25 = pm25[2:] + pm25[:2]
        self.pm25 = int(pm25, 16)

        #self.formic_acid = int(formic_acid, 16)

        _LOGGER.info("air_quality: " + str(self.air_quality) +
                     " mode: " + str(self.mode) +
                     " alert: " + str(self.alert) +
                     " temperature_inside: " + str(self.temperature_inside) +
                     " temperature_outside: " + str(self.temperature_outside) +
                     " humidity: " + str(self.humidity) +
                     " CO2: " + str(self.co2) +
                     " PM2.5: " + str(self.pm25))

    async def async_update(self) -> None:
        self._state = await self.async_search()

    def set_percentage(self, percentage: int) -> None:
        """Set the speed of the fan, as a percentage."""
        if percentage == 0:
            self.turn_off()
        elif percentage <= 33:
            self._percentage = 33
            data      = "80"
            cmd = self.make_cmd(self.cmd, self.data_addr, data)
            #response = self.send_cmd(cmd)
            response = self.send_cmd(self.speed_low)
            _LOGGER.info("Set speed: " + str(response))
        elif percentage <= 66:
            self._percentage = 66
            data      = "81"
            cmd = self.make_cmd(self.cmd, self.data_addr, data)
            #response = self.send_cmd(cmd)
            response = self.send_cmd(self.speed_medium)
            _LOGGER.info("Set speed: " + str(response))
        else:
            self._percentage = 100
            data      = "82"
            cmd = self.make_cmd(self.cmd, self.data_addr, data)
            #response = self.send_cmd(cmd)
            response = self.send_cmd(self.speed_high)
            _LOGGER.info("Set speed: " + str(response))

    @property
    def percentage(self) -> int | None:
        """Return the current percentage."""
        return self._percentage

    def turn_on(
        self,
        percentage: int | None = None,
        preset_mode: str | None = None,
        **kwargs
    ) -> None:
        """Turn on the fan."""
        self.set_percentage(33)

    @property
    async def is_on(self) -> bool | None:
        """Return true if the entity is on."""
        await self.async_search()
        return (
            self._percentage is not None and self._percentage > 0
        )

    @final
    @property
    def state_attributes(self) -> dict[str, float | str | None]:
        """Return optional _state attributes."""
        data: dict[str, float | str | None] = {}
        data = {"_state": self._state,
            "mode": self.mode, 
            "air_quality": self.air_quality, 
            "percentage": self._percentage, 
            "alert": self.alert, 
            "temperature_inside": self.temperature_inside, 
            "humidity": self.humidity, 
            "temperature_outside": self.temperature_outside, 
            "co2": self.co2, 
            "pm2.5": self.pm25}
        return data





回复

使用道具 举报

4

主题

92

帖子

1210

积分

金牌会员

Rank: 6Rank: 6

积分
1210
金钱
1113
HASS币
20
 楼主| 发表于 2022-8-7 19:25:53 | 显示全部楼层
有懂插件的大神,帮忙看下吗?
回复

使用道具 举报

4

主题

92

帖子

1210

积分

金牌会员

Rank: 6Rank: 6

积分
1210
金钱
1113
HASS币
20
 楼主| 发表于 2022-8-8 11:21:42 | 显示全部楼层
本帖最后由 furong600 于 2022-8-8 11:23 编辑
ryanh7 发表于 2022-8-7 23:28
async_search可能不正常,而且没有返回状态值

感谢感谢,我把log改成debug,打印了信息。
bytes.fromhex(xxx) 传的参数有问题。多谢多谢。我现在用组合好的命令发出去,正常工作。等有空再debug下格式的问题。
另外请教下:
这里面自动更新状态是哪个函数实现的?

hass.loop.create_task( self.async_search() )
还是 async_update()
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|Hassbian

GMT+8, 2024-4-23 22:07 , Processed in 3.856562 second(s), 27 queries .

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表