ffdc3a95d96bbe0e972d19710a0a5934a4591e3f
[cipher-tools.git] / cipher / autokey.py
1 import multiprocessing
2 from support.utilities import *
3 from support.language_models import *
4 from cipher.caesar import caesar_encipher_letter, caesar_decipher_letter
5
6 from logger import logger
7
8
def autokey_encipher(message, keyword):
    """Encipher a message with the autokey cipher.

    The key stream is the keyword followed by the message itself. Each
    message letter is passed to caesar_encipher_letter together with the
    pos() value of the key-stream letter in the same position.

    >>> autokey_encipher('meetatthefountain', 'kilt')
    'wmpmmxxaeyhbryoca'
    """
    # Convert the whole key stream to shifts up front.
    key_shifts = list(map(pos, keyword + message))

    enciphered = []
    # zip() stops at the end of the message, so the surplus shifts at the
    # tail of the key stream are never used.
    for letter, shift in zip(message, key_shifts):
        enciphered.append(caesar_encipher_letter(letter, shift))
    return cat(enciphered)
18
def autokey_decipher(ciphertext, keyword):
    """Decipher a message that was enciphered with the autokey cipher.

    The key stream starts as the keyword and is extended with each
    plaintext letter as soon as it has been recovered, so letter i of the
    ciphertext is deciphered with letter i of the key stream.

    >>> autokey_decipher('wmpmmxxaeyhbryoca', 'kilt')
    'meetatthefountain'
    """
    keystream = list(keyword)
    keyword_length = len(keystream)

    for index, cipher_letter in enumerate(ciphertext):
        shift = pos(keystream[index])
        # The recovered letter becomes key material for a later letter.
        keystream.append(caesar_decipher_letter(cipher_letter, shift))

    # Everything after the keyword is the recovered plaintext.
    return cat(keystream[keyword_length:])
32
33
34
def autokey_sa_break(message,
                     min_keylength=2,
                     max_keylength=20,
                     workers=10,
                     initial_temperature=200,
                     max_iterations=20000,
                     fitness=Pletters,
                     chunksize=1,
                     result_count=1):
    """Break an autokey cipher by simulated annealing.

    The message is sanitised, then for every key length from
    min_keylength to max_keylength (inclusive) `workers` independent
    annealing runs are started, each from its own random lowercase key.
    The runs are distributed over a default-sized multiprocessing.Pool,
    so `workers` sets the number of restarts per key length, not the
    number of processes.

    Returns the single (key, fitness) result with the highest fitness
    when result_count <= 1; otherwise a list of at most result_count
    distinct (key, fitness) results, ordered by decreasing fitness.
    """
    ciphertext = sanitise(message)
    alphabet = string.ascii_lowercase

    # One argument tuple per annealing run, each with a fresh random key.
    worker_args = [
        (ciphertext,
         cat(random.choice(alphabet) for _ in range(keylength)),
         initial_temperature, max_iterations, fitness)
        for keylength in range(min_keylength, max_keylength + 1)
        for _restart in range(workers)
    ]

    with multiprocessing.Pool() as pool:
        breaks = pool.starmap(autokey_sa_break_worker, worker_args, chunksize)

    def by_fitness(result):
        # Each result is a (key, fitness) pair.
        return result[1]

    if result_count <= 1:
        return max(breaks, key=by_fitness)

    # Drop duplicate results before ranking them.
    ranked = list(set(breaks))
    ranked.sort(key=by_fitness, reverse=True)
    return ranked[:result_count]
62
63
def autokey_sa_break_worker(message, key,
                            t0, max_iterations, fitness):
    """Run one simulated-annealing search for an autokey keyword.

    Starting from `key`, each iteration replaces one randomly chosen
    position of the current key with a random lowercase letter,
    deciphers `message` with the candidate key and scores the result
    with `fitness`. A better candidate is always accepted; a worse one
    is accepted with probability exp((new - current) / temperature).

    The temperature starts at `t0` and falls linearly by a fixed step
    each iteration, sized so it would reach zero after 90% of
    `max_iterations`; it is floored at 0.001.

    `key` must be non-empty when max_iterations > 0, because the position
    to mutate is drawn with random.randrange(len(key)).

    Returns a (best_key, best_fitness) tuple for the best-scoring key
    seen during the run. With max_iterations <= 0 this is the starting
    key and its fitness.
    """
    temperature = t0

    # Guard the step size: with no iterations there is nothing to cool,
    # and the unguarded division would raise ZeroDivisionError.
    dt = t0 / (0.9 * max_iterations) if max_iterations > 0 else 0

    plaintext = autokey_decipher(message, key)
    current_fitness = fitness(plaintext)
    current_key = key

    best_key = current_key
    best_fitness = current_fitness
    best_plaintext = plaintext

    for i in range(max_iterations):
        # Candidate key: the current key with one letter replaced.
        swap_pos = random.randrange(len(current_key))
        swap_char = random.choice(string.ascii_lowercase)
        new_key = current_key[:swap_pos] + swap_char + current_key[swap_pos+1:]

        plaintext = autokey_decipher(message, new_key)
        new_fitness = fitness(plaintext)
        try:
            sa_chance = math.exp((new_fitness - current_fitness) / temperature)
        except (OverflowError, ZeroDivisionError):
            # Overflow only happens for a large improvement, which is
            # accepted unconditionally below; a zero temperature means no
            # worse candidate should be accepted. Either way 0 is safe.
            sa_chance = 0

        if new_fitness > current_fitness or random.random() < sa_chance:
            current_fitness = new_fitness
            current_key = new_key

        # current_fitness can only exceed best_fitness right after the
        # candidate was accepted, so `plaintext` belongs to current_key.
        if current_fitness > best_fitness:
            best_key = current_key
            best_fitness = current_fitness
            best_plaintext = plaintext

        if i % 500 == 0:
            # Log the best plaintext, as the label says, rather than the
            # latest candidate, which may have just been rejected.
            logger.debug('Simulated annealing: iteration {}, temperature {}, '
                         'current key {}, current_fitness {}, '
                         'best_plaintext {}'.format(i, temperature, current_key,
                                                    current_fitness,
                                                    best_plaintext[:50]))

        temperature = max(temperature - dt, 0.001)

    return best_key, best_fitness
117
118 if __name__ == "__main__":
119 import doctest