以下送信側のソースコードです。
#pragma once #ifndef __ASIO_SENDER_MODULE__ #define __ASIO_SENDER_MODULE__ #include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/function.hpp> #include <boost/thread.hpp> #include <boost/shared_ptr.hpp> #include <boost/lexical_cast.hpp> namespace thorny_road{ using namespace boost::asio ; // Simple Message data socket class SimpleMsg { private : std::string msg ; boost::asio::streambuf buf ; public: SimpleMsg(const std::string msg) : msg(msg) { std::ostream os(&buf) ; os.write((char *)(msg.c_str()),msg.size()) ; char ch = '\n' ; os.write(&ch,sizeof(char)) ; // Append Return Code } ~SimpleMsg() {} boost::asio::streambuf& rdbuf() { return buf ; } } ; template<typename SOK_PTYPE> class SenderSession : private boost::noncopyable { private: io_service& io ; ip::tcp::socket socket ; boost::asio::streambuf buf ; std::string address ; short port ; boost::shared_ptr<ip::tcp::resolver> resolver ; public: SenderSession(std::string& address,short port, SOK_PTYPE pdata,io_service& io) : io(io),socket(io),address(address),port(port) { // make resolver resolver.reset(new ip::tcp::resolver(io) ); // make query ip::tcp::resolver::query query(address,boost::lexical_cast<std::string>(port)); resolver->async_resolve( query, boost::bind(&SenderSession::resolve_ok, this, boost::asio::placeholders::error, pdata, boost::asio::placeholders::iterator)); } ~SenderSession(){} private: void resolve_ok(const boost::system::error_code e,SOK_PTYPE pdata, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) { if (!e) { // Enqueue Connect Handler boost::asio::async_connect( socket, endpoint_iterator, boost::bind(&SenderSession::connect_ok, this, boost::asio::placeholders::error, pdata, endpoint_iterator)); } else if (e == boost::asio::error::operation_aborted) { // abort std::cout << "connection abort" ; std::cout << e.message() ; delete this; return ; } else { std::cout << "error" ; std::cout << e.message() ; delete this; return ; } } void connect_ok(const boost::system::error_code e,SOK_PTYPE pdata, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) { if (!e) { // Enqueue Write Handler boost::asio::async_write( socket, pdata->rdbuf(), boost::bind(&SenderSession::receive_read, this, boost::asio::placeholders::error, pdata)); } else if (e == boost::asio::error::operation_aborted) { // abort std::cout << "connection abort" ; std::cout << e.message() ; delete this; return ; } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) { // failed to resolve endpoint. boost::asio::async_connect( socket, endpoint_iterator, boost::bind(&SenderSession::connect_ok, this, boost::asio::placeholders::error, pdata, ++endpoint_iterator)); // Try next resolved endpoint name. } else { std::cout << "error" ; std::cout << e.message() ; delete this; return ; } } void receive_read(const boost::system::error_code e,SOK_PTYPE pdata) { if (!e) { // Call Self until end of receive data. async_read( socket, pdata->rdbuf(), boost::asio::transfer_at_least(1), boost::bind(&SenderSession::receive_read,this, boost::asio::placeholders::error, pdata)) ; } else if (e == boost::asio::error::operation_aborted) { // abort std::cout << "connection abort" ; std::cout << e.message() ; delete this; return ; } else if (e != boost::asio::error::eof) { std::cout << "error" ; std::cout << e.message() ; delete this; return ; } else { delete this ; // end of session. } } public: ip::tcp::socket& getSocket() { return (socket) ; } } ; class SenderModule : private boost::noncopyable { private: io_service& io ; short port ; boost::mutex _lock ; // Lock Object public: SenderModule(io_service& io, const short port) :io(io),port(port) {} ~SenderModule() {} template<typename U> void send(std::string address, U pdata) { boost::mutex::scoped_lock cs(_lock) ; // Lock until escaping this scope SenderSession *session = new SenderSession(address,port,pdata,io) ; } } ; } // namespace thorny_road #endif //__ASIO_SENDER_MODULE__メイン
#include "stdafx.h" #include "SenderModule.hpp" #include "ModuleThread.hpp" // for threading #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/bind.hpp> using namespace thorny_road ; int _tmain(int argc, _TCHAR* argv[]) { ModuleThread<SenderModule> sender(2085,2) ; // create module. boost::shared_ptr<SimpleMsg> msg(new SimpleMsg("[message]")) ; sender.getModule()->send("localhost",msg) ; boost::this_thread::sleep(boost::posix_time::milliseconds(1000)) ; return 0; }今回は単純に文字列を送っています。上の例では"[message]"を送信しています。送信先はsend(...)の第一引数にアドレスを入れます。今回は"localhost"に送るようにしています。第二引数にはパケットデータクラスのポインタを入れます。パケットデータクラスは以下のようにバッファを参照するためのメソッド( rdbuf() )が用意されていればコンパイルが通ります(今のところは・・・)。
class SimpleMsg { private : std::string msg ; boost::asio::streambuf buf ; public: SimpleMsg(const std::string msg) : msg(msg) { std::ostream os(&buf) ; os.write((char *)(msg.c_str()),msg.size()) ; char ch = '\n' ; os.write(&ch,sizeof(char)) ; // Append Return Code } ~SimpleMsg() {} boost::asio::streambuf& rdbuf() { return buf ; } } ;今回の例では単純に文字列をバッファストリームに書き込んでいます。受信側では最初の受信では'\n'まで読み込むので最後に'\n'を足しています。
また、ModuleThreadのgetModule()は前回まではモジュールクラスのポインタを返していましたが、スマートポインタで返すように変更しています。私はポインターは面白くて好きですが、苦い経験も多々あるので最近はスマートポインタを多用しています。DELPHIでは参照カウンターを持っているInterfaceをよく使います。
受信側のプログラムが実行している状態で、送信側のプログラムを実行すると送受信が行われますが、送信セッションが受信待ち状態のままになります。これは受信側のセッションが終了していないためです。受信側で返信した後セッション閉じるようにすれば送信側もセッションが切れます。
void write_ok(const boost::system::error_code e) { if (!e) { // start() ; // restart delete this; // finish this packet }DELPHIにもC++のTemplateのような機能が欲しいと思うことが多々ありますが、ビルドが高速というのがDELPHIの良い点のひとつなので難しいところです。
ソースコードは自由にご使用ください。ただし問題が起きても責任はとれません。また、ソースコードに対する著作権は放棄していません。
0 件のコメント:
コメントを投稿