Module sharkiq.ayla_api
Simple implementation of the Ayla networks API
Shark IQ robots use the Ayla networks IoT API to communicate with the device. Documentation can be found at: - https://developer.aylanetworks.com/apibrowser/ - https://docs.aylanetworks.com/cloud-services/api-browser/
Functions
-
Expand source code
def get_ayla_api(username: str, password: str, websession: Optional[aiohttp.ClientSession] = None, europe: bool = False): """ Get an AylaApi object. Args: username: The email address of the user. password: The password of the user. websession: A websession to use for the API. If None, a new session will be created. europe: If True, use the EU login URL and app ID/secret. Returns: An AylaApi object. """ if europe: return AylaApi(username, password, EU_SHARK_APP_ID, EU_AUTH0_CLIENT_ID, EU_SHARK_APP_SECRET, websession=websession, europe=europe) else: return AylaApi(username, password, SHARK_APP_ID, AUTH0_CLIENT_ID, SHARK_APP_SECRET, websession=websession)
Get an AylaApi object.
Args
username
- The email address of the user.
password
- The password of the user.
websession
- A websession to use for the API. If None, a new session will be created.
europe
- If True, use the EU login URL and app ID/secret.
Returns
An AylaApi object.
Classes
-
Expand source code
class AylaApi: """Simple Ayla Networks API wrapper.""" def __init__( self, email: str, password: str, app_id: str, auth0_client_id: str, app_secret: str, websession: Optional[aiohttp.ClientSession] = None, europe: bool = False): """ Initialize the AylaApi object. Args: email: The email address of the user. password: The password of the user. app_id: The app ID of the Ayla app. app_secret: The app secret of the Ayla app. websession: A websession to use for the API. If None, a new session will be created. europe: If True, use the EU login URL and app ID/secret. """ self._email = email self._password = password self._auth0_id_token = None # type: Optional[str] self._access_token = None # type: Optional[str] self._refresh_token = None # type: Optional[str] self._auth_expiration = None # type: Optional[datetime] self._is_authed = False # type: bool self._app_id = app_id self._auth0_client_id = auth0_client_id self._app_secret = app_secret self.websession = websession self.europe = europe async def ensure_session(self) -> aiohttp.ClientSession: """ Ensure that we have an aiohttp ClientSession. Returns: An aiohttp ClientSession. """ if self.websession is None: self.websession = aiohttp.ClientSession() return self.websession @property def _login_data(self) -> Dict[str, Dict]: """ Prettily formatted data for the login flow. Returns: A dict containing the login data. """ return { "app_id": self._app_id, "app_secret": self._app_secret, "token": self._auth0_id_token } @property def _auth0_login_data(self) -> Dict[str, Dict]: """ Prettily formatted data for the Auth0 login flow. Returns: A dict containing the login data. """ return { "grant_type": "password", "client_id": self._auth0_client_id, "username": self._email, "password": self._password, "scope": "openid profile email offline_access" } def _set_credentials(self, status_code: int, login_result: Dict): """ Update the internal credentials store. Args: status_code: The status code of the login response. login_result: The result of the login response. """ if status_code == 404: raise SharkIqAuthError(login_result["errors"] + " (Confirm app_id and app_secret are correct)") elif status_code == 401: raise SharkIqAuthError(login_result["errors"]) self._access_token = login_result["access_token"] self._refresh_token = login_result["refresh_token"] self._auth_expiration = datetime.now() + timedelta(seconds=login_result["expires_in"]) self._is_authed = True # TODO: Any non 200 status code should cause this to be false def _set_id_token(self, status_code: int, login_result: Dict): """ Update the ID token. Args: status_code: The status code of the login response. login_result: The result of the login response. """ if status_code == 401 and login_result["error"] == "requires_verification": raise SharkIqAuthError(login_result["error_description"] + " (Try logging in with the SharkClean app, then try again)") elif status_code == 401: raise SharkIqAuthError(login_result["error_description"] + " (Confirm client_id is correct)") elif status_code == 400 or status_code == 403: raise SharkIqAuthError(login_result["error_description"]) self._auth0_id_token = login_result["id_token"] def sign_in(self): """ Authenticate to Ayla API synchronously. """ auth0_login_data = self._auth0_login_data headers = { "User-Agent": "SharkClean/29562 CFNetwork/3826.400.120 Darwin/24.3.0" } auth0_resp = requests.post(f"{EU_AUTH0_URL if self.europe else AUTH0_URL:s}/oauth/token", json=auth0_login_data, headers=headers) self._set_id_token(auth0_resp.status_code, auth0_resp.json()) login_data = self._login_data resp = requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/api/v1/token_sign_in", json=login_data) self._set_credentials(resp.status_code, resp.json()) def refresh_auth(self): """ Refresh the authentication synchronously. """ refresh_data = {"user": {"refresh_token": self._refresh_token}} resp = requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/refresh_token.json", json=refresh_data) self._set_credentials(resp.status_code, resp.json()) async def async_sign_in(self): """ Authenticate to Ayla API asynchronously. """ session = await self.ensure_session() auth0_login_data = self._auth0_login_data headers = { "User-Agent": "SharkClean/29562 CFNetwork/3826.400.120 Darwin/24.3.0" } auth0_url = f"{EU_AUTH0_URL if self.europe else AUTH0_URL}/oauth/token" async with session.post(auth0_url, json=auth0_login_data, headers=headers) as auth0_resp: auth0_resp_json = await auth0_resp.json() self._set_id_token(auth0_resp.status, auth0_resp_json) login_data = self._login_data login_url = f"{EU_LOGIN_URL if self.europe else LOGIN_URL}/api/v1/token_sign_in" async with session.post(login_url, json=login_data) as login_resp: login_resp_json = await login_resp.json() self._set_credentials(login_resp.status, login_resp_json) async def async_refresh_auth(self): """ Refresh the authentication synchronously. """ session = await self.ensure_session() refresh_data = {"user": {"refresh_token": self._refresh_token}} async with session.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/refresh_token.json", json=refresh_data) as resp: self._set_credentials(resp.status, await resp.json()) @property def sign_out_data(self) -> Dict: """ Payload for the sign_out call. Returns: A dict containing the sign out data. """ return {"user": {"access_token": self._access_token}} def _clear_auth(self): """ Clear authentication state. """ self._is_authed = False self._access_token = None self._refresh_token = None self._auth_expiration = None def sign_out(self): """ Sign out and invalidate the access token. """ requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data) self._clear_auth() async def async_sign_out(self): """ Sign out and invalidate the access token. """ session = await self.ensure_session() async with session.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data) as _: pass self._clear_auth() @property def auth_expiration(self) -> Optional[datetime]: """ Get the time at which the authentication expires. Returns: The time at which the authentication expires. """ if not self._is_authed: return None elif self._auth_expiration is None: # This should not happen, but let's be ready if it does... raise SharkIqNotAuthedError("Invalid state. Please reauthorize.") else: return self._auth_expiration @property def token_expired(self) -> bool: """ Return true if the token has already expired. Returns: True if the token has already expired. """ if self.auth_expiration is None: return True return datetime.now() > self.auth_expiration @property def token_expiring_soon(self) -> bool: """ Return true if the token will expire soon. Returns: True if the token will expire soon. """ if self.auth_expiration is None: return True return datetime.now() > self.auth_expiration - timedelta(seconds=600) # Prevent timeout immediately following def check_auth(self, raise_expiring_soon=True): """ Confirm authentication status. Args: raise_expiring_soon: Raise an exception if the token will expire soon. Raises: SharkIqAuthExpiringError: If the token will expire soon. SharkIqAuthError: If the token has already expired. """ if not self._access_token or not self._is_authed or self.token_expired: self._is_authed = False raise SharkIqNotAuthedError() elif raise_expiring_soon and self.token_expiring_soon: raise SharkIqAuthExpiringError() @property def auth_header(self) -> Dict[str, str]: """ Get the authorization header. Returns: The authorization header. """ self.check_auth() return {"Authorization": f"auth_token {self._access_token:s}"} def _get_headers(self, fn_kwargs) -> Dict[str, str]: """ Extract the headers element from fn_kwargs, removing it if it exists and updating with self.auth_header. Args: fn_kwargs: The kwargs passed to the function. Returns: The headers. """ try: headers = fn_kwargs['headers'] except KeyError: headers = {} else: del fn_kwargs['headers'] headers.update(self.auth_header) return headers def request(self, method: str, url: str, **kwargs) -> requests.Response: """ Make a request to the Ayla API. Args: method: The HTTP method to use. url: The URL to request. **kwargs: Additional keyword arguments to pass to requests. Returns: The response from the request. """ headers = self._get_headers(kwargs) return requests.request(method, url, headers=headers, **kwargs) async def async_request(self, http_method: str, url: str, **kwargs): """ Make a request to the Ayla API. Args: http_method: The HTTP method to use. url: The URL to request. **kwargs: Additional keyword arguments to pass to requests. Returns: The response from the request. """ session = await self.ensure_session() headers = self._get_headers(kwargs) return session.request(http_method, url, headers=headers, **kwargs) def list_devices(self) -> List[Dict]: """ List the devices on the account. Returns: A list of devices. """ resp = self.request("get", f"{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/devices.json") devices = resp.json() if resp.status_code == 401: raise SharkIqAuthError(devices["error"]["message"]) return [d["device"] for d in devices] async def async_list_devices(self) -> List[Dict]: """ List the devices on the account. Returns: A list of devices. """ async with await self.async_request("get", f"{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/devices.json") as resp: devices = await resp.json() if resp.status == 401: raise SharkIqAuthError(devices["error"]["message"]) return [d["device"] for d in devices] def get_devices(self, update: bool = True) -> List[SharkIqVacuum]: """ Get the devices on the account. Args: update: Update the device list if it is out of date. Returns: A list of devices. """ devices = [SharkIqVacuum(self, d, europe=self.europe) for d in self.list_devices()] if update: for device in devices: device.get_metadata() device.update() return devices async def async_get_devices(self, update: bool = True) -> List[SharkIqVacuum]: """ Get the devices on the account. Args: update: Update the device list if it is out of date. Returns: A list of devices. """ devices = [SharkIqVacuum(self, d, europe=self.europe) for d in await self.async_list_devices()] if update: for device in devices: await device.async_get_metadata() await device.async_update() return devices
Simple Ayla Networks API wrapper.
Initialize the AylaApi object.
Args
email
- The email address of the user.
password
- The password of the user.
app_id
- The app ID of the Ayla app.
app_secret
- The app secret of the Ayla app.
websession
- A websession to use for the API. If None, a new session will be created.
europe
- If True, use the EU login URL and app ID/secret.
Instance variables
-
Expand source code
@property def auth_expiration(self) -> Optional[datetime]: """ Get the time at which the authentication expires. Returns: The time at which the authentication expires. """ if not self._is_authed: return None elif self._auth_expiration is None: # This should not happen, but let's be ready if it does... raise SharkIqNotAuthedError("Invalid state. Please reauthorize.") else: return self._auth_expiration
Get the time at which the authentication expires.
Returns
The time at which the authentication expires.
-
Expand source code
@property def auth_header(self) -> Dict[str, str]: """ Get the authorization header. Returns: The authorization header. """ self.check_auth() return {"Authorization": f"auth_token {self._access_token:s}"}
Get the authorization header.
Returns
The authorization header.
-
Expand source code
@property def sign_out_data(self) -> Dict: """ Payload for the sign_out call. Returns: A dict containing the sign out data. """ return {"user": {"access_token": self._access_token}}
Payload for the sign_out call.
Returns
A dict containing the sign out data.
-
Expand source code
@property def token_expired(self) -> bool: """ Return true if the token has already expired. Returns: True if the token has already expired. """ if self.auth_expiration is None: return True return datetime.now() > self.auth_expiration
Return true if the token has already expired.
Returns
True if the token has already expired.
-
Expand source code
@property def token_expiring_soon(self) -> bool: """ Return true if the token will expire soon. Returns: True if the token will expire soon. """ if self.auth_expiration is None: return True return datetime.now() > self.auth_expiration - timedelta(seconds=600) # Prevent timeout immediately following
Return true if the token will expire soon.
Returns
True if the token will expire soon.
Methods
-
Expand source code
async def async_get_devices(self, update: bool = True) -> List[SharkIqVacuum]: """ Get the devices on the account. Args: update: Update the device list if it is out of date. Returns: A list of devices. """ devices = [SharkIqVacuum(self, d, europe=self.europe) for d in await self.async_list_devices()] if update: for device in devices: await device.async_get_metadata() await device.async_update() return devices
Get the devices on the account.
Args
update
- Update the device list if it is out of date.
Returns
A list of devices.
-
Expand source code
async def async_list_devices(self) -> List[Dict]: """ List the devices on the account. Returns: A list of devices. """ async with await self.async_request("get", f"{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/devices.json") as resp: devices = await resp.json() if resp.status == 401: raise SharkIqAuthError(devices["error"]["message"]) return [d["device"] for d in devices]
List the devices on the account.
Returns
A list of devices.
-
Expand source code
async def async_refresh_auth(self): """ Refresh the authentication synchronously. """ session = await self.ensure_session() refresh_data = {"user": {"refresh_token": self._refresh_token}} async with session.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/refresh_token.json", json=refresh_data) as resp: self._set_credentials(resp.status, await resp.json())
Refresh the authentication synchronously.
-
Expand source code
async def async_request(self, http_method: str, url: str, **kwargs): """ Make a request to the Ayla API. Args: http_method: The HTTP method to use. url: The URL to request. **kwargs: Additional keyword arguments to pass to requests. Returns: The response from the request. """ session = await self.ensure_session() headers = self._get_headers(kwargs) return session.request(http_method, url, headers=headers, **kwargs)
Make a request to the Ayla API.
Args
http_method
- The HTTP method to use.
url
- The URL to request.
**kwargs
- Additional keyword arguments to pass to requests.
Returns
The response from the request.
-
Expand source code
async def async_sign_in(self): """ Authenticate to Ayla API asynchronously. """ session = await self.ensure_session() auth0_login_data = self._auth0_login_data headers = { "User-Agent": "SharkClean/29562 CFNetwork/3826.400.120 Darwin/24.3.0" } auth0_url = f"{EU_AUTH0_URL if self.europe else AUTH0_URL}/oauth/token" async with session.post(auth0_url, json=auth0_login_data, headers=headers) as auth0_resp: auth0_resp_json = await auth0_resp.json() self._set_id_token(auth0_resp.status, auth0_resp_json) login_data = self._login_data login_url = f"{EU_LOGIN_URL if self.europe else LOGIN_URL}/api/v1/token_sign_in" async with session.post(login_url, json=login_data) as login_resp: login_resp_json = await login_resp.json() self._set_credentials(login_resp.status, login_resp_json)
Authenticate to Ayla API asynchronously.
-
Expand source code
async def async_sign_out(self): """ Sign out and invalidate the access token. """ session = await self.ensure_session() async with session.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data) as _: pass self._clear_auth()
Sign out and invalidate the access token.
-
Expand source code
def check_auth(self, raise_expiring_soon=True): """ Confirm authentication status. Args: raise_expiring_soon: Raise an exception if the token will expire soon. Raises: SharkIqAuthExpiringError: If the token will expire soon. SharkIqAuthError: If the token has already expired. """ if not self._access_token or not self._is_authed or self.token_expired: self._is_authed = False raise SharkIqNotAuthedError() elif raise_expiring_soon and self.token_expiring_soon: raise SharkIqAuthExpiringError()
Confirm authentication status.
Args
raise_expiring_soon
- Raise an exception if the token will expire soon.
Raises
SharkIqAuthExpiringError
- If the token will expire soon.
SharkIqAuthError
- If the token has already expired.
-
Expand source code
async def ensure_session(self) -> aiohttp.ClientSession: """ Ensure that we have an aiohttp ClientSession. Returns: An aiohttp ClientSession. """ if self.websession is None: self.websession = aiohttp.ClientSession() return self.websession
Ensure that we have an aiohttp ClientSession.
Returns
An aiohttp ClientSession.
-
Expand source code
def get_devices(self, update: bool = True) -> List[SharkIqVacuum]: """ Get the devices on the account. Args: update: Update the device list if it is out of date. Returns: A list of devices. """ devices = [SharkIqVacuum(self, d, europe=self.europe) for d in self.list_devices()] if update: for device in devices: device.get_metadata() device.update() return devices
Get the devices on the account.
Args
update
- Update the device list if it is out of date.
Returns
A list of devices.
-
Expand source code
def list_devices(self) -> List[Dict]: """ List the devices on the account. Returns: A list of devices. """ resp = self.request("get", f"{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/devices.json") devices = resp.json() if resp.status_code == 401: raise SharkIqAuthError(devices["error"]["message"]) return [d["device"] for d in devices]
List the devices on the account.
Returns
A list of devices.
-
Expand source code
def refresh_auth(self): """ Refresh the authentication synchronously. """ refresh_data = {"user": {"refresh_token": self._refresh_token}} resp = requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/refresh_token.json", json=refresh_data) self._set_credentials(resp.status_code, resp.json())
Refresh the authentication synchronously.
-
Expand source code
def request(self, method: str, url: str, **kwargs) -> requests.Response: """ Make a request to the Ayla API. Args: method: The HTTP method to use. url: The URL to request. **kwargs: Additional keyword arguments to pass to requests. Returns: The response from the request. """ headers = self._get_headers(kwargs) return requests.request(method, url, headers=headers, **kwargs)
Make a request to the Ayla API.
Args
method
- The HTTP method to use.
url
- The URL to request.
**kwargs
- Additional keyword arguments to pass to requests.
Returns
The response from the request.
-
Expand source code
def sign_in(self): """ Authenticate to Ayla API synchronously. """ auth0_login_data = self._auth0_login_data headers = { "User-Agent": "SharkClean/29562 CFNetwork/3826.400.120 Darwin/24.3.0" } auth0_resp = requests.post(f"{EU_AUTH0_URL if self.europe else AUTH0_URL:s}/oauth/token", json=auth0_login_data, headers=headers) self._set_id_token(auth0_resp.status_code, auth0_resp.json()) login_data = self._login_data resp = requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/api/v1/token_sign_in", json=login_data) self._set_credentials(resp.status_code, resp.json())
Authenticate to Ayla API synchronously.
-
Expand source code
def sign_out(self): """ Sign out and invalidate the access token. """ requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data) self._clear_auth()
Sign out and invalidate the access token.