1 Commits

Author SHA1 Message Date
bobprofisker
a67540c12c Infantry boarding python script
First version of working python script to manage units embarking and disembarking more natively than DCS core can handle / do.
2025-10-10 20:04:15 +01:00

View File

@@ -1,28 +1,16 @@
import asyncio import asyncio
from asyncio import Semaphore from asyncio import Semaphore
import json
from random import randrange from random import randrange
from api import API, Unit, UnitSpawnTable from api import API, Unit, UnitSpawnTable
from math import pi from math import pi
import logging import logging
import time
#Set some globals up #Set some globals up
alternate_time = 300 before_can_re_embark_time = 300 # this is the time it takes for the infantry, after disembarking, to become embarkable again
before_can_re_embark_time = 300 min_toggle_time_period = 30 # this should typically be however long it takes the longest thing to load or unload, used to prevent accidental re toggling the toggle switch too early by accident
####Transport types##### ####Transport types#####
transport_ground = { transport_ground = {}
"M-113": {
"max_capacity": 4,
"max_embark_range": 50,
"doors": 1,
"door_positions": [(3.35,pi),(0,0)],
"board_positions": [(15,pi),(0,0)],
"door_argument_nos": None,
"door_open_thresholds": None,
"is_rear_loader": True,
"boarding_distance": 5
}
}
transport_helicopters = { transport_helicopters = {
"UH-1H":{ "UH-1H":{
@@ -30,13 +18,50 @@ transport_helicopters = {
"max_embark_range": 100, "max_embark_range": 100,
"doors": 2, "doors": 2,
"door_positions": [(2.5,-pi/2),(0.8,0),(2.5,pi/2),(0.8,0)], #two values here offset and heading offset in radians and second distance offset and heading offset in radians "door_positions": [(2.5,-pi/2),(0.8,0),(2.5,pi/2),(0.8,0)], #two values here offset and heading offset in radians and second distance offset and heading offset in radians
"board_positions": [(15,-pi/2),(0,0),(15,pi/2),(0,0)], "centre_offset_position": [(0,0),(0,0)], #used for calculating the unit door centre, when the doors aren't in line with the centre
"door_argument_nos": [43,44], #draw argument numbers for the doors "door_argument_nos": [43,44], #draw argument numbers for the doors
"door_open_thresholds": [0.8,0.8], #value above which the door is considered open "door_open_thresholds": [0.8,0.8], #value above which the door is considered open
"is_rear_loader": False, "is_rear_loader": False,
"boarding_distance": 5 "boarding_distance": 5,
"rotor_radius": 15
},
"CH-47Fbl1":{
"max_capacity": 30,
"max_embark_range": 100,
"doors": 1,
"door_positions": [(12,-pi),(0,0)], #two values here offset and heading offset in radians and second distance offset and heading offset in radians
"centre_offset_position": [(11,-pi),(0,0)], #used for calculating the unit door centre, when the doors aren't in line with the centre
"door_argument_nos": [86], #draw argument numbers for the doors
"door_open_thresholds": [0.55], #value above which the door is considered open
"is_rear_loader": True,
"boarding_distance": 10,
"rotor_radius": 31
},
"Mi-8MT":{
"max_capacity": 24,
"max_embark_range": 100,
"doors": 1,
"door_positions": [(6,-pi),(0,0)], #two values here offset and heading offset in radians and second distance offset and heading offset in radians
"centre_offset_position": [(5.5,-pi),(0,0)], #used for calculating the unit door centre, when the doors aren't in line with the centre
"door_argument_nos": [86], #draw argument numbers for the doors
"door_open_thresholds": [0.8], #value above which the door is considered open
"is_rear_loader": True,
"boarding_distance": 10,
"rotor_radius": 22
},
"Mi-24P":{
"max_capacity": 8,
"max_embark_range": 100,
"doors": 2,
"door_positions": [(2.5,+pi/2),(1.2,0),(2.5,-pi/2),(1.2,0)], #two values here offset and heading offset in radians and second distance offset and heading offset in radians
"centre_offset_position": [(0,0),(0,0)], #used for calculating the unit door centre, when the doors aren't in line with the centre
"door_argument_nos": [38,86], #draw argument numbers for the doors
"door_open_thresholds": [0.8,0.8], #value above which the door is considered open
"is_rear_loader": False,
"boarding_distance": 5,
"rotor_radius": 18
} }
} }
transport_types = set(transport_helicopters.keys()).union(transport_ground.keys()) transport_types = set(transport_helicopters.keys()).union(transport_ground.keys())
@@ -48,36 +73,42 @@ embarker_types = embarker_inf_blue.union(embarker_inf_red)
#Time it takes after loading or unloading to swap back to the other #Time it takes after loading or unloading to swap back to the other
# Setup a logger for the module # Setup a logger for the module
logger = logging.getLogger("infantry_transport") logger = logging.getLogger("infantry_boarding")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
handler = logging.StreamHandler() handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] %(name)s - %(levelname)s - %(message)s') formatter = logging.Formatter('[%(asctime)s] %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
def cross_product_2d(v1, v2):
return v1[0] * v2[1] - v1[1] * v2[0]
def determine_side(self, door):
# Calculate relative vectors
vector_to_unit = [
self.position.lat - self.transport_unit.position.lat,
self.position.lng - self.transport_unit.position.lng
]
vector_to_door = [
door.lat - self.transport_unit.position.lat,
door.lng - self.transport_unit.position.lng
]
# Compute the 2D cross product
cross_z = cross_product_2d(vector_to_unit, vector_to_door)
# Determine the side based on the sign of the cross product
if cross_z > 0:
return True
elif cross_z < 0:
return False
else:
return True
class Transporter(Unit): class Transporter(Unit):
def __init__(self, Unit): def __init__(self, Unit):
self.unit = Unit self.unit = Unit
def to_json(self):
return {
"is_transport": self.unit.is_transport,
"max_capacity": self.unit.max_capacity,
"current_capacity": self.unit.current_capacity,
"max_embark_range": self.unit.max_embark_range,
"boarding_distance": self.unit.boarding_distance,
"current_cargo_weight": self.unit.current_cargo_weight,
"unit_array": [unit.ID for unit in self.unit.unit_array],
"en_boarding_queue": [unit.ID for unit in self.unit.en_boarding_queue],
"doors": self.unit.doors,
"door_positions": self.unit.door_positions,
"board_positions": self.unit.board_positions,
"door_argument_nos": self.unit.door_argument_nos,
"door_open_thresholds": self.unit.door_open_thresholds,
"is_rear_loader": self.unit.is_rear_loader,
"will_disembark": self.unit.will_disembark
}
def set_as_transport(self): def set_as_transport(self):
self.unit.is_transport = True self.unit.is_transport = True
if self.unit.name in transport_helicopters: if self.unit.name in transport_helicopters:
@@ -89,65 +120,86 @@ class Transporter(Unit):
self.unit.current_cargo_weight = 0 self.unit.current_cargo_weight = 0
self.unit.unit_array = [] self.unit.unit_array = []
self.unit.en_boarding_queue = [] self.unit.en_boarding_queue = []
self.unit.current_max_capacity = 0
self.unit.doors = transport_helicopters["UH-1H"]["doors"] self.unit.doors = transport_helicopters["UH-1H"]["doors"]
self.unit.door_positions = transport_helicopters["UH-1H"]["door_positions"] self.unit.door_positions = transport_helicopters["UH-1H"]["door_positions"]
self.unit.board_positions = transport_helicopters["UH-1H"]["board_positions"] self.unit.centre_offset_position = transport_helicopters["UH-1H"]["centre_offset_position"]
self.unit.door_argument_nos = transport_helicopters["UH-1H"]["door_argument_nos"] self.unit.door_argument_nos = transport_helicopters["UH-1H"]["door_argument_nos"]
self.unit.will_disembark = False
self.unit.register_draw_argument(43) #Register draw argument 43 for UH-1H
self.unit.register_draw_argument(44)
self.unit.door_open_thresholds = transport_helicopters["UH-1H"]["door_open_thresholds"] self.unit.door_open_thresholds = transport_helicopters["UH-1H"]["door_open_thresholds"]
self.unit.will_disembark = True
self.unit.register_draw_argument(43) #left door
self.unit.register_draw_argument(44) #right door
self.unit.register_draw_argument(446) #interior light colour switch, we use as a toggle
self.unit.loading_toggle_argument = 2 #this is the argument registered in the index on the draw arguments that controls loading state, have to trial and error what it is for each transport type
self.unit.disembark_embark_argument_toggle_argument_threshold = 0.7
self.unit.is_rear_loader = transport_helicopters["UH-1H"]["is_rear_loader"] self.unit.is_rear_loader = transport_helicopters["UH-1H"]["is_rear_loader"]
self.unit.rotor_radius = transport_helicopters["UH-1H"]["rotor_radius"]
elif self.unit.name == "CH-47Fbl1":
self.unit.max_capacity = transport_helicopters["CH-47Fbl1"]["max_capacity"]
self.unit.max_embark_range = transport_helicopters["CH-47Fbl1"]["max_embark_range"]
self.unit.boarding_distance = transport_helicopters["CH-47Fbl1"]["boarding_distance"]
self.unit.current_capacity = 0
self.unit.current_cargo_weight = 0
self.unit.unit_array = []
self.unit.en_boarding_queue = []
self.unit.current_max_capacity = 0
self.unit.doors = transport_helicopters["CH-47Fbl1"]["doors"]
self.unit.centre_offset_position = transport_helicopters["CH-47Fbl1"]["centre_offset_position"]
self.unit.door_positions = transport_helicopters["CH-47Fbl1"]["door_positions"]
self.unit.door_argument_nos = transport_helicopters["CH-47Fbl1"]["door_argument_nos"]
self.unit.door_open_thresholds = transport_helicopters["CH-47Fbl1"]["door_open_thresholds"]
self.unit.will_disembark = True
self.unit.register_draw_argument(86) #rear ramp
self.unit.register_draw_argument(606) #interior light colour switch, we use as a toggle
self.unit.loading_toggle_argument = 1 #this is the argument registered in the index on the draw arguments that controls loading state, have to trial and error what it is for each transport type
self.unit.disembark_embark_argument_toggle_argument_threshold = 0.5
self.unit.is_rear_loader = transport_helicopters["CH-47Fbl1"]["is_rear_loader"]
self.unit.rotor_radius = transport_helicopters["CH-47Fbl1"]["rotor_radius"]
elif self.unit.name == "Mi-8MT":
self.unit.max_capacity = transport_helicopters["Mi-8MT"]["max_capacity"]
self.unit.max_embark_range = transport_helicopters["Mi-8MT"]["max_embark_range"]
self.unit.boarding_distance = transport_helicopters["Mi-8MT"]["boarding_distance"]
self.unit.current_capacity = 0
self.unit.current_cargo_weight = 0
self.unit.unit_array = []
self.unit.en_boarding_queue = []
self.unit.current_max_capacity = 0
self.unit.doors = transport_helicopters["Mi-8MT"]["doors"]
self.unit.centre_offset_position = transport_helicopters["Mi-8MT"]["centre_offset_position"]
self.unit.door_positions = transport_helicopters["Mi-8MT"]["door_positions"]
self.unit.door_argument_nos = transport_helicopters["Mi-8MT"]["door_argument_nos"]
self.unit.door_open_thresholds = transport_helicopters["Mi-8MT"]["door_open_thresholds"]
self.unit.will_disembark = True
self.unit.register_draw_argument(86) #rear ramp
self.unit.register_draw_argument(133) #interior light colour switch, we use as a toggle
self.unit.loading_toggle_argument = 1 #this is the argument registered in the index on the draw arguments that controls loading state, have to trial and error what it is for each transport type
self.unit.disembark_embark_argument_toggle_argument_threshold = 0.8
self.unit.is_rear_loader = transport_helicopters["Mi-8MT"]["is_rear_loader"]
self.unit.rotor_radius = transport_helicopters["Mi-8MT"]["rotor_radius"]
elif self.unit.name == "Mi-24P":
self.unit.max_capacity = transport_helicopters["Mi-24P"]["max_capacity"]
self.unit.max_embark_range = transport_helicopters["Mi-24P"]["max_embark_range"]
self.unit.boarding_distance = transport_helicopters["Mi-24P"]["boarding_distance"]
self.unit.current_capacity = 0
self.unit.current_cargo_weight = 0
self.unit.unit_array = []
self.unit.en_boarding_queue = []
self.unit.current_max_capacity = 0
self.unit.doors = transport_helicopters["Mi-24P"]["doors"]
self.unit.centre_offset_position = transport_helicopters["Mi-24P"]["centre_offset_position"]
self.unit.door_positions = transport_helicopters["Mi-24P"]["door_positions"]
self.unit.door_argument_nos = transport_helicopters["Mi-24P"]["door_argument_nos"]
self.unit.door_open_thresholds = transport_helicopters["Mi-24P"]["door_open_thresholds"]
self.unit.will_disembark = True
self.unit.register_draw_argument(38) #left door
self.unit.register_draw_argument(86) #right door
self.unit.register_draw_argument(47) #interior light colour switch, we use as a toggle
self.unit.loading_toggle_argument = 2 #this is the argument registered in the index on the draw arguments that controls loading state, have to trial and error what it is for each transport type
self.unit.disembark_embark_argument_toggle_argument_threshold = 0.8
self.unit.is_rear_loader = transport_helicopters["Mi-24P"]["is_rear_loader"]
self.unit.rotor_radius = transport_helicopters["Mi-24P"]["rotor_radius"]
else: else:
self.unit.max_capacity = 8 pass
self.unit.max_embark_range = 100
self.unit.boarding_distance = 5
self.unit.current_capacity = 0
self.unit.current_cargo_weight = 0
self.unit.unit_array = []
self.unit.en_boarding_queue = []
self.unit.doors = 1
self.unit.door_positions = [(5,pi),(0,0)]
self.unit.board_positions = [(15,pi),(0,0)]
self.unit.door_argument_nos = None
self.unit.door_open_thresholds = None
self.unit.will_disembark = False
self.unit.is_rear_loader = True
elif self.unit.name in transport_ground:
if self.unit.name == "M-113":
self.unit.max_capacity = transport_ground["M-113"]["max_capacity"]
self.unit.max_embark_range = transport_ground["M-113"]["max_embark_range"]
self.unit.boarding_distance = transport_ground["M-113"]["boarding_distance"]
self.unit.current_capacity = 0
self.unit.current_cargo_weight = 0
self.unit.unit_array = []
self.unit.en_boarding_queue = []
self.unit.doors = transport_ground["M-113"]["doors"]
self.unit.door_positions = transport_ground["M-113"]["door_positions"]
self.unit.board_positions = transport_ground["M-113"]["board_positions"]
self.unit.door_argument_nos = transport_ground["M-113"]["door_argument_nos"]
self.unit.door_open_thresholds = transport_ground["M-113"]["door_open_thresholds"]
self.unit.will_disembark = False
self.unit.is_rear_loader = transport_ground["M-113"]["is_rear_loader"]
else:
self.unit.max_capacity = 4
self.unit.max_embark_range = 50
self.unit.boarding_distance = 5
self.unit.current_capacity = 0
self.unit.current_cargo_weight = 0
self.unit.unit_array = []
self.unit.en_boarding_queue = []
self.unit.doors = 1
self.unit.door_positions = [(5,pi),(0,0)]
self.unit.board_positions = [(15,pi),(0,0)]
self.unit.door_argument_nos = None
self.unit.door_open_thresholds = None
self.unit.will_disembark = False
self.unit.is_rear_loader = True
logger.info(f"Set unit '{self.unit.name}' as transport, with {self.unit.current_capacity} / {self.unit.max_capacity}.")
class DisembarkedInfantry(Unit): class DisembarkedInfantry(Unit):
def __str__(self): def __str__(self):
@@ -165,7 +217,7 @@ class DisembarkedInfantry(Unit):
if self.check_for_enemy_in_range(): if self.check_for_enemy_in_range():
self.set_speed(10) self.set_speed(10)
else: else:
self.set_speed(3) self.set_speed(2)
self.register_on_destination_reached_callback( self.register_on_destination_reached_callback(
self.on_destination_reached, self.on_destination_reached,
destination, destination,
@@ -178,7 +230,7 @@ class DisembarkedInfantry(Unit):
for unit in units.values(): for unit in units.values():
if unit.alive and unit.coalition != self.coalition: if unit.alive and unit.coalition != self.coalition:
distance_to_enemy = self.position.distance_to(unit.position) distance_to_enemy = self.position.distance_to(unit.position)
if distance_to_enemy < 2000: #if an enemy is within 100m if distance_to_enemy < 2000: #if an enemy is within 2000m, approx rifle max range
return True return True
return False return False
@@ -187,8 +239,15 @@ class DisembarkedInfantry(Unit):
# logger.info(f"Unit {self} did not reach its destination.") # logger.info(f"Unit {self} did not reach its destination.")
self.set_roe(1) self.set_roe(1)
new_patrol = self.position.project_with_bearing_and_distance(1000, self.transport_spawn_heading) new_patrol = self.position.project_with_bearing_and_distance(1000, self.transport_spawn_heading)
await asyncio.sleep(self.time_delay) #wait a bit before trying again await asyncio.sleep(10) #wait a bit before trying again
self.set_path([new_patrol]) try:
if self.og_transport.is_rear_loader:
side_offset = self.position.project_with_bearing_and_distance(30,self.transport_spawn_heading-pi/2)
self.set_path([side_offset,new_patrol])
else:
self.set_path([new_patrol])
except AttributeError:
self.set_path([new_patrol])
if self.check_for_enemy_in_range(): if self.check_for_enemy_in_range():
self.set_speed(10) self.set_speed(10)
else: else:
@@ -198,10 +257,16 @@ class DisembarkedInfantry(Unit):
logger.info(f"Unit {self} is now boardable again.") logger.info(f"Unit {self} is now boardable again.")
else: else:
self.set_roe(1) self.set_roe(1)
logger.info(f"Unit {self} has reached its destination.")
new_patrol = self.position.project_with_bearing_and_distance(1000, self.transport_spawn_heading) new_patrol = self.position.project_with_bearing_and_distance(1000, self.transport_spawn_heading)
await asyncio.sleep(self.time_delay) #wait a bit before trying again await asyncio.sleep(10) #wait a bit before trying again
self.set_path([new_patrol]) try:
if self.og_transport.is_rear_loader:
side_offset = self.position.project_with_bearing_and_distance(30,self.transport_spawn_heading-pi/2)
self.set_path([side_offset,new_patrol])
else:
self.set_path([new_patrol])
except AttributeError:
self.set_path([new_patrol])
if self.check_for_enemy_in_range(): if self.check_for_enemy_in_range():
self.set_speed(10) self.set_speed(10)
else: else:
@@ -210,7 +275,6 @@ class DisembarkedInfantry(Unit):
self.is_loadable = True self.is_loadable = True
logger.info(f"Unit {self} is now boardable again.") logger.info(f"Unit {self} is now boardable again.")
class Embarker(Unit): class Embarker(Unit):
def __str__(self): def __str__(self):
return f"DisembarkedInfrantry(unit_id={self.unit_id}, group_id={self.group_id}, position={self.position}, heading={self.heading})" return f"DisembarkedInfrantry(unit_id={self.unit_id}, group_id={self.group_id}, position={self.position}, heading={self.heading})"
@@ -218,21 +282,11 @@ class Embarker(Unit):
def __init__(self, Unit): def __init__(self, Unit):
self.unit = Unit self.unit = Unit
def to_json(self):
return {
"is_embarker": self.unit.is_embarker,
"is_moving": self.unit.is_moving,
"is_loadable": self.unit.is_loadable,
"in_embark_queue": self.unit.in_embark_queue if hasattr(self.unit, 'in_embark_queue') else False,
"transport_unit": self.unit.transport_unit.ID if hasattr(self.unit, 'transport_unit') and self.unit.transport_unit else None
}
def set_as_embarker(self): def set_as_embarker(self):
self.unit.is_embarker = True self.unit.is_embarker = True
self.unit.is_moving = False self.unit.is_moving = False
self.unit.is_loadable = True self.unit.is_loadable = True
logger.info(f"Set unit '{self.unit.name}' as embarker.") logger.info(f"Set unit '{self.unit.name}' as embarker.")
self.unit.set_custom_string("I am an embarker.")
def can_board(self): def can_board(self):
transport = self.transport_unit transport = self.transport_unit
@@ -245,8 +299,10 @@ class Embarker(Unit):
def board_transport(self): def board_transport(self):
door, num_doors_open = self.get_closest_door() door, num_doors_open = self.get_closest_door()
if num_doors_open > 1: door_bypass = True if num_doors_open > 1:
else: door_bypass = False door_bypass = True
else:
door_bypass = False
if door is None: if door is None:
pass pass
@@ -254,42 +310,69 @@ class Embarker(Unit):
if self.is_moving: if self.is_moving:
pass pass
elif not self.is_moving: elif not self.is_moving:
distance_to_door = self.position.distance_to(door) distance = self.position.distance_to(door)
distance_to_centre = self.position.distance_to(self.transport_unit.position) distance_centre_offset_position = self.position.distance_to(self.transport_unit.position.project_with_bearing_and_distance(self.transport_unit.centre_offset_position[0][0], self.transport_unit.centre_offset_position[0][1]))
if distance_to_door < distance_to_centre:
bearing = self.position.bearing_to(door) if distance >= distance_centre_offset_position:
if hasattr(self,'nudge'): if determine_side(self,door): #right side
nudge_factor = self.nudge if self.transport_unit.is_rear_loader: # chinook rear loader
else: destination = door.project_with_bearing_and_distance(self.transport_unit.rotor_radius/3, self.transport_unit.heading-pi)
nudge_factor = 0 destination = destination.project_with_bearing_and_distance(self.transport_unit.rotor_radius/2, self.transport_unit.heading+pi/2)
destination = self.position.project_with_bearing_and_distance(distance_to_door+nudge_factor, bearing) destination.threshold = 2
self.set_path([destination])
self.register_on_destination_reached_callback(
self.on_destination_reached,
destination,
threshold=2.0,
timeout=10.0 # Timeout after 30 seconds if the destination is not reached
)
self.is_moving = True
else: # huey front loader
destination = door.project_with_bearing_and_distance(self.transport_unit.rotor_radius, self.transport_unit.heading)
destination = destination.project_with_bearing_and_distance(self.transport_unit.rotor_radius/2, self.transport_unit.heading+pi/2)
destination.threshold = 2
self.set_path([destination])
self.register_on_destination_reached_callback(
self.on_destination_reached,
destination,
threshold=2.0,
timeout=10.0 # Timeout after 30 seconds if the destination is not reached
)
self.is_moving = True
else: #left side
if self.transport_unit.is_rear_loader: # chinook rear loader
destination = door.project_with_bearing_and_distance(self.transport_unit.rotor_radius/3, self.transport_unit.heading-pi)
destination = destination.project_with_bearing_and_distance(self.transport_unit.rotor_radius/2, self.transport_unit.heading-pi/2)
destination.threshold = 2
self.set_path([destination])
self.register_on_destination_reached_callback(
self.on_destination_reached,
destination,
threshold=2.0,
timeout=10.0 # Timeout after 30 seconds if the destination is not reached
)
self.is_moving = True
else: # huey front loader
destination = door.project_with_bearing_and_distance(self.transport_unit.rotor_radius, self.transport_unit.heading)
destination = destination.project_with_bearing_and_distance(self.transport_unit.rotor_radius/2, self.transport_unit.heading-pi/2)
destination.threshold = 2
self.set_path([destination])
self.register_on_destination_reached_callback(
self.on_destination_reached,
destination,
threshold=2.0,
timeout=10.0 # Timeout after 30 seconds if the destination is not reached
)
self.is_moving = True
else:
destination = self.position.project_with_bearing_and_distance(distance+2, self.position.bearing_to(door))
#destination = destination.project_with_bearing_and_distance(self.transport_unit.rotor_radius, self.transport_unit.heading+pi/2)
destination.threshold = 2 destination.threshold = 2
# Set the destination for the unit self.set_path([destination,door])
self.set_path([destination])
self.register_on_destination_reached_callback(
self.on_destination_reached,
destination,
threshold=2.0,
timeout=10.0 # Timeout after 30 seconds if the destination is not reached
)
self.is_moving = True
else:# distance_to_door >= distance_to_centre:
if self.transport_unit.is_rear_loader:
in_front_of_transport = self.transport_unit.position.project_with_bearing_and_distance(15, self.transport_unit.heading-pi)
else:
in_front_of_transport = self.transport_unit.position.project_with_bearing_and_distance(15, self.transport_unit.heading)
bearing = self.position.bearing_to(in_front_of_transport)
destination = self.position.project_with_bearing_and_distance(distance_to_door, bearing)
destination.threshold = 2
self.set_path([destination])
self.register_on_destination_reached_callback(
self.on_destination_reached,
destination,
threshold=2.0,
timeout=10.0
)
self.is_moving = True self.is_moving = True
def get_closest_door(self): def get_closest_door(self):
return check_closest_open_door(self.transport_unit, self) return check_closest_open_door(self.transport_unit, self)
@@ -322,33 +405,23 @@ def check_closest_open_door(transport, embarker):
return closest_door, doors_open return closest_door, doors_open
else: else:
return None, 0 return None, 0
elif transport.name in transport_ground:
if transport.door_argument_nos is None and transport.doors > 0:
return transport.position.project_with_bearing_and_distance(2,transport.heading + pi), transport.heading + pi
elif transport.door_argument_nos is not None and transport.doors > 0:
closest_door = None
doors_open = 0
distance_to_closest_door = float('inf')
for i in range(transport.doors):
if transport.draw_arguments[i].value >= transport.door_open_thresholds[i]:
doors_open += 1
distance = embarker.position.distance_to(transport.position.project_with_bearing_and_distance(transport.door_positions[i*2][0], transport.heading + transport.door_positions[i*2][1]).project_with_bearing_and_distance(transport.door_positions[i*2+1][0], transport.heading + transport.door_positions[i*2+1][1]))
if distance < distance_to_closest_door:
distance_to_closest_door = distance
closest_door = transport.position.project_with_bearing_and_distance(transport.door_positions[i*2][0], transport.heading + transport.door_positions[i*2][1]).project_with_bearing_and_distance(transport.door_positions[i*2+1][0], transport.heading + transport.door_positions[i*2+1][1])
return closest_door, doors_open
else:
return None, 0
def check_for_door_status(transporter): def check_for_door_status(transporter):
if not hasattr(transporter, 'draw_arguments') or len(transporter.draw_arguments) < transporter.doors:
#logger.warning(f"Transporter '{transporter.name}' does not have enough draw arguments registered.")
return False
if transporter.name in transport_helicopters: if transporter.name in transport_helicopters:
if transporter.door_argument_nos is None and transporter.doors > 0: if transporter.door_argument_nos is None and transporter.doors > 0:
return True return True
elif transporter.door_argument_nos is not None and transporter.doors > 0: elif transporter.door_argument_nos is not None and transporter.doors > 0:
a_door_is_open = False a_door_is_open = False
for i in range(transporter.doors): for i in range(transporter.doors):
if transporter.draw_arguments[i].value >= transporter.door_open_thresholds[i]: if i >= len(transporter.draw_arguments): # Ensure the index is valid
a_door_is_open = True #logger.error(f"Index {i} out of range for draw_arguments in transporter '{transporter.name}'.")
continue
if transporter.draw_arguments[i].value >= transporter.door_open_thresholds[i]:
a_door_is_open = True
return a_door_is_open return a_door_is_open
else: else:
return False return False
@@ -358,6 +431,9 @@ def check_for_door_status(transporter):
elif transporter.door_argument_nos is not None and transporter.doors > 0: elif transporter.door_argument_nos is not None and transporter.doors > 0:
a_door_is_open = False a_door_is_open = False
for i in range(transporter.doors): for i in range(transporter.doors):
if i >= len(transporter.draw_arguments): # Ensure the index is valid
#logger.error(f"Index {i} out of range for draw_arguments in transporter '{transporter.name}'.")
continue
if transporter.draw_arguments[i].value >= transporter.door_open_thresholds[i]: if transporter.draw_arguments[i].value >= transporter.door_open_thresholds[i]:
a_door_is_open = True a_door_is_open = True
return a_door_is_open return a_door_is_open
@@ -370,14 +446,6 @@ async def load_loadable_units():
if embarker.alive and hasattr(embarker, 'is_embarker'): if embarker.alive and hasattr(embarker, 'is_embarker'):
if hasattr(embarker, 'in_embark_queue') and hasattr(embarker, 'transport_unit') and hasattr(embarker, 'is_moving'): if hasattr(embarker, 'in_embark_queue') and hasattr(embarker, 'transport_unit') and hasattr(embarker, 'is_moving'):
if embarker.transport_unit.name in transport_types: if embarker.transport_unit.name in transport_types:
#check the speed and distance, slow down if close
distance_to_transport = embarker.position.distance_to(embarker.transport_unit.position)
if distance_to_transport > 10 and embarker.speed < 1.4:
embarker.set_speed(10)
elif distance_to_transport < 10 and embarker.speed >= 3:
embarker.set_speed(2)
elif distance_to_transport < 5 and embarker.speed >= 1.3:
embarker.set_speed(1.3)
if embarker.roe != "hold": if embarker.roe != "hold":
embarker.set_roe(4) #set to hold fire to avoid stopping to shoot embarker.set_roe(4) #set to hold fire to avoid stopping to shoot
#check the doors are open #check the doors are open
@@ -389,14 +457,14 @@ async def load_loadable_units():
#check if close enough to board #check if close enough to board
closest_door, _ = embarker.get_closest_door() closest_door, _ = embarker.get_closest_door()
door_distance = embarker.position.distance_to(closest_door) door_distance = embarker.position.distance_to(closest_door)
if door_distance < embarker.transport_unit.boarding_distance: if door_distance < embarker.transport_unit.boarding_distance or embarker.position.distance_to(embarker.transport_unit.position) < embarker.transport_unit.boarding_distance:
transport = embarker.transport_unit transport = embarker.transport_unit
embarker_units = [ embarker_units = [
(embarker, embarker.position.distance_to(transport.position)) (embarker, embarker.position.distance_to(transport.position))
for embarker in units.values() for embarker in units.values()
if embarker.alive if embarker.alive
and hasattr(embarker, 'is_embarker') and hasattr(embarker, 'is_embarker')
and embarker.position.distance_to(transport.position) < transport.boarding_distance and embarker.position.distance_to(closest_door) < transport.boarding_distance
] ]
embarkers_sorted = sorted(embarker_units, key=lambda x: x[1]) embarkers_sorted = sorted(embarker_units, key=lambda x: x[1])
@@ -405,22 +473,18 @@ async def load_loadable_units():
else: else:
if embarker.ID == embarkers_sorted[0][0].ID: if embarker.ID == embarkers_sorted[0][0].ID:
transport.current_capacity += 1 transport.current_capacity += 1
transport.current_max_capacity +=1
transport.unit_array.append(embarker) transport.unit_array.append(embarker)
transport.set_cargo_weight(transport.current_cargo_weight + 100) #assume 100kg per infantry with kit transport.set_cargo_weight(transport.current_cargo_weight + 100) #assume 100kg per infantry with kit
transport.current_cargo_weight += 100 transport.current_cargo_weight += 100
embarker.delete_unit() embarker.delete_unit()
asyncio.create_task(set_as_disembarking(transport)) #asyncio.create_task(set_as_disembarking(transport))
break break
#else run it closer #else run it closer
if embarker.is_moving: if embarker.is_moving:
if hasattr(embarker, 'last_pos'): if hasattr(embarker, 'last_pos'):
if embarker.position == embarker.last_pos: if embarker.position == embarker.last_pos:
embarker.is_moving = False embarker.is_moving = False
embarker.set_speed(1.3)
if hasattr(embarker, 'nudge'):
embarker.nudge = embarker.nudge + 2
else:
embarker.nudge = 2
embarker.last_pos = embarker.position embarker.last_pos = embarker.position
pass pass
elif not embarker.is_moving: elif not embarker.is_moving:
@@ -441,12 +505,12 @@ def generate_transport_units():
new_emabarquee.set_as_embarker() new_emabarquee.set_as_embarker()
async def set_as_disembarking(transport): async def set_as_disembarking(transport):
await asyncio.sleep(alternate_time)
transport.will_disembark = True transport.will_disembark = True
transport.en_boarding_queue = []
async def set_as_not_disembarking(transport): async def set_as_not_disembarking(transport):
await asyncio.sleep(alternate_time)
transport.will_disembark = False transport.will_disembark = False
transport.current_max_capacity = transport.current_capacity
unload_semaphore = Semaphore(1) unload_semaphore = Semaphore(1)
@@ -458,9 +522,9 @@ async def check_for_unloadable_units():
for transporter in units.values(): for transporter in units.values():
if transporter.alive and hasattr(transporter, 'is_transport') and transporter.will_disembark: if transporter.alive and hasattr(transporter, 'is_transport') and transporter.will_disembark:
# Check if the transporter is in a position to disembark units # Check if the transporter is in a position to disembark units
if transporter.speed < 2 and check_for_door_status(transporter) and not transporter.airborne: # check speed is less than 2 m/s and doors are open if transporter.speed < 2 and check_for_door_status(transporter) and not transporter.airborne and transporter.current_capacity > 0: # check speed is less than 2 m/s and doors are open
first_two_spawns = True # Track if we are handling the first two spawns # Transport is ready to disembark
to_remove = [] #sets up variable to hold units to remove from queue to_remove = [] # Sets up variable to hold units to remove from queue
for disembarker in transporter.unit_array: for disembarker in transporter.unit_array:
# Get the open doors # Get the open doors
open_doors = [] open_doors = []
@@ -483,20 +547,7 @@ async def check_for_unloadable_units():
transporter.last_door_index = 0 # Initialize the last used door index transporter.last_door_index = 0 # Initialize the last used door index
# Get the next door in the round-robin sequence # Get the next door in the round-robin sequence
door_index = transporter.last_door_index % len(open_doors)
transporter.last_door_index += 1 # Increment the door index for the next spawn
# Spawn the unit at the selected door
door_position = open_doors[door_index]
door_heading = open_doors_headings[door_index]
spawn_table: UnitSpawnTable = UnitSpawnTable(
unit_type=disembarker.name,
location=door_position,
heading=door_heading,
skill="High",
livery_id=""
)
async def execution_callback(new_group_ID: int): async def execution_callback(new_group_ID: int):
logger.info(f"New units spawned, groupID: {new_group_ID}") logger.info(f"New units spawned, groupID: {new_group_ID}")
@@ -506,72 +557,146 @@ async def check_for_unloadable_units():
logger.info(f"New unit spawned: {new_unit}") logger.info(f"New unit spawned: {new_unit}")
new_unit.__class__ = DisembarkedInfantry new_unit.__class__ = DisembarkedInfantry
new_unit.transport_spawn_heading = transporter.heading new_unit.transport_spawn_heading = transporter.heading
new_unit.og_transport = transporter
new_unit.disembark_from_transport() new_unit.disembark_from_transport()
new_unit.original_position = new_unit.position new_unit.original_position = new_unit.position
#the delay is a function of how many units are left to disembark and how long it takes to get to the disembark spot # The delay is a function of how many units are left to disembark and how long it takes to get to the disembark spot
new_unit.time_delay = transporter.max_capacity*2 - transporter.current_capacity # Random delay between 10 and 30 seconds
async def delayed_spawn(delay,transporter,open_doors,open_doors_headings,disembarker):
door_index = transporter.last_door_index % len(open_doors)
transporter.last_door_index += 1
# Increment the door index for the next spawn
# Spawn the unit at the selected door
door_position = open_doors[door_index]
door_heading = open_doors_headings[door_index]
spawn_table: UnitSpawnTable = UnitSpawnTable(
unit_type=disembarker.name,
location=door_position,
heading=door_heading,
skill="High",
livery_id=""
)
# Add a delay before spawning the unit
await asyncio.sleep(delay) # Delay of 2 seconds (adjust as needed)
api.spawn_ground_units([spawn_table], transporter.coalition, "", True, 0, execution_callback)
transporter.set_cargo_weight(transporter.current_cargo_weight - 100) # Assume 100kg per infantry with kit
transporter.current_cargo_weight -= 100
logger.info(f"Spawned unit '{disembarker.name}' from open door of transport '{transporter.name}'.")
if len(open_doors) > 1:
if (transporter.current_max_capacity - transporter.current_capacity) < len(open_doors):
delay = 0.1
else:
delay = (transporter.current_max_capacity - transporter.current_capacity) * 1.25 - ((len(open_doors)-1) * 2.5) + 2.5
else:
delay = (transporter.current_max_capacity - transporter.current_capacity) * 2.5
asyncio.create_task(delayed_spawn(delay,transporter,open_doors,open_doors_headings,disembarker))
api.spawn_ground_units([spawn_table], transporter.coalition, "", True, 0, execution_callback)
to_remove.append(disembarker)
transporter.en_boarding_queue = [] transporter.en_boarding_queue = []
transporter.current_capacity -= 1 transporter.current_capacity -= 1
transporter.set_cargo_weight(transporter.current_cargo_weight - 100) # Assume 100kg per infantry with kit to_remove.append(disembarker)
transporter.current_cargo_weight -= 100
# Add a delay between spawns
if len(open_doors) > 1 and first_two_spawns:
# Shorter delay for the first two spawns if both doors are open
await asyncio.sleep(0.5)
first_two_spawns = False
else:
# Normal delay for subsequent spawns or single-door spawns
await asyncio.sleep(2.5)
for disembarker in to_remove: for disembarker in to_remove:
transporter.unit_array.remove(disembarker) transporter.unit_array.remove(disembarker)
if transporter.current_capacity == 0:
await set_as_not_disembarking(transporter)
logger.info(f"Spawned unit '{disembarker.name}' from open door of transport '{transporter.name}'.")
except Exception as e: except Exception as e:
logger.error(f"Error in check_for_unloadable_units: {e}") #logging.warning(e, exc_info=True)
logger.info(f"Error in check_for_unloadable_units: {e}")
def check_for_loadable_units(): async def check_for_loadable_units():
units = api.get_units() units = api.get_units()
for transporter in units.values(): try:
if transporter.alive and hasattr(transporter, 'is_transport') and not transporter.will_disembark: for transporter in units.values():
if len(transporter.unit_array) < transporter.max_capacity: if transporter.alive and hasattr(transporter, 'is_transport') and not transporter.will_disembark:
if transporter.speed < 2 and check_for_door_status(transporter): #check speed is less than 2 m/s and doors are open if len(transporter.unit_array) < transporter.max_capacity:
# print("Speed is okay") if transporter.speed < 2 and check_for_door_status(transporter): #check speed is less than 2 m/s and doors are open
embarker_units = [ # print("Speed is okay")
(embarker, embarker.position.distance_to(transporter.position)) embarker_units = [
for embarker in units.values() (embarker, embarker.position.distance_to(transporter.position))
if embarker.alive for embarker in units.values()
and hasattr(embarker, 'is_embarker') if embarker.alive
and getattr(embarker, 'is_loadable', True) # Check if is_loadable is True and hasattr(embarker, 'is_embarker')
and embarker.position.distance_to(transporter.position) < transporter.max_embark_range and getattr(embarker, 'is_loadable', True) # Check if is_loadable is True
] and embarker.position.distance_to(transporter.position) < transporter.max_embark_range
if embarker_units is None or len(embarker_units) == 0: ]
continue if embarker_units is None or len(embarker_units) == 0:
continue
else:
for embarker in embarker_units:
if hasattr(embarker, 'in_embark_queue') and embarker.in_embark_queue:
if embarker.in_embark_queue:
embarker_units.remove(embarker)
embarkers_sorted = sorted(embarker_units, key=lambda x: x[1])
closest_embarkers = embarkers_sorted[:transporter.max_capacity-len(transporter.en_boarding_queue)]
for embarker, distance in closest_embarkers:
if embarker not in transporter.en_boarding_queue and distance < transporter.max_embark_range and not hasattr(embarker, 'is_in_queue'):
transporter.en_boarding_queue.append(embarker)
embarker.in_embark_queue = True
embarker.is_in_queue = True
embarker.transport_unit = transporter
logger.info(f"Added embarker '{embarker.name}' to '{transporter.name}' s boarding queue.")
elif embarker not in transporter.en_boarding_queue and distance < transporter.max_embark_range and hasattr(embarker, 'is_in_queue'):
if embarker.is_in_queue:
await asyncio.sleep(60) #wait a bit and try again next time
embarker.is_in_queue = False
else:
transporter.en_boarding_queue.append(embarker)
embarker.in_embark_queue = True
embarker.is_in_queue = True
embarker.transport_unit = transporter
logger.info(f"Added embarker '{embarker.name}' to '{transporter.name}' s boarding queue.")
elif embarker in transporter.en_boarding_queue:
pass
else:
pass #we pass as the transport is full
except Exception as e:
logger.error(f"Error in check_for_loadable_units: {e}")
async def check_for_transport_embarker_or_disembark():
units = api.get_units()
try:
for transporter in units.values():
if transporter.alive and hasattr(transporter, 'is_transport'):
# Ensure the transporter has a `last_toggle_time` attribute
if not hasattr(transporter, 'last_toggle_time'):
transporter.last_toggle_time = 0 # Initialize it to 0
# Get the current time
current_time = time.time()
# Check if the toggle is allowed (min_toggle_time_period seconds since the last toggle)
if current_time - transporter.last_toggle_time < min_toggle_time_period:
continue # Skip toggling if the cooldown hasn't passed
# Check the loading toggle argument and toggle the state
if transporter.loading_toggle_argument is None or not hasattr(transporter, 'draw_arguments') or len(transporter.draw_arguments) <= transporter.loading_toggle_argument:
pass
else:
if transporter.will_disembark:
if transporter.draw_arguments[transporter.loading_toggle_argument].value <= transporter.disembark_embark_argument_toggle_argument_threshold:
continue
elif transporter.draw_arguments[transporter.loading_toggle_argument].value > transporter.disembark_embark_argument_toggle_argument_threshold:
# Set to embark
await set_as_not_disembarking(transporter)
transporter.last_toggle_time = current_time # Update the last toggle time
logger.info(f"Transporter '{transporter.name}' set to embark.")
else: else:
for embarker in embarker_units: if transporter.draw_arguments[transporter.loading_toggle_argument].value <= transporter.disembark_embark_argument_toggle_argument_threshold:
if hasattr(embarker, 'in_embark_queue') and embarker.in_embark_queue: # Set to disembark
if embarker.in_embark_queue: await set_as_disembarking(transporter)
embarker_units.remove(embarker) transporter.last_toggle_time = current_time # Update the last toggle time
logger.info(f"Transporter '{transporter.name}' set to disembark.")
embarkers_sorted = sorted(embarker_units, key=lambda x: x[1]) elif transporter.draw_arguments[transporter.loading_toggle_argument].value > transporter.disembark_embark_argument_toggle_argument_threshold:
closest_embarkers = embarkers_sorted[:transporter.max_capacity-len(transporter.en_boarding_queue)] continue
for embarker, distance in closest_embarkers:
if embarker not in transporter.en_boarding_queue and distance < transporter.max_embark_range:
transporter.en_boarding_queue.append(embarker)
embarker.in_embark_queue = True
embarker.transport_unit = transporter
logger.info(f"Added embarker '{embarker.name}' to '{transporter.name}' s boarding queue.")
elif embarker in transporter.en_boarding_queue:
pass
else:
pass #we pass as the transport is full
except Exception as e:
logger.error(f"Error in check_for_transport_embarker_or_disembark: {e}")
############# #############
#API SECTION# #API SECTION#
@@ -585,6 +710,8 @@ def on_api_startup(api: API):
# Initialize the list to hold units to delete # Initialize the list to hold units to delete
units_to_delete = [] units_to_delete = []
generate_transport_units() #comment this if doing draw Args testing
#new_test_unit() # comment this if running normally, this is used only for getting draw args
def on_unit_alive_change(unit: Unit, value: bool): def on_unit_alive_change(unit: Unit, value: bool):
global units_to_delete global units_to_delete
@@ -600,26 +727,50 @@ def on_unit_alive_change(unit: Unit, value: bool):
else: else:
pass pass
async def update_data(): # unit_args = []
units = api.get_units() # exclusions_array = [1,102,103,11,12,17,278,279,280,281,282,283,284,286,287,288,289,290,337,37,39,393,399,4,40,41,42,448,487,488,6,77,99]
for unit in units.values(): # async def check_args_changed():
if unit.alive and hasattr(unit, 'is_transport'): # global unit_args
stringified_json = json.dumps(Transporter(unit).to_json()) # units = api.get_units()
unit.set_custom_string(stringified_json) # for unit in units.values():
elif unit.alive and hasattr(unit, 'is_embarker'): # for argument in unit.draw_arguments:
stringified_json = json.dumps(Embarker(unit).to_json()) # if argument in unit_args:
unit.set_custom_string(stringified_json) # pass
await asyncio.sleep(1) # else:
# if argument.argument in exclusions_array:
# pass
# else:
# print(argument.argument, end=",")
# unit_args = unit.draw_arguments
# print("New loop")
# def new_test_unit():
# units = api.get_units()
# for unit in units.values():
# if unit.alive and unit.name in transport_types and not hasattr(unit, 'is_transport'):
# for i in range(500): #191
# unit.register_draw_argument(i)
# def check_arg_value():
# units = api.get_units()
# for unit in units.values():
# if unit.alive and unit.name in transport_types:
# unit.register_draw_argument(47) #191
# print(f"{unit.draw_arguments[0].argument} value is {unit.draw_arguments[0].value}")
async def on_api_update(api: API): async def on_api_update(api: API):
generate_transport_units() asyncio.create_task(check_for_loadable_units())
check_for_loadable_units()
asyncio.create_task(load_loadable_units()) asyncio.create_task(load_loadable_units())
asyncio.create_task(check_for_unloadable_units()) asyncio.create_task(check_for_unloadable_units())
asyncio.create_task(update_data()) asyncio.create_task(check_for_transport_embarker_or_disembark())
generate_transport_units()
#asyncio.create_task(check_args_changed())
#check_arg_value()
if __name__ == "__main__": if __name__ == "__main__":
api = API() api = API()
api.register_on_update_callback(on_api_update)
api.register_on_startup_callback(on_api_startup) api.register_on_startup_callback(on_api_startup)
api.register_on_update_callback(on_api_update)
api.run() api.run()