Started on documentation
[szyfrow.git] / szyfrow / playfair.py
1 """Enciphering and deciphering using the [Playfair cipher](https://en.wikipedia.org/wiki/Playfair_cipher).
2 Also attempts to break messages that use a Playfair cipher.
3 """
4 from szyfrow.support.utilities import *
5 from szyfrow.support.language_models import *
6 from szyfrow.keyword_cipher import KeywordWrapAlphabet, keyword_cipher_alphabet_of
7 from szyfrow.polybius import polybius_grid
8 import multiprocessing
9
10 def playfair_wrap(n, lowest, highest):
11 """Ensures _n_ is between _lowest_ and _highest_ (inclusive at both ends).
12 """
13 skip = highest - lowest + 1
14 while n > highest or n < lowest:
15 if n > highest:
16 n -= skip
17 if n < lowest:
18 n += skip
19 return n
20
21 def playfair_encipher_bigram(ab, grid, padding_letter='x'):
22 """Encipher a single pair of letters using the Playfair method."""
23 a, b = ab
24 max_row = max(c[0] for c in grid.values())
25 max_col = max(c[1] for c in grid.values())
26 min_row = min(c[0] for c in grid.values())
27 min_col = min(c[1] for c in grid.values())
28 if a == b:
29 b = padding_letter
30 if grid[a][0] == grid[b][0]: # same row
31 cp = (grid[a][0], playfair_wrap(grid[a][1] + 1, min_col, max_col))
32 dp = (grid[b][0], playfair_wrap(grid[b][1] + 1, min_col, max_col))
33 elif grid[a][1] == grid[b][1]: # same column
34 cp = (playfair_wrap(grid[a][0] + 1, min_row, max_row), grid[a][1])
35 dp = (playfair_wrap(grid[b][0] + 1, min_row, max_row), grid[b][1])
36 else:
37 cp = (grid[a][0], grid[b][1])
38 dp = (grid[b][0], grid[a][1])
39 c = [k for k, v in grid.items() if v == cp][0]
40 d = [k for k, v in grid.items() if v == dp][0]
41 return c + d
42
43 def playfair_decipher_bigram(ab, grid, padding_letter='x'):
44 """Decipher a single pair of letters using the Playfair method."""
45 a, b = ab
46 max_row = max(c[0] for c in grid.values())
47 max_col = max(c[1] for c in grid.values())
48 min_row = min(c[0] for c in grid.values())
49 min_col = min(c[1] for c in grid.values())
50 if a == b:
51 b = padding_letter
52 if grid[a][0] == grid[b][0]: # same row
53 cp = (grid[a][0], playfair_wrap(grid[a][1] - 1, min_col, max_col))
54 dp = (grid[b][0], playfair_wrap(grid[b][1] - 1, min_col, max_col))
55 elif grid[a][1] == grid[b][1]: # same column
56 cp = (playfair_wrap(grid[a][0] - 1, min_row, max_row), grid[a][1])
57 dp = (playfair_wrap(grid[b][0] - 1, min_row, max_row), grid[b][1])
58 else:
59 cp = (grid[a][0], grid[b][1])
60 dp = (grid[b][0], grid[a][1])
61 c = [k for k, v in grid.items() if v == cp][0]
62 d = [k for k, v in grid.items() if v == dp][0]
63 return c + d
64
65 def playfair_bigrams(text, padding_letter='x', padding_replaces_repeat=True):
66 """Find all the bigrams in a method to be enciphered.
67
68 If both letters are the same, the `padding_letter` is used instead of the
69 second letter. If `padding_replaces_repeat` is `True`, the `padding_letter`
70 replaces the second letter of the bigram. If `padding_replaces_repeat` is
71 `False`, the second letter of this bigram becomes the first letter of the
72 next, effectively lengthening the message by one letter.
73 """
74 i = 0
75 bigrams = []
76 while i < len(text):
77 bigram = text[i:i+2]
78 if len(bigram) == 1:
79 i = len(text) + 1
80 bigram = bigram + padding_letter
81 else:
82 if bigram[0] == bigram[1]:
83 bigram = bigram[0] + padding_letter
84 if padding_replaces_repeat:
85 i += 2
86 else:
87 i += 1
88 else:
89 i += 2
90 bigrams += [bigram]
91 return bigrams
92
93 def playfair_encipher(message, keyword, padding_letter='x',
94 padding_replaces_repeat=False, letters_to_merge=None,
95 wrap_alphabet=KeywordWrapAlphabet.from_a):
96 """Encipher a message using the Playfair cipher."""
97 column_order = list(range(5))
98 row_order = list(range(5))
99 if letters_to_merge is None:
100 letters_to_merge = {'j': 'i'}
101 grid = polybius_grid(keyword, column_order, row_order,
102 letters_to_merge=letters_to_merge,
103 wrap_alphabet=wrap_alphabet)
104 message_bigrams = playfair_bigrams(
105 sanitise(message), padding_letter=padding_letter,
106 padding_replaces_repeat=padding_replaces_repeat)
107 ciphertext_bigrams = [playfair_encipher_bigram(b, grid, padding_letter=padding_letter) for b in message_bigrams]
108 return cat(ciphertext_bigrams)
109
110 def playfair_decipher(message, keyword, padding_letter='x',
111 padding_replaces_repeat=False, letters_to_merge=None,
112 wrap_alphabet=KeywordWrapAlphabet.from_a):
113 """Decipher a message using the Playfair cipher."""
114 column_order = list(range(5))
115 row_order = list(range(5))
116 if letters_to_merge is None:
117 letters_to_merge = {'j': 'i'}
118 grid = polybius_grid(keyword, column_order, row_order,
119 letters_to_merge=letters_to_merge,
120 wrap_alphabet=wrap_alphabet)
121 message_bigrams = playfair_bigrams(
122 sanitise(message), padding_letter=padding_letter,
123 padding_replaces_repeat=padding_replaces_repeat)
124 plaintext_bigrams = [playfair_decipher_bigram(b, grid, padding_letter=padding_letter) for b in message_bigrams]
125 return cat(plaintext_bigrams)
126
127 def playfair_break(message,
128 letters_to_merge=None, padding_letter='x',
129 wordlist=keywords, fitness=Pletters,
130 number_of_solutions=1, chunksize=500):
131 """Break a message enciphered using the Playfair cipher, using a dictionary
132 of keywords and frequency analysis.
133
134 If `wordlist` is not specified, use
135 [`szyfrow.support.langauge_models.keywords`](support/language_models.html#szyfrow.support.language_models.keywords).
136 """
137
138 if letters_to_merge is None:
139 letters_to_merge = {'j': 'i'}
140
141 with multiprocessing.Pool() as pool:
142 helper_args = [(message, word, wrap,
143 letters_to_merge, padding_letter,
144 pad_replace,
145 fitness)
146 for word in wordlist
147 for wrap in KeywordWrapAlphabet
148 for pad_replace in [False, True]]
149 # Gotcha: the helper function here needs to be defined at the top level
150 # (limitation of Pool.starmap)
151 breaks = pool.starmap(playfair_break_worker, helper_args, chunksize)
152 if number_of_solutions == 1:
153 return max(breaks, key=lambda k: k[1])
154 else:
155 return sorted(breaks, key=lambda k: k[1], reverse=True)[:number_of_solutions]
156
157 def playfair_break_worker(message, keyword, wrap,
158 letters_to_merge, padding_letter,
159 pad_replace,
160 fitness):
161 plaintext = playfair_decipher(message, keyword, padding_letter,
162 pad_replace,
163 letters_to_merge,
164 wrap)
165 if plaintext:
166 fit = fitness(plaintext)
167 else:
168 fit = float('-inf')
169 return (keyword, wrap, letters_to_merge, padding_letter, pad_replace), fit
170
171 def playfair_simulated_annealing_break(message, workers=10,
172 initial_temperature=200,
173 max_iterations=20000,
174 plain_alphabet=None,
175 cipher_alphabet=None,
176 fitness=Pletters, chunksize=1):
177 """Break a message enciphered using the Playfair cipher, using simulated
178 annealing to determine the keyword. This function just sets up a stable
179 of workers who do the actual work, implemented as
180 `szyfrow.playfair.playfair_simulated_annealing_break_worker`.
181
182 See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/)
183 for detail on how this works.
184 """
185 worker_args = []
186 ciphertext = sanitise(message)
187 for i in range(workers):
188 if plain_alphabet is None:
189 used_plain_alphabet = string.ascii_lowercase
190 else:
191 used_plain_alphabet = plain_alphabet
192 if cipher_alphabet is None:
193 # used_cipher_alphabet = list(string.ascii_lowercase)
194 # random.shuffle(used_cipher_alphabet)
195 # used_cipher_alphabet = cat(used_cipher_alphabet)
196 used_cipher_alphabet = random.choice(keywords)
197 else:
198 used_cipher_alphabet = cipher_alphabet
199 worker_args.append((ciphertext, used_plain_alphabet, used_cipher_alphabet,
200 initial_temperature, max_iterations, fitness))
201 with multiprocessing.Pool() as pool:
202 breaks = pool.starmap(playfair_simulated_annealing_break_worker,
203 worker_args, chunksize)
204 return max(breaks, key=lambda k: k[1])
205
206 def playfair_simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
207 t0, max_iterations, fitness):
208 """One thread of a simulated annealing run.
209 See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/)
210 for detail on how this works.
211 """
212 def swap(letters, i, j):
213 if i > j:
214 i, j = j, i
215 if i == j:
216 return letters
217 else:
218 return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
219 letters[j+1:])
220
221 temperature = t0
222
223 dt = t0 / (0.9 * max_iterations)
224
225 current_alphabet = cipher_alphabet
226 current_wrap = KeywordWrapAlphabet.from_a
227 current_letters_to_merge = {'j': 'i'}
228 current_pad_replace = False
229 current_padding_letter = 'x'
230
231 alphabet = current_alphabet
232 wrap = current_wrap
233 letters_to_merge = current_letters_to_merge
234 pad_replace = current_pad_replace
235 padding_letter = current_padding_letter
236 plaintext = playfair_decipher(message, alphabet, padding_letter,
237 pad_replace,
238 letters_to_merge,
239 wrap)
240 current_fitness = fitness(plaintext)
241
242 best_alphabet = current_alphabet
243 best_fitness = current_fitness
244 best_plaintext = plaintext
245
246 # print('starting for', max_iterations)
247 for i in range(max_iterations):
248 chosen = random.random()
249 # if chosen < 0.7:
250 # swap_a = random.randrange(26)
251 # swap_b = (swap_a + int(random.gauss(0, 4))) % 26
252 # alphabet = swap(current_alphabet, swap_a, swap_b)
253 # elif chosen < 0.8:
254 # wrap = random.choice(list(KeywordWrapAlphabet))
255 # elif chosen < 0.9:
256 # pad_replace = random.choice([True, False])
257 # elif chosen < 0.95:
258 # letter_from = random.choice(string.ascii_lowercase)
259 # letter_to = random.choice([c for c in string.ascii_lowercase if c != letter_from])
260 # letters_to_merge = {letter_from: letter_to}
261 # else:
262 # padding_letter = random.choice(string.ascii_lowercase)
263 if chosen < 0.7:
264 swap_a = random.randrange(len(current_alphabet))
265 swap_b = (swap_a + int(random.gauss(0, 4))) % len(current_alphabet)
266 alphabet = swap(current_alphabet, swap_a, swap_b)
267 elif chosen < 0.85:
268 new_letter = random.choice(string.ascii_lowercase)
269 alphabet = swap(current_alphabet + new_letter, random.randrange(len(current_alphabet)), len(current_alphabet))
270 else:
271 if len(current_alphabet) > 1:
272 deletion_position = random.randrange(len(current_alphabet))
273 alphabet = current_alphabet[:deletion_position] + current_alphabet[deletion_position+1:]
274 else:
275 alphabet = current_alphabet
276
277 try:
278 plaintext = playfair_decipher(message, alphabet, padding_letter,
279 pad_replace,
280 letters_to_merge,
281 wrap)
282 except:
283 print("Error", alphabet, padding_letter,
284 pad_replace,
285 letters_to_merge,
286 wrap)
287 raise
288
289 new_fitness = fitness(plaintext)
290 try:
291 sa_chance = math.exp((new_fitness - current_fitness) / temperature)
292 except (OverflowError, ZeroDivisionError):
293 # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
294 sa_chance = 0
295 if (new_fitness > current_fitness or random.random() < sa_chance):
296 current_fitness = new_fitness
297 current_alphabet = alphabet
298 current_wrap = wrap
299 current_letters_to_merge = letters_to_merge
300 current_pad_replace = pad_replace
301 current_padding_letter = padding_letter
302
303 if current_fitness > best_fitness:
304 best_alphabet = current_alphabet
305 best_wrap = current_wrap
306 best_letters_to_merge = current_letters_to_merge
307 best_pad_replace = current_pad_replace
308 best_padding_letter = current_padding_letter
309 best_fitness = current_fitness
310 best_plaintext = plaintext
311 temperature = max(temperature - dt, 0.001)
312
313 return { 'alphabet': best_alphabet
314 , 'wrap': best_wrap
315 , 'letters_to_merge': best_letters_to_merge
316 , 'pad_replace': best_pad_replace
317 , 'padding_letter': best_padding_letter
318 }, best_fitness # current_alphabet, current_fitness