Started on documentation
[szyfrow.git] / szyfrow / bombe.py
1 """A simulator for Bombe machines.
2
3 See `szyfrow.enigma.Enigma` for an implementation of the Enigma to create
4 messages.
5
6 There is a good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm)
7 by Graham Ellsbury.
8
9 In this implementation, there are *banks* of wires (what Ellsbury refers to
10 as "cables"), one bank for each position that appears in the menu. A bank
11 comprises 26 wires, represented as a `dict` of `bool`s, depending on whether
12 that wire is live ("energised") or not.
13
14 The menu, derived from the crib, determines how the scramblers connect the
15 banks. A `Connection` represents this.
16 """
17
18 import string
19 import collections
20 import multiprocessing
21 import itertools
22 import logging
23
24 from szyfrow.enigma import *
25
26 __pdoc__ = {}
27
28 Signal = collections.namedtuple('Signal', ['bank', 'wire'])
29 __pdoc__['Signal'] = """Current propogation through the Bombe indicates that
30 this wire in this bank is live, and the effects need to be proogated further
31 through the machine.
32 """
33 __pdoc__['Signal.bank'] = """The bank of a signal."""
34 __pdoc__['Signal.wire'] = """The wire of a signal."""
35
36 Connection = collections.namedtuple('Connection', ['banks', 'scrambler'])
37 __pdoc__['Connection'] = """A connection between banks made by a particular
38 scrambler (the scrambler state given by its position in the crib).
39 """
40 __pdoc__['Connection.banks'] = """A list of two items, holding the bnaks of
41 a connection."""
42 __pdoc__['Connection.scrambler'] = """The bnaks of a connection."""
43
44
45 MenuItem = collections.namedtuple('MenuIem', ['before', 'after', 'number'])
46 __pdoc__['MenuItem'] = """One item in the menu, derived from the crib.
47 """
48 __pdoc__['MenuItem.before'] = "The letter before the transform (plaintext)."
49 __pdoc__['MenuItem.after'] = "The letter after the transform (ciphertext)."
50 __pdoc__['MenuItem.number'] = "The position of this item in the menu."
51
52
53 def make_menu(plaintext, ciphertext):
54 """Create a menu from a crib: a given plaintext and ciphertext.
55
56 No validation is done to ensure that this is a viable crib (e.g. no
57 checking for length, no checking that a letter is enciphered to itself).
58 """
59 return [MenuItem(p, c, i+1)
60 for i, (p, c) in enumerate(zip(plaintext, ciphertext))]
61
62
63 class Scrambler(object):
64 """A scrambler is a collection of three `szyfrow.enigma.SimpleWheel`s.
65 """
66 def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
67 wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'):
68 self.wheel1 = SimpleWheel(wheel1_spec, position=wheel1_pos)
69 self.wheel2 = SimpleWheel(wheel2_spec, position=wheel2_pos)
70 self.wheel3 = SimpleWheel(wheel3_spec, position=wheel3_pos)
71 self.reflector = Reflector(reflector_spec)
72
73 __pdoc__['Scrambler.wheel_positions'] = """Return a 3-tuple of the wheel
74 positions (as numbers)"""
75 __pdoc__['Scrambler.wheel_positions_l'] = """Return a 3-tuple of the wheel
76 positions (as letters)"""
77 def __getattribute__(self, name):
78 if name=='wheel_positions':
79 return self.wheel1.position, self.wheel2.position, self.wheel3.position
80 elif name=='wheel_positions_l':
81 return self.wheel1.position_l, self.wheel2.position_l, self.wheel3.position_l
82 else:
83 return object.__getattribute__(self, name)
84
85 def advance(self, wheel1=False, wheel2=False, wheel3=True):
86 """Advance some wheels of a scrambler.
87 """
88 if wheel1: self.wheel1.advance()
89 if wheel2: self.wheel2.advance()
90 if wheel3: self.wheel3.advance()
91
92 def lookup(self, letter):
93 """Lookup the decipherment of a letter, given a particular scrambler
94 orientation.
95 """
96 a = self.wheel3.forward(letter)
97 b = self.wheel2.forward(a)
98 c = self.wheel1.forward(b)
99 d = self.reflector.forward(c)
100 e = self.wheel1.backward(d)
101 f = self.wheel2.backward(e)
102 g = self.wheel3.backward(f)
103 return g
104
105 def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
106 """Set the positions of a scrambler's wheels.
107 """
108 self.wheel1.set_position(wheel1_pos)
109 self.wheel2.set_position(wheel2_pos)
110 self.wheel3.set_position(wheel3_pos)
111
112
113 class Bombe(object):
114 """An entire Bombe machine.
115
116 This specifies the pattern of the wheels and reflectors used. The
117 scramblers are connected and wired up according the to the specification
118 given by the menu.
119
120 Bombe objects are callable. Calling a Bombe (with the starting scrambler
121 positions) calls the `test` method and returns the pair of
122 `start_positions` and the result of `test`.
123
124 Bombe objects have attributes `wheel_positions` and `wheel_positions_l`,
125 which return the results of the scramblers' `Scrambler.wheel_positions`
126 and `Scrambler.wheel_positions_l`.
127 """
128
129 def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
130 menu=None, start_signal=None, use_diagonal_board=True,
131 verify_plugboard=True):
132 self.connections = []
133 self.wheel1_spec = wheel1_spec
134 self.wheel2_spec = wheel2_spec
135 self.wheel3_spec = wheel3_spec
136 self.reflector_spec = reflector_spec
137 if menu:
138 self.read_menu(menu)
139 if start_signal:
140 self.test_start = start_signal
141 self.use_diagonal_board = use_diagonal_board
142 self.verify_plugboard = verify_plugboard
143
144 __pdoc__['Bombe.wheel_positions'] = """Return a 3-tuple of the wheel
145 positions (as numbers)"""
146 __pdoc__['Bomb3.wheel_positions_l'] = """Return a 3-tuple of the wheel
147 positions (as letters)"""
148 def __getattribute__(self, name):
149 if name=='wheel_positions':
150 return self.connections[0].scrambler.wheel_positions
151 elif name=='wheel_positions_l':
152 return self.connections[0].scrambler.wheel_positions_l
153 else:
154 return object.__getattribute__(self, name)
155
156 def __call__(self, start_positions):
157 return start_positions, self.test(initial_signal=self.test_start,
158 start_positions=start_positions,
159 use_diagonal_board=self.use_diagonal_board,
160 verify_plugboard=self.verify_plugboard)
161
162 def add_connection(self, bank_before, bank_after, scrambler):
163 """Create a new connection between banks.
164 """
165 self.connections += [Connection([bank_before, bank_after], scrambler)]
166
167 def read_menu(self, menu):
168 """Read a menu, creating one scrambler for each element of the menu
169 and setting up the connections it implies. Also defines the most
170 common letter in the menu's plaintext as the default letter to start
171 testing with."""
172 self.connections = []
173 for item in menu:
174 scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec, self.wheel3_spec,
175 self.reflector_spec,
176 wheel3_pos=unpos(item.number - 1))
177 self.add_connection(item.before, item.after, scrambler)
178 most_common_letter = (collections.Counter(m.before for m in menu) +\
179 collections.Counter(m.after for m in menu)).most_common(1)[0][0]
180 self.test_start = Signal(most_common_letter, most_common_letter)
181
182 def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
183 """Set positions of all scramblers. The first scrambler will be set
184 to the specified positions. The second scrambler will have its
185 third wheel advanced one position; the third scramber will have its
186 third wheel advanced two positios; and so on. Not that the first and
187 second wheels of the scramblers are never advanced in setup."""
188 for i, c in enumerate(self.connections):
189 c.scrambler.set_positions(wheel1_pos, wheel2_pos, unpos(pos(wheel3_pos) + i))
190
191 def test(self, initial_signal=None, start_positions=None, use_diagonal_board=True,
192 verify_plugboard=True):
193 """Test a scrambler setting. It creates a signal (held in
194 `self.pending`) on the `initial_signal` wire then uses
195 `Bombe.propagate` to propagate the signal across the Bombe.
196
197 Returns a Boolean recording if this scrambler setting with
198 this signal is a "stop" (potential valid scrambler setting).
199
200 * If `initial_signal` is `None`, use the default starting signal set in
201 `Bombe.read_menu`
202 * If `start_positions` is `None`, use the existing scramber positions."""
203 self.banks = {label:
204 dict(zip(string.ascii_lowercase,
205 [False]*len(string.ascii_lowercase)))
206 for label in string.ascii_lowercase}
207 if start_positions:
208 self.set_positions(*start_positions)
209 if not initial_signal:
210 initial_signal = self.test_start
211 self.pending = [initial_signal]
212 self.propagate(use_diagonal_board)
213 live_wire_count = len([self.banks[self.test_start.bank][w]
214 for w in self.banks[self.test_start.bank]
215 if self.banks[self.test_start.bank][w]])
216 if live_wire_count < 26:
217 if verify_plugboard:
218 possibles = self.possible_plugboards()
219 return all(s0.isdisjoint(s1)
220 for s0 in possibles
221 for s1 in possibles
222 if s0 != s1)
223 else:
224 return True
225 else:
226 return False
227
228 def propagate(self, use_diagonal_board):
229 """Propagate a signal through the Bombe. Uses `self.pending` as an
230 agenda for a breadth-first search. Each element on the agenda represents
231 a particular wire in a bank that is being "energised" (set to `True`).
232 The first element in the agenda is removed, the wire/bank is set,
233 then all connected wire/banks are added to the `self.pending`
234 agenda.
235 """
236 while self.pending:
237 current = self.pending[0]
238 # print("processing", current)
239 self.pending = self.pending[1:]
240 if not self.banks[current.bank][current.wire]:
241 self.banks[current.bank][current.wire] = True
242 if use_diagonal_board:
243 self.pending += [Signal(current.wire, current.bank)]
244 for c in self.connections:
245 if current.bank in c.banks:
246 other_bank = [b for b in c.banks if b != current.bank][0]
247 other_wire = c.scrambler.lookup(current.wire)
248 # print(" adding", other_bank, other_wire, "because", c.banks)
249 self.pending += [Signal(other_bank, other_wire)]
250
251 def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a',
252 use_diagonal_board=True):
253 """Run a Bombe after setup with a menu, by trying all scramber
254 positions. For each scrambler position, `Bombe.test` is run. If the
255 test is successful, the scrambler positiions are added to `self.solutions`.
256 `self.Solutions` is returned.
257 """
258 if not run_start:
259 run_start = self.test_start
260 self.solutions = []
261 self.set_positions(wheel1_pos, wheel2_pos, wheel3_pos)
262 for run_index in range(26*26*26):
263 if self.test(initial_signal=run_start, use_diagonal_board=use_diagonal_board):
264 self.solutions += [self.connections[0].scrambler.wheel_positions_l]
265 advance3 = True
266 advance2 = False
267 advance1 = False
268 if (run_index + 1) % 26 == 0: advance2 = True
269 if (run_index + 1) % (26*26) == 0: advance1 = True
270 for c in self.connections:
271 c.scrambler.advance(advance1, advance2, advance3)
272 return self.solutions
273
274 def possible_plugboards(self):
275 """Given a Bombe after a `Bombe.test` has been performed, determine
276 what plugboard settings can be derived from the solution.
277 """
278 possibles = set()
279 for b in self.banks:
280 active = [w for w in self.banks[b] if self.banks[b][w]]
281 inactive = [w for w in self.banks[b] if not self.banks[b][w]]
282 if len(active) == 1:
283 possibles = possibles.union({frozenset((b, active[0]))})
284 if len(inactive) == 1:
285 possibles = possibles.union({frozenset((b, inactive[0]))})
286 return possibles
287
288
289 def run_multi_bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, menu,
290 start_signal=None, use_diagonal_board=True,
291 verify_plugboard=True):
292 """Run a Bombe solution, spreading the load across multiple CPU cores.
293 Similar to `Bombe.run` in effects, but quicker on a multi-core machine.
294 """
295 allwheels = itertools.product(string.ascii_lowercase, repeat=3)
296
297 with multiprocessing.Pool() as pool:
298 res = pool.map(Bombe(wheel1_spec, wheel2_spec, wheel3_spec,
299 reflector_spec, menu=menu, start_signal=start_signal,
300 use_diagonal_board=use_diagonal_board,
301 verify_plugboard=verify_plugboard),
302 allwheels)
303 return [r[0] for r in res if r[1]]