diff --git a/paddle/fluid/memory/allocation/mmap_allocator.cc b/paddle/fluid/memory/allocation/mmap_allocator.cc index 5e857f9acb7171..104e7362eb541d 100644 --- a/paddle/fluid/memory/allocation/mmap_allocator.cc +++ b/paddle/fluid/memory/allocation/mmap_allocator.cc @@ -54,11 +54,14 @@ struct CountInfo { std::atomic refcount; }; -void AllocateMemoryMap( - std::string filename, int flags, size_t size, void **map_ptr_, int *fd_) { +void AllocateMemoryMap(std::string filename, + int *shared_fd, + int flags, + size_t size, + void **map_ptr_) { // TODO(@ZHUI): support win32 int file_flags = 0; - int fd = -1; + int fd = *shared_fd; if (flags & MAPPED_SHAREDMEM) { file_flags = O_RDWR | O_CREAT; } else { @@ -71,7 +74,7 @@ void AllocateMemoryMap( file_flags &= ~O_CREAT; } - if (!(flags & MAPPED_FROMFD)) { + if (!(flags & MAPPED_FROMFD) && fd == -1) { if (flags & MAPPED_SHAREDMEM) { fd = shm_open(filename.c_str(), file_flags, (mode_t)0600); PADDLE_ENFORCE_NE( @@ -83,8 +86,6 @@ void AllocateMemoryMap( VLOG(6) << "shm_open: " << filename; MemoryMapFdSet::Instance().Insert(filename); } - } else { - fd = -1; } PADDLE_ENFORCE_EQ(ftruncate(fd, size), @@ -98,32 +99,38 @@ void AllocateMemoryMap( *map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0); } + if (flags & MAPPED_UNLINK) { + VLOG(6) << "shm_unlink: " << filename; + shm_unlink(filename.c_str()); + } + PADDLE_ENFORCE_NE(*map_ptr_, MAP_FAILED, platform::errors::Unavailable( "Memory map failed when create shared memory.")); - if (flags & MAPPED_KEEPFD) { - *fd_ = fd; + *shared_fd = fd; + VLOG(6) << "keep fd: " << *shared_fd; } else { PADDLE_ENFORCE_NE(::close(fd), -1, platform::errors::Unavailable( "Error closing memory maped file <", filename, ">")); - *fd_ = -1; + *shared_fd = -1; } } std::shared_ptr AllocateRefcountedMemoryMapAllocation(std::string filename, + int shared_fd, int flags, size_t size, int buffer_id) { - int fd = -1; + int fd = shared_fd; void *base_ptr = nullptr; if (buffer_id == -1) { - 
AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd); + AllocateMemoryMap(filename, &fd, flags, size + mmap_alignment, &base_ptr); VLOG(4) << "Create and mmap a new shm: " << filename; } else { base_ptr = MemoryMapAllocationPool::Instance().GetById(buffer_id).mmap_ptr_; @@ -132,7 +139,7 @@ AllocateRefcountedMemoryMapAllocation(std::string filename, void *aliged_base_ptr = static_cast(static_cast(base_ptr) + mmap_alignment); return std::make_shared( - aliged_base_ptr, size, filename, flags, fd, buffer_id); + aliged_base_ptr, size, filename, fd, flags, buffer_id); } RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation( @@ -145,11 +152,22 @@ RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation( : MemoryMapAllocation(ptr, size, ipc_name, fd, flags) { // must reset base ptr first. buffer_id_ = buffer_id; + fd_ = fd; + flags_ = flags; resetBaseptr(); initializeRefercount(); } void MemoryMapAllocation::close() { + if (!closed_fd_) { + closed_fd_ = true; + if (flags_ & MAPPED_KEEPFD) { + PADDLE_ENFORCE_NE(::close(fd_), + -1, + platform::errors::Unavailable( + "Error closing file descriptor <", fd_, ">")); + } + } if (closed_) { return; } @@ -193,6 +211,15 @@ void RefcountedMemoryMapAllocation::close() { void *data = map_ptr_; CountInfo *info = reinterpret_cast(data); --info->refcount; + if (flags_ & MAPPED_KEEPFD) { + closed_fd_ = true; + PADDLE_ENFORCE_NE(::close(fd_), + -1, + platform::errors::Unavailable( + "Error closing file descriptor <", fd_, ">")); + VLOG(6) << "close fd: " << fd_; + } + if (FLAGS_use_shm_cache && buffer_id_ != -1) { return; } else { @@ -260,6 +287,7 @@ std::shared_ptr AllocateMemoryMapWriterAllocation( const std::string &ipc_name = GetIPCName(); int flags = O_RDWR | O_CREAT; int fd = shm_open(ipc_name.c_str(), flags, 0600); + PADDLE_ENFORCE_NE(fd, -1, platform::errors::Unavailable( @@ -283,7 +311,6 @@ std::shared_ptr RebuildMemoryMapReaderAllocation( const std::string &ipc_name, size_t size) { int flags = 
O_RDWR | O_CREAT; flags &= ~O_CREAT; - int fd = shm_open(ipc_name.c_str(), flags, 0600); PADDLE_ENFORCE_NE(fd, -1, diff --git a/paddle/fluid/memory/allocation/mmap_allocator.h b/paddle/fluid/memory/allocation/mmap_allocator.h index 412e3a3545769d..64a3ae9de7658c 100644 --- a/paddle/fluid/memory/allocation/mmap_allocator.h +++ b/paddle/fluid/memory/allocation/mmap_allocator.h @@ -44,13 +44,17 @@ enum MappedModes { class MemoryMapAllocation : public Allocation { public: - explicit MemoryMapAllocation(void *ptr, size_t size, std::string ipc_name) + explicit MemoryMapAllocation(void *ptr, + size_t size, + std::string ipc_name, + int fd) : Allocation(ptr, size, platform::CPUPlace()), ipc_name_(std::move(ipc_name)), + fd_(fd), map_ptr_(ptr), map_size_(size) {} explicit MemoryMapAllocation( - void *ptr, size_t size, std::string ipc_name, int flags, int fd) + void *ptr, size_t size, std::string ipc_name, int fd, int flags) : Allocation(ptr, size, platform::CPUPlace()), ipc_name_(std::move(ipc_name)), fd_(fd), @@ -59,6 +63,7 @@ class MemoryMapAllocation : public Allocation { map_size_(size) {} inline const std::string &ipc_name() const { return ipc_name_; } + inline const int shared_fd() const { return fd_; } virtual void close(); @@ -71,6 +76,7 @@ class MemoryMapAllocation : public Allocation { void *map_ptr_ = nullptr; size_t map_size_ = 0; bool closed_ = false; + bool closed_fd_ = false; }; class RefcountedMemoryMapAllocation : public MemoryMapAllocation { @@ -93,11 +99,15 @@ class RefcountedMemoryMapAllocation : public MemoryMapAllocation { void resetBaseptr(); }; -void AllocateMemoryMap( - std::string filename, int flags, size_t size, void **base_ptr_, int *fd_); +void AllocateMemoryMap(std::string filename, + int *shared_fd, + int flags, + size_t size, + void **base_ptr_); std::shared_ptr AllocateRefcountedMemoryMapAllocation(std::string filename, + int shared_fd, int flags, size_t size, int buffer_id = -1); @@ -111,11 +121,13 @@ class MemoryMapWriterAllocation : 
public Allocation { ipc_name_(std::move(ipc_name)) {} inline const std::string &ipc_name() const { return ipc_name_; } + inline const int shared_fd() const { return fd_; } ~MemoryMapWriterAllocation() override; private: std::string ipc_name_; + int fd_ = -1; }; class MemoryMapReaderAllocation : public Allocation { @@ -127,11 +139,13 @@ class MemoryMapReaderAllocation : public Allocation { ipc_name_(std::move(ipc_name)) {} inline const std::string &ipc_name() const { return ipc_name_; } + inline const int shared_fd() const { return fd_; } ~MemoryMapReaderAllocation() override; private: std::string ipc_name_; + int fd_ = -1; }; std::shared_ptr AllocateMemoryMapWriterAllocation( diff --git a/paddle/fluid/pybind/tensor.cc b/paddle/fluid/pybind/tensor.cc index 44983e3e13df7f..a967e666fa113a 100644 --- a/paddle/fluid/pybind/tensor.cc +++ b/paddle/fluid/pybind/tensor.cc @@ -863,7 +863,7 @@ void BindTensor(pybind11::module &m) { // NOLINT )DOC") #endif .def("_share_filename", - [](phi::DenseTensor &self) { + [](phi::DenseTensor &self, bool use_file_descriptor) { if (!self.IsInitialized() || self.numel() == 0) throw std::runtime_error( "Tensor not initialized or numel is 0. 
could not pass to " @@ -890,6 +890,10 @@ void BindTensor(pybind11::module &m) { // NOLINT int flags = memory::allocation::MAPPED_SHAREDMEM | memory::allocation::MAPPED_EXCLUSIVE; + if (use_file_descriptor) { + flags = flags | memory::allocation::MAPPED_KEEPFD | + memory::allocation::MAPPED_UNLINK; + } std::string handle = memory::allocation::GetIPCName(); int find_id = -1; if (FLAGS_use_shm_cache) { @@ -898,9 +902,10 @@ void BindTensor(pybind11::module &m) { // NOLINT if (find_id != -1) { handle = memory::allocation::MemoryMapAllocationPool::Instance().GetById(find_id).file_name_; // NOLINT } + int shared_fd = -1; auto shared_holder = memory::allocation::AllocateRefcountedMemoryMapAllocation( - handle, flags, data_size, find_id); + handle, shared_fd, flags, data_size, find_id); // copy data & reset holder if (platform::is_cuda_pinned_place(holder->place())) { @@ -918,8 +923,10 @@ void BindTensor(pybind11::module &m) { // NOLINT int type_idx = static_cast(self.type()); return py::make_tuple(mmap_allocation->ipc_name(), + mmap_allocation->shared_fd(), mmap_allocation->size(), type_idx, - common::vectorize(self.dims()), self.lod()); + common::vectorize(self.dims()), self.lod(), + use_file_descriptor); }, R"DOC( Serialize CPU lod tensor in shared memory to tuple. @@ -939,30 +946,37 @@ void BindTensor(pybind11::module &m) { // NOLINT )DOC") .def("_new_shared_filename", [](py::tuple t) { // __setstate__ - if (t.size() != 5) + if (t.size() != 7) throw std::runtime_error("Invalid Tensor meta info state!"); phi::DenseTensor tensor; // 2. 
Rebuild Allocation const std::string &ipc_name = t[0].cast(); - size_t size = t[1].cast(); + const int shared_fd = t[1].cast(); + const bool use_file_descriptor = t[6].cast(); + + size_t size = t[2].cast(); int flags = memory::allocation::MAPPED_SHAREDMEM | memory::allocation::MAPPED_NOCREATE; + if (use_file_descriptor) { + flags = flags | memory::allocation::MAPPED_KEEPFD | + memory::allocation::MAPPED_UNLINK; + } int find_id = -1; if (FLAGS_use_shm_cache) { find_id = memory::allocation::MemoryMapAllocationPool::Instance().FindFromCache(flags, size, ipc_name, /*check_refcount*/ false); // NOLINT } auto shared_holder = memory::allocation::AllocateRefcountedMemoryMapAllocation( - ipc_name, flags, size, find_id); + ipc_name, shared_fd, flags, size, find_id); // 3. Rebuild Tensor tensor.ResetHolderWithType( shared_holder, - static_cast(t[2].cast())); - tensor.Resize(common::make_ddim(t[3].cast>())); - tensor.set_lod(t[4].cast()); + static_cast(t[3].cast())); + tensor.Resize(common::make_ddim(t[4].cast>())); + tensor.set_lod(t[5].cast()); return tensor; }, diff --git a/paddle/phi/core/flags.cc b/paddle/phi/core/flags.cc index a6764dfcf1c31f..d6280dbfc6d085 100644 --- a/paddle/phi/core/flags.cc +++ b/paddle/phi/core/flags.cc @@ -1310,6 +1310,19 @@ PHI_DEFINE_EXPORTED_bool(use_shm_cache, false, "Use shm cache in mmap_allocator."); +/** + * mmap_allocator related FLAG + * Name: dataloader_use_file_descriptor + * Since Version: 2.6.0 + * Value Range: bool, default=true + * Example: + * Note: . If True, mmap_allocator will use file descriptor to open shared memory + * operation. 
+ */ +PHI_DEFINE_EXPORTED_bool(dataloader_use_file_descriptor, + true, + "Use file descriptor in mmap_allocator."); + /** * Tensor operants related FLAG * Name: tensor_operants_mode diff --git a/python/paddle/base/reader.py b/python/paddle/base/reader.py index 7fcccf8910fc46..a4a8b822efa002 100644 --- a/python/paddle/base/reader.py +++ b/python/paddle/base/reader.py @@ -96,10 +96,17 @@ def _convert_places(places): # NOTE(chenweihang): _reader_process_loop must be top level method to be pickled -def _reader_process_loop(batch_reader, data_queue): +def _reader_process_loop( + batch_reader, data_queue, dataloader_use_file_descriptor=True +): try: # set signal handler core._set_process_signal_handler() + if not dataloader_use_file_descriptor: + # Set FLAGS_dataloader_use_file_descriptor to False in this process to avoid using file descriptors. + paddle.base.core.globals()[ + "FLAGS_dataloader_use_file_descriptor" + ] = False # NOTE: [ mmap files clear ] When the child process exits unexpectedly, # some shared memory objects may have been applied for but have not yet @@ -606,7 +613,7 @@ def _start(self): multiprocess_queue_set.add(self._data_queue) self._process = multiprocessing.Process( target=_reader_process_loop, - args=(self._batch_reader, self._data_queue), + args=(self._batch_reader, self._data_queue, False), ) self._process.daemon = True self._process.start() diff --git a/python/paddle/incubate/multiprocessing/reductions.py b/python/paddle/incubate/multiprocessing/reductions.py index 520aa0a2b24ee2..a940edcd6b8093 100644 --- a/python/paddle/incubate/multiprocessing/reductions.py +++ b/python/paddle/incubate/multiprocessing/reductions.py @@ -13,6 +13,7 @@ # limitations under the License. 
import copy +import multiprocessing # TODO: check the hooks of tensor # TODO: check serializing named tensor @@ -117,8 +118,53 @@ def _reduce_tensor(tensor): ) -def _rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod): - lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod)) +def _rebuild_lodtensor_filename( + cls, + ipc_name, + shared_fd, + size, + type_idx, + dims, + lod, + dataloader_use_file_descriptor, +): + lodtensor = cls._new_shared_filename( + ( + ipc_name, + shared_fd, + size, + type_idx, + dims, + lod, + dataloader_use_file_descriptor, + ) + ) + lodtensor._shared_decref() + return lodtensor + + +def _rebuild_lodtensor_filedescriptor( + cls, + ipc_name, + shared_fd, + size, + type_idx, + dims, + lod, + dataloader_use_file_descriptor, +): + shared_fd = shared_fd.detach() + lodtensor = cls._new_shared_filename( + ( + ipc_name, + shared_fd, + size, + type_idx, + dims, + lod, + dataloader_use_file_descriptor, + ) + ) lodtensor._shared_decref() return lodtensor @@ -161,15 +207,23 @@ def _reduce_lodtensor(lodtensor): if dim == 0: # Empty tensors have nothing be mmapped. 
 return (_rebuild_lodtensor_empty, (type(lodtensor),)) - - # Default use share filename stratege - metadata = ( - lodtensor._share_filename() - ) # ipc_name, size, type_idx, dims, lod - rebuild = _rebuild_lodtensor_filename + dataloader_use_file_descriptor = paddle.base.core.globals()[ + "FLAGS_dataloader_use_file_descriptor" + ] + # Share by file descriptor or by filename, depending on the flag + metadata = lodtensor._share_filename( + dataloader_use_file_descriptor + ) # ipc_name, fd, size, type_idx, dims, lod, use_file_descriptor + + if dataloader_use_file_descriptor: + metalist = list(metadata) + metalist[1] = multiprocessing.reduction.DupFd(metalist[1]) + metadata = tuple(metalist) + rebuild = _rebuild_lodtensor_filedescriptor + else: + rebuild = _rebuild_lodtensor_filename lodtensor._shared_incref() # TODO, maintain reference for lodtensor - # TODO: support file_discriptor stratege elif lodtensor._place().is_gpu_place(): metadata = lodtensor._share_cuda() rebuild = _rebuild_cuda_tensor diff --git a/test/legacy_test/CMakeLists.txt b/test/legacy_test/CMakeLists.txt index 476d8f5d02a88f..c5db16c7cc9c2e 100644 --- a/test/legacy_test/CMakeLists.txt +++ b/test/legacy_test/CMakeLists.txt @@ -1101,6 +1101,7 @@ set_tests_properties(test_pad3d_op PROPERTIES TIMEOUT 120) set_tests_properties(test_dataloader_keep_order PROPERTIES TIMEOUT 120) set_tests_properties(test_mean_op PROPERTIES TIMEOUT 120) set_tests_properties(test_dataloader_unkeep_order PROPERTIES TIMEOUT 120) +set_tests_properties(test_dataloader PROPERTIES TIMEOUT 120) set_tests_properties(test_reader_reset PROPERTIES TIMEOUT 120) set_tests_properties(test_pool3d_api PROPERTIES TIMEOUT 120) set_tests_properties(test_cumprod_op PROPERTIES TIMEOUT 120) diff --git a/test/legacy_test/test_dataloader.py b/test/legacy_test/test_dataloader.py new file mode 100644 index 00000000000000..a7e0de0ba55f18 --- /dev/null +++ b/test/legacy_test/test_dataloader.py @@ -0,0 +1,119 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import numpy as np + +import paddle +import paddle.nn.functional as F +from paddle import base +from paddle.io import DataLoader, Dataset + +BATCH_NUM = 4 +BATCH_SIZE = 8 +EPOCH_NUM = 2 + +IMAGE_SIZE = 784 +CLASS_NUM = 10 + + +# define a random dataset +class RandomDataset(Dataset): + def __init__(self, num_samples): + self.num_samples = num_samples + + def __getitem__(self, idx): + image = np.random.random([IMAGE_SIZE]).astype('float32') + label = np.random.randint(0, CLASS_NUM - 1, (1,)).astype('int64') + return image, label + + def __len__(self): + return self.num_samples + + +dataset = RandomDataset(BATCH_NUM * BATCH_SIZE) + + +class TestDygraphDataLoader(unittest.TestCase): + def setUp(self): + self.batch_size = BATCH_SIZE + self.batch_num = BATCH_NUM + self.epoch_num = EPOCH_NUM + + def iter_loader_data(self, loader): + for _ in range(self.epoch_num): + for image, label in loader(): + relu = F.relu(image) + self.assertEqual(image.shape, [self.batch_size, IMAGE_SIZE]) + self.assertEqual(label.shape, [self.batch_size, 1]) + self.assertEqual(relu.shape, [self.batch_size, IMAGE_SIZE]) + + def test_single_process_loader_filedescriptor(self): + with base.dygraph.guard(): + loader = DataLoader( + dataset, + batch_size=self.batch_size, + shuffle=True, + drop_last=True, + use_shared_memory=True, + num_workers=0, + ) + self.iter_loader_data(loader) + + def 
test_multi_process_dataloader_filedescriptor(self): + with base.dygraph.guard(): + loader = DataLoader( + dataset, + batch_size=self.batch_size, + shuffle=True, + drop_last=True, + use_shared_memory=True, + num_workers=2, + ) + self.iter_loader_data(loader) + + def test_single_process_loader_filename(self): + paddle.base.core.globals()[ + "FLAGS_dataloader_use_file_descriptor" + ] = False + with base.dygraph.guard(): + loader = DataLoader( + dataset, + batch_size=self.batch_size, + shuffle=True, + drop_last=True, + use_shared_memory=True, + num_workers=0, + ) + self.iter_loader_data(loader) + + def test_multi_process_dataloader_filename(self): + paddle.base.core.globals()[ + "FLAGS_dataloader_use_file_descriptor" + ] = False + with base.dygraph.guard(): + loader = DataLoader( + dataset, + batch_size=self.batch_size, + shuffle=True, + drop_last=True, + use_shared_memory=True, + num_workers=2, + ) + self.iter_loader_data(loader) + + +if __name__ == '__main__': + unittest.main()