47 lines
1.2 KiB
Python
47 lines
1.2 KiB
Python
import json
|
|
import logging
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
|
|
from urllib.request import urlopen
|
|
|
|
# Module-level logger, named after this module via __name__.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
class KumiSensors:
    """Read a JSON document from a host URL and expose selected entries of it.

    Every ``get_*`` coroutine performs a fresh request; nothing is cached
    between calls.
    """

    # The annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, host: str, hass: "HomeAssistant") -> None:
        """Initialize.

        Args:
            host: URL to fetch. When it contains no ``://`` scheme separator,
                ``http://`` is prepended.
            hass: Object whose ``async_add_executor_job`` is used to run the
                blocking fetch.
        """
        if "://" not in host:
            host = "http://" + host

        self.host = host
        self.hass = hass

    def fetch_data(self) -> dict:
        """Fetch ``self.host`` and return the response body parsed as JSON.

        This call blocks. Errors raised by ``urlopen`` or by the JSON parser
        are not caught here.
        """
        # The context manager closes the response even when parsing fails;
        # otherwise the connection would stay open until garbage collection.
        with urlopen(self.host) as response:
            return json.load(response)

    async def get_data(self) -> dict:
        """Run the blocking ``fetch_data`` through ``hass.async_add_executor_job``."""
        return await self.hass.async_add_executor_job(self.fetch_data)

    async def connect(self) -> bool:
        """Test if we can connect to the host.

        Returns:
            True when ``get_data`` completes, False when it raises. The
            exception is logged at error level and not re-raised.
        """
        try:
            await self.get_data()
            return True
        except Exception as e:  # pylint: disable=broad-except
            # Deliberately broad: any failure means "cannot connect".
            _LOGGER.error(repr(e))
            return False

    async def _get_field(self, key: str):
        """Fetch fresh data and return the entry stored under ``key``."""
        data = await self.get_data()
        return data[key]

    async def get_identifier(self):
        """Fetch fresh data and return its ``"identifier"`` entry."""
        return await self._get_field("identifier")

    async def get_temperature(self):
        """Fetch fresh data and return its ``"temperature"`` entry."""
        return await self._get_field("temperature")

    async def get_pressure(self):
        """Fetch fresh data and return its ``"pressure"`` entry."""
        return await self._get_field("pressure")

    async def get_humidity(self):
        """Fetch fresh data and return its ``"humidity"`` entry."""
        return await self._get_field("humidity")
|