kopia lustrzana https://github.com/thinkst/zippy
Porównaj commity
2 Commity
2954176173
...
513c6bb0b2
Autor | SHA1 | Data |
---|---|---|
Jacob Torrey | 513c6bb0b2 | |
Jacob Torrey | 8f07ef4cc7 |
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
import pytest, os, jsonlines, csv
|
import pytest, os, jsonlines, csv
|
||||||
from warnings import warn
|
from warnings import warn
|
||||||
from zippy import run_on_file_chunked, run_on_text_chunked, PRELUDE_STR, LzmaLlmDetector, CompressionEngine, ZlibLlmDetector, ENGINE
|
from zippy import Zippy, EnsembledZippy, PRELUDE_STR, LzmaLlmDetector, BrotliLlmDetector, ZlibLlmDetector, CompressionEngine
|
||||||
import zippy
|
import zippy
|
||||||
|
|
||||||
AI_SAMPLE_DIR = 'samples/llm-generated/'
|
AI_SAMPLE_DIR = 'samples/llm-generated/'
|
||||||
|
@ -16,19 +16,34 @@ human_files = os.listdir(HUMAN_SAMPLE_DIR)
|
||||||
|
|
||||||
CONFIDENCE_THRESHOLD : float = 0.00 # What confidence to treat as error vs warning
|
CONFIDENCE_THRESHOLD : float = 0.00 # What confidence to treat as error vs warning
|
||||||
|
|
||||||
if ENGINE == CompressionEngine.LZMA:
|
# Bool on whether to ensemble the models or run a single model
|
||||||
PRELUDE_RATIO = LzmaLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
|
ENSEMBLE = True
|
||||||
elif ENGINE == CompressionEngine.ZLIB:
|
|
||||||
PRELUDE_RATIO = ZlibLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
|
if not ENSEMBLE:
|
||||||
|
# What compression engine to use for the test
|
||||||
|
ENGINE = CompressionEngine.LZMA
|
||||||
|
|
||||||
|
if ENGINE == CompressionEngine.LZMA:
|
||||||
|
PRELUDE_RATIO = LzmaLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
|
||||||
|
elif ENGINE == CompressionEngine.ZLIB:
|
||||||
|
PRELUDE_RATIO = ZlibLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
|
||||||
|
elif ENGINE == CompressionEngine.BROTLI:
|
||||||
|
PRELUDE_RATIO = BrotliLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
|
||||||
|
|
||||||
|
zippy = Zippy(ENGINE)
|
||||||
|
|
||||||
|
else:
|
||||||
|
zippy = EnsembledZippy()
|
||||||
|
PRELUDE_RATIO = None
|
||||||
|
|
||||||
def test_training_file(record_property):
|
def test_training_file(record_property):
|
||||||
(classification, score) = run_on_file_chunked('ai-generated.txt')
|
(classification, score) = zippy.run_on_file_chunked('ai-generated.txt')
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == 'AI', 'The training corpus should always be detected as AI-generated... since it is'
|
assert classification == 'AI', 'The training corpus should always be detected as AI-generated... since it is'
|
||||||
|
|
||||||
@pytest.mark.parametrize('f', human_files)
|
@pytest.mark.parametrize('f', human_files)
|
||||||
def test_human_samples(f, record_property):
|
def test_human_samples(f, record_property):
|
||||||
(classification, score) = run_on_file_chunked(HUMAN_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_file_chunked(HUMAN_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
if score > CONFIDENCE_THRESHOLD:
|
if score > CONFIDENCE_THRESHOLD:
|
||||||
assert classification == 'Human', f + ' is a human-generated file, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
assert classification == 'Human', f + ' is a human-generated file, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
||||||
|
@ -40,7 +55,7 @@ def test_human_samples(f, record_property):
|
||||||
|
|
||||||
@pytest.mark.parametrize('f', ai_files)
|
@pytest.mark.parametrize('f', ai_files)
|
||||||
def test_llm_sample(f, record_property):
|
def test_llm_sample(f, record_property):
|
||||||
(classification, score) = run_on_file_chunked(AI_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_file_chunked(AI_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
if score > CONFIDENCE_THRESHOLD:
|
if score > CONFIDENCE_THRESHOLD:
|
||||||
assert classification == 'AI', f + ' is an LLM-generated file, misclassified as human-generated with confidence ' + str(round(score, 8))
|
assert classification == 'AI', f + ' is an LLM-generated file, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||||
|
@ -59,7 +74,7 @@ with jsonlines.open(HUMAN_JSONL_FILE) as reader:
|
||||||
|
|
||||||
@pytest.mark.parametrize('i', human_samples[0:NUM_JSONL_SAMPLES])
|
@pytest.mark.parametrize('i', human_samples[0:NUM_JSONL_SAMPLES])
|
||||||
def test_human_jsonl(i, record_property):
|
def test_human_jsonl(i, record_property):
|
||||||
(classification, score) = run_on_text_chunked(i.get('text', ''), prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_text_chunked(i.get('text', ''), prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == 'Human', HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' (len: ' + str(i.get('length', -1)) + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
assert classification == 'Human', HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' (len: ' + str(i.get('length', -1)) + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
||||||
|
|
||||||
|
@ -98,13 +113,13 @@ with jsonlines.open(NEWS_JSONL_FILE) as reader:
|
||||||
|
|
||||||
@pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES])
|
@pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES])
|
||||||
def test_humannews_jsonl(i, record_property):
|
def test_humannews_jsonl(i, record_property):
|
||||||
(classification, score) = run_on_text_chunked(i.get('human', ''), prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_text_chunked(i.get('human', ''), prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == 'Human', NEWS_JSONL_FILE + ' is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
assert classification == 'Human', NEWS_JSONL_FILE + ' is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
||||||
|
|
||||||
@pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES])
|
@pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES])
|
||||||
def test_chatgptnews_jsonl(i, record_property):
|
def test_chatgptnews_jsonl(i, record_property):
|
||||||
(classification, score) = run_on_text_chunked(i.get('chatgpt', ''), prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_text_chunked(i.get('chatgpt', ''), prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == 'AI', NEWS_JSONL_FILE + ' is a AI-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
assert classification == 'AI', NEWS_JSONL_FILE + ' is a AI-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||||
|
|
||||||
|
@ -117,7 +132,7 @@ with jsonlines.open(CHEAT_HUMAN_JSONL_FILE) as reader:
|
||||||
|
|
||||||
@pytest.mark.parametrize('i', ch_samples[0:NUM_JSONL_SAMPLES])
|
@pytest.mark.parametrize('i', ch_samples[0:NUM_JSONL_SAMPLES])
|
||||||
def test_cheat_human_jsonl(i, record_property):
|
def test_cheat_human_jsonl(i, record_property):
|
||||||
(classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == 'Human', CHEAT_HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' [' + str(len(i.get('abstract', ''))) + '] (title: ' + i.get('title', "").replace('\n', ' ')[:15] + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
assert classification == 'Human', CHEAT_HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' [' + str(len(i.get('abstract', ''))) + '] (title: ' + i.get('title', "").replace('\n', ' ')[:15] + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
||||||
|
|
||||||
|
@ -130,7 +145,7 @@ with jsonlines.open(CHEAT_GEN_JSONL_FILE) as reader:
|
||||||
|
|
||||||
@pytest.mark.parametrize('i', cg_samples[0:NUM_JSONL_SAMPLES])
|
@pytest.mark.parametrize('i', cg_samples[0:NUM_JSONL_SAMPLES])
|
||||||
def test_cheat_generation_jsonl(i, record_property):
|
def test_cheat_generation_jsonl(i, record_property):
|
||||||
(classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == 'AI', CHEAT_GEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
assert classification == 'AI', CHEAT_GEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||||
|
|
||||||
|
@ -143,7 +158,7 @@ with jsonlines.open(CHEAT_POLISH_JSONL_FILE) as reader:
|
||||||
|
|
||||||
@pytest.mark.parametrize('i', cp_samples[0:NUM_JSONL_SAMPLES])
|
@pytest.mark.parametrize('i', cp_samples[0:NUM_JSONL_SAMPLES])
|
||||||
def test_cheat_polish_jsonl(i, record_property):
|
def test_cheat_polish_jsonl(i, record_property):
|
||||||
(classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == 'AI', CHEAT_POLISH_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
assert classification == 'AI', CHEAT_POLISH_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||||
|
|
||||||
|
@ -156,7 +171,7 @@ with jsonlines.open(CHEAT_VICUNAGEN_JSONL_FILE) as reader:
|
||||||
|
|
||||||
@pytest.mark.parametrize('i', vg_samples[0:NUM_JSONL_SAMPLES])
|
@pytest.mark.parametrize('i', vg_samples[0:NUM_JSONL_SAMPLES])
|
||||||
def test_vicuna_generation_jsonl(i, record_property):
|
def test_vicuna_generation_jsonl(i, record_property):
|
||||||
(classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == 'AI', CHEAT_VICUNAGEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
assert classification == 'AI', CHEAT_VICUNAGEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||||
|
|
||||||
|
@ -170,12 +185,12 @@ with open(GPTZERO_EVAL_FILE) as fp:
|
||||||
|
|
||||||
@pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'Human', ge_samples[0:NUM_JSONL_SAMPLES])))
|
@pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'Human', ge_samples[0:NUM_JSONL_SAMPLES])))
|
||||||
def test_gptzero_eval_dataset_human(i, record_property):
|
def test_gptzero_eval_dataset_human(i, record_property):
|
||||||
(classification, score) = run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8))
|
assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8))
|
||||||
|
|
||||||
@pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'AI', ge_samples[0:NUM_JSONL_SAMPLES])))
|
@pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'AI', ge_samples[0:NUM_JSONL_SAMPLES])))
|
||||||
def test_gptzero_eval_dataset_ai(i, record_property):
|
def test_gptzero_eval_dataset_ai(i, record_property):
|
||||||
(classification, score) = run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
|
(classification, score) = zippy.run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
|
||||||
record_property("score", str(score))
|
record_property("score", str(score))
|
||||||
assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8))
|
assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8))
|
||||||
|
|
File diff suppressed because one or more lines are too long
21659
zippy-report.xml
21659
zippy-report.xml
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
369
zippy.py
369
zippy.py
|
@ -6,9 +6,12 @@
|
||||||
|
|
||||||
import lzma, argparse, os, itertools
|
import lzma, argparse, os, itertools
|
||||||
from zlib import compressobj, Z_FINISH
|
from zlib import compressobj, Z_FINISH
|
||||||
import re, sys
|
from brotli import compress as brotli_compress, MODE_TEXT
|
||||||
|
from numpy import array_split
|
||||||
|
import re, sys, statistics
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
from math import ceil
|
||||||
from typing import List, Optional, Tuple, TypeAlias
|
from typing import List, Optional, Tuple, TypeAlias
|
||||||
from multiprocessing import Pool, cpu_count
|
from multiprocessing import Pool, cpu_count
|
||||||
|
|
||||||
|
@ -17,8 +20,7 @@ Score : TypeAlias = tuple[str, float]
|
||||||
class CompressionEngine(Enum):
|
class CompressionEngine(Enum):
|
||||||
LZMA = 1
|
LZMA = 1
|
||||||
ZLIB = 2
|
ZLIB = 2
|
||||||
|
BROTLI = 3
|
||||||
ENGINE : CompressionEngine = CompressionEngine.ZLIB
|
|
||||||
|
|
||||||
def clean_text(s : str) -> str:
|
def clean_text(s : str) -> str:
|
||||||
'''
|
'''
|
||||||
|
@ -49,19 +51,21 @@ class AIDetector(ABC):
|
||||||
def score_text(self, sample : str) -> Optional[Score]:
|
def score_text(self, sample : str) -> Optional[Score]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class ZlibLlmDetector(AIDetector):
|
class BrotliLlmDetector(AIDetector):
|
||||||
'''Class providing functionality to attempt to detect LLM/generative AI generated text using the zlib compression algorithm'''
|
'''Class providing functionality to attempt to detect LLM/generative AI generated text using the brotli compression algorithm'''
|
||||||
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None):
|
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 8):
|
||||||
self.PRESET = 9
|
self.PRESET = preset
|
||||||
self.WBITS = -15
|
self.WIN_SIZE = 24
|
||||||
|
self.BLOCK_SIZE = 0
|
||||||
self.prelude_ratio = 0.0
|
self.prelude_ratio = 0.0
|
||||||
if prelude_ratio != None:
|
if prelude_ratio != None:
|
||||||
self.prelude_ratio = prelude_ratio
|
self.prelude_ratio = prelude_ratio
|
||||||
|
|
||||||
if prelude_file != None:
|
if prelude_file != None:
|
||||||
with open(prelude_file) as fp:
|
with open(prelude_file) as fp:
|
||||||
self.prelude_str = fp.read()
|
self.prelude_str = clean_text(fp.read())
|
||||||
self.prelude_ratio = self._compress(self.prelude_str)
|
self.prelude_ratio = self._compress(self.prelude_str)
|
||||||
|
return
|
||||||
|
|
||||||
if prelude_str != None:
|
if prelude_str != None:
|
||||||
self.prelude_str = prelude_str
|
self.prelude_str = prelude_str
|
||||||
|
@ -69,11 +73,7 @@ class ZlibLlmDetector(AIDetector):
|
||||||
|
|
||||||
def _compress(self, s : str) -> float:
|
def _compress(self, s : str) -> float:
|
||||||
orig_len = len(s.encode())
|
orig_len = len(s.encode())
|
||||||
c = compressobj(level=self.PRESET, wbits=self.WBITS, memLevel=9)
|
c_len = len(brotli_compress(s.encode(), mode=MODE_TEXT, quality=self.PRESET, lgwin=self.WIN_SIZE, lgblock=self.BLOCK_SIZE))
|
||||||
bytes = c.compress(s.encode())
|
|
||||||
bytes += c.flush(Z_FINISH)
|
|
||||||
c_len = len(bytes)
|
|
||||||
#c_len = len(compress(s.encode(), level=self.PRESET, wbits=self.WBITS))
|
|
||||||
return c_len / orig_len
|
return c_len / orig_len
|
||||||
|
|
||||||
def score_text(self, sample: str) -> Score | None:
|
def score_text(self, sample: str) -> Score | None:
|
||||||
|
@ -84,7 +84,54 @@ class ZlibLlmDetector(AIDetector):
|
||||||
if self.prelude_ratio == 0.0:
|
if self.prelude_ratio == 0.0:
|
||||||
return None
|
return None
|
||||||
sample_score = self._compress(self.prelude_str + sample)
|
sample_score = self._compress(self.prelude_str + sample)
|
||||||
#print(str((self.prelude_ratio, sample_score)))
|
#print('Brotli: ' + str((self.prelude_ratio, sample_score)))
|
||||||
|
delta = self.prelude_ratio - sample_score
|
||||||
|
determination = 'AI'
|
||||||
|
if delta < 0:
|
||||||
|
determination = 'Human'
|
||||||
|
|
||||||
|
return (determination, abs(delta * 100))
|
||||||
|
|
||||||
|
class ZlibLlmDetector(AIDetector):
|
||||||
|
'''Class providing functionality to attempt to detect LLM/generative AI generated text using the zlib compression algorithm'''
|
||||||
|
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 9):
|
||||||
|
self.PRESET = preset
|
||||||
|
self.WBITS = -15
|
||||||
|
self.prelude_ratio = 0.0
|
||||||
|
if prelude_ratio != None:
|
||||||
|
self.prelude_ratio = prelude_ratio
|
||||||
|
|
||||||
|
if prelude_file != None:
|
||||||
|
with open(prelude_file) as fp:
|
||||||
|
self.prelude_str = clean_text(fp.read())
|
||||||
|
lines = self.prelude_str.split('\n')
|
||||||
|
self.prelude_chunks = array_split(lines, ceil(len(self.prelude_str) / 2**abs(self.WBITS)))
|
||||||
|
self.prelude_ratio = statistics.mean(map(lambda x: self._compress('\n'.join(list(x))), self.prelude_chunks))
|
||||||
|
return
|
||||||
|
|
||||||
|
if prelude_str != None:
|
||||||
|
self.prelude_str = prelude_str
|
||||||
|
lines = self.prelude_str.split('\n')
|
||||||
|
self.prelude_chunks = array_split(lines, ceil(len(self.prelude_str) / 2**abs(self.WBITS)))
|
||||||
|
self.prelude_ratio = statistics.mean(map(lambda x: self._compress('\n'.join(list(x))), self.prelude_chunks))
|
||||||
|
|
||||||
|
def _compress(self, s : str) -> float:
|
||||||
|
orig_len = len(s.encode())
|
||||||
|
c = compressobj(level=self.PRESET, wbits=self.WBITS, memLevel=9)
|
||||||
|
bytes = c.compress(s.encode())
|
||||||
|
bytes += c.flush(Z_FINISH)
|
||||||
|
c_len = len(bytes)
|
||||||
|
return c_len / orig_len
|
||||||
|
|
||||||
|
def score_text(self, sample: str) -> Score | None:
|
||||||
|
'''
|
||||||
|
Returns a tuple of a string (AI or Human) and a float confidence (higher is more confident) that the sample was generated
|
||||||
|
by either an AI or human. Returns None if it cannot make a determination
|
||||||
|
'''
|
||||||
|
if self.prelude_ratio == 0.0:
|
||||||
|
return None
|
||||||
|
sample_score = statistics.mean(map(lambda x: self._compress('\n'.join(x) + sample), self.prelude_chunks))
|
||||||
|
#print('ZLIB: ' + str((self.prelude_ratio, sample_score)))
|
||||||
delta = self.prelude_ratio - sample_score
|
delta = self.prelude_ratio - sample_score
|
||||||
determination = 'AI'
|
determination = 'AI'
|
||||||
if delta < 0:
|
if delta < 0:
|
||||||
|
@ -95,64 +142,34 @@ class ZlibLlmDetector(AIDetector):
|
||||||
|
|
||||||
class LzmaLlmDetector(AIDetector):
|
class LzmaLlmDetector(AIDetector):
|
||||||
'''Class providing functionality to attempt to detect LLM/generative AI generated text using the LZMA compression algorithm'''
|
'''Class providing functionality to attempt to detect LLM/generative AI generated text using the LZMA compression algorithm'''
|
||||||
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None) -> None:
|
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 3) -> None:
|
||||||
'''Initializes a compression with the passed prelude file, and optionally the number of digits to round to compare prelude vs. sample compression'''
|
'''Initializes a compression with the passed prelude file, and optionally the number of digits to round to compare prelude vs. sample compression'''
|
||||||
self.PRESET : int = 2
|
self.PRESET : int = preset
|
||||||
self.comp = lzma.LZMACompressor(preset=self.PRESET)
|
|
||||||
self.c_buf : List[bytes] = []
|
self.c_buf : List[bytes] = []
|
||||||
self.in_bytes : int = 0
|
self.in_bytes : int = 0
|
||||||
self.prelude_ratio : float = 0.0
|
self.prelude_ratio : float = 0.0
|
||||||
if prelude_ratio != None:
|
if prelude_ratio != None:
|
||||||
self.prelude_ratio = prelude_ratio
|
self.prelude_ratio = prelude_ratio
|
||||||
self.SHORT_SAMPLE_THRESHOLD : int = 350 # What sample length is considered "short"
|
|
||||||
|
|
||||||
if prelude_file != None:
|
if prelude_file != None:
|
||||||
# Read it once to get the default compression ratio for the prelude
|
# Read it once to get the default compression ratio for the prelude
|
||||||
with open(prelude_file, 'r') as fp:
|
with open(prelude_file, 'r') as fp:
|
||||||
self._compress_str(fp.read())
|
self.prelude_str = fp.read()
|
||||||
self.prelude_ratio = self._finalize()
|
self.prelude_ratio = self._compress(self.prelude_str)
|
||||||
|
return
|
||||||
#print(prelude_file + ' ratio: ' + str(self.prelude_ratio))
|
#print(prelude_file + ' ratio: ' + str(self.prelude_ratio))
|
||||||
# Redo this to prime the compressor
|
|
||||||
self.comp = lzma.LZMACompressor(preset=self.PRESET)
|
|
||||||
with open(prelude_file, 'r') as fp:
|
|
||||||
self._compress_str(fp.read())
|
|
||||||
|
|
||||||
if prelude_str != None:
|
if prelude_str != None:
|
||||||
if self.prelude_ratio == 0.0:
|
if self.prelude_ratio == 0.0:
|
||||||
self._compress_str(prelude_str)
|
self.prelude_ratio = self._compress(prelude_str)
|
||||||
self.prelude_ratio = self._finalize()
|
|
||||||
self.comp = lzma.LZMACompressor(preset=self.PRESET)
|
def _compress(self, s : str) -> float:
|
||||||
self._compress_str(prelude_str)
|
orig_len = len(s.encode())
|
||||||
|
c = lzma.LZMACompressor(preset=self.PRESET)
|
||||||
def _compress_str(self, s : str) -> None:
|
bytes = c.compress(s.encode())
|
||||||
'''
|
bytes += c.flush()
|
||||||
Internal helper function to compress a string
|
c_len = len(bytes)
|
||||||
'''
|
return c_len / orig_len
|
||||||
strb : bytes = s.encode('ascii', errors='ignore')
|
|
||||||
self.c_buf.append(self.comp.compress(strb))
|
|
||||||
self.in_bytes += len(strb)
|
|
||||||
|
|
||||||
def _finalize(self) -> float:
|
|
||||||
'''
|
|
||||||
Finalizes an LZMA compression cycle and returns the percentage compression ratio
|
|
||||||
|
|
||||||
post: _ >= 0
|
|
||||||
'''
|
|
||||||
self.c_buf.append(self.comp.flush())
|
|
||||||
compressed_size : int = len(b''.join(self.c_buf))
|
|
||||||
if self.in_bytes == 0:
|
|
||||||
return 0.0
|
|
||||||
score = compressed_size / self.in_bytes
|
|
||||||
self.in_bytes = 0
|
|
||||||
self.c_buf = []
|
|
||||||
return score
|
|
||||||
|
|
||||||
def get_compression_ratio(self, s : str) -> Tuple[float, float]:
|
|
||||||
'''
|
|
||||||
Returns a tuple of floats with the compression ratio of the prelude (0 if no prelude) and passed string
|
|
||||||
'''
|
|
||||||
self._compress_str(s)
|
|
||||||
return (self.prelude_ratio, self._finalize())
|
|
||||||
|
|
||||||
def score_text(self, sample : str) -> Optional[Score]:
|
def score_text(self, sample : str) -> Optional[Score]:
|
||||||
'''
|
'''
|
||||||
|
@ -161,104 +178,188 @@ class LzmaLlmDetector(AIDetector):
|
||||||
'''
|
'''
|
||||||
if self.prelude_ratio == 0.0:
|
if self.prelude_ratio == 0.0:
|
||||||
return None
|
return None
|
||||||
(prelude_score, sample_score) = self.get_compression_ratio(sample)
|
#print('LZMA: ' + str((self.prelude_ratio, sample_score)))
|
||||||
print(str((self.prelude_ratio, sample_score)))
|
delta = self.prelude_ratio - self._compress(self.prelude_str + sample)
|
||||||
delta = prelude_score - sample_score
|
|
||||||
determination = 'AI'
|
determination = 'AI'
|
||||||
if delta < 0:
|
if delta < 0:
|
||||||
determination = 'Human'
|
determination = 'Human'
|
||||||
|
|
||||||
return (determination, abs(delta * 100))
|
return (determination, abs(delta * 100))
|
||||||
|
|
||||||
def run_on_file(filename : str) -> Optional[Score]:
|
class Zippy:
|
||||||
'''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file'''
|
'''
|
||||||
with open(filename, 'r') as fp:
|
Class to wrap the functionality of Zippy
|
||||||
if ENGINE == CompressionEngine.LZMA:
|
'''
|
||||||
l = LzmaLlmDetector(prelude_file=PRELUDE_FILE)
|
def __init__(self, engine : CompressionEngine = CompressionEngine.LZMA, preset : Optional[int] = None) -> None:
|
||||||
elif ENGINE == CompressionEngine.ZLIB:
|
self.ENGINE = engine
|
||||||
l = ZlibLlmDetector(prelude_file=PRELUDE_FILE)
|
self.PRESET = preset
|
||||||
txt = fp.read()
|
if engine == CompressionEngine.LZMA:
|
||||||
#print('Calculating score for input of length ' + str(len(txt)))
|
if self.PRESET:
|
||||||
return l.score_text(txt)
|
self.detector = LzmaLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET)
|
||||||
|
else:
|
||||||
|
self.detector = LzmaLlmDetector(prelude_file=PRELUDE_FILE)
|
||||||
|
elif engine == CompressionEngine.BROTLI:
|
||||||
|
if self.PRESET:
|
||||||
|
self.detector = BrotliLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET)
|
||||||
|
else:
|
||||||
|
self.detector = BrotliLlmDetector(prelude_file=PRELUDE_FILE)
|
||||||
|
elif engine == CompressionEngine.ZLIB:
|
||||||
|
if self.PRESET:
|
||||||
|
self.detector = ZlibLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET)
|
||||||
|
else:
|
||||||
|
self.detector = ZlibLlmDetector(prelude_file=PRELUDE_FILE)
|
||||||
|
|
||||||
def _score_chunk(c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score:
|
def run_on_file(self, filename : str) -> Optional[Score]:
|
||||||
if prelude_file != None:
|
'''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file'''
|
||||||
if ENGINE == CompressionEngine.LZMA:
|
with open(filename, 'r') as fp:
|
||||||
l = LzmaLlmDetector(prelude_file=prelude_file)
|
txt = fp.read()
|
||||||
if ENGINE == CompressionEngine.ZLIB:
|
#print('Calculating score for input of length ' + str(len(txt)))
|
||||||
l = ZlibLlmDetector(prelude_file=prelude_file)
|
return self.detector.score_text(txt)
|
||||||
|
|
||||||
|
def _score_chunk(self, c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score:
|
||||||
|
if prelude_file is None and prelude_ratio != None:
|
||||||
|
self.detector.prelude_str = PRELUDE_STR
|
||||||
|
self.detector.prelude_ratio = prelude_ratio
|
||||||
|
|
||||||
|
return self.detector.score_text(c)
|
||||||
|
|
||||||
|
def run_on_file_chunked(self, filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]:
|
||||||
|
'''
|
||||||
|
Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file.
|
||||||
|
This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
|
||||||
|
being skewed because its compression ratio starts to overwhelm the prelude file.
|
||||||
|
'''
|
||||||
|
with open(filename, 'r') as fp:
|
||||||
|
contents = fp.read()
|
||||||
|
return self.run_on_text_chunked(contents, chunk_size, prelude_ratio=prelude_ratio)
|
||||||
|
|
||||||
|
def run_on_text_chunked(self, s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]:
|
||||||
|
'''
|
||||||
|
Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string.
|
||||||
|
This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
|
||||||
|
being skewed because its compression ratio starts to overwhelm the prelude file.
|
||||||
|
'''
|
||||||
|
contents = clean_text(s)
|
||||||
|
|
||||||
|
start = 0
|
||||||
|
end = 0
|
||||||
|
chunks = []
|
||||||
|
while start + chunk_size < len(contents) and end != -1:
|
||||||
|
end = contents.rfind(' ', start, start + chunk_size + 1)
|
||||||
|
chunks.append(contents[start:end])
|
||||||
|
start = end + 1
|
||||||
|
chunks.append(contents[start:])
|
||||||
|
scores = []
|
||||||
|
if len(chunks) > 2:
|
||||||
|
with Pool(cpu_count()) as pool:
|
||||||
|
for r in pool.starmap(self._score_chunk, zip(chunks, itertools.repeat(prelude_file), itertools.repeat(prelude_ratio))):
|
||||||
|
scores.append(r)
|
||||||
else:
|
else:
|
||||||
if ENGINE == CompressionEngine.LZMA:
|
for c in chunks:
|
||||||
l = LzmaLlmDetector(prelude_str=PRELUDE_STR, prelude_ratio=prelude_ratio)
|
scores.append(self._score_chunk(c, prelude_file=prelude_file, prelude_ratio=prelude_ratio))
|
||||||
if ENGINE == CompressionEngine.ZLIB:
|
ssum : float = 0.0
|
||||||
l = ZlibLlmDetector(prelude_str=PRELUDE_STR, prelude_ratio=prelude_ratio)
|
for i, s in enumerate(scores):
|
||||||
return l.score_text(c)
|
if s[0] == 'AI':
|
||||||
|
ssum -= s[1] * (len(chunks[i]) / len(contents))
|
||||||
def run_on_file_chunked(filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]:
|
else:
|
||||||
'''
|
ssum += s[1] * (len(chunks[i]) / len(contents))
|
||||||
Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file.
|
sa : float = ssum
|
||||||
This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
|
if sa < 0:
|
||||||
being skewed because its compression ratio starts to overwhelm the prelude file.
|
return ('AI', abs(sa))
|
||||||
'''
|
|
||||||
with open(filename, 'r') as fp:
|
|
||||||
contents = fp.read()
|
|
||||||
return run_on_text_chunked(contents, chunk_size, prelude_ratio=prelude_ratio)
|
|
||||||
|
|
||||||
def run_on_text_chunked(s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]:
    '''
    Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string.
    This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
    being skewed because its compression ratio starts to overwhelm the prelude file.

    Returns None for input that is empty after cleaning (previously this
    raised ZeroDivisionError in the length-weighted average below).
    '''
    contents = clean_text(s)
    if not contents:
        return None

    # Split into chunks of at most chunk_size characters, breaking on the
    # last space inside each window so words are never cut in half.
    start = 0
    end = 0
    chunks = []
    while start + chunk_size < len(contents) and end != -1:
        end = contents.rfind(' ', start, start + chunk_size + 1)
        if end == -1:
            # No space anywhere in this window: stop chunking here and let
            # the final append take the whole remainder. (The original
            # appended a bogus contents[start:-1] slice in this case and
            # then duplicated the remainder.)
            break
        chunks.append(contents[start:end])
        start = end + 1
    chunks.append(contents[start:])

    scores = []
    if len(chunks) > 2:
        # Enough chunks to be worth scoring in parallel.
        with Pool(cpu_count()) as pool:
            for r in pool.starmap(_score_chunk, zip(chunks, itertools.repeat(prelude_file), itertools.repeat(prelude_ratio))):
                scores.append(r)
    else:
        for c in chunks:
            scores.append(_score_chunk(c, prelude_file=prelude_file, prelude_ratio=prelude_ratio))

    # Signed, chunk-length-weighted sum of confidences: 'AI' chunks pull the
    # total negative, 'Human' chunks pull it positive.
    # (Loop variable renamed from `s`, which shadowed the text parameter.)
    ssum : float = 0.0
    for i, score in enumerate(scores):
        if score[0] == 'AI':
            ssum -= score[1] * (len(chunks[i]) / len(contents))
        else:
            ssum += score[1] * (len(chunks[i]) / len(contents))
    sa : float = ssum
    if sa < 0:
        return ('AI', abs(sa))
    else:
        return ('Human', abs(sa))
||||||
|
class EnsembledZippy:
    '''
    Class to wrap the functionality of Zippy into an ensemble: one Zippy
    per compression engine, with their scores combined by fixed weights.
    '''
    def __init__(self) -> None:
        self.ENGINES = [CompressionEngine.LZMA, CompressionEngine.BROTLI, CompressionEngine.ZLIB]
        # NOTE(review): weights sum to 0.99, not 1.0, so combined
        # confidences are slightly deflated -- confirm this is intended.
        self.WEIGHTS = [.33, .33, .33]
        self.component_classifiers : list[AIDetector] = []
        for e in self.ENGINES:
            self.component_classifiers.append(Zippy(e))

    def _combine_scores(self, scores : list[Score]) -> Score:
        '''Fold per-engine scores into one weighted Score: 'AI' results
        contribute negatively, 'Human' positively; the sign of the total
        picks the label and its magnitude is the confidence.'''
        ssum : float = 0.0
        for i, s in enumerate(scores):
            if s[0] == 'AI':
                ssum -= s[1] * self.WEIGHTS[i]
            else:
                ssum += s[1] * self.WEIGHTS[i]
        sa : float = ssum
        if sa < 0:
            return ('AI', abs(sa))
        else:
            return ('Human', abs(sa))

    def run_on_file(self, filename : str) -> Optional[Score]:
        '''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file'''
        with open(filename, 'r') as fp:
            txt = fp.read()
        scores = []
        for c in self.component_classifiers:
            scores.append(c.score_text(txt))
        return self._combine_scores(scores)

    def _score_chunk(self, c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score:
        '''Score a single chunk of text with every component classifier.'''
        scores = []
        # BUG FIX: the loop variable previously shadowed the chunk
        # parameter `c`, so each classifier scored the classifier object
        # itself (c.score_text(c)) instead of the text chunk.
        for classifier in self.component_classifiers:
            scores.append(classifier.score_text(c))
        return self._combine_scores(scores)

    def run_on_file_chunked(self, filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]:
        '''
        Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file.
        This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
        being skewed because its compression ratio starts to overwhelm the prelude file.
        '''
        # NOTE(review): prelude_ratio is accepted for signature parity but
        # not forwarded -- each component classifier owns its own prelude.
        with open(filename, 'r') as fp:
            contents = fp.read()
        return self.run_on_text_chunked(contents, chunk_size)

    def run_on_text_chunked(self, s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]:
        '''
        Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string.
        This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
        being skewed because its compression ratio starts to overwhelm the prelude file.
        '''
        scores = []
        for c in self.component_classifiers:
            scores.append(c.run_on_text_chunked(s, chunk_size=chunk_size))
        return self._combine_scores(scores)
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", choices=['zlib', 'lzma', 'brotli', 'ensemble'], help='Which compression engine to use: lzma, zlib, brotli, or an ensemble of all engines', default='lzma', required=False)
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-s", help='Read from stdin until EOF is reached instead of from a file', required=False, action='store_true')
    group.add_argument("sample_files", nargs='*', help='Text file(s) containing the sample to classify', default="")
    args = parser.parse_args()

    # Map the CLI choice to an engine; None selects the ensemble.
    # (Replaces the scrambled if/elif chain; argparse `choices` guarantees
    # args.e is one of these keys, and `default='lzma'` keeps the old
    # fallback.)
    _ENGINE_BY_NAME = {
        'lzma': CompressionEngine.LZMA,
        'zlib': CompressionEngine.ZLIB,
        'brotli': CompressionEngine.BROTLI,
        'ensemble': None,
    }
    engine = _ENGINE_BY_NAME.get(args.e, CompressionEngine.LZMA)

    if args.s:
        # Classify everything piped in on stdin.
        z = Zippy(engine) if engine else EnsembledZippy()
        print(str(z.run_on_text_chunked(''.join(list(sys.stdin)))))
    elif len(args.sample_files) == 0:
        print("Please call with either a list of text files to analyze, or the -s flag to classify stdin.\nCall with the -h flag for additional help.")
    else:
        # Classify each sample file in turn, skipping non-files.
        z = Zippy(engine) if engine else EnsembledZippy()
        for f in args.sample_files:
            print(f)
            if os.path.isfile(f):
                print(str(z.run_on_file_chunked(f)))
Ładowanie…
Reference in New Issue