# from itertools import starmap
import math
import multiprocessing
import random
import string
from enum import Enum

from support.utilities import *
from support.language_models import *

from logger import logger
class KeywordWrapAlphabet(Enum):
    """How the rest of the alphabet is appended after the keyword.

    Used by keyword_cipher_alphabet_of() and the functions built on it.
    Member order matters: the keyword breakers iterate over this enum, so
    on equal fitness the earlier member is the one reported.
    """
    # NOTE(review): the member lines were missing from the reviewed extract.
    # from_last = 2 and from_largest = 3 are fixed by the doctest reprs in
    # this module; from_a = 1 is the inferred remaining value -- confirm.

    # Continue with the unused letters starting from 'a'.
    from_a = 1
    # Continue from the letter following the last letter of the keyword.
    from_last = 2
    # Continue from the letter following the largest letter of the keyword.
    from_largest = 3
def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Find the cipher alphabet given a keyword.

    wrap_alphabet controls how the rest of the alphabet is added
    after the keyword:

    * KeywordWrapAlphabet.from_a: the unused letters, starting from 'a'
    * KeywordWrapAlphabet.from_last: starting after the last letter of the
      sanitised keyword
    * KeywordWrapAlphabet.from_largest: starting after the alphabetically
      largest letter of the sanitised keyword

    A keyword that sanitises to nothing yields the alphabet built from 'a'
    whatever wrap_alphabet is, as there is no letter to wrap from.

    >>> keyword_cipher_alphabet_of('bayes')
    'bayescdfghijklmnopqrtuvwxz'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
    'bayescdfghijklmnopqrtuvwxz'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
    'bayestuvwxzcdfghijklmnopqr'
    >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
    'bayeszcdfghijklmnopqrtuvwx'
    """
    letters = sanitise(keyword)
    if wrap_alphabet == KeywordWrapAlphabet.from_a or not letters:
        return cat(deduplicate(letters + string.ascii_lowercase))

    if wrap_alphabet == KeywordWrapAlphabet.from_last:
        last_keyword_letter = deduplicate(letters)[-1]
    else:
        last_keyword_letter = max(letters)
    # Position of the first letter *after* the chosen keyword letter.
    last_keyword_position = string.ascii_lowercase.find(
        last_keyword_letter) + 1
    # The trailing full alphabet supplies the letters that come before the
    # wrap point; deduplicate() drops everything already used.
    return cat(
        deduplicate(letters +
                    string.ascii_lowercase[last_keyword_position:] +
                    string.ascii_lowercase))
def keyword_encipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Enciphers a message with a keyword substitution cipher.

    wrap_alphabet controls how the rest of the alphabet is added
    after the keyword:

    * KeywordWrapAlphabet.from_a: from 'a'
    * KeywordWrapAlphabet.from_last: from the last letter in the
      sanitised keyword
    * KeywordWrapAlphabet.from_largest: from the largest letter in the
      sanitised keyword

    The message is passed through unaccent() and lowercased before
    translation; characters outside a-z are left as they are.

    >>> keyword_encipher('test message', 'bayes')
    'rsqr ksqqbds'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
    'rsqr ksqqbds'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
    'lskl dskkbus'
    >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
    'qspq jsppbcs'
    """
    cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
    cipher_translation = str.maketrans(string.ascii_lowercase, cipher_alphabet)
    return unaccent(message).lower().translate(cipher_translation)
def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
    """Deciphers a message with a keyword substitution cipher.

    wrap_alphabet controls how the rest of the alphabet is added
    after the keyword:

    * KeywordWrapAlphabet.from_a: from 'a'
    * KeywordWrapAlphabet.from_last: from the last letter in the
      sanitised keyword
    * KeywordWrapAlphabet.from_largest: from the largest letter in the
      sanitised keyword

    The message is lowercased before translation; characters outside a-z
    are left as they are.

    >>> keyword_decipher('rsqr ksqqbds', 'bayes')
    'test message'
    >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
    'test message'
    >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
    'test message'
    >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
    'test message'
    """
    cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
    # Inverse of the enciphering table: cipher letter -> plain letter.
    cipher_translation = str.maketrans(cipher_alphabet, string.ascii_lowercase)
    return message.lower().translate(cipher_translation)
def keyword_break(message, wordlist=keywords, fitness=Pletters):
    """Breaks a keyword substitution cipher using a dictionary and
    a fitness function.

    Every word in wordlist is tried as the keyword with every
    KeywordWrapAlphabet option; the candidate whose decipherment scores
    highest under fitness wins. On equal scores the candidate tried first
    is kept.

    Returns ((best_keyword, best_wrap_alphabet), best_fit). If no candidate
    scores above -inf (e.g. an empty wordlist) the result is
    (('', True), float('-inf')).

    >>> keyword_break(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
    (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
    """
    best_keyword = ''
    best_wrap_alphabet = True
    best_fit = float("-inf")
    # Kept alongside the best key so the final log line does not have to
    # decipher again (which would fail when nothing was tried).
    best_plaintext = ''
    for wrap_alphabet in KeywordWrapAlphabet:
        for keyword in wordlist:
            plaintext = keyword_decipher(message, keyword, wrap_alphabet)
            fit = fitness(plaintext)
            logger.debug('Keyword break attempt using key {0} (wrap={1}) '
                         'gives fit of {2} and decrypt starting: {3}'.format(
                             keyword, wrap_alphabet, fit,
                             sanitise(plaintext)[:50]))
            if fit > best_fit:
                best_fit = fit
                best_keyword = keyword
                best_wrap_alphabet = wrap_alphabet
                best_plaintext = plaintext
    logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
                '{2} and decrypt starting: {3}'.format(
                    best_keyword, best_wrap_alphabet, best_fit,
                    sanitise(best_plaintext)[:50]))
    return (best_keyword, best_wrap_alphabet), best_fit
def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
                     number_of_solutions=1, chunksize=500):
    """Breaks a keyword substitution cipher using a dictionary and
    a fitness function, spreading the candidates over a process pool.

    Every (word, wrap alphabet) pair is scored by keyword_break_worker.
    With number_of_solutions == 1 the single best
    ((keyword, wrap_alphabet), fit) tuple is returned; otherwise a list of
    the number_of_solutions best tuples, best first. chunksize is passed
    to Pool.starmap.

    >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
    (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
    >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
          'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
          wordlist=['cat', 'elephant', 'kangaroo'], \
          number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
    (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
    """
    helper_args = [(message, word, wrap, fitness)
                   for word in wordlist
                   for wrap in KeywordWrapAlphabet]
    with multiprocessing.Pool() as pool:
        # Gotcha: the helper function here needs to be defined at the top level
        # (limitation of Pool.starmap)
        breaks = pool.starmap(keyword_break_worker, helper_args, chunksize)
    if number_of_solutions == 1:
        return max(breaks, key=lambda k: k[1])
    # sorted() is stable, so candidates with equal fitness keep the order
    # in which they were generated.
    return sorted(breaks, key=lambda k: k[1],
                  reverse=True)[:number_of_solutions]
def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
    """Score a single (keyword, wrap_alphabet) candidate.

    Deciphers message with the candidate key, scores the result with
    fitness, logs the attempt at debug level and returns
    ((keyword, wrap_alphabet), fit).
    """
    candidate = (keyword, wrap_alphabet)
    decrypted = keyword_decipher(message, keyword, wrap_alphabet)
    score = fitness(decrypted)
    preview = sanitise(decrypted)[:50]
    logger.debug(
        'Keyword break attempt using key {0} (wrap={1}) gives fit of '
        '{2} and decrypt starting: {3}'.format(
            keyword, wrap_alphabet, score, preview))
    return candidate, score
def monoalphabetic_break_hillclimbing(message,
                                      max_iterations=20000,
                                      plain_alphabet=None,
                                      cipher_alphabet=None,
                                      fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher by hill climbing.

    Thin wrapper around simulated_annealing_break() with
    initial_temperature=0; all other arguments are passed straight
    through. Returns whatever simulated_annealing_break() returns.
    """
    # NOTE(review): the `plain_alphabet` parameter line and one keyword
    # line of this call were missing from the reviewed extract. The
    # parameter is required by the call below; its None default mirrors
    # cipher_alphabet. `workers=1` is assumed for this non-_mp variant --
    # confirm against the original file.
    return simulated_annealing_break(message,
                                     workers=1,
                                     initial_temperature=0,
                                     max_iterations=max_iterations,
                                     plain_alphabet=plain_alphabet,
                                     cipher_alphabet=cipher_alphabet,
                                     fitness=fitness, chunksize=chunksize)
def monoalphabetic_break_hillclimbing_mp(message,
                                         workers=10,
                                         max_iterations=20000,
                                         plain_alphabet=None,
                                         cipher_alphabet=None,
                                         fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher by parallel hill climbing.

    Thin wrapper around simulated_annealing_break() with
    initial_temperature=0; all other arguments are passed straight
    through. Returns whatever simulated_annealing_break() returns.
    """
    # NOTE(review): two signature lines and one keyword line of this call
    # were missing from the reviewed extract. `plain_alphabet` is required
    # by the call below; the `workers=10` parameter (matching the default
    # of simulated_annealing_break) and its pass-through are assumed --
    # confirm against the original file.
    return simulated_annealing_break(message,
                                     workers=workers,
                                     initial_temperature=0,
                                     max_iterations=max_iterations,
                                     plain_alphabet=plain_alphabet,
                                     cipher_alphabet=cipher_alphabet,
                                     fitness=fitness, chunksize=chunksize)
def simulated_annealing_break(message, workers=10,
                              initial_temperature=200,
                              max_iterations=20000,
                              plain_alphabet=None,
                              cipher_alphabet=None,
                              fitness=Pletters, chunksize=1):
    """Break a monoalphabetic substitution cipher by simulated annealing.

    Runs `workers` independent simulated_annealing_break_worker() searches
    in a multiprocessing pool and returns the (alphabet, fitness) pair with
    the highest fitness.

    message is sanitised before being handed to the workers.
    plain_alphabet defaults to string.ascii_lowercase. If cipher_alphabet
    is given, every worker starts from it; otherwise each worker starts
    from its own random shuffle of the lowercase alphabet. chunksize is
    passed to Pool.starmap.
    """
    ciphertext = sanitise(message)
    if not plain_alphabet:
        plain_alphabet = string.ascii_lowercase

    worker_args = []
    for _ in range(workers):
        if cipher_alphabet:
            start_alphabet = cipher_alphabet
        else:
            # Shuffle afresh for every worker. Assigning the shuffle back
            # to cipher_alphabet would make all later workers reuse the
            # first worker's starting point.
            shuffled = list(string.ascii_lowercase)
            random.shuffle(shuffled)
            start_alphabet = cat(shuffled)
        worker_args.append((ciphertext, plain_alphabet, start_alphabet,
                            initial_temperature, max_iterations, fitness))
    with multiprocessing.Pool() as pool:
        breaks = pool.starmap(simulated_annealing_break_worker,
                              worker_args, chunksize)
    return max(breaks, key=lambda k: k[1])
def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
                                     t0, max_iterations, fitness):
    """Run one simulated-annealing search for a substitution alphabet.

    Starting from cipher_alphabet, each iteration swaps two letters of the
    current alphabet, deciphers message with the result (mapping the
    candidate alphabet onto plain_alphabet) and scores the plaintext with
    fitness. A better candidate is always accepted; a worse one is accepted
    with probability exp((new - current) / temperature).

    The temperature starts at t0 and falls linearly, reaching its floor of
    0.001 after 90% of max_iterations.

    Returns (best_alphabet, best_fitness) seen during the search. With
    max_iterations <= 0 this is the starting alphabet and its fitness.
    """
    def swap(letters, i, j):
        """Return letters with the characters at positions i and j exchanged."""
        if i == j:
            return letters
        if i > j:
            i, j = j, i
        return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
                letters[j+1:])

    alphabet_size = len(cipher_alphabet)
    temperature = t0
    # Guard the division so a zero-iteration call returns the starting
    # state instead of raising ZeroDivisionError.
    dt = t0 / (0.9 * max_iterations) if max_iterations > 0 else 0

    current_alphabet = cipher_alphabet
    cipher_translation = ''.maketrans(current_alphabet, plain_alphabet)
    plaintext = message.translate(cipher_translation)
    current_fitness = fitness(plaintext)

    best_alphabet = current_alphabet
    best_fitness = current_fitness
    best_plaintext = plaintext

    for i in range(max_iterations):
        # Pick a position, then a partner a (roughly) Gaussian distance away.
        swap_a = random.randrange(alphabet_size)
        swap_b = (swap_a + int(random.gauss(0, 4))) % alphabet_size
        alphabet = swap(current_alphabet, swap_a, swap_b)
        cipher_translation = ''.maketrans(alphabet, plain_alphabet)
        plaintext = message.translate(cipher_translation)
        new_fitness = fitness(plaintext)
        try:
            sa_chance = math.exp((new_fitness - current_fitness) / temperature)
        except (OverflowError, ZeroDivisionError):
            # temperature == 0 (pure hill climbing) or a huge exponent:
            # never accept on chance alone.
            # NOTE(review): this fallback line was missing from the
            # reviewed extract; 0 is the assumed value -- confirm.
            sa_chance = 0
        if new_fitness > current_fitness or random.random() < sa_chance:
            current_fitness = new_fitness
            current_alphabet = alphabet

        if current_fitness > best_fitness:
            best_alphabet = current_alphabet
            best_fitness = current_fitness
            best_plaintext = plaintext
        # NOTE(review): the line guarding this log call was missing from
        # the reviewed extract; logging every 500th iteration is an
        # assumption -- confirm.
        if i % 500 == 0:
            logger.debug('Simulated annealing: iteration {}, temperature {}, '
                         'current alphabet {}, current_fitness {}, '
                         'best_plaintext {}'.format(
                             i, temperature, current_alphabet,
                             current_fitness, best_plaintext[:50]))
        temperature = max(temperature - dt, 0.001)

    return best_alphabet, best_fitness
271 if __name__
== "__main__":