Source code for bestlab_platform.tuya.openapi

"""Tuya Open API. Forked and refactored from Tuya SDK"""

from __future__ import annotations

import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional, Tuple

import requests

from ..exceptions import ResponseError
from .openlogging import filter_logger, logger

GET_TOKEN_API = "/v1.0/token"
REFRESH_TOKEN_API = "/v1.0/token/{}"

[docs]class TuyaTokenInfo: """Tuya token info. Attributes: access_token: Access token. expire_time: Valid period in seconds. refresh_token: Refresh token. uid: Tuya user ID. # platform_url: user region platform url """
[docs] def __init__(self, token_response: Dict[str, Any]): """Init TuyaTokenInfo.""" result = token_response.get("result", {}) self.expire_time = ( token_response.get("t", 0) + result.get("expire", result.get("expire_time", 0)) * 1000 ) self.access_token = result.get("access_token", "") self.refresh_token = result.get("refresh_token", "") self.uid = result.get("uid", "")
# self.platform_url = result.get("platform_url", "")
[docs]class TuyaOpenAPI: """Open Api. Typical usage example: openapi = TuyaOpenAPI(ENDPOINT, ACCESS_ID, ACCESS_KEY) """
[docs] def __init__( self, endpoint: str, access_id: str, access_secret: str, lang: str = "en", auto_connect: bool = True ): """Init TuyaOpenAPI.""" self.session = requests.session() self.endpoint = endpoint self.access_id = access_id self.access_secret = access_secret self.lang = lang self.__login_path = GET_TOKEN_API self.__refresh_token_path = REFRESH_TOKEN_API self.token_info: TuyaTokenInfo | None = None if auto_connect: self.connect()
# def _calculate_sign( self, method: str, path: str, params: Optional[Dict[str, Any]] = None, body: Optional[Dict[str, Any]] = None, ) -> Tuple[str, int]: # HTTPMethod str_to_sign = method str_to_sign += "\n" # Content-SHA256 content_to_sha256 = ( "" if body is None or len(body.keys()) == 0 else json.dumps(body) ) str_to_sign += ( hashlib.sha256(content_to_sha256.encode( "utf8")).hexdigest().lower() ) str_to_sign += "\n" # Header str_to_sign += "\n" # URL str_to_sign += path if params is not None and len(params.keys()) > 0: str_to_sign += "?" query_builder = "" params_keys = sorted(params.keys()) for key in params_keys: query_builder += f"{key}={params[key]}&" str_to_sign += query_builder[:-1] # Sign t = int(time.time() * 1000) message = self.access_id if self.token_info is not None: message += self.token_info.access_token message += str(t) + str_to_sign sign = ( self.access_secret.encode("utf8"), msg=message.encode("utf8"), digestmod=hashlib.sha256, ) .hexdigest() .upper() ) return sign, t def _refresh_access_token_if_need(self, path: str) -> None: if path.startswith(self.__login_path): return if path.startswith(self.__refresh_token_path): return if self.token_info is None: self.connect() return # should use refresh token? now = int(time.time() * 1000) expired_time = self.token_info.expire_time if expired_time - 60 * 1000 > now: # 1min return self.token_info.access_token = "" response = self.get( self.__refresh_token_path.format(self.token_info.refresh_token) ) self.token_info = TuyaTokenInfo(response)
[docs] def connect( self ) -> Dict[str, Any]: """Connect to Tuya Cloud. Returns: response: connect response """ # Fix signature invalid bug when the user explicitly calls connect() self.token_info = None response = self.get( path=GET_TOKEN_API, params={ "grant_type": 1 } ) # Cache token info. self.token_info = TuyaTokenInfo(response) return response
[docs] def is_connect(self) -> bool: """Whether we have an access token. Note: will return true even if the access token is expired. Token refreshing is handled internally. """ return self.token_info is not None and len(self.token_info.access_token) > 0
def __request( self, method: str, path: str, params: Optional[Dict[str, Any]] = None, body: Optional[Dict[str, Any]] = None, ) -> dict[str, Any]: """Internal method to sign and send. You should avoid using this method directly. Args: method (str): HTTP method path (str): relative path starting with "/" params (Optional[Dict[str, Any]]): HTTP parameters body (Optional[Dict[str, Any]]): HTTP body Returns: JSON decoded response (a dict). Raises: ResponseError: HTTP status code and response text """ self._refresh_access_token_if_need(path) access_token = self.token_info.access_token if self.token_info else "" sign, t = self._calculate_sign(method, path, params, body) headers = { "client_id": self.access_id, "sign": sign, "sign_method": "HMAC-SHA256", "access_token": access_token, "t": str(t), "lang": self.lang, } logger.debug( f"Request: method = {method}, " f"url = {self.endpoint + path}, " f"params = {params}, " f"body = {filter_logger(body)}, " f"t = {int(time.time()*1000)}" ) response = self.session.request( method, self.endpoint + path, params=params, json=body, headers=headers ) # Tuya returns HTTP 200 OK even if there is an error. # They use their own error code to indicate the error. if response.ok is False or response.json().get("success", False) is False: # Retry logger.warning( f"Response error, trying to reconnect: " f"code={response.status_code}, " f"body={response.text}, " f"t = {int(time.time() * 1000)}" ) self.token_info = None response = self.session.request( method, self.endpoint + path, params=params, json=body, headers=headers ) # Somehow failed again. if response.ok is False or response.json().get("success", False) is False: logger.error( f"Response error: code={response.status_code}, body={response.text}" ) raise ResponseError(response.status_code, response.text) # otherwise persist the response result: dict[str, Any] = response.json() logger.debug( f"Response: {json.dumps(filter_logger(result), ensure_ascii=False, indent=2)}" ) return result
[docs] def get( self, path: str, params: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Http Get. Requests the server to return specified resources. Args: path (str): api path params (map): request parameter Returns: response: response body """ return self.__request("GET", path, params, None)
[docs] def post( self, path: str, body: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Http Post. Requests the server to update specified resources. Args: path (str): api path body (map): request body Returns: response: response body """ return self.__request("POST", path, None, body)
[docs] def put( self, path: str, body: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Http Put. Requires the server to perform specified operations. Args: path (str): api path body (map): request body Returns: response: response body """ return self.__request("PUT", path, None, body)
[docs] def delete( self, path: str, params: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Http Delete. Requires the server to delete specified resources. Args: path (str): api path params (map): request param Returns: response: response body """ return self.__request("DELETE", path, params, None)