blob: 4eb8e7a93784bbd44202a64ee62581d29c3171be [file] [log] [blame]
Matthias Andreas Benkard1ba53812022-12-27 17:32:58 +01001#!/usr/bin/env python3
2
3import re
4import os
5import sys
6import time
7import atexit
8import signal
9import ipaddress
10from collections import Counter
11from random import randint
12from threading import Thread
13from threading import Lock
14import redis
15import json
16import iptc
17import dns.resolver
18import dns.exception
19
# Keep retrying until a Redis connection answers PING. When
# REDIS_SLAVEOF_IP is set, that host/port is used; otherwise the
# ".249" address inside IPV4_NETWORK on port 6379.
while True:
    try:
        redis_slaveof_ip = os.getenv('REDIS_SLAVEOF_IP', '')
        redis_slaveof_port = os.getenv('REDIS_SLAVEOF_PORT', '')
        if redis_slaveof_ip == '':
            redis_host = os.getenv('IPV4_NETWORK', '172.22.1') + '.249'
            redis_port = 6379
        else:
            redis_host = redis_slaveof_ip
            redis_port = redis_slaveof_port
        r = redis.StrictRedis(host=redis_host, decode_responses=True, port=redis_port, db=0)
        r.ping()
    except Exception as ex:
        print('%s - trying again in 3 seconds' % (ex))
        time.sleep(3)
    else:
        break

# Pub/sub handle used by watch() and released in clear().
pubsub = r.pubsub()

# Current whitelist / blacklist entries, replaced by the updater threads.
WHITELIST = []
BLACKLIST = []

# Per-network attempt bookkeeping: {net: {'attempts': n, 'last_attempt': ts}}.
bans = {}

# Shutdown flag and process exit status shared by all threads.
quit_now = False
exit_code = 0
# Serialises iptables access and whitelist swaps between threads.
lock = Lock()
45
def log(priority, message):
    """Push a log record to the Redis list NETFILTER_LOG and print the message.

    The record is a JSON object with the current Unix time (rounded to
    whole seconds), the given priority and the message text.
    """
    entry = {
        'time': int(round(time.time())),
        'priority': priority,
        'message': message,
    }
    r.lpush('NETFILTER_LOG', json.dumps(entry, ensure_ascii=False))
    print(message)
53
def logWarn(message):
    """Log ``message`` with priority 'warn'."""
    log(priority='warn', message=message)
56
def logCrit(message):
    """Log ``message`` with priority 'crit'."""
    log(priority='crit', message=message)
59
def logInfo(message):
    """Log ``message`` with priority 'info'."""
    log(priority='info', message=message)
62
def refreshF2boptions():
    """Reload the global ``f2boptions`` dict from Redis.

    When the key F2B_OPTIONS is missing or empty, the options are built
    from the individual F2B_* keys (falling back to built-in defaults) and
    written back to F2B_OPTIONS as JSON. Otherwise F2B_OPTIONS is parsed as
    JSON; if parsing fails, ``f2boptions`` is left empty and the process is
    flagged to quit with exit code 2.
    """
    global f2boptions
    global quit_now
    global exit_code

    # Fetch once: a second read could observe a different value (or None)
    # if the key changes between the check and the parse.
    raw_options = r.get('F2B_OPTIONS')

    if not raw_options:
        f2boptions = {
            'ban_time': r.get('F2B_BAN_TIME') or 1800,
            'max_attempts': r.get('F2B_MAX_ATTEMPTS') or 10,
            'retry_window': r.get('F2B_RETRY_WINDOW') or 600,
            'netban_ipv4': r.get('F2B_NETBAN_IPV4') or 32,
            'netban_ipv6': r.get('F2B_NETBAN_IPV6') or 128,
        }
        r.set('F2B_OPTIONS', json.dumps(f2boptions, ensure_ascii=False))
        return

    try:
        f2boptions = json.loads(raw_options)
    except ValueError:
        f2boptions = {}
        print('Error loading F2B options: F2B_OPTIONS is not json')
        quit_now = True
        exit_code = 2
88
def refreshF2bregex():
    """Reload the global ``f2bregex`` mapping (rule id -> regex) from Redis.

    When the key F2B_REGEX is missing or empty, the built-in rule set is
    installed and written back to F2B_REGEX as JSON. Otherwise F2B_REGEX is
    parsed as JSON; if parsing fails, ``f2bregex`` is left empty and the
    process is flagged to quit with exit code 2.
    """
    global f2bregex
    global quit_now
    global exit_code

    # Fetch once so the emptiness check and the parse see the same value.
    raw_regex = r.get('F2B_REGEX')

    if not raw_regex:
        # Raw strings keep the backslashes for the regex engine without
        # relying on Python passing unknown escape sequences through.
        f2bregex = {
            1: r'mailcow UI: Invalid password for .+ by ([0-9a-f\.:]+)',
            2: r'Rspamd UI: Invalid password by ([0-9a-f\.:]+)',
            3: r'warning: .*\[([0-9a-f\.:]+)\]: SASL .+ authentication failed: (?!.*Connection lost to authentication server).+',
            4: r'warning: non-SMTP command from .*\[([0-9a-f\.:]+)]:.+',
            5: r'NOQUEUE: reject: RCPT from \[([0-9a-f\.:]+)].+Protocol error.+',
            6: r'-login: Disconnected.+ \(auth failed, .+\): user=.*, method=.+, rip=([0-9a-f\.:]+),',
            7: r'-login: Aborted login.+ \(auth failed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+',
            8: r'-login: Aborted login.+ \(tried to use disallowed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+',
            9: r"SOGo.+ Login from '([0-9a-f\.:]+)' for user .+ might not have worked",
            10: r'([0-9a-f\.:]+) "GET \/SOGo\/.* HTTP.+" 403 .+',
        }
        r.set('F2B_REGEX', json.dumps(f2bregex, ensure_ascii=False))
        return

    try:
        f2bregex = json.loads(raw_regex)
    except ValueError:
        f2bregex = {}
        print('Error loading F2B options: F2B_REGEX is not json')
        quit_now = True
        exit_code = 2
114
# Move the log list stored under the old key name to the key log() writes to.
if r.exists('F2B_LOG'):
    r.rename(src='F2B_LOG', dst='NETFILTER_LOG')
117
def mailcowChainOrder():
    """Watch the position of the MAILCOW jump rule in FORWARD and INPUT.

    Every 10 seconds, until ``quit_now`` is set, the IPv4 filter table is
    refreshed and both chains are scanned. If a rule targeting MAILCOW sits
    at an index greater than 2, or no such rule exists, a critical message
    is logged and the process is flagged to quit with exit code 2.
    """
    global lock
    global quit_now
    global exit_code
    while not quit_now:
        time.sleep(10)
        with lock:
            table = iptc.Table(iptc.Table.FILTER)
            table.refresh()
            watched_chains = [iptc.Chain(table, 'FORWARD'), iptc.Chain(table, 'INPUT')]
            for chain in watched_chains:
                mailcow_positions = [
                    index
                    for index, rule in enumerate(chain.rules)
                    if rule.target.name == 'MAILCOW'
                ]
                for index in mailcow_positions:
                    if index > 2:
                        logCrit('Error in %s chain order: MAILCOW on position %d, restarting container' % (chain.name, index))
                        quit_now = True
                        exit_code = 2
                if not mailcow_positions:
                    logCrit('Error in %s chain: MAILCOW target not found, restarting container' % (chain.name))
                    quit_now = True
                    exit_code = 2
143
def ban(address):
    """Count a failed attempt from ``address`` and ban its network at the limit.

    Options are re-read from Redis on every call. IPv4-mapped IPv6
    addresses are reduced to their IPv4 form. Private and loopback
    addresses, and addresses overlapping a whitelist entry, are ignored.

    Attempts are tracked per network (the address widened by the configured
    netban prefix length) in the global ``bans`` dict; the counter restarts
    when the previous attempt is older than the retry window. Once the
    counter reaches the maximum, the ban expiry is stored in the Redis hash
    F2B_ACTIVE_BANS and, for IPv4 networks, a REJECT rule is inserted into
    the MAILCOW chain. For IPv6 networks no packet-filter rule is created
    by this function.

    Raises:
        ValueError: if ``address`` is not a valid IP address.
    """
    global lock
    refreshF2boptions()
    BAN_TIME = int(f2boptions['ban_time'])
    MAX_ATTEMPTS = int(f2boptions['max_attempts'])
    RETRY_WINDOW = int(f2boptions['retry_window'])
    NETBAN_IPV4 = '/' + str(f2boptions['netban_ipv4'])
    NETBAN_IPV6 = '/' + str(f2boptions['netban_ipv6'])

    ip = ipaddress.ip_address(address)
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped:
        ip = ip.ipv4_mapped
        address = str(ip)
    if ip.is_private or ip.is_loopback:
        return

    self_network = ipaddress.ip_network(address)

    # Snapshot under the lock; the whitelist thread may swap the list.
    with lock:
        temp_whitelist = set(WHITELIST)

    for wl_key in temp_whitelist:
        wl_net = ipaddress.ip_network(wl_key, False)
        if wl_net.overlaps(self_network):
            logInfo('Address %s is whitelisted by rule %s' % (self_network, wl_net))
            return

    is_ipv4 = isinstance(ip, ipaddress.IPv4Address)
    prefix = NETBAN_IPV4 if is_ipv4 else NETBAN_IPV6
    net = str(ipaddress.ip_network(address + prefix, strict=False))

    # Start a fresh counter for unknown networks or expired retry windows.
    if net not in bans or time.time() - bans[net]['last_attempt'] > RETRY_WINDOW:
        bans[net] = {'attempts': 0}

    bans[net]['attempts'] += 1
    bans[net]['last_attempt'] = time.time()

    if bans[net]['attempts'] >= MAX_ATTEMPTS:
        cur_time = int(round(time.time()))
        logCrit('Banning %s for %d minutes' % (net, BAN_TIME / 60))
        if is_ipv4:
            with lock:
                chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
                rule = iptc.Rule()
                rule.src = net
                rule.target = iptc.Target(rule, "REJECT")
                if rule not in chain.rules:
                    chain.insert_rule(rule)
        r.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + BAN_TIME)
    else:
        logWarn('%d more attempts in the next %d seconds until %s is banned' % (MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net))
203
def unban(net):
    """Lift the ban on ``net`` and drop its bookkeeping.

    A network that is not tracked in ``bans`` is only removed from the
    Redis hash F2B_QUEUE_UNBAN. Otherwise the matching REJECT rule is
    deleted from the MAILCOW chain (IPv4 networks only), the network is
    removed from F2B_ACTIVE_BANS and F2B_QUEUE_UNBAN, and its entry in
    ``bans`` is discarded.
    """
    global lock
    if net not in bans:
        logInfo('%s is not banned, skipping unban and deleting from queue (if any)' % net)
        r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
        return

    logInfo('Unbanning %s' % net)
    if isinstance(ipaddress.ip_network(net), ipaddress.IPv4Network):
        with lock:
            mailcow_chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
            reject_rule = iptc.Rule()
            reject_rule.src = net
            reject_rule.target = iptc.Target(reject_rule, "REJECT")
            if reject_rule in mailcow_chain.rules:
                mailcow_chain.delete_rule(reject_rule)

    for redis_hash in ('F2B_ACTIVE_BANS', 'F2B_QUEUE_UNBAN'):
        r.hdel(redis_hash, '%s' % net)
    bans.pop(net, None)
226
def permBan(net, unban=False):
    """Add or remove a permanent REJECT rule for ``net`` in the MAILCOW chain.

    Only IPv4 networks are handled; anything else is ignored. With
    ``unban`` false, the rule is inserted if absent and the time is stored
    in the Redis hash F2B_PERM_BANS. With ``unban`` true, the rule is
    deleted if present and the hash entry removed.
    """
    global lock
    if not isinstance(ipaddress.ip_network(net, strict=False), ipaddress.IPv4Network):
        return

    with lock:
        mailcow_chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
        blacklist_rule = iptc.Rule()
        blacklist_rule.src = net
        blacklist_rule.target = iptc.Target(blacklist_rule, "REJECT")
        already_present = blacklist_rule in mailcow_chain.rules
        if not already_present and not unban:
            logCrit('Add host/network %s to blacklist' % net)
            mailcow_chain.insert_rule(blacklist_rule)
            r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
        elif already_present and unban:
            logCrit('Remove host/network %s from blacklist' % net)
            mailcow_chain.delete_rule(blacklist_rule)
            r.hdel('F2B_PERM_BANS', '%s' % net)
246
def quit(signum, frame):
    """Signal handler: set the global ``quit_now`` flag; both arguments are unused."""
    global quit_now
    quit_now = True
250
def clear():
    """Remove every ban and tear down the MAILCOW chain.

    All tracked networks are unbanned first; this happens before the lock
    is taken because unban() acquires it itself. Then, in one batched
    commit on the IPv4 filter table, the MAILCOW chain is emptied, the
    rules jumping to it are removed from FORWARD and INPUT, and the chain
    is deleted (if it exists). Finally the Redis hashes F2B_ACTIVE_BANS and
    F2B_PERM_BANS are deleted and the pub/sub subscription is dropped.
    """
    global lock
    logInfo('Clearing all bans')
    for banned_net in list(bans):
        unban(banned_net)

    with lock:
        table = iptc.Table(iptc.Table.FILTER)
        table.autocommit = False
        forward_chain = iptc.Chain(table, "FORWARD")
        input_chain = iptc.Chain(table, "INPUT")
        mailcow_chain = iptc.Chain(table, "MAILCOW")
        if mailcow_chain in table.chains:
            for rule in mailcow_chain.rules:
                mailcow_chain.delete_rule(rule)
            for builtin_chain in (forward_chain, input_chain):
                for rule in builtin_chain.rules:
                    if rule.target.name == 'MAILCOW':
                        builtin_chain.delete_rule(rule)
            table.delete_chain("MAILCOW")
        table.commit()
        table.refresh()
        table.autocommit = True
        r.delete('F2B_ACTIVE_BANS')
        r.delete('F2B_PERM_BANS')
        pubsub.unsubscribe()
279
def watch():
    """Consume log lines from the Redis channel F2B_CHANNEL and trigger bans.

    For every published message with a payload, the regex rules are
    reloaded and each rule is searched against the line. The first capture
    group of a match is taken as the offending address; private and
    loopback addresses are skipped, all others are passed to ban().

    A rule that fails to compile, or a match that does not yield a valid
    IP address, is skipped. Any other error while reading from pub/sub is
    logged and flags the process to quit with exit code 2.
    """
    global quit_now
    global exit_code

    logInfo('Watching Redis channel F2B_CHANNEL')
    pubsub.subscribe('F2B_CHANNEL')

    while not quit_now:
        try:
            for item in pubsub.listen():
                # Subscription confirmations and empty payloads carry no log line.
                if not item['data'] or item['type'] != 'message':
                    continue
                refreshF2bregex()
                for rule_id, rule_regex in f2bregex.items():
                    try:
                        result = re.search(rule_regex, item['data'])
                    except re.error:
                        result = None
                    if not result:
                        continue
                    try:
                        addr = result.group(1)
                        ip = ipaddress.ip_address(addr)
                    except (IndexError, ValueError):
                        # Rules come from Redis and may lack a capture group or
                        # capture text that is not an address; one such line
                        # must not bring the whole service down.
                        logWarn('Rule id %s matched but did not yield a valid IP address, skipping' % rule_id)
                        continue
                    if ip.is_private or ip.is_loopback:
                        continue
                    logWarn('%s matched rule id %s (%s)' % (addr, rule_id, item['data']))
                    ban(addr)
        except Exception as ex:
            logWarn('Error reading log line from pubsub: %s' % ex)
            quit_now = True
            exit_code = 2
308
def snat4(snat_target):
    """Keep an SNAT rule for the mailcow network first in nat/POSTROUTING.

    Every 10 seconds, until ``quit_now`` is set, the chain is inspected
    under the lock. If the first rule is not the expected SNAT rule (or the
    chain is empty) the rule is inserted at the top; equivalent rules found
    at any later position are deleted. All changes of one pass are
    committed together. Errors are printed and the pass is retried on the
    next cycle.

    Args:
        snat_target: source address to rewrite outgoing traffic to.
    """
    global lock
    global quit_now

    def get_snat4_rule():
        # Build the desired rule; the comment match carries the creation time.
        rule = iptc.Rule()
        rule.src = os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24'
        rule.dst = '!' + rule.src
        target = rule.create_target("SNAT")
        target.to_source = snat_target
        match = rule.create_match("comment")
        match.comment = f'{int(round(time.time()))}'
        return rule

    def is_same_snat(wanted, existing):
        # Compare everything except the timestamp comment.
        return all((
            wanted.get_src() == existing.get_src(),
            wanted.get_dst() == existing.get_dst(),
            wanted.target.parameters == existing.target.parameters,
            wanted.target.name == existing.target.name
        ))

    while not quit_now:
        time.sleep(10)
        with lock:
            try:
                table = iptc.Table('nat')
                table.refresh()
                chain = iptc.Chain(table, 'POSTROUTING')
                table.autocommit = False
                new_rule = get_snat4_rule()
                existing_rules = chain.rules
                # An empty chain also needs the rule; a plain loop over the
                # rules would never reach the insert in that case.
                if not existing_rules or not is_same_snat(new_rule, existing_rules[0]):
                    logInfo(f'Added POSTROUTING rule for source network {new_rule.src} to SNAT target {snat_target}')
                    chain.insert_rule(new_rule)
                for position, rule in enumerate(existing_rules):
                    if position > 0 and is_same_snat(new_rule, rule):
                        logInfo(f'Remove rule for source network {new_rule.src} to SNAT target {snat_target} from POSTROUTING chain at position {position}')
                        chain.delete_rule(rule)
                table.commit()
                table.autocommit = True
            except Exception as ex:
                # Catch Exception only, so interpreter-exit signals still propagate.
                print('Error running SNAT4, retrying... (%s)' % ex)
351
def autopurge():
    """Periodically lift queued and expired bans.

    Every 10 seconds, until ``quit_now`` is set: reload the options, unban
    every network listed in the Redis hash F2B_QUEUE_UNBAN, then unban
    every tracked network that reached the attempt limit and whose last
    attempt is older than the ban time.
    """
    while not quit_now:
        time.sleep(10)
        refreshF2boptions()
        ban_time = int(f2boptions['ban_time'])
        max_attempts = int(f2boptions['max_attempts'])

        for queued_net in r.hgetall('F2B_QUEUE_UNBAN'):
            unban(str(queued_net))

        # Iterate over a snapshot of the keys; unban() mutates ``bans``.
        for net in list(bans):
            if bans[net]['attempts'] >= max_attempts and time.time() - bans[net]['last_attempt'] > ban_time:
                unban(net)
366
def isIpNetwork(address):
    """Return True if ``address`` parses as an IPv4/IPv6 network.

    Host bits are allowed to be set (non-strict parsing).
    """
    try:
        ipaddress.ip_network(address, strict=False)
    except ValueError:
        return False
    else:
        return True
373
374
def genNetworkList(list):
    """Turn a mix of networks and hostnames into a set of network strings.

    Entries that parse as IP networks are kept as they are. Every other
    entry is treated as a hostname and resolved for A and AAAA records
    (3 second lifetime each); the resolved addresses are added. A timeout
    is logged and ends resolution for that hostname; NXDOMAIN / no-answer
    results are skipped silently; other DNS errors are logged and skipped.
    """
    resolver = dns.resolver.Resolver()
    networks = []
    hostnames = []
    for entry in list:
        (networks if isIpNetwork(entry) else hostnames).append(entry)

    for hostname in hostnames:
        for rdtype in ('A', 'AAAA'):
            try:
                answer = resolver.resolve(qname=hostname, rdtype=rdtype, lifetime=3)
            except dns.exception.Timeout:
                logInfo('Hostname %s timedout on resolve' % hostname)
                break
            except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
                continue
            except dns.exception.DNSException as dns_error:
                logInfo('%s' % dns_error)
                continue
            networks.extend(rdata.to_text() for rdata in answer)

    return set(networks)
401
def whitelistUpdate():
    """Refresh the global WHITELIST from the Redis hash F2B_WHITELIST.

    Runs until ``quit_now`` is set. Each pass expands the hash keys via
    genNetworkList() and, if the entries differ from the current list,
    swaps the list under the lock and logs the new size. Sleeps until the
    next 60-second boundary relative to the start of the pass.
    """
    global lock
    global quit_now
    global WHITELIST
    while not quit_now:
        cycle_start = time.time()
        entries = r.hgetall('F2B_WHITELIST')
        new_whitelist = genNetworkList(entries) if entries else []
        with lock:
            if Counter(new_whitelist) != Counter(WHITELIST):
                WHITELIST = new_whitelist
                logInfo('Whitelist was changed, it has %s entries' % len(WHITELIST))
        time.sleep(60.0 - ((time.time() - cycle_start) % 60.0))
417
def blacklistUpdate():
    """Refresh the global BLACKLIST from the Redis hash F2B_BLACKLIST.

    Runs until ``quit_now`` is set. Each pass expands the hash keys via
    genNetworkList(); if the entries differ from the current list, the
    list is replaced, newly listed networks are permanently banned and
    networks no longer listed are unbanned. Sleeps until the next
    60-second boundary relative to the start of the pass.
    """
    global quit_now
    global BLACKLIST
    while not quit_now:
        cycle_start = time.time()
        entries = r.hgetall('F2B_BLACKLIST')
        new_blacklist = genNetworkList(entries) if entries else []
        if Counter(new_blacklist) != Counter(BLACKLIST):
            to_add = set(new_blacklist).difference(BLACKLIST)
            to_remove = set(BLACKLIST).difference(new_blacklist)
            BLACKLIST = new_blacklist
            logInfo('Blacklist was changed, it has %s entries' % len(BLACKLIST))
            for net in to_add:
                permBan(net=net)
            for net in to_remove:
                permBan(net=net, unban=True)
        time.sleep(60.0 - ((time.time() - cycle_start) % 60.0))
439
def initChain():
    """Create the MAILCOW chain and hook it into FORWARD and INPUT.

    The chain is created in the IPv4 filter table if missing; a rule
    jumping to MAILCOW for any source and destination is inserted into
    both built-in chains unless an equal rule already exists.
    """
    # Is called before threads start, no locking
    print("Initializing mailcow netfilter chain")
    # IPv4
    filter_table = iptc.Table(iptc.Table.FILTER)
    if iptc.Chain(filter_table, "MAILCOW") not in filter_table.chains:
        filter_table.create_chain("MAILCOW")
    for chain_name in ('FORWARD', 'INPUT'):
        builtin_chain = iptc.Chain(filter_table, chain_name)
        jump_rule = iptc.Rule()
        jump_rule.src = '0.0.0.0/0'
        jump_rule.dst = '0.0.0.0/0'
        jump_rule.target = iptc.Target(jump_rule, "MAILCOW")
        if jump_rule not in builtin_chain.rules:
            builtin_chain.insert_rule(jump_rule)
455
if __name__ == '__main__':

    def start_daemon(target, *args):
        # Launch ``target`` in a daemon thread so it never blocks interpreter exit.
        worker = Thread(target=target, args=args)
        worker.daemon = True
        worker.start()
        return worker

    # In case a previous session was killed without cleanup
    clear()
    # Reinit MAILCOW chain
    initChain()

    watch_thread = start_daemon(watch)

    # SNAT is optional: enabled when SNAT_TO_SOURCE holds an IPv4 address.
    snat_ip = os.getenv('SNAT_TO_SOURCE')
    if snat_ip and snat_ip != 'n':
        try:
            snat_ipo = ipaddress.ip_address(snat_ip)
        except ValueError:
            print(snat_ip + ' is not a valid IPv4 address')
        else:
            if isinstance(snat_ipo, ipaddress.IPv4Address):
                snat4_thread = start_daemon(snat4, snat_ip)

    autopurge_thread = start_daemon(autopurge)
    mailcowchainwatch_thread = start_daemon(mailcowChainOrder)
    blacklistupdate_thread = start_daemon(blacklistUpdate)
    whitelistupdate_thread = start_daemon(whitelistUpdate)

    signal.signal(signal.SIGTERM, quit)
    atexit.register(clear)

    # Idle until any thread (or SIGTERM) requests shutdown.
    while not quit_now:
        time.sleep(0.5)

    sys.exit(exit_code)