Added more configurability to map generator

This commit is contained in:
Pax1601
2024-03-10 13:43:09 +01:00
parent 5cc42dd9cf
commit 8c7f6abb1c
2 changed files with 50 additions and 44 deletions

View File

@@ -58,14 +58,15 @@ def done_callback(fut):
def extract_tiles(n, screenshots_XY, params):
f = params['f']
zoom = params['zoom']
output_directory = params['output_directory']
n_width = params['n_width']
n_height = params['n_height']
screenshots_folder = params['screenshots_folder']
tiles_folder = params['tiles_folder']
XY = screenshots_XY[n]
if (os.path.exists(os.path.join(output_directory, "screenshots", f"{f}_{n}_{zoom}.jpg"))):
if (os.path.exists(os.path.join(screenshots_folder, f"{f}_{n}_{zoom}.jpg"))):
# Open the source screenshot
img = Image.open(os.path.join(output_directory, "screenshots", f"{f}_{n}_{zoom}.jpg"))
img = Image.open(os.path.join(screenshots_folder, f"{f}_{n}_{zoom}.jpg"))
# Compute the Web Mercator Projection position of the top left corner of the most centered tile
X_center, Y_center = XY[0], XY[1]
@@ -83,19 +84,19 @@ def extract_tiles(n, screenshots_XY, params):
Y = Y_center - math.floor(n_height / 2) + row
# Save the tile
if not os.path.exists(os.path.join(output_directory, "tiles", str(zoom), str(X))):
if not os.path.exists(os.path.join(tiles_folder, str(zoom), str(X))):
try:
os.mkdir(os.path.join(output_directory, "tiles", str(zoom), str(X)))
os.mkdir(os.path.join(tiles_folder, str(zoom), str(X)))
except FileExistsError:
# Ignore this error, it means one other thread has already created the folder
pass
except Exception as e:
raise e
img.crop(box).convert('RGBA').save(os.path.join(output_directory, "tiles", str(zoom), str(X), f"{Y}.png"))
img.crop(box).convert('RGBA').save(os.path.join(tiles_folder, str(zoom), str(X), f"{Y}.png"))
n += 1
else:
raise Exception(f"{os.path.join(output_directory, 'screenshots', f'{f}_{n}_{zoom}.jpg')} missing")
raise Exception(f"{os.path.join(screenshots_folder, f'{f}_{n}_{zoom}.jpg')} missing")
def merge_tiles(base_path, zoom, tile):
X = tile[0]
@@ -128,10 +129,10 @@ def merge_tiles(base_path, zoom, tile):
# Save the image
dst.save(os.path.join(base_path, str(zoom - 1), str(X), f"{Y}.png"), quality=98)
def compute_correction_factor(XY, n_width, n_height, map_config, zoom, output_directory, port):
def compute_correction_factor(XY, n_width, n_height, map_config, zoom, screenshots_folder, port):
# Take screenshots at the given position
take_screenshot(XY, 0, 0, map_config, zoom, output_directory, "calib", "ref", port)
calib_ref = Image.open(os.path.join(output_directory, "screenshots", f"calib_ref_{zoom}.jpg"))
take_screenshot(XY, 0, 0, map_config, zoom, screenshots_folder, "calib", "ref", port)
calib_ref = Image.open(os.path.join(screenshots_folder, f"calib_ref_{zoom}.jpg"))
# These calibration boxes are located at the edge of the interest region
box1 = (calib_ref.width / 2 + n_width / 2 * 256 - 50, calib_ref.height / 2 - n_height / 2 * 256 + 10,
@@ -150,10 +151,10 @@ def compute_correction_factor(XY, n_width, n_height, map_config, zoom, output_di
return None # Not enough variation
# Take screenshot east and south of it
take_screenshot((XY[0] + n_width, XY[1]), 0, 0, map_config, zoom, output_directory, "calib", "lng", port)
take_screenshot((XY[0], XY[1] + n_height), 0, 0, map_config, zoom, output_directory, "calib", "lat", port)
calib_lat = Image.open(os.path.join(output_directory, "screenshots", f"calib_lat_{zoom}.jpg"))
calib_lng = Image.open(os.path.join(output_directory, "screenshots", f"calib_lng_{zoom}.jpg"))
take_screenshot((XY[0] + n_width, XY[1]), 0, 0, map_config, zoom, screenshots_folder, "calib", "lng", port)
take_screenshot((XY[0], XY[1] + n_height), 0, 0, map_config, zoom, screenshots_folder, "calib", "lat", port)
calib_lat = Image.open(os.path.join(screenshots_folder, f"calib_lat_{zoom}.jpg"))
calib_lng = Image.open(os.path.join(screenshots_folder, f"calib_lng_{zoom}.jpg"))
# Find the best correction factor to bring the two images to be equal on the longitude direction
best_err = None
@@ -189,7 +190,7 @@ def compute_variation(imageA):
max = numpy.max((numpy.array(imageA)))
return max - min
def take_screenshot(XY, n_width, n_height, map_config, zoom, output_directory, f, n, port, correction = (0, 0)):
def take_screenshot(XY, n_width, n_height, map_config, zoom, screenshots_folder, f, n, port, correction = (0, 0)):
# Making PUT request
# If the number of rows or columns is odd, we need to take the picture at the CENTER of the tile!
lat, lng = num_to_deg(XY[0] + (n_width % 2) / 2, XY[1] + (n_height % 2) / 2, zoom)
@@ -236,7 +237,7 @@ def take_screenshot(XY, n_width, n_height, map_config, zoom, output_directory, f
sy = s_height / m_height
# Rotate, resize and save the screenshot
screenshot.rotate(math.degrees(geo_data['northRotation'])).resize((int(sx * screenshot.width) + correction[0], int(sy * screenshot.height)+ correction[1] )).save(os.path.join(output_directory, "screenshots", f"{f}_{n}_{zoom}.jpg"), quality=98)
screenshot.rotate(math.degrees(geo_data['northRotation'])).resize((int(sx * screenshot.width) + correction[0], int(sy * screenshot.height)+ correction[1] )).save(os.path.join(screenshots_folder, f"{f}_{n}_{zoom}.jpg"), quality=98)
def run(map_config, port):
global tot_futs, fut_counter
@@ -246,26 +247,22 @@ def run(map_config, port):
screen_config = yaml.safe_load(sp)
# Create output folders
output_directory = map_config['output_directory']
if not os.path.exists(output_directory):
os.mkdir(output_directory)
if not os.path.exists(map_config['tiles_folder']):
os.makedirs(map_config['tiles_folder'])
if not os.path.exists(os.path.join(output_directory, "screenshots")):
if not os.path.exists(os.path.join(map_config['screenshots_folder'])):
skip_screenshots = False
replace_screenshots = True
os.mkdir(os.path.join(output_directory, "screenshots"))
os.makedirs(os.path.join(map_config['screenshots_folder']))
else:
skip_screenshots = map_config['skip_screenshots']
replace_screenshots = map_config['replace_screenshots']
if not os.path.exists(os.path.join(output_directory, "tiles")):
os.mkdir(os.path.join(output_directory, "tiles"))
# Compute the optimal zoom level
usable_width = screen_config['width'] - 400 # Keep a margin around the center
usable_height = screen_config['height'] - 400 # Keep a margin around the center
existing_zoom_levels = [int(f) for f in listdir(os.path.join(output_directory, "tiles")) if isdir(join(output_directory, "tiles", f))]
existing_zoom_levels = [int(f) for f in listdir(os.path.join(map_config["tiles_folder"])) if isdir(join(map_config["tiles_folder"], f))]
if len(existing_zoom_levels) == 0:
final_level = 1
else:
@@ -331,27 +328,31 @@ def run(map_config, port):
print(f"Feature {f} of {len(features)}, taking screenshots...")
n = 0
for XY in screenshots_XY:
if not os.path.exists(os.path.join(output_directory, "screenshots", f"{f}_{n}_{zoom}.jpg")) or replace_screenshots:
if not os.path.exists(os.path.join(map_config['screenshots_folder'], f"{f}_{n}_{zoom}.jpg")) or replace_screenshots:
if n % 10 == 0 or correction is None:
new_correction = compute_correction_factor(XY, n_width, n_height, map_config, zoom, output_directory, port)
new_correction = compute_correction_factor(XY, n_width, n_height, map_config, zoom, map_config['screenshots_folder'], port)
if new_correction is not None:
correction = new_correction
take_screenshot(XY, n_width, n_height, map_config, zoom, output_directory, f, n, port, correction if correction is not None else (0, 0))
take_screenshot(XY, n_width, n_height, map_config, zoom, map_config['screenshots_folder'], f, n, port, correction if correction is not None else (0, 0))
print_progress_bar(n + 1, len(screenshots_XY))
n += 1
if map_config["screenshots_only"]:
return
########### Extract the tiles
print("Tiles extraction starting at: ", datetime.datetime.now())
if not os.path.exists(os.path.join(output_directory, "tiles", str(zoom))):
os.mkdir(os.path.join(output_directory, "tiles", str(zoom)))
if not os.path.exists(os.path.join(map_config["tiles_folder"], str(zoom))):
os.mkdir(os.path.join(map_config["tiles_folder"], str(zoom)))
params = {
"f": f,
"zoom": zoom,
"output_directory": output_directory,
"n_width": n_width,
"n_height": n_height,
"screenshots_folder": map_config['screenshots_folder'],
"tiles_folder": map_config['tiles_folder']
}
# Extract the tiles with parallel thread execution
@@ -373,10 +374,10 @@ def run(map_config, port):
########### Assemble tiles to get lower zoom levels
print("Tiles merging start time: ", datetime.datetime.now())
for current_zoom in range(zoom, final_level, -1):
Xs = [int(d) for d in listdir(os.path.join(output_directory, "tiles", str(current_zoom))) if isdir(join(output_directory, "tiles", str(current_zoom), d))]
Xs = [int(d) for d in listdir(os.path.join(map_config["tiles_folder"], str(current_zoom))) if isdir(join(map_config["tiles_folder"], str(current_zoom), d))]
existing_tiles = []
for X in Xs:
Ys = [int(f.removesuffix(".png")) for f in listdir(os.path.join(output_directory, "tiles", str(current_zoom), str(X))) if isfile(join(output_directory, "tiles", str(current_zoom), str(X), f))]
Ys = [int(f.removesuffix(".png")) for f in listdir(os.path.join(map_config["tiles_folder"], str(current_zoom), str(X))) if isfile(join(map_config["tiles_folder"], str(current_zoom), str(X), f))]
for Y in Ys:
existing_tiles.append((X, Y))
@@ -389,10 +390,10 @@ def run(map_config, port):
with futures.ThreadPoolExecutor() as executor:
print(f"Merging tiles for zoom level {current_zoom - 1}...")
if not os.path.exists(os.path.join(output_directory, "tiles", str(current_zoom - 1))):
os.mkdir(os.path.join(output_directory, "tiles", str(current_zoom - 1)))
if not os.path.exists(os.path.join(map_config["tiles_folder"], str(current_zoom - 1))):
os.mkdir(os.path.join(map_config["tiles_folder"], str(current_zoom - 1)))
futs = [executor.submit(merge_tiles, os.path.join(output_directory, "tiles"), current_zoom, tile) for tile in tiles_to_produce]
futs = [executor.submit(merge_tiles, os.path.join(map_config["tiles_folder"]), current_zoom, tile) for tile in tiles_to_produce]
tot_futs = len(futs)
fut_counter = 0
[fut.add_done_callback(done_callback) for fut in futs]