import collections
import multiprocessing
import itertools
+import logging
+
+logger = logging.getLogger('engima')
+logger.setLevel(logging.WARNING)
+# logger.setLevel(logging.INFO)
+# logger.setLevel(logging.DEBUG)
+
+# create the logging file handler
+fh = logging.FileHandler("enigma.log")
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+fh.setFormatter(formatter)
+
+# add handler to logger object
+logger.addHandler(fh)
+
# Some convenience functions
beta_wheel_spec = 'leyjvcnixwpbqmdrtakzgfuhos'
gamma_wheel_spec = 'fsokanuerhmbtiycwlqpzxvgjd'
-wheel_i_pegs = ['q']
-wheel_ii_pegs = ['e']
-wheel_iii_pegs = ['v']
-wheel_iv_pegs = ['j']
-wheel_v_pegs = ['z']
-wheel_vi_pegs = ['z', 'm']
-wheel_vii_pegs = ['z', 'm']
-wheel_viii_pegs = ['z', 'm']
+wheel_i_notches = ['q']
+wheel_ii_notches = ['e']
+wheel_iii_notches = ['v']
+wheel_iv_notches = ['j']
+wheel_v_notches = ['z']
+wheel_vi_notches = ['z', 'm']
+wheel_vii_notches = ['z', 'm']
+wheel_viii_notches = ['z', 'm']
reflector_b_spec = 'ay br cu dh eq fs gl ip jx kn mo tz vw'
reflector_c_spec = 'af bv cp dj ei go hy kr lz mx nw tq su'
return object.__getattribute__(self, name)
def set_position(self, position):
- self.position = ord(position) - ord('a')
+ if isinstance(position, str):
+ # self.position = ord(position) - ord('a')
+ self.position = pos(position)
+ else:
+ self.position = position
def forward(self, letter):
if letter in string.ascii_lowercase:
class Wheel(SimpleWheel):
"""A wheel with a movable ring.
- The ring holds the letters and the pegs that turn other wheels. The core
+ The ring holds the letters and the notches that turn other wheels. The core
holds the wiring that does the transformation.
The ring position is how many steps the core is turned relative to the ring.
The position_l is the position of the ring, or what would be observed
by the user of the Enigma machine.
- The peg_positions are the number of advances of this wheel before it will
+ The notch_positions are the number of advances of this wheel before it will
advance the next wheel.
"""
- def __init__(self, transform, ring_peg_letters, ring_setting=1, position='a', raw_transform=False):
- self.ring_peg_letters = ring_peg_letters
+ def __init__(self, transform, ring_notch_letters, ring_setting=1, position='a', raw_transform=False):
+ self.ring_notch_letters = ring_notch_letters
self.ring_setting = ring_setting
super(Wheel, self).__init__(transform, position=position, raw_transform=raw_transform)
self.set_position(position)
return object.__getattribute__(self, name)
def set_position(self, position):
- self.position = (pos(position) - self.ring_setting + 1) % 26
- self.peg_positions = [(pos(p) - pos(position)) % 26 for p in self.ring_peg_letters]
+ if isinstance(position, str):
+ self.position = (pos(position) - self.ring_setting + 1) % 26
+ else:
+ self.position = (position - self.ring_setting) % 26
+ # # self.notch_positions = [(pos(p) - pos(position)) % 26 for p in self.ring_notch_letters]
+ # self.notch_positions = [(pos(p) - (self.position + self.ring_setting - 1)) % 26 for p in self.ring_notch_letters]
+ self.notch_positions = [(self.position + self.ring_setting - 1 - pos(p)) % 26 for p in self.ring_notch_letters]
def advance(self):
super(Wheel, self).advance()
- self.peg_positions = [(p - 1) % 26 for p in self.peg_positions]
+ self.notch_positions = [(p + 1) % 26 for p in self.notch_positions]
+ return self.position
class Enigma(object):
"""
def __init__(self, reflector_spec,
- left_wheel_spec, left_wheel_pegs,
- middle_wheel_spec, middle_wheel_pegs,
- right_wheel_spec, right_wheel_pegs,
+ left_wheel_spec, left_wheel_notches,
+ middle_wheel_spec, middle_wheel_notches,
+ right_wheel_spec, right_wheel_notches,
left_ring_setting, middle_ring_setting, right_ring_setting,
plugboard_setting):
self.reflector = Reflector(reflector_spec)
- self.left_wheel = Wheel(left_wheel_spec, left_wheel_pegs, ring_setting=left_ring_setting)
- self.middle_wheel = Wheel(middle_wheel_spec, middle_wheel_pegs, ring_setting=middle_ring_setting)
- self.right_wheel = Wheel(right_wheel_spec, right_wheel_pegs, ring_setting=right_ring_setting)
+ self.left_wheel = Wheel(left_wheel_spec, left_wheel_notches, ring_setting=left_ring_setting)
+ self.middle_wheel = Wheel(middle_wheel_spec, middle_wheel_notches, ring_setting=middle_ring_setting)
+ self.right_wheel = Wheel(right_wheel_spec, right_wheel_notches, ring_setting=right_ring_setting)
self.plugboard = Plugboard(plugboard_setting)
def __getattribute__(self,name):
return self.left_wheel.position, self.middle_wheel.position, self.right_wheel.position
elif name=='wheel_positions_l':
return self.left_wheel.position_l, self.middle_wheel.position_l, self.right_wheel.position_l
- elif name=='peg_positions':
- return self.left_wheel.peg_positions, self.middle_wheel.peg_positions, self.right_wheel.peg_positions
+ elif name=='notch_positions':
+ return self.left_wheel.notch_positions, self.middle_wheel.notch_positions, self.right_wheel.notch_positions
else:
return object.__getattribute__(self, name)
def advance(self):
advance_middle = False
advance_left = False
- if 0 in self.right_wheel.peg_positions:
+ if 0 in self.right_wheel.notch_positions:
advance_middle = True
- if 0 in self.middle_wheel.peg_positions:
+ if 0 in self.middle_wheel.notch_positions:
advance_left = True
advance_middle = True
self.right_wheel.advance()
# print('enigma.advance()')
# print("assert(enigma.wheel_positions == {})".format(enigma.wheel_positions))
# print("assert(cat(enigma.wheel_positions_l) == '{}')".format(cat(enigma.wheel_positions_l)))
-# print("assert(enigma.peg_positions == {})".format(enigma.peg_positions))
+# print("assert(enigma.notch_positions == {})".format(enigma.notch_positions))
# print("assert(cat(enigma.lookup(l) for l in string.ascii_lowercase) == '{}')".format(cat(enigma.lookup(l) for l in string.ascii_lowercase)))
# print()
-##################################
-# # Bombe
-##################################
-#
-# Good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm) by Graham Ellsbury
-#
-
-Signal = collections.namedtuple('Signal', ['bank', 'wire'])
-Connection = collections.namedtuple('Connection', ['banks', 'scrambler'])
-MenuItem = collections.namedtuple('MenuIem', ['before', 'after', 'number'])
-
-
-class Scrambler(object):
- def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
- wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'):
- self.wheel1 = SimpleWheel(wheel1_spec, position=wheel1_pos)
- self.wheel2 = SimpleWheel(wheel2_spec, position=wheel2_pos)
- self.wheel3 = SimpleWheel(wheel3_spec, position=wheel3_pos)
- self.reflector = Reflector(reflector_spec)
-
- def __getattribute__(self, name):
- if name=='wheel_positions':
- return self.wheel1.position, self.wheel2.position, self.wheel3.position
- elif name=='wheel_positions_l':
- return self.wheel1.position_l, self.wheel2.position_l, self.wheel3.position_l
- else:
- return object.__getattribute__(self, name)
-
- def advance(self, wheel1=False, wheel2=False, wheel3=True):
- if wheel1: self.wheel1.advance()
- if wheel2: self.wheel2.advance()
- if wheel3: self.wheel3.advance()
-
- def lookup(self, letter):
- a = self.wheel3.forward(letter)
- b = self.wheel2.forward(a)
- c = self.wheel1.forward(b)
- d = self.reflector.forward(c)
- e = self.wheel1.backward(d)
- f = self.wheel2.backward(e)
- g = self.wheel3.backward(f)
- return g
-
- def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
- self.wheel1.set_position(wheel1_pos)
- self.wheel2.set_position(wheel2_pos)
- self.wheel3.set_position(wheel3_pos)
-
-
-class Bombe(object):
- def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
- menu=None, start_signal=None, use_diagonal_board=True,
- verify_plugboard=True):
- self.connections = []
- self.wheel1_spec = wheel1_spec
- self.wheel2_spec = wheel2_spec
- self.wheel3_spec = wheel3_spec
- self.reflector_spec = reflector_spec
- if menu:
- self.read_menu(menu)
- if start_signal:
- self.test_start = start_signal
- self.use_diagonal_board = use_diagonal_board
- self.verify_plugboard = verify_plugboard
-
- def __getattribute__(self, name):
- if name=='wheel_positions':
- return self.connections[0].scrambler.wheel_positions
- elif name=='wheel_positions_l':
- return self.connections[0].scrambler.wheel_positions_l
- else:
- return object.__getattribute__(self, name)
-
- def __call__(self, start_positions):
- return start_positions, self.test(initial_signal=self.test_start,
- start_positions=start_positions,
- use_diagonal_board=self.use_diagonal_board,
- verify_plugboard=self.verify_plugboard)
-
- def add_connection(self, bank_before, bank_after, scrambler):
- self.connections += [Connection([bank_before, bank_after], scrambler)]
-
- def read_menu(self, menu):
- for item in menu:
- scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec, self.wheel3_spec,
- self.reflector_spec,
- wheel3_pos=unpos(item.number - 1))
- self.add_connection(item.before, item.after, scrambler)
- most_common_letter = (collections.Counter(m.before for m in menu) + \
- collections.Counter(m.after for m in menu)).most_common(1)[0][0]
- self.test_start = Signal(most_common_letter, most_common_letter)
-
- def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
- for i, c in enumerate(self.connections):
- c.scrambler.set_positions(wheel1_pos, wheel2_pos, unpos(pos(wheel3_pos) + i))
-
- def test(self, initial_signal=None, start_positions=None, use_diagonal_board=True,
- verify_plugboard=True):
- self.banks = {label:
- dict(zip(string.ascii_lowercase, [False]*len(string.ascii_lowercase)))
- for label in string.ascii_lowercase}
- if start_positions:
- self.set_positions(*start_positions)
- if not initial_signal:
- initial_signal = self.test_start
- self.pending = [initial_signal]
- self.propagate(use_diagonal_board)
- live_wire_count = len([self.banks[self.test_start.bank][w]
- for w in self.banks[self.test_start.bank]
- if self.banks[self.test_start.bank][w]])
- if live_wire_count < 26:
- if verify_plugboard:
- possibles = self.possible_plugboards()
- return all(s0.isdisjoint(s1) for s0 in possibles for s1 in possibles if s0 != s1)
- else:
- return True
- else:
- return False
-
- def propagate(self, use_diagonal_board):
- while self.pending:
- current = self.pending[0]
- # print("processing", current)
- self.pending = self.pending[1:]
- if not self.banks[current.bank][current.wire]:
- self.banks[current.bank][current.wire] = True
- if use_diagonal_board:
- self.pending += [Signal(current.wire, current.bank)]
- for c in self.connections:
- if current.bank in c.banks:
- other_bank = [b for b in c.banks if b != current.bank][0]
- other_wire = c.scrambler.lookup(current.wire)
- # print(" adding", other_bank, other_wire, "because", c.banks)
- self.pending += [Signal(other_bank, other_wire)]
-
- def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a', use_diagonal_board=True):
- if not run_start:
- run_start = self.test_start
- self.solutions = []
- self.set_positions(wheel1_pos, wheel2_pos, wheel3_pos)
- for run_index in range(26*26*26):
- if self.test(initial_signal=run_start, use_diagonal_board=use_diagonal_board):
- self.solutions += [self.connections[0].scrambler.wheel_positions_l]
- advance3 = True
- advance2 = False
- advance1 = False
- if (run_index + 1) % 26 == 0: advance2 = True
- if (run_index + 1) % (26*26) == 0: advance1 = True
- for c in self.connections:
- c.scrambler.advance(advance1, advance2, advance3)
- return self.solutions
-
- def possible_plugboards(self):
- possibles = set()
- for b in self.banks:
- active = [w for w in self.banks[b] if self.banks[b][w]]
- inactive = [w for w in self.banks[b] if not self.banks[b][w]]
- if len(active) == 1:
- possibles = possibles.union({frozenset((b, active[0]))})
- if len(inactive) == 1:
- possibles = possibles.union({frozenset((b, inactive[0]))})
- return possibles
-
-
-def make_menu(plaintext, ciphertext):
- return [MenuItem(p, c, i+1)
- for i, (p, c) in enumerate(zip(plaintext, ciphertext))]
-
-
-def run_multi_bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, menu,
- start_signal=None, use_diagonal_board=True,
- verify_plugboard=True):
- allwheels = itertools.product(string.ascii_lowercase, repeat=3)
-
- with multiprocessing.Pool() as pool:
- res = pool.map(Bombe(wheel1_spec, wheel2_spec, wheel3_spec,
- reflector_spec, menu=menu, start_signal=start_signal,
- use_diagonal_board=use_diagonal_board,
- verify_plugboard=verify_plugboard),
- allwheels)
- return [r[0] for r in res if r[1]]
-
-
if __name__ == "__main__":
import doctest
# doctest.testmod(extraglobs={'lt': LetterTransformer(1, 'a')})