Changed the break routines, no to make them all work again
authorNeil Smith <neil.git@njae.me.uk>
Wed, 5 Mar 2014 21:37:40 +0000 (16:37 -0500)
committerNeil Smith <neil.git@njae.me.uk>
Wed, 5 Mar 2014 21:37:40 +0000 (16:37 -0500)
cipherbreak.py
language_models.py

index 03165e5ecca6985e6a2ead4912520e8d6000d50b..06a66eb555c9a02723e96e3966876eb1b843bc82 100644 (file)
@@ -86,10 +86,7 @@ def caesar_break(message, fitness=Pletters):
                     caesar_decipher(sanitised_message, best_shift)[:50]))
     return best_shift, best_fit
 
-def affine_break(message, 
-                 metric=norms.euclidean_distance, 
-                 target_counts=normalised_english_counts, 
-                 message_frequency_scaling=norms.normalise):
+def affine_break(message, fitness=Pletters):
     """Breaks an affine cipher using frequency analysis
     
     >>> affine_break('lmyfu bkuusd dyfaxw claol psfaom jfasd snsfg jfaoe ls ' \
@@ -104,19 +101,18 @@ def affine_break(message,
     best_multiplier = 0
     best_adder = 0
     best_one_based = True
-    best_fit = float("inf")
+    best_fit = float("-inf")
     for one_based in [True, False]:
         for multiplier in range(1, 26, 2):
             for adder in range(26):
                 plaintext = affine_decipher(sanitised_message, 
                                             multiplier, adder, one_based)
-                counts = message_frequency_scaling(frequencies(plaintext))
-                fit = metric(target_counts, counts)
+                fit = fitness(plaintext)
                 logger.debug('Affine break attempt using key {0}x+{1} ({2}) '
                              'gives fit of {3} and decrypt starting: {4}'.
                              format(multiplier, adder, one_based, fit, 
                                     plaintext[:50]))
-                if fit < best_fit:
+                if fit > best_fit:
                     best_fit = fit
                     best_multiplier = multiplier
                     best_adder = adder
@@ -128,11 +124,7 @@ def affine_break(message,
                         best_adder, best_one_based)[:50]))
     return (best_multiplier, best_adder, best_one_based), best_fit
 
-def keyword_break(message, 
-                  wordlist=keywords, 
-                  metric=norms.euclidean_distance, 
-                  target_counts=normalised_english_counts, 
-                  message_frequency_scaling=norms.normalise):
+def keyword_break(message, wordlist=keywords, fitness=Pletters):
     """Breaks a keyword substitution cipher using a dictionary and 
     frequency analysis
 
@@ -143,17 +135,16 @@ def keyword_break(message,
     """
     best_keyword = ''
     best_wrap_alphabet = True
-    best_fit = float("inf")
+    best_fit = float("-inf")
     for wrap_alphabet in range(3):
         for keyword in wordlist:
             plaintext = keyword_decipher(message, keyword, wrap_alphabet)
-            counts = message_frequency_scaling(frequencies(plaintext))
-            fit = metric(target_counts, counts)
+            fit = fitness(plaintext)
             logger.debug('Keyword break attempt using key {0} (wrap={1}) '
                          'gives fit of {2} and decrypt starting: {3}'.format(
                              keyword, wrap_alphabet, fit, 
                              sanitise(plaintext)[:50]))
-            if fit < best_fit:
+            if fit > best_fit:
                 best_fit = fit
                 best_keyword = keyword
                 best_wrap_alphabet = wrap_alphabet
@@ -164,12 +155,7 @@ def keyword_break(message,
                                          best_wrap_alphabet))[:50]))
     return (best_keyword, best_wrap_alphabet), best_fit
 
-def keyword_break_mp(message, 
-                     wordlist=keywords, 
-                     metric=norms.euclidean_distance, 
-                     target_counts=normalised_english_counts, 
-                     message_frequency_scaling=norms.normalise, 
-                     chunksize=500):
+def keyword_break_mp(message, wordlist=keywords, fitness=Pletters, chunksize=500):
     """Breaks a keyword substitution cipher using a dictionary and 
     frequency analysis
 
@@ -179,28 +165,22 @@ def keyword_break_mp(message,
     (('elephant', 1), 0.106645344886...)
     """
     with Pool() as pool:
-        helper_args = [(message, word, wrap, metric, target_counts, 
-                        message_frequency_scaling) 
+        helper_args = [(message, word, wrap, fitness) 
                        for word in wordlist for wrap in range(3)]
         # Gotcha: the helper function here needs to be defined at the top level 
         #   (limitation of Pool.starmap)
         breaks = pool.starmap(keyword_break_worker, helper_args, chunksize) 
-        return min(breaks, key=lambda k: k[1])
+        return max(breaks, key=lambda k: k[1])
 
-def keyword_break_worker(message, keyword, wrap_alphabet, metric, target_counts, 
-                      message_frequency_scaling):
+def keyword_break_worker(message, keyword, wrap_alphabet, fitness):
     plaintext = keyword_decipher(message, keyword, wrap_alphabet)
-    counts = message_frequency_scaling(frequencies(plaintext))
-    fit = metric(target_counts, counts)
+    fit = fitness(plaintext)
     logger.debug('Keyword break attempt using key {0} (wrap={1}) gives fit of '
                  '{2} and decrypt starting: {3}'.format(keyword, 
                      wrap_alphabet, fit, sanitise(plaintext)[:50]))
     return (keyword, wrap_alphabet), fit
 
-def scytale_break(message, 
-                  metric=norms.euclidean_distance, 
-                  target_counts=normalised_english_bigram_counts, 
-                  message_frequency_scaling=norms.normalise):
+def scytale_break(message, fitness=Pbigrams):
     """Breaks a Scytale cipher
     
     >>> scytale_break('tfeulchtrtteehwahsdehneoifeayfsondmwpltmaoalhikotoere' \
@@ -209,18 +189,15 @@ def scytale_break(message,
     (6, 0.092599933059...)
     """
     best_key = 0
-    best_fit = float("inf")
-    ngram_length = len(next(iter(target_counts.keys())))
+    best_fit = float("-inf")
     for key in range(1, 20):
         if len(message) % key == 0:
             plaintext = scytale_decipher(message, key)
-            counts = message_frequency_scaling(frequencies(
-                         ngrams(sanitise(plaintext), ngram_length)))
-            fit = metric(target_counts, counts)
+            fit = fitness(sanitise(plaintext))
             logger.debug('Scytale break attempt using key {0} gives fit of '
                          '{1} and decrypt starting: {2}'.format(key, 
                              fit, sanitise(plaintext)[:50]))
-            if fit < best_fit:
+            if fit > best_fit:
                 best_fit = fit
                 best_key = key
     logger.info('Scytale break best fit with key {0} gives fit of {1} and '
@@ -229,12 +206,8 @@ def scytale_break(message,
     return best_key, best_fit
 
 
-def column_transposition_break_mp(message, 
-                     translist=transpositions, 
-                     metric=norms.euclidean_distance, 
-                     target_counts=normalised_english_bigram_counts, 
-                     message_frequency_scaling=norms.normalise, 
-                     chunksize=500):
+def column_transposition_break_mp(message, translist=transpositions, 
+                     fitness=Pbigrams, chunksize=500):
     """Breaks a column transposition cipher using a dictionary and 
     n-gram frequency analysis
     """
@@ -264,23 +237,21 @@ def column_transposition_break_mp(message,
     #     target_counts=normalised_english_trigram_counts) # doctest: +ELLIPSIS
     # (((2, 0, 5, 3, 1, 4, 6), False), 0.0592259560...)
     # """
-    ngram_length = len(next(iter(target_counts.keys())))
     with Pool() as pool:
-        helper_args = [(message, trans, columnwise, metric, target_counts, ngram_length,
-                        message_frequency_scaling
-                       for trans in translist.keys() for columnwise in [True, False]]
+        helper_args = [(message, trans, columnwise, fitness) 
+                       for trans in translist.keys(
+                       for columnwise in [True, False]]
         # Gotcha: the helper function here needs to be defined at the top level 
         #   (limitation of Pool.starmap)
-        breaks = pool.starmap(column_transposition_break_worker, helper_args, chunksize) 
-        return min(breaks, key=lambda k: k[1])
+        breaks = pool.starmap(column_transposition_break_worker, 
+          helper_args, chunksize) 
+        return max(breaks, key=lambda k: k[1])
 column_transposition_break = column_transposition_break_mp
 
-def column_transposition_break_worker(message, transposition, columnwise, metric, target_counts, 
-                      ngram_length, message_frequency_scaling):
+def column_transposition_break_worker(message, transposition, columnwise, 
+                                        fitness):
     plaintext = column_transposition_decipher(message, transposition, columnwise=columnwise)
-    counts = message_frequency_scaling(frequencies(
-                         ngrams(sanitise(plaintext), ngram_length)))
-    fit = metric(target_counts, counts)
+    fit = fitness(sanitise(plaintext))
     logger.debug('Column transposition break attempt using key {0} '
                          'gives fit of {1} and decrypt starting: {2}'.format(
                              transposition, fit, 
@@ -288,36 +259,28 @@ def column_transposition_break_worker(message, transposition, columnwise, metric
     return (transposition, columnwise), fit
 
 
-def transposition_break_exhaustive(message):
+def transposition_break_exhaustive(message, fitness=Pbigrams):
     best_transposition = ''
-    best_pw = -float('inf')
+    best_pw = float('-inf')
     for keylength in range(1, 21):
         if len(message) % keylength == 0:
             for transposition in permutations(range(keylength)):
                 for columnwise in [True, False]:
                     plaintext = column_transposition_decipher(message, 
                         transposition, columnwise=columnwise)
-                    # pw = Pwords(segment(plaintext))
-                    pw = sum([log10(bigram_likelihood(b, 
-                                              normalised_english_bigram_counts, 
-                                              normalised_english_counts))
-                                           for b in ngrams(plaintext, 2)])
+                    fit=fitness(plaintext)
                     logger.debug('Column transposition break attempt using key {0} {1} '
                          'gives fit of {2} and decrypt starting: {3}'.format(
                              transposition, columnwise, pw, 
                              sanitise(plaintext)[:50]))
-                    if pw > best_pw:
+                    if fit > best_fit:
                         best_transposition = transposition
                         best_columnwise = columnwise
-                        best_pw = pw
+                        best_fit = fit
     return (best_transposition, best_columnwise), best_pw
 
 
-def vigenere_keyword_break(message, 
-                  wordlist=keywords, 
-                  metric=norms.euclidean_distance, 
-                  target_counts=normalised_english_counts, 
-                  message_frequency_scaling=norms.normalise):
+def vigenere_keyword_break(message, wordlist=keywords, fitness=Pletters):
     """Breaks a vigenere cipher using a dictionary and 
     frequency analysis
     
@@ -327,16 +290,15 @@ def vigenere_keyword_break(message,
     ('cat', 0.15965224935...)
     """
     best_keyword = ''
-    best_fit = float("inf")
+    best_fit = float("-inf")
     for keyword in wordlist:
         plaintext = vigenere_decipher(message, keyword)
-        counts = message_frequency_scaling(frequencies(plaintext))
-        fit = metric(target_counts, counts)
+        fit = fitness(plaintext)
         logger.debug('Vigenere break attempt using key {0} '
                          'gives fit of {1} and decrypt starting: {2}'.format(
                              keyword, fit, 
                              sanitise(plaintext)[:50]))
-        if fit < best_fit:
+        if fit > best_fit:
             best_fit = fit
             best_keyword = keyword
     logger.info('Vigenere break best fit with key {0} gives fit '
@@ -345,11 +307,7 @@ def vigenere_keyword_break(message,
                         vigenere_decipher(message, best_keyword))[:50]))
     return best_keyword, best_fit
 
-def vigenere_keyword_break_mp(message,
-                     wordlist=keywords,
-                     metric=norms.euclidean_distance,
-                     target_counts=normalised_english_counts,
-                     message_frequency_scaling=norms.normalise,
+def vigenere_keyword_break_mp(message, wordlist=keywords, fitness=Pletters, 
                      chunksize=500):
     """Breaks a vigenere cipher using a dictionary and 
     frequency analysis
@@ -360,19 +318,16 @@ def vigenere_keyword_break_mp(message,
     ('cat', 0.159652249358...)
     """
     with Pool() as pool:
-        helper_args = [(message, word, metric, target_counts, 
-                        message_frequency_scaling) 
+        helper_args = [(message, word, fitness) 
                        for word in wordlist]
         # Gotcha: the helper function here needs to be defined at the top level 
         #   (limitation of Pool.starmap)
         breaks = pool.starmap(vigenere_keyword_break_worker, helper_args, chunksize) 
         return min(breaks, key=lambda k: k[1])
 
-def vigenere_keyword_break_worker(message, keyword, metric, target_counts, 
-                      message_frequency_scaling):
+def vigenere_keyword_break_worker(message, keyword, fitness):
     plaintext = vigenere_decipher(message, keyword)
-    counts = message_frequency_scaling(frequencies(plaintext))
-    fit = metric(target_counts, counts)
+    fit = fitness(plaintext)
     logger.debug('Vigenere keyword break attempt using key {0} gives fit of '
                  '{1} and decrypt starting: {2}'.format(keyword, 
                      fit, sanitise(plaintext)[:50]))
@@ -380,30 +335,29 @@ def vigenere_keyword_break_worker(message, keyword, metric, target_counts,
 
 
 
-def vigenere_frequency_break(message,
-                  metric=norms.euclidean_distance, 
-                  target_counts=normalised_english_counts, 
-                  message_frequency_scaling=norms.normalise):
+def vigenere_frequency_break(message, fitness=Pletters):
     """Breaks a Vigenere cipher with frequency analysis
 
     >>> vigenere_frequency_break(vigenere_encipher(sanitise("It is time to " \
             "run. She is ready and so am I. I stole Daniel's pocketbook this " \
             "afternoon when he left his jacket hanging on the easel in the " \
-            "attic."), 'florence')) # doctest: +ELLIPSIS
+            "attic. I jump every time I hear a footstep on the stairs, " \
+            "certain that the theft has been discovered and that I will " \
+            "and that I will be caught. The SS officer visits less often now " \
+            "that he is sure"), 'florence')) # doctest: +ELLIPSIS
     ('florence', 0.077657073...)
     """
-    best_fit = float("inf")
+    best_fit = float("-inf")
     best_key = ''
     sanitised_message = sanitise(message)
     for trial_length in range(1, 20):
         splits = every_nth(sanitised_message, trial_length)
         key = ''.join([chr(caesar_break(s)[0] + ord('a')) for s in splits])
         plaintext = vigenere_decipher(sanitised_message, key)
-        counts = message_frequency_scaling(frequencies(plaintext))
-        fit = metric(target_counts, counts)
+        fit = fitness(plaintext)
         logger.debug('Vigenere key length of {0} ({1}) gives fit of {2}'.
                      format(trial_length, key, fit))
-        if fit < best_fit:
+        if fit > best_fit:
             best_fit = fit
             best_key = key
     logger.info('Vigenere break best fit with key {0} gives fit '
@@ -412,30 +366,29 @@ def vigenere_frequency_break(message,
                         vigenere_decipher(message, best_key))[:50]))
     return best_key, best_fit
 
-def beaufort_frequency_break(message,
-                  metric=norms.euclidean_distance, 
-                  target_counts=normalised_english_counts, 
-                  message_frequency_scaling=norms.normalise):
+def beaufort_frequency_break(message, fitness=Pletters):
     """Breaks a Beaufort cipher with frequency analysis
 
     >>> beaufort_frequency_break(beaufort_encipher(sanitise("It is time to " \
             "run. She is ready and so am I. I stole Daniel's pocketbook this " \
             "afternoon when he left his jacket hanging on the easel in the " \
-            "attic."), 'florence')) # doctest: +ELLIPSIS
+            "attic. I jump every time I hear a footstep on the stairs, " \
+            "certain that the theft has been discovered and that I will " \
+            "and that I will be caught. The SS officer visits less often now " \
+            "that he is sure"), 'florence')) # doctest: +ELLIPSIS
     ('florence', 0.077657073...)
     """
-    best_fit = float("inf")
+    best_fit = float("-inf")
     best_key = ''
     sanitised_message = sanitise(message)
     for trial_length in range(1, 20):
         splits = every_nth(sanitised_message, trial_length)
         key = ''.join([chr(caesar_break(s)[0] + ord('a')) for s in splits])
         plaintext = beaufort_decipher(sanitised_message, key)
-        counts = message_frequency_scaling(frequencies(plaintext))
-        fit = metric(target_counts, counts)
+        fit = fitness(plaintext)
         logger.debug('Beaufort key length of {0} ({1}) gives fit of {2}'.
                      format(trial_length, key, fit))
-        if fit < best_fit:
+        if fit > best_fit:
             best_fit = fit
             best_key = key
     logger.info('Beaufort break best fit with key {0} gives fit '
index d45738657a356a38c5139838da665044fe9257e6..1b90ac2ca425c1a246b410ac2f7a588931105f00 100644 (file)
@@ -88,6 +88,19 @@ def random_english_letter():
        return weighted_choice(normalised_english_counts)
 
 
+def ngrams(text, n):
+    """Returns all n-grams of a text
+    
+    >>> ngrams(sanitise('the quick brown fox'), 2) # doctest: +NORMALIZE_WHITESPACE
+    ['th', 'he', 'eq', 'qu', 'ui', 'ic', 'ck', 'kb', 'br', 'ro', 'ow', 'wn', 
+     'nf', 'fo', 'ox']
+    >>> ngrams(sanitise('the quick brown fox'), 4) # doctest: +NORMALIZE_WHITESPACE
+    ['theq', 'hequ', 'equi', 'quic', 'uick', 'ickb', 'ckbr', 'kbro', 'brow', 
+     'rown', 'ownf', 'wnfo', 'nfox']
+    """
+    return [text[i:i+n] for i in range(len(text)-n+1)]
+
+
 class Pdist(dict):
     """A probability distribution estimated from counts in datafile.
     Values are stored and returned as log probabilities.
@@ -108,6 +121,7 @@ def log_probability_of_unknown_word(key, N):
 
 Pw = Pdist(datafile('count_1w.txt'), log_probability_of_unknown_word)
 Pl = Pdist(datafile('count_1l.txt'), lambda _k, _N: 0)
+Pl2 = Pdist(datafile('count_2l.txt'), lambda _k, _N: 0)
 
 def Pwords(words): 
     """The Naive Bayes log probability of a sequence of words.
@@ -119,6 +133,11 @@ def Pletters(letters):
     """
     return sum(Pl[l.lower()] for l in letters)
 
+def Pbigrams(letters):
+    """The Naive Bayes log probability of the bigrams formed from a sequence 
+    of letters.
+    """
+    return sum(P2l[p] for p in ngrams(letters, 2))
 
 
 def cosine_distance_score(text):