| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- import datetime
- import io
- import os
- from enum import Enum
- from urllib.parse import urlparse
- from uuid import uuid4
- from typing import Union
- import pandas as pd
- import requests
- from pandera.errors import SchemaError
- from pandera.typing import DataFrame
- from .validation import ContactSchema, CreneauSchema, CreneauDataSchema, GsheetData
# Worksheet ("gid") identifiers of the three tabs downloaded by parseGsheet.
# Each value is passed as the ``gid`` query parameter of the CSV export URL.
planning_gid = "1001381542"  # planning grid
creneau_gid = "1884137958"  # slot descriptions
benevole_gid = "82247394"  # volunteer contacts
class ParserState(Enum):
    """States of the character-level CSV row parser used by ``split_csv_row``."""

    # At the beginning of a field; no character of the field consumed yet.
    STARTING_VALUE = 0
    # Inside an unquoted field.
    READING_VALUE = 1
    # Inside a quoted field, after the opening escape character.
    ESCAPING = 2
    # An escape character was just seen inside a quoted field: it is either
    # the closing quote or the first half of a doubled (literal) quote.
    CLOSING_ESCAPE = 3


def split_csv_row(raw_data: str, separator: str = ",", escape: str = '"') -> list[str]:
    """Split a single CSV row into its values.

    Quoted values may contain the separator, and a doubled escape character
    inside a quoted value stands for one literal escape character.

    Args:
        raw_data (str): one CSV row, without its trailing line break.
        separator (str, optional): column separator. Defaults to ",".
        escape (str, optional): quoting character. Defaults to '"'.

    Returns:
        list[str]: values of the row, in order. An empty input yields a list
        containing one empty string.
    """
    state = ParserState.STARTING_VALUE
    values: list[str] = []
    current: list[str] = []

    for c in raw_data:
        if state == ParserState.STARTING_VALUE:
            if c == escape:
                state = ParserState.ESCAPING
            elif c == separator:
                values.append("")
            else:
                state = ParserState.READING_VALUE
                current.append(c)
        elif state == ParserState.READING_VALUE:
            if c == separator:
                values.append("".join(current))
                current.clear()
                state = ParserState.STARTING_VALUE
            else:
                current.append(c)
        elif state == ParserState.ESCAPING:
            if c == escape:
                state = ParserState.CLOSING_ESCAPE
            else:
                current.append(c)
        else:  # ParserState.CLOSING_ESCAPE
            if c == escape:
                # Doubled escape character: literal quote, still inside quotes.
                current.append(c)
                state = ParserState.ESCAPING
            elif c == separator:
                # The quoted value is complete. Go back to STARTING_VALUE so
                # that a following quoted value has its quotes recognised.
                values.append("".join(current))
                current.clear()
                state = ParserState.STARTING_VALUE
            else:
                # Malformed input (text right after a closing quote): keep the
                # character as part of the value instead of dropping it.
                current.append(c)
                state = ParserState.READING_VALUE

    # The last value has no trailing separator.
    values.append("".join(current))
    return values
class InvalidUrlError(Exception):
    """Raised when a URL is not recognised as a Google Sheets document URL."""


class ParsingError(Exception):
    """Raised when downloaded sheet data fails schema validation."""


def extract_doc_uid(url: str) -> str:
    """Extract the document id of a Google Sheet from its URL.

    Args:
        url (str): URL of the Google Sheet to extract the id from.

    Raises:
        InvalidUrlError: if the host is not ``docs.google.com``, the path does
            not start with ``/spreadsheets/d/``, or the id length is outside
            the accepted 32..50 character range.

    Returns:
        str: id of the Google Sheet document.
    """
    res = urlparse(url)
    if res.netloc != "docs.google.com":
        raise InvalidUrlError("Invalid netloc")
    if not res.path.startswith("/spreadsheets/d/"):
        raise InvalidUrlError("Invalid path")
    # The path is "/spreadsheets/d/<id>/...": the id is the 4th component.
    doc_id = res.path.split("/")[3]
    # Reject ids that are too short or too long.
    if not 32 <= len(doc_id) <= 50:
        raise InvalidUrlError("Invalid path")
    return doc_id
def build_sheet_url(doc_id: str, sheet_id: str):
    """Build the CSV export URL of one worksheet of a Google Sheet.

    Args:
        doc_id (str): id of the Google Sheet document.
        sheet_id (str): gid of the worksheet inside the document.

    Returns:
        str: URL exporting the given worksheet in CSV format.
    """
    export_url = f"https://docs.google.com/spreadsheets/d/{doc_id}/export?format=csv&gid={sheet_id}"
    return export_url
def downloadAndSave(
    doc_ui: str,
    sheet_gid: str,
    file: Union[str, bytes, os.PathLike],
    timeout: float = 30.0,
):
    """Download one worksheet as CSV and write it to a local file.

    Args:
        doc_ui (str): id of the Google Sheet document.
        sheet_gid (str): gid of the worksheet to export.
        file (Union[str, bytes, os.PathLike]): destination path; overwritten
            if it already exists.
        timeout (float, optional): maximum number of seconds to wait for the
            server. Defaults to 30.0.

    Raises:
        requests.HTTPError: if the server answers with an error status code.
        requests.Timeout: if the server does not answer within ``timeout``.
    """
    url = build_sheet_url(doc_ui, sheet_gid)
    print("Downloading " + str(file))
    # Without a timeout a stalled connection would block the caller forever.
    rep = requests.get(url, timeout=timeout)
    # Fail here rather than saving an error page as if it were CSV data.
    rep.raise_for_status()
    with open(file, "wb") as f:
        f.write(rep.content)
def getContactDataFrame(csv_filename: str, skiprows: int = 2) -> DataFrame[ContactSchema]:
    """Load the volunteer contact sheet from a CSV file.

    Args:
        csv_filename (str): path of the CSV file to read.
        skiprows (int, optional): number of leading rows to skip before the
            header row. Defaults to 2.

    Raises:
        SchemaError: if the resulting frame does not satisfy ``ContactSchema``.

    Returns:
        DataFrame[ContactSchema]: validated contacts, with ``SMS`` converted
        to a boolean and an added ``key`` column.
    """
    df_contact = pd.read_csv(csv_filename, skiprows=skiprows)

    # Columns without a header are named "Unnamed: N" by pandas; drop them.
    unnamed_columns = [name for name in df_contact.columns if "Unnamed" in name]
    df_contact = df_contact.drop(columns=unnamed_columns)

    # Filter out rows without a name. reset_index returns a new frame, so its
    # result must be kept; drop=True avoids adding the old index as a column.
    df_contact = df_contact[df_contact.Nom.notnull()].reset_index(drop=True)

    # Coerce SMS to boolean: only the exact value "Oui" counts as True.
    df_contact["SMS"] = df_contact["SMS"] == "Oui"

    # Contact key: first name followed by the first letter of the last name.
    df_contact["key"] = df_contact["Prénom"] + " " + df_contact.Nom.str.slice(0, 1)
    return ContactSchema.validate(df_contact)
def getCreneauDataFrame(csv_filename: str) -> DataFrame[CreneauDataSchema]:
    """Load the slot description sheet from a CSV file.

    The columns are renamed, in order, to ``title``, ``lieu``,
    ``description``, ``responsable`` and ``tags``; missing ``tags`` and
    ``lieu`` values are replaced with empty strings.

    Args:
        csv_filename (str): path of the CSV file to read.

    Returns:
        DataFrame[CreneauDataSchema]: validated slot descriptions.
    """
    frame = pd.read_csv(csv_filename)
    frame.columns = ["title", "lieu", "description", "responsable", "tags"]
    for column in ("tags", "lieu"):
        missing = frame[column].isnull()
        frame.loc[missing, column] = ""
    return CreneauDataSchema.validate(frame)
# Offset in days of each recognised day label relative to the reference date.
_DAY_OFFSETS = {
    "mercredi": -3,
    "jeudi": -2,
    "vendredi": -1,
    "samedi": 0,
    "dimanche": 1,
    "lundi": 2,
}


def _parse_day(day: str, starting_date: datetime.datetime) -> datetime.datetime:
    """Return the date of a day label, relative to ``starting_date`` (the "samedi")."""
    label = day.lower()
    for prefix, offset in _DAY_OFFSETS.items():
        if label.startswith(prefix):
            return starting_date + datetime.timedelta(days=offset)
    raise KeyError("This day is not valid : " + label)


def _parse_time(time_str: str) -> datetime.timedelta:
    """Convert a time such as ``"10h"`` or ``"10h30"`` into an offset from midnight."""
    parts = time_str.split("h")
    hours = int(parts[0])
    # Hours before 5 are counted as belonging to the following calendar day.
    if hours < 5:
        hours += 24
    if len(parts) > 1 and parts[1] != "":
        return datetime.timedelta(hours=hours, minutes=int(parts[1]))
    return datetime.timedelta(hours=hours)


def _parse_time_range(time_str: str) -> tuple:
    """Convert ``"<start>-<end>"`` or a single start time into (start, end) offsets."""
    bounds = time_str.split("-")
    if len(bounds) == 2:
        # NOTE(review): a range such as "4h-6h" gives an end before its start,
        # because only the start falls under the "before 5" rule. Confirm
        # whether such ranges can appear in the sheet.
        return _parse_time(bounds[0]), _parse_time(bounds[1])
    # No explicit end: take what precedes an optional "+" and last one hour.
    start = _parse_time(time_str.split("+")[0])
    return start, start + datetime.timedelta(hours=1)


def _build_creneau(row: list, line_number: int, benevole_name: str, slot: dict) -> dict:
    """Build the record of one slot assigned to one volunteer."""
    return {
        "id": str(uuid4()),
        "template_id": row[0],
        "nom": row[1],
        "benevole_nom": benevole_name,
        "ligne": line_number,
        **slot,
    }


def getPlanningDataFrame(
    csv_filename: Union[str, bytes, os.PathLike],
    starting_date: datetime.datetime,
    skip_column: int = 3,
    header_row: int = 3,
) -> DataFrame[CreneauSchema]:
    """Parse the planning grid CSV into one record per assigned slot.

    Row ``header_row`` holds the day label of each column and the next row
    its time range. In the rows after that, column 0 is the slot template
    id, column 1 the slot name, and the columns from ``skip_column`` onwards
    hold volunteer names. Consecutive cells with the same name are merged
    into a single slot.

    Args:
        csv_filename (Union[str, bytes, os.PathLike]): path of the CSV file.
        starting_date (datetime.datetime): date matching the "samedi" label;
            the other day labels are resolved relative to it.
        skip_column (int, optional): number of leading columns that do not
            hold planning cells. Defaults to 3.
        header_row (int, optional): 0-based index of the day label row.
            Defaults to 3.

    Raises:
        KeyError: if a day label is not one of the recognised day names.
        SchemaError: if the resulting frame does not satisfy ``CreneauSchema``.

    Returns:
        DataFrame[CreneauSchema]: validated slots.
    """
    with open(csv_filename, "r", encoding="utf-8") as f:
        datas = [split_csv_row(line.replace("\n", "")) for line in f]

    # Parse headers: map each planning column to its start/end datetimes.
    days_str = datas[header_row]
    hours = datas[header_row + 1]
    column_to_dates: dict[int, dict[str, datetime.datetime]] = {}
    for col in range(skip_column, len(days_str)):
        day = _parse_day(days_str[col], starting_date)
        start, end = _parse_time_range(hours[col])
        column_to_dates[col] = {"start": day + start, "end": day + end}

    list_creneau: list[dict] = []
    first_data_row = header_row + 2
    for i, row in enumerate(datas[first_data_row:], start=first_data_row):
        current_benevole_name = ""
        current_time: dict[str, datetime.datetime] = {}
        for j in range(skip_column, len(row)):
            cell = row[j]
            # An empty cell or a different name closes the slot in progress.
            if current_benevole_name != "" and cell != current_benevole_name:
                list_creneau.append(
                    _build_creneau(row, i + 1, current_benevole_name, current_time)
                )
                current_benevole_name = ""
                current_time = {}
            if cell != "":
                current_benevole_name = cell
                if not current_time:
                    current_time = column_to_dates[j].copy()
                else:
                    # Same volunteer as the previous column: extend the slot.
                    current_time["end"] = column_to_dates[j]["end"]
        # A slot still open at the end of the row is closed here.
        if current_benevole_name != "":
            list_creneau.append(
                _build_creneau(row, i + 1, current_benevole_name, current_time)
            )

    print(f"{len(list_creneau)} créneaux trouvés")
    df = pd.DataFrame.from_dict(list_creneau)
    return CreneauSchema.validate(df)
def parseGsheet(doc_uuid: str, saturday_date: datetime.datetime) -> GsheetData:
    """Download the three worksheets of a Google Sheet and parse them.

    The worksheets are saved as CSV files in the current directory, parsed,
    and the files are removed whether or not parsing succeeds.

    Args:
        doc_uuid (str): id of the Google Sheet document.
        saturday_date (datetime.datetime): reference date passed to the
            planning parser.

    Raises:
        ParsingError: if one of the sheets fails schema validation; the
            original ``SchemaError`` is attached as the cause.

    Returns:
        GsheetData: contacts, slot descriptions and planning, in that order.
    """
    suffix = "_2023"
    fname_planning = f"./planning{suffix}.csv"
    fname_creneau = f"./creneau{suffix}.csv"
    fname_contact = f"./benevole{suffix}.csv"
    downloads = (
        (planning_gid, fname_planning),
        (creneau_gid, fname_creneau),
        (benevole_gid, fname_contact),
    )

    try:
        for gid, fname in downloads:
            downloadAndSave(doc_uuid, gid, fname)

        try:
            df_contact = getContactDataFrame(fname_contact)
        except SchemaError as exc:
            msg = "Donnée erronée sur les bénévoles\n" + str(exc)
            raise ParsingError(msg) from exc

        try:
            df_creneau = getCreneauDataFrame(fname_creneau)
        except SchemaError as exc:
            raise ParsingError("Donnée erronée des description des créneaux\n" + str(exc)) from exc

        try:
            df_planning = getPlanningDataFrame(fname_planning, saturday_date)
        except SchemaError as exc:
            raise ParsingError("Donnée erronée des créneaux\n" + str(exc)) from exc
    finally:
        # Always clean up, even on failure. A file may be missing if its
        # download never completed.
        for _, fname in downloads:
            try:
                os.remove(fname)
            except FileNotFoundError:
                pass

    return GsheetData(df_contact, df_creneau, df_planning)
|