-
- import random
- from termcolor import colored
- from oslo_stations_and_chargers import district_names
- from oslo_stations_and_chargers import charge_points_schkuko, charge_points_type2, charge_points_superCharger
-
- from charging_modes import type1charger, type2charger, type3charger
- import logging
-
- logger = logging.getLogger("my_logger")
-
-
- class ChargingStation():
- amount_of_cars_that_got_sent_to_this_station = 0
- amount_of_cars_that_entered_the_station = 0
- station_index = 0
- received_cars_from_neighbours = 0
- next_station = False
- rejected_cars = 0
- bailed_cars = 0
-
- def __init__(self, station_name, first_station=True):
- self.first_station = first_station
- self.station_name = station_name
-
- self.station_index = district_names.index(self.station_name) # imported from the oslo_stations_and_chargers.
- print(f"Station Index: {self.station_index}")
- print(f"Station Name: {self.station_name}")
- self.type_1_charger = charge_points_schkuko[
- self.station_index] # imported from the oslo_stations_and_chargers.py
- self.type_2_charger = charge_points_type2[self.station_index] # imported from the oslo_stations_and_chargers.py
- self.super_charger = charge_points_superCharger[
- self.station_index] # imported from the oslo_stations_and_chargers.py
- print(f"type_1_chargers: {self.type_1_charger}")
-
- self.my_neighbours = []
- self.total_amount_of_chargers = []
- self.exhausted_cars = []
-
- # ================== Send information about amount of chargers into charging_modes.py ===============#
- self.type_1_all_chargers = []
-
- for i in range(0, self.type_1_charger):
- self.type_1_all_chargers.append(type1charger())
- self.total_amount_of_chargers.append(type1charger())
- # ============================================================================================#
- self.type_2_all_chargers = []
-
- for i in range(0, self.type_2_charger):
- self.type_2_all_chargers.append(type2charger())
- self.total_amount_of_chargers.append(type2charger())
- # ============================================================================================#
- self.type_3_all_chargers = []
-
- for i in range(0, self.super_charger):
- self.type_3_all_chargers.append(type3charger())
- self.total_amount_of_chargers.append(type3charger())
-
- # ----------------
- for self.one in self.type_1_all_chargers:
- self.one.finished_charging = []
-
- for self.two in self.type_2_all_chargers:
- self.two.finished_charging = []
-
- for self.three in self.type_3_all_chargers:
- self.three.finished_charging = []
- # ----------------------
-
- # ===================================Queue parameters here:==================
-
- self.cars_waiting_in_queue = []
- self.max_queue_length = 20
-
- # ===========================================================================
-
- def __eq__(self, other):
- return self.station_index == other.station_index
-
- def __repr__(self):
- return (f"""
- Station: {self.station_name}:
- Total amount of chargers at this station: {len(self.total_amount_of_chargers)}
-
- Amount of type1 chargers: {self.type_1_charger}
- Amount of type2: {self.type_2_charger}
- Amount of superchargers: {self.super_charger}
-
- """)
-
- def add_car(self, car, attempts):
- self.amount_of_cars_that_got_sent_to_this_station += 1
- car_added = self.always_pick_the_best_charger_if_possible(car, attempts)
- if car_added == True:
- self.amount_of_cars_that_entered_the_station += 1
- else:
- try:
- # logger.info(f"Queue length before: {len(self.cars_waiting_in_queue)}")
- # logger.debug(f"Queue length before: {len(self.cars_waiting_in_queue)}")
- queue_car = self.cars_waiting_in_queue.pop(0)
- # logger.debug(f"Queue car picked: {queue_car}")
- # logger.info(f"Queue car picked: {queue_car}")
- # logger.debug("calling recycle method")
- # logger.info("calling recycle method")
- self.always_pick_the_best_charger_if_possible(queue_car, attempts)
- # logger.debug("calling recycle method")
- # logger.info("calling recycle method")
- # logger.debug(f"Queue length after: {len(self.cars_waiting_in_queue)}")
- # logger.info(f"Queue length after: {len(self.cars_waiting_in_queue)}")
- except IndexError as e:
- # logger.exception(e)
- logger.debug("Calm before the storm..")
-
- def add_neighbour(self, n):
- self.my_neighbours.append(n)
-
- # ============================== QUEUE METHODS HERE ===================================
- # ============================== QUEUE METHODS HERE ===================================
- # ============================== QUEUE METHODS HERE ===================================
- def add_to_waiting_queue(self, car, attempts):
- if len(self.cars_waiting_in_queue) >= self.max_queue_length:
- # logger.debug("QUEUE IS FULL !!!") # WORKS fine
-
- self.send_car_to_next_station(car, attempts)
- self.rejected_cars = self.rejected_cars + 1
-
- elif len(self.cars_waiting_in_queue) >= 0 and len(self.total_amount_of_chargers) == 0:
- logger.debug("ID 1: NO CHARGERS AT THIS STATION")
- self.send_car_to_next_station(car, attempts)
- self.rejected_cars = self.rejected_cars + 1
- logger.debug("ID 1:This method got called!") # Works fine
-
- else:
- self.cars_waiting_in_queue.append(car)
- return True
-
- def update_patience_and_status_for_cars_in_queue(self):
- self.cars_that_wants_to_leave_the_queue_and_go_to_next_station = []
- # unpatient_cars = self.cars_that_wants_to_leave_the_queue_and_go_to_next_station.pop(0)
- for cars in self.cars_waiting_in_queue:
- # print(f"before execution: {(len(self.cars_that_wants_to_leave_the_queue_and_go_to_next_station))} , {(len(self.cars_waiting_in_queue))}")
- cars.update_patience_in_queue()
- if cars.zero_patience_car_wants_to_leave == True:
- self.cars_that_wants_to_leave_the_queue_and_go_to_next_station.append(cars)
- self.cars_waiting_in_queue.remove(cars)
- # print(f"AFTER execution: {(len(self.cars_that_wants_to_leave_the_queue_and_go_to_next_station))} , {(len(self.cars_waiting_in_queue))}")
- # logger.debug("I am leaving!")
-
- self.send_car_to_next_station(self.cars_that_wants_to_leave_the_queue_and_go_to_next_station.pop(0), attempts=2)
- self.bailed_cars = self.bailed_cars + 1
- # logger.debug("I got sent!")
-
- # print(self.cars_waiting_in_queue)
-
- # ============================== QUEUE METHODS above ===================================
- # ============================== QUEUE METHODS above ===================================
- # ============================== QUEUE METHODS above ===================================
-
- def always_pick_the_best_charger_if_possible(self, car, attempts):
- logger.debug("Let us see if Type 3 chargers are available...")
- for type3charger in self.type_3_all_chargers:
- if type3charger.car == None:
- type3charger.add_car_to_charger(car)
- logger.debug("TYPE 3 was available, picked it!")
- return True
- else:
- logger.debug("No type 3 chargers available, what about type 2?")
- return self.pick_the_second_best_charger(car, attempts)
-
- def pick_the_second_best_charger(self, car, attempts):
- logger.debug("Let us see if Type 2 chargers are available...")
- for type2charger in self.type_2_all_chargers:
- if type2charger.car == None:
- type2charger.add_car_to_charger(car)
- logger.debug("TYPE 2 was available, picked it!")
- return True
- else:
- logger.debug("No type 2 chargers avaiable, what about type 1/Schkuko then?")
- return self.pick_a_schkuko_charger(car, attempts)
-
- def pick_a_schkuko_charger(self, car, attempts):
- logger.debug("Ok..Let us see if Type 1/Schkuko chargers are available...")
- for type1charger in self.type_1_all_chargers:
- if type1charger.car == None:
- type1charger.add_car_to_charger(car)
- logger.debug("TYPE 1/Schkuko was available, picked it!")
- return True
- else:
- logger.debug("All chargers are full! --> plz wait in queue!")
- # print("All chargers full!, adding to Queue")
- return self.add_to_waiting_queue(car, attempts)
-
- def grab_car_from_waiting_queue_and_place_to_charger(self):
- # logger.info("picking car from queue")
- for charger1 in self.type_1_all_chargers:
- if len(self.cars_waiting_in_queue) >= 0 and charger1.car == None:
- try:
- queued_car = self.cars_waiting_in_queue.pop(0)
- logger.debug(f"Grabbing car: {queued_car}")
- # logger.info(f"Grabbing car: {queued_car}")
- charger1.add_car_to_charger(queued_car)
- # logger.info(f"Grabbed car and placed to CHARGER 1: {queued_car}")
- logger.debug(f"Grabbed car and placed to CHARGER 1: {queued_car}")
-
- except IndexError as e:
- # logger.exception(e)
- logger.debug("Queue is empty")
-
- for charger2 in self.type_2_all_chargers: # NOT GETTING CALLED?
- if len(self.cars_waiting_in_queue) >= 0 and charger2.car == None:
- try:
- queued_car = self.cars_waiting_in_queue.pop(0)
- logger.debug(f"Grabbing car: {queued_car}")
- # logger.info(f"Grabbing car: {queued_car}")
- charger2.add_car_to_charger(queued_car)
-
- # logger.info(f"Grabbed car and placed to CHARGER 2: {queued_car}")
- logger.debug(f"Grabbed car and placed to CHARGER 2: {queued_car}")
- except IndexError as e:
- # logger.exception(e)
- logger.debug("Queue is empty")
-
- for charger3 in self.type_3_all_chargers: # NOT GETTING CALLED?
- if len(self.cars_waiting_in_queue) >= 0 and charger3.car == None:
- try:
- queued_car = self.cars_waiting_in_queue.pop(0)
- logger.debug(f"Grabbing car: {queued_car}")
- # logger.info(f"Grabbing car: {queued_car}")
- charger3.add_car_to_charger(queued_car)
- logger.info(f"Grabbed car and placed to CHARGER 3: {queued_car}")
- # logger.debug(f"Grabbed car and placed to CHARGER 3: {queued_car}")
- except IndexError as e:
- # logger.exception(e)
- logger.debug("Queue is empty")
-
- else:
- return False
-
- def reset_method(self):
- self.amount_of_cars_that_got_sent_to_this_station = 0
- self.amount_of_cars_that_entered_the_station = 0
- self.chargers_in_use = 0
- self.station_index = 0
- self.received_cars_from_neighbours = 0
- self.next_station = False
- self.rejected_cars = 0
- self.exhausted_cars = []
- self.bailed_cars = 0
- self.car_received_from_neighbours = 0
- self.cars_waiting_in_queue = []
- self.total_chargers_in_use = 0
- self.amount_of_cars_charging_at_type1 = 0
- self.amount_of_cars_charging_at_type2 = 0
- self.amount_of_cars_charging_at_type3 = 0
-
- self.cars_that_have_finished_charging_at_type1 = 0
- self.cars_that_have_finished_charging_at_type2 = 0
- self.cars_that_have_finished_charging_at_type3 = 0
-
- for finished_charging1 in self.type_1_all_chargers:
- finished_charging1.finished_charging = []
- for finished_charging2 in self.type_2_all_chargers:
- finished_charging2.finished_charging = []
- for finished_charging3 in self.type_3_all_chargers:
- finished_charging3.finished_charging = []
-
- def print_status_of_station(self):
- self.amount_of_cars_charging_at_type1 = 0
- self.amount_of_cars_charging_at_type2 = 0
- self.amount_of_cars_charging_at_type3 = 0
-
- self.cars_that_have_finished_charging_at_type1 = 0
- self.cars_that_have_finished_charging_at_type2 = 0
- self.cars_that_have_finished_charging_at_type3 = 0
-
- for charger1 in self.type_1_all_chargers:
- #print(colored(len(charger1.finished_charging),'green'))
- if charger1.active_car == True:
- self.amount_of_cars_charging_at_type1 += 1
- if charger1.car_that_has_finished_charging == True:
- self.cars_that_have_finished_charging_at_type1 += 1
-
- for charger2 in self.type_2_all_chargers:
- #print(colored(len(charger2.finished_charging), 'yellow'))
- if charger2.active_car == True:
- self.amount_of_cars_charging_at_type2 += 1
- if charger2.car_that_has_finished_charging == True:
- self.cars_that_have_finished_charging_at_type2 += 1
- for charger3 in self.type_3_all_chargers:
- #print(colored(len(charger3.finished_charging), 'blue'))
- if charger3.active_car == True:
- self.amount_of_cars_charging_at_type3 += 1
- if charger3.car_that_has_finished_charging == True:
- self.cars_that_have_finished_charging_at_type3 += 1
-
- self.total_chargers_in_use = (self.amount_of_cars_charging_at_type1 + self.amount_of_cars_charging_at_type2 + self.amount_of_cars_charging_at_type3)
- self.percentage_of_chargers_in_use = self.total_chargers_in_use / len(self.total_amount_of_chargers) * 100 if len(self.total_amount_of_chargers) > 0 else 0
-
- return (f"""
- Total amount of EV that visited the station during the day : {self.amount_of_cars_that_got_sent_to_this_station}.
- EV`s that got to enter the station : {self.amount_of_cars_that_entered_the_station}
-
- EV`s finished charging at type 1 : {self.cars_that_have_finished_charging_at_type1}
- EV`s finished charging at type 2 : {self.cars_that_have_finished_charging_at_type2}
- EV`s finished charging at type 3 : {self.cars_that_have_finished_charging_at_type3}
- "--------------------------------------------------------------------------------------------------------------------------------"
- cars charging at type 1: {self.amount_of_cars_charging_at_type1}
- cars charging at type 2: {self.amount_of_cars_charging_at_type2}
- cars charging at type 3: {self.amount_of_cars_charging_at_type3}
-
- cars charging totally : {self.total_chargers_in_use}
- Percentage of chargers used : {self.percentage_of_chargers_in_use}%
-
-
- "--------------------------------------------------------------------------------------------------------------------------------"
- Cars currently waiting in Queue : {(len(self.cars_waiting_in_queue))}
- Total cars rejected from the station: {self.rejected_cars}
- Total cars that have bailed away : {self.bailed_cars}
-
- Cars exhausted after 2 attempts : {len(self.exhausted_cars)}
-
- "--------------------------------------------------------------------------------------------------------------------------------"
- """)
-
- def get_status_of_queue(self):
- return len(self.cars_waiting_in_queue)
-
- def give_me_neighbour_stations(self):
- for neighbour in self.my_neighbours:
- print("Closest nearest station are: " + neighbour)
-
- def pick_a_random_neighbour(self):
- self.next_station = True
- self.first_station = False
- return random.choice(self.my_neighbours) # and self.first_station == False
-
- def send_car_to_next_station(self, car, attempts):
- if attempts <= 0:
- logger.debug(("NO MORE ATTEMPTS: {}".format(car)))
- logger.debug("Adding car to exhausted/destroyed list:")
- self.exhausted_cars.append(car)
- return False
- elif attempts == 1:
- logger.debug("................................... we try again?")
- logger.debug(f"Calling method: send_car_to_next_station")
- # logger.info(f"Calling method: send_car_to_next_station ")
- # if self.pick_a_random_neighbour() == self.first_station:
- # logger.debug("You have already been here!!")
- car_at_first_station = False
- self.pick_a_random_neighbour().add_car(car, attempts-1)
-
- logger.debug("1 ATTEMPTS LEFT")
- logger.debug(("CAR SENT TO NEXT STATION: {}".format(car)))
- logger.debug(("NOW YOU HAVE 0 ATTEMPTS LEFT AFTER"))
- # random.choice(self.my_neighbours).add_car(car)
- # logger.debug("Car sent part 1?")
- # logger.info("Car sent part 2?")
- elif attempts == 2:
- try:
- logger.debug("2 ATTEMPTS LEFT")
- logger.debug(("CAR SENT TO NEXT STATION: {}".format(car)))
- logger.debug("NOW YOU HAVE 1 ATTEMPT LEFT")
-
- # logger.info(f"Calling method: send_car_to_next_station ")
- # if self.pick_a_random_neighbour() == self.first_station:
- # logger.debug("You have already been here!!")
- car.car_at_first_station = False
- self.pick_a_random_neighbour().add_car(car, attempts-1)
-
- logger.debug("Car sent to next station!")
- # random.choice(self.my_neighbours).add_car(car)
- # logger.debug("Car sent part 1?")
- # logger.info("Car sent part 2?")
- except IndexError as e:
- logger.exception(e)
- logger.info("Neighours: {}".format(len(self.my_neighbours)))
-