2011年11月14日月曜日

BOOST::ASIOを使ってみる(5)

非同期処理であるASIOをマルチスレッド上で動かしてみます。
その前に前回までのソースコードを少し変更しています。簡単に関数オブジェクトを実装できるboost::format<T>ですが、前回まではSessionのメンバ変数の値として持っていましたが、ServerModuleが先に解放された場合、クリアできなくなるので参照として持つようにしました。その代わりServerModuleのメンバ変数として存在するようにしました。値型と参照やスコープなどを考えないといけないところが私にとってC++が難しく感じるところかもしれません。
boost::format<T>などの使い方も勉強していきたいと思います。Lamdaもあって便利だと思いますので。

今回、boost::thread_groupを使いたいと思います。Threadをグループとして扱うことができます。boost::io_service::ioはハンドラーを効率よくスレッドに振り分けるようで、何も考えずにスレッドの中でio.run()を実行します。スレッドを開始するクラスをtemplateを使って少し汎用的にして別ファイルにしました。

前回少し説明したio_service::workクラスを使っています。今回、マルチスレッドの各スレッドでio.run()を実行してもキューが空だと終了してしまうからです。io_service::workを代入して空ではない状態にします。逆に終了するときはio_service::workを破棄します。

以下、ソースコードです。

ModuleThread.hpp
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>

namespace thorny_road{

// T is necessary 'stop', 'constructor(io,short port)'
template<typename T>
class ModuleThread : private boost::noncopyable
{
private:
    boost::thread_group tg ;
    boost::shared_ptr<T> _module ; // Target Module
    boost::shared_ptr<boost::asio::io_service> io_ptr ; // for creating service io
    boost::asio::io_service& io ; // service IO
    boost::shared_ptr<boost::asio::io_service::work> work;
    short port ;
public:
    ModuleThread(short Port, int NoThreads)
        : port(Port),
        io_ptr(new boost::asio::io_service()),
        io(*io_ptr),
        work(new boost::asio::io_service::work(io)) // add waiting work in io loop
    {
        io.reset() ;
        // Create some threads. and then run io.
        for (int i = 0 ; i < NoThreads; i++)
            tg.create_thread( boost::bind(&boost::asio::io_service::run, &io) ) ;

        // Create module
        _module.reset(new T(io,Port)) ;
    }

    // Destructor. reset IO work and waiting by stopping threads
    ~ModuleThread()
    {
        stop() ;
    }

    void stop()
    {
        work.reset() ; // destroy work object which is the waiting object.
        tg.join_all() ; // waiting by stopping threads.

        _module.reset() ;
    }

    T& getModule() { return *_module ; }
    T& getModule() const { return static_cast<const T&>(getModule()); }
} ;

} // thorny_road

#endif  //__ASIO_MODULE_THREAD__

ServerModule.hpp

#pragma once

#ifndef __ASIO_SERVER_MODULE__
#define __ASIO_SERVER_MODULE__

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>

namespace thorny_road{

using namespace boost::asio ;

// For calling event function handler easily.
typedef boost::function<void ()> SimpleProcedure ;

class ServerSession : private boost::noncopyable
{
private:
    io_service& io ;
    ip::tcp::socket socket ;
    boost::asio::streambuf buf ;
    SimpleProcedure& acc_stop ;
public:
    ServerSession(io_service& io,SimpleProcedure& acc_stop)
        : io(io),socket(io),acc_stop(acc_stop)    {}

    ~ServerSession(){}

    void start()
    {
        // Enqueue Read Handler
        async_read_until(
            socket,
            buf,
            '\n',
            boost::bind(&ServerSession::read_ok,this,_1) ) ;
    }

private:
    void read_ok(const boost::system::error_code e)
    {
        if (!e)
        {
            std::iostream ios(&buf) ;

            std::string tmp ;
            ios >> tmp ; // get input stream
            std::cout << tmp ;
            if (tmp == "end") { delete this; return; }
            else if (tmp == "bye")
            {
                acc_stop() ;
                delete this;
                return;
            }

            ios << tmp <<std::endl ; // retrun as it is

            // Enqueue Write Handler
            async_write(
                socket,
                buf,
                boost::bind(&ServerSession::write_ok,this,_1) ) ;
        }
        else if (e == boost::asio::error::operation_aborted)
        {
            // abort
            std::cout << "connection abort" ;
            delete this;
            return ;
        }
        else
        {
            std::cout << "error" ;
            delete this;
            return ;
        }
    }

    void write_ok(const boost::system::error_code e)
    {
        if (!e)
        {
            start() ; // Restart
        }
        else if (e == boost::asio::error::operation_aborted)
        {
            // abort
            std::cout << "connection abort" ;
            delete this;
            return ;
        }
        else
        {
            std::cout << "error" ;
            delete this;
            return ;
        }
    }

public:
    ip::tcp::socket& getSocket()
    {
        return (socket) ;
    }

} ;

class ServerModule : private boost::noncopyable
{
private:
    io_service& io ;   
    ip::tcp::acceptor accept ;
    ServerSession* session ;
    SimpleProcedure stopEvent ;
public:
    ServerModule(io_service& io, const short port)
        :io(io),accept(io,ip::tcp::endpoint(ip::tcp::v4(),port))
    {
        // Initialize Function Object
        stopEvent = boost::bind(&ServerModule::stop,this) ;
        start_accept() ;
    }

    ~ServerModule() {}

    void start_accept()
    {
        session = new ServerSession(io,stopEvent) ;
        // Enqueue Accept Handler
        accept.async_accept(
            session->getSocket(),
            boost::bind(&ServerModule::accept_ok,this,_1) ) ;
    }

    // Accept service will be closed. Then IOServce throw operation_aborted.
    // After queue get be empty, the io.run() will stop.
    void stop()
    {
        accept.close() ;
    }
private:

    void accept_ok(const boost::system::error_code e)
    {
        if (!e)
        {
            session->start() ;
            start_accept() ;
        }
        else if (e == boost::asio::error::operation_aborted)
        {
            // abort
            std::cout << "connection abort" ;
            delete session ;
            session = NULL ;
            return ;
        }
        else
        {
            // error
            std::cout << "error" ;
            delete session ;
            session = NULL ;
            return ;
        }
    }

} ;

} // namespace thorny_road
#endif //__ASIO_SERVER_MODULE__
Main

#include "stdafx.h"
#include "ServerModule.hpp"
#include "ModuleThread.hpp" // for threading

using namespace thorny_road ;

int _tmain(int argc, _TCHAR* argv[])
{
    ModuleThread<ServerModule> module(2085,2) ; // create module.

    return 0;
}
ModuleThreadの引数でポート番号とスレッドの数を指定します。"bye"と入力があると、ハンドラーが破棄され、アプリケーションが終了します。

次回は送信側を勉強してみます。

ソースコードは自由にご使用ください。ただし問題が起きても責任はとれません。また、ソースコードに対する著作権は放棄していません。

0 件のコメント:

コメントを投稿