blob: 04f6c47d2bd0341356f4e5b53bdbfe9dc262f0fa [file] [log] [blame]
Matthias Andreas Benkard8bb60d02021-08-16 10:41:15 +02001#!/usr/bin/env python3
2
3import re
4import os
Matthias Andreas Benkard7b2a3a12021-08-16 10:57:25 +02005import sys
Matthias Andreas Benkard8bb60d02021-08-16 10:41:15 +02006import time
7import atexit
8import signal
9import ipaddress
10from collections import Counter
11from random import randint
12from threading import Thread
13from threading import Lock
14import redis
15import json
16import iptc
17import dns.resolver
18import dns.exception
19
# Connect to Redis, retrying every 3 seconds until the server answers a PING.
# When REDIS_SLAVEOF_IP is set, that host is used instead of the default
# '<IPV4_NETWORK>.249' address.
while True:
    try:
        redis_slaveof_ip = os.getenv('REDIS_SLAVEOF_IP', '')
        redis_slaveof_port = os.getenv('REDIS_SLAVEOF_PORT', '')
        if not redis_slaveof_ip:
            r = redis.StrictRedis(host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249', decode_responses=True, port=6379, db=0)
        else:
            # Fall back to the default Redis port when REDIS_SLAVEOF_PORT is
            # unset, instead of handing an empty string to the client.
            r = redis.StrictRedis(host=redis_slaveof_ip, decode_responses=True, port=redis_slaveof_port or 6379, db=0)
        r.ping()
    except Exception as ex:
        print('%s - trying again in 3 seconds' % (ex))
        time.sleep(3)
    else:
        break

pubsub = r.pubsub()

# Resolved whitelist / blacklist entries; replaced wholesale by
# whitelistUpdate() and blacklistUpdate().
WHITELIST = []
BLACKLIST = []

# Per-network attempt bookkeeping, keyed by the network string; maintained by
# ban() and unban().
bans = {}

# Shutdown flag polled by every worker loop, and the exit status handed to
# sys.exit() by the main block.
quit_now = False
exit_code = 0
# Guards the iptables operations and the WHITELIST snapshot/swap.
lock = Lock()
45
def log(priority, message):
    """Record *message* in the Redis list ``NETFILTER_LOG`` and echo it to stdout.

    The Redis entry is a JSON object with the keys ``time`` (current Unix
    time rounded to an int), ``priority`` and ``message``.
    """
    entry = {
        'time': int(round(time.time())),
        'priority': priority,
        'message': message,
    }
    r.lpush('NETFILTER_LOG', json.dumps(entry, ensure_ascii=False))
    print(message)
53
def logWarn(message):
    """Log *message* with priority ``warn``."""
    log(priority='warn', message=message)
56
def logCrit(message):
    """Log *message* with priority ``crit``."""
    log(priority='crit', message=message)
59
def logInfo(message):
    """Log *message* with priority ``info``."""
    log(priority='info', message=message)
62
def refreshF2boptions():
    """Reload the global ``f2boptions`` dict from the Redis key ``F2B_OPTIONS``.

    If the key is missing or empty, the options are assembled from the
    individual ``F2B_*`` keys (with built-in defaults) and written back to
    ``F2B_OPTIONS`` as JSON. If the key holds something that is not valid
    JSON, ``f2boptions`` is left empty and shutdown is requested with exit
    code 2.
    """
    global f2boptions
    global quit_now
    global exit_code
    # Fetch once so the value that is parsed is the value that was checked.
    stored = r.get('F2B_OPTIONS')
    if not stored:
        f2boptions = {
            'ban_time': r.get('F2B_BAN_TIME') or 1800,
            'max_attempts': r.get('F2B_MAX_ATTEMPTS') or 10,
            'retry_window': r.get('F2B_RETRY_WINDOW') or 600,
            'netban_ipv4': r.get('F2B_NETBAN_IPV4') or 32,
            'netban_ipv6': r.get('F2B_NETBAN_IPV6') or 128,
        }
        r.set('F2B_OPTIONS', json.dumps(f2boptions, ensure_ascii=False))
    else:
        try:
            f2boptions = json.loads(stored)
        except ValueError:
            f2boptions = {}
            print('Error loading F2B options: F2B_OPTIONS is not json')
            quit_now = True
            exit_code = 2
Matthias Andreas Benkard8bb60d02021-08-16 10:41:15 +020088
def refreshF2bregex():
    """Reload the global ``f2bregex`` mapping from the Redis key ``F2B_REGEX``.

    If the key is missing or empty, the built-in rule set is installed and
    written back to ``F2B_REGEX`` as JSON. If the key holds something that is
    not valid JSON, ``f2bregex`` is left empty and shutdown is requested with
    exit code 2.
    """
    global f2bregex
    global quit_now
    global exit_code
    # Fetch once so the value that is parsed is the value that was checked.
    stored = r.get('F2B_REGEX')
    if not stored:
        # Raw strings keep the pattern text unchanged while avoiding invalid
        # escape sequences such as '\[' in ordinary string literals.
        f2bregex = {}
        f2bregex[1] = r'warning: .*\[([0-9a-f\.:]+)\]: SASL .+ authentication failed'
        f2bregex[2] = r'-login: Disconnected \(auth failed, .+\): user=.*, method=.+, rip=([0-9a-f\.:]+),'
        f2bregex[3] = r'-login: Aborted login \(tried to use disallowed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+'
        f2bregex[4] = r"SOGo.+ Login from '([0-9a-f\.:]+)' for user .+ might not have worked"
        f2bregex[5] = r'mailcow UI: Invalid password for .+ by ([0-9a-f\.:]+)'
        f2bregex[6] = r'([0-9a-f\.:]+) "GET \/SOGo\/.* HTTP.+" 403 .+'
        f2bregex[7] = r'Rspamd UI: Invalid password by ([0-9a-f\.:]+)'
        f2bregex[8] = r'-login: Aborted login \(auth failed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+'
        f2bregex[9] = r'NOQUEUE: reject: RCPT from \[([0-9a-f\.:]+)].+Protocol error.+'
        r.set('F2B_REGEX', json.dumps(f2bregex, ensure_ascii=False))
    else:
        try:
            f2bregex = json.loads(stored)
        except ValueError:
            f2bregex = {}
            print('Error loading F2B options: F2B_REGEX is not json')
            quit_now = True
            exit_code = 2
Matthias Andreas Benkard8bb60d02021-08-16 10:41:15 +0200113
# Carry an existing log list over from its former key name.
if r.exists('F2B_LOG'):
    r.rename('F2B_LOG', 'NETFILTER_LOG')
116
def mailcowChainOrder():
    """Watchdog loop checking where the MAILCOW jump sits in FORWARD and INPUT.

    Every 10 seconds the filter table is refreshed and both chains are
    scanned. If a rule targeting MAILCOW sits at an index greater than 2, or
    no such rule exists, a critical message is logged and shutdown is
    requested with exit code 2.
    """
    global quit_now
    global exit_code
    while not quit_now:
        time.sleep(10)
        with lock:
            filter4_table = iptc.Table(iptc.Table.FILTER)
            filter4_table.refresh()
            for table in (filter4_table,):
                watched = [iptc.Chain(table, chain_name) for chain_name in ('FORWARD', 'INPUT')]
                for chain in watched:
                    # Indexes of every rule in this chain that jumps to MAILCOW.
                    hits = [idx for idx, entry in enumerate(chain.rules) if entry.target.name == 'MAILCOW']
                    for position in hits:
                        if position > 2:
                            logCrit('Error in %s chain order: MAILCOW on position %d, restarting container' % (chain.name, position))
                            quit_now = True
                            exit_code = 2
                    if not hits:
                        logCrit('Error in %s chain: MAILCOW target not found, restarting container' % (chain.name))
                        quit_now = True
                        exit_code = 2
Matthias Andreas Benkard8bb60d02021-08-16 10:41:15 +0200142
def ban(address):
    """Count one failed attempt from *address* and ban its network at the limit.

    The current options are reloaded first. IPv4-mapped IPv6 addresses are
    unwrapped to IPv4. Private and loopback addresses, and addresses that
    overlap a whitelist entry, are ignored. Attempts are tracked per network
    (the address widened by the configured ``netban_ipv4`` / ``netban_ipv6``
    prefix length). The counter restarts when the previous attempt is older
    than ``retry_window`` seconds.

    Once ``max_attempts`` is reached the ban is recorded in the Redis hash
    ``F2B_ACTIVE_BANS`` with its expiry time. A REJECT rule is inserted into
    the MAILCOW chain for IPv4 networks only; no rule is installed here for
    other networks.

    Raises:
        ValueError: if *address* is not a valid IP address.
    """
    refreshF2boptions()
    ban_time = int(f2boptions['ban_time'])
    max_attempts = int(f2boptions['max_attempts'])
    retry_window = int(f2boptions['retry_window'])
    netban_ipv4 = '/' + str(f2boptions['netban_ipv4'])
    netban_ipv6 = '/' + str(f2boptions['netban_ipv6'])

    ip = ipaddress.ip_address(address)
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped:
        ip = ip.ipv4_mapped
        address = str(ip)
    if ip.is_private or ip.is_loopback:
        return

    self_network = ipaddress.ip_network(address)

    # Work on a snapshot so the lock is not held while logging.
    with lock:
        temp_whitelist = set(WHITELIST)

    for wl_key in temp_whitelist:
        wl_net = ipaddress.ip_network(wl_key, False)
        if wl_net.overlaps(self_network):
            logInfo('Address %s is whitelisted by rule %s' % (self_network, wl_net))
            return

    is_ipv4 = isinstance(ip, ipaddress.IPv4Address)
    net = str(ipaddress.ip_network(address + (netban_ipv4 if is_ipv4 else netban_ipv6), strict=False))

    # One clock reading serves both the window check and the new timestamp.
    now = time.time()
    if net not in bans or now - bans[net]['last_attempt'] > retry_window:
        bans[net] = {'attempts': 0}

    bans[net]['attempts'] += 1
    bans[net]['last_attempt'] = now

    if bans[net]['attempts'] >= max_attempts:
        cur_time = int(round(time.time()))
        logCrit('Banning %s for %d minutes' % (net, ban_time / 60))
        if is_ipv4:
            with lock:
                chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
                rule = iptc.Rule()
                rule.src = net
                target = iptc.Target(rule, "REJECT")
                rule.target = target
                if rule not in chain.rules:
                    chain.insert_rule(rule)
        r.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + ban_time)
    else:
        logWarn('%d more attempts in the next %d seconds until %s is banned' % (max_attempts - bans[net]['attempts'], retry_window, net))
202
def unban(net):
    """Lift the ban on network *net* and drop its bookkeeping.

    If *net* is not tracked in ``bans``, only its ``F2B_QUEUE_UNBAN`` entry is
    removed. Otherwise the matching REJECT rule is deleted from the MAILCOW
    chain (IPv4 networks only), the network is removed from the Redis hashes
    ``F2B_ACTIVE_BANS`` and ``F2B_QUEUE_UNBAN``, and its ``bans`` entry is
    dropped.
    """
    if net not in bans:
        logInfo('%s is not banned, skipping unban and deleting from queue (if any)' % net)
        r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
        return

    logInfo('Unbanning %s' % net)
    if isinstance(ipaddress.ip_network(net), ipaddress.IPv4Network):
        with lock:
            mailcow_chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
            reject_rule = iptc.Rule()
            reject_rule.src = net
            reject_rule.target = iptc.Target(reject_rule, "REJECT")
            if reject_rule in mailcow_chain.rules:
                mailcow_chain.delete_rule(reject_rule)

    for redis_hash in ('F2B_ACTIVE_BANS', 'F2B_QUEUE_UNBAN'):
        r.hdel(redis_hash, '%s' % net)
    bans.pop(net, None)
225
def permBan(net, unban=False):
    """Add or remove a blacklist REJECT rule for *net* in the MAILCOW chain.

    Only IPv4 networks are handled; anything else is a no-op. With
    ``unban=False`` the rule is inserted if absent and the network is recorded
    in the Redis hash ``F2B_PERM_BANS`` with the current time. With
    ``unban=True`` the rule is deleted if present and the hash entry removed.
    """
    if not isinstance(ipaddress.ip_network(net, strict=False), ipaddress.IPv4Network):
        return

    with lock:
        mailcow_chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
        reject_rule = iptc.Rule()
        reject_rule.src = net
        reject_rule.target = iptc.Target(reject_rule, "REJECT")
        already_present = reject_rule in mailcow_chain.rules
        if not already_present and not unban:
            logCrit('Add host/network %s to blacklist' % net)
            mailcow_chain.insert_rule(reject_rule)
            r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
        elif already_present and unban:
            logCrit('Remove host/network %s from blacklist' % net)
            mailcow_chain.delete_rule(reject_rule)
            r.hdel('F2B_PERM_BANS', '%s' % net)
245
def quit(signum, frame):
    """Signal handler: set the module-level ``quit_now`` flag.

    Both arguments are the standard signal-handler parameters and are unused.
    """
    global quit_now
    quit_now = True
249
def clear():
    """Remove every ban and tear down the MAILCOW chain.

    All tracked bans are lifted via unban(). Then, under the lock and in one
    explicit commit, the MAILCOW chain is emptied, the jumps to it are removed
    from FORWARD and INPUT, and the chain itself is deleted (when it exists).
    Finally the Redis hashes ``F2B_ACTIVE_BANS`` and ``F2B_PERM_BANS`` are
    deleted and the pubsub subscription is dropped.
    """
    def drop_mailcow_jumps(chain):
        # Delete every rule of *chain* whose target is the MAILCOW chain.
        for entry in chain.rules:
            if entry.target.name == 'MAILCOW':
                chain.delete_rule(entry)

    logInfo('Clearing all bans')
    for net in bans.copy():
        unban(net)

    with lock:
        filter4_table = iptc.Table(iptc.Table.FILTER)
        for table in (filter4_table,):
            # Batch all deletions into a single commit.
            table.autocommit = False
            forward_chain = iptc.Chain(table, "FORWARD")
            input_chain = iptc.Chain(table, "INPUT")
            mailcow_chain = iptc.Chain(table, "MAILCOW")
            if mailcow_chain in table.chains:
                for entry in mailcow_chain.rules:
                    mailcow_chain.delete_rule(entry)
                drop_mailcow_jumps(forward_chain)
                drop_mailcow_jumps(input_chain)
                table.delete_chain("MAILCOW")
            table.commit()
            table.refresh()
            table.autocommit = True
        r.delete('F2B_ACTIVE_BANS')
        r.delete('F2B_PERM_BANS')
        pubsub.unsubscribe()
278
def watch():
    """Consume log lines from the Redis channel ``F2B_CHANNEL`` and call ban().

    For every published message the regex set is reloaded and each pattern is
    tried against the message text. The first capture group of a match is
    taken as the offending address. Private and loopback addresses are
    skipped, and so are captures that are not valid IP addresses. Patterns
    that fail to compile are ignored.

    Any other exception ends the loop: it is logged together with its cause
    and shutdown is requested with exit code 2.
    """
    global quit_now
    global exit_code

    logInfo('Watching Redis channel F2B_CHANNEL')
    pubsub.subscribe('F2B_CHANNEL')

    while not quit_now:
        try:
            for item in pubsub.listen():
                refreshF2bregex()
                # Only published messages with a payload are matched; this does
                # not depend on the pattern, so test it once per item.
                if not (item['data'] and item['type'] == 'message'):
                    continue
                for rule_id, rule_regex in f2bregex.items():
                    try:
                        result = re.search(rule_regex, item['data'])
                    except re.error:
                        result = None
                    if not result:
                        continue
                    addr = result.group(1)
                    try:
                        ip = ipaddress.ip_address(addr)
                    except ValueError:
                        # The capture classes also match strings that are not
                        # addresses; skip those instead of stopping the daemon.
                        logWarn('Ignoring match of rule id %s: %s is not a valid IP address' % (rule_id, addr))
                        continue
                    if ip.is_private or ip.is_loopback:
                        continue
                    logWarn('%s matched rule id %s (%s)' % (addr, rule_id, item['data']))
                    ban(addr)
        except Exception as ex:
            # Include the cause so the reason for the shutdown is visible.
            logWarn('Error reading log line from pubsub: %s' % ex)
            quit_now = True
            exit_code = 2
Matthias Andreas Benkard8bb60d02021-08-16 10:41:15 +0200307
def snat4(snat_target):
    """Keep an SNAT rule for the mailcow IPv4 network first in POSTROUTING.

    Every 10 seconds, while ``quit_now`` is false, the nat table is refreshed.
    If the rule is missing it is inserted. If it exists at an index other than
    0 it is deleted, so that a later pass inserts it again. Errors are printed
    and the loop keeps running.

    Args:
        snat_target: value assigned to the SNAT target's ``to_source``.
    """
    def get_snat4_rule():
        # Build the SNAT rule: traffic from the mailcow /24 to anywhere else.
        rule = iptc.Rule()
        rule.src = os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24'
        rule.dst = '!' + rule.src
        target = rule.create_target("SNAT")
        target.to_source = snat_target
        return rule

    while not quit_now:
        time.sleep(10)
        with lock:
            try:
                table = iptc.Table('nat')
                table.refresh()
                chain = iptc.Chain(table, 'POSTROUTING')
                table.autocommit = False
                # Build the expected rule once per pass.
                snat_rule = get_snat4_rule()
                if snat_rule not in chain.rules:
                    logCrit('Added POSTROUTING rule for source network %s to SNAT target %s' % (snat_rule.src, snat_target))
                    chain.insert_rule(snat_rule)
                    table.commit()
                else:
                    for position, item in enumerate(chain.rules):
                        if item == snat_rule:
                            if position != 0:
                                chain.delete_rule(snat_rule)
                    table.commit()
                table.autocommit = True
            except Exception as ex:
                # Best effort: report the cause and try again on the next pass.
                print('Error running SNAT4, retrying... (%s)' % ex)
341
def autopurge():
    """Worker loop that lifts queued and expired bans every 10 seconds.

    Each pass reloads the options, unbans every network listed in the Redis
    hash ``F2B_QUEUE_UNBAN``, then unbans every tracked network that reached
    ``max_attempts`` and whose last attempt is older than ``ban_time`` seconds.
    """
    while not quit_now:
        time.sleep(10)
        refreshF2boptions()
        ban_time = int(f2boptions['ban_time'])
        max_attempts = int(f2boptions['max_attempts'])

        queued = r.hgetall('F2B_QUEUE_UNBAN')
        for net in (queued or {}):
            unban(str(net))

        # Iterate over a copy: unban() removes entries from ``bans``.
        for net in bans.copy():
            if bans[net]['attempts'] >= max_attempts and time.time() - bans[net]['last_attempt'] > ban_time:
                unban(net)
356
def isIpNetwork(address):
    """Return True if *address* parses as an IP address or network.

    Parsing is non-strict, so host bits set behind the prefix are accepted.
    """
    try:
        ipaddress.ip_network(address, strict=False)
    except ValueError:
        return False
    else:
        return True
363
364
def genNetworkList(list):
    """Turn a collection of networks and hostnames into a set of networks.

    Entries accepted by isIpNetwork() are kept as they are. Every other entry
    is treated as a hostname and resolved for A and AAAA records, each lookup
    limited to 3 seconds. A timeout abandons the remaining lookups for that
    hostname; other DNS errors skip only the failing record type.

    Returns:
        set: the literal networks plus every resolved address as text.
    """
    resolver = dns.resolver.Resolver()
    networks = []
    hostnames = []
    for entry in list:
        (networks if isIpNetwork(entry) else hostnames).append(entry)

    for hostname in hostnames:
        resolved = []
        for rdtype in ('A', 'AAAA'):
            try:
                answer = resolver.resolve(qname=hostname, rdtype=rdtype, lifetime=3)
            except dns.exception.Timeout:
                logInfo('Hostname %s timedout on resolve' % hostname)
                break
            except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
                continue
            except dns.exception.DNSException as dnsexception:
                logInfo('%s' % dnsexception)
                continue
            resolved.extend(rdata.to_text() for rdata in answer)
        networks.extend(resolved)
    return set(networks)
391
def whitelistUpdate():
    """Worker loop that rebuilds ``WHITELIST`` from the Redis hash ``F2B_WHITELIST``.

    The hash keys are expanded with genNetworkList(). When the result differs
    from the current whitelist it is swapped in under the lock and the change
    is logged. The loop then sleeps until 60 seconds have passed since the
    pass started.
    """
    global WHITELIST
    while not quit_now:
        started = time.time()
        entries = r.hgetall('F2B_WHITELIST')
        new_whitelist = genNetworkList(entries) if entries else []
        with lock:
            if Counter(new_whitelist) != Counter(WHITELIST):
                WHITELIST = new_whitelist
                logInfo('Whitelist was changed, it has %s entries' % len(WHITELIST))
        time.sleep(60.0 - ((time.time() - started) % 60.0))
407
def blacklistUpdate():
    """Worker loop that syncs ``BLACKLIST`` with the Redis hash ``F2B_BLACKLIST``.

    The hash keys are expanded with genNetworkList(). When the result differs
    from the current blacklist, newly listed networks are passed to permBan()
    and networks no longer listed are passed to permBan(unban=True). The loop
    then sleeps until 60 seconds have passed since the pass started.
    """
    global BLACKLIST
    while not quit_now:
        started = time.time()
        entries = r.hgetall('F2B_BLACKLIST')
        new_blacklist = genNetworkList(entries) if entries else []
        if Counter(new_blacklist) != Counter(BLACKLIST):
            # Work out both differences before the global is replaced.
            to_add = set(new_blacklist).difference(BLACKLIST)
            to_remove = set(BLACKLIST).difference(new_blacklist)
            BLACKLIST = new_blacklist
            logInfo('Blacklist was changed, it has %s entries' % len(BLACKLIST))
            for net in to_add:
                permBan(net=net)
            for net in to_remove:
                permBan(net=net, unban=True)
        time.sleep(60.0 - ((time.time() - started) % 60.0))
429
def initChain():
    """Create the MAILCOW chain if needed and jump to it from FORWARD and INPUT.

    For each of the two chains a rule matching any source and destination
    with target MAILCOW is inserted unless an equal rule is already present.
    """
    def filter_table():
        # Obtain the IPv4 filter table on every use.
        return iptc.Table(iptc.Table.FILTER)

    # Is called before threads start, no locking
    print("Initializing mailcow netfilter chain")
    # IPv4
    if iptc.Chain(filter_table(), "MAILCOW") not in filter_table().chains:
        filter_table().create_chain("MAILCOW")
    for chain_name in ('FORWARD', 'INPUT'):
        chain = iptc.Chain(filter_table(), chain_name)
        jump = iptc.Rule()
        jump.src = '0.0.0.0/0'
        jump.dst = '0.0.0.0/0'
        jump.target = iptc.Target(jump, "MAILCOW")
        if jump not in chain.rules:
            chain.insert_rule(jump)
445
if __name__ == '__main__':

    def start_daemon(target, args=()):
        # Start *target* in a daemon thread and return the Thread object.
        worker = Thread(target=target, args=args)
        worker.daemon = True
        worker.start()
        return worker

    # In case a previous session was killed without cleanup
    clear()
    # Reinit MAILCOW chain
    initChain()

    watch_thread = start_daemon(watch)

    # Optional SNAT worker: only started for a valid IPv4 SNAT_TO_SOURCE.
    snat_ip = os.getenv('SNAT_TO_SOURCE')
    if snat_ip and snat_ip != 'n':
        try:
            snat_ipo = ipaddress.ip_address(snat_ip)
        except ValueError:
            snat_ipo = None
        if isinstance(snat_ipo, ipaddress.IPv4Address):
            snat4_thread = start_daemon(snat4, (snat_ip,))
        else:
            # Also reached for a valid IPv6 address, which is not supported here.
            print(snat_ip + ' is not a valid IPv4 address')

    autopurge_thread = start_daemon(autopurge)
    mailcowchainwatch_thread = start_daemon(mailcowChainOrder)
    blacklistupdate_thread = start_daemon(blacklistUpdate)
    whitelistupdate_thread = start_daemon(whitelistUpdate)

    signal.signal(signal.SIGTERM, quit)
    atexit.register(clear)

    # Idle until a worker or the SIGTERM handler requests shutdown.
    while not quit_now:
        time.sleep(0.5)

    sys.exit(exit_code)