aiops_win32.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 43 Windows AIOPS */
10
11#include "squid.h"
13#include "DiskThreads.h"
14#include "fd.h"
15#include "mem/Pool.h"
16#include "SquidConfig.h"
17#include "Store.h"
18
19#include <cerrno>
20#include <csignal>
21#include <sys/stat.h>
22#include <fcntl.h>
23#include <dirent.h>
24
25#define RIDICULOUS_LENGTH 4096
26
33};
35
36typedef struct squidaio_request_t {
37
40 int cancelled;
41 char *path;
42 int oflag;
44 int fd;
45 char *bufferp;
46 char *tmpbufp;
47 size_t buflen;
48 off_t offset;
49 int whence;
50 int ret;
51 int err;
52
53 struct stat *tmpstatp;
54
55 struct stat *statp;
58
59typedef struct squidaio_request_queue_t {
60 HANDLE mutex;
61 HANDLE cond; /* See Event objects */
62 squidaio_request_t *volatile head;
63 squidaio_request_t *volatile *volatile tailp;
64 unsigned long requests;
65 unsigned long blocked; /* main failed to lock the queue */
67
69
70struct squidaio_thread_t {
72 HANDLE thread;
73 DWORD dwThreadId; /* thread ID */
75
77 unsigned long requests;
78 int volatile exit;
79};
80
83static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
90#if AIO_OPENDIR
91static void *squidaio_do_opendir(squidaio_request_t *);
92#endif
94static void squidaio_poll_queues(void);
95
96static squidaio_thread_t *threads = nullptr;
97static int squidaio_initialised = 0;
98
99#define AIO_LARGE_BUFS 16384
100#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
101#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
102#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
103#define AIO_MICRO_BUFS 128
104
105static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
106static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
107static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
108static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
109static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128K */
110
111static int request_queue_len = 0;
115
116static struct {
118}
119
121
122 nullptr, &request_queue2.head
125
126static struct {
128}
129
131
132 nullptr, &done_requests.head
134
135static HANDLE main_thread;
136
137static Mem::Allocator *
139{
140 if (size <= AIO_LARGE_BUFS) {
141 if (size <= AIO_MICRO_BUFS)
142 return squidaio_micro_bufs;
143 else if (size <= AIO_TINY_BUFS)
144 return squidaio_tiny_bufs;
145 else if (size <= AIO_SMALL_BUFS)
146 return squidaio_small_bufs;
147 else if (size <= AIO_MEDIUM_BUFS)
149 else
150 return squidaio_large_bufs;
151 }
152
153 return nullptr;
154}
155
156void *
158{
159 void *p;
160 if (const auto pool = squidaio_get_pool(size)) {
161 p = pool->alloc();
162 } else
163 p = xmalloc(size);
164
165 return p;
166}
167
168static char *
169squidaio_xstrdup(const char *str)
170{
171 char *p;
172 int len = strlen(str) + 1;
173
174 p = (char *)squidaio_xmalloc(len);
175 strncpy(p, str, len);
176
177 return p;
178}
179
180void
181squidaio_xfree(void *p, int size)
182{
183 if (const auto pool = squidaio_get_pool(size)) {
184 pool->freeOne(p);
185 } else
186 xfree(p);
187}
188
189static void
191{
192 int len = strlen(str) + 1;
193
194 if (const auto pool = squidaio_get_pool(len)) {
195 pool->freeOne(str);
196 } else
197 xfree(str);
198}
199
200void
202{
203 int i;
204 squidaio_thread_t *threadp;
205
207 return;
208
209 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
210 GetCurrentThread(), /* pseudo handle to copy */
211 GetCurrentProcess(), /* pseudo handle, don't close */
213 0, /* required access */
214 FALSE, /* child process's don't inherit the handle */
215 DUPLICATE_SAME_ACCESS)) {
216 /* spit errors */
217 fatal("Couldn't get current thread handle");
218 }
219
220 /* Initialize request queue */
221 if ((request_queue.mutex = CreateMutex(nullptr, /* no inheritance */
222 FALSE, /* start unowned (as per mutex_init) */
223 nullptr) /* no name */
224 ) == NULL) {
225 fatal("Failed to create mutex");
226 }
227
228 if ((request_queue.cond = CreateEvent(nullptr, /* no inheritance */
229 FALSE, /* auto signal reset - which I think is pthreads like ? */
230 FALSE, /* start non signaled */
231 nullptr) /* no name */
232 ) == NULL) {
233 fatal("Failed to create condition variable");
234 }
235
236 request_queue.head = nullptr;
237
239
241
243
244 /* Initialize done queue */
245
246 if ((done_queue.mutex = CreateMutex(nullptr, /* no inheritance */
247 FALSE, /* start unowned (as per mutex_init) */
248 nullptr) /* no name */
249 ) == NULL) {
250 fatal("Failed to create mutex");
251 }
252
253 if ((done_queue.cond = CreateEvent(nullptr, /* no inheritance */
254 TRUE, /* manually signaled - which I think is pthreads like ? */
255 FALSE, /* start non signaled */
256 nullptr) /* no name */
257 ) == NULL) {
258 fatal("Failed to create condition variable");
259 }
260
261 done_queue.head = nullptr;
262
264
266
268
269 // Initialize the thread I/O pipes before creating any threads
270 // see bug 3189 comment 5 about race conditions.
272
273 /* Create threads and get them to sit in their wait loop */
275
276 assert(NUMTHREADS > 0);
277
278 for (i = 0; i < NUMTHREADS; ++i) {
280 threadp->status = _THREAD_STARTING;
281 threadp->current_req = nullptr;
282 threadp->requests = 0;
283 threadp->next = threads;
284 threads = threadp;
285
286 if ((threadp->thread = CreateThread(nullptr, /* no security attributes */
287 0, /* use default stack size */
288 squidaio_thread_loop, /* thread function */
289 threadp, /* argument to thread function */
290 0, /* use default creation flags */
291 &(threadp->dwThreadId)) /* returns the thread identifier */
292 ) == NULL) {
293 fprintf(stderr, "Thread creation failed\n");
294 threadp->status = _THREAD_FAILED;
295 continue;
296 }
297
298 /* Set the new thread priority above parent process */
299 SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
300 }
301
302 /* Create request pool */
304
305 squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
306
307 squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
308
309 squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
310
311 squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
312
313 squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
314
316}
317
318void
320{
321 squidaio_thread_t *threadp;
322 int i;
323 HANDLE * hthreads;
324
326 return;
327
328 /* This is the same as in squidaio_sync */
329 do {
331 } while (request_queue_len > 0);
332
333 hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
334
335 threadp = threads;
336
337 for (i = 0; i < NUMTHREADS; ++i) {
338 threadp->exit = 1;
339 hthreads[i] = threadp->thread;
340 threadp = threadp->next;
341 }
342
343 ReleaseMutex(request_queue.mutex);
344 ResetEvent(request_queue.cond);
345 ReleaseMutex(done_queue.mutex);
346 ResetEvent(done_queue.cond);
347 Sleep(0);
348
349 WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
350
351 for (i = 0; i < NUMTHREADS; ++i) {
352 CloseHandle(hthreads[i]);
353 }
354
355 CloseHandle(main_thread);
357
359 xfree(hthreads);
360}
361
362static DWORD WINAPI
363squidaio_thread_loop(LPVOID lpParam)
364{
365 squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
366 squidaio_request_t *request;
367 HANDLE cond; /* local copy of the event queue because win32 event handles
368 * don't atomically release the mutex as cond variables do. */
369
370 /* lock the thread info */
371
372 if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
373 fatal("Can't get ownership of mutex\n");
374 }
375
376 /* duplicate the handle */
377 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
378 request_queue.cond, /* handle to copy */
379 GetCurrentProcess(), /* pseudo handle, don't close */
380 &cond,
381 0, /* required access */
382 FALSE, /* child process's don't inherit the handle */
383 DUPLICATE_SAME_ACCESS))
384 fatal("Can't duplicate mutex handle\n");
385
386 if (!ReleaseMutex(request_queue.mutex)) {
387 CloseHandle(cond);
388 fatal("Can't release mutex\n");
389 }
390
391 Sleep(0);
392
393 while (1) {
394 DWORD rv;
395 threadp->current_req = request = nullptr;
396 request = nullptr;
397 /* Get a request to process */
398 threadp->status = _THREAD_WAITING;
399
400 if (threadp->exit) {
401 CloseHandle(request_queue.mutex);
402 CloseHandle(cond);
403 return 0;
404 }
405
406 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
407
408 if (rv == WAIT_FAILED) {
409 CloseHandle(cond);
410 return 1;
411 }
412
413 while (!request_queue.head) {
414 if (!ReleaseMutex(request_queue.mutex)) {
415 CloseHandle(cond);
416 threadp->status = _THREAD_FAILED;
417 return 1;
418 }
419
420 Sleep(0);
421 rv = WaitForSingleObject(cond, INFINITE);
422
423 if (rv == WAIT_FAILED) {
424 CloseHandle(cond);
425 return 1;
426 }
427
428 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
429
430 if (rv == WAIT_FAILED) {
431 CloseHandle(cond);
432 return 1;
433 }
434 }
435
436 request = request_queue.head;
437
438 if (request)
439 request_queue.head = request->next;
440
441 if (!request_queue.head)
443
444 if (!ReleaseMutex(request_queue.mutex)) {
445 CloseHandle(cond);
446 return 1;
447 }
448
449 Sleep(0);
450
451 /* process the request */
452 threadp->status = _THREAD_BUSY;
453
454 request->next = nullptr;
455
456 threadp->current_req = request;
457
458 errno = 0;
459
460 if (!request->cancelled) {
461 switch (request->request_type) {
462
463 case _AIO_OP_OPEN:
464 squidaio_do_open(request);
465 break;
466
467 case _AIO_OP_READ:
468 squidaio_do_read(request);
469 break;
470
471 case _AIO_OP_WRITE:
472 squidaio_do_write(request);
473 break;
474
475 case _AIO_OP_CLOSE:
476 squidaio_do_close(request);
477 break;
478
479 case _AIO_OP_UNLINK:
480 squidaio_do_unlink(request);
481 break;
482
483#if AIO_OPENDIR /* Opendir not implemented yet */
484
485 case _AIO_OP_OPENDIR:
486 squidaio_do_opendir(request);
487 break;
488#endif
489
490 case _AIO_OP_STAT:
491 squidaio_do_stat(request);
492 break;
493
494 default:
495 request->ret = -1;
496 request->err = EINVAL;
497 break;
498 }
499 } else { /* cancelled */
500 request->ret = -1;
501 request->err = EINTR;
502 }
503
504 threadp->status = _THREAD_DONE;
505 /* put the request in the done queue */
506 rv = WaitForSingleObject(done_queue.mutex, INFINITE);
507
508 if (rv == WAIT_FAILED) {
509 CloseHandle(cond);
510 return 1;
511 }
512
513 *done_queue.tailp = request;
514 done_queue.tailp = &request->next;
515
516 if (!ReleaseMutex(done_queue.mutex)) {
517 CloseHandle(cond);
518 return 1;
519 }
520
522 Sleep(0);
523 ++ threadp->requests;
524 } /* while forever */
525
526 CloseHandle(cond);
527
528 return 0;
529} /* squidaio_thread_loop */
530
531static void
533{
534 static int high_start = 0;
535 debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
536 /* Mark it as not executed (failing result, no error) */
537 request->ret = -1;
538 request->err = 0;
539 /* Internal housekeeping */
541 request->resultp->_data = request;
542 /* Play some tricks with the request_queue2 queue */
543 request->next = nullptr;
544
545 if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
546 if (request_queue2.head) {
547 /* Grab blocked requests */
550 }
551
552 /* Enqueue request */
553 *request_queue.tailp = request;
554
555 request_queue.tailp = &request->next;
556
557 if (!SetEvent(request_queue.cond))
558 fatal("Couldn't push queue");
559
560 if (!ReleaseMutex(request_queue.mutex)) {
561 /* unexpected error */
562 fatal("Couldn't push queue");
563 }
564
565 Sleep(0);
566
567 if (request_queue2.head) {
568 /* Clear queue of blocked requests */
569 request_queue2.head = nullptr;
570 request_queue2.tailp = &request_queue2.head;
571 }
572 } else {
573 /* Oops, the request queue is blocked, use request_queue2 */
574 *request_queue2.tailp = request;
575 request_queue2.tailp = &request->next;
576 }
577
578 if (request_queue2.head) {
579 static uint64_t filter = 0;
580 static uint64_t filter_limit = 8196;
581
582 if (++filter >= filter_limit) {
583 filter_limit += filter;
584 filter = 0;
585 debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
586 }
587 }
588
589 /* Warn if out of threads */
591 static int last_warn = 0;
592 static int queue_high, queue_low;
593
594 if (high_start == 0) {
595 high_start = (int)squid_curtime;
596 queue_high = request_queue_len;
597 queue_low = request_queue_len;
598 }
599
600 if (request_queue_len > queue_high)
601 queue_high = request_queue_len;
602
603 if (request_queue_len < queue_low)
604 queue_low = request_queue_len;
605
606 if (squid_curtime >= (last_warn + 15) &&
607 squid_curtime >= (high_start + 5)) {
608 debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");
609
610 if (squid_curtime >= (high_start + 15))
611 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
612 request_queue_len << ", high=" << queue_high <<
613 ", low=" << queue_low << ", duration=" <<
614 (long int) (squid_curtime - high_start));
615
616 last_warn = (int)squid_curtime;
617 }
618 } else {
619 high_start = 0;
620 }
621
622 /* Warn if seriously overloaded */
624 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
625 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
627 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
628 }
629} /* squidaio_queue_request */
630
631static void
633{
634 squidaio_result_t *resultp = requestp->resultp;
635 int cancelled = requestp->cancelled;
636
637 /* Free allocated structures and copy data back to user space if the */
638 /* request hasn't been cancelled */
639
640 switch (requestp->request_type) {
641
642 case _AIO_OP_STAT:
643
644 if (!cancelled && requestp->ret == 0)
645 memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
646
647 squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
648
649 squidaio_xstrfree(requestp->path);
650
651 break;
652
653 case _AIO_OP_OPEN:
654 if (cancelled && requestp->ret >= 0)
655 /* The open() was cancelled but completed */
656 close(requestp->ret);
657
658 squidaio_xstrfree(requestp->path);
659
660 break;
661
662 case _AIO_OP_CLOSE:
663 if (cancelled && requestp->ret < 0)
664 /* The close() was cancelled and never got executed */
665 close(requestp->fd);
666
667 break;
668
669 case _AIO_OP_UNLINK:
670
671 case _AIO_OP_OPENDIR:
672 squidaio_xstrfree(requestp->path);
673
674 break;
675
676 case _AIO_OP_READ:
677 break;
678
679 case _AIO_OP_WRITE:
680 break;
681
682 default:
683 break;
684 }
685
686 if (resultp != NULL && !cancelled) {
687 resultp->aio_return = requestp->ret;
688 resultp->aio_errno = requestp->err;
689 }
690
692} /* squidaio_cleanup_request */
693
694int
696{
697 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
698
699 if (request && request->resultp == resultp) {
700 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
701 request->cancelled = 1;
702 request->resultp = nullptr;
703 resultp->_data = nullptr;
704 resultp->result_type = _AIO_OP_NONE;
705 return 0;
706 }
707
708 return 1;
709} /* squidaio_cancel */
710
711int
712squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
713{
715 squidaio_request_t *requestp;
716
718
719 requestp->path = (char *) squidaio_xstrdup(path);
720
721 requestp->oflag = oflag;
722
723 requestp->mode = mode;
724
725 requestp->resultp = resultp;
726
727 requestp->request_type = _AIO_OP_OPEN;
728
729 requestp->cancelled = 0;
730
731 resultp->result_type = _AIO_OP_OPEN;
732
733 squidaio_queue_request(requestp);
734
735 return 0;
736}
737
738static void
740{
741 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
742 requestp->err = errno;
743}
744
745int
746squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
747{
748 squidaio_request_t *requestp;
749
751
752 requestp->fd = fd;
753
754 requestp->bufferp = bufp;
755
756 requestp->buflen = bufs;
757
758 requestp->offset = offset;
759
760 requestp->whence = whence;
761
762 requestp->resultp = resultp;
763
764 requestp->request_type = _AIO_OP_READ;
765
766 requestp->cancelled = 0;
767
768 resultp->result_type = _AIO_OP_READ;
769
770 squidaio_queue_request(requestp);
771
772 return 0;
773}
774
775static void
777{
778 lseek(requestp->fd, requestp->offset, requestp->whence);
779
780 if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
781 requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
782 WIN32_maperror(GetLastError());
783 requestp->ret = -1;
784 }
785
786 requestp->err = errno;
787}
788
789int
790squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
791{
792 squidaio_request_t *requestp;
793
795
796 requestp->fd = fd;
797
798 requestp->bufferp = bufp;
799
800 requestp->buflen = bufs;
801
802 requestp->offset = offset;
803
804 requestp->whence = whence;
805
806 requestp->resultp = resultp;
807
808 requestp->request_type = _AIO_OP_WRITE;
809
810 requestp->cancelled = 0;
811
812 resultp->result_type = _AIO_OP_WRITE;
813
814 squidaio_queue_request(requestp);
815
816 return 0;
817}
818
819static void
821{
822 if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
823 requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
824 WIN32_maperror(GetLastError());
825 requestp->ret = -1;
826 }
827
828 requestp->err = errno;
829}
830
831int
833{
834 squidaio_request_t *requestp;
835
837
838 requestp->fd = fd;
839
840 requestp->resultp = resultp;
841
842 requestp->request_type = _AIO_OP_CLOSE;
843
844 requestp->cancelled = 0;
845
846 resultp->result_type = _AIO_OP_CLOSE;
847
848 squidaio_queue_request(requestp);
849
850 return 0;
851}
852
853static void
855{
856 if ((requestp->ret = close(requestp->fd)) < 0) {
857 debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
858 close(requestp->fd);
859 }
860
861 requestp->err = errno;
862}
863
864int
865
866squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
867{
869 squidaio_request_t *requestp;
870
872
873 requestp->path = (char *) squidaio_xstrdup(path);
874
875 requestp->statp = sb;
876
877 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
878
879 requestp->resultp = resultp;
880
881 requestp->request_type = _AIO_OP_STAT;
882
883 requestp->cancelled = 0;
884
885 resultp->result_type = _AIO_OP_STAT;
886
887 squidaio_queue_request(requestp);
888
889 return 0;
890}
891
892static void
894{
895 requestp->ret = stat(requestp->path, requestp->tmpstatp);
896 requestp->err = errno;
897}
898
899int
900squidaio_unlink(const char *path, squidaio_result_t * resultp)
901{
903 squidaio_request_t *requestp;
904
906
907 requestp->path = squidaio_xstrdup(path);
908
909 requestp->resultp = resultp;
910
911 requestp->request_type = _AIO_OP_UNLINK;
912
913 requestp->cancelled = 0;
914
915 resultp->result_type = _AIO_OP_UNLINK;
916
917 squidaio_queue_request(requestp);
918
919 return 0;
920}
921
922static void
924{
925 requestp->ret = unlink(requestp->path);
926 requestp->err = errno;
927}
928
929#if AIO_OPENDIR
930/* XXX squidaio_opendir NOT implemented yet.. */
931
932int
933squidaio_opendir(const char *path, squidaio_result_t * resultp)
934{
935 squidaio_request_t *requestp;
936 int len;
937
938 requestp = squidaio_request_pool->alloc();
939
940 resultp->result_type = _AIO_OP_OPENDIR;
941
942 return -1;
943}
944
945static void
946squidaio_do_opendir(squidaio_request_t * requestp)
947{
948 /* NOT IMPLEMENTED */
949}
950
951#endif
952
953static void
955{
956 /* kick "overflow" request queue */
957
958 if (request_queue2.head &&
959 (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
962
963 if (!SetEvent(request_queue.cond))
964 fatal("couldn't push queue\n");
965
966 if (!ReleaseMutex(request_queue.mutex)) {
967 /* unexpected error */
968 }
969
970 Sleep(0);
971 request_queue2.head = nullptr;
972 request_queue2.tailp = &request_queue2.head;
973 }
974
975 /* poll done queue */
976 if (done_queue.head &&
977 (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
978
979 struct squidaio_request_t *requests = done_queue.head;
980 done_queue.head = nullptr;
982
983 if (!ReleaseMutex(done_queue.mutex)) {
984 /* unexpected error */
985 }
986
987 Sleep(0);
988 *done_requests.tailp = requests;
990
991 while (requests->next) {
992 requests = requests->next;
994 }
995
996 done_requests.tailp = &requests->next;
997 }
998}
999
1002{
1003 squidaio_request_t *request;
1005 int cancelled;
1006 int polled = 0;
1007
1008AIO_REPOLL:
1009 request = done_requests.head;
1010
1011 if (request == NULL && !polled) {
1014 polled = 1;
1015 request = done_requests.head;
1016 }
1017
1018 if (!request) {
1019 return nullptr;
1020 }
1021
1022 debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1023 done_requests.head = request->next;
1024
1025 if (!done_requests.head)
1026 done_requests.tailp = &done_requests.head;
1027
1028 resultp = request->resultp;
1029
1030 cancelled = request->cancelled;
1031
1032 squidaio_debug(request);
1033
1034 debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1035
1036 squidaio_cleanup_request(request);
1037
1038 if (cancelled)
1039 goto AIO_REPOLL;
1040
1041 return resultp;
1042} /* squidaio_poll_done */
1043
1044int
1046{
1047 return request_queue_len + (done_requests.head ? 1 : 0);
1048}
1049
1050int
1052{
1053 /* XXX This might take a while if the queue is large.. */
1054
1055 do {
1057 } while (request_queue_len > 0);
1058
1060}
1061
1062int
1064{
1065 return request_queue_len;
1066}
1067
1068static void
1070{
1071 switch (request->request_type) {
1072
1073 case _AIO_OP_OPEN:
1074 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1075 break;
1076
1077 case _AIO_OP_READ:
1078 debugs(43, 5, "READ on fd: " << request->fd);
1079 break;
1080
1081 case _AIO_OP_WRITE:
1082 debugs(43, 5, "WRITE on fd: " << request->fd);
1083 break;
1084
1085 case _AIO_OP_CLOSE:
1086 debugs(43, 5, "CLOSE of fd: " << request->fd);
1087 break;
1088
1089 case _AIO_OP_UNLINK:
1090 debugs(43, 5, "UNLINK of " << request->path);
1091 break;
1092
1093 default:
1094 break;
1095 }
1096}
1097
1098void
1100{
1101 squidaio_thread_t *threadp;
1102 int i;
1103
1105 return;
1106
1107 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1108
1109 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1110
1111 threadp = threads;
1112
1113 for (i = 0; i < NUMTHREADS; ++i) {
1114 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1115 threadp = threadp->next;
1116 }
1117}
1118
int squidaio_opendir(const char *, squidaio_result_t *)
enum _squidaio_request_type squidaio_request_type
Definition: DiskThreads.h:55
@ _AIO_OP_OPENDIR
Definition: DiskThreads.h:52
@ _AIO_OP_NONE
Definition: DiskThreads.h:46
@ _AIO_OP_WRITE
Definition: DiskThreads.h:49
@ _AIO_OP_UNLINK
Definition: DiskThreads.h:51
@ _AIO_OP_OPEN
Definition: DiskThreads.h:47
@ _AIO_OP_READ
Definition: DiskThreads.h:48
@ _AIO_OP_CLOSE
Definition: DiskThreads.h:50
@ _AIO_OP_STAT
Definition: DiskThreads.h:53
#define MAGIC1
Definition: DiskThreads.h:34
#define NUMTHREADS
Definition: DiskThreads.h:30
int size
Definition: ModDevPoll.cc:75
time_t squid_curtime
Definition: stub_libtime.cc:20
#define memPoolCreate
Creates a named MemPool of elements with the given size.
Definition: Pool.h:123
_squidaio_thread_status
Definition: aiops.cc:40
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops.cc:47
static void squidaio_debug(squidaio_request_t *)
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
Definition: aiops_win32.cc:866
#define AIO_TINY_BUFS
Definition: aiops_win32.cc:102
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
Definition: aiops_win32.cc:900
int squidaio_operations_pending(void)
squidaio_request_t * head
Definition: aiops_win32.cc:117
static squidaio_request_queue_t request_queue
Definition: aiops_win32.cc:114
static void squidaio_cleanup_request(squidaio_request_t *)
Definition: aiops_win32.cc:632
static void squidaio_poll_queues(void)
Definition: aiops_win32.cc:954
static Mem::Allocator * squidaio_small_bufs
Definition: aiops_win32.cc:107
#define AIO_MEDIUM_BUFS
Definition: aiops_win32.cc:100
static Mem::Allocator * squidaio_large_bufs
Definition: aiops_win32.cc:105
static Mem::Allocator * squidaio_medium_bufs
Definition: aiops_win32.cc:106
static Mem::Allocator * squidaio_get_pool(int size)
Definition: aiops_win32.cc:138
static char * squidaio_xstrdup(const char *str)
Definition: aiops_win32.cc:169
static void squidaio_do_stat(squidaio_request_t *)
Definition: aiops_win32.cc:893
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
Definition: aiops_win32.cc:712
static squidaio_thread_t * threads
Definition: aiops_win32.cc:96
struct squidaio_request_t squidaio_request_t
static void squidaio_do_open(squidaio_request_t *)
Definition: aiops_win32.cc:739
void squidaio_init(void)
Definition: aiops_win32.cc:201
static Mem::Allocator * squidaio_request_pool
Definition: aiops_win32.cc:112
@ _THREAD_BUSY
Definition: aiops_win32.cc:30
@ _THREAD_FAILED
Definition: aiops_win32.cc:31
@ _THREAD_DONE
Definition: aiops_win32.cc:32
@ _THREAD_WAITING
Definition: aiops_win32.cc:29
@ _THREAD_STARTING
Definition: aiops_win32.cc:28
#define RIDICULOUS_LENGTH
Definition: aiops_win32.cc:25
static squidaio_request_queue_t done_queue
Definition: aiops_win32.cc:124
static struct @45 request_queue2
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops_win32.cc:746
#define AIO_LARGE_BUFS
Definition: aiops_win32.cc:99
static void squidaio_xstrfree(char *str)
Definition: aiops_win32.cc:190
static HANDLE main_thread
Definition: aiops_win32.cc:135
static void squidaio_do_close(squidaio_request_t *)
Definition: aiops_win32.cc:854
void squidaio_stats(StoreEntry *sentry)
void squidaio_shutdown(void)
Definition: aiops_win32.cc:319
static struct @46 done_requests
static Mem::Allocator * squidaio_thread_pool
Definition: aiops_win32.cc:113
squidaio_result_t * squidaio_poll_done(void)
static Mem::Allocator * squidaio_tiny_bufs
Definition: aiops_win32.cc:108
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops_win32.cc:34
#define AIO_MICRO_BUFS
Definition: aiops_win32.cc:103
static Mem::Allocator * squidaio_micro_bufs
Definition: aiops_win32.cc:109
static void squidaio_queue_request(squidaio_request_t *)
Definition: aiops_win32.cc:532
squidaio_request_t ** tailp
Definition: aiops_win32.cc:117
struct squidaio_request_queue_t squidaio_request_queue_t
static void squidaio_do_read(squidaio_request_t *)
Definition: aiops_win32.cc:776
int squidaio_close(int fd, squidaio_result_t *resultp)
Definition: aiops_win32.cc:832
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops_win32.cc:790
static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam)
Definition: aiops_win32.cc:363
static void squidaio_do_write(squidaio_request_t *)
Definition: aiops_win32.cc:820
void squidaio_xfree(void *p, int size)
Definition: aiops_win32.cc:181
static void squidaio_do_unlink(squidaio_request_t *)
Definition: aiops_win32.cc:923
int squidaio_sync(void)
int squidaio_get_queue_len(void)
int squidaio_cancel(squidaio_result_t *resultp)
Definition: aiops_win32.cc:695
static int request_queue_len
Definition: aiops_win32.cc:111
static int squidaio_initialised
Definition: aiops_win32.cc:97
void * squidaio_xmalloc(int size)
Definition: aiops_win32.cc:157
#define AIO_SMALL_BUFS
Definition: aiops_win32.cc:101
#define assert(EX)
Definition: assert.h:17
static void NotifyIOCompleted()
Definition: CommIO.h:36
static void NotifyIOClose()
Definition: CommIO.cc:38
static void ResetNotifications()
Definition: CommIO.cc:69
static void Initialize()
Definition: CommIO.cc:19
void freeOne(void *obj)
return memory reserved by alloc()
Definition: Allocator.h:51
void * alloc()
provide (and reserve) memory suitable for storing one object
Definition: Allocator.h:44
enum _squidaio_request_type result_type
Definition: DiskThreads.h:64
#define DBG_IMPORTANT
Definition: Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
#define DBG_CRITICAL
Definition: Stream.h:37
void fatal(const char *message)
Definition: fatal.cc:28
#define xfree
#define xmalloc
static struct stat sb
Definition: squidclient.cc:71
#define TRUE
Definition: std-includes.h:55
#define FALSE
Definition: std-includes.h:56
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:841
unsigned long blocked
Definition: aiops.cc:77
squidaio_request_t *volatile head
Definition: aiops.cc:74
squidaio_request_t *volatile *volatile tailp
Definition: aiops.cc:75
pthread_cond_t cond
Definition: aiops.cc:73
pthread_mutex_t mutex
Definition: aiops.cc:72
unsigned long requests
Definition: aiops.cc:76
squidaio_result_t * resultp
Definition: aiops.cc:68
struct stat * tmpstatp
Definition: aiops.cc:65
struct squidaio_request_t * next
Definition: aiops.cc:51
struct stat * statp
Definition: aiops.cc:67
size_t buflen
Definition: aiops.cc:59
squidaio_request_type request_type
Definition: aiops.cc:52
char * bufferp
Definition: aiops.cc:58
squidaio_thread_t * next
Definition: aiops.cc:83
pthread_t thread
Definition: aiops.cc:84
struct squidaio_request_t * current_req
Definition: aiops.cc:87
int volatile exit
Definition: aiops_win32.cc:78
squidaio_thread_status status
Definition: aiops.cc:85
unsigned long requests
Definition: aiops.cc:88
int unsigned int
Definition: stub_fd.cc:19
#define NULL
Definition: types.h:145
unsigned short mode_t
Definition: types.h:129
void * xcalloc(size_t n, size_t sz)
Definition: xalloc.cc:71

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors