diff --git a/test_zippy_detect.py b/test_zippy_detect.py index dc7a28e..1ecd5b0 100644 --- a/test_zippy_detect.py +++ b/test_zippy_detect.py @@ -2,7 +2,7 @@ import pytest, os, jsonlines, csv from warnings import warn -from zippy import run_on_file_chunked, run_on_text_chunked, PRELUDE_STR, LzmaLlmDetector, CompressionEngine, ZlibLlmDetector, ENGINE +from zippy import Zippy, EnsembledZippy, PRELUDE_STR, LzmaLlmDetector, BrotliLlmDetector, ZlibLlmDetector, CompressionEngine import zippy AI_SAMPLE_DIR = 'samples/llm-generated/' @@ -16,19 +16,34 @@ human_files = os.listdir(HUMAN_SAMPLE_DIR) CONFIDENCE_THRESHOLD : float = 0.00 # What confidence to treat as error vs warning -if ENGINE == CompressionEngine.LZMA: - PRELUDE_RATIO = LzmaLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio -elif ENGINE == CompressionEngine.ZLIB: - PRELUDE_RATIO = ZlibLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio +# Bool on whether to ensemble the models or run a single model +ENSEMBLE = True + +if not ENSEMBLE: + # What compression engine to use for the test + ENGINE = CompressionEngine.LZMA + + if ENGINE == CompressionEngine.LZMA: + PRELUDE_RATIO = LzmaLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio + elif ENGINE == CompressionEngine.ZLIB: + PRELUDE_RATIO = ZlibLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio + elif ENGINE == CompressionEngine.BROTLI: + PRELUDE_RATIO = BrotliLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio + + zippy = Zippy(ENGINE) + +else: + zippy = EnsembledZippy() + PRELUDE_RATIO = None def test_training_file(record_property): - (classification, score) = run_on_file_chunked('ai-generated.txt') + (classification, score) = zippy.run_on_file_chunked('ai-generated.txt') record_property("score", str(score)) assert classification == 'AI', 'The training corpus should always be detected as AI-generated... since it is' @pytest.mark.parametrize('f', human_files) def test_human_samples(f, record_property): - (classification, score) = run_on_file_chunked(HUMAN_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_file_chunked(HUMAN_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) if score > CONFIDENCE_THRESHOLD: assert classification == 'Human', f + ' is a human-generated file, misclassified as AI-generated with confidence ' + str(round(score, 8)) @@ -40,7 +55,7 @@ def test_human_samples(f, record_property): @pytest.mark.parametrize('f', ai_files) def test_llm_sample(f, record_property): - (classification, score) = run_on_file_chunked(AI_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_file_chunked(AI_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) if score > CONFIDENCE_THRESHOLD: assert classification == 'AI', f + ' is an LLM-generated file, misclassified as human-generated with confidence ' + str(round(score, 8)) @@ -59,7 +74,7 @@ with jsonlines.open(HUMAN_JSONL_FILE) as reader: @pytest.mark.parametrize('i', human_samples[0:NUM_JSONL_SAMPLES]) def test_human_jsonl(i, record_property): - (classification, score) = run_on_text_chunked(i.get('text', ''), prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_text_chunked(i.get('text', ''), prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) assert classification == 'Human', HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' (len: ' + str(i.get('length', -1)) + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8)) @@ -98,13 +113,13 @@ with jsonlines.open(NEWS_JSONL_FILE) as reader: @pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES]) def test_humannews_jsonl(i, record_property): - (classification, score) = run_on_text_chunked(i.get('human', ''), prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_text_chunked(i.get('human', ''), prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) assert classification == 'Human', NEWS_JSONL_FILE + ' is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8)) @pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES]) def test_chatgptnews_jsonl(i, record_property): - (classification, score) = run_on_text_chunked(i.get('chatgpt', ''), prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_text_chunked(i.get('chatgpt', ''), prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) assert classification == 'AI', NEWS_JSONL_FILE + ' is a AI-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8)) @@ -117,7 +132,7 @@ with jsonlines.open(CHEAT_HUMAN_JSONL_FILE) as reader: @pytest.mark.parametrize('i', ch_samples[0:NUM_JSONL_SAMPLES]) def test_cheat_human_jsonl(i, record_property): - (classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) assert classification == 'Human', CHEAT_HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' [' + str(len(i.get('abstract', ''))) + '] (title: ' + i.get('title', "").replace('\n', ' ')[:15] + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8)) @@ -130,7 +145,7 @@ with jsonlines.open(CHEAT_GEN_JSONL_FILE) as reader: @pytest.mark.parametrize('i', cg_samples[0:NUM_JSONL_SAMPLES]) def test_cheat_generation_jsonl(i, record_property): - (classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) assert classification == 'AI', CHEAT_GEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8)) @@ -143,7 +158,7 @@ with jsonlines.open(CHEAT_POLISH_JSONL_FILE) as reader: @pytest.mark.parametrize('i', cp_samples[0:NUM_JSONL_SAMPLES]) def test_cheat_polish_jsonl(i, record_property): - (classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) assert classification == 'AI', CHEAT_POLISH_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8)) @@ -156,7 +171,7 @@ with jsonlines.open(CHEAT_VICUNAGEN_JSONL_FILE) as reader: @pytest.mark.parametrize('i', vg_samples[0:NUM_JSONL_SAMPLES]) def test_vicuna_generation_jsonl(i, record_property): - (classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) assert classification == 'AI', CHEAT_VICUNAGEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8)) @@ -170,12 +185,12 @@ with open(GPTZERO_EVAL_FILE) as fp: @pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'Human', ge_samples[0:NUM_JSONL_SAMPLES]))) def test_gptzero_eval_dataset_human(i, record_property): - (classification, score) = run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8)) @pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'AI', ge_samples[0:NUM_JSONL_SAMPLES]))) def test_gptzero_eval_dataset_ai(i, record_property): - (classification, score) = run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO) + (classification, score) = zippy.run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO) record_property("score", str(score)) assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8)) diff --git a/zippy.py b/zippy.py index 3455818..82382d8 100755 --- a/zippy.py +++ b/zippy.py @@ -6,9 +6,12 @@ import lzma, argparse, os, itertools from zlib import compressobj, Z_FINISH -import re, sys +from brotli import compress as brotli_compress, MODE_TEXT +from numpy import array_split +import re, sys, statistics from abc import ABC, abstractmethod from enum import Enum +from math import ceil from typing import List, Optional, Tuple, TypeAlias from multiprocessing import Pool, cpu_count @@ -17,8 +20,7 @@ Score : TypeAlias = tuple[str, float] class CompressionEngine(Enum): LZMA = 1 ZLIB = 2 - -ENGINE : CompressionEngine = CompressionEngine.ZLIB + BROTLI = 3 def clean_text(s : str) -> str: ''' @@ -49,19 +51,21 @@ class AIDetector(ABC): def score_text(self, sample : str) -> Optional[Score]: pass -class ZlibLlmDetector(AIDetector): - '''Class providing functionality to attempt to detect LLM/generative AI generated text using the zlib compression algorithm''' - def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None): - self.PRESET = 9 - self.WBITS = -15 +class BrotliLlmDetector(AIDetector): + '''Class providing functionality to attempt to detect LLM/generative AI generated text using the brotli compression algorithm''' + def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 8): + self.PRESET = preset + self.WIN_SIZE = 24 + self.BLOCK_SIZE = 0 self.prelude_ratio = 0.0 if prelude_ratio != None: self.prelude_ratio = prelude_ratio if prelude_file != None: with open(prelude_file) as fp: - self.prelude_str = fp.read() - self.prelude_ratio = self._compress(self.prelude_str) + self.prelude_str = clean_text(fp.read()) + self.prelude_ratio = self._compress(self.prelude_str) + return if prelude_str != None: self.prelude_str = prelude_str @@ -69,11 +73,7 @@ class ZlibLlmDetector(AIDetector): def _compress(self, s : str) -> float: orig_len = len(s.encode()) - c = compressobj(level=self.PRESET, wbits=self.WBITS, memLevel=9) - bytes = c.compress(s.encode()) - bytes += c.flush(Z_FINISH) - c_len = len(bytes) - #c_len = len(compress(s.encode(), level=self.PRESET, wbits=self.WBITS)) + c_len = len(brotli_compress(s.encode(), mode=MODE_TEXT, quality=self.PRESET, lgwin=self.WIN_SIZE, lgblock=self.BLOCK_SIZE)) return c_len / orig_len def score_text(self, sample: str) -> Score | None: @@ -84,7 +84,54 @@ class ZlibLlmDetector(AIDetector): if self.prelude_ratio == 0.0: return None sample_score = self._compress(self.prelude_str + sample) - #print(str((self.prelude_ratio, sample_score))) + #print('Brotli: ' + str((self.prelude_ratio, sample_score))) + delta = self.prelude_ratio - sample_score + determination = 'AI' + if delta < 0: + determination = 'Human' + + return (determination, abs(delta * 100)) + +class ZlibLlmDetector(AIDetector): + '''Class providing functionality to attempt to detect LLM/generative AI generated text using the zlib compression algorithm''' + def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 9): + self.PRESET = preset + self.WBITS = -15 + self.prelude_ratio = 0.0 + if prelude_ratio != None: + self.prelude_ratio = prelude_ratio + + if prelude_file != None: + with open(prelude_file) as fp: + self.prelude_str = clean_text(fp.read()) + lines = self.prelude_str.split('\n') + self.prelude_chunks = array_split(lines, ceil(len(self.prelude_str) / 2**abs(self.WBITS))) + self.prelude_ratio = statistics.mean(map(lambda x: self._compress('\n'.join(list(x))), self.prelude_chunks)) + return + + if prelude_str != None: + self.prelude_str = prelude_str + lines = self.prelude_str.split('\n') + self.prelude_chunks = array_split(lines, ceil(len(self.prelude_str) / 2**abs(self.WBITS))) + self.prelude_ratio = statistics.mean(map(lambda x: self._compress('\n'.join(list(x))), self.prelude_chunks)) + + def _compress(self, s : str) -> float: + orig_len = len(s.encode()) + c = compressobj(level=self.PRESET, wbits=self.WBITS, memLevel=9) + bytes = c.compress(s.encode()) + bytes += c.flush(Z_FINISH) + c_len = len(bytes) + return c_len / orig_len + + def score_text(self, sample: str) -> Score | None: + ''' + Returns a tuple of a string (AI or Human) and a float confidence (higher is more confident) that the sample was generated + by either an AI or human. Returns None if it cannot make a determination + ''' + if self.prelude_ratio == 0.0: + return None + sample_score = statistics.mean(map(lambda x: self._compress('\n'.join(x) + sample), self.prelude_chunks)) + #print('ZLIB: ' + str((self.prelude_ratio, sample_score))) delta = self.prelude_ratio - sample_score determination = 'AI' if delta < 0: @@ -95,64 +142,34 @@ class ZlibLlmDetector(AIDetector): class LzmaLlmDetector(AIDetector): '''Class providing functionality to attempt to detect LLM/generative AI generated text using the LZMA compression algorithm''' - def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None) -> None: + def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 3) -> None: '''Initializes a compression with the passed prelude file, and optionally the number of digits to round to compare prelude vs. sample compression''' - self.PRESET : int = 2 - self.comp = lzma.LZMACompressor(preset=self.PRESET) + self.PRESET : int = preset self.c_buf : List[bytes] = [] self.in_bytes : int = 0 self.prelude_ratio : float = 0.0 if prelude_ratio != None: self.prelude_ratio = prelude_ratio - self.SHORT_SAMPLE_THRESHOLD : int = 350 # What sample length is considered "short" if prelude_file != None: # Read it once to get the default compression ratio for the prelude with open(prelude_file, 'r') as fp: - self._compress_str(fp.read()) - self.prelude_ratio = self._finalize() + self.prelude_str = fp.read() + self.prelude_ratio = self._compress(self.prelude_str) + return #print(prelude_file + ' ratio: ' + str(self.prelude_ratio)) - # Redo this to prime the compressor - self.comp = lzma.LZMACompressor(preset=self.PRESET) - with open(prelude_file, 'r') as fp: - self._compress_str(fp.read()) if prelude_str != None: if self.prelude_ratio == 0.0: - self._compress_str(prelude_str) - self.prelude_ratio = self._finalize() - self.comp = lzma.LZMACompressor(preset=self.PRESET) - self._compress_str(prelude_str) - - def _compress_str(self, s : str) -> None: - ''' - Internal helper function to compress a string - ''' - strb : bytes = s.encode('ascii', errors='ignore') - self.c_buf.append(self.comp.compress(strb)) - self.in_bytes += len(strb) - - def _finalize(self) -> float: - ''' - Finalizes an LZMA compression cycle and returns the percentage compression ratio - - post: _ >= 0 - ''' - self.c_buf.append(self.comp.flush()) - compressed_size : int = len(b''.join(self.c_buf)) - if self.in_bytes == 0: - return 0.0 - score = compressed_size / self.in_bytes - self.in_bytes = 0 - self.c_buf = [] - return score - - def get_compression_ratio(self, s : str) -> Tuple[float, float]: - ''' - Returns a tuple of floats with the compression ratio of the prelude (0 if no prelude) and passed string - ''' - self._compress_str(s) - return (self.prelude_ratio, self._finalize()) + self.prelude_ratio = self._compress(prelude_str) + + def _compress(self, s : str) -> float: + orig_len = len(s.encode()) + c = lzma.LZMACompressor(preset=self.PRESET) + bytes = c.compress(s.encode()) + bytes += c.flush() + c_len = len(bytes) + return c_len / orig_len def score_text(self, sample : str) -> Optional[Score]: ''' @@ -161,104 +178,188 @@ class LzmaLlmDetector(AIDetector): ''' if self.prelude_ratio == 0.0: return None - (prelude_score, sample_score) = self.get_compression_ratio(sample) - print(str((self.prelude_ratio, sample_score))) - delta = prelude_score - sample_score + #print('LZMA: ' + str((self.prelude_ratio, sample_score))) + delta = self.prelude_ratio - self._compress(self.prelude_str + sample) determination = 'AI' if delta < 0: determination = 'Human' return (determination, abs(delta * 100)) -def run_on_file(filename : str) -> Optional[Score]: - '''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file''' - with open(filename, 'r') as fp: - if ENGINE == CompressionEngine.LZMA: - l = LzmaLlmDetector(prelude_file=PRELUDE_FILE) - elif ENGINE == CompressionEngine.ZLIB: - l = ZlibLlmDetector(prelude_file=PRELUDE_FILE) - txt = fp.read() - #print('Calculating score for input of length ' + str(len(txt))) - return l.score_text(txt) +class Zippy: + ''' + Class to wrap the functionality of Zippy + ''' + def __init__(self, engine : CompressionEngine = CompressionEngine.LZMA, preset : Optional[int] = None) -> None: + self.ENGINE = engine + self.PRESET = preset + if engine == CompressionEngine.LZMA: + if self.PRESET: + self.detector = LzmaLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET) + else: + self.detector = LzmaLlmDetector(prelude_file=PRELUDE_FILE) + elif engine == CompressionEngine.BROTLI: + if self.PRESET: + self.detector = BrotliLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET) + else: + self.detector = BrotliLlmDetector(prelude_file=PRELUDE_FILE) + elif engine == CompressionEngine.ZLIB: + if self.PRESET: + self.detector = ZlibLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET) + else: + self.detector = ZlibLlmDetector(prelude_file=PRELUDE_FILE) -def _score_chunk(c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score: - if prelude_file != None: - if ENGINE == CompressionEngine.LZMA: - l = LzmaLlmDetector(prelude_file=prelude_file) - if ENGINE == CompressionEngine.ZLIB: - l = ZlibLlmDetector(prelude_file=prelude_file) + def run_on_file(self, filename : str) -> Optional[Score]: + '''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file''' + with open(filename, 'r') as fp: + txt = fp.read() + #print('Calculating score for input of length ' + str(len(txt))) + return self.detector.score_text(txt) + + def _score_chunk(self, c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score: + if prelude_file is None and prelude_ratio != None: + self.detector.prelude_str = PRELUDE_STR + self.detector.prelude_ratio = prelude_ratio + + return self.detector.score_text(c) + + def run_on_file_chunked(self, filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]: + ''' + Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file. + This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input + being skewed because its compression ratio starts to overwhelm the prelude file. + ''' + with open(filename, 'r') as fp: + contents = fp.read() + return self.run_on_text_chunked(contents, chunk_size, prelude_ratio=prelude_ratio) + + def run_on_text_chunked(self, s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]: + ''' + Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string. + This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input + being skewed because its compression ratio starts to overwhelm the prelude file. + ''' + contents = clean_text(s) + + start = 0 + end = 0 + chunks = [] + while start + chunk_size < len(contents) and end != -1: + end = contents.rfind(' ', start, start + chunk_size + 1) + chunks.append(contents[start:end]) + start = end + 1 + chunks.append(contents[start:]) + scores = [] + if len(chunks) > 2: + with Pool(cpu_count()) as pool: + for r in pool.starmap(self._score_chunk, zip(chunks, itertools.repeat(prelude_file), itertools.repeat(prelude_ratio))): + scores.append(r) else: - if ENGINE == CompressionEngine.LZMA: - l = LzmaLlmDetector(prelude_str=PRELUDE_STR, prelude_ratio=prelude_ratio) - if ENGINE == CompressionEngine.ZLIB: - l = ZlibLlmDetector(prelude_str=PRELUDE_STR, prelude_ratio=prelude_ratio) - return l.score_text(c) - -def run_on_file_chunked(filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]: - ''' - Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file. - This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input - being skewed because its compression ratio starts to overwhelm the prelude file. - ''' - with open(filename, 'r') as fp: - contents = fp.read() - return run_on_text_chunked(contents, chunk_size, prelude_ratio=prelude_ratio) - -def run_on_text_chunked(s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]: - ''' - Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string. - This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input - being skewed because its compression ratio starts to overwhelm the prelude file. - ''' - contents = clean_text(s) - - start = 0 - end = 0 - chunks = [] - while start + chunk_size < len(contents) and end != -1: - end = contents.rfind(' ', start, start + chunk_size + 1) - chunks.append(contents[start:end]) - start = end + 1 - chunks.append(contents[start:]) - scores = [] - if len(chunks) > 2: - with Pool(cpu_count()) as pool: - for r in pool.starmap(_score_chunk, zip(chunks, itertools.repeat(prelude_file), itertools.repeat(prelude_ratio))): - scores.append(r) - else: - for c in chunks: - scores.append(_score_chunk(c, prelude_file=prelude_file, prelude_ratio=prelude_ratio)) - ssum : float = 0.0 - for i, s in enumerate(scores): - if s[0] == 'AI': - ssum -= s[1] * (len(chunks[i]) / len(contents)) + for c in chunks: + scores.append(self._score_chunk(c, prelude_file=prelude_file, prelude_ratio=prelude_ratio)) + ssum : float = 0.0 + for i, s in enumerate(scores): + if s[0] == 'AI': + ssum -= s[1] * (len(chunks[i]) / len(contents)) + else: + ssum += s[1] * (len(chunks[i]) / len(contents)) + sa : float = ssum + if sa < 0: + return ('AI', abs(sa)) else: - ssum += s[1] * (len(chunks[i]) / len(contents)) - sa : float = ssum# / len(scores) - if sa < 0: - return ('AI', abs(sa)) - else: - return ('Human', abs(sa)) + return ('Human', abs(sa)) +class EnsembledZippy: + ''' + Class to wrap the functionality of Zippy into an ensemble + ''' + def __init__(self) -> None: + self.ENGINES = [CompressionEngine.LZMA, CompressionEngine.BROTLI, CompressionEngine.ZLIB] + self.WEIGHTS = [.33, .33, .33] + self.component_classifiers : list[AIDetector] = [] + for i, e in enumerate(self.ENGINES): + self.component_classifiers.append(Zippy(e)) + + def _combine_scores(self, scores : list[Score]) -> Score: + ssum : float = 0.0 + for i, s in enumerate(scores): + if s[0] == 'AI': + ssum -= s[1] * self.WEIGHTS[i] + else: + ssum += s[1] * self.WEIGHTS[i] + sa : float = ssum + if sa < 0: + return ('AI', abs(sa)) + else: + return ('Human', abs(sa)) + + def run_on_file(self, filename : str) -> Optional[Score]: + '''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file''' + with open(filename, 'r') as fp: + txt = fp.read() + scores = [] + for c in self.component_classifiers: + scores.append(c.score_text(txt)) + return self._combine_scores(scores) + + def _score_chunk(self, c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score: + scores = [] + for c in self.component_classifiers: + scores.append(c.score_text(c)) + return self._combine_scores(scores) + + def run_on_file_chunked(self, filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]: + ''' + Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file. + This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input + being skewed because its compression ratio starts to overwhelm the prelude file. + ''' + with open(filename, 'r') as fp: + contents = fp.read() + return self.run_on_text_chunked(contents, chunk_size) + + def run_on_text_chunked(self, s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]: + ''' + Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string. + This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input + being skewed because its compression ratio starts to overwhelm the prelude file. + ''' + scores = [] + for c in self.component_classifiers: + scores.append(c.run_on_text_chunked(s, chunk_size=chunk_size)) + return self._combine_scores(scores) if __name__ == '__main__': parser = argparse.ArgumentParser() - parser.add_argument("-e", choices=['zlib', 'lzma'], help='Which compression engine to use: lzma or zlib', default='lzma', required=False) + parser.add_argument("-e", choices=['zlib', 'lzma', 'brotli', 'ensemble'], help='Which compression engine to use: lzma, zlib, brotli, or an ensemble of all engines', default='lzma', required=False) group = parser.add_mutually_exclusive_group() group.add_argument("-s", help='Read from stdin until EOF is reached instead of from a file', required=False, action='store_true') group.add_argument("sample_files", nargs='*', help='Text file(s) containing the sample to classify', default="") args = parser.parse_args() + engine = 'lzma' if args.e: if args.e == 'lzma': - ENGINE = CompressionEngine.LZMA + engine = CompressionEngine.LZMA elif args.e == 'zlib': - ENGINE = CompressionEngine.ZLIB + engine = CompressionEngine.ZLIB + elif args.e == 'brotli': + engine = CompressionEngine.BROTLI + elif args.e == 'ensemble': + engine = None if args.s: - print(str(run_on_text_chunked(''.join(list(sys.stdin))))) + if engine: + z = Zippy(engine) + else: + z = EnsembledZippy() + print(str(z.run_on_text_chunked(''.join(list(sys.stdin))))) elif len(args.sample_files) == 0: print("Please call with either a list of text files to analyze, or the -s flag to classify stdin.\nCall with the -h flag for additional help.") else: + if engine: + z = Zippy(engine) + else: + z = EnsembledZippy() for f in args.sample_files: print(f) if os.path.isfile(f): - print(str(run_on_file_chunked(f))) + print(str(z.run_on_file_chunked(f)))