Many tests done, still more to come.
[szyfrow.git] / szyfrow / keyword_cipher.py
1 from enum import Enum
2 # from itertools import starmap
3 import multiprocessing
4 import math
5 from szyfrow.support.utilities import *
6 from szyfrow.support.language_models import *
7
8
class KeywordWrapAlphabet(Enum):
    """Where the rest of the alphabet resumes after the keyword letters.

    Used as the `wrap_alphabet` argument of the keyword cipher functions
    in this module.
    """
    # Remaining letters are appended starting again from 'a'.
    from_a = 1
    # Remaining letters start after the last letter of the keyword.
    from_last = 2
    # Remaining letters start after the alphabetically largest keyword letter.
    from_largest = 3
13
14
def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Find the cipher alphabet given a keyword.

    The cipher alphabet is the (deduplicated) keyword followed by the
    letters of the alphabet that the keyword did not use. `wrap_alphabet`
    controls where those remaining letters start:

    * KeywordWrapAlphabet.from_a: start again from 'a'
    * KeywordWrapAlphabet.from_last: start after the last keyword letter
    * KeywordWrapAlphabet.from_largest: start after the alphabetically
      largest keyword letter

    The keyword is passed through `sanitise` first (presumably reducing it
    to lowercase letters -- confirm against szyfrow.support.utilities).
    If nothing is left after sanitising, there is no letter to wrap from,
    so the plain alphabet is returned for every `wrap_alphabet`.

    >>> keyword_cipher_alphabet_of('bayes')
    'bayescdfghijklmnopqrtuvwxz'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
    'bayescdfghijklmnopqrtuvwxz'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
    'bayestuvwxzcdfghijklmnopqr'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
    'bayeszcdfghijklmnopqrtuvwx'
    >>> keyword_cipher_alphabet_of('', KeywordWrapAlphabet.from_last)
    'abcdefghijklmnopqrstuvwxyz'
    """
    letters = sanitise(keyword)

    # With no usable keyword letters the [-1] lookups below would raise
    # IndexError, so treat that case like from_a (the plain alphabet).
    if wrap_alphabet == KeywordWrapAlphabet.from_a or not letters:
        return cat(deduplicate(letters + string.ascii_lowercase))

    if wrap_alphabet == KeywordWrapAlphabet.from_last:
        # Last letter of the keyword once repeated letters are removed.
        wrap_letter = deduplicate(letters)[-1]
    else:
        wrap_letter = max(letters)

    resume_at = string.ascii_lowercase.find(wrap_letter) + 1
    # The full alphabet is appended last so deduplicate() can fill in the
    # letters that come before the wrap point.
    return cat(deduplicate(letters +
                           string.ascii_lowercase[resume_at:] +
                           string.ascii_lowercase))
44
45
def keyword_encipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Encipher a message with a keyword substitution cipher.

    The cipher alphabet is built by `keyword_cipher_alphabet_of`;
    `wrap_alphabet` selects how the rest of the alphabet follows the keyword:

    * KeywordWrapAlphabet.from_a: from 'a'
    * KeywordWrapAlphabet.from_last: from the last letter in the
      sanitised keyword
    * KeywordWrapAlphabet.from_largest: from the largest letter in the
      sanitised keyword

    The message is passed through `unaccent` and lowercased before
    translation; characters outside a-z are left as they are.

    >>> keyword_encipher('test message', 'bayes')
    'rsqr ksqqbds'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
    'rsqr ksqqbds'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
    'lskl dskkbus'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
    'qspq jsppbcs'
    """
    plain_to_cipher = str.maketrans(
        string.ascii_lowercase,
        keyword_cipher_alphabet_of(keyword, wrap_alphabet))
    normalised = unaccent(message).lower()
    return normalised.translate(plain_to_cipher)
66
def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Decipher a message with a keyword substitution cipher.

    The cipher alphabet is built by `keyword_cipher_alphabet_of`;
    `wrap_alphabet` selects how the rest of the alphabet follows the keyword:

    * KeywordWrapAlphabet.from_a: from 'a'
    * KeywordWrapAlphabet.from_last: from the last letter in the
      sanitised keyword
    * KeywordWrapAlphabet.from_largest: from the largest letter in the
      sanitised keyword

    The message is lowercased (but not unaccented) before translation.

    >>> keyword_decipher('rsqr ksqqbds', 'bayes')
    'test message'
    >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
    'test message'
    >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
    'test message'
    >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
    'test message'
    """
    cipher_to_plain = str.maketrans(
        keyword_cipher_alphabet_of(keyword, wrap_alphabet),
        string.ascii_lowercase)
    lowered = message.lower()
    return lowered.translate(cipher_to_plain)
87
88
def keyword_break(message, wordlist=keywords, fitness=Pletters):
    """Breaks a keyword substitution cipher using a dictionary and
    frequency analysis.

    Every word in `wordlist` is tried as the keyword with every
    KeywordWrapAlphabet member; each trial decipherment is scored with
    `fitness` and the highest-scoring combination wins (the first one
    found wins ties).

    Returns ((keyword, wrap_alphabet), fit). If no candidate scores above
    negative infinity (for example, an empty wordlist) the result is
    (('', KeywordWrapAlphabet.from_a), -inf).

    >>> keyword_break(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
    (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
    """
    best_keyword = ''
    # Start from a real enum member so the returned wrap setting always
    # has the same type, even when nothing in the wordlist beats -inf.
    best_wrap_alphabet = KeywordWrapAlphabet.from_a
    best_fit = float("-inf")
    for wrap_alphabet in KeywordWrapAlphabet:
        for keyword in wordlist:
            fit = fitness(keyword_decipher(message, keyword, wrap_alphabet))
            if fit > best_fit:
                best_fit = fit
                best_keyword = keyword
                best_wrap_alphabet = wrap_alphabet
    return (best_keyword, best_wrap_alphabet), best_fit
110
def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
                     number_of_solutions=1, chunksize=500):
    """Breaks a keyword substitution cipher using a dictionary and
    frequency analysis, spreading the trials over a multiprocessing pool.

    Every (word, wrap alphabet) pair is scored by `keyword_break_worker`.
    With `number_of_solutions` equal to 1 the single best
    ((keyword, wrap_alphabet), fit) tuple is returned; otherwise a list of
    the `number_of_solutions` best results, highest fit first.

    >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
    (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
    >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo'], \
          number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
    (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
    """
    def fit_of(result):
        return result[1]

    candidates = [(message, word, wrap, fitness)
                  for word in wordlist
                  for wrap in KeywordWrapAlphabet]

    with multiprocessing.Pool() as pool:
        # Gotcha: the worker has to be a top-level function so the pool
        # can hand it to the worker processes.
        scored = pool.starmap(keyword_break_worker, candidates, chunksize)

    if number_of_solutions == 1:
        return max(scored, key=fit_of)
    ranked = sorted(scored, key=fit_of, reverse=True)
    return ranked[:number_of_solutions]
138
def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
    """Score one (keyword, wrap_alphabet) candidate for keyword_break_mp.

    Deciphers `message` with the candidate and returns
    ((keyword, wrap_alphabet), fit), where fit is `fitness` applied to
    the resulting plaintext.
    """
    candidate_plaintext = keyword_decipher(message, keyword, wrap_alphabet)
    return (keyword, wrap_alphabet), fitness(candidate_plaintext)
143
144
def monoalphabetic_break_hillclimbing(message,
                                      max_iterations=20000,
                                      plain_alphabet=None,
                                      cipher_alphabet=None,
                                      swap_index_finder=None,
                                      fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher by hill climbing.

    Delegates to `monoalphabetic_sa_break` with a single worker and an
    initial temperature of 0; all other arguments are passed through
    unchanged. Returns whatever `monoalphabetic_sa_break` returns.
    """
    settings = {
        'workers': 1,
        'initial_temperature': 0,
        'max_iterations': max_iterations,
        'plain_alphabet': plain_alphabet,
        'cipher_alphabet': cipher_alphabet,
        'swap_index_finder': swap_index_finder,
        'fitness': fitness,
        'chunksize': chunksize,
    }
    return monoalphabetic_sa_break(message, **settings)
159
160
def monoalphabetic_break_hillclimbing_mp(message,
                                         workers=10,
                                         max_iterations=20000,
                                         plain_alphabet=None,
                                         cipher_alphabet=None,
                                         swap_index_finder=None,
                                         fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher by hill climbing,
    using several workers.

    Delegates to `monoalphabetic_sa_break` with `workers` workers and an
    initial temperature of 0; all other arguments are passed through
    unchanged. Returns whatever `monoalphabetic_sa_break` returns.
    """
    settings = {
        'workers': workers,
        'initial_temperature': 0,
        'max_iterations': max_iterations,
        'plain_alphabet': plain_alphabet,
        'cipher_alphabet': cipher_alphabet,
        'swap_index_finder': swap_index_finder,
        'fitness': fitness,
        'chunksize': chunksize,
    }
    return monoalphabetic_sa_break(message, **settings)
176
177
def gaussian_swap_index(a):
    """Return an index near `a`, wrapped into the range 0..25.

    The offset from `a` is a Gaussian sample (mean 0, standard
    deviation 4) truncated to an integer.
    """
    offset = int(random.gauss(0, 4))
    return (a + offset) % 26
180
def uniform_swap_index(a):
    """Return a uniformly random index in the range 0..25.

    `a` is ignored; it is accepted so this function has the same
    signature as `gaussian_swap_index`.
    """
    chosen = random.randrange(26)
    return chosen
183
def monoalphabetic_sa_break(message, workers=10,
                            initial_temperature=200,
                            max_iterations=20000,
                            plain_alphabet=None,
                            cipher_alphabet=None,
                            swap_index_finder=None,
                            fitness=Ptrigrams, chunksize=1):
    """Break a monoalphabetic substitution cipher by simulated annealing.

    Prepares one argument tuple per worker and runs
    `monoalphabetic_sa_break_worker` on each through a multiprocessing
    pool, then returns the worker result with the highest fitness
    (element [1] of each result).

    Defaults applied here:

    * `swap_index_finder` of None means `gaussian_swap_index`
    * `plain_alphabet` of None means string.ascii_lowercase
    * `cipher_alphabet` of None means each worker starts from its own
      random shuffle of string.ascii_lowercase

    The message is passed through `sanitise` before being handed to the
    workers.
    """
    ciphertext = sanitise(message)
    finder = (gaussian_swap_index if swap_index_finder is None
              else swap_index_finder)
    start_plain = (string.ascii_lowercase if plain_alphabet is None
                   else plain_alphabet)

    worker_args = []
    for worker_id in range(workers):
        if cipher_alphabet is None:
            # A fresh random starting key for every worker.
            shuffled = list(string.ascii_lowercase)
            random.shuffle(shuffled)
            start_cipher = cat(shuffled)
        else:
            start_cipher = cipher_alphabet
        worker_args.append((ciphertext, start_plain, start_cipher,
                            finder,
                            initial_temperature, max_iterations, fitness,
                            worker_id))

    with multiprocessing.Pool() as pool:
        breaks = pool.starmap(monoalphabetic_sa_break_worker,
                              worker_args, chunksize)
    return max(breaks, key=lambda k: k[1])
221
222
def monoalphabetic_sa_break_worker(message, plain_alphabet, cipher_alphabet,
                                   swap_index_finder,
                                   t0, max_iterations, fitness,
                                   logID):
    """Run one simulated-annealing search for a substitution key.

    Starting from `cipher_alphabet`, each iteration swaps two letters of
    the current key, deciphers `message` with the candidate key and
    scores the result with `fitness`. A better candidate is always
    accepted; a worse one is accepted with probability
    exp((new_fitness - current_fitness) / temperature). The temperature
    falls linearly from `t0` and is floored at 0.001.

    Parameters:
        message: the ciphertext to decipher.
        plain_alphabet: string of plaintext letters.
        cipher_alphabet: string holding the starting key; position i maps
            to plain_alphabet[i]. Must be the same length as
            plain_alphabet.
        swap_index_finder: called with the first swap index, returns the
            second; the result is reduced modulo the alphabet length.
        t0: initial temperature.
        max_iterations: number of swaps to try; zero or fewer returns the
            starting key unchanged.
        fitness: function scoring a plaintext; higher is better.
        logID: accepted for interface compatibility; not used here.

    Returns (best_alphabet, best_fitness) for the best key seen.
    """
    def swap(letters, i, j):
        """Return `letters` with positions i and j exchanged."""
        if i > j:
            i, j = j, i
        if i == j:
            return letters
        return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
                letters[j+1:])

    def score(key):
        """Decipher the message with `key` and return its fitness."""
        return fitness(message.translate(str.maketrans(key, plain_alphabet)))

    current_alphabet = cipher_alphabet
    current_fitness = score(current_alphabet)

    best_alphabet = current_alphabet
    best_fitness = current_fitness

    # Nothing to search; this also avoids dividing by zero in the
    # cooling-step calculation below.
    if max_iterations <= 0:
        return best_alphabet, best_fitness

    # Use the real key length rather than assuming 26 letters, so shorter
    # or longer alphabets do not index outside the key.
    alphabet_size = len(cipher_alphabet)

    temperature = t0
    # Linear cooling that reaches the floor after 90% of the iterations.
    dt = t0 / (0.9 * max_iterations)

    for _ in range(max_iterations):
        swap_a = random.randrange(alphabet_size)
        swap_b = swap_index_finder(swap_a) % alphabet_size
        candidate = swap(current_alphabet, swap_a, swap_b)
        new_fitness = score(candidate)
        try:
            sa_chance = math.exp((new_fitness - current_fitness) / temperature)
        except (OverflowError, ZeroDivisionError):
            # Zero temperature, or an exponent too large to represent:
            # never accept on chance alone.
            sa_chance = 0
        if new_fitness > current_fitness or random.random() < sa_chance:
            current_fitness = new_fitness
            current_alphabet = candidate

        if current_fitness > best_fitness:
            best_alphabet = current_alphabet
            best_fitness = current_fitness

        temperature = max(temperature - dt, 0.001)

    return best_alphabet, best_fitness
276
277 if __name__ == "__main__":
278 import doctest