Updated after tests with real Enigma machine
[cipher-training.git] / bombe.py
1 import string
2 import collections
3 import multiprocessing
4 import itertools
5 from enigma import *
6
7
# A live wire on the bombe: `bank` is the label of the bank (cable) the wire
# belongs to, `wire` is the label of the wire within that bank.
Signal = collections.namedtuple('Signal', ['bank', 'wire'])

# One scrambler wired between two banks: `banks` holds the two bank labels
# and `scrambler` is the scrambler object joining them.
Connection = collections.namedtuple('Connection', ['banks', 'scrambler'])

# One menu entry: the letters on either side of a scrambler (`before`,
# `after`) and the 1-based `number` used to offset that scrambler's third
# wheel when the menu is read.
# The typename must match the variable name so that repr() is accurate and
# instances can be pickled (pickle looks the class up by its qualified name).
MenuItem = collections.namedtuple('MenuItem', ['before', 'after', 'number'])
11
12
class Scrambler(object):
    """Three wheels and a reflector, stepped only on request.

    The wheel and reflector objects are built from ``SimpleWheel`` and
    ``Reflector``, which are not defined in this file (presumably supplied
    by ``from enigma import *``).
    """

    def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                 wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'):
        """Build the three wheels at the given positions, plus the reflector."""
        self.wheel1 = SimpleWheel(wheel1_spec, position=wheel1_pos)
        self.wheel2 = SimpleWheel(wheel2_spec, position=wheel2_pos)
        self.wheel3 = SimpleWheel(wheel3_spec, position=wheel3_pos)
        self.reflector = Reflector(reflector_spec)

    # These were previously served by overriding __getattribute__, which
    # intercepted *every* attribute access; properties give the same
    # read-only computed attributes without that cost.
    @property
    def wheel_positions(self):
        """Tuple of the ``position`` attribute of wheels 1, 2 and 3."""
        return self.wheel1.position, self.wheel2.position, self.wheel3.position

    @property
    def wheel_positions_l(self):
        """Tuple of the ``position_l`` attribute of wheels 1, 2 and 3.

        NOTE(review): ``position_l`` is presumably the position expressed as
        a letter; confirm against the wheel class.
        """
        return (self.wheel1.position_l,
                self.wheel2.position_l,
                self.wheel3.position_l)

    def advance(self, wheel1=False, wheel2=False, wheel3=True):
        """Advance each wheel whose flag is true (only wheel 3 by default)."""
        if wheel1:
            self.wheel1.advance()
        if wheel2:
            self.wheel2.advance()
        if wheel3:
            self.wheel3.advance()

    def lookup(self, letter):
        """Return the result of passing ``letter`` through the scrambler.

        The path is wheel 3, 2, 1 forward, the reflector, then wheel 1, 2, 3
        backward.
        """
        inbound = (self.wheel3, self.wheel2, self.wheel1)
        for wheel in inbound:
            letter = wheel.forward(letter)
        letter = self.reflector.forward(letter)
        for wheel in reversed(inbound):
            letter = wheel.backward(letter)
        return letter

    def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
        """Set the position of each of the three wheels."""
        self.wheel1.set_position(wheel1_pos)
        self.wheel2.set_position(wheel2_pos)
        self.wheel3.set_position(wheel3_pos)
48
49
class Bombe(object):
    """A set of scramblers wired between lettered banks, as laid out by a menu.

    Each bank has one wire per lowercase letter.  ``test`` injects a signal,
    lets it spread through the scramblers (and optionally the diagonal
    board), and reports whether the wheel setting survives as a candidate.
    """

    def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                 menu=None, start_signal=None, use_diagonal_board=True,
                 verify_plugboard=True):
        """Store the wheel/reflector specs and optionally wire up a menu.

        ``menu`` is an iterable of items with ``before``, ``after`` and
        ``number`` attributes.  ``start_signal``, when given, overrides the
        test signal chosen by ``read_menu``.
        """
        self.connections = []
        self.wheel1_spec = wheel1_spec
        self.wheel2_spec = wheel2_spec
        self.wheel3_spec = wheel3_spec
        self.reflector_spec = reflector_spec
        # Give every attribute a defined value up front so that using the
        # object before a menu is read fails clearly rather than with a
        # missing-attribute error.
        self.test_start = None
        self.banks = {}
        self.pending = []
        self.solutions = []
        if menu:
            self.read_menu(menu)
        if start_signal:
            self.test_start = start_signal
        self.use_diagonal_board = use_diagonal_board
        self.verify_plugboard = verify_plugboard

    # Previously implemented by overriding __getattribute__, which
    # intercepted every attribute access; properties are equivalent for
    # readers and far cheaper.
    @property
    def wheel_positions(self):
        """``wheel_positions`` of the first connection's scrambler."""
        return self.connections[0].scrambler.wheel_positions

    @property
    def wheel_positions_l(self):
        """``wheel_positions_l`` of the first connection's scrambler."""
        return self.connections[0].scrambler.wheel_positions_l

    def __call__(self, start_positions):
        """Test one wheel setting using the instance's configured options.

        Returns ``(start_positions, result)`` so the outcome stays paired
        with the setting that produced it.
        """
        return start_positions, self.test(initial_signal=self.test_start,
                                          start_positions=start_positions,
                                          use_diagonal_board=self.use_diagonal_board,
                                          verify_plugboard=self.verify_plugboard)

    def add_connection(self, bank_before, bank_after, scrambler):
        """Wire ``scrambler`` between banks ``bank_before`` and ``bank_after``."""
        self.connections.append(Connection([bank_before, bank_after], scrambler))

    def read_menu(self, menu):
        """Create one scrambler per menu item and choose the test signal.

        Each scrambler's third wheel starts at ``unpos(item.number - 1)``.
        The test signal is placed on the letter occurring most often in the
        menu, on the wire with the same label as the bank.  An empty menu is
        a no-op.
        """
        # Materialise once: the menu is walked several times below, which
        # would silently exhaust a generator.
        menu = list(menu)
        if not menu:
            return
        for item in menu:
            scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec,
                                  self.wheel3_spec, self.reflector_spec,
                                  wheel3_pos=unpos(item.number - 1))
            self.add_connection(item.before, item.after, scrambler)
        letter_counts = (collections.Counter(m.before for m in menu)
                         + collections.Counter(m.after for m in menu))
        most_common_letter = letter_counts.most_common(1)[0][0]
        self.test_start = Signal(most_common_letter, most_common_letter)

    def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
        """Set every scrambler's wheels, staggering wheel 3 per connection.

        NOTE(review): the wheel-3 offset is the connection's index, whereas
        ``read_menu`` offsets by ``item.number - 1``.  The two agree only if
        menu numbers are 1, 2, 3, ... in connection order; confirm callers
        guarantee that.
        """
        for offset, connection in enumerate(self.connections):
            connection.scrambler.set_positions(
                wheel1_pos, wheel2_pos, unpos(pos(wheel3_pos) + offset))

    def test(self, initial_signal=None, start_positions=None, use_diagonal_board=True,
             verify_plugboard=True):
        """Inject a signal and report whether this wheel setting is a candidate.

        Returns ``False`` when all 26 wires of the test register's bank end
        up live.  Otherwise returns ``True``, unless ``verify_plugboard`` is
        set and the plug pairs deduced from the banks overlap one another.

        Raises ``ValueError`` if no ``initial_signal`` is given and no test
        signal has been configured.
        """
        self.banks = {label: dict.fromkeys(string.ascii_lowercase, False)
                      for label in string.ascii_lowercase}
        if start_positions:
            self.set_positions(*start_positions)
        if not initial_signal:
            initial_signal = self.test_start
        if initial_signal is None:
            raise ValueError('no initial signal given and no test signal configured')
        self.pending = [initial_signal]
        self.propagate(use_diagonal_board)
        # The register examined is the configured test signal's bank; fall
        # back to the injected signal's bank when none was configured.
        register = self.test_start if self.test_start is not None else initial_signal
        live_wire_count = sum(self.banks[register.bank].values())
        if live_wire_count >= 26:
            return False
        if not verify_plugboard:
            return True
        possibles = self.possible_plugboards()
        # Two different deduced plug pairs must not share a letter.
        return all(s0.isdisjoint(s1)
                   for s0 in possibles for s1 in possibles if s0 != s1)

    def propagate(self, use_diagonal_board):
        """Spread every pending signal through the banks until none remain.

        A signal that reaches an already-live wire is dropped.  A newly live
        wire is passed through every scrambler attached to its bank and,
        with the diagonal board, mirrored to wire ``bank`` of bank ``wire``.
        ``self.pending`` is empty on return.
        """
        # A deque gives O(1) pops from the front; order of processing is
        # still first-in, first-out.
        queue = collections.deque(self.pending)
        self.pending = []
        while queue:
            current = queue.popleft()
            if self.banks[current.bank][current.wire]:
                continue
            self.banks[current.bank][current.wire] = True
            if use_diagonal_board:
                queue.append(Signal(current.wire, current.bank))
            for c in self.connections:
                if current.bank in c.banks:
                    other_bank = [b for b in c.banks if b != current.bank][0]
                    other_wire = c.scrambler.lookup(current.wire)
                    queue.append(Signal(other_bank, other_wire))

    def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a', use_diagonal_board=True):
        """Test all 26*26*26 wheel settings and return the candidates.

        Starting from the given positions, wheel 3 advances every step,
        wheel 2 every 26 steps and wheel 1 every 26*26 steps.  Each
        candidate is recorded as the first scrambler's ``wheel_positions_l``.
        The result is also stored in ``self.solutions``.

        NOTE(review): ``test`` is called with its default
        ``verify_plugboard=True`` here, regardless of
        ``self.verify_plugboard``.
        """
        if not run_start:
            run_start = self.test_start
        self.solutions = []
        self.set_positions(wheel1_pos, wheel2_pos, wheel3_pos)
        for run_index in range(26 * 26 * 26):
            if self.test(initial_signal=run_start, use_diagonal_board=use_diagonal_board):
                self.solutions.append(self.connections[0].scrambler.wheel_positions_l)
            steps_done = run_index + 1
            advance2 = steps_done % 26 == 0
            advance1 = steps_done % (26 * 26) == 0
            for c in self.connections:
                c.scrambler.advance(advance1, advance2, True)
        return self.solutions

    def possible_plugboards(self):
        """Return the plug pairs implied by the current bank states.

        A bank with exactly one live wire, or exactly one dead wire, yields
        the pair ``frozenset((bank, wire))``.
        """
        possibles = set()
        for bank, wires in self.banks.items():
            active = [w for w, live in wires.items() if live]
            inactive = [w for w, live in wires.items() if not live]
            if len(active) == 1:
                possibles.add(frozenset((bank, active[0])))
            if len(inactive) == 1:
                possibles.add(frozenset((bank, inactive[0])))
        return possibles
163