| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- import os
- import requests
- from .KdeConnect import KDEConnect
- from app.schemas.responses import AccessTokenResponse, SMSResponse
def send_sms_from_api(
    url: str, login: str, pwd: str, device_name: str, timeout: float = 30.0
) -> None:
    """Fetch the pending SMS from the sms-server API and send them via KDE Connect.

    Logs in with the password grant, lists the SMS returned by
    ``/sms/to-send``, sends each one through ``KDEConnect.send_sms`` and then
    notifies the API by posting to ``/sms/send-now/<id>``.

    Args:
        url: Base URL of the sms-server API; endpoint paths are appended to it
            as-is, so it should not end with a slash.
        login: Username used for the password grant.
        pwd: Password used for the password grant.
        device_name: Device name handed to ``KDEConnect``.
        timeout: Seconds to wait on each HTTP request before giving up.

    Raises:
        requests.HTTPError: If the login or the listing request returns an
            error status.
        requests.RequestException: If the login or the listing request fails
            at the network level (including timeouts).
    """
    # Log in to the API to get an access token.
    response = requests.post(
        url + "/auth/access-token",
        data={"grant_type": "password", "username": login, "password": pwd},
        timeout=timeout,
    )
    # Surface the real HTTP error instead of a confusing parse failure on an
    # error payload.
    response.raise_for_status()
    authentication = AccessTokenResponse.parse_raw(response.content)
    headers = {"Authorization": "Bearer " + authentication.access_token}

    # List the SMS waiting to be sent.
    response = requests.get(url + "/sms/to-send", headers=headers, timeout=timeout)
    response.raise_for_status()
    sms_list: list[SMSResponse] = [
        SMSResponse.parse_obj(obj) for obj in response.json()
    ]
    print(sms_list)

    # Init KDE Connect.
    kde = KDEConnect(device_name=device_name)

    # Send each SMS, then update the API. A failure on one SMS is reported and
    # must not prevent the remaining ones from being processed.
    for sms in sms_list:
        try:
            kde.send_sms(sms.phone_number, sms.content)
            # Formatting the id (rather than concatenating it) keeps this
            # working when the id is not a str; a TypeError here would occur
            # after the SMS has already been sent.
            confirmation = requests.post(
                f"{url}/sms/send-now/{sms.id}", headers=headers, timeout=timeout
            )
            # Report a rejected update instead of silently ignoring it.
            confirmation.raise_for_status()
        except Exception as exc:
            print(exc)
class InvalidEnvironnementVariable(Exception):
    """Signal that a required environment variable is not set.

    ``get_env`` raises it with the name of the missing variable as its only
    argument.
    """
def get_env(key: str) -> str:
    """Return the value of the environment variable named ``key``.

    Raises:
        InvalidEnvironnementVariable: If the variable is not set; the
            exception carries ``key`` as its argument.
    """
    found = os.getenv(key)
    if found is not None:
        return found
    raise InvalidEnvironnementVariable(key)
def main():
    """Read the connection settings from the environment and send the pending SMS.

    The settings are looked up through ``get_env`` in this order: ``API_URL``,
    ``API_LOGIN``, ``API_PASSWORD``, ``DEVICE_NAME``. All four are resolved
    before ``send_sms_from_api`` is called.
    """
    setting_names = ("API_URL", "API_LOGIN", "API_PASSWORD", "DEVICE_NAME")
    # The list is built in full, and in order, before the call is made.
    api_url, api_login, api_password, device_name = [
        get_env(name) for name in setting_names
    ]
    send_sms_from_api(api_url, api_login, api_password, device_name)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|