2024-06-11 17:55:44 +02:00

464 lines
20 KiB
Python

import math
import requests
import pyautogui
import time
import os
import yaml
import json
import numpy
import datetime
from geopy import distance
from fastkml import kml
from shapely import wkt, Point
from PIL import Image
from concurrent import futures
from os import listdir
from os.path import isfile, isdir, join
# global counters
fut_counter = 0
tot_futs = 0
start_time = None
last_screenshot_position = None
# constants
C = 40075016.686 # meters, Earth equatorial circumference
R = C / (2 * math.pi) # meters, Earth equatorial radius
PUT_RETRIES = 10 # allowable number of retries for the PUT request
SLEEP_DISTANCE = 30 # distance in kms. If a screenshot is taken with a distance from the previous screenshot longer than this value, the algorithm waits 5s for DCS to load the textures
def deg_to_num(lat_deg, lon_deg, zoom):
lat_rad = math.radians(lat_deg)
n = 1 << zoom
xtile = int((lon_deg + 180.0) / 360.0 * n)
ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return xtile, ytile
def num_to_deg(xtile, ytile, zoom):
n = 1 << zoom
lon_deg = xtile / n * 360.0 - 180.0
lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n)))
lat_deg = math.degrees(lat_rad)
return lat_deg, lon_deg
def compute_mpps(lat, z):
return C * math.cos(math.radians(lat)) / math.pow(2, z + 8)
def print_progress_bar(iteration, total, start_time, prefix = '', suffix = '', decimals = 1, length = 40, fill = '', printEnd = "\r"):
now = datetime.datetime.now()
diff = (now - start_time).total_seconds()
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print(f'\r{prefix} |{bar}| {percent}% {suffix} {iteration / diff:.3f} ops/s', end = printEnd)
# Print New Line on Complete
if iteration == total:
print()
def done_callback(fut):
global fut_counter, tot_futs, start_time
fut_counter += 1
print_progress_bar(fut_counter, tot_futs, start_time)
def extract_tiles(n, screenshots_XY, params):
f = params['f']
zoom = params['zoom']
n_width = params['n_width']
n_height = params['n_height']
screenshots_folder = params['screenshots_folder']
tiles_folder = params['tiles_folder']
XY = screenshots_XY[n]
if (os.path.exists(os.path.join(screenshots_folder, f"{f}_{n}_{zoom}.jpg"))):
# Open the source screenshot
img = Image.open(os.path.join(screenshots_folder, f"{f}_{n}_{zoom}.jpg"))
# Compute the Web Mercator Projection position of the top left corner of the most centered tile
X_center, Y_center = XY[0], XY[1]
# Compute the position of the top left corner of the top left tile
start_x = img.width / 2 - n_width / 2 * 256
start_y = img.height / 2 - n_height / 2 * 256
# Iterate on the grid
for column in range(0, n_width):
for row in range(0, n_height):
# Crop the tile and compute its Web Mercator Projection position
box = (start_x + column * 256, start_y + row * 256, start_x + (column + 1) * 256, start_y + (row + 1) * 256)
X = X_center - math.floor(n_width / 2) + column
Y = Y_center - math.floor(n_height / 2) + row
# Save the tile
if not os.path.exists(os.path.join(tiles_folder, str(zoom), str(X))):
try:
os.mkdir(os.path.join(tiles_folder, str(zoom), str(X)))
except FileExistsError:
# Ignore this error, it means one other thread has already created the folder
pass
except Exception as e:
raise e
img.crop(box).convert('RGBA').save(os.path.join(tiles_folder, str(zoom), str(X), f"{Y}.png"))
n += 1
else:
raise Exception(f"{os.path.join(screenshots_folder, f'{f}_{n}_{zoom}.jpg')} missing")
def merge_tiles(base_path, zoom, tile):
X = tile[0]
Y = tile[1]
# If the image already exists, open it so we can paste the higher quality data in it
if os.path.exists(os.path.join(base_path, str(zoom - 1), str(X), f"{Y}.png")):
dst = Image.open(os.path.join(base_path, str(zoom - 1), str(X), f"{Y}.png")).convert('RGBA')
else:
dst = Image.new('RGBA', (256, 256), (0, 0, 0, 0))
# Loop on all the 4 subtiles in the tile
positions = [(0, 0), (0, 1), (1, 0), (1, 1)]
for i in range(0, 4):
# Open the subtile, if it exists, and resize it down to 128x128
if os.path.exists(os.path.join(base_path, str(zoom), str(2*X + positions[i][0]), f"{2*Y + positions[i][1]}.png")):
im = Image.open(os.path.join(base_path, str(zoom), str(2*X + positions[i][0]), f"{2*Y + positions[i][1]}.png")).convert('RGBA').resize((128, 128))
dst.paste(im, (positions[i][0] * 128, positions[i][1] * 128), im)
# Create the output folder if it exists
if not os.path.exists(os.path.join(base_path, str(zoom - 1), str(X))):
try:
os.mkdir(os.path.join(base_path, str(zoom - 1), str(X)))
except FileExistsError:
# Ignore this error, it means one other thread has already created the folder
pass
except Exception as e:
raise e
# Save the image
dst.save(os.path.join(base_path, str(zoom - 1), str(X), f"{Y}.png"))
def compress_tiles(base_path, zoom, tile, colors_number):
X = tile[0]
Y = tile[1]
path = os.path.join(base_path, str(zoom), str(X), f"{Y}.png")
initial_size = os.path.getsize(path)
im = Image.open(path)
im = im.quantize(colors_number)
im.save(path)
return initial_size, os.path.getsize(path)
def compute_longitude_error(delta_width, calib_ref_fn, calib_lng_fn, box1, box2):
calib_ref = Image.open(calib_ref_fn)
calib_lng = Image.open(calib_lng_fn)
calib_box1 = calib_ref.resize((calib_ref.width + delta_width, calib_ref.height)).crop(box1).convert('L')
calib_box2 = calib_lng.resize((calib_ref.width + delta_width, calib_ref.height)).crop(box2).convert('L')
return compute_difference(calib_box1, calib_box2)
def compute_latitude_error(delta_height, calib_ref_fn, calib_lat_fn, box3, box4):
calib_ref = Image.open(calib_ref_fn)
calib_lat = Image.open(calib_lat_fn)
calib_box3 = calib_ref.resize((calib_ref.width, calib_ref.height + delta_height)).crop(box3).convert('L')
calib_box4 = calib_lat.resize((calib_ref.width, calib_ref.height + delta_height)).crop(box4).convert('L')
return compute_difference(calib_box3, calib_box4)
def compute_correction_factor(XY, n_width, n_height, map_config, zoom, screenshots_folder, port):
# Take screenshots at the given position
take_screenshot(XY, 0, 0, map_config, zoom, screenshots_folder, "calib", "ref", port)
calib_ref = Image.open(os.path.join(screenshots_folder, f"calib_ref_{zoom}.jpg"))
# These calibration boxes are located at the edge of the interest region
box1 = (calib_ref.width / 2 + n_width / 2 * 256 - 50, calib_ref.height / 2 - n_height / 2 * 256 + 10,
calib_ref.width / 2 + n_width / 2 * 256 + 50, calib_ref.height / 2 + n_height / 2 * 256 - 10)
box2 = (calib_ref.width / 2 - n_width / 2 * 256 - 50, calib_ref.height / 2 - n_height / 2 * 256 + 10,
calib_ref.width / 2 - n_width / 2 * 256 + 50, calib_ref.height / 2 + n_height / 2 * 256 - 10)
box3 = (calib_ref.width / 2 - n_width / 2 * 256 + 10, calib_ref.height / 2 + n_height / 2 * 256 - 50,
calib_ref.width / 2 + n_width / 2 * 256 - 10, calib_ref.height / 2 + n_height / 2 * 256 + 50)
box4 = (calib_ref.width / 2 - n_width / 2 * 256 + 10, calib_ref.height / 2 - n_height / 2 * 256 - 50,
calib_ref.width / 2 + n_width / 2 * 256 - 10, calib_ref.height / 2 - n_height / 2 * 256 + 50)
# Check if there is enough variation at the calibration locations
if compute_variation(calib_ref.crop(box1).convert('L')) < 30 or \
compute_variation(calib_ref.crop(box3).convert('L')) < 30:
return None # Not enough variation
# Take screenshot east and south of it
take_screenshot((XY[0] + n_width, XY[1]), 0, 0, map_config, zoom, screenshots_folder, "calib", "lng", port)
take_screenshot((XY[0], XY[1] + n_height), 0, 0, map_config, zoom, screenshots_folder, "calib", "lat", port)
# Find the best correction factor to bring the two images to be equal on the longitude direction
with futures.ThreadPoolExecutor() as executor:
futs = [executor.submit(compute_longitude_error, delta_width, os.path.join(screenshots_folder, f"calib_ref_{zoom}.jpg"), os.path.join(screenshots_folder, f"calib_lng_{zoom}.jpg"), box1, box2) for delta_width in range(-15, 16)]
values = [fut.result() for fut in futures.as_completed(futs)]
best_delta_width = -15 + min(range(len(values)), key=values.__getitem__)
# Find the best correction factor to bring the two images to be equal on the latitude direction
with futures.ThreadPoolExecutor() as executor:
futs = [executor.submit(compute_latitude_error, delta_height, os.path.join(screenshots_folder, f"calib_ref_{zoom}.jpg"), os.path.join(screenshots_folder, f"calib_lat_{zoom}.jpg"), box3, box4) for delta_height in range(-15, 16)]
values = [fut.result() for fut in futures.as_completed(futs)]
best_delta_height = -15 + min(range(len(values)), key=values.__getitem__)
return (best_delta_width, best_delta_height)
def compute_difference(imageA, imageB):
err = numpy.sum((numpy.array(imageA).astype('float') - numpy.array(imageB).astype('float')) ** 2)
err /= float(imageA.width * imageA.height)
return err
def compute_variation(imageA):
min = numpy.min((numpy.array(imageA)))
max = numpy.max((numpy.array(imageA)))
return max - min
def take_screenshot(XY, n_width, n_height, map_config, zoom, screenshots_folder, f, n, port, correction = (0, 0)):
global last_screenshot_position
# Making POST request
# If the number of rows or columns is odd, we need to take the picture at the CENTER of the tile!
lat, lng = num_to_deg(XY[0] + (n_width % 2) / 2, XY[1] + (n_height % 2) / 2, zoom)
data = json.dumps({'lat': lat, 'lng': lng, 'alt': 1350 + map_config['zoom_factor'] * (25000 - 1350), 'mode': 'map'})
# Try to send the PUT request, up to PUT_RETRIES
retries = PUT_RETRIES
success = False
while not success and retries > 0:
try:
r = requests.post(f'http://127.0.0.1:{port}', data = data)
success = True
except:
retries -= 1
time.sleep(0.5) # Wait for any error to clear
if success == False:
raise Exception(f"Could not fulfill PUT request after {PUT_RETRIES} retries")
geo_data = json.loads(r.text)
if last_screenshot_position is None or distance.geodesic(last_screenshot_position, (lat, lng)).km > SLEEP_DISTANCE:
time.sleep(map_config['wait_period'])
else:
time.sleep(0.2)
last_screenshot_position = (lat, lng)
# Take and save screenshot. The response to the put request contains data, among which there is the north rotation at that point.
screenshot = pyautogui.screenshot()
# Scale the screenshot to account for Mercator Map Deformation
lat1, lng1 = num_to_deg(XY[0], XY[1], zoom)
lat2, lng2 = num_to_deg(XY[0] + 1, XY[1] + 1, zoom)
deltaLat = abs(lat2 - lat1)
deltaLng = abs(lng2 - lng1)
# Compute the height and width each tile should have
m_height = math.radians(deltaLat) * R
m_width = math.radians(deltaLng) * R * math.cos(math.radians(lat1))
# Compute the height and width the tile has
s_height = map_config['mpps'] * 256
s_width = map_config['mpps'] * 256
# Compute the scaling required to achieve that
sx = s_width / m_width
sy = s_height / m_height
# Rotate, resize and save the screenshot
screenshot.rotate(math.degrees(geo_data['northRotation'])).resize((int(sx * screenshot.width) + correction[0], int(sy * screenshot.height)+ correction[1] )).save(os.path.join(screenshots_folder, f"{f}_{n}_{zoom}.jpg"), quality=98)
def run(map_config, port):
global tot_futs, fut_counter, start_time
print("Script start time: ", datetime.datetime.now())
with open('configs/screen_properties.yml', 'r') as sp:
screen_config = yaml.safe_load(sp)
# Create output folders
if not os.path.exists(map_config['tiles_folder']):
os.makedirs(map_config['tiles_folder'])
if not os.path.exists(os.path.join(map_config['screenshots_folder'])):
skip_screenshots = False
replace_screenshots = True
os.makedirs(os.path.join(map_config['screenshots_folder']))
else:
skip_screenshots = map_config['skip_screenshots']
replace_screenshots = map_config['replace_screenshots']
# Compute the optimal zoom level
usable_width = screen_config['width'] - 400 # Keep a margin around the center
usable_height = screen_config['height'] - 400 # Keep a margin around the center
with open(map_config['boundary_file'], 'rt', encoding="utf-8") as bp:
# Read the config file
doc = bp.read()
k = kml.KML()
k.from_string(doc)
# Extract the features
features = []
for feature in k.features():
for sub_feature in list(feature.features()):
features.append(sub_feature)
# Iterate over all the closed features in the kml file
f = 1
for feature in features:
########### Take screenshots
geo = feature.geometry
# Define the boundary rect around the area
start_lat = geo.bounds[3]
start_lng = geo.bounds[0]
end_lat = geo.bounds[1]
end_lng = geo.bounds[2]
# Find the zoom level that better approximates the provided resolution
mpps_delta = [abs(compute_mpps((start_lat + end_lat) / 2, z) - map_config['mpps']) for z in range(0, 21)]
zoom = mpps_delta.index(min(mpps_delta))
print(f"Feature {f} of {len(features)}, using zoom level {zoom}")
# Find the maximum dimension of the tiles at the given resolution
mpps = compute_mpps(end_lat, zoom)
d = 256 * mpps / map_config['mpps']
n_height = math.floor(usable_height / d)
n_width = math.floor(usable_width / d)
print(f"Feature {f} of {len(features)}, each screenshot will provide {n_height} tiles in height and {n_width} tiles in width")
# Find the starting and ending points
start_X, start_Y = deg_to_num(start_lat, start_lng, zoom)
end_X, end_Y = deg_to_num(end_lat, end_lng, zoom)
print(f"Feature {f} of {len(features)}, finding screenshot locations, this may take a while...")
# Find all the X, Y coordinates inside of the provided area
screenshots_XY = []
for X in range(start_X, end_X, n_width):
for Y in range(start_Y, end_Y, n_height):
lat, lng = num_to_deg(X, Y, zoom)
p = Point(lng, lat)
if p.within(wkt.loads(geo.wkt)):
screenshots_XY.append((X, Y))
########### Take screenshots
if not map_config["extraction_only"] and not map_config["merging_only"] and not map_config["compression_only"]:
# Start looping
correction = None
if not skip_screenshots:
start_time = datetime.datetime.now()
print("Screenshots taking starting at: ", start_time)
print(f"Feature {f} of {len(features)}, {len(screenshots_XY)} screenshots will be taken")
n = 0
for XY in screenshots_XY:
if not os.path.exists(os.path.join(map_config['screenshots_folder'], f"{f}_{n}_{zoom}.jpg")) or replace_screenshots:
if n % 10 == 0 or correction is None:
new_correction = compute_correction_factor(XY, n_width, n_height, map_config, zoom, map_config['screenshots_folder'], port)
if new_correction is not None:
correction = new_correction
take_screenshot(XY, n_width, n_height, map_config, zoom, map_config['screenshots_folder'], f, n, port, correction if correction is not None else (0, 0))
print_progress_bar(n + 1, len(screenshots_XY), start_time)
n += 1
print(f"Taken {n} screenshots in {datetime.datetime.now() - start_time}s")
########### Extract the tiles
if not map_config["screenshots_only"] and not map_config["merging_only"] and not map_config["compression_only"]:
start_time = datetime.datetime.now()
res = []
print("Tiles extraction starting at: ", start_time)
if not os.path.exists(os.path.join(map_config["tiles_folder"], str(zoom))):
os.mkdir(os.path.join(map_config["tiles_folder"], str(zoom)))
params = {
"f": f,
"zoom": zoom,
"n_width": n_width,
"n_height": n_height,
"screenshots_folder": map_config['screenshots_folder'],
"tiles_folder": map_config['tiles_folder']
}
# Extract the tiles with parallel thread execution
with futures.ThreadPoolExecutor() as executor:
print(f"Feature {f} of {len(features)}, extracting tiles...")
futs = [executor.submit(extract_tiles, n, screenshots_XY, params) for n in range(0, len(screenshots_XY))]
tot_futs = len(futs)
fut_counter = 0
[fut.add_done_callback(done_callback) for fut in futs]
res.extend([fut.result() for fut in futures.as_completed(futs)])
print(f"Extracted {len(res) * n_width * n_height} images in {datetime.datetime.now() - start_time}s")
# Increase the feature counter
print(f"Feature {f} of {len(features)} completed!")
f += 1
########### Assemble tiles to get lower zoom levels
if not map_config["screenshots_only"] and not map_config["extraction_only"] and not map_config["compression_only"]:
start_time = datetime.datetime.now()
res = []
print("Tiles merging start time: ", start_time)
for current_zoom in range(zoom, map_config["final_level"], -1):
Xs = [int(d) for d in listdir(os.path.join(map_config["tiles_folder"], str(current_zoom))) if isdir(join(map_config["tiles_folder"], str(current_zoom), d))]
existing_tiles = []
print("Finding tiles to merge...")
for X in Xs:
Ys = [int(f.removesuffix(".png")) for f in listdir(os.path.join(map_config["tiles_folder"], str(current_zoom), str(X))) if isfile(join(map_config["tiles_folder"], str(current_zoom), str(X), f))]
for Y in Ys:
existing_tiles.append((X, Y))
tiles_to_produce = []
for tile in existing_tiles:
if (int(tile[0] / 2), int(tile[1] / 2)) not in tiles_to_produce:
tiles_to_produce.append((int(tile[0] / 2), int(tile[1] / 2)))
# Merge the tiles with parallel thread execution
with futures.ThreadPoolExecutor() as executor:
print(f"Merging tiles for zoom level {current_zoom - 1}...")
if not os.path.exists(os.path.join(map_config["tiles_folder"], str(current_zoom - 1))):
os.mkdir(os.path.join(map_config["tiles_folder"], str(current_zoom - 1)))
futs = [executor.submit(merge_tiles, os.path.join(map_config["tiles_folder"]), current_zoom, tile) for tile in tiles_to_produce]
tot_futs = len(futs)
fut_counter = 0
[fut.add_done_callback(done_callback) for fut in futs]
res.extend([fut.result() for fut in futures.as_completed(futs)])
print(f"Merged {len(res)} images in {datetime.datetime.now() - start_time}s")
########### Compress tiles to reduce storage and download times
if not map_config["screenshots_only"] and not map_config["extraction_only"] and not map_config["merging_only"]:
start_time = datetime.datetime.now()
res = []
print("Tiles compression start time: ", start_time)
for current_zoom in range(zoom, map_config["final_level"] - 1, -1):
Xs = [int(d) for d in listdir(os.path.join(map_config["tiles_folder"], str(current_zoom))) if isdir(join(map_config["tiles_folder"], str(current_zoom), d))]
existing_tiles = []
for X in Xs:
Ys = [int(f.removesuffix(".png")) for f in listdir(os.path.join(map_config["tiles_folder"], str(current_zoom), str(X))) if isfile(join(map_config["tiles_folder"], str(current_zoom), str(X), f))]
for Y in Ys:
existing_tiles.append((X, Y))
# Compress the tiles with parallel thread execution
with futures.ThreadPoolExecutor() as executor:
print(f"Compressing tiles for zoom level {current_zoom }...")
futs = [executor.submit(compress_tiles, os.path.join(map_config["tiles_folder"]), current_zoom, tile, map_config['colors_number']) for tile in existing_tiles]
tot_futs = len(futs)
fut_counter = 0
[fut.add_done_callback(done_callback) for fut in futs]
res.extend([fut.result() for fut in futures.as_completed(futs)])
total_initial_size = numpy.sum([r[0] for r in res]) / 1024 / 1024
total_final_size = numpy.sum([r[1] for r in res]) / 1024 / 1024
print(f"Compressed {len(res)} images in {datetime.datetime.now() - start_time}, initial size {total_initial_size:.3f}MB, final size {total_final_size:.3f}MB, compression ratio {(1 - total_final_size / total_initial_size )* 100:.3f}%")
print("Script end time: ", datetime.datetime.now())