Worked on Enigma, mainly changing how the notch positions are handled
[cipher-training.git] / bombe.py
1 import string
2 import collections
3 import multiprocessing
4 import itertools
5 import logging
6 from enigma import *
7
8
# Module-level logger for the bombe simulation.  DEBUG is the most verbose
# setting; use logging.INFO or logging.WARNING here to quieten the output.
logger = logging.getLogger('bombe')
logger.setLevel(logging.DEBUG)

# Records are written to "enigma.log", each one stamped with the time, the
# logger name and the level ahead of the message text.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh = logging.FileHandler("enigma.log")
fh.setFormatter(formatter)

# Attach the file handler to the logger.
logger.addHandler(fh)
21
22 ##################################
23 # # Bombe
24 ##################################
25 #
26 # Good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm) by Graham Ellsbury
27 #
28
# A live wire: `wire` is the energised wire within the cable (bank) `bank`.
Signal = collections.namedtuple('Signal', ['bank', 'wire'])
# A scrambler wired between two banks; `banks` holds the two bank labels.
Connection = collections.namedtuple('Connection', ['banks', 'scrambler'])
# One crib position: the letter `before` and `after` encipherment, and the
# 1-based position `number` of that letter pair in the crib.
# The typename must match the variable name, otherwise repr() is misleading
# and instances cannot be pickled (pickle looks the class up by its name).
MenuItem = collections.namedtuple('MenuItem', ['before', 'after', 'number'])
32
33
def make_menu(plaintext, ciphertext):
    """Build a bombe menu from a crib.

    Pairs the letters of `plaintext` and `ciphertext` position by position
    and returns a list of MenuItem(before, after, number), where `number`
    counts the positions starting from 1.  Pairing stops at the end of the
    shorter of the two inputs.
    """
    menu = []
    for number, (before, after) in enumerate(zip(plaintext, ciphertext), start=1):
        menu.append(MenuItem(before, after, number))
    return menu
37
38
class Scrambler(object):
    """Three wheels and a reflector wired together as one scrambler unit.

    The wheels are SimpleWheel objects and the reflector is a Reflector
    (both expected to come from the enigma module).  Wheel 3 is the first
    wheel a signal enters and the last one it leaves.
    """

    def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                 wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'):
        """Create the three wheels at the given positions, plus the reflector.

        The `*_spec` arguments are passed unchanged to SimpleWheel and
        Reflector; the `*_pos` arguments become each wheel's `position`.
        """
        self.wheel1 = SimpleWheel(wheel1_spec, position=wheel1_pos)
        self.wheel2 = SimpleWheel(wheel2_spec, position=wheel2_pos)
        self.wheel3 = SimpleWheel(wheel3_spec, position=wheel3_pos)
        self.reflector = Reflector(reflector_spec)

    # The two computed attributes below are plain read-only properties, so
    # ordinary attribute access (wheel1, reflector, methods, ...) is not
    # routed through a custom __getattribute__ hook on every lookup.
    @property
    def wheel_positions(self):
        """Tuple of the `position` attribute of wheels 1, 2 and 3."""
        return self.wheel1.position, self.wheel2.position, self.wheel3.position

    @property
    def wheel_positions_l(self):
        """Tuple of the `position_l` attribute of wheels 1, 2 and 3.

        NOTE(review): presumably the positions expressed as letters --
        confirm against SimpleWheel in the enigma module.
        """
        return self.wheel1.position_l, self.wheel2.position_l, self.wheel3.position_l

    def advance(self, wheel1=False, wheel2=False, wheel3=True):
        """Advance each wheel whose flag is true (only wheel 3 by default)."""
        if wheel1:
            self.wheel1.advance()
        if wheel2:
            self.wheel2.advance()
        if wheel3:
            self.wheel3.advance()

    def lookup(self, letter):
        """Pass `letter` through the scrambler and return the result.

        The path is wheel 3, wheel 2, wheel 1 (forward), the reflector,
        then wheel 1, wheel 2, wheel 3 (backward).
        """
        a = self.wheel3.forward(letter)
        b = self.wheel2.forward(a)
        c = self.wheel1.forward(b)
        d = self.reflector.forward(c)
        e = self.wheel1.backward(d)
        f = self.wheel2.backward(e)
        g = self.wheel3.backward(f)
        return g

    def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
        """Set the position of each of the three wheels."""
        self.wheel1.set_position(wheel1_pos)
        self.wheel2.set_position(wheel2_pos)
        self.wheel3.set_position(wheel3_pos)
74
75
class Bombe(object):
    """A bombe: scramblers wired between 26 banks (cables) of 26 wires each.

    `connections` is a list of Connection(banks, scrambler) entries.  A test
    injects a Signal into one wire of one bank, lets it propagate through
    the scramblers (and optionally the diagonal board), and then inspects
    how many wires of the injected bank ended up live.

    Instances are callable with a tuple of start positions so that they can
    be handed directly to `multiprocessing.Pool.map`.
    """

    def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                 menu=None, start_signal=None, use_diagonal_board=True,
                 verify_plugboard=True):
        """Store the wheel/reflector specs and optionally wire up a menu.

        menu: iterable of MenuItem; if given (and non-empty) it is read with
            `read_menu`, which also chooses a default `test_start`.
        start_signal: Signal that overrides the default `test_start`.
        use_diagonal_board, verify_plugboard: defaults used by `__call__`.
        """
        self.connections = []
        self.wheel1_spec = wheel1_spec
        self.wheel2_spec = wheel2_spec
        self.wheel3_spec = wheel3_spec
        self.reflector_spec = reflector_spec
        # Always define test_start, so a bombe that is wired up by hand with
        # add_connection() has the attribute (as None) rather than lacking it.
        self.test_start = None
        if menu:
            self.read_menu(menu)
        if start_signal:
            self.test_start = start_signal
        self.use_diagonal_board = use_diagonal_board
        self.verify_plugboard = verify_plugboard

    @property
    def wheel_positions(self):
        """`wheel_positions` of the scrambler in the first connection."""
        return self.connections[0].scrambler.wheel_positions

    @property
    def wheel_positions_l(self):
        """`wheel_positions_l` of the scrambler in the first connection."""
        return self.connections[0].scrambler.wheel_positions_l

    def __call__(self, start_positions):
        """Test one set of start positions using the instance's settings.

        Returns the pair `(start_positions, result of test())`.
        """
        return start_positions, self.test(initial_signal=self.test_start,
                                          start_positions=start_positions,
                                          use_diagonal_board=self.use_diagonal_board,
                                          verify_plugboard=self.verify_plugboard)

    def add_connection(self, bank_before, bank_after, scrambler):
        """Wire `scrambler` between the banks `bank_before` and `bank_after`."""
        self.connections.append(Connection([bank_before, bank_after], scrambler))

    def read_menu(self, menu):
        """Replace all connections with one scrambler per menu item.

        Each scrambler's third wheel starts at `unpos(item.number - 1)`, so
        successive crib positions are offset from each other.  `test_start`
        becomes the letter that appears most often in the menu, injected on
        the same-named wire of its own bank.
        """
        self.connections = []
        for item in menu:
            scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec, self.wheel3_spec,
                                  self.reflector_spec,
                                  wheel3_pos=unpos(item.number - 1))
            self.add_connection(item.before, item.after, scrambler)
        letter_counts = (collections.Counter(m.before for m in menu) +
                         collections.Counter(m.after for m in menu))
        most_common_letter = letter_counts.most_common(1)[0][0]
        self.test_start = Signal(most_common_letter, most_common_letter)

    def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
        """Set every scrambler's wheels, offsetting wheel 3 per connection.

        The i-th connection gets `unpos(pos(wheel3_pos) + i)` for wheel 3.
        NOTE(review): relies on pos/unpos from the enigma module handling
        values past the end of the alphabet -- confirm there.
        """
        for i, c in enumerate(self.connections):
            c.scrambler.set_positions(wheel1_pos, wheel2_pos, unpos(pos(wheel3_pos) + i))

    def test(self, initial_signal=None, start_positions=None, use_diagonal_board=True,
             verify_plugboard=True):
        """Run one test and report whether the current position is a stop.

        initial_signal: Signal to inject; defaults to `self.test_start`.
        start_positions: optional (wheel1, wheel2, wheel3) positions applied
            with `set_positions` before the test.
        use_diagonal_board: passed on to `propagate`.
        verify_plugboard: if true, additionally require that the plugboard
            pairs deduced by `possible_plugboards` do not contradict each
            other (no letter occurring in two different pairs).

        Returns False when all 26 wires of the injected bank are live,
        otherwise True (subject to the plugboard check).

        Raises ValueError if no initial signal is available.
        """
        self.banks = {label: dict.fromkeys(string.ascii_lowercase, False)
                      for label in string.ascii_lowercase}
        if start_positions:
            self.set_positions(*start_positions)
        if not initial_signal:
            initial_signal = self.test_start
        if initial_signal is None:
            raise ValueError("no initial signal given and no test_start set")
        self.pending = [initial_signal]
        self.propagate(use_diagonal_board)
        # The test register is the bank the signal was injected into, which
        # is not necessarily the bank named by self.test_start.
        live_wire_count = sum(self.banks[initial_signal.bank].values())
        if live_wire_count >= 26:
            return False
        if not verify_plugboard:
            return True
        possibles = self.possible_plugboards()
        # isdisjoint is symmetric, so each unordered pair is checked once.
        return all(s0.isdisjoint(s1)
                   for s0, s1 in itertools.combinations(possibles, 2))

    def propagate(self, use_diagonal_board):
        """Drain `self.pending`, energising every wire the signals reach.

        Signals are processed first-in, first-out.  A signal arriving on a
        wire that is already live is dropped.  Otherwise the wire is marked
        live and new signals are queued: the mirrored (wire, bank) signal
        when `use_diagonal_board` is true, and the scrambled signal on the
        other bank of every connection attached to the current bank.
        `self.pending` is empty when this returns.
        """
        # A deque gives O(1) pops from the front; slicing a list copies it.
        queue = collections.deque(self.pending)
        self.pending = []
        while queue:
            current = queue.popleft()
            logger.debug("Propogater processing %s", current)
            if self.banks[current.bank][current.wire]:
                continue
            self.banks[current.bank][current.wire] = True
            if use_diagonal_board:
                queue.append(Signal(current.wire, current.bank))
            for c in self.connections:
                if current.bank in c.banks:
                    other_bank = [b for b in c.banks if b != current.bank][0]
                    other_wire = c.scrambler.lookup(current.wire)
                    logger.debug("Propogator adding %s %s because %s",
                                 other_bank, other_wire, c.banks)
                    queue.append(Signal(other_bank, other_wire))

    def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a',
            use_diagonal_board=True):
        """Test all 26*26*26 wheel positions in sequence and collect stops.

        Starting from the given positions, every scrambler is stepped after
        each test: wheel 3 every time, wheel 2 after every 26th test and
        wheel 1 after every 676th.  Returns (and stores in `self.solutions`)
        the `wheel_positions_l` of the first scrambler for every position
        where `test` returned true.
        """
        if not run_start:
            run_start = self.test_start
        self.solutions = []
        self.set_positions(wheel1_pos, wheel2_pos, wheel3_pos)
        for run_index in range(26 * 26 * 26):
            if self.test(initial_signal=run_start, use_diagonal_board=use_diagonal_board):
                self.solutions.append(self.wheel_positions_l)
            tests_done = run_index + 1
            advance2 = tests_done % 26 == 0
            advance1 = tests_done % (26 * 26) == 0
            for c in self.connections:
                c.scrambler.advance(advance1, advance2, True)
        return self.solutions

    def possible_plugboards(self):
        """Return the set of plugboard pairs implied by the current banks.

        A bank with exactly one live wire, or exactly one dead wire, yields
        the pair frozenset((bank, wire)).
        """
        possibles = set()
        for b, wires in self.banks.items():
            active = [w for w, live in wires.items() if live]
            inactive = [w for w, live in wires.items() if not live]
            if len(active) == 1:
                possibles.add(frozenset((b, active[0])))
            if len(inactive) == 1:
                possibles.add(frozenset((b, inactive[0])))
        return possibles
193
194
def run_multi_bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, menu,
                    start_signal=None, use_diagonal_board=True,
                    verify_plugboard=True):
    """Test every three-letter wheel position across a pool of processes.

    A single Bombe is built from the arguments and mapped, as a callable,
    over all 26*26*26 combinations of lowercase start letters using
    `multiprocessing.Pool`.  Returns the list of start-position tuples for
    which the bombe's test result was true.
    """
    every_start = itertools.product(string.ascii_lowercase, repeat=3)

    with multiprocessing.Pool() as pool:
        bombe = Bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                      menu=menu, start_signal=start_signal,
                      use_diagonal_board=use_diagonal_board,
                      verify_plugboard=verify_plugboard)
        outcomes = pool.map(bombe, every_start)
    return [positions for positions, is_stop in outcomes if is_stop]