Started on documentation
[szyfrow.git] / szyfrow / keyword_cipher.py
1 """Monoalphabetic substitution ciphers, mainly done by keyword. Enciphering
2 and deciphering, and a couple of ways to break these ciphers.
3 """
4 from enum import Enum
5 import multiprocessing
6 import math
7 from szyfrow.support.utilities import *
8 from szyfrow.support.language_models import *
9
10
11 class KeywordWrapAlphabet(Enum):
12 """Ways to list the rest of the alphabet after use of a keyword.
13
14 * `from_a` : continue the alphabet from 'a': `bayescdfg...`
15 * `from_last`: continue from the last letter of the keyword:
16 `bayestuvwxyzcdf...`
17 * `from_largest`: continue from the "largest" letter of the keyword:
18 `bayeszcdfg...`
19 """
20 from_a = 1
21 from_last = 2
22 from_largest = 3
23
24
25 def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
26 """Find the cipher alphabet given a keyword.
27
28 [`wrap_alphabet`](#szyfrow.keyword_cipher.KeywordWrapAlphabet) controls
29 how the rest of the alphabet is added after the keyword.
30
31 >>> keyword_cipher_alphabet_of('bayes')
32 'bayescdfghijklmnopqrtuvwxz'
33 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
34 'bayescdfghijklmnopqrtuvwxz'
35 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
36 'bayestuvwxzcdfghijklmnopqr'
37 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
38 'bayeszcdfghijklmnopqrtuvwx'
39 """
40 if wrap_alphabet == KeywordWrapAlphabet.from_a:
41 cipher_alphabet = cat(deduplicate(sanitise(keyword) +
42 string.ascii_lowercase))
43 else:
44 if wrap_alphabet == KeywordWrapAlphabet.from_last:
45 last_keyword_letter = deduplicate(sanitise(keyword))[-1]
46 else:
47 last_keyword_letter = sorted(sanitise(keyword))[-1]
48 last_keyword_position = string.ascii_lowercase.find(
49 last_keyword_letter) + 1
50 cipher_alphabet = cat(
51 deduplicate(sanitise(keyword) +
52 string.ascii_lowercase[last_keyword_position:] +
53 string.ascii_lowercase))
54 return cipher_alphabet
55
56
57 def keyword_encipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
58 """Enciphers a message with a keyword substitution cipher.
59 wrap_alphabet controls how the rest of the alphabet is added
60 after the keyword.
61 0 : from 'a'
62 1 : from the last letter in the sanitised keyword
63 2 : from the largest letter in the sanitised keyword
64
65 >>> keyword_encipher('test message', 'bayes')
66 'rsqr ksqqbds'
67 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
68 'rsqr ksqqbds'
69 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
70 'lskl dskkbus'
71 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
72 'qspq jsppbcs'
73 """
74 cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
75 cipher_translation = ''.maketrans(string.ascii_lowercase, cipher_alphabet)
76 return unaccent(message).lower().translate(cipher_translation)
77
78 def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
79 """Deciphers a message with a keyword substitution cipher.
80 wrap_alphabet controls how the rest of the alphabet is added
81 after the keyword.
82 0 : from 'a'
83 1 : from the last letter in the sanitised keyword
84 2 : from the largest letter in the sanitised keyword
85
86 >>> keyword_decipher('rsqr ksqqbds', 'bayes')
87 'test message'
88 >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
89 'test message'
90 >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
91 'test message'
92 >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
93 'test message'
94 """
95 cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
96 cipher_translation = ''.maketrans(cipher_alphabet, string.ascii_lowercase)
97 return message.lower().translate(cipher_translation)
98
99
100 def keyword_break_single_thread(message, wordlist=None, fitness=Pletters):
101 """Breaks a keyword substitution cipher using a dictionary and
102 frequency analysis.
103
104 If `wordlist` is not specified, use
105 [`szyfrow.support.langauge_models.keywords`](support/language_models.html#szyfrow.support.language_models.keywords).
106
107 >>> keyword_break(keyword_encipher('this is a test message for the ' \
108 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
109 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
110 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
111 """
112 if wordlist is None:
113 wordlist = keywords
114
115 best_keyword = ''
116 best_wrap_alphabet = True
117 best_fit = float("-inf")
118 for wrap_alphabet in KeywordWrapAlphabet:
119 for keyword in wordlist:
120 plaintext = keyword_decipher(message, keyword, wrap_alphabet)
121 fit = fitness(plaintext)
122 if fit > best_fit:
123 best_fit = fit
124 best_keyword = keyword
125 best_wrap_alphabet = wrap_alphabet
126 return (best_keyword, best_wrap_alphabet), best_fit
127
128 def keyword_break(message, wordlist=None, fitness=Pletters,
129 number_of_solutions=1, chunksize=500):
130 """Breaks a keyword substitution cipher using a dictionary and
131 frequency analysis.
132
133 If `wordlist` is not specified, use
134 [`szyfrow.support.langauge_models.keywords`](support/language_models.html#szyfrow.support.language_models.keywords).
135
136
137 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
138 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
139 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
140 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
141 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
142 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
143 wordlist=['cat', 'elephant', 'kangaroo'], \
144 number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
145 [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
146 (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
147 """
148 if wordlist is None:
149 wordlist = keywords
150
151 with multiprocessing.Pool() as pool:
152 helper_args = [(message, word, wrap, fitness)
153 for word in wordlist
154 for wrap in KeywordWrapAlphabet]
155 # Gotcha: the helper function here needs to be defined at the top level
156 # (limitation of Pool.starmap)
157 breaks = pool.starmap(keyword_break_worker, helper_args, chunksize)
158 if number_of_solutions == 1:
159 return max(breaks, key=lambda k: k[1])
160 else:
161 return sorted(breaks, key=lambda k: k[1], reverse=True)[:number_of_solutions]
162
163 def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
164 plaintext = keyword_decipher(message, keyword, wrap_alphabet)
165 fit = fitness(plaintext)
166 return (keyword, wrap_alphabet), fit
167
168
169 def monoalphabetic_break_hillclimbing_single(message,
170 max_iterations=20000,
171 plain_alphabet=None,
172 cipher_alphabet=None,
173 swap_index_finder=None,
174 fitness=Pletters, chunksize=1):
175 """Break a monalphabetic substitution cipher using hillclimbing to
176 guess the keyword. Hillclimbing is done by using the simulated annealing
177 approach with a temperature of zero.
178
179 This version uses a single worker.
180 """
181 return monoalphabetic_sa_break(message,
182 workers=1,
183 initial_temperature=0,
184 max_iterations=max_iterations,
185 plain_alphabet=plain_alphabet,
186 cipher_alphabet=cipher_alphabet,
187 swap_index_finder=swap_index_finder,
188 fitness=fitness, chunksize=chunksize)
189
190
191 def monoalphabetic_break_hillclimbing(message,
192 workers=10,
193 max_iterations=20000,
194 plain_alphabet=None,
195 cipher_alphabet=None,
196 swap_index_finder=None,
197 fitness=Pletters, chunksize=1):
198 """Break a monalphabetic substitution cipher using hillclimbing to
199 guess the keyword. Hillclimbing is done by using the simulated annealing
200 approach with a temperature of zero.
201
202 This version uses a several workers.
203 """
204 return monoalphabetic_sa_break(message,
205 workers=workers,
206 initial_temperature=0,
207 max_iterations=max_iterations,
208 plain_alphabet=plain_alphabet,
209 cipher_alphabet=cipher_alphabet,
210 swap_index_finder=swap_index_finder,
211 fitness=fitness, chunksize=chunksize)
212
213
214 def gaussian_swap_index(a):
215 """Return an index to use as the partner of `a` in a swap. The partners
216 are drawn from a Gaussian distribution.
217 """
218 return (a + int(random.gauss(0, 4))) % 26
219
220 def uniform_swap_index(a):
221 """Return an index to use as the partner of `a` in a swap. The partners
222 are drawn from a uniform distribution.
223 """
224 return random.randrange(26)
225
226 def monoalphabetic_sa_break(message, workers=10,
227 initial_temperature=200,
228 max_iterations=20000,
229 plain_alphabet=None,
230 cipher_alphabet=None,
231 swap_index_finder=None,
232 fitness=Ptrigrams, chunksize=1):
233 """Break a monalphabetic substitution cipher using simulated annealing to
234 guess the keyword. This function just sets up a stable of workers who
235 do the actual work, implemented as
236 `szyfrow.keyword_cipher.monoalphabetic_sa_break_worker`.
237
238 See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/)
239 for detail on how this works.
240 """
241 worker_args = []
242 ciphertext = sanitise(message)
243 if swap_index_finder is None:
244 swap_index_finder = gaussian_swap_index
245
246 for i in range(workers):
247 if plain_alphabet is None:
248 used_plain_alphabet = string.ascii_lowercase
249 else:
250 used_plain_alphabet = plain_alphabet
251 if cipher_alphabet is None:
252 used_cipher_alphabet = list(string.ascii_lowercase)
253 random.shuffle(used_cipher_alphabet)
254 used_cipher_alphabet = cat(used_cipher_alphabet)
255 else:
256 used_cipher_alphabet = cipher_alphabet
257 # if not plain_alphabet:
258 # plain_alphabet = string.ascii_lowercase
259 # if not cipher_alphabet:
260 # cipher_alphabet = list(string.ascii_lowercase)
261 # random.shuffle(cipher_alphabet)
262 # cipher_alphabet = cat(cipher_alphabet)
263 worker_args.append((ciphertext, used_plain_alphabet, used_cipher_alphabet,
264 swap_index_finder,
265 initial_temperature, max_iterations, fitness,
266 i))
267 with multiprocessing.Pool() as pool:
268 breaks = pool.starmap(monoalphabetic_sa_break_worker,
269 worker_args, chunksize)
270 return max(breaks, key=lambda k: k[1])
271
272
273 def monoalphabetic_sa_break_worker(message, plain_alphabet, cipher_alphabet,
274 swap_index_finder,
275 t0, max_iterations, fitness,
276 logID):
277 """One thread of a simulated annealing run.
278 See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/)
279 for detail on how this works.
280 """
281 def swap(letters, i, j):
282 if i > j:
283 i, j = j, i
284 if i == j:
285 return letters
286 else:
287 return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
288 letters[j+1:])
289
290 temperature = t0
291
292 dt = t0 / (0.9 * max_iterations)
293
294 current_alphabet = cipher_alphabet
295 alphabet = current_alphabet
296 cipher_translation = ''.maketrans(current_alphabet, plain_alphabet)
297 plaintext = message.translate(cipher_translation)
298 current_fitness = fitness(plaintext)
299
300 best_alphabet = current_alphabet
301 best_fitness = current_fitness
302 best_plaintext = plaintext
303
304 # print('starting for', max_iterations)
305 for i in range(max_iterations):
306 swap_a = random.randrange(26)
307 # swap_b = (swap_a + int(random.gauss(0, 4))) % 26
308 swap_b = swap_index_finder(swap_a)
309 alphabet = swap(current_alphabet, swap_a, swap_b)
310 cipher_translation = ''.maketrans(alphabet, plain_alphabet)
311 plaintext = message.translate(cipher_translation)
312 new_fitness = fitness(plaintext)
313 try:
314 sa_chance = math.exp((new_fitness - current_fitness) / temperature)
315 except (OverflowError, ZeroDivisionError):
316 # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
317 sa_chance = 0
318 if (new_fitness > current_fitness or random.random() < sa_chance):
319 current_fitness = new_fitness
320 current_alphabet = alphabet
321
322 if current_fitness > best_fitness:
323 best_alphabet = current_alphabet
324 best_fitness = current_fitness
325 best_plaintext = plaintext
326
327 temperature = max(temperature - dt, 0.001)
328
329 return best_alphabet, best_fitness # current_alphabet, current_fitness
330
331 if __name__ == "__main__":
332 import doctest