1bab0e0d3e78fda786487c1798d3cc5d2b458750
[szyfrow.git] / bombe.py
1 import string
2 import collections
3 import multiprocessing
4 import itertools
5 import logging
6
7 from szyfrow.enigma import *
8
9
10 ##################################
11 # # Bombe
12 ##################################
13 #
14 # Good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm) by Graham Ellsbury
15 #
16
# A live wire in the bombe: `wire` is the energised wire within cable `bank`.
Signal = collections.namedtuple('Signal', ['bank', 'wire'])

# A scrambler wired between two banks. `banks` holds the two bank labels
# (Bombe.add_connection stores them as a two-element list) and `scrambler`
# is the object whose lookup() maps a wire on one bank to a wire on the other.
Connection = collections.namedtuple('Connection', ['banks', 'scrambler'])

# One crib position: letter `before` enciphers to letter `after` at position
# `number` (make_menu numbers positions from 1).
# The typename must match the variable name: it is what repr() shows and what
# pickle uses to locate the class when instances are serialised.
MenuItem = collections.namedtuple('MenuItem', ['before', 'after', 'number'])
20
21
def make_menu(plaintext, ciphertext):
    """Build a bombe menu from a crib and the ciphertext it lines up with.

    The two sequences are walked in lockstep (stopping at the shorter one)
    and each letter pair becomes a MenuItem whose `number` is its 1-based
    position in the crib.

    Returns a list of MenuItem(before, after, number).
    """
    menu = []
    for number, (before, after) in enumerate(zip(plaintext, ciphertext), start=1):
        menu.append(MenuItem(before, after, number))
    return menu
25
26
class Scrambler(object):
    """Three wheels plus a reflector, wired as a single bombe scrambler.

    The wheels are SimpleWheel instances and the reflector is a Reflector
    (both presumably provided by szyfrow.enigma via the star import -- confirm).
    Unlike a full Enigma there is no automatic stepping: wheels only move when
    advance() or set_positions() is called.
    """

    def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                 wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'):
        """Create the wheels from their specs at the given start positions."""
        self.wheel1 = SimpleWheel(wheel1_spec, position=wheel1_pos)
        self.wheel2 = SimpleWheel(wheel2_spec, position=wheel2_pos)
        self.wheel3 = SimpleWheel(wheel3_spec, position=wheel3_pos)
        self.reflector = Reflector(reflector_spec)

    # These two were previously served by a __getattribute__ override, which
    # intercepted *every* attribute access and hid the names from dir().
    # Properties give the same read access without either drawback.
    @property
    def wheel_positions(self):
        """Tuple of the `position` attribute of wheels 1, 2 and 3."""
        return self.wheel1.position, self.wheel2.position, self.wheel3.position

    @property
    def wheel_positions_l(self):
        """Tuple of the `position_l` attribute of wheels 1, 2 and 3."""
        return self.wheel1.position_l, self.wheel2.position_l, self.wheel3.position_l

    def advance(self, wheel1=False, wheel2=False, wheel3=True):
        """Advance each wheel whose flag is true; by default only wheel 3."""
        if wheel1:
            self.wheel1.advance()
        if wheel2:
            self.wheel2.advance()
        if wheel3:
            self.wheel3.advance()

    def lookup(self, letter):
        """Pass `letter` through the scrambler and return the result.

        The path is wheel 3 -> wheel 2 -> wheel 1 -> reflector, then back
        through wheel 1 -> wheel 2 -> wheel 3.
        """
        a = self.wheel3.forward(letter)
        b = self.wheel2.forward(a)
        c = self.wheel1.forward(b)
        d = self.reflector.forward(c)
        e = self.wheel1.backward(d)
        f = self.wheel2.backward(e)
        g = self.wheel3.backward(f)
        return g

    def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
        """Set all three wheels to the given positions."""
        self.wheel1.set_position(wheel1_pos)
        self.wheel2.set_position(wheel2_pos)
        self.wheel3.set_position(wheel3_pos)
62
63
class Bombe(object):
    """Simulation of a bombe: a set of scramblers wired between 26 banks.

    Each bank is a 26-wire cable, labelled by letter. A menu (see make_menu)
    says which pairs of banks are joined by a scrambler and at what wheel-3
    offset. test() injects a signal on one wire and reports whether the
    current wheel positions are a candidate "stop"; run() does that for every
    wheel position.

    An instance is callable with a triple of start positions, so it can be
    handed directly to multiprocessing.Pool.map.
    """

    def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                 menu=None, start_signal=None, use_diagonal_board=True,
                 verify_plugboard=True):
        """Store the wheel/reflector specs and optionally load a menu.

        `start_signal`, when given, overrides the test signal that read_menu
        derives from the menu. `use_diagonal_board` and `verify_plugboard`
        are the defaults used by __call__ (and `verify_plugboard` by run()).
        """
        self.connections = []
        self.wheel1_spec = wheel1_spec
        self.wheel2_spec = wheel2_spec
        self.wheel3_spec = wheel3_spec
        self.reflector_spec = reflector_spec
        # Always define test_start, so a bombe created without a menu or a
        # start signal has a well-defined "no signal yet" state.
        self.test_start = None
        if menu:
            self.read_menu(menu)
        if start_signal:
            self.test_start = start_signal
        self.use_diagonal_board = use_diagonal_board
        self.verify_plugboard = verify_plugboard

    # Previously implemented through a __getattribute__ override, which
    # intercepted every attribute read on the instance. Properties provide the
    # same read-only view and are visible to dir() and help().
    @property
    def wheel_positions(self):
        """Wheel positions of the first connection's scrambler."""
        return self.connections[0].scrambler.wheel_positions

    @property
    def wheel_positions_l(self):
        """Wheel positions (the `_l` form) of the first connection's scrambler."""
        return self.connections[0].scrambler.wheel_positions_l

    def __call__(self, start_positions):
        """Test one set of start positions using the instance's settings.

        Returns a pair (start_positions, result_of_test).
        """
        return start_positions, self.test(initial_signal=self.test_start,
                                          start_positions=start_positions,
                                          use_diagonal_board=self.use_diagonal_board,
                                          verify_plugboard=self.verify_plugboard)

    def add_connection(self, bank_before, bank_after, scrambler):
        """Wire `scrambler` between banks `bank_before` and `bank_after`."""
        self.connections.append(Connection([bank_before, bank_after], scrambler))

    def read_menu(self, menu):
        """Replace all connections with those described by `menu`.

        One Scrambler is created per menu item, with wheel 3 set to
        unpos(item.number - 1). The test signal is then set to the letter that
        occurs most often in the menu, on the bank and the wire of that letter.
        `menu` is iterated more than once, so it must be a re-iterable
        sequence rather than a one-shot generator.
        """
        self.connections = []
        for item in menu:
            scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec, self.wheel3_spec,
                                  self.reflector_spec,
                                  wheel3_pos=unpos(item.number - 1))
            self.add_connection(item.before, item.after, scrambler)
        letter_counts = (collections.Counter(m.before for m in menu) +
                         collections.Counter(m.after for m in menu))
        most_common_letter = letter_counts.most_common(1)[0][0]
        self.test_start = Signal(most_common_letter, most_common_letter)

    def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
        """Set every scrambler's wheels, offsetting wheel 3 per connection.

        Connection i gets wheel 3 at unpos(pos(wheel3_pos) + i).
        """
        # NOTE(review): the offset here is the connection's list index, while
        # read_menu uses item.number - 1. The two agree for menus built by
        # make_menu; for a menu with other numbering they would differ --
        # confirm which is intended before using such menus.
        for i, c in enumerate(self.connections):
            c.scrambler.set_positions(wheel1_pos, wheel2_pos, unpos(pos(wheel3_pos) + i))

    def test(self, initial_signal=None, start_positions=None, use_diagonal_board=True,
             verify_plugboard=True):
        """Inject a signal and report whether the current setting is a stop.

        All banks are cleared, the wheels are moved to `start_positions` if
        given, and `initial_signal` (default: self.test_start) is propagated.

        Returns False when all 26 wires of the injected bank end up live.
        Otherwise returns True, unless `verify_plugboard` is set and the
        plugboard pairs deduced by possible_plugboards() conflict (the same
        letter appearing in two different pairs), in which case it is False.

        Raises ValueError if there is no signal to inject.
        """
        self.banks = {label: dict.fromkeys(string.ascii_lowercase, False)
                      for label in string.ascii_lowercase}
        if start_positions:
            self.set_positions(*start_positions)
        if not initial_signal:
            initial_signal = self.test_start
        if initial_signal is None:
            raise ValueError("no test signal: supply initial_signal, or load a "
                             "menu / start_signal first")
        self.pending = [initial_signal]
        self.propagate(use_diagonal_board)
        # Count on the bank that was actually energised. Using
        # self.test_start.bank here would examine the wrong bank whenever the
        # caller passes a different initial_signal.
        test_bank = self.banks[initial_signal.bank]
        live_wire_count = sum(1 for live in test_bank.values() if live)
        if live_wire_count >= len(string.ascii_lowercase):
            return False
        if not verify_plugboard:
            return True
        possibles = self.possible_plugboards()
        # possibles is a set, so distinct members are unequal; checking each
        # unordered pair once is enough because isdisjoint is symmetric.
        return all(s0.isdisjoint(s1)
                   for s0, s1 in itertools.combinations(possibles, 2))

    def propagate(self, use_diagonal_board):
        """Spread the signals in self.pending through the banks until stable.

        Each newly energised wire is marked in self.banks. With the diagonal
        board enabled, energising wire w of bank b also energises wire b of
        bank w. Every connection attached to the bank passes the signal
        through its scrambler to the bank at its other end.
        self.pending is empty when this returns.
        """
        # A deque gives O(1) pops from the front; slicing the list on every
        # iteration copied the whole queue each time.
        queue = collections.deque(self.pending)
        while queue:
            current = queue.popleft()
            if self.banks[current.bank][current.wire]:
                continue  # already live: nothing new to propagate
            self.banks[current.bank][current.wire] = True
            if use_diagonal_board:
                queue.append(Signal(current.wire, current.bank))
            for c in self.connections:
                if current.bank in c.banks:
                    # The bank at the other end of this scrambler. If both
                    # ends are the same bank the signal returns to that bank
                    # (the list-filter form raised IndexError in that case).
                    if c.banks[0] == current.bank:
                        other_bank = c.banks[1]
                    else:
                        other_bank = c.banks[0]
                    other_wire = c.scrambler.lookup(current.wire)
                    queue.append(Signal(other_bank, other_wire))
        self.pending = []

    def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a', use_diagonal_board=True):
        """Test all 26*26*26 wheel positions, starting from the given ones.

        `run_start` is the signal injected at each position (default:
        self.test_start). Plugboard verification follows the instance's
        `verify_plugboard` setting, as __call__ does.

        Returns (and stores in self.solutions) the list of wheel_positions_l
        values of the first scrambler at every position where test() is true.
        """
        if not run_start:
            run_start = self.test_start
        self.solutions = []
        self.set_positions(wheel1_pos, wheel2_pos, wheel3_pos)
        for run_index in range(26*26*26):
            if self.test(initial_signal=run_start,
                         use_diagonal_board=use_diagonal_board,
                         verify_plugboard=self.verify_plugboard):
                self.solutions.append(self.connections[0].scrambler.wheel_positions_l)
            # Odometer stepping: wheel 3 every step, wheel 2 every 26 steps,
            # wheel 1 every 26*26 steps.
            steps_done = run_index + 1
            advance3 = True
            advance2 = steps_done % 26 == 0
            advance1 = steps_done % (26*26) == 0
            for c in self.connections:
                c.scrambler.advance(advance1, advance2, advance3)
        return self.solutions

    def possible_plugboards(self):
        """Deduce plugboard pairs from the current state of the banks.

        A bank with exactly one live wire, or exactly one dead wire, yields
        the pair {bank letter, that wire's letter}.

        Returns a set of frozensets (a one-element frozenset when the bank
        and wire letters coincide).
        """
        possibles = set()
        for bank, wires in self.banks.items():
            active = [w for w, live in wires.items() if live]
            inactive = [w for w, live in wires.items() if not live]
            if len(active) == 1:
                possibles.add(frozenset((bank, active[0])))
            if len(inactive) == 1:
                possibles.add(frozenset((bank, inactive[0])))
        return possibles
179
180
def run_multi_bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, menu,
                    start_signal=None, use_diagonal_board=True,
                    verify_plugboard=True):
    """Test every wheel start position in parallel and return the stops.

    A single Bombe is configured from the arguments and mapped, via a
    multiprocessing pool, over all three-letter combinations of lowercase
    letters. Calling the Bombe yields (start_positions, result) pairs.

    Returns the list of start-position triples whose result was true.
    """
    candidate_positions = itertools.product(string.ascii_lowercase, repeat=3)

    with multiprocessing.Pool() as pool:
        bombe = Bombe(wheel1_spec, wheel2_spec, wheel3_spec,
                      reflector_spec, menu=menu, start_signal=start_signal,
                      use_diagonal_board=use_diagonal_board,
                      verify_plugboard=verify_plugboard)
        outcomes = pool.map(bombe, candidate_positions)
        stops = [positions for positions, is_stop in outcomes if is_stop]
    return stops