Skip to content

Commit 71440b5

Browse files
committed
[cherry-pick][dataloader] use file descriptor instead of file system (#62696)
1 parent 89a60d7 commit 71440b5

File tree

8 files changed

+286
-37
lines changed

8 files changed

+286
-37
lines changed

paddle/fluid/memory/allocation/mmap_allocator.cc

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,14 @@ struct CountInfo {
5454
std::atomic<int> refcount;
5555
};
5656

57-
void AllocateMemoryMap(
58-
std::string filename, int flags, size_t size, void **map_ptr_, int *fd_) {
57+
void AllocateMemoryMap(std::string filename,
58+
int *shared_fd,
59+
int flags,
60+
size_t size,
61+
void **map_ptr_) {
5962
// TODO(@ZHUI): support win32
6063
int file_flags = 0;
61-
int fd = -1;
64+
int fd = *shared_fd;
6265
if (flags & MAPPED_SHAREDMEM) {
6366
file_flags = O_RDWR | O_CREAT;
6467
} else {
@@ -71,7 +74,7 @@ void AllocateMemoryMap(
7174
file_flags &= ~O_CREAT;
7275
}
7376

74-
if (!(flags & MAPPED_FROMFD)) {
77+
if (!(flags & MAPPED_FROMFD) && fd == -1) {
7578
if (flags & MAPPED_SHAREDMEM) {
7679
fd = shm_open(filename.c_str(), file_flags, (mode_t)0600);
7780
PADDLE_ENFORCE_NE(
@@ -83,8 +86,6 @@ void AllocateMemoryMap(
8386
VLOG(6) << "shm_open: " << filename;
8487
MemoryMapFdSet::Instance().Insert(filename);
8588
}
86-
} else {
87-
fd = -1;
8889
}
8990

9091
PADDLE_ENFORCE_EQ(ftruncate(fd, size),
@@ -98,32 +99,38 @@ void AllocateMemoryMap(
9899
*map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
99100
}
100101

102+
if (flags & MAPPED_UNLINK) {
103+
VLOG(6) << "shm_unlink: " << filename;
104+
shm_unlink(filename.c_str());
105+
}
106+
101107
PADDLE_ENFORCE_NE(*map_ptr_,
102108
MAP_FAILED,
103109
platform::errors::Unavailable(
104110
"Memory map failed when create shared memory."));
105-
106111
if (flags & MAPPED_KEEPFD) {
107-
*fd_ = fd;
112+
*shared_fd = fd;
113+
VLOG(6) << "keep fd: " << *shared_fd;
108114
} else {
109115
PADDLE_ENFORCE_NE(::close(fd),
110116
-1,
111117
platform::errors::Unavailable(
112118
"Error closing memory maped file <", filename, ">"));
113119

114-
*fd_ = -1;
120+
*shared_fd = -1;
115121
}
116122
}
117123

118124
std::shared_ptr<RefcountedMemoryMapAllocation>
119125
AllocateRefcountedMemoryMapAllocation(std::string filename,
126+
int shared_fd,
120127
int flags,
121128
size_t size,
122129
int buffer_id) {
123-
int fd = -1;
130+
int fd = shared_fd;
124131
void *base_ptr = nullptr;
125132
if (buffer_id == -1) {
126-
AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd);
133+
AllocateMemoryMap(filename, &fd, flags, size + mmap_alignment, &base_ptr);
127134
VLOG(4) << "Create and mmap a new shm: " << filename;
128135
} else {
129136
base_ptr = MemoryMapAllocationPool::Instance().GetById(buffer_id).mmap_ptr_;
@@ -132,7 +139,7 @@ AllocateRefcountedMemoryMapAllocation(std::string filename,
132139
void *aliged_base_ptr =
133140
static_cast<void *>(static_cast<char *>(base_ptr) + mmap_alignment);
134141
return std::make_shared<RefcountedMemoryMapAllocation>(
135-
aliged_base_ptr, size, filename, flags, fd, buffer_id);
142+
aliged_base_ptr, size, filename, fd, flags, buffer_id);
136143
}
137144

138145
RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
@@ -145,11 +152,22 @@ RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
145152
: MemoryMapAllocation(ptr, size, ipc_name, fd, flags) {
146153
// must reset base ptr first.
147154
buffer_id_ = buffer_id;
155+
fd_ = fd;
156+
flags_ = flags;
148157
resetBaseptr();
149158
initializeRefercount();
150159
}
151160

152161
void MemoryMapAllocation::close() {
162+
if (!closed_fd_) {
163+
closed_fd_ = true;
164+
if (flags_ & MAPPED_KEEPFD) {
165+
PADDLE_ENFORCE_NE(::close(fd_),
166+
-1,
167+
platform::errors::Unavailable(
168+
"Error closing file descriptor <", fd_, ">"));
169+
}
170+
}
153171
if (closed_) {
154172
return;
155173
}
@@ -193,6 +211,15 @@ void RefcountedMemoryMapAllocation::close() {
193211
void *data = map_ptr_;
194212
CountInfo *info = reinterpret_cast<CountInfo *>(data);
195213
--info->refcount;
214+
if (flags_ & MAPPED_KEEPFD) {
215+
closed_fd_ = true;
216+
PADDLE_ENFORCE_NE(::close(fd_),
217+
-1,
218+
platform::errors::Unavailable(
219+
"Error closing file descriptor <", fd_, ">"));
220+
VLOG(6) << "close fd: " << fd_;
221+
}
222+
196223
if (FLAGS_use_shm_cache && buffer_id_ != -1) {
197224
return;
198225
} else {
@@ -260,6 +287,7 @@ std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
260287
const std::string &ipc_name = GetIPCName();
261288
int flags = O_RDWR | O_CREAT;
262289
int fd = shm_open(ipc_name.c_str(), flags, 0600);
290+
263291
PADDLE_ENFORCE_NE(fd,
264292
-1,
265293
platform::errors::Unavailable(
@@ -283,7 +311,6 @@ std::shared_ptr<MemoryMapReaderAllocation> RebuildMemoryMapReaderAllocation(
283311
const std::string &ipc_name, size_t size) {
284312
int flags = O_RDWR | O_CREAT;
285313
flags &= ~O_CREAT;
286-
287314
int fd = shm_open(ipc_name.c_str(), flags, 0600);
288315
PADDLE_ENFORCE_NE(fd,
289316
-1,

paddle/fluid/memory/allocation/mmap_allocator.h

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,17 @@ enum MappedModes {
4444

4545
class MemoryMapAllocation : public Allocation {
4646
public:
47-
explicit MemoryMapAllocation(void *ptr, size_t size, std::string ipc_name)
47+
explicit MemoryMapAllocation(void *ptr,
48+
size_t size,
49+
std::string ipc_name,
50+
int fd)
4851
: Allocation(ptr, size, platform::CPUPlace()),
4952
ipc_name_(std::move(ipc_name)),
53+
fd_(fd),
5054
map_ptr_(ptr),
5155
map_size_(size) {}
5256
explicit MemoryMapAllocation(
53-
void *ptr, size_t size, std::string ipc_name, int flags, int fd)
57+
void *ptr, size_t size, std::string ipc_name, int fd, int flags)
5458
: Allocation(ptr, size, platform::CPUPlace()),
5559
ipc_name_(std::move(ipc_name)),
5660
fd_(fd),
@@ -59,6 +63,7 @@ class MemoryMapAllocation : public Allocation {
5963
map_size_(size) {}
6064

6165
inline const std::string &ipc_name() const { return ipc_name_; }
66+
inline const int shared_fd() const { return fd_; }
6267

6368
virtual void close();
6469

@@ -71,6 +76,7 @@ class MemoryMapAllocation : public Allocation {
7176
void *map_ptr_ = nullptr;
7277
size_t map_size_ = 0;
7378
bool closed_ = false;
79+
bool closed_fd_ = false;
7480
};
7581

7682
class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
@@ -93,11 +99,15 @@ class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
9399
void resetBaseptr();
94100
};
95101

96-
void AllocateMemoryMap(
97-
std::string filename, int flags, size_t size, void **base_ptr_, int *fd_);
102+
void AllocateMemoryMap(std::string filename,
103+
int *shared_fd,
104+
int flags,
105+
size_t size,
106+
void **base_ptr_);
98107

99108
std::shared_ptr<RefcountedMemoryMapAllocation>
100109
AllocateRefcountedMemoryMapAllocation(std::string filename,
110+
int shared_fd,
101111
int flags,
102112
size_t size,
103113
int buffer_id = -1);
@@ -111,11 +121,13 @@ class MemoryMapWriterAllocation : public Allocation {
111121
ipc_name_(std::move(ipc_name)) {}
112122

113123
inline const std::string &ipc_name() const { return ipc_name_; }
124+
inline const int shared_fd() const { return fd_; }
114125

115126
~MemoryMapWriterAllocation() override;
116127

117128
private:
118129
std::string ipc_name_;
130+
int fd_ = -1;
119131
};
120132

121133
class MemoryMapReaderAllocation : public Allocation {
@@ -127,11 +139,13 @@ class MemoryMapReaderAllocation : public Allocation {
127139
ipc_name_(std::move(ipc_name)) {}
128140

129141
inline const std::string &ipc_name() const { return ipc_name_; }
142+
inline const int shared_fd() const { return fd_; }
130143

131144
~MemoryMapReaderAllocation() override;
132145

133146
private:
134147
std::string ipc_name_;
148+
int fd_ = -1;
135149
};
136150

137151
std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(

paddle/fluid/pybind/tensor.cc

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -863,7 +863,7 @@ void BindTensor(pybind11::module &m) { // NOLINT
863863
)DOC")
864864
#endif
865865
.def("_share_filename",
866-
[](phi::DenseTensor &self) {
866+
[](phi::DenseTensor &self, bool use_file_descriptor) {
867867
if (!self.IsInitialized() || self.numel() == 0)
868868
throw std::runtime_error(
869869
"Tensor not initialized or numel is 0. could not pass to "
@@ -890,6 +890,10 @@ void BindTensor(pybind11::module &m) { // NOLINT
890890

891891
int flags = memory::allocation::MAPPED_SHAREDMEM |
892892
memory::allocation::MAPPED_EXCLUSIVE;
893+
if (use_file_descriptor) {
894+
flags = flags | memory::allocation::MAPPED_KEEPFD |
895+
memory::allocation::MAPPED_UNLINK;
896+
}
893897
std::string handle = memory::allocation::GetIPCName();
894898
int find_id = -1;
895899
if (FLAGS_use_shm_cache) {
@@ -898,9 +902,10 @@ void BindTensor(pybind11::module &m) { // NOLINT
898902
if (find_id != -1) {
899903
handle = memory::allocation::MemoryMapAllocationPool::Instance().GetById(find_id).file_name_; // NOLINT
900904
}
905+
int shared_fd = -1;
901906
auto shared_holder =
902907
memory::allocation::AllocateRefcountedMemoryMapAllocation(
903-
handle, flags, data_size, find_id);
908+
handle, shared_fd, flags, data_size, find_id);
904909

905910
// copy data & reset holder
906911
if (platform::is_cuda_pinned_place(holder->place())) {
@@ -918,8 +923,10 @@ void BindTensor(pybind11::module &m) { // NOLINT
918923
int type_idx = static_cast<int>(self.type());
919924

920925
return py::make_tuple(mmap_allocation->ipc_name(),
926+
mmap_allocation->shared_fd(),
921927
mmap_allocation->size(), type_idx,
922-
common::vectorize(self.dims()), self.lod());
928+
common::vectorize(self.dims()), self.lod(),
929+
use_file_descriptor);
923930
},
924931
R"DOC(
925932
Serialize CPU lod tensor in shared memory to tuple.
@@ -939,30 +946,37 @@ void BindTensor(pybind11::module &m) { // NOLINT
939946
)DOC")
940947
.def("_new_shared_filename",
941948
[](py::tuple t) { // __setstate__
942-
if (t.size() != 5)
949+
if (t.size() != 7)
943950
throw std::runtime_error("Invalid Tensor meta info state!");
944951

945952
phi::DenseTensor tensor;
946953

947954
// 2. Rebuild Allocation
948955
const std::string &ipc_name = t[0].cast<std::string>();
949-
size_t size = t[1].cast<size_t>();
956+
const int shared_fd = t[1].cast<int>();
957+
const bool use_file_descriptor = t[6].cast<bool>();
958+
959+
size_t size = t[2].cast<size_t>();
950960
int flags = memory::allocation::MAPPED_SHAREDMEM |
951961
memory::allocation::MAPPED_NOCREATE;
962+
if (use_file_descriptor) {
963+
flags = flags | memory::allocation::MAPPED_KEEPFD |
964+
memory::allocation::MAPPED_UNLINK;
965+
}
952966
int find_id = -1;
953967
if (FLAGS_use_shm_cache) {
954968
find_id = memory::allocation::MemoryMapAllocationPool::Instance().FindFromCache(flags, size, ipc_name, /*check_refcount*/ false); // NOLINT
955969
}
956970
auto shared_holder =
957971
memory::allocation::AllocateRefcountedMemoryMapAllocation(
958-
ipc_name, flags, size, find_id);
972+
ipc_name, shared_fd, flags, size, find_id);
959973

960974
// 3. Rebuild Tensor
961975
tensor.ResetHolderWithType(
962976
shared_holder,
963-
static_cast<phi::DataType>(t[2].cast<int>()));
964-
tensor.Resize(common::make_ddim(t[3].cast<std::vector<int>>()));
965-
tensor.set_lod(t[4].cast<framework::LoD>());
977+
static_cast<phi::DataType>(t[3].cast<int>()));
978+
tensor.Resize(common::make_ddim(t[4].cast<std::vector<int>>()));
979+
tensor.set_lod(t[5].cast<framework::LoD>());
966980

967981
return tensor;
968982
},

paddle/phi/core/flags.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,6 +1310,19 @@ PHI_DEFINE_EXPORTED_bool(use_shm_cache,
13101310
false,
13111311
"Use shm cache in mmap_allocator.");
13121312

1313+
/**
1314+
* mmap_allocator related FLAG
1315+
* Name: dataloader_use_file_descriptor
1316+
* Since Version: 2.6.0
1317+
* Value Range: bool, default=true
1318+
* Example:
1319+
* Note: If True, mmap_allocator will use file descriptor to open shared memory
1320+
* operation.
1321+
*/
1322+
PHI_DEFINE_EXPORTED_bool(dataloader_use_file_descriptor,
1323+
true,
1324+
"Use file descriptor in mmap_allocator.");
1325+
13131326
/**
13141327
* Tensor operants related FLAG
13151328
* Name: tensor_operants_mode

python/paddle/base/reader.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,17 @@ def _convert_places(places):
9696

9797

9898
# NOTE(chenweihang): _reader_process_loop must be top level method to be pickled
99-
def _reader_process_loop(batch_reader, data_queue):
99+
def _reader_process_loop(
100+
batch_reader, data_queue, dataloader_use_file_descriptor=True
101+
):
100102
try:
101103
# set signal handler
102104
core._set_process_signal_handler()
105+
if not dataloader_use_file_descriptor:
106+
# set dataloader_use_file_descriptor to false to avoid use descriptor.
107+
paddle.base.core.globals()[
108+
"FLAGS_dataloader_use_file_descriptor"
109+
] = False
103110

104111
# NOTE: [ mmap files clear ] When the child process exits unexpectedly,
105112
# some shared memory objects may have been applied for but have not yet
@@ -606,7 +613,7 @@ def _start(self):
606613
multiprocess_queue_set.add(self._data_queue)
607614
self._process = multiprocessing.Process(
608615
target=_reader_process_loop,
609-
args=(self._batch_reader, self._data_queue),
616+
args=(self._batch_reader, self._data_queue, False),
610617
)
611618
self._process.daemon = True
612619
self._process.start()

0 commit comments

Comments
 (0)