import datetime import io import os from enum import Enum from urllib.parse import urlparse from uuid import uuid4 from typing import Union import pandas as pd import requests from pandera.errors import SchemaError from pandera.typing import DataFrame from .validation import ContactSchema, CreneauSchema, CreneauDataSchema, GsheetData planning_gid = "1001381542" creneau_gid = "1884137958" benevole_gid = "82247394" class ParserState(Enum): STARTING_VALUE = 0 READING_VALUE = 1 ESCAPING = 2 CLOSING_ESCAPE = 3 def split_csv_row(raw_data: str, separator: str = ",", escape: str = '"') -> list[str]: """Split a csv row into the different value Args: raw_data (str): data separator (str, optional): column separator. Defaults to ",". escape (str, optional): excaping character . Defaults to '"'. Returns: list[str]: list of value in the csv row """ state: ParserState = ParserState.STARTING_VALUE arr = [] current_item = "" for c in raw_data: if state == ParserState.STARTING_VALUE: if c == escape: state = ParserState.ESCAPING elif c == separator: arr.append("") else: state = ParserState.READING_VALUE current_item = c elif state == ParserState.READING_VALUE: if c == separator: state = ParserState.STARTING_VALUE arr.append(current_item) current_item = "" else: current_item += c elif state == ParserState.ESCAPING: if c == escape: state = ParserState.CLOSING_ESCAPE else: current_item += c elif state == ParserState.CLOSING_ESCAPE: if c == escape: state = ParserState.ESCAPING current_item += c else: state = ParserState.READING_VALUE arr.append(current_item) current_item = "" arr.append(current_item) return arr class InvalidUrlError(Exception): pass class ParsingError(Exception): pass def extract_doc_uid(url: str) -> str: """Extract the uid of a gsheet from its url Args: url (str): url of google sheet to extract uid from Raises: InvalidUrlError: if the url does not correspond to a gsheet url Returns: str: uuid of the google sheet """ res = urlparse(url) if res.netloc != "docs.google.com": raise InvalidUrlError("Invalid netloc") if not res.path.startswith("/spreadsheets/d/"): raise InvalidUrlError("Invalid path") doc_id = res.path.split("/")[3] l_doc_id = len(doc_id) if l_doc_id < 32 and 50 > l_doc_id: raise InvalidUrlError("Invalid path") return doc_id def build_sheet_url(doc_id: str, sheet_id: str): return f"https://docs.google.com/spreadsheets/d/{doc_id}/export?format=csv&gid={sheet_id}" def downloadAndSave(doc_ui: str, sheet_gid: str, file: Union[str, bytes, os.PathLike]): url = build_sheet_url(doc_ui, sheet_gid) print("Downloading " + str(file)) rep = requests.get(url) with open(file, "wb") as f: f.write(rep.content) def getContactDataFrame( csv_filename: str, skiprows: int = 2 ) -> DataFrame[ContactSchema]: df_contact = pd.read_csv(csv_filename, skiprows=skiprows) column_to_drop = [name for name in df_contact.columns if "Unnamed" in name] df_contact.drop(column_to_drop, axis=1, inplace=True) # Filter out empty name df_contact = df_contact[~df_contact.Nom.isnull()] df_contact.reset_index() # coerce SMS to boolean df_contact["SMS"] = df_contact["SMS"] == "Oui" # create unique contact key df_contact["key"] = df_contact["Prénom"] + " " + df_contact.Nom.str.slice(0, 1) return ContactSchema.validate(df_contact) def getCreneauDataFrame(csv_filename: str) -> DataFrame[CreneauDataSchema]: df_creneau = pd.read_csv(csv_filename) df_creneau.columns = ["title", "lieu", "description", "responsable", "tags"] df_creneau.loc[df_creneau.tags.isnull(), "tags"] = "" df_creneau.loc[df_creneau.lieu.isnull(), "lieu"] = "" return CreneauDataSchema.validate(df_creneau) def getPlanningDataFrame( csv_filename: Union[str, bytes, os.PathLike], starting_date: datetime.datetime, skip_column: int = 3, ) -> DataFrame[CreneauSchema]: list_creneau = [] with io.open(csv_filename, "r", encoding="utf-8") as f: datas = [split_csv_row(s.replace("\n", "")) for s in f.readlines()] def getDate(day: str) -> datetime.datetime: s = day.lower() if s.startswith("mercredi"): return starting_date + datetime.timedelta(days=-3) elif s.startswith("jeudi"): return starting_date + datetime.timedelta(days=-2) elif s.startswith("vendredi"): return starting_date + datetime.timedelta(days=-1) elif s.startswith("samedi"): return starting_date elif s.startswith("dimanche"): return starting_date + datetime.timedelta(days=1) elif s.startswith("lundi"): return starting_date + datetime.timedelta(days=2) raise KeyError("This day is not valid : " + s) def getTime(time_str: str) -> datetime.timedelta: splitted_time = time_str.split("h") hours = int(splitted_time[0]) if hours < 5: hours += 24 if len(splitted_time) > 1 and splitted_time[1] != "": return datetime.timedelta(hours=hours, minutes=int(splitted_time[1])) else: return datetime.timedelta(hours=hours) def getStartEnd(time_str: str) -> tuple: pair = time_str.split("-") if len(pair) == 2: return getTime(pair[0]), getTime(pair[1]) else: start = getTime(time_str.split("+")[0]) return start, start + datetime.timedelta(hours=1) # Parse headers headers = datas[skip_column : skip_column + 2] days_str = headers[0] hours = headers[1] column_to_dates: dict[int, dict[str, datetime.datetime]] = {} for i in range(skip_column, len(days_str)): day = getDate(days_str[i]) start, end = getStartEnd(hours[i]) column_to_dates[i] = {"start": day + start, "end": day + end} list_creneau: list[dict] = [] for i in range(5, len(datas)): row = datas[i] current_benevole_name = "" current_time: dict[str, datetime.datetime] = {} for j in range(skip_column, len(row)): if (current_benevole_name != "") and ( row[j] == "" or row[j] != current_benevole_name ): new_creneau = { "id": str(uuid4()), "template_id": row[0], "nom": row[1], "benevole_nom": current_benevole_name, "ligne": i + 1, **current_time, } list_creneau.append(new_creneau) current_benevole_name = "" current_time = {} if row[j] != "": current_benevole_name = row[j] if len(current_time.keys()) == 0: current_time = column_to_dates[j].copy() else: current_time["end"] = column_to_dates[j]["end"] if current_benevole_name != "": new_creneau = { "id": str(uuid4()), "template_id": row[0], "nom": row[1], "benevole_nom": current_benevole_name, "ligne": i + 1, **current_time, } list_creneau.append(new_creneau) print(f"{len(list_creneau)} créneaux trouvés") df = pd.DataFrame.from_dict(list_creneau) return CreneauSchema.validate(df) def parseGsheet(doc_uuid: str, saturday_date: datetime.datetime) -> GsheetData: suffix = "_2023" fname_planning = f"./planning{suffix}.csv" fname_creneau = f"./creneau{suffix}.csv" fname_contact = f"./benevole{suffix}.csv" downloadAndSave(doc_uuid, planning_gid, fname_planning) downloadAndSave(doc_uuid, creneau_gid, fname_creneau) downloadAndSave(doc_uuid, benevole_gid, fname_contact) try: df_contact = getContactDataFrame(fname_contact) except SchemaError as exc: msg = "Donnée erronée sur les bénévoles\n" + str(exc) raise ParsingError(msg) try: df_creneau = getCreneauDataFrame(fname_creneau) except SchemaError as exc: raise ParsingError("Donnée erronée des description des créneaux\n" + str(exc)) try: df_planning = getPlanningDataFrame(fname_planning, saturday_date) except SchemaError as exc: raise ParsingError("Donnée erronée des créneaux\n" + str(exc)) os.remove(fname_planning) os.remove(fname_creneau) os.remove(fname_contact) return GsheetData(df_contact, df_creneau, df_planning)