send_sms.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import os
  2. import requests
  3. from KdeConnect import KDEConnect
  4. from app.schemas.responses import AccessTokenResponse, SMSResponse
  5. def send_sms_from_api(url: str, login: str, pwd: str, device_name: str):
  6. """Connect to sms-server API to retreieve sms to be send and send them using kdeconnect-cli"""
  7. # Log in to API get access token
  8. response = requests.get(
  9. url + "/auth/access-token",
  10. data={"grant_type": "password", "username": login, "password": pwd},
  11. )
  12. authentication: AccessTokenResponse = response.json()
  13. headers = {"Authorization": "Bearer " + authentication.access_token}
  14. # List SMS to be send
  15. response = requests.get(url + "/sms/to-send", headers=headers)
  16. sms_list: list[SMSResponse] = response.json()
  17. # Init KDE Connect
  18. kde = KDEConnect(device_name=device_name)
  19. # send sms & update api
  20. for sms in sms_list:
  21. kde.send_sms(sms.phone_number, sms.content)
  22. requests.get(url + "/sms/sent-now/" + sms.id, headers=headers)
  23. class InvalidEnvironnementVariable(Exception):
  24. pass
  25. def get_env(key: str) -> str:
  26. """Get a variable from the environnement or raise an InvalidEnvironnementVariable"""
  27. value = os.environ.get(key)
  28. if value is None:
  29. raise InvalidEnvironnementVariable(key)
  30. return value
  31. def main():
  32. API_URL = get_env("API_URL")
  33. API_LOGIN = get_env("API_LOGIN")
  34. API_PASSWORD = get_env("API_PASSWORD")
  35. DEVICE_NAME = get_env("DEVICE_NAME")
  36. send_sms_from_api(API_URL, API_LOGIN, API_PASSWORD, DEVICE_NAME)
  37. if __name__ == "__main__":
  38. main()