Added the ability to ensemble different compression models

Signed-off-by: Jacob Torrey <jacob@thinkst.com>
Branch: pull/6/head
Jacob Torrey 2023-09-27 13:39:45 -06:00
parent 8f07ef4cc7
commit 513c6bb0b2
2 changed files with 267 additions and 151 deletions
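In short: zippy.py gains a BrotliLlmDetector alongside the existing LZMA and zlib detectors, the zlib detector now averages its prelude ratio over compression-window-sized chunks, the module-level run_on_* functions move into a Zippy wrapper class, and a new EnsembledZippy combines all three engines with equal weights. The test suite and CLI gain matching ensemble options.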

test_zippy.py
@@ -2,7 +2,7 @@
 import pytest, os, jsonlines, csv
 from warnings import warn
-from zippy import run_on_file_chunked, run_on_text_chunked, PRELUDE_STR, LzmaLlmDetector, CompressionEngine, ZlibLlmDetector, ENGINE
+from zippy import Zippy, EnsembledZippy, PRELUDE_STR, LzmaLlmDetector, BrotliLlmDetector, ZlibLlmDetector, CompressionEngine
 import zippy
 
 AI_SAMPLE_DIR = 'samples/llm-generated/'
@@ -16,19 +16,34 @@ human_files = os.listdir(HUMAN_SAMPLE_DIR)
 CONFIDENCE_THRESHOLD : float = 0.00 # What confidence to treat as error vs warning
 
-if ENGINE == CompressionEngine.LZMA:
-    PRELUDE_RATIO = LzmaLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
-elif ENGINE == CompressionEngine.ZLIB:
-    PRELUDE_RATIO = ZlibLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
+# Bool on whether to ensemble the models or run a single model
+ENSEMBLE = True
+
+if not ENSEMBLE:
+    # What compression engine to use for the test
+    ENGINE = CompressionEngine.LZMA
+
+    if ENGINE == CompressionEngine.LZMA:
+        PRELUDE_RATIO = LzmaLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
+    elif ENGINE == CompressionEngine.ZLIB:
+        PRELUDE_RATIO = ZlibLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
+    elif ENGINE == CompressionEngine.BROTLI:
+        PRELUDE_RATIO = BrotliLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
+    zippy = Zippy(ENGINE)
+else:
+    zippy = EnsembledZippy()
+    PRELUDE_RATIO = None
 
 def test_training_file(record_property):
-    (classification, score) = run_on_file_chunked('ai-generated.txt')
+    (classification, score) = zippy.run_on_file_chunked('ai-generated.txt')
     record_property("score", str(score))
     assert classification == 'AI', 'The training corpus should always be detected as AI-generated... since it is'
 
 @pytest.mark.parametrize('f', human_files)
 def test_human_samples(f, record_property):
-    (classification, score) = run_on_file_chunked(HUMAN_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_file_chunked(HUMAN_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     if score > CONFIDENCE_THRESHOLD:
         assert classification == 'Human', f + ' is a human-generated file, misclassified as AI-generated with confidence ' + str(round(score, 8))
@@ -40,7 +55,7 @@ def test_human_samples(f, record_property):
 @pytest.mark.parametrize('f', ai_files)
 def test_llm_sample(f, record_property):
-    (classification, score) = run_on_file_chunked(AI_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_file_chunked(AI_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     if score > CONFIDENCE_THRESHOLD:
         assert classification == 'AI', f + ' is an LLM-generated file, misclassified as human-generated with confidence ' + str(round(score, 8))
@@ -59,7 +74,7 @@ with jsonlines.open(HUMAN_JSONL_FILE) as reader:
 @pytest.mark.parametrize('i', human_samples[0:NUM_JSONL_SAMPLES])
 def test_human_jsonl(i, record_property):
-    (classification, score) = run_on_text_chunked(i.get('text', ''), prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_text_chunked(i.get('text', ''), prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     assert classification == 'Human', HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' (len: ' + str(i.get('length', -1)) + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
@@ -98,13 +113,13 @@ with jsonlines.open(NEWS_JSONL_FILE) as reader:
 @pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES])
 def test_humannews_jsonl(i, record_property):
-    (classification, score) = run_on_text_chunked(i.get('human', ''), prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_text_chunked(i.get('human', ''), prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     assert classification == 'Human', NEWS_JSONL_FILE + ' is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
 
 @pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES])
 def test_chatgptnews_jsonl(i, record_property):
-    (classification, score) = run_on_text_chunked(i.get('chatgpt', ''), prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_text_chunked(i.get('chatgpt', ''), prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     assert classification == 'AI', NEWS_JSONL_FILE + ' is a AI-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
@@ -117,7 +132,7 @@ with jsonlines.open(CHEAT_HUMAN_JSONL_FILE) as reader:
 @pytest.mark.parametrize('i', ch_samples[0:NUM_JSONL_SAMPLES])
 def test_cheat_human_jsonl(i, record_property):
-    (classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     assert classification == 'Human', CHEAT_HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' [' + str(len(i.get('abstract', ''))) + '] (title: ' + i.get('title', "").replace('\n', ' ')[:15] + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
@@ -130,7 +145,7 @@ with jsonlines.open(CHEAT_GEN_JSONL_FILE) as reader:
 @pytest.mark.parametrize('i', cg_samples[0:NUM_JSONL_SAMPLES])
 def test_cheat_generation_jsonl(i, record_property):
-    (classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     assert classification == 'AI', CHEAT_GEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
@@ -143,7 +158,7 @@ with jsonlines.open(CHEAT_POLISH_JSONL_FILE) as reader:
 @pytest.mark.parametrize('i', cp_samples[0:NUM_JSONL_SAMPLES])
 def test_cheat_polish_jsonl(i, record_property):
-    (classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     assert classification == 'AI', CHEAT_POLISH_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
@@ -156,7 +171,7 @@ with jsonlines.open(CHEAT_VICUNAGEN_JSONL_FILE) as reader:
 @pytest.mark.parametrize('i', vg_samples[0:NUM_JSONL_SAMPLES])
 def test_vicuna_generation_jsonl(i, record_property):
-    (classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     assert classification == 'AI', CHEAT_VICUNAGEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
@@ -170,12 +185,12 @@ with open(GPTZERO_EVAL_FILE) as fp:
 @pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'Human', ge_samples[0:NUM_JSONL_SAMPLES])))
 def test_gptzero_eval_dataset_human(i, record_property):
-    (classification, score) = run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8))
 
 @pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'AI', ge_samples[0:NUM_JSONL_SAMPLES])))
 def test_gptzero_eval_dataset_ai(i, record_property):
-    (classification, score) = run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
+    (classification, score) = zippy.run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
     record_property("score", str(score))
     assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8))
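The suite now drives every test through a single module-level classifier object. A minimal sketch of the two configurations the tests can exercise (assumes the zippy module and its prelude file are present in the working directory):

    from zippy import Zippy, EnsembledZippy, CompressionEngine

    single = Zippy(CompressionEngine.LZMA)   # one engine, default preset
    combo = EnsembledZippy()                 # LZMA + Brotli + zlib, equal weights
    print(single.run_on_text_chunked('Sample text to classify...'))
    print(combo.run_on_text_chunked('Sample text to classify...'))

Each call returns a ('AI' | 'Human', confidence) tuple, so flipping ENSEMBLE swaps the implementation without touching the test bodies.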

zippy.py

@@ -6,9 +6,12 @@
 import lzma, argparse, os, itertools
 from zlib import compressobj, Z_FINISH
-import re, sys
+from brotli import compress as brotli_compress, MODE_TEXT
+from numpy import array_split
+import re, sys, statistics
 from abc import ABC, abstractmethod
 from enum import Enum
+from math import ceil
 from typing import List, Optional, Tuple, TypeAlias
 from multiprocessing import Pool, cpu_count
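The new imports pull in two third-party dependencies: the brotli bindings (assumed here to be the Brotli package on PyPI) and NumPy, whose array_split is used further down to cut the prelude into roughly equal chunks. A quick sanity-check sketch:

    import brotli
    from numpy import array_split

    print(brotli.MODE_TEXT)                 # compression mode tuned for UTF-8 text
    print(array_split(['a', 'b', 'c'], 2))  # splits into two roughly equal pieces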
@@ -17,8 +20,7 @@ Score : TypeAlias = tuple[str, float]
 class CompressionEngine(Enum):
     LZMA = 1
     ZLIB = 2
+    BROTLI = 3
 
-ENGINE : CompressionEngine = CompressionEngine.ZLIB
-
 def clean_text(s : str) -> str:
     '''
@@ -49,19 +51,21 @@ class AIDetector(ABC):
     def score_text(self, sample : str) -> Optional[Score]:
         pass
 
-class ZlibLlmDetector(AIDetector):
-    '''Class providing functionality to attempt to detect LLM/generative AI generated text using the zlib compression algorithm'''
-    def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None):
-        self.PRESET = 9
-        self.WBITS = -15
+class BrotliLlmDetector(AIDetector):
+    '''Class providing functionality to attempt to detect LLM/generative AI generated text using the brotli compression algorithm'''
+    def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 8):
+        self.PRESET = preset
+        self.WIN_SIZE = 24
+        self.BLOCK_SIZE = 0
         self.prelude_ratio = 0.0
         if prelude_ratio != None:
             self.prelude_ratio = prelude_ratio
 
         if prelude_file != None:
             with open(prelude_file) as fp:
-                self.prelude_str = fp.read()
+                self.prelude_str = clean_text(fp.read())
             self.prelude_ratio = self._compress(self.prelude_str)
+            return
 
         if prelude_str != None:
             self.prelude_str = prelude_str
@@ -69,11 +73,7 @@ class ZlibLlmDetector(AIDetector):
     def _compress(self, s : str) -> float:
         orig_len = len(s.encode())
-        c = compressobj(level=self.PRESET, wbits=self.WBITS, memLevel=9)
-        bytes = c.compress(s.encode())
-        bytes += c.flush(Z_FINISH)
-        c_len = len(bytes)
-        #c_len = len(compress(s.encode(), level=self.PRESET, wbits=self.WBITS))
+        c_len = len(brotli_compress(s.encode(), mode=MODE_TEXT, quality=self.PRESET, lgwin=self.WIN_SIZE, lgblock=self.BLOCK_SIZE))
         return c_len / orig_len
 
     def score_text(self, sample: str) -> Score | None:
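The rewritten _compress is a one-shot ratio: compressed size over original size, using Brotli's text mode and a 2^24-byte window. A self-contained sketch of the computation (sample string invented for illustration):

    import brotli

    s = 'the quick brown fox jumps over the lazy dog ' * 50
    data = s.encode()
    ratio = len(brotli.compress(data, mode=brotli.MODE_TEXT, quality=8, lgwin=24)) / len(data)
    print(ratio)  # highly repetitive input compresses to a small fraction of 1.0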
@@ -84,7 +84,54 @@ class ZlibLlmDetector(AIDetector):
         if self.prelude_ratio == 0.0:
             return None
         sample_score = self._compress(self.prelude_str + sample)
-        #print(str((self.prelude_ratio, sample_score)))
+        #print('Brotli: ' + str((self.prelude_ratio, sample_score)))
+        delta = self.prelude_ratio - sample_score
+        determination = 'AI'
+        if delta < 0:
+            determination = 'Human'
+        return (determination, abs(delta * 100))
+
+class ZlibLlmDetector(AIDetector):
+    '''Class providing functionality to attempt to detect LLM/generative AI generated text using the zlib compression algorithm'''
+    def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 9):
+        self.PRESET = preset
+        self.WBITS = -15
+        self.prelude_ratio = 0.0
+        if prelude_ratio != None:
+            self.prelude_ratio = prelude_ratio
+        if prelude_file != None:
+            with open(prelude_file) as fp:
+                self.prelude_str = clean_text(fp.read())
+            lines = self.prelude_str.split('\n')
+            self.prelude_chunks = array_split(lines, ceil(len(self.prelude_str) / 2**abs(self.WBITS)))
+            self.prelude_ratio = statistics.mean(map(lambda x: self._compress('\n'.join(list(x))), self.prelude_chunks))
+            return
+        if prelude_str != None:
+            self.prelude_str = prelude_str
+            lines = self.prelude_str.split('\n')
+            self.prelude_chunks = array_split(lines, ceil(len(self.prelude_str) / 2**abs(self.WBITS)))
+            self.prelude_ratio = statistics.mean(map(lambda x: self._compress('\n'.join(list(x))), self.prelude_chunks))
+
+    def _compress(self, s : str) -> float:
+        orig_len = len(s.encode())
+        c = compressobj(level=self.PRESET, wbits=self.WBITS, memLevel=9)
+        bytes = c.compress(s.encode())
+        bytes += c.flush(Z_FINISH)
+        c_len = len(bytes)
+        return c_len / orig_len
+
+    def score_text(self, sample: str) -> Score | None:
+        '''
+        Returns a tuple of a string (AI or Human) and a float confidence (higher is more confident) that the sample was generated
+        by either an AI or human. Returns None if it cannot make a determination
+        '''
+        if self.prelude_ratio == 0.0:
+            return None
+        sample_score = statistics.mean(map(lambda x: self._compress('\n'.join(x) + sample), self.prelude_chunks))
+        #print('ZLIB: ' + str((self.prelude_ratio, sample_score)))
         delta = self.prelude_ratio - sample_score
         determination = 'AI'
         if delta < 0:
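The reworked ZlibLlmDetector is why array_split and statistics are now imported: raw DEFLATE with wbits=-15 only keeps a 2^15-byte history window, so any prelude much longer than 32 KiB stops influencing the ratio. Splitting the prelude's lines into window-sized chunks and averaging per-chunk ratios keeps the whole prelude in play. A sketch of the chunking arithmetic (prelude text invented for illustration):

    import statistics
    from math import ceil
    from numpy import array_split

    prelude = 'line of prelude text\n' * 5000       # ~100 KiB stand-in
    lines = prelude.split('\n')
    n_chunks = ceil(len(prelude) / 2**15)           # one chunk per 32 KiB window
    chunks = array_split(lines, n_chunks)
    print(n_chunks, statistics.mean(len('\n'.join(c)) for c in chunks))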
@@ -95,64 +142,34 @@ class ZlibLlmDetector(AIDetector):
 class LzmaLlmDetector(AIDetector):
     '''Class providing functionality to attempt to detect LLM/generative AI generated text using the LZMA compression algorithm'''
-    def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None) -> None:
+    def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 3) -> None:
         '''Initializes a compression with the passed prelude file, and optionally the number of digits to round to compare prelude vs. sample compression'''
-        self.PRESET : int = 2
-        self.comp = lzma.LZMACompressor(preset=self.PRESET)
+        self.PRESET : int = preset
         self.c_buf : List[bytes] = []
         self.in_bytes : int = 0
         self.prelude_ratio : float = 0.0
         if prelude_ratio != None:
             self.prelude_ratio = prelude_ratio
-        self.SHORT_SAMPLE_THRESHOLD : int = 350 # What sample length is considered "short"
         if prelude_file != None:
             # Read it once to get the default compression ratio for the prelude
             with open(prelude_file, 'r') as fp:
-                self._compress_str(fp.read())
-            self.prelude_ratio = self._finalize()
+                self.prelude_str = fp.read()
+            self.prelude_ratio = self._compress(self.prelude_str)
+            return
             #print(prelude_file + ' ratio: ' + str(self.prelude_ratio))
-            # Redo this to prime the compressor
-            self.comp = lzma.LZMACompressor(preset=self.PRESET)
-            with open(prelude_file, 'r') as fp:
-                self._compress_str(fp.read())
         if prelude_str != None:
             if self.prelude_ratio == 0.0:
-                self._compress_str(prelude_str)
-                self.prelude_ratio = self._finalize()
-                self.comp = lzma.LZMACompressor(preset=self.PRESET)
-                self._compress_str(prelude_str)
+                self.prelude_ratio = self._compress(prelude_str)
 
-    def _compress_str(self, s : str) -> None:
-        '''
-        Internal helper function to compress a string
-        '''
-        strb : bytes = s.encode('ascii', errors='ignore')
-        self.c_buf.append(self.comp.compress(strb))
-        self.in_bytes += len(strb)
-
-    def _finalize(self) -> float:
-        '''
-        Finalizes an LZMA compression cycle and returns the percentage compression ratio
-        post: _ >= 0
-        '''
-        self.c_buf.append(self.comp.flush())
-        compressed_size : int = len(b''.join(self.c_buf))
-        if self.in_bytes == 0:
-            return 0.0
-        score = compressed_size / self.in_bytes
-        self.in_bytes = 0
-        self.c_buf = []
-        return score
-
-    def get_compression_ratio(self, s : str) -> Tuple[float, float]:
-        '''
-        Returns a tuple of floats with the compression ratio of the prelude (0 if no prelude) and passed string
-        '''
-        self._compress_str(s)
-        return (self.prelude_ratio, self._finalize())
+    def _compress(self, s : str) -> float:
+        orig_len = len(s.encode())
+        c = lzma.LZMACompressor(preset=self.PRESET)
+        bytes = c.compress(s.encode())
+        bytes += c.flush()
+        c_len = len(bytes)
+        return c_len / orig_len
 
     def score_text(self, sample : str) -> Optional[Score]:
         '''
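LzmaLlmDetector drops its stateful streaming compressor (_compress_str/_finalize/get_compression_ratio) in favor of the same one-shot ratio the other detectors use, with the preset now configurable and defaulting to 3 instead of a hard-coded 2. The equivalent standalone computation, assuming the repo's ai-generated.txt prelude file is on hand:

    import lzma

    text = open('ai-generated.txt').read()       # the prelude corpus
    c = lzma.LZMACompressor(preset=3)
    compressed = c.compress(text.encode()) + c.flush()
    print(len(compressed) / len(text.encode()))  # the prelude_ratio baseline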
@@ -161,104 +178,188 @@ class LzmaLlmDetector(AIDetector):
         '''
         if self.prelude_ratio == 0.0:
             return None
-        (prelude_score, sample_score) = self.get_compression_ratio(sample)
-        print(str((self.prelude_ratio, sample_score)))
-        delta = prelude_score - sample_score
+        #print('LZMA: ' + str((self.prelude_ratio, sample_score)))
+        delta = self.prelude_ratio - self._compress(self.prelude_str + sample)
         determination = 'AI'
         if delta < 0:
             determination = 'Human'
         return (determination, abs(delta * 100))
 
-def run_on_file(filename : str) -> Optional[Score]:
-    '''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file'''
-    with open(filename, 'r') as fp:
-        if ENGINE == CompressionEngine.LZMA:
-            l = LzmaLlmDetector(prelude_file=PRELUDE_FILE)
-        elif ENGINE == CompressionEngine.ZLIB:
-            l = ZlibLlmDetector(prelude_file=PRELUDE_FILE)
-        txt = fp.read()
-        #print('Calculating score for input of length ' + str(len(txt)))
-        return l.score_text(txt)
-
-def _score_chunk(c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score:
-    if prelude_file != None:
-        if ENGINE == CompressionEngine.LZMA:
-            l = LzmaLlmDetector(prelude_file=prelude_file)
-        if ENGINE == CompressionEngine.ZLIB:
-            l = ZlibLlmDetector(prelude_file=prelude_file)
-    else:
-        if ENGINE == CompressionEngine.LZMA:
-            l = LzmaLlmDetector(prelude_str=PRELUDE_STR, prelude_ratio=prelude_ratio)
-        if ENGINE == CompressionEngine.ZLIB:
-            l = ZlibLlmDetector(prelude_str=PRELUDE_STR, prelude_ratio=prelude_ratio)
-    return l.score_text(c)
-
-def run_on_file_chunked(filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]:
-    '''
-    Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file.
-    This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
-    being skewed because its compression ratio starts to overwhelm the prelude file.
-    '''
-    with open(filename, 'r') as fp:
-        contents = fp.read()
-    return run_on_text_chunked(contents, chunk_size, prelude_ratio=prelude_ratio)
-
-def run_on_text_chunked(s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]:
-    '''
-    Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string.
-    This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
-    being skewed because its compression ratio starts to overwhelm the prelude file.
-    '''
-    contents = clean_text(s)
-    start = 0
-    end = 0
-    chunks = []
-    while start + chunk_size < len(contents) and end != -1:
-        end = contents.rfind(' ', start, start + chunk_size + 1)
-        chunks.append(contents[start:end])
-        start = end + 1
-    chunks.append(contents[start:])
-    scores = []
-    if len(chunks) > 2:
-        with Pool(cpu_count()) as pool:
-            for r in pool.starmap(_score_chunk, zip(chunks, itertools.repeat(prelude_file), itertools.repeat(prelude_ratio))):
-                scores.append(r)
-    else:
-        for c in chunks:
-            scores.append(_score_chunk(c, prelude_file=prelude_file, prelude_ratio=prelude_ratio))
-    ssum : float = 0.0
-    for i, s in enumerate(scores):
-        if s[0] == 'AI':
-            ssum -= s[1] * (len(chunks[i]) / len(contents))
-        else:
-            ssum += s[1] * (len(chunks[i]) / len(contents))
-    sa : float = ssum# / len(scores)
-    if sa < 0:
-        return ('AI', abs(sa))
-    else:
-        return ('Human', abs(sa))
+class Zippy:
+    '''
+    Class to wrap the functionality of Zippy
+    '''
+    def __init__(self, engine : CompressionEngine = CompressionEngine.LZMA, preset : Optional[int] = None) -> None:
+        self.ENGINE = engine
+        self.PRESET = preset
+        if engine == CompressionEngine.LZMA:
+            if self.PRESET:
+                self.detector = LzmaLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET)
+            else:
+                self.detector = LzmaLlmDetector(prelude_file=PRELUDE_FILE)
+        elif engine == CompressionEngine.BROTLI:
+            if self.PRESET:
+                self.detector = BrotliLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET)
+            else:
+                self.detector = BrotliLlmDetector(prelude_file=PRELUDE_FILE)
+        elif engine == CompressionEngine.ZLIB:
+            if self.PRESET:
+                self.detector = ZlibLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET)
+            else:
+                self.detector = ZlibLlmDetector(prelude_file=PRELUDE_FILE)
+
+    def run_on_file(self, filename : str) -> Optional[Score]:
+        '''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file'''
+        with open(filename, 'r') as fp:
+            txt = fp.read()
+        #print('Calculating score for input of length ' + str(len(txt)))
+        return self.detector.score_text(txt)
+
+    def _score_chunk(self, c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score:
+        if prelude_file is None and prelude_ratio != None:
+            self.detector.prelude_str = PRELUDE_STR
+            self.detector.prelude_ratio = prelude_ratio
+        return self.detector.score_text(c)
+
+    def run_on_file_chunked(self, filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]:
+        '''
+        Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file.
+        This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
+        being skewed because its compression ratio starts to overwhelm the prelude file.
+        '''
+        with open(filename, 'r') as fp:
+            contents = fp.read()
+        return self.run_on_text_chunked(contents, chunk_size, prelude_ratio=prelude_ratio)
+
+    def run_on_text_chunked(self, s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]:
+        '''
+        Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string.
+        This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
+        being skewed because its compression ratio starts to overwhelm the prelude file.
+        '''
+        contents = clean_text(s)
+        start = 0
+        end = 0
+        chunks = []
+        while start + chunk_size < len(contents) and end != -1:
+            end = contents.rfind(' ', start, start + chunk_size + 1)
+            chunks.append(contents[start:end])
+            start = end + 1
+        chunks.append(contents[start:])
+        scores = []
+        if len(chunks) > 2:
+            with Pool(cpu_count()) as pool:
+                for r in pool.starmap(self._score_chunk, zip(chunks, itertools.repeat(prelude_file), itertools.repeat(prelude_ratio))):
+                    scores.append(r)
+        else:
+            for c in chunks:
+                scores.append(self._score_chunk(c, prelude_file=prelude_file, prelude_ratio=prelude_ratio))
+        ssum : float = 0.0
+        for i, s in enumerate(scores):
+            if s[0] == 'AI':
+                ssum -= s[1] * (len(chunks[i]) / len(contents))
+            else:
+                ssum += s[1] * (len(chunks[i]) / len(contents))
+        sa : float = ssum
+        if sa < 0:
+            return ('AI', abs(sa))
+        else:
+            return ('Human', abs(sa))
+
+class EnsembledZippy:
+    '''
+    Class to wrap the functionality of Zippy into an ensemble
+    '''
+    def __init__(self) -> None:
+        self.ENGINES = [CompressionEngine.LZMA, CompressionEngine.BROTLI, CompressionEngine.ZLIB]
+        self.WEIGHTS = [.33, .33, .33]
+        self.component_classifiers : list[Zippy] = []
+        for i, e in enumerate(self.ENGINES):
+            self.component_classifiers.append(Zippy(e))
+
+    def _combine_scores(self, scores : list[Score]) -> Score:
+        ssum : float = 0.0
+        for i, s in enumerate(scores):
+            if s[0] == 'AI':
+                ssum -= s[1] * self.WEIGHTS[i]
+            else:
+                ssum += s[1] * self.WEIGHTS[i]
+        sa : float = ssum
+        if sa < 0:
+            return ('AI', abs(sa))
+        else:
+            return ('Human', abs(sa))
+
+    def run_on_file(self, filename : str) -> Optional[Score]:
+        '''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file'''
+        with open(filename, 'r') as fp:
+            txt = fp.read()
+        scores = []
+        for c in self.component_classifiers:
+            scores.append(c.detector.score_text(txt))
+        return self._combine_scores(scores)
+
+    def _score_chunk(self, c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score:
+        scores = []
+        for clf in self.component_classifiers:
+            scores.append(clf._score_chunk(c, prelude_file=prelude_file, prelude_ratio=prelude_ratio))
+        return self._combine_scores(scores)
+
+    def run_on_file_chunked(self, filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]:
+        '''
+        Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file.
+        This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
+        being skewed because its compression ratio starts to overwhelm the prelude file.
+        '''
+        with open(filename, 'r') as fp:
+            contents = fp.read()
+        return self.run_on_text_chunked(contents, chunk_size)
+
+    def run_on_text_chunked(self, s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]:
+        '''
+        Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string.
+        This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
+        being skewed because its compression ratio starts to overwhelm the prelude file.
+        '''
+        scores = []
+        for c in self.component_classifiers:
+            scores.append(c.run_on_text_chunked(s, chunk_size=chunk_size))
+        return self._combine_scores(scores)
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser()
-    parser.add_argument("-e", choices=['zlib', 'lzma'], help='Which compression engine to use: lzma or zlib', default='lzma', required=False)
+    parser.add_argument("-e", choices=['zlib', 'lzma', 'brotli', 'ensemble'], help='Which compression engine to use: lzma, zlib, brotli, or an ensemble of all engines', default='lzma', required=False)
     group = parser.add_mutually_exclusive_group()
     group.add_argument("-s", help='Read from stdin until EOF is reached instead of from a file', required=False, action='store_true')
     group.add_argument("sample_files", nargs='*', help='Text file(s) containing the sample to classify', default="")
     args = parser.parse_args()
+    engine = 'lzma'
     if args.e:
         if args.e == 'lzma':
-            ENGINE = CompressionEngine.LZMA
+            engine = CompressionEngine.LZMA
         elif args.e == 'zlib':
-            ENGINE = CompressionEngine.ZLIB
+            engine = CompressionEngine.ZLIB
+        elif args.e == 'brotli':
+            engine = CompressionEngine.BROTLI
+        elif args.e == 'ensemble':
+            engine = None
     if args.s:
-        print(str(run_on_text_chunked(''.join(list(sys.stdin)))))
+        if engine:
+            z = Zippy(engine)
+        else:
+            z = EnsembledZippy()
+        print(str(z.run_on_text_chunked(''.join(list(sys.stdin)))))
     elif len(args.sample_files) == 0:
         print("Please call with either a list of text files to analyze, or the -s flag to classify stdin.\nCall with the -h flag for additional help.")
     else:
+        if engine:
+            z = Zippy(engine)
+        else:
+            z = EnsembledZippy()
         for f in args.sample_files:
             print(f)
             if os.path.isfile(f):
-                print(str(run_on_file_chunked(f)))
+                print(str(z.run_on_file_chunked(f)))
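Both Zippy.run_on_text_chunked and EnsembledZippy._combine_scores use the same signed-sum trick: 'AI' votes subtract and 'Human' votes add, weighted by chunk length in the former and by the fixed .33 engine weights in the latter; the sign of the sum picks the label and its magnitude becomes the confidence. A worked example with invented scores:

    # Chunk-level, length-weighted (Zippy): two chunks of 900 and 600 chars.
    chunks, scores = ['a' * 900, 'b' * 600], [('AI', 0.4), ('Human', 0.1)]
    total = sum(len(c) for c in chunks)
    ssum = sum((s if lbl == 'Human' else -s) * len(c) / total
               for c, (lbl, s) in zip(chunks, scores))
    print(('AI', abs(ssum)) if ssum < 0 else ('Human', abs(ssum)))  # -> ('AI', ~0.2)

    # Engine-level, equal weights (EnsembledZippy).
    votes, weights = [('AI', 0.9), ('Human', 0.2), ('AI', 0.1)], [.33, .33, .33]
    esum = sum((s if lbl == 'Human' else -s) * w for (lbl, s), w in zip(votes, weights))
    print(('AI', abs(esum)) if esum < 0 else ('Human', abs(esum)))  # -> ('AI', ~0.264)

From the command line the same choice is exposed via -e, e.g. python3 zippy.py -e brotli somefile.txt or echo 'text to classify' | python3 zippy.py -e ensemble -s (file name illustrative).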