2 # from itertools import starmap
5 from support
.utilities
import *
6 from support
.language_models
import *
8 from logger
import logger
10 # logger.setLevel(logging.DEBUG)
class KeywordWrapAlphabet(Enum):
    """How the rest of the alphabet is appended after the keyword.

    The members are iterated in definition order by the keyword breakers,
    so the order here determines which option wins a tie on fitness.
    """
    # Continue with the unused letters starting again from 'a'.
    # NOTE(review): the value 1 is inferred; only 2 and 3 appear in the
    # doctest reprs elsewhere in this file -- confirm.
    from_a = 1
    # Continue from the letter after the last letter of the keyword.
    from_last = 2
    # Continue from the letter after the alphabetically largest keyword letter.
    from_largest = 3
def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Find the cipher alphabet given a keyword.

    The cipher alphabet starts with the (sanitised, deduplicated) keyword.
    wrap_alphabet controls how the rest of the alphabet is added after it:

    * KeywordWrapAlphabet.from_a: the unused letters, starting from 'a'
    * KeywordWrapAlphabet.from_last: the unused letters, starting after the
      last letter of the keyword and wrapping round
    * any other value (i.e. KeywordWrapAlphabet.from_largest): the unused
      letters, starting after the alphabetically largest keyword letter

    An empty (or fully sanitised-away) keyword contributes no letters, so
    the remaining letters simply start from 'a'.

    >>> keyword_cipher_alphabet_of('bayes')
    'bayescdfghijklmnopqrtuvwxz'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
    'bayescdfghijklmnopqrtuvwxz'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
    'bayestuvwxzcdfghijklmnopqr'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
    'bayeszcdfghijklmnopqrtuvwx'
    """
    clean_keyword = sanitise(keyword)

    if wrap_alphabet == KeywordWrapAlphabet.from_a:
        return cat(deduplicate(clean_keyword + string.ascii_lowercase))

    if not clean_keyword:
        # No keyword letter to wrap from: start the remainder at 'a'.
        last_keyword_position = 0
    else:
        if wrap_alphabet == KeywordWrapAlphabet.from_last:
            last_keyword_letter = deduplicate(clean_keyword)[-1]
        else:
            last_keyword_letter = sorted(clean_keyword)[-1]
        last_keyword_position = string.ascii_lowercase.find(
            last_keyword_letter) + 1

    # Keyword first, then the tail of the alphabet after the pivot letter,
    # then the whole alphabet again to pick up the letters before the pivot;
    # deduplicate() keeps only the first occurrence of each letter.
    return cat(
        deduplicate(clean_keyword +
                    string.ascii_lowercase[last_keyword_position:] +
                    string.ascii_lowercase))
def keyword_encipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Enciphers a message with a keyword substitution cipher.

    wrap_alphabet controls how the rest of the alphabet is added
    after the keyword:

    * KeywordWrapAlphabet.from_a: from 'a'
    * KeywordWrapAlphabet.from_last: from the last letter in the
      sanitised keyword
    * KeywordWrapAlphabet.from_largest: from the largest letter in the
      sanitised keyword

    The message is unaccented and lower-cased before translation;
    characters outside a-z pass through unchanged.

    >>> keyword_encipher('test message', 'bayes')
    'rsqr ksqqbds'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
    'rsqr ksqqbds'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
    'lskl dskkbus'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
    'qspq jsppbcs'
    """
    cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
    # Map plain letter -> cipher letter.
    cipher_translation = str.maketrans(string.ascii_lowercase, cipher_alphabet)
    return unaccent(message).lower().translate(cipher_translation)
def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Deciphers a message with a keyword substitution cipher.

    wrap_alphabet controls how the rest of the alphabet is added
    after the keyword:

    * KeywordWrapAlphabet.from_a: from 'a'
    * KeywordWrapAlphabet.from_last: from the last letter in the
      sanitised keyword
    * KeywordWrapAlphabet.from_largest: from the largest letter in the
      sanitised keyword

    The message is lower-cased before translation; characters outside
    a-z pass through unchanged.

    >>> keyword_decipher('rsqr ksqqbds', 'bayes')
    'test message'
    >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
    'test message'
    >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
    'test message'
    >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
    'test message'
    """
    cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
    # Map cipher letter -> plain letter (the inverse of the enciphering table).
    cipher_translation = str.maketrans(cipher_alphabet, string.ascii_lowercase)
    return message.lower().translate(cipher_translation)
def keyword_break(message, wordlist=keywords, fitness=Pletters):
    """Breaks a keyword substitution cipher using a dictionary and
    a fitness function.

    Every word in `wordlist` is tried as the keyword with every
    KeywordWrapAlphabet option; the decipherment with the highest
    `fitness` score wins. On a tie the first candidate tried is kept.

    Returns ((best_keyword, best_wrap_alphabet), best_fit). If `wordlist`
    is empty nothing is tried and the result is (('', True), -inf).

    >>> keyword_break(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
    (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
    """
    best_keyword = ''
    best_wrap_alphabet = True
    best_fit = float("-inf")
    # Remember the winning plaintext so the summary log below does not
    # need to decipher the message a second time.
    best_plaintext = ''
    for wrap_alphabet in KeywordWrapAlphabet:
        for keyword in wordlist:
            plaintext = keyword_decipher(message, keyword, wrap_alphabet)
            fit = fitness(plaintext)
            logger.debug('Keyword break attempt using key {0} (wrap={1}) '
                         'gives fit of {2} and decrypt starting: {3}'.format(
                             keyword, wrap_alphabet, fit,
                             sanitise(plaintext)[:50]))
            if fit > best_fit:
                best_fit = fit
                best_keyword = keyword
                best_wrap_alphabet = wrap_alphabet
                best_plaintext = plaintext
    logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
                '{2} and decrypt starting: {3}'.format(
                    best_keyword, best_wrap_alphabet, best_fit,
                    sanitise(best_plaintext)[:50]))
    return (best_keyword, best_wrap_alphabet), best_fit
def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
                     number_of_solutions=1, chunksize=500):
    """Breaks a keyword substitution cipher using a dictionary and
    a fitness function, spreading the trials over a process pool.

    Every (word, wrap option) pair is scored by keyword_break_worker.
    With number_of_solutions == 1 the single best
    ((keyword, wrap_alphabet), fit) tuple is returned; otherwise a list of
    the best `number_of_solutions` such tuples, best first.
    `chunksize` is passed straight to Pool.starmap.

    >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
    (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
    >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo'], \
          number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
    (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
    """
    helper_args = [(message, word, wrap, fitness)
                   for word in wordlist
                   for wrap in KeywordWrapAlphabet]
    with multiprocessing.Pool() as pool:
        # Gotcha: the helper function here needs to be defined at the top level
        #   (limitation of Pool.starmap)
        breaks = pool.starmap(keyword_break_worker, helper_args, chunksize)

    if number_of_solutions == 1:
        return max(breaks, key=lambda k: k[1])
    # sorted() is stable, so equally-fit candidates keep their trial order.
    return sorted(breaks, key=lambda k: k[1], reverse=True)[:number_of_solutions]
def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
    """Score one keyword / wrap option as a decipherment of `message`.

    Deciphers `message` with the given keyword and wrap option, scores the
    result with `fitness`, logs the attempt at debug level and returns
    ((keyword, wrap_alphabet), fit).
    """
    candidate = keyword_decipher(message, keyword, wrap_alphabet)
    score = fitness(candidate)
    preview = sanitise(candidate)[:50]
    logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
                 '{2} and decrypt starting: {3}'.format(
                     keyword, wrap_alphabet, score, preview))
    key = (keyword, wrap_alphabet)
    return key, score
def monoalphabetic_break_hillclimbing(message,
                                      max_iterations=20000,
                                      plain_alphabet=None,
                                      cipher_alphabet=None,
                                      swap_index_finder=None,
                                      fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher by hill climbing.

    This is simulated_annealing_break with an initial temperature of 0
    and a single worker. All other arguments are forwarded unchanged;
    see simulated_annealing_break for their meaning.

    Returns whatever simulated_annealing_break returns.
    """
    # NOTE(review): workers=1 is inferred from this being the non-"_mp"
    # variant -- confirm against the intended behaviour.
    return simulated_annealing_break(message,
                                     workers=1,
                                     initial_temperature=0,
                                     max_iterations=max_iterations,
                                     plain_alphabet=plain_alphabet,
                                     cipher_alphabet=cipher_alphabet,
                                     swap_index_finder=swap_index_finder,
                                     fitness=fitness, chunksize=chunksize)
def monoalphabetic_break_hillclimbing_mp(message,
                                         workers=10,
                                         max_iterations=20000,
                                         plain_alphabet=None,
                                         cipher_alphabet=None,
                                         swap_index_finder=None,
                                         fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher by parallel hill climbing.

    This is simulated_annealing_break with an initial temperature of 0,
    run with `workers` independent workers. All other arguments are
    forwarded unchanged; see simulated_annealing_break for their meaning.

    Returns whatever simulated_annealing_break returns.
    """
    # NOTE(review): the `workers` parameter and its default of 10 (the same
    # default simulated_annealing_break uses) are inferred -- confirm.
    return simulated_annealing_break(message,
                                     workers=workers,
                                     initial_temperature=0,
                                     max_iterations=max_iterations,
                                     plain_alphabet=plain_alphabet,
                                     cipher_alphabet=cipher_alphabet,
                                     swap_index_finder=swap_index_finder,
                                     fitness=fitness, chunksize=chunksize)
def gaussian_swap_index(a):
    """Pick a swap partner for alphabet position `a`.

    The partner is `a` plus a Gaussian offset (mean 0, standard
    deviation 4, truncated to an integer), wrapped into 0..25.
    """
    offset = int(random.gauss(0, 4))
    shifted = a + offset
    return shifted % 26
def uniform_swap_index(a):
    """Pick a swap partner uniformly at random from 0..25.

    `a` is accepted so this has the same signature as
    gaussian_swap_index, but it is not used.
    """
    partner = random.randrange(26)
    return partner
def simulated_annealing_break(message, workers=10,
                              initial_temperature=200,
                              max_iterations=20000,
                              plain_alphabet=None,
                              cipher_alphabet=None,
                              swap_index_finder=None,
                              fitness=Ptrigrams, chunksize=1):
    """Break a monoalphabetic substitution cipher by simulated annealing.

    The sanitised message is handed to `workers` independent runs of
    simulated_annealing_break_worker in a process pool, and the result
    with the highest fitness is returned.

    * plain_alphabet: alphabet to translate into; defaults to
      string.ascii_lowercase.
    * cipher_alphabet: starting cipher alphabet for every worker. If None,
      each worker starts from its own random shuffle of a-z.
    * swap_index_finder: function mapping one swap position to the other;
      defaults to gaussian_swap_index.
    * initial_temperature, max_iterations, fitness: passed to each worker.
    * chunksize: passed to Pool.starmap.

    Returns the (alphabet, fitness) pair of the best worker.
    """
    ciphertext = sanitise(message)
    if swap_index_finder is None:
        swap_index_finder = gaussian_swap_index
    # The plain alphabet is the same for every worker, so resolve it once.
    if plain_alphabet is None:
        plain_alphabet = string.ascii_lowercase

    worker_args = []
    for worker_id in range(workers):
        if cipher_alphabet is None:
            # A fresh random starting point for each worker.
            shuffled = list(string.ascii_lowercase)
            random.shuffle(shuffled)
            used_cipher_alphabet = cat(shuffled)
        else:
            used_cipher_alphabet = cipher_alphabet
        # Positional order must match simulated_annealing_break_worker.
        worker_args.append((ciphertext, plain_alphabet, used_cipher_alphabet,
                            swap_index_finder,
                            initial_temperature, max_iterations, fitness,
                            worker_id))
    with multiprocessing.Pool() as pool:
        breaks = pool.starmap(simulated_annealing_break_worker,
                              worker_args, chunksize)
    return max(breaks, key=lambda k: k[1])
def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
                                     swap_index_finder,
                                     t0, max_iterations, fitness,
                                     logID):
    """Run one simulated-annealing search for a substitution alphabet.

    Starting from `cipher_alphabet`, each iteration swaps two letters of
    the current alphabet (one position chosen uniformly, the other by
    `swap_index_finder`), deciphers `message` and scores it with `fitness`.
    A better candidate is always accepted; a worse one is accepted with
    probability exp(fitness change / temperature). The temperature starts
    at `t0` and falls by a fixed step each iteration, with a floor of 0.001.

    `logID` only labels this worker in the debug log.

    Returns (best_alphabet, best_fitness) seen during the run.
    """
    def swap(letters, i, j):
        """Return `letters` with the characters at i and j exchanged."""
        if i > j:
            i, j = j, i
        if i == j:
            return letters
        return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
                letters[j+1:])

    temperature = t0

    # Step chosen so the temperature reaches its floor after 90% of the
    # iterations. With no iterations the step is never used, so avoid
    # dividing by zero.
    dt = t0 / (0.9 * max_iterations) if max_iterations else 0

    current_alphabet = cipher_alphabet
    cipher_translation = ''.maketrans(current_alphabet, plain_alphabet)
    plaintext = message.translate(cipher_translation)
    current_fitness = fitness(plaintext)

    best_alphabet = current_alphabet
    best_fitness = current_fitness
    best_plaintext = plaintext

    for i in range(max_iterations):
        swap_a = random.randrange(26)
        swap_b = swap_index_finder(swap_a)
        alphabet = swap(current_alphabet, swap_a, swap_b)
        cipher_translation = ''.maketrans(alphabet, plain_alphabet)
        plaintext = message.translate(cipher_translation)
        new_fitness = fitness(plaintext)
        try:
            sa_chance = math.exp((new_fitness - current_fitness) / temperature)
        except (OverflowError, ZeroDivisionError):
            # A zero temperature (pure hill climbing) or a huge exponent:
            # never accept on chance; improvements are still accepted by
            # the fitness comparison below.
            sa_chance = 0
        if (new_fitness > current_fitness or random.random() < sa_chance):
            current_fitness = new_fitness
            current_alphabet = alphabet

        if current_fitness > best_fitness:
            # Only reachable right after accepting this iteration's
            # candidate, so `plaintext` is the matching decipherment.
            best_alphabet = current_alphabet
            best_fitness = current_fitness
            best_plaintext = plaintext

        logger.debug('Simulated annealing worker {}: iteration {}, temperature {}, '
                     'current alphabet {}, plain alphabet {}, current_fitness {}, '
                     'best_plaintext {}'.format(
                         logID, i, temperature, current_alphabet,
                         plain_alphabet, current_fitness,
                         best_plaintext[:50]))
        temperature = max(temperature - dt, 0.001)

    return best_alphabet, best_fitness
303 if __name__
== "__main__":