00001 00002 /* 00003 * THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE 00004 * CHECKED IT OUT USING THE COMMAND CHECKOUT. 00005 * 00006 * $Id: mem__circ__queue_8c-source.html 2161 2006-05-19 16:55:03Z paulf $ 00007 * 00008 * Revision history: 00009 * $Log$ 00009 * Revision 1.1 2006/05/19 16:55:02 paulf 00009 * first inclusion 00009 * 00010 * Revision 1.2 2001/04/12 03:57:44 lombard 00011 * Added functions dumpqueue, undumpqueue and peekNextElement 00012 * to allow saving queue to disk and reloading. 00013 * 00014 * Revision 1.1 2000/02/14 18:51:48 lucky 00015 * Initial revision 00016 * 00017 * 00018 */ 00019 00020 /* 00021 This is son of in 'queue_max_size.c', which is son of queue, which Will got from 00022 Kelley and Pohl. These were clever linked list schemes which allocated as needed. 00023 This was too scary: the system could run out of memory and fail after running for 00024 a long time. 00025 So, out with the cleverness, and in with brute force: this version pre- 00026 allocates the max amount of memory it will ever use at startup, and does the 00027 buffering in an array of message slots. The buffering is circular, as before: 00028 When we run out of message slots, we overwrite the oldest message. 00029 alex 2/8/98 00030 */ 00031 00032 #include <errno.h> 00033 #include <stdio.h> 00034 #include <string.h> 00035 #include <stdlib.h> 00036 #include <earthworm.h> 00037 #include <time.h> 00038 #include <mem_circ_queue.h> 00039 00040 /* Internal Function Prototypes */ 00041 static void inc_circular( QUEUE* q, Q_POS * pQPos); 00042 static void dec_circular( QUEUE* q, Q_POS * pQPos); 00043 00044 /**************************************************************** 00045 * initqueue * 00046 * allocates the three queue regions * 00047 * returns: * 00048 * -1 if we couldn't get the memory * 00049 * -2 if the amount implied is bigger than * 00050 * a long * 00051 *****************************************************************/ 00052 int initqueue( QUEUE* q, unsigned long maxElements, 00053 unsigned long elementMaxSize ) 00054 { 00055 unsigned long temp; 00056 QUEUE_ENTRY* pQE; 00057 unsigned long StartingOffset, malloc_size; 00058 00059 /* First allocate the QUEUE structure 00060 *************************************/ 00061 /* this contains the pointers to the start of the message descriptor array 00062 and the message storage area */ 00063 if (q == (QUEUE*)NULL) return(-1); 00064 00065 /* save some arguments to local variables 00066 *****************************************/ 00067 q->MyMaxElements = maxElements; 00068 q->MyMaxSize = elementMaxSize; 00069 q->NumOfElements = 0; 00070 00071 /* Is the total number of bytes within an unsigned long? 00072 * I really hope so, because otherwise this is 00073 * going to be a 4GB queue. 00074 ********************************************/ 00075 temp = q->MyMaxElements * q->MyMaxSize; 00076 temp = temp/maxElements; 00077 if (temp != elementMaxSize) return (-2); 00078 00079 /* Allocate the array of queue entries 00080 ***************************************************/ 00081 malloc_size= maxElements * (sizeof(QUEUE_ENTRY) + elementMaxSize); 00082 q->pQE = (QUEUE_ENTRY*)malloc(malloc_size); 00083 if (q->pQE == (QUEUE_ENTRY*)NULL) return(-1); 00084 00085 /* Set DATA pointers for individual elements 00086 **********************************************************************/ 00087 pQE= q->pQE; /* element of the QUEUE structure which points to the array of buffer descriptors */ 00088 00089 StartingOffset=((unsigned long) pQE) + (maxElements * sizeof(QUEUE_ENTRY)); 00090 00091 for (temp=0; temp< maxElements; temp++) 00092 { 00093 /* For each element, set the data pointer */ 00094 pQE->d = (char *)(StartingOffset + (elementMaxSize * temp)); 00095 pQE++; 00096 } 00097 00098 /* set in and out pointers to indicate empty queue 00099 *************************************************/ 00100 q->first=q->last=0; 00101 /* in = out = first descriptor */ 00102 00103 return(0); 00104 } 00105 00106 /*********************************************************************** 00107 * dequeue * 00108 * * 00109 * Copies oldest message and its logo into caller's space. * 00110 * Returns 0 if no error. * 00111 * Returns -1 if the queue is empty. * 00112 ***********************************************************************/ 00113 int dequeue( QUEUE *q, DATA x, long* size, MSG_LOGO* userLogoPtr ) 00114 { 00115 QUEUE_ENTRY * pFQE; /* Pointer to first(oldest) queue entry */ 00116 00117 if (!(q->NumOfElements) ) /* then the queue is empty */ 00118 return( -1 ); 00119 00120 /* set shortcut for first queue element */ 00121 pFQE=&(q->pQE[q->first]); 00122 00123 /* copy text of message to caller's memory 00124 ******************************************/ 00125 memcpy( x, pFQE->d, (size_t)(pFQE->length) ); 00126 00127 /* copy the logo 00128 ***************/ 00129 userLogoPtr->type = (pFQE->queueLogo).type; 00130 userLogoPtr->mod = (pFQE->queueLogo).mod ; 00131 userLogoPtr->instid = (pFQE->queueLogo).instid; 00132 00133 *size = pFQE->length; /* message length */ 00134 00135 /* move the out pointer 00136 **********************/ 00137 inc_circular(q, &(q->first) ); 00138 00139 q->NumOfElements--; /* we're dequeueing an element, so there is one less */ 00140 00141 return( 0 ); 00142 } 00143 00144 00145 /*********************************************************************** 00146 * enqueue * 00147 * * 00148 * moves message into circular buffer. * 00149 * Copies string x to the new element. * 00150 * Returns 0 if no error. * 00151 * Returns -1 message too big * 00152 * Returns -3 if we clobbered an unsent message (stepped on our tail) * 00153 ***********************************************************************/ 00154 int enqueue( QUEUE *q, DATA x, long size, MSG_LOGO userLogo ) 00155 { 00156 int ourRet; 00157 QUEUE_ENTRY * pLQE; /* pointer to last(youngest) entry in queue */ 00158 00159 ourRet=0 ; /* presume all will go well */ 00160 00161 /* if message is larger than max, do nothing and return error 00162 ************************************************************/ 00163 if (size > q->MyMaxSize) return(-1); 00164 00165 /* set shortcut for last queue element */ 00166 pLQE=&((q->pQE)[q->last]); 00167 00168 /* copy message text 00169 *******************/ 00170 memcpy( pLQE->d, x, size); 00171 00172 /* copy logo and size 00173 *********************/ 00174 (pLQE->queueLogo).type = userLogo.type; 00175 (pLQE->queueLogo).mod = userLogo.mod; 00176 (pLQE->queueLogo).instid = userLogo.instid; 00177 (pLQE)->length = size; 00178 00179 00180 /* move to next buffer 00181 *********************/ 00182 inc_circular(q, &(q->last) ); 00183 if( q->NumOfElements == q->MyMaxElements ) /* then the queue is full */ 00184 { /* in that case we just overwrote the contents of the first(oldest) 00185 message in the queue, so 'drive' the outpointer forward one */ 00186 inc_circular(q, &(q->first) ); 00187 q->NumOfElements--; /* we're overwriting an element, so there is one less */ 00188 ourRet = -3; /* meaning we're over-writing messages */ 00189 } 00190 00191 q->NumOfElements++; /* we're adding an element, so there is one more */ 00192 00193 return( ourRet ); 00194 } 00195 00196 /* to move supplied pointer to the next buffer descriptor structure 00197 * - in a circular way */ 00198 static void inc_circular( QUEUE* q, Q_POS * pQPos) 00199 { 00200 (*pQPos)++; 00201 if(*pQPos >= q->MyMaxElements) 00202 *pQPos=0; 00203 } 00204 00205 /* to move supplied pointer to the prev buffer descriptor structure 00206 * - in a circular way */ 00207 static void dec_circular( QUEUE* q, Q_POS * pQPos) 00208 { 00209 (*pQPos)--; 00210 if((*pQPos) < 0) 00211 *pQPos= (q->MyMaxElements)-1; 00212 } 00213 00214 00215 int getNumOfElementsInQueue( QUEUE * q) 00216 { 00217 return(q->NumOfElements); 00218 } 00219 00220 Q_POS getNext(QUEUE * q, int QueuePosition) 00221 { 00222 Q_POS temp; 00223 00224 temp=QueuePosition; 00225 if(temp < 0 || temp >= q->MyMaxElements) 00226 return(-1); 00227 else 00228 { 00229 inc_circular(q,&temp); 00230 return(temp); 00231 } 00232 } 00233 00234 00235 Q_POS getPrev(QUEUE * q, int QueuePosition) 00236 { 00237 Q_POS temp; 00238 00239 temp=QueuePosition; 00240 if(temp < 0 || temp >= q->MyMaxElements) 00241 return(-1); 00242 else 00243 { 00244 dec_circular(q,&temp); 00245 return(temp); 00246 } 00247 } 00248 00249 Q_POS getPosFirst(QUEUE * q) 00250 { 00251 return(q->first); 00252 } 00253 00254 00255 Q_POS getPosLast(QUEUE * q) 00256 { 00257 return(q->last); 00258 } 00259 00260 /* 00261 * dumpqueue: dump the contents of a queue to a file to be used for restarts 00262 * returns: 0 on success 00263 * -1 on errors opening or writing file; file is deleted 00264 */ 00265 int dumpqueue( QUEUE *q, char *filename) 00266 { 00267 QUEUE_ENTRY *pQE; 00268 FILE *fp; 00269 long i; 00270 Q_POS p; 00271 int ret = 0; 00272 time_t timestamp; 00273 00274 if ( (fp = fopen(filename, "wb")) == (FILE *)NULL) 00275 return( -1 ); 00276 00277 timestamp = time(0); 00278 if (fwrite(×tamp, sizeof(time_t), 1, fp) < 1) 00279 { 00280 ret = -1; 00281 goto Cleanup; 00282 } 00283 if (fwrite(&q->NumOfElements, sizeof(long), 1, fp) < 1) 00284 { 00285 ret = -1; 00286 goto Cleanup; 00287 } 00288 /* Loop through all the queue entries */ 00289 p = q->first; 00290 for (i = 0l; i < q->NumOfElements; i++, inc_circular(q, &p)) 00291 { 00292 pQE = &(q->pQE[p]); 00293 if (fwrite(&pQE->queueLogo, sizeof(MSG_LOGO), 1, fp) < 1) 00294 { 00295 ret = -1; 00296 goto Cleanup; 00297 } 00298 if (fwrite(&pQE->length, sizeof(long), 1, fp) < 1) 00299 { 00300 ret = -1; 00301 goto Cleanup; 00302 } 00303 if ((long)fwrite(pQE->d, sizeof(char), pQE->length, fp) < pQE->length) 00304 { 00305 ret = -1; 00306 goto Cleanup; 00307 } 00308 } 00309 if (fwrite(×tamp, sizeof(time_t), 1, fp) < 1) 00310 { 00311 ret = -1; 00312 goto Cleanup; 00313 } 00314 00315 Cleanup: 00316 fclose(fp); 00317 if (ret < 0) 00318 unlink(filename); 00319 00320 return( ret ); 00321 } 00322 00323 00324 /* 00325 * undumpqueue: read a queue file previously written by dumpqueue(). 00326 * Contents of file will be inserted into the specified queue. 00327 * The queue is assumed to be empty on entry to this function. 00328 * Returns: 0 on success 00329 * +1 if dump file does not exist 00330 * -1 on errors opening or reading file 00331 * -2 if there are more entries in the file than will fit in queue 00332 * -3 if file entry is larger than will fit in queue 00333 * -4 if timestamps at each end of file don't match 00334 */ 00335 int undumpqueue( QUEUE *q, char *filename) 00336 { 00337 QUEUE_ENTRY *pQE; 00338 FILE *fp; 00339 long i, num_entries; 00340 int ret = 0; 00341 time_t timestamp1, timestamp2; 00342 00343 if ( (fp = fopen(filename, "rb")) == (FILE *)NULL) 00344 { 00345 if (errno == ENOENT) 00346 return( +1); 00347 else 00348 return( -1 ); 00349 } 00350 00351 if (fread(×tamp1, sizeof(time_t), 1, fp) < 1) 00352 { 00353 ret = -1; 00354 goto Cleanup; 00355 } 00356 00357 if (fread(&num_entries, sizeof(long), 1, fp) < 1) 00358 { 00359 ret = -1; 00360 goto Cleanup; 00361 } 00362 if (num_entries > q->MyMaxElements) 00363 { 00364 ret = -2; 00365 goto Cleanup; 00366 } 00367 00368 for (i = 0l; i < num_entries; i++) 00369 { 00370 pQE = &(q->pQE[q->last]); 00371 if (fread(&pQE->queueLogo, sizeof(MSG_LOGO), 1, fp) < 1) 00372 { 00373 ret = -1; 00374 goto Cleanup; 00375 } 00376 if (fread(&pQE->length, sizeof(long), 1, fp) < 1) 00377 { 00378 ret = -1; 00379 goto Cleanup; 00380 } 00381 if (pQE->length > q->MyMaxSize) 00382 { 00383 ret = -3; 00384 goto Cleanup; 00385 } 00386 if ((long)fread(pQE->d, sizeof(char), pQE->length, fp) < pQE->length) 00387 { 00388 ret = -1; 00389 goto Cleanup; 00390 } 00391 inc_circular(q, &q->last); 00392 q->NumOfElements++; 00393 } 00394 00395 if (fread(×tamp2, sizeof(time_t), 1, fp) < 1) 00396 { 00397 ret = -1; 00398 goto Cleanup; 00399 } 00400 if (timestamp2 != timestamp1) 00401 { /* Timestamps don't match; can't trust file, so reset the queue to empty */ 00402 q->last = q->first = 0; 00403 q->NumOfElements = 0; 00404 ret = -4; 00405 } 00406 00407 Cleanup: 00408 fclose(fp); 00409 00410 return( ret ); 00411 } 00412 00413 /* 00414 * peekNextElement: return a pointer to the next element without changing it. 00415 * Returns: pointer to data on success 00416 * NULL if there are no elements in the queue 00417 */ 00418 DATA peekNextElement( QUEUE *q ) 00419 { 00420 if (q->NumOfElements > 0) 00421 return( q->pQE[q->first].d ); 00422 else 00423 return( NULL ); 00424 }