import gymnasium as gym
import numpy as np
import numpy.typing as npt
import networkx as nx
import pandas as pd
import matplotlib.pyplot as plt
from collections import OrderedDict
from typing import List, Union, Dict, Callable
from graph_jsp_env.disjunctive_graph_jsp_visualizer import DisjunctiveGraphJspVisualizer
from graph_jsp_env.disjunctive_graph_logger import log
from typing import Any, SupportsFloat, Set
[docs]class DisjunctiveGraphJspEnv(gym.Env):
"""
Custom Environment for the Job Shop Problem (jsp) that follows gymnasium interface.
This environment is inspired by the
`The disjunctive graph machine representation of the job shop scheduling problem`
by Jacek Błażewicz 2000
https://www.sciencedirect.com/science/article/pii/S0377221799004865
and
`Learning to Dispatch for Job Shop Scheduling via Deep Reinforcement Learning`
by Zhang, Cong, et al. 2020
https://proceedings.neurips.cc/paper/2020/file/11958dfee29b6709f48a9ba0387a2431-Paper.pdf
https://github.com/zcaicaros/L2D
This environment does not explicitly include disjunctive edges, like specified by Jacek Błażewicz,
only conjunctive edges. Additional information is saved in the edges and nodes, such that one could construct
the disjunctive edges, so the is no loss in information.
Moreover, this environment does not implement the graph matrix datastructure by Jacek Błażewicz, since in provides
no benefits in chosen the reinforcement learning stetting (for more details have a look at the
master thesis).
This environment is more similar to the Zhang, Cong, et al. implementation.
Zhang, Cong, et al. seems to store exclusively time-information exclusively inside nodes
(see Figure 2: Example of state transition) and no additional information inside the edges (like weights in the
representation of Jacek Błażewicz).
However, I had a rough time in understanding the code of Zhang, Cong, et al. 2020, so I might be wrong about that.
The DisjunctiveGraphJssEnv uses the `networkx` library for graph structure and graph visualization.
It is highly configurable and offers a lot of rendering options.
"""
metadata = {'render.modes': ['human', 'rgb_array', 'console']}
def __init__(self,
jps_instance: npt.NDArray = None, *,
# parameters for reward
reward_function='trivial',
custom_reward_function: Callable = None,
reward_function_parameters: Dict = None,
# parameters for observation
normalize_observation_space: bool = True,
flat_observation_space: bool = True,
dtype: str = "float32",
# parameters for actions
action_mode: str = "task",
env_transform: str = None,
perform_left_shift_if_possible: bool = True,
# parameters for rendering
c_map: str = "rainbow",
dummy_task_color="tab:gray",
default_visualisations: List[str] = None,
visualizer_kwargs: Dict = None,
verbose: int = 0
):
"""
:param jps_instance: a jsp instance as numpy array
:param scaling_divisor: lower-bound of the jsp or some other scaling number for the reward.
Only has an effect when `:param scale_reward` is `True`.
If `None` is specified and `:param scale_reward` is `True` a naive
lower-bound will be calculated automatically.
If `scaling_divisor` is equal to the optimal makespan, then the (sparse)
reward will be always smaller or equal to -1.
:param scale_reward: `:param scaling_divisor` is only applied if set to `True`
:param normalize_observation_space: If set to `True` all values in the observation space will be between
0.0 and 1.0.
This includes an one-hot-encoding of the task-to-machine mapping.
See `DisjunctiveGraphJssEnv._state_array`
:param flat_observation_space: If set to `True` the observation space will be flat. Otherwise, a matrix
The exact size depends on the jsp size.
:param dtype: the dtype for the observation space. Must follow numpy notation.
:param action_mode: 'task' or 'job'. 'task' is default. Specifies weather the
`action`-argument of the `DisjunctiveGraphJssEnv.step`-method
corresponds to a job or an task (or node in the graph representation)
Note:
task actions and node_ids are shifted by 1.
So action = 0 corresponds to the node/task 1.
:param perform_left_shift_if_possible: if the specified task in the `DisjunctiveGraphJssEnv.step`-method can
fit between two other task without changing their start- and finishing-
times, the task will be scheduled between them if set to `True`.
Otherwise, it will be appended at the end.
Performing a left shift is never a downside in therms of the makespan.
:param c_map: the name of a matplotlib colormap for visualization.
Default is `rainbow`.
:param dummy_task_color: the color that shall be used for the dummy tasks (source and sink task),
introduced in the graph representation.
Can be any string that is supported by `networkx`.
:param default_visualisations: the visualizations that will be shown by default when calling `render`
Can be any subset of
["gantt_window", "gantt_console", "graph_window", "graph_console"]
as a list of strings.
Note:
"gantt_window" is computationally expensive operation.
:param visualizer_kwargs: additional keyword arguments for
`jss_graph_env.DisjunctiveGraphJspVisualizer`
:param verbose: 0 = no information printed console,
1 = 'important' printed to console,
2 = all information printed to console,
"""
# Note: None-fields will be populated in the 'load_instance' method
self.size = None
self.n_jobs = None
self.n_machines = None
self.total_tasks_without_dummies = None
self.total_tasks = None
self.src_task = None
self.sink_task = None
self.longest_processing_time = None
self.observation_space_shape = None
self.scaling_divisor = None
self.machine_colors = None
self.G = None
self.machine_routes = None
self.makespan_previous_step = None
# reward function settings
if reward_function not in ['nasuta', 'zhang', 'graph-tassel', 'samsonov', 'zero', 'custom', 'trivial']:
raise ValueError(f"only 'nasuta', 'zhang', 'graph-tassel', 'samsonov', 'zero', 'custom', trivial"
f"are valid arguments for 'reward_function'. {reward_function} is not.")
if reward_function == 'custom' and custom_reward_function is None:
raise ValueError("if 'reward_function' is 'custom', 'custom_reward_function' must be specified.")
self.reward_function = reward_function
self.custom_reward_function = custom_reward_function
# default reward function params
if reward_function_parameters is None:
if reward_function == 'trivial' or reward_function == 'nasuta':
self.reward_function_parameters = {
'scaling_divisor': 1.0
}
elif reward_function == 'zhang':
self.reward_function_parameters = {}
elif reward_function == 'samsonov':
self.reward_function_parameters = {
'gamma': 1.025,
't_opt': None,
}
elif reward_function == 'graph-tassel':
self.reward_function_parameters = {
}
elif reward_function == 'zero':
self.reward_function_parameters = {}
elif reward_function == 'custom':
self.reward_function_parameters = {}
else:
raise ValueError('something went wrong. This error should not be called.')
else:
self.reward_function_parameters = reward_function_parameters
# observation settings
self.normalize_observation_space = normalize_observation_space
self.flat_observation_space = flat_observation_space
self.dtype = dtype
# action setting
self.perform_left_shift_if_possible = perform_left_shift_if_possible
if action_mode not in ['task', 'job']:
raise ValueError(f"only 'task' and 'job' are valid arguments for 'action_mode'. {action_mode} is not.")
self.action_mode = action_mode
if env_transform not in [None, 'mask']:
raise ValueError(f"only `None` and 'mask' are valid arguments for 'action_mode'. {action_mode} is not.")
self.env_transform = env_transform
# rendering settings
self.c_map = c_map
if default_visualisations is None:
self.default_visualisations = ["gantt_console", "gantt_window", "graph_console", "graph_window"]
else:
self.default_visualisations = default_visualisations
if visualizer_kwargs is None:
visualizer_kwargs = {}
self.visualizer = DisjunctiveGraphJspVisualizer(**visualizer_kwargs)
# values for dummy tasks nedded for the graph structure
self.dummy_task_machine = -1
self.dummy_task_job = -1
self.dummy_task_color = dummy_task_color
self.verbose = verbose
# save all taken actions into a list
# this might be useful for reconstruction the state of the environment after a reset
self.action_history = []
if jps_instance is not None:
self.load_instance(jsp_instance=jps_instance)
[docs] def load_instance(self, jsp_instance: npt.NDArray, *, reward_function_parameters: Dict = None) -> None:
"""
This loads a jsp instance, sets up the corresponding graph and sets the attributes accordingly.
:param jsp_instance: a jsp instance as numpy array
:param reward_function_parameters: if specified, the reward functions params will be updated.
:return: None
"""
_, n_jobs, n_machines = jsp_instance.shape
self.size = (n_jobs, n_machines)
self.n_jobs = n_jobs
self.n_machines = n_machines
self.total_tasks_without_dummies = n_jobs * n_machines
self.total_tasks = n_jobs * n_machines + 2 # 2 dummy tasks: start, finish
self.src_task = 0
self.sink_task = self.total_tasks - 1
self.longest_processing_time = jsp_instance[1].max()
if self.action_mode == 'task':
self.action_space = gym.spaces.Discrete(self.total_tasks_without_dummies)
else: # action mode 'job'
self.action_space = gym.spaces.Discrete(self.n_jobs)
if self.normalize_observation_space:
self.observation_space_shape = (self.total_tasks_without_dummies,
self.total_tasks_without_dummies + self.n_machines + 1)
else:
self.observation_space_shape = (self.total_tasks_without_dummies, self.total_tasks_without_dummies + 2)
if self.flat_observation_space:
a, b = self.observation_space_shape
self.observation_space_shape = (a * b,)
if self.env_transform is None:
self.observation_space = gym.spaces.Box(
low=0.0,
high=1.0 if self.normalize_observation_space else jsp_instance.max(),
shape=self.observation_space_shape,
dtype=self.dtype
)
elif self.env_transform == 'mask':
self.observation_space = gym.spaces.Dict({
"action_mask": gym.spaces.Box(0, 1, shape=(self.action_space.n,), dtype=np.int32),
"observations": gym.spaces.Box(
low=0.0,
high=1.0 if self.normalize_observation_space else jsp_instance.max(),
shape=self.observation_space_shape,
dtype=self.dtype)
})
else:
raise NotImplementedError(f"'{self.env_transform}' is not supported.")
# generate colors for machines
c_map = plt.cm.get_cmap(self.c_map) # select the desired cmap
arr = np.linspace(0, 1, n_machines, dtype=self.dtype) # create a list with numbers from 0 to 1 with n items
self.machine_colors = {m_id: c_map(val) for m_id, val in enumerate(arr)}
# jsp representation as directed graph
self.G = nx.DiGraph()
# 'routes' of the machines. indicates in which order a machine processes tasks
self.machine_routes = {m_id: np.array([], dtype=int) for m_id in range(n_machines)}
#
# setting up the graph
#
# src node
self.G.add_node(
self.src_task,
pos=(-2, int(-n_jobs * 0.5)),
duration=0,
machine=self.dummy_task_machine,
scheduled=True,
color=self.dummy_task_color,
job=self.dummy_task_job,
start_time=0,
finish_time=0
)
# add nodes in grid format
task_id = 0
machine_order = jsp_instance[0]
processing_times = jsp_instance[1]
for i in range(n_jobs):
for j in range(n_machines):
task_id += 1 # start from task id 1, 0 is dummy starting task
m_id = machine_order[i, j] # machine id
dur = processing_times[i, j] # duration of the task
self.G.add_node(
task_id,
pos=(j, -i),
color=self.machine_colors[m_id],
duration=dur,
scheduled=False,
machine=m_id,
job=i,
start_time=None,
finish_time=None
)
if j == 0: # first task in a job
self.G.add_edge(
self.src_task, task_id,
job_edge=True,
weight=self.G.nodes[self.src_task]['duration'],
nweight=-self.G.nodes[self.src_task]['duration']
)
elif j == n_machines - 1: # last task of a job
self.G.add_edge(
task_id - 1, task_id,
job_edge=True,
weight=self.G.nodes[task_id - 1]['duration'],
nweight=-self.G.nodes[task_id - 1]['duration']
)
else:
self.G.add_edge(
task_id - 1, task_id,
job_edge=True,
weight=self.G.nodes[task_id - 1]['duration'],
nweight=-self.G.nodes[task_id - 1]['duration']
)
# add sink task at the end to avoid permutation in the adj matrix.
# the rows and cols correspond to the order the nodes were added not the index/value of the node.
self.G.add_node(
self.sink_task,
pos=(n_machines + 1, int(-n_jobs * 0.5)),
color=self.dummy_task_color,
duration=0,
machine=self.dummy_task_machine,
job=self.dummy_task_job,
scheduled=True,
start_time=None,
finish_time=None
)
# add edges from last tasks in job to sink
for task_id in range(n_machines, self.total_tasks, n_machines):
self.G.add_edge(
task_id, self.sink_task,
job_edge=True,
weight=self.G.nodes[task_id]['duration']
)
initial_makespan = nx.dag_longest_path_length(self.G)
self.makespan_previous_step = initial_makespan
if reward_function_parameters is not None:
if self.verbose > 1:
log.info(f"updating reward_function_parameters from '{self.reward_function_parameters}' "
f"to '{reward_function_parameters}'")
self.reward_function_parameters = reward_function_parameters
[docs] def step(self, action: int) -> tuple[npt.NDArray, SupportsFloat, bool, bool, Dict[str, Any]]:
"""
perform an action on the environment. Not valid actions will have no effect.
:param action: an action
:return: state, reward, done-flag, info-dict
"""
info = {
'action': action
}
# add arc in graph
if self.action_mode == 'task':
task_id = action + 1
if self.verbose > 1:
log.info(f"handling action={action} (Task {task_id})")
info = {
**info,
**self._schedule_task(task_id=task_id, action=action)
}
else: # case for self.action_mode == 'job'
task_mask = self.valid_action_mask(action_mode='task')
job_mask = np.array_split(task_mask, self.n_jobs)[action]
if True not in job_mask:
if self.verbose > 0:
log.info(f"job {action} is already completely scheduled. Ignoring it.")
info["valid_action"] = False
else:
task_id = 1 + action * self.n_machines + np.argmax(job_mask)
if self.verbose > 1:
log.info(f"handling job={action} (Task {task_id})")
info = {
**info,
**self._schedule_task(task_id=task_id, action=action)
}
# check if done
min_length = min([len(route) for m_id, route in self.machine_routes.items()])
done = min_length == self.n_jobs
makespan = nx.dag_longest_path_length(self.G)
if done:
try:
# by construction a cycle should never happen
# add cycle check just to be sure
cycle = nx.find_cycle(self.G, source=self.src_task)
log.critical(f"CYCLE DETECTED cycle: {cycle}")
raise RuntimeError(f"CYCLE DETECTED cycle: {cycle}")
except nx.exception.NetworkXNoCycle:
pass
info["makespan"] = makespan
info["gantt_df"] = self.network_as_dataframe()
if self.verbose > 0:
log.info(f"makespan: {makespan}")
state = self.get_state()
reward = self.get_reward(
state=state,
done=done,
info=info,
makespan_this_step=makespan
)
self.makespan_previous_step = makespan
truncated = False # by construction always false. might by changed by a wrapper
return state, reward, done, truncated, info
[docs] def is_terminal(self) -> bool:
"""
checks if the current state is terminal.
:return: bool flag. Flase -> not terminal, True -> terminal
"""
# check if done
min_length = min([len(route) for m_id, route in self.machine_routes.items()])
done = min_length == self.n_jobs
return done
[docs] def get_makespan(self) -> float:
"""
returns the makespan in the terminal state.
:return:
"""
if not self.is_terminal():
raise RuntimeError("cannot get makespan of non-terminal state.")
return self.makespan_previous_step
[docs] def get_reward(self, state: npt.NDArray, done: bool, info: Dict, makespan_this_step: float):
info['reward_function'] = self.reward_function
reward_function_parameters = self.reward_function_parameters
if self.reward_function == 'trivial' or self.reward_function == 'nasuta':
if not done:
return 0.0
else:
return - makespan_this_step / reward_function_parameters['scaling_divisor']
elif self.reward_function == 'zhang':
return 1.0 * self.makespan_previous_step - makespan_this_step # 1.0 to convert to float
elif self.reward_function == 'graph-tassel':
# this implementation is not equal to the orginal tassel implementation, because in the disjunctive graph
# approach a timestep does not correspond to a step in time but rather a step in the scheduling process.
# However this code uses tassels idea to use the machine utilization or the scheduled area in the gant
# chart.
max_finish_time = 0.0
total_filled_area = 0.0
at_least_one_scheduled_node = False
for m, m_route in self.machine_routes.items():
if len(m_route):
m_ft = self.G.nodes[m_route[-1]]["finish_time"]
if m_ft >= max_finish_time:
max_finish_time = m_ft
m_filled_area = sum([self.G.nodes[task]["duration"] for task in m_route])
total_filled_area += m_filled_area
at_least_one_scheduled_node = True
else:
pass
if not at_least_one_scheduled_node: # needed to avoid division through zero after invalid first action
return 0.0
total_gantt_area = max_finish_time * self.n_machines
machine_utilization = total_filled_area / total_gantt_area # always between 0 and 1
return machine_utilization
elif self.reward_function == 'samsonov':
if not done:
return 0.0
else:
gamma = reward_function_parameters['gamma']
if reward_function_parameters['t_opt'] is None:
raise ValueError("'t_opt' must be provided inside 'reward_function_parameters' for the samsonov "
"reward function.")
t_opt = reward_function_parameters['t_opt']
return 1000 * gamma ** t_opt / gamma ** makespan_this_step
elif self.reward_function == 'zero':
return 0.0
elif self.reward_function == 'custom':
return self.custom_reward_function(
state, # numpy array
done, # boolean done flag
info, # info dict
self.G, # networkX directed graph
makespan_this_step, # longest path in G before the current timestep action
self.makespan_previous_step, # longest path in G before the current timestep action
**reward_function_parameters # custom reward function parameters
)
[docs] def reset(self, **kwargs) -> tuple[npt.NDArray, Dict[str, Any]]:
"""
resets the environment and returns the initial state.
:param **kwargs: additional keyword arguments for the generic gymnasium reset method.
:return:
"""
# reset seed
super().reset(**kwargs)
#
info = {
"action_history": self.action_history
}
self.action_history = [] # rest action history
# remove machine edges/routes
machine_edges = [(from_, to_) for from_, to_, data_dict in self.G.edges(data=True) if not data_dict["job_edge"]]
self.G.remove_edges_from(machine_edges)
# reset machine routes dict
self.machine_routes = {m_id: np.array([]) for m_id in range(self.n_machines)}
# remove scheduled flags, reset start_time and finish_time
for i in range(1, self.total_tasks_without_dummies + 1):
node = self.G.nodes[i]
node["scheduled"] = False
node["start_time"] = None,
node["finish_time"] = None
return self.get_state(), info # obs, info
[docs] def get_action_history(self) -> List[int]:
"""
returns the action history of the current episode.
:return: list of actions
"""
return self.action_history
[docs] def render(self, mode="human", show: List[str] = None, **render_kwargs) -> Union[None, npt.NDArray]:
"""
renders the enviorment.
:param mode: valid options: "human", "rgb_array", "console"
"human" (default)
render the visualisation specified in :param show:
If :param show: is `None` `DisjunctiveGraphJssEnv.default_visualisations` will be
used.
"rgb_array"
returns rgb-arrays of the 'window' visualisation specified in
`DisjunctiveGraphJssEnv.default_visualisations`
"console"
prints the 'console' visualisations specified in
`DisjunctiveGraphJssEnv.default_visualisations` to the console
:param show: subset of the available visualisations
["gantt_window", "gantt_console", "graph_window", "graph_console"]
as list of strings.
:param render_kwargs: additional keword arguments for the
`jss_graph_env.DisjunctiveGraphJspVisualizer.render_rgb_array`-method.
:return: numpy array if mode="rgb_array" else `None`
"""
df = None
colors = None
if mode not in ["human", "rgb_array", "console"]:
raise ValueError(f"mode '{mode}' is not defined. allowed modes are: 'human' and 'rgb_array'.")
if show is None:
if mode == "rgb_array":
show = [s for s in self.default_visualisations if "window" in s]
elif mode == "console":
show = [s for s in self.default_visualisations if "console" in s]
else:
show = self.default_visualisations
if "gantt_console" in show or "gantt_window" in show:
df = self.network_as_dataframe()
colors = {f"Machine {m_id}": (r, g, b) for m_id, (r, g, b, a) in self.machine_colors.items()}
if "graph_console" in show:
self.visualizer.graph_console(self.G, shape=self.size, colors=colors)
if "gantt_console" in show:
self.visualizer.gantt_chart_console(df=df, colors=colors)
if "graph_window" in show:
if "gantt_window" in show:
if mode == "human":
self.visualizer.render_graph_and_gant_in_window(G=self.G, df=df, colors=colors, **render_kwargs)
elif mode == "rgb_array":
return self.visualizer.gantt_and_graph_vis_as_rgb_array(G=self.G, df=df, colors=colors)
else:
if mode == "human":
self.visualizer.render_graph_in_window(G=self.G, **render_kwargs)
elif mode == "rgb_array":
return self.visualizer.graph_rgb_array(G=self.G)
elif "gantt_window" in show:
if mode == "human":
self.visualizer.render_gantt_in_window(df=df, colors=colors, **render_kwargs)
elif mode == "rgb_array":
return self.visualizer.gantt_chart_rgb_array(df=df, colors=colors)
def _schedule_task(self, task_id: int, action: int) -> Dict[str, Any]:
"""
schedules a task/node in the graph representation if the task can be scheduled.
This adding one or multiple corresponding edges (multiple when performing a left shift) and updating the
information stored in the nodes.
:param task_id: the task or node that shall be scheduled.
:return: a dict with additional information for the `DisjunctiveGraphJssEnv.step`-method.
"""
node = self.G.nodes[task_id]
if node["scheduled"]:
if self.verbose > 0:
log.info(f"task {task_id} is already scheduled. ignoring it.")
return {
"valid_action": False,
"node_id": task_id,
}
self.action_history.append(action)
m_id = node["machine"]
prev_task_in_job_id, _ = list(self.G.in_edges(task_id))[0]
prev_job_node = self.G.nodes[prev_task_in_job_id]
if not prev_job_node["scheduled"]:
if self.verbose > 1:
log.info(f"the previous task (T{prev_task_in_job_id}) in the job is not scheduled jet. "
f"Not scheduling task T{task_id} to avoid cycles in the graph.")
return {
"valid_action": False,
"node_id": task_id,
}
len_m_routes = len(self.machine_routes[m_id])
if len_m_routes:
if self.perform_left_shift_if_possible:
j_lower_bound_st = prev_job_node["finish_time"]
duration = node["duration"]
j_lower_bound_ft = j_lower_bound_st + duration
# check if task can be scheduled between src and first task
m_first = self.machine_routes[m_id][0]
first_task_on_machine_st = self.G.nodes[m_first]["start_time"]
if j_lower_bound_ft <= first_task_on_machine_st:
# schedule task as first node on machine
# self.render(show=["gantt_console"])
info = self._insert_at_index_0(task_id=task_id, node=node, prev_job_node=prev_job_node, m_id=m_id)
self.G.add_edge(
task_id, m_first,
job_edge=False,
weight=duration
)
# self.render(show=["gantt_console"])
return info
elif len_m_routes == 1:
return self._append_at_the_end(task_id=task_id, node=node, prev_job_node=prev_job_node, m_id=m_id)
# check if task can be scheduled between two tasks
for i, (m_prev, m_next) in enumerate(zip(self.machine_routes[m_id], self.machine_routes[m_id][1:])):
m_temp_prev_ft = self.G.nodes[m_prev]["finish_time"]
m_temp_next_st = self.G.nodes[m_next]["start_time"]
if j_lower_bound_ft > m_temp_next_st:
continue
m_gap = m_temp_next_st - m_temp_prev_ft
if m_gap < duration:
continue
# at this point the task can fit in between two already scheduled task
# self.render(show=["gantt_console"])
# remove the edge from m_temp_prev to m_temp_next
replaced_edge_data = self.G.get_edge_data(m_prev, m_next)
st = max(j_lower_bound_st, m_temp_prev_ft)
ft = st + duration
node["start_time"] = st
node["finish_time"] = ft
node["scheduled"] = True
# from previous task to the task to schedule
self.G.add_edge(
m_prev, task_id,
job_edge=replaced_edge_data['job_edge'],
weight=replaced_edge_data['weight']
)
# from the task to schedule to the next
self.G.add_edge(
task_id, m_next,
job_edge=False,
weight=duration
)
# remove replaced edge
self.G.remove_edge(m_prev, m_next)
# insert task at the corresponding place in the machine routes list
self.machine_routes[m_id] = np.insert(self.machine_routes[m_id], i + 1, task_id)
if self.verbose > 1:
log.info(f"scheduled task {task_id} on machine {m_id} between task {m_prev:.0f} "
f"and task {m_next:.0f}")
# self.render(show=["gantt_console"])
return {
"start_time": st,
"finish_time": ft,
"node_id": task_id,
"valid_action": True,
"scheduling_method": 'left_shift',
"left_shift": 1,
}
else:
return self._append_at_the_end(task_id=task_id, node=node, prev_job_node=prev_job_node, m_id=m_id)
else:
return self._append_at_the_end(task_id=task_id, node=node, prev_job_node=prev_job_node, m_id=m_id)
else:
return self._insert_at_index_0(task_id=task_id, node=node, prev_job_node=prev_job_node, m_id=m_id)
def _append_at_the_end(self, task_id: int, node: Dict, prev_job_node: Dict, m_id: int) -> Dict[str, Any]:
"""
inserts a task at the end (last element) in the `DisjunctiveGraphJssEnv.machine_routes`-dictionary.
:param task_id: the id oth the task with in graph representation.
:param node: the corresponding node in the graph (self.G).
:param prev_job_node: the node the is connected to :param node: via a job_edge (job_edge=True).
:param m_id: the id of the machine that corresponds to :param task_id:.
:return: a dict with additional information for the `DisjunctiveGraphJssEnv.step`-method.
"""
prev_m_task = self.machine_routes[m_id][-1]
prev_m_node = self.G.nodes[prev_m_task]
self.G.add_edge(
prev_m_task, task_id,
job_edge=False,
weight=prev_m_node['duration']
)
self.machine_routes[m_id] = np.append(self.machine_routes[m_id], task_id)
st = max(prev_job_node["finish_time"], prev_m_node["finish_time"])
ft = st + node["duration"]
node["start_time"] = st
node["finish_time"] = ft
node["scheduled"] = True
# return additional info
return {
"start_time": st,
"finish_time": ft,
"node_id": task_id,
"valid_action": True,
"scheduling_method": '_append_at_the_end',
"left_shift": 0,
}
def _insert_at_index_0(self, task_id: int, node: Dict, prev_job_node: Dict, m_id: int) -> Dict:
"""
inserts a task at index 0 (first element) in the `DisjunctiveGraphJssEnv.machine_routes`-dictionary.
:param task_id: the id oth the task with in graph representation.
:param node: the corresponding node in the graph (self.G).
:param prev_job_node: the node the is connected to :param node: via a job_edge (job_edge=True).
:param m_id: the id of the machine that corresponds to :param task_id:.
:return: a dict with additional information for the `DisjunctiveGraphJssEnv.step`-method.
"""
self.machine_routes[m_id] = np.insert(self.machine_routes[m_id], 0, task_id)
st = prev_job_node["finish_time"]
ft = st + node["duration"]
node["start_time"] = st
node["finish_time"] = ft
node["scheduled"] = True
# return additional info
return {
"start_time": st,
"finish_time": ft,
"node_id": task_id,
"valid_action": True,
"scheduling_method": '_insert_at_index_0',
"left_shift": 0,
}
[docs] def get_state(self) -> npt.NDArray:
"""
returns the state of the environment as numpy array.
:return: the state of the environment as numpy array.
"""
adj = nx.to_numpy_array(self.G)[1:-1, 1:-1].astype(dtype=int) # remove dummy tasks
task_to_machine_mapping = np.zeros(shape=(self.total_tasks_without_dummies, 1), dtype=int)
task_to_duration_mapping = np.zeros(shape=(self.total_tasks_without_dummies, 1), dtype=self.dtype)
for task_id, data in self.G.nodes(data=True):
if task_id == self.src_task or task_id == self.sink_task:
continue
else:
# index shift because of the removed dummy tasks
task_to_machine_mapping[task_id - 1] = data["machine"]
task_to_duration_mapping[task_id - 1] = data["duration"]
if self.normalize_observation_space:
# one hot encoding for task to machine mapping
task_to_machine_mapping = task_to_machine_mapping.astype(int).ravel()
n_values = np.max(task_to_machine_mapping) + 1
task_to_machine_mapping = np.eye(n_values)[task_to_machine_mapping]
# normalize
adj = adj / self.longest_processing_time # note: adj matrix contains weights
task_to_duration_mapping = task_to_duration_mapping / self.longest_processing_time
# merge arrays
res = np.concatenate((adj, task_to_machine_mapping, task_to_duration_mapping), axis=1, dtype=self.dtype)
"""
Example:
normalize_observation_space = True
(flat_observation_space = False)
jsp: (numpy array)
[
# jobs order on machine
[
[1, 2, 0], # job 0
[0, 2, 1] # job 1
],
# task durations within a job
[
[17, 12, 19], # task durations of job 0
[8, 6, 2] # task durations of job 1
]
]
total number of tasks: 6 (2 * 3)
scaling/normalisation:
longest_processing_time = 19 (third task of the first job)
initial observation:
┏━━━━━━━━━┳━━━━━━━━┯━━━━━━━━┯━━━━━━┯━━━━━━━━┳━━━━━━━━━━━┯━━━━━━━━━━━┯━━━━━━━━━━━┳━━━━━━━━━━┓
┃ ┃ task_1 │ task_2 │ ... │ task_6 ┃ machine_0 │ machine_1 │ machine_2 ┃ duration ┃
┣━━━━━━━━━╋━━━━━━━━┿━━━━━━━━┿━━━━━━┿━━━━━━━━╋━━━━━━━━━━━┿━━━━━━━━━━━┿━━━━━━━━━━━╋━━━━━━━━━━┫
┃ task_1 ┃ 0. │ 17/19 │ ... │ 0. ┃ 0. │ 0. │ 0. ┃ 17/19 ┃
┠─────────╂────────┼────────┼──────┼────────╂───────────┼───────────┼───────────╂──────────┨
┃ task_2 ┃ 0. │ 0. │ ... │ 0. ┃ 0. │ 0. │ 1. ┃ 12/19 ┃
┠─────────╂────────┼────────┼──────┼────────╂───────────┼───────────┼───────────╂──────────┨
┠ ... ┃ ... │ ... │ ... │ ... ┃ ... │ ... │ ... ┃ ... ┃
┠─────────╂────────┼────────┼──────┼────────╂───────────┼───────────┼───────────╂──────────┨
┃ task_6 ┃ 0. │ 0. │ ... │ 0. ┃ 0. │ 1. │ 0. ┃ 2/19 ┃
┗━━━━━━━━━┻━━━━━━━━┷━━━━━━━━┷━━━━━━┷━━━━━━━━┻━━━━━━━━━━━┷━━━━━━━━━━━┷━━━━━━━━━━━┻━━━━━━━━━━┛
or:
[
[0. , 0.89473684, ..., 0. , 0. , 1. ,0. , 0.89473684],
[0. , 0. , ..., 0. , 0. , 0. ,1. , 0.63157895],
...
[0. , 0. , ..., 0. , 0. , 1. ,0. , 0.10526316]
]
"""
else:
"""
Example:
normalize_observation_space = False
(flat_observation_space = False)
jsp: (numpy array)
[
# jobs order on machine
[
[1, 2, 0], # job 0
[0, 2, 1] # job 1
],
# task durations within a job
[
[17, 12, 19], # task durations of job 0
[8, 6, 2] # task durations of job 1
]
]
total number of tasks: 6 (2 * 3)
initial observation:
┏━━━━━━━━┳━━━━━━━━┯━━━━━━━━┯━━━━━┯━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━┓
┃ ┃ task_1 │ task_2 │ ... │ task_6 ┃ machine ┃ duration ┃
┣━━━━━━━━╋━━━━━━━━┿━━━━━━━━┿━━━━━┿━━━━━━━━━╋━━━━━━━━━╋━━━━━━━━━━┫
┃ task_1 ┃ 0. │ 17. │ ... │ 0. ┃ 1. ┃ 17. ┃
┠────────╂────────┼────────┼─────┼─────────╂─────────╂──────────┨
┃ task_2 ┃ 0. │ 0. │ ... │ 0. ┃ 2. ┃ 12. ┃
┠────────╂────────┼────────┼─────┼─────────╂─────────╂──────────┨
┃ ... ┃ ... │ ... │ ... │ ... ┃ ... ┃ .. ┃
┠────────╂────────┼────────┼─────┼─────────╂─────────╂──────────┨
┃ task_6 ┃ 0. │ 0. │ ... │ 0. ┃ 1. ┃ 2. ┃
┗━━━━━━━━┻━━━━━━━━┷━━━━━━━━┷━━━━━┷━━━━━━━━━┻━━━━━━━━━┻━━━━━━━━━━┛
or
[
[ 0., 17., ..., 0., 1., 17.],
[ 0., 0., ..., 0., 2., 12.],
...
[ 0., 0., ..., 0., 1., 2.]
]
"""
res = np.concatenate((adj, task_to_machine_mapping, task_to_duration_mapping), axis=1, dtype=self.dtype)
if self.flat_observation_space:
# falter observation
res = np.ravel(res).astype(self.dtype)
if self.env_transform == 'mask':
res = OrderedDict({
"action_mask": np.array(self.valid_action_mask()).astype(np.int32),
"observations": res
})
return res
[docs] def network_as_dataframe(self) -> pd.DataFrame:
"""
returns the current state of the environment in a format that is supported by Plotly gant charts.
(https://plotly.com/python/gantt/)
:return: the current state as pandas dataframe
"""
return pd.DataFrame([
{
'Task': f'Job {data["job"]}',
'Start': data["start_time"],
'Finish': data["finish_time"],
'Resource': f'Machine {data["machine"]}'
}
for task_id, data in self.G.nodes(data=True)
if data["job"] != -1 and data["finish_time"] is not None
])
[docs] def valid_action_mask(self, action_mode: str = None) -> List[bool]:
"""
returns that indicates which action in the action space is valid (or will have an effect on the environment) and
which one is not.
:param action_mode: Specifies weather the `action`-argument of the `DisjunctiveGraphJssEnv.step`-method
corresponds to a job or a task (or node in the graph representation)
:return: list of boolean in the same shape as the action-space.
"""
if action_mode is None:
action_mode = self.action_mode
if action_mode == 'task':
mask = [False] * self.total_tasks_without_dummies
for task_id in range(1, self.total_tasks_without_dummies + 1):
node = self.G.nodes[task_id]
if node["scheduled"]:
continue
prev_task_in_job_id, _ = list(self.G.in_edges(task_id))[0]
prev_job_node = self.G.nodes[prev_task_in_job_id]
if not prev_job_node["scheduled"]:
continue
mask[task_id - 1] = True
if True not in mask:
if self.verbose >= 2:
log.debug("no action options remaining")
if not self.env_transform == 'mask':
log.debug("no action options remaining")
# raise RuntimeError("something went wrong")
return mask
elif action_mode == 'job':
task_mask = self.valid_action_mask(action_mode='task')
masks_per_job = np.array_split(task_mask, self.n_jobs)
return [True in job_mask for job_mask in masks_per_job]
else:
raise ValueError(f"only 'task' and 'job' are valid arguments for 'action_mode'. {action_mode} is not.")
[docs] def valid_actions(self) -> Set[int]:
"""
Returns the set of valid actions that can be taken in the current state of the environment.
The set contains the values one can pass to the step-function.
The values depend on the action_mode and the current state of the environment.
The set is empty if there are no valid actions.
:return: set of valid actions
"""
return set(np.where(self.valid_action_mask())[0])
[docs] def valid_action_list(self) -> list[int]:
"""
Returns a list of valid actions that can be taken in the current state of the environment.
"""
return [i for i, is_valid in enumerate(self.valid_action_mask()) if is_valid]
[docs] def random_rollout(self) -> (float, dict[str, list]):
"""
performs a random rollout from the current state until a terminal state is reached.
"""
term = self.is_terminal()
# unfortunately, we don't have any information about the past rewards
# so we just return the cumulative reward from the current state onwards
cumulative_reward_from_current_state_onwards = 0
observations = list()
rewards = list()
terms = list()
truns = list()
infos = list()
while not term:
valid_action_list = self.valid_action_list()
random_action = np.random.choice(valid_action_list)
obs, rew, term, trun, info = self.step(random_action)
observations.append(obs)
rewards.append(rew)
terms.append(term)
truns.append(trun)
infos.append(info)
cumulative_reward_from_current_state_onwards += rew
info = {
"observations": observations,
"rewards": rewards,
"terms": terms,
"truns": truns,
"infos": infos,
}
return cumulative_reward_from_current_state_onwards, info
[docs] def greedy_machine_utilization_rollout(self) -> (int, dict[str, list]):
# store the original reward function
original_reward_function = self.reward_function
self.reward_function = 'graph-tassel'
def get_best_action():
prev_state = self.action_history
max_utilisation = 0.0
best_action = None
for action in self.valid_action_list():
_, _, done, _, _ = self.step(action)
# the keyword arguments are primarily used for custom reward functions, so
# they are not needed here.
utilisation = self.get_reward(
state=_,
done=done,
info=_,
makespan_this_step=_
)
if utilisation > max_utilisation:
max_utilisation = utilisation
best_action = action
self.reset()
for a in prev_state:
_ = self.step(a)
return best_action
term = self.is_terminal()
cumulative_reward_from_current_state_onwards = 0
observations = list()
rewards = list()
terms = list()
truns = list()
infos = list()
while not term:
best_action = get_best_action()
obs, rew, term, trun, info = self.step(best_action)
observations.append(obs)
rewards.append(rew)
terms.append(term)
truns.append(trun)
infos.append(info)
cumulative_reward_from_current_state_onwards += rew
info = {
"observations": observations,
"rewards": rewards,
"terms": terms,
"truns": truns,
"infos": infos,
}
# restore the original reward function
self.reward_function = original_reward_function
return cumulative_reward_from_current_state_onwards, info