Main Page   Class Hierarchy   Compound List   File List   Compound Members   File Members  

transport.c

Go to the documentation of this file.
00001 
00002 /*
00003  *   THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE
00004  *   CHECKED IT OUT USING THE COMMAND CHECKOUT.
00005  *
00006  *    $Id: transport_8c-source.html 2161 2006-05-19 16:55:03Z paulf $
00007  *
00008  *    Revision history:
00009  *     $Log$
00009  *     Revision 1.1  2006/05/19 16:55:02  paulf
00009  *     first inclusion
00009  *
00010  *     Revision 1.3  2001/05/04 23:43:54  dietz
00011  *     Changed flag arg of tport_putflag from short to int to handle
00012  *     processids properly.
00013  *
00014  *     Revision 1.2  2000/06/02 18:19:48  dietz
00015  *     Fixed tport_putmsg,tport_copyto to always release semaphore before returning
00016  *
00017  *     Revision 1.1  2000/02/14 18:53:30  lucky
00018  *     Initial revision
00019  *
00020  *
00021  */
00022 
00023 /********************************************************************/
00024 /*                                                        6/2000    */
00025 /*                           transport.c                            */
00026 /*                                                                  */
00027 /*   Transport layer functions to access shared memory regions.     */
00028 /*                                                                  */
00029 /*   written by Lynn Dietz, Will Kohler with inspiration from       */
00030 /*       Carl Johnson, Alex Bittenbinder, Barbara Bogaert           */
00031 /*                                                                  */
00032 /********************************************************************/
00033 
00034 /* ***** Notes on development, delete when appropriate
00035 1. Change the quotes for the transport.h and earthworm.h includes
00036    when these are moved to the appropriately pathed included directory.
00037 */
00038 #include <windows.h>
00039 #include <sys/types.h>
00040 #include <stdio.h>
00041 #include <errno.h>
00042 #include <string.h>
00043 #include <time.h>
00044 #include <process.h>
00045 #include <transport.h>
00046 
/* One-shot initialization flags.  Each starts at 1; tport_putmsg and
   tport_getmsg clear Put_Init and Get_Init after zeroing their static
   tracking tables on the first call.
   NOTE(review): Copyfrom_Init and Copyto_Init presumably play the same
   role for tport_copyfrom / tport_copyto - confirm where those are defined. */
00047 static short Put_Init=1;           /* 1 until tport_putmsg has initialized its tracking table */
00048 static short Get_Init=1;           /* 1 until tport_getmsg has initialized its tracking table */
00049 static short Copyfrom_Init=1;      /* initialization flag */
00050 static short Copyto_Init  =1;      /* initialization flag */
00051 
00052 /* These functions are for internal use by transport functions only
00053    ****************************************************************/
00054 void  tport_syserr  ( char *, long );
00055 void  tport_buferror( short, char * );
00056 
00057 /* These statements and variables are required by the functions of
00058    the input-buffering thread
00059    ***************************************************************/
00060 #include "earthworm.h"
00061 volatile SHM_INFO *PubRegion;      /* transport public ring (tport_bufthr copies from it and polls its flag) */
00062 volatile SHM_INFO *BufRegion;      /* pointer to private ring    */
00063 volatile MSG_LOGO *Getlogo;        /* array of logos to copy     */
00064 volatile short     Nget;           /* number of logos in getlogo */
00065 volatile unsigned  MaxMsgSize;     /* size of message buffer     */
00066 volatile char     *Message;        /* message buffer             */
00067 static unsigned char MyModuleId;   /* module id of main thread   */
00068 unsigned char      MyInstid;       /* instid of main thread      */
00069 unsigned char      TypeError;      /* type for error messages    */
00070 
00071 /******************** function tport_create *************************/
00072 /*         Create a shared memory region & its semaphore,           */
00073 /*           attach to it and initialize header values.             */
00074 /********************************************************************/
00075 void tport_create( SHM_INFO *region,   /* info structure for memory region  */
00076                    long      nbytes,   /* size of shared memory region      */
00077                    long      memkey )  /* key to shared memory region       */
00078 {
00079    SHM_HEAD       *shm;       /* pointer to start of memory region */
00080    HANDLE         hshare;     // Handle to memory shared file
00081    HANDLE         hmutex;     // Handle to mutex object
00082    char           share[20];  // Shared file name from memkey
00083    char           mutex[20];  // Mutex name
00084    int            err;        // Error code from GetLastError()
00085 
00086 /**** Create shared memory region ****/
00087    sprintf(share, "SHR_%ld", memkey);
00088    hshare = CreateFileMapping(
00089       (HANDLE)0xFFFFFFFF,  // Request memory file (swap space)
00090       NULL,                // Security attributes
00091       PAGE_READWRITE,      // Access restrictions
00092       0,                   // High order size (for very large mappings)
00093       nbytes,              // Low order size
00094       share);              // Name of file for other processes
00095    if ( hshare == NULL )
00096    {
00097       err = GetLastError();
00098       tport_syserr( "CreateFileMapping", err);
00099    }
00100 
00101 /**** Attach to shared memory region ****/
00102    shm = (SHM_HEAD *) MapViewOfFile(
00103       hshare,              // File object to map
00104       FILE_MAP_WRITE,      // Access desired
00105       0,                   // High-order 32 bits of file offset
00106       0,                   // Low-order 32 bits of file offset
00107       nbytes);             // Number of bytes to map
00108    if ( shm == NULL )
00109    {
00110       err = GetLastError();
00111       tport_syserr( "MapViewOfFile", err );
00112    }
00113 
00114 /**** Initialize shared memory region header ****/
00115    shm->nbytes = nbytes;
00116    shm->keymax = nbytes - sizeof(SHM_HEAD);
00117    shm->keyin  = sizeof(SHM_HEAD);
00118    shm->keyold = shm->keyin;
00119    shm->flag   = 0;
00120 
00121 /**** Make semaphore for this shared memory region & set semval = SHM_FREE ****/
00122    sprintf(mutex, "MTX_%ld", memkey);
00123    hmutex = CreateMutex(
00124       NULL,                // Security attributes
00125       FALSE,               // Initial ownership
00126       mutex);              // Name of mutex (derived from memkey)
00127    if ( hmutex == NULL )
00128    {
00129       err = GetLastError();
00130       tport_syserr( "CreateMutex", err);
00131    }
00132 
00133 /**** set values in the shared memory information structure ****/
00134    region->addr = shm;
00135    region->hShare = hshare;
00136    region->hMutex = hmutex;
00137    region->key  = memkey;
00138 }
00139 
00140 
00141 /******************** function tport_destroy *************************/
00142 /*                Destroy a shared memory region.                    */
00143 /*********************************************************************/
00144 
00145 void tport_destroy( SHM_INFO *region )
00146 {
00147    int err;
00148 
00149 /***** Set kill flag, give other attached programs time to terminate ****/
00150 
00151    tport_putflag( region, TERMINATE );
00152    Sleep( 1000 );
00153 
00154 /***** Detach from shared memory region *****/
00155    if(!UnmapViewOfFile( region->addr )) {
00156       err = GetLastError();
00157       tport_syserr( "UnmapViewOfFile", err);
00158    }
00159 
00160 /***** Destroy semaphore set for shared memory region *****/
00161    if(!CloseHandle(region->hMutex)) {
00162       err = GetLastError();
00163       tport_syserr( "CloseHandle (mutex)", err);
00164    }
00165 
00166 
00167 /***** Destroy shared memory region *****/
00168    if(!CloseHandle(region->hShare)) {
00169       err = GetLastError();
00170       tport_syserr( "CloseHandle (share)", err);
00171    }
00172 }
00173 
00174 /******************** function tport_attach *************************/
00175 /*            Map to an existing shared memory region.              */
00176 /********************************************************************/
00177 
00178 void tport_attach( SHM_INFO *region,   /* info structure for memory region  */
00179                    long      memkey )  /* key to shared memory region       */
00180 {
00181    SHM_HEAD       *shm;       /* pointer to start of memory region */
00182    HANDLE         hshare;     // Handle to memory shared file
00183    HANDLE         hmutex;     // Handle to mutex object
00184    char           share[20];  // Shared file name from memkey
00185    char           mutex[20];  // Mutex name
00186    int            err;        // Error code from GetLastError()
00187 
00188 /**** Create shared memory region ****/
00189    sprintf(share, "SHR_%ld", memkey);
00190    hshare = OpenFileMapping(
00191        FILE_MAP_WRITE,
00192        TRUE,
00193        share);
00194    if ( hshare == NULL )
00195    {
00196       err = GetLastError();
00197       tport_syserr( "OpenFileMapping", err);
00198    }
00199 
00200 /**** Attach to shared memory region ****/
00201    shm = (SHM_HEAD *) MapViewOfFile(
00202       hshare,              // File object to map
00203       FILE_MAP_WRITE,      // Access desired
00204       0,                   // High-order 32 bits of file offset
00205       0,                   // Low-order 32 bits of file offset
00206       0);                  // Number of bytes to map
00207    if ( shm == NULL )
00208    {
00209       err = GetLastError();
00210       tport_syserr( "MapViewOfFile", err );
00211    }
00212 
00213 /**** Make semaphore for this shared memory region & set semval = SHM_FREE ****/
00214    sprintf(mutex, "MTX_%ld", memkey);
00215    hmutex = CreateMutex(
00216       NULL,                // Security attributes
00217       FALSE,               // Initial ownership
00218       mutex);              // Name of mutex (derived from memkey)
00219    if ( hmutex == NULL )
00220    {
00221       err = GetLastError();
00222       tport_syserr( "CreateMutex", err);
00223    }
00224 
00225 /**** set values in the shared memory information structure ****/
00226    region->addr = shm;
00227    region->hShare = hshare;
00228    region->hMutex = hmutex;
00229    region->key  = memkey;
00230 }
00231 
00232 /******************** function tport_detach **************************/
00233 /*                Detach from a shared memory region.                */
00234 /*********************************************************************/
00235 
00236 void tport_detach( SHM_INFO *region )
00237 {
00238    int err;
00239 
00240 /***** Detach from shared memory region *****/
00241    if(!UnmapViewOfFile( region->addr )) {
00242       err = GetLastError();
00243       tport_syserr( "UnmapViewOfFile", err);
00244    }
00245 
00246 /***** Destroy semaphore set for shared memory region *****/
00247    if(!CloseHandle(region->hMutex)) {
00248       err = GetLastError();
00249       tport_syserr( "CloseHandle (mutex)", err);
00250    }
00251 
00252 
00253 /***** Destroy shared memory region *****/
00254    if(!CloseHandle(region->hShare)) {
00255       err = GetLastError();
00256       tport_syserr( "CloseHandle (share)", err);
00257    }
00258 }
00259 
00260 
00261 
00262 /*********************** function tport_putmsg ***********************/
00263 /*            Put a message into a shared memory region.             */
00264 /*            Assigns a transport-layer sequence number.             */
00265 /*********************************************************************/
00266 
00267 int tport_putmsg( SHM_INFO *region,    /* info structure for memory region    */
00268                   MSG_LOGO *putlogo,   /* type, module, instid of incoming msg */
00269                   long      length,    /* size of incoming message            */
00270                   char     *msg )      /* pointer to incoming message         */
00271 {
00272    volatile static MSG_TRACK  trak[NTRACK_PUT];   /* sequence number keeper   */
00273    volatile static int        nlogo;              /* # of logos seen so far   */
00274    int                        it;                 /* index into trak          */
00275    SHM_HEAD         *shm;              /* pointer to start of memory region   */
00276    char             *ring;             /* pointer to ring part of memory      */
00277    unsigned long     ir;               /* index into memory ring              */
00278    long              nfill;            /* # bytes from ir to ring's last-byte */
00279    long              nwrap;            /* # bytes to wrap to front of ring    */
00280    TPORT_HEAD        hd;               /* transport layer header to put       */
00281    char             *h;                /* pointer to transport layer header   */
00282    TPORT_HEAD        old;              /* transport header of oldest msg      */
00283    char             *o;                /* pointer to oldest transport header  */
00284    int j;
00285    int retval = PUT_OK;                /* return value for this function      */
00286 
00287 /**** First time around, init the sequence counters, semaphore controls ****/
00288 
00289    if (Put_Init)
00290    {
00291        nlogo    = 0;
00292 
00293        for( j=0 ; j < NTRACK_PUT ; j++ )
00294        {
00295           trak[j].memkey      = 0;
00296           trak[j].logo.type   = 0;
00297           trak[j].logo.mod    = 0;
00298           trak[j].logo.instid = 0;
00299           trak[j].seq         = 0;
00300           trak[j].keyout      = 0;
00301        }
00302 
00303        Put_Init = 0;
00304    }
00305 
00306 /**** Set up pointers for shared memory, etc. ****/
00307 
00308    shm  = region->addr;
00309    ring = (char *) shm + sizeof(SHM_HEAD);
00310    h    = (char *) (&hd);
00311    o    = (char *) (&old);
00312 
00313 /**** First, see if the incoming message will fit in the memory region ****/
00314 
00315    if ( length + sizeof(TPORT_HEAD) > shm->keymax )
00316    {
00317       fprintf( stdout,
00318               "ERROR: tport_putmsg; message too large (%ld) for Region %ld\n",
00319                length, region->key);
00320       return( PUT_TOOBIG );
00321    }
00322 
00323 /**** Change semaphore; let others know you're using tracking structure & memory  ****/
00324 
00325    WaitForSingleObject(region->hMutex, INFINITE);
00326 
00327 /**** Next, find incoming logo in list of combinations already seen ****/
00328 
00329    for( it=0 ; it < nlogo ; it++ )
00330    {
00331       if ( region->key     != trak[it].memkey     )  continue;
00332       if ( putlogo->type   != trak[it].logo.type  )  continue;
00333       if ( putlogo->mod    != trak[it].logo.mod   )  continue;
00334       if ( putlogo->instid != trak[it].logo.instid ) continue;
00335       goto build_header;
00336    }
00337 
00338 /**** Incoming logo is a new combination; store it, if there's room ****/
00339 
00340    if ( nlogo == NTRACK_PUT )
00341    {
00342       fprintf( stdout,
00343               "ERROR: tport_putmsg; exceeded NTRACK_PUT, msg not sent\n");
00344       retval = PUT_NOTRACK;
00345       goto release_semaphore; 
00346    }
00347    it = nlogo;
00348    trak[it].memkey =  region->key;
00349    trak[it].logo   = *putlogo;
00350    nlogo++;
00351 
00352 /**** Store everything you need in the transport header ****/
00353 
00354 build_header:
00355    hd.start = FIRST_BYTE;
00356    hd.size  = length;
00357    hd.logo  = trak[it].logo;
00358    hd.seq   = trak[it].seq++;
00359 
00360 /**** In shared memory, see if keyin will wrap; if so, reset keyin and keyold ****/
00361 
00362    if ( shm->keyin + sizeof(TPORT_HEAD) + length  <  shm->keyold )
00363    {
00364        shm->keyin  = shm->keyin  % shm->keymax;
00365        shm->keyold = shm->keyold % shm->keymax;
00366        if ( shm->keyin <= shm->keyold ) shm->keyin += shm->keymax;
00367      /*fprintf( stdout,
00368                "NOTICE: tport_putmsg; keyin wrapped & reset; Region %ld\n",
00369                 region->key );*/
00370    }
00371 
00372 /**** Then see if there's enough room for new message in shared memory ****/
00373 /****      If not, "delete" oldest messages until there's room         ****/
00374 
00375    while( shm->keyin + sizeof(TPORT_HEAD) + length - shm->keyold > shm->keymax )
00376    {
00377       ir = shm->keyold % shm->keymax;
00378       if ( ring[ir] != FIRST_BYTE )
00379       {
00380           fprintf( stdout,
00381                   "ERROR: tport_putmsg; keyold not at FIRST_BYTE, Region %ld\n",
00382                    region->key );
00383           retval = TPORT_FATAL;
00384           goto release_semaphore; 
00385       }
00386       for ( j=0 ; j < sizeof(TPORT_HEAD) ; j++ )
00387       {
00388          if ( ir >= shm->keymax )   ir -= shm->keymax;
00389          o[j] = ring[ir++];
00390       }
00391       shm->keyold += sizeof(TPORT_HEAD) + old.size;
00392    }
00393 
00394 /**** Now copy transport header into shared memory by chunks... ****/
00395 
00396    ir = shm->keyin % shm->keymax;
00397    nwrap = ir + sizeof(TPORT_HEAD) - shm->keymax;
00398    if ( nwrap <= 0 )
00399    {
00400          memcpy( (void *) &ring[ir], (void *) h, sizeof(TPORT_HEAD) );
00401    }
00402    else
00403    {
00404          nfill = sizeof(TPORT_HEAD) - nwrap;
00405          memcpy( (void *) &ring[ir], (void *) &h[0],     nfill );
00406          memcpy( (void *) &ring[0],  (void *) &h[nfill], nwrap );
00407    }
00408    ir += sizeof(TPORT_HEAD);
00409    if ( ir >= shm->keymax )  ir -= shm->keymax;
00410 
00411 /**** ...and copy message into shared memory by chunks ****/
00412 
00413    nwrap = ir + length - shm->keymax;
00414    if ( nwrap <= 0 )
00415    {
00416          memcpy( (void *) &ring[ir], (void *) msg, length );
00417    }
00418    else
00419    {
00420          nfill = length - nwrap;
00421          memcpy( (void *) &ring[ir], (void *) &msg[0],     nfill );
00422          memcpy( (void *) &ring[0],  (void *) &msg[nfill], nwrap );
00423    }
00424    shm->keyin += sizeof(TPORT_HEAD) + length;
00425 
00426 /**** Finished with shared memory, let others know via semaphore ****/
00427 
00428 release_semaphore:
00429    ReleaseMutex(region->hMutex);
00430 
00431    if( retval == TPORT_FATAL ) exit( 1 );
00432    return( retval ); 
00433 }
00434 
00435 
00436 /*********************** function tport_getmsg ***********************/
00437 /*                 Get a message out of shared memory.               */
00438 /*********************************************************************/
/*
 * Scans the ring from this caller's out-pointer toward keyin and copies
 * the first message whose logo matches any of the nget entries in
 * getlogo into msg.  A getlogo field equal to WILD matches any value.
 *
 * Return values, as set in the code below:
 *   GET_OK       a message was copied; *logo and *length describe it
 *   GET_MISS     a message was copied, but its sequence number was not
 *                the one expected for its logo
 *   GET_TOOBIG   the next matching message is larger than maxsize; *logo
 *                and *length are set, msg is not written, and the message
 *                is skipped
 *   GET_NOTRACK  a message was copied, but its logo could not be added to
 *                the tracking table because NTRACK_GET entries are in use
 *   GET_NONE     no matching message between the out-pointer and keyin
 * Calls exit(1) if the out-pointer does not land on a FIRST_BYTE.
 *
 * This function never takes region->hMutex.  Instead it re-reads
 * shm->keyold after loading a header and after copying a message, and
 * starts over from keyold if the writer has overtaken it.
 *
 * State lives in the static trak[] table: one entry per (region key, logo)
 * holding the next expected sequence number and the out-pointer (keyout).
 * Before returning, keyout is updated for every tracked entry of this
 * region that matches the request.
 */
00439 
00440 int tport_getmsg( SHM_INFO  *region,   /* info structure for memory region  */
00441                   MSG_LOGO  *getlogo,  /* requested logo(s)                 */
00442                   short      nget,     /* number of logos in getlogo        */
00443                   MSG_LOGO  *logo,     /* logo of retrieved msg             */
00444                   long      *length,   /* size of retrieved message         */
00445                   char      *msg,      /* retrieved message                 */
00446                   long       maxsize ) /* max length for retrieved message  */
00447 {
00448    static MSG_TRACK  trak[NTRACK_GET]; /* sequence #, outpointer keeper     */
00449    static int        nlogo;            /* # modid,type,instid combos so far */
00450    int               it;               /* index into trak                   */
00451    SHM_HEAD         *shm;              /* pointer to start of memory region */
00452    char             *ring;             /* pointer to ring part of memory    */
00453    TPORT_HEAD       *tmphd;            /* temp pointer into shared memory   */
00454    unsigned long     ir;               /* index into the ring               */
00455    long              nfill;            /* bytes from ir to ring's last-byte */
00456    long              nwrap;            /* bytes to grab from front of ring  */
00457    TPORT_HEAD        hd;               /* transport header from memory      */
00458    char             *h;                /* pointer to transport layer header */
00459    int               ih;               /* index into the transport header   */
00460    unsigned long     keyin;            /* in-pointer to shared memory       */
00461    unsigned long     keyold;           /* oldest complete message in memory */
00462    unsigned long     keyget;           /* pointer at which to start search  */
00463    int               status = GET_OK;  /* how did retrieval go?             */
00464    int               trakked;          /* flag for tracking list entries    */
00465    int               i,j;
00466 
00467 /**** Get the pointers set up ****/
00468 
00469    shm  = region->addr;
00470    ring = (char *) shm + sizeof(SHM_HEAD);
00471    h    = (char *) (&hd);
00472 
00473 /**** First time around, initialize sequence counters, outpointers ****/
00474 
00475    if (Get_Init)
00476    {
00477        nlogo = 0;
00478 
00479        for( i=0 ; i < NTRACK_GET ; i++ )
00480        {
00481           trak[i].memkey      = 0;
00482           trak[i].logo.type   = 0;
00483           trak[i].logo.mod    = 0;
00484           trak[i].logo.instid = 0;
00485           trak[i].seq         = 0;
00486           trak[i].keyout      = 0;
00487           trak[i].active      = 0; /*960618:ldd*/
00488        }
00489        Get_Init = 0;
00490    }
00491 
00492 /**** make sure all requested logos are entered in tracking list ****/
00493 
00494    for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
00495    {
00496        trakked = 0;  /* assume it's not being tracked */
00497        for( it=0 ; it < nlogo ; it++ )  /* for all logos we're tracking */
00498        {
00499           if( region->key       != trak[it].memkey      ) continue;
00500           if( getlogo[j].type   != trak[it].logo.type   ) continue;
00501           if( getlogo[j].mod    != trak[it].logo.mod    ) continue;
00502           if( getlogo[j].instid != trak[it].logo.instid ) continue;
00503           trakked = 1;  /* found it in the tracking list! */
00504           break;
00505        }
00506        if( trakked ) continue;
00507     /* Make an entry in trak for this logo; if there's room */
    /* (a full table is silently ignored here; it is reported later, at trackit) */
00508        if ( nlogo < NTRACK_GET )
00509        {
00510           it = nlogo;
00511           trak[it].memkey = region->key;
00512           trak[it].logo   = getlogo[j];
00513           nlogo++;
00514        }
00515    }
00516 
00517 /**** find latest starting index to look for any of the requested logos ****/
/**** (the largest keyout among matching tracked entries, but never below keyold) ****/
00518 
00519 findkey:
00520 
00521    keyget = shm->keyold;
00522 
00523    for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
00524    {
00525        if ( trak[it].memkey != region->key ) continue;
00526        for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
00527        {
00528           if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD  ) &&
00529              (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD   ) &&
00530              (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
00531           {
00532              if ( trak[it].keyout > keyget )  keyget = trak[it].keyout;
00533           }
00534        }
00535     }
00536     keyin = shm->keyin;
00537 
00538 /**** See if keyin and keyold were wrapped and reset by tport_putmsg; ****/
00539 /****       If so, reset trak[xx].keyout and go back to findkey       ****/
00540 
00541    if ( keyget > keyin )
00542    {
00543       keyold = shm->keyold;
00544       for ( it=0 ; it < nlogo ; it++ )
00545       {
00546          if( trak[it].memkey == region->key )
00547          {
00548           /* reset keyout */
00549 /*DEBUG*/    /*printf("tport_getmsg: Pre-reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
00550                      trak[it].keyout, keyold, keyin );*/
00551              trak[it].keyout = trak[it].keyout % shm->keymax;
00552 /*DEBUG*/    /*printf("tport_getmsg:  Intermed:  keyout=%10u    keyold=%10u  keyin=%10u\n",
00553                      trak[it].keyout, keyold, keyin );*/
00554 
00555           /* make sure new keyout points to keyin or to a msg's first-byte; */
00556           /* if not, we've been lapped, so set keyout to keyold             */
00557              ir    = trak[it].keyout;
00558              tmphd = (TPORT_HEAD *) &ring[ir];
00559              if ( trak[it].keyout == keyin   ||
00560                   (keyin-trak[it].keyout)%shm->keymax == 0 )
00561              {
00562 /*DEBUG*/       /*printf("tport_getmsg:  Intermed:  keyout=%10u  same as keyin\n",
00563                        trak[it].keyout );*/
00564                 trak[it].keyout = keyin;
00565              }
00566              else if( tmphd->start != FIRST_BYTE )
00567              {
00568 /*DEBUG*/       /*printf("tport_getmsg:  Intermed:  keyout=%10u  does not point to FIRST_BYTE\n",
00569                         trak[it].keyout );*/
00570                 trak[it].keyout = keyold;
00571              }
00572 
00573           /* else, make sure keyout's value is between keyold and keyin */
00574              else if ( trak[it].keyout < keyold )
00575              {
00576                 do {
00577                     trak[it].keyout += shm->keymax;
00578                 } while ( trak[it].keyout < keyold );
00579              }
00580 /*DEBUG*/    /*printf("tport_getmsg:     Reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
00581                      trak[it].keyout, keyold, keyin );*/
00582          }
00583       }
00584     /*fprintf( stdout,
00585           "NOTICE: tport_getmsg; keyin wrapped, keyout(s) reset; Region %ld\n",
00586            region->key );*/
00587 
00588       goto findkey;
00589    }
00590 
00591 
00592 /**** Find next message from requested type, module, instid ****/
00593 
00594 nextmsg:
00595 
00596    while ( keyget < keyin )
00597    {
00598    /* make sure you haven't been lapped by tport_putmsg */
00599        if ( keyget < shm->keyold ) keyget = shm->keyold;
00600 
00601    /* load next header; make sure you weren't lapped */
   /* (copied byte-by-byte because the header may straddle the ring's end) */
00602        ir = keyget % shm->keymax;
00603        for ( ih=0 ; ih < sizeof(TPORT_HEAD) ; ih++ )
00604        {
00605           if ( ir >= shm->keymax )  ir -= shm->keymax;
00606           h[ih] = ring[ir++];
00607        }
00608        if ( keyget < shm->keyold ) continue;  /*added 960612:ldd*/
00609 
00610    /* make sure it starts at beginning of a header */
00611        if ( hd.start != FIRST_BYTE )
00612        {
00613           fprintf( stdout,
00614                   "ERROR: tport_getmsg; keyget not at FIRST_BYTE, Region %ld\n",
00615                    region->key );
00616           exit( 1 );
00617        }
   /* advance past this message now, so that it is consumed whether or not it matches */
00618        keyget += sizeof(TPORT_HEAD) + hd.size;
00619 
00620    /* see if this msg matches any requested type */
00621        for ( j=0 ; j < nget ; j++ )
00622        {
00623           if((getlogo[j].type   == hd.logo.type   || getlogo[j].type == WILD) &&
00624              (getlogo[j].mod    == hd.logo.mod    || getlogo[j].mod  == WILD) &&
00625              (getlogo[j].instid == hd.logo.instid || getlogo[j].instid == WILD) )
00626           {
00627 
00628 /**** Found a message of requested logo; retrieve it! ****/
00629         /* complain if retrieved msg is too big */
00630              if ( hd.size > maxsize )
00631              {
00632                *logo   = hd.logo;
00633                *length = hd.size;
00634                 status = GET_TOOBIG;
00635                 goto trackit;    /*changed 960612:ldd*/
00636              }
00637         /* copy message by chunks to caller's address */
00638              nwrap = ir + hd.size - shm->keymax;
00639              if ( nwrap <= 0 )
00640              {
00641                 memcpy( (void *) msg, (void *) &ring[ir], hd.size );
00642              }
00643              else
00644              {
00645                 nfill = hd.size - nwrap;
00646                 memcpy( (void *) &msg[0],     (void *) &ring[ir], nfill );
00647                 memcpy( (void *) &msg[nfill], (void *) &ring[0],  nwrap );
00648              }
00649         /* see if we got run over by tport_putmsg while copying msg */
00650         /* if we did, go back and try to get a msg cleanly          */
00651              keyold = shm->keyold;
00652              if ( keyold >= keyget )
00653              {
00654                 keyget = keyold;
00655                 goto nextmsg;
00656              }
00657 
00658         /* set other returned variables */
00659             *logo   = hd.logo;
00660             *length = hd.size;
00661 
00662 trackit:
00663         /* find msg logo in tracked list */
00664              for ( it=0 ; it < nlogo ; it++ )
00665              {
00666                 if ( region->key    != trak[it].memkey      )  continue;
00667                 if ( hd.logo.type   != trak[it].logo.type   )  continue;
00668                 if ( hd.logo.mod    != trak[it].logo.mod    )  continue;
00669                 if ( hd.logo.instid != trak[it].logo.instid )  continue;
00670                 /* activate sequence tracking if 1st msg */
00671                 if ( !trak[it].active )
00672                 {
00673                     trak[it].seq    = hd.seq;
00674                     trak[it].active = 1;
00675                 }
00676                 goto sequence;
00677              }
00678         /* new logo, track it if there's room */
00679              if ( nlogo == NTRACK_GET )
00680              {
00681                 fprintf( stdout,
00682                      "ERROR: tport_getmsg; exceeded NTRACK_GET\n");
00683                 if( status != GET_TOOBIG ) status = GET_NOTRACK; /*changed 960612:ldd*/
00684                 goto wrapup;
00685              }
00686              it = nlogo;
00687              trak[it].memkey = region->key;
00688              trak[it].logo   = hd.logo;
00689              trak[it].seq    = hd.seq;
00690              trak[it].active = 1;      /*960618:ldd*/
00691              nlogo++;
00692 
00693 sequence:
00694         /* check if sequence #'s match; update sequence # */
        /* (a too-big message leaves the expected sequence number unchanged) */
00695              if ( status == GET_TOOBIG   )  goto wrapup; /*added 960612:ldd*/
00696              if ( hd.seq != trak[it].seq )
00697              {
00698                 status = GET_MISS;
00699                 trak[it].seq = hd.seq;
00700              }
00701              trak[it].seq++;
00702 
00703         /* Ok, we're finished grabbing this one */
00704              goto wrapup;
00705 
00706           } /* end if of logo & getlogo match */
00707        }    /* end for over getlogo */
00708    }        /* end while over ring */
00709 
00710 /**** If you got here, there were no messages of requested logo(s) ****/
00711 
00712    status = GET_NONE;
00713 
00714 /**** update outpointer (->msg after retrieved one) for all requested logos ****/
00715 
00716 wrapup:
00717    for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
00718    {
00719        if ( trak[it].memkey != region->key ) continue;
00720        for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
00721        {
00722           if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD) &&
00723              (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD)  &&
00724              (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
00725           {
00726              trak[it].keyout = keyget;
00727           }
00728        }
00729     }
00730 
00731    return( status );
00732 
00733 }
00734 
00735 
00736 /********************* function tport_putflag ************************/
00737 /*           Puts the kill flag into a shared memory region.         */
00738 /*********************************************************************/
00739 
00740 void tport_putflag( SHM_INFO *region,  /* shared memory info structure     */
00741                     int       flag )   /* tells attached processes to exit */
00742 {
00743    SHM_HEAD  *shm;
00744 
00745    shm = region->addr;
00746    shm->flag = flag;
00747    return;
00748 }
00749 
00750 
00751 
00752 /*********************** function tport_getflag **********************/
00753 /*         Returns the kill flag from a shared memory region.        */
00754 /*********************************************************************/
00755 
00756 int tport_getflag( SHM_INFO *region )
00757 
00758 {
00759    SHM_HEAD  *shm;
00760 
00761    shm = region->addr;
00762    return( (int)shm->flag );
00763 }
00764 
00765 
00766 /************************** tport_bufthr ****************************/
00767 /*     Thread to buffer input from one transport ring to another.   */
00768 /********************************************************************/
00769 void tport_bufthr( void *dummy )
00770 {
00771    char          errnote[150];
00772    MSG_LOGO      logo;
00773    long          msgsize;
00774    unsigned char msgseq;
00775    int           res1, res2;
00776    int           gotmsg;
00777    HANDLE myHandle = GetCurrentThread();
00778 
00779 /* Reset my own thread priority
00780    ****************************/
00781    if ( SetThreadPriority( myHandle, THREAD_PRIORITY_TIME_CRITICAL ) == 0 )
00782    {
00783       printf( "Error setting buffer thread priority: %d\n", GetLastError() );
00784       exit( -1 );
00785    }
00786 
00787 /* Flush all existing messages from the public memory region
00788    *********************************************************/
00789    while( tport_copyfrom((SHM_INFO *) PubRegion, (MSG_LOGO *) Getlogo, 
00790                           Nget, &logo, &msgsize, (char *) Message, 
00791                           MaxMsgSize, &msgseq )  !=  GET_NONE  );
00792 
00793    while ( 1 )
00794    {
00795       Sleep( 500 );
00796 
00797 /* If a terminate flag is found, copy it to the private ring.
00798    Then, terminate this thread.
00799    *********************************************************/
00800       if ( tport_getflag( (SHM_INFO *) PubRegion ) == TERMINATE )
00801       {
00802          tport_putflag( (SHM_INFO *) BufRegion, TERMINATE );
00803          _endthread();
00804       }
00805 
00806 /* Try to copy a message from the public memory region
00807    ***************************************************/
00808       do
00809       {
00810           res1 = tport_copyfrom((SHM_INFO *) PubRegion, (MSG_LOGO *) Getlogo,
00811                                 Nget, &logo, &msgsize, (char *) Message,
00812                                 MaxMsgSize, &msgseq );
00813           gotmsg = 1;
00814 
00815 /* Handle return values
00816    ********************/
00817           switch ( res1 )
00818           {
00819           case GET_MISS_LAPPED:
00820                 sprintf( errnote,
00821                         "tport_bufthr: Missed msg(s)  c%d m%d t%d  Overwritten, region:%ld.",
00822                          (int) logo.instid, (int) logo.mod, (int) logo.type,
00823                          PubRegion->key );
00824                 tport_buferror( ERR_LAPPED, errnote );
00825                 break;
00826           case GET_MISS_SEQGAP:
00827                 sprintf( errnote,
00828                         "tport_bufthr: Missed msg(s)  c%d m%d t%d  Sequence gap, region:%ld.",
00829                          (int) logo.instid, (int) logo.mod, (int) logo.type,
00830                          PubRegion->key );
00831                 tport_buferror( ERR_SEQGAP, errnote );
00832                 break;
00833           case GET_NOTRACK:
00834                 sprintf( errnote,
00835                         "tport_bufthr: Logo c%d m%d t%d not tracked; NTRACK_GET exceeded.",
00836                         (int) logo.instid, (int) logo.mod, (int) logo.type );
00837                 tport_buferror( ERR_UNTRACKED, errnote );
00838           case GET_OK:
00839                 break;
00840           case GET_TOOBIG:
00841                 sprintf( errnote,
00842                         "tport_bufthr: msg[%ld] c%d m%d t%d seq%d too big; skipped in region:%ld.",
00843                          msgsize, (int) logo.instid, (int) logo.mod,
00844                          (int) logo.type, (int) msgseq, PubRegion->key );
00845                 tport_buferror( ERR_OVERFLOW, errnote );
00846           case GET_NONE:
00847                 gotmsg = 0;
00848                 break;
00849           }
00850 
00851 /* If you did get a message, copy it to private ring
00852    *************************************************/
00853           if ( gotmsg )
00854           {
00855               res2 = tport_copyto( (SHM_INFO *) BufRegion, &logo,
00856                                    msgsize, (char *) Message, msgseq );
00857               switch (res2)
00858               {
00859               case PUT_TOOBIG:
00860                  sprintf( errnote,
00861                      "tport_bufthr: msg[%ld] (c%d m%d t%d) too big for Region:%ld.",
00862                       msgsize, (int) logo.instid, (int) logo.mod, (int) logo.type,
00863                       BufRegion->key );
00864                  tport_buferror( ERR_OVERFLOW, errnote );
00865               case PUT_OK:
00866                  break;
00867               }
00868           }
00869       } while ( res1 != GET_NONE );
00870    }
00871 }
00872 
00873 
00874 /************************** tport_buffer ****************************/
00875 /*       Function to initialize the input buffering thread          */
00876 /********************************************************************/
00877 int tport_buffer( SHM_INFO  *region1,      /* transport ring             */
00878                   SHM_INFO  *region2,      /* private ring               */
00879                   MSG_LOGO  *getlogo,      /* array of logos to copy     */
00880                   short      nget,         /* number of logos in getlogo */
00881                   unsigned   maxMsgSize,   /* size of message buffer     */
00882                   unsigned char module,    /* module id of main thread   */
00883                   unsigned char instid )   /* instid id of main thread   */
00884 {
00885    unsigned long thread_id;            /* Thread id of the buffer thread */
00886 
00887 /* Allocate message buffer
00888    ***********************/
00889    Message = (char *) malloc( maxMsgSize );
00890    if ( Message == NULL )
00891    {
00892       fprintf( stdout, "tport_buffer: Error allocating message buffer\n" );
00893       return -1;
00894    }
00895 
00896 /* Copy function arguments to global variables
00897    *******************************************/
00898    PubRegion   = region1;
00899    BufRegion   = region2;
00900    Getlogo     = getlogo;
00901    Nget        = nget;
00902    MaxMsgSize  = maxMsgSize;
00903    MyModuleId  = module;
00904    MyInstid    = instid;
00905 
00906 /* Lookup message type for error messages
00907    **************************************/
00908    if ( GetType( "TYPE_ERROR", &TypeError ) != 0 )
00909    {
00910       fprintf( stderr,
00911               "tport_buffer: Invalid message type <TYPE_ERROR>\n" );
00912       return -1;
00913    }
00914 
00915 /* Start the input buffer thread
00916    *****************************/
00917    thread_id = _beginthread( tport_bufthr, 0, NULL );
00918 
00919    if ( thread_id == -1 )                /* Couldn't create thread */
00920    {
00921       fprintf( stderr, "tport_buffer: Can't start the buffer thread." );
00922       return -1;
00923    }
00924    return 0;
00925 }
00926 
00927 
00928 /********************** function tport_copyfrom *********************/
00929 /*      get a message out of public shared memory; save the         */
00930 /*     sequence number from the transport layer, with the intent    */
00931 /*       of copying it to a private (buffering) memory ring         */
00932 /********************************************************************/
00933 
      /*
       * Walks the ring of `region` from the latest outpointer recorded for
       * the requested logos up to shm->keyin and returns the first message
       * whose logo matches any entry of getlogo[] (WILD fields match all).
       *
       * Return values, as set in this function:
       *   GET_OK          a message was copied and its seq# was as expected
       *   GET_NONE        no message of the requested logo(s) was found
       *   GET_TOOBIG      hd.size > maxsize; *logo, *length and *seq are set
       *                   but msg is not written and seq tracking is skipped
       *   GET_NOTRACK     message copied, but its logo is new and the trak
       *                   table already holds NTRACK_GET entries
       *   GET_MISS_LAPPED seq# mismatch and the `lapped` flag was set
       *   GET_MISS_SEQGAP seq# mismatch without the `lapped` flag
       *
       * The tracking table (trak, nlogo) is static, so state persists across
       * calls and is shared by every caller of this function.
       * This function does not take region->hMutex; it detects overwrites
       * by re-reading shm->keyold around its reads instead.
       * Calls exit(1) if the header at keyget does not begin with FIRST_BYTE.
       */
00934 int tport_copyfrom( SHM_INFO  *region,   /* info structure for memory region */
00935                     MSG_LOGO  *getlogo,  /* requested logo(s)                */
00936                     short      nget,     /* number of logos in getlogo       */
00937                     MSG_LOGO  *logo,     /* logo of retrieved message        */
00938                     long      *length,   /* size of retrieved message        */
00939                     char      *msg,      /* retrieved message                */
00940                     long       maxsize,  /* max length for retrieved message */
00941                     unsigned char *seq ) /* TPORT_HEAD seq# of retrieved msg */
00942 {
00943    static MSG_TRACK  trak[NTRACK_GET]; /* sequence #, outpointer keeper     */
00944    static int        nlogo;            /* # modid,type,instid combos so far */
00945    int               it;               /* index into trak                   */
00946    SHM_HEAD         *shm;              /* pointer to start of memory region */
00947    char             *ring;             /* pointer to ring part of memory    */
00948    TPORT_HEAD       *tmphd;            /* temp pointer into shared memory   */
00949    unsigned long     ir;               /* index into the ring               */
00950    long              nfill;            /* bytes from ir to ring's last-byte */
00951    long              nwrap;            /* bytes to grab from front of ring  */
00952    TPORT_HEAD        hd;               /* transport header from memory      */
00953    char             *h;                /* pointer to transport layer header */
00954    int               ih;               /* index into the transport header   */
00955    unsigned long     keyin;            /* in-pointer to shared memory       */
00956    unsigned long     keyold;           /* oldest complete message in memory */
00957    unsigned long     keyget;           /* pointer at which to start search  */
00958    int               status = GET_OK;  /* how did retrieval go?             */
00959    int               lapped = 0;       /* = 1 if memory ring has been over- */
00960                                        /* written since last tport_copyfrom */
00961    int               trakked;          /* flag for trakking list entries    */
00962    int               i,j;
00963 
00964 /**** Get the pointers set up ****/
00965 
00966    shm  = region->addr;
00967    ring = (char *) shm + sizeof(SHM_HEAD);
00968    h    = (char *) (&hd);
00969 
00970 /**** First time around, initialize sequence counters, outpointers ****/
00971 
         /* Copyfrom_Init is declared outside this function; it is cleared   */
         /* below so the table is zeroed only once                           */
00972    if (Copyfrom_Init)
00973    {
00974        nlogo = 0;
00975 
00976        for( i=0 ; i < NTRACK_GET ; i++ )
00977        {
00978           trak[i].memkey      = 0;
00979           trak[i].logo.type   = 0;
00980           trak[i].logo.mod    = 0;
00981           trak[i].logo.instid = 0;
00982           trak[i].seq         = 0;
00983           trak[i].keyout      = 0;
00984           trak[i].active      = 0; /*960618:ldd*/
00985        }
00986        Copyfrom_Init = 0;
00987    }
00988 
00989 /**** make sure all requested logos are entered in tracking list ****/
00990 
00991    for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
00992    {
00993        trakked = 0;  /* assume it's not being trakked */
00994        for( it=0 ; it < nlogo ; it++ )  /* for all logos we're tracking */
00995        {
00996           if( region->key       != trak[it].memkey      ) continue;
00997           if( getlogo[j].type   != trak[it].logo.type   ) continue;
00998           if( getlogo[j].mod    != trak[it].logo.mod    ) continue;
00999           if( getlogo[j].instid != trak[it].logo.instid ) continue;
01000           trakked = 1;  /* found it in the trakking list! */
01001           break;
01002        }
01003        if( trakked ) continue;
01004     /* Make an entry in trak for this logo; if there's room */
          /* (if the table is full the requested logo is simply not added   */
          /* here; no error is reported at this point)                      */
01005        if ( nlogo < NTRACK_GET )
01006        {
01007           it = nlogo;
01008           trak[it].memkey = region->key;
01009           trak[it].logo   = getlogo[j];
01010           nlogo++;
01011        }
01012    }
01013 
01014 /**** find latest starting index to look for any of the requested logos ****/
01015 
01016 findkey:
01017 
01018    keyget = 0;
01019 
01020    for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
01021    {
01022        if ( trak[it].memkey != region->key ) continue;
01023        for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
01024        {
01025           if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD) &&
01026              (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD)  &&
01027              (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
01028           {
01029              if ( trak[it].keyout > keyget )  keyget = trak[it].keyout;
01030           }
01031        }
01032    }
01033 
01034 /**** make sure you haven't been lapped by tport_copyto or tport_putmsg ****/
01035    if ( keyget < shm->keyold ) {
01036       keyget = shm->keyold;
01037       lapped = 1;
01038    }
01039 
01040 /**** See if keyin and keyold were wrapped and reset by tport_putmsg; ****/
01041 /****       If so, reset trak[xx].keyout and go back to findkey       ****/
01042 
01043    keyin = shm->keyin;
01044    if ( keyget > keyin )
01045    {
01046       keyold = shm->keyold;
01047       for ( it=0 ; it < nlogo ; it++ )
01048       {
01049          if( trak[it].memkey == region->key )
01050          {
01051           /* reset keyout */
01052 /*DEBUG*/    /*printf("tport_copyfrom: Pre-reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
01053                      trak[it].keyout, keyold, keyin );*/
01054              trak[it].keyout = trak[it].keyout % shm->keymax;
01055 /*DEBUG*/    /*printf("tport_copyfrom:  Intermed:  keyout=%10u    keyold=%10u  keyin=%10u\n",
01056                      trak[it].keyout, keyold, keyin );*/
01057 
01058           /* make sure new keyout points to keyin or to a msg's first-byte; */
01059           /* if not, we've been lapped, so set keyout to keyold             */
01060              ir    = trak[it].keyout;
01061              tmphd = (TPORT_HEAD *) &ring[ir];
01062              if ( trak[it].keyout == keyin   ||
01063                   (keyin-trak[it].keyout)%shm->keymax == 0 )
01064              {
01065 /*DEBUG*/       /*printf("tport_copyfrom:  Intermed:  keyout=%10u  same as keyin\n",
01066                         trak[it].keyout );*/
01067                 trak[it].keyout = keyin;
01068              }
01069              else if( tmphd->start != FIRST_BYTE )
01070              {
01071 /*DEBUG*/       /*printf("tport_copyfrom:  Intermed:  keyout=%10u  does not point to FIRST_BYTE\n",
01072                         trak[it].keyout );*/
01073                 trak[it].keyout = keyold;
01074                 lapped = 1;
01075              }
01076 
01077           /* else, make sure keyout's value is between keyold and keyin */
01078              else if ( trak[it].keyout < keyold )
01079              {
01080                 do {
01081                     trak[it].keyout += shm->keymax;
01082                 } while ( trak[it].keyout < keyold );
01083              }
01084 /*DEBUG*/    /*printf("tport_copyfrom:     Reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
01085                      trak[it].keyout, keyold, keyin );*/
01086          }
01087       }
01088     /*fprintf( stdout,
01089           "NOTICE: tport_copyfrom; keyin wrapped, keyout(s) reset; Region %ld\n",
01090            region->key );*/
01091 
            /* recompute keyget from the keyout values that were just reset */
01092       goto findkey;
01093    }
01094 
01095 
01096 /**** Find next message from requested type, module, instid ****/
01097 
01098 nextmsg:
01099 
01100    while ( keyget < keyin )
01101    {
01102    /* make sure you haven't been lapped by tport_copyto or tport_putmsg */
01103        if ( keyget < shm->keyold ) {
01104           keyget = shm->keyold;
01105           lapped = 1;
01106        }
01107 
01108    /* load next header; make sure you weren't lapped */
          /* the header is copied byte by byte so that it may wrap around   */
          /* the end of the ring                                            */
01109        ir = keyget % shm->keymax;
01110        for ( ih=0 ; ih < sizeof(TPORT_HEAD) ; ih++ )
01111        {
01112           if ( ir >= shm->keymax )  ir -= shm->keymax;
01113           h[ih] = ring[ir++];
01114        }
01115        if ( keyget < shm->keyold ) continue;  /*added 960612:ldd*/
01116 
01117    /* make sure it starts at beginning of a header */
01118        if ( hd.start != FIRST_BYTE )
01119        {
01120           fprintf( stdout,
01121                   "ERROR: tport_copyfrom; keyget not at FIRST_BYTE, Region %ld\n",
01122                    region->key );
01123           exit( 1 );
01124        }
          /* keyget now points just past this message, whether or not its   */
          /* logo turns out to match; ir still indexes the message body     */
01125        keyget += sizeof(TPORT_HEAD) + hd.size;
01126 
01127    /* see if this msg matches any requested type */
01128        for ( j=0 ; j < nget ; j++ )
01129        {
01130           if((getlogo[j].type   == hd.logo.type   || getlogo[j].type == WILD) &&
01131              (getlogo[j].mod    == hd.logo.mod    || getlogo[j].mod  == WILD) &&
01132              (getlogo[j].instid == hd.logo.instid || getlogo[j].instid == WILD) )
01133           {
01134 
01135 /**** Found a message of requested logo; retrieve it! ****/
01136         /* complain if retrieved msg is too big */
01137              if ( hd.size > maxsize )
01138              {
01139                *logo   = hd.logo;
01140                *length = hd.size;
01141                *seq    = hd.seq;
01142                 status = GET_TOOBIG;
01143                 goto trackit;    /*changed 960612:ldd*/
01144              }
01145         /* copy message by chunks to caller's address */
               /* nwrap > 0 means the body runs past the end of the ring    */
               /* and its last nwrap bytes sit at the front of the ring     */
01146              nwrap = ir + hd.size - shm->keymax;
01147              if ( nwrap <= 0 )
01148              {
01149                 memcpy( (void *) msg, (void *) &ring[ir], hd.size );
01150              }
01151              else
01152              {
01153                 nfill = hd.size - nwrap;
01154                 memcpy( (void *) &msg[0],     (void *) &ring[ir], nfill );
01155                 memcpy( (void *) &msg[nfill], (void *) &ring[0],  nwrap );
01156              }
01157         /* see if we got lapped by tport_copyto or tport_putmsg while copying msg */
01158         /* if we did, go back and try to get a msg cleanly          */
01159              keyold = shm->keyold;
01160              if ( keyold >= keyget )
01161              {
01162                 keyget = keyold;
01163                 lapped = 1;
01164                 goto nextmsg;
01165              }
01166 
01167         /* set other returned variables */
01168             *logo   = hd.logo;
01169             *length = hd.size;
01170             *seq    = hd.seq;
01171 
01172 trackit:
01173         /* find logo in tracked list */
               /* (exact field comparison here: WILD is not expanded)       */
01174              for ( it=0 ; it < nlogo ; it++ )
01175              {
01176                 if ( region->key    != trak[it].memkey      )  continue;
01177                 if ( hd.logo.type   != trak[it].logo.type   )  continue;
01178                 if ( hd.logo.mod    != trak[it].logo.mod    )  continue;
01179                 if ( hd.logo.instid != trak[it].logo.instid )  continue;
01180                 /* activate sequence tracking if 1st msg */
01181                 if ( !trak[it].active )
01182                 {
01183                     trak[it].seq    = hd.seq;
01184                     trak[it].active = 1;
01185                 }
01186                 goto sequence;
01187              }
01188         /* new logo, track it if there's room */
01189              if ( nlogo == NTRACK_GET )
01190              {
01191                 fprintf( stdout,
01192                      "ERROR: tport_copyfrom; exceeded NTRACK_GET\n");
                     /* a GET_TOOBIG status set above takes precedence       */
01193                 if( status != GET_TOOBIG ) status = GET_NOTRACK; /*changed 960612:ldd*/
01194                 goto wrapup;
01195              }
01196              it = nlogo;
01197              trak[it].memkey = region->key;
01198              trak[it].logo   = hd.logo;
01199              trak[it].seq    = hd.seq;
01200              trak[it].active = 1;      /*960618:ldd*/
01201              nlogo++;
01202 
01203 sequence:
01204         /* check if sequence #'s match; update sequence # */
               /* for GET_TOOBIG the stored sequence # is left unchanged    */
01205              if ( status == GET_TOOBIG   )  goto wrapup; /*added 960612:ldd*/
01206              if ( hd.seq != trak[it].seq )
01207              {
01208                 if (lapped)  status = GET_MISS_LAPPED;
01209                 else         status = GET_MISS_SEQGAP;
01210                 trak[it].seq = hd.seq;
01211              }
01212              trak[it].seq++;
01213 
01214         /* Ok, we're finished grabbing this one */
01215              goto wrapup;
01216 
01217           } /* end if of logo & getlogo match */
01218        }    /* end for over getlogo */
01219    }        /* end while over ring */
01220 
01221 /**** If you got here, there were no messages of requested logo(s) ****/
01222 
01223    status = GET_NONE;
01224 
01225 /**** update outpointer (->msg after retrieved one) for all requested logos ****/
01226 
01227 wrapup:
         /* every tracked logo of this region that matches a requested logo */
         /* gets the same new outpointer, keyget                            */
01228    for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
01229    {
01230        if ( trak[it].memkey != region->key ) continue;
01231        for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
01232        {
01233           if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD) &&
01234              (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD)  &&
01235              (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
01236           {
01237              trak[it].keyout = keyget;
01238           }
01239        }
01240     }
01241 
01242    return( status );
01243 
01244 }
01245 
01246 
01247 /*********************** function tport_copyto ***********************/
01248 /*           Puts a message into a shared memory region.             */
01249 /*    Preserves the sequence number (passed as argument) as the      */
01250 /*                transport layer sequence number                    */
01251 /*********************************************************************/
01252 
01253 int tport_copyto( SHM_INFO    *region,  /* info structure for memory region     */
01254                   MSG_LOGO    *putlogo, /* type, module, instid of incoming msg */
01255                   long         length,  /* size of incoming message             */
01256                   char        *msg,     /* pointer to incoming message          */
01257                   unsigned char seq )   /* preserve as sequence# in TPORT_HEAD  */
01258 {
01259    SHM_HEAD         *shm;              /* pointer to start of memory region   */
01260    char             *ring;             /* pointer to ring part of memory      */
01261    unsigned long     ir;               /* index into memory ring              */
01262    long              nfill;            /* # bytes from ir to ring's last-byte */
01263    long              nwrap;            /* # bytes to wrap to front of ring    */
01264    TPORT_HEAD        hd;               /* transport layer header to put       */
01265    char             *h;                /* pointer to transport layer header   */
01266    TPORT_HEAD        old;              /* transport header of oldest msg      */
01267    char             *o;                /* pointer to oldest transport header  */
01268    int j;
01269    int retval = PUT_OK;                /* return value for this function      */
01270 
01271 /**** First time around, initialize semaphore controls ****/
01272 
01273    if (Copyto_Init)
01274    {
01275        Copyto_Init  = 0;
01276    }
01277 
01278 /**** Set up pointers for shared memory, etc. ****/
01279 
01280    shm  = region->addr;
01281    ring = (char *) shm + sizeof(SHM_HEAD);
01282    h    = (char *) (&hd);
01283    o    = (char *) (&old);
01284 
01285 /**** First, see if the incoming message will fit in the memory region ****/
01286 
01287    if ( length + sizeof(TPORT_HEAD) > shm->keymax )
01288    {
01289       fprintf( stdout,
01290               "ERROR: tport_copyto; message too large (%ld) for Region %ld\n",
01291                length, region->key);
01292       return( PUT_TOOBIG );
01293    }
01294 
01295 /**** Change semaphore to let others know you're using memory ****/
01296 
01297    WaitForSingleObject( region->hMutex, INFINITE );
01298 
01299 /**** Store everything you need in the transport header ****/
01300 
01301    hd.start = FIRST_BYTE;
01302    hd.size  = length;
01303    hd.logo  = *putlogo;
01304    hd.seq   = seq;
01305 
01306 /**** First see if keyin will wrap; if so, reset both keyin and keyold ****/
01307 
01308    if ( shm->keyin + sizeof(TPORT_HEAD) + length  <  shm->keyold )
01309    {
01310        shm->keyin  = shm->keyin  % shm->keymax;
01311        shm->keyold = shm->keyold % shm->keymax;
01312        if ( shm->keyin <= shm->keyold ) shm->keyin += shm->keymax;
01313      /*fprintf( stdout,
01314                "NOTICE: tport_copyto; keyin wrapped & reset; Region %ld\n",
01315                 region->key );*/
01316    }
01317 
01318 /**** Then see if there's enough room for new message in shared memory ****/
01319 /****      If not, "delete" oldest messages until there's room         ****/
01320 
01321    while( shm->keyin + sizeof(TPORT_HEAD) + length - shm->keyold > shm->keymax )
01322    {
01323       ir = shm->keyold % shm->keymax;
01324       if ( ring[ir] != FIRST_BYTE )
01325       {
01326           fprintf( stdout,
01327                   "ERROR: tport_copyto; keyold not at FIRST_BYTE, Region %ld\n",
01328                    region->key );
01329           retval = TPORT_FATAL;
01330           goto release_semaphore; 
01331       }
01332       for ( j=0 ; j < sizeof(TPORT_HEAD) ; j++ )
01333       {
01334          if ( ir >= shm->keymax )   ir -= shm->keymax;
01335          o[j] = ring[ir++];
01336       }
01337       shm->keyold += sizeof(TPORT_HEAD) + old.size;
01338    }
01339 
01340 /**** Now copy transport header into shared memory by chunks... ****/
01341 
01342    ir = shm->keyin % shm->keymax;
01343    nwrap = ir + sizeof(TPORT_HEAD) - shm->keymax;
01344    if ( nwrap <= 0 )
01345    {
01346          memcpy( (void *) &ring[ir], (void *) h, sizeof(TPORT_HEAD) );
01347    }
01348    else
01349    {
01350          nfill = sizeof(TPORT_HEAD) - nwrap;
01351          memcpy( (void *) &ring[ir], (void *) &h[0],     nfill );
01352          memcpy( (void *) &ring[0],  (void *) &h[nfill], nwrap );
01353    }
01354    ir += sizeof(TPORT_HEAD);
01355    if ( ir >= shm->keymax )  ir -= shm->keymax;
01356 
01357 /**** ...and copy message into shared memory by chunks ****/
01358 
01359    nwrap = ir + length - shm->keymax;
01360    if ( nwrap <= 0 )
01361    {
01362          memcpy( (void *) &ring[ir], (void *) msg, length );
01363    }
01364    else
01365    {
01366          nfill = length - nwrap;
01367          memcpy( (void *) &ring[ir], (void *) &msg[0],     nfill );
01368          memcpy( (void *) &ring[0],  (void *) &msg[nfill], nwrap );
01369    }
01370    shm->keyin += sizeof(TPORT_HEAD) + length;
01371 
01372 /**** Finished with shared memory, let others know via semaphore ****/
01373 
01374 release_semaphore:
01375    ReleaseMutex(region->hMutex);
01376 
01377    if( retval == TPORT_FATAL ) exit( 1 );
01378    return( retval ); 
01379 }
01380 
01381 
01382 /************************* tport_buferror ***************************/
01383 /*  Build an error message and put it in the public memory region   */
01384 /********************************************************************/
01385 void tport_buferror( short  ierr,       /* 2-byte error word       */
01386                      char  *note  )     /* string describing error */
01387 {
01388         MSG_LOGO    logo;
01389         char        msg[256];
01390         long        size;
01391         time_t      t;
01392 
01393         logo.instid = MyInstid;
01394         logo.mod    = MyModuleId;
01395         logo.type   = TypeError;
01396 
01397         time( &t );
01398         sprintf( msg, "%ld %hd %s\n", t, ierr, note );
01399         size = strlen( msg );   /* don't include the null byte in the message */
01400 
01401         if ( tport_putmsg( (SHM_INFO *) PubRegion, &logo, size, msg ) != PUT_OK )
01402         {
01403             printf("tport_bufthr:  Error sending error:%hd for module:%d.\n",
01404                     ierr, MyModuleId );
01405         }
01406         return;
01407 }
01408 
01409 
/************************ function tport_syserr **********************/
/*                 Print a system error and terminate.               */
/*                                                                   */
/*  Writes "ERROR: <msg> (<code>; <text>) Region: <key>" to stdout,  */
/*  where <code> is the calling thread's GetLastError() value, then  */
/*  exits the process with status 1.  Never returns.                 */
/*********************************************************************/

void tport_syserr( char *msg,   /* message to print (which routine had an error) */
                   long  key )  /* identifies which memory region had the error  */
{
   long err = (long) GetLastError();   /* Override with per thread err */

   fprintf( stdout, "ERROR: %s (%ld", msg, err );

/* NOTE(review): strerror() interprets its argument as an errno value,
   while err is a Win32 error code, so the text may not describe the
   actual failure - confirm whether FormatMessage() should be used. */
   fprintf( stdout, "; %s) Region: %ld\n", strerror( (int) err ), key);

   exit( 1 );
}
01432 

Generated on Tue May 6 09:16:12 2003 for Earthworm Libs by doxygen1.3-rc3