0c0bc6e04c97fba11e323995fd68e0f9b394b7c3
[szyfrow.git] / szyfrow / playfair.py
1 from support.utilities import *
2 from support.language_models import *
3 from cipher.keyword_cipher import KeywordWrapAlphabet, keyword_cipher_alphabet_of
4 from cipher.polybius import polybius_grid
5 import multiprocessing
6
7 from logger import logger
8
def playfair_wrap(n, lowest, highest):
    """Wrap `n` into the inclusive range `lowest`..`highest`.

    Values outside the range are moved by whole multiples of the range
    size (`highest - lowest + 1`) until they fall inside it, so with a
    range of 0..4 the value 5 becomes 0 and -1 becomes 4.

    Raises:
        ValueError: if `highest` is less than `lowest`, i.e. the range
            contains no values to wrap into.
    """
    skip = highest - lowest + 1
    if skip <= 0:
        # An empty range has no valid result; fail loudly instead of
        # searching forever for one.
        raise ValueError(
            'cannot wrap into empty range {}..{}'.format(lowest, highest))
    # Python's % always yields a result in 0..skip-1 for a positive modulus,
    # which handles values both above and below the range in one step.
    return lowest + (n - lowest) % skip
17
def playfair_encipher_bigram(ab, grid, padding_letter='x'):
    """Encipher one pair of letters using `grid`.

    `grid` maps each letter to a `(row, column)` pair. If both letters of
    `ab` are equal, the second is replaced by `padding_letter` first.

    * Letters in the same row: each becomes the letter one column further
      on (column + 1), wrapping within the grid's column range.
    * Letters in the same column: each becomes the letter one row further
      on (row + 1), wrapping within the grid's row range.
    * Otherwise: each letter keeps its row and takes the other's column.

    When several letters share a grid position, the one that comes first
    in the grid's iteration order is used. Returns the two enciphered
    letters joined together.
    """
    first, second = ab

    # Bounds of the grid, needed for wrapping at the edges.
    rows = [cell[0] for cell in grid.values()]
    columns = [cell[1] for cell in grid.values()]
    top, bottom = min(rows), max(rows)
    left, right = min(columns), max(columns)

    if first == second:
        second = padding_letter

    first_row, first_column = grid[first][0], grid[first][1]
    second_row, second_column = grid[second][0], grid[second][1]

    if first_row == second_row:
        first_target = (first_row,
                        playfair_wrap(first_column + 1, left, right))
        second_target = (second_row,
                         playfair_wrap(second_column + 1, left, right))
    elif first_column == second_column:
        first_target = (playfair_wrap(first_row + 1, top, bottom),
                        first_column)
        second_target = (playfair_wrap(second_row + 1, top, bottom),
                         second_column)
    else:
        first_target = (first_row, second_column)
        second_target = (second_row, first_column)

    def letter_at(cell):
        # First letter in grid order that sits at `cell`.
        return [letter for letter, where in grid.items() if where == cell][0]

    return letter_at(first_target) + letter_at(second_target)
38
def playfair_decipher_bigram(ab, grid, padding_letter='x'):
    """Decipher one pair of letters using `grid`.

    `grid` maps each letter to a `(row, column)` pair. If both letters of
    `ab` are equal, the second is replaced by `padding_letter` first.

    * Letters in the same row: each becomes the letter one column back
      (column - 1), wrapping within the grid's column range.
    * Letters in the same column: each becomes the letter one row back
      (row - 1), wrapping within the grid's row range.
    * Otherwise: each letter keeps its row and takes the other's column.

    When several letters share a grid position, the one that comes first
    in the grid's iteration order is used. Returns the two deciphered
    letters joined together.
    """
    left_letter, right_letter = ab

    # Extent of the grid, used to wrap around its edges.
    row_numbers = [position[0] for position in grid.values()]
    column_numbers = [position[1] for position in grid.values()]
    last_row, first_row = max(row_numbers), min(row_numbers)
    last_column, first_column = max(column_numbers), min(column_numbers)

    if left_letter == right_letter:
        right_letter = padding_letter

    left_row, left_column = grid[left_letter][0], grid[left_letter][1]
    right_row, right_column = grid[right_letter][0], grid[right_letter][1]

    if left_row == right_row:
        left_cell = (left_row,
                     playfair_wrap(left_column - 1, first_column, last_column))
        right_cell = (right_row,
                      playfair_wrap(right_column - 1, first_column, last_column))
    elif left_column == right_column:
        left_cell = (playfair_wrap(left_row - 1, first_row, last_row),
                     left_column)
        right_cell = (playfair_wrap(right_row - 1, first_row, last_row),
                      right_column)
    else:
        left_cell = (left_row, right_column)
        right_cell = (right_row, left_column)

    def occupant(cell):
        # First letter in grid order whose position is `cell`.
        return [letter for letter, position in grid.items()
                if position == cell][0]

    return occupant(left_cell) + occupant(right_cell)
59
def playfair_bigrams(text, padding_letter='x', padding_replaces_repeat=True):
    """Split `text` into a list of two-letter strings for Playfair.

    The text is read two letters at a time. A pair of identical letters is
    emitted as the first letter followed by `padding_letter`; if
    `padding_replaces_repeat` is true the repeated letter is dropped,
    otherwise it becomes the first letter of the next pair. A single
    letter left over at the end is completed with `padding_letter`.
    """
    pairs = []
    position = 0
    while position < len(text):
        pair = text[position:position + 2]

        if len(pair) == 1:
            # Lone final letter: pad it and finish.
            pairs.append(pair + padding_letter)
            break

        if pair[0] != pair[1]:
            pairs.append(pair)
            position += 2
            continue

        # Doubled letter: pad the first one, then either skip the repeat
        # or let it start the next pair.
        pairs.append(pair[0] + padding_letter)
        position += 2 if padding_replaces_repeat else 1

    return pairs
79
def playfair_encipher(message, keyword, padding_letter='x',
                      padding_replaces_repeat=False, letters_to_merge=None,
                      wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Encipher `message` with the Playfair cipher keyed by `keyword`.

    The grid comes from `polybius_grid`, called with row and column orders
    0..4 (presumably a 5x5 grid; confirm against `polybius_grid`).
    `letters_to_merge` defaults to `{'j': 'i'}`. The message is passed
    through `sanitise`, split into pairs by `playfair_bigrams`, and each
    pair is enciphered with `playfair_encipher_bigram`. The enciphered
    pairs are combined with `cat` and returned.
    """
    merges = {'j': 'i'} if letters_to_merge is None else letters_to_merge
    grid = polybius_grid(keyword, list(range(5)), list(range(5)),
                         letters_to_merge=merges,
                         wrap_alphabet=wrap_alphabet)

    pairs = playfair_bigrams(
        sanitise(message),
        padding_letter=padding_letter,
        padding_replaces_repeat=padding_replaces_repeat)

    enciphered = []
    for pair in pairs:
        enciphered.append(
            playfair_encipher_bigram(pair, grid,
                                     padding_letter=padding_letter))
    return cat(enciphered)
94
def playfair_decipher(message, keyword, padding_letter='x',
                      padding_replaces_repeat=False, letters_to_merge=None,
                      wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Decipher `message` with the Playfair cipher keyed by `keyword`.

    The grid comes from `polybius_grid`, called with row and column orders
    0..4 (presumably a 5x5 grid; confirm against `polybius_grid`).
    `letters_to_merge` defaults to `{'j': 'i'}`. The message is passed
    through `sanitise` and split into pairs by `playfair_bigrams`, using
    the same padding settings as enciphering. Each pair is deciphered with
    `playfair_decipher_bigram`, and the results are combined with `cat`
    and returned.
    """
    merges = {'j': 'i'} if letters_to_merge is None else letters_to_merge
    grid = polybius_grid(keyword, list(range(5)), list(range(5)),
                         letters_to_merge=merges,
                         wrap_alphabet=wrap_alphabet)

    pairs = playfair_bigrams(
        sanitise(message),
        padding_letter=padding_letter,
        padding_replaces_repeat=padding_replaces_repeat)

    deciphered = []
    for pair in pairs:
        deciphered.append(
            playfair_decipher_bigram(pair, grid,
                                     padding_letter=padding_letter))
    return cat(deciphered)
109
def playfair_break_mp(message,
                      letters_to_merge=None, padding_letter='x',
                      wordlist=keywords, fitness=Pletters,
                      number_of_solutions=1, chunksize=500):
    """Break a Playfair cipher by trying every keyword in `wordlist`.

    Each word is tried with every member of `KeywordWrapAlphabet` and with
    padding both replacing and not replacing repeated letters. The trials
    run in a `multiprocessing.Pool`, `chunksize` tasks at a time, and each
    is scored by `playfair_break_worker`.

    Returns the highest-scoring `(settings, fit)` result when
    `number_of_solutions` is 1, otherwise a list of the
    `number_of_solutions` best results, best first.
    """
    if letters_to_merge is None:
        letters_to_merge = {'j': 'i'}

    def fit_of(result):
        # Results are (settings, fit) pairs; rank them by fit.
        return result[1]

    trials = []
    for word in wordlist:
        for wrap in KeywordWrapAlphabet:
            for pad_replace in (False, True):
                trials.append((message, word, wrap,
                               letters_to_merge, padding_letter,
                               pad_replace,
                               fitness))

    with multiprocessing.Pool() as pool:
        # Gotcha: the helper function here needs to be defined at the top
        # level (limitation of Pool.starmap)
        breaks = pool.starmap(playfair_break_worker, trials, chunksize)

    if number_of_solutions == 1:
        return max(breaks, key=fit_of)
    return sorted(breaks, key=fit_of, reverse=True)[:number_of_solutions]
132
def playfair_break_worker(message, keyword, wrap,
                          letters_to_merge, padding_letter,
                          pad_replace,
                          fitness):
    """Score one candidate set of Playfair settings.

    Deciphers `message` with the given settings and scores the plaintext
    with `fitness`. An empty plaintext scores negative infinity. The
    attempt is logged at debug level.

    Returns `((keyword, wrap, letters_to_merge, padding_letter,
    pad_replace), fit)`.
    """
    plaintext = playfair_decipher(message, keyword, padding_letter,
                                  pad_replace,
                                  letters_to_merge,
                                  wrap)
    fit = fitness(plaintext) if plaintext else float('-inf')
    settings = (keyword, wrap, letters_to_merge, padding_letter, pad_replace)

    preview = sanitise(plaintext)[:50]
    logger.debug('Playfair break attempt using key {0} (wrap={1}, merging {2}, '
                 'pad replaces={3}), '
                 'gives fit of {4} and decrypt starting: '
                 '{5}'.format(keyword, wrap, letters_to_merge, pad_replace,
                              fit, preview))
    return settings, fit
151
def playfair_simulated_annealing_break(message, workers=10,
                                       initial_temperature=200,
                                       max_iterations=20000,
                                       plain_alphabet=None,
                                       cipher_alphabet=None,
                                       fitness=Pletters, chunksize=1):
    """Break a Playfair cipher with several simulated-annealing runs.

    `workers` independent runs of
    `playfair_simulated_annealing_break_worker` are started in a
    `multiprocessing.Pool` on the sanitised message. Each run starts from
    `cipher_alphabet`, or from a keyword picked at random from `keywords`
    when none is given. `plain_alphabet` defaults to the lowercase ASCII
    letters.

    Returns the `(settings, fitness)` result with the highest fitness.
    """
    ciphertext = sanitise(message)

    # One argument tuple per run; a fresh random starting keyword is drawn
    # for each run when the caller did not supply one.
    worker_args = [
        (ciphertext,
         string.ascii_lowercase if plain_alphabet is None else plain_alphabet,
         random.choice(keywords) if cipher_alphabet is None else cipher_alphabet,
         initial_temperature, max_iterations, fitness)
        for _ in range(workers)]

    with multiprocessing.Pool() as pool:
        breaks = pool.starmap(playfair_simulated_annealing_break_worker,
                              worker_args, chunksize)

    def fit_of(result):
        return result[1]

    return max(breaks, key=fit_of)
178
def playfair_simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
                                              t0, max_iterations, fitness):
    """Run one simulated-annealing search for a Playfair keyword.

    Starting from the keyword `cipher_alphabet`, each iteration proposes a
    changed keyword, deciphers `message` with it and scores the plaintext
    with `fitness`. The proposal is one of:

    * (70%) swap two letters of the keyword that lie near each other;
    * (15%) append a random lowercase letter and swap it with a randomly
      chosen existing position;
    * (15%) delete one letter, unless only one letter is left.

    A proposal is accepted when it scores higher than the current keyword,
    or otherwise with probability
    `exp((new_fitness - current_fitness) / temperature)`. The temperature
    falls linearly from `t0`, reaching its floor of 0.001 after 90% of
    `max_iterations`.

    Only the keyword is varied. The wrap alphabet, merged letters and
    padding settings keep their starting values (`from_a`, `{'j': 'i'}`,
    no repeat replacement, `'x'`).

    Parameters:
        message: the ciphertext to decipher.
        plain_alphabet: accepted for signature compatibility; not used here.
        cipher_alphabet: the starting keyword (assumed non-empty).
        t0: the starting temperature.
        max_iterations: the number of proposals to try. With 0 the
            starting keyword is returned unchanged.
        fitness: function scoring a plaintext; higher is better.

    Returns:
        `(settings, best_fitness)`, where `settings` is a dict with the
        keys `'alphabet'`, `'wrap'`, `'letters_to_merge'`, `'pad_replace'`
        and `'padding_letter'` describing the best-scoring attempt.
    """
    def swap(letters, i, j):
        """Return `letters` with the items at positions i and j exchanged."""
        if i > j:
            i, j = j, i
        if i == j:
            return letters
        return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
                letters[j+1:])

    temperature = t0

    # Cool linearly so the floor is reached after 90% of the iterations.
    # With no iterations there is nothing to cool (and nothing to divide by).
    dt = t0 / (0.9 * max_iterations) if max_iterations > 0 else 0

    current_alphabet = cipher_alphabet
    current_wrap = KeywordWrapAlphabet.from_a
    current_letters_to_merge = {'j': 'i'}
    current_pad_replace = False
    current_padding_letter = 'x'

    alphabet = current_alphabet
    wrap = current_wrap
    letters_to_merge = current_letters_to_merge
    pad_replace = current_pad_replace
    padding_letter = current_padding_letter
    plaintext = playfair_decipher(message, alphabet, padding_letter,
                                  pad_replace,
                                  letters_to_merge,
                                  wrap)
    current_fitness = fitness(plaintext)

    # The starting point is the best seen so far. Every best_* value must be
    # set here, because the loop may never find an improvement.
    best_alphabet = current_alphabet
    best_wrap = current_wrap
    best_letters_to_merge = current_letters_to_merge
    best_pad_replace = current_pad_replace
    best_padding_letter = current_padding_letter
    best_fitness = current_fitness
    best_plaintext = plaintext

    for i in range(max_iterations):
        chosen = random.random()
        if chosen < 0.7:
            # Swap a letter with one a short (Gaussian) distance away.
            swap_a = random.randrange(len(current_alphabet))
            swap_b = (swap_a + int(random.gauss(0, 4))) % len(current_alphabet)
            alphabet = swap(current_alphabet, swap_a, swap_b)
        elif chosen < 0.85:
            # Grow the keyword: the new letter is appended, then exchanged
            # with a random existing position.
            new_letter = random.choice(string.ascii_lowercase)
            alphabet = swap(current_alphabet + new_letter,
                            random.randrange(len(current_alphabet)),
                            len(current_alphabet))
        else:
            # Shrink the keyword, but never down to nothing.
            if len(current_alphabet) > 1:
                deletion_position = random.randrange(len(current_alphabet))
                alphabet = (current_alphabet[:deletion_position] +
                            current_alphabet[deletion_position+1:])
            else:
                alphabet = current_alphabet

        try:
            plaintext = playfair_decipher(message, alphabet, padding_letter,
                                          pad_replace,
                                          letters_to_merge,
                                          wrap)
        except Exception:
            # Show the settings that failed, then let the error propagate.
            print("Error", alphabet, padding_letter,
                  pad_replace,
                  letters_to_merge,
                  wrap)
            raise

        new_fitness = fitness(plaintext)
        try:
            sa_chance = math.exp((new_fitness - current_fitness) / temperature)
        except (OverflowError, ZeroDivisionError):
            sa_chance = 0
        if (new_fitness > current_fitness or random.random() < sa_chance):
            current_fitness = new_fitness
            current_alphabet = alphabet
            current_wrap = wrap
            current_letters_to_merge = letters_to_merge
            current_pad_replace = pad_replace
            current_padding_letter = padding_letter

        if current_fitness > best_fitness:
            best_alphabet = current_alphabet
            best_wrap = current_wrap
            best_letters_to_merge = current_letters_to_merge
            best_pad_replace = current_pad_replace
            best_padding_letter = current_padding_letter
            best_fitness = current_fitness
            best_plaintext = plaintext
        if i % 500 == 0:
            # Report the best plaintext, as the message label says, rather
            # than the most recent candidate (which may have been rejected).
            logger.debug('Simulated annealing: iteration {}, temperature {}, '
                         'current alphabet {}, current_fitness {}, '
                         'best_plaintext {}'.format(i, temperature, current_alphabet,
                                                    current_fitness, best_plaintext[:50]))
        temperature = max(temperature - dt, 0.001)

    return {'alphabet': best_alphabet,
            'wrap': best_wrap,
            'letters_to_merge': best_letters_to_merge,
            'pad_replace': best_pad_replace,
            'padding_letter': best_padding_letter,
            }, best_fitness