Added formatting to the logger
[cipher-tools.git] / bombe.py
1 import string
2 import collections
3 import multiprocessing
4 import itertools
5 from enigma import *
6
7
8 Signal = collections.namedtuple('Signal', ['bank', 'wire'])
9 Connection = collections.namedtuple('Connection', ['banks', 'scrambler'])
10 MenuItem = collections.namedtuple('MenuIem', ['before', 'after', 'number'])
11
12 def make_menu(plaintext, ciphertext):
13 return [MenuItem(p, c, i+1)
14 for i, (p, c) in enumerate(zip(plaintext, ciphertext))]
15
16
17 class Scrambler(object):
18 def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
19 wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'):
20 self.wheel1 = SimpleWheel(wheel1_spec, position=wheel1_pos)
21 self.wheel2 = SimpleWheel(wheel2_spec, position=wheel2_pos)
22 self.wheel3 = SimpleWheel(wheel3_spec, position=wheel3_pos)
23 self.reflector = Reflector(reflector_spec)
24
25 def __getattribute__(self, name):
26 if name=='wheel_positions':
27 return self.wheel1.position, self.wheel2.position, self.wheel3.position
28 elif name=='wheel_positions_l':
29 return self.wheel1.position_l, self.wheel2.position_l, self.wheel3.position_l
30 else:
31 return object.__getattribute__(self, name)
32
33 def advance(self, wheel1=False, wheel2=False, wheel3=True):
34 if wheel1: self.wheel1.advance()
35 if wheel2: self.wheel2.advance()
36 if wheel3: self.wheel3.advance()
37
38 def lookup(self, letter):
39 a = self.wheel3.forward(letter)
40 b = self.wheel2.forward(a)
41 c = self.wheel1.forward(b)
42 d = self.reflector.forward(c)
43 e = self.wheel1.backward(d)
44 f = self.wheel2.backward(e)
45 g = self.wheel3.backward(f)
46 return g
47
48 def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
49 self.wheel1.set_position(wheel1_pos)
50 self.wheel2.set_position(wheel2_pos)
51 self.wheel3.set_position(wheel3_pos)
52
53
54 class Bombe(object):
55
56 def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
57 menu=None, start_signal=None, use_diagonal_board=True,
58 verify_plugboard=True):
59 self.connections = []
60 self.wheel1_spec = wheel1_spec
61 self.wheel2_spec = wheel2_spec
62 self.wheel3_spec = wheel3_spec
63 self.reflector_spec = reflector_spec
64 if menu:
65 self.read_menu(menu)
66 if start_signal:
67 self.test_start = start_signal
68 self.use_diagonal_board = use_diagonal_board
69 self.verify_plugboard = verify_plugboard
70
71 def __getattribute__(self, name):
72 if name=='wheel_positions':
73 return self.connections[0].scrambler.wheel_positions
74 elif name=='wheel_positions_l':
75 return self.connections[0].scrambler.wheel_positions_l
76 else:
77 return object.__getattribute__(self, name)
78
79 def __call__(self, start_positions):
80 return start_positions, self.test(initial_signal=self.test_start,
81 start_positions=start_positions,
82 use_diagonal_board=self.use_diagonal_board,
83 verify_plugboard=self.verify_plugboard)
84
85 def add_connection(self, bank_before, bank_after, scrambler):
86 self.connections += [Connection([bank_before, bank_after], scrambler)]
87
88 def read_menu(self, menu):
89 self.connections = []
90 for item in menu:
91 scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec, self.wheel3_spec,
92 self.reflector_spec,
93 wheel3_pos=unpos(item.number - 1))
94 self.add_connection(item.before, item.after, scrambler)
95 most_common_letter = (collections.Counter(m.before for m in menu) + collections.Counter(m.after for m in menu)).most_common(1)[0][0]
96 self.test_start = Signal(most_common_letter, most_common_letter)
97
98 def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
99 for i, c in enumerate(self.connections):
100 c.scrambler.set_positions(wheel1_pos, wheel2_pos, unpos(pos(wheel3_pos) + i))
101
102 def test(self, initial_signal=None, start_positions=None, use_diagonal_board=True,
103 verify_plugboard=True):
104 self.banks = {label:
105 dict(zip(string.ascii_lowercase, [False]*len(string.ascii_lowercase)))
106 for label in string.ascii_lowercase}
107 if start_positions:
108 self.set_positions(*start_positions)
109 if not initial_signal:
110 initial_signal = self.test_start
111 self.pending = [initial_signal]
112 self.propagate(use_diagonal_board)
113 live_wire_count = len([self.banks[self.test_start.bank][w]
114 for w in self.banks[self.test_start.bank]
115 if self.banks[self.test_start.bank][w]])
116 if live_wire_count < 26:
117 if verify_plugboard:
118 possibles = self.possible_plugboards()
119 return all(s0.isdisjoint(s1) for s0 in possibles for s1 in possibles if s0 != s1)
120 else:
121 return True
122 else:
123 return False
124
125 def propagate(self, use_diagonal_board):
126 while self.pending:
127 current = self.pending[0]
128 # print("processing", current)
129 self.pending = self.pending[1:]
130 if not self.banks[current.bank][current.wire]:
131 self.banks[current.bank][current.wire] = True
132 if use_diagonal_board:
133 self.pending += [Signal(current.wire, current.bank)]
134 for c in self.connections:
135 if current.bank in c.banks:
136 other_bank = [b for b in c.banks if b != current.bank][0]
137 other_wire = c.scrambler.lookup(current.wire)
138 # print(" adding", other_bank, other_wire, "because", c.banks)
139 self.pending += [Signal(other_bank, other_wire)]
140
141 def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a', use_diagonal_board=True):
142 if not run_start:
143 run_start = self.test_start
144 self.solutions = []
145 self.set_positions(wheel1_pos, wheel2_pos, wheel3_pos)
146 for run_index in range(26*26*26):
147 if self.test(initial_signal=run_start, use_diagonal_board=use_diagonal_board):
148 self.solutions += [self.connections[0].scrambler.wheel_positions_l]
149 advance3 = True
150 advance2 = False
151 advance1 = False
152 if (run_index + 1) % 26 == 0: advance2 = True
153 if (run_index + 1) % (26*26) == 0: advance1 = True
154 for c in self.connections:
155 c.scrambler.advance(advance1, advance2, advance3)
156 return self.solutions
157
158 def possible_plugboards(self):
159 possibles = set()
160 for b in self.banks:
161 active = [w for w in self.banks[b] if self.banks[b][w]]
162 inactive = [w for w in self.banks[b] if not self.banks[b][w]]
163 if len(active) == 1:
164 possibles = possibles.union({frozenset((b, active[0]))})
165 if len(inactive) == 1:
166 possibles = possibles.union({frozenset((b, inactive[0]))})
167 return possibles
168
169
170 def run_multi_bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, menu,
171 start_signal=None, use_diagonal_board=True,
172 verify_plugboard=True):
173 allwheels = itertools.product(string.ascii_lowercase, repeat=3)
174
175 with multiprocessing.Pool() as pool:
176 res = pool.map(Bombe(wheel1_spec, wheel2_spec, wheel3_spec,
177 reflector_spec, menu=menu, start_signal=start_signal,
178 use_diagonal_board=use_diagonal_board,
179 verify_plugboard=verify_plugboard),
180 allwheels)
181 return [r[0] for r in res if r[1]]