import requests import json import os from datetime import datetime,timedelta from Util.MachineProductCfg import getAlertZones import time import pytz import xml.dom.minidom import shutil import gzip import logging,coloredlogs import sys sys.path.append("./py2lib") import bit l = logging.getLogger(__name__) coloredlogs.install() #Zones/Counties to fetch alerts for alertLocations = getAlertZones() #You can safely edit the API key here. Make sure to include the ' before and after the key headlineApiKey = '21d8a80b3d6b444998a80b3d6b1449d3' detailsApiKey = '21d8a80b3d6b444998a80b3d6b1449d3' k = 0 def getAlerts(location): global k fetchUrl = 'https://api.weather.com/v3/alerts/headlines?areaId=' + location + ':US&format=json&language=en-US&apiKey=' + headlineApiKey response = requests.get(fetchUrl) theCode = response.status_code #Our global variables #Set the actions based on response code if theCode == 204: l.info('No alerts for area ' + location + '.\n') return elif theCode == 403: l.critical("Uh oh! Your API key may not be authorized for alerts. Tsk Tsk. Maybe you shouldn't pirate IBM data :)\n") return elif theCode == 401: l.critical("Uh oh! This request requires authentication. Maybe you shouldn't try to access resources for IBM employee's only :)\n") return elif theCode == 404: l.error("Uh oh! The requested resource cannot be found. This means either the URL is wrong or IBM is having technical difficulties :(\n Or.... They deleted the API :O\n") return elif theCode == 405: l.error("Uh oh! Got a 405! This means that somehow.... someway..... this script made an invalid request. So sad..... So terrible..... :(\n") return elif theCode == 406: l.critical("Uh oh! Got a 406! This means that IBM doesn't like us. :(\n") return elif theCode == 408: l.error("Uh oh! We were too slow in providing IBM our alert request. Although I prefer to say we were Slowly Capable! :)\n") return elif theCode == 500: l.error("Uh oh! Seems IBM's on call IT Tech spilled coffee on the server! Looks like no alerts for a while. Please check back later :)\n") return elif theCode == 502 or theCode == 503 or theCode == 504: l.error("Uh oh! This is why you don't have interns messing with the server configuration. Please stand by while IBM's on call IT Tech resolves the issue :)\n") return elif theCode == 200: pass #Alright lets map our headline variables. l.debug('Found Alert for ' + location + '\n') dataH = response.json() alertsRoot = dataH['alerts'] for x in alertsRoot: detailKey = x['detailKey'] #Lets get map our detail variables. detailsUrl = 'https://api.weather.com/v3/alerts/detail?alertId=' + detailKey + '&format=json&language=en-US&apiKey=' + detailsApiKey detailsResponse = requests.get(detailsUrl) dataD = detailsResponse.json() detailsRoot = dataD['alertDetail'] theDetailsText = detailsRoot['texts'] detailsText = theDetailsText[0] descriptionRaw = detailsText['description'] language = detailsText['languageCode'] Identifier = location + '_' + x['phenomena'] + '_' + x['significance'] + '_' + str(x['processTimeUTC']) #Is this for a NWS Zone or County? last4 = location[2:] locationType = None if 'C' in last4: locationType = 'C' elif 'Z' in last4: locationType = 'Z' #theIdent = str(Identifier) try: thecheck = open('./.temp/alertmanifest.txt', "r") check = thecheck.read() if check.find(Identifier) != -1: l.debug("Alert already sent...") return except FileNotFoundError: l.warning("alert manifest does not exist (yet)") k += 1 #We have an alert to send! #Lets Map Our Vocal Codes! vocalCheck = x['phenomena'] + '_' + x['significance'] vocalCode = None if vocalCheck == 'HU_W': vocalCode = 'HE001' elif vocalCheck == 'TY_W': vocalCode = 'HE002' elif vocalCheck == 'HI_W': vocalCode = 'HE003' elif vocalCheck == 'TO_A': vocalCode = 'HE004' elif vocalCheck == 'SV_A': vocalCode = 'HE005' elif vocalCheck == 'HU_A': vocalCode = 'HE006' elif vocalCheck == 'TY_A': vocalCode = 'HE007' elif vocalCheck == 'TR_W': vocalCode = 'HE008' elif vocalCheck == 'TR_A': vocalCode = 'HE009' elif vocalCheck == 'TI_W': vocalCode = 'HE010' elif vocalCheck == 'HI_A': vocalCode = 'HE011' elif vocalCheck == 'TI_A': vocalCode = 'HE012' elif vocalCheck == 'BZ_W': vocalCode = 'HE013' elif vocalCheck == 'IS_W': vocalCode = 'HE014' elif vocalCheck == 'WS_W': vocalCode = 'HE015' elif vocalCheck == 'HW_W': vocalCode = 'HE016' elif vocalCheck == 'LE_W': vocalCode = 'HE017' elif vocalCheck == 'ZR_Y': vocalCode = 'HE018' elif vocalCheck == 'CF_W': vocalCode = 'HE019' elif vocalCheck == 'LS_W': vocalCode = 'HE020' elif vocalCheck == 'WW_Y': vocalCode = 'HE021' elif vocalCheck == 'LB_Y': vocalCode = 'HE022' elif vocalCheck == 'LE_Y': vocalCode = 'HE023' elif vocalCheck == 'BZ_A': vocalCode = 'HE024' elif vocalCheck == 'WS_A': vocalCode = 'HE025' elif vocalCheck == 'FF_A': vocalCode = 'HE026' elif vocalCheck == 'FA_A': vocalCode = 'HE027' elif vocalCheck == 'FA_Y': vocalCode = 'HE028' elif vocalCheck == 'HW_A': vocalCode = 'HE029' elif vocalCheck == 'LE_A': vocalCode = 'HE030' elif vocalCheck == 'SU_W': vocalCode = 'HE031' elif vocalCheck == 'LS_Y': vocalCode = 'HE032' elif vocalCheck == 'CF_A': vocalCode = 'HE033' elif vocalCheck == 'ZF_Y': vocalCode = 'HE034' elif vocalCheck == 'FG_Y': vocalCode = 'HE035' elif vocalCheck == 'SM_Y': vocalCode = 'HE036' elif vocalCheck == 'EC_W': vocalCode = 'HE037' elif vocalCheck == 'EH_W': vocalCode = 'HE038' elif vocalCheck == 'HZ_W': vocalCode = 'HE039' elif vocalCheck == 'FZ_W': vocalCode = 'HE040' elif vocalCheck == 'HT_Y': vocalCode = 'HE041' elif vocalCheck == 'WC_Y': vocalCode = 'HE042' elif vocalCheck == 'FR_Y': vocalCode = 'HE043' elif vocalCheck == 'EC_A': vocalCode = 'HE044' elif vocalCheck == 'EH_A': vocalCode = 'HE045' elif vocalCheck == 'HZ_A': vocalCode = 'HE046' elif vocalCheck == 'DS_W': vocalCode = 'HE047' elif vocalCheck == 'WI_Y': vocalCode = 'HE048' elif vocalCheck == 'SU_Y': vocalCode = 'HE049' elif vocalCheck == 'AS_Y': vocalCode = 'HE050' elif vocalCheck == 'WC_W': vocalCode = 'HE051' elif vocalCheck == 'FZ_A': vocalCode = 'HE052' elif vocalCheck == 'WC_A': vocalCode = 'HE053' elif vocalCheck == 'AF_W': vocalCode = 'HE054' elif vocalCheck == 'AF_Y': vocalCode = 'HE055' elif vocalCheck == 'DU_Y': vocalCode = 'HE056' elif vocalCheck == 'LW_Y': vocalCode = 'HE057' elif vocalCheck == 'LS_A': vocalCode = 'HE058' elif vocalCheck == 'HF_W': vocalCode = 'HE059' elif vocalCheck == 'SR_W': vocalCode = 'HE060' elif vocalCheck == 'GL_W': vocalCode = 'HE061' elif vocalCheck == 'HF_A': vocalCode = 'HE062' elif vocalCheck == 'UP_W': vocalCode = 'HE063' elif vocalCheck == 'SE_W': vocalCode = 'HE064' elif vocalCheck == 'SR_A': vocalCode = 'HE065' elif vocalCheck == 'GL_A': vocalCode = 'HE066' elif vocalCheck == 'MF_Y': vocalCode = 'HE067' elif vocalCheck == 'MS_Y': vocalCode = 'HE068' elif vocalCheck == 'SC_Y': vocalCode = 'HE069' elif vocalCheck == 'UP_Y': vocalCode = 'HE073' elif vocalCheck == 'LO_Y': vocalCode = 'HE074' elif vocalCheck == 'AF_V': vocalCode = 'HE075' elif vocalCheck == 'UP_A': vocalCode = 'HE076' elif vocalCheck == 'TAV_W': vocalCode = 'HE077' elif vocalCheck == 'TAV_A': vocalCode = 'HE078' elif vocalCheck == 'TO_W': vocalCode = 'HE110' else: vocalCode = '' #Do some date/time conversions EndTimeUTCEpoch = x['expireTimeUTC'] EndTimeUTC = datetime.utcfromtimestamp(EndTimeUTCEpoch).strftime('%Y%m%d%H%M') #EndTimeUTC = EndTimeUTCString.astimezone(pytz.UTC) expireTimeEpoch = x['expireTimeUTC'] expireTimeUTC = datetime.utcfromtimestamp(expireTimeEpoch).strftime('%Y%m%d%H%M') #V3 Alert API doesn't give us issueTime in UTC. So we have to convert ourselves. Ughhh!! iTLDTS = x['issueTimeLocal'] iTLDTO = datetime.strptime(iTLDTS, '%Y-%m-%dT%H:%M:%S%z') issueTimeToUTC = iTLDTO.astimezone(pytz.UTC) issueTimeUtc = issueTimeToUTC.strftime('%Y%m%d%H%M') processTimeEpoch = x['processTimeUTC'] processTime = datetime.fromtimestamp(processTimeEpoch).strftime('%Y%m%d%H%M%S') #What is the action of this alert? Action = None if x['messageType'] == 'Update': Action = 'CON' elif x['messageType'] == 'New': Action = 'NEW' #Fix description to replace new lines with space and add XML escape Chars. when needed description = ' '.join(descriptionRaw.splitlines()) description = description.replace('&', '&') description = description.replace('<', '<') description = description.replace('>', '>') description = description.replace('-', '') description = description.replace(':', '') #Is this alert urgent? urgency ='piss' if vocalCheck == 'TO_W' or vocalCheck == 'SV_W' or vocalCheck == 'FF_W': urgency = 'BEUrgent' else: urgency = 'BERecord' alertMsg = 'NOT_USED' + x['productIdentifier'] + 'NOT_USED' + Action + '' + x['officeCode'] + '' + x['phenomena'] + '' + x['significance'] + '' + x['eventTrackingNumber'] + '' + x['eventDescription'] + 'NOT_USED' + EndTimeUTC + '' + str(x['severityCode']) + 'NOT_USED' + expireTimeUTC + '' + location + '' + x['adminDistrictCode'] + 'NOT_USEDNOT_USEDNOT_USED' + x['identifier'] + '' + processTime + '' + issueTimeUtc + '' + x['headlineText'] + '' + vocalCode + 'NOT_USED' + description + 'NOT_USED' + location + '_' + x['phenomena'] + '_' + x['significance'] + '_' + x['eventTrackingNumber'] + '_' + x['officeCode'] + '' #Append BERecord with open('./.temp/BERecord.xml', "a") as b: b.write(alertMsg) b.close() #Add our alert to the manifest so we don't keep sending in the same alert every 60 seconds unless an update is issued. with open('./.temp/alertmanifest.txt', "a") as c: c.write('\n' + location + '_' + x['phenomena'] + '_' + x['significance'] + '_' + str(x['processTimeUTC'])) c.close() # TODO: This should be converted into a function so it works better with async, that way we're not getting hung up on that time.sleep() call. def makeRecord(): global k with open("./.temp/BERecord.xml", 'a') as BERecord: BERecord.write('') BERecord.close() for z in alertLocations: getAlerts(z) with open('./.temp/BERecord.xml', 'a') as BERecord: BERecord.write("") BERecord.close() dom = xml.dom.minidom.parse("./.temp/BERecord.xml") pretty_xml_as_string = dom.toprettyxml(indent = " ") with open("./.temp/BERecord.i2m", 'w') as h: h.write(pretty_xml_as_string[23:]) h.close() # If we don't need to send the i2 an alert, we don't need to gzip it. if k > 0: l.info("Sending alert(s) to the IntelliStar 2!") with open("./.temp/BERecord.i2m", 'rb') as f_in: with gzip.open("./.temp/BERecord.gz", 'wb') as f_out: shutil.copyfileobj(f_in, f_out) files = [] commands = [] gZipFile = "./.temp/BERecord.gz" files.append(gZipFile) command = commands.append('') bit.sendFile(files, commands, 1, 0) os.remove(gZipFile) k = 0 os.remove("./.temp/BERecord.xml") os.remove("./.temp/BERecord.i2m")