91491ba8bf79cb569d0917d4532660815a2a6b79
[cipher-tools.git] / keyword.py
1 from utilities import *
2 from language_models import *
3 from enum import Enum
4 # from itertools import starmap
5 from multiprocessing import Pool
6
7 from logger import logger
8
9
class KeywordWrapAlphabet(Enum):
    """Where the rest of the cipher alphabet starts after the keyword.

    from_a       -- the remaining letters follow in order from 'a'
    from_last    -- the remaining letters start after the last letter of
                    the keyword
    from_largest -- the remaining letters start after the alphabetically
                    largest letter of the keyword
    """
    from_a = 1
    from_last = 2
    from_largest = 3
14
15
def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Find the cipher alphabet given a keyword.
    wrap_alphabet controls how the rest of the alphabet is added
    after the keyword.

    If the sanitised keyword is empty there is no letter to wrap around,
    so the plain alphabet is returned whatever wrap_alphabet is.

    >>> keyword_cipher_alphabet_of('bayes')
    'bayescdfghijklmnopqrtuvwxz'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
    'bayescdfghijklmnopqrtuvwxz'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
    'bayestuvwxzcdfghijklmnopqr'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
    'bayeszcdfghijklmnopqrtuvwx'
    >>> keyword_cipher_alphabet_of('', KeywordWrapAlphabet.from_last)
    'abcdefghijklmnopqrstuvwxyz'
    """
    clean_keyword = sanitise(keyword)
    if wrap_alphabet == KeywordWrapAlphabet.from_a or not clean_keyword:
        # An empty keyword has no last/largest letter; indexing [-1] on it
        # would raise IndexError, so treat it like the from_a case.
        return cat(deduplicate(clean_keyword + string.ascii_lowercase))

    if wrap_alphabet == KeywordWrapAlphabet.from_last:
        # Last distinct letter, in order of first appearance in the keyword.
        pivot_letter = deduplicate(clean_keyword)[-1]
    else:
        # Alphabetically largest letter of the keyword.
        pivot_letter = max(clean_keyword)
    start = string.ascii_lowercase.find(pivot_letter) + 1
    # Keyword first, then the tail of the alphabet after the pivot, then the
    # whole alphabet again; deduplicate keeps only first occurrences.
    return cat(deduplicate(clean_keyword +
                           string.ascii_lowercase[start:] +
                           string.ascii_lowercase))
45
46
def keyword_encipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Encipher a message with a keyword substitution cipher.

    The message is unaccented and lowercased, then each lowercase letter is
    replaced by the letter at the same position in the cipher alphabet
    built by keyword_cipher_alphabet_of. Other characters pass through
    unchanged.

    wrap_alphabet is a KeywordWrapAlphabet member that controls how the
    rest of the alphabet is added after the keyword:
    from_a       : from 'a'
    from_last    : from the last letter in the sanitised keyword
    from_largest : from the largest letter in the sanitised keyword

    >>> keyword_encipher('test message', 'bayes')
    'rsqr ksqqbds'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
    'rsqr ksqqbds'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
    'lskl dskkbus'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
    'qspq jsppbcs'
    """
    substitution = str.maketrans(
        string.ascii_lowercase,
        keyword_cipher_alphabet_of(keyword, wrap_alphabet))
    lowered = unaccent(message).lower()
    return lowered.translate(substitution)
67
def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Decipher a message with a keyword substitution cipher.

    The message is lowercased, then each letter of the cipher alphabet
    built by keyword_cipher_alphabet_of is mapped back to the plain letter
    at the same position. Other characters pass through unchanged.

    wrap_alphabet is a KeywordWrapAlphabet member that controls how the
    rest of the alphabet is added after the keyword:
    from_a       : from 'a'
    from_last    : from the last letter in the sanitised keyword
    from_largest : from the largest letter in the sanitised keyword

    >>> keyword_decipher('rsqr ksqqbds', 'bayes')
    'test message'
    >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
    'test message'
    >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
    'test message'
    >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
    'test message'
    """
    inverse_substitution = str.maketrans(
        keyword_cipher_alphabet_of(keyword, wrap_alphabet),
        string.ascii_lowercase)
    lowered = message.lower()
    return lowered.translate(inverse_substitution)
88
89
def keyword_break(message, wordlist=keywords, fitness=Pletters):
    """Breaks a keyword substitution cipher using a dictionary and
    frequency analysis.

    Every word in wordlist is tried as the keyword with every
    KeywordWrapAlphabet mode; the candidate whose decipherment scores
    highest under fitness wins (the first one found wins ties).

    Returns ((keyword, wrap_alphabet), fit). With an empty wordlist the
    result is (('', KeywordWrapAlphabet.from_a), float('-inf')).

    >>> keyword_break(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
    (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
    """
    best_keyword = ''
    # Start from a real enum member: the summary log below deciphers with
    # the best settings even when no candidate was ever tried.
    best_wrap_alphabet = KeywordWrapAlphabet.from_a
    best_fit = float("-inf")
    for wrap_alphabet in KeywordWrapAlphabet:
        for keyword in wordlist:
            plaintext = keyword_decipher(message, keyword, wrap_alphabet)
            fit = fitness(plaintext)
            logger.debug('Keyword break attempt using key {0} (wrap={1}) '
                         'gives fit of {2} and decrypt starting: {3}'.format(
                             keyword, wrap_alphabet, fit,
                             sanitise(plaintext)[:50]))
            # Strict comparison keeps the earliest candidate on ties.
            if fit > best_fit:
                best_fit = fit
                best_keyword = keyword
                best_wrap_alphabet = wrap_alphabet
    logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
                '{2} and decrypt starting: {3}'.format(
                    best_keyword, best_wrap_alphabet, best_fit,
                    sanitise(keyword_decipher(message, best_keyword,
                                              best_wrap_alphabet))[:50]))
    return (best_keyword, best_wrap_alphabet), best_fit
120
def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
                     number_of_solutions=1, chunksize=500):
    """Breaks a keyword substitution cipher using a dictionary and
    frequency analysis, scoring the candidates in a multiprocessing Pool.

    Every word in wordlist is paired with every KeywordWrapAlphabet mode
    and scored by keyword_break_worker. When number_of_solutions is 1 the
    single best ((keyword, wrap_alphabet), fit) tuple is returned;
    otherwise a list of the number_of_solutions best tuples, best first.

    >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
    (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
    >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo'], \
          number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
    (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
    """
    def fit_of(candidate):
        # Each candidate is ((keyword, wrap_alphabet), fit).
        return candidate[1]

    with Pool() as pool:
        # Word-major order: ties between equal fits are resolved by this
        # ordering, because both max() and the stable sort keep the
        # earliest of equal candidates first.
        jobs = [(message, word, wrap, fitness)
                for word in wordlist
                for wrap in KeywordWrapAlphabet]
        # Gotcha: the worker function needs to be defined at the top level
        # (limitation of Pool.starmap)
        breaks = pool.starmap(keyword_break_worker, jobs, chunksize)
        if number_of_solutions != 1:
            ranked = sorted(breaks, key=fit_of, reverse=True)
            return ranked[:number_of_solutions]
        return max(breaks, key=fit_of)
148
def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
    """Score a single (keyword, wrap_alphabet) candidate for keyword_break_mp.

    Deciphers message with the candidate settings, scores the result with
    fitness, logs the attempt at debug level and returns
    ((keyword, wrap_alphabet), fit).
    """
    candidate_plaintext = keyword_decipher(message, keyword, wrap_alphabet)
    score = fitness(candidate_plaintext)
    preview = sanitise(candidate_plaintext)[:50]
    logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
                 '{2} and decrypt starting: {3}'.format(
                     keyword, wrap_alphabet, score, preview))
    return (keyword, wrap_alphabet), score
156
157
def monoalphabetic_break_hillclimbing(message,
                                      max_iterations=20000,
                                      plain_alphabet=None,
                                      cipher_alphabet=None,
                                      fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher with a single search run.

    Delegates to simulated_annealing_break with one worker and an initial
    temperature of 0; every other argument is passed through unchanged.
    Returns whatever simulated_annealing_break returns.
    """
    annealing_options = dict(
        workers=1,
        initial_temperature=0,
        max_iterations=max_iterations,
        plain_alphabet=plain_alphabet,
        cipher_alphabet=cipher_alphabet,
        fitness=fitness,
        chunksize=chunksize,
    )
    return simulated_annealing_break(message, **annealing_options)
170
171
def monoalphabetic_break_hillclimbing_mp(message,
                                         workers=10,
                                         max_iterations=20000,
                                         plain_alphabet=None,
                                         cipher_alphabet=None,
                                         fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher with several search runs.

    Delegates to simulated_annealing_break with the requested number of
    workers and an initial temperature of 0; every other argument is
    passed through unchanged. Returns whatever simulated_annealing_break
    returns.
    """
    annealing_options = dict(
        workers=workers,
        initial_temperature=0,
        max_iterations=max_iterations,
        plain_alphabet=plain_alphabet,
        cipher_alphabet=cipher_alphabet,
        fitness=fitness,
        chunksize=chunksize,
    )
    return simulated_annealing_break(message, **annealing_options)
185
186
def simulated_annealing_break(message, workers=10,
                              initial_temperature=200,
                              max_iterations=20000,
                              plain_alphabet=None,
                              cipher_alphabet=None,
                              fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher by simulated annealing.

    Runs `workers` independent searches (simulated_annealing_break_worker)
    in a multiprocessing Pool over the sanitised message and returns the
    (alphabet, fitness) pair with the highest fitness.

    plain_alphabet defaults to string.ascii_lowercase when not given.
    cipher_alphabet, when given, is the starting alphabet of every worker;
    when not given, each worker starts from its own random shuffle of
    string.ascii_lowercase.

    Raises ValueError if workers is 0, because there are no results to
    choose from.
    """
    ciphertext = sanitise(message)
    worker_plain_alphabet = (plain_alphabet if plain_alphabet
                             else string.ascii_lowercase)

    worker_args = []
    for _ in range(workers):
        if cipher_alphabet:
            start_alphabet = cipher_alphabet
        else:
            # Draw a fresh shuffle per worker. Assigning the shuffle back to
            # cipher_alphabet would make every later worker reuse the first
            # worker's starting point.
            shuffled = list(string.ascii_lowercase)
            random.shuffle(shuffled)
            start_alphabet = cat(shuffled)
        worker_args.append((ciphertext, worker_plain_alphabet, start_alphabet,
                            initial_temperature, max_iterations, fitness))

    with Pool() as pool:
        breaks = pool.starmap(simulated_annealing_break_worker,
                              worker_args, chunksize)
    return max(breaks, key=lambda k: k[1])
208
209
def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
                                     t0, max_iterations, fitness):
    """Run one simulated-annealing search for a substitution alphabet.

    Starting from cipher_alphabet, each iteration swaps two letters of the
    current alphabet, deciphers message by mapping the candidate alphabet
    onto plain_alphabet, and scores the result with fitness. A better
    candidate is always accepted; a candidate that is not better is
    accepted with probability exp((new - current) / temperature).

    The temperature starts at t0 and drops by a fixed step each iteration,
    but never below 0.001, so after the first iteration it is at least
    0.001 even when t0 is 0.

    Returns (best_alphabet, best_fitness) seen over the whole run. With
    max_iterations of 0 the starting alphabet and its fitness are returned.
    """
    def swap(letters, i, j):
        """Return letters with the characters at positions i and j exchanged."""
        if i > j:
            i, j = j, i
        if i == j:
            return letters
        return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
                letters[j+1:])

    # Swap positions are drawn from the actual alphabet length rather than
    # a hard-coded 26, so other alphabet sizes cannot index out of range.
    alphabet_size = len(cipher_alphabet)

    temperature = t0
    # Linear cooling that reaches zero after 90% of the iterations. Guard
    # against max_iterations == 0, which would otherwise divide by zero.
    dt = t0 / (0.9 * max_iterations) if max_iterations else 0

    current_alphabet = cipher_alphabet
    cipher_translation = ''.maketrans(current_alphabet, plain_alphabet)
    plaintext = message.translate(cipher_translation)
    current_fitness = fitness(plaintext)

    best_alphabet = current_alphabet
    best_fitness = current_fitness
    best_plaintext = plaintext

    for i in range(max_iterations):
        swap_a = random.randrange(alphabet_size)
        # The second position is usually close to the first.
        swap_b = (swap_a + int(random.gauss(0, 4))) % alphabet_size
        alphabet = swap(current_alphabet, swap_a, swap_b)
        cipher_translation = ''.maketrans(alphabet, plain_alphabet)
        plaintext = message.translate(cipher_translation)
        new_fitness = fitness(plaintext)

        if new_fitness > current_fitness:
            accept = True
        else:
            try:
                sa_chance = math.exp(
                    (new_fitness - current_fitness) / temperature)
            except (OverflowError, ZeroDivisionError):
                # A temperature of 0 means worse moves are never accepted.
                sa_chance = 0
            accept = random.random() < sa_chance

        if accept:
            current_fitness = new_fitness
            current_alphabet = alphabet

        if current_fitness > best_fitness:
            best_alphabet = current_alphabet
            best_fitness = current_fitness
            best_plaintext = plaintext
        if i % 500 == 0:
            # Report the best decipherment so far, matching the label.
            logger.debug('Simulated annealing: iteration {}, temperature {}, '
                         'current alphabet {}, current_fitness {}, '
                         'best_plaintext {}'.format(
                             i, temperature, current_alphabet,
                             current_fitness, best_plaintext[:50]))
        temperature = max(temperature - dt, 0.001)

    return best_alphabet, best_fitness