NotesWhat is notes.io?

Notes brand slogan

Notes - notes.io

"""
Berth Pipeline
"""
from bisect import bisect_left, bisect_right
from collections import defaultdict, deque
from dataclasses import dataclass, field
import datetime
import logging
import re
from typing import Any, Optional

from eventflow.core import (
BaseProcess,
ProcessDrivenProcessToRun,
TicketDrivenProcessToRun,
)
from eventflow.processes.flag import FlagHold
from eventflow.processes.simple import Simple
from eventflow.processes.store_exit import StoreExitSingle
from eventflow.processes.workpool import WorkpoolMultiple, WorkpoolSingle
import pandas as pd

from simulation.data_pipelines.input_pipeline import BerthPlanData
from simulation.eventflow.apmt.environment import APMT_EventFlowEnvironment
from simulation.eventflow.apmt.ticket import Container, Vessel
from simulation.flows.berth.berth_sim_class import BerthFlowSimClass
from simulation.utils.debug_and_profile import set_pudb_trace
from simulation.utils.utility_funcs import DEFAULT_LOG_LEVEL, setup_logger

SECONDS_PER_DAY = 24 * 60 * 60
WAIT_FOR_RTG_TIMEOUT_SECONDS = 7 * SECONDS_PER_DAY
WAIT_FOR_TRUCK_TIMEOUT_SECONDS = 7 * SECONDS_PER_DAY


@dataclass
class ImportFlowPipeline:
"""
class: ImportFlowPipeline used to setup and the processes in the Import Flow
"""

env: APMT_EventFlowEnvironment
logger: Optional[logging.Logger] = None
input_data: Any = field(init=False)

def __post_init__(self):
if self.logger is None:
self.logger = setup_logger(logger_name=self.__class__.__name__, log_level=DEFAULT_LOG_LEVEL)
self.input_data = self.env.input_data

def execute_call_method(self):
"""
Wrapper to execute the __call__ method in this class in an explicit way
throughout codebase
"""
self()
return self

def __call__(self):
# Intialise the import flow simulation class
# do we need ImportFlowSimClass(self.env) ?

# Create a simulation class for the loading flow (included any functionality require during the simulation)
# ImportFlowSimClass(self.env)

# Setup the EvenFlow processes

self.setup_processes()
self.get_import_tickets()

def setup_processes(self):
"""
Method used to setup the EventFlow process required for the import flow (i.e. from yard to gate).
"""
rtgs = self.env.rtgs
external_trucks = self.env.external_trucks

"""self.env.add_workpool(
resource_id="RS",
initial_capacity=20,
requests_uses_priority=False,
)"""

# Process of containers exiting stack. Timeout of 1s to ensure we don't build backlog of requests when stack is
# empty
self.env.add_process(
StoreExitSingle,
process_id="ReadyToExitStackForImport",
resource_ids=["yard"],
record_flow_detail=False,
resource_request_timeout=1,
process_driven_processes_to_run=[ProcessDrivenProcessToRun("RequestExternalTruck_Import")],
success_hook=[set_ticket_as_import],
meta_data={"process_type": "import"},
)

# Request one of the slots in the external trucks workpools (limited by the number of external trucks)
self.env.add_process(
WorkpoolSingle,
process_id="RequestExternalTruck_Import",
resource_ids=["ET"], # lambda process, ticket: [ticket._env.external_trucks],
process_driven_processes_to_run=[
ProcessDrivenProcessToRun("AssignExternalTruck_Import")
], # AssignExternalTruck_Import
success_hook=[
request_an_external_truck,
], # , if_debug_print_container_flow_state],
meta_data={"process_type": "import"},
)
self.env.add_process(
FlagHold,
process_id="AssignExternalTruck_Import",
resource_ids=lambda process, ticket: [ticket.has_import_external_truck.flag_id],
resource_request_timeout=WAIT_FOR_TRUCK_TIMEOUT_SECONDS,
# timeout_hook=raise_simulation_timeout_from_process,
process_driven_processes_to_run=lambda process, ticket: [
ProcessDrivenProcessToRun("WaitExternalTruck_Import")
],
success_hook=[
# if_needs_request_reachStacker,
], # if_debug_print_container_flow_state
meta_data={"process_type": "import"},
)
self.env.add_process(
FlagHold,
process_id="WaitExternalTruck_Import",
resource_ids=lambda process, ticket: [ticket.import_external_truck.is_at_source.flag_id],
resource_request_timeout=WAIT_FOR_TRUCK_TIMEOUT_SECONDS,
# timeout_hook=raise_simulation_timeout_from_process,
process_driven_processes_to_run=lambda process, ticket: [
ProcessDrivenProcessToRun("DriveExternalTruck_Import_toYard")
],
success_hook=[
assign_ticket_to_truck,
# if_needs_request_reachStacker,
], # if_debug_print_container_flow_state
meta_data={"process_type": "import"},
)

# Add process to assign a truck to the ticket.
# Note: As RTG has already been assigned to the ticket (see
# `rtg_sim_class.add_load_container_tickets_to_rtg`)
self.env.add_process(
FlagHold,
process_id="DriveExternalTruck_Import_toYard",
resource_ids=lambda process, ticket: [ticket.import_external_truck.is_at_destination.flag_id],
resource_request_timeout=WAIT_FOR_TRUCK_TIMEOUT_SECONDS,
# timeout_hook=raise_simulation_timeout_from_process,
process_driven_processes_to_run=lambda process, ticket: [
ProcessDrivenProcessToRun("WaitForRTG_Import")
if ticket.lift_requires_rtg
else ProcessDrivenProcessToRun("RequestReachStacker_Import")
if ticket.lift_requires_rs
else ProcessDrivenProcessToRun("EmptyHandler_Import")
],
success_hook=[
if_needs_rtg_add_lift_to_rtg,
if_needs_rtg_add_import_truck_to_rtg,
# if_needs_request_reachStacker,
], # if_debug_print_container_flow_state
meta_data={"process_type": "import"},
)

# Add process of wait for the RTG to start the container lift.
self.env.add_process(
FlagHold,
process_id="WaitForRTG_Import",
resource_ids=lambda process, ticket: [ticket.is_rtg_load_lift_started.flag_id],
process_driven_processes_to_run=[ProcessDrivenProcessToRun("RTGLift_Import")],
# success_hook=if_debug_print_container_flow_state,
meta_data={"process_type": "import"},
)

# Add process to wait for the RTG to place the container on the external truck.
self.env.add_process(
FlagHold,
process_id="RTGLift_Import",
resource_ids=lambda process, ticket: [ticket.is_rtg_load_lift_complete.flag_id],
process_driven_processes_to_run=[ProcessDrivenProcessToRun("RTGLoadPlacement_Import")],
success_hook=[
# self.env.alerts.update_clash_dict,
self.env.yard.remove_ticket_from_yard,
# if_debug_print_container_flow_state,
],
meta_data={"process_type": "import"},
)
self.env.add_process(
FlagHold,
process_id="RTGLoadPlacement_Import",
resource_ids=lambda process, ticket: [ticket.is_rtg_load_placement_complete.flag_id],
process_driven_processes_to_run=lambda process, ticket: [ProcessDrivenProcessToRun("DriveToGate_Import")],
success_hook=[
add_gate_out_destination_to_external_truck
# assign_ticket_to_truck,
# if_not_needs_rtg_assign_ticket_to_truck,
# if_debug_print_container_flow_state,
],
meta_data={"process_type": "import"},
)

# Request one of the slots in the Reach Stacker workpools (limited by the number of Reach stackers)
self.env.add_process(
WorkpoolSingle,
process_id="RequestReachStacker_Import",
resource_ids=["RS"], # lambda process, ticket: [ticket._env.external_trucks],
process_driven_processes_to_run=[ProcessDrivenProcessToRun("WaitForReachStacker_Import")],
success_hook=[request_a_reach_stacker], # , if_debug_print_container_flow_state],
meta_data={"process_type": "import"},
)

# Process to carry out reach stacker import. Requires an external truck to be there
# before it will start the lift
self.env.add_process(
FlagHold,
process_id="WaitForReachStacker_Import",
resource_ids=lambda process, ticket: [ticket.has_import_reach_stacker.flag_id],
process_driven_processes_to_run=[ProcessDrivenProcessToRun("DriveRStoYard_Import")],
success_hook=[], # if_debug_print_container_flow_state],
meta_data={"process_type": "import"},
)

# Add process to assign a reachstacker to the ticket.
self.env.add_process(
FlagHold,
process_id="DriveRStoYard_Import",
resource_ids=lambda process, ticket: [ticket.import_reach_stacker.is_at_source.flag_id],
# timeout_hook=raise_simulation_timeout_from_process,
process_driven_processes_to_run=lambda process, ticket: [ProcessDrivenProcessToRun("RSlift_Import")],
success_hook=[
self.env.yard.remove_ticket_from_yard,
assign_ticket_to_reach_stacker,
], # if_debug_print_container_flow_state
meta_data={"process_type": "import"},
)
self.env.add_process(
FlagHold,
process_id="RSlift_Import",
resource_ids=lambda process, ticket: [ticket.import_reach_stacker.is_lift_completed.flag_id],
process_driven_processes_to_run=lambda process, ticket: [ProcessDrivenProcessToRun("DriveToGate_Import")],
success_hook=[
release_reach_stacker,
add_gate_out_destination_to_external_truck,
# if_debug_print_container_flow_state,
],
meta_data={"process_type": "import"},
)
# Add process to wait for the assigned external truck to arrive at the container
# yard position.

# Process for truck to drive from gate to yard position.
""" self.env.add_process(
FlagHold,
process_id="Release_ReachStacker_Import",
resource_ids=lambda process, ticket: [ticket.import_external_truck.is_at_destination.flag_id],
process_driven_processes_to_run=[ProcessDrivenProcessToRun("DriveToGate_Import")],
success_hook=[

], # if_debug_print_container_flow_state,
meta_data={"process_type": "import"},
) """
self.env.add_process(
FlagHold,
process_id="EmptyHandler_Import",
resource_ids=lambda process, ticket: [ticket.import_external_truck.is_at_destination.flag_id],
process_driven_processes_to_run=[ProcessDrivenProcessToRun("DriveToGate_Import")],
process_time=0,
success_hook=[add_gate_out_destination_to_external_truck], # if_debug_print_container_flow_state,
meta_data={"process_type": "import"},
)

# Process for truck to drive from the pull position to the gate.
self.env.add_process(
FlagHold,
process_id="DriveToGate_Import",
resource_ids=lambda process, ticket: [ticket.import_external_truck.is_at_destination.flag_id],
process_time=456, # gate out stages time = 50+386+20 = Get formalities like customs, weight checking time
success_hook=[
release_truck,
],
rel_workpools_all=True,
rel_workpools_delay=3600, # if_debug_print_container_flow_state,
meta_data={"process_type": "import"},
)
"""self.env.add_process(
FlagHold,
process_id="WaitForGateRelease_Import",
resource_ids=lambda process, ticket: [ticket.import_external_truck.is_at_destination.flag_id],
process_time=456, # gate out stages time = 50+386+20 = Get formalities like customs, weight checking time
success_hook=[
release_truck,
],
rel_workpools_all=True,
rel_workpools_delay=3600, # if_debug_print_container_flow_state,
meta_data={"process_type": "import"},
)"""
# TODO decide if we need Gate class
# Add process of wait for the gate to start the release the truck.
# self.env.add_process(
# FlagHold,
# process_id="WaitForGateRelease_Import",
# resource_ids=lambda process, ticket: [ticket.is_gate_import_started.flag_id],
# process_driven_processes_to_run=[ProcessDrivenProcessToRun("LeaveGate_Import")],
# success_hook=release_truck,
# meta_data={"process_type": "import"},
# )

# self.env.add_process(
# FlagHold,
# process_id="LeaveGate_Import",
# resource_ids=lambda process, ticket: [ticket.is_gate_import_complete.flag_id],
# success_hook=[check_if_import_complete], # if_debug_print_container_flow_state],
# #rel_workpools_all=True,
# #rel_workpools_delay=3600,
# meta_data={"process_type": "import"},
# )

# def print_ticket(self, process: BaseProcess, ticket: Container) -> None:
# print(ticket.Container_No, ticket.gate_in_time, self.env.now)

def get_import_tickets(self) -> None:
"""
Method to create import container tickets
"""
# Create a df of all the container tickets in the simulation so we can merge on the booking data
# Note this includes all yard inventory tickets + all the tickets discharged from vessels
sim_df = pd.DataFrame.from_records(
[{"ticket": x, "Container_No": x.Container_No} for x in self.env.container_tickets]
)
# Get the required input data
import_containers = self.input_data.import_booking_data_raw
import_containers = import_containers.merge(sim_df, how="inner", on="Container_No")

# import_containers["ticket_driven_processes_to_run"] = (
# deque([TicketDrivenProcessToRun("ReadyToExitStackForImport")]),
# ) * len(import_containers)

# TODO - Check if this is needed to check berth planer to compensate the
# discrepancy between the berth plan and the import booking data

# import_tickets = self.env.create_ticket_list_from_df(
# import_containers,
# ticket_class=Container,
# cols_meta_data=set(Container.__slots__) & set(import_containers.columns),
# add_to_ticket_list=False,
# progress_bar=False,
# )
# import_containers["ticket"]["gate_in_time"] = import_containers["gate_in_time"]
for index, row in import_containers.iterrows():
import_ticket = row["ticket"]
import_ticket.gate_in_time = row["gate_in_time"]
import_ticket.in_time = row["gate_in_time"]

# n_of_import_containers = len(import_containers)

self.assign_rtgs_to_import_tickets(import_containers["ticket"])
# request external truck
# after each lift call success hook self.env.yard.remove_ticket_from_yard,

def assign_rtgs_to_import_tickets(self, ticket_list: list):
loading_point_dict = defaultdict(list)
n_of_import_containers = 0
for container in ticket_list:
# print("container bay value", container.Container_No)
if container.bay is not None:
n_of_import_containers += 1
container.is_import = True
container.gate_in_time = datetime.datetime.strptime(container.gate_in_time, "%Y-%m-%dT%H:%M:%S.%fZ")
container.gate_in_time = (container.gate_in_time - self.env.input_data.start_sim_time).total_seconds()
# print("container bay not none", container.Container_No)
loading_point_dict[container.bay.position_id].append(container)
# sort loading point dict by container.stack_position per container.bay ( dict key)
# and return a dict of list of containers per bay as key but each value is sorted by container.stack_position
loading_point_dict = {
bay: sorted(loading_point_dict[bay], key=lambda x: x.stack_position) for bay in loading_point_dict.keys()
}
self.env.add_workpool(
resource_id="ET", # f"{ET}",
initial_capacity=n_of_import_containers,
requests_uses_priority=False,
)
print("import_tickets:", n_of_import_containers)
self.env.input_data.n_total_external_trucks += n_of_import_containers
# for each list in the dict of list of containers per bay as key (list as the index of for loop)
for container_list in loading_point_dict.values():
if container_list[-1].lift_requires_rtg:
# print("container with RTG", container_list[-1].Container_No)
rtg, gantry_period = self.env.rtgs.router.find_optimal_rtg(
*container_list[-1].yard_position.position_tuple
)
self.start_import_by_rtg(rtg, container_list, gantry_period)
else:
for ticket in container_list:
# print("container not required RTG", ticket.Container_No)
# self.env.store_exit(process_id="ReadyToExitStackForImport", resource_items=[{ticket}])
self.env.store_exits_over_time(
process_id="ReadyToExitStackForImport", resource_items=[[{ticket}]], times=[ticket.gate_in_time]
)

def start_import_by_rtg(self, rtg, ticket_list, gantry_period):
# Start the process of loading from this block
vessel_ticket = []
self.env.process(
self.env.rtgs.add_load_container_tickets_to_rtg(
vessel_ticket=vessel_ticket,
rtg=rtg,
container_ticket_list=ticket_list,
initial_gantry_period=gantry_period,
for_import=True,
)
)


def set_ticket_as_import(process: BaseProcess, ticket: Container) -> None:
"""Set the `ticket` `is_import` attribute to True.

:param process: EventFlow process of the form <ReadyToExitStackForImport>.
:param ticket: EventFlow ticket of the form <Container>.
"""
ticket.is_import = True


def if_needs_rtg_add_lift_to_rtg(process: BaseProcess, ticket: Container) -> None:
"""If the load container requires an RTG, assign the lift to the RTG
selected by `find_optimal_rtg` (see rtg_sim_class.py).

:param process: EventFlow process of the form <TruckLimit_Load>.
:param ticket: EventFlow ticket of the form <Container>.
"""
if ticket.lift_requires_rtg:
rtg = ticket.load_rtg
rtg.add_load_container(ticket)


def request_an_external_truck(process: BaseProcess, ticket: Container) -> None:
"""Add `ticket` to the TruckSimClass `allocation_request` queue.

:param process: EventFlow process of the form <TruckLimit_Load>.
:param ticket: EventFlow ticket of the form <Container>.
"""
external_truck_sim_class = ticket._env.external_trucks
external_truck_sim_class.add_allocation_request(ticket)


def request_a_reach_stacker(process: BaseProcess, ticket: Container) -> None:
"""Add `ticket` to the TruckSimClass `allocation_request` queue.
:param process: EventFlow process of the form <TruckLimit_Discharge>.
:param ticket: EventFlow ticket of the form <Container>.
"""
reach_stacker_sim_class = ticket._env.reach_stackers
reach_stacker_sim_class.add_allocation_request(ticket)


def if_needs_rtg_add_import_truck_to_rtg(process: BaseProcess, ticket: Container) -> None:
"""Add the external truck to the assigned RTG's lift
schedule.
Note: Also clears the ticket `import_external_truck` as this will be selected by the
RTG later.

:param process: EventFlow process of the form <AssignTruck_Load>.
:param ticket: EventFlow ticket of the form <Container>.
"""
if ticket.lift_requires_rtg:
rtg = ticket.load_rtg
truck = ticket.import_external_truck
truck.hold_for_dual_cycle()
# truck.add_destination(ticket)
# truck.source = truck.destination
rtg.add_load_trucks([truck])
# ticket.import_external_truck = None


def if_needs_request_reachStacker(process: BaseProcess, ticket: Container) -> None:
"""Add `ticket` to the TruckSimClass `allocation_request` queue.
:param process: EventFlow process of the form <TruckLimit_Discharge>.
:param ticket: EventFlow ticket of the form <Container>.
"""
if not ticket.lift_requires_rtg:
reach_stacker_sim_class = ticket._env.reach_stackers
reach_stacker_sim_class.add_allocation_request(ticket)


def assign_ticket_to_reach_stacker(process: BaseProcess, ticket: Container) -> None:
"""Assign the load container to the current job of the `load_truck`.
Note: This is a step that would otherwise have been performed by the RTG.

:param process: EventFlow process of the form <EmptyHandler_Load>.
:param ticket: EventFlow ticket of the form <Container>.
"""
reach_stacker = ticket.import_reach_stacker
reach_stacker.assign_ticket_to_job(ticket)


def assign_ticket_to_truck(process: BaseProcess, ticket: Container) -> None:
"""Assign the load container to the current job of the `load_truck`.
Note: This is a step that would otherwise have been performed by the RTG.

:param process: EventFlow process of the form <EmptyHandler_Load>.
:param ticket: EventFlow ticket of the form <Container>.
"""

external_truck = ticket.import_external_truck
external_truck.hold_for_dual_cycle()
# print(external_truck.is_held_for_dual_cycle)
external_truck.assign_ticket_to_job(ticket)


def check_if_import_complete(self, process, ticket: Container):
"""
Check if the import process is complete
"""
pass
# Get the import flag
# import_flag = ticket.gate_flags["import"]

# if not import_flag:
# Trigger the flag to denote import is complete
# import_flag.change_status(True)


def release_reach_stacker(process: BaseProcess, ticket: Container) -> None:
"""Release the `reach stacker`.

:param ticket: EventFlow ticket of the form <Container>.
"""
reach_stacker = ticket.import_reach_stacker
reach_stacker.release()


def add_gate_out_destination_to_external_truck(process: BaseProcess, ticket: Container) -> None:
external_truck = ticket.import_external_truck
ticket.is_import_operation_done = True
external_truck.is_held_for_dual_cycle = False
external_truck.add_destination(ticket) # need to assign gate out position


def release_truck(process: BaseProcess, ticket: Container) -> None:
"""Release the `discharge_truck`.
Note: This is a step that would otherwise have been performed by the RTG.

:param process: EventFlow process of the form <EmptyHandler_Discharge>.
:param ticket: EventFlow ticket of the form <Container>.
"""
external_truck = ticket.import_external_truck
external_truck.is_held_for_dual_cycle = False
external_truck.release()
     
 
what is notes.io
 

Notes.io is a web-based application for taking notes. You can take your notes and share with others people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000 notes created and continuing...

With notes.io;

  • * You can take a note from anywhere and any device with internet connection.
  • * You can share the notes in social platforms (YouTube, Facebook, Twitter, instagram etc.).
  • * You can quickly share your contents without website, blog and e-mail.
  • * You don't need to create any Account to share a note. As you wish you can use quick, easy and best shortened notes with sms, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
  • * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.

Fast: Notes.io is built for speed and performance. You can take a notes quickly and browse your archive.

Easy: Notes.io doesn’t require installation. Just write and share note!

Short: Notes.io’s url just 8 character. You’ll get shorten link of your note when you want to share. (Ex: notes.io/q )

Free: Notes.io works for 12 years and has been free since the day it was started.


You immediately create your first note and start sharing with the ones you wish. If you want to contact us, you can use the following communication channels;


Email: [email protected]

Twitter: http://twitter.com/notesio

Instagram: http://instagram.com/notes.io

Facebook: http://facebook.com/notesio



Regards;
Notes.io Team

     
 
Shortened Note Link
 
 
Looding Image
 
     
 
Long File
 
 

For written notes was greater than 18KB Unable to shorten.

To be smaller than 18KB, please organize your notes, or sign in.