2 # from itertools import starmap
5 from support
.utilities
import *
6 from support
.language_models
import *
8 from logger
import logger
11 class KeywordWrapAlphabet(Enum
):
17 def keyword_cipher_alphabet_of(keyword
, wrap_alphabet
=KeywordWrapAlphabet
.from_a
):
18 """Find the cipher alphabet given a keyword.
19 wrap_alphabet controls how the rest of the alphabet is added
22 >>> keyword_cipher_alphabet_of('bayes')
23 'bayescdfghijklmnopqrtuvwxz'
24 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
25 'bayescdfghijklmnopqrtuvwxz'
26 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
27 'bayestuvwxzcdfghijklmnopqr'
28 >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
29 'bayeszcdfghijklmnopqrtuvwx'
31 if wrap_alphabet
== KeywordWrapAlphabet
.from_a
:
32 cipher_alphabet
= cat(deduplicate(sanitise(keyword
) +
33 string
.ascii_lowercase
))
35 if wrap_alphabet
== KeywordWrapAlphabet
.from_last
:
36 last_keyword_letter
= deduplicate(sanitise(keyword
))[-1]
38 last_keyword_letter
= sorted(sanitise(keyword
))[-1]
39 last_keyword_position
= string
.ascii_lowercase
.find(
40 last_keyword_letter
) + 1
41 cipher_alphabet
= cat(
42 deduplicate(sanitise(keyword
) +
43 string
.ascii_lowercase
[last_keyword_position
:] +
44 string
.ascii_lowercase
))
45 return cipher_alphabet
48 def keyword_encipher(message
, keyword
, wrap_alphabet
=KeywordWrapAlphabet
.from_a
):
49 """Enciphers a message with a keyword substitution cipher.
50 wrap_alphabet controls how the rest of the alphabet is added
53 1 : from the last letter in the sanitised keyword
54 2 : from the largest letter in the sanitised keyword
56 >>> keyword_encipher('test message', 'bayes')
58 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
60 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
62 >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
65 cipher_alphabet
= keyword_cipher_alphabet_of(keyword
, wrap_alphabet
)
66 cipher_translation
= ''.maketrans(string
.ascii_lowercase
, cipher_alphabet
)
67 return unaccent(message
).lower().translate(cipher_translation
)
69 def keyword_decipher(message
, keyword
, wrap_alphabet
=KeywordWrapAlphabet
.from_a
):
70 """Deciphers a message with a keyword substitution cipher.
71 wrap_alphabet controls how the rest of the alphabet is added
74 1 : from the last letter in the sanitised keyword
75 2 : from the largest letter in the sanitised keyword
77 >>> keyword_decipher('rsqr ksqqbds', 'bayes')
79 >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
81 >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
83 >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
86 cipher_alphabet
= keyword_cipher_alphabet_of(keyword
, wrap_alphabet
)
87 cipher_translation
= ''.maketrans(cipher_alphabet
, string
.ascii_lowercase
)
88 return message
.lower().translate(cipher_translation
)
91 def keyword_break(message
, wordlist
=keywords
, fitness
=Pletters
):
92 """Breaks a keyword substitution cipher using a dictionary and
95 >>> keyword_break(keyword_encipher('this is a test message for the ' \
96 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
97 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
98 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
101 best_wrap_alphabet
= True
102 best_fit
= float("-inf")
103 for wrap_alphabet
in KeywordWrapAlphabet
:
104 for keyword
in wordlist
:
105 plaintext
= keyword_decipher(message
, keyword
, wrap_alphabet
)
106 fit
= fitness(plaintext
)
107 logger
.debug('Keyword break attempt using key {0} (wrap={1}) '
108 'gives fit of {2} and decrypt starting: {3}'.format(
109 keyword
, wrap_alphabet
, fit
,
110 sanitise(plaintext
)[:50]))
113 best_keyword
= keyword
114 best_wrap_alphabet
= wrap_alphabet
115 logger
.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
116 '{2} and decrypt starting: {3}'.format(best_keyword
,
117 best_wrap_alphabet
, best_fit
, sanitise(
118 keyword_decipher(message
, best_keyword
,
119 best_wrap_alphabet
))[:50]))
120 return (best_keyword
, best_wrap_alphabet
), best_fit
122 def keyword_break_mp(message
, wordlist
=keywords
, fitness
=Pletters
,
123 number_of_solutions
=1, chunksize
=500):
124 """Breaks a keyword substitution cipher using a dictionary and
127 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
128 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
129 wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
130 (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
131 >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
132 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
133 wordlist=['cat', 'elephant', 'kangaroo'], \
134 number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
135 [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
136 (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
138 with multiprocessing
.Pool() as pool
:
139 helper_args
= [(message
, word
, wrap
, fitness
)
141 for wrap
in KeywordWrapAlphabet
]
142 # Gotcha: the helper function here needs to be defined at the top level
143 # (limitation of Pool.starmap)
144 breaks
= pool
.starmap(keyword_break_worker
, helper_args
, chunksize
)
145 if number_of_solutions
== 1:
146 return max(breaks
, key
=lambda k
: k
[1])
148 return sorted(breaks
, key
=lambda k
: k
[1], reverse
=True)[:number_of_solutions
]
150 def keyword_break_worker(message
, keyword
, wrap_alphabet
, fitness
):
151 plaintext
= keyword_decipher(message
, keyword
, wrap_alphabet
)
152 fit
= fitness(plaintext
)
153 logger
.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
154 '{2} and decrypt starting: {3}'.format(keyword
,
155 wrap_alphabet
, fit
, sanitise(plaintext
)[:50]))
156 return (keyword
, wrap_alphabet
), fit
159 def monoalphabetic_break_hillclimbing(message
,
160 max_iterations
=20000,
162 cipher_alphabet
=None,
163 fitness
=Pletters
, chunksize
=1):
164 return simulated_annealing_break(message
,
166 initial_temperature
=0,
167 max_iterations
=max_iterations
,
168 plain_alphabet
=plain_alphabet
,
169 cipher_alphabet
=cipher_alphabet
,
170 fitness
=fitness
, chunksize
=chunksize
)
173 def monoalphabetic_break_hillclimbing_mp(message
,
175 max_iterations
=20000,
177 cipher_alphabet
=None,
178 fitness
=Pletters
, chunksize
=1):
179 return simulated_annealing_break(message
,
181 initial_temperature
=0,
182 max_iterations
=max_iterations
,
183 plain_alphabet
=plain_alphabet
,
184 cipher_alphabet
=cipher_alphabet
,
185 fitness
=fitness
, chunksize
=chunksize
)
188 def simulated_annealing_break(message
, workers
=10,
189 initial_temperature
=200,
190 max_iterations
=20000,
192 cipher_alphabet
=None,
193 fitness
=Pletters
, chunksize
=1):
195 ciphertext
= sanitise(message
)
196 for i
in range(workers
):
197 if not plain_alphabet
:
198 plain_alphabet
= string
.ascii_lowercase
199 if not cipher_alphabet
:
200 cipher_alphabet
= list(string
.ascii_lowercase
)
201 random
.shuffle(cipher_alphabet
)
202 cipher_alphabet
= cat(cipher_alphabet
)
203 worker_args
.append((ciphertext
, plain_alphabet
, cipher_alphabet
,
204 initial_temperature
, max_iterations
, fitness
))
205 with multiprocessing
.Pool() as pool
:
206 breaks
= pool
.starmap(simulated_annealing_break_worker
,
207 worker_args
, chunksize
)
208 return max(breaks
, key
=lambda k
: k
[1])
211 def simulated_annealing_break_worker(message
, plain_alphabet
, cipher_alphabet
,
212 t0
, max_iterations
, fitness
):
213 def swap(letters
, i
, j
):
219 return (letters
[:i
] + letters
[j
] + letters
[i
+1:j
] + letters
[i
] +
224 dt
= t0
/ (0.9 * max_iterations
)
226 current_alphabet
= cipher_alphabet
227 alphabet
= current_alphabet
228 cipher_translation
= ''.maketrans(current_alphabet
, plain_alphabet
)
229 plaintext
= message
.translate(cipher_translation
)
230 current_fitness
= fitness(plaintext
)
232 best_alphabet
= current_alphabet
233 best_fitness
= current_fitness
234 best_plaintext
= plaintext
236 # print('starting for', max_iterations)
237 for i
in range(max_iterations
):
238 swap_a
= random
.randrange(26)
239 swap_b
= (swap_a
+ int(random
.gauss(0, 4))) % 26
240 alphabet
= swap(current_alphabet
, swap_a
, swap_b
)
241 cipher_translation
= ''.maketrans(alphabet
, plain_alphabet
)
242 plaintext
= message
.translate(cipher_translation
)
243 new_fitness
= fitness(plaintext
)
245 sa_chance
= math
.exp((new_fitness
- current_fitness
) / temperature
)
246 except (OverflowError, ZeroDivisionError):
247 # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
249 if (new_fitness
> current_fitness
or random
.random() < sa_chance
):
250 # logger.debug('Simulated annealing: iteration {}, temperature {}, '
251 # 'current alphabet {}, current_fitness {}, '
252 # 'best_plaintext {}'.format(i, temperature, current_alphabet,
253 # current_fitness, best_plaintext[:50]))
255 # logger.debug('new_fit {}, current_fit {}, temp {}, sa_chance {}'.format(new_fitness, current_fitness, temperature, sa_chance))
256 current_fitness
= new_fitness
257 current_alphabet
= alphabet
259 if current_fitness
> best_fitness
:
260 best_alphabet
= current_alphabet
261 best_fitness
= current_fitness
262 best_plaintext
= plaintext
264 logger
.debug('Simulated annealing: iteration {}, temperature {}, '
265 'current alphabet {}, current_fitness {}, '
266 'best_plaintext {}'.format(i
, temperature
, current_alphabet
,
267 current_fitness
, plaintext
[:50]))
268 temperature
= max(temperature
- dt
, 0.001)
270 return best_alphabet
, best_fitness
# current_alphabet, current_fitness
272 if __name__
== "__main__":