"""Support for Xiaomi device tracker."""
import logging
import time,math
from datetime import timedelta
# Numeric constants shared by the BD-09 / GCJ-02 / WGS-84 conversion helpers in this module.
# pi scaled by 3000/180; bd09_to_gcj02 uses it as the angular factor of its sin/cos terms.
x_pi = 3.14159265358979324 * 3000.0 / 180.0
# Module-local pi used by the offset polynomials and the degree-to-radian conversion.
pi = 3.1415926535897932384626
# Ellipsoid radius term in gcj02_to_wgs84
# (presumably the Krasovsky 1940 semi-major axis in metres -- TODO confirm).
a = 6378245.0
# Eccentricity-style term paired with `a` in gcj02_to_wgs84
# (presumably the squared first eccentricity of the same ellipsoid -- TODO confirm).
ee = 0.00669342162296594323
from homeassistant.const import * # noqa: F401
from homeassistant.components.device_tracker import (
DOMAIN as ENTITY_DOMAIN,
)
from homeassistant.components.device_tracker.const import SOURCE_TYPE_GPS
from homeassistant.components.device_tracker.config_entry import TrackerEntity
from . import (
DOMAIN,
CONF_MODEL,
XIAOMI_CONFIG_SCHEMA as PLATFORM_SCHEMA, # noqa: F401
MiotEntity,
async_setup_config_entry,
bind_services_to_entries,
)
from .core.miot_spec import (
MiotSpec,
MiotService,
)
# Module logger; handed to every tracker entity created by this platform.
_LOGGER = logging.getLogger(__name__)
# Key under hass.data initialised for this platform: '<entity domain>.<integration domain>'.
DATA_KEY = f'{ENTITY_DOMAIN}.{DOMAIN}'
# NOTE(review): presumably the polling interval Home Assistant applies to this platform -- confirm.
SCAN_INTERVAL = timedelta(seconds=60)
# Mapping passed to bind_services_to_entries; this platform registers no entries in it.
SERVICE_TO_METHOD = {}
# Convert BD-09 coordinates to WGS-84
def out_of_china(lng, lat):
    """Tell whether a point falls outside the bounding box this module treats as China.

    The box is open on every side: longitude strictly between 73.66 and 135.05,
    latitude strictly between 3.86 and 53.55. Points exactly on an edge count
    as outside.

    :param lng: longitude in degrees
    :param lat: latitude in degrees
    :return: True when the point is outside the box, False otherwise
    """
    inside_box = 73.66 < lng < 135.05 and 3.86 < lat < 53.55
    return not inside_box
def _lat(lng, lat):
    """Evaluate the latitude offset polynomial used by gcj02_to_wgs84.

    The result is the sum of a quadratic/sqrt polynomial in the inputs and
    three sinusoidal terms, each scaled by 2/3. The terms are added in a fixed
    left-to-right order so floating-point results stay reproducible.

    :param lng: longitude relative to the reference point (the caller passes lng - 105.0)
    :param lat: latitude relative to the reference point (the caller passes lat - 35.0)
    :return: raw latitude offset, before the caller scales it to degrees
    """
    sin = math.sin
    base = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat + \
        0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))
    lng_wave = (20.0 * sin(6.0 * lng * pi) + 20.0 * sin(2.0 * lng * pi)) * 2.0 / 3.0
    lat_wave_short = (20.0 * sin(lat * pi) + 40.0 * sin(lat / 3.0 * pi)) * 2.0 / 3.0
    lat_wave_long = (160.0 * sin(lat / 12.0 * pi) + 320 * sin(lat * pi / 30.0)) * 2.0 / 3.0
    # Accumulate in the same order as a running `ret +=` would.
    return ((base + lng_wave) + lat_wave_short) + lat_wave_long
def _lng(lng, lat):
    """Evaluate the longitude offset polynomial used by gcj02_to_wgs84.

    The result is the sum of a quadratic/sqrt polynomial in the inputs and
    three sinusoidal terms in the longitude, each scaled by 2/3. The terms are
    added in a fixed left-to-right order so floating-point results stay
    reproducible.

    :param lng: longitude relative to the reference point (the caller passes lng - 105.0)
    :param lat: latitude relative to the reference point (the caller passes lat - 35.0)
    :return: raw longitude offset, before the caller scales it to degrees
    """
    sin = math.sin
    base = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng + \
        0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))
    wave_fast = (20.0 * sin(6.0 * lng * pi) + 20.0 * sin(2.0 * lng * pi)) * 2.0 / 3.0
    wave_mid = (20.0 * sin(lng * pi) + 40.0 * sin(lng / 3.0 * pi)) * 2.0 / 3.0
    wave_slow = (150.0 * sin(lng / 12.0 * pi) + 300.0 * sin(lng / 30.0 * pi)) * 2.0 / 3.0
    # Accumulate in the same order as a running `ret +=` would.
    return ((base + wave_fast) + wave_mid) + wave_slow
def bd09_to_gcj02(bd_lon, bd_lat):
    """Convert a BD-09 coordinate pair to GCJ-02.

    The fixed offsets (0.0065, 0.006) are subtracted first. The remaining
    vector is then re-expressed in polar form, with small sinusoidal
    corrections applied to its length and angle.

    :param bd_lon: BD-09 longitude in degrees
    :param bd_lat: BD-09 latitude in degrees
    :return: two-element list ``[lng, lat]``
    """
    dx = bd_lon - 0.0065
    dy = bd_lat - 0.006
    radius = math.sqrt(dx * dx + dy * dy) - 0.00002 * math.sin(dy * x_pi)
    angle = math.atan2(dy, dx) - 0.000003 * math.cos(dx * x_pi)
    return [radius * math.cos(angle), radius * math.sin(angle)]
def gcj02_to_wgs84(lng, lat):
    """Convert a GCJ-02 coordinate pair to (approximate) WGS-84.

    Points that out_of_china() reports as outside the box are returned
    unchanged. For the rest, the offset is computed at the given point and
    subtracted from it, which is written here as ``2 * p - (p + offset)``.

    :param lng: GCJ-02 longitude in degrees
    :param lat: GCJ-02 latitude in degrees
    :return: two-element list ``[lng, lat]``
    """
    if out_of_china(lng, lat):
        return [lng, lat]

    # The raw offsets are evaluated relative to the (105, 35) reference point.
    rel_lng = lng - 105.0
    rel_lat = lat - 35.0
    raw_dlat = _lat(rel_lng, rel_lat)
    raw_dlng = _lng(rel_lng, rel_lat)

    lat_rad = lat / 180.0 * pi
    sin_lat = math.sin(lat_rad)
    w = 1 - ee * sin_lat * sin_lat
    sqrt_w = math.sqrt(w)

    # Scale the raw offsets to degrees using the ellipsoid terms `a` and `ee`.
    offset_lat = (raw_dlat * 180.0) / ((a * (1 - ee)) / (w * sqrt_w) * pi)
    offset_lng = (raw_dlng * 180.0) / (a / sqrt_w * math.cos(lat_rad) * pi)

    shifted_lat = lat + offset_lat
    shifted_lng = lng + offset_lng
    return [lng * 2 - shifted_lng, lat * 2 - shifted_lat]
def bd09_to_wgs84(bd_lon, bd_lat):
    """Convert a BD-09 coordinate pair to WGS-84 by chaining the two conversions.

    :param bd_lon: BD-09 longitude in degrees
    :param bd_lat: BD-09 latitude in degrees
    :return: two-element list ``[lng, lat]`` as produced by gcj02_to_wgs84
    """
    return gcj02_to_wgs84(*bd09_to_gcj02(bd_lon, bd_lat))
async def async_setup_entry(hass, config_entry, async_add_entities):
    """Set up the tracker platform for a config entry.

    All work is delegated to async_setup_config_entry, which receives this
    module's async_setup_platform and the device_tracker entity domain.
    """
    await async_setup_config_entry(
        hass,
        config_entry,
        async_setup_platform,
        async_add_entities,
        ENTITY_DOMAIN,
    )
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
    """Create the tracker entities for one configured device and register them.

    :param hass: Home Assistant instance; also stored into ``config['hass']``
    :param config: device configuration mapping (the model is read from CONF_MODEL)
    :param async_add_entities: callback used to add the created entities
    :param discovery_info: unused
    """
    hass.data.setdefault(DATA_KEY, {})
    hass.data[DOMAIN]['add_entities'][ENTITY_DOMAIN] = async_add_entities
    config['hass'] = hass

    model = str(config.get(CONF_MODEL) or '')
    spec = hass.data[DOMAIN]['miot_specs'].get(model)
    is_xiaoxun_watch = 'xiaoxun.watch.' in model

    trackers = []
    if isinstance(spec, MiotSpec):
        for service in spec.get_services('watch', 'rearview_mirror', 'head_up_display'):
            if is_xiaoxun_watch:
                trackers.append(XiaoxunWatchTrackerEntity(config, service))
                continue
            # Other devices only get a tracker when the service exposes coordinates.
            if service.get_property('latitude', 'longitude'):
                trackers.append(MiotTrackerEntity(config, service))

    # Fallback for xiaoxun models that yielded no entity from the spec, e.g.
    #   xiaoxun.watch.sw763
    #   xiaoxun.tracker.v1
    if not trackers and (is_xiaoxun_watch or 'xiaoxun.tracker.' in model):
        trackers.append(XiaoxunWatchTrackerEntity(config))

    for tracker in trackers:
        hass.data[DOMAIN]['entities'][tracker.unique_id] = tracker
    async_add_entities(trackers, update_before_add=True)
    bind_services_to_entries(hass, SERVICE_TO_METHOD)
class MiotTrackerEntity(MiotEntity, TrackerEntity):
    """GPS tracker entity whose position is read from MIoT service properties."""

    # Last known coordinates; None until an update provides them.
    _attr_latitude = None
    _attr_longitude = None
    # Human-readable address reported by the device, if any.
    _attr_location_name = None
    # Accuracy radius reported by location_accuracy; defaults to 0.
    _attr_location_accuracy = 0
    # When True, location_name is always reported as None.
    _disable_location_name = False

    def __init__(self, config, miot_service: MiotService = None):
        """Initialise the entity with its config and optional MIoT service."""
        super().__init__(miot_service, config=config, logger=_LOGGER)

    async def async_added_to_hass(self):
        """Read the 'disable_location_name' custom option once the entity is added."""
        await super().async_added_to_hass()
        self._disable_location_name = self.custom_config_bool('disable_location_name')

    async def async_update(self):
        """Refresh state, then copy the coordinates and address from the state attributes."""
        await super().async_update()
        if not (self._available and self._miot_service):
            return
        service = self._miot_service

        # Each property that the service exposes is copied to its matching attribute.
        property_targets = (
            ('latitude', '_attr_latitude'),
            ('longitude', '_attr_longitude'),
            ('current_address', '_attr_location_name'),
        )
        for prop_name, attr_name in property_targets:
            prop = service.get_property(prop_name)
            if prop:
                setattr(self, attr_name, prop.from_dict(self._state_attrs))

        for driving_prop in service.get_properties('driving_status'):
            self._update_sub_entities(driving_prop, None, 'binary_sensor')

    @property
    def should_poll(self):
        """Return True: this entity is refreshed by polling."""
        return True

    @property
    def source_type(self):
        """Return the source type of the device, which is always GPS here."""
        return SOURCE_TYPE_GPS

    @property
    def latitude(self):
        """Return the last known latitude of the device."""
        return self._attr_latitude

    @property
    def longitude(self):
        """Return the last known longitude of the device."""
        return self._attr_longitude

    @property
    def location_name(self):
        """Return the location name of the device, or None when it is disabled."""
        return None if self._disable_location_name else self._attr_location_name

    @property
    def location_accuracy(self):
        """Return the location accuracy of the device.

        Value in meters.
        """
        return self._attr_location_accuracy

    @property
    def battery_level(self):
        """Return the battery level of the device, or None when it is not available.

        The entity's own service is checked first, followed by every 'battery'
        service of the same spec; the first 'battery_level' property found wins.
        """
        service = self._miot_service
        if not service:
            return None
        candidates = [service, *service.spec.get_services('battery')]
        for candidate in candidates:
            if prop := candidate.get_property('battery_level'):
                return prop.from_dict(self._state_attrs)
        return None
class XiaoxunWatchTrackerEntity(MiotTrackerEntity):
    """Tracker for Xiaoxun devices whose position is fetched from the cloud 'third/api' call.

    The cloud reports the position as a 'lng,lat' string, which is converted
    with bd09_to_wgs84 before being stored on the entity.
    """

    def __init__(self, config, miot_service: MiotService = None, miot_spec: MiotSpec = None):
        """Remember the optional spec and initialise the base tracker."""
        self._miot_spec = miot_spec
        super().__init__(config=config, miot_service=miot_service)

    @property
    def device_eid(self):
        """Return the device id with the 'xiaoxun.' marker removed."""
        did = f'{self.miot_did}'
        return did.replace('xiaoxun.', '')

    async def async_update(self):
        """Run the regular MIoT update, then refresh the location from the cloud."""
        await super().async_update()
        await self.update_location()

    @staticmethod
    def _parse_lng_lat(raw):
        """Parse a 'lng,lat' string into a ``(lng, lat)`` float tuple.

        :param raw: value of the 'location' field; may be missing or malformed
        :return: ``(lng, lat)`` or None when two numbers cannot be extracted
        """
        if not raw:
            return None
        parts = str(raw).split(',')
        if len(parts) < 2:
            return None
        try:
            return float(parts[0]), float(parts[1])
        except ValueError:
            return None

    async def update_location(self):
        """Fetch the latest location from the cloud and store it on the entity.

        Nothing is changed when the device id or cloud client is missing, when
        the response carries no location record, or when the record's
        coordinates cannot be parsed. The last two cases are logged as warnings.
        """
        did = f'{self.miot_did}'
        mic = self.xiaomi_cloud
        if not did or not mic:
            return
        pms = {
            'app_id': '10025',
            'dids': [did],
            'params': {
                'CID': 50031,
                'model': self._model,
                # NOTE(review): epoch seconds divided by 1000 looks unusual for
                # this field; kept unchanged -- confirm against the API.
                'SN': int(time.time() / 1000),
                'PL': {
                    'Size': 1,
                    'Key': '78999898989898998',
                    'EID': self.device_eid,
                },
            },
        }
        rdt = await mic.async_request_api('third/api', pms) or {}

        # Any level of the response may be missing or null; fall back to an
        # empty mapping instead of raising AttributeError.
        records = ((rdt.get('result') or {}).get('PL') or {}).get('List') or {}
        loc = {}
        if isinstance(records, dict):
            for record in records.values():
                # Only the first record is used.
                loc = (record or {}).get('result') or {}
                break
        if not loc:
            self.logger.warning('%s: Got xiaoxun watch location faild: %s', self.name_model, rdt)
            return
        self.logger.debug('%s: Got xiaoxun watch location: %s', self.name_model, rdt)

        coords = self._parse_lng_lat(loc.get('location'))
        if coords is None:
            # A missing or malformed 'location' must not abort the whole update.
            self.logger.warning('%s: Invalid xiaoxun watch location: %s', self.name_model, loc)
            return
        # Convert the reported coordinates to WGS-84.
        lng, lat = bd09_to_wgs84(*coords)
        self._attr_latitude = float(lat)
        self._attr_longitude = float(lng)
        self._attr_location_name = loc.get('desc')
        self._attr_location_accuracy = int(loc.get('radius') or 0)

        # Coerce to str so a null or numeric timestamp can still be sliced.
        tim = str(loc.get('timestamp') or '')
        self.update_attrs({
            'timestamp': f'{tim[0:4]}-{tim[4:6]}-{tim[6:8]} {tim[8:10]}:{tim[10:12]}:{tim[12:14]}',
        })