Added argpars for better control of script execution

This commit is contained in:
Davide Passoni 2024-03-08 16:04:55 +01:00
parent 0c8de2dcf3
commit a2c8563adf
3 changed files with 110 additions and 85 deletions

View File

@ -10,7 +10,7 @@
"request": "launch",
"program": "main.py",
"console": "integratedTerminal",
"args": ["./configs/Test/MediumResolution.yml"]
"args": ["-s", "-l", "1", "./configs/Test/MediumResolution.yml"]
},
{
"name": "Convert",

View File

@ -2,81 +2,93 @@ import sys
import yaml
import json
import requests
import argparse
from pyproj import Geod
from fastkml import kml
from shapely import wkt
from datetime import timedelta
import map_generator
parser = argparse.ArgumentParser(
prog='DCS Olympus map generator',
description='This script allows to automatically generate maps from DCS World',
epilog='Hit the DCS Olympus Discord for more information')
parser.add_argument('config', help='map configuration yaml file')
parser.add_argument('-s', '--skip_screenshots', action='store_true', help='if screenshots are already present, this flag will cause the script to completely skip the screenshot loop')
parser.add_argument('-r', '--replace_screenshots', action='store_true', help='if screenshots are already present, this flag will cause the script to replace all screenshots, even those that already exist. Has no effect if -s or --skip_screenshots is present')
parser.add_argument('-l', '--final_level', type=int, default=1, help='if tiles are already present for the zoom level that the script will output, this number will instruct up to which zoom level tile merging will be run. Defaults to 1.')
args = parser.parse_args()
# Port on which the camera control module is listening
port = 3003
if len(sys.argv) == 1:
print("Please provide a configuration file as first argument. You can also drop the configuration file on this script to run it.")
else:
config_file = sys.argv[1]
print(f"Using config file: {config_file}")
with open('configs/screen_properties.yml', 'r') as sp:
with open(config_file, 'r') as cp:
screen_config = yaml.safe_load(sp)
map_config = yaml.safe_load(cp)
config_file = args.config
if config_file is None:
raise Exception("No configuration file provided as input. Please run script with -h argument for more info")
print("Screen parameters:")
print(f"-> Screen width: {screen_config['width']}px")
print(f"-> Screen height: {screen_config['height']}px")
print(f"Using config file: {config_file}")
with open('configs/screen_properties.yml', 'r') as sp:
with open(config_file, 'r') as cp:
screen_config = yaml.safe_load(sp)
map_config = yaml.safe_load(cp)
map_config.update(vars(args))
print("Screen parameters:")
print(f"-> Screen width: {screen_config['width']}px")
print(f"-> Screen height: {screen_config['height']}px")
print("Map parameters:")
print(f"-> Output directory: {map_config['output_directory']}")
print(f"-> Boundary file: {map_config['boundary_file']}")
print(f"-> Zoom factor: {map_config['zoom_factor']}")
if 'geo_width' in map_config:
print(f"-> Geo width: {map_config['geo_width']}NM")
with open(map_config['boundary_file'], 'rt', encoding="utf-8") as bp:
# Read the config file and compute the total area of the covered map
doc = bp.read()
k = kml.KML()
k.from_string(doc)
geod = Geod(ellps="WGS84")
features = []
area = 0
for feature in k.features():
for sub_feature in list(feature.features()):
geo = sub_feature.geometry
area += abs(geod.geometry_area_perimeter(wkt.loads(geo.wkt))[0])
features.append(sub_feature)
print(f"Found {len(features)} features in the provided kml file")
if 'geo_width' not in map_config:
# Let the user input the size of the screen to compute resolution
data = json.dumps({'lat': features[0].geometry.bounds[1], 'lng': features[0].geometry.bounds[0], 'alt': 1350 + map_config['zoom_factor'] * (25000 - 1350), 'mode': 'map'})
try:
r = requests.put(f'http://127.0.0.1:{port}', data = data)
print("The F10 map in your DCS installation was setup. Please, use the measure tool and measure the width of the screen in Nautical Miles")
except:
print("No running DCS instance detected. You can still run the algorithm if you already took the screenshots, otherwise you will not be able to produce a map.")
map_config['geo_width'] = input("Insert the width of the screen in Nautical Miles: ")
print("Map parameters:")
print(f"-> Output directory: {map_config['output_directory']}")
print(f"-> Boundary file: {map_config['boundary_file']}")
print(f"-> Zoom factor: {map_config['zoom_factor']}")
map_config['mpps'] = float(map_config['geo_width']) * 1852 / screen_config['width']
if 'geo_width' in map_config:
print(f"-> Geo width: {map_config['geo_width']}NM")
with open(map_config['boundary_file'], 'rt', encoding="utf-8") as bp:
# Read the config file and compute the total area of the covered map
doc = bp.read()
k = kml.KML()
k.from_string(doc)
geod = Geod(ellps="WGS84")
features = []
area = 0
for feature in k.features():
for sub_feature in list(feature.features()):
geo = sub_feature.geometry
area += abs(geod.geometry_area_perimeter(wkt.loads(geo.wkt))[0])
features.append(sub_feature)
print(f"Found {len(features)} features in the provided kml file")
if 'geo_width' not in map_config:
# Let the user input the size of the screen to compute resolution
data = json.dumps({'lat': features[0].geometry.bounds[1], 'lng': features[0].geometry.bounds[0], 'alt': 1350 + map_config['zoom_factor'] * (25000 - 1350), 'mode': 'map'})
try:
r = requests.put(f'http://127.0.0.1:{port}', data = data)
print("The F10 map in your DCS installation was setup. Please, use the measure tool and measure the width of the screen in Nautical Miles")
except:
print("No running DCS instance detected. You can still run the algorithm if you already took the screenshots, otherwise you will not be able to produce a map.")
map_config['geo_width'] = input("Insert the width of the screen in Nautical Miles: ")
map_config['mpps'] = float(map_config['geo_width']) * 1852 / screen_config['width']
tile_size = 256 * map_config['mpps'] # meters
tiles_per_screenshot = int(screen_config['width'] / 256) * int(screen_config['height'] / 256)
tiles_num = int(area / (tile_size * tile_size))
screenshots_num = int(tiles_num / tiles_per_screenshot)
total_time = int(screenshots_num / 1.0)
print(f"Total area: {int(area / 1e6)} square kilometers")
print(f"Estimated number of tiles: {tiles_num}")
print(f"Estimated number of screenshots: {screenshots_num}")
input("Press enter to continue...")
map_generator.run(map_config, port)
tile_size = 256 * map_config['mpps'] # meters
tiles_per_screenshot = int(screen_config['width'] / 256) * int(screen_config['height'] / 256)
tiles_num = int(area / (tile_size * tile_size))
screenshots_num = int(tiles_num / tiles_per_screenshot)
total_time = int(screenshots_num / 1.0)
print(f"Total area: {int(area / 1e6)} square kilometers")
print(f"Estimated number of tiles: {tiles_num}")
print(f"Estimated number of screenshots: {screenshots_num}")
map_generator.run(map_config, port)

View File

@ -41,7 +41,7 @@ def num_to_deg(xtile, ytile, zoom):
def compute_mpps(lat, z):
return C * math.cos(math.radians(lat)) / math.pow(2, z + 8)
def printProgressBar(iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '', printEnd = "\r"):
def print_progress_bar(iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '', printEnd = "\r"):
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
@ -53,7 +53,7 @@ def printProgressBar(iteration, total, prefix = '', suffix = '', decimals = 1, l
def done_callback(fut):
global fut_counter, tot_futs
fut_counter += 1
printProgressBar(fut_counter, tot_futs)
print_progress_bar(fut_counter, tot_futs)
def extract_tiles(n, screenshots_XY, params):
f = params['f']
@ -128,15 +128,10 @@ def merge_tiles(base_path, zoom, tile):
# Save the image
dst.save(os.path.join(base_path, str(zoom - 1), str(X), f"{Y}.png"), quality=98)
def computeCorrectionFactor(XY, n_width, n_height, map_config, zoom, output_directory, port):
# Take screenshots at the given position, then east and south of it
takeScreenshot(XY, 0, 0, map_config, zoom, output_directory, "calib", "ref", port)
takeScreenshot((XY[0] + n_width, XY[1]), 0, 0, map_config, zoom, output_directory, "calib", "lng", port)
takeScreenshot((XY[0], XY[1] + n_height), 0, 0, map_config, zoom, output_directory, "calib", "lat", port)
def compute_correction_factor(XY, n_width, n_height, map_config, zoom, output_directory, port):
# Take screenshots at the given position
take_screenshot(XY, 0, 0, map_config, zoom, output_directory, "calib", "ref", port)
calib_ref = Image.open(os.path.join(output_directory, "screenshots", f"calib_ref_{zoom}.jpg"))
calib_lat = Image.open(os.path.join(output_directory, "screenshots", f"calib_lat_{zoom}.jpg"))
calib_lng = Image.open(os.path.join(output_directory, "screenshots", f"calib_lng_{zoom}.jpg"))
# These calibration boxes are located at the edge of the interest region
box1 = (calib_ref.width / 2 + n_width / 2 * 256 - 50, calib_ref.height / 2 - n_height / 2 * 256 + 10,
@ -149,13 +144,24 @@ def computeCorrectionFactor(XY, n_width, n_height, map_config, zoom, output_dire
box4 = (calib_ref.width / 2 - n_width / 2 * 256 + 10, calib_ref.height / 2 - n_height / 2 * 256 - 50,
calib_ref.width / 2 + n_width / 2 * 256 - 10, calib_ref.height / 2 - n_height / 2 * 256 + 50)
# Check if there is enough variation at the calibration locations
if compute_variation(calib_ref.crop(box1).convert('L')) < 30 or \
compute_variation(calib_ref.crop(box3).convert('L')) < 30:
return None # Not enough variation
# Take screenshot east and south of it
take_screenshot((XY[0] + n_width, XY[1]), 0, 0, map_config, zoom, output_directory, "calib", "lng", port)
take_screenshot((XY[0], XY[1] + n_height), 0, 0, map_config, zoom, output_directory, "calib", "lat", port)
calib_lat = Image.open(os.path.join(output_directory, "screenshots", f"calib_lat_{zoom}.jpg"))
calib_lng = Image.open(os.path.join(output_directory, "screenshots", f"calib_lng_{zoom}.jpg"))
# Find the best correction factor to bring the two images to be equal on the longitude direction
best_err = None
best_delta_width = 0
for delta_width in range(-5, 6):
calib_box1 = calib_ref.resize((calib_ref.width + delta_width, calib_ref.height)).crop(box1).convert('L')
calib_box2 = calib_lng.resize((calib_ref.width + delta_width, calib_ref.height)).crop(box2).convert('L')
err = computeDifference(calib_box1, calib_box2)
err = compute_difference(calib_box1, calib_box2)
if best_err is None or err < best_err:
best_delta_width = delta_width
best_err = err
@ -166,19 +172,24 @@ def computeCorrectionFactor(XY, n_width, n_height, map_config, zoom, output_dire
for delta_height in range(-5, 6):
calib_box3 = calib_ref.resize((calib_ref.width, calib_ref.height + delta_height)).crop(box3).convert('L')
calib_box4 = calib_lat.resize((calib_ref.width, calib_ref.height + delta_height)).crop(box4).convert('L')
err = computeDifference(calib_box3, calib_box4)
err = compute_difference(calib_box3, calib_box4)
if best_err is None or err < best_err:
best_delta_height = delta_height
best_err = err
return (best_delta_width, best_delta_height)
def computeDifference(imageA, imageB):
def compute_difference(imageA, imageB):
err = numpy.sum((numpy.array(imageA).astype('float') - numpy.array(imageB).astype('float')) ** 2)
err /= float(imageA.width * imageA.height)
return err
def takeScreenshot(XY, n_width, n_height, map_config, zoom, output_directory, f, n, port, correction = (0, 0)):
def compute_variation(imageA):
min = numpy.min((numpy.array(imageA)))
max = numpy.max((numpy.array(imageA)))
return max - min
def take_screenshot(XY, n_width, n_height, map_config, zoom, output_directory, f, n, port, correction = (0, 0)):
# Making PUT request
# If the number of rows or columns is odd, we need to take the picture at the CENTER of the tile!
lat, lng = num_to_deg(XY[0] + (n_width % 2) / 2, XY[1] + (n_height % 2) / 2, zoom)
@ -239,13 +250,13 @@ def run(map_config, port):
if not os.path.exists(output_directory):
os.mkdir(output_directory)
skip_screenshots = False
replace_screenshots = True
if not os.path.exists(os.path.join(output_directory, "screenshots")):
skip_screenshots = False
replace_screenshots = True
os.mkdir(os.path.join(output_directory, "screenshots"))
else:
skip_screenshots = (input("Raw screenshots already found for this config, do you want to skip directly to tiles extraction? Enter y to skip: ") == "y")
replace_screenshots = (input("Do you want to replace the existing screenshots? Enter y to replace: ") == "y")
skip_screenshots = map_config['skip_screenshots']
replace_screenshots = map_config['replace_screenshots']
if not os.path.exists(os.path.join(output_directory, "tiles")):
os.mkdir(os.path.join(output_directory, "tiles"))
@ -322,10 +333,12 @@ def run(map_config, port):
for XY in screenshots_XY:
if not os.path.exists(os.path.join(output_directory, "screenshots", f"{f}_{n}_{zoom}.jpg")) or replace_screenshots:
if n % 10 == 0 or correction is None:
correction = computeCorrectionFactor(XY, n_width, n_height, map_config, zoom, output_directory, port)
takeScreenshot(XY, n_width, n_height, map_config, zoom, output_directory, f, n, port, correction)
new_correction = compute_correction_factor(XY, n_width, n_height, map_config, zoom, output_directory, port)
if new_correction is not None:
correction = new_correction
take_screenshot(XY, n_width, n_height, map_config, zoom, output_directory, f, n, port, correction if correction is not None else (0, 0))
printProgressBar(n + 1, len(screenshots_XY))
print_progress_bar(n + 1, len(screenshots_XY))
n += 1
########### Extract the tiles
@ -355,7 +368,7 @@ def run(map_config, port):
f += 1
if zoom <= final_level:
final_level = int(input(f"Zoom level already exists. Starting from level {zoom}, please enter desired final zoom level: "))
final_level = map_config['final_level']
########### Assemble tiles to get lower zoom levels
print("Tiles merging start time: ", datetime.datetime.now())