Updated for challenge 9
[cipher-tools.git] / cipher / autokey.py
1 import math
2 import multiprocessing
3 from support.utilities import *
4 from support.language_models import *
5 from cipher.caesar import caesar_encipher_letter, caesar_decipher_letter
6
7 from logger import logger
8
9
def autokey_encipher(message, keyword):
    """Encipher a message with the autokey cipher.

    The key stream is the keyword followed by the message itself; each
    message letter is passed to caesar_encipher_letter together with the
    alphabet position (via pos) of the key-stream letter at the same index.

    >>> autokey_encipher('meetatthefountain', 'kilt')
    'wmpmmxxaeyhbryoca'
    """
    key_stream = keyword + message
    # Convert the whole key stream up front; zip() below discards the
    # trailing entries that run past the end of the message.
    shifts = list(map(pos, key_stream))
    enciphered = []
    for letter, shift in zip(message, shifts):
        enciphered.append(caesar_encipher_letter(letter, shift))
    return cat(enciphered)
19
def autokey_decipher(ciphertext, keyword):
    """Decipher a message enciphered with the autokey cipher.

    The key starts as the letters of the keyword. Each ciphertext letter is
    deciphered with the letter at the front of the key, and the recovered
    plaintext letter is then added to the back, so the plaintext extends the
    key as it is uncovered.

    Raises IndexError if keyword is empty and ciphertext is not, because
    there is no key letter with which to decipher the first character.

    >>> autokey_decipher('wmpmmxxaeyhbryoca', 'kilt')
    'meetatthefountain'
    """
    recovered = []
    key_window = list(keyword)
    for cipher_letter in ciphertext:
        # Consume the oldest key letter...
        key_letter = key_window.pop(0)
        plain_letter = caesar_decipher_letter(cipher_letter, pos(key_letter))
        recovered.append(plain_letter)
        # ...and let the plaintext just recovered take its place at the back.
        key_window.append(plain_letter)
    return cat(recovered)
33
34
35
def autokey_sa_break(message,
                     min_keylength=2,
                     max_keylength=20,
                     workers=10,
                     initial_temperature=200,
                     max_iterations=20000,
                     fitness=Pletters,
                     chunksize=1,
                     result_count=1):
    """Break an autokey cipher by simulated annealing.

    For every key length from min_keylength to max_keylength (inclusive),
    `workers` random lowercase starting keys are generated, and each one is
    handed to autokey_sa_break_worker through a multiprocessing pool.

    Note that `workers` is the number of starting keys per key length; the
    pool itself is created with multiprocessing's default process count.

    message is passed through sanitise before being attacked.
    initial_temperature, max_iterations and fitness are forwarded unchanged
    to each worker; chunksize is forwarded to Pool.starmap.

    Returns the single (key, fitness) pair with the highest fitness when
    result_count is 1 or less; otherwise a list of up to result_count
    distinct (key, fitness) pairs, best first.
    """
    ciphertext = sanitise(message)

    # One argument tuple per annealing run, each with its own random key.
    worker_args = [
        (ciphertext,
         cat(random.choice(string.ascii_lowercase) for _ in range(keylength)),
         initial_temperature, max_iterations, fitness)
        for keylength in range(min_keylength, max_keylength + 1)
        for _ in range(workers)
    ]

    with multiprocessing.Pool() as pool:
        breaks = pool.starmap(autokey_sa_break_worker,
                              worker_args, chunksize)

    def fitness_of(result):
        # Each result is a (key, fitness) pair.
        return result[1]

    if result_count > 1:
        # Different starting keys can converge on the same answer, so
        # duplicates are removed before ranking.
        ranked = sorted(set(breaks), key=fitness_of, reverse=True)
        return ranked[:result_count]
    return max(breaks, key=fitness_of)
63
64
def autokey_sa_break_worker(message, key,
                            t0, max_iterations, fitness):
    """Run one simulated-annealing search for an autokey keyword.

    Starting from `key`, each iteration replaces one randomly chosen
    position of the current key with a random lowercase letter, deciphers
    `message` with the candidate key and scores the result with `fitness`
    (higher is better). A better candidate is always accepted; a worse one
    is accepted with probability exp((new - current) / temperature).

    The temperature starts at t0 and falls linearly, reaching its floor of
    0.001 after about 90% of max_iterations, so the final iterations behave
    almost like pure hill climbing.

    message        -- ciphertext, passed as-is to autokey_decipher
    key            -- starting key; its length is never changed
    t0             -- initial temperature
    max_iterations -- number of candidate keys to try
    fitness        -- callable scoring a plaintext string

    Returns a (best_key, best_fitness) tuple for the best key seen.
    """
    temperature = t0

    # Per-iteration cooling step. With no iterations to run there is nothing
    # to cool, so avoid dividing by zero.
    dt = t0 / (0.9 * max_iterations) if max_iterations > 0 else 0

    plaintext = autokey_decipher(message, key)
    current_fitness = fitness(plaintext)
    current_key = key

    best_key = current_key
    best_fitness = current_fitness
    best_plaintext = plaintext

    for i in range(max_iterations):
        # Mutate a single position of the current key.
        swap_pos = random.randrange(len(current_key))
        swap_char = random.choice(string.ascii_lowercase)
        new_key = current_key[:swap_pos] + swap_char + current_key[swap_pos+1:]

        plaintext = autokey_decipher(message, new_key)
        new_fitness = fitness(plaintext)
        try:
            sa_chance = math.exp((new_fitness - current_fitness) / temperature)
        except (OverflowError, ZeroDivisionError):
            # Overflow only happens for a large improvement, which the
            # comparison below accepts anyway; a zero temperature means no
            # worse candidate should be accepted.
            sa_chance = 0

        if new_fitness > current_fitness or random.random() < sa_chance:
            current_fitness = new_fitness
            current_key = new_key

            # The best can only improve straight after an acceptance, at
            # which point `plaintext` is the decipherment under current_key.
            if current_fitness > best_fitness:
                best_key = current_key
                best_fitness = current_fitness
                best_plaintext = plaintext

        if i % 500 == 0:
            # Report the best plaintext so far, as the label says, rather
            # than the latest candidate (which may just have been rejected).
            logger.debug('Simulated annealing: iteration {}, temperature {}, '
                'current key {}, current_fitness {}, '
                'best_plaintext {}'.format(i, temperature, current_key,
                current_fitness, best_plaintext[:50]))

        temperature = max(temperature - dt, 0.001)

    return best_key, best_fitness
118
119 if __name__ == "__main__":
120 import doctest