Index: src/enums.h =================================================================== RCS file: /server/cvs-server/squid/squid/src/enums.h,v retrieving revision 1.226 diff -u -r1.226 enums.h --- src/enums.h 18 May 2006 06:49:46 -0000 1.226 +++ src/enums.h 20 May 2006 09:40:02 -0000 @@ -644,6 +644,7 @@ STORE_META_VALID, STORE_META_VARY_HEADERS, /* Stores Vary request headers */ STORE_META_STD_LFS, /* standard metadata in lfs format */ + STORE_META_OBJSIZE, /* object size, if its known */ STORE_META_END }; Index: src/store_swapmeta.c =================================================================== RCS file: /server/cvs-server/squid/squid/src/store_swapmeta.c,v retrieving revision 1.18 diff -u -r1.18 store_swapmeta.c --- src/store_swapmeta.c 17 May 2005 16:56:38 -0000 1.18 +++ src/store_swapmeta.c 20 May 2006 09:40:02 -0000 @@ -68,6 +68,7 @@ tlv **T = &TLV; const char *url; const char *vary; + const squid_off_t objsize = objectLen(e); assert(e->mem_obj != NULL); assert(e->swap_status == SWAPOUT_WRITING); url = storeUrl(e); @@ -79,6 +80,9 @@ T = storeSwapTLVAdd(STORE_META_STD_LFS, &e->timestamp, STORE_HDR_METASIZE, T); #endif T = storeSwapTLVAdd(STORE_META_URL, url, strlen(url) + 1, T); + if (objsize > -1) { + T = storeSwapTLVAdd(STORE_META_OBJSIZE, &objsize, sizeof(objsize), T); + } vary = e->mem_obj->vary_headers; if (vary) T = storeSwapTLVAdd(STORE_META_VARY_HEADERS, vary, strlen(vary) + 1, T); Index: src/fs/coss/async_io.c =================================================================== RCS file: /server/cvs-server/squid/squid/src/fs/coss/async_io.c,v retrieving revision 1.12 diff -u -r1.12 async_io.c --- src/fs/coss/async_io.c 17 May 2005 16:56:42 -0000 1.12 +++ src/fs/coss/async_io.c 20 May 2006 09:40:02 -0000 @@ -11,7 +11,7 @@ * supports are read/write, and since COSS works on a single file * per storedir it should work just fine. 
* - * $Id: async_io.c,v 1.12 2005/05/17 16:56:42 hno Exp $ + * $Id: async_io.c,v 1.7.2.7 2005/03/26 23:40:21 hno Exp $ */ #include "squid.h" @@ -61,6 +61,7 @@ async_queue_entry_t *qe; assert(q->aq_state == AQ_STATE_SETUP); + assert(offset >= 0); /* Find a free slot */ slot = a_file_findslot(q); @@ -109,6 +110,7 @@ async_queue_entry_t *qe; assert(q->aq_state == AQ_STATE_SETUP); + assert(offset >= 0); /* Find a free slot */ slot = a_file_findslot(q); @@ -196,6 +198,9 @@ fd = aqe->aq_e_fd; type = aqe->aq_e_type; + /* debugging assert */ + assert(reterr == 0); + /* Free slot */ memset(aqe, 0, sizeof(async_queue_entry_t)); aqe->aq_e_state = AQ_ENTRY_FREE; Index: src/fs/coss/coss-notes.txt =================================================================== RCS file: /server/cvs-server/squid/squid/src/fs/coss/coss-notes.txt,v retrieving revision 1.2 diff -u -r1.2 coss-notes.txt --- src/fs/coss/coss-notes.txt 17 May 2005 16:56:42 -0000 1.2 +++ src/fs/coss/coss-notes.txt 20 May 2006 09:40:02 -0000 @@ -2,7 +2,7 @@ Adrian Chadd -$Id: coss-notes.txt,v 1.2 2005/05/17 16:56:42 hno Exp $ +$Id: coss-notes.txt,v 1.1.2.1 2003/07/23 21:00:33 wessels Exp $ COSS is a Cyclic Object storage system originally designed by @@ -121,3 +121,11 @@ However, COSS_ALLOC_NOTIFY was still present in the store_dir_coss.c rebuild routines. To avoid assertions during rebuild, I commented out the storeCossAllocate(SD, e, COSS_ALLOC_NOTIFY) call. + +-- Notes: Adrian Chadd, 9/May/2006 + +* The types used by COSS have been modified to support large files, + at least under Linux. One can compile with --with-large-files to make + sure the right options have been enabled. No compile-time or run-time checks + are currently made to ensure the code has been compiled to support + large files, at least not yet. 
Index: src/fs/coss/store_coss.h =================================================================== RCS file: /server/cvs-server/squid/squid/src/fs/coss/store_coss.h,v retrieving revision 1.7 diff -u -r1.7 store_coss.h --- src/fs/coss/store_coss.h 17 May 2005 16:56:42 -0000 1.7 +++ src/fs/coss/store_coss.h 20 May 2006 09:40:02 -0000 @@ -5,15 +5,50 @@ #define COSS_MEMBUF_SZ 1048576 #endif -/* Note that swap_filen in sio/e are actually disk offsets too! */ +#define COSS_REPORT_INTERVAL 20 + +/* Note that swap_filen in sio/e are actually disk block offsets too! */ + +typedef struct _cossmembuf CossMemBuf; +typedef struct _cossinfo CossInfo; +typedef struct _cossstate CossState; +typedef struct _cossindex CossIndexNode; +typedef struct _coss_pending_reloc CossPendingReloc; +typedef struct _coss_read_op CossReadOp; +typedef struct _cossstripe CossStripe; /* What we're doing in storeCossAllocate() */ #define COSS_ALLOC_NOTIFY 0 #define COSS_ALLOC_ALLOCATE 1 #define COSS_ALLOC_REALLOC 2 +/* + * Define this if you would like to use the aufs IO method for + * disk IO instead of the POSIX AIO method. + */ +#define USE_AUFSOPS 1 + +#if USE_AUFSOPS +/* XXX a hack; the async ops should be broken out! 
*/ +typedef void AIOCB(int fd, void *cbdata, const char *buf, + int aio_return, int aio_errno); +void aioWrite(int, off_t offset, char *, int size, AIOCB *, void *, FREE *); +void aioRead(int, off_t offset, int size, AIOCB *, void *); +void aioInit(void); +int aioCheckCallbacks(SwapDir *); +void aioSync(SwapDir *); +void squidaio_init(void); +void squidaio_shutdown(void); +extern int squidaio_magic1; +int aioQueueSize(void); +extern int squidaio_magic1; +#define MAGIC1 squidaio_magic1 +#endif + + struct _coss_stats { int stripes; + int dead_stripes; struct { int alloc; int realloc; @@ -33,32 +68,91 @@ struct _cossmembuf { dlink_node node; - size_t diskstart; /* in bytes */ - size_t diskend; /* in bytes */ + off_t diskstart; /* in bytes */ + off_t diskend; /* in bytes */ + int stripe; SwapDir *SD; int lockcount; char buffer[COSS_MEMBUF_SZ]; struct _cossmembuf_flags { unsigned int full:1; unsigned int writing:1; + unsigned int written:1; + unsigned int dead:1; } flags; + int numobjs; +}; + +typedef enum { + COSS_OP_NONE, + COSS_OP_READ, +} coss_op_t; + +struct _coss_read_op { + /* + * callback/callback data are part of the sio, and only one + * read op will be scheduled at any time + */ + coss_op_t type; + dlink_node node; /* per-storedir list */ + dlink_node pending_op_node; /* children of the parent op we're blocking on */ + storeIOState *sio; + size_t requestlen; + size_t requestoffset; /* in blocks */ + off_t reqdiskoffset; /* in blocks */ + char *requestbuf; + char completed; + CossPendingReloc *pr; /* NULL if we're not on a pending op list yet */ +}; + +struct _cossstripe { + int id; + int numdiskobjs; + int pending_relocs; + struct _cossmembuf *membuf; + dlink_list objlist; +}; + +struct _coss_pending_reloc { + CossInfo *cs; + dlink_node node; + size_t len; + sfileno original_filen, new_filen; /* in blocks, not in bytes */ + dlink_list ops; + char *p; }; /* Per-storedir info */ struct _cossinfo { dlink_list membufs; + dlink_list dead_membufs; struct 
_cossmembuf *current_membuf; - size_t current_offset; /* in bytes */ + off_t current_offset; /* in bytes */ int fd; int swaplog_fd; int numcollisions; - dlink_list index; + dlink_list pending_relocs; + dlink_list pending_ops; + int pending_reloc_count; int count; +#if ! USE_AUFSOPS async_queue_t aq; +#endif dlink_node *walk_current; unsigned int blksz_bits; unsigned int blksz_mask; /* just 1<fsdata; + + /* COSS is pretty useless without 64 bit file offsets */ + if (sizeof(off_t) < 8) { + fatalf("COSS will not function without large file support (off_t is %d bytes long. Please reconsider recompiling squid with --with-large-files and --enable-large-cache-files\n", sizeof(off_t)); + } +#if USE_AUFSOPS + aioInit(); + squidaio_init(); +#else a_file_setupqueue(&cs->aq); - storeCossDirOpenSwapLog(sd); - storeCossDirRebuild(sd); +#endif cs->fd = file_open(sd->path, O_RDWR | O_CREAT); if (cs->fd < 0) { debug(79, 1) ("%s: %s\n", sd->path, xstrerror()); fatal("storeCossDirInit: Failed to open a COSS file."); } + storeCossDirOpenSwapLog(sd); + storeCossDirRebuild(sd); n_coss_dirs++; /* * fs.blksize is normally determined by calling statvfs() etc, @@ -183,27 +196,37 @@ * page. 
*/ sd->fs.blksize = 1 << cs->blksz_bits; + comm_quick_poll_required(); } void storeCossRemove(SwapDir * sd, StoreEntry * e) { CossInfo *cs = (CossInfo *) sd->fsdata; + int stripe; + //debug(1, 1) ("storeCossRemove: %x: %d/%d\n", e, (int) e->swap_dirn, (e) e->swap_filen); CossIndexNode *coss_node = e->repl.data; + assert(sd->index == e->swap_dirn); + assert(e->swap_filen >= 0); e->repl.data = NULL; - dlinkDelete(&coss_node->node, &cs->index); + stripe = storeCossFilenoToStripe(cs, e->swap_filen); + dlinkDelete(&coss_node->node, &cs->stripes[stripe].objlist); memPoolFree(coss_index_pool, coss_node); cs->count -= 1; } void -storeCossAdd(SwapDir * sd, StoreEntry * e) +storeCossAdd(SwapDir * sd, StoreEntry * e, int curstripe) { CossInfo *cs = (CossInfo *) sd->fsdata; + CossStripe *cstripe = &cs->stripes[curstripe]; CossIndexNode *coss_node = memPoolAlloc(coss_index_pool); assert(!e->repl.data); + assert(sd->index == e->swap_dirn); + /* Make sure the object exists in the current stripe, it should do! 
*/ + assert(curstripe == storeCossFilenoToStripe(cs, e->swap_filen)); e->repl.data = coss_node; - dlinkAdd(e, &coss_node->node, &cs->index); + dlinkAddTail(e, &coss_node->node, &cstripe->objlist); cs->count += 1; } @@ -211,155 +234,19 @@ storeCossRebuildComplete(void *data) { RebuildState *rb = data; - SwapDir *sd = rb->sd; - storeCossStartMembuf(sd); + SwapDir *SD = rb->sd; + CossInfo *cs = SD->fsdata; + storeCossStartMembuf(SD); store_dirs_rebuilding--; - storeCossDirCloseTmpSwapLog(rb->sd); + storeCossDirCloseTmpSwapLog(SD); storeRebuildComplete(&rb->counts); + debug(47, 1) ("COSS: %s: Rebuild Completed\n", SD->path); + cs->rebuild.rebuilding = 0; + debug(47, 1) (" %d objects scanned, %d objects relocated, %d objects fresher, %d objects ignored\n", + rb->counts.scancount, rb->cosscounts.reloc, rb->cosscounts.fresher, rb->cosscounts.unknown); cbdataFree(rb); } -static void -storeCossRebuildFromSwapLog(void *data) -{ - RebuildState *rb = data; - StoreEntry *e = NULL; - storeSwapLogData s; - size_t ss = sizeof(storeSwapLogData); - int count; - double x; - assert(rb != NULL); - /* load a number of objects per invocation */ - for (count = 0; count < rb->speed; count++) { - if (fread(&s, ss, 1, rb->log) != 1) { - debug(79, 1) ("Done reading %s swaplog (%d entries)\n", - rb->sd->path, rb->n_read); - fclose(rb->log); - rb->log = NULL; - storeCossRebuildComplete(rb); - return; - } - rb->n_read++; - if (s.op <= SWAP_LOG_NOP) - continue; - if (s.op >= SWAP_LOG_MAX) - continue; - debug(20, 3) ("storeCossRebuildFromSwapLog: %s %s %08X\n", - swap_log_op_str[(int) s.op], - storeKeyText(s.key), - s.swap_filen); - if (s.op == SWAP_LOG_ADD) { - (void) 0; - } else if (s.op == SWAP_LOG_DEL) { - /* Delete unless we already have a newer copy */ - if ((e = storeGet(s.key)) != NULL && s.lastref > e->lastref) { - /* - * Make sure we don't unlink the file, it might be - * in use by a subsequent entry. 
Also note that - * we don't have to subtract from store_swap_size - * because adding to store_swap_size happens in - * the cleanup procedure. - */ - storeExpireNow(e); - storeReleaseRequest(e); - if (e->swap_filen > -1) { - e->swap_filen = -1; - } - storeRelease(e); - /* Fake an unlink here, this is a bad hack :( */ - storeCossRemove(rb->sd, e); - rb->counts.objcount--; - rb->counts.cancelcount++; - } - continue; - } else { - x = log(++rb->counts.bad_log_op) / log(10.0); - if (0.0 == x - (double) (int) x) - debug(20, 1) ("WARNING: %d invalid swap log entries found\n", - rb->counts.bad_log_op); - rb->counts.invalid++; - continue; - } - if ((++rb->counts.scancount & 0xFFF) == 0) { - struct stat sb; - if (0 == fstat(fileno(rb->log), &sb)) - storeRebuildProgress(rb->sd->index, - (int) sb.st_size / ss, rb->n_read); - } - if (EBIT_TEST(s.flags, KEY_PRIVATE)) { - rb->counts.badflags++; - continue; - } - e = storeGet(s.key); - if (e) { - /* key already exists, current entry is newer */ - /* keep old, ignore new */ - rb->counts.dupcount++; - continue; - } - /* update store_swap_size */ - rb->counts.objcount++; - e = storeCossAddDiskRestore(rb->sd, s.key, - s.swap_filen, - s.swap_file_sz, - s.expires, - s.timestamp, - s.lastref, - s.lastmod, - s.refcount, - s.flags, - (int) rb->flags.clean); - storeDirSwapLog(e, SWAP_LOG_ADD); - } - eventAdd("storeCossRebuild", storeCossRebuildFromSwapLog, rb, 0.0, 1); -} - -/* Add a new object to the cache with empty memory copy and pointer to disk - * use to rebuild store from disk. */ -static StoreEntry * -storeCossAddDiskRestore(SwapDir * SD, const cache_key * key, - int file_number, - squid_file_sz swap_file_sz, - time_t expires, - time_t timestamp, - time_t lastref, - time_t lastmod, - u_num32 refcount, - u_short flags, - int clean) -{ - StoreEntry *e = NULL; - debug(20, 5) ("storeCossAddDiskRestore: %s, fileno=%08X\n", storeKeyText(key), file_number); - /* if you call this you'd better be sure file_number is not - * already in use! 
*/ - e = new_StoreEntry(STORE_ENTRY_WITHOUT_MEMOBJ, NULL, NULL); - e->store_status = STORE_OK; - e->swap_dirn = SD->index; - storeSetMemStatus(e, NOT_IN_MEMORY); - e->swap_status = SWAPOUT_DONE; - e->swap_filen = file_number; - e->swap_file_sz = swap_file_sz; - e->lock_count = 0; - e->lastref = lastref; - e->timestamp = timestamp; - e->expires = expires; - e->lastmod = lastmod; - e->refcount = refcount; - e->flags = flags; - EBIT_SET(e->flags, ENTRY_CACHABLE); - EBIT_CLR(e->flags, RELEASE_REQUEST); - EBIT_CLR(e->flags, KEY_PRIVATE); - e->ping_status = PING_NONE; - EBIT_CLR(e->flags, ENTRY_VALIDATED); - storeHashInsert(e, key); /* do it after we clear KEY_PRIVATE */ - storeCossAdd(SD, e); -#if USE_COSS_ALLOC_NOTIFY - e->swap_filen = storeCossAllocate(SD, e, COSS_ALLOC_NOTIFY); -#endif - assert(e->swap_filen >= 0); - return e; -} - CBDATA_TYPE(RebuildState); static void storeCossDirRebuild(SwapDir * sd) @@ -368,40 +255,15 @@ int clean = 0; int zero = 0; FILE *fp; - EVH *func = NULL; CBDATA_INIT_TYPE(RebuildState); rb = cbdataAlloc(RebuildState); rb->sd = sd; - rb->speed = opt_foreground_rebuild ? 1 << 30 : 50; - func = storeCossRebuildFromSwapLog; rb->flags.clean = (unsigned int) clean; - /* - * If the swap.state file exists in the cache_dir, then - * we'll use storeCossRebuildFromSwapLog(). - */ fp = storeCossDirOpenTmpSwapLog(sd, &clean, &zero); - debug(20, 1) ("Rebuilding COSS storage in %s (%s)\n", - sd->path, clean ? "CLEAN" : "DIRTY"); - rb->log = fp; + fclose(fp); + debug(20, 1) ("Rebuilding COSS storage in %s (DIRTY)\n", sd->path); store_dirs_rebuilding++; - if (!clean || fp == NULL) { - /* COSS cannot yet rebuild from a dirty state. If the log - * is dirty then the COSS contents is thrown away. - * Why? I guess it is because some contents will be lost, - * and COSS cannot verify this.. 
- */ - if (fp != NULL) - fclose(fp); - /* - * XXX Make sure we don't trigger an assertion if this is the first - * storedir, since if we are, this call will cause storeRebuildComplete - * to prematurely complete the rebuild process, and then some other - * storedir will try to rebuild and eventually die. - */ - eventAdd("storeCossRebuildComplete", storeCossRebuildComplete, rb, 0.0, 0); - return; - } - eventAdd("storeCossRebuild", func, rb, 0.0, 1); + storeDirCoss_StartDiskRebuild(rb); } static void @@ -494,7 +356,7 @@ static int storeCossDirWriteCleanStart(SwapDir * sd) { - CossInfo *cs = (CossInfo *) sd->fsdata; + //CossInfo *cs = (CossInfo *) sd->fsdata; struct _clean_state *state = xcalloc(1, sizeof(*state)); #if HAVE_FCHMOD struct stat sb; @@ -513,7 +375,6 @@ state->outbuf = xcalloc(CLEAN_BUF_SZ, 1); state->outbuf_offset = 0; unlink(state->cln); - state->current = cs->index.tail; debug(20, 3) ("storeCOssDirWriteCleanLogs: opened %s, FD %d\n", state->new, state->fd); #if HAVE_FCHMOD @@ -671,9 +532,15 @@ storeCossDirShutdown(SwapDir * SD) { CossInfo *cs = (CossInfo *) SD->fsdata; + debug(47, 1) ("COSS: %s: syncing\n", SD->path); +#if USE_AUFSOPS + aioSync(SD); +#endif storeCossSync(SD); /* This'll call a_file_syncqueue() */ +#if !USE_AUFSOPS a_file_closequeue(&cs->aq); +#endif file_close(cs->fd); cs->fd = -1; @@ -695,21 +562,38 @@ int storeCossDirCheckObj(SwapDir * SD, const StoreEntry * e) { + CossInfo *cs = SD->fsdata; /* Check if the object is a special object, we can't cache these */ if (EBIT_TEST(e->flags, ENTRY_SPECIAL)) return 0; + if (cs->rebuild.rebuilding == 1) + return 0; return 1; } int -storeCossDirCheckLoadAv(SwapDir * SD, store_op_t op) +storeCossDirCheckLoadAv(SwapDir *SD, store_op_t op) { +#if !USE_AUFSOPS CossInfo *cs = (CossInfo *) SD->fsdata; +#else + int ql = 0; +#endif int loadav; /* Return load, cs->aq.aq_numpending out of MAX_ASYNCOP */ +#if USE_AUFSOPS + ql = aioQueueSize(); + if (ql == 0) + loadav = 0; + else + loadav = ql * 1000 / 
MAGIC1; + debug(47, 9) ("storeAufsDirCheckObj: load=%d\n", loadav); + return loadav; +#else loadav = cs->aq.aq_numpending * 1000 / MAX_ASYNCOP; return loadav; +#endif } @@ -720,8 +604,13 @@ storeCossDirCallback(SwapDir * SD) { CossInfo *cs = (CossInfo *) SD->fsdata; - + storeCossFreeDeadMemBufs(cs); +#if USE_AUFSOPS + /* I believe this call, at the present, checks all callbacks for all SDs, not just ours */ + return aioCheckCallbacks(SD); +#else return a_file_callback(&cs->aq); +#endif } /* ========== LOCAL FUNCTIONS ABOVE, GLOBAL FUNCTIONS BELOW ========== */ @@ -743,13 +632,17 @@ SD->map->n_files_in_map, SD->map->max_n_files, percent(SD->map->n_files_in_map, SD->map->max_n_files)); #endif +#if !USE_AUFSOPS storeAppendPrintf(sentry, "Pending operations: %d out of %d\n", cs->aq.aq_numpending, MAX_ASYNCOP); +#endif storeAppendPrintf(sentry, "Flags:"); if (SD->flags.selected) storeAppendPrintf(sentry, " SELECTED"); if (SD->flags.read_only) storeAppendPrintf(sentry, " READ-ONLY"); storeAppendPrintf(sentry, "\n"); + storeAppendPrintf(sentry, "Pending Relocations: %d\n", cs->pending_reloc_count); + membufsDump(cs, sentry); } static void @@ -812,8 +705,6 @@ cs->numcollisions = 0; cs->membufs.head = cs->membufs.tail = NULL; /* set when the rebuild completes */ cs->current_membuf = NULL; - cs->index.head = NULL; - cs->index.tail = NULL; cs->blksz_bits = 9; /* default block size = 512 */ cs->blksz_mask = (1 << cs->blksz_bits) - 1; @@ -835,8 +726,21 @@ debug(47, 0) ("COSS cache_dir size = %d KB\n", sd->max_size); fatal("COSS cache_dir size exceeds largest offset\n"); } -} + /* XXX todo checks */ + /* Ensure that off_t range can cover the max_size */ + + /* Ensure that the max size IS a multiple of the membuf size, or things + * will get very fruity near the end of the disk. 
*/ + cs->numstripes = (off_t)(((off_t) sd->max_size) << 10) / COSS_MEMBUF_SZ; + debug(47, 0) ("COSS: number of stripes: %d of %d bytes each\n", cs->numstripes, COSS_MEMBUF_SZ); + cs->stripes = xcalloc(cs->numstripes, sizeof(struct _cossstripe)); + for (i = 0; i < cs->numstripes; i++) { + cs->stripes[i].id = i; + cs->stripes[i].membuf = NULL; + cs->stripes[i].numdiskobjs = -1; + } +} static void storeCossDirReconfigure(SwapDir * sd, int index, char *path) @@ -975,9 +879,9 @@ "write", coss_stats.write.ops, coss_stats.write.success, coss_stats.write.fail); storeAppendPrintf(sentry, tbl_fmt, "s_write", coss_stats.stripe_write.ops, coss_stats.stripe_write.success, coss_stats.stripe_write.fail); - storeAppendPrintf(sentry, "\n"); storeAppendPrintf(sentry, "stripes: %d\n", coss_stats.stripes); + storeAppendPrintf(sentry, "dead_stripes: %d\n", coss_stats.dead_stripes); storeAppendPrintf(sentry, "alloc.alloc: %d\n", coss_stats.alloc.alloc); storeAppendPrintf(sentry, "alloc.realloc: %d\n", coss_stats.alloc.realloc); storeAppendPrintf(sentry, "alloc.collisions: %d\n", coss_stats.alloc.collisions); @@ -997,6 +901,361 @@ storefs->donefunc = storeCossDirDone; coss_state_pool = memPoolCreate("COSS IO State data", sizeof(CossState)); coss_index_pool = memPoolCreate("COSS index data", sizeof(CossIndexNode)); + coss_realloc_pool = memPoolCreate("COSS pending realloc", sizeof(CossPendingReloc)); + coss_op_pool = memPoolCreate("COSS pending operation", sizeof(CossReadOp)); cachemgrRegister("coss", "COSS Stats", storeCossStats, 0, 1); coss_initialised = 1; } + +/* New storedir rebuilding code! 
*/
+
+static void storeDirCoss_ReadStripe(RebuildState *rb);
+static void storeDirCoss_ParseStripeBuffer(RebuildState *rb);
+static void storeCoss_ConsiderStoreEntry(RebuildState *rb, const cache_key *key, StoreEntry *e);
+/* Completion callback for one stripe read: parse the stripe, then read the next one or finish the rebuild. */
+#if USE_AUFSOPS
+static void
+storeDirCoss_ReadStripeComplete(int fd, void *my_data, const char *buf, int aio_return, int aio_errno)
+#else
+static void
+storeDirCoss_ReadStripeComplete(int fd, const char *buf, int r_len, int r_errflag, void *my_data)
+#endif
+{
+    RebuildState *rb = my_data;
+    SwapDir *SD = rb->sd;
+    CossInfo *cs = SD->fsdata;
+#if USE_AUFSOPS
+    int r_errflag;
+    int r_len;
+    r_len = aio_return;
+    if (aio_errno)
+	r_errflag = aio_errno == ENOSPC ? DISK_NO_SPACE_LEFT : DISK_ERROR;
+    else
+	r_errflag = DISK_OK;
+    if (r_errflag == DISK_OK && r_len > 0) xmemcpy(cs->rebuild.buf, buf, r_len);	/* a failed read has a negative length; never copy it */
+#endif
+
+    debug(47, 2) ("COSS: %s: stripe %d, read %d bytes, status %d\n", SD->path, cs->rebuild.curstripe, r_len, r_errflag);
+    cs->rebuild.reading = 0;
+    if (r_errflag != DISK_OK) {
+	debug(47, 2) ("COSS: %s: stripe %d: error! Ignoring objects in this stripe.\n", SD->path, cs->rebuild.curstripe);
+	goto nextstripe;
+    }
+    cs->rebuild.buflen = r_len;
+    /* parse the stripe contents */
+    /*
+     * XXX note: the read should be put before the parsing so they can happen
+     * simultaneously. This'll require some code-shifting so the read buffer
+     * and parse buffer are different. This might speed up the read speed;
+     * the disk throughput isn't being reached at the present.
+     */
+    storeDirCoss_ParseStripeBuffer(rb);
+
+nextstripe:
+    cs->rebuild.curstripe++;
+    if (cs->rebuild.curstripe >= cs->numstripes) {
+	/* Completed the rebuild - move onto the next phase */
+	debug(47, 2) ("COSS: %s: completed reading the stripes.\n", SD->path);
+	storeCossRebuildComplete(rb);
+	return;
+    } else {
+	/* Next stripe */
+	storeDirCoss_ReadStripe(rb);
+    }
+}
+/* Queue an asynchronous read of the current rebuild stripe (cs->rebuild.curstripe). */
+static void
+storeDirCoss_ReadStripe(RebuildState *rb)
+{
+    SwapDir *SD = rb->sd;
+    CossInfo *cs = SD->fsdata;
+
+    assert(cs->rebuild.reading == 0);
+    cs->rebuild.reading = 1;
+    /* Read via aioRead() when USE_AUFSOPS is set, otherwise via the a_file_read() queue */
+    debug(47, 2) ("COSS: %s: reading stripe %d\n", SD->path, cs->rebuild.curstripe);
+    if (cs->rebuild.curstripe > rb->report_current) {
+	debug(47, 1) ("COSS: %s: Rebuilding (%d %% completed - %d/%d stripes)\n", SD->path,
+	    cs->rebuild.curstripe * 100 / cs->numstripes, cs->rebuild.curstripe, cs->numstripes);
+	rb->report_current += rb->report_interval;
+    }
+#if USE_AUFSOPS
+    /* XXX this should be a prime candidate to use a modified aioRead which doesn't malloc a damned buffer */
+    aioRead(cs->fd, (off_t) cs->rebuild.curstripe * COSS_MEMBUF_SZ, COSS_MEMBUF_SZ, storeDirCoss_ReadStripeComplete, rb);	/* widen first: int math overflows at 2GB */
+#else
+    a_file_read(&cs->aq, cs->fd, cs->rebuild.buf, COSS_MEMBUF_SZ, (off_t) cs->rebuild.curstripe * COSS_MEMBUF_SZ, storeDirCoss_ReadStripeComplete, rb);	/* widen first: int math overflows at 2GB */
+#endif
+}
+/* Begin the on-disk rebuild: allocate the stripe buffer, reset progress state and read stripe 0. */
+static void
+storeDirCoss_StartDiskRebuild(RebuildState *rb)
+{
+    SwapDir *SD = rb->sd;
+    CossInfo *cs = SD->fsdata;
+    assert(cs->rebuild.rebuilding == 0);
+    assert(cs->numstripes > 0);
+    assert(cs->rebuild.buf == NULL);
+    assert(cs->fd >= 0);
+    cs->rebuild.rebuilding = 1;
+    cs->rebuild.curstripe = 0;
+    cs->rebuild.buf = xmalloc(COSS_MEMBUF_SZ);	/* NOTE(review): no matching free is visible in this patch - confirm ownership */
+    rb->report_interval = cs->numstripes / COSS_REPORT_INTERVAL;
+    rb->report_current = 0;
+    debug(47, 2) ("COSS: %s: Beginning disk rebuild.\n", SD->path);
+    storeDirCoss_ReadStripe(rb);
+}
+
+/*
+ * Take a stripe and attempt to place objects into it
+ */
+static void
+storeDirCoss_ParseStripeBuffer(RebuildState *rb)
+{
+    SwapDir *SD = rb->sd;
+    CossInfo *cs = SD->fsdata;
+    tlv *t, *tlv_list;
+    int j = 0;			/* byte offset of the current object within the stripe buffer */
+    int bl = 0;			/* swap metadata header length reported by storeSwapMetaUnpack() */
+    int tmp;
+    squid_off_t *l, len;	/* l: STORE_META_OBJSIZE value, NULL if the object had none */
+    int blocksize = cs->blksz_mask + 1;
+    StoreEntry tmpe;		/* scratch entry; storeCoss_AddStoreEntry() copies what it needs */
+    cache_key key[MD5_DIGEST_CHARS];
+    sfileno filen;
+
+    assert(cs->rebuild.rebuilding == 1);
+    assert(cs->numstripes > 0);
+    assert(cs->rebuild.buf != NULL);
+
+    if (cs->rebuild.buflen == 0) {
+	debug(47, 3) ("COSS: %s: stripe %d: read 0 bytes, skipping stripe\n", SD->path, cs->rebuild.curstripe);
+	return;
+    }
+
+    while (j < cs->rebuild.buflen)
+    {
+	l = NULL;
+	bl = 0;
+	/* XXX there's no bounds checking on the buffer being passed into storeSwapMetaUnpack! */
+	tlv_list = storeSwapMetaUnpack(cs->rebuild.buf + j, &bl);
+	if (tlv_list == NULL) {
+	    debug(47, 3) ("COSS: %s: stripe %d: offset %d gives NULL swapmeta data; end of stripe\n", SD->path, cs->rebuild.curstripe, j);
+	    return;
+	}
+	filen = (off_t) j / (off_t) blocksize + (off_t) ((off_t) cs->rebuild.curstripe * (off_t) COSS_MEMBUF_SZ / (off_t) blocksize);
+	debug(47, 3) ("COSS: %s: stripe %d: filen %d: header size %d\n", SD->path, cs->rebuild.curstripe, filen, bl);
+
+	/* COSS objects will have an object size written into the metadata */
+	bzero(&tmpe, sizeof(tmpe));
+	bzero(key, sizeof(key));
+	for (t = tlv_list; t; t = t->next) {
+	    switch(t->type) {
+	    case STORE_META_URL:
+		debug(47, 3) ("    URL: %s\n", (char *)t->value);
+		break;
+	    case STORE_META_OBJSIZE:
+		l = t->value;
+		debug(47, 3) ("Size: %lld (len %d)\n", (long long) *l, t->length);
+		break;
+	    case STORE_META_KEY:
+		assert(t->length == MD5_DIGEST_CHARS);
+		xmemcpy(key, t->value, MD5_DIGEST_CHARS);
+		break;
+#if SIZEOF_SQUID_FILE_SZ == SIZEOF_SIZE_T
+	    case STORE_META_STD:
+		assert(t->length == STORE_HDR_METASIZE);
+		xmemcpy(&tmpe.timestamp, t->value, STORE_HDR_METASIZE);
+		break;
+#else
+	    case STORE_META_STD_LFS:
+		assert(t->length == STORE_HDR_METASIZE);
+		xmemcpy(&tmpe.timestamp, t->value, STORE_HDR_METASIZE);
+		break;
+	    case STORE_META_STD:
+		assert(t->length == STORE_HDR_METASIZE_OLD);
+		{
+		    struct {
+			time_t timestamp;
+			time_t lastref;
+			time_t expires;
+			time_t lastmod;
+			size_t swap_file_sz;
+			u_short refcount;
+			u_short flags;
+		    }     *tmp = t->value;
+		    assert(sizeof(*tmp) == STORE_HDR_METASIZE_OLD);
+		    tmpe.timestamp = tmp->timestamp;
+		    tmpe.lastref = tmp->lastref;
+		    tmpe.expires = tmp->expires;
+		    tmpe.lastmod = tmp->lastmod;
+		    tmpe.swap_file_sz = tmp->swap_file_sz;
+		    tmpe.refcount = tmp->refcount;
+		    tmpe.flags = tmp->flags;
+		}
+		break;
+#endif
+	    }
+	}
+	/* Make sure we have an object; if we don't then it may be an indication of trouble */
+	if (l == NULL) {
+	    debug(47, 3) ("COSS: %s: stripe %d: Object with no size; end of stripe\n", SD->path, cs->rebuild.curstripe);
+	    storeSwapTLVFree(tlv_list);
+	    return;
+	}
+	len = *l;
+	/* Finally, make sure the header plus the object fit in what is left of this
+	 * stripe, and that corrupt metadata cannot make the scan stall or run backwards
+	 */
+	if (bl <= 0 || len < 0 || cs->rebuild.buflen - j < len + bl) {
+	    debug(47, 3) ("COSS: %s: stripe %d: Not enough data in this stripe for this object, bye bye.\n", SD->path, cs->rebuild.curstripe);
+	    storeSwapTLVFree(tlv_list);
+	    return;
+	}
+
+	/* Houston, we have an object */
+	if (storeKeyNull(key)) {
+	    debug(47, 3) ("COSS: %s: stripe %d: null data, next!\n", SD->path, cs->rebuild.curstripe);
+	    goto nextobject;
+	}
+	rb->counts.scancount++;
+	tmpe.hash.key = key;
+	/* Check sizes */
+	if (tmpe.swap_file_sz == 0) {
+	    tmpe.swap_file_sz = len;
+	}
+	if (tmpe.swap_file_sz != len) {
+	    debug(47, 3) ("COSS: %s: stripe %d: file size mismatch (%d != %d)\n", SD->path, cs->rebuild.curstripe, (int) tmpe.swap_file_sz, (int) len);
+	    goto nextobject;
+	}
+	if (EBIT_TEST(tmpe.flags, KEY_PRIVATE)) {
+	    debug(47, 3) ("COSS: %s: stripe %d: private key flag set, ignoring.\n", SD->path, cs->rebuild.curstripe);
+	    rb->counts.badflags++;
+	    goto nextobject;
+	}
+	/* Time to consider the object! */
+	tmpe.swap_filen = filen;
+	tmpe.swap_dirn = SD->index;
+	storeCoss_ConsiderStoreEntry(rb, key, &tmpe);
+
+nextobject:
+	/* Free the TLV data */
+	storeSwapTLVFree(tlv_list);
+	tlv_list = NULL;
+
+	/* Now, advance to the next block-aligned offset after this object */
+	j = j + len + bl;
+	/* Round up like storeCossAllocate(): an already-aligned end must not skip a block. NOTE(review): assumes allocation size == len + bl - confirm */
+	tmp = (j + blocksize - 1) / blocksize;
+	tmp = tmp * blocksize;
+	j = tmp;
+    }
+}
+
+/* Create a permanent StoreEntry from the temporary entry e and index it in the stripe being rebuilt. */
+static void
+storeCoss_AddStoreEntry(RebuildState *rb, const cache_key *key, StoreEntry *e)
+{
+    StoreEntry *ne;
+    SwapDir *SD = rb->sd;
+    CossInfo *cs = SD->fsdata;
+    rb->counts.objcount++;
+    /* The Passed-in store entry is temporary; don't bloody use it directly! */
+    assert(e->swap_dirn == SD->index);
+    ne = new_StoreEntry(STORE_ENTRY_WITHOUT_MEMOBJ, NULL, NULL);
+    ne->store_status = STORE_OK;
+    storeSetMemStatus(ne, NOT_IN_MEMORY);
+    ne->swap_status = SWAPOUT_DONE;
+    ne->swap_filen = e->swap_filen;
+    ne->swap_dirn = SD->index;
+    ne->swap_file_sz = e->swap_file_sz;
+    ne->lock_count = 0;
+    ne->lastref = e->lastref;
+    ne->timestamp = e->timestamp;
+    ne->expires = e->expires;
+    ne->lastmod = e->lastmod;
+    ne->refcount = e->refcount;
+    ne->flags = e->flags;
+    EBIT_SET(ne->flags, ENTRY_CACHABLE);
+    EBIT_CLR(ne->flags, RELEASE_REQUEST);
+    EBIT_CLR(ne->flags, KEY_PRIVATE);
+    ne->ping_status = PING_NONE;
+    EBIT_CLR(ne->flags, ENTRY_VALIDATED);
+    storeHashInsert(ne, key);	/* do it after we clear KEY_PRIVATE */
+    storeCossAdd(SD, ne, cs->rebuild.curstripe);
+    storeEntryDump(ne, 5);
+    assert(ne->repl.data != NULL);
+    assert(e->repl.data == NULL);
+}
+/* Drop an existing entry: unindex it from its own store dir (before swap_filen is cleared), then release it. */
+static void
+storeCoss_DeleteStoreEntry(RebuildState *rb, const cache_key *key, StoreEntry *e)
+{
+    assert(rb->counts.objcount >= 0);
+    rb->counts.objcount--;
+    assert(e->swap_dirn >= 0);
+    storeCossRemove(INDEXSD(e->swap_dirn), e);
+    e->swap_filen = -1;
+    storeExpireNow(e);
+    storeReleaseRequest(e);
+    storeRelease(e);
+}
+
+/*
+ * Consider inserting the given StoreEntry into the given
+ * COSS directory.
+ * + * The rules for doing this is reasonably simple: + * + * If the object doesn't exist in the cache then we simply + * add it to the current stripe list + * + * If the object does exist in the cache then we compare + * "freshness"; if the newer object is fresher then we + * remove it from its stripe and re-add it to the current + * stripe. + */ +static void +storeCoss_ConsiderStoreEntry(RebuildState *rb, const cache_key *key, StoreEntry *e) +{ + StoreEntry *oe; + + /* Check for clashes */ + oe = storeGet(key); + if (oe == NULL) { + rb->cosscounts.new++; + /* no clash! woo, can add and forget */ + storeCoss_AddStoreEntry(rb, key, e); + return; + } + + /* This isn't valid - its possible we have a fresher object in another store */ + /* unlike the UFS-based stores we don't "delete" the disk object when we + * have deleted the object; its one of the annoying things about COSS. */ + //assert(oe->swap_dirn == SD->index); + /* Dang, its a clash. See if its fresher */ + + /* Fresher? Its a new object: deallocate the old one, reallocate the new one */ + if (e->lastref > oe->lastref) { + debug(47, 3) ("COSS: fresher object for filen %d found (%d -> %d)\n", oe->swap_filen, (int) oe->timestamp, (int) e->timestamp); + rb->cosscounts.fresher++; + storeCoss_DeleteStoreEntry(rb, key, oe); + oe = NULL; + storeCoss_AddStoreEntry(rb, key, e); + return; + } + + /* + * Not fresher? Its the same object then we /should/ probably relocate it; I'm + * not sure what should be done here. 
+ */ + if (oe->timestamp == e->timestamp && oe->expires == e->expires) { + debug(47, 3) ("COSS: filen %d -> %d (since they're the same!)\n", oe->swap_filen, e->swap_filen); + rb->cosscounts.reloc++; + storeCoss_DeleteStoreEntry(rb, key, oe); + oe = NULL; + storeCoss_AddStoreEntry(rb, key, e); + return; + } + debug(47, 3) ("COSS: filen %d: ignoring this one for some reason\n", e->swap_filen); + rb->cosscounts.unknown++; +} Index: src/fs/coss/store_io_coss.c =================================================================== RCS file: /server/cvs-server/squid/squid/src/fs/coss/store_io_coss.c,v retrieving revision 1.19 diff -u -r1.19 store_io_coss.c --- src/fs/coss/store_io_coss.c 17 May 2005 16:56:43 -0000 1.19 +++ src/fs/coss/store_io_coss.c 20 May 2006 09:40:02 -0000 @@ -1,6 +1,6 @@ /* - * $Id: store_io_coss.c,v 1.19 2005/05/17 16:56:43 hno Exp $ + * $Id: store_io_coss.c,v 1.13.2.11 2005/03/26 23:40:21 hno Exp $ * * DEBUG: section 79 Storage Manager COSS Interface * AUTHOR: Eric Stern @@ -37,26 +37,47 @@ #include #include "async_io.h" #include "store_coss.h" - +#if USE_AUFSOPS +#include "../aufs/async_io.h" +#endif + +#if USE_AUFSOPS +static AIOCB storeCossWriteMemBufDone; +#else static DWCB storeCossWriteMemBufDone; -static DRCB storeCossReadDone; +#endif static void storeCossIOCallback(storeIOState * sio, int errflag); -static char *storeCossMemPointerFromDiskOffset(SwapDir * SD, size_t offset, CossMemBuf ** mb); +static char *storeCossMemPointerFromDiskOffset(CossInfo *cs, off_t offset, CossMemBuf ** mb); static void storeCossMemBufLock(SwapDir * SD, storeIOState * e); static void storeCossMemBufUnlock(SwapDir * SD, storeIOState * e); static void storeCossWriteMemBuf(SwapDir * SD, CossMemBuf * t); -static void storeCossWriteMemBufDone(int fd, int errflag, size_t len, void *my_data); -static CossMemBuf *storeCossCreateMemBuf(SwapDir * SD, size_t start, - sfileno curfn, int *collision); +static CossMemBuf *storeCossCreateMemBuf(SwapDir * SD, int stripe, sfileno 
curfn, int *collision); static CBDUNL storeCossIOFreeEntry; static off_t storeCossFilenoToDiskOffset(sfileno f, CossInfo *); static sfileno storeCossDiskOffsetToFileno(off_t o, CossInfo *); static void storeCossMaybeWriteMemBuf(SwapDir * SD, CossMemBuf * t); +static void storeCossMaybeFreeBuf(CossInfo *cs, CossMemBuf *mb); +int storeCossFilenoToStripe(CossInfo *cs, sfileno filen); static void membuf_describe(CossMemBuf * t, int level, int line); +/* Handle relocates - temporary routines until readops have been fleshed out */ +void storeCossNewPendingRelocate(CossInfo *cs, storeIOState *sio, sfileno original_filen, sfileno new_filen); +CossPendingReloc * storeCossGetPendingReloc(CossInfo *cs, sfileno new_filen); +#if USE_AUFSOPS +AIOCB storeCossCompletePendingReloc; +#else +DRCB storeCossCompletePendingReloc; +#endif + +/* Read operation code */ +CossReadOp * storeCossCreateReadOp(CossInfo *cs, storeIOState *sio); +void storeCossCompleteReadOp(CossInfo *cs, CossReadOp *op, int error); +void storeCossKickReadOp(CossInfo *cs, CossReadOp *op); + CBDATA_TYPE(storeIOState); CBDATA_TYPE(CossMemBuf); +CBDATA_TYPE(CossPendingReloc); /* === PUBLIC =========================================================== */ @@ -75,6 +96,7 @@ off_t retofs; size_t allocsize; int coll = 0; + sfileno f; sfileno checkf; /* Make sure we chcek collisions if reallocating */ @@ -104,8 +126,9 @@ cs->current_membuf->flags.full = 1; cs->current_membuf->diskend = cs->current_offset; storeCossMaybeWriteMemBuf(SD, cs->current_membuf); + /* cs->current_membuf may be invalid at this point */ cs->current_offset = 0; /* wrap back to beginning */ - debug(79, 2) ("storeCossAllocate: wrap to 0\n"); + debug(79, 2) ("storeCossAllocate: %s: wrap to 0\n", SD->path); newmb = storeCossCreateMemBuf(SD, 0, checkf, &coll); cs->current_membuf = newmb; @@ -119,21 +142,27 @@ cs->current_membuf->flags.full = 1; cs->current_offset = cs->current_membuf->diskend; storeCossMaybeWriteMemBuf(SD, cs->current_membuf); - debug(79, 
2) ("storeCossAllocate: New offset - %ld\n", - (long int) cs->current_offset); - newmb = storeCossCreateMemBuf(SD, cs->current_offset, checkf, &coll); + /* cs->current_membuf may be invalid at this point */ + debug(79, 3) ("storeCossAllocate: %s: New offset - %lld\n", SD->path, + (long long int) cs->current_offset); + assert(cs->curstripe < (cs->numstripes - 1)); + newmb = storeCossCreateMemBuf(SD, cs->curstripe + 1, checkf, &coll); cs->current_membuf = newmb; } /* If we didn't get a collision, then update the current offset and return it */ if (coll == 0) { retofs = cs->current_offset; cs->current_offset = retofs + allocsize; + cs->current_membuf->numobjs++; /* round up to our blocksize */ cs->current_offset = ((cs->current_offset + cs->blksz_mask) >> cs->blksz_bits) << cs->blksz_bits; - return storeCossDiskOffsetToFileno(retofs, cs); + f = storeCossDiskOffsetToFileno(retofs, cs); + assert(f >= 0 && f <= 0xffffff); + debug(79, 3) ("storeCossAllocate: offset %lld, filen: %d\n", (long long int) retofs, f); + return f; } else { coss_stats.alloc.collisions++; - debug(79, 3) ("storeCossAllocate: Collision\n"); + debug(79, 3) ("storeCossAllocate: %s: Collision\n", SD->path); return -1; } } @@ -141,7 +170,7 @@ void storeCossUnlink(SwapDir * SD, StoreEntry * e) { - debug(79, 3) ("storeCossUnlink: offset %d\n", e->swap_filen); + debug(79, 3) ("storeCossUnlink: %s: offset %d\n", SD->path, e->swap_filen); coss_stats.unlink.ops++; coss_stats.unlink.success++; storeCossRemove(SD, e); @@ -153,7 +182,9 @@ { CossState *cstate; storeIOState *sio; + CossInfo *cs = SD->fsdata; + assert(cs->rebuild.rebuilding == 0); coss_stats.create.ops++; sio = cbdataAlloc(storeIOState); cstate = memPoolAlloc(coss_state_pool); @@ -174,10 +205,7 @@ sio->st_size = objectLen(e) + e->mem_obj->swap_hdr_sz; sio->swap_dirn = SD->index; sio->swap_filen = storeCossAllocate(SD, e, COSS_ALLOC_ALLOCATE); - debug(79, 3) ("storeCossCreate: offset %ld, size %ld, end %ld\n", - (long int) 
storeCossFilenoToDiskOffset(sio->swap_filen, SD->fsdata), - (long int) sio->st_size, - (long int) (sio->swap_filen + sio->st_size)); + debug(79, 3) ("storeCossCreate: %p: filen: %d\n", sio, sio->swap_filen); assert(-1 != sio->swap_filen); sio->callback = callback; @@ -188,11 +216,12 @@ cstate->flags.writing = 0; cstate->flags.reading = 0; - cstate->readbuffer = NULL; cstate->reqdiskoffset = -1; /* Now add it into the index list */ - storeCossAdd(SD, e); + e->swap_filen = sio->swap_filen; + e->swap_dirn = sio->swap_dirn; + storeCossAdd(SD, e, cs->curstripe); storeCossMemBufLock(SD, sio); coss_stats.create.success++; @@ -207,14 +236,17 @@ char *p; CossState *cstate; sfileno f = e->swap_filen; + sfileno nf; CossInfo *cs = (CossInfo *) SD->fsdata; - debug(79, 3) ("storeCossOpen: offset %d\n", f); - coss_stats.open.ops++; + assert(cs->rebuild.rebuilding == 0); sio = cbdataAlloc(storeIOState); cstate = memPoolAlloc(coss_state_pool); + debug(79, 3) ("storeCossOpen: %p: offset %d\n", sio, f); + coss_stats.open.ops++; + sio->fsstate = cstate; sio->swap_filen = f; sio->swap_dirn = SD->index; @@ -229,15 +261,18 @@ cstate->flags.writing = 0; cstate->flags.reading = 0; - cstate->readbuffer = NULL; cstate->reqdiskoffset = -1; - p = storeCossMemPointerFromDiskOffset(SD, storeCossFilenoToDiskOffset(f, cs), NULL); + /* make local copy so we don't have to lock membuf */ + p = storeCossMemPointerFromDiskOffset(cs, storeCossFilenoToDiskOffset(f, cs), NULL); if (p) { - cstate->readbuffer = xmalloc(sio->st_size); - xmemcpy(cstate->readbuffer, p, sio->st_size); coss_stats.open_mem_hits++; + // This seems to cause a crash: either the membuf pointer is set wrong or the membuf + // is deallocated from underneath us. 
+ storeCossMemBufLock(SD, sio); + debug(79,3) ("storeCossOpen: %s: memory hit!\n", SD->path); } else { + debug(79, 3) ("storeCossOpen: %s: memory miss - doing reallocation\n", SD->path); /* Do the allocation */ /* this is the first time we've been called on a new sio * read the whole object into memory, then return the @@ -250,45 +285,52 @@ * into the cossmembuf for later writing .. */ cstate->reqdiskoffset = storeCossFilenoToDiskOffset(sio->swap_filen, cs); - sio->swap_filen = storeCossAllocate(SD, e, COSS_ALLOC_REALLOC); - if (sio->swap_filen == -1) { + assert(cstate->reqdiskoffset >= 0); + nf = storeCossAllocate(SD, e, COSS_ALLOC_REALLOC); + if (nf == -1) { /* We have to clean up neatly .. */ coss_stats.open.fail++; cbdataFree(sio); cs->numcollisions++; - debug(79, 2) ("storeCossOpen: Reallocation of %d/%d failed\n", e->swap_dirn, e->swap_filen); + debug(79, 3) ("storeCossOpen: Reallocation of %d/%d failed\n", e->swap_dirn, e->swap_filen); /* XXX XXX XXX Will squid call storeUnlink for this object? */ return NULL; } + /* Remove the object from its currently-allocated stripe */ + storeCossRemove(SD, e); + storeCossNewPendingRelocate(cs, sio, sio->swap_filen, nf); + sio->swap_filen = nf; + cstate->flags.reloc = 1; /* Notify the upper levels that we've changed file number */ sio->file_callback(sio->callback_data, 0, sio); - - /* - * lock the buffer so it doesn't get swapped out on us - * this will get unlocked in storeCossClose - */ - storeCossMemBufLock(SD, sio); - + /* + * lock the new buffer so it doesn't get swapped out on us + * this will get unlocked in storeCossClose + */ + storeCossMemBufLock(SD, sio); /* * Do the index magic to keep the disk and memory LRUs identical + * by adding the object into the link list on the current stripe */ - storeCossRemove(SD, e); - storeCossAdd(SD, e); - - /* - * NOTE cstate->readbuffer is NULL. 
We'll actually read - * the disk data into the MemBuf in storeCossRead() and - * return that pointer back to the caller - */ + storeCossAdd(SD, e, cs->curstripe); } coss_stats.open.success++; return sio; } +/* + * Aha! The unlocked membuf. + * + * If its storeCossCreate, then it was locked. Fine. + * If it was storeCossOpen() and we found the object in-stripe then cool, + * its locked. + * If it was storeCossOpen() and we didn't find the object in-stripe then + * we reallocated the object into the current stripe and locked THAT. + */ void storeCossClose(SwapDir * SD, storeIOState * sio) { - debug(79, 3) ("storeCossClose: offset %d\n", sio->swap_filen); + debug(79, 3) ("storeCossClose: %p: offset %d\n", sio, sio->swap_filen); coss_stats.close.ops++; coss_stats.close.success++; storeCossMemBufUnlock(SD, sio); @@ -298,16 +340,16 @@ void storeCossRead(SwapDir * SD, storeIOState * sio, char *buf, size_t size, squid_off_t offset, STRCB * callback, void *callback_data) { - char *p; CossState *cstate = (CossState *) sio->fsstate; CossInfo *cs = (CossInfo *) SD->fsdata; + CossReadOp *op; coss_stats.read.ops++; assert(sio->read.callback == NULL); assert(sio->read.callback_data == NULL); sio->read.callback = callback; sio->read.callback_data = callback_data; - debug(79, 3) ("storeCossRead: offset %ld\n", (long int) offset); + debug(79, 3) ("storeCossRead: %s: offset %ld\n", SD->path, (long int) offset); sio->offset = offset; cstate->flags.reading = 1; if ((offset + size) > sio->st_size) @@ -315,25 +357,10 @@ cstate->requestlen = size; cstate->requestbuf = buf; cstate->requestoffset = offset; - if (cstate->readbuffer == NULL) { - p = storeCossMemPointerFromDiskOffset(SD, storeCossFilenoToDiskOffset(sio->swap_filen, cs), NULL); - a_file_read(&cs->aq, cs->fd, - p, - sio->st_size, - cstate->reqdiskoffset, - storeCossReadDone, - sio); - cstate->reqdiskoffset = 0; /* XXX */ - } else { - /* - * It was copied from memory in storeCossOpen() - */ - storeCossReadDone(cs->fd, - 
cstate->readbuffer, - sio->st_size, - 0, - sio); - } + /* All of these reads should be treated as pending ones */ + /* Ie, we create a read op; then we 'kick' the read op to see if it can be completed now */ + op = storeCossCreateReadOp(cs, sio); + storeCossKickReadOp(cs, op); } void @@ -350,9 +377,10 @@ assert(sio->e->mem_obj->object_sz != -1); coss_stats.write.ops++; - debug(79, 3) ("storeCossWrite: offset %ld, len %lu\n", (long int) sio->offset, (unsigned long int) size); + debug(79, 3) ("storeCossWrite: %s: offset %ld, len %lu\n", SD->path, + (long int) sio->offset, (unsigned long int) size); diskoffset = storeCossFilenoToDiskOffset(sio->swap_filen, SD->fsdata) + sio->offset; - dest = storeCossMemPointerFromDiskOffset(SD, diskoffset, &membuf); + dest = storeCossMemPointerFromDiskOffset(SD->fsdata, diskoffset, &membuf); assert(dest != NULL); xmemcpy(dest, buf, size); sio->offset += size; @@ -365,57 +393,11 @@ /* === STATIC =========================================================== */ static void -storeCossReadDone(int fd, const char *buf, int len, int errflag, void *my_data) -{ - storeIOState *sio = my_data; - char *p; - STRCB *callback = sio->read.callback; - void *their_data = sio->read.callback_data; - SwapDir *SD = INDEXSD(sio->swap_dirn); - CossState *cstate = (CossState *) sio->fsstate; - ssize_t rlen; - - debug(79, 3) ("storeCossReadDone: fileno %d, FD %d, len %d\n", - sio->swap_filen, fd, len); - cstate->flags.reading = 0; - if (errflag) { - coss_stats.read.fail++; - if (errflag > 0) { - errno = errflag; - debug(79, 1) ("storeCossReadDone: error: %s\n", xstrerror()); - } else { - debug(79, 1) ("storeCossReadDone: got failure (%d)\n", errflag); - } - rlen = -1; - } else { - coss_stats.read.success++; - if (cstate->readbuffer == NULL) { - cstate->readbuffer = xmalloc(sio->st_size); - p = storeCossMemPointerFromDiskOffset(SD, - storeCossFilenoToDiskOffset(sio->swap_filen, SD->fsdata), - NULL); - xmemcpy(cstate->readbuffer, p, sio->st_size); - } - 
sio->offset += len; - xmemcpy(cstate->requestbuf, &cstate->readbuffer[cstate->requestoffset], - cstate->requestlen); - rlen = (size_t) cstate->requestlen; - } - assert(callback); - assert(their_data); - sio->read.callback = NULL; - sio->read.callback_data = NULL; - if (cbdataValid(their_data)) - callback(their_data, cstate->requestbuf, rlen); -} - -static void storeCossIOCallback(storeIOState * sio, int errflag) { CossState *cstate = (CossState *) sio->fsstate; debug(79, 3) ("storeCossIOCallback: errflag=%d\n", errflag); assert(NULL == cstate->locked_membuf); - xfree(cstate->readbuffer); if (cbdataValid(sio->callback_data)) sio->callback(sio->callback_data, errflag, sio); cbdataUnlock(sio->callback_data); @@ -424,11 +406,10 @@ } static char * -storeCossMemPointerFromDiskOffset(SwapDir * SD, size_t offset, CossMemBuf ** mb) +storeCossMemPointerFromDiskOffset(CossInfo *cs, off_t offset, CossMemBuf ** mb) { CossMemBuf *t; dlink_node *m; - CossInfo *cs = (CossInfo *) SD->fsdata; for (m = cs->membufs.head; m; m = m->next) { t = m->data; @@ -465,6 +446,8 @@ { CossMemBuf *t = storeCossFilenoToMembuf(SD, sio->swap_filen); CossState *cstate = (CossState *) sio->fsstate; + assert(cstate->locked_membuf == NULL); + assert(t->flags.dead == 0); debug(79, 3) ("storeCossMemBufLock: locking %p, lockcount %d\n", t, t->lockcount); cstate->locked_membuf = t; @@ -475,28 +458,37 @@ storeCossMemBufUnlock(SwapDir * SD, storeIOState * sio) { CossState *cstate = (CossState *) sio->fsstate; + CossInfo *cs = SD->fsdata; CossMemBuf *t = cstate->locked_membuf; if (NULL == t) return; + assert(t->flags.dead == 0); debug(79, 3) ("storeCossMemBufUnlock: unlocking %p, lockcount %d\n", t, t->lockcount); t->lockcount--; cstate->locked_membuf = NULL; storeCossMaybeWriteMemBuf(SD, t); + /* cs->current_membuf may be invalid at this point */ + storeCossMaybeFreeBuf(cs, t); } static void storeCossMaybeWriteMemBuf(SwapDir * SD, CossMemBuf * t) { + //CossInfo *cs = SD->fsdata; membuf_describe(t, 3, 
__LINE__); + assert(t->flags.dead == 0); if (!t->flags.full) debug(79, 3) ("membuf %p not full\n", t); else if (t->flags.writing) debug(79, 3) ("membuf %p writing\n", t); else if (t->lockcount) debug(79, 3) ("membuf %p lockcount=%d\n", t, t->lockcount); + else if (t->flags.written) + debug(79, 3) ("membuf %p written\n", t); else storeCossWriteMemBuf(SD, t); + /* t may be invalid at this point */ } void @@ -504,10 +496,14 @@ { CossInfo *cs = (CossInfo *) SD->fsdata; dlink_node *m; - int end; + off_t end; /* First, flush pending IO ops */ +#if USE_AUFSOPS + aioSync(SD); +#else a_file_syncqueue(&cs->aq); +#endif /* Then, flush any in-memory partial membufs */ if (!cs->membufs.head) @@ -529,58 +525,167 @@ { CossInfo *cs = (CossInfo *) SD->fsdata; coss_stats.stripe_write.ops++; - debug(79, 3) ("storeCossWriteMemBuf: offset %ld, len %ld\n", + assert(t->flags.dead == 0); + debug(79, 3) ("storeCossWriteMemBuf: %p: offset %ld, len %ld\n", t, (long int) t->diskstart, (long int) (t->diskend - t->diskstart)); t->flags.writing = 1; + /* Check to see whether anything has a pending relocate (ie, a disk read) + * scheduled from the disk data we're about to overwrite. + * According to the specification this should never, ever happen - all the + * objects underneath this stripe were deallocated before we started + * using them - but there is a possibility that an object was opened + * before the objects underneath the membufs stripe were purged and there + * is still a pending relocate for it. Its a slim chance but it might happen. + */ + assert(t->stripe < cs->numstripes); + if (cs->stripes[t->stripe].pending_relocs > 0) { + debug(79, 1) ("WARNING: %s: One or more pending relocate (reads) from stripe %d are queued - and I'm now writing over that part of the disk. 
This may result in object data corruption!\n", SD->path, t->stripe); + } + /* + * normally nothing should have this node locked here - but between the time + * we call a_file_write and the IO completes someone might have snuck in and + * attached itself somehow. This is why there's a distinction between "written" + * and "writing". Read the rest of the code for more details. + */ +#if USE_AUFSOPS + /* XXX The last stripe, for now, ain't the coss stripe size for some reason */ + /* XXX This may cause problems later on; worry about figuring it out later on */ + //assert(t->diskend - t->diskstart == COSS_MEMBUF_SZ); + debug(79, 3) ("aioWrite: FD %d: disk start: %llu, size %llu\n", cs->fd, (unsigned long long) t->diskstart, (unsigned long long) (t->diskend - t->diskstart)); + aioWrite(cs->fd, t->diskstart, &(t->buffer[0]), t->diskend - t->diskstart, storeCossWriteMemBufDone, t, NULL); +#else a_file_write(&cs->aq, cs->fd, t->diskstart, &t->buffer, t->diskend - t->diskstart, storeCossWriteMemBufDone, t, NULL); +#endif +} + +/* + * Check if a memory buffer can be freed. + * Memory buffers can be freed if their refcount is 0 and they've been written. + */ +static void +storeCossMaybeFreeBuf(CossInfo *cs, CossMemBuf *mb) +{ + assert(mb->lockcount >= 0); + /* It'd be nice if we could walk all the pending sio's somehow to see if some has this membuf locked .. */ + if (mb->flags.dead == 1) { + debug(79, 1) ("storeCossMaybeFreeBuf: %p: dead; it'll be freed soon enough\n", mb); + return; + } + /* Place on dead list rather than free + * the asyncio code fails over to a 'sync' path; which may mean a membuf is + * deallocated somewhere deep in the stack level. This way we just mark them + * as dead and deallocate membufs early in the stack frame (ie, before we + * call the asyncio disk completion handler.) 
+ */ + if (mb->lockcount == 0 && mb->flags.written == 1) { + debug (79, 3) ("storeCossMaybeFreeBuf: %p: lockcount = 0, written = 1: marking dead\n", mb); + mb->flags.dead = 1; + dlinkDelete(&mb->node, &cs->membufs); + dlinkAddTail(mb, &mb->node, &cs->dead_membufs); + coss_stats.dead_stripes++; + coss_stats.stripes--; + } } +void +storeCossFreeDeadMemBufs(CossInfo *cs) +{ + CossMemBuf *mb; + while (cs->dead_membufs.head != NULL) { + mb = cs->dead_membufs.head->data; + assert(mb->flags.dead == 1); + debug(79, 3) ("storeCossFreeDeadMemBufs: %p: freeing\n", mb); + dlinkDelete(&mb->node, &cs->dead_membufs); + cbdataFree(mb); + coss_stats.dead_stripes--; + } +} +/* + * Writing a membuf has completed. Set the written flag to 1; membufs might have been + * locked for read between the initial membuf write and the completion of the disk + * write. + */ +#if USE_AUFSOPS static void -storeCossWriteMemBufDone(int fd, int errflag, size_t len, void *my_data) +storeCossWriteMemBufDone(int fd, void *my_data, const char *buf, int aio_return, int aio_errno) +#else +static void +storeCossWriteMemBufDone(int fd, int r_errflag, size_t r_len, void *my_data) +#endif { CossMemBuf *t = my_data; CossInfo *cs = (CossInfo *) t->SD->fsdata; + int errflag; + int len; +#if USE_AUFSOPS + len = aio_return; + if (aio_errno) + errflag = aio_errno == ENOSPC ? 
DISK_NO_SPACE_LEFT : DISK_ERROR; + else + errflag = DISK_OK; +#else + len = r_len; + errflag = r_errflag; +#endif - debug(79, 3) ("storeCossWriteMemBufDone: buf %p, len %ld\n", t, (long int) len); + debug(79, 3) ("storeCossWriteMemBufDone: stripe %d, buf %p, len %ld\n", t->stripe, t, (long int) len); if (errflag) { coss_stats.stripe_write.fail++; debug(79, 1) ("storeCossWriteMemBufDone: got failure (%d)\n", errflag); - debug(79, 1) ("FD %d, size=%x\n", fd, (int) (t->diskend - t->diskstart)); + debug(79, 1) ("FD %d, size=%d\n", fd, (int) (t->diskend - t->diskstart)); } else { coss_stats.stripe_write.success++; } - - dlinkDelete(&t->node, &cs->membufs); - cbdataFree(t); - coss_stats.stripes--; + assert(cs->stripes[t->stripe].membuf == t); + debug(79, 2) ("storeCossWriteMemBufDone: %s: stripe %d: numobjs written: %d, lockcount %d\n", t->SD->path, t->stripe, t->numobjs, t->lockcount); + cs->stripes[t->stripe].numdiskobjs = t->numobjs; + cs->stripes[t->stripe].membuf = NULL; + t->flags.written = 1; + t->flags.writing = 0; + storeCossMaybeFreeBuf(cs, t); } +/* + * This creates a memory buffer but assumes its going to be at the end + * of the "LRU" and thusly will delete expire objects which appear under + * it. + */ static CossMemBuf * -storeCossCreateMemBuf(SwapDir * SD, size_t start, - sfileno curfn, int *collision) +storeCossCreateMemBuf(SwapDir * SD, int stripe, sfileno curfn, int *collision) { CossMemBuf *newmb, *t; StoreEntry *e; - dlink_node *m, *prev; + dlink_node *m, *n; int numreleased = 0; CossInfo *cs = (CossInfo *) SD->fsdata; + off_t start = (off_t) stripe * COSS_MEMBUF_SZ; + assert(start >= 0); + + /* No, we shouldn't ever try to create a membuf if we haven't freed the one on + * this stripe. 
Grr */ + assert(cs->stripes[stripe].membuf == NULL); + cs->curstripe = stripe; newmb = cbdataAlloc(CossMemBuf); + cs->stripes[stripe].membuf = newmb; newmb->diskstart = start; - debug(79, 3) ("storeCossCreateMemBuf: creating new membuf at %ld\n", (long int) newmb->diskstart); - debug(79, 3) ("storeCossCreateMemBuf: at %p\n", newmb); + newmb->stripe = stripe; + debug(79, 2) ("storeCossCreateMemBuf: %s: creating new membuf at stripe %d, %lld (%p)\n", SD->path, stripe, (long long int) newmb->diskstart, newmb); newmb->diskend = newmb->diskstart + COSS_MEMBUF_SZ; newmb->flags.full = 0; newmb->flags.writing = 0; newmb->lockcount = 0; + newmb->numobjs = 0; newmb->SD = SD; /* XXX This should be reversed, with the new buffer last in the chain */ dlinkAdd(newmb, &newmb->node, &cs->membufs); + assert(newmb->diskstart >= 0); + assert(newmb->diskend >= 0); /* Print out the list of membufs */ - debug(79, 3) ("storeCossCreateMemBuf: membuflist:\n"); + debug(79, 3) ("storeCossCreateMemBuf: %s: membuflist:\n", SD->path); for (m = cs->membufs.head; m; m = m->next) { t = m->data; membuf_describe(t, 3, __LINE__); @@ -589,18 +694,19 @@ /* * Kill objects from the tail to make space for a new chunk */ - for (m = cs->index.tail; m; m = prev) { + m = cs->stripes[stripe].objlist.head; + while (m != NULL) { off_t o; - prev = m->prev; + n = m->next; e = m->data; o = storeCossFilenoToDiskOffset(e->swap_filen, cs); - if (curfn == e->swap_filen) + if (curfn > -1 && curfn == e->swap_filen) *collision = 1; /* Mark an object alloc collision */ - if ((o >= newmb->diskstart) && (o < newmb->diskend)) { - storeRelease(e); - numreleased++; - } else - break; + assert((o >= newmb->diskstart) && (o < newmb->diskend)); + debug(79, 5) ("check: %s: stripe %d, releasing %p\n", SD->path, stripe, e); + storeRelease(e); + numreleased++; + m = n; } if (numreleased > 0) debug(79, 3) ("storeCossCreateMemBuf: this allocation released %d storeEntries\n", numreleased); @@ -619,7 +725,13 @@ 
CBDATA_INIT_TYPE_FREECB(storeIOState, storeCossIOFreeEntry); CBDATA_INIT_TYPE_FREECB(CossMemBuf, NULL); CBDATA_INIT_TYPE_FREECB(storeIOState, storeCossIOFreeEntry); - newmb = storeCossCreateMemBuf(sd, cs->current_offset, -1, NULL); + CBDATA_INIT_TYPE_FREECB(CossPendingReloc, NULL); + /* + * XXX for now we start at the beginning of the disk; + * The rebuild logic doesn't 'know' to pad out the current + * offset to make it a multiple of COSS_MEMBUF_SZ. + */ + newmb = storeCossCreateMemBuf(sd, 0, -1, NULL); assert(!cs->current_membuf); cs->current_membuf = newmb; } @@ -636,7 +748,12 @@ static off_t storeCossFilenoToDiskOffset(sfileno f, CossInfo * cs) { - return (off_t) f << cs->blksz_bits; + off_t doff; + + doff = (off_t) f; + doff <<= cs->blksz_bits; + assert(doff >= 0); + return doff; } static sfileno @@ -649,10 +766,294 @@ static void membuf_describe(CossMemBuf * t, int level, int line) { - debug(79, level) ("membuf %p, LC:%02d, ST:%010lu, FL:%c%c\n", + assert(t->lockcount >= 0); + debug(79, level) ("membuf id:%d (%p), LC:%02d, ST:%010lu, FL:%c%c%c\n", + t->stripe, t, t->lockcount, (unsigned long) t->diskstart, t->flags.full ? 'F' : '.', - t->flags.writing ? 'W' : '.'); + t->flags.writing ? 'W' : '.', + t->flags.written ? 
'T' : '.'); +} + +int +storeCossFilenoToStripe(CossInfo *cs, sfileno filen) +{ + off_t o; + /* Calculate sfileno to disk offset */ + o = ((off_t) filen) << cs->blksz_bits; + /* Now, divide by COSS_MEMBUF_SZ to get which stripe it is in */ + return (int) (o / (off_t) COSS_MEMBUF_SZ); +} + +/* + * New stuff + */ +void +storeCossNewPendingRelocate(CossInfo *cs, storeIOState *sio, sfileno original_filen, sfileno new_filen) +{ + CossPendingReloc *pr; + char *p; + off_t disk_offset; + int stripe; + + pr = cbdataAlloc(CossPendingReloc); + cbdataLock(pr); + pr->cs = cs; + pr->original_filen = original_filen; + pr->new_filen = new_filen; + pr->len = sio->e->swap_file_sz; + debug(79, 3) ("COSS Pending Relocate: %d -> %d: beginning\n", pr->original_filen, pr->new_filen); + cs->pending_reloc_count++; + dlinkAddTail(pr, &pr->node, &cs->pending_relocs); + + /* Update the stripe count */ + stripe = storeCossFilenoToStripe(cs, original_filen); + assert(stripe >= 0); + assert(stripe < cs->numstripes); + assert(cs->stripes[stripe].pending_relocs >= 0); + cs->stripes[stripe].pending_relocs++; + + /* And now; we begin the IO */ + p = storeCossMemPointerFromDiskOffset(cs, storeCossFilenoToDiskOffset(new_filen, cs), NULL); + pr->p = p; + disk_offset = storeCossFilenoToDiskOffset(original_filen, cs); + debug(79, 3) ("COSS Pending Relocate: size %d, disk_offset %llu\n", (int) sio->e->swap_file_sz, (unsigned long long) disk_offset); +#if USE_AUFSOPS + /* NOTE: the damned buffer isn't passed into aioRead! 
*/ + debug(79, 3) ("COSS: aioRead: FD %d, from %d -> %d, offset %llu, len: %d\n", cs->fd, pr->original_filen, pr->new_filen, (unsigned long long) disk_offset, pr->len); + aioRead(cs->fd, (off_t) disk_offset, pr->len, storeCossCompletePendingReloc, pr); +#else + a_file_read(&cs->aq, cs->fd, + p, + pr->len, + disk_offset, + storeCossCompletePendingReloc, + pr); +#endif +} + +CossPendingReloc * +storeCossGetPendingReloc(CossInfo *cs, sfileno new_filen) +{ + dlink_node *n; + CossPendingReloc *pr; + + n = cs->pending_relocs.head; + while (n != NULL) { + pr = n->data; + if (pr->new_filen == new_filen) { + return pr; + } + n = n->next; + } + return NULL; +} +#if USE_AUFSOPS +void +storeCossCompletePendingReloc(int fd, void *my_data, const char *buf, int aio_return, int aio_errno) +#else +void +storeCossCompletePendingReloc(int fd, const char *buf, int r_len, int r_errflag, void *my_data) +#endif +{ + CossPendingReloc *pr = my_data; + CossReadOp *op; + CossInfo *cs = pr->cs; + int stripe; + int errflag, len; +#if USE_AUFSOPS + char *p; +#endif + +#if USE_AUFSOPS + len = aio_return; + if (aio_errno) + errflag = aio_errno == ENOSPC ? DISK_NO_SPACE_LEFT : DISK_ERROR; + else + errflag = DISK_OK; +#else + errflag = r_errflag; + len = r_len; +#endif + + debug(79, 3) ("storeCossCompletePendingReloc: %p\n", pr); + assert(cbdataValid(pr)); + if (errflag != 0) { + coss_stats.read.fail++; + if (errflag > 0) { + errno = errflag; + debug(79, 1) ("storeCossCompletePendingReloc: error: %s\n", xstrerror()); + } else { + debug(79, 1) ("storeCossCompletePendingReloc: got failure (%d)\n", errflag); + } + } else { + debug(79, 3) ("COSS Pending Relocate: %d -> %d: completed\n", pr->original_filen, pr->new_filen); + coss_stats.read.success++; + } + /* aufs aioRead() doesn't take a buffer, it reads into its own. 
Grr */ +#if USE_AUFSOPS + p = storeCossMemPointerFromDiskOffset(cs, storeCossFilenoToDiskOffset(pr->new_filen, cs), NULL); + assert(p != NULL); + assert(p == pr->p); + if (errflag == DISK_OK && len > 0) xmemcpy(p, buf, len); /* a failed read has no valid byte count (or buffer) to copy from */ +#endif + + /* Nope, we're not a pending relocate anymore! */ + dlinkDelete(&pr->node, &cs->pending_relocs); + + /* Update the stripe count */ + stripe = storeCossFilenoToStripe(cs, pr->original_filen); + assert(stripe >= 0); + assert(stripe < cs->numstripes); + assert(cs->stripes[stripe].pending_relocs >= 1); + cs->stripes[stripe].pending_relocs--; + + /* Relocate has completed; we can now complete pending read ops on this particular entry */ + while (pr->ops.head != NULL) { + op = pr->ops.head->data; + debug (79, 3) ("storeCossCompletePendingReloc: %p: dequeueing op %p\n", pr, op); + op->pr = NULL; + dlinkDelete(&op->pending_op_node, &pr->ops); + storeCossCompleteReadOp(cs, op, errflag); + /* XXX again, this shouldn't be here (find the dlinkAddTail() in storeCossKickReadOp); these should + * be abstracted out. */ + } + /* Good, now we can delete it */ + cbdataUnlock(pr); + cbdataFree(pr); + assert(cs->pending_reloc_count != 0); + cs->pending_reloc_count--; +} + +/* + * Begin a read operation + * + * the current 'state' of the read operation has already been set in storeIOState. + * + * We assume that the read operation will be from a currently in-memory MemBuf. 
+ */ +CossReadOp * +storeCossCreateReadOp(CossInfo *cs, storeIOState *sio) +{ + CossReadOp *op; + CossState *cstate = sio->fsstate; + + /* Create entry */ + op = memPoolAlloc(coss_op_pool); + + debug(79, 3) ("COSS: Creating Read operation: %p: filen %d, offset %lld, size %lld\n", op, sio->swap_filen, (long long int) cstate->requestoffset, (long long int) cstate->requestlen); + + /* Fill in details */ + op->type = COSS_OP_READ; + op->sio = sio; + op->requestlen = cstate->requestlen; + op->requestoffset = cstate->requestoffset; + op->reqdiskoffset = cstate->reqdiskoffset; + op->requestbuf = cstate->requestbuf; + + /* Add to list */ + dlinkAddTail(op, &op->node, &cs->pending_ops); + return op; +} + +void +storeCossCompleteReadOp(CossInfo *cs, CossReadOp *op, int error) +{ + storeIOState *sio = op->sio; + STRCB *callback = sio->read.callback; + void *callback_data = sio->read.callback_data; + CossState *cstate = sio->fsstate; + ssize_t rlen = -1; + char *p; + SwapDir *SD = INDEXSD(sio->swap_dirn); + + debug(79, 3) ("storeCossCompleteReadOp: op %p, op dependencies satisfied, completing\n", op); + + assert(callback); + assert(callback_data); + assert(storeCossGetPendingReloc(cs, sio->swap_filen) == NULL); + /* and make sure we aren't on a pending op list! */ + assert(op->pr == NULL); + /* Is the callback still valid? 
If so; copy the data and callback */ + if (cbdataValid(callback_data) && cbdataValid(sio)) { + sio->read.callback = NULL; + sio->read.callback_data = NULL; + if (error == 0) { + /* P is the beginning of the object data we're interested in */ + p = storeCossMemPointerFromDiskOffset(cs, storeCossFilenoToDiskOffset(sio->swap_filen, SD->fsdata), NULL); + assert(p != NULL); + /* cstate->requestlen contains the current copy length */ + assert(cstate->requestlen == op->requestlen); + assert(cstate->requestbuf == op->requestbuf); + assert(cstate->requestoffset == op->requestoffset); + xmemcpy(cstate->requestbuf, &p[cstate->requestoffset], cstate->requestlen); + rlen = cstate->requestlen; + } + callback(callback_data, cstate->requestbuf, rlen); + } + + /* Remove from the operation list */ + dlinkDelete(&op->node, &cs->pending_ops); + + /* Completed! */ + memPoolFree(coss_op_pool, op); +} + +/* See if the read op can be satisfied now */ +void +storeCossKickReadOp(CossInfo *cs, CossReadOp *op) +{ + CossPendingReloc *pr; + + debug(79, 3) ("storeCossKickReadOp: op %p\n", op); + + if ((pr = storeCossGetPendingReloc(cs, op->sio->swap_filen)) == NULL) { + debug(79, 3) ("COSS: filen: %d, tis already in memory; serving.\n", op->sio->swap_filen); + storeCossCompleteReadOp(cs, op, 0); + } else { + debug(79, 3) ("COSS: filen: %d, not in memory, she'll have to wait.\n", op->sio->swap_filen); + /* XXX Eww, hack! It has to be done; but doing it here is yuck */ + if (op->pr == NULL) { + debug(79, 3) ("storeCossKickReadOp: %p: op not bound to a pending read %p; binding\n", op, pr); + dlinkAddTail(op, &op->pending_op_node, &pr->ops); + op->pr = pr; + } + } +} + +static void +membufsPrint(StoreEntry *e, CossMemBuf *t, char *prefix) +{ + storeAppendPrintf(e, "%s: %d, lockcount: %d, numobjects %d, flags: %s,%s,%s\n", + prefix, t->stripe, t->lockcount, t->numobjs, + t->flags.full ? "FULL" : "NOTFULL", + t->flags.writing ? "WRITING" : "NOTWRITING", + t->flags.written ? 
"WRITTEN" : "NOTWRITTEN"); +} + +void +membufsDump(CossInfo *cs, StoreEntry *e) +{ + dlink_node *m; + int i; + m = cs->membufs.head; + while (m != NULL) { + CossMemBuf *t = m->data; + membufsPrint(e, t, "Stripe"); + m = m->next; + } + m = cs->dead_membufs.head; + while (m != NULL) { + CossMemBuf *t = m->data; + membufsPrint(e, t, "Dead Stripe"); + m = m->next; + } + storeAppendPrintf(e, "Pending Relocations:\n"); + for (i = 0; i < cs->numstripes; i++) { + if (cs->stripes[i].pending_relocs > 0) { + storeAppendPrintf(e, " Stripe: %d Number: %d\n", i, cs->stripes[i].pending_relocs); + } + } }