"""Asynchronous client for Invoxia API."""
# pylint: disable=R0801 # Code duplicated with sync client
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Any, List, Optional
import aiohttp
from .datatypes import Device, Tracker, TrackerConfig, TrackerData, TrackerStatus, User
from .exceptions import HttpException, UnknownAnswerScheme
from .url_provider import UrlProvider
if TYPE_CHECKING:
from .config import Config
[docs]class AsyncClient:
"""Asynchronous client for Invoxia API."""
def __init__(self, config: Config, session: Optional[aiohttp.ClientSession] = None):
"""Initialize the Client with given configuration."""
self._cfg: Config = config
self._url_provider = UrlProvider(api_url=config.api_url)
self._session: Optional[aiohttp.ClientSession] = session
self._external_session = session is not None
async def __aenter__(self):
"""Enter context manager"""
await self._get_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager."""
await self.close()
async def _get_session(self) -> aiohttp.ClientSession:
"""Open the session if needed and return it."""
if self._session is None:
auth = self.get_auth(self._cfg)
self._session = aiohttp.ClientSession(auth=auth)
return self._session
[docs] @classmethod
def get_auth(cls, config: Config) -> aiohttp.BasicAuth:
"""Form the authentication instance associated to a config."""
return aiohttp.BasicAuth(login=config.username, password=config.password)
async def _query(self, url: str) -> Any:
"""Query the API asynchronously and return the decoded JSON response."""
# Run the request
session = await self._get_session()
async with session.get(url) as resp:
# Extract JSON answer if possible
json_answer = None
try:
json_answer = await resp.json()
except aiohttp.ContentTypeError:
pass
# Raise known exception if required
exception = HttpException.get(resp.status)
if exception is not None:
raise exception(json_answer=json_answer)
# Raise unknown exception if required
resp.raise_for_status()
return json_answer
[docs] async def close(self):
"""Close current session."""
if self._session is not None and not self._external_session:
await self._session.close()
[docs] async def get_user(self, user_id: int) -> User:
"""
Return a user referenced by its id.
:param user_id: ID of the user to retrieve
:type user_id: int
:return: User instance associated to given ID
:rtype: User
:raise UnauthorizedQuery: Credentials are invalid
:raise ForbiddenQuery: User of given ID is not linked to
current account
:raise aiohttp.ClientResponseError: Unexpected HTTP error
during API call
"""
data = await self._query(self._url_provider.user(user_id))
try:
return User(**data)
except TypeError as err:
raise UnknownAnswerScheme(data, err.args[0]) from err
[docs] async def get_users(self) -> List[User]:
"""
Return all users associated to credentials.
The API definition seems to indicate that multiple users
can be associated to a single account (probably for pro subscriptions).
For public consumers, this methods will return a single user.
:return: List of User instances associated to account
:rtype: List[User]
:raise UnauthorizedQuery: Credentials are invalid
:raise aiohttp.ClientResponseError: Unexpected HTTP error
during API call
"""
data = await self._query(self._url_provider.users())
users: List[User] = []
for item in data:
try:
users.append(User(**item))
except TypeError as err:
raise UnknownAnswerScheme(item, err.args[0]) from err
return users
[docs] async def get_device(self, device_id: int) -> Device:
"""
Return a device referenced by its id.
:param device_id: Unique identifier of a device
:type device_id: int
:return: Device instance of given id
:rtype: Device
:raise UnauthorizedQuery: Credentials are invalid
:raise ForbiddenQuery: Device of given ID is not linked to
current account
:raise aiohttp.ClientResponseError: Unexpected HTTP error
during API call
"""
data = await self._query(self._url_provider.device(device_id))
return Device.get(data)
[docs] async def get_devices(self, kind: Optional[str] = None) -> List[Device]:
"""
Return devices associated to credentials.
By default, all devices (included associated smartphones) are
returned. The `kind` parameter allows to filter only
devices of a given type ('android', 'iphone' or 'tracker').
:param kind: kind of devices to retrieve
:type kind: str, optional
:return: List of retrieved devices
:rtype: List[Device]
:raise UnauthorizedQuery: Credentials are invalid
:raise aiohttp.ClientResponseError: Unexpected HTTP error
during API call
:raise KeyError: Undefined kind requested
"""
data = await self._query(self._url_provider.devices(kind=kind))
return [Device.get(item) for item in data]
[docs] async def get_trackers(self) -> List[Tracker]:
"""
Query API for the list of trackers associated to credentials.
:return: Tracker devices associated to current account
:rtype: List[Tracker]
:raise UnauthorizedQuery: Credentials are invalid
:raise requests.HTTPError: Unexpected HTTP error during API call
"""
data = await self._query(self._url_provider.devices(kind="tracker"))
trackers: List[Tracker] = []
for item in data:
device = Device.get(item)
if isinstance(device, Tracker):
trackers.append(device)
return trackers
[docs] async def get_locations(
self,
device: Tracker,
not_before: Optional[datetime.datetime] = None,
not_after: Optional[datetime.datetime] = None,
max_count: int = 20,
) -> List[TrackerData]:
"""
Extract the list of tracker locations.
:param device: The tracker instance whose locations must be extracted.
:type device: Tracker
:param not_before: Minimum date-time of the locations to extract.
:type not_before: datetime.datetime, optional
:param not_after: Maximum date-time of the locations to extract.
:type not_after: datetime.datetime, optional
:param max_count: Maximum count of position to extract. Note that
one API query yields 20 locations.
:type max_count: int, optional
:return: List of extracted locations
:rtype: List[TrackerData]
:raise UnauthorizedQuery: Credentials are invalid
:raise ForbiddenQuery: provided Device is not linked to
current account (should not happen if Device was obtained
with :meth:`get_devices`).
:raise aiohttp.ClientResponseError: Unexpected HTTP error
during API call
"""
not_before_ts: Optional[int] = (
None if not_before is None else not_before.timestamp().__ceil__()
)
not_after_ts: Optional[int] = (
None if not_after is None else not_after.timestamp().__floor__()
)
res = []
while max_count > 0:
data = await self._query(
self._url_provider.locations(
device_id=device.id,
not_after=not_after_ts,
not_before=not_before_ts,
)
) # Seems to return between 0 and 20 locations.
# Stop if not result returned.
if len(data) == 0:
break
# Pop returned results one by one and stop if max_count is reached.
while len(data) > 0:
tracker_data = data.pop(0)
try:
res.append(TrackerData(**tracker_data))
except TypeError as err:
raise UnknownAnswerScheme(tracker_data, err.args[0]) from err
max_count -= 1
if max_count <= 0:
break
# Update not_after to match the currently oldest location.
not_after_ts = res[-1].datetime.timestamp().__floor__()
return res
[docs] async def get_tracker_status(self, device: Tracker) -> TrackerStatus:
"""
Get the current status of a given tracker.
:param device: The tracker instance whose status is queried.
:type device: Tracker
:return: Current status of the tracker
:rtype: TrackerStatus
"""
data = await self._query(self._url_provider.tracker_status(device_id=device.id))
try:
return TrackerStatus(**data)
except TypeError as err:
raise UnknownAnswerScheme(data, err.args[0]) from err
[docs] async def get_tracker_config(self, device: Tracker) -> TrackerConfig:
"""
Get the current configuration of a given tracker.
:param device: The tracker instance whose configuration is queried.
:type device: Tracker
:return: Current config of the tracker
:rtype: TrackerConfig
"""
data = await self._query(self._url_provider.tracker_config(device_id=device.id))
try:
return TrackerConfig(**data)
except TypeError as err:
raise UnknownAnswerScheme(data, err.args[0]) from err