Updated notebooks for new library organisation
[cipher-tools.git] / cipher / keyword_cipher.py
1 from enum import Enum
2 # from itertools import starmap
3 import multiprocessing
4 import math
5 from support.utilities import *
6 from support.language_models import *
7
8 from logger import logger
9
10
11 class KeywordWrapAlphabet(Enum):
12 from_a = 1
13 from_last = 2
14 from_largest = 3
15
16
17 def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
18 """Find the cipher alphabet given a keyword.
19 wrap_alphabet controls how the rest of the alphabet is added
20 after the keyword.
21
22 >>> keyword_cipher_alphabet_of('bayes')
23 'bayescdfghijklmnopqrtuvwxz'
24 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
25 'bayescdfghijklmnopqrtuvwxz'
26 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
27 'bayestuvwxzcdfghijklmnopqr'
28 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
29 'bayeszcdfghijklmnopqrtuvwx'
30 """
31 if wrap_alphabet == KeywordWrapAlphabet.from_a:
32 cipher_alphabet = cat(deduplicate(sanitise(keyword) +
33 string.ascii_lowercase))
34 else:
35 if wrap_alphabet == KeywordWrapAlphabet.from_last:
36 last_keyword_letter = deduplicate(sanitise(keyword))[-1]
37 else:
38 last_keyword_letter = sorted(sanitise(keyword))[-1]
39 last_keyword_position = string.ascii_lowercase.find(
40 last_keyword_letter) + 1
41 cipher_alphabet = cat(
42 deduplicate(sanitise(keyword) +
43 string.ascii_lowercase[last_keyword_position:] +
44 string.ascii_lowercase))
45 return cipher_alphabet
46
47
48 def keyword_encipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
49 """Enciphers a message with a keyword substitution cipher.
50 wrap_alphabet controls how the rest of the alphabet is added
51 after the keyword.
52 0 : from 'a'
53 1 : from the last letter in the sanitised keyword
54 2 : from the largest letter in the sanitised keyword
55
56 >>> keyword_encipher('test message', 'bayes')
57 'rsqr ksqqbds'
58 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
59 'rsqr ksqqbds'
60 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
61 'lskl dskkbus'
62 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
63 'qspq jsppbcs'
64 """
65 cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
66 cipher_translation = ''.maketrans(string.ascii_lowercase, cipher_alphabet)
67 return unaccent(message).lower().translate(cipher_translation)
68
69 def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
70 """Deciphers a message with a keyword substitution cipher.
71 wrap_alphabet controls how the rest of the alphabet is added
72 after the keyword.
73 0 : from 'a'
74 1 : from the last letter in the sanitised keyword
75 2 : from the largest letter in the sanitised keyword
76
77 >>> keyword_decipher('rsqr ksqqbds', 'bayes')
78 'test message'
79 >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
80 'test message'
81 >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
82 'test message'
83 >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
84 'test message'
85 """
86 cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
87 cipher_translation = ''.maketrans(cipher_alphabet, string.ascii_lowercase)
88 return message.lower().translate(cipher_translation)
89
90
91 def keyword_break(message, wordlist=keywords, fitness=Pletters):
92 """Breaks a keyword substitution cipher using a dictionary and
93 frequency analysis.
94
95 >>> keyword_break(keyword_encipher('this is a test message for the ' \
96 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
97 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
98 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
99 """
100 best_keyword = ''
101 best_wrap_alphabet = True
102 best_fit = float("-inf")
103 for wrap_alphabet in KeywordWrapAlphabet:
104 for keyword in wordlist:
105 plaintext = keyword_decipher(message, keyword, wrap_alphabet)
106 fit = fitness(plaintext)
107 logger.debug('Keyword break attempt using key {0} (wrap={1}) '
108 'gives fit of {2} and decrypt starting: {3}'.format(
109 keyword, wrap_alphabet, fit,
110 sanitise(plaintext)[:50]))
111 if fit > best_fit:
112 best_fit = fit
113 best_keyword = keyword
114 best_wrap_alphabet = wrap_alphabet
115 logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
116 '{2} and decrypt starting: {3}'.format(best_keyword,
117 best_wrap_alphabet, best_fit, sanitise(
118 keyword_decipher(message, best_keyword,
119 best_wrap_alphabet))[:50]))
120 return (best_keyword, best_wrap_alphabet), best_fit
121
122 def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
123 number_of_solutions=1, chunksize=500):
124 """Breaks a keyword substitution cipher using a dictionary and
125 frequency analysis
126
127 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
128 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
129 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
130 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
131 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
132 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
133 wordlist=['cat', 'elephant', 'kangaroo'], \
134 number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
135 [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
136 (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
137 """
138 with multiprocessing.Pool() as pool:
139 helper_args = [(message, word, wrap, fitness)
140 for word in wordlist
141 for wrap in KeywordWrapAlphabet]
142 # Gotcha: the helper function here needs to be defined at the top level
143 # (limitation of Pool.starmap)
144 breaks = pool.starmap(keyword_break_worker, helper_args, chunksize)
145 if number_of_solutions == 1:
146 return max(breaks, key=lambda k: k[1])
147 else:
148 return sorted(breaks, key=lambda k: k[1], reverse=True)[:number_of_solutions]
149
150 def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
151 plaintext = keyword_decipher(message, keyword, wrap_alphabet)
152 fit = fitness(plaintext)
153 logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
154 '{2} and decrypt starting: {3}'.format(keyword,
155 wrap_alphabet, fit, sanitise(plaintext)[:50]))
156 return (keyword, wrap_alphabet), fit
157
158
159 def monoalphabetic_break_hillclimbing(message,
160 max_iterations=20000,
161 plain_alphabet=None,
162 cipher_alphabet=None,
163 fitness=Pletters, chunksize=1):
164 return simulated_annealing_break(message,
165 workers=1,
166 initial_temperature=0,
167 max_iterations=max_iterations,
168 plain_alphabet=plain_alphabet,
169 cipher_alphabet=cipher_alphabet,
170 fitness=fitness, chunksize=chunksize)
171
172
173 def monoalphabetic_break_hillclimbing_mp(message,
174 workers=10,
175 max_iterations=20000,
176 plain_alphabet=None,
177 cipher_alphabet=None,
178 fitness=Pletters, chunksize=1):
179 return simulated_annealing_break(message,
180 workers=workers,
181 initial_temperature=0,
182 max_iterations=max_iterations,
183 plain_alphabet=plain_alphabet,
184 cipher_alphabet=cipher_alphabet,
185 fitness=fitness, chunksize=chunksize)
186
187
188 def simulated_annealing_break(message, workers=10,
189 initial_temperature=200,
190 max_iterations=20000,
191 plain_alphabet=None,
192 cipher_alphabet=None,
193 fitness=Pletters, chunksize=1):
194 worker_args = []
195 ciphertext = sanitise(message)
196 for i in range(workers):
197 if not plain_alphabet:
198 plain_alphabet = string.ascii_lowercase
199 if not cipher_alphabet:
200 cipher_alphabet = list(string.ascii_lowercase)
201 random.shuffle(cipher_alphabet)
202 cipher_alphabet = cat(cipher_alphabet)
203 worker_args.append((ciphertext, plain_alphabet, cipher_alphabet,
204 initial_temperature, max_iterations, fitness))
205 with multiprocessing.Pool() as pool:
206 breaks = pool.starmap(simulated_annealing_break_worker,
207 worker_args, chunksize)
208 return max(breaks, key=lambda k: k[1])
209
210
211 def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
212 t0, max_iterations, fitness):
213 def swap(letters, i, j):
214 if i > j:
215 i, j = j, i
216 if i == j:
217 return letters
218 else:
219 return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
220 letters[j+1:])
221
222 temperature = t0
223
224 dt = t0 / (0.9 * max_iterations)
225
226 current_alphabet = cipher_alphabet
227 alphabet = current_alphabet
228 cipher_translation = ''.maketrans(current_alphabet, plain_alphabet)
229 plaintext = message.translate(cipher_translation)
230 current_fitness = fitness(plaintext)
231
232 best_alphabet = current_alphabet
233 best_fitness = current_fitness
234 best_plaintext = plaintext
235
236 # print('starting for', max_iterations)
237 for i in range(max_iterations):
238 swap_a = random.randrange(26)
239 swap_b = (swap_a + int(random.gauss(0, 4))) % 26
240 alphabet = swap(current_alphabet, swap_a, swap_b)
241 cipher_translation = ''.maketrans(alphabet, plain_alphabet)
242 plaintext = message.translate(cipher_translation)
243 new_fitness = fitness(plaintext)
244 try:
245 sa_chance = math.exp((new_fitness - current_fitness) / temperature)
246 except (OverflowError, ZeroDivisionError):
247 # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
248 sa_chance = 0
249 if (new_fitness > current_fitness or random.random() < sa_chance):
250 # logger.debug('Simulated annealing: iteration {}, temperature {}, '
251 # 'current alphabet {}, current_fitness {}, '
252 # 'best_plaintext {}'.format(i, temperature, current_alphabet,
253 # current_fitness, best_plaintext[:50]))
254
255 # logger.debug('new_fit {}, current_fit {}, temp {}, sa_chance {}'.format(new_fitness, current_fitness, temperature, sa_chance))
256 current_fitness = new_fitness
257 current_alphabet = alphabet
258
259 if current_fitness > best_fitness:
260 best_alphabet = current_alphabet
261 best_fitness = current_fitness
262 best_plaintext = plaintext
263 if i % 500 == 0:
264 logger.debug('Simulated annealing: iteration {}, temperature {}, '
265 'current alphabet {}, current_fitness {}, '
266 'best_plaintext {}'.format(i, temperature, current_alphabet,
267 current_fitness, plaintext[:50]))
268 temperature = max(temperature - dt, 0.001)
269
270 return best_alphabet, best_fitness # current_alphabet, current_fitness
271
272 if __name__ == "__main__":
273 import doctest