"""
Support for TP-Link routers.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.tplink/
"""
import base64
from datetime import datetime
import hashlib
import json
import logging
import re

from aiohttp.hdrs import (
    ACCEPT, COOKIE, PRAGMA, REFERER, CONNECTION, KEEP_ALIVE, USER_AGENT,
    CONTENT_TYPE, CACHE_CONTROL, ACCEPT_ENCODING, ACCEPT_LANGUAGE)
import requests
import voluptuous as vol

from homeassistant.components.device_tracker import (
    DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import (
    CONF_HOST, CONF_PASSWORD, CONF_USERNAME, HTTP_HEADER_X_REQUESTED_WITH)
import homeassistant.helpers.config_validation as cv
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# 'no-cache' header value. NOTE(review): not referenced anywhere in the
# visible part of this file -- confirm whether it is still needed.
HTTP_HEADER_NO_CACHE = 'no-cache'
# Extend the base device tracker schema: host, password and username are all
# required and validated as strings.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_USERNAME): cv.string
})
def get_scanner(hass, config):
    """Build a TP-Link scanner from the platform configuration.

    The scanner is created from the ``DOMAIN`` section of ``config``. It is
    returned only when its initial scan succeeded (``success_init`` is
    truthy); otherwise ``None`` is returned. ``hass`` is accepted for the
    platform interface but is not used here.
    """
    tplink_scanner = TplinkDeviceScanner(config[DOMAIN])
    return tplink_scanner if tplink_scanner.success_init else None
class TpDeviceScanner(DeviceScanner):
    """Query a wireless router running TP-Link firmware.

    The scanner fetches the router's wireless station page over HTTP with
    basic authentication and extracts every MAC address found in it.
    """

    def __init__(self, config):
        """Initialize the scanner and run a first scan.

        ``config`` must provide ``CONF_HOST``, ``CONF_USERNAME`` and
        ``CONF_PASSWORD``. ``success_init`` stores the result of the initial
        ``_update_info`` call.
        """
        # Matches MAC addresses written as six dash-separated uppercase
        # hex pairs, e.g. "AA-BB-CC-DD-EE-FF".
        self.parse_macs = re.compile(r'(?:[0-9A-F]{2}-){5}[0-9A-F]{2}')

        self.host = config[CONF_HOST]
        self.username = config[CONF_USERNAME]
        self.password = config[CONF_PASSWORD]

        # Kept as an empty dict (not a list) on purpose: subclasses that
        # store a MAC -> name mapping rely on dict methods being available
        # even before the first successful scan.
        self.last_results = {}
        # Must come last: _update_info reads the attributes set above.
        self.success_init = self._update_info()

    def scan_devices(self):
        """Scan for new devices and return a list with found device IDs.

        If the refresh fails, the results of the last successful scan are
        returned.
        """
        self._update_info()
        return self.last_results

    # pylint: disable=no-self-use
    def get_device_name(self, device):
        """Return None; this scanner does not look up device names."""
        return None

    def _update_info(self):
        """Ensure the information from the TP-Link router is up to date.

        Return True if at least one MAC address was found, False otherwise
        (including when the router could not be reached).
        """
        _LOGGER.info("Loading wireless clients...")

        url = 'http://{}/userRpm/WlanStationRpm.htm'.format(self.host)
        referer = 'http://{}'.format(self.host)

        try:
            page = requests.get(
                url, auth=(self.username, self.password),
                headers={REFERER: referer}, timeout=4)
        except requests.exceptions.RequestException as err:
            # Honour the boolean contract instead of letting a timeout or
            # connection error escape into the caller.
            _LOGGER.error("Could not reach TP-Link router at %s: %s",
                          self.host, err)
            return False

        found_macs = self.parse_macs.findall(page.text)
        if not found_macs:
            return False

        self.last_results = [mac.replace("-", ":") for mac in found_macs]
        return True
class TplinkDeviceScanner(TpDeviceScanner):
    """Query the Archer C9 router with firmware version 150811 or higher.

    The scanner logs in to the router's luci web interface to obtain a
    session token (``stok``) and a ``sysauth`` cookie, posts a request for
    the wireless station list, and logs out again after every scan.
    ``last_results`` maps each client MAC address to the host name reported
    by the router.
    """

    def __init__(self, config):
        """Initialize the scanner.

        The token attributes are created before delegating to the parent
        initializer, because the parent runs ``_update_info``, which reads
        them.
        """
        self.stok = ''
        self.sysauth = ''
        super().__init__(config)

    def scan_devices(self):
        """Scan for new devices and return a list with found device IDs.

        The router session is closed after every scan. If the refresh
        fails, the MAC addresses of the last successful scan are returned.
        """
        self._update_info()
        self._log_out()
        return list(self.last_results)

    def get_device_name(self, device):
        """Return the host name the router reported for ``device``.

        Returns None when the MAC address is unknown or the router did not
        supply a host name.
        """
        return self.last_results.get(device)

    def _get_auth_tokens(self):
        """Log in to the router and store the session tokens.

        Return True when both ``stok`` and ``sysauth`` were obtained. On
        failure the stored tokens are left untouched and False is returned.
        """
        _LOGGER.info("Retrieving auth tokens...")

        url = 'http://{}/cgi-bin/luci/;stok=/login?form=login' \
            .format(self.host)
        referer = 'http://{}/webpages/login.html'.format(self.host)

        # If possible implement RSA encryption of password here.
        # Serialise with json.dumps so that quotes or backslashes in the
        # credentials cannot corrupt (or inject into) the payload.
        payload = json.dumps({
            'method': 'login',
            'params': {
                'username': self.username,
                'password': self.password,
            },
        })

        try:
            response = requests.post(
                url, data={'data': payload},
                headers={REFERER: referer}, timeout=4)
        except requests.exceptions.RequestException as err:
            _LOGGER.error("Could not reach TP-Link router at %s: %s",
                          self.host, err)
            return False

        try:
            stok = response.json()['result']['stok']
            # Capture only the cookie value, not any attributes after it.
            cookie_match = re.search(
                r'sysauth=([^;]+)', response.headers['set-cookie'])
            sysauth = cookie_match.group(1)
        except (ValueError, KeyError, TypeError, AttributeError):
            # ValueError: body is not JSON; KeyError: field or header is
            # missing; TypeError/AttributeError: unexpected JSON shape or
            # no sysauth cookie in the header.
            _LOGGER.error("Couldn't fetch auth tokens! Response was: %s",
                          response.text)
            return False

        if not stok:
            _LOGGER.error("Couldn't fetch auth tokens! Response was: %s",
                          response.text)
            return False

        # Tokens are secrets and are deliberately not written to the log.
        self.stok = stok
        self.sysauth = sysauth
        return True

    def _update_info(self):
        """Ensure the information from the TP-Link router is up to date.

        Return True if the client list was refreshed with at least one
        client, False otherwise.
        """
        if not self.stok or not self.sysauth:
            # Without a session the request below cannot succeed.
            if not self._get_auth_tokens():
                return False

        _LOGGER.info("Loading wireless clients...")

        url = ('http://{}/cgi-bin/luci/;stok={}/admin/wlan_station?'
               'form=stainfo').format(self.host, self.stok)
        referer = 'http://{}/webpages/index.html'.format(self.host)
        data = '{"method" : "get", "params": { "key_idx": "-1"}}'

        try:
            response = requests.post(
                url, data={'data': data}, headers={REFERER: referer},
                cookies={'sysauth': self.sysauth}, timeout=5)
        except requests.exceptions.RequestException as err:
            _LOGGER.error("Could not reach TP-Link router at %s: %s",
                          self.host, err)
            return False

        try:
            json_response = response.json()
        except ValueError:
            _LOGGER.error("Router didn't respond with JSON. "
                          "Check if credentials are correct")
            return False

        if not isinstance(json_response, dict):
            _LOGGER.error(
                "An unknown error happened while fetching data")
            return False

        if json_response.get('error_code') != '0':
            # NOTE(review): success is read from 'error_code' but the
            # timeout marker from 'errorcode'. Kept as in the original;
            # confirm against the router API which key is actually sent.
            if json_response.get('errorcode') == 'timeout':
                _LOGGER.info("Token timed out. Relogging on next scan")
                self.stok = ''
                self.sysauth = ''
                return False
            _LOGGER.error(
                "An unknown error happened while fetching data")
            return False

        try:
            stations = json_response['result']['sta_info']
            clients = {
                station['sta_mac'].replace('-', ':'):
                    station.get('host_name')
                for station in stations
            }
        except (KeyError, TypeError, AttributeError):
            _LOGGER.error("Unexpected client list format in router response")
            return False

        if not clients:
            # An empty list keeps the previous results.
            return False

        self.last_results = clients
        return True

    def _log_out(self):
        """Log out of the router admin interface and forget the tokens.

        Logging out is best effort: a network failure is logged but never
        raised, so it cannot discard the results of a successful scan. The
        stored tokens are always cleared.
        """
        if not self.stok or not self.sysauth:
            # No active session, nothing to log out of.
            self.stok = ''
            self.sysauth = ''
            return

        _LOGGER.info("Logging out of router admin interface...")

        url = ('http://{}/cgi-bin/luci/;stok={}/admin/system?'
               'form=logout').format(self.host, self.stok)
        referer = 'http://{}/webpages/index.html'.format(self.host)

        try:
            requests.post(
                url, params={'operation': 'write'},
                headers={REFERER: referer},
                cookies={'sysauth': self.sysauth}, timeout=4)
        except requests.exceptions.RequestException as err:
            _LOGGER.warning("Logging out of TP-Link router failed: %s", err)
        finally:
            self.stok = ''
            self.sysauth = ''