import collections
import multiprocessing
import itertools
+import logging
from enigma import *
+logger = logging.getLogger('bombe')
+# logger.setLevel(logging.WARNING)
+# logger.setLevel(logging.INFO)
+logger.setLevel(logging.DEBUG)
+
+# create the logging file handler
+fh = logging.FileHandler("enigma.log")
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+fh.setFormatter(formatter)
+
+# add handler to logger object
+logger.addHandler(fh)
+
+##################################
+# # Bombe
+##################################
+#
+# Good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm) by Graham Ellsbury
+#
+
Signal = collections.namedtuple('Signal', ['bank', 'wire'])
Connection = collections.namedtuple('Connection', ['banks', 'scrambler'])
MenuItem = collections.namedtuple('MenuIem', ['before', 'after', 'number'])
+def make_menu(plaintext, ciphertext):
+ return [MenuItem(p, c, i+1)
+ for i, (p, c) in enumerate(zip(plaintext, ciphertext))]
+
+
class Scrambler(object):
def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'):
self.connections += [Connection([bank_before, bank_after], scrambler)]
def read_menu(self, menu):
+ self.connections = []
for item in menu:
scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec, self.wheel3_spec,
self.reflector_spec,
wheel3_pos=unpos(item.number - 1))
self.add_connection(item.before, item.after, scrambler)
- most_common_letter = (collections.Counter(m.before for m in menu) + collections.Counter(m.after for m in menu)).most_common(1)[0][0]
+ most_common_letter = (collections.Counter(m.before for m in menu) +\
+ collections.Counter(m.after for m in menu)).most_common(1)[0][0]
self.test_start = Signal(most_common_letter, most_common_letter)
def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
while self.pending:
current = self.pending[0]
# print("processing", current)
+ logger.debug("Propogater processing {}".format(current))
self.pending = self.pending[1:]
if not self.banks[current.bank][current.wire]:
self.banks[current.bank][current.wire] = True
other_bank = [b for b in c.banks if b != current.bank][0]
other_wire = c.scrambler.lookup(current.wire)
# print(" adding", other_bank, other_wire, "because", c.banks)
+ logger.debug("Propogator adding {0} {1} because {2}".format(other_bank, other_wire, c.banks))
self.pending += [Signal(other_bank, other_wire)]
def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a', use_diagonal_board=True):
possibles = possibles.union({frozenset((b, inactive[0]))})
return possibles
+
+def run_multi_bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, menu,
+ start_signal=None, use_diagonal_board=True,
+ verify_plugboard=True):
+ allwheels = itertools.product(string.ascii_lowercase, repeat=3)
+
+ with multiprocessing.Pool() as pool:
+ res = pool.map(Bombe(wheel1_spec, wheel2_spec, wheel3_spec,
+ reflector_spec, menu=menu, start_signal=start_signal,
+ use_diagonal_board=use_diagonal_board,
+ verify_plugboard=verify_plugboard),
+ allwheels)
+ return [r[0] for r in res if r[1]]
\ No newline at end of file