from weakref import KeyedRef import requests import json import os from datetime import datetime,timedelta from Util.MachineProductCfg import getZones import time import pytz import xml.dom.minidom import shutil import gzip import logging,coloredlogs import sys sys.path.append("./py2lib") import bit l = logging.getLogger(__name__) coloredlogs.install() #Zones/Counties to fetch alerts for zones = getZones() #You can safely edit the API key here. Make sure to include the ' before and after the key headlineApiKey = '21d8a80b3d6b444998a80b3d6b1449d3' detailsApiKey = '21d8a80b3d6b444998a80b3d6b1449d3' k = 0 def getAlerts(location): global k fetchUrl = 'https://api.weather.com/v3/alerts/headlines?areaId=' + location + ':US&format=json&language=en-US&apiKey=' + headlineApiKey response = requests.get(fetchUrl) theCode = response.status_code #Set the actions based on response code if theCode == 204: print('No alerts for area ' + location + '.\n') return elif theCode == 403: print("Uh oh! Your API key may not be authorized for alerts. Tsk Tsk. Maybe you shouldn't pirate IBM data :)\n") return elif theCode == 401: print("Uh oh! This request requires authentication. Maybe you shouldn't try to access resources for IBM employee's only :)\n") return elif theCode == 404: print("Uh oh! The requested resource cannot be found. This means either the URL is wrong or IBM is having technical difficulties :(\n Or.... They deleted the API :O\n") return elif theCode == 405: print("Uh oh! Got a 405! This means that somehow.... someway..... this script made an invalid request. So sad..... So terrible..... :(\n") return elif theCode == 406: print("Uh oh! Got a 406! This means that IBM doesn't like us. :(\n") return elif theCode == 408: print("Uh oh! We were too slow in providing IBM our alert request. Although I prefer to say we were Slowly Capable! :)\n") return elif theCode == 500: print("Uh oh! Seems IBM's on call IT Tech spilled coffee on the server! Looks like no alerts for a while. Please check back later :)\n") return elif theCode == 502 or theCode == 503 or theCode == 504: print("Uh oh! This is why you don't have interns messing with the server configuration. Please stand by while IBM's on call IT Tech resolves the issue :)\n") return elif theCode == 200: pass try: #Alright lets map our headline variables. l.debug('Found Alert for ' + location + '\n') dataH = response.json() alertsRoot = dataH['alerts'] for x in alertsRoot: detailKey = x['detailKey'] #Lets get map our detail variables. detailsUrl = 'https://api.weather.com/v3/alerts/detail?alertId=' + detailKey + '&format=json&language=en-US&apiKey=' + detailsApiKey detailsResponse = requests.get(detailsUrl) dataD = detailsResponse.json() detailsRoot = dataD['alertDetail'] theDetailsText = detailsRoot['texts'] detailsText = theDetailsText[0] descriptionRaw = detailsText['description'] language = detailsText['languageCode'] Identifier = location + '_' + x['phenomena'] + '_' + x['significance'] + '_' + str(x['processTimeUTC']) #Is this for a NWS Zone or County? last4 = location[2:] locationType = None if 'C' in last4: locationType = 'C' elif 'Z' in last4: locationType = 'Z' #theIdent = str(Identifier) try: thecheck = open('./.temp/alertmanifest.txt', "r") check = thecheck.read() if check.find(Identifier) != -1: l.debug("Alert already sent...") return except FileNotFoundError: l.warning("alert manifest does not exist (yet)") k += 1 #We have an alert to send! #Lets Map Our Vocal Codes! vocalCheck = x['phenomena'] + '_' + x['significance'] vocalCode = None if vocalCheck == 'HU_W': vocalCode = 'HE001' elif vocalCheck == 'TY_W': vocalCode = 'HE002' elif vocalCheck == 'HI_W': vocalCode = 'HE003' elif vocalCheck == 'TO_A': vocalCode = 'HE004' elif vocalCheck == 'SV_A': vocalCode = 'HE005' elif vocalCheck == 'HU_A': vocalCode = 'HE006' elif vocalCheck == 'TY_A': vocalCode = 'HE007' elif vocalCheck == 'TR_W': vocalCode = 'HE008' elif vocalCheck == 'TR_A': vocalCode = 'HE009' elif vocalCheck == 'TI_W': vocalCode = 'HE010' elif vocalCheck == 'HI_A': vocalCode = 'HE011' elif vocalCheck == 'TI_A': vocalCode = 'HE012' elif vocalCheck == 'BZ_W': vocalCode = 'HE013' elif vocalCheck == 'IS_W': vocalCode = 'HE014' elif vocalCheck == 'WS_W': vocalCode = 'HE015' elif vocalCheck == 'HW_W': vocalCode = 'HE016' elif vocalCheck == 'LE_W': vocalCode = 'HE017' elif vocalCheck == 'ZR_Y': vocalCode = 'HE018' elif vocalCheck == 'CF_W': vocalCode = 'HE019' elif vocalCheck == 'LS_W': vocalCode = 'HE020' elif vocalCheck == 'WW_Y': vocalCode = 'HE021' elif vocalCheck == 'LB_Y': vocalCode = 'HE022' elif vocalCheck == 'LE_Y': vocalCode = 'HE023' elif vocalCheck == 'BZ_A': vocalCode = 'HE024' elif vocalCheck == 'WS_A': vocalCode = 'HE025' elif vocalCheck == 'FF_A': vocalCode = 'HE026' elif vocalCheck == 'FA_A': vocalCode = 'HE027' elif vocalCheck == 'FA_Y': vocalCode = 'HE028' elif vocalCheck == 'HW_A': vocalCode = 'HE029' elif vocalCheck == 'LE_A': vocalCode = 'HE030' elif vocalCheck == 'SU_W': vocalCode = 'HE031' elif vocalCheck == 'LS_Y': vocalCode = 'HE032' elif vocalCheck == 'CF_A': vocalCode = 'HE033' elif vocalCheck == 'ZF_Y': vocalCode = 'HE034' elif vocalCheck == 'FG_Y': vocalCode = 'HE035' elif vocalCheck == 'SM_Y': vocalCode = 'HE036' elif vocalCheck == 'EC_W': vocalCode = 'HE037' elif vocalCheck == 'EH_W': vocalCode = 'HE038' elif vocalCheck == 'HZ_W': vocalCode = 'HE039' elif vocalCheck == 'FZ_W': vocalCode = 'HE040' elif vocalCheck == 'HT_Y': vocalCode = 'HE041' elif vocalCheck == 'WC_Y': vocalCode = 'HE042' elif vocalCheck == 'FR_Y': vocalCode = 'HE043' elif vocalCheck == 'EC_A': vocalCode = 'HE044' elif vocalCheck == 'EH_A': vocalCode = 'HE045' elif vocalCheck == 'HZ_A': vocalCode = 'HE046' elif vocalCheck == 'DS_W': vocalCode = 'HE047' elif vocalCheck == 'WI_Y': vocalCode = 'HE048' elif vocalCheck == 'SU_Y': vocalCode = 'HE049' elif vocalCheck == 'AS_Y': vocalCode = 'HE050' elif vocalCheck == 'WC_W': vocalCode = 'HE051' elif vocalCheck == 'FZ_A': vocalCode = 'HE052' elif vocalCheck == 'WC_A': vocalCode = 'HE053' elif vocalCheck == 'AF_W': vocalCode = 'HE054' elif vocalCheck == 'AF_Y': vocalCode = 'HE055' elif vocalCheck == 'DU_Y': vocalCode = 'HE056' elif vocalCheck == 'LW_Y': vocalCode = 'HE057' elif vocalCheck == 'LS_A': vocalCode = 'HE058' elif vocalCheck == 'HF_W': vocalCode = 'HE059' elif vocalCheck == 'SR_W': vocalCode = 'HE060' elif vocalCheck == 'GL_W': vocalCode = 'HE061' elif vocalCheck == 'HF_A': vocalCode = 'HE062' elif vocalCheck == 'UP_W': vocalCode = 'HE063' elif vocalCheck == 'SE_W': vocalCode = 'HE064' elif vocalCheck == 'SR_A': vocalCode = 'HE065' elif vocalCheck == 'GL_A': vocalCode = 'HE066' elif vocalCheck == 'MF_Y': vocalCode = 'HE067' elif vocalCheck == 'MS_Y': vocalCode = 'HE068' elif vocalCheck == 'SC_Y': vocalCode = 'HE069' elif vocalCheck == 'UP_Y': vocalCode = 'HE073' elif vocalCheck == 'LO_Y': vocalCode = 'HE074' elif vocalCheck == 'AF_V': vocalCode = 'HE075' elif vocalCheck == 'UP_A': vocalCode = 'HE076' elif vocalCheck == 'TAV_W': vocalCode = 'HE077' elif vocalCheck == 'TAV_A': vocalCode = 'HE078' elif vocalCheck == 'TO_W': vocalCode = 'HE110' else: vocalCode = '' #Do some date/time conversions EndTimeUTCEpoch = x['expireTimeUTC'] EndTimeUTC = datetime.utcfromtimestamp(EndTimeUTCEpoch).strftime('%Y%m%d%H%M') #EndTimeUTC = EndTimeUTCString.astimezone(pytz.UTC) expireTimeEpoch = x['expireTimeUTC'] expireTimeUTC = datetime.utcfromtimestamp(expireTimeEpoch).strftime('%Y%m%d%H%M') #V3 Alert API doesn't give us issueTime in UTC. So we have to convert ourselves. Ughhh!! iTLDTS = x['issueTimeLocal'] iTLDTO = datetime.strptime(iTLDTS, '%Y-%m-%dT%H:%M:%S%z') issueTimeToUTC = iTLDTO.astimezone(pytz.UTC) issueTimeUtc = issueTimeToUTC.strftime('%Y%m%d%H%M') processTimeEpoch = x['processTimeUTC'] processTime = datetime.fromtimestamp(processTimeEpoch).strftime('%Y%m%d%H%M%S') #What is the action of this alert? Action = None if x['messageType'] == 'Update': Action = 'CON' elif x['messageType'] == 'New': Action = 'NEW' #Fix description to replace new lines with space and add XML escape Chars. when needed description = ' '.join(descriptionRaw.splitlines()) description = description.replace('&', '&') description = description.replace('<', '<') description = description.replace('>', '>') description = description.replace('-', '') description = description.replace(':', '') #Is this alert urgent? urgency ='piss' if vocalCheck == 'TO_W' or vocalCheck == 'SV_W' or vocalCheck == 'FF_W': urgency = 'BEUrgent' else: urgency = 'BERecord' alertMsg = 'NOT_USED' + x['productIdentifier'] + 'NOT_USED' + Action + '' + x['officeCode'] + '' + x['phenomena'] + '' + x['significance'] + '' + x['eventTrackingNumber'] + '' + x['eventDescription'] + 'NOT_USED' + EndTimeUTC + '' + str(x['severityCode']) + 'NOT_USED' + expireTimeUTC + '' + location + '' + x['adminDistrictCode'] + 'NOT_USEDNOT_USEDNOT_USED' + x['identifier'] + '' + processTime + '' + issueTimeUtc + '' + x['headlineText'] + '' + vocalCode + 'NOT_USED' + description + 'NOT_USED' + location + '_' + x['phenomena'] + '_' + x['significance'] + '_' + x['eventTrackingNumber'] + '_' + x['officeCode'] + '' #Append BERecord with open('./.temp/BERecord.xml', "a") as b: b.write(alertMsg) b.close() #Add our alert to the manifest so we don't keep sending in the same alert every 60 seconds unless an update is issued. with open('./.temp/alertmanifest.txt', "a") as c: c.write('\n' + location + '_' + x['phenomena'] + '_' + x['significance'] + '_' + str(x['processTimeUTC'])) c.close() except KeyError: # For some reason, TWC will return a 200 even though there's no alerts present in the API. l.error("DO NOT REPORT THE ERROR BELOW") l.error("Failed to write BERecord - Returned 200 but had no alerts.") os.remove('./.temp/BERecord.xml') # TODO: This should be converted into a function so it works better with async, that way we're not getting hung up on that time.sleep() call. def makeRecord(): global k with open("./.temp/BERecord.xml", 'a') as BERecord: BERecord.write('') BERecord.close() for z in zones: getAlerts(z) with open('./.temp/BERecord.xml', 'a') as BERecord: BERecord.write("") BERecord.close() dom = xml.dom.minidom.parse("./.temp/BERecord.xml") pretty_xml_as_string = dom.toprettyxml(indent = " ") with open("./.temp/BERecord.i2m", 'w') as h: h.write(pretty_xml_as_string[23:]) h.close() # If we don't need to send the i2 an alert, we don't need to gzip it. if k > 0: l.info("Sending alert(s) to the IntelliStar 2!") with open("./.temp/BERecord.i2m", 'rb') as f_in: with gzip.open("./.temp/BERecord.gz", 'wb') as f_out: shutil.copyfileobj(f_in, f_out) files = [] commands = [] gZipFile = "./.temp/BERecord.gz" files.append(gZipFile) command = commands.append('') bit.sendFile(files, commands, 1, 0) os.remove(gZipFile) k = 0 os.remove("./.temp/BERecord.xml") os.remove("./.temp/BERecord.i2m")