その前に前回までのソースコードを少し変更しています。簡単に関数オブジェクトを実装できるboost::format<T>ですが、前回まではSessionのメンバ変数の値として持っていましたが、ServerModuleが先に解放された場合、クリアできなくなるので参照として持つようにしました。その代わりServerModuleのメンバ変数として存在するようにしました。値型と参照やスコープなどを考えないといけないところが私にとってC++が難しく感じるところかもしれません。
boost::format<T>などの使い方も勉強していきたいと思います。Lamdaもあって便利だと思いますので。
今回、boost::thread_groupを使いたいと思います。Threadをグループとして扱うことができます。boost::io_service::ioはハンドラーを効率よくスレッドに振り分けるようで、何も考えずにスレッドの中でio.run()を実行します。スレッドを開始するクラスをtemplateを使って少し汎用的にして別ファイルにしました。
前回少し説明したio_service::workクラスを使っています。今回、マルチスレッドの各スレッドでio.run()を実行してもキューが空だと終了してしまうからです。io_service::workを代入して空ではない状態にします。逆に終了するときはio_service::workを破棄します。
以下、ソースコードです。
ModuleThread.hpp
#include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/thread.hpp> #include <boost/shared_ptr.hpp> namespace thorny_road{ // T is necessary 'stop', 'constructor(io,short port)' template<typename T> class ModuleThread : private boost::noncopyable { private: boost::thread_group tg ; boost::shared_ptr<T> _module ; // Target Module boost::shared_ptr<boost::asio::io_service> io_ptr ; // for creating service io boost::asio::io_service& io ; // service IO boost::shared_ptr<boost::asio::io_service::work> work; short port ; public: ModuleThread(short Port, int NoThreads) : port(Port), io_ptr(new boost::asio::io_service()), io(*io_ptr), work(new boost::asio::io_service::work(io)) // add waiting work in io loop { io.reset() ; // Create some threads. and then run io. for (int i = 0 ; i < NoThreads; i++) tg.create_thread( boost::bind(&boost::asio::io_service::run, &io) ) ; // Create module _module.reset(new T(io,Port)) ; } // Destructor. reset IO work and waiting by stopping threads ~ModuleThread() { stop() ; } void stop() { work.reset() ; // destroy work object which is the waiting object. tg.join_all() ; // waiting by stopping threads. _module.reset() ; } T& getModule() { return *_module ; } T& getModule() const { return static_cast<const T&>(getModule()); } } ; } // thorny_road #endif //__ASIO_MODULE_THREAD__
ServerModule.hpp
#pragma once #ifndef __ASIO_SERVER_MODULE__ #define __ASIO_SERVER_MODULE__ #include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/function.hpp> #include <boost/shared_ptr.hpp> namespace thorny_road{ using namespace boost::asio ; // For calling event function handler easily. typedef boost::function<void ()> SimpleProcedure ; class ServerSession : private boost::noncopyable { private: io_service& io ; ip::tcp::socket socket ; boost::asio::streambuf buf ; SimpleProcedure& acc_stop ; public: ServerSession(io_service& io,SimpleProcedure& acc_stop) : io(io),socket(io),acc_stop(acc_stop) {} ~ServerSession(){} void start() { // Enqueue Read Handler async_read_until( socket, buf, '\n', boost::bind(&ServerSession::read_ok,this,_1) ) ; } private: void read_ok(const boost::system::error_code e) { if (!e) { std::iostream ios(&buf) ; std::string tmp ; ios >> tmp ; // get input stream std::cout << tmp ; if (tmp == "end") { delete this; return; } else if (tmp == "bye") { acc_stop() ; delete this; return; } ios << tmp <<std::endl ; // retrun as it is // Enqueue Write Handler async_write( socket, buf, boost::bind(&ServerSession::write_ok,this,_1) ) ; } else if (e == boost::asio::error::operation_aborted) { // abort std::cout << "connection abort" ; delete this; return ; } else { std::cout << "error" ; delete this; return ; } } void write_ok(const boost::system::error_code e) { if (!e) { start() ; // Restart } else if (e == boost::asio::error::operation_aborted) { // abort std::cout << "connection abort" ; delete this; return ; } else { std::cout << "error" ; delete this; return ; } } public: ip::tcp::socket& getSocket() { return (socket) ; } } ; class ServerModule : private boost::noncopyable { private: io_service& io ; ip::tcp::acceptor accept ; ServerSession* session ; SimpleProcedure stopEvent ; public: ServerModule(io_service& io, const short port) :io(io),accept(io,ip::tcp::endpoint(ip::tcp::v4(),port)) { // Initialize Function Object stopEvent = boost::bind(&ServerModule::stop,this) ; start_accept() ; } ~ServerModule() {} void start_accept() { session = new ServerSession(io,stopEvent) ; // Enqueue Accept Handler accept.async_accept( session->getSocket(), boost::bind(&ServerModule::accept_ok,this,_1) ) ; } // Accept service will be closed. Then IOServce throw operation_aborted. // After queue get be empty, the io.run() will stop. void stop() { accept.close() ; } private: void accept_ok(const boost::system::error_code e) { if (!e) { session->start() ; start_accept() ; } else if (e == boost::asio::error::operation_aborted) { // abort std::cout << "connection abort" ; delete session ; session = NULL ; return ; } else { // error std::cout << "error" ; delete session ; session = NULL ; return ; } } } ; } // namespace thorny_road #endif //__ASIO_SERVER_MODULE__Main
#include "stdafx.h" #include "ServerModule.hpp" #include "ModuleThread.hpp" // for threading using namespace thorny_road ; int _tmain(int argc, _TCHAR* argv[]) { ModuleThread<ServerModule> module(2085,2) ; // create module. return 0; }ModuleThreadの引数でポート番号とスレッドの数を指定します。"bye"と入力があると、ハンドラーが破棄され、アプリケーションが終了します。
次回は送信側を勉強してみます。
ソースコードは自由にご使用ください。ただし問題が起きても責任はとれません。また、ソースコードに対する著作権は放棄していません。
0 件のコメント:
コメントを投稿