Implemented playfair cipher
[cipher-tools.git] / cipher / keyword_cipher.py
1 from enum import Enum
2 # from itertools import starmap
3 import multiprocessing
4 import math
5 from support.utilities import *
6 from support.language_models import *
7
8 from logger import logger
9
10
11 class KeywordWrapAlphabet(Enum):
12 from_a = 1
13 from_last = 2
14 from_largest = 3
15
16
17 def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
18 """Find the cipher alphabet given a keyword.
19 wrap_alphabet controls how the rest of the alphabet is added
20 after the keyword.
21
22 >>> keyword_cipher_alphabet_of('bayes')
23 'bayescdfghijklmnopqrtuvwxz'
24 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
25 'bayescdfghijklmnopqrtuvwxz'
26 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
27 'bayestuvwxzcdfghijklmnopqr'
28 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
29 'bayeszcdfghijklmnopqrtuvwx'
30 """
31 if wrap_alphabet == KeywordWrapAlphabet.from_a:
32 cipher_alphabet = cat(deduplicate(sanitise(keyword) +
33 string.ascii_lowercase))
34 else:
35 if wrap_alphabet == KeywordWrapAlphabet.from_last:
36 last_keyword_letter = deduplicate(sanitise(keyword))[-1]
37 else:
38 last_keyword_letter = sorted(sanitise(keyword))[-1]
39 last_keyword_position = string.ascii_lowercase.find(
40 last_keyword_letter) + 1
41 cipher_alphabet = cat(
42 deduplicate(sanitise(keyword) +
43 string.ascii_lowercase[last_keyword_position:] +
44 string.ascii_lowercase))
45 return cipher_alphabet
46
47
48 def keyword_encipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
49 """Enciphers a message with a keyword substitution cipher.
50 wrap_alphabet controls how the rest of the alphabet is added
51 after the keyword.
52 0 : from 'a'
53 1 : from the last letter in the sanitised keyword
54 2 : from the largest letter in the sanitised keyword
55
56 >>> keyword_encipher('test message', 'bayes')
57 'rsqr ksqqbds'
58 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
59 'rsqr ksqqbds'
60 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
61 'lskl dskkbus'
62 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
63 'qspq jsppbcs'
64 """
65 cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
66 cipher_translation = ''.maketrans(string.ascii_lowercase, cipher_alphabet)
67 return unaccent(message).lower().translate(cipher_translation)
68
69 def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
70 """Deciphers a message with a keyword substitution cipher.
71 wrap_alphabet controls how the rest of the alphabet is added
72 after the keyword.
73 0 : from 'a'
74 1 : from the last letter in the sanitised keyword
75 2 : from the largest letter in the sanitised keyword
76
77 >>> keyword_decipher('rsqr ksqqbds', 'bayes')
78 'test message'
79 >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
80 'test message'
81 >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
82 'test message'
83 >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
84 'test message'
85 """
86 cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
87 cipher_translation = ''.maketrans(cipher_alphabet, string.ascii_lowercase)
88 return message.lower().translate(cipher_translation)
89
90
91 def keyword_break(message, wordlist=keywords, fitness=Pletters):
92 """Breaks a keyword substitution cipher using a dictionary and
93 frequency analysis.
94
95 >>> keyword_break(keyword_encipher('this is a test message for the ' \
96 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
97 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
98 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
99 """
100 best_keyword = ''
101 best_wrap_alphabet = True
102 best_fit = float("-inf")
103 for wrap_alphabet in KeywordWrapAlphabet:
104 for keyword in wordlist:
105 plaintext = keyword_decipher(message, keyword, wrap_alphabet)
106 fit = fitness(plaintext)
107 logger.debug('Keyword break attempt using key {0} (wrap={1}) '
108 'gives fit of {2} and decrypt starting: {3}'.format(
109 keyword, wrap_alphabet, fit,
110 sanitise(plaintext)[:50]))
111 if fit > best_fit:
112 best_fit = fit
113 best_keyword = keyword
114 best_wrap_alphabet = wrap_alphabet
115 logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
116 '{2} and decrypt starting: {3}'.format(best_keyword,
117 best_wrap_alphabet, best_fit, sanitise(
118 keyword_decipher(message, best_keyword,
119 best_wrap_alphabet))[:50]))
120 return (best_keyword, best_wrap_alphabet), best_fit
121
122 def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
123 number_of_solutions=1, chunksize=500):
124 """Breaks a keyword substitution cipher using a dictionary and
125 frequency analysis
126
127 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
128 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
129 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
130 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
131 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
132 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
133 wordlist=['cat', 'elephant', 'kangaroo'], \
134 number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
135 [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
136 (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
137 """
138 with multiprocessing.Pool() as pool:
139 helper_args = [(message, word, wrap, fitness)
140 for word in wordlist
141 for wrap in KeywordWrapAlphabet]
142 # Gotcha: the helper function here needs to be defined at the top level
143 # (limitation of Pool.starmap)
144 breaks = pool.starmap(keyword_break_worker, helper_args, chunksize)
145 if number_of_solutions == 1:
146 return max(breaks, key=lambda k: k[1])
147 else:
148 return sorted(breaks, key=lambda k: k[1], reverse=True)[:number_of_solutions]
149
150 def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
151 plaintext = keyword_decipher(message, keyword, wrap_alphabet)
152 fit = fitness(plaintext)
153 logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
154 '{2} and decrypt starting: {3}'.format(keyword,
155 wrap_alphabet, fit, sanitise(plaintext)[:50]))
156 return (keyword, wrap_alphabet), fit
157
158
159 def monoalphabetic_break_hillclimbing(message,
160 max_iterations=20000,
161 plain_alphabet=None,
162 cipher_alphabet=None,
163 fitness=Pletters, chunksize=1):
164 return simulated_annealing_break(message,
165 workers=1,
166 initial_temperature=0,
167 max_iterations=max_iterations,
168 plain_alphabet=plain_alphabet,
169 cipher_alphabet=cipher_alphabet,
170 fitness=fitness, chunksize=chunksize)
171
172
173 def monoalphabetic_break_hillclimbing_mp(message,
174 workers=10,
175 max_iterations=20000,
176 plain_alphabet=None,
177 cipher_alphabet=None,
178 fitness=Pletters, chunksize=1):
179 return simulated_annealing_break(message,
180 workers=workers,
181 initial_temperature=0,
182 max_iterations=max_iterations,
183 plain_alphabet=plain_alphabet,
184 cipher_alphabet=cipher_alphabet,
185 fitness=fitness, chunksize=chunksize)
186
187
188 def simulated_annealing_break(message, workers=10,
189 initial_temperature=200,
190 max_iterations=20000,
191 plain_alphabet=None,
192 cipher_alphabet=None,
193 fitness=Pletters, chunksize=1):
194 worker_args = []
195 ciphertext = sanitise(message)
196 for i in range(workers):
197 if plain_alphabet is None:
198 used_plain_alphabet = string.ascii_lowercase
199 else:
200 used_plain_alphabet = plain_alphabet
201 if cipher_alphabet is None:
202 used_cipher_alphabet = list(string.ascii_lowercase)
203 random.shuffle(used_cipher_alphabet)
204 used_cipher_alphabet = cat(used_cipher_alphabet)
205 else:
206 used_cipher_alphabet = cipher_alphabet
207 # if not plain_alphabet:
208 # plain_alphabet = string.ascii_lowercase
209 # if not cipher_alphabet:
210 # cipher_alphabet = list(string.ascii_lowercase)
211 # random.shuffle(cipher_alphabet)
212 # cipher_alphabet = cat(cipher_alphabet)
213 worker_args.append((ciphertext, used_plain_alphabet, used_cipher_alphabet,
214 initial_temperature, max_iterations, fitness))
215 with multiprocessing.Pool() as pool:
216 breaks = pool.starmap(simulated_annealing_break_worker,
217 worker_args, chunksize)
218 return max(breaks, key=lambda k: k[1])
219
220
221 def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
222 t0, max_iterations, fitness):
223 def swap(letters, i, j):
224 if i > j:
225 i, j = j, i
226 if i == j:
227 return letters
228 else:
229 return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
230 letters[j+1:])
231
232 temperature = t0
233
234 dt = t0 / (0.9 * max_iterations)
235
236 current_alphabet = cipher_alphabet
237 alphabet = current_alphabet
238 cipher_translation = ''.maketrans(current_alphabet, plain_alphabet)
239 plaintext = message.translate(cipher_translation)
240 current_fitness = fitness(plaintext)
241
242 best_alphabet = current_alphabet
243 best_fitness = current_fitness
244 best_plaintext = plaintext
245
246 # print('starting for', max_iterations)
247 for i in range(max_iterations):
248 swap_a = random.randrange(26)
249 swap_b = (swap_a + int(random.gauss(0, 4))) % 26
250 alphabet = swap(current_alphabet, swap_a, swap_b)
251 cipher_translation = ''.maketrans(alphabet, plain_alphabet)
252 plaintext = message.translate(cipher_translation)
253 new_fitness = fitness(plaintext)
254 try:
255 sa_chance = math.exp((new_fitness - current_fitness) / temperature)
256 except (OverflowError, ZeroDivisionError):
257 # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
258 sa_chance = 0
259 if (new_fitness > current_fitness or random.random() < sa_chance):
260 # logger.debug('Simulated annealing: iteration {}, temperature {}, '
261 # 'current alphabet {}, current_fitness {}, '
262 # 'best_plaintext {}'.format(i, temperature, current_alphabet,
263 # current_fitness, best_plaintext[:50]))
264
265 # logger.debug('new_fit {}, current_fit {}, temp {}, sa_chance {}'.format(new_fitness, current_fitness, temperature, sa_chance))
266 current_fitness = new_fitness
267 current_alphabet = alphabet
268
269 if current_fitness > best_fitness:
270 best_alphabet = current_alphabet
271 best_fitness = current_fitness
272 best_plaintext = plaintext
273 if i % 500 == 0:
274 logger.debug('Simulated annealing: iteration {}, temperature {}, '
275 'current alphabet {}, current_fitness {}, '
276 'best_plaintext {}'.format(i, temperature, current_alphabet,
277 current_fitness, plaintext[:50]))
278 temperature = max(temperature - dt, 0.001)
279
280 return best_alphabet, best_fitness # current_alphabet, current_fitness
281
282 if __name__ == "__main__":
283 import doctest