spacepaste

  1.  
  2. import random
  3. from termcolor import colored
  4. from oslo_stations_and_chargers import district_names
  5. from oslo_stations_and_chargers import charge_points_schkuko, charge_points_type2, charge_points_superCharger
  6. from charging_modes import type1charger, type2charger, type3charger
  7. import logging
  8. logger = logging.getLogger("my_logger")
  9. class ChargingStation():
  10. amount_of_cars_that_got_sent_to_this_station = 0
  11. amount_of_cars_that_entered_the_station = 0
  12. station_index = 0
  13. received_cars_from_neighbours = 0
  14. next_station = False
  15. rejected_cars = 0
  16. bailed_cars = 0
  17. def __init__(self, station_name, first_station=True):
  18. self.first_station = first_station
  19. self.station_name = station_name
  20. self.station_index = district_names.index(self.station_name) # imported from the oslo_stations_and_chargers.
  21. print(f"Station Index: {self.station_index}")
  22. print(f"Station Name: {self.station_name}")
  23. self.type_1_charger = charge_points_schkuko[
  24. self.station_index] # imported from the oslo_stations_and_chargers.py
  25. self.type_2_charger = charge_points_type2[self.station_index] # imported from the oslo_stations_and_chargers.py
  26. self.super_charger = charge_points_superCharger[
  27. self.station_index] # imported from the oslo_stations_and_chargers.py
  28. print(f"type_1_chargers: {self.type_1_charger}")
  29. self.my_neighbours = []
  30. self.total_amount_of_chargers = []
  31. self.exhausted_cars = []
  32. # ================== Send information about amount of chargers into charging_modes.py ===============#
  33. self.type_1_all_chargers = []
  34. for i in range(0, self.type_1_charger):
  35. self.type_1_all_chargers.append(type1charger())
  36. self.total_amount_of_chargers.append(type1charger())
  37. # ============================================================================================#
  38. self.type_2_all_chargers = []
  39. for i in range(0, self.type_2_charger):
  40. self.type_2_all_chargers.append(type2charger())
  41. self.total_amount_of_chargers.append(type2charger())
  42. # ============================================================================================#
  43. self.type_3_all_chargers = []
  44. for i in range(0, self.super_charger):
  45. self.type_3_all_chargers.append(type3charger())
  46. self.total_amount_of_chargers.append(type3charger())
  47. # ----------------
  48. for self.one in self.type_1_all_chargers:
  49. self.one.finished_charging = []
  50. for self.two in self.type_2_all_chargers:
  51. self.two.finished_charging = []
  52. for self.three in self.type_3_all_chargers:
  53. self.three.finished_charging = []
  54. # ----------------------
  55. # ===================================Queue parameters here:==================
  56. self.cars_waiting_in_queue = []
  57. self.max_queue_length = 20
  58. # ===========================================================================
  59. def __eq__(self, other):
  60. return self.station_index == other.station_index
  61. def __repr__(self):
  62. return (f"""
  63. Station: {self.station_name}:
  64. Total amount of chargers at this station: {len(self.total_amount_of_chargers)}
  65. Amount of type1 chargers: {self.type_1_charger}
  66. Amount of type2: {self.type_2_charger}
  67. Amount of superchargers: {self.super_charger}
  68. """)
  69. def add_car(self, car, attempts):
  70. self.amount_of_cars_that_got_sent_to_this_station += 1
  71. car_added = self.always_pick_the_best_charger_if_possible(car, attempts)
  72. if car_added == True:
  73. self.amount_of_cars_that_entered_the_station += 1
  74. else:
  75. try:
  76. # logger.info(f"Queue length before: {len(self.cars_waiting_in_queue)}")
  77. # logger.debug(f"Queue length before: {len(self.cars_waiting_in_queue)}")
  78. queue_car = self.cars_waiting_in_queue.pop(0)
  79. # logger.debug(f"Queue car picked: {queue_car}")
  80. # logger.info(f"Queue car picked: {queue_car}")
  81. # logger.debug("calling recycle method")
  82. # logger.info("calling recycle method")
  83. self.always_pick_the_best_charger_if_possible(queue_car, attempts)
  84. # logger.debug("calling recycle method")
  85. # logger.info("calling recycle method")
  86. # logger.debug(f"Queue length after: {len(self.cars_waiting_in_queue)}")
  87. # logger.info(f"Queue length after: {len(self.cars_waiting_in_queue)}")
  88. except IndexError as e:
  89. # logger.exception(e)
  90. logger.debug("Calm before the storm..")
  91. def add_neighbour(self, n):
  92. self.my_neighbours.append(n)
  93. # ============================== QUEUE METHODS HERE ===================================
  94. # ============================== QUEUE METHODS HERE ===================================
  95. # ============================== QUEUE METHODS HERE ===================================
  96. def add_to_waiting_queue(self, car, attempts):
  97. if len(self.cars_waiting_in_queue) >= self.max_queue_length:
  98. # logger.debug("QUEUE IS FULL !!!") # WORKS fine
  99. self.send_car_to_next_station(car, attempts)
  100. self.rejected_cars = self.rejected_cars + 1
  101. elif len(self.cars_waiting_in_queue) >= 0 and len(self.total_amount_of_chargers) == 0:
  102. logger.debug("ID 1: NO CHARGERS AT THIS STATION")
  103. self.send_car_to_next_station(car, attempts)
  104. self.rejected_cars = self.rejected_cars + 1
  105. logger.debug("ID 1:This method got called!") # Works fine
  106. else:
  107. self.cars_waiting_in_queue.append(car)
  108. return True
  109. def update_patience_and_status_for_cars_in_queue(self):
  110. self.cars_that_wants_to_leave_the_queue_and_go_to_next_station = []
  111. # unpatient_cars = self.cars_that_wants_to_leave_the_queue_and_go_to_next_station.pop(0)
  112. for cars in self.cars_waiting_in_queue:
  113. # print(f"before execution: {(len(self.cars_that_wants_to_leave_the_queue_and_go_to_next_station))} , {(len(self.cars_waiting_in_queue))}")
  114. cars.update_patience_in_queue()
  115. if cars.zero_patience_car_wants_to_leave == True:
  116. self.cars_that_wants_to_leave_the_queue_and_go_to_next_station.append(cars)
  117. self.cars_waiting_in_queue.remove(cars)
  118. # print(f"AFTER execution: {(len(self.cars_that_wants_to_leave_the_queue_and_go_to_next_station))} , {(len(self.cars_waiting_in_queue))}")
  119. # logger.debug("I am leaving!")
  120. self.send_car_to_next_station(self.cars_that_wants_to_leave_the_queue_and_go_to_next_station.pop(0), attempts=2)
  121. self.bailed_cars = self.bailed_cars + 1
  122. # logger.debug("I got sent!")
  123. # print(self.cars_waiting_in_queue)
  124. # ============================== QUEUE METHODS above ===================================
  125. # ============================== QUEUE METHODS above ===================================
  126. # ============================== QUEUE METHODS above ===================================
  127. def always_pick_the_best_charger_if_possible(self, car, attempts):
  128. logger.debug("Let us see if Type 3 chargers are available...")
  129. for type3charger in self.type_3_all_chargers:
  130. if type3charger.car == None:
  131. type3charger.add_car_to_charger(car)
  132. logger.debug("TYPE 3 was available, picked it!")
  133. return True
  134. else:
  135. logger.debug("No type 3 chargers available, what about type 2?")
  136. return self.pick_the_second_best_charger(car, attempts)
  137. def pick_the_second_best_charger(self, car, attempts):
  138. logger.debug("Let us see if Type 2 chargers are available...")
  139. for type2charger in self.type_2_all_chargers:
  140. if type2charger.car == None:
  141. type2charger.add_car_to_charger(car)
  142. logger.debug("TYPE 2 was available, picked it!")
  143. return True
  144. else:
  145. logger.debug("No type 2 chargers avaiable, what about type 1/Schkuko then?")
  146. return self.pick_a_schkuko_charger(car, attempts)
  147. def pick_a_schkuko_charger(self, car, attempts):
  148. logger.debug("Ok..Let us see if Type 1/Schkuko chargers are available...")
  149. for type1charger in self.type_1_all_chargers:
  150. if type1charger.car == None:
  151. type1charger.add_car_to_charger(car)
  152. logger.debug("TYPE 1/Schkuko was available, picked it!")
  153. return True
  154. else:
  155. logger.debug("All chargers are full! --> plz wait in queue!")
  156. # print("All chargers full!, adding to Queue")
  157. return self.add_to_waiting_queue(car, attempts)
  158. def grab_car_from_waiting_queue_and_place_to_charger(self):
  159. # logger.info("picking car from queue")
  160. for charger1 in self.type_1_all_chargers:
  161. if len(self.cars_waiting_in_queue) >= 0 and charger1.car == None:
  162. try:
  163. queued_car = self.cars_waiting_in_queue.pop(0)
  164. logger.debug(f"Grabbing car: {queued_car}")
  165. # logger.info(f"Grabbing car: {queued_car}")
  166. charger1.add_car_to_charger(queued_car)
  167. # logger.info(f"Grabbed car and placed to CHARGER 1: {queued_car}")
  168. logger.debug(f"Grabbed car and placed to CHARGER 1: {queued_car}")
  169. except IndexError as e:
  170. # logger.exception(e)
  171. logger.debug("Queue is empty")
  172. for charger2 in self.type_2_all_chargers: # NOT GETTING CALLED?
  173. if len(self.cars_waiting_in_queue) >= 0 and charger2.car == None:
  174. try:
  175. queued_car = self.cars_waiting_in_queue.pop(0)
  176. logger.debug(f"Grabbing car: {queued_car}")
  177. # logger.info(f"Grabbing car: {queued_car}")
  178. charger2.add_car_to_charger(queued_car)
  179. # logger.info(f"Grabbed car and placed to CHARGER 2: {queued_car}")
  180. logger.debug(f"Grabbed car and placed to CHARGER 2: {queued_car}")
  181. except IndexError as e:
  182. # logger.exception(e)
  183. logger.debug("Queue is empty")
  184. for charger3 in self.type_3_all_chargers: # NOT GETTING CALLED?
  185. if len(self.cars_waiting_in_queue) >= 0 and charger3.car == None:
  186. try:
  187. queued_car = self.cars_waiting_in_queue.pop(0)
  188. logger.debug(f"Grabbing car: {queued_car}")
  189. # logger.info(f"Grabbing car: {queued_car}")
  190. charger3.add_car_to_charger(queued_car)
  191. logger.info(f"Grabbed car and placed to CHARGER 3: {queued_car}")
  192. # logger.debug(f"Grabbed car and placed to CHARGER 3: {queued_car}")
  193. except IndexError as e:
  194. # logger.exception(e)
  195. logger.debug("Queue is empty")
  196. else:
  197. return False
  198. def reset_method(self):
  199. self.amount_of_cars_that_got_sent_to_this_station = 0
  200. self.amount_of_cars_that_entered_the_station = 0
  201. self.chargers_in_use = 0
  202. self.station_index = 0
  203. self.received_cars_from_neighbours = 0
  204. self.next_station = False
  205. self.rejected_cars = 0
  206. self.exhausted_cars = []
  207. self.bailed_cars = 0
  208. self.car_received_from_neighbours = 0
  209. self.cars_waiting_in_queue = []
  210. self.total_chargers_in_use = 0
  211. self.amount_of_cars_charging_at_type1 = 0
  212. self.amount_of_cars_charging_at_type2 = 0
  213. self.amount_of_cars_charging_at_type3 = 0
  214. self.cars_that_have_finished_charging_at_type1 = 0
  215. self.cars_that_have_finished_charging_at_type2 = 0
  216. self.cars_that_have_finished_charging_at_type3 = 0
  217. for finished_charging1 in self.type_1_all_chargers:
  218. finished_charging1.finished_charging = []
  219. for finished_charging2 in self.type_2_all_chargers:
  220. finished_charging2.finished_charging = []
  221. for finished_charging3 in self.type_3_all_chargers:
  222. finished_charging3.finished_charging = []
  223. def print_status_of_station(self):
  224. self.amount_of_cars_charging_at_type1 = 0
  225. self.amount_of_cars_charging_at_type2 = 0
  226. self.amount_of_cars_charging_at_type3 = 0
  227. self.cars_that_have_finished_charging_at_type1 = 0
  228. self.cars_that_have_finished_charging_at_type2 = 0
  229. self.cars_that_have_finished_charging_at_type3 = 0
  230. for charger1 in self.type_1_all_chargers:
  231. #print(colored(len(charger1.finished_charging),'green'))
  232. if charger1.active_car == True:
  233. self.amount_of_cars_charging_at_type1 += 1
  234. if charger1.car_that_has_finished_charging == True:
  235. self.cars_that_have_finished_charging_at_type1 += 1
  236. for charger2 in self.type_2_all_chargers:
  237. #print(colored(len(charger2.finished_charging), 'yellow'))
  238. if charger2.active_car == True:
  239. self.amount_of_cars_charging_at_type2 += 1
  240. if charger2.car_that_has_finished_charging == True:
  241. self.cars_that_have_finished_charging_at_type2 += 1
  242. for charger3 in self.type_3_all_chargers:
  243. #print(colored(len(charger3.finished_charging), 'blue'))
  244. if charger3.active_car == True:
  245. self.amount_of_cars_charging_at_type3 += 1
  246. if charger3.car_that_has_finished_charging == True:
  247. self.cars_that_have_finished_charging_at_type3 += 1
  248. self.total_chargers_in_use = (self.amount_of_cars_charging_at_type1 + self.amount_of_cars_charging_at_type2 + self.amount_of_cars_charging_at_type3)
  249. self.percentage_of_chargers_in_use = self.total_chargers_in_use / len(self.total_amount_of_chargers) * 100 if len(self.total_amount_of_chargers) > 0 else 0
  250. return (f"""
  251. Total amount of EV that visited the station during the day : {self.amount_of_cars_that_got_sent_to_this_station}.
  252. EV`s that got to enter the station : {self.amount_of_cars_that_entered_the_station}
  253. EV`s finished charging at type 1 : {self.cars_that_have_finished_charging_at_type1}
  254. EV`s finished charging at type 2 : {self.cars_that_have_finished_charging_at_type2}
  255. EV`s finished charging at type 3 : {self.cars_that_have_finished_charging_at_type3}
  256. "--------------------------------------------------------------------------------------------------------------------------------"
  257. cars charging at type 1: {self.amount_of_cars_charging_at_type1}
  258. cars charging at type 2: {self.amount_of_cars_charging_at_type2}
  259. cars charging at type 3: {self.amount_of_cars_charging_at_type3}
  260. cars charging totally : {self.total_chargers_in_use}
  261. Percentage of chargers used : {self.percentage_of_chargers_in_use}%
  262. "--------------------------------------------------------------------------------------------------------------------------------"
  263. Cars currently waiting in Queue : {(len(self.cars_waiting_in_queue))}
  264. Total cars rejected from the station: {self.rejected_cars}
  265. Total cars that have bailed away : {self.bailed_cars}
  266. Cars exhausted after 2 attempts : {len(self.exhausted_cars)}
  267. "--------------------------------------------------------------------------------------------------------------------------------"
  268. """)
  269. def get_status_of_queue(self):
  270. return len(self.cars_waiting_in_queue)
  271. def give_me_neighbour_stations(self):
  272. for neighbour in self.my_neighbours:
  273. print("Closest nearest station are: " + neighbour)
  274. def pick_a_random_neighbour(self):
  275. self.next_station = True
  276. self.first_station = False
  277. return random.choice(self.my_neighbours) # and self.first_station == False
  278. def send_car_to_next_station(self, car, attempts):
  279. if attempts <= 0:
  280. logger.debug(("NO MORE ATTEMPTS: {}".format(car)))
  281. logger.debug("Adding car to exhausted/destroyed list:")
  282. self.exhausted_cars.append(car)
  283. return False
  284. elif attempts == 1:
  285. logger.debug("................................... we try again?")
  286. logger.debug(f"Calling method: send_car_to_next_station")
  287. # logger.info(f"Calling method: send_car_to_next_station ")
  288. # if self.pick_a_random_neighbour() == self.first_station:
  289. # logger.debug("You have already been here!!")
  290. car_at_first_station = False
  291. self.pick_a_random_neighbour().add_car(car, attempts-1)
  292. logger.debug("1 ATTEMPTS LEFT")
  293. logger.debug(("CAR SENT TO NEXT STATION: {}".format(car)))
  294. logger.debug(("NOW YOU HAVE 0 ATTEMPTS LEFT AFTER"))
  295. # random.choice(self.my_neighbours).add_car(car)
  296. # logger.debug("Car sent part 1?")
  297. # logger.info("Car sent part 2?")
  298. elif attempts == 2:
  299. try:
  300. logger.debug("2 ATTEMPTS LEFT")
  301. logger.debug(("CAR SENT TO NEXT STATION: {}".format(car)))
  302. logger.debug("NOW YOU HAVE 1 ATTEMPT LEFT")
  303. # logger.info(f"Calling method: send_car_to_next_station ")
  304. # if self.pick_a_random_neighbour() == self.first_station:
  305. # logger.debug("You have already been here!!")
  306. car.car_at_first_station = False
  307. self.pick_a_random_neighbour().add_car(car, attempts-1)
  308. logger.debug("Car sent to next station!")
  309. # random.choice(self.my_neighbours).add_car(car)
  310. # logger.debug("Car sent part 1?")
  311. # logger.info("Car sent part 2?")
  312. except IndexError as e:
  313. logger.exception(e)
  314. logger.info("Neighours: {}".format(len(self.my_neighbours)))
  315.