34 Execution control library [exec]

34.9 Senders [exec.snd]

34.9.10 execution::connect [exec.connect]

connect connects ([exec.async.ops]) a sender with a receiver.
The name connect denotes a customization point object.
For subexpressions sndr and rcvr, let Sndr be decltype((sndr)) and Rcvr be decltype((rcvr)), let new_sndr be the expression transform_sender(decltype(get-domain-late(sndr, get_env(rcvr))){}, sndr, get_env(rcvr)) and let DS and DR be decay_t<decltype((new_sndr))> and decay_t<Rcvr>, respectively.
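Example (non-normative sketch): the definitions above rely on the double-parenthesized form decltype((e)), which reports the value category of the expression, and on decay_t, which strips references and cv-qualifiers. The snippet below checks those rules with static_assert. The names sketch, toy_receiver, and decltype_rules_hold are hypothetical and are not part of the specification.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

namespace sketch {

// Hypothetical stand-in for a receiver type.
struct toy_receiver {};

// Returns true if it compiles; all checks are done at compile time.
inline bool decltype_rules_hold() {
    toy_receiver rcvr{};
    const toy_receiver crcvr{};

    // An lvalue expression yields an lvalue reference type.
    static_assert(std::is_same_v<decltype((rcvr)), toy_receiver&>);
    static_assert(std::is_same_v<decltype((crcvr)), const toy_receiver&>);

    // An xvalue expression yields an rvalue reference type.
    static_assert(std::is_same_v<decltype((std::move(rcvr))), toy_receiver&&>);

    // A prvalue expression yields the plain type.
    static_assert(std::is_same_v<decltype((toy_receiver{})), toy_receiver>);

    // decay_t removes the reference and the const qualifier, which is how
    // DR is obtained from Rcvr.
    static_assert(std::is_same_v<std::decay_t<decltype((crcvr))>, toy_receiver>);
    static_assert(std::is_same_v<std::decay_t<decltype((std::move(rcvr)))>, toy_receiver>);

    return true;
}

} // namespace sketch
```

Under these rules, Rcvr may be a reference type, while DR is always an object type suitable for storing by value.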
Let connect-awaitable-promise be the following exposition-only class:
namespace std::execution {
  struct connect-awaitable-promise : with-await-transform<connect-awaitable-promise> {

    connect-awaitable-promise(DS&, DR& rcvr) noexcept : rcvr(rcvr) {}

    suspend_always initial_suspend() noexcept { return {}; }
    [[noreturn]] suspend_always final_suspend() noexcept { terminate(); }
    [[noreturn]] void unhandled_exception() noexcept { terminate(); }
    [[noreturn]] void return_void() noexcept { terminate(); }

    coroutine_handle<> unhandled_stopped() noexcept {
      set_stopped(std::move(rcvr));
      return noop_coroutine();
    }

    operation-state-task get_return_object() noexcept {
      return operation-state-task{
        coroutine_handle<connect-awaitable-promise>::from_promise(*this)};
    }

    env_of_t<DR> get_env() const noexcept {
      return execution::get_env(rcvr);
    }

  private:
    DR& rcvr;           // exposition only
  };
}
Let operation-state-task be the following exposition-only class:
namespace std::execution {
  struct operation-state-task {
    using operation_state_concept = operation_state_t;
    using promise_type = connect-awaitable-promise;

    explicit operation-state-task(coroutine_handle<> h) noexcept : coro(h) {}
    operation-state-task(operation-state-task&& o) noexcept
      : coro(exchange(o.coro, {})) {}
    ~operation-state-task() { if (coro) coro.destroy(); }

    void start() & noexcept {
      coro.resume();
    }

  private:
    coroutine_handle<> coro;    // exposition only
  };
}
Let V name the type await-result-type<DS, connect-awaitable-promise>, let Sigs name the type
completion_signatures<
  SET-VALUE-SIG(V),     // see ([exec.snd.concepts])
  set_error_t(exception_ptr),
  set_stopped_t()>
and let connect-awaitable be an exposition-only coroutine defined as follows:
namespace std::execution {
  template<class Fun, class... Ts>
  auto suspend-complete(Fun fun, Ts&&... as) noexcept {     // exposition only
    auto fn = [&, fun]() noexcept { fun(std::forward<Ts>(as)...); };

    struct awaiter {
      decltype(fn) fn;          // exposition only

      static constexpr bool await_ready() noexcept { return false; }
      void await_suspend(coroutine_handle<>) noexcept { fn(); }
      [[noreturn]] void await_resume() noexcept { unreachable(); }
    };
    return awaiter{fn};
  }

  operation-state-task connect-awaitable(DS sndr, DR rcvr) requires receiver_of<DR, Sigs> {
    exception_ptr ep;
    try {
      if constexpr (same_as<V, void>) {
        co_await std::move(sndr);
        co_await suspend-complete(set_value, std::move(rcvr));
      } else {
        co_await suspend-complete(set_value, std::move(rcvr), co_await std::move(sndr));
      }
    } catch(...) {
      ep = current_exception();
    }
    co_await suspend-complete(set_error, std::move(rcvr), std::move(ep));
  }
}
The expression connect(sndr, rcvr) is expression-equivalent to:
  • new_sndr.connect(rcvr) if that expression is well-formed.
    Mandates: The type of the expression above satisfies operation_state.
  • Otherwise, connect-awaitable(new_sndr, rcvr).
Mandates: sender<Sndr> && receiver<Rcvr> is true.