diff -uNr a/blatta/lib/infosec.py b/blatta/lib/infosec.py --- a/blatta/lib/infosec.py 25a41366c0d57c2fa6eaaba61e0bdb07e77b9f48d09fb7fea1d65296fcc68c8b8836f954b13de8ee071061a6f49bf8db89e958ec7b961ac974926f5163694c21 +++ b/blatta/lib/infosec.py 2bebeb6ee55f0941c567e114ee01352f18f96d02ac4e2f037b9ced7e71c219cc2ed51ac615d28e22244c5e07f136ec7ce1fb565c7217c62de4708aee4d8b05d5 @@ -37,7 +37,7 @@ def __init__(self, server=None): self.server = server - def pack(self, peer, message): + def get_message_bytes(self, message, peer=None): try: timestamp = message.timestamp except: @@ -52,14 +52,9 @@ else: int_ts = timestamp - key_bytes = base64.b64decode(peer.get_key()) - signing_key = key_bytes[:32] - cipher_key = key_bytes[32:] - # let's generate the self_chain value from the last message or set it to zero if # there this is the first message - if message.original: if command == DIRECT: self_chain = self.server.state.get_last_message_hash(message.speaker, peer.peer_id) @@ -75,21 +70,21 @@ # pack message bytes message_bytes = struct.pack(MESSAGE_PACKET_FORMAT, int_ts, self_chain, net_chain, speaker, message.body) + return message_bytes - # log messages + def pack(self, peer, message): + key_bytes = base64.b64decode(peer.get_key()) + signing_key = key_bytes[:32] + cipher_key = key_bytes[32:] - if message.original: - if command == DIRECT: - self.server.state.log(message.speaker, message_bytes, peer.peer_id) - elif command == BROADCAST: - self.server.state.log(message.speaker, message_bytes) + message_bytes = self.get_message_bytes(message, peer) # pack packet bytes nonce = self._generate_nonce(16) bounces = message.bounces version = 0xfe - red_packet_bytes = struct.pack(RED_PACKET_FORMAT, nonce, bounces, version, command, self._pad(message_bytes, MAX_MESSAGE_LENGTH)) + red_packet_bytes = struct.pack(RED_PACKET_FORMAT, nonce, bounces, version, message.command, self._pad(message_bytes, MAX_MESSAGE_LENGTH)) # encrypt packet @@ -104,9 +99,6 @@ signed_packet_bytes = struct.pack(BLACK_PACKET_FORMAT, black_packet_bytes, signature_bytes) - # we want to ignore this ts if it is sent back to us - - self.server.recent.insert(int_ts) return signed_packet_bytes def unpack(self, peer, black_packet): @@ -148,16 +140,18 @@ if command == IGNORE: return Message({"speaker": speaker, "error_code": IGNORED}) - # check timestamp if(int_ts not in self._ts_range()): return Message({ "error_code": STALE_PACKET }, self.server) - if(self.server.recent.has(int_ts)): + # check for duplicates + + message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest()) + if(self.server.state.is_duplicate_message(message_hash)): return Message({ "error_code": DUPLICATE_PACKET }, self.server) else: - self.server.recent.insert(int_ts) + self.server.state.add_to_dedup_queue(message_hash) # check self_chain diff -uNr a/blatta/lib/peer.py b/blatta/lib/peer.py --- a/blatta/lib/peer.py 42717c9441e8b3e62b53ca92879cbb9929ee1625046c622acdfe7063f52941cf9dc40b7448726691a5946acf6250e54e79e3156a509cc0507ee374a9c214ef27 +++ b/blatta/lib/peer.py e763bb836eba69aedebd4d4adfdd8820e1a173f16e1fd493ddd16bf6d41718155c20ee38073570d2aeb0144ea281b6fbcc5ddde48d5ca60cb01a0b08103dd1f5 @@ -24,9 +24,9 @@ def send(self, msg): try: - signed_packet_bytes = self.infosec.pack(self, msg) if msg.command != IGNORE: self.server.print_debug("packing message: %s" % msg.body) + signed_packet_bytes = self.infosec.pack(self, msg) self.socket.sendto(signed_packet_bytes, (self.address, self.port)) self.server.print_debug("[%s:%d] <- %s" % (self.address, self.port, diff -uNr a/blatta/lib/ringbuffer.py b/blatta/lib/ringbuffer.py --- a/blatta/lib/ringbuffer.py 9b025befc73016d689ba87a93add4f57dc2e08b9825e29e8e6cc57d4e0a6983ccf0affe9449186d7742f60e3f88eab24af883cf12928ea828d7dcfd1db2db49c +++ b/blatta/lib/ringbuffer.py false @@ -1,9 +0,0 @@ -class Ringbuffer(object): - def __init__(self, size): - self.data = [False]*size - self._cursor = 0 - def insert(self, item): - self.data[self._cursor] = item - self._cursor = (self._cursor + 1) % len(self.data) - def has(self, item): - return item in self.data diff -uNr a/blatta/lib/server.py b/blatta/lib/server.py --- a/blatta/lib/server.py f36bf2d21b07d0a2e017b7c0a1f7c8784ded0a4539f2e6cb9dac2b834f2909686b0ab3cfa28a64b35db83c16018b049f99da69b4eeb61ef238c5fe90555927e3 +++ b/blatta/lib/server.py 16e7971b6eab7483a4060d5cae5111dec2f61618a2022620343ef7aa3fcedee87cc6499c9f9978215c315fde958e70fa7810f50967e97dd299cd98842118c12d @@ -1,4 +1,4 @@ -VERSION = "9989" +VERSION = "9988" import os import select @@ -9,6 +9,7 @@ import time import string import binascii +import hashlib import datetime from datetime import datetime from lib.client import Client @@ -24,7 +25,6 @@ from lib.infosec import Infosec from lib.peer import Peer from lib.message import Message -from lib.ringbuffer import Ringbuffer from funcs import * from commands import BROADCAST from commands import DIRECT @@ -63,7 +63,6 @@ self.channels = {} # irc_lower(Channel name) --> Channel instance. self.clients = {} # Socket --> Client instance..peers = "" self.nicknames = {} # irc_lower(Nickname) --> Client instance. - self.recent = Ringbuffer(100) if self.logdir: create_directory(self.logdir) if self.statedir: @@ -207,11 +206,22 @@ message.original = True if message.command == DIRECT: peer = self.state.get_peer_by_handle(message.handle) + message_bytes = self.infosec.get_message_bytes(message, peer) + message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest()) + self.state.add_to_dedup_queue(message_hash) + + self.state.log(message.speaker, message_bytes, peer.peer_id) if peer and (peer.get_key() != None): peer.send(message) else: self.print_debug("Discarding message to unknown handle or handle with no key: %s" % message.handle) else: + message.timestamp = int(time.time()) + message_bytes = self.infosec.get_message_bytes(message) + if message.command != IGNORE: + self.state.log(message.speaker, message_bytes) + message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest()) + self.state.add_to_dedup_queue(message_hash) for peer in self.state.get_peers(): if peer.get_key() != None: peer.send(message) @@ -228,14 +238,13 @@ def sendrubbish(self): - for peer in self.state.get_peers(): - for socket in self.clients: - self.peer_message(Message({ - "speaker": self.clients[socket].nickname, - "command": IGNORE, - "bounces": 0, - "body": self.infosec.gen_rubbish_body() - }, self)) + for socket in self.clients: + self.peer_message(Message({ + "speaker": self.clients[socket].nickname, + "command": IGNORE, + "bounces": 0, + "body": self.infosec.gen_rubbish_body() + }, self)) def start(self): # Setup UDP first diff -uNr a/blatta/lib/state.py b/blatta/lib/state.py --- a/blatta/lib/state.py cf7a79184dc4f229361a26a8db0901bbcc915ec3c46c7d8a41177a5dda82e50aaca1ab94c324239251ec1fc148b6c14cb9fb967d4eb08223fa6679d15bac599a +++ b/blatta/lib/state.py acd5eaffdba356d5b2b2e0ce494e3be8aed35ccf0b96f9605bfd73fd3f758286f1908d043274a5480ac02c2d270550e1b061b32c0856e521a4eaba2f9f6b29f3 @@ -35,6 +35,10 @@ message_bytes blob not null,\ created_at datetime default current_timestamp)") + self.cursor.execute("create table if not exists dedup_queue(\ + hash text not null,\ + created_at datetime default current_timestamp)") + def get_at(self, handle=None): at = [] if handle == None: @@ -60,6 +64,22 @@ return at + def is_duplicate_message(self, message_hash): + self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')") + self.conn.commit() + result = self.cursor.execute("select hash from dedup_queue where hash=?", + (message_hash,)).fetchone() + if(result != None): + return True + else: + return False + + def add_to_dedup_queue(self, message_hash): + self.cursor.execute("insert into dedup_queue(hash)\ + values(?)", + (message_hash,)) + self.conn.commit() + def get_last_message_hash(self, handle, peer_id=None): if peer_id: message_bytes = self.cursor.execute("select message_bytes from logs\