diff --git a/cmake/develop.cmake b/cmake/develop.cmake index 864fb2b5..e9ec2529 100644 --- a/cmake/develop.cmake +++ b/cmake/develop.cmake @@ -39,7 +39,11 @@ int main() endmacro() # Enable address sanitizer -option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON) +if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" OFF) +else() + option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON) +endif() if(ENABLE_SANITIZER AND NOT MSVC) if(CMAKE_BUILD_TYPE STREQUAL "Debug") check_asan(HAS_ASAN) diff --git a/include/async_simple/CMakeLists.txt b/include/async_simple/CMakeLists.txt deleted file mode 100644 index 59b77423..00000000 --- a/include/async_simple/CMakeLists.txt +++ /dev/null @@ -1,73 +0,0 @@ -file(GLOB coro_src "coro/*.cpp") -file(GLOB executors_src "executors/*.cpp") - -if(${UTHREAD}) - file(GLOB uthread_src "uthread/internal/*.cc") - if (CMAKE_BUILD_TYPE STREQUAL "Debug" AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang" AND - CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL "13") - endif() -file(GLOB uthread_asm_src "uthread/internal/${CMAKE_SYSTEM_NAME}/${CMAKE_SYSTEM_PROCESSOR}/*.S") -endif() - -file(GLOB headers "*.h") -file(GLOB coro_header "coro/*.h") -file(GLOB executors_header "executors/*.h") -file(GLOB experimental_header "experimental/*.h") -file(GLOB util_header "util/*.h") -if(UTHREAD) - file(GLOB uthread_header "uthread/*.h") - file(GLOB uthread_internal_header "uthread/internal/*.h") -endif() - - -set(SRCS - ${coro_src} - ${executors_src} - ) -if(UTHREAD) - list(APPEND SRCS ${uthread_src}) - list(APPEND SRCS ${uthread_asm_src}) -endif() - -if(NOT CMAKE_CXX_COMPILER_ID MATCHES "MSVC") - add_library(async_simple_static STATIC ${SRCS}) - add_library(async_simple SHARED ${SRCS}) - target_link_libraries(async_simple PUBLIC libasync_simple) - target_link_libraries(async_simple_static PUBLIC libasync_simple) - - set_target_properties(async_simple_static PROPERTIES OUTPUT_NAME "async_simple") - - install(TARGETS async_simple DESTINATION lib/) - install(TARGETS async_simple_static DESTINATION lib/) -else() - add_library(async_simple STATIC ${SRCS}) - target_link_libraries(async_simple PUBLIC libasync_simple) - install(TARGETS async_simple DESTINATION lib/) -endif() - -set_target_properties(async_simple PROPERTIES - VERSION ${PROJECT_VERSION} - SOVERSION ${PROJECT_VERSION_MAJOR}) - -install(FILES ${headers} DESTINATION include/async_simple) -install(FILES ${coro_header} DESTINATION include/async_simple/coro) -install(FILES ${executors_header} DESTINATION include/async_simple/executors) -install(FILES ${experimental_header} DESTINATION include/async_simple/experimental) -install(FILES ${util_header} DESTINATION include/async_simple/util) -if(UTHREAD) - install(FILES ${uthread_header} DESTINATION include/async_simple/uthread) - install(FILES ${uthread_internal_header} DESTINATION include/async_simple/uthread/internal) -endif() - -if (${ASYNC_SIMPLE_ENABLE_TESTS}) - add_subdirectory(test) - add_subdirectory(util/test) - add_subdirectory(coro/test) - add_subdirectory(executors/test) - if(UTHREAD) - add_subdirectory(uthread/test) - endif() -endif() -if (NOT TARGET async_simple::async_simple_header_only) - add_library(async_simple::async_simple_header_only ALIAS libasync_simple) -endif () diff --git a/include/async_simple/Common.h b/include/async_simple/Common.h index a82a40c3..bde017e6 100644 --- a/include/async_simple/Common.h +++ b/include/async_simple/Common.h @@ -44,6 +44,20 @@ #endif // __SANITIZE_ADDRESS__ #endif // __GNUC__ +#if defined(__alibaba_clang__) && \ + __has_cpp_attribute(ACC::coro_only_destroy_when_complete) +#define CORO_ONLY_DESTROY_WHEN_DONE [[ACC::coro_only_destroy_when_complete]] +#else +#define CORO_ONLY_DESTROY_WHEN_DONE +#endif + +#if defined(__alibaba_clang__) && \ + __has_cpp_attribute(ACC::elideable_after_await) +#define ELIDEABLE_AFTER_AWAIT [[ACC::elideable_after_await]] +#else +#define ELIDEABLE_AFTER_AWAIT +#endif + namespace async_simple { // Different from assert, logicAssert is meaningful in // release mode. logicAssert should be used in case that diff --git a/include/async_simple/Executor.h b/include/async_simple/Executor.h index 47eb8b72..d1edfedf 100644 --- a/include/async_simple/Executor.h +++ b/include/async_simple/Executor.h @@ -20,7 +20,9 @@ #include #include #include +#include "async_simple/MoveWrapper.h" #include "async_simple/experimental/coroutine.h" +#include "async_simple/util/move_only_function.h" namespace async_simple { // Stat information for an executor. @@ -86,6 +88,13 @@ class Executor { // func will not be executed. In case schedule return true, the executor // should guarantee that the func would be executed. virtual bool schedule(Func func) = 0; + + // Schedule a move only functor + bool schedule_move_only(util::move_only_function func) { + MoveWrapper tmp(std::move(func)); + return schedule([func = tmp]() { func.get()(); }); + } + // Return true if caller runs in the executor. virtual bool currentThreadInExecutor() const { throw std::logic_error("Not implemented"); @@ -143,10 +152,7 @@ class Executor::TimeAwaiter { template void await_suspend(std::coroutine_handle continuation) { - std::function func = [c = continuation]() mutable { - c.resume(); - }; - _ex->schedule(func, _dur); + _ex->schedule(std::move(continuation), _dur); } void await_resume() const noexcept {} diff --git a/include/async_simple/Future.h b/include/async_simple/Future.h index fc6ce832..014e9578 100644 --- a/include/async_simple/Future.h +++ b/include/async_simple/Future.h @@ -58,6 +58,7 @@ class Future { public: using value_type = T; + Future(FutureState* fs) : _sharedState(fs) { if (_sharedState) { _sharedState->attachOne(); @@ -89,6 +90,8 @@ class Future { return *this; } + auto coAwait(Executor*) && noexcept { return std::move(*this); } + public: bool valid() const { return _sharedState != nullptr || _localState.hasResult(); @@ -349,7 +352,9 @@ template Future makeReadyFuture(std::exception_ptr ex) { return Future(Try(ex)); } -inline Future makeReadyFuture() { return Future(Try()); } +inline Future makeReadyFuture() { + return Future(Try(Unit())); +} } // namespace async_simple diff --git a/include/async_simple/FutureState.h b/include/async_simple/FutureState.h index 9cac785f..7f495839 100644 --- a/include/async_simple/FutureState.h +++ b/include/async_simple/FutureState.h @@ -27,8 +27,8 @@ #include #include "async_simple/Common.h" #include "async_simple/Executor.h" -#include "async_simple/MoveWrapper.h" #include "async_simple/Try.h" +#include "async_simple/util/move_only_function.h" namespace async_simple { @@ -62,7 +62,7 @@ constexpr State operator&(State lhs, State rhs) { template class FutureState { private: - using Continuation = std::function&& value)>; + using Continuation = util::move_only_function&& value)>; private: // A helper to help FutureState to count the references to guarantee @@ -228,11 +228,10 @@ class FutureState { void setContinuation(F&& func) { logicAssert(!hasContinuation(), "FutureState already has a continuation"); - MoveWrapper lambdaFunc(std::move(func)); - new (&_continuation) Continuation([lambdaFunc](Try&& v) mutable { - auto& lambda = lambdaFunc.get(); - lambda(std::forward>(v)); - }); + new (&_continuation) + Continuation([func = std::move(func)](Try&& v) mutable { + func(std::forward>(v)); + }); auto state = _state.load(std::memory_order_acquire); switch (state) { diff --git a/include/async_simple/LocalState.h b/include/async_simple/LocalState.h index f205a045..297de380 100644 --- a/include/async_simple/LocalState.h +++ b/include/async_simple/LocalState.h @@ -27,7 +27,6 @@ #include #include "async_simple/Common.h" #include "async_simple/Executor.h" -#include "async_simple/MoveWrapper.h" #include "async_simple/Try.h" namespace async_simple { diff --git a/include/async_simple/MoveWrapper.h b/include/async_simple/MoveWrapper.h index 698039a3..0fbfb9b4 100644 --- a/include/async_simple/MoveWrapper.h +++ b/include/async_simple/MoveWrapper.h @@ -18,6 +18,7 @@ #define ASYNC_SIMPLE_MOVEWRAPPER_H #include + #include "async_simple/Common.h" namespace async_simple { @@ -26,23 +27,23 @@ namespace async_simple { // copy as move. template class MoveWrapper { -public: - MoveWrapper() = default; - MoveWrapper(T&& value) : _value(std::move(value)) {} + public: + MoveWrapper() = default; + MoveWrapper(T&& value) : _value(std::move(value)) {} - MoveWrapper(const MoveWrapper& other) : _value(std::move(other._value)) {} - MoveWrapper(MoveWrapper&& other) : _value(std::move(other._value)) {} + MoveWrapper(const MoveWrapper& other) : _value(std::move(other._value)) {} + MoveWrapper(MoveWrapper&& other) : _value(std::move(other._value)) {} - MoveWrapper& operator=(const MoveWrapper&) = delete; - MoveWrapper& operator=(MoveWrapper&&) = delete; + MoveWrapper& operator=(const MoveWrapper&) = delete; + MoveWrapper& operator=(MoveWrapper&&) = delete; - T& get() { return _value; } - const T& get() const { return _value; } + T& get() { return _value; } + const T& get() const { return _value; } - ~MoveWrapper() {} + ~MoveWrapper() {} -private: - mutable T _value; + private: + mutable T _value; }; } // namespace async_simple diff --git a/include/async_simple/Try.h b/include/async_simple/Try.h index 6fa2be33..8ec40189 100644 --- a/include/async_simple/Try.h +++ b/include/async_simple/Try.h @@ -118,7 +118,7 @@ class Try { } std::exception_ptr getException() const { logicAssert(std::holds_alternative(_value), - "Try object do not has on error"); + "Try object do not has an error"); return std::get(_value); } diff --git a/include/async_simple/coro/Collect.h b/include/async_simple/coro/Collect.h index 93e48e53..218cd7b4 100644 --- a/include/async_simple/coro/Collect.h +++ b/include/async_simple/coro/Collect.h @@ -26,6 +26,7 @@ #include #include "async_simple/Common.h" #include "async_simple/Try.h" +#include "async_simple/Unit.h" #include "async_simple/coro/CountEvent.h" #include "async_simple/coro/Lazy.h" #include "async_simple/experimental/coroutine.h" @@ -66,9 +67,7 @@ struct CollectAnyResult { bool hasError() const { return _value.hasError(); } // Require hasError() == true. Otherwise it is UB to call // this method. - std::exception_ptr getException() const { - return _value.getException(); - } + std::exception_ptr getException() const { return _value.getException(); } // Require hasError() == false. Otherwise it is UB to call // value() method. @@ -85,7 +84,7 @@ struct CollectAnyResult { #endif }; -template +template struct CollectAnyAwaiter { using ValueType = typename LazyType::ValueType; using ResultType = CollectAnyResult; @@ -93,10 +92,17 @@ struct CollectAnyAwaiter { CollectAnyAwaiter(std::vector&& input) : _input(std::move(input)), _result(nullptr) {} + CollectAnyAwaiter(std::vector&& input, Callback callback) + : _input(std::move(input)), + _result(nullptr), + _callback(std::move(callback)) {} + CollectAnyAwaiter(const CollectAnyAwaiter&) = delete; CollectAnyAwaiter& operator=(const CollectAnyAwaiter&) = delete; CollectAnyAwaiter(CollectAnyAwaiter&& other) - : _input(std::move(other._input)), _result(std::move(other._result)) {} + : _input(std::move(other._input)), + _result(std::move(other._result)), + _callback(std::move(other._callback)) {} bool await_ready() const noexcept { return _input.empty() || @@ -115,6 +121,7 @@ struct CollectAnyAwaiter { // if any coroutine finishes before this function. auto result = std::make_shared(); auto event = std::make_shared(input.size()); + auto callback = std::move(_callback); _result = result; for (size_t i = 0; @@ -124,26 +131,130 @@ struct CollectAnyAwaiter { input[i]._coro.promise()._executor = executor; } - input[i].start([i = i, size = input.size(), r = result, - c = continuation, - e = event](Try&& result) mutable { - assert(e != nullptr); - auto count = e->downCount(); - if (count == size + 1) { - r->_idx = i; - r->_value = std::move(result); - c.resume(); - } - }); + if constexpr (std::is_same_v) { + (void)callback; + input[i].start([i, size = input.size(), r = result, + c = continuation, + e = event](Try&& result) mutable { + assert(e != nullptr); + auto count = e->downCount(); + if (count == size + 1) { + r->_idx = i; + r->_value = std::move(result); + c.resume(); + } + }); + } else { + input[i].start([i, size = input.size(), r = result, + c = continuation, e = event, + callback](Try&& result) mutable { + assert(e != nullptr); + auto count = e->downCount(); + if (count == size + 1) { + r->_idx = i; + (*callback)(i, std::move(result)); + c.resume(); + } + }); + } } // end for } auto await_resume() { - assert(_result != nullptr); - return std::move(*_result); + if constexpr (std::is_same_v) { + assert(_result != nullptr); + return std::move(*_result); + } else { + return _result->index(); + } } std::vector _input; std::shared_ptr _result; + [[no_unique_address]] Callback _callback; +}; + +template +struct CollectAnyVariadicPairAwaiter { + using InputType = std::tuple; + + CollectAnyVariadicPairAwaiter(Ts&&... inputs) + : _input(std::move(inputs)...), _result(nullptr) {} + + CollectAnyVariadicPairAwaiter(InputType&& inputs) + : _input(std::move(inputs)), _result(nullptr) {} + + CollectAnyVariadicPairAwaiter(const CollectAnyVariadicPairAwaiter&) = + delete; + CollectAnyVariadicPairAwaiter& operator=( + const CollectAnyVariadicPairAwaiter&) = delete; + CollectAnyVariadicPairAwaiter(CollectAnyVariadicPairAwaiter&& other) + : _input(std::move(other._input)), _result(std::move(other._result)) {} + + bool await_ready() const noexcept { + return _result && _result->has_value(); + } + + void await_suspend(std::coroutine_handle<> continuation) { + auto promise_type = + std::coroutine_handle::from_address( + continuation.address()) + .promise(); + auto executor = promise_type._executor; + auto event = + std::make_shared(std::tuple_size()); + auto result = std::make_shared>(); + _result = result; + + auto input = std::move(_input); + + [&](std::index_sequence) { + ( + [&](auto& lazy, auto& callback) { + if (result->has_value()) { + return; + } + + if (!lazy._coro.promise()._executor) { + lazy._coro.promise()._executor = executor; + } + + lazy.start([result, event, continuation, + callback](auto&& res) mutable { + auto count = event->downCount(); + if (count == std::tuple_size() + 1) { + callback(std::move(res)); + *result = I; + continuation.resume(); + } + }); + }(std::get<0>(std::get(input)), + std::get<1>(std::get(input))), + ...); + } + (std::make_index_sequence()); + } + + auto await_resume() { + assert(_result != nullptr); + return std::move(_result->value()); + } + + std::tuple _input; + std::shared_ptr> _result; +}; + +template +struct SimpleCollectAnyVariadicPairAwaiter { + using InputType = std::tuple; + + InputType _inputs; + + SimpleCollectAnyVariadicPairAwaiter(Ts&&... inputs) + : _inputs(std::move(inputs)...) {} + + auto coAwait(Executor* ex) { + return CollectAnyVariadicPairAwaiter(std::move(_inputs)); + } }; template