Main Page   Class Hierarchy   Compound List   File List   Compound Members   File Members  

mutableserverbase.cpp

Go to the documentation of this file.
00001 // mutableserverbase.cpp: implementation of the MutableServerBase class.
00002 //
00004 
00005 
00006 #include "mutableserverbase.h"
00007 
00008 
// Keys passed (by address) as the thread argument; StartThreadFunc()
// dispatches on them to pick the worker routine to run.
// The negative literals are parenthesized so each macro expands as a
// single operand wherever it is used in an expression.
#define THR_KEY_LISTENER (-1)
#define THR_KEY_STACKER  (-2)
#define THR_KEY_HANDLER  (-3)
00012 
00013 // this must follow the other includes
00014 extern "C" {
00015 #include <transport.h>
00016 }
00017 
00018 // Implementation of mode names,
00019 // this array must match the enum MUTABLE_MODE_TYPE
00020 // towards the top of mutableserverbase.h
00021 //
00022 const char * MutableServerBase::MSB_MODE_NAME[] =
00023 {
00024   ""
00025 , "Standalone"
00026 , "Module"
00027 , "Server"
00028 , "Client"
00029 };
00030 
00031 
00033 // Construction/Destruction
00035 
00036 MutableServerBase::MutableServerBase()
00037 {
00038    MaxMessageLength = 0;
00039    MessageBuffer = NULL;
00040 
00041    LoggingOptions = WORM_LOG_TOFILE;
00042 
00043    Mode = MMT_NONE;
00044 
00045    ConnectFailureSec = -1;
00046 
00047    MessageMutex = NULL;
00048   
00049    ServeLogoCount = 0;
00050 
00051    MaxClients = 1;
00052 
00053    MaxServerTryLoopCount = 2;
00054 
00055    TransmitBuffer = NULL;
00056    TransmitBufferLength = 0;
00057 
00058    InputRingName = "";
00059    InputRingKey = WORM_RING_INVALID;
00060    InputRegionStruct.addr = NULL;
00061    InputRegion = &InputRegionStruct;
00062    
00063    OutputRingName = "";
00064    OutputRingKey = WORM_RING_INVALID;
00065    OutputRegionStruct.addr = NULL;
00066    OutputRegion = &OutputRegionStruct;
00067       
00068 }
00069 //
00070 //-------------------------------------------------------------------
00071 //
00072 MutableServerBase::~MutableServerBase()
00073 {
00074    if (   OutputRingKey != WORM_RING_INVALID
00075        && OutputRingKey != InputRingKey
00076        && OutputRingKey != CommandRingKey
00077       )
00078    {
00079       if ( OutputRegionStruct.addr != NULL )
00080       {
00081          TLogger::Logit( LoggingOptions
00082                        , "~MutableServerBase(): detaching from output ring\n"
00083                        );
00084          tport_detach( &OutputRegionStruct );
00085          OutputRegionStruct.addr = NULL;
00086       }
00087 
00088       OutputRingKey = WORM_RING_INVALID;
00089    }
00090 
00091    if (   InputRingKey != WORM_RING_INVALID
00092        && InputRingKey != CommandRingKey
00093       )
00094    {
00095       if ( InputRegionStruct.addr != NULL )
00096       {
00097          TLogger::Logit( LoggingOptions
00098                        , "~MutableServerBase(): detaching from input ring\n"
00099                        );
00100          tport_detach( &InputRegionStruct );
00101          InputRegionStruct.addr = NULL;
00102       }
00103 
00104       InputRingKey = WORM_RING_INVALID;
00105    }
00106 
00107    if ( CommandRegion.addr != NULL )
00108    {
00109       TLogger::Logit( LoggingOptions
00110                     , "~MutableServerBase(): detaching from command ring\n"
00111                     );
00112       tport_detach( &CommandRegion );
00113       CommandRegion.addr = NULL;
00114    }
00115 
00116    if ( MessageMutex != NULL )
00117    {
00118       delete( MessageMutex );
00119    }
00120 
00121    if ( MessageBuffer != NULL )
00122    {
00123       delete [] MessageBuffer;
00124    }
00125 
00126    if ( TransmitBuffer != NULL )
00127    {
00128       delete [] TransmitBuffer;
00129    }
00130 
00131 }
00132 //
00133 //-------------------------------------------------------------------
00134 //
00135 HANDLE_STATUS MutableServerBase::HandleConfigLine( ConfigSource * p_parser )
00136 {
00137    // Do not manipulate ConfigState herein.
00138    //
00139    HANDLE_STATUS r_handled = HANDLER_USED;
00140 
00141    try
00142    {
00143       char * _token;
00144 
00145       int _ival;
00146 
00147       do
00148       {
00149          if ( p_parser->Its("ImA") )
00150          {
00151             _token = p_parser->String();
00152             if ( strlen(_token) == 0 )
00153             {
00154                throw worm_exception("Incomplete <ImA> line");
00155             }
00156 
00157             if ( p_parser->Its("StandAlone") )
00158             {
00159                Mode = MMT_STANDALONE;
00160                continue;
00161             }
00162 
00163             if ( p_parser->Its("Module") )
00164             {
00165                Mode = MMT_MODULE;
00166                continue;
00167             }
00168 
00169             if ( p_parser->Its("Server") )
00170             {
00171                Mode = MMT_SERVER;
00172                continue;
00173             }
00174 
00175             if ( p_parser->Its("Client") )
00176             {
00177                Mode = MMT_CLIENT;
00178                continue;
00179             }
00180          }
00181 
00182 
00183          // - - - - - - - - - - - - - - - - - - - - - - - - -
00184          // Client
00185          //
00186          if ( p_parser->Its("Provider") )
00187          {
00188             PROVIDER_ADDR _provider;
00189 
00190             _token = p_parser->String();
00191             if ( strlen(_token) == 0 )
00192             {
00193                throw worm_exception("Incomplete <Provider> line");
00194             }
00195             _provider.IPAddr = _token;
00196 
00197             _token = p_parser->String();
00198             if ( strlen(_token) == 0 )
00199             {
00200                throw worm_exception("Incomplete <Provider> line");
00201             }
00202             _provider.Port = _token;
00203 
00204             Providers.push_back( _provider );
00205 
00206             continue;
00207          }
00208 
00209          if ( p_parser->Its("ConnectFailDelay") )
00210          {
00211             if ( (_ival = p_parser->Int()) == p_parser->INVALID_INT )
00212             {
00213                throw worm_exception("Invalid or incomplete <ConnectFailDelay> line");
00214             }
00215             ConnectFailureSec = _ival;
00216 
00217             continue;
00218          }
00219 
00220 
00221          // - - - - - - - - - - - - - - - - - - - - - - - - -
00222          // Server
00223          //
00224          // see WormServerBase::HandleConfigLine()
00225 
00226 
00227          // - - - - - - - - - - - - - - - - - - - - - - - - -
00228          // Module
00229          //
00230          if ( p_parser->Its("InputRing") )
00231          {
00232             _token = p_parser->String();
00233             if ( strlen(_token) == 0 )
00234             {
00235                throw worm_exception("Missing <InputRing> value");
00236             }
00237             InputRingName = _token;
00238 
00239             if ( (InputRingKey = TGlobalUtils::LookupRingKey(InputRingName.c_str())) == WORM_RING_INVALID )
00240             {
00241                throw worm_exception("invalid <InputRing> value");
00242             }
00243 
00244             continue;
00245          }
00246 
00247          if ( p_parser->Its("OutputRing") )
00248          {
00249             _token = p_parser->String();
00250             if ( strlen(_token) == 0 )
00251             {
00252                throw worm_exception("missing <OutputRing> value");
00253             }
00254             OutputRingName = _token;
00255 
00256             if ( (OutputRingKey = TGlobalUtils::LookupRingKey(OutputRingName.c_str())) == WORM_RING_INVALID )
00257             {
00258                throw worm_exception("invalid <OutputRing> value");
00259             }
00260 
00261             continue;
00262          }
00263 
00264          if ( p_parser->Its("AcceptLogo") )
00265          {
00266             if ( ServeLogoCount == SERVE_MAX_LOGOS )
00267             {
00268                throw worm_exception("too many <AcceptLogo> lines");
00269             }
00270 
00271             // Institution
00272 
00273             _token = p_parser->String();
00274             if ( strlen(_token) == 0 )
00275             {
00276                throw worm_exception("missing <AcceptLogo> institution value");
00277             }
00278 
00279             ServeLogo[ServeLogoCount].instid = (WORM_INSTALLATION_ID)TGlobalUtils::LookupInstallationId(_token);
00280 
00281             // Module
00282 
00283             _token = p_parser->String();
00284             if ( strlen(_token) == 0 )
00285             {
00286                throw worm_exception("missing <AcceptLogo> module value");
00287             }
00288 
00289             ServeLogo[ServeLogoCount].mod = (WORM_MODULE_ID)TGlobalUtils::LookupModuleId(_token);
00290 
00291             // Type
00292 
00293             _token = p_parser->String();
00294             if ( strlen(_token) == 0 )
00295             {
00296                throw worm_exception("missing <AcceptLogo> type value");
00297             }
00298 
00299             ServeLogo[ServeLogoCount].type = (WORM_MSGTYPE_ID)TGlobalUtils::LookupMessageTypeId(_token);
00300 
00301             ServeLogoCount++;
00302 
00303             continue;
00304          }
00305 
00306 
00307          // - - - - - - - - - - - - - - - - - - - - - - - - -
00308          // Module or Server
00309          //
00310 
00311          if ( p_parser->Its("MaxMessageLength") )
00312          {
00313             int _wrkint = p_parser->Int();
00314             if ( _wrkint == ConfigSource::INVALID_INT )
00315             {
00316                throw worm_exception("Missing or invalid <MaxMessageLength> value");
00317             }
00318             MaxMessageLength = _wrkint;
00319 
00320             if ( (MessageBuffer = new char[MaxMessageLength]) == NULL )
00321             {
00322                throw worm_exception("failed allocating MessageBuffer");
00323             }
00324          }
00325 
00326 
00327          // Give the base class a whack at it
00328 
00329          r_handled = WormServerBase::HandleConfigLine( p_parser );
00330 
00331 
00332       } while ( false );
00333    }
00334    catch( worm_exception _we )
00335    {
00336       r_handled = HANDLER_INVALID;
00337       ConfigState = WORM_STAT_BADSTATE;
00338       TLogger::Logit( LoggingOptions
00339                    , "MutableServerBase::HandleConfigLine(): configuration error:\n%s\n"
00340                    , _we.what()
00341                    );
00342    }
00343 
00344    return r_handled;
00345 }
00346 //
00347 //-------------------------------------------------------------------
00348 //
00349 void MutableServerBase::CheckConfig()
00350 {
00351 
00352    switch ( Mode )
00353    {
00354      case MMT_NONE:
00355           TLogger::Logit( LoggingOptions
00356                         , "MutableServerBase::CheckConfig(): <ImA> line is required\n"
00357                         );
00358           ConfigState = WORM_STAT_BADSTATE;
00359 
00360           break;
00361 
00362      case MMT_CLIENT:
00363 
00364           if ( Providers.size() == 0 )
00365           {
00366              TLogger::Logit( LoggingOptions
00367                            , "MutableServerBase::CheckConfig(): client mode requires at least one <Provider> line\n"
00368                            );
00369              ConfigState = WORM_STAT_BADSTATE;
00370           }
00371 
00372           if ( ConnectFailureSec < 0 )
00373           {
00374              TLogger::Logit( LoggingOptions
00375                            , "MutableServerBase::CheckConfig() client mode requires <ConnectFailureSec> line\n"
00376                            );
00377              ConfigState = WORM_STAT_BADSTATE;
00378           }
00379           
00380           if ( SendTimeoutMS == -2 )
00381           {
00382              TLogger::Logit( LoggingOptions
00383                            , "MutableServerBase::CheckConfig() client mode requires <SendTimeoutMSecs> line\n"
00384                            );
00385              ConfigState = WORM_STAT_BADSTATE;
00386           }
00387 
00388           if ( RecvTimeoutMS < MIN_RECV_TIMEOUT_MS )
00389           {
00390              TLogger::Logit( LoggingOptions
00391                            , "WormServerBase::CheckConfig(): <RecvTimeoutMSecs> too low in config file, using %d\n"
00392                            , MIN_RECV_TIMEOUT_MS
00393                            );
00394              RecvTimeoutMS = MIN_RECV_TIMEOUT_MS;
00395           }
00396 
00397           break;
00398 
00399  
00400      case MMT_SERVER:
00401 
00402           if ( strlen(ServerIPAddr) == 0 )
00403           {
00404              TLogger::Logit( LoggingOptions
00405                            , "MutableServerBase::CheckConfig() server mode requires <ServerIPAddr> line\n"
00406                            );
00407              ConfigState = WORM_STAT_BADSTATE;
00408           }
00409 
00410           if ( ServerPort == 0 )
00411           {
00412              TLogger::Logit( LoggingOptions
00413                            , "MutableServerBase::CheckConfig() server mode requires <ServerPort> line\n"
00414                            );
00415              ConfigState = WORM_STAT_BADSTATE;
00416           }
00417 
00418           if ( CommandRingKey != WORM_RING_INVALID )
00419           {
00420              if ( MaxMessageLength == 0 )
00421              {
00422                 TLogger::Logit( LoggingOptions
00423                               , "MutableServerBase::CheckConfig() Error: missing or invalid <MaxMessageLength> line\n"
00424                               );
00425                 ConfigState = WORM_STAT_BADSTATE;
00426              }
00427           }
00428 
00429           break;
00430  
00431      case MMT_MODULE:
00432 
00433           if ( CommandRingKey == WORM_RING_INVALID )
00434           {
00435              TLogger::Logit( LoggingOptions
00436                            , "MutableServerBase::CheckConfig() module mode requires <CmdRingName> line\n"
00437                            );
00438              ConfigState = WORM_STAT_BADSTATE;
00439           }
00440 
00441           if ( InputRingKey == WORM_RING_INVALID )
00442           {
00443              InputRingKey = CommandRingKey;
00444           }
00445 
00446           if ( OutputRingKey == WORM_RING_INVALID )
00447           {
00448              OutputRingKey = InputRingKey;
00449           }
00450 
00451           if ( MaxMessageLength == 0 )
00452           {
00453              TLogger::Logit( LoggingOptions
00454                            , "MutableServerBase::CheckConfig() Error: missing or invalid <MaxMessageLength> line\n"
00455                            );
00456              ConfigState = WORM_STAT_BADSTATE;
00457           }
00458 
00459           ResultMsgLogo.instid = TGlobalUtils::GetThisInstallationId();
00460           ResultMsgLogo.mod    = TGlobalUtils::GetThisModuleId();
00461 
00462           if ( ServeLogoCount == 0 )
00463           {
00464              TLogger::Logit( LoggingOptions
00465                            , "MutableServerBase::CheckConfig() Error: no valid <AcceptLogo> lines found\n"
00466                            );
00467              ConfigState = WORM_STAT_BADSTATE;
00468           }
00469 
00470           if ( (ResultMsgLogo.type = TGlobalUtils::LookupMessageTypeId(OutputMessageTypeKey())) == WORM_MSGTYPE_INVALID )
00471           {
00472              TLogger::Logit( LoggingOptions
00473                            , "MutableServerBase::CheckConfig() Error: id not found for output message type %s\n"
00474                            , OutputMessageTypeKey()
00475                            );
00476              ConfigState = WORM_STAT_BADSTATE;
00477           }
00478 
00479           break;
00480  
00481    }
00482 
00483    
00484    // - - - - - - - - - - - - - - - - - - - - - - - - -
00485    // Standalone
00486    //
00487 
00488 //   TGlobalUtils::SetFileLoggingState( false );
00489 
00490 //   Mode = MMT_STANDALONE;
00491 
00492 }
00493 //
00494 //-------------------------------------------------------------------
00495 //
00496 bool MutableServerBase::CheckForThreadDeath()
00497 {
00498    try
00499    {
00500       switch( Mode )
00501       {
00502         case MMT_MODULE:
00503 
00504              if ( (LastStackerPulse + 6) < time(NULL) )
00505              {
00506                 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), 0, "stacker thread stopped" );
00507                 throw worm_exception("Stacker thread stopped pulsing");
00508              }
00509 
00510              if ( (LastHandlerPulse + 6) < time(NULL) )
00511              {
00512                 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), 0, "handler thread stopped" );
00513                 throw worm_exception("Handler thread stopped pulsing");
00514              }
00515           
00516              break;
00517 
00518         case MMT_SERVER:
00519 
00520              // length of acceptable delay in listener pulse is tied to the
00521              // length of time it takes for a new service thread to start up,
00522              // (since the listener is incapacitated during that time).
00523              //
00524              if ( (LastListenerPulse + 10 + 3) < time(NULL) )
00525              {
00526                 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), 0, "listener thread stopped" );
00527                 throw worm_exception("Listener thread stopped pulsing");
00528              }
00529           
00530              break;
00531       }
00532    }
00533    catch( worm_exception _we )
00534    {
00535       return true;
00536    }
00537 
00538    return false;
00539 }
00540 //
00541 //-------------------------------------------------------------------
00542 //
00543 void MutableServerBase::StartThreadFunc( void * p_arg )
00544 {
00545    switch ( *((int *)p_arg) )
00546    {
00547      case THR_KEY_LISTENER:
00548           Listener(p_arg);  // p_arg is ignored
00549           break;
00550 
00551      case THR_KEY_STACKER:
00552           Stacker();  // p_arg is ignored
00553           break;
00554 
00555      case THR_KEY_HANDLER:
00556           Handler();  // p_arg is ignored
00557           break;
00558 
00559      default:
00560           ClientServicer( p_arg ); // p_arg is socket descriptor
00561           break;
00562    }
00563 }
00564 //
00565 //-------------------------------------------------------------------
00566 //
00567 // Server and Module
00568 //
00569 WORM_STATUS_CODE MutableServerBase::MainThreadActions()
00570 {
00571    static long CurrentTime
00572              , LastBeatTime
00573              ;
00574 
00575       // set running state to keep worker threads from dying
00576       //
00577       Running = true;
00578 
00579       //
00580       // START WORKER THREADS
00581       //
00582       int _arg;
00583 
00584       switch( Mode )
00585       {
00586         case MMT_MODULE:
00587 
00588              if ( (MessageMutex = new TMutex("msbmbmtx")) == NULL )
00589              {
00590                 throw worm_exception("Failed creating Message buffer mutex");
00591              }
00592 
00593              // Start processing thread
00594              //
00595              LastHandlerPulse = time(NULL) + 5; // allow an extra 5 seconds to start
00596              _arg = THR_KEY_HANDLER;
00597              if ( StartThreadWithArg( THREAD_STACK, &HandlerThreadId, &_arg ) == WORM_STAT_FAILURE )
00598              {
00599                 throw worm_exception("Server failed starting handler thread");
00600              }
00601              // give the thread a chance to start
00602              TTimeFuncs::MSecSleep(300);
00603 
00604              // Start stacker thread
00605              //
00606              LastStackerPulse = time(NULL) + 5; // allow an extra 5 seconds to start
00607              _arg = THR_KEY_STACKER;
00608              if ( StartThreadWithArg( THREAD_STACK, &StackerThreadId, &_arg ) == WORM_STAT_FAILURE )
00609              {
00610                 throw worm_exception("Server failed starting stacker thread");
00611              }
00612              // give the thread a chance to start
00613              TTimeFuncs::MSecSleep(250);
00614           
00615              break;
00616 
00617         case MMT_SERVER:
00618 
00619              if ( SocketDebug )
00620              {
00621                 // Turn Socket level debugging On/Off
00622                 setSocket_ewDebug(1);
00623              }
00624 
00625              // Start the Listener Thread
00626              //
00627              LastListenerPulse = time(NULL) + 5; // allow an extra 5 seconds to start
00628              _arg = THR_KEY_LISTENER;
00629              if ( StartThreadWithArg( THREAD_STACK, &ListenerThreadId, &_arg ) == WORM_STAT_FAILURE )
00630              {
00631                 throw worm_exception("Server failed starting listener thread");
00632              }
00633              // give the thread a chance to start
00634              TTimeFuncs::MSecSleep(250);
00635 
00636              break;
00637       }
00638 
00639 
00640       int shutdown_flag;
00641 
00642       // initialize main heartbeat times
00643       //
00644       time(&CurrentTime);
00645       LastBeatTime = CurrentTime;
00646 
00647       
00648       while ( Running )
00649       {
00650          // Module and Server check for shutdown commands
00651          //
00652          if ( CommandRingKey != WORM_RING_INVALID )
00653          {
00654             // Check for and respond to stop flags
00655             shutdown_flag = tport_getflag(&CommandRegion);
00656             if( shutdown_flag == TERMINATE  ||  shutdown_flag  == (int)TGlobalUtils::GetPID() )
00657             {
00658                Running = false; // signal all threads to return
00659                continue;
00660             }
00661 
00662             // Send heartbeat if it is time
00663             if( TGlobalUtils::GetHeartbeatInt() <= (time(&CurrentTime) - LastBeatTime) )
00664             {
00665                LastBeatTime = CurrentTime;
00666                SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_HEARTBEAT"), 0, "" );
00667             }
00668          }
00669 
00670 
00671          // signal handler sets global utils terminate flag, check that
00672          //
00673          if ( TGlobalUtils::GetTerminateFlag() )
00674          {
00675             Running = false; // signal all threads to return
00676             continue;
00677          }
00678 
00679 
00680          if ( CheckForThreadDeath() )
00681          {
00682             Running = false;
00683          }
00684          else
00685          {
00686             TTimeFuncs::MSecSleep(500);
00687          }
00688 
00689       } // Running
00690 
00691 
00692    Running = false;
00693 
00694    // All errors are thrown
00695    return WORM_STAT_SUCCESS;
00696 }
00697 //
00698 //-------------------------------------------------------------------
00699 //
00700 WORM_STATUS_CODE MutableServerBase::Run( int argc, char* argv[] )
00701 {
00702    WORM_STATUS_CODE r_status = WORM_STAT_SUCCESS;
00703 
00704    MutableServerRequest * _requestcontainer = NULL;
00705 
00706    MutableServerResult * _resultcontainer  = NULL;
00707 
00708    try
00709    {
00710       if ( ConfigState != WORM_STAT_SUCCESS )
00711       {
00712          throw worm_exception("MutableServerBase::Run() not properly configured");
00713       }
00714 
00715       // Set up the signal handler so we can shut down gracefully
00716            //
00717       signal(SIGINT, (SIG_HANDLR_PTR)SignalHandler);     /* <Ctrl-C> interrupt */
00718       signal(SIGTERM, (SIG_HANDLR_PTR)SignalHandler);    /* program termination request */
00719       signal(SIGABRT, (SIG_HANDLR_PTR)SignalHandler);    /* abnormal termination */
00720 #ifdef SIGBREAK
00721       signal(SIGBREAK, (SIG_HANDLR_PTR)SignalHandler);   /* keyboard break */
00722 #endif
00723 
00724 #ifdef _SOLARIS
00725       // Ignore broken socket signals
00726       (void)sigignore(SIGPIPE);
00727 #endif
00728 
00729 
00730       // Call base class's PrepareToRun() to init logging level
00731       // and to perform other initialization needed by derivative
00732       // classes.
00733       //
00734       if ( ! PrepareToRun() )
00735       {
00736          throw worm_exception("PrepareToRun() returned not ready");
00737       }
00738 
00739 
00740       switch( Mode )
00741       {
00742         case MMT_CLIENT:
00743    
00744              LoggingOptions = WORM_LOG_TOFILE;
00745 
00746              if ( (_requestcontainer = GetRequestContainer()) == NULL )
00747              {
00748                 throw worm_exception("Failed creating request container");
00749              }
00750 
00751              if ( GetRequestFromInput( argc, argv, _requestcontainer ) != WORM_STAT_SUCCESS )
00752              {
00753                 throw worm_exception("GetRequestFromInput() returned error");
00754              }
00755 
00756              if ( (_resultcontainer = GetResultContainer()) == NULL )
00757              {
00758                 throw worm_exception("Failed creating result container");
00759              }
00760 
00761              if ( TransmitRequest( _requestcontainer, _resultcontainer ) != WORM_STAT_SUCCESS )
00762              {
00763                 throw worm_exception("TransmitRequest() returned error");
00764              }
00765 
00766              // Handle special case of client that will return 
00767              // WORM_STAT_SUCCESS  = got good result
00768              // WORM_STAT_BADSTATE = did not get good result
00769              // WORM_STAT_FAILURE  = system failure
00770              //
00771              if ( (r_status = HandleResult( _resultcontainer )) == WORM_STAT_FAILURE )
00772              {
00773                 throw worm_exception("HandleResult() returned error");
00774              }
00775              
00776              break;
00777 
00778 
00779         case MMT_STANDALONE:
00780 
00781              LoggingOptions = WORM_LOG_TOFILE;
00782 
00783              if ( (_requestcontainer = GetRequestContainer()) == NULL )
00784              {
00785                 throw worm_exception("Failed creating request container");
00786              }
00787 
00788              if ( GetRequestFromInput( argc, argv, _requestcontainer ) != WORM_STAT_SUCCESS )
00789              {
00790                 throw worm_exception("GetRequestFromInput() returned error");
00791              }
00792 
00793              if ( (_resultcontainer = GetResultContainer()) == NULL )
00794              {
00795                 throw worm_exception("Failed creating result container");
00796              }
00797 
00798              if ( (r_status = ProcessRequest(_requestcontainer, _resultcontainer)) == WORM_STAT_SUCCESS )
00799              {
00800                 if ( HandleResult( _resultcontainer ) != WORM_STAT_SUCCESS )
00801                 {
00802                    throw worm_exception("HandleResult() returned error");
00803                 }
00804              }
00805 
00806              break;
00807 
00808 
00809         case MMT_MODULE:
00810 
00811              LoggingOptions = WORM_LOG_TOFILE|WORM_LOG_TOSTDERR;
00812 
00813              tport_attach( &CommandRegion, CommandRingKey );
00814              
00815              MainThreadActions();
00816 
00817              tport_detach( &CommandRegion );
00818 
00819              break;
00820 
00821 
00822         case MMT_SERVER:
00823 
00824              LoggingOptions = WORM_LOG_TOFILE|WORM_LOG_TOSTDERR;
00825 
00826              if ( CommandRingKey != WORM_RING_INVALID )
00827              {
00828                 tport_attach( &CommandRegion, CommandRingKey );
00829              }
00830              
00831              MainThreadActions();
00832 
00833              if ( CommandRingKey != WORM_RING_INVALID )
00834              {
00835                 tport_detach( &CommandRegion );
00836              }
00837 
00838              break;
00839       }
00840 
00841    }
00842    catch( worm_exception & _we )
00843    {
00844       Running = false;
00845       TLogger::Logit( LoggingOptions
00846                     , "MutableServerBase::Run() Error: %s\n"
00847                     , _we.what()
00848                     );
00849       r_status = WORM_STAT_FAILURE;
00850    }
00851    catch( ... )
00852    {
00853       Running = false;
00854       r_status = WORM_STAT_FAILURE;
00855    }
00856 
00857    if ( _requestcontainer != NULL )
00858    {
00859       delete( _requestcontainer );
00860    }
00861 
00862    if ( _resultcontainer != NULL )
00863    {
00864       delete( _resultcontainer );
00865    }
00866 
00867    return r_status;
00868 }
00869 //
00870 //-------------------------------------------------------------------
00871 //
00872 WORM_STATUS_CODE MutableServerBase::TransmitRequest( MutableServerRequest * p_request
00873                                                    , MutableServerResult  * r_result
00874                                                    )
00875 {
00876    WORM_STATUS_CODE r_status = WORM_STAT_SUCCESS;
00877 
00878    SOCKET _clientsocket = INVALID_SOCKET;
00879 
00880    char * _buffer = NULL;
00881 
00882    try
00883    {
00884       if ( p_request == NULL || r_result == NULL )
00885       {
00886          throw worm_exception("NULL parameter pointer");
00887       }
00888 
00889       switch( Mode )
00890       {
00891         case MMT_CLIENT:
00892              // send via socket
00893              {
00894                 int                 _lastprovider = 0
00895                   ,                 _currprovider = 0
00896                   ;
00897                 PROVIDER_ADDR       _provider_addr;
00898                 struct sockaddr_in  _socket_addr;
00899                 
00900 
00901                 bool _needresult = true;
00902 
00903    /* TODO: get last provider id from persistence store  */
00904 
00905                 _currprovider = _lastprovider;
00906 
00907                 // Willing to try each provider up to 2 times
00908                 //
00909                 for ( int _pass = 0 ; _needresult && _pass < MaxServerTryLoopCount ; _pass++ )
00910                 {
00911 
00912                    if ( _pass != 0 )
00913                    {
00914                       // sleep before trying to connect again
00915                       TTimeFuncs::MSecSleep(250);
00916                    }
00917 
00918 
00919                    do
00920                    {
00921                       _currprovider++;
00922 
00923 
00924                       if ( Providers.size() <= _currprovider )
00925                       {
00926                          _currprovider = 0;
00927                       }
00928 
00929                       _provider_addr = Providers[_currprovider];
00930 
00931                       // Fill socket address structure with  address and port
00932                       memset( (char *)&_socket_addr, 0, sizeof(_socket_addr) );
00933                       _socket_addr.sin_family = AF_INET;
00934                       _socket_addr.sin_port = htons( (short)atoi(_provider_addr.Port.c_str()) );
00935 
00936                       if ( (int)(_socket_addr.sin_addr.S_un.S_addr = inet_addr(_provider_addr.IPAddr.c_str())) == INADDR_NONE )
00937                       {
00938                          worm_exception _expt("inet_addr failed for ");
00939                                         _expt += _provider_addr.IPAddr;
00940                                         _expt += ":";
00941                                         _expt += _provider_addr.Port;
00942                          throw _expt;
00943                       }
00944 
00945                       if( WORM_LOG_DEBUG <= TGlobalUtils::GetLoggingLevel() )
00946                       {
00947                          TLogger::Logit( LoggingOptions
00948                                        , "Attempting connection to provider %d: %s:%s\n"
00949                                        , _currprovider
00950                                        , _provider_addr.IPAddr.c_str()
00951                                        , _provider_addr.Port.c_str()
00952                                        );
00953                       }
00954 
00955 
00956                       if ( ( _clientsocket = socket_ew( AF_INET, SOCK_STREAM, 0) ) == INVALID_SOCKET )
00957                       {
00958                          throw worm_exception("socket_ew() failed");
00959                       }
00960 
00961                       if ( _clientsocket == SOCKET_ERROR )
00962                       {
00963                          _clientsocket = INVALID_SOCKET;
00964                          throw worm_exception("socket_ew() failed");
00965                       }
00966 
00967 
00968                       if ( connect_ew( _clientsocket
00969                                      , (struct sockaddr * )&_socket_addr
00970                                      , sizeof(_socket_addr)
00971                                      , ConnectFailureSec
00972                                      ) != 0 )
00973                       {
00974                          if( WORM_LOG_DETAILS <= TGlobalUtils::GetLoggingLevel() )
00975                          {
00976                             TLogger::Logit( LoggingOptions
00977                                          , "connect_ew(): failed for %s:%s\n"
00978                                          , _provider_addr.IPAddr.c_str()
00979                                          , _provider_addr.Port.c_str()
00980                                          );
00981                          }
00982                       }
00983                       else
00984                       {
00985 
00986                          // have a connection to a server
00987                          //
00988 
00989                          if( WORM_LOG_DEBUG <= TGlobalUtils::GetLoggingLevel() )
00990                          {
00991                             TLogger::Logit( LoggingOptions
00992                                           , "Connected to %s:%s\n"
00993                                           , _provider_addr.IPAddr.c_str()
00994                                           , _provider_addr.Port.c_str()
00995                                           );
00996                          }
00997 
00998 
00999                          if ( (_buffer = new char[GetMaxSocketBufferSize()]) == NULL )
01000                          {
01001                             throw worm_exception("failed to allocate socket buffer");
01002                          }
01003 
01004 
01005                          // Set up thread info for base class handling
01006                          //
01007                          ServiceThreadInfoStruct _thr_info;
01008 
01009                          _thr_info.descriptor = _clientsocket;
01010                          strcpy ( _thr_info.ipaddr , _provider_addr.IPAddr.c_str() );
01011                 
01012                          ThreadsInfo[_clientsocket] = _thr_info;
01013 
01014                                
01015                          p_request->FormatBuffer();
01016 
01017                          int _msglen = p_request->GetBufferLength();
01018 
01019                          strcpy( _buffer, p_request->GetBuffer() );
01020 
01021                          // wait a quarter second for the server socket to prepare
01022                          //
01023                          TTimeFuncs::MSecSleep(250);
01024 
01025 
01026                          if( WORM_LOG_DEBUG <= TGlobalUtils::GetLoggingLevel() )
01027                          {
01028                             TLogger::Logit( LoggingOptions
01029                                           , "sending to descriptor %d (%d); message [%d]:\n%s<\n"
01030                                           , (int)_clientsocket
01031                                           , SendTimeoutMS
01032                                           , _msglen
01033                                           , _buffer
01034                                           );
01035                          }
01036 
01037 
01038                          if ( SendMessage(  _clientsocket
01039                                          ,  _buffer
01040                                          , &_msglen
01041                                          )
01042                               == WORM_STAT_FAILURE )
01043                          {
01044                             throw worm_exception("Failed sending request");
01045                          }
01046 
01047 
01048                          // willing to wait at least 5 seconds for 
01049                          time_t _quit_time = time(NULL)
01050                                            + ( RecvTimeoutMS / 1000 < 5 ? 5 : RecvTimeoutMS / 1000 )
01051                                            ;
01052 
01053 
01054                          bool _readfinished = false;
01055 
01056                          do
01057                          {
01058                             // Get message line from socket
01059                
01060                             _msglen = GetMaxSocketBufferSize();
01061 
01062                             switch( ListenForMsg(  _clientsocket
01063                                                 ,  _buffer
01064                                                 , &_msglen // in = max read ; out = actually read
01065                                                 ,   500   // 500 ms for this try
01066                                                 ) )
01067                             {
01068                               case  0:  // got message line
01069                                    _readfinished = r_result->ParseMessageLine( _buffer );
01070                                    break;
01071                               case -1:  // timed out
01072                                    if ( _quit_time < time(NULL) )
01073                                    {
01074                                       throw worm_exception("timed out waiting for server result");
01075                                    }
01076                                    break;
01077                               case -2:  // socket closed on other end
01078                                     throw worm_exception("Server closed socket");
01079                               case -3:  // socket error
01080                                     throw worm_exception("Socket error");
01081                               case -4:  // message too large for buffer
01082                                     throw worm_exception("Server response too large for buffer");
01083                             }
01084         
01085 
01086                          } while( ! _readfinished );
01087 
01088 
01089                          // Leave the loop unless the server responded that it
01090                          // was busy, which case try another server
01091                          //
01092                          if ( r_result->GetStatus() != MSB_RESULT_BUSY )
01093                          {
01094                             _needresult = false;
01095                          }
01096 
01097                       } // obtained a server connection
01098 
01099                       // Loop until connected or all providers tried once
01100                    } while(   _needresult
01101                            && _currprovider != _lastprovider 
01102                           );
01103 
01104                 } // try all providers for n passes
01105 
01106 
01107                 if ( _needresult )
01108                 {
01109                    if(   WORM_LOG_STATUS <= TGlobalUtils::GetLoggingLevel()
01110                       &&                    TGlobalUtils::GetLoggingLevel() < WORM_LOG_DETAILS
01111                      )
01112                    {
01113                          TLogger::Logit( LoggingOptions
01114                                        , "Unable to establish connection to providers:\n"
01115                                        );
01116                       for ( int _p = 0 ; _p < Providers.size() ; _p )
01117                       {
01118                          TLogger::Logit( LoggingOptions
01119                                        , "   %s:%s\n"
01120                                        , Providers[_p].IPAddr.c_str()
01121                                        , Providers[_p].Port.c_str()
01122                                        );
01123                       }
01124                    }
01125                    r_status = WORM_STAT_DISCONNECT;
01126                 }
01127 
01128 
01129              } // client Mode
01130              break;
01131 
01132         default:
01133              // other modes invalid
01134              {
01135                 worm_exception _expt("Invalid call for ");
01136                                _expt +=                MSB_MODE_NAME[Mode];
01137                                _expt +=                                   " mode.";
01138                 throw _expt;
01139              }
01140       }
01141    }
01142    catch( worm_exception & _we )
01143    {
01144       r_status = WORM_STAT_FAILURE;
01145 
01146       if( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
01147       {
01148          TLogger::Logit( LoggingOptions
01149                       , "MutableServerBase::TransmitRequest(): %s\n"
01150                       , _we.what()
01151                       );
01152       }
01153    }
01154    catch( ... )
01155    {
01156       r_status = WORM_STAT_FAILURE;
01157 
01158       if( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
01159       {
01160          TLogger::Logit( LoggingOptions
01161                       , "MutableServerBase::TransmitRequest(): Unknown error\n"
01162                       );
01163       }
01164    }
01165 
01166 
01167    if ( _clientsocket != INVALID_SOCKET )
01168    {
01169       closesocket_ew( _clientsocket, SOCKET_CLOSE_IMMEDIATELY_EW );
01170    }
01171 
01172    return r_status;
01173 }
01174 //
01175 //-------------------------------------------------------------------
01176 //
01177 WORM_STATUS_CODE MutableServerBase::TransmitResult(       MutableServerResult * p_result
01178                                                   , const SOCKET              * p_socketdescriptor /* = NULL */
01179                                                   )
01180 {
01181    WORM_STATUS_CODE r_status = WORM_STAT_SUCCESS;
01182 
01183    MutableServerResult * _workresult = p_result;
01184    bool _myresult = false;
01185 
01186    try
01187    {
01188       int _neededlength;
01189 
01190       switch( Mode )
01191       {
01192         case MMT_SERVER:
01193              // send via socket
01194              if ( p_socketdescriptor == NULL )
01195              {
01196                 throw worm_exception("SOCKET pointer NULL");
01197              }
01198              else
01199              {
01200                 int    _msglen;
01201 
01202                 SOCKET _socket = (SOCKET)*p_socketdescriptor;
01203 
01204                 if ( _workresult == NULL )
01205                 {
01206                    if ( WORM_LOG_ERRORS <= LoggingLevel )
01207                    {
01208                       TLogger::Logit( LoggingOptions
01209                                     , "NULL result container pointer\n"
01210                                     );
01211                    }
01212                    if ( (_workresult = GetResultContainer()) == NULL )
01213                    {
01214                       throw worm_exception("Failed to create work result buffer for error");
01215                    }
01216                    _myresult = true;
01217                    _workresult->SetStatus( MSB_RESULT_ERROR );
01218                 }
01219 
01220                 _workresult->FormatBuffer();
01221                 _msglen = p_result->GetBufferLength();
01222 
01223 
01224 TLogger::Logit( LoggingOptions
01225               , "DEBUG [%d] returning status to client at %d; message [%d]:\n%s<\n"
01226               , SendTimeoutMS
01227               , (int)_socket
01228               , _msglen
01229               , p_result->GetBuffer()
01230               );
01231 
01232                 if ( SendMessage(  _socket
01233                                 ,  p_result->GetBuffer()
01234                                 , &_msglen
01235                                 ) != WORM_STAT_SUCCESS
01236                    )
01237                 {
01238                    throw worm_exception("Failed sending result across socket");
01239                 }
01240              }
01241              break;
01242 
01243         case MMT_MODULE:
01244 
01245              // send via ring
01246 
01247              if ( p_result == NULL )
01248              {
01249                 // this represents an error condition ("-1\n\n")
01250                 _neededlength = 4;
01251              }
01252              else
01253              {
01254                 p_result->FormatBuffer();
01255                 _neededlength = p_result->GetBufferLength();
01256              }
01257 
01258              if ( TransmitBufferLength < _neededlength )
01259              {
01260                 if ( TransmitBuffer != NULL )
01261                 {
01262                    delete [] TransmitBuffer;
01263                 }
01264                 TransmitBuffer = NULL;
01265              }
01266 
01267              if ( TransmitBuffer == NULL )
01268              {
01269                 if ( (TransmitBuffer = new char[p_result->GetBufferLength()+100]) == NULL )
01270                 {
01271                    throw worm_exception("Failed allocating transmit buffer");
01272                 }
01273                 TransmitBufferLength = p_result->GetBufferLength()+100;
01274              }
01275 
01276 
01277              TransmitBuffer[0] = '\0';
01278 
01279              if ( p_result == NULL )
01280              {
01281                 strcpy( TransmitBuffer , "-1\n\n" );
01282              }
01283              else
01284              {
01285                 strcpy( TransmitBuffer , p_result->GetBuffer() );
01286              }
01287 
01288              if ( tport_putmsg(  OutputRegion
01289                               , &ResultMsgLogo
01290                               ,  _neededlength
01291                               ,  TransmitBuffer
01292                               )
01293                   != PUT_OK
01294                 )
01295              {
01296                 throw worm_exception("tport_putmsg() failed");
01297              }
01298 
01299              break;
01300 
01301         default:
01302              // other modes invalid
01303              {
01304                 worm_exception _expt("invalid call for ");
01305                                _expt +=                MSB_MODE_NAME[Mode];
01306                                _expt +=                                  " mode.";
01307                 throw _expt;
01308              }
01309       }
01310    }
01311    catch( worm_exception & _we )
01312    {
01313       r_status = WORM_STAT_FAILURE;
01314 
01315       if( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
01316       {
01317          TLogger::Logit( LoggingOptions
01318                       , "MutableServerBase::TransmitResult(): %s\n"
01319                       , _we.what()
01320                       );
01321       }
01322    }
01323 
01324    if ( _myresult && _workresult != NULL )
01325    {
01326       delete ( _workresult );
01327    }
01328 
01329    return r_status;
01330 }
01331 //
01332 //-------------------------------------------------------------------
01333 //
01334 THREAD_RETURN MutableServerBase::Stacker()
01335 {
01336    static InstallWildcard = TGlobalUtils::LookupInstallationId("INST_WILDCARD");
01337    static ModuleWildcard  = TGlobalUtils::LookupModuleId("MOD_WILDCARD");
01338    static MessageWildcard = TGlobalUtils::LookupMessageTypeId("TYPE_WILDCARD");
01339 
01340    int      _getmsg_status;
01341         MSG_LOGO _arrivelogo;      // logo of arriving message
01342    long     _arr_msg_len;     // length of arriving message
01343    bool     _msg_ready;
01344    char     _errormsg[300];   //
01345 
01346    bool _attached = false;
01347 
01348 TLogger::Logit( LoggingOptions
01349               , "MutableServerBase::Stacker(): A\n"
01350               );
01351 
01352    try
01353    {
01354       LastStackerPulse = time(NULL); // "I'm still alive"
01355 
01356 
01357       if ( InputRingKey == CommandRingKey )
01358       {
01359          InputRegion = &CommandRegion;
01360       }
01361       else
01362       {
01363          InputRegion = &InputRegionStruct;
01364          tport_attach( InputRegion, InputRingKey );
01365          _attached = true;
01366       }
01367 
01368 
01369       // Skip any messages already in ring
01370       //
01371       do
01372       {
01373          _getmsg_status = tport_getmsg(  InputRegion
01374                                       ,  ServeLogo
01375                                       ,  ServeLogoCount
01376                                       , &_arrivelogo
01377                                       , &_arr_msg_len
01378                                       ,  MessageBuffer
01379                                       ,  MaxMessageLength
01380                                       );
01381       }
01382       while ( _getmsg_status != GET_NONE );
01383       
01384 
01385       // Add wildcard to those accepted
01386 
01387       while ( Running )
01388       {
01389          LastStackerPulse = time(NULL); // "I'm still alive"
01390 
01391          _msg_ready = true;
01392 
01393          switch( (_getmsg_status = tport_getmsg(  InputRegion
01394                                                ,  ServeLogo
01395                                                ,  ServeLogoCount
01396                                                , &_arrivelogo
01397                                                , &_arr_msg_len
01398                                                ,  MessageBuffer
01399                                                ,  MaxMessageLength
01400                                                ))
01401                )
01402          {
01403            case GET_OK:
01404                 break;
01405 
01406            case GET_MISS:
01407            case GET_MISS_LAPPED:
01408                 // report error, handle message
01409                 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), MSB_ERR_MISSMSG, "missed messages" );
01410                 break;
01411 
01412            case GET_MISS_SEQGAP:
01413                 // report error, handle message
01414                 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), MSB_ERR_MISSMSG, "saw sequence gap" );
01415                 break;
01416 
01417            case GET_NOTRACK:
01418                 // report error, handle message
01419                 sprintf( _errormsg
01420                        , "no tracking for logo i%d m%d t%d in %s"
01421                        , (int)_arrivelogo.instid
01422                        , (int)_arrivelogo.mod
01423                        , (int)_arrivelogo.type
01424                        , InputRingName
01425                        );
01426                 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), MSB_ERR_NOTRACK, _errormsg );
01427                 break;
01428 
01429            case GET_TOOBIG:
01430                 // report error, return to main loop to sleep
01431                 sprintf( _errormsg
01432                        , "tport msg[%ld] i%d m%d t%d too big. Max is %ld"
01433                        , _arr_msg_len
01434                        , (int)_arrivelogo.instid
01435                        , (int)_arrivelogo.mod
01436                        , (int)_arrivelogo.type
01437                        , MaxMessageLength
01438                        );
01439                 SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), MSB_ERR_TOOBIG, _errormsg );
01440 
01441                 _msg_ready = false;
01442 
01443                break;
01444 
01445            case GET_NONE:
01446 
01447                 TTimeFuncs::MSecSleep(500);
01448                 _msg_ready = false;
01449 
01450                break;
01451 
01452          }  // switch( tport_getmsg() )
01453 
01454 
01455          if ( _msg_ready )
01456          {
01457             // NULL-terminate string buffer
01458             MessageBuffer[_arr_msg_len] = 0;
01459 
01460             // truncate end-of-line if present
01461             if ( MessageBuffer[_arr_msg_len-1] == '\n' )
01462             {
01463                _arr_msg_len--;
01464                MessageBuffer[_arr_msg_len] = 0;
01465             }
01466 
01467             for ( int _i = 0, _sz = ServeLogoCount ; _i < _sz ; _i++ )
01468             {
01469                if (   (   ServeLogo[_i].instid == InstallWildcard
01470                        || _arrivelogo.instid   == InstallWildcard
01471                        || _arrivelogo.instid   == ServeLogo[_i].instid
01472                       )
01473                    && (   ServeLogo[_i].mod == ModuleWildcard
01474                        || _arrivelogo.mod   == ModuleWildcard
01475                        || _arrivelogo.mod   == ServeLogo[_i].mod
01476                       )
01477                    && (   ServeLogo[_i].type == MessageWildcard
01478                        || _arrivelogo.type   == MessageWildcard
01479                        || _arrivelogo.type   == ServeLogo[_i].type
01480                       )
01481                   )
01482                {
01483                   std::string _newmsg = MessageBuffer;
01484 
01485                   // put the new message on the queue for the handler thread
01486                   MessageQueue.push( _newmsg );
01487 
01488                   break;
01489                }
01490             } // check each logo for match
01491          } // msg_ready
01492 
01493       } // Running
01494 
01495    }
01496    catch( worm_exception & _we )
01497    {
01498       TLogger::Logit( LoggingOptions
01499                     , "MutableServerBase::Stacker() Error: %s"
01500                     , _we.what()
01501                     );
01502       TransmitResult( NULL );
01503    }
01504    catch( ... )
01505    {
01506       TLogger::Logit( LoggingOptions
01507                     , "MutableServerBase::Stacker() Unknown Error"
01508                     );
01509       TransmitResult( NULL );
01510    }
01511 
01512    if ( _attached )
01513    {
01514       tport_detach( InputRegion );
01515    }
01516 }
01517 //
01518 //-------------------------------------------------------------------
01519 //
01520 THREAD_RETURN MutableServerBase::Handler()
01521 {
01522 
01523    bool _attached = false;
01524 
01525    std::string _message;
01526 
01527    MutableServerRequest * _requestcontainer = NULL;
01528 
01529    MutableServerResult * _resultcontainer = NULL;
01530 
01531    try
01532    {
01533       LastHandlerPulse = time(NULL); // "I'm still alive"
01534 
01535 
01536       if ( (_requestcontainer = GetRequestContainer()) == NULL )
01537       {
01538          throw worm_exception("failed to create request container");
01539       }
01540 
01541       if ( (_resultcontainer = GetResultContainer()) == NULL )
01542       {
01543          throw worm_exception("failed to create result container");
01544       }
01545 
01546 
01547       if      ( OutputRingKey == CommandRingKey )
01548       {
01549          OutputRegion = &CommandRegion;
01550       }
01551       else if ( OutputRingKey == InputRingKey )
01552       {
01553          OutputRegion = &InputRegionStruct;
01554       }
01555       else
01556       {
01557          OutputRegion = &OutputRegionStruct;
01558          tport_attach( OutputRegion, OutputRingKey );
01559          _attached = true;
01560       }
01561 
01562 
01563       WORM_STATUS_CODE _processing_state;
01564 
01565 
01566       while ( Running )
01567       {
01568          LastHandlerPulse = time(NULL); // "I'm still alive"
01569 
01570          if ( MessageQueue.empty() )
01571          {
01572             TTimeFuncs::MSecSleep(500);
01573          }
01574          else
01575          {
01576             // get the next message
01577             _message = MessageQueue.front();
01578 
01579             // remove the message from the queue
01580             MessageQueue.pop();
01581 
01582             try
01583             {
01584                _requestcontainer->ParseFromBuffer( _message.c_str() );
01585             }
01586             catch( worm_exception & _we )
01587             {
01588                TLogger::Logit( LoggingOptions
01589                              , "MutableServerBase::Handler() Error parsing message:\n%s\n%s\n"
01590                              , _message.c_str()
01591                              , _we.what()
01592                              );
01593 
01594                //  loop for next messsage
01595                continue;
01596             }
01597          
01598 
01599             _processing_state = ProcessRequest( _requestcontainer, _resultcontainer );
01600 
01601             if ( _processing_state == WORM_STAT_FAILURE )
01602             {
01603                throw worm_exception("ProcessRequest() returned error");
01604             }
01605 
01606             if ( TransmitResult( _resultcontainer ) != WORM_STAT_SUCCESS )
01607             {
01608                // failure to transmit here means that we're unable
01609                // to send the response to the ring -- should die
01610                throw worm_exception("ParseRequestFromMessage() returned error");
01611             }
01612 
01613          } // handling pending message
01614 
01615       } // Running
01616 
01617    }
01618    catch( worm_exception & _we )
01619    {
01620       TLogger::Logit( LoggingOptions
01621                     , "MutableServerBase::Handler() Error: %s\n"
01622                     , _we.what()
01623                     );
01624       TransmitResult( NULL );
01625    }
01626    catch( ... )
01627    {
01628       TLogger::Logit( LoggingOptions
01629                     , "MutableServerBase::Handler() Unknown Error\n"
01630                     );
01631       TransmitResult( NULL );
01632    }
01633 
01634 
01635    if ( _attached )
01636    {
01637       tport_detach( OutputRegion );
01638    }
01639 
01640 
01641    if ( _requestcontainer != NULL )
01642    {
01643       delete( _requestcontainer );
01644    }
01645 
01646    if ( _resultcontainer != NULL )
01647    {
01648       delete( _resultcontainer );
01649    }
01650 }
01651 //
01652 //-------------------------------------------------------------------
01653 //
01654 THREAD_RETURN MutableServerBase::ClientServicer( void * p_socketdescriptor )
01655 {
01656    if ( p_socketdescriptor == NULL )
01657    {
01658       TLogger::Logit( LoggingOptions
01659                     , "MutableServerBase::ClientServicer() Error: NULL descriptor parameter\n"
01660                     );
01661       return;
01662    }
01663 
01664    char * _buffer = NULL;
01665 
01666    MutableServerRequest * _requestcontainer = NULL;
01667 
01668    MutableServerResult * _resultcontainer = NULL;
01669 
01670 
01671    SOCKET _mapkey = *((SOCKET *)p_socketdescriptor);
01672 
01673    bool _send_error_message = false;
01674 
01675 TLogger::Logit( LoggingOptions
01676               , "MutableServerBase::ClientServicer(): descriptor %d\n"
01677               , (int)_mapkey
01678               );
01679 
01680    try
01681    {
01682       // tell the main thread this thread is alive
01683       ThreadsInfo[_mapkey].lastpulse = time(NULL); // "I'm still alive"
01684       ThreadsInfo[_mapkey].status = THREAD_INITIALIZING;
01685 
01686       if ( ThreadsInfo.size() == MaxClients )
01687       {
01688          // no more service threads allowed, tell client that
01689          // we're busy
01690 
01691          if ( (_resultcontainer = GetResultContainer()) == NULL )
01692          {
01693             throw worm_exception("Failed creating result container");
01694          }
01695 
01696          ThreadsInfo[_mapkey].lastpulse = time(NULL); // "I'm still alive"
01697          ThreadsInfo[_mapkey].status = THREAD_WAITING;
01698 
01699          _resultcontainer->SetStatus( MSB_RESULT_BUSY );
01700 
01701          if ( TransmitResult( _resultcontainer, &_mapkey ) != WORM_STAT_SUCCESS )
01702          {
01703             throw worm_exception("TransmitResult() returned error");
01704          }
01705       }
01706       else
01707       {
01708          if ( (_buffer = new char[GetMaxSocketBufferSize()+1]) == NULL )
01709          {
01710             throw worm_exception("Failed getting request buffer");
01711          }
01712 
01713          if ( (_requestcontainer = GetRequestContainer()) == NULL )
01714          {
01715             throw worm_exception("Failed creating request container");
01716          }
01717 
01718          if ( (_resultcontainer = GetResultContainer()) == NULL )
01719          {
01720             throw worm_exception("Failed creating result container");
01721          }
01722 
01723          ThreadsInfo[_mapkey].lastpulse = time(NULL); // "I'm still alive"
01724          ThreadsInfo[_mapkey].status = THREAD_WAITING;
01725 
01726 
01727          bool _messagefinished = false
01728             , _readfinished    = false
01729             ;
01730 
01731          int _length;
01732 
01733 
01734          // set number of seconds to wait for client's message
01735          //
01736          time_t _quit_time = time(NULL)
01737                            + ( RecvTimeoutMS / 1000 < 5 ? 5 : RecvTimeoutMS / 1000 )
01738                            ;
01739 
01740          if ( WORM_LOG_STATUS <= LoggingLevel )
01741          {
01742             TLogger::Logit( LoggingOptions
01743                           , "MutableServerBase::ClientServicer(): calling ListenForMsg(,,,%d)\n"
01744                           , RecvTimeoutMS
01745                           );
01746          }
01747 
01748 
01749          do
01750          {
01751             // Get message line from socket
01752 
01753          
01754             _length = GetMaxSocketBufferSize();
01755 
01756             switch ( ListenForMsg(  _mapkey
01757                                  ,  _buffer
01758                                  , &_length
01759                                  ,  500   // 500 ms for this pass
01760                                  )
01761                    )
01762             {
01763               case 0:    // success
01764                    _buffer[_length] = '\0';
01765 
01766                    if ( WORM_LOG_ERRORS <= LoggingLevel )
01767                    {
01768                       TLogger::Logit( LoggingOptions
01769                                     , "MutableServerBase::ClientServicer(): received msg line: %s\n"
01770                                     , _buffer
01771                                     );
01772                    }
01773 
01774                    _readfinished = _messagefinished = _requestcontainer->ParseMessageLine( _buffer );
01775 /*
01776                    if ( ! (_messagefinished = _requestcontainer->ParseMessageLine( _buffer )) )
01777                    {
01778                       _readfinished = false;
01779                    }
01780                    else
01781                    {
01782                       _readfinished = true;
01783                    }
01784 */
01785                    break;
01786 
01787               case -1:  // timed out
01788                    if ( _quit_time < time(NULL) )
01789                    {
01790                       _send_error_message = true;
01791                       throw worm_exception("Client connection timed out");
01792                    }
01793                    break;
01794 
01795               case -2:  // client closed socket
01796                    throw worm_exception("Client closed connection");
01797 
01798               case -3:  // socket error
01799                    throw worm_exception("Socket error");
01800 
01801               case -4:  // msg too large for buffer
01802                    _send_error_message = true;
01803                    throw worm_exception("Message too large for transmit buffer");
01804             }
01805 
01806 
01807 //TLogger::Logit( LoggingOptions
01808 //              , "MutableServerBase::ClientServicer(): DEBUG M\n"
01809 //              );
01810         
01811 
01812          } while( ! _readfinished );
01813 
01814 TLogger::Logit( LoggingOptions
01815               , "MutableServerBase::ClientServicer(): DEBUG N\n"
01816               );
01817 
01818          ThreadsInfo[_mapkey].lastpulse = time(NULL); // "I'm still alive"
01819          ThreadsInfo[_mapkey].status = THREAD_PROCESSING;
01820 
01821          if ( _messagefinished )
01822          {
01823 
01824 TLogger::Logit( LoggingOptions
01825               , "MutableServerBase::ClientServicer(): DEBUG O -- message finished\n"
01826               );
01827 
01828             WORM_STATUS_CODE _state = ProcessRequest( _requestcontainer, _resultcontainer );
01829 
01830             ThreadsInfo[_mapkey].lastpulse = time(NULL); // "I'm still alive"
01831 
01832             if ( _state == WORM_STAT_FAILURE )
01833             {
01834                throw worm_exception("ProcessRequest() returned error");
01835             }
01836 
01837             ThreadsInfo[_mapkey].lastpulse = time(NULL); // "I'm still alive"
01838 
01839             if ( TransmitResult( _resultcontainer, &_mapkey ) != WORM_STAT_SUCCESS )
01840             {
01841                throw worm_exception("TransmitResult() returned error");
01842             }
01843          }
01844          else
01845          {
01846             throw worm_exception("Incomplete message received");
01847          }
01848 
01849 //         ThreadsInfo[_mapkey].lastpulse = time(NULL); // "I'm still alive"
01850       } // service thread(s) not maxed out
01851 
01852       ThreadsInfo[_mapkey].status = THREAD_COMPLETED;
01853    
01854    }
01855    catch( worm_exception & _we )
01856    {
01857       ThreadsInfo[_mapkey].status = THREAD_ERROR;
01858       TLogger::Logit( LoggingOptions
01859                     , "MutableServerBase::ClientServicer(thr: %d) Error: %s\n"
01860                     , ThreadsInfo[_mapkey].threadid
01861                     , _we.what()
01862                     );
01863       if ( _send_error_message )
01864       {
01865          TransmitResult( NULL, &_mapkey );
01866       }
01867    }
01868    catch( ... )
01869    {
01870       ThreadsInfo[_mapkey].status = THREAD_ERROR;
01871       TLogger::Logit( LoggingOptions
01872                     , "MutableServerBase::ClientServicer(thr: %d) Unknown error\n"
01873                     , ThreadsInfo[_mapkey].threadid
01874                     );
01875       if ( _send_error_message )
01876       {
01877          TransmitResult( NULL, &_mapkey );
01878       }
01879    }
01880 
01881 
01882    // Close socket
01883 
01884    if ( _mapkey != NULL )
01885    {
01886       closesocket_ew( _mapkey, SOCKET_CLOSE_IMMEDIATELY_EW );
01887    }
01888 
01889    if ( _buffer != NULL )
01890    {
01891       delete[] _buffer;
01892    }
01893 
01894    if ( _requestcontainer != NULL )
01895    {
01896       delete( _requestcontainer );
01897    }
01898 
01899    if ( _resultcontainer != NULL )
01900    {
01901       delete( _resultcontainer );
01902    }
01903 }
01904 

Generated on Tue May 6 09:16:06 2003 for Earthworm Libs by doxygen 1.3-rc3