send_sms.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. import os
  2. import requests
  3. from .KdeConnect import KDEConnect
  4. from app.schemas.responses import AccessTokenResponse, SMSResponse
  5. def send_sms_from_api(url: str, login: str, pwd: str, device_name: str):
  6. """Connect to sms-server API to retreieve sms to be send and send them using kdeconnect-cli"""
  7. # Log in to API get access token
  8. response = requests.post(
  9. url + "/auth/access-token",
  10. data={"grant_type": "password", "username": login, "password": pwd},
  11. )
  12. authentication = AccessTokenResponse.parse_raw(response.content)
  13. headers = {"Authorization": "Bearer " + authentication.access_token}
  14. # List SMS to be send
  15. response = requests.get(url + "/sms/to-send", headers=headers)
  16. sms_list: list[SMSResponse] = [SMSResponse.parse_obj(obj) for obj in response.json()]
  17. print(sms_list)
  18. # Init KDE Connect
  19. kde = KDEConnect(device_name=device_name)
  20. # send sms & update api
  21. for sms in sms_list:
  22. try:
  23. kde.send_sms(sms.phone_number, sms.content)
  24. requests.post(url + "/sms/send-now/" + sms.id, headers=headers)
  25. except Exception as exc:
  26. print(exc)
  27. class InvalidEnvironnementVariable(Exception):
  28. pass
  29. def get_env(key: str) -> str:
  30. """Get a variable from the environnement or raise an InvalidEnvironnementVariable"""
  31. value = os.environ.get(key)
  32. if value is None:
  33. raise InvalidEnvironnementVariable(key)
  34. return value
  35. def main():
  36. API_URL = get_env("API_URL")
  37. API_LOGIN = get_env("API_LOGIN")
  38. API_PASSWORD = get_env("API_PASSWORD")
  39. DEVICE_NAME = get_env("DEVICE_NAME")
  40. send_sms_from_api(API_URL, API_LOGIN, API_PASSWORD, DEVICE_NAME)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()