kopia lustrzana https://github.com/thinkst/zippy
Porównaj commity
2 Commity
2954176173
...
513c6bb0b2
Autor | SHA1 | Data |
---|---|---|
Jacob Torrey | 513c6bb0b2 | |
Jacob Torrey | 8f07ef4cc7 |
|
@ -2,7 +2,7 @@
|
|||
|
||||
import pytest, os, jsonlines, csv
|
||||
from warnings import warn
|
||||
from zippy import run_on_file_chunked, run_on_text_chunked, PRELUDE_STR, LzmaLlmDetector, CompressionEngine, ZlibLlmDetector, ENGINE
|
||||
from zippy import Zippy, EnsembledZippy, PRELUDE_STR, LzmaLlmDetector, BrotliLlmDetector, ZlibLlmDetector, CompressionEngine
|
||||
import zippy
|
||||
|
||||
AI_SAMPLE_DIR = 'samples/llm-generated/'
|
||||
|
@ -16,19 +16,34 @@ human_files = os.listdir(HUMAN_SAMPLE_DIR)
|
|||
|
||||
CONFIDENCE_THRESHOLD : float = 0.00 # What confidence to treat as error vs warning
|
||||
|
||||
if ENGINE == CompressionEngine.LZMA:
|
||||
# Bool on whether to ensemble the models or run a single model
|
||||
ENSEMBLE = True
|
||||
|
||||
if not ENSEMBLE:
|
||||
# What compression engine to use for the test
|
||||
ENGINE = CompressionEngine.LZMA
|
||||
|
||||
if ENGINE == CompressionEngine.LZMA:
|
||||
PRELUDE_RATIO = LzmaLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
|
||||
elif ENGINE == CompressionEngine.ZLIB:
|
||||
elif ENGINE == CompressionEngine.ZLIB:
|
||||
PRELUDE_RATIO = ZlibLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
|
||||
elif ENGINE == CompressionEngine.BROTLI:
|
||||
PRELUDE_RATIO = BrotliLlmDetector(prelude_str=PRELUDE_STR).prelude_ratio
|
||||
|
||||
zippy = Zippy(ENGINE)
|
||||
|
||||
else:
|
||||
zippy = EnsembledZippy()
|
||||
PRELUDE_RATIO = None
|
||||
|
||||
def test_training_file(record_property):
|
||||
(classification, score) = run_on_file_chunked('ai-generated.txt')
|
||||
(classification, score) = zippy.run_on_file_chunked('ai-generated.txt')
|
||||
record_property("score", str(score))
|
||||
assert classification == 'AI', 'The training corpus should always be detected as AI-generated... since it is'
|
||||
|
||||
@pytest.mark.parametrize('f', human_files)
|
||||
def test_human_samples(f, record_property):
|
||||
(classification, score) = run_on_file_chunked(HUMAN_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_file_chunked(HUMAN_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
if score > CONFIDENCE_THRESHOLD:
|
||||
assert classification == 'Human', f + ' is a human-generated file, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
||||
|
@ -40,7 +55,7 @@ def test_human_samples(f, record_property):
|
|||
|
||||
@pytest.mark.parametrize('f', ai_files)
|
||||
def test_llm_sample(f, record_property):
|
||||
(classification, score) = run_on_file_chunked(AI_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_file_chunked(AI_SAMPLE_DIR + f, prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
if score > CONFIDENCE_THRESHOLD:
|
||||
assert classification == 'AI', f + ' is an LLM-generated file, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||
|
@ -59,7 +74,7 @@ with jsonlines.open(HUMAN_JSONL_FILE) as reader:
|
|||
|
||||
@pytest.mark.parametrize('i', human_samples[0:NUM_JSONL_SAMPLES])
|
||||
def test_human_jsonl(i, record_property):
|
||||
(classification, score) = run_on_text_chunked(i.get('text', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_text_chunked(i.get('text', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
assert classification == 'Human', HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' (len: ' + str(i.get('length', -1)) + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
||||
|
||||
|
@ -98,13 +113,13 @@ with jsonlines.open(NEWS_JSONL_FILE) as reader:
|
|||
|
||||
@pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES])
|
||||
def test_humannews_jsonl(i, record_property):
|
||||
(classification, score) = run_on_text_chunked(i.get('human', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_text_chunked(i.get('human', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
assert classification == 'Human', NEWS_JSONL_FILE + ' is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
||||
|
||||
@pytest.mark.parametrize('i', news_samples[0:NUM_JSONL_SAMPLES])
|
||||
def test_chatgptnews_jsonl(i, record_property):
|
||||
(classification, score) = run_on_text_chunked(i.get('chatgpt', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_text_chunked(i.get('chatgpt', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
assert classification == 'AI', NEWS_JSONL_FILE + ' is a AI-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||
|
||||
|
@ -117,7 +132,7 @@ with jsonlines.open(CHEAT_HUMAN_JSONL_FILE) as reader:
|
|||
|
||||
@pytest.mark.parametrize('i', ch_samples[0:NUM_JSONL_SAMPLES])
|
||||
def test_cheat_human_jsonl(i, record_property):
|
||||
(classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
assert classification == 'Human', CHEAT_HUMAN_JSONL_FILE + ':' + str(i.get('id')) + ' [' + str(len(i.get('abstract', ''))) + '] (title: ' + i.get('title', "").replace('\n', ' ')[:15] + ') is a human-generated sample, misclassified as AI-generated with confidence ' + str(round(score, 8))
|
||||
|
||||
|
@ -130,7 +145,7 @@ with jsonlines.open(CHEAT_GEN_JSONL_FILE) as reader:
|
|||
|
||||
@pytest.mark.parametrize('i', cg_samples[0:NUM_JSONL_SAMPLES])
|
||||
def test_cheat_generation_jsonl(i, record_property):
|
||||
(classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
assert classification == 'AI', CHEAT_GEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||
|
||||
|
@ -143,7 +158,7 @@ with jsonlines.open(CHEAT_POLISH_JSONL_FILE) as reader:
|
|||
|
||||
@pytest.mark.parametrize('i', cp_samples[0:NUM_JSONL_SAMPLES])
|
||||
def test_cheat_polish_jsonl(i, record_property):
|
||||
(classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
assert classification == 'AI', CHEAT_POLISH_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||
|
||||
|
@ -156,7 +171,7 @@ with jsonlines.open(CHEAT_VICUNAGEN_JSONL_FILE) as reader:
|
|||
|
||||
@pytest.mark.parametrize('i', vg_samples[0:NUM_JSONL_SAMPLES])
|
||||
def test_vicuna_generation_jsonl(i, record_property):
|
||||
(classification, score) = run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_text_chunked(i.get('abstract', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
assert classification == 'AI', CHEAT_VICUNAGEN_JSONL_FILE + ':' + str(i.get('id')) + ' (title: ' + i.get('title', "").replace('\n', ' ')[:50] + ') is an LLM-generated sample, misclassified as human-generated with confidence ' + str(round(score, 8))
|
||||
|
||||
|
@ -170,12 +185,12 @@ with open(GPTZERO_EVAL_FILE) as fp:
|
|||
|
||||
@pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'Human', ge_samples[0:NUM_JSONL_SAMPLES])))
|
||||
def test_gptzero_eval_dataset_human(i, record_property):
|
||||
(classification, score) = run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8))
|
||||
|
||||
@pytest.mark.parametrize('i', list(filter(lambda x: x.get('Label') == 'AI', ge_samples[0:NUM_JSONL_SAMPLES])))
|
||||
def test_gptzero_eval_dataset_ai(i, record_property):
|
||||
(classification, score) = run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
(classification, score) = zippy.run_on_text_chunked(i.get('Document', ''), prelude_ratio=PRELUDE_RATIO)
|
||||
record_property("score", str(score))
|
||||
assert classification == i.get('Label'), GPTZERO_EVAL_FILE + ':' + str(i.get('Index')) + ' was misclassified with confidence ' + str(round(score, 8))
|
||||
|
|
File diff suppressed because one or more lines are too long
21659
zippy-report.xml
21659
zippy-report.xml
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
281
zippy.py
281
zippy.py
|
@ -6,9 +6,12 @@
|
|||
|
||||
import lzma, argparse, os, itertools
|
||||
from zlib import compressobj, Z_FINISH
|
||||
import re, sys
|
||||
from brotli import compress as brotli_compress, MODE_TEXT
|
||||
from numpy import array_split
|
||||
import re, sys, statistics
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from math import ceil
|
||||
from typing import List, Optional, Tuple, TypeAlias
|
||||
from multiprocessing import Pool, cpu_count
|
||||
|
||||
|
@ -17,8 +20,7 @@ Score : TypeAlias = tuple[str, float]
|
|||
class CompressionEngine(Enum):
|
||||
LZMA = 1
|
||||
ZLIB = 2
|
||||
|
||||
ENGINE : CompressionEngine = CompressionEngine.ZLIB
|
||||
BROTLI = 3
|
||||
|
||||
def clean_text(s : str) -> str:
|
||||
'''
|
||||
|
@ -49,19 +51,21 @@ class AIDetector(ABC):
|
|||
def score_text(self, sample : str) -> Optional[Score]:
|
||||
pass
|
||||
|
||||
class ZlibLlmDetector(AIDetector):
|
||||
'''Class providing functionality to attempt to detect LLM/generative AI generated text using the zlib compression algorithm'''
|
||||
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None):
|
||||
self.PRESET = 9
|
||||
self.WBITS = -15
|
||||
class BrotliLlmDetector(AIDetector):
|
||||
'''Class providing functionality to attempt to detect LLM/generative AI generated text using the brotli compression algorithm'''
|
||||
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 8):
|
||||
self.PRESET = preset
|
||||
self.WIN_SIZE = 24
|
||||
self.BLOCK_SIZE = 0
|
||||
self.prelude_ratio = 0.0
|
||||
if prelude_ratio != None:
|
||||
self.prelude_ratio = prelude_ratio
|
||||
|
||||
if prelude_file != None:
|
||||
with open(prelude_file) as fp:
|
||||
self.prelude_str = fp.read()
|
||||
self.prelude_str = clean_text(fp.read())
|
||||
self.prelude_ratio = self._compress(self.prelude_str)
|
||||
return
|
||||
|
||||
if prelude_str != None:
|
||||
self.prelude_str = prelude_str
|
||||
|
@ -69,11 +73,7 @@ class ZlibLlmDetector(AIDetector):
|
|||
|
||||
def _compress(self, s : str) -> float:
|
||||
orig_len = len(s.encode())
|
||||
c = compressobj(level=self.PRESET, wbits=self.WBITS, memLevel=9)
|
||||
bytes = c.compress(s.encode())
|
||||
bytes += c.flush(Z_FINISH)
|
||||
c_len = len(bytes)
|
||||
#c_len = len(compress(s.encode(), level=self.PRESET, wbits=self.WBITS))
|
||||
c_len = len(brotli_compress(s.encode(), mode=MODE_TEXT, quality=self.PRESET, lgwin=self.WIN_SIZE, lgblock=self.BLOCK_SIZE))
|
||||
return c_len / orig_len
|
||||
|
||||
def score_text(self, sample: str) -> Score | None:
|
||||
|
@ -84,7 +84,54 @@ class ZlibLlmDetector(AIDetector):
|
|||
if self.prelude_ratio == 0.0:
|
||||
return None
|
||||
sample_score = self._compress(self.prelude_str + sample)
|
||||
#print(str((self.prelude_ratio, sample_score)))
|
||||
#print('Brotli: ' + str((self.prelude_ratio, sample_score)))
|
||||
delta = self.prelude_ratio - sample_score
|
||||
determination = 'AI'
|
||||
if delta < 0:
|
||||
determination = 'Human'
|
||||
|
||||
return (determination, abs(delta * 100))
|
||||
|
||||
class ZlibLlmDetector(AIDetector):
|
||||
'''Class providing functionality to attempt to detect LLM/generative AI generated text using the zlib compression algorithm'''
|
||||
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 9):
|
||||
self.PRESET = preset
|
||||
self.WBITS = -15
|
||||
self.prelude_ratio = 0.0
|
||||
if prelude_ratio != None:
|
||||
self.prelude_ratio = prelude_ratio
|
||||
|
||||
if prelude_file != None:
|
||||
with open(prelude_file) as fp:
|
||||
self.prelude_str = clean_text(fp.read())
|
||||
lines = self.prelude_str.split('\n')
|
||||
self.prelude_chunks = array_split(lines, ceil(len(self.prelude_str) / 2**abs(self.WBITS)))
|
||||
self.prelude_ratio = statistics.mean(map(lambda x: self._compress('\n'.join(list(x))), self.prelude_chunks))
|
||||
return
|
||||
|
||||
if prelude_str != None:
|
||||
self.prelude_str = prelude_str
|
||||
lines = self.prelude_str.split('\n')
|
||||
self.prelude_chunks = array_split(lines, ceil(len(self.prelude_str) / 2**abs(self.WBITS)))
|
||||
self.prelude_ratio = statistics.mean(map(lambda x: self._compress('\n'.join(list(x))), self.prelude_chunks))
|
||||
|
||||
def _compress(self, s : str) -> float:
|
||||
orig_len = len(s.encode())
|
||||
c = compressobj(level=self.PRESET, wbits=self.WBITS, memLevel=9)
|
||||
bytes = c.compress(s.encode())
|
||||
bytes += c.flush(Z_FINISH)
|
||||
c_len = len(bytes)
|
||||
return c_len / orig_len
|
||||
|
||||
def score_text(self, sample: str) -> Score | None:
|
||||
'''
|
||||
Returns a tuple of a string (AI or Human) and a float confidence (higher is more confident) that the sample was generated
|
||||
by either an AI or human. Returns None if it cannot make a determination
|
||||
'''
|
||||
if self.prelude_ratio == 0.0:
|
||||
return None
|
||||
sample_score = statistics.mean(map(lambda x: self._compress('\n'.join(x) + sample), self.prelude_chunks))
|
||||
#print('ZLIB: ' + str((self.prelude_ratio, sample_score)))
|
||||
delta = self.prelude_ratio - sample_score
|
||||
determination = 'AI'
|
||||
if delta < 0:
|
||||
|
@ -95,64 +142,34 @@ class ZlibLlmDetector(AIDetector):
|
|||
|
||||
class LzmaLlmDetector(AIDetector):
|
||||
'''Class providing functionality to attempt to detect LLM/generative AI generated text using the LZMA compression algorithm'''
|
||||
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None) -> None:
|
||||
def __init__(self, prelude_file : Optional[str] = None, prelude_str : Optional[str] = None, prelude_ratio : Optional[float] = None, preset : int = 3) -> None:
|
||||
'''Initializes a compression with the passed prelude file, and optionally the number of digits to round to compare prelude vs. sample compression'''
|
||||
self.PRESET : int = 2
|
||||
self.comp = lzma.LZMACompressor(preset=self.PRESET)
|
||||
self.PRESET : int = preset
|
||||
self.c_buf : List[bytes] = []
|
||||
self.in_bytes : int = 0
|
||||
self.prelude_ratio : float = 0.0
|
||||
if prelude_ratio != None:
|
||||
self.prelude_ratio = prelude_ratio
|
||||
self.SHORT_SAMPLE_THRESHOLD : int = 350 # What sample length is considered "short"
|
||||
|
||||
if prelude_file != None:
|
||||
# Read it once to get the default compression ratio for the prelude
|
||||
with open(prelude_file, 'r') as fp:
|
||||
self._compress_str(fp.read())
|
||||
self.prelude_ratio = self._finalize()
|
||||
self.prelude_str = fp.read()
|
||||
self.prelude_ratio = self._compress(self.prelude_str)
|
||||
return
|
||||
#print(prelude_file + ' ratio: ' + str(self.prelude_ratio))
|
||||
# Redo this to prime the compressor
|
||||
self.comp = lzma.LZMACompressor(preset=self.PRESET)
|
||||
with open(prelude_file, 'r') as fp:
|
||||
self._compress_str(fp.read())
|
||||
|
||||
if prelude_str != None:
|
||||
if self.prelude_ratio == 0.0:
|
||||
self._compress_str(prelude_str)
|
||||
self.prelude_ratio = self._finalize()
|
||||
self.comp = lzma.LZMACompressor(preset=self.PRESET)
|
||||
self._compress_str(prelude_str)
|
||||
self.prelude_ratio = self._compress(prelude_str)
|
||||
|
||||
def _compress_str(self, s : str) -> None:
|
||||
'''
|
||||
Internal helper function to compress a string
|
||||
'''
|
||||
strb : bytes = s.encode('ascii', errors='ignore')
|
||||
self.c_buf.append(self.comp.compress(strb))
|
||||
self.in_bytes += len(strb)
|
||||
|
||||
def _finalize(self) -> float:
|
||||
'''
|
||||
Finalizes an LZMA compression cycle and returns the percentage compression ratio
|
||||
|
||||
post: _ >= 0
|
||||
'''
|
||||
self.c_buf.append(self.comp.flush())
|
||||
compressed_size : int = len(b''.join(self.c_buf))
|
||||
if self.in_bytes == 0:
|
||||
return 0.0
|
||||
score = compressed_size / self.in_bytes
|
||||
self.in_bytes = 0
|
||||
self.c_buf = []
|
||||
return score
|
||||
|
||||
def get_compression_ratio(self, s : str) -> Tuple[float, float]:
|
||||
'''
|
||||
Returns a tuple of floats with the compression ratio of the prelude (0 if no prelude) and passed string
|
||||
'''
|
||||
self._compress_str(s)
|
||||
return (self.prelude_ratio, self._finalize())
|
||||
def _compress(self, s : str) -> float:
|
||||
orig_len = len(s.encode())
|
||||
c = lzma.LZMACompressor(preset=self.PRESET)
|
||||
bytes = c.compress(s.encode())
|
||||
bytes += c.flush()
|
||||
c_len = len(bytes)
|
||||
return c_len / orig_len
|
||||
|
||||
def score_text(self, sample : str) -> Optional[Score]:
|
||||
'''
|
||||
|
@ -161,40 +178,52 @@ class LzmaLlmDetector(AIDetector):
|
|||
'''
|
||||
if self.prelude_ratio == 0.0:
|
||||
return None
|
||||
(prelude_score, sample_score) = self.get_compression_ratio(sample)
|
||||
print(str((self.prelude_ratio, sample_score)))
|
||||
delta = prelude_score - sample_score
|
||||
#print('LZMA: ' + str((self.prelude_ratio, sample_score)))
|
||||
delta = self.prelude_ratio - self._compress(self.prelude_str + sample)
|
||||
determination = 'AI'
|
||||
if delta < 0:
|
||||
determination = 'Human'
|
||||
|
||||
return (determination, abs(delta * 100))
|
||||
|
||||
def run_on_file(filename : str) -> Optional[Score]:
|
||||
class Zippy:
|
||||
'''
|
||||
Class to wrap the functionality of Zippy
|
||||
'''
|
||||
def __init__(self, engine : CompressionEngine = CompressionEngine.LZMA, preset : Optional[int] = None) -> None:
|
||||
self.ENGINE = engine
|
||||
self.PRESET = preset
|
||||
if engine == CompressionEngine.LZMA:
|
||||
if self.PRESET:
|
||||
self.detector = LzmaLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET)
|
||||
else:
|
||||
self.detector = LzmaLlmDetector(prelude_file=PRELUDE_FILE)
|
||||
elif engine == CompressionEngine.BROTLI:
|
||||
if self.PRESET:
|
||||
self.detector = BrotliLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET)
|
||||
else:
|
||||
self.detector = BrotliLlmDetector(prelude_file=PRELUDE_FILE)
|
||||
elif engine == CompressionEngine.ZLIB:
|
||||
if self.PRESET:
|
||||
self.detector = ZlibLlmDetector(prelude_file=PRELUDE_FILE, preset=self.PRESET)
|
||||
else:
|
||||
self.detector = ZlibLlmDetector(prelude_file=PRELUDE_FILE)
|
||||
|
||||
def run_on_file(self, filename : str) -> Optional[Score]:
|
||||
'''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file'''
|
||||
with open(filename, 'r') as fp:
|
||||
if ENGINE == CompressionEngine.LZMA:
|
||||
l = LzmaLlmDetector(prelude_file=PRELUDE_FILE)
|
||||
elif ENGINE == CompressionEngine.ZLIB:
|
||||
l = ZlibLlmDetector(prelude_file=PRELUDE_FILE)
|
||||
txt = fp.read()
|
||||
#print('Calculating score for input of length ' + str(len(txt)))
|
||||
return l.score_text(txt)
|
||||
return self.detector.score_text(txt)
|
||||
|
||||
def _score_chunk(c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score:
|
||||
if prelude_file != None:
|
||||
if ENGINE == CompressionEngine.LZMA:
|
||||
l = LzmaLlmDetector(prelude_file=prelude_file)
|
||||
if ENGINE == CompressionEngine.ZLIB:
|
||||
l = ZlibLlmDetector(prelude_file=prelude_file)
|
||||
else:
|
||||
if ENGINE == CompressionEngine.LZMA:
|
||||
l = LzmaLlmDetector(prelude_str=PRELUDE_STR, prelude_ratio=prelude_ratio)
|
||||
if ENGINE == CompressionEngine.ZLIB:
|
||||
l = ZlibLlmDetector(prelude_str=PRELUDE_STR, prelude_ratio=prelude_ratio)
|
||||
return l.score_text(c)
|
||||
def _score_chunk(self, c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score:
|
||||
if prelude_file is None and prelude_ratio != None:
|
||||
self.detector.prelude_str = PRELUDE_STR
|
||||
self.detector.prelude_ratio = prelude_ratio
|
||||
|
||||
def run_on_file_chunked(filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]:
|
||||
return self.detector.score_text(c)
|
||||
|
||||
def run_on_file_chunked(self, filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]:
|
||||
'''
|
||||
Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file.
|
||||
This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
|
||||
|
@ -202,9 +231,9 @@ def run_on_file_chunked(filename : str, chunk_size : int = 1500, prelude_ratio :
|
|||
'''
|
||||
with open(filename, 'r') as fp:
|
||||
contents = fp.read()
|
||||
return run_on_text_chunked(contents, chunk_size, prelude_ratio=prelude_ratio)
|
||||
return self.run_on_text_chunked(contents, chunk_size, prelude_ratio=prelude_ratio)
|
||||
|
||||
def run_on_text_chunked(s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]:
|
||||
def run_on_text_chunked(self, s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]:
|
||||
'''
|
||||
Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string.
|
||||
This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
|
||||
|
@ -223,42 +252,114 @@ def run_on_text_chunked(s : str, chunk_size : int = 1500, prelude_file : Optiona
|
|||
scores = []
|
||||
if len(chunks) > 2:
|
||||
with Pool(cpu_count()) as pool:
|
||||
for r in pool.starmap(_score_chunk, zip(chunks, itertools.repeat(prelude_file), itertools.repeat(prelude_ratio))):
|
||||
for r in pool.starmap(self._score_chunk, zip(chunks, itertools.repeat(prelude_file), itertools.repeat(prelude_ratio))):
|
||||
scores.append(r)
|
||||
else:
|
||||
for c in chunks:
|
||||
scores.append(_score_chunk(c, prelude_file=prelude_file, prelude_ratio=prelude_ratio))
|
||||
scores.append(self._score_chunk(c, prelude_file=prelude_file, prelude_ratio=prelude_ratio))
|
||||
ssum : float = 0.0
|
||||
for i, s in enumerate(scores):
|
||||
if s[0] == 'AI':
|
||||
ssum -= s[1] * (len(chunks[i]) / len(contents))
|
||||
else:
|
||||
ssum += s[1] * (len(chunks[i]) / len(contents))
|
||||
sa : float = ssum# / len(scores)
|
||||
sa : float = ssum
|
||||
if sa < 0:
|
||||
return ('AI', abs(sa))
|
||||
else:
|
||||
return ('Human', abs(sa))
|
||||
|
||||
class EnsembledZippy:
|
||||
'''
|
||||
Class to wrap the functionality of Zippy into an ensemble
|
||||
'''
|
||||
def __init__(self) -> None:
|
||||
self.ENGINES = [CompressionEngine.LZMA, CompressionEngine.BROTLI, CompressionEngine.ZLIB]
|
||||
self.WEIGHTS = [.33, .33, .33]
|
||||
self.component_classifiers : list[AIDetector] = []
|
||||
for i, e in enumerate(self.ENGINES):
|
||||
self.component_classifiers.append(Zippy(e))
|
||||
|
||||
def _combine_scores(self, scores : list[Score]) -> Score:
|
||||
ssum : float = 0.0
|
||||
for i, s in enumerate(scores):
|
||||
if s[0] == 'AI':
|
||||
ssum -= s[1] * self.WEIGHTS[i]
|
||||
else:
|
||||
ssum += s[1] * self.WEIGHTS[i]
|
||||
sa : float = ssum
|
||||
if sa < 0:
|
||||
return ('AI', abs(sa))
|
||||
else:
|
||||
return ('Human', abs(sa))
|
||||
|
||||
def run_on_file(self, filename : str) -> Optional[Score]:
|
||||
'''Given a filename (and an optional number of decimal places to round to) returns the score for the contents of that file'''
|
||||
with open(filename, 'r') as fp:
|
||||
txt = fp.read()
|
||||
scores = []
|
||||
for c in self.component_classifiers:
|
||||
scores.append(c.score_text(txt))
|
||||
return self._combine_scores(scores)
|
||||
|
||||
def _score_chunk(self, c : str, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Score:
|
||||
scores = []
|
||||
for c in self.component_classifiers:
|
||||
scores.append(c.score_text(c))
|
||||
return self._combine_scores(scores)
|
||||
|
||||
def run_on_file_chunked(self, filename : str, chunk_size : int = 1500, prelude_ratio : Optional[float] = None) -> Optional[Score]:
|
||||
'''
|
||||
Given a filename (and an optional chunk size and number of decimal places to round to) returns the score for the contents of that file.
|
||||
This function chunks the file into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
|
||||
being skewed because its compression ratio starts to overwhelm the prelude file.
|
||||
'''
|
||||
with open(filename, 'r') as fp:
|
||||
contents = fp.read()
|
||||
return self.run_on_text_chunked(contents, chunk_size)
|
||||
|
||||
def run_on_text_chunked(self, s : str, chunk_size : int = 1500, prelude_file : Optional[str] = None, prelude_ratio : Optional[float] = None) -> Optional[Score]:
|
||||
'''
|
||||
Given a string (and an optional chunk size and number of decimal places to round to) returns the score for the passed string.
|
||||
This function chunks the input into at most chunk_size parts to score separately, then returns an average. This prevents a very large input
|
||||
being skewed because its compression ratio starts to overwhelm the prelude file.
|
||||
'''
|
||||
scores = []
|
||||
for c in self.component_classifiers:
|
||||
scores.append(c.run_on_text_chunked(s, chunk_size=chunk_size))
|
||||
return self._combine_scores(scores)
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-e", choices=['zlib', 'lzma'], help='Which compression engine to use: lzma or zlib', default='lzma', required=False)
|
||||
parser.add_argument("-e", choices=['zlib', 'lzma', 'brotli', 'ensemble'], help='Which compression engine to use: lzma, zlib, brotli, or an ensemble of all engines', default='lzma', required=False)
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
group.add_argument("-s", help='Read from stdin until EOF is reached instead of from a file', required=False, action='store_true')
|
||||
group.add_argument("sample_files", nargs='*', help='Text file(s) containing the sample to classify', default="")
|
||||
args = parser.parse_args()
|
||||
engine = 'lzma'
|
||||
if args.e:
|
||||
if args.e == 'lzma':
|
||||
ENGINE = CompressionEngine.LZMA
|
||||
engine = CompressionEngine.LZMA
|
||||
elif args.e == 'zlib':
|
||||
ENGINE = CompressionEngine.ZLIB
|
||||
engine = CompressionEngine.ZLIB
|
||||
elif args.e == 'brotli':
|
||||
engine = CompressionEngine.BROTLI
|
||||
elif args.e == 'ensemble':
|
||||
engine = None
|
||||
if args.s:
|
||||
print(str(run_on_text_chunked(''.join(list(sys.stdin)))))
|
||||
if engine:
|
||||
z = Zippy(engine)
|
||||
else:
|
||||
z = EnsembledZippy()
|
||||
print(str(z.run_on_text_chunked(''.join(list(sys.stdin)))))
|
||||
elif len(args.sample_files) == 0:
|
||||
print("Please call with either a list of text files to analyze, or the -s flag to classify stdin.\nCall with the -h flag for additional help.")
|
||||
else:
|
||||
if engine:
|
||||
z = Zippy(engine)
|
||||
else:
|
||||
z = EnsembledZippy()
|
||||
for f in args.sample_files:
|
||||
print(f)
|
||||
if os.path.isfile(f):
|
||||
print(str(run_on_file_chunked(f)))
|
||||
print(str(z.run_on_file_chunked(f)))
|
||||
|
|
Ładowanie…
Reference in New Issue