X-Git-Url: https://git.njae.me.uk/?a=blobdiff_plain;f=szyfrow%2Fkeyword_cipher.py;h=22d2607a6da748c38f4ec24377bb849f263678cb;hb=b535d9d75e69cc395e8de28c99e38564655e5ac9;hp=2cf3290463d50ae6fbe9bb93e901eb9ba3ec8a19;hpb=a870050db6bc974b1bb0d132001750b6624fb43f;p=szyfrow.git diff --git a/szyfrow/keyword_cipher.py b/szyfrow/keyword_cipher.py index 2cf3290..22d2607 100644 --- a/szyfrow/keyword_cipher.py +++ b/szyfrow/keyword_cipher.py @@ -1,16 +1,22 @@ +"""Monoalphabetic substitution ciphers, mainly done by keyword. Enciphering +and deciphering, and a couple of ways to break these ciphers. +""" from enum import Enum -# from itertools import starmap import multiprocessing import math -from support.utilities import * -from support.language_models import * - -from logger import logger -import logging -# logger.setLevel(logging.DEBUG) +from szyfrow.support.utilities import * +from szyfrow.support.language_models import * class KeywordWrapAlphabet(Enum): + """Ways to list the rest of the alphabet after use of a keyword. + + * `from_a` : continue the alphabet from 'a': `bayescdfg...` + * `from_last`: continue from the last letter of the keyword: + `bayestuvwxyzcdf...` + * `from_largest`: continue from the "largest" letter of the keyword: + `bayeszcdfg...` + """ from_a = 1 from_last = 2 from_largest = 3 @@ -18,8 +24,9 @@ class KeywordWrapAlphabet(Enum): def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a): """Find the cipher alphabet given a keyword. - wrap_alphabet controls how the rest of the alphabet is added - after the keyword. + + [`wrap_alphabet`](#szyfrow.keyword_cipher.KeywordWrapAlphabet) controls + how the rest of the alphabet is added after the keyword. >>> keyword_cipher_alphabet_of('bayes') 'bayescdfghijklmnopqrtuvwxz' @@ -90,15 +97,21 @@ def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a) return message.lower().translate(cipher_translation) -def keyword_break(message, wordlist=keywords, fitness=Pletters): +def keyword_break_single_thread(message, wordlist=None, fitness=Pletters): """Breaks a keyword substitution cipher using a dictionary and frequency analysis. + If `wordlist` is not specified, use + [`szyfrow.support.langauge_models.keywords`](support/language_models.html#szyfrow.support.language_models.keywords). + >>> keyword_break(keyword_encipher('this is a test message for the ' \ 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \ wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS (('elephant', ), -52.834575011...) """ + if wordlist is None: + wordlist = keywords + best_keyword = '' best_wrap_alphabet = True best_fit = float("-inf") @@ -106,25 +119,20 @@ def keyword_break(message, wordlist=keywords, fitness=Pletters): for keyword in wordlist: plaintext = keyword_decipher(message, keyword, wrap_alphabet) fit = fitness(plaintext) - logger.debug('Keyword break attempt using key {0} (wrap={1}) ' - 'gives fit of {2} and decrypt starting: {3}'.format( - keyword, wrap_alphabet, fit, - sanitise(plaintext)[:50])) if fit > best_fit: best_fit = fit best_keyword = keyword best_wrap_alphabet = wrap_alphabet - logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of ' - '{2} and decrypt starting: {3}'.format(best_keyword, - best_wrap_alphabet, best_fit, sanitise( - keyword_decipher(message, best_keyword, - best_wrap_alphabet))[:50])) return (best_keyword, best_wrap_alphabet), best_fit -def keyword_break_mp(message, wordlist=keywords, fitness=Pletters, +def keyword_break(message, wordlist=None, fitness=Pletters, number_of_solutions=1, chunksize=500): """Breaks a keyword substitution cipher using a dictionary and - frequency analysis + frequency analysis. + + If `wordlist` is not specified, use + [`szyfrow.support.langauge_models.keywords`](support/language_models.html#szyfrow.support.language_models.keywords). + >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \ 'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \ @@ -137,6 +145,9 @@ def keyword_break_mp(message, wordlist=keywords, fitness=Pletters, [(('elephant', ), -52.834575011...), (('elephant', ), -52.834575011...)] """ + if wordlist is None: + wordlist = keywords + with multiprocessing.Pool() as pool: helper_args = [(message, word, wrap, fitness) for word in wordlist @@ -152,19 +163,22 @@ def keyword_break_mp(message, wordlist=keywords, fitness=Pletters, def keyword_break_worker(message, keyword, wrap_alphabet, fitness): plaintext = keyword_decipher(message, keyword, wrap_alphabet) fit = fitness(plaintext) - logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of ' - '{2} and decrypt starting: {3}'.format(keyword, - wrap_alphabet, fit, sanitise(plaintext)[:50])) return (keyword, wrap_alphabet), fit -def monoalphabetic_break_hillclimbing(message, +def monoalphabetic_break_hillclimbing_single(message, max_iterations=20000, plain_alphabet=None, cipher_alphabet=None, swap_index_finder=None, fitness=Pletters, chunksize=1): - return simulated_annealing_break(message, + """Break a monalphabetic substitution cipher using hillclimbing to + guess the keyword. Hillclimbing is done by using the simulated annealing + approach with a temperature of zero. + + This version uses a single worker. + """ + return monoalphabetic_sa_break(message, workers=1, initial_temperature=0, max_iterations=max_iterations, @@ -174,14 +188,20 @@ def monoalphabetic_break_hillclimbing(message, fitness=fitness, chunksize=chunksize) -def monoalphabetic_break_hillclimbing_mp(message, +def monoalphabetic_break_hillclimbing(message, workers=10, max_iterations=20000, plain_alphabet=None, cipher_alphabet=None, swap_index_finder=None, fitness=Pletters, chunksize=1): - return simulated_annealing_break(message, + """Break a monalphabetic substitution cipher using hillclimbing to + guess the keyword. Hillclimbing is done by using the simulated annealing + approach with a temperature of zero. + + This version uses a several workers. + """ + return monoalphabetic_sa_break(message, workers=workers, initial_temperature=0, max_iterations=max_iterations, @@ -192,18 +212,32 @@ def monoalphabetic_break_hillclimbing_mp(message, def gaussian_swap_index(a): + """Return an index to use as the partner of `a` in a swap. The partners + are drawn from a Gaussian distribution. + """ return (a + int(random.gauss(0, 4))) % 26 def uniform_swap_index(a): + """Return an index to use as the partner of `a` in a swap. The partners + are drawn from a uniform distribution. + """ return random.randrange(26) -def simulated_annealing_break(message, workers=10, +def monoalphabetic_sa_break(message, workers=10, initial_temperature=200, max_iterations=20000, plain_alphabet=None, cipher_alphabet=None, swap_index_finder=None, fitness=Ptrigrams, chunksize=1): + """Break a monalphabetic substitution cipher using simulated annealing to + guess the keyword. This function just sets up a stable of workers who + do the actual work, implemented as + `szyfrow.keyword_cipher.monoalphabetic_sa_break_worker`. + + See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/) + for detail on how this works. + """ worker_args = [] ciphertext = sanitise(message) if swap_index_finder is None: @@ -231,15 +265,19 @@ def simulated_annealing_break(message, workers=10, initial_temperature, max_iterations, fitness, i)) with multiprocessing.Pool() as pool: - breaks = pool.starmap(simulated_annealing_break_worker, + breaks = pool.starmap(monoalphabetic_sa_break_worker, worker_args, chunksize) return max(breaks, key=lambda k: k[1]) -def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet, +def monoalphabetic_sa_break_worker(message, plain_alphabet, cipher_alphabet, swap_index_finder, t0, max_iterations, fitness, logID): + """One thread of a simulated annealing run. + See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/) + for detail on how this works. + """ def swap(letters, i, j): if i > j: i, j = j, i @@ -278,12 +316,6 @@ def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet, # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature)) sa_chance = 0 if (new_fitness > current_fitness or random.random() < sa_chance): - # logger.debug('Simulated annealing: iteration {}, temperature {}, ' - # 'current alphabet {}, current_fitness {}, ' - # 'best_plaintext {}'.format(i, temperature, current_alphabet, - # current_fitness, best_plaintext[:50])) - - # logger.debug('new_fit {}, current_fit {}, temp {}, sa_chance {}'.format(new_fitness, current_fitness, temperature, sa_chance)) current_fitness = new_fitness current_alphabet = alphabet @@ -291,11 +323,7 @@ def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet, best_alphabet = current_alphabet best_fitness = current_fitness best_plaintext = plaintext - if i % 500 == 0: - logger.debug('Simulated annealing worker {}: iteration {}, temperature {}, ' - 'current alphabet {}, plain alphabet {}, current_fitness {}, ' - 'best_plaintext {}'.format(logID, i, temperature, current_alphabet, plain_alphabet, - current_fitness, plaintext[:50])) + temperature = max(temperature - dt, 0.001) return best_alphabet, best_fitness # current_alphabet, current_fitness