Asio Notes

本文记录 Asio 相关的笔记

Tutorial

Using a timer synchronously

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
#include <iostream>
#include <asio.hpp>

int main()
{
  asio::io_context io;

  asio::steady_timer t(io, asio::chrono::seconds(5));
  t.wait();

  std::cout << "Hello, world!" << std::endl;

  return 0;
}

Using a timer asynchronously

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <asio.hpp>

void print(const asio::error_code& /*e*/)
{
  std::cout << "Hello, world!" << std::endl;
}

int main()
{
  asio::io_context io;

  asio::steady_timer t(io, asio::chrono::seconds(5));
  t.async_wait(&print);

  io.run();

  return 0;
}

We must call the io_context::run() member function on the io_context object.

The asio library provides a guarantee that callback handlers will only be called from threads that are currently calling io_context::run(). Therefore unless the io_context::run() function is called the callback for the asynchronous wait completion will never be invoked.

The io_context::run() function will also continue to run while there is still "work" to do. In this example, the work is the asynchronous wait on the timer, so the call will not return until the timer has expired and the callback has completed.

It is important to remember to give the io_context some work to do before calling io_context::run(). For example, if we had omitted the above call to steady_timer::async_wait(), the io_context would not have had any work to do, and consequently io_context::run() would have returned immediately.

Binding arguments to a handler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <asio.hpp>
#include <boost/bind/bind.hpp>

void print(const asio::error_code& /*e*/,
    asio::steady_timer* t, int* count)
{
  if (*count < 5)
  {
    std::cout << *count << std::endl;
    ++(*count);

    t->expires_at(t->expiry() + asio::chrono::seconds(1));
    t->async_wait(boost::bind(print,
          asio::placeholders::error, t, count));
  }
}

int main()
{
  asio::io_context io;

  int count = 0;
  asio::steady_timer t(io, asio::chrono::seconds(1));
  t.async_wait(boost::bind(print,
        asio::placeholders::error, &t, &count));

  io.run();

  std::cout << "Final count is " << count << std::endl;

  return 0;
}

To implement a repeating timer using asio you need to change the timer's expiry time in your callback function, and to then start a new asynchronous wait.

Using a member function as a handler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <asio.hpp>
#include <boost/bind/bind.hpp>

class printer
{
public:
  printer(asio::io_context& io)
    : timer_(io, asio::chrono::seconds(1)),
      count_(0)
  {
    timer_.async_wait(boost::bind(&printer::print, this));
  }

  ~printer()
  {
    std::cout << "Final count is " << count_ << std::endl;
  }

  void print()
  {
    if (count_ < 5)
    {
      std::cout << count_ << std::endl;
      ++count_;

      timer_.expires_at(timer_.expiry() + asio::chrono::seconds(1));
      timer_.async_wait(boost::bind(&printer::print, this));
    }
  }

private:
  asio::steady_timer timer_;
  int count_;
};

int main()
{
  asio::io_context io;
  printer p(io);
  io.run();

  return 0;
}

Synchronising handlers in multithreaded programs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <iostream>
#include <asio.hpp>
#include <boost/bind/bind.hpp>

class printer
{
public:
  printer(asio::io_context& io)
    : strand_(asio::make_strand(io)),
      timer1_(io, asio::chrono::seconds(1)),
      timer2_(io, asio::chrono::seconds(1)),
      count_(0)
  {
    timer1_.async_wait(asio::bind_executor(strand_,
          boost::bind(&printer::print1, this)));

    timer2_.async_wait(asio::bind_executor(strand_,
          boost::bind(&printer::print2, this)));
  }

  ~printer()
  {
    std::cout << "Final count is " << count_ << std::endl;
  }

  void print1()
  {
    if (count_ < 10)
    {
      std::cout << "Timer 1: " << count_ << std::endl;
      ++count_;

      timer1_.expires_at(timer1_.expiry() + asio::chrono::seconds(1));

      timer1_.async_wait(asio::bind_executor(strand_,
            boost::bind(&printer::print1, this)));
    }
  }

  void print2()
  {
    if (count_ < 10)
    {
      std::cout << "Timer 2: " << count_ << std::endl;
      ++count_;

      timer2_.expires_at(timer2_.expiry() + asio::chrono::seconds(1));

      timer2_.async_wait(asio::bind_executor(strand_,
            boost::bind(&printer::print2, this)));
    }
  }

private:
  asio::strand<asio::io_context::executor_type> strand_;
  asio::steady_timer timer1_;
  asio::steady_timer timer2_;
  int count_;
};

int main()
{
  asio::io_context io;
  printer p(io);
  asio::thread t(boost::bind(&asio::io_context::run, &io));
  io.run();
  t.join();

  return 0;
}

The previous four tutorials avoided the issue of handler synchronisation by calling the io_context::run() function from one thread only. As you already know, the asio library provides a guarantee that callback handlers will only be called from threads that are currently calling io_context::run(). Consequently, calling io_context::run() from only one thread ensures that callback handlers cannot run concurrently.

We could use a pool of threads calling io_context::run() allows handlers to execute concurrently. Thus, we need a method of synchronisation when handlers might be accessing a shared, thread-unsafe resource.

The strand class template is an executor adapter that guarantees that, for those handlers that are dispatched through it, an executing handler will be allowed to complete before the next one is started. This is guaranteed irrespective of the number of threads that are calling io_context::run(). Of course, the handlers may still execute concurrently with other handlers that were not dispatched through an strand, or were dispatched through a different strand object. By binding the handlers to the same strand, we are ensuring that they cannot execute concurrently.

A synchronous TCP daytime client

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>

using asio::ip::tcp;

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: client <host>" << std::endl;
      return 1;
    }

    asio::io_context io_context;

    tcp::resolver resolver(io_context);
    tcp::resolver::results_type endpoints =
      resolver.resolve(argv[1], "daytime");

    tcp::socket socket(io_context);
    asio::connect(socket, endpoints);

    for (;;)
    {
      boost::array<char, 128> buf;
      asio::error_code error;

      size_t len = socket.read_some(asio::buffer(buf), error);

      if (error == asio::error::eof)
        break; // Connection closed cleanly by peer.
      else if (error)
        throw asio::system_error(error); // Some other error.

      std::cout.write(buf.data(), len);
    }
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
  }

  return 0;
}

A synchronous TCP daytime server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>

using asio::ip::tcp;

std::string make_daytime_string()
{
  using namespace std; // For time_t, time and ctime;
  time_t now = time(0);
  return ctime(&now);
}

int main()
{
  try
  {
    asio::io_context io_context;

    tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 13));

    for (;;)
    {
      tcp::socket socket(io_context);
      acceptor.accept(socket);

      std::string message = make_daytime_string();

      asio::error_code ignored_error;
      asio::write(socket, asio::buffer(message), ignored_error);
    }
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
  }

  return 0;
}

An asynchronous TCP daytime server

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
#include <ctime>
#include <iostream>
#include <string>
#include <boost/bind/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <asio.hpp>

using asio::ip::tcp;

std::string make_daytime_string()
{
  using namespace std; // For time_t, time and ctime;
  time_t now = time(0);
  return ctime(&now);
}

class tcp_connection
  : public boost::enable_shared_from_this<tcp_connection>
{
public:
  typedef boost::shared_ptr<tcp_connection> pointer;

  static pointer create(asio::io_context& io_context)
  {
    return pointer(new tcp_connection(io_context));
  }

  tcp::socket& socket()
  {
    return socket_;
  }

  void start()
  {
    message_ = make_daytime_string();

    asio::async_write(socket_, asio::buffer(message_),
        boost::bind(&tcp_connection::handle_write, shared_from_this(),
          asio::placeholders::error,
          asio::placeholders::bytes_transferred));
  }

private:
  tcp_connection(asio::io_context& io_context)
    : socket_(io_context)
  {
  }

  void handle_write(const asio::error_code& /*error*/,
      size_t /*bytes_transferred*/)
  {
  }

  tcp::socket socket_;
  std::string message_;
};

class tcp_server
{
public:
  tcp_server(asio::io_context& io_context)
    : io_context_(io_context),
      acceptor_(io_context, tcp::endpoint(tcp::v4(), 13))
  {
    start_accept();
  }

private:
  void start_accept()
  {
    tcp_connection::pointer new_connection =
      tcp_connection::create(io_context_);

    acceptor_.async_accept(new_connection->socket(),
        boost::bind(&tcp_server::handle_accept, this, new_connection,
          asio::placeholders::error));
  }

  void handle_accept(tcp_connection::pointer new_connection,
      const asio::error_code& error)
  {
    if (!error)
    {
      new_connection->start();
    }

    start_accept();
  }

  asio::io_context& io_context_;
  tcp::acceptor acceptor_;
};

int main()
{
  try
  {
    asio::io_context io_context;
    tcp_server server(io_context);
    io_context.run();
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
  }

  return 0;
}

同步 VS 异步

同步客户端:

1
2
3
4
5
using boost::asio;
io_service service;
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001);
ip::tcp::socket sock(service);
sock.connect(ep);

你的程序至少需要一个io_service实例。Boost.Asio使用io_service同操作系统的输入/输出服务进行交互。然后,创建你想要连接的地址和端口,再建立socket。把socket连接到你创建的地址和端口。

同步服务端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;
io_service service;
ip::tcp::endpoint ep(ip::tcp::v4(), 2001); // listen on 2001
ip::tcp::acceptor acc(service, ep);
while ( true) {
    socket_ptr sock(new ip::tcp::socket(service));
    acc.accept(*sock);
    boost::thread( boost::bind(client_session, sock));
}
void client_session(socket_ptr sock) {
    while ( true) {
        char data[512];
        size_t len = sock->read_some(buffer(data));
        if ( len > 0)
            write(*sock, buffer("ok", 2));
    }
}

同样是至少需要一个io_service实例。然后你指定你想要监听的端口,再创建一个接收器——一个用来接收客户端连接的对象。 在接下来的循环中,你创建一个虚拟的socket来等待客户端的连接。然后当一个连接被建立时,你创建一个线程来处理这个连接。

异步的客户端:

1
2
3
4
5
6
7
8
9
using boost::asio;
io_service service;
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001);
ip::tcp::socket sock(service);
sock.async_connect(ep, connect_handler);
service.run();
void connect_handler(const boost::system::error_code & ec) {
    // 如果ec返回成功我们就可以知道连接成功了
}

只要还有待处理的异步操作,service.run()循环就会一直运行。在上述例子中,只执行了一个这样的操作,就是socket的async_connect。在这之后,service.run()就退出了。

异步服务端;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
using boost::asio;
typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;
io_service service;
ip::tcp::endpoint ep( ip::tcp::v4(), 2001)); // 监听端口2001
ip::tcp::acceptor acc(service, ep);
socket_ptr sock(new ip::tcp::socket(service));
start_accept(sock);
service.run();
void start_accept(socket_ptr sock) {
    acc.async_accept(*sock, boost::bind( handle_accept, sock, _1) );
}
void handle_accept(socket_ptr sock, const boost::system::error_code &
err) {
    if ( err) return;
    // 从这里开始, 你可以从socket读取或者写入
    socket_ptr sock(new ip::tcp::socket(service));
    start_accept(sock);
}

你创建一个io_service实例,指定监听的端口。然后,你创建接收器acc——一个接受客户端连接,创建虚拟的socket,异步等待客户端连接的对象。

最后,运行异步service.run()循环。当接收到客户端连接时,handle_accept被调用(调用async_accept的完成处理程序)。如果没有错误,这个socket就可以用来做读写操作。

在使用这个socket之后,你创建了一个新的socket,然后再次调用start_accept(),用来创建另外一个“等待客户端连接”的异步操作,从而使service.run()循环一直保持忙碌状态。

不仅仅是网络通信

除了网络通信,Boost.Asio还包含了其他的I/O功能。

Boost.Asio支持信号量,比如SIGTERM(软件终止)、SIGINT(中断信号)、SIGSEGV(段错误)等等。 你可以创建一个signal_set实例,指定异步等待的信号量,然后当这些信号量产生时,就会调用你的异步处理程序:

1
2
3
4
5
6
void signal_handler(const boost::system::error_code & err, int signal)
{
    // 纪录日志,然后退出应用
}
boost::asio::signal_set sig(service, SIGINT, SIGTERM);
sig.async_wait(signal_handler);

计时器

一些I/O操作需要一个超时时间。这只能应用在异步操作上。例如,下一条信息必须在100毫秒内从你的同伴那传递给你。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
bool read = false;
void deadline_handler(const boost::system::error_code &) {
    std::cout << (read ? "read successfully" : "read failed") << std::endl;
}
void read_handler(const boost::system::error_code &) {
    read = true;
}
ip::tcp::socket sock(service);

read = false;
char data[512];
sock.async_read_some(buffer(data, 512));
deadline_timer t(service, boost::posix_time::milliseconds(100));
t.async_wait(&deadline_handler);
service.run();

io_service 类

io_service 是这个库里面最重要的类;它负责和操作系统打交道,等待所有异步操作的结束,然后为每一个异步操作调用其完成处理程序。

一个 io_service 实例和一个处理线程的单线程例子:

1
2
3
4
5
6
7
io_service service; // 所有socket操作都由service来处理
ip::tcp::socket sock1(service); // all the socket operations are handled by service
ip::tcp::socket sock2(service); sock1.asyncconnect( ep, connect_handler);
sock2.async_connect( ep, connect_handler);
deadline_timer t(service, boost::posixtime::seconds(5));
t.async_wait(timeout_handler);
service.run();

一个 io_service 实例和多个处理线程的多线程例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
io_service service;
ip::tcp::socket sock1(service);
ip::tcp::socket sock2(service);
sock1.asyncconnect( ep, connect_handler);
sock2.async_connect( ep, connect_handler);
deadline_timer t(service, boost::posixtime::seconds(5));
t.async_wait(timeout_handler);
for ( int i = 0; i < 5; ++i)
    boost::thread( run_service);
void run_service()
{
    service.run();
}

多个io_service实例和多个处理线程的多线程例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
io_service service[2];
ip::tcp::socket sock1(service[0]);
ip::tcp::socket sock2(service[1]);
sock1.asyncconnect( ep, connect_handler);
sock2.async_connect( ep, connect_handler);
deadline_timer t(service[0], boost::posixtime::seconds(5));
t.async_wait(timeout_handler);
for ( int i = 0; i < 2; ++i)
    boost::thread( boost::bind(run_service, i));
void run_service(int idx)
{
    service[idx].run();
}
  • 第一种情况是非常基础的应用程序。因为是串行的方式,所以当几个处理程序需要被同时调用时,你通常会遇到瓶颈。如果一个处理程序需要花费很长的时间来执行,所有随后的处理程序都不得不等待。
  • 第二种情况是比较适用的应用程序。他是非常强壮的——如果几个处理程序被同时调用了(这是有可能的),它们会在各自的线程里面被调用。唯一的瓶颈就是所有的处理线程都很忙的同时又有新的处理程序被调用。然而,这是有快速的解决方式的,增加处理线程的数目即可。
  • 第三种情况则可以支持更大的并发数量。

如果你想要service.run()接着执行,你需要分配更多的工作给它。这里有两个方式来完成这个目标。一种方式是在connect_handler中启动另外一个异步操作来分配更多的工作。 另一种方式会模拟一些工作给它,用下面的代码片段:

1
2
typedef boost::shared_ptr work_ptr;
work_ptr dummy_work(new io_service::work(service));

为了实现监听循环,io_service 类提供了 4 个方法,比如:run(), run_one(), poll() 和 poll_one()。 大部分情况下使用 run() 即可。

run()

如果有等待执行的操作,run()会一直执行,直到你手动调用io_service::stop()。为了保证io_service一直执行,通常你添加一个或者多个异步操作,然后在它们被执行时,你继续一直不停地添加异步操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
using namespace boost::asio;
io_service service;
ip::tcp::socket sock(service);
char buff_read[1024], buff_write[1024] = "ok";
void on_read(const boost::system::error_code &err, std::size_t bytes);
void on_write(const boost::system::error_code &err, std::size_t bytes)
{
    sock.async_read_some(buffer(buff_read), on_read);
}
void on_read(const boost::system::error_code &err, std::size_t bytes)
{
    // ... 处理读取操作 ...
    sock.async_write_some(buffer(buff_write,3), on_write);
}
void on_connect(const boost::system::error_code &err) {
    sock.async_read_some(buffer(buff_read), on_read);
}
int main(int argc, char* argv[]) {
    ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001);
    sock.async_connect(ep, on_connect);
    service.run();
}

异步工作

异步工作不仅仅指用异步地方式接受客户端到服务端的连接、异步地从一个socket读取或者写入到socket。它包含了所有可以异步执行的操作。

默认情况下,你是不知道每个异步handler的调用顺序的。除了通常的异步调用(来自异步socket的读取/写入/接收)。你可以使用service.post()来使你的自定义方法被异步地调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio;
io_service service;
void func(int i) {
    std::cout << "func called, i= " << i << std::endl;
}

void worker_thread() {
    service.run();
}

int main(int argc, char* argv[]) {
    for ( int i = 0; i < 10; ++i)
        service.post(boost::bind(func, i));
    boost::thread_group threads;
    for ( int i = 0; i < 3; ++i)
        threads.create_thread(worker_thread);
    // 等待所有线程被创建完
    boost::this_thread::sleep( boost::posix_time::millisec(500));
    threads.join_all();
}

service.post(some_function)添加了一个异步方法调用。

这个方法在某一个调用了service.run()的线程中请求io_service实例,然后调用给定的some_funtion之后立即返回。

有时候你会想让一些异步处理方法顺序执行。比如,你去一个餐馆(go_to_restaurant),下单(order),然后吃(eat)。你需要先去餐馆,然后下单,最后吃。这样的话,你需要用到 io_service::strand这个方法会让你的异步方法被顺序调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
using namespace boost::asio;
io_service service;
void func(int i) {
    std::cout << "func called, i= " << i << "/" << boost::this_thread::get_id() << std::endl;
}
void worker_thread() {
    service.run();
}
int main(int argc, char* argv[])
{
    io_service::strand strand_one(service), strand_two(service);
    for ( int i = 0; i < 5; ++i)
        service.post( strand_one.wrap( boost::bind(func, i)));
    for ( int i = 5; i < 10; ++i)
        service.post( strand_two.wrap( boost::bind(func, i)));
    boost::thread_group threads;
    for ( int i = 0; i < 3; ++i)
        threads.create_thread(worker_thread);
    // 等待所有线程被创建完
    boost::this_thread::sleep( boost::posix_time::millisec(500));
    threads.join_all();
}

在上述代码中,我们保证前面的5个线程和后面的5个线程是顺序执行的。func called, i = 0在func called, i = 1之前被调用,然后调用func called, i = 2……同样func called, i = 5在func called, i = 6之前,func called, i = 6在func called, i = 7被调用……你需要注意的是尽管方法是顺序调用的,但是不意味着它们都在同一个线程执行。

Boost.Asio 提供了三种方法用于异步调用你提供的函数 handler:

  • service.post(handler) : 该函数会立刻返回,然后在调用了 service.run() 的线程中执行该 handler. 其会将任务放进队列排队,任务会在某个时机被完成。
  • service.dispatch(handler) : 如果调用 dispatch 的线程有调用 io_service.run(),则自身线程会即时请求 io_service 去执行该任务. 否则放置队列进行排队执行。
  • service.wrap(handler) : 该方法将 handler 包装成了一个函数(boost::function<void()> func),当其被调用时,它会调用 io_service.dispatch(handler).

给 post 添加回调函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <functional>

typedef std::function<void()> handler_t;
void foo(handler_t handler)
{
    std::cout << "Hello asio!" << std::endl;

    handler();
}

void handler_fun()
{
    std::cout << "mission complete!" << std::endl;
}

int main(int argc, char* argv[])
{
    boost::asio::io_service ios;
    ios.post(boost::bind(foo, handler_fun));

    std::cout << "after post" << std::endl;

    ios.run();
    return 0;
}

在新版的 boost.asio 废弃了 io_service 使用 io_context 替代。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <functional>
#include <thread>
#include <chrono>

typedef std::function<void()> handler_t;
void foo(handler_t handler)
{
  std::cout << "Hello asio!" << std::endl;

  handler();
}

void handler_fun()
{
  std::cout << "mission complete!" << std::endl;
}

int main(int argc, char* argv[])
{
  boost::asio::io_context ios;
  auto work = boost::asio::require(ios.get_executor(), boost::asio::execution::outstanding_work.tracked);

  std::thread thread0([&ios]() {ios.run();});

  std::this_thread::sleep_for(std::chrono::seconds(1));

  ios.post(boost::bind(foo, handler_fun));

  std::cout << "after post" << std::endl;


  std::cout << "run is done" << std::endl;

  ios.post(boost::bind(foo, handler_fun));

  std::this_thread::sleep_for(std::chrono::seconds(1));
  ios.stop();
  thread0.join();
  return 0;
}

上示例代码,可以时的 io_context.run 处于一直在监听,直至调用了 stop().

编译

编译链接 boost 库

1
g++ main.cpp -lboost_system -lboost_thread -pthread

Example

keep io.run alive

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    io_service->run();

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

size_t fib( size_t n )
{
    if ( n <= 1 )
    {
        return n;
    }
    boost::this_thread::sleep(
        boost::posix_time::milliseconds( 1000 )
        );
    return fib( n - 1 ) + fib( n - 2);
}

void CalculateFib( size_t n )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] Now calculating fib( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] fib( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();
}

int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
        );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
        );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
        << "] The program will exit when all work has finished."
        << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread(
            boost::bind( &WorkerThread, io_service )
            );
    }

    io_service->post( boost::bind( CalculateFib, 3 ) );
    io_service->post( boost::bind( CalculateFib, 4 ) );
    io_service->post( boost::bind( CalculateFib, 5 ) );

    work.reset();

    worker_threads.join_all();

    return 0;
}

blocked

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <unistd.h>

std::mutex print_lock;
std::condition_variable cv;

void WorkerThread(boost::shared_ptr<boost::asio::io_service> io) {
  io->run();
}

void job(int n) {
  if (n == 0) {
    std::mutex m;
    std::unique_lock<std::mutex> lk(m);
    std::cout << "job 0 start wait signal from job 1 " << std::endl;
    cv.wait(lk);
    std::unique_lock<std::mutex> pk(print_lock);
    std::cout << "Job 0 complete" << std::endl;
  }
  if (n == 1) {
    sleep(3);
    cv.notify_one();
  }
}

int main() {
  boost::shared_ptr<boost::asio::io_service> io_service(new boost::asio::io_service);
  boost::shared_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(*io_service));
  boost::thread_group threads;
  threads.create_thread(boost::bind(&WorkerThread, io_service));
  io_service->post(boost::bind(job, 0));
  io_service->post(boost::bind(job, 1));
  work.reset();
  threads.join_all();
  return 0;
}

io.run() 执行一个 post 的任务,如果被阻塞后并不会执行其它的任务。

New version Boost::Asio

  • io_context::work 弃用,采用 executor_work_guard 用于告知 io_context 还有工作需要做。

Reference

  1. Boost.Asio C++ 网络编成
  2. The boost library
  3. boost asio blog
  4. boost asio official documents
  5. asio io_context
  6. asio tutorial
  7. asio reference
updatedupdated2022-09-212022-09-21