| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- import subprocess
- import logging
- import re
- from pydantic import BaseModel
class InvalidArgument(Exception):
    """Raised when a caller supplies an unusable combination of arguments."""
class InvalidDevice(Exception):
    """Raised when the requested device is unknown, unpaired or unreachable."""
class KDEConnectError(Exception):
    """Raised when kdeconnect-cli is missing or reports an error."""
class KDEDevice(BaseModel):
    # One device entry as listed by ``kdeconnect-cli -l``.

    # Device identifier, passed to kdeconnect-cli through ``-d``.
    id: str
    # Device name as shown in the device listing.
    name: str
    # True when the listing marks the device as paired.
    paired: bool
    # True when the listing marks the device as reachable.
    reachable: bool
class KDEConnect:
    """Thin wrapper around the ``kdeconnect-cli`` command-line tool.

    An instance is bound to a single device, which must be listed, paired
    and reachable at construction time.
    """

    # Matches the "- <name>: <id>" prefix of one ``kdeconnect-cli -l`` entry.
    _DEVICE_ENTRY = re.compile(r"^- ([^:]+): ([a-z0-9_]{36})")

    device: "KDEDevice"

    def __init__(self, device_name=None, device_id=None):
        """Bind this wrapper to the device matching the given name or id.

        Args:
            device_name: Name of the device as listed by kdeconnect-cli.
            device_id: Identifier of the device as listed by kdeconnect-cli.

        Raises:
            InvalidArgument: If neither ``device_name`` nor ``device_id`` is given.
            KDEConnectError: If kdeconnect-cli is not installed or reports an error.
            InvalidDevice: If no device matches, or the matching device is
                not paired or not reachable.
        """
        if device_name is None and device_id is None:
            raise InvalidArgument("device_name or device_id need to be provided")

        try:
            # Verify kdeconnect is available
            KDEConnect.run_kde_command(["-v"])
        except FileNotFoundError as err:
            raise KDEConnectError(
                "KDEconnect-cli is not available on this computer"
            ) from err

        # Verify the device exists: the first entry matching either the
        # name or the id wins.
        device = next(
            (
                candidate
                for candidate in KDEConnect.get_device_list()
                if candidate.name == device_name or candidate.id == device_id
            ),
            None,
        )
        if device is None:
            raise InvalidDevice(f"No device found with id {device_id} or name {device_name}")
        if not device.paired:
            raise InvalidDevice(f"Device {device.name} is not paired with this computer")
        if not device.reachable:
            raise InvalidDevice(f"Device {device.name} is not reachable by this computer")
        self.device = device

    def send_sms(self, phone_number: str, sms_content: str):
        """Send ``sms_content`` to ``phone_number`` through the bound device.

        Every character of ``phone_number`` other than ``+`` and the digits
        0-9 is removed before the number is handed to kdeconnect-cli.

        Raises:
            KDEConnectError: If kdeconnect-cli reports an error.
        """
        phone = re.sub(r"[^+0-9]", "", phone_number)
        KDEConnect.run_kde_command(
            ["-d", self.device.id, "--send-sms", sms_content, "--destination", phone]
        )

    @staticmethod
    def run_kde_command(args: list[str]) -> str:
        """Run ``kdeconnect-cli`` with ``args`` and return its decoded stdout.

        Raises:
            FileNotFoundError: If the ``kdeconnect-cli`` executable is not found.
            KDEConnectError: If the command writes anything to stderr other
                than the device-count summary.
        """
        command: list[str] = ["kdeconnect-cli", *args]
        process: subprocess.CompletedProcess = subprocess.run(command, capture_output=True)
        subprocess_response = process.stdout.decode("utf-8")
        logging.debug("Subprocess call : \n>> %s\n%s", " ".join(command), subprocess_response)
        if process.stderr != b"":
            error_msg: str = process.stderr.decode("utf-8")
            # The device-count summary arrives on stderr but is informational.
            # Both the plural ("N devices found") and the singular
            # ("1 device found") wording must be tolerated.
            if error_msg.endswith(("devices found\n", "device found\n")):
                logging.info(error_msg)
            else:
                raise KDEConnectError(error_msg)
        return subprocess_response

    @staticmethod
    def _parse_device_entry(entry: str) -> "tuple[str, str, bool, bool] | None":
        """Parse one device line into ``(name, id, paired, reachable)``.

        Returns ``None`` when the line does not look like a device entry.
        """
        match = KDEConnect._DEVICE_ENTRY.search(entry)
        if match is None:
            return None
        # Only inspect the text after the id, so that a device whose *name*
        # contains "paired" or "reachable" is not misreported.
        status = entry[match.end():]
        return match.group(1), match.group(2), "paired" in status, "reachable" in status

    @staticmethod
    def get_device_list() -> "list[KDEDevice]":
        """Return the list of devices known by kde connect of this computer"""
        KDEConnect.run_kde_command(["--refresh"])
        listing = KDEConnect.run_kde_command(["-l"])
        devices = []
        for entry in listing.strip().split("\n"):
            if entry == "":
                continue
            parsed = KDEConnect._parse_device_entry(entry)
            if parsed is None:
                # Skip lines that are not device entries instead of crashing.
                logging.warning("Ignoring unrecognised kdeconnect-cli output line: %r", entry)
                continue
            name, device_id, paired, reachable = parsed
            devices.append(
                KDEDevice(name=name, id=device_id, reachable=reachable, paired=paired)
            )
        return devices
if __name__ == "__main__":
    # Manual smoke test: sends a series of sample messages through a real,
    # paired device. The messages are sent in the order listed below.
    kde = KDEConnect(device_name="Redmi 7A")
    samples = (
        # 159 characters, a newline, then one more character.
        ("0700000000", ("160" * 53) + "\n" + "b"),
        # 162 characters on a single line.
        ("0700000000", "162" * 54),
        ("0700000000", "simple message"),
        ("+33700000000", "+33 number"),
        ("+33 77 000 000 0", "spaced number"),
        ("0700000000", "Multiline sms\nthis is a second line"),
        (
            "0700000000",
            """Lorem ipsum dolor sit amet,
consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.""",
        ),
    )
    for number, text in samples:
        kde.send_sms(number, text)
|