Updated for challenge 9
[cipher-tools.git] / cipher / bombe.py
1 import string
2 import collections
3 import multiprocessing
4 import itertools
5 import logging
6
7 from cipher.enigma import *
8
9
# Every record is written as "<time> - <logger name> - <level> - <message>".
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# File handler: records are appended to "enigma.log" in the working directory.
fh = logging.FileHandler("enigma.log")
fh.setFormatter(formatter)

# Module logger.  DEBUG is the most verbose setting; switch the level to
# logging.INFO or logging.WARNING to quieten the per-signal trace output.
logger = logging.getLogger('bombe')
logger.setLevel(logging.DEBUG)
logger.addHandler(fh)
22
23 ##################################
24 # # Bombe
25 ##################################
26 #
27 # Good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm) by Graham Ellsbury
28 #
29
# A single wire: `wire` is the wire label inside the bank labelled `bank`.
Signal = collections.namedtuple('Signal', ['bank', 'wire'])
# A scrambler wired between two banks; `banks` holds the two bank labels.
Connection = collections.namedtuple('Connection', ['banks', 'scrambler'])
# One crib position: the letter `before`, the letter `after`, and the
# position `number` of the pair in the crib.
# The typename must match the variable name, otherwise the repr is misleading
# and instances cannot be pickled.
MenuItem = collections.namedtuple('MenuItem', ['before', 'after', 'number'])
33
34
def make_menu(plaintext, ciphertext):
    """Build a bombe menu from an aligned plaintext/ciphertext pair.

    The two sequences are paired letter by letter (stopping at the shorter
    one).  Each pair becomes a `MenuItem` whose `number` is the 1-based
    position of the pair.

    Returns a list of `MenuItem`s in position order.
    """
    menu = []
    for number, (before, after) in enumerate(zip(plaintext, ciphertext), start=1):
        menu.append(MenuItem(before, after, number))
    return menu
38
39
class Scrambler(object):
    """Three wheels and a reflector, wired in series.

    A letter passes through wheel 3, wheel 2 and wheel 1, is turned round by
    the reflector, and comes back through wheels 1, 2 and 3.  The wheels only
    move when `advance` or `set_positions` is called.
    """

    def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                 wheel1_pos='a', wheel2_pos='a', wheel3_pos='a'):
        """Create the three wheels at the given positions, plus the reflector.

        The `*_spec` arguments are passed straight to `SimpleWheel` and
        `Reflector`; the `*_pos` arguments are the initial wheel positions.
        """
        self.wheel1 = SimpleWheel(wheel1_spec, position=wheel1_pos)
        self.wheel2 = SimpleWheel(wheel2_spec, position=wheel2_pos)
        self.wheel3 = SimpleWheel(wheel3_spec, position=wheel3_pos)
        self.reflector = Reflector(reflector_spec)

    # These are computed attributes, so they are properties rather than a
    # __getattribute__ override (which would intercept every attribute access).
    @property
    def wheel_positions(self):
        """The `position` of wheels 1, 2 and 3, as a tuple."""
        return self.wheel1.position, self.wheel2.position, self.wheel3.position

    @property
    def wheel_positions_l(self):
        """The `position_l` of wheels 1, 2 and 3, as a tuple."""
        return self.wheel1.position_l, self.wheel2.position_l, self.wheel3.position_l

    def advance(self, wheel1=False, wheel2=False, wheel3=True):
        """Advance each wheel whose flag is true (by default only wheel 3)."""
        if wheel1:
            self.wheel1.advance()
        if wheel2:
            self.wheel2.advance()
        if wheel3:
            self.wheel3.advance()

    def lookup(self, letter):
        """Return the result of passing `letter` through the whole scrambler."""
        a = self.wheel3.forward(letter)
        b = self.wheel2.forward(a)
        c = self.wheel1.forward(b)
        d = self.reflector.forward(c)
        e = self.wheel1.backward(d)
        f = self.wheel2.backward(e)
        g = self.wheel3.backward(f)
        return g

    def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
        """Move all three wheels to the given positions."""
        self.wheel1.set_position(wheel1_pos)
        self.wheel2.set_position(wheel2_pos)
        self.wheel3.set_position(wheel3_pos)
75
76
class Bombe(object):
    """A set of scramblers wired between letter banks according to a menu.

    Each menu item contributes one `Scrambler` connecting the bank of its
    `before` letter to the bank of its `after` letter.  `test` injects a
    signal, lets it spread through the scramblers, and reports whether the
    current wheel positions survive as a candidate.

    An instance is callable with a tuple of start positions, which makes it
    usable as the worker function of `multiprocessing.Pool.map`.
    """

    def __init__(self, wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                 menu=None, start_signal=None, use_diagonal_board=True,
                 verify_plugboard=True):
        """Store the wheel/reflector specs and optionally wire up `menu`.

        `start_signal`, when given, overrides the test signal chosen by
        `read_menu`.  If neither `menu` nor `start_signal` is supplied,
        `test_start` is left unset until `read_menu` is called.
        `use_diagonal_board` and `verify_plugboard` are the settings used
        when the instance is called.
        """
        self.connections = []
        self.wheel1_spec = wheel1_spec
        self.wheel2_spec = wheel2_spec
        self.wheel3_spec = wheel3_spec
        self.reflector_spec = reflector_spec
        if menu:
            self.read_menu(menu)
        if start_signal:
            self.test_start = start_signal
        self.use_diagonal_board = use_diagonal_board
        self.verify_plugboard = verify_plugboard

    # These are computed attributes, so they are properties rather than a
    # __getattribute__ override (which would intercept every attribute access).
    @property
    def wheel_positions(self):
        """`wheel_positions` of the first connection's scrambler."""
        return self.connections[0].scrambler.wheel_positions

    @property
    def wheel_positions_l(self):
        """`wheel_positions_l` of the first connection's scrambler."""
        return self.connections[0].scrambler.wheel_positions_l

    def __call__(self, start_positions):
        """Test `start_positions` using the instance's own settings.

        Returns `(start_positions, result)` so that a caller mapping over many
        positions can tell which ones passed.
        """
        return start_positions, self.test(initial_signal=self.test_start,
                                          start_positions=start_positions,
                                          use_diagonal_board=self.use_diagonal_board,
                                          verify_plugboard=self.verify_plugboard)

    def add_connection(self, bank_before, bank_after, scrambler):
        """Wire `scrambler` between the two named banks."""
        self.connections.append(Connection([bank_before, bank_after], scrambler))

    def read_menu(self, menu):
        """Replace all connections with one scrambler per menu item.

        The scrambler for an item starts with wheel 3 at
        `unpos(item.number - 1)`.  The test signal is set to the letter that
        occurs most often in the menu, on the wire of the same letter.
        """
        # The menu is traversed three times, so materialise it in case the
        # caller passed a one-shot iterator.
        menu = list(menu)
        self.connections = []
        for item in menu:
            scrambler = Scrambler(self.wheel1_spec, self.wheel2_spec, self.wheel3_spec,
                                  self.reflector_spec,
                                  wheel3_pos=unpos(item.number - 1))
            self.add_connection(item.before, item.after, scrambler)
        letter_counts = (collections.Counter(m.before for m in menu) +
                         collections.Counter(m.after for m in menu))
        most_common_letter = letter_counts.most_common(1)[0][0]
        self.test_start = Signal(most_common_letter, most_common_letter)

    def set_positions(self, wheel1_pos, wheel2_pos, wheel3_pos):
        """Set every scrambler's wheels, offsetting wheel 3 per connection.

        The i-th connection (counting from 0) gets wheel 3 at
        `unpos(pos(wheel3_pos) + i)`; wheels 1 and 2 are the same everywhere.
        """
        for i, c in enumerate(self.connections):
            c.scrambler.set_positions(wheel1_pos, wheel2_pos, unpos(pos(wheel3_pos) + i))

    def test(self, initial_signal=None, start_positions=None, use_diagonal_board=True,
             verify_plugboard=True):
        """Inject a signal and report whether the current positions survive.

        All banks are cleared, the wheels are moved to `start_positions` if
        given, and `initial_signal` (default: `self.test_start`) is
        propagated.  Returns False if all 26 wires of the test bank end up
        live.  Otherwise returns True, unless `verify_plugboard` is set and
        the plugboard pairs from `possible_plugboards` contradict each other
        (two different pairs sharing a letter).
        """
        self.banks = {label: dict.fromkeys(string.ascii_lowercase, False)
                      for label in string.ascii_lowercase}
        if start_positions:
            self.set_positions(*start_positions)
        if not initial_signal:
            initial_signal = self.test_start
        self.pending = [initial_signal]
        self.propagate(use_diagonal_board)
        # NOTE(review): the wires are counted in self.test_start's bank even
        # when a different initial_signal was injected -- confirm intended.
        live_wire_count = sum(1 for live in self.banks[self.test_start.bank].values()
                              if live)
        if live_wire_count >= 26:
            return False
        if not verify_plugboard:
            return True
        possibles = self.possible_plugboards()
        return all(s0.isdisjoint(s1)
                   for s0 in possibles for s1 in possibles if s0 != s1)

    def propagate(self, use_diagonal_board):
        """Spread every pending signal through the banks until none remain.

        Signals are handled first-in, first-out.  A signal on a wire that is
        already live is dropped.  Otherwise the wire is marked live and:

        * with `use_diagonal_board`, the mirrored signal (bank and wire
          swapped) is queued;
        * for each connection touching the signal's bank, the wire is passed
          through that scrambler and the result is queued on the other bank.

        `self.pending` is empty on return.
        """
        # A deque gives O(1) pops from the front; slicing the list copied it
        # on every signal.
        queue = collections.deque(self.pending)
        self.pending = []
        while queue:
            current = queue.popleft()
            # Lazy %-style arguments: nothing is formatted unless the record
            # is actually emitted.
            logger.debug("Propogater processing %s", current)
            if self.banks[current.bank][current.wire]:
                continue
            self.banks[current.bank][current.wire] = True
            if use_diagonal_board:
                queue.append(Signal(current.wire, current.bank))
            for c in self.connections:
                if current.bank not in c.banks:
                    continue
                # Pick the far end of the connection.  If both ends are the
                # same bank, the signal simply comes back to that bank.
                if c.banks[0] == current.bank:
                    other_bank = c.banks[1]
                else:
                    other_bank = c.banks[0]
                other_wire = c.scrambler.lookup(current.wire)
                logger.debug("Propogator adding %s %s because %s",
                             other_bank, other_wire, c.banks)
                queue.append(Signal(other_bank, other_wire))

    def run(self, run_start=None, wheel1_pos='a', wheel2_pos='a', wheel3_pos='a',
            use_diagonal_board=True, verify_plugboard=True):
        """Test all 26*26*26 wheel positions in turn, starting from the given ones.

        After each test wheel 3 advances; wheel 2 also advances after every
        26th test and wheel 1 after every 676th.  Returns (and stores in
        `self.solutions`) the `wheel_positions_l` of every position for
        which `test` returned True.
        """
        if not run_start:
            run_start = self.test_start
        self.solutions = []
        self.set_positions(wheel1_pos, wheel2_pos, wheel3_pos)
        for run_index in range(26 * 26 * 26):
            if self.test(initial_signal=run_start,
                         use_diagonal_board=use_diagonal_board,
                         verify_plugboard=verify_plugboard):
                self.solutions.append(self.wheel_positions_l)
            step = run_index + 1
            advance2 = step % 26 == 0
            advance1 = step % (26 * 26) == 0
            for c in self.connections:
                c.scrambler.advance(advance1, advance2, True)
        return self.solutions

    def possible_plugboards(self):
        """Return the plugboard pairs implied by the current bank states.

        A bank with exactly one live wire, or exactly one wire that is not
        live, yields the pair {bank letter, that wire}.  Pairs are frozensets,
        so a bank paired with its own letter gives a one-element set.
        """
        possibles = set()
        for b, wires in self.banks.items():
            active = [w for w, live in wires.items() if live]
            inactive = [w for w, live in wires.items() if not live]
            if len(active) == 1:
                possibles.add(frozenset((b, active[0])))
            if len(inactive) == 1:
                possibles.add(frozenset((b, inactive[0])))
        return possibles
194
195
def run_multi_bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec, menu,
                    start_signal=None, use_diagonal_board=True,
                    verify_plugboard=True):
    """Test every three-letter wheel setting in parallel.

    One `Bombe` is built from the arguments and used as the worker function
    of a `multiprocessing.Pool`, which maps it over all 26*26*26 lower-case
    position triples.

    Returns the list of position triples for which the bombe's test passed.
    """
    candidate_positions = itertools.product(string.ascii_lowercase, repeat=3)

    with multiprocessing.Pool() as workers:
        bombe = Bombe(wheel1_spec, wheel2_spec, wheel3_spec, reflector_spec,
                      menu=menu, start_signal=start_signal,
                      use_diagonal_board=use_diagonal_board,
                      verify_plugboard=verify_plugboard)
        # Each outcome is the (positions, passed) pair the bombe returns.
        outcomes = workers.map(bombe, candidate_positions)
        return [positions for positions, passed in outcomes if passed]