X-Git-Url: https://git.njae.me.uk/?a=blobdiff_plain;f=enigma.py;fp=enigma.py;h=72f480792170725c0b59462781d338961c534510;hb=628c83045960e5d94ea55fe90437b465a860d4c7;hp=1150763d35eabc8be0e4fa5cf1c4f5cc3ed26b6f;hpb=027a37ff0a8b8e5dd28c8f6ce3819315d1b7a284;p=cipher-tools.git diff --git a/enigma.py b/enigma.py index 1150763..72f4807 100644 --- a/enigma.py +++ b/enigma.py @@ -1,7 +1,9 @@ # coding: utf-8 +################################## # # Enigma machine +################################## # Specification from [Codes and Ciphers](http://www.codesandciphers.org.uk/enigma/rotorspec.htm) page. # # Example Enigma machines from [Louise Dale](http://enigma.louisedade.co.uk/enigma.html) (full simulation) and [EnigmaCo](http://enigmaco.de/enigma/enigma.html) (good animation of the wheels, but no ring settings). @@ -12,6 +14,8 @@ import string import collections +import multiprocessing +import itertools # Some convenience functions @@ -278,7 +282,6 @@ class SimpleWheel(LetterTransformer): def advance(self): self.position = (self.position + 1) % 26 - # return self.position @@ -429,16 +432,11 @@ class Wheel(SimpleWheel): def set_position(self, position): self.position = (pos(position) - self.ring_setting + 1) % 26 - # self.position_l = position self.peg_positions = [(pos(p) - pos(position)) % 26 for p in self.ring_peg_letters] def advance(self): super(Wheel, self).advance() self.peg_positions = [(p - 1) % 26 for p in self.peg_positions] - # self.position_l = unpos(self.position + self.ring_setting - 1) - # return self.position - - class Enigma(object): @@ -912,20 +910,20 @@ class Enigma(object): 'wenpbqrouxlkychdfgzvitajms' - >>> enigma31.set_wheels('i', 'd', 'z') + >>> enigma31.set_wheels('i', 'z', 'd') >>> enigma31.encipher('verylongtestmessagewithanextrabitofmessageforgoodmeasure') - 'gstsegeqdrthkfwesljjomfvcqwcfspxpfqqmewvddybarzwubxtpejz' + 'apocwtjuikurcfivlozvhffkoacxufcekthcvodfqpxdjqyckdozlqki' >>> enigma31.wheel_positions - (3, 12, 6) + (4, 9, 10) >>> cat(enigma31.wheel_positions_l) - 'ifd' + 'jch' >>> enigma31.peg_positions - ([8], [20], [18]) + ([7], [23], [14]) >>> cat(enigma31.lookup(l) for l in string.ascii_lowercase) - 'urygzpdmxtwshqvfnbljaokice' + 'mopnigfuesqwadbcktjrhylzvx' - >>> enigma31.set_wheels('i', 'd', 'z') - >>> enigma31.decipher('gstsegeqdrthkfwesljjomfvcqwcfspxpfqqmewvddybarzwubxtpejz') + >>> enigma31.set_wheels('i', 'z', 'd') + >>> enigma31.decipher('apocwtjuikurcfivlozvhffkoacxufcekthcvodfqpxdjqyckdozlqki') 'verylongtestmessagewithanextrabitofmessageforgoodmeasure' """ def __init__(self, reflector_spec, @@ -1002,6 +1000,186 @@ class Enigma(object): # print() +################################## +# # Bombe +################################## + +Signal = collections.namedtuple('Signal', ['bank', 'wire']) +Connection = collections.namedtuple('Connection', ['banks', 'scrambler']) +MenuItem = collections.namedtuple('MenuIem', ['before', 'after', 'number']) + + +class Scrambler(object): + def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, + wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'): + self.wheel1 = SimpleWheel(wheel1_spec, position=wheel1_pos) + self.wheel2 = SimpleWheel(wheel2_spec, position=wheel2_pos) + self.wheel3 = SimpleWheel(wheel3_spec, position=wheel3_pos) + self.reflector = Reflector(reflector_spec) + + def __getattribute__(self, name): + if name=='wheel_positions': + return self.wheel1.position, self.wheel2.position, self.wheel3.position + elif name=='wheel_positions_l': + return self.wheel1.position_l, self.wheel2.position_l, self.wheel3.position_l + else: + return object.__getattribute__(self, name) + + def advance(self, wheel1=False, wheel2=False, wheel3=True): + if wheel1: self.wheel1.advance() + if wheel2: self.wheel2.advance() + if wheel3: self.wheel3.advance() + + def lookup(self, letter): + a = self.wheel3.forward(letter) + b = self.wheel2.forward(a) + c = self.wheel1.forward(b) + d = self.reflector.forward(c) + e = self.wheel1.backward(d) + f = self.wheel2.backward(e) + g = self.wheel3.backward(f) + return g + + def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos): + self.wheel1.set_position(wheel1_pos) + self.wheel2.set_position(wheel2_pos) + self.wheel3.set_position(wheel3_pos) + + +class Bombe(object): + def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, + menu=None, start_signal=None, use_diagonal_board=True, + verify_plugboard=True): + self.connections = [] + self.wheel1_spec = wheel1_spec + self.wheel2_spec = wheel2_spec + self.wheel3_spec = wheel3_spec + self.reflector_spec = reflector_spec + if menu: + self.read_menu(menu) + if start_signal: + self.test_start = start_signal + self.use_diagonal_board = use_diagonal_board + self.verify_plugboard = verify_plugboard + + def __getattribute__(self, name): + if name=='wheel_positions': + return self.connections[0].scrambler.wheel_positions + elif name=='wheel_positions_l': + return self.connections[0].scrambler.wheel_positions_l + else: + return object.__getattribute__(self, name) + + def __call__(self, start_positions): + return start_positions, self.test(initial_signal=self.test_start, + start_positions=start_positions, + use_diagonal_board=self.use_diagonal_board, + verify_plugboard=self.verify_plugboard) + + def add_connection(self, bank_before, bank_after, scrambler): + self.connections += [Connection([bank_before, bank_after], scrambler)] + + def read_menu(self, menu): + for item in menu: + scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec, self.wheel3_spec, + self.reflector_spec, + wheel3_pos=unpos(item.number - 1)) + self.add_connection(item.before, item.after, scrambler) + most_common_letter = (collections.Counter(m.before for m in menu) + \ + collections.Counter(m.after for m in menu)).most_common(1)[0][0] + self.test_start = Signal(most_common_letter, most_common_letter) + + def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos): + for i, c in enumerate(self.connections): + c.scrambler.set_positions(wheel1_pos, wheel2_pos, unpos(pos(wheel3_pos) + i)) + + def test(self, initial_signal=None, start_positions=None, use_diagonal_board=True, + verify_plugboard=True): + self.banks = {label: + dict(zip(string.ascii_lowercase, [False]*len(string.ascii_lowercase))) + for label in string.ascii_lowercase} + if start_positions: + self.set_positions(*start_positions) + if not initial_signal: + initial_signal = self.test_start + self.pending = [initial_signal] + self.propagate(use_diagonal_board) + live_wire_count = len([self.banks[self.test_start.bank][w] + for w in self.banks[self.test_start.bank] + if self.banks[self.test_start.bank][w]]) + if live_wire_count < 26: + if verify_plugboard: + possibles = self.possible_plugboards() + return all(s0.isdisjoint(s1) for s0 in possibles for s1 in possibles if s0 != s1) + else: + return True + else: + return False + + def propagate(self, use_diagonal_board): + while self.pending: + current = self.pending[0] + # print("processing", current) + self.pending = self.pending[1:] + if not self.banks[current.bank][current.wire]: + self.banks[current.bank][current.wire] = True + if use_diagonal_board: + self.pending += [Signal(current.wire, current.bank)] + for c in self.connections: + if current.bank in c.banks: + other_bank = [b for b in c.banks if b != current.bank][0] + other_wire = c.scrambler.lookup(current.wire) + # print(" adding", other_bank, other_wire, "because", c.banks) + self.pending += [Signal(other_bank, other_wire)] + + def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a', use_diagonal_board=True): + if not run_start: + run_start = self.test_start + self.solutions = [] + self.set_positions(wheel1_pos, wheel2_pos, wheel3_pos) + for run_index in range(26*26*26): + if self.test(initial_signal=run_start, use_diagonal_board=use_diagonal_board): + self.solutions += [self.connections[0].scrambler.wheel_positions_l] + advance3 = True + advance2 = False + advance1 = False + if (run_index + 1) % 26 == 0: advance2 = True + if (run_index + 1) % (26*26) == 0: advance1 = True + for c in self.connections: + c.scrambler.advance(advance1, advance2, advance3) + return self.solutions + + def possible_plugboards(self): + possibles = set() + for b in self.banks: + active = [w for w in self.banks[b] if self.banks[b][w]] + inactive = [w for w in self.banks[b] if not self.banks[b][w]] + if len(active) == 1: + possibles = possibles.union({frozenset((b, active[0]))}) + if len(inactive) == 1: + possibles = possibles.union({frozenset((b, inactive[0]))}) + return possibles + + +def make_menu(plaintext, ciphertext): + return [MenuItem(p, c, i+1) + for i, (p, c) in enumerate(zip(plaintext, ciphertext))] + + +def run_multi_bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, menu, + start_signal=None, use_diagonal_board=True, + verify_plugboard=True): + allwheels = itertools.product(string.ascii_lowercase, repeat=3) + + with multiprocessing.Pool() as pool: + res = pool.map(Bombe(wheel1_spec, wheel2_spec, wheel3_spec, + reflector_spec, menu=menu, start_signal=start_signal, + use_diagonal_board=use_diagonal_board, + verify_plugboard=verify_plugboard), + allwheels) + return [r[0] for r in res if r[1]] + + if __name__ == "__main__": import doctest # doctest.testmod(extraglobs={'lt': LetterTransformer(1, 'a')})