Module sharkiq.ayla_api

Simple implementation of the Ayla networks API

Shark IQ robots use the Ayla Networks IoT API to communicate with the device. Documentation can be found at:
- https://developer.aylanetworks.com/apibrowser/
- https://docs.aylanetworks.com/cloud-services/api-browser/

Functions

def get_ayla_api(username: str,
password: str,
websession: aiohttp.client.ClientSession | None = None,
europe: bool = False)
Expand source code
def get_ayla_api(username: str, password: str, websession: Optional[aiohttp.ClientSession] = None, europe: bool = False):
    """
    Build an AylaApi client configured for the requested region.

    Args:
        username: The email address of the user.
        password: The password of the user.
        websession: An optional aiohttp session that is handed to the client unchanged.
        europe: When truthy, the EU app ID, Auth0 client ID and app secret are used and the
            flag is forwarded to the client; otherwise the default constants are used.

    Returns:
        A new AylaApi object.
    """
    if not europe:
        # Default region: the `europe` argument of AylaApi keeps its own default.
        return AylaApi(username, password, SHARK_APP_ID, AUTH0_CLIENT_ID, SHARK_APP_SECRET, websession=websession)

    return AylaApi(
        username,
        password,
        EU_SHARK_APP_ID,
        EU_AUTH0_CLIENT_ID,
        EU_SHARK_APP_SECRET,
        websession=websession,
        europe=europe,
    )

Get an AylaApi object.

Args

username
The email address of the user.
password
The password of the user.
websession
A websession to use for the API. If None, a new session will be created.
europe
If True, use the EU login URL and app ID/secret.

Returns

An AylaApi object.

Classes

class AylaApi (email: str,
password: str,
app_id: str,
auth0_client_id: str,
app_secret: str,
websession: aiohttp.client.ClientSession | None = None,
europe: bool = False)
Expand source code
class AylaApi:
    """Simple Ayla Networks API wrapper."""

    def __init__(
            self,
            email: str,
            password: str,
            app_id: str,
            auth0_client_id: str,
            app_secret: str,
            websession: Optional[aiohttp.ClientSession] = None,
            europe: bool = False):
        """
        Initialize the AylaApi object.

        Args:
            email: The email address of the user.
            password: The password of the user.
            app_id: The app ID of the Ayla app.
            auth0_client_id: The Auth0 client ID sent as ``client_id`` in the Auth0 login request.
            app_secret: The app secret of the Ayla app.
            websession: A websession to use for the API.  If None, a new session will be created.
            europe: If True, use the EU login URL and app ID/secret.
        """
        self._email = email
        self._password = password
        self._auth0_id_token = None  # type: Optional[str]
        self._access_token = None  # type: Optional[str]
        self._refresh_token = None  # type: Optional[str]
        self._auth_expiration = None  # type: Optional[datetime]
        self._is_authed = False  # type: bool
        self._app_id = app_id
        self._auth0_client_id = auth0_client_id
        self._app_secret = app_secret
        self.websession = websession
        self.europe = europe

    async def ensure_session(self) -> aiohttp.ClientSession:
        """
        Ensure that we have an aiohttp ClientSession.

        A new session is created (and stored on ``self.websession``) only when none was
        supplied or created before.

        Returns:
            An aiohttp ClientSession.
        """
        if self.websession is None:
            self.websession = aiohttp.ClientSession()
        return self.websession

    @property
    def _login_data(self) -> Dict[str, Optional[str]]:
        """
        Prettily formatted data for the login flow.

        Returns:
            A dict containing the app ID, the app secret and the Auth0 ID token
            (None until the Auth0 step has stored one).
        """
        return {
            "app_id": self._app_id,
            "app_secret": self._app_secret,
            "token": self._auth0_id_token
        }

    @property
    def _auth0_login_data(self) -> Dict[str, str]:
        """
        Prettily formatted data for the Auth0 login flow.

        Returns:
            A dict containing the Auth0 password-grant login data.
        """
        return {
            "grant_type": "password",
            "client_id": self._auth0_client_id,
            "username": self._email,
            "password": self._password,
            "scope": "openid profile email offline_access"
        }

    def _set_credentials(self, status_code: int, login_result: Dict):
        """
        Update the internal credentials store.

        Args:
            status_code: The status code of the login response.
            login_result: The result of the login response.

        Raises:
            SharkIqAuthError: If the status code is 404 or 401.
        """
        if status_code == 404:
            raise SharkIqAuthError(login_result["errors"] + " (Confirm app_id and app_secret are correct)")
        elif status_code == 401:
            raise SharkIqAuthError(login_result["errors"])

        self._access_token = login_result["access_token"]
        self._refresh_token = login_result["refresh_token"]
        self._auth_expiration = datetime.now() + timedelta(seconds=login_result["expires_in"])
        self._is_authed = True  # TODO: Any non 200 status code should cause this to be false

    def _set_id_token(self, status_code: int, login_result: Dict):
        """
        Update the ID token.

        Args:
            status_code: The status code of the login response.
            login_result: The result of the login response.

        Raises:
            SharkIqAuthError: If the status code is 400, 401 or 403.
        """
        if status_code == 401 and login_result["error"] == "requires_verification":
            raise SharkIqAuthError(login_result["error_description"] + " (Try logging in with the SharkClean app, then try again)")
        elif status_code == 401:
            raise SharkIqAuthError(login_result["error_description"] + " (Confirm client_id is correct)")
        elif status_code == 400 or status_code == 403:
            raise SharkIqAuthError(login_result["error_description"])

        self._auth0_id_token = login_result["id_token"]

    def sign_in(self):
        """
        Authenticate to Ayla API synchronously.

        First obtains an ID token from Auth0, then exchanges it for Ayla credentials.
        """
        auth0_login_data = self._auth0_login_data
        headers = {
            "User-Agent": "SharkClean/29562 CFNetwork/3826.400.120 Darwin/24.3.0"
        }

        auth0_resp = requests.post(f"{EU_AUTH0_URL if self.europe else AUTH0_URL:s}/oauth/token", json=auth0_login_data, headers=headers)
        self._set_id_token(auth0_resp.status_code, auth0_resp.json())

        # Must be read after _set_id_token: the payload embeds the freshly stored ID token.
        login_data = self._login_data
        resp = requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/api/v1/token_sign_in", json=login_data)
        self._set_credentials(resp.status_code, resp.json())

    def refresh_auth(self):
        """
        Refresh the authentication synchronously.
        """
        refresh_data = {"user": {"refresh_token": self._refresh_token}}
        resp = requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/refresh_token.json", json=refresh_data)
        self._set_credentials(resp.status_code, resp.json())

    async def async_sign_in(self):
        """
        Authenticate to Ayla API asynchronously.

        First obtains an ID token from Auth0, then exchanges it for Ayla credentials.
        """
        session = await self.ensure_session()

        auth0_login_data = self._auth0_login_data
        headers = {
            "User-Agent": "SharkClean/29562 CFNetwork/3826.400.120 Darwin/24.3.0"
        }
        auth0_url = f"{EU_AUTH0_URL if self.europe else AUTH0_URL}/oauth/token"
        async with session.post(auth0_url, json=auth0_login_data, headers=headers) as auth0_resp:
            auth0_resp_json = await auth0_resp.json()
            self._set_id_token(auth0_resp.status, auth0_resp_json)

        # Must be read after _set_id_token: the payload embeds the freshly stored ID token.
        login_data = self._login_data
        login_url = f"{EU_LOGIN_URL if self.europe else LOGIN_URL}/api/v1/token_sign_in"
        async with session.post(login_url, json=login_data) as login_resp:
            login_resp_json = await login_resp.json()
            self._set_credentials(login_resp.status, login_resp_json)

    async def async_refresh_auth(self):
        """
        Refresh the authentication asynchronously.
        """
        session = await self.ensure_session()
        refresh_data = {"user": {"refresh_token": self._refresh_token}}
        async with session.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/refresh_token.json", json=refresh_data) as resp:
            self._set_credentials(resp.status, await resp.json())

    @property
    def sign_out_data(self) -> Dict:
        """
        Payload for the sign_out call.

        Returns:
            A dict containing the sign out data.
        """
        return {"user": {"access_token": self._access_token}}

    def _clear_auth(self):
        """
        Clear authentication state.
        """
        self._is_authed = False
        self._access_token = None
        self._refresh_token = None
        self._auth_expiration = None

    def sign_out(self):
        """
        Sign out and invalidate the access token.
        """
        requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data)
        self._clear_auth()

    async def async_sign_out(self):
        """
        Sign out and invalidate the access token.
        """
        session = await self.ensure_session()
        async with session.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data) as _:
            pass
        self._clear_auth()

    @property
    def auth_expiration(self) -> Optional[datetime]:
        """
        Get the time at which the authentication expires.

        Returns:
            The time at which the authentication expires, or None when not authenticated.

        Raises:
            SharkIqNotAuthedError: If authenticated but no expiration time is stored.
        """
        if not self._is_authed:
            return None
        elif self._auth_expiration is None:  # This should not happen, but let's be ready if it does...
            raise SharkIqNotAuthedError("Invalid state.  Please reauthorize.")
        else:
            return self._auth_expiration

    @property
    def token_expired(self) -> bool:
        """
        Return true if the token has already expired.

        Returns:
            True if the token has already expired or there is no expiration time.
        """
        if self.auth_expiration is None:
            return True
        return datetime.now() > self.auth_expiration

    @property
    def token_expiring_soon(self) -> bool:
        """
        Return true if the token will expire soon.

        Returns:
            True if the token expires within the next 600 seconds or there is no expiration time.
        """
        if self.auth_expiration is None:
            return True
        return datetime.now() > self.auth_expiration - timedelta(seconds=600)  # Prevent timeout immediately following

    def check_auth(self, raise_expiring_soon=True):
        """
        Confirm authentication status.

        Args:
            raise_expiring_soon: Raise an exception if the token will expire soon.

        Raises:
            SharkIqNotAuthedError: If there is no access token, the client is not authenticated,
                or the token has already expired.
            SharkIqAuthExpiringError: If the token will expire soon and `raise_expiring_soon` is set.
        """
        if not self._access_token or not self._is_authed or self.token_expired:
            self._is_authed = False
            raise SharkIqNotAuthedError()
        elif raise_expiring_soon and self.token_expiring_soon:
            raise SharkIqAuthExpiringError()

    @property
    def auth_header(self) -> Dict[str, str]:
        """
        Get the authorization header.

        Authentication is verified via `check_auth` first, so this may raise.

        Returns:
            The authorization header.
        """
        self.check_auth()
        return {"Authorization": f"auth_token {self._access_token:s}"}

    def _get_headers(self, fn_kwargs) -> Dict[str, str]:
        """
        Extract the headers element from fn_kwargs, removing it if it exists,
        and return a copy of it updated with self.auth_header.

        The caller's own headers mapping is never modified, so the access token does not
        leak into a dict the caller may reuse.  A missing or None ``headers`` entry is
        treated as no extra headers.

        Args:
            fn_kwargs: The kwargs passed to the function.  Its ``headers`` key is removed in place.

        Returns:
            A new dict holding the caller's headers plus the authorization header.
        """
        caller_headers = fn_kwargs.pop('headers', None)
        # Copy rather than update in place: the mapping belongs to the caller.
        headers = dict(caller_headers) if caller_headers else {}
        headers.update(self.auth_header)
        return headers

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        Make a request to the Ayla API.

        Args:
            method: The HTTP method to use.
            url: The URL to request.
            **kwargs: Additional keyword arguments to pass to requests.

        Returns:
            The response from the request.
        """
        headers = self._get_headers(kwargs)
        return requests.request(method, url, headers=headers, **kwargs)

    async def async_request(self, http_method: str, url: str, **kwargs):
        """
        Make a request to the Ayla API.

        Args:
            http_method: The HTTP method to use.
            url: The URL to request.
            **kwargs: Additional keyword arguments to pass to the aiohttp session's request call.

        Returns:
            The un-awaited result of ``session.request(...)``; callers in this class enter it
            with ``async with await self.async_request(...) as resp``.
        """
        session = await self.ensure_session()
        headers = self._get_headers(kwargs)
        return session.request(http_method, url, headers=headers, **kwargs)

    def list_devices(self) -> List[Dict]:
        """
        List the devices on the account.

        Returns:
            A list of devices.

        Raises:
            SharkIqAuthError: If the API answers with status 401.
        """
        resp = self.request("get", f"{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/devices.json")
        devices = resp.json()
        if resp.status_code == 401:
            raise SharkIqAuthError(devices["error"]["message"])
        return [d["device"] for d in devices]

    async def async_list_devices(self) -> List[Dict]:
        """
        List the devices on the account.

        Returns:
            A list of devices.

        Raises:
            SharkIqAuthError: If the API answers with status 401.
        """
        async with await self.async_request("get", f"{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/devices.json") as resp:
            devices = await resp.json()
            if resp.status == 401:
                raise SharkIqAuthError(devices["error"]["message"])
        return [d["device"] for d in devices]

    def get_devices(self, update: bool = True) -> List[SharkIqVacuum]:
        """
        Get the devices on the account.

        Args:
            update: If True, fetch metadata and update each device before returning.

        Returns:
            A list of devices.
        """
        devices = [SharkIqVacuum(self, d, europe=self.europe) for d in self.list_devices()]
        if update:
            for device in devices:
                device.get_metadata()
                device.update()
        return devices

    async def async_get_devices(self, update: bool = True) -> List[SharkIqVacuum]:
        """
        Get the devices on the account.

        Args:
            update: If True, fetch metadata and update each device before returning.

        Returns:
            A list of devices.
        """
        devices = [SharkIqVacuum(self, d, europe=self.europe) for d in await self.async_list_devices()]
        if update:
            for device in devices:
                await device.async_get_metadata()
                await device.async_update()
        return devices

Simple Ayla Networks API wrapper.

Initialize the AylaApi object.

Args

email
The email address of the user.
password
The password of the user.
app_id
The app ID of the Ayla app.
auth0_client_id
The Auth0 client ID sent as client_id in the Auth0 login request.
app_secret
The app secret of the Ayla app.
websession
A websession to use for the API. If None, a new session will be created.
europe
If True, use the EU login URL and app ID/secret.

Instance variables

prop auth_expiration : datetime.datetime | None
Expand source code
@property
def auth_expiration(self) -> Optional[datetime]:
    """
    Expiration time of the current authentication.

    Returns:
        None when not authenticated, otherwise the stored expiration time.

    Raises:
        SharkIqNotAuthedError: If authenticated but no expiration time is stored.
    """
    if not self._is_authed:
        return None

    expiration = self._auth_expiration
    if expiration is None:  # Not expected in practice; guards against inconsistent state
        raise SharkIqNotAuthedError("Invalid state.  Please reauthorize.")
    return expiration

Get the time at which the authentication expires.

Returns

The time at which the authentication expires.

prop auth_header : Dict[str, str]
Expand source code
@property
def auth_header(self) -> Dict[str, str]:
    """
    Authorization header for API requests.

    Authentication is verified through `check_auth` before the header is built.

    Returns:
        A dict with the single "Authorization" entry.
    """
    self.check_auth()
    token = self._access_token
    return {"Authorization": f"auth_token {token:s}"}

Get the authorization header.

Returns

The authorization header.

prop sign_out_data : Dict
Expand source code
@property
def sign_out_data(self) -> Dict:
    """
    Request body used by the sign-out calls.

    Returns:
        A dict wrapping the current access token under the "user" key.
    """
    user_payload = {"access_token": self._access_token}
    return {"user": user_payload}

Payload for the sign_out call.

Returns

A dict containing the sign out data.

prop token_expired : bool
Expand source code
@property
def token_expired(self) -> bool:
    """
    Whether the token is past its expiration time.

    Returns:
        True if there is no expiration time or it lies in the past.
    """
    expiration = self.auth_expiration
    if expiration is None:
        return True
    return datetime.now() > expiration

Return true if the token has already expired.

Returns

True if the token has already expired.

prop token_expiring_soon : bool
Expand source code
@property
def token_expiring_soon(self) -> bool:
    """
    Whether the token is close to its expiration time.

    Returns:
        True if there is no expiration time or it is less than 600 seconds away.
    """
    expiration = self.auth_expiration
    if expiration is None:
        return True
    # Treat the last 600 seconds as "expiring" to prevent a timeout immediately following.
    threshold = expiration - timedelta(seconds=600)
    return datetime.now() > threshold

Return true if the token will expire soon.

Returns

True if the token will expire soon.

Methods

async def async_get_devices(self, update: bool = True) ‑> List[SharkIqVacuum]
Expand source code
async def async_get_devices(self, update: bool = True) -> List[SharkIqVacuum]:
    """
    Build SharkIqVacuum objects for every device on the account.

    Args:
        update: If True, fetch metadata and update each device before returning.

    Returns:
        A list of devices.
    """
    raw_devices = await self.async_list_devices()
    vacuums = [SharkIqVacuum(self, raw, europe=self.europe) for raw in raw_devices]
    if not update:
        return vacuums

    for vacuum in vacuums:
        await vacuum.async_get_metadata()
        await vacuum.async_update()
    return vacuums

Get the devices on the account.

Args

update
Update the device list if it is out of date.

Returns

A list of devices.

async def async_list_devices(self) ‑> List[Dict]
Expand source code
async def async_list_devices(self) -> List[Dict]:
    """
    Fetch the raw device records for the account.

    Returns:
        A list of devices.

    Raises:
        SharkIqAuthError: If the API answers with status 401.
    """
    base_url = EU_DEVICE_URL if self.europe else DEVICE_URL
    pending = await self.async_request("get", f"{base_url:s}/apiv1/devices.json")
    async with pending as resp:
        payload = await resp.json()
        if resp.status == 401:
            raise SharkIqAuthError(payload["error"]["message"])
    return [entry["device"] for entry in payload]

List the devices on the account.

Returns

A list of devices.

async def async_refresh_auth(self)
Expand source code
async def async_refresh_auth(self):
    """
    Refresh the authentication asynchronously.

    Posts the stored refresh token and stores the credentials from the response.
    """
    session = await self.ensure_session()
    base_url = EU_LOGIN_URL if self.europe else LOGIN_URL
    payload = {"user": {"refresh_token": self._refresh_token}}
    async with session.post(f"{base_url:s}/users/refresh_token.json", json=payload) as resp:
        status = resp.status
        self._set_credentials(status, await resp.json())

Refresh the authentication asynchronously.

async def async_request(self, http_method: str, url: str, **kwargs)
Expand source code
async def async_request(self, http_method: str, url: str, **kwargs):
    """
    Start an authenticated request on the aiohttp session.

    Args:
        http_method: The HTTP method to use.
        url: The URL to request.
        **kwargs: Extra keyword arguments forwarded to the session's request call;
            a ``headers`` entry is merged with the authorization header.

    Returns:
        Whatever ``session.request(...)`` returns, not awaited here.
    """
    client = await self.ensure_session()
    merged_headers = self._get_headers(kwargs)
    return client.request(http_method, url, headers=merged_headers, **kwargs)

Make a request to the Ayla API.

Args

http_method
The HTTP method to use.
url
The URL to request.
**kwargs
Additional keyword arguments to pass to the aiohttp session's request call.

Returns

The response from the request.

async def async_sign_in(self)
Expand source code
async def async_sign_in(self):
    """
    Authenticate to Ayla API asynchronously.

    Step one posts the Auth0 login data and stores the returned ID token; step two
    posts the login data (which embeds that token) and stores the credentials.
    """
    session = await self.ensure_session()

    # Step 1: Auth0 login.
    auth0_payload = self._auth0_login_data
    auth0_headers = {"User-Agent": "SharkClean/29562 CFNetwork/3826.400.120 Darwin/24.3.0"}
    auth0_base = EU_AUTH0_URL if self.europe else AUTH0_URL
    async with session.post(f"{auth0_base}/oauth/token", json=auth0_payload, headers=auth0_headers) as auth0_resp:
        auth0_body = await auth0_resp.json()
        self._set_id_token(auth0_resp.status, auth0_body)

    # Step 2: token sign-in. The payload is read only now so it carries the new ID token.
    login_payload = self._login_data
    login_base = EU_LOGIN_URL if self.europe else LOGIN_URL
    async with session.post(f"{login_base}/api/v1/token_sign_in", json=login_payload) as login_resp:
        login_body = await login_resp.json()
        self._set_credentials(login_resp.status, login_body)

Authenticate to Ayla API asynchronously.

async def async_sign_out(self)
Expand source code
async def async_sign_out(self):
    """
    Post the sign-out payload, then clear the local authentication state.
    """
    session = await self.ensure_session()
    base_url = EU_LOGIN_URL if self.europe else LOGIN_URL
    sign_out_url = f"{base_url:s}/users/sign_out.json"
    # The response body is not used; the context manager only releases the response.
    async with session.post(sign_out_url, json=self.sign_out_data):
        pass
    self._clear_auth()

Sign out and invalidate the access token.

def check_auth(self, raise_expiring_soon=True)
Expand source code
def check_auth(self, raise_expiring_soon=True):
    """
    Verify that the client holds a usable authentication.

    Args:
        raise_expiring_soon: Raise an exception if the token will expire soon.

    Raises:
        SharkIqNotAuthedError: If there is no access token, the client is not authenticated,
            or the token has already expired.
        SharkIqAuthExpiringError: If the token will expire soon and `raise_expiring_soon` is set.
    """
    usable = bool(self._access_token) and self._is_authed and not self.token_expired
    if not usable:
        # Record the failed check before reporting it.
        self._is_authed = False
        raise SharkIqNotAuthedError()

    if raise_expiring_soon and self.token_expiring_soon:
        raise SharkIqAuthExpiringError()

Confirm authentication status.

Args

raise_expiring_soon
Raise an exception if the token will expire soon.

Raises

SharkIqAuthExpiringError
If the token will expire soon.
SharkIqNotAuthedError
If there is no access token, the client is not authenticated, or the token has already expired.
async def ensure_session(self) ‑> aiohttp.client.ClientSession
Expand source code
async def ensure_session(self) -> aiohttp.ClientSession:
    """
    Return the aiohttp session, creating and storing one if none exists yet.

    Returns:
        An aiohttp ClientSession.
    """
    session = self.websession
    if session is None:
        session = aiohttp.ClientSession()
        self.websession = session
    return session

Ensure that we have an aiohttp ClientSession.

Returns

An aiohttp ClientSession.

def get_devices(self, update: bool = True) ‑> List[SharkIqVacuum]
Expand source code
def get_devices(self, update: bool = True) -> List[SharkIqVacuum]:
    """
    Build SharkIqVacuum objects for every device on the account.

    Args:
        update: If True, fetch metadata and update each device before returning.

    Returns:
        A list of devices.
    """
    raw_devices = self.list_devices()
    vacuums = [SharkIqVacuum(self, raw, europe=self.europe) for raw in raw_devices]
    if not update:
        return vacuums

    for vacuum in vacuums:
        vacuum.get_metadata()
        vacuum.update()
    return vacuums

Get the devices on the account.

Args

update
Update the device list if it is out of date.

Returns

A list of devices.

def list_devices(self) ‑> List[Dict]
Expand source code
def list_devices(self) -> List[Dict]:
    """
    Fetch the raw device records for the account.

    Returns:
        A list of devices.

    Raises:
        SharkIqAuthError: If the API answers with status 401.
    """
    base_url = EU_DEVICE_URL if self.europe else DEVICE_URL
    resp = self.request("get", f"{base_url:s}/apiv1/devices.json")
    payload = resp.json()
    if resp.status_code == 401:
        raise SharkIqAuthError(payload["error"]["message"])
    return [entry["device"] for entry in payload]

List the devices on the account.

Returns

A list of devices.

def refresh_auth(self)
Expand source code
def refresh_auth(self):
    """
    Refresh the authentication synchronously.

    Posts the stored refresh token and stores the credentials from the response.
    """
    base_url = EU_LOGIN_URL if self.europe else LOGIN_URL
    payload = {"user": {"refresh_token": self._refresh_token}}
    resp = requests.post(f"{base_url:s}/users/refresh_token.json", json=payload)
    status = resp.status_code
    self._set_credentials(status, resp.json())

Refresh the authentication synchronously.

def request(self, method: str, url: str, **kwargs) ‑> requests.models.Response
Expand source code
def request(self, method: str, url: str, **kwargs) -> requests.Response:
    """
    Send an authenticated request through the requests library.

    Args:
        method: The HTTP method to use.
        url: The URL to request.
        **kwargs: Extra keyword arguments forwarded to requests; a ``headers``
            entry is merged with the authorization header.

    Returns:
        The response from the request.
    """
    merged_headers = self._get_headers(kwargs)
    response = requests.request(method, url, headers=merged_headers, **kwargs)
    return response

Make a request to the Ayla API.

Args

method
The HTTP method to use.
url
The URL to request.
**kwargs
Additional keyword arguments to pass to requests.

Returns

The response from the request.

def sign_in(self)
Expand source code
def sign_in(self):
    """
    Authenticate to Ayla API synchronously.

    Step one posts the Auth0 login data and stores the returned ID token; step two
    posts the login data (which embeds that token) and stores the credentials.
    """
    # Step 1: Auth0 login.
    auth0_payload = self._auth0_login_data
    auth0_headers = {"User-Agent": "SharkClean/29562 CFNetwork/3826.400.120 Darwin/24.3.0"}
    auth0_base = EU_AUTH0_URL if self.europe else AUTH0_URL
    auth0_resp = requests.post(f"{auth0_base:s}/oauth/token", json=auth0_payload, headers=auth0_headers)
    self._set_id_token(auth0_resp.status_code, auth0_resp.json())

    # Step 2: token sign-in. The payload is read only now so it carries the new ID token.
    login_payload = self._login_data
    login_base = EU_LOGIN_URL if self.europe else LOGIN_URL
    login_resp = requests.post(f"{login_base:s}/api/v1/token_sign_in", json=login_payload)
    self._set_credentials(login_resp.status_code, login_resp.json())

Authenticate to Ayla API synchronously.

def sign_out(self)
Expand source code
def sign_out(self):
    """
    Post the sign-out payload, then clear the local authentication state.
    """
    base_url = EU_LOGIN_URL if self.europe else LOGIN_URL
    sign_out_url = f"{base_url:s}/users/sign_out.json"
    requests.post(sign_out_url, json=self.sign_out_data)
    self._clear_auth()

Sign out and invalidate the access token.