+"""Monoalphabetic substitution ciphers, mainly done by keyword. Enciphering
+and deciphering, and a couple of ways to break these ciphers.
+"""
from enum import Enum
-# from itertools import starmap
import multiprocessing
import math
-from support.utilities import *
-from support.language_models import *
-
-from logger import logger
-import logging
-# logger.setLevel(logging.DEBUG)
+from szyfrow.support.utilities import *
+from szyfrow.support.language_models import *
class KeywordWrapAlphabet(Enum):
+ """Ways to list the rest of the alphabet after use of a keyword.
+
+ * `from_a` : continue the alphabet from 'a': `bayescdfg...`
+ * `from_last`: continue from the last letter of the keyword:
+ `bayestuvwxyzcdf...`
+ * `from_largest`: continue from the "largest" letter of the keyword:
+ `bayeszcdfg...`
+ """
from_a = 1
from_last = 2
from_largest = 3
def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
"""Find the cipher alphabet given a keyword.
- wrap_alphabet controls how the rest of the alphabet is added
- after the keyword.
+
+ [`wrap_alphabet`](#szyfrow.keyword_cipher.KeywordWrapAlphabet) controls
+ how the rest of the alphabet is added after the keyword.
>>> keyword_cipher_alphabet_of('bayes')
'bayescdfghijklmnopqrtuvwxz'
return message.lower().translate(cipher_translation)
-def keyword_break(message, wordlist=keywords, fitness=Pletters):
+def keyword_break_single_thread(message, wordlist=None, fitness=Pletters):
"""Breaks a keyword substitution cipher using a dictionary and
frequency analysis.
+ If `wordlist` is not specified, use
+ [`szyfrow.support.langauge_models.keywords`](support/language_models.html#szyfrow.support.language_models.keywords).
+
>>> keyword_break(keyword_encipher('this is a test message for the ' \
'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
"""
+ if wordlist is None:
+ wordlist = keywords
+
best_keyword = ''
best_wrap_alphabet = True
best_fit = float("-inf")
for keyword in wordlist:
plaintext = keyword_decipher(message, keyword, wrap_alphabet)
fit = fitness(plaintext)
- logger.debug('Keyword break attempt using key {0} (wrap={1}) '
- 'gives fit of {2} and decrypt starting: {3}'.format(
- keyword, wrap_alphabet, fit,
- sanitise(plaintext)[:50]))
if fit > best_fit:
best_fit = fit
best_keyword = keyword
best_wrap_alphabet = wrap_alphabet
- logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
- '{2} and decrypt starting: {3}'.format(best_keyword,
- best_wrap_alphabet, best_fit, sanitise(
- keyword_decipher(message, best_keyword,
- best_wrap_alphabet))[:50]))
return (best_keyword, best_wrap_alphabet), best_fit
-def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
+def keyword_break(message, wordlist=None, fitness=Pletters,
number_of_solutions=1, chunksize=500):
"""Breaks a keyword substitution cipher using a dictionary and
- frequency analysis
+ frequency analysis.
+
+ If `wordlist` is not specified, use
+ [`szyfrow.support.langauge_models.keywords`](support/language_models.html#szyfrow.support.language_models.keywords).
+
>>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
[(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...),
(('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
"""
+ if wordlist is None:
+ wordlist = keywords
+
with multiprocessing.Pool() as pool:
helper_args = [(message, word, wrap, fitness)
for word in wordlist
def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
plaintext = keyword_decipher(message, keyword, wrap_alphabet)
fit = fitness(plaintext)
- logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
- '{2} and decrypt starting: {3}'.format(keyword,
- wrap_alphabet, fit, sanitise(plaintext)[:50]))
return (keyword, wrap_alphabet), fit
-def monoalphabetic_break_hillclimbing(message,
+def monoalphabetic_break_hillclimbing_single(message,
max_iterations=20000,
plain_alphabet=None,
cipher_alphabet=None,
swap_index_finder=None,
fitness=Pletters, chunksize=1):
- return simulated_annealing_break(message,
+ """Break a monalphabetic substitution cipher using hillclimbing to
+ guess the keyword. Hillclimbing is done by using the simulated annealing
+ approach with a temperature of zero.
+
+ This version uses a single worker.
+ """
+ return monoalphabetic_sa_break(message,
workers=1,
initial_temperature=0,
max_iterations=max_iterations,
fitness=fitness, chunksize=chunksize)
-def monoalphabetic_break_hillclimbing_mp(message,
+def monoalphabetic_break_hillclimbing(message,
workers=10,
max_iterations=20000,
plain_alphabet=None,
cipher_alphabet=None,
swap_index_finder=None,
fitness=Pletters, chunksize=1):
- return simulated_annealing_break(message,
+ """Break a monalphabetic substitution cipher using hillclimbing to
+ guess the keyword. Hillclimbing is done by using the simulated annealing
+ approach with a temperature of zero.
+
+ This version uses a several workers.
+ """
+ return monoalphabetic_sa_break(message,
workers=workers,
initial_temperature=0,
max_iterations=max_iterations,
def gaussian_swap_index(a):
+ """Return an index to use as the partner of `a` in a swap. The partners
+ are drawn from a Gaussian distribution.
+ """
return (a + int(random.gauss(0, 4))) % 26
def uniform_swap_index(a):
+ """Return an index to use as the partner of `a` in a swap. The partners
+ are drawn from a uniform distribution.
+ """
return random.randrange(26)
-def simulated_annealing_break(message, workers=10,
+def monoalphabetic_sa_break(message, workers=10,
initial_temperature=200,
max_iterations=20000,
plain_alphabet=None,
cipher_alphabet=None,
swap_index_finder=None,
fitness=Ptrigrams, chunksize=1):
+ """Break a monalphabetic substitution cipher using simulated annealing to
+ guess the keyword. This function just sets up a stable of workers who
+ do the actual work, implemented as
+ `szyfrow.keyword_cipher.monoalphabetic_sa_break_worker`.
+
+ See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/)
+ for detail on how this works.
+ """
worker_args = []
ciphertext = sanitise(message)
if swap_index_finder is None:
initial_temperature, max_iterations, fitness,
i))
with multiprocessing.Pool() as pool:
- breaks = pool.starmap(simulated_annealing_break_worker,
+ breaks = pool.starmap(monoalphabetic_sa_break_worker,
worker_args, chunksize)
return max(breaks, key=lambda k: k[1])
-def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
+def monoalphabetic_sa_break_worker(message, plain_alphabet, cipher_alphabet,
swap_index_finder,
t0, max_iterations, fitness,
logID):
+ """One thread of a simulated annealing run.
+ See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/)
+ for detail on how this works.
+ """
def swap(letters, i, j):
if i > j:
i, j = j, i
# print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
sa_chance = 0
if (new_fitness > current_fitness or random.random() < sa_chance):
- # logger.debug('Simulated annealing: iteration {}, temperature {}, '
- # 'current alphabet {}, current_fitness {}, '
- # 'best_plaintext {}'.format(i, temperature, current_alphabet,
- # current_fitness, best_plaintext[:50]))
-
- # logger.debug('new_fit {}, current_fit {}, temp {}, sa_chance {}'.format(new_fitness, current_fitness, temperature, sa_chance))
current_fitness = new_fitness
current_alphabet = alphabet
best_alphabet = current_alphabet
best_fitness = current_fitness
best_plaintext = plaintext
- if i % 500 == 0:
- logger.debug('Simulated annealing worker {}: iteration {}, temperature {}, '
- 'current alphabet {}, plain alphabet {}, current_fitness {}, '
- 'best_plaintext {}'.format(logID, i, temperature, current_alphabet, plain_alphabet,
- current_fitness, plaintext[:50]))
+
temperature = max(temperature - dt, 0.001)
return best_alphabet, best_fitness # current_alphabet, current_fitness