"""Download the planning, slot and volunteer tabs of a Google Sheet as CSV
and load them into pandas DataFrames."""

import datetime
import io
import os
from enum import Enum
from urllib.parse import urlparse
from uuid import uuid4

import pandas as pd
import requests

# Sheet (tab) identifiers, sent as the ``gid`` query parameter of the export URL.
planning_gid = "1001381542"
creneau_gid = "1884137958"
benevole_gid = "82247394"

# Accepted length range for the document id found in a spreadsheet URL.
_DOC_ID_MIN_LEN = 32
_DOC_ID_MAX_LEN = 50

# Index of the first planning row holding assignments (the two header rows
# sit just above it when ``skip_column`` keeps its default value of 3).
_FIRST_DATA_ROW = 5

# Day-name prefix (lower case) -> offset in days from the reference Saturday.
_DAY_OFFSETS = {
    "mercredi": -3,
    "jeudi": -2,
    "vendredi": -1,
    "samedi": 0,
    "dimanche": 1,
    "lundi": 2,
}


class ParserState(Enum):
    """States of the character-level CSV row parser in ``split_csv_row``."""

    STARTING_VALUE = 0  # at the beginning of a field, nothing consumed yet
    READING_VALUE = 1  # inside an unquoted field (or after a closed quote)
    ESCAPING = 2  # inside a quoted field
    CLOSING_ESCAPE = 3  # just saw a quote while inside a quoted field


def split_csv_row(raw_data: str, separator: str = ",", escape: str = '"') -> list[str]:
    """Split one CSV row into its fields.

    Fields may be wrapped in ``escape`` characters, in which case they can
    contain ``separator``; a doubled ``escape`` inside a quoted field yields
    one literal ``escape`` character.

    Args:
        raw_data: The row, without its trailing newline.
        separator: Single character separating fields.
        escape: Single character used to quote fields.

    Returns:
        The list of field values; always contains at least one item.
    """
    state = ParserState.STARTING_VALUE
    fields: list[str] = []
    current: list[str] = []

    for c in raw_data:
        if state == ParserState.STARTING_VALUE:
            if c == escape:
                state = ParserState.ESCAPING
            elif c == separator:
                fields.append("")
            else:
                state = ParserState.READING_VALUE
                current.append(c)
        elif state == ParserState.READING_VALUE:
            if c == separator:
                state = ParserState.STARTING_VALUE
                fields.append("".join(current))
                current = []
            else:
                current.append(c)
        elif state == ParserState.ESCAPING:
            if c == escape:
                state = ParserState.CLOSING_ESCAPE
            else:
                current.append(c)
        else:  # ParserState.CLOSING_ESCAPE
            if c == escape:
                # Doubled quote: literal quote character, still inside quotes.
                state = ParserState.ESCAPING
                current.append(c)
            elif c == separator:
                # The quoted field is complete. Go back to STARTING_VALUE so
                # that a following quoted field is recognised as quoted.
                state = ParserState.STARTING_VALUE
                fields.append("".join(current))
                current = []
            else:
                # Stray character after the closing quote: keep it as part of
                # the same field instead of dropping it.
                state = ParserState.READING_VALUE
                current.append(c)

    fields.append("".join(current))
    return fields


class InvalidUrlError(Exception):
    """Raised when a URL is not a usable Google Sheets document URL."""


def extract_doc_uid(url: str) -> str:
    """Return the document id embedded in a Google Sheets URL.

    Args:
        url: A URL of the form
            ``https://docs.google.com/spreadsheets/d/<doc_id>/...``.

    Returns:
        The ``<doc_id>`` path segment.

    Raises:
        InvalidUrlError: If the host is not ``docs.google.com``, the path does
            not start with ``/spreadsheets/d/``, or the id length is outside
            the accepted 32..50 range.
    """
    res = urlparse(url)
    if res.netloc != "docs.google.com":
        raise InvalidUrlError("Invalid netloc")
    if not res.path.startswith("/spreadsheets/d/"):
        raise InvalidUrlError("Invalid path")
    # The path is "/spreadsheets/d/<doc_id>/...", so segment 3 always exists.
    doc_id = res.path.split("/")[3]
    if not _DOC_ID_MIN_LEN <= len(doc_id) <= _DOC_ID_MAX_LEN:
        raise InvalidUrlError("Invalid path")
    return doc_id


def build_sheet_url(doc_id, sheet_id):
    """Return the CSV export URL for sheet ``sheet_id`` of document ``doc_id``."""
    return f"https://docs.google.com/spreadsheets/d/{doc_id}/export?format=csv&gid={sheet_id}"


def downloadAndSave(doc_ui, sheet_gid, fname, timeout: float = 30.0):
    """Download one sheet as CSV and write the raw bytes to ``fname``.

    Args:
        doc_ui: Document id of the spreadsheet.
        sheet_gid: ``gid`` of the sheet to export.
        fname: Destination file path.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: If the server answers with an error status; the
            destination file is not written in that case.
    """
    url = build_sheet_url(doc_ui, sheet_gid)
    print("Downloading " + fname)
    rep = requests.get(url, timeout=timeout)
    # Fail loudly rather than saving an error page as if it were CSV data.
    rep.raise_for_status()
    with open(fname, "wb") as f:
        f.write(rep.content)


def getContactDataFrame(csv_filename: str, skiprows: int = 2) -> pd.DataFrame:
    """Load the contact CSV, dropping unnamed columns and rows without a name.

    Args:
        csv_filename: Path of the CSV file to read.
        skiprows: Number of leading rows to skip before the header row.

    Returns:
        A DataFrame restricted to rows whose ``Nom`` column is set, with a
        fresh 0-based index.
    """
    df_contact = pd.read_csv(csv_filename, skiprows=skiprows)
    unnamed_columns = [name for name in df_contact.columns if "Unnamed" in name]
    df_contact = df_contact.drop(columns=unnamed_columns)
    # Filter out rows with an empty name.
    df_contact = df_contact[df_contact["Nom"].notna()]
    # reset_index returns a new frame; its result must be kept to take effect.
    return df_contact.reset_index(drop=True)


def getCreneauDataFrame(csv_filename: str) -> pd.DataFrame:
    """Load the slot CSV with normalised column names.

    The columns are renamed to ``title``, ``lieu``, ``description``,
    ``responsable`` and ``tags``; missing ``tags`` values become ``""``.

    Args:
        csv_filename: Path of the CSV file to read (must have five columns).
    """
    df_creneau = pd.read_csv(csv_filename)
    df_creneau.columns = ["title", "lieu", "description", "responsable", "tags"]
    # Assign the column back: writing through a boolean-filtered frame only
    # modifies a temporary copy.
    df_creneau["tags"] = df_creneau["tags"].fillna("")
    return df_creneau


def _planning_date(day: str, starting_date: datetime.datetime) -> datetime.datetime:
    """Map a day header (matched by lower-cased prefix) to a date around ``starting_date``."""
    s = day.lower()
    for prefix, offset in _DAY_OFFSETS.items():
        if s.startswith(prefix):
            return starting_date + datetime.timedelta(days=offset)
    raise KeyError("This day is not valid : " + s)


def _parse_time(time_str: str) -> datetime.timedelta:
    """Parse ``"<H>h"`` or ``"<H>h<MM>"`` into an offset from midnight."""
    parts = time_str.split("h")
    hours = int(parts[0])
    if hours < 5:
        # Hours before 5 are counted as a continuation of the same day's
        # evening, i.e. past midnight.
        hours += 24
    minutes = int(parts[1]) if len(parts) > 1 and parts[1] != "" else 0
    return datetime.timedelta(hours=hours, minutes=minutes)


def _parse_start_end(time_str: str) -> tuple:
    """Parse ``"<start>-<end>"``; any other form is a one-hour slot from its start time."""
    pair = time_str.split("-")
    if len(pair) == 2:
        return _parse_time(pair[0]), _parse_time(pair[1])
    # Anything after a "+" is ignored when reading the start time.
    start = _parse_time(time_str.split("+")[0])
    return start, start + datetime.timedelta(hours=1)


def _make_creneau(row: list, line_number: int, benevole_name: str, time_span: dict) -> dict:
    """Build one slot record from a planning row and the span accumulated for a volunteer."""
    return {
        "id": uuid4(),
        "template_id": row[0],
        "nom": row[1],
        "benevole_nom": benevole_name,
        "ligne": line_number,
        **time_span,
    }


def getPlanningDataFrame(csv_filename, starting_date: datetime.datetime, skip_column: int = 3) -> pd.DataFrame:
    """Parse the planning grid into one record per contiguous assignment.

    Two header rows give, for each column from ``skip_column`` onwards, a day
    name and a time range. In every data row, consecutive cells carrying the
    same non-empty name are merged into a single record spanning from the
    first cell's start to the last cell's end.

    Args:
        csv_filename: Path of the planning CSV (read as UTF-8).
        starting_date: Date used for columns labelled "samedi"; other days
            are offset from it.
        skip_column: Index of the first time-slot column. NOTE: the same
            value is also used as the index of the first header row, as in
            the original code, while data rows always start at index 5.

    Returns:
        A DataFrame with columns ``id``, ``template_id``, ``nom``,
        ``benevole_nom``, ``ligne`` (1-based CSV row number), ``start`` and
        ``end`` (empty if no assignment is found).

    Raises:
        KeyError: If a day header is not a recognised day name, or a data row
            has a non-empty cell beyond the header columns.
    """
    with io.open(csv_filename, "r", encoding="utf-8") as f:
        datas = [split_csv_row(line.replace("\n", "")) for line in f]

    # Parse headers.
    days_str = datas[skip_column]
    hours = datas[skip_column + 1]
    column_to_dates: dict[int, dict[str, datetime.datetime]] = {}
    for col in range(skip_column, len(days_str)):
        day = _planning_date(days_str[col], starting_date)
        start, end = _parse_start_end(hours[col])
        column_to_dates[col] = {"start": day + start, "end": day + end}

    list_creneau: list[dict] = []
    for row_index in range(_FIRST_DATA_ROW, len(datas)):
        row = datas[row_index]
        current_benevole_name = ""
        current_time: dict[str, datetime.datetime] = {}
        for col in range(skip_column, len(row)):
            cell = row[col]
            # A different (or empty) cell closes the run in progress.
            if current_benevole_name and cell != current_benevole_name:
                list_creneau.append(
                    _make_creneau(row, row_index + 1, current_benevole_name, current_time)
                )
                current_benevole_name = ""
                current_time = {}
            if cell != "":
                current_benevole_name = cell
                if not current_time:
                    current_time = column_to_dates[col].copy()
                else:
                    # Same volunteer as the previous column: extend the run.
                    current_time["end"] = column_to_dates[col]["end"]
        # Flush a run that reaches the end of the row.
        if current_benevole_name:
            list_creneau.append(
                _make_creneau(row, row_index + 1, current_benevole_name, current_time)
            )

    print(f"{len(list_creneau)} créneaux trouvés")
    return pd.DataFrame(list_creneau)


def parseGsheet(doc_uuid: str, saturday_date: datetime.datetime):
    """Download the three sheets of a document and parse them.

    The CSV exports are written to the current directory and removed again
    before returning, including when a download or a parse step fails.

    Args:
        doc_uuid: Document id of the spreadsheet.
        saturday_date: Reference date passed to ``getPlanningDataFrame``.

    Returns:
        A ``(df_contact, df_creneau, df_planning)`` tuple; ``df_contact`` has
        an extra ``key`` column made of ``Prénom``, a space and the first
        letter of ``Nom``.
    """
    suffix = "_2023"
    fname_planning = f"./planning{suffix}.csv"
    fname_creneau = f"./creneau{suffix}.csv"
    fname_contact = f"./benevole{suffix}.csv"

    try:
        downloadAndSave(doc_uuid, planning_gid, fname_planning)
        downloadAndSave(doc_uuid, creneau_gid, fname_creneau)
        downloadAndSave(doc_uuid, benevole_gid, fname_contact)

        df_contact = getContactDataFrame(fname_contact)
        df_contact["key"] = df_contact["Prénom"] + " " + df_contact.Nom.str.slice(0, 1)

        df_creneau = getCreneauDataFrame(fname_creneau)
        df_planning = getPlanningDataFrame(fname_planning, saturday_date)
    finally:
        # Never leave the temporary exports behind, even on error.
        for fname in (fname_planning, fname_creneau, fname_contact):
            if os.path.exists(fname):
                os.remove(fname)

    return df_contact, df_creneau, df_planning