| #!/usr/bin/env python3 |
| |
| import re |
| import os |
| import sys |
| import time |
| import atexit |
| import signal |
| import ipaddress |
| from collections import Counter |
| from random import randint |
| from threading import Thread |
| from threading import Lock |
| import redis |
| import json |
| import iptc |
| import dns.resolver |
| import dns.exception |
| |
| while True: |
| try: |
| redis_slaveof_ip = os.getenv('REDIS_SLAVEOF_IP', '') |
| redis_slaveof_port = os.getenv('REDIS_SLAVEOF_PORT', '') |
| if "".__eq__(redis_slaveof_ip): |
| r = redis.StrictRedis(host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249', decode_responses=True, port=6379, db=0) |
| else: |
| r = redis.StrictRedis(host=redis_slaveof_ip, decode_responses=True, port=redis_slaveof_port, db=0) |
| r.ping() |
| except Exception as ex: |
| print('%s - trying again in 3 seconds' % (ex)) |
| time.sleep(3) |
| else: |
| break |
| |
| pubsub = r.pubsub() |
| |
| WHITELIST = [] |
| BLACKLIST= [] |
| |
| bans = {} |
| |
| quit_now = False |
| exit_code = 0 |
| lock = Lock() |
| |
| def log(priority, message): |
| tolog = {} |
| tolog['time'] = int(round(time.time())) |
| tolog['priority'] = priority |
| tolog['message'] = message |
| r.lpush('NETFILTER_LOG', json.dumps(tolog, ensure_ascii=False)) |
| print(message) |
| |
| def logWarn(message): |
| log('warn', message) |
| |
| def logCrit(message): |
| log('crit', message) |
| |
| def logInfo(message): |
| log('info', message) |
| |
| def refreshF2boptions(): |
| global f2boptions |
| global quit_now |
| global exit_code |
| if not r.get('F2B_OPTIONS'): |
| f2boptions = {} |
| f2boptions['ban_time'] = int |
| f2boptions['max_attempts'] = int |
| f2boptions['retry_window'] = int |
| f2boptions['netban_ipv4'] = int |
| f2boptions['netban_ipv6'] = int |
| f2boptions['ban_time'] = r.get('F2B_BAN_TIME') or 1800 |
| f2boptions['max_attempts'] = r.get('F2B_MAX_ATTEMPTS') or 10 |
| f2boptions['retry_window'] = r.get('F2B_RETRY_WINDOW') or 600 |
| f2boptions['netban_ipv4'] = r.get('F2B_NETBAN_IPV4') or 32 |
| f2boptions['netban_ipv6'] = r.get('F2B_NETBAN_IPV6') or 128 |
| r.set('F2B_OPTIONS', json.dumps(f2boptions, ensure_ascii=False)) |
| else: |
| try: |
| f2boptions = {} |
| f2boptions = json.loads(r.get('F2B_OPTIONS')) |
| except ValueError: |
| print('Error loading F2B options: F2B_OPTIONS is not json') |
| quit_now = True |
| exit_code = 2 |
| |
| def refreshF2bregex(): |
| global f2bregex |
| global quit_now |
| global exit_code |
| if not r.get('F2B_REGEX'): |
| f2bregex = {} |
| f2bregex[1] = 'mailcow UI: Invalid password for .+ by ([0-9a-f\.:]+)' |
| f2bregex[2] = 'Rspamd UI: Invalid password by ([0-9a-f\.:]+)' |
| f2bregex[3] = 'warning: .*\[([0-9a-f\.:]+)\]: SASL .+ authentication failed: (?!.*Connection lost to authentication server).+' |
| f2bregex[4] = 'warning: non-SMTP command from .*\[([0-9a-f\.:]+)]:.+' |
| f2bregex[5] = 'NOQUEUE: reject: RCPT from \[([0-9a-f\.:]+)].+Protocol error.+' |
| f2bregex[6] = '-login: Disconnected.+ \(auth failed, .+\): user=.*, method=.+, rip=([0-9a-f\.:]+),' |
| f2bregex[7] = '-login: Aborted login.+ \(auth failed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+' |
| f2bregex[8] = '-login: Aborted login.+ \(tried to use disallowed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+' |
| f2bregex[9] = 'SOGo.+ Login from \'([0-9a-f\.:]+)\' for user .+ might not have worked' |
| f2bregex[10] = '([0-9a-f\.:]+) \"GET \/SOGo\/.* HTTP.+\" 403 .+' |
| r.set('F2B_REGEX', json.dumps(f2bregex, ensure_ascii=False)) |
| else: |
| try: |
| f2bregex = {} |
| f2bregex = json.loads(r.get('F2B_REGEX')) |
| except ValueError: |
| print('Error loading F2B options: F2B_REGEX is not json') |
| quit_now = True |
| exit_code = 2 |
| |
| if r.exists('F2B_LOG'): |
| r.rename('F2B_LOG', 'NETFILTER_LOG') |
| |
| def mailcowChainOrder(): |
| global lock |
| global quit_now |
| global exit_code |
| while not quit_now: |
| time.sleep(10) |
| with lock: |
| filter4_table = iptc.Table(iptc.Table.FILTER) |
| filter4_table.refresh() |
| for f in [filter4_table]: |
| forward_chain = iptc.Chain(f, 'FORWARD') |
| input_chain = iptc.Chain(f, 'INPUT') |
| for chain in [forward_chain, input_chain]: |
| target_found = False |
| for position, item in enumerate(chain.rules): |
| if item.target.name == 'MAILCOW': |
| target_found = True |
| if position > 2: |
| logCrit('Error in %s chain order: MAILCOW on position %d, restarting container' % (chain.name, position)) |
| quit_now = True |
| exit_code = 2 |
| if not target_found: |
| logCrit('Error in %s chain: MAILCOW target not found, restarting container' % (chain.name)) |
| quit_now = True |
| exit_code = 2 |
| |
| def ban(address): |
| global lock |
| refreshF2boptions() |
| BAN_TIME = int(f2boptions['ban_time']) |
| MAX_ATTEMPTS = int(f2boptions['max_attempts']) |
| RETRY_WINDOW = int(f2boptions['retry_window']) |
| NETBAN_IPV4 = '/' + str(f2boptions['netban_ipv4']) |
| NETBAN_IPV6 = '/' + str(f2boptions['netban_ipv6']) |
| |
| ip = ipaddress.ip_address(address) |
| if type(ip) is ipaddress.IPv6Address and ip.ipv4_mapped: |
| ip = ip.ipv4_mapped |
| address = str(ip) |
| if ip.is_private or ip.is_loopback: |
| return |
| |
| self_network = ipaddress.ip_network(address) |
| |
| with lock: |
| temp_whitelist = set(WHITELIST) |
| |
| if temp_whitelist: |
| for wl_key in temp_whitelist: |
| wl_net = ipaddress.ip_network(wl_key, False) |
| if wl_net.overlaps(self_network): |
| logInfo('Address %s is whitelisted by rule %s' % (self_network, wl_net)) |
| return |
| |
| net = ipaddress.ip_network((address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False) |
| net = str(net) |
| |
| if not net in bans or time.time() - bans[net]['last_attempt'] > RETRY_WINDOW: |
| bans[net] = { 'attempts': 0 } |
| active_window = RETRY_WINDOW |
| else: |
| active_window = time.time() - bans[net]['last_attempt'] |
| |
| bans[net]['attempts'] += 1 |
| bans[net]['last_attempt'] = time.time() |
| |
| active_window = time.time() - bans[net]['last_attempt'] |
| |
| if bans[net]['attempts'] >= MAX_ATTEMPTS: |
| cur_time = int(round(time.time())) |
| logCrit('Banning %s for %d minutes' % (net, BAN_TIME / 60)) |
| if type(ip) is ipaddress.IPv4Address: |
| with lock: |
| chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW') |
| rule = iptc.Rule() |
| rule.src = net |
| target = iptc.Target(rule, "REJECT") |
| rule.target = target |
| if rule not in chain.rules: |
| chain.insert_rule(rule) |
| else: |
| pass |
| r.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + BAN_TIME) |
| else: |
| logWarn('%d more attempts in the next %d seconds until %s is banned' % (MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net)) |
| |
| def unban(net): |
| global lock |
| if not net in bans: |
| logInfo('%s is not banned, skipping unban and deleting from queue (if any)' % net) |
| r.hdel('F2B_QUEUE_UNBAN', '%s' % net) |
| return |
| logInfo('Unbanning %s' % net) |
| if type(ipaddress.ip_network(net)) is ipaddress.IPv4Network: |
| with lock: |
| chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW') |
| rule = iptc.Rule() |
| rule.src = net |
| target = iptc.Target(rule, "REJECT") |
| rule.target = target |
| if rule in chain.rules: |
| chain.delete_rule(rule) |
| else: |
| pass |
| r.hdel('F2B_ACTIVE_BANS', '%s' % net) |
| r.hdel('F2B_QUEUE_UNBAN', '%s' % net) |
| if net in bans: |
| del bans[net] |
| |
| def permBan(net, unban=False): |
| global lock |
| if type(ipaddress.ip_network(net, strict=False)) is ipaddress.IPv4Network: |
| with lock: |
| chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW') |
| rule = iptc.Rule() |
| rule.src = net |
| target = iptc.Target(rule, "REJECT") |
| rule.target = target |
| if rule not in chain.rules and not unban: |
| logCrit('Add host/network %s to blacklist' % net) |
| chain.insert_rule(rule) |
| r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time()))) |
| elif rule in chain.rules and unban: |
| logCrit('Remove host/network %s from blacklist' % net) |
| chain.delete_rule(rule) |
| r.hdel('F2B_PERM_BANS', '%s' % net) |
| else: |
| pass |
| |
| def quit(signum, frame): |
| global quit_now |
| quit_now = True |
| |
| def clear(): |
| global lock |
| logInfo('Clearing all bans') |
| for net in bans.copy(): |
| unban(net) |
| with lock: |
| filter4_table = iptc.Table(iptc.Table.FILTER) |
| for filter_table in [filter4_table]: |
| filter_table.autocommit = False |
| forward_chain = iptc.Chain(filter_table, "FORWARD") |
| input_chain = iptc.Chain(filter_table, "INPUT") |
| mailcow_chain = iptc.Chain(filter_table, "MAILCOW") |
| if mailcow_chain in filter_table.chains: |
| for rule in mailcow_chain.rules: |
| mailcow_chain.delete_rule(rule) |
| for rule in forward_chain.rules: |
| if rule.target.name == 'MAILCOW': |
| forward_chain.delete_rule(rule) |
| for rule in input_chain.rules: |
| if rule.target.name == 'MAILCOW': |
| input_chain.delete_rule(rule) |
| filter_table.delete_chain("MAILCOW") |
| filter_table.commit() |
| filter_table.refresh() |
| filter_table.autocommit = True |
| r.delete('F2B_ACTIVE_BANS') |
| r.delete('F2B_PERM_BANS') |
| pubsub.unsubscribe() |
| |
| def watch(): |
| logInfo('Watching Redis channel F2B_CHANNEL') |
| pubsub.subscribe('F2B_CHANNEL') |
| |
| global quit_now |
| global exit_code |
| |
| while not quit_now: |
| try: |
| for item in pubsub.listen(): |
| refreshF2bregex() |
| for rule_id, rule_regex in f2bregex.items(): |
| if item['data'] and item['type'] == 'message': |
| try: |
| result = re.search(rule_regex, item['data']) |
| except re.error: |
| result = False |
| if result: |
| addr = result.group(1) |
| ip = ipaddress.ip_address(addr) |
| if ip.is_private or ip.is_loopback: |
| continue |
| logWarn('%s matched rule id %s (%s)' % (addr, rule_id, item['data'])) |
| ban(addr) |
| except Exception as ex: |
| logWarn('Error reading log line from pubsub') |
| quit_now = True |
| exit_code = 2 |
| |
| def snat4(snat_target): |
| global lock |
| global quit_now |
| |
| def get_snat4_rule(): |
| rule = iptc.Rule() |
| rule.src = os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24' |
| rule.dst = '!' + rule.src |
| target = rule.create_target("SNAT") |
| target.to_source = snat_target |
| match = rule.create_match("comment") |
| match.comment = f'{int(round(time.time()))}' |
| return rule |
| |
| while not quit_now: |
| time.sleep(10) |
| with lock: |
| try: |
| table = iptc.Table('nat') |
| table.refresh() |
| chain = iptc.Chain(table, 'POSTROUTING') |
| table.autocommit = False |
| new_rule = get_snat4_rule() |
| for position, rule in enumerate(chain.rules): |
| match = all(( |
| new_rule.get_src() == rule.get_src(), |
| new_rule.get_dst() == rule.get_dst(), |
| new_rule.target.parameters == rule.target.parameters, |
| new_rule.target.name == rule.target.name |
| )) |
| if position == 0: |
| if not match: |
| logInfo(f'Added POSTROUTING rule for source network {new_rule.src} to SNAT target {snat_target}') |
| chain.insert_rule(new_rule) |
| else: |
| if match: |
| logInfo(f'Remove rule for source network {new_rule.src} to SNAT target {snat_target} from POSTROUTING chain at position {position}') |
| chain.delete_rule(rule) |
| table.commit() |
| table.autocommit = True |
| except: |
| print('Error running SNAT4, retrying...') |
| |
| def autopurge(): |
| while not quit_now: |
| time.sleep(10) |
| refreshF2boptions() |
| BAN_TIME = int(f2boptions['ban_time']) |
| MAX_ATTEMPTS = int(f2boptions['max_attempts']) |
| QUEUE_UNBAN = r.hgetall('F2B_QUEUE_UNBAN') |
| if QUEUE_UNBAN: |
| for net in QUEUE_UNBAN: |
| unban(str(net)) |
| for net in bans.copy(): |
| if bans[net]['attempts'] >= MAX_ATTEMPTS: |
| if time.time() - bans[net]['last_attempt'] > BAN_TIME: |
| unban(net) |
| |
| def isIpNetwork(address): |
| try: |
| ipaddress.ip_network(address, False) |
| except ValueError: |
| return False |
| return True |
| |
| |
| def genNetworkList(list): |
| resolver = dns.resolver.Resolver() |
| hostnames = [] |
| networks = [] |
| for key in list: |
| if isIpNetwork(key): |
| networks.append(key) |
| else: |
| hostnames.append(key) |
| for hostname in hostnames: |
| hostname_ips = [] |
| for rdtype in ['A', 'AAAA']: |
| try: |
| answer = resolver.resolve(qname=hostname, rdtype=rdtype, lifetime=3) |
| except dns.exception.Timeout: |
| logInfo('Hostname %s timedout on resolve' % hostname) |
| break |
| except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): |
| continue |
| except dns.exception.DNSException as dnsexception: |
| logInfo('%s' % dnsexception) |
| continue |
| for rdata in answer: |
| hostname_ips.append(rdata.to_text()) |
| networks.extend(hostname_ips) |
| return set(networks) |
| |
| def whitelistUpdate(): |
| global lock |
| global quit_now |
| global WHITELIST |
| while not quit_now: |
| start_time = time.time() |
| list = r.hgetall('F2B_WHITELIST') |
| new_whitelist = [] |
| if list: |
| new_whitelist = genNetworkList(list) |
| with lock: |
| if Counter(new_whitelist) != Counter(WHITELIST): |
| WHITELIST = new_whitelist |
| logInfo('Whitelist was changed, it has %s entries' % len(WHITELIST)) |
| time.sleep(60.0 - ((time.time() - start_time) % 60.0)) |
| |
| def blacklistUpdate(): |
| global quit_now |
| global BLACKLIST |
| while not quit_now: |
| start_time = time.time() |
| list = r.hgetall('F2B_BLACKLIST') |
| new_blacklist = [] |
| if list: |
| new_blacklist = genNetworkList(list) |
| if Counter(new_blacklist) != Counter(BLACKLIST): |
| addban = set(new_blacklist).difference(BLACKLIST) |
| delban = set(BLACKLIST).difference(new_blacklist) |
| BLACKLIST = new_blacklist |
| logInfo('Blacklist was changed, it has %s entries' % len(BLACKLIST)) |
| if addban: |
| for net in addban: |
| permBan(net=net) |
| if delban: |
| for net in delban: |
| permBan(net=net, unban=True) |
| time.sleep(60.0 - ((time.time() - start_time) % 60.0)) |
| |
| def initChain(): |
| # Is called before threads start, no locking |
| print("Initializing mailcow netfilter chain") |
| # IPv4 |
| if not iptc.Chain(iptc.Table(iptc.Table.FILTER), "MAILCOW") in iptc.Table(iptc.Table.FILTER).chains: |
| iptc.Table(iptc.Table.FILTER).create_chain("MAILCOW") |
| for c in ['FORWARD', 'INPUT']: |
| chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), c) |
| rule = iptc.Rule() |
| rule.src = '0.0.0.0/0' |
| rule.dst = '0.0.0.0/0' |
| target = iptc.Target(rule, "MAILCOW") |
| rule.target = target |
| if rule not in chain.rules: |
| chain.insert_rule(rule) |
| |
| if __name__ == '__main__': |
| |
| # In case a previous session was killed without cleanup |
| clear() |
| # Reinit MAILCOW chain |
| initChain() |
| |
| watch_thread = Thread(target=watch) |
| watch_thread.daemon = True |
| watch_thread.start() |
| |
| if os.getenv('SNAT_TO_SOURCE') and os.getenv('SNAT_TO_SOURCE') != 'n': |
| try: |
| snat_ip = os.getenv('SNAT_TO_SOURCE') |
| snat_ipo = ipaddress.ip_address(snat_ip) |
| if type(snat_ipo) is ipaddress.IPv4Address: |
| snat4_thread = Thread(target=snat4,args=(snat_ip,)) |
| snat4_thread.daemon = True |
| snat4_thread.start() |
| except ValueError: |
| print(os.getenv('SNAT_TO_SOURCE') + ' is not a valid IPv4 address') |
| |
| autopurge_thread = Thread(target=autopurge) |
| autopurge_thread.daemon = True |
| autopurge_thread.start() |
| |
| mailcowchainwatch_thread = Thread(target=mailcowChainOrder) |
| mailcowchainwatch_thread.daemon = True |
| mailcowchainwatch_thread.start() |
| |
| blacklistupdate_thread = Thread(target=blacklistUpdate) |
| blacklistupdate_thread.daemon = True |
| blacklistupdate_thread.start() |
| |
| whitelistupdate_thread = Thread(target=whitelistUpdate) |
| whitelistupdate_thread.daemon = True |
| whitelistupdate_thread.start() |
| |
| signal.signal(signal.SIGTERM, quit) |
| atexit.register(clear) |
| |
| while not quit_now: |
| time.sleep(0.5) |
| |
| sys.exit(exit_code) |