2011年11月11日金曜日

BOOST::ASIOを使ってみる(6)

今回は非同期送信処理を作ってみます。基本的な構造は受信側と同じでModuleThreadクラスを使ってマルチスレッド化しています。送信処理の流れは名前解決→接続(コネクション)→データの送信→返信データの受信となります。

以下送信側のソースコードです。

#pragma once

#ifndef __ASIO_SENDER_MODULE__
#define __ASIO_SENDER_MODULE__

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/lexical_cast.hpp>

namespace thorny_road{

using namespace boost::asio ;

// Simple Message data socket
class SimpleMsg
{
private :
    std::string msg ;
    boost::asio::streambuf buf ;
public:
    SimpleMsg(const std::string msg) : msg(msg)
    {
        std::ostream os(&buf) ;
        os.write((char *)(msg.c_str()),msg.size()) ;

        char ch = '\n' ;
        os.write(&ch,sizeof(char)) ; // Append Return Code
    }

    ~SimpleMsg() {}

    boost::asio::streambuf& rdbuf()
    {
        return buf ;
    }
} ;


template<typename SOK_PTYPE>
class SenderSession : private boost::noncopyable
{
private:
    io_service& io ;
    ip::tcp::socket socket ;
    boost::asio::streambuf buf ;
    std::string address ;
    short port ;
    boost::shared_ptr<ip::tcp::resolver> resolver ;
public:
    SenderSession(std::string& address,short port, SOK_PTYPE pdata,io_service& io)
        : io(io),socket(io),address(address),port(port)
    {
        // make resolver
        resolver.reset(new ip::tcp::resolver(io) );
        // make query
        ip::tcp::resolver::query query(address,boost::lexical_cast<std::string>(port));

        resolver->async_resolve(
            query,
            boost::bind(&SenderSession::resolve_ok, this,
                boost::asio::placeholders::error,
                pdata,
                boost::asio::placeholders::iterator));
    }

    ~SenderSession(){}

private:
    void resolve_ok(const boost::system::error_code e,SOK_PTYPE pdata,
            boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
    {
        if (!e)
        {
            // Enqueue Connect Handler
            boost::asio::async_connect(
                socket,
                endpoint_iterator,
                boost::bind(&SenderSession::connect_ok, this,
                    boost::asio::placeholders::error,
                    pdata,
                    endpoint_iterator));

        }
        else if (e == boost::asio::error::operation_aborted)
        {
            // abort
            std::cout << "connection abort" ;
            std::cout << e.message() ;
            delete this;
            return ;
        }
        else
        {
            std::cout << "error" ;
            std::cout << e.message() ;
            delete this;
            return ;
        }
    }

    void connect_ok(const boost::system::error_code e,SOK_PTYPE pdata,
            boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
    {
        if (!e)
        {
            // Enqueue Write Handler
            boost::asio::async_write(
                socket,
                pdata->rdbuf(),
                boost::bind(&SenderSession::receive_read, this,
                    boost::asio::placeholders::error,
                    pdata));
           
        }
        else if (e == boost::asio::error::operation_aborted)
        {
            // abort
            std::cout << "connection abort" ;
            std::cout << e.message() ;
            delete this;
            return ;
        }
        else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator())
        {
            // failed to resolve endpoint.
            boost::asio::async_connect(
                socket,
                endpoint_iterator,
                boost::bind(&SenderSession::connect_ok, this,
                    boost::asio::placeholders::error,
                    pdata,
                    ++endpoint_iterator)); // Try next resolved endpoint name.
        }
        else
        {
            std::cout << "error" ;
            std::cout << e.message() ;
            delete this;
            return ;
        }
    }

    void receive_read(const boost::system::error_code e,SOK_PTYPE pdata)
    {
        if (!e)
        {
            // Call Self until end of receive data.
            async_read(
                socket,
                pdata->rdbuf(),
                boost::asio::transfer_at_least(1),
                boost::bind(&SenderSession::receive_read,this,
                    boost::asio::placeholders::error,
                    pdata)) ;
        }
        else if (e == boost::asio::error::operation_aborted)
        {
            // abort
            std::cout << "connection abort" ;
            std::cout << e.message() ;
            delete this;
            return ;
        }
        else if (e != boost::asio::error::eof)
        {
            std::cout << "error" ;
            std::cout << e.message() ;
            delete this;
            return ;
        }
        else
        {
            delete this ; // end of session.
        }
    }


public:
    ip::tcp::socket& getSocket()
    {
        return (socket) ;
    }

} ;

class SenderModule : private boost::noncopyable
{
private:
    io_service& io ;   
    short port ;
    boost::mutex _lock ; // Lock Object
public:
    SenderModule(io_service& io, const short port)
        :io(io),port(port)
    {}

    ~SenderModule() {}

    template<typename U>
    void send(std::string address, U pdata)
    {
        boost::mutex::scoped_lock cs(_lock) ; // Lock until escaping this scope

        SenderSession *session = new SenderSession(address,port,pdata,io) ;
    }

} ;

} // namespace thorny_road
#endif //__ASIO_SENDER_MODULE__
メイン
#include "stdafx.h"
#include "SenderModule.hpp"
#include "ModuleThread.hpp" // for threading
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/bind.hpp>


using namespace thorny_road ;

int _tmain(int argc, _TCHAR* argv[])
{
    ModuleThread<SenderModule> sender(2085,2) ; // create module.

    boost::shared_ptr<SimpleMsg> msg(new SimpleMsg("[message]")) ;
    sender.getModule()->send("localhost",msg) ;

    boost::this_thread::sleep(boost::posix_time::milliseconds(1000)) ;

    return 0;
}
今回は単純に文字列を送っています。上の例では"[message]"を送信しています。送信先はsend(...)の第一引数にアドレスを入れます。今回は"localhost"に送るようにしています。第二引数にはパケットデータクラスのポインタを入れます。パケットデータクラスは以下のようにバッファを参照するためのメソッド( rdbuf() )が用意されていればコンパイルが通ります(今のところは・・・)。
class SimpleMsg
{
private :
    std::string msg ;
    boost::asio::streambuf buf ;
public:
    SimpleMsg(const std::string msg) : msg(msg)
    {
        std::ostream os(&buf) ;
        os.write((char *)(msg.c_str()),msg.size()) ;

        char ch = '\n' ;
        os.write(&ch,sizeof(char)) ; // Append Return Code
    }

    ~SimpleMsg() {}

    boost::asio::streambuf& rdbuf()
    {
        return buf ;
    }
} ;
今回の例では単純に文字列をバッファストリームに書き込んでいます。受信側では最初の受信では'\n'まで読み込むので最後に'\n'を足しています。
また、ModuleThreadgetModule()は前回まではモジュールクラスのポインタを返していましたが、スマートポインタで返すように変更しています。私はポインターは面白くて好きですが、苦い経験も多々あるので最近はスマートポインタを多用しています。DELPHIでは参照カウンターを持っているInterfaceをよく使います。

受信側のプログラムが実行している状態で、送信側のプログラムを実行すると送受信が行われますが、送信セッションが受信待ち状態のままになります。これは受信側のセッションが終了していないためです。受信側で返信した後セッション閉じるようにすれば送信側もセッションが切れます。
    void write_ok(const boost::system::error_code e)
    {
        if (!e)
        {
            // start() ; // restart
            delete this; // finish this packet
        }
DELPHIにもC++のTemplateのような機能が欲しいと思うことが多々ありますが、ビルドが高速というのがDELPHIの良い点のひとつなので難しいところです。

ソースコードは自由にご使用ください。ただし問題が起きても責任はとれません。また、ソースコードに対する著作権は放棄していません。

0 件のコメント:

コメントを投稿