74e03ba5331ebe165b90c762c5f3c2cd63c459d7
7 from cipher
.enigma
import *
10 logger
= logging
.getLogger('bombe')
11 # logger.setLevel(logging.WARNING)
12 # logger.setLevel(logging.INFO)
13 logger
.setLevel(logging
.DEBUG
)
15 # create the logging file handler
16 fh
= logging
.FileHandler("enigma.log")
17 formatter
= logging
.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
18 fh
.setFormatter(formatter
)
20 # add handler to logger object
23 ##################################
25 ##################################
27 # Good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm) by Graham Ellsbury
30 Signal
= collections
.namedtuple('Signal', ['bank', 'wire'])
31 Connection
= collections
.namedtuple('Connection', ['banks', 'scrambler'])
32 MenuItem
= collections
.namedtuple('MenuIem', ['before', 'after', 'number'])
35 def make_menu(plaintext
, ciphertext
):
36 return [MenuItem(p
, c
, i
+1)
37 for i
, (p
, c
) in enumerate(zip(plaintext
, ciphertext
))]
40 class Scrambler(object):
41 def __init__(self
, wheel1_spec
, wheel2_spec
, wheel3_spec
, reflector_spec
,
42 wheel1_pos
='a', wheel2_pos
='a', wheel3_pos
='a'):
43 self
.wheel1
= SimpleWheel(wheel1_spec
, position
=wheel1_pos
)
44 self
.wheel2
= SimpleWheel(wheel2_spec
, position
=wheel2_pos
)
45 self
.wheel3
= SimpleWheel(wheel3_spec
, position
=wheel3_pos
)
46 self
.reflector
= Reflector(reflector_spec
)
48 def __getattribute__(self
, name
):
49 if name
=='wheel_positions':
50 return self
.wheel1
.position
, self
.wheel2
.position
, self
.wheel3
.position
51 elif name
=='wheel_positions_l':
52 return self
.wheel1
.position_l
, self
.wheel2
.position_l
, self
.wheel3
.position_l
54 return object.__getattribute
__(self
, name
)
56 def advance(self
, wheel1
=False, wheel2
=False, wheel3
=True):
57 if wheel1
: self
.wheel1
.advance()
58 if wheel2
: self
.wheel2
.advance()
59 if wheel3
: self
.wheel3
.advance()
61 def lookup(self
, letter
):
62 a
= self
.wheel3
.forward(letter
)
63 b
= self
.wheel2
.forward(a
)
64 c
= self
.wheel1
.forward(b
)
65 d
= self
.reflector
.forward(c
)
66 e
= self
.wheel1
.backward(d
)
67 f
= self
.wheel2
.backward(e
)
68 g
= self
.wheel3
.backward(f
)
71 def set_positions(self
, wheel1_pos
, wheel2_pos
, wheel3_pos
):
72 self
.wheel1
.set_position(wheel1_pos
)
73 self
.wheel2
.set_position(wheel2_pos
)
74 self
.wheel3
.set_position(wheel3_pos
)
79 def __init__(self
, wheel1_spec
, wheel2_spec
, wheel3_spec
, reflector_spec
,
80 menu
=None, start_signal
=None, use_diagonal_board
=True,
81 verify_plugboard
=True):
83 self
.wheel1_spec
= wheel1_spec
84 self
.wheel2_spec
= wheel2_spec
85 self
.wheel3_spec
= wheel3_spec
86 self
.reflector_spec
= reflector_spec
90 self
.test_start
= start_signal
91 self
.use_diagonal_board
= use_diagonal_board
92 self
.verify_plugboard
= verify_plugboard
94 def __getattribute__(self
, name
):
95 if name
=='wheel_positions':
96 return self
.connections
[0].scrambler
.wheel_positions
97 elif name
=='wheel_positions_l':
98 return self
.connections
[0].scrambler
.wheel_positions_l
100 return object.__getattribute
__(self
, name
)
102 def __call__(self
, start_positions
):
103 return start_positions
, self
.test(initial_signal
=self
.test_start
,
104 start_positions
=start_positions
,
105 use_diagonal_board
=self
.use_diagonal_board
,
106 verify_plugboard
=self
.verify_plugboard
)
108 def add_connection(self
, bank_before
, bank_after
, scrambler
):
109 self
.connections
+= [Connection([bank_before
, bank_after
], scrambler
)]
111 def read_menu(self
, menu
):
112 self
.connections
= []
114 scrambler
= Scrambler(self
.wheel1_spec
, self
.wheel2_spec
, self
.wheel3_spec
,
116 wheel3_pos
=unpos(item
.number
- 1))
117 self
.add_connection(item
.before
, item
.after
, scrambler
)
118 most_common_letter
= (collections
.Counter(m
.before
for m
in menu
) +\
119 collections
.Counter(m
.after
for m
in menu
)).most_common(1)[0][0]
120 self
.test_start
= Signal(most_common_letter
, most_common_letter
)
122 def set_positions(self
, wheel1_pos
, wheel2_pos
, wheel3_pos
):
123 for i
, c
in enumerate(self
.connections
):
124 c
.scrambler
.set_positions(wheel1_pos
, wheel2_pos
, unpos(pos(wheel3_pos
) + i
))
126 def test(self
, initial_signal
=None, start_positions
=None, use_diagonal_board
=True,
127 verify_plugboard
=True):
129 dict(zip(string
.ascii_lowercase
, [False]*len(string
.ascii_lowercase
)))
130 for label
in string
.ascii_lowercase
}
132 self
.set_positions(*start_positions
)
133 if not initial_signal
:
134 initial_signal
= self
.test_start
135 self
.pending
= [initial_signal
]
136 self
.propagate(use_diagonal_board
)
137 live_wire_count
= len([self
.banks
[self
.test_start
.bank
][w
]
138 for w
in self
.banks
[self
.test_start
.bank
]
139 if self
.banks
[self
.test_start
.bank
][w
]])
140 if live_wire_count
< 26:
142 possibles
= self
.possible_plugboards()
143 return all(s0
.isdisjoint(s1
) for s0
in possibles
for s1
in possibles
if s0
!= s1
)
149 def propagate(self
, use_diagonal_board
):
151 current
= self
.pending
[0]
152 # print("processing", current)
153 logger
.debug("Propogater processing {}".format(current
))
154 self
.pending
= self
.pending
[1:]
155 if not self
.banks
[current
.bank
][current
.wire
]:
156 self
.banks
[current
.bank
][current
.wire
] = True
157 if use_diagonal_board
:
158 self
.pending
+= [Signal(current
.wire
, current
.bank
)]
159 for c
in self
.connections
:
160 if current
.bank
in c
.banks
:
161 other_bank
= [b
for b
in c
.banks
if b
!= current
.bank
][0]
162 other_wire
= c
.scrambler
.lookup(current
.wire
)
163 # print(" adding", other_bank, other_wire, "because", c.banks)
164 logger
.debug("Propogator adding {0} {1} because {2}".format(other_bank
, other_wire
, c
.banks
))
165 self
.pending
+= [Signal(other_bank
, other_wire
)]
167 def run(self
, run_start
=None, wheel1_pos
='a', wheel2_pos
='a', wheel3_pos
='a', use_diagonal_board
=True):
169 run_start
= self
.test_start
171 self
.set_positions(wheel1_pos
, wheel2_pos
, wheel3_pos
)
172 for run_index
in range(26*26*26):
173 if self
.test(initial_signal
=run_start
, use_diagonal_board
=use_diagonal_board
):
174 self
.solutions
+= [self
.connections
[0].scrambler
.wheel_positions_l
]
178 if (run_index
+ 1) % 26 == 0: advance2
= True
179 if (run_index
+ 1) % (26*26) == 0: advance1
= True
180 for c
in self
.connections
:
181 c
.scrambler
.advance(advance1
, advance2
, advance3
)
182 return self
.solutions
184 def possible_plugboards(self
):
187 active
= [w
for w
in self
.banks
[b
] if self
.banks
[b
][w
]]
188 inactive
= [w
for w
in self
.banks
[b
] if not self
.banks
[b
][w
]]
190 possibles
= possibles
.union({frozenset((b
, active
[0]))})
191 if len(inactive
) == 1:
192 possibles
= possibles
.union({frozenset((b
, inactive
[0]))})
196 def run_multi_bombe(wheel1_spec
, wheel2_spec
, wheel3_spec
, reflector_spec
, menu
,
197 start_signal
=None, use_diagonal_board
=True,
198 verify_plugboard
=True):
199 allwheels
= itertools
.product(string
.ascii_lowercase
, repeat
=3)
201 with multiprocessing
.Pool() as pool
:
202 res
= pool
.map(Bombe(wheel1_spec
, wheel2_spec
, wheel3_spec
,
203 reflector_spec
, menu
=menu
, start_signal
=start_signal
,
204 use_diagonal_board
=use_diagonal_board
,
205 verify_plugboard
=verify_plugboard
),
207 return [r
[0] for r
in res
if r
[1]]