b84f0a94c08c1acefc4cece55054cab15ce5b8f3
[cipher-tools.git] / cipher / autokey.py
1 from utilities import *
2 from language_models import *
3 import multiprocessing
4
5 from logger import logger
6
7
def autokey_encipher(message, keyword):
    """Encipher `message` with the autokey cipher, primed by `keyword`.

    The key stream is the keyword followed by the message itself. Letter
    number i of the message is enciphered with `caesar_encipher_letter`,
    using `pos()` of letter number i of the key stream as the shift.

    >>> autokey_encipher('meetatthefountain', 'kilt')
    'wmpmmxxaeyhbryoca'
    """
    key_stream = keyword + message
    # Shifts are computed for the whole key stream up front, although only
    # the first len(message) of them are consumed below.
    key_shifts = [pos(key_letter) for key_letter in key_stream]
    # map() over two iterables stops at the shorter one, i.e. the message.
    enciphered = list(map(caesar_encipher_letter, message, key_shifts))
    return cat(enciphered)
17
def autokey_decipher(ciphertext, keyword):
    """Decipher `ciphertext` with the autokey cipher, primed by `keyword`.

    The key stream starts as the letters of the keyword and is extended by
    every plaintext letter as soon as it has been recovered, so later
    ciphertext letters are deciphered with earlier plaintext letters.

    Raises IndexError if `keyword` is empty and `ciphertext` is not.

    >>> autokey_decipher('wmpmmxxaeyhbryoca', 'kilt')
    'meetatthefountain'
    """
    key_stream = list(keyword)
    keyword_length = len(key_stream)
    for index, cipher_letter in enumerate(ciphertext):
        # Entry `index` of the key stream is the key for this letter: the
        # stream has grown by exactly one entry per letter handled so far.
        shift = pos(key_stream[index])
        key_stream.append(caesar_decipher_letter(cipher_letter, shift))
    # Everything after the keyword prefix is the recovered plaintext.
    return cat(key_stream[keyword_length:])
31
32
33
def autokey_sa_break(message,
                     min_keylength=2,
                     max_keylength=20,
                     workers=10,
                     initial_temperature=200,
                     max_iterations=20000,
                     fitness=Pletters,
                     chunksize=1,
                     result_count=1):
    """Break an autokey cipher by simulated annealing.

    For every key length from `min_keylength` to `max_keylength` inclusive,
    `workers` annealing runs are queued, each starting from its own random
    lowercase key. Note that `workers` is the number of runs per key length;
    the process pool itself is created with its default size.

    `initial_temperature`, `max_iterations` and `fitness` are handed on to
    `autokey_sa_break_worker` unchanged; `chunksize` is handed to
    `Pool.starmap`.

    Returns the single highest-fitness worker result when `result_count` is
    1 or less, otherwise a list of at most `result_count` distinct results
    ordered by descending fitness.
    """
    ciphertext = sanitise(message)
    alphabet = string.ascii_lowercase

    # One argument tuple per annealing run: key lengths in ascending order,
    # `workers` random starting keys for each length.
    worker_args = [
        (ciphertext,
         cat(random.choice(alphabet) for _ in range(keylength)),
         initial_temperature, max_iterations, fitness)
        for keylength in range(min_keylength, max_keylength + 1)
        for _restart in range(workers)
    ]

    with multiprocessing.Pool() as pool:
        breaks = pool.starmap(autokey_sa_break_worker, worker_args, chunksize)

    def fitness_of(result):
        # Each worker result has its fitness score in position 1.
        return result[1]

    if result_count > 1:
        # set() drops duplicate results found by different runs.
        ranked = sorted(set(breaks), key=fitness_of, reverse=True)
        return ranked[:result_count]
    return max(breaks, key=fitness_of)
61
62
def autokey_sa_break_worker(message, key,
                            t0, max_iterations, fitness):
    """Run one simulated-annealing search for an autokey keyword.

    Starting from `key`, each iteration replaces one randomly chosen
    position of the current key with a random lowercase letter, deciphers
    `message` with the candidate key and scores the result with `fitness`
    (higher is better). Better candidates are always accepted; worse ones
    are accepted with probability exp(fitness change / temperature).

    `t0` is the starting temperature. It is lowered linearly on every
    iteration down to a floor of 0.001. `max_iterations` is the number of
    candidates tried; with 0 iterations the starting key is returned as is.

    Progress is logged at debug level every 500 iterations.

    Returns a `(best_key, best_fitness)` tuple for the best key seen.
    """
    temperature = t0

    # Linear cooling step sized so the floor is reached after 90% of the
    # iterations. Guarded so a zero iteration count does not divide by zero.
    dt = t0 / (0.9 * max_iterations) if max_iterations else 0

    plaintext = autokey_decipher(message, key)
    current_fitness = fitness(plaintext)
    current_key = key

    best_key = current_key
    best_fitness = current_fitness
    best_plaintext = plaintext

    for i in range(max_iterations):
        swap_pos = random.randrange(len(current_key))
        swap_char = random.choice(string.ascii_lowercase)

        new_key = current_key[:swap_pos] + swap_char + current_key[swap_pos+1:]

        plaintext = autokey_decipher(message, new_key)
        new_fitness = fitness(plaintext)
        try:
            sa_chance = math.exp((new_fitness - current_fitness) / temperature)
        except (OverflowError, ZeroDivisionError):
            # Overflow only happens for a large positive exponent, i.e. an
            # improvement, which the first test below accepts regardless.
            # Division by zero only happens when t0 is 0.
            sa_chance = 0
        if new_fitness > current_fitness or random.random() < sa_chance:
            current_fitness = new_fitness
            current_key = new_key

        # current_fitness can only exceed best_fitness right after an
        # accepted move, so `plaintext` here belongs to `current_key`.
        if current_fitness > best_fitness:
            best_key = current_key
            best_fitness = current_fitness
            best_plaintext = plaintext
        if i % 500 == 0:
            # Report the best plaintext so far, as the label says, rather
            # than the latest candidate, which may just have been rejected.
            logger.debug('Simulated annealing: iteration {}, temperature {}, '
                'current key {}, current_fitness {}, '
                'best_plaintext {}'.format(i, temperature, current_key,
                current_fitness, best_plaintext[:50]))
        temperature = max(temperature - dt, 0.001)

    return best_key, best_fitness
116
117 if __name__ == "__main__":
118 import doctest