utarfile: Support creating/appending tar files.

This adds a utarfile-write extension package that adds the ability to
create and append to tar files.

Work done by Doug Ellis <dpwe@ee.columbia.edu>.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
pull/623/merge
Dan Ellis 2023-05-14 15:16:21 -04:00 zatwierdzone przez Jim Mussared
rodzic a1b9aa934c
commit 7128d423c2
8 zmienionych plików z 315 dodań i 101 usunięć

Wyświetl plik

@ -0,0 +1,15 @@
""" tar append writes additional files to the end of an existing tar file."""
import os
import sys
import utarfile
# Refuse to run unless an archive path follows the script name.
if not sys.argv[1:]:
    raise ValueError("Usage: %s appendfile.tar newinputfile1 ..." % sys.argv[0])
tarfile = sys.argv[1]
# Only names carrying the .tar suffix are accepted.
if tarfile[-4:] != ".tar":
    raise ValueError("Filename %s does not end with .tar" % tarfile)
# Open the archive in append mode and add every remaining argument in the
# order given; leaving the with-block closes the archive.
with utarfile.TarFile(tarfile, "a") as archive:
    for path in sys.argv[2:]:
        archive.add(path)

Wyświetl plik

@ -0,0 +1,14 @@
""" tar create writes a new tar file containing the specified files."""
import sys
import utarfile
# Refuse to run unless an output archive path follows the script name.
if not sys.argv[1:]:
    raise ValueError("Usage: %s outputfile.tar inputfile1 ..." % sys.argv[0])
tarfile = sys.argv[1]
# Only names carrying the .tar suffix are accepted.
if tarfile[-4:] != ".tar":
    raise ValueError("Filename %s does not end with .tar" % tarfile)
# Open a new archive in write mode and add every remaining argument in the
# order given; leaving the with-block closes the archive.
with utarfile.TarFile(tarfile, "w") as archive:
    for path in sys.argv[2:]:
        archive.add(path)

Wyświetl plik

@ -0,0 +1,4 @@
# Package manifest for the write (create/append) extension.
# NOTE(review): metadata/require/package are presumably provided by the
# packaging tool that evaluates this manifest - confirm.
metadata(description="Adds write (create/append) support to utarfile.", version="0.1")
# Declares a dependency on the base "utarfile" package.
require("utarfile")
# presumably ships the local "utarfile" package directory - TODO confirm
package("utarfile")

Wyświetl plik

@ -0,0 +1,126 @@
"""Additions to the TarFile class to support creating and appending tar files.
The methods defined below are injected into the TarFile class in the
utarfile package.
"""
import uctypes
import os
# Extended subset of tar header fields including the ones we'll write.
# http://www.gnu.org/software/tar/manual/html_node/Standard.html
# uctypes layout of the header fields this module writes. Each entry is
# (uctypes.ARRAY | byte offset, uctypes.UINT8 | element count), i.e. a byte
# array of the given length at the given offset inside the header block.
# "mode", "uid" and "gid" are declared 7 bytes wide although the offsets are
# 8 bytes apart, so the last byte of each slot is not reachable through this
# layout and keeps whatever the underlying buffer holds.
_TAR_HEADER = {
"name": (uctypes.ARRAY | 0, uctypes.UINT8 | 100),
"mode": (uctypes.ARRAY | 100, uctypes.UINT8 | 7),
"uid": (uctypes.ARRAY | 108, uctypes.UINT8 | 7),
"gid": (uctypes.ARRAY | 116, uctypes.UINT8 | 7),
"size": (uctypes.ARRAY | 124, uctypes.UINT8 | 12),
"mtime": (uctypes.ARRAY | 136, uctypes.UINT8 | 12),
"chksum": (uctypes.ARRAY | 148, uctypes.UINT8 | 8),
"typeflag": (uctypes.ARRAY | 156, uctypes.UINT8 | 1),
}
# NOTE(review): const() is presumably the MicroPython compile-time constant
# helper - confirm it accepts a bytes value on the targeted firmware.
_NUL = const(b"\0") # the null character, used to build zero-filled padding
_BLOCKSIZE = const(512) # length of processing blocks (one header / data block)
_RECORDSIZE = const(_BLOCKSIZE * 20) # length of records; archives are padded to a multiple of this
# Write a string into a bytearray by copying each byte.
def _setstring(b, s, maxlen):
for i, c in enumerate(s.encode("utf-8")[:maxlen]):
b[i] = c
def _open_write(self, name, mode, fileobj):
if mode == "w":
if not fileobj:
self.f = open(name, "wb")
else:
self.f = fileobj
elif mode == "a":
if not fileobj:
self.f = open(name, "r+b")
else:
self.f = fileobj
# Read through the existing file.
while self.next():
pass
# Position at start of end block.
self.f.seek(self.offset)
else:
raise ValueError("mode " + mode + " not supported.")
def _close_write(self):
    """Write the end-of-archive marker; must be called to complete a tar file.

    Does nothing unless the archive was opened with mode "w" or "a". For those
    modes two zero-filled blocks are written, followed by zero padding up to
    the next multiple of ``_RECORDSIZE``.

    Append mode is finalised as well: appended members overwrite the previous
    end-of-archive blocks, so without a fresh marker an appended archive could
    end directly after its last data block.
    """
    if self.mode not in ("w", "a"):
        return
    self.f.write(_NUL * (_BLOCKSIZE * 2))
    self.offset += _BLOCKSIZE * 2
    remainder = self.offset % _RECORDSIZE
    if remainder:
        self.f.write(_NUL * (_RECORDSIZE - remainder))
def addfile(self, tarinfo, fileobj=None):
    """Append one member (header block plus optional data) to the archive.

    Args:
        tarinfo: object providing ``name``, ``size``, ``mode``, ``uid``,
            ``gid``, ``mtime`` and ``isdir()``.
        fileobj: optional readable binary stream with the member's data.

    The data is copied one block at a time instead of with a single
    ``fileobj.read()``, so memory use does not grow with the member size.
    ``self.offset`` is advanced by everything written.

    NOTE(review): names longer than 100 encoded bytes are silently truncated
    by the fixed-width name field - confirm whether callers rely on that.
    """
    # Write the header: 100 bytes of name, 8 bytes of mode in octal...
    buf = bytearray(_BLOCKSIZE)
    name = tarinfo.name
    size = tarinfo.size
    if tarinfo.isdir():
        # Directories carry no data and are stored with a trailing slash.
        size = 0
        if not name.endswith("/"):
            name += "/"
    hdr = uctypes.struct(uctypes.addressof(buf), _TAR_HEADER, uctypes.LITTLE_ENDIAN)
    _setstring(hdr.name, name, 100)
    _setstring(hdr.mode, "%06o " % (tarinfo.mode & 0o7777), 7)
    _setstring(hdr.uid, "%06o " % tarinfo.uid, 7)
    _setstring(hdr.gid, "%06o " % tarinfo.gid, 7)
    _setstring(hdr.size, "%011o " % size, 12)
    _setstring(hdr.mtime, "%011o " % tarinfo.mtime, 12)
    _setstring(hdr.typeflag, "5" if tarinfo.isdir() else "0", 1)
    # Checksum is calculated with checksum field all blanks.
    _setstring(hdr.chksum, " " * 8, 8)
    # Calculate and insert the actual checksum.
    chksum = sum(buf)
    _setstring(hdr.chksum, "%06o\0" % chksum, 7)
    # Emit the header.
    self.f.write(buf)
    self.offset += len(buf)
    # Copy the file contents, if any, in block-sized pieces.
    if fileobj:
        n_bytes = 0
        while True:
            chunk = fileobj.read(_BLOCKSIZE)
            if not chunk:
                break
            n_bytes += self.f.write(chunk)
        self.offset += n_bytes
        # Zero-pad the data up to the next block boundary.
        remains = -n_bytes & (_BLOCKSIZE - 1)
        if remains:
            self.f.write(bytearray(remains))
            self.offset += remains
def add(self, name, recursive=True):
    """Add the file or directory ``name`` to the archive.

    Args:
        name: path of the file or directory to add.
        recursive: when true, the entries of a directory are added too.

    Paths that cannot be stat'ed, and paths that are neither a directory nor
    a regular file, are reported with ``print`` and skipped. Regular files are
    opened in a ``with`` block so the handle is closed once the data has been
    copied.
    """
    from . import TarInfo

    tarinfo = TarInfo(name)
    try:
        stat = os.stat(name)
    except OSError:
        print("Cannot stat", name, " - skipping.")
        return
    # Fields are taken from the os.stat() tuple by index.
    tarinfo.mode = stat[0]
    tarinfo.uid = stat[4]
    tarinfo.gid = stat[5]
    tarinfo.size = stat[6]
    tarinfo.mtime = stat[8]
    if not (tarinfo.isdir() or tarinfo.isreg()):
        # We only accept directories or regular files.
        print(name, "is not a directory or regular file - skipping.")
        return
    if tarinfo.isdir():
        self.addfile(tarinfo)
        if recursive:
            for f in os.ilistdir(name):
                self.add(name + "/" + f[0], recursive)
    else:  # type == REGTYPE
        with open(name, "rb") as fileobj:
            self.addfile(tarinfo, fileobj)

Wyświetl plik

@ -1,13 +1,16 @@
import sys
import os
import shutil
import utarfile
# NOTE(review): this hunk appears to interleave the removed and the added
# lines of a diff (print(i)/print(i.name), os.makedirs/os.mkdir and two
# different copy variants all appear) and its indentation has been lost -
# confirm against the real file before running it.
# Require the archive path as the first command-line argument.
if len(sys.argv) < 2:
raise ValueError("Usage: %s inputfile.tar" % sys.argv[0])
# Open the archive named on the command line.
t = utarfile.TarFile(sys.argv[1])
# Walk the archive members in order.
for i in t:
print(i)
print(i.name)
# Directory members are created on disk; everything else is written as a file.
if i.type == utarfile.DIRTYPE:
os.makedirs(i.name)
os.mkdir(i.name)
else:
f = t.extractfile(i)
shutil.copyfileobj(f, open(i.name, "wb"))
# NOTE(review): FileSection.read() defaults to sz=65536, so a single f.read()
# returns at most 65536 bytes of a member - verify larger members are handled.
with open(i.name, "wb") as of:
of.write(f.read())

Wyświetl plik

@ -1,5 +1,5 @@
# NOTE(review): this hunk appears to show both the old and the new manifest
# lines of a diff (two metadata() calls, module() and package()) without
# +/- markers - confirm which lines belong to the current manifest.
metadata(description="Lightweight tarfile module subset", version="0.3.2")
metadata(description="Read-only implementation of Python's tarfile.", version="0.4.0")
# Originally written by Paul Sokolovsky.
# presumably module() ships a single file and package() a directory - TODO confirm
module("utarfile.py")
package("utarfile")

Wyświetl plik

@ -1,95 +0,0 @@
import uctypes
# http://www.gnu.org/software/tar/manual/html_node/Standard.html
# uctypes layout of the two header fields needed for reading. Each entry is
# (uctypes.ARRAY | byte offset, uctypes.UINT8 | element count).
TAR_HEADER = {
"name": (uctypes.ARRAY | 0, uctypes.UINT8 | 100),
"size": (uctypes.ARRAY | 124, uctypes.UINT8 | 11),
}
# Values stored in TarInfo.type by TarFile.next().
DIRTYPE = "dir"
REGTYPE = "file"
def roundup(val, align):
    """Round ``val`` up to a multiple of ``align`` using bit masking.

    The masking only yields a true multiple when ``align`` is a power of two.
    """
    mask = align - 1
    return (val + mask) & ~mask
class FileSection:
    """Bounded view of one member's data inside the underlying archive stream.

    Args:
        f: the archive stream, positioned at the start of the member data.
        content_len: number of data bytes belonging to the member.
        aligned_len: ``content_len`` rounded up to the block size; the
            difference is padding that ``skip()`` consumes.
    """

    def __init__(self, f, content_len, aligned_len):
        self.f = f
        self.content_len = content_len
        self.align = aligned_len - content_len

    def read(self, sz=65536):
        """Return up to ``sz`` of the remaining data bytes (``b""`` when done).

        ``None`` or a negative ``sz`` means "everything that is left". The
        request is always clamped to the member, so a read can never run into
        the following header.
        """
        if self.content_len == 0:
            return b""
        if sz is None or sz < 0 or sz > self.content_len:
            sz = self.content_len
        data = self.f.read(sz)
        # Account for what was actually returned, which may be a short read.
        self.content_len -= len(data)
        return data

    def readinto(self, buf):
        """Fill ``buf`` with remaining data and return the number of bytes read."""
        if self.content_len == 0:
            return 0
        if len(buf) > self.content_len:
            # Narrow the destination so the read stops at the member's end.
            buf = memoryview(buf)[: self.content_len]
        sz = self.f.readinto(buf)
        self.content_len -= sz
        return sz

    def skip(self):
        """Consume the unread data and the alignment padding of this member."""
        sz = self.content_len + self.align
        if sz:
            # Discard through a small scratch buffer, 16 bytes at a time.
            buf = bytearray(16)
            while sz:
                s = min(sz, 16)
                self.f.readinto(buf, s)
                sz -= s
class TarInfo:
    """Attribute holder for one archive member.

    ``name``, ``type`` and ``size`` are assigned by whoever creates the
    instance; ``__str__`` expects all three to be present.
    """

    def __str__(self):
        fields = (self.name, self.type, self.size)
        return "TarInfo(%r, %s, %d)" % fields
class TarFile:
    """Sequential reader over the members of a tar archive.

    The archive is taken from ``fileobj`` when one is given, otherwise
    ``name`` is opened for binary reading. Instances are their own iterator
    and yield one ``TarInfo`` per member.
    """

    def __init__(self, name=None, fileobj=None):
        self.f = fileobj if fileobj else open(name, "rb")
        self.subf = None

    def next(self):
        """Return the next member's ``TarInfo``, or ``None`` at the end."""
        previous = self.subf
        if previous:
            # Move past whatever is left of the previous member.
            previous.skip()
        block = self.f.read(512)
        if not block:
            return None

        header = uctypes.struct(uctypes.addressof(block), TAR_HEADER, uctypes.LITTLE_ENDIAN)

        # Empty block means end of archive
        if header.name[0] == 0:
            return None

        info = TarInfo()
        info.name = str(header.name, "utf-8").rstrip("\0")
        info.size = int(bytes(header.size), 8)
        # A trailing slash in the name marks a directory entry.
        info.type = DIRTYPE if info.name[-1] == "/" else REGTYPE
        section = FileSection(self.f, info.size, roundup(info.size, 512))
        self.subf = section
        info.subf = section
        return info

    def __iter__(self):
        return self

    def __next__(self):
        entry = self.next()
        if entry is not None:
            return entry
        raise StopIteration

    def extractfile(self, tarinfo):
        """Return the data section object attached to ``tarinfo``."""
        return tarinfo.subf

Wyświetl plik

@ -0,0 +1,147 @@
"""Subset of cpython tarfile class methods needed to decode tar files."""
import uctypes
# Minimal set of tar header fields for reading.
# http://www.gnu.org/software/tar/manual/html_node/Standard.html
# uctypes layout of the two header fields needed for reading. Each entry is
# (uctypes.ARRAY | byte offset, uctypes.UINT8 | element count).
_TAR_HEADER = {
"name": (uctypes.ARRAY | 0, uctypes.UINT8 | 100),
"size": (uctypes.ARRAY | 124, uctypes.UINT8 | 12),
}
# Values returned by the TarInfo.type property.
# NOTE(review): const() is presumably the MicroPython compile-time constant
# helper - confirm it accepts string values on the targeted firmware.
DIRTYPE = const("dir")
REGTYPE = const("file")
# Constants for TarInfo.isdir, isreg.
# _S_IFMT masks the file-type bits of a mode value; _S_IFREG and _S_IFDIR are
# the masked values that isreg() and isdir() compare against.
_S_IFMT = const(0o170000)
_S_IFREG = const(0o100000)
_S_IFDIR = const(0o040000)
_BLOCKSIZE = const(512) # length of processing blocks (header size and data alignment)
def _roundup(val, align):
return (val + align - 1) & ~(align - 1)
class FileSection:
    """Bounded view of one member's data inside the underlying archive stream.

    Args:
        f: the archive stream, positioned at the start of the member data.
        content_len: number of data bytes belonging to the member.
        aligned_len: ``content_len`` rounded up to the block size; the
            difference is padding that ``skip()`` consumes.
    """

    def __init__(self, f, content_len, aligned_len):
        self.f = f
        self.content_len = content_len
        self.align = aligned_len - content_len

    def read(self, sz=65536):
        """Return up to ``sz`` of the remaining data bytes (``b""`` when done).

        ``None`` or a negative ``sz`` means "everything that is left". The
        request is always clamped to the member, so a read can never run into
        the following header.
        """
        if self.content_len == 0:
            return b""
        if sz is None or sz < 0 or sz > self.content_len:
            sz = self.content_len
        data = self.f.read(sz)
        # Account for what was actually returned, which may be a short read.
        self.content_len -= len(data)
        return data

    def readinto(self, buf):
        """Fill ``buf`` with remaining data and return the number of bytes read."""
        if self.content_len == 0:
            return 0
        if len(buf) > self.content_len:
            # Narrow the destination so the read stops at the member's end.
            buf = memoryview(buf)[: self.content_len]
        sz = self.f.readinto(buf)
        self.content_len -= sz
        return sz

    def skip(self):
        """Consume the unread data and the alignment padding of this member."""
        sz = self.content_len + self.align
        if sz:
            # Discard through a small scratch buffer, 16 bytes at a time.
            buf = bytearray(16)
            while sz:
                s = min(sz, 16)
                self.f.readinto(buf, s)
                sz -= s
class TarInfo:
    """Metadata describing one archive member.

    Args:
        name: member path; a trailing "/" marks a directory. The default
            empty name yields a regular-file entry.
    """

    # Class-level defaults, so that a freshly created TarInfo can be printed
    # and handed to a writer without every field having been assigned first.
    # They cost nothing per instance.
    size = 0
    uid = 0
    gid = 0
    mtime = 0

    def __init__(self, name=""):
        self.name = name
        # Slicing instead of indexing keeps the empty default name from
        # raising IndexError.
        self.mode = _S_IFDIR if name[-1:] == "/" else _S_IFREG

    @property
    def type(self):
        """DIRTYPE for directories, REGTYPE for everything else."""
        return DIRTYPE if self.isdir() else REGTYPE

    def __str__(self):
        return "TarInfo(%r, %s, %d)" % (self.name, self.type, self.size)

    def isdir(self):
        """Return True when the file-type bits of ``mode`` denote a directory."""
        return (self.mode & _S_IFMT) == _S_IFDIR

    def isreg(self):
        """Return True when the file-type bits of ``mode`` denote a regular file."""
        return (self.mode & _S_IFMT) == _S_IFREG
class TarFile:
    """Tar archive reader with optional write support.

    Mode "r" reads an existing archive from ``fileobj`` or, when none is
    given, from the file ``name``. Any other mode is delegated to
    ``_open_write``, which exists only when the ``.write`` submodule could be
    imported (see the import at the end of the class body).

    Raises:
        NotImplementedError: a non-"r" mode was requested but the write
            support is not installed.
    """

    def __init__(self, name=None, mode="r", fileobj=None):
        self.subf = None
        self.mode = mode
        self.offset = 0
        if mode == "r":
            if fileobj:
                self.f = fileobj
            else:
                self.f = open(name, "rb")
        else:
            # Test for the helper explicitly rather than catching
            # AttributeError around the call: an AttributeError raised inside
            # the helper is a real error and must not be reported as
            # "Install utarfile-write".
            if not hasattr(self, "_open_write"):
                raise NotImplementedError("Install utarfile-write")
            self._open_write(name=name, mode=mode, fileobj=fileobj)

    def __enter__(self):
        return self

    def __exit__(self, unused_type, unused_value, unused_traceback):
        self.close()

    def next(self):
        """Return the next member's ``TarInfo``, or ``None`` at the end.

        ``self.offset`` is advanced past the header and the block-aligned data
        of every member returned, so after the last member it is the position
        of the end-of-archive blocks.
        """
        if self.subf:
            # Move past whatever is left of the previous member.
            self.subf.skip()
        buf = self.f.read(_BLOCKSIZE)
        if not buf:
            return None

        h = uctypes.struct(uctypes.addressof(buf), _TAR_HEADER, uctypes.LITTLE_ENDIAN)

        # Empty block means end of archive
        if h.name[0] == 0:
            return None

        # Update the offset once we're sure it's not the run-out.
        self.offset += len(buf)
        d = TarInfo(str(h.name, "utf-8").rstrip("\0"))
        d.size = int(bytes(h.size), 8)
        self.subf = d.subf = FileSection(self.f, d.size, _roundup(d.size, _BLOCKSIZE))
        self.offset += _roundup(d.size, _BLOCKSIZE)
        return d

    def __iter__(self):
        return self

    def __next__(self):
        v = self.next()
        if v is None:
            raise StopIteration
        return v

    def extractfile(self, tarinfo):
        """Return the data section object attached to ``tarinfo``."""
        return tarinfo.subf

    def close(self):
        """Finish any pending write and close the underlying stream.

        The stream is closed even when finishing the write fails; errors
        raised while finishing are propagated instead of being swallowed.
        """
        try:
            if hasattr(self, "_close_write"):
                self._close_write()
        finally:
            self.f.close()

    # Add additional methods to support write/append from the utarfile-write package.
    # Importing inside the class body binds the functions as class attributes,
    # which makes them available as methods.
    try:
        from .write import _open_write, _close_write, addfile, add
    except ImportError:
        pass