e6939db5af2a031ca3e17216902162dbc65bd6a1
[szyfrow.git] / szyfrow / playfair.py
1 from szyfrow.support.utilities import *
2 from szyfrow.support.language_models import *
3 from szyfrow.keyword_cipher import KeywordWrapAlphabet, keyword_cipher_alphabet_of
4 from szyfrow.polybius import polybius_grid
5 import multiprocessing
6
7 def playfair_wrap(n, lowest, highest):
8 skip = highest - lowest + 1
9 while n > highest or n < lowest:
10 if n > highest:
11 n -= skip
12 if n < lowest:
13 n += skip
14 return n
15
16 def playfair_encipher_bigram(ab, grid, padding_letter='x'):
17 a, b = ab
18 max_row = max(c[0] for c in grid.values())
19 max_col = max(c[1] for c in grid.values())
20 min_row = min(c[0] for c in grid.values())
21 min_col = min(c[1] for c in grid.values())
22 if a == b:
23 b = padding_letter
24 if grid[a][0] == grid[b][0]: # same row
25 cp = (grid[a][0], playfair_wrap(grid[a][1] + 1, min_col, max_col))
26 dp = (grid[b][0], playfair_wrap(grid[b][1] + 1, min_col, max_col))
27 elif grid[a][1] == grid[b][1]: # same column
28 cp = (playfair_wrap(grid[a][0] + 1, min_row, max_row), grid[a][1])
29 dp = (playfair_wrap(grid[b][0] + 1, min_row, max_row), grid[b][1])
30 else:
31 cp = (grid[a][0], grid[b][1])
32 dp = (grid[b][0], grid[a][1])
33 c = [k for k, v in grid.items() if v == cp][0]
34 d = [k for k, v in grid.items() if v == dp][0]
35 return c + d
36
37 def playfair_decipher_bigram(ab, grid, padding_letter='x'):
38 a, b = ab
39 max_row = max(c[0] for c in grid.values())
40 max_col = max(c[1] for c in grid.values())
41 min_row = min(c[0] for c in grid.values())
42 min_col = min(c[1] for c in grid.values())
43 if a == b:
44 b = padding_letter
45 if grid[a][0] == grid[b][0]: # same row
46 cp = (grid[a][0], playfair_wrap(grid[a][1] - 1, min_col, max_col))
47 dp = (grid[b][0], playfair_wrap(grid[b][1] - 1, min_col, max_col))
48 elif grid[a][1] == grid[b][1]: # same column
49 cp = (playfair_wrap(grid[a][0] - 1, min_row, max_row), grid[a][1])
50 dp = (playfair_wrap(grid[b][0] - 1, min_row, max_row), grid[b][1])
51 else:
52 cp = (grid[a][0], grid[b][1])
53 dp = (grid[b][0], grid[a][1])
54 c = [k for k, v in grid.items() if v == cp][0]
55 d = [k for k, v in grid.items() if v == dp][0]
56 return c + d
57
58 def playfair_bigrams(text, padding_letter='x', padding_replaces_repeat=True):
59 i = 0
60 bigrams = []
61 while i < len(text):
62 bigram = text[i:i+2]
63 if len(bigram) == 1:
64 i = len(text) + 1
65 bigram = bigram + padding_letter
66 else:
67 if bigram[0] == bigram[1]:
68 bigram = bigram[0] + padding_letter
69 if padding_replaces_repeat:
70 i += 2
71 else:
72 i += 1
73 else:
74 i += 2
75 bigrams += [bigram]
76 return bigrams
77
78 def playfair_encipher(message, keyword, padding_letter='x',
79 padding_replaces_repeat=False, letters_to_merge=None,
80 wrap_alphabet=KeywordWrapAlphabet.from_a):
81 column_order = list(range(5))
82 row_order = list(range(5))
83 if letters_to_merge is None:
84 letters_to_merge = {'j': 'i'}
85 grid = polybius_grid(keyword, column_order, row_order,
86 letters_to_merge=letters_to_merge,
87 wrap_alphabet=wrap_alphabet)
88 message_bigrams = playfair_bigrams(
89 sanitise(message), padding_letter=padding_letter,
90 padding_replaces_repeat=padding_replaces_repeat)
91 ciphertext_bigrams = [playfair_encipher_bigram(b, grid, padding_letter=padding_letter) for b in message_bigrams]
92 return cat(ciphertext_bigrams)
93
94 def playfair_decipher(message, keyword, padding_letter='x',
95 padding_replaces_repeat=False, letters_to_merge=None,
96 wrap_alphabet=KeywordWrapAlphabet.from_a):
97 column_order = list(range(5))
98 row_order = list(range(5))
99 if letters_to_merge is None:
100 letters_to_merge = {'j': 'i'}
101 grid = polybius_grid(keyword, column_order, row_order,
102 letters_to_merge=letters_to_merge,
103 wrap_alphabet=wrap_alphabet)
104 message_bigrams = playfair_bigrams(
105 sanitise(message), padding_letter=padding_letter,
106 padding_replaces_repeat=padding_replaces_repeat)
107 plaintext_bigrams = [playfair_decipher_bigram(b, grid, padding_letter=padding_letter) for b in message_bigrams]
108 return cat(plaintext_bigrams)
109
110 def playfair_break(message,
111 letters_to_merge=None, padding_letter='x',
112 wordlist=keywords, fitness=Pletters,
113 number_of_solutions=1, chunksize=500):
114 if letters_to_merge is None:
115 letters_to_merge = {'j': 'i'}
116
117 with multiprocessing.Pool() as pool:
118 helper_args = [(message, word, wrap,
119 letters_to_merge, padding_letter,
120 pad_replace,
121 fitness)
122 for word in wordlist
123 for wrap in KeywordWrapAlphabet
124 for pad_replace in [False, True]]
125 # Gotcha: the helper function here needs to be defined at the top level
126 # (limitation of Pool.starmap)
127 breaks = pool.starmap(playfair_break_worker, helper_args, chunksize)
128 if number_of_solutions == 1:
129 return max(breaks, key=lambda k: k[1])
130 else:
131 return sorted(breaks, key=lambda k: k[1], reverse=True)[:number_of_solutions]
132
133 def playfair_break_worker(message, keyword, wrap,
134 letters_to_merge, padding_letter,
135 pad_replace,
136 fitness):
137 plaintext = playfair_decipher(message, keyword, padding_letter,
138 pad_replace,
139 letters_to_merge,
140 wrap)
141 if plaintext:
142 fit = fitness(plaintext)
143 else:
144 fit = float('-inf')
145 return (keyword, wrap, letters_to_merge, padding_letter, pad_replace), fit
146
147 def playfair_simulated_annealing_break(message, workers=10,
148 initial_temperature=200,
149 max_iterations=20000,
150 plain_alphabet=None,
151 cipher_alphabet=None,
152 fitness=Pletters, chunksize=1):
153 worker_args = []
154 ciphertext = sanitise(message)
155 for i in range(workers):
156 if plain_alphabet is None:
157 used_plain_alphabet = string.ascii_lowercase
158 else:
159 used_plain_alphabet = plain_alphabet
160 if cipher_alphabet is None:
161 # used_cipher_alphabet = list(string.ascii_lowercase)
162 # random.shuffle(used_cipher_alphabet)
163 # used_cipher_alphabet = cat(used_cipher_alphabet)
164 used_cipher_alphabet = random.choice(keywords)
165 else:
166 used_cipher_alphabet = cipher_alphabet
167 worker_args.append((ciphertext, used_plain_alphabet, used_cipher_alphabet,
168 initial_temperature, max_iterations, fitness))
169 with multiprocessing.Pool() as pool:
170 breaks = pool.starmap(playfair_simulated_annealing_break_worker,
171 worker_args, chunksize)
172 return max(breaks, key=lambda k: k[1])
173
174 def playfair_simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
175 t0, max_iterations, fitness):
176 def swap(letters, i, j):
177 if i > j:
178 i, j = j, i
179 if i == j:
180 return letters
181 else:
182 return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
183 letters[j+1:])
184
185 temperature = t0
186
187 dt = t0 / (0.9 * max_iterations)
188
189 current_alphabet = cipher_alphabet
190 current_wrap = KeywordWrapAlphabet.from_a
191 current_letters_to_merge = {'j': 'i'}
192 current_pad_replace = False
193 current_padding_letter = 'x'
194
195 alphabet = current_alphabet
196 wrap = current_wrap
197 letters_to_merge = current_letters_to_merge
198 pad_replace = current_pad_replace
199 padding_letter = current_padding_letter
200 plaintext = playfair_decipher(message, alphabet, padding_letter,
201 pad_replace,
202 letters_to_merge,
203 wrap)
204 current_fitness = fitness(plaintext)
205
206 best_alphabet = current_alphabet
207 best_fitness = current_fitness
208 best_plaintext = plaintext
209
210 # print('starting for', max_iterations)
211 for i in range(max_iterations):
212 chosen = random.random()
213 # if chosen < 0.7:
214 # swap_a = random.randrange(26)
215 # swap_b = (swap_a + int(random.gauss(0, 4))) % 26
216 # alphabet = swap(current_alphabet, swap_a, swap_b)
217 # elif chosen < 0.8:
218 # wrap = random.choice(list(KeywordWrapAlphabet))
219 # elif chosen < 0.9:
220 # pad_replace = random.choice([True, False])
221 # elif chosen < 0.95:
222 # letter_from = random.choice(string.ascii_lowercase)
223 # letter_to = random.choice([c for c in string.ascii_lowercase if c != letter_from])
224 # letters_to_merge = {letter_from: letter_to}
225 # else:
226 # padding_letter = random.choice(string.ascii_lowercase)
227 if chosen < 0.7:
228 swap_a = random.randrange(len(current_alphabet))
229 swap_b = (swap_a + int(random.gauss(0, 4))) % len(current_alphabet)
230 alphabet = swap(current_alphabet, swap_a, swap_b)
231 elif chosen < 0.85:
232 new_letter = random.choice(string.ascii_lowercase)
233 alphabet = swap(current_alphabet + new_letter, random.randrange(len(current_alphabet)), len(current_alphabet))
234 else:
235 if len(current_alphabet) > 1:
236 deletion_position = random.randrange(len(current_alphabet))
237 alphabet = current_alphabet[:deletion_position] + current_alphabet[deletion_position+1:]
238 else:
239 alphabet = current_alphabet
240
241 try:
242 plaintext = playfair_decipher(message, alphabet, padding_letter,
243 pad_replace,
244 letters_to_merge,
245 wrap)
246 except:
247 print("Error", alphabet, padding_letter,
248 pad_replace,
249 letters_to_merge,
250 wrap)
251 raise
252
253 new_fitness = fitness(plaintext)
254 try:
255 sa_chance = math.exp((new_fitness - current_fitness) / temperature)
256 except (OverflowError, ZeroDivisionError):
257 # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
258 sa_chance = 0
259 if (new_fitness > current_fitness or random.random() < sa_chance):
260 current_fitness = new_fitness
261 current_alphabet = alphabet
262 current_wrap = wrap
263 current_letters_to_merge = letters_to_merge
264 current_pad_replace = pad_replace
265 current_padding_letter = padding_letter
266
267 if current_fitness > best_fitness:
268 best_alphabet = current_alphabet
269 best_wrap = current_wrap
270 best_letters_to_merge = current_letters_to_merge
271 best_pad_replace = current_pad_replace
272 best_padding_letter = current_padding_letter
273 best_fitness = current_fitness
274 best_plaintext = plaintext
275 temperature = max(temperature - dt, 0.001)
276
277 return { 'alphabet': best_alphabet
278 , 'wrap': best_wrap
279 , 'letters_to_merge': best_letters_to_merge
280 , 'pad_replace': best_pad_replace
281 , 'padding_letter': best_padding_letter
282 }, best_fitness # current_alphabet, current_fitness