9 logger
= logging
.getLogger('bombe')
10 # logger.setLevel(logging.WARNING)
11 # logger.setLevel(logging.INFO)
12 logger
.setLevel(logging
.DEBUG
)
14 # create the logging file handler
15 fh
= logging
.FileHandler("enigma.log")
16 formatter
= logging
.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
17 fh
.setFormatter(formatter
)
19 # add handler to logger object
22 ##################################
24 ##################################
26 # Good explanation of [how the bombe worked](http://www.ellsbury.com/enigmabombe.htm) by Graham Ellsbury
29 Signal
= collections
.namedtuple('Signal', ['bank', 'wire'])
30 Connection
= collections
.namedtuple('Connection', ['banks', 'scrambler'])
31 MenuItem
= collections
.namedtuple('MenuIem', ['before', 'after', 'number'])
34 def make_menu(plaintext
, ciphertext
):
35 return [MenuItem(p
, c
, i
+1)
36 for i
, (p
, c
) in enumerate(zip(plaintext
, ciphertext
))]
39 class Scrambler(object):
40 def __init__(self
, wheel1_spec
, wheel2_spec
, wheel3_spec
, reflector_spec
,
41 wheel1_pos
='a', wheel2_pos
='a', wheel3_pos
='a'):
42 self
.wheel1
= SimpleWheel(wheel1_spec
, position
=wheel1_pos
)
43 self
.wheel2
= SimpleWheel(wheel2_spec
, position
=wheel2_pos
)
44 self
.wheel3
= SimpleWheel(wheel3_spec
, position
=wheel3_pos
)
45 self
.reflector
= Reflector(reflector_spec
)
47 def __getattribute__(self
, name
):
48 if name
=='wheel_positions':
49 return self
.wheel1
.position
, self
.wheel2
.position
, self
.wheel3
.position
50 elif name
=='wheel_positions_l':
51 return self
.wheel1
.position_l
, self
.wheel2
.position_l
, self
.wheel3
.position_l
53 return object.__getattribute
__(self
, name
)
55 def advance(self
, wheel1
=False, wheel2
=False, wheel3
=True):
56 if wheel1
: self
.wheel1
.advance()
57 if wheel2
: self
.wheel2
.advance()
58 if wheel3
: self
.wheel3
.advance()
60 def lookup(self
, letter
):
61 a
= self
.wheel3
.forward(letter
)
62 b
= self
.wheel2
.forward(a
)
63 c
= self
.wheel1
.forward(b
)
64 d
= self
.reflector
.forward(c
)
65 e
= self
.wheel1
.backward(d
)
66 f
= self
.wheel2
.backward(e
)
67 g
= self
.wheel3
.backward(f
)
70 def set_positions(self
, wheel1_pos
, wheel2_pos
, wheel3_pos
):
71 self
.wheel1
.set_position(wheel1_pos
)
72 self
.wheel2
.set_position(wheel2_pos
)
73 self
.wheel3
.set_position(wheel3_pos
)
78 def __init__(self
, wheel1_spec
, wheel2_spec
, wheel3_spec
, reflector_spec
,
79 menu
=None, start_signal
=None, use_diagonal_board
=True,
80 verify_plugboard
=True):
82 self
.wheel1_spec
= wheel1_spec
83 self
.wheel2_spec
= wheel2_spec
84 self
.wheel3_spec
= wheel3_spec
85 self
.reflector_spec
= reflector_spec
89 self
.test_start
= start_signal
90 self
.use_diagonal_board
= use_diagonal_board
91 self
.verify_plugboard
= verify_plugboard
93 def __getattribute__(self
, name
):
94 if name
=='wheel_positions':
95 return self
.connections
[0].scrambler
.wheel_positions
96 elif name
=='wheel_positions_l':
97 return self
.connections
[0].scrambler
.wheel_positions_l
99 return object.__getattribute
__(self
, name
)
101 def __call__(self
, start_positions
):
102 return start_positions
, self
.test(initial_signal
=self
.test_start
,
103 start_positions
=start_positions
,
104 use_diagonal_board
=self
.use_diagonal_board
,
105 verify_plugboard
=self
.verify_plugboard
)
107 def add_connection(self
, bank_before
, bank_after
, scrambler
):
108 self
.connections
+= [Connection([bank_before
, bank_after
], scrambler
)]
110 def read_menu(self
, menu
):
111 self
.connections
= []
113 scrambler
= Scrambler(self
.wheel1_spec
, self
.wheel2_spec
, self
.wheel3_spec
,
115 wheel3_pos
=unpos(item
.number
- 1))
116 self
.add_connection(item
.before
, item
.after
, scrambler
)
117 most_common_letter
= (collections
.Counter(m
.before
for m
in menu
) +\
118 collections
.Counter(m
.after
for m
in menu
)).most_common(1)[0][0]
119 self
.test_start
= Signal(most_common_letter
, most_common_letter
)
121 def set_positions(self
, wheel1_pos
, wheel2_pos
, wheel3_pos
):
122 for i
, c
in enumerate(self
.connections
):
123 c
.scrambler
.set_positions(wheel1_pos
, wheel2_pos
, unpos(pos(wheel3_pos
) + i
))
125 def test(self
, initial_signal
=None, start_positions
=None, use_diagonal_board
=True,
126 verify_plugboard
=True):
128 dict(zip(string
.ascii_lowercase
, [False]*len(string
.ascii_lowercase
)))
129 for label
in string
.ascii_lowercase
}
131 self
.set_positions(*start_positions
)
132 if not initial_signal
:
133 initial_signal
= self
.test_start
134 self
.pending
= [initial_signal
]
135 self
.propagate(use_diagonal_board
)
136 live_wire_count
= len([self
.banks
[self
.test_start
.bank
][w
]
137 for w
in self
.banks
[self
.test_start
.bank
]
138 if self
.banks
[self
.test_start
.bank
][w
]])
139 if live_wire_count
< 26:
141 possibles
= self
.possible_plugboards()
142 return all(s0
.isdisjoint(s1
) for s0
in possibles
for s1
in possibles
if s0
!= s1
)
148 def propagate(self
, use_diagonal_board
):
150 current
= self
.pending
[0]
151 # print("processing", current)
152 logger
.debug("Propogater processing {}".format(current
))
153 self
.pending
= self
.pending
[1:]
154 if not self
.banks
[current
.bank
][current
.wire
]:
155 self
.banks
[current
.bank
][current
.wire
] = True
156 if use_diagonal_board
:
157 self
.pending
+= [Signal(current
.wire
, current
.bank
)]
158 for c
in self
.connections
:
159 if current
.bank
in c
.banks
:
160 other_bank
= [b
for b
in c
.banks
if b
!= current
.bank
][0]
161 other_wire
= c
.scrambler
.lookup(current
.wire
)
162 # print(" adding", other_bank, other_wire, "because", c.banks)
163 logger
.debug("Propogator adding {0} {1} because {2}".format(other_bank
, other_wire
, c
.banks
))
164 self
.pending
+= [Signal(other_bank
, other_wire
)]
166 def run(self
, run_start
=None, wheel1_pos
='a', wheel2_pos
='a', wheel3_pos
='a', use_diagonal_board
=True):
168 run_start
= self
.test_start
170 self
.set_positions(wheel1_pos
, wheel2_pos
, wheel3_pos
)
171 for run_index
in range(26*26*26):
172 if self
.test(initial_signal
=run_start
, use_diagonal_board
=use_diagonal_board
):
173 self
.solutions
+= [self
.connections
[0].scrambler
.wheel_positions_l
]
177 if (run_index
+ 1) % 26 == 0: advance2
= True
178 if (run_index
+ 1) % (26*26) == 0: advance1
= True
179 for c
in self
.connections
:
180 c
.scrambler
.advance(advance1
, advance2
, advance3
)
181 return self
.solutions
183 def possible_plugboards(self
):
186 active
= [w
for w
in self
.banks
[b
] if self
.banks
[b
][w
]]
187 inactive
= [w
for w
in self
.banks
[b
] if not self
.banks
[b
][w
]]
189 possibles
= possibles
.union({frozenset((b
, active
[0]))})
190 if len(inactive
) == 1:
191 possibles
= possibles
.union({frozenset((b
, inactive
[0]))})
195 def run_multi_bombe(wheel1_spec
, wheel2_spec
, wheel3_spec
, reflector_spec
, menu
,
196 start_signal
=None, use_diagonal_board
=True,
197 verify_plugboard
=True):
198 allwheels
= itertools
.product(string
.ascii_lowercase
, repeat
=3)
200 with multiprocessing
.Pool() as pool
:
201 res
= pool
.map(Bombe(wheel1_spec
, wheel2_spec
, wheel3_spec
,
202 reflector_spec
, menu
=menu
, start_signal
=start_signal
,
203 use_diagonal_board
=use_diagonal_board
,
204 verify_plugboard
=verify_plugboard
),
206 return [r
[0] for r
in res
if r
[1]]