Main Page   Class Hierarchy   Compound List   File List   Compound Members   File Members  

serverbase.cpp

Go to the documentation of this file.
00001 //---------------------------------------------------------------------------
#include "serverbase.h"


#include <errno.h>      // errno, EWOULDBLOCK, ECONNRESET (non-Windows recv error handling)
#include <worm_signal.h>
#include <worm_exceptions.h>
#include <logger.h>
#include <timefuncs.h> // MSecSleep()
00009 
00010 
00011 // A Borland pragma, ignored by other compilers
00012 #pragma package(smart_init)
00013 
00014 
00015 #define ACCEPTABLE_SEC_TO_WAIT_FOR_SERVICE_THREAD_TO_START 10
00016 
00017 //---------------------------------------------------------------------------
00018 //---------------------------------------------------------------------------
00019 //---------------------------------------------------------------------------
00020 
00021 bool WormServerBase::DoSCNLsMatch( const char * p_s1
00022                                  , const char * p_c1
00023                                  , const char * p_n1
00024                                  , const char * p_l1
00025                                  , const char * p_s2
00026                                  , const char * p_c2
00027                                  , const char * p_n2
00028                                  , const char * p_l2
00029                                  )
00030 {
00031    // For each descriptor type (S, C, N, L)
00032    // a match is: either string starts with '*'
00033    //             an exact string match
00034    //
00035    if ( ! (   p_s1[0] == '*'
00036            || p_s2[0] == '*'
00037            || strcmp( p_s1, p_s2 ) == 0
00038       )   )
00039    {
00040       return false;
00041    }
00042 
00043    if ( ! (   p_c1[0] == '*'
00044            || p_c2[0] == '*'
00045            || strcmp( p_c1, p_c2 ) == 0
00046       )   )
00047    {
00048       return false;
00049    }
00050 
00051    if ( ! (   p_n1[0] == '*'
00052            || p_n2[0] == '*'
00053            || strcmp( p_n1, p_n2 ) == 0
00054       )   )
00055    {
00056       return false;
00057    }
00058 
00059    if ( ! (   p_l1[0] == '-'
00060            || p_l2[0] == '-'
00061            || strcmp( p_l1, p_l2 ) == 0
00062       )   )
00063    {
00064       return false;
00065    }
00066 
00067    return true;
00068 }
00069 
00070 //---------------------------------------------------------------------------
00071 //---------------------------------------------------------------------------
00072 //---------------------------------------------------------------------------
00073 WormServerBase::WormServerBase()
00074 {
00075    Running = false;
00076    CommandRingKey = WORM_RING_INVALID;
00077    CommandRegion.addr = NULL;
00078    strcpy( ServerIPAddr, "" );
00079    ServerPort = -1;
00080    SocketDebug = false;
00081    PassiveSocket = INVALID_SOCKET;
00082    MaxServiceThreads = 10;
00083    SendTimeoutMS = -2;
00084    RecvTimeoutMS = -1;
00085 
00086    try
00087    {
00088       if ( TGlobalUtils::LookupMessageTypeId("TYPE_HEARTBEAT") == WORM_MSGTYPE_INVALID )
00089       {
00090          throw worm_exception("message type <TYPE_HEARTBEAT> not defined");
00091       }
00092 
00093       if ( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR") == WORM_MSGTYPE_INVALID )
00094       {
00095          throw worm_exception("message type <TYPE_ERROR> not defined");
00096       }
00097 
00098       SocketSysInit();
00099    }
00100    catch( worm_exception _we )
00101    {
00102       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00103                     , "WormServerBase(): error: %s\n"
00104                     , _we.what()
00105                     );
00106    }
00107 }
00108 //---------------------------------------------------------------------------
00109 WormServerBase::~WormServerBase()
00110 {
00111    // tell all threads to quit
00112    Running = false;
00113    // give accept_ew a chance to return before closing socketing system
00114    TTimeFuncs::MSecSleep(1000);
00115    WSACleanup();
00116 }
00117 //---------------------------------------------------------------------------
00118 int WormServerBase::ListenForMsg( SOCKET  p_descriptor
00119                                 , char *  p_rcv_buffer
00120                                 , int *   p_length // in = max read ; out = actually read
00121                                 , int     p_timeoutms
00122                                 )
00123 {
00124 //   if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00125 //   {
00126 //      TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00127 //                    , "WormServerBase.ListenForMsg(): listening at the socket %d...\n"
00128 //                    , p_descriptor
00129 //                    );
00130 //   }
00131 
00132    int r_status = 0;
00133 
00134    int r_readcount = 0;
00135 
00136    int _readmax = *p_length;
00137 
00138    // if p_timeoutms greater than zero, then use it for the
00139    // timeout, otherwise use RecvTimeoutMS.  (In the method
00140    // declaration, p_timeoutms is defaulted to -1 -- so it
00141    // will be ignored).
00142    //
00143    int _timeoutms = ( p_timeoutms < 1 ? RecvTimeoutMS : p_timeoutms );
00144 
00145    // report actual number read
00146    *p_length = 0;
00147 
00148    bool _stillreading = true;
00149 
00150    try
00151    {
00152       if ( _timeoutms == -1 )
00153       {
00154          // tell the main thread that we're entering a potentially
00155          // blocked state so it will not kill this thread during a
00156          // long block.
00157          ThreadsInfo[p_descriptor].status = THREAD_BLOCKINGSOCKET;
00158       }
00159 
00160       // Listen to the socket for incoming characters until our buffer is full
00161       // or '\n' encountered....
00162       //
00163       do
00164       {
00165 
00166          if ( r_readcount == _readmax )
00167          {
00168             // buffer is maxed-out, can't read anymore.
00169             r_status = -4;
00170             throw worm_exception("message buffer overflow");
00171          }
00172 
00173          // tell the main thread this thread still alive
00174          ThreadsInfo[p_descriptor].lastpulse = time(NULL); // "I'm still alive"
00175 
00176          // although this single-char read is inefficient, messages from the clients
00177          // tend to be relatively rare and this method allows us to identify
00178          // the end-of-messge char easily.
00179          //
00180          // In addition, the handling for a socket time-out is optimistic,
00181          // specifically, it only cleanly handles the case of a timeout between
00182          // messages, not in the middle of one.
00183          //
00184          // Ultimately, this should be supplemented with an additional buffering
00185          // area that receives a larger amount from the socket, and can be manipulated
00186          // to identify message termination.
00187          //
00188 //         if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00189 //         {
00190 //            TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDOUT|WORM_LOG_TIMESTAMP
00191 //                          , "WormServerBase.ListenForMsg(): calling recv_ew( %d , %d )\n"
00192 //                          , (int)p_descriptor
00193 //                          , _timeoutms
00194 //                          );
00195 //         }
00196 
00197          int _err;
00198          switch( recv_ew( p_descriptor, &p_rcv_buffer[r_readcount], 1, 0, _timeoutms ) )
00199          {
00200            case 1:
00201                 if ( p_rcv_buffer[r_readcount] == '\n' )
00202                 {
00203                    /* If we got a newline, we're done gathering */
00204                    /* replace the \n with a zero */
00205                    p_rcv_buffer[r_readcount] = '\0';
00206                    _stillreading = false;
00207                 }
00208                 (*p_length)++;
00209                 r_readcount++;
00210                 break;
00211 
00212            case 0:
00213                 r_status = -2;
00214                 if ( WORM_LOG_DETAILS <= TGlobalUtils::GetLoggingLevel() )
00215                 {
00216                    TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00217                                  , "WormServerBase.ListenForMsg(): Client closed socket %d\n"
00218                                  , p_descriptor
00219                                  );
00220                 }
00221                 _stillreading = false;
00222                 break;
00223 
00224            case SOCKET_ERROR:
00225 #if defined(_WINNT) || defined(_Windows)
00226                 switch( (_err = WSAGetLastError()) )
00227                 {
00228                   case WSAEWOULDBLOCK:
00229                        // timed out -- nothing now available on non-blocking socket
00230                        r_status = -1;
00231                        _stillreading = false;
00232                        break;
00233 
00234                   case WSAECONNRESET:
00235                        r_status = -2;
00236                        if ( WORM_LOG_DETAILS <= TGlobalUtils::GetLoggingLevel() )
00237                        {
00238                           TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00239                                         , "WormServerBase.ListenForMsg(): Other end closed socket %d\n"
00240                                         , p_descriptor
00241                                         );
00242                        }
00243                        _stillreading = false;
00244                        break;
00245 
00246                   default:
00247                        r_status = -3;
00248                        throw worm_socket_exception( WSF_RECV
00249                                                   , _err
00250                                                   , "Socket error from recv()"
00251                                                   );
00252                 }
00253                 break;
00254 #endif
00255 /* TODO : CHECK FOR TIMED-OUT ON NON_WINDOWS OS */
00256 
00257          } // switch recv_ew()
00258 
00259       } while( _stillreading );  // for 0 --> (p_maxlen - 1)
00260    }
00261    catch( worm_socket_exception & _se )
00262    {
00263       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00264                     , "WormServerBase::ListenForMsg(): socket exception:\n%s\n"
00265                     , _se.DecodeError()
00266                     );
00267    }
00268    catch( worm_exception & _we )
00269    {
00270       if ( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
00271       {
00272          TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00273                        , "WormServerBase.ListenForMsg(): Error - %s\n"
00274                        , _we.what()
00275                        );
00276       }
00277    }
00278 
00279    // tell the main thread this thread still alive
00280    ThreadsInfo[p_descriptor].lastpulse = time(NULL); // "I'm still alive"
00281    if ( _timeoutms == -1 )
00282    {
00283       // tell the main thread that we're entering a potentially
00284       // blocked state so it will not kill this thread during a
00285       // long block.
00286       ThreadsInfo[p_descriptor].status = THREAD_PROCESSING;
00287    }
00288 
00289    return r_status;
00290 }
00291 //---------------------------------------------------------------------------
00292 WORM_STATUS_CODE WormServerBase::SendMessage( const SOCKET  p_descriptor
00293                                             , const char *  p_msg
00294                                             , int        *  p_length
00295                                             )
00296 {
00297    WORM_STATUS_CODE r_status = WORM_STAT_SUCCESS;
00298 
00299    if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00300    {
00301       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00302                     , "WormServerBase.SendMessage(): sending message to %s  (timeout %d):\n%s<\n"
00303                     , ThreadsInfo[p_descriptor].ipaddr
00304                     , SendTimeoutMS
00305                     , p_msg
00306                     );
00307    }
00308 
00309 
00310    try
00311    {
00312       if ( p_msg == NULL )
00313       {
00314          throw worm_exception( "WormServerBase.SendMessage(): message string is NULL" );
00315       }
00316 
00317       int _sendcount = *p_length;
00318       if ( (*p_length = send_ew(p_descriptor, p_msg, _sendcount, 0, SendTimeoutMS)) == SOCKET_ERROR )
00319       {
00320          throw worm_socket_exception( WSF_SEND
00321                                     , socketGetError_ew()
00322                                     , "send_ew returned error"
00323                                     );
00324       }
00325 
00326       if ( _sendcount != *p_length )
00327       {
00328          throw worm_exception("bytes sent does not match message size");
00329       }
00330    }
00331    catch( worm_socket_exception& _se )
00332    {
00333       if ( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
00334       {
00335          TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00336                        , "WormServerBase.SendMessage(): failed sending message to %s:\n  %s\n"
00337                        , ThreadsInfo[p_descriptor].ipaddr
00338                        , _se.DecodeError()
00339                        );
00340       }
00341       r_status = WORM_STAT_FAILURE;
00342    }
00343    catch( worm_exception _we )
00344    {
00345       if ( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
00346       {
00347          TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00348                        , "WormServerBase.SendMessage(): Error - %s\n"
00349                        , _we.what()
00350                        );
00351       }
00352       r_status = WORM_STAT_FAILURE;
00353    }
00354    if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00355    {
00356       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00357                     , "WormServerBase.SendMessage(): exiting\n"
00358                     );
00359    }
00360    return r_status;
00361 }
00362 //---------------------------------------------------------------------------
00363 void WormServerBase::CheckConfig()
00364 {
00365 
00366    if ( TGlobalUtils::GetHeartbeatInt() < 1 )
00367    {
00368       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00369                     , "WormServerBase::CheckConfig(): No <HeartBeatInt> value from the config file\n"
00370                     );
00371       ConfigState = WORM_STAT_BADSTATE;
00372    }
00373 
00374    if ( RecvTimeoutMS < MIN_RECV_TIMEOUT_MS )
00375    {
00376       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00377                     , "WormServerBase::CheckConfig(): <RecvTimeoutMSecs> too low in config file, using %d\n"
00378                     , MIN_RECV_TIMEOUT_MS
00379                     );
00380       RecvTimeoutMS = MIN_RECV_TIMEOUT_MS;
00381    }
00382 
00383    if ( CommandRingKey == WORM_RING_INVALID )
00384    {
00385       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00386                     , "WormServerBase::CheckConfig(): No <CmdRingName> value from the config file\n"
00387                     );
00388       ConfigState = WORM_STAT_BADSTATE;
00389    }
00390 
00391    if ( strlen(ServerIPAddr) == 0 )
00392    {
00393       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00394                     , "WormServerBase::CheckConfig(): No <ServerIPAddr> value from the config file\n"
00395                     );
00396       ConfigState = WORM_STAT_BADSTATE;
00397    }
00398 
00399    if ( ServerPort == -1 )
00400    {
00401       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00402                     , "WormServerBase::CheckConfig(): No <ServerPort> value from the config file\n"
00403                     );
00404       ConfigState = WORM_STAT_BADSTATE;
00405    }
00406 
00407    if ( SendTimeoutMS == -2 )
00408    {
00409       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00410                     , "WormServerBase::CheckConfig(): No <SendTimeoutMSecs> value from the config file\n"
00411                     );
00412       ConfigState = WORM_STAT_BADSTATE;
00413    }
00414 }
00415 //---------------------------------------------------------------------------
00416 HANDLE_STATUS WormServerBase::HandleConfigLine( ConfigSource * p_parser )
00417 {
00418    // Do not manipulate ConfigState herein.
00419    //
00420    HANDLE_STATUS r_handled = HANDLER_USED;
00421 
00422    try
00423    {
00424       char * _token;
00425 
00426       do
00427       {
00428 
00429          if ( p_parser->Its("CmdRingName") )
00430          {
00431             _token = p_parser->String();
00432             if ( strlen(_token) == 0 )
00433             {
00434                throw worm_exception("missing <CmdRingName> value");
00435             }
00436             strncpy( CommandRingName, _token, MAX_RINGNAME_LEN );
00437 
00438             if ( (CommandRingKey = TGlobalUtils::LookupRingKey(CommandRingName)) == WORM_RING_INVALID )
00439             {
00440                throw worm_exception("invalid <CmdRingName> value");
00441             }
00442 
00443             continue;
00444          }
00445 
00446          if( p_parser->Its( "ServerIPAddr" ) )
00447          {
00448             _token = p_parser->String();
00449 
00450             if ( strlen(_token) == 0 )
00451             {
00452                throw worm_exception("missing <ServerIPAddr> value");
00453             }
00454 
00455             if ( 20 <= strlen(_token) )
00456             {
00457                throw worm_exception("<ServerIPAddr> name too long");
00458             }
00459 
00460             strcpy( ServerIPAddr, _token );
00461             continue;
00462          }
00463 
00464          if ( p_parser->Its("ServerPort") )
00465          {
00466             if ( (ServerPort = p_parser->Int()) == ConfigSource::INVALID_INT )
00467             {
00468                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00469                              , "WormServerBase::HandleConfigLine(): Invalid <ServerPort> value in line\n%s\n"
00470                              , p_parser->GetCurrentLine()
00471                              );
00472             }
00473             continue;
00474          }
00475 
00476          if( p_parser->Its("SendTimeoutMSecs") )
00477          {
00478             // convert to milliseconds
00479             if ( (SendTimeoutMS = p_parser->Int()) < 0 )
00480             {
00481                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00482                              , "WormServerBase::HandleConfigLine(): Invalid <SendTimeoutMSecs> line (must be > 0), reverting to 10000 ms\n"
00483                              );
00484                SendTimeoutMS = 10000;
00485             }
00486             continue;
00487          }
00488 
00489          if( p_parser->Its("RecvTimeoutMSecs") )
00490          {
00491             RecvTimeoutMS = p_parser->Int();
00492             if ( RecvTimeoutMS < -1 )
00493             {
00494                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00495                              , "WormServerBase::HandleConfigLine(): Invalid <RecvTimeoutSecs> line, reverting to 200 ms\n"
00496                              );
00497                RecvTimeoutMS = 200;
00498             }
00499             continue;
00500          }
00501 
00502 
00503          if ( p_parser->Its("MaxServerThreads") )
00504          {
00505             MaxServiceThreads = p_parser->Int();
00506             if ( MaxServiceThreads <= 0 || SERVE_MAX_THREADS < MaxServiceThreads )
00507             {
00508                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00509                              , "WormServerBase::HandleConfigLine(): <MaxServiceThreads> (%d) out of range (0 - %d)\n%s %d\n"
00510                              , MaxServiceThreads
00511                              , SERVE_MAX_THREADS
00512                              , "                                    setting to "
00513                              , SERVE_MAX_THREADS
00514                              );
00515                MaxServiceThreads = SERVE_MAX_THREADS;
00516             }
00517             continue;
00518          }
00519 
00520          if( p_parser->Its("SocketDebug") )
00521          {
00522             SocketDebug = true;
00523             continue;
00524          }
00525 
00526          r_handled = HANDLER_UNUSED;
00527 
00528       } while ( false );
00529    }
00530    catch( worm_exception _we )
00531    {
00532       r_handled = HANDLER_INVALID;
00533       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00534                    , "WormServerBase::HandleConfigLine(): configuration error:\n%s%s\n"
00535                    , "                                    "
00536                    , _we.what()
00537                    );
00538    }
00539 
00540    return r_handled;
00541 }
00542 //---------------------------------------------------------------------------
00543 /******************************************************************************
00544 * SendStatus() builds a heartbeat or error message & puts it into    *
00545 *                     shared memory.  Writes errors to log file & screen.    *
00546 ******************************************************************************/
00547 void WormServerBase::SendStatus( unsigned char type, short p_ierr, char * p_text )
00548 {
00549    if ( CommandRingKey != WORM_RING_INVALID )
00550    {
00551       MSG_LOGO    logo;
00552       char        msg[256];
00553       long        size;
00554       long        t;
00555         
00556       /* Build the message
00557       *******************/
00558       logo.instid = TGlobalUtils::GetThisInstallationId();
00559       logo.mod    = TGlobalUtils::GetThisModuleId();
00560       logo.type   = type;
00561         
00562       time( &t );
00563 
00564       strcpy( msg, "" );
00565    
00566       if( type == TGlobalUtils::LookupMessageTypeId("TYPE_HEARTBEAT") )
00567       {
00568          sprintf( msg, "%ld %ld\n\0", t, TGlobalUtils::GetPID());
00569       }
00570       else if( type == TGlobalUtils::LookupMessageTypeId("TYPE_ERROR") )
00571       {
00572          sprintf( msg, "%ld %hd %s\n\0", t, p_ierr, p_text);
00573       }
00574       else
00575       {
00576          return;
00577       }
00578         
00579       size = strlen( msg );   /* don't include the null byte in the message */
00580 
00581       /* Write the message to shared memory
00582       ************************************/
00583 
00584       if( tport_putmsg( &CommandRegion, &logo, size, msg ) != PUT_OK )
00585       {
00586          if( type == TGlobalUtils::LookupMessageTypeId("TYPE_HEARTBEAT") )
00587          {
00588             TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00589                           , "WormServerBase::SendStatus(): Error sending heartbeat.\n"
00590                           );
00591          }
00592          else // if( type == TGlobalUtils::LookupMessageTypeId("TYPE_ERROR") )
00593          {
00594             TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00595                           , "WormServerBase::SendStatus(): Error sending error: %d.\n"
00596                           , p_ierr
00597                           );
00598          }
00599       }
00600    }
00601 }
00602 //---------------------------------------------------------------------------
00603 WORM_STATUS_CODE WormServerBase::Run()
00604 {
00605    WORM_STATUS_CODE r_status = WORM_STAT_SUCCESS;
00606 
00607    static long CurrentTime
00608              , LastBeatTime
00609              ;
00610 
00611    try
00612    {
00613       if ( ConfigState != WORM_STAT_SUCCESS )
00614       {
00615          throw worm_exception("WormServerBase::Run() Server not configured");
00616       }
00617 
00618       if ( SocketDebug )
00619       {
00620          // Turn Socket level debugging On/Off
00621          setSocket_ewDebug(1);
00622       }
00623 
00624       tport_attach( &CommandRegion, CommandRingKey );
00625 
00626 #ifdef _SOLARIS
00627       // Ignore broken socket signals
00628       (void)sigignore(SIGPIPE);
00629 #endif
00630 
00631       // Set up the signal handler so we can shut down gracefully
00632            //
00633            signal(SIGINT, (SIG_HANDLR_PTR)SignalHandler);     /* <Ctrl-C> interrupt */
00634            signal(SIGTERM, (SIG_HANDLR_PTR)SignalHandler);    /* program termination request */
00635            signal(SIGABRT, (SIG_HANDLR_PTR)SignalHandler);    /* abnormal termination */
00636 #ifdef SIGBREAK
00637            signal(SIGBREAK, (SIG_HANDLR_PTR)SignalHandler);   /* keyboard break */
00638 #endif
00639 
00640       if ( ! PrepareToRun() )
00641       {
00642          throw worm_exception("PrepareToRun() reported not ready");
00643       }
00644 
00645 
00646       // Start the Listener Thread
00647       //
00648       // alternative:     MyThreadFunction = &WormServerBase::Listener;
00649       LastListenerPulse = time(NULL) + 5; // allow an extra 5 seconds to start
00650       if ( StartThread( THREAD_STACK, &ListenerThreadId ) == WORM_STAT_FAILURE )
00651       {
00652          throw worm_exception("failed starting listener thread");
00653       }
00654 
00655 
00656       int flag;
00657 
00658       // initialize the Times
00659       //
00660       time(&CurrentTime);
00661       LastBeatTime = CurrentTime;
00662 
00663       Running = true;
00664       do
00665       {
00666          // Check for and respond to stop flags
00667          flag = tport_getflag(&CommandRegion);
00668          if( flag == TERMINATE  ||  flag == (int)TGlobalUtils::GetPID() )
00669          {
00670             Running = false; // signal all threads to return
00671             continue;
00672          }
00673 
00674          // +2 is to allow sleeps for same time
00675          if ( (LastListenerPulse + ACCEPTABLE_SEC_TO_WAIT_FOR_SERVICE_THREAD_TO_START + 2) < time(NULL) )
00676          {
00677             SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_ERROR"), 0, "listener thread stopped" );
00678             throw worm_exception("Listener thread stopped pulsing");
00679          }
00680 
00681          // Send server's heartbeat if it is time
00682          if( TGlobalUtils::GetHeartbeatInt() <= (time(&CurrentTime) - LastBeatTime) )
00683          {
00684             LastBeatTime = CurrentTime;
00685             SendStatus( TGlobalUtils::LookupMessageTypeId("TYPE_HEARTBEAT"), 0, "" );
00686          }
00687 
00688          // Handle logging, etc, in MainThreadActions()....
00689          // so, don't throw an error here
00690          //
00691          if ( MainThreadActions() == WORM_STAT_SUCCESS )
00692          {
00693             //
00694             TTimeFuncs::MSecSleep(500);
00695          }
00696          else
00697          {
00698             Running = false;
00699          }
00700 
00701       } while( Running );
00702 
00703    }
00704    catch( worm_exception _we )
00705    {
00706       r_status = WORM_STAT_FAILURE;
00707       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00708                     , "WormServerBase::Run(): error: %s\n"
00709                     , _we.what()
00710                     );
00711    }
00712 
00713    TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDOUT|WORM_LOG_TIMESTAMP
00714                  , "WormServerBase::Run(): calling FinishedRunning()\n"
00715                  );
00716 
00717    // Tell the derivative classes to finish up
00718    FinishedRunning();
00719 
00720    if ( CommandRegion.addr != NULL )
00721    {
00722       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00723                     , "WormServerBase::Listener(): detaching from command ring\n"
00724                     );
00725       tport_detach( &CommandRegion );
00726       CommandRegion.addr = NULL;
00727    }
00728 
00729    TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDOUT|WORM_LOG_TIMESTAMP
00730                  , "WormServerBase::Run(): returning.\n"
00731                  );
00732 
00733    return r_status;
00734 }
00735 //---------------------------------------------------------------------------
00736 // THREAD_RETURN is some sort of void
00737 //
00738 // The sockaddr structure varies depending on the protocol selected.
00739 // The structure below is used with TCP/IP. Other protocols use similar structures.
00740 //
00741 // struct sockaddr_in {
00742 //         short   sin_family;
00743 //         u_short sin_port;
00744 //         struct  in_addr sin_addr;
00745 //         char    sin_zero[8];
00746 // };
00747 //
00748 // struct hostent {
00749 //     char FAR *       h_name;
00750 //     char FAR * FAR * h_aliases;
00751 //     short            h_addrtype;
00752 //     short            h_length;
00753 //     char FAR * FAR * h_addr_list;
00754 // };
00755 //
00756 // Members
00757 //h_name -- Official name of the host (PC).If using the DNS or similar resolution system,
00758 //          it is the Fully Qualified Domain Name (FQDN) that caused the server to return a reply.
00759 //          If using a local "hosts" file, it is the first entry after the IP address.
00760 //h_aliases -- A NULL-terminated array of alternate names.
00761 //h_addrtype -- The type of address being returned.
00762 //h_length -- The length, in bytes, of each address.
00763 //h_addr_list -- A NULL-terminated list of addresses for the host.
00764 //               Addresses are returned in network byte order.
00765 //               The macro h_addr is defined to be h_addr_list[0] for compatibility with older software.
00766 //
00767 THREAD_RETURN WormServerBase::Listener( void * p_dummy )
00768 {
00769 
00770    if ( WORM_LOG_TRACKING <= TGlobalUtils::GetLoggingLevel() )
00771    {
00772       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDOUT|WORM_LOG_TIMESTAMP
00773                     , "WormServerBase::Listener(): Starting\n"
00774                     );
00775    }
00776 
00777    try
00778    {
00779       int              _clientaddrlength;
00780 
00781 /*
00782       // Fill in the hostent structure, given dot address as a char string
00783       //
00784 //      unsigned long    ServerNBOAddr;
00785 //      struct hostent * HostEnt;
00786 //      struct  in_addr  hostentaddr;
00787 //      short            hostentaddrlen;
00788 
00789       // TODO : CHECK inet_addr() ON UNIX
00790       if ( (ServerNBOAddr = inet_addr(ServerIPAddr)) == INADDR_NONE)
00791       {
00792          char _msg[40];
00793          sprintf( _msg, "inet_addr(%s) failed", ServerIPAddr );
00794          throw worm_exception(_msg);
00795       }
00796       // TODO : CHECK gethostbyaddr() ON UNIX
00797       HostEnt = gethostbyaddr((char *)&ServerNBOAddr, sizeof(ServerNBOAddr), AF_INET);
00798       if (HostEnt == NULL)
00799       {
00800          char _msg[80];
00801          sprintf( _msg
00802                 , "gethostbyaddr() for %s failed"
00803                 , ServerIPAddr
00804                 );
00805          throw worm_exception(_msg);
00806       }
00807 
00808       memcpy( (char *)&hostentaddr, HostEnt->h_addr, HostEnt->h_length );
00809       hostentaddrlen = HostEnt->h_length;
00810 */
00811 
00812       while( Running )
00813       {
00814          LastListenerPulse = time(NULL);
00815 
00816 
00817          if ( PassiveSocket == INVALID_SOCKET )
00818          {
00819             if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00820             {
00821                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00822                              , "WormServerBase::Listener(): calling socket_ew()\n"
00823                              );
00824             }
00825 
00826             if ( ( PassiveSocket = socket_ew(AF_INET, SOCK_STREAM, 0) ) == INVALID_SOCKET )
00827             {
00828                throw worm_exception("socket_ew() returned error");
00829             }
00830 
00831             if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00832             {
00833                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00834                              , "WormServerBase::Listener(): calling setsockopt(SO_REUSEADDR)\n"
00835                              );
00836             }
00837 
00838             // Allows the server to reuse the address if problems (stop, crash, etc.)
00839             // kept the socket bound to a previous process
00840             //
00841             char _value = 1;
00842             if( setsockopt( PassiveSocket, SOL_SOCKET, SO_REUSEADDR, &_value, sizeof(char *) ) != 0 )
00843             {
00844                if ( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
00845                {
00846                   TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00847                                 , "setsockopt() failed setting SO_REUSEADDR, can't reuse socket if crash.\n"
00848                                 );
00849                }
00850             }
00851 
00852 
00853 /*
00854             struct sockaddr_in _serverstruct;
00855 
00856             // Fill in server's socket address structure
00857             //*******************************************
00858             memset( (char *)&_serverstruct, '\0', sizeof(_serverstruct) );
00859             memcpy( (char *)&_serverstruct.sin_addr, &hostentaddr, hostentaddrlen );
00860             _serverstruct.sin_family = AF_INET;
00861             _serverstruct.sin_port   = htons( (short)ServerPort );
00862 */
00863             struct sockaddr_in _serverstruct;
00864 
00865             // Fill in server's socket address structure
00866             //*******************************************
00867             memset( (char *)&_serverstruct, '\0', sizeof(_serverstruct) );
00868 
00869             _serverstruct.sin_family = AF_INET;
00870             _serverstruct.sin_port   = htons( (short)ServerPort );
00871 
00872             if ((int)(_serverstruct.sin_addr.S_un.S_addr = inet_addr(ServerIPAddr)) == -1)
00873             {
00874                worm_exception _expt( "inet_addr(" );
00875                               _expt += ServerIPAddr;
00876                               _expt += ") failed";
00877                throw _expt;
00878             }
00879 
00880 
00881 
00882             if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00883             {
00884                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00885                              , "WormServerBase::Listener(): calling bind_ew()\n"
00886                              );
00887             }
00888 
00889             // Bind socket to a name
00890             if ( bind_ew( PassiveSocket, (struct sockaddr *)&_serverstruct, sizeof(_serverstruct)) )
00891             {
00892                closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW );
00893                PassiveSocket = INVALID_SOCKET;
00894                throw worm_exception("bind_ew() returned error");
00895             }
00896 
00897             if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00898             {
00899                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00900                              , "WormServerBase::Listener(): calling listen_ew(%d, %d)\n"
00901                              , PassiveSocket
00902                              , MaxServiceThreads
00903                              );
00904             }
00905 
00906             // start listening
00907             if ( listen_ew( PassiveSocket, MaxServiceThreads ) )
00908             {
00909                closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW );
00910                PassiveSocket = INVALID_SOCKET;
00911                throw worm_exception("listen_ew() returned error");
00912             }
00913 
00914             LastListenerPulse = time(NULL);
00915 
00916          } // need to initialize server socket
00917 
00918 
00919          // =================================================================
00920          // Clean up the thread info structures
00921          //
00922          // Remove thread tracking structures for any threads in the
00923          // ERROR or COMPLETED state
00924          //
00925          // That is, assume that any thread in another state is still
00926          // talking successfully with its socket.
00927          //
00928          SERVICETHREADID_VECTOR _vec;      // list of items to be removed from list
00929          _vec.reserve(ThreadsInfo.size()); // adjust capacity
00930 
00931          SERVICETHREAD_MAP_ITERATOR _sttitr = ThreadsInfo.begin()
00932                                   , _enditr = ThreadsInfo.end()
00933                                   ;
00934          while ( _sttitr != _enditr )
00935          {
00936 //            if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00937 //            {
00938 //               TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00939 //                             , "WormServerBase::Listener(): checking thread %d (d: %d)\n"
00940 //                             , _sttitr->second.threadid
00941 //                             , _sttitr->second.descriptor
00942 //                             );
00943 //            }
00944 
00945             switch ( _sttitr->second.status )
00946             {
00947               case THREAD_ERROR:
00948                    TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00949                                  , "WormServerBase::Listener(): service thread %d reported error\n"
00950                                  , _sttitr->second.threadid
00951                                  );
00952               case THREAD_COMPLETED:
00953               case THREAD_DISCONNECTED:
00954                    _vec.push_back( _sttitr->second.descriptor );
00955                    break;
00956 
00957               default:
00958                    if (   (_sttitr->second.lastpulse + SERVICE_THREAD_PULSE_MAX) < time(NULL)
00959                        && (_sttitr->second.status != THREAD_BLOCKINGSOCKET)
00960                       )
00961                    {
00962                       // Service thread found without a pulse for too long, and it is not
00963                       // making a possibly blocking socket call.
00964                       // Close socket and kill the thread
00965                       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
00966                                     , "WormServerBase::Listener(): service thread %d found hung, killing it\n"
00967                                     , _sttitr->second.threadid
00968                                     );
00969                       try
00970                       {
00971                          closesocket_ew( _sttitr->second.descriptor, SOCKET_CLOSE_SIMPLY_EW );
00972                       }
00973                       catch( ... ) { } // ensure that the thread is killed despite any closure error
00974                       KillThread( _sttitr->second.threadid );
00975                       _vec.push_back( _sttitr->second.descriptor );
00976                    }
00977                    break;
00978             }
00979 
00980 
00981             _sttitr++;
00982          }
00983 
00984          for ( SOCKET _t = 0 ; _t < _vec.size() ; _t++ )
00985          {
00986             if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
00987             {
00988                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
00989                              , "WormServerBase::Listener(): erasing thread %d\n"
00990                              , _vec[_t]
00991                              );
00992             }
00993             ThreadsInfo.erase(_vec[_t]);
00994          }
00995 
00996 
00997          // =================================================================
00998          // Try to accept new client
00999          //
01000          if ( ThreadsInfo.size() == MaxServiceThreads )
01001          {
01002             if ( WORM_LOG_STATUS <= TGlobalUtils::GetLoggingLevel() )
01003             {
01004                TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
01005                              , "WormServerBase::Listener(): maxed out on service threads, cannot service client\n"
01006                              );
01007             }
01008             TTimeFuncs::MSecSleep(WAIT_FOR_SERVICE_THREAD);
01009          }
01010          else
01011          {
01012 //            if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
01013 //            {
01014 //               TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
01015 //                             , "WormServerBase::Listener(): waiting for new connection\n"
01016 //                             );
01017 //            }
01018 
01019 
01020             _ServiceThreadInfoStruct _newconn_info;
01021             _newconn_info.status = THREAD_STARTING;
01022 
01023             _clientaddrlength = sizeof(_newconn_info.sock_addr);
01024 
01025             // Don't use a blocking socket here (-1), so the loop can clean up
01026             // dead threads if needed.
01027             _newconn_info.descriptor = accept_ew( PassiveSocket
01028                                                 , (struct sockaddr*)&_newconn_info.sock_addr
01029                                                 , &_clientaddrlength
01030                                                 , 100 // do not make this -1
01031                                                 );
01032 
01033             if( _newconn_info.descriptor == INVALID_SOCKET )
01034             {
01035 #if defined(_WINNT) || defined(_Windows)
01036 
01037                int _err = socketGetError_ew();
01038                switch( _err )
01039                {
01040                  case WSAEMFILE:
01041                       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
01042                                     , "WormServerBase::Listener(): accept_ew() returned error: no descriptors available\n"
01043                                     );
01044                       // queue not empty, no descriptors available (can try again later)
01045                       break;
01046                  case WSAEWOULDBLOCK:
01047 //                      if ( TGlobalUtils::GetLoggingLevel() == WORM_LOG_DEBUG )
01048 //                      {
01049 //                         TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
01050 //                                       , "WormServerBase::Listener(): accept_ew() returned error, no connection attempt\n"
01051 //                                       );
01052 //                      }
01053                       TTimeFuncs::MSecSleep(500);
01054                       break;
01055                  default:
01056                       throw worm_socket_exception( WSF_ACCEPT
01057                                                  , _err
01058                                                  , "Socket error returned from accept_ew()"
01059                                                  );
01060                }
01061 #else
01062                /* TODO : FINISH NON-WINDOWS ERROR HANDLING */
01063                closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW );
01064                PassiveSocket = INVALID_SOCKET;
01065                continue;
01066 #endif
01067             }
01068             else
01069             {
01070                strcpy( _newconn_info.ipaddr, inet_ntoa(_newconn_info.sock_addr.sin_addr) );
01071 
01072                // Start the Service thread
01073                //
01074                if ( WORM_LOG_STATUS <= TGlobalUtils::GetLoggingLevel() )
01075                {
01076                   TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
01077                                 , "WormServerBase::Listener(): Connection accepted from IP address %s\n"
01078                                 , _newconn_info.ipaddr
01079                                 );
01080                }
01081 
01082                ThreadsInfo[_newconn_info.descriptor] = _newconn_info;
01083 
01084                TO_THREAD_ID _newthreadid;
01085 
01086                if ( StartThreadWithArg( (unsigned int)THREAD_STACK
01087                                       , &_newthreadid
01088                                       , &_newconn_info.descriptor
01089                                       )  == WORM_STAT_FAILURE )
01090                {
01091                   closesocket_ew( _newconn_info.descriptor, SOCKET_CLOSE_IMMEDIATELY_EW );
01092                   ThreadsInfo.erase(_newconn_info.descriptor);
01093                   if ( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
01094                   {
01095                      TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
01096                                    , "WormServerBase::Listener(): failed to start service thread\n"
01097                                    , _newconn_info.ipaddr
01098                                    );
01099                   }
01100                   continue;
01101                }
01102 
01103                ThreadsInfo[_newconn_info.descriptor].threadid = _newthreadid;
01104 
01105                // Check to see if the thread has awakened
01106                if ( WORM_LOG_DETAILS <= TGlobalUtils::GetLoggingLevel() )
01107                {
01108                   TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
01109                                 , "WormServerBase::Listener(): waiting for thread %d to come alive\n"
01110                                 , _newthreadid
01111                                 );
01112                }
01113 
01114                LastListenerPulse = time(NULL);
01115                TTimeFuncs::MSecSleep(ACCEPTABLE_SEC_TO_WAIT_FOR_SERVICE_THREAD_TO_START);
01116 
01117                if (   0 < ThreadsInfo.count(_newconn_info.descriptor)
01118                    && ThreadsInfo[_newconn_info.descriptor].status == THREAD_STARTING
01119                   )
01120                {
01121                   if ( WORM_LOG_ERRORS <= TGlobalUtils::GetLoggingLevel() )
01122                   {
01123                      TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
01124                                    , "WormServerBase::Listener(): Thread %d never went to busy, killing it\n"
01125                                    , _newthreadid
01126                                    );
01127                   }
01128                   // A confused thread running around out there,
01129                   // clean up the socket and kill it
01130                   //
01131                   KillThread( _newthreadid );
01132                   closesocket_ew( _newconn_info.descriptor, SOCKET_CLOSE_IMMEDIATELY_EW );
01133                   ThreadsInfo.erase(_newconn_info.descriptor);
01134                }
01135 
01136             } // accept_ew() successful
01137 
01138          } // thread available (not maxed out)
01139 
01140       } // Running == true
01141    }
01142    catch( worm_socket_exception& _se )
01143    {
01144       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
01145                     , "WormServerBase::Listener(): socket exception:\n%s\n"
01146                     , _se.DecodeError()
01147                     );
01148    }
01149    catch( worm_exception& _we )
01150    {
01151       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR|WORM_LOG_TIMESTAMP
01152                     , "WormServerBase::Listener(): %s\n"
01153                     , _we.what()
01154                     );
01155    }
01156 
01157 
01158    if ( PassiveSocket != INVALID_SOCKET )
01159    {
01160       TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
01161                     , "WormServerBase::Listener(): closing passive socket\n"
01162                     );
01163       closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW );
01164       PassiveSocket = INVALID_SOCKET;
01165    }
01166 
01167    TLogger::Logit( WORM_LOG_TOFILE|WORM_LOG_TOSTDERR
01168                  , "WormServerBase::Listener(): returning\n"
01169                  );
01170 
01171 }

Generated on Tue May 6 09:16:10 2003 for Earthworm Libs by doxygen1.3-rc3