非同期処理であるASIOをマルチスレッド上で動かしてみます。
その前に前回までのソースコードを少し変更しています。簡単に関数オブジェクトを実装できる
boost::format<T>ですが、前回までは
Sessionのメンバ変数の値として持っていましたが、
ServerModuleが先に解放された場合、クリアできなくなるので参照として持つようにしました。その代わり
ServerModuleのメンバ変数として存在するようにしました。値型と参照やスコープなどを考えないといけないところが私にとってC++が難しく感じるところかもしれません。
boost::format<T>などの使い方も勉強していきたいと思います。
Lamdaもあって便利だと思いますので。
今回、
boost::thread_groupを使いたいと思います。
Threadをグループとして扱うことができます。
boost::io_service::ioはハンドラーを効率よくスレッドに振り分けるようで、何も考えずにスレッドの中で
io.run()を実行します。スレッドを開始するクラスをtemplateを使って少し汎用的にして別ファイルにしました。
前回少し説明した
io_service::workクラスを使っています。今回、マルチスレッドの各スレッドで
io.run()を実行してもキューが空だと終了してしまうからです。
io_service::workを代入して空ではない状態にします。逆に終了するときは
io_service::workを破棄します。
以下、ソースコードです。
ModuleThread.hpp
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
namespace thorny_road{
// T is necessary 'stop', 'constructor(io,short port)'
template<typename T>
class ModuleThread : private boost::noncopyable
{
private:
boost::thread_group tg ;
boost::shared_ptr<T> _module ; // Target Module
boost::shared_ptr<boost::asio::io_service> io_ptr ; // for creating service io
boost::asio::io_service& io ; // service IO
boost::shared_ptr<boost::asio::io_service::work> work;
short port ;
public:
ModuleThread(short Port, int NoThreads)
: port(Port),
io_ptr(new boost::asio::io_service()),
io(*io_ptr),
work(new boost::asio::io_service::work(io)) // add waiting work in io loop
{
io.reset() ;
// Create some threads. and then run io.
for (int i = 0 ; i < NoThreads; i++)
tg.create_thread( boost::bind(&boost::asio::io_service::run, &io) ) ;
// Create module
_module.reset(new T(io,Port)) ;
}
// Destructor. reset IO work and waiting by stopping threads
~ModuleThread()
{
stop() ;
}
void stop()
{
work.reset() ; // destroy work object which is the waiting object.
tg.join_all() ; // waiting by stopping threads.
_module.reset() ;
}
T& getModule() { return *_module ; }
T& getModule() const { return static_cast<const T&>(getModule()); }
} ;
} // thorny_road
#endif //__ASIO_MODULE_THREAD__
ServerModule.hpp
#pragma once
#ifndef __ASIO_SERVER_MODULE__
#define __ASIO_SERVER_MODULE__
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
namespace thorny_road{
using namespace boost::asio ;
// For calling event function handler easily.
typedef boost::function<void ()> SimpleProcedure ;
class ServerSession : private boost::noncopyable
{
private:
io_service& io ;
ip::tcp::socket socket ;
boost::asio::streambuf buf ;
SimpleProcedure& acc_stop ;
public:
ServerSession(io_service& io,SimpleProcedure& acc_stop)
: io(io),socket(io),acc_stop(acc_stop) {}
~ServerSession(){}
void start()
{
// Enqueue Read Handler
async_read_until(
socket,
buf,
'\n',
boost::bind(&ServerSession::read_ok,this,_1) ) ;
}
private:
void read_ok(const boost::system::error_code e)
{
if (!e)
{
std::iostream ios(&buf) ;
std::string tmp ;
ios >> tmp ; // get input stream
std::cout << tmp ;
if (tmp == "end") { delete this; return; }
else if (tmp == "bye")
{
acc_stop() ;
delete this;
return;
}
ios << tmp <<std::endl ; // retrun as it is
// Enqueue Write Handler
async_write(
socket,
buf,
boost::bind(&ServerSession::write_ok,this,_1) ) ;
}
else if (e == boost::asio::error::operation_aborted)
{
// abort
std::cout << "connection abort" ;
delete this;
return ;
}
else
{
std::cout << "error" ;
delete this;
return ;
}
}
void write_ok(const boost::system::error_code e)
{
if (!e)
{
start() ; // Restart
}
else if (e == boost::asio::error::operation_aborted)
{
// abort
std::cout << "connection abort" ;
delete this;
return ;
}
else
{
std::cout << "error" ;
delete this;
return ;
}
}
public:
ip::tcp::socket& getSocket()
{
return (socket) ;
}
} ;
class ServerModule : private boost::noncopyable
{
private:
io_service& io ;
ip::tcp::acceptor accept ;
ServerSession* session ;
SimpleProcedure stopEvent ;
public:
ServerModule(io_service& io, const short port)
:io(io),accept(io,ip::tcp::endpoint(ip::tcp::v4(),port))
{
// Initialize Function Object
stopEvent = boost::bind(&ServerModule::stop,this) ;
start_accept() ;
}
~ServerModule() {}
void start_accept()
{
session = new ServerSession(io,stopEvent) ;
// Enqueue Accept Handler
accept.async_accept(
session->getSocket(),
boost::bind(&ServerModule::accept_ok,this,_1) ) ;
}
// Accept service will be closed. Then IOServce throw operation_aborted.
// After queue get be empty, the io.run() will stop.
void stop()
{
accept.close() ;
}
private:
void accept_ok(const boost::system::error_code e)
{
if (!e)
{
session->start() ;
start_accept() ;
}
else if (e == boost::asio::error::operation_aborted)
{
// abort
std::cout << "connection abort" ;
delete session ;
session = NULL ;
return ;
}
else
{
// error
std::cout << "error" ;
delete session ;
session = NULL ;
return ;
}
}
} ;
} // namespace thorny_road
#endif //__ASIO_SERVER_MODULE__
Main
#include "stdafx.h"
#include "ServerModule.hpp"
#include "ModuleThread.hpp" // for threading
using namespace thorny_road ;
int _tmain(int argc, _TCHAR* argv[])
{
ModuleThread<ServerModule> module(2085,2) ; // create module.
return 0;
}
ModuleThreadの引数でポート番号とスレッドの数を指定します。"bye"と入力があると、ハンドラーが破棄され、アプリケーションが終了します。
次回は送信側を勉強してみます。
ソースコードは自由にご使用ください。ただし問題が起きても責任はとれません。また、ソースコードに対する著作権は放棄していません。