import datetime import io import os from enum import Enum from urllib.parse import urlparse from uuid import uuid4 import pandas as pd import requests from pandera.typing import DataFrame from pandera.errors import SchemaError from .validation import ContactSchema, CreneauSchema, CreneauDataSchema planning_gid = "1001381542" creneau_gid = "1884137958" benevole_gid = "82247394" class ParserState(Enum): STARTING_VALUE = 0 READING_VALUE = 1 ESCAPING = 2 CLOSING_ESCAPE = 3 def split_csv_row(raw_data: str, separator: str = ",", escape: str = '"') -> list[str]: state: ParserState = ParserState.STARTING_VALUE arr = [] current_item = "" for c in raw_data: if state == ParserState.STARTING_VALUE: if c == escape: state = ParserState.ESCAPING elif c == separator: arr.append("") else: state = ParserState.READING_VALUE current_item = c elif state == ParserState.READING_VALUE: if c == separator: state = ParserState.STARTING_VALUE arr.append(current_item) current_item = "" else: current_item += c elif state == ParserState.ESCAPING: if c == escape: state = ParserState.CLOSING_ESCAPE else: current_item += c elif state == ParserState.CLOSING_ESCAPE: if c == escape: state = ParserState.ESCAPING current_item += c else: state = ParserState.READING_VALUE arr.append(current_item) current_item = "" arr.append(current_item) return arr class InvalidUrlError(Exception): pass class ParsingError(Exception): pass def extract_doc_uid(url: str) -> str: res = urlparse(url) if res.netloc != "docs.google.com": raise InvalidUrlError("Invalid netloc") if not res.path.startswith("/spreadsheets/d/"): raise InvalidUrlError("Invalid path") doc_id = res.path.split("/")[3] l_doc_id = len(doc_id) if l_doc_id < 32 and 50 > l_doc_id: raise InvalidUrlError("Invalid path") return doc_id def build_sheet_url(doc_id, sheet_id): return f"https://docs.google.com/spreadsheets/d/{doc_id}/export?format=csv&gid={sheet_id}" def downloadAndSave(doc_ui, sheet_gid, fname): url = build_sheet_url(doc_ui, sheet_gid) print("Downloading " + fname) rep = requests.get(url) with open(fname, "wb") as f: f.write(rep.content) def getContactDataFrame( csv_filename: str, skiprows: int = 2 ) -> DataFrame[ContactSchema]: df_contact = pd.read_csv(csv_filename, skiprows=skiprows) column_to_drop = [name for name in df_contact.columns if "Unnamed" in name] df_contact.drop(column_to_drop, axis=1, inplace=True) # Filter out empty name df_contact = df_contact[~df_contact.Nom.isnull()] df_contact.reset_index() # coerce SMS to boolean df_contact["SMS"] = df_contact["SMS"] == "Oui" # create unique contact key df_contact["key"] = df_contact["Prénom"] + " " + df_contact.Nom.str.slice(0, 1) return ContactSchema.validate(df_contact) def getCreneauDataFrame(csv_filename: str) -> DataFrame[CreneauDataSchema]: df_creneau = pd.read_csv(csv_filename) df_creneau.columns = ["title", "lieu", "description", "responsable", "tags"] df_creneau.loc[df_creneau.tags.isnull(), "tags"] = "" df_creneau.loc[df_creneau.lieu.isnull(), "lieu"] = "" return CreneauDataSchema.validate(df_creneau) def getPlanningDataFrame(csv_filename, starting_date, skip_column=3): list_creneau = [] with io.open(csv_filename, "r", encoding="utf-8") as f: datas = [split_csv_row(s.replace("\n", "")) for s in f.readlines()] def getDate(day: str) -> datetime.datetime: s = day.lower() if s.startswith("mercredi"): return starting_date + datetime.timedelta(days=-3) elif s.startswith("jeudi"): return starting_date + datetime.timedelta(days=-2) elif s.startswith("vendredi"): return starting_date + datetime.timedelta(days=-1) elif s.startswith("samedi"): return starting_date elif s.startswith("dimanche"): return starting_date + datetime.timedelta(days=1) elif s.startswith("lundi"): return starting_date + datetime.timedelta(days=2) raise KeyError("This day is not valid : " + s) def getTime(time_str: str) -> datetime.timedelta: splitted_time = time_str.split("h") hours = int(splitted_time[0]) if hours < 5: hours += 24 if len(splitted_time) > 1 and splitted_time[1] != "": return datetime.timedelta(hours=hours, minutes=int(splitted_time[1])) else: return datetime.timedelta(hours=hours) def getStartEnd(time_str: str) -> tuple: pair = time_str.split("-") if len(pair) == 2: return getTime(pair[0]), getTime(pair[1]) else: start = getTime(time_str.split("+")[0]) return start, start + datetime.timedelta(hours=1) # Parse headers headers = datas[skip_column : skip_column + 2] days_str = headers[0] hours = headers[1] column_to_dates: dict[int, dict[str, datetime.datetime]] = {} for i in range(skip_column, len(days_str)): day = getDate(days_str[i]) start, end = getStartEnd(hours[i]) column_to_dates[i] = {"start": day + start, "end": day + end} list_creneau: list[dict] = [] for i in range(5, len(datas)): row = datas[i] current_benevole_name = "" current_time: dict[str, datetime.datetime] = {} for j in range(skip_column, len(row)): if (current_benevole_name != "") and ( row[j] == "" or row[j] != current_benevole_name ): new_creneau = { "id": str(uuid4()), "template_id": row[0], "nom": row[1], "benevole_nom": current_benevole_name, "ligne": i + 1, **current_time, } list_creneau.append(new_creneau) current_benevole_name = "" current_time = {} if row[j] != "": current_benevole_name = row[j] if len(current_time.keys()) == 0: current_time = column_to_dates[j].copy() else: current_time["end"] = column_to_dates[j]["end"] if current_benevole_name != "": new_creneau = { "id": str(uuid4()), "template_id": row[0], "nom": row[1], "benevole_nom": current_benevole_name, "ligne": i + 1, **current_time, } list_creneau.append(new_creneau) print(f"{len(list_creneau)} créneaux trouvés") df = pd.DataFrame.from_dict(list_creneau) return CreneauSchema.validate(df) def parseGsheet(doc_uuid: str, saturday_date: datetime.datetime): suffix = "_2023" fname_planning = f"./planning{suffix}.csv" fname_creneau = f"./creneau{suffix}.csv" fname_contact = f"./benevole{suffix}.csv" downloadAndSave(doc_uuid, planning_gid, fname_planning) downloadAndSave(doc_uuid, creneau_gid, fname_creneau) downloadAndSave(doc_uuid, benevole_gid, fname_contact) try: df_contact = getContactDataFrame(fname_contact) except SchemaError as exc: msg = "Donnée erronée sur les bénévoles\n" + str(exc) raise ParsingError(msg) try: df_creneau = getCreneauDataFrame(fname_creneau) except SchemaError as exc: raise ParsingError("Donnée erronée des description des créneaux\n" + str(exc)) try: df_planning = getPlanningDataFrame(fname_planning, saturday_date) except SchemaError as exc: raise ParsingError("Donnée erronée des créneaux\n" + str(exc)) os.remove(fname_planning) os.remove(fname_creneau) os.remove(fname_contact) return df_contact, df_creneau, df_planning