From 1ff7731501af1583343e59452161571518e50b4c Mon Sep 17 00:00:00 2001 From: April Date: Mon, 17 Oct 2022 03:27:46 -0700 Subject: [PATCH] Heavy optimizations using multithreading & async --- radar/TWCRadarProcessor.py | 171 +++++++++++++++++++------------------ 1 file changed, 90 insertions(+), 81 deletions(-) diff --git a/radar/TWCRadarProcessor.py b/radar/TWCRadarProcessor.py index 56675a4..cbde682 100644 --- a/radar/TWCRadarProcessor.py +++ b/radar/TWCRadarProcessor.py @@ -1,19 +1,18 @@ import asyncio +import collections from genericpath import exists import gzip -from lib2to3.pytree import convert -from tkinter.filedialog import Open +from multiprocessing import Pool import aiohttp -import aiofiles import json import time as epochTime +import requests + from RadarProcessor import * -from os import path, mkdir, listdir, remove -from shutil import copyfile, rmtree, copyfileobj +from os import path, mkdir, listdir, remove, cpu_count +from shutil import rmtree from PIL import Image as PILImage from wand.image import Image as wandImage -from wand.display import display -from wand.drawing import Drawing from wand.color import Color @@ -30,7 +29,7 @@ import bit async def getValidTimestamps(boundaries:ImageBoundaries) -> list: """Gets all valid UNIX timestamps for the TWCRadarMosaic product """ - + print("Getting timestamps for the radar..") times = [] async with aiohttp.ClientSession() as session: @@ -56,25 +55,33 @@ async def getValidTimestamps(boundaries:ImageBoundaries) -> list: times.append(time) return times - -async def downloadRadarTile(x, y, timestamp): - """ Downloads the specified radar tile matching the timestamp, x, and y coordinates. """ - # Make the directory for the tile to sit in. - if not path.exists('tiles/' + str(timestamp)): - mkdir('tiles/' + str(timestamp)) +def downloadRadarTile(url, p, fn): + img = requests.get(url, stream=True) + ts = fn.split("_")[0] + download = True + + # Make the path if it doesn't exist + if exists(f"tiles/output/{ts}.tiff"): + print("Not downloading tiles for timestamp " + str(ts) + " since a frame for it already exists." ) + download = False + if not path.exists(p): + mkdir(p) + print(f"Download {ts}") + if exists(f"{p}/{fn}"): + print(f"Not downloading new tiles for {ts} as they already exist.") + download = False - async with aiohttp.ClientSession() as session: - url = f"https://api.weather.com/v3/TileServer/tile?product=twcRadarMosaic&ts={str(timestamp)}&xyz={x}:{y}:6&apiKey=21d8a80b3d6b444998a80b3d6b1449d3" + if (img.status_code == 200 and download): + with open(f'{p}/{fn}', 'wb') as tile: + for data in img: + tile.write(data) + elif (img.status_code != 200): + print("ERROR DOWNLOADING " + p + "\nSTATUS CODE " + str(img.status_code)) + elif (download == False): + pass - if not path.exists(f'tiles/{timestamp}/{timestamp}_{x}_{y}.png'): - async with session.get(url) as r: - data = await r.read() - async with aiofiles.open(f'tiles/{timestamp}/{timestamp}_{x}_{y}.png', 'wb') as f: - await f.write(data) - await f.close() - def getImageBoundaries() -> ImageBoundaries: """ Gets the image boundaries for the specified radar definition """ @@ -186,12 +193,8 @@ def convertPaletteToWXPro(filepath:str): img.opaque_paint(Color('rgb(40,93,106)'), snowColors[3], 7000.0) img.opaque_paint(Color('rgb(13,49,64)'), snowColors[3]), 7000.0 - img.format = 'tiff' - img.background_color = Color('black') - img.alpha_channel = 'remove' img.compression = 'lzw' - img.save(filename=filepath.replace('png', 'tiff')) - remove(filepath) + img.save(filename=filepath) @@ -215,45 +218,59 @@ async def makeRadarImages(): CalculateBounds(upperRight, lowerLeft, upperLeft, lowerRight) times = await getValidTimestamps(boundaries) - # # Get rid of invalid tiles - # for i in range(0, len(listdir('tiles/'))): - # dir = listdir('tiles/')[i] - - # if int(dir) not in times and int(dir) != "output": - # print("Clearing invalid timestamp " + dir) - # rmtree('tiles/' + dir) - - # # Get rid of invalid radar frames - # for i in range(0, len(listdir('tiles/output'))): - # frame = listdir('titles/output')[i] - # if frame not in str(times): remove('tiles/output/' + frame) + # Get rid of invalid radar frames + for i in listdir('tiles/output'): + if i.split('.')[0] not in [str(x) for x in times] and i != "Thumbs.db": + print(f"Deleting {i} as it is no longer valid.") + remove("tiles/output/" + i) + # Collect coordinates for the frame tiles + for y in range(yStart, yEnd): + if y <= yEnd: + for x in range(xStart, xEnd): + if x <= xEnd: + combinedCoordinates.append(Point(x,y)) - for t in range(0, len(times)): - print("Downloading tiles for timestamp " + str(times[t]) + f" (#{t})") + # Create urls, paths, and filenames to download tiles for. + urls = [] + paths = [] + filenames = [] + for i in range(0, len(times)): + for c in range(0, len(combinedCoordinates)): + if not exists(f'tiles/output/{times[i]}.tiff'): + urls.append(f"https://api.weather.com/v3/TileServer/tile?product=twcRadarMosaic&ts={str(times[i])}&xyz={combinedCoordinates[c].x}:{combinedCoordinates[c].y}:6&apiKey=21d8a80b3d6b444998a80b3d6b1449d3") + paths.append(f"tiles/{times[i]}") + filenames.append(f"{times[i]}_{combinedCoordinates[c].x}_{combinedCoordinates[c].y}.png") - # Download all needed radar tiles to make 1 frame - for y in range(yStart, yEnd): - if y <= yEnd: - for x in range(xStart, xEnd): - if x <= xEnd: - await downloadRadarTile(x, y + 1, times[t]) - - combinedCoordinates.append(Point(x,y + 1)) + print(len(urls)) + if len(urls) != 0 and len(urls) >= 6: + with Pool(cpu_count() - 1) as p: + p.starmap(downloadRadarTile, zip(urls, paths, filenames)) + p.close() + p.join() + elif len(urls) < 6 and len(urls) != 0: # We don't need to run more threads than we need to, that's how we get halted. + with Pool(len(urls)) as p: + p.starmap(downloadRadarTile, zip(urls, paths, filenames)) + p.close() + p.join() + elif len(urls) == 0: + print("No new radar frames need to be downloaded.") + return # Stitch them all together! imgsToGenerate = [] - finishedImages = [] + framesToComposite = [] + finished = [] files = [] for t in times: - imgsToGenerate.append(PILImage.new("RGBA", (imgW, imgH))) - + imgsToGenerate.append(PILImage.new("RGB", (imgW, imgH))) + # Stitch the frames together for i in range(0, len(imgsToGenerate)): - if not exists(F"tiles/output/{times[i]}.png"): - print(f"GENERATE {times[i]}.png") + if not exists(F"tiles/output/{times[i]}.tiff"): + print(f"Generate frame for {times[i]}") for c in combinedCoordinates: path = f"tiles/{times[i]}/{times[i]}_{c.x}_{c.y}.png" @@ -264,52 +281,44 @@ async def makeRadarImages(): imgsToGenerate[i].paste(placeTile, (xPlacement, yPlacement)) - imgsToGenerate[i].save(f"tiles/output/{times[i]}.png") - finishedImages.append(f"tiles/output/{times[i]}.png") # Store the path so we can composite it using WAND and PIL + # Don't render it with an alpha channel + imgsToGenerate[i].save(f"tiles/output/{times[i]}.tiff", compression = 'tiff_lzw') + framesToComposite.append(f"tiles/output/{times[i]}.tiff") # Store the path so we can composite it using WAND and PIL - # Composite images so that the i2 will take them without a fuss - for img in finishedImages: + # Remove the tileset as we don't need it anymore! + rmtree(f'tiles/{times[i]}') + + # Composite images for the i2 + for img in framesToComposite: print("Attempting to composite " + img) # Crop the radar images something that the i2 will actually take img_raw = wandImage(filename=img) - - # print(upperLeftX, upperLeftY, int(lowerRightX - upperLeftX), int(lowerRightY - upperLeftY)) - img_raw.crop(upperLeftX, upperLeftY, int(lowerRightX - upperLeftX), int(lowerRightY - upperLeftY)) - # img_raw.resize(boundaries.OGImgW, boundaries.OGImgH, 'box', 0) - # img_raw.transform(f'{boundaries.OGImgW}x{boundaries.OGImgH}') + img_raw.crop(upperLeftX, upperLeftY, width = int(lowerRightX - upperLeftX), height = int(lowerRightY - upperLeftY)) + img_raw.compression = 'lzw' img_raw.save(filename=img) + # Resize using PIL imgPIL = PILImage.open(img) imgPIL = imgPIL.resize((boundaries.OGImgW, boundaries.OGImgH), 0) imgPIL.save(img) convertPaletteToWXPro(img) + finished.append(img) + commands = [] # Send them all to the i2! - for img in range(0, len(finishedImages)): - files = [] - commands = [] - - files.append( f'tiles/output/{times[i]}.tiff' ) - commands.append( '' ) + for i in range(0, len(finished)): + commands.append( '' ) # print(file + "\n" + command) - bit.sendFile(files, commands, 1, 0) - - commands.pop(0) - files.pop(0) + bit.sendFile([finished[i]], [commands[i]], 1, 0) # print(getTime(1665880800)) -loop = asyncio.get_event_loop() - -radarTask = loop.create_task(makeRadarImages()) - -try: - loop.run_until_complete(radarTask) -except asyncio.CancelledError: pass \ No newline at end of file +if __name__ == "__main__": + asyncio.run(makeRadarImages()) \ No newline at end of file