Heavy optimizations using multithreading & async

This commit is contained in:
April 2022-10-17 03:27:46 -07:00
parent 52d470ad65
commit 1ff7731501
No known key found for this signature in database
GPG Key ID: 17A9A017FAA4DE5E

View File

@ -1,19 +1,18 @@
import asyncio import asyncio
import collections
from genericpath import exists from genericpath import exists
import gzip import gzip
from lib2to3.pytree import convert from multiprocessing import Pool
from tkinter.filedialog import Open
import aiohttp import aiohttp
import aiofiles
import json import json
import time as epochTime import time as epochTime
import requests
from RadarProcessor import * from RadarProcessor import *
from os import path, mkdir, listdir, remove from os import path, mkdir, listdir, remove, cpu_count
from shutil import copyfile, rmtree, copyfileobj from shutil import rmtree
from PIL import Image as PILImage from PIL import Image as PILImage
from wand.image import Image as wandImage from wand.image import Image as wandImage
from wand.display import display
from wand.drawing import Drawing
from wand.color import Color from wand.color import Color
@ -30,7 +29,7 @@ import bit
async def getValidTimestamps(boundaries:ImageBoundaries) -> list: async def getValidTimestamps(boundaries:ImageBoundaries) -> list:
"""Gets all valid UNIX timestamps for the TWCRadarMosaic product """ """Gets all valid UNIX timestamps for the TWCRadarMosaic product """
print("Getting timestamps for the radar..")
times = [] times = []
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
@ -57,23 +56,31 @@ async def getValidTimestamps(boundaries:ImageBoundaries) -> list:
return times return times
def downloadRadarTile(url, p, fn):
img = requests.get(url, stream=True)
ts = fn.split("_")[0]
download = True
async def downloadRadarTile(x, y, timestamp): # Make the path if it doesn't exist
""" Downloads the specified radar tile matching the timestamp, x, and y coordinates. """ if exists(f"tiles/output/{ts}.tiff"):
# Make the directory for the tile to sit in. print("Not downloading tiles for timestamp " + str(ts) + " since a frame for it already exists." )
if not path.exists('tiles/' + str(timestamp)): download = False
mkdir('tiles/' + str(timestamp)) if not path.exists(p):
mkdir(p)
print(f"Download {ts}")
if exists(f"{p}/{fn}"):
print(f"Not downloading new tiles for {ts} as they already exist.")
download = False
async with aiohttp.ClientSession() as session: if (img.status_code == 200 and download):
url = f"https://api.weather.com/v3/TileServer/tile?product=twcRadarMosaic&ts={str(timestamp)}&xyz={x}:{y}:6&apiKey=21d8a80b3d6b444998a80b3d6b1449d3" with open(f'{p}/{fn}', 'wb') as tile:
for data in img:
tile.write(data)
elif (img.status_code != 200):
print("ERROR DOWNLOADING " + p + "\nSTATUS CODE " + str(img.status_code))
elif (download == False):
pass
if not path.exists(f'tiles/{timestamp}/{timestamp}_{x}_{y}.png'):
async with session.get(url) as r:
data = await r.read()
async with aiofiles.open(f'tiles/{timestamp}/{timestamp}_{x}_{y}.png', 'wb') as f:
await f.write(data)
await f.close()
def getImageBoundaries() -> ImageBoundaries: def getImageBoundaries() -> ImageBoundaries:
@ -186,12 +193,8 @@ def convertPaletteToWXPro(filepath:str):
img.opaque_paint(Color('rgb(40,93,106)'), snowColors[3], 7000.0) img.opaque_paint(Color('rgb(40,93,106)'), snowColors[3], 7000.0)
img.opaque_paint(Color('rgb(13,49,64)'), snowColors[3]), 7000.0 img.opaque_paint(Color('rgb(13,49,64)'), snowColors[3]), 7000.0
img.format = 'tiff'
img.background_color = Color('black')
img.alpha_channel = 'remove'
img.compression = 'lzw' img.compression = 'lzw'
img.save(filename=filepath.replace('png', 'tiff')) img.save(filename=filepath)
remove(filepath)
@ -215,45 +218,59 @@ async def makeRadarImages():
CalculateBounds(upperRight, lowerLeft, upperLeft, lowerRight) CalculateBounds(upperRight, lowerLeft, upperLeft, lowerRight)
times = await getValidTimestamps(boundaries) times = await getValidTimestamps(boundaries)
# # Get rid of invalid tiles # Get rid of invalid radar frames
# for i in range(0, len(listdir('tiles/'))): for i in listdir('tiles/output'):
# dir = listdir('tiles/')[i] if i.split('.')[0] not in [str(x) for x in times] and i != "Thumbs.db":
print(f"Deleting {i} as it is no longer valid.")
remove("tiles/output/" + i)
# if int(dir) not in times and int(dir) != "output": # Collect coordinates for the frame tiles
# print("Clearing invalid timestamp " + dir)
# rmtree('tiles/' + dir)
# # Get rid of invalid radar frames
# for i in range(0, len(listdir('tiles/output'))):
# frame = listdir('titles/output')[i]
# if frame not in str(times): remove('tiles/output/' + frame)
for t in range(0, len(times)):
print("Downloading tiles for timestamp " + str(times[t]) + f" (#{t})")
# Download all needed radar tiles to make 1 frame
for y in range(yStart, yEnd): for y in range(yStart, yEnd):
if y <= yEnd: if y <= yEnd:
for x in range(xStart, xEnd): for x in range(xStart, xEnd):
if x <= xEnd: if x <= xEnd:
await downloadRadarTile(x, y + 1, times[t]) combinedCoordinates.append(Point(x,y))
combinedCoordinates.append(Point(x,y + 1)) # Create urls, paths, and filenames to download tiles for.
urls = []
paths = []
filenames = []
for i in range(0, len(times)):
for c in range(0, len(combinedCoordinates)):
if not exists(f'tiles/output/{times[i]}.tiff'):
urls.append(f"https://api.weather.com/v3/TileServer/tile?product=twcRadarMosaic&ts={str(times[i])}&xyz={combinedCoordinates[c].x}:{combinedCoordinates[c].y}:6&apiKey=21d8a80b3d6b444998a80b3d6b1449d3")
paths.append(f"tiles/{times[i]}")
filenames.append(f"{times[i]}_{combinedCoordinates[c].x}_{combinedCoordinates[c].y}.png")
print(len(urls))
if len(urls) != 0 and len(urls) >= 6:
with Pool(cpu_count() - 1) as p:
p.starmap(downloadRadarTile, zip(urls, paths, filenames))
p.close()
p.join()
elif len(urls) < 6 and len(urls) != 0: # We don't need to run more threads than we need to, that's how we get halted.
with Pool(len(urls)) as p:
p.starmap(downloadRadarTile, zip(urls, paths, filenames))
p.close()
p.join()
elif len(urls) == 0:
print("No new radar frames need to be downloaded.")
return
# Stitch them all together! # Stitch them all together!
imgsToGenerate = [] imgsToGenerate = []
finishedImages = [] framesToComposite = []
finished = []
files = [] files = []
for t in times: for t in times:
imgsToGenerate.append(PILImage.new("RGBA", (imgW, imgH))) imgsToGenerate.append(PILImage.new("RGB", (imgW, imgH)))
# Stitch the frames together
for i in range(0, len(imgsToGenerate)): for i in range(0, len(imgsToGenerate)):
if not exists(F"tiles/output/{times[i]}.png"): if not exists(F"tiles/output/{times[i]}.tiff"):
print(f"GENERATE {times[i]}.png") print(f"Generate frame for {times[i]}")
for c in combinedCoordinates: for c in combinedCoordinates:
path = f"tiles/{times[i]}/{times[i]}_{c.x}_{c.y}.png" path = f"tiles/{times[i]}/{times[i]}_{c.x}_{c.y}.png"
@ -264,52 +281,44 @@ async def makeRadarImages():
imgsToGenerate[i].paste(placeTile, (xPlacement, yPlacement)) imgsToGenerate[i].paste(placeTile, (xPlacement, yPlacement))
imgsToGenerate[i].save(f"tiles/output/{times[i]}.png") # Don't render it with an alpha channel
finishedImages.append(f"tiles/output/{times[i]}.png") # Store the path so we can composite it using WAND and PIL imgsToGenerate[i].save(f"tiles/output/{times[i]}.tiff", compression = 'tiff_lzw')
framesToComposite.append(f"tiles/output/{times[i]}.tiff") # Store the path so we can composite it using WAND and PIL
# Composite images so that the i2 will take them without a fuss # Remove the tileset as we don't need it anymore!
for img in finishedImages: rmtree(f'tiles/{times[i]}')
# Composite images for the i2
for img in framesToComposite:
print("Attempting to composite " + img) print("Attempting to composite " + img)
# Crop the radar images something that the i2 will actually take # Crop the radar images something that the i2 will actually take
img_raw = wandImage(filename=img) img_raw = wandImage(filename=img)
img_raw.crop(upperLeftX, upperLeftY, width = int(lowerRightX - upperLeftX), height = int(lowerRightY - upperLeftY))
# print(upperLeftX, upperLeftY, int(lowerRightX - upperLeftX), int(lowerRightY - upperLeftY)) img_raw.compression = 'lzw'
img_raw.crop(upperLeftX, upperLeftY, int(lowerRightX - upperLeftX), int(lowerRightY - upperLeftY))
# img_raw.resize(boundaries.OGImgW, boundaries.OGImgH, 'box', 0)
# img_raw.transform(f'{boundaries.OGImgW}x{boundaries.OGImgH}')
img_raw.save(filename=img) img_raw.save(filename=img)
# Resize using PIL
imgPIL = PILImage.open(img) imgPIL = PILImage.open(img)
imgPIL = imgPIL.resize((boundaries.OGImgW, boundaries.OGImgH), 0) imgPIL = imgPIL.resize((boundaries.OGImgW, boundaries.OGImgH), 0)
imgPIL.save(img) imgPIL.save(img)
convertPaletteToWXPro(img) convertPaletteToWXPro(img)
finished.append(img)
commands = [] commands = []
# Send them all to the i2! # Send them all to the i2!
for img in range(0, len(finishedImages)): for i in range(0, len(finished)):
files = [] commands.append( '<MSG><Exec workRequest="storePriorityImage(FileExtension=.tiff,File={0},Location=US,ImageType=Radar,IssueTime=' + getTime(times[i]) + ')"/></MSG>' )
commands = []
files.append( f'tiles/output/{times[i]}.tiff' )
commands.append( '<MSG><Exec workRequest="storePriorityImage(FileExtension=.tiff,File={0},Location=US,ImageType=Radar,IssueTime=' + getTime(times[img]) + ')"/></MSG>' )
# print(file + "\n" + command) # print(file + "\n" + command)
bit.sendFile(files, commands, 1, 0) bit.sendFile([finished[i]], [commands[i]], 1, 0)
commands.pop(0)
files.pop(0)
# print(getTime(1665880800)) # print(getTime(1665880800))
loop = asyncio.get_event_loop() if __name__ == "__main__":
asyncio.run(makeRadarImages())
radarTask = loop.create_task(makeRadarImages())
try:
loop.run_until_complete(radarTask)
except asyncio.CancelledError: pass