#!/usr/bin/env python3
# Scraped blame header: blob bf3980afefa9e44a4a84b7ee2ef61882eca83b61;
# last change 8bb60d0 by Matthias Andreas Benkard, 2021-08-16 10:41:15 +0200.
2
3import re
4import os
5import time
6import atexit
7import signal
8import ipaddress
9from collections import Counter
10from random import randint
11from threading import Thread
12from threading import Lock
13import redis
14import json
15import iptc
16import dns.resolver
17import dns.exception
18
# Keep retrying every 3 seconds until a Redis client has been created and
# answers PING. With REDIS_SLAVEOF_IP set, that host and REDIS_SLAVEOF_PORT
# are used; otherwise the '.249' host of IPV4_NETWORK on port 6379.
while True:
    try:
        redis_slaveof_ip = os.getenv('REDIS_SLAVEOF_IP', '')
        redis_slaveof_port = os.getenv('REDIS_SLAVEOF_PORT', '')
        if redis_slaveof_ip == '':
            r = redis.StrictRedis(
                host=os.getenv('IPV4_NETWORK', '172.22.1') + '.249',
                decode_responses=True,
                port=6379,
                db=0,
            )
        else:
            r = redis.StrictRedis(
                host=redis_slaveof_ip,
                decode_responses=True,
                port=redis_slaveof_port,
                db=0,
            )
        r.ping()
        # Reaching this line means the connection works; stop retrying.
        break
    except Exception as ex:
        print('%s - trying again in 3 seconds' % (ex))
        time.sleep(3)
33
# Pub/sub handle on the Redis connection; watch() subscribes it to F2B_CHANNEL.
pubsub = r.pubsub()

# Entries currently in effect; replaced by whitelistUpdate() / blacklistUpdate().
WHITELIST = []
BLACKLIST = []

# Attempt bookkeeping keyed by str(network):
# {'attempts': <count>, 'last_attempt': <time.time() value>}
bans = {}

# Worker loops and the main loop run while this is False.
quit_now = False
# Guards iptables manipulation and the hand-over of WHITELIST between threads.
lock = Lock()
43
def log(priority, message):
    """Record a log entry in Redis and echo the message on stdout.

    The entry is a JSON object with the current Unix time (rounded to whole
    seconds), the given priority and the message; it is pushed onto the head
    of the Redis list 'NETFILTER_LOG'.
    """
    entry = {
        'time': int(round(time.time())),
        'priority': priority,
        'message': message,
    }
    r.lpush('NETFILTER_LOG', json.dumps(entry, ensure_ascii=False))
    print(message)
51
def logWarn(message):
    """Log `message` with priority 'warn'."""
    log(priority='warn', message=message)
54
def logCrit(message):
    """Log `message` with priority 'crit'."""
    log(priority='crit', message=message)
57
def logInfo(message):
    """Log `message` with priority 'info'."""
    log(priority='info', message=message)
60
def refreshF2boptions():
    """Reload the global `f2boptions` dict from Redis.

    If the key 'F2B_OPTIONS' is missing or empty, the options are assembled
    from the individual F2B_* keys (falling back to built-in defaults) and
    written back to 'F2B_OPTIONS' as JSON. Otherwise the stored JSON is
    parsed. If it is not valid JSON, an error is printed, `f2boptions` is
    left empty and `quit_now` is set.

    The new dict is built completely before it is bound to the global name,
    so other threads never observe a half-populated `f2boptions`.
    """
    global f2boptions
    global quit_now
    # Fetch once: a second GET could return something different (or nothing).
    stored_options = r.get('F2B_OPTIONS')
    if not stored_options:
        fresh_options = {
            'ban_time': r.get('F2B_BAN_TIME') or 1800,
            'max_attempts': r.get('F2B_MAX_ATTEMPTS') or 10,
            'retry_window': r.get('F2B_RETRY_WINDOW') or 600,
            'netban_ipv4': r.get('F2B_NETBAN_IPV4') or 32,
            'netban_ipv6': r.get('F2B_NETBAN_IPV6') or 128,
        }
        f2boptions = fresh_options
        r.set('F2B_OPTIONS', json.dumps(fresh_options, ensure_ascii=False))
    else:
        try:
            f2boptions = json.loads(stored_options)
        except ValueError:
            # Same outcome as before: empty options and a shutdown request.
            f2boptions = {}
            print('Error loading F2B options: F2B_OPTIONS is not json')
            quit_now = True
84
def refreshF2bregex():
    """Reload the global `f2bregex` mapping (rule id -> regex) from Redis.

    If the key 'F2B_REGEX' is missing or empty, the built-in rule set is
    installed and written back to 'F2B_REGEX' as JSON. Otherwise the stored
    JSON is parsed. If it is not valid JSON, an error is printed, `f2bregex`
    is left empty and `quit_now` is set.

    Each built-in pattern captures the offending address in group 1. The
    patterns are raw strings so that regex escapes such as ``\\[`` or ``\\.``
    are not treated as (invalid) string escape sequences; the resulting
    pattern text is unchanged.
    """
    global f2bregex
    global quit_now
    # Fetch once: a second GET could return something different (or nothing).
    stored_regex = r.get('F2B_REGEX')
    if not stored_regex:
        default_regex = {
            1: r'warning: .*\[([0-9a-f\.:]+)\]: SASL .+ authentication failed',
            2: r'-login: Disconnected \(auth failed, .+\): user=.*, method=.+, rip=([0-9a-f\.:]+),',
            3: r'-login: Aborted login \(tried to use disallowed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+',
            4: r"SOGo.+ Login from '([0-9a-f\.:]+)' for user .+ might not have worked",
            5: r'mailcow UI: Invalid password for .+ by ([0-9a-f\.:]+)',
            6: r'([0-9a-f\.:]+) "GET \/SOGo\/.* HTTP.+" 403 .+',
            7: r'Rspamd UI: Invalid password by ([0-9a-f\.:]+)',
            8: r'-login: Aborted login \(auth failed .+\): user=.+, rip=([0-9a-f\.:]+), lip.+',
        }
        # Bind the finished dict in one step so readers never see a partial set.
        f2bregex = default_regex
        r.set('F2B_REGEX', json.dumps(default_regex, ensure_ascii=False))
    else:
        try:
            f2bregex = json.loads(stored_regex)
        except ValueError:
            # Same outcome as before: empty rule set and a shutdown request.
            f2bregex = {}
            print('Error loading F2B options: F2B_REGEX is not json')
            quit_now = True
106
# Carry an existing 'F2B_LOG' key over to the name log() writes to.
f2b_log_present = r.exists('F2B_LOG')
if f2b_log_present:
    r.rename('F2B_LOG', 'NETFILTER_LOG')
109
def mailcowChainOrder():
    """Periodically verify that FORWARD and INPUT still jump to MAILCOW early.

    Every 10 seconds, under the global lock, the filter table is refreshed
    and both chains are scanned. A rule targeting MAILCOW at a position
    greater than 2, or no such rule at all, is logged as critical and sets
    `quit_now`.
    """
    global lock
    global quit_now
    while not quit_now:
        time.sleep(10)
        with lock:
            filter4_table = iptc.Table(iptc.Table.FILTER)
            filter4_table.refresh()
            for table in [filter4_table]:
                watched_chains = [iptc.Chain(table, 'FORWARD'), iptc.Chain(table, 'INPUT')]
                for chain in watched_chains:
                    # Positions of every rule in this chain that targets MAILCOW.
                    mailcow_positions = [
                        position
                        for position, item in enumerate(chain.rules)
                        if item.target.name == 'MAILCOW'
                    ]
                    for position in mailcow_positions:
                        if position > 2:
                            logCrit('Error in %s chain order: MAILCOW on position %d, restarting container' % (chain.name, position))
                            quit_now = True
                    if not mailcow_positions:
                        logCrit('Error in %s chain: MAILCOW target not found, restarting container' % (chain.name))
                        quit_now = True
132
def ban(address):
    """Count a failed attempt from `address` and ban its network at the limit.

    Options are re-read from Redis on every call. IPv4-mapped IPv6 addresses
    are converted to plain IPv4. Private and loopback addresses, and
    addresses overlapping a whitelist entry, are ignored. The address is
    widened to the configured netban prefix and the attempt is recorded in
    `bans`; the counter restarts when the previous attempt is older than the
    retry window. Once the counter reaches `max_attempts`, a REJECT rule is
    inserted into the MAILCOW chain (IPv4 only) and the expiry time is stored
    in the Redis hash 'F2B_ACTIVE_BANS'. Below the limit a warning with the
    remaining attempts is logged.

    Raises:
        ValueError: if `address` is not a valid IP address.
    """
    global lock
    refreshF2boptions()
    BAN_TIME = int(f2boptions['ban_time'])
    MAX_ATTEMPTS = int(f2boptions['max_attempts'])
    RETRY_WINDOW = int(f2boptions['retry_window'])
    NETBAN_IPV4 = '/' + str(f2boptions['netban_ipv4'])
    NETBAN_IPV6 = '/' + str(f2boptions['netban_ipv6'])

    ip = ipaddress.ip_address(address)
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped:
        ip = ip.ipv4_mapped
        address = str(ip)
    if ip.is_private or ip.is_loopback:
        return

    self_network = ipaddress.ip_network(address)

    # Snapshot under the lock; whitelistUpdate() replaces WHITELIST concurrently.
    with lock:
        temp_whitelist = set(WHITELIST)

    for wl_key in temp_whitelist:
        wl_net = ipaddress.ip_network(wl_key, False)
        if wl_net.overlaps(self_network):
            logInfo('Address %s is whitelisted by rule %s' % (self_network, wl_net))
            return

    is_ipv4 = isinstance(ip, ipaddress.IPv4Address)
    # strict=False: the address has host bits set relative to the ban prefix.
    net = str(ipaddress.ip_network(address + (NETBAN_IPV4 if is_ipv4 else NETBAN_IPV6), strict=False))

    # Start a fresh counter for unknown networks or after the retry window.
    if net not in bans or time.time() - bans[net]['last_attempt'] > RETRY_WINDOW:
        bans[net] = {'attempts': 0}

    bans[net]['attempts'] += 1
    bans[net]['last_attempt'] = time.time()

    if bans[net]['attempts'] >= MAX_ATTEMPTS:
        cur_time = int(round(time.time()))
        logCrit('Banning %s for %d minutes' % (net, BAN_TIME / 60))
        if is_ipv4:
            with lock:
                chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
                rule = iptc.Rule()
                rule.src = net
                target = iptc.Target(rule, "REJECT")
                rule.target = target
                if rule not in chain.rules:
                    chain.insert_rule(rule)
        # IPv6 networks get no firewall rule here but are still recorded.
        r.hset('F2B_ACTIVE_BANS', '%s' % net, cur_time + BAN_TIME)
    else:
        logWarn('%d more attempts in the next %d seconds until %s is banned' % (MAX_ATTEMPTS - bans[net]['attempts'], RETRY_WINDOW, net))
192
def unban(net):
    """Lift the ban on network `net` and drop its bookkeeping.

    Networks not present in `bans` are only removed from the Redis hash
    'F2B_QUEUE_UNBAN'. Otherwise the matching REJECT rule is deleted from the
    MAILCOW chain (IPv4 only), the network is removed from 'F2B_ACTIVE_BANS'
    and 'F2B_QUEUE_UNBAN', and its entry in `bans` is deleted.
    """
    global lock
    if net not in bans:
        logInfo('%s is not banned, skipping unban and deleting from queue (if any)' % net)
        r.hdel('F2B_QUEUE_UNBAN', '%s' % net)
        return

    logInfo('Unbanning %s' % net)
    parsed_net = ipaddress.ip_network(net)
    if isinstance(parsed_net, ipaddress.IPv4Network):
        with lock:
            mailcow_chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
            reject_rule = iptc.Rule()
            reject_rule.src = net
            reject_rule.target = iptc.Target(reject_rule, "REJECT")
            if reject_rule in mailcow_chain.rules:
                mailcow_chain.delete_rule(reject_rule)

    for redis_hash in ('F2B_ACTIVE_BANS', 'F2B_QUEUE_UNBAN'):
        r.hdel(redis_hash, '%s' % net)
    if net in bans:
        del bans[net]
215
def permBan(net, unban=False):
    """Add or remove a blacklist REJECT rule for `net` (IPv4 only).

    With `unban` false, the rule is inserted into the MAILCOW chain if it is
    not already there and the current time is stored under `net` in the
    Redis hash 'F2B_PERM_BANS'. With `unban` true, an existing rule is
    deleted and the hash field removed. Non-IPv4 networks are ignored.
    """
    global lock
    parsed_net = ipaddress.ip_network(net, strict=False)
    if not isinstance(parsed_net, ipaddress.IPv4Network):
        return

    with lock:
        mailcow_chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), 'MAILCOW')
        reject_rule = iptc.Rule()
        reject_rule.src = net
        reject_rule.target = iptc.Target(reject_rule, "REJECT")
        if unban:
            if reject_rule in mailcow_chain.rules:
                logCrit('Remove host/network %s from blacklist' % net)
                mailcow_chain.delete_rule(reject_rule)
                r.hdel('F2B_PERM_BANS', '%s' % net)
        else:
            if reject_rule not in mailcow_chain.rules:
                logCrit('Add host/network %s to blacklist' % net)
                mailcow_chain.insert_rule(reject_rule)
                r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
235
def quit(signum, frame):
    """Signal handler: request shutdown by setting the global `quit_now`.

    Both arguments are the standard signal-handler parameters and are unused.
    """
    global quit_now
    quit_now = True
239
def clear():
    """Remove every ban and tear down the MAILCOW chain.

    All networks in `bans` are unbanned first. Then, under the global lock
    and with autocommit disabled, the MAILCOW chain (if present) is emptied,
    the rules in FORWARD and INPUT that target it are deleted and the chain
    itself is removed; the table is committed and refreshed. Finally the
    Redis hashes 'F2B_ACTIVE_BANS' and 'F2B_PERM_BANS' are deleted and the
    pub/sub handle is unsubscribed.
    """
    global lock
    logInfo('Clearing all bans')
    for banned_net in bans.copy():
        unban(banned_net)
    with lock:
        filter4_table = iptc.Table(iptc.Table.FILTER)
        for table in [filter4_table]:
            table.autocommit = False
            forward_chain = iptc.Chain(table, "FORWARD")
            input_chain = iptc.Chain(table, "INPUT")
            mailcow_chain = iptc.Chain(table, "MAILCOW")
            if mailcow_chain in table.chains:
                for rule in mailcow_chain.rules:
                    mailcow_chain.delete_rule(rule)
                # Drop the jumps into MAILCOW before deleting the chain itself.
                for hook_chain in (forward_chain, input_chain):
                    for rule in hook_chain.rules:
                        if rule.target.name == 'MAILCOW':
                            hook_chain.delete_rule(rule)
                table.delete_chain("MAILCOW")
            table.commit()
            table.refresh()
            table.autocommit = True
        # NOTE(review): nesting of the three calls below (inside the lock) was
        # reconstructed from a listing without indentation - confirm.
        r.delete('F2B_ACTIVE_BANS')
        r.delete('F2B_PERM_BANS')
        pubsub.unsubscribe()
268
def watch():
    """Listen on the Redis channel 'F2B_CHANNEL' and ban matching addresses.

    The regex rules are re-read from Redis for every item received. For each
    published message, every rule is applied; group 1 of a match is taken as
    the offending address. Private and loopback addresses are skipped, all
    others are logged and handed to ban().

    A rule that fails to compile, has no capture group 1, or captures text
    that is not a valid IP address is skipped for that message instead of
    raising, so a bad rule or an odd log line cannot terminate the watcher.
    """
    logInfo('Watching Redis channel F2B_CHANNEL')
    pubsub.subscribe('F2B_CHANNEL')

    while not quit_now:
        for item in pubsub.listen():
            refreshF2bregex()
            # Only published messages with a payload are matched; the check
            # does not depend on the rule, so it is done once per item.
            if item['type'] != 'message' or not item['data']:
                continue
            for rule_id, rule_regex in f2bregex.items():
                try:
                    result = re.search(rule_regex, item['data'])
                except re.error:
                    result = None
                if not result:
                    continue
                try:
                    addr = result.group(1)
                    ip = ipaddress.ip_address(addr)
                except (IndexError, ValueError):
                    # No group 1 in the rule, or the captured text (e.g. 'dead'
                    # or '...') is not an IP address.
                    continue
                if ip.is_private or ip.is_loopback:
                    continue
                logWarn('%s matched rule id %s (%s)' % (addr, rule_id, item['data']))
                ban(addr)
289
def snat4(snat_target):
    """Keep an SNAT rule for the mailcow IPv4 network first in nat/POSTROUTING.

    Every 10 seconds, under the global lock, the rule "source = IPV4_NETWORK
    .0/24, destination = anything else, SNAT to `snat_target`" is inserted if
    it is missing. If it exists at a position other than 0 it is deleted, so
    that a later pass inserts it again. Errors are reported on stdout and the
    loop continues.

    Args:
        snat_target: source address handed to the SNAT target's `to_source`.
    """
    global lock
    global quit_now

    def get_snat4_rule():
        rule = iptc.Rule()
        rule.src = os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24'
        rule.dst = '!' + rule.src
        target = rule.create_target("SNAT")
        target.to_source = snat_target
        return rule

    while not quit_now:
        time.sleep(10)
        with lock:
            try:
                table = iptc.Table('nat')
                table.refresh()
                chain = iptc.Chain(table, 'POSTROUTING')
                table.autocommit = False
                try:
                    snat_rule = get_snat4_rule()
                    if snat_rule not in chain.rules:
                        logCrit('Added POSTROUTING rule for source network %s to SNAT target %s' % (snat_rule.src, snat_target))
                        chain.insert_rule(snat_rule)
                        table.commit()
                    else:
                        for position, item in enumerate(chain.rules):
                            if item == snat_rule and position != 0:
                                chain.delete_rule(snat_rule)
                        table.commit()
                finally:
                    # Restore autocommit even when a step above failed.
                    table.autocommit = True
            except Exception as ex:
                # Best effort: report the cause and try again on the next pass.
                print('Error running SNAT4, retrying... (%s)' % ex)
323
def autopurge():
    """Every 10 seconds, process queued unbans and expire old bans.

    Each field of the Redis hash 'F2B_QUEUE_UNBAN' is passed to unban().
    Afterwards every network in `bans` that has reached `max_attempts` and
    whose last attempt is older than `ban_time` seconds is unbanned.
    """
    while not quit_now:
        time.sleep(10)
        refreshF2boptions()
        BAN_TIME = int(f2boptions['ban_time'])
        MAX_ATTEMPTS = int(f2boptions['max_attempts'])

        for queued_net in r.hgetall('F2B_QUEUE_UNBAN'):
            unban(str(queued_net))

        # Iterate over a copy: unban() deletes entries from `bans`.
        for net in bans.copy():
            if bans[net]['attempts'] >= MAX_ATTEMPTS and time.time() - bans[net]['last_attempt'] > BAN_TIME:
                unban(net)
338
def isIpNetwork(address):
    """Return True if `address` parses as an IP network, else False.

    Parsing is non-strict, so an address with host bits set (for example
    '192.168.1.5/24') is accepted.
    """
    try:
        ipaddress.ip_network(address, strict=False)
        return True
    except ValueError:
        return False
345
346
def genNetworkList(list):
    """Turn a collection of networks and hostnames into a set of networks.

    Entries accepted by isIpNetwork() are kept as they are. Every other entry
    is treated as a hostname and resolved for A and AAAA records (3 second
    lifetime each); the record texts are added to the result. A timeout is
    logged and ends the lookups for that hostname; NXDOMAIN / no answer is
    skipped silently; other DNS errors are logged and skipped.

    Returns:
        A set containing the literal networks and all resolved addresses.
    """
    resolver = dns.resolver.Resolver()
    networks = []
    hostnames = []
    for entry in list:
        (networks if isIpNetwork(entry) else hostnames).append(entry)

    for hostname in hostnames:
        for rdtype in ['A', 'AAAA']:
            try:
                answer = resolver.resolve(qname=hostname, rdtype=rdtype, lifetime=3)
            except dns.exception.Timeout:
                logInfo('Hostname %s timedout on resolve' % hostname)
                break
            except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
                continue
            except dns.exception.DNSException as dnsexception:
                logInfo('%s' % dnsexception)
                continue
            networks.extend(rdata.to_text() for rdata in answer)
    return set(networks)
373
def whitelistUpdate():
    """Refresh the global WHITELIST from the Redis hash 'F2B_WHITELIST'.

    Runs until `quit_now` is set. Each pass expands the hash fields with
    genNetworkList() and, under the global lock, replaces WHITELIST when its
    contents differ. The sleep at the end is computed from the pass duration
    so that passes start about 60 seconds apart.
    """
    global lock
    global quit_now
    global WHITELIST
    while not quit_now:
        start_time = time.time()
        whitelist_entries = r.hgetall('F2B_WHITELIST')
        new_whitelist = genNetworkList(whitelist_entries) if whitelist_entries else []
        with lock:
            if Counter(new_whitelist) != Counter(WHITELIST):
                WHITELIST = new_whitelist
                logInfo('Whitelist was changed, it has %s entries' % len(WHITELIST))
        elapsed = time.time() - start_time
        time.sleep(60.0 - (elapsed % 60.0))
389
def blacklistUpdate():
    """Refresh the global BLACKLIST from the Redis hash 'F2B_BLACKLIST'.

    Runs until `quit_now` is set. Each pass expands the hash fields with
    genNetworkList(). When the contents differ from BLACKLIST, the list is
    replaced, new entries are passed to permBan() and vanished entries to
    permBan(..., unban=True). The sleep at the end is computed from the pass
    duration so that passes start about 60 seconds apart.
    """
    global quit_now
    global BLACKLIST
    while not quit_now:
        start_time = time.time()
        blacklist_entries = r.hgetall('F2B_BLACKLIST')
        new_blacklist = genNetworkList(blacklist_entries) if blacklist_entries else []
        if Counter(new_blacklist) != Counter(BLACKLIST):
            addban = set(new_blacklist) - set(BLACKLIST)
            delban = set(BLACKLIST) - set(new_blacklist)
            BLACKLIST = new_blacklist
            logInfo('Blacklist was changed, it has %s entries' % len(BLACKLIST))
            for added_net in addban:
                permBan(net=added_net)
            for removed_net in delban:
                permBan(net=removed_net, unban=True)
        elapsed = time.time() - start_time
        time.sleep(60.0 - (elapsed % 60.0))
411
def initChain():
    """Create the MAILCOW chain if needed and hook it into FORWARD and INPUT.

    For each of the two chains a rule matching any source and destination
    with target MAILCOW is inserted unless an equal rule already exists.
    """
    # Is called before threads start, no locking
    print("Initializing mailcow netfilter chain")
    # IPv4
    mailcow_chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "MAILCOW")
    if mailcow_chain not in iptc.Table(iptc.Table.FILTER).chains:
        iptc.Table(iptc.Table.FILTER).create_chain("MAILCOW")
    for chain_name in ['FORWARD', 'INPUT']:
        hook_chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), chain_name)
        jump_rule = iptc.Rule()
        jump_rule.src = '0.0.0.0/0'
        jump_rule.dst = '0.0.0.0/0'
        jump_rule.target = iptc.Target(jump_rule, "MAILCOW")
        if jump_rule not in hook_chain.rules:
            hook_chain.insert_rule(jump_rule)
427
if __name__ == '__main__':

    # In case a previous session was killed without cleanup
    clear()
    # Reinit MAILCOW chain
    initChain()

    watch_thread = Thread(target=watch, daemon=True)
    watch_thread.start()

    # SNAT is optional: enabled when SNAT_TO_SOURCE is set to something other
    # than 'n' and parses as an IPv4 address.
    snat_ip = os.getenv('SNAT_TO_SOURCE')
    if snat_ip and snat_ip != 'n':
        try:
            snat_ipo = ipaddress.ip_address(snat_ip)
            if type(snat_ipo) is ipaddress.IPv4Address:
                snat4_thread = Thread(target=snat4, args=(snat_ip,), daemon=True)
                snat4_thread.start()
        except ValueError:
            print(snat_ip + ' is not a valid IPv4 address')

    autopurge_thread = Thread(target=autopurge, daemon=True)
    autopurge_thread.start()

    mailcowchainwatch_thread = Thread(target=mailcowChainOrder, daemon=True)
    mailcowchainwatch_thread.start()

    blacklistupdate_thread = Thread(target=blacklistUpdate, daemon=True)
    blacklistupdate_thread.start()

    whitelistupdate_thread = Thread(target=whitelistUpdate, daemon=True)
    whitelistupdate_thread.start()

    # SIGTERM only flips quit_now; clear() runs when the interpreter exits.
    signal.signal(signal.SIGTERM, quit)
    atexit.register(clear)

    # All workers are daemon threads, so the process lives as long as this loop.
    while not quit_now:
        time.sleep(0.5)