『瀚思彼岸』» 智能家居技术论坛

 找回密码
 立即注册
查看: 383|回复: 7

[经验分享] 新风接入HA 485总线

[复制链接]

8

主题

55

帖子

457

积分

论坛分享达人

积分
457
金钱
402
HASS币
0
发表于 2025-1-10 22:00:12 | 显示全部楼层 |阅读模式
本帖最后由 zmaozhao 于 2025-1-11 07:29 编辑

上海某地产配的垃圾小蚁智家网关经常失联反应慢,经研究是485总线,自有协议
抓包发现原来网关会自动更新token
所以直接追到根上,通过485转网口接入ha的方案浮上水面
通过cursor编写了一个fan的插件来接入ha
原来串口的HEX命令,最后经过运算是CRC-16校验
低风 A1 06 10 07 00 01 E5 AB 
中风 A1 06 10 07 00 02 A5 AA 
高风 A1 06 10 07 00 04 25 A8 
打开 A1 06 10 05 00 01 44 6B
关闭 A1 06 10 05 00 00 85 AB 
查询 A1 03 10 05 00 03 09 AA
回复低风打开 A1 03 06 00 01 00 01 00 01 74 B2 
回复中风打开 A1 03 06 00 01 00 01 00 02 34 B3 
回复高风打开 A1 03 06 00 01 00 01 00 04 B4 B1

                               
登录/注册后可看大图

通过cursor kimi等AI工具一顿操作,终于完美接入了
"""Platform for Custom Fan integration."""
import asyncio
import async_timeout
import logging
from datetime import timedelta
from typing import Any

from homeassistant.components.fan import (
    FanEntity,
    FanEntityFeature,
)

from .const import (
    DEFAULT_HOST,
    DEFAULT_NAME,
    DEFAULT_PORT,
    SPEED_MAPPING,
)

_LOGGER = logging.getLogger(__name__)

async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
    """Set up the Custom Fan platform."""
    host = config.get("host", DEFAULT_HOST)
    port = config.get("port", DEFAULT_PORT)
    name = config.get("name", DEFAULT_NAME)
    
    async_add_entities([CustomFan(host, port, name)])

class CustomFan(FanEntity):
    """Representation of a Custom Fan."""
    
    _attr_supported_features = (
        FanEntityFeature.SET_SPEED |
        FanEntityFeature.TURN_ON |
        FanEntityFeature.TURN_OFF
    )
    _attr_speed_count = 3
    SCAN_INTERVAL = timedelta(seconds=60)

    def __init__(self, host: str, port: int, name: str):
        """Initialize the fan."""
        self._attr_is_on = False
        self._attr_percentage = 0
        self._host = host
        self._port = port
        self._attr_name = name
        # 生成唯一标识符,这里使用主机和端口的组合作为示例
        self._attr_unique_id = f"{self._host}:{self._port}"
    @property
    def unique_id(self) -> str:
        """Return the unique ID of this fan."""
        return self._attr_unique_id
    def _calculate_crc16(self, data: bytes) -> int:
        """Calculate CRC-16 (Modbus)."""
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc

    async def _send_command(self, command: bytes) -> None:
        """Send command to the fan."""
        try:
            _LOGGER.info("Connecting to %s:%s", self._host, self._port)
            crc = self._calculate_crc16(command)
            crc_bytes = crc.to_bytes(2, byteorder='little')
            full_command = command + crc_bytes
            
            _LOGGER.info("Sending command: %s", full_command.hex(' ').upper())
            reader, writer = await asyncio.open_connection(self._host, self._port)
            _LOGGER.info("Connected successfully")
            writer.write(full_command)
            await writer.drain()
            
            _LOGGER.info("Closing connection")
            writer.close()
            await writer.wait_closed()
            _LOGGER.info("Connection closed")
        except Exception as e:
            _LOGGER.error("TCP connection error: %s", str(e))

    async def async_turn_on(self, percentage: int | None = None, preset_mode: str | None = None, **kwargs: Any) -> None:
        """Turn on the fan."""
        try:
            command = SPEED_MAPPING["on"]
            await self._send_command(command)
            self._attr_is_on = True
            if percentage is not None:
                await self.async_set_percentage(percentage)
            else:
                await self.async_set_percentage(33)
        except Exception as e:
            _LOGGER.error("Error turning on fan: %s", str(e))

    async def async_turn_off(self, **kwargs: Any) -> None:
        """Turn off the fan."""
        try:
            command = SPEED_MAPPING["off"]
            await self._send_command(command)
            self._attr_is_on = False
            self._attr_percentage = 0
        except Exception as e:
            _LOGGER.error("Error turning off fan: %s", str(e))

    async def async_set_percentage(self, percentage: int) -> None:
        """Set the speed percentage of the fan."""
        
        try:
            if percentage == 0:
                await self.async_turn_off()
                return
                    # 检查风扇是否已开启
            if not self._attr_is_on:
                # 发送"开"命令,确保风扇开启
                command_on = SPEED_MAPPING["on"]
                await self._send_command(command_on)

            if percentage <= 33:
                command = SPEED_MAPPING["low"]
                self._attr_percentage = 33
            elif percentage <= 66:
                command = SPEED_MAPPING["medium"]
                self._attr_percentage = 66
            else:
                command = SPEED_MAPPING["high"]
                self._attr_percentage = 100

            await self._send_command(command)
            self._attr_is_on = True
            self.async_write_ha_state()
        except Exception as e:
            _LOGGER.error("Error setting fan percentage: %s", str(e))

    async def async_update(self) -> None:
        """Fetch state from the device."""
        try:
            async with async_timeout.timeout(10):
                query_command = SPEED_MAPPING["query"]
                crc = self._calculate_crc16(query_command)
                crc_bytes = crc.to_bytes(2, byteorder='little')
                full_query = query_command + crc_bytes
                
                _LOGGER.info("Connecting to %s:%s for query", self._host, self._port)
                reader, writer = await asyncio.open_connection(self._host, self._port)
                _LOGGER.info("Connected successfully")
                
                _LOGGER.info("Sending query: %s", full_query.hex(' ').upper())
                writer.write(full_query)
                await writer.drain()
                
                _LOGGER.info("Waiting for response")
                try:
                    response = await asyncio.wait_for(reader.read(11), timeout=3.0)
                    if response:
                        _LOGGER.info("Received response: %s", response.hex(' ').upper())
                except asyncio.TimeoutError:
                    _LOGGER.error("No response received within 3 seconds")
                    return
                finally:
                    _LOGGER.info("Closing connection")
                    writer.close()
                    await writer.wait_closed()
                    _LOGGER.info("Connection closed")
                
                if len(response) >= 11:
                    data = response[:-2]
                    received_crc = int.from_bytes(response[-2:], byteorder='little')
                    calculated_crc = self._calculate_crc16(data)
                    
                    _LOGGER.info("Calculated CRC: 0x%04X", calculated_crc)
                    
                    if calculated_crc != received_crc:
                        _LOGGER.error("CRC check failed. Received: 0x%04X, Calculated: 0x%04X", 
                                     received_crc, calculated_crc)
                        return
                    
                    if response[4] == 0x00:
                        self._attr_is_on = False
                        self._attr_percentage = 0
                        return
                    
                    self._attr_is_on = (response[4] == 0x01)
                    
                    speed_value = response[8]
                    if speed_value == 0x01:
                        self._attr_percentage = 33
                    elif speed_value == 0x02:
                        self._attr_percentage = 66
                    elif speed_value == 0x04:
                        self._attr_percentage = 100
                    else:
                        self._attr_percentage = 0
                else:
                    _LOGGER.error("Response too short: %s", response.hex(' ').upper() if response else "No data")
                    
        except asyncio.TimeoutError:
            _LOGGER.error("Update operation timed out")
        except Exception as e:
            _LOGGER.error("Error updating fan state: %s", str(e))
游客,如果您要查看本帖隐藏内容请回复



评分

参与人数 1金钱 +16 收起 理由
sorrypqa + 16 大神666!

查看全部评分

回复

使用道具 举报

55

主题

1407

帖子

5996

积分

论坛DIY达人

积分
5996
金钱
4589
HASS币
20
发表于 2025-1-11 08:40:18 | 显示全部楼层
本帖最后由 sorrypqa 于 2025-1-11 08:41 编辑

谢谢大佬的分享,有技术就是牛!这里的每个字符都认识,但拼在一起就是看不懂......
回复

使用道具 举报

8

主题

55

帖子

457

积分

论坛分享达人

积分
457
金钱
402
HASS币
0
 楼主| 发表于 2025-1-11 21:08:42 | 显示全部楼层
sorrypqa 发表于 2025-1-11 08:40
谢谢大佬的分享,有技术就是牛!这里的每个字符都认识,但拼在一起就是看不懂...... ...

AI写的,我也不懂,主打能用就行
回复

使用道具 举报

0

主题

20

帖子

141

积分

注册会员

Rank: 2

积分
141
金钱
121
HASS币
0
发表于 2025-1-12 09:25:55 | 显示全部楼层
我这边开发商也用了套私有协议的rs485模块,分别控制灯、空调、新风和地暖,如何能获取他的相关协议?
回复

使用道具 举报

8

主题

55

帖子

457

积分

论坛分享达人

积分
457
金钱
402
HASS币
0
 楼主| 发表于 2025-1-12 18:27:41 | 显示全部楼层
vickzhou 发表于 2025-1-12 09:25
我这边开发商也用了套私有协议的rs485模块,分别控制灯、空调、新风和地暖,如何能获取他的相关协议? ...

把485接一个电脑,用开发商的设备控制,截获控制协议,再套用这个模板或者ai生成
回复

使用道具 举报

2

主题

7

帖子

152

积分

注册会员

Rank: 2

积分
152
金钱
145
HASS币
0
发表于 2025-1-16 09:39:17 | 显示全部楼层
6666666666666666666
回复

使用道具 举报

0

主题

27

帖子

165

积分

注册会员

Rank: 2

积分
165
金钱
138
HASS币
0
发表于 2025-1-16 10:58:02 | 显示全部楼层
所有新风适用吗
回复

使用道具 举报

8

主题

55

帖子

457

积分

论坛分享达人

积分
457
金钱
402
HASS币
0
 楼主| 发表于 2025-1-18 19:28:43 | 显示全部楼层
grtwall 发表于 2025-1-16 10:58
所有新风适用吗

提供的是思路,把几个指令放进去就可以控制任何新风
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|Hassbian

GMT+8, 2025-2-2 17:57 , Processed in 0.066915 second(s), 34 queries .

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表