Index: src/helper.c =================================================================== RCS file: /cvsroot/squid/squid/src/helper.c,v retrieving revision 1.1.1.3.12.7 diff -c -r1.1.1.3.12.7 helper.c *** src/helper.c 2000/08/10 12:39:17 1.1.1.3.12.7 --- src/helper.c 2000/09/19 17:17:41 *************** *** 54,60 **** --- 54,143 ---- static void helperRequestFree(helper_request * r); static void helperStatefulRequestFree(helper_stateful_request * r); + static void helperStatefulShutdownServers(helper * hlp, int count); + static void helperShutdownServers(helper * hlp, int count); + /* + * called by helperOpenServers and ... queue detection thingy. helperOpenServers is responsible for sanity + * checking the parameters. This avoids repeating the checks when new helpers are spawned + * todo: change n_to_start to max_allowed. + srv->index = k needs to change to be n_running + k + */ + void + helperSpawnServers(helper *hlp, int count) + { + char *s; + char *progname; + char *shortname; + char *procname; + char *args[HELPER_MAX_ARGS]; + char fd_note_buf[FD_DESC_SZ]; + helper_server *srv; + int nargs = 0; + int k; + int x; + int rfd; + int wfd; + wordlist *w; + progname = hlp->cmdline->key; + if ((s = strrchr(progname, '/'))) + shortname = xstrdup(s + 1); + else + shortname = xstrdup(progname); + debug(29, 1) ("helperSpawnServers: Starting %d '%s' processes\n", + count, shortname); + procname = xmalloc(strlen(shortname) + 3); + snprintf(procname, strlen(shortname) + 3, "(%s)", shortname); + args[nargs++] = procname; + for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) + args[nargs++] = w->key; + args[nargs++] = NULL; + assert(nargs <= HELPER_MAX_ARGS); + for (k = 0; k < count; k++) { + getCurrentTime(); + rfd = wfd = -1; + x = ipcCreate(hlp->ipc_type, + progname, + args, + shortname, + &rfd, + &wfd); + if (x < 0) { + debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname); + continue; + } + hlp->n_running++; + srv = memAllocate(MEM_HELPER_SERVER); + cbdataAdd(srv, memFree, MEM_HELPER_SERVER); + srv->flags.alive = 1; + srv->index = hlp->n_running-1; + srv->rfd = rfd; + srv->wfd = wfd; + srv->buf = memAllocate(MEM_8K_BUF); + srv->buf_sz = 8192; + srv->offset = 0; + srv->parent = hlp; + cbdataLock(hlp); /* lock because of the parent backlink */ + dlinkAddTail(srv, &srv->link, &hlp->servers); + if (rfd == wfd) { + snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1); + fd_note(rfd, fd_note_buf); + } else { + snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1); + fd_note(rfd, fd_note_buf); + snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1); + fd_note(wfd, fd_note_buf); + } + commSetNonBlocking(rfd); + if (wfd != rfd) + commSetNonBlocking(wfd); + comm_add_close_handler(rfd, helperServerFree, srv); + } + safe_free(shortname); + safe_free(procname); + helperKickQueue(hlp); + } + void helperOpenServers(helper * hlp) { *************** *** 75,86 **** return; progname = hlp->cmdline->key; if ((s = strrchr(progname, '/'))) ! shortname = xstrdup(s + 1); else ! shortname = xstrdup(progname); ! debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n", ! hlp->n_to_start, shortname); ! procname = xmalloc(strlen(shortname) + 3); snprintf(procname, strlen(shortname) + 3, "(%s)", shortname); args[nargs++] = procname; for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) --- 158,171 ---- return; progname = hlp->cmdline->key; if ((s = strrchr(progname, '/'))) ! shortname = xstrdup(s + 1); else ! shortname = xstrdup(progname); ! debug(29, 1) ("helperOpenServers: Starting %d initial '%s' processes\n", ! (int)(hlp->n_to_start/10+1), shortname); ! helperSpawnServers(hlp,(int)(hlp->n_to_start/10+1)); ! ! /* procname = xmalloc(strlen(shortname) + 3); snprintf(procname, strlen(shortname) + 3, "(%s)", shortname); args[nargs++] = procname; for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) *************** *** 111,117 **** srv->buf_sz = 8192; srv->offset = 0; srv->parent = hlp; ! cbdataLock(hlp); /* lock because of the parent backlink */ dlinkAddTail(srv, &srv->link, &hlp->servers); if (rfd == wfd) { snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1); --- 196,202 ---- srv->buf_sz = 8192; srv->offset = 0; srv->parent = hlp; ! cbdataLock(hlp); dlinkAddTail(srv, &srv->link, &hlp->servers); if (rfd == wfd) { snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1); *************** *** 127,137 **** commSetNonBlocking(wfd); comm_add_close_handler(rfd, helperServerFree, srv); } safe_free(shortname); safe_free(procname); ! helperKickQueue(hlp); } void helperStatefulOpenServers(statefulhelper * hlp) { --- 212,300 ---- commSetNonBlocking(wfd); comm_add_close_handler(rfd, helperServerFree, srv); } + */ safe_free(shortname); + // safe_free(procname); + // helperKickQueue(hlp); + } + + + void helperStatefulSpawnServers(statefulhelper * hlp, int count) + { + char *s; + char *progname; + char *shortname; + char *procname; + char *args[HELPER_MAX_ARGS]; + char fd_note_buf[FD_DESC_SZ]; + helper_stateful_server *srv; + int nargs = 0; + int k; + int x; + int rfd; + int wfd; + wordlist *w; + if (hlp->cmdline == NULL) + return; + progname = hlp->cmdline->key; + if ((s = strrchr(progname, '/'))) + shortname = xstrdup(s + 1); + else + shortname = xstrdup(progname); + debug(29, 1) ("helperStatefulSpawnServers: Starting %d '%s' processes\n", + count, shortname); + procname = xmalloc(strlen(shortname) + 3); + snprintf(procname, strlen(shortname) + 3, "(%s)", shortname); + args[nargs++] = procname; + for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) + args[nargs++] = w->key; + args[nargs++] = NULL; + assert(nargs <= HELPER_MAX_ARGS); + for (k = 0; k < count; k++) { + getCurrentTime(); + rfd = wfd = -1; + x = ipcCreate(hlp->ipc_type, + progname, + args, + shortname, + &rfd, + &wfd); + if (x < 0) { + debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname); + continue; + } + hlp->n_running++; + srv = memAllocate(MEM_HELPER_STATEFUL_SERVER); + cbdataAdd(srv, memFree, MEM_HELPER_STATEFUL_SERVER); + srv->flags.alive = 1; + srv->index = hlp->n_running-1; + srv->rfd = rfd; + srv->wfd = wfd; + srv->buf = memAllocate(MEM_8K_BUF); + srv->buf_sz = 8192; + srv->offset = 0; + srv->parent = hlp; + cbdataLock(hlp); /* lock because of the parent backlink */ + dlinkAddTail(srv, &srv->link, &hlp->servers); + if (rfd == wfd) { + snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1); + fd_note(rfd, fd_note_buf); + } else { + snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1); + fd_note(rfd, fd_note_buf); + snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1); + fd_note(wfd, fd_note_buf); + } + commSetNonBlocking(rfd); + if (wfd != rfd) + commSetNonBlocking(wfd); + comm_add_close_handler(rfd, helperStatefulServerFree, srv); + } safe_free(shortname); safe_free(procname); ! helperStatefulKickQueue(hlp); } + void helperStatefulOpenServers(statefulhelper * hlp) { *************** *** 155,161 **** shortname = xstrdup(s + 1); else shortname = xstrdup(progname); ! debug(29, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n", hlp->n_to_start, shortname); procname = xmalloc(strlen(shortname) + 3); snprintf(procname, strlen(shortname) + 3, "(%s)", shortname); --- 318,328 ---- shortname = xstrdup(s + 1); else shortname = xstrdup(progname); ! debug(29, 1) ("helperStatefulOpenServers: Starting %d initial '%s' processes\n", ! (int)(hlp->n_to_start/10+1), shortname); ! helperStatefulSpawnServers(hlp,(int)(hlp->n_to_start/10+1)); ! ! /* debug(29, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n", hlp->n_to_start, shortname); procname = xmalloc(strlen(shortname) + 3); snprintf(procname, strlen(shortname) + 3, "(%s)", shortname); *************** *** 188,194 **** srv->buf_sz = 8192; srv->offset = 0; srv->parent = hlp; ! cbdataLock(hlp); /* lock because of the parent backlink */ dlinkAddTail(srv, &srv->link, &hlp->servers); if (rfd == wfd) { snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1); --- 355,361 ---- srv->buf_sz = 8192; srv->offset = 0; srv->parent = hlp; ! cbdataLock(hlp); dlinkAddTail(srv, &srv->link, &hlp->servers); if (rfd == wfd) { snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1); *************** *** 204,212 **** commSetNonBlocking(wfd); comm_add_close_handler(rfd, helperStatefulServerFree, srv); } ! safe_free(shortname); ! safe_free(procname); ! helperStatefulKickQueue(hlp); } --- 371,379 ---- commSetNonBlocking(wfd); comm_add_close_handler(rfd, helperStatefulServerFree, srv); } ! */ safe_free(shortname); ! // safe_free(procname); ! // helperStatefulKickQueue(hlp); } *************** *** 224,233 **** r->data = data; r->buf = xstrdup(buf); cbdataLock(r->data); ! if ((srv = GetFirstAvailable(hlp))) helperDispatch(srv, r); else ! Enqueue(hlp, r); debug(29,9) ("helperSubmit: %s\n",buf); } --- 391,414 ---- r->data = data; r->buf = xstrdup(buf); cbdataLock(r->data); ! if ((srv = GetFirstAvailable(hlp))){ helperDispatch(srv, r); + if(hlp->stats.queue_size==0) + { + if (hlp->stats.queue_zero_counter>=hlp->n_running/2) + { + helperShutdownServers(hlp, (int)(hlp->n_running/10)); + hlp->stats.queue_zero_counter=0; + } + else + hlp->stats.queue_zero_counter++; + } + } else ! { ! hlp->stats.queue_zero_counter=0; ! Enqueue(hlp, r); ! } debug(29,9) ("helperSubmit: %s\n",buf); } *************** *** 252,260 **** { if ((srv = StatefulGetFirstAvailable(hlp))){ helperStatefulDispatch(srv, r); } else ! StatefulEnqueue(hlp, r); } debug(29,9) ("helperStatefulSubmit: %s\n",buf); } --- 433,454 ---- { if ((srv = StatefulGetFirstAvailable(hlp))){ helperStatefulDispatch(srv, r); + if(hlp->stats.queue_size==0) + { + if (hlp->stats.queue_zero_counter>=hlp->n_running/2) + { + helperStatefulShutdownServers(hlp, (int)(hlp->n_running/10)); + hlp->stats.queue_zero_counter=0; + } + else + hlp->stats.queue_zero_counter++; + } } else ! { ! hlp->stats.queue_zero_counter=0; ! StatefulEnqueue(hlp, r); ! } } debug(29,9) ("helperStatefulSubmit: %s\n",buf); } *************** *** 376,381 **** --- 570,593 ---- } void + helperShutdownServers(helper * hlp, int count) + { + /* request shutdowns from the top count helpers */ + int n=count; + dlink_node *link = hlp->servers.tail; + helper_server *srv; + if (n>hlp->n_running) + n=hlp->n_running; + while (n!=0) + { + n--; + srv=link->data; + link=link->prev; + srv->flags.shutdown=1; /* request a shutdown even if currently busy */ + } + } + + void helperShutdown(helper * hlp) { dlink_node *link = hlp->servers.head; *************** *** 406,411 **** --- 618,642 ---- } void + helperStatefulShutdownServers(helper * hlp, int count) + { + /* request shutdowns from the top count helpers */ + int n=count; + dlink_node *link = hlp->servers.tail; + helper_stateful_server *srv; + if (n>hlp->n_running) + n=hlp->n_running; + while (n!=0) + { + n--; + srv=link->data; + link=link->prev; + srv->flags.shutdown=1; /* request a shutdown even if currently busy */ + } + } + + + void helperStatefulShutdown(statefulhelper * hlp) { dlink_node *link = hlp->servers.head; *************** *** 661,676 **** hlp->stats.queue_size++; if (hlp->stats.queue_size < hlp->n_running) return; - if (squid_curtime - hlp->last_queue_warn < 600) - return; if (shutting_down || reconfiguring) return; ! hlp->last_queue_warn = squid_curtime; ! debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name); ! debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size); ! if (hlp->stats.queue_size > hlp->n_running * 2) ! fatalf("Too many queued %s requests", hlp->id_name); ! debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name); } static void --- 892,916 ---- hlp->stats.queue_size++; if (hlp->stats.queue_size < hlp->n_running) return; if (shutting_down || reconfiguring) return; ! if ((hlp->n_running < hlp->n_to_start) && (hlp->stats.queue_size > hlp->n_running *2)) ! { ! debug(14, 1) ("Spawning new %s processes because of excessive queue length.\n",hlp->id_name); ! helperStatefulSpawnServers(hlp,(int)(hlp->n_running/10+1)); ! } ! else ! { ! if (squid_curtime - hlp->last_queue_warn < 600) ! return; ! hlp->last_queue_warn = squid_curtime; ! debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name); ! debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size); ! debug(14, 0) ("WARNING: cannot increase number of processes.\n"); ! if (hlp->stats.queue_size > (hlp->n_running * 2)) ! fatalf("Too many queued %s requests", hlp->id_name); ! debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name); ! } } static void *************** *** 681,696 **** hlp->stats.queue_size++; if (hlp->stats.queue_size < hlp->n_running) return; - if (squid_curtime - hlp->last_queue_warn < 600) - return; if (shutting_down || reconfiguring) return; ! hlp->last_queue_warn = squid_curtime; ! debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name); ! debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size); ! if (hlp->stats.queue_size > hlp->n_running * 2) ! fatalf("Too many queued %s requests", hlp->id_name); ! debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name); } --- 921,945 ---- hlp->stats.queue_size++; if (hlp->stats.queue_size < hlp->n_running) return; if (shutting_down || reconfiguring) return; ! if ((hlp->n_running < hlp->n_to_start) && (hlp->stats.queue_size > hlp->n_running *2)) ! { ! debug(14, 1) ("Spawning new %s processes because of excessive queue length.\n",hlp->id_name); ! helperStatefulSpawnServers(hlp,(int)(hlp->n_running/10+1)); ! } ! else ! { ! if (squid_curtime - hlp->last_queue_warn < 600) ! return; ! hlp->last_queue_warn = squid_curtime; ! debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name); ! debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size); ! debug(14, 0) ("WARNING: cannot increase number of processes.\n"); ! if (hlp->stats.queue_size > hlp->n_running * 2) ! fatalf("Too many queued %s requests", hlp->id_name); ! debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name); ! } } Index: src/structs.h =================================================================== RCS file: /cvsroot/squid/squid/src/structs.h,v retrieving revision 1.1.1.3.4.1.2.16 diff -c -r1.1.1.3.4.1.2.16 structs.h *** src/structs.h 2000/09/07 21:23:56 1.1.1.3.4.1.2.16 --- src/structs.h 2000/09/19 17:17:45 *************** *** 1881,1886 **** --- 1881,1887 ---- int ipc_type; time_t last_queue_warn; struct { + int queue_zero_counter; int requests; int replies; int queue_size;