Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions paddle/fluid/memory/allocation/mmap_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ struct CountInfo {
std::atomic<int> refcount;
};

void AllocateMemoryMap(
std::string filename, int flags, size_t size, void **map_ptr_, int *fd_) {
void AllocateMemoryMap(std::string filename,
int *shared_fd,
int flags,
size_t size,
void **map_ptr_) {
// TODO(@ZHUI): support win32
int file_flags = 0;
int fd = -1;
int fd = *shared_fd;
if (flags & MAPPED_SHAREDMEM) {
file_flags = O_RDWR | O_CREAT;
} else {
Expand All @@ -71,7 +74,7 @@ void AllocateMemoryMap(
file_flags &= ~O_CREAT;
}

if (!(flags & MAPPED_FROMFD)) {
if (!(flags & MAPPED_FROMFD) && fd == -1) {
if (flags & MAPPED_SHAREDMEM) {
fd = shm_open(filename.c_str(), file_flags, (mode_t)0600);
PADDLE_ENFORCE_NE(
Expand All @@ -83,8 +86,6 @@ void AllocateMemoryMap(
VLOG(6) << "shm_open: " << filename;
MemoryMapFdSet::Instance().Insert(filename);
}
} else {
fd = -1;
}

PADDLE_ENFORCE_EQ(ftruncate(fd, size),
Expand All @@ -98,32 +99,38 @@ void AllocateMemoryMap(
*map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
}

if (flags & MAPPED_UNLINK) {
VLOG(6) << "shm_unlink: " << filename;
shm_unlink(filename.c_str());
}

PADDLE_ENFORCE_NE(*map_ptr_,
MAP_FAILED,
platform::errors::Unavailable(
"Memory map failed when create shared memory."));

if (flags & MAPPED_KEEPFD) {
*fd_ = fd;
*shared_fd = fd;
VLOG(6) << "keep fd: " << *shared_fd;
} else {
PADDLE_ENFORCE_NE(::close(fd),
-1,
platform::errors::Unavailable(
"Error closing memory maped file <", filename, ">"));

*fd_ = -1;
*shared_fd = -1;
}
}

std::shared_ptr<RefcountedMemoryMapAllocation>
AllocateRefcountedMemoryMapAllocation(std::string filename,
int shared_fd,
int flags,
size_t size,
int buffer_id) {
int fd = -1;
int fd = shared_fd;
void *base_ptr = nullptr;
if (buffer_id == -1) {
AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd);
AllocateMemoryMap(filename, &fd, flags, size + mmap_alignment, &base_ptr);
VLOG(4) << "Create and mmap a new shm: " << filename;
} else {
base_ptr = MemoryMapAllocationPool::Instance().GetById(buffer_id).mmap_ptr_;
Expand All @@ -132,7 +139,7 @@ AllocateRefcountedMemoryMapAllocation(std::string filename,
void *aliged_base_ptr =
static_cast<void *>(static_cast<char *>(base_ptr) + mmap_alignment);
return std::make_shared<RefcountedMemoryMapAllocation>(
aliged_base_ptr, size, filename, flags, fd, buffer_id);
aliged_base_ptr, size, filename, fd, flags, buffer_id);
}

RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
Expand All @@ -145,11 +152,22 @@ RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
: MemoryMapAllocation(ptr, size, ipc_name, fd, flags) {
// must reset base ptr first.
buffer_id_ = buffer_id;
fd_ = fd;
flags_ = flags;
resetBaseptr();
initializeRefercount();
}

void MemoryMapAllocation::close() {
if (!closed_fd_) {
closed_fd_ = true;
if (flags_ & MAPPED_KEEPFD) {
PADDLE_ENFORCE_NE(::close(fd_),
-1,
platform::errors::Unavailable(
"Error closing file descriptor <", fd_, ">"));
}
}
if (closed_) {
return;
}
Expand Down Expand Up @@ -193,6 +211,15 @@ void RefcountedMemoryMapAllocation::close() {
void *data = map_ptr_;
CountInfo *info = reinterpret_cast<CountInfo *>(data);
--info->refcount;
if (flags_ & MAPPED_KEEPFD) {
closed_fd_ = true;
PADDLE_ENFORCE_NE(::close(fd_),
-1,
platform::errors::Unavailable(
"Error closing file descriptor <", fd_, ">"));
VLOG(6) << "close fd: " << fd_;
}

if (FLAGS_use_shm_cache && buffer_id_ != -1) {
return;
} else {
Expand Down Expand Up @@ -260,6 +287,7 @@ std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
const std::string &ipc_name = GetIPCName();
int flags = O_RDWR | O_CREAT;
int fd = shm_open(ipc_name.c_str(), flags, 0600);

PADDLE_ENFORCE_NE(fd,
-1,
platform::errors::Unavailable(
Expand All @@ -283,7 +311,6 @@ std::shared_ptr<MemoryMapReaderAllocation> RebuildMemoryMapReaderAllocation(
const std::string &ipc_name, size_t size) {
int flags = O_RDWR | O_CREAT;
flags &= ~O_CREAT;

int fd = shm_open(ipc_name.c_str(), flags, 0600);
PADDLE_ENFORCE_NE(fd,
-1,
Expand Down
22 changes: 18 additions & 4 deletions paddle/fluid/memory/allocation/mmap_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,17 @@ enum MappedModes {

class MemoryMapAllocation : public Allocation {
public:
explicit MemoryMapAllocation(void *ptr, size_t size, std::string ipc_name)
explicit MemoryMapAllocation(void *ptr,
size_t size,
std::string ipc_name,
int fd)
: Allocation(ptr, size, platform::CPUPlace()),
ipc_name_(std::move(ipc_name)),
fd_(fd),
map_ptr_(ptr),
map_size_(size) {}
explicit MemoryMapAllocation(
void *ptr, size_t size, std::string ipc_name, int flags, int fd)
void *ptr, size_t size, std::string ipc_name, int fd, int flags)
: Allocation(ptr, size, platform::CPUPlace()),
ipc_name_(std::move(ipc_name)),
fd_(fd),
Expand All @@ -59,6 +63,7 @@ class MemoryMapAllocation : public Allocation {
map_size_(size) {}

inline const std::string &ipc_name() const { return ipc_name_; }
inline const int shared_fd() const { return fd_; }

virtual void close();

Expand All @@ -71,6 +76,7 @@ class MemoryMapAllocation : public Allocation {
void *map_ptr_ = nullptr;
size_t map_size_ = 0;
bool closed_ = false;
bool closed_fd_ = false;
};

class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
Expand All @@ -93,11 +99,15 @@ class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
void resetBaseptr();
};

void AllocateMemoryMap(
std::string filename, int flags, size_t size, void **base_ptr_, int *fd_);
void AllocateMemoryMap(std::string filename,
int *shared_fd,
int flags,
size_t size,
void **base_ptr_);

std::shared_ptr<RefcountedMemoryMapAllocation>
AllocateRefcountedMemoryMapAllocation(std::string filename,
int shared_fd,
int flags,
size_t size,
int buffer_id = -1);
Expand All @@ -111,11 +121,13 @@ class MemoryMapWriterAllocation : public Allocation {
ipc_name_(std::move(ipc_name)) {}

inline const std::string &ipc_name() const { return ipc_name_; }
inline const int shared_fd() const { return fd_; }

~MemoryMapWriterAllocation() override;

private:
std::string ipc_name_;
int fd_ = -1;
};

class MemoryMapReaderAllocation : public Allocation {
Expand All @@ -127,11 +139,13 @@ class MemoryMapReaderAllocation : public Allocation {
ipc_name_(std::move(ipc_name)) {}

inline const std::string &ipc_name() const { return ipc_name_; }
inline const int shared_fd() const { return fd_; }

~MemoryMapReaderAllocation() override;

private:
std::string ipc_name_;
int fd_ = -1;
};

std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
Expand Down
32 changes: 23 additions & 9 deletions paddle/fluid/pybind/tensor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ void BindTensor(pybind11::module &m) { // NOLINT
)DOC")
#endif
.def("_share_filename",
[](phi::DenseTensor &self) {
[](phi::DenseTensor &self, bool use_file_descriptor) {
if (!self.IsInitialized() || self.numel() == 0)
throw std::runtime_error(
"Tensor not initialized or numel is 0. could not pass to "
Expand All @@ -890,6 +890,10 @@ void BindTensor(pybind11::module &m) { // NOLINT

int flags = memory::allocation::MAPPED_SHAREDMEM |
memory::allocation::MAPPED_EXCLUSIVE;
if (use_file_descriptor) {
flags = flags | memory::allocation::MAPPED_KEEPFD |
memory::allocation::MAPPED_UNLINK;
}
std::string handle = memory::allocation::GetIPCName();
int find_id = -1;
if (FLAGS_use_shm_cache) {
Expand All @@ -898,9 +902,10 @@ void BindTensor(pybind11::module &m) { // NOLINT
if (find_id != -1) {
handle = memory::allocation::MemoryMapAllocationPool::Instance().GetById(find_id).file_name_; // NOLINT
}
int shared_fd = -1;
auto shared_holder =
memory::allocation::AllocateRefcountedMemoryMapAllocation(
handle, flags, data_size, find_id);
handle, shared_fd, flags, data_size, find_id);

// copy data & reset holder
if (platform::is_cuda_pinned_place(holder->place())) {
Expand All @@ -918,8 +923,10 @@ void BindTensor(pybind11::module &m) { // NOLINT
int type_idx = static_cast<int>(self.type());

return py::make_tuple(mmap_allocation->ipc_name(),
mmap_allocation->shared_fd(),
mmap_allocation->size(), type_idx,
common::vectorize(self.dims()), self.lod());
common::vectorize(self.dims()), self.lod(),
use_file_descriptor);
},
R"DOC(
Serialize CPU lod tensor in shared memory to tuple.
Expand All @@ -939,30 +946,37 @@ void BindTensor(pybind11::module &m) { // NOLINT
)DOC")
.def("_new_shared_filename",
[](py::tuple t) { // __setstate__
if (t.size() != 5)
if (t.size() != 7)
throw std::runtime_error("Invalid Tensor meta info state!");

phi::DenseTensor tensor;

// 2. Rebuild Allocation
const std::string &ipc_name = t[0].cast<std::string>();
size_t size = t[1].cast<size_t>();
const int shared_fd = t[1].cast<int>();
const bool use_file_descriptor = t[6].cast<bool>();

size_t size = t[2].cast<size_t>();
int flags = memory::allocation::MAPPED_SHAREDMEM |
memory::allocation::MAPPED_NOCREATE;
if (use_file_descriptor) {
flags = flags | memory::allocation::MAPPED_KEEPFD |
memory::allocation::MAPPED_UNLINK;
}
int find_id = -1;
if (FLAGS_use_shm_cache) {
find_id = memory::allocation::MemoryMapAllocationPool::Instance().FindFromCache(flags, size, ipc_name, /*check_refcount*/ false); // NOLINT
}
auto shared_holder =
memory::allocation::AllocateRefcountedMemoryMapAllocation(
ipc_name, flags, size, find_id);
ipc_name, shared_fd, flags, size, find_id);

// 3. Rebuild Tensor
tensor.ResetHolderWithType(
shared_holder,
static_cast<phi::DataType>(t[2].cast<int>()));
tensor.Resize(common::make_ddim(t[3].cast<std::vector<int>>()));
tensor.set_lod(t[4].cast<framework::LoD>());
static_cast<phi::DataType>(t[3].cast<int>()));
tensor.Resize(common::make_ddim(t[4].cast<std::vector<int>>()));
tensor.set_lod(t[5].cast<framework::LoD>());

return tensor;
},
Expand Down
13 changes: 13 additions & 0 deletions paddle/phi/core/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,19 @@ PHI_DEFINE_EXPORTED_bool(use_shm_cache,
false,
"Use shm cache in mmap_allocator.");

/**
* mmap_allocator related FLAG
* Name: dataloader_use_file_descriptor
* Since Version: 2.6.0
* Value Range: bool, default=true
* Example:
 * Note: If True, mmap_allocator will use a file descriptor for shared
 * memory operations.
*/
PHI_DEFINE_EXPORTED_bool(dataloader_use_file_descriptor,
true,
"Use file descriptor in mmap_allocator.");

/**
* Tensor operants related FLAG
* Name: tensor_operants_mode
Expand Down
11 changes: 9 additions & 2 deletions python/paddle/base/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,17 @@ def _convert_places(places):


# NOTE(chenweihang): _reader_process_loop must be top level method to be pickled
def _reader_process_loop(batch_reader, data_queue):
def _reader_process_loop(
batch_reader, data_queue, dataloader_use_file_descriptor=True
):
try:
# set signal handler
core._set_process_signal_handler()
if not dataloader_use_file_descriptor:
# Set FLAGS_dataloader_use_file_descriptor to False so shared memory is opened by name instead of by file descriptor.
paddle.base.core.globals()[
"FLAGS_dataloader_use_file_descriptor"
] = False

# NOTE: [ mmap files clear ] When the child process exits unexpectedly,
# some shared memory objects may have been applied for but have not yet
Expand Down Expand Up @@ -606,7 +613,7 @@ def _start(self):
multiprocess_queue_set.add(self._data_queue)
self._process = multiprocessing.Process(
target=_reader_process_loop,
args=(self._batch_reader, self._data_queue),
args=(self._batch_reader, self._data_queue, False),
)
self._process.daemon = True
self._process.start()
Expand Down
Loading