|
|
@@ -0,0 +1,194 @@
|
|
|
+import datetime
|
|
|
+import io
|
|
|
+import os
|
|
|
+from enum import Enum
|
|
|
+from uuid import uuid4
|
|
|
+
|
|
|
+import pandas as pd
|
|
|
+import requests
|
|
|
+
|
|
|
+
|
|
|
+planning_gid = "1001381542"
|
|
|
+creneau_gid = "1884137958"
|
|
|
+benevole_gid = "82247394"
|
|
|
+
|
|
|
+
|
|
|
+class ParserState(Enum):
|
|
|
+ STARTING_VALUE = 0
|
|
|
+ READING_VALUE = 1
|
|
|
+ ESCAPING = 2
|
|
|
+ CLOSING_ESCAPE = 3
|
|
|
+
|
|
|
+
|
|
|
+def split_csv_row(raw_data: str, separator: str = ",", escape: str = '"') -> list[str]:
|
|
|
+ state: ParserState = ParserState.STARTING_VALUE
|
|
|
+ arr = []
|
|
|
+ current_item = ""
|
|
|
+ for c in raw_data:
|
|
|
+ if state == ParserState.STARTING_VALUE:
|
|
|
+ if c == escape:
|
|
|
+ state = ParserState.ESCAPING
|
|
|
+ elif c == separator:
|
|
|
+ arr.append("")
|
|
|
+ else:
|
|
|
+ state = ParserState.READING_VALUE
|
|
|
+ current_item = c
|
|
|
+ elif state == ParserState.READING_VALUE:
|
|
|
+ if c == separator:
|
|
|
+ state = ParserState.STARTING_VALUE
|
|
|
+ arr.append(current_item)
|
|
|
+ current_item = ""
|
|
|
+ else:
|
|
|
+ current_item += c
|
|
|
+ elif state == ParserState.ESCAPING:
|
|
|
+ if c == escape:
|
|
|
+ state = ParserState.CLOSING_ESCAPE
|
|
|
+ else:
|
|
|
+ current_item += c
|
|
|
+ elif state == ParserState.CLOSING_ESCAPE:
|
|
|
+ if c == escape:
|
|
|
+ state = ParserState.ESCAPING
|
|
|
+ current_item += c
|
|
|
+ else:
|
|
|
+ state = ParserState.READING_VALUE
|
|
|
+ arr.append(current_item)
|
|
|
+ current_item = ""
|
|
|
+ arr.append(current_item)
|
|
|
+ return arr
|
|
|
+
|
|
|
+
|
|
|
+def build_sheet_url(doc_id, sheet_id):
|
|
|
+ return f"https://docs.google.com/spreadsheets/d/{doc_id}/export?format=csv&gid={sheet_id}"
|
|
|
+
|
|
|
+
|
|
|
+def downloadAndSave(doc_ui, sheet_gid, fname):
|
|
|
+ url = build_sheet_url(doc_ui, sheet_gid)
|
|
|
+ print("Downloading " + fname)
|
|
|
+ rep = requests.get(url)
|
|
|
+ with open(fname, "wb") as f:
|
|
|
+ f.write(rep.content)
|
|
|
+
|
|
|
+
|
|
|
+def getContactDataFrame(csv_filename: str, skiprows: int = 2) -> pd.DataFrame:
|
|
|
+ df_contact = pd.read_csv(csv_filename, skiprows=skiprows)
|
|
|
+ column_to_drop = [name for name in df_contact.columns if "Unnamed" in name]
|
|
|
+ df_contact.drop(column_to_drop, axis=1, inplace=True)
|
|
|
+
|
|
|
+ # Filter out empty name
|
|
|
+ df_contact = df_contact[~df_contact.Nom.isnull()]
|
|
|
+ df_contact.reset_index()
|
|
|
+ return df_contact
|
|
|
+
|
|
|
+
|
|
|
+def getCreneauDataFrame(csv_filename: str) -> pd.DataFrame:
|
|
|
+ df_creneau = pd.read_csv(csv_filename)
|
|
|
+ df_creneau.columns = ["nom", "lieu", "description", "responsable"]
|
|
|
+ return df_creneau
|
|
|
+
|
|
|
+
|
|
|
+def getPlanningDataFrame(csv_filename, starting_date, skip_column=3):
|
|
|
+ list_creneau = []
|
|
|
+ with io.open(csv_filename, "r", encoding="utf-8") as f:
|
|
|
+ datas = [split_csv_row(s.replace("\n", "")) for s in f.readlines()]
|
|
|
+
|
|
|
+ def getDate(day: str) -> datetime.datetime:
|
|
|
+ s = day.lower()
|
|
|
+ if s.startswith("jeudi"):
|
|
|
+ return starting_date
|
|
|
+ elif s.startswith("vendredi"):
|
|
|
+ return starting_date + datetime.timedelta(days=1)
|
|
|
+ elif s.startswith("samedi"):
|
|
|
+ return starting_date + datetime.timedelta(days=2)
|
|
|
+ elif s.startswith("dimanche"):
|
|
|
+ return starting_date + datetime.timedelta(days=3)
|
|
|
+ elif s.startswith("lundi"):
|
|
|
+ return starting_date + datetime.timedelta(days=4)
|
|
|
+ raise KeyError("This day is not valid : " + s)
|
|
|
+
|
|
|
+ def getTime(time_str: str) -> datetime.timedelta:
|
|
|
+ l = time_str.split("h")
|
|
|
+ hours = int(l[0])
|
|
|
+ if hours < 5:
|
|
|
+ hours += 24
|
|
|
+ if len(l) > 1 and l[1] != "":
|
|
|
+ return datetime.timedelta(hours=hours, minutes=int(l[1]))
|
|
|
+ else:
|
|
|
+ return datetime.timedelta(hours=hours)
|
|
|
+
|
|
|
+ def getStartEnd(time_str: str) -> tuple:
|
|
|
+ l = time_str.split("-")
|
|
|
+ if len(l) == 2:
|
|
|
+ return getTime(l[0]), getTime(l[1])
|
|
|
+ else:
|
|
|
+ start = getTime(time_str.split("+")[0])
|
|
|
+ return start, start + datetime.timedelta(hours=1)
|
|
|
+
|
|
|
+ # Parse headers
|
|
|
+ headers = datas[skip_column : skip_column + 2]
|
|
|
+ days_str = headers[0]
|
|
|
+ hours = headers[1]
|
|
|
+ column_to_dates: dict[int, dict[str, datetime.datetime]] = {}
|
|
|
+ assert len(hours) == len(hours)
|
|
|
+ for i in range(skip_column, len(days_str)):
|
|
|
+ day = getDate(days_str[i])
|
|
|
+ start, end = getStartEnd(hours[i])
|
|
|
+ column_to_dates[i] = {"start": day + start, "end": day + end}
|
|
|
+
|
|
|
+ list_creneau: list[dict] = []
|
|
|
+ for i in range(5, len(datas)):
|
|
|
+ row = datas[i]
|
|
|
+ current_benevole_name = ""
|
|
|
+ current_time: dict[str, datetime.datetime] = {}
|
|
|
+ for j in range(skip_column, len(row)):
|
|
|
+ if (current_benevole_name != "") and (row[j] == "" or row[j] != current_benevole_name):
|
|
|
+ new_creneau = {
|
|
|
+ "id": uuid4(),
|
|
|
+ "description_id": row[0],
|
|
|
+ "nom": row[1],
|
|
|
+ "benevole_nom": current_benevole_name,
|
|
|
+ "ligne": i + 1,
|
|
|
+ **current_time,
|
|
|
+ }
|
|
|
+ list_creneau.append(new_creneau)
|
|
|
+ current_benevole_name = ""
|
|
|
+ current_time = {}
|
|
|
+ if row[j] != "":
|
|
|
+ current_benevole_name = row[j]
|
|
|
+ if len(current_time.keys()) == 0:
|
|
|
+ current_time = column_to_dates[j].copy()
|
|
|
+ else:
|
|
|
+ current_time["end"] = column_to_dates[j]["end"]
|
|
|
+ if current_benevole_name != "":
|
|
|
+ new_creneau = {
|
|
|
+ "id": uuid4(),
|
|
|
+ "description_id": row[0],
|
|
|
+ "nom": row[1],
|
|
|
+ "benevole_nom": current_benevole_name,
|
|
|
+ "ligne": i + 1,
|
|
|
+ **current_time,
|
|
|
+ }
|
|
|
+ list_creneau.append(new_creneau)
|
|
|
+
|
|
|
+ print(f"{len(list_creneau)} créneaux trouvés")
|
|
|
+ return pd.DataFrame.from_dict(list_creneau)
|
|
|
+
|
|
|
+
|
|
|
+def parseGsheet(doc_uuid="1Ueuw8VVAmnx5k_t8tjHwLIuYwQJrKcwNbibbaqWRWAs"):
|
|
|
+ suffix = "_2023"
|
|
|
+ fname_planning = f"./planning{suffix}.csv"
|
|
|
+ fname_creneau = f"./creneau{suffix}.csv"
|
|
|
+ fname_contact = f"./benevole{suffix}.csv"
|
|
|
+ downloadAndSave(doc_uuid, planning_gid, fname_planning)
|
|
|
+ downloadAndSave(doc_uuid, creneau_gid, fname_creneau)
|
|
|
+ downloadAndSave(doc_uuid, benevole_gid, fname_contact)
|
|
|
+
|
|
|
+ df_contact = getContactDataFrame(fname_contact)
|
|
|
+ df_contact["key"] = df_contact["Prénom"] + " " + df_contact.Nom.str.slice(0, 1)
|
|
|
+
|
|
|
+ df_creneau = getCreneauDataFrame(fname_creneau)
|
|
|
+ df_planning = getPlanningDataFrame(fname_planning, datetime.datetime(2023, 9, 14))
|
|
|
+
|
|
|
+ os.remove(fname_planning)
|
|
|
+ os.remove(fname_creneau)
|
|
|
+ os.remove(fname_contact)
|
|
|
+ return df_contact, df_creneau, df_planning
|