We must call the io_context::run() member function on the io_context object.
The asio library provides a guarantee that callback handlers will only be called from threads that are currently calling io_context::run(). Therefore, unless the io_context::run() function is called, the callback for the asynchronous wait completion will never be invoked.
The io_context::run() function will also continue to run while there is still "work" to do. In this example, the work is the asynchronous wait on the timer, so the call will not return until the timer has expired and the callback has completed.
It is important to remember to give the io_context some work to do before calling io_context::run(). For example, if we had omitted the above call to steady_timer::async_wait(), the io_context would not have had any work to do, and consequently io_context::run() would have returned immediately.
#include<iostream>#include<asio.hpp>#include<boost/bind/bind.hpp>voidprint(constasio::error_code&/*e*/,asio::steady_timer*t,int*count){if(*count<5){std::cout<<*count<<std::endl;++(*count);t->expires_at(t->expiry()+asio::chrono::seconds(1));t->async_wait(boost::bind(print,asio::placeholders::error,t,count));}}intmain(){asio::io_contextio;intcount=0;asio::steady_timert(io,asio::chrono::seconds(1));t.async_wait(boost::bind(print,asio::placeholders::error,&t,&count));io.run();std::cout<<"Final count is "<<count<<std::endl;return0;}
To implement a repeating timer using asio you need to change the timer's expiry time in your callback function, and to then start a new asynchronous wait.
#include<iostream>#include<asio.hpp>#include<boost/bind/bind.hpp>classprinter{public:printer(asio::io_context&io):timer_(io,asio::chrono::seconds(1)),count_(0){timer_.async_wait(boost::bind(&printer::print,this));}~printer(){std::cout<<"Final count is "<<count_<<std::endl;}voidprint(){if(count_<5){std::cout<<count_<<std::endl;++count_;timer_.expires_at(timer_.expiry()+asio::chrono::seconds(1));timer_.async_wait(boost::bind(&printer::print,this));}}private:asio::steady_timertimer_;intcount_;};intmain(){asio::io_contextio;printerp(io);io.run();return0;}
#include<iostream>#include<asio.hpp>#include<boost/bind/bind.hpp>classprinter{public:printer(asio::io_context&io):strand_(asio::make_strand(io)),timer1_(io,asio::chrono::seconds(1)),timer2_(io,asio::chrono::seconds(1)),count_(0){timer1_.async_wait(asio::bind_executor(strand_,boost::bind(&printer::print1,this)));timer2_.async_wait(asio::bind_executor(strand_,boost::bind(&printer::print2,this)));}~printer(){std::cout<<"Final count is "<<count_<<std::endl;}voidprint1(){if(count_<10){std::cout<<"Timer 1: "<<count_<<std::endl;++count_;timer1_.expires_at(timer1_.expiry()+asio::chrono::seconds(1));timer1_.async_wait(asio::bind_executor(strand_,boost::bind(&printer::print1,this)));}}voidprint2(){if(count_<10){std::cout<<"Timer 2: "<<count_<<std::endl;++count_;timer2_.expires_at(timer2_.expiry()+asio::chrono::seconds(1));timer2_.async_wait(asio::bind_executor(strand_,boost::bind(&printer::print2,this)));}}private:asio::strand<asio::io_context::executor_type>strand_;asio::steady_timertimer1_;asio::steady_timertimer2_;intcount_;};intmain(){asio::io_contextio;printerp(io);asio::threadt(boost::bind(&asio::io_context::run,&io));io.run();t.join();return0;}
The previous four tutorials avoided the issue of handler synchronisation by calling the io_context::run() function from one thread only. As you already know, the asio library provides a guarantee that callback handlers will only be called from threads that are currently calling io_context::run(). Consequently, calling io_context::run() from only one thread ensures that callback handlers cannot run concurrently.
Alternatively, we could use a pool of threads calling io_context::run(). However, this allows handlers to execute concurrently. Thus, we need a method of synchronisation when handlers might be accessing a shared, thread-unsafe resource.
The strand class template is an executor adapter that guarantees that, for those handlers that are dispatched through it, an executing handler will be allowed to complete before the next one is started. This is guaranteed irrespective of the number of threads that are calling io_context::run(). Of course, the handlers may still execute concurrently with other handlers that were not dispatched through a strand, or were dispatched through a different strand object. By binding the handlers to the same strand, we are ensuring that they cannot execute concurrently.
#include<iostream>#include<boost/array.hpp>#include<asio.hpp>usingasio::ip::tcp;intmain(intargc,char*argv[]){try{if(argc!=2){std::cerr<<"Usage: client <host>"<<std::endl;return1;}asio::io_contextio_context;tcp::resolverresolver(io_context);tcp::resolver::results_typeendpoints=resolver.resolve(argv[1],"daytime");tcp::socketsocket(io_context);asio::connect(socket,endpoints);for(;;){boost::array<char,128>buf;asio::error_codeerror;size_tlen=socket.read_some(asio::buffer(buf),error);if(error==asio::error::eof)break;// Connection closed cleanly by peer.
elseif(error)throwasio::system_error(error);// Some other error.
std::cout.write(buf.data(),len);}}catch(std::exception&e){std::cerr<<e.what()<<std::endl;}return0;}
#include<ctime>#include<iostream>#include<string>#include<asio.hpp>usingasio::ip::tcp;std::stringmake_daytime_string(){usingnamespacestd;// For time_t, time and ctime;
time_tnow=time(0);returnctime(&now);}intmain(){try{asio::io_contextio_context;tcp::acceptoracceptor(io_context,tcp::endpoint(tcp::v4(),13));for(;;){tcp::socketsocket(io_context);acceptor.accept(socket);std::stringmessage=make_daytime_string();asio::error_codeignored_error;asio::write(socket,asio::buffer(message),ignored_error);}}catch(std::exception&e){std::cerr<<e.what()<<std::endl;}return0;}
#include<ctime>#include<iostream>#include<string>#include<boost/bind/bind.hpp>#include<boost/shared_ptr.hpp>#include<boost/enable_shared_from_this.hpp>#include<asio.hpp>usingasio::ip::tcp;std::stringmake_daytime_string(){usingnamespacestd;// For time_t, time and ctime;
time_tnow=time(0);returnctime(&now);}classtcp_connection:publicboost::enable_shared_from_this<tcp_connection>{public:typedefboost::shared_ptr<tcp_connection>pointer;staticpointercreate(asio::io_context&io_context){returnpointer(newtcp_connection(io_context));}tcp::socket&socket(){returnsocket_;}voidstart(){message_=make_daytime_string();asio::async_write(socket_,asio::buffer(message_),boost::bind(&tcp_connection::handle_write,shared_from_this(),asio::placeholders::error,asio::placeholders::bytes_transferred));}private:tcp_connection(asio::io_context&io_context):socket_(io_context){}voidhandle_write(constasio::error_code&/*error*/,size_t/*bytes_transferred*/){}tcp::socketsocket_;std::stringmessage_;};classtcp_server{public:tcp_server(asio::io_context&io_context):io_context_(io_context),acceptor_(io_context,tcp::endpoint(tcp::v4(),13)){start_accept();}private:voidstart_accept(){tcp_connection::pointernew_connection=tcp_connection::create(io_context_);acceptor_.async_accept(new_connection->socket(),boost::bind(&tcp_server::handle_accept,this,new_connection,asio::placeholders::error));}voidhandle_accept(tcp_connection::pointernew_connection,constasio::error_code&error){if(!error){new_connection->start();}start_accept();}asio::io_context&io_context_;tcp::acceptoracceptor_;};intmain(){try{asio::io_contextio_context;tcp_serverserver(io_context);io_context.run();}catch(std::exception&e){std::cerr<<e.what()<<std::endl;}return0;}
typedefboost::shared_ptr<ip::tcp::socket>socket_ptr;io_serviceservice;ip::tcp::endpointep(ip::tcp::v4(),2001);// listen on 2001
ip::tcp::acceptoracc(service,ep);while(true){socket_ptrsock(newip::tcp::socket(service));acc.accept(*sock);boost::thread(boost::bind(client_session,sock));}voidclient_session(socket_ptrsock){while(true){chardata[512];size_tlen=sock->read_some(buffer(data));if(len>0)write(*sock,buffer("ok",2));}}
io_serviceservice;// 所有socket操作都由service来处理
ip::tcp::socketsock1(service);// all the socket operations are handled by service
ip::tcp::socketsock2(service);sock1.asyncconnect(ep,connect_handler);sock2.async_connect(ep,connect_handler);deadline_timert(service,boost::posixtime::seconds(5));t.async_wait(timeout_handler);service.run();
在上述代码中,我们保证前面的5个线程和后面的5个线程是顺序执行的。func called, i = 0在func called, i = 1之前被调用,然后调用func called, i = 2……同样func called, i = 5在func called, i = 6之前,func called, i = 6在func called, i = 7之前被调用……你需要注意的是尽管方法是顺序调用的,但是不意味着它们都在同一个线程执行。
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

// Serialises writes to std::cout across the worker threads and main().
boost::mutex global_stream_lock;

// Thread entry point: announces itself, runs the io_service until run()
// returns, then announces completion.
//
// Output is guarded with scoped locks so the mutex is released even if
// streaming throws; the lock is never held while run() executes.
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service)
{
  {
    boost::mutex::scoped_lock lock(global_stream_lock);
    std::cout << "[" << boost::this_thread::get_id()
              << "] Thread Start" << std::endl;
  }

  io_service->run();

  {
    boost::mutex::scoped_lock lock(global_stream_lock);
    std::cout << "[" << boost::this_thread::get_id()
              << "] Thread Finish" << std::endl;
  }
}

// Naive recursive Fibonacci that sleeps one second per non-base call,
// making each posted task visibly long-running.
size_t fib(size_t n)
{
  if (n <= 1)
  {
    return n;
  }
  boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
  return fib(n - 1) + fib(n - 2);
}

// Task body posted to the io_service: logs the start, computes fib(n),
// logs the result. The mutex is held only while printing, not while
// computing.
void CalculateFib(size_t n)
{
  {
    boost::mutex::scoped_lock lock(global_stream_lock);
    std::cout << "[" << boost::this_thread::get_id()
              << "] Now calculating fib( " << n << " ) " << std::endl;
  }

  size_t f = fib(n);

  {
    boost::mutex::scoped_lock lock(global_stream_lock);
    std::cout << "[" << boost::this_thread::get_id()
              << "] fib( " << n << " ) = " << f << std::endl;
  }
}

// Starts two worker threads on a shared io_service, posts three Fibonacci
// tasks, then releases the work object and waits for the workers to exit.
int main(int /*argc*/, char* /*argv*/[])
{
  boost::shared_ptr<boost::asio::io_service> io_service(
      new boost::asio::io_service);
  // Held until the tasks are posted; released via work.reset() below.
  boost::shared_ptr<boost::asio::io_service::work> work(
      new boost::asio::io_service::work(*io_service));

  {
    boost::mutex::scoped_lock lock(global_stream_lock);
    std::cout << "[" << boost::this_thread::get_id()
              << "] The program will exit when all work has finished."
              << std::endl;
  }

  boost::thread_group worker_threads;
  for (int x = 0; x < 2; ++x)
  {
    worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
  }

  io_service->post(boost::bind(CalculateFib, 3));
  io_service->post(boost::bind(CalculateFib, 4));
  io_service->post(boost::bind(CalculateFib, 5));

  work.reset();
  worker_threads.join_all();

  return 0;
}