319 lines
17 KiB
Python

import os
import json
import logging
import requests
import re
from ipaddress import ip_address
# Read the JSON configuration once; the rest of the script accesses it as "cfg".
with open("conf.json", "r") as file:
    cfg = json.load(file)

# Logging setup: timestamped messages on the root logger, INFO unless the
# config explicitly asks for debug output.
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger()
try:
    # Only the exact string "debug" enables debug logging.
    level = logging.DEBUG if cfg["loglevel"] == "debug" else logging.INFO
except KeyError:
    # conf.json has no "loglevel" entry: fall back to INFO.
    level = logging.INFO
log.setLevel(level)
log.debug("Logger was initialized")
## Functions
def checkIfIP(host):
    """Return True when *host* parses as an IPv4 or IPv6 address, else False.

    The check is delegated to ``ipaddress.ip_address``; the outcome is also
    written to the debug log.

    Args:
        host: The value to test. It is converted with ``str()`` for logging,
            so non-string values no longer break the "not an IP" path.
    """
    try:
        ip_address(host)
    except (ValueError, TypeError):
        # ip_address() signals "not an address" with ValueError; TypeError is
        # included for odd inputs so they are reported as "not an IP" as well.
        log.debug(str(host) + " is not an IP address.")
        return False
    log.debug(str(host) + " is an IP address")
    return True
def loginToNPM(apiurl, user, pw):  # To retrieve NPM's API session token
    """Log in to Nginx Proxy Manager and return the token used for API requests.

    Posts the credentials to ``<apiurl>/api/tokens``.

    Args:
        apiurl: Base URL of the NPM instance (the path is appended to it).
        user: Sent as the ``identity`` form field.
        pw: Sent as the ``secret`` form field.

    Returns:
        The ``token`` value of the JSON response on HTTP 200, or None when the
        API answers with any status other than 200 or 401.

    Raises:
        SystemExit: On HTTP 401 (rejected credentials), with exit status 1.
    """
    url = apiurl + "/api/tokens"
    # The session is only needed for this one request, so close it afterwards.
    with requests.Session() as session:
        loginreq = session.post(url, data={"identity": user, "secret": pw})

    if loginreq.status_code == 200:  # Check if the API accepted our creds.
        log.info("Logged into Nginx Proxy Manager")
        return loginreq.json()['token']

    if loginreq.status_code == 401:
        log.error("Your Nginx Proxy Manager credentials are incorrect. Please verify you put in the right ones!")
        raise SystemExit(1)

    # Any other status: report the API's own error message when the body
    # carries one, otherwise fall back to the raw response text.
    try:
        message = loginreq.json()['error']['message']
    except (ValueError, KeyError, TypeError):
        message = loginreq.text
    log.error("Nginx Proxy Manager: " + str(message))
    return None
def loginToPihole(serveraddr, phpassword, session):
    """Authenticate against the Pi-hole API (version 6.0 and newer).

    Sends the password as JSON to ``http://<serveraddr>/api/auth`` through
    *session*.

    Args:
        serveraddr: Host (without scheme) of the Pi-hole server.
        phpassword: The Pi-hole password.
        session: Object used to send the request (the script passes a
            ``requests.Session``).

    Returns:
        ``{'phpsessid': <session id>, 'csrftoken': <CSRF token>}`` on HTTP 200,
        otherwise None after logging the problem.
    """
    # NEW Method as of Pi-Hole version 6.0 and newer.
    apiurl = "http://" + serveraddr + "/api/auth"
    loginrq = session.post(apiurl, json={"password": phpassword}, verify=False)
    log.debug("Sent POST for getting Pi-hole API authorization.")

    if loginrq.status_code == 200:
        log.debug("API returned a 200, proceeding.")
        log.info("Logged into Pi-hole API on "+serveraddr+"!")
        sessioninfo = loginrq.json()["session"]
        phpsessid = sessioninfo["sid"]
        token = sessioninfo["csrf"]
        # NOTE(review): this writes live session credentials to the debug log.
        log.debug("phpsessid: " + phpsessid + " Token: "+ token)
        return {'phpsessid': phpsessid, "csrftoken": token}

    if loginrq.status_code == 401:  # Unauthorized
        log.debug("Pi-hole returned a 401, halting!")
        log.error("The Pi-hole retured an unauthorized error!")
        log.error("Please make sure your Pi-hole password in conf.json is correct and accurate.")
        return None

    # Any other status: report it using the login response itself.
    log.error("Pi-hole returned this " + str(loginrq.status_code) + " in error: "+ loginrq.text)
    return None
def loginToPiholeOld(apiurl, phpassword):
    """Log in through the legacy Pi-hole web login page and return its credentials.

    Posts the password to ``http://<apiurl>/admin/login.php``, then reads the
    ``PHPSESSID`` cookie and the CSRF token from the response. The token is
    taken from a hidden ``<div id="token">`` element in the returned HTML.

    Args:
        apiurl: Host (without scheme) of the Pi-hole server.
        phpassword: The Pi-hole web password.

    Returns:
        ``{'phpsessid': <cookie value>, 'csrftoken': <token>}``.

    Raises:
        SystemExit: When the response carries no session cookie or no token
            (most likely a wrong password), with exit status 1.
    """
    url = "http://" + apiurl + "/admin/login.php"
    # Only the cookie value and the page text are needed afterwards, so the
    # session can be closed as soon as the login request has completed.
    with requests.Session() as session:
        loginrq = session.post(url, data={"pw": phpassword})

    phpsessid = loginrq.cookies.get_dict().get("PHPSESSID")
    tokenmatch = re.search(r'<div id="token" hidden>(\S+)</div>', loginrq.text)
    if phpsessid is None or tokenmatch is None:
        log.error(f"An authentication token wasn't returned from the Pi-hole server at {apiurl} - your password is likely incorrect!")
        raise SystemExit(1)

    phpsessid = str(phpsessid)
    token = tokenmatch.group(1)
    # NOTE(review): this writes live session credentials to the debug log.
    log.debug("phpsessid: " + phpsessid + " Token: "+ token)
    return {'phpsessid': phpsessid, "csrftoken": token}
def _collectNPMDomainNames(entries, include_disabled):
    """Flatten the ``domain_names`` of NPM host entries into a single list."""
    names = []
    for index, entry in enumerate(entries):
        if include_disabled or entry['enabled'] == 1:
            log.debug("Adding indice " + str(index) + " to list, containing "+ str(entry['domain_names']))
            names.extend(entry['domain_names'])
        else:
            log.debug("Not adding indice " + str(index) + "to list.")
    return names


def _dropIPAddressHosts(hosts, type):
    """Return *hosts* without the entries that are literal IP addresses."""
    kept = []
    for host in hosts:
        if checkIfIP(host):
            log.info("Deleting IP address "+ host + " from the list of " + type + " hosts.")
        else:
            kept.append(host)
    return kept


def getNPMHosts(apiurl, type):
    """Retrieve the domain names of one kind of host from Nginx Proxy Manager.

    Logs in with the ``npmemail`` / ``npmpassword`` values from the config,
    queries the endpoint matching *type* and collects every entry's
    ``domain_names``. Disabled entries are skipped unless ``addDisabled`` is
    set in the config, and every name that is a literal IP address is removed.

    Args:
        apiurl: Host name of the NPM instance. ``http://`` is prepended when
            no scheme is present, and the ``npmadminport`` config value is
            appended as the port when it is set.
        type: ``"proxy"``, ``"redir"``, or ``"404"`` / ``"dead"``. (The name
            shadows the builtin but is kept for caller compatibility.)

    Returns:
        A list of domain names, or None when *type* is unknown, the login
        yields no token, or the API does not answer with a JSON 200 response.
    """
    log.debug("Type is set to "+str(type)+".")

    endpoints = {
        "proxy": ("proxy hosts", "/api/nginx/proxy-hosts"),
        "redir": ("redirection hosts", "/api/nginx/redirection-hosts"),
        "404": ("404 (dead) hosts", "/api/nginx/dead-hosts"),
        "dead": ("404 (dead) hosts", "/api/nginx/dead-hosts"),
    }
    endpoint = endpoints.get(type) if isinstance(type, str) else None
    if endpoint is None:
        log.error("No type of hosts to retrieve from NPM was specified.")
        return None
    description, path = endpoint

    # Build the base URL. The scheme is only added when missing so that both
    # bare host names and full URLs from the config keep working.
    if "://" not in apiurl:
        apiurl = "http://" + apiurl
    port = cfg.get("npmadminport")
    if port is not None and str(port) != "":
        log.debug("Added port "+ str(port) + " to the URL")
        apiurl = apiurl + ":" + str(port)

    log.info("Retrieving " + description + " from NPM API...")
    url = apiurl + path
    log.debug("Set hosts API URL to " + url + ".")

    apikey = loginToNPM(apiurl, cfg["npmemail"], cfg["npmpassword"])
    if apikey is None:
        # loginToNPM already logged the reason; without a token the request
        # below could only fail.
        log.error("No API token was returned by Nginx Proxy Manager, so the " + type + " hosts cannot be retrieved.")
        return None

    apireq = requests.get(url, headers={'Authorization': "Bearer "+apikey})

    # Accept any JSON content type regardless of the charset suffix.
    contenttype = apireq.headers.get('content-type', '')
    if not contenttype.lower().startswith("application/json"):
        log.error("The API did not return a JSON, and instead a content type of "+str(contenttype)+".")
        log.error("Please make sure your Nginx Proxy Manager API key or URL in conf.json is correct and accurate.")
        return None
    log.debug("Passed content-type = application/json check.")

    if apireq.status_code == 200:
        log.debug("API returned a 200, proceeding.")
        hostlist = _collectNPMDomainNames(apireq.json(), cfg.get('addDisabled', False))
        log.debug(type + " list has been created.")
        # Remove every IP address, not just the first one found.
        hostlist = _dropIPAddressHosts(hostlist, type)
        log.info(type + " hosts have been retrieved. Count: " + str(len(hostlist)))
        return hostlist

    if apireq.status_code == 403:  # Forbidden / permission denied
        log.debug("API returned a 403, halting!")
        log.error("The API retured a permission denied error!")
        log.error("Please make sure your Nginx Proxy Manager API key in conf.json is correct and accurate.")
        return None

    if apireq.status_code == 401:  # Unauthorized
        log.debug("API returned a 401, halting!")
        log.error("The API retured an unauthorized error!")
        log.error("Please make sure your Nginx Proxy Manager API key in conf.json is correct and accurate.")
        return None

    log.error("The API returned a " + str(apireq.status_code)+".")
    log.error("Please make sure your Nginx Proxy Manager API key or URL in conf.json is correct and accurate.")
    return None
def filterPiHoleHosts(targetsvr, list, filter):
    """Remove every host that contains one of the filtered domains.

    Args:
        targetsvr: Not used by this function; kept for caller compatibility.
        list: The host names to filter. The list is modified in place and
            also returned. (The name shadows the builtin but is part of the
            existing signature.)
        filter: Comma-separated domains, or None for "no filtering". Spaces
            are stripped and empty entries are ignored.

    Returns:
        The same list object, without the hosts that matched a filter entry.
    """
    if filter is not None:
        # Drop empty entries (e.g. from a trailing comma or an empty string):
        # '"" in host' is true for every host and would empty the whole list.
        filterlist = [entry for entry in filter.replace(" ", "").split(",") if entry]
        for domain in filterlist:
            log.debug("Filtering domain " + domain)
            kept = []
            for host in list:
                log.debug("Host " + host + " Filter " + domain)
                if domain in host:
                    log.info("Removing host " + host + " from the filtered host list due to the filtered domain " + domain + ".")
                else:
                    log.debug("The domain " + host + "does not contain the domain " + domain)
                    kept.append(host)
            # Slice assignment keeps the caller's list object, as before.
            list[:] = kept
            log.debug(domain + " domain was filtered, moving on to next one")
    log.info("Domains were filtered. Adding the list to the server...")
    return list
def addPiHoleHosts(apiurl, phpassword, targetsvr, list, session):
    """Create a CNAME record pointing at *targetsvr* for every host in *list*.

    Uses the Pi-hole API for version 6.0 and newer: logs in via
    ``loginToPihole`` and then sends one PUT request per host to
    ``http://<apiurl>/api/config/dns/cnameRecords/<host>,<targetsvr>``.

    Args:
        apiurl: Host (without scheme) of the Pi-hole server.
        phpassword: The Pi-hole password.
        targetsvr: The CNAME target for every record.
        list: Host names to add. (The name shadows the builtin but is part of
            the existing signature.)
        session: Passed to ``loginToPihole`` for the login request.

    Returns:
        None. The outcome for each host is logged.
    """
    baseurl = "http://" + apiurl + "/api/config/dns/cnameRecords/"
    hostname = apiurl
    piauth = loginToPihole(apiurl, phpassword, session)
    if piauth is None:
        # loginToPihole already logged the reason; nothing can be added
        # without a session.
        log.error("Could not log in to Pi-hole at " + hostname + "; no CNAME records were added.")
        return

    cookies = {"sid": piauth["phpsessid"]}
    headers = {"X-CSRF-TOKEN": piauth["csrftoken"]}
    for i in list:
        log.debug("Adding host " + i + " to " + hostname + " Pi-hole CNAME list.")
        url = baseurl + i + "," + targetsvr
        apireq = requests.put(url, cookies=cookies, headers=headers)

        if apireq.status_code == 201:  # "created object"
            log.info("Added " + i + " to " + hostname + " CNAME list.")
            log.debug("PiHole API returned true. Message returned: "+ apireq.text)
            continue

        # Only look into the body for a 400, and tolerate a body that is not
        # the expected JSON error structure.
        errormessage = ""
        if apireq.status_code == 400:
            try:
                errormessage = str(apireq.json()['error']['message'])
            except (ValueError, KeyError, TypeError):
                errormessage = ""

        if "Item already present" in errormessage:
            log.info("There is already a CNAME record for " + i)
        else:
            log.error("Pi-hole returned this " + str(apireq.status_code) + " in error: "+ apireq.text)
def addPiHoleHostsOld(apiurl, phpassword, targetsvr, list):
    """Create CNAME records through the legacy (pre-6.0) Pi-hole web script.

    Logs in via ``loginToPiholeOld`` and posts one ``add`` action per host to
    ``http://<apiurl>/scripts/pi-hole/php/customcname.php``.

    Args:
        apiurl: Host (without scheme) of the Pi-hole server.
        phpassword: The Pi-hole web password.
        targetsvr: The CNAME target for every record.
        list: Host names to add. (The name shadows the builtin but is part of
            the existing signature.)

    Returns:
        None. The outcome for each host is logged.
    """
    url = "http://" + apiurl + "/scripts/pi-hole/php/customcname.php"
    hostname = apiurl
    piauth = loginToPiholeOld(apiurl, phpassword)
    cookies = {"PHPSESSID": piauth["phpsessid"]}
    for i in list:
        payload = {"action": "add", "domain": i, "target": targetsvr, "token": piauth["csrftoken"]}
        apireq = requests.post(url, data=payload, cookies=cookies)
        if apireq.status_code != 200:
            # Previously ignored silently; report it so failures are visible.
            log.error("Pi-hole returned this " + str(apireq.status_code) + " in error: "+ apireq.text)
            continue

        log.debug("Adding host " + i + " to " + hostname + " Pi-hole CNAME list.")
        try:
            response = apireq.json()
            succeeded = response['success']
        except (ValueError, KeyError, TypeError):
            # Body was not JSON, or not the expected {"success": ...} object.
            log.error("Pi-hole returned this message and was not JSON: "+ apireq.text)
            continue
        log.debug(response)

        message = str(response.get('message', ''))
        if succeeded:
            log.info("Added " + i + " to " + hostname + " CNAME list.")
            log.debug("PiHole API returned true. Message returned: "+ message)
        elif "There is already" in message:
            log.info("There is already a CNAME record for " + i)
        else:
            log.debug("Pi-Hole API returned false!")
            log.warning("The Pi-Hole API gave the following message:"+ message)
def removePiHoleHosts(apiurl, phpassword, targetsvr, list, session):
    """Delete the CNAME record pointing at *targetsvr* for every host in *list*.

    Uses the Pi-hole API for version 6.0 and newer: logs in via
    ``loginToPihole`` and then sends one DELETE request per host to
    ``http://<apiurl>/api/config/dns/cnameRecords/<host>,<targetsvr>``.
    (The previous body was a copy of ``addPiHoleHosts`` and sent PUT, which
    added the records instead of removing them.)

    Args:
        apiurl: Host (without scheme) of the Pi-hole server.
        phpassword: The Pi-hole password.
        targetsvr: The CNAME target of the records to delete.
        list: Host names to remove. (The name shadows the builtin but is part
            of the existing signature.)
        session: Passed to ``loginToPihole`` for the login request.

    Returns:
        None. The outcome for each host is logged.
    """
    baseurl = "http://" + apiurl + "/api/config/dns/cnameRecords/"
    hostname = apiurl
    piauth = loginToPihole(apiurl, phpassword, session)
    if piauth is None:
        # loginToPihole already logged the reason; nothing can be removed
        # without a session.
        log.error("Could not log in to Pi-hole at " + hostname + "; no CNAME records were removed.")
        return

    cookies = {"sid": piauth["phpsessid"]}
    headers = {"X-CSRF-TOKEN": piauth["csrftoken"]}
    for i in list:
        log.debug("Removing host " + i + " from " + hostname + " Pi-hole CNAME list.")
        url = baseurl + i + "," + targetsvr
        apireq = requests.delete(url, cookies=cookies, headers=headers)
        # NOTE(review): the status codes below follow the Pi-hole v6 API docs
        # for DELETE /config/{element}/{value} (204 deleted, 404 not found) -
        # confirm against the server version in use.
        if apireq.status_code in (200, 204):
            log.info("Removed " + i + " from " + hostname + " CNAME list.")
        elif apireq.status_code == 404:
            log.info("There is no CNAME record for " + i)
        else:
            log.error("Pi-hole returned this " + str(apireq.status_code) + " in error: "+ apireq.text)
def removePiHoleHostsOld(apiurl, phpassword, targetsvr, list):
    """Delete CNAME records through the legacy (pre-6.0) Pi-hole web script.

    Logs in via ``loginToPiholeOld`` and posts one ``delete`` action per host
    to ``http://<apiurl>/scripts/pi-hole/php/customcname.php``, mirroring
    ``addPiHoleHostsOld``.

    Args:
        apiurl: Host (without scheme) of the Pi-hole server.
        phpassword: The Pi-hole web password.
        targetsvr: The CNAME target of the records to delete.
        list: Host names to remove. (The name shadows the builtin but is part
            of the existing signature.)

    Returns:
        None. The outcome for each host is logged.
    """
    # Same URL scheme as addPiHoleHostsOld / loginToPiholeOld, which both
    # expect a bare host name.
    url = "http://" + apiurl + "/scripts/pi-hole/php/customcname.php"
    # The legacy script needs the legacy login (PHPSESSID cookie + page token).
    piauth = loginToPiholeOld(apiurl, phpassword)
    cookies = {"PHPSESSID": piauth["phpsessid"]}
    for i in list:
        log.info("Removing host "+ i + " from Pi-hole's CNAME list.")
        payload = {"action": "delete", "domain": i, "target": targetsvr, "token": piauth["csrftoken"]}
        apireq = requests.post(url, data=payload, cookies=cookies)
        if apireq.status_code != 200:
            log.error("Pi-hole returned this " + str(apireq.status_code) + " in error: "+ apireq.text)
            continue

        try:
            response = apireq.json()
            succeeded = response['success']
        except (ValueError, KeyError, TypeError):
            # Body was not JSON, or not the expected {"success": ...} object.
            log.error("Pi-hole returned this message: "+ apireq.text)
            continue

        # "success" is handled as a JSON boolean, as in addPiHoleHostsOld.
        message = str(response.get('message', ''))
        if succeeded:
            log.debug("PiHole API returned true. Message returned: "+ message)
        else:
            log.debug("Pi-Hole API returned false!")
            log.warning("The Pi-Hole API gave the following message:"+ message)
def _syncFilteredPihole(allhosts, add_hosts):
    """Filter the host list and push the result to the "filtered" Pi-hole server.

    Args:
        allhosts: All host names retrieved from NPM; left unmodified.
        add_hosts: Callable ``(piholeurl, password, target, hosts)`` that adds
            the CNAME records (old or new Pi-hole API).
    """
    filterlist = cfg.get("filterlist")
    if not filterlist:
        log.error("There is nothing in the filter!")
        log.error("Make sure you put at least 1 (one) domain in the filter list.")
        return
    filteredurl = cfg.get("filteredpiholeurl")
    if not filteredurl:
        log.error("The filter server option is enabled, but no \"filteredpiholeurl\" is set in conf.json.")
        return

    log.info("Filtering all hosts for filtered Pi-hole server " + filteredurl)
    # filterPiHoleHosts edits its argument in place, so hand it a copy.
    filteredhosts = filterPiHoleHosts(cfg["npmdnshostname"], allhosts[:], filterlist)
    log.info("Adding all hosts in the list to filtered Pi-Hole server at "+ filteredurl + "...")
    # Fall back to the main Pi-hole password when no separate one is configured.
    password = cfg.get("filteredpiholepass") or cfg["piholepass"]
    add_hosts(filteredurl, password, cfg["npmdnshostname"], filteredhosts)
    log.info("Succeeded in adding all hosts in the list to filtered Pi-Hole server at "+ filteredurl)


log.debug("Script has started!")
log.info("Getting all NPM hosts.")
proxyhosts = getNPMHosts(cfg['npmdnshostname'],"proxy") # Get All Proxy Hosts
redirhosts = getNPMHosts(cfg['npmdnshostname'],"redir") # Get All Redir Hosts
deadhosts = getNPMHosts(cfg['npmdnshostname'],"dead") # Get All 404 Hosts
session = requests.Session()
if proxyhosts is None or redirhosts is None or deadhosts is None:
    log.error("One (or more) of the lists of hosts returned None. Check above for any errors while retriving hosts from NPM.")
    # Fatal: leave with a non-zero status so callers can detect the failure.
    raise SystemExit(1)
log.info("NPM Hosts were retrieved!")
log.debug("Adding all the hosts together")
# The dead (404) hosts are only included when "removeDead" is false in the
# config. The explicit comparison is kept so non-boolean config values behave
# exactly as before.
if cfg['removeDead'] == False:
    allhosts = proxyhosts + redirhosts + deadhosts
else:
    allhosts = proxyhosts + redirhosts
log.info("Adding all hosts in the list to specified Pi-Hole server at "+ cfg["piholeurl"] + "...")

# Pick the add routine once; the new API additionally needs the session.
# As above, the explicit comparison keeps the behaviour for non-boolean values.
if cfg["phOldVersion"] == True:
    add_hosts = addPiHoleHostsOld
else:
    def add_hosts(piholeurl, password, target, hosts):
        addPiHoleHosts(piholeurl, password, target, hosts, session)

add_hosts(cfg["piholeurl"], cfg["piholepass"], cfg["npmdnshostname"], allhosts)
log.info("Succeeded in adding all hosts in the list to specified Pi-Hole server at "+ cfg["piholeurl"])
# Accept both the historical string "True" and a JSON boolean true.
if cfg.get('filterenabled') in (True, "True"):
    log.debug("The filter server option was enabled.")
    _syncFilteredPihole(allhosts, add_hosts)
log.debug("Script ending")