b903ec0e1646de491a7510f341fb66c9e877081f
[szyfrow.git] / playfair.py
1 from szyfrow.support.utilities import *
2 from szyfrow.support.language_models import *
3 from szyfrow.keyword_cipher import KeywordWrapAlphabet, keyword_cipher_alphabet_of
4 from szyfrow.polybius import polybius_grid
5 import multiprocessing
6
def playfair_wrap(n, lowest, highest):
    """Wrap `n` cyclically into the inclusive range `lowest`..`highest`.

    Values above `highest` wrap round to `lowest` and values below `lowest`
    wrap round to `highest`, as many times as needed.

    Args:
        n: the value to wrap.
        lowest: smallest permitted result.
        highest: largest permitted result.

    Returns:
        The value congruent to `n` (modulo the size of the range) that lies
        between `lowest` and `highest` inclusive.

    Raises:
        ValueError: if `highest < lowest`, i.e. the range is empty. (The
            previous loop-based implementation never terminated here.)
    """
    span = highest - lowest + 1
    if span <= 0:
        raise ValueError(
            "playfair_wrap needs lowest <= highest, got lowest={!r}, "
            "highest={!r}".format(lowest, highest))
    # Python's % always returns a result with the sign of the divisor, so
    # this lands in [0, span) for negative offsets as well.
    return (n - lowest) % span + lowest
15
def playfair_encipher_bigram(ab, grid, padding_letter='x'):
    """Encipher one bigram with a Playfair grid.

    `grid` maps each letter to a (row, column) pair. If both letters of the
    bigram are equal, the second is replaced by `padding_letter` before
    enciphering.

    * Letters in the same row are each replaced by the letter one column
      further on, wrapping round at the end of the row.
    * Letters in the same column are each replaced by the letter one row
      further on, wrapping round at the end of the column.
    * Otherwise each letter keeps its row and takes the other letter's column.

    Returns the two enciphered letters concatenated.
    """
    first, second = ab

    rows = [position[0] for position in grid.values()]
    cols = [position[1] for position in grid.values()]
    bottom, right = max(rows), max(cols)
    top, left = min(rows), min(cols)

    if first == second:
        second = padding_letter

    row_a, col_a = grid[first][0], grid[first][1]
    row_b, col_b = grid[second][0], grid[second][1]

    if row_a == row_b:
        target_a = (row_a, playfair_wrap(col_a + 1, left, right))
        target_b = (row_b, playfair_wrap(col_b + 1, left, right))
    elif col_a == col_b:
        target_a = (playfair_wrap(row_a + 1, top, bottom), col_a)
        target_b = (playfair_wrap(row_b + 1, top, bottom), col_b)
    else:
        target_a = (row_a, col_b)
        target_b = (row_b, col_a)

    def letter_at(position):
        # Several letters may share a cell; take the first one in the
        # grid's iteration order.
        return [letter for letter, where in grid.items()
                if where == position][0]

    return letter_at(target_a) + letter_at(target_b)
36
def playfair_decipher_bigram(ab, grid, padding_letter='x'):
    """Decipher one bigram with a Playfair grid.

    `grid` maps each letter to a (row, column) pair. This mirrors the
    enciphering step: letters sharing a row move one column back, letters
    sharing a column move one row back (both with wrap-around), and any
    other pair keeps its rows while exchanging columns. If the two letters
    of the bigram are equal, the second is treated as `padding_letter`.

    Returns the two deciphered letters concatenated.
    """
    step = -1  # deciphering walks the grid backwards
    left_letter, right_letter = ab

    row_numbers = [cell[0] for cell in grid.values()]
    col_numbers = [cell[1] for cell in grid.values()]
    last_row, last_col = max(row_numbers), max(col_numbers)
    first_row, first_col = min(row_numbers), min(col_numbers)

    if left_letter == right_letter:
        right_letter = padding_letter

    r1, c1 = grid[left_letter][0], grid[left_letter][1]
    r2, c2 = grid[right_letter][0], grid[right_letter][1]

    if r1 == r2:
        wanted = [(r1, playfair_wrap(c1 + step, first_col, last_col)),
                  (r2, playfair_wrap(c2 + step, first_col, last_col))]
    elif c1 == c2:
        wanted = [(playfair_wrap(r1 + step, first_row, last_row), c1),
                  (playfair_wrap(r2 + step, first_row, last_row), c2)]
    else:
        wanted = [(r1, c2), (r2, c1)]

    found = []
    for cell in wanted:
        # Several letters may share a cell; use the first in grid order.
        matches = [letter for letter, where in grid.items() if where == cell]
        found.append(matches[0])
    return found[0] + found[1]
57
def playfair_bigrams(text, padding_letter='x', padding_replaces_repeat=True):
    """Split `text` into the list of bigrams used for Playfair processing.

    * A single trailing letter is completed with `padding_letter`.
    * A pair of identical letters becomes the first letter followed by
      `padding_letter`. If `padding_replaces_repeat` is true the repeated
      letter is dropped (the scan moves on two places); otherwise the
      padding is inserted and the repeated letter starts the next bigram
      (the scan moves on one place).

    Returns a list of two-character strings; an empty `text` gives `[]`.
    """
    pairs = []
    position = 0
    length = len(text)
    while position < length:
        pair = text[position:position + 2]
        if len(pair) == 1:
            # Lone final letter: pad it and stop.
            pairs.append(pair + padding_letter)
            break
        if pair[0] != pair[1]:
            advance = 2
        else:
            pair = pair[0] + padding_letter
            advance = 2 if padding_replaces_repeat else 1
        pairs.append(pair)
        position += advance
    return pairs
77
def playfair_encipher(message, keyword, padding_letter='x',
                      padding_replaces_repeat=False, letters_to_merge=None,
                      wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Encipher `message` with the Playfair cipher keyed by `keyword`.

    The grid is built by `polybius_grid` with row and column orders 0..4.
    `letters_to_merge` (default `{'j': 'i'}`) and `wrap_alphabet` are passed
    through to it. The message is passed through `sanitise`, split into
    bigrams by `playfair_bigrams` (which receives `padding_letter` and
    `padding_replaces_repeat`), and each bigram is enciphered in turn.

    Returns the enciphered bigrams joined with `cat`.
    """
    columns = list(range(5))
    rows = list(range(5))
    merges = {'j': 'i'} if letters_to_merge is None else letters_to_merge
    grid = polybius_grid(keyword, columns, rows,
                         letters_to_merge=merges,
                         wrap_alphabet=wrap_alphabet)

    plain_pairs = playfair_bigrams(
        sanitise(message),
        padding_letter=padding_letter,
        padding_replaces_repeat=padding_replaces_repeat)

    enciphered = []
    for pair in plain_pairs:
        enciphered.append(
            playfair_encipher_bigram(pair, grid,
                                     padding_letter=padding_letter))
    return cat(enciphered)
92
def playfair_decipher(message, keyword, padding_letter='x',
                      padding_replaces_repeat=False, letters_to_merge=None,
                      wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Decipher `message` with the Playfair cipher keyed by `keyword`.

    The grid is built by `polybius_grid` with row and column orders 0..4,
    using `letters_to_merge` (default `{'j': 'i'}`) and `wrap_alphabet`.
    The message is passed through `sanitise`, split into bigrams by
    `playfair_bigrams`, and every bigram is deciphered against the grid.

    Returns the deciphered bigrams joined with `cat`. Padding letters added
    during enciphering are not removed.
    """
    if letters_to_merge is None:
        merged_letters = {'j': 'i'}
    else:
        merged_letters = letters_to_merge

    grid = polybius_grid(keyword, list(range(5)), list(range(5)),
                         letters_to_merge=merged_letters,
                         wrap_alphabet=wrap_alphabet)
    cipher_pairs = playfair_bigrams(
        sanitise(message),
        padding_letter=padding_letter,
        padding_replaces_repeat=padding_replaces_repeat)
    return cat([playfair_decipher_bigram(pair, grid,
                                         padding_letter=padding_letter)
                for pair in cipher_pairs])
107
def playfair_break_mp(message,
                      letters_to_merge=None, padding_letter='x',
                      wordlist=keywords, fitness=Pletters,
                      number_of_solutions=1, chunksize=500):
    """Break a Playfair cipher by trying every keyword in `wordlist`.

    Each keyword is combined with every member of `KeywordWrapAlphabet` and
    with both settings of the repeat-padding flag. All combinations are
    scored by `playfair_break_worker` in a multiprocessing pool, handed out
    in chunks of `chunksize`.

    Returns:
        The single best `(settings, fitness)` pair when
        `number_of_solutions` is 1; otherwise a list of the
        `number_of_solutions` best pairs, highest fitness first.
    """
    if letters_to_merge is None:
        letters_to_merge = {'j': 'i'}

    def score(result):
        return result[1]

    with multiprocessing.Pool() as pool:
        candidates = []
        for word in wordlist:
            for wrap in KeywordWrapAlphabet:
                for pad_replace in (False, True):
                    candidates.append((message, word, wrap,
                                       letters_to_merge, padding_letter,
                                       pad_replace,
                                       fitness))
        # Pool.starmap has to pickle the worker, so it must be a
        # module-level function rather than a nested one.
        results = pool.starmap(playfair_break_worker, candidates, chunksize)
        if number_of_solutions != 1:
            ranked = sorted(results, key=score, reverse=True)
            return ranked[:number_of_solutions]
        return max(results, key=score)
130
def playfair_break_worker(message, keyword, wrap,
                          letters_to_merge, padding_letter,
                          pad_replace,
                          fitness):
    """Score one candidate set of Playfair settings.

    Deciphers `message` with the given settings and applies `fitness` to the
    result. An empty plaintext scores negative infinity.

    Returns:
        `((keyword, wrap, letters_to_merge, padding_letter, pad_replace),
        score)`.
    """
    plaintext = playfair_decipher(message, keyword,
                                  padding_letter=padding_letter,
                                  padding_replaces_repeat=pad_replace,
                                  letters_to_merge=letters_to_merge,
                                  wrap_alphabet=wrap)
    score = fitness(plaintext) if plaintext else float('-inf')
    settings = (keyword, wrap, letters_to_merge, padding_letter, pad_replace)
    return settings, score
144
def playfair_simulated_annealing_break(message, workers=10,
                                       initial_temperature=200,
                                       max_iterations=20000,
                                       plain_alphabet=None,
                                       cipher_alphabet=None,
                                       fitness=Pletters, chunksize=1):
    """Break a Playfair cipher with several simulated-annealing runs.

    `workers` is the number of independent annealing runs; they are executed
    in a default-sized multiprocessing pool. Each run starts from
    `cipher_alphabet` if given, otherwise from a keyword picked at random
    from `keywords`. `plain_alphabet` defaults to the lowercase ASCII
    letters.

    Returns the `(settings, fitness)` result of the run with the highest
    fitness.
    """
    ciphertext = sanitise(message)
    if plain_alphabet is None:
        start_plain = string.ascii_lowercase
    else:
        start_plain = plain_alphabet

    worker_args = [
        (ciphertext,
         start_plain,
         # A fresh random starting keyword for every run unless one is given.
         random.choice(keywords) if cipher_alphabet is None
         else cipher_alphabet,
         initial_temperature, max_iterations, fitness)
        for _ in range(workers)]

    with multiprocessing.Pool() as pool:
        outcomes = pool.starmap(playfair_simulated_annealing_break_worker,
                                worker_args, chunksize)
    return max(outcomes, key=lambda outcome: outcome[1])
171
def playfair_simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
                                              t0, max_iterations, fitness):
    """Run one simulated-annealing search for a Playfair keyword.

    Starting from `cipher_alphabet` (used as the keyword), each iteration
    proposes one change to the current keyword:

    * with probability 0.7, swap two of its letters;
    * with probability 0.15, append a random lowercase letter and swap it
      with the letter at a random position;
    * otherwise, delete one letter (only while more than one remains).

    A proposal is accepted if it scores higher, or otherwise with
    probability `exp((new - current) / temperature)`. The temperature falls
    linearly from `t0` and is floored at 0.001.

    Only the keyword is varied. The wrap mode (`from_a`), merged letters
    (`{'j': 'i'}`), padding letter (`'x'`) and repeat-padding flag (`False`)
    are fixed.

    Args:
        message: the ciphertext to decipher.
        plain_alphabet: unused; kept so the argument tuple built by the
            caller keeps its shape.
        cipher_alphabet: the starting keyword.
        t0: initial temperature.
        max_iterations: number of proposals to try.
        fitness: function scoring a candidate plaintext (higher is better).

    Returns:
        `(settings, best_fitness)`, where `settings` is a dict with keys
        `'alphabet'`, `'wrap'`, `'letters_to_merge'`, `'pad_replace'` and
        `'padding_letter'` describing the best keyword found.
    """
    def swap(letters, i, j):
        """Return `letters` with the characters at positions i and j exchanged."""
        if i > j:
            i, j = j, i
        if i == j:
            return letters
        return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
                letters[j+1:])

    # Cipher settings that the search does not vary.
    wrap = KeywordWrapAlphabet.from_a
    letters_to_merge = {'j': 'i'}
    pad_replace = False
    padding_letter = 'x'

    temperature = t0
    # Linear cooling that reaches the floor after 90% of the iterations.
    # With no iterations there is nothing to cool (and nothing to divide by).
    dt = t0 / (0.9 * max_iterations) if max_iterations else 0

    current_alphabet = cipher_alphabet
    plaintext = playfair_decipher(message, current_alphabet, padding_letter,
                                  pad_replace,
                                  letters_to_merge,
                                  wrap)
    current_fitness = fitness(plaintext)

    # The starting keyword is the best seen so far, so a result is always
    # available even if no proposal ever improves on it.
    best_alphabet = current_alphabet
    best_fitness = current_fitness

    for _ in range(max_iterations):
        chosen = random.random()
        if chosen < 0.7:
            swap_a = random.randrange(len(current_alphabet))
            swap_b = (swap_a + int(random.gauss(0, 4))) % len(current_alphabet)
            alphabet = swap(current_alphabet, swap_a, swap_b)
        elif chosen < 0.85:
            new_letter = random.choice(string.ascii_lowercase)
            alphabet = swap(current_alphabet + new_letter,
                            random.randrange(len(current_alphabet)),
                            len(current_alphabet))
        elif len(current_alphabet) > 1:
            deletion_position = random.randrange(len(current_alphabet))
            alphabet = (current_alphabet[:deletion_position] +
                        current_alphabet[deletion_position+1:])
        else:
            alphabet = current_alphabet

        try:
            plaintext = playfair_decipher(message, alphabet, padding_letter,
                                          pad_replace,
                                          letters_to_merge,
                                          wrap)
        except Exception:
            # Report the settings that failed, then let the error propagate.
            print("Error", alphabet, padding_letter,
                  pad_replace,
                  letters_to_merge,
                  wrap)
            raise

        new_fitness = fitness(plaintext)
        try:
            sa_chance = math.exp((new_fitness - current_fitness) / temperature)
        except (OverflowError, ZeroDivisionError):
            sa_chance = 0
        # random.random() is only drawn when the proposal is not an
        # outright improvement.
        if new_fitness > current_fitness or random.random() < sa_chance:
            current_fitness = new_fitness
            current_alphabet = alphabet

        if current_fitness > best_fitness:
            best_alphabet = current_alphabet
            best_fitness = current_fitness
        temperature = max(temperature - dt, 0.001)

    return {'alphabet': best_alphabet,
            'wrap': wrap,
            'letters_to_merge': letters_to_merge,
            'pad_replace': pad_replace,
            'padding_letter': padding_letter,
            }, best_fitness