Skip to content
62 changes: 61 additions & 1 deletion libarchive/entry.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from contextlib import contextmanager
from ctypes import create_string_buffer
from ctypes import create_string_buffer, string_at
from enum import IntEnum
import math

from . import ffi
from .exception import ArchiveError


class FileType(IntEnum):
Expand Down Expand Up @@ -86,6 +87,7 @@ def modify(self, header_codec=None, **attributes):
rdev (int | Tuple[int, int]): device number, if the file is a device
rdevmajor (int): major part of the device number
rdevminor (int): minor part of the device number
stored_digests (dict[str, bytes]): hashes of the file's contents
"""
if header_codec:
self.header_codec = header_codec
Expand Down Expand Up @@ -433,6 +435,64 @@ def rdevminor(self):
def rdevminor(self, value):
ffi.entry_set_rdevminor(self._entry_p, value)

@property
def stored_digests(self):
    """The file's hashes stored in the archive.

    libarchive only supports reading and writing digests from and to 'mtree'
    files. Setting the digests requires at least version 3.8.0 of libarchive
    (released in May 2025). It also requires including the names of the
    digest algorithms in the string of options passed to the archive writer
    (e.g. `file_writer(archive_path, 'mtree', options='md5,rmd160,sha256')`).
    """
    # Collect one entry per known algorithm, keyed by algorithm name.
    digests = {}
    for algorithm_name in ffi.DIGEST_ALGORITHMS:
        digests[algorithm_name] = self.get_stored_digest(algorithm_name)
    return digests

@stored_digests.setter
def stored_digests(self, values):
    # Store each provided digest under its algorithm name; unknown names
    # are rejected by set_stored_digest via the DIGEST_ALGORITHMS lookup.
    for algorithm_name in values:
        self.set_stored_digest(algorithm_name, values[algorithm_name])

def get_stored_digest(self, algorithm_name):
    """Return the raw digest bytes stored for `algorithm_name`.

    Raises NotImplementedError when the loaded libarchive has no digest
    support at all (wrapper missing) or rejects this particular algorithm.
    """
    algorithm = ffi.DIGEST_ALGORITHMS[algorithm_name]
    # Both failure messages share the same library-identification prefix.
    library_desc = (
        f"the libarchive being used (version {ffi.version_number()}, "
        f"path {ffi.libarchive_path})"
    )
    try:
        ptr = ffi.entry_digest(self._entry_p, algorithm.libarchive_id)
    except AttributeError:
        # The entry_digest wrapper was never registered (old libarchive).
        raise NotImplementedError(
            f"{library_desc} doesn't support reading entry digests"
        ) from None
    except ArchiveError:
        raise NotImplementedError(
            f"{library_desc} doesn't support {algorithm_name} digests"
        ) from None
    # The C side returns a raw pointer; copy out the fixed-length digest.
    return string_at(ptr, algorithm.bytes_length)

def set_stored_digest(self, algorithm_name, value):
    """Store `value` as the entry's `algorithm_name` digest.

    `value` must be the raw digest, exactly `bytes_length` bytes long for
    the chosen algorithm. Raises NotImplementedError when the libarchive
    in use cannot write digests at all, or rejects this algorithm.
    """
    algorithm = ffi.DIGEST_ALGORITHMS[algorithm_name]
    expected_length = algorithm.bytes_length
    actual_length = len(value)
    if actual_length != expected_length:
        raise ValueError(
            f"invalid input digest: expected {expected_length} bytes, "
            f"got {actual_length}"
        )
    # Hand the bytes to C as a fixed-size unsigned-char array copy.
    buffer_type = expected_length * ffi.c_ubyte
    digest_buffer = buffer_type.from_buffer_copy(value)
    try:
        retcode = ffi.entry_set_digest(
            self._entry_p, algorithm.libarchive_id, digest_buffer
        )
    except AttributeError:
        # The entry_set_digest wrapper was never registered (libarchive < 3.8).
        raise NotImplementedError(
            f"the libarchive being used (version {ffi.version_number()}, path "
            f"{ffi.libarchive_path}) doesn't support writing entry digests"
        ) from None
    if retcode < 0:
        raise NotImplementedError(
            f"the libarchive being used (version {ffi.version_number()}, path "
            f"{ffi.libarchive_path}) doesn't support {algorithm_name} digests"
        ) from None


class ConsumedArchiveEntry(ArchiveEntry):

Expand Down
41 changes: 40 additions & 1 deletion libarchive/ffi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from ctypes import (
c_char_p, c_int, c_uint, c_long, c_longlong, c_size_t, c_int64,
c_void_p, c_wchar_p, CFUNCTYPE, POINTER,
c_ubyte, c_void_p, c_wchar_p, CFUNCTYPE, POINTER,
)

try:
Expand Down Expand Up @@ -365,3 +365,42 @@ def get_write_filter_function(filter_name):
f"the libarchive being used (version {version_number()}, "
f"path {libarchive_path}) doesn't support encryption"
)


# archive entry digests (a.k.a. hashes)

class DigestAlgorithm:
    """Descriptor for one of libarchive's entry digest (hash) algorithms.

    Attributes:
        name: the algorithm's lowercase name, e.g. 'sha256'
        libarchive_id: the integer id libarchive uses for this algorithm
        bytes_length: length of the raw digest, in bytes
    """

    # Slots keep instances small and reject accidental extra attributes.
    __slots__ = ('name', 'libarchive_id', 'bytes_length')

    def __init__(self, name, libarchive_id, bytes_length):
        self.name, self.libarchive_id, self.bytes_length = (
            name, libarchive_id, bytes_length,
        )


# Algorithm name -> DigestAlgorithm descriptor. The integer ids are the
# values libarchive assigns to each algorithm; lengths are in bytes.
DIGEST_ALGORITHMS = {
    name: DigestAlgorithm(name, libarchive_id=ident, bytes_length=length)
    for name, ident, length in (
        ('md5', 1, 16),
        ('rmd160', 2, 20),
        ('sha1', 3, 20),
        ('sha256', 4, 32),
        ('sha384', 5, 48),
        ('sha512', 6, 64),
    )
}

# Optional digest wrappers: these libarchive functions only exist in builds
# with mtree digest support, so registration is best-effort. When ffi()
# cannot register a wrapper it raises AttributeError (presumably because the
# loaded library doesn't export the symbol — confirm against ffi()); we log
# at info level and leave the wrapper undefined, which entry.py translates
# into NotImplementedError on use.

# archive_entry_digest(entry, algorithm_id) -> pointer to the raw digest bytes
try:
    ffi('entry_digest', [c_archive_entry_p, c_int], POINTER(c_ubyte), check_null)
except AttributeError:
    logger.info(
        f"the libarchive being used (version {version_number()}, "
        f"path {libarchive_path}) doesn't support reading entry digests"
    )

# archive_entry_set_digest(entry, algorithm_id, digest) -> int status code
# (requires libarchive >= 3.8.0 per the entry.stored_digests docstring)
try:
    ffi('entry_set_digest',
        [c_archive_entry_p, c_int, POINTER(c_ubyte)],
        c_int, check_int)
except AttributeError:
    logger.info(
        f"the libarchive being used (version {version_number()}, "
        f"path {libarchive_path}) doesn't support modifying entry digests"
    )
46 changes: 39 additions & 7 deletions tests/test_entry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-

from codecs import open
import json
import locale
from os import environ, stat
Expand All @@ -9,14 +6,12 @@

import pytest

from libarchive import ArchiveError, memory_reader, memory_writer
from libarchive import ArchiveError, ffi, file_writer, memory_reader, memory_writer
from libarchive.entry import ArchiveEntry, ConsumedArchiveEntry, PassedArchiveEntry

from . import data_dir, get_entries, get_tarinfos


text_type = unicode if str is bytes else str # noqa: F821

locale.setlocale(locale.LC_ALL, '')

# needed for sane time stamp comparison
Expand Down Expand Up @@ -106,7 +101,7 @@ def check_entries(test_file, regen=False, ignore=''):
# Normalize all unicode (can vary depending on the system)
for d in (e1, e2):
for key in d:
if isinstance(d[key], text_type):
if isinstance(d[key], str):
d[key] = unicodedata.normalize('NFC', d[key])
assert e1 == e2

Expand Down Expand Up @@ -155,3 +150,40 @@ def test_non_ASCII_encoding_of_file_metadata():
with memory_reader(buf, header_codec='cp037') as archive:
entry = next(iter(archive))
assert entry.pathname == file_name


# Fake digests: every byte is '!' (0x21), so each hex digest is '21' repeated
# to the algorithm's length.
fake_hashes = dict(
    md5=b'!' * 16,
    rmd160=b'!' * 20,
    sha1=b'!' * 20,
    sha256=b'!' * 32,
    sha384=b'!' * 48,
    sha512=b'!' * 64,
)
# Expected mtree output for an 'empty.txt' entry carrying the fake digests.
# NOTE: the replacement fields use double quotes — reusing the outer single
# quote inside an f-string is a SyntaxError before Python 3.12 (PEP 701).
mtree = (
    '#mtree\n'
    './empty.txt nlink=0 time=0.0 mode=664 gid=0 uid=0 type=file size=42 '
    f'md5digest={"21" * 16} rmd160digest={"21" * 20} sha1digest={"21" * 20} '
    f'sha256digest={"21" * 32} sha384digest={"21" * 48} sha512digest={"21" * 64}\n'
)


def test_reading_entry_digests():
    """Reading an mtree archive exposes its digests via `stored_digests`."""
    # No filesystem needed: the mtree text is read straight from memory
    # (the previously requested `tmpdir` fixture was unused).
    with memory_reader(mtree.encode('ascii')) as archive:
        entry = next(iter(archive))
        assert entry.stored_digests == fake_hashes


@pytest.mark.xfail(
    condition=ffi.version_number() < 3008000,
    reason="libarchive < 3.8",
)
def test_writing_entry_digests(tmpdir):
    """Digests set on a written entry appear in the resulting mtree file."""
    archive_path = str(tmpdir / 'mtree')
    # Enable every digest algorithm in the writer options (iterating the
    # dict yields its keys).
    options = ','.join(fake_hashes)
    with file_writer(archive_path, 'mtree', options=options) as archive:
        # Add an empty file, with fake hashes.
        archive.add_file_from_memory('empty.txt', 42, (), stored_digests=fake_hashes)
    with open(archive_path) as mtree_file:
        written_mtree = mtree_file.read()
    assert written_mtree == mtree