1 """A simulator for Bombe machines.
3 See `szyfrow.enigma.Enigma` for an implementation of the Enigma to create
6 There is a good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm)
9 In this implementation, there are *banks* of wires (what Ellsbury refers to
10 as "cables"), one bank for each position that appears in the menu. A bank
11 comprises 26 wires, represented as a `dict` of `bool`s, depending on whether
12 that wire is live ("energised") or not.
14 The menu, derived from the crib, determines how the scramblers connect the
15 banks. A `Connection` represents this.
20 import multiprocessing
24 from szyfrow
.enigma
import *
28 Signal
= collections
.namedtuple('Signal', ['bank', 'wire'])
29 __pdoc__
['Signal'] = """Current propogation through the Bombe indicates that
30 this wire in this bank is live, and the effects need to be proogated further
33 __pdoc__
['Signal.bank'] = """The bank of a signal."""
34 __pdoc__
['Signal.wire'] = """The wire of a signal."""
36 Connection
= collections
.namedtuple('Connection', ['banks', 'scrambler'])
37 __pdoc__
['Connection'] = """A connection between banks made by a particular
38 scrambler (the scrambler state given by its position in the crib).
40 __pdoc__
['Connection.banks'] = """A list of two items, holding the bnaks of
42 __pdoc__
['Connection.scrambler'] = """The bnaks of a connection."""
45 MenuItem
= collections
.namedtuple('MenuIem', ['before', 'after', 'number'])
46 __pdoc__
['MenuItem'] = """One item in the menu, derived from the crib.
48 __pdoc__
['MenuItem.before'] = "The letter before the transform (plaintext)."
49 __pdoc__
['MenuItem.after'] = "The letter after the transform (ciphertext)."
50 __pdoc__
['MenuItem.number'] = "The position of this item in the menu."
53 def make_menu(plaintext
, ciphertext
):
54 """Create a menu from a crib: a given plaintext and ciphertext.
56 No validation is done to ensure that this is a viable crib (e.g. no
57 checking for length, no checking that a letter is enciphered to itself).
59 return [MenuItem(p
, c
, i
+1)
60 for i
, (p
, c
) in enumerate(zip(plaintext
, ciphertext
))]
63 class Scrambler(object):
64 """A scrambler is a collection of three `szyfrow.enigma.SimpleWheel`s.
66 def __init__(self
, wheel1_spec
, wheel2_spec
, wheel3_spec
, reflector_spec
,
67 wheel1_pos
='a', wheel2_pos
='a', wheel3_pos
='a'):
68 self
.wheel1
= SimpleWheel(wheel1_spec
, position
=wheel1_pos
)
69 self
.wheel2
= SimpleWheel(wheel2_spec
, position
=wheel2_pos
)
70 self
.wheel3
= SimpleWheel(wheel3_spec
, position
=wheel3_pos
)
71 self
.reflector
= Reflector(reflector_spec
)
73 __pdoc__
['Scrambler.wheel_positions'] = """Return a 3-tuple of the wheel
74 positions (as numbers)"""
75 __pdoc__
['Scrambler.wheel_positions_l'] = """Return a 3-tuple of the wheel
76 positions (as letters)"""
77 def __getattribute__(self
, name
):
78 if name
=='wheel_positions':
79 return self
.wheel1
.position
, self
.wheel2
.position
, self
.wheel3
.position
80 elif name
=='wheel_positions_l':
81 return self
.wheel1
.position_l
, self
.wheel2
.position_l
, self
.wheel3
.position_l
83 return object.__getattribute
__(self
, name
)
85 def advance(self
, wheel1
=False, wheel2
=False, wheel3
=True):
86 """Advance some wheels of a scrambler.
88 if wheel1
: self
.wheel1
.advance()
89 if wheel2
: self
.wheel2
.advance()
90 if wheel3
: self
.wheel3
.advance()
92 def lookup(self
, letter
):
93 """Lookup the decipherment of a letter, given a particular scrambler
96 a
= self
.wheel3
.forward(letter
)
97 b
= self
.wheel2
.forward(a
)
98 c
= self
.wheel1
.forward(b
)
99 d
= self
.reflector
.forward(c
)
100 e
= self
.wheel1
.backward(d
)
101 f
= self
.wheel2
.backward(e
)
102 g
= self
.wheel3
.backward(f
)
105 def set_positions(self
, wheel1_pos
, wheel2_pos
, wheel3_pos
):
106 """Set the positions of a scrambler's wheels.
108 self
.wheel1
.set_position(wheel1_pos
)
109 self
.wheel2
.set_position(wheel2_pos
)
110 self
.wheel3
.set_position(wheel3_pos
)
114 """An entire Bombe machine.
116 This specifies the pattern of the wheels and reflectors used. The
117 scramblers are connected and wired up according the to the specification
120 Bombe objects are callable. Calling a Bombe (with the starting scrambler
121 positions) calls the `test` method and returns the pair of
122 `start_positions` and the result of `test`.
124 Bombe objects have attributes `wheel_positions` and `wheel_positions_l`,
125 which return the results of the scramblers' `Scrambler.wheel_positions`
126 and `Scrambler.wheel_positions_l`.
129 def __init__(self
, wheel1_spec
, wheel2_spec
, wheel3_spec
, reflector_spec
,
130 menu
=None, start_signal
=None, use_diagonal_board
=True,
131 verify_plugboard
=True):
132 self
.connections
= []
133 self
.wheel1_spec
= wheel1_spec
134 self
.wheel2_spec
= wheel2_spec
135 self
.wheel3_spec
= wheel3_spec
136 self
.reflector_spec
= reflector_spec
140 self
.test_start
= start_signal
141 self
.use_diagonal_board
= use_diagonal_board
142 self
.verify_plugboard
= verify_plugboard
144 __pdoc__
['Bombe.wheel_positions'] = """Return a 3-tuple of the wheel
145 positions (as numbers)"""
146 __pdoc__
['Bomb3.wheel_positions_l'] = """Return a 3-tuple of the wheel
147 positions (as letters)"""
148 def __getattribute__(self
, name
):
149 if name
=='wheel_positions':
150 return self
.connections
[0].scrambler
.wheel_positions
151 elif name
=='wheel_positions_l':
152 return self
.connections
[0].scrambler
.wheel_positions_l
154 return object.__getattribute
__(self
, name
)
156 def __call__(self
, start_positions
):
157 return start_positions
, self
.test(initial_signal
=self
.test_start
,
158 start_positions
=start_positions
,
159 use_diagonal_board
=self
.use_diagonal_board
,
160 verify_plugboard
=self
.verify_plugboard
)
162 def add_connection(self
, bank_before
, bank_after
, scrambler
):
163 """Create a new connection between banks.
165 self
.connections
+= [Connection([bank_before
, bank_after
], scrambler
)]
167 def read_menu(self
, menu
):
168 """Read a menu, creating one scrambler for each element of the menu
169 and setting up the connections it implies. Also defines the most
170 common letter in the menu's plaintext as the default letter to start
172 self
.connections
= []
174 scrambler
= Scrambler(self
.wheel1_spec
, self
.wheel2_spec
, self
.wheel3_spec
,
176 wheel3_pos
=unpos(item
.number
- 1))
177 self
.add_connection(item
.before
, item
.after
, scrambler
)
178 most_common_letter
= (collections
.Counter(m
.before
for m
in menu
) +\
179 collections
.Counter(m
.after
for m
in menu
)).most_common(1)[0][0]
180 self
.test_start
= Signal(most_common_letter
, most_common_letter
)
182 def set_positions(self
, wheel1_pos
, wheel2_pos
, wheel3_pos
):
183 """Set positions of all scramblers. The first scrambler will be set
184 to the specified positions. The second scrambler will have its
185 third wheel advanced one position; the third scramber will have its
186 third wheel advanced two positios; and so on. Not that the first and
187 second wheels of the scramblers are never advanced in setup."""
188 for i
, c
in enumerate(self
.connections
):
189 c
.scrambler
.set_positions(wheel1_pos
, wheel2_pos
, unpos(pos(wheel3_pos
) + i
))
191 def test(self
, initial_signal
=None, start_positions
=None, use_diagonal_board
=True,
192 verify_plugboard
=True):
193 """Test a scrambler setting. It creates a signal (held in
194 `self.pending`) on the `initial_signal` wire then uses
195 `Bombe.propagate` to propagate the signal across the Bombe.
197 Returns a Boolean recording if this scrambler setting with
198 this signal is a "stop" (potential valid scrambler setting).
200 * If `initial_signal` is `None`, use the default starting signal set in
202 * If `start_positions` is `None`, use the existing scramber positions."""
204 dict(zip(string
.ascii_lowercase
,
205 [False]*len(string
.ascii_lowercase
)))
206 for label
in string
.ascii_lowercase
}
208 self
.set_positions(*start_positions
)
209 if not initial_signal
:
210 initial_signal
= self
.test_start
211 self
.pending
= [initial_signal
]
212 self
.propagate(use_diagonal_board
)
213 live_wire_count
= len([self
.banks
[self
.test_start
.bank
][w
]
214 for w
in self
.banks
[self
.test_start
.bank
]
215 if self
.banks
[self
.test_start
.bank
][w
]])
216 if live_wire_count
< 26:
218 possibles
= self
.possible_plugboards()
219 return all(s0
.isdisjoint(s1
)
228 def propagate(self
, use_diagonal_board
):
229 """Propagate a signal through the Bombe. Uses `self.pending` as an
230 agenda for a breadth-first search. Each element on the agenda represents
231 a particular wire in a bank that is being "energised" (set to `True`).
232 The first element in the agenda is removed, the wire/bank is set,
233 then all connected wire/banks are added to the `self.pending`
237 current
= self
.pending
[0]
238 # print("processing", current)
239 self
.pending
= self
.pending
[1:]
240 if not self
.banks
[current
.bank
][current
.wire
]:
241 self
.banks
[current
.bank
][current
.wire
] = True
242 if use_diagonal_board
:
243 self
.pending
+= [Signal(current
.wire
, current
.bank
)]
244 for c
in self
.connections
:
245 if current
.bank
in c
.banks
:
246 other_bank
= [b
for b
in c
.banks
if b
!= current
.bank
][0]
247 other_wire
= c
.scrambler
.lookup(current
.wire
)
248 # print(" adding", other_bank, other_wire, "because", c.banks)
249 self
.pending
+= [Signal(other_bank
, other_wire
)]
251 def run(self
, run_start
=None, wheel1_pos
='a', wheel2_pos
='a', wheel3_pos
='a',
252 use_diagonal_board
=True):
253 """Run a Bombe after setup with a menu, by trying all scramber
254 positions. For each scrambler position, `Bombe.test` is run. If the
255 test is successful, the scrambler positiions are added to `self.solutions`.
256 `self.Solutions` is returned.
259 run_start
= self
.test_start
261 self
.set_positions(wheel1_pos
, wheel2_pos
, wheel3_pos
)
262 for run_index
in range(26*26*26):
263 if self
.test(initial_signal
=run_start
, use_diagonal_board
=use_diagonal_board
):
264 self
.solutions
+= [self
.connections
[0].scrambler
.wheel_positions_l
]
268 if (run_index
+ 1) % 26 == 0: advance2
= True
269 if (run_index
+ 1) % (26*26) == 0: advance1
= True
270 for c
in self
.connections
:
271 c
.scrambler
.advance(advance1
, advance2
, advance3
)
272 return self
.solutions
274 def possible_plugboards(self
):
275 """Given a Bombe after a `Bombe.test` has been performed, determine
276 what plugboard settings can be derived from the solution.
280 active
= [w
for w
in self
.banks
[b
] if self
.banks
[b
][w
]]
281 inactive
= [w
for w
in self
.banks
[b
] if not self
.banks
[b
][w
]]
283 possibles
= possibles
.union({frozenset((b
, active
[0]))})
284 if len(inactive
) == 1:
285 possibles
= possibles
.union({frozenset((b
, inactive
[0]))})
289 def run_multi_bombe(wheel1_spec
, wheel2_spec
, wheel3_spec
, reflector_spec
, menu
,
290 start_signal
=None, use_diagonal_board
=True,
291 verify_plugboard
=True):
292 """Run a Bombe solution, spreading the load across multiple CPU cores.
293 Similar to `Bombe.run` in effects, but quicker on a multi-core machine.
295 allwheels
= itertools
.product(string
.ascii_lowercase
, repeat
=3)
297 with multiprocessing
.Pool() as pool
:
298 res
= pool
.map(Bombe(wheel1_spec
, wheel2_spec
, wheel3_spec
,
299 reflector_spec
, menu
=menu
, start_signal
=start_signal
,
300 use_diagonal_board
=use_diagonal_board
,
301 verify_plugboard
=verify_plugboard
),
303 return [r
[0] for r
in res
if r
[1]]