Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions apps/dav/lib/Connector/Sabre/File.php
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ public function put($data) {
}
}

$lengthHeader = $this->request->getHeader('content-length');
$expected = $lengthHeader !== '' ? (int)$lengthHeader : null;

if ($partStorage->instanceOfStorage(IWriteStreamStorage::class)) {
$isEOF = false;
$wrappedData = CallbackWrapper::wrap($data, null, null, null, null, function ($stream) use (&$isEOF): void {
Expand All @@ -212,7 +215,7 @@ public function put($data) {
$count = -1;
try {
/** @var IWriteStreamStorage $partStorage */
$count = $partStorage->writeStream($internalPartPath, $wrappedData);
$count = $partStorage->writeStream($internalPartPath, $wrappedData, $expected);
} catch (GenericFileException $e) {
$logger = Server::get(LoggerInterface::class);
$logger->error('Error while writing stream to storage: ' . $e->getMessage(), ['exception' => $e, 'app' => 'webdav']);
Expand All @@ -232,10 +235,7 @@ public function put($data) {
[$count, $result] = \OC_Helper::streamCopy($data, $target);
fclose($target);
}

$lengthHeader = $this->request->getHeader('content-length');
$expected = $lengthHeader !== '' ? (int)$lengthHeader : -1;
if ($result === false && $expected >= 0) {
if ($result === false && $expected !== null) {
throw new Exception(
$this->l10n->t(
'Error while copying file to target location (copied: %1$s, expected filesize: %2$s)',
Expand All @@ -250,7 +250,7 @@ public function put($data) {
// if content length is sent by client:
// double check if the file was fully received
// compare expected and actual size
if ($expected >= 0
if ($expected !== null
&& $expected !== $count
&& $this->request->getMethod() === 'PUT'
) {
Expand Down
9 changes: 9 additions & 0 deletions lib/private/Files/ObjectStore/IObjectStoreMetaData.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,13 @@ public function getObjectMetaData(string $urn): array;
* @since 32.0.0
*/
public function listObjects(string $prefix = ''): \Iterator;

/**
* @param string $urn the unified resource name used to identify the object
* @param resource $stream stream with the data to write
* @param ObjectMetaData $metaData the metadata to set for the object
* @throws \Exception when something goes wrong, message will be logged
* @since 32.0.0
*/
public function writeObjectWithMetaData(string $urn, $stream, array $metaData): void;
}
37 changes: 23 additions & 14 deletions lib/private/Files/ObjectStore/ObjectStoreStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@ public function writeStream(string $path, $stream, ?int $size = null): int {

$mimetypeDetector = \OC::$server->getMimeTypeDetector();
$mimetype = $mimetypeDetector->detectPath($path);
$metadata = [
'mimetype' => $mimetype,
];
if ($size) {
$metadata['size'] = $size;
}

$stat['mimetype'] = $mimetype;
$stat['etag'] = $this->getETag($path);
Expand All @@ -500,24 +506,27 @@ public function writeStream(string $path, $stream, ?int $size = null): int {
$urn = $this->getURN($fileId);
try {
//upload to object storage
if ($size === null) {
$countStream = CountWrapper::wrap($stream, function ($writtenSize) use ($fileId, &$size) {

$totalWritten = 0;
$countStream = CountWrapper::wrap($stream, function ($writtenSize) use ($fileId, $size, $exists, &$totalWritten) {
if (is_null($size) && !$exists) {
$this->getCache()->update($fileId, [
'size' => $writtenSize,
]);
$size = $writtenSize;
});
$this->objectStore->writeObject($urn, $countStream, $mimetype);
if (is_resource($countStream)) {
fclose($countStream);
}
$stat['size'] = $size;
$totalWritten = $writtenSize;
});

if ($this->objectStore instanceof IObjectStoreMetaData) {
$this->objectStore->writeObjectWithMetaData($urn, $countStream, $metadata);
} else {
$this->objectStore->writeObject($urn, $stream, $mimetype);
if (is_resource($stream)) {
fclose($stream);
}
$this->objectStore->writeObject($urn, $countStream, $metadata['mimetype']);
}
if (is_resource($countStream)) {
fclose($countStream);
}

$stat['size'] = $totalWritten;
} catch (\Exception $ex) {
if (!$exists) {
/*
Expand All @@ -541,7 +550,7 @@ public function writeStream(string $path, $stream, ?int $size = null): int {
]
);
}
throw $ex; // make this bubble up
throw new GenericFileException('Error while writing stream to object store', 0, $ex);
}

if ($exists) {
Expand All @@ -557,7 +566,7 @@ public function writeStream(string $path, $stream, ?int $size = null): int {
}
}

return $size;
return $totalWritten;
}

public function getObjectStore(): IObjectStore {
Expand Down
66 changes: 45 additions & 21 deletions lib/private/Files/ObjectStore/S3ObjectTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/
namespace OC\Files\ObjectStore;

use Aws\Command;
use Aws\Exception\MultipartUploadException;
use Aws\S3\Exception\S3MultipartUploadException;
use Aws\S3\MultipartCopy;
use Aws\S3\MultipartUploader;
Expand Down Expand Up @@ -83,18 +85,24 @@ public function readObject($urn) {
*
* @param string $urn the unified resource name used to identify the object
* @param StreamInterface $stream stream with the data to write
* @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0
* @param array $metaData the metadata to set for the object
* @throws \Exception when something goes wrong, message will be logged
*/
protected function writeSingle(string $urn, StreamInterface $stream, ?string $mimetype = null): void {
$this->getConnection()->putObject([
protected function writeSingle(string $urn, StreamInterface $stream, array $metaData): void {
$args = [
'Bucket' => $this->bucket,
'Key' => $urn,
'Body' => $stream,
'ACL' => 'private',
'ContentType' => $mimetype,
'ContentType' => $metaData['mimetype'] ?? null,
'StorageClass' => $this->storageClass,
] + $this->getSSECParameters());
] + $this->getSSECParameters();

if ($size = $stream->getSize()) {
$args['ContentLength'] = $size;
}

$this->getConnection()->putObject($args);
}


Expand All @@ -103,15 +111,17 @@ protected function writeSingle(string $urn, StreamInterface $stream, ?string $mi
*
* @param string $urn the unified resource name used to identify the object
* @param StreamInterface $stream stream with the data to write
* @param string|null $mimetype the mimetype to set for the remove object
* @param array $metaData the metadata to set for the object
* @throws \Exception when something goes wrong, message will be logged
*/
protected function writeMultiPart(string $urn, StreamInterface $stream, ?string $mimetype = null): void {
protected function writeMultiPart(string $urn, StreamInterface $stream, array $metaData): void {
$attempts = 0;
$uploaded = false;
$concurrency = $this->concurrency;
$exception = null;
$state = null;
$size = $stream->getSize();
$totalWritten = 0;

// retry multipart upload once with concurrency at half on failure
while (!$uploaded && $attempts <= 1) {
Expand All @@ -122,9 +132,18 @@ protected function writeMultiPart(string $urn, StreamInterface $stream, ?string
'part_size' => $this->uploadPartSize,
'state' => $state,
'params' => [
'ContentType' => $mimetype,
'ContentType' => $metaData['mimetype'] ?? null,
'StorageClass' => $this->storageClass,
] + $this->getSSECParameters(),
'before_upload' => function (Command $command) use (&$totalWritten) {
$totalWritten += $command['ContentLength'];
},
'before_complete' => function ($_command) use (&$totalWritten, $size, &$uploader, &$attempts) {
if ($size !== null && $totalWritten != $size) {
$e = new \Exception('Incomplete multi part upload, expected ' . $size . ' bytes, wrote ' . $totalWritten);
throw new MultipartUploadException($uploader->getState(), $e);
}
},
]);

try {
Expand All @@ -141,6 +160,9 @@ protected function writeMultiPart(string $urn, StreamInterface $stream, ?string
if ($stream->isSeekable()) {
$stream->rewind();
}
} catch (MultipartUploadException $e) {
$exception = $e;
break;
}
}

Expand All @@ -156,17 +178,19 @@ protected function writeMultiPart(string $urn, StreamInterface $stream, ?string
}
}


/**
* @param string $urn the unified resource name used to identify the object
* @param resource $stream stream with the data to write
* @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0
* @throws \Exception when something goes wrong, message will be logged
* @since 7.0.0
*/
public function writeObject($urn, $stream, ?string $mimetype = null) {
$metaData = [];
if ($mimetype) {
$metaData['mimetype'] = $mimetype;
}
$this->writeObjectWithMetaData($urn, $stream, $metaData);
}

public function writeObjectWithMetaData(string $urn, $stream, array $metaData): void {
$canSeek = fseek($stream, 0, SEEK_CUR) === 0;
$psrStream = Utils::streamFor($stream);
$psrStream = Utils::streamFor($stream, [
'size' => $metaData['size'] ?? null,
]);


$size = $psrStream->getSize();
Expand All @@ -179,16 +203,16 @@ public function writeObject($urn, $stream, ?string $mimetype = null) {
$buffer->seek(0);
if ($buffer->getSize() < $this->putSizeLimit) {
// buffer is fully seekable, so use it directly for the small upload
$this->writeSingle($urn, $buffer, $mimetype);
$this->writeSingle($urn, $buffer, $metaData);
} else {
$loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
$this->writeMultiPart($urn, $loadStream, $mimetype);
$this->writeMultiPart($urn, $loadStream, $metaData);
}
} else {
if ($size < $this->putSizeLimit) {
$this->writeSingle($urn, $psrStream, $mimetype);
$this->writeSingle($urn, $psrStream, $metaData);
} else {
$this->writeMultiPart($urn, $psrStream, $mimetype);
$this->writeMultiPart($urn, $psrStream, $metaData);
}
}
$psrStream->close();
Expand Down
Loading