|
@@ -1,7 +1,7 @@
|
|
|
-
|
|
|
+
|
|
|
|
|
|
|
|
|
- * Copyright (c) 2001-2004 Willem Dijkstra
|
|
|
+ * Copyright (c) 2001-2005 Willem Dijkstra
|
|
|
* All rights reserved.
|
|
|
*
|
|
|
* Redistribution and use in source and binary forms, with or without
|
|
@@ -58,48 +58,37 @@
|
|
|
*
|
|
|
* The moment the master symux receives a new packet:
|
|
|
*
|
|
|
- * master calls master_forbidread:
|
|
|
- * - master checks the 'SEM_READ' semaphore to be equal to the number of
|
|
|
- * children active. If so, all children are up to date. If not, a child is
|
|
|
- * lagging and will die soon. Reap_clients().
|
|
|
- * - master adds any pending new clients
|
|
|
- * - master resets all semaphores, preventing clients to start reading
|
|
|
+ * master determines the dataslot and issues master_forbidread for that slot:
|
|
|
+ * - checks if the read semaphore for the slot is zero. If not, reap_clients is
|
|
|
+ * called to reap any stopped clients
|
|
|
+ * - adds any pending new clients
|
|
|
+ * - reset the semaphore for the slot to prevent clients to start reading
|
|
|
*
|
|
|
- * master calls master_permitread:
|
|
|
+ * master copies new data into the dataslot and calls master_permitread
|
|
|
* - increment sequence number in the shared region
|
|
|
- * - increment 'SEM_WAIT' with the number of registered clients
|
|
|
+ * - increment read semaphore for the slot with the number of registered clients
|
|
|
*
|
|
|
* clients call client_waitread:
|
|
|
- * - block on semaphore 'SEM_WAIT'
|
|
|
- * - set client sequence number to shared region sequence number
|
|
|
+ * - increment client sequence number and determine dataslot
|
|
|
+ * - exit if client sequence number < (shared sequence - max dataslots - 1)
|
|
|
+ * - block on semaphore for dataslot
|
|
|
*
|
|
|
* clients do something to the data in the shared region
|
|
|
*
|
|
|
- * clients call client_doneread:
|
|
|
- * - If the recorded sequence number deviates from the one in the shared region
|
|
|
- * the client is lagging behind. The client will kill itself.
|
|
|
- * - The client increments 'SEM_READ'.
|
|
|
- *
|
|
|
*/
|
|
|
|
|
|
__BEGIN_DECLS
|
|
|
-int recvfd(int);
|
|
|
-int sendfd(int, int);
|
|
|
void check_master();
|
|
|
void check_sem();
|
|
|
void client_doneread();
|
|
|
void client_loop();
|
|
|
void client_signalhandler();
|
|
|
-void client_waitread();
|
|
|
+int client_waitread();
|
|
|
void exitmaster();
|
|
|
-void master_resetsem();
|
|
|
+void master_resetsem(int);
|
|
|
void reap_clients();
|
|
|
__END_DECLS
|
|
|
|
|
|
-#define SEM_WAIT 0
|
|
|
-#define SEM_READ 1
|
|
|
-#define SEM_TOTAL 2
|
|
|
-
|
|
|
int realclients;
|
|
|
int newclients;
|
|
|
int master;
|
|
@@ -116,33 +105,36 @@ key_t semid;
|
|
|
enum ipcstat semstat;
|
|
|
int seqnr;
|
|
|
|
|
|
-long *
|
|
|
-shared_getmem()
|
|
|
+char *
|
|
|
+shared_getmem(int slot)
|
|
|
{
|
|
|
- return &shm->data;
|
|
|
+ long offset = ((slot % SYMUX_SHARESLOTS) * shm->slotlen);
|
|
|
+ char *data = (char *)&shm->data + offset;
|
|
|
+
|
|
|
+ return data;
|
|
|
}
|
|
|
|
|
|
long
|
|
|
shared_getmaxlen()
|
|
|
{
|
|
|
- return shm->reglen - sizeof(struct sharedregion);
|
|
|
+ return shm->slotlen;
|
|
|
}
|
|
|
|
|
|
void
|
|
|
-shared_setlen(long length)
|
|
|
+shared_setlen(int slot, long length)
|
|
|
{
|
|
|
- if (length > (shm->reglen - (long) sizeof(struct sharedregion)))
|
|
|
+ if (length > shm->slotlen)
|
|
|
fatal("%s:%d: internal error:"
|
|
|
"set_length of shared region called with value larger than actual size",
|
|
|
__FILE__, __LINE__);
|
|
|
|
|
|
- shm->ctlen = length;
|
|
|
+ shm->ctlen[slot % SYMUX_SHARESLOTS] = length;
|
|
|
}
|
|
|
|
|
|
long
|
|
|
-shared_getlen()
|
|
|
+shared_getlen(int slot)
|
|
|
{
|
|
|
- return shm->ctlen;
|
|
|
+ return shm->ctlen[slot % SYMUX_SHARESLOTS];
|
|
|
}
|
|
|
|
|
|
void
|
|
@@ -164,7 +156,7 @@ check_master()
|
|
|
|
|
|
|
|
|
void
|
|
|
-master_resetsem()
|
|
|
+master_resetsem(int semnum)
|
|
|
{
|
|
|
union semun semarg;
|
|
|
|
|
@@ -173,16 +165,16 @@ master_resetsem()
|
|
|
check_sem();
|
|
|
check_master();
|
|
|
|
|
|
- if ((semctl(semid, SEM_WAIT, SETVAL, semarg) != 0) ||
|
|
|
- (semctl(semid, SEM_READ, SETVAL, semarg) != 0))
|
|
|
- fatal("%s:%d: internal error: cannot reset semaphores",
|
|
|
- __FILE__, __LINE__);
|
|
|
+ if ((semctl(semid, semnum, SETVAL, semarg) != 0))
|
|
|
+ fatal("%s:%d: internal error: cannot reset semaphore %d",
|
|
|
+ __FILE__, __LINE__, semnum);
|
|
|
}
|
|
|
|
|
|
-void
|
|
|
+int
|
|
|
master_forbidread()
|
|
|
{
|
|
|
- int clientsread;
|
|
|
+ int slot = (shm->seqnr + 1) % SYMUX_SHARESLOTS;
|
|
|
+ int stalledclients;
|
|
|
union semun semarg;
|
|
|
|
|
|
check_sem();
|
|
@@ -190,43 +182,50 @@ master_forbidread()
|
|
|
|
|
|
|
|
|
semarg.val = 0;
|
|
|
- if ((clientsread = semctl(semid, SEM_READ, GETVAL, semarg)) < 0)
|
|
|
+ if ((stalledclients = semctl(semid, slot, GETVAL, semarg)) < 0) {
|
|
|
fatal("%s:%d: internal error: cannot read semaphore",
|
|
|
__FILE__, __LINE__);
|
|
|
-
|
|
|
- if (clientsread != realclients) {
|
|
|
+ } else {
|
|
|
reap_clients();
|
|
|
- debug("realclients = %d; clientsread = %d", realclients, clientsread);
|
|
|
+ debug("realclients = %d; stalledclients = %d", realclients, stalledclients);
|
|
|
}
|
|
|
|
|
|
|
|
|
realclients += newclients;
|
|
|
newclients = 0;
|
|
|
- master_resetsem();
|
|
|
+ master_resetsem(slot);
|
|
|
+
|
|
|
+ return slot;
|
|
|
}
|
|
|
|
|
|
void
|
|
|
master_permitread()
|
|
|
{
|
|
|
+ int slot = ++shm->seqnr % SYMUX_SHARESLOTS;
|
|
|
union semun semarg;
|
|
|
|
|
|
- shm->seqnr++;
|
|
|
-
|
|
|
semarg.val = realclients;
|
|
|
|
|
|
- if (semctl(semid, SEM_WAIT, SETVAL, semarg) != 0)
|
|
|
- fatal("%s:%d: internal error: cannot reset semaphores",
|
|
|
- __FILE__, __LINE__);
|
|
|
+ if (semctl(semid, slot, SETVAL, semarg) != 0)
|
|
|
+ fatal("%s:%d: internal error: cannot set semaphore %d",
|
|
|
+ __FILE__, __LINE__, slot);
|
|
|
}
|
|
|
|
|
|
-void
|
|
|
+int
|
|
|
client_waitread()
|
|
|
{
|
|
|
+ int slot = ++seqnr % SYMUX_SHARESLOTS;
|
|
|
struct sembuf sops;
|
|
|
|
|
|
+ if (seqnr < (shm->seqnr - SYMUX_SHARESLOTS - 1)) {
|
|
|
+ close(clientsock);
|
|
|
+ fatal("%s:%d: client(%d) lagging behind (%d, %d) = high load?",
|
|
|
+ __FILE__, __LINE__, clientpid, seqnr, shm->seqnr);
|
|
|
+ }
|
|
|
+
|
|
|
check_sem();
|
|
|
|
|
|
- sops.sem_num = SEM_WAIT;
|
|
|
+ sops.sem_num = slot;
|
|
|
sops.sem_op = -1;
|
|
|
sops.sem_flg = 0;
|
|
|
|
|
@@ -234,30 +233,12 @@ client_waitread()
|
|
|
fatal("%s:%d: internal error: client(%d): cannot obtain semaphore (%.200s)",
|
|
|
__FILE__, __LINE__, clientpid, strerror(errno));
|
|
|
|
|
|
- seqnr = shm->seqnr;
|
|
|
+ return slot;
|
|
|
}
|
|
|
|
|
|
void
|
|
|
client_doneread()
|
|
|
{
|
|
|
- struct sembuf sops;
|
|
|
-
|
|
|
- check_sem();
|
|
|
-
|
|
|
- if (seqnr != shm->seqnr) {
|
|
|
- close(clientsock);
|
|
|
- fatal("%s:%d: client(%d) lagging behind (%d, %d) = high load?",
|
|
|
- __FILE__, __LINE__, clientpid, seqnr, shm->seqnr);
|
|
|
- }
|
|
|
-
|
|
|
- sops.sem_num = SEM_READ;
|
|
|
- sops.sem_op = 1;
|
|
|
- sops.sem_flg = IPC_NOWAIT;
|
|
|
-
|
|
|
- if (semop(semid, &sops, 1) != 0)
|
|
|
- fatal("%s:%d: internal error: cannot release semaphore (%.200s)",
|
|
|
- __FILE__, __LINE__, strerror(errno));
|
|
|
-
|
|
|
|
|
|
sleep(1);
|
|
|
}
|
|
@@ -272,19 +253,22 @@ client_signalhandler(int s)
|
|
|
void
|
|
|
initshare(int bufsize)
|
|
|
{
|
|
|
+ int i;
|
|
|
+ int totalsize;
|
|
|
+
|
|
|
newclients = 0;
|
|
|
realclients = 0;
|
|
|
master = 1;
|
|
|
|
|
|
|
|
|
- bufsize += sizeof(struct sharedregion);
|
|
|
+ totalsize = (bufsize * SYMUX_SHARESLOTS) + sizeof(struct sharedregion);
|
|
|
|
|
|
|
|
|
shmstat = semstat = SIPC_FREE;
|
|
|
|
|
|
atexit(exitmaster);
|
|
|
|
|
|
- if ((shmid = shmget(IPC_PRIVATE, bufsize, SHM_R | SHM_W)) < 0)
|
|
|
+ if ((shmid = shmget(IPC_PRIVATE, totalsize, SHM_R | SHM_W)) < 0)
|
|
|
fatal("could not get a shared memory identifier");
|
|
|
|
|
|
shmstat = SIPC_KEYED;
|
|
@@ -293,16 +277,18 @@ initshare(int bufsize)
|
|
|
fatal("could not attach shared memory");
|
|
|
|
|
|
shmstat = SIPC_ATTACHED;
|
|
|
- bzero(shm, bufsize);
|
|
|
- shm->reglen = bufsize;
|
|
|
+ bzero(shm, totalsize);
|
|
|
+ debug("shm from 0x%8x to 0x%8x", shm, shm + totalsize);
|
|
|
+ shm->slotlen = bufsize;
|
|
|
|
|
|
|
|
|
- if ((semid = semget(IPC_PRIVATE, SEM_TOTAL, SEM_ARGS)) < 0)
|
|
|
+ if ((semid = semget(IPC_PRIVATE, SYMUX_SHARESLOTS, SEM_ARGS)) < 0)
|
|
|
fatal("could not get a semaphore");
|
|
|
|
|
|
semstat = SIPC_KEYED;
|
|
|
|
|
|
- master_resetsem();
|
|
|
+ for (i = 0; i < SYMUX_SHARESLOTS; i++)
|
|
|
+ master_resetsem(i);
|
|
|
}
|
|
|
|
|
|
pid_t
|
|
@@ -334,6 +320,7 @@ spawn_client(int sock)
|
|
|
} else {
|
|
|
|
|
|
master = 0;
|
|
|
+ seqnr = shm->seqnr;
|
|
|
|
|
|
|
|
|
signal(SIGHUP, client_signalhandler);
|
|
@@ -424,29 +411,27 @@ exitmaster()
|
|
|
void
|
|
|
client_loop()
|
|
|
{
|
|
|
+ int slot;
|
|
|
int total;
|
|
|
int sent;
|
|
|
int written;
|
|
|
|
|
|
for (;;) {
|
|
|
|
|
|
- client_waitread();
|
|
|
-
|
|
|
- total = shared_getlen();
|
|
|
+ slot = client_waitread();
|
|
|
+ total = shared_getlen(slot);
|
|
|
sent = 0;
|
|
|
|
|
|
while (sent < total) {
|
|
|
|
|
|
- if ((written = write(clientsock, (char *) (shared_getmem() + sent), total - sent)) == -1) {
|
|
|
+ if ((written = write(clientsock, (char *) (shared_getmem(slot) + sent), total - sent)) == -1) {
|
|
|
info("client(%d): write error. Client will quit.", clientpid);
|
|
|
exit(1);
|
|
|
}
|
|
|
|
|
|
sent += written;
|
|
|
|
|
|
- debug("client(%d): written %d bytes of %d total", clientpid, sent, total);
|
|
|
+ debug("client(%d): written %d bytes of %d total from slot %d", clientpid, sent, total, slot);
|
|
|
}
|
|
|
-
|
|
|
- client_doneread();
|
|
|
}
|
|
|
}
|