From: Neil Smith Date: Tue, 8 Nov 2016 11:33:23 +0000 (+0000) Subject: Added logging, removed additional implementation of bombe from engima.py X-Git-Url: https://git.njae.me.uk/?p=cipher-tools.git;a=commitdiff_plain;h=ea67e6df3465d064bdbcecd527ba39872bdefbf7 Added logging, removed additional implementation of bombe from engima.py --- diff --git a/bombe.py b/bombe.py index b8971a7..17d02a6 100644 --- a/bombe.py +++ b/bombe.py @@ -2,13 +2,35 @@ import string import collections import multiprocessing import itertools +import logging from enigma import * +logger = logging.getLogger('bombe') +# logger.setLevel(logging.WARNING) +# logger.setLevel(logging.INFO) +logger.setLevel(logging.DEBUG) + +# create the logging file handler +fh = logging.FileHandler("enigma.log") +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +fh.setFormatter(formatter) + +# add handler to logger object +logger.addHandler(fh) + +################################## +# # Bombe +################################## +# +# Good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm) by Graham Ellsbury +# + Signal = collections.namedtuple('Signal', ['bank', 'wire']) Connection = collections.namedtuple('Connection', ['banks', 'scrambler']) MenuItem = collections.namedtuple('MenuIem', ['before', 'after', 'number']) + def make_menu(plaintext, ciphertext): return [MenuItem(p, c, i+1) for i, (p, c) in enumerate(zip(plaintext, ciphertext))] @@ -92,7 +114,8 @@ class Bombe(object): self.reflector_spec, wheel3_pos=unpos(item.number - 1)) self.add_connection(item.before, item.after, scrambler) - most_common_letter = (collections.Counter(m.before for m in menu) + collections.Counter(m.after for m in menu)).most_common(1)[0][0] + most_common_letter = (collections.Counter(m.before for m in menu) +\ + collections.Counter(m.after for m in menu)).most_common(1)[0][0] self.test_start = Signal(most_common_letter, most_common_letter) def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos): @@ -126,6 +149,7 @@ class Bombe(object): while self.pending: current = self.pending[0] # print("processing", current) + logger.debug("Propogater processing {}".format(current)) self.pending = self.pending[1:] if not self.banks[current.bank][current.wire]: self.banks[current.bank][current.wire] = True @@ -136,6 +160,7 @@ class Bombe(object): other_bank = [b for b in c.banks if b != current.bank][0] other_wire = c.scrambler.lookup(current.wire) # print(" adding", other_bank, other_wire, "because", c.banks) + logger.debug("Propogator adding {0} {1} because {2}".format(other_bank, other_wire, c.banks)) self.pending += [Signal(other_bank, other_wire)] def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a', use_diagonal_board=True): diff --git a/enigma.py b/enigma.py index a0056e4..a26d1d3 100644 --- a/enigma.py +++ b/enigma.py @@ -16,6 +16,21 @@ import string import collections import multiprocessing import itertools +import logging + +logger = logging.getLogger('engima') +logger.setLevel(logging.WARNING) +# logger.setLevel(logging.INFO) +# logger.setLevel(logging.DEBUG) + +# create the logging file handler +fh = logging.FileHandler("enigma.log") +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +fh.setFormatter(formatter) + +# add handler to logger object +logger.addHandler(fh) + # Some convenience functions @@ -329,189 +344,6 @@ class Enigma(object): # print() -################################## -# # Bombe -################################## -# -# Good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm) by Graham Ellsbury -# - -Signal = collections.namedtuple('Signal', ['bank', 'wire']) -Connection = collections.namedtuple('Connection', ['banks', 'scrambler']) -MenuItem = collections.namedtuple('MenuIem', ['before', 'after', 'number']) - - -class Scrambler(object): - def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, - wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'): - self.wheel1 = SimpleWheel(wheel1_spec, position=wheel1_pos) - self.wheel2 = SimpleWheel(wheel2_spec, position=wheel2_pos) - self.wheel3 = SimpleWheel(wheel3_spec, position=wheel3_pos) - self.reflector = Reflector(reflector_spec) - - def __getattribute__(self, name): - if name=='wheel_positions': - return self.wheel1.position, self.wheel2.position, self.wheel3.position - elif name=='wheel_positions_l': - return self.wheel1.position_l, self.wheel2.position_l, self.wheel3.position_l - else: - return object.__getattribute__(self, name) - - def advance(self, wheel1=False, wheel2=False, wheel3=True): - if wheel1: self.wheel1.advance() - if wheel2: self.wheel2.advance() - if wheel3: self.wheel3.advance() - - def lookup(self, letter): - a = self.wheel3.forward(letter) - b = self.wheel2.forward(a) - c = self.wheel1.forward(b) - d = self.reflector.forward(c) - e = self.wheel1.backward(d) - f = self.wheel2.backward(e) - g = self.wheel3.backward(f) - return g - - def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos): - self.wheel1.set_position(wheel1_pos) - self.wheel2.set_position(wheel2_pos) - self.wheel3.set_position(wheel3_pos) - - -class Bombe(object): - def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, - menu=None, start_signal=None, use_diagonal_board=True, - verify_plugboard=True): - self.connections = [] - self.wheel1_spec = wheel1_spec - self.wheel2_spec = wheel2_spec - self.wheel3_spec = wheel3_spec - self.reflector_spec = reflector_spec - if menu: - self.read_menu(menu) - if start_signal: - self.test_start = start_signal - self.use_diagonal_board = use_diagonal_board - self.verify_plugboard = verify_plugboard - - def __getattribute__(self, name): - if name=='wheel_positions': - return self.connections[0].scrambler.wheel_positions - elif name=='wheel_positions_l': - return self.connections[0].scrambler.wheel_positions_l - else: - return object.__getattribute__(self, name) - - def __call__(self, start_positions): - return start_positions, self.test(initial_signal=self.test_start, - start_positions=start_positions, - use_diagonal_board=self.use_diagonal_board, - verify_plugboard=self.verify_plugboard) - - def add_connection(self, bank_before, bank_after, scrambler): - self.connections += [Connection([bank_before, bank_after], scrambler)] - - def read_menu(self, menu): - for item in menu: - scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec, self.wheel3_spec, - self.reflector_spec, - wheel3_pos=unpos(item.number - 1)) - self.add_connection(item.before, item.after, scrambler) - most_common_letter = (collections.Counter(m.before for m in menu) + \ - collections.Counter(m.after for m in menu)).most_common(1)[0][0] - self.test_start = Signal(most_common_letter, most_common_letter) - - def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos): - for i, c in enumerate(self.connections): - c.scrambler.set_positions(wheel1_pos, wheel2_pos, unpos(pos(wheel3_pos) + i)) - - def test(self, initial_signal=None, start_positions=None, use_diagonal_board=True, - verify_plugboard=True): - self.banks = {label: - dict(zip(string.ascii_lowercase, [False]*len(string.ascii_lowercase))) - for label in string.ascii_lowercase} - if start_positions: - self.set_positions(*start_positions) - if not initial_signal: - initial_signal = self.test_start - self.pending = [initial_signal] - self.propagate(use_diagonal_board) - live_wire_count = len([self.banks[self.test_start.bank][w] - for w in self.banks[self.test_start.bank] - if self.banks[self.test_start.bank][w]]) - if live_wire_count < 26: - if verify_plugboard: - possibles = self.possible_plugboards() - return all(s0.isdisjoint(s1) for s0 in possibles for s1 in possibles if s0 != s1) - else: - return True - else: - return False - - def propagate(self, use_diagonal_board): - while self.pending: - current = self.pending[0] - # print("processing", current) - self.pending = self.pending[1:] - if not self.banks[current.bank][current.wire]: - self.banks[current.bank][current.wire] = True - if use_diagonal_board: - self.pending += [Signal(current.wire, current.bank)] - for c in self.connections: - if current.bank in c.banks: - other_bank = [b for b in c.banks if b != current.bank][0] - other_wire = c.scrambler.lookup(current.wire) - # print(" adding", other_bank, other_wire, "because", c.banks) - self.pending += [Signal(other_bank, other_wire)] - - def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a', use_diagonal_board=True): - if not run_start: - run_start = self.test_start - self.solutions = [] - self.set_positions(wheel1_pos, wheel2_pos, wheel3_pos) - for run_index in range(26*26*26): - if self.test(initial_signal=run_start, use_diagonal_board=use_diagonal_board): - self.solutions += [self.connections[0].scrambler.wheel_positions_l] - advance3 = True - advance2 = False - advance1 = False - if (run_index + 1) % 26 == 0: advance2 = True - if (run_index + 1) % (26*26) == 0: advance1 = True - for c in self.connections: - c.scrambler.advance(advance1, advance2, advance3) - return self.solutions - - def possible_plugboards(self): - possibles = set() - for b in self.banks: - active = [w for w in self.banks[b] if self.banks[b][w]] - inactive = [w for w in self.banks[b] if not self.banks[b][w]] - if len(active) == 1: - possibles = possibles.union({frozenset((b, active[0]))}) - if len(inactive) == 1: - possibles = possibles.union({frozenset((b, inactive[0]))}) - return possibles - - -def make_menu(plaintext, ciphertext): - return [MenuItem(p, c, i+1) - for i, (p, c) in enumerate(zip(plaintext, ciphertext))] - - -def run_multi_bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, menu, - start_signal=None, use_diagonal_board=True, - verify_plugboard=True): - allwheels = itertools.product(string.ascii_lowercase, repeat=3) - - with multiprocessing.Pool() as pool: - res = pool.map(Bombe(wheel1_spec, wheel2_spec, wheel3_spec, - reflector_spec, menu=menu, start_signal=start_signal, - use_diagonal_board=use_diagonal_board, - verify_plugboard=verify_plugboard), - allwheels) - return [r[0] for r in res if r[1]] - - if __name__ == "__main__": import doctest # doctest.testmod(extraglobs={'lt': LetterTransformer(1, 'a')}) diff --git a/test_bombe.py b/test_bombe.py index bac7360..c4b1e48 100644 --- a/test_bombe.py +++ b/test_bombe.py @@ -1,5 +1,5 @@ import unittest -import collections +import string from enigma import * from bombe import * diff --git a/test_enigma.py b/test_enigma.py index 9b59edb..b30be76 100644 --- a/test_enigma.py +++ b/test_enigma.py @@ -1,5 +1,6 @@ import unittest import collections +import string from enigma import *