-from utilities import *
-from language_models import *
from enum import Enum
# from itertools import starmap
import multiprocessing
+import math
+from support.utilities import *
+from support.language_models import *
from logger import logger
+import logging
+# logger.setLevel(logging.DEBUG)
class KeywordWrapAlphabet(Enum):
max_iterations=20000,
plain_alphabet=None,
cipher_alphabet=None,
+ swap_index_finder=None,
fitness=Pletters, chunksize=1):
return simulated_annealing_break(message,
workers=1,
max_iterations=max_iterations,
plain_alphabet=plain_alphabet,
cipher_alphabet=cipher_alphabet,
+ swap_index_finder=swap_index_finder,
fitness=fitness, chunksize=chunksize)
max_iterations=20000,
plain_alphabet=None,
cipher_alphabet=None,
+ swap_index_finder=None,
fitness=Pletters, chunksize=1):
return simulated_annealing_break(message,
workers=workers,
max_iterations=max_iterations,
plain_alphabet=plain_alphabet,
cipher_alphabet=cipher_alphabet,
+ swap_index_finder=swap_index_finder,
fitness=fitness, chunksize=chunksize)
+def gaussian_swap_index(a):
+ return (a + int(random.gauss(0, 4))) % 26
+
+def uniform_swap_index(a):
+ return random.randrange(26)
+
def simulated_annealing_break(message, workers=10,
initial_temperature=200,
max_iterations=20000,
plain_alphabet=None,
cipher_alphabet=None,
- fitness=Pletters, chunksize=1):
+ swap_index_finder=None,
+ fitness=Ptrigrams, chunksize=1):
worker_args = []
ciphertext = sanitise(message)
+ if swap_index_finder is None:
+ swap_index_finder = gaussian_swap_index
for i in range(workers):
- if not plain_alphabet:
- plain_alphabet = string.ascii_lowercase
- if not cipher_alphabet:
- cipher_alphabet = list(string.ascii_lowercase)
- random.shuffle(cipher_alphabet)
- cipher_alphabet = cat(cipher_alphabet)
- worker_args.append((ciphertext, plain_alphabet, cipher_alphabet,
- initial_temperature, max_iterations, fitness))
+ if plain_alphabet is None:
+ used_plain_alphabet = string.ascii_lowercase
+ else:
+ used_plain_alphabet = plain_alphabet
+ if cipher_alphabet is None:
+ used_cipher_alphabet = list(string.ascii_lowercase)
+ random.shuffle(used_cipher_alphabet)
+ used_cipher_alphabet = cat(used_cipher_alphabet)
+ else:
+ used_cipher_alphabet = cipher_alphabet
+ # if not plain_alphabet:
+ # plain_alphabet = string.ascii_lowercase
+ # if not cipher_alphabet:
+ # cipher_alphabet = list(string.ascii_lowercase)
+ # random.shuffle(cipher_alphabet)
+ # cipher_alphabet = cat(cipher_alphabet)
+ worker_args.append((ciphertext, used_plain_alphabet, used_cipher_alphabet,
+ swap_index_finder,
+ initial_temperature, max_iterations, fitness,
+ i))
with multiprocessing.Pool() as pool:
breaks = pool.starmap(simulated_annealing_break_worker,
worker_args, chunksize)
def simulated_annealing_break_worker(message, plain_alphabet, cipher_alphabet,
- t0, max_iterations, fitness):
+ swap_index_finder,
+ t0, max_iterations, fitness,
+ logID):
def swap(letters, i, j):
if i > j:
i, j = j, i
# print('starting for', max_iterations)
for i in range(max_iterations):
swap_a = random.randrange(26)
- swap_b = (swap_a + int(random.gauss(0, 4))) % 26
+ # swap_b = (swap_a + int(random.gauss(0, 4))) % 26
+ swap_b = swap_index_finder(swap_a)
alphabet = swap(current_alphabet, swap_a, swap_b)
cipher_translation = ''.maketrans(alphabet, plain_alphabet)
plaintext = message.translate(cipher_translation)
best_fitness = current_fitness
best_plaintext = plaintext
if i % 500 == 0:
- logger.debug('Simulated annealing: iteration {}, temperature {}, '
- 'current alphabet {}, current_fitness {}, '
- 'best_plaintext {}'.format(i, temperature, current_alphabet,
+ logger.debug('Simulated annealing worker {}: iteration {}, temperature {}, '
+ 'current alphabet {}, plain alphabet {}, current_fitness {}, '
+ 'best_plaintext {}'.format(logID, i, temperature, current_alphabet, plain_alphabet,
current_fitness, plaintext[:50]))
temperature = max(temperature - dt, 0.001)