Started on documentation
[szyfrow.git] / szyfrow / keyword_cipher.py
index 2cf3290463d50ae6fbe9bb93e901eb9ba3ec8a19..22d2607a6da748c38f4ec24377bb849f263678cb 100644 (file)
@@ -1,16 +1,22 @@
+"""Monoalphabetic substitution ciphers, mainly done by keyword. Enciphering
+and deciphering, and a couple of ways to break these ciphers.
+"""
 from enum import Enum
-# from itertools import starmap
 import multiprocessing
 import math
-from support.utilities import *
-from support.language_models import *
-
-from logger import logger
-import logging
-# logger.setLevel(logging.DEBUG)
+from szyfrow.support.utilities import *
+from szyfrow.support.language_models import *
 
 
 class KeywordWrapAlphabet(Enum):
+    """Ways to list the rest of the alphabet after use of a keyword.
+
+    * `from_a` : continue the alphabet from 'a': `bayescdfg...`
+    * `from_last`: continue from the last letter of the keyword: 
+        `bayestuvwxyzcdf...`
+    * `from_largest`: continue from the "largest" letter of the keyword:
+        `bayeszcdfg...`
+    """
     from_a = 1
     from_last = 2
     from_largest = 3
@@ -18,8 +24,9 @@ class KeywordWrapAlphabet(Enum):
 
 def keyword_cipher_alphabet_of(keyword, wrap_alphabet=KeywordWrapAlphabet.from_a):
     """Find the cipher alphabet given a keyword.
-    wrap_alphabet controls how the rest of the alphabet is added
-    after the keyword.
+
+    [`wrap_alphabet`](#szyfrow.keyword_cipher.KeywordWrapAlphabet) controls 
+    how the rest of the alphabet is added after the keyword.
 
     >>> keyword_cipher_alphabet_of('bayes')
     'bayescdfghijklmnopqrtuvwxz'
@@ -90,15 +97,21 @@ def keyword_decipher(message, keyword, wrap_alphabet=KeywordWrapAlphabet.from_a)
     return message.lower().translate(cipher_translation)
 
 
-def keyword_break(message, wordlist=keywords, fitness=Pletters):
+def keyword_break_single_thread(message, wordlist=None, fitness=Pletters):
     """Breaks a keyword substitution cipher using a dictionary and
     frequency analysis.
 
+    If `wordlist` is not specified, use 
+    [`szyfrow.support.langauge_models.keywords`](support/language_models.html#szyfrow.support.language_models.keywords).
+
     >>> keyword_break(keyword_encipher('this is a test message for the ' \
           'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
           wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS
     (('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...)
     """
+    if wordlist is None:
+      wordlist = keywords
+
     best_keyword = ''
     best_wrap_alphabet = True
     best_fit = float("-inf")
@@ -106,25 +119,20 @@ def keyword_break(message, wordlist=keywords, fitness=Pletters):
         for keyword in wordlist:
             plaintext = keyword_decipher(message, keyword, wrap_alphabet)
             fit = fitness(plaintext)
-            logger.debug('Keyword break attempt using key {0} (wrap={1}) '
-                         'gives fit of {2} and decrypt starting: {3}'.format(
-                             keyword, wrap_alphabet, fit,
-                             sanitise(plaintext)[:50]))
             if fit > best_fit:
                 best_fit = fit
                 best_keyword = keyword
                 best_wrap_alphabet = wrap_alphabet
-    logger.info('Keyword break best fit with key {0} (wrap={1}) gives fit of '
-                '{2} and decrypt starting: {3}'.format(best_keyword,
-                    best_wrap_alphabet, best_fit, sanitise(
-                        keyword_decipher(message, best_keyword,
-                                         best_wrap_alphabet))[:50]))
     return (best_keyword, best_wrap_alphabet), best_fit
 
-def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
+def keyword_break(message, wordlist=None, fitness=Pletters,
                      number_of_solutions=1, chunksize=500):
     """Breaks a keyword substitution cipher using a dictionary and
-    frequency analysis
+    frequency analysis.
+
+    If `wordlist` is not specified, use 
+    [`szyfrow.support.langauge_models.keywords`](support/language_models.html#szyfrow.support.language_models.keywords).
+
 
     >>> keyword_break_mp(keyword_encipher('this is a test message for the ' \
           'keyword decipherment', 'elephant', KeywordWrapAlphabet.from_last), \
@@ -137,6 +145,9 @@ def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
     [(('elephant', <KeywordWrapAlphabet.from_last: 2>), -52.834575011...), 
     (('elephant', <KeywordWrapAlphabet.from_largest: 3>), -52.834575011...)]
     """
+    if wordlist is None:
+        wordlist = keywords
+
     with multiprocessing.Pool() as pool:
         helper_args = [(message, word, wrap, fitness)
                        for word in wordlist
@@ -152,19 +163,22 @@ def keyword_break_mp(message, wordlist=keywords, fitness=Pletters,
 def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
     plaintext = keyword_decipher(message, keyword, wrap_alphabet)
     fit = fitness(plaintext)
-    logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
-                 '{2} and decrypt starting: {3}'.format(keyword, 
-                     wrap_alphabet, fit, sanitise(plaintext)[:50]))
     return (keyword, wrap_alphabet), fit
 
 
-def monoalphabetic_break_hillclimbing(message, 
+def monoalphabetic_break_hillclimbing_single(message, 
                               max_iterations=20000,
                               plain_alphabet=None, 
                               cipher_alphabet=None, 
                               swap_index_finder=None,
                               fitness=Pletters, chunksize=1):
-    return simulated_annealing_break(message, 
+    """Break a monalphabetic substitution cipher using hillclimbing to
+    guess the keyword. Hillclimbing is done by using the simulated annealing
+    approach with a temperature of zero.
+
+    This version uses a single worker.
+    """
+    return monoalphabetic_sa_break(message, 
                               workers=1, 
                               initial_temperature=0,
                               max_iterations=max_iterations,
@@ -174,14 +188,20 @@ def monoalphabetic_break_hillclimbing(message,
                               fitness=fitness, chunksize=chunksize)
 
 
-def monoalphabetic_break_hillclimbing_mp(message, 
+def monoalphabetic_break_hillclimbing(message, 
                               workers=10, 
                               max_iterations=20000,
                               plain_alphabet=None, 
                               cipher_alphabet=None, 
                               swap_index_finder=None,
                               fitness=Pletters, chunksize=1):
-    return simulated_annealing_break(message, 
+    """Break a monalphabetic substitution cipher using hillclimbing to
+    guess the keyword. Hillclimbing is done by using the simulated annealing
+    approach with a temperature of zero.
+
+    This version uses a several workers.
+    """
+    return monoalphabetic_sa_break(message, 
                               workers=workers, 
                               initial_temperature=0,
                               max_iterations=max_iterations,
@@ -192,18 +212,32 @@ def monoalphabetic_break_hillclimbing_mp(message,
 
 
 def gaussian_swap_index(a):
+    """Return an index to use as the partner of `a` in a swap. The partners
+    are drawn from a Gaussian distribution.
+    """
     return (a + int(random.gauss(0, 4))) % 26
 
 def uniform_swap_index(a):
+    """Return an index to use as the partner of `a` in a swap. The partners
+    are drawn from a uniform distribution.
+    """
     return random.randrange(26)
 
-def simulated_annealing_break(message, workers=10, 
+def monoalphabetic_sa_break(message, workers=10, 
                               initial_temperature=200,
                               max_iterations=20000,
                               plain_alphabet=None, 
                               cipher_alphabet=None, 
                               swap_index_finder=None,
                               fitness=Ptrigrams, chunksize=1):
+    """Break a monalphabetic substitution cipher using simulated annealing to
+    guess the keyword. This function just sets up a stable of workers who
+    do the actual work, implemented as 
+    `szyfrow.keyword_cipher.monoalphabetic_sa_break_worker`.
+
+    See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/)
+    for detail on how this works.
+    """
     worker_args = []
     ciphertext = sanitise(message)
     if swap_index_finder is None:
@@ -231,15 +265,19 @@ def simulated_annealing_break(message, workers=10,
                             initial_temperature, max_iterations, fitness,
                             i))
     with multiprocessing.Pool() as pool:
-        breaks = pool.starmap(simulated_annealing_break_worker,
+        breaks = pool.starmap(monoalphabetic_sa_break_worker,
                               worker_args, chunksize)
     return max(breaks, key=lambda k: k[1])
 
 
-def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet, 
+def monoalphabetic_sa_break_worker(message, plain_alphabet, cipher_alphabet, 
                                      swap_index_finder,
                                      t0, max_iterations, fitness,
                                      logID):
+    """One thread of a simulated annealing run. 
+    See a [post on simulated annealing](https://work.njae.me.uk/2019/07/08/simulated-annealing-and-breaking-substitution-ciphers/)
+    for detail on how this works.
+    """
     def swap(letters, i, j):
         if i > j:
             i, j = j, i
@@ -278,12 +316,6 @@ def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
             # print('exception triggered: new_fit {}, current_fit {}, temp {}'.format(new_fitness, current_fitness, temperature))
             sa_chance = 0
         if (new_fitness > current_fitness or random.random() < sa_chance):
-            # logger.debug('Simulated annealing: iteration {}, temperature {}, '
-            #     'current alphabet {}, current_fitness {}, '
-            #     'best_plaintext {}'.format(i, temperature, current_alphabet, 
-            #     current_fitness, best_plaintext[:50]))
-
-            # logger.debug('new_fit {}, current_fit {}, temp {}, sa_chance {}'.format(new_fitness, current_fitness, temperature, sa_chance))
             current_fitness = new_fitness
             current_alphabet = alphabet
             
@@ -291,11 +323,7 @@ def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
             best_alphabet = current_alphabet
             best_fitness = current_fitness
             best_plaintext = plaintext
-        if i % 500 == 0:
-            logger.debug('Simulated annealing worker {}: iteration {}, temperature {}, '
-                'current alphabet {}, plain alphabet {}, current_fitness {}, '
-                'best_plaintext {}'.format(logID, i, temperature, current_alphabet, plain_alphabet,
-                current_fitness, plaintext[:50]))
+
         temperature = max(temperature - dt, 0.001)
 
     return best_alphabet, best_fitness # current_alphabet, current_fitness