Managing firewall rules on the fly using Python

Parsing raw text logs for IPs and automatically creating firewall rules during engagements


xPosed.py

#!/usr/bin/python3

#Author: Shain Lakin

import re
import logging
import argparse
from os import system
from itertools import cycle
from shutil import get_terminal_size
from threading import Thread
from time import sleep
from getpass import getpass
from subprocess import PIPE, Popen
from collections import Counter

# Append bare messages (no timestamp or level prefix) to xposed.log.
logging.basicConfig(
    format='%(message)s',
    filemode="a",
    filename="xposed.log",
    level=logging.INFO,
)
# Ask for the sudo password once, up front; proc() later writes it to the
# stdin of every command it launches.
password = getpass("sudo password: ")
# Start from an empty terminal.
system('clear')

# ASCII-art title printed once at startup.
# The final row is deliberately empty: it used to be a whitespace-only row far
# wider than a normal terminal, which wrapped into several stray blank lines.
banner = """
██╗  ██╗██████╗  ██████╗ ███████╗███████╗██████╗ 
╚██╗██╔╝██╔══██╗██╔═══██╗██╔════╝██╔════╝██╔══██╗
 ╚███╔╝ ██████╔╝██║   ██║███████╗█████╗  ██║  ██║
 ██╔██╗ ██╔═══╝ ██║   ██║╚════██║██╔══╝  ██║  ██║
██╔╝ ██╗██║     ╚██████╔╝███████║███████╗██████╔╝
╚═╝  ╚═╝╚═╝      ╚═════╝ ╚══════╝╚══════╝╚═════╝ 

"""
print(banner)

# Command-line interface.
# argparse %-formats every help string, so a literal percent sign must be
# written as "%%"; a bare "%" makes help rendering raise ValueError.
parser = argparse.ArgumentParser(
    description=(
        "xPosed parses text logs for IP addresses and "
        "adds a deny rule to UFW if the IP is seen more than the user defined number of allowed times. "
        "Can also run a custom command instead of modifying the firewall."
    ),
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# Restricting the choices turns a typo in --mode into an immediate usage error
# instead of a monitor loop that silently never acts.
parser.add_argument("-m", "--mode", type=str, default="deny", choices=("deny", "allow", "map"), help="Set the mode to create either deny or allow rules. Map mode uses scapy to passively map the network.")
parser.add_argument("-l", "--logfile", type=str, required=True, help="Path to logfile to parse.")
parser.add_argument("-w", "--whitelist", type=str, required=True, nargs="+", help="Whitelist IP addresses from automated actions.")
parser.add_argument("-a", "--allow", type=str, nargs="+", help="Allow IP addresses.")
parser.add_argument("-b", "--blacklist", type=str, nargs="+", help="Blacklisted IP addresses.")
parser.add_argument("-p", "--ports", type=str, help="Allowed ports.")
parser.add_argument("-x", "--count", default=10, type=int, help="Number of times ip can be seen before either being banned/allowed or a custom action is taken.")
parser.add_argument("-c", "--custom", type=str, help="Run a custom command instead of ufw. (%% = IP address) Example: -c 'nmap -p 80,443 -oN %% %%'")
# Parse the command line once at import time. The resulting namespace is the
# global `args` read by the functions below; custom(), deny() and allow() also
# append handled addresses to args.whitelist.
args = parser.parse_args()


# ufw command prefixes, all run through `sudo -S`. Each ends with a space so
# the caller can append the target (a port spec or an IP address) directly.
_UFW = "sudo -S ufw"
_ANY_PROTO_FROM = "proto any from "

allowp = f"{_UFW} allow proto tcp from any to any port "
allow_insert = f"{_UFW} insert 1 allow {_ANY_PROTO_FROM}"
deny_insert = f"{_UFW} insert 1 deny {_ANY_PROTO_FROM}"
s_allow = f"{_UFW} allow {_ANY_PROTO_FROM}"
s_deny = f"{_UFW} deny {_ANY_PROTO_FROM}"


class Loader:
    """Terminal spinner drawn by a background thread.

    Usable either explicitly (``loader = Loader(...).start()`` followed by
    ``loader.stop()``) or as a context manager (``with Loader(...) as loader:``).
    An instance can be started only once because it owns a single thread.
    """

    def __init__(self, desc="Loading...", end="Done!...", timeout=0.1):
        """
        A loader-like context manager

        Args:
            desc (str, optional): The loader's description. Defaults to "Loading...".
            end (str, optional): Final print. Defaults to "Done!...".
            timeout (float, optional): Sleep time between prints. Defaults to 0.1.
        """
        self.desc = desc
        self.end = end
        self.timeout = timeout

        # Daemon thread: a spinner that is never stopped must not keep the
        # interpreter alive at exit.
        self._thread = Thread(target=self._animate, daemon=True)
        self.steps = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"]
        self.done = False

    def start(self):
        """Start the animation thread and return this loader."""
        self._thread.start()
        return self

    def _animate(self):
        """Redraw the description with the next spinner frame until stopped."""
        for c in cycle(self.steps):
            if self.done:
                break
            print(f"\r{self.desc} {c}", flush=True, end="")
            sleep(self.timeout)

    def __enter__(self):
        # Return the loader so `with Loader(...) as loader:` binds the
        # instance rather than None.
        return self.start()

    def stop(self):
        """Stop the animation, blank the spinner line and print the end text."""
        self.done = True
        # Let the animation thread finish its current frame first; otherwise
        # it can draw one more frame after the final message.
        if self._thread.is_alive():
            self._thread.join()
        cols = get_terminal_size((80, 20)).columns
        print("\r" + " " * cols, end="", flush=True)
        print(f"\r{self.end}", flush=True)

    def __exit__(self, exc_type, exc_value, tb):
        # Returns None, so an exception raised inside the `with` body propagates.
        self.stop()


def proc(cmd):
    """Run *cmd* without a shell and write the sudo password to its stdin.

    Args:
        cmd: The command line as one string. It is split with shell-like
            quoting rules, so a quoted argument containing spaces stays a
            single argument.

    Returns:
        The exit status of the child process. The child's stdout and stderr
        are captured and discarded.
    """
    # Local import keeps this function self-contained; plain str.split() would
    # tear quoted arguments of user-supplied custom commands apart.
    import shlex

    try:
        child = Popen(shlex.split(str(cmd)),
                      stdin=PIPE, stdout=PIPE, stderr=PIPE)
        # NOTE(review): the password is written to the stdin of every command,
        # including user-supplied custom ones that do not go through sudo.
        child.communicate(password.encode())
        return child.returncode
    except KeyboardInterrupt:
        reset()

def reset():
    """Offer to wipe the ufw rule set, then terminate the program.

    The reset is destructive, so it runs only on an explicit "y" or "yes"
    (case-insensitive). Any other answer, including a bare Enter, "N" or
    "no", leaves the firewall untouched.

    Raises:
        SystemExit: Always, with exit status 0.
    """
    answer = input("\nReset firewall rules? (y/n): ").strip().lower()
    if answer in ("y", "yes"):
        proc("sudo -S ufw --force reset")
        # Short pause before exiting; presumably gives ufw time to settle.
        sleep(2)
    raise SystemExit(0)


# Loose matcher that pulls dotted-quad candidates out of a log line.
_IP_CANDIDATE = re.compile(r'[0-9]+(?:\.[0-9]+){3}')
# Strict matcher: four octets, each in the range 0-255.
_IPV4 = re.compile(r'''((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)''')


def process_log(log):
    """Return every valid IPv4 address found in the file at *log*.

    Args:
        log: Path of the text log to scan.

    Returns:
        A list of address strings in order of appearance, duplicates
        included, so the caller can count occurrences.
    """
    try:
        # Logs may carry arbitrary bytes; replace undecodable ones instead of
        # letting a single bad line crash the monitor.
        with open(log, 'r', errors="replace") as handle:
            valid = []
            for line in handle:
                valid.extend(
                    candidate
                    for candidate in _IP_CANDIDATE.findall(line)
                    # fullmatch, not search: search accepted "999.1.1.1"
                    # because the substring "99.1.1.1" is a valid address.
                    if _IPV4.fullmatch(candidate)
                )
            return valid
    except KeyboardInterrupt:
        reset()

def custom(ip):
    """Run the user-supplied command for *ip* and mark the address as handled.

    Every "%" in ``args.custom`` is replaced by the address. The expanded
    command is executed, the address is appended to the whitelist, and the
    command is written to the log. Ctrl-C hands over to reset().
    """
    try:
        command = args.custom.replace("%", f"{ip}")
        spinner = Loader(f"Executing : {command}...", f"Executed : {command}")
        spinner.start()
        proc(command)
        # Whitelisting is what stops the main loop from acting on this
        # address again.
        args.whitelist.append(ip)
        logging.info(command)
        spinner.stop()
    except KeyboardInterrupt:
        reset()


def map(ip=None):
    """Placeholder for the passive network-mapping mode.

    Args:
        ip: Ignored. Accepted so the function can be called with or without
            the offending address.

    Raises:
        SystemExit: Always, with exit status 0, after announcing that the
            mode is not implemented.
    """
    print("[!] Not yet implemented [!]\nExiting...")
    raise SystemExit(0)


def setup():
    """Enable ufw and install the rules requested on the command line.

    Order: enable the firewall, open the requested ports (if any), then
    apply the explicit allow list followed by the blacklist.
    """
    proc("sudo -S ufw enable")

    port_spec = args.ports
    if port_spec:
        spinner = Loader(f"Creating allow rule for : {port_spec}", f"Allowed : {port_spec}")
        spinner.start()
        proc(allowp + port_spec)
        sleep(1)
        spinner.stop()

    # Both options are None when not supplied on the command line.
    for address in args.allow or ():
        allow(address)
    for address in args.blacklist or ():
        deny(address)

def deny(ip):
    """Add a ufw deny rule for *ip*, then record the address as handled.

    When ports were opened with --ports the rule is inserted at position 1;
    otherwise it is added as a plain rule. Ctrl-C hands over to reset().
    """
    try:
        spinner = Loader(f"Blocking : {ip}", f"Denied : {ip}")
        spinner.start()
        rule_prefix = deny_insert if args.ports else s_deny
        proc(rule_prefix + ip)
        sleep(1)
        # Whitelisting stops the main loop from blocking the address again.
        args.whitelist.append(ip)
        logging.info(ip)
        spinner.stop()
    except KeyboardInterrupt:
        reset()


def allow(ip):
    """Add a ufw allow rule for *ip*, then record the address as handled.

    When ports were opened with --ports the rule is inserted at position 1;
    otherwise it is added as a plain rule. Ctrl-C hands over to reset().
    """
    try:
        spinner = Loader(f"Allowing : {ip}", f"Allowed : {ip}")
        spinner.start()
        rule_prefix = allow_insert if args.ports else s_allow
        proc(rule_prefix + ip)
        sleep(1)
        # Whitelisting stops the main loop from allowing the address again.
        args.whitelist.append(ip)
        logging.info(ip)
        spinner.stop()
    except KeyboardInterrupt:
        reset()


def main():
    """Apply the initial rules, then watch the log file forever.

    Each pass re-reads the whole log, counts how often every address occurs
    and acts on each address seen more than ``args.count`` times that is not
    yet whitelisted. A custom command, when given, runs instead of the
    firewall action, as the command-line help states. Ctrl-C anywhere in the
    loop leads to the reset prompt.
    """
    setup()
    try:
        while True:
            data = process_log(args.logfile)
            sleep(1)
            for ip, seen in Counter(data).items():
                # Skip handled addresses before sleeping, so a long whitelist
                # does not add a second of delay per entry on every pass.
                if seen <= args.count or ip in args.whitelist:
                    continue
                sleep(1)
                if args.custom:
                    custom(ip)
                elif args.mode == "deny":
                    deny(ip)
                elif args.mode == "allow":
                    allow(ip)
                elif args.mode == "map":
                    # map() is a placeholder that takes no address.
                    map()
    except KeyboardInterrupt:
        reset()


if __name__ == "__main__":
    main()
******
Written by Shain Lakin on 29 October 2022