00001
00002
00004
00005
00006 #include "mutableserverbase.h"
00007
00008
// Keys passed (through the thread-argument pointer) to StartThreadFunc() to
// select which worker routine a newly started thread runs.  StartThreadFunc()
// sends every other value to ClientServicer().
// Each value is parenthesized so the macro always expands as a single operand
// when it appears inside a larger expression.
#define THR_KEY_LISTENER (-1)   // thread runs Listener()
#define THR_KEY_STACKER  (-2)   // thread runs Stacker()
#define THR_KEY_HANDLER  (-3)   // thread runs Handler()
00012
00013
00014 extern "C" {
00015 #include <transport.h>
00016 }
00017
00018
00019
00020
00021
00022 const char * MutableServerBase::MSB_MODE_NAME[] =
00023 {
00024 ""
00025 , "Standalone"
00026 , "Module"
00027 , "Server"
00028 , "Client"
00029 };
00030
00031
00033
00035
00036 MutableServerBase::MutableServerBase()
00037 {
00038 MaxMessageLength = 0;
00039 MessageBuffer = NULL;
00040
00041 LoggingOptions = WORM_LOG_TOFILE;
00042
00043 Mode = MMT_NONE;
00044
00045 ConnectFailureSec = -1;
00046
00047 MessageMutex = NULL;
00048
00049 ServeLogoCount = 0;
00050
00051 MaxClients = 1;
00052
00053 MaxServerTryLoopCount = 2;
00054
00055 TransmitBuffer = NULL;
00056 TransmitBufferLength = 0;
00057
00058 InputRingName = "";
00059 InputRingKey = WORM_RING_INVALID;
00060 InputRegionStruct.addr = NULL;
00061 InputRegion = &InputRegionStruct;
00062
00063 OutputRingName = "";
00064 OutputRingKey = WORM_RING_INVALID;
00065 OutputRegionStruct.addr = NULL;
00066 OutputRegion = &OutputRegionStruct;
00067
00068 }
00069
00070
00071
00072 MutableServerBase::~MutableServerBase()
00073 {
00074 if ( OutputRingKey != WORM_RING_INVALID
00075 && OutputRingKey != InputRingKey
00076 && OutputRingKey != CommandRingKey
00077 )
00078 {
00079 if ( OutputRegionStruct.addr != NULL )
00080 {
00081 TLogger::Logit( LoggingOptions
00082 , "~MutableServerBase(): detaching from output ring\n"
00083 );
00084 tport_detach( &OutputRegionStruct );
00085 OutputRegionStruct.addr = NULL;
00086 }
00087
00088 OutputRingKey = WORM_RING_INVALID;
00089 }
00090
00091 if ( InputRingKey != WORM_RING_INVALID
00092 && InputRingKey != CommandRingKey
00093 )
00094 {
00095 if ( InputRegionStruct.addr != NULL )
00096 {
00097 TLogger::Logit( LoggingOptions
00098 , "~MutableServerBase(): detaching from input ring\n"
00099 );
00100 tport_detach( &InputRegionStruct );
00101 InputRegionStruct.addr = NULL;
00102 }
00103
00104 InputRingKey = WORM_RING_INVALID;
00105 }
00106
00107 if ( CommandRegion.addr != NULL )
00108 {
00109 TLogger::Logit( LoggingOptions
00110 , "~MutableServerBase(): detaching from command ring\n"
00111 );
00112 tport_detach( &CommandRegion );
00113 CommandRegion.addr = NULL;
00114 }
00115
00116 if ( MessageMutex != NULL )
00117 {
00118 delete( MessageMutex );
00119 }
00120
00121 if ( MessageBuffer != NULL )
00122 {
00123 delete [] MessageBuffer;
00124 }
00125
00126 if ( TransmitBuffer != NULL )
00127 {
00128 delete [] TransmitBuffer;
00129 }
00130
00131 }
00132
00133
00134
00135 HANDLE_STATUS MutableServerBase::HandleConfigLine( ConfigSource * p_parser )
00136 {
00137
00138
00139 HANDLE_STATUS r_handled = HANDLER_USED;
00140
00141 try
00142 {
00143 char * _token;
00144
00145 int _ival;
00146
00147 do
00148 {
00149 if ( p_parser->Its("ImA") )
00150 {
00151 _token = p_parser->String();
00152 if ( strlen(_token) == 0 )
00153 {
00154 throw worm_exception("Incomplete <ImA> line");
00155 }
00156
00157 if ( p_parser->Its("StandAlone") )
00158 {
00159 Mode = MMT_STANDALONE;
00160 continue;
00161 }
00162
00163 if ( p_parser->Its("Module") )
00164 {
00165 Mode = MMT_MODULE;
00166 continue;
00167 }
00168
00169 if ( p_parser->Its("Server") )
00170 {
00171 Mode = MMT_SERVER;
00172 continue;
00173 }
00174
00175 if ( p_parser->Its("Client") )
00176 {
00177 Mode = MMT_CLIENT;
00178 continue;
00179 }
00180 }
00181
00182
00183
00184
00185
00186 if ( p_parser->Its("Provider") )
00187 {
00188 PROVIDER_ADDR _provider;
00189
00190 _token = p_parser->String();
00191 if ( strlen(_token) == 0 )
00192 {
00193 throw worm_exception("Incomplete <Provider> line");
00194 }
00195 _provider.IPAddr = _token;
00196
00197 _token = p_parser->String();
00198 if ( strlen(_token) == 0 )
00199 {
00200 throw worm_exception("Incomplete <Provider> line");
00201 }
00202 _provider.Port = _token;
00203
00204 Providers.push_back( _provider );
00205
00206 continue;
00207 }
00208
00209 if ( p_parser->Its("ConnectFailDelay") )
00210 {
00211 if ( (_ival = p_parser->Int()) == p_parser->INVALID_INT )
00212 {
00213 throw worm_exception("Invalid or incomplete <ConnectFailDelay> line");
00214 }
00215 ConnectFailureSec = _ival;
00216
00217 continue;
00218 }
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230 if ( p_parser->Its("InputRing") )
00231 {
00232 _token = p_parser->String();
00233 if ( strlen(_token) == 0 )
00234 {
00235 throw worm_exception("Missing <InputRing> value");
00236 }
00237 InputRingName = _token;
00238
00239 if ( (InputRingKey = TGlobalUtils::LookupRingKey(InputRingName.c_str())) == WORM_RING_INVALID )
00240 {
00241 throw worm_exception("invalid <InputRing> value");
00242 }
00243
00244 continue;
00245 }
00246
00247 if ( p_parser->Its("OutputRing") )
00248 {
00249 _token = p_parser->String();
00250 if ( strlen(_token) == 0 )
00251 {
00252 throw worm_exception("missing <OutputRing> value");
00253 }
00254 OutputRingName = _token;
00255
00256 if ( (OutputRingKey = TGlobalUtils::LookupRingKey(OutputRingName.c_str())) == WORM_RING_INVALID )
00257 {
00258 throw worm_exception("invalid <OutputRing> value");
00259 }
00260
00261 continue;
00262 }
00263
00264 if ( p_parser->Its("AcceptLogo") )
00265 {
00266 if ( ServeLogoCount == SERVE_MAX_LOGOS )
00267 {
00268 throw worm_exception("too many <AcceptLogo> lines");
00269 }
00270
00271
00272
00273 _token = p_parser->String();
00274 if ( strlen(_token) == 0 )
00275 {
00276 throw worm_exception("missing <AcceptLogo> institution value");
00277 }
00278
00279 ServeLogo[ServeLogoCount].instid = (WORM_INSTALLATION_ID)TGlobalUtils::LookupInstallationId(_token);
00280
00281
00282
00283 _token = p_parser->String();
00284 if ( strlen(_token) == 0 )
00285 {
00286 throw worm_exception("missing <AcceptLogo> module value");
00287 }
00288
00289 ServeLogo[ServeLogoCount].mod = (WORM_MODULE_ID)TGlobalUtils::LookupModuleId(_token);
00290
00291
00292
00293 _token = p_parser->String();
00294 if ( strlen(_token) == 0 )
00295 {
00296 throw worm_exception("missing <AcceptLogo> type value");
00297 }
00298
00299 ServeLogo[ServeLogoCount].type = (WORM_MSGTYPE_ID)TGlobalUtils::LookupMessageTypeId(_token);
00300
00301 ServeLogoCount++;
00302
00303 continue;
00304 }
00305
00306
00307
00308
00309
00310
00311 if ( p_parser->Its("MaxMessageLength") )
00312 {
00313 int _wrkint = p_parser->Int();
00314 if ( _wrkint == ConfigSource::INVALID_INT )
00315 {
00316 throw worm_exception("Missing or invalid <MaxMessageLength> value");
00317 }
00318 MaxMessageLength = _wrkint;
00319
00320 if ( (MessageBuffer = new char[MaxMessageLength]) == NULL )
00321 {
00322 throw worm_exception("failed allocating MessageBuffer");
00323 }
00324 }
00325
00326
00327
00328
00329 r_handled = WormServerBase::HandleConfigLine( p_parser );
00330
00331
00332 } while ( false );
00333 }
00334 catch( worm_exception _we )
00335 {
00336 r_handled = HANDLER_INVALID;
00337 ConfigState = WORM_STAT_BADSTATE;
00338 TLogger::Logit( LoggingOptions
00339 , "MutableServerBase::HandleConfigLine(): configuration error:\n%s\n"
00340 , _we.what()
00341 );
00342 }
00343
00344 return r_handled;
00345 }
00346
00347
00348
00349 void MutableServerBase::CheckConfig()
00350 {
00351
00352 switch ( Mode )
00353 {
00354 case MMT_NONE:
00355 TLogger::Logit( LoggingOptions
00356 , "MutableServerBase::CheckConfig(): <ImA> line is required\n"
00357 );
00358 ConfigState = WORM_STAT_BADSTATE;
00359
00360 break;
00361
00362 case MMT_CLIENT:
00363
00364 if ( Providers.size() == 0 )
00365 {
00366 TLogger::Logit( LoggingOptions
00367 , "MutableServerBase::CheckConfig(): client mode requires at least one <Provider> line\n"
00368 );
00369 ConfigState = WORM_STAT_BADSTATE;
00370 }
00371
00372 if ( ConnectFailureSec < 0 )
00373 {
00374 TLogger::Logit( LoggingOptions
00375 , "MutableServerBase::CheckConfig() client mode requires <ConnectFailureSec> line\n"
00376 );
00377 ConfigState = WORM_STAT_BADSTATE;
00378 }
00379
00380 if ( SendTimeoutMS == -2 )
00381 {
00382 TLogger::Logit( LoggingOptions
00383 , "MutableServerBase::CheckConfig() client mode requires <SendTimeoutMSecs> line\n"
00384 );
00385 ConfigState = WORM_STAT_BADSTATE;
00386 }
00387
00388 if ( RecvTimeoutMS < MIN_RECV_TIMEOUT_MS )
00389 {
00390 TLogger::Logit( LoggingOptions
00391 , "WormServerBase::CheckConfig(): <RecvTimeoutMSecs> too low in config file, using %d\n"
00392 , MIN_RECV_TIMEOUT_MS
00393 );
00394 RecvTimeoutMS = MIN_RECV_TIMEOUT_MS;
00395 }
00396
00397 break;
00398
00399
00400 case MMT_SERVER:
00401
00402 if ( strlen(ServerIPAddr) == 0 )
00403 {
00404 TLogger::Logit( LoggingOptions
00405 , "MutableServerBase::CheckConfig() server mode requires <ServerIPAddr> line\n"
00406 );
00407 ConfigState = WORM_STAT_BADSTATE;
00408 }
00409
00410 if ( ServerPort == 0 )
00411 {
00412 TLogger::Logit( LoggingOptions
00413 , "MutableServerBase::CheckConfig() server mode requires <ServerPort> line\n"
00414 );
00415 ConfigState = WORM_STAT_BADSTATE;
00416 }
00417
00418 if ( CommandRingKey != WORM_RING_INVALID )
00419 {
00420 if ( MaxMessageLength == 0 )
00421 {
00422 TLogger::Logit( LoggingOptions
00423 , "MutableServerBase::CheckConfig() Error: missing or invalid <MaxMessageLength> line\n"
00424 );
00425 ConfigState = WORM_STAT_BADSTATE;
00426 }
00427 }
00428
00429 break;
00430
00431 case MMT_MODULE:
00432
00433 if ( CommandRingKey == WORM_RING_INVALID )
00434 {
00435 TLogger::Logit( LoggingOptions
00436 , "MutableServerBase::CheckConfig() module mode requires <CmdRingName> line\n"
00437 );
00438 ConfigState = WORM_STAT_BADSTATE;
00439 }
00440
00441 if ( InputRingKey == WORM_RING_INVALID )
00442 {
00443 InputRingKey = CommandRingKey;
00444 }
00445
00446 if ( OutputRingKey == WORM_RING_INVALID )
00447 {
00448 OutputRingKey = InputRingKey;
00449 }
00450
00451 if ( MaxMessageLength == 0 )
00452 {
00453 TLogger::Logit( LoggingOptions
00454 , "MutableServerBase::CheckConfig() Error: missing or invalid <MaxMessageLength> line\n"
00455 );
00456 ConfigState = WORM_STAT_BADSTATE;
00457 }
00458
00459 ResultMsgLogo.instid = TGlobalUtils::GetThisInstallationId();
00460 ResultMsgLogo.mod = TGlobalUtils::GetThisModuleId();
00461
00462 if ( ServeLogoCount == 0 )
00463 {
00464 TLogger::Logit( LoggingOptions
00465 , "MutableServerBase::CheckConfig() Error: no valid <AcceptLogo> lines found\n"
00466 );
00467 ConfigState = WORM_STAT_BADSTATE;
00468 }
00469
00470 if ( (ResultMsgLogo.type = TGlobalUtils::LookupMessageTypeId(OutputMessageTypeKey())) == WORM_MSGTYPE_INVALID )
00471 {
00472 TLogger::Logit( LoggingOptions
00473 , "MutableServerBase::CheckConfig() Error: id not found for output message type %s\n"
00474 , OutputMessageTypeKey()
00475 );
00476 ConfigState = WORM_STAT_BADSTATE;
00477 }
00478
00479 break;
00480
00481 }
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492 }
00493
00494
00495
00496 bool MutableServerBase::CheckForThreadDeath()
00497 {
00498 try
00499 {
00500 switch( Mode )
00501 {
00502 case MMT_MODULE:
00503
00504 if ( (LastStackerPulse + 6) < time(NULL) )
00505 {
00506 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), 0, "stacker thread stopped" );
00507 throw worm_exception("Stacker thread stopped pulsing");
00508 }
00509
00510 if ( (LastHandlerPulse + 6) < time(NULL) )
00511 {
00512 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), 0, "handler thread stopped" );
00513 throw worm_exception("Handler thread stopped pulsing");
00514 }
00515
00516 break;
00517
00518 case MMT_SERVER:
00519
00520
00521
00522
00523
00524 if ( (LastListenerPulse + 10 + 3) < time(NULL) )
00525 {
00526 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), 0, "listener thread stopped" );
00527 throw worm_exception("Listener thread stopped pulsing");
00528 }
00529
00530 break;
00531 }
00532 }
00533 catch( worm_exception _we )
00534 {
00535 return true;
00536 }
00537
00538 return false;
00539 }
00540
00541
00542
00543 void MutableServerBase::StartThreadFunc( void * p_arg )
00544 {
00545 switch ( *((int *)p_arg) )
00546 {
00547 case THR_KEY_LISTENER:
00548 Listener(p_arg);
00549 break;
00550
00551 case THR_KEY_STACKER:
00552 Stacker();
00553 break;
00554
00555 case THR_KEY_HANDLER:
00556 Handler();
00557 break;
00558
00559 default:
00560 ClientServicer( p_arg );
00561 break;
00562 }
00563 }
00564
00565
00566
00567
00568
00569 WORM_STATUS_CODE MutableServerBase::MainThreadActions()
00570 {
00571 static long CurrentTime
00572 , LastBeatTime
00573 ;
00574
00575
00576
00577 Running = true;
00578
00579
00580
00581
00582 int _arg;
00583
00584 switch( Mode )
00585 {
00586 case MMT_MODULE:
00587
00588 if ( (MessageMutex = new TMutex("msbmbmtx")) == NULL )
00589 {
00590 throw worm_exception("Failed creating Message buffer mutex");
00591 }
00592
00593
00594
00595 LastHandlerPulse = time(NULL) + 5;
00596 _arg = THR_KEY_HANDLER;
00597 if ( StartThreadWithArg( THREAD_STACK, &HandlerThreadId, &_arg ) == WORM_STAT_FAILURE )
00598 {
00599 throw worm_exception("Server failed starting handler thread");
00600 }
00601
00602 TTimeFuncs::MSecSleep(300);
00603
00604
00605
00606 LastStackerPulse = time(NULL) + 5;
00607 _arg = THR_KEY_STACKER;
00608 if ( StartThreadWithArg( THREAD_STACK, &StackerThreadId, &_arg ) == WORM_STAT_FAILURE )
00609 {
00610 throw worm_exception("Server failed starting stacker thread");
00611 }
00612
00613 TTimeFuncs::MSecSleep(250);
00614
00615 break;
00616
00617 case MMT_SERVER:
00618
00619 if ( SocketDebug )
00620 {
00621
00622 setSocket_ewDebug(1);
00623 }
00624
00625
00626
00627 LastListenerPulse = time(NULL) + 5;
00628 _arg = THR_KEY_LISTENER;
00629 if ( StartThreadWithArg( THREAD_STACK, &ListenerThreadId, &_arg ) == WORM_STAT_FAILURE )
00630 {
00631 throw worm_exception("Server failed starting listener thread");
00632 }
00633
00634 TTimeFuncs::MSecSleep(250);
00635
00636 break;
00637 }
00638
00639
00640 int shutdown_flag;
00641
00642
00643
00644 time(&CurrentTime);
00645 LastBeatTime = CurrentTime;
00646
00647
00648 while ( Running )
00649 {
00650
00651
00652 if ( CommandRingKey != WORM_RING_INVALID )
00653 {
00654
00655 shutdown_flag = tport_getflag(&CommandRegion);
00656 if( shutdown_flag == TERMINATE || shutdown_flag == (int)TGlobalUtils::GetPID() )
00657 {
00658 Running = false;
00659 continue;
00660 }
00661
00662
00663 if( TGlobalUtils::GetHeartbeatInt() <= (time(&CurrentTime) - LastBeatTime) )
00664 {
00665 LastBeatTime = CurrentTime;
00666 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_HEARTBEAT"), 0, "" );
00667 }
00668 }
00669
00670
00671
00672
00673 if ( TGlobalUtils::GetTerminateFlag() )
00674 {
00675 Running = false;
00676 continue;
00677 }
00678
00679
00680 if ( CheckForThreadDeath() )
00681 {
00682 Running = false;
00683 }
00684 else
00685 {
00686 TTimeFuncs::MSecSleep(500);
00687 }
00688
00689 }
00690
00691
00692 Running = false;
00693
00694
00695 return WORM_STAT_SUCCESS;
00696 }
00697
00698
00699
00700 WORM_STATUS_CODE MutableServerBase::Run( int argc, char* argv[] )
00701 {
00702 WORM_STATUS_CODE r_status = WORM_STAT_SUCCESS;
00703
00704 MutableServerRequest * _requestcontainer = NULL;
00705
00706 MutableServerResult * _resultcontainer = NULL;
00707
00708 try
00709 {
00710 if ( ConfigState != WORM_STAT_SUCCESS )
00711 {
00712 throw worm_exception("MutableServerBase::Run() not properly configured");
00713 }
00714
00715
00716
00717 signal(SIGINT, (SIG_HANDLR_PTR)SignalHandler);
00718 signal(SIGTERM, (SIG_HANDLR_PTR)SignalHandler);
00719 signal(SIGABRT, (SIG_HANDLR_PTR)SignalHandler);
00720 #ifdef SIGBREAK
00721 signal(SIGBREAK, (SIG_HANDLR_PTR)SignalHandler);
00722 #endif
00723
00724 #ifdef _SOLARIS
00725
00726 (void)sigignore(SIGPIPE);
00727 #endif
00728
00729
00730
00731
00732
00733
00734 if ( ! PrepareToRun() )
00735 {
00736 throw worm_exception("PrepareToRun() returned not ready");
00737 }
00738
00739
00740 switch( Mode )
00741 {
00742 case MMT_CLIENT:
00743
00744 LoggingOptions = WORM_LOG_TOFILE;
00745
00746 if ( (_requestcontainer = GetRequestContainer()) == NULL )
00747 {
00748 throw worm_exception("Failed creating request container");
00749 }
00750
00751 if ( GetRequestFromInput( argc, argv, _requestcontainer ) != WORM_STAT_SUCCESS )
00752 {
00753 throw worm_exception("GetRequestFromInput() returned error");
00754 }
00755
00756 if ( (_resultcontainer = GetResultContainer()) == NULL )
00757 {
00758 throw worm_exception("Failed creating result container");
00759 }
00760
00761 if ( TransmitRequest( _requestcontainer, _resultcontainer ) != WORM_STAT_SUCCESS )
00762 {
00763 throw worm_exception("TransmitRequest() returned error");
00764 }
00765
00766
00767
00768
00769
00770
00771 if ( (r_status = HandleResult( _resultcontainer )) == WORM_STAT_FAILURE )
00772 {
00773 throw worm_exception("HandleResult() returned error");
00774 }
00775
00776 break;
00777
00778
00779 case MMT_STANDALONE:
00780
00781 LoggingOptions = WORM_LOG_TOFILE;
00782
00783 if ( (_requestcontainer = GetRequestContainer()) == NULL )
00784 {
00785 throw worm_exception("Failed creating request container");
00786 }
00787
00788 if ( GetRequestFromInput( argc, argv, _requestcontainer ) != WORM_STAT_SUCCESS )
00789 {
00790 throw worm_exception("GetRequestFromInput() returned error");
00791 }
00792
00793 if ( (_resultcontainer = GetResultContainer()) == NULL )
00794 {
00795 throw worm_exception("Failed creating result container");
00796 }
00797
00798 if ( (r_status = ProcessRequest(_requestcontainer, _resultcontainer)) == WORM_STAT_SUCCESS )
00799 {
00800 if ( HandleResult( _resultcontainer ) != WORM_STAT_SUCCESS )
00801 {
00802 throw worm_exception("HandleResult() returned error");
00803 }
00804 }
00805
00806 break;
00807
00808
00809 case MMT_MODULE:
00810
00811 LoggingOptions = WORM_LOG_TOFILE|WORM_LOG_TOSTDERR;
00812
00813 tport_attach( &CommandRegion, CommandRingKey );
00814
00815 MainThreadActions();
00816
00817 tport_detach( &CommandRegion );
00818
00819 break;
00820
00821
00822 case MMT_SERVER:
00823
00824 LoggingOptions = WORM_LOG_TOFILE|WORM_LOG_TOSTDERR;
00825
00826 if ( CommandRingKey != WORM_RING_INVALID )
00827 {
00828 tport_attach( &CommandRegion, CommandRingKey );
00829 }
00830
00831 MainThreadActions();
00832
00833 if ( CommandRingKey != WORM_RING_INVALID )
00834 {
00835 tport_detach( &CommandRegion );
00836 }
00837
00838 break;
00839 }
00840
00841 }
00842 catch( worm_exception & _we )
00843 {
00844 Running = false;
00845 TLogger::Logit( LoggingOptions
00846 , "MutableServerBase::Run() Error: %s\n"
00847 , _we.what()
00848 );
00849 r_status = WORM_STAT_FAILURE;
00850 }
00851 catch( ... )
00852 {
00853 Running = false;
00854 r_status = WORM_STAT_FAILURE;
00855 }
00856
00857 if ( _requestcontainer != NULL )
00858 {
00859 delete( _requestcontainer );
00860 }
00861
00862 if ( _resultcontainer != NULL )
00863 {
00864 delete( _resultcontainer );
00865 }
00866
00867 return r_status;
00868 }
00869
00870
00871
00872 WORM_STATUS_CODE MutableServerBase::TransmitRequest( MutableServerRequest * p_request
00873 , MutableServerResult * r_result
00874 )
00875 {
00876 WORM_STATUS_CODE r_status = WORM_STAT_SUCCESS;
00877
00878 SOCKET _clientsocket = INVALID_SOCKET;
00879
00880 char * _buffer = NULL;
00881
00882 try
00883 {
00884 if ( p_request == NULL || r_result == NULL )
00885 {
00886 throw worm_exception("NULL parameter pointer");
00887 }
00888
00889 switch( Mode )
00890 {
00891 case MMT_CLIENT:
00892
00893 {
00894 int _lastprovider = 0
00895 , _currprovider = 0
00896 ;
00897 PROVIDER_ADDR _provider_addr;
00898 struct sockaddr_in _socket_addr;
00899
00900
00901 bool _needresult = true;
00902
00903
00904
00905 _currprovider = _lastprovider;
00906
00907
00908
00909 for ( int _pass = 0 ; _needresult && _pass < MaxServerTryLoopCount ; _pass++ )
00910 {
00911
00912 if ( _pass != 0 )
00913 {
00914
00915 TTimeFuncs::MSecSleep(250);
00916 }
00917
00918
00919 do
00920 {
00921 _currprovider++;
00922
00923
00924 if ( Providers.size() <= _currprovider )
00925 {
00926 _currprovider = 0;
00927 }
00928
00929 _provider_addr = Providers[_currprovider];
00930
00931
00932 memset( (char *)&_socket_addr, 0, sizeof(_socket_addr) );
00933 _socket_addr.sin_family = AF_INET;
00934 _socket_addr.sin_port = htons( (short)atoi(_provider_addr.Port.c_str()) );
00935
00936 if ( (int)(_socket_addr.sin_addr.S_un.S_addr = inet_addr(_provider_addr.IPAddr.c_str())) == INADDR_NONE )
00937 {
00938 worm_exception _expt("inet_addr failed for ");
00939 _expt += _provider_addr.IPAddr;
00940 _expt += ":";
00941 _expt += _provider_addr.Port;
00942 throw _expt;
00943 }
00944
00945 if( WORM_LOG_DEBUG <= TGlobalUtils::GetLoggingLevel() )
00946 {
00947 TLogger::Logit( LoggingOptions
00948 , "Attempting connection to provider %d: %s:%s\n"
00949 , _currprovider
00950 , _provider_addr.IPAddr.c_str()
00951 , _provider_addr.Port.c_str()
00952 );
00953 }
00954
00955
00956 if ( ( _clientsocket = socket_ew( AF_INET, SOCK_STREAM, 0) ) == INVALID_SOCKET )
00957 {
00958 throw worm_exception("socket_ew() failed");
00959 }
00960
00961 if ( _clientsocket == SOCKET_ERROR )
00962 {
00963 _clientsocket = INVALID_SOCKET;
00964 throw worm_exception("socket_ew() failed");
00965 }
00966
00967
00968 if ( connect_ew( _clientsocket
00969 , (struct sockaddr * )&_socket_addr
00970 , sizeof(_socket_addr)
00971 , ConnectFailureSec
00972 ) != 0 )
00973 {
00974 if( WORM_LOG_DETAILS <= TGlobalUtils::GetLoggingLevel() )
00975 {
00976 TLogger::Logit( LoggingOptions
00977 , "connect_ew(): failed for %s:%s\n"
00978 , _provider_addr.IPAddr.c_str()
00979 , _provider_addr.Port.c_str()
00980 );
00981 }
00982 }
00983 else
00984 {
00985
00986
00987
00988
00989 if( WORM_LOG_DEBUG <= TGlobalUtils::GetLoggingLevel() )
00990 {
00991 TLogger::Logit( LoggingOptions
00992 , "Connected to %s:%s\n"
00993 , _provider_addr.IPAddr.c_str()
00994 , _provider_addr.Port.c_str()
00995 );
00996 }
00997
00998
00999 if ( (_buffer = new char[GetMaxSocketBufferSize()]) == NULL )
01000 {
01001 throw worm_exception("failed to allocate socket buffer");
01002 }
01003
01004
01005
01006
01007 ServiceThreadInfoStruct _thr_info;
01008
01009 _thr_info.descriptor = _clientsocket;
01010 strcpy ( _thr_info.ipaddr , _provider_addr.IPAddr.c_str() );
01011
01012 ThreadsInfo[_clientsocket] = _thr_info;
01013
01014
01015 p_request->FormatBuffer();
01016
01017 int _msglen = p_request->GetBufferLength();
01018
01019 strcpy( _buffer, p_request->GetBuffer() );
01020
01021
01022
01023 TTimeFuncs::MSecSleep(250);
01024
01025
01026 if( WORM_LOG_DEBUG <= TGlobalUtils::GetLoggingLevel() )
01027 {
01028 TLogger::Logit( LoggingOptions
01029 , "sending to descriptor %d (%d); message [%d]:\n%s<\n"
01030 , (int)_clientsocket
01031 , SendTimeoutMS
01032 , _msglen
01033 , _buffer
01034 );
01035 }
01036
01037
01038 if ( SendMessage( _clientsocket
01039 , _buffer
01040 , &_msglen
01041 )
01042 == WORM_STAT_FAILURE )
01043 {
01044 throw worm_exception("Failed sending request");
01045 }
01046
01047
01048
01049 time_t _quit_time = time(NULL)
01050 + ( RecvTimeoutMS / 1000 < 5 ? 5 : RecvTimeoutMS / 1000 )
01051 ;
01052
01053
01054 bool _readfinished = false;
01055
01056 do
01057 {
01058
01059
01060 _msglen = GetMaxSocketBufferSize();
01061
01062 switch( ListenForMsg( _clientsocket
01063 , _buffer
01064 , &_msglen
01065 , 500
01066 ) )
01067 {
01068 case 0:
01069 _readfinished = r_result->ParseMessageLine( _buffer );
01070 break;
01071 case -1:
01072 if ( _quit_time < time(NULL) )
01073 {
01074 throw worm_exception("timed out waiting for server result");
01075 }
01076 break;
01077 case -2:
01078 throw worm_exception("Server closed socket");
01079 case -3:
01080 throw worm_exception("Socket error");
01081 case -4:
01082 throw worm_exception("Server response too large for buffer");
01083 }
01084
01085
01086 } while( ! _readfinished );
01087
01088
01089
01090
01091
01092 if ( r_result->GetStatus() != MSB_RESULT_BUSY )
01093 {
01094 _needresult = false;
01095 }
01096
01097 }
01098
01099
01100 } while( _needresult
01101 && _currprovider != _lastprovider
01102 );
01103
01104 }
01105
01106
01107 if ( _needresult )
01108 {
01109 if( WORM_LOG_STATUS <= TGlobalUtils::GetLoggingLevel()
01110 && TGlobalUtils::GetLoggingLevel() < WORM_LOG_DETAILS
01111 )
01112 {
01113 TLogger::Logit( LoggingOptions
01114 , "Unable to establish connection to providers:\n"
01115 );
01116 for ( int _p = 0 ; _p < Providers.size() ; _p )
01117 {
01118 TLogger::Logit( LoggingOptions
01119 , " %s:%s\n"
01120 , Providers[_p].IPAddr.c_str()
01121 , Providers[_p].Port.c_str()
01122 );
01123 }
01124 }
01125 r_status = WORM_STAT_DISCONNECT;
01126 }
01127
01128
01129 }
01130 break;
01131
01132 default:
01133
01134 {
01135 worm_exception _expt("Invalid call for ");
01136 _expt += MSB_MODE_NAME[Mode];
01137 _expt += " mode.";
01138 throw _expt;
01139 }
01140 }
01141 }
01142 catch( worm_exception & _we )
01143 {
01144 r_status = WORM_STAT_FAILURE;
01145
01146 if( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
01147 {
01148 TLogger::Logit( LoggingOptions
01149 , "MutableServerBase::TransmitRequest(): %s\n"
01150 , _we.what()
01151 );
01152 }
01153 }
01154 catch( ... )
01155 {
01156 r_status = WORM_STAT_FAILURE;
01157
01158 if( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
01159 {
01160 TLogger::Logit( LoggingOptions
01161 , "MutableServerBase::TransmitRequest(): Unknown error\n"
01162 );
01163 }
01164 }
01165
01166
01167 if ( _clientsocket != INVALID_SOCKET )
01168 {
01169 closesocket_ew( _clientsocket, SOCKET_CLOSE_IMMEDIATELY_EW );
01170 }
01171
01172 return r_status;
01173 }
01174
01175
01176
01177 WORM_STATUS_CODE MutableServerBase::TransmitResult( MutableServerResult * p_result
01178 , const SOCKET * p_socketdescriptor
01179 )
01180 {
01181 WORM_STATUS_CODE r_status = WORM_STAT_SUCCESS;
01182
01183 MutableServerResult * _workresult = p_result;
01184 bool _myresult = false;
01185
01186 try
01187 {
01188 int _neededlength;
01189
01190 switch( Mode )
01191 {
01192 case MMT_SERVER:
01193
01194 if ( p_socketdescriptor == NULL )
01195 {
01196 throw worm_exception("SOCKET pointer NULL");
01197 }
01198 else
01199 {
01200 int _msglen;
01201
01202 SOCKET _socket = (SOCKET)*p_socketdescriptor;
01203
01204 if ( _workresult == NULL )
01205 {
01206 if ( WORM_LOG_ERRORS <= LoggingLevel )
01207 {
01208 TLogger::Logit( LoggingOptions
01209 , "NULL result container pointer\n"
01210 );
01211 }
01212 if ( (_workresult = GetResultContainer()) == NULL )
01213 {
01214 throw worm_exception("Failed to create work result buffer for error");
01215 }
01216 _myresult = true;
01217 _workresult->SetStatus( MSB_RESULT_ERROR );
01218 }
01219
01220 _workresult->FormatBuffer();
01221 _msglen = p_result->GetBufferLength();
01222
01223
01224 TLogger::Logit( LoggingOptions
01225 , "DEBUG [%d] returning status to client at %d; message [%d]:\n%s<\n"
01226 , SendTimeoutMS
01227 , (int)_socket
01228 , _msglen
01229 , p_result->GetBuffer()
01230 );
01231
01232 if ( SendMessage( _socket
01233 , p_result->GetBuffer()
01234 , &_msglen
01235 ) != WORM_STAT_SUCCESS
01236 )
01237 {
01238 throw worm_exception("Failed sending result across socket");
01239 }
01240 }
01241 break;
01242
01243 case MMT_MODULE:
01244
01245
01246
01247 if ( p_result == NULL )
01248 {
01249
01250 _neededlength = 4;
01251 }
01252 else
01253 {
01254 p_result->FormatBuffer();
01255 _neededlength = p_result->GetBufferLength();
01256 }
01257
01258 if ( TransmitBufferLength < _neededlength )
01259 {
01260 if ( TransmitBuffer != NULL )
01261 {
01262 delete [] TransmitBuffer;
01263 }
01264 TransmitBuffer = NULL;
01265 }
01266
01267 if ( TransmitBuffer == NULL )
01268 {
01269 if ( (TransmitBuffer = new char[p_result->GetBufferLength()+100]) == NULL )
01270 {
01271 throw worm_exception("Failed allocating transmit buffer");
01272 }
01273 TransmitBufferLength = p_result->GetBufferLength()+100;
01274 }
01275
01276
01277 TransmitBuffer[0] = '\0';
01278
01279 if ( p_result == NULL )
01280 {
01281 strcpy( TransmitBuffer , "-1\n\n" );
01282 }
01283 else
01284 {
01285 strcpy( TransmitBuffer , p_result->GetBuffer() );
01286 }
01287
01288 if ( tport_putmsg( OutputRegion
01289 , &ResultMsgLogo
01290 , _neededlength
01291 , TransmitBuffer
01292 )
01293 != PUT_OK
01294 )
01295 {
01296 throw worm_exception("tport_putmsg() failed");
01297 }
01298
01299 break;
01300
01301 default:
01302
01303 {
01304 worm_exception _expt("invalid call for ");
01305 _expt += MSB_MODE_NAME[Mode];
01306 _expt += " mode.";
01307 throw _expt;
01308 }
01309 }
01310 }
01311 catch( worm_exception & _we )
01312 {
01313 r_status = WORM_STAT_FAILURE;
01314
01315 if( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
01316 {
01317 TLogger::Logit( LoggingOptions
01318 , "MutableServerBase::TransmitResult(): %s\n"
01319 , _we.what()
01320 );
01321 }
01322 }
01323
01324 if ( _myresult && _workresult != NULL )
01325 {
01326 delete ( _workresult );
01327 }
01328
01329 return r_status;
01330 }
01331
01332
01333
01334 THREAD_RETURN MutableServerBase::Stacker()
01335 {
01336 static InstallWildcard = TGlobalUtils::LookupInstallationId("INST_WILDCARD");
01337 static ModuleWildcard = TGlobalUtils::LookupModuleId("MOD_WILDCARD");
01338 static MessageWildcard = TGlobalUtils::LookupMessageTypeId("TYPE_WILDCARD");
01339
01340 int _getmsg_status;
01341 MSG_LOGO _arrivelogo;
01342 long _arr_msg_len;
01343 bool _msg_ready;
01344 char _errormsg[300];
01345
01346 bool _attached = false;
01347
01348 TLogger::Logit( LoggingOptions
01349 , "MutableServerBase::Stacker(): A\n"
01350 );
01351
01352 try
01353 {
01354 LastStackerPulse = time(NULL);
01355
01356
01357 if ( InputRingKey == CommandRingKey )
01358 {
01359 InputRegion = &CommandRegion;
01360 }
01361 else
01362 {
01363 InputRegion = &InputRegionStruct;
01364 tport_attach( InputRegion, InputRingKey );
01365 _attached = true;
01366 }
01367
01368
01369
01370
01371 do
01372 {
01373 _getmsg_status = tport_getmsg( InputRegion
01374 , ServeLogo
01375 , ServeLogoCount
01376 , &_arrivelogo
01377 , &_arr_msg_len
01378 , MessageBuffer
01379 , MaxMessageLength
01380 );
01381 }
01382 while ( _getmsg_status != GET_NONE );
01383
01384
01385
01386
01387 while ( Running )
01388 {
01389 LastStackerPulse = time(NULL);
01390
01391 _msg_ready = true;
01392
01393 switch( (_getmsg_status = tport_getmsg( InputRegion
01394 , ServeLogo
01395 , ServeLogoCount
01396 , &_arrivelogo
01397 , &_arr_msg_len
01398 , MessageBuffer
01399 , MaxMessageLength
01400 ))
01401 )
01402 {
01403 case GET_OK:
01404 break;
01405
01406 case GET_MISS:
01407 case GET_MISS_LAPPED:
01408
01409 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), MSB_ERR_MISSMSG, "missed messages" );
01410 break;
01411
01412 case GET_MISS_SEQGAP:
01413
01414 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), MSB_ERR_MISSMSG, "saw sequence gap" );
01415 break;
01416
01417 case GET_NOTRACK:
01418
01419 sprintf( _errormsg
01420 , "no tracking for logo i%d m%d t%d in %s"
01421 , (int)_arrivelogo.instid
01422 , (int)_arrivelogo.mod
01423 , (int)_arrivelogo.type
01424 , InputRingName
01425 );
01426 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), MSB_ERR_NOTRACK, _errormsg );
01427 break;
01428
01429 case GET_TOOBIG:
01430
01431 sprintf( _errormsg
01432 , "tport msg[%ld] i%d m%d t%d too big. Max is %ld"
01433 , _arr_msg_len
01434 , (int)_arrivelogo.instid
01435 , (int)_arrivelogo.mod
01436 , (int)_arrivelogo.type
01437 , MaxMessageLength
01438 );
01439 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), MSB_ERR_TOOBIG, _errormsg );
01440
01441 _msg_ready = false;
01442
01443 break;
01444
01445 case GET_NONE:
01446
01447 TTimeFuncs::MSecSleep(500);
01448 _msg_ready = false;
01449
01450 break;
01451
01452 }
01453
01454
01455 if ( _msg_ready )
01456 {
01457
01458 MessageBuffer[_arr_msg_len] = 0;
01459
01460
01461 if ( MessageBuffer[_arr_msg_len-1] == '\n' )
01462 {
01463 _arr_msg_len--;
01464 MessageBuffer[_arr_msg_len] = 0;
01465 }
01466
01467 for ( int _i = 0, _sz = ServeLogoCount ; _i < _sz ; _i++ )
01468 {
01469 if ( ( ServeLogo[_i].instid == InstallWildcard
01470 || _arrivelogo.instid == InstallWildcard
01471 || _arrivelogo.instid == ServeLogo[_i].instid
01472 )
01473 && ( ServeLogo[_i].mod == ModuleWildcard
01474 || _arrivelogo.mod == ModuleWildcard
01475 || _arrivelogo.mod == ServeLogo[_i].mod
01476 )
01477 && ( ServeLogo[_i].type == MessageWildcard
01478 || _arrivelogo.type == MessageWildcard
01479 || _arrivelogo.type == ServeLogo[_i].type
01480 )
01481 )
01482 {
01483 std::string _newmsg = MessageBuffer;
01484
01485
01486 MessageQueue.push( _newmsg );
01487
01488 break;
01489 }
01490 }
01491 }
01492
01493 }
01494
01495 }
01496 catch( worm_exception & _we )
01497 {
01498 TLogger::Logit( LoggingOptions
01499 , "MutableServerBase::Stacker() Error: %s"
01500 , _we.what()
01501 );
01502 TransmitResult( NULL );
01503 }
01504 catch( ... )
01505 {
01506 TLogger::Logit( LoggingOptions
01507 , "MutableServerBase::Stacker() Unknown Error"
01508 );
01509 TransmitResult( NULL );
01510 }
01511
01512 if ( _attached )
01513 {
01514 tport_detach( InputRegion );
01515 }
01516 }
01517
01518
01519
01520 THREAD_RETURN MutableServerBase::Handler()
01521 {
01522
01523 bool _attached = false;
01524
01525 std::string _message;
01526
01527 MutableServerRequest * _requestcontainer = NULL;
01528
01529 MutableServerResult * _resultcontainer = NULL;
01530
01531 try
01532 {
01533 LastHandlerPulse = time(NULL);
01534
01535
01536 if ( (_requestcontainer = GetRequestContainer()) == NULL )
01537 {
01538 throw worm_exception("failed to create request container");
01539 }
01540
01541 if ( (_resultcontainer = GetResultContainer()) == NULL )
01542 {
01543 throw worm_exception("failed to create result container");
01544 }
01545
01546
01547 if ( OutputRingKey == CommandRingKey )
01548 {
01549 OutputRegion = &CommandRegion;
01550 }
01551 else if ( OutputRingKey == InputRingKey )
01552 {
01553 OutputRegion = &InputRegionStruct;
01554 }
01555 else
01556 {
01557 OutputRegion = &OutputRegionStruct;
01558 tport_attach( OutputRegion, OutputRingKey );
01559 _attached = true;
01560 }
01561
01562
01563 WORM_STATUS_CODE _processing_state;
01564
01565
01566 while ( Running )
01567 {
01568 LastHandlerPulse = time(NULL);
01569
01570 if ( MessageQueue.empty() )
01571 {
01572 TTimeFuncs::MSecSleep(500);
01573 }
01574 else
01575 {
01576
01577 _message = MessageQueue.front();
01578
01579
01580 MessageQueue.pop();
01581
01582 try
01583 {
01584 _requestcontainer->ParseFromBuffer( _message.c_str() );
01585 }
01586 catch( worm_exception & _we )
01587 {
01588 TLogger::Logit( LoggingOptions
01589 , "MutableServerBase::Handler() Error parsing message:\n%s\n%s\n"
01590 , _message.c_str()
01591 , _we.what()
01592 );
01593
01594
01595 continue;
01596 }
01597
01598
01599 _processing_state = ProcessRequest( _requestcontainer, _resultcontainer );
01600
01601 if ( _processing_state == WORM_STAT_FAILURE )
01602 {
01603 throw worm_exception("ProcessRequest() returned error");
01604 }
01605
01606 if ( TransmitResult( _resultcontainer ) != WORM_STAT_SUCCESS )
01607 {
01608
01609
01610 throw worm_exception("ParseRequestFromMessage() returned error");
01611 }
01612
01613 }
01614
01615 }
01616
01617 }
01618 catch( worm_exception & _we )
01619 {
01620 TLogger::Logit( LoggingOptions
01621 , "MutableServerBase::Handler() Error: %s\n"
01622 , _we.what()
01623 );
01624 TransmitResult( NULL );
01625 }
01626 catch( ... )
01627 {
01628 TLogger::Logit( LoggingOptions
01629 , "MutableServerBase::Handler() Unknown Error\n"
01630 );
01631 TransmitResult( NULL );
01632 }
01633
01634
01635 if ( _attached )
01636 {
01637 tport_detach( OutputRegion );
01638 }
01639
01640
01641 if ( _requestcontainer != NULL )
01642 {
01643 delete( _requestcontainer );
01644 }
01645
01646 if ( _resultcontainer != NULL )
01647 {
01648 delete( _resultcontainer );
01649 }
01650 }
01651
01652
01653
01654 THREAD_RETURN MutableServerBase::ClientServicer( void * p_socketdescriptor )
01655 {
01656 if ( p_socketdescriptor == NULL )
01657 {
01658 TLogger::Logit( LoggingOptions
01659 , "MutableServerBase::ClientServicer() Error: NULL descriptor parameter\n"
01660 );
01661 return;
01662 }
01663
01664 char * _buffer = NULL;
01665
01666 MutableServerRequest * _requestcontainer = NULL;
01667
01668 MutableServerResult * _resultcontainer = NULL;
01669
01670
01671 SOCKET _mapkey = *((SOCKET *)p_socketdescriptor);
01672
01673 bool _send_error_message = false;
01674
01675 TLogger::Logit( LoggingOptions
01676 , "MutableServerBase::ClientServicer(): descriptor %d\n"
01677 , (int)_mapkey
01678 );
01679
01680 try
01681 {
01682
01683 ThreadsInfo[_mapkey].lastpulse = time(NULL);
01684 ThreadsInfo[_mapkey].status = THREAD_INITIALIZING;
01685
01686 if ( ThreadsInfo.size() == MaxClients )
01687 {
01688
01689
01690
01691 if ( (_resultcontainer = GetResultContainer()) == NULL )
01692 {
01693 throw worm_exception("Failed creating result container");
01694 }
01695
01696 ThreadsInfo[_mapkey].lastpulse = time(NULL);
01697 ThreadsInfo[_mapkey].status = THREAD_WAITING;
01698
01699 _resultcontainer->SetStatus( MSB_RESULT_BUSY );
01700
01701 if ( TransmitResult( _resultcontainer, &_mapkey ) != WORM_STAT_SUCCESS )
01702 {
01703 throw worm_exception("TransmitResult() returned error");
01704 }
01705 }
01706 else
01707 {
01708 if ( (_buffer = new char[GetMaxSocketBufferSize()+1]) == NULL )
01709 {
01710 throw worm_exception("Failed getting request buffer");
01711 }
01712
01713 if ( (_requestcontainer = GetRequestContainer()) == NULL )
01714 {
01715 throw worm_exception("Failed creating request container");
01716 }
01717
01718 if ( (_resultcontainer = GetResultContainer()) == NULL )
01719 {
01720 throw worm_exception("Failed creating result container");
01721 }
01722
01723 ThreadsInfo[_mapkey].lastpulse = time(NULL);
01724 ThreadsInfo[_mapkey].status = THREAD_WAITING;
01725
01726
01727 bool _messagefinished = false
01728 , _readfinished = false
01729 ;
01730
01731 int _length;
01732
01733
01734
01735
01736 time_t _quit_time = time(NULL)
01737 + ( RecvTimeoutMS / 1000 < 5 ? 5 : RecvTimeoutMS / 1000 )
01738 ;
01739
01740 if ( WORM_LOG_STATUS <= LoggingLevel )
01741 {
01742 TLogger::Logit( LoggingOptions
01743 , "MutableServerBase::ClientServicer(): calling ListenForMsg(,,,%d)\n"
01744 , RecvTimeoutMS
01745 );
01746 }
01747
01748
01749 do
01750 {
01751
01752
01753
01754 _length = GetMaxSocketBufferSize();
01755
01756 switch ( ListenForMsg( _mapkey
01757 , _buffer
01758 , &_length
01759 , 500
01760 )
01761 )
01762 {
01763 case 0:
01764 _buffer[_length] = '\0';
01765
01766 if ( WORM_LOG_ERRORS <= LoggingLevel )
01767 {
01768 TLogger::Logit( LoggingOptions
01769 , "MutableServerBase::ClientServicer(): received msg line: %s\n"
01770 , _buffer
01771 );
01772 }
01773
01774 _readfinished = _messagefinished = _requestcontainer->ParseMessageLine( _buffer );
01775
01776
01777
01778
01779
01780
01781
01782
01783
01784
01785 break;
01786
01787 case -1:
01788 if ( _quit_time < time(NULL) )
01789 {
01790 _send_error_message = true;
01791 throw worm_exception("Client connection timed out");
01792 }
01793 break;
01794
01795 case -2:
01796 throw worm_exception("Client closed connection");
01797
01798 case -3:
01799 throw worm_exception("Socket error");
01800
01801 case -4:
01802 _send_error_message = true;
01803 throw worm_exception("Message too large for transmit buffer");
01804 }
01805
01806
01807
01808
01809
01810
01811
01812 } while( ! _readfinished );
01813
01814 TLogger::Logit( LoggingOptions
01815 , "MutableServerBase::ClientServicer(): DEBUG N\n"
01816 );
01817
01818 ThreadsInfo[_mapkey].lastpulse = time(NULL);
01819 ThreadsInfo[_mapkey].status = THREAD_PROCESSING;
01820
01821 if ( _messagefinished )
01822 {
01823
01824 TLogger::Logit( LoggingOptions
01825 , "MutableServerBase::ClientServicer(): DEBUG O -- message finished\n"
01826 );
01827
01828 WORM_STATUS_CODE _state = ProcessRequest( _requestcontainer, _resultcontainer );
01829
01830 ThreadsInfo[_mapkey].lastpulse = time(NULL);
01831
01832 if ( _state == WORM_STAT_FAILURE )
01833 {
01834 throw worm_exception("ProcessRequest() returned error");
01835 }
01836
01837 ThreadsInfo[_mapkey].lastpulse = time(NULL);
01838
01839 if ( TransmitResult( _resultcontainer, &_mapkey ) != WORM_STAT_SUCCESS )
01840 {
01841 throw worm_exception("TransmitResult() returned error");
01842 }
01843 }
01844 else
01845 {
01846 throw worm_exception("Incomplete message received");
01847 }
01848
01849
01850 }
01851
01852 ThreadsInfo[_mapkey].status = THREAD_COMPLETED;
01853
01854 }
01855 catch( worm_exception & _we )
01856 {
01857 ThreadsInfo[_mapkey].status = THREAD_ERROR;
01858 TLogger::Logit( LoggingOptions
01859 , "MutableServerBase::ClientServicer(thr: %d) Error: %s\n"
01860 , ThreadsInfo[_mapkey].threadid
01861 , _we.what()
01862 );
01863 if ( _send_error_message )
01864 {
01865 TransmitResult( NULL, &_mapkey );
01866 }
01867 }
01868 catch( ... )
01869 {
01870 ThreadsInfo[_mapkey].status = THREAD_ERROR;
01871 TLogger::Logit( LoggingOptions
01872 , "MutableServerBase::ClientServicer(thr: %d) Unknown error\n"
01873 , ThreadsInfo[_mapkey].threadid
01874 );
01875 if ( _send_error_message )
01876 {
01877 TransmitResult( NULL, &_mapkey );
01878 }
01879 }
01880
01881
01882
01883
01884 if ( _mapkey != NULL )
01885 {
01886 closesocket_ew( _mapkey, SOCKET_CLOSE_IMMEDIATELY_EW );
01887 }
01888
01889 if ( _buffer != NULL )
01890 {
01891 delete[] _buffer;
01892 }
01893
01894 if ( _requestcontainer != NULL )
01895 {
01896 delete( _requestcontainer );
01897 }
01898
01899 if ( _resultcontainer != NULL )
01900 {
01901 delete( _resultcontainer );
01902 }
01903 }
01904