Scheduling/runtime.py

77 lines
2.7 KiB
Python
Raw Normal View History

2021-04-18 05:53:07 +00:00
import model
from datetime import datetime, timedelta
2021-04-18 19:34:13 +00:00
from typing import List, Dict
2021-04-18 05:53:07 +00:00
class RuntimeProduct:
    """A quantity of one product together with its scheduling timestamps.

    Attributes:
        ddl: Timestamp compared when choosing which product to pop first
            (presumably the deadline -- confirm against callers).
        start: Start timestamp paired with ``ddl``.
        product: The product being scheduled.
        amount: How many units of ``product`` this entry covers.
    """

    def __init__(self, product, amount):
        """Create an entry whose ``ddl`` and ``start`` default to the current time.

        Args:
            product: The product this entry refers to.
            amount: Number of units.
        """
        # Placeholders only: set_ddl_start() supplies the real values later.
        self.ddl: datetime = datetime.today()
        self.start: datetime = datetime.today()

        self.product: model.Product = product
        self.amount: int = amount

    def set_ddl_start(self, ddl: datetime, start: datetime):
        """Overwrite both scheduling timestamps in one call.

        Args:
            ddl: New value for ``self.ddl``.
            start: New value for ``self.start``.
        """
        self.ddl, self.start = ddl, start
2021-04-18 05:53:07 +00:00
class ProductLine:
    """An ordered queue of runtime products that all belong to one product.

    Attributes:
        product: The product this line is dedicated to.
        runtime_products: Entries queued on this line, in insertion order.
    """

    def __init__(self, product: model.Product):
        """Create an empty line for ``product``.

        Args:
            product: The product this line handles.
        """
        # Annotated with the product *type*; the original annotation named the
        # ``product`` parameter itself, which is not a valid type.
        self.product: model.Product = product
        self.runtime_products: List[RuntimeProduct] = []

    def add_runtime_product(self, runtime_product: RuntimeProduct):
        """Append ``runtime_product`` to the end of this line's queue.

        Args:
            runtime_product: The entry to enqueue.
        """
        self.runtime_products.append(runtime_product)
2021-04-18 19:34:13 +00:00
class RuntimeProductLines:
    """Collection of product lines keyed by product id.

    Runtime products are grouped into one ``ProductLine`` per product id, and
    ``pop_runtime_product`` hands them back one at a time, always taking the
    head entry with the smallest ``ddl`` across all lines.

    Attributes:
        product_lines: Mapping from product id to its ``ProductLine``.
        product_lines_list: The lines scanned by ``pop_runtime_product``;
            rebuilt from ``product_lines`` whenever a new line is created or
            ``reset`` is called.
    """

    def __init__(self):
        """Create an empty collection."""
        self.product_lines: Dict[str, ProductLine] = {}
        self.product_lines_list: List[ProductLine] = []

    def add_runtime_product(self, runtime_product: RuntimeProduct):
        """Queue ``runtime_product`` on the line for its product.

        A new ``ProductLine`` is created the first time a product id is seen,
        and ``product_lines_list`` is rebuilt to include it.

        Args:
            runtime_product: Entry to enqueue; its ``product.product_id``
                selects the line.
        """
        product_id = runtime_product.product.product_id
        if product_id not in self.product_lines:
            self.product_lines[product_id] = ProductLine(runtime_product.product)
            self.product_lines_list = list(self.product_lines.values())
        self.product_lines[product_id].add_runtime_product(runtime_product)

    def pop_runtime_product(self):
        """Remove and return the head entry with the earliest ``ddl``.

        Only the first entry of each line is considered. When several heads
        share the same ``ddl``, the line that comes first in
        ``product_lines_list`` wins.

        Returns:
            The removed runtime product, or ``None`` when there are no lines
            or every line is empty.
        """
        # Covers both None and the empty list; the original ``is None`` check
        # let an empty list through and then failed on ``[0]``.
        if not self.product_lines_list:
            return None

        non_empty_lines = [
            line for line in self.product_lines_list if line.runtime_products
        ]
        if not non_empty_lines:
            return None

        # min() returns the first minimal item, preserving the tie-break order.
        earliest_line = min(
            non_empty_lines, key=lambda line: line.runtime_products[0].ddl
        )
        return earliest_line.runtime_products.pop(0)

    def reset(self):
        """Rebuild ``product_lines_list`` from every registered product line."""
        # Must take the dict's values: iterating the dict itself yields the
        # product ids, not the ProductLine objects.
        self.product_lines_list = list(self.product_lines.values())
2021-04-18 05:53:07 +00:00
class RuntimeProcess:
    """Pairs a runtime product with one process and derives its timing.

    Attributes:
        runtime_product: The runtime product this process works on.
        process: The process definition; its ``pdt_time`` is read as minutes.
        ddl: Copied from ``runtime_product.ddl`` at construction time.
        delay: ``ddl`` minus ``pdt_time`` minutes (a ``datetime``, presumably
            the latest moment the process can begin -- confirm with callers).
    """

    def __init__(self, runtime_product: RuntimeProduct, process: model.Process):
        """Bind ``process`` to ``runtime_product`` and compute ``ddl``/``delay``.

        Args:
            runtime_product: Source of the ``ddl`` timestamp.
            process: Supplies ``pdt_time``, the duration in minutes.
        """
        self.runtime_product = runtime_product
        self.process = process

        deadline = runtime_product.ddl
        duration = timedelta(minutes=process.pdt_time)

        self.ddl = deadline
        self.delay = deadline - duration