00001 00002 /* 00003 * THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE 00004 * CHECKED IT OUT USING THE COMMAND CHECKOUT. 00005 * 00006 * $Id: transport_8c-source.html 2161 2006-05-19 16:55:03Z paulf $ 00007 * 00008 * Revision history: 00009 * $Log$ 00009 * Revision 1.1 2006/05/19 16:55:02 paulf 00009 * first inclusion 00009 * 00010 * Revision 1.3 2001/05/04 23:43:54 dietz 00011 * Changed flag arg of tport_putflag from short to int to handle 00012 * processids properly. 00013 * 00014 * Revision 1.2 2000/06/02 18:19:48 dietz 00015 * Fixed tport_putmsg,tport_copyto to always release semaphore before returning 00016 * 00017 * Revision 1.1 2000/02/14 18:53:30 lucky 00018 * Initial revision 00019 * 00020 * 00021 */ 00022 00023 /********************************************************************/ 00024 /* 6/2000 */ 00025 /* transport.c */ 00026 /* */ 00027 /* Transport layer functions to access shared memory regions. */ 00028 /* */ 00029 /* written by Lynn Dietz, Will Kohler with inspiration from */ 00030 /* Carl Johnson, Alex Bittenbinder, Barbara Bogaert */ 00031 /* */ 00032 /********************************************************************/ 00033 00034 /* ***** Notes on development, delete when appropriate 00035 1. Change the quotes for the transport.h and earthworm.h includes 00036 when these are moved to the appropriately pathed included directory. 
*/
#include <windows.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include <process.h>
#include <transport.h>

static short Put_Init=1;           /* initialization flag */
static short Get_Init=1;           /* initialization flag */
static short Copyfrom_Init=1;      /* initialization flag */
static short Copyto_Init  =1;      /* initialization flag */

/* These functions are for internal use by transport functions only
   ****************************************************************/
void  tport_syserr  ( char *, long );
void  tport_buferror( short, char * );

/* These statements and variables are required by the functions of
   the input-buffering thread
   ***************************************************************/
#include "earthworm.h"
volatile SHM_INFO *PubRegion;      /* transport public ring      */
volatile SHM_INFO *BufRegion;      /* pointer to private ring    */
volatile MSG_LOGO *Getlogo;        /* array of logos to copy     */
volatile short     Nget;           /* number of logos in getlogo */
volatile unsigned  MaxMsgSize;     /* size of message buffer     */
volatile char     *Message;        /* message buffer             */
static unsigned char MyModuleId;   /* module id of main thread   */
unsigned char      MyInstid;       /* instid of main thread      */
unsigned char      TypeError;      /* type for error messages    */

/******************** function tport_create *************************/
/*         Create a shared memory region & its semaphore,           */
/*           attach to it and initialize header values.
*/ 00074 /********************************************************************/ 00075 void tport_create( SHM_INFO *region, /* info structure for memory region */ 00076 long nbytes, /* size of shared memory region */ 00077 long memkey ) /* key to shared memory region */ 00078 { 00079 SHM_HEAD *shm; /* pointer to start of memory region */ 00080 HANDLE hshare; // Handle to memory shared file 00081 HANDLE hmutex; // Handle to mutex object 00082 char share[20]; // Shared file name from memkey 00083 char mutex[20]; // Mutex name 00084 int err; // Error code from GetLastError() 00085 00086 /**** Create shared memory region ****/ 00087 sprintf(share, "SHR_%ld", memkey); 00088 hshare = CreateFileMapping( 00089 (HANDLE)0xFFFFFFFF, // Request memory file (swap space) 00090 NULL, // Security attributes 00091 PAGE_READWRITE, // Access restrictions 00092 0, // High order size (for very large mappings) 00093 nbytes, // Low order size 00094 share); // Name of file for other processes 00095 if ( hshare == NULL ) 00096 { 00097 err = GetLastError(); 00098 tport_syserr( "CreateFileMapping", err); 00099 } 00100 00101 /**** Attach to shared memory region ****/ 00102 shm = (SHM_HEAD *) MapViewOfFile( 00103 hshare, // File object to map 00104 FILE_MAP_WRITE, // Access desired 00105 0, // High-order 32 bits of file offset 00106 0, // Low-order 32 bits of file offset 00107 nbytes); // Number of bytes to map 00108 if ( shm == NULL ) 00109 { 00110 err = GetLastError(); 00111 tport_syserr( "MapViewOfFile", err ); 00112 } 00113 00114 /**** Initialize shared memory region header ****/ 00115 shm->nbytes = nbytes; 00116 shm->keymax = nbytes - sizeof(SHM_HEAD); 00117 shm->keyin = sizeof(SHM_HEAD); 00118 shm->keyold = shm->keyin; 00119 shm->flag = 0; 00120 00121 /**** Make semaphore for this shared memory region & set semval = SHM_FREE ****/ 00122 sprintf(mutex, "MTX_%ld", memkey); 00123 hmutex = CreateMutex( 00124 NULL, // Security attributes 00125 FALSE, // Initial ownership 00126 mutex); // Name 
of mutex (derived from memkey) 00127 if ( hmutex == NULL ) 00128 { 00129 err = GetLastError(); 00130 tport_syserr( "CreateMutex", err); 00131 } 00132 00133 /**** set values in the shared memory information structure ****/ 00134 region->addr = shm; 00135 region->hShare = hshare; 00136 region->hMutex = hmutex; 00137 region->key = memkey; 00138 } 00139 00140 00141 /******************** function tport_destroy *************************/ 00142 /* Destroy a shared memory region. */ 00143 /*********************************************************************/ 00144 00145 void tport_destroy( SHM_INFO *region ) 00146 { 00147 int err; 00148 00149 /***** Set kill flag, give other attached programs time to terminate ****/ 00150 00151 tport_putflag( region, TERMINATE ); 00152 Sleep( 1000 ); 00153 00154 /***** Detach from shared memory region *****/ 00155 if(!UnmapViewOfFile( region->addr )) { 00156 err = GetLastError(); 00157 tport_syserr( "UnmapViewOfFile", err); 00158 } 00159 00160 /***** Destroy semaphore set for shared memory region *****/ 00161 if(!CloseHandle(region->hMutex)) { 00162 err = GetLastError(); 00163 tport_syserr( "CloseHandle (mutex)", err); 00164 } 00165 00166 00167 /***** Destroy shared memory region *****/ 00168 if(!CloseHandle(region->hShare)) { 00169 err = GetLastError(); 00170 tport_syserr( "CloseHandle (share)", err); 00171 } 00172 } 00173 00174 /******************** function tport_attach *************************/ 00175 /* Map to an existing shared memory region. 
*/ 00176 /********************************************************************/ 00177 00178 void tport_attach( SHM_INFO *region, /* info structure for memory region */ 00179 long memkey ) /* key to shared memory region */ 00180 { 00181 SHM_HEAD *shm; /* pointer to start of memory region */ 00182 HANDLE hshare; // Handle to memory shared file 00183 HANDLE hmutex; // Handle to mutex object 00184 char share[20]; // Shared file name from memkey 00185 char mutex[20]; // Mutex name 00186 int err; // Error code from GetLastError() 00187 00188 /**** Create shared memory region ****/ 00189 sprintf(share, "SHR_%ld", memkey); 00190 hshare = OpenFileMapping( 00191 FILE_MAP_WRITE, 00192 TRUE, 00193 share); 00194 if ( hshare == NULL ) 00195 { 00196 err = GetLastError(); 00197 tport_syserr( "OpenFileMapping", err); 00198 } 00199 00200 /**** Attach to shared memory region ****/ 00201 shm = (SHM_HEAD *) MapViewOfFile( 00202 hshare, // File object to map 00203 FILE_MAP_WRITE, // Access desired 00204 0, // High-order 32 bits of file offset 00205 0, // Low-order 32 bits of file offset 00206 0); // Number of bytes to map 00207 if ( shm == NULL ) 00208 { 00209 err = GetLastError(); 00210 tport_syserr( "MapViewOfFile", err ); 00211 } 00212 00213 /**** Make semaphore for this shared memory region & set semval = SHM_FREE ****/ 00214 sprintf(mutex, "MTX_%ld", memkey); 00215 hmutex = CreateMutex( 00216 NULL, // Security attributes 00217 FALSE, // Initial ownership 00218 mutex); // Name of mutex (derived from memkey) 00219 if ( hmutex == NULL ) 00220 { 00221 err = GetLastError(); 00222 tport_syserr( "CreateMutex", err); 00223 } 00224 00225 /**** set values in the shared memory information structure ****/ 00226 region->addr = shm; 00227 region->hShare = hshare; 00228 region->hMutex = hmutex; 00229 region->key = memkey; 00230 } 00231 00232 /******************** function tport_detach **************************/ 00233 /* Detach from a shared memory region. 
*/ 00234 /*********************************************************************/ 00235 00236 void tport_detach( SHM_INFO *region ) 00237 { 00238 int err; 00239 00240 /***** Detach from shared memory region *****/ 00241 if(!UnmapViewOfFile( region->addr )) { 00242 err = GetLastError(); 00243 tport_syserr( "UnmapViewOfFile", err); 00244 } 00245 00246 /***** Destroy semaphore set for shared memory region *****/ 00247 if(!CloseHandle(region->hMutex)) { 00248 err = GetLastError(); 00249 tport_syserr( "CloseHandle (mutex)", err); 00250 } 00251 00252 00253 /***** Destroy shared memory region *****/ 00254 if(!CloseHandle(region->hShare)) { 00255 err = GetLastError(); 00256 tport_syserr( "CloseHandle (share)", err); 00257 } 00258 } 00259 00260 00261 00262 /*********************** function tport_putmsg ***********************/ 00263 /* Put a message into a shared memory region. */ 00264 /* Assigns a transport-layer sequence number. */ 00265 /*********************************************************************/ 00266 00267 int tport_putmsg( SHM_INFO *region, /* info structure for memory region */ 00268 MSG_LOGO *putlogo, /* type, module, instid of incoming msg */ 00269 long length, /* size of incoming message */ 00270 char *msg ) /* pointer to incoming message */ 00271 { 00272 volatile static MSG_TRACK trak[NTRACK_PUT]; /* sequence number keeper */ 00273 volatile static int nlogo; /* # of logos seen so far */ 00274 int it; /* index into trak */ 00275 SHM_HEAD *shm; /* pointer to start of memory region */ 00276 char *ring; /* pointer to ring part of memory */ 00277 unsigned long ir; /* index into memory ring */ 00278 long nfill; /* # bytes from ir to ring's last-byte */ 00279 long nwrap; /* # bytes to wrap to front of ring */ 00280 TPORT_HEAD hd; /* transport layer header to put */ 00281 char *h; /* pointer to transport layer header */ 00282 TPORT_HEAD old; /* transport header of oldest msg */ 00283 char *o; /* pointer to oldest transport header */ 00284 int j; 00285 int 
retval = PUT_OK; /* return value for this function */ 00286 00287 /**** First time around, init the sequence counters, semaphore controls ****/ 00288 00289 if (Put_Init) 00290 { 00291 nlogo = 0; 00292 00293 for( j=0 ; j < NTRACK_PUT ; j++ ) 00294 { 00295 trak[j].memkey = 0; 00296 trak[j].logo.type = 0; 00297 trak[j].logo.mod = 0; 00298 trak[j].logo.instid = 0; 00299 trak[j].seq = 0; 00300 trak[j].keyout = 0; 00301 } 00302 00303 Put_Init = 0; 00304 } 00305 00306 /**** Set up pointers for shared memory, etc. ****/ 00307 00308 shm = region->addr; 00309 ring = (char *) shm + sizeof(SHM_HEAD); 00310 h = (char *) (&hd); 00311 o = (char *) (&old); 00312 00313 /**** First, see if the incoming message will fit in the memory region ****/ 00314 00315 if ( length + sizeof(TPORT_HEAD) > shm->keymax ) 00316 { 00317 fprintf( stdout, 00318 "ERROR: tport_putmsg; message too large (%ld) for Region %ld\n", 00319 length, region->key); 00320 return( PUT_TOOBIG ); 00321 } 00322 00323 /**** Change semaphore; let others know you're using tracking structure & memory ****/ 00324 00325 WaitForSingleObject(region->hMutex, INFINITE); 00326 00327 /**** Next, find incoming logo in list of combinations already seen ****/ 00328 00329 for( it=0 ; it < nlogo ; it++ ) 00330 { 00331 if ( region->key != trak[it].memkey ) continue; 00332 if ( putlogo->type != trak[it].logo.type ) continue; 00333 if ( putlogo->mod != trak[it].logo.mod ) continue; 00334 if ( putlogo->instid != trak[it].logo.instid ) continue; 00335 goto build_header; 00336 } 00337 00338 /**** Incoming logo is a new combination; store it, if there's room ****/ 00339 00340 if ( nlogo == NTRACK_PUT ) 00341 { 00342 fprintf( stdout, 00343 "ERROR: tport_putmsg; exceeded NTRACK_PUT, msg not sent\n"); 00344 retval = PUT_NOTRACK; 00345 goto release_semaphore; 00346 } 00347 it = nlogo; 00348 trak[it].memkey = region->key; 00349 trak[it].logo = *putlogo; 00350 nlogo++; 00351 00352 /**** Store everything you need in the transport header ****/ 00353 
00354 build_header: 00355 hd.start = FIRST_BYTE; 00356 hd.size = length; 00357 hd.logo = trak[it].logo; 00358 hd.seq = trak[it].seq++; 00359 00360 /**** In shared memory, see if keyin will wrap; if so, reset keyin and keyold ****/ 00361 00362 if ( shm->keyin + sizeof(TPORT_HEAD) + length < shm->keyold ) 00363 { 00364 shm->keyin = shm->keyin % shm->keymax; 00365 shm->keyold = shm->keyold % shm->keymax; 00366 if ( shm->keyin <= shm->keyold ) shm->keyin += shm->keymax; 00367 /*fprintf( stdout, 00368 "NOTICE: tport_putmsg; keyin wrapped & reset; Region %ld\n", 00369 region->key );*/ 00370 } 00371 00372 /**** Then see if there's enough room for new message in shared memory ****/ 00373 /**** If not, "delete" oldest messages until there's room ****/ 00374 00375 while( shm->keyin + sizeof(TPORT_HEAD) + length - shm->keyold > shm->keymax ) 00376 { 00377 ir = shm->keyold % shm->keymax; 00378 if ( ring[ir] != FIRST_BYTE ) 00379 { 00380 fprintf( stdout, 00381 "ERROR: tport_putmsg; keyold not at FIRST_BYTE, Region %ld\n", 00382 region->key ); 00383 retval = TPORT_FATAL; 00384 goto release_semaphore; 00385 } 00386 for ( j=0 ; j < sizeof(TPORT_HEAD) ; j++ ) 00387 { 00388 if ( ir >= shm->keymax ) ir -= shm->keymax; 00389 o[j] = ring[ir++]; 00390 } 00391 shm->keyold += sizeof(TPORT_HEAD) + old.size; 00392 } 00393 00394 /**** Now copy transport header into shared memory by chunks... 
****/ 00395 00396 ir = shm->keyin % shm->keymax; 00397 nwrap = ir + sizeof(TPORT_HEAD) - shm->keymax; 00398 if ( nwrap <= 0 ) 00399 { 00400 memcpy( (void *) &ring[ir], (void *) h, sizeof(TPORT_HEAD) ); 00401 } 00402 else 00403 { 00404 nfill = sizeof(TPORT_HEAD) - nwrap; 00405 memcpy( (void *) &ring[ir], (void *) &h[0], nfill ); 00406 memcpy( (void *) &ring[0], (void *) &h[nfill], nwrap ); 00407 } 00408 ir += sizeof(TPORT_HEAD); 00409 if ( ir >= shm->keymax ) ir -= shm->keymax; 00410 00411 /**** ...and copy message into shared memory by chunks ****/ 00412 00413 nwrap = ir + length - shm->keymax; 00414 if ( nwrap <= 0 ) 00415 { 00416 memcpy( (void *) &ring[ir], (void *) msg, length ); 00417 } 00418 else 00419 { 00420 nfill = length - nwrap; 00421 memcpy( (void *) &ring[ir], (void *) &msg[0], nfill ); 00422 memcpy( (void *) &ring[0], (void *) &msg[nfill], nwrap ); 00423 } 00424 shm->keyin += sizeof(TPORT_HEAD) + length; 00425 00426 /**** Finished with shared memory, let others know via semaphore ****/ 00427 00428 release_semaphore: 00429 ReleaseMutex(region->hMutex); 00430 00431 if( retval == TPORT_FATAL ) exit( 1 ); 00432 return( retval ); 00433 } 00434 00435 00436 /*********************** function tport_getmsg ***********************/ 00437 /* Get a message out of shared memory. 
*/ 00438 /*********************************************************************/ 00439 00440 int tport_getmsg( SHM_INFO *region, /* info structure for memory region */ 00441 MSG_LOGO *getlogo, /* requested logo(s) */ 00442 short nget, /* number of logos in getlogo */ 00443 MSG_LOGO *logo, /* logo of retrieved msg */ 00444 long *length, /* size of retrieved message */ 00445 char *msg, /* retrieved message */ 00446 long maxsize ) /* max length for retrieved message */ 00447 { 00448 static MSG_TRACK trak[NTRACK_GET]; /* sequence #, outpointer keeper */ 00449 static int nlogo; /* # modid,type,instid combos so far */ 00450 int it; /* index into trak */ 00451 SHM_HEAD *shm; /* pointer to start of memory region */ 00452 char *ring; /* pointer to ring part of memory */ 00453 TPORT_HEAD *tmphd; /* temp pointer into shared memory */ 00454 unsigned long ir; /* index into the ring */ 00455 long nfill; /* bytes from ir to ring's last-byte */ 00456 long nwrap; /* bytes to grab from front of ring */ 00457 TPORT_HEAD hd; /* transport header from memory */ 00458 char *h; /* pointer to transport layer header */ 00459 int ih; /* index into the transport header */ 00460 unsigned long keyin; /* in-pointer to shared memory */ 00461 unsigned long keyold; /* oldest complete message in memory */ 00462 unsigned long keyget; /* pointer at which to start search */ 00463 int status = GET_OK; /* how did retrieval go? 
*/ 00464 int trakked; /* flag for trakking list entries */ 00465 int i,j; 00466 00467 /**** Get the pointers set up ****/ 00468 00469 shm = region->addr; 00470 ring = (char *) shm + sizeof(SHM_HEAD); 00471 h = (char *) (&hd); 00472 00473 /**** First time around, initialize sequence counters, outpointers ****/ 00474 00475 if (Get_Init) 00476 { 00477 nlogo = 0; 00478 00479 for( i=0 ; i < NTRACK_GET ; i++ ) 00480 { 00481 trak[i].memkey = 0; 00482 trak[i].logo.type = 0; 00483 trak[i].logo.mod = 0; 00484 trak[i].logo.instid = 0; 00485 trak[i].seq = 0; 00486 trak[i].keyout = 0; 00487 trak[i].active = 0; /*960618:ldd*/ 00488 } 00489 Get_Init = 0; 00490 } 00491 00492 /**** make sure all requested logos are entered in tracking list ****/ 00493 00494 for ( j=0 ; j < nget ; j++ ) /* for all requested message logos */ 00495 { 00496 trakked = 0; /* assume it's not being trakked */ 00497 for( it=0 ; it < nlogo ; it++ ) /* for all logos we're tracking */ 00498 { 00499 if( region->key != trak[it].memkey ) continue; 00500 if( getlogo[j].type != trak[it].logo.type ) continue; 00501 if( getlogo[j].mod != trak[it].logo.mod ) continue; 00502 if( getlogo[j].instid != trak[it].logo.instid ) continue; 00503 trakked = 1; /* found it in the trakking list! 
*/ 00504 break; 00505 } 00506 if( trakked ) continue; 00507 /* Make an entry in trak for this logo; if there's room */ 00508 if ( nlogo < NTRACK_GET ) 00509 { 00510 it = nlogo; 00511 trak[it].memkey = region->key; 00512 trak[it].logo = getlogo[j]; 00513 nlogo++; 00514 } 00515 } 00516 00517 /**** find latest starting index to look for any of the requested logos ****/ 00518 00519 findkey: 00520 00521 keyget = shm->keyold; 00522 00523 for ( it=0 ; it < nlogo ; it++ ) /* for all message logos we're tracking */ 00524 { 00525 if ( trak[it].memkey != region->key ) continue; 00526 for ( j=0 ; j < nget ; j++ ) /* for all requested message logos */ 00527 { 00528 if((getlogo[j].type == trak[it].logo.type || getlogo[j].type==WILD ) && 00529 (getlogo[j].mod == trak[it].logo.mod || getlogo[j].mod==WILD ) && 00530 (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) ) 00531 { 00532 if ( trak[it].keyout > keyget ) keyget = trak[it].keyout; 00533 } 00534 } 00535 } 00536 keyin = shm->keyin; 00537 00538 /**** See if keyin and keyold were wrapped and reset by tport_putmsg; ****/ 00539 /**** If so, reset trak[xx].keyout and go back to findkey ****/ 00540 00541 if ( keyget > keyin ) 00542 { 00543 keyold = shm->keyold; 00544 for ( it=0 ; it < nlogo ; it++ ) 00545 { 00546 if( trak[it].memkey == region->key ) 00547 { 00548 /* reset keyout */ 00549 /*DEBUG*/ /*printf("tport_getmsg: Pre-reset: keyout=%10u keyold=%10u keyin=%10u\n", 00550 trak[it].keyout, keyold, keyin );*/ 00551 trak[it].keyout = trak[it].keyout % shm->keymax; 00552 /*DEBUG*/ /*printf("tport_getmsg: Intermed: keyout=%10u keyold=%10u keyin=%10u\n", 00553 trak[it].keyout, keyold, keyin );*/ 00554 00555 /* make sure new keyout points to keyin or to a msg's first-byte; */ 00556 /* if not, we've been lapped, so set keyout to keyold */ 00557 ir = trak[it].keyout; 00558 tmphd = (TPORT_HEAD *) &ring[ir]; 00559 if ( trak[it].keyout == keyin || 00560 (keyin-trak[it].keyout)%shm->keymax == 0 ) 00561 { 00562 /*DEBUG*/ 
/*printf("tport_getmsg: Intermed: keyout=%10u same as keyin\n", 00563 trak[it].keyout );*/ 00564 trak[it].keyout = keyin; 00565 } 00566 else if( tmphd->start != FIRST_BYTE ) 00567 { 00568 /*DEBUG*/ /*printf("tport_getmsg: Intermed: keyout=%10u does not point to FIRST_BYTE\n", 00569 trak[it].keyout );*/ 00570 trak[it].keyout = keyold; 00571 } 00572 00573 /* else, make sure keyout's value is between keyold and keyin */ 00574 else if ( trak[it].keyout < keyold ) 00575 { 00576 do { 00577 trak[it].keyout += shm->keymax; 00578 } while ( trak[it].keyout < keyold ); 00579 } 00580 /*DEBUG*/ /*printf("tport_getmsg: Reset: keyout=%10u keyold=%10u keyin=%10u\n", 00581 trak[it].keyout, keyold, keyin );*/ 00582 } 00583 } 00584 /*fprintf( stdout, 00585 "NOTICE: tport_getmsg; keyin wrapped, keyout(s) reset; Region %ld\n", 00586 region->key );*/ 00587 00588 goto findkey; 00589 } 00590 00591 00592 /**** Find next message from requested type, module, instid ****/ 00593 00594 nextmsg: 00595 00596 while ( keyget < keyin ) 00597 { 00598 /* make sure you haven't been lapped by tport_putmsg */ 00599 if ( keyget < shm->keyold ) keyget = shm->keyold; 00600 00601 /* load next header; make sure you weren't lapped */ 00602 ir = keyget % shm->keymax; 00603 for ( ih=0 ; ih < sizeof(TPORT_HEAD) ; ih++ ) 00604 { 00605 if ( ir >= shm->keymax ) ir -= shm->keymax; 00606 h[ih] = ring[ir++]; 00607 } 00608 if ( keyget < shm->keyold ) continue; /*added 960612:ldd*/ 00609 00610 /* make sure it starts at beginning of a header */ 00611 if ( hd.start != FIRST_BYTE ) 00612 { 00613 fprintf( stdout, 00614 "ERROR: tport_getmsg; keyget not at FIRST_BYTE, Region %ld\n", 00615 region->key ); 00616 exit( 1 ); 00617 } 00618 keyget += sizeof(TPORT_HEAD) + hd.size; 00619 00620 /* see if this msg matches any requested type */ 00621 for ( j=0 ; j < nget ; j++ ) 00622 { 00623 if((getlogo[j].type == hd.logo.type || getlogo[j].type == WILD) && 00624 (getlogo[j].mod == hd.logo.mod || getlogo[j].mod == WILD) && 00625 
(getlogo[j].instid == hd.logo.instid || getlogo[j].instid == WILD) ) 00626 { 00627 00628 /**** Found a message of requested logo; retrieve it! ****/ 00629 /* complain if retreived msg is too big */ 00630 if ( hd.size > maxsize ) 00631 { 00632 *logo = hd.logo; 00633 *length = hd.size; 00634 status = GET_TOOBIG; 00635 goto trackit; /*changed 960612:ldd*/ 00636 } 00637 /* copy message by chunks to caller's address */ 00638 nwrap = ir + hd.size - shm->keymax; 00639 if ( nwrap <= 0 ) 00640 { 00641 memcpy( (void *) msg, (void *) &ring[ir], hd.size ); 00642 } 00643 else 00644 { 00645 nfill = hd.size - nwrap; 00646 memcpy( (void *) &msg[0], (void *) &ring[ir], nfill ); 00647 memcpy( (void *) &msg[nfill], (void *) &ring[0], nwrap ); 00648 } 00649 /* see if we got run over by tport_putmsg while copying msg */ 00650 /* if we did, go back and try to get a msg cleanly */ 00651 keyold = shm->keyold; 00652 if ( keyold >= keyget ) 00653 { 00654 keyget = keyold; 00655 goto nextmsg; 00656 } 00657 00658 /* set other returned variables */ 00659 *logo = hd.logo; 00660 *length = hd.size; 00661 00662 trackit: 00663 /* find msg logo in tracked list */ 00664 for ( it=0 ; it < nlogo ; it++ ) 00665 { 00666 if ( region->key != trak[it].memkey ) continue; 00667 if ( hd.logo.type != trak[it].logo.type ) continue; 00668 if ( hd.logo.mod != trak[it].logo.mod ) continue; 00669 if ( hd.logo.instid != trak[it].logo.instid ) continue; 00670 /* activate sequence tracking if 1st msg */ 00671 if ( !trak[it].active ) 00672 { 00673 trak[it].seq = hd.seq; 00674 trak[it].active = 1; 00675 } 00676 goto sequence; 00677 } 00678 /* new logo, track it if there's room */ 00679 if ( nlogo == NTRACK_GET ) 00680 { 00681 fprintf( stdout, 00682 "ERROR: tport_getmsg; exceeded NTRACK_GET\n"); 00683 if( status != GET_TOOBIG ) status = GET_NOTRACK; /*changed 960612:ldd*/ 00684 goto wrapup; 00685 } 00686 it = nlogo; 00687 trak[it].memkey = region->key; 00688 trak[it].logo = hd.logo; 00689 trak[it].seq = hd.seq; 00690 
trak[it].active = 1; /*960618:ldd*/ 00691 nlogo++; 00692 00693 sequence: 00694 /* check if sequence #'s match; update sequence # */ 00695 if ( status == GET_TOOBIG ) goto wrapup; /*added 960612:ldd*/ 00696 if ( hd.seq != trak[it].seq ) 00697 { 00698 status = GET_MISS; 00699 trak[it].seq = hd.seq; 00700 } 00701 trak[it].seq++; 00702 00703 /* Ok, we're finished grabbing this one */ 00704 goto wrapup; 00705 00706 } /* end if of logo & getlogo match */ 00707 } /* end for over getlogo */ 00708 } /* end while over ring */ 00709 00710 /**** If you got here, there were no messages of requested logo(s) ****/ 00711 00712 status = GET_NONE; 00713 00714 /**** update outpointer (->msg after retrieved one) for all requested logos ****/ 00715 00716 wrapup: 00717 for ( it=0 ; it < nlogo ; it++ ) /* for all message logos we're tracking */ 00718 { 00719 if ( trak[it].memkey != region->key ) continue; 00720 for ( j=0 ; j < nget ; j++ ) /* for all requested message logos */ 00721 { 00722 if((getlogo[j].type == trak[it].logo.type || getlogo[j].type==WILD) && 00723 (getlogo[j].mod == trak[it].logo.mod || getlogo[j].mod==WILD) && 00724 (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) ) 00725 { 00726 trak[it].keyout = keyget; 00727 } 00728 } 00729 } 00730 00731 return( status ); 00732 00733 } 00734 00735 00736 /********************* function tport_putflag ************************/ 00737 /* Puts the kill flag into a shared memory region. */ 00738 /*********************************************************************/ 00739 00740 void tport_putflag( SHM_INFO *region, /* shared memory info structure */ 00741 int flag ) /* tells attached processes to exit */ 00742 { 00743 SHM_HEAD *shm; 00744 00745 shm = region->addr; 00746 shm->flag = flag; 00747 return; 00748 } 00749 00750 00751 00752 /*********************** function tport_getflag **********************/ 00753 /* Returns the kill flag from a shared memory region. 
*/ 00754 /*********************************************************************/ 00755 00756 int tport_getflag( SHM_INFO *region ) 00757 00758 { 00759 SHM_HEAD *shm; 00760 00761 shm = region->addr; 00762 return( (int)shm->flag ); 00763 } 00764 00765 00766 /************************** tport_bufthr ****************************/ 00767 /* Thread to buffer input from one transport ring to another. */ 00768 /********************************************************************/ 00769 void tport_bufthr( void *dummy ) 00770 { 00771 char errnote[150]; 00772 MSG_LOGO logo; 00773 long msgsize; 00774 unsigned char msgseq; 00775 int res1, res2; 00776 int gotmsg; 00777 HANDLE myHandle = GetCurrentThread(); 00778 00779 /* Reset my own thread priority 00780 ****************************/ 00781 if ( SetThreadPriority( myHandle, THREAD_PRIORITY_TIME_CRITICAL ) == 0 ) 00782 { 00783 printf( "Error setting buffer thread priority: %d\n", GetLastError() ); 00784 exit( -1 ); 00785 } 00786 00787 /* Flush all existing messages from the public memory region 00788 *********************************************************/ 00789 while( tport_copyfrom((SHM_INFO *) PubRegion, (MSG_LOGO *) Getlogo, 00790 Nget, &logo, &msgsize, (char *) Message, 00791 MaxMsgSize, &msgseq ) != GET_NONE ); 00792 00793 while ( 1 ) 00794 { 00795 Sleep( 500 ); 00796 00797 /* If a terminate flag is found, copy it to the private ring. 00798 Then, terminate this thread. 
00799 *********************************************************/ 00800 if ( tport_getflag( (SHM_INFO *) PubRegion ) == TERMINATE ) 00801 { 00802 tport_putflag( (SHM_INFO *) BufRegion, TERMINATE ); 00803 _endthread(); 00804 } 00805 00806 /* Try to copy a message from the public memory region 00807 ***************************************************/ 00808 do 00809 { 00810 res1 = tport_copyfrom((SHM_INFO *) PubRegion, (MSG_LOGO *) Getlogo, 00811 Nget, &logo, &msgsize, (char *) Message, 00812 MaxMsgSize, &msgseq ); 00813 gotmsg = 1; 00814 00815 /* Handle return values 00816 ********************/ 00817 switch ( res1 ) 00818 { 00819 case GET_MISS_LAPPED: 00820 sprintf( errnote, 00821 "tport_bufthr: Missed msg(s) c%d m%d t%d Overwritten, region:%ld.", 00822 (int) logo.instid, (int) logo.mod, (int) logo.type, 00823 PubRegion->key ); 00824 tport_buferror( ERR_LAPPED, errnote ); 00825 break; 00826 case GET_MISS_SEQGAP: 00827 sprintf( errnote, 00828 "tport_bufthr: Missed msg(s) c%d m%d t%d Sequence gap, region:%ld.", 00829 (int) logo.instid, (int) logo.mod, (int) logo.type, 00830 PubRegion->key ); 00831 tport_buferror( ERR_SEQGAP, errnote ); 00832 break; 00833 case GET_NOTRACK: 00834 sprintf( errnote, 00835 "tport_bufthr: Logo c%d m%d t%d not tracked; NTRACK_GET exceeded.", 00836 (int) logo.instid, (int) logo.mod, (int) logo.type ); 00837 tport_buferror( ERR_UNTRACKED, errnote ); 00838 case GET_OK: 00839 break; 00840 case GET_TOOBIG: 00841 sprintf( errnote, 00842 "tport_bufthr: msg[%ld] c%d m%d t%d seq%d too big; skipped in region:%ld.", 00843 msgsize, (int) logo.instid, (int) logo.mod, 00844 (int) logo.type, (int) msgseq, PubRegion->key ); 00845 tport_buferror( ERR_OVERFLOW, errnote ); 00846 case GET_NONE: 00847 gotmsg = 0; 00848 break; 00849 } 00850 00851 /* If you did get a message, copy it to private ring 00852 *************************************************/ 00853 if ( gotmsg ) 00854 { 00855 res2 = tport_copyto( (SHM_INFO *) BufRegion, &logo, 00856 msgsize, (char *) 
Message, msgseq ); 00857 switch (res2) 00858 { 00859 case PUT_TOOBIG: 00860 sprintf( errnote, 00861 "tport_bufthr: msg[%ld] (c%d m%d t%d) too big for Region:%ld.", 00862 msgsize, (int) logo.instid, (int) logo.mod, (int) logo.type, 00863 BufRegion->key ); 00864 tport_buferror( ERR_OVERFLOW, errnote ); 00865 case PUT_OK: 00866 break; 00867 } 00868 } 00869 } while ( res1 != GET_NONE ); 00870 } 00871 } 00872 00873 00874 /************************** tport_buffer ****************************/ 00875 /* Function to initialize the input buffering thread */ 00876 /********************************************************************/ 00877 int tport_buffer( SHM_INFO *region1, /* transport ring */ 00878 SHM_INFO *region2, /* private ring */ 00879 MSG_LOGO *getlogo, /* array of logos to copy */ 00880 short nget, /* number of logos in getlogo */ 00881 unsigned maxMsgSize, /* size of message buffer */ 00882 unsigned char module, /* module id of main thread */ 00883 unsigned char instid ) /* instid id of main thread */ 00884 { 00885 unsigned long thread_id; /* Thread id of the buffer thread */ 00886 00887 /* Allocate message buffer 00888 ***********************/ 00889 Message = (char *) malloc( maxMsgSize ); 00890 if ( Message == NULL ) 00891 { 00892 fprintf( stdout, "tport_buffer: Error allocating message buffer\n" ); 00893 return -1; 00894 } 00895 00896 /* Copy function arguments to global variables 00897 *******************************************/ 00898 PubRegion = region1; 00899 BufRegion = region2; 00900 Getlogo = getlogo; 00901 Nget = nget; 00902 MaxMsgSize = maxMsgSize; 00903 MyModuleId = module; 00904 MyInstid = instid; 00905 00906 /* Lookup message type for error messages 00907 **************************************/ 00908 if ( GetType( "TYPE_ERROR", &TypeError ) != 0 ) 00909 { 00910 fprintf( stderr, 00911 "tport_buffer: Invalid message type <TYPE_ERROR>\n" ); 00912 return -1; 00913 } 00914 00915 /* Start the input buffer thread 00916 *****************************/ 00917 
thread_id = _beginthread( tport_bufthr, 0, NULL ); 00918 00919 if ( thread_id == -1 ) /* Couldn't create thread */ 00920 { 00921 fprintf( stderr, "tport_buffer: Can't start the buffer thread." ); 00922 return -1; 00923 } 00924 return 0; 00925 } 00926 00927 00928 /********************** function tport_copyfrom *********************/ 00929 /* get a message out of public shared memory; save the */ 00930 /* sequence number from the transport layer, with the intent */ 00931 /* of copying it to a private (buffering) memory ring */ 00932 /********************************************************************/ 00933 00934 int tport_copyfrom( SHM_INFO *region, /* info structure for memory region */ 00935 MSG_LOGO *getlogo, /* requested logo(s) */ 00936 short nget, /* number of logos in getlogo */ 00937 MSG_LOGO *logo, /* logo of retrieved message */ 00938 long *length, /* size of retrieved message */ 00939 char *msg, /* retrieved message */ 00940 long maxsize, /* max length for retrieved message */ 00941 unsigned char *seq ) /* TPORT_HEAD seq# of retrieved msg */ 00942 { 00943 static MSG_TRACK trak[NTRACK_GET]; /* sequence #, outpointer keeper */ 00944 static int nlogo; /* # modid,type,instid combos so far */ 00945 int it; /* index into trak */ 00946 SHM_HEAD *shm; /* pointer to start of memory region */ 00947 char *ring; /* pointer to ring part of memory */ 00948 TPORT_HEAD *tmphd; /* temp pointer into shared memory */ 00949 unsigned long ir; /* index into the ring */ 00950 long nfill; /* bytes from ir to ring's last-byte */ 00951 long nwrap; /* bytes to grab from front of ring */ 00952 TPORT_HEAD hd; /* transport header from memory */ 00953 char *h; /* pointer to transport layer header */ 00954 int ih; /* index into the transport header */ 00955 unsigned long keyin; /* in-pointer to shared memory */ 00956 unsigned long keyold; /* oldest complete message in memory */ 00957 unsigned long keyget; /* pointer at which to start search */ 00958 int status = GET_OK; /* how did 
retrieval go? */ 00959 int lapped = 0; /* = 1 if memory ring has been over- */ 00960 /* written since last tport_copyfrom */ 00961 int trakked; /* flag for trakking list entries */ 00962 int i,j; 00963 00964 /**** Get the pointers set up ****/ 00965 00966 shm = region->addr; 00967 ring = (char *) shm + sizeof(SHM_HEAD); 00968 h = (char *) (&hd); 00969 00970 /**** First time around, initialize sequence counters, outpointers ****/ 00971 00972 if (Copyfrom_Init) 00973 { 00974 nlogo = 0; 00975 00976 for( i=0 ; i < NTRACK_GET ; i++ ) 00977 { 00978 trak[i].memkey = 0; 00979 trak[i].logo.type = 0; 00980 trak[i].logo.mod = 0; 00981 trak[i].logo.instid = 0; 00982 trak[i].seq = 0; 00983 trak[i].keyout = 0; 00984 trak[i].active = 0; /*960618:ldd*/ 00985 } 00986 Copyfrom_Init = 0; 00987 } 00988 00989 /**** make sure all requested logos are entered in tracking list ****/ 00990 00991 for ( j=0 ; j < nget ; j++ ) /* for all requested message logos */ 00992 { 00993 trakked = 0; /* assume it's not being trakked */ 00994 for( it=0 ; it < nlogo ; it++ ) /* for all logos we're tracking */ 00995 { 00996 if( region->key != trak[it].memkey ) continue; 00997 if( getlogo[j].type != trak[it].logo.type ) continue; 00998 if( getlogo[j].mod != trak[it].logo.mod ) continue; 00999 if( getlogo[j].instid != trak[it].logo.instid ) continue; 01000 trakked = 1; /* found it in the trakking list! 
*/ 01001 break; 01002 } 01003 if( trakked ) continue; 01004 /* Make an entry in trak for this logo; if there's room */ 01005 if ( nlogo < NTRACK_GET ) 01006 { 01007 it = nlogo; 01008 trak[it].memkey = region->key; 01009 trak[it].logo = getlogo[j]; 01010 nlogo++; 01011 } 01012 } 01013 01014 /**** find latest starting index to look for any of the requested logos ****/ 01015 01016 findkey: 01017 01018 keyget = 0; 01019 01020 for ( it=0 ; it < nlogo ; it++ ) /* for all message logos we're tracking */ 01021 { 01022 if ( trak[it].memkey != region->key ) continue; 01023 for ( j=0 ; j < nget ; j++ ) /* for all requested message logos */ 01024 { 01025 if((getlogo[j].type == trak[it].logo.type || getlogo[j].type==WILD) && 01026 (getlogo[j].mod == trak[it].logo.mod || getlogo[j].mod==WILD) && 01027 (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) ) 01028 { 01029 if ( trak[it].keyout > keyget ) keyget = trak[it].keyout; 01030 } 01031 } 01032 } 01033 01034 /**** make sure you haven't been lapped by tport_copyto or tport_putmsg ****/ 01035 if ( keyget < shm->keyold ) { 01036 keyget = shm->keyold; 01037 lapped = 1; 01038 } 01039 01040 /**** See if keyin and keyold were wrapped and reset by tport_putmsg; ****/ 01041 /**** If so, reset trak[xx].keyout and go back to findkey ****/ 01042 01043 keyin = shm->keyin; 01044 if ( keyget > keyin ) 01045 { 01046 keyold = shm->keyold; 01047 for ( it=0 ; it < nlogo ; it++ ) 01048 { 01049 if( trak[it].memkey == region->key ) 01050 { 01051 /* reset keyout */ 01052 /*DEBUG*/ /*printf("tport_copyfrom: Pre-reset: keyout=%10u keyold=%10u keyin=%10u\n", 01053 trak[it].keyout, keyold, keyin );*/ 01054 trak[it].keyout = trak[it].keyout % shm->keymax; 01055 /*DEBUG*/ /*printf("tport_copyfrom: Intermed: keyout=%10u keyold=%10u keyin=%10u\n", 01056 trak[it].keyout, keyold, keyin );*/ 01057 01058 /* make sure new keyout points to keyin or to a msg's first-byte; */ 01059 /* if not, we've been lapped, so set keyout to keyold */ 01060 ir 
= trak[it].keyout; 01061 tmphd = (TPORT_HEAD *) &ring[ir]; 01062 if ( trak[it].keyout == keyin || 01063 (keyin-trak[it].keyout)%shm->keymax == 0 ) 01064 { 01065 /*DEBUG*/ /*printf("tport_copyfrom: Intermed: keyout=%10u same as keyin\n", 01066 trak[it].keyout );*/ 01067 trak[it].keyout = keyin; 01068 } 01069 else if( tmphd->start != FIRST_BYTE ) 01070 { 01071 /*DEBUG*/ /*printf("tport_copyfrom: Intermed: keyout=%10u does not point to FIRST_BYTE\n", 01072 trak[it].keyout );*/ 01073 trak[it].keyout = keyold; 01074 lapped = 1; 01075 } 01076 01077 /* else, make sure keyout's value is between keyold and keyin */ 01078 else if ( trak[it].keyout < keyold ) 01079 { 01080 do { 01081 trak[it].keyout += shm->keymax; 01082 } while ( trak[it].keyout < keyold ); 01083 } 01084 /*DEBUG*/ /*printf("tport_copyfrom: Reset: keyout=%10u keyold=%10u keyin=%10u\n", 01085 trak[it].keyout, keyold, keyin );*/ 01086 } 01087 } 01088 /*fprintf( stdout, 01089 "NOTICE: tport_copyfrom; keyin wrapped, keyout(s) reset; Region %ld\n", 01090 region->key );*/ 01091 01092 goto findkey; 01093 } 01094 01095 01096 /**** Find next message from requested type, module, instid ****/ 01097 01098 nextmsg: 01099 01100 while ( keyget < keyin ) 01101 { 01102 /* make sure you haven't been lapped by tport_copyto or tport_putmsg */ 01103 if ( keyget < shm->keyold ) { 01104 keyget = shm->keyold; 01105 lapped = 1; 01106 } 01107 01108 /* load next header; make sure you weren't lapped */ 01109 ir = keyget % shm->keymax; 01110 for ( ih=0 ; ih < sizeof(TPORT_HEAD) ; ih++ ) 01111 { 01112 if ( ir >= shm->keymax ) ir -= shm->keymax; 01113 h[ih] = ring[ir++]; 01114 } 01115 if ( keyget < shm->keyold ) continue; /*added 960612:ldd*/ 01116 01117 /* make sure it starts at beginning of a header */ 01118 if ( hd.start != FIRST_BYTE ) 01119 { 01120 fprintf( stdout, 01121 "ERROR: tport_copyfrom; keyget not at FIRST_BYTE, Region %ld\n", 01122 region->key ); 01123 exit( 1 ); 01124 } 01125 keyget += sizeof(TPORT_HEAD) + hd.size; 01126 
01127 /* see if this msg matches any requested type */ 01128 for ( j=0 ; j < nget ; j++ ) 01129 { 01130 if((getlogo[j].type == hd.logo.type || getlogo[j].type == WILD) && 01131 (getlogo[j].mod == hd.logo.mod || getlogo[j].mod == WILD) && 01132 (getlogo[j].instid == hd.logo.instid || getlogo[j].instid == WILD) ) 01133 { 01134 01135 /**** Found a message of requested logo; retrieve it! ****/ 01136 /* complain if retreived msg is too big */ 01137 if ( hd.size > maxsize ) 01138 { 01139 *logo = hd.logo; 01140 *length = hd.size; 01141 *seq = hd.seq; 01142 status = GET_TOOBIG; 01143 goto trackit; /*changed 960612:ldd*/ 01144 } 01145 /* copy message by chunks to caller's address */ 01146 nwrap = ir + hd.size - shm->keymax; 01147 if ( nwrap <= 0 ) 01148 { 01149 memcpy( (void *) msg, (void *) &ring[ir], hd.size ); 01150 } 01151 else 01152 { 01153 nfill = hd.size - nwrap; 01154 memcpy( (void *) &msg[0], (void *) &ring[ir], nfill ); 01155 memcpy( (void *) &msg[nfill], (void *) &ring[0], nwrap ); 01156 } 01157 /* see if we got lapped by tport_copyto or tport_putmsg while copying msg */ 01158 /* if we did, go back and try to get a msg cleanly */ 01159 keyold = shm->keyold; 01160 if ( keyold >= keyget ) 01161 { 01162 keyget = keyold; 01163 lapped = 1; 01164 goto nextmsg; 01165 } 01166 01167 /* set other returned variables */ 01168 *logo = hd.logo; 01169 *length = hd.size; 01170 *seq = hd.seq; 01171 01172 trackit: 01173 /* find logo in tracked list */ 01174 for ( it=0 ; it < nlogo ; it++ ) 01175 { 01176 if ( region->key != trak[it].memkey ) continue; 01177 if ( hd.logo.type != trak[it].logo.type ) continue; 01178 if ( hd.logo.mod != trak[it].logo.mod ) continue; 01179 if ( hd.logo.instid != trak[it].logo.instid ) continue; 01180 /* activate sequence tracking if 1st msg */ 01181 if ( !trak[it].active ) 01182 { 01183 trak[it].seq = hd.seq; 01184 trak[it].active = 1; 01185 } 01186 goto sequence; 01187 } 01188 /* new logo, track it if there's room */ 01189 if ( nlogo == NTRACK_GET ) 
01190 { 01191 fprintf( stdout, 01192 "ERROR: tport_copyfrom; exceeded NTRACK_GET\n"); 01193 if( status != GET_TOOBIG ) status = GET_NOTRACK; /*changed 960612:ldd*/ 01194 goto wrapup; 01195 } 01196 it = nlogo; 01197 trak[it].memkey = region->key; 01198 trak[it].logo = hd.logo; 01199 trak[it].seq = hd.seq; 01200 trak[it].active = 1; /*960618:ldd*/ 01201 nlogo++; 01202 01203 sequence: 01204 /* check if sequence #'s match; update sequence # */ 01205 if ( status == GET_TOOBIG ) goto wrapup; /*added 960612:ldd*/ 01206 if ( hd.seq != trak[it].seq ) 01207 { 01208 if (lapped) status = GET_MISS_LAPPED; 01209 else status = GET_MISS_SEQGAP; 01210 trak[it].seq = hd.seq; 01211 } 01212 trak[it].seq++; 01213 01214 /* Ok, we're finished grabbing this one */ 01215 goto wrapup; 01216 01217 } /* end if of logo & getlogo match */ 01218 } /* end for over getlogo */ 01219 } /* end while over ring */ 01220 01221 /**** If you got here, there were no messages of requested logo(s) ****/ 01222 01223 status = GET_NONE; 01224 01225 /**** update outpointer (->msg after retrieved one) for all requested logos ****/ 01226 01227 wrapup: 01228 for ( it=0 ; it < nlogo ; it++ ) /* for all message logos we're tracking */ 01229 { 01230 if ( trak[it].memkey != region->key ) continue; 01231 for ( j=0 ; j < nget ; j++ ) /* for all requested message logos */ 01232 { 01233 if((getlogo[j].type == trak[it].logo.type || getlogo[j].type==WILD) && 01234 (getlogo[j].mod == trak[it].logo.mod || getlogo[j].mod==WILD) && 01235 (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) ) 01236 { 01237 trak[it].keyout = keyget; 01238 } 01239 } 01240 } 01241 01242 return( status ); 01243 01244 } 01245 01246 01247 /*********************** function tport_copyto ***********************/ 01248 /* Puts a message into a shared memory region. 
*/ 01249 /* Preserves the sequence number (passed as argument) as the */ 01250 /* transport layer sequence number */ 01251 /*********************************************************************/ 01252 01253 int tport_copyto( SHM_INFO *region, /* info structure for memory region */ 01254 MSG_LOGO *putlogo, /* type, module, instid of incoming msg */ 01255 long length, /* size of incoming message */ 01256 char *msg, /* pointer to incoming message */ 01257 unsigned char seq ) /* preserve as sequence# in TPORT_HEAD */ 01258 { 01259 SHM_HEAD *shm; /* pointer to start of memory region */ 01260 char *ring; /* pointer to ring part of memory */ 01261 unsigned long ir; /* index into memory ring */ 01262 long nfill; /* # bytes from ir to ring's last-byte */ 01263 long nwrap; /* # bytes to wrap to front of ring */ 01264 TPORT_HEAD hd; /* transport layer header to put */ 01265 char *h; /* pointer to transport layer header */ 01266 TPORT_HEAD old; /* transport header of oldest msg */ 01267 char *o; /* pointer to oldest transport header */ 01268 int j; 01269 int retval = PUT_OK; /* return value for this function */ 01270 01271 /**** First time around, initialize semaphore controls ****/ 01272 01273 if (Copyto_Init) 01274 { 01275 Copyto_Init = 0; 01276 } 01277 01278 /**** Set up pointers for shared memory, etc. 
****/ 01279 01280 shm = region->addr; 01281 ring = (char *) shm + sizeof(SHM_HEAD); 01282 h = (char *) (&hd); 01283 o = (char *) (&old); 01284 01285 /**** First, see if the incoming message will fit in the memory region ****/ 01286 01287 if ( length + sizeof(TPORT_HEAD) > shm->keymax ) 01288 { 01289 fprintf( stdout, 01290 "ERROR: tport_copyto; message too large (%ld) for Region %ld\n", 01291 length, region->key); 01292 return( PUT_TOOBIG ); 01293 } 01294 01295 /**** Change semaphore to let others know you're using memory ****/ 01296 01297 WaitForSingleObject( region->hMutex, INFINITE ); 01298 01299 /**** Store everything you need in the transport header ****/ 01300 01301 hd.start = FIRST_BYTE; 01302 hd.size = length; 01303 hd.logo = *putlogo; 01304 hd.seq = seq; 01305 01306 /**** First see if keyin will wrap; if so, reset both keyin and keyold ****/ 01307 01308 if ( shm->keyin + sizeof(TPORT_HEAD) + length < shm->keyold ) 01309 { 01310 shm->keyin = shm->keyin % shm->keymax; 01311 shm->keyold = shm->keyold % shm->keymax; 01312 if ( shm->keyin <= shm->keyold ) shm->keyin += shm->keymax; 01313 /*fprintf( stdout, 01314 "NOTICE: tport_copyto; keyin wrapped & reset; Region %ld\n", 01315 region->key );*/ 01316 } 01317 01318 /**** Then see if there's enough room for new message in shared memory ****/ 01319 /**** If not, "delete" oldest messages until there's room ****/ 01320 01321 while( shm->keyin + sizeof(TPORT_HEAD) + length - shm->keyold > shm->keymax ) 01322 { 01323 ir = shm->keyold % shm->keymax; 01324 if ( ring[ir] != FIRST_BYTE ) 01325 { 01326 fprintf( stdout, 01327 "ERROR: tport_copyto; keyold not at FIRST_BYTE, Region %ld\n", 01328 region->key ); 01329 retval = TPORT_FATAL; 01330 goto release_semaphore; 01331 } 01332 for ( j=0 ; j < sizeof(TPORT_HEAD) ; j++ ) 01333 { 01334 if ( ir >= shm->keymax ) ir -= shm->keymax; 01335 o[j] = ring[ir++]; 01336 } 01337 shm->keyold += sizeof(TPORT_HEAD) + old.size; 01338 } 01339 01340 /**** Now copy transport header into shared 
memory by chunks... ****/ 01341 01342 ir = shm->keyin % shm->keymax; 01343 nwrap = ir + sizeof(TPORT_HEAD) - shm->keymax; 01344 if ( nwrap <= 0 ) 01345 { 01346 memcpy( (void *) &ring[ir], (void *) h, sizeof(TPORT_HEAD) ); 01347 } 01348 else 01349 { 01350 nfill = sizeof(TPORT_HEAD) - nwrap; 01351 memcpy( (void *) &ring[ir], (void *) &h[0], nfill ); 01352 memcpy( (void *) &ring[0], (void *) &h[nfill], nwrap ); 01353 } 01354 ir += sizeof(TPORT_HEAD); 01355 if ( ir >= shm->keymax ) ir -= shm->keymax; 01356 01357 /**** ...and copy message into shared memory by chunks ****/ 01358 01359 nwrap = ir + length - shm->keymax; 01360 if ( nwrap <= 0 ) 01361 { 01362 memcpy( (void *) &ring[ir], (void *) msg, length ); 01363 } 01364 else 01365 { 01366 nfill = length - nwrap; 01367 memcpy( (void *) &ring[ir], (void *) &msg[0], nfill ); 01368 memcpy( (void *) &ring[0], (void *) &msg[nfill], nwrap ); 01369 } 01370 shm->keyin += sizeof(TPORT_HEAD) + length; 01371 01372 /**** Finished with shared memory, let others know via semaphore ****/ 01373 01374 release_semaphore: 01375 ReleaseMutex(region->hMutex); 01376 01377 if( retval == TPORT_FATAL ) exit( 1 ); 01378 return( retval ); 01379 } 01380 01381 01382 /************************* tport_buferror ***************************/ 01383 /* Build an error message and put it in the public memory region */ 01384 /********************************************************************/ 01385 void tport_buferror( short ierr, /* 2-byte error word */ 01386 char *note ) /* string describing error */ 01387 { 01388 MSG_LOGO logo; 01389 char msg[256]; 01390 long size; 01391 time_t t; 01392 01393 logo.instid = MyInstid; 01394 logo.mod = MyModuleId; 01395 logo.type = TypeError; 01396 01397 time( &t ); 01398 sprintf( msg, "%ld %hd %s\n", t, ierr, note ); 01399 size = strlen( msg ); /* don't include the null byte in the message */ 01400 01401 if ( tport_putmsg( (SHM_INFO *) PubRegion, &logo, size, msg ) != PUT_OK ) 01402 { 01403 printf("tport_bufthr: Error 
sending error:%hd for module:%d.\n", 01404 ierr, MyModuleId ); 01405 } 01406 return; 01407 } 01408 01409 01410 /************************ function tport_syserr **********************/ 01411 /* Print a system error and terminate. */ 01412 /*********************************************************************/ 01413 01414 void tport_syserr( char *msg, /* message to print (which routine had an error) */ 01415 long key ) /* identifies which memory region had the error */ 01416 { 01417 extern int sys_nerr; 01418 extern char *sys_errlist[]; 01419 01420 long err = GetLastError(); /* Override with per thread err */ 01421 01422 fprintf( stdout, "ERROR: %s (%d", msg, err ); 01423 fprintf( stdout, "; %s) Region: %ld\n", strerror(err), key); 01424 01425 /* if ( err > 0 && err < sys_nerr ) 01426 fprintf( stdout,"; %s) Region: %ld\n", sys_errlist[err], key ); 01427 else 01428 fprintf( stdout, ") Region: %ld\n", key ); */ 01429 01430 exit( 1 ); 01431 } 01432