00001
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "stdnet.h"
00025 #include "nel/misc/time_nl.h"
00026 #include "nel/net/module_gateway.h"
00027 #include "nel/net/module.h"
00028 #include "nel/net/module_manager.h"
00029 #include "nel/net/module_socket.h"
00030 #include "nel/net/module_message.h"
00031 #include "nel/net/callback_client.h"
00032 #include "nel/net/callback_server.h"
00033
00034 using namespace std;
00035 using namespace NLMISC;
00036
00037
00038
00039 namespace NLNET
00040 {
00041
00042
00043
00044
00045
00046
00047
00048
/// Idle time, in seconds, after which a route sends a "KA" keep-alive message.
/// The server transport uses this value as is; the client transport adds 5 seconds to it.
00049 const uint32 KEEP_ALIVE_DELAY = 120;
00050
00052 class CL3ServerRoute : public CGatewayRoute
00053 {
00054 public:
00056 TSockId SockId;
00057
00059 mutable uint32 LastCommTime;
00060
00061
00062 CL3ServerRoute(IGatewayTransport *transport)
00063 : CGatewayRoute(transport),
00064 LastCommTime(CTime::getSecondsSince1970())
00065 {
00066 }
00067
00068 void sendMessage(const CMessage &message) const;
00069 };
00070
00071 #define LAYER3_SERVER_CLASS_NAME "L3Server"
00072
00074 class CGatewayL3ServerTransport : public IGatewayTransport
00075 {
00076 friend class CL3ServerRoute;
00077 public:
00079 auto_ptr<CCallbackServer> _CallbackServer;
00080
00082 typedef map<CCallbackNetBase*, CGatewayL3ServerTransport*> TDispatcherIndex;
00083 static TDispatcherIndex _DispatcherIndex;
00084
00086 typedef std::map<TSockId, CL3ServerRoute*> TRouteMap;
00087 TRouteMap _Routes;
00088
00089
00091 CGatewayL3ServerTransport(const IGatewayTransport::TCtorParam ¶m)
00092 : IGatewayTransport(param)
00093 {
00094 }
00095
00096 ~CGatewayL3ServerTransport()
00097 {
00098 if (_CallbackServer.get() != NULL)
00099 {
00100
00101 closeServer();
00102 }
00103 }
00104
00105 const std::string &getClassName() const
00106 {
00107 static string className(LAYER3_SERVER_CLASS_NAME);
00108 return className;
00109 }
00110
00111 virtual void update()
00112 {
00113 H_AUTO(L3S_update);
00114
00115 if (_CallbackServer.get() != NULL)
00116 _CallbackServer->update2(100, 0);
00117
00118 uint32 now = CTime::getSecondsSince1970();
00119
00120 TRouteMap::iterator first(_Routes.begin()), last(_Routes.end());
00121 for (; first != last; ++first)
00122 {
00123 CL3ServerRoute *route = first->second;
00124
00125 if (now - route->LastCommTime > KEEP_ALIVE_DELAY)
00126 {
00127 nldebug("NETL6:L3Server: sending KeepAlive message");
00128
00129 CMessage keepAlive("KA");
00130 route->sendMessage(keepAlive);
00131
00132
00133 route->LastCommTime = CTime::getSecondsSince1970();
00134 }
00135
00136
00137 _CallbackServer->flush(route->SockId);
00138 }
00139
00140
00141 }
00142
00143 virtual uint32 getRouteCount() const
00144 {
00145 return _Routes.size();
00146 }
00147
00148 void dump(NLMISC::CLog &log) const
00149 {
00150 IModuleManager &mm = IModuleManager::getInstance();
00151 log.displayNL(" NeL Net layer 3 transport, SERVER mode");
00152 if (_CallbackServer.get() == NULL)
00153 {
00154 log.displayNL(" The server is currently closed.");
00155 }
00156 else
00157 {
00158 log.displayNL(" The server is open on '%s' and support %u routes :",
00159 _CallbackServer->listenAddress().asString().c_str(),
00160 _Routes.size());
00161 TRouteMap::const_iterator first(_Routes.begin()), last(_Routes.end());
00162 for (; first != last; ++first)
00163 {
00164 TSockId sockId = first->first;
00165 CL3ServerRoute *route = first->second;
00166 log.displayNL(" + route to '%s', %u entries in the proxy translation table :",
00167 sockId->getTcpSock()->remoteAddr().asString().c_str(),
00168 route->ForeignToLocalIdx.getAToBMap().size());
00169
00170 {
00171 CGatewayRoute::TForeignToLocalIdx::TAToBMap::const_iterator first(route->ForeignToLocalIdx.getAToBMap().begin()), last(route->ForeignToLocalIdx.getAToBMap().end());
00172 for (; first != last; ++first)
00173 {
00174 IModuleProxy *modProx = mm.getModuleProxy(first->second);
00175
00176 log.displayNL(" - Proxy '%s' : local proxy id %u => foreign module id %u",
00177 modProx != NULL ? modProx->getModuleName().c_str() : "ERROR, invalid module",
00178 first->second,
00179 first->first);
00180 }
00181 }
00182 }
00183
00184 log.displayNL(" Dumping send buffers states");
00185 _CallbackServer->displaySendQueueStat(&log);
00186 log.displayNL(" Dumping receive buffers states");
00187 _CallbackServer->displayReceiveQueueStat(&log);
00188 }
00189 }
00190
00191 void onCommand(const CMessage &) throw (EInvalidCommand)
00192 {
00193
00194 throw EInvalidCommand();
00195 }
00197 bool onCommand(const TParsedCommandLine &command) throw (EInvalidCommand)
00198 {
00199 if (command.SubParams.size() < 1)
00200 throw EInvalidCommand();
00201
00202 const std::string &commandName = command.SubParams[0]->ParamName;
00203 if (commandName == "open")
00204 {
00205 const TParsedCommandLine *portParam = command.getParam("port");
00206 if (portParam == NULL)
00207 throw EInvalidCommand();
00208
00209 uint16 port;
00210 fromString(portParam->ParamValue, port);
00211
00212 openServer(port);
00213 }
00214 else if (commandName == "close")
00215 {
00216 closeServer();
00217 }
00218 else
00219 return false;
00220
00221 return true;
00222 }
00223
00225 void openServer(uint16 port) throw (ETransportError)
00226 {
00227 if (_CallbackServer.get() != NULL)
00228 throw ETransportError("openServer : The server is already open");
00229
00230
00231 auto_ptr<CCallbackServer> cbs = auto_ptr<CCallbackServer> (new CCallbackServer());
00232
00233
00234 cbs->setConnectionCallback(cbConnection, static_cast<IGatewayTransport*>(this));
00235 cbs->setDisconnectionCallback(cbDisconnection, static_cast<IGatewayTransport*>(this));
00236 cbs->setDefaultCallback(cbDispatchMessage);
00237
00238
00239 cbs->init(port);
00240
00241 _CallbackServer = cbs;
00242
00243
00244 _DispatcherIndex.insert(make_pair(_CallbackServer.get(), this));
00245 }
00246
00248 void closeServer()
00249 {
00250 if (_CallbackServer.get() == NULL)
00251 throw ETransportError("closeServer : The server is not open");
00252
00253
00254 while (!_Routes.empty())
00255 {
00256 CL3ServerRoute *route = _Routes.begin()->second;
00257
00258
00259 _CallbackServer->disconnect(route->SockId);
00260
00261 _Gateway->onRouteRemoved(route);
00262
00263
00264 _Routes.erase(_Routes.begin());
00265 delete route;
00266 }
00267
00268
00269 _DispatcherIndex.erase(_CallbackServer.get());
00270
00271
00272 delete _CallbackServer.release();
00273 }
00274
00275
00276
00278
00279
00280
00281 void onConnection ( TSockId from)
00282 {
00283 H_AUTO(L3S_onConnection);
00284 nlassert(_Routes.find(from) == _Routes.end());
00285
00286
00287 CL3ServerRoute* route = new CL3ServerRoute(this);
00288 route->SockId = from;
00289
00290
00291 route->LastCommTime = CTime::getSecondsSince1970();
00292
00293
00294 _Routes.insert(make_pair(from, route));
00295
00296
00297 _Gateway->onRouteAdded(route);
00298 }
00299
00300
00301 void onDisconnection ( TSockId from)
00302 {
00303 H_AUTO(L3S_onDisconnection);
00304 TRouteMap::iterator it(_Routes.find(from));
00305 nlassert(it != _Routes.end());
00306
00307
00308 _Gateway->onRouteRemoved(it->second);
00309
00310
00311 CL3ServerRoute *route = it->second;
00312 _Routes.erase(it);
00313 delete route;
00314 }
00315
00316
00317 void onDispatchMessage(const CMessage &msgin, TSockId from, CCallbackNetBase &)
00318 {
00319 H_AUTO(L3S_onDispatchMessage);
00320 TRouteMap::iterator it(_Routes.find(from));
00321 nlassert(it != _Routes.end());
00322
00323
00324 it->second->LastCommTime = CTime::getSecondsSince1970();
00325
00326 if (msgin.getName() == "KA")
00327 {
00328
00329 return;
00330 }
00331
00332 _Gateway->onReceiveMessage(it->second, msgin);
00333
00334 }
00335
00336
00337
00339
00340
00341 static void cbConnection ( TSockId from, void *arg )
00342 {
00343 nlassert(arg != NULL);
00344 CGatewayL3ServerTransport *transport = dynamic_cast<CGatewayL3ServerTransport *>(static_cast<IGatewayTransport*>(arg));
00345 nlassert(transport != NULL);
00346
00347 transport->onConnection(from);
00348 }
00349
00350
00351 static void cbDisconnection ( TSockId from, void *arg )
00352 {
00353 nlassert(arg != NULL);
00354 CGatewayL3ServerTransport *transport = dynamic_cast<CGatewayL3ServerTransport *>(static_cast<IGatewayTransport*>(arg));
00355 nlassert(transport != NULL);
00356
00357 transport->onDisconnection(from);
00358 }
00359
00360
00361 static void cbDispatchMessage (CMessage &msgin, TSockId from, CCallbackNetBase &netbase)
00362 {
00363
00364 TDispatcherIndex::iterator it(_DispatcherIndex.find(&netbase));
00365 nlassert(it != _DispatcherIndex.end());
00366
00367
00368 it->second->onDispatchMessage(msgin, from, netbase);
00369 }
00370
00371 };
00372
00373 CGatewayL3ServerTransport::TDispatcherIndex CGatewayL3ServerTransport::_DispatcherIndex;
00374
00375
00376 NLMISC_REGISTER_OBJECT(IGatewayTransport, CGatewayL3ServerTransport, std::string, string(LAYER3_SERVER_CLASS_NAME));
00377
00378 void CL3ServerRoute::sendMessage(const CMessage &message) const
00379 {
00380 H_AUTO(L3SRoute_sendMessage);
00381 NLNET_AUTO_DELTE_ASSERT;
00382
00383 CGatewayL3ServerTransport *trpt = static_cast<CGatewayL3ServerTransport*>(_Transport);
00384
00385
00386 trpt->_CallbackServer->send(message, SockId);
00387
00388
00389 LastCommTime = CTime::getSecondsSince1970();
00390 }
00391
00397 class CL3ClientRoute : public CGatewayRoute
00398 {
00399 public:
00401 CInetAddress ServerAddr;
00403 mutable CCallbackClient CallbackClient;
00405 mutable uint32 LastCommTime;
00406
00408 uint32 LastConnectionRetry;
00409
00410
00411 uint32 ConnId;
00412
00413 CL3ClientRoute(IGatewayTransport *transport, CInetAddress serverAddr,uint32 connId)
00414 : CGatewayRoute(transport),
00415 ServerAddr(serverAddr),
00416 LastCommTime(CTime::getSecondsSince1970()),
00417 LastConnectionRetry(0),
00418 ConnId(connId)
00419 {
00420 }
00421
00422 void sendMessage(const CMessage &message) const
00423 {
00424 NLNET_AUTO_DELTE_ASSERT;
00425 H_AUTO(L3CRoute_sendMessage);
00426 if (CallbackClient.connected())
00427 {
00428
00429 LastCommTime = CTime::getSecondsSince1970();
00430
00431 CallbackClient.send(message);
00432 }
00433 }
00434 };
00435
00436 #define LAYER3_CLIENT_CLASS_NAME "L3Client"
00437
00439 class CGatewayL3ClientTransport : public IGatewayTransport
00440 {
00441 friend class CL3ClientRoute;
00442 public:
00444 typedef map<CCallbackNetBase*, CGatewayL3ClientTransport*> TDispatcherIndex;
00445 static TDispatcherIndex _DispatcherIndex;
00446
00448 typedef map<TSockId, CL3ClientRoute*> TClientRoutes;
00449 TClientRoutes _Routes;
00450
00453 typedef vector<TSockId> TClientRouteIds;
00454 TClientRouteIds _RouteIds;
00456 typedef vector<TClientRouteIds::difference_type> TFreeRouteIds;
00457 TFreeRouteIds _FreeRoutesIds;
00458
00460 list<CL3ClientRoute*> _RouteToRemove;
00461
00463 uint32 _RetryInterval;
00464
00465 enum
00466 {
00468 RETRY_INTERVAL = 5,
00470 MIN_RETRY_INTERVAL = 1,
00471 };
00472
00474 CGatewayL3ClientTransport(const IGatewayTransport::TCtorParam ¶m)
00475 : IGatewayTransport(param),
00476 _RetryInterval(RETRY_INTERVAL)
00477 {
00478 }
00479
00480 ~CGatewayL3ClientTransport()
00481 {
00482 deletePendingRoute();
00483
00484
00485 for (uint i=0; i<_RouteIds.size(); ++i)
00486 {
00487 if (_RouteIds[i] != NULL)
00488 {
00489
00490 close(i);
00491 }
00492 }
00493 }
00494
00495 void deletePendingRoute()
00496 {
00497 H_AUTO(L3C_deletePendingRoute);
00498
00499 while (!_RouteToRemove.empty())
00500 {
00501 CL3ClientRoute *route = _RouteToRemove.front();
00502 _DispatcherIndex.erase(&(route->CallbackClient));
00503 _Routes.erase(route->CallbackClient.getSockId());
00504
00505 _RouteIds[route->ConnId] = NULL;
00506 _FreeRoutesIds.push_back(route->ConnId);
00507 delete route;
00508 _RouteToRemove.pop_front();
00509 }
00510 }
00511
00512 const std::string &getClassName() const
00513 {
00514 static string className(LAYER3_CLIENT_CLASS_NAME);
00515 return className;
00516 }
00517
00518 virtual void update()
00519 {
00520 H_AUTO(L3C_update);
00521
00522 deletePendingRoute();
00523
00524 uint32 now = CTime::getSecondsSince1970();
00525
00526 TClientRoutes::iterator first(_Routes.begin()), last(_Routes.end());
00527 for (; first != last; ++first)
00528 {
00529 CL3ClientRoute *route = first->second;
00530
00531 if (!route->CallbackClient.connected())
00532 {
00533
00534 if (route->LastConnectionRetry + _RetryInterval < now)
00535 {
00536 route->LastConnectionRetry = now;
00537 try
00538 {
00539 nldebug("Connecting to %s...", route->ServerAddr.asString().c_str());
00540 route->CallbackClient.connect(route->ServerAddr);
00541 nldebug("Connected to %s", route->ServerAddr.asString().c_str());
00542 _Gateway->onRouteAdded(route);
00543 }
00544 catch(...)
00545 {
00546 nlinfo("Server %s still not available for connection", route->ServerAddr.asString().c_str());
00547 }
00548 }
00549 }
00550 else
00551 {
00552 route->CallbackClient.update2(100, 0);
00553
00554
00555
00556
00557
00558 if (now - route->LastCommTime > (KEEP_ALIVE_DELAY+5))
00559 {
00560 nldebug("NETL6:L3Client: sending KeepAlive message");
00561
00562
00563 CMessage keepAlive("KA");
00564
00565 route->sendMessage(keepAlive);
00566 }
00567
00568
00569 route->CallbackClient.flush();
00570 }
00571 }
00572 }
00573
00574 virtual uint32 getRouteCount() const
00575 {
00576 return _Routes.size();
00577 }
00578
00579 void dump(NLMISC::CLog &log) const
00580 {
00581 IModuleManager &mm = IModuleManager::getInstance();
00582 log.displayNL(" NeL Net layer 3 transport, CLIENT mode");
00583
00584 log.displayNL(" There are actually %u active route :", _Routes.size());
00585
00586 TClientRoutes::const_iterator first(_Routes.begin()), last(_Routes.end());
00587 for (; first != last; ++first)
00588 {
00589 CL3ClientRoute *route = first->second;
00590 log.displayNL(" + route to '%s', %s, %u entries in the proxy translation table :",
00591 route->ServerAddr.asString().c_str(),
00592 route->CallbackClient.connected() ? "connected" : "NOT CONNECTED",
00593 route->ForeignToLocalIdx.getAToBMap().size());
00594 {
00595 CGatewayRoute::TForeignToLocalIdx::TAToBMap::const_iterator first(route->ForeignToLocalIdx.getAToBMap().begin()), last(route->ForeignToLocalIdx.getAToBMap().end());
00596 for (; first != last; ++first)
00597 {
00598 IModuleProxy *modProx = mm.getModuleProxy(first->second);
00599
00600 log.displayNL(" - Proxy '%s' : local proxy id %u => foreign module id %u",
00601 modProx != NULL ? modProx->getModuleName().c_str() : "ERROR, invalid module",
00602 first->second,
00603 first->first);
00604 }
00605 }
00606
00607 log.displayNL(" Dumping send buffer state");
00608 route->CallbackClient.displaySendQueueStat(&log);
00609 log.displayNL(" Dumping receive buffer state");
00610 route->CallbackClient.displayReceiveQueueStat(&log);
00611 }
00612 }
00613
00614 void onCommand(const CMessage &) throw (EInvalidCommand)
00615 {
00616
00617 throw EInvalidCommand();
00618 }
00620 bool onCommand(const TParsedCommandLine &command) throw (EInvalidCommand)
00621 {
00622 if (command.SubParams.size() < 1)
00623 throw EInvalidCommand();
00624
00625 const std::string &commandName = command.SubParams[0]->ParamName;
00626 if (commandName == "connect")
00627 {
00628 const TParsedCommandLine *addrParam = command.getParam("addr");
00629 if (addrParam == NULL)
00630 throw EInvalidCommand();
00631
00632 CInetAddress addr(addrParam->ParamValue);
00633
00634 connect(addr);
00635 }
00636 else if (commandName == "close")
00637 {
00638 const TParsedCommandLine *conIdParam= command.getParam("connId");
00639 if (conIdParam == NULL)
00640 throw EInvalidCommand();
00641
00642 uint32 connId;
00643 fromString(conIdParam->ParamValue, connId);
00644
00645 close(connId);
00646 }
00647 else if (commandName == "retryInterval")
00648 {
00649 uint32 interval;
00650 fromString(command.SubParams[0]->ParamValue, interval);
00651 _RetryInterval = std::max(uint32(MIN_RETRY_INTERVAL), interval);
00652
00653 nldebug("CGatewayL3ClientTransport : setting retry interval to %u", _RetryInterval);
00654 }
00655 else
00656 return false;
00657
00658 return true;
00659
00660 }
00661
00663 void connect(CInetAddress &addr)
00664 {
00665 H_AUTO(L3C_connect);
00666 uint32 connId;
00667
00668
00669 if (_FreeRoutesIds.empty())
00670 {
00671 connId = _RouteIds.size();
00672 _RouteIds.push_back(InvalidSockId);
00673 }
00674 else
00675 {
00676 connId = _FreeRoutesIds.back();
00677 _FreeRoutesIds.pop_back();
00678 }
00679
00680 auto_ptr<CL3ClientRoute> route = auto_ptr<CL3ClientRoute>(new CL3ClientRoute(this, addr, connId));
00681
00682
00683 route->CallbackClient.setDisconnectionCallback(cbDisconnection, static_cast<IGatewayTransport*>(this));
00684 route->CallbackClient.setDefaultCallback(cbDispatchMessage);
00685
00686 try
00687 {
00688 nldebug("CGatewayL3ClientTransport : Connecting to %s...", addr.asString().c_str());
00689 route->LastConnectionRetry = CTime::getSecondsSince1970();
00690
00691 route->CallbackClient.connect(addr);
00692 nldebug("CGatewayL3ClientTransport : Connected to %s with connId %u", addr.asString().c_str(), connId);
00693 }
00694 catch (ESocketConnectionFailed e)
00695 {
00696 nlinfo("CGatewayL3ClientTransport : Failed to connect to server %s, retrying in %u seconds", addr.asString().c_str(), _RetryInterval);
00697 }
00698
00699
00700 _Routes.insert(make_pair(route->CallbackClient.getSockId(), route.get()));
00701 _RouteIds[connId] = route->CallbackClient.getSockId();
00702
00703
00704 _DispatcherIndex.insert(make_pair(&route->CallbackClient, this));
00705
00706
00707 CL3ClientRoute *rt = route.release();
00708
00709
00710 if (rt->CallbackClient.connected())
00711 _Gateway->onRouteAdded(rt);
00712 }
00713
00714
00715 void close ( uint32 connId)
00716 {
00717 H_AUTO(L3C_close);
00718
00719 if (connId >= _RouteIds.size())
00720 {
00721 nlwarning("CGatewayL3ClientTransport : Invalid connectionId %u, max is %u", connId, _RouteIds.size()-1);
00722 return;
00723 }
00724
00725 if (_RouteIds[connId] == NULL)
00726 {
00727 nlwarning("CGatewayL3ClientTransport : Invalid connectionId %u, the connection is unused now.", connId);
00728 return;
00729 }
00730
00731
00732 deletePendingRoute();
00733
00734
00735 TClientRoutes::iterator it(_Routes.find(_RouteIds[connId]));
00736 nlassert(it != _Routes.end());
00737
00738 CL3ClientRoute *route = it->second;
00739
00740 nldebug("CGatewayL3ClientTransport : Closing connection %u to %s", connId, route->ServerAddr.asString().c_str());
00741
00742
00743 if (route->CallbackClient.connected())
00744 {
00745
00746 _Gateway->onRouteRemoved(route);
00747
00748
00749 route->CallbackClient.disconnect();
00750 }
00751
00752
00753 _DispatcherIndex.erase(&(route->CallbackClient));
00754 _Routes.erase(it);
00755 delete route;
00756 _RouteIds[connId] = NULL;
00757 _FreeRoutesIds.push_back(connId);
00758 }
00759
00760
00762
00763
00764
00765 void onDisconnection ( TSockId from)
00766 {
00767 H_AUTO(L3C_onDisconnection);
00768
00769
00770 TClientRoutes::iterator it(_Routes.find(from));
00771 nlassert(it != _Routes.end());
00772
00773 nldebug("CGatewayL3ClientTransport : Disconnection from %s", it->second->ServerAddr.asString().c_str());
00774
00775
00776
00777 _Gateway->onRouteRemoved(it->second);
00778
00779
00780 it->second->LastConnectionRetry = CTime::getSecondsSince1970();
00781
00782
00783
00784
00785
00786 }
00787
00788
00789 void onDispatchMessage(const CMessage &msgin, TSockId from, CCallbackNetBase &)
00790 {
00791 H_AUTO(L3C_onDispatchMessage);
00792 TClientRoutes::iterator it(_Routes.find(from));
00793 nlassert(it != _Routes.end());
00794
00795
00796 it->second->LastCommTime = CTime::getSecondsSince1970();
00797
00798 if (msgin.getName() == "KA")
00799 {
00800
00801 return;
00802 }
00803
00804 _Gateway->onReceiveMessage(it->second, msgin);
00805 }
00806
00807
00808
00810
00811
00812
00813 static void cbDisconnection ( TSockId from, void *arg )
00814 {
00815 nlassert(arg != NULL);
00816 CGatewayL3ClientTransport *transport = dynamic_cast<CGatewayL3ClientTransport *>(static_cast<IGatewayTransport*>(arg));
00817 nlassert(transport != NULL);
00818
00819 transport->onDisconnection(from);
00820 }
00821
00822
00823 static void cbDispatchMessage (CMessage &msgin, TSockId from, CCallbackNetBase &netbase)
00824 {
00825
00826 TDispatcherIndex::iterator it(_DispatcherIndex.find(&netbase));
00827 nlassert(it != _DispatcherIndex.end());
00828
00829
00830 it->second->onDispatchMessage(msgin, from, netbase);
00831 }
00832
00833 };
00834
00835 CGatewayL3ClientTransport::TDispatcherIndex CGatewayL3ClientTransport::_DispatcherIndex;
00836
00837
00838 NLMISC_REGISTER_OBJECT(IGatewayTransport, CGatewayL3ClientTransport, std::string, string(LAYER3_CLIENT_CLASS_NAME));
00839
00840
/// Intentionally empty function.
/// NOTE(review): presumably called from another translation unit so that the
/// linker keeps this file and its transport registrations - confirm.
00841 void forceGatewayTransportLink()
00842 {
00843 }
00844
00845 }