Boost.Asio의 스트랜드와 우선 순위 래퍼를 동시에 사용하고 싶습니다.
코드를 작성하기 전에 다음 정보를 읽었습니다.
http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531
boost :: asio를 사용할 때 연결 당 가닥이 필요한 이유는 무엇입니까?
async_read, async_write 및 async_connect와 같은 다양한 비동기 API를 사용하고 싶기 때문에 래퍼 방식을 사용하고 싶습니다. http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531 에 따르면 우선 순위 래퍼와 스트랜드 래퍼를 결합 할 수있는 것으로 보입니다.
그래서 다음 예제를 기반으로 코드를 작성했습니다.
내 코드는 다음과 같습니다.
#include <iostream>
#include <functional>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1
class handler_priority_queue {
public:
template <typename Handler>
void add(int priority, Handler&& handler) {
std::cout << "add(" << priority << ")" << std::endl;
std::lock_guard<std::mutex> g(mtx_);
handlers_.emplace(priority, std::forward<Handler>(handler));
}
void execute_all() {
auto top = [&]() -> boost::optional<queued_handler> {
std::lock_guard<std::mutex> g(mtx_);
if (handlers_.empty()) return boost::none;
boost::optional<queued_handler> opt = handlers_.top();
handlers_.pop();
return opt;
};
while (auto h_opt = top()) {
h_opt.get().execute();
}
}
template <typename Handler>
class wrapped_handler {
public:
wrapped_handler(handler_priority_queue& q, int p, Handler h)
: queue_(q), priority_(p), handler_(std::move(h))
{
}
template <typename... Args>
void operator()(Args&&... args) {
std::cout << "operator() " << std::endl;
handler_(std::forward<Args>(args)...);
}
//private:
handler_priority_queue& queue_;
int priority_;
Handler handler_;
};
template <typename Handler>
wrapped_handler<Handler> wrap(int priority, Handler&& handler) {
return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler));
}
private:
class queued_handler {
public:
template <typename Handler>
queued_handler(int p, Handler&& handler)
: priority_(p), function_(std::forward<Handler>(handler))
{
std::cout << "queued_handler()" << std::endl;
}
void execute() {
std::cout << "execute(" << priority_ << ")" << std::endl;
function_();
}
friend bool operator<(
queued_handler const& lhs,
queued_handler const & rhs) {
return lhs.priority_ < rhs.priority_;
}
private:
int priority_;
std::function<void()> function_;
};
std::priority_queue<queued_handler> handlers_;
std::mutex mtx_;
};
// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, std::forward<Function>(f));
}
//----------------------------------------------------------------------
int main() {
int const num_of_threads = 4;
int const num_of_tasks = 5;
boost::asio::io_service ios;
boost::asio::strand strand(ios);
handler_priority_queue pq;
for (int i = 0; i != num_of_tasks; ++i) {
ios.post(
#if ENABLE_STRAND
strand.wrap(
#endif
#if ENABLE_PRIORITY
pq.wrap(
i,
#endif
[=] {
std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl;
}
#if ENABLE_PRIORITY
)
#endif
#if ENABLE_STRAND
)
#endif
);
}
std::vector<std::thread> pool;
for (int i = 0; i != num_of_threads; ++i) {
pool.emplace_back([&]{
std::cout << "before run_one()" << std::endl;
while (ios.run_one()) {
std::cout << "before poll_one()" << std::endl;
while (ios.poll_one())
;
std::cout << "before execute_all()" << std::endl;
pq.execute_all();
}
}
);
}
for (auto& t : pool) t.join();
}
래퍼는 다음 매크로에 의해 활성화됩니다.
#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1
두 매크로가 모두 활성화되면 다음과 같은 결과가 나타납니다.
before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
execute(3)
execute(2)
execute(1)
execute(0)
before run_one()
before run_one()
before run_one()
나는 내가 얻을 것으로 기대한다
[called] priority,thread_id
다음으로 출력
[called] 1,140512649541376
하지만 이해하지 못했습니다.
함수 execute()
에서는 function_()
호출되었지만 wrapped_handler::operator()
호출되지 않은 것 같습니다 . (이 함수 execute()
는 pq.execute_all();
내 코드에서 호출됩니다 .)
void execute() {
std::cout << "execute(" << priority_ << ")" << std::endl;
function_(); // It is called.
}
template <typename Handler>
class wrapped_handler {
public:
template <typename... Args>
void operator()(Args&&... args) { // It is NOT called
std::cout << "operator() " << std::endl;
handler_(std::forward<Args>(args)...);
}
function_()
호출 된 후 시퀀스를 추적했습니다 .
다음 함수가 호출됩니다.
https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L191 https://github.com/boostorg/asio/blob/boost-1.63 .0 / include / boost / asio / detail / wrapped_handler.hpp # L76 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/strand.hpp#L158 https : / /github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.hpp#L55 https://github.com/boostorg/asio/blob/boost-1.63. 0 / include / boost / asio / detail / impl / strand_service.ipp # L94
그런 다음 함수 bool strand_service::do_dispatch(implementation_type& impl, operation* op)
에서 작업 op
이 호출되지 않고 다음 줄의 대기열로 푸시됩니다.
왜 그것이 function_()
strand_service로 파견 되었는지 잘 모르겠습니다 . 내 코드의 다음 지점에서 스트랜드 래퍼가 이미 풀린 것 같습니다.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, std::forward<Function>(f));
}
우선 순위 래퍼 만 활성화하면 다음과 같은 결과가 나타납니다. 예상대로 작동하는 것 같습니다.
before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
operator()
[called] 4,140512649541376
execute(3)
operator()
[called] 3,140512649541376
execute(2)
operator()
[called] 2,140512649541376
execute(1)
operator()
[called] 1,140512649541376
execute(0)
operator()
[called] 0,140512649541376
before run_one()
before run_one()
before run_one()
스트랜드 래퍼 만 활성화하면 다음과 같은 결과가 나타납니다. 예상대로 작동하는 것 같습니다.
before run_one()
[called] 0,140127385941760
before poll_one()
[called] 1,140127385941760
[called] 2,140127385941760
[called] 3,140127385941760
[called] 4,140127385941760
before execute_all()
before run_one()
before run_one()
before run_one()
어떤 아이디어?
나는 문제를 해결했다.
function_ ()이 strand_service로 발송되는 이유를 모르겠습니다. 내 코드의 다음 지점에서 스트랜드 래퍼가 이미 풀린 것 같습니다.
template <typename Function, typename Handler> void asio_handler_invoke(Function&& f, handler_priority_queue::wrapped_handler<Handler>* h) { std::cout << "asio_handler_invoke " << std::endl; h->queue_.add(h->priority_, std::forward<Function>(f)); }
매개 변수 f
는 원래 핸들러입니다. 즉, 우선 순위 큐 래핑 및 스트랜드 래핑 처리기를 의미합니다. 스트랜드 래퍼가 밖에 있습니다. 따라서를 호출하면 strand_service f
로 전달됩니다. 이 프로세스는 동일한 strand_service에서 발생하므로 핸들러가 호출되지 않습니다.
이 문제를 해결하려면 다음 h->handler_
대신 우선 순위가 지정된 대기열에 추가하십시오 f
.
// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, h->handler_);
}
handler_
클래스 템플릿의 멤버 변수입니다 wrapped_handler
. 랩핑되지 않은 핸들러를 보유합니다.
다음은 완전한 코드입니다.
#include <iostream>
#include <functional>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1
class handler_priority_queue {
public:
template <typename Handler>
void add(int priority, Handler&& handler) {
std::cout << "add(" << priority << ")" << std::endl;
std::lock_guard<std::mutex> g(mtx_);
handlers_.emplace(priority, std::forward<Handler>(handler));
}
void execute_all() {
auto top = [&]() -> boost::optional<queued_handler> {
std::lock_guard<std::mutex> g(mtx_);
if (handlers_.empty()) return boost::none;
boost::optional<queued_handler> opt = handlers_.top();
handlers_.pop();
return opt;
};
while (auto h_opt = top()) {
h_opt.get().execute();
}
}
template <typename Handler>
class wrapped_handler {
public:
template <typename HandlerArg>
wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h)
: queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h))
{
}
template <typename... Args>
void operator()(Args&&... args) {
std::cout << "operator() " << std::endl;
handler_(std::forward<Args>(args)...);
}
//private:
handler_priority_queue& queue_;
int priority_;
Handler handler_;
};
template <typename Handler>
wrapped_handler<Handler> wrap(int priority, Handler&& handler) {
return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler));
}
private:
class queued_handler {
public:
template <typename Handler>
queued_handler(int p, Handler&& handler)
: priority_(p), function_(std::forward<Handler>(handler))
{
std::cout << "queued_handler()" << std::endl;
}
void execute() {
std::cout << "execute(" << priority_ << ")" << std::endl;
function_();
}
friend bool operator<(
queued_handler const& lhs,
queued_handler const & rhs) {
return lhs.priority_ < rhs.priority_;
}
private:
int priority_;
std::function<void()> function_;
};
std::priority_queue<queued_handler> handlers_;
std::mutex mtx_;
};
// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, h->handler_);
}
//----------------------------------------------------------------------
int main() {
int const num_of_threads = 4;
int const num_of_tasks = 5;
boost::asio::io_service ios;
boost::asio::strand strand(ios);
handler_priority_queue pq;
for (int i = 0; i != num_of_tasks; ++i) {
ios.post(
#if ENABLE_STRAND
strand.wrap(
#endif
#if ENABLE_PRIORITY
pq.wrap(
i,
#endif
[=] {
std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl;
}
#if ENABLE_STRAND
)
#endif
#if ENABLE_PRIORITY
)
#endif
);
}
std::vector<std::thread> pool;
for (int i = 0; i != num_of_threads; ++i) {
pool.emplace_back([&]{
std::cout << "before run_one()" << std::endl;
while (ios.run_one()) {
std::cout << "before poll_one()" << std::endl;
while (ios.poll_one())
;
std::cout << "before execute_all()" << std::endl;
pq.execute_all();
}
}
);
}
for (auto& t : pool) t.join();
}
다음은 출력입니다.
before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
[called] 4,139903315736320
execute(3)
[called] 3,139903315736320
execute(2)
[called] 2,139903315736320
execute(1)
[called] 1,139903315736320
execute(0)
[called] 0,139903315736320
before run_one()
before run_one()
before run_one()
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다