+from utilities import *
+from language_models import *
+from enum import Enum
+# from itertools import starmap
+import multiprocessing
+
+from logger import logger
+
+
+class KeywordWrapAlphabet(Enum):
+ from_a = 1
+ from_last = 2
+ from_largest = 3
+
+
+def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
+ """Find the cipher alphabet given a keyword.
+ wrap_alphabet controls how the rest of the alphabet is added
+ after the keyword.
+
+ >>> keyword_cipher_alphabet_of('bayes')
+ 'bayescdfghijklmnopqrtuvwxz'
+ >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_a)
+ 'bayescdfghijklmnopqrtuvwxz'
+ >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_last)
+ 'bayestuvwxzcdfghijklmnopqr'
+ >>> keyword_cipher_alphabet_of('bayes', KeywordWrapAlphabet.from_largest)
+ 'bayeszcdfghijklmnopqrtuvwx'
+ """
+ if wrap_alphabet == KeywordWrapAlphabet.from_a:
+ cipher_alphabet = cat(deduplicate(sanitise(keyword) +
+ string.ascii_lowercase))
+ else:
+ if wrap_alphabet == KeywordWrapAlphabet.from_last:
+ last_keyword_letter = deduplicate(sanitise(keyword))[-1]
+ else:
+ last_keyword_letter = sorted(sanitise(keyword))[-1]
+ last_keyword_position = string.ascii_lowercase.find(
+ last_keyword_letter) + 1
+ cipher_alphabet = cat(
+ deduplicate(sanitise(keyword) +
+ string.ascii_lowercase[last_keyword_position:] +
+ string.ascii_lowercase))
+ return cipher_alphabet
+
+
+def keyword_encipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
+ """Enciphers a message with a keyword substitution cipher.
+ wrap_alphabet controls how the rest of the alphabet is added
+ after the keyword.
+ 0 : from 'a'
+ 1 : from the last letter in the sanitised keyword
+ 2 : from the largest letter in the sanitised keyword
+
+ >>> keyword_encipher('test message', 'bayes')
+ 'rsqr ksqqbds'
+ >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_a)
+ 'rsqr ksqqbds'
+ >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_last)
+ 'lskl dskkbus'
+ >>> keyword_encipher('test message', 'bayes', KeywordWrapAlphabet.from_largest)
+ 'qspq jsppbcs'
+ """
+ cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
+ cipher_translation = ''.maketrans(string.ascii_lowercase, cipher_alphabet)
+ return unaccent(message).lower().translate(cipher_translation)
+
+def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
+ """Deciphers a message with a keyword substitution cipher.
+ wrap_alphabet controls how the rest of the alphabet is added
+ after the keyword.
+ 0 : from 'a'
+ 1 : from the last letter in the sanitised keyword
+ 2 : from the largest letter in the sanitised keyword
+
+ >>> keyword_decipher('rsqr ksqqbds', 'bayes')
+ 'test message'
+ >>> keyword_decipher('rsqr ksqqbds', 'bayes', KeywordWrapAlphabet.from_a)
+ 'test message'
+ >>> keyword_decipher('lskl dskkbus', 'bayes', KeywordWrapAlphabet.from_last)
+ 'test message'
+ >>> keyword_decipher('qspq jsppbcs', 'bayes', KeywordWrapAlphabet.from_largest)
+ 'test message'
+ """
+ cipher_alphabet = keyword_cipher_alphabet_of(keyword, wrap_alphabet)
+ cipher_translation = ''.maketrans(cipher_alphabet, string.ascii_lowercase)
+ return message.lower().translate(cipher_translation)
+
+
+def keyword_break(message, wordlist=keywords, fitness=Pletters):
+ """Breaks a keyword substitution cipher using a dictionary and
+ frequency analysis.
+
+ >>> keyword_break(keyword_encipher('this is a test message for the ' \
+ 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
+ wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
+ (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
+ """
+ best_keyword = ''
+ best_wrap_alphabet = True
+ best_fit = float("-inf")
+ for wrap_alphabet in KeywordWrapAlphabet:
+ for keyword in wordlist:
+ plaintext = keyword_decipher(message, keyword, wrap_alphabet)
+ fit = fitness(plaintext)
+ logger.debug('Keyword break attempt using key {0} (wrap={1}) '
+ 'gives fit of {2} and decrypt starting: {3}'.format(
+ keyword, wrap_alphabet, fit,
+ sanitise(plaintext)[:50]))
+ if fit > best_fit:
+ best_fit = fit
+ best_keyword = keyword
+ best_wrap_alphabet = wrap_alphabet
+ logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
+ '{2} and decrypt starting: {3}'.format(best_keyword,
+ best_wrap_alphabet, best_fit, sanitise(
+ keyword_decipher(message, best_keyword,
+ best_wrap_alphabet))[:50]))
+ return (best_keyword, best_wrap_alphabet), best_fit
+
+def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
+ number_of_solutions=1, chunksize=500):
+ """Breaks a keyword substitution cipher using a dictionary and
+ frequency analysis
+
+ >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
+ 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
+ wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
+ (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
+ >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
+ 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
+ wordlist=['cat', 'elephant', 'kangaroo'], \
+ number_of_solutions=2) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+ [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
+ (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
+ """
+ with multiprocessing.Pool() as pool:
+ helper_args = [(message, word, wrap, fitness)
+ for word in wordlist
+ for wrap in KeywordWrapAlphabet]
+ # Gotcha: the helper function here needs to be defined at the top level
+ # (limitation of Pool.starmap)
+ breaks = pool.starmap(keyword_break_worker, helper_args, chunksize)
+ if number_of_solutions == 1:
+ return max(breaks, key=lambda k: k[1])
+ else:
+ return sorted(breaks, key=lambda k: k[1], reverse=True)[:number_of_solutions]
+
+def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
+ plaintext = keyword_decipher(message, keyword, wrap_alphabet)
+ fit = fitness(plaintext)
+ logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
+ '{2} and decrypt starting: {3}'.format(keyword,
+ wrap_alphabet, fit, sanitise(plaintext)[:50]))
+ return (keyword, wrap_alphabet), fit
+
+
+def monoalphabetic_break_hillclimbing(message,
+ max_iterations=20000,
+ plain_alphabet=None,
+ cipher_alphabet=None,
+ fitness=Pletters, chunksize=1):
+ return simulated_annealing_break(message,
+ workers=1,
+ initial_temperature=0,
+ max_iterations=max_iterations,
+ plain_alphabet=plain_alphabet,
+ cipher_alphabet=cipher_alphabet,
+ fitness=fitness, chunksize=chunksize)
+
+
+def monoalphabetic_break_hillclimbing_mp(message,
+ workers=10,
+ max_iterations=20000,
+ plain_alphabet=None,
+ cipher_alphabet=None,
+ fitness=Pletters, chunksize=1):
+ return simulated_annealing_break(message,
+ workers=workers,
+ initial_temperature=0,
+ max_iterations=max_iterations,
+ plain_alphabet=plain_alphabet,
+ cipher_alphabet=cipher_alphabet,
+ fitness=fitness, chunksize=chunksize)
+
+
+def simulated_annealing_break(message, workers=10,
+ initial_temperature=200,
+ max_iterations=20000,
+ plain_alphabet=None,
+ cipher_alphabet=None,
+ fitness=Pletters, chunksize=1):
+ worker_args = []
+ ciphertext = sanitise(message)
+ for i in range(workers):
+ if not plain_alphabet:
+ plain_alphabet = string.ascii_lowercase
+ if not cipher_alphabet:
+ cipher_alphabet = list(string.ascii_lowercase)
+ random.shuffle(cipher_alphabet)
+ cipher_alphabet = cat(cipher_alphabet)
+ worker_args.append((ciphertext, plain_alphabet, cipher_alphabet,
+ initial_temperature, max_iterations, fitness))
+ with multiprocessing.Pool() as pool:
+ breaks = pool.starmap(simulated_annealing_break_worker,
+ worker_args, chunksize)
+ return max(breaks, key=lambda k: k[1])
+
+
+def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
+ t0, max_iterations, fitness):
+ def swap(letters, i, j):
+ if i > j:
+ i, j = j, i
+ if i == j:
+ return letters
+ else:
+ return (letters[:i] + letters[j] + letters[i+1:j] + letters[i] +
+ letters[j+1:])
+
+ temperature = t0
+
+ dt = t0 / (0.9 * max_iterations)
+
+ current_alphabet = cipher_alphabet
+ alphabet = current_alphabet
+ cipher_translation = ''.maketrans(current_alphabet, plain_alphabet)
+ plaintext = message.translate(cipher_translation)
+ current_fitness = fitness(plaintext)
+
+ best_alphabet = current_alphabet
+ best_fitness = current_fitness
+ best_plaintext = plaintext
+
+ # print('starting for', max_iterations)
+ for i in range(max_iterations):
+ swap_a = random.randrange(26)
+ swap_b = (swap_a + int(random.gauss(0, 4))) % 26
+ alphabet = swap(current_alphabet, swap_a, swap_b)
+ cipher_translation = ''.maketrans(alphabet, plain_alphabet)
+ plaintext = message.translate(cipher_translation)
+ new_fitness = fitness(plaintext)
+ try:
+ sa_chance = math.exp((new_fitness - current_fitness) / temperature)
+ except (OverflowError, ZeroDivisionError):
+ # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
+ sa_chance = 0
+ if (new_fitness > current_fitness or random.random() < sa_chance):
+ # logger.debug('Simulated annealing: iteration {}, temperature {}, '
+ # 'current alphabet {}, current_fitness {}, '
+ # 'best_plaintext {}'.format(i, temperature, current_alphabet,
+ # current_fitness, best_plaintext[:50]))
+
+ # logger.debug('new_fit {}, current_fit {}, temp {}, sa_chance {}'.format(new_fitness, current_fitness, temperature, sa_chance))
+ current_fitness = new_fitness
+ current_alphabet = alphabet
+
+ if current_fitness > best_fitness:
+ best_alphabet = current_alphabet
+ best_fitness = current_fitness
+ best_plaintext = plaintext
+ if i % 500 == 0:
+ logger.debug('Simulated annealing: iteration {}, temperature {}, '
+ 'current alphabet {}, current_fitness {}, '
+ 'best_plaintext {}'.format(i, temperature, current_alphabet,
+ current_fitness, plaintext[:50]))
+ temperature = max(temperature - dt, 0.001)
+
+ return best_alphabet, best_fitness # current_alphabet, current_fitness
+
+if __name__ == "__main__":
+ import doctest
\ No newline at end of file