"""Platform for Custom Fan integration."""
import asyncio
import async_timeout
import logging
from datetime import timedelta
from typing import Any
from homeassistant.components.fan import (
FanEntity,
FanEntityFeature,
)
from .const import (
DEFAULT_HOST,
DEFAULT_NAME,
DEFAULT_PORT,
SPEED_MAPPING,
)
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Custom Fan platform."""
host = config.get("host", DEFAULT_HOST)
port = config.get("port", DEFAULT_PORT)
name = config.get("name", DEFAULT_NAME)
async_add_entities([CustomFan(host, port, name)])
class CustomFan(FanEntity):
"""Representation of a Custom Fan."""
_attr_supported_features = (
FanEntityFeature.SET_SPEED |
FanEntityFeature.TURN_ON |
FanEntityFeature.TURN_OFF
)
_attr_speed_count = 3
SCAN_INTERVAL = timedelta(seconds=60)
def __init__(self, host: str, port: int, name: str):
"""Initialize the fan."""
self._attr_is_on = False
self._attr_percentage = 0
self._host = host
self._port = port
self._attr_name = name
# 生成唯一标识符,这里使用主机和端口的组合作为示例
self._attr_unique_id = f"{self._host}:{self._port}"
@property
def unique_id(self) -> str:
"""Return the unique ID of this fan."""
return self._attr_unique_id
def _calculate_crc16(self, data: bytes) -> int:
"""Calculate CRC-16 (Modbus)."""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
async def _send_command(self, command: bytes) -> None:
"""Send command to the fan."""
try:
_LOGGER.info("Connecting to %s:%s", self._host, self._port)
crc = self._calculate_crc16(command)
crc_bytes = crc.to_bytes(2, byteorder='little')
full_command = command + crc_bytes
_LOGGER.info("Sending command: %s", full_command.hex(' ').upper())
reader, writer = await asyncio.open_connection(self._host, self._port)
_LOGGER.info("Connected successfully")
writer.write(full_command)
await writer.drain()
_LOGGER.info("Closing connection")
writer.close()
await writer.wait_closed()
_LOGGER.info("Connection closed")
except Exception as e:
_LOGGER.error("TCP connection error: %s", str(e))
async def async_turn_on(self, percentage: int | None = None, preset_mode: str | None = None, **kwargs: Any) -> None:
"""Turn on the fan."""
try:
command = SPEED_MAPPING["on"]
await self._send_command(command)
self._attr_is_on = True
if percentage is not None:
await self.async_set_percentage(percentage)
else:
await self.async_set_percentage(33)
except Exception as e:
_LOGGER.error("Error turning on fan: %s", str(e))
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn off the fan."""
try:
command = SPEED_MAPPING["off"]
await self._send_command(command)
self._attr_is_on = False
self._attr_percentage = 0
except Exception as e:
_LOGGER.error("Error turning off fan: %s", str(e))
async def async_set_percentage(self, percentage: int) -> None:
"""Set the speed percentage of the fan."""
try:
if percentage == 0:
await self.async_turn_off()
return
# 检查风扇是否已开启
if not self._attr_is_on:
# 发送"开"命令,确保风扇开启
command_on = SPEED_MAPPING["on"]
await self._send_command(command_on)
if percentage <= 33:
command = SPEED_MAPPING["low"]
self._attr_percentage = 33
elif percentage <= 66:
command = SPEED_MAPPING["medium"]
self._attr_percentage = 66
else:
command = SPEED_MAPPING["high"]
self._attr_percentage = 100
await self._send_command(command)
self._attr_is_on = True
self.async_write_ha_state()
except Exception as e:
_LOGGER.error("Error setting fan percentage: %s", str(e))
async def async_update(self) -> None:
"""Fetch state from the device."""
try:
async with async_timeout.timeout(10):
query_command = SPEED_MAPPING["query"]
crc = self._calculate_crc16(query_command)
crc_bytes = crc.to_bytes(2, byteorder='little')
full_query = query_command + crc_bytes
_LOGGER.info("Connecting to %s:%s for query", self._host, self._port)
reader, writer = await asyncio.open_connection(self._host, self._port)
_LOGGER.info("Connected successfully")
_LOGGER.info("Sending query: %s", full_query.hex(' ').upper())
writer.write(full_query)
await writer.drain()
_LOGGER.info("Waiting for response")
try:
response = await asyncio.wait_for(reader.read(11), timeout=3.0)
if response:
_LOGGER.info("Received response: %s", response.hex(' ').upper())
except asyncio.TimeoutError:
_LOGGER.error("No response received within 3 seconds")
return
finally:
_LOGGER.info("Closing connection")
writer.close()
await writer.wait_closed()
_LOGGER.info("Connection closed")
if len(response) >= 11:
data = response[:-2]
received_crc = int.from_bytes(response[-2:], byteorder='little')
calculated_crc = self._calculate_crc16(data)
_LOGGER.info("Calculated CRC: 0x%04X", calculated_crc)
if calculated_crc != received_crc:
_LOGGER.error("CRC check failed. Received: 0x%04X, Calculated: 0x%04X",
received_crc, calculated_crc)
return
if response[4] == 0x00:
self._attr_is_on = False
self._attr_percentage = 0
return
self._attr_is_on = (response[4] == 0x01)
speed_value = response[8]
if speed_value == 0x01:
self._attr_percentage = 33
elif speed_value == 0x02:
self._attr_percentage = 66
elif speed_value == 0x04:
self._attr_percentage = 100
else:
self._attr_percentage = 0
else:
_LOGGER.error("Response too short: %s", response.hex(' ').upper() if response else "No data")
except asyncio.TimeoutError:
_LOGGER.error("Update operation timed out")
except Exception as e:
_LOGGER.error("Error updating fan state: %s", str(e))