gsheet.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. import datetime
  2. import io
  3. import os
  4. from enum import Enum
  5. from urllib.parse import urlparse
  6. from uuid import uuid4
  7. from typing import Union
  8. import pandas as pd
  9. import requests
  10. from pandera.errors import SchemaError
  11. from pandera.typing import DataFrame
  12. from .validation import ContactSchema, CreneauSchema, CreneauDataSchema, GsheetData
# Sheet (tab) identifiers inside the source spreadsheet. Each one is passed as
# the `gid` query parameter of the CSV export URL built by build_sheet_url.
planning_gid = "1001381542"  # tab saved by parseGsheet as the planning CSV
creneau_gid = "1884137958"  # tab saved by parseGsheet as the creneau CSV
benevole_gid = "82247394"  # tab saved by parseGsheet as the benevole CSV
  16. class ParserState(Enum):
  17. STARTING_VALUE = 0
  18. READING_VALUE = 1
  19. ESCAPING = 2
  20. CLOSING_ESCAPE = 3
  21. def split_csv_row(raw_data: str, separator: str = ",", escape: str = '"') -> list[str]:
  22. """Split a csv row into the different value
  23. Args:
  24. raw_data (str): data
  25. separator (str, optional): column separator. Defaults to ",".
  26. escape (str, optional): excaping character . Defaults to '"'.
  27. Returns:
  28. list[str]: list of value in the csv row
  29. """
  30. state: ParserState = ParserState.STARTING_VALUE
  31. arr = []
  32. current_item = ""
  33. for c in raw_data:
  34. if state == ParserState.STARTING_VALUE:
  35. if c == escape:
  36. state = ParserState.ESCAPING
  37. elif c == separator:
  38. arr.append("")
  39. else:
  40. state = ParserState.READING_VALUE
  41. current_item = c
  42. elif state == ParserState.READING_VALUE:
  43. if c == separator:
  44. state = ParserState.STARTING_VALUE
  45. arr.append(current_item)
  46. current_item = ""
  47. else:
  48. current_item += c
  49. elif state == ParserState.ESCAPING:
  50. if c == escape:
  51. state = ParserState.CLOSING_ESCAPE
  52. else:
  53. current_item += c
  54. elif state == ParserState.CLOSING_ESCAPE:
  55. if c == escape:
  56. state = ParserState.ESCAPING
  57. current_item += c
  58. else:
  59. state = ParserState.READING_VALUE
  60. arr.append(current_item)
  61. current_item = ""
  62. arr.append(current_item)
  63. return arr
  64. class InvalidUrlError(Exception):
  65. pass
  66. class ParsingError(Exception):
  67. pass
  68. def extract_doc_uid(url: str) -> str:
  69. """Extract the uid of a gsheet from its url
  70. Args:
  71. url (str): url of google sheet to extract uid from
  72. Raises:
  73. InvalidUrlError: if the url does not correspond to a gsheet url
  74. Returns:
  75. str: uuid of the google sheet
  76. """
  77. res = urlparse(url)
  78. if res.netloc != "docs.google.com":
  79. raise InvalidUrlError("Invalid netloc")
  80. if not res.path.startswith("/spreadsheets/d/"):
  81. raise InvalidUrlError("Invalid path")
  82. doc_id = res.path.split("/")[3]
  83. l_doc_id = len(doc_id)
  84. if l_doc_id < 32 and 50 > l_doc_id:
  85. raise InvalidUrlError("Invalid path")
  86. return doc_id
  87. def build_sheet_url(doc_id: str, sheet_id: str):
  88. return f"https://docs.google.com/spreadsheets/d/{doc_id}/export?format=csv&gid={sheet_id}"
  89. def downloadAndSave(doc_ui: str, sheet_gid: str, file: Union[str, bytes, os.PathLike]):
  90. url = build_sheet_url(doc_ui, sheet_gid)
  91. print("Downloading " + str(file))
  92. rep = requests.get(url)
  93. with open(file, "wb") as f:
  94. f.write(rep.content)
  95. def getContactDataFrame(csv_filename: str, skiprows: int = 2) -> DataFrame[ContactSchema]:
  96. df_contact = pd.read_csv(csv_filename, skiprows=skiprows)
  97. column_to_drop = [name for name in df_contact.columns if "Unnamed" in name]
  98. df_contact.drop(column_to_drop, axis=1, inplace=True)
  99. # Filter out empty name
  100. df_contact = df_contact[~df_contact.Nom.isnull()]
  101. df_contact.reset_index()
  102. # coerce SMS to boolean
  103. df_contact["SMS"] = df_contact["SMS"] == "Oui"
  104. # create unique contact key
  105. df_contact["key"] = df_contact["Prénom"] + " " + df_contact.Nom.str.slice(0, 1)
  106. return ContactSchema.validate(df_contact)
  107. def getCreneauDataFrame(csv_filename: str) -> DataFrame[CreneauDataSchema]:
  108. df_creneau = pd.read_csv(csv_filename)
  109. df_creneau.columns = ["title", "lieu", "description", "responsable", "tags"]
  110. df_creneau.loc[df_creneau.tags.isnull(), "tags"] = ""
  111. df_creneau.loc[df_creneau.lieu.isnull(), "lieu"] = ""
  112. return CreneauDataSchema.validate(df_creneau)
  113. def getPlanningDataFrame(
  114. csv_filename: Union[str, bytes, os.PathLike],
  115. starting_date: datetime.datetime,
  116. skip_column: int = 3,
  117. ) -> DataFrame[CreneauSchema]:
  118. list_creneau = []
  119. with io.open(csv_filename, "r", encoding="utf-8") as f:
  120. datas = [split_csv_row(s.replace("\n", "")) for s in f.readlines()]
  121. def getDate(day: str) -> datetime.datetime:
  122. s = day.lower()
  123. if s.startswith("mercredi"):
  124. return starting_date + datetime.timedelta(days=-3)
  125. elif s.startswith("jeudi"):
  126. return starting_date + datetime.timedelta(days=-2)
  127. elif s.startswith("vendredi"):
  128. return starting_date + datetime.timedelta(days=-1)
  129. elif s.startswith("samedi"):
  130. return starting_date
  131. elif s.startswith("dimanche"):
  132. return starting_date + datetime.timedelta(days=1)
  133. elif s.startswith("lundi"):
  134. return starting_date + datetime.timedelta(days=2)
  135. raise KeyError("This day is not valid : " + s)
  136. def getTime(time_str: str) -> datetime.timedelta:
  137. splitted_time = time_str.split("h")
  138. hours = int(splitted_time[0])
  139. if hours < 5:
  140. hours += 24
  141. if len(splitted_time) > 1 and splitted_time[1] != "":
  142. return datetime.timedelta(hours=hours, minutes=int(splitted_time[1]))
  143. else:
  144. return datetime.timedelta(hours=hours)
  145. def getStartEnd(time_str: str) -> tuple:
  146. pair = time_str.split("-")
  147. if len(pair) == 2:
  148. return getTime(pair[0]), getTime(pair[1])
  149. else:
  150. start = getTime(time_str.split("+")[0])
  151. return start, start + datetime.timedelta(hours=1)
  152. # Parse headers
  153. headers = datas[skip_column : skip_column + 2]
  154. days_str = headers[0]
  155. hours = headers[1]
  156. column_to_dates: dict[int, dict[str, datetime.datetime]] = {}
  157. for i in range(skip_column, len(days_str)):
  158. day = getDate(days_str[i])
  159. start, end = getStartEnd(hours[i])
  160. column_to_dates[i] = {"start": day + start, "end": day + end}
  161. list_creneau: list[dict] = []
  162. for i in range(5, len(datas)):
  163. row = datas[i]
  164. current_benevole_name = ""
  165. current_time: dict[str, datetime.datetime] = {}
  166. for j in range(skip_column, len(row)):
  167. if (current_benevole_name != "") and (row[j] == "" or row[j] != current_benevole_name):
  168. new_creneau = {
  169. "id": str(uuid4()),
  170. "template_id": row[0],
  171. "nom": row[1],
  172. "benevole_nom": current_benevole_name,
  173. "ligne": i + 1,
  174. **current_time,
  175. }
  176. list_creneau.append(new_creneau)
  177. current_benevole_name = ""
  178. current_time = {}
  179. if row[j] != "":
  180. current_benevole_name = row[j]
  181. if len(current_time.keys()) == 0:
  182. current_time = column_to_dates[j].copy()
  183. else:
  184. current_time["end"] = column_to_dates[j]["end"]
  185. if current_benevole_name != "":
  186. new_creneau = {
  187. "id": str(uuid4()),
  188. "template_id": row[0],
  189. "nom": row[1],
  190. "benevole_nom": current_benevole_name,
  191. "ligne": i + 1,
  192. **current_time,
  193. }
  194. list_creneau.append(new_creneau)
  195. print(f"{len(list_creneau)} créneaux trouvés")
  196. df = pd.DataFrame.from_dict(list_creneau)
  197. return CreneauSchema.validate(df)
  198. def parseGsheet(doc_uuid: str, saturday_date: datetime.datetime) -> GsheetData:
  199. suffix = "_2023"
  200. fname_planning = f"./planning{suffix}.csv"
  201. fname_creneau = f"./creneau{suffix}.csv"
  202. fname_contact = f"./benevole{suffix}.csv"
  203. downloadAndSave(doc_uuid, planning_gid, fname_planning)
  204. downloadAndSave(doc_uuid, creneau_gid, fname_creneau)
  205. downloadAndSave(doc_uuid, benevole_gid, fname_contact)
  206. try:
  207. df_contact = getContactDataFrame(fname_contact)
  208. except SchemaError as exc:
  209. msg = "Donnée erronée sur les bénévoles\n" + str(exc)
  210. raise ParsingError(msg)
  211. try:
  212. df_creneau = getCreneauDataFrame(fname_creneau)
  213. except SchemaError as exc:
  214. raise ParsingError("Donnée erronée des description des créneaux\n" + str(exc))
  215. try:
  216. df_planning = getPlanningDataFrame(fname_planning, saturday_date)
  217. except SchemaError as exc:
  218. raise ParsingError("Donnée erronée des créneaux\n" + str(exc))
  219. os.remove(fname_planning)
  220. os.remove(fname_creneau)
  221. os.remove(fname_contact)
  222. return GsheetData(df_contact, df_creneau, df_planning)