X-Git-Url: https://git.njae.me.uk/?a=blobdiff_plain;f=cipher.py;h=84a4915e0f61952215671a42fe1aee81268ca259;hb=04150429da13371a54282c0d808ea3b45a73e18d;hp=db2caffe96c55a6d075709b5a98949e0d8456e4c;hpb=a30c5ec7f93201f5d0225727b46828b3e23f3a72;p=cipher-tools.git diff --git a/cipher.py b/cipher.py index db2caff..84a4915 100644 --- a/cipher.py +++ b/cipher.py @@ -3,7 +3,7 @@ import collections import norms import logging import math -from itertools import zip_longest, repeat +from itertools import zip_longest, repeat, cycle from segment import segment from multiprocessing import Pool @@ -51,6 +51,14 @@ for a in range(26): c = (a * b) % 26 modular_division_table[b][c] = a +def letters(text): + """Remove all non-alphabetic characters from a text + >>> letters('The Quick') + 'TheQuick' + >>> letters('The Quick BROWN fox jumped! over... the (9lazy) DOG') + 'TheQuickBROWNfoxjumpedoverthelazyDOG' + """ + return ''.join([c for c in text if c in string.ascii_letters]) def sanitise(text): """Remove all non-alphabetic characters and convert the text to lowercase @@ -60,8 +68,9 @@ def sanitise(text): >>> sanitise('The Quick BROWN fox jumped! over... the (9lazy) DOG') 'thequickbrownfoxjumpedoverthelazydog' """ - sanitised = [c.lower() for c in text if c in string.ascii_letters] - return ''.join(sanitised) + # sanitised = [c.lower() for c in text if c in string.ascii_letters] + # return ''.join(sanitised) + return letters(text).lower() def ngrams(text, n): """Returns all n-grams of a text @@ -160,11 +169,14 @@ def frequencies(text): ('h', 2), ('i', 1), ('j', 1), ('k', 1), ('l', 1), ('m', 1), ('n', 1), ('o', 4), ('p', 1), ('q', 1), ('r', 2), ('t', 2), ('u', 2), ('v', 1), ('w', 1), ('x', 1), ('y', 1), ('z', 1)] + >>> frequencies('abcdefabcdef')['x'] + 0 """ - counts = collections.defaultdict(int) - for c in text: - counts[c] += 1 - return counts + #counts = collections.defaultdict(int) + #for c in text: + # counts[c] += 1 + #return counts + return collections.Counter(c for c in text) letter_frequencies = frequencies def deduplicate(text): @@ -484,6 +496,26 @@ def column_transposition_worker(message, keyword, transposed_columns = untranspose(columns, transpositions) return combine_every_nth(transposed_columns) +def vigenere_encipher(message, keyword): + """Vigenere encipher + + >>> vigenere_encipher('hello', 'abc') + 'hfnlp' + """ + shifts = [ord(l) - ord('a') for l in sanitise(keyword)] + pairs = zip(message, cycle(shifts)) + return ''.join([caesar_encipher_letter(l, k) for l, k in pairs]) + +def vigenere_decipher(message, keyword): + """Vigenere decipher + + >>> vigenere_decipher('hfnlp', 'abc') + 'hello' + """ + shifts = [ord(l) - ord('a') for l in sanitise(keyword)] + pairs = zip(message, cycle(shifts)) + return ''.join([caesar_decipher_letter(l, k) for l, k in pairs]) + def caesar_break(message, @@ -617,10 +649,10 @@ def keyword_break_mp(message, for word in wordlist for wrap in range(3)] # Gotcha: the helper function here needs to be defined at the top level # (limitation of Pool.starmap) - breaks = pool.starmap(keyword_break_one, helper_args, chunksize) + breaks = pool.starmap(keyword_break_worker, helper_args, chunksize) return min(breaks, key=lambda k: k[1]) -def keyword_break_one(message, keyword, wrap_alphabet, metric, target_counts, +def keyword_break_worker(message, keyword, wrap_alphabet, metric, target_counts, message_frequency_scaling): plaintext = keyword_decipher(message, keyword, wrap_alphabet) counts = message_frequency_scaling(letter_frequencies(plaintext)) @@ -643,11 +675,12 @@ def scytale_break(message, """ best_key = 0 best_fit = float("inf") + ngram_length = len(next(iter(target_counts.keys()))) for key in range(1, 20): if len(message) % key == 0: plaintext = scytale_decipher(message, key) counts = message_frequency_scaling(frequencies( - ngrams(sanitise(plaintext), 2))) + ngrams(sanitise(plaintext), ngram_length))) fit = metric(target_counts, counts) logger.debug('Scytale break attempt using key {0} gives fit of ' '{1} and decrypt starting: {2}'.format(key, @@ -663,7 +696,6 @@ def scytale_break(message, def column_transposition_break(message, wordlist=keywords, metric=norms.euclidean_distance, - #test_ngram_length=2, target_counts=normalised_english_bigram_counts, message_frequency_scaling=norms.normalise): """Breaks a column transposition cipher using a dictionary and @@ -707,6 +739,117 @@ def column_transposition_break(message, return best_keyword, best_fit +def column_transposition_break_mp(message, + wordlist=keywords, + metric=norms.euclidean_distance, + target_counts=normalised_english_bigram_counts, + message_frequency_scaling=norms.normalise, + chunksize=500): + """Breaks a column transposition cipher using a dictionary and + n-gram frequency analysis + + >>> column_transposition_break_mp(column_transposition_encipher(sanitise( \ + "Turing's homosexuality resulted in a criminal prosecution in 1952, \ + when homosexual acts were still illegal in the United Kingdom. "), \ + 'encipher'), \ + wordlist=['encipher', 'keyword', 'fourteen']) # doctest: +ELLIPSIS + ('encipher', 0.898128626285...) + >>> column_transposition_break_mp(column_transposition_encipher(sanitise( \ + "Turing's homosexuality resulted in a criminal prosecution in 1952, " \ + "when homosexual acts were still illegal in the United Kingdom."), \ + 'encipher'), \ + wordlist=['encipher', 'keyword', 'fourteen'], \ + target_counts=normalised_english_trigram_counts) # doctest: +ELLIPSIS + ('encipher', 1.1958792913127...) + """ + ngram_length = len(next(iter(target_counts.keys()))) + with Pool() as pool: + helper_args = [(message, word, metric, target_counts, ngram_length, + message_frequency_scaling) + for word in wordlist] + # Gotcha: the helper function here needs to be defined at the top level + # (limitation of Pool.starmap) + breaks = pool.starmap(column_transposition_break_worker, helper_args, chunksize) + return min(breaks, key=lambda k: k[1]) + +def column_transposition_break_worker(message, keyword, metric, target_counts, + ngram_length, message_frequency_scaling): + plaintext = column_transposition_decipher(message, keyword) + counts = message_frequency_scaling(frequencies( + ngrams(sanitise(plaintext), ngram_length))) + fit = metric(target_counts, counts) + logger.debug('Column transposition break attempt using key {0} ' + 'gives fit of {1} and decrypt starting: {2}'.format( + keyword, fit, + sanitise(plaintext)[:50])) + return keyword, fit + +def vigenere_keyword_break(message, + wordlist=keywords, + metric=norms.euclidean_distance, + target_counts=normalised_english_counts, + message_frequency_scaling=norms.normalise): + """Breaks a vigenere cipher using a dictionary and + frequency analysis + + >>> vigenere_keyword_break(keyword_encipher('this is a test message for the ' \ + 'keyword decipherment', 'elephant', 1), \ + wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS + ('elephant', 0.7166585201707...) + """ + best_keyword = '' + best_fit = float("inf") + for keyword in wordlist: + plaintext = vigenere_decipher(message, keyword) + counts = message_frequency_scaling(letter_frequencies(plaintext)) + fit = metric(target_counts, counts) + logger.debug('Vigenere break attempt using key {0} ' + 'gives fit of {1} and decrypt starting: {2}'.format( + keyword, fit, + sanitise(plaintext)[:50])) + if fit < best_fit: + best_fit = fit + best_keyword = keyword + logger.info('Vigenere break best fit with key {0} gives fit ' + 'of {1} and decrypt starting: {2}'.format(best_keyword, + best_fit, sanitise( + vigenere_decipher(message, best_keyword))[:50])) + return best_keyword, best_fit + +def vigenere_keyword_break_mp(message, + wordlist=keywords, + metric=norms.euclidean_distance, + target_counts=normalised_english_counts, + message_frequency_scaling=norms.normalise, + chunksize=500): + """Breaks a vigenere cipher using a dictionary and + frequency analysis + + >>> vigenere_keyword_break_mp(keyword_encipher('this is a test message for the ' \ + 'keyword decipherment', 'elephant', 1), \ + wordlist=['cat', 'elephant', 'kangaroo']) # doctest: +ELLIPSIS + ('elephant', 0.7166585201707...) + """ + with Pool() as pool: + helper_args = [(message, word, metric, target_counts, + message_frequency_scaling) + for word in wordlist] + # Gotcha: the helper function here needs to be defined at the top level + # (limitation of Pool.starmap) + breaks = pool.starmap(vigenere_keyword_break_worker, helper_args, chunksize) + return min(breaks, key=lambda k: k[1]) + +def vigenere_keyword_break_worker(message, keyword, metric, target_counts, + message_frequency_scaling): + plaintext = vigenere_decipher(message, keyword) + counts = message_frequency_scaling(letter_frequencies(plaintext)) + fit = metric(target_counts, counts) + logger.debug('Vigenere keyword break attempt using key {0} gives fit of ' + '{1} and decrypt starting: {2}'.format(keyword, + fit, sanitise(plaintext)[:50])) + return keyword, fit + + if __name__ == "__main__": import doctest