Updated for challenge 9
[cipher-tools.git] / cipher / keyword_cipher.py
1 from enum import Enum
2 # from itertools import starmap
3 import multiprocessing
4 import math
5 from support.utilities import *
6 from support.language_models import *
7
8 from logger import logger
9 import logging
10 # logger.setLevel(logging.DEBUG)
11
12
13 class KeywordWrapAlphabet(Enum):
14 from_a = 1
15 from_last = 2
16 from_largest = 3
17
18
19 def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
20 """Find the cipher alphabet given a keyword.
21 wrap_alphabet controls how the rest of the alphabet is added
22 after the keyword.
23
24 >>> keyword_cipher_alphabet_of('bayes')
25 'bayescdfghijklmnopqrtuvwxz'
26 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
27 'bayescdfghijklmnopqrtuvwxz'
28 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
29 'bayestuvwxzcdfghijklmnopqr'
30 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
31 'bayeszcdfghijklmnopqrtuvwx'
32 """
33 if wrap_alphabet == KeywordWrapAlphabet.from_a:
34 cipher_alphabet = cat(deduplicate(sanitise(keyword) +
35 string.ascii_lowercase))
36 else:
37 if wrap_alphabet == KeywordWrapAlphabet.from_last:
38 last_keyword_letter = deduplicate(sanitise(keyword))[-1]
39 else:
40 last_keyword_letter = sorted(sanitise(keyword))[-1]
41 last_keyword_position = string.ascii_lowercase.find(
42 last_keyword_letter) + 1
43 cipher_alphabet = cat(
44 deduplicate(sanitise(keyword) +
45 string.ascii_lowercase[last_keyword_position:] +
46 string.ascii_lowercase))
47 return cipher_alphabet
48
49
50 def keyword_encipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
51 """Enciphers a message with a keyword substitution cipher.
52 wrap_alphabet controls how the rest of the alphabet is added
53 after the keyword.
54 0 : from 'a'
55 1 : from the last letter in the sanitised keyword
56 2 : from the largest letter in the sanitised keyword
57
58 >>> keyword_encipher('test message', 'bayes')
59 'rsqr ksqqbds'
60 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
61 'rsqr ksqqbds'
62 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
63 'lskl dskkbus'
64 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
65 'qspq jsppbcs'
66 """
67 cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
68 cipher_translation = ''.maketrans(string.ascii_lowercase, cipher_alphabet)
69 return unaccent(message).lower().translate(cipher_translation)
70
71 def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
72 """Deciphers a message with a keyword substitution cipher.
73 wrap_alphabet controls how the rest of the alphabet is added
74 after the keyword.
75 0 : from 'a'
76 1 : from the last letter in the sanitised keyword
77 2 : from the largest letter in the sanitised keyword
78
79 >>> keyword_decipher('rsqr ksqqbds', 'bayes')
80 'test message'
81 >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
82 'test message'
83 >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
84 'test message'
85 >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
86 'test message'
87 """
88 cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
89 cipher_translation = ''.maketrans(cipher_alphabet, string.ascii_lowercase)
90 return message.lower().translate(cipher_translation)
91
92
93 def keyword_break(message, wordlist=keywords, fitness=Pletters):
94 """Breaks a keyword substitution cipher using a dictionary and
95 frequency analysis.
96
97 >>> keyword_break(keyword_encipher('this is a test message for the ' \
98 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
99 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
100 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
101 """
102 best_keyword = ''
103 best_wrap_alphabet = True
104 best_fit = float("-inf")
105 for wrap_alphabet in KeywordWrapAlphabet:
106 for keyword in wordlist:
107 plaintext = keyword_decipher(message, keyword, wrap_alphabet)
108 fit = fitness(plaintext)
109 logger.debug('Keyword break attempt using key {0} (wrap={1}) '
110 'gives fit of {2} and decrypt starting: {3}'.format(
111 keyword, wrap_alphabet, fit,
112 sanitise(plaintext)[:50]))
113 if fit > best_fit:
114 best_fit = fit
115 best_keyword = keyword
116 best_wrap_alphabet = wrap_alphabet
117 logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
118 '{2} and decrypt starting: {3}'.format(best_keyword,
119 best_wrap_alphabet, best_fit, sanitise(
120 keyword_decipher(message, best_keyword,
121 best_wrap_alphabet))[:50]))
122 return (best_keyword, best_wrap_alphabet), best_fit
123
124 def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
125 number_of_solutions=1, chunksize=500):
126 """Breaks a keyword substitution cipher using a dictionary and
127 frequency analysis
128
129 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
130 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
131 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
132 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
133 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
134 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
135 wordlist=['cat', 'elephant', 'kangaroo'], \
136 number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
137 [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
138 (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
139 """
140 with multiprocessing.Pool() as pool:
141 helper_args = [(message, word, wrap, fitness)
142 for word in wordlist
143 for wrap in KeywordWrapAlphabet]
144 # Gotcha: the helper function here needs to be defined at the top level
145 # (limitation of Pool.starmap)
146 breaks = pool.starmap(keyword_break_worker, helper_args, chunksize)
147 if number_of_solutions == 1:
148 return max(breaks, key=lambda k: k[1])
149 else:
150 return sorted(breaks, key=lambda k: k[1], reverse=True)[:number_of_solutions]
151
152 def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
153 plaintext = keyword_decipher(message, keyword, wrap_alphabet)
154 fit = fitness(plaintext)
155 logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
156 '{2} and decrypt starting: {3}'.format(keyword,
157 wrap_alphabet, fit, sanitise(plaintext)[:50]))
158 return (keyword, wrap_alphabet), fit
159
160
161 def monoalphabetic_break_hillclimbing(message,
162 max_iterations=20000,
163 plain_alphabet=None,
164 cipher_alphabet=None,
165 swap_index_finder=None,
166 fitness=Pletters, chunksize=1):
167 return simulated_annealing_break(message,
168 workers=1,
169 initial_temperature=0,
170 max_iterations=max_iterations,
171 plain_alphabet=plain_alphabet,
172 cipher_alphabet=cipher_alphabet,
173 swap_index_finder=swap_index_finder,
174 fitness=fitness, chunksize=chunksize)
175
176
177 def monoalphabetic_break_hillclimbing_mp(message,
178 workers=10,
179 max_iterations=20000,
180 plain_alphabet=None,
181 cipher_alphabet=None,
182 swap_index_finder=None,
183 fitness=Pletters, chunksize=1):
184 return simulated_annealing_break(message,
185 workers=workers,
186 initial_temperature=0,
187 max_iterations=max_iterations,
188 plain_alphabet=plain_alphabet,
189 cipher_alphabet=cipher_alphabet,
190 swap_index_finder=swap_index_finder,
191 fitness=fitness, chunksize=chunksize)
192
193
194 def gaussian_swap_index(a):
195 return (a + int(random.gauss(0, 4))) % 26
196
197 def uniform_swap_index(a):
198 return random.randrange(26)
199
200 def simulated_annealing_break(message, workers=10,
201 initial_temperature=200,
202 max_iterations=20000,
203 plain_alphabet=None,
204 cipher_alphabet=None,
205 swap_index_finder=None,
206 fitness=Ptrigrams, chunksize=1):
207 worker_args = []
208 ciphertext = sanitise(message)
209 if swap_index_finder is None:
210 swap_index_finder = gaussian_swap_index
211
212 for i in range(workers):
213 if plain_alphabet is None:
214 used_plain_alphabet = string.ascii_lowercase
215 else:
216 used_plain_alphabet = plain_alphabet
217 if cipher_alphabet is None:
218 used_cipher_alphabet = list(string.ascii_lowercase)
219 random.shuffle(used_cipher_alphabet)
220 used_cipher_alphabet = cat(used_cipher_alphabet)
221 else:
222 used_cipher_alphabet = cipher_alphabet
223 # if not plain_alphabet:
224 # plain_alphabet = string.ascii_lowercase
225 # if not cipher_alphabet:
226 # cipher_alphabet = list(string.ascii_lowercase)
227 # random.shuffle(cipher_alphabet)
228 # cipher_alphabet = cat(cipher_alphabet)
229 worker_args.append((ciphertext, used_plain_alphabet, used_cipher_alphabet,
230 swap_index_finder,
231 initial_temperature, max_iterations, fitness,
232 i))
233 with multiprocessing.Pool() as pool:
234 breaks = pool.starmap(simulated_annealing_break_worker,
235 worker_args, chunksize)
236 return max(breaks, key=lambda k: k[1])
237
238
239 def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
240 swap_index_finder,
241 t0, max_iterations, fitness,
242 logID):
243 def swap(letters, i, j):
244 if i > j:
245 i, j = j, i
246 if i == j:
247 return letters
248 else:
249 return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
250 letters[j+1:])
251
252 temperature = t0
253
254 dt = t0 / (0.9 * max_iterations)
255
256 current_alphabet = cipher_alphabet
257 alphabet = current_alphabet
258 cipher_translation = ''.maketrans(current_alphabet, plain_alphabet)
259 plaintext = message.translate(cipher_translation)
260 current_fitness = fitness(plaintext)
261
262 best_alphabet = current_alphabet
263 best_fitness = current_fitness
264 best_plaintext = plaintext
265
266 # print('starting for', max_iterations)
267 for i in range(max_iterations):
268 swap_a = random.randrange(26)
269 # swap_b = (swap_a + int(random.gauss(0, 4))) % 26
270 swap_b = swap_index_finder(swap_a)
271 alphabet = swap(current_alphabet, swap_a, swap_b)
272 cipher_translation = ''.maketrans(alphabet, plain_alphabet)
273 plaintext = message.translate(cipher_translation)
274 new_fitness = fitness(plaintext)
275 try:
276 sa_chance = math.exp((new_fitness - current_fitness) / temperature)
277 except (OverflowError, ZeroDivisionError):
278 # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
279 sa_chance = 0
280 if (new_fitness > current_fitness or random.random() < sa_chance):
281 # logger.debug('Simulated annealing: iteration {}, temperature {}, '
282 # 'current alphabet {}, current_fitness {}, '
283 # 'best_plaintext {}'.format(i, temperature, current_alphabet,
284 # current_fitness, best_plaintext[:50]))
285
286 # logger.debug('new_fit {}, current_fit {}, temp {}, sa_chance {}'.format(new_fitness, current_fitness, temperature, sa_chance))
287 current_fitness = new_fitness
288 current_alphabet = alphabet
289
290 if current_fitness > best_fitness:
291 best_alphabet = current_alphabet
292 best_fitness = current_fitness
293 best_plaintext = plaintext
294 if i % 500 == 0:
295 logger.debug('Simulated annealing worker {}: iteration {}, temperature {}, '
296 'current alphabet {}, plain alphabet {}, current_fitness {}, '
297 'best_plaintext {}'.format(logID, i, temperature, current_alphabet, plain_alphabet,
298 current_fitness, plaintext[:50]))
299 temperature = max(temperature - dt, 0.001)
300
301 return best_alphabet, best_fitness # current_alphabet, current_fitness
302
303 if __name__ == "__main__":
304 import doctest